Code

This patch introduces "BATCH" mode.
[rrdtool-all.git] / program / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
101 /*
102  * Types
103  */
104 typedef enum
106   PRIV_LOW,
107   PRIV_HIGH
108 } socket_privilege;
110 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
112 struct listen_socket_s
114   int fd;
115   char addr[PATH_MAX + 1];
116   int family;
117   socket_privilege privilege;
119   /* state for BATCH processing */
120   int batch_mode;
121   int batch_cmd;
123   /* buffered IO */
124   char *rbuf;
125   off_t next_cmd;
126   off_t next_read;
128   char *wbuf;
129   ssize_t wbuf_len;
130 };
131 typedef struct listen_socket_s listen_socket_t;
133 struct cache_item_s;
134 typedef struct cache_item_s cache_item_t;
135 struct cache_item_s
137   char *file;
138   char **values;
139   int values_num;
140   time_t last_flush_time;
141 #define CI_FLAGS_IN_TREE  (1<<0)
142 #define CI_FLAGS_IN_QUEUE (1<<1)
143   int flags;
144   pthread_cond_t  flushed;
145   cache_item_t *prev;
146   cache_item_t *next;
147 };
149 struct callback_flush_data_s
151   time_t now;
152   time_t abs_timeout;
153   char **keys;
154   size_t keys_num;
155 };
156 typedef struct callback_flush_data_s callback_flush_data_t;
158 enum queue_side_e
160   HEAD,
161   TAIL
162 };
163 typedef enum queue_side_e queue_side_t;
165 /* max length of socket command or response */
166 #define CMD_MAX 4096
167 #define RBUF_SIZE (CMD_MAX*2)
169 /*
170  * Variables
171  */
172 static int stay_foreground = 0;
174 static listen_socket_t *listen_fds = NULL;
175 static size_t listen_fds_num = 0;
177 static int do_shutdown = 0;
179 static pthread_t queue_thread;
181 static pthread_t *connection_threads = NULL;
182 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
183 static int connection_threads_num = 0;
185 /* Cache stuff */
186 static GTree          *cache_tree = NULL;
187 static cache_item_t   *cache_queue_head = NULL;
188 static cache_item_t   *cache_queue_tail = NULL;
189 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
190 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
192 static int config_write_interval = 300;
193 static int config_write_jitter   = 0;
194 static int config_flush_interval = 3600;
195 static int config_flush_at_shutdown = 0;
196 static char *config_pid_file = NULL;
197 static char *config_base_dir = NULL;
198 static size_t _config_base_dir_len = 0;
199 static int config_write_base_only = 0;
201 static listen_socket_t **config_listen_address_list = NULL;
202 static int config_listen_address_list_len = 0;
204 static uint64_t stats_queue_length = 0;
205 static uint64_t stats_updates_received = 0;
206 static uint64_t stats_flush_received = 0;
207 static uint64_t stats_updates_written = 0;
208 static uint64_t stats_data_sets_written = 0;
209 static uint64_t stats_journal_bytes = 0;
210 static uint64_t stats_journal_rotate = 0;
211 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
213 /* Journaled updates */
214 static char *journal_cur = NULL;
215 static char *journal_old = NULL;
216 static FILE *journal_fh = NULL;
217 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
218 static int journal_write(char *cmd, char *args);
219 static void journal_done(void);
220 static void journal_rotate(void);
222 /* 
223  * Functions
224  */
225 static void sig_common (const char *sig) /* {{{ */
227   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
228   do_shutdown++;
229   pthread_cond_broadcast(&cache_cond);
230 } /* }}} void sig_common */
232 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
234   sig_common("INT");
235 } /* }}} void sig_int_handler */
237 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
239   sig_common("TERM");
240 } /* }}} void sig_term_handler */
242 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
244   config_flush_at_shutdown = 1;
245   sig_common("USR1");
246 } /* }}} void sig_usr1_handler */
248 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
250   config_flush_at_shutdown = 0;
251   sig_common("USR2");
252 } /* }}} void sig_usr2_handler */
254 static void install_signal_handlers(void) /* {{{ */
256   /* These structures are static, because `sigaction' behaves weird if the are
257    * overwritten.. */
258   static struct sigaction sa_int;
259   static struct sigaction sa_term;
260   static struct sigaction sa_pipe;
261   static struct sigaction sa_usr1;
262   static struct sigaction sa_usr2;
264   /* Install signal handlers */
265   memset (&sa_int, 0, sizeof (sa_int));
266   sa_int.sa_handler = sig_int_handler;
267   sigaction (SIGINT, &sa_int, NULL);
269   memset (&sa_term, 0, sizeof (sa_term));
270   sa_term.sa_handler = sig_term_handler;
271   sigaction (SIGTERM, &sa_term, NULL);
273   memset (&sa_pipe, 0, sizeof (sa_pipe));
274   sa_pipe.sa_handler = SIG_IGN;
275   sigaction (SIGPIPE, &sa_pipe, NULL);
277   memset (&sa_pipe, 0, sizeof (sa_usr1));
278   sa_usr1.sa_handler = sig_usr1_handler;
279   sigaction (SIGUSR1, &sa_usr1, NULL);
281   memset (&sa_usr2, 0, sizeof (sa_usr2));
282   sa_usr2.sa_handler = sig_usr2_handler;
283   sigaction (SIGUSR2, &sa_usr2, NULL);
285 } /* }}} void install_signal_handlers */
287 static int open_pidfile(void) /* {{{ */
289   int fd;
290   char *file;
292   file = (config_pid_file != NULL)
293     ? config_pid_file
294     : LOCALSTATEDIR "/run/rrdcached.pid";
296   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
297   if (fd < 0)
298     fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
299             file, rrd_strerror(errno));
301   return(fd);
302 } /* }}} static int open_pidfile */
304 static int write_pidfile (int fd) /* {{{ */
306   pid_t pid;
307   FILE *fh;
309   pid = getpid ();
311   fh = fdopen (fd, "w");
312   if (fh == NULL)
313   {
314     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
315     close(fd);
316     return (-1);
317   }
319   fprintf (fh, "%i\n", (int) pid);
320   fclose (fh);
322   return (0);
323 } /* }}} int write_pidfile */
325 static int remove_pidfile (void) /* {{{ */
327   char *file;
328   int status;
330   file = (config_pid_file != NULL)
331     ? config_pid_file
332     : LOCALSTATEDIR "/run/rrdcached.pid";
334   status = unlink (file);
335   if (status == 0)
336     return (0);
337   return (errno);
338 } /* }}} int remove_pidfile */
340 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
342   char *eol;
344   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
345                sock->next_read - sock->next_cmd);
347   if (eol == NULL)
348   {
349     /* no commands left, move remainder back to front of rbuf */
350     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
351             sock->next_read - sock->next_cmd);
352     sock->next_read -= sock->next_cmd;
353     sock->next_cmd = 0;
354     *len = 0;
355     return NULL;
356   }
357   else
358   {
359     char *cmd = sock->rbuf + sock->next_cmd;
360     *eol = '\0';
362     sock->next_cmd = eol - sock->rbuf + 1;
364     if (eol > sock->rbuf && *(eol-1) == '\r')
365       *(--eol) = '\0'; /* handle "\r\n" EOL */
367     *len = eol - cmd;
369     return cmd;
370   }
372   /* NOTREACHED */
373   assert(1==0);
376 /* add the characters directly to the write buffer */
377 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
379   char *new_buf;
381   assert(sock != NULL);
383   new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
384   if (new_buf == NULL)
385   {
386     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
387     return -1;
388   }
390   strncpy(new_buf + sock->wbuf_len, str, len + 1);
392   sock->wbuf = new_buf;
393   sock->wbuf_len += len;
395   return 0;
396 } /* }}} static int add_to_wbuf */
398 /* add the text to the "extra" info that's sent after the status line */
399 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
401   va_list argp;
402   char buffer[CMD_MAX];
403   int len;
405   if (sock == NULL) return 0; /* journal replay mode */
406   if (sock->batch_mode) return 0; /* no extra info returned when in BATCH */
408   va_start(argp, fmt);
409 #ifdef HAVE_VSNPRINTF
410   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
411 #else
412   len = vsprintf(buffer, fmt, argp);
413 #endif
414   va_end(argp);
415   if (len < 0)
416   {
417     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
418     return -1;
419   }
421   return add_to_wbuf(sock, buffer, len);
422 } /* }}} static int add_response_info */
424 static int count_lines(char *str) /* {{{ */
426   int lines = 0;
428   if (str != NULL)
429   {
430     while ((str = strchr(str, '\n')) != NULL)
431     {
432       ++lines;
433       ++str;
434     }
435   }
437   return lines;
438 } /* }}} static int count_lines */
440 /* send the response back to the user.
441  * returns 0 on success, -1 on error
442  * write buffer is always zeroed after this call */
443 static int send_response (listen_socket_t *sock, response_code rc,
444                           char *fmt, ...) /* {{{ */
446   va_list argp;
447   char buffer[CMD_MAX];
448   int lines;
449   ssize_t wrote;
450   int rclen, len;
452   if (sock == NULL) return rc;  /* journal replay mode */
454   if (sock->batch_mode)
455   {
456     if (rc == RESP_OK)
457       return rc; /* no response on success during BATCH */
458     lines = sock->batch_cmd;
459   }
460   else if (rc == RESP_OK)
461     lines = count_lines(sock->wbuf);
462   else
463     lines = -1;
465   rclen = sprintf(buffer, "%d ", lines);
466   va_start(argp, fmt);
467 #ifdef HAVE_VSNPRINTF
468   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
469 #else
470   len = vsprintf(buffer+rclen, fmt, argp);
471 #endif
472   va_end(argp);
473   if (len < 0)
474     return -1;
476   len += rclen;
478   /* append the result to the wbuf, don't write to the user */
479   if (sock->batch_mode)
480     return add_to_wbuf(sock, buffer, len);
482   /* first write must be complete */
483   if (len != write(sock->fd, buffer, len))
484   {
485     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
486     return -1;
487   }
489   if (sock->wbuf != NULL)
490   {
491     wrote = 0;
492     while (wrote < sock->wbuf_len)
493     {
494       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
495       if (wb <= 0)
496       {
497         RRDD_LOG(LOG_INFO, "send_response: could not write results");
498         return -1;
499       }
500       wrote += wb;
501     }
502   }
504   free(sock->wbuf); sock->wbuf = NULL;
505   sock->wbuf_len = 0;
507   return 0;
508 } /* }}} */
510 static void wipe_ci_values(cache_item_t *ci, time_t when)
512   ci->values = NULL;
513   ci->values_num = 0;
515   ci->last_flush_time = when;
516   if (config_write_jitter > 0)
517     ci->last_flush_time += (random() % config_write_jitter);
520 /* remove_from_queue
521  * remove a "cache_item_t" item from the queue.
522  * must hold 'cache_lock' when calling this
523  */
524 static void remove_from_queue(cache_item_t *ci) /* {{{ */
526   if (ci == NULL) return;
528   if (ci->prev == NULL)
529     cache_queue_head = ci->next; /* reset head */
530   else
531     ci->prev->next = ci->next;
533   if (ci->next == NULL)
534     cache_queue_tail = ci->prev; /* reset the tail */
535   else
536     ci->next->prev = ci->prev;
538   ci->next = ci->prev = NULL;
539   ci->flags &= ~CI_FLAGS_IN_QUEUE;
540 } /* }}} static void remove_from_queue */
542 /*
543  * enqueue_cache_item:
544  * `cache_lock' must be acquired before calling this function!
545  */
546 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
547     queue_side_t side)
549   if (ci == NULL)
550     return (-1);
552   if (ci->values_num == 0)
553     return (0);
555   if (side == HEAD)
556   {
557     if (cache_queue_head == ci)
558       return 0;
560     /* remove from the double linked list */
561     if (ci->flags & CI_FLAGS_IN_QUEUE)
562       remove_from_queue(ci);
564     ci->prev = NULL;
565     ci->next = cache_queue_head;
566     if (ci->next != NULL)
567       ci->next->prev = ci;
568     cache_queue_head = ci;
570     if (cache_queue_tail == NULL)
571       cache_queue_tail = cache_queue_head;
572   }
573   else /* (side == TAIL) */
574   {
575     /* We don't move values back in the list.. */
576     if (ci->flags & CI_FLAGS_IN_QUEUE)
577       return (0);
579     assert (ci->next == NULL);
580     assert (ci->prev == NULL);
582     ci->prev = cache_queue_tail;
584     if (cache_queue_tail == NULL)
585       cache_queue_head = ci;
586     else
587       cache_queue_tail->next = ci;
589     cache_queue_tail = ci;
590   }
592   ci->flags |= CI_FLAGS_IN_QUEUE;
594   pthread_cond_broadcast(&cache_cond);
595   pthread_mutex_lock (&stats_lock);
596   stats_queue_length++;
597   pthread_mutex_unlock (&stats_lock);
599   return (0);
600 } /* }}} int enqueue_cache_item */
602 /*
603  * tree_callback_flush:
604  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
605  * while this is in progress.
606  */
607 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
608     gpointer data)
610   cache_item_t *ci;
611   callback_flush_data_t *cfd;
613   ci = (cache_item_t *) value;
614   cfd = (callback_flush_data_t *) data;
616   if ((ci->last_flush_time <= cfd->abs_timeout)
617       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
618       && (ci->values_num > 0))
619   {
620     enqueue_cache_item (ci, TAIL);
621   }
622   else if ((do_shutdown != 0)
623       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
624       && (ci->values_num > 0))
625   {
626     enqueue_cache_item (ci, TAIL);
627   }
628   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
629       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
630       && (ci->values_num <= 0))
631   {
632     char **temp;
634     temp = (char **) realloc (cfd->keys,
635         sizeof (char *) * (cfd->keys_num + 1));
636     if (temp == NULL)
637     {
638       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
639       return (FALSE);
640     }
641     cfd->keys = temp;
642     /* Make really sure this points to the _same_ place */
643     assert ((char *) key == ci->file);
644     cfd->keys[cfd->keys_num] = (char *) key;
645     cfd->keys_num++;
646   }
648   return (FALSE);
649 } /* }}} gboolean tree_callback_flush */
651 static int flush_old_values (int max_age)
653   callback_flush_data_t cfd;
654   size_t k;
656   memset (&cfd, 0, sizeof (cfd));
657   /* Pass the current time as user data so that we don't need to call
658    * `time' for each node. */
659   cfd.now = time (NULL);
660   cfd.keys = NULL;
661   cfd.keys_num = 0;
663   if (max_age > 0)
664     cfd.abs_timeout = cfd.now - max_age;
665   else
666     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
668   /* `tree_callback_flush' will return the keys of all values that haven't
669    * been touched in the last `config_flush_interval' seconds in `cfd'.
670    * The char*'s in this array point to the same memory as ci->file, so we
671    * don't need to free them separately. */
672   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
674   for (k = 0; k < cfd.keys_num; k++)
675   {
676     cache_item_t *ci;
678     /* This must not fail. */
679     ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
680     assert (ci != NULL);
682     /* If we end up here with values available, something's seriously
683      * messed up. */
684     assert (ci->values_num == 0);
686     /* Remove the node from the tree */
687     g_tree_remove (cache_tree, cfd.keys[k]);
688     cfd.keys[k] = NULL;
690     /* Now free and clean up `ci'. */
691     free (ci->file);
692     ci->file = NULL;
693     free (ci);
694     ci = NULL;
695   } /* for (k = 0; k < cfd.keys_num; k++) */
697   if (cfd.keys != NULL)
698   {
699     free (cfd.keys);
700     cfd.keys = NULL;
701   }
703   return (0);
704 } /* int flush_old_values */
706 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
708   struct timeval now;
709   struct timespec next_flush;
710   int final_flush = 0; /* make sure we only flush once on shutdown */
712   gettimeofday (&now, NULL);
713   next_flush.tv_sec = now.tv_sec + config_flush_interval;
714   next_flush.tv_nsec = 1000 * now.tv_usec;
716   pthread_mutex_lock (&cache_lock);
717   while ((do_shutdown == 0) || (cache_queue_head != NULL))
718   {
719     cache_item_t *ci;
720     char *file;
721     char **values;
722     int values_num;
723     int status;
724     int i;
726     /* First, check if it's time to do the cache flush. */
727     gettimeofday (&now, NULL);
728     if ((now.tv_sec > next_flush.tv_sec)
729         || ((now.tv_sec == next_flush.tv_sec)
730           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
731     {
732       /* Flush all values that haven't been written in the last
733        * `config_write_interval' seconds. */
734       flush_old_values (config_write_interval);
736       /* Determine the time of the next cache flush. */
737       while (next_flush.tv_sec <= now.tv_sec)
738         next_flush.tv_sec += config_flush_interval;
740       /* unlock the cache while we rotate so we don't block incoming
741        * updates if the fsync() blocks on disk I/O */
742       pthread_mutex_unlock(&cache_lock);
743       journal_rotate();
744       pthread_mutex_lock(&cache_lock);
745     }
747     /* Now, check if there's something to store away. If not, wait until
748      * something comes in or it's time to do the cache flush.  if we are
749      * shutting down, do not wait around.  */
750     if (cache_queue_head == NULL && !do_shutdown)
751     {
752       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
753       if ((status != 0) && (status != ETIMEDOUT))
754       {
755         RRDD_LOG (LOG_ERR, "queue_thread_main: "
756             "pthread_cond_timedwait returned %i.", status);
757       }
758     }
760     /* We're about to shut down */
761     if (do_shutdown != 0 && !final_flush++)
762     {
763       if (config_flush_at_shutdown)
764         flush_old_values (-1); /* flush everything */
765       else
766         break;
767     }
769     /* Check if a value has arrived. This may be NULL if we timed out or there
770      * was an interrupt such as a signal. */
771     if (cache_queue_head == NULL)
772       continue;
774     ci = cache_queue_head;
776     /* copy the relevant parts */
777     file = strdup (ci->file);
778     if (file == NULL)
779     {
780       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
781       continue;
782     }
784     assert(ci->values != NULL);
785     assert(ci->values_num > 0);
787     values = ci->values;
788     values_num = ci->values_num;
790     wipe_ci_values(ci, time(NULL));
791     remove_from_queue(ci);
793     pthread_mutex_lock (&stats_lock);
794     assert (stats_queue_length > 0);
795     stats_queue_length--;
796     pthread_mutex_unlock (&stats_lock);
798     pthread_mutex_unlock (&cache_lock);
800     rrd_clear_error ();
801     status = rrd_update_r (file, NULL, values_num, (void *) values);
802     if (status != 0)
803     {
804       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
805           "rrd_update_r (%s) failed with status %i. (%s)",
806           file, status, rrd_get_error());
807     }
809     journal_write("wrote", file);
810     pthread_cond_broadcast(&ci->flushed);
812     for (i = 0; i < values_num; i++)
813       free (values[i]);
815     free(values);
816     free(file);
818     if (status == 0)
819     {
820       pthread_mutex_lock (&stats_lock);
821       stats_updates_written++;
822       stats_data_sets_written += values_num;
823       pthread_mutex_unlock (&stats_lock);
824     }
826     pthread_mutex_lock (&cache_lock);
828     /* We're about to shut down */
829     if (do_shutdown != 0 && !final_flush++)
830     {
831       if (config_flush_at_shutdown)
832           flush_old_values (-1); /* flush everything */
833       else
834         break;
835     }
836   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
837   pthread_mutex_unlock (&cache_lock);
839   if (config_flush_at_shutdown)
840   {
841     assert(cache_queue_head == NULL);
842     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
843   }
845   journal_done();
847   return (NULL);
848 } /* }}} void *queue_thread_main */
850 static int buffer_get_field (char **buffer_ret, /* {{{ */
851     size_t *buffer_size_ret, char **field_ret)
853   char *buffer;
854   size_t buffer_pos;
855   size_t buffer_size;
856   char *field;
857   size_t field_size;
858   int status;
860   buffer = *buffer_ret;
861   buffer_pos = 0;
862   buffer_size = *buffer_size_ret;
863   field = *buffer_ret;
864   field_size = 0;
866   if (buffer_size <= 0)
867     return (-1);
869   /* This is ensured by `handle_request'. */
870   assert (buffer[buffer_size - 1] == '\0');
872   status = -1;
873   while (buffer_pos < buffer_size)
874   {
875     /* Check for end-of-field or end-of-buffer */
876     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
877     {
878       field[field_size] = 0;
879       field_size++;
880       buffer_pos++;
881       status = 0;
882       break;
883     }
884     /* Handle escaped characters. */
885     else if (buffer[buffer_pos] == '\\')
886     {
887       if (buffer_pos >= (buffer_size - 1))
888         break;
889       buffer_pos++;
890       field[field_size] = buffer[buffer_pos];
891       field_size++;
892       buffer_pos++;
893     }
894     /* Normal operation */ 
895     else
896     {
897       field[field_size] = buffer[buffer_pos];
898       field_size++;
899       buffer_pos++;
900     }
901   } /* while (buffer_pos < buffer_size) */
903   if (status != 0)
904     return (status);
906   *buffer_ret = buffer + buffer_pos;
907   *buffer_size_ret = buffer_size - buffer_pos;
908   *field_ret = field;
910   return (0);
911 } /* }}} int buffer_get_field */
913 /* if we're restricting writes to the base directory,
914  * check whether the file falls within the dir
915  * returns 1 if OK, otherwise 0
916  */
917 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
919   assert(file != NULL);
921   if (!config_write_base_only
922       || sock == NULL /* journal replay */
923       || config_base_dir == NULL)
924     return 1;
926   if (strstr(file, "../") != NULL) goto err;
928   /* relative paths without "../" are ok */
929   if (*file != '/') return 1;
931   /* file must be of the format base + "/" + <1+ char filename> */
932   if (strlen(file) < _config_base_dir_len + 2) goto err;
933   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
934   if (*(file + _config_base_dir_len) != '/') goto err;
936   return 1;
938 err:
939   if (sock != NULL && sock->fd >= 0)
940     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
942   return 0;
943 } /* }}} static int check_file_access */
945 static int flush_file (const char *filename) /* {{{ */
947   cache_item_t *ci;
949   pthread_mutex_lock (&cache_lock);
951   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
952   if (ci == NULL)
953   {
954     pthread_mutex_unlock (&cache_lock);
955     return (ENOENT);
956   }
958   if (ci->values_num > 0)
959   {
960     /* Enqueue at head */
961     enqueue_cache_item (ci, HEAD);
962     pthread_cond_wait(&ci->flushed, &cache_lock);
963   }
965   pthread_mutex_unlock(&cache_lock);
967   return (0);
968 } /* }}} int flush_file */
970 static int handle_request_help (listen_socket_t *sock, /* {{{ */
971     char *buffer, size_t buffer_size)
973   int status;
974   char **help_text;
975   char *command;
977   char *help_help[2] =
978   {
979     "Command overview\n"
980     ,
981     "FLUSH <filename>\n"
982     "FLUSHALL\n"
983     "HELP [<command>]\n"
984     "UPDATE <filename> <values> [<values> ...]\n"
985     "BATCH\n"
986     "STATS\n"
987   };
989   char *help_flush[2] =
990   {
991     "Help for FLUSH\n"
992     ,
993     "Usage: FLUSH <filename>\n"
994     "\n"
995     "Adds the given filename to the head of the update queue and returns\n"
996     "after is has been dequeued.\n"
997   };
999   char *help_flushall[2] =
1000   {
1001     "Help for FLUSHALL\n"
1002     ,
1003     "Usage: FLUSHALL\n"
1004     "\n"
1005     "Triggers writing of all pending updates.  Returns immediately.\n"
1006   };
1008   char *help_update[2] =
1009   {
1010     "Help for UPDATE\n"
1011     ,
1012     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1013     "\n"
1014     "Adds the given file to the internal cache if it is not yet known and\n"
1015     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1016     "for details.\n"
1017     "\n"
1018     "Each <values> has the following form:\n"
1019     "  <values> = <time>:<value>[:<value>[...]]\n"
1020     "See the rrdupdate(1) manpage for details.\n"
1021   };
1023   char *help_stats[2] =
1024   {
1025     "Help for STATS\n"
1026     ,
1027     "Usage: STATS\n"
1028     "\n"
1029     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1030     "a description of the values.\n"
1031   };
1033   char *help_batch[2] =
1034   {
1035     "Help for BATCH\n"
1036     ,
1037     "The 'BATCH' command permits the client to initiate a bulk load\n"
1038     "   of commands to rrdcached.\n"
1039     "\n"
1040     "Usage:\n"
1041     "\n"
1042     "    client: BATCH\n"
1043     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1044     "    client: command #1\n"
1045     "    client: command #2\n"
1046     "    client: ... and so on\n"
1047     "    client: .\n"
1048     "    server: 2 errors\n"
1049     "    server: 7 message for command #7\n"
1050     "    server: 9 message for command #9\n"
1051     "\n"
1052     "For more information, consult the rrdcached(1) documentation.\n"
1053   };
1055   status = buffer_get_field (&buffer, &buffer_size, &command);
1056   if (status != 0)
1057     help_text = help_help;
1058   else
1059   {
1060     if (strcasecmp (command, "update") == 0)
1061       help_text = help_update;
1062     else if (strcasecmp (command, "flush") == 0)
1063       help_text = help_flush;
1064     else if (strcasecmp (command, "flushall") == 0)
1065       help_text = help_flushall;
1066     else if (strcasecmp (command, "stats") == 0)
1067       help_text = help_stats;
1068     else if (strcasecmp (command, "batch") == 0)
1069       help_text = help_batch;
1070     else
1071       help_text = help_help;
1072   }
1074   add_response_info(sock, help_text[1]);
1075   return send_response(sock, RESP_OK, help_text[0]);
1076 } /* }}} int handle_request_help */
1078 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1080   uint64_t copy_queue_length;
1081   uint64_t copy_updates_received;
1082   uint64_t copy_flush_received;
1083   uint64_t copy_updates_written;
1084   uint64_t copy_data_sets_written;
1085   uint64_t copy_journal_bytes;
1086   uint64_t copy_journal_rotate;
1088   uint64_t tree_nodes_number;
1089   uint64_t tree_depth;
1091   pthread_mutex_lock (&stats_lock);
1092   copy_queue_length       = stats_queue_length;
1093   copy_updates_received   = stats_updates_received;
1094   copy_flush_received     = stats_flush_received;
1095   copy_updates_written    = stats_updates_written;
1096   copy_data_sets_written  = stats_data_sets_written;
1097   copy_journal_bytes      = stats_journal_bytes;
1098   copy_journal_rotate     = stats_journal_rotate;
1099   pthread_mutex_unlock (&stats_lock);
1101   pthread_mutex_lock (&cache_lock);
1102   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1103   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1104   pthread_mutex_unlock (&cache_lock);
1106   add_response_info(sock,
1107                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1108   add_response_info(sock,
1109                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1110   add_response_info(sock,
1111                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1112   add_response_info(sock,
1113                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1114   add_response_info(sock,
1115                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1116   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1117   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1118   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1119   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1121   send_response(sock, RESP_OK, "Statistics follow\n");
1123   return (0);
1124 } /* }}} int handle_request_stats */
1126 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1127     char *buffer, size_t buffer_size)
1129   char *file;
1130   int status;
1132   status = buffer_get_field (&buffer, &buffer_size, &file);
1133   if (status != 0)
1134   {
1135     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1136   }
1137   else
1138   {
1139     pthread_mutex_lock(&stats_lock);
1140     stats_flush_received++;
1141     pthread_mutex_unlock(&stats_lock);
1143     if (!check_file_access(file, sock)) return 0;
1145     status = flush_file (file);
1146     if (status == 0)
1147       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1148     else if (status == ENOENT)
1149     {
1150       /* no file in our tree; see whether it exists at all */
1151       struct stat statbuf;
1153       memset(&statbuf, 0, sizeof(statbuf));
1154       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1155         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1156       else
1157         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1158     }
1159     else if (status < 0)
1160       return send_response(sock, RESP_ERR, "Internal error.\n");
1161     else
1162       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1163   }
1165   /* NOTREACHED */
1166   assert(1==0);
1167 } /* }}} int handle_request_slurp */
1169 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1172   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1174   pthread_mutex_lock(&cache_lock);
1175   flush_old_values(-1);
1176   pthread_mutex_unlock(&cache_lock);
1178   return send_response(sock, RESP_OK, "Started flush.\n");
1179 } /* }}} static int handle_request_flushall */
1181 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1182     char *buffer, size_t buffer_size)
1184   char *file;
1185   int values_num = 0;
1186   int status;
1188   time_t now;
1189   cache_item_t *ci;
1191   now = time (NULL);
1193   status = buffer_get_field (&buffer, &buffer_size, &file);
1194   if (status != 0)
1195     return send_response(sock, RESP_ERR,
1196                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1198   pthread_mutex_lock(&stats_lock);
1199   stats_updates_received++;
1200   pthread_mutex_unlock(&stats_lock);
1202   if (!check_file_access(file, sock)) return 0;
1204   pthread_mutex_lock (&cache_lock);
1205   ci = g_tree_lookup (cache_tree, file);
1207   if (ci == NULL) /* {{{ */
1208   {
1209     struct stat statbuf;
1211     /* don't hold the lock while we setup; stat(2) might block */
1212     pthread_mutex_unlock(&cache_lock);
1214     memset (&statbuf, 0, sizeof (statbuf));
1215     status = stat (file, &statbuf);
1216     if (status != 0)
1217     {
1218       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1220       status = errno;
1221       if (status == ENOENT)
1222         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1223       else
1224         return send_response(sock, RESP_ERR,
1225                              "stat failed with error %i.\n", status);
1226     }
1227     if (!S_ISREG (statbuf.st_mode))
1228       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1230     if (access(file, R_OK|W_OK) != 0)
1231       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1232                            file, rrd_strerror(errno));
1234     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1235     if (ci == NULL)
1236     {
1237       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1239       return send_response(sock, RESP_ERR, "malloc failed.\n");
1240     }
1241     memset (ci, 0, sizeof (cache_item_t));
1243     ci->file = strdup (file);
1244     if (ci->file == NULL)
1245     {
1246       free (ci);
1247       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1249       return send_response(sock, RESP_ERR, "strdup failed.\n");
1250     }
1252     wipe_ci_values(ci, now);
1253     ci->flags = CI_FLAGS_IN_TREE;
1255     pthread_mutex_lock(&cache_lock);
1256     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1257   } /* }}} */
1258   assert (ci != NULL);
1260   while (buffer_size > 0)
1261   {
1262     char **temp;
1263     char *value;
1265     status = buffer_get_field (&buffer, &buffer_size, &value);
1266     if (status != 0)
1267     {
1268       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1269       break;
1270     }
1272     temp = (char **) realloc (ci->values,
1273         sizeof (char *) * (ci->values_num + 1));
1274     if (temp == NULL)
1275     {
1276       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1277       continue;
1278     }
1279     ci->values = temp;
1281     ci->values[ci->values_num] = strdup (value);
1282     if (ci->values[ci->values_num] == NULL)
1283     {
1284       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1285       continue;
1286     }
1287     ci->values_num++;
1289     values_num++;
1290   }
1292   if (((now - ci->last_flush_time) >= config_write_interval)
1293       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1294       && (ci->values_num > 0))
1295   {
1296     enqueue_cache_item (ci, TAIL);
1297   }
1299   pthread_mutex_unlock (&cache_lock);
1301   if (values_num < 1)
1302     return send_response(sock, RESP_ERR, "No values updated.\n");
1303   else
1304     return send_response(sock, RESP_OK, "Enqueued %i value(s).\n", values_num);
1306   /* NOTREACHED */
1307   assert(1==0);
1309 } /* }}} int handle_request_update */
1311 /* we came across a "WROTE" entry during journal replay.
1312  * throw away any values that we have accumulated for this file
1313  */
1314 static int handle_request_wrote (const char *buffer) /* {{{ */
1316   int i;
1317   cache_item_t *ci;
1318   const char *file = buffer;
1320   pthread_mutex_lock(&cache_lock);
1322   ci = g_tree_lookup(cache_tree, file);
1323   if (ci == NULL)
1324   {
1325     pthread_mutex_unlock(&cache_lock);
1326     return (0);
1327   }
1329   if (ci->values)
1330   {
1331     for (i=0; i < ci->values_num; i++)
1332       free(ci->values[i]);
1334     free(ci->values);
1335   }
1337   wipe_ci_values(ci, time(NULL));
1338   remove_from_queue(ci);
1340   pthread_mutex_unlock(&cache_lock);
1341   return (0);
1342 } /* }}} int handle_request_wrote */
1344 /* start "BATCH" processing */
1345 static int batch_start (listen_socket_t *sock) /* {{{ */
1347   int status;
1348   if (sock->batch_mode)
1349     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1351   status = send_response(sock, RESP_OK,
1352                          "Go ahead.  End with dot '.' on its own line.\n");
1353   sock->batch_mode = 1;
1354   sock->batch_cmd = 0;
1356   return status;
1357 } /* }}} static int batch_start */
1359 /* finish "BATCH" processing and return results to the client */
1360 static int batch_done (listen_socket_t *sock) /* {{{ */
1362   assert(sock->batch_mode);
1363   sock->batch_mode = 0;
1364   sock->batch_cmd  = 0;
1365   return send_response(sock, RESP_OK, "errors\n");
1366 } /* }}} static int batch_done */
1368 /* returns 1 if we have the required privilege level */
1369 static int has_privilege (listen_socket_t *sock, /* {{{ */
1370                           socket_privilege priv)
1372   if (sock == NULL) /* journal replay */
1373     return 1;
1375   if (sock->privilege >= priv)
1376     return 1;
1378   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1379 } /* }}} static int has_privilege */
1381 /* if sock==NULL, we are in journal replay mode */
1382 static int handle_request (listen_socket_t *sock, /* {{{ */
1383                            char *buffer, size_t buffer_size)
1385   char *buffer_ptr;
1386   char *command;
1387   int status;
1389   assert (buffer[buffer_size - 1] == '\0');
1391   buffer_ptr = buffer;
1392   command = NULL;
1393   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1394   if (status != 0)
1395   {
1396     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1397     return (-1);
1398   }
1400   if (sock != NULL && sock->batch_mode)
1401     sock->batch_cmd++;
1403   if (strcasecmp (command, "update") == 0)
1404   {
1405     status = has_privilege(sock, PRIV_HIGH);
1406     if (status <= 0)
1407       return status;
1409     /* don't re-write updates in replay mode */
1410     if (sock != NULL)
1411       journal_write(command, buffer_ptr);
1413     return (handle_request_update (sock, buffer_ptr, buffer_size));
1414   }
1415   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1416   {
1417     /* this is only valid in replay mode */
1418     return (handle_request_wrote (buffer_ptr));
1419   }
1420   else if (strcasecmp (command, "flush") == 0)
1421     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1422   else if (strcasecmp (command, "flushall") == 0)
1423   {
1424     status = has_privilege(sock, PRIV_HIGH);
1425     if (status <= 0)
1426       return status;
1428     return (handle_request_flushall(sock));
1429   }
1430   else if (strcasecmp (command, "stats") == 0)
1431     return (handle_request_stats (sock));
1432   else if (strcasecmp (command, "help") == 0)
1433     return (handle_request_help (sock, buffer_ptr, buffer_size));
1434   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1435     return batch_start(sock);
1436   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_mode)
1437     return batch_done(sock);
1438   else
1439     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1441   /* NOTREACHED */
1442   assert(1==0);
1443 } /* }}} int handle_request */
1445 /* MUST NOT hold journal_lock before calling this */
1446 static void journal_rotate(void) /* {{{ */
1448   FILE *old_fh = NULL;
1450   if (journal_cur == NULL || journal_old == NULL)
1451     return;
1453   pthread_mutex_lock(&journal_lock);
1455   /* we rotate this way (rename before close) so that the we can release
1456    * the journal lock as fast as possible.  Journal writes to the new
1457    * journal can proceed immediately after the new file is opened.  The
1458    * fclose can then block without affecting new updates.
1459    */
1460   if (journal_fh != NULL)
1461   {
1462     old_fh = journal_fh;
1463     rename(journal_cur, journal_old);
1464     ++stats_journal_rotate;
1465   }
1467   journal_fh = fopen(journal_cur, "a");
1468   pthread_mutex_unlock(&journal_lock);
1470   if (old_fh != NULL)
1471     fclose(old_fh);
1473   if (journal_fh == NULL)
1474   {
1475     RRDD_LOG(LOG_CRIT,
1476              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1477              journal_cur, rrd_strerror(errno));
1479     RRDD_LOG(LOG_ERR,
1480              "JOURNALING DISABLED: All values will be flushed at shutdown");
1481     config_flush_at_shutdown = 1;
1482   }
1484 } /* }}} static void journal_rotate */
1486 static void journal_done(void) /* {{{ */
1488   if (journal_cur == NULL)
1489     return;
1491   pthread_mutex_lock(&journal_lock);
1492   if (journal_fh != NULL)
1493   {
1494     fclose(journal_fh);
1495     journal_fh = NULL;
1496   }
1498   if (config_flush_at_shutdown)
1499   {
1500     RRDD_LOG(LOG_INFO, "removing journals");
1501     unlink(journal_old);
1502     unlink(journal_cur);
1503   }
1504   else
1505   {
1506     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1507              "journals will be used at next startup");
1508   }
1510   pthread_mutex_unlock(&journal_lock);
1512 } /* }}} static void journal_done */
1514 static int journal_write(char *cmd, char *args) /* {{{ */
1516   int chars;
1518   if (journal_fh == NULL)
1519     return 0;
1521   pthread_mutex_lock(&journal_lock);
1522   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1523   pthread_mutex_unlock(&journal_lock);
1525   if (chars > 0)
1526   {
1527     pthread_mutex_lock(&stats_lock);
1528     stats_journal_bytes += chars;
1529     pthread_mutex_unlock(&stats_lock);
1530   }
1532   return chars;
1533 } /* }}} static int journal_write */
1535 static int journal_replay (const char *file) /* {{{ */
1537   FILE *fh;
1538   int entry_cnt = 0;
1539   int fail_cnt = 0;
1540   uint64_t line = 0;
1541   char entry[CMD_MAX];
1543   if (file == NULL) return 0;
1545   fh = fopen(file, "r");
1546   if (fh == NULL)
1547   {
1548     if (errno != ENOENT)
1549       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1550                file, rrd_strerror(errno));
1551     return 0;
1552   }
1553   else
1554     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1556   while(!feof(fh))
1557   {
1558     size_t entry_len;
1560     ++line;
1561     if (fgets(entry, sizeof(entry), fh) == NULL)
1562       break;
1563     entry_len = strlen(entry);
1565     /* check \n termination in case journal writing crashed mid-line */
1566     if (entry_len == 0)
1567       continue;
1568     else if (entry[entry_len - 1] != '\n')
1569     {
1570       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1571       ++fail_cnt;
1572       continue;
1573     }
1575     entry[entry_len - 1] = '\0';
1577     if (handle_request(NULL, entry, entry_len) == 0)
1578       ++entry_cnt;
1579     else
1580       ++fail_cnt;
1581   }
1583   fclose(fh);
1585   if (entry_cnt > 0)
1586   {
1587     RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1588              entry_cnt, fail_cnt);
1589     return 1;
1590   }
1591   else
1592     return 0;
1594 } /* }}} static int journal_replay */
1596 static void close_connection(listen_socket_t *sock)
1598   close(sock->fd) ;  sock->fd   = -1;
1599   free(sock->rbuf);  sock->rbuf = NULL;
1600   free(sock->wbuf);  sock->wbuf = NULL;
1602   free(sock);
1605 static void *connection_thread_main (void *args) /* {{{ */
1607   pthread_t self;
1608   listen_socket_t *sock;
1609   int i;
1610   int fd;
1612   sock = (listen_socket_t *) args;
1613   fd = sock->fd;
1615   /* init read buffers */
1616   sock->next_read = sock->next_cmd = 0;
1617   sock->rbuf = malloc(RBUF_SIZE);
1618   if (sock->rbuf == NULL)
1619   {
1620     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1621     close_connection(sock);
1622     return NULL;
1623   }
1625   pthread_mutex_lock (&connection_threads_lock);
1626   {
1627     pthread_t *temp;
1629     temp = (pthread_t *) realloc (connection_threads,
1630         sizeof (pthread_t) * (connection_threads_num + 1));
1631     if (temp == NULL)
1632     {
1633       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1634     }
1635     else
1636     {
1637       connection_threads = temp;
1638       connection_threads[connection_threads_num] = pthread_self ();
1639       connection_threads_num++;
1640     }
1641   }
1642   pthread_mutex_unlock (&connection_threads_lock);
1644   while (do_shutdown == 0)
1645   {
1646     char *cmd;
1647     ssize_t cmd_len;
1648     ssize_t rbytes;
1650     struct pollfd pollfd;
1651     int status;
1653     pollfd.fd = fd;
1654     pollfd.events = POLLIN | POLLPRI;
1655     pollfd.revents = 0;
1657     status = poll (&pollfd, 1, /* timeout = */ 500);
1658     if (do_shutdown)
1659       break;
1660     else if (status == 0) /* timeout */
1661       continue;
1662     else if (status < 0) /* error */
1663     {
1664       status = errno;
1665       if (status == EINTR)
1666         continue;
1667       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1668       continue;
1669     }
1671     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1672     {
1673       close_connection(sock);
1674       break;
1675     }
1676     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1677     {
1678       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1679           "poll(2) returned something unexpected: %#04hx",
1680           pollfd.revents);
1681       close_connection(sock);
1682       break;
1683     }
1685     rbytes = read(fd, sock->rbuf + sock->next_read,
1686                   RBUF_SIZE - sock->next_read);
1687     if (rbytes < 0)
1688     {
1689       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1690       break;
1691     }
1692     else if (rbytes == 0)
1693       break; /* eof */
1695     sock->next_read += rbytes;
1697     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1698     {
1699       status = handle_request (sock, cmd, cmd_len+1);
1700       if (status != 0)
1701         goto out_close;
1702     }
1703   }
1705 out_close:
1706   close_connection(sock);
1708   self = pthread_self ();
1709   /* Remove this thread from the connection threads list */
1710   pthread_mutex_lock (&connection_threads_lock);
1711   /* Find out own index in the array */
1712   for (i = 0; i < connection_threads_num; i++)
1713     if (pthread_equal (connection_threads[i], self) != 0)
1714       break;
1715   assert (i < connection_threads_num);
1717   /* Move the trailing threads forward. */
1718   if (i < (connection_threads_num - 1))
1719   {
1720     memmove (connection_threads + i,
1721         connection_threads + i + 1,
1722         sizeof (pthread_t) * (connection_threads_num - i - 1));
1723   }
1725   connection_threads_num--;
1726   pthread_mutex_unlock (&connection_threads_lock);
1728   return (NULL);
1729 } /* }}} void *connection_thread_main */
1731 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1733   int fd;
1734   struct sockaddr_un sa;
1735   listen_socket_t *temp;
1736   int status;
1737   const char *path;
1739   path = sock->addr;
1740   if (strncmp(path, "unix:", strlen("unix:")) == 0)
1741     path += strlen("unix:");
1743   temp = (listen_socket_t *) realloc (listen_fds,
1744       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1745   if (temp == NULL)
1746   {
1747     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1748     return (-1);
1749   }
1750   listen_fds = temp;
1751   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1753   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1754   if (fd < 0)
1755   {
1756     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1757     return (-1);
1758   }
1760   memset (&sa, 0, sizeof (sa));
1761   sa.sun_family = AF_UNIX;
1762   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1764   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1765   if (status != 0)
1766   {
1767     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1768     close (fd);
1769     unlink (path);
1770     return (-1);
1771   }
1773   status = listen (fd, /* backlog = */ 10);
1774   if (status != 0)
1775   {
1776     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1777     close (fd);
1778     unlink (path);
1779     return (-1);
1780   }
1782   listen_fds[listen_fds_num].fd = fd;
1783   listen_fds[listen_fds_num].family = PF_UNIX;
1784   strncpy(listen_fds[listen_fds_num].addr, path,
1785           sizeof (listen_fds[listen_fds_num].addr) - 1);
1786   listen_fds_num++;
1788   return (0);
1789 } /* }}} int open_listen_socket_unix */
1791 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
1793   struct addrinfo ai_hints;
1794   struct addrinfo *ai_res;
1795   struct addrinfo *ai_ptr;
1796   char addr_copy[NI_MAXHOST];
1797   char *addr;
1798   char *port;
1799   int status;
1801   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
1802   addr_copy[sizeof (addr_copy) - 1] = 0;
1803   addr = addr_copy;
1805   memset (&ai_hints, 0, sizeof (ai_hints));
1806   ai_hints.ai_flags = 0;
1807 #ifdef AI_ADDRCONFIG
1808   ai_hints.ai_flags |= AI_ADDRCONFIG;
1809 #endif
1810   ai_hints.ai_family = AF_UNSPEC;
1811   ai_hints.ai_socktype = SOCK_STREAM;
1813   port = NULL;
1814   if (*addr == '[') /* IPv6+port format */
1815   {
1816     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1817     addr++;
1819     port = strchr (addr, ']');
1820     if (port == NULL)
1821     {
1822       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
1823           sock->addr);
1824       return (-1);
1825     }
1826     *port = 0;
1827     port++;
1829     if (*port == ':')
1830       port++;
1831     else if (*port == 0)
1832       port = NULL;
1833     else
1834     {
1835       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
1836           port);
1837       return (-1);
1838     }
1839   } /* if (*addr = ']') */
1840   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1841   {
1842     port = rindex(addr, ':');
1843     if (port != NULL)
1844     {
1845       *port = 0;
1846       port++;
1847     }
1848   }
1849   ai_res = NULL;
1850   status = getaddrinfo (addr,
1851                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1852                         &ai_hints, &ai_res);
1853   if (status != 0)
1854   {
1855     RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
1856         "%s", addr, gai_strerror (status));
1857     return (-1);
1858   }
1860   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1861   {
1862     int fd;
1863     listen_socket_t *temp;
1864     int one = 1;
1866     temp = (listen_socket_t *) realloc (listen_fds,
1867         sizeof (listen_fds[0]) * (listen_fds_num + 1));
1868     if (temp == NULL)
1869     {
1870       RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
1871       continue;
1872     }
1873     listen_fds = temp;
1874     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1876     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1877     if (fd < 0)
1878     {
1879       RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
1880       continue;
1881     }
1883     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
1885     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1886     if (status != 0)
1887     {
1888       RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
1889       close (fd);
1890       continue;
1891     }
1893     status = listen (fd, /* backlog = */ 10);
1894     if (status != 0)
1895     {
1896       RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
1897       close (fd);
1898       return (-1);
1899     }
1901     listen_fds[listen_fds_num].fd = fd;
1902     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
1903     listen_fds_num++;
1904   } /* for (ai_ptr) */
1906   return (0);
1907 } /* }}} static int open_listen_socket_network */
1909 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
1911   assert(sock != NULL);
1912   assert(sock->addr != NULL);
1914   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
1915       || sock->addr[0] == '/')
1916     return (open_listen_socket_unix(sock));
1917   else
1918     return (open_listen_socket_network(sock));
1919 } /* }}} int open_listen_socket */
1921 static int close_listen_sockets (void) /* {{{ */
1923   size_t i;
1925   for (i = 0; i < listen_fds_num; i++)
1926   {
1927     close (listen_fds[i].fd);
1929     if (listen_fds[i].family == PF_UNIX)
1930       unlink(listen_fds[i].addr);
1931   }
1933   free (listen_fds);
1934   listen_fds = NULL;
1935   listen_fds_num = 0;
1937   return (0);
1938 } /* }}} int close_listen_sockets */
1940 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1942   struct pollfd *pollfds;
1943   int pollfds_num;
1944   int status;
1945   int i;
1947   for (i = 0; i < config_listen_address_list_len; i++)
1948     open_listen_socket (config_listen_address_list[i]);
1950   if (config_listen_address_list_len < 1)
1951   {
1952     listen_socket_t sock;
1953     memset(&sock, 0, sizeof(sock));
1954     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
1955     open_listen_socket (&sock);
1956   }
1958   if (listen_fds_num < 1)
1959   {
1960     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1961         "could be opened. Sorry.");
1962     return (NULL);
1963   }
1965   pollfds_num = listen_fds_num;
1966   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1967   if (pollfds == NULL)
1968   {
1969     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1970     return (NULL);
1971   }
1972   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1974   RRDD_LOG(LOG_INFO, "listening for connections");
1976   while (do_shutdown == 0)
1977   {
1978     assert (pollfds_num == ((int) listen_fds_num));
1979     for (i = 0; i < pollfds_num; i++)
1980     {
1981       pollfds[i].fd = listen_fds[i].fd;
1982       pollfds[i].events = POLLIN | POLLPRI;
1983       pollfds[i].revents = 0;
1984     }
1986     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1987     if (do_shutdown)
1988       break;
1989     else if (status == 0) /* timeout */
1990       continue;
1991     else if (status < 0) /* error */
1992     {
1993       status = errno;
1994       if (status != EINTR)
1995       {
1996         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1997       }
1998       continue;
1999     }
2001     for (i = 0; i < pollfds_num; i++)
2002     {
2003       listen_socket_t *client_sock;
2004       struct sockaddr_storage client_sa;
2005       socklen_t client_sa_size;
2006       pthread_t tid;
2007       pthread_attr_t attr;
2009       if (pollfds[i].revents == 0)
2010         continue;
2012       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2013       {
2014         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2015             "poll(2) returned something unexpected for listen FD #%i.",
2016             pollfds[i].fd);
2017         continue;
2018       }
2020       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2021       if (client_sock == NULL)
2022       {
2023         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2024         continue;
2025       }
2026       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2028       client_sa_size = sizeof (client_sa);
2029       client_sock->fd = accept (pollfds[i].fd,
2030           (struct sockaddr *) &client_sa, &client_sa_size);
2031       if (client_sock->fd < 0)
2032       {
2033         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2034         free(client_sock);
2035         continue;
2036       }
2038       pthread_attr_init (&attr);
2039       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2041       status = pthread_create (&tid, &attr, connection_thread_main,
2042                                client_sock);
2043       if (status != 0)
2044       {
2045         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2046         close_connection(client_sock);
2047         continue;
2048       }
2049     } /* for (pollfds_num) */
2050   } /* while (do_shutdown == 0) */
2052   RRDD_LOG(LOG_INFO, "starting shutdown");
2054   close_listen_sockets ();
2056   pthread_mutex_lock (&connection_threads_lock);
2057   while (connection_threads_num > 0)
2058   {
2059     pthread_t wait_for;
2061     wait_for = connection_threads[0];
2063     pthread_mutex_unlock (&connection_threads_lock);
2064     pthread_join (wait_for, /* retval = */ NULL);
2065     pthread_mutex_lock (&connection_threads_lock);
2066   }
2067   pthread_mutex_unlock (&connection_threads_lock);
2069   return (NULL);
2070 } /* }}} void *listen_thread_main */
2072 static int daemonize (void) /* {{{ */
2074   int status;
2075   int fd;
2076   char *base_dir;
2078   fd = open_pidfile();
2079   if (fd < 0) return fd;
2081   if (!stay_foreground)
2082   {
2083     pid_t child;
2085     child = fork ();
2086     if (child < 0)
2087     {
2088       fprintf (stderr, "daemonize: fork(2) failed.\n");
2089       return (-1);
2090     }
2091     else if (child > 0)
2092     {
2093       return (1);
2094     }
2096     /* Become session leader */
2097     setsid ();
2099     /* Open the first three file descriptors to /dev/null */
2100     close (2);
2101     close (1);
2102     close (0);
2104     open ("/dev/null", O_RDWR);
2105     dup (0);
2106     dup (0);
2107   } /* if (!stay_foreground) */
2109   /* Change into the /tmp directory. */
2110   base_dir = (config_base_dir != NULL)
2111     ? config_base_dir
2112     : "/tmp";
2113   status = chdir (base_dir);
2114   if (status != 0)
2115   {
2116     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2117     return (-1);
2118   }
2120   install_signal_handlers();
2122   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2123   RRDD_LOG(LOG_INFO, "starting up");
2125   cache_tree = g_tree_new ((GCompareFunc) strcmp);
2126   if (cache_tree == NULL)
2127   {
2128     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2129     return (-1);
2130   }
2132   status = write_pidfile (fd);
2133   return status;
2134 } /* }}} int daemonize */
2136 static int cleanup (void) /* {{{ */
2138   do_shutdown++;
2140   pthread_cond_signal (&cache_cond);
2141   pthread_join (queue_thread, /* return = */ NULL);
2143   remove_pidfile ();
2145   RRDD_LOG(LOG_INFO, "goodbye");
2146   closelog ();
2148   return (0);
2149 } /* }}} int cleanup */
2151 static int read_options (int argc, char **argv) /* {{{ */
2153   int option;
2154   int status = 0;
2156   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2157   {
2158     switch (option)
2159     {
2160       case 'g':
2161         stay_foreground=1;
2162         break;
2164       case 'L':
2165       case 'l':
2166       {
2167         listen_socket_t **temp;
2168         listen_socket_t *new;
2170         new = malloc(sizeof(listen_socket_t));
2171         if (new == NULL)
2172         {
2173           fprintf(stderr, "read_options: malloc failed.\n");
2174           return(2);
2175         }
2176         memset(new, 0, sizeof(listen_socket_t));
2178         temp = (listen_socket_t **) realloc (config_listen_address_list,
2179             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2180         if (temp == NULL)
2181         {
2182           fprintf (stderr, "read_options: realloc failed.\n");
2183           return (2);
2184         }
2185         config_listen_address_list = temp;
2187         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2188         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2190         temp[config_listen_address_list_len] = new;
2191         config_listen_address_list_len++;
2192       }
2193       break;
2195       case 'f':
2196       {
2197         int temp;
2199         temp = atoi (optarg);
2200         if (temp > 0)
2201           config_flush_interval = temp;
2202         else
2203         {
2204           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2205           status = 3;
2206         }
2207       }
2208       break;
2210       case 'w':
2211       {
2212         int temp;
2214         temp = atoi (optarg);
2215         if (temp > 0)
2216           config_write_interval = temp;
2217         else
2218         {
2219           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2220           status = 2;
2221         }
2222       }
2223       break;
2225       case 'z':
2226       {
2227         int temp;
2229         temp = atoi(optarg);
2230         if (temp > 0)
2231           config_write_jitter = temp;
2232         else
2233         {
2234           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2235           status = 2;
2236         }
2238         break;
2239       }
2241       case 'B':
2242         config_write_base_only = 1;
2243         break;
2245       case 'b':
2246       {
2247         size_t len;
2249         if (config_base_dir != NULL)
2250           free (config_base_dir);
2251         config_base_dir = strdup (optarg);
2252         if (config_base_dir == NULL)
2253         {
2254           fprintf (stderr, "read_options: strdup failed.\n");
2255           return (3);
2256         }
2258         len = strlen (config_base_dir);
2259         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2260         {
2261           config_base_dir[len - 1] = 0;
2262           len--;
2263         }
2265         if (len < 1)
2266         {
2267           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2268           return (4);
2269         }
2271         _config_base_dir_len = len;
2272       }
2273       break;
2275       case 'p':
2276       {
2277         if (config_pid_file != NULL)
2278           free (config_pid_file);
2279         config_pid_file = strdup (optarg);
2280         if (config_pid_file == NULL)
2281         {
2282           fprintf (stderr, "read_options: strdup failed.\n");
2283           return (3);
2284         }
2285       }
2286       break;
2288       case 'F':
2289         config_flush_at_shutdown = 1;
2290         break;
2292       case 'j':
2293       {
2294         struct stat statbuf;
2295         const char *dir = optarg;
2297         status = stat(dir, &statbuf);
2298         if (status != 0)
2299         {
2300           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2301           return 6;
2302         }
2304         if (!S_ISDIR(statbuf.st_mode)
2305             || access(dir, R_OK|W_OK|X_OK) != 0)
2306         {
2307           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2308                   errno ? rrd_strerror(errno) : "");
2309           return 6;
2310         }
2312         journal_cur = malloc(PATH_MAX + 1);
2313         journal_old = malloc(PATH_MAX + 1);
2314         if (journal_cur == NULL || journal_old == NULL)
2315         {
2316           fprintf(stderr, "malloc failure for journal files\n");
2317           return 6;
2318         }
2319         else 
2320         {
2321           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2322           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2323         }
2324       }
2325       break;
2327       case 'h':
2328       case '?':
2329         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2330             "\n"
2331             "Usage: rrdcached [options]\n"
2332             "\n"
2333             "Valid options are:\n"
2334             "  -l <address>  Socket address to listen to.\n"
2335             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2336             "  -w <seconds>  Interval in which to write data.\n"
2337             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2338             "  -f <seconds>  Interval in which to flush dead data.\n"
2339             "  -p <file>     Location of the PID-file.\n"
2340             "  -b <dir>      Base directory to change to.\n"
2341             "  -B            Restrict file access to paths within -b <dir>\n"
2342             "  -g            Do not fork and run in the foreground.\n"
2343             "  -j <dir>      Directory in which to create the journal files.\n"
2344             "  -F            Always flush all updates at shutdown\n"
2345             "\n"
2346             "For more information and a detailed description of all options "
2347             "please refer\n"
2348             "to the rrdcached(1) manual page.\n",
2349             VERSION);
2350         status = -1;
2351         break;
2352     } /* switch (option) */
2353   } /* while (getopt) */
2355   /* advise the user when values are not sane */
2356   if (config_flush_interval < 2 * config_write_interval)
2357     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2358             " 2x write interval (-w) !\n");
2359   if (config_write_jitter > config_write_interval)
2360     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2361             " write interval (-w) !\n");
2363   if (config_write_base_only && config_base_dir == NULL)
2364     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2365             "  Consult the rrdcached documentation\n");
2367   if (journal_cur == NULL)
2368     config_flush_at_shutdown = 1;
2370   return (status);
2371 } /* }}} int read_options */
2373 int main (int argc, char **argv)
2375   int status;
2377   status = read_options (argc, argv);
2378   if (status != 0)
2379   {
2380     if (status < 0)
2381       status = 0;
2382     return (status);
2383   }
2385   status = daemonize ();
2386   if (status == 1)
2387   {
2388     struct sigaction sigchld;
2390     memset (&sigchld, 0, sizeof (sigchld));
2391     sigchld.sa_handler = SIG_IGN;
2392     sigaction (SIGCHLD, &sigchld, NULL);
2394     return (0);
2395   }
2396   else if (status != 0)
2397   {
2398     fprintf (stderr, "daemonize failed, exiting.\n");
2399     return (1);
2400   }
2402   if (journal_cur != NULL)
2403   {
2404     int had_journal = 0;
2406     pthread_mutex_lock(&journal_lock);
2408     RRDD_LOG(LOG_INFO, "checking for journal files");
2410     had_journal += journal_replay(journal_old);
2411     had_journal += journal_replay(journal_cur);
2413     if (had_journal)
2414       flush_old_values(-1);
2416     pthread_mutex_unlock(&journal_lock);
2417     journal_rotate();
2419     RRDD_LOG(LOG_INFO, "journal processing complete");
2420   }
2422   /* start the queue thread */
2423   memset (&queue_thread, 0, sizeof (queue_thread));
2424   status = pthread_create (&queue_thread,
2425                            NULL, /* attr */
2426                            queue_thread_main,
2427                            NULL); /* args */
2428   if (status != 0)
2429   {
2430     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2431     cleanup();
2432     return (1);
2433   }
2435   listen_thread_main (NULL);
2436   cleanup ();
2438   return (0);
2439 } /* int main */
2441 /*
2442  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2443  */