Code

fixed bad folding marker (on handle_request_flush)
[rrdtool-all.git] / program / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
101 /*
102  * Types
103  */
104 typedef enum
106   PRIV_LOW,
107   PRIV_HIGH
108 } socket_privilege;
110 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
112 struct listen_socket_s
114   int fd;
115   char addr[PATH_MAX + 1];
116   int family;
117   socket_privilege privilege;
119   /* state for BATCH processing */
120   int batch_mode;
121   int batch_cmd;
123   /* buffered IO */
124   char *rbuf;
125   off_t next_cmd;
126   off_t next_read;
128   char *wbuf;
129   ssize_t wbuf_len;
130 };
131 typedef struct listen_socket_s listen_socket_t;
133 struct cache_item_s;
134 typedef struct cache_item_s cache_item_t;
135 struct cache_item_s
137   char *file;
138   char **values;
139   int values_num;
140   time_t last_flush_time;
141   time_t last_update_stamp;
142 #define CI_FLAGS_IN_TREE  (1<<0)
143 #define CI_FLAGS_IN_QUEUE (1<<1)
144   int flags;
145   pthread_cond_t  flushed;
146   cache_item_t *prev;
147   cache_item_t *next;
148 };
150 struct callback_flush_data_s
152   time_t now;
153   time_t abs_timeout;
154   char **keys;
155   size_t keys_num;
156 };
157 typedef struct callback_flush_data_s callback_flush_data_t;
159 enum queue_side_e
161   HEAD,
162   TAIL
163 };
164 typedef enum queue_side_e queue_side_t;
166 /* max length of socket command or response */
167 #define CMD_MAX 4096
168 #define RBUF_SIZE (CMD_MAX*2)
170 /*
171  * Variables
172  */
173 static int stay_foreground = 0;
174 static uid_t daemon_uid;
176 static listen_socket_t *listen_fds = NULL;
177 static size_t listen_fds_num = 0;
179 static int do_shutdown = 0;
181 static pthread_t queue_thread;
183 static pthread_t *connection_threads = NULL;
184 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
185 static int connection_threads_num = 0;
187 /* Cache stuff */
188 static GTree          *cache_tree = NULL;
189 static cache_item_t   *cache_queue_head = NULL;
190 static cache_item_t   *cache_queue_tail = NULL;
191 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
192 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
194 static int config_write_interval = 300;
195 static int config_write_jitter   = 0;
196 static int config_flush_interval = 3600;
197 static int config_flush_at_shutdown = 0;
198 static char *config_pid_file = NULL;
199 static char *config_base_dir = NULL;
200 static size_t _config_base_dir_len = 0;
201 static int config_write_base_only = 0;
203 static listen_socket_t **config_listen_address_list = NULL;
204 static int config_listen_address_list_len = 0;
206 static uint64_t stats_queue_length = 0;
207 static uint64_t stats_updates_received = 0;
208 static uint64_t stats_flush_received = 0;
209 static uint64_t stats_updates_written = 0;
210 static uint64_t stats_data_sets_written = 0;
211 static uint64_t stats_journal_bytes = 0;
212 static uint64_t stats_journal_rotate = 0;
213 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
215 /* Journaled updates */
216 static char *journal_cur = NULL;
217 static char *journal_old = NULL;
218 static FILE *journal_fh = NULL;
219 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
220 static int journal_write(char *cmd, char *args);
221 static void journal_done(void);
222 static void journal_rotate(void);
224 /* 
225  * Functions
226  */
227 static void sig_common (const char *sig) /* {{{ */
229   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
230   do_shutdown++;
231   pthread_cond_broadcast(&cache_cond);
232 } /* }}} void sig_common */
234 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
236   sig_common("INT");
237 } /* }}} void sig_int_handler */
239 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
241   sig_common("TERM");
242 } /* }}} void sig_term_handler */
244 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
246   config_flush_at_shutdown = 1;
247   sig_common("USR1");
248 } /* }}} void sig_usr1_handler */
250 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
252   config_flush_at_shutdown = 0;
253   sig_common("USR2");
254 } /* }}} void sig_usr2_handler */
256 static void install_signal_handlers(void) /* {{{ */
258   /* These structures are static, because `sigaction' behaves weird if the are
259    * overwritten.. */
260   static struct sigaction sa_int;
261   static struct sigaction sa_term;
262   static struct sigaction sa_pipe;
263   static struct sigaction sa_usr1;
264   static struct sigaction sa_usr2;
266   /* Install signal handlers */
267   memset (&sa_int, 0, sizeof (sa_int));
268   sa_int.sa_handler = sig_int_handler;
269   sigaction (SIGINT, &sa_int, NULL);
271   memset (&sa_term, 0, sizeof (sa_term));
272   sa_term.sa_handler = sig_term_handler;
273   sigaction (SIGTERM, &sa_term, NULL);
275   memset (&sa_pipe, 0, sizeof (sa_pipe));
276   sa_pipe.sa_handler = SIG_IGN;
277   sigaction (SIGPIPE, &sa_pipe, NULL);
279   memset (&sa_pipe, 0, sizeof (sa_usr1));
280   sa_usr1.sa_handler = sig_usr1_handler;
281   sigaction (SIGUSR1, &sa_usr1, NULL);
283   memset (&sa_usr2, 0, sizeof (sa_usr2));
284   sa_usr2.sa_handler = sig_usr2_handler;
285   sigaction (SIGUSR2, &sa_usr2, NULL);
287 } /* }}} void install_signal_handlers */
289 static int open_pidfile(void) /* {{{ */
291   int fd;
292   char *file;
294   file = (config_pid_file != NULL)
295     ? config_pid_file
296     : LOCALSTATEDIR "/run/rrdcached.pid";
298   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
299   if (fd < 0)
300     fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
301             file, rrd_strerror(errno));
303   return(fd);
304 } /* }}} static int open_pidfile */
306 static int write_pidfile (int fd) /* {{{ */
308   pid_t pid;
309   FILE *fh;
311   pid = getpid ();
313   fh = fdopen (fd, "w");
314   if (fh == NULL)
315   {
316     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
317     close(fd);
318     return (-1);
319   }
321   fprintf (fh, "%i\n", (int) pid);
322   fclose (fh);
324   return (0);
325 } /* }}} int write_pidfile */
327 static int remove_pidfile (void) /* {{{ */
329   char *file;
330   int status;
332   file = (config_pid_file != NULL)
333     ? config_pid_file
334     : LOCALSTATEDIR "/run/rrdcached.pid";
336   status = unlink (file);
337   if (status == 0)
338     return (0);
339   return (errno);
340 } /* }}} int remove_pidfile */
342 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
344   char *eol;
346   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
347                sock->next_read - sock->next_cmd);
349   if (eol == NULL)
350   {
351     /* no commands left, move remainder back to front of rbuf */
352     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
353             sock->next_read - sock->next_cmd);
354     sock->next_read -= sock->next_cmd;
355     sock->next_cmd = 0;
356     *len = 0;
357     return NULL;
358   }
359   else
360   {
361     char *cmd = sock->rbuf + sock->next_cmd;
362     *eol = '\0';
364     sock->next_cmd = eol - sock->rbuf + 1;
366     if (eol > sock->rbuf && *(eol-1) == '\r')
367       *(--eol) = '\0'; /* handle "\r\n" EOL */
369     *len = eol - cmd;
371     return cmd;
372   }
374   /* NOTREACHED */
375   assert(1==0);
378 /* add the characters directly to the write buffer */
379 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
381   char *new_buf;
383   assert(sock != NULL);
385   new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
386   if (new_buf == NULL)
387   {
388     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
389     return -1;
390   }
392   strncpy(new_buf + sock->wbuf_len, str, len + 1);
394   sock->wbuf = new_buf;
395   sock->wbuf_len += len;
397   return 0;
398 } /* }}} static int add_to_wbuf */
400 /* add the text to the "extra" info that's sent after the status line */
401 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
403   va_list argp;
404   char buffer[CMD_MAX];
405   int len;
407   if (sock == NULL) return 0; /* journal replay mode */
408   if (sock->batch_mode) return 0; /* no extra info returned when in BATCH */
410   va_start(argp, fmt);
411 #ifdef HAVE_VSNPRINTF
412   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
413 #else
414   len = vsprintf(buffer, fmt, argp);
415 #endif
416   va_end(argp);
417   if (len < 0)
418   {
419     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
420     return -1;
421   }
423   return add_to_wbuf(sock, buffer, len);
424 } /* }}} static int add_response_info */
426 static int count_lines(char *str) /* {{{ */
428   int lines = 0;
430   if (str != NULL)
431   {
432     while ((str = strchr(str, '\n')) != NULL)
433     {
434       ++lines;
435       ++str;
436     }
437   }
439   return lines;
440 } /* }}} static int count_lines */
442 /* send the response back to the user.
443  * returns 0 on success, -1 on error
444  * write buffer is always zeroed after this call */
445 static int send_response (listen_socket_t *sock, response_code rc,
446                           char *fmt, ...) /* {{{ */
448   va_list argp;
449   char buffer[CMD_MAX];
450   int lines;
451   ssize_t wrote;
452   int rclen, len;
454   if (sock == NULL) return rc;  /* journal replay mode */
456   if (sock->batch_mode)
457   {
458     if (rc == RESP_OK)
459       return rc; /* no response on success during BATCH */
460     lines = sock->batch_cmd;
461   }
462   else if (rc == RESP_OK)
463     lines = count_lines(sock->wbuf);
464   else
465     lines = -1;
467   rclen = sprintf(buffer, "%d ", lines);
468   va_start(argp, fmt);
469 #ifdef HAVE_VSNPRINTF
470   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
471 #else
472   len = vsprintf(buffer+rclen, fmt, argp);
473 #endif
474   va_end(argp);
475   if (len < 0)
476     return -1;
478   len += rclen;
480   /* append the result to the wbuf, don't write to the user */
481   if (sock->batch_mode)
482     return add_to_wbuf(sock, buffer, len);
484   /* first write must be complete */
485   if (len != write(sock->fd, buffer, len))
486   {
487     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
488     return -1;
489   }
491   if (sock->wbuf != NULL && rc == RESP_OK)
492   {
493     wrote = 0;
494     while (wrote < sock->wbuf_len)
495     {
496       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
497       if (wb <= 0)
498       {
499         RRDD_LOG(LOG_INFO, "send_response: could not write results");
500         return -1;
501       }
502       wrote += wb;
503     }
504   }
506   free(sock->wbuf); sock->wbuf = NULL;
507   sock->wbuf_len = 0;
509   return 0;
510 } /* }}} */
512 static void wipe_ci_values(cache_item_t *ci, time_t when)
514   ci->values = NULL;
515   ci->values_num = 0;
517   ci->last_flush_time = when;
518   if (config_write_jitter > 0)
519     ci->last_flush_time += (random() % config_write_jitter);
522 /* remove_from_queue
523  * remove a "cache_item_t" item from the queue.
524  * must hold 'cache_lock' when calling this
525  */
526 static void remove_from_queue(cache_item_t *ci) /* {{{ */
528   if (ci == NULL) return;
530   if (ci->prev == NULL)
531     cache_queue_head = ci->next; /* reset head */
532   else
533     ci->prev->next = ci->next;
535   if (ci->next == NULL)
536     cache_queue_tail = ci->prev; /* reset the tail */
537   else
538     ci->next->prev = ci->prev;
540   ci->next = ci->prev = NULL;
541   ci->flags &= ~CI_FLAGS_IN_QUEUE;
542 } /* }}} static void remove_from_queue */
544 /* remove an entry from the tree and free all its resources.
545  * must hold 'cache lock' while calling this.
546  * returns 0 on success, otherwise errno */
547 static int forget_file(const char *file)
549   cache_item_t *ci;
551   ci = g_tree_lookup(cache_tree, file);
552   if (ci == NULL)
553     return ENOENT;
555   g_tree_remove (cache_tree, file);
556   remove_from_queue(ci);
558   for (int i=0; i < ci->values_num; i++)
559     free(ci->values[i]);
561   free (ci->values);
562   free (ci->file);
564   /* in case anyone is waiting */
565   pthread_cond_broadcast(&ci->flushed);
567   free (ci);
569   return 0;
570 } /* }}} static int forget_file */
572 /*
573  * enqueue_cache_item:
574  * `cache_lock' must be acquired before calling this function!
575  */
576 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
577     queue_side_t side)
579   if (ci == NULL)
580     return (-1);
582   if (ci->values_num == 0)
583     return (0);
585   if (side == HEAD)
586   {
587     if (cache_queue_head == ci)
588       return 0;
590     /* remove from the double linked list */
591     if (ci->flags & CI_FLAGS_IN_QUEUE)
592       remove_from_queue(ci);
594     ci->prev = NULL;
595     ci->next = cache_queue_head;
596     if (ci->next != NULL)
597       ci->next->prev = ci;
598     cache_queue_head = ci;
600     if (cache_queue_tail == NULL)
601       cache_queue_tail = cache_queue_head;
602   }
603   else /* (side == TAIL) */
604   {
605     /* We don't move values back in the list.. */
606     if (ci->flags & CI_FLAGS_IN_QUEUE)
607       return (0);
609     assert (ci->next == NULL);
610     assert (ci->prev == NULL);
612     ci->prev = cache_queue_tail;
614     if (cache_queue_tail == NULL)
615       cache_queue_head = ci;
616     else
617       cache_queue_tail->next = ci;
619     cache_queue_tail = ci;
620   }
622   ci->flags |= CI_FLAGS_IN_QUEUE;
624   pthread_cond_broadcast(&cache_cond);
625   pthread_mutex_lock (&stats_lock);
626   stats_queue_length++;
627   pthread_mutex_unlock (&stats_lock);
629   return (0);
630 } /* }}} int enqueue_cache_item */
632 /*
633  * tree_callback_flush:
634  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
635  * while this is in progress.
636  */
637 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
638     gpointer data)
640   cache_item_t *ci;
641   callback_flush_data_t *cfd;
643   ci = (cache_item_t *) value;
644   cfd = (callback_flush_data_t *) data;
646   if ((ci->last_flush_time <= cfd->abs_timeout)
647       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
648       && (ci->values_num > 0))
649   {
650     enqueue_cache_item (ci, TAIL);
651   }
652   else if ((do_shutdown != 0)
653       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
654       && (ci->values_num > 0))
655   {
656     enqueue_cache_item (ci, TAIL);
657   }
658   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
659       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
660       && (ci->values_num <= 0))
661   {
662     char **temp;
664     temp = (char **) realloc (cfd->keys,
665         sizeof (char *) * (cfd->keys_num + 1));
666     if (temp == NULL)
667     {
668       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
669       return (FALSE);
670     }
671     cfd->keys = temp;
672     /* Make really sure this points to the _same_ place */
673     assert ((char *) key == ci->file);
674     cfd->keys[cfd->keys_num] = (char *) key;
675     cfd->keys_num++;
676   }
678   return (FALSE);
679 } /* }}} gboolean tree_callback_flush */
681 static int flush_old_values (int max_age)
683   callback_flush_data_t cfd;
684   size_t k;
686   memset (&cfd, 0, sizeof (cfd));
687   /* Pass the current time as user data so that we don't need to call
688    * `time' for each node. */
689   cfd.now = time (NULL);
690   cfd.keys = NULL;
691   cfd.keys_num = 0;
693   if (max_age > 0)
694     cfd.abs_timeout = cfd.now - max_age;
695   else
696     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
698   /* `tree_callback_flush' will return the keys of all values that haven't
699    * been touched in the last `config_flush_interval' seconds in `cfd'.
700    * The char*'s in this array point to the same memory as ci->file, so we
701    * don't need to free them separately. */
702   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
704   for (k = 0; k < cfd.keys_num; k++)
705   {
706     /* should never fail, since we have held the cache_lock
707      * the entire time */
708     assert( forget_file(cfd.keys[k]) == 0 );
709   }
711   if (cfd.keys != NULL)
712   {
713     free (cfd.keys);
714     cfd.keys = NULL;
715   }
717   return (0);
718 } /* int flush_old_values */
720 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
722   struct timeval now;
723   struct timespec next_flush;
724   int final_flush = 0; /* make sure we only flush once on shutdown */
726   gettimeofday (&now, NULL);
727   next_flush.tv_sec = now.tv_sec + config_flush_interval;
728   next_flush.tv_nsec = 1000 * now.tv_usec;
730   pthread_mutex_lock (&cache_lock);
731   while ((do_shutdown == 0) || (cache_queue_head != NULL))
732   {
733     cache_item_t *ci;
734     char *file;
735     char **values;
736     int values_num;
737     int status;
738     int i;
740     /* First, check if it's time to do the cache flush. */
741     gettimeofday (&now, NULL);
742     if ((now.tv_sec > next_flush.tv_sec)
743         || ((now.tv_sec == next_flush.tv_sec)
744           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
745     {
746       /* Flush all values that haven't been written in the last
747        * `config_write_interval' seconds. */
748       flush_old_values (config_write_interval);
750       /* Determine the time of the next cache flush. */
751       while (next_flush.tv_sec <= now.tv_sec)
752         next_flush.tv_sec += config_flush_interval;
754       /* unlock the cache while we rotate so we don't block incoming
755        * updates if the fsync() blocks on disk I/O */
756       pthread_mutex_unlock(&cache_lock);
757       journal_rotate();
758       pthread_mutex_lock(&cache_lock);
759     }
761     /* Now, check if there's something to store away. If not, wait until
762      * something comes in or it's time to do the cache flush.  if we are
763      * shutting down, do not wait around.  */
764     if (cache_queue_head == NULL && !do_shutdown)
765     {
766       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
767       if ((status != 0) && (status != ETIMEDOUT))
768       {
769         RRDD_LOG (LOG_ERR, "queue_thread_main: "
770             "pthread_cond_timedwait returned %i.", status);
771       }
772     }
774     /* We're about to shut down */
775     if (do_shutdown != 0 && !final_flush++)
776     {
777       if (config_flush_at_shutdown)
778         flush_old_values (-1); /* flush everything */
779       else
780         break;
781     }
783     /* Check if a value has arrived. This may be NULL if we timed out or there
784      * was an interrupt such as a signal. */
785     if (cache_queue_head == NULL)
786       continue;
788     ci = cache_queue_head;
790     /* copy the relevant parts */
791     file = strdup (ci->file);
792     if (file == NULL)
793     {
794       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
795       continue;
796     }
798     assert(ci->values != NULL);
799     assert(ci->values_num > 0);
801     values = ci->values;
802     values_num = ci->values_num;
804     wipe_ci_values(ci, time(NULL));
805     remove_from_queue(ci);
807     pthread_mutex_lock (&stats_lock);
808     assert (stats_queue_length > 0);
809     stats_queue_length--;
810     pthread_mutex_unlock (&stats_lock);
812     pthread_mutex_unlock (&cache_lock);
814     rrd_clear_error ();
815     status = rrd_update_r (file, NULL, values_num, (void *) values);
816     if (status != 0)
817     {
818       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
819           "rrd_update_r (%s) failed with status %i. (%s)",
820           file, status, rrd_get_error());
821     }
823     journal_write("wrote", file);
824     pthread_cond_broadcast(&ci->flushed);
826     for (i = 0; i < values_num; i++)
827       free (values[i]);
829     free(values);
830     free(file);
832     if (status == 0)
833     {
834       pthread_mutex_lock (&stats_lock);
835       stats_updates_written++;
836       stats_data_sets_written += values_num;
837       pthread_mutex_unlock (&stats_lock);
838     }
840     pthread_mutex_lock (&cache_lock);
842     /* We're about to shut down */
843     if (do_shutdown != 0 && !final_flush++)
844     {
845       if (config_flush_at_shutdown)
846           flush_old_values (-1); /* flush everything */
847       else
848         break;
849     }
850   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
851   pthread_mutex_unlock (&cache_lock);
853   if (config_flush_at_shutdown)
854   {
855     assert(cache_queue_head == NULL);
856     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
857   }
859   journal_done();
861   return (NULL);
862 } /* }}} void *queue_thread_main */
864 static int buffer_get_field (char **buffer_ret, /* {{{ */
865     size_t *buffer_size_ret, char **field_ret)
867   char *buffer;
868   size_t buffer_pos;
869   size_t buffer_size;
870   char *field;
871   size_t field_size;
872   int status;
874   buffer = *buffer_ret;
875   buffer_pos = 0;
876   buffer_size = *buffer_size_ret;
877   field = *buffer_ret;
878   field_size = 0;
880   if (buffer_size <= 0)
881     return (-1);
883   /* This is ensured by `handle_request'. */
884   assert (buffer[buffer_size - 1] == '\0');
886   status = -1;
887   while (buffer_pos < buffer_size)
888   {
889     /* Check for end-of-field or end-of-buffer */
890     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
891     {
892       field[field_size] = 0;
893       field_size++;
894       buffer_pos++;
895       status = 0;
896       break;
897     }
898     /* Handle escaped characters. */
899     else if (buffer[buffer_pos] == '\\')
900     {
901       if (buffer_pos >= (buffer_size - 1))
902         break;
903       buffer_pos++;
904       field[field_size] = buffer[buffer_pos];
905       field_size++;
906       buffer_pos++;
907     }
908     /* Normal operation */ 
909     else
910     {
911       field[field_size] = buffer[buffer_pos];
912       field_size++;
913       buffer_pos++;
914     }
915   } /* while (buffer_pos < buffer_size) */
917   if (status != 0)
918     return (status);
920   *buffer_ret = buffer + buffer_pos;
921   *buffer_size_ret = buffer_size - buffer_pos;
922   *field_ret = field;
924   return (0);
925 } /* }}} int buffer_get_field */
927 /* if we're restricting writes to the base directory,
928  * check whether the file falls within the dir
929  * returns 1 if OK, otherwise 0
930  */
931 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
933   assert(file != NULL);
935   if (!config_write_base_only
936       || sock == NULL /* journal replay */
937       || config_base_dir == NULL)
938     return 1;
940   if (strstr(file, "../") != NULL) goto err;
942   /* relative paths without "../" are ok */
943   if (*file != '/') return 1;
945   /* file must be of the format base + "/" + <1+ char filename> */
946   if (strlen(file) < _config_base_dir_len + 2) goto err;
947   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
948   if (*(file + _config_base_dir_len) != '/') goto err;
950   return 1;
952 err:
953   if (sock != NULL && sock->fd >= 0)
954     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
956   return 0;
957 } /* }}} static int check_file_access */
959 /* returns 1 if we have the required privilege level,
960  * otherwise issue an error to the user on sock */
961 static int has_privilege (listen_socket_t *sock, /* {{{ */
962                           socket_privilege priv)
964   if (sock == NULL) /* journal replay */
965     return 1;
967   if (sock->privilege >= priv)
968     return 1;
970   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
971 } /* }}} static int has_privilege */
973 static int flush_file (const char *filename) /* {{{ */
975   cache_item_t *ci;
977   pthread_mutex_lock (&cache_lock);
979   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
980   if (ci == NULL)
981   {
982     pthread_mutex_unlock (&cache_lock);
983     return (ENOENT);
984   }
986   if (ci->values_num > 0)
987   {
988     /* Enqueue at head */
989     enqueue_cache_item (ci, HEAD);
990     pthread_cond_wait(&ci->flushed, &cache_lock);
991   }
993   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
994    * may have been purged during our cond_wait() */
996   pthread_mutex_unlock(&cache_lock);
998   return (0);
999 } /* }}} int flush_file */
1001 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1002     char *buffer, size_t buffer_size)
1004   int status;
1005   char **help_text;
1006   char *command;
1008   char *help_help[2] =
1009   {
1010     "Command overview\n"
1011     ,
1012     "HELP [<command>]\n"
1013     "FLUSH <filename>\n"
1014     "FLUSHALL\n"
1015     "PENDING <filename>\n"
1016     "FORGET <filename>\n"
1017     "UPDATE <filename> <values> [<values> ...]\n"
1018     "BATCH\n"
1019     "STATS\n"
1020   };
1022   char *help_flush[2] =
1023   {
1024     "Help for FLUSH\n"
1025     ,
1026     "Usage: FLUSH <filename>\n"
1027     "\n"
1028     "Adds the given filename to the head of the update queue and returns\n"
1029     "after is has been dequeued.\n"
1030   };
1032   char *help_flushall[2] =
1033   {
1034     "Help for FLUSHALL\n"
1035     ,
1036     "Usage: FLUSHALL\n"
1037     "\n"
1038     "Triggers writing of all pending updates.  Returns immediately.\n"
1039   };
1041   char *help_pending[2] =
1042   {
1043     "Help for PENDING\n"
1044     ,
1045     "Usage: PENDING <filename>\n"
1046     "\n"
1047     "Shows any 'pending' updates for a file, in order.\n"
1048     "The updates shown have not yet been written to the underlying RRD file.\n"
1049   };
1051   char *help_forget[2] =
1052   {
1053     "Help for FORGET\n"
1054     ,
1055     "Usage: FORGET <filename>\n"
1056     "\n"
1057     "Removes the file completely from the cache.\n"
1058     "Any pending updates for the file will be lost.\n"
1059   };
1061   char *help_update[2] =
1062   {
1063     "Help for UPDATE\n"
1064     ,
1065     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1066     "\n"
1067     "Adds the given file to the internal cache if it is not yet known and\n"
1068     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1069     "for details.\n"
1070     "\n"
1071     "Each <values> has the following form:\n"
1072     "  <values> = <time>:<value>[:<value>[...]]\n"
1073     "See the rrdupdate(1) manpage for details.\n"
1074   };
1076   char *help_stats[2] =
1077   {
1078     "Help for STATS\n"
1079     ,
1080     "Usage: STATS\n"
1081     "\n"
1082     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1083     "a description of the values.\n"
1084   };
1086   char *help_batch[2] =
1087   {
1088     "Help for BATCH\n"
1089     ,
1090     "The 'BATCH' command permits the client to initiate a bulk load\n"
1091     "   of commands to rrdcached.\n"
1092     "\n"
1093     "Usage:\n"
1094     "\n"
1095     "    client: BATCH\n"
1096     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1097     "    client: command #1\n"
1098     "    client: command #2\n"
1099     "    client: ... and so on\n"
1100     "    client: .\n"
1101     "    server: 2 errors\n"
1102     "    server: 7 message for command #7\n"
1103     "    server: 9 message for command #9\n"
1104     "\n"
1105     "For more information, consult the rrdcached(1) documentation.\n"
1106   };
1108   status = buffer_get_field (&buffer, &buffer_size, &command);
1109   if (status != 0)
1110     help_text = help_help;
1111   else
1112   {
1113     if (strcasecmp (command, "update") == 0)
1114       help_text = help_update;
1115     else if (strcasecmp (command, "flush") == 0)
1116       help_text = help_flush;
1117     else if (strcasecmp (command, "flushall") == 0)
1118       help_text = help_flushall;
1119     else if (strcasecmp (command, "pending") == 0)
1120       help_text = help_pending;
1121     else if (strcasecmp (command, "forget") == 0)
1122       help_text = help_forget;
1123     else if (strcasecmp (command, "stats") == 0)
1124       help_text = help_stats;
1125     else if (strcasecmp (command, "batch") == 0)
1126       help_text = help_batch;
1127     else
1128       help_text = help_help;
1129   }
1131   add_response_info(sock, help_text[1]);
1132   return send_response(sock, RESP_OK, help_text[0]);
1133 } /* }}} int handle_request_help */
1135 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1137   uint64_t copy_queue_length;
1138   uint64_t copy_updates_received;
1139   uint64_t copy_flush_received;
1140   uint64_t copy_updates_written;
1141   uint64_t copy_data_sets_written;
1142   uint64_t copy_journal_bytes;
1143   uint64_t copy_journal_rotate;
1145   uint64_t tree_nodes_number;
1146   uint64_t tree_depth;
1148   pthread_mutex_lock (&stats_lock);
1149   copy_queue_length       = stats_queue_length;
1150   copy_updates_received   = stats_updates_received;
1151   copy_flush_received     = stats_flush_received;
1152   copy_updates_written    = stats_updates_written;
1153   copy_data_sets_written  = stats_data_sets_written;
1154   copy_journal_bytes      = stats_journal_bytes;
1155   copy_journal_rotate     = stats_journal_rotate;
1156   pthread_mutex_unlock (&stats_lock);
1158   pthread_mutex_lock (&cache_lock);
1159   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1160   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1161   pthread_mutex_unlock (&cache_lock);
1163   add_response_info(sock,
1164                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1165   add_response_info(sock,
1166                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1167   add_response_info(sock,
1168                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1169   add_response_info(sock,
1170                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1171   add_response_info(sock,
1172                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1173   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1174   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1175   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1176   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1178   send_response(sock, RESP_OK, "Statistics follow\n");
1180   return (0);
1181 } /* }}} int handle_request_stats */
1183 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1184     char *buffer, size_t buffer_size)
1186   char *file;
1187   int status;
1189   status = buffer_get_field (&buffer, &buffer_size, &file);
1190   if (status != 0)
1191   {
1192     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1193   }
1194   else
1195   {
1196     pthread_mutex_lock(&stats_lock);
1197     stats_flush_received++;
1198     pthread_mutex_unlock(&stats_lock);
1200     if (!check_file_access(file, sock)) return 0;
1202     status = flush_file (file);
1203     if (status == 0)
1204       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1205     else if (status == ENOENT)
1206     {
1207       /* no file in our tree; see whether it exists at all */
1208       struct stat statbuf;
1210       memset(&statbuf, 0, sizeof(statbuf));
1211       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1212         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1213       else
1214         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1215     }
1216     else if (status < 0)
1217       return send_response(sock, RESP_ERR, "Internal error.\n");
1218     else
1219       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1220   }
1222   /* NOTREACHED */
1223   assert(1==0);
1224 } /* }}} int handle_request_flush */
1226 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1228   int status;
1230   status = has_privilege(sock, PRIV_HIGH);
1231   if (status <= 0)
1232     return status;
1234   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1236   pthread_mutex_lock(&cache_lock);
1237   flush_old_values(-1);
1238   pthread_mutex_unlock(&cache_lock);
1240   return send_response(sock, RESP_OK, "Started flush.\n");
1241 } /* }}} static int handle_request_flushall */
1243 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1244                                   char *buffer, size_t buffer_size)
1246   int status;
1247   char *file;
1248   cache_item_t *ci;
1250   status = buffer_get_field(&buffer, &buffer_size, &file);
1251   if (status != 0)
1252     return send_response(sock, RESP_ERR,
1253                          "Usage: PENDING <filename>\n");
1255   status = has_privilege(sock, PRIV_HIGH);
1256   if (status <= 0)
1257     return status;
1259   pthread_mutex_lock(&cache_lock);
1260   ci = g_tree_lookup(cache_tree, file);
1261   if (ci == NULL)
1262   {
1263     pthread_mutex_unlock(&cache_lock);
1264     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1265   }
1267   for (int i=0; i < ci->values_num; i++)
1268     add_response_info(sock, "%s\n", ci->values[i]);
1270   pthread_mutex_unlock(&cache_lock);
1271   return send_response(sock, RESP_OK, "updates pending\n");
1272 } /* }}} static int handle_request_pending */
1274 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1275                                  char *buffer, size_t buffer_size)
1277   int status;
1278   char *file;
1280   status = buffer_get_field(&buffer, &buffer_size, &file);
1281   if (status != 0)
1282     return send_response(sock, RESP_ERR,
1283                          "Usage: FORGET <filename>\n");
1285   status = has_privilege(sock, PRIV_HIGH);
1286   if (status <= 0)
1287     return status;
1289   if (!check_file_access(file, sock)) return 0;
1291   pthread_mutex_lock(&cache_lock);
1292   status = forget_file(file);
1293   pthread_mutex_unlock(&cache_lock);
1295   if (status == 0)
1296   {
1297     if (sock != NULL)
1298       journal_write("forget", file);
1300     return send_response(sock, RESP_OK, "Gone!\n");
1301   }
1302   else
1303     return send_response(sock, RESP_ERR, "cannot forget: %s\n",
1304                          status < 0 ? "Internal error" : rrd_strerror(status));
1306   /* NOTREACHED */
1307   assert(1==0);
1308 } /* }}} static int handle_request_forget */
1310 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1311     char *buffer, size_t buffer_size)
1313   char *file;
1314   int values_num = 0;
1315   int bad_timestamps = 0;
1316   int status;
1317   char orig_buf[CMD_MAX];
1319   time_t now;
1320   cache_item_t *ci;
1322   now = time (NULL);
1324   status = has_privilege(sock, PRIV_HIGH);
1325   if (status <= 0)
1326     return status;
1328   /* save it for the journal later */
1329   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1331   status = buffer_get_field (&buffer, &buffer_size, &file);
1332   if (status != 0)
1333     return send_response(sock, RESP_ERR,
1334                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1336   pthread_mutex_lock(&stats_lock);
1337   stats_updates_received++;
1338   pthread_mutex_unlock(&stats_lock);
1340   if (!check_file_access(file, sock)) return 0;
1342   pthread_mutex_lock (&cache_lock);
1343   ci = g_tree_lookup (cache_tree, file);
1345   if (ci == NULL) /* {{{ */
1346   {
1347     struct stat statbuf;
1349     /* don't hold the lock while we setup; stat(2) might block */
1350     pthread_mutex_unlock(&cache_lock);
1352     memset (&statbuf, 0, sizeof (statbuf));
1353     status = stat (file, &statbuf);
1354     if (status != 0)
1355     {
1356       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1358       status = errno;
1359       if (status == ENOENT)
1360         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1361       else
1362         return send_response(sock, RESP_ERR,
1363                              "stat failed with error %i.\n", status);
1364     }
1365     if (!S_ISREG (statbuf.st_mode))
1366       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1368     if (access(file, R_OK|W_OK) != 0)
1369       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1370                            file, rrd_strerror(errno));
1372     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1373     if (ci == NULL)
1374     {
1375       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1377       return send_response(sock, RESP_ERR, "malloc failed.\n");
1378     }
1379     memset (ci, 0, sizeof (cache_item_t));
1381     ci->file = strdup (file);
1382     if (ci->file == NULL)
1383     {
1384       free (ci);
1385       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1387       return send_response(sock, RESP_ERR, "strdup failed.\n");
1388     }
1390     wipe_ci_values(ci, now);
1391     ci->flags = CI_FLAGS_IN_TREE;
1393     pthread_mutex_lock(&cache_lock);
1394     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1395   } /* }}} */
1396   assert (ci != NULL);
1398   /* don't re-write updates in replay mode */
1399   if (sock != NULL)
1400     journal_write("update", orig_buf);
1402   while (buffer_size > 0)
1403   {
1404     char **temp;
1405     char *value;
1406     time_t stamp;
1407     char *eostamp;
1409     status = buffer_get_field (&buffer, &buffer_size, &value);
1410     if (status != 0)
1411     {
1412       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1413       break;
1414     }
1416     /* make sure update time is always moving forward */
1417     stamp = strtol(value, &eostamp, 10);
1418     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1419     {
1420       ++bad_timestamps;
1421       add_response_info(sock, "Cannot find timestamp in '%s'!\n", value);
1422       continue;
1423     }
1424     else if (stamp <= ci->last_update_stamp)
1425     {
1426       ++bad_timestamps;
1427       add_response_info(sock,
1428                         "illegal attempt to update using time %ld when"
1429                         " last update time is %ld (minimum one second step)\n",
1430                         stamp, ci->last_update_stamp);
1431       continue;
1432     }
1433     else
1434       ci->last_update_stamp = stamp;
1436     temp = (char **) realloc (ci->values,
1437         sizeof (char *) * (ci->values_num + 1));
1438     if (temp == NULL)
1439     {
1440       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1441       continue;
1442     }
1443     ci->values = temp;
1445     ci->values[ci->values_num] = strdup (value);
1446     if (ci->values[ci->values_num] == NULL)
1447     {
1448       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1449       continue;
1450     }
1451     ci->values_num++;
1453     values_num++;
1454   }
1456   if (((now - ci->last_flush_time) >= config_write_interval)
1457       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1458       && (ci->values_num > 0))
1459   {
1460     enqueue_cache_item (ci, TAIL);
1461   }
1463   pthread_mutex_unlock (&cache_lock);
1465   if (values_num < 1)
1466   {
1467     /* if we had only one update attempt, then return the full
1468        error message... try to get the most information out
1469        of the limited error space allowed by the protocol
1470     */
1471     if (bad_timestamps == 1)
1472       return send_response(sock, RESP_ERR, "%s", sock->wbuf);
1473     else
1474       return send_response(sock, RESP_ERR,
1475                            "No values updated (%d bad timestamps).\n",
1476                            bad_timestamps);
1477   }
1478   else
1479     return send_response(sock, RESP_OK,
1480                          "errors, enqueued %i value(s).\n", values_num);
1482   /* NOTREACHED */
1483   assert(1==0);
1485 } /* }}} int handle_request_update */
1487 /* we came across a "WROTE" entry during journal replay.
1488  * throw away any values that we have accumulated for this file
1489  */
1490 static int handle_request_wrote (const char *buffer) /* {{{ */
1492   int i;
1493   cache_item_t *ci;
1494   const char *file = buffer;
1496   pthread_mutex_lock(&cache_lock);
1498   ci = g_tree_lookup(cache_tree, file);
1499   if (ci == NULL)
1500   {
1501     pthread_mutex_unlock(&cache_lock);
1502     return (0);
1503   }
1505   if (ci->values)
1506   {
1507     for (i=0; i < ci->values_num; i++)
1508       free(ci->values[i]);
1510     free(ci->values);
1511   }
1513   wipe_ci_values(ci, time(NULL));
1514   remove_from_queue(ci);
1516   pthread_mutex_unlock(&cache_lock);
1517   return (0);
1518 } /* }}} int handle_request_wrote */
1520 /* start "BATCH" processing */
1521 static int batch_start (listen_socket_t *sock) /* {{{ */
1523   int status;
1524   if (sock->batch_mode)
1525     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1527   status = send_response(sock, RESP_OK,
1528                          "Go ahead.  End with dot '.' on its own line.\n");
1529   sock->batch_mode = 1;
1530   sock->batch_cmd = 0;
1532   return status;
1533 } /* }}} static int batch_start */
1535 /* finish "BATCH" processing and return results to the client */
1536 static int batch_done (listen_socket_t *sock) /* {{{ */
1538   assert(sock->batch_mode);
1539   sock->batch_mode = 0;
1540   sock->batch_cmd  = 0;
1541   return send_response(sock, RESP_OK, "errors\n");
1542 } /* }}} static int batch_done */
1544 /* if sock==NULL, we are in journal replay mode */
1545 static int handle_request (listen_socket_t *sock, /* {{{ */
1546                            char *buffer, size_t buffer_size)
1548   char *buffer_ptr;
1549   char *command;
1550   int status;
1552   assert (buffer[buffer_size - 1] == '\0');
1554   buffer_ptr = buffer;
1555   command = NULL;
1556   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1557   if (status != 0)
1558   {
1559     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1560     return (-1);
1561   }
1563   if (sock != NULL && sock->batch_mode)
1564     sock->batch_cmd++;
1566   if (strcasecmp (command, "update") == 0)
1567     return (handle_request_update (sock, buffer_ptr, buffer_size));
1568   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1569   {
1570     /* this is only valid in replay mode */
1571     return (handle_request_wrote (buffer_ptr));
1572   }
1573   else if (strcasecmp (command, "flush") == 0)
1574     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1575   else if (strcasecmp (command, "flushall") == 0)
1576     return (handle_request_flushall(sock));
1577   else if (strcasecmp (command, "pending") == 0)
1578     return (handle_request_pending(sock, buffer_ptr, buffer_size));
1579   else if (strcasecmp (command, "forget") == 0)
1580     return (handle_request_forget(sock, buffer_ptr, buffer_size));
1581   else if (strcasecmp (command, "stats") == 0)
1582     return (handle_request_stats (sock));
1583   else if (strcasecmp (command, "help") == 0)
1584     return (handle_request_help (sock, buffer_ptr, buffer_size));
1585   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1586     return batch_start(sock);
1587   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_mode)
1588     return batch_done(sock);
1589   else
1590     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1592   /* NOTREACHED */
1593   assert(1==0);
1594 } /* }}} int handle_request */
1596 /* MUST NOT hold journal_lock before calling this */
1597 static void journal_rotate(void) /* {{{ */
1599   FILE *old_fh = NULL;
1600   int new_fd;
1602   if (journal_cur == NULL || journal_old == NULL)
1603     return;
1605   pthread_mutex_lock(&journal_lock);
1607   /* we rotate this way (rename before close) so that the we can release
1608    * the journal lock as fast as possible.  Journal writes to the new
1609    * journal can proceed immediately after the new file is opened.  The
1610    * fclose can then block without affecting new updates.
1611    */
1612   if (journal_fh != NULL)
1613   {
1614     old_fh = journal_fh;
1615     journal_fh = NULL;
1616     rename(journal_cur, journal_old);
1617     ++stats_journal_rotate;
1618   }
1620   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1621                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1622   if (new_fd >= 0)
1623   {
1624     journal_fh = fdopen(new_fd, "a");
1625     if (journal_fh == NULL)
1626       close(new_fd);
1627   }
1629   pthread_mutex_unlock(&journal_lock);
1631   if (old_fh != NULL)
1632     fclose(old_fh);
1634   if (journal_fh == NULL)
1635   {
1636     RRDD_LOG(LOG_CRIT,
1637              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1638              journal_cur, rrd_strerror(errno));
1640     RRDD_LOG(LOG_ERR,
1641              "JOURNALING DISABLED: All values will be flushed at shutdown");
1642     config_flush_at_shutdown = 1;
1643   }
1645 } /* }}} static void journal_rotate */
1647 static void journal_done(void) /* {{{ */
1649   if (journal_cur == NULL)
1650     return;
1652   pthread_mutex_lock(&journal_lock);
1653   if (journal_fh != NULL)
1654   {
1655     fclose(journal_fh);
1656     journal_fh = NULL;
1657   }
1659   if (config_flush_at_shutdown)
1660   {
1661     RRDD_LOG(LOG_INFO, "removing journals");
1662     unlink(journal_old);
1663     unlink(journal_cur);
1664   }
1665   else
1666   {
1667     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1668              "journals will be used at next startup");
1669   }
1671   pthread_mutex_unlock(&journal_lock);
1673 } /* }}} static void journal_done */
1675 static int journal_write(char *cmd, char *args) /* {{{ */
1677   int chars;
1679   if (journal_fh == NULL)
1680     return 0;
1682   pthread_mutex_lock(&journal_lock);
1683   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1684   pthread_mutex_unlock(&journal_lock);
1686   if (chars > 0)
1687   {
1688     pthread_mutex_lock(&stats_lock);
1689     stats_journal_bytes += chars;
1690     pthread_mutex_unlock(&stats_lock);
1691   }
1693   return chars;
1694 } /* }}} static int journal_write */
1696 static int journal_replay (const char *file) /* {{{ */
1698   FILE *fh;
1699   int entry_cnt = 0;
1700   int fail_cnt = 0;
1701   uint64_t line = 0;
1702   char entry[CMD_MAX];
1704   if (file == NULL) return 0;
1706   {
1707     char *reason;
1708     int status = 0;
1709     struct stat statbuf;
1711     memset(&statbuf, 0, sizeof(statbuf));
1712     if (stat(file, &statbuf) != 0)
1713     {
1714       if (errno == ENOENT)
1715         return 0;
1717       reason = "stat error";
1718       status = errno;
1719     }
1720     else if (!S_ISREG(statbuf.st_mode))
1721     {
1722       reason = "not a regular file";
1723       status = EPERM;
1724     }
1725     if (statbuf.st_uid != daemon_uid)
1726     {
1727       reason = "not owned by daemon user";
1728       status = EACCES;
1729     }
1730     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1731     {
1732       reason = "must not be user/group writable";
1733       status = EACCES;
1734     }
1736     if (status != 0)
1737     {
1738       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1739                file, rrd_strerror(status), reason);
1740       return 0;
1741     }
1742   }
1744   fh = fopen(file, "r");
1745   if (fh == NULL)
1746   {
1747     if (errno != ENOENT)
1748       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1749                file, rrd_strerror(errno));
1750     return 0;
1751   }
1752   else
1753     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1755   while(!feof(fh))
1756   {
1757     size_t entry_len;
1759     ++line;
1760     if (fgets(entry, sizeof(entry), fh) == NULL)
1761       break;
1762     entry_len = strlen(entry);
1764     /* check \n termination in case journal writing crashed mid-line */
1765     if (entry_len == 0)
1766       continue;
1767     else if (entry[entry_len - 1] != '\n')
1768     {
1769       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1770       ++fail_cnt;
1771       continue;
1772     }
1774     entry[entry_len - 1] = '\0';
1776     if (handle_request(NULL, entry, entry_len) == 0)
1777       ++entry_cnt;
1778     else
1779       ++fail_cnt;
1780   }
1782   fclose(fh);
1784   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1785            entry_cnt, fail_cnt);
1787   return entry_cnt > 0 ? 1 : 0;
1788 } /* }}} static int journal_replay */
1790 static void journal_init(void) /* {{{ */
1792   int had_journal = 0;
1794   if (journal_cur == NULL) return;
1796   pthread_mutex_lock(&journal_lock);
1798   RRDD_LOG(LOG_INFO, "checking for journal files");
1800   had_journal += journal_replay(journal_old);
1801   had_journal += journal_replay(journal_cur);
1803   /* it must have been a crash.  start a flush */
1804   if (had_journal && config_flush_at_shutdown)
1805     flush_old_values(-1);
1807   pthread_mutex_unlock(&journal_lock);
1808   journal_rotate();
1810   RRDD_LOG(LOG_INFO, "journal processing complete");
1812 } /* }}} static void journal_init */
1814 static void close_connection(listen_socket_t *sock)
1816   close(sock->fd) ;  sock->fd   = -1;
1817   free(sock->rbuf);  sock->rbuf = NULL;
1818   free(sock->wbuf);  sock->wbuf = NULL;
1820   free(sock);
1823 static void *connection_thread_main (void *args) /* {{{ */
1825   pthread_t self;
1826   listen_socket_t *sock;
1827   int i;
1828   int fd;
1830   sock = (listen_socket_t *) args;
1831   fd = sock->fd;
1833   /* init read buffers */
1834   sock->next_read = sock->next_cmd = 0;
1835   sock->rbuf = malloc(RBUF_SIZE);
1836   if (sock->rbuf == NULL)
1837   {
1838     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1839     close_connection(sock);
1840     return NULL;
1841   }
1843   pthread_mutex_lock (&connection_threads_lock);
1844   {
1845     pthread_t *temp;
1847     temp = (pthread_t *) realloc (connection_threads,
1848         sizeof (pthread_t) * (connection_threads_num + 1));
1849     if (temp == NULL)
1850     {
1851       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1852     }
1853     else
1854     {
1855       connection_threads = temp;
1856       connection_threads[connection_threads_num] = pthread_self ();
1857       connection_threads_num++;
1858     }
1859   }
1860   pthread_mutex_unlock (&connection_threads_lock);
1862   while (do_shutdown == 0)
1863   {
1864     char *cmd;
1865     ssize_t cmd_len;
1866     ssize_t rbytes;
1868     struct pollfd pollfd;
1869     int status;
1871     pollfd.fd = fd;
1872     pollfd.events = POLLIN | POLLPRI;
1873     pollfd.revents = 0;
1875     status = poll (&pollfd, 1, /* timeout = */ 500);
1876     if (do_shutdown)
1877       break;
1878     else if (status == 0) /* timeout */
1879       continue;
1880     else if (status < 0) /* error */
1881     {
1882       status = errno;
1883       if (status != EINTR)
1884         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1885       continue;
1886     }
1888     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1889       break;
1890     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1891     {
1892       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1893           "poll(2) returned something unexpected: %#04hx",
1894           pollfd.revents);
1895       break;
1896     }
1898     rbytes = read(fd, sock->rbuf + sock->next_read,
1899                   RBUF_SIZE - sock->next_read);
1900     if (rbytes < 0)
1901     {
1902       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1903       break;
1904     }
1905     else if (rbytes == 0)
1906       break; /* eof */
1908     sock->next_read += rbytes;
1910     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1911     {
1912       status = handle_request (sock, cmd, cmd_len+1);
1913       if (status != 0)
1914         goto out_close;
1915     }
1916   }
1918 out_close:
1919   close_connection(sock);
1921   self = pthread_self ();
1922   /* Remove this thread from the connection threads list */
1923   pthread_mutex_lock (&connection_threads_lock);
1924   /* Find out own index in the array */
1925   for (i = 0; i < connection_threads_num; i++)
1926     if (pthread_equal (connection_threads[i], self) != 0)
1927       break;
1928   assert (i < connection_threads_num);
1930   /* Move the trailing threads forward. */
1931   if (i < (connection_threads_num - 1))
1932   {
1933     memmove (connection_threads + i,
1934         connection_threads + i + 1,
1935         sizeof (pthread_t) * (connection_threads_num - i - 1));
1936   }
1938   connection_threads_num--;
1939   pthread_mutex_unlock (&connection_threads_lock);
1941   return (NULL);
1942 } /* }}} void *connection_thread_main */
1944 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1946   int fd;
1947   struct sockaddr_un sa;
1948   listen_socket_t *temp;
1949   int status;
1950   const char *path;
1952   path = sock->addr;
1953   if (strncmp(path, "unix:", strlen("unix:")) == 0)
1954     path += strlen("unix:");
1956   temp = (listen_socket_t *) realloc (listen_fds,
1957       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1958   if (temp == NULL)
1959   {
1960     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1961     return (-1);
1962   }
1963   listen_fds = temp;
1964   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1966   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1967   if (fd < 0)
1968   {
1969     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1970     return (-1);
1971   }
1973   memset (&sa, 0, sizeof (sa));
1974   sa.sun_family = AF_UNIX;
1975   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1977   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1978   if (status != 0)
1979   {
1980     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1981     close (fd);
1982     unlink (path);
1983     return (-1);
1984   }
1986   status = listen (fd, /* backlog = */ 10);
1987   if (status != 0)
1988   {
1989     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1990     close (fd);
1991     unlink (path);
1992     return (-1);
1993   }
1995   listen_fds[listen_fds_num].fd = fd;
1996   listen_fds[listen_fds_num].family = PF_UNIX;
1997   strncpy(listen_fds[listen_fds_num].addr, path,
1998           sizeof (listen_fds[listen_fds_num].addr) - 1);
1999   listen_fds_num++;
2001   return (0);
2002 } /* }}} int open_listen_socket_unix */
2004 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2006   struct addrinfo ai_hints;
2007   struct addrinfo *ai_res;
2008   struct addrinfo *ai_ptr;
2009   char addr_copy[NI_MAXHOST];
2010   char *addr;
2011   char *port;
2012   int status;
2014   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2015   addr_copy[sizeof (addr_copy) - 1] = 0;
2016   addr = addr_copy;
2018   memset (&ai_hints, 0, sizeof (ai_hints));
2019   ai_hints.ai_flags = 0;
2020 #ifdef AI_ADDRCONFIG
2021   ai_hints.ai_flags |= AI_ADDRCONFIG;
2022 #endif
2023   ai_hints.ai_family = AF_UNSPEC;
2024   ai_hints.ai_socktype = SOCK_STREAM;
2026   port = NULL;
2027   if (*addr == '[') /* IPv6+port format */
2028   {
2029     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2030     addr++;
2032     port = strchr (addr, ']');
2033     if (port == NULL)
2034     {
2035       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
2036           sock->addr);
2037       return (-1);
2038     }
2039     *port = 0;
2040     port++;
2042     if (*port == ':')
2043       port++;
2044     else if (*port == 0)
2045       port = NULL;
2046     else
2047     {
2048       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
2049           port);
2050       return (-1);
2051     }
2052   } /* if (*addr = ']') */
2053   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2054   {
2055     port = rindex(addr, ':');
2056     if (port != NULL)
2057     {
2058       *port = 0;
2059       port++;
2060     }
2061   }
2062   ai_res = NULL;
2063   status = getaddrinfo (addr,
2064                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2065                         &ai_hints, &ai_res);
2066   if (status != 0)
2067   {
2068     RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
2069         "%s", addr, gai_strerror (status));
2070     return (-1);
2071   }
2073   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2074   {
2075     int fd;
2076     listen_socket_t *temp;
2077     int one = 1;
2079     temp = (listen_socket_t *) realloc (listen_fds,
2080         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2081     if (temp == NULL)
2082     {
2083       RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
2084       continue;
2085     }
2086     listen_fds = temp;
2087     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2089     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2090     if (fd < 0)
2091     {
2092       RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
2093       continue;
2094     }
2096     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2098     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2099     if (status != 0)
2100     {
2101       RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
2102       close (fd);
2103       continue;
2104     }
2106     status = listen (fd, /* backlog = */ 10);
2107     if (status != 0)
2108     {
2109       RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
2110       close (fd);
2111       return (-1);
2112     }
2114     listen_fds[listen_fds_num].fd = fd;
2115     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2116     listen_fds_num++;
2117   } /* for (ai_ptr) */
2119   return (0);
2120 } /* }}} static int open_listen_socket_network */
2122 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2124   assert(sock != NULL);
2125   assert(sock->addr != NULL);
2127   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2128       || sock->addr[0] == '/')
2129     return (open_listen_socket_unix(sock));
2130   else
2131     return (open_listen_socket_network(sock));
2132 } /* }}} int open_listen_socket */
2134 static int close_listen_sockets (void) /* {{{ */
2136   size_t i;
2138   for (i = 0; i < listen_fds_num; i++)
2139   {
2140     close (listen_fds[i].fd);
2142     if (listen_fds[i].family == PF_UNIX)
2143       unlink(listen_fds[i].addr);
2144   }
2146   free (listen_fds);
2147   listen_fds = NULL;
2148   listen_fds_num = 0;
2150   return (0);
2151 } /* }}} int close_listen_sockets */
2153 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2155   struct pollfd *pollfds;
2156   int pollfds_num;
2157   int status;
2158   int i;
2160   for (i = 0; i < config_listen_address_list_len; i++)
2161     open_listen_socket (config_listen_address_list[i]);
2163   if (config_listen_address_list_len < 1)
2164   {
2165     listen_socket_t sock;
2166     memset(&sock, 0, sizeof(sock));
2167     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2168     open_listen_socket (&sock);
2169   }
2171   if (listen_fds_num < 1)
2172   {
2173     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
2174         "could be opened. Sorry.");
2175     return (NULL);
2176   }
2178   pollfds_num = listen_fds_num;
2179   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2180   if (pollfds == NULL)
2181   {
2182     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2183     return (NULL);
2184   }
2185   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2187   RRDD_LOG(LOG_INFO, "listening for connections");
2189   while (do_shutdown == 0)
2190   {
2191     assert (pollfds_num == ((int) listen_fds_num));
2192     for (i = 0; i < pollfds_num; i++)
2193     {
2194       pollfds[i].fd = listen_fds[i].fd;
2195       pollfds[i].events = POLLIN | POLLPRI;
2196       pollfds[i].revents = 0;
2197     }
2199     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2200     if (do_shutdown)
2201       break;
2202     else if (status == 0) /* timeout */
2203       continue;
2204     else if (status < 0) /* error */
2205     {
2206       status = errno;
2207       if (status != EINTR)
2208       {
2209         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2210       }
2211       continue;
2212     }
2214     for (i = 0; i < pollfds_num; i++)
2215     {
2216       listen_socket_t *client_sock;
2217       struct sockaddr_storage client_sa;
2218       socklen_t client_sa_size;
2219       pthread_t tid;
2220       pthread_attr_t attr;
2222       if (pollfds[i].revents == 0)
2223         continue;
2225       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2226       {
2227         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2228             "poll(2) returned something unexpected for listen FD #%i.",
2229             pollfds[i].fd);
2230         continue;
2231       }
2233       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2234       if (client_sock == NULL)
2235       {
2236         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2237         continue;
2238       }
2239       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2241       client_sa_size = sizeof (client_sa);
2242       client_sock->fd = accept (pollfds[i].fd,
2243           (struct sockaddr *) &client_sa, &client_sa_size);
2244       if (client_sock->fd < 0)
2245       {
2246         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2247         free(client_sock);
2248         continue;
2249       }
2251       pthread_attr_init (&attr);
2252       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2254       status = pthread_create (&tid, &attr, connection_thread_main,
2255                                client_sock);
2256       if (status != 0)
2257       {
2258         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2259         close_connection(client_sock);
2260         continue;
2261       }
2262     } /* for (pollfds_num) */
2263   } /* while (do_shutdown == 0) */
2265   RRDD_LOG(LOG_INFO, "starting shutdown");
2267   close_listen_sockets ();
2269   pthread_mutex_lock (&connection_threads_lock);
2270   while (connection_threads_num > 0)
2271   {
2272     pthread_t wait_for;
2274     wait_for = connection_threads[0];
2276     pthread_mutex_unlock (&connection_threads_lock);
2277     pthread_join (wait_for, /* retval = */ NULL);
2278     pthread_mutex_lock (&connection_threads_lock);
2279   }
2280   pthread_mutex_unlock (&connection_threads_lock);
2282   return (NULL);
2283 } /* }}} void *listen_thread_main */
2285 static int daemonize (void) /* {{{ */
2287   int status;
2288   int fd;
2289   char *base_dir;
2291   daemon_uid = geteuid();
2293   fd = open_pidfile();
2294   if (fd < 0) return fd;
2296   if (!stay_foreground)
2297   {
2298     pid_t child;
2300     child = fork ();
2301     if (child < 0)
2302     {
2303       fprintf (stderr, "daemonize: fork(2) failed.\n");
2304       return (-1);
2305     }
2306     else if (child > 0)
2307     {
2308       return (1);
2309     }
2311     /* Become session leader */
2312     setsid ();
2314     /* Open the first three file descriptors to /dev/null */
2315     close (2);
2316     close (1);
2317     close (0);
2319     open ("/dev/null", O_RDWR);
2320     dup (0);
2321     dup (0);
2322   } /* if (!stay_foreground) */
2324   /* Change into the /tmp directory. */
2325   base_dir = (config_base_dir != NULL)
2326     ? config_base_dir
2327     : "/tmp";
2328   status = chdir (base_dir);
2329   if (status != 0)
2330   {
2331     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2332     return (-1);
2333   }
2335   install_signal_handlers();
2337   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2338   RRDD_LOG(LOG_INFO, "starting up");
2340   cache_tree = g_tree_new ((GCompareFunc) strcmp);
2341   if (cache_tree == NULL)
2342   {
2343     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2344     return (-1);
2345   }
2347   status = write_pidfile (fd);
2348   return status;
2349 } /* }}} int daemonize */
2351 static int cleanup (void) /* {{{ */
2353   do_shutdown++;
2355   pthread_cond_signal (&cache_cond);
2356   pthread_join (queue_thread, /* return = */ NULL);
2358   remove_pidfile ();
2360   RRDD_LOG(LOG_INFO, "goodbye");
2361   closelog ();
2363   return (0);
2364 } /* }}} int cleanup */
2366 static int read_options (int argc, char **argv) /* {{{ */
2368   int option;
2369   int status = 0;
2371   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2372   {
2373     switch (option)
2374     {
2375       case 'g':
2376         stay_foreground=1;
2377         break;
2379       case 'L':
2380       case 'l':
2381       {
2382         listen_socket_t **temp;
2383         listen_socket_t *new;
2385         new = malloc(sizeof(listen_socket_t));
2386         if (new == NULL)
2387         {
2388           fprintf(stderr, "read_options: malloc failed.\n");
2389           return(2);
2390         }
2391         memset(new, 0, sizeof(listen_socket_t));
2393         temp = (listen_socket_t **) realloc (config_listen_address_list,
2394             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2395         if (temp == NULL)
2396         {
2397           fprintf (stderr, "read_options: realloc failed.\n");
2398           return (2);
2399         }
2400         config_listen_address_list = temp;
2402         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2403         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2405         temp[config_listen_address_list_len] = new;
2406         config_listen_address_list_len++;
2407       }
2408       break;
2410       case 'f':
2411       {
2412         int temp;
2414         temp = atoi (optarg);
2415         if (temp > 0)
2416           config_flush_interval = temp;
2417         else
2418         {
2419           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2420           status = 3;
2421         }
2422       }
2423       break;
2425       case 'w':
2426       {
2427         int temp;
2429         temp = atoi (optarg);
2430         if (temp > 0)
2431           config_write_interval = temp;
2432         else
2433         {
2434           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2435           status = 2;
2436         }
2437       }
2438       break;
2440       case 'z':
2441       {
2442         int temp;
2444         temp = atoi(optarg);
2445         if (temp > 0)
2446           config_write_jitter = temp;
2447         else
2448         {
2449           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2450           status = 2;
2451         }
2453         break;
2454       }
2456       case 'B':
2457         config_write_base_only = 1;
2458         break;
2460       case 'b':
2461       {
2462         size_t len;
2464         if (config_base_dir != NULL)
2465           free (config_base_dir);
2466         config_base_dir = strdup (optarg);
2467         if (config_base_dir == NULL)
2468         {
2469           fprintf (stderr, "read_options: strdup failed.\n");
2470           return (3);
2471         }
2473         len = strlen (config_base_dir);
2474         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2475         {
2476           config_base_dir[len - 1] = 0;
2477           len--;
2478         }
2480         if (len < 1)
2481         {
2482           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2483           return (4);
2484         }
2486         _config_base_dir_len = len;
2487       }
2488       break;
2490       case 'p':
2491       {
2492         if (config_pid_file != NULL)
2493           free (config_pid_file);
2494         config_pid_file = strdup (optarg);
2495         if (config_pid_file == NULL)
2496         {
2497           fprintf (stderr, "read_options: strdup failed.\n");
2498           return (3);
2499         }
2500       }
2501       break;
2503       case 'F':
2504         config_flush_at_shutdown = 1;
2505         break;
2507       case 'j':
2508       {
2509         struct stat statbuf;
2510         const char *dir = optarg;
2512         status = stat(dir, &statbuf);
2513         if (status != 0)
2514         {
2515           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2516           return 6;
2517         }
2519         if (!S_ISDIR(statbuf.st_mode)
2520             || access(dir, R_OK|W_OK|X_OK) != 0)
2521         {
2522           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2523                   errno ? rrd_strerror(errno) : "");
2524           return 6;
2525         }
2527         journal_cur = malloc(PATH_MAX + 1);
2528         journal_old = malloc(PATH_MAX + 1);
2529         if (journal_cur == NULL || journal_old == NULL)
2530         {
2531           fprintf(stderr, "malloc failure for journal files\n");
2532           return 6;
2533         }
2534         else 
2535         {
2536           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2537           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2538         }
2539       }
2540       break;
2542       case 'h':
2543       case '?':
2544         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2545             "\n"
2546             "Usage: rrdcached [options]\n"
2547             "\n"
2548             "Valid options are:\n"
2549             "  -l <address>  Socket address to listen to.\n"
2550             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2551             "  -w <seconds>  Interval in which to write data.\n"
2552             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2553             "  -f <seconds>  Interval in which to flush dead data.\n"
2554             "  -p <file>     Location of the PID-file.\n"
2555             "  -b <dir>      Base directory to change to.\n"
2556             "  -B            Restrict file access to paths within -b <dir>\n"
2557             "  -g            Do not fork and run in the foreground.\n"
2558             "  -j <dir>      Directory in which to create the journal files.\n"
2559             "  -F            Always flush all updates at shutdown\n"
2560             "\n"
2561             "For more information and a detailed description of all options "
2562             "please refer\n"
2563             "to the rrdcached(1) manual page.\n",
2564             VERSION);
2565         status = -1;
2566         break;
2567     } /* switch (option) */
2568   } /* while (getopt) */
2570   /* advise the user when values are not sane */
2571   if (config_flush_interval < 2 * config_write_interval)
2572     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2573             " 2x write interval (-w) !\n");
2574   if (config_write_jitter > config_write_interval)
2575     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2576             " write interval (-w) !\n");
2578   if (config_write_base_only && config_base_dir == NULL)
2579     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2580             "  Consult the rrdcached documentation\n");
2582   if (journal_cur == NULL)
2583     config_flush_at_shutdown = 1;
2585   return (status);
2586 } /* }}} int read_options */
2588 int main (int argc, char **argv)
2590   int status;
2592   status = read_options (argc, argv);
2593   if (status != 0)
2594   {
2595     if (status < 0)
2596       status = 0;
2597     return (status);
2598   }
2600   status = daemonize ();
2601   if (status == 1)
2602   {
2603     struct sigaction sigchld;
2605     memset (&sigchld, 0, sizeof (sigchld));
2606     sigchld.sa_handler = SIG_IGN;
2607     sigaction (SIGCHLD, &sigchld, NULL);
2609     return (0);
2610   }
2611   else if (status != 0)
2612   {
2613     fprintf (stderr, "daemonize failed, exiting.\n");
2614     return (1);
2615   }
2617   journal_init();
2619   /* start the queue thread */
2620   memset (&queue_thread, 0, sizeof (queue_thread));
2621   status = pthread_create (&queue_thread,
2622                            NULL, /* attr */
2623                            queue_thread_main,
2624                            NULL); /* args */
2625   if (status != 0)
2626   {
2627     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2628     cleanup();
2629     return (1);
2630   }
2632   listen_thread_main (NULL);
2633   cleanup ();
2635   return (0);
2636 } /* int main */
2638 /*
2639  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2640  */