Code

rrdcached: pull in rrd_config.h so we can use its defines
[rrdtool-all.git] / program / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
78 #include <stdint.h>
79 #include <stdio.h>
80 #include <unistd.h>
81 #include <string.h>
82 #include <strings.h>
83 #include <stdint.h>
84 #include <inttypes.h>
86 #include <sys/types.h>
87 #include <sys/stat.h>
88 #include <fcntl.h>
89 #include <signal.h>
90 #include <sys/socket.h>
91 #include <sys/un.h>
92 #include <netdb.h>
93 #include <poll.h>
94 #include <syslog.h>
95 #include <pthread.h>
96 #include <errno.h>
97 #include <assert.h>
98 #include <sys/time.h>
99 #include <time.h>
101 #include <glib-2.0/glib.h>
102 /* }}} */
104 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
106 #ifndef __GNUC__
107 # define __attribute__(x) /**/
108 #endif
110 /*
111  * Types
112  */
113 typedef enum
115   PRIV_LOW,
116   PRIV_HIGH
117 } socket_privilege;
119 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
121 struct listen_socket_s
123   int fd;
124   char addr[PATH_MAX + 1];
125   int family;
126   socket_privilege privilege;
128   /* state for BATCH processing */
129   time_t batch_start;
130   int batch_cmd;
132   /* buffered IO */
133   char *rbuf;
134   off_t next_cmd;
135   off_t next_read;
137   char *wbuf;
138   ssize_t wbuf_len;
139 };
140 typedef struct listen_socket_s listen_socket_t;
142 struct cache_item_s;
143 typedef struct cache_item_s cache_item_t;
144 struct cache_item_s
146   char *file;
147   char **values;
148   int values_num;
149   time_t last_flush_time;
150   time_t last_update_stamp;
151 #define CI_FLAGS_IN_TREE  (1<<0)
152 #define CI_FLAGS_IN_QUEUE (1<<1)
153   int flags;
154   pthread_cond_t  flushed;
155   cache_item_t *prev;
156   cache_item_t *next;
157 };
159 struct callback_flush_data_s
161   time_t now;
162   time_t abs_timeout;
163   char **keys;
164   size_t keys_num;
165 };
166 typedef struct callback_flush_data_s callback_flush_data_t;
168 enum queue_side_e
170   HEAD,
171   TAIL
172 };
173 typedef enum queue_side_e queue_side_t;
175 /* max length of socket command or response */
176 #define CMD_MAX 4096
177 #define RBUF_SIZE (CMD_MAX*2)
179 /*
180  * Variables
181  */
182 static int stay_foreground = 0;
183 static uid_t daemon_uid;
185 static listen_socket_t *listen_fds = NULL;
186 static size_t listen_fds_num = 0;
188 static int do_shutdown = 0;
190 static pthread_t queue_thread;
192 static pthread_t *connection_threads = NULL;
193 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
194 static int connection_threads_num = 0;
196 /* Cache stuff */
197 static GTree          *cache_tree = NULL;
198 static cache_item_t   *cache_queue_head = NULL;
199 static cache_item_t   *cache_queue_tail = NULL;
200 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
201 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
203 static int config_write_interval = 300;
204 static int config_write_jitter   = 0;
205 static int config_flush_interval = 3600;
206 static int config_flush_at_shutdown = 0;
207 static char *config_pid_file = NULL;
208 static char *config_base_dir = NULL;
209 static size_t _config_base_dir_len = 0;
210 static int config_write_base_only = 0;
212 static listen_socket_t **config_listen_address_list = NULL;
213 static int config_listen_address_list_len = 0;
215 static uint64_t stats_queue_length = 0;
216 static uint64_t stats_updates_received = 0;
217 static uint64_t stats_flush_received = 0;
218 static uint64_t stats_updates_written = 0;
219 static uint64_t stats_data_sets_written = 0;
220 static uint64_t stats_journal_bytes = 0;
221 static uint64_t stats_journal_rotate = 0;
222 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
224 /* Journaled updates */
225 static char *journal_cur = NULL;
226 static char *journal_old = NULL;
227 static FILE *journal_fh = NULL;
228 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
229 static int journal_write(char *cmd, char *args);
230 static void journal_done(void);
231 static void journal_rotate(void);
233 /* 
234  * Functions
235  */
236 static void sig_common (const char *sig) /* {{{ */
238   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
239   do_shutdown++;
240   pthread_cond_broadcast(&cache_cond);
241 } /* }}} void sig_common */
243 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
245   sig_common("INT");
246 } /* }}} void sig_int_handler */
248 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
250   sig_common("TERM");
251 } /* }}} void sig_term_handler */
253 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
255   config_flush_at_shutdown = 1;
256   sig_common("USR1");
257 } /* }}} void sig_usr1_handler */
259 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
261   config_flush_at_shutdown = 0;
262   sig_common("USR2");
263 } /* }}} void sig_usr2_handler */
265 static void install_signal_handlers(void) /* {{{ */
267   /* These structures are static, because `sigaction' behaves weird if the are
268    * overwritten.. */
269   static struct sigaction sa_int;
270   static struct sigaction sa_term;
271   static struct sigaction sa_pipe;
272   static struct sigaction sa_usr1;
273   static struct sigaction sa_usr2;
275   /* Install signal handlers */
276   memset (&sa_int, 0, sizeof (sa_int));
277   sa_int.sa_handler = sig_int_handler;
278   sigaction (SIGINT, &sa_int, NULL);
280   memset (&sa_term, 0, sizeof (sa_term));
281   sa_term.sa_handler = sig_term_handler;
282   sigaction (SIGTERM, &sa_term, NULL);
284   memset (&sa_pipe, 0, sizeof (sa_pipe));
285   sa_pipe.sa_handler = SIG_IGN;
286   sigaction (SIGPIPE, &sa_pipe, NULL);
288   memset (&sa_pipe, 0, sizeof (sa_usr1));
289   sa_usr1.sa_handler = sig_usr1_handler;
290   sigaction (SIGUSR1, &sa_usr1, NULL);
292   memset (&sa_usr2, 0, sizeof (sa_usr2));
293   sa_usr2.sa_handler = sig_usr2_handler;
294   sigaction (SIGUSR2, &sa_usr2, NULL);
296 } /* }}} void install_signal_handlers */
298 static int open_pidfile(char *action, int oflag) /* {{{ */
300   int fd;
301   char *file;
303   file = (config_pid_file != NULL)
304     ? config_pid_file
305     : LOCALSTATEDIR "/run/rrdcached.pid";
307   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
308   if (fd < 0)
309     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
310             action, file, rrd_strerror(errno));
312   return(fd);
313 } /* }}} static int open_pidfile */
315 /* check existing pid file to see whether a daemon is running */
316 static int check_pidfile(void)
318   int pid_fd;
319   pid_t pid;
320   char pid_str[16];
322   pid_fd = open_pidfile("open", O_RDWR);
323   if (pid_fd < 0)
324     return pid_fd;
326   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
327     return -1;
329   pid = atoi(pid_str);
330   if (pid <= 0)
331     return -1;
333   /* another running process that we can signal COULD be
334    * a competing rrdcached */
335   if (pid != getpid() && kill(pid, 0) == 0)
336   {
337     fprintf(stderr,
338             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
339     close(pid_fd);
340     return -1;
341   }
343   lseek(pid_fd, 0, SEEK_SET);
344   ftruncate(pid_fd, 0);
346   fprintf(stderr,
347           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
348           "rrdcached: starting normally.\n", pid);
350   return pid_fd;
351 } /* }}} static int check_pidfile */
353 static int write_pidfile (int fd) /* {{{ */
355   pid_t pid;
356   FILE *fh;
358   pid = getpid ();
360   fh = fdopen (fd, "w");
361   if (fh == NULL)
362   {
363     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
364     close(fd);
365     return (-1);
366   }
368   fprintf (fh, "%i\n", (int) pid);
369   fclose (fh);
371   return (0);
372 } /* }}} int write_pidfile */
374 static int remove_pidfile (void) /* {{{ */
376   char *file;
377   int status;
379   file = (config_pid_file != NULL)
380     ? config_pid_file
381     : LOCALSTATEDIR "/run/rrdcached.pid";
383   status = unlink (file);
384   if (status == 0)
385     return (0);
386   return (errno);
387 } /* }}} int remove_pidfile */
389 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
391   char *eol;
393   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
394                sock->next_read - sock->next_cmd);
396   if (eol == NULL)
397   {
398     /* no commands left, move remainder back to front of rbuf */
399     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
400             sock->next_read - sock->next_cmd);
401     sock->next_read -= sock->next_cmd;
402     sock->next_cmd = 0;
403     *len = 0;
404     return NULL;
405   }
406   else
407   {
408     char *cmd = sock->rbuf + sock->next_cmd;
409     *eol = '\0';
411     sock->next_cmd = eol - sock->rbuf + 1;
413     if (eol > sock->rbuf && *(eol-1) == '\r')
414       *(--eol) = '\0'; /* handle "\r\n" EOL */
416     *len = eol - cmd;
418     return cmd;
419   }
421   /* NOTREACHED */
422   assert(1==0);
425 /* add the characters directly to the write buffer */
426 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
428   char *new_buf;
430   assert(sock != NULL);
432   new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
433   if (new_buf == NULL)
434   {
435     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
436     return -1;
437   }
439   strncpy(new_buf + sock->wbuf_len, str, len + 1);
441   sock->wbuf = new_buf;
442   sock->wbuf_len += len;
444   return 0;
445 } /* }}} static int add_to_wbuf */
447 /* add the text to the "extra" info that's sent after the status line */
448 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
450   va_list argp;
451   char buffer[CMD_MAX];
452   int len;
454   if (sock == NULL) return 0; /* journal replay mode */
455   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
457   va_start(argp, fmt);
458 #ifdef HAVE_VSNPRINTF
459   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
460 #else
461   len = vsprintf(buffer, fmt, argp);
462 #endif
463   va_end(argp);
464   if (len < 0)
465   {
466     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
467     return -1;
468   }
470   return add_to_wbuf(sock, buffer, len);
471 } /* }}} static int add_response_info */
473 static int count_lines(char *str) /* {{{ */
475   int lines = 0;
477   if (str != NULL)
478   {
479     while ((str = strchr(str, '\n')) != NULL)
480     {
481       ++lines;
482       ++str;
483     }
484   }
486   return lines;
487 } /* }}} static int count_lines */
489 /* send the response back to the user.
490  * returns 0 on success, -1 on error
491  * write buffer is always zeroed after this call */
492 static int send_response (listen_socket_t *sock, response_code rc,
493                           char *fmt, ...) /* {{{ */
495   va_list argp;
496   char buffer[CMD_MAX];
497   int lines;
498   ssize_t wrote;
499   int rclen, len;
501   if (sock == NULL) return rc;  /* journal replay mode */
503   if (sock->batch_start)
504   {
505     if (rc == RESP_OK)
506       return rc; /* no response on success during BATCH */
507     lines = sock->batch_cmd;
508   }
509   else if (rc == RESP_OK)
510     lines = count_lines(sock->wbuf);
511   else
512     lines = -1;
514   rclen = sprintf(buffer, "%d ", lines);
515   va_start(argp, fmt);
516 #ifdef HAVE_VSNPRINTF
517   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
518 #else
519   len = vsprintf(buffer+rclen, fmt, argp);
520 #endif
521   va_end(argp);
522   if (len < 0)
523     return -1;
525   len += rclen;
527   /* append the result to the wbuf, don't write to the user */
528   if (sock->batch_start)
529     return add_to_wbuf(sock, buffer, len);
531   /* first write must be complete */
532   if (len != write(sock->fd, buffer, len))
533   {
534     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
535     return -1;
536   }
538   if (sock->wbuf != NULL && rc == RESP_OK)
539   {
540     wrote = 0;
541     while (wrote < sock->wbuf_len)
542     {
543       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
544       if (wb <= 0)
545       {
546         RRDD_LOG(LOG_INFO, "send_response: could not write results");
547         return -1;
548       }
549       wrote += wb;
550     }
551   }
553   free(sock->wbuf); sock->wbuf = NULL;
554   sock->wbuf_len = 0;
556   return 0;
557 } /* }}} */
559 static void wipe_ci_values(cache_item_t *ci, time_t when)
561   ci->values = NULL;
562   ci->values_num = 0;
564   ci->last_flush_time = when;
565   if (config_write_jitter > 0)
566     ci->last_flush_time += (random() % config_write_jitter);
569 /* remove_from_queue
570  * remove a "cache_item_t" item from the queue.
571  * must hold 'cache_lock' when calling this
572  */
573 static void remove_from_queue(cache_item_t *ci) /* {{{ */
575   if (ci == NULL) return;
576   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
578   if (ci->prev == NULL)
579     cache_queue_head = ci->next; /* reset head */
580   else
581     ci->prev->next = ci->next;
583   if (ci->next == NULL)
584     cache_queue_tail = ci->prev; /* reset the tail */
585   else
586     ci->next->prev = ci->prev;
588   ci->next = ci->prev = NULL;
589   ci->flags &= ~CI_FLAGS_IN_QUEUE;
590 } /* }}} static void remove_from_queue */
592 /* remove an entry from the tree and free all its resources.
593  * must hold 'cache lock' while calling this.
594  * returns 0 on success, otherwise errno */
595 static int forget_file(const char *file)
597   cache_item_t *ci;
599   ci = g_tree_lookup(cache_tree, file);
600   if (ci == NULL)
601     return ENOENT;
603   g_tree_remove (cache_tree, file);
604   remove_from_queue(ci);
606   for (int i=0; i < ci->values_num; i++)
607     free(ci->values[i]);
609   free (ci->values);
610   free (ci->file);
612   /* in case anyone is waiting */
613   pthread_cond_broadcast(&ci->flushed);
615   free (ci);
617   return 0;
618 } /* }}} static int forget_file */
620 /*
621  * enqueue_cache_item:
622  * `cache_lock' must be acquired before calling this function!
623  */
624 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
625     queue_side_t side)
627   if (ci == NULL)
628     return (-1);
630   if (ci->values_num == 0)
631     return (0);
633   if (side == HEAD)
634   {
635     if (cache_queue_head == ci)
636       return 0;
638     /* remove if further down in queue */
639     remove_from_queue(ci);
641     ci->prev = NULL;
642     ci->next = cache_queue_head;
643     if (ci->next != NULL)
644       ci->next->prev = ci;
645     cache_queue_head = ci;
647     if (cache_queue_tail == NULL)
648       cache_queue_tail = cache_queue_head;
649   }
650   else /* (side == TAIL) */
651   {
652     /* We don't move values back in the list.. */
653     if (ci->flags & CI_FLAGS_IN_QUEUE)
654       return (0);
656     assert (ci->next == NULL);
657     assert (ci->prev == NULL);
659     ci->prev = cache_queue_tail;
661     if (cache_queue_tail == NULL)
662       cache_queue_head = ci;
663     else
664       cache_queue_tail->next = ci;
666     cache_queue_tail = ci;
667   }
669   ci->flags |= CI_FLAGS_IN_QUEUE;
671   pthread_cond_broadcast(&cache_cond);
672   pthread_mutex_lock (&stats_lock);
673   stats_queue_length++;
674   pthread_mutex_unlock (&stats_lock);
676   return (0);
677 } /* }}} int enqueue_cache_item */
679 /*
680  * tree_callback_flush:
681  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
682  * while this is in progress.
683  */
684 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
685     gpointer data)
687   cache_item_t *ci;
688   callback_flush_data_t *cfd;
690   ci = (cache_item_t *) value;
691   cfd = (callback_flush_data_t *) data;
693   if (ci->flags & CI_FLAGS_IN_QUEUE)
694     return FALSE;
696   if ((ci->last_flush_time <= cfd->abs_timeout)
697       && (ci->values_num > 0))
698   {
699     enqueue_cache_item (ci, TAIL);
700   }
701   else if ((do_shutdown != 0)
702       && (ci->values_num > 0))
703   {
704     enqueue_cache_item (ci, TAIL);
705   }
706   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
707       && (ci->values_num <= 0))
708   {
709     char **temp;
711     temp = (char **) realloc (cfd->keys,
712         sizeof (char *) * (cfd->keys_num + 1));
713     if (temp == NULL)
714     {
715       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
716       return (FALSE);
717     }
718     cfd->keys = temp;
719     /* Make really sure this points to the _same_ place */
720     assert ((char *) key == ci->file);
721     cfd->keys[cfd->keys_num] = (char *) key;
722     cfd->keys_num++;
723   }
725   return (FALSE);
726 } /* }}} gboolean tree_callback_flush */
728 static int flush_old_values (int max_age)
730   callback_flush_data_t cfd;
731   size_t k;
733   memset (&cfd, 0, sizeof (cfd));
734   /* Pass the current time as user data so that we don't need to call
735    * `time' for each node. */
736   cfd.now = time (NULL);
737   cfd.keys = NULL;
738   cfd.keys_num = 0;
740   if (max_age > 0)
741     cfd.abs_timeout = cfd.now - max_age;
742   else
743     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
745   /* `tree_callback_flush' will return the keys of all values that haven't
746    * been touched in the last `config_flush_interval' seconds in `cfd'.
747    * The char*'s in this array point to the same memory as ci->file, so we
748    * don't need to free them separately. */
749   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
751   for (k = 0; k < cfd.keys_num; k++)
752   {
753     /* should never fail, since we have held the cache_lock
754      * the entire time */
755     assert( forget_file(cfd.keys[k]) == 0 );
756   }
758   if (cfd.keys != NULL)
759   {
760     free (cfd.keys);
761     cfd.keys = NULL;
762   }
764   return (0);
765 } /* int flush_old_values */
767 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
769   struct timeval now;
770   struct timespec next_flush;
771   int final_flush = 0; /* make sure we only flush once on shutdown */
773   gettimeofday (&now, NULL);
774   next_flush.tv_sec = now.tv_sec + config_flush_interval;
775   next_flush.tv_nsec = 1000 * now.tv_usec;
777   pthread_mutex_lock (&cache_lock);
778   while ((do_shutdown == 0) || (cache_queue_head != NULL))
779   {
780     cache_item_t *ci;
781     char *file;
782     char **values;
783     int values_num;
784     int status;
785     int i;
787     /* First, check if it's time to do the cache flush. */
788     gettimeofday (&now, NULL);
789     if ((now.tv_sec > next_flush.tv_sec)
790         || ((now.tv_sec == next_flush.tv_sec)
791           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
792     {
793       /* Flush all values that haven't been written in the last
794        * `config_write_interval' seconds. */
795       flush_old_values (config_write_interval);
797       /* Determine the time of the next cache flush. */
798       next_flush.tv_sec =
799         now.tv_sec + next_flush.tv_sec % config_flush_interval;
801       /* unlock the cache while we rotate so we don't block incoming
802        * updates if the fsync() blocks on disk I/O */
803       pthread_mutex_unlock(&cache_lock);
804       journal_rotate();
805       pthread_mutex_lock(&cache_lock);
806     }
808     /* Now, check if there's something to store away. If not, wait until
809      * something comes in or it's time to do the cache flush.  if we are
810      * shutting down, do not wait around.  */
811     if (cache_queue_head == NULL && !do_shutdown)
812     {
813       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
814       if ((status != 0) && (status != ETIMEDOUT))
815       {
816         RRDD_LOG (LOG_ERR, "queue_thread_main: "
817             "pthread_cond_timedwait returned %i.", status);
818       }
819     }
821     /* We're about to shut down */
822     if (do_shutdown != 0 && !final_flush++)
823     {
824       if (config_flush_at_shutdown)
825         flush_old_values (-1); /* flush everything */
826       else
827         break;
828     }
830     /* Check if a value has arrived. This may be NULL if we timed out or there
831      * was an interrupt such as a signal. */
832     if (cache_queue_head == NULL)
833       continue;
835     ci = cache_queue_head;
837     /* copy the relevant parts */
838     file = strdup (ci->file);
839     if (file == NULL)
840     {
841       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
842       continue;
843     }
845     assert(ci->values != NULL);
846     assert(ci->values_num > 0);
848     values = ci->values;
849     values_num = ci->values_num;
851     wipe_ci_values(ci, time(NULL));
852     remove_from_queue(ci);
854     pthread_mutex_lock (&stats_lock);
855     assert (stats_queue_length > 0);
856     stats_queue_length--;
857     pthread_mutex_unlock (&stats_lock);
859     pthread_mutex_unlock (&cache_lock);
861     rrd_clear_error ();
862     status = rrd_update_r (file, NULL, values_num, (void *) values);
863     if (status != 0)
864     {
865       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
866           "rrd_update_r (%s) failed with status %i. (%s)",
867           file, status, rrd_get_error());
868     }
870     journal_write("wrote", file);
871     pthread_cond_broadcast(&ci->flushed);
873     for (i = 0; i < values_num; i++)
874       free (values[i]);
876     free(values);
877     free(file);
879     if (status == 0)
880     {
881       pthread_mutex_lock (&stats_lock);
882       stats_updates_written++;
883       stats_data_sets_written += values_num;
884       pthread_mutex_unlock (&stats_lock);
885     }
887     pthread_mutex_lock (&cache_lock);
889     /* We're about to shut down */
890     if (do_shutdown != 0 && !final_flush++)
891     {
892       if (config_flush_at_shutdown)
893           flush_old_values (-1); /* flush everything */
894       else
895         break;
896     }
897   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
898   pthread_mutex_unlock (&cache_lock);
900   if (config_flush_at_shutdown)
901   {
902     assert(cache_queue_head == NULL);
903     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
904   }
906   journal_done();
908   return (NULL);
909 } /* }}} void *queue_thread_main */
911 static int buffer_get_field (char **buffer_ret, /* {{{ */
912     size_t *buffer_size_ret, char **field_ret)
914   char *buffer;
915   size_t buffer_pos;
916   size_t buffer_size;
917   char *field;
918   size_t field_size;
919   int status;
921   buffer = *buffer_ret;
922   buffer_pos = 0;
923   buffer_size = *buffer_size_ret;
924   field = *buffer_ret;
925   field_size = 0;
927   if (buffer_size <= 0)
928     return (-1);
930   /* This is ensured by `handle_request'. */
931   assert (buffer[buffer_size - 1] == '\0');
933   status = -1;
934   while (buffer_pos < buffer_size)
935   {
936     /* Check for end-of-field or end-of-buffer */
937     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
938     {
939       field[field_size] = 0;
940       field_size++;
941       buffer_pos++;
942       status = 0;
943       break;
944     }
945     /* Handle escaped characters. */
946     else if (buffer[buffer_pos] == '\\')
947     {
948       if (buffer_pos >= (buffer_size - 1))
949         break;
950       buffer_pos++;
951       field[field_size] = buffer[buffer_pos];
952       field_size++;
953       buffer_pos++;
954     }
955     /* Normal operation */ 
956     else
957     {
958       field[field_size] = buffer[buffer_pos];
959       field_size++;
960       buffer_pos++;
961     }
962   } /* while (buffer_pos < buffer_size) */
964   if (status != 0)
965     return (status);
967   *buffer_ret = buffer + buffer_pos;
968   *buffer_size_ret = buffer_size - buffer_pos;
969   *field_ret = field;
971   return (0);
972 } /* }}} int buffer_get_field */
974 /* if we're restricting writes to the base directory,
975  * check whether the file falls within the dir
976  * returns 1 if OK, otherwise 0
977  */
978 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
980   assert(file != NULL);
982   if (!config_write_base_only
983       || sock == NULL /* journal replay */
984       || config_base_dir == NULL)
985     return 1;
987   if (strstr(file, "../") != NULL) goto err;
989   /* relative paths without "../" are ok */
990   if (*file != '/') return 1;
992   /* file must be of the format base + "/" + <1+ char filename> */
993   if (strlen(file) < _config_base_dir_len + 2) goto err;
994   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
995   if (*(file + _config_base_dir_len) != '/') goto err;
997   return 1;
999 err:
1000   if (sock != NULL && sock->fd >= 0)
1001     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1003   return 0;
1004 } /* }}} static int check_file_access */
1006 /* when using a base dir, convert relative paths to absolute paths.
1007  * if necessary, modifies the "filename" pointer to point
1008  * to the new path created in "tmp".  "tmp" is provided
1009  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1010  *
1011  * this allows us to optimize for the expected case (absolute path)
1012  * with a no-op.
1013  */
1014 static void get_abs_path(char **filename, char *tmp)
1016   assert(tmp != NULL);
1017   assert(filename != NULL && *filename != NULL);
1019   if (config_base_dir == NULL || **filename == '/')
1020     return;
1022   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1023   *filename = tmp;
1024 } /* }}} static int get_abs_path */
1026 /* returns 1 if we have the required privilege level,
1027  * otherwise issue an error to the user on sock */
1028 static int has_privilege (listen_socket_t *sock, /* {{{ */
1029                           socket_privilege priv)
1031   if (sock == NULL) /* journal replay */
1032     return 1;
1034   if (sock->privilege >= priv)
1035     return 1;
1037   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1038 } /* }}} static int has_privilege */
1040 static int flush_file (const char *filename) /* {{{ */
1042   cache_item_t *ci;
1044   pthread_mutex_lock (&cache_lock);
1046   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1047   if (ci == NULL)
1048   {
1049     pthread_mutex_unlock (&cache_lock);
1050     return (ENOENT);
1051   }
1053   if (ci->values_num > 0)
1054   {
1055     /* Enqueue at head */
1056     enqueue_cache_item (ci, HEAD);
1057     pthread_cond_wait(&ci->flushed, &cache_lock);
1058   }
1060   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1061    * may have been purged during our cond_wait() */
1063   pthread_mutex_unlock(&cache_lock);
1065   return (0);
1066 } /* }}} int flush_file */
1068 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1069     char *buffer, size_t buffer_size)
1071   int status;
1072   char **help_text;
1073   char *command;
1075   char *help_help[2] =
1076   {
1077     "Command overview\n"
1078     ,
1079     "HELP [<command>]\n"
1080     "FLUSH <filename>\n"
1081     "FLUSHALL\n"
1082     "PENDING <filename>\n"
1083     "FORGET <filename>\n"
1084     "UPDATE <filename> <values> [<values> ...]\n"
1085     "BATCH\n"
1086     "STATS\n"
1087   };
1089   char *help_flush[2] =
1090   {
1091     "Help for FLUSH\n"
1092     ,
1093     "Usage: FLUSH <filename>\n"
1094     "\n"
1095     "Adds the given filename to the head of the update queue and returns\n"
1096     "after is has been dequeued.\n"
1097   };
1099   char *help_flushall[2] =
1100   {
1101     "Help for FLUSHALL\n"
1102     ,
1103     "Usage: FLUSHALL\n"
1104     "\n"
1105     "Triggers writing of all pending updates.  Returns immediately.\n"
1106   };
1108   char *help_pending[2] =
1109   {
1110     "Help for PENDING\n"
1111     ,
1112     "Usage: PENDING <filename>\n"
1113     "\n"
1114     "Shows any 'pending' updates for a file, in order.\n"
1115     "The updates shown have not yet been written to the underlying RRD file.\n"
1116   };
1118   char *help_forget[2] =
1119   {
1120     "Help for FORGET\n"
1121     ,
1122     "Usage: FORGET <filename>\n"
1123     "\n"
1124     "Removes the file completely from the cache.\n"
1125     "Any pending updates for the file will be lost.\n"
1126   };
1128   char *help_update[2] =
1129   {
1130     "Help for UPDATE\n"
1131     ,
1132     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1133     "\n"
1134     "Adds the given file to the internal cache if it is not yet known and\n"
1135     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1136     "for details.\n"
1137     "\n"
1138     "Each <values> has the following form:\n"
1139     "  <values> = <time>:<value>[:<value>[...]]\n"
1140     "See the rrdupdate(1) manpage for details.\n"
1141   };
1143   char *help_stats[2] =
1144   {
1145     "Help for STATS\n"
1146     ,
1147     "Usage: STATS\n"
1148     "\n"
1149     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1150     "a description of the values.\n"
1151   };
1153   char *help_batch[2] =
1154   {
1155     "Help for BATCH\n"
1156     ,
1157     "The 'BATCH' command permits the client to initiate a bulk load\n"
1158     "   of commands to rrdcached.\n"
1159     "\n"
1160     "Usage:\n"
1161     "\n"
1162     "    client: BATCH\n"
1163     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1164     "    client: command #1\n"
1165     "    client: command #2\n"
1166     "    client: ... and so on\n"
1167     "    client: .\n"
1168     "    server: 2 errors\n"
1169     "    server: 7 message for command #7\n"
1170     "    server: 9 message for command #9\n"
1171     "\n"
1172     "For more information, consult the rrdcached(1) documentation.\n"
1173   };
1175   status = buffer_get_field (&buffer, &buffer_size, &command);
1176   if (status != 0)
1177     help_text = help_help;
1178   else
1179   {
1180     if (strcasecmp (command, "update") == 0)
1181       help_text = help_update;
1182     else if (strcasecmp (command, "flush") == 0)
1183       help_text = help_flush;
1184     else if (strcasecmp (command, "flushall") == 0)
1185       help_text = help_flushall;
1186     else if (strcasecmp (command, "pending") == 0)
1187       help_text = help_pending;
1188     else if (strcasecmp (command, "forget") == 0)
1189       help_text = help_forget;
1190     else if (strcasecmp (command, "stats") == 0)
1191       help_text = help_stats;
1192     else if (strcasecmp (command, "batch") == 0)
1193       help_text = help_batch;
1194     else
1195       help_text = help_help;
1196   }
1198   add_response_info(sock, help_text[1]);
1199   return send_response(sock, RESP_OK, help_text[0]);
1200 } /* }}} int handle_request_help */
1202 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1204   uint64_t copy_queue_length;
1205   uint64_t copy_updates_received;
1206   uint64_t copy_flush_received;
1207   uint64_t copy_updates_written;
1208   uint64_t copy_data_sets_written;
1209   uint64_t copy_journal_bytes;
1210   uint64_t copy_journal_rotate;
1212   uint64_t tree_nodes_number;
1213   uint64_t tree_depth;
1215   pthread_mutex_lock (&stats_lock);
1216   copy_queue_length       = stats_queue_length;
1217   copy_updates_received   = stats_updates_received;
1218   copy_flush_received     = stats_flush_received;
1219   copy_updates_written    = stats_updates_written;
1220   copy_data_sets_written  = stats_data_sets_written;
1221   copy_journal_bytes      = stats_journal_bytes;
1222   copy_journal_rotate     = stats_journal_rotate;
1223   pthread_mutex_unlock (&stats_lock);
1225   pthread_mutex_lock (&cache_lock);
1226   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1227   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1228   pthread_mutex_unlock (&cache_lock);
1230   add_response_info(sock,
1231                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1232   add_response_info(sock,
1233                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1234   add_response_info(sock,
1235                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1236   add_response_info(sock,
1237                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1238   add_response_info(sock,
1239                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1240   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1241   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1242   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1243   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1245   send_response(sock, RESP_OK, "Statistics follow\n");
1247   return (0);
1248 } /* }}} int handle_request_stats */
1250 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1251     char *buffer, size_t buffer_size)
1253   char *file, file_tmp[PATH_MAX];
1254   int status;
1256   status = buffer_get_field (&buffer, &buffer_size, &file);
1257   if (status != 0)
1258   {
1259     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1260   }
1261   else
1262   {
1263     pthread_mutex_lock(&stats_lock);
1264     stats_flush_received++;
1265     pthread_mutex_unlock(&stats_lock);
1267     get_abs_path(&file, file_tmp);
1268     if (!check_file_access(file, sock)) return 0;
1270     status = flush_file (file);
1271     if (status == 0)
1272       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1273     else if (status == ENOENT)
1274     {
1275       /* no file in our tree; see whether it exists at all */
1276       struct stat statbuf;
1278       memset(&statbuf, 0, sizeof(statbuf));
1279       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1280         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1281       else
1282         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1283     }
1284     else if (status < 0)
1285       return send_response(sock, RESP_ERR, "Internal error.\n");
1286     else
1287       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1288   }
1290   /* NOTREACHED */
1291   assert(1==0);
1292 } /* }}} int handle_request_flush */
1294 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1296   int status;
1298   status = has_privilege(sock, PRIV_HIGH);
1299   if (status <= 0)
1300     return status;
1302   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1304   pthread_mutex_lock(&cache_lock);
1305   flush_old_values(-1);
1306   pthread_mutex_unlock(&cache_lock);
1308   return send_response(sock, RESP_OK, "Started flush.\n");
1309 } /* }}} static int handle_request_flushall */
1311 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1312                                   char *buffer, size_t buffer_size)
1314   int status;
1315   char *file, file_tmp[PATH_MAX];
1316   cache_item_t *ci;
1318   status = buffer_get_field(&buffer, &buffer_size, &file);
1319   if (status != 0)
1320     return send_response(sock, RESP_ERR,
1321                          "Usage: PENDING <filename>\n");
1323   status = has_privilege(sock, PRIV_HIGH);
1324   if (status <= 0)
1325     return status;
1327   get_abs_path(&file, file_tmp);
1329   pthread_mutex_lock(&cache_lock);
1330   ci = g_tree_lookup(cache_tree, file);
1331   if (ci == NULL)
1332   {
1333     pthread_mutex_unlock(&cache_lock);
1334     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1335   }
1337   for (int i=0; i < ci->values_num; i++)
1338     add_response_info(sock, "%s\n", ci->values[i]);
1340   pthread_mutex_unlock(&cache_lock);
1341   return send_response(sock, RESP_OK, "updates pending\n");
1342 } /* }}} static int handle_request_pending */
1344 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1345                                  char *buffer, size_t buffer_size)
1347   int status;
1348   char *file, file_tmp[PATH_MAX];
1350   status = buffer_get_field(&buffer, &buffer_size, &file);
1351   if (status != 0)
1352     return send_response(sock, RESP_ERR,
1353                          "Usage: FORGET <filename>\n");
1355   status = has_privilege(sock, PRIV_HIGH);
1356   if (status <= 0)
1357     return status;
1359   get_abs_path(&file, file_tmp);
1360   if (!check_file_access(file, sock)) return 0;
1362   pthread_mutex_lock(&cache_lock);
1363   status = forget_file(file);
1364   pthread_mutex_unlock(&cache_lock);
1366   if (status == 0)
1367   {
1368     if (sock != NULL)
1369       journal_write("forget", file);
1371     return send_response(sock, RESP_OK, "Gone!\n");
1372   }
1373   else
1374     return send_response(sock, RESP_ERR, "cannot forget: %s\n",
1375                          status < 0 ? "Internal error" : rrd_strerror(status));
1377   /* NOTREACHED */
1378   assert(1==0);
1379 } /* }}} static int handle_request_forget */
1381 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1382                                   time_t now,
1383                                   char *buffer, size_t buffer_size)
1385   char *file, file_tmp[PATH_MAX];
1386   int values_num = 0;
1387   int bad_timestamps = 0;
1388   int status;
1389   char orig_buf[CMD_MAX];
1391   cache_item_t *ci;
1393   status = has_privilege(sock, PRIV_HIGH);
1394   if (status <= 0)
1395     return status;
1397   /* save it for the journal later */
1398   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1400   status = buffer_get_field (&buffer, &buffer_size, &file);
1401   if (status != 0)
1402     return send_response(sock, RESP_ERR,
1403                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1405   pthread_mutex_lock(&stats_lock);
1406   stats_updates_received++;
1407   pthread_mutex_unlock(&stats_lock);
1409   get_abs_path(&file, file_tmp);
1410   if (!check_file_access(file, sock)) return 0;
1412   pthread_mutex_lock (&cache_lock);
1413   ci = g_tree_lookup (cache_tree, file);
1415   if (ci == NULL) /* {{{ */
1416   {
1417     struct stat statbuf;
1419     /* don't hold the lock while we setup; stat(2) might block */
1420     pthread_mutex_unlock(&cache_lock);
1422     memset (&statbuf, 0, sizeof (statbuf));
1423     status = stat (file, &statbuf);
1424     if (status != 0)
1425     {
1426       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1428       status = errno;
1429       if (status == ENOENT)
1430         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1431       else
1432         return send_response(sock, RESP_ERR,
1433                              "stat failed with error %i.\n", status);
1434     }
1435     if (!S_ISREG (statbuf.st_mode))
1436       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1438     if (access(file, R_OK|W_OK) != 0)
1439       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1440                            file, rrd_strerror(errno));
1442     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1443     if (ci == NULL)
1444     {
1445       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1447       return send_response(sock, RESP_ERR, "malloc failed.\n");
1448     }
1449     memset (ci, 0, sizeof (cache_item_t));
1451     ci->file = strdup (file);
1452     if (ci->file == NULL)
1453     {
1454       free (ci);
1455       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1457       return send_response(sock, RESP_ERR, "strdup failed.\n");
1458     }
1460     wipe_ci_values(ci, now);
1461     ci->flags = CI_FLAGS_IN_TREE;
1462     pthread_cond_init(&ci->flushed, NULL);
1464     pthread_mutex_lock(&cache_lock);
1465     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1466   } /* }}} */
1467   assert (ci != NULL);
1469   /* don't re-write updates in replay mode */
1470   if (sock != NULL)
1471     journal_write("update", orig_buf);
1473   while (buffer_size > 0)
1474   {
1475     char **temp;
1476     char *value;
1477     time_t stamp;
1478     char *eostamp;
1480     status = buffer_get_field (&buffer, &buffer_size, &value);
1481     if (status != 0)
1482     {
1483       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1484       break;
1485     }
1487     /* make sure update time is always moving forward */
1488     stamp = strtol(value, &eostamp, 10);
1489     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1490     {
1491       ++bad_timestamps;
1492       add_response_info(sock, "Cannot find timestamp in '%s'!\n", value);
1493       continue;
1494     }
1495     else if (stamp <= ci->last_update_stamp)
1496     {
1497       ++bad_timestamps;
1498       add_response_info(sock,
1499                         "illegal attempt to update using time %ld when"
1500                         " last update time is %ld (minimum one second step)\n",
1501                         stamp, ci->last_update_stamp);
1502       continue;
1503     }
1504     else
1505       ci->last_update_stamp = stamp;
1507     temp = (char **) realloc (ci->values,
1508         sizeof (char *) * (ci->values_num + 1));
1509     if (temp == NULL)
1510     {
1511       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1512       continue;
1513     }
1514     ci->values = temp;
1516     ci->values[ci->values_num] = strdup (value);
1517     if (ci->values[ci->values_num] == NULL)
1518     {
1519       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1520       continue;
1521     }
1522     ci->values_num++;
1524     values_num++;
1525   }
1527   if (((now - ci->last_flush_time) >= config_write_interval)
1528       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1529       && (ci->values_num > 0))
1530   {
1531     enqueue_cache_item (ci, TAIL);
1532   }
1534   pthread_mutex_unlock (&cache_lock);
1536   if (values_num < 1)
1537   {
1538     /* journal replay mode */
1539     if (sock == NULL) return RESP_ERR;
1541     /* if we had only one update attempt, then return the full
1542        error message... try to get the most information out
1543        of the limited error space allowed by the protocol
1544     */
1545     if (bad_timestamps == 1)
1546       return send_response(sock, RESP_ERR, "%s", sock->wbuf);
1547     else
1548       return send_response(sock, RESP_ERR,
1549                            "No values updated (%d bad timestamps).\n",
1550                            bad_timestamps);
1551   }
1552   else
1553     return send_response(sock, RESP_OK,
1554                          "errors, enqueued %i value(s).\n", values_num);
1556   /* NOTREACHED */
1557   assert(1==0);
1559 } /* }}} int handle_request_update */
1561 /* we came across a "WROTE" entry during journal replay.
1562  * throw away any values that we have accumulated for this file
1563  */
1564 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1566   int i;
1567   cache_item_t *ci;
1568   const char *file = buffer;
1570   pthread_mutex_lock(&cache_lock);
1572   ci = g_tree_lookup(cache_tree, file);
1573   if (ci == NULL)
1574   {
1575     pthread_mutex_unlock(&cache_lock);
1576     return (0);
1577   }
1579   if (ci->values)
1580   {
1581     for (i=0; i < ci->values_num; i++)
1582       free(ci->values[i]);
1584     free(ci->values);
1585   }
1587   wipe_ci_values(ci, now);
1588   remove_from_queue(ci);
1590   pthread_mutex_unlock(&cache_lock);
1591   return (0);
1592 } /* }}} int handle_request_wrote */
1594 /* start "BATCH" processing */
1595 static int batch_start (listen_socket_t *sock) /* {{{ */
1597   int status;
1598   if (sock->batch_start)
1599     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1601   status = send_response(sock, RESP_OK,
1602                          "Go ahead.  End with dot '.' on its own line.\n");
1603   sock->batch_start = time(NULL);
1604   sock->batch_cmd = 0;
1606   return status;
1607 } /* }}} static int batch_start */
1609 /* finish "BATCH" processing and return results to the client */
1610 static int batch_done (listen_socket_t *sock) /* {{{ */
1612   assert(sock->batch_start);
1613   sock->batch_start = 0;
1614   sock->batch_cmd  = 0;
1615   return send_response(sock, RESP_OK, "errors\n");
1616 } /* }}} static int batch_done */
1618 /* if sock==NULL, we are in journal replay mode */
1619 static int handle_request (listen_socket_t *sock, /* {{{ */
1620                            time_t now,
1621                            char *buffer, size_t buffer_size)
1623   char *buffer_ptr;
1624   char *command;
1625   int status;
1627   assert (buffer[buffer_size - 1] == '\0');
1629   buffer_ptr = buffer;
1630   command = NULL;
1631   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1632   if (status != 0)
1633   {
1634     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1635     return (-1);
1636   }
1638   if (sock != NULL && sock->batch_start)
1639     sock->batch_cmd++;
1641   if (strcasecmp (command, "update") == 0)
1642     return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1643   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1644   {
1645     /* this is only valid in replay mode */
1646     return (handle_request_wrote (buffer_ptr, now));
1647   }
1648   else if (strcasecmp (command, "flush") == 0)
1649     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1650   else if (strcasecmp (command, "flushall") == 0)
1651     return (handle_request_flushall(sock));
1652   else if (strcasecmp (command, "pending") == 0)
1653     return (handle_request_pending(sock, buffer_ptr, buffer_size));
1654   else if (strcasecmp (command, "forget") == 0)
1655     return (handle_request_forget(sock, buffer_ptr, buffer_size));
1656   else if (strcasecmp (command, "stats") == 0)
1657     return (handle_request_stats (sock));
1658   else if (strcasecmp (command, "help") == 0)
1659     return (handle_request_help (sock, buffer_ptr, buffer_size));
1660   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1661     return batch_start(sock);
1662   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1663     return batch_done(sock);
1664   else
1665     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1667   /* NOTREACHED */
1668   assert(1==0);
1669 } /* }}} int handle_request */
1671 /* MUST NOT hold journal_lock before calling this */
1672 static void journal_rotate(void) /* {{{ */
1674   FILE *old_fh = NULL;
1675   int new_fd;
1677   if (journal_cur == NULL || journal_old == NULL)
1678     return;
1680   pthread_mutex_lock(&journal_lock);
1682   /* we rotate this way (rename before close) so that the we can release
1683    * the journal lock as fast as possible.  Journal writes to the new
1684    * journal can proceed immediately after the new file is opened.  The
1685    * fclose can then block without affecting new updates.
1686    */
1687   if (journal_fh != NULL)
1688   {
1689     old_fh = journal_fh;
1690     journal_fh = NULL;
1691     rename(journal_cur, journal_old);
1692     ++stats_journal_rotate;
1693   }
1695   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1696                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1697   if (new_fd >= 0)
1698   {
1699     journal_fh = fdopen(new_fd, "a");
1700     if (journal_fh == NULL)
1701       close(new_fd);
1702   }
1704   pthread_mutex_unlock(&journal_lock);
1706   if (old_fh != NULL)
1707     fclose(old_fh);
1709   if (journal_fh == NULL)
1710   {
1711     RRDD_LOG(LOG_CRIT,
1712              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1713              journal_cur, rrd_strerror(errno));
1715     RRDD_LOG(LOG_ERR,
1716              "JOURNALING DISABLED: All values will be flushed at shutdown");
1717     config_flush_at_shutdown = 1;
1718   }
1720 } /* }}} static void journal_rotate */
1722 static void journal_done(void) /* {{{ */
1724   if (journal_cur == NULL)
1725     return;
1727   pthread_mutex_lock(&journal_lock);
1728   if (journal_fh != NULL)
1729   {
1730     fclose(journal_fh);
1731     journal_fh = NULL;
1732   }
1734   if (config_flush_at_shutdown)
1735   {
1736     RRDD_LOG(LOG_INFO, "removing journals");
1737     unlink(journal_old);
1738     unlink(journal_cur);
1739   }
1740   else
1741   {
1742     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1743              "journals will be used at next startup");
1744   }
1746   pthread_mutex_unlock(&journal_lock);
1748 } /* }}} static void journal_done */
1750 static int journal_write(char *cmd, char *args) /* {{{ */
1752   int chars;
1754   if (journal_fh == NULL)
1755     return 0;
1757   pthread_mutex_lock(&journal_lock);
1758   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1759   pthread_mutex_unlock(&journal_lock);
1761   if (chars > 0)
1762   {
1763     pthread_mutex_lock(&stats_lock);
1764     stats_journal_bytes += chars;
1765     pthread_mutex_unlock(&stats_lock);
1766   }
1768   return chars;
1769 } /* }}} static int journal_write */
1771 static int journal_replay (const char *file) /* {{{ */
1773   FILE *fh;
1774   int entry_cnt = 0;
1775   int fail_cnt = 0;
1776   uint64_t line = 0;
1777   char entry[CMD_MAX];
1778   time_t now;
1780   if (file == NULL) return 0;
1782   {
1783     char *reason;
1784     int status = 0;
1785     struct stat statbuf;
1787     memset(&statbuf, 0, sizeof(statbuf));
1788     if (stat(file, &statbuf) != 0)
1789     {
1790       if (errno == ENOENT)
1791         return 0;
1793       reason = "stat error";
1794       status = errno;
1795     }
1796     else if (!S_ISREG(statbuf.st_mode))
1797     {
1798       reason = "not a regular file";
1799       status = EPERM;
1800     }
1801     if (statbuf.st_uid != daemon_uid)
1802     {
1803       reason = "not owned by daemon user";
1804       status = EACCES;
1805     }
1806     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1807     {
1808       reason = "must not be user/group writable";
1809       status = EACCES;
1810     }
1812     if (status != 0)
1813     {
1814       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1815                file, rrd_strerror(status), reason);
1816       return 0;
1817     }
1818   }
1820   fh = fopen(file, "r");
1821   if (fh == NULL)
1822   {
1823     if (errno != ENOENT)
1824       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1825                file, rrd_strerror(errno));
1826     return 0;
1827   }
1828   else
1829     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1831   now = time(NULL);
1833   while(!feof(fh))
1834   {
1835     size_t entry_len;
1837     ++line;
1838     if (fgets(entry, sizeof(entry), fh) == NULL)
1839       break;
1840     entry_len = strlen(entry);
1842     /* check \n termination in case journal writing crashed mid-line */
1843     if (entry_len == 0)
1844       continue;
1845     else if (entry[entry_len - 1] != '\n')
1846     {
1847       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1848       ++fail_cnt;
1849       continue;
1850     }
1852     entry[entry_len - 1] = '\0';
1854     if (handle_request(NULL, now, entry, entry_len) == 0)
1855       ++entry_cnt;
1856     else
1857       ++fail_cnt;
1858   }
1860   fclose(fh);
1862   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1863            entry_cnt, fail_cnt);
1865   return entry_cnt > 0 ? 1 : 0;
1866 } /* }}} static int journal_replay */
1868 static void journal_init(void) /* {{{ */
1870   int had_journal = 0;
1872   if (journal_cur == NULL) return;
1874   pthread_mutex_lock(&journal_lock);
1876   RRDD_LOG(LOG_INFO, "checking for journal files");
1878   had_journal += journal_replay(journal_old);
1879   had_journal += journal_replay(journal_cur);
1881   /* it must have been a crash.  start a flush */
1882   if (had_journal && config_flush_at_shutdown)
1883     flush_old_values(-1);
1885   pthread_mutex_unlock(&journal_lock);
1886   journal_rotate();
1888   RRDD_LOG(LOG_INFO, "journal processing complete");
1890 } /* }}} static void journal_init */
1892 static void close_connection(listen_socket_t *sock)
1894   close(sock->fd) ;  sock->fd   = -1;
1895   free(sock->rbuf);  sock->rbuf = NULL;
1896   free(sock->wbuf);  sock->wbuf = NULL;
1898   free(sock);
1901 static void *connection_thread_main (void *args) /* {{{ */
1903   pthread_t self;
1904   listen_socket_t *sock;
1905   int i;
1906   int fd;
1908   sock = (listen_socket_t *) args;
1909   fd = sock->fd;
1911   /* init read buffers */
1912   sock->next_read = sock->next_cmd = 0;
1913   sock->rbuf = malloc(RBUF_SIZE);
1914   if (sock->rbuf == NULL)
1915   {
1916     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1917     close_connection(sock);
1918     return NULL;
1919   }
1921   pthread_mutex_lock (&connection_threads_lock);
1922   {
1923     pthread_t *temp;
1925     temp = (pthread_t *) realloc (connection_threads,
1926         sizeof (pthread_t) * (connection_threads_num + 1));
1927     if (temp == NULL)
1928     {
1929       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1930     }
1931     else
1932     {
1933       connection_threads = temp;
1934       connection_threads[connection_threads_num] = pthread_self ();
1935       connection_threads_num++;
1936     }
1937   }
1938   pthread_mutex_unlock (&connection_threads_lock);
1940   while (do_shutdown == 0)
1941   {
1942     char *cmd;
1943     ssize_t cmd_len;
1944     ssize_t rbytes;
1945     time_t now;
1947     struct pollfd pollfd;
1948     int status;
1950     pollfd.fd = fd;
1951     pollfd.events = POLLIN | POLLPRI;
1952     pollfd.revents = 0;
1954     status = poll (&pollfd, 1, /* timeout = */ 500);
1955     if (do_shutdown)
1956       break;
1957     else if (status == 0) /* timeout */
1958       continue;
1959     else if (status < 0) /* error */
1960     {
1961       status = errno;
1962       if (status != EINTR)
1963         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1964       continue;
1965     }
1967     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1968       break;
1969     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1970     {
1971       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1972           "poll(2) returned something unexpected: %#04hx",
1973           pollfd.revents);
1974       break;
1975     }
1977     rbytes = read(fd, sock->rbuf + sock->next_read,
1978                   RBUF_SIZE - sock->next_read);
1979     if (rbytes < 0)
1980     {
1981       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1982       break;
1983     }
1984     else if (rbytes == 0)
1985       break; /* eof */
1987     sock->next_read += rbytes;
1989     if (sock->batch_start)
1990       now = sock->batch_start;
1991     else
1992       now = time(NULL);
1994     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1995     {
1996       status = handle_request (sock, now, cmd, cmd_len+1);
1997       if (status != 0)
1998         goto out_close;
1999     }
2000   }
2002 out_close:
2003   close_connection(sock);
2005   self = pthread_self ();
2006   /* Remove this thread from the connection threads list */
2007   pthread_mutex_lock (&connection_threads_lock);
2008   /* Find out own index in the array */
2009   for (i = 0; i < connection_threads_num; i++)
2010     if (pthread_equal (connection_threads[i], self) != 0)
2011       break;
2012   assert (i < connection_threads_num);
2014   /* Move the trailing threads forward. */
2015   if (i < (connection_threads_num - 1))
2016   {
2017     memmove (connection_threads + i,
2018         connection_threads + i + 1,
2019         sizeof (pthread_t) * (connection_threads_num - i - 1));
2020   }
2022   connection_threads_num--;
2023   pthread_mutex_unlock (&connection_threads_lock);
2025   return (NULL);
2026 } /* }}} void *connection_thread_main */
2028 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2030   int fd;
2031   struct sockaddr_un sa;
2032   listen_socket_t *temp;
2033   int status;
2034   const char *path;
2036   path = sock->addr;
2037   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2038     path += strlen("unix:");
2040   temp = (listen_socket_t *) realloc (listen_fds,
2041       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2042   if (temp == NULL)
2043   {
2044     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2045     return (-1);
2046   }
2047   listen_fds = temp;
2048   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2050   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2051   if (fd < 0)
2052   {
2053     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2054              rrd_strerror(errno));
2055     return (-1);
2056   }
2058   memset (&sa, 0, sizeof (sa));
2059   sa.sun_family = AF_UNIX;
2060   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2062   /* if we've gotten this far, we own the pid file.  any daemon started
2063    * with the same args must not be alive.  therefore, ensure that we can
2064    * create the socket...
2065    */
2066   unlink(path);
2068   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2069   if (status != 0)
2070   {
2071     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2072              path, rrd_strerror(errno));
2073     close (fd);
2074     return (-1);
2075   }
2077   status = listen (fd, /* backlog = */ 10);
2078   if (status != 0)
2079   {
2080     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2081              path, rrd_strerror(errno));
2082     close (fd);
2083     unlink (path);
2084     return (-1);
2085   }
2087   listen_fds[listen_fds_num].fd = fd;
2088   listen_fds[listen_fds_num].family = PF_UNIX;
2089   strncpy(listen_fds[listen_fds_num].addr, path,
2090           sizeof (listen_fds[listen_fds_num].addr) - 1);
2091   listen_fds_num++;
2093   return (0);
2094 } /* }}} int open_listen_socket_unix */
2096 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2098   struct addrinfo ai_hints;
2099   struct addrinfo *ai_res;
2100   struct addrinfo *ai_ptr;
2101   char addr_copy[NI_MAXHOST];
2102   char *addr;
2103   char *port;
2104   int status;
2106   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2107   addr_copy[sizeof (addr_copy) - 1] = 0;
2108   addr = addr_copy;
2110   memset (&ai_hints, 0, sizeof (ai_hints));
2111   ai_hints.ai_flags = 0;
2112 #ifdef AI_ADDRCONFIG
2113   ai_hints.ai_flags |= AI_ADDRCONFIG;
2114 #endif
2115   ai_hints.ai_family = AF_UNSPEC;
2116   ai_hints.ai_socktype = SOCK_STREAM;
2118   port = NULL;
2119   if (*addr == '[') /* IPv6+port format */
2120   {
2121     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2122     addr++;
2124     port = strchr (addr, ']');
2125     if (port == NULL)
2126     {
2127       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2128       return (-1);
2129     }
2130     *port = 0;
2131     port++;
2133     if (*port == ':')
2134       port++;
2135     else if (*port == 0)
2136       port = NULL;
2137     else
2138     {
2139       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2140       return (-1);
2141     }
2142   } /* if (*addr = ']') */
2143   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2144   {
2145     port = rindex(addr, ':');
2146     if (port != NULL)
2147     {
2148       *port = 0;
2149       port++;
2150     }
2151   }
2152   ai_res = NULL;
2153   status = getaddrinfo (addr,
2154                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2155                         &ai_hints, &ai_res);
2156   if (status != 0)
2157   {
2158     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2159              addr, gai_strerror (status));
2160     return (-1);
2161   }
2163   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2164   {
2165     int fd;
2166     listen_socket_t *temp;
2167     int one = 1;
2169     temp = (listen_socket_t *) realloc (listen_fds,
2170         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2171     if (temp == NULL)
2172     {
2173       fprintf (stderr,
2174                "rrdcached: open_listen_socket_network: realloc failed.\n");
2175       continue;
2176     }
2177     listen_fds = temp;
2178     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2180     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2181     if (fd < 0)
2182     {
2183       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2184                rrd_strerror(errno));
2185       continue;
2186     }
2188     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2190     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2191     if (status != 0)
2192     {
2193       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2194                sock->addr, rrd_strerror(errno));
2195       close (fd);
2196       continue;
2197     }
2199     status = listen (fd, /* backlog = */ 10);
2200     if (status != 0)
2201     {
2202       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2203                sock->addr, rrd_strerror(errno));
2204       close (fd);
2205       return (-1);
2206     }
2208     listen_fds[listen_fds_num].fd = fd;
2209     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2210     listen_fds_num++;
2211   } /* for (ai_ptr) */
2213   return (0);
2214 } /* }}} static int open_listen_socket_network */
2216 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2218   assert(sock != NULL);
2219   assert(sock->addr != NULL);
2221   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2222       || sock->addr[0] == '/')
2223     return (open_listen_socket_unix(sock));
2224   else
2225     return (open_listen_socket_network(sock));
2226 } /* }}} int open_listen_socket */
2228 static int close_listen_sockets (void) /* {{{ */
2230   size_t i;
2232   for (i = 0; i < listen_fds_num; i++)
2233   {
2234     close (listen_fds[i].fd);
2236     if (listen_fds[i].family == PF_UNIX)
2237       unlink(listen_fds[i].addr);
2238   }
2240   free (listen_fds);
2241   listen_fds = NULL;
2242   listen_fds_num = 0;
2244   return (0);
2245 } /* }}} int close_listen_sockets */
2247 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2249   struct pollfd *pollfds;
2250   int pollfds_num;
2251   int status;
2252   int i;
2254   if (listen_fds_num < 1)
2255   {
2256     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2257     return (NULL);
2258   }
2260   pollfds_num = listen_fds_num;
2261   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2262   if (pollfds == NULL)
2263   {
2264     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2265     return (NULL);
2266   }
2267   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2269   RRDD_LOG(LOG_INFO, "listening for connections");
2271   while (do_shutdown == 0)
2272   {
2273     assert (pollfds_num == ((int) listen_fds_num));
2274     for (i = 0; i < pollfds_num; i++)
2275     {
2276       pollfds[i].fd = listen_fds[i].fd;
2277       pollfds[i].events = POLLIN | POLLPRI;
2278       pollfds[i].revents = 0;
2279     }
2281     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2282     if (do_shutdown)
2283       break;
2284     else if (status == 0) /* timeout */
2285       continue;
2286     else if (status < 0) /* error */
2287     {
2288       status = errno;
2289       if (status != EINTR)
2290       {
2291         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2292       }
2293       continue;
2294     }
2296     for (i = 0; i < pollfds_num; i++)
2297     {
2298       listen_socket_t *client_sock;
2299       struct sockaddr_storage client_sa;
2300       socklen_t client_sa_size;
2301       pthread_t tid;
2302       pthread_attr_t attr;
2304       if (pollfds[i].revents == 0)
2305         continue;
2307       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2308       {
2309         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2310             "poll(2) returned something unexpected for listen FD #%i.",
2311             pollfds[i].fd);
2312         continue;
2313       }
2315       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2316       if (client_sock == NULL)
2317       {
2318         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2319         continue;
2320       }
2321       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2323       client_sa_size = sizeof (client_sa);
2324       client_sock->fd = accept (pollfds[i].fd,
2325           (struct sockaddr *) &client_sa, &client_sa_size);
2326       if (client_sock->fd < 0)
2327       {
2328         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2329         free(client_sock);
2330         continue;
2331       }
2333       pthread_attr_init (&attr);
2334       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2336       status = pthread_create (&tid, &attr, connection_thread_main,
2337                                client_sock);
2338       if (status != 0)
2339       {
2340         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2341         close_connection(client_sock);
2342         continue;
2343       }
2344     } /* for (pollfds_num) */
2345   } /* while (do_shutdown == 0) */
2347   RRDD_LOG(LOG_INFO, "starting shutdown");
2349   close_listen_sockets ();
2351   pthread_mutex_lock (&connection_threads_lock);
2352   while (connection_threads_num > 0)
2353   {
2354     pthread_t wait_for;
2356     wait_for = connection_threads[0];
2358     pthread_mutex_unlock (&connection_threads_lock);
2359     pthread_join (wait_for, /* retval = */ NULL);
2360     pthread_mutex_lock (&connection_threads_lock);
2361   }
2362   pthread_mutex_unlock (&connection_threads_lock);
2364   return (NULL);
2365 } /* }}} void *listen_thread_main */
2367 static int daemonize (void) /* {{{ */
2369   int pid_fd;
2370   char *base_dir;
2372   daemon_uid = geteuid();
2374   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2375   if (pid_fd < 0)
2376     pid_fd = check_pidfile();
2377   if (pid_fd < 0)
2378     return pid_fd;
2380   /* open all the listen sockets */
2381   if (config_listen_address_list_len > 0)
2382   {
2383     for (int i = 0; i < config_listen_address_list_len; i++)
2384       open_listen_socket (config_listen_address_list[i]);
2385   }
2386   else
2387   {
2388     listen_socket_t sock;
2389     memset(&sock, 0, sizeof(sock));
2390     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2391     open_listen_socket (&sock);
2392   }
2394   if (listen_fds_num < 1)
2395   {
2396     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2397     goto error;
2398   }
2400   if (!stay_foreground)
2401   {
2402     pid_t child;
2404     child = fork ();
2405     if (child < 0)
2406     {
2407       fprintf (stderr, "daemonize: fork(2) failed.\n");
2408       goto error;
2409     }
2410     else if (child > 0)
2411       exit(0);
2413     /* Become session leader */
2414     setsid ();
2416     /* Open the first three file descriptors to /dev/null */
2417     close (2);
2418     close (1);
2419     close (0);
2421     open ("/dev/null", O_RDWR);
2422     dup (0);
2423     dup (0);
2424   } /* if (!stay_foreground) */
2426   /* Change into the /tmp directory. */
2427   base_dir = (config_base_dir != NULL)
2428     ? config_base_dir
2429     : "/tmp";
2431   if (chdir (base_dir) != 0)
2432   {
2433     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2434     goto error;
2435   }
2437   install_signal_handlers();
2439   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2440   RRDD_LOG(LOG_INFO, "starting up");
2442   cache_tree = g_tree_new ((GCompareFunc) strcmp);
2443   if (cache_tree == NULL)
2444   {
2445     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2446     goto error;
2447   }
2449   return write_pidfile (pid_fd);
2451 error:
2452   remove_pidfile();
2453   return -1;
2454 } /* }}} int daemonize */
2456 static int cleanup (void) /* {{{ */
2458   do_shutdown++;
2460   pthread_cond_signal (&cache_cond);
2461   pthread_join (queue_thread, /* return = */ NULL);
2463   remove_pidfile ();
2465   RRDD_LOG(LOG_INFO, "goodbye");
2466   closelog ();
2468   return (0);
2469 } /* }}} int cleanup */
2471 static int read_options (int argc, char **argv) /* {{{ */
2473   int option;
2474   int status = 0;
2476   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2477   {
2478     switch (option)
2479     {
2480       case 'g':
2481         stay_foreground=1;
2482         break;
2484       case 'L':
2485       case 'l':
2486       {
2487         listen_socket_t **temp;
2488         listen_socket_t *new;
2490         new = malloc(sizeof(listen_socket_t));
2491         if (new == NULL)
2492         {
2493           fprintf(stderr, "read_options: malloc failed.\n");
2494           return(2);
2495         }
2496         memset(new, 0, sizeof(listen_socket_t));
2498         temp = (listen_socket_t **) realloc (config_listen_address_list,
2499             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2500         if (temp == NULL)
2501         {
2502           fprintf (stderr, "read_options: realloc failed.\n");
2503           return (2);
2504         }
2505         config_listen_address_list = temp;
2507         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2508         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2510         temp[config_listen_address_list_len] = new;
2511         config_listen_address_list_len++;
2512       }
2513       break;
2515       case 'f':
2516       {
2517         int temp;
2519         temp = atoi (optarg);
2520         if (temp > 0)
2521           config_flush_interval = temp;
2522         else
2523         {
2524           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2525           status = 3;
2526         }
2527       }
2528       break;
2530       case 'w':
2531       {
2532         int temp;
2534         temp = atoi (optarg);
2535         if (temp > 0)
2536           config_write_interval = temp;
2537         else
2538         {
2539           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2540           status = 2;
2541         }
2542       }
2543       break;
2545       case 'z':
2546       {
2547         int temp;
2549         temp = atoi(optarg);
2550         if (temp > 0)
2551           config_write_jitter = temp;
2552         else
2553         {
2554           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2555           status = 2;
2556         }
2558         break;
2559       }
2561       case 'B':
2562         config_write_base_only = 1;
2563         break;
2565       case 'b':
2566       {
2567         size_t len;
2568         char base_realpath[PATH_MAX];
2570         if (config_base_dir != NULL)
2571           free (config_base_dir);
2572         config_base_dir = strdup (optarg);
2573         if (config_base_dir == NULL)
2574         {
2575           fprintf (stderr, "read_options: strdup failed.\n");
2576           return (3);
2577         }
2579         /* make sure that the base directory is not resolved via
2580          * symbolic links.  this makes some performance-enhancing
2581          * assumptions possible (we don't have to resolve paths
2582          * that start with a "/")
2583          */
2584         if (realpath(config_base_dir, base_realpath) == NULL)
2585         {
2586           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2587           return 5;
2588         }
2589         else if (strncmp(config_base_dir,
2590                          base_realpath, sizeof(base_realpath)) != 0)
2591         {
2592           fprintf(stderr,
2593                   "Base directory (-b) resolved via file system links!\n"
2594                   "Please consult rrdcached '-b' documentation!\n"
2595                   "Consider specifying the real directory (%s)\n",
2596                   base_realpath);
2597           return 5;
2598         }
2600         len = strlen (config_base_dir);
2601         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2602         {
2603           config_base_dir[len - 1] = 0;
2604           len--;
2605         }
2607         if (len < 1)
2608         {
2609           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2610           return (4);
2611         }
2613         _config_base_dir_len = len;
2614       }
2615       break;
2617       case 'p':
2618       {
2619         if (config_pid_file != NULL)
2620           free (config_pid_file);
2621         config_pid_file = strdup (optarg);
2622         if (config_pid_file == NULL)
2623         {
2624           fprintf (stderr, "read_options: strdup failed.\n");
2625           return (3);
2626         }
2627       }
2628       break;
2630       case 'F':
2631         config_flush_at_shutdown = 1;
2632         break;
2634       case 'j':
2635       {
2636         struct stat statbuf;
2637         const char *dir = optarg;
2639         status = stat(dir, &statbuf);
2640         if (status != 0)
2641         {
2642           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2643           return 6;
2644         }
2646         if (!S_ISDIR(statbuf.st_mode)
2647             || access(dir, R_OK|W_OK|X_OK) != 0)
2648         {
2649           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2650                   errno ? rrd_strerror(errno) : "");
2651           return 6;
2652         }
2654         journal_cur = malloc(PATH_MAX + 1);
2655         journal_old = malloc(PATH_MAX + 1);
2656         if (journal_cur == NULL || journal_old == NULL)
2657         {
2658           fprintf(stderr, "malloc failure for journal files\n");
2659           return 6;
2660         }
2661         else 
2662         {
2663           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2664           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2665         }
2666       }
2667       break;
2669       case 'h':
2670       case '?':
2671         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2672             "\n"
2673             "Usage: rrdcached [options]\n"
2674             "\n"
2675             "Valid options are:\n"
2676             "  -l <address>  Socket address to listen to.\n"
2677             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2678             "  -w <seconds>  Interval in which to write data.\n"
2679             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2680             "  -f <seconds>  Interval in which to flush dead data.\n"
2681             "  -p <file>     Location of the PID-file.\n"
2682             "  -b <dir>      Base directory to change to.\n"
2683             "  -B            Restrict file access to paths within -b <dir>\n"
2684             "  -g            Do not fork and run in the foreground.\n"
2685             "  -j <dir>      Directory in which to create the journal files.\n"
2686             "  -F            Always flush all updates at shutdown\n"
2687             "\n"
2688             "For more information and a detailed description of all options "
2689             "please refer\n"
2690             "to the rrdcached(1) manual page.\n",
2691             VERSION);
2692         status = -1;
2693         break;
2694     } /* switch (option) */
2695   } /* while (getopt) */
2697   /* advise the user when values are not sane */
2698   if (config_flush_interval < 2 * config_write_interval)
2699     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2700             " 2x write interval (-w) !\n");
2701   if (config_write_jitter > config_write_interval)
2702     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2703             " write interval (-w) !\n");
2705   if (config_write_base_only && config_base_dir == NULL)
2706     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2707             "  Consult the rrdcached documentation\n");
2709   if (journal_cur == NULL)
2710     config_flush_at_shutdown = 1;
2712   return (status);
2713 } /* }}} int read_options */
2715 int main (int argc, char **argv)
2717   int status;
2719   status = read_options (argc, argv);
2720   if (status != 0)
2721   {
2722     if (status < 0)
2723       status = 0;
2724     return (status);
2725   }
2727   status = daemonize ();
2728   if (status != 0)
2729   {
2730     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2731     return (1);
2732   }
2734   journal_init();
2736   /* start the queue thread */
2737   memset (&queue_thread, 0, sizeof (queue_thread));
2738   status = pthread_create (&queue_thread,
2739                            NULL, /* attr */
2740                            queue_thread_main,
2741                            NULL); /* args */
2742   if (status != 0)
2743   {
2744     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2745     cleanup();
2746     return (1);
2747   }
2749   listen_thread_main (NULL);
2750   cleanup ();
2752   return (0);
2753 } /* int main */
2755 /*
2756  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2757  */