Code

This patch introduces "fast shutdown" mode and two new signals.
[rrdtool-all.git] / program / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
101 /*
102  * Types
103  */
104 struct listen_socket_s
106   int fd;
107   char path[PATH_MAX + 1];
108 };
109 typedef struct listen_socket_s listen_socket_t;
111 struct cache_item_s;
112 typedef struct cache_item_s cache_item_t;
113 struct cache_item_s
115   char *file;
116   char **values;
117   int values_num;
118   time_t last_flush_time;
119 #define CI_FLAGS_IN_TREE  (1<<0)
120 #define CI_FLAGS_IN_QUEUE (1<<1)
121   int flags;
122   pthread_cond_t  flushed;
123   cache_item_t *next;
124 };
126 struct callback_flush_data_s
128   time_t now;
129   time_t abs_timeout;
130   char **keys;
131   size_t keys_num;
132 };
133 typedef struct callback_flush_data_s callback_flush_data_t;
135 enum queue_side_e
137   HEAD,
138   TAIL
139 };
140 typedef enum queue_side_e queue_side_t;
142 /* max length of socket command or response */
143 #define CMD_MAX 4096
145 /*
146  * Variables
147  */
148 static int stay_foreground = 0;
150 static listen_socket_t *listen_fds = NULL;
151 static size_t listen_fds_num = 0;
153 static int do_shutdown = 0;
155 static pthread_t queue_thread;
157 static pthread_t *connection_threads = NULL;
158 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
159 static int connection_threads_num = 0;
161 /* Cache stuff */
162 static GTree          *cache_tree = NULL;
163 static cache_item_t   *cache_queue_head = NULL;
164 static cache_item_t   *cache_queue_tail = NULL;
165 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
166 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
168 static int config_write_interval = 300;
169 static int config_write_jitter   = 0;
170 static int config_flush_interval = 3600;
171 static int config_flush_at_shutdown = 0;
172 static char *config_pid_file = NULL;
173 static char *config_base_dir = NULL;
175 static char **config_listen_address_list = NULL;
176 static int config_listen_address_list_len = 0;
178 static uint64_t stats_queue_length = 0;
179 static uint64_t stats_updates_received = 0;
180 static uint64_t stats_flush_received = 0;
181 static uint64_t stats_updates_written = 0;
182 static uint64_t stats_data_sets_written = 0;
183 static uint64_t stats_journal_bytes = 0;
184 static uint64_t stats_journal_rotate = 0;
185 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
187 /* Journaled updates */
188 static char *journal_cur = NULL;
189 static char *journal_old = NULL;
190 static FILE *journal_fh = NULL;
191 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
192 static int journal_write(char *cmd, char *args);
193 static void journal_done(void);
194 static void journal_rotate(void);
196 /* 
197  * Functions
198  */
199 static void sig_common (const char *sig) /* {{{ */
201   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
202   do_shutdown++;
203   pthread_cond_broadcast(&cache_cond);
204 } /* }}} void sig_common */
206 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
208   sig_common("INT");
209 } /* }}} void sig_int_handler */
211 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
213   sig_common("TERM");
214 } /* }}} void sig_term_handler */
216 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
218   config_flush_at_shutdown = 1;
219   sig_common("USR1");
220 } /* }}} void sig_usr1_handler */
222 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
224   config_flush_at_shutdown = 0;
225   sig_common("USR2");
226 } /* }}} void sig_usr2_handler */
228 static void install_signal_handlers(void) /* {{{ */
230   /* These structures are static, because `sigaction' behaves weird if the are
231    * overwritten.. */
232   static struct sigaction sa_int;
233   static struct sigaction sa_term;
234   static struct sigaction sa_pipe;
235   static struct sigaction sa_usr1;
236   static struct sigaction sa_usr2;
238   /* Install signal handlers */
239   memset (&sa_int, 0, sizeof (sa_int));
240   sa_int.sa_handler = sig_int_handler;
241   sigaction (SIGINT, &sa_int, NULL);
243   memset (&sa_term, 0, sizeof (sa_term));
244   sa_term.sa_handler = sig_term_handler;
245   sigaction (SIGTERM, &sa_term, NULL);
247   memset (&sa_pipe, 0, sizeof (sa_pipe));
248   sa_pipe.sa_handler = SIG_IGN;
249   sigaction (SIGPIPE, &sa_pipe, NULL);
251   memset (&sa_pipe, 0, sizeof (sa_usr1));
252   sa_usr1.sa_handler = sig_usr1_handler;
253   sigaction (SIGUSR1, &sa_usr1, NULL);
255   memset (&sa_usr2, 0, sizeof (sa_usr2));
256   sa_usr2.sa_handler = sig_usr2_handler;
257   sigaction (SIGUSR2, &sa_usr2, NULL);
259 } /* }}} void install_signal_handlers */
261 static int open_pidfile(void) /* {{{ */
263   int fd;
264   char *file;
266   file = (config_pid_file != NULL)
267     ? config_pid_file
268     : LOCALSTATEDIR "/run/rrdcached.pid";
270   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
271   if (fd < 0)
272     fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
273             file, rrd_strerror(errno));
275   return(fd);
278 static int write_pidfile (int fd) /* {{{ */
280   pid_t pid;
281   FILE *fh;
283   pid = getpid ();
285   fh = fdopen (fd, "w");
286   if (fh == NULL)
287   {
288     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
289     close(fd);
290     return (-1);
291   }
293   fprintf (fh, "%i\n", (int) pid);
294   fclose (fh);
296   return (0);
297 } /* }}} int write_pidfile */
299 static int remove_pidfile (void) /* {{{ */
301   char *file;
302   int status;
304   file = (config_pid_file != NULL)
305     ? config_pid_file
306     : LOCALSTATEDIR "/run/rrdcached.pid";
308   status = unlink (file);
309   if (status == 0)
310     return (0);
311   return (errno);
312 } /* }}} int remove_pidfile */
314 static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
316   char    *buffer;
317   size_t   buffer_used;
318   size_t   buffer_free;
319   ssize_t  status;
321   buffer       = (char *) buffer_void;
322   buffer_used  = 0;
323   buffer_free  = buffer_size;
325   while (buffer_free > 0)
326   {
327     status = read (fd, buffer + buffer_used, buffer_free);
328     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
329       continue;
331     if (status < 0)
332       return (-1);
334     if (status == 0)
335       return (0);
337     assert ((0 > status) || (buffer_free >= (size_t) status));
339     buffer_free = buffer_free - status;
340     buffer_used = buffer_used + status;
342     if (buffer[buffer_used - 1] == '\n')
343       break;
344   }
346   assert (buffer_used > 0);
348   if (buffer[buffer_used - 1] != '\n')
349   {
350     errno = ENOBUFS;
351     return (-1);
352   }
354   buffer[buffer_used - 1] = 0;
356   /* Fix network line endings. */
357   if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
358   {
359     buffer_used--;
360     buffer[buffer_used - 1] = 0;
361   }
363   return (buffer_used);
364 } /* }}} ssize_t sread */
366 static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
368   const char *ptr;
369   size_t      nleft;
370   ssize_t     status;
372   /* special case for journal replay */
373   if (fd < 0) return 0;
375   ptr   = (const char *) buf;
376   nleft = count;
378   while (nleft > 0)
379   {
380     status = write (fd, (const void *) ptr, nleft);
382     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
383       continue;
385     if (status < 0)
386       return (status);
388     nleft -= status;
389     ptr   += status;
390   }
392   return (0);
393 } /* }}} ssize_t swrite */
395 static void _wipe_ci_values(cache_item_t *ci, time_t when)
397   ci->values = NULL;
398   ci->values_num = 0;
400   ci->last_flush_time = when;
401   if (config_write_jitter > 0)
402     ci->last_flush_time += (random() % config_write_jitter);
404   ci->flags &= ~(CI_FLAGS_IN_QUEUE);
407 /*
408  * enqueue_cache_item:
409  * `cache_lock' must be acquired before calling this function!
410  */
411 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
412     queue_side_t side)
414   int did_insert = 0;
416   if (ci == NULL)
417     return (-1);
419   if (ci->values_num == 0)
420     return (0);
422   if (side == HEAD)
423   {
424     if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
425     {
426       assert (ci->next == NULL);
427       ci->next = cache_queue_head;
428       cache_queue_head = ci;
430       if (cache_queue_tail == NULL)
431         cache_queue_tail = cache_queue_head;
433       did_insert = 1;
434     }
435     else if (cache_queue_head == ci)
436     {
437       /* do nothing */
438     }
439     else /* enqueued, but not first entry */
440     {
441       cache_item_t *prev;
443       /* find previous entry */
444       for (prev = cache_queue_head; prev != NULL; prev = prev->next)
445         if (prev->next == ci)
446           break;
447       assert (prev != NULL);
449       /* move to the front */
450       prev->next = ci->next;
451       ci->next = cache_queue_head;
452       cache_queue_head = ci;
454       /* check if we need to adapt the tail */
455       if (cache_queue_tail == ci)
456         cache_queue_tail = prev;
457     }
458   }
459   else /* (side == TAIL) */
460   {
461     /* We don't move values back in the list.. */
462     if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
463       return (0);
465     assert (ci->next == NULL);
467     if (cache_queue_tail == NULL)
468       cache_queue_head = ci;
469     else
470       cache_queue_tail->next = ci;
471     cache_queue_tail = ci;
473     did_insert = 1;
474   }
476   ci->flags |= CI_FLAGS_IN_QUEUE;
478   if (did_insert)
479   {
480     pthread_cond_broadcast(&cache_cond);
481     pthread_mutex_lock (&stats_lock);
482     stats_queue_length++;
483     pthread_mutex_unlock (&stats_lock);
484   }
486   return (0);
487 } /* }}} int enqueue_cache_item */
489 /*
490  * tree_callback_flush:
491  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
492  * while this is in progress.
493  */
494 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
495     gpointer data)
497   cache_item_t *ci;
498   callback_flush_data_t *cfd;
500   ci = (cache_item_t *) value;
501   cfd = (callback_flush_data_t *) data;
503   if ((ci->last_flush_time <= cfd->abs_timeout)
504       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
505       && (ci->values_num > 0))
506   {
507     enqueue_cache_item (ci, TAIL);
508   }
509   else if ((do_shutdown != 0)
510       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
511       && (ci->values_num > 0))
512   {
513     enqueue_cache_item (ci, TAIL);
514   }
515   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
516       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
517       && (ci->values_num <= 0))
518   {
519     char **temp;
521     temp = (char **) realloc (cfd->keys,
522         sizeof (char *) * (cfd->keys_num + 1));
523     if (temp == NULL)
524     {
525       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
526       return (FALSE);
527     }
528     cfd->keys = temp;
529     /* Make really sure this points to the _same_ place */
530     assert ((char *) key == ci->file);
531     cfd->keys[cfd->keys_num] = (char *) key;
532     cfd->keys_num++;
533   }
535   return (FALSE);
536 } /* }}} gboolean tree_callback_flush */
538 static int flush_old_values (int max_age)
540   callback_flush_data_t cfd;
541   size_t k;
543   memset (&cfd, 0, sizeof (cfd));
544   /* Pass the current time as user data so that we don't need to call
545    * `time' for each node. */
546   cfd.now = time (NULL);
547   cfd.keys = NULL;
548   cfd.keys_num = 0;
550   if (max_age > 0)
551     cfd.abs_timeout = cfd.now - max_age;
552   else
553     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
555   /* `tree_callback_flush' will return the keys of all values that haven't
556    * been touched in the last `config_flush_interval' seconds in `cfd'.
557    * The char*'s in this array point to the same memory as ci->file, so we
558    * don't need to free them separately. */
559   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
561   for (k = 0; k < cfd.keys_num; k++)
562   {
563     cache_item_t *ci;
565     /* This must not fail. */
566     ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
567     assert (ci != NULL);
569     /* If we end up here with values available, something's seriously
570      * messed up. */
571     assert (ci->values_num == 0);
573     /* Remove the node from the tree */
574     g_tree_remove (cache_tree, cfd.keys[k]);
575     cfd.keys[k] = NULL;
577     /* Now free and clean up `ci'. */
578     free (ci->file);
579     ci->file = NULL;
580     free (ci);
581     ci = NULL;
582   } /* for (k = 0; k < cfd.keys_num; k++) */
584   if (cfd.keys != NULL)
585   {
586     free (cfd.keys);
587     cfd.keys = NULL;
588   }
590   return (0);
591 } /* int flush_old_values */
593 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
595   struct timeval now;
596   struct timespec next_flush;
597   int final_flush = 0; /* make sure we only flush once on shutdown */
599   gettimeofday (&now, NULL);
600   next_flush.tv_sec = now.tv_sec + config_flush_interval;
601   next_flush.tv_nsec = 1000 * now.tv_usec;
603   pthread_mutex_lock (&cache_lock);
604   while ((do_shutdown == 0) || (cache_queue_head != NULL))
605   {
606     cache_item_t *ci;
607     char *file;
608     char **values;
609     int values_num;
610     int status;
611     int i;
613     /* First, check if it's time to do the cache flush. */
614     gettimeofday (&now, NULL);
615     if ((now.tv_sec > next_flush.tv_sec)
616         || ((now.tv_sec == next_flush.tv_sec)
617           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
618     {
619       /* Flush all values that haven't been written in the last
620        * `config_write_interval' seconds. */
621       flush_old_values (config_write_interval);
623       /* Determine the time of the next cache flush. */
624       while (next_flush.tv_sec <= now.tv_sec)
625         next_flush.tv_sec += config_flush_interval;
627       /* unlock the cache while we rotate so we don't block incoming
628        * updates if the fsync() blocks on disk I/O */
629       pthread_mutex_unlock(&cache_lock);
630       journal_rotate();
631       pthread_mutex_lock(&cache_lock);
632     }
634     /* Now, check if there's something to store away. If not, wait until
635      * something comes in or it's time to do the cache flush.  if we are
636      * shutting down, do not wait around.  */
637     if (cache_queue_head == NULL && !do_shutdown)
638     {
639       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
640       if ((status != 0) && (status != ETIMEDOUT))
641       {
642         RRDD_LOG (LOG_ERR, "queue_thread_main: "
643             "pthread_cond_timedwait returned %i.", status);
644       }
645     }
647     /* We're about to shut down */
648     if (do_shutdown != 0 && !final_flush++)
649     {
650       if (config_flush_at_shutdown)
651         flush_old_values (-1); /* flush everything */
652       else
653         break;
654     }
656     /* Check if a value has arrived. This may be NULL if we timed out or there
657      * was an interrupt such as a signal. */
658     if (cache_queue_head == NULL)
659       continue;
661     ci = cache_queue_head;
663     /* copy the relevant parts */
664     file = strdup (ci->file);
665     if (file == NULL)
666     {
667       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
668       continue;
669     }
671     assert(ci->values != NULL);
672     assert(ci->values_num > 0);
674     values = ci->values;
675     values_num = ci->values_num;
677     _wipe_ci_values(ci, time(NULL));
679     cache_queue_head = ci->next;
680     if (cache_queue_head == NULL)
681       cache_queue_tail = NULL;
682     ci->next = NULL;
684     pthread_mutex_lock (&stats_lock);
685     assert (stats_queue_length > 0);
686     stats_queue_length--;
687     pthread_mutex_unlock (&stats_lock);
689     pthread_mutex_unlock (&cache_lock);
691     rrd_clear_error ();
692     status = rrd_update_r (file, NULL, values_num, (void *) values);
693     if (status != 0)
694     {
695       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
696           "rrd_update_r (%s) failed with status %i. (%s)",
697           file, status, rrd_get_error());
698     }
700     journal_write("wrote", file);
701     pthread_cond_broadcast(&ci->flushed);
703     for (i = 0; i < values_num; i++)
704       free (values[i]);
706     free(values);
707     free(file);
709     if (status == 0)
710     {
711       pthread_mutex_lock (&stats_lock);
712       stats_updates_written++;
713       stats_data_sets_written += values_num;
714       pthread_mutex_unlock (&stats_lock);
715     }
717     pthread_mutex_lock (&cache_lock);
719     /* We're about to shut down */
720     if (do_shutdown != 0 && !final_flush++)
721     {
722       if (config_flush_at_shutdown)
723           flush_old_values (-1); /* flush everything */
724       else
725         break;
726     }
727   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
728   pthread_mutex_unlock (&cache_lock);
730   if (config_flush_at_shutdown)
731   {
732     assert(cache_queue_head == NULL);
733     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
734   }
736   journal_done();
738   return (NULL);
739 } /* }}} void *queue_thread_main */
741 static int buffer_get_field (char **buffer_ret, /* {{{ */
742     size_t *buffer_size_ret, char **field_ret)
744   char *buffer;
745   size_t buffer_pos;
746   size_t buffer_size;
747   char *field;
748   size_t field_size;
749   int status;
751   buffer = *buffer_ret;
752   buffer_pos = 0;
753   buffer_size = *buffer_size_ret;
754   field = *buffer_ret;
755   field_size = 0;
757   if (buffer_size <= 0)
758     return (-1);
760   /* This is ensured by `handle_request'. */
761   assert (buffer[buffer_size - 1] == '\0');
763   status = -1;
764   while (buffer_pos < buffer_size)
765   {
766     /* Check for end-of-field or end-of-buffer */
767     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
768     {
769       field[field_size] = 0;
770       field_size++;
771       buffer_pos++;
772       status = 0;
773       break;
774     }
775     /* Handle escaped characters. */
776     else if (buffer[buffer_pos] == '\\')
777     {
778       if (buffer_pos >= (buffer_size - 1))
779         break;
780       buffer_pos++;
781       field[field_size] = buffer[buffer_pos];
782       field_size++;
783       buffer_pos++;
784     }
785     /* Normal operation */ 
786     else
787     {
788       field[field_size] = buffer[buffer_pos];
789       field_size++;
790       buffer_pos++;
791     }
792   } /* while (buffer_pos < buffer_size) */
794   if (status != 0)
795     return (status);
797   *buffer_ret = buffer + buffer_pos;
798   *buffer_size_ret = buffer_size - buffer_pos;
799   *field_ret = field;
801   return (0);
802 } /* }}} int buffer_get_field */
804 static int flush_file (const char *filename) /* {{{ */
806   cache_item_t *ci;
808   pthread_mutex_lock (&cache_lock);
810   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
811   if (ci == NULL)
812   {
813     pthread_mutex_unlock (&cache_lock);
814     return (ENOENT);
815   }
817   /* Enqueue at head */
818   enqueue_cache_item (ci, HEAD);
820   pthread_cond_wait(&ci->flushed, &cache_lock);
821   pthread_mutex_unlock(&cache_lock);
823   return (0);
824 } /* }}} int flush_file */
826 static int handle_request_help (int fd, /* {{{ */
827     char *buffer, size_t buffer_size)
829   int status;
830   char **help_text;
831   size_t help_text_len;
832   char *command;
833   size_t i;
835   char *help_help[] =
836   {
837     "5 Command overview\n",
838     "FLUSH <filename>\n",
839     "FLUSHALL\n",
840     "HELP [<command>]\n",
841     "UPDATE <filename> <values> [<values> ...]\n",
842     "STATS\n"
843   };
844   size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
846   char *help_flush[] =
847   {
848     "4 Help for FLUSH\n",
849     "Usage: FLUSH <filename>\n",
850     "\n",
851     "Adds the given filename to the head of the update queue and returns\n",
852     "after is has been dequeued.\n"
853   };
854   size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
856   char *help_flushall[] =
857   {
858     "3 Help for FLUSHALL\n",
859     "Usage: FLUSHALL\n",
860     "\n",
861     "Triggers writing of all pending updates.  Returns immediately.\n"
862   };
863   size_t help_flushall_len = sizeof(help_flushall) / sizeof(help_flushall[0]);
865   char *help_update[] =
866   {
867     "9 Help for UPDATE\n",
868     "Usage: UPDATE <filename> <values> [<values> ...]\n"
869     "\n",
870     "Adds the given file to the internal cache if it is not yet known and\n",
871     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
872     "for details.\n",
873     "\n",
874     "Each <values> has the following form:\n",
875     "  <values> = <time>:<value>[:<value>[...]]\n",
876     "See the rrdupdate(1) manpage for details.\n"
877   };
878   size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
880   char *help_stats[] =
881   {
882     "4 Help for STATS\n",
883     "Usage: STATS\n",
884     "\n",
885     "Returns some performance counters, see the rrdcached(1) manpage for\n",
886     "a description of the values.\n"
887   };
888   size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
890   status = buffer_get_field (&buffer, &buffer_size, &command);
891   if (status != 0)
892   {
893     help_text = help_help;
894     help_text_len = help_help_len;
895   }
896   else
897   {
898     if (strcasecmp (command, "update") == 0)
899     {
900       help_text = help_update;
901       help_text_len = help_update_len;
902     }
903     else if (strcasecmp (command, "flush") == 0)
904     {
905       help_text = help_flush;
906       help_text_len = help_flush_len;
907     }
908     else if (strcasecmp (command, "flushall") == 0)
909     {
910       help_text = help_flushall;
911       help_text_len = help_flushall_len;
912     }
913     else if (strcasecmp (command, "stats") == 0)
914     {
915       help_text = help_stats;
916       help_text_len = help_stats_len;
917     }
918     else
919     {
920       help_text = help_help;
921       help_text_len = help_help_len;
922     }
923   }
925   for (i = 0; i < help_text_len; i++)
926   {
927     status = swrite (fd, help_text[i], strlen (help_text[i]));
928     if (status < 0)
929     {
930       status = errno;
931       RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
932       return (status);
933     }
934   }
936   return (0);
937 } /* }}} int handle_request_help */
939 static int handle_request_stats (int fd, /* {{{ */
940     char *buffer __attribute__((unused)),
941     size_t buffer_size __attribute__((unused)))
943   int status;
944   char outbuf[CMD_MAX];
946   uint64_t copy_queue_length;
947   uint64_t copy_updates_received;
948   uint64_t copy_flush_received;
949   uint64_t copy_updates_written;
950   uint64_t copy_data_sets_written;
951   uint64_t copy_journal_bytes;
952   uint64_t copy_journal_rotate;
954   uint64_t tree_nodes_number;
955   uint64_t tree_depth;
957   pthread_mutex_lock (&stats_lock);
958   copy_queue_length       = stats_queue_length;
959   copy_updates_received   = stats_updates_received;
960   copy_flush_received     = stats_flush_received;
961   copy_updates_written    = stats_updates_written;
962   copy_data_sets_written  = stats_data_sets_written;
963   copy_journal_bytes      = stats_journal_bytes;
964   copy_journal_rotate     = stats_journal_rotate;
965   pthread_mutex_unlock (&stats_lock);
967   pthread_mutex_lock (&cache_lock);
968   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
969   tree_depth        = (uint64_t) g_tree_height (cache_tree);
970   pthread_mutex_unlock (&cache_lock);
972 #define RRDD_STATS_SEND \
973   outbuf[sizeof (outbuf) - 1] = 0; \
974   status = swrite (fd, outbuf, strlen (outbuf)); \
975   if (status < 0) \
976   { \
977     status = errno; \
978     RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
979     return (status); \
980   }
982   strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
983   RRDD_STATS_SEND;
985   snprintf (outbuf, sizeof (outbuf),
986       "QueueLength: %"PRIu64"\n", copy_queue_length);
987   RRDD_STATS_SEND;
989   snprintf (outbuf, sizeof (outbuf),
990       "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
991   RRDD_STATS_SEND;
993   snprintf (outbuf, sizeof (outbuf),
994       "FlushesReceived: %"PRIu64"\n", copy_flush_received);
995   RRDD_STATS_SEND;
997   snprintf (outbuf, sizeof (outbuf),
998       "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
999   RRDD_STATS_SEND;
1001   snprintf (outbuf, sizeof (outbuf),
1002       "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1003   RRDD_STATS_SEND;
1005   snprintf (outbuf, sizeof (outbuf),
1006       "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1007   RRDD_STATS_SEND;
1009   snprintf (outbuf, sizeof (outbuf),
1010       "TreeDepth: %"PRIu64"\n", tree_depth);
1011   RRDD_STATS_SEND;
1013   snprintf (outbuf, sizeof(outbuf),
1014       "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1015   RRDD_STATS_SEND;
1017   snprintf (outbuf, sizeof(outbuf),
1018       "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1019   RRDD_STATS_SEND;
1021   return (0);
1022 #undef RRDD_STATS_SEND
1023 } /* }}} int handle_request_stats */
1025 static int handle_request_flush (int fd, /* {{{ */
1026     char *buffer, size_t buffer_size)
1028   char *file;
1029   int status;
1030   char result[CMD_MAX];
1032   status = buffer_get_field (&buffer, &buffer_size, &file);
1033   if (status != 0)
1034   {
1035     strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
1036   }
1037   else
1038   {
1039     pthread_mutex_lock(&stats_lock);
1040     stats_flush_received++;
1041     pthread_mutex_unlock(&stats_lock);
1043     status = flush_file (file);
1044     if (status == 0)
1045       snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
1046     else if (status == ENOENT)
1047     {
1048       /* no file in our tree; see whether it exists at all */
1049       struct stat statbuf;
1051       memset(&statbuf, 0, sizeof(statbuf));
1052       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1053         snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
1054       else
1055         snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
1056     }
1057     else if (status < 0)
1058       strncpy (result, "-1 Internal error.\n", sizeof (result));
1059     else
1060       snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
1061   }
1062   result[sizeof (result) - 1] = 0;
1064   status = swrite (fd, result, strlen (result));
1065   if (status < 0)
1066   {
1067     status = errno;
1068     RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
1069     return (status);
1070   }
1072   return (0);
1073 } /* }}} int handle_request_flush */
1075 static int handle_request_flushall(int fd) /* {{{ */
1077   int status;
1078   char answer[] ="0 Started flush.\n";
1080   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1082   pthread_mutex_lock(&cache_lock);
1083   flush_old_values(-1);
1084   pthread_mutex_unlock(&cache_lock);
1086   status = swrite(fd, answer, strlen(answer));
1087   if (status < 0)
1088   {
1089     status = errno;
1090     RRDD_LOG(LOG_INFO, "handle_request_flushall: swrite returned an error.");
1091   }
1093   return (status);
1096 static int handle_request_update (int fd, /* {{{ */
1097     char *buffer, size_t buffer_size)
1099   char *file;
1100   int values_num = 0;
1101   int status;
1103   time_t now;
1105   cache_item_t *ci;
1106   char answer[CMD_MAX];
1108 #define RRDD_UPDATE_SEND \
1109   answer[sizeof (answer) - 1] = 0; \
1110   status = swrite (fd, answer, strlen (answer)); \
1111   if (status < 0) \
1112   { \
1113     status = errno; \
1114     RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1115     return (status); \
1116   }
1118   now = time (NULL);
1120   status = buffer_get_field (&buffer, &buffer_size, &file);
1121   if (status != 0)
1122   {
1123     strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1124         sizeof (answer));
1125     RRDD_UPDATE_SEND;
1126     return (0);
1127   }
1129   pthread_mutex_lock(&stats_lock);
1130   stats_updates_received++;
1131   pthread_mutex_unlock(&stats_lock);
1133   pthread_mutex_lock (&cache_lock);
1134   ci = g_tree_lookup (cache_tree, file);
1136   if (ci == NULL) /* {{{ */
1137   {
1138     struct stat statbuf;
1140     /* don't hold the lock while we setup; stat(2) might block */
1141     pthread_mutex_unlock(&cache_lock);
1143     memset (&statbuf, 0, sizeof (statbuf));
1144     status = stat (file, &statbuf);
1145     if (status != 0)
1146     {
1147       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1149       status = errno;
1150       if (status == ENOENT)
1151         snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1152       else
1153         snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1154             status);
1155       RRDD_UPDATE_SEND;
1156       return (0);
1157     }
1158     if (!S_ISREG (statbuf.st_mode))
1159     {
1160       snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1161       RRDD_UPDATE_SEND;
1162       return (0);
1163     }
1164     if (access(file, R_OK|W_OK) != 0)
1165     {
1166       snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1167                 file, rrd_strerror(errno));
1168       RRDD_UPDATE_SEND;
1169       return (0);
1170     }
1172     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1173     if (ci == NULL)
1174     {
1175       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1177       strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1178       RRDD_UPDATE_SEND;
1179       return (0);
1180     }
1181     memset (ci, 0, sizeof (cache_item_t));
1183     ci->file = strdup (file);
1184     if (ci->file == NULL)
1185     {
1186       free (ci);
1187       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1189       strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1190       RRDD_UPDATE_SEND;
1191       return (0);
1192     }
1194     _wipe_ci_values(ci, now);
1195     ci->flags = CI_FLAGS_IN_TREE;
1197     pthread_mutex_lock(&cache_lock);
1198     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1199   } /* }}} */
1200   assert (ci != NULL);
1202   while (buffer_size > 0)
1203   {
1204     char **temp;
1205     char *value;
1207     status = buffer_get_field (&buffer, &buffer_size, &value);
1208     if (status != 0)
1209     {
1210       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1211       break;
1212     }
1214     temp = (char **) realloc (ci->values,
1215         sizeof (char *) * (ci->values_num + 1));
1216     if (temp == NULL)
1217     {
1218       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1219       continue;
1220     }
1221     ci->values = temp;
1223     ci->values[ci->values_num] = strdup (value);
1224     if (ci->values[ci->values_num] == NULL)
1225     {
1226       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1227       continue;
1228     }
1229     ci->values_num++;
1231     values_num++;
1232   }
1234   if (((now - ci->last_flush_time) >= config_write_interval)
1235       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1236       && (ci->values_num > 0))
1237   {
1238     enqueue_cache_item (ci, TAIL);
1239   }
1241   pthread_mutex_unlock (&cache_lock);
1243   if (values_num < 1)
1244   {
1245     strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1246   }
1247   else
1248   {
1249     snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1250         (values_num == 1) ? "" : "s");
1251   }
1252   RRDD_UPDATE_SEND;
1253   return (0);
1254 #undef RRDD_UPDATE_SEND
1255 } /* }}} int handle_request_update */
1257 /* we came across a "WROTE" entry during journal replay.
1258  * throw away any values that we have accumulated for this file
1259  */
1260 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1261                                  const char *buffer,
1262                                  size_t buffer_size __attribute__((unused)))
1264   int i;
1265   cache_item_t *ci;
1266   const char *file = buffer;
1268   pthread_mutex_lock(&cache_lock);
1270   ci = g_tree_lookup(cache_tree, file);
1271   if (ci == NULL)
1272   {
1273     pthread_mutex_unlock(&cache_lock);
1274     return (0);
1275   }
1277   if (ci->values)
1278   {
1279     for (i=0; i < ci->values_num; i++)
1280       free(ci->values[i]);
1282     free(ci->values);
1283   }
1285   _wipe_ci_values(ci, time(NULL));
1287   pthread_mutex_unlock(&cache_lock);
1288   return (0);
1289 } /* }}} int handle_request_wrote */
1291 /* if fd < 0, we are in journal replay mode */
1292 static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */
1294   char *buffer_ptr;
1295   char *command;
1296   int status;
1298   assert (buffer[buffer_size - 1] == '\0');
1300   buffer_ptr = buffer;
1301   command = NULL;
1302   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1303   if (status != 0)
1304   {
1305     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1306     return (-1);
1307   }
1309   if (strcasecmp (command, "update") == 0)
1310   {
1311     /* don't re-write updates in replay mode */
1312     if (fd >= 0)
1313       journal_write(command, buffer_ptr);
1315     return (handle_request_update (fd, buffer_ptr, buffer_size));
1316   }
1317   else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1318   {
1319     /* this is only valid in replay mode */
1320     return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1321   }
1322   else if (strcasecmp (command, "flush") == 0)
1323   {
1324     return (handle_request_flush (fd, buffer_ptr, buffer_size));
1325   }
1326   else if (strcasecmp (command, "flushall") == 0)
1327   {
1328     return (handle_request_flushall(fd));
1329   }
1330   else if (strcasecmp (command, "stats") == 0)
1331   {
1332     return (handle_request_stats (fd, buffer_ptr, buffer_size));
1333   }
1334   else if (strcasecmp (command, "help") == 0)
1335   {
1336     return (handle_request_help (fd, buffer_ptr, buffer_size));
1337   }
1338   else
1339   {
1340     char result[CMD_MAX];
1342     snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1343     result[sizeof (result) - 1] = 0;
1345     status = swrite (fd, result, strlen (result));
1346     if (status < 0)
1347     {
1348       RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1349       return (-1);
1350     }
1351   }
1353   return (0);
1354 } /* }}} int handle_request */
1356 /* MUST NOT hold journal_lock before calling this */
1357 static void journal_rotate(void) /* {{{ */
1359   FILE *old_fh = NULL;
1361   if (journal_cur == NULL || journal_old == NULL)
1362     return;
1364   pthread_mutex_lock(&journal_lock);
1366   /* we rotate this way (rename before close) so that the we can release
1367    * the journal lock as fast as possible.  Journal writes to the new
1368    * journal can proceed immediately after the new file is opened.  The
1369    * fclose can then block without affecting new updates.
1370    */
1371   if (journal_fh != NULL)
1372   {
1373     old_fh = journal_fh;
1374     rename(journal_cur, journal_old);
1375     ++stats_journal_rotate;
1376   }
1378   journal_fh = fopen(journal_cur, "a");
1379   pthread_mutex_unlock(&journal_lock);
1381   if (old_fh != NULL)
1382     fclose(old_fh);
1384   if (journal_fh == NULL)
1385   {
1386     RRDD_LOG(LOG_CRIT,
1387              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1388              journal_cur, rrd_strerror(errno));
1390     RRDD_LOG(LOG_ERR,
1391              "JOURNALING DISABLED: All values will be flushed at shutdown");
1392     config_flush_at_shutdown = 1;
1393   }
1395 } /* }}} static void journal_rotate */
1397 static void journal_done(void) /* {{{ */
1399   if (journal_cur == NULL)
1400     return;
1402   pthread_mutex_lock(&journal_lock);
1403   if (journal_fh != NULL)
1404   {
1405     fclose(journal_fh);
1406     journal_fh = NULL;
1407   }
1409   if (config_flush_at_shutdown)
1410   {
1411     RRDD_LOG(LOG_INFO, "removing journals");
1412     unlink(journal_old);
1413     unlink(journal_cur);
1414   }
1415   else
1416   {
1417     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1418              "journals will be used at next startup");
1419   }
1421   pthread_mutex_unlock(&journal_lock);
1423 } /* }}} static void journal_done */
1425 static int journal_write(char *cmd, char *args) /* {{{ */
1427   int chars;
1429   if (journal_fh == NULL)
1430     return 0;
1432   pthread_mutex_lock(&journal_lock);
1433   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1434   pthread_mutex_unlock(&journal_lock);
1436   if (chars > 0)
1437   {
1438     pthread_mutex_lock(&stats_lock);
1439     stats_journal_bytes += chars;
1440     pthread_mutex_unlock(&stats_lock);
1441   }
1443   return chars;
1444 } /* }}} static int journal_write */
1446 static int journal_replay (const char *file) /* {{{ */
1448   FILE *fh;
1449   int entry_cnt = 0;
1450   int fail_cnt = 0;
1451   uint64_t line = 0;
1452   char entry[CMD_MAX];
1454   if (file == NULL) return 0;
1456   fh = fopen(file, "r");
1457   if (fh == NULL)
1458   {
1459     if (errno != ENOENT)
1460       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1461                file, rrd_strerror(errno));
1462     return 0;
1463   }
1464   else
1465     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1467   while(!feof(fh))
1468   {
1469     size_t entry_len;
1471     ++line;
1472     fgets(entry, sizeof(entry), fh);
1473     entry_len = strlen(entry);
1475     /* check \n termination in case journal writing crashed mid-line */
1476     if (entry_len == 0)
1477       continue;
1478     else if (entry[entry_len - 1] != '\n')
1479     {
1480       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1481       ++fail_cnt;
1482       continue;
1483     }
1485     entry[entry_len - 1] = '\0';
1487     if (handle_request(-1, entry, entry_len) == 0)
1488       ++entry_cnt;
1489     else
1490       ++fail_cnt;
1491   }
1493   fclose(fh);
1495   if (entry_cnt > 0)
1496   {
1497     RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1498              entry_cnt, fail_cnt);
1499     return 1;
1500   }
1501   else
1502     return 0;
1504 } /* }}} static int journal_replay */
1506 static void *connection_thread_main (void *args) /* {{{ */
1508   pthread_t self;
1509   int i;
1510   int fd;
1511   
1512   fd = *((int *) args);
1513   free (args);
1515   pthread_mutex_lock (&connection_threads_lock);
1516   {
1517     pthread_t *temp;
1519     temp = (pthread_t *) realloc (connection_threads,
1520         sizeof (pthread_t) * (connection_threads_num + 1));
1521     if (temp == NULL)
1522     {
1523       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1524     }
1525     else
1526     {
1527       connection_threads = temp;
1528       connection_threads[connection_threads_num] = pthread_self ();
1529       connection_threads_num++;
1530     }
1531   }
1532   pthread_mutex_unlock (&connection_threads_lock);
1534   while (do_shutdown == 0)
1535   {
1536     char buffer[CMD_MAX];
1538     struct pollfd pollfd;
1539     int status;
1541     pollfd.fd = fd;
1542     pollfd.events = POLLIN | POLLPRI;
1543     pollfd.revents = 0;
1545     status = poll (&pollfd, 1, /* timeout = */ 500);
1546     if (do_shutdown)
1547       break;
1548     else if (status == 0) /* timeout */
1549       continue;
1550     else if (status < 0) /* error */
1551     {
1552       status = errno;
1553       if (status == EINTR)
1554         continue;
1555       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1556       continue;
1557     }
1559     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1560     {
1561       close (fd);
1562       break;
1563     }
1564     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1565     {
1566       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1567           "poll(2) returned something unexpected: %#04hx",
1568           pollfd.revents);
1569       close (fd);
1570       break;
1571     }
1573     status = (int) sread (fd, buffer, sizeof (buffer));
1574     if (status <= 0)
1575     {
1576       close (fd);
1578       if (status < 0)
1579         RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1581       break;
1582     }
1584     status = handle_request (fd, buffer, /*buffer_size=*/ status);
1585     if (status != 0)
1586       break;
1587   }
1589   close(fd);
1591   self = pthread_self ();
1592   /* Remove this thread from the connection threads list */
1593   pthread_mutex_lock (&connection_threads_lock);
1594   /* Find out own index in the array */
1595   for (i = 0; i < connection_threads_num; i++)
1596     if (pthread_equal (connection_threads[i], self) != 0)
1597       break;
1598   assert (i < connection_threads_num);
1600   /* Move the trailing threads forward. */
1601   if (i < (connection_threads_num - 1))
1602   {
1603     memmove (connection_threads + i,
1604         connection_threads + i + 1,
1605         sizeof (pthread_t) * (connection_threads_num - i - 1));
1606   }
1608   connection_threads_num--;
1609   pthread_mutex_unlock (&connection_threads_lock);
1611   return (NULL);
1612 } /* }}} void *connection_thread_main */
1614 static int open_listen_socket_unix (const char *path) /* {{{ */
1616   int fd;
1617   struct sockaddr_un sa;
1618   listen_socket_t *temp;
1619   int status;
1621   temp = (listen_socket_t *) realloc (listen_fds,
1622       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1623   if (temp == NULL)
1624   {
1625     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1626     return (-1);
1627   }
1628   listen_fds = temp;
1629   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1631   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1632   if (fd < 0)
1633   {
1634     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1635     return (-1);
1636   }
1638   memset (&sa, 0, sizeof (sa));
1639   sa.sun_family = AF_UNIX;
1640   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1642   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1643   if (status != 0)
1644   {
1645     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1646     close (fd);
1647     unlink (path);
1648     return (-1);
1649   }
1651   status = listen (fd, /* backlog = */ 10);
1652   if (status != 0)
1653   {
1654     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1655     close (fd);
1656     unlink (path);
1657     return (-1);
1658   }
1659   
1660   listen_fds[listen_fds_num].fd = fd;
1661   snprintf (listen_fds[listen_fds_num].path,
1662       sizeof (listen_fds[listen_fds_num].path) - 1,
1663       "unix:%s", path);
1664   listen_fds_num++;
1666   return (0);
1667 } /* }}} int open_listen_socket_unix */
1669 static int open_listen_socket (const char *addr_orig) /* {{{ */
1671   struct addrinfo ai_hints;
1672   struct addrinfo *ai_res;
1673   struct addrinfo *ai_ptr;
1674   char addr_copy[NI_MAXHOST];
1675   char *addr;
1676   char *port;
1677   int status;
1679   assert (addr_orig != NULL);
1681   strncpy (addr_copy, addr_orig, sizeof (addr_copy));
1682   addr_copy[sizeof (addr_copy) - 1] = 0;
1683   addr = addr_copy;
1685   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
1686     return (open_listen_socket_unix (addr + strlen ("unix:")));
1687   else if (addr[0] == '/')
1688     return (open_listen_socket_unix (addr));
1690   memset (&ai_hints, 0, sizeof (ai_hints));
1691   ai_hints.ai_flags = 0;
1692 #ifdef AI_ADDRCONFIG
1693   ai_hints.ai_flags |= AI_ADDRCONFIG;
1694 #endif
1695   ai_hints.ai_family = AF_UNSPEC;
1696   ai_hints.ai_socktype = SOCK_STREAM;
1698   port = NULL;
1699  if (*addr == '[') /* IPv6+port format */
1700   {
1701     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1702     addr++;
1704     port = strchr (addr, ']');
1705     if (port == NULL)
1706     {
1707       RRDD_LOG (LOG_ERR, "open_listen_socket: Malformed address: %s",
1708           addr_orig);
1709       return (-1);
1710     }
1711     *port = 0;
1712     port++;
1714     if (*port == ':')
1715       port++;
1716     else if (*port == 0)
1717       port = NULL;
1718     else
1719     {
1720       RRDD_LOG (LOG_ERR, "open_listen_socket: Garbage after address: %s",
1721           port);
1722       return (-1);
1723     }
1724   } /* if (*addr = ']') */
1725   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1726   {
1727     port = rindex(addr, ':');
1728     if (port != NULL)
1729     {
1730       *port = 0;
1731       port++;
1732     }
1733   }
1734   ai_res = NULL;
1735   status = getaddrinfo (addr,
1736                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1737                         &ai_hints, &ai_res);
1738   if (status != 0)
1739   {
1740     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
1741         "%s", addr, gai_strerror (status));
1742     return (-1);
1743   }
1745   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1746   {
1747     int fd;
1748     listen_socket_t *temp;
1749     int one = 1;
1751     temp = (listen_socket_t *) realloc (listen_fds,
1752         sizeof (listen_fds[0]) * (listen_fds_num + 1));
1753     if (temp == NULL)
1754     {
1755       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
1756       continue;
1757     }
1758     listen_fds = temp;
1759     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1761     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1762     if (fd < 0)
1763     {
1764       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
1765       continue;
1766     }
1768     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
1770     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1771     if (status != 0)
1772     {
1773       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
1774       close (fd);
1775       continue;
1776     }
1778     status = listen (fd, /* backlog = */ 10);
1779     if (status != 0)
1780     {
1781       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
1782       close (fd);
1783       return (-1);
1784     }
1786     listen_fds[listen_fds_num].fd = fd;
1787     strncpy (listen_fds[listen_fds_num].path, addr,
1788         sizeof (listen_fds[listen_fds_num].path) - 1);
1789     listen_fds_num++;
1790   } /* for (ai_ptr) */
1792   return (0);
1793 } /* }}} int open_listen_socket */
1795 static int close_listen_sockets (void) /* {{{ */
1797   size_t i;
1799   for (i = 0; i < listen_fds_num; i++)
1800   {
1801     close (listen_fds[i].fd);
1802     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1803       unlink (listen_fds[i].path + strlen ("unix:"));
1804   }
1806   free (listen_fds);
1807   listen_fds = NULL;
1808   listen_fds_num = 0;
1810   return (0);
1811 } /* }}} int close_listen_sockets */
1813 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1815   struct pollfd *pollfds;
1816   int pollfds_num;
1817   int status;
1818   int i;
1820   for (i = 0; i < config_listen_address_list_len; i++)
1821     open_listen_socket (config_listen_address_list[i]);
1823   if (config_listen_address_list_len < 1)
1824     open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1826   if (listen_fds_num < 1)
1827   {
1828     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1829         "could be opened. Sorry.");
1830     return (NULL);
1831   }
1833   pollfds_num = listen_fds_num;
1834   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1835   if (pollfds == NULL)
1836   {
1837     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1838     return (NULL);
1839   }
1840   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1842   RRDD_LOG(LOG_INFO, "listening for connections");
1844   while (do_shutdown == 0)
1845   {
1846     assert (pollfds_num == ((int) listen_fds_num));
1847     for (i = 0; i < pollfds_num; i++)
1848     {
1849       pollfds[i].fd = listen_fds[i].fd;
1850       pollfds[i].events = POLLIN | POLLPRI;
1851       pollfds[i].revents = 0;
1852     }
1854     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1855     if (do_shutdown)
1856       break;
1857     else if (status == 0) /* timeout */
1858       continue;
1859     else if (status < 0) /* error */
1860     {
1861       status = errno;
1862       if (status != EINTR)
1863       {
1864         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1865       }
1866       continue;
1867     }
1869     for (i = 0; i < pollfds_num; i++)
1870     {
1871       int *client_sd;
1872       struct sockaddr_storage client_sa;
1873       socklen_t client_sa_size;
1874       pthread_t tid;
1875       pthread_attr_t attr;
1877       if (pollfds[i].revents == 0)
1878         continue;
1880       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1881       {
1882         RRDD_LOG (LOG_ERR, "listen_thread_main: "
1883             "poll(2) returned something unexpected for listen FD #%i.",
1884             pollfds[i].fd);
1885         continue;
1886       }
1888       client_sd = (int *) malloc (sizeof (int));
1889       if (client_sd == NULL)
1890       {
1891         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1892         continue;
1893       }
1895       client_sa_size = sizeof (client_sa);
1896       *client_sd = accept (pollfds[i].fd,
1897           (struct sockaddr *) &client_sa, &client_sa_size);
1898       if (*client_sd < 0)
1899       {
1900         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1901         continue;
1902       }
1904       pthread_attr_init (&attr);
1905       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1907       status = pthread_create (&tid, &attr, connection_thread_main,
1908           /* args = */ (void *) client_sd);
1909       if (status != 0)
1910       {
1911         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1912         close (*client_sd);
1913         free (client_sd);
1914         continue;
1915       }
1916     } /* for (pollfds_num) */
1917   } /* while (do_shutdown == 0) */
1919   RRDD_LOG(LOG_INFO, "starting shutdown");
1921   close_listen_sockets ();
1923   pthread_mutex_lock (&connection_threads_lock);
1924   while (connection_threads_num > 0)
1925   {
1926     pthread_t wait_for;
1928     wait_for = connection_threads[0];
1930     pthread_mutex_unlock (&connection_threads_lock);
1931     pthread_join (wait_for, /* retval = */ NULL);
1932     pthread_mutex_lock (&connection_threads_lock);
1933   }
1934   pthread_mutex_unlock (&connection_threads_lock);
1936   return (NULL);
1937 } /* }}} void *listen_thread_main */
1939 static int daemonize (void) /* {{{ */
1941   int status;
1942   int fd;
1944   fd = open_pidfile();
1945   if (fd < 0) return fd;
1947   if (!stay_foreground)
1948   {
1949     pid_t child;
1950     char *base_dir;
1952     child = fork ();
1953     if (child < 0)
1954     {
1955       fprintf (stderr, "daemonize: fork(2) failed.\n");
1956       return (-1);
1957     }
1958     else if (child > 0)
1959     {
1960       return (1);
1961     }
1963     /* Change into the /tmp directory. */
1964     base_dir = (config_base_dir != NULL)
1965       ? config_base_dir
1966       : "/tmp";
1967     status = chdir (base_dir);
1968     if (status != 0)
1969     {
1970       fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1971       return (-1);
1972     }
1974     /* Become session leader */
1975     setsid ();
1977     /* Open the first three file descriptors to /dev/null */
1978     close (2);
1979     close (1);
1980     close (0);
1982     open ("/dev/null", O_RDWR);
1983     dup (0);
1984     dup (0);
1985   } /* if (!stay_foreground) */
1987   install_signal_handlers();
1989   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1990   RRDD_LOG(LOG_INFO, "starting up");
1992   cache_tree = g_tree_new ((GCompareFunc) strcmp);
1993   if (cache_tree == NULL)
1994   {
1995     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1996     return (-1);
1997   }
1999   status = write_pidfile (fd);
2000   return status;
2001 } /* }}} int daemonize */
2003 static int cleanup (void) /* {{{ */
2005   do_shutdown++;
2007   pthread_cond_signal (&cache_cond);
2008   pthread_join (queue_thread, /* return = */ NULL);
2010   remove_pidfile ();
2012   RRDD_LOG(LOG_INFO, "goodbye");
2013   closelog ();
2015   return (0);
2016 } /* }}} int cleanup */
2018 static int read_options (int argc, char **argv) /* {{{ */
2020   int option;
2021   int status = 0;
2023   while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?F")) != -1)
2024   {
2025     switch (option)
2026     {
2027       case 'g':
2028         stay_foreground=1;
2029         break;
2031       case 'l':
2032       {
2033         char **temp;
2035         temp = (char **) realloc (config_listen_address_list,
2036             sizeof (char *) * (config_listen_address_list_len + 1));
2037         if (temp == NULL)
2038         {
2039           fprintf (stderr, "read_options: realloc failed.\n");
2040           return (2);
2041         }
2042         config_listen_address_list = temp;
2044         temp[config_listen_address_list_len] = strdup (optarg);
2045         if (temp[config_listen_address_list_len] == NULL)
2046         {
2047           fprintf (stderr, "read_options: strdup failed.\n");
2048           return (2);
2049         }
2050         config_listen_address_list_len++;
2051       }
2052       break;
2054       case 'f':
2055       {
2056         int temp;
2058         temp = atoi (optarg);
2059         if (temp > 0)
2060           config_flush_interval = temp;
2061         else
2062         {
2063           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2064           status = 3;
2065         }
2066       }
2067       break;
2069       case 'w':
2070       {
2071         int temp;
2073         temp = atoi (optarg);
2074         if (temp > 0)
2075           config_write_interval = temp;
2076         else
2077         {
2078           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2079           status = 2;
2080         }
2081       }
2082       break;
2084       case 'z':
2085       {
2086         int temp;
2088         temp = atoi(optarg);
2089         if (temp > 0)
2090           config_write_jitter = temp;
2091         else
2092         {
2093           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2094           status = 2;
2095         }
2097         break;
2098       }
2100       case 'b':
2101       {
2102         size_t len;
2104         if (config_base_dir != NULL)
2105           free (config_base_dir);
2106         config_base_dir = strdup (optarg);
2107         if (config_base_dir == NULL)
2108         {
2109           fprintf (stderr, "read_options: strdup failed.\n");
2110           return (3);
2111         }
2113         len = strlen (config_base_dir);
2114         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2115         {
2116           config_base_dir[len - 1] = 0;
2117           len--;
2118         }
2120         if (len < 1)
2121         {
2122           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2123           return (4);
2124         }
2125       }
2126       break;
2128       case 'p':
2129       {
2130         if (config_pid_file != NULL)
2131           free (config_pid_file);
2132         config_pid_file = strdup (optarg);
2133         if (config_pid_file == NULL)
2134         {
2135           fprintf (stderr, "read_options: strdup failed.\n");
2136           return (3);
2137         }
2138       }
2139       break;
2141       case 'F':
2142         config_flush_at_shutdown = 1;
2143         break;
2145       case 'j':
2146       {
2147         struct stat statbuf;
2148         const char *dir = optarg;
2150         status = stat(dir, &statbuf);
2151         if (status != 0)
2152         {
2153           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2154           return 6;
2155         }
2157         if (!S_ISDIR(statbuf.st_mode)
2158             || access(dir, R_OK|W_OK|X_OK) != 0)
2159         {
2160           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2161                   errno ? rrd_strerror(errno) : "");
2162           return 6;
2163         }
2165         journal_cur = malloc(PATH_MAX + 1);
2166         journal_old = malloc(PATH_MAX + 1);
2167         if (journal_cur == NULL || journal_old == NULL)
2168         {
2169           fprintf(stderr, "malloc failure for journal files\n");
2170           return 6;
2171         }
2172         else 
2173         {
2174           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2175           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2176         }
2177       }
2178       break;
2180       case 'h':
2181       case '?':
2182         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2183             "\n"
2184             "Usage: rrdcached [options]\n"
2185             "\n"
2186             "Valid options are:\n"
2187             "  -l <address>  Socket address to listen to.\n"
2188             "  -w <seconds>  Interval in which to write data.\n"
2189             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2190             "  -f <seconds>  Interval in which to flush dead data.\n"
2191             "  -p <file>     Location of the PID-file.\n"
2192             "  -b <dir>      Base directory to change to.\n"
2193             "  -g            Do not fork and run in the foreground.\n"
2194             "  -j <dir>      Directory in which to create the journal files.\n"
2195             "  -F            Always flush all updates at shutdown\n"
2196             "\n"
2197             "For more information and a detailed description of all options "
2198             "please refer\n"
2199             "to the rrdcached(1) manual page.\n",
2200             VERSION);
2201         status = -1;
2202         break;
2203     } /* switch (option) */
2204   } /* while (getopt) */
2206   /* advise the user when values are not sane */
2207   if (config_flush_interval < 2 * config_write_interval)
2208     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2209             " 2x write interval (-w) !\n");
2210   if (config_write_jitter > config_write_interval)
2211     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2212             " write interval (-w) !\n");
2214   if (journal_cur == NULL)
2215     config_flush_at_shutdown = 1;
2217   return (status);
2218 } /* }}} int read_options */
2220 int main (int argc, char **argv)
2222   int status;
2224   status = read_options (argc, argv);
2225   if (status != 0)
2226   {
2227     if (status < 0)
2228       status = 0;
2229     return (status);
2230   }
2232   status = daemonize ();
2233   if (status == 1)
2234   {
2235     struct sigaction sigchld;
2237     memset (&sigchld, 0, sizeof (sigchld));
2238     sigchld.sa_handler = SIG_IGN;
2239     sigaction (SIGCHLD, &sigchld, NULL);
2241     return (0);
2242   }
2243   else if (status != 0)
2244   {
2245     fprintf (stderr, "daemonize failed, exiting.\n");
2246     return (1);
2247   }
2249   if (journal_cur != NULL)
2250   {
2251     int had_journal = 0;
2253     pthread_mutex_lock(&journal_lock);
2255     RRDD_LOG(LOG_INFO, "checking for journal files");
2257     had_journal += journal_replay(journal_old);
2258     had_journal += journal_replay(journal_cur);
2260     if (had_journal)
2261       flush_old_values(-1);
2263     pthread_mutex_unlock(&journal_lock);
2264     journal_rotate();
2266     RRDD_LOG(LOG_INFO, "journal processing complete");
2267   }
2269   /* start the queue thread */
2270   memset (&queue_thread, 0, sizeof (queue_thread));
2271   status = pthread_create (&queue_thread,
2272                            NULL, /* attr */
2273                            queue_thread_main,
2274                            NULL); /* args */
2275   if (status != 0)
2276   {
2277     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2278     cleanup();
2279     return (1);
2280   }
2282   listen_thread_main (NULL);
2283   cleanup ();
2285   return (0);
2286 } /* int main */
2288 /*
2289  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2290  */