Code

rrdcached treats relative and absolute paths the same -- kevin
[rrdtool-all.git] / program / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
101 /*
102  * Types
103  */
104 typedef enum
106   PRIV_LOW,
107   PRIV_HIGH
108 } socket_privilege;
110 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
112 struct listen_socket_s
114   int fd;
115   char addr[PATH_MAX + 1];
116   int family;
117   socket_privilege privilege;
119   /* state for BATCH processing */
120   time_t batch_start;
121   int batch_cmd;
123   /* buffered IO */
124   char *rbuf;
125   off_t next_cmd;
126   off_t next_read;
128   char *wbuf;
129   ssize_t wbuf_len;
130 };
131 typedef struct listen_socket_s listen_socket_t;
133 struct cache_item_s;
134 typedef struct cache_item_s cache_item_t;
135 struct cache_item_s
137   char *file;
138   char **values;
139   int values_num;
140   time_t last_flush_time;
141   time_t last_update_stamp;
142 #define CI_FLAGS_IN_TREE  (1<<0)
143 #define CI_FLAGS_IN_QUEUE (1<<1)
144   int flags;
145   pthread_cond_t  flushed;
146   cache_item_t *prev;
147   cache_item_t *next;
148 };
150 struct callback_flush_data_s
152   time_t now;
153   time_t abs_timeout;
154   char **keys;
155   size_t keys_num;
156 };
157 typedef struct callback_flush_data_s callback_flush_data_t;
159 enum queue_side_e
161   HEAD,
162   TAIL
163 };
164 typedef enum queue_side_e queue_side_t;
166 /* max length of socket command or response */
167 #define CMD_MAX 4096
168 #define RBUF_SIZE (CMD_MAX*2)
170 /*
171  * Variables
172  */
173 static int stay_foreground = 0;
174 static uid_t daemon_uid;
176 static listen_socket_t *listen_fds = NULL;
177 static size_t listen_fds_num = 0;
179 static int do_shutdown = 0;
181 static pthread_t queue_thread;
183 static pthread_t *connection_threads = NULL;
184 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
185 static int connection_threads_num = 0;
187 /* Cache stuff */
188 static GTree          *cache_tree = NULL;
189 static cache_item_t   *cache_queue_head = NULL;
190 static cache_item_t   *cache_queue_tail = NULL;
191 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
192 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
194 static int config_write_interval = 300;
195 static int config_write_jitter   = 0;
196 static int config_flush_interval = 3600;
197 static int config_flush_at_shutdown = 0;
198 static char *config_pid_file = NULL;
199 static char *config_base_dir = NULL;
200 static size_t _config_base_dir_len = 0;
201 static int config_write_base_only = 0;
203 static listen_socket_t **config_listen_address_list = NULL;
204 static int config_listen_address_list_len = 0;
206 static uint64_t stats_queue_length = 0;
207 static uint64_t stats_updates_received = 0;
208 static uint64_t stats_flush_received = 0;
209 static uint64_t stats_updates_written = 0;
210 static uint64_t stats_data_sets_written = 0;
211 static uint64_t stats_journal_bytes = 0;
212 static uint64_t stats_journal_rotate = 0;
213 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
215 /* Journaled updates */
216 static char *journal_cur = NULL;
217 static char *journal_old = NULL;
218 static FILE *journal_fh = NULL;
219 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
220 static int journal_write(char *cmd, char *args);
221 static void journal_done(void);
222 static void journal_rotate(void);
224 /* 
225  * Functions
226  */
227 static void sig_common (const char *sig) /* {{{ */
229   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
230   do_shutdown++;
231   pthread_cond_broadcast(&cache_cond);
232 } /* }}} void sig_common */
234 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
236   sig_common("INT");
237 } /* }}} void sig_int_handler */
239 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
241   sig_common("TERM");
242 } /* }}} void sig_term_handler */
244 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
246   config_flush_at_shutdown = 1;
247   sig_common("USR1");
248 } /* }}} void sig_usr1_handler */
250 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
252   config_flush_at_shutdown = 0;
253   sig_common("USR2");
254 } /* }}} void sig_usr2_handler */
256 static void install_signal_handlers(void) /* {{{ */
258   /* These structures are static, because `sigaction' behaves weird if the are
259    * overwritten.. */
260   static struct sigaction sa_int;
261   static struct sigaction sa_term;
262   static struct sigaction sa_pipe;
263   static struct sigaction sa_usr1;
264   static struct sigaction sa_usr2;
266   /* Install signal handlers */
267   memset (&sa_int, 0, sizeof (sa_int));
268   sa_int.sa_handler = sig_int_handler;
269   sigaction (SIGINT, &sa_int, NULL);
271   memset (&sa_term, 0, sizeof (sa_term));
272   sa_term.sa_handler = sig_term_handler;
273   sigaction (SIGTERM, &sa_term, NULL);
275   memset (&sa_pipe, 0, sizeof (sa_pipe));
276   sa_pipe.sa_handler = SIG_IGN;
277   sigaction (SIGPIPE, &sa_pipe, NULL);
279   memset (&sa_pipe, 0, sizeof (sa_usr1));
280   sa_usr1.sa_handler = sig_usr1_handler;
281   sigaction (SIGUSR1, &sa_usr1, NULL);
283   memset (&sa_usr2, 0, sizeof (sa_usr2));
284   sa_usr2.sa_handler = sig_usr2_handler;
285   sigaction (SIGUSR2, &sa_usr2, NULL);
287 } /* }}} void install_signal_handlers */
289 static int open_pidfile(void) /* {{{ */
291   int fd;
292   char *file;
294   file = (config_pid_file != NULL)
295     ? config_pid_file
296     : LOCALSTATEDIR "/run/rrdcached.pid";
298   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
299   if (fd < 0)
300     fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
301             file, rrd_strerror(errno));
303   return(fd);
304 } /* }}} static int open_pidfile */
306 static int write_pidfile (int fd) /* {{{ */
308   pid_t pid;
309   FILE *fh;
311   pid = getpid ();
313   fh = fdopen (fd, "w");
314   if (fh == NULL)
315   {
316     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
317     close(fd);
318     return (-1);
319   }
321   fprintf (fh, "%i\n", (int) pid);
322   fclose (fh);
324   return (0);
325 } /* }}} int write_pidfile */
327 static int remove_pidfile (void) /* {{{ */
329   char *file;
330   int status;
332   file = (config_pid_file != NULL)
333     ? config_pid_file
334     : LOCALSTATEDIR "/run/rrdcached.pid";
336   status = unlink (file);
337   if (status == 0)
338     return (0);
339   return (errno);
340 } /* }}} int remove_pidfile */
342 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
344   char *eol;
346   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
347                sock->next_read - sock->next_cmd);
349   if (eol == NULL)
350   {
351     /* no commands left, move remainder back to front of rbuf */
352     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
353             sock->next_read - sock->next_cmd);
354     sock->next_read -= sock->next_cmd;
355     sock->next_cmd = 0;
356     *len = 0;
357     return NULL;
358   }
359   else
360   {
361     char *cmd = sock->rbuf + sock->next_cmd;
362     *eol = '\0';
364     sock->next_cmd = eol - sock->rbuf + 1;
366     if (eol > sock->rbuf && *(eol-1) == '\r')
367       *(--eol) = '\0'; /* handle "\r\n" EOL */
369     *len = eol - cmd;
371     return cmd;
372   }
374   /* NOTREACHED */
375   assert(1==0);
378 /* add the characters directly to the write buffer */
379 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
381   char *new_buf;
383   assert(sock != NULL);
385   new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
386   if (new_buf == NULL)
387   {
388     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
389     return -1;
390   }
392   strncpy(new_buf + sock->wbuf_len, str, len + 1);
394   sock->wbuf = new_buf;
395   sock->wbuf_len += len;
397   return 0;
398 } /* }}} static int add_to_wbuf */
400 /* add the text to the "extra" info that's sent after the status line */
401 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
403   va_list argp;
404   char buffer[CMD_MAX];
405   int len;
407   if (sock == NULL) return 0; /* journal replay mode */
408   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
410   va_start(argp, fmt);
411 #ifdef HAVE_VSNPRINTF
412   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
413 #else
414   len = vsprintf(buffer, fmt, argp);
415 #endif
416   va_end(argp);
417   if (len < 0)
418   {
419     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
420     return -1;
421   }
423   return add_to_wbuf(sock, buffer, len);
424 } /* }}} static int add_response_info */
426 static int count_lines(char *str) /* {{{ */
428   int lines = 0;
430   if (str != NULL)
431   {
432     while ((str = strchr(str, '\n')) != NULL)
433     {
434       ++lines;
435       ++str;
436     }
437   }
439   return lines;
440 } /* }}} static int count_lines */
442 /* send the response back to the user.
443  * returns 0 on success, -1 on error
444  * write buffer is always zeroed after this call */
445 static int send_response (listen_socket_t *sock, response_code rc,
446                           char *fmt, ...) /* {{{ */
448   va_list argp;
449   char buffer[CMD_MAX];
450   int lines;
451   ssize_t wrote;
452   int rclen, len;
454   if (sock == NULL) return rc;  /* journal replay mode */
456   if (sock->batch_start)
457   {
458     if (rc == RESP_OK)
459       return rc; /* no response on success during BATCH */
460     lines = sock->batch_cmd;
461   }
462   else if (rc == RESP_OK)
463     lines = count_lines(sock->wbuf);
464   else
465     lines = -1;
467   rclen = sprintf(buffer, "%d ", lines);
468   va_start(argp, fmt);
469 #ifdef HAVE_VSNPRINTF
470   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
471 #else
472   len = vsprintf(buffer+rclen, fmt, argp);
473 #endif
474   va_end(argp);
475   if (len < 0)
476     return -1;
478   len += rclen;
480   /* append the result to the wbuf, don't write to the user */
481   if (sock->batch_start)
482     return add_to_wbuf(sock, buffer, len);
484   /* first write must be complete */
485   if (len != write(sock->fd, buffer, len))
486   {
487     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
488     return -1;
489   }
491   if (sock->wbuf != NULL && rc == RESP_OK)
492   {
493     wrote = 0;
494     while (wrote < sock->wbuf_len)
495     {
496       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
497       if (wb <= 0)
498       {
499         RRDD_LOG(LOG_INFO, "send_response: could not write results");
500         return -1;
501       }
502       wrote += wb;
503     }
504   }
506   free(sock->wbuf); sock->wbuf = NULL;
507   sock->wbuf_len = 0;
509   return 0;
510 } /* }}} */
512 static void wipe_ci_values(cache_item_t *ci, time_t when)
514   ci->values = NULL;
515   ci->values_num = 0;
517   ci->last_flush_time = when;
518   if (config_write_jitter > 0)
519     ci->last_flush_time += (random() % config_write_jitter);
522 /* remove_from_queue
523  * remove a "cache_item_t" item from the queue.
524  * must hold 'cache_lock' when calling this
525  */
526 static void remove_from_queue(cache_item_t *ci) /* {{{ */
528   if (ci == NULL) return;
530   if (ci->prev == NULL)
531     cache_queue_head = ci->next; /* reset head */
532   else
533     ci->prev->next = ci->next;
535   if (ci->next == NULL)
536     cache_queue_tail = ci->prev; /* reset the tail */
537   else
538     ci->next->prev = ci->prev;
540   ci->next = ci->prev = NULL;
541   ci->flags &= ~CI_FLAGS_IN_QUEUE;
542 } /* }}} static void remove_from_queue */
544 /* remove an entry from the tree and free all its resources.
545  * must hold 'cache lock' while calling this.
546  * returns 0 on success, otherwise errno */
547 static int forget_file(const char *file)
549   cache_item_t *ci;
551   ci = g_tree_lookup(cache_tree, file);
552   if (ci == NULL)
553     return ENOENT;
555   g_tree_remove (cache_tree, file);
556   remove_from_queue(ci);
558   for (int i=0; i < ci->values_num; i++)
559     free(ci->values[i]);
561   free (ci->values);
562   free (ci->file);
564   /* in case anyone is waiting */
565   pthread_cond_broadcast(&ci->flushed);
567   free (ci);
569   return 0;
570 } /* }}} static int forget_file */
572 /*
573  * enqueue_cache_item:
574  * `cache_lock' must be acquired before calling this function!
575  */
576 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
577     queue_side_t side)
579   if (ci == NULL)
580     return (-1);
582   if (ci->values_num == 0)
583     return (0);
585   if (side == HEAD)
586   {
587     if (cache_queue_head == ci)
588       return 0;
590     /* remove from the double linked list */
591     if (ci->flags & CI_FLAGS_IN_QUEUE)
592       remove_from_queue(ci);
594     ci->prev = NULL;
595     ci->next = cache_queue_head;
596     if (ci->next != NULL)
597       ci->next->prev = ci;
598     cache_queue_head = ci;
600     if (cache_queue_tail == NULL)
601       cache_queue_tail = cache_queue_head;
602   }
603   else /* (side == TAIL) */
604   {
605     /* We don't move values back in the list.. */
606     if (ci->flags & CI_FLAGS_IN_QUEUE)
607       return (0);
609     assert (ci->next == NULL);
610     assert (ci->prev == NULL);
612     ci->prev = cache_queue_tail;
614     if (cache_queue_tail == NULL)
615       cache_queue_head = ci;
616     else
617       cache_queue_tail->next = ci;
619     cache_queue_tail = ci;
620   }
622   ci->flags |= CI_FLAGS_IN_QUEUE;
624   pthread_cond_broadcast(&cache_cond);
625   pthread_mutex_lock (&stats_lock);
626   stats_queue_length++;
627   pthread_mutex_unlock (&stats_lock);
629   return (0);
630 } /* }}} int enqueue_cache_item */
632 /*
633  * tree_callback_flush:
634  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
635  * while this is in progress.
636  */
637 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
638     gpointer data)
640   cache_item_t *ci;
641   callback_flush_data_t *cfd;
643   ci = (cache_item_t *) value;
644   cfd = (callback_flush_data_t *) data;
646   if ((ci->last_flush_time <= cfd->abs_timeout)
647       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
648       && (ci->values_num > 0))
649   {
650     enqueue_cache_item (ci, TAIL);
651   }
652   else if ((do_shutdown != 0)
653       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
654       && (ci->values_num > 0))
655   {
656     enqueue_cache_item (ci, TAIL);
657   }
658   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
659       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
660       && (ci->values_num <= 0))
661   {
662     char **temp;
664     temp = (char **) realloc (cfd->keys,
665         sizeof (char *) * (cfd->keys_num + 1));
666     if (temp == NULL)
667     {
668       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
669       return (FALSE);
670     }
671     cfd->keys = temp;
672     /* Make really sure this points to the _same_ place */
673     assert ((char *) key == ci->file);
674     cfd->keys[cfd->keys_num] = (char *) key;
675     cfd->keys_num++;
676   }
678   return (FALSE);
679 } /* }}} gboolean tree_callback_flush */
681 static int flush_old_values (int max_age)
683   callback_flush_data_t cfd;
684   size_t k;
686   memset (&cfd, 0, sizeof (cfd));
687   /* Pass the current time as user data so that we don't need to call
688    * `time' for each node. */
689   cfd.now = time (NULL);
690   cfd.keys = NULL;
691   cfd.keys_num = 0;
693   if (max_age > 0)
694     cfd.abs_timeout = cfd.now - max_age;
695   else
696     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
698   /* `tree_callback_flush' will return the keys of all values that haven't
699    * been touched in the last `config_flush_interval' seconds in `cfd'.
700    * The char*'s in this array point to the same memory as ci->file, so we
701    * don't need to free them separately. */
702   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
704   for (k = 0; k < cfd.keys_num; k++)
705   {
706     /* should never fail, since we have held the cache_lock
707      * the entire time */
708     assert( forget_file(cfd.keys[k]) == 0 );
709   }
711   if (cfd.keys != NULL)
712   {
713     free (cfd.keys);
714     cfd.keys = NULL;
715   }
717   return (0);
718 } /* int flush_old_values */
720 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
722   struct timeval now;
723   struct timespec next_flush;
724   int final_flush = 0; /* make sure we only flush once on shutdown */
726   gettimeofday (&now, NULL);
727   next_flush.tv_sec = now.tv_sec + config_flush_interval;
728   next_flush.tv_nsec = 1000 * now.tv_usec;
730   pthread_mutex_lock (&cache_lock);
731   while ((do_shutdown == 0) || (cache_queue_head != NULL))
732   {
733     cache_item_t *ci;
734     char *file;
735     char **values;
736     int values_num;
737     int status;
738     int i;
740     /* First, check if it's time to do the cache flush. */
741     gettimeofday (&now, NULL);
742     if ((now.tv_sec > next_flush.tv_sec)
743         || ((now.tv_sec == next_flush.tv_sec)
744           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
745     {
746       /* Flush all values that haven't been written in the last
747        * `config_write_interval' seconds. */
748       flush_old_values (config_write_interval);
750       /* Determine the time of the next cache flush. */
751       next_flush.tv_sec =
752         now.tv_sec + next_flush.tv_sec % config_flush_interval;
754       /* unlock the cache while we rotate so we don't block incoming
755        * updates if the fsync() blocks on disk I/O */
756       pthread_mutex_unlock(&cache_lock);
757       journal_rotate();
758       pthread_mutex_lock(&cache_lock);
759     }
761     /* Now, check if there's something to store away. If not, wait until
762      * something comes in or it's time to do the cache flush.  if we are
763      * shutting down, do not wait around.  */
764     if (cache_queue_head == NULL && !do_shutdown)
765     {
766       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
767       if ((status != 0) && (status != ETIMEDOUT))
768       {
769         RRDD_LOG (LOG_ERR, "queue_thread_main: "
770             "pthread_cond_timedwait returned %i.", status);
771       }
772     }
774     /* We're about to shut down */
775     if (do_shutdown != 0 && !final_flush++)
776     {
777       if (config_flush_at_shutdown)
778         flush_old_values (-1); /* flush everything */
779       else
780         break;
781     }
783     /* Check if a value has arrived. This may be NULL if we timed out or there
784      * was an interrupt such as a signal. */
785     if (cache_queue_head == NULL)
786       continue;
788     ci = cache_queue_head;
790     /* copy the relevant parts */
791     file = strdup (ci->file);
792     if (file == NULL)
793     {
794       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
795       continue;
796     }
798     assert(ci->values != NULL);
799     assert(ci->values_num > 0);
801     values = ci->values;
802     values_num = ci->values_num;
804     wipe_ci_values(ci, time(NULL));
805     remove_from_queue(ci);
807     pthread_mutex_lock (&stats_lock);
808     assert (stats_queue_length > 0);
809     stats_queue_length--;
810     pthread_mutex_unlock (&stats_lock);
812     pthread_mutex_unlock (&cache_lock);
814     rrd_clear_error ();
815     status = rrd_update_r (file, NULL, values_num, (void *) values);
816     if (status != 0)
817     {
818       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
819           "rrd_update_r (%s) failed with status %i. (%s)",
820           file, status, rrd_get_error());
821     }
823     journal_write("wrote", file);
824     pthread_cond_broadcast(&ci->flushed);
826     for (i = 0; i < values_num; i++)
827       free (values[i]);
829     free(values);
830     free(file);
832     if (status == 0)
833     {
834       pthread_mutex_lock (&stats_lock);
835       stats_updates_written++;
836       stats_data_sets_written += values_num;
837       pthread_mutex_unlock (&stats_lock);
838     }
840     pthread_mutex_lock (&cache_lock);
842     /* We're about to shut down */
843     if (do_shutdown != 0 && !final_flush++)
844     {
845       if (config_flush_at_shutdown)
846           flush_old_values (-1); /* flush everything */
847       else
848         break;
849     }
850   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
851   pthread_mutex_unlock (&cache_lock);
853   if (config_flush_at_shutdown)
854   {
855     assert(cache_queue_head == NULL);
856     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
857   }
859   journal_done();
861   return (NULL);
862 } /* }}} void *queue_thread_main */
864 static int buffer_get_field (char **buffer_ret, /* {{{ */
865     size_t *buffer_size_ret, char **field_ret)
867   char *buffer;
868   size_t buffer_pos;
869   size_t buffer_size;
870   char *field;
871   size_t field_size;
872   int status;
874   buffer = *buffer_ret;
875   buffer_pos = 0;
876   buffer_size = *buffer_size_ret;
877   field = *buffer_ret;
878   field_size = 0;
880   if (buffer_size <= 0)
881     return (-1);
883   /* This is ensured by `handle_request'. */
884   assert (buffer[buffer_size - 1] == '\0');
886   status = -1;
887   while (buffer_pos < buffer_size)
888   {
889     /* Check for end-of-field or end-of-buffer */
890     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
891     {
892       field[field_size] = 0;
893       field_size++;
894       buffer_pos++;
895       status = 0;
896       break;
897     }
898     /* Handle escaped characters. */
899     else if (buffer[buffer_pos] == '\\')
900     {
901       if (buffer_pos >= (buffer_size - 1))
902         break;
903       buffer_pos++;
904       field[field_size] = buffer[buffer_pos];
905       field_size++;
906       buffer_pos++;
907     }
908     /* Normal operation */ 
909     else
910     {
911       field[field_size] = buffer[buffer_pos];
912       field_size++;
913       buffer_pos++;
914     }
915   } /* while (buffer_pos < buffer_size) */
917   if (status != 0)
918     return (status);
920   *buffer_ret = buffer + buffer_pos;
921   *buffer_size_ret = buffer_size - buffer_pos;
922   *field_ret = field;
924   return (0);
925 } /* }}} int buffer_get_field */
927 /* if we're restricting writes to the base directory,
928  * check whether the file falls within the dir
929  * returns 1 if OK, otherwise 0
930  */
931 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
933   assert(file != NULL);
935   if (!config_write_base_only
936       || sock == NULL /* journal replay */
937       || config_base_dir == NULL)
938     return 1;
940   if (strstr(file, "../") != NULL) goto err;
942   /* relative paths without "../" are ok */
943   if (*file != '/') return 1;
945   /* file must be of the format base + "/" + <1+ char filename> */
946   if (strlen(file) < _config_base_dir_len + 2) goto err;
947   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
948   if (*(file + _config_base_dir_len) != '/') goto err;
950   return 1;
952 err:
953   if (sock != NULL && sock->fd >= 0)
954     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
956   return 0;
957 } /* }}} static int check_file_access */
959 /* when using a base dir, convert relative paths to absolute paths.
960  * if necessary, modifies the "filename" pointer to point
961  * to the new path created in "tmp".  "tmp" is provided
962  * by the caller and sizeof(tmp) must be >= PATH_MAX.
963  *
964  * this allows us to optimize for the expected case (absolute path)
965  * with a no-op.
966  */
967 static void get_abs_path(char **filename, char *tmp)
969   assert(tmp != NULL);
970   assert(filename != NULL && *filename != NULL);
972   if (config_base_dir == NULL || **filename == '/')
973     return;
975   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
976   *filename = tmp;
977 } /* }}} static int get_abs_path */
979 /* returns 1 if we have the required privilege level,
980  * otherwise issue an error to the user on sock */
981 static int has_privilege (listen_socket_t *sock, /* {{{ */
982                           socket_privilege priv)
984   if (sock == NULL) /* journal replay */
985     return 1;
987   if (sock->privilege >= priv)
988     return 1;
990   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
991 } /* }}} static int has_privilege */
993 static int flush_file (const char *filename) /* {{{ */
995   cache_item_t *ci;
997   pthread_mutex_lock (&cache_lock);
999   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1000   if (ci == NULL)
1001   {
1002     pthread_mutex_unlock (&cache_lock);
1003     return (ENOENT);
1004   }
1006   if (ci->values_num > 0)
1007   {
1008     /* Enqueue at head */
1009     enqueue_cache_item (ci, HEAD);
1010     pthread_cond_wait(&ci->flushed, &cache_lock);
1011   }
1013   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1014    * may have been purged during our cond_wait() */
1016   pthread_mutex_unlock(&cache_lock);
1018   return (0);
1019 } /* }}} int flush_file */
1021 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1022     char *buffer, size_t buffer_size)
1024   int status;
1025   char **help_text;
1026   char *command;
1028   char *help_help[2] =
1029   {
1030     "Command overview\n"
1031     ,
1032     "HELP [<command>]\n"
1033     "FLUSH <filename>\n"
1034     "FLUSHALL\n"
1035     "PENDING <filename>\n"
1036     "FORGET <filename>\n"
1037     "UPDATE <filename> <values> [<values> ...]\n"
1038     "BATCH\n"
1039     "STATS\n"
1040   };
1042   char *help_flush[2] =
1043   {
1044     "Help for FLUSH\n"
1045     ,
1046     "Usage: FLUSH <filename>\n"
1047     "\n"
1048     "Adds the given filename to the head of the update queue and returns\n"
1049     "after is has been dequeued.\n"
1050   };
1052   char *help_flushall[2] =
1053   {
1054     "Help for FLUSHALL\n"
1055     ,
1056     "Usage: FLUSHALL\n"
1057     "\n"
1058     "Triggers writing of all pending updates.  Returns immediately.\n"
1059   };
1061   char *help_pending[2] =
1062   {
1063     "Help for PENDING\n"
1064     ,
1065     "Usage: PENDING <filename>\n"
1066     "\n"
1067     "Shows any 'pending' updates for a file, in order.\n"
1068     "The updates shown have not yet been written to the underlying RRD file.\n"
1069   };
1071   char *help_forget[2] =
1072   {
1073     "Help for FORGET\n"
1074     ,
1075     "Usage: FORGET <filename>\n"
1076     "\n"
1077     "Removes the file completely from the cache.\n"
1078     "Any pending updates for the file will be lost.\n"
1079   };
1081   char *help_update[2] =
1082   {
1083     "Help for UPDATE\n"
1084     ,
1085     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1086     "\n"
1087     "Adds the given file to the internal cache if it is not yet known and\n"
1088     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1089     "for details.\n"
1090     "\n"
1091     "Each <values> has the following form:\n"
1092     "  <values> = <time>:<value>[:<value>[...]]\n"
1093     "See the rrdupdate(1) manpage for details.\n"
1094   };
1096   char *help_stats[2] =
1097   {
1098     "Help for STATS\n"
1099     ,
1100     "Usage: STATS\n"
1101     "\n"
1102     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1103     "a description of the values.\n"
1104   };
1106   char *help_batch[2] =
1107   {
1108     "Help for BATCH\n"
1109     ,
1110     "The 'BATCH' command permits the client to initiate a bulk load\n"
1111     "   of commands to rrdcached.\n"
1112     "\n"
1113     "Usage:\n"
1114     "\n"
1115     "    client: BATCH\n"
1116     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1117     "    client: command #1\n"
1118     "    client: command #2\n"
1119     "    client: ... and so on\n"
1120     "    client: .\n"
1121     "    server: 2 errors\n"
1122     "    server: 7 message for command #7\n"
1123     "    server: 9 message for command #9\n"
1124     "\n"
1125     "For more information, consult the rrdcached(1) documentation.\n"
1126   };
1128   status = buffer_get_field (&buffer, &buffer_size, &command);
1129   if (status != 0)
1130     help_text = help_help;
1131   else
1132   {
1133     if (strcasecmp (command, "update") == 0)
1134       help_text = help_update;
1135     else if (strcasecmp (command, "flush") == 0)
1136       help_text = help_flush;
1137     else if (strcasecmp (command, "flushall") == 0)
1138       help_text = help_flushall;
1139     else if (strcasecmp (command, "pending") == 0)
1140       help_text = help_pending;
1141     else if (strcasecmp (command, "forget") == 0)
1142       help_text = help_forget;
1143     else if (strcasecmp (command, "stats") == 0)
1144       help_text = help_stats;
1145     else if (strcasecmp (command, "batch") == 0)
1146       help_text = help_batch;
1147     else
1148       help_text = help_help;
1149   }
1151   add_response_info(sock, help_text[1]);
1152   return send_response(sock, RESP_OK, help_text[0]);
1153 } /* }}} int handle_request_help */
1155 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1157   uint64_t copy_queue_length;
1158   uint64_t copy_updates_received;
1159   uint64_t copy_flush_received;
1160   uint64_t copy_updates_written;
1161   uint64_t copy_data_sets_written;
1162   uint64_t copy_journal_bytes;
1163   uint64_t copy_journal_rotate;
1165   uint64_t tree_nodes_number;
1166   uint64_t tree_depth;
1168   pthread_mutex_lock (&stats_lock);
1169   copy_queue_length       = stats_queue_length;
1170   copy_updates_received   = stats_updates_received;
1171   copy_flush_received     = stats_flush_received;
1172   copy_updates_written    = stats_updates_written;
1173   copy_data_sets_written  = stats_data_sets_written;
1174   copy_journal_bytes      = stats_journal_bytes;
1175   copy_journal_rotate     = stats_journal_rotate;
1176   pthread_mutex_unlock (&stats_lock);
1178   pthread_mutex_lock (&cache_lock);
1179   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1180   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1181   pthread_mutex_unlock (&cache_lock);
1183   add_response_info(sock,
1184                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1185   add_response_info(sock,
1186                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1187   add_response_info(sock,
1188                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1189   add_response_info(sock,
1190                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1191   add_response_info(sock,
1192                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1193   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1194   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1195   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1196   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1198   send_response(sock, RESP_OK, "Statistics follow\n");
1200   return (0);
1201 } /* }}} int handle_request_stats */
1203 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1204     char *buffer, size_t buffer_size)
1206   char *file, file_tmp[PATH_MAX];
1207   int status;
1209   status = buffer_get_field (&buffer, &buffer_size, &file);
1210   if (status != 0)
1211   {
1212     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1213   }
1214   else
1215   {
1216     pthread_mutex_lock(&stats_lock);
1217     stats_flush_received++;
1218     pthread_mutex_unlock(&stats_lock);
1220     get_abs_path(&file, file_tmp);
1221     if (!check_file_access(file, sock)) return 0;
1223     status = flush_file (file);
1224     if (status == 0)
1225       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1226     else if (status == ENOENT)
1227     {
1228       /* no file in our tree; see whether it exists at all */
1229       struct stat statbuf;
1231       memset(&statbuf, 0, sizeof(statbuf));
1232       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1233         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1234       else
1235         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1236     }
1237     else if (status < 0)
1238       return send_response(sock, RESP_ERR, "Internal error.\n");
1239     else
1240       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1241   }
1243   /* NOTREACHED */
1244   assert(1==0);
1245 } /* }}} int handle_request_flush */
1247 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1249   int status;
1251   status = has_privilege(sock, PRIV_HIGH);
1252   if (status <= 0)
1253     return status;
1255   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1257   pthread_mutex_lock(&cache_lock);
1258   flush_old_values(-1);
1259   pthread_mutex_unlock(&cache_lock);
1261   return send_response(sock, RESP_OK, "Started flush.\n");
1262 } /* }}} static int handle_request_flushall */
1264 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1265                                   char *buffer, size_t buffer_size)
1267   int status;
1268   char *file, file_tmp[PATH_MAX];
1269   cache_item_t *ci;
1271   status = buffer_get_field(&buffer, &buffer_size, &file);
1272   if (status != 0)
1273     return send_response(sock, RESP_ERR,
1274                          "Usage: PENDING <filename>\n");
1276   status = has_privilege(sock, PRIV_HIGH);
1277   if (status <= 0)
1278     return status;
1280   get_abs_path(&file, file_tmp);
1282   pthread_mutex_lock(&cache_lock);
1283   ci = g_tree_lookup(cache_tree, file);
1284   if (ci == NULL)
1285   {
1286     pthread_mutex_unlock(&cache_lock);
1287     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1288   }
1290   for (int i=0; i < ci->values_num; i++)
1291     add_response_info(sock, "%s\n", ci->values[i]);
1293   pthread_mutex_unlock(&cache_lock);
1294   return send_response(sock, RESP_OK, "updates pending\n");
1295 } /* }}} static int handle_request_pending */
1297 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1298                                  char *buffer, size_t buffer_size)
1300   int status;
1301   char *file, file_tmp[PATH_MAX];
1303   status = buffer_get_field(&buffer, &buffer_size, &file);
1304   if (status != 0)
1305     return send_response(sock, RESP_ERR,
1306                          "Usage: FORGET <filename>\n");
1308   status = has_privilege(sock, PRIV_HIGH);
1309   if (status <= 0)
1310     return status;
1312   get_abs_path(&file, file_tmp);
1313   if (!check_file_access(file, sock)) return 0;
1315   pthread_mutex_lock(&cache_lock);
1316   status = forget_file(file);
1317   pthread_mutex_unlock(&cache_lock);
1319   if (status == 0)
1320   {
1321     if (sock != NULL)
1322       journal_write("forget", file);
1324     return send_response(sock, RESP_OK, "Gone!\n");
1325   }
1326   else
1327     return send_response(sock, RESP_ERR, "cannot forget: %s\n",
1328                          status < 0 ? "Internal error" : rrd_strerror(status));
1330   /* NOTREACHED */
1331   assert(1==0);
1332 } /* }}} static int handle_request_forget */
1334 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1335                                   time_t now,
1336                                   char *buffer, size_t buffer_size)
1338   char *file, file_tmp[PATH_MAX];
1339   int values_num = 0;
1340   int bad_timestamps = 0;
1341   int status;
1342   char orig_buf[CMD_MAX];
1344   cache_item_t *ci;
1346   status = has_privilege(sock, PRIV_HIGH);
1347   if (status <= 0)
1348     return status;
1350   /* save it for the journal later */
1351   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1353   status = buffer_get_field (&buffer, &buffer_size, &file);
1354   if (status != 0)
1355     return send_response(sock, RESP_ERR,
1356                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1358   pthread_mutex_lock(&stats_lock);
1359   stats_updates_received++;
1360   pthread_mutex_unlock(&stats_lock);
1362   get_abs_path(&file, file_tmp);
1363   if (!check_file_access(file, sock)) return 0;
1365   pthread_mutex_lock (&cache_lock);
1366   ci = g_tree_lookup (cache_tree, file);
1368   if (ci == NULL) /* {{{ */
1369   {
1370     struct stat statbuf;
1372     /* don't hold the lock while we setup; stat(2) might block */
1373     pthread_mutex_unlock(&cache_lock);
1375     memset (&statbuf, 0, sizeof (statbuf));
1376     status = stat (file, &statbuf);
1377     if (status != 0)
1378     {
1379       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1381       status = errno;
1382       if (status == ENOENT)
1383         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1384       else
1385         return send_response(sock, RESP_ERR,
1386                              "stat failed with error %i.\n", status);
1387     }
1388     if (!S_ISREG (statbuf.st_mode))
1389       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1391     if (access(file, R_OK|W_OK) != 0)
1392       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1393                            file, rrd_strerror(errno));
1395     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1396     if (ci == NULL)
1397     {
1398       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1400       return send_response(sock, RESP_ERR, "malloc failed.\n");
1401     }
1402     memset (ci, 0, sizeof (cache_item_t));
1404     ci->file = strdup (file);
1405     if (ci->file == NULL)
1406     {
1407       free (ci);
1408       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1410       return send_response(sock, RESP_ERR, "strdup failed.\n");
1411     }
1413     wipe_ci_values(ci, now);
1414     ci->flags = CI_FLAGS_IN_TREE;
1416     pthread_mutex_lock(&cache_lock);
1417     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1418   } /* }}} */
1419   assert (ci != NULL);
1421   /* don't re-write updates in replay mode */
1422   if (sock != NULL)
1423     journal_write("update", orig_buf);
1425   while (buffer_size > 0)
1426   {
1427     char **temp;
1428     char *value;
1429     time_t stamp;
1430     char *eostamp;
1432     status = buffer_get_field (&buffer, &buffer_size, &value);
1433     if (status != 0)
1434     {
1435       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1436       break;
1437     }
1439     /* make sure update time is always moving forward */
1440     stamp = strtol(value, &eostamp, 10);
1441     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1442     {
1443       ++bad_timestamps;
1444       add_response_info(sock, "Cannot find timestamp in '%s'!\n", value);
1445       continue;
1446     }
1447     else if (stamp <= ci->last_update_stamp)
1448     {
1449       ++bad_timestamps;
1450       add_response_info(sock,
1451                         "illegal attempt to update using time %ld when"
1452                         " last update time is %ld (minimum one second step)\n",
1453                         stamp, ci->last_update_stamp);
1454       continue;
1455     }
1456     else
1457       ci->last_update_stamp = stamp;
1459     temp = (char **) realloc (ci->values,
1460         sizeof (char *) * (ci->values_num + 1));
1461     if (temp == NULL)
1462     {
1463       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1464       continue;
1465     }
1466     ci->values = temp;
1468     ci->values[ci->values_num] = strdup (value);
1469     if (ci->values[ci->values_num] == NULL)
1470     {
1471       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1472       continue;
1473     }
1474     ci->values_num++;
1476     values_num++;
1477   }
1479   if (((now - ci->last_flush_time) >= config_write_interval)
1480       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1481       && (ci->values_num > 0))
1482   {
1483     enqueue_cache_item (ci, TAIL);
1484   }
1486   pthread_mutex_unlock (&cache_lock);
1488   if (values_num < 1)
1489   {
1490     /* if we had only one update attempt, then return the full
1491        error message... try to get the most information out
1492        of the limited error space allowed by the protocol
1493     */
1494     if (bad_timestamps == 1)
1495       return send_response(sock, RESP_ERR, "%s", sock->wbuf);
1496     else
1497       return send_response(sock, RESP_ERR,
1498                            "No values updated (%d bad timestamps).\n",
1499                            bad_timestamps);
1500   }
1501   else
1502     return send_response(sock, RESP_OK,
1503                          "errors, enqueued %i value(s).\n", values_num);
1505   /* NOTREACHED */
1506   assert(1==0);
1508 } /* }}} int handle_request_update */
1510 /* we came across a "WROTE" entry during journal replay.
1511  * throw away any values that we have accumulated for this file
1512  */
1513 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1515   int i;
1516   cache_item_t *ci;
1517   const char *file = buffer;
1519   pthread_mutex_lock(&cache_lock);
1521   ci = g_tree_lookup(cache_tree, file);
1522   if (ci == NULL)
1523   {
1524     pthread_mutex_unlock(&cache_lock);
1525     return (0);
1526   }
1528   if (ci->values)
1529   {
1530     for (i=0; i < ci->values_num; i++)
1531       free(ci->values[i]);
1533     free(ci->values);
1534   }
1536   wipe_ci_values(ci, now);
1537   remove_from_queue(ci);
1539   pthread_mutex_unlock(&cache_lock);
1540   return (0);
1541 } /* }}} int handle_request_wrote */
1543 /* start "BATCH" processing */
1544 static int batch_start (listen_socket_t *sock) /* {{{ */
1546   int status;
1547   if (sock->batch_start)
1548     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1550   status = send_response(sock, RESP_OK,
1551                          "Go ahead.  End with dot '.' on its own line.\n");
1552   sock->batch_start = time(NULL);
1553   sock->batch_cmd = 0;
1555   return status;
1556 } /* }}} static int batch_start */
1558 /* finish "BATCH" processing and return results to the client */
1559 static int batch_done (listen_socket_t *sock) /* {{{ */
1561   assert(sock->batch_start);
1562   sock->batch_start = 0;
1563   sock->batch_cmd  = 0;
1564   return send_response(sock, RESP_OK, "errors\n");
1565 } /* }}} static int batch_done */
1567 /* if sock==NULL, we are in journal replay mode */
1568 static int handle_request (listen_socket_t *sock, /* {{{ */
1569                            time_t now,
1570                            char *buffer, size_t buffer_size)
1572   char *buffer_ptr;
1573   char *command;
1574   int status;
1576   assert (buffer[buffer_size - 1] == '\0');
1578   buffer_ptr = buffer;
1579   command = NULL;
1580   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1581   if (status != 0)
1582   {
1583     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1584     return (-1);
1585   }
1587   if (sock != NULL && sock->batch_start)
1588     sock->batch_cmd++;
1590   if (strcasecmp (command, "update") == 0)
1591     return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1592   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1593   {
1594     /* this is only valid in replay mode */
1595     return (handle_request_wrote (buffer_ptr, now));
1596   }
1597   else if (strcasecmp (command, "flush") == 0)
1598     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1599   else if (strcasecmp (command, "flushall") == 0)
1600     return (handle_request_flushall(sock));
1601   else if (strcasecmp (command, "pending") == 0)
1602     return (handle_request_pending(sock, buffer_ptr, buffer_size));
1603   else if (strcasecmp (command, "forget") == 0)
1604     return (handle_request_forget(sock, buffer_ptr, buffer_size));
1605   else if (strcasecmp (command, "stats") == 0)
1606     return (handle_request_stats (sock));
1607   else if (strcasecmp (command, "help") == 0)
1608     return (handle_request_help (sock, buffer_ptr, buffer_size));
1609   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1610     return batch_start(sock);
1611   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1612     return batch_done(sock);
1613   else
1614     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1616   /* NOTREACHED */
1617   assert(1==0);
1618 } /* }}} int handle_request */
1620 /* MUST NOT hold journal_lock before calling this */
1621 static void journal_rotate(void) /* {{{ */
1623   FILE *old_fh = NULL;
1624   int new_fd;
1626   if (journal_cur == NULL || journal_old == NULL)
1627     return;
1629   pthread_mutex_lock(&journal_lock);
1631   /* we rotate this way (rename before close) so that the we can release
1632    * the journal lock as fast as possible.  Journal writes to the new
1633    * journal can proceed immediately after the new file is opened.  The
1634    * fclose can then block without affecting new updates.
1635    */
1636   if (journal_fh != NULL)
1637   {
1638     old_fh = journal_fh;
1639     journal_fh = NULL;
1640     rename(journal_cur, journal_old);
1641     ++stats_journal_rotate;
1642   }
1644   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1645                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1646   if (new_fd >= 0)
1647   {
1648     journal_fh = fdopen(new_fd, "a");
1649     if (journal_fh == NULL)
1650       close(new_fd);
1651   }
1653   pthread_mutex_unlock(&journal_lock);
1655   if (old_fh != NULL)
1656     fclose(old_fh);
1658   if (journal_fh == NULL)
1659   {
1660     RRDD_LOG(LOG_CRIT,
1661              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1662              journal_cur, rrd_strerror(errno));
1664     RRDD_LOG(LOG_ERR,
1665              "JOURNALING DISABLED: All values will be flushed at shutdown");
1666     config_flush_at_shutdown = 1;
1667   }
1669 } /* }}} static void journal_rotate */
1671 static void journal_done(void) /* {{{ */
1673   if (journal_cur == NULL)
1674     return;
1676   pthread_mutex_lock(&journal_lock);
1677   if (journal_fh != NULL)
1678   {
1679     fclose(journal_fh);
1680     journal_fh = NULL;
1681   }
1683   if (config_flush_at_shutdown)
1684   {
1685     RRDD_LOG(LOG_INFO, "removing journals");
1686     unlink(journal_old);
1687     unlink(journal_cur);
1688   }
1689   else
1690   {
1691     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1692              "journals will be used at next startup");
1693   }
1695   pthread_mutex_unlock(&journal_lock);
1697 } /* }}} static void journal_done */
1699 static int journal_write(char *cmd, char *args) /* {{{ */
1701   int chars;
1703   if (journal_fh == NULL)
1704     return 0;
1706   pthread_mutex_lock(&journal_lock);
1707   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1708   pthread_mutex_unlock(&journal_lock);
1710   if (chars > 0)
1711   {
1712     pthread_mutex_lock(&stats_lock);
1713     stats_journal_bytes += chars;
1714     pthread_mutex_unlock(&stats_lock);
1715   }
1717   return chars;
1718 } /* }}} static int journal_write */
1720 static int journal_replay (const char *file) /* {{{ */
1722   FILE *fh;
1723   int entry_cnt = 0;
1724   int fail_cnt = 0;
1725   uint64_t line = 0;
1726   char entry[CMD_MAX];
1727   time_t now;
1729   if (file == NULL) return 0;
1731   {
1732     char *reason;
1733     int status = 0;
1734     struct stat statbuf;
1736     memset(&statbuf, 0, sizeof(statbuf));
1737     if (stat(file, &statbuf) != 0)
1738     {
1739       if (errno == ENOENT)
1740         return 0;
1742       reason = "stat error";
1743       status = errno;
1744     }
1745     else if (!S_ISREG(statbuf.st_mode))
1746     {
1747       reason = "not a regular file";
1748       status = EPERM;
1749     }
1750     if (statbuf.st_uid != daemon_uid)
1751     {
1752       reason = "not owned by daemon user";
1753       status = EACCES;
1754     }
1755     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1756     {
1757       reason = "must not be user/group writable";
1758       status = EACCES;
1759     }
1761     if (status != 0)
1762     {
1763       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1764                file, rrd_strerror(status), reason);
1765       return 0;
1766     }
1767   }
1769   fh = fopen(file, "r");
1770   if (fh == NULL)
1771   {
1772     if (errno != ENOENT)
1773       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1774                file, rrd_strerror(errno));
1775     return 0;
1776   }
1777   else
1778     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1780   now = time(NULL);
1782   while(!feof(fh))
1783   {
1784     size_t entry_len;
1786     ++line;
1787     if (fgets(entry, sizeof(entry), fh) == NULL)
1788       break;
1789     entry_len = strlen(entry);
1791     /* check \n termination in case journal writing crashed mid-line */
1792     if (entry_len == 0)
1793       continue;
1794     else if (entry[entry_len - 1] != '\n')
1795     {
1796       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1797       ++fail_cnt;
1798       continue;
1799     }
1801     entry[entry_len - 1] = '\0';
1803     if (handle_request(NULL, now, entry, entry_len) == 0)
1804       ++entry_cnt;
1805     else
1806       ++fail_cnt;
1807   }
1809   fclose(fh);
1811   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1812            entry_cnt, fail_cnt);
1814   return entry_cnt > 0 ? 1 : 0;
1815 } /* }}} static int journal_replay */
1817 static void journal_init(void) /* {{{ */
1819   int had_journal = 0;
1821   if (journal_cur == NULL) return;
1823   pthread_mutex_lock(&journal_lock);
1825   RRDD_LOG(LOG_INFO, "checking for journal files");
1827   had_journal += journal_replay(journal_old);
1828   had_journal += journal_replay(journal_cur);
1830   /* it must have been a crash.  start a flush */
1831   if (had_journal && config_flush_at_shutdown)
1832     flush_old_values(-1);
1834   pthread_mutex_unlock(&journal_lock);
1835   journal_rotate();
1837   RRDD_LOG(LOG_INFO, "journal processing complete");
1839 } /* }}} static void journal_init */
1841 static void close_connection(listen_socket_t *sock)
1843   close(sock->fd) ;  sock->fd   = -1;
1844   free(sock->rbuf);  sock->rbuf = NULL;
1845   free(sock->wbuf);  sock->wbuf = NULL;
1847   free(sock);
1850 static void *connection_thread_main (void *args) /* {{{ */
1852   pthread_t self;
1853   listen_socket_t *sock;
1854   int i;
1855   int fd;
1857   sock = (listen_socket_t *) args;
1858   fd = sock->fd;
1860   /* init read buffers */
1861   sock->next_read = sock->next_cmd = 0;
1862   sock->rbuf = malloc(RBUF_SIZE);
1863   if (sock->rbuf == NULL)
1864   {
1865     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1866     close_connection(sock);
1867     return NULL;
1868   }
1870   pthread_mutex_lock (&connection_threads_lock);
1871   {
1872     pthread_t *temp;
1874     temp = (pthread_t *) realloc (connection_threads,
1875         sizeof (pthread_t) * (connection_threads_num + 1));
1876     if (temp == NULL)
1877     {
1878       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1879     }
1880     else
1881     {
1882       connection_threads = temp;
1883       connection_threads[connection_threads_num] = pthread_self ();
1884       connection_threads_num++;
1885     }
1886   }
1887   pthread_mutex_unlock (&connection_threads_lock);
1889   while (do_shutdown == 0)
1890   {
1891     char *cmd;
1892     ssize_t cmd_len;
1893     ssize_t rbytes;
1894     time_t now;
1896     struct pollfd pollfd;
1897     int status;
1899     pollfd.fd = fd;
1900     pollfd.events = POLLIN | POLLPRI;
1901     pollfd.revents = 0;
1903     status = poll (&pollfd, 1, /* timeout = */ 500);
1904     if (do_shutdown)
1905       break;
1906     else if (status == 0) /* timeout */
1907       continue;
1908     else if (status < 0) /* error */
1909     {
1910       status = errno;
1911       if (status != EINTR)
1912         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1913       continue;
1914     }
1916     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1917       break;
1918     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1919     {
1920       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1921           "poll(2) returned something unexpected: %#04hx",
1922           pollfd.revents);
1923       break;
1924     }
1926     rbytes = read(fd, sock->rbuf + sock->next_read,
1927                   RBUF_SIZE - sock->next_read);
1928     if (rbytes < 0)
1929     {
1930       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1931       break;
1932     }
1933     else if (rbytes == 0)
1934       break; /* eof */
1936     sock->next_read += rbytes;
1938     if (sock->batch_start)
1939       now = sock->batch_start;
1940     else
1941       now = time(NULL);
1943     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1944     {
1945       status = handle_request (sock, now, cmd, cmd_len+1);
1946       if (status != 0)
1947         goto out_close;
1948     }
1949   }
1951 out_close:
1952   close_connection(sock);
1954   self = pthread_self ();
1955   /* Remove this thread from the connection threads list */
1956   pthread_mutex_lock (&connection_threads_lock);
1957   /* Find out own index in the array */
1958   for (i = 0; i < connection_threads_num; i++)
1959     if (pthread_equal (connection_threads[i], self) != 0)
1960       break;
1961   assert (i < connection_threads_num);
1963   /* Move the trailing threads forward. */
1964   if (i < (connection_threads_num - 1))
1965   {
1966     memmove (connection_threads + i,
1967         connection_threads + i + 1,
1968         sizeof (pthread_t) * (connection_threads_num - i - 1));
1969   }
1971   connection_threads_num--;
1972   pthread_mutex_unlock (&connection_threads_lock);
1974   return (NULL);
1975 } /* }}} void *connection_thread_main */
1977 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1979   int fd;
1980   struct sockaddr_un sa;
1981   listen_socket_t *temp;
1982   int status;
1983   const char *path;
1985   path = sock->addr;
1986   if (strncmp(path, "unix:", strlen("unix:")) == 0)
1987     path += strlen("unix:");
1989   temp = (listen_socket_t *) realloc (listen_fds,
1990       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1991   if (temp == NULL)
1992   {
1993     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1994     return (-1);
1995   }
1996   listen_fds = temp;
1997   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1999   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2000   if (fd < 0)
2001   {
2002     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
2003     return (-1);
2004   }
2006   memset (&sa, 0, sizeof (sa));
2007   sa.sun_family = AF_UNIX;
2008   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2010   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2011   if (status != 0)
2012   {
2013     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
2014     close (fd);
2015     unlink (path);
2016     return (-1);
2017   }
2019   status = listen (fd, /* backlog = */ 10);
2020   if (status != 0)
2021   {
2022     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
2023     close (fd);
2024     unlink (path);
2025     return (-1);
2026   }
2028   listen_fds[listen_fds_num].fd = fd;
2029   listen_fds[listen_fds_num].family = PF_UNIX;
2030   strncpy(listen_fds[listen_fds_num].addr, path,
2031           sizeof (listen_fds[listen_fds_num].addr) - 1);
2032   listen_fds_num++;
2034   return (0);
2035 } /* }}} int open_listen_socket_unix */
2037 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2039   struct addrinfo ai_hints;
2040   struct addrinfo *ai_res;
2041   struct addrinfo *ai_ptr;
2042   char addr_copy[NI_MAXHOST];
2043   char *addr;
2044   char *port;
2045   int status;
2047   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2048   addr_copy[sizeof (addr_copy) - 1] = 0;
2049   addr = addr_copy;
2051   memset (&ai_hints, 0, sizeof (ai_hints));
2052   ai_hints.ai_flags = 0;
2053 #ifdef AI_ADDRCONFIG
2054   ai_hints.ai_flags |= AI_ADDRCONFIG;
2055 #endif
2056   ai_hints.ai_family = AF_UNSPEC;
2057   ai_hints.ai_socktype = SOCK_STREAM;
2059   port = NULL;
2060   if (*addr == '[') /* IPv6+port format */
2061   {
2062     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2063     addr++;
2065     port = strchr (addr, ']');
2066     if (port == NULL)
2067     {
2068       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
2069           sock->addr);
2070       return (-1);
2071     }
2072     *port = 0;
2073     port++;
2075     if (*port == ':')
2076       port++;
2077     else if (*port == 0)
2078       port = NULL;
2079     else
2080     {
2081       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
2082           port);
2083       return (-1);
2084     }
2085   } /* if (*addr = ']') */
2086   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2087   {
2088     port = rindex(addr, ':');
2089     if (port != NULL)
2090     {
2091       *port = 0;
2092       port++;
2093     }
2094   }
2095   ai_res = NULL;
2096   status = getaddrinfo (addr,
2097                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2098                         &ai_hints, &ai_res);
2099   if (status != 0)
2100   {
2101     RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
2102         "%s", addr, gai_strerror (status));
2103     return (-1);
2104   }
2106   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2107   {
2108     int fd;
2109     listen_socket_t *temp;
2110     int one = 1;
2112     temp = (listen_socket_t *) realloc (listen_fds,
2113         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2114     if (temp == NULL)
2115     {
2116       RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
2117       continue;
2118     }
2119     listen_fds = temp;
2120     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2122     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2123     if (fd < 0)
2124     {
2125       RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
2126       continue;
2127     }
2129     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2131     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2132     if (status != 0)
2133     {
2134       RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
2135       close (fd);
2136       continue;
2137     }
2139     status = listen (fd, /* backlog = */ 10);
2140     if (status != 0)
2141     {
2142       RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
2143       close (fd);
2144       return (-1);
2145     }
2147     listen_fds[listen_fds_num].fd = fd;
2148     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2149     listen_fds_num++;
2150   } /* for (ai_ptr) */
2152   return (0);
2153 } /* }}} static int open_listen_socket_network */
2155 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2157   assert(sock != NULL);
2158   assert(sock->addr != NULL);
2160   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2161       || sock->addr[0] == '/')
2162     return (open_listen_socket_unix(sock));
2163   else
2164     return (open_listen_socket_network(sock));
2165 } /* }}} int open_listen_socket */
2167 static int close_listen_sockets (void) /* {{{ */
2169   size_t i;
2171   for (i = 0; i < listen_fds_num; i++)
2172   {
2173     close (listen_fds[i].fd);
2175     if (listen_fds[i].family == PF_UNIX)
2176       unlink(listen_fds[i].addr);
2177   }
2179   free (listen_fds);
2180   listen_fds = NULL;
2181   listen_fds_num = 0;
2183   return (0);
2184 } /* }}} int close_listen_sockets */
2186 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2188   struct pollfd *pollfds;
2189   int pollfds_num;
2190   int status;
2191   int i;
2193   for (i = 0; i < config_listen_address_list_len; i++)
2194     open_listen_socket (config_listen_address_list[i]);
2196   if (config_listen_address_list_len < 1)
2197   {
2198     listen_socket_t sock;
2199     memset(&sock, 0, sizeof(sock));
2200     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2201     open_listen_socket (&sock);
2202   }
2204   if (listen_fds_num < 1)
2205   {
2206     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
2207         "could be opened. Sorry.");
2208     return (NULL);
2209   }
2211   pollfds_num = listen_fds_num;
2212   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2213   if (pollfds == NULL)
2214   {
2215     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2216     return (NULL);
2217   }
2218   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2220   RRDD_LOG(LOG_INFO, "listening for connections");
2222   while (do_shutdown == 0)
2223   {
2224     assert (pollfds_num == ((int) listen_fds_num));
2225     for (i = 0; i < pollfds_num; i++)
2226     {
2227       pollfds[i].fd = listen_fds[i].fd;
2228       pollfds[i].events = POLLIN | POLLPRI;
2229       pollfds[i].revents = 0;
2230     }
2232     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2233     if (do_shutdown)
2234       break;
2235     else if (status == 0) /* timeout */
2236       continue;
2237     else if (status < 0) /* error */
2238     {
2239       status = errno;
2240       if (status != EINTR)
2241       {
2242         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2243       }
2244       continue;
2245     }
2247     for (i = 0; i < pollfds_num; i++)
2248     {
2249       listen_socket_t *client_sock;
2250       struct sockaddr_storage client_sa;
2251       socklen_t client_sa_size;
2252       pthread_t tid;
2253       pthread_attr_t attr;
2255       if (pollfds[i].revents == 0)
2256         continue;
2258       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2259       {
2260         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2261             "poll(2) returned something unexpected for listen FD #%i.",
2262             pollfds[i].fd);
2263         continue;
2264       }
2266       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2267       if (client_sock == NULL)
2268       {
2269         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2270         continue;
2271       }
2272       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2274       client_sa_size = sizeof (client_sa);
2275       client_sock->fd = accept (pollfds[i].fd,
2276           (struct sockaddr *) &client_sa, &client_sa_size);
2277       if (client_sock->fd < 0)
2278       {
2279         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2280         free(client_sock);
2281         continue;
2282       }
2284       pthread_attr_init (&attr);
2285       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2287       status = pthread_create (&tid, &attr, connection_thread_main,
2288                                client_sock);
2289       if (status != 0)
2290       {
2291         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2292         close_connection(client_sock);
2293         continue;
2294       }
2295     } /* for (pollfds_num) */
2296   } /* while (do_shutdown == 0) */
2298   RRDD_LOG(LOG_INFO, "starting shutdown");
2300   close_listen_sockets ();
2302   pthread_mutex_lock (&connection_threads_lock);
2303   while (connection_threads_num > 0)
2304   {
2305     pthread_t wait_for;
2307     wait_for = connection_threads[0];
2309     pthread_mutex_unlock (&connection_threads_lock);
2310     pthread_join (wait_for, /* retval = */ NULL);
2311     pthread_mutex_lock (&connection_threads_lock);
2312   }
2313   pthread_mutex_unlock (&connection_threads_lock);
2315   return (NULL);
2316 } /* }}} void *listen_thread_main */
2318 static int daemonize (void) /* {{{ */
2320   int status;
2321   int fd;
2322   char *base_dir;
2324   daemon_uid = geteuid();
2326   fd = open_pidfile();
2327   if (fd < 0) return fd;
2329   if (!stay_foreground)
2330   {
2331     pid_t child;
2333     child = fork ();
2334     if (child < 0)
2335     {
2336       fprintf (stderr, "daemonize: fork(2) failed.\n");
2337       return (-1);
2338     }
2339     else if (child > 0)
2340     {
2341       return (1);
2342     }
2344     /* Become session leader */
2345     setsid ();
2347     /* Open the first three file descriptors to /dev/null */
2348     close (2);
2349     close (1);
2350     close (0);
2352     open ("/dev/null", O_RDWR);
2353     dup (0);
2354     dup (0);
2355   } /* if (!stay_foreground) */
2357   /* Change into the /tmp directory. */
2358   base_dir = (config_base_dir != NULL)
2359     ? config_base_dir
2360     : "/tmp";
2361   status = chdir (base_dir);
2362   if (status != 0)
2363   {
2364     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2365     return (-1);
2366   }
2368   install_signal_handlers();
2370   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2371   RRDD_LOG(LOG_INFO, "starting up");
2373   cache_tree = g_tree_new ((GCompareFunc) strcmp);
2374   if (cache_tree == NULL)
2375   {
2376     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2377     return (-1);
2378   }
2380   status = write_pidfile (fd);
2381   return status;
2382 } /* }}} int daemonize */
2384 static int cleanup (void) /* {{{ */
2386   do_shutdown++;
2388   pthread_cond_signal (&cache_cond);
2389   pthread_join (queue_thread, /* return = */ NULL);
2391   remove_pidfile ();
2393   RRDD_LOG(LOG_INFO, "goodbye");
2394   closelog ();
2396   return (0);
2397 } /* }}} int cleanup */
2399 static int read_options (int argc, char **argv) /* {{{ */
2401   int option;
2402   int status = 0;
2404   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2405   {
2406     switch (option)
2407     {
2408       case 'g':
2409         stay_foreground=1;
2410         break;
2412       case 'L':
2413       case 'l':
2414       {
2415         listen_socket_t **temp;
2416         listen_socket_t *new;
2418         new = malloc(sizeof(listen_socket_t));
2419         if (new == NULL)
2420         {
2421           fprintf(stderr, "read_options: malloc failed.\n");
2422           return(2);
2423         }
2424         memset(new, 0, sizeof(listen_socket_t));
2426         temp = (listen_socket_t **) realloc (config_listen_address_list,
2427             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2428         if (temp == NULL)
2429         {
2430           fprintf (stderr, "read_options: realloc failed.\n");
2431           return (2);
2432         }
2433         config_listen_address_list = temp;
2435         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2436         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2438         temp[config_listen_address_list_len] = new;
2439         config_listen_address_list_len++;
2440       }
2441       break;
2443       case 'f':
2444       {
2445         int temp;
2447         temp = atoi (optarg);
2448         if (temp > 0)
2449           config_flush_interval = temp;
2450         else
2451         {
2452           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2453           status = 3;
2454         }
2455       }
2456       break;
2458       case 'w':
2459       {
2460         int temp;
2462         temp = atoi (optarg);
2463         if (temp > 0)
2464           config_write_interval = temp;
2465         else
2466         {
2467           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2468           status = 2;
2469         }
2470       }
2471       break;
2473       case 'z':
2474       {
2475         int temp;
2477         temp = atoi(optarg);
2478         if (temp > 0)
2479           config_write_jitter = temp;
2480         else
2481         {
2482           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2483           status = 2;
2484         }
2486         break;
2487       }
2489       case 'B':
2490         config_write_base_only = 1;
2491         break;
2493       case 'b':
2494       {
2495         size_t len;
2497         if (config_base_dir != NULL)
2498           free (config_base_dir);
2499         config_base_dir = strdup (optarg);
2500         if (config_base_dir == NULL)
2501         {
2502           fprintf (stderr, "read_options: strdup failed.\n");
2503           return (3);
2504         }
2506         len = strlen (config_base_dir);
2507         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2508         {
2509           config_base_dir[len - 1] = 0;
2510           len--;
2511         }
2513         if (len < 1)
2514         {
2515           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2516           return (4);
2517         }
2519         _config_base_dir_len = len;
2520       }
2521       break;
2523       case 'p':
2524       {
2525         if (config_pid_file != NULL)
2526           free (config_pid_file);
2527         config_pid_file = strdup (optarg);
2528         if (config_pid_file == NULL)
2529         {
2530           fprintf (stderr, "read_options: strdup failed.\n");
2531           return (3);
2532         }
2533       }
2534       break;
2536       case 'F':
2537         config_flush_at_shutdown = 1;
2538         break;
2540       case 'j':
2541       {
2542         struct stat statbuf;
2543         const char *dir = optarg;
2545         status = stat(dir, &statbuf);
2546         if (status != 0)
2547         {
2548           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2549           return 6;
2550         }
2552         if (!S_ISDIR(statbuf.st_mode)
2553             || access(dir, R_OK|W_OK|X_OK) != 0)
2554         {
2555           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2556                   errno ? rrd_strerror(errno) : "");
2557           return 6;
2558         }
2560         journal_cur = malloc(PATH_MAX + 1);
2561         journal_old = malloc(PATH_MAX + 1);
2562         if (journal_cur == NULL || journal_old == NULL)
2563         {
2564           fprintf(stderr, "malloc failure for journal files\n");
2565           return 6;
2566         }
2567         else 
2568         {
2569           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2570           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2571         }
2572       }
2573       break;
2575       case 'h':
2576       case '?':
2577         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2578             "\n"
2579             "Usage: rrdcached [options]\n"
2580             "\n"
2581             "Valid options are:\n"
2582             "  -l <address>  Socket address to listen to.\n"
2583             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2584             "  -w <seconds>  Interval in which to write data.\n"
2585             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2586             "  -f <seconds>  Interval in which to flush dead data.\n"
2587             "  -p <file>     Location of the PID-file.\n"
2588             "  -b <dir>      Base directory to change to.\n"
2589             "  -B            Restrict file access to paths within -b <dir>\n"
2590             "  -g            Do not fork and run in the foreground.\n"
2591             "  -j <dir>      Directory in which to create the journal files.\n"
2592             "  -F            Always flush all updates at shutdown\n"
2593             "\n"
2594             "For more information and a detailed description of all options "
2595             "please refer\n"
2596             "to the rrdcached(1) manual page.\n",
2597             VERSION);
2598         status = -1;
2599         break;
2600     } /* switch (option) */
2601   } /* while (getopt) */
2603   /* advise the user when values are not sane */
2604   if (config_flush_interval < 2 * config_write_interval)
2605     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2606             " 2x write interval (-w) !\n");
2607   if (config_write_jitter > config_write_interval)
2608     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2609             " write interval (-w) !\n");
2611   if (config_write_base_only && config_base_dir == NULL)
2612     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2613             "  Consult the rrdcached documentation\n");
2615   if (journal_cur == NULL)
2616     config_flush_at_shutdown = 1;
2618   return (status);
2619 } /* }}} int read_options */
2621 int main (int argc, char **argv)
2623   int status;
2625   status = read_options (argc, argv);
2626   if (status != 0)
2627   {
2628     if (status < 0)
2629       status = 0;
2630     return (status);
2631   }
2633   status = daemonize ();
2634   if (status == 1)
2635   {
2636     struct sigaction sigchld;
2638     memset (&sigchld, 0, sizeof (sigchld));
2639     sigchld.sa_handler = SIG_IGN;
2640     sigaction (SIGCHLD, &sigchld, NULL);
2642     return (0);
2643   }
2644   else if (status != 0)
2645   {
2646     fprintf (stderr, "daemonize failed, exiting.\n");
2647     return (1);
2648   }
2650   journal_init();
2652   /* start the queue thread */
2653   memset (&queue_thread, 0, sizeof (queue_thread));
2654   status = pthread_create (&queue_thread,
2655                            NULL, /* attr */
2656                            queue_thread_main,
2657                            NULL); /* args */
2658   if (status != 0)
2659   {
2660     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2661     cleanup();
2662     return (1);
2663   }
2665   listen_thread_main (NULL);
2666   cleanup ();
2668   return (0);
2669 } /* int main */
2671 /*
2672  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2673  */