Code

document the "QUIT" command -- kevin
[rrdtool-all.git] / program / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
78 #include <stdint.h>
79 #include <stdio.h>
80 #include <unistd.h>
81 #include <string.h>
82 #include <strings.h>
83 #include <stdint.h>
84 #include <inttypes.h>
86 #include <sys/types.h>
87 #include <sys/stat.h>
88 #include <fcntl.h>
89 #include <signal.h>
90 #include <sys/socket.h>
91 #include <sys/un.h>
92 #include <netdb.h>
93 #include <poll.h>
94 #include <syslog.h>
95 #include <pthread.h>
96 #include <errno.h>
97 #include <assert.h>
98 #include <sys/time.h>
99 #include <time.h>
101 #include <glib-2.0/glib.h>
102 /* }}} */
104 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
106 #ifndef __GNUC__
107 # define __attribute__(x) /**/
108 #endif
110 /*
111  * Types
112  */
113 typedef enum
115   PRIV_LOW,
116   PRIV_HIGH
117 } socket_privilege;
119 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
121 struct listen_socket_s
123   int fd;
124   char addr[PATH_MAX + 1];
125   int family;
126   socket_privilege privilege;
128   /* state for BATCH processing */
129   time_t batch_start;
130   int batch_cmd;
132   /* buffered IO */
133   char *rbuf;
134   off_t next_cmd;
135   off_t next_read;
137   char *wbuf;
138   ssize_t wbuf_len;
139 };
140 typedef struct listen_socket_s listen_socket_t;
142 struct cache_item_s;
143 typedef struct cache_item_s cache_item_t;
144 struct cache_item_s
146   char *file;
147   char **values;
148   int values_num;
149   time_t last_flush_time;
150   time_t last_update_stamp;
151 #define CI_FLAGS_IN_TREE  (1<<0)
152 #define CI_FLAGS_IN_QUEUE (1<<1)
153   int flags;
154   pthread_cond_t  flushed;
155   cache_item_t *prev;
156   cache_item_t *next;
157 };
159 struct callback_flush_data_s
161   time_t now;
162   time_t abs_timeout;
163   char **keys;
164   size_t keys_num;
165 };
166 typedef struct callback_flush_data_s callback_flush_data_t;
168 enum queue_side_e
170   HEAD,
171   TAIL
172 };
173 typedef enum queue_side_e queue_side_t;
175 /* max length of socket command or response */
176 #define CMD_MAX 4096
177 #define RBUF_SIZE (CMD_MAX*2)
179 /*
180  * Variables
181  */
182 static int stay_foreground = 0;
183 static uid_t daemon_uid;
185 static listen_socket_t *listen_fds = NULL;
186 static size_t listen_fds_num = 0;
188 static int do_shutdown = 0;
190 static pthread_t queue_thread;
192 static pthread_t *connection_threads = NULL;
193 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
194 static int connection_threads_num = 0;
196 /* Cache stuff */
197 static GTree          *cache_tree = NULL;
198 static cache_item_t   *cache_queue_head = NULL;
199 static cache_item_t   *cache_queue_tail = NULL;
200 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
201 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
203 static int config_write_interval = 300;
204 static int config_write_jitter   = 0;
205 static int config_flush_interval = 3600;
206 static int config_flush_at_shutdown = 0;
207 static char *config_pid_file = NULL;
208 static char *config_base_dir = NULL;
209 static size_t _config_base_dir_len = 0;
210 static int config_write_base_only = 0;
212 static listen_socket_t **config_listen_address_list = NULL;
213 static int config_listen_address_list_len = 0;
215 static uint64_t stats_queue_length = 0;
216 static uint64_t stats_updates_received = 0;
217 static uint64_t stats_flush_received = 0;
218 static uint64_t stats_updates_written = 0;
219 static uint64_t stats_data_sets_written = 0;
220 static uint64_t stats_journal_bytes = 0;
221 static uint64_t stats_journal_rotate = 0;
222 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
224 /* Journaled updates */
225 static char *journal_cur = NULL;
226 static char *journal_old = NULL;
227 static FILE *journal_fh = NULL;
228 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
229 static int journal_write(char *cmd, char *args);
230 static void journal_done(void);
231 static void journal_rotate(void);
233 /* 
234  * Functions
235  */
236 static void sig_common (const char *sig) /* {{{ */
238   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
239   do_shutdown++;
240   pthread_cond_broadcast(&cache_cond);
241 } /* }}} void sig_common */
243 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
245   sig_common("INT");
246 } /* }}} void sig_int_handler */
248 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
250   sig_common("TERM");
251 } /* }}} void sig_term_handler */
253 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
255   config_flush_at_shutdown = 1;
256   sig_common("USR1");
257 } /* }}} void sig_usr1_handler */
259 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
261   config_flush_at_shutdown = 0;
262   sig_common("USR2");
263 } /* }}} void sig_usr2_handler */
265 static void install_signal_handlers(void) /* {{{ */
267   /* These structures are static, because `sigaction' behaves weird if the are
268    * overwritten.. */
269   static struct sigaction sa_int;
270   static struct sigaction sa_term;
271   static struct sigaction sa_pipe;
272   static struct sigaction sa_usr1;
273   static struct sigaction sa_usr2;
275   /* Install signal handlers */
276   memset (&sa_int, 0, sizeof (sa_int));
277   sa_int.sa_handler = sig_int_handler;
278   sigaction (SIGINT, &sa_int, NULL);
280   memset (&sa_term, 0, sizeof (sa_term));
281   sa_term.sa_handler = sig_term_handler;
282   sigaction (SIGTERM, &sa_term, NULL);
284   memset (&sa_pipe, 0, sizeof (sa_pipe));
285   sa_pipe.sa_handler = SIG_IGN;
286   sigaction (SIGPIPE, &sa_pipe, NULL);
288   memset (&sa_pipe, 0, sizeof (sa_usr1));
289   sa_usr1.sa_handler = sig_usr1_handler;
290   sigaction (SIGUSR1, &sa_usr1, NULL);
292   memset (&sa_usr2, 0, sizeof (sa_usr2));
293   sa_usr2.sa_handler = sig_usr2_handler;
294   sigaction (SIGUSR2, &sa_usr2, NULL);
296 } /* }}} void install_signal_handlers */
298 static int open_pidfile(char *action, int oflag) /* {{{ */
300   int fd;
301   char *file;
303   file = (config_pid_file != NULL)
304     ? config_pid_file
305     : LOCALSTATEDIR "/run/rrdcached.pid";
307   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
308   if (fd < 0)
309     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
310             action, file, rrd_strerror(errno));
312   return(fd);
313 } /* }}} static int open_pidfile */
315 /* check existing pid file to see whether a daemon is running */
316 static int check_pidfile(void)
318   int pid_fd;
319   pid_t pid;
320   char pid_str[16];
322   pid_fd = open_pidfile("open", O_RDWR);
323   if (pid_fd < 0)
324     return pid_fd;
326   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
327     return -1;
329   pid = atoi(pid_str);
330   if (pid <= 0)
331     return -1;
333   /* another running process that we can signal COULD be
334    * a competing rrdcached */
335   if (pid != getpid() && kill(pid, 0) == 0)
336   {
337     fprintf(stderr,
338             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
339     close(pid_fd);
340     return -1;
341   }
343   lseek(pid_fd, 0, SEEK_SET);
344   ftruncate(pid_fd, 0);
346   fprintf(stderr,
347           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
348           "rrdcached: starting normally.\n", pid);
350   return pid_fd;
351 } /* }}} static int check_pidfile */
353 static int write_pidfile (int fd) /* {{{ */
355   pid_t pid;
356   FILE *fh;
358   pid = getpid ();
360   fh = fdopen (fd, "w");
361   if (fh == NULL)
362   {
363     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
364     close(fd);
365     return (-1);
366   }
368   fprintf (fh, "%i\n", (int) pid);
369   fclose (fh);
371   return (0);
372 } /* }}} int write_pidfile */
374 static int remove_pidfile (void) /* {{{ */
376   char *file;
377   int status;
379   file = (config_pid_file != NULL)
380     ? config_pid_file
381     : LOCALSTATEDIR "/run/rrdcached.pid";
383   status = unlink (file);
384   if (status == 0)
385     return (0);
386   return (errno);
387 } /* }}} int remove_pidfile */
389 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
391   char *eol;
393   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
394                sock->next_read - sock->next_cmd);
396   if (eol == NULL)
397   {
398     /* no commands left, move remainder back to front of rbuf */
399     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
400             sock->next_read - sock->next_cmd);
401     sock->next_read -= sock->next_cmd;
402     sock->next_cmd = 0;
403     *len = 0;
404     return NULL;
405   }
406   else
407   {
408     char *cmd = sock->rbuf + sock->next_cmd;
409     *eol = '\0';
411     sock->next_cmd = eol - sock->rbuf + 1;
413     if (eol > sock->rbuf && *(eol-1) == '\r')
414       *(--eol) = '\0'; /* handle "\r\n" EOL */
416     *len = eol - cmd;
418     return cmd;
419   }
421   /* NOTREACHED */
422   assert(1==0);
425 /* add the characters directly to the write buffer */
426 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
428   char *new_buf;
430   assert(sock != NULL);
432   new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
433   if (new_buf == NULL)
434   {
435     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
436     return -1;
437   }
439   strncpy(new_buf + sock->wbuf_len, str, len + 1);
441   sock->wbuf = new_buf;
442   sock->wbuf_len += len;
444   return 0;
445 } /* }}} static int add_to_wbuf */
447 /* add the text to the "extra" info that's sent after the status line */
448 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
450   va_list argp;
451   char buffer[CMD_MAX];
452   int len;
454   if (sock == NULL) return 0; /* journal replay mode */
455   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
457   va_start(argp, fmt);
458 #ifdef HAVE_VSNPRINTF
459   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
460 #else
461   len = vsprintf(buffer, fmt, argp);
462 #endif
463   va_end(argp);
464   if (len < 0)
465   {
466     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
467     return -1;
468   }
470   return add_to_wbuf(sock, buffer, len);
471 } /* }}} static int add_response_info */
473 static int count_lines(char *str) /* {{{ */
475   int lines = 0;
477   if (str != NULL)
478   {
479     while ((str = strchr(str, '\n')) != NULL)
480     {
481       ++lines;
482       ++str;
483     }
484   }
486   return lines;
487 } /* }}} static int count_lines */
489 /* send the response back to the user.
490  * returns 0 on success, -1 on error
491  * write buffer is always zeroed after this call */
492 static int send_response (listen_socket_t *sock, response_code rc,
493                           char *fmt, ...) /* {{{ */
495   va_list argp;
496   char buffer[CMD_MAX];
497   int lines;
498   ssize_t wrote;
499   int rclen, len;
501   if (sock == NULL) return rc;  /* journal replay mode */
503   if (sock->batch_start)
504   {
505     if (rc == RESP_OK)
506       return rc; /* no response on success during BATCH */
507     lines = sock->batch_cmd;
508   }
509   else if (rc == RESP_OK)
510     lines = count_lines(sock->wbuf);
511   else
512     lines = -1;
514   rclen = sprintf(buffer, "%d ", lines);
515   va_start(argp, fmt);
516 #ifdef HAVE_VSNPRINTF
517   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
518 #else
519   len = vsprintf(buffer+rclen, fmt, argp);
520 #endif
521   va_end(argp);
522   if (len < 0)
523     return -1;
525   len += rclen;
527   /* append the result to the wbuf, don't write to the user */
528   if (sock->batch_start)
529     return add_to_wbuf(sock, buffer, len);
531   /* first write must be complete */
532   if (len != write(sock->fd, buffer, len))
533   {
534     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
535     return -1;
536   }
538   if (sock->wbuf != NULL && rc == RESP_OK)
539   {
540     wrote = 0;
541     while (wrote < sock->wbuf_len)
542     {
543       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
544       if (wb <= 0)
545       {
546         RRDD_LOG(LOG_INFO, "send_response: could not write results");
547         return -1;
548       }
549       wrote += wb;
550     }
551   }
553   free(sock->wbuf); sock->wbuf = NULL;
554   sock->wbuf_len = 0;
556   return 0;
557 } /* }}} */
559 static void wipe_ci_values(cache_item_t *ci, time_t when)
561   ci->values = NULL;
562   ci->values_num = 0;
564   ci->last_flush_time = when;
565   if (config_write_jitter > 0)
566     ci->last_flush_time += (random() % config_write_jitter);
569 /* remove_from_queue
570  * remove a "cache_item_t" item from the queue.
571  * must hold 'cache_lock' when calling this
572  */
573 static void remove_from_queue(cache_item_t *ci) /* {{{ */
575   if (ci == NULL) return;
576   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
578   if (ci->prev == NULL)
579     cache_queue_head = ci->next; /* reset head */
580   else
581     ci->prev->next = ci->next;
583   if (ci->next == NULL)
584     cache_queue_tail = ci->prev; /* reset the tail */
585   else
586     ci->next->prev = ci->prev;
588   ci->next = ci->prev = NULL;
589   ci->flags &= ~CI_FLAGS_IN_QUEUE;
590 } /* }}} static void remove_from_queue */
592 /* free the resources associated with the cache_item_t
593  * must hold cache_lock when calling this function
594  */
595 static void *free_cache_item(cache_item_t *ci) /* {{{ */
597   if (ci == NULL) return NULL;
599   remove_from_queue(ci);
601   for (int i=0; i < ci->values_num; i++)
602     free(ci->values[i]);
604   free (ci->values);
605   free (ci->file);
607   /* in case anyone is waiting */
608   pthread_cond_broadcast(&ci->flushed);
610   free (ci);
612   return NULL;
613 } /* }}} static void *free_cache_item */
615 /*
616  * enqueue_cache_item:
617  * `cache_lock' must be acquired before calling this function!
618  */
619 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
620     queue_side_t side)
622   if (ci == NULL)
623     return (-1);
625   if (ci->values_num == 0)
626     return (0);
628   if (side == HEAD)
629   {
630     if (cache_queue_head == ci)
631       return 0;
633     /* remove if further down in queue */
634     remove_from_queue(ci);
636     ci->prev = NULL;
637     ci->next = cache_queue_head;
638     if (ci->next != NULL)
639       ci->next->prev = ci;
640     cache_queue_head = ci;
642     if (cache_queue_tail == NULL)
643       cache_queue_tail = cache_queue_head;
644   }
645   else /* (side == TAIL) */
646   {
647     /* We don't move values back in the list.. */
648     if (ci->flags & CI_FLAGS_IN_QUEUE)
649       return (0);
651     assert (ci->next == NULL);
652     assert (ci->prev == NULL);
654     ci->prev = cache_queue_tail;
656     if (cache_queue_tail == NULL)
657       cache_queue_head = ci;
658     else
659       cache_queue_tail->next = ci;
661     cache_queue_tail = ci;
662   }
664   ci->flags |= CI_FLAGS_IN_QUEUE;
666   pthread_cond_broadcast(&cache_cond);
667   pthread_mutex_lock (&stats_lock);
668   stats_queue_length++;
669   pthread_mutex_unlock (&stats_lock);
671   return (0);
672 } /* }}} int enqueue_cache_item */
674 /*
675  * tree_callback_flush:
676  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
677  * while this is in progress.
678  */
679 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
680     gpointer data)
682   cache_item_t *ci;
683   callback_flush_data_t *cfd;
685   ci = (cache_item_t *) value;
686   cfd = (callback_flush_data_t *) data;
688   if (ci->flags & CI_FLAGS_IN_QUEUE)
689     return FALSE;
691   if ((ci->last_flush_time <= cfd->abs_timeout)
692       && (ci->values_num > 0))
693   {
694     enqueue_cache_item (ci, TAIL);
695   }
696   else if ((do_shutdown != 0)
697       && (ci->values_num > 0))
698   {
699     enqueue_cache_item (ci, TAIL);
700   }
701   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
702       && (ci->values_num <= 0))
703   {
704     char **temp;
706     temp = (char **) realloc (cfd->keys,
707         sizeof (char *) * (cfd->keys_num + 1));
708     if (temp == NULL)
709     {
710       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
711       return (FALSE);
712     }
713     cfd->keys = temp;
714     /* Make really sure this points to the _same_ place */
715     assert ((char *) key == ci->file);
716     cfd->keys[cfd->keys_num] = (char *) key;
717     cfd->keys_num++;
718   }
720   return (FALSE);
721 } /* }}} gboolean tree_callback_flush */
723 static int flush_old_values (int max_age)
725   callback_flush_data_t cfd;
726   size_t k;
728   memset (&cfd, 0, sizeof (cfd));
729   /* Pass the current time as user data so that we don't need to call
730    * `time' for each node. */
731   cfd.now = time (NULL);
732   cfd.keys = NULL;
733   cfd.keys_num = 0;
735   if (max_age > 0)
736     cfd.abs_timeout = cfd.now - max_age;
737   else
738     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
740   /* `tree_callback_flush' will return the keys of all values that haven't
741    * been touched in the last `config_flush_interval' seconds in `cfd'.
742    * The char*'s in this array point to the same memory as ci->file, so we
743    * don't need to free them separately. */
744   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
746   for (k = 0; k < cfd.keys_num; k++)
747   {
748     /* should never fail, since we have held the cache_lock
749      * the entire time */
750     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
751   }
753   if (cfd.keys != NULL)
754   {
755     free (cfd.keys);
756     cfd.keys = NULL;
757   }
759   return (0);
760 } /* int flush_old_values */
762 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
764   struct timeval now;
765   struct timespec next_flush;
766   int final_flush = 0; /* make sure we only flush once on shutdown */
768   gettimeofday (&now, NULL);
769   next_flush.tv_sec = now.tv_sec + config_flush_interval;
770   next_flush.tv_nsec = 1000 * now.tv_usec;
772   pthread_mutex_lock (&cache_lock);
773   while ((do_shutdown == 0) || (cache_queue_head != NULL))
774   {
775     cache_item_t *ci;
776     char *file;
777     char **values;
778     int values_num;
779     int status;
780     int i;
782     /* First, check if it's time to do the cache flush. */
783     gettimeofday (&now, NULL);
784     if ((now.tv_sec > next_flush.tv_sec)
785         || ((now.tv_sec == next_flush.tv_sec)
786           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
787     {
788       /* Flush all values that haven't been written in the last
789        * `config_write_interval' seconds. */
790       flush_old_values (config_write_interval);
792       /* Determine the time of the next cache flush. */
793       next_flush.tv_sec =
794         now.tv_sec + next_flush.tv_sec % config_flush_interval;
796       /* unlock the cache while we rotate so we don't block incoming
797        * updates if the fsync() blocks on disk I/O */
798       pthread_mutex_unlock(&cache_lock);
799       journal_rotate();
800       pthread_mutex_lock(&cache_lock);
801     }
803     /* Now, check if there's something to store away. If not, wait until
804      * something comes in or it's time to do the cache flush.  if we are
805      * shutting down, do not wait around.  */
806     if (cache_queue_head == NULL && !do_shutdown)
807     {
808       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
809       if ((status != 0) && (status != ETIMEDOUT))
810       {
811         RRDD_LOG (LOG_ERR, "queue_thread_main: "
812             "pthread_cond_timedwait returned %i.", status);
813       }
814     }
816     /* We're about to shut down */
817     if (do_shutdown != 0 && !final_flush++)
818     {
819       if (config_flush_at_shutdown)
820         flush_old_values (-1); /* flush everything */
821       else
822         break;
823     }
825     /* Check if a value has arrived. This may be NULL if we timed out or there
826      * was an interrupt such as a signal. */
827     if (cache_queue_head == NULL)
828       continue;
830     ci = cache_queue_head;
832     /* copy the relevant parts */
833     file = strdup (ci->file);
834     if (file == NULL)
835     {
836       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
837       continue;
838     }
840     assert(ci->values != NULL);
841     assert(ci->values_num > 0);
843     values = ci->values;
844     values_num = ci->values_num;
846     wipe_ci_values(ci, time(NULL));
847     remove_from_queue(ci);
849     pthread_mutex_lock (&stats_lock);
850     assert (stats_queue_length > 0);
851     stats_queue_length--;
852     pthread_mutex_unlock (&stats_lock);
854     pthread_mutex_unlock (&cache_lock);
856     rrd_clear_error ();
857     status = rrd_update_r (file, NULL, values_num, (void *) values);
858     if (status != 0)
859     {
860       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
861           "rrd_update_r (%s) failed with status %i. (%s)",
862           file, status, rrd_get_error());
863     }
865     journal_write("wrote", file);
866     pthread_cond_broadcast(&ci->flushed);
868     for (i = 0; i < values_num; i++)
869       free (values[i]);
871     free(values);
872     free(file);
874     if (status == 0)
875     {
876       pthread_mutex_lock (&stats_lock);
877       stats_updates_written++;
878       stats_data_sets_written += values_num;
879       pthread_mutex_unlock (&stats_lock);
880     }
882     pthread_mutex_lock (&cache_lock);
884     /* We're about to shut down */
885     if (do_shutdown != 0 && !final_flush++)
886     {
887       if (config_flush_at_shutdown)
888           flush_old_values (-1); /* flush everything */
889       else
890         break;
891     }
892   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
893   pthread_mutex_unlock (&cache_lock);
895   if (config_flush_at_shutdown)
896   {
897     assert(cache_queue_head == NULL);
898     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
899   }
901   journal_done();
903   return (NULL);
904 } /* }}} void *queue_thread_main */
906 static int buffer_get_field (char **buffer_ret, /* {{{ */
907     size_t *buffer_size_ret, char **field_ret)
909   char *buffer;
910   size_t buffer_pos;
911   size_t buffer_size;
912   char *field;
913   size_t field_size;
914   int status;
916   buffer = *buffer_ret;
917   buffer_pos = 0;
918   buffer_size = *buffer_size_ret;
919   field = *buffer_ret;
920   field_size = 0;
922   if (buffer_size <= 0)
923     return (-1);
925   /* This is ensured by `handle_request'. */
926   assert (buffer[buffer_size - 1] == '\0');
928   status = -1;
929   while (buffer_pos < buffer_size)
930   {
931     /* Check for end-of-field or end-of-buffer */
932     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
933     {
934       field[field_size] = 0;
935       field_size++;
936       buffer_pos++;
937       status = 0;
938       break;
939     }
940     /* Handle escaped characters. */
941     else if (buffer[buffer_pos] == '\\')
942     {
943       if (buffer_pos >= (buffer_size - 1))
944         break;
945       buffer_pos++;
946       field[field_size] = buffer[buffer_pos];
947       field_size++;
948       buffer_pos++;
949     }
950     /* Normal operation */ 
951     else
952     {
953       field[field_size] = buffer[buffer_pos];
954       field_size++;
955       buffer_pos++;
956     }
957   } /* while (buffer_pos < buffer_size) */
959   if (status != 0)
960     return (status);
962   *buffer_ret = buffer + buffer_pos;
963   *buffer_size_ret = buffer_size - buffer_pos;
964   *field_ret = field;
966   return (0);
967 } /* }}} int buffer_get_field */
969 /* if we're restricting writes to the base directory,
970  * check whether the file falls within the dir
971  * returns 1 if OK, otherwise 0
972  */
973 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
975   assert(file != NULL);
977   if (!config_write_base_only
978       || sock == NULL /* journal replay */
979       || config_base_dir == NULL)
980     return 1;
982   if (strstr(file, "../") != NULL) goto err;
984   /* relative paths without "../" are ok */
985   if (*file != '/') return 1;
987   /* file must be of the format base + "/" + <1+ char filename> */
988   if (strlen(file) < _config_base_dir_len + 2) goto err;
989   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
990   if (*(file + _config_base_dir_len) != '/') goto err;
992   return 1;
994 err:
995   if (sock != NULL && sock->fd >= 0)
996     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
998   return 0;
999 } /* }}} static int check_file_access */
1001 /* when using a base dir, convert relative paths to absolute paths.
1002  * if necessary, modifies the "filename" pointer to point
1003  * to the new path created in "tmp".  "tmp" is provided
1004  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1005  *
1006  * this allows us to optimize for the expected case (absolute path)
1007  * with a no-op.
1008  */
1009 static void get_abs_path(char **filename, char *tmp)
1011   assert(tmp != NULL);
1012   assert(filename != NULL && *filename != NULL);
1014   if (config_base_dir == NULL || **filename == '/')
1015     return;
1017   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1018   *filename = tmp;
1019 } /* }}} static int get_abs_path */
1021 /* returns 1 if we have the required privilege level,
1022  * otherwise issue an error to the user on sock */
1023 static int has_privilege (listen_socket_t *sock, /* {{{ */
1024                           socket_privilege priv)
1026   if (sock == NULL) /* journal replay */
1027     return 1;
1029   if (sock->privilege >= priv)
1030     return 1;
1032   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1033 } /* }}} static int has_privilege */
1035 static int flush_file (const char *filename) /* {{{ */
1037   cache_item_t *ci;
1039   pthread_mutex_lock (&cache_lock);
1041   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1042   if (ci == NULL)
1043   {
1044     pthread_mutex_unlock (&cache_lock);
1045     return (ENOENT);
1046   }
1048   if (ci->values_num > 0)
1049   {
1050     /* Enqueue at head */
1051     enqueue_cache_item (ci, HEAD);
1052     pthread_cond_wait(&ci->flushed, &cache_lock);
1053   }
1055   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1056    * may have been purged during our cond_wait() */
1058   pthread_mutex_unlock(&cache_lock);
1060   return (0);
1061 } /* }}} int flush_file */
1063 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1064     char *buffer, size_t buffer_size)
1066   int status;
1067   char **help_text;
1068   char *command;
1070   char *help_help[2] =
1071   {
1072     "Command overview\n"
1073     ,
1074     "HELP [<command>]\n"
1075     "FLUSH <filename>\n"
1076     "FLUSHALL\n"
1077     "PENDING <filename>\n"
1078     "FORGET <filename>\n"
1079     "UPDATE <filename> <values> [<values> ...]\n"
1080     "BATCH\n"
1081     "STATS\n"
1082     "QUIT\n"
1083   };
1085   char *help_flush[2] =
1086   {
1087     "Help for FLUSH\n"
1088     ,
1089     "Usage: FLUSH <filename>\n"
1090     "\n"
1091     "Adds the given filename to the head of the update queue and returns\n"
1092     "after is has been dequeued.\n"
1093   };
1095   char *help_flushall[2] =
1096   {
1097     "Help for FLUSHALL\n"
1098     ,
1099     "Usage: FLUSHALL\n"
1100     "\n"
1101     "Triggers writing of all pending updates.  Returns immediately.\n"
1102   };
1104   char *help_pending[2] =
1105   {
1106     "Help for PENDING\n"
1107     ,
1108     "Usage: PENDING <filename>\n"
1109     "\n"
1110     "Shows any 'pending' updates for a file, in order.\n"
1111     "The updates shown have not yet been written to the underlying RRD file.\n"
1112   };
1114   char *help_forget[2] =
1115   {
1116     "Help for FORGET\n"
1117     ,
1118     "Usage: FORGET <filename>\n"
1119     "\n"
1120     "Removes the file completely from the cache.\n"
1121     "Any pending updates for the file will be lost.\n"
1122   };
1124   char *help_update[2] =
1125   {
1126     "Help for UPDATE\n"
1127     ,
1128     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1129     "\n"
1130     "Adds the given file to the internal cache if it is not yet known and\n"
1131     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1132     "for details.\n"
1133     "\n"
1134     "Each <values> has the following form:\n"
1135     "  <values> = <time>:<value>[:<value>[...]]\n"
1136     "See the rrdupdate(1) manpage for details.\n"
1137   };
1139   char *help_stats[2] =
1140   {
1141     "Help for STATS\n"
1142     ,
1143     "Usage: STATS\n"
1144     "\n"
1145     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1146     "a description of the values.\n"
1147   };
1149   char *help_batch[2] =
1150   {
1151     "Help for BATCH\n"
1152     ,
1153     "The 'BATCH' command permits the client to initiate a bulk load\n"
1154     "   of commands to rrdcached.\n"
1155     "\n"
1156     "Usage:\n"
1157     "\n"
1158     "    client: BATCH\n"
1159     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1160     "    client: command #1\n"
1161     "    client: command #2\n"
1162     "    client: ... and so on\n"
1163     "    client: .\n"
1164     "    server: 2 errors\n"
1165     "    server: 7 message for command #7\n"
1166     "    server: 9 message for command #9\n"
1167     "\n"
1168     "For more information, consult the rrdcached(1) documentation.\n"
1169   };
1171   char *help_quit[2] =
1172   {
1173     "Help for QUIT\n"
1174     ,
1175     "Disconnect from rrdcached.\n"
1176   };
1178   status = buffer_get_field (&buffer, &buffer_size, &command);
1179   if (status != 0)
1180     help_text = help_help;
1181   else
1182   {
1183     if (strcasecmp (command, "update") == 0)
1184       help_text = help_update;
1185     else if (strcasecmp (command, "flush") == 0)
1186       help_text = help_flush;
1187     else if (strcasecmp (command, "flushall") == 0)
1188       help_text = help_flushall;
1189     else if (strcasecmp (command, "pending") == 0)
1190       help_text = help_pending;
1191     else if (strcasecmp (command, "forget") == 0)
1192       help_text = help_forget;
1193     else if (strcasecmp (command, "stats") == 0)
1194       help_text = help_stats;
1195     else if (strcasecmp (command, "batch") == 0)
1196       help_text = help_batch;
1197     else if (strcasecmp (command, "quit") == 0)
1198       help_text = help_quit;
1199     else
1200       help_text = help_help;
1201   }
1203   add_response_info(sock, help_text[1]);
1204   return send_response(sock, RESP_OK, help_text[0]);
1205 } /* }}} int handle_request_help */
1207 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1209   uint64_t copy_queue_length;
1210   uint64_t copy_updates_received;
1211   uint64_t copy_flush_received;
1212   uint64_t copy_updates_written;
1213   uint64_t copy_data_sets_written;
1214   uint64_t copy_journal_bytes;
1215   uint64_t copy_journal_rotate;
1217   uint64_t tree_nodes_number;
1218   uint64_t tree_depth;
1220   pthread_mutex_lock (&stats_lock);
1221   copy_queue_length       = stats_queue_length;
1222   copy_updates_received   = stats_updates_received;
1223   copy_flush_received     = stats_flush_received;
1224   copy_updates_written    = stats_updates_written;
1225   copy_data_sets_written  = stats_data_sets_written;
1226   copy_journal_bytes      = stats_journal_bytes;
1227   copy_journal_rotate     = stats_journal_rotate;
1228   pthread_mutex_unlock (&stats_lock);
1230   pthread_mutex_lock (&cache_lock);
1231   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1232   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1233   pthread_mutex_unlock (&cache_lock);
1235   add_response_info(sock,
1236                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1237   add_response_info(sock,
1238                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1239   add_response_info(sock,
1240                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1241   add_response_info(sock,
1242                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1243   add_response_info(sock,
1244                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1245   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1246   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1247   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1248   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1250   send_response(sock, RESP_OK, "Statistics follow\n");
1252   return (0);
1253 } /* }}} int handle_request_stats */
1255 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1256     char *buffer, size_t buffer_size)
1258   char *file, file_tmp[PATH_MAX];
1259   int status;
1261   status = buffer_get_field (&buffer, &buffer_size, &file);
1262   if (status != 0)
1263   {
1264     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1265   }
1266   else
1267   {
1268     pthread_mutex_lock(&stats_lock);
1269     stats_flush_received++;
1270     pthread_mutex_unlock(&stats_lock);
1272     get_abs_path(&file, file_tmp);
1273     if (!check_file_access(file, sock)) return 0;
1275     status = flush_file (file);
1276     if (status == 0)
1277       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1278     else if (status == ENOENT)
1279     {
1280       /* no file in our tree; see whether it exists at all */
1281       struct stat statbuf;
1283       memset(&statbuf, 0, sizeof(statbuf));
1284       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1285         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1286       else
1287         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1288     }
1289     else if (status < 0)
1290       return send_response(sock, RESP_ERR, "Internal error.\n");
1291     else
1292       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1293   }
1295   /* NOTREACHED */
1296   assert(1==0);
1297 } /* }}} int handle_request_flush */
1299 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1301   int status;
1303   status = has_privilege(sock, PRIV_HIGH);
1304   if (status <= 0)
1305     return status;
1307   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1309   pthread_mutex_lock(&cache_lock);
1310   flush_old_values(-1);
1311   pthread_mutex_unlock(&cache_lock);
1313   return send_response(sock, RESP_OK, "Started flush.\n");
1314 } /* }}} static int handle_request_flushall */
1316 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1317                                   char *buffer, size_t buffer_size)
1319   int status;
1320   char *file, file_tmp[PATH_MAX];
1321   cache_item_t *ci;
1323   status = buffer_get_field(&buffer, &buffer_size, &file);
1324   if (status != 0)
1325     return send_response(sock, RESP_ERR,
1326                          "Usage: PENDING <filename>\n");
1328   status = has_privilege(sock, PRIV_HIGH);
1329   if (status <= 0)
1330     return status;
1332   get_abs_path(&file, file_tmp);
1334   pthread_mutex_lock(&cache_lock);
1335   ci = g_tree_lookup(cache_tree, file);
1336   if (ci == NULL)
1337   {
1338     pthread_mutex_unlock(&cache_lock);
1339     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1340   }
1342   for (int i=0; i < ci->values_num; i++)
1343     add_response_info(sock, "%s\n", ci->values[i]);
1345   pthread_mutex_unlock(&cache_lock);
1346   return send_response(sock, RESP_OK, "updates pending\n");
1347 } /* }}} static int handle_request_pending */
1349 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1350                                  char *buffer, size_t buffer_size)
1352   int status;
1353   gboolean found;
1354   char *file, file_tmp[PATH_MAX];
1356   status = buffer_get_field(&buffer, &buffer_size, &file);
1357   if (status != 0)
1358     return send_response(sock, RESP_ERR,
1359                          "Usage: FORGET <filename>\n");
1361   status = has_privilege(sock, PRIV_HIGH);
1362   if (status <= 0)
1363     return status;
1365   get_abs_path(&file, file_tmp);
1366   if (!check_file_access(file, sock)) return 0;
1368   pthread_mutex_lock(&cache_lock);
1369   found = g_tree_remove(cache_tree, file);
1370   pthread_mutex_unlock(&cache_lock);
1372   if (found == TRUE)
1373   {
1374     if (sock != NULL)
1375       journal_write("forget", file);
1377     return send_response(sock, RESP_OK, "Gone!\n");
1378   }
1379   else
1380     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1382   /* NOTREACHED */
1383   assert(1==0);
1384 } /* }}} static int handle_request_forget */
1386 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1387                                   time_t now,
1388                                   char *buffer, size_t buffer_size)
1390   char *file, file_tmp[PATH_MAX];
1391   int values_num = 0;
1392   int status;
1393   char orig_buf[CMD_MAX];
1395   cache_item_t *ci;
1397   status = has_privilege(sock, PRIV_HIGH);
1398   if (status <= 0)
1399     return status;
1401   /* save it for the journal later */
1402   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1404   status = buffer_get_field (&buffer, &buffer_size, &file);
1405   if (status != 0)
1406     return send_response(sock, RESP_ERR,
1407                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1409   pthread_mutex_lock(&stats_lock);
1410   stats_updates_received++;
1411   pthread_mutex_unlock(&stats_lock);
1413   get_abs_path(&file, file_tmp);
1414   if (!check_file_access(file, sock)) return 0;
1416   pthread_mutex_lock (&cache_lock);
1417   ci = g_tree_lookup (cache_tree, file);
1419   if (ci == NULL) /* {{{ */
1420   {
1421     struct stat statbuf;
1423     /* don't hold the lock while we setup; stat(2) might block */
1424     pthread_mutex_unlock(&cache_lock);
1426     memset (&statbuf, 0, sizeof (statbuf));
1427     status = stat (file, &statbuf);
1428     if (status != 0)
1429     {
1430       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1432       status = errno;
1433       if (status == ENOENT)
1434         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1435       else
1436         return send_response(sock, RESP_ERR,
1437                              "stat failed with error %i.\n", status);
1438     }
1439     if (!S_ISREG (statbuf.st_mode))
1440       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1442     if (access(file, R_OK|W_OK) != 0)
1443       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1444                            file, rrd_strerror(errno));
1446     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1447     if (ci == NULL)
1448     {
1449       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1451       return send_response(sock, RESP_ERR, "malloc failed.\n");
1452     }
1453     memset (ci, 0, sizeof (cache_item_t));
1455     ci->file = strdup (file);
1456     if (ci->file == NULL)
1457     {
1458       free (ci);
1459       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1461       return send_response(sock, RESP_ERR, "strdup failed.\n");
1462     }
1464     wipe_ci_values(ci, now);
1465     ci->flags = CI_FLAGS_IN_TREE;
1466     pthread_cond_init(&ci->flushed, NULL);
1468     pthread_mutex_lock(&cache_lock);
1469     g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1470   } /* }}} */
1471   assert (ci != NULL);
1473   /* don't re-write updates in replay mode */
1474   if (sock != NULL)
1475     journal_write("update", orig_buf);
1477   while (buffer_size > 0)
1478   {
1479     char **temp;
1480     char *value;
1481     time_t stamp;
1482     char *eostamp;
1484     status = buffer_get_field (&buffer, &buffer_size, &value);
1485     if (status != 0)
1486     {
1487       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1488       break;
1489     }
1491     /* make sure update time is always moving forward */
1492     stamp = strtol(value, &eostamp, 10);
1493     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1494     {
1495       pthread_mutex_unlock(&cache_lock);
1496       return send_response(sock, RESP_ERR,
1497                            "Cannot find timestamp in '%s'!\n", value);
1498     }
1499     else if (stamp <= ci->last_update_stamp)
1500     {
1501       pthread_mutex_unlock(&cache_lock);
1502       return send_response(sock, RESP_ERR,
1503                            "illegal attempt to update using time %ld when last"
1504                            " update time is %ld (minimum one second step)\n",
1505                            stamp, ci->last_update_stamp);
1506     }
1507     else
1508       ci->last_update_stamp = stamp;
1510     temp = (char **) realloc (ci->values,
1511         sizeof (char *) * (ci->values_num + 1));
1512     if (temp == NULL)
1513     {
1514       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1515       continue;
1516     }
1517     ci->values = temp;
1519     ci->values[ci->values_num] = strdup (value);
1520     if (ci->values[ci->values_num] == NULL)
1521     {
1522       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1523       continue;
1524     }
1525     ci->values_num++;
1527     values_num++;
1528   }
1530   if (((now - ci->last_flush_time) >= config_write_interval)
1531       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1532       && (ci->values_num > 0))
1533   {
1534     enqueue_cache_item (ci, TAIL);
1535   }
1537   pthread_mutex_unlock (&cache_lock);
1539   if (values_num < 1)
1540     return send_response(sock, RESP_ERR, "No values updated.\n");
1541   else
1542     return send_response(sock, RESP_OK,
1543                          "errors, enqueued %i value(s).\n", values_num);
1545   /* NOTREACHED */
1546   assert(1==0);
1548 } /* }}} int handle_request_update */
1550 /* we came across a "WROTE" entry during journal replay.
1551  * throw away any values that we have accumulated for this file
1552  */
1553 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1555   int i;
1556   cache_item_t *ci;
1557   const char *file = buffer;
1559   pthread_mutex_lock(&cache_lock);
1561   ci = g_tree_lookup(cache_tree, file);
1562   if (ci == NULL)
1563   {
1564     pthread_mutex_unlock(&cache_lock);
1565     return (0);
1566   }
1568   if (ci->values)
1569   {
1570     for (i=0; i < ci->values_num; i++)
1571       free(ci->values[i]);
1573     free(ci->values);
1574   }
1576   wipe_ci_values(ci, now);
1577   remove_from_queue(ci);
1579   pthread_mutex_unlock(&cache_lock);
1580   return (0);
1581 } /* }}} int handle_request_wrote */
1583 /* start "BATCH" processing */
1584 static int batch_start (listen_socket_t *sock) /* {{{ */
1586   int status;
1587   if (sock->batch_start)
1588     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1590   status = send_response(sock, RESP_OK,
1591                          "Go ahead.  End with dot '.' on its own line.\n");
1592   sock->batch_start = time(NULL);
1593   sock->batch_cmd = 0;
1595   return status;
1596 } /* }}} static int batch_start */
1598 /* finish "BATCH" processing and return results to the client */
1599 static int batch_done (listen_socket_t *sock) /* {{{ */
1601   assert(sock->batch_start);
1602   sock->batch_start = 0;
1603   sock->batch_cmd  = 0;
1604   return send_response(sock, RESP_OK, "errors\n");
1605 } /* }}} static int batch_done */
1607 /* if sock==NULL, we are in journal replay mode */
1608 static int handle_request (listen_socket_t *sock, /* {{{ */
1609                            time_t now,
1610                            char *buffer, size_t buffer_size)
1612   char *buffer_ptr;
1613   char *command;
1614   int status;
1616   assert (buffer[buffer_size - 1] == '\0');
1618   buffer_ptr = buffer;
1619   command = NULL;
1620   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1621   if (status != 0)
1622   {
1623     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1624     return (-1);
1625   }
1627   if (sock != NULL && sock->batch_start)
1628     sock->batch_cmd++;
1630   if (strcasecmp (command, "update") == 0)
1631     return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1632   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1633   {
1634     /* this is only valid in replay mode */
1635     return (handle_request_wrote (buffer_ptr, now));
1636   }
1637   else if (strcasecmp (command, "flush") == 0)
1638     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1639   else if (strcasecmp (command, "flushall") == 0)
1640     return (handle_request_flushall(sock));
1641   else if (strcasecmp (command, "pending") == 0)
1642     return (handle_request_pending(sock, buffer_ptr, buffer_size));
1643   else if (strcasecmp (command, "forget") == 0)
1644     return (handle_request_forget(sock, buffer_ptr, buffer_size));
1645   else if (strcasecmp (command, "stats") == 0)
1646     return (handle_request_stats (sock));
1647   else if (strcasecmp (command, "help") == 0)
1648     return (handle_request_help (sock, buffer_ptr, buffer_size));
1649   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1650     return batch_start(sock);
1651   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1652     return batch_done(sock);
1653   else if (strcasecmp (command, "quit") == 0)
1654     return -1;
1655   else
1656     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1658   /* NOTREACHED */
1659   assert(1==0);
1660 } /* }}} int handle_request */
1662 /* MUST NOT hold journal_lock before calling this */
1663 static void journal_rotate(void) /* {{{ */
1665   FILE *old_fh = NULL;
1666   int new_fd;
1668   if (journal_cur == NULL || journal_old == NULL)
1669     return;
1671   pthread_mutex_lock(&journal_lock);
1673   /* we rotate this way (rename before close) so that the we can release
1674    * the journal lock as fast as possible.  Journal writes to the new
1675    * journal can proceed immediately after the new file is opened.  The
1676    * fclose can then block without affecting new updates.
1677    */
1678   if (journal_fh != NULL)
1679   {
1680     old_fh = journal_fh;
1681     journal_fh = NULL;
1682     rename(journal_cur, journal_old);
1683     ++stats_journal_rotate;
1684   }
1686   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1687                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1688   if (new_fd >= 0)
1689   {
1690     journal_fh = fdopen(new_fd, "a");
1691     if (journal_fh == NULL)
1692       close(new_fd);
1693   }
1695   pthread_mutex_unlock(&journal_lock);
1697   if (old_fh != NULL)
1698     fclose(old_fh);
1700   if (journal_fh == NULL)
1701   {
1702     RRDD_LOG(LOG_CRIT,
1703              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1704              journal_cur, rrd_strerror(errno));
1706     RRDD_LOG(LOG_ERR,
1707              "JOURNALING DISABLED: All values will be flushed at shutdown");
1708     config_flush_at_shutdown = 1;
1709   }
1711 } /* }}} static void journal_rotate */
1713 static void journal_done(void) /* {{{ */
1715   if (journal_cur == NULL)
1716     return;
1718   pthread_mutex_lock(&journal_lock);
1719   if (journal_fh != NULL)
1720   {
1721     fclose(journal_fh);
1722     journal_fh = NULL;
1723   }
1725   if (config_flush_at_shutdown)
1726   {
1727     RRDD_LOG(LOG_INFO, "removing journals");
1728     unlink(journal_old);
1729     unlink(journal_cur);
1730   }
1731   else
1732   {
1733     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1734              "journals will be used at next startup");
1735   }
1737   pthread_mutex_unlock(&journal_lock);
1739 } /* }}} static void journal_done */
1741 static int journal_write(char *cmd, char *args) /* {{{ */
1743   int chars;
1745   if (journal_fh == NULL)
1746     return 0;
1748   pthread_mutex_lock(&journal_lock);
1749   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1750   pthread_mutex_unlock(&journal_lock);
1752   if (chars > 0)
1753   {
1754     pthread_mutex_lock(&stats_lock);
1755     stats_journal_bytes += chars;
1756     pthread_mutex_unlock(&stats_lock);
1757   }
1759   return chars;
1760 } /* }}} static int journal_write */
1762 static int journal_replay (const char *file) /* {{{ */
1764   FILE *fh;
1765   int entry_cnt = 0;
1766   int fail_cnt = 0;
1767   uint64_t line = 0;
1768   char entry[CMD_MAX];
1769   time_t now;
1771   if (file == NULL) return 0;
1773   {
1774     char *reason = "unknown error";
1775     int status = 0;
1776     struct stat statbuf;
1778     memset(&statbuf, 0, sizeof(statbuf));
1779     if (stat(file, &statbuf) != 0)
1780     {
1781       if (errno == ENOENT)
1782         return 0;
1784       reason = "stat error";
1785       status = errno;
1786     }
1787     else if (!S_ISREG(statbuf.st_mode))
1788     {
1789       reason = "not a regular file";
1790       status = EPERM;
1791     }
1792     if (statbuf.st_uid != daemon_uid)
1793     {
1794       reason = "not owned by daemon user";
1795       status = EACCES;
1796     }
1797     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1798     {
1799       reason = "must not be user/group writable";
1800       status = EACCES;
1801     }
1803     if (status != 0)
1804     {
1805       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1806                file, rrd_strerror(status), reason);
1807       return 0;
1808     }
1809   }
1811   fh = fopen(file, "r");
1812   if (fh == NULL)
1813   {
1814     if (errno != ENOENT)
1815       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1816                file, rrd_strerror(errno));
1817     return 0;
1818   }
1819   else
1820     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1822   now = time(NULL);
1824   while(!feof(fh))
1825   {
1826     size_t entry_len;
1828     ++line;
1829     if (fgets(entry, sizeof(entry), fh) == NULL)
1830       break;
1831     entry_len = strlen(entry);
1833     /* check \n termination in case journal writing crashed mid-line */
1834     if (entry_len == 0)
1835       continue;
1836     else if (entry[entry_len - 1] != '\n')
1837     {
1838       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1839       ++fail_cnt;
1840       continue;
1841     }
1843     entry[entry_len - 1] = '\0';
1845     if (handle_request(NULL, now, entry, entry_len) == 0)
1846       ++entry_cnt;
1847     else
1848       ++fail_cnt;
1849   }
1851   fclose(fh);
1853   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1854            entry_cnt, fail_cnt);
1856   return entry_cnt > 0 ? 1 : 0;
1857 } /* }}} static int journal_replay */
1859 static void journal_init(void) /* {{{ */
1861   int had_journal = 0;
1863   if (journal_cur == NULL) return;
1865   pthread_mutex_lock(&journal_lock);
1867   RRDD_LOG(LOG_INFO, "checking for journal files");
1869   had_journal += journal_replay(journal_old);
1870   had_journal += journal_replay(journal_cur);
1872   /* it must have been a crash.  start a flush */
1873   if (had_journal && config_flush_at_shutdown)
1874     flush_old_values(-1);
1876   pthread_mutex_unlock(&journal_lock);
1877   journal_rotate();
1879   RRDD_LOG(LOG_INFO, "journal processing complete");
1881 } /* }}} static void journal_init */
1883 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1885   assert(sock != NULL);
1887   free(sock->rbuf);  sock->rbuf = NULL;
1888   free(sock->wbuf);  sock->wbuf = NULL;
1889   free(sock);
1890 } /* }}} void free_listen_socket */
1892 static void close_connection(listen_socket_t *sock) /* {{{ */
1894   if (sock->fd >= 0)
1895   {
1896     close(sock->fd);
1897     sock->fd = -1;
1898   }
1900   free_listen_socket(sock);
1902 } /* }}} void close_connection */
1904 static void *connection_thread_main (void *args) /* {{{ */
1906   listen_socket_t *sock;
1907   int i;
1908   int fd;
1910   sock = (listen_socket_t *) args;
1911   fd = sock->fd;
1913   /* init read buffers */
1914   sock->next_read = sock->next_cmd = 0;
1915   sock->rbuf = malloc(RBUF_SIZE);
1916   if (sock->rbuf == NULL)
1917   {
1918     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1919     close_connection(sock);
1920     return NULL;
1921   }
1923   pthread_mutex_lock (&connection_threads_lock);
1924   {
1925     pthread_t *temp;
1927     temp = (pthread_t *) realloc (connection_threads,
1928         sizeof (pthread_t) * (connection_threads_num + 1));
1929     if (temp == NULL)
1930     {
1931       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc(++) failed.");
1932     }
1933     else
1934     {
1935       connection_threads = temp;
1936       connection_threads[connection_threads_num] = pthread_self ();
1937       connection_threads_num++;
1938     }
1939   }
1940   pthread_mutex_unlock (&connection_threads_lock);
1942   while (do_shutdown == 0)
1943   {
1944     char *cmd;
1945     ssize_t cmd_len;
1946     ssize_t rbytes;
1947     time_t now;
1949     struct pollfd pollfd;
1950     int status;
1952     pollfd.fd = fd;
1953     pollfd.events = POLLIN | POLLPRI;
1954     pollfd.revents = 0;
1956     status = poll (&pollfd, 1, /* timeout = */ 500);
1957     if (do_shutdown)
1958       break;
1959     else if (status == 0) /* timeout */
1960       continue;
1961     else if (status < 0) /* error */
1962     {
1963       status = errno;
1964       if (status != EINTR)
1965         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1966       continue;
1967     }
1969     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1970       break;
1971     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1972     {
1973       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1974           "poll(2) returned something unexpected: %#04hx",
1975           pollfd.revents);
1976       break;
1977     }
1979     rbytes = read(fd, sock->rbuf + sock->next_read,
1980                   RBUF_SIZE - sock->next_read);
1981     if (rbytes < 0)
1982     {
1983       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1984       break;
1985     }
1986     else if (rbytes == 0)
1987       break; /* eof */
1989     sock->next_read += rbytes;
1991     if (sock->batch_start)
1992       now = sock->batch_start;
1993     else
1994       now = time(NULL);
1996     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1997     {
1998       status = handle_request (sock, now, cmd, cmd_len+1);
1999       if (status != 0)
2000         goto out_close;
2001     }
2002   }
2004 out_close:
2005   close_connection(sock);
2007   /* Remove this thread from the connection threads list */
2008   pthread_mutex_lock (&connection_threads_lock);
2009   {
2010     pthread_t self;
2011     pthread_t *temp;
2013     /* Find out own index in the array */
2014     self = pthread_self ();
2015     for (i = 0; i < connection_threads_num; i++)
2016       if (pthread_equal (connection_threads[i], self) != 0)
2017         break;
2018     assert (i < connection_threads_num);
2020     /* Move the trailing threads forward. */
2021     if (i < (connection_threads_num - 1))
2022     {
2023       memmove (connection_threads + i,
2024                connection_threads + i + 1,
2025                sizeof (pthread_t) * (connection_threads_num - i - 1));
2026     }
2028     connection_threads_num--;
2030     temp = realloc(connection_threads,
2031                    sizeof(*connection_threads) * connection_threads_num);
2032     if (connection_threads_num > 0 && temp == NULL)
2033       RRDD_LOG(LOG_ERR, "connection_thread_main: realloc(--) failed.");
2034     else
2035       connection_threads = temp;
2036   }
2037   pthread_mutex_unlock (&connection_threads_lock);
2039   return (NULL);
2040 } /* }}} void *connection_thread_main */
2042 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2044   int fd;
2045   struct sockaddr_un sa;
2046   listen_socket_t *temp;
2047   int status;
2048   const char *path;
2050   path = sock->addr;
2051   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2052     path += strlen("unix:");
2054   temp = (listen_socket_t *) realloc (listen_fds,
2055       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2056   if (temp == NULL)
2057   {
2058     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2059     return (-1);
2060   }
2061   listen_fds = temp;
2062   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2064   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2065   if (fd < 0)
2066   {
2067     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2068              rrd_strerror(errno));
2069     return (-1);
2070   }
2072   memset (&sa, 0, sizeof (sa));
2073   sa.sun_family = AF_UNIX;
2074   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2076   /* if we've gotten this far, we own the pid file.  any daemon started
2077    * with the same args must not be alive.  therefore, ensure that we can
2078    * create the socket...
2079    */
2080   unlink(path);
2082   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2083   if (status != 0)
2084   {
2085     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2086              path, rrd_strerror(errno));
2087     close (fd);
2088     return (-1);
2089   }
2091   status = listen (fd, /* backlog = */ 10);
2092   if (status != 0)
2093   {
2094     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2095              path, rrd_strerror(errno));
2096     close (fd);
2097     unlink (path);
2098     return (-1);
2099   }
2101   listen_fds[listen_fds_num].fd = fd;
2102   listen_fds[listen_fds_num].family = PF_UNIX;
2103   strncpy(listen_fds[listen_fds_num].addr, path,
2104           sizeof (listen_fds[listen_fds_num].addr) - 1);
2105   listen_fds_num++;
2107   return (0);
2108 } /* }}} int open_listen_socket_unix */
2110 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2112   struct addrinfo ai_hints;
2113   struct addrinfo *ai_res;
2114   struct addrinfo *ai_ptr;
2115   char addr_copy[NI_MAXHOST];
2116   char *addr;
2117   char *port;
2118   int status;
2120   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2121   addr_copy[sizeof (addr_copy) - 1] = 0;
2122   addr = addr_copy;
2124   memset (&ai_hints, 0, sizeof (ai_hints));
2125   ai_hints.ai_flags = 0;
2126 #ifdef AI_ADDRCONFIG
2127   ai_hints.ai_flags |= AI_ADDRCONFIG;
2128 #endif
2129   ai_hints.ai_family = AF_UNSPEC;
2130   ai_hints.ai_socktype = SOCK_STREAM;
2132   port = NULL;
2133   if (*addr == '[') /* IPv6+port format */
2134   {
2135     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2136     addr++;
2138     port = strchr (addr, ']');
2139     if (port == NULL)
2140     {
2141       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2142       return (-1);
2143     }
2144     *port = 0;
2145     port++;
2147     if (*port == ':')
2148       port++;
2149     else if (*port == 0)
2150       port = NULL;
2151     else
2152     {
2153       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2154       return (-1);
2155     }
2156   } /* if (*addr = ']') */
2157   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2158   {
2159     port = rindex(addr, ':');
2160     if (port != NULL)
2161     {
2162       *port = 0;
2163       port++;
2164     }
2165   }
2166   ai_res = NULL;
2167   status = getaddrinfo (addr,
2168                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2169                         &ai_hints, &ai_res);
2170   if (status != 0)
2171   {
2172     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2173              addr, gai_strerror (status));
2174     return (-1);
2175   }
2177   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2178   {
2179     int fd;
2180     listen_socket_t *temp;
2181     int one = 1;
2183     temp = (listen_socket_t *) realloc (listen_fds,
2184         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2185     if (temp == NULL)
2186     {
2187       fprintf (stderr,
2188                "rrdcached: open_listen_socket_network: realloc failed.\n");
2189       continue;
2190     }
2191     listen_fds = temp;
2192     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2194     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2195     if (fd < 0)
2196     {
2197       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2198                rrd_strerror(errno));
2199       continue;
2200     }
2202     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2204     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2205     if (status != 0)
2206     {
2207       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2208                sock->addr, rrd_strerror(errno));
2209       close (fd);
2210       continue;
2211     }
2213     status = listen (fd, /* backlog = */ 10);
2214     if (status != 0)
2215     {
2216       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2217                sock->addr, rrd_strerror(errno));
2218       close (fd);
2219       freeaddrinfo(ai_res);
2220       return (-1);
2221     }
2223     listen_fds[listen_fds_num].fd = fd;
2224     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2225     listen_fds_num++;
2226   } /* for (ai_ptr) */
2228   freeaddrinfo(ai_res);
2229   return (0);
2230 } /* }}} static int open_listen_socket_network */
2232 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2234   assert(sock != NULL);
2235   assert(sock->addr != NULL);
2237   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2238       || sock->addr[0] == '/')
2239     return (open_listen_socket_unix(sock));
2240   else
2241     return (open_listen_socket_network(sock));
2242 } /* }}} int open_listen_socket */
2244 static int close_listen_sockets (void) /* {{{ */
2246   size_t i;
2248   for (i = 0; i < listen_fds_num; i++)
2249   {
2250     close (listen_fds[i].fd);
2252     if (listen_fds[i].family == PF_UNIX)
2253       unlink(listen_fds[i].addr);
2254   }
2256   free (listen_fds);
2257   listen_fds = NULL;
2258   listen_fds_num = 0;
2260   return (0);
2261 } /* }}} int close_listen_sockets */
2263 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2265   struct pollfd *pollfds;
2266   int pollfds_num;
2267   int status;
2268   int i;
2270   if (listen_fds_num < 1)
2271   {
2272     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2273     return (NULL);
2274   }
2276   pollfds_num = listen_fds_num;
2277   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2278   if (pollfds == NULL)
2279   {
2280     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2281     return (NULL);
2282   }
2283   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2285   RRDD_LOG(LOG_INFO, "listening for connections");
2287   while (do_shutdown == 0)
2288   {
2289     for (i = 0; i < pollfds_num; i++)
2290     {
2291       pollfds[i].fd = listen_fds[i].fd;
2292       pollfds[i].events = POLLIN | POLLPRI;
2293       pollfds[i].revents = 0;
2294     }
2296     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2297     if (do_shutdown)
2298       break;
2299     else if (status == 0) /* timeout */
2300       continue;
2301     else if (status < 0) /* error */
2302     {
2303       status = errno;
2304       if (status != EINTR)
2305       {
2306         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2307       }
2308       continue;
2309     }
2311     for (i = 0; i < pollfds_num; i++)
2312     {
2313       listen_socket_t *client_sock;
2314       struct sockaddr_storage client_sa;
2315       socklen_t client_sa_size;
2316       pthread_t tid;
2317       pthread_attr_t attr;
2319       if (pollfds[i].revents == 0)
2320         continue;
2322       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2323       {
2324         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2325             "poll(2) returned something unexpected for listen FD #%i.",
2326             pollfds[i].fd);
2327         continue;
2328       }
2330       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2331       if (client_sock == NULL)
2332       {
2333         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2334         continue;
2335       }
2336       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2338       client_sa_size = sizeof (client_sa);
2339       client_sock->fd = accept (pollfds[i].fd,
2340           (struct sockaddr *) &client_sa, &client_sa_size);
2341       if (client_sock->fd < 0)
2342       {
2343         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2344         free(client_sock);
2345         continue;
2346       }
2348       pthread_attr_init (&attr);
2349       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2351       status = pthread_create (&tid, &attr, connection_thread_main,
2352                                client_sock);
2353       if (status != 0)
2354       {
2355         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2356         close_connection(client_sock);
2357         continue;
2358       }
2359     } /* for (pollfds_num) */
2360   } /* while (do_shutdown == 0) */
2362   RRDD_LOG(LOG_INFO, "starting shutdown");
2364   close_listen_sockets ();
2366   pthread_mutex_lock (&connection_threads_lock);
2367   while (connection_threads_num > 0)
2368   {
2369     pthread_t wait_for;
2371     wait_for = connection_threads[0];
2373     pthread_mutex_unlock (&connection_threads_lock);
2374     pthread_join (wait_for, /* retval = */ NULL);
2375     pthread_mutex_lock (&connection_threads_lock);
2376   }
2377   pthread_mutex_unlock (&connection_threads_lock);
2379   free(pollfds);
2381   return (NULL);
2382 } /* }}} void *listen_thread_main */
2384 static int daemonize (void) /* {{{ */
2386   int pid_fd;
2387   char *base_dir;
2389   daemon_uid = geteuid();
2391   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2392   if (pid_fd < 0)
2393     pid_fd = check_pidfile();
2394   if (pid_fd < 0)
2395     return pid_fd;
2397   /* open all the listen sockets */
2398   if (config_listen_address_list_len > 0)
2399   {
2400     for (int i = 0; i < config_listen_address_list_len; i++)
2401     {
2402       open_listen_socket (config_listen_address_list[i]);
2403       free_listen_socket (config_listen_address_list[i]);
2404     }
2406     free(config_listen_address_list);
2407   }
2408   else
2409   {
2410     listen_socket_t sock;
2411     memset(&sock, 0, sizeof(sock));
2412     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2413     open_listen_socket (&sock);
2414   }
2416   if (listen_fds_num < 1)
2417   {
2418     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2419     goto error;
2420   }
2422   if (!stay_foreground)
2423   {
2424     pid_t child;
2426     child = fork ();
2427     if (child < 0)
2428     {
2429       fprintf (stderr, "daemonize: fork(2) failed.\n");
2430       goto error;
2431     }
2432     else if (child > 0)
2433       exit(0);
2435     /* Become session leader */
2436     setsid ();
2438     /* Open the first three file descriptors to /dev/null */
2439     close (2);
2440     close (1);
2441     close (0);
2443     open ("/dev/null", O_RDWR);
2444     dup (0);
2445     dup (0);
2446   } /* if (!stay_foreground) */
2448   /* Change into the /tmp directory. */
2449   base_dir = (config_base_dir != NULL)
2450     ? config_base_dir
2451     : "/tmp";
2453   if (chdir (base_dir) != 0)
2454   {
2455     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2456     goto error;
2457   }
2459   install_signal_handlers();
2461   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2462   RRDD_LOG(LOG_INFO, "starting up");
2464   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2465                                 (GDestroyNotify) free_cache_item);
2466   if (cache_tree == NULL)
2467   {
2468     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2469     goto error;
2470   }
2472   return write_pidfile (pid_fd);
2474 error:
2475   remove_pidfile();
2476   return -1;
2477 } /* }}} int daemonize */
2479 static int cleanup (void) /* {{{ */
2481   do_shutdown++;
2483   pthread_cond_signal (&cache_cond);
2484   pthread_join (queue_thread, /* return = */ NULL);
2486   remove_pidfile ();
2488   free(config_base_dir);
2489   free(config_pid_file);
2490   free(journal_cur);
2491   free(journal_old);
2493   pthread_mutex_lock(&cache_lock);
2494   g_tree_destroy(cache_tree);
2496   RRDD_LOG(LOG_INFO, "goodbye");
2497   closelog ();
2499   return (0);
2500 } /* }}} int cleanup */
2502 static int read_options (int argc, char **argv) /* {{{ */
2504   int option;
2505   int status = 0;
2507   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2508   {
2509     switch (option)
2510     {
2511       case 'g':
2512         stay_foreground=1;
2513         break;
2515       case 'L':
2516       case 'l':
2517       {
2518         listen_socket_t **temp;
2519         listen_socket_t *new;
2521         new = malloc(sizeof(listen_socket_t));
2522         if (new == NULL)
2523         {
2524           fprintf(stderr, "read_options: malloc failed.\n");
2525           return(2);
2526         }
2527         memset(new, 0, sizeof(listen_socket_t));
2529         temp = (listen_socket_t **) realloc (config_listen_address_list,
2530             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2531         if (temp == NULL)
2532         {
2533           fprintf (stderr, "read_options: realloc failed.\n");
2534           return (2);
2535         }
2536         config_listen_address_list = temp;
2538         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2539         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2541         temp[config_listen_address_list_len] = new;
2542         config_listen_address_list_len++;
2543       }
2544       break;
2546       case 'f':
2547       {
2548         int temp;
2550         temp = atoi (optarg);
2551         if (temp > 0)
2552           config_flush_interval = temp;
2553         else
2554         {
2555           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2556           status = 3;
2557         }
2558       }
2559       break;
2561       case 'w':
2562       {
2563         int temp;
2565         temp = atoi (optarg);
2566         if (temp > 0)
2567           config_write_interval = temp;
2568         else
2569         {
2570           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2571           status = 2;
2572         }
2573       }
2574       break;
2576       case 'z':
2577       {
2578         int temp;
2580         temp = atoi(optarg);
2581         if (temp > 0)
2582           config_write_jitter = temp;
2583         else
2584         {
2585           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2586           status = 2;
2587         }
2589         break;
2590       }
2592       case 'B':
2593         config_write_base_only = 1;
2594         break;
2596       case 'b':
2597       {
2598         size_t len;
2599         char base_realpath[PATH_MAX];
2601         if (config_base_dir != NULL)
2602           free (config_base_dir);
2603         config_base_dir = strdup (optarg);
2604         if (config_base_dir == NULL)
2605         {
2606           fprintf (stderr, "read_options: strdup failed.\n");
2607           return (3);
2608         }
2610         /* make sure that the base directory is not resolved via
2611          * symbolic links.  this makes some performance-enhancing
2612          * assumptions possible (we don't have to resolve paths
2613          * that start with a "/")
2614          */
2615         if (realpath(config_base_dir, base_realpath) == NULL)
2616         {
2617           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2618           return 5;
2619         }
2620         else if (strncmp(config_base_dir,
2621                          base_realpath, sizeof(base_realpath)) != 0)
2622         {
2623           fprintf(stderr,
2624                   "Base directory (-b) resolved via file system links!\n"
2625                   "Please consult rrdcached '-b' documentation!\n"
2626                   "Consider specifying the real directory (%s)\n",
2627                   base_realpath);
2628           return 5;
2629         }
2631         len = strlen (config_base_dir);
2632         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2633         {
2634           config_base_dir[len - 1] = 0;
2635           len--;
2636         }
2638         if (len < 1)
2639         {
2640           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2641           return (4);
2642         }
2644         _config_base_dir_len = len;
2645       }
2646       break;
2648       case 'p':
2649       {
2650         if (config_pid_file != NULL)
2651           free (config_pid_file);
2652         config_pid_file = strdup (optarg);
2653         if (config_pid_file == NULL)
2654         {
2655           fprintf (stderr, "read_options: strdup failed.\n");
2656           return (3);
2657         }
2658       }
2659       break;
2661       case 'F':
2662         config_flush_at_shutdown = 1;
2663         break;
2665       case 'j':
2666       {
2667         struct stat statbuf;
2668         const char *dir = optarg;
2670         status = stat(dir, &statbuf);
2671         if (status != 0)
2672         {
2673           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2674           return 6;
2675         }
2677         if (!S_ISDIR(statbuf.st_mode)
2678             || access(dir, R_OK|W_OK|X_OK) != 0)
2679         {
2680           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2681                   errno ? rrd_strerror(errno) : "");
2682           return 6;
2683         }
2685         journal_cur = malloc(PATH_MAX + 1);
2686         journal_old = malloc(PATH_MAX + 1);
2687         if (journal_cur == NULL || journal_old == NULL)
2688         {
2689           fprintf(stderr, "malloc failure for journal files\n");
2690           return 6;
2691         }
2692         else 
2693         {
2694           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2695           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2696         }
2697       }
2698       break;
2700       case 'h':
2701       case '?':
2702         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2703             "\n"
2704             "Usage: rrdcached [options]\n"
2705             "\n"
2706             "Valid options are:\n"
2707             "  -l <address>  Socket address to listen to.\n"
2708             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2709             "  -w <seconds>  Interval in which to write data.\n"
2710             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2711             "  -f <seconds>  Interval in which to flush dead data.\n"
2712             "  -p <file>     Location of the PID-file.\n"
2713             "  -b <dir>      Base directory to change to.\n"
2714             "  -B            Restrict file access to paths within -b <dir>\n"
2715             "  -g            Do not fork and run in the foreground.\n"
2716             "  -j <dir>      Directory in which to create the journal files.\n"
2717             "  -F            Always flush all updates at shutdown\n"
2718             "\n"
2719             "For more information and a detailed description of all options "
2720             "please refer\n"
2721             "to the rrdcached(1) manual page.\n",
2722             VERSION);
2723         status = -1;
2724         break;
2725     } /* switch (option) */
2726   } /* while (getopt) */
2728   /* advise the user when values are not sane */
2729   if (config_flush_interval < 2 * config_write_interval)
2730     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2731             " 2x write interval (-w) !\n");
2732   if (config_write_jitter > config_write_interval)
2733     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2734             " write interval (-w) !\n");
2736   if (config_write_base_only && config_base_dir == NULL)
2737     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2738             "  Consult the rrdcached documentation\n");
2740   if (journal_cur == NULL)
2741     config_flush_at_shutdown = 1;
2743   return (status);
2744 } /* }}} int read_options */
2746 int main (int argc, char **argv)
2748   int status;
2750   status = read_options (argc, argv);
2751   if (status != 0)
2752   {
2753     if (status < 0)
2754       status = 0;
2755     return (status);
2756   }
2758   status = daemonize ();
2759   if (status != 0)
2760   {
2761     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2762     return (1);
2763   }
2765   journal_init();
2767   /* start the queue thread */
2768   memset (&queue_thread, 0, sizeof (queue_thread));
2769   status = pthread_create (&queue_thread,
2770                            NULL, /* attr */
2771                            queue_thread_main,
2772                            NULL); /* args */
2773   if (status != 0)
2774   {
2775     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2776     cleanup();
2777     return (1);
2778   }
2780   listen_thread_main (NULL);
2781   cleanup ();
2783   return (0);
2784 } /* int main */
2786 /*
2787  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2788  */