Code

folding fix
[rrdtool-all.git] / program / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
101 /*
102  * Types
103  */
104 typedef enum
106   PRIV_LOW,
107   PRIV_HIGH
108 } socket_privilege;
110 struct listen_socket_s
112   int fd;
113   char addr[PATH_MAX + 1];
114   int family;
115   socket_privilege privilege;
116 };
117 typedef struct listen_socket_s listen_socket_t;
119 struct cache_item_s;
120 typedef struct cache_item_s cache_item_t;
121 struct cache_item_s
123   char *file;
124   char **values;
125   int values_num;
126   time_t last_flush_time;
127 #define CI_FLAGS_IN_TREE  (1<<0)
128 #define CI_FLAGS_IN_QUEUE (1<<1)
129   int flags;
130   pthread_cond_t  flushed;
131   cache_item_t *prev;
132   cache_item_t *next;
133 };
135 struct callback_flush_data_s
137   time_t now;
138   time_t abs_timeout;
139   char **keys;
140   size_t keys_num;
141 };
142 typedef struct callback_flush_data_s callback_flush_data_t;
144 enum queue_side_e
146   HEAD,
147   TAIL
148 };
149 typedef enum queue_side_e queue_side_t;
151 /* max length of socket command or response */
152 #define CMD_MAX 4096
154 /*
155  * Variables
156  */
157 static int stay_foreground = 0;
159 static listen_socket_t *listen_fds = NULL;
160 static size_t listen_fds_num = 0;
162 static int do_shutdown = 0;
164 static pthread_t queue_thread;
166 static pthread_t *connection_threads = NULL;
167 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
168 static int connection_threads_num = 0;
170 /* Cache stuff */
171 static GTree          *cache_tree = NULL;
172 static cache_item_t   *cache_queue_head = NULL;
173 static cache_item_t   *cache_queue_tail = NULL;
174 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
175 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
177 static int config_write_interval = 300;
178 static int config_write_jitter   = 0;
179 static int config_flush_interval = 3600;
180 static int config_flush_at_shutdown = 0;
181 static char *config_pid_file = NULL;
182 static char *config_base_dir = NULL;
183 static size_t _config_base_dir_len = 0;
184 static int config_write_base_only = 0;
186 static listen_socket_t **config_listen_address_list = NULL;
187 static int config_listen_address_list_len = 0;
189 static uint64_t stats_queue_length = 0;
190 static uint64_t stats_updates_received = 0;
191 static uint64_t stats_flush_received = 0;
192 static uint64_t stats_updates_written = 0;
193 static uint64_t stats_data_sets_written = 0;
194 static uint64_t stats_journal_bytes = 0;
195 static uint64_t stats_journal_rotate = 0;
196 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
198 /* Journaled updates */
199 static char *journal_cur = NULL;
200 static char *journal_old = NULL;
201 static FILE *journal_fh = NULL;
202 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
203 static int journal_write(char *cmd, char *args);
204 static void journal_done(void);
205 static void journal_rotate(void);
207 /* 
208  * Functions
209  */
210 static void sig_common (const char *sig) /* {{{ */
212   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
213   do_shutdown++;
214   pthread_cond_broadcast(&cache_cond);
215 } /* }}} void sig_common */
217 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
219   sig_common("INT");
220 } /* }}} void sig_int_handler */
222 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
224   sig_common("TERM");
225 } /* }}} void sig_term_handler */
227 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
229   config_flush_at_shutdown = 1;
230   sig_common("USR1");
231 } /* }}} void sig_usr1_handler */
233 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
235   config_flush_at_shutdown = 0;
236   sig_common("USR2");
237 } /* }}} void sig_usr2_handler */
239 static void install_signal_handlers(void) /* {{{ */
241   /* These structures are static, because `sigaction' behaves weird if the are
242    * overwritten.. */
243   static struct sigaction sa_int;
244   static struct sigaction sa_term;
245   static struct sigaction sa_pipe;
246   static struct sigaction sa_usr1;
247   static struct sigaction sa_usr2;
249   /* Install signal handlers */
250   memset (&sa_int, 0, sizeof (sa_int));
251   sa_int.sa_handler = sig_int_handler;
252   sigaction (SIGINT, &sa_int, NULL);
254   memset (&sa_term, 0, sizeof (sa_term));
255   sa_term.sa_handler = sig_term_handler;
256   sigaction (SIGTERM, &sa_term, NULL);
258   memset (&sa_pipe, 0, sizeof (sa_pipe));
259   sa_pipe.sa_handler = SIG_IGN;
260   sigaction (SIGPIPE, &sa_pipe, NULL);
262   memset (&sa_pipe, 0, sizeof (sa_usr1));
263   sa_usr1.sa_handler = sig_usr1_handler;
264   sigaction (SIGUSR1, &sa_usr1, NULL);
266   memset (&sa_usr2, 0, sizeof (sa_usr2));
267   sa_usr2.sa_handler = sig_usr2_handler;
268   sigaction (SIGUSR2, &sa_usr2, NULL);
270 } /* }}} void install_signal_handlers */
272 static int open_pidfile(void) /* {{{ */
274   int fd;
275   char *file;
277   file = (config_pid_file != NULL)
278     ? config_pid_file
279     : LOCALSTATEDIR "/run/rrdcached.pid";
281   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
282   if (fd < 0)
283     fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
284             file, rrd_strerror(errno));
286   return(fd);
287 } /* }}} static int open_pidfile */
289 static int write_pidfile (int fd) /* {{{ */
291   pid_t pid;
292   FILE *fh;
294   pid = getpid ();
296   fh = fdopen (fd, "w");
297   if (fh == NULL)
298   {
299     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
300     close(fd);
301     return (-1);
302   }
304   fprintf (fh, "%i\n", (int) pid);
305   fclose (fh);
307   return (0);
308 } /* }}} int write_pidfile */
310 static int remove_pidfile (void) /* {{{ */
312   char *file;
313   int status;
315   file = (config_pid_file != NULL)
316     ? config_pid_file
317     : LOCALSTATEDIR "/run/rrdcached.pid";
319   status = unlink (file);
320   if (status == 0)
321     return (0);
322   return (errno);
323 } /* }}} int remove_pidfile */
325 static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
327   char    *buffer;
328   size_t   buffer_used;
329   size_t   buffer_free;
330   ssize_t  status;
332   buffer       = (char *) buffer_void;
333   buffer_used  = 0;
334   buffer_free  = buffer_size;
336   while (buffer_free > 0)
337   {
338     status = read (fd, buffer + buffer_used, buffer_free);
339     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
340       continue;
342     if (status < 0)
343       return (-1);
345     if (status == 0)
346       return (0);
348     assert ((0 > status) || (buffer_free >= (size_t) status));
350     buffer_free = buffer_free - status;
351     buffer_used = buffer_used + status;
353     if (buffer[buffer_used - 1] == '\n')
354       break;
355   }
357   assert (buffer_used > 0);
359   if (buffer[buffer_used - 1] != '\n')
360   {
361     errno = ENOBUFS;
362     return (-1);
363   }
365   buffer[buffer_used - 1] = 0;
367   /* Fix network line endings. */
368   if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
369   {
370     buffer_used--;
371     buffer[buffer_used - 1] = 0;
372   }
374   return (buffer_used);
375 } /* }}} ssize_t sread */
377 static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
379   const char *ptr;
380   size_t      nleft;
381   ssize_t     status;
383   /* special case for journal replay */
384   if (fd < 0) return 0;
386   ptr   = (const char *) buf;
387   nleft = count;
389   while (nleft > 0)
390   {
391     status = write (fd, (const void *) ptr, nleft);
393     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
394       continue;
396     if (status < 0)
397       return (status);
399     nleft -= status;
400     ptr   += status;
401   }
403   return (0);
404 } /* }}} ssize_t swrite */
406 static void wipe_ci_values(cache_item_t *ci, time_t when)
408   ci->values = NULL;
409   ci->values_num = 0;
411   ci->last_flush_time = when;
412   if (config_write_jitter > 0)
413     ci->last_flush_time += (random() % config_write_jitter);
416 /* remove_from_queue
417  * remove a "cache_item_t" item from the queue.
418  * must hold 'cache_lock' when calling this
419  */
420 static void remove_from_queue(cache_item_t *ci) /* {{{ */
422   if (ci == NULL) return;
424   if (ci->prev == NULL)
425     cache_queue_head = ci->next; /* reset head */
426   else
427     ci->prev->next = ci->next;
429   if (ci->next == NULL)
430     cache_queue_tail = ci->prev; /* reset the tail */
431   else
432     ci->next->prev = ci->prev;
434   ci->next = ci->prev = NULL;
435   ci->flags &= ~CI_FLAGS_IN_QUEUE;
436 } /* }}} static void remove_from_queue */
438 /*
439  * enqueue_cache_item:
440  * `cache_lock' must be acquired before calling this function!
441  */
442 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
443     queue_side_t side)
445   if (ci == NULL)
446     return (-1);
448   if (ci->values_num == 0)
449     return (0);
451   if (side == HEAD)
452   {
453     if (cache_queue_head == ci)
454       return 0;
456     /* remove from the double linked list */
457     if (ci->flags & CI_FLAGS_IN_QUEUE)
458       remove_from_queue(ci);
460     ci->prev = NULL;
461     ci->next = cache_queue_head;
462     if (ci->next != NULL)
463       ci->next->prev = ci;
464     cache_queue_head = ci;
466     if (cache_queue_tail == NULL)
467       cache_queue_tail = cache_queue_head;
468   }
469   else /* (side == TAIL) */
470   {
471     /* We don't move values back in the list.. */
472     if (ci->flags & CI_FLAGS_IN_QUEUE)
473       return (0);
475     assert (ci->next == NULL);
476     assert (ci->prev == NULL);
478     ci->prev = cache_queue_tail;
480     if (cache_queue_tail == NULL)
481       cache_queue_head = ci;
482     else
483       cache_queue_tail->next = ci;
485     cache_queue_tail = ci;
486   }
488   ci->flags |= CI_FLAGS_IN_QUEUE;
490   pthread_cond_broadcast(&cache_cond);
491   pthread_mutex_lock (&stats_lock);
492   stats_queue_length++;
493   pthread_mutex_unlock (&stats_lock);
495   return (0);
496 } /* }}} int enqueue_cache_item */
498 /*
499  * tree_callback_flush:
500  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
501  * while this is in progress.
502  */
503 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
504     gpointer data)
506   cache_item_t *ci;
507   callback_flush_data_t *cfd;
509   ci = (cache_item_t *) value;
510   cfd = (callback_flush_data_t *) data;
512   if ((ci->last_flush_time <= cfd->abs_timeout)
513       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
514       && (ci->values_num > 0))
515   {
516     enqueue_cache_item (ci, TAIL);
517   }
518   else if ((do_shutdown != 0)
519       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
520       && (ci->values_num > 0))
521   {
522     enqueue_cache_item (ci, TAIL);
523   }
524   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
525       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
526       && (ci->values_num <= 0))
527   {
528     char **temp;
530     temp = (char **) realloc (cfd->keys,
531         sizeof (char *) * (cfd->keys_num + 1));
532     if (temp == NULL)
533     {
534       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
535       return (FALSE);
536     }
537     cfd->keys = temp;
538     /* Make really sure this points to the _same_ place */
539     assert ((char *) key == ci->file);
540     cfd->keys[cfd->keys_num] = (char *) key;
541     cfd->keys_num++;
542   }
544   return (FALSE);
545 } /* }}} gboolean tree_callback_flush */
547 static int flush_old_values (int max_age)
549   callback_flush_data_t cfd;
550   size_t k;
552   memset (&cfd, 0, sizeof (cfd));
553   /* Pass the current time as user data so that we don't need to call
554    * `time' for each node. */
555   cfd.now = time (NULL);
556   cfd.keys = NULL;
557   cfd.keys_num = 0;
559   if (max_age > 0)
560     cfd.abs_timeout = cfd.now - max_age;
561   else
562     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
564   /* `tree_callback_flush' will return the keys of all values that haven't
565    * been touched in the last `config_flush_interval' seconds in `cfd'.
566    * The char*'s in this array point to the same memory as ci->file, so we
567    * don't need to free them separately. */
568   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
570   for (k = 0; k < cfd.keys_num; k++)
571   {
572     cache_item_t *ci;
574     /* This must not fail. */
575     ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
576     assert (ci != NULL);
578     /* If we end up here with values available, something's seriously
579      * messed up. */
580     assert (ci->values_num == 0);
582     /* Remove the node from the tree */
583     g_tree_remove (cache_tree, cfd.keys[k]);
584     cfd.keys[k] = NULL;
586     /* Now free and clean up `ci'. */
587     free (ci->file);
588     ci->file = NULL;
589     free (ci);
590     ci = NULL;
591   } /* for (k = 0; k < cfd.keys_num; k++) */
593   if (cfd.keys != NULL)
594   {
595     free (cfd.keys);
596     cfd.keys = NULL;
597   }
599   return (0);
600 } /* int flush_old_values */
602 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
604   struct timeval now;
605   struct timespec next_flush;
606   int final_flush = 0; /* make sure we only flush once on shutdown */
608   gettimeofday (&now, NULL);
609   next_flush.tv_sec = now.tv_sec + config_flush_interval;
610   next_flush.tv_nsec = 1000 * now.tv_usec;
612   pthread_mutex_lock (&cache_lock);
613   while ((do_shutdown == 0) || (cache_queue_head != NULL))
614   {
615     cache_item_t *ci;
616     char *file;
617     char **values;
618     int values_num;
619     int status;
620     int i;
622     /* First, check if it's time to do the cache flush. */
623     gettimeofday (&now, NULL);
624     if ((now.tv_sec > next_flush.tv_sec)
625         || ((now.tv_sec == next_flush.tv_sec)
626           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
627     {
628       /* Flush all values that haven't been written in the last
629        * `config_write_interval' seconds. */
630       flush_old_values (config_write_interval);
632       /* Determine the time of the next cache flush. */
633       while (next_flush.tv_sec <= now.tv_sec)
634         next_flush.tv_sec += config_flush_interval;
636       /* unlock the cache while we rotate so we don't block incoming
637        * updates if the fsync() blocks on disk I/O */
638       pthread_mutex_unlock(&cache_lock);
639       journal_rotate();
640       pthread_mutex_lock(&cache_lock);
641     }
643     /* Now, check if there's something to store away. If not, wait until
644      * something comes in or it's time to do the cache flush.  if we are
645      * shutting down, do not wait around.  */
646     if (cache_queue_head == NULL && !do_shutdown)
647     {
648       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
649       if ((status != 0) && (status != ETIMEDOUT))
650       {
651         RRDD_LOG (LOG_ERR, "queue_thread_main: "
652             "pthread_cond_timedwait returned %i.", status);
653       }
654     }
656     /* We're about to shut down */
657     if (do_shutdown != 0 && !final_flush++)
658     {
659       if (config_flush_at_shutdown)
660         flush_old_values (-1); /* flush everything */
661       else
662         break;
663     }
665     /* Check if a value has arrived. This may be NULL if we timed out or there
666      * was an interrupt such as a signal. */
667     if (cache_queue_head == NULL)
668       continue;
670     ci = cache_queue_head;
672     /* copy the relevant parts */
673     file = strdup (ci->file);
674     if (file == NULL)
675     {
676       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
677       continue;
678     }
680     assert(ci->values != NULL);
681     assert(ci->values_num > 0);
683     values = ci->values;
684     values_num = ci->values_num;
686     wipe_ci_values(ci, time(NULL));
687     remove_from_queue(ci);
689     pthread_mutex_lock (&stats_lock);
690     assert (stats_queue_length > 0);
691     stats_queue_length--;
692     pthread_mutex_unlock (&stats_lock);
694     pthread_mutex_unlock (&cache_lock);
696     rrd_clear_error ();
697     status = rrd_update_r (file, NULL, values_num, (void *) values);
698     if (status != 0)
699     {
700       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
701           "rrd_update_r (%s) failed with status %i. (%s)",
702           file, status, rrd_get_error());
703     }
705     journal_write("wrote", file);
706     pthread_cond_broadcast(&ci->flushed);
708     for (i = 0; i < values_num; i++)
709       free (values[i]);
711     free(values);
712     free(file);
714     if (status == 0)
715     {
716       pthread_mutex_lock (&stats_lock);
717       stats_updates_written++;
718       stats_data_sets_written += values_num;
719       pthread_mutex_unlock (&stats_lock);
720     }
722     pthread_mutex_lock (&cache_lock);
724     /* We're about to shut down */
725     if (do_shutdown != 0 && !final_flush++)
726     {
727       if (config_flush_at_shutdown)
728           flush_old_values (-1); /* flush everything */
729       else
730         break;
731     }
732   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
733   pthread_mutex_unlock (&cache_lock);
735   if (config_flush_at_shutdown)
736   {
737     assert(cache_queue_head == NULL);
738     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
739   }
741   journal_done();
743   return (NULL);
744 } /* }}} void *queue_thread_main */
746 static int buffer_get_field (char **buffer_ret, /* {{{ */
747     size_t *buffer_size_ret, char **field_ret)
749   char *buffer;
750   size_t buffer_pos;
751   size_t buffer_size;
752   char *field;
753   size_t field_size;
754   int status;
756   buffer = *buffer_ret;
757   buffer_pos = 0;
758   buffer_size = *buffer_size_ret;
759   field = *buffer_ret;
760   field_size = 0;
762   if (buffer_size <= 0)
763     return (-1);
765   /* This is ensured by `handle_request'. */
766   assert (buffer[buffer_size - 1] == '\0');
768   status = -1;
769   while (buffer_pos < buffer_size)
770   {
771     /* Check for end-of-field or end-of-buffer */
772     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
773     {
774       field[field_size] = 0;
775       field_size++;
776       buffer_pos++;
777       status = 0;
778       break;
779     }
780     /* Handle escaped characters. */
781     else if (buffer[buffer_pos] == '\\')
782     {
783       if (buffer_pos >= (buffer_size - 1))
784         break;
785       buffer_pos++;
786       field[field_size] = buffer[buffer_pos];
787       field_size++;
788       buffer_pos++;
789     }
790     /* Normal operation */ 
791     else
792     {
793       field[field_size] = buffer[buffer_pos];
794       field_size++;
795       buffer_pos++;
796     }
797   } /* while (buffer_pos < buffer_size) */
799   if (status != 0)
800     return (status);
802   *buffer_ret = buffer + buffer_pos;
803   *buffer_size_ret = buffer_size - buffer_pos;
804   *field_ret = field;
806   return (0);
807 } /* }}} int buffer_get_field */
809 /* if we're restricting writes to the base directory,
810  * check whether the file falls within the dir
811  * returns 1 if OK, otherwise 0
812  */
813 static int check_file_access (const char *file, int fd) /* {{{ */
815   char error[CMD_MAX];
816   assert(file != NULL);
818   if (!config_write_base_only
819       || fd < 0 /* journal replay */
820       || config_base_dir == NULL)
821     return 1;
823   if (strstr(file, "../") != NULL) goto err;
825   /* relative paths without "../" are ok */
826   if (*file != '/') return 1;
828   /* file must be of the format base + "/" + <1+ char filename> */
829   if (strlen(file) < _config_base_dir_len + 2) goto err;
830   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
831   if (*(file + _config_base_dir_len) != '/') goto err;
833   return 1;
835 err:
836   snprintf(error, sizeof(error)-1, "-1 %s\n", rrd_strerror(EACCES));
837   swrite(fd, error, strlen(error));
838   return 0;
839 } /* }}} static int check_file_access */
841 static int flush_file (const char *filename) /* {{{ */
843   cache_item_t *ci;
845   pthread_mutex_lock (&cache_lock);
847   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
848   if (ci == NULL)
849   {
850     pthread_mutex_unlock (&cache_lock);
851     return (ENOENT);
852   }
854   if (ci->values_num > 0)
855   {
856     /* Enqueue at head */
857     enqueue_cache_item (ci, HEAD);
858     pthread_cond_wait(&ci->flushed, &cache_lock);
859   }
861   pthread_mutex_unlock(&cache_lock);
863   return (0);
864 } /* }}} int flush_file */
866 static int handle_request_help (int fd, /* {{{ */
867     char *buffer, size_t buffer_size)
869   int status;
870   char **help_text;
871   size_t help_text_len;
872   char *command;
873   size_t i;
875   char *help_help[] =
876   {
877     "5 Command overview\n",
878     "FLUSH <filename>\n",
879     "FLUSHALL\n",
880     "HELP [<command>]\n",
881     "UPDATE <filename> <values> [<values> ...]\n",
882     "STATS\n"
883   };
884   size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
886   char *help_flush[] =
887   {
888     "4 Help for FLUSH\n",
889     "Usage: FLUSH <filename>\n",
890     "\n",
891     "Adds the given filename to the head of the update queue and returns\n",
892     "after is has been dequeued.\n"
893   };
894   size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
896   char *help_flushall[] =
897   {
898     "3 Help for FLUSHALL\n",
899     "Usage: FLUSHALL\n",
900     "\n",
901     "Triggers writing of all pending updates.  Returns immediately.\n"
902   };
903   size_t help_flushall_len = sizeof(help_flushall) / sizeof(help_flushall[0]);
905   char *help_update[] =
906   {
907     "9 Help for UPDATE\n",
908     "Usage: UPDATE <filename> <values> [<values> ...]\n"
909     "\n",
910     "Adds the given file to the internal cache if it is not yet known and\n",
911     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
912     "for details.\n",
913     "\n",
914     "Each <values> has the following form:\n",
915     "  <values> = <time>:<value>[:<value>[...]]\n",
916     "See the rrdupdate(1) manpage for details.\n"
917   };
918   size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
920   char *help_stats[] =
921   {
922     "4 Help for STATS\n",
923     "Usage: STATS\n",
924     "\n",
925     "Returns some performance counters, see the rrdcached(1) manpage for\n",
926     "a description of the values.\n"
927   };
928   size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
930   status = buffer_get_field (&buffer, &buffer_size, &command);
931   if (status != 0)
932   {
933     help_text = help_help;
934     help_text_len = help_help_len;
935   }
936   else
937   {
938     if (strcasecmp (command, "update") == 0)
939     {
940       help_text = help_update;
941       help_text_len = help_update_len;
942     }
943     else if (strcasecmp (command, "flush") == 0)
944     {
945       help_text = help_flush;
946       help_text_len = help_flush_len;
947     }
948     else if (strcasecmp (command, "flushall") == 0)
949     {
950       help_text = help_flushall;
951       help_text_len = help_flushall_len;
952     }
953     else if (strcasecmp (command, "stats") == 0)
954     {
955       help_text = help_stats;
956       help_text_len = help_stats_len;
957     }
958     else
959     {
960       help_text = help_help;
961       help_text_len = help_help_len;
962     }
963   }
965   for (i = 0; i < help_text_len; i++)
966   {
967     status = swrite (fd, help_text[i], strlen (help_text[i]));
968     if (status < 0)
969     {
970       status = errno;
971       RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
972       return (status);
973     }
974   }
976   return (0);
977 } /* }}} int handle_request_help */
979 static int handle_request_stats (int fd, /* {{{ */
980     char *buffer __attribute__((unused)),
981     size_t buffer_size __attribute__((unused)))
983   int status;
984   char outbuf[CMD_MAX];
986   uint64_t copy_queue_length;
987   uint64_t copy_updates_received;
988   uint64_t copy_flush_received;
989   uint64_t copy_updates_written;
990   uint64_t copy_data_sets_written;
991   uint64_t copy_journal_bytes;
992   uint64_t copy_journal_rotate;
994   uint64_t tree_nodes_number;
995   uint64_t tree_depth;
997   pthread_mutex_lock (&stats_lock);
998   copy_queue_length       = stats_queue_length;
999   copy_updates_received   = stats_updates_received;
1000   copy_flush_received     = stats_flush_received;
1001   copy_updates_written    = stats_updates_written;
1002   copy_data_sets_written  = stats_data_sets_written;
1003   copy_journal_bytes      = stats_journal_bytes;
1004   copy_journal_rotate     = stats_journal_rotate;
1005   pthread_mutex_unlock (&stats_lock);
1007   pthread_mutex_lock (&cache_lock);
1008   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1009   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1010   pthread_mutex_unlock (&cache_lock);
1012 #define RRDD_STATS_SEND \
1013   outbuf[sizeof (outbuf) - 1] = 0; \
1014   status = swrite (fd, outbuf, strlen (outbuf)); \
1015   if (status < 0) \
1016   { \
1017     status = errno; \
1018     RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
1019     return (status); \
1020   }
1022   strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
1023   RRDD_STATS_SEND;
1025   snprintf (outbuf, sizeof (outbuf),
1026       "QueueLength: %"PRIu64"\n", copy_queue_length);
1027   RRDD_STATS_SEND;
1029   snprintf (outbuf, sizeof (outbuf),
1030       "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1031   RRDD_STATS_SEND;
1033   snprintf (outbuf, sizeof (outbuf),
1034       "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1035   RRDD_STATS_SEND;
1037   snprintf (outbuf, sizeof (outbuf),
1038       "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1039   RRDD_STATS_SEND;
1041   snprintf (outbuf, sizeof (outbuf),
1042       "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1043   RRDD_STATS_SEND;
1045   snprintf (outbuf, sizeof (outbuf),
1046       "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1047   RRDD_STATS_SEND;
1049   snprintf (outbuf, sizeof (outbuf),
1050       "TreeDepth: %"PRIu64"\n", tree_depth);
1051   RRDD_STATS_SEND;
1053   snprintf (outbuf, sizeof(outbuf),
1054       "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1055   RRDD_STATS_SEND;
1057   snprintf (outbuf, sizeof(outbuf),
1058       "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1059   RRDD_STATS_SEND;
1061   return (0);
1062 #undef RRDD_STATS_SEND
1063 } /* }}} int handle_request_stats */
1065 static int handle_request_flush (int fd, /* {{{ */
1066     char *buffer, size_t buffer_size)
1068   char *file;
1069   int status;
1070   char result[CMD_MAX];
1072   status = buffer_get_field (&buffer, &buffer_size, &file);
1073   if (status != 0)
1074   {
1075     strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
1076   }
1077   else
1078   {
1079     pthread_mutex_lock(&stats_lock);
1080     stats_flush_received++;
1081     pthread_mutex_unlock(&stats_lock);
1083     if (!check_file_access(file, fd)) return 0;
1085     status = flush_file (file);
1086     if (status == 0)
1087       snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
1088     else if (status == ENOENT)
1089     {
1090       /* no file in our tree; see whether it exists at all */
1091       struct stat statbuf;
1093       memset(&statbuf, 0, sizeof(statbuf));
1094       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1095         snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
1096       else
1097         snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
1098     }
1099     else if (status < 0)
1100       strncpy (result, "-1 Internal error.\n", sizeof (result));
1101     else
1102       snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
1103   }
1104   result[sizeof (result) - 1] = 0;
1106   status = swrite (fd, result, strlen (result));
1107   if (status < 0)
1108   {
1109     status = errno;
1110     RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
1111     return (status);
1112   }
1114   return (0);
1115 } /* }}} int handle_request_slurp */
1117 static int handle_request_flushall(int fd) /* {{{ */
1119   int status;
1120   char answer[] ="0 Started flush.\n";
1122   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1124   pthread_mutex_lock(&cache_lock);
1125   flush_old_values(-1);
1126   pthread_mutex_unlock(&cache_lock);
1128   status = swrite(fd, answer, strlen(answer));
1129   if (status < 0)
1130   {
1131     status = errno;
1132     RRDD_LOG(LOG_INFO, "handle_request_flushall: swrite returned an error.");
1133   }
1135   return (status);
1136 } /* }}} static int handle_request_flushall */
1138 static int handle_request_update (int fd, /* {{{ */
1139     char *buffer, size_t buffer_size)
1141   char *file;
1142   int values_num = 0;
1143   int status;
1145   time_t now;
1147   cache_item_t *ci;
1148   char answer[CMD_MAX];
1150 #define RRDD_UPDATE_SEND \
1151   answer[sizeof (answer) - 1] = 0; \
1152   status = swrite (fd, answer, strlen (answer)); \
1153   if (status < 0) \
1154   { \
1155     status = errno; \
1156     RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1157     return (status); \
1158   }
1160   now = time (NULL);
1162   status = buffer_get_field (&buffer, &buffer_size, &file);
1163   if (status != 0)
1164   {
1165     strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1166         sizeof (answer));
1167     RRDD_UPDATE_SEND;
1168     return (0);
1169   }
1171   pthread_mutex_lock(&stats_lock);
1172   stats_updates_received++;
1173   pthread_mutex_unlock(&stats_lock);
1175   if (!check_file_access(file, fd)) return 0;
1177   pthread_mutex_lock (&cache_lock);
1178   ci = g_tree_lookup (cache_tree, file);
1180   if (ci == NULL) /* {{{ */
1181   {
1182     struct stat statbuf;
1184     /* don't hold the lock while we setup; stat(2) might block */
1185     pthread_mutex_unlock(&cache_lock);
1187     memset (&statbuf, 0, sizeof (statbuf));
1188     status = stat (file, &statbuf);
1189     if (status != 0)
1190     {
1191       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1193       status = errno;
1194       if (status == ENOENT)
1195         snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1196       else
1197         snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1198             status);
1199       RRDD_UPDATE_SEND;
1200       return (0);
1201     }
1202     if (!S_ISREG (statbuf.st_mode))
1203     {
1204       snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1205       RRDD_UPDATE_SEND;
1206       return (0);
1207     }
1208     if (access(file, R_OK|W_OK) != 0)
1209     {
1210       snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1211                 file, rrd_strerror(errno));
1212       RRDD_UPDATE_SEND;
1213       return (0);
1214     }
1216     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1217     if (ci == NULL)
1218     {
1219       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1221       strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1222       RRDD_UPDATE_SEND;
1223       return (0);
1224     }
1225     memset (ci, 0, sizeof (cache_item_t));
1227     ci->file = strdup (file);
1228     if (ci->file == NULL)
1229     {
1230       free (ci);
1231       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1233       strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1234       RRDD_UPDATE_SEND;
1235       return (0);
1236     }
1238     wipe_ci_values(ci, now);
1239     ci->flags = CI_FLAGS_IN_TREE;
1241     pthread_mutex_lock(&cache_lock);
1242     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1243   } /* }}} */
1244   assert (ci != NULL);
1246   while (buffer_size > 0)
1247   {
1248     char **temp;
1249     char *value;
1251     status = buffer_get_field (&buffer, &buffer_size, &value);
1252     if (status != 0)
1253     {
1254       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1255       break;
1256     }
1258     temp = (char **) realloc (ci->values,
1259         sizeof (char *) * (ci->values_num + 1));
1260     if (temp == NULL)
1261     {
1262       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1263       continue;
1264     }
1265     ci->values = temp;
1267     ci->values[ci->values_num] = strdup (value);
1268     if (ci->values[ci->values_num] == NULL)
1269     {
1270       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1271       continue;
1272     }
1273     ci->values_num++;
1275     values_num++;
1276   }
1278   if (((now - ci->last_flush_time) >= config_write_interval)
1279       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1280       && (ci->values_num > 0))
1281   {
1282     enqueue_cache_item (ci, TAIL);
1283   }
1285   pthread_mutex_unlock (&cache_lock);
1287   if (values_num < 1)
1288   {
1289     strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1290   }
1291   else
1292   {
1293     snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1294         (values_num == 1) ? "" : "s");
1295   }
1296   RRDD_UPDATE_SEND;
1297   return (0);
1298 #undef RRDD_UPDATE_SEND
1299 } /* }}} int handle_request_update */
1301 /* we came across a "WROTE" entry during journal replay.
1302  * throw away any values that we have accumulated for this file
1303  */
1304 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1305                                  const char *buffer,
1306                                  size_t buffer_size __attribute__((unused)))
1308   int i;
1309   cache_item_t *ci;
1310   const char *file = buffer;
1312   pthread_mutex_lock(&cache_lock);
1314   ci = g_tree_lookup(cache_tree, file);
1315   if (ci == NULL)
1316   {
1317     pthread_mutex_unlock(&cache_lock);
1318     return (0);
1319   }
1321   if (ci->values)
1322   {
1323     for (i=0; i < ci->values_num; i++)
1324       free(ci->values[i]);
1326     free(ci->values);
1327   }
1329   wipe_ci_values(ci, time(NULL));
1330   remove_from_queue(ci);
1332   pthread_mutex_unlock(&cache_lock);
1333   return (0);
1334 } /* }}} int handle_request_wrote */
1336 /* returns 1 if we have the required privilege level */
1337 static int has_privilege (socket_privilege priv, /* {{{ */
1338                           socket_privilege required, int fd)
1340   int status;
1341   char error[CMD_MAX];
1343   if (priv >= required)
1344     return 1;
1346   sprintf(error, "-1 %s\n", rrd_strerror(EACCES));
1347   status = swrite(fd, error, strlen(error));
1349   if (status < 0)
1350     return status;
1351   else
1352     return 0;
1353 } /* }}} static int has_privilege */
1355 /* if fd < 0, we are in journal replay mode */
1356 static int handle_request (int fd, socket_privilege privilege, /* {{{ */
1357                            char *buffer, size_t buffer_size)
1359   char *buffer_ptr;
1360   char *command;
1361   int status;
1363   assert (buffer[buffer_size - 1] == '\0');
1365   buffer_ptr = buffer;
1366   command = NULL;
1367   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1368   if (status != 0)
1369   {
1370     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1371     return (-1);
1372   }
1374   if (strcasecmp (command, "update") == 0)
1375   {
1376     status = has_privilege(privilege, PRIV_HIGH, fd);
1377     if (status <= 0)
1378       return status;
1380     /* don't re-write updates in replay mode */
1381     if (fd >= 0)
1382       journal_write(command, buffer_ptr);
1384     return (handle_request_update (fd, buffer_ptr, buffer_size));
1385   }
1386   else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1387   {
1388     /* this is only valid in replay mode */
1389     return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1390   }
1391   else if (strcasecmp (command, "flush") == 0)
1392   {
1393     return (handle_request_flush (fd, buffer_ptr, buffer_size));
1394   }
1395   else if (strcasecmp (command, "flushall") == 0)
1396   {
1397     status = has_privilege(privilege, PRIV_HIGH, fd);
1398     if (status <= 0)
1399       return status;
1401     return (handle_request_flushall(fd));
1402   }
1403   else if (strcasecmp (command, "stats") == 0)
1404   {
1405     return (handle_request_stats (fd, buffer_ptr, buffer_size));
1406   }
1407   else if (strcasecmp (command, "help") == 0)
1408   {
1409     return (handle_request_help (fd, buffer_ptr, buffer_size));
1410   }
1411   else
1412   {
1413     char result[CMD_MAX];
1415     snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1416     result[sizeof (result) - 1] = 0;
1418     status = swrite (fd, result, strlen (result));
1419     if (status < 0)
1420     {
1421       RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1422       return (-1);
1423     }
1424   }
1426   return (0);
1427 } /* }}} int handle_request */
1429 /* MUST NOT hold journal_lock before calling this */
1430 static void journal_rotate(void) /* {{{ */
1432   FILE *old_fh = NULL;
1434   if (journal_cur == NULL || journal_old == NULL)
1435     return;
1437   pthread_mutex_lock(&journal_lock);
1439   /* we rotate this way (rename before close) so that the we can release
1440    * the journal lock as fast as possible.  Journal writes to the new
1441    * journal can proceed immediately after the new file is opened.  The
1442    * fclose can then block without affecting new updates.
1443    */
1444   if (journal_fh != NULL)
1445   {
1446     old_fh = journal_fh;
1447     rename(journal_cur, journal_old);
1448     ++stats_journal_rotate;
1449   }
1451   journal_fh = fopen(journal_cur, "a");
1452   pthread_mutex_unlock(&journal_lock);
1454   if (old_fh != NULL)
1455     fclose(old_fh);
1457   if (journal_fh == NULL)
1458   {
1459     RRDD_LOG(LOG_CRIT,
1460              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1461              journal_cur, rrd_strerror(errno));
1463     RRDD_LOG(LOG_ERR,
1464              "JOURNALING DISABLED: All values will be flushed at shutdown");
1465     config_flush_at_shutdown = 1;
1466   }
1468 } /* }}} static void journal_rotate */
1470 static void journal_done(void) /* {{{ */
1472   if (journal_cur == NULL)
1473     return;
1475   pthread_mutex_lock(&journal_lock);
1476   if (journal_fh != NULL)
1477   {
1478     fclose(journal_fh);
1479     journal_fh = NULL;
1480   }
1482   if (config_flush_at_shutdown)
1483   {
1484     RRDD_LOG(LOG_INFO, "removing journals");
1485     unlink(journal_old);
1486     unlink(journal_cur);
1487   }
1488   else
1489   {
1490     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1491              "journals will be used at next startup");
1492   }
1494   pthread_mutex_unlock(&journal_lock);
1496 } /* }}} static void journal_done */
1498 static int journal_write(char *cmd, char *args) /* {{{ */
1500   int chars;
1502   if (journal_fh == NULL)
1503     return 0;
1505   pthread_mutex_lock(&journal_lock);
1506   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1507   pthread_mutex_unlock(&journal_lock);
1509   if (chars > 0)
1510   {
1511     pthread_mutex_lock(&stats_lock);
1512     stats_journal_bytes += chars;
1513     pthread_mutex_unlock(&stats_lock);
1514   }
1516   return chars;
1517 } /* }}} static int journal_write */
1519 static int journal_replay (const char *file) /* {{{ */
1521   FILE *fh;
1522   int entry_cnt = 0;
1523   int fail_cnt = 0;
1524   uint64_t line = 0;
1525   char entry[CMD_MAX];
1527   if (file == NULL) return 0;
1529   fh = fopen(file, "r");
1530   if (fh == NULL)
1531   {
1532     if (errno != ENOENT)
1533       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1534                file, rrd_strerror(errno));
1535     return 0;
1536   }
1537   else
1538     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1540   while(!feof(fh))
1541   {
1542     size_t entry_len;
1544     ++line;
1545     if (fgets(entry, sizeof(entry), fh) == NULL)
1546       break;
1547     entry_len = strlen(entry);
1549     /* check \n termination in case journal writing crashed mid-line */
1550     if (entry_len == 0)
1551       continue;
1552     else if (entry[entry_len - 1] != '\n')
1553     {
1554       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1555       ++fail_cnt;
1556       continue;
1557     }
1559     entry[entry_len - 1] = '\0';
1561     if (handle_request(-1, PRIV_HIGH, entry, entry_len) == 0)
1562       ++entry_cnt;
1563     else
1564       ++fail_cnt;
1565   }
1567   fclose(fh);
1569   if (entry_cnt > 0)
1570   {
1571     RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1572              entry_cnt, fail_cnt);
1573     return 1;
1574   }
1575   else
1576     return 0;
1578 } /* }}} static int journal_replay */
1580 static void *connection_thread_main (void *args) /* {{{ */
1582   pthread_t self;
1583   listen_socket_t *sock;
1584   int i;
1585   int fd;
1587   sock = (listen_socket_t *) args;
1588   fd = sock->fd;
1590   pthread_mutex_lock (&connection_threads_lock);
1591   {
1592     pthread_t *temp;
1594     temp = (pthread_t *) realloc (connection_threads,
1595         sizeof (pthread_t) * (connection_threads_num + 1));
1596     if (temp == NULL)
1597     {
1598       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1599     }
1600     else
1601     {
1602       connection_threads = temp;
1603       connection_threads[connection_threads_num] = pthread_self ();
1604       connection_threads_num++;
1605     }
1606   }
1607   pthread_mutex_unlock (&connection_threads_lock);
1609   while (do_shutdown == 0)
1610   {
1611     char buffer[CMD_MAX];
1613     struct pollfd pollfd;
1614     int status;
1616     pollfd.fd = fd;
1617     pollfd.events = POLLIN | POLLPRI;
1618     pollfd.revents = 0;
1620     status = poll (&pollfd, 1, /* timeout = */ 500);
1621     if (do_shutdown)
1622       break;
1623     else if (status == 0) /* timeout */
1624       continue;
1625     else if (status < 0) /* error */
1626     {
1627       status = errno;
1628       if (status == EINTR)
1629         continue;
1630       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1631       continue;
1632     }
1634     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1635     {
1636       close (fd);
1637       break;
1638     }
1639     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1640     {
1641       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1642           "poll(2) returned something unexpected: %#04hx",
1643           pollfd.revents);
1644       close (fd);
1645       break;
1646     }
1648     status = (int) sread (fd, buffer, sizeof (buffer));
1649     if (status <= 0)
1650     {
1651       close (fd);
1653       if (status < 0)
1654         RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1656       break;
1657     }
1659     status = handle_request (fd, sock->privilege, buffer, status);
1660     if (status != 0)
1661       break;
1662   }
1664   close(fd);
1665   free(args);
1667   self = pthread_self ();
1668   /* Remove this thread from the connection threads list */
1669   pthread_mutex_lock (&connection_threads_lock);
1670   /* Find out own index in the array */
1671   for (i = 0; i < connection_threads_num; i++)
1672     if (pthread_equal (connection_threads[i], self) != 0)
1673       break;
1674   assert (i < connection_threads_num);
1676   /* Move the trailing threads forward. */
1677   if (i < (connection_threads_num - 1))
1678   {
1679     memmove (connection_threads + i,
1680         connection_threads + i + 1,
1681         sizeof (pthread_t) * (connection_threads_num - i - 1));
1682   }
1684   connection_threads_num--;
1685   pthread_mutex_unlock (&connection_threads_lock);
1687   return (NULL);
1688 } /* }}} void *connection_thread_main */
1690 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1692   int fd;
1693   struct sockaddr_un sa;
1694   listen_socket_t *temp;
1695   int status;
1696   const char *path;
1698   path = sock->addr;
1699   if (strncmp(path, "unix:", strlen("unix:")) == 0)
1700     path += strlen("unix:");
1702   temp = (listen_socket_t *) realloc (listen_fds,
1703       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1704   if (temp == NULL)
1705   {
1706     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1707     return (-1);
1708   }
1709   listen_fds = temp;
1710   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1712   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1713   if (fd < 0)
1714   {
1715     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1716     return (-1);
1717   }
1719   memset (&sa, 0, sizeof (sa));
1720   sa.sun_family = AF_UNIX;
1721   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1723   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1724   if (status != 0)
1725   {
1726     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1727     close (fd);
1728     unlink (path);
1729     return (-1);
1730   }
1732   status = listen (fd, /* backlog = */ 10);
1733   if (status != 0)
1734   {
1735     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1736     close (fd);
1737     unlink (path);
1738     return (-1);
1739   }
1741   listen_fds[listen_fds_num].fd = fd;
1742   listen_fds[listen_fds_num].family = PF_UNIX;
1743   strncpy(listen_fds[listen_fds_num].addr, path,
1744           sizeof (listen_fds[listen_fds_num].addr) - 1);
1745   listen_fds_num++;
1747   return (0);
1748 } /* }}} int open_listen_socket_unix */
1750 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
1752   struct addrinfo ai_hints;
1753   struct addrinfo *ai_res;
1754   struct addrinfo *ai_ptr;
1755   char addr_copy[NI_MAXHOST];
1756   char *addr;
1757   char *port;
1758   int status;
1760   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
1761   addr_copy[sizeof (addr_copy) - 1] = 0;
1762   addr = addr_copy;
1764   memset (&ai_hints, 0, sizeof (ai_hints));
1765   ai_hints.ai_flags = 0;
1766 #ifdef AI_ADDRCONFIG
1767   ai_hints.ai_flags |= AI_ADDRCONFIG;
1768 #endif
1769   ai_hints.ai_family = AF_UNSPEC;
1770   ai_hints.ai_socktype = SOCK_STREAM;
1772   port = NULL;
1773   if (*addr == '[') /* IPv6+port format */
1774   {
1775     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1776     addr++;
1778     port = strchr (addr, ']');
1779     if (port == NULL)
1780     {
1781       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
1782           sock->addr);
1783       return (-1);
1784     }
1785     *port = 0;
1786     port++;
1788     if (*port == ':')
1789       port++;
1790     else if (*port == 0)
1791       port = NULL;
1792     else
1793     {
1794       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
1795           port);
1796       return (-1);
1797     }
1798   } /* if (*addr = ']') */
1799   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1800   {
1801     port = rindex(addr, ':');
1802     if (port != NULL)
1803     {
1804       *port = 0;
1805       port++;
1806     }
1807   }
1808   ai_res = NULL;
1809   status = getaddrinfo (addr,
1810                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1811                         &ai_hints, &ai_res);
1812   if (status != 0)
1813   {
1814     RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
1815         "%s", addr, gai_strerror (status));
1816     return (-1);
1817   }
1819   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1820   {
1821     int fd;
1822     listen_socket_t *temp;
1823     int one = 1;
1825     temp = (listen_socket_t *) realloc (listen_fds,
1826         sizeof (listen_fds[0]) * (listen_fds_num + 1));
1827     if (temp == NULL)
1828     {
1829       RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
1830       continue;
1831     }
1832     listen_fds = temp;
1833     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1835     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1836     if (fd < 0)
1837     {
1838       RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
1839       continue;
1840     }
1842     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
1844     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1845     if (status != 0)
1846     {
1847       RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
1848       close (fd);
1849       continue;
1850     }
1852     status = listen (fd, /* backlog = */ 10);
1853     if (status != 0)
1854     {
1855       RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
1856       close (fd);
1857       return (-1);
1858     }
1860     listen_fds[listen_fds_num].fd = fd;
1861     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
1862     listen_fds_num++;
1863   } /* for (ai_ptr) */
1865   return (0);
1866 } /* }}} static int open_listen_socket_network */
1868 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
1870   assert(sock != NULL);
1871   assert(sock->addr != NULL);
1873   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
1874       || sock->addr[0] == '/')
1875     return (open_listen_socket_unix(sock));
1876   else
1877     return (open_listen_socket_network(sock));
1878 } /* }}} int open_listen_socket */
1880 static int close_listen_sockets (void) /* {{{ */
1882   size_t i;
1884   for (i = 0; i < listen_fds_num; i++)
1885   {
1886     close (listen_fds[i].fd);
1888     if (listen_fds[i].family == PF_UNIX)
1889       unlink(listen_fds[i].addr);
1890   }
1892   free (listen_fds);
1893   listen_fds = NULL;
1894   listen_fds_num = 0;
1896   return (0);
1897 } /* }}} int close_listen_sockets */
1899 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1901   struct pollfd *pollfds;
1902   int pollfds_num;
1903   int status;
1904   int i;
1906   for (i = 0; i < config_listen_address_list_len; i++)
1907     open_listen_socket (config_listen_address_list[i]);
1909   if (config_listen_address_list_len < 1)
1910   {
1911     listen_socket_t sock;
1912     memset(&sock, 0, sizeof(sock));
1913     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
1914     open_listen_socket (&sock);
1915   }
1917   if (listen_fds_num < 1)
1918   {
1919     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1920         "could be opened. Sorry.");
1921     return (NULL);
1922   }
1924   pollfds_num = listen_fds_num;
1925   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1926   if (pollfds == NULL)
1927   {
1928     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1929     return (NULL);
1930   }
1931   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1933   RRDD_LOG(LOG_INFO, "listening for connections");
1935   while (do_shutdown == 0)
1936   {
1937     assert (pollfds_num == ((int) listen_fds_num));
1938     for (i = 0; i < pollfds_num; i++)
1939     {
1940       pollfds[i].fd = listen_fds[i].fd;
1941       pollfds[i].events = POLLIN | POLLPRI;
1942       pollfds[i].revents = 0;
1943     }
1945     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1946     if (do_shutdown)
1947       break;
1948     else if (status == 0) /* timeout */
1949       continue;
1950     else if (status < 0) /* error */
1951     {
1952       status = errno;
1953       if (status != EINTR)
1954       {
1955         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1956       }
1957       continue;
1958     }
1960     for (i = 0; i < pollfds_num; i++)
1961     {
1962       listen_socket_t *client_sock;
1963       struct sockaddr_storage client_sa;
1964       socklen_t client_sa_size;
1965       pthread_t tid;
1966       pthread_attr_t attr;
1968       if (pollfds[i].revents == 0)
1969         continue;
1971       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1972       {
1973         RRDD_LOG (LOG_ERR, "listen_thread_main: "
1974             "poll(2) returned something unexpected for listen FD #%i.",
1975             pollfds[i].fd);
1976         continue;
1977       }
1979       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
1980       if (client_sock == NULL)
1981       {
1982         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1983         continue;
1984       }
1985       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
1987       client_sa_size = sizeof (client_sa);
1988       client_sock->fd = accept (pollfds[i].fd,
1989           (struct sockaddr *) &client_sa, &client_sa_size);
1990       if (client_sock->fd < 0)
1991       {
1992         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1993         free(client_sock);
1994         continue;
1995       }
1997       pthread_attr_init (&attr);
1998       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2000       status = pthread_create (&tid, &attr, connection_thread_main,
2001                                client_sock);
2002       if (status != 0)
2003       {
2004         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2005         close (client_sock->fd);
2006         free (client_sock);
2007         continue;
2008       }
2009     } /* for (pollfds_num) */
2010   } /* while (do_shutdown == 0) */
2012   RRDD_LOG(LOG_INFO, "starting shutdown");
2014   close_listen_sockets ();
2016   pthread_mutex_lock (&connection_threads_lock);
2017   while (connection_threads_num > 0)
2018   {
2019     pthread_t wait_for;
2021     wait_for = connection_threads[0];
2023     pthread_mutex_unlock (&connection_threads_lock);
2024     pthread_join (wait_for, /* retval = */ NULL);
2025     pthread_mutex_lock (&connection_threads_lock);
2026   }
2027   pthread_mutex_unlock (&connection_threads_lock);
2029   return (NULL);
2030 } /* }}} void *listen_thread_main */
2032 static int daemonize (void) /* {{{ */
2034   int status;
2035   int fd;
2036   char *base_dir;
2038   fd = open_pidfile();
2039   if (fd < 0) return fd;
2041   if (!stay_foreground)
2042   {
2043     pid_t child;
2045     child = fork ();
2046     if (child < 0)
2047     {
2048       fprintf (stderr, "daemonize: fork(2) failed.\n");
2049       return (-1);
2050     }
2051     else if (child > 0)
2052     {
2053       return (1);
2054     }
2056     /* Become session leader */
2057     setsid ();
2059     /* Open the first three file descriptors to /dev/null */
2060     close (2);
2061     close (1);
2062     close (0);
2064     open ("/dev/null", O_RDWR);
2065     dup (0);
2066     dup (0);
2067   } /* if (!stay_foreground) */
2069   /* Change into the /tmp directory. */
2070   base_dir = (config_base_dir != NULL)
2071     ? config_base_dir
2072     : "/tmp";
2073   status = chdir (base_dir);
2074   if (status != 0)
2075   {
2076     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2077     return (-1);
2078   }
2080   install_signal_handlers();
2082   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2083   RRDD_LOG(LOG_INFO, "starting up");
2085   cache_tree = g_tree_new ((GCompareFunc) strcmp);
2086   if (cache_tree == NULL)
2087   {
2088     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2089     return (-1);
2090   }
2092   status = write_pidfile (fd);
2093   return status;
2094 } /* }}} int daemonize */
2096 static int cleanup (void) /* {{{ */
2098   do_shutdown++;
2100   pthread_cond_signal (&cache_cond);
2101   pthread_join (queue_thread, /* return = */ NULL);
2103   remove_pidfile ();
2105   RRDD_LOG(LOG_INFO, "goodbye");
2106   closelog ();
2108   return (0);
2109 } /* }}} int cleanup */
2111 static int read_options (int argc, char **argv) /* {{{ */
2113   int option;
2114   int status = 0;
2116   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2117   {
2118     switch (option)
2119     {
2120       case 'g':
2121         stay_foreground=1;
2122         break;
2124       case 'L':
2125       case 'l':
2126       {
2127         listen_socket_t **temp;
2128         listen_socket_t *new;
2130         new = malloc(sizeof(listen_socket_t));
2131         if (new == NULL)
2132         {
2133           fprintf(stderr, "read_options: malloc failed.\n");
2134           return(2);
2135         }
2136         memset(new, 0, sizeof(listen_socket_t));
2138         temp = (listen_socket_t **) realloc (config_listen_address_list,
2139             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2140         if (temp == NULL)
2141         {
2142           fprintf (stderr, "read_options: realloc failed.\n");
2143           return (2);
2144         }
2145         config_listen_address_list = temp;
2147         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2148         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2150         temp[config_listen_address_list_len] = new;
2151         config_listen_address_list_len++;
2152       }
2153       break;
2155       case 'f':
2156       {
2157         int temp;
2159         temp = atoi (optarg);
2160         if (temp > 0)
2161           config_flush_interval = temp;
2162         else
2163         {
2164           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2165           status = 3;
2166         }
2167       }
2168       break;
2170       case 'w':
2171       {
2172         int temp;
2174         temp = atoi (optarg);
2175         if (temp > 0)
2176           config_write_interval = temp;
2177         else
2178         {
2179           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2180           status = 2;
2181         }
2182       }
2183       break;
2185       case 'z':
2186       {
2187         int temp;
2189         temp = atoi(optarg);
2190         if (temp > 0)
2191           config_write_jitter = temp;
2192         else
2193         {
2194           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2195           status = 2;
2196         }
2198         break;
2199       }
2201       case 'B':
2202         config_write_base_only = 1;
2203         break;
2205       case 'b':
2206       {
2207         size_t len;
2209         if (config_base_dir != NULL)
2210           free (config_base_dir);
2211         config_base_dir = strdup (optarg);
2212         if (config_base_dir == NULL)
2213         {
2214           fprintf (stderr, "read_options: strdup failed.\n");
2215           return (3);
2216         }
2218         len = strlen (config_base_dir);
2219         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2220         {
2221           config_base_dir[len - 1] = 0;
2222           len--;
2223         }
2225         if (len < 1)
2226         {
2227           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2228           return (4);
2229         }
2231         _config_base_dir_len = len;
2232       }
2233       break;
2235       case 'p':
2236       {
2237         if (config_pid_file != NULL)
2238           free (config_pid_file);
2239         config_pid_file = strdup (optarg);
2240         if (config_pid_file == NULL)
2241         {
2242           fprintf (stderr, "read_options: strdup failed.\n");
2243           return (3);
2244         }
2245       }
2246       break;
2248       case 'F':
2249         config_flush_at_shutdown = 1;
2250         break;
2252       case 'j':
2253       {
2254         struct stat statbuf;
2255         const char *dir = optarg;
2257         status = stat(dir, &statbuf);
2258         if (status != 0)
2259         {
2260           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2261           return 6;
2262         }
2264         if (!S_ISDIR(statbuf.st_mode)
2265             || access(dir, R_OK|W_OK|X_OK) != 0)
2266         {
2267           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2268                   errno ? rrd_strerror(errno) : "");
2269           return 6;
2270         }
2272         journal_cur = malloc(PATH_MAX + 1);
2273         journal_old = malloc(PATH_MAX + 1);
2274         if (journal_cur == NULL || journal_old == NULL)
2275         {
2276           fprintf(stderr, "malloc failure for journal files\n");
2277           return 6;
2278         }
2279         else 
2280         {
2281           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2282           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2283         }
2284       }
2285       break;
2287       case 'h':
2288       case '?':
2289         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2290             "\n"
2291             "Usage: rrdcached [options]\n"
2292             "\n"
2293             "Valid options are:\n"
2294             "  -l <address>  Socket address to listen to.\n"
2295             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2296             "  -w <seconds>  Interval in which to write data.\n"
2297             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2298             "  -f <seconds>  Interval in which to flush dead data.\n"
2299             "  -p <file>     Location of the PID-file.\n"
2300             "  -b <dir>      Base directory to change to.\n"
2301             "  -B            Restrict file access to paths within -b <dir>\n"
2302             "  -g            Do not fork and run in the foreground.\n"
2303             "  -j <dir>      Directory in which to create the journal files.\n"
2304             "  -F            Always flush all updates at shutdown\n"
2305             "\n"
2306             "For more information and a detailed description of all options "
2307             "please refer\n"
2308             "to the rrdcached(1) manual page.\n",
2309             VERSION);
2310         status = -1;
2311         break;
2312     } /* switch (option) */
2313   } /* while (getopt) */
2315   /* advise the user when values are not sane */
2316   if (config_flush_interval < 2 * config_write_interval)
2317     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2318             " 2x write interval (-w) !\n");
2319   if (config_write_jitter > config_write_interval)
2320     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2321             " write interval (-w) !\n");
2323   if (config_write_base_only && config_base_dir == NULL)
2324     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2325             "  Consult the rrdcached documentation\n");
2327   if (journal_cur == NULL)
2328     config_flush_at_shutdown = 1;
2330   return (status);
2331 } /* }}} int read_options */
2333 int main (int argc, char **argv)
2335   int status;
2337   status = read_options (argc, argv);
2338   if (status != 0)
2339   {
2340     if (status < 0)
2341       status = 0;
2342     return (status);
2343   }
2345   status = daemonize ();
2346   if (status == 1)
2347   {
2348     struct sigaction sigchld;
2350     memset (&sigchld, 0, sizeof (sigchld));
2351     sigchld.sa_handler = SIG_IGN;
2352     sigaction (SIGCHLD, &sigchld, NULL);
2354     return (0);
2355   }
2356   else if (status != 0)
2357   {
2358     fprintf (stderr, "daemonize failed, exiting.\n");
2359     return (1);
2360   }
2362   if (journal_cur != NULL)
2363   {
2364     int had_journal = 0;
2366     pthread_mutex_lock(&journal_lock);
2368     RRDD_LOG(LOG_INFO, "checking for journal files");
2370     had_journal += journal_replay(journal_old);
2371     had_journal += journal_replay(journal_cur);
2373     if (had_journal)
2374       flush_old_values(-1);
2376     pthread_mutex_unlock(&journal_lock);
2377     journal_rotate();
2379     RRDD_LOG(LOG_INFO, "journal processing complete");
2380   }
2382   /* start the queue thread */
2383   memset (&queue_thread, 0, sizeof (queue_thread));
2384   status = pthread_create (&queue_thread,
2385                            NULL, /* attr */
2386                            queue_thread_main,
2387                            NULL); /* args */
2388   if (status != 0)
2389   {
2390     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2391     cleanup();
2392     return (1);
2393   }
2395   listen_thread_main (NULL);
2396   cleanup ();
2398   return (0);
2399 } /* int main */
2401 /*
2402  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2403  */