Code

This patch ensures that the "FLUSH" command will write the updates out to
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
101 /*
102  * Types
103  */
104 struct listen_socket_s
106   int fd;
107   char path[PATH_MAX + 1];
108 };
109 typedef struct listen_socket_s listen_socket_t;
111 struct cache_item_s;
112 typedef struct cache_item_s cache_item_t;
113 struct cache_item_s
115   char *file;
116   char **values;
117   int values_num;
118   time_t last_flush_time;
119 #define CI_FLAGS_IN_TREE  (1<<0)
120 #define CI_FLAGS_IN_QUEUE (1<<1)
121   int flags;
122   pthread_cond_t  flushed;
123   cache_item_t *next;
124 };
126 struct callback_flush_data_s
128   time_t now;
129   time_t abs_timeout;
130   char **keys;
131   size_t keys_num;
132 };
133 typedef struct callback_flush_data_s callback_flush_data_t;
135 enum queue_side_e
137   HEAD,
138   TAIL
139 };
140 typedef enum queue_side_e queue_side_t;
142 /* max length of socket command or response */
143 #define CMD_MAX 4096
145 /*
146  * Variables
147  */
148 static int stay_foreground = 0;
150 static listen_socket_t *listen_fds = NULL;
151 static size_t listen_fds_num = 0;
153 static int do_shutdown = 0;
155 static pthread_t queue_thread;
157 static pthread_t *connection_threads = NULL;
158 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
159 static int connection_threads_num = 0;
161 /* Cache stuff */
162 static GTree          *cache_tree = NULL;
163 static cache_item_t   *cache_queue_head = NULL;
164 static cache_item_t   *cache_queue_tail = NULL;
165 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
166 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
168 static int config_write_interval = 300;
169 static int config_write_jitter   = 0;
170 static int config_flush_interval = 3600;
171 static char *config_pid_file = NULL;
172 static char *config_base_dir = NULL;
174 static char **config_listen_address_list = NULL;
175 static int config_listen_address_list_len = 0;
177 static uint64_t stats_queue_length = 0;
178 static uint64_t stats_updates_received = 0;
179 static uint64_t stats_flush_received = 0;
180 static uint64_t stats_updates_written = 0;
181 static uint64_t stats_data_sets_written = 0;
182 static uint64_t stats_journal_bytes = 0;
183 static uint64_t stats_journal_rotate = 0;
184 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
186 /* Journaled updates */
187 static char *journal_cur = NULL;
188 static char *journal_old = NULL;
189 static FILE *journal_fh = NULL;
190 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
191 static int journal_write(char *cmd, char *args);
192 static void journal_done(void);
193 static void journal_rotate(void);
195 /* 
196  * Functions
197  */
198 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
200   RRDD_LOG(LOG_NOTICE, "caught SIGINT");
201   do_shutdown++;
202   pthread_cond_broadcast(&cache_cond);
203 } /* }}} void sig_int_handler */
205 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
207   RRDD_LOG(LOG_NOTICE, "caught SIGTERM");
208   do_shutdown++;
209   pthread_cond_broadcast(&cache_cond);
210 } /* }}} void sig_term_handler */
212 static int write_pidfile (void) /* {{{ */
214   pid_t pid;
215   char *file;
216   int fd;
217   FILE *fh;
219   pid = getpid ();
220   
221   file = (config_pid_file != NULL)
222     ? config_pid_file
223     : LOCALSTATEDIR "/run/rrdcached.pid";
225   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
226   if (fd < 0)
227   {
228     RRDD_LOG(LOG_ERR, "FATAL: cannot create '%s' (%s)",
229              file, rrd_strerror(errno));
230     return (-1);
231   }
233   fh = fdopen (fd, "w");
234   if (fh == NULL)
235   {
236     RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file);
237     close(fd);
238     return (-1);
239   }
241   fprintf (fh, "%i\n", (int) pid);
242   fclose (fh);
244   return (0);
245 } /* }}} int write_pidfile */
247 static int remove_pidfile (void) /* {{{ */
249   char *file;
250   int status;
252   file = (config_pid_file != NULL)
253     ? config_pid_file
254     : LOCALSTATEDIR "/run/rrdcached.pid";
256   status = unlink (file);
257   if (status == 0)
258     return (0);
259   return (errno);
260 } /* }}} int remove_pidfile */
262 static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
264   char    *buffer;
265   size_t   buffer_used;
266   size_t   buffer_free;
267   ssize_t  status;
269   buffer       = (char *) buffer_void;
270   buffer_used  = 0;
271   buffer_free  = buffer_size;
273   while (buffer_free > 0)
274   {
275     status = read (fd, buffer + buffer_used, buffer_free);
276     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
277       continue;
279     if (status < 0)
280       return (-1);
282     if (status == 0)
283       return (0);
285     assert ((0 > status) || (buffer_free >= (size_t) status));
287     buffer_free = buffer_free - status;
288     buffer_used = buffer_used + status;
290     if (buffer[buffer_used - 1] == '\n')
291       break;
292   }
294   assert (buffer_used > 0);
296   if (buffer[buffer_used - 1] != '\n')
297   {
298     errno = ENOBUFS;
299     return (-1);
300   }
302   buffer[buffer_used - 1] = 0;
304   /* Fix network line endings. */
305   if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
306   {
307     buffer_used--;
308     buffer[buffer_used - 1] = 0;
309   }
311   return (buffer_used);
312 } /* }}} ssize_t sread */
314 static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
316   const char *ptr;
317   size_t      nleft;
318   ssize_t     status;
320   /* special case for journal replay */
321   if (fd < 0) return 0;
323   ptr   = (const char *) buf;
324   nleft = count;
326   while (nleft > 0)
327   {
328     status = write (fd, (const void *) ptr, nleft);
330     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
331       continue;
333     if (status < 0)
334       return (status);
336     nleft -= status;
337     ptr   += status;
338   }
340   return (0);
341 } /* }}} ssize_t swrite */
343 static void _wipe_ci_values(cache_item_t *ci, time_t when)
345   ci->values = NULL;
346   ci->values_num = 0;
348   ci->last_flush_time = when;
349   if (config_write_jitter > 0)
350     ci->last_flush_time += (random() % config_write_jitter);
352   ci->flags &= ~(CI_FLAGS_IN_QUEUE);
355 /*
356  * enqueue_cache_item:
357  * `cache_lock' must be acquired before calling this function!
358  */
359 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
360     queue_side_t side)
362   int did_insert = 0;
364   if (ci == NULL)
365     return (-1);
367   if (ci->values_num == 0)
368     return (0);
370   if (side == HEAD)
371   {
372     if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
373     {
374       assert (ci->next == NULL);
375       ci->next = cache_queue_head;
376       cache_queue_head = ci;
378       if (cache_queue_tail == NULL)
379         cache_queue_tail = cache_queue_head;
381       did_insert = 1;
382     }
383     else if (cache_queue_head == ci)
384     {
385       /* do nothing */
386     }
387     else /* enqueued, but not first entry */
388     {
389       cache_item_t *prev;
391       /* find previous entry */
392       for (prev = cache_queue_head; prev != NULL; prev = prev->next)
393         if (prev->next == ci)
394           break;
395       assert (prev != NULL);
397       /* move to the front */
398       prev->next = ci->next;
399       ci->next = cache_queue_head;
400       cache_queue_head = ci;
402       /* check if we need to adapt the tail */
403       if (cache_queue_tail == ci)
404         cache_queue_tail = prev;
405     }
406   }
407   else /* (side == TAIL) */
408   {
409     /* We don't move values back in the list.. */
410     if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
411       return (0);
413     assert (ci->next == NULL);
415     if (cache_queue_tail == NULL)
416       cache_queue_head = ci;
417     else
418       cache_queue_tail->next = ci;
419     cache_queue_tail = ci;
421     did_insert = 1;
422   }
424   ci->flags |= CI_FLAGS_IN_QUEUE;
426   if (did_insert)
427   {
428     pthread_mutex_lock (&stats_lock);
429     stats_queue_length++;
430     pthread_mutex_unlock (&stats_lock);
431   }
433   return (0);
434 } /* }}} int enqueue_cache_item */
436 /*
437  * tree_callback_flush:
438  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
439  * while this is in progress.
440  */
441 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
442     gpointer data)
444   cache_item_t *ci;
445   callback_flush_data_t *cfd;
447   ci = (cache_item_t *) value;
448   cfd = (callback_flush_data_t *) data;
450   if ((ci->last_flush_time <= cfd->abs_timeout)
451       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
452       && (ci->values_num > 0))
453   {
454     enqueue_cache_item (ci, TAIL);
455   }
456   else if ((do_shutdown != 0)
457       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
458       && (ci->values_num > 0))
459   {
460     enqueue_cache_item (ci, TAIL);
461   }
462   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
463       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
464       && (ci->values_num <= 0))
465   {
466     char **temp;
468     temp = (char **) realloc (cfd->keys,
469         sizeof (char *) * (cfd->keys_num + 1));
470     if (temp == NULL)
471     {
472       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
473       return (FALSE);
474     }
475     cfd->keys = temp;
476     /* Make really sure this points to the _same_ place */
477     assert ((char *) key == ci->file);
478     cfd->keys[cfd->keys_num] = (char *) key;
479     cfd->keys_num++;
480   }
482   return (FALSE);
483 } /* }}} gboolean tree_callback_flush */
485 static int flush_old_values (int max_age)
487   callback_flush_data_t cfd;
488   size_t k;
490   memset (&cfd, 0, sizeof (cfd));
491   /* Pass the current time as user data so that we don't need to call
492    * `time' for each node. */
493   cfd.now = time (NULL);
494   cfd.keys = NULL;
495   cfd.keys_num = 0;
497   if (max_age > 0)
498     cfd.abs_timeout = cfd.now - max_age;
499   else
500     cfd.abs_timeout = cfd.now + 1;
502   /* `tree_callback_flush' will return the keys of all values that haven't
503    * been touched in the last `config_flush_interval' seconds in `cfd'.
504    * The char*'s in this array point to the same memory as ci->file, so we
505    * don't need to free them separately. */
506   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
508   for (k = 0; k < cfd.keys_num; k++)
509   {
510     cache_item_t *ci;
512     /* This must not fail. */
513     ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
514     assert (ci != NULL);
516     /* If we end up here with values available, something's seriously
517      * messed up. */
518     assert (ci->values_num == 0);
520     /* Remove the node from the tree */
521     g_tree_remove (cache_tree, cfd.keys[k]);
522     cfd.keys[k] = NULL;
524     /* Now free and clean up `ci'. */
525     free (ci->file);
526     ci->file = NULL;
527     free (ci);
528     ci = NULL;
529   } /* for (k = 0; k < cfd.keys_num; k++) */
531   if (cfd.keys != NULL)
532   {
533     free (cfd.keys);
534     cfd.keys = NULL;
535   }
537   return (0);
538 } /* int flush_old_values */
540 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
542   struct timeval now;
543   struct timespec next_flush;
545   gettimeofday (&now, NULL);
546   next_flush.tv_sec = now.tv_sec + config_flush_interval;
547   next_flush.tv_nsec = 1000 * now.tv_usec;
549   pthread_mutex_lock (&cache_lock);
550   while ((do_shutdown == 0) || (cache_queue_head != NULL))
551   {
552     cache_item_t *ci;
553     char *file;
554     char **values;
555     int values_num;
556     int status;
557     int i;
559     /* First, check if it's time to do the cache flush. */
560     gettimeofday (&now, NULL);
561     if ((now.tv_sec > next_flush.tv_sec)
562         || ((now.tv_sec == next_flush.tv_sec)
563           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
564     {
565       /* Flush all values that haven't been written in the last
566        * `config_write_interval' seconds. */
567       flush_old_values (config_write_interval);
569       /* Determine the time of the next cache flush. */
570       while (next_flush.tv_sec <= now.tv_sec)
571         next_flush.tv_sec += config_flush_interval;
573       /* unlock the cache while we rotate so we don't block incoming
574        * updates if the fsync() blocks on disk I/O */
575       pthread_mutex_unlock(&cache_lock);
576       journal_rotate();
577       pthread_mutex_lock(&cache_lock);
578     }
580     /* Now, check if there's something to store away. If not, wait until
581      * something comes in or it's time to do the cache flush. */
582     if (cache_queue_head == NULL)
583     {
584       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
585       if ((status != 0) && (status != ETIMEDOUT))
586       {
587         RRDD_LOG (LOG_ERR, "queue_thread_main: "
588             "pthread_cond_timedwait returned %i.", status);
589       }
590     }
592     /* We're about to shut down, so lets flush the entire tree. */
593     if ((do_shutdown != 0) && (cache_queue_head == NULL))
594       flush_old_values (/* max age = */ -1);
596     /* Check if a value has arrived. This may be NULL if we timed out or there
597      * was an interrupt such as a signal. */
598     if (cache_queue_head == NULL)
599       continue;
601     ci = cache_queue_head;
603     /* copy the relevant parts */
604     file = strdup (ci->file);
605     if (file == NULL)
606     {
607       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
608       continue;
609     }
611     assert(ci->values != NULL);
612     assert(ci->values_num > 0);
614     values = ci->values;
615     values_num = ci->values_num;
617     _wipe_ci_values(ci, time(NULL));
619     cache_queue_head = ci->next;
620     if (cache_queue_head == NULL)
621       cache_queue_tail = NULL;
622     ci->next = NULL;
624     pthread_mutex_lock (&stats_lock);
625     assert (stats_queue_length > 0);
626     stats_queue_length--;
627     pthread_mutex_unlock (&stats_lock);
629     pthread_mutex_unlock (&cache_lock);
631     rrd_clear_error ();
632     status = rrd_update_r (file, NULL, values_num, (void *) values);
633     if (status != 0)
634     {
635       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
636           "rrd_update_r (%s) failed with status %i. (%s)",
637           file, status, rrd_get_error());
638     }
640     journal_write("wrote", file);
641     pthread_cond_broadcast(&ci->flushed);
643     for (i = 0; i < values_num; i++)
644       free (values[i]);
646     free(values);
647     free(file);
649     if (status == 0)
650     {
651       pthread_mutex_lock (&stats_lock);
652       stats_updates_written++;
653       stats_data_sets_written += values_num;
654       pthread_mutex_unlock (&stats_lock);
655     }
657     pthread_mutex_lock (&cache_lock);
659     /* We're about to shut down, so lets flush the entire tree. */
660     if ((do_shutdown != 0) && (cache_queue_head == NULL))
661       flush_old_values (/* max age = */ -1);
662   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
663   pthread_mutex_unlock (&cache_lock);
665   assert(cache_queue_head == NULL);
666   RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
667   journal_done();
669   return (NULL);
670 } /* }}} void *queue_thread_main */
672 static int buffer_get_field (char **buffer_ret, /* {{{ */
673     size_t *buffer_size_ret, char **field_ret)
675   char *buffer;
676   size_t buffer_pos;
677   size_t buffer_size;
678   char *field;
679   size_t field_size;
680   int status;
682   buffer = *buffer_ret;
683   buffer_pos = 0;
684   buffer_size = *buffer_size_ret;
685   field = *buffer_ret;
686   field_size = 0;
688   if (buffer_size <= 0)
689     return (-1);
691   /* This is ensured by `handle_request'. */
692   assert (buffer[buffer_size - 1] == '\0');
694   status = -1;
695   while (buffer_pos < buffer_size)
696   {
697     /* Check for end-of-field or end-of-buffer */
698     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
699     {
700       field[field_size] = 0;
701       field_size++;
702       buffer_pos++;
703       status = 0;
704       break;
705     }
706     /* Handle escaped characters. */
707     else if (buffer[buffer_pos] == '\\')
708     {
709       if (buffer_pos >= (buffer_size - 1))
710         break;
711       buffer_pos++;
712       field[field_size] = buffer[buffer_pos];
713       field_size++;
714       buffer_pos++;
715     }
716     /* Normal operation */ 
717     else
718     {
719       field[field_size] = buffer[buffer_pos];
720       field_size++;
721       buffer_pos++;
722     }
723   } /* while (buffer_pos < buffer_size) */
725   if (status != 0)
726     return (status);
728   *buffer_ret = buffer + buffer_pos;
729   *buffer_size_ret = buffer_size - buffer_pos;
730   *field_ret = field;
732   return (0);
733 } /* }}} int buffer_get_field */
735 static int flush_file (const char *filename) /* {{{ */
737   cache_item_t *ci;
739   pthread_mutex_lock (&cache_lock);
741   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
742   if (ci == NULL)
743   {
744     pthread_mutex_unlock (&cache_lock);
745     return (ENOENT);
746   }
748   /* Enqueue at head */
749   enqueue_cache_item (ci, HEAD);
750   pthread_cond_signal (&cache_cond);
752   pthread_cond_wait(&ci->flushed, &cache_lock);
753   pthread_mutex_unlock(&cache_lock);
755   return (0);
756 } /* }}} int flush_file */
758 static int handle_request_help (int fd, /* {{{ */
759     char *buffer, size_t buffer_size)
761   int status;
762   char **help_text;
763   size_t help_text_len;
764   char *command;
765   size_t i;
767   char *help_help[] =
768   {
769     "4 Command overview\n",
770     "FLUSH <filename>\n",
771     "HELP [<command>]\n",
772     "UPDATE <filename> <values> [<values> ...]\n",
773     "STATS\n"
774   };
775   size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
777   char *help_flush[] =
778   {
779     "4 Help for FLUSH\n",
780     "Usage: FLUSH <filename>\n",
781     "\n",
782     "Adds the given filename to the head of the update queue and returns\n",
783     "after is has been dequeued.\n"
784   };
785   size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
787   char *help_update[] =
788   {
789     "9 Help for UPDATE\n",
790     "Usage: UPDATE <filename> <values> [<values> ...]\n"
791     "\n",
792     "Adds the given file to the internal cache if it is not yet known and\n",
793     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
794     "for details.\n",
795     "\n",
796     "Each <values> has the following form:\n",
797     "  <values> = <time>:<value>[:<value>[...]]\n",
798     "See the rrdupdate(1) manpage for details.\n"
799   };
800   size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
802   char *help_stats[] =
803   {
804     "4 Help for STATS\n",
805     "Usage: STATS\n",
806     "\n",
807     "Returns some performance counters, see the rrdcached(1) manpage for\n",
808     "a description of the values.\n"
809   };
810   size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
812   status = buffer_get_field (&buffer, &buffer_size, &command);
813   if (status != 0)
814   {
815     help_text = help_help;
816     help_text_len = help_help_len;
817   }
818   else
819   {
820     if (strcasecmp (command, "update") == 0)
821     {
822       help_text = help_update;
823       help_text_len = help_update_len;
824     }
825     else if (strcasecmp (command, "flush") == 0)
826     {
827       help_text = help_flush;
828       help_text_len = help_flush_len;
829     }
830     else if (strcasecmp (command, "stats") == 0)
831     {
832       help_text = help_stats;
833       help_text_len = help_stats_len;
834     }
835     else
836     {
837       help_text = help_help;
838       help_text_len = help_help_len;
839     }
840   }
842   for (i = 0; i < help_text_len; i++)
843   {
844     status = swrite (fd, help_text[i], strlen (help_text[i]));
845     if (status < 0)
846     {
847       status = errno;
848       RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
849       return (status);
850     }
851   }
853   return (0);
854 } /* }}} int handle_request_help */
856 static int handle_request_stats (int fd, /* {{{ */
857     char *buffer __attribute__((unused)),
858     size_t buffer_size __attribute__((unused)))
860   int status;
861   char outbuf[CMD_MAX];
863   uint64_t copy_queue_length;
864   uint64_t copy_updates_received;
865   uint64_t copy_flush_received;
866   uint64_t copy_updates_written;
867   uint64_t copy_data_sets_written;
868   uint64_t copy_journal_bytes;
869   uint64_t copy_journal_rotate;
871   uint64_t tree_nodes_number;
872   uint64_t tree_depth;
874   pthread_mutex_lock (&stats_lock);
875   copy_queue_length       = stats_queue_length;
876   copy_updates_received   = stats_updates_received;
877   copy_flush_received     = stats_flush_received;
878   copy_updates_written    = stats_updates_written;
879   copy_data_sets_written  = stats_data_sets_written;
880   copy_journal_bytes      = stats_journal_bytes;
881   copy_journal_rotate     = stats_journal_rotate;
882   pthread_mutex_unlock (&stats_lock);
884   pthread_mutex_lock (&cache_lock);
885   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
886   tree_depth        = (uint64_t) g_tree_height (cache_tree);
887   pthread_mutex_unlock (&cache_lock);
889 #define RRDD_STATS_SEND \
890   outbuf[sizeof (outbuf) - 1] = 0; \
891   status = swrite (fd, outbuf, strlen (outbuf)); \
892   if (status < 0) \
893   { \
894     status = errno; \
895     RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
896     return (status); \
897   }
899   strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
900   RRDD_STATS_SEND;
902   snprintf (outbuf, sizeof (outbuf),
903       "QueueLength: %"PRIu64"\n", copy_queue_length);
904   RRDD_STATS_SEND;
906   snprintf (outbuf, sizeof (outbuf),
907       "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
908   RRDD_STATS_SEND;
910   snprintf (outbuf, sizeof (outbuf),
911       "FlushesReceived: %"PRIu64"\n", copy_flush_received);
912   RRDD_STATS_SEND;
914   snprintf (outbuf, sizeof (outbuf),
915       "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
916   RRDD_STATS_SEND;
918   snprintf (outbuf, sizeof (outbuf),
919       "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
920   RRDD_STATS_SEND;
922   snprintf (outbuf, sizeof (outbuf),
923       "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
924   RRDD_STATS_SEND;
926   snprintf (outbuf, sizeof (outbuf),
927       "TreeDepth: %"PRIu64"\n", tree_depth);
928   RRDD_STATS_SEND;
930   snprintf (outbuf, sizeof(outbuf),
931       "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
932   RRDD_STATS_SEND;
934   snprintf (outbuf, sizeof(outbuf),
935       "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
936   RRDD_STATS_SEND;
938   return (0);
939 #undef RRDD_STATS_SEND
940 } /* }}} int handle_request_stats */
942 static int handle_request_flush (int fd, /* {{{ */
943     char *buffer, size_t buffer_size)
945   char *file;
946   int status;
947   char result[CMD_MAX];
949   status = buffer_get_field (&buffer, &buffer_size, &file);
950   if (status != 0)
951   {
952     strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
953   }
954   else
955   {
956     pthread_mutex_lock(&stats_lock);
957     stats_flush_received++;
958     pthread_mutex_unlock(&stats_lock);
960     status = flush_file (file);
961     if (status == 0)
962       snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
963     else if (status == ENOENT)
964     {
965       /* no file in our tree; see whether it exists at all */
966       struct stat statbuf;
968       memset(&statbuf, 0, sizeof(statbuf));
969       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
970         snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
971       else
972         snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
973     }
974     else if (status < 0)
975       strncpy (result, "-1 Internal error.\n", sizeof (result));
976     else
977       snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
978   }
979   result[sizeof (result) - 1] = 0;
981   status = swrite (fd, result, strlen (result));
982   if (status < 0)
983   {
984     status = errno;
985     RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
986     return (status);
987   }
989   return (0);
990 } /* }}} int handle_request_flush */
992 static int handle_request_update (int fd, /* {{{ */
993     char *buffer, size_t buffer_size)
995   char *file;
996   int values_num = 0;
997   int status;
999   time_t now;
1001   cache_item_t *ci;
1002   char answer[CMD_MAX];
1004 #define RRDD_UPDATE_SEND \
1005   answer[sizeof (answer) - 1] = 0; \
1006   status = swrite (fd, answer, strlen (answer)); \
1007   if (status < 0) \
1008   { \
1009     status = errno; \
1010     RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1011     return (status); \
1012   }
1014   now = time (NULL);
1016   status = buffer_get_field (&buffer, &buffer_size, &file);
1017   if (status != 0)
1018   {
1019     strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1020         sizeof (answer));
1021     RRDD_UPDATE_SEND;
1022     return (0);
1023   }
1025   pthread_mutex_lock(&stats_lock);
1026   stats_updates_received++;
1027   pthread_mutex_unlock(&stats_lock);
1029   pthread_mutex_lock (&cache_lock);
1030   ci = g_tree_lookup (cache_tree, file);
1032   if (ci == NULL) /* {{{ */
1033   {
1034     struct stat statbuf;
1036     /* don't hold the lock while we setup; stat(2) might block */
1037     pthread_mutex_unlock(&cache_lock);
1039     memset (&statbuf, 0, sizeof (statbuf));
1040     status = stat (file, &statbuf);
1041     if (status != 0)
1042     {
1043       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1045       status = errno;
1046       if (status == ENOENT)
1047         snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1048       else
1049         snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1050             status);
1051       RRDD_UPDATE_SEND;
1052       return (0);
1053     }
1054     if (!S_ISREG (statbuf.st_mode))
1055     {
1056       snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1057       RRDD_UPDATE_SEND;
1058       return (0);
1059     }
1060     if (access(file, R_OK|W_OK) != 0)
1061     {
1062       snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1063                 file, rrd_strerror(errno));
1064       RRDD_UPDATE_SEND;
1065       return (0);
1066     }
1068     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1069     if (ci == NULL)
1070     {
1071       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1073       strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1074       RRDD_UPDATE_SEND;
1075       return (0);
1076     }
1077     memset (ci, 0, sizeof (cache_item_t));
1079     ci->file = strdup (file);
1080     if (ci->file == NULL)
1081     {
1082       free (ci);
1083       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1085       strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1086       RRDD_UPDATE_SEND;
1087       return (0);
1088     }
1090     _wipe_ci_values(ci, now);
1091     ci->flags = CI_FLAGS_IN_TREE;
1093     pthread_mutex_lock(&cache_lock);
1094     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1095   } /* }}} */
1096   assert (ci != NULL);
1098   while (buffer_size > 0)
1099   {
1100     char **temp;
1101     char *value;
1103     status = buffer_get_field (&buffer, &buffer_size, &value);
1104     if (status != 0)
1105     {
1106       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1107       break;
1108     }
1110     temp = (char **) realloc (ci->values,
1111         sizeof (char *) * (ci->values_num + 1));
1112     if (temp == NULL)
1113     {
1114       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1115       continue;
1116     }
1117     ci->values = temp;
1119     ci->values[ci->values_num] = strdup (value);
1120     if (ci->values[ci->values_num] == NULL)
1121     {
1122       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1123       continue;
1124     }
1125     ci->values_num++;
1127     values_num++;
1128   }
1130   if (((now - ci->last_flush_time) >= config_write_interval)
1131       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1132       && (ci->values_num > 0))
1133   {
1134     enqueue_cache_item (ci, TAIL);
1135     pthread_cond_signal (&cache_cond);
1136   }
1138   pthread_mutex_unlock (&cache_lock);
1140   if (values_num < 1)
1141   {
1142     strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1143   }
1144   else
1145   {
1146     snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1147         (values_num == 1) ? "" : "s");
1148   }
1149   RRDD_UPDATE_SEND;
1150   return (0);
1151 #undef RRDD_UPDATE_SEND
1152 } /* }}} int handle_request_update */
1154 /* we came across a "WROTE" entry during journal replay.
1155  * throw away any values that we have accumulated for this file
1156  */
1157 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1158                                  const char *buffer,
1159                                  size_t buffer_size __attribute__((unused)))
1161   int i;
1162   cache_item_t *ci;
1163   const char *file = buffer;
1165   pthread_mutex_lock(&cache_lock);
1167   ci = g_tree_lookup(cache_tree, file);
1168   if (ci == NULL)
1169   {
1170     pthread_mutex_unlock(&cache_lock);
1171     return (0);
1172   }
1174   if (ci->values)
1175   {
1176     for (i=0; i < ci->values_num; i++)
1177       free(ci->values[i]);
1179     free(ci->values);
1180   }
1182   _wipe_ci_values(ci, time(NULL));
1184   pthread_mutex_unlock(&cache_lock);
1185   return (0);
1186 } /* }}} int handle_request_wrote */
1188 /* if fd < 0, we are in journal replay mode */
1189 static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */
1191   char *buffer_ptr;
1192   char *command;
1193   int status;
1195   assert (buffer[buffer_size - 1] == '\0');
1197   buffer_ptr = buffer;
1198   command = NULL;
1199   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1200   if (status != 0)
1201   {
1202     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1203     return (-1);
1204   }
1206   if (strcasecmp (command, "update") == 0)
1207   {
1208     /* don't re-write updates in replay mode */
1209     if (fd >= 0)
1210       journal_write(command, buffer_ptr);
1212     return (handle_request_update (fd, buffer_ptr, buffer_size));
1213   }
1214   else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1215   {
1216     /* this is only valid in replay mode */
1217     return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1218   }
1219   else if (strcasecmp (command, "flush") == 0)
1220   {
1221     return (handle_request_flush (fd, buffer_ptr, buffer_size));
1222   }
1223   else if (strcasecmp (command, "stats") == 0)
1224   {
1225     return (handle_request_stats (fd, buffer_ptr, buffer_size));
1226   }
1227   else if (strcasecmp (command, "help") == 0)
1228   {
1229     return (handle_request_help (fd, buffer_ptr, buffer_size));
1230   }
1231   else
1232   {
1233     char result[CMD_MAX];
1235     snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1236     result[sizeof (result) - 1] = 0;
1238     status = swrite (fd, result, strlen (result));
1239     if (status < 0)
1240     {
1241       RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1242       return (-1);
1243     }
1244   }
1246   return (0);
1247 } /* }}} int handle_request */
1249 /* MUST NOT hold journal_lock before calling this */
1250 static void journal_rotate(void) /* {{{ */
1252   FILE *old_fh = NULL;
1254   if (journal_cur == NULL || journal_old == NULL)
1255     return;
1257   pthread_mutex_lock(&journal_lock);
1259   /* we rotate this way (rename before close) so that the we can release
1260    * the journal lock as fast as possible.  Journal writes to the new
1261    * journal can proceed immediately after the new file is opened.  The
1262    * fclose can then block without affecting new updates.
1263    */
1264   if (journal_fh != NULL)
1265   {
1266     old_fh = journal_fh;
1267     rename(journal_cur, journal_old);
1268     ++stats_journal_rotate;
1269   }
1271   journal_fh = fopen(journal_cur, "a");
1272   pthread_mutex_unlock(&journal_lock);
1274   if (old_fh != NULL)
1275     fclose(old_fh);
1277   if (journal_fh == NULL)
1278     RRDD_LOG(LOG_CRIT,
1279              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1280              journal_cur, rrd_strerror(errno));
1282 } /* }}} static void journal_rotate */
1284 static void journal_done(void) /* {{{ */
1286   if (journal_cur == NULL)
1287     return;
1289   pthread_mutex_lock(&journal_lock);
1290   if (journal_fh != NULL)
1291   {
1292     fclose(journal_fh);
1293     journal_fh = NULL;
1294   }
1296   RRDD_LOG(LOG_INFO, "removing journals");
1298   unlink(journal_old);
1299   unlink(journal_cur);
1300   pthread_mutex_unlock(&journal_lock);
1302 } /* }}} static void journal_done */
1304 static int journal_write(char *cmd, char *args) /* {{{ */
1306   int chars;
1308   if (journal_fh == NULL)
1309     return 0;
1311   pthread_mutex_lock(&journal_lock);
1312   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1313   pthread_mutex_unlock(&journal_lock);
1315   if (chars > 0)
1316   {
1317     pthread_mutex_lock(&stats_lock);
1318     stats_journal_bytes += chars;
1319     pthread_mutex_unlock(&stats_lock);
1320   }
1322   return chars;
1323 } /* }}} static int journal_write */
1325 static int journal_replay (const char *file) /* {{{ */
1327   FILE *fh;
1328   int entry_cnt = 0;
1329   int fail_cnt = 0;
1330   uint64_t line = 0;
1331   char entry[CMD_MAX];
1333   if (file == NULL) return 0;
1335   fh = fopen(file, "r");
1336   if (fh == NULL)
1337   {
1338     if (errno != ENOENT)
1339       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1340                file, rrd_strerror(errno));
1341     return 0;
1342   }
1343   else
1344     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1346   while(!feof(fh))
1347   {
1348     size_t entry_len;
1350     ++line;
1351     fgets(entry, sizeof(entry), fh);
1352     entry_len = strlen(entry);
1354     /* check \n termination in case journal writing crashed mid-line */
1355     if (entry_len == 0)
1356       continue;
1357     else if (entry[entry_len - 1] != '\n')
1358     {
1359       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1360       ++fail_cnt;
1361       continue;
1362     }
1364     entry[entry_len - 1] = '\0';
1366     if (handle_request(-1, entry, entry_len) == 0)
1367       ++entry_cnt;
1368     else
1369       ++fail_cnt;
1370   }
1372   fclose(fh);
1374   if (entry_cnt > 0)
1375   {
1376     RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1377              entry_cnt, fail_cnt);
1378     return 1;
1379   }
1380   else
1381     return 0;
1383 } /* }}} static int journal_replay */
1385 static void *connection_thread_main (void *args) /* {{{ */
1387   pthread_t self;
1388   int i;
1389   int fd;
1390   
1391   fd = *((int *) args);
1392   free (args);
1394   pthread_mutex_lock (&connection_threads_lock);
1395   {
1396     pthread_t *temp;
1398     temp = (pthread_t *) realloc (connection_threads,
1399         sizeof (pthread_t) * (connection_threads_num + 1));
1400     if (temp == NULL)
1401     {
1402       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1403     }
1404     else
1405     {
1406       connection_threads = temp;
1407       connection_threads[connection_threads_num] = pthread_self ();
1408       connection_threads_num++;
1409     }
1410   }
1411   pthread_mutex_unlock (&connection_threads_lock);
1413   while (do_shutdown == 0)
1414   {
1415     char buffer[CMD_MAX];
1417     struct pollfd pollfd;
1418     int status;
1420     pollfd.fd = fd;
1421     pollfd.events = POLLIN | POLLPRI;
1422     pollfd.revents = 0;
1424     status = poll (&pollfd, 1, /* timeout = */ 500);
1425     if (status == 0) /* timeout */
1426       continue;
1427     else if (status < 0) /* error */
1428     {
1429       status = errno;
1430       if (status == EINTR)
1431         continue;
1432       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1433       continue;
1434     }
1436     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1437     {
1438       close (fd);
1439       break;
1440     }
1441     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1442     {
1443       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1444           "poll(2) returned something unexpected: %#04hx",
1445           pollfd.revents);
1446       close (fd);
1447       break;
1448     }
1450     status = (int) sread (fd, buffer, sizeof (buffer));
1451     if (status <= 0)
1452     {
1453       close (fd);
1455       if (status < 0)
1456         RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1458       break;
1459     }
1461     status = handle_request (fd, buffer, /*buffer_size=*/ status);
1462     if (status != 0)
1463       break;
1464   }
1466   close(fd);
1468   self = pthread_self ();
1469   /* Remove this thread from the connection threads list */
1470   pthread_mutex_lock (&connection_threads_lock);
1471   /* Find out own index in the array */
1472   for (i = 0; i < connection_threads_num; i++)
1473     if (pthread_equal (connection_threads[i], self) != 0)
1474       break;
1475   assert (i < connection_threads_num);
1477   /* Move the trailing threads forward. */
1478   if (i < (connection_threads_num - 1))
1479   {
1480     memmove (connection_threads + i,
1481         connection_threads + i + 1,
1482         sizeof (pthread_t) * (connection_threads_num - i - 1));
1483   }
1485   connection_threads_num--;
1486   pthread_mutex_unlock (&connection_threads_lock);
1488   return (NULL);
1489 } /* }}} void *connection_thread_main */
1491 static int open_listen_socket_unix (const char *path) /* {{{ */
1493   int fd;
1494   struct sockaddr_un sa;
1495   listen_socket_t *temp;
1496   int status;
1498   temp = (listen_socket_t *) realloc (listen_fds,
1499       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1500   if (temp == NULL)
1501   {
1502     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1503     return (-1);
1504   }
1505   listen_fds = temp;
1506   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1508   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1509   if (fd < 0)
1510   {
1511     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1512     return (-1);
1513   }
1515   memset (&sa, 0, sizeof (sa));
1516   sa.sun_family = AF_UNIX;
1517   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1519   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1520   if (status != 0)
1521   {
1522     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1523     close (fd);
1524     unlink (path);
1525     return (-1);
1526   }
1528   status = listen (fd, /* backlog = */ 10);
1529   if (status != 0)
1530   {
1531     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1532     close (fd);
1533     unlink (path);
1534     return (-1);
1535   }
1536   
1537   listen_fds[listen_fds_num].fd = fd;
1538   snprintf (listen_fds[listen_fds_num].path,
1539       sizeof (listen_fds[listen_fds_num].path) - 1,
1540       "unix:%s", path);
1541   listen_fds_num++;
1543   return (0);
1544 } /* }}} int open_listen_socket_unix */
1546 static int open_listen_socket (const char *addr_orig) /* {{{ */
1548   struct addrinfo ai_hints;
1549   struct addrinfo *ai_res;
1550   struct addrinfo *ai_ptr;
1551   char addr_copy[NI_MAXHOST];
1552   char *addr;
1553   char *port;
1554   int status;
1556   assert (addr_orig != NULL);
1558   strncpy (addr_copy, addr_orig, sizeof (addr_copy));
1559   addr_copy[sizeof (addr_copy) - 1] = 0;
1560   addr = addr_copy;
1562   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
1563     return (open_listen_socket_unix (addr + strlen ("unix:")));
1564   else if (addr[0] == '/')
1565     return (open_listen_socket_unix (addr));
1567   memset (&ai_hints, 0, sizeof (ai_hints));
1568   ai_hints.ai_flags = 0;
1569 #ifdef AI_ADDRCONFIG
1570   ai_hints.ai_flags |= AI_ADDRCONFIG;
1571 #endif
1572   ai_hints.ai_family = AF_UNSPEC;
1573   ai_hints.ai_socktype = SOCK_STREAM;
1575   port = NULL;
1576  if (*addr == '[') /* IPv6+port format */
1577   {
1578     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1579     addr++;
1581     port = strchr (addr, ']');
1582     if (port == NULL)
1583     {
1584       RRDD_LOG (LOG_ERR, "open_listen_socket: Malformed address: %s",
1585           addr_orig);
1586       return (-1);
1587     }
1588     *port = 0;
1589     port++;
1591     if (*port == ':')
1592       port++;
1593     else if (*port == 0)
1594       port = NULL;
1595     else
1596     {
1597       RRDD_LOG (LOG_ERR, "open_listen_socket: Garbage after address: %s",
1598           port);
1599       return (-1);
1600     }
1601   } /* if (*addr = ']') */
1602   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1603   {
1604     port = rindex(addr, ':');
1605     if (port != NULL)
1606     {
1607       *port = 0;
1608       port++;
1609     }
1610   }
1611   ai_res = NULL;
1612   status = getaddrinfo (addr,
1613                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1614                         &ai_hints, &ai_res);
1615   if (status != 0)
1616   {
1617     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
1618         "%s", addr, gai_strerror (status));
1619     return (-1);
1620   }
1622   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1623   {
1624     int fd;
1625     listen_socket_t *temp;
1626     int one = 1;
1628     temp = (listen_socket_t *) realloc (listen_fds,
1629         sizeof (listen_fds[0]) * (listen_fds_num + 1));
1630     if (temp == NULL)
1631     {
1632       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
1633       continue;
1634     }
1635     listen_fds = temp;
1636     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1638     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1639     if (fd < 0)
1640     {
1641       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
1642       continue;
1643     }
1645     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
1647     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1648     if (status != 0)
1649     {
1650       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
1651       close (fd);
1652       continue;
1653     }
1655     status = listen (fd, /* backlog = */ 10);
1656     if (status != 0)
1657     {
1658       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
1659       close (fd);
1660       return (-1);
1661     }
1663     listen_fds[listen_fds_num].fd = fd;
1664     strncpy (listen_fds[listen_fds_num].path, addr,
1665         sizeof (listen_fds[listen_fds_num].path) - 1);
1666     listen_fds_num++;
1667   } /* for (ai_ptr) */
1669   return (0);
1670 } /* }}} int open_listen_socket */
1672 static int close_listen_sockets (void) /* {{{ */
1674   size_t i;
1676   for (i = 0; i < listen_fds_num; i++)
1677   {
1678     close (listen_fds[i].fd);
1679     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1680       unlink (listen_fds[i].path + strlen ("unix:"));
1681   }
1683   free (listen_fds);
1684   listen_fds = NULL;
1685   listen_fds_num = 0;
1687   return (0);
1688 } /* }}} int close_listen_sockets */
1690 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1692   struct pollfd *pollfds;
1693   int pollfds_num;
1694   int status;
1695   int i;
1697   for (i = 0; i < config_listen_address_list_len; i++)
1698     open_listen_socket (config_listen_address_list[i]);
1700   if (config_listen_address_list_len < 1)
1701     open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1703   if (listen_fds_num < 1)
1704   {
1705     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1706         "could be opened. Sorry.");
1707     return (NULL);
1708   }
1710   pollfds_num = listen_fds_num;
1711   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1712   if (pollfds == NULL)
1713   {
1714     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1715     return (NULL);
1716   }
1717   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1719   RRDD_LOG(LOG_INFO, "listening for connections");
1721   while (do_shutdown == 0)
1722   {
1723     assert (pollfds_num == ((int) listen_fds_num));
1724     for (i = 0; i < pollfds_num; i++)
1725     {
1726       pollfds[i].fd = listen_fds[i].fd;
1727       pollfds[i].events = POLLIN | POLLPRI;
1728       pollfds[i].revents = 0;
1729     }
1731     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1732     if (status == 0)
1733     {
1734       continue; /* timeout */
1735     }
1736     else if (status < 0)
1737     {
1738       status = errno;
1739       if (status != EINTR)
1740       {
1741         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1742       }
1743       continue;
1744     }
1746     for (i = 0; i < pollfds_num; i++)
1747     {
1748       int *client_sd;
1749       struct sockaddr_storage client_sa;
1750       socklen_t client_sa_size;
1751       pthread_t tid;
1752       pthread_attr_t attr;
1754       if (pollfds[i].revents == 0)
1755         continue;
1757       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1758       {
1759         RRDD_LOG (LOG_ERR, "listen_thread_main: "
1760             "poll(2) returned something unexpected for listen FD #%i.",
1761             pollfds[i].fd);
1762         continue;
1763       }
1765       client_sd = (int *) malloc (sizeof (int));
1766       if (client_sd == NULL)
1767       {
1768         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1769         continue;
1770       }
1772       client_sa_size = sizeof (client_sa);
1773       *client_sd = accept (pollfds[i].fd,
1774           (struct sockaddr *) &client_sa, &client_sa_size);
1775       if (*client_sd < 0)
1776       {
1777         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1778         continue;
1779       }
1781       pthread_attr_init (&attr);
1782       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1784       status = pthread_create (&tid, &attr, connection_thread_main,
1785           /* args = */ (void *) client_sd);
1786       if (status != 0)
1787       {
1788         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1789         close (*client_sd);
1790         free (client_sd);
1791         continue;
1792       }
1793     } /* for (pollfds_num) */
1794   } /* while (do_shutdown == 0) */
1796   RRDD_LOG(LOG_INFO, "starting shutdown");
1798   close_listen_sockets ();
1800   pthread_mutex_lock (&connection_threads_lock);
1801   while (connection_threads_num > 0)
1802   {
1803     pthread_t wait_for;
1805     wait_for = connection_threads[0];
1807     pthread_mutex_unlock (&connection_threads_lock);
1808     pthread_join (wait_for, /* retval = */ NULL);
1809     pthread_mutex_lock (&connection_threads_lock);
1810   }
1811   pthread_mutex_unlock (&connection_threads_lock);
1813   return (NULL);
1814 } /* }}} void *listen_thread_main */
1816 static int daemonize (void) /* {{{ */
1818   int status;
1820   /* These structures are static, because `sigaction' behaves weird if the are
1821    * overwritten.. */
1822   static struct sigaction sa_int;
1823   static struct sigaction sa_term;
1824   static struct sigaction sa_pipe;
1826   if (!stay_foreground)
1827   {
1828     pid_t child;
1829     char *base_dir;
1831     child = fork ();
1832     if (child < 0)
1833     {
1834       fprintf (stderr, "daemonize: fork(2) failed.\n");
1835       return (-1);
1836     }
1837     else if (child > 0)
1838     {
1839       return (1);
1840     }
1842     /* Change into the /tmp directory. */
1843     base_dir = (config_base_dir != NULL)
1844       ? config_base_dir
1845       : "/tmp";
1846     status = chdir (base_dir);
1847     if (status != 0)
1848     {
1849       fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1850       return (-1);
1851     }
1853     /* Become session leader */
1854     setsid ();
1856     /* Open the first three file descriptors to /dev/null */
1857     close (2);
1858     close (1);
1859     close (0);
1861     open ("/dev/null", O_RDWR);
1862     dup (0);
1863     dup (0);
1864   } /* if (!stay_foreground) */
1866   /* Install signal handlers */
1867   memset (&sa_int, 0, sizeof (sa_int));
1868   sa_int.sa_handler = sig_int_handler;
1869   sigaction (SIGINT, &sa_int, NULL);
1871   memset (&sa_term, 0, sizeof (sa_term));
1872   sa_term.sa_handler = sig_term_handler;
1873   sigaction (SIGTERM, &sa_term, NULL);
1875   memset (&sa_pipe, 0, sizeof (sa_pipe));
1876   sa_pipe.sa_handler = SIG_IGN;
1877   sigaction (SIGPIPE, &sa_pipe, NULL);
1879   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1880   RRDD_LOG(LOG_INFO, "starting up");
1882   cache_tree = g_tree_new ((GCompareFunc) strcmp);
1883   if (cache_tree == NULL)
1884   {
1885     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1886     return (-1);
1887   }
1889   status = write_pidfile ();
1890   return status;
1891 } /* }}} int daemonize */
1893 static int cleanup (void) /* {{{ */
1895   do_shutdown++;
1897   pthread_cond_signal (&cache_cond);
1898   pthread_join (queue_thread, /* return = */ NULL);
1900   remove_pidfile ();
1902   RRDD_LOG(LOG_INFO, "goodbye");
1903   closelog ();
1905   return (0);
1906 } /* }}} int cleanup */
1908 static int read_options (int argc, char **argv) /* {{{ */
1910   int option;
1911   int status = 0;
1913   while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?")) != -1)
1914   {
1915     switch (option)
1916     {
1917       case 'g':
1918         stay_foreground=1;
1919         break;
1921       case 'l':
1922       {
1923         char **temp;
1925         temp = (char **) realloc (config_listen_address_list,
1926             sizeof (char *) * (config_listen_address_list_len + 1));
1927         if (temp == NULL)
1928         {
1929           fprintf (stderr, "read_options: realloc failed.\n");
1930           return (2);
1931         }
1932         config_listen_address_list = temp;
1934         temp[config_listen_address_list_len] = strdup (optarg);
1935         if (temp[config_listen_address_list_len] == NULL)
1936         {
1937           fprintf (stderr, "read_options: strdup failed.\n");
1938           return (2);
1939         }
1940         config_listen_address_list_len++;
1941       }
1942       break;
1944       case 'f':
1945       {
1946         int temp;
1948         temp = atoi (optarg);
1949         if (temp > 0)
1950           config_flush_interval = temp;
1951         else
1952         {
1953           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
1954           status = 3;
1955         }
1956       }
1957       break;
1959       case 'w':
1960       {
1961         int temp;
1963         temp = atoi (optarg);
1964         if (temp > 0)
1965           config_write_interval = temp;
1966         else
1967         {
1968           fprintf (stderr, "Invalid write interval: %s\n", optarg);
1969           status = 2;
1970         }
1971       }
1972       break;
1974       case 'z':
1975       {
1976         int temp;
1978         temp = atoi(optarg);
1979         if (temp > 0)
1980           config_write_jitter = temp;
1981         else
1982         {
1983           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
1984           status = 2;
1985         }
1987         break;
1988       }
1990       case 'b':
1991       {
1992         size_t len;
1994         if (config_base_dir != NULL)
1995           free (config_base_dir);
1996         config_base_dir = strdup (optarg);
1997         if (config_base_dir == NULL)
1998         {
1999           fprintf (stderr, "read_options: strdup failed.\n");
2000           return (3);
2001         }
2003         len = strlen (config_base_dir);
2004         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2005         {
2006           config_base_dir[len - 1] = 0;
2007           len--;
2008         }
2010         if (len < 1)
2011         {
2012           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2013           return (4);
2014         }
2015       }
2016       break;
2018       case 'p':
2019       {
2020         if (config_pid_file != NULL)
2021           free (config_pid_file);
2022         config_pid_file = strdup (optarg);
2023         if (config_pid_file == NULL)
2024         {
2025           fprintf (stderr, "read_options: strdup failed.\n");
2026           return (3);
2027         }
2028       }
2029       break;
2031       case 'j':
2032       {
2033         struct stat statbuf;
2034         const char *dir = optarg;
2036         status = stat(dir, &statbuf);
2037         if (status != 0)
2038         {
2039           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2040           return 6;
2041         }
2043         if (!S_ISDIR(statbuf.st_mode)
2044             || access(dir, R_OK|W_OK|X_OK) != 0)
2045         {
2046           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2047                   errno ? rrd_strerror(errno) : "");
2048           return 6;
2049         }
2051         journal_cur = malloc(PATH_MAX + 1);
2052         journal_old = malloc(PATH_MAX + 1);
2053         if (journal_cur == NULL || journal_old == NULL)
2054         {
2055           fprintf(stderr, "malloc failure for journal files\n");
2056           return 6;
2057         }
2058         else 
2059         {
2060           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2061           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2062         }
2063       }
2064       break;
2066       case 'h':
2067       case '?':
2068         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2069             "\n"
2070             "Usage: rrdcached [options]\n"
2071             "\n"
2072             "Valid options are:\n"
2073             "  -l <address>  Socket address to listen to.\n"
2074             "  -w <seconds>  Interval in which to write data.\n"
2075             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2076             "  -f <seconds>  Interval in which to flush dead data.\n"
2077             "  -p <file>     Location of the PID-file.\n"
2078             "  -b <dir>      Base directory to change to.\n"
2079             "  -g            Do not fork and run in the foreground.\n"
2080             "  -j <dir>      Directory in which to create the journal files.\n"
2081             "\n"
2082             "For more information and a detailed description of all options "
2083             "please refer\n"
2084             "to the rrdcached(1) manual page.\n",
2085             VERSION);
2086         status = -1;
2087         break;
2088     } /* switch (option) */
2089   } /* while (getopt) */
2091   /* advise the user when values are not sane */
2092   if (config_flush_interval < 2 * config_write_interval)
2093     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2094             " 2x write interval (-w) !\n");
2095   if (config_write_jitter > config_write_interval)
2096     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2097             " write interval (-w) !\n");
2099   return (status);
2100 } /* }}} int read_options */
2102 int main (int argc, char **argv)
2104   int status;
2106   status = read_options (argc, argv);
2107   if (status != 0)
2108   {
2109     if (status < 0)
2110       status = 0;
2111     return (status);
2112   }
2114   status = daemonize ();
2115   if (status == 1)
2116   {
2117     struct sigaction sigchld;
2119     memset (&sigchld, 0, sizeof (sigchld));
2120     sigchld.sa_handler = SIG_IGN;
2121     sigaction (SIGCHLD, &sigchld, NULL);
2123     return (0);
2124   }
2125   else if (status != 0)
2126   {
2127     fprintf (stderr, "daemonize failed, exiting.\n");
2128     return (1);
2129   }
2131   if (journal_cur != NULL)
2132   {
2133     int had_journal = 0;
2135     pthread_mutex_lock(&journal_lock);
2137     RRDD_LOG(LOG_INFO, "checking for journal files");
2139     had_journal += journal_replay(journal_old);
2140     had_journal += journal_replay(journal_cur);
2142     if (had_journal)
2143       flush_old_values(-1);
2145     pthread_mutex_unlock(&journal_lock);
2146     journal_rotate();
2148     RRDD_LOG(LOG_INFO, "journal processing complete");
2149   }
2151   /* start the queue thread */
2152   memset (&queue_thread, 0, sizeof (queue_thread));
2153   status = pthread_create (&queue_thread,
2154                            NULL, /* attr */
2155                            queue_thread_main,
2156                            NULL); /* args */
2157   if (status != 0)
2158   {
2159     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2160     cleanup();
2161     return (1);
2162   }
2164   listen_thread_main (NULL);
2165   cleanup ();
2167   return (0);
2168 } /* int main */
2170 /*
2171  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2172  */