Code

rrdcached: move queue length decrement into remove_from_queue
[rrdtool-all.git] / program / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
78 #include <stdint.h>
79 #include <stdio.h>
80 #include <unistd.h>
81 #include <string.h>
82 #include <strings.h>
83 #include <stdint.h>
84 #include <inttypes.h>
86 #include <sys/types.h>
87 #include <sys/stat.h>
88 #include <fcntl.h>
89 #include <signal.h>
90 #include <sys/socket.h>
91 #include <sys/un.h>
92 #include <netdb.h>
93 #include <poll.h>
94 #include <syslog.h>
95 #include <pthread.h>
96 #include <errno.h>
97 #include <assert.h>
98 #include <sys/time.h>
99 #include <time.h>
101 #include <glib-2.0/glib.h>
102 /* }}} */
104 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
106 #ifndef __GNUC__
107 # define __attribute__(x) /**/
108 #endif
110 /*
111  * Types
112  */
113 typedef enum
115   PRIV_LOW,
116   PRIV_HIGH
117 } socket_privilege;
119 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
121 struct listen_socket_s
123   int fd;
124   char addr[PATH_MAX + 1];
125   int family;
126   socket_privilege privilege;
128   /* state for BATCH processing */
129   time_t batch_start;
130   int batch_cmd;
132   /* buffered IO */
133   char *rbuf;
134   off_t next_cmd;
135   off_t next_read;
137   char *wbuf;
138   ssize_t wbuf_len;
139 };
140 typedef struct listen_socket_s listen_socket_t;
142 struct cache_item_s;
143 typedef struct cache_item_s cache_item_t;
144 struct cache_item_s
146   char *file;
147   char **values;
148   int values_num;
149   time_t last_flush_time;
150   time_t last_update_stamp;
151 #define CI_FLAGS_IN_TREE  (1<<0)
152 #define CI_FLAGS_IN_QUEUE (1<<1)
153   int flags;
154   pthread_cond_t  flushed;
155   cache_item_t *prev;
156   cache_item_t *next;
157 };
159 struct callback_flush_data_s
161   time_t now;
162   time_t abs_timeout;
163   char **keys;
164   size_t keys_num;
165 };
166 typedef struct callback_flush_data_s callback_flush_data_t;
168 enum queue_side_e
170   HEAD,
171   TAIL
172 };
173 typedef enum queue_side_e queue_side_t;
175 /* max length of socket command or response */
176 #define CMD_MAX 4096
177 #define RBUF_SIZE (CMD_MAX*2)
179 /*
180  * Variables
181  */
182 static int stay_foreground = 0;
183 static uid_t daemon_uid;
185 static listen_socket_t *listen_fds = NULL;
186 static size_t listen_fds_num = 0;
188 static int do_shutdown = 0;
190 static pthread_t queue_thread;
192 static pthread_t *connection_threads = NULL;
193 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
194 static int connection_threads_num = 0;
196 /* Cache stuff */
197 static GTree          *cache_tree = NULL;
198 static cache_item_t   *cache_queue_head = NULL;
199 static cache_item_t   *cache_queue_tail = NULL;
200 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
201 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
203 static int config_write_interval = 300;
204 static int config_write_jitter   = 0;
205 static int config_flush_interval = 3600;
206 static int config_flush_at_shutdown = 0;
207 static char *config_pid_file = NULL;
208 static char *config_base_dir = NULL;
209 static size_t _config_base_dir_len = 0;
210 static int config_write_base_only = 0;
212 static listen_socket_t **config_listen_address_list = NULL;
213 static int config_listen_address_list_len = 0;
215 static uint64_t stats_queue_length = 0;
216 static uint64_t stats_updates_received = 0;
217 static uint64_t stats_flush_received = 0;
218 static uint64_t stats_updates_written = 0;
219 static uint64_t stats_data_sets_written = 0;
220 static uint64_t stats_journal_bytes = 0;
221 static uint64_t stats_journal_rotate = 0;
222 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
224 /* Journaled updates */
225 static char *journal_cur = NULL;
226 static char *journal_old = NULL;
227 static FILE *journal_fh = NULL;
228 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
229 static int journal_write(char *cmd, char *args);
230 static void journal_done(void);
231 static void journal_rotate(void);
233 /* 
234  * Functions
235  */
236 static void sig_common (const char *sig) /* {{{ */
238   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
239   do_shutdown++;
240   pthread_cond_broadcast(&cache_cond);
241 } /* }}} void sig_common */
243 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
245   sig_common("INT");
246 } /* }}} void sig_int_handler */
248 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
250   sig_common("TERM");
251 } /* }}} void sig_term_handler */
253 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
255   config_flush_at_shutdown = 1;
256   sig_common("USR1");
257 } /* }}} void sig_usr1_handler */
259 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
261   config_flush_at_shutdown = 0;
262   sig_common("USR2");
263 } /* }}} void sig_usr2_handler */
265 static void install_signal_handlers(void) /* {{{ */
267   /* These structures are static, because `sigaction' behaves weird if the are
268    * overwritten.. */
269   static struct sigaction sa_int;
270   static struct sigaction sa_term;
271   static struct sigaction sa_pipe;
272   static struct sigaction sa_usr1;
273   static struct sigaction sa_usr2;
275   /* Install signal handlers */
276   memset (&sa_int, 0, sizeof (sa_int));
277   sa_int.sa_handler = sig_int_handler;
278   sigaction (SIGINT, &sa_int, NULL);
280   memset (&sa_term, 0, sizeof (sa_term));
281   sa_term.sa_handler = sig_term_handler;
282   sigaction (SIGTERM, &sa_term, NULL);
284   memset (&sa_pipe, 0, sizeof (sa_pipe));
285   sa_pipe.sa_handler = SIG_IGN;
286   sigaction (SIGPIPE, &sa_pipe, NULL);
288   memset (&sa_pipe, 0, sizeof (sa_usr1));
289   sa_usr1.sa_handler = sig_usr1_handler;
290   sigaction (SIGUSR1, &sa_usr1, NULL);
292   memset (&sa_usr2, 0, sizeof (sa_usr2));
293   sa_usr2.sa_handler = sig_usr2_handler;
294   sigaction (SIGUSR2, &sa_usr2, NULL);
296 } /* }}} void install_signal_handlers */
298 static int open_pidfile(char *action, int oflag) /* {{{ */
300   int fd;
301   char *file;
303   file = (config_pid_file != NULL)
304     ? config_pid_file
305     : LOCALSTATEDIR "/run/rrdcached.pid";
307   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
308   if (fd < 0)
309     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
310             action, file, rrd_strerror(errno));
312   return(fd);
313 } /* }}} static int open_pidfile */
315 /* check existing pid file to see whether a daemon is running */
316 static int check_pidfile(void)
318   int pid_fd;
319   pid_t pid;
320   char pid_str[16];
322   pid_fd = open_pidfile("open", O_RDWR);
323   if (pid_fd < 0)
324     return pid_fd;
326   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
327     return -1;
329   pid = atoi(pid_str);
330   if (pid <= 0)
331     return -1;
333   /* another running process that we can signal COULD be
334    * a competing rrdcached */
335   if (pid != getpid() && kill(pid, 0) == 0)
336   {
337     fprintf(stderr,
338             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
339     close(pid_fd);
340     return -1;
341   }
343   lseek(pid_fd, 0, SEEK_SET);
344   ftruncate(pid_fd, 0);
346   fprintf(stderr,
347           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
348           "rrdcached: starting normally.\n", pid);
350   return pid_fd;
351 } /* }}} static int check_pidfile */
353 static int write_pidfile (int fd) /* {{{ */
355   pid_t pid;
356   FILE *fh;
358   pid = getpid ();
360   fh = fdopen (fd, "w");
361   if (fh == NULL)
362   {
363     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
364     close(fd);
365     return (-1);
366   }
368   fprintf (fh, "%i\n", (int) pid);
369   fclose (fh);
371   return (0);
372 } /* }}} int write_pidfile */
374 static int remove_pidfile (void) /* {{{ */
376   char *file;
377   int status;
379   file = (config_pid_file != NULL)
380     ? config_pid_file
381     : LOCALSTATEDIR "/run/rrdcached.pid";
383   status = unlink (file);
384   if (status == 0)
385     return (0);
386   return (errno);
387 } /* }}} int remove_pidfile */
389 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
391   char *eol;
393   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
394                sock->next_read - sock->next_cmd);
396   if (eol == NULL)
397   {
398     /* no commands left, move remainder back to front of rbuf */
399     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
400             sock->next_read - sock->next_cmd);
401     sock->next_read -= sock->next_cmd;
402     sock->next_cmd = 0;
403     *len = 0;
404     return NULL;
405   }
406   else
407   {
408     char *cmd = sock->rbuf + sock->next_cmd;
409     *eol = '\0';
411     sock->next_cmd = eol - sock->rbuf + 1;
413     if (eol > sock->rbuf && *(eol-1) == '\r')
414       *(--eol) = '\0'; /* handle "\r\n" EOL */
416     *len = eol - cmd;
418     return cmd;
419   }
421   /* NOTREACHED */
422   assert(1==0);
425 /* add the characters directly to the write buffer */
426 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
428   char *new_buf;
430   assert(sock != NULL);
432   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
433   if (new_buf == NULL)
434   {
435     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
436     return -1;
437   }
439   strncpy(new_buf + sock->wbuf_len, str, len + 1);
441   sock->wbuf = new_buf;
442   sock->wbuf_len += len;
444   return 0;
445 } /* }}} static int add_to_wbuf */
447 /* add the text to the "extra" info that's sent after the status line */
448 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
450   va_list argp;
451   char buffer[CMD_MAX];
452   int len;
454   if (sock == NULL) return 0; /* journal replay mode */
455   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
457   va_start(argp, fmt);
458 #ifdef HAVE_VSNPRINTF
459   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
460 #else
461   len = vsprintf(buffer, fmt, argp);
462 #endif
463   va_end(argp);
464   if (len < 0)
465   {
466     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
467     return -1;
468   }
470   return add_to_wbuf(sock, buffer, len);
471 } /* }}} static int add_response_info */
473 static int count_lines(char *str) /* {{{ */
475   int lines = 0;
477   if (str != NULL)
478   {
479     while ((str = strchr(str, '\n')) != NULL)
480     {
481       ++lines;
482       ++str;
483     }
484   }
486   return lines;
487 } /* }}} static int count_lines */
489 /* send the response back to the user.
490  * returns 0 on success, -1 on error
491  * write buffer is always zeroed after this call */
492 static int send_response (listen_socket_t *sock, response_code rc,
493                           char *fmt, ...) /* {{{ */
495   va_list argp;
496   char buffer[CMD_MAX];
497   int lines;
498   ssize_t wrote;
499   int rclen, len;
501   if (sock == NULL) return rc;  /* journal replay mode */
503   if (sock->batch_start)
504   {
505     if (rc == RESP_OK)
506       return rc; /* no response on success during BATCH */
507     lines = sock->batch_cmd;
508   }
509   else if (rc == RESP_OK)
510     lines = count_lines(sock->wbuf);
511   else
512     lines = -1;
514   rclen = sprintf(buffer, "%d ", lines);
515   va_start(argp, fmt);
516 #ifdef HAVE_VSNPRINTF
517   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
518 #else
519   len = vsprintf(buffer+rclen, fmt, argp);
520 #endif
521   va_end(argp);
522   if (len < 0)
523     return -1;
525   len += rclen;
527   /* append the result to the wbuf, don't write to the user */
528   if (sock->batch_start)
529     return add_to_wbuf(sock, buffer, len);
531   /* first write must be complete */
532   if (len != write(sock->fd, buffer, len))
533   {
534     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
535     return -1;
536   }
538   if (sock->wbuf != NULL && rc == RESP_OK)
539   {
540     wrote = 0;
541     while (wrote < sock->wbuf_len)
542     {
543       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
544       if (wb <= 0)
545       {
546         RRDD_LOG(LOG_INFO, "send_response: could not write results");
547         return -1;
548       }
549       wrote += wb;
550     }
551   }
553   free(sock->wbuf); sock->wbuf = NULL;
554   sock->wbuf_len = 0;
556   return 0;
557 } /* }}} */
559 static void wipe_ci_values(cache_item_t *ci, time_t when)
561   ci->values = NULL;
562   ci->values_num = 0;
564   ci->last_flush_time = when;
565   if (config_write_jitter > 0)
566     ci->last_flush_time += (random() % config_write_jitter);
569 /* remove_from_queue
570  * remove a "cache_item_t" item from the queue.
571  * must hold 'cache_lock' when calling this
572  */
573 static void remove_from_queue(cache_item_t *ci) /* {{{ */
575   if (ci == NULL) return;
576   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
578   if (ci->prev == NULL)
579     cache_queue_head = ci->next; /* reset head */
580   else
581     ci->prev->next = ci->next;
583   if (ci->next == NULL)
584     cache_queue_tail = ci->prev; /* reset the tail */
585   else
586     ci->next->prev = ci->prev;
588   ci->next = ci->prev = NULL;
589   ci->flags &= ~CI_FLAGS_IN_QUEUE;
591   pthread_mutex_lock (&stats_lock);
592   assert (stats_queue_length > 0);
593   stats_queue_length--;
594   pthread_mutex_unlock (&stats_lock);
596 } /* }}} static void remove_from_queue */
598 /* free the resources associated with the cache_item_t
599  * must hold cache_lock when calling this function
600  */
601 static void *free_cache_item(cache_item_t *ci) /* {{{ */
603   if (ci == NULL) return NULL;
605   remove_from_queue(ci);
607   for (int i=0; i < ci->values_num; i++)
608     free(ci->values[i]);
610   free (ci->values);
611   free (ci->file);
613   /* in case anyone is waiting */
614   pthread_cond_broadcast(&ci->flushed);
616   free (ci);
618   return NULL;
619 } /* }}} static void *free_cache_item */
621 /*
622  * enqueue_cache_item:
623  * `cache_lock' must be acquired before calling this function!
624  */
625 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
626     queue_side_t side)
628   if (ci == NULL)
629     return (-1);
631   if (ci->values_num == 0)
632     return (0);
634   if (side == HEAD)
635   {
636     if (cache_queue_head == ci)
637       return 0;
639     /* remove if further down in queue */
640     remove_from_queue(ci);
642     ci->prev = NULL;
643     ci->next = cache_queue_head;
644     if (ci->next != NULL)
645       ci->next->prev = ci;
646     cache_queue_head = ci;
648     if (cache_queue_tail == NULL)
649       cache_queue_tail = cache_queue_head;
650   }
651   else /* (side == TAIL) */
652   {
653     /* We don't move values back in the list.. */
654     if (ci->flags & CI_FLAGS_IN_QUEUE)
655       return (0);
657     assert (ci->next == NULL);
658     assert (ci->prev == NULL);
660     ci->prev = cache_queue_tail;
662     if (cache_queue_tail == NULL)
663       cache_queue_head = ci;
664     else
665       cache_queue_tail->next = ci;
667     cache_queue_tail = ci;
668   }
670   ci->flags |= CI_FLAGS_IN_QUEUE;
672   pthread_cond_broadcast(&cache_cond);
673   pthread_mutex_lock (&stats_lock);
674   stats_queue_length++;
675   pthread_mutex_unlock (&stats_lock);
677   return (0);
678 } /* }}} int enqueue_cache_item */
680 /*
681  * tree_callback_flush:
682  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
683  * while this is in progress.
684  */
685 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
686     gpointer data)
688   cache_item_t *ci;
689   callback_flush_data_t *cfd;
691   ci = (cache_item_t *) value;
692   cfd = (callback_flush_data_t *) data;
694   if (ci->flags & CI_FLAGS_IN_QUEUE)
695     return FALSE;
697   if ((ci->last_flush_time <= cfd->abs_timeout)
698       && (ci->values_num > 0))
699   {
700     enqueue_cache_item (ci, TAIL);
701   }
702   else if ((do_shutdown != 0)
703       && (ci->values_num > 0))
704   {
705     enqueue_cache_item (ci, TAIL);
706   }
707   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
708       && (ci->values_num <= 0))
709   {
710     char **temp;
712     temp = (char **) rrd_realloc (cfd->keys,
713         sizeof (char *) * (cfd->keys_num + 1));
714     if (temp == NULL)
715     {
716       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
717       return (FALSE);
718     }
719     cfd->keys = temp;
720     /* Make really sure this points to the _same_ place */
721     assert ((char *) key == ci->file);
722     cfd->keys[cfd->keys_num] = (char *) key;
723     cfd->keys_num++;
724   }
726   return (FALSE);
727 } /* }}} gboolean tree_callback_flush */
729 static int flush_old_values (int max_age)
731   callback_flush_data_t cfd;
732   size_t k;
734   memset (&cfd, 0, sizeof (cfd));
735   /* Pass the current time as user data so that we don't need to call
736    * `time' for each node. */
737   cfd.now = time (NULL);
738   cfd.keys = NULL;
739   cfd.keys_num = 0;
741   if (max_age > 0)
742     cfd.abs_timeout = cfd.now - max_age;
743   else
744     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
746   /* `tree_callback_flush' will return the keys of all values that haven't
747    * been touched in the last `config_flush_interval' seconds in `cfd'.
748    * The char*'s in this array point to the same memory as ci->file, so we
749    * don't need to free them separately. */
750   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
752   for (k = 0; k < cfd.keys_num; k++)
753   {
754     /* should never fail, since we have held the cache_lock
755      * the entire time */
756     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
757   }
759   if (cfd.keys != NULL)
760   {
761     free (cfd.keys);
762     cfd.keys = NULL;
763   }
765   return (0);
766 } /* int flush_old_values */
768 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
770   struct timeval now;
771   struct timespec next_flush;
772   int final_flush = 0; /* make sure we only flush once on shutdown */
774   gettimeofday (&now, NULL);
775   next_flush.tv_sec = now.tv_sec + config_flush_interval;
776   next_flush.tv_nsec = 1000 * now.tv_usec;
778   pthread_mutex_lock (&cache_lock);
779   while ((do_shutdown == 0) || (cache_queue_head != NULL))
780   {
781     cache_item_t *ci;
782     char *file;
783     char **values;
784     int values_num;
785     int status;
786     int i;
788     /* First, check if it's time to do the cache flush. */
789     gettimeofday (&now, NULL);
790     if ((now.tv_sec > next_flush.tv_sec)
791         || ((now.tv_sec == next_flush.tv_sec)
792           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
793     {
794       /* Flush all values that haven't been written in the last
795        * `config_write_interval' seconds. */
796       flush_old_values (config_write_interval);
798       /* Determine the time of the next cache flush. */
799       next_flush.tv_sec =
800         now.tv_sec + next_flush.tv_sec % config_flush_interval;
802       /* unlock the cache while we rotate so we don't block incoming
803        * updates if the fsync() blocks on disk I/O */
804       pthread_mutex_unlock(&cache_lock);
805       journal_rotate();
806       pthread_mutex_lock(&cache_lock);
807     }
809     /* Now, check if there's something to store away. If not, wait until
810      * something comes in or it's time to do the cache flush.  if we are
811      * shutting down, do not wait around.  */
812     if (cache_queue_head == NULL && !do_shutdown)
813     {
814       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
815       if ((status != 0) && (status != ETIMEDOUT))
816       {
817         RRDD_LOG (LOG_ERR, "queue_thread_main: "
818             "pthread_cond_timedwait returned %i.", status);
819       }
820     }
822     /* We're about to shut down */
823     if (do_shutdown != 0 && !final_flush++)
824     {
825       if (config_flush_at_shutdown)
826         flush_old_values (-1); /* flush everything */
827       else
828         break;
829     }
831     /* Check if a value has arrived. This may be NULL if we timed out or there
832      * was an interrupt such as a signal. */
833     if (cache_queue_head == NULL)
834       continue;
836     ci = cache_queue_head;
838     /* copy the relevant parts */
839     file = strdup (ci->file);
840     if (file == NULL)
841     {
842       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
843       continue;
844     }
846     assert(ci->values != NULL);
847     assert(ci->values_num > 0);
849     values = ci->values;
850     values_num = ci->values_num;
852     wipe_ci_values(ci, time(NULL));
853     remove_from_queue(ci);
855     pthread_mutex_unlock (&cache_lock);
857     rrd_clear_error ();
858     status = rrd_update_r (file, NULL, values_num, (void *) values);
859     if (status != 0)
860     {
861       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
862           "rrd_update_r (%s) failed with status %i. (%s)",
863           file, status, rrd_get_error());
864     }
866     journal_write("wrote", file);
867     pthread_cond_broadcast(&ci->flushed);
869     for (i = 0; i < values_num; i++)
870       free (values[i]);
872     free(values);
873     free(file);
875     if (status == 0)
876     {
877       pthread_mutex_lock (&stats_lock);
878       stats_updates_written++;
879       stats_data_sets_written += values_num;
880       pthread_mutex_unlock (&stats_lock);
881     }
883     pthread_mutex_lock (&cache_lock);
885     /* We're about to shut down */
886     if (do_shutdown != 0 && !final_flush++)
887     {
888       if (config_flush_at_shutdown)
889           flush_old_values (-1); /* flush everything */
890       else
891         break;
892     }
893   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
894   pthread_mutex_unlock (&cache_lock);
896   if (config_flush_at_shutdown)
897   {
898     assert(cache_queue_head == NULL);
899     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
900   }
902   journal_done();
904   return (NULL);
905 } /* }}} void *queue_thread_main */
907 static int buffer_get_field (char **buffer_ret, /* {{{ */
908     size_t *buffer_size_ret, char **field_ret)
910   char *buffer;
911   size_t buffer_pos;
912   size_t buffer_size;
913   char *field;
914   size_t field_size;
915   int status;
917   buffer = *buffer_ret;
918   buffer_pos = 0;
919   buffer_size = *buffer_size_ret;
920   field = *buffer_ret;
921   field_size = 0;
923   if (buffer_size <= 0)
924     return (-1);
926   /* This is ensured by `handle_request'. */
927   assert (buffer[buffer_size - 1] == '\0');
929   status = -1;
930   while (buffer_pos < buffer_size)
931   {
932     /* Check for end-of-field or end-of-buffer */
933     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
934     {
935       field[field_size] = 0;
936       field_size++;
937       buffer_pos++;
938       status = 0;
939       break;
940     }
941     /* Handle escaped characters. */
942     else if (buffer[buffer_pos] == '\\')
943     {
944       if (buffer_pos >= (buffer_size - 1))
945         break;
946       buffer_pos++;
947       field[field_size] = buffer[buffer_pos];
948       field_size++;
949       buffer_pos++;
950     }
951     /* Normal operation */ 
952     else
953     {
954       field[field_size] = buffer[buffer_pos];
955       field_size++;
956       buffer_pos++;
957     }
958   } /* while (buffer_pos < buffer_size) */
960   if (status != 0)
961     return (status);
963   *buffer_ret = buffer + buffer_pos;
964   *buffer_size_ret = buffer_size - buffer_pos;
965   *field_ret = field;
967   return (0);
968 } /* }}} int buffer_get_field */
970 /* if we're restricting writes to the base directory,
971  * check whether the file falls within the dir
972  * returns 1 if OK, otherwise 0
973  */
974 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
976   assert(file != NULL);
978   if (!config_write_base_only
979       || sock == NULL /* journal replay */
980       || config_base_dir == NULL)
981     return 1;
983   if (strstr(file, "../") != NULL) goto err;
985   /* relative paths without "../" are ok */
986   if (*file != '/') return 1;
988   /* file must be of the format base + "/" + <1+ char filename> */
989   if (strlen(file) < _config_base_dir_len + 2) goto err;
990   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
991   if (*(file + _config_base_dir_len) != '/') goto err;
993   return 1;
995 err:
996   if (sock != NULL && sock->fd >= 0)
997     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
999   return 0;
1000 } /* }}} static int check_file_access */
1002 /* when using a base dir, convert relative paths to absolute paths.
1003  * if necessary, modifies the "filename" pointer to point
1004  * to the new path created in "tmp".  "tmp" is provided
1005  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1006  *
1007  * this allows us to optimize for the expected case (absolute path)
1008  * with a no-op.
1009  */
1010 static void get_abs_path(char **filename, char *tmp)
1012   assert(tmp != NULL);
1013   assert(filename != NULL && *filename != NULL);
1015   if (config_base_dir == NULL || **filename == '/')
1016     return;
1018   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1019   *filename = tmp;
1020 } /* }}} static int get_abs_path */
1022 /* returns 1 if we have the required privilege level,
1023  * otherwise issue an error to the user on sock */
1024 static int has_privilege (listen_socket_t *sock, /* {{{ */
1025                           socket_privilege priv)
1027   if (sock == NULL) /* journal replay */
1028     return 1;
1030   if (sock->privilege >= priv)
1031     return 1;
1033   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1034 } /* }}} static int has_privilege */
1036 static int flush_file (const char *filename) /* {{{ */
1038   cache_item_t *ci;
1040   pthread_mutex_lock (&cache_lock);
1042   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1043   if (ci == NULL)
1044   {
1045     pthread_mutex_unlock (&cache_lock);
1046     return (ENOENT);
1047   }
1049   if (ci->values_num > 0)
1050   {
1051     /* Enqueue at head */
1052     enqueue_cache_item (ci, HEAD);
1053     pthread_cond_wait(&ci->flushed, &cache_lock);
1054   }
1056   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1057    * may have been purged during our cond_wait() */
1059   pthread_mutex_unlock(&cache_lock);
1061   return (0);
1062 } /* }}} int flush_file */
1064 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1065     char *buffer, size_t buffer_size)
1067   int status;
1068   char **help_text;
1069   char *command;
1071   char *help_help[2] =
1072   {
1073     "Command overview\n"
1074     ,
1075     "HELP [<command>]\n"
1076     "FLUSH <filename>\n"
1077     "FLUSHALL\n"
1078     "PENDING <filename>\n"
1079     "FORGET <filename>\n"
1080     "QUEUE\n"
1081     "UPDATE <filename> <values> [<values> ...]\n"
1082     "BATCH\n"
1083     "STATS\n"
1084     "QUIT\n"
1085   };
1087   char *help_flush[2] =
1088   {
1089     "Help for FLUSH\n"
1090     ,
1091     "Usage: FLUSH <filename>\n"
1092     "\n"
1093     "Adds the given filename to the head of the update queue and returns\n"
1094     "after is has been dequeued.\n"
1095   };
1097   char *help_flushall[2] =
1098   {
1099     "Help for FLUSHALL\n"
1100     ,
1101     "Usage: FLUSHALL\n"
1102     "\n"
1103     "Triggers writing of all pending updates.  Returns immediately.\n"
1104   };
1106   char *help_pending[2] =
1107   {
1108     "Help for PENDING\n"
1109     ,
1110     "Usage: PENDING <filename>\n"
1111     "\n"
1112     "Shows any 'pending' updates for a file, in order.\n"
1113     "The updates shown have not yet been written to the underlying RRD file.\n"
1114   };
1116   char *help_forget[2] =
1117   {
1118     "Help for FORGET\n"
1119     ,
1120     "Usage: FORGET <filename>\n"
1121     "\n"
1122     "Removes the file completely from the cache.\n"
1123     "Any pending updates for the file will be lost.\n"
1124   };
1126   char *help_queue[2] =
1127   {
1128     "Help for QUEUE\n"
1129     ,
1130     "Shows all files in the output queue.\n"
1131     "The output is zero or more lines in the following format:\n"
1132     "(where <num_vals> is the number of values to be written)\n"
1133     "\n"
1134     "<num_vals> <filename>\n"
1135     "\n"
1136   };
1138   char *help_update[2] =
1139   {
1140     "Help for UPDATE\n"
1141     ,
1142     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1143     "\n"
1144     "Adds the given file to the internal cache if it is not yet known and\n"
1145     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1146     "for details.\n"
1147     "\n"
1148     "Each <values> has the following form:\n"
1149     "  <values> = <time>:<value>[:<value>[...]]\n"
1150     "See the rrdupdate(1) manpage for details.\n"
1151   };
1153   char *help_stats[2] =
1154   {
1155     "Help for STATS\n"
1156     ,
1157     "Usage: STATS\n"
1158     "\n"
1159     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1160     "a description of the values.\n"
1161   };
1163   char *help_batch[2] =
1164   {
1165     "Help for BATCH\n"
1166     ,
1167     "The 'BATCH' command permits the client to initiate a bulk load\n"
1168     "   of commands to rrdcached.\n"
1169     "\n"
1170     "Usage:\n"
1171     "\n"
1172     "    client: BATCH\n"
1173     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1174     "    client: command #1\n"
1175     "    client: command #2\n"
1176     "    client: ... and so on\n"
1177     "    client: .\n"
1178     "    server: 2 errors\n"
1179     "    server: 7 message for command #7\n"
1180     "    server: 9 message for command #9\n"
1181     "\n"
1182     "For more information, consult the rrdcached(1) documentation.\n"
1183   };
1185   char *help_quit[2] =
1186   {
1187     "Help for QUIT\n"
1188     ,
1189     "Disconnect from rrdcached.\n"
1190   };
1192   status = buffer_get_field (&buffer, &buffer_size, &command);
1193   if (status != 0)
1194     help_text = help_help;
1195   else
1196   {
1197     if (strcasecmp (command, "update") == 0)
1198       help_text = help_update;
1199     else if (strcasecmp (command, "flush") == 0)
1200       help_text = help_flush;
1201     else if (strcasecmp (command, "flushall") == 0)
1202       help_text = help_flushall;
1203     else if (strcasecmp (command, "pending") == 0)
1204       help_text = help_pending;
1205     else if (strcasecmp (command, "forget") == 0)
1206       help_text = help_forget;
1207     else if (strcasecmp (command, "queue") == 0)
1208       help_text = help_queue;
1209     else if (strcasecmp (command, "stats") == 0)
1210       help_text = help_stats;
1211     else if (strcasecmp (command, "batch") == 0)
1212       help_text = help_batch;
1213     else if (strcasecmp (command, "quit") == 0)
1214       help_text = help_quit;
1215     else
1216       help_text = help_help;
1217   }
1219   add_response_info(sock, help_text[1]);
1220   return send_response(sock, RESP_OK, help_text[0]);
1221 } /* }}} int handle_request_help */
1223 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1225   uint64_t copy_queue_length;
1226   uint64_t copy_updates_received;
1227   uint64_t copy_flush_received;
1228   uint64_t copy_updates_written;
1229   uint64_t copy_data_sets_written;
1230   uint64_t copy_journal_bytes;
1231   uint64_t copy_journal_rotate;
1233   uint64_t tree_nodes_number;
1234   uint64_t tree_depth;
1236   pthread_mutex_lock (&stats_lock);
1237   copy_queue_length       = stats_queue_length;
1238   copy_updates_received   = stats_updates_received;
1239   copy_flush_received     = stats_flush_received;
1240   copy_updates_written    = stats_updates_written;
1241   copy_data_sets_written  = stats_data_sets_written;
1242   copy_journal_bytes      = stats_journal_bytes;
1243   copy_journal_rotate     = stats_journal_rotate;
1244   pthread_mutex_unlock (&stats_lock);
1246   pthread_mutex_lock (&cache_lock);
1247   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1248   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1249   pthread_mutex_unlock (&cache_lock);
1251   add_response_info(sock,
1252                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1253   add_response_info(sock,
1254                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1255   add_response_info(sock,
1256                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1257   add_response_info(sock,
1258                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1259   add_response_info(sock,
1260                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1261   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1262   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1263   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1264   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1266   send_response(sock, RESP_OK, "Statistics follow\n");
1268   return (0);
1269 } /* }}} int handle_request_stats */
1271 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1272     char *buffer, size_t buffer_size)
1274   char *file, file_tmp[PATH_MAX];
1275   int status;
1277   status = buffer_get_field (&buffer, &buffer_size, &file);
1278   if (status != 0)
1279   {
1280     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1281   }
1282   else
1283   {
1284     pthread_mutex_lock(&stats_lock);
1285     stats_flush_received++;
1286     pthread_mutex_unlock(&stats_lock);
1288     get_abs_path(&file, file_tmp);
1289     if (!check_file_access(file, sock)) return 0;
1291     status = flush_file (file);
1292     if (status == 0)
1293       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1294     else if (status == ENOENT)
1295     {
1296       /* no file in our tree; see whether it exists at all */
1297       struct stat statbuf;
1299       memset(&statbuf, 0, sizeof(statbuf));
1300       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1301         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1302       else
1303         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1304     }
1305     else if (status < 0)
1306       return send_response(sock, RESP_ERR, "Internal error.\n");
1307     else
1308       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1309   }
1311   /* NOTREACHED */
1312   assert(1==0);
1313 } /* }}} int handle_request_flush */
1315 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1317   int status;
1319   status = has_privilege(sock, PRIV_HIGH);
1320   if (status <= 0)
1321     return status;
1323   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1325   pthread_mutex_lock(&cache_lock);
1326   flush_old_values(-1);
1327   pthread_mutex_unlock(&cache_lock);
1329   return send_response(sock, RESP_OK, "Started flush.\n");
1330 } /* }}} static int handle_request_flushall */
1332 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1333                                   char *buffer, size_t buffer_size)
1335   int status;
1336   char *file, file_tmp[PATH_MAX];
1337   cache_item_t *ci;
1339   status = buffer_get_field(&buffer, &buffer_size, &file);
1340   if (status != 0)
1341     return send_response(sock, RESP_ERR,
1342                          "Usage: PENDING <filename>\n");
1344   status = has_privilege(sock, PRIV_HIGH);
1345   if (status <= 0)
1346     return status;
1348   get_abs_path(&file, file_tmp);
1350   pthread_mutex_lock(&cache_lock);
1351   ci = g_tree_lookup(cache_tree, file);
1352   if (ci == NULL)
1353   {
1354     pthread_mutex_unlock(&cache_lock);
1355     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1356   }
1358   for (int i=0; i < ci->values_num; i++)
1359     add_response_info(sock, "%s\n", ci->values[i]);
1361   pthread_mutex_unlock(&cache_lock);
1362   return send_response(sock, RESP_OK, "updates pending\n");
1363 } /* }}} static int handle_request_pending */
1365 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1366                                  char *buffer, size_t buffer_size)
1368   int status;
1369   gboolean found;
1370   char *file, file_tmp[PATH_MAX];
1372   status = buffer_get_field(&buffer, &buffer_size, &file);
1373   if (status != 0)
1374     return send_response(sock, RESP_ERR,
1375                          "Usage: FORGET <filename>\n");
1377   status = has_privilege(sock, PRIV_HIGH);
1378   if (status <= 0)
1379     return status;
1381   get_abs_path(&file, file_tmp);
1382   if (!check_file_access(file, sock)) return 0;
1384   pthread_mutex_lock(&cache_lock);
1385   found = g_tree_remove(cache_tree, file);
1386   pthread_mutex_unlock(&cache_lock);
1388   if (found == TRUE)
1389   {
1390     if (sock != NULL)
1391       journal_write("forget", file);
1393     return send_response(sock, RESP_OK, "Gone!\n");
1394   }
1395   else
1396     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1398   /* NOTREACHED */
1399   assert(1==0);
1400 } /* }}} static int handle_request_forget */
1402 static int handle_request_queue (listen_socket_t *sock) /* {{{ */
1404   cache_item_t *ci;
1406   pthread_mutex_lock(&cache_lock);
1408   ci = cache_queue_head;
1409   while (ci != NULL)
1410   {
1411     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1412     ci = ci->next;
1413   }
1415   pthread_mutex_unlock(&cache_lock);
1417   return send_response(sock, RESP_OK, "in queue.\n");
1418 } /* }}} int handle_request_queue */
1420 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1421                                   time_t now,
1422                                   char *buffer, size_t buffer_size)
1424   char *file, file_tmp[PATH_MAX];
1425   int values_num = 0;
1426   int status;
1427   char orig_buf[CMD_MAX];
1429   cache_item_t *ci;
1431   status = has_privilege(sock, PRIV_HIGH);
1432   if (status <= 0)
1433     return status;
1435   /* save it for the journal later */
1436   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1438   status = buffer_get_field (&buffer, &buffer_size, &file);
1439   if (status != 0)
1440     return send_response(sock, RESP_ERR,
1441                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1443   pthread_mutex_lock(&stats_lock);
1444   stats_updates_received++;
1445   pthread_mutex_unlock(&stats_lock);
1447   get_abs_path(&file, file_tmp);
1448   if (!check_file_access(file, sock)) return 0;
1450   pthread_mutex_lock (&cache_lock);
1451   ci = g_tree_lookup (cache_tree, file);
1453   if (ci == NULL) /* {{{ */
1454   {
1455     struct stat statbuf;
1457     /* don't hold the lock while we setup; stat(2) might block */
1458     pthread_mutex_unlock(&cache_lock);
1460     memset (&statbuf, 0, sizeof (statbuf));
1461     status = stat (file, &statbuf);
1462     if (status != 0)
1463     {
1464       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1466       status = errno;
1467       if (status == ENOENT)
1468         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1469       else
1470         return send_response(sock, RESP_ERR,
1471                              "stat failed with error %i.\n", status);
1472     }
1473     if (!S_ISREG (statbuf.st_mode))
1474       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1476     if (access(file, R_OK|W_OK) != 0)
1477       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1478                            file, rrd_strerror(errno));
1480     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1481     if (ci == NULL)
1482     {
1483       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1485       return send_response(sock, RESP_ERR, "malloc failed.\n");
1486     }
1487     memset (ci, 0, sizeof (cache_item_t));
1489     ci->file = strdup (file);
1490     if (ci->file == NULL)
1491     {
1492       free (ci);
1493       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1495       return send_response(sock, RESP_ERR, "strdup failed.\n");
1496     }
1498     wipe_ci_values(ci, now);
1499     ci->flags = CI_FLAGS_IN_TREE;
1500     pthread_cond_init(&ci->flushed, NULL);
1502     pthread_mutex_lock(&cache_lock);
1503     g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1504   } /* }}} */
1505   assert (ci != NULL);
1507   /* don't re-write updates in replay mode */
1508   if (sock != NULL)
1509     journal_write("update", orig_buf);
1511   while (buffer_size > 0)
1512   {
1513     char **temp;
1514     char *value;
1515     time_t stamp;
1516     char *eostamp;
1518     status = buffer_get_field (&buffer, &buffer_size, &value);
1519     if (status != 0)
1520     {
1521       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1522       break;
1523     }
1525     /* make sure update time is always moving forward */
1526     stamp = strtol(value, &eostamp, 10);
1527     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1528     {
1529       pthread_mutex_unlock(&cache_lock);
1530       return send_response(sock, RESP_ERR,
1531                            "Cannot find timestamp in '%s'!\n", value);
1532     }
1533     else if (stamp <= ci->last_update_stamp)
1534     {
1535       pthread_mutex_unlock(&cache_lock);
1536       return send_response(sock, RESP_ERR,
1537                            "illegal attempt to update using time %ld when last"
1538                            " update time is %ld (minimum one second step)\n",
1539                            stamp, ci->last_update_stamp);
1540     }
1541     else
1542       ci->last_update_stamp = stamp;
1544     temp = (char **) rrd_realloc (ci->values,
1545         sizeof (char *) * (ci->values_num + 1));
1546     if (temp == NULL)
1547     {
1548       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1549       continue;
1550     }
1551     ci->values = temp;
1553     ci->values[ci->values_num] = strdup (value);
1554     if (ci->values[ci->values_num] == NULL)
1555     {
1556       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1557       continue;
1558     }
1559     ci->values_num++;
1561     values_num++;
1562   }
1564   if (((now - ci->last_flush_time) >= config_write_interval)
1565       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1566       && (ci->values_num > 0))
1567   {
1568     enqueue_cache_item (ci, TAIL);
1569   }
1571   pthread_mutex_unlock (&cache_lock);
1573   if (values_num < 1)
1574     return send_response(sock, RESP_ERR, "No values updated.\n");
1575   else
1576     return send_response(sock, RESP_OK,
1577                          "errors, enqueued %i value(s).\n", values_num);
1579   /* NOTREACHED */
1580   assert(1==0);
1582 } /* }}} int handle_request_update */
1584 /* we came across a "WROTE" entry during journal replay.
1585  * throw away any values that we have accumulated for this file
1586  */
1587 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1589   int i;
1590   cache_item_t *ci;
1591   const char *file = buffer;
1593   pthread_mutex_lock(&cache_lock);
1595   ci = g_tree_lookup(cache_tree, file);
1596   if (ci == NULL)
1597   {
1598     pthread_mutex_unlock(&cache_lock);
1599     return (0);
1600   }
1602   if (ci->values)
1603   {
1604     for (i=0; i < ci->values_num; i++)
1605       free(ci->values[i]);
1607     free(ci->values);
1608   }
1610   wipe_ci_values(ci, now);
1611   remove_from_queue(ci);
1613   pthread_mutex_unlock(&cache_lock);
1614   return (0);
1615 } /* }}} int handle_request_wrote */
1617 /* start "BATCH" processing */
1618 static int batch_start (listen_socket_t *sock) /* {{{ */
1620   int status;
1621   if (sock->batch_start)
1622     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1624   status = send_response(sock, RESP_OK,
1625                          "Go ahead.  End with dot '.' on its own line.\n");
1626   sock->batch_start = time(NULL);
1627   sock->batch_cmd = 0;
1629   return status;
1630 } /* }}} static int batch_start */
1632 /* finish "BATCH" processing and return results to the client */
1633 static int batch_done (listen_socket_t *sock) /* {{{ */
1635   assert(sock->batch_start);
1636   sock->batch_start = 0;
1637   sock->batch_cmd  = 0;
1638   return send_response(sock, RESP_OK, "errors\n");
1639 } /* }}} static int batch_done */
1641 /* if sock==NULL, we are in journal replay mode */
1642 static int handle_request (listen_socket_t *sock, /* {{{ */
1643                            time_t now,
1644                            char *buffer, size_t buffer_size)
1646   char *buffer_ptr;
1647   char *command;
1648   int status;
1650   assert (buffer[buffer_size - 1] == '\0');
1652   buffer_ptr = buffer;
1653   command = NULL;
1654   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1655   if (status != 0)
1656   {
1657     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1658     return (-1);
1659   }
1661   if (sock != NULL && sock->batch_start)
1662     sock->batch_cmd++;
1664   if (strcasecmp (command, "update") == 0)
1665     return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1666   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1667   {
1668     /* this is only valid in replay mode */
1669     return (handle_request_wrote (buffer_ptr, now));
1670   }
1671   else if (strcasecmp (command, "flush") == 0)
1672     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1673   else if (strcasecmp (command, "flushall") == 0)
1674     return (handle_request_flushall(sock));
1675   else if (strcasecmp (command, "pending") == 0)
1676     return (handle_request_pending(sock, buffer_ptr, buffer_size));
1677   else if (strcasecmp (command, "forget") == 0)
1678     return (handle_request_forget(sock, buffer_ptr, buffer_size));
1679   else if (strcasecmp (command, "queue") == 0)
1680     return (handle_request_queue(sock));
1681   else if (strcasecmp (command, "stats") == 0)
1682     return (handle_request_stats (sock));
1683   else if (strcasecmp (command, "help") == 0)
1684     return (handle_request_help (sock, buffer_ptr, buffer_size));
1685   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1686     return batch_start(sock);
1687   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1688     return batch_done(sock);
1689   else if (strcasecmp (command, "quit") == 0)
1690     return -1;
1691   else
1692     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1694   /* NOTREACHED */
1695   assert(1==0);
1696 } /* }}} int handle_request */
1698 /* MUST NOT hold journal_lock before calling this */
1699 static void journal_rotate(void) /* {{{ */
1701   FILE *old_fh = NULL;
1702   int new_fd;
1704   if (journal_cur == NULL || journal_old == NULL)
1705     return;
1707   pthread_mutex_lock(&journal_lock);
1709   /* we rotate this way (rename before close) so that the we can release
1710    * the journal lock as fast as possible.  Journal writes to the new
1711    * journal can proceed immediately after the new file is opened.  The
1712    * fclose can then block without affecting new updates.
1713    */
1714   if (journal_fh != NULL)
1715   {
1716     old_fh = journal_fh;
1717     journal_fh = NULL;
1718     rename(journal_cur, journal_old);
1719     ++stats_journal_rotate;
1720   }
1722   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1723                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1724   if (new_fd >= 0)
1725   {
1726     journal_fh = fdopen(new_fd, "a");
1727     if (journal_fh == NULL)
1728       close(new_fd);
1729   }
1731   pthread_mutex_unlock(&journal_lock);
1733   if (old_fh != NULL)
1734     fclose(old_fh);
1736   if (journal_fh == NULL)
1737   {
1738     RRDD_LOG(LOG_CRIT,
1739              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1740              journal_cur, rrd_strerror(errno));
1742     RRDD_LOG(LOG_ERR,
1743              "JOURNALING DISABLED: All values will be flushed at shutdown");
1744     config_flush_at_shutdown = 1;
1745   }
1747 } /* }}} static void journal_rotate */
1749 static void journal_done(void) /* {{{ */
1751   if (journal_cur == NULL)
1752     return;
1754   pthread_mutex_lock(&journal_lock);
1755   if (journal_fh != NULL)
1756   {
1757     fclose(journal_fh);
1758     journal_fh = NULL;
1759   }
1761   if (config_flush_at_shutdown)
1762   {
1763     RRDD_LOG(LOG_INFO, "removing journals");
1764     unlink(journal_old);
1765     unlink(journal_cur);
1766   }
1767   else
1768   {
1769     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1770              "journals will be used at next startup");
1771   }
1773   pthread_mutex_unlock(&journal_lock);
1775 } /* }}} static void journal_done */
1777 static int journal_write(char *cmd, char *args) /* {{{ */
1779   int chars;
1781   if (journal_fh == NULL)
1782     return 0;
1784   pthread_mutex_lock(&journal_lock);
1785   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1786   pthread_mutex_unlock(&journal_lock);
1788   if (chars > 0)
1789   {
1790     pthread_mutex_lock(&stats_lock);
1791     stats_journal_bytes += chars;
1792     pthread_mutex_unlock(&stats_lock);
1793   }
1795   return chars;
1796 } /* }}} static int journal_write */
1798 static int journal_replay (const char *file) /* {{{ */
1800   FILE *fh;
1801   int entry_cnt = 0;
1802   int fail_cnt = 0;
1803   uint64_t line = 0;
1804   char entry[CMD_MAX];
1805   time_t now;
1807   if (file == NULL) return 0;
1809   {
1810     char *reason = "unknown error";
1811     int status = 0;
1812     struct stat statbuf;
1814     memset(&statbuf, 0, sizeof(statbuf));
1815     if (stat(file, &statbuf) != 0)
1816     {
1817       if (errno == ENOENT)
1818         return 0;
1820       reason = "stat error";
1821       status = errno;
1822     }
1823     else if (!S_ISREG(statbuf.st_mode))
1824     {
1825       reason = "not a regular file";
1826       status = EPERM;
1827     }
1828     if (statbuf.st_uid != daemon_uid)
1829     {
1830       reason = "not owned by daemon user";
1831       status = EACCES;
1832     }
1833     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1834     {
1835       reason = "must not be user/group writable";
1836       status = EACCES;
1837     }
1839     if (status != 0)
1840     {
1841       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1842                file, rrd_strerror(status), reason);
1843       return 0;
1844     }
1845   }
1847   fh = fopen(file, "r");
1848   if (fh == NULL)
1849   {
1850     if (errno != ENOENT)
1851       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1852                file, rrd_strerror(errno));
1853     return 0;
1854   }
1855   else
1856     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1858   now = time(NULL);
1860   while(!feof(fh))
1861   {
1862     size_t entry_len;
1864     ++line;
1865     if (fgets(entry, sizeof(entry), fh) == NULL)
1866       break;
1867     entry_len = strlen(entry);
1869     /* check \n termination in case journal writing crashed mid-line */
1870     if (entry_len == 0)
1871       continue;
1872     else if (entry[entry_len - 1] != '\n')
1873     {
1874       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1875       ++fail_cnt;
1876       continue;
1877     }
1879     entry[entry_len - 1] = '\0';
1881     if (handle_request(NULL, now, entry, entry_len) == 0)
1882       ++entry_cnt;
1883     else
1884       ++fail_cnt;
1885   }
1887   fclose(fh);
1889   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1890            entry_cnt, fail_cnt);
1892   return entry_cnt > 0 ? 1 : 0;
1893 } /* }}} static int journal_replay */
1895 static void journal_init(void) /* {{{ */
1897   int had_journal = 0;
1899   if (journal_cur == NULL) return;
1901   pthread_mutex_lock(&journal_lock);
1903   RRDD_LOG(LOG_INFO, "checking for journal files");
1905   had_journal += journal_replay(journal_old);
1906   had_journal += journal_replay(journal_cur);
1908   /* it must have been a crash.  start a flush */
1909   if (had_journal && config_flush_at_shutdown)
1910     flush_old_values(-1);
1912   pthread_mutex_unlock(&journal_lock);
1913   journal_rotate();
1915   RRDD_LOG(LOG_INFO, "journal processing complete");
1917 } /* }}} static void journal_init */
1919 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1921   assert(sock != NULL);
1923   free(sock->rbuf);  sock->rbuf = NULL;
1924   free(sock->wbuf);  sock->wbuf = NULL;
1925   free(sock);
1926 } /* }}} void free_listen_socket */
1928 static void close_connection(listen_socket_t *sock) /* {{{ */
1930   if (sock->fd >= 0)
1931   {
1932     close(sock->fd);
1933     sock->fd = -1;
1934   }
1936   free_listen_socket(sock);
1938 } /* }}} void close_connection */
1940 static void *connection_thread_main (void *args) /* {{{ */
1942   listen_socket_t *sock;
1943   int i;
1944   int fd;
1946   sock = (listen_socket_t *) args;
1947   fd = sock->fd;
1949   /* init read buffers */
1950   sock->next_read = sock->next_cmd = 0;
1951   sock->rbuf = malloc(RBUF_SIZE);
1952   if (sock->rbuf == NULL)
1953   {
1954     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1955     close_connection(sock);
1956     return NULL;
1957   }
1959   pthread_mutex_lock (&connection_threads_lock);
1960   {
1961     pthread_t *temp;
1963     temp = (pthread_t *) rrd_realloc (connection_threads,
1964         sizeof (pthread_t) * (connection_threads_num + 1));
1965     if (temp == NULL)
1966     {
1967       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc(++) failed.");
1968     }
1969     else
1970     {
1971       connection_threads = temp;
1972       connection_threads[connection_threads_num] = pthread_self ();
1973       connection_threads_num++;
1974     }
1975   }
1976   pthread_mutex_unlock (&connection_threads_lock);
1978   while (do_shutdown == 0)
1979   {
1980     char *cmd;
1981     ssize_t cmd_len;
1982     ssize_t rbytes;
1983     time_t now;
1985     struct pollfd pollfd;
1986     int status;
1988     pollfd.fd = fd;
1989     pollfd.events = POLLIN | POLLPRI;
1990     pollfd.revents = 0;
1992     status = poll (&pollfd, 1, /* timeout = */ 500);
1993     if (do_shutdown)
1994       break;
1995     else if (status == 0) /* timeout */
1996       continue;
1997     else if (status < 0) /* error */
1998     {
1999       status = errno;
2000       if (status != EINTR)
2001         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2002       continue;
2003     }
2005     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2006       break;
2007     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2008     {
2009       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2010           "poll(2) returned something unexpected: %#04hx",
2011           pollfd.revents);
2012       break;
2013     }
2015     rbytes = read(fd, sock->rbuf + sock->next_read,
2016                   RBUF_SIZE - sock->next_read);
2017     if (rbytes < 0)
2018     {
2019       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2020       break;
2021     }
2022     else if (rbytes == 0)
2023       break; /* eof */
2025     sock->next_read += rbytes;
2027     if (sock->batch_start)
2028       now = sock->batch_start;
2029     else
2030       now = time(NULL);
2032     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2033     {
2034       status = handle_request (sock, now, cmd, cmd_len+1);
2035       if (status != 0)
2036         goto out_close;
2037     }
2038   }
2040 out_close:
2041   close_connection(sock);
2043   /* Remove this thread from the connection threads list */
2044   pthread_mutex_lock (&connection_threads_lock);
2045   {
2046     pthread_t self;
2047     pthread_t *temp;
2049     /* Find out own index in the array */
2050     self = pthread_self ();
2051     for (i = 0; i < connection_threads_num; i++)
2052       if (pthread_equal (connection_threads[i], self) != 0)
2053         break;
2054     assert (i < connection_threads_num);
2056     /* Move the trailing threads forward. */
2057     if (i < (connection_threads_num - 1))
2058     {
2059       memmove (connection_threads + i,
2060                connection_threads + i + 1,
2061                sizeof (pthread_t) * (connection_threads_num - i - 1));
2062     }
2064     connection_threads_num--;
2066     temp = rrd_realloc(connection_threads,
2067                    sizeof(*connection_threads) * connection_threads_num);
2068     if (connection_threads_num > 0 && temp == NULL)
2069       RRDD_LOG(LOG_ERR, "connection_thread_main: realloc(--) failed.");
2070     else
2071       connection_threads = temp;
2072   }
2073   pthread_mutex_unlock (&connection_threads_lock);
2075   return (NULL);
2076 } /* }}} void *connection_thread_main */
2078 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2080   int fd;
2081   struct sockaddr_un sa;
2082   listen_socket_t *temp;
2083   int status;
2084   const char *path;
2086   path = sock->addr;
2087   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2088     path += strlen("unix:");
2090   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2091       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2092   if (temp == NULL)
2093   {
2094     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2095     return (-1);
2096   }
2097   listen_fds = temp;
2098   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2100   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2101   if (fd < 0)
2102   {
2103     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2104              rrd_strerror(errno));
2105     return (-1);
2106   }
2108   memset (&sa, 0, sizeof (sa));
2109   sa.sun_family = AF_UNIX;
2110   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2112   /* if we've gotten this far, we own the pid file.  any daemon started
2113    * with the same args must not be alive.  therefore, ensure that we can
2114    * create the socket...
2115    */
2116   unlink(path);
2118   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2119   if (status != 0)
2120   {
2121     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2122              path, rrd_strerror(errno));
2123     close (fd);
2124     return (-1);
2125   }
2127   status = listen (fd, /* backlog = */ 10);
2128   if (status != 0)
2129   {
2130     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2131              path, rrd_strerror(errno));
2132     close (fd);
2133     unlink (path);
2134     return (-1);
2135   }
2137   listen_fds[listen_fds_num].fd = fd;
2138   listen_fds[listen_fds_num].family = PF_UNIX;
2139   strncpy(listen_fds[listen_fds_num].addr, path,
2140           sizeof (listen_fds[listen_fds_num].addr) - 1);
2141   listen_fds_num++;
2143   return (0);
2144 } /* }}} int open_listen_socket_unix */
2146 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2148   struct addrinfo ai_hints;
2149   struct addrinfo *ai_res;
2150   struct addrinfo *ai_ptr;
2151   char addr_copy[NI_MAXHOST];
2152   char *addr;
2153   char *port;
2154   int status;
2156   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2157   addr_copy[sizeof (addr_copy) - 1] = 0;
2158   addr = addr_copy;
2160   memset (&ai_hints, 0, sizeof (ai_hints));
2161   ai_hints.ai_flags = 0;
2162 #ifdef AI_ADDRCONFIG
2163   ai_hints.ai_flags |= AI_ADDRCONFIG;
2164 #endif
2165   ai_hints.ai_family = AF_UNSPEC;
2166   ai_hints.ai_socktype = SOCK_STREAM;
2168   port = NULL;
2169   if (*addr == '[') /* IPv6+port format */
2170   {
2171     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2172     addr++;
2174     port = strchr (addr, ']');
2175     if (port == NULL)
2176     {
2177       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2178       return (-1);
2179     }
2180     *port = 0;
2181     port++;
2183     if (*port == ':')
2184       port++;
2185     else if (*port == 0)
2186       port = NULL;
2187     else
2188     {
2189       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2190       return (-1);
2191     }
2192   } /* if (*addr = ']') */
2193   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2194   {
2195     port = rindex(addr, ':');
2196     if (port != NULL)
2197     {
2198       *port = 0;
2199       port++;
2200     }
2201   }
2202   ai_res = NULL;
2203   status = getaddrinfo (addr,
2204                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2205                         &ai_hints, &ai_res);
2206   if (status != 0)
2207   {
2208     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2209              addr, gai_strerror (status));
2210     return (-1);
2211   }
2213   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2214   {
2215     int fd;
2216     listen_socket_t *temp;
2217     int one = 1;
2219     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2220         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2221     if (temp == NULL)
2222     {
2223       fprintf (stderr,
2224                "rrdcached: open_listen_socket_network: realloc failed.\n");
2225       continue;
2226     }
2227     listen_fds = temp;
2228     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2230     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2231     if (fd < 0)
2232     {
2233       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2234                rrd_strerror(errno));
2235       continue;
2236     }
2238     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2240     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2241     if (status != 0)
2242     {
2243       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2244                sock->addr, rrd_strerror(errno));
2245       close (fd);
2246       continue;
2247     }
2249     status = listen (fd, /* backlog = */ 10);
2250     if (status != 0)
2251     {
2252       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2253                sock->addr, rrd_strerror(errno));
2254       close (fd);
2255       freeaddrinfo(ai_res);
2256       return (-1);
2257     }
2259     listen_fds[listen_fds_num].fd = fd;
2260     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2261     listen_fds_num++;
2262   } /* for (ai_ptr) */
2264   freeaddrinfo(ai_res);
2265   return (0);
2266 } /* }}} static int open_listen_socket_network */
2268 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2270   assert(sock != NULL);
2271   assert(sock->addr != NULL);
2273   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2274       || sock->addr[0] == '/')
2275     return (open_listen_socket_unix(sock));
2276   else
2277     return (open_listen_socket_network(sock));
2278 } /* }}} int open_listen_socket */
2280 static int close_listen_sockets (void) /* {{{ */
2282   size_t i;
2284   for (i = 0; i < listen_fds_num; i++)
2285   {
2286     close (listen_fds[i].fd);
2288     if (listen_fds[i].family == PF_UNIX)
2289       unlink(listen_fds[i].addr);
2290   }
2292   free (listen_fds);
2293   listen_fds = NULL;
2294   listen_fds_num = 0;
2296   return (0);
2297 } /* }}} int close_listen_sockets */
2299 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2301   struct pollfd *pollfds;
2302   int pollfds_num;
2303   int status;
2304   int i;
2306   if (listen_fds_num < 1)
2307   {
2308     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2309     return (NULL);
2310   }
2312   pollfds_num = listen_fds_num;
2313   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2314   if (pollfds == NULL)
2315   {
2316     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2317     return (NULL);
2318   }
2319   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2321   RRDD_LOG(LOG_INFO, "listening for connections");
2323   while (do_shutdown == 0)
2324   {
2325     for (i = 0; i < pollfds_num; i++)
2326     {
2327       pollfds[i].fd = listen_fds[i].fd;
2328       pollfds[i].events = POLLIN | POLLPRI;
2329       pollfds[i].revents = 0;
2330     }
2332     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2333     if (do_shutdown)
2334       break;
2335     else if (status == 0) /* timeout */
2336       continue;
2337     else if (status < 0) /* error */
2338     {
2339       status = errno;
2340       if (status != EINTR)
2341       {
2342         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2343       }
2344       continue;
2345     }
2347     for (i = 0; i < pollfds_num; i++)
2348     {
2349       listen_socket_t *client_sock;
2350       struct sockaddr_storage client_sa;
2351       socklen_t client_sa_size;
2352       pthread_t tid;
2353       pthread_attr_t attr;
2355       if (pollfds[i].revents == 0)
2356         continue;
2358       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2359       {
2360         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2361             "poll(2) returned something unexpected for listen FD #%i.",
2362             pollfds[i].fd);
2363         continue;
2364       }
2366       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2367       if (client_sock == NULL)
2368       {
2369         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2370         continue;
2371       }
2372       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2374       client_sa_size = sizeof (client_sa);
2375       client_sock->fd = accept (pollfds[i].fd,
2376           (struct sockaddr *) &client_sa, &client_sa_size);
2377       if (client_sock->fd < 0)
2378       {
2379         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2380         free(client_sock);
2381         continue;
2382       }
2384       pthread_attr_init (&attr);
2385       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2387       status = pthread_create (&tid, &attr, connection_thread_main,
2388                                client_sock);
2389       if (status != 0)
2390       {
2391         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2392         close_connection(client_sock);
2393         continue;
2394       }
2395     } /* for (pollfds_num) */
2396   } /* while (do_shutdown == 0) */
2398   RRDD_LOG(LOG_INFO, "starting shutdown");
2400   close_listen_sockets ();
2402   pthread_mutex_lock (&connection_threads_lock);
2403   while (connection_threads_num > 0)
2404   {
2405     pthread_t wait_for;
2407     wait_for = connection_threads[0];
2409     pthread_mutex_unlock (&connection_threads_lock);
2410     pthread_join (wait_for, /* retval = */ NULL);
2411     pthread_mutex_lock (&connection_threads_lock);
2412   }
2413   pthread_mutex_unlock (&connection_threads_lock);
2415   free(pollfds);
2417   return (NULL);
2418 } /* }}} void *listen_thread_main */
2420 static int daemonize (void) /* {{{ */
2422   int pid_fd;
2423   char *base_dir;
2425   daemon_uid = geteuid();
2427   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2428   if (pid_fd < 0)
2429     pid_fd = check_pidfile();
2430   if (pid_fd < 0)
2431     return pid_fd;
2433   /* open all the listen sockets */
2434   if (config_listen_address_list_len > 0)
2435   {
2436     for (int i = 0; i < config_listen_address_list_len; i++)
2437     {
2438       open_listen_socket (config_listen_address_list[i]);
2439       free_listen_socket (config_listen_address_list[i]);
2440     }
2442     free(config_listen_address_list);
2443   }
2444   else
2445   {
2446     listen_socket_t sock;
2447     memset(&sock, 0, sizeof(sock));
2448     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2449     open_listen_socket (&sock);
2450   }
2452   if (listen_fds_num < 1)
2453   {
2454     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2455     goto error;
2456   }
2458   if (!stay_foreground)
2459   {
2460     pid_t child;
2462     child = fork ();
2463     if (child < 0)
2464     {
2465       fprintf (stderr, "daemonize: fork(2) failed.\n");
2466       goto error;
2467     }
2468     else if (child > 0)
2469       exit(0);
2471     /* Become session leader */
2472     setsid ();
2474     /* Open the first three file descriptors to /dev/null */
2475     close (2);
2476     close (1);
2477     close (0);
2479     open ("/dev/null", O_RDWR);
2480     dup (0);
2481     dup (0);
2482   } /* if (!stay_foreground) */
2484   /* Change into the /tmp directory. */
2485   base_dir = (config_base_dir != NULL)
2486     ? config_base_dir
2487     : "/tmp";
2489   if (chdir (base_dir) != 0)
2490   {
2491     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2492     goto error;
2493   }
2495   install_signal_handlers();
2497   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2498   RRDD_LOG(LOG_INFO, "starting up");
2500   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2501                                 (GDestroyNotify) free_cache_item);
2502   if (cache_tree == NULL)
2503   {
2504     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2505     goto error;
2506   }
2508   return write_pidfile (pid_fd);
2510 error:
2511   remove_pidfile();
2512   return -1;
2513 } /* }}} int daemonize */
2515 static int cleanup (void) /* {{{ */
2517   do_shutdown++;
2519   pthread_cond_signal (&cache_cond);
2520   pthread_join (queue_thread, /* return = */ NULL);
2522   remove_pidfile ();
2524   free(config_base_dir);
2525   free(config_pid_file);
2526   free(journal_cur);
2527   free(journal_old);
2529   pthread_mutex_lock(&cache_lock);
2530   g_tree_destroy(cache_tree);
2532   RRDD_LOG(LOG_INFO, "goodbye");
2533   closelog ();
2535   return (0);
2536 } /* }}} int cleanup */
2538 static int read_options (int argc, char **argv) /* {{{ */
2540   int option;
2541   int status = 0;
2543   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2544   {
2545     switch (option)
2546     {
2547       case 'g':
2548         stay_foreground=1;
2549         break;
2551       case 'L':
2552       case 'l':
2553       {
2554         listen_socket_t **temp;
2555         listen_socket_t *new;
2557         new = malloc(sizeof(listen_socket_t));
2558         if (new == NULL)
2559         {
2560           fprintf(stderr, "read_options: malloc failed.\n");
2561           return(2);
2562         }
2563         memset(new, 0, sizeof(listen_socket_t));
2565         temp = (listen_socket_t **) rrd_realloc (config_listen_address_list,
2566             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2567         if (temp == NULL)
2568         {
2569           fprintf (stderr, "read_options: realloc failed.\n");
2570           return (2);
2571         }
2572         config_listen_address_list = temp;
2574         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2575         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2577         temp[config_listen_address_list_len] = new;
2578         config_listen_address_list_len++;
2579       }
2580       break;
2582       case 'f':
2583       {
2584         int temp;
2586         temp = atoi (optarg);
2587         if (temp > 0)
2588           config_flush_interval = temp;
2589         else
2590         {
2591           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2592           status = 3;
2593         }
2594       }
2595       break;
2597       case 'w':
2598       {
2599         int temp;
2601         temp = atoi (optarg);
2602         if (temp > 0)
2603           config_write_interval = temp;
2604         else
2605         {
2606           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2607           status = 2;
2608         }
2609       }
2610       break;
2612       case 'z':
2613       {
2614         int temp;
2616         temp = atoi(optarg);
2617         if (temp > 0)
2618           config_write_jitter = temp;
2619         else
2620         {
2621           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2622           status = 2;
2623         }
2625         break;
2626       }
2628       case 'B':
2629         config_write_base_only = 1;
2630         break;
2632       case 'b':
2633       {
2634         size_t len;
2635         char base_realpath[PATH_MAX];
2637         if (config_base_dir != NULL)
2638           free (config_base_dir);
2639         config_base_dir = strdup (optarg);
2640         if (config_base_dir == NULL)
2641         {
2642           fprintf (stderr, "read_options: strdup failed.\n");
2643           return (3);
2644         }
2646         /* make sure that the base directory is not resolved via
2647          * symbolic links.  this makes some performance-enhancing
2648          * assumptions possible (we don't have to resolve paths
2649          * that start with a "/")
2650          */
2651         if (realpath(config_base_dir, base_realpath) == NULL)
2652         {
2653           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2654           return 5;
2655         }
2656         else if (strncmp(config_base_dir,
2657                          base_realpath, sizeof(base_realpath)) != 0)
2658         {
2659           fprintf(stderr,
2660                   "Base directory (-b) resolved via file system links!\n"
2661                   "Please consult rrdcached '-b' documentation!\n"
2662                   "Consider specifying the real directory (%s)\n",
2663                   base_realpath);
2664           return 5;
2665         }
2667         len = strlen (config_base_dir);
2668         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2669         {
2670           config_base_dir[len - 1] = 0;
2671           len--;
2672         }
2674         if (len < 1)
2675         {
2676           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2677           return (4);
2678         }
2680         _config_base_dir_len = len;
2681       }
2682       break;
2684       case 'p':
2685       {
2686         if (config_pid_file != NULL)
2687           free (config_pid_file);
2688         config_pid_file = strdup (optarg);
2689         if (config_pid_file == NULL)
2690         {
2691           fprintf (stderr, "read_options: strdup failed.\n");
2692           return (3);
2693         }
2694       }
2695       break;
2697       case 'F':
2698         config_flush_at_shutdown = 1;
2699         break;
2701       case 'j':
2702       {
2703         struct stat statbuf;
2704         const char *dir = optarg;
2706         status = stat(dir, &statbuf);
2707         if (status != 0)
2708         {
2709           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2710           return 6;
2711         }
2713         if (!S_ISDIR(statbuf.st_mode)
2714             || access(dir, R_OK|W_OK|X_OK) != 0)
2715         {
2716           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2717                   errno ? rrd_strerror(errno) : "");
2718           return 6;
2719         }
2721         journal_cur = malloc(PATH_MAX + 1);
2722         journal_old = malloc(PATH_MAX + 1);
2723         if (journal_cur == NULL || journal_old == NULL)
2724         {
2725           fprintf(stderr, "malloc failure for journal files\n");
2726           return 6;
2727         }
2728         else 
2729         {
2730           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2731           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2732         }
2733       }
2734       break;
2736       case 'h':
2737       case '?':
2738         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2739             "\n"
2740             "Usage: rrdcached [options]\n"
2741             "\n"
2742             "Valid options are:\n"
2743             "  -l <address>  Socket address to listen to.\n"
2744             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2745             "  -w <seconds>  Interval in which to write data.\n"
2746             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2747             "  -f <seconds>  Interval in which to flush dead data.\n"
2748             "  -p <file>     Location of the PID-file.\n"
2749             "  -b <dir>      Base directory to change to.\n"
2750             "  -B            Restrict file access to paths within -b <dir>\n"
2751             "  -g            Do not fork and run in the foreground.\n"
2752             "  -j <dir>      Directory in which to create the journal files.\n"
2753             "  -F            Always flush all updates at shutdown\n"
2754             "\n"
2755             "For more information and a detailed description of all options "
2756             "please refer\n"
2757             "to the rrdcached(1) manual page.\n",
2758             VERSION);
2759         status = -1;
2760         break;
2761     } /* switch (option) */
2762   } /* while (getopt) */
2764   /* advise the user when values are not sane */
2765   if (config_flush_interval < 2 * config_write_interval)
2766     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2767             " 2x write interval (-w) !\n");
2768   if (config_write_jitter > config_write_interval)
2769     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2770             " write interval (-w) !\n");
2772   if (config_write_base_only && config_base_dir == NULL)
2773     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2774             "  Consult the rrdcached documentation\n");
2776   if (journal_cur == NULL)
2777     config_flush_at_shutdown = 1;
2779   return (status);
2780 } /* }}} int read_options */
2782 int main (int argc, char **argv)
2784   int status;
2786   status = read_options (argc, argv);
2787   if (status != 0)
2788   {
2789     if (status < 0)
2790       status = 0;
2791     return (status);
2792   }
2794   status = daemonize ();
2795   if (status != 0)
2796   {
2797     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2798     return (1);
2799   }
2801   journal_init();
2803   /* start the queue thread */
2804   memset (&queue_thread, 0, sizeof (queue_thread));
2805   status = pthread_create (&queue_thread,
2806                            NULL, /* attr */
2807                            queue_thread_main,
2808                            NULL); /* args */
2809   if (status != 0)
2810   {
2811     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2812     cleanup();
2813     return (1);
2814   }
2816   listen_thread_main (NULL);
2817   cleanup ();
2819   return (0);
2820 } /* int main */
2822 /*
2823  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2824  */