Code

This patch moves the permission handling code around a bit.
[rrdtool-all.git] / program / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
101 /*
102  * Types
103  */
104 typedef enum
106   PRIV_LOW,
107   PRIV_HIGH
108 } socket_privilege;
110 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
112 struct listen_socket_s
114   int fd;
115   char addr[PATH_MAX + 1];
116   int family;
117   socket_privilege privilege;
119   /* state for BATCH processing */
120   int batch_mode;
121   int batch_cmd;
123   /* buffered IO */
124   char *rbuf;
125   off_t next_cmd;
126   off_t next_read;
128   char *wbuf;
129   ssize_t wbuf_len;
130 };
131 typedef struct listen_socket_s listen_socket_t;
133 struct cache_item_s;
134 typedef struct cache_item_s cache_item_t;
135 struct cache_item_s
137   char *file;
138   char **values;
139   int values_num;
140   time_t last_flush_time;
141 #define CI_FLAGS_IN_TREE  (1<<0)
142 #define CI_FLAGS_IN_QUEUE (1<<1)
143   int flags;
144   pthread_cond_t  flushed;
145   cache_item_t *prev;
146   cache_item_t *next;
147 };
149 struct callback_flush_data_s
151   time_t now;
152   time_t abs_timeout;
153   char **keys;
154   size_t keys_num;
155 };
156 typedef struct callback_flush_data_s callback_flush_data_t;
158 enum queue_side_e
160   HEAD,
161   TAIL
162 };
163 typedef enum queue_side_e queue_side_t;
165 /* max length of socket command or response */
166 #define CMD_MAX 4096
167 #define RBUF_SIZE (CMD_MAX*2)
169 /*
170  * Variables
171  */
172 static int stay_foreground = 0;
173 static uid_t daemon_uid;
175 static listen_socket_t *listen_fds = NULL;
176 static size_t listen_fds_num = 0;
178 static int do_shutdown = 0;
180 static pthread_t queue_thread;
182 static pthread_t *connection_threads = NULL;
183 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
184 static int connection_threads_num = 0;
186 /* Cache stuff */
187 static GTree          *cache_tree = NULL;
188 static cache_item_t   *cache_queue_head = NULL;
189 static cache_item_t   *cache_queue_tail = NULL;
190 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
191 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
193 static int config_write_interval = 300;
194 static int config_write_jitter   = 0;
195 static int config_flush_interval = 3600;
196 static int config_flush_at_shutdown = 0;
197 static char *config_pid_file = NULL;
198 static char *config_base_dir = NULL;
199 static size_t _config_base_dir_len = 0;
200 static int config_write_base_only = 0;
202 static listen_socket_t **config_listen_address_list = NULL;
203 static int config_listen_address_list_len = 0;
205 static uint64_t stats_queue_length = 0;
206 static uint64_t stats_updates_received = 0;
207 static uint64_t stats_flush_received = 0;
208 static uint64_t stats_updates_written = 0;
209 static uint64_t stats_data_sets_written = 0;
210 static uint64_t stats_journal_bytes = 0;
211 static uint64_t stats_journal_rotate = 0;
212 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
214 /* Journaled updates */
215 static char *journal_cur = NULL;
216 static char *journal_old = NULL;
217 static FILE *journal_fh = NULL;
218 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
219 static int journal_write(char *cmd, char *args);
220 static void journal_done(void);
221 static void journal_rotate(void);
223 /* 
224  * Functions
225  */
226 static void sig_common (const char *sig) /* {{{ */
228   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
229   do_shutdown++;
230   pthread_cond_broadcast(&cache_cond);
231 } /* }}} void sig_common */
233 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
235   sig_common("INT");
236 } /* }}} void sig_int_handler */
238 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
240   sig_common("TERM");
241 } /* }}} void sig_term_handler */
243 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
245   config_flush_at_shutdown = 1;
246   sig_common("USR1");
247 } /* }}} void sig_usr1_handler */
249 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
251   config_flush_at_shutdown = 0;
252   sig_common("USR2");
253 } /* }}} void sig_usr2_handler */
255 static void install_signal_handlers(void) /* {{{ */
257   /* These structures are static, because `sigaction' behaves weird if the are
258    * overwritten.. */
259   static struct sigaction sa_int;
260   static struct sigaction sa_term;
261   static struct sigaction sa_pipe;
262   static struct sigaction sa_usr1;
263   static struct sigaction sa_usr2;
265   /* Install signal handlers */
266   memset (&sa_int, 0, sizeof (sa_int));
267   sa_int.sa_handler = sig_int_handler;
268   sigaction (SIGINT, &sa_int, NULL);
270   memset (&sa_term, 0, sizeof (sa_term));
271   sa_term.sa_handler = sig_term_handler;
272   sigaction (SIGTERM, &sa_term, NULL);
274   memset (&sa_pipe, 0, sizeof (sa_pipe));
275   sa_pipe.sa_handler = SIG_IGN;
276   sigaction (SIGPIPE, &sa_pipe, NULL);
278   memset (&sa_pipe, 0, sizeof (sa_usr1));
279   sa_usr1.sa_handler = sig_usr1_handler;
280   sigaction (SIGUSR1, &sa_usr1, NULL);
282   memset (&sa_usr2, 0, sizeof (sa_usr2));
283   sa_usr2.sa_handler = sig_usr2_handler;
284   sigaction (SIGUSR2, &sa_usr2, NULL);
286 } /* }}} void install_signal_handlers */
288 static int open_pidfile(void) /* {{{ */
290   int fd;
291   char *file;
293   file = (config_pid_file != NULL)
294     ? config_pid_file
295     : LOCALSTATEDIR "/run/rrdcached.pid";
297   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
298   if (fd < 0)
299     fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
300             file, rrd_strerror(errno));
302   return(fd);
303 } /* }}} static int open_pidfile */
305 static int write_pidfile (int fd) /* {{{ */
307   pid_t pid;
308   FILE *fh;
310   pid = getpid ();
312   fh = fdopen (fd, "w");
313   if (fh == NULL)
314   {
315     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
316     close(fd);
317     return (-1);
318   }
320   fprintf (fh, "%i\n", (int) pid);
321   fclose (fh);
323   return (0);
324 } /* }}} int write_pidfile */
326 static int remove_pidfile (void) /* {{{ */
328   char *file;
329   int status;
331   file = (config_pid_file != NULL)
332     ? config_pid_file
333     : LOCALSTATEDIR "/run/rrdcached.pid";
335   status = unlink (file);
336   if (status == 0)
337     return (0);
338   return (errno);
339 } /* }}} int remove_pidfile */
341 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
343   char *eol;
345   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
346                sock->next_read - sock->next_cmd);
348   if (eol == NULL)
349   {
350     /* no commands left, move remainder back to front of rbuf */
351     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
352             sock->next_read - sock->next_cmd);
353     sock->next_read -= sock->next_cmd;
354     sock->next_cmd = 0;
355     *len = 0;
356     return NULL;
357   }
358   else
359   {
360     char *cmd = sock->rbuf + sock->next_cmd;
361     *eol = '\0';
363     sock->next_cmd = eol - sock->rbuf + 1;
365     if (eol > sock->rbuf && *(eol-1) == '\r')
366       *(--eol) = '\0'; /* handle "\r\n" EOL */
368     *len = eol - cmd;
370     return cmd;
371   }
373   /* NOTREACHED */
374   assert(1==0);
377 /* add the characters directly to the write buffer */
378 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
380   char *new_buf;
382   assert(sock != NULL);
384   new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
385   if (new_buf == NULL)
386   {
387     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
388     return -1;
389   }
391   strncpy(new_buf + sock->wbuf_len, str, len + 1);
393   sock->wbuf = new_buf;
394   sock->wbuf_len += len;
396   return 0;
397 } /* }}} static int add_to_wbuf */
399 /* add the text to the "extra" info that's sent after the status line */
400 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
402   va_list argp;
403   char buffer[CMD_MAX];
404   int len;
406   if (sock == NULL) return 0; /* journal replay mode */
407   if (sock->batch_mode) return 0; /* no extra info returned when in BATCH */
409   va_start(argp, fmt);
410 #ifdef HAVE_VSNPRINTF
411   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
412 #else
413   len = vsprintf(buffer, fmt, argp);
414 #endif
415   va_end(argp);
416   if (len < 0)
417   {
418     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
419     return -1;
420   }
422   return add_to_wbuf(sock, buffer, len);
423 } /* }}} static int add_response_info */
425 static int count_lines(char *str) /* {{{ */
427   int lines = 0;
429   if (str != NULL)
430   {
431     while ((str = strchr(str, '\n')) != NULL)
432     {
433       ++lines;
434       ++str;
435     }
436   }
438   return lines;
439 } /* }}} static int count_lines */
441 /* send the response back to the user.
442  * returns 0 on success, -1 on error
443  * write buffer is always zeroed after this call */
444 static int send_response (listen_socket_t *sock, response_code rc,
445                           char *fmt, ...) /* {{{ */
447   va_list argp;
448   char buffer[CMD_MAX];
449   int lines;
450   ssize_t wrote;
451   int rclen, len;
453   if (sock == NULL) return rc;  /* journal replay mode */
455   if (sock->batch_mode)
456   {
457     if (rc == RESP_OK)
458       return rc; /* no response on success during BATCH */
459     lines = sock->batch_cmd;
460   }
461   else if (rc == RESP_OK)
462     lines = count_lines(sock->wbuf);
463   else
464     lines = -1;
466   rclen = sprintf(buffer, "%d ", lines);
467   va_start(argp, fmt);
468 #ifdef HAVE_VSNPRINTF
469   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
470 #else
471   len = vsprintf(buffer+rclen, fmt, argp);
472 #endif
473   va_end(argp);
474   if (len < 0)
475     return -1;
477   len += rclen;
479   /* append the result to the wbuf, don't write to the user */
480   if (sock->batch_mode)
481     return add_to_wbuf(sock, buffer, len);
483   /* first write must be complete */
484   if (len != write(sock->fd, buffer, len))
485   {
486     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
487     return -1;
488   }
490   if (sock->wbuf != NULL)
491   {
492     wrote = 0;
493     while (wrote < sock->wbuf_len)
494     {
495       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
496       if (wb <= 0)
497       {
498         RRDD_LOG(LOG_INFO, "send_response: could not write results");
499         return -1;
500       }
501       wrote += wb;
502     }
503   }
505   free(sock->wbuf); sock->wbuf = NULL;
506   sock->wbuf_len = 0;
508   return 0;
509 } /* }}} */
511 static void wipe_ci_values(cache_item_t *ci, time_t when)
513   ci->values = NULL;
514   ci->values_num = 0;
516   ci->last_flush_time = when;
517   if (config_write_jitter > 0)
518     ci->last_flush_time += (random() % config_write_jitter);
521 /* remove_from_queue
522  * remove a "cache_item_t" item from the queue.
523  * must hold 'cache_lock' when calling this
524  */
525 static void remove_from_queue(cache_item_t *ci) /* {{{ */
527   if (ci == NULL) return;
529   if (ci->prev == NULL)
530     cache_queue_head = ci->next; /* reset head */
531   else
532     ci->prev->next = ci->next;
534   if (ci->next == NULL)
535     cache_queue_tail = ci->prev; /* reset the tail */
536   else
537     ci->next->prev = ci->prev;
539   ci->next = ci->prev = NULL;
540   ci->flags &= ~CI_FLAGS_IN_QUEUE;
541 } /* }}} static void remove_from_queue */
543 /*
544  * enqueue_cache_item:
545  * `cache_lock' must be acquired before calling this function!
546  */
547 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
548     queue_side_t side)
550   if (ci == NULL)
551     return (-1);
553   if (ci->values_num == 0)
554     return (0);
556   if (side == HEAD)
557   {
558     if (cache_queue_head == ci)
559       return 0;
561     /* remove from the double linked list */
562     if (ci->flags & CI_FLAGS_IN_QUEUE)
563       remove_from_queue(ci);
565     ci->prev = NULL;
566     ci->next = cache_queue_head;
567     if (ci->next != NULL)
568       ci->next->prev = ci;
569     cache_queue_head = ci;
571     if (cache_queue_tail == NULL)
572       cache_queue_tail = cache_queue_head;
573   }
574   else /* (side == TAIL) */
575   {
576     /* We don't move values back in the list.. */
577     if (ci->flags & CI_FLAGS_IN_QUEUE)
578       return (0);
580     assert (ci->next == NULL);
581     assert (ci->prev == NULL);
583     ci->prev = cache_queue_tail;
585     if (cache_queue_tail == NULL)
586       cache_queue_head = ci;
587     else
588       cache_queue_tail->next = ci;
590     cache_queue_tail = ci;
591   }
593   ci->flags |= CI_FLAGS_IN_QUEUE;
595   pthread_cond_broadcast(&cache_cond);
596   pthread_mutex_lock (&stats_lock);
597   stats_queue_length++;
598   pthread_mutex_unlock (&stats_lock);
600   return (0);
601 } /* }}} int enqueue_cache_item */
603 /*
604  * tree_callback_flush:
605  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
606  * while this is in progress.
607  */
608 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
609     gpointer data)
611   cache_item_t *ci;
612   callback_flush_data_t *cfd;
614   ci = (cache_item_t *) value;
615   cfd = (callback_flush_data_t *) data;
617   if ((ci->last_flush_time <= cfd->abs_timeout)
618       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
619       && (ci->values_num > 0))
620   {
621     enqueue_cache_item (ci, TAIL);
622   }
623   else if ((do_shutdown != 0)
624       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
625       && (ci->values_num > 0))
626   {
627     enqueue_cache_item (ci, TAIL);
628   }
629   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
630       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
631       && (ci->values_num <= 0))
632   {
633     char **temp;
635     temp = (char **) realloc (cfd->keys,
636         sizeof (char *) * (cfd->keys_num + 1));
637     if (temp == NULL)
638     {
639       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
640       return (FALSE);
641     }
642     cfd->keys = temp;
643     /* Make really sure this points to the _same_ place */
644     assert ((char *) key == ci->file);
645     cfd->keys[cfd->keys_num] = (char *) key;
646     cfd->keys_num++;
647   }
649   return (FALSE);
650 } /* }}} gboolean tree_callback_flush */
652 static int flush_old_values (int max_age)
654   callback_flush_data_t cfd;
655   size_t k;
657   memset (&cfd, 0, sizeof (cfd));
658   /* Pass the current time as user data so that we don't need to call
659    * `time' for each node. */
660   cfd.now = time (NULL);
661   cfd.keys = NULL;
662   cfd.keys_num = 0;
664   if (max_age > 0)
665     cfd.abs_timeout = cfd.now - max_age;
666   else
667     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
669   /* `tree_callback_flush' will return the keys of all values that haven't
670    * been touched in the last `config_flush_interval' seconds in `cfd'.
671    * The char*'s in this array point to the same memory as ci->file, so we
672    * don't need to free them separately. */
673   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
675   for (k = 0; k < cfd.keys_num; k++)
676   {
677     cache_item_t *ci;
679     /* This must not fail. */
680     ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
681     assert (ci != NULL);
683     /* If we end up here with values available, something's seriously
684      * messed up. */
685     assert (ci->values_num == 0);
687     /* Remove the node from the tree */
688     g_tree_remove (cache_tree, cfd.keys[k]);
689     cfd.keys[k] = NULL;
691     /* Now free and clean up `ci'. */
692     free (ci->file);
693     ci->file = NULL;
694     free (ci);
695     ci = NULL;
696   } /* for (k = 0; k < cfd.keys_num; k++) */
698   if (cfd.keys != NULL)
699   {
700     free (cfd.keys);
701     cfd.keys = NULL;
702   }
704   return (0);
705 } /* int flush_old_values */
707 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
709   struct timeval now;
710   struct timespec next_flush;
711   int final_flush = 0; /* make sure we only flush once on shutdown */
713   gettimeofday (&now, NULL);
714   next_flush.tv_sec = now.tv_sec + config_flush_interval;
715   next_flush.tv_nsec = 1000 * now.tv_usec;
717   pthread_mutex_lock (&cache_lock);
718   while ((do_shutdown == 0) || (cache_queue_head != NULL))
719   {
720     cache_item_t *ci;
721     char *file;
722     char **values;
723     int values_num;
724     int status;
725     int i;
727     /* First, check if it's time to do the cache flush. */
728     gettimeofday (&now, NULL);
729     if ((now.tv_sec > next_flush.tv_sec)
730         || ((now.tv_sec == next_flush.tv_sec)
731           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
732     {
733       /* Flush all values that haven't been written in the last
734        * `config_write_interval' seconds. */
735       flush_old_values (config_write_interval);
737       /* Determine the time of the next cache flush. */
738       while (next_flush.tv_sec <= now.tv_sec)
739         next_flush.tv_sec += config_flush_interval;
741       /* unlock the cache while we rotate so we don't block incoming
742        * updates if the fsync() blocks on disk I/O */
743       pthread_mutex_unlock(&cache_lock);
744       journal_rotate();
745       pthread_mutex_lock(&cache_lock);
746     }
748     /* Now, check if there's something to store away. If not, wait until
749      * something comes in or it's time to do the cache flush.  if we are
750      * shutting down, do not wait around.  */
751     if (cache_queue_head == NULL && !do_shutdown)
752     {
753       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
754       if ((status != 0) && (status != ETIMEDOUT))
755       {
756         RRDD_LOG (LOG_ERR, "queue_thread_main: "
757             "pthread_cond_timedwait returned %i.", status);
758       }
759     }
761     /* We're about to shut down */
762     if (do_shutdown != 0 && !final_flush++)
763     {
764       if (config_flush_at_shutdown)
765         flush_old_values (-1); /* flush everything */
766       else
767         break;
768     }
770     /* Check if a value has arrived. This may be NULL if we timed out or there
771      * was an interrupt such as a signal. */
772     if (cache_queue_head == NULL)
773       continue;
775     ci = cache_queue_head;
777     /* copy the relevant parts */
778     file = strdup (ci->file);
779     if (file == NULL)
780     {
781       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
782       continue;
783     }
785     assert(ci->values != NULL);
786     assert(ci->values_num > 0);
788     values = ci->values;
789     values_num = ci->values_num;
791     wipe_ci_values(ci, time(NULL));
792     remove_from_queue(ci);
794     pthread_mutex_lock (&stats_lock);
795     assert (stats_queue_length > 0);
796     stats_queue_length--;
797     pthread_mutex_unlock (&stats_lock);
799     pthread_mutex_unlock (&cache_lock);
801     rrd_clear_error ();
802     status = rrd_update_r (file, NULL, values_num, (void *) values);
803     if (status != 0)
804     {
805       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
806           "rrd_update_r (%s) failed with status %i. (%s)",
807           file, status, rrd_get_error());
808     }
810     journal_write("wrote", file);
811     pthread_cond_broadcast(&ci->flushed);
813     for (i = 0; i < values_num; i++)
814       free (values[i]);
816     free(values);
817     free(file);
819     if (status == 0)
820     {
821       pthread_mutex_lock (&stats_lock);
822       stats_updates_written++;
823       stats_data_sets_written += values_num;
824       pthread_mutex_unlock (&stats_lock);
825     }
827     pthread_mutex_lock (&cache_lock);
829     /* We're about to shut down */
830     if (do_shutdown != 0 && !final_flush++)
831     {
832       if (config_flush_at_shutdown)
833           flush_old_values (-1); /* flush everything */
834       else
835         break;
836     }
837   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
838   pthread_mutex_unlock (&cache_lock);
840   if (config_flush_at_shutdown)
841   {
842     assert(cache_queue_head == NULL);
843     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
844   }
846   journal_done();
848   return (NULL);
849 } /* }}} void *queue_thread_main */
851 static int buffer_get_field (char **buffer_ret, /* {{{ */
852     size_t *buffer_size_ret, char **field_ret)
854   char *buffer;
855   size_t buffer_pos;
856   size_t buffer_size;
857   char *field;
858   size_t field_size;
859   int status;
861   buffer = *buffer_ret;
862   buffer_pos = 0;
863   buffer_size = *buffer_size_ret;
864   field = *buffer_ret;
865   field_size = 0;
867   if (buffer_size <= 0)
868     return (-1);
870   /* This is ensured by `handle_request'. */
871   assert (buffer[buffer_size - 1] == '\0');
873   status = -1;
874   while (buffer_pos < buffer_size)
875   {
876     /* Check for end-of-field or end-of-buffer */
877     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
878     {
879       field[field_size] = 0;
880       field_size++;
881       buffer_pos++;
882       status = 0;
883       break;
884     }
885     /* Handle escaped characters. */
886     else if (buffer[buffer_pos] == '\\')
887     {
888       if (buffer_pos >= (buffer_size - 1))
889         break;
890       buffer_pos++;
891       field[field_size] = buffer[buffer_pos];
892       field_size++;
893       buffer_pos++;
894     }
895     /* Normal operation */ 
896     else
897     {
898       field[field_size] = buffer[buffer_pos];
899       field_size++;
900       buffer_pos++;
901     }
902   } /* while (buffer_pos < buffer_size) */
904   if (status != 0)
905     return (status);
907   *buffer_ret = buffer + buffer_pos;
908   *buffer_size_ret = buffer_size - buffer_pos;
909   *field_ret = field;
911   return (0);
912 } /* }}} int buffer_get_field */
914 /* if we're restricting writes to the base directory,
915  * check whether the file falls within the dir
916  * returns 1 if OK, otherwise 0
917  */
918 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
920   assert(file != NULL);
922   if (!config_write_base_only
923       || sock == NULL /* journal replay */
924       || config_base_dir == NULL)
925     return 1;
927   if (strstr(file, "../") != NULL) goto err;
929   /* relative paths without "../" are ok */
930   if (*file != '/') return 1;
932   /* file must be of the format base + "/" + <1+ char filename> */
933   if (strlen(file) < _config_base_dir_len + 2) goto err;
934   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
935   if (*(file + _config_base_dir_len) != '/') goto err;
937   return 1;
939 err:
940   if (sock != NULL && sock->fd >= 0)
941     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
943   return 0;
944 } /* }}} static int check_file_access */
946 /* returns 1 if we have the required privilege level,
947  * otherwise issue an error to the user on sock */
948 static int has_privilege (listen_socket_t *sock, /* {{{ */
949                           socket_privilege priv)
951   if (sock == NULL) /* journal replay */
952     return 1;
954   if (sock->privilege >= priv)
955     return 1;
957   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
958 } /* }}} static int has_privilege */
960 static int flush_file (const char *filename) /* {{{ */
962   cache_item_t *ci;
964   pthread_mutex_lock (&cache_lock);
966   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
967   if (ci == NULL)
968   {
969     pthread_mutex_unlock (&cache_lock);
970     return (ENOENT);
971   }
973   if (ci->values_num > 0)
974   {
975     /* Enqueue at head */
976     enqueue_cache_item (ci, HEAD);
977     pthread_cond_wait(&ci->flushed, &cache_lock);
978   }
980   pthread_mutex_unlock(&cache_lock);
982   return (0);
983 } /* }}} int flush_file */
985 static int handle_request_help (listen_socket_t *sock, /* {{{ */
986     char *buffer, size_t buffer_size)
988   int status;
989   char **help_text;
990   char *command;
992   char *help_help[2] =
993   {
994     "Command overview\n"
995     ,
996     "FLUSH <filename>\n"
997     "FLUSHALL\n"
998     "HELP [<command>]\n"
999     "UPDATE <filename> <values> [<values> ...]\n"
1000     "BATCH\n"
1001     "STATS\n"
1002   };
1004   char *help_flush[2] =
1005   {
1006     "Help for FLUSH\n"
1007     ,
1008     "Usage: FLUSH <filename>\n"
1009     "\n"
1010     "Adds the given filename to the head of the update queue and returns\n"
1011     "after is has been dequeued.\n"
1012   };
1014   char *help_flushall[2] =
1015   {
1016     "Help for FLUSHALL\n"
1017     ,
1018     "Usage: FLUSHALL\n"
1019     "\n"
1020     "Triggers writing of all pending updates.  Returns immediately.\n"
1021   };
1023   char *help_update[2] =
1024   {
1025     "Help for UPDATE\n"
1026     ,
1027     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1028     "\n"
1029     "Adds the given file to the internal cache if it is not yet known and\n"
1030     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1031     "for details.\n"
1032     "\n"
1033     "Each <values> has the following form:\n"
1034     "  <values> = <time>:<value>[:<value>[...]]\n"
1035     "See the rrdupdate(1) manpage for details.\n"
1036   };
1038   char *help_stats[2] =
1039   {
1040     "Help for STATS\n"
1041     ,
1042     "Usage: STATS\n"
1043     "\n"
1044     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1045     "a description of the values.\n"
1046   };
1048   char *help_batch[2] =
1049   {
1050     "Help for BATCH\n"
1051     ,
1052     "The 'BATCH' command permits the client to initiate a bulk load\n"
1053     "   of commands to rrdcached.\n"
1054     "\n"
1055     "Usage:\n"
1056     "\n"
1057     "    client: BATCH\n"
1058     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1059     "    client: command #1\n"
1060     "    client: command #2\n"
1061     "    client: ... and so on\n"
1062     "    client: .\n"
1063     "    server: 2 errors\n"
1064     "    server: 7 message for command #7\n"
1065     "    server: 9 message for command #9\n"
1066     "\n"
1067     "For more information, consult the rrdcached(1) documentation.\n"
1068   };
1070   status = buffer_get_field (&buffer, &buffer_size, &command);
1071   if (status != 0)
1072     help_text = help_help;
1073   else
1074   {
1075     if (strcasecmp (command, "update") == 0)
1076       help_text = help_update;
1077     else if (strcasecmp (command, "flush") == 0)
1078       help_text = help_flush;
1079     else if (strcasecmp (command, "flushall") == 0)
1080       help_text = help_flushall;
1081     else if (strcasecmp (command, "stats") == 0)
1082       help_text = help_stats;
1083     else if (strcasecmp (command, "batch") == 0)
1084       help_text = help_batch;
1085     else
1086       help_text = help_help;
1087   }
1089   add_response_info(sock, help_text[1]);
1090   return send_response(sock, RESP_OK, help_text[0]);
1091 } /* }}} int handle_request_help */
1093 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1095   uint64_t copy_queue_length;
1096   uint64_t copy_updates_received;
1097   uint64_t copy_flush_received;
1098   uint64_t copy_updates_written;
1099   uint64_t copy_data_sets_written;
1100   uint64_t copy_journal_bytes;
1101   uint64_t copy_journal_rotate;
1103   uint64_t tree_nodes_number;
1104   uint64_t tree_depth;
1106   pthread_mutex_lock (&stats_lock);
1107   copy_queue_length       = stats_queue_length;
1108   copy_updates_received   = stats_updates_received;
1109   copy_flush_received     = stats_flush_received;
1110   copy_updates_written    = stats_updates_written;
1111   copy_data_sets_written  = stats_data_sets_written;
1112   copy_journal_bytes      = stats_journal_bytes;
1113   copy_journal_rotate     = stats_journal_rotate;
1114   pthread_mutex_unlock (&stats_lock);
1116   pthread_mutex_lock (&cache_lock);
1117   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1118   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1119   pthread_mutex_unlock (&cache_lock);
1121   add_response_info(sock,
1122                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1123   add_response_info(sock,
1124                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1125   add_response_info(sock,
1126                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1127   add_response_info(sock,
1128                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1129   add_response_info(sock,
1130                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1131   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1132   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1133   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1134   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1136   send_response(sock, RESP_OK, "Statistics follow\n");
1138   return (0);
1139 } /* }}} int handle_request_stats */
1141 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1142     char *buffer, size_t buffer_size)
1144   char *file;
1145   int status;
1147   status = buffer_get_field (&buffer, &buffer_size, &file);
1148   if (status != 0)
1149   {
1150     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1151   }
1152   else
1153   {
1154     pthread_mutex_lock(&stats_lock);
1155     stats_flush_received++;
1156     pthread_mutex_unlock(&stats_lock);
1158     if (!check_file_access(file, sock)) return 0;
1160     status = flush_file (file);
1161     if (status == 0)
1162       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1163     else if (status == ENOENT)
1164     {
1165       /* no file in our tree; see whether it exists at all */
1166       struct stat statbuf;
1168       memset(&statbuf, 0, sizeof(statbuf));
1169       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1170         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1171       else
1172         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1173     }
1174     else if (status < 0)
1175       return send_response(sock, RESP_ERR, "Internal error.\n");
1176     else
1177       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1178   }
1180   /* NOTREACHED */
1181   assert(1==0);
1182 } /* }}} int handle_request_slurp */
1184 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1186   int status;
1188   status = has_privilege(sock, PRIV_HIGH);
1189   if (status <= 0)
1190     return status;
1192   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1194   pthread_mutex_lock(&cache_lock);
1195   flush_old_values(-1);
1196   pthread_mutex_unlock(&cache_lock);
1198   return send_response(sock, RESP_OK, "Started flush.\n");
1199 } /* }}} static int handle_request_flushall */
1201 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1202     char *buffer, size_t buffer_size)
1204   char *file;
1205   int values_num = 0;
1206   int status;
1207   char orig_buf[CMD_MAX];
1209   time_t now;
1210   cache_item_t *ci;
1212   now = time (NULL);
1214   status = has_privilege(sock, PRIV_HIGH);
1215   if (status <= 0)
1216     return status;
1218   /* save it for the journal later */
1219   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1221   status = buffer_get_field (&buffer, &buffer_size, &file);
1222   if (status != 0)
1223     return send_response(sock, RESP_ERR,
1224                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1226   pthread_mutex_lock(&stats_lock);
1227   stats_updates_received++;
1228   pthread_mutex_unlock(&stats_lock);
1230   if (!check_file_access(file, sock)) return 0;
1232   pthread_mutex_lock (&cache_lock);
1233   ci = g_tree_lookup (cache_tree, file);
1235   if (ci == NULL) /* {{{ */
1236   {
1237     struct stat statbuf;
1239     /* don't hold the lock while we setup; stat(2) might block */
1240     pthread_mutex_unlock(&cache_lock);
1242     memset (&statbuf, 0, sizeof (statbuf));
1243     status = stat (file, &statbuf);
1244     if (status != 0)
1245     {
1246       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1248       status = errno;
1249       if (status == ENOENT)
1250         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1251       else
1252         return send_response(sock, RESP_ERR,
1253                              "stat failed with error %i.\n", status);
1254     }
1255     if (!S_ISREG (statbuf.st_mode))
1256       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1258     if (access(file, R_OK|W_OK) != 0)
1259       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1260                            file, rrd_strerror(errno));
1262     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1263     if (ci == NULL)
1264     {
1265       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1267       return send_response(sock, RESP_ERR, "malloc failed.\n");
1268     }
1269     memset (ci, 0, sizeof (cache_item_t));
1271     ci->file = strdup (file);
1272     if (ci->file == NULL)
1273     {
1274       free (ci);
1275       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1277       return send_response(sock, RESP_ERR, "strdup failed.\n");
1278     }
1280     wipe_ci_values(ci, now);
1281     ci->flags = CI_FLAGS_IN_TREE;
1283     pthread_mutex_lock(&cache_lock);
1284     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1285   } /* }}} */
1286   assert (ci != NULL);
1288   /* don't re-write updates in replay mode */
1289   if (sock != NULL)
1290     journal_write("update", orig_buf);
1292   while (buffer_size > 0)
1293   {
1294     char **temp;
1295     char *value;
1297     status = buffer_get_field (&buffer, &buffer_size, &value);
1298     if (status != 0)
1299     {
1300       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1301       break;
1302     }
1304     temp = (char **) realloc (ci->values,
1305         sizeof (char *) * (ci->values_num + 1));
1306     if (temp == NULL)
1307     {
1308       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1309       continue;
1310     }
1311     ci->values = temp;
1313     ci->values[ci->values_num] = strdup (value);
1314     if (ci->values[ci->values_num] == NULL)
1315     {
1316       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1317       continue;
1318     }
1319     ci->values_num++;
1321     values_num++;
1322   }
1324   if (((now - ci->last_flush_time) >= config_write_interval)
1325       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1326       && (ci->values_num > 0))
1327   {
1328     enqueue_cache_item (ci, TAIL);
1329   }
1331   pthread_mutex_unlock (&cache_lock);
1333   if (values_num < 1)
1334     return send_response(sock, RESP_ERR, "No values updated.\n");
1335   else
1336     return send_response(sock, RESP_OK, "Enqueued %i value(s).\n", values_num);
1338   /* NOTREACHED */
1339   assert(1==0);
1341 } /* }}} int handle_request_update */
1343 /* we came across a "WROTE" entry during journal replay.
1344  * throw away any values that we have accumulated for this file
1345  */
1346 static int handle_request_wrote (const char *buffer) /* {{{ */
1348   int i;
1349   cache_item_t *ci;
1350   const char *file = buffer;
1352   pthread_mutex_lock(&cache_lock);
1354   ci = g_tree_lookup(cache_tree, file);
1355   if (ci == NULL)
1356   {
1357     pthread_mutex_unlock(&cache_lock);
1358     return (0);
1359   }
1361   if (ci->values)
1362   {
1363     for (i=0; i < ci->values_num; i++)
1364       free(ci->values[i]);
1366     free(ci->values);
1367   }
1369   wipe_ci_values(ci, time(NULL));
1370   remove_from_queue(ci);
1372   pthread_mutex_unlock(&cache_lock);
1373   return (0);
1374 } /* }}} int handle_request_wrote */
1376 /* start "BATCH" processing */
1377 static int batch_start (listen_socket_t *sock) /* {{{ */
1379   int status;
1380   if (sock->batch_mode)
1381     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1383   status = send_response(sock, RESP_OK,
1384                          "Go ahead.  End with dot '.' on its own line.\n");
1385   sock->batch_mode = 1;
1386   sock->batch_cmd = 0;
1388   return status;
1389 } /* }}} static int batch_start */
1391 /* finish "BATCH" processing and return results to the client */
1392 static int batch_done (listen_socket_t *sock) /* {{{ */
1394   assert(sock->batch_mode);
1395   sock->batch_mode = 0;
1396   sock->batch_cmd  = 0;
1397   return send_response(sock, RESP_OK, "errors\n");
1398 } /* }}} static int batch_done */
1400 /* if sock==NULL, we are in journal replay mode */
1401 static int handle_request (listen_socket_t *sock, /* {{{ */
1402                            char *buffer, size_t buffer_size)
1404   char *buffer_ptr;
1405   char *command;
1406   int status;
1408   assert (buffer[buffer_size - 1] == '\0');
1410   buffer_ptr = buffer;
1411   command = NULL;
1412   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1413   if (status != 0)
1414   {
1415     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1416     return (-1);
1417   }
1419   if (sock != NULL && sock->batch_mode)
1420     sock->batch_cmd++;
1422   if (strcasecmp (command, "update") == 0)
1423     return (handle_request_update (sock, buffer_ptr, buffer_size));
1424   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1425   {
1426     /* this is only valid in replay mode */
1427     return (handle_request_wrote (buffer_ptr));
1428   }
1429   else if (strcasecmp (command, "flush") == 0)
1430     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1431   else if (strcasecmp (command, "flushall") == 0)
1432     return (handle_request_flushall(sock));
1433   else if (strcasecmp (command, "stats") == 0)
1434     return (handle_request_stats (sock));
1435   else if (strcasecmp (command, "help") == 0)
1436     return (handle_request_help (sock, buffer_ptr, buffer_size));
1437   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1438     return batch_start(sock);
1439   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_mode)
1440     return batch_done(sock);
1441   else
1442     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1444   /* NOTREACHED */
1445   assert(1==0);
1446 } /* }}} int handle_request */
1448 /* MUST NOT hold journal_lock before calling this */
1449 static void journal_rotate(void) /* {{{ */
1451   FILE *old_fh = NULL;
1452   int new_fd;
1454   if (journal_cur == NULL || journal_old == NULL)
1455     return;
1457   pthread_mutex_lock(&journal_lock);
1459   /* we rotate this way (rename before close) so that the we can release
1460    * the journal lock as fast as possible.  Journal writes to the new
1461    * journal can proceed immediately after the new file is opened.  The
1462    * fclose can then block without affecting new updates.
1463    */
1464   if (journal_fh != NULL)
1465   {
1466     old_fh = journal_fh;
1467     journal_fh = NULL;
1468     rename(journal_cur, journal_old);
1469     ++stats_journal_rotate;
1470   }
1472   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1473                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1474   if (new_fd >= 0)
1475   {
1476     journal_fh = fdopen(new_fd, "a");
1477     if (journal_fh == NULL)
1478       close(new_fd);
1479   }
1481   pthread_mutex_unlock(&journal_lock);
1483   if (old_fh != NULL)
1484     fclose(old_fh);
1486   if (journal_fh == NULL)
1487   {
1488     RRDD_LOG(LOG_CRIT,
1489              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1490              journal_cur, rrd_strerror(errno));
1492     RRDD_LOG(LOG_ERR,
1493              "JOURNALING DISABLED: All values will be flushed at shutdown");
1494     config_flush_at_shutdown = 1;
1495   }
1497 } /* }}} static void journal_rotate */
1499 static void journal_done(void) /* {{{ */
1501   if (journal_cur == NULL)
1502     return;
1504   pthread_mutex_lock(&journal_lock);
1505   if (journal_fh != NULL)
1506   {
1507     fclose(journal_fh);
1508     journal_fh = NULL;
1509   }
1511   if (config_flush_at_shutdown)
1512   {
1513     RRDD_LOG(LOG_INFO, "removing journals");
1514     unlink(journal_old);
1515     unlink(journal_cur);
1516   }
1517   else
1518   {
1519     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1520              "journals will be used at next startup");
1521   }
1523   pthread_mutex_unlock(&journal_lock);
1525 } /* }}} static void journal_done */
1527 static int journal_write(char *cmd, char *args) /* {{{ */
1529   int chars;
1531   if (journal_fh == NULL)
1532     return 0;
1534   pthread_mutex_lock(&journal_lock);
1535   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1536   pthread_mutex_unlock(&journal_lock);
1538   if (chars > 0)
1539   {
1540     pthread_mutex_lock(&stats_lock);
1541     stats_journal_bytes += chars;
1542     pthread_mutex_unlock(&stats_lock);
1543   }
1545   return chars;
1546 } /* }}} static int journal_write */
1548 static int journal_replay (const char *file) /* {{{ */
1550   FILE *fh;
1551   int entry_cnt = 0;
1552   int fail_cnt = 0;
1553   uint64_t line = 0;
1554   char entry[CMD_MAX];
1556   if (file == NULL) return 0;
1558   {
1559     char *reason;
1560     int status = 0;
1561     struct stat statbuf;
1563     memset(&statbuf, 0, sizeof(statbuf));
1564     if (stat(file, &statbuf) != 0)
1565     {
1566       if (errno == ENOENT)
1567         return 0;
1569       reason = "stat error";
1570       status = errno;
1571     }
1572     else if (!S_ISREG(statbuf.st_mode))
1573     {
1574       reason = "not a regular file";
1575       status = EPERM;
1576     }
1577     if (statbuf.st_uid != daemon_uid)
1578     {
1579       reason = "not owned by daemon user";
1580       status = EACCES;
1581     }
1582     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1583     {
1584       reason = "must not be user/group writable";
1585       status = EACCES;
1586     }
1588     if (status != 0)
1589     {
1590       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1591                file, rrd_strerror(status), reason);
1592       return 0;
1593     }
1594   }
1596   fh = fopen(file, "r");
1597   if (fh == NULL)
1598   {
1599     if (errno != ENOENT)
1600       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1601                file, rrd_strerror(errno));
1602     return 0;
1603   }
1604   else
1605     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1607   while(!feof(fh))
1608   {
1609     size_t entry_len;
1611     ++line;
1612     if (fgets(entry, sizeof(entry), fh) == NULL)
1613       break;
1614     entry_len = strlen(entry);
1616     /* check \n termination in case journal writing crashed mid-line */
1617     if (entry_len == 0)
1618       continue;
1619     else if (entry[entry_len - 1] != '\n')
1620     {
1621       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1622       ++fail_cnt;
1623       continue;
1624     }
1626     entry[entry_len - 1] = '\0';
1628     if (handle_request(NULL, entry, entry_len) == 0)
1629       ++entry_cnt;
1630     else
1631       ++fail_cnt;
1632   }
1634   fclose(fh);
1636   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1637            entry_cnt, fail_cnt);
1639   return entry_cnt > 0 ? 1 : 0;
1640 } /* }}} static int journal_replay */
1642 static void journal_init(void) /* {{{ */
1644   int had_journal = 0;
1646   if (journal_cur == NULL) return;
1648   pthread_mutex_lock(&journal_lock);
1650   RRDD_LOG(LOG_INFO, "checking for journal files");
1652   had_journal += journal_replay(journal_old);
1653   had_journal += journal_replay(journal_cur);
1655   /* it must have been a crash.  start a flush */
1656   if (had_journal && config_flush_at_shutdown)
1657     flush_old_values(-1);
1659   pthread_mutex_unlock(&journal_lock);
1660   journal_rotate();
1662   RRDD_LOG(LOG_INFO, "journal processing complete");
1664 } /* }}} static void journal_init */
1666 static void close_connection(listen_socket_t *sock)
1668   close(sock->fd) ;  sock->fd   = -1;
1669   free(sock->rbuf);  sock->rbuf = NULL;
1670   free(sock->wbuf);  sock->wbuf = NULL;
1672   free(sock);
1675 static void *connection_thread_main (void *args) /* {{{ */
1677   pthread_t self;
1678   listen_socket_t *sock;
1679   int i;
1680   int fd;
1682   sock = (listen_socket_t *) args;
1683   fd = sock->fd;
1685   /* init read buffers */
1686   sock->next_read = sock->next_cmd = 0;
1687   sock->rbuf = malloc(RBUF_SIZE);
1688   if (sock->rbuf == NULL)
1689   {
1690     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1691     close_connection(sock);
1692     return NULL;
1693   }
1695   pthread_mutex_lock (&connection_threads_lock);
1696   {
1697     pthread_t *temp;
1699     temp = (pthread_t *) realloc (connection_threads,
1700         sizeof (pthread_t) * (connection_threads_num + 1));
1701     if (temp == NULL)
1702     {
1703       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1704     }
1705     else
1706     {
1707       connection_threads = temp;
1708       connection_threads[connection_threads_num] = pthread_self ();
1709       connection_threads_num++;
1710     }
1711   }
1712   pthread_mutex_unlock (&connection_threads_lock);
1714   while (do_shutdown == 0)
1715   {
1716     char *cmd;
1717     ssize_t cmd_len;
1718     ssize_t rbytes;
1720     struct pollfd pollfd;
1721     int status;
1723     pollfd.fd = fd;
1724     pollfd.events = POLLIN | POLLPRI;
1725     pollfd.revents = 0;
1727     status = poll (&pollfd, 1, /* timeout = */ 500);
1728     if (do_shutdown)
1729       break;
1730     else if (status == 0) /* timeout */
1731       continue;
1732     else if (status < 0) /* error */
1733     {
1734       status = errno;
1735       if (status == EINTR)
1736         continue;
1737       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1738       continue;
1739     }
1741     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1742     {
1743       close_connection(sock);
1744       break;
1745     }
1746     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1747     {
1748       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1749           "poll(2) returned something unexpected: %#04hx",
1750           pollfd.revents);
1751       close_connection(sock);
1752       break;
1753     }
1755     rbytes = read(fd, sock->rbuf + sock->next_read,
1756                   RBUF_SIZE - sock->next_read);
1757     if (rbytes < 0)
1758     {
1759       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1760       break;
1761     }
1762     else if (rbytes == 0)
1763       break; /* eof */
1765     sock->next_read += rbytes;
1767     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1768     {
1769       status = handle_request (sock, cmd, cmd_len+1);
1770       if (status != 0)
1771         goto out_close;
1772     }
1773   }
1775 out_close:
1776   close_connection(sock);
1778   self = pthread_self ();
1779   /* Remove this thread from the connection threads list */
1780   pthread_mutex_lock (&connection_threads_lock);
1781   /* Find out own index in the array */
1782   for (i = 0; i < connection_threads_num; i++)
1783     if (pthread_equal (connection_threads[i], self) != 0)
1784       break;
1785   assert (i < connection_threads_num);
1787   /* Move the trailing threads forward. */
1788   if (i < (connection_threads_num - 1))
1789   {
1790     memmove (connection_threads + i,
1791         connection_threads + i + 1,
1792         sizeof (pthread_t) * (connection_threads_num - i - 1));
1793   }
1795   connection_threads_num--;
1796   pthread_mutex_unlock (&connection_threads_lock);
1798   return (NULL);
1799 } /* }}} void *connection_thread_main */
1801 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1803   int fd;
1804   struct sockaddr_un sa;
1805   listen_socket_t *temp;
1806   int status;
1807   const char *path;
1809   path = sock->addr;
1810   if (strncmp(path, "unix:", strlen("unix:")) == 0)
1811     path += strlen("unix:");
1813   temp = (listen_socket_t *) realloc (listen_fds,
1814       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1815   if (temp == NULL)
1816   {
1817     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1818     return (-1);
1819   }
1820   listen_fds = temp;
1821   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1823   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1824   if (fd < 0)
1825   {
1826     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1827     return (-1);
1828   }
1830   memset (&sa, 0, sizeof (sa));
1831   sa.sun_family = AF_UNIX;
1832   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1834   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1835   if (status != 0)
1836   {
1837     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1838     close (fd);
1839     unlink (path);
1840     return (-1);
1841   }
1843   status = listen (fd, /* backlog = */ 10);
1844   if (status != 0)
1845   {
1846     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1847     close (fd);
1848     unlink (path);
1849     return (-1);
1850   }
1852   listen_fds[listen_fds_num].fd = fd;
1853   listen_fds[listen_fds_num].family = PF_UNIX;
1854   strncpy(listen_fds[listen_fds_num].addr, path,
1855           sizeof (listen_fds[listen_fds_num].addr) - 1);
1856   listen_fds_num++;
1858   return (0);
1859 } /* }}} int open_listen_socket_unix */
1861 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
1863   struct addrinfo ai_hints;
1864   struct addrinfo *ai_res;
1865   struct addrinfo *ai_ptr;
1866   char addr_copy[NI_MAXHOST];
1867   char *addr;
1868   char *port;
1869   int status;
1871   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
1872   addr_copy[sizeof (addr_copy) - 1] = 0;
1873   addr = addr_copy;
1875   memset (&ai_hints, 0, sizeof (ai_hints));
1876   ai_hints.ai_flags = 0;
1877 #ifdef AI_ADDRCONFIG
1878   ai_hints.ai_flags |= AI_ADDRCONFIG;
1879 #endif
1880   ai_hints.ai_family = AF_UNSPEC;
1881   ai_hints.ai_socktype = SOCK_STREAM;
1883   port = NULL;
1884   if (*addr == '[') /* IPv6+port format */
1885   {
1886     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1887     addr++;
1889     port = strchr (addr, ']');
1890     if (port == NULL)
1891     {
1892       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
1893           sock->addr);
1894       return (-1);
1895     }
1896     *port = 0;
1897     port++;
1899     if (*port == ':')
1900       port++;
1901     else if (*port == 0)
1902       port = NULL;
1903     else
1904     {
1905       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
1906           port);
1907       return (-1);
1908     }
1909   } /* if (*addr = ']') */
1910   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1911   {
1912     port = rindex(addr, ':');
1913     if (port != NULL)
1914     {
1915       *port = 0;
1916       port++;
1917     }
1918   }
1919   ai_res = NULL;
1920   status = getaddrinfo (addr,
1921                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1922                         &ai_hints, &ai_res);
1923   if (status != 0)
1924   {
1925     RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
1926         "%s", addr, gai_strerror (status));
1927     return (-1);
1928   }
1930   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1931   {
1932     int fd;
1933     listen_socket_t *temp;
1934     int one = 1;
1936     temp = (listen_socket_t *) realloc (listen_fds,
1937         sizeof (listen_fds[0]) * (listen_fds_num + 1));
1938     if (temp == NULL)
1939     {
1940       RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
1941       continue;
1942     }
1943     listen_fds = temp;
1944     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1946     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1947     if (fd < 0)
1948     {
1949       RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
1950       continue;
1951     }
1953     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
1955     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1956     if (status != 0)
1957     {
1958       RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
1959       close (fd);
1960       continue;
1961     }
1963     status = listen (fd, /* backlog = */ 10);
1964     if (status != 0)
1965     {
1966       RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
1967       close (fd);
1968       return (-1);
1969     }
1971     listen_fds[listen_fds_num].fd = fd;
1972     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
1973     listen_fds_num++;
1974   } /* for (ai_ptr) */
1976   return (0);
1977 } /* }}} static int open_listen_socket_network */
1979 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
1981   assert(sock != NULL);
1982   assert(sock->addr != NULL);
1984   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
1985       || sock->addr[0] == '/')
1986     return (open_listen_socket_unix(sock));
1987   else
1988     return (open_listen_socket_network(sock));
1989 } /* }}} int open_listen_socket */
1991 static int close_listen_sockets (void) /* {{{ */
1993   size_t i;
1995   for (i = 0; i < listen_fds_num; i++)
1996   {
1997     close (listen_fds[i].fd);
1999     if (listen_fds[i].family == PF_UNIX)
2000       unlink(listen_fds[i].addr);
2001   }
2003   free (listen_fds);
2004   listen_fds = NULL;
2005   listen_fds_num = 0;
2007   return (0);
2008 } /* }}} int close_listen_sockets */
2010 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2012   struct pollfd *pollfds;
2013   int pollfds_num;
2014   int status;
2015   int i;
2017   for (i = 0; i < config_listen_address_list_len; i++)
2018     open_listen_socket (config_listen_address_list[i]);
2020   if (config_listen_address_list_len < 1)
2021   {
2022     listen_socket_t sock;
2023     memset(&sock, 0, sizeof(sock));
2024     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2025     open_listen_socket (&sock);
2026   }
2028   if (listen_fds_num < 1)
2029   {
2030     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
2031         "could be opened. Sorry.");
2032     return (NULL);
2033   }
2035   pollfds_num = listen_fds_num;
2036   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2037   if (pollfds == NULL)
2038   {
2039     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2040     return (NULL);
2041   }
2042   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2044   RRDD_LOG(LOG_INFO, "listening for connections");
2046   while (do_shutdown == 0)
2047   {
2048     assert (pollfds_num == ((int) listen_fds_num));
2049     for (i = 0; i < pollfds_num; i++)
2050     {
2051       pollfds[i].fd = listen_fds[i].fd;
2052       pollfds[i].events = POLLIN | POLLPRI;
2053       pollfds[i].revents = 0;
2054     }
2056     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2057     if (do_shutdown)
2058       break;
2059     else if (status == 0) /* timeout */
2060       continue;
2061     else if (status < 0) /* error */
2062     {
2063       status = errno;
2064       if (status != EINTR)
2065       {
2066         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2067       }
2068       continue;
2069     }
2071     for (i = 0; i < pollfds_num; i++)
2072     {
2073       listen_socket_t *client_sock;
2074       struct sockaddr_storage client_sa;
2075       socklen_t client_sa_size;
2076       pthread_t tid;
2077       pthread_attr_t attr;
2079       if (pollfds[i].revents == 0)
2080         continue;
2082       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2083       {
2084         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2085             "poll(2) returned something unexpected for listen FD #%i.",
2086             pollfds[i].fd);
2087         continue;
2088       }
2090       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2091       if (client_sock == NULL)
2092       {
2093         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2094         continue;
2095       }
2096       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2098       client_sa_size = sizeof (client_sa);
2099       client_sock->fd = accept (pollfds[i].fd,
2100           (struct sockaddr *) &client_sa, &client_sa_size);
2101       if (client_sock->fd < 0)
2102       {
2103         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2104         free(client_sock);
2105         continue;
2106       }
2108       pthread_attr_init (&attr);
2109       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2111       status = pthread_create (&tid, &attr, connection_thread_main,
2112                                client_sock);
2113       if (status != 0)
2114       {
2115         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2116         close_connection(client_sock);
2117         continue;
2118       }
2119     } /* for (pollfds_num) */
2120   } /* while (do_shutdown == 0) */
2122   RRDD_LOG(LOG_INFO, "starting shutdown");
2124   close_listen_sockets ();
2126   pthread_mutex_lock (&connection_threads_lock);
2127   while (connection_threads_num > 0)
2128   {
2129     pthread_t wait_for;
2131     wait_for = connection_threads[0];
2133     pthread_mutex_unlock (&connection_threads_lock);
2134     pthread_join (wait_for, /* retval = */ NULL);
2135     pthread_mutex_lock (&connection_threads_lock);
2136   }
2137   pthread_mutex_unlock (&connection_threads_lock);
2139   return (NULL);
2140 } /* }}} void *listen_thread_main */
2142 static int daemonize (void) /* {{{ */
2144   int status;
2145   int fd;
2146   char *base_dir;
2148   daemon_uid = geteuid();
2150   fd = open_pidfile();
2151   if (fd < 0) return fd;
2153   if (!stay_foreground)
2154   {
2155     pid_t child;
2157     child = fork ();
2158     if (child < 0)
2159     {
2160       fprintf (stderr, "daemonize: fork(2) failed.\n");
2161       return (-1);
2162     }
2163     else if (child > 0)
2164     {
2165       return (1);
2166     }
2168     /* Become session leader */
2169     setsid ();
2171     /* Open the first three file descriptors to /dev/null */
2172     close (2);
2173     close (1);
2174     close (0);
2176     open ("/dev/null", O_RDWR);
2177     dup (0);
2178     dup (0);
2179   } /* if (!stay_foreground) */
2181   /* Change into the /tmp directory. */
2182   base_dir = (config_base_dir != NULL)
2183     ? config_base_dir
2184     : "/tmp";
2185   status = chdir (base_dir);
2186   if (status != 0)
2187   {
2188     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2189     return (-1);
2190   }
2192   install_signal_handlers();
2194   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2195   RRDD_LOG(LOG_INFO, "starting up");
2197   cache_tree = g_tree_new ((GCompareFunc) strcmp);
2198   if (cache_tree == NULL)
2199   {
2200     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2201     return (-1);
2202   }
2204   status = write_pidfile (fd);
2205   return status;
2206 } /* }}} int daemonize */
2208 static int cleanup (void) /* {{{ */
2210   do_shutdown++;
2212   pthread_cond_signal (&cache_cond);
2213   pthread_join (queue_thread, /* return = */ NULL);
2215   remove_pidfile ();
2217   RRDD_LOG(LOG_INFO, "goodbye");
2218   closelog ();
2220   return (0);
2221 } /* }}} int cleanup */
2223 static int read_options (int argc, char **argv) /* {{{ */
2225   int option;
2226   int status = 0;
2228   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2229   {
2230     switch (option)
2231     {
2232       case 'g':
2233         stay_foreground=1;
2234         break;
2236       case 'L':
2237       case 'l':
2238       {
2239         listen_socket_t **temp;
2240         listen_socket_t *new;
2242         new = malloc(sizeof(listen_socket_t));
2243         if (new == NULL)
2244         {
2245           fprintf(stderr, "read_options: malloc failed.\n");
2246           return(2);
2247         }
2248         memset(new, 0, sizeof(listen_socket_t));
2250         temp = (listen_socket_t **) realloc (config_listen_address_list,
2251             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2252         if (temp == NULL)
2253         {
2254           fprintf (stderr, "read_options: realloc failed.\n");
2255           return (2);
2256         }
2257         config_listen_address_list = temp;
2259         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2260         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2262         temp[config_listen_address_list_len] = new;
2263         config_listen_address_list_len++;
2264       }
2265       break;
2267       case 'f':
2268       {
2269         int temp;
2271         temp = atoi (optarg);
2272         if (temp > 0)
2273           config_flush_interval = temp;
2274         else
2275         {
2276           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2277           status = 3;
2278         }
2279       }
2280       break;
2282       case 'w':
2283       {
2284         int temp;
2286         temp = atoi (optarg);
2287         if (temp > 0)
2288           config_write_interval = temp;
2289         else
2290         {
2291           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2292           status = 2;
2293         }
2294       }
2295       break;
2297       case 'z':
2298       {
2299         int temp;
2301         temp = atoi(optarg);
2302         if (temp > 0)
2303           config_write_jitter = temp;
2304         else
2305         {
2306           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2307           status = 2;
2308         }
2310         break;
2311       }
2313       case 'B':
2314         config_write_base_only = 1;
2315         break;
2317       case 'b':
2318       {
2319         size_t len;
2321         if (config_base_dir != NULL)
2322           free (config_base_dir);
2323         config_base_dir = strdup (optarg);
2324         if (config_base_dir == NULL)
2325         {
2326           fprintf (stderr, "read_options: strdup failed.\n");
2327           return (3);
2328         }
2330         len = strlen (config_base_dir);
2331         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2332         {
2333           config_base_dir[len - 1] = 0;
2334           len--;
2335         }
2337         if (len < 1)
2338         {
2339           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2340           return (4);
2341         }
2343         _config_base_dir_len = len;
2344       }
2345       break;
2347       case 'p':
2348       {
2349         if (config_pid_file != NULL)
2350           free (config_pid_file);
2351         config_pid_file = strdup (optarg);
2352         if (config_pid_file == NULL)
2353         {
2354           fprintf (stderr, "read_options: strdup failed.\n");
2355           return (3);
2356         }
2357       }
2358       break;
2360       case 'F':
2361         config_flush_at_shutdown = 1;
2362         break;
2364       case 'j':
2365       {
2366         struct stat statbuf;
2367         const char *dir = optarg;
2369         status = stat(dir, &statbuf);
2370         if (status != 0)
2371         {
2372           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2373           return 6;
2374         }
2376         if (!S_ISDIR(statbuf.st_mode)
2377             || access(dir, R_OK|W_OK|X_OK) != 0)
2378         {
2379           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2380                   errno ? rrd_strerror(errno) : "");
2381           return 6;
2382         }
2384         journal_cur = malloc(PATH_MAX + 1);
2385         journal_old = malloc(PATH_MAX + 1);
2386         if (journal_cur == NULL || journal_old == NULL)
2387         {
2388           fprintf(stderr, "malloc failure for journal files\n");
2389           return 6;
2390         }
2391         else 
2392         {
2393           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2394           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2395         }
2396       }
2397       break;
2399       case 'h':
2400       case '?':
2401         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2402             "\n"
2403             "Usage: rrdcached [options]\n"
2404             "\n"
2405             "Valid options are:\n"
2406             "  -l <address>  Socket address to listen to.\n"
2407             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2408             "  -w <seconds>  Interval in which to write data.\n"
2409             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2410             "  -f <seconds>  Interval in which to flush dead data.\n"
2411             "  -p <file>     Location of the PID-file.\n"
2412             "  -b <dir>      Base directory to change to.\n"
2413             "  -B            Restrict file access to paths within -b <dir>\n"
2414             "  -g            Do not fork and run in the foreground.\n"
2415             "  -j <dir>      Directory in which to create the journal files.\n"
2416             "  -F            Always flush all updates at shutdown\n"
2417             "\n"
2418             "For more information and a detailed description of all options "
2419             "please refer\n"
2420             "to the rrdcached(1) manual page.\n",
2421             VERSION);
2422         status = -1;
2423         break;
2424     } /* switch (option) */
2425   } /* while (getopt) */
2427   /* advise the user when values are not sane */
2428   if (config_flush_interval < 2 * config_write_interval)
2429     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2430             " 2x write interval (-w) !\n");
2431   if (config_write_jitter > config_write_interval)
2432     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2433             " write interval (-w) !\n");
2435   if (config_write_base_only && config_base_dir == NULL)
2436     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2437             "  Consult the rrdcached documentation\n");
2439   if (journal_cur == NULL)
2440     config_flush_at_shutdown = 1;
2442   return (status);
2443 } /* }}} int read_options */
2445 int main (int argc, char **argv)
2447   int status;
2449   status = read_options (argc, argv);
2450   if (status != 0)
2451   {
2452     if (status < 0)
2453       status = 0;
2454     return (status);
2455   }
2457   status = daemonize ();
2458   if (status == 1)
2459   {
2460     struct sigaction sigchld;
2462     memset (&sigchld, 0, sizeof (sigchld));
2463     sigchld.sa_handler = SIG_IGN;
2464     sigaction (SIGCHLD, &sigchld, NULL);
2466     return (0);
2467   }
2468   else if (status != 0)
2469   {
2470     fprintf (stderr, "daemonize failed, exiting.\n");
2471     return (1);
2472   }
2474   journal_init();
2476   /* start the queue thread */
2477   memset (&queue_thread, 0, sizeof (queue_thread));
2478   status = pthread_create (&queue_thread,
2479                            NULL, /* attr */
2480                            queue_thread_main,
2481                            NULL); /* args */
2482   if (status != 0)
2483   {
2484     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2485     cleanup();
2486     return (1);
2487   }
2489   listen_thread_main (NULL);
2490   cleanup ();
2492   return (0);
2493 } /* int main */
2495 /*
2496  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2497  */