Code

0e29f131ab1387fa657ae0160e2e6d956de4bab7
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
101 /*
102  * Types
103  */
104 struct listen_socket_s
106   int fd;
107   char path[PATH_MAX + 1];
108 };
109 typedef struct listen_socket_s listen_socket_t;
111 struct cache_item_s;
112 typedef struct cache_item_s cache_item_t;
113 struct cache_item_s
115   char *file;
116   char **values;
117   int values_num;
118   time_t last_flush_time;
119 #define CI_FLAGS_IN_TREE  (1<<0)
120 #define CI_FLAGS_IN_QUEUE (1<<1)
121   int flags;
122   pthread_cond_t  flushed;
123   cache_item_t *next;
124 };
126 struct callback_flush_data_s
128   time_t now;
129   time_t abs_timeout;
130   char **keys;
131   size_t keys_num;
132 };
133 typedef struct callback_flush_data_s callback_flush_data_t;
135 enum queue_side_e
137   HEAD,
138   TAIL
139 };
140 typedef enum queue_side_e queue_side_t;
142 /* max length of socket command or response */
143 #define CMD_MAX 4096
145 /*
146  * Variables
147  */
148 static int stay_foreground = 0;
150 static listen_socket_t *listen_fds = NULL;
151 static size_t listen_fds_num = 0;
153 static int do_shutdown = 0;
155 static pthread_t queue_thread;
157 static pthread_t *connection_threads = NULL;
158 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
159 static int connection_threads_num = 0;
161 /* Cache stuff */
162 static GTree          *cache_tree = NULL;
163 static cache_item_t   *cache_queue_head = NULL;
164 static cache_item_t   *cache_queue_tail = NULL;
165 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
166 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
168 static int config_write_interval = 300;
169 static int config_write_jitter   = 0;
170 static int config_flush_interval = 3600;
171 static char *config_pid_file = NULL;
172 static char *config_base_dir = NULL;
174 static char **config_listen_address_list = NULL;
175 static int config_listen_address_list_len = 0;
177 static uint64_t stats_queue_length = 0;
178 static uint64_t stats_updates_received = 0;
179 static uint64_t stats_flush_received = 0;
180 static uint64_t stats_updates_written = 0;
181 static uint64_t stats_data_sets_written = 0;
182 static uint64_t stats_journal_bytes = 0;
183 static uint64_t stats_journal_rotate = 0;
184 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
186 /* Journaled updates */
187 static char *journal_cur = NULL;
188 static char *journal_old = NULL;
189 static FILE *journal_fh = NULL;
190 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
191 static int journal_write(char *cmd, char *args);
192 static void journal_done(void);
193 static void journal_rotate(void);
195 /* 
196  * Functions
197  */
198 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
200   RRDD_LOG(LOG_NOTICE, "caught SIGINT");
201   do_shutdown++;
202   pthread_cond_broadcast(&cache_cond);
203 } /* }}} void sig_int_handler */
205 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
207   RRDD_LOG(LOG_NOTICE, "caught SIGTERM");
208   do_shutdown++;
209   pthread_cond_broadcast(&cache_cond);
210 } /* }}} void sig_term_handler */
212 static int open_pidfile(void) /* {{{ */
214   int fd;
215   char *file;
217   file = (config_pid_file != NULL)
218     ? config_pid_file
219     : LOCALSTATEDIR "/run/rrdcached.pid";
221   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
222   if (fd < 0)
223     fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
224             file, rrd_strerror(errno));
226   return(fd);
229 static int write_pidfile (int fd) /* {{{ */
231   pid_t pid;
232   FILE *fh;
234   pid = getpid ();
236   fh = fdopen (fd, "w");
237   if (fh == NULL)
238   {
239     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
240     close(fd);
241     return (-1);
242   }
244   fprintf (fh, "%i\n", (int) pid);
245   fclose (fh);
247   return (0);
248 } /* }}} int write_pidfile */
250 static int remove_pidfile (void) /* {{{ */
252   char *file;
253   int status;
255   file = (config_pid_file != NULL)
256     ? config_pid_file
257     : LOCALSTATEDIR "/run/rrdcached.pid";
259   status = unlink (file);
260   if (status == 0)
261     return (0);
262   return (errno);
263 } /* }}} int remove_pidfile */
265 static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
267   char    *buffer;
268   size_t   buffer_used;
269   size_t   buffer_free;
270   ssize_t  status;
272   buffer       = (char *) buffer_void;
273   buffer_used  = 0;
274   buffer_free  = buffer_size;
276   while (buffer_free > 0)
277   {
278     status = read (fd, buffer + buffer_used, buffer_free);
279     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
280       continue;
282     if (status < 0)
283       return (-1);
285     if (status == 0)
286       return (0);
288     assert ((0 > status) || (buffer_free >= (size_t) status));
290     buffer_free = buffer_free - status;
291     buffer_used = buffer_used + status;
293     if (buffer[buffer_used - 1] == '\n')
294       break;
295   }
297   assert (buffer_used > 0);
299   if (buffer[buffer_used - 1] != '\n')
300   {
301     errno = ENOBUFS;
302     return (-1);
303   }
305   buffer[buffer_used - 1] = 0;
307   /* Fix network line endings. */
308   if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
309   {
310     buffer_used--;
311     buffer[buffer_used - 1] = 0;
312   }
314   return (buffer_used);
315 } /* }}} ssize_t sread */
317 static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
319   const char *ptr;
320   size_t      nleft;
321   ssize_t     status;
323   /* special case for journal replay */
324   if (fd < 0) return 0;
326   ptr   = (const char *) buf;
327   nleft = count;
329   while (nleft > 0)
330   {
331     status = write (fd, (const void *) ptr, nleft);
333     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
334       continue;
336     if (status < 0)
337       return (status);
339     nleft -= status;
340     ptr   += status;
341   }
343   return (0);
344 } /* }}} ssize_t swrite */
346 static void _wipe_ci_values(cache_item_t *ci, time_t when)
348   ci->values = NULL;
349   ci->values_num = 0;
351   ci->last_flush_time = when;
352   if (config_write_jitter > 0)
353     ci->last_flush_time += (random() % config_write_jitter);
355   ci->flags &= ~(CI_FLAGS_IN_QUEUE);
358 /*
359  * enqueue_cache_item:
360  * `cache_lock' must be acquired before calling this function!
361  */
362 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
363     queue_side_t side)
365   int did_insert = 0;
367   if (ci == NULL)
368     return (-1);
370   if (ci->values_num == 0)
371     return (0);
373   if (side == HEAD)
374   {
375     if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
376     {
377       assert (ci->next == NULL);
378       ci->next = cache_queue_head;
379       cache_queue_head = ci;
381       if (cache_queue_tail == NULL)
382         cache_queue_tail = cache_queue_head;
384       did_insert = 1;
385     }
386     else if (cache_queue_head == ci)
387     {
388       /* do nothing */
389     }
390     else /* enqueued, but not first entry */
391     {
392       cache_item_t *prev;
394       /* find previous entry */
395       for (prev = cache_queue_head; prev != NULL; prev = prev->next)
396         if (prev->next == ci)
397           break;
398       assert (prev != NULL);
400       /* move to the front */
401       prev->next = ci->next;
402       ci->next = cache_queue_head;
403       cache_queue_head = ci;
405       /* check if we need to adapt the tail */
406       if (cache_queue_tail == ci)
407         cache_queue_tail = prev;
408     }
409   }
410   else /* (side == TAIL) */
411   {
412     /* We don't move values back in the list.. */
413     if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
414       return (0);
416     assert (ci->next == NULL);
418     if (cache_queue_tail == NULL)
419       cache_queue_head = ci;
420     else
421       cache_queue_tail->next = ci;
422     cache_queue_tail = ci;
424     did_insert = 1;
425   }
427   ci->flags |= CI_FLAGS_IN_QUEUE;
429   if (did_insert)
430   {
431     pthread_cond_broadcast(&cache_cond);
432     pthread_mutex_lock (&stats_lock);
433     stats_queue_length++;
434     pthread_mutex_unlock (&stats_lock);
435   }
437   return (0);
438 } /* }}} int enqueue_cache_item */
440 /*
441  * tree_callback_flush:
442  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
443  * while this is in progress.
444  */
445 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
446     gpointer data)
448   cache_item_t *ci;
449   callback_flush_data_t *cfd;
451   ci = (cache_item_t *) value;
452   cfd = (callback_flush_data_t *) data;
454   if ((ci->last_flush_time <= cfd->abs_timeout)
455       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
456       && (ci->values_num > 0))
457   {
458     enqueue_cache_item (ci, TAIL);
459   }
460   else if ((do_shutdown != 0)
461       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
462       && (ci->values_num > 0))
463   {
464     enqueue_cache_item (ci, TAIL);
465   }
466   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
467       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
468       && (ci->values_num <= 0))
469   {
470     char **temp;
472     temp = (char **) realloc (cfd->keys,
473         sizeof (char *) * (cfd->keys_num + 1));
474     if (temp == NULL)
475     {
476       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
477       return (FALSE);
478     }
479     cfd->keys = temp;
480     /* Make really sure this points to the _same_ place */
481     assert ((char *) key == ci->file);
482     cfd->keys[cfd->keys_num] = (char *) key;
483     cfd->keys_num++;
484   }
486   return (FALSE);
487 } /* }}} gboolean tree_callback_flush */
489 static int flush_old_values (int max_age)
491   callback_flush_data_t cfd;
492   size_t k;
494   memset (&cfd, 0, sizeof (cfd));
495   /* Pass the current time as user data so that we don't need to call
496    * `time' for each node. */
497   cfd.now = time (NULL);
498   cfd.keys = NULL;
499   cfd.keys_num = 0;
501   if (max_age > 0)
502     cfd.abs_timeout = cfd.now - max_age;
503   else
504     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
506   /* `tree_callback_flush' will return the keys of all values that haven't
507    * been touched in the last `config_flush_interval' seconds in `cfd'.
508    * The char*'s in this array point to the same memory as ci->file, so we
509    * don't need to free them separately. */
510   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
512   for (k = 0; k < cfd.keys_num; k++)
513   {
514     cache_item_t *ci;
516     /* This must not fail. */
517     ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
518     assert (ci != NULL);
520     /* If we end up here with values available, something's seriously
521      * messed up. */
522     assert (ci->values_num == 0);
524     /* Remove the node from the tree */
525     g_tree_remove (cache_tree, cfd.keys[k]);
526     cfd.keys[k] = NULL;
528     /* Now free and clean up `ci'. */
529     free (ci->file);
530     ci->file = NULL;
531     free (ci);
532     ci = NULL;
533   } /* for (k = 0; k < cfd.keys_num; k++) */
535   if (cfd.keys != NULL)
536   {
537     free (cfd.keys);
538     cfd.keys = NULL;
539   }
541   return (0);
542 } /* int flush_old_values */
544 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
546   struct timeval now;
547   struct timespec next_flush;
549   gettimeofday (&now, NULL);
550   next_flush.tv_sec = now.tv_sec + config_flush_interval;
551   next_flush.tv_nsec = 1000 * now.tv_usec;
553   pthread_mutex_lock (&cache_lock);
554   while ((do_shutdown == 0) || (cache_queue_head != NULL))
555   {
556     cache_item_t *ci;
557     char *file;
558     char **values;
559     int values_num;
560     int status;
561     int i;
563     /* First, check if it's time to do the cache flush. */
564     gettimeofday (&now, NULL);
565     if ((now.tv_sec > next_flush.tv_sec)
566         || ((now.tv_sec == next_flush.tv_sec)
567           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
568     {
569       /* Flush all values that haven't been written in the last
570        * `config_write_interval' seconds. */
571       flush_old_values (config_write_interval);
573       /* Determine the time of the next cache flush. */
574       while (next_flush.tv_sec <= now.tv_sec)
575         next_flush.tv_sec += config_flush_interval;
577       /* unlock the cache while we rotate so we don't block incoming
578        * updates if the fsync() blocks on disk I/O */
579       pthread_mutex_unlock(&cache_lock);
580       journal_rotate();
581       pthread_mutex_lock(&cache_lock);
582     }
584     /* Now, check if there's something to store away. If not, wait until
585      * something comes in or it's time to do the cache flush. */
586     if (cache_queue_head == NULL)
587     {
588       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
589       if ((status != 0) && (status != ETIMEDOUT))
590       {
591         RRDD_LOG (LOG_ERR, "queue_thread_main: "
592             "pthread_cond_timedwait returned %i.", status);
593       }
594     }
596     /* We're about to shut down, so lets flush the entire tree. */
597     if ((do_shutdown != 0) && (cache_queue_head == NULL))
598       flush_old_values (/* max age = */ -1);
600     /* Check if a value has arrived. This may be NULL if we timed out or there
601      * was an interrupt such as a signal. */
602     if (cache_queue_head == NULL)
603       continue;
605     ci = cache_queue_head;
607     /* copy the relevant parts */
608     file = strdup (ci->file);
609     if (file == NULL)
610     {
611       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
612       continue;
613     }
615     assert(ci->values != NULL);
616     assert(ci->values_num > 0);
618     values = ci->values;
619     values_num = ci->values_num;
621     _wipe_ci_values(ci, time(NULL));
623     cache_queue_head = ci->next;
624     if (cache_queue_head == NULL)
625       cache_queue_tail = NULL;
626     ci->next = NULL;
628     pthread_mutex_lock (&stats_lock);
629     assert (stats_queue_length > 0);
630     stats_queue_length--;
631     pthread_mutex_unlock (&stats_lock);
633     pthread_mutex_unlock (&cache_lock);
635     rrd_clear_error ();
636     status = rrd_update_r (file, NULL, values_num, (void *) values);
637     if (status != 0)
638     {
639       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
640           "rrd_update_r (%s) failed with status %i. (%s)",
641           file, status, rrd_get_error());
642     }
644     journal_write("wrote", file);
645     pthread_cond_broadcast(&ci->flushed);
647     for (i = 0; i < values_num; i++)
648       free (values[i]);
650     free(values);
651     free(file);
653     if (status == 0)
654     {
655       pthread_mutex_lock (&stats_lock);
656       stats_updates_written++;
657       stats_data_sets_written += values_num;
658       pthread_mutex_unlock (&stats_lock);
659     }
661     pthread_mutex_lock (&cache_lock);
663     /* We're about to shut down, so lets flush the entire tree. */
664     if ((do_shutdown != 0) && (cache_queue_head == NULL))
665       flush_old_values (/* max age = */ -1);
666   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
667   pthread_mutex_unlock (&cache_lock);
669   assert(cache_queue_head == NULL);
670   RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
671   journal_done();
673   return (NULL);
674 } /* }}} void *queue_thread_main */
676 static int buffer_get_field (char **buffer_ret, /* {{{ */
677     size_t *buffer_size_ret, char **field_ret)
679   char *buffer;
680   size_t buffer_pos;
681   size_t buffer_size;
682   char *field;
683   size_t field_size;
684   int status;
686   buffer = *buffer_ret;
687   buffer_pos = 0;
688   buffer_size = *buffer_size_ret;
689   field = *buffer_ret;
690   field_size = 0;
692   if (buffer_size <= 0)
693     return (-1);
695   /* This is ensured by `handle_request'. */
696   assert (buffer[buffer_size - 1] == '\0');
698   status = -1;
699   while (buffer_pos < buffer_size)
700   {
701     /* Check for end-of-field or end-of-buffer */
702     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
703     {
704       field[field_size] = 0;
705       field_size++;
706       buffer_pos++;
707       status = 0;
708       break;
709     }
710     /* Handle escaped characters. */
711     else if (buffer[buffer_pos] == '\\')
712     {
713       if (buffer_pos >= (buffer_size - 1))
714         break;
715       buffer_pos++;
716       field[field_size] = buffer[buffer_pos];
717       field_size++;
718       buffer_pos++;
719     }
720     /* Normal operation */ 
721     else
722     {
723       field[field_size] = buffer[buffer_pos];
724       field_size++;
725       buffer_pos++;
726     }
727   } /* while (buffer_pos < buffer_size) */
729   if (status != 0)
730     return (status);
732   *buffer_ret = buffer + buffer_pos;
733   *buffer_size_ret = buffer_size - buffer_pos;
734   *field_ret = field;
736   return (0);
737 } /* }}} int buffer_get_field */
739 static int flush_file (const char *filename) /* {{{ */
741   cache_item_t *ci;
743   pthread_mutex_lock (&cache_lock);
745   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
746   if (ci == NULL)
747   {
748     pthread_mutex_unlock (&cache_lock);
749     return (ENOENT);
750   }
752   /* Enqueue at head */
753   enqueue_cache_item (ci, HEAD);
755   pthread_cond_wait(&ci->flushed, &cache_lock);
756   pthread_mutex_unlock(&cache_lock);
758   return (0);
759 } /* }}} int flush_file */
761 static int handle_request_help (int fd, /* {{{ */
762     char *buffer, size_t buffer_size)
764   int status;
765   char **help_text;
766   size_t help_text_len;
767   char *command;
768   size_t i;
770   char *help_help[] =
771   {
772     "5 Command overview\n",
773     "FLUSH <filename>\n",
774     "FLUSHALL\n",
775     "HELP [<command>]\n",
776     "UPDATE <filename> <values> [<values> ...]\n",
777     "STATS\n"
778   };
779   size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
781   char *help_flush[] =
782   {
783     "4 Help for FLUSH\n",
784     "Usage: FLUSH <filename>\n",
785     "\n",
786     "Adds the given filename to the head of the update queue and returns\n",
787     "after is has been dequeued.\n"
788   };
789   size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
791   char *help_flushall[] =
792   {
793     "3 Help for FLUSHALL\n",
794     "Usage: FLUSHALL\n",
795     "\n",
796     "Triggers writing of all pending updates.  Returns immediately.\n"
797   };
798   size_t help_flushall_len = sizeof(help_flushall) / sizeof(help_flushall[0]);
800   char *help_update[] =
801   {
802     "9 Help for UPDATE\n",
803     "Usage: UPDATE <filename> <values> [<values> ...]\n"
804     "\n",
805     "Adds the given file to the internal cache if it is not yet known and\n",
806     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
807     "for details.\n",
808     "\n",
809     "Each <values> has the following form:\n",
810     "  <values> = <time>:<value>[:<value>[...]]\n",
811     "See the rrdupdate(1) manpage for details.\n"
812   };
813   size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
815   char *help_stats[] =
816   {
817     "4 Help for STATS\n",
818     "Usage: STATS\n",
819     "\n",
820     "Returns some performance counters, see the rrdcached(1) manpage for\n",
821     "a description of the values.\n"
822   };
823   size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
825   status = buffer_get_field (&buffer, &buffer_size, &command);
826   if (status != 0)
827   {
828     help_text = help_help;
829     help_text_len = help_help_len;
830   }
831   else
832   {
833     if (strcasecmp (command, "update") == 0)
834     {
835       help_text = help_update;
836       help_text_len = help_update_len;
837     }
838     else if (strcasecmp (command, "flush") == 0)
839     {
840       help_text = help_flush;
841       help_text_len = help_flush_len;
842     }
843     else if (strcasecmp (command, "flushall") == 0)
844     {
845       help_text = help_flushall;
846       help_text_len = help_flushall_len;
847     }
848     else if (strcasecmp (command, "stats") == 0)
849     {
850       help_text = help_stats;
851       help_text_len = help_stats_len;
852     }
853     else
854     {
855       help_text = help_help;
856       help_text_len = help_help_len;
857     }
858   }
860   for (i = 0; i < help_text_len; i++)
861   {
862     status = swrite (fd, help_text[i], strlen (help_text[i]));
863     if (status < 0)
864     {
865       status = errno;
866       RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
867       return (status);
868     }
869   }
871   return (0);
872 } /* }}} int handle_request_help */
874 static int handle_request_stats (int fd, /* {{{ */
875     char *buffer __attribute__((unused)),
876     size_t buffer_size __attribute__((unused)))
878   int status;
879   char outbuf[CMD_MAX];
881   uint64_t copy_queue_length;
882   uint64_t copy_updates_received;
883   uint64_t copy_flush_received;
884   uint64_t copy_updates_written;
885   uint64_t copy_data_sets_written;
886   uint64_t copy_journal_bytes;
887   uint64_t copy_journal_rotate;
889   uint64_t tree_nodes_number;
890   uint64_t tree_depth;
892   pthread_mutex_lock (&stats_lock);
893   copy_queue_length       = stats_queue_length;
894   copy_updates_received   = stats_updates_received;
895   copy_flush_received     = stats_flush_received;
896   copy_updates_written    = stats_updates_written;
897   copy_data_sets_written  = stats_data_sets_written;
898   copy_journal_bytes      = stats_journal_bytes;
899   copy_journal_rotate     = stats_journal_rotate;
900   pthread_mutex_unlock (&stats_lock);
902   pthread_mutex_lock (&cache_lock);
903   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
904   tree_depth        = (uint64_t) g_tree_height (cache_tree);
905   pthread_mutex_unlock (&cache_lock);
907 #define RRDD_STATS_SEND \
908   outbuf[sizeof (outbuf) - 1] = 0; \
909   status = swrite (fd, outbuf, strlen (outbuf)); \
910   if (status < 0) \
911   { \
912     status = errno; \
913     RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
914     return (status); \
915   }
917   strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
918   RRDD_STATS_SEND;
920   snprintf (outbuf, sizeof (outbuf),
921       "QueueLength: %"PRIu64"\n", copy_queue_length);
922   RRDD_STATS_SEND;
924   snprintf (outbuf, sizeof (outbuf),
925       "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
926   RRDD_STATS_SEND;
928   snprintf (outbuf, sizeof (outbuf),
929       "FlushesReceived: %"PRIu64"\n", copy_flush_received);
930   RRDD_STATS_SEND;
932   snprintf (outbuf, sizeof (outbuf),
933       "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
934   RRDD_STATS_SEND;
936   snprintf (outbuf, sizeof (outbuf),
937       "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
938   RRDD_STATS_SEND;
940   snprintf (outbuf, sizeof (outbuf),
941       "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
942   RRDD_STATS_SEND;
944   snprintf (outbuf, sizeof (outbuf),
945       "TreeDepth: %"PRIu64"\n", tree_depth);
946   RRDD_STATS_SEND;
948   snprintf (outbuf, sizeof(outbuf),
949       "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
950   RRDD_STATS_SEND;
952   snprintf (outbuf, sizeof(outbuf),
953       "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
954   RRDD_STATS_SEND;
956   return (0);
957 #undef RRDD_STATS_SEND
958 } /* }}} int handle_request_stats */
960 static int handle_request_flush (int fd, /* {{{ */
961     char *buffer, size_t buffer_size)
963   char *file;
964   int status;
965   char result[CMD_MAX];
967   status = buffer_get_field (&buffer, &buffer_size, &file);
968   if (status != 0)
969   {
970     strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
971   }
972   else
973   {
974     pthread_mutex_lock(&stats_lock);
975     stats_flush_received++;
976     pthread_mutex_unlock(&stats_lock);
978     status = flush_file (file);
979     if (status == 0)
980       snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
981     else if (status == ENOENT)
982     {
983       /* no file in our tree; see whether it exists at all */
984       struct stat statbuf;
986       memset(&statbuf, 0, sizeof(statbuf));
987       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
988         snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
989       else
990         snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
991     }
992     else if (status < 0)
993       strncpy (result, "-1 Internal error.\n", sizeof (result));
994     else
995       snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
996   }
997   result[sizeof (result) - 1] = 0;
999   status = swrite (fd, result, strlen (result));
1000   if (status < 0)
1001   {
1002     status = errno;
1003     RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
1004     return (status);
1005   }
1007   return (0);
1008 } /* }}} int handle_request_flush */
1010 static int handle_request_flushall(int fd) /* {{{ */
1012   int status;
1013   char answer[] ="0 Started flush.\n";
1015   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1017   pthread_mutex_lock(&cache_lock);
1018   flush_old_values(-1);
1019   pthread_mutex_unlock(&cache_lock);
1021   status = swrite(fd, answer, strlen(answer));
1022   if (status < 0)
1023   {
1024     status = errno;
1025     RRDD_LOG(LOG_INFO, "handle_request_flushall: swrite returned an error.");
1026   }
1028   return (status);
1031 static int handle_request_update (int fd, /* {{{ */
1032     char *buffer, size_t buffer_size)
1034   char *file;
1035   int values_num = 0;
1036   int status;
1038   time_t now;
1040   cache_item_t *ci;
1041   char answer[CMD_MAX];
1043 #define RRDD_UPDATE_SEND \
1044   answer[sizeof (answer) - 1] = 0; \
1045   status = swrite (fd, answer, strlen (answer)); \
1046   if (status < 0) \
1047   { \
1048     status = errno; \
1049     RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1050     return (status); \
1051   }
1053   now = time (NULL);
1055   status = buffer_get_field (&buffer, &buffer_size, &file);
1056   if (status != 0)
1057   {
1058     strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1059         sizeof (answer));
1060     RRDD_UPDATE_SEND;
1061     return (0);
1062   }
1064   pthread_mutex_lock(&stats_lock);
1065   stats_updates_received++;
1066   pthread_mutex_unlock(&stats_lock);
1068   pthread_mutex_lock (&cache_lock);
1069   ci = g_tree_lookup (cache_tree, file);
1071   if (ci == NULL) /* {{{ */
1072   {
1073     struct stat statbuf;
1075     /* don't hold the lock while we setup; stat(2) might block */
1076     pthread_mutex_unlock(&cache_lock);
1078     memset (&statbuf, 0, sizeof (statbuf));
1079     status = stat (file, &statbuf);
1080     if (status != 0)
1081     {
1082       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1084       status = errno;
1085       if (status == ENOENT)
1086         snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1087       else
1088         snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1089             status);
1090       RRDD_UPDATE_SEND;
1091       return (0);
1092     }
1093     if (!S_ISREG (statbuf.st_mode))
1094     {
1095       snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1096       RRDD_UPDATE_SEND;
1097       return (0);
1098     }
1099     if (access(file, R_OK|W_OK) != 0)
1100     {
1101       snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1102                 file, rrd_strerror(errno));
1103       RRDD_UPDATE_SEND;
1104       return (0);
1105     }
1107     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1108     if (ci == NULL)
1109     {
1110       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1112       strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1113       RRDD_UPDATE_SEND;
1114       return (0);
1115     }
1116     memset (ci, 0, sizeof (cache_item_t));
1118     ci->file = strdup (file);
1119     if (ci->file == NULL)
1120     {
1121       free (ci);
1122       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1124       strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1125       RRDD_UPDATE_SEND;
1126       return (0);
1127     }
1129     _wipe_ci_values(ci, now);
1130     ci->flags = CI_FLAGS_IN_TREE;
1132     pthread_mutex_lock(&cache_lock);
1133     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1134   } /* }}} */
1135   assert (ci != NULL);
1137   while (buffer_size > 0)
1138   {
1139     char **temp;
1140     char *value;
1142     status = buffer_get_field (&buffer, &buffer_size, &value);
1143     if (status != 0)
1144     {
1145       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1146       break;
1147     }
1149     temp = (char **) realloc (ci->values,
1150         sizeof (char *) * (ci->values_num + 1));
1151     if (temp == NULL)
1152     {
1153       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1154       continue;
1155     }
1156     ci->values = temp;
1158     ci->values[ci->values_num] = strdup (value);
1159     if (ci->values[ci->values_num] == NULL)
1160     {
1161       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1162       continue;
1163     }
1164     ci->values_num++;
1166     values_num++;
1167   }
1169   if (((now - ci->last_flush_time) >= config_write_interval)
1170       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1171       && (ci->values_num > 0))
1172   {
1173     enqueue_cache_item (ci, TAIL);
1174   }
1176   pthread_mutex_unlock (&cache_lock);
1178   if (values_num < 1)
1179   {
1180     strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1181   }
1182   else
1183   {
1184     snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1185         (values_num == 1) ? "" : "s");
1186   }
1187   RRDD_UPDATE_SEND;
1188   return (0);
1189 #undef RRDD_UPDATE_SEND
1190 } /* }}} int handle_request_update */
1192 /* we came across a "WROTE" entry during journal replay.
1193  * throw away any values that we have accumulated for this file
1194  */
1195 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1196                                  const char *buffer,
1197                                  size_t buffer_size __attribute__((unused)))
1199   int i;
1200   cache_item_t *ci;
1201   const char *file = buffer;
1203   pthread_mutex_lock(&cache_lock);
1205   ci = g_tree_lookup(cache_tree, file);
1206   if (ci == NULL)
1207   {
1208     pthread_mutex_unlock(&cache_lock);
1209     return (0);
1210   }
1212   if (ci->values)
1213   {
1214     for (i=0; i < ci->values_num; i++)
1215       free(ci->values[i]);
1217     free(ci->values);
1218   }
1220   _wipe_ci_values(ci, time(NULL));
1222   pthread_mutex_unlock(&cache_lock);
1223   return (0);
1224 } /* }}} int handle_request_wrote */
1226 /* if fd < 0, we are in journal replay mode */
1227 static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */
1229   char *buffer_ptr;
1230   char *command;
1231   int status;
1233   assert (buffer[buffer_size - 1] == '\0');
1235   buffer_ptr = buffer;
1236   command = NULL;
1237   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1238   if (status != 0)
1239   {
1240     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1241     return (-1);
1242   }
1244   if (strcasecmp (command, "update") == 0)
1245   {
1246     /* don't re-write updates in replay mode */
1247     if (fd >= 0)
1248       journal_write(command, buffer_ptr);
1250     return (handle_request_update (fd, buffer_ptr, buffer_size));
1251   }
1252   else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1253   {
1254     /* this is only valid in replay mode */
1255     return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1256   }
1257   else if (strcasecmp (command, "flush") == 0)
1258   {
1259     return (handle_request_flush (fd, buffer_ptr, buffer_size));
1260   }
1261   else if (strcasecmp (command, "flushall") == 0)
1262   {
1263     return (handle_request_flushall(fd));
1264   }
1265   else if (strcasecmp (command, "stats") == 0)
1266   {
1267     return (handle_request_stats (fd, buffer_ptr, buffer_size));
1268   }
1269   else if (strcasecmp (command, "help") == 0)
1270   {
1271     return (handle_request_help (fd, buffer_ptr, buffer_size));
1272   }
1273   else
1274   {
1275     char result[CMD_MAX];
1277     snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1278     result[sizeof (result) - 1] = 0;
1280     status = swrite (fd, result, strlen (result));
1281     if (status < 0)
1282     {
1283       RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1284       return (-1);
1285     }
1286   }
1288   return (0);
1289 } /* }}} int handle_request */
1291 /* MUST NOT hold journal_lock before calling this */
1292 static void journal_rotate(void) /* {{{ */
1294   FILE *old_fh = NULL;
1296   if (journal_cur == NULL || journal_old == NULL)
1297     return;
1299   pthread_mutex_lock(&journal_lock);
1301   /* we rotate this way (rename before close) so that the we can release
1302    * the journal lock as fast as possible.  Journal writes to the new
1303    * journal can proceed immediately after the new file is opened.  The
1304    * fclose can then block without affecting new updates.
1305    */
1306   if (journal_fh != NULL)
1307   {
1308     old_fh = journal_fh;
1309     rename(journal_cur, journal_old);
1310     ++stats_journal_rotate;
1311   }
1313   journal_fh = fopen(journal_cur, "a");
1314   pthread_mutex_unlock(&journal_lock);
1316   if (old_fh != NULL)
1317     fclose(old_fh);
1319   if (journal_fh == NULL)
1320     RRDD_LOG(LOG_CRIT,
1321              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1322              journal_cur, rrd_strerror(errno));
1324 } /* }}} static void journal_rotate */
1326 static void journal_done(void) /* {{{ */
1328   if (journal_cur == NULL)
1329     return;
1331   pthread_mutex_lock(&journal_lock);
1332   if (journal_fh != NULL)
1333   {
1334     fclose(journal_fh);
1335     journal_fh = NULL;
1336   }
1338   RRDD_LOG(LOG_INFO, "removing journals");
1340   unlink(journal_old);
1341   unlink(journal_cur);
1342   pthread_mutex_unlock(&journal_lock);
1344 } /* }}} static void journal_done */
1346 static int journal_write(char *cmd, char *args) /* {{{ */
1348   int chars;
1350   if (journal_fh == NULL)
1351     return 0;
1353   pthread_mutex_lock(&journal_lock);
1354   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1355   pthread_mutex_unlock(&journal_lock);
1357   if (chars > 0)
1358   {
1359     pthread_mutex_lock(&stats_lock);
1360     stats_journal_bytes += chars;
1361     pthread_mutex_unlock(&stats_lock);
1362   }
1364   return chars;
1365 } /* }}} static int journal_write */
1367 static int journal_replay (const char *file) /* {{{ */
1369   FILE *fh;
1370   int entry_cnt = 0;
1371   int fail_cnt = 0;
1372   uint64_t line = 0;
1373   char entry[CMD_MAX];
1375   if (file == NULL) return 0;
1377   fh = fopen(file, "r");
1378   if (fh == NULL)
1379   {
1380     if (errno != ENOENT)
1381       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1382                file, rrd_strerror(errno));
1383     return 0;
1384   }
1385   else
1386     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1388   while(!feof(fh))
1389   {
1390     size_t entry_len;
1392     ++line;
1393     fgets(entry, sizeof(entry), fh);
1394     entry_len = strlen(entry);
1396     /* check \n termination in case journal writing crashed mid-line */
1397     if (entry_len == 0)
1398       continue;
1399     else if (entry[entry_len - 1] != '\n')
1400     {
1401       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1402       ++fail_cnt;
1403       continue;
1404     }
1406     entry[entry_len - 1] = '\0';
1408     if (handle_request(-1, entry, entry_len) == 0)
1409       ++entry_cnt;
1410     else
1411       ++fail_cnt;
1412   }
1414   fclose(fh);
1416   if (entry_cnt > 0)
1417   {
1418     RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1419              entry_cnt, fail_cnt);
1420     return 1;
1421   }
1422   else
1423     return 0;
1425 } /* }}} static int journal_replay */
1427 static void *connection_thread_main (void *args) /* {{{ */
1429   pthread_t self;
1430   int i;
1431   int fd;
1432   
1433   fd = *((int *) args);
1434   free (args);
1436   pthread_mutex_lock (&connection_threads_lock);
1437   {
1438     pthread_t *temp;
1440     temp = (pthread_t *) realloc (connection_threads,
1441         sizeof (pthread_t) * (connection_threads_num + 1));
1442     if (temp == NULL)
1443     {
1444       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1445     }
1446     else
1447     {
1448       connection_threads = temp;
1449       connection_threads[connection_threads_num] = pthread_self ();
1450       connection_threads_num++;
1451     }
1452   }
1453   pthread_mutex_unlock (&connection_threads_lock);
1455   while (do_shutdown == 0)
1456   {
1457     char buffer[CMD_MAX];
1459     struct pollfd pollfd;
1460     int status;
1462     pollfd.fd = fd;
1463     pollfd.events = POLLIN | POLLPRI;
1464     pollfd.revents = 0;
1466     status = poll (&pollfd, 1, /* timeout = */ 500);
1467     if (status == 0) /* timeout */
1468       continue;
1469     else if (status < 0) /* error */
1470     {
1471       status = errno;
1472       if (status == EINTR)
1473         continue;
1474       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1475       continue;
1476     }
1478     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1479     {
1480       close (fd);
1481       break;
1482     }
1483     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1484     {
1485       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1486           "poll(2) returned something unexpected: %#04hx",
1487           pollfd.revents);
1488       close (fd);
1489       break;
1490     }
1492     status = (int) sread (fd, buffer, sizeof (buffer));
1493     if (status <= 0)
1494     {
1495       close (fd);
1497       if (status < 0)
1498         RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1500       break;
1501     }
1503     status = handle_request (fd, buffer, /*buffer_size=*/ status);
1504     if (status != 0)
1505       break;
1506   }
1508   close(fd);
1510   self = pthread_self ();
1511   /* Remove this thread from the connection threads list */
1512   pthread_mutex_lock (&connection_threads_lock);
1513   /* Find out own index in the array */
1514   for (i = 0; i < connection_threads_num; i++)
1515     if (pthread_equal (connection_threads[i], self) != 0)
1516       break;
1517   assert (i < connection_threads_num);
1519   /* Move the trailing threads forward. */
1520   if (i < (connection_threads_num - 1))
1521   {
1522     memmove (connection_threads + i,
1523         connection_threads + i + 1,
1524         sizeof (pthread_t) * (connection_threads_num - i - 1));
1525   }
1527   connection_threads_num--;
1528   pthread_mutex_unlock (&connection_threads_lock);
1530   return (NULL);
1531 } /* }}} void *connection_thread_main */
1533 static int open_listen_socket_unix (const char *path) /* {{{ */
1535   int fd;
1536   struct sockaddr_un sa;
1537   listen_socket_t *temp;
1538   int status;
1540   temp = (listen_socket_t *) realloc (listen_fds,
1541       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1542   if (temp == NULL)
1543   {
1544     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1545     return (-1);
1546   }
1547   listen_fds = temp;
1548   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1550   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1551   if (fd < 0)
1552   {
1553     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1554     return (-1);
1555   }
1557   memset (&sa, 0, sizeof (sa));
1558   sa.sun_family = AF_UNIX;
1559   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1561   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1562   if (status != 0)
1563   {
1564     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1565     close (fd);
1566     unlink (path);
1567     return (-1);
1568   }
1570   status = listen (fd, /* backlog = */ 10);
1571   if (status != 0)
1572   {
1573     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1574     close (fd);
1575     unlink (path);
1576     return (-1);
1577   }
1578   
1579   listen_fds[listen_fds_num].fd = fd;
1580   snprintf (listen_fds[listen_fds_num].path,
1581       sizeof (listen_fds[listen_fds_num].path) - 1,
1582       "unix:%s", path);
1583   listen_fds_num++;
1585   return (0);
1586 } /* }}} int open_listen_socket_unix */
1588 static int open_listen_socket (const char *addr_orig) /* {{{ */
1590   struct addrinfo ai_hints;
1591   struct addrinfo *ai_res;
1592   struct addrinfo *ai_ptr;
1593   char addr_copy[NI_MAXHOST];
1594   char *addr;
1595   char *port;
1596   int status;
1598   assert (addr_orig != NULL);
1600   strncpy (addr_copy, addr_orig, sizeof (addr_copy));
1601   addr_copy[sizeof (addr_copy) - 1] = 0;
1602   addr = addr_copy;
1604   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
1605     return (open_listen_socket_unix (addr + strlen ("unix:")));
1606   else if (addr[0] == '/')
1607     return (open_listen_socket_unix (addr));
1609   memset (&ai_hints, 0, sizeof (ai_hints));
1610   ai_hints.ai_flags = 0;
1611 #ifdef AI_ADDRCONFIG
1612   ai_hints.ai_flags |= AI_ADDRCONFIG;
1613 #endif
1614   ai_hints.ai_family = AF_UNSPEC;
1615   ai_hints.ai_socktype = SOCK_STREAM;
1617   port = NULL;
1618  if (*addr == '[') /* IPv6+port format */
1619   {
1620     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1621     addr++;
1623     port = strchr (addr, ']');
1624     if (port == NULL)
1625     {
1626       RRDD_LOG (LOG_ERR, "open_listen_socket: Malformed address: %s",
1627           addr_orig);
1628       return (-1);
1629     }
1630     *port = 0;
1631     port++;
1633     if (*port == ':')
1634       port++;
1635     else if (*port == 0)
1636       port = NULL;
1637     else
1638     {
1639       RRDD_LOG (LOG_ERR, "open_listen_socket: Garbage after address: %s",
1640           port);
1641       return (-1);
1642     }
1643   } /* if (*addr = ']') */
1644   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1645   {
1646     port = rindex(addr, ':');
1647     if (port != NULL)
1648     {
1649       *port = 0;
1650       port++;
1651     }
1652   }
1653   ai_res = NULL;
1654   status = getaddrinfo (addr,
1655                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1656                         &ai_hints, &ai_res);
1657   if (status != 0)
1658   {
1659     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
1660         "%s", addr, gai_strerror (status));
1661     return (-1);
1662   }
1664   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1665   {
1666     int fd;
1667     listen_socket_t *temp;
1668     int one = 1;
1670     temp = (listen_socket_t *) realloc (listen_fds,
1671         sizeof (listen_fds[0]) * (listen_fds_num + 1));
1672     if (temp == NULL)
1673     {
1674       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
1675       continue;
1676     }
1677     listen_fds = temp;
1678     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1680     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1681     if (fd < 0)
1682     {
1683       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
1684       continue;
1685     }
1687     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
1689     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1690     if (status != 0)
1691     {
1692       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
1693       close (fd);
1694       continue;
1695     }
1697     status = listen (fd, /* backlog = */ 10);
1698     if (status != 0)
1699     {
1700       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
1701       close (fd);
1702       return (-1);
1703     }
1705     listen_fds[listen_fds_num].fd = fd;
1706     strncpy (listen_fds[listen_fds_num].path, addr,
1707         sizeof (listen_fds[listen_fds_num].path) - 1);
1708     listen_fds_num++;
1709   } /* for (ai_ptr) */
1711   return (0);
1712 } /* }}} int open_listen_socket */
1714 static int close_listen_sockets (void) /* {{{ */
1716   size_t i;
1718   for (i = 0; i < listen_fds_num; i++)
1719   {
1720     close (listen_fds[i].fd);
1721     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1722       unlink (listen_fds[i].path + strlen ("unix:"));
1723   }
1725   free (listen_fds);
1726   listen_fds = NULL;
1727   listen_fds_num = 0;
1729   return (0);
1730 } /* }}} int close_listen_sockets */
1732 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1734   struct pollfd *pollfds;
1735   int pollfds_num;
1736   int status;
1737   int i;
1739   for (i = 0; i < config_listen_address_list_len; i++)
1740     open_listen_socket (config_listen_address_list[i]);
1742   if (config_listen_address_list_len < 1)
1743     open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1745   if (listen_fds_num < 1)
1746   {
1747     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1748         "could be opened. Sorry.");
1749     return (NULL);
1750   }
1752   pollfds_num = listen_fds_num;
1753   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1754   if (pollfds == NULL)
1755   {
1756     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1757     return (NULL);
1758   }
1759   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1761   RRDD_LOG(LOG_INFO, "listening for connections");
1763   while (do_shutdown == 0)
1764   {
1765     assert (pollfds_num == ((int) listen_fds_num));
1766     for (i = 0; i < pollfds_num; i++)
1767     {
1768       pollfds[i].fd = listen_fds[i].fd;
1769       pollfds[i].events = POLLIN | POLLPRI;
1770       pollfds[i].revents = 0;
1771     }
1773     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1774     if (status == 0)
1775     {
1776       continue; /* timeout */
1777     }
1778     else if (status < 0)
1779     {
1780       status = errno;
1781       if (status != EINTR)
1782       {
1783         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1784       }
1785       continue;
1786     }
1788     for (i = 0; i < pollfds_num; i++)
1789     {
1790       int *client_sd;
1791       struct sockaddr_storage client_sa;
1792       socklen_t client_sa_size;
1793       pthread_t tid;
1794       pthread_attr_t attr;
1796       if (pollfds[i].revents == 0)
1797         continue;
1799       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1800       {
1801         RRDD_LOG (LOG_ERR, "listen_thread_main: "
1802             "poll(2) returned something unexpected for listen FD #%i.",
1803             pollfds[i].fd);
1804         continue;
1805       }
1807       client_sd = (int *) malloc (sizeof (int));
1808       if (client_sd == NULL)
1809       {
1810         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1811         continue;
1812       }
1814       client_sa_size = sizeof (client_sa);
1815       *client_sd = accept (pollfds[i].fd,
1816           (struct sockaddr *) &client_sa, &client_sa_size);
1817       if (*client_sd < 0)
1818       {
1819         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1820         continue;
1821       }
1823       pthread_attr_init (&attr);
1824       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1826       status = pthread_create (&tid, &attr, connection_thread_main,
1827           /* args = */ (void *) client_sd);
1828       if (status != 0)
1829       {
1830         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1831         close (*client_sd);
1832         free (client_sd);
1833         continue;
1834       }
1835     } /* for (pollfds_num) */
1836   } /* while (do_shutdown == 0) */
1838   RRDD_LOG(LOG_INFO, "starting shutdown");
1840   close_listen_sockets ();
1842   pthread_mutex_lock (&connection_threads_lock);
1843   while (connection_threads_num > 0)
1844   {
1845     pthread_t wait_for;
1847     wait_for = connection_threads[0];
1849     pthread_mutex_unlock (&connection_threads_lock);
1850     pthread_join (wait_for, /* retval = */ NULL);
1851     pthread_mutex_lock (&connection_threads_lock);
1852   }
1853   pthread_mutex_unlock (&connection_threads_lock);
1855   return (NULL);
1856 } /* }}} void *listen_thread_main */
1858 static int daemonize (void) /* {{{ */
1860   int status;
1861   int fd;
1863   /* These structures are static, because `sigaction' behaves weird if the are
1864    * overwritten.. */
1865   static struct sigaction sa_int;
1866   static struct sigaction sa_term;
1867   static struct sigaction sa_pipe;
1869   fd = open_pidfile();
1870   if (fd < 0) return fd;
1872   if (!stay_foreground)
1873   {
1874     pid_t child;
1875     char *base_dir;
1877     child = fork ();
1878     if (child < 0)
1879     {
1880       fprintf (stderr, "daemonize: fork(2) failed.\n");
1881       return (-1);
1882     }
1883     else if (child > 0)
1884     {
1885       return (1);
1886     }
1888     /* Change into the /tmp directory. */
1889     base_dir = (config_base_dir != NULL)
1890       ? config_base_dir
1891       : "/tmp";
1892     status = chdir (base_dir);
1893     if (status != 0)
1894     {
1895       fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1896       return (-1);
1897     }
1899     /* Become session leader */
1900     setsid ();
1902     /* Open the first three file descriptors to /dev/null */
1903     close (2);
1904     close (1);
1905     close (0);
1907     open ("/dev/null", O_RDWR);
1908     dup (0);
1909     dup (0);
1910   } /* if (!stay_foreground) */
1912   /* Install signal handlers */
1913   memset (&sa_int, 0, sizeof (sa_int));
1914   sa_int.sa_handler = sig_int_handler;
1915   sigaction (SIGINT, &sa_int, NULL);
1917   memset (&sa_term, 0, sizeof (sa_term));
1918   sa_term.sa_handler = sig_term_handler;
1919   sigaction (SIGTERM, &sa_term, NULL);
1921   memset (&sa_pipe, 0, sizeof (sa_pipe));
1922   sa_pipe.sa_handler = SIG_IGN;
1923   sigaction (SIGPIPE, &sa_pipe, NULL);
1925   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1926   RRDD_LOG(LOG_INFO, "starting up");
1928   cache_tree = g_tree_new ((GCompareFunc) strcmp);
1929   if (cache_tree == NULL)
1930   {
1931     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1932     return (-1);
1933   }
1935   status = write_pidfile (fd);
1936   return status;
1937 } /* }}} int daemonize */
1939 static int cleanup (void) /* {{{ */
1941   do_shutdown++;
1943   pthread_cond_signal (&cache_cond);
1944   pthread_join (queue_thread, /* return = */ NULL);
1946   remove_pidfile ();
1948   RRDD_LOG(LOG_INFO, "goodbye");
1949   closelog ();
1951   return (0);
1952 } /* }}} int cleanup */
1954 static int read_options (int argc, char **argv) /* {{{ */
1956   int option;
1957   int status = 0;
1959   while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?")) != -1)
1960   {
1961     switch (option)
1962     {
1963       case 'g':
1964         stay_foreground=1;
1965         break;
1967       case 'l':
1968       {
1969         char **temp;
1971         temp = (char **) realloc (config_listen_address_list,
1972             sizeof (char *) * (config_listen_address_list_len + 1));
1973         if (temp == NULL)
1974         {
1975           fprintf (stderr, "read_options: realloc failed.\n");
1976           return (2);
1977         }
1978         config_listen_address_list = temp;
1980         temp[config_listen_address_list_len] = strdup (optarg);
1981         if (temp[config_listen_address_list_len] == NULL)
1982         {
1983           fprintf (stderr, "read_options: strdup failed.\n");
1984           return (2);
1985         }
1986         config_listen_address_list_len++;
1987       }
1988       break;
1990       case 'f':
1991       {
1992         int temp;
1994         temp = atoi (optarg);
1995         if (temp > 0)
1996           config_flush_interval = temp;
1997         else
1998         {
1999           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2000           status = 3;
2001         }
2002       }
2003       break;
2005       case 'w':
2006       {
2007         int temp;
2009         temp = atoi (optarg);
2010         if (temp > 0)
2011           config_write_interval = temp;
2012         else
2013         {
2014           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2015           status = 2;
2016         }
2017       }
2018       break;
2020       case 'z':
2021       {
2022         int temp;
2024         temp = atoi(optarg);
2025         if (temp > 0)
2026           config_write_jitter = temp;
2027         else
2028         {
2029           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2030           status = 2;
2031         }
2033         break;
2034       }
2036       case 'b':
2037       {
2038         size_t len;
2040         if (config_base_dir != NULL)
2041           free (config_base_dir);
2042         config_base_dir = strdup (optarg);
2043         if (config_base_dir == NULL)
2044         {
2045           fprintf (stderr, "read_options: strdup failed.\n");
2046           return (3);
2047         }
2049         len = strlen (config_base_dir);
2050         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2051         {
2052           config_base_dir[len - 1] = 0;
2053           len--;
2054         }
2056         if (len < 1)
2057         {
2058           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2059           return (4);
2060         }
2061       }
2062       break;
2064       case 'p':
2065       {
2066         if (config_pid_file != NULL)
2067           free (config_pid_file);
2068         config_pid_file = strdup (optarg);
2069         if (config_pid_file == NULL)
2070         {
2071           fprintf (stderr, "read_options: strdup failed.\n");
2072           return (3);
2073         }
2074       }
2075       break;
2077       case 'j':
2078       {
2079         struct stat statbuf;
2080         const char *dir = optarg;
2082         status = stat(dir, &statbuf);
2083         if (status != 0)
2084         {
2085           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2086           return 6;
2087         }
2089         if (!S_ISDIR(statbuf.st_mode)
2090             || access(dir, R_OK|W_OK|X_OK) != 0)
2091         {
2092           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2093                   errno ? rrd_strerror(errno) : "");
2094           return 6;
2095         }
2097         journal_cur = malloc(PATH_MAX + 1);
2098         journal_old = malloc(PATH_MAX + 1);
2099         if (journal_cur == NULL || journal_old == NULL)
2100         {
2101           fprintf(stderr, "malloc failure for journal files\n");
2102           return 6;
2103         }
2104         else 
2105         {
2106           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2107           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2108         }
2109       }
2110       break;
2112       case 'h':
2113       case '?':
2114         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2115             "\n"
2116             "Usage: rrdcached [options]\n"
2117             "\n"
2118             "Valid options are:\n"
2119             "  -l <address>  Socket address to listen to.\n"
2120             "  -w <seconds>  Interval in which to write data.\n"
2121             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2122             "  -f <seconds>  Interval in which to flush dead data.\n"
2123             "  -p <file>     Location of the PID-file.\n"
2124             "  -b <dir>      Base directory to change to.\n"
2125             "  -g            Do not fork and run in the foreground.\n"
2126             "  -j <dir>      Directory in which to create the journal files.\n"
2127             "\n"
2128             "For more information and a detailed description of all options "
2129             "please refer\n"
2130             "to the rrdcached(1) manual page.\n",
2131             VERSION);
2132         status = -1;
2133         break;
2134     } /* switch (option) */
2135   } /* while (getopt) */
2137   /* advise the user when values are not sane */
2138   if (config_flush_interval < 2 * config_write_interval)
2139     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2140             " 2x write interval (-w) !\n");
2141   if (config_write_jitter > config_write_interval)
2142     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2143             " write interval (-w) !\n");
2145   return (status);
2146 } /* }}} int read_options */
2148 int main (int argc, char **argv)
2150   int status;
2152   status = read_options (argc, argv);
2153   if (status != 0)
2154   {
2155     if (status < 0)
2156       status = 0;
2157     return (status);
2158   }
2160   status = daemonize ();
2161   if (status == 1)
2162   {
2163     struct sigaction sigchld;
2165     memset (&sigchld, 0, sizeof (sigchld));
2166     sigchld.sa_handler = SIG_IGN;
2167     sigaction (SIGCHLD, &sigchld, NULL);
2169     return (0);
2170   }
2171   else if (status != 0)
2172   {
2173     fprintf (stderr, "daemonize failed, exiting.\n");
2174     return (1);
2175   }
2177   if (journal_cur != NULL)
2178   {
2179     int had_journal = 0;
2181     pthread_mutex_lock(&journal_lock);
2183     RRDD_LOG(LOG_INFO, "checking for journal files");
2185     had_journal += journal_replay(journal_old);
2186     had_journal += journal_replay(journal_cur);
2188     if (had_journal)
2189       flush_old_values(-1);
2191     pthread_mutex_unlock(&journal_lock);
2192     journal_rotate();
2194     RRDD_LOG(LOG_INFO, "journal processing complete");
2195   }
2197   /* start the queue thread */
2198   memset (&queue_thread, 0, sizeof (queue_thread));
2199   status = pthread_create (&queue_thread,
2200                            NULL, /* attr */
2201                            queue_thread_main,
2202                            NULL); /* args */
2203   if (status != 0)
2204   {
2205     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2206     cleanup();
2207     return (1);
2208   }
2210   listen_thread_main (NULL);
2211   cleanup ();
2213   return (0);
2214 } /* int main */
2216 /*
2217  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2218  */