Code

win32 portability patch and win32/rrdlib.vcproj file for the source
[rrdtool-all.git] / program / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #include <stdint.h>
81 #include <unistd.h>
82 #include <strings.h>
83 #include <inttypes.h>
84 #       include <sys/socket.h>
86 #else
88 #endif
89 #include <stdio.h>
90 #include <string.h>
92 #include <sys/types.h>
93 #include <sys/stat.h>
94 #include <fcntl.h>
95 #include <signal.h>
96 #include <sys/un.h>
97 #include <netdb.h>
98 #include <poll.h>
99 #include <syslog.h>
100 #include <pthread.h>
101 #include <errno.h>
102 #include <assert.h>
103 #include <sys/time.h>
104 #include <time.h>
106 #include <glib-2.0/glib.h>
107 /* }}} */
109 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
111 #ifndef __GNUC__
112 # define __attribute__(x) /**/
113 #endif
115 /*
116  * Types
117  */
118 typedef enum
120   PRIV_LOW,
121   PRIV_HIGH
122 } socket_privilege;
124 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
126 struct listen_socket_s
128   int fd;
129   char addr[PATH_MAX + 1];
130   int family;
131   socket_privilege privilege;
133   /* state for BATCH processing */
134   time_t batch_start;
135   int batch_cmd;
137   /* buffered IO */
138   char *rbuf;
139   off_t next_cmd;
140   off_t next_read;
142   char *wbuf;
143   ssize_t wbuf_len;
144 };
145 typedef struct listen_socket_s listen_socket_t;
147 struct cache_item_s;
148 typedef struct cache_item_s cache_item_t;
149 struct cache_item_s
151   char *file;
152   char **values;
153   int values_num;
154   time_t last_flush_time;
155   time_t last_update_stamp;
156 #define CI_FLAGS_IN_TREE  (1<<0)
157 #define CI_FLAGS_IN_QUEUE (1<<1)
158   int flags;
159   pthread_cond_t  flushed;
160   cache_item_t *prev;
161   cache_item_t *next;
162 };
164 struct callback_flush_data_s
166   time_t now;
167   time_t abs_timeout;
168   char **keys;
169   size_t keys_num;
170 };
171 typedef struct callback_flush_data_s callback_flush_data_t;
173 enum queue_side_e
175   HEAD,
176   TAIL
177 };
178 typedef enum queue_side_e queue_side_t;
180 /* max length of socket command or response */
181 #define CMD_MAX 4096
182 #define RBUF_SIZE (CMD_MAX*2)
184 /*
185  * Variables
186  */
187 static int stay_foreground = 0;
188 static uid_t daemon_uid;
190 static listen_socket_t *listen_fds = NULL;
191 static size_t listen_fds_num = 0;
193 static int do_shutdown = 0;
195 static pthread_t *queue_threads;
196 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
197 static int config_queue_threads = 4;
199 static pthread_t flush_thread;
200 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
202 static pthread_t *connection_threads = NULL;
203 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
204 static int connection_threads_num = 0;
206 /* Cache stuff */
207 static GTree          *cache_tree = NULL;
208 static cache_item_t   *cache_queue_head = NULL;
209 static cache_item_t   *cache_queue_tail = NULL;
210 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
212 static int config_write_interval = 300;
213 static int config_write_jitter   = 0;
214 static int config_flush_interval = 3600;
215 static int config_flush_at_shutdown = 0;
216 static char *config_pid_file = NULL;
217 static char *config_base_dir = NULL;
218 static size_t _config_base_dir_len = 0;
219 static int config_write_base_only = 0;
221 static listen_socket_t **config_listen_address_list = NULL;
222 static int config_listen_address_list_len = 0;
224 static uint64_t stats_queue_length = 0;
225 static uint64_t stats_updates_received = 0;
226 static uint64_t stats_flush_received = 0;
227 static uint64_t stats_updates_written = 0;
228 static uint64_t stats_data_sets_written = 0;
229 static uint64_t stats_journal_bytes = 0;
230 static uint64_t stats_journal_rotate = 0;
231 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
233 /* Journaled updates */
234 static char *journal_cur = NULL;
235 static char *journal_old = NULL;
236 static FILE *journal_fh = NULL;
237 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
238 static int journal_write(char *cmd, char *args);
239 static void journal_done(void);
240 static void journal_rotate(void);
242 /* 
243  * Functions
244  */
245 static void sig_common (const char *sig) /* {{{ */
247   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
248   do_shutdown++;
249   pthread_cond_broadcast(&flush_cond);
250   pthread_cond_broadcast(&queue_cond);
251 } /* }}} void sig_common */
253 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
255   sig_common("INT");
256 } /* }}} void sig_int_handler */
258 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
260   sig_common("TERM");
261 } /* }}} void sig_term_handler */
263 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
265   config_flush_at_shutdown = 1;
266   sig_common("USR1");
267 } /* }}} void sig_usr1_handler */
269 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
271   config_flush_at_shutdown = 0;
272   sig_common("USR2");
273 } /* }}} void sig_usr2_handler */
275 static void install_signal_handlers(void) /* {{{ */
277   /* These structures are static, because `sigaction' behaves weird if the are
278    * overwritten.. */
279   static struct sigaction sa_int;
280   static struct sigaction sa_term;
281   static struct sigaction sa_pipe;
282   static struct sigaction sa_usr1;
283   static struct sigaction sa_usr2;
285   /* Install signal handlers */
286   memset (&sa_int, 0, sizeof (sa_int));
287   sa_int.sa_handler = sig_int_handler;
288   sigaction (SIGINT, &sa_int, NULL);
290   memset (&sa_term, 0, sizeof (sa_term));
291   sa_term.sa_handler = sig_term_handler;
292   sigaction (SIGTERM, &sa_term, NULL);
294   memset (&sa_pipe, 0, sizeof (sa_pipe));
295   sa_pipe.sa_handler = SIG_IGN;
296   sigaction (SIGPIPE, &sa_pipe, NULL);
298   memset (&sa_pipe, 0, sizeof (sa_usr1));
299   sa_usr1.sa_handler = sig_usr1_handler;
300   sigaction (SIGUSR1, &sa_usr1, NULL);
302   memset (&sa_usr2, 0, sizeof (sa_usr2));
303   sa_usr2.sa_handler = sig_usr2_handler;
304   sigaction (SIGUSR2, &sa_usr2, NULL);
306 } /* }}} void install_signal_handlers */
308 static int open_pidfile(char *action, int oflag) /* {{{ */
310   int fd;
311   char *file;
313   file = (config_pid_file != NULL)
314     ? config_pid_file
315     : LOCALSTATEDIR "/run/rrdcached.pid";
317   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
318   if (fd < 0)
319     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
320             action, file, rrd_strerror(errno));
322   return(fd);
323 } /* }}} static int open_pidfile */
325 /* check existing pid file to see whether a daemon is running */
326 static int check_pidfile(void)
328   int pid_fd;
329   pid_t pid;
330   char pid_str[16];
332   pid_fd = open_pidfile("open", O_RDWR);
333   if (pid_fd < 0)
334     return pid_fd;
336   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
337     return -1;
339   pid = atoi(pid_str);
340   if (pid <= 0)
341     return -1;
343   /* another running process that we can signal COULD be
344    * a competing rrdcached */
345   if (pid != getpid() && kill(pid, 0) == 0)
346   {
347     fprintf(stderr,
348             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
349     close(pid_fd);
350     return -1;
351   }
353   lseek(pid_fd, 0, SEEK_SET);
354   ftruncate(pid_fd, 0);
356   fprintf(stderr,
357           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
358           "rrdcached: starting normally.\n", pid);
360   return pid_fd;
361 } /* }}} static int check_pidfile */
363 static int write_pidfile (int fd) /* {{{ */
365   pid_t pid;
366   FILE *fh;
368   pid = getpid ();
370   fh = fdopen (fd, "w");
371   if (fh == NULL)
372   {
373     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
374     close(fd);
375     return (-1);
376   }
378   fprintf (fh, "%i\n", (int) pid);
379   fclose (fh);
381   return (0);
382 } /* }}} int write_pidfile */
384 static int remove_pidfile (void) /* {{{ */
386   char *file;
387   int status;
389   file = (config_pid_file != NULL)
390     ? config_pid_file
391     : LOCALSTATEDIR "/run/rrdcached.pid";
393   status = unlink (file);
394   if (status == 0)
395     return (0);
396   return (errno);
397 } /* }}} int remove_pidfile */
399 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
401   char *eol;
403   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
404                sock->next_read - sock->next_cmd);
406   if (eol == NULL)
407   {
408     /* no commands left, move remainder back to front of rbuf */
409     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
410             sock->next_read - sock->next_cmd);
411     sock->next_read -= sock->next_cmd;
412     sock->next_cmd = 0;
413     *len = 0;
414     return NULL;
415   }
416   else
417   {
418     char *cmd = sock->rbuf + sock->next_cmd;
419     *eol = '\0';
421     sock->next_cmd = eol - sock->rbuf + 1;
423     if (eol > sock->rbuf && *(eol-1) == '\r')
424       *(--eol) = '\0'; /* handle "\r\n" EOL */
426     *len = eol - cmd;
428     return cmd;
429   }
431   /* NOTREACHED */
432   assert(1==0);
435 /* add the characters directly to the write buffer */
436 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
438   char *new_buf;
440   assert(sock != NULL);
442   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
443   if (new_buf == NULL)
444   {
445     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
446     return -1;
447   }
449   strncpy(new_buf + sock->wbuf_len, str, len + 1);
451   sock->wbuf = new_buf;
452   sock->wbuf_len += len;
454   return 0;
455 } /* }}} static int add_to_wbuf */
457 /* add the text to the "extra" info that's sent after the status line */
458 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
460   va_list argp;
461   char buffer[CMD_MAX];
462   int len;
464   if (sock == NULL) return 0; /* journal replay mode */
465   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
467   va_start(argp, fmt);
468 #ifdef HAVE_VSNPRINTF
469   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
470 #else
471   len = vsprintf(buffer, fmt, argp);
472 #endif
473   va_end(argp);
474   if (len < 0)
475   {
476     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
477     return -1;
478   }
480   return add_to_wbuf(sock, buffer, len);
481 } /* }}} static int add_response_info */
483 static int count_lines(char *str) /* {{{ */
485   int lines = 0;
487   if (str != NULL)
488   {
489     while ((str = strchr(str, '\n')) != NULL)
490     {
491       ++lines;
492       ++str;
493     }
494   }
496   return lines;
497 } /* }}} static int count_lines */
499 /* send the response back to the user.
500  * returns 0 on success, -1 on error
501  * write buffer is always zeroed after this call */
502 static int send_response (listen_socket_t *sock, response_code rc,
503                           char *fmt, ...) /* {{{ */
505   va_list argp;
506   char buffer[CMD_MAX];
507   int lines;
508   ssize_t wrote;
509   int rclen, len;
511   if (sock == NULL) return rc;  /* journal replay mode */
513   if (sock->batch_start)
514   {
515     if (rc == RESP_OK)
516       return rc; /* no response on success during BATCH */
517     lines = sock->batch_cmd;
518   }
519   else if (rc == RESP_OK)
520     lines = count_lines(sock->wbuf);
521   else
522     lines = -1;
524   rclen = sprintf(buffer, "%d ", lines);
525   va_start(argp, fmt);
526 #ifdef HAVE_VSNPRINTF
527   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
528 #else
529   len = vsprintf(buffer+rclen, fmt, argp);
530 #endif
531   va_end(argp);
532   if (len < 0)
533     return -1;
535   len += rclen;
537   /* append the result to the wbuf, don't write to the user */
538   if (sock->batch_start)
539     return add_to_wbuf(sock, buffer, len);
541   /* first write must be complete */
542   if (len != write(sock->fd, buffer, len))
543   {
544     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
545     return -1;
546   }
548   if (sock->wbuf != NULL && rc == RESP_OK)
549   {
550     wrote = 0;
551     while (wrote < sock->wbuf_len)
552     {
553       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
554       if (wb <= 0)
555       {
556         RRDD_LOG(LOG_INFO, "send_response: could not write results");
557         return -1;
558       }
559       wrote += wb;
560     }
561   }
563   free(sock->wbuf); sock->wbuf = NULL;
564   sock->wbuf_len = 0;
566   return 0;
567 } /* }}} */
569 static void wipe_ci_values(cache_item_t *ci, time_t when)
571   ci->values = NULL;
572   ci->values_num = 0;
574   ci->last_flush_time = when;
575   if (config_write_jitter > 0)
576     ci->last_flush_time += (random() % config_write_jitter);
579 /* remove_from_queue
580  * remove a "cache_item_t" item from the queue.
581  * must hold 'cache_lock' when calling this
582  */
583 static void remove_from_queue(cache_item_t *ci) /* {{{ */
585   if (ci == NULL) return;
586   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
588   if (ci->prev == NULL)
589     cache_queue_head = ci->next; /* reset head */
590   else
591     ci->prev->next = ci->next;
593   if (ci->next == NULL)
594     cache_queue_tail = ci->prev; /* reset the tail */
595   else
596     ci->next->prev = ci->prev;
598   ci->next = ci->prev = NULL;
599   ci->flags &= ~CI_FLAGS_IN_QUEUE;
601   pthread_mutex_lock (&stats_lock);
602   assert (stats_queue_length > 0);
603   stats_queue_length--;
604   pthread_mutex_unlock (&stats_lock);
606 } /* }}} static void remove_from_queue */
608 /* free the resources associated with the cache_item_t
609  * must hold cache_lock when calling this function
610  */
611 static void *free_cache_item(cache_item_t *ci) /* {{{ */
613   if (ci == NULL) return NULL;
615   remove_from_queue(ci);
617   for (int i=0; i < ci->values_num; i++)
618     free(ci->values[i]);
620   free (ci->values);
621   free (ci->file);
623   /* in case anyone is waiting */
624   pthread_cond_broadcast(&ci->flushed);
626   free (ci);
628   return NULL;
629 } /* }}} static void *free_cache_item */
631 /*
632  * enqueue_cache_item:
633  * `cache_lock' must be acquired before calling this function!
634  */
635 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
636     queue_side_t side)
638   if (ci == NULL)
639     return (-1);
641   if (ci->values_num == 0)
642     return (0);
644   if (side == HEAD)
645   {
646     if (cache_queue_head == ci)
647       return 0;
649     /* remove if further down in queue */
650     remove_from_queue(ci);
652     ci->prev = NULL;
653     ci->next = cache_queue_head;
654     if (ci->next != NULL)
655       ci->next->prev = ci;
656     cache_queue_head = ci;
658     if (cache_queue_tail == NULL)
659       cache_queue_tail = cache_queue_head;
660   }
661   else /* (side == TAIL) */
662   {
663     /* We don't move values back in the list.. */
664     if (ci->flags & CI_FLAGS_IN_QUEUE)
665       return (0);
667     assert (ci->next == NULL);
668     assert (ci->prev == NULL);
670     ci->prev = cache_queue_tail;
672     if (cache_queue_tail == NULL)
673       cache_queue_head = ci;
674     else
675       cache_queue_tail->next = ci;
677     cache_queue_tail = ci;
678   }
680   ci->flags |= CI_FLAGS_IN_QUEUE;
682   pthread_cond_signal(&queue_cond);
683   pthread_mutex_lock (&stats_lock);
684   stats_queue_length++;
685   pthread_mutex_unlock (&stats_lock);
687   return (0);
688 } /* }}} int enqueue_cache_item */
690 /*
691  * tree_callback_flush:
692  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
693  * while this is in progress.
694  */
695 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
696     gpointer data)
698   cache_item_t *ci;
699   callback_flush_data_t *cfd;
701   ci = (cache_item_t *) value;
702   cfd = (callback_flush_data_t *) data;
704   if (ci->flags & CI_FLAGS_IN_QUEUE)
705     return FALSE;
707   if ((ci->last_flush_time <= cfd->abs_timeout)
708       && (ci->values_num > 0))
709   {
710     enqueue_cache_item (ci, TAIL);
711   }
712   else if ((do_shutdown != 0)
713       && (ci->values_num > 0))
714   {
715     enqueue_cache_item (ci, TAIL);
716   }
717   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
718       && (ci->values_num <= 0))
719   {
720     char **temp;
722     temp = (char **) rrd_realloc (cfd->keys,
723         sizeof (char *) * (cfd->keys_num + 1));
724     if (temp == NULL)
725     {
726       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
727       return (FALSE);
728     }
729     cfd->keys = temp;
730     /* Make really sure this points to the _same_ place */
731     assert ((char *) key == ci->file);
732     cfd->keys[cfd->keys_num] = (char *) key;
733     cfd->keys_num++;
734   }
736   return (FALSE);
737 } /* }}} gboolean tree_callback_flush */
739 static int flush_old_values (int max_age)
741   callback_flush_data_t cfd;
742   size_t k;
744   memset (&cfd, 0, sizeof (cfd));
745   /* Pass the current time as user data so that we don't need to call
746    * `time' for each node. */
747   cfd.now = time (NULL);
748   cfd.keys = NULL;
749   cfd.keys_num = 0;
751   if (max_age > 0)
752     cfd.abs_timeout = cfd.now - max_age;
753   else
754     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
756   /* `tree_callback_flush' will return the keys of all values that haven't
757    * been touched in the last `config_flush_interval' seconds in `cfd'.
758    * The char*'s in this array point to the same memory as ci->file, so we
759    * don't need to free them separately. */
760   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
762   for (k = 0; k < cfd.keys_num; k++)
763   {
764     /* should never fail, since we have held the cache_lock
765      * the entire time */
766     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
767   }
769   if (cfd.keys != NULL)
770   {
771     free (cfd.keys);
772     cfd.keys = NULL;
773   }
775   return (0);
776 } /* int flush_old_values */
778 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
780   struct timeval now;
781   struct timespec next_flush;
782   int status;
784   gettimeofday (&now, NULL);
785   next_flush.tv_sec = now.tv_sec + config_flush_interval;
786   next_flush.tv_nsec = 1000 * now.tv_usec;
788   pthread_mutex_lock(&cache_lock);
790   while (!do_shutdown)
791   {
792     gettimeofday (&now, NULL);
793     if ((now.tv_sec > next_flush.tv_sec)
794         || ((now.tv_sec == next_flush.tv_sec)
795           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
796     {
797       /* Flush all values that haven't been written in the last
798        * `config_write_interval' seconds. */
799       flush_old_values (config_write_interval);
801       /* Determine the time of the next cache flush. */
802       next_flush.tv_sec =
803         now.tv_sec + next_flush.tv_sec % config_flush_interval;
805       /* unlock the cache while we rotate so we don't block incoming
806        * updates if the fsync() blocks on disk I/O */
807       pthread_mutex_unlock(&cache_lock);
808       journal_rotate();
809       pthread_mutex_lock(&cache_lock);
810     }
812     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
813     if (status != 0 && status != ETIMEDOUT)
814     {
815       RRDD_LOG (LOG_ERR, "flush_thread_main: "
816                 "pthread_cond_timedwait returned %i.", status);
817     }
818   }
820   if (config_flush_at_shutdown)
821     flush_old_values (-1); /* flush everything */
823   pthread_mutex_unlock(&cache_lock);
825   return NULL;
826 } /* void *flush_thread_main */
828 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
830   pthread_mutex_lock (&cache_lock);
832   while (!do_shutdown
833          || (cache_queue_head != NULL && config_flush_at_shutdown))
834   {
835     cache_item_t *ci;
836     char *file;
837     char **values;
838     int values_num;
839     int status;
840     int i;
842     /* Now, check if there's something to store away. If not, wait until
843      * something comes in.  if we are shutting down, do not wait around.  */
844     if (cache_queue_head == NULL && !do_shutdown)
845     {
846       status = pthread_cond_wait (&queue_cond, &cache_lock);
847       if ((status != 0) && (status != ETIMEDOUT))
848       {
849         RRDD_LOG (LOG_ERR, "queue_thread_main: "
850             "pthread_cond_wait returned %i.", status);
851       }
852     }
854     /* Check if a value has arrived. This may be NULL if we timed out or there
855      * was an interrupt such as a signal. */
856     if (cache_queue_head == NULL)
857       continue;
859     ci = cache_queue_head;
861     /* copy the relevant parts */
862     file = strdup (ci->file);
863     if (file == NULL)
864     {
865       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
866       continue;
867     }
869     assert(ci->values != NULL);
870     assert(ci->values_num > 0);
872     values = ci->values;
873     values_num = ci->values_num;
875     wipe_ci_values(ci, time(NULL));
876     remove_from_queue(ci);
878     pthread_mutex_unlock (&cache_lock);
880     rrd_clear_error ();
881     status = rrd_update_r (file, NULL, values_num, (void *) values);
882     if (status != 0)
883     {
884       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
885           "rrd_update_r (%s) failed with status %i. (%s)",
886           file, status, rrd_get_error());
887     }
889     journal_write("wrote", file);
890     pthread_cond_broadcast(&ci->flushed);
892     for (i = 0; i < values_num; i++)
893       free (values[i]);
895     free(values);
896     free(file);
898     if (status == 0)
899     {
900       pthread_mutex_lock (&stats_lock);
901       stats_updates_written++;
902       stats_data_sets_written += values_num;
903       pthread_mutex_unlock (&stats_lock);
904     }
906     pthread_mutex_lock (&cache_lock);
907   }
908   pthread_mutex_unlock (&cache_lock);
910   return (NULL);
911 } /* }}} void *queue_thread_main */
913 static int buffer_get_field (char **buffer_ret, /* {{{ */
914     size_t *buffer_size_ret, char **field_ret)
916   char *buffer;
917   size_t buffer_pos;
918   size_t buffer_size;
919   char *field;
920   size_t field_size;
921   int status;
923   buffer = *buffer_ret;
924   buffer_pos = 0;
925   buffer_size = *buffer_size_ret;
926   field = *buffer_ret;
927   field_size = 0;
929   if (buffer_size <= 0)
930     return (-1);
932   /* This is ensured by `handle_request'. */
933   assert (buffer[buffer_size - 1] == '\0');
935   status = -1;
936   while (buffer_pos < buffer_size)
937   {
938     /* Check for end-of-field or end-of-buffer */
939     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
940     {
941       field[field_size] = 0;
942       field_size++;
943       buffer_pos++;
944       status = 0;
945       break;
946     }
947     /* Handle escaped characters. */
948     else if (buffer[buffer_pos] == '\\')
949     {
950       if (buffer_pos >= (buffer_size - 1))
951         break;
952       buffer_pos++;
953       field[field_size] = buffer[buffer_pos];
954       field_size++;
955       buffer_pos++;
956     }
957     /* Normal operation */ 
958     else
959     {
960       field[field_size] = buffer[buffer_pos];
961       field_size++;
962       buffer_pos++;
963     }
964   } /* while (buffer_pos < buffer_size) */
966   if (status != 0)
967     return (status);
969   *buffer_ret = buffer + buffer_pos;
970   *buffer_size_ret = buffer_size - buffer_pos;
971   *field_ret = field;
973   return (0);
974 } /* }}} int buffer_get_field */
976 /* if we're restricting writes to the base directory,
977  * check whether the file falls within the dir
978  * returns 1 if OK, otherwise 0
979  */
980 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
982   assert(file != NULL);
984   if (!config_write_base_only
985       || sock == NULL /* journal replay */
986       || config_base_dir == NULL)
987     return 1;
989   if (strstr(file, "../") != NULL) goto err;
991   /* relative paths without "../" are ok */
992   if (*file != '/') return 1;
994   /* file must be of the format base + "/" + <1+ char filename> */
995   if (strlen(file) < _config_base_dir_len + 2) goto err;
996   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
997   if (*(file + _config_base_dir_len) != '/') goto err;
999   return 1;
1001 err:
1002   if (sock != NULL && sock->fd >= 0)
1003     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1005   return 0;
1006 } /* }}} static int check_file_access */
1008 /* when using a base dir, convert relative paths to absolute paths.
1009  * if necessary, modifies the "filename" pointer to point
1010  * to the new path created in "tmp".  "tmp" is provided
1011  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1012  *
1013  * this allows us to optimize for the expected case (absolute path)
1014  * with a no-op.
1015  */
1016 static void get_abs_path(char **filename, char *tmp)
1018   assert(tmp != NULL);
1019   assert(filename != NULL && *filename != NULL);
1021   if (config_base_dir == NULL || **filename == '/')
1022     return;
1024   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1025   *filename = tmp;
1026 } /* }}} static int get_abs_path */
1028 /* returns 1 if we have the required privilege level,
1029  * otherwise issue an error to the user on sock */
1030 static int has_privilege (listen_socket_t *sock, /* {{{ */
1031                           socket_privilege priv)
1033   if (sock == NULL) /* journal replay */
1034     return 1;
1036   if (sock->privilege >= priv)
1037     return 1;
1039   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1040 } /* }}} static int has_privilege */
1042 static int flush_file (const char *filename) /* {{{ */
1044   cache_item_t *ci;
1046   pthread_mutex_lock (&cache_lock);
1048   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1049   if (ci == NULL)
1050   {
1051     pthread_mutex_unlock (&cache_lock);
1052     return (ENOENT);
1053   }
1055   if (ci->values_num > 0)
1056   {
1057     /* Enqueue at head */
1058     enqueue_cache_item (ci, HEAD);
1059     pthread_cond_wait(&ci->flushed, &cache_lock);
1060   }
1062   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1063    * may have been purged during our cond_wait() */
1065   pthread_mutex_unlock(&cache_lock);
1067   return (0);
1068 } /* }}} int flush_file */
1070 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1071     char *buffer, size_t buffer_size)
1073   int status;
1074   char **help_text;
1075   char *command;
1077   char *help_help[2] =
1078   {
1079     "Command overview\n"
1080     ,
1081     "HELP [<command>]\n"
1082     "FLUSH <filename>\n"
1083     "FLUSHALL\n"
1084     "PENDING <filename>\n"
1085     "FORGET <filename>\n"
1086     "QUEUE\n"
1087     "UPDATE <filename> <values> [<values> ...]\n"
1088     "BATCH\n"
1089     "STATS\n"
1090     "QUIT\n"
1091   };
1093   char *help_flush[2] =
1094   {
1095     "Help for FLUSH\n"
1096     ,
1097     "Usage: FLUSH <filename>\n"
1098     "\n"
1099     "Adds the given filename to the head of the update queue and returns\n"
1100     "after it has been dequeued.\n"
1101   };
1103   char *help_flushall[2] =
1104   {
1105     "Help for FLUSHALL\n"
1106     ,
1107     "Usage: FLUSHALL\n"
1108     "\n"
1109     "Triggers writing of all pending updates.  Returns immediately.\n"
1110   };
1112   char *help_pending[2] =
1113   {
1114     "Help for PENDING\n"
1115     ,
1116     "Usage: PENDING <filename>\n"
1117     "\n"
1118     "Shows any 'pending' updates for a file, in order.\n"
1119     "The updates shown have not yet been written to the underlying RRD file.\n"
1120   };
1122   char *help_forget[2] =
1123   {
1124     "Help for FORGET\n"
1125     ,
1126     "Usage: FORGET <filename>\n"
1127     "\n"
1128     "Removes the file completely from the cache.\n"
1129     "Any pending updates for the file will be lost.\n"
1130   };
1132   char *help_queue[2] =
1133   {
1134     "Help for QUEUE\n"
1135     ,
1136     "Shows all files in the output queue.\n"
1137     "The output is zero or more lines in the following format:\n"
1138     "(where <num_vals> is the number of values to be written)\n"
1139     "\n"
1140     "<num_vals> <filename>\n"
1141     "\n"
1142   };
1144   char *help_update[2] =
1145   {
1146     "Help for UPDATE\n"
1147     ,
1148     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1149     "\n"
1150     "Adds the given file to the internal cache if it is not yet known and\n"
1151     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1152     "for details.\n"
1153     "\n"
1154     "Each <values> has the following form:\n"
1155     "  <values> = <time>:<value>[:<value>[...]]\n"
1156     "See the rrdupdate(1) manpage for details.\n"
1157   };
1159   char *help_stats[2] =
1160   {
1161     "Help for STATS\n"
1162     ,
1163     "Usage: STATS\n"
1164     "\n"
1165     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1166     "a description of the values.\n"
1167   };
1169   char *help_batch[2] =
1170   {
1171     "Help for BATCH\n"
1172     ,
1173     "The 'BATCH' command permits the client to initiate a bulk load\n"
1174     "   of commands to rrdcached.\n"
1175     "\n"
1176     "Usage:\n"
1177     "\n"
1178     "    client: BATCH\n"
1179     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1180     "    client: command #1\n"
1181     "    client: command #2\n"
1182     "    client: ... and so on\n"
1183     "    client: .\n"
1184     "    server: 2 errors\n"
1185     "    server: 7 message for command #7\n"
1186     "    server: 9 message for command #9\n"
1187     "\n"
1188     "For more information, consult the rrdcached(1) documentation.\n"
1189   };
1191   char *help_quit[2] =
1192   {
1193     "Help for QUIT\n"
1194     ,
1195     "Disconnect from rrdcached.\n"
1196   };
1198   status = buffer_get_field (&buffer, &buffer_size, &command);
1199   if (status != 0)
1200     help_text = help_help;
1201   else
1202   {
1203     if (strcasecmp (command, "update") == 0)
1204       help_text = help_update;
1205     else if (strcasecmp (command, "flush") == 0)
1206       help_text = help_flush;
1207     else if (strcasecmp (command, "flushall") == 0)
1208       help_text = help_flushall;
1209     else if (strcasecmp (command, "pending") == 0)
1210       help_text = help_pending;
1211     else if (strcasecmp (command, "forget") == 0)
1212       help_text = help_forget;
1213     else if (strcasecmp (command, "queue") == 0)
1214       help_text = help_queue;
1215     else if (strcasecmp (command, "stats") == 0)
1216       help_text = help_stats;
1217     else if (strcasecmp (command, "batch") == 0)
1218       help_text = help_batch;
1219     else if (strcasecmp (command, "quit") == 0)
1220       help_text = help_quit;
1221     else
1222       help_text = help_help;
1223   }
1225   add_response_info(sock, help_text[1]);
1226   return send_response(sock, RESP_OK, help_text[0]);
1227 } /* }}} int handle_request_help */
1229 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1231   uint64_t copy_queue_length;
1232   uint64_t copy_updates_received;
1233   uint64_t copy_flush_received;
1234   uint64_t copy_updates_written;
1235   uint64_t copy_data_sets_written;
1236   uint64_t copy_journal_bytes;
1237   uint64_t copy_journal_rotate;
1239   uint64_t tree_nodes_number;
1240   uint64_t tree_depth;
1242   pthread_mutex_lock (&stats_lock);
1243   copy_queue_length       = stats_queue_length;
1244   copy_updates_received   = stats_updates_received;
1245   copy_flush_received     = stats_flush_received;
1246   copy_updates_written    = stats_updates_written;
1247   copy_data_sets_written  = stats_data_sets_written;
1248   copy_journal_bytes      = stats_journal_bytes;
1249   copy_journal_rotate     = stats_journal_rotate;
1250   pthread_mutex_unlock (&stats_lock);
1252   pthread_mutex_lock (&cache_lock);
1253   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1254   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1255   pthread_mutex_unlock (&cache_lock);
1257   add_response_info(sock,
1258                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1259   add_response_info(sock,
1260                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1261   add_response_info(sock,
1262                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1263   add_response_info(sock,
1264                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1265   add_response_info(sock,
1266                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1267   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1268   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1269   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1270   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1272   send_response(sock, RESP_OK, "Statistics follow\n");
1274   return (0);
1275 } /* }}} int handle_request_stats */
1277 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1278     char *buffer, size_t buffer_size)
1280   char *file, file_tmp[PATH_MAX];
1281   int status;
1283   status = buffer_get_field (&buffer, &buffer_size, &file);
1284   if (status != 0)
1285   {
1286     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1287   }
1288   else
1289   {
1290     pthread_mutex_lock(&stats_lock);
1291     stats_flush_received++;
1292     pthread_mutex_unlock(&stats_lock);
1294     get_abs_path(&file, file_tmp);
1295     if (!check_file_access(file, sock)) return 0;
1297     status = flush_file (file);
1298     if (status == 0)
1299       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1300     else if (status == ENOENT)
1301     {
1302       /* no file in our tree; see whether it exists at all */
1303       struct stat statbuf;
1305       memset(&statbuf, 0, sizeof(statbuf));
1306       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1307         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1308       else
1309         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1310     }
1311     else if (status < 0)
1312       return send_response(sock, RESP_ERR, "Internal error.\n");
1313     else
1314       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1315   }
1317   /* NOTREACHED */
1318   assert(1==0);
1319 } /* }}} int handle_request_flush */
1321 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1323   int status;
1325   status = has_privilege(sock, PRIV_HIGH);
1326   if (status <= 0)
1327     return status;
1329   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1331   pthread_mutex_lock(&cache_lock);
1332   flush_old_values(-1);
1333   pthread_mutex_unlock(&cache_lock);
1335   return send_response(sock, RESP_OK, "Started flush.\n");
1336 } /* }}} static int handle_request_flushall */
1338 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1339                                   char *buffer, size_t buffer_size)
1341   int status;
1342   char *file, file_tmp[PATH_MAX];
1343   cache_item_t *ci;
1345   status = buffer_get_field(&buffer, &buffer_size, &file);
1346   if (status != 0)
1347     return send_response(sock, RESP_ERR,
1348                          "Usage: PENDING <filename>\n");
1350   status = has_privilege(sock, PRIV_HIGH);
1351   if (status <= 0)
1352     return status;
1354   get_abs_path(&file, file_tmp);
1356   pthread_mutex_lock(&cache_lock);
1357   ci = g_tree_lookup(cache_tree, file);
1358   if (ci == NULL)
1359   {
1360     pthread_mutex_unlock(&cache_lock);
1361     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1362   }
1364   for (int i=0; i < ci->values_num; i++)
1365     add_response_info(sock, "%s\n", ci->values[i]);
1367   pthread_mutex_unlock(&cache_lock);
1368   return send_response(sock, RESP_OK, "updates pending\n");
1369 } /* }}} static int handle_request_pending */
1371 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1372                                  char *buffer, size_t buffer_size)
1374   int status;
1375   gboolean found;
1376   char *file, file_tmp[PATH_MAX];
1378   status = buffer_get_field(&buffer, &buffer_size, &file);
1379   if (status != 0)
1380     return send_response(sock, RESP_ERR,
1381                          "Usage: FORGET <filename>\n");
1383   status = has_privilege(sock, PRIV_HIGH);
1384   if (status <= 0)
1385     return status;
1387   get_abs_path(&file, file_tmp);
1388   if (!check_file_access(file, sock)) return 0;
1390   pthread_mutex_lock(&cache_lock);
1391   found = g_tree_remove(cache_tree, file);
1392   pthread_mutex_unlock(&cache_lock);
1394   if (found == TRUE)
1395   {
1396     if (sock != NULL)
1397       journal_write("forget", file);
1399     return send_response(sock, RESP_OK, "Gone!\n");
1400   }
1401   else
1402     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1404   /* NOTREACHED */
1405   assert(1==0);
1406 } /* }}} static int handle_request_forget */
1408 static int handle_request_queue (listen_socket_t *sock) /* {{{ */
1410   cache_item_t *ci;
1412   pthread_mutex_lock(&cache_lock);
1414   ci = cache_queue_head;
1415   while (ci != NULL)
1416   {
1417     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1418     ci = ci->next;
1419   }
1421   pthread_mutex_unlock(&cache_lock);
1423   return send_response(sock, RESP_OK, "in queue.\n");
1424 } /* }}} int handle_request_queue */
1426 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1427                                   time_t now,
1428                                   char *buffer, size_t buffer_size)
1430   char *file, file_tmp[PATH_MAX];
1431   int values_num = 0;
1432   int status;
1433   char orig_buf[CMD_MAX];
1435   cache_item_t *ci;
1437   status = has_privilege(sock, PRIV_HIGH);
1438   if (status <= 0)
1439     return status;
1441   /* save it for the journal later */
1442   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1444   status = buffer_get_field (&buffer, &buffer_size, &file);
1445   if (status != 0)
1446     return send_response(sock, RESP_ERR,
1447                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1449   pthread_mutex_lock(&stats_lock);
1450   stats_updates_received++;
1451   pthread_mutex_unlock(&stats_lock);
1453   get_abs_path(&file, file_tmp);
1454   if (!check_file_access(file, sock)) return 0;
1456   pthread_mutex_lock (&cache_lock);
1457   ci = g_tree_lookup (cache_tree, file);
1459   if (ci == NULL) /* {{{ */
1460   {
1461     struct stat statbuf;
1463     /* don't hold the lock while we setup; stat(2) might block */
1464     pthread_mutex_unlock(&cache_lock);
1466     memset (&statbuf, 0, sizeof (statbuf));
1467     status = stat (file, &statbuf);
1468     if (status != 0)
1469     {
1470       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1472       status = errno;
1473       if (status == ENOENT)
1474         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1475       else
1476         return send_response(sock, RESP_ERR,
1477                              "stat failed with error %i.\n", status);
1478     }
1479     if (!S_ISREG (statbuf.st_mode))
1480       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1482     if (access(file, R_OK|W_OK) != 0)
1483       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1484                            file, rrd_strerror(errno));
1486     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1487     if (ci == NULL)
1488     {
1489       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1491       return send_response(sock, RESP_ERR, "malloc failed.\n");
1492     }
1493     memset (ci, 0, sizeof (cache_item_t));
1495     ci->file = strdup (file);
1496     if (ci->file == NULL)
1497     {
1498       free (ci);
1499       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1501       return send_response(sock, RESP_ERR, "strdup failed.\n");
1502     }
1504     wipe_ci_values(ci, now);
1505     ci->flags = CI_FLAGS_IN_TREE;
1506     pthread_cond_init(&ci->flushed, NULL);
1508     pthread_mutex_lock(&cache_lock);
1509     g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1510   } /* }}} */
1511   assert (ci != NULL);
1513   /* don't re-write updates in replay mode */
1514   if (sock != NULL)
1515     journal_write("update", orig_buf);
1517   while (buffer_size > 0)
1518   {
1519     char **temp;
1520     char *value;
1521     time_t stamp;
1522     char *eostamp;
1524     status = buffer_get_field (&buffer, &buffer_size, &value);
1525     if (status != 0)
1526     {
1527       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1528       break;
1529     }
1531     /* make sure update time is always moving forward */
1532     stamp = strtol(value, &eostamp, 10);
1533     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1534     {
1535       pthread_mutex_unlock(&cache_lock);
1536       return send_response(sock, RESP_ERR,
1537                            "Cannot find timestamp in '%s'!\n", value);
1538     }
1539     else if (stamp <= ci->last_update_stamp)
1540     {
1541       pthread_mutex_unlock(&cache_lock);
1542       return send_response(sock, RESP_ERR,
1543                            "illegal attempt to update using time %ld when last"
1544                            " update time is %ld (minimum one second step)\n",
1545                            stamp, ci->last_update_stamp);
1546     }
1547     else
1548       ci->last_update_stamp = stamp;
1550     temp = (char **) rrd_realloc (ci->values,
1551         sizeof (char *) * (ci->values_num + 1));
1552     if (temp == NULL)
1553     {
1554       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1555       continue;
1556     }
1557     ci->values = temp;
1559     ci->values[ci->values_num] = strdup (value);
1560     if (ci->values[ci->values_num] == NULL)
1561     {
1562       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1563       continue;
1564     }
1565     ci->values_num++;
1567     values_num++;
1568   }
1570   if (((now - ci->last_flush_time) >= config_write_interval)
1571       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1572       && (ci->values_num > 0))
1573   {
1574     enqueue_cache_item (ci, TAIL);
1575   }
1577   pthread_mutex_unlock (&cache_lock);
1579   if (values_num < 1)
1580     return send_response(sock, RESP_ERR, "No values updated.\n");
1581   else
1582     return send_response(sock, RESP_OK,
1583                          "errors, enqueued %i value(s).\n", values_num);
1585   /* NOTREACHED */
1586   assert(1==0);
1588 } /* }}} int handle_request_update */
1590 /* we came across a "WROTE" entry during journal replay.
1591  * throw away any values that we have accumulated for this file
1592  */
1593 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1595   int i;
1596   cache_item_t *ci;
1597   const char *file = buffer;
1599   pthread_mutex_lock(&cache_lock);
1601   ci = g_tree_lookup(cache_tree, file);
1602   if (ci == NULL)
1603   {
1604     pthread_mutex_unlock(&cache_lock);
1605     return (0);
1606   }
1608   if (ci->values)
1609   {
1610     for (i=0; i < ci->values_num; i++)
1611       free(ci->values[i]);
1613     free(ci->values);
1614   }
1616   wipe_ci_values(ci, now);
1617   remove_from_queue(ci);
1619   pthread_mutex_unlock(&cache_lock);
1620   return (0);
1621 } /* }}} int handle_request_wrote */
1623 /* start "BATCH" processing */
1624 static int batch_start (listen_socket_t *sock) /* {{{ */
1626   int status;
1627   if (sock->batch_start)
1628     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1630   status = send_response(sock, RESP_OK,
1631                          "Go ahead.  End with dot '.' on its own line.\n");
1632   sock->batch_start = time(NULL);
1633   sock->batch_cmd = 0;
1635   return status;
1636 } /* }}} static int batch_start */
1638 /* finish "BATCH" processing and return results to the client */
1639 static int batch_done (listen_socket_t *sock) /* {{{ */
1641   assert(sock->batch_start);
1642   sock->batch_start = 0;
1643   sock->batch_cmd  = 0;
1644   return send_response(sock, RESP_OK, "errors\n");
1645 } /* }}} static int batch_done */
1647 /* if sock==NULL, we are in journal replay mode */
1648 static int handle_request (listen_socket_t *sock, /* {{{ */
1649                            time_t now,
1650                            char *buffer, size_t buffer_size)
1652   char *buffer_ptr;
1653   char *command;
1654   int status;
1656   assert (buffer[buffer_size - 1] == '\0');
1658   buffer_ptr = buffer;
1659   command = NULL;
1660   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1661   if (status != 0)
1662   {
1663     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1664     return (-1);
1665   }
1667   if (sock != NULL && sock->batch_start)
1668     sock->batch_cmd++;
1670   if (strcasecmp (command, "update") == 0)
1671     return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1672   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1673   {
1674     /* this is only valid in replay mode */
1675     return (handle_request_wrote (buffer_ptr, now));
1676   }
1677   else if (strcasecmp (command, "flush") == 0)
1678     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1679   else if (strcasecmp (command, "flushall") == 0)
1680     return (handle_request_flushall(sock));
1681   else if (strcasecmp (command, "pending") == 0)
1682     return (handle_request_pending(sock, buffer_ptr, buffer_size));
1683   else if (strcasecmp (command, "forget") == 0)
1684     return (handle_request_forget(sock, buffer_ptr, buffer_size));
1685   else if (strcasecmp (command, "queue") == 0)
1686     return (handle_request_queue(sock));
1687   else if (strcasecmp (command, "stats") == 0)
1688     return (handle_request_stats (sock));
1689   else if (strcasecmp (command, "help") == 0)
1690     return (handle_request_help (sock, buffer_ptr, buffer_size));
1691   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1692     return batch_start(sock);
1693   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1694     return batch_done(sock);
1695   else if (strcasecmp (command, "quit") == 0)
1696     return -1;
1697   else
1698     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1700   /* NOTREACHED */
1701   assert(1==0);
1702 } /* }}} int handle_request */
1704 /* MUST NOT hold journal_lock before calling this */
1705 static void journal_rotate(void) /* {{{ */
1707   FILE *old_fh = NULL;
1708   int new_fd;
1710   if (journal_cur == NULL || journal_old == NULL)
1711     return;
1713   pthread_mutex_lock(&journal_lock);
1715   /* we rotate this way (rename before close) so that the we can release
1716    * the journal lock as fast as possible.  Journal writes to the new
1717    * journal can proceed immediately after the new file is opened.  The
1718    * fclose can then block without affecting new updates.
1719    */
1720   if (journal_fh != NULL)
1721   {
1722     old_fh = journal_fh;
1723     journal_fh = NULL;
1724     rename(journal_cur, journal_old);
1725     ++stats_journal_rotate;
1726   }
1728   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1729                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1730   if (new_fd >= 0)
1731   {
1732     journal_fh = fdopen(new_fd, "a");
1733     if (journal_fh == NULL)
1734       close(new_fd);
1735   }
1737   pthread_mutex_unlock(&journal_lock);
1739   if (old_fh != NULL)
1740     fclose(old_fh);
1742   if (journal_fh == NULL)
1743   {
1744     RRDD_LOG(LOG_CRIT,
1745              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1746              journal_cur, rrd_strerror(errno));
1748     RRDD_LOG(LOG_ERR,
1749              "JOURNALING DISABLED: All values will be flushed at shutdown");
1750     config_flush_at_shutdown = 1;
1751   }
1753 } /* }}} static void journal_rotate */
1755 static void journal_done(void) /* {{{ */
1757   if (journal_cur == NULL)
1758     return;
1760   pthread_mutex_lock(&journal_lock);
1761   if (journal_fh != NULL)
1762   {
1763     fclose(journal_fh);
1764     journal_fh = NULL;
1765   }
1767   if (config_flush_at_shutdown)
1768   {
1769     RRDD_LOG(LOG_INFO, "removing journals");
1770     unlink(journal_old);
1771     unlink(journal_cur);
1772   }
1773   else
1774   {
1775     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1776              "journals will be used at next startup");
1777   }
1779   pthread_mutex_unlock(&journal_lock);
1781 } /* }}} static void journal_done */
1783 static int journal_write(char *cmd, char *args) /* {{{ */
1785   int chars;
1787   if (journal_fh == NULL)
1788     return 0;
1790   pthread_mutex_lock(&journal_lock);
1791   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1792   pthread_mutex_unlock(&journal_lock);
1794   if (chars > 0)
1795   {
1796     pthread_mutex_lock(&stats_lock);
1797     stats_journal_bytes += chars;
1798     pthread_mutex_unlock(&stats_lock);
1799   }
1801   return chars;
1802 } /* }}} static int journal_write */
1804 static int journal_replay (const char *file) /* {{{ */
1806   FILE *fh;
1807   int entry_cnt = 0;
1808   int fail_cnt = 0;
1809   uint64_t line = 0;
1810   char entry[CMD_MAX];
1811   time_t now;
1813   if (file == NULL) return 0;
1815   {
1816     char *reason = "unknown error";
1817     int status = 0;
1818     struct stat statbuf;
1820     memset(&statbuf, 0, sizeof(statbuf));
1821     if (stat(file, &statbuf) != 0)
1822     {
1823       if (errno == ENOENT)
1824         return 0;
1826       reason = "stat error";
1827       status = errno;
1828     }
1829     else if (!S_ISREG(statbuf.st_mode))
1830     {
1831       reason = "not a regular file";
1832       status = EPERM;
1833     }
1834     if (statbuf.st_uid != daemon_uid)
1835     {
1836       reason = "not owned by daemon user";
1837       status = EACCES;
1838     }
1839     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1840     {
1841       reason = "must not be user/group writable";
1842       status = EACCES;
1843     }
1845     if (status != 0)
1846     {
1847       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1848                file, rrd_strerror(status), reason);
1849       return 0;
1850     }
1851   }
1853   fh = fopen(file, "r");
1854   if (fh == NULL)
1855   {
1856     if (errno != ENOENT)
1857       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1858                file, rrd_strerror(errno));
1859     return 0;
1860   }
1861   else
1862     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1864   now = time(NULL);
1866   while(!feof(fh))
1867   {
1868     size_t entry_len;
1870     ++line;
1871     if (fgets(entry, sizeof(entry), fh) == NULL)
1872       break;
1873     entry_len = strlen(entry);
1875     /* check \n termination in case journal writing crashed mid-line */
1876     if (entry_len == 0)
1877       continue;
1878     else if (entry[entry_len - 1] != '\n')
1879     {
1880       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1881       ++fail_cnt;
1882       continue;
1883     }
1885     entry[entry_len - 1] = '\0';
1887     if (handle_request(NULL, now, entry, entry_len) == 0)
1888       ++entry_cnt;
1889     else
1890       ++fail_cnt;
1891   }
1893   fclose(fh);
1895   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1896            entry_cnt, fail_cnt);
1898   return entry_cnt > 0 ? 1 : 0;
1899 } /* }}} static int journal_replay */
1901 static void journal_init(void) /* {{{ */
1903   int had_journal = 0;
1905   if (journal_cur == NULL) return;
1907   pthread_mutex_lock(&journal_lock);
1909   RRDD_LOG(LOG_INFO, "checking for journal files");
1911   had_journal += journal_replay(journal_old);
1912   had_journal += journal_replay(journal_cur);
1914   /* it must have been a crash.  start a flush */
1915   if (had_journal && config_flush_at_shutdown)
1916     flush_old_values(-1);
1918   pthread_mutex_unlock(&journal_lock);
1919   journal_rotate();
1921   RRDD_LOG(LOG_INFO, "journal processing complete");
1923 } /* }}} static void journal_init */
1925 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1927   assert(sock != NULL);
1929   free(sock->rbuf);  sock->rbuf = NULL;
1930   free(sock->wbuf);  sock->wbuf = NULL;
1931   free(sock);
1932 } /* }}} void free_listen_socket */
1934 static void close_connection(listen_socket_t *sock) /* {{{ */
1936   if (sock->fd >= 0)
1937   {
1938     close(sock->fd);
1939     sock->fd = -1;
1940   }
1942   free_listen_socket(sock);
1944 } /* }}} void close_connection */
1946 static void *connection_thread_main (void *args) /* {{{ */
1948   listen_socket_t *sock;
1949   int i;
1950   int fd;
1952   sock = (listen_socket_t *) args;
1953   fd = sock->fd;
1955   /* init read buffers */
1956   sock->next_read = sock->next_cmd = 0;
1957   sock->rbuf = malloc(RBUF_SIZE);
1958   if (sock->rbuf == NULL)
1959   {
1960     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1961     close_connection(sock);
1962     return NULL;
1963   }
1965   pthread_mutex_lock (&connection_threads_lock);
1966   {
1967     pthread_t *temp;
1969     temp = (pthread_t *) rrd_realloc (connection_threads,
1970         sizeof (pthread_t) * (connection_threads_num + 1));
1971     if (temp == NULL)
1972     {
1973       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc(++) failed.");
1974     }
1975     else
1976     {
1977       connection_threads = temp;
1978       connection_threads[connection_threads_num] = pthread_self ();
1979       connection_threads_num++;
1980     }
1981   }
1982   pthread_mutex_unlock (&connection_threads_lock);
1984   while (do_shutdown == 0)
1985   {
1986     char *cmd;
1987     ssize_t cmd_len;
1988     ssize_t rbytes;
1989     time_t now;
1991     struct pollfd pollfd;
1992     int status;
1994     pollfd.fd = fd;
1995     pollfd.events = POLLIN | POLLPRI;
1996     pollfd.revents = 0;
1998     status = poll (&pollfd, 1, /* timeout = */ 500);
1999     if (do_shutdown)
2000       break;
2001     else if (status == 0) /* timeout */
2002       continue;
2003     else if (status < 0) /* error */
2004     {
2005       status = errno;
2006       if (status != EINTR)
2007         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2008       continue;
2009     }
2011     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2012       break;
2013     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2014     {
2015       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2016           "poll(2) returned something unexpected: %#04hx",
2017           pollfd.revents);
2018       break;
2019     }
2021     rbytes = read(fd, sock->rbuf + sock->next_read,
2022                   RBUF_SIZE - sock->next_read);
2023     if (rbytes < 0)
2024     {
2025       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2026       break;
2027     }
2028     else if (rbytes == 0)
2029       break; /* eof */
2031     sock->next_read += rbytes;
2033     if (sock->batch_start)
2034       now = sock->batch_start;
2035     else
2036       now = time(NULL);
2038     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2039     {
2040       status = handle_request (sock, now, cmd, cmd_len+1);
2041       if (status != 0)
2042         goto out_close;
2043     }
2044   }
2046 out_close:
2047   close_connection(sock);
2049   /* Remove this thread from the connection threads list */
2050   pthread_mutex_lock (&connection_threads_lock);
2051   {
2052     pthread_t self;
2053     pthread_t *temp;
2055     /* Find out own index in the array */
2056     self = pthread_self ();
2057     for (i = 0; i < connection_threads_num; i++)
2058       if (pthread_equal (connection_threads[i], self) != 0)
2059         break;
2060     assert (i < connection_threads_num);
2062     /* Move the trailing threads forward. */
2063     if (i < (connection_threads_num - 1))
2064     {
2065       memmove (connection_threads + i,
2066                connection_threads + i + 1,
2067                sizeof (pthread_t) * (connection_threads_num - i - 1));
2068     }
2070     connection_threads_num--;
2072     temp = rrd_realloc(connection_threads,
2073                    sizeof(*connection_threads) * connection_threads_num);
2074     if (connection_threads_num > 0 && temp == NULL)
2075       RRDD_LOG(LOG_ERR, "connection_thread_main: realloc(--) failed.");
2076     else
2077       connection_threads = temp;
2078   }
2079   pthread_mutex_unlock (&connection_threads_lock);
2081   return (NULL);
2082 } /* }}} void *connection_thread_main */
2084 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2086   int fd;
2087   struct sockaddr_un sa;
2088   listen_socket_t *temp;
2089   int status;
2090   const char *path;
2092   path = sock->addr;
2093   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2094     path += strlen("unix:");
2096   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2097       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2098   if (temp == NULL)
2099   {
2100     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2101     return (-1);
2102   }
2103   listen_fds = temp;
2104   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2106   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2107   if (fd < 0)
2108   {
2109     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2110              rrd_strerror(errno));
2111     return (-1);
2112   }
2114   memset (&sa, 0, sizeof (sa));
2115   sa.sun_family = AF_UNIX;
2116   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2118   /* if we've gotten this far, we own the pid file.  any daemon started
2119    * with the same args must not be alive.  therefore, ensure that we can
2120    * create the socket...
2121    */
2122   unlink(path);
2124   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2125   if (status != 0)
2126   {
2127     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2128              path, rrd_strerror(errno));
2129     close (fd);
2130     return (-1);
2131   }
2133   status = listen (fd, /* backlog = */ 10);
2134   if (status != 0)
2135   {
2136     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2137              path, rrd_strerror(errno));
2138     close (fd);
2139     unlink (path);
2140     return (-1);
2141   }
2143   listen_fds[listen_fds_num].fd = fd;
2144   listen_fds[listen_fds_num].family = PF_UNIX;
2145   strncpy(listen_fds[listen_fds_num].addr, path,
2146           sizeof (listen_fds[listen_fds_num].addr) - 1);
2147   listen_fds_num++;
2149   return (0);
2150 } /* }}} int open_listen_socket_unix */
2152 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2154   struct addrinfo ai_hints;
2155   struct addrinfo *ai_res;
2156   struct addrinfo *ai_ptr;
2157   char addr_copy[NI_MAXHOST];
2158   char *addr;
2159   char *port;
2160   int status;
2162   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2163   addr_copy[sizeof (addr_copy) - 1] = 0;
2164   addr = addr_copy;
2166   memset (&ai_hints, 0, sizeof (ai_hints));
2167   ai_hints.ai_flags = 0;
2168 #ifdef AI_ADDRCONFIG
2169   ai_hints.ai_flags |= AI_ADDRCONFIG;
2170 #endif
2171   ai_hints.ai_family = AF_UNSPEC;
2172   ai_hints.ai_socktype = SOCK_STREAM;
2174   port = NULL;
2175   if (*addr == '[') /* IPv6+port format */
2176   {
2177     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2178     addr++;
2180     port = strchr (addr, ']');
2181     if (port == NULL)
2182     {
2183       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2184       return (-1);
2185     }
2186     *port = 0;
2187     port++;
2189     if (*port == ':')
2190       port++;
2191     else if (*port == 0)
2192       port = NULL;
2193     else
2194     {
2195       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2196       return (-1);
2197     }
2198   } /* if (*addr = ']') */
2199   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2200   {
2201     port = rindex(addr, ':');
2202     if (port != NULL)
2203     {
2204       *port = 0;
2205       port++;
2206     }
2207   }
2208   ai_res = NULL;
2209   status = getaddrinfo (addr,
2210                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2211                         &ai_hints, &ai_res);
2212   if (status != 0)
2213   {
2214     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2215              addr, gai_strerror (status));
2216     return (-1);
2217   }
2219   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2220   {
2221     int fd;
2222     listen_socket_t *temp;
2223     int one = 1;
2225     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2226         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2227     if (temp == NULL)
2228     {
2229       fprintf (stderr,
2230                "rrdcached: open_listen_socket_network: realloc failed.\n");
2231       continue;
2232     }
2233     listen_fds = temp;
2234     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2236     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2237     if (fd < 0)
2238     {
2239       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2240                rrd_strerror(errno));
2241       continue;
2242     }
2244     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2246     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2247     if (status != 0)
2248     {
2249       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2250                sock->addr, rrd_strerror(errno));
2251       close (fd);
2252       continue;
2253     }
2255     status = listen (fd, /* backlog = */ 10);
2256     if (status != 0)
2257     {
2258       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2259                sock->addr, rrd_strerror(errno));
2260       close (fd);
2261       freeaddrinfo(ai_res);
2262       return (-1);
2263     }
2265     listen_fds[listen_fds_num].fd = fd;
2266     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2267     listen_fds_num++;
2268   } /* for (ai_ptr) */
2270   freeaddrinfo(ai_res);
2271   return (0);
2272 } /* }}} static int open_listen_socket_network */
2274 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2276   assert(sock != NULL);
2277   assert(sock->addr != NULL);
2279   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2280       || sock->addr[0] == '/')
2281     return (open_listen_socket_unix(sock));
2282   else
2283     return (open_listen_socket_network(sock));
2284 } /* }}} int open_listen_socket */
2286 static int close_listen_sockets (void) /* {{{ */
2288   size_t i;
2290   for (i = 0; i < listen_fds_num; i++)
2291   {
2292     close (listen_fds[i].fd);
2294     if (listen_fds[i].family == PF_UNIX)
2295       unlink(listen_fds[i].addr);
2296   }
2298   free (listen_fds);
2299   listen_fds = NULL;
2300   listen_fds_num = 0;
2302   return (0);
2303 } /* }}} int close_listen_sockets */
2305 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2307   struct pollfd *pollfds;
2308   int pollfds_num;
2309   int status;
2310   int i;
2312   if (listen_fds_num < 1)
2313   {
2314     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2315     return (NULL);
2316   }
2318   pollfds_num = listen_fds_num;
2319   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2320   if (pollfds == NULL)
2321   {
2322     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2323     return (NULL);
2324   }
2325   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2327   RRDD_LOG(LOG_INFO, "listening for connections");
2329   while (do_shutdown == 0)
2330   {
2331     for (i = 0; i < pollfds_num; i++)
2332     {
2333       pollfds[i].fd = listen_fds[i].fd;
2334       pollfds[i].events = POLLIN | POLLPRI;
2335       pollfds[i].revents = 0;
2336     }
2338     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2339     if (do_shutdown)
2340       break;
2341     else if (status == 0) /* timeout */
2342       continue;
2343     else if (status < 0) /* error */
2344     {
2345       status = errno;
2346       if (status != EINTR)
2347       {
2348         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2349       }
2350       continue;
2351     }
2353     for (i = 0; i < pollfds_num; i++)
2354     {
2355       listen_socket_t *client_sock;
2356       struct sockaddr_storage client_sa;
2357       socklen_t client_sa_size;
2358       pthread_t tid;
2359       pthread_attr_t attr;
2361       if (pollfds[i].revents == 0)
2362         continue;
2364       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2365       {
2366         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2367             "poll(2) returned something unexpected for listen FD #%i.",
2368             pollfds[i].fd);
2369         continue;
2370       }
2372       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2373       if (client_sock == NULL)
2374       {
2375         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2376         continue;
2377       }
2378       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2380       client_sa_size = sizeof (client_sa);
2381       client_sock->fd = accept (pollfds[i].fd,
2382           (struct sockaddr *) &client_sa, &client_sa_size);
2383       if (client_sock->fd < 0)
2384       {
2385         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2386         free(client_sock);
2387         continue;
2388       }
2390       pthread_attr_init (&attr);
2391       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2393       status = pthread_create (&tid, &attr, connection_thread_main,
2394                                client_sock);
2395       if (status != 0)
2396       {
2397         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2398         close_connection(client_sock);
2399         continue;
2400       }
2401     } /* for (pollfds_num) */
2402   } /* while (do_shutdown == 0) */
2404   RRDD_LOG(LOG_INFO, "starting shutdown");
2406   close_listen_sockets ();
2408   pthread_mutex_lock (&connection_threads_lock);
2409   while (connection_threads_num > 0)
2410   {
2411     pthread_t wait_for;
2413     wait_for = connection_threads[0];
2415     pthread_mutex_unlock (&connection_threads_lock);
2416     pthread_join (wait_for, /* retval = */ NULL);
2417     pthread_mutex_lock (&connection_threads_lock);
2418   }
2419   pthread_mutex_unlock (&connection_threads_lock);
2421   free(pollfds);
2423   return (NULL);
2424 } /* }}} void *listen_thread_main */
2426 static int daemonize (void) /* {{{ */
2428   int pid_fd;
2429   char *base_dir;
2431   daemon_uid = geteuid();
2433   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2434   if (pid_fd < 0)
2435     pid_fd = check_pidfile();
2436   if (pid_fd < 0)
2437     return pid_fd;
2439   /* open all the listen sockets */
2440   if (config_listen_address_list_len > 0)
2441   {
2442     for (int i = 0; i < config_listen_address_list_len; i++)
2443     {
2444       open_listen_socket (config_listen_address_list[i]);
2445       free_listen_socket (config_listen_address_list[i]);
2446     }
2448     free(config_listen_address_list);
2449   }
2450   else
2451   {
2452     listen_socket_t sock;
2453     memset(&sock, 0, sizeof(sock));
2454     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2455     open_listen_socket (&sock);
2456   }
2458   if (listen_fds_num < 1)
2459   {
2460     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2461     goto error;
2462   }
2464   if (!stay_foreground)
2465   {
2466     pid_t child;
2468     child = fork ();
2469     if (child < 0)
2470     {
2471       fprintf (stderr, "daemonize: fork(2) failed.\n");
2472       goto error;
2473     }
2474     else if (child > 0)
2475       exit(0);
2477     /* Become session leader */
2478     setsid ();
2480     /* Open the first three file descriptors to /dev/null */
2481     close (2);
2482     close (1);
2483     close (0);
2485     open ("/dev/null", O_RDWR);
2486     dup (0);
2487     dup (0);
2488   } /* if (!stay_foreground) */
2490   /* Change into the /tmp directory. */
2491   base_dir = (config_base_dir != NULL)
2492     ? config_base_dir
2493     : "/tmp";
2495   if (chdir (base_dir) != 0)
2496   {
2497     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2498     goto error;
2499   }
2501   install_signal_handlers();
2503   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2504   RRDD_LOG(LOG_INFO, "starting up");
2506   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2507                                 (GDestroyNotify) free_cache_item);
2508   if (cache_tree == NULL)
2509   {
2510     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2511     goto error;
2512   }
2514   return write_pidfile (pid_fd);
2516 error:
2517   remove_pidfile();
2518   return -1;
2519 } /* }}} int daemonize */
2521 static int cleanup (void) /* {{{ */
2523   do_shutdown++;
2525   pthread_cond_broadcast (&flush_cond);
2526   pthread_join (flush_thread, NULL);
2528   pthread_cond_broadcast (&queue_cond);
2529   for (int i = 0; i < config_queue_threads; i++)
2530     pthread_join (queue_threads[i], NULL);
2532   if (config_flush_at_shutdown)
2533   {
2534     assert(cache_queue_head == NULL);
2535     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2536   }
2538   journal_done();
2539   remove_pidfile ();
2541   free(queue_threads);
2542   free(config_base_dir);
2543   free(config_pid_file);
2544   free(journal_cur);
2545   free(journal_old);
2547   pthread_mutex_lock(&cache_lock);
2548   g_tree_destroy(cache_tree);
2550   RRDD_LOG(LOG_INFO, "goodbye");
2551   closelog ();
2553   return (0);
2554 } /* }}} int cleanup */
2556 static int read_options (int argc, char **argv) /* {{{ */
2558   int option;
2559   int status = 0;
2561   while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
2562   {
2563     switch (option)
2564     {
2565       case 'g':
2566         stay_foreground=1;
2567         break;
2569       case 'L':
2570       case 'l':
2571       {
2572         listen_socket_t **temp;
2573         listen_socket_t *new;
2575         new = malloc(sizeof(listen_socket_t));
2576         if (new == NULL)
2577         {
2578           fprintf(stderr, "read_options: malloc failed.\n");
2579           return(2);
2580         }
2581         memset(new, 0, sizeof(listen_socket_t));
2583         temp = (listen_socket_t **) rrd_realloc (config_listen_address_list,
2584             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2585         if (temp == NULL)
2586         {
2587           fprintf (stderr, "read_options: realloc failed.\n");
2588           return (2);
2589         }
2590         config_listen_address_list = temp;
2592         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2593         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2595         temp[config_listen_address_list_len] = new;
2596         config_listen_address_list_len++;
2597       }
2598       break;
2600       case 'f':
2601       {
2602         int temp;
2604         temp = atoi (optarg);
2605         if (temp > 0)
2606           config_flush_interval = temp;
2607         else
2608         {
2609           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2610           status = 3;
2611         }
2612       }
2613       break;
2615       case 'w':
2616       {
2617         int temp;
2619         temp = atoi (optarg);
2620         if (temp > 0)
2621           config_write_interval = temp;
2622         else
2623         {
2624           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2625           status = 2;
2626         }
2627       }
2628       break;
2630       case 'z':
2631       {
2632         int temp;
2634         temp = atoi(optarg);
2635         if (temp > 0)
2636           config_write_jitter = temp;
2637         else
2638         {
2639           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2640           status = 2;
2641         }
2643         break;
2644       }
2646       case 't':
2647       {
2648         int threads;
2649         threads = atoi(optarg);
2650         if (threads >= 1)
2651           config_queue_threads = threads;
2652         else
2653         {
2654           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2655           return 1;
2656         }
2657       }
2658       break;
2660       case 'B':
2661         config_write_base_only = 1;
2662         break;
2664       case 'b':
2665       {
2666         size_t len;
2667         char base_realpath[PATH_MAX];
2669         if (config_base_dir != NULL)
2670           free (config_base_dir);
2671         config_base_dir = strdup (optarg);
2672         if (config_base_dir == NULL)
2673         {
2674           fprintf (stderr, "read_options: strdup failed.\n");
2675           return (3);
2676         }
2678         /* make sure that the base directory is not resolved via
2679          * symbolic links.  this makes some performance-enhancing
2680          * assumptions possible (we don't have to resolve paths
2681          * that start with a "/")
2682          */
2683         if (realpath(config_base_dir, base_realpath) == NULL)
2684         {
2685           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2686           return 5;
2687         }
2688         else if (strncmp(config_base_dir,
2689                          base_realpath, sizeof(base_realpath)) != 0)
2690         {
2691           fprintf(stderr,
2692                   "Base directory (-b) resolved via file system links!\n"
2693                   "Please consult rrdcached '-b' documentation!\n"
2694                   "Consider specifying the real directory (%s)\n",
2695                   base_realpath);
2696           return 5;
2697         }
2699         len = strlen (config_base_dir);
2700         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2701         {
2702           config_base_dir[len - 1] = 0;
2703           len--;
2704         }
2706         if (len < 1)
2707         {
2708           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2709           return (4);
2710         }
2712         _config_base_dir_len = len;
2713       }
2714       break;
2716       case 'p':
2717       {
2718         if (config_pid_file != NULL)
2719           free (config_pid_file);
2720         config_pid_file = strdup (optarg);
2721         if (config_pid_file == NULL)
2722         {
2723           fprintf (stderr, "read_options: strdup failed.\n");
2724           return (3);
2725         }
2726       }
2727       break;
2729       case 'F':
2730         config_flush_at_shutdown = 1;
2731         break;
2733       case 'j':
2734       {
2735         struct stat statbuf;
2736         const char *dir = optarg;
2738         status = stat(dir, &statbuf);
2739         if (status != 0)
2740         {
2741           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2742           return 6;
2743         }
2745         if (!S_ISDIR(statbuf.st_mode)
2746             || access(dir, R_OK|W_OK|X_OK) != 0)
2747         {
2748           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2749                   errno ? rrd_strerror(errno) : "");
2750           return 6;
2751         }
2753         journal_cur = malloc(PATH_MAX + 1);
2754         journal_old = malloc(PATH_MAX + 1);
2755         if (journal_cur == NULL || journal_old == NULL)
2756         {
2757           fprintf(stderr, "malloc failure for journal files\n");
2758           return 6;
2759         }
2760         else 
2761         {
2762           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2763           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2764         }
2765       }
2766       break;
2768       case 'h':
2769       case '?':
2770         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2771             "\n"
2772             "Usage: rrdcached [options]\n"
2773             "\n"
2774             "Valid options are:\n"
2775             "  -l <address>  Socket address to listen to.\n"
2776             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2777             "  -w <seconds>  Interval in which to write data.\n"
2778             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2779             "  -t <threads>  Number of write threads.\n"
2780             "  -f <seconds>  Interval in which to flush dead data.\n"
2781             "  -p <file>     Location of the PID-file.\n"
2782             "  -b <dir>      Base directory to change to.\n"
2783             "  -B            Restrict file access to paths within -b <dir>\n"
2784             "  -g            Do not fork and run in the foreground.\n"
2785             "  -j <dir>      Directory in which to create the journal files.\n"
2786             "  -F            Always flush all updates at shutdown\n"
2787             "\n"
2788             "For more information and a detailed description of all options "
2789             "please refer\n"
2790             "to the rrdcached(1) manual page.\n",
2791             VERSION);
2792         status = -1;
2793         break;
2794     } /* switch (option) */
2795   } /* while (getopt) */
2797   /* advise the user when values are not sane */
2798   if (config_flush_interval < 2 * config_write_interval)
2799     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2800             " 2x write interval (-w) !\n");
2801   if (config_write_jitter > config_write_interval)
2802     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2803             " write interval (-w) !\n");
2805   if (config_write_base_only && config_base_dir == NULL)
2806     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2807             "  Consult the rrdcached documentation\n");
2809   if (journal_cur == NULL)
2810     config_flush_at_shutdown = 1;
2812   return (status);
2813 } /* }}} int read_options */
2815 int main (int argc, char **argv)
2817   int status;
2819   status = read_options (argc, argv);
2820   if (status != 0)
2821   {
2822     if (status < 0)
2823       status = 0;
2824     return (status);
2825   }
2827   status = daemonize ();
2828   if (status != 0)
2829   {
2830     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2831     return (1);
2832   }
2834   journal_init();
2836   /* start the queue threads */
2837   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
2838   if (queue_threads == NULL)
2839   {
2840     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
2841     cleanup();
2842     return (1);
2843   }
2844   for (int i = 0; i < config_queue_threads; i++)
2845   {
2846     memset (&queue_threads[i], 0, sizeof (*queue_threads));
2847     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
2848     if (status != 0)
2849     {
2850       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2851       cleanup();
2852       return (1);
2853     }
2854   }
2856   /* start the flush thread */
2857   memset(&flush_thread, 0, sizeof(flush_thread));
2858   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
2859   if (status != 0)
2860   {
2861     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
2862     cleanup();
2863     return (1);
2864   }
2866   listen_thread_main (NULL);
2867   cleanup ();
2869   return (0);
2870 } /* int main */
2872 /*
2873  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2874  */