Code

5c47ac1fd7b3908edce814a15631c1768e571442
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
101 /*
102  * Types
103  */
104 typedef enum
106   PRIV_LOW,
107   PRIV_HIGH
108 } socket_privilege;
110 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
112 struct listen_socket_s
114   int fd;
115   char addr[PATH_MAX + 1];
116   int family;
117   socket_privilege privilege;
119   /* state for BATCH processing */
120   time_t batch_start;
121   int batch_cmd;
123   /* buffered IO */
124   char *rbuf;
125   off_t next_cmd;
126   off_t next_read;
128   char *wbuf;
129   ssize_t wbuf_len;
130 };
131 typedef struct listen_socket_s listen_socket_t;
133 struct cache_item_s;
134 typedef struct cache_item_s cache_item_t;
135 struct cache_item_s
137   char *file;
138   char **values;
139   int values_num;
140   time_t last_flush_time;
141   time_t last_update_stamp;
142 #define CI_FLAGS_IN_TREE  (1<<0)
143 #define CI_FLAGS_IN_QUEUE (1<<1)
144   int flags;
145   pthread_cond_t  flushed;
146   cache_item_t *prev;
147   cache_item_t *next;
148 };
150 struct callback_flush_data_s
152   time_t now;
153   time_t abs_timeout;
154   char **keys;
155   size_t keys_num;
156 };
157 typedef struct callback_flush_data_s callback_flush_data_t;
159 enum queue_side_e
161   HEAD,
162   TAIL
163 };
164 typedef enum queue_side_e queue_side_t;
166 /* max length of socket command or response */
167 #define CMD_MAX 4096
168 #define RBUF_SIZE (CMD_MAX*2)
170 /*
171  * Variables
172  */
173 static int stay_foreground = 0;
174 static uid_t daemon_uid;
176 static listen_socket_t *listen_fds = NULL;
177 static size_t listen_fds_num = 0;
179 static int do_shutdown = 0;
181 static pthread_t queue_thread;
183 static pthread_t *connection_threads = NULL;
184 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
185 static int connection_threads_num = 0;
187 /* Cache stuff */
188 static GTree          *cache_tree = NULL;
189 static cache_item_t   *cache_queue_head = NULL;
190 static cache_item_t   *cache_queue_tail = NULL;
191 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
192 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
194 static int config_write_interval = 300;
195 static int config_write_jitter   = 0;
196 static int config_flush_interval = 3600;
197 static int config_flush_at_shutdown = 0;
198 static char *config_pid_file = NULL;
199 static char *config_base_dir = NULL;
200 static size_t _config_base_dir_len = 0;
201 static int config_write_base_only = 0;
203 static listen_socket_t **config_listen_address_list = NULL;
204 static int config_listen_address_list_len = 0;
206 static uint64_t stats_queue_length = 0;
207 static uint64_t stats_updates_received = 0;
208 static uint64_t stats_flush_received = 0;
209 static uint64_t stats_updates_written = 0;
210 static uint64_t stats_data_sets_written = 0;
211 static uint64_t stats_journal_bytes = 0;
212 static uint64_t stats_journal_rotate = 0;
213 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
215 /* Journaled updates */
216 static char *journal_cur = NULL;
217 static char *journal_old = NULL;
218 static FILE *journal_fh = NULL;
219 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
220 static int journal_write(char *cmd, char *args);
221 static void journal_done(void);
222 static void journal_rotate(void);
224 /* 
225  * Functions
226  */
227 static void sig_common (const char *sig) /* {{{ */
229   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
230   do_shutdown++;
231   pthread_cond_broadcast(&cache_cond);
232 } /* }}} void sig_common */
234 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
236   sig_common("INT");
237 } /* }}} void sig_int_handler */
239 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
241   sig_common("TERM");
242 } /* }}} void sig_term_handler */
244 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
246   config_flush_at_shutdown = 1;
247   sig_common("USR1");
248 } /* }}} void sig_usr1_handler */
250 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
252   config_flush_at_shutdown = 0;
253   sig_common("USR2");
254 } /* }}} void sig_usr2_handler */
256 static void install_signal_handlers(void) /* {{{ */
258   /* These structures are static, because `sigaction' behaves weird if the are
259    * overwritten.. */
260   static struct sigaction sa_int;
261   static struct sigaction sa_term;
262   static struct sigaction sa_pipe;
263   static struct sigaction sa_usr1;
264   static struct sigaction sa_usr2;
266   /* Install signal handlers */
267   memset (&sa_int, 0, sizeof (sa_int));
268   sa_int.sa_handler = sig_int_handler;
269   sigaction (SIGINT, &sa_int, NULL);
271   memset (&sa_term, 0, sizeof (sa_term));
272   sa_term.sa_handler = sig_term_handler;
273   sigaction (SIGTERM, &sa_term, NULL);
275   memset (&sa_pipe, 0, sizeof (sa_pipe));
276   sa_pipe.sa_handler = SIG_IGN;
277   sigaction (SIGPIPE, &sa_pipe, NULL);
279   memset (&sa_pipe, 0, sizeof (sa_usr1));
280   sa_usr1.sa_handler = sig_usr1_handler;
281   sigaction (SIGUSR1, &sa_usr1, NULL);
283   memset (&sa_usr2, 0, sizeof (sa_usr2));
284   sa_usr2.sa_handler = sig_usr2_handler;
285   sigaction (SIGUSR2, &sa_usr2, NULL);
287 } /* }}} void install_signal_handlers */
289 static int open_pidfile(void) /* {{{ */
291   int fd;
292   char *file;
294   file = (config_pid_file != NULL)
295     ? config_pid_file
296     : LOCALSTATEDIR "/run/rrdcached.pid";
298   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
299   if (fd < 0)
300     fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
301             file, rrd_strerror(errno));
303   return(fd);
304 } /* }}} static int open_pidfile */
306 static int write_pidfile (int fd) /* {{{ */
308   pid_t pid;
309   FILE *fh;
311   pid = getpid ();
313   fh = fdopen (fd, "w");
314   if (fh == NULL)
315   {
316     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
317     close(fd);
318     return (-1);
319   }
321   fprintf (fh, "%i\n", (int) pid);
322   fclose (fh);
324   return (0);
325 } /* }}} int write_pidfile */
327 static int remove_pidfile (void) /* {{{ */
329   char *file;
330   int status;
332   file = (config_pid_file != NULL)
333     ? config_pid_file
334     : LOCALSTATEDIR "/run/rrdcached.pid";
336   status = unlink (file);
337   if (status == 0)
338     return (0);
339   return (errno);
340 } /* }}} int remove_pidfile */
342 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
344   char *eol;
346   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
347                sock->next_read - sock->next_cmd);
349   if (eol == NULL)
350   {
351     /* no commands left, move remainder back to front of rbuf */
352     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
353             sock->next_read - sock->next_cmd);
354     sock->next_read -= sock->next_cmd;
355     sock->next_cmd = 0;
356     *len = 0;
357     return NULL;
358   }
359   else
360   {
361     char *cmd = sock->rbuf + sock->next_cmd;
362     *eol = '\0';
364     sock->next_cmd = eol - sock->rbuf + 1;
366     if (eol > sock->rbuf && *(eol-1) == '\r')
367       *(--eol) = '\0'; /* handle "\r\n" EOL */
369     *len = eol - cmd;
371     return cmd;
372   }
374   /* NOTREACHED */
375   assert(1==0);
378 /* add the characters directly to the write buffer */
379 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
381   char *new_buf;
383   assert(sock != NULL);
385   new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
386   if (new_buf == NULL)
387   {
388     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
389     return -1;
390   }
392   strncpy(new_buf + sock->wbuf_len, str, len + 1);
394   sock->wbuf = new_buf;
395   sock->wbuf_len += len;
397   return 0;
398 } /* }}} static int add_to_wbuf */
400 /* add the text to the "extra" info that's sent after the status line */
401 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
403   va_list argp;
404   char buffer[CMD_MAX];
405   int len;
407   if (sock == NULL) return 0; /* journal replay mode */
408   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
410   va_start(argp, fmt);
411 #ifdef HAVE_VSNPRINTF
412   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
413 #else
414   len = vsprintf(buffer, fmt, argp);
415 #endif
416   va_end(argp);
417   if (len < 0)
418   {
419     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
420     return -1;
421   }
423   return add_to_wbuf(sock, buffer, len);
424 } /* }}} static int add_response_info */
426 static int count_lines(char *str) /* {{{ */
428   int lines = 0;
430   if (str != NULL)
431   {
432     while ((str = strchr(str, '\n')) != NULL)
433     {
434       ++lines;
435       ++str;
436     }
437   }
439   return lines;
440 } /* }}} static int count_lines */
442 /* send the response back to the user.
443  * returns 0 on success, -1 on error
444  * write buffer is always zeroed after this call */
445 static int send_response (listen_socket_t *sock, response_code rc,
446                           char *fmt, ...) /* {{{ */
448   va_list argp;
449   char buffer[CMD_MAX];
450   int lines;
451   ssize_t wrote;
452   int rclen, len;
454   if (sock == NULL) return rc;  /* journal replay mode */
456   if (sock->batch_start)
457   {
458     if (rc == RESP_OK)
459       return rc; /* no response on success during BATCH */
460     lines = sock->batch_cmd;
461   }
462   else if (rc == RESP_OK)
463     lines = count_lines(sock->wbuf);
464   else
465     lines = -1;
467   rclen = sprintf(buffer, "%d ", lines);
468   va_start(argp, fmt);
469 #ifdef HAVE_VSNPRINTF
470   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
471 #else
472   len = vsprintf(buffer+rclen, fmt, argp);
473 #endif
474   va_end(argp);
475   if (len < 0)
476     return -1;
478   len += rclen;
480   /* append the result to the wbuf, don't write to the user */
481   if (sock->batch_start)
482     return add_to_wbuf(sock, buffer, len);
484   /* first write must be complete */
485   if (len != write(sock->fd, buffer, len))
486   {
487     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
488     return -1;
489   }
491   if (sock->wbuf != NULL && rc == RESP_OK)
492   {
493     wrote = 0;
494     while (wrote < sock->wbuf_len)
495     {
496       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
497       if (wb <= 0)
498       {
499         RRDD_LOG(LOG_INFO, "send_response: could not write results");
500         return -1;
501       }
502       wrote += wb;
503     }
504   }
506   free(sock->wbuf); sock->wbuf = NULL;
507   sock->wbuf_len = 0;
509   return 0;
510 } /* }}} */
512 static void wipe_ci_values(cache_item_t *ci, time_t when)
514   ci->values = NULL;
515   ci->values_num = 0;
517   ci->last_flush_time = when;
518   if (config_write_jitter > 0)
519     ci->last_flush_time += (random() % config_write_jitter);
522 /* remove_from_queue
523  * remove a "cache_item_t" item from the queue.
524  * must hold 'cache_lock' when calling this
525  */
526 static void remove_from_queue(cache_item_t *ci) /* {{{ */
528   if (ci == NULL) return;
530   if (ci->prev == NULL)
531     cache_queue_head = ci->next; /* reset head */
532   else
533     ci->prev->next = ci->next;
535   if (ci->next == NULL)
536     cache_queue_tail = ci->prev; /* reset the tail */
537   else
538     ci->next->prev = ci->prev;
540   ci->next = ci->prev = NULL;
541   ci->flags &= ~CI_FLAGS_IN_QUEUE;
542 } /* }}} static void remove_from_queue */
544 /* remove an entry from the tree and free all its resources.
545  * must hold 'cache lock' while calling this.
546  * returns 0 on success, otherwise errno */
547 static int forget_file(const char *file)
549   cache_item_t *ci;
551   ci = g_tree_lookup(cache_tree, file);
552   if (ci == NULL)
553     return ENOENT;
555   g_tree_remove (cache_tree, file);
556   remove_from_queue(ci);
558   for (int i=0; i < ci->values_num; i++)
559     free(ci->values[i]);
561   free (ci->values);
562   free (ci->file);
564   /* in case anyone is waiting */
565   pthread_cond_broadcast(&ci->flushed);
567   free (ci);
569   return 0;
570 } /* }}} static int forget_file */
572 /*
573  * enqueue_cache_item:
574  * `cache_lock' must be acquired before calling this function!
575  */
576 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
577     queue_side_t side)
579   if (ci == NULL)
580     return (-1);
582   if (ci->values_num == 0)
583     return (0);
585   if (side == HEAD)
586   {
587     if (cache_queue_head == ci)
588       return 0;
590     /* remove from the double linked list */
591     if (ci->flags & CI_FLAGS_IN_QUEUE)
592       remove_from_queue(ci);
594     ci->prev = NULL;
595     ci->next = cache_queue_head;
596     if (ci->next != NULL)
597       ci->next->prev = ci;
598     cache_queue_head = ci;
600     if (cache_queue_tail == NULL)
601       cache_queue_tail = cache_queue_head;
602   }
603   else /* (side == TAIL) */
604   {
605     /* We don't move values back in the list.. */
606     if (ci->flags & CI_FLAGS_IN_QUEUE)
607       return (0);
609     assert (ci->next == NULL);
610     assert (ci->prev == NULL);
612     ci->prev = cache_queue_tail;
614     if (cache_queue_tail == NULL)
615       cache_queue_head = ci;
616     else
617       cache_queue_tail->next = ci;
619     cache_queue_tail = ci;
620   }
622   ci->flags |= CI_FLAGS_IN_QUEUE;
624   pthread_cond_broadcast(&cache_cond);
625   pthread_mutex_lock (&stats_lock);
626   stats_queue_length++;
627   pthread_mutex_unlock (&stats_lock);
629   return (0);
630 } /* }}} int enqueue_cache_item */
632 /*
633  * tree_callback_flush:
634  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
635  * while this is in progress.
636  */
637 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
638     gpointer data)
640   cache_item_t *ci;
641   callback_flush_data_t *cfd;
643   ci = (cache_item_t *) value;
644   cfd = (callback_flush_data_t *) data;
646   if ((ci->last_flush_time <= cfd->abs_timeout)
647       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
648       && (ci->values_num > 0))
649   {
650     enqueue_cache_item (ci, TAIL);
651   }
652   else if ((do_shutdown != 0)
653       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
654       && (ci->values_num > 0))
655   {
656     enqueue_cache_item (ci, TAIL);
657   }
658   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
659       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
660       && (ci->values_num <= 0))
661   {
662     char **temp;
664     temp = (char **) realloc (cfd->keys,
665         sizeof (char *) * (cfd->keys_num + 1));
666     if (temp == NULL)
667     {
668       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
669       return (FALSE);
670     }
671     cfd->keys = temp;
672     /* Make really sure this points to the _same_ place */
673     assert ((char *) key == ci->file);
674     cfd->keys[cfd->keys_num] = (char *) key;
675     cfd->keys_num++;
676   }
678   return (FALSE);
679 } /* }}} gboolean tree_callback_flush */
681 static int flush_old_values (int max_age)
683   callback_flush_data_t cfd;
684   size_t k;
686   memset (&cfd, 0, sizeof (cfd));
687   /* Pass the current time as user data so that we don't need to call
688    * `time' for each node. */
689   cfd.now = time (NULL);
690   cfd.keys = NULL;
691   cfd.keys_num = 0;
693   if (max_age > 0)
694     cfd.abs_timeout = cfd.now - max_age;
695   else
696     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
698   /* `tree_callback_flush' will return the keys of all values that haven't
699    * been touched in the last `config_flush_interval' seconds in `cfd'.
700    * The char*'s in this array point to the same memory as ci->file, so we
701    * don't need to free them separately. */
702   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
704   for (k = 0; k < cfd.keys_num; k++)
705   {
706     /* should never fail, since we have held the cache_lock
707      * the entire time */
708     assert( forget_file(cfd.keys[k]) == 0 );
709   }
711   if (cfd.keys != NULL)
712   {
713     free (cfd.keys);
714     cfd.keys = NULL;
715   }
717   return (0);
718 } /* int flush_old_values */
720 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
722   struct timeval now;
723   struct timespec next_flush;
724   int final_flush = 0; /* make sure we only flush once on shutdown */
726   gettimeofday (&now, NULL);
727   next_flush.tv_sec = now.tv_sec + config_flush_interval;
728   next_flush.tv_nsec = 1000 * now.tv_usec;
730   pthread_mutex_lock (&cache_lock);
731   while ((do_shutdown == 0) || (cache_queue_head != NULL))
732   {
733     cache_item_t *ci;
734     char *file;
735     char **values;
736     int values_num;
737     int status;
738     int i;
740     /* First, check if it's time to do the cache flush. */
741     gettimeofday (&now, NULL);
742     if ((now.tv_sec > next_flush.tv_sec)
743         || ((now.tv_sec == next_flush.tv_sec)
744           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
745     {
746       /* Flush all values that haven't been written in the last
747        * `config_write_interval' seconds. */
748       flush_old_values (config_write_interval);
750       /* Determine the time of the next cache flush. */
751       next_flush.tv_sec =
752         now.tv_sec + next_flush.tv_sec % config_flush_interval;
754       /* unlock the cache while we rotate so we don't block incoming
755        * updates if the fsync() blocks on disk I/O */
756       pthread_mutex_unlock(&cache_lock);
757       journal_rotate();
758       pthread_mutex_lock(&cache_lock);
759     }
761     /* Now, check if there's something to store away. If not, wait until
762      * something comes in or it's time to do the cache flush.  if we are
763      * shutting down, do not wait around.  */
764     if (cache_queue_head == NULL && !do_shutdown)
765     {
766       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
767       if ((status != 0) && (status != ETIMEDOUT))
768       {
769         RRDD_LOG (LOG_ERR, "queue_thread_main: "
770             "pthread_cond_timedwait returned %i.", status);
771       }
772     }
774     /* We're about to shut down */
775     if (do_shutdown != 0 && !final_flush++)
776     {
777       if (config_flush_at_shutdown)
778         flush_old_values (-1); /* flush everything */
779       else
780         break;
781     }
783     /* Check if a value has arrived. This may be NULL if we timed out or there
784      * was an interrupt such as a signal. */
785     if (cache_queue_head == NULL)
786       continue;
788     ci = cache_queue_head;
790     /* copy the relevant parts */
791     file = strdup (ci->file);
792     if (file == NULL)
793     {
794       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
795       continue;
796     }
798     assert(ci->values != NULL);
799     assert(ci->values_num > 0);
801     values = ci->values;
802     values_num = ci->values_num;
804     wipe_ci_values(ci, time(NULL));
805     remove_from_queue(ci);
807     pthread_mutex_lock (&stats_lock);
808     assert (stats_queue_length > 0);
809     stats_queue_length--;
810     pthread_mutex_unlock (&stats_lock);
812     pthread_mutex_unlock (&cache_lock);
814     rrd_clear_error ();
815     status = rrd_update_r (file, NULL, values_num, (void *) values);
816     if (status != 0)
817     {
818       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
819           "rrd_update_r (%s) failed with status %i. (%s)",
820           file, status, rrd_get_error());
821     }
823     journal_write("wrote", file);
824     pthread_cond_broadcast(&ci->flushed);
826     for (i = 0; i < values_num; i++)
827       free (values[i]);
829     free(values);
830     free(file);
832     if (status == 0)
833     {
834       pthread_mutex_lock (&stats_lock);
835       stats_updates_written++;
836       stats_data_sets_written += values_num;
837       pthread_mutex_unlock (&stats_lock);
838     }
840     pthread_mutex_lock (&cache_lock);
842     /* We're about to shut down */
843     if (do_shutdown != 0 && !final_flush++)
844     {
845       if (config_flush_at_shutdown)
846           flush_old_values (-1); /* flush everything */
847       else
848         break;
849     }
850   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
851   pthread_mutex_unlock (&cache_lock);
853   if (config_flush_at_shutdown)
854   {
855     assert(cache_queue_head == NULL);
856     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
857   }
859   journal_done();
861   return (NULL);
862 } /* }}} void *queue_thread_main */
864 static int buffer_get_field (char **buffer_ret, /* {{{ */
865     size_t *buffer_size_ret, char **field_ret)
867   char *buffer;
868   size_t buffer_pos;
869   size_t buffer_size;
870   char *field;
871   size_t field_size;
872   int status;
874   buffer = *buffer_ret;
875   buffer_pos = 0;
876   buffer_size = *buffer_size_ret;
877   field = *buffer_ret;
878   field_size = 0;
880   if (buffer_size <= 0)
881     return (-1);
883   /* This is ensured by `handle_request'. */
884   assert (buffer[buffer_size - 1] == '\0');
886   status = -1;
887   while (buffer_pos < buffer_size)
888   {
889     /* Check for end-of-field or end-of-buffer */
890     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
891     {
892       field[field_size] = 0;
893       field_size++;
894       buffer_pos++;
895       status = 0;
896       break;
897     }
898     /* Handle escaped characters. */
899     else if (buffer[buffer_pos] == '\\')
900     {
901       if (buffer_pos >= (buffer_size - 1))
902         break;
903       buffer_pos++;
904       field[field_size] = buffer[buffer_pos];
905       field_size++;
906       buffer_pos++;
907     }
908     /* Normal operation */ 
909     else
910     {
911       field[field_size] = buffer[buffer_pos];
912       field_size++;
913       buffer_pos++;
914     }
915   } /* while (buffer_pos < buffer_size) */
917   if (status != 0)
918     return (status);
920   *buffer_ret = buffer + buffer_pos;
921   *buffer_size_ret = buffer_size - buffer_pos;
922   *field_ret = field;
924   return (0);
925 } /* }}} int buffer_get_field */
927 /* if we're restricting writes to the base directory,
928  * check whether the file falls within the dir
929  * returns 1 if OK, otherwise 0
930  */
931 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
933   assert(file != NULL);
935   if (!config_write_base_only
936       || sock == NULL /* journal replay */
937       || config_base_dir == NULL)
938     return 1;
940   if (strstr(file, "../") != NULL) goto err;
942   /* relative paths without "../" are ok */
943   if (*file != '/') return 1;
945   /* file must be of the format base + "/" + <1+ char filename> */
946   if (strlen(file) < _config_base_dir_len + 2) goto err;
947   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
948   if (*(file + _config_base_dir_len) != '/') goto err;
950   return 1;
952 err:
953   if (sock != NULL && sock->fd >= 0)
954     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
956   return 0;
957 } /* }}} static int check_file_access */
959 /* returns 1 if we have the required privilege level,
960  * otherwise issue an error to the user on sock */
961 static int has_privilege (listen_socket_t *sock, /* {{{ */
962                           socket_privilege priv)
964   if (sock == NULL) /* journal replay */
965     return 1;
967   if (sock->privilege >= priv)
968     return 1;
970   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
971 } /* }}} static int has_privilege */
973 static int flush_file (const char *filename) /* {{{ */
975   cache_item_t *ci;
977   pthread_mutex_lock (&cache_lock);
979   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
980   if (ci == NULL)
981   {
982     pthread_mutex_unlock (&cache_lock);
983     return (ENOENT);
984   }
986   if (ci->values_num > 0)
987   {
988     /* Enqueue at head */
989     enqueue_cache_item (ci, HEAD);
990     pthread_cond_wait(&ci->flushed, &cache_lock);
991   }
993   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
994    * may have been purged during our cond_wait() */
996   pthread_mutex_unlock(&cache_lock);
998   return (0);
999 } /* }}} int flush_file */
1001 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1002     char *buffer, size_t buffer_size)
1004   int status;
1005   char **help_text;
1006   char *command;
1008   char *help_help[2] =
1009   {
1010     "Command overview\n"
1011     ,
1012     "HELP [<command>]\n"
1013     "FLUSH <filename>\n"
1014     "FLUSHALL\n"
1015     "PENDING <filename>\n"
1016     "FORGET <filename>\n"
1017     "UPDATE <filename> <values> [<values> ...]\n"
1018     "BATCH\n"
1019     "STATS\n"
1020   };
1022   char *help_flush[2] =
1023   {
1024     "Help for FLUSH\n"
1025     ,
1026     "Usage: FLUSH <filename>\n"
1027     "\n"
1028     "Adds the given filename to the head of the update queue and returns\n"
1029     "after is has been dequeued.\n"
1030   };
1032   char *help_flushall[2] =
1033   {
1034     "Help for FLUSHALL\n"
1035     ,
1036     "Usage: FLUSHALL\n"
1037     "\n"
1038     "Triggers writing of all pending updates.  Returns immediately.\n"
1039   };
1041   char *help_pending[2] =
1042   {
1043     "Help for PENDING\n"
1044     ,
1045     "Usage: PENDING <filename>\n"
1046     "\n"
1047     "Shows any 'pending' updates for a file, in order.\n"
1048     "The updates shown have not yet been written to the underlying RRD file.\n"
1049   };
1051   char *help_forget[2] =
1052   {
1053     "Help for FORGET\n"
1054     ,
1055     "Usage: FORGET <filename>\n"
1056     "\n"
1057     "Removes the file completely from the cache.\n"
1058     "Any pending updates for the file will be lost.\n"
1059   };
1061   char *help_update[2] =
1062   {
1063     "Help for UPDATE\n"
1064     ,
1065     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1066     "\n"
1067     "Adds the given file to the internal cache if it is not yet known and\n"
1068     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1069     "for details.\n"
1070     "\n"
1071     "Each <values> has the following form:\n"
1072     "  <values> = <time>:<value>[:<value>[...]]\n"
1073     "See the rrdupdate(1) manpage for details.\n"
1074   };
1076   char *help_stats[2] =
1077   {
1078     "Help for STATS\n"
1079     ,
1080     "Usage: STATS\n"
1081     "\n"
1082     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1083     "a description of the values.\n"
1084   };
1086   char *help_batch[2] =
1087   {
1088     "Help for BATCH\n"
1089     ,
1090     "The 'BATCH' command permits the client to initiate a bulk load\n"
1091     "   of commands to rrdcached.\n"
1092     "\n"
1093     "Usage:\n"
1094     "\n"
1095     "    client: BATCH\n"
1096     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1097     "    client: command #1\n"
1098     "    client: command #2\n"
1099     "    client: ... and so on\n"
1100     "    client: .\n"
1101     "    server: 2 errors\n"
1102     "    server: 7 message for command #7\n"
1103     "    server: 9 message for command #9\n"
1104     "\n"
1105     "For more information, consult the rrdcached(1) documentation.\n"
1106   };
1108   status = buffer_get_field (&buffer, &buffer_size, &command);
1109   if (status != 0)
1110     help_text = help_help;
1111   else
1112   {
1113     if (strcasecmp (command, "update") == 0)
1114       help_text = help_update;
1115     else if (strcasecmp (command, "flush") == 0)
1116       help_text = help_flush;
1117     else if (strcasecmp (command, "flushall") == 0)
1118       help_text = help_flushall;
1119     else if (strcasecmp (command, "pending") == 0)
1120       help_text = help_pending;
1121     else if (strcasecmp (command, "forget") == 0)
1122       help_text = help_forget;
1123     else if (strcasecmp (command, "stats") == 0)
1124       help_text = help_stats;
1125     else if (strcasecmp (command, "batch") == 0)
1126       help_text = help_batch;
1127     else
1128       help_text = help_help;
1129   }
1131   add_response_info(sock, help_text[1]);
1132   return send_response(sock, RESP_OK, help_text[0]);
1133 } /* }}} int handle_request_help */
1135 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1137   uint64_t copy_queue_length;
1138   uint64_t copy_updates_received;
1139   uint64_t copy_flush_received;
1140   uint64_t copy_updates_written;
1141   uint64_t copy_data_sets_written;
1142   uint64_t copy_journal_bytes;
1143   uint64_t copy_journal_rotate;
1145   uint64_t tree_nodes_number;
1146   uint64_t tree_depth;
1148   pthread_mutex_lock (&stats_lock);
1149   copy_queue_length       = stats_queue_length;
1150   copy_updates_received   = stats_updates_received;
1151   copy_flush_received     = stats_flush_received;
1152   copy_updates_written    = stats_updates_written;
1153   copy_data_sets_written  = stats_data_sets_written;
1154   copy_journal_bytes      = stats_journal_bytes;
1155   copy_journal_rotate     = stats_journal_rotate;
1156   pthread_mutex_unlock (&stats_lock);
1158   pthread_mutex_lock (&cache_lock);
1159   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1160   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1161   pthread_mutex_unlock (&cache_lock);
1163   add_response_info(sock,
1164                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1165   add_response_info(sock,
1166                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1167   add_response_info(sock,
1168                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1169   add_response_info(sock,
1170                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1171   add_response_info(sock,
1172                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1173   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1174   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1175   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1176   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1178   send_response(sock, RESP_OK, "Statistics follow\n");
1180   return (0);
1181 } /* }}} int handle_request_stats */
1183 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1184     char *buffer, size_t buffer_size)
1186   char *file;
1187   int status;
1189   status = buffer_get_field (&buffer, &buffer_size, &file);
1190   if (status != 0)
1191   {
1192     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1193   }
1194   else
1195   {
1196     pthread_mutex_lock(&stats_lock);
1197     stats_flush_received++;
1198     pthread_mutex_unlock(&stats_lock);
1200     if (!check_file_access(file, sock)) return 0;
1202     status = flush_file (file);
1203     if (status == 0)
1204       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1205     else if (status == ENOENT)
1206     {
1207       /* no file in our tree; see whether it exists at all */
1208       struct stat statbuf;
1210       memset(&statbuf, 0, sizeof(statbuf));
1211       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1212         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1213       else
1214         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1215     }
1216     else if (status < 0)
1217       return send_response(sock, RESP_ERR, "Internal error.\n");
1218     else
1219       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1220   }
1222   /* NOTREACHED */
1223   assert(1==0);
1224 } /* }}} int handle_request_flush */
1226 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1228   int status;
1230   status = has_privilege(sock, PRIV_HIGH);
1231   if (status <= 0)
1232     return status;
1234   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1236   pthread_mutex_lock(&cache_lock);
1237   flush_old_values(-1);
1238   pthread_mutex_unlock(&cache_lock);
1240   return send_response(sock, RESP_OK, "Started flush.\n");
1241 } /* }}} static int handle_request_flushall */
1243 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1244                                   char *buffer, size_t buffer_size)
1246   int status;
1247   char *file;
1248   cache_item_t *ci;
1250   status = buffer_get_field(&buffer, &buffer_size, &file);
1251   if (status != 0)
1252     return send_response(sock, RESP_ERR,
1253                          "Usage: PENDING <filename>\n");
1255   status = has_privilege(sock, PRIV_HIGH);
1256   if (status <= 0)
1257     return status;
1259   pthread_mutex_lock(&cache_lock);
1260   ci = g_tree_lookup(cache_tree, file);
1261   if (ci == NULL)
1262   {
1263     pthread_mutex_unlock(&cache_lock);
1264     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1265   }
1267   for (int i=0; i < ci->values_num; i++)
1268     add_response_info(sock, "%s\n", ci->values[i]);
1270   pthread_mutex_unlock(&cache_lock);
1271   return send_response(sock, RESP_OK, "updates pending\n");
1272 } /* }}} static int handle_request_pending */
1274 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1275                                  char *buffer, size_t buffer_size)
1277   int status;
1278   char *file;
1280   status = buffer_get_field(&buffer, &buffer_size, &file);
1281   if (status != 0)
1282     return send_response(sock, RESP_ERR,
1283                          "Usage: FORGET <filename>\n");
1285   status = has_privilege(sock, PRIV_HIGH);
1286   if (status <= 0)
1287     return status;
1289   if (!check_file_access(file, sock)) return 0;
1291   pthread_mutex_lock(&cache_lock);
1292   status = forget_file(file);
1293   pthread_mutex_unlock(&cache_lock);
1295   if (status == 0)
1296   {
1297     if (sock != NULL)
1298       journal_write("forget", file);
1300     return send_response(sock, RESP_OK, "Gone!\n");
1301   }
1302   else
1303     return send_response(sock, RESP_ERR, "cannot forget: %s\n",
1304                          status < 0 ? "Internal error" : rrd_strerror(status));
1306   /* NOTREACHED */
1307   assert(1==0);
1308 } /* }}} static int handle_request_forget */
1310 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1311                                   time_t now,
1312                                   char *buffer, size_t buffer_size)
1314   char *file;
1315   int values_num = 0;
1316   int bad_timestamps = 0;
1317   int status;
1318   char orig_buf[CMD_MAX];
1320   cache_item_t *ci;
1322   status = has_privilege(sock, PRIV_HIGH);
1323   if (status <= 0)
1324     return status;
1326   /* save it for the journal later */
1327   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1329   status = buffer_get_field (&buffer, &buffer_size, &file);
1330   if (status != 0)
1331     return send_response(sock, RESP_ERR,
1332                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1334   pthread_mutex_lock(&stats_lock);
1335   stats_updates_received++;
1336   pthread_mutex_unlock(&stats_lock);
1338   if (!check_file_access(file, sock)) return 0;
1340   pthread_mutex_lock (&cache_lock);
1341   ci = g_tree_lookup (cache_tree, file);
1343   if (ci == NULL) /* {{{ */
1344   {
1345     struct stat statbuf;
1347     /* don't hold the lock while we setup; stat(2) might block */
1348     pthread_mutex_unlock(&cache_lock);
1350     memset (&statbuf, 0, sizeof (statbuf));
1351     status = stat (file, &statbuf);
1352     if (status != 0)
1353     {
1354       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1356       status = errno;
1357       if (status == ENOENT)
1358         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1359       else
1360         return send_response(sock, RESP_ERR,
1361                              "stat failed with error %i.\n", status);
1362     }
1363     if (!S_ISREG (statbuf.st_mode))
1364       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1366     if (access(file, R_OK|W_OK) != 0)
1367       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1368                            file, rrd_strerror(errno));
1370     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1371     if (ci == NULL)
1372     {
1373       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1375       return send_response(sock, RESP_ERR, "malloc failed.\n");
1376     }
1377     memset (ci, 0, sizeof (cache_item_t));
1379     ci->file = strdup (file);
1380     if (ci->file == NULL)
1381     {
1382       free (ci);
1383       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1385       return send_response(sock, RESP_ERR, "strdup failed.\n");
1386     }
1388     wipe_ci_values(ci, now);
1389     ci->flags = CI_FLAGS_IN_TREE;
1391     pthread_mutex_lock(&cache_lock);
1392     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1393   } /* }}} */
1394   assert (ci != NULL);
1396   /* don't re-write updates in replay mode */
1397   if (sock != NULL)
1398     journal_write("update", orig_buf);
1400   while (buffer_size > 0)
1401   {
1402     char **temp;
1403     char *value;
1404     time_t stamp;
1405     char *eostamp;
1407     status = buffer_get_field (&buffer, &buffer_size, &value);
1408     if (status != 0)
1409     {
1410       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1411       break;
1412     }
1414     /* make sure update time is always moving forward */
1415     stamp = strtol(value, &eostamp, 10);
1416     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1417     {
1418       ++bad_timestamps;
1419       add_response_info(sock, "Cannot find timestamp in '%s'!\n", value);
1420       continue;
1421     }
1422     else if (stamp <= ci->last_update_stamp)
1423     {
1424       ++bad_timestamps;
1425       add_response_info(sock,
1426                         "illegal attempt to update using time %ld when"
1427                         " last update time is %ld (minimum one second step)\n",
1428                         stamp, ci->last_update_stamp);
1429       continue;
1430     }
1431     else
1432       ci->last_update_stamp = stamp;
1434     temp = (char **) realloc (ci->values,
1435         sizeof (char *) * (ci->values_num + 1));
1436     if (temp == NULL)
1437     {
1438       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1439       continue;
1440     }
1441     ci->values = temp;
1443     ci->values[ci->values_num] = strdup (value);
1444     if (ci->values[ci->values_num] == NULL)
1445     {
1446       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1447       continue;
1448     }
1449     ci->values_num++;
1451     values_num++;
1452   }
1454   if (((now - ci->last_flush_time) >= config_write_interval)
1455       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1456       && (ci->values_num > 0))
1457   {
1458     enqueue_cache_item (ci, TAIL);
1459   }
1461   pthread_mutex_unlock (&cache_lock);
1463   if (values_num < 1)
1464   {
1465     /* if we had only one update attempt, then return the full
1466        error message... try to get the most information out
1467        of the limited error space allowed by the protocol
1468     */
1469     if (bad_timestamps == 1)
1470       return send_response(sock, RESP_ERR, "%s", sock->wbuf);
1471     else
1472       return send_response(sock, RESP_ERR,
1473                            "No values updated (%d bad timestamps).\n",
1474                            bad_timestamps);
1475   }
1476   else
1477     return send_response(sock, RESP_OK,
1478                          "errors, enqueued %i value(s).\n", values_num);
1480   /* NOTREACHED */
1481   assert(1==0);
1483 } /* }}} int handle_request_update */
1485 /* we came across a "WROTE" entry during journal replay.
1486  * throw away any values that we have accumulated for this file
1487  */
1488 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1490   int i;
1491   cache_item_t *ci;
1492   const char *file = buffer;
1494   pthread_mutex_lock(&cache_lock);
1496   ci = g_tree_lookup(cache_tree, file);
1497   if (ci == NULL)
1498   {
1499     pthread_mutex_unlock(&cache_lock);
1500     return (0);
1501   }
1503   if (ci->values)
1504   {
1505     for (i=0; i < ci->values_num; i++)
1506       free(ci->values[i]);
1508     free(ci->values);
1509   }
1511   wipe_ci_values(ci, now);
1512   remove_from_queue(ci);
1514   pthread_mutex_unlock(&cache_lock);
1515   return (0);
1516 } /* }}} int handle_request_wrote */
1518 /* start "BATCH" processing */
1519 static int batch_start (listen_socket_t *sock) /* {{{ */
1521   int status;
1522   if (sock->batch_start)
1523     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1525   status = send_response(sock, RESP_OK,
1526                          "Go ahead.  End with dot '.' on its own line.\n");
1527   sock->batch_start = time(NULL);
1528   sock->batch_cmd = 0;
1530   return status;
1531 } /* }}} static int batch_start */
1533 /* finish "BATCH" processing and return results to the client */
1534 static int batch_done (listen_socket_t *sock) /* {{{ */
1536   assert(sock->batch_start);
1537   sock->batch_start = 0;
1538   sock->batch_cmd  = 0;
1539   return send_response(sock, RESP_OK, "errors\n");
1540 } /* }}} static int batch_done */
1542 /* if sock==NULL, we are in journal replay mode */
1543 static int handle_request (listen_socket_t *sock, /* {{{ */
1544                            time_t now,
1545                            char *buffer, size_t buffer_size)
1547   char *buffer_ptr;
1548   char *command;
1549   int status;
1551   assert (buffer[buffer_size - 1] == '\0');
1553   buffer_ptr = buffer;
1554   command = NULL;
1555   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1556   if (status != 0)
1557   {
1558     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1559     return (-1);
1560   }
1562   if (sock != NULL && sock->batch_start)
1563     sock->batch_cmd++;
1565   if (strcasecmp (command, "update") == 0)
1566     return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1567   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1568   {
1569     /* this is only valid in replay mode */
1570     return (handle_request_wrote (buffer_ptr, now));
1571   }
1572   else if (strcasecmp (command, "flush") == 0)
1573     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1574   else if (strcasecmp (command, "flushall") == 0)
1575     return (handle_request_flushall(sock));
1576   else if (strcasecmp (command, "pending") == 0)
1577     return (handle_request_pending(sock, buffer_ptr, buffer_size));
1578   else if (strcasecmp (command, "forget") == 0)
1579     return (handle_request_forget(sock, buffer_ptr, buffer_size));
1580   else if (strcasecmp (command, "stats") == 0)
1581     return (handle_request_stats (sock));
1582   else if (strcasecmp (command, "help") == 0)
1583     return (handle_request_help (sock, buffer_ptr, buffer_size));
1584   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1585     return batch_start(sock);
1586   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1587     return batch_done(sock);
1588   else
1589     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1591   /* NOTREACHED */
1592   assert(1==0);
1593 } /* }}} int handle_request */
1595 /* MUST NOT hold journal_lock before calling this */
1596 static void journal_rotate(void) /* {{{ */
1598   FILE *old_fh = NULL;
1599   int new_fd;
1601   if (journal_cur == NULL || journal_old == NULL)
1602     return;
1604   pthread_mutex_lock(&journal_lock);
1606   /* we rotate this way (rename before close) so that the we can release
1607    * the journal lock as fast as possible.  Journal writes to the new
1608    * journal can proceed immediately after the new file is opened.  The
1609    * fclose can then block without affecting new updates.
1610    */
1611   if (journal_fh != NULL)
1612   {
1613     old_fh = journal_fh;
1614     journal_fh = NULL;
1615     rename(journal_cur, journal_old);
1616     ++stats_journal_rotate;
1617   }
1619   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1620                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1621   if (new_fd >= 0)
1622   {
1623     journal_fh = fdopen(new_fd, "a");
1624     if (journal_fh == NULL)
1625       close(new_fd);
1626   }
1628   pthread_mutex_unlock(&journal_lock);
1630   if (old_fh != NULL)
1631     fclose(old_fh);
1633   if (journal_fh == NULL)
1634   {
1635     RRDD_LOG(LOG_CRIT,
1636              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1637              journal_cur, rrd_strerror(errno));
1639     RRDD_LOG(LOG_ERR,
1640              "JOURNALING DISABLED: All values will be flushed at shutdown");
1641     config_flush_at_shutdown = 1;
1642   }
1644 } /* }}} static void journal_rotate */
1646 static void journal_done(void) /* {{{ */
1648   if (journal_cur == NULL)
1649     return;
1651   pthread_mutex_lock(&journal_lock);
1652   if (journal_fh != NULL)
1653   {
1654     fclose(journal_fh);
1655     journal_fh = NULL;
1656   }
1658   if (config_flush_at_shutdown)
1659   {
1660     RRDD_LOG(LOG_INFO, "removing journals");
1661     unlink(journal_old);
1662     unlink(journal_cur);
1663   }
1664   else
1665   {
1666     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1667              "journals will be used at next startup");
1668   }
1670   pthread_mutex_unlock(&journal_lock);
1672 } /* }}} static void journal_done */
1674 static int journal_write(char *cmd, char *args) /* {{{ */
1676   int chars;
1678   if (journal_fh == NULL)
1679     return 0;
1681   pthread_mutex_lock(&journal_lock);
1682   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1683   pthread_mutex_unlock(&journal_lock);
1685   if (chars > 0)
1686   {
1687     pthread_mutex_lock(&stats_lock);
1688     stats_journal_bytes += chars;
1689     pthread_mutex_unlock(&stats_lock);
1690   }
1692   return chars;
1693 } /* }}} static int journal_write */
1695 static int journal_replay (const char *file) /* {{{ */
1697   FILE *fh;
1698   int entry_cnt = 0;
1699   int fail_cnt = 0;
1700   uint64_t line = 0;
1701   char entry[CMD_MAX];
1702   time_t now;
1704   if (file == NULL) return 0;
1706   {
1707     char *reason;
1708     int status = 0;
1709     struct stat statbuf;
1711     memset(&statbuf, 0, sizeof(statbuf));
1712     if (stat(file, &statbuf) != 0)
1713     {
1714       if (errno == ENOENT)
1715         return 0;
1717       reason = "stat error";
1718       status = errno;
1719     }
1720     else if (!S_ISREG(statbuf.st_mode))
1721     {
1722       reason = "not a regular file";
1723       status = EPERM;
1724     }
1725     if (statbuf.st_uid != daemon_uid)
1726     {
1727       reason = "not owned by daemon user";
1728       status = EACCES;
1729     }
1730     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1731     {
1732       reason = "must not be user/group writable";
1733       status = EACCES;
1734     }
1736     if (status != 0)
1737     {
1738       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1739                file, rrd_strerror(status), reason);
1740       return 0;
1741     }
1742   }
1744   fh = fopen(file, "r");
1745   if (fh == NULL)
1746   {
1747     if (errno != ENOENT)
1748       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1749                file, rrd_strerror(errno));
1750     return 0;
1751   }
1752   else
1753     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1755   now = time(NULL);
1757   while(!feof(fh))
1758   {
1759     size_t entry_len;
1761     ++line;
1762     if (fgets(entry, sizeof(entry), fh) == NULL)
1763       break;
1764     entry_len = strlen(entry);
1766     /* check \n termination in case journal writing crashed mid-line */
1767     if (entry_len == 0)
1768       continue;
1769     else if (entry[entry_len - 1] != '\n')
1770     {
1771       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1772       ++fail_cnt;
1773       continue;
1774     }
1776     entry[entry_len - 1] = '\0';
1778     if (handle_request(NULL, now, entry, entry_len) == 0)
1779       ++entry_cnt;
1780     else
1781       ++fail_cnt;
1782   }
1784   fclose(fh);
1786   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1787            entry_cnt, fail_cnt);
1789   return entry_cnt > 0 ? 1 : 0;
1790 } /* }}} static int journal_replay */
1792 static void journal_init(void) /* {{{ */
1794   int had_journal = 0;
1796   if (journal_cur == NULL) return;
1798   pthread_mutex_lock(&journal_lock);
1800   RRDD_LOG(LOG_INFO, "checking for journal files");
1802   had_journal += journal_replay(journal_old);
1803   had_journal += journal_replay(journal_cur);
1805   /* it must have been a crash.  start a flush */
1806   if (had_journal && config_flush_at_shutdown)
1807     flush_old_values(-1);
1809   pthread_mutex_unlock(&journal_lock);
1810   journal_rotate();
1812   RRDD_LOG(LOG_INFO, "journal processing complete");
1814 } /* }}} static void journal_init */
1816 static void close_connection(listen_socket_t *sock)
1818   close(sock->fd) ;  sock->fd   = -1;
1819   free(sock->rbuf);  sock->rbuf = NULL;
1820   free(sock->wbuf);  sock->wbuf = NULL;
1822   free(sock);
1825 static void *connection_thread_main (void *args) /* {{{ */
1827   pthread_t self;
1828   listen_socket_t *sock;
1829   int i;
1830   int fd;
1832   sock = (listen_socket_t *) args;
1833   fd = sock->fd;
1835   /* init read buffers */
1836   sock->next_read = sock->next_cmd = 0;
1837   sock->rbuf = malloc(RBUF_SIZE);
1838   if (sock->rbuf == NULL)
1839   {
1840     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1841     close_connection(sock);
1842     return NULL;
1843   }
1845   pthread_mutex_lock (&connection_threads_lock);
1846   {
1847     pthread_t *temp;
1849     temp = (pthread_t *) realloc (connection_threads,
1850         sizeof (pthread_t) * (connection_threads_num + 1));
1851     if (temp == NULL)
1852     {
1853       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1854     }
1855     else
1856     {
1857       connection_threads = temp;
1858       connection_threads[connection_threads_num] = pthread_self ();
1859       connection_threads_num++;
1860     }
1861   }
1862   pthread_mutex_unlock (&connection_threads_lock);
1864   while (do_shutdown == 0)
1865   {
1866     char *cmd;
1867     ssize_t cmd_len;
1868     ssize_t rbytes;
1869     time_t now;
1871     struct pollfd pollfd;
1872     int status;
1874     pollfd.fd = fd;
1875     pollfd.events = POLLIN | POLLPRI;
1876     pollfd.revents = 0;
1878     status = poll (&pollfd, 1, /* timeout = */ 500);
1879     if (do_shutdown)
1880       break;
1881     else if (status == 0) /* timeout */
1882       continue;
1883     else if (status < 0) /* error */
1884     {
1885       status = errno;
1886       if (status != EINTR)
1887         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1888       continue;
1889     }
1891     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1892       break;
1893     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1894     {
1895       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1896           "poll(2) returned something unexpected: %#04hx",
1897           pollfd.revents);
1898       break;
1899     }
1901     rbytes = read(fd, sock->rbuf + sock->next_read,
1902                   RBUF_SIZE - sock->next_read);
1903     if (rbytes < 0)
1904     {
1905       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1906       break;
1907     }
1908     else if (rbytes == 0)
1909       break; /* eof */
1911     sock->next_read += rbytes;
1913     if (sock->batch_start)
1914       now = sock->batch_start;
1915     else
1916       now = time(NULL);
1918     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1919     {
1920       status = handle_request (sock, now, cmd, cmd_len+1);
1921       if (status != 0)
1922         goto out_close;
1923     }
1924   }
1926 out_close:
1927   close_connection(sock);
1929   self = pthread_self ();
1930   /* Remove this thread from the connection threads list */
1931   pthread_mutex_lock (&connection_threads_lock);
1932   /* Find out own index in the array */
1933   for (i = 0; i < connection_threads_num; i++)
1934     if (pthread_equal (connection_threads[i], self) != 0)
1935       break;
1936   assert (i < connection_threads_num);
1938   /* Move the trailing threads forward. */
1939   if (i < (connection_threads_num - 1))
1940   {
1941     memmove (connection_threads + i,
1942         connection_threads + i + 1,
1943         sizeof (pthread_t) * (connection_threads_num - i - 1));
1944   }
1946   connection_threads_num--;
1947   pthread_mutex_unlock (&connection_threads_lock);
1949   return (NULL);
1950 } /* }}} void *connection_thread_main */
1952 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1954   int fd;
1955   struct sockaddr_un sa;
1956   listen_socket_t *temp;
1957   int status;
1958   const char *path;
1960   path = sock->addr;
1961   if (strncmp(path, "unix:", strlen("unix:")) == 0)
1962     path += strlen("unix:");
1964   temp = (listen_socket_t *) realloc (listen_fds,
1965       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1966   if (temp == NULL)
1967   {
1968     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1969     return (-1);
1970   }
1971   listen_fds = temp;
1972   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1974   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1975   if (fd < 0)
1976   {
1977     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1978     return (-1);
1979   }
1981   memset (&sa, 0, sizeof (sa));
1982   sa.sun_family = AF_UNIX;
1983   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1985   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1986   if (status != 0)
1987   {
1988     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1989     close (fd);
1990     unlink (path);
1991     return (-1);
1992   }
1994   status = listen (fd, /* backlog = */ 10);
1995   if (status != 0)
1996   {
1997     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1998     close (fd);
1999     unlink (path);
2000     return (-1);
2001   }
2003   listen_fds[listen_fds_num].fd = fd;
2004   listen_fds[listen_fds_num].family = PF_UNIX;
2005   strncpy(listen_fds[listen_fds_num].addr, path,
2006           sizeof (listen_fds[listen_fds_num].addr) - 1);
2007   listen_fds_num++;
2009   return (0);
2010 } /* }}} int open_listen_socket_unix */
2012 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2014   struct addrinfo ai_hints;
2015   struct addrinfo *ai_res;
2016   struct addrinfo *ai_ptr;
2017   char addr_copy[NI_MAXHOST];
2018   char *addr;
2019   char *port;
2020   int status;
2022   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2023   addr_copy[sizeof (addr_copy) - 1] = 0;
2024   addr = addr_copy;
2026   memset (&ai_hints, 0, sizeof (ai_hints));
2027   ai_hints.ai_flags = 0;
2028 #ifdef AI_ADDRCONFIG
2029   ai_hints.ai_flags |= AI_ADDRCONFIG;
2030 #endif
2031   ai_hints.ai_family = AF_UNSPEC;
2032   ai_hints.ai_socktype = SOCK_STREAM;
2034   port = NULL;
2035   if (*addr == '[') /* IPv6+port format */
2036   {
2037     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2038     addr++;
2040     port = strchr (addr, ']');
2041     if (port == NULL)
2042     {
2043       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
2044           sock->addr);
2045       return (-1);
2046     }
2047     *port = 0;
2048     port++;
2050     if (*port == ':')
2051       port++;
2052     else if (*port == 0)
2053       port = NULL;
2054     else
2055     {
2056       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
2057           port);
2058       return (-1);
2059     }
2060   } /* if (*addr = ']') */
2061   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2062   {
2063     port = rindex(addr, ':');
2064     if (port != NULL)
2065     {
2066       *port = 0;
2067       port++;
2068     }
2069   }
2070   ai_res = NULL;
2071   status = getaddrinfo (addr,
2072                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2073                         &ai_hints, &ai_res);
2074   if (status != 0)
2075   {
2076     RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
2077         "%s", addr, gai_strerror (status));
2078     return (-1);
2079   }
2081   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2082   {
2083     int fd;
2084     listen_socket_t *temp;
2085     int one = 1;
2087     temp = (listen_socket_t *) realloc (listen_fds,
2088         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2089     if (temp == NULL)
2090     {
2091       RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
2092       continue;
2093     }
2094     listen_fds = temp;
2095     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2097     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2098     if (fd < 0)
2099     {
2100       RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
2101       continue;
2102     }
2104     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2106     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2107     if (status != 0)
2108     {
2109       RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
2110       close (fd);
2111       continue;
2112     }
2114     status = listen (fd, /* backlog = */ 10);
2115     if (status != 0)
2116     {
2117       RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
2118       close (fd);
2119       return (-1);
2120     }
2122     listen_fds[listen_fds_num].fd = fd;
2123     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2124     listen_fds_num++;
2125   } /* for (ai_ptr) */
2127   return (0);
2128 } /* }}} static int open_listen_socket_network */
2130 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2132   assert(sock != NULL);
2133   assert(sock->addr != NULL);
2135   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2136       || sock->addr[0] == '/')
2137     return (open_listen_socket_unix(sock));
2138   else
2139     return (open_listen_socket_network(sock));
2140 } /* }}} int open_listen_socket */
2142 static int close_listen_sockets (void) /* {{{ */
2144   size_t i;
2146   for (i = 0; i < listen_fds_num; i++)
2147   {
2148     close (listen_fds[i].fd);
2150     if (listen_fds[i].family == PF_UNIX)
2151       unlink(listen_fds[i].addr);
2152   }
2154   free (listen_fds);
2155   listen_fds = NULL;
2156   listen_fds_num = 0;
2158   return (0);
2159 } /* }}} int close_listen_sockets */
2161 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2163   struct pollfd *pollfds;
2164   int pollfds_num;
2165   int status;
2166   int i;
2168   for (i = 0; i < config_listen_address_list_len; i++)
2169     open_listen_socket (config_listen_address_list[i]);
2171   if (config_listen_address_list_len < 1)
2172   {
2173     listen_socket_t sock;
2174     memset(&sock, 0, sizeof(sock));
2175     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2176     open_listen_socket (&sock);
2177   }
2179   if (listen_fds_num < 1)
2180   {
2181     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
2182         "could be opened. Sorry.");
2183     return (NULL);
2184   }
2186   pollfds_num = listen_fds_num;
2187   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2188   if (pollfds == NULL)
2189   {
2190     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2191     return (NULL);
2192   }
2193   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2195   RRDD_LOG(LOG_INFO, "listening for connections");
2197   while (do_shutdown == 0)
2198   {
2199     assert (pollfds_num == ((int) listen_fds_num));
2200     for (i = 0; i < pollfds_num; i++)
2201     {
2202       pollfds[i].fd = listen_fds[i].fd;
2203       pollfds[i].events = POLLIN | POLLPRI;
2204       pollfds[i].revents = 0;
2205     }
2207     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2208     if (do_shutdown)
2209       break;
2210     else if (status == 0) /* timeout */
2211       continue;
2212     else if (status < 0) /* error */
2213     {
2214       status = errno;
2215       if (status != EINTR)
2216       {
2217         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2218       }
2219       continue;
2220     }
2222     for (i = 0; i < pollfds_num; i++)
2223     {
2224       listen_socket_t *client_sock;
2225       struct sockaddr_storage client_sa;
2226       socklen_t client_sa_size;
2227       pthread_t tid;
2228       pthread_attr_t attr;
2230       if (pollfds[i].revents == 0)
2231         continue;
2233       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2234       {
2235         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2236             "poll(2) returned something unexpected for listen FD #%i.",
2237             pollfds[i].fd);
2238         continue;
2239       }
2241       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2242       if (client_sock == NULL)
2243       {
2244         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2245         continue;
2246       }
2247       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2249       client_sa_size = sizeof (client_sa);
2250       client_sock->fd = accept (pollfds[i].fd,
2251           (struct sockaddr *) &client_sa, &client_sa_size);
2252       if (client_sock->fd < 0)
2253       {
2254         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2255         free(client_sock);
2256         continue;
2257       }
2259       pthread_attr_init (&attr);
2260       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2262       status = pthread_create (&tid, &attr, connection_thread_main,
2263                                client_sock);
2264       if (status != 0)
2265       {
2266         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2267         close_connection(client_sock);
2268         continue;
2269       }
2270     } /* for (pollfds_num) */
2271   } /* while (do_shutdown == 0) */
2273   RRDD_LOG(LOG_INFO, "starting shutdown");
2275   close_listen_sockets ();
2277   pthread_mutex_lock (&connection_threads_lock);
2278   while (connection_threads_num > 0)
2279   {
2280     pthread_t wait_for;
2282     wait_for = connection_threads[0];
2284     pthread_mutex_unlock (&connection_threads_lock);
2285     pthread_join (wait_for, /* retval = */ NULL);
2286     pthread_mutex_lock (&connection_threads_lock);
2287   }
2288   pthread_mutex_unlock (&connection_threads_lock);
2290   return (NULL);
2291 } /* }}} void *listen_thread_main */
2293 static int daemonize (void) /* {{{ */
2295   int status;
2296   int fd;
2297   char *base_dir;
2299   daemon_uid = geteuid();
2301   fd = open_pidfile();
2302   if (fd < 0) return fd;
2304   if (!stay_foreground)
2305   {
2306     pid_t child;
2308     child = fork ();
2309     if (child < 0)
2310     {
2311       fprintf (stderr, "daemonize: fork(2) failed.\n");
2312       return (-1);
2313     }
2314     else if (child > 0)
2315     {
2316       return (1);
2317     }
2319     /* Become session leader */
2320     setsid ();
2322     /* Open the first three file descriptors to /dev/null */
2323     close (2);
2324     close (1);
2325     close (0);
2327     open ("/dev/null", O_RDWR);
2328     dup (0);
2329     dup (0);
2330   } /* if (!stay_foreground) */
2332   /* Change into the /tmp directory. */
2333   base_dir = (config_base_dir != NULL)
2334     ? config_base_dir
2335     : "/tmp";
2336   status = chdir (base_dir);
2337   if (status != 0)
2338   {
2339     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2340     return (-1);
2341   }
2343   install_signal_handlers();
2345   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2346   RRDD_LOG(LOG_INFO, "starting up");
2348   cache_tree = g_tree_new ((GCompareFunc) strcmp);
2349   if (cache_tree == NULL)
2350   {
2351     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2352     return (-1);
2353   }
2355   status = write_pidfile (fd);
2356   return status;
2357 } /* }}} int daemonize */
2359 static int cleanup (void) /* {{{ */
2361   do_shutdown++;
2363   pthread_cond_signal (&cache_cond);
2364   pthread_join (queue_thread, /* return = */ NULL);
2366   remove_pidfile ();
2368   RRDD_LOG(LOG_INFO, "goodbye");
2369   closelog ();
2371   return (0);
2372 } /* }}} int cleanup */
2374 static int read_options (int argc, char **argv) /* {{{ */
2376   int option;
2377   int status = 0;
2379   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2380   {
2381     switch (option)
2382     {
2383       case 'g':
2384         stay_foreground=1;
2385         break;
2387       case 'L':
2388       case 'l':
2389       {
2390         listen_socket_t **temp;
2391         listen_socket_t *new;
2393         new = malloc(sizeof(listen_socket_t));
2394         if (new == NULL)
2395         {
2396           fprintf(stderr, "read_options: malloc failed.\n");
2397           return(2);
2398         }
2399         memset(new, 0, sizeof(listen_socket_t));
2401         temp = (listen_socket_t **) realloc (config_listen_address_list,
2402             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2403         if (temp == NULL)
2404         {
2405           fprintf (stderr, "read_options: realloc failed.\n");
2406           return (2);
2407         }
2408         config_listen_address_list = temp;
2410         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2411         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2413         temp[config_listen_address_list_len] = new;
2414         config_listen_address_list_len++;
2415       }
2416       break;
2418       case 'f':
2419       {
2420         int temp;
2422         temp = atoi (optarg);
2423         if (temp > 0)
2424           config_flush_interval = temp;
2425         else
2426         {
2427           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2428           status = 3;
2429         }
2430       }
2431       break;
2433       case 'w':
2434       {
2435         int temp;
2437         temp = atoi (optarg);
2438         if (temp > 0)
2439           config_write_interval = temp;
2440         else
2441         {
2442           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2443           status = 2;
2444         }
2445       }
2446       break;
2448       case 'z':
2449       {
2450         int temp;
2452         temp = atoi(optarg);
2453         if (temp > 0)
2454           config_write_jitter = temp;
2455         else
2456         {
2457           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2458           status = 2;
2459         }
2461         break;
2462       }
2464       case 'B':
2465         config_write_base_only = 1;
2466         break;
2468       case 'b':
2469       {
2470         size_t len;
2472         if (config_base_dir != NULL)
2473           free (config_base_dir);
2474         config_base_dir = strdup (optarg);
2475         if (config_base_dir == NULL)
2476         {
2477           fprintf (stderr, "read_options: strdup failed.\n");
2478           return (3);
2479         }
2481         len = strlen (config_base_dir);
2482         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2483         {
2484           config_base_dir[len - 1] = 0;
2485           len--;
2486         }
2488         if (len < 1)
2489         {
2490           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2491           return (4);
2492         }
2494         _config_base_dir_len = len;
2495       }
2496       break;
2498       case 'p':
2499       {
2500         if (config_pid_file != NULL)
2501           free (config_pid_file);
2502         config_pid_file = strdup (optarg);
2503         if (config_pid_file == NULL)
2504         {
2505           fprintf (stderr, "read_options: strdup failed.\n");
2506           return (3);
2507         }
2508       }
2509       break;
2511       case 'F':
2512         config_flush_at_shutdown = 1;
2513         break;
2515       case 'j':
2516       {
2517         struct stat statbuf;
2518         const char *dir = optarg;
2520         status = stat(dir, &statbuf);
2521         if (status != 0)
2522         {
2523           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2524           return 6;
2525         }
2527         if (!S_ISDIR(statbuf.st_mode)
2528             || access(dir, R_OK|W_OK|X_OK) != 0)
2529         {
2530           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2531                   errno ? rrd_strerror(errno) : "");
2532           return 6;
2533         }
2535         journal_cur = malloc(PATH_MAX + 1);
2536         journal_old = malloc(PATH_MAX + 1);
2537         if (journal_cur == NULL || journal_old == NULL)
2538         {
2539           fprintf(stderr, "malloc failure for journal files\n");
2540           return 6;
2541         }
2542         else 
2543         {
2544           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2545           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2546         }
2547       }
2548       break;
2550       case 'h':
2551       case '?':
2552         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2553             "\n"
2554             "Usage: rrdcached [options]\n"
2555             "\n"
2556             "Valid options are:\n"
2557             "  -l <address>  Socket address to listen to.\n"
2558             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2559             "  -w <seconds>  Interval in which to write data.\n"
2560             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2561             "  -f <seconds>  Interval in which to flush dead data.\n"
2562             "  -p <file>     Location of the PID-file.\n"
2563             "  -b <dir>      Base directory to change to.\n"
2564             "  -B            Restrict file access to paths within -b <dir>\n"
2565             "  -g            Do not fork and run in the foreground.\n"
2566             "  -j <dir>      Directory in which to create the journal files.\n"
2567             "  -F            Always flush all updates at shutdown\n"
2568             "\n"
2569             "For more information and a detailed description of all options "
2570             "please refer\n"
2571             "to the rrdcached(1) manual page.\n",
2572             VERSION);
2573         status = -1;
2574         break;
2575     } /* switch (option) */
2576   } /* while (getopt) */
2578   /* advise the user when values are not sane */
2579   if (config_flush_interval < 2 * config_write_interval)
2580     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2581             " 2x write interval (-w) !\n");
2582   if (config_write_jitter > config_write_interval)
2583     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2584             " write interval (-w) !\n");
2586   if (config_write_base_only && config_base_dir == NULL)
2587     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2588             "  Consult the rrdcached documentation\n");
2590   if (journal_cur == NULL)
2591     config_flush_at_shutdown = 1;
2593   return (status);
2594 } /* }}} int read_options */
2596 int main (int argc, char **argv)
2598   int status;
2600   status = read_options (argc, argv);
2601   if (status != 0)
2602   {
2603     if (status < 0)
2604       status = 0;
2605     return (status);
2606   }
2608   status = daemonize ();
2609   if (status == 1)
2610   {
2611     struct sigaction sigchld;
2613     memset (&sigchld, 0, sizeof (sigchld));
2614     sigchld.sa_handler = SIG_IGN;
2615     sigaction (SIGCHLD, &sigchld, NULL);
2617     return (0);
2618   }
2619   else if (status != 0)
2620   {
2621     fprintf (stderr, "daemonize failed, exiting.\n");
2622     return (1);
2623   }
2625   journal_init();
2627   /* start the queue thread */
2628   memset (&queue_thread, 0, sizeof (queue_thread));
2629   status = pthread_create (&queue_thread,
2630                            NULL, /* attr */
2631                            queue_thread_main,
2632                            NULL); /* args */
2633   if (status != 0)
2634   {
2635     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2636     cleanup();
2637     return (1);
2638   }
2640   listen_thread_main (NULL);
2641   cleanup ();
2643   return (0);
2644 } /* int main */
2646 /*
2647  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2648  */