Code

rrdcached now frees all of its resources correctly. This facilitates
[rrdtool-all.git] / program / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
78 #include <stdint.h>
79 #include <stdio.h>
80 #include <unistd.h>
81 #include <string.h>
82 #include <strings.h>
83 #include <stdint.h>
84 #include <inttypes.h>
86 #include <sys/types.h>
87 #include <sys/stat.h>
88 #include <fcntl.h>
89 #include <signal.h>
90 #include <sys/socket.h>
91 #include <sys/un.h>
92 #include <netdb.h>
93 #include <poll.h>
94 #include <syslog.h>
95 #include <pthread.h>
96 #include <errno.h>
97 #include <assert.h>
98 #include <sys/time.h>
99 #include <time.h>
101 #include <glib-2.0/glib.h>
102 /* }}} */
104 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
106 #ifndef __GNUC__
107 # define __attribute__(x) /**/
108 #endif
110 /*
111  * Types
112  */
113 typedef enum
115   PRIV_LOW,
116   PRIV_HIGH
117 } socket_privilege;
119 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
121 struct listen_socket_s
123   int fd;
124   char addr[PATH_MAX + 1];
125   int family;
126   socket_privilege privilege;
128   /* state for BATCH processing */
129   time_t batch_start;
130   int batch_cmd;
132   /* buffered IO */
133   char *rbuf;
134   off_t next_cmd;
135   off_t next_read;
137   char *wbuf;
138   ssize_t wbuf_len;
139 };
140 typedef struct listen_socket_s listen_socket_t;
142 struct cache_item_s;
143 typedef struct cache_item_s cache_item_t;
144 struct cache_item_s
146   char *file;
147   char **values;
148   int values_num;
149   time_t last_flush_time;
150   time_t last_update_stamp;
151 #define CI_FLAGS_IN_TREE  (1<<0)
152 #define CI_FLAGS_IN_QUEUE (1<<1)
153   int flags;
154   pthread_cond_t  flushed;
155   cache_item_t *prev;
156   cache_item_t *next;
157 };
159 struct callback_flush_data_s
161   time_t now;
162   time_t abs_timeout;
163   char **keys;
164   size_t keys_num;
165 };
166 typedef struct callback_flush_data_s callback_flush_data_t;
168 enum queue_side_e
170   HEAD,
171   TAIL
172 };
173 typedef enum queue_side_e queue_side_t;
175 /* max length of socket command or response */
176 #define CMD_MAX 4096
177 #define RBUF_SIZE (CMD_MAX*2)
179 /*
180  * Variables
181  */
182 static int stay_foreground = 0;
183 static uid_t daemon_uid;
185 static listen_socket_t *listen_fds = NULL;
186 static size_t listen_fds_num = 0;
188 static int do_shutdown = 0;
190 static pthread_t queue_thread;
192 static pthread_t *connection_threads = NULL;
193 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
194 static int connection_threads_num = 0;
196 /* Cache stuff */
197 static GTree          *cache_tree = NULL;
198 static cache_item_t   *cache_queue_head = NULL;
199 static cache_item_t   *cache_queue_tail = NULL;
200 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
201 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
203 static int config_write_interval = 300;
204 static int config_write_jitter   = 0;
205 static int config_flush_interval = 3600;
206 static int config_flush_at_shutdown = 0;
207 static char *config_pid_file = NULL;
208 static char *config_base_dir = NULL;
209 static size_t _config_base_dir_len = 0;
210 static int config_write_base_only = 0;
212 static listen_socket_t **config_listen_address_list = NULL;
213 static int config_listen_address_list_len = 0;
215 static uint64_t stats_queue_length = 0;
216 static uint64_t stats_updates_received = 0;
217 static uint64_t stats_flush_received = 0;
218 static uint64_t stats_updates_written = 0;
219 static uint64_t stats_data_sets_written = 0;
220 static uint64_t stats_journal_bytes = 0;
221 static uint64_t stats_journal_rotate = 0;
222 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
224 /* Journaled updates */
225 static char *journal_cur = NULL;
226 static char *journal_old = NULL;
227 static FILE *journal_fh = NULL;
228 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
229 static int journal_write(char *cmd, char *args);
230 static void journal_done(void);
231 static void journal_rotate(void);
233 /* 
234  * Functions
235  */
236 static void sig_common (const char *sig) /* {{{ */
238   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
239   do_shutdown++;
240   pthread_cond_broadcast(&cache_cond);
241 } /* }}} void sig_common */
243 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
245   sig_common("INT");
246 } /* }}} void sig_int_handler */
248 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
250   sig_common("TERM");
251 } /* }}} void sig_term_handler */
253 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
255   config_flush_at_shutdown = 1;
256   sig_common("USR1");
257 } /* }}} void sig_usr1_handler */
259 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
261   config_flush_at_shutdown = 0;
262   sig_common("USR2");
263 } /* }}} void sig_usr2_handler */
265 static void install_signal_handlers(void) /* {{{ */
267   /* These structures are static, because `sigaction' behaves weird if the are
268    * overwritten.. */
269   static struct sigaction sa_int;
270   static struct sigaction sa_term;
271   static struct sigaction sa_pipe;
272   static struct sigaction sa_usr1;
273   static struct sigaction sa_usr2;
275   /* Install signal handlers */
276   memset (&sa_int, 0, sizeof (sa_int));
277   sa_int.sa_handler = sig_int_handler;
278   sigaction (SIGINT, &sa_int, NULL);
280   memset (&sa_term, 0, sizeof (sa_term));
281   sa_term.sa_handler = sig_term_handler;
282   sigaction (SIGTERM, &sa_term, NULL);
284   memset (&sa_pipe, 0, sizeof (sa_pipe));
285   sa_pipe.sa_handler = SIG_IGN;
286   sigaction (SIGPIPE, &sa_pipe, NULL);
288   memset (&sa_pipe, 0, sizeof (sa_usr1));
289   sa_usr1.sa_handler = sig_usr1_handler;
290   sigaction (SIGUSR1, &sa_usr1, NULL);
292   memset (&sa_usr2, 0, sizeof (sa_usr2));
293   sa_usr2.sa_handler = sig_usr2_handler;
294   sigaction (SIGUSR2, &sa_usr2, NULL);
296 } /* }}} void install_signal_handlers */
298 static int open_pidfile(char *action, int oflag) /* {{{ */
300   int fd;
301   char *file;
303   file = (config_pid_file != NULL)
304     ? config_pid_file
305     : LOCALSTATEDIR "/run/rrdcached.pid";
307   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
308   if (fd < 0)
309     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
310             action, file, rrd_strerror(errno));
312   return(fd);
313 } /* }}} static int open_pidfile */
315 /* check existing pid file to see whether a daemon is running */
316 static int check_pidfile(void)
318   int pid_fd;
319   pid_t pid;
320   char pid_str[16];
322   pid_fd = open_pidfile("open", O_RDWR);
323   if (pid_fd < 0)
324     return pid_fd;
326   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
327     return -1;
329   pid = atoi(pid_str);
330   if (pid <= 0)
331     return -1;
333   /* another running process that we can signal COULD be
334    * a competing rrdcached */
335   if (pid != getpid() && kill(pid, 0) == 0)
336   {
337     fprintf(stderr,
338             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
339     close(pid_fd);
340     return -1;
341   }
343   lseek(pid_fd, 0, SEEK_SET);
344   ftruncate(pid_fd, 0);
346   fprintf(stderr,
347           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
348           "rrdcached: starting normally.\n", pid);
350   return pid_fd;
351 } /* }}} static int check_pidfile */
353 static int write_pidfile (int fd) /* {{{ */
355   pid_t pid;
356   FILE *fh;
358   pid = getpid ();
360   fh = fdopen (fd, "w");
361   if (fh == NULL)
362   {
363     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
364     close(fd);
365     return (-1);
366   }
368   fprintf (fh, "%i\n", (int) pid);
369   fclose (fh);
371   return (0);
372 } /* }}} int write_pidfile */
374 static int remove_pidfile (void) /* {{{ */
376   char *file;
377   int status;
379   file = (config_pid_file != NULL)
380     ? config_pid_file
381     : LOCALSTATEDIR "/run/rrdcached.pid";
383   status = unlink (file);
384   if (status == 0)
385     return (0);
386   return (errno);
387 } /* }}} int remove_pidfile */
389 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
391   char *eol;
393   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
394                sock->next_read - sock->next_cmd);
396   if (eol == NULL)
397   {
398     /* no commands left, move remainder back to front of rbuf */
399     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
400             sock->next_read - sock->next_cmd);
401     sock->next_read -= sock->next_cmd;
402     sock->next_cmd = 0;
403     *len = 0;
404     return NULL;
405   }
406   else
407   {
408     char *cmd = sock->rbuf + sock->next_cmd;
409     *eol = '\0';
411     sock->next_cmd = eol - sock->rbuf + 1;
413     if (eol > sock->rbuf && *(eol-1) == '\r')
414       *(--eol) = '\0'; /* handle "\r\n" EOL */
416     *len = eol - cmd;
418     return cmd;
419   }
421   /* NOTREACHED */
422   assert(1==0);
425 /* add the characters directly to the write buffer */
426 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
428   char *new_buf;
430   assert(sock != NULL);
432   new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
433   if (new_buf == NULL)
434   {
435     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
436     return -1;
437   }
439   strncpy(new_buf + sock->wbuf_len, str, len + 1);
441   sock->wbuf = new_buf;
442   sock->wbuf_len += len;
444   return 0;
445 } /* }}} static int add_to_wbuf */
447 /* add the text to the "extra" info that's sent after the status line */
448 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
450   va_list argp;
451   char buffer[CMD_MAX];
452   int len;
454   if (sock == NULL) return 0; /* journal replay mode */
455   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
457   va_start(argp, fmt);
458 #ifdef HAVE_VSNPRINTF
459   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
460 #else
461   len = vsprintf(buffer, fmt, argp);
462 #endif
463   va_end(argp);
464   if (len < 0)
465   {
466     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
467     return -1;
468   }
470   return add_to_wbuf(sock, buffer, len);
471 } /* }}} static int add_response_info */
473 static int count_lines(char *str) /* {{{ */
475   int lines = 0;
477   if (str != NULL)
478   {
479     while ((str = strchr(str, '\n')) != NULL)
480     {
481       ++lines;
482       ++str;
483     }
484   }
486   return lines;
487 } /* }}} static int count_lines */
489 /* send the response back to the user.
490  * returns 0 on success, -1 on error
491  * write buffer is always zeroed after this call */
492 static int send_response (listen_socket_t *sock, response_code rc,
493                           char *fmt, ...) /* {{{ */
495   va_list argp;
496   char buffer[CMD_MAX];
497   int lines;
498   ssize_t wrote;
499   int rclen, len;
501   if (sock == NULL) return rc;  /* journal replay mode */
503   if (sock->batch_start)
504   {
505     if (rc == RESP_OK)
506       return rc; /* no response on success during BATCH */
507     lines = sock->batch_cmd;
508   }
509   else if (rc == RESP_OK)
510     lines = count_lines(sock->wbuf);
511   else
512     lines = -1;
514   rclen = sprintf(buffer, "%d ", lines);
515   va_start(argp, fmt);
516 #ifdef HAVE_VSNPRINTF
517   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
518 #else
519   len = vsprintf(buffer+rclen, fmt, argp);
520 #endif
521   va_end(argp);
522   if (len < 0)
523     return -1;
525   len += rclen;
527   /* append the result to the wbuf, don't write to the user */
528   if (sock->batch_start)
529     return add_to_wbuf(sock, buffer, len);
531   /* first write must be complete */
532   if (len != write(sock->fd, buffer, len))
533   {
534     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
535     return -1;
536   }
538   if (sock->wbuf != NULL && rc == RESP_OK)
539   {
540     wrote = 0;
541     while (wrote < sock->wbuf_len)
542     {
543       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
544       if (wb <= 0)
545       {
546         RRDD_LOG(LOG_INFO, "send_response: could not write results");
547         return -1;
548       }
549       wrote += wb;
550     }
551   }
553   free(sock->wbuf); sock->wbuf = NULL;
554   sock->wbuf_len = 0;
556   return 0;
557 } /* }}} */
559 static void wipe_ci_values(cache_item_t *ci, time_t when)
561   ci->values = NULL;
562   ci->values_num = 0;
564   ci->last_flush_time = when;
565   if (config_write_jitter > 0)
566     ci->last_flush_time += (random() % config_write_jitter);
569 /* remove_from_queue
570  * remove a "cache_item_t" item from the queue.
571  * must hold 'cache_lock' when calling this
572  */
573 static void remove_from_queue(cache_item_t *ci) /* {{{ */
575   if (ci == NULL) return;
576   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
578   if (ci->prev == NULL)
579     cache_queue_head = ci->next; /* reset head */
580   else
581     ci->prev->next = ci->next;
583   if (ci->next == NULL)
584     cache_queue_tail = ci->prev; /* reset the tail */
585   else
586     ci->next->prev = ci->prev;
588   ci->next = ci->prev = NULL;
589   ci->flags &= ~CI_FLAGS_IN_QUEUE;
590 } /* }}} static void remove_from_queue */
592 /* free the resources associated with the cache_item_t
593  * must hold cache_lock when calling this function
594  */
595 static void *free_cache_item(cache_item_t *ci) /* {{{ */
597   if (ci == NULL) return NULL;
599   remove_from_queue(ci);
601   for (int i=0; i < ci->values_num; i++)
602     free(ci->values[i]);
604   free (ci->values);
605   free (ci->file);
607   /* in case anyone is waiting */
608   pthread_cond_broadcast(&ci->flushed);
610   free (ci);
612   return NULL;
613 } /* }}} static void *free_cache_item */
615 /*
616  * enqueue_cache_item:
617  * `cache_lock' must be acquired before calling this function!
618  */
619 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
620     queue_side_t side)
622   if (ci == NULL)
623     return (-1);
625   if (ci->values_num == 0)
626     return (0);
628   if (side == HEAD)
629   {
630     if (cache_queue_head == ci)
631       return 0;
633     /* remove if further down in queue */
634     remove_from_queue(ci);
636     ci->prev = NULL;
637     ci->next = cache_queue_head;
638     if (ci->next != NULL)
639       ci->next->prev = ci;
640     cache_queue_head = ci;
642     if (cache_queue_tail == NULL)
643       cache_queue_tail = cache_queue_head;
644   }
645   else /* (side == TAIL) */
646   {
647     /* We don't move values back in the list.. */
648     if (ci->flags & CI_FLAGS_IN_QUEUE)
649       return (0);
651     assert (ci->next == NULL);
652     assert (ci->prev == NULL);
654     ci->prev = cache_queue_tail;
656     if (cache_queue_tail == NULL)
657       cache_queue_head = ci;
658     else
659       cache_queue_tail->next = ci;
661     cache_queue_tail = ci;
662   }
664   ci->flags |= CI_FLAGS_IN_QUEUE;
666   pthread_cond_broadcast(&cache_cond);
667   pthread_mutex_lock (&stats_lock);
668   stats_queue_length++;
669   pthread_mutex_unlock (&stats_lock);
671   return (0);
672 } /* }}} int enqueue_cache_item */
674 /*
675  * tree_callback_flush:
676  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
677  * while this is in progress.
678  */
679 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
680     gpointer data)
682   cache_item_t *ci;
683   callback_flush_data_t *cfd;
685   ci = (cache_item_t *) value;
686   cfd = (callback_flush_data_t *) data;
688   if (ci->flags & CI_FLAGS_IN_QUEUE)
689     return FALSE;
691   if ((ci->last_flush_time <= cfd->abs_timeout)
692       && (ci->values_num > 0))
693   {
694     enqueue_cache_item (ci, TAIL);
695   }
696   else if ((do_shutdown != 0)
697       && (ci->values_num > 0))
698   {
699     enqueue_cache_item (ci, TAIL);
700   }
701   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
702       && (ci->values_num <= 0))
703   {
704     char **temp;
706     temp = (char **) realloc (cfd->keys,
707         sizeof (char *) * (cfd->keys_num + 1));
708     if (temp == NULL)
709     {
710       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
711       return (FALSE);
712     }
713     cfd->keys = temp;
714     /* Make really sure this points to the _same_ place */
715     assert ((char *) key == ci->file);
716     cfd->keys[cfd->keys_num] = (char *) key;
717     cfd->keys_num++;
718   }
720   return (FALSE);
721 } /* }}} gboolean tree_callback_flush */
723 static int flush_old_values (int max_age)
725   callback_flush_data_t cfd;
726   size_t k;
728   memset (&cfd, 0, sizeof (cfd));
729   /* Pass the current time as user data so that we don't need to call
730    * `time' for each node. */
731   cfd.now = time (NULL);
732   cfd.keys = NULL;
733   cfd.keys_num = 0;
735   if (max_age > 0)
736     cfd.abs_timeout = cfd.now - max_age;
737   else
738     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
740   /* `tree_callback_flush' will return the keys of all values that haven't
741    * been touched in the last `config_flush_interval' seconds in `cfd'.
742    * The char*'s in this array point to the same memory as ci->file, so we
743    * don't need to free them separately. */
744   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
746   for (k = 0; k < cfd.keys_num; k++)
747   {
748     /* should never fail, since we have held the cache_lock
749      * the entire time */
750     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
751   }
753   if (cfd.keys != NULL)
754   {
755     free (cfd.keys);
756     cfd.keys = NULL;
757   }
759   return (0);
760 } /* int flush_old_values */
762 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
764   struct timeval now;
765   struct timespec next_flush;
766   int final_flush = 0; /* make sure we only flush once on shutdown */
768   gettimeofday (&now, NULL);
769   next_flush.tv_sec = now.tv_sec + config_flush_interval;
770   next_flush.tv_nsec = 1000 * now.tv_usec;
772   pthread_mutex_lock (&cache_lock);
773   while ((do_shutdown == 0) || (cache_queue_head != NULL))
774   {
775     cache_item_t *ci;
776     char *file;
777     char **values;
778     int values_num;
779     int status;
780     int i;
782     /* First, check if it's time to do the cache flush. */
783     gettimeofday (&now, NULL);
784     if ((now.tv_sec > next_flush.tv_sec)
785         || ((now.tv_sec == next_flush.tv_sec)
786           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
787     {
788       /* Flush all values that haven't been written in the last
789        * `config_write_interval' seconds. */
790       flush_old_values (config_write_interval);
792       /* Determine the time of the next cache flush. */
793       next_flush.tv_sec =
794         now.tv_sec + next_flush.tv_sec % config_flush_interval;
796       /* unlock the cache while we rotate so we don't block incoming
797        * updates if the fsync() blocks on disk I/O */
798       pthread_mutex_unlock(&cache_lock);
799       journal_rotate();
800       pthread_mutex_lock(&cache_lock);
801     }
803     /* Now, check if there's something to store away. If not, wait until
804      * something comes in or it's time to do the cache flush.  if we are
805      * shutting down, do not wait around.  */
806     if (cache_queue_head == NULL && !do_shutdown)
807     {
808       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
809       if ((status != 0) && (status != ETIMEDOUT))
810       {
811         RRDD_LOG (LOG_ERR, "queue_thread_main: "
812             "pthread_cond_timedwait returned %i.", status);
813       }
814     }
816     /* We're about to shut down */
817     if (do_shutdown != 0 && !final_flush++)
818     {
819       if (config_flush_at_shutdown)
820         flush_old_values (-1); /* flush everything */
821       else
822         break;
823     }
825     /* Check if a value has arrived. This may be NULL if we timed out or there
826      * was an interrupt such as a signal. */
827     if (cache_queue_head == NULL)
828       continue;
830     ci = cache_queue_head;
832     /* copy the relevant parts */
833     file = strdup (ci->file);
834     if (file == NULL)
835     {
836       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
837       continue;
838     }
840     assert(ci->values != NULL);
841     assert(ci->values_num > 0);
843     values = ci->values;
844     values_num = ci->values_num;
846     wipe_ci_values(ci, time(NULL));
847     remove_from_queue(ci);
849     pthread_mutex_lock (&stats_lock);
850     assert (stats_queue_length > 0);
851     stats_queue_length--;
852     pthread_mutex_unlock (&stats_lock);
854     pthread_mutex_unlock (&cache_lock);
856     rrd_clear_error ();
857     status = rrd_update_r (file, NULL, values_num, (void *) values);
858     if (status != 0)
859     {
860       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
861           "rrd_update_r (%s) failed with status %i. (%s)",
862           file, status, rrd_get_error());
863     }
865     journal_write("wrote", file);
866     pthread_cond_broadcast(&ci->flushed);
868     for (i = 0; i < values_num; i++)
869       free (values[i]);
871     free(values);
872     free(file);
874     if (status == 0)
875     {
876       pthread_mutex_lock (&stats_lock);
877       stats_updates_written++;
878       stats_data_sets_written += values_num;
879       pthread_mutex_unlock (&stats_lock);
880     }
882     pthread_mutex_lock (&cache_lock);
884     /* We're about to shut down */
885     if (do_shutdown != 0 && !final_flush++)
886     {
887       if (config_flush_at_shutdown)
888           flush_old_values (-1); /* flush everything */
889       else
890         break;
891     }
892   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
893   pthread_mutex_unlock (&cache_lock);
895   if (config_flush_at_shutdown)
896   {
897     assert(cache_queue_head == NULL);
898     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
899   }
901   journal_done();
903   return (NULL);
904 } /* }}} void *queue_thread_main */
906 static int buffer_get_field (char **buffer_ret, /* {{{ */
907     size_t *buffer_size_ret, char **field_ret)
909   char *buffer;
910   size_t buffer_pos;
911   size_t buffer_size;
912   char *field;
913   size_t field_size;
914   int status;
916   buffer = *buffer_ret;
917   buffer_pos = 0;
918   buffer_size = *buffer_size_ret;
919   field = *buffer_ret;
920   field_size = 0;
922   if (buffer_size <= 0)
923     return (-1);
925   /* This is ensured by `handle_request'. */
926   assert (buffer[buffer_size - 1] == '\0');
928   status = -1;
929   while (buffer_pos < buffer_size)
930   {
931     /* Check for end-of-field or end-of-buffer */
932     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
933     {
934       field[field_size] = 0;
935       field_size++;
936       buffer_pos++;
937       status = 0;
938       break;
939     }
940     /* Handle escaped characters. */
941     else if (buffer[buffer_pos] == '\\')
942     {
943       if (buffer_pos >= (buffer_size - 1))
944         break;
945       buffer_pos++;
946       field[field_size] = buffer[buffer_pos];
947       field_size++;
948       buffer_pos++;
949     }
950     /* Normal operation */ 
951     else
952     {
953       field[field_size] = buffer[buffer_pos];
954       field_size++;
955       buffer_pos++;
956     }
957   } /* while (buffer_pos < buffer_size) */
959   if (status != 0)
960     return (status);
962   *buffer_ret = buffer + buffer_pos;
963   *buffer_size_ret = buffer_size - buffer_pos;
964   *field_ret = field;
966   return (0);
967 } /* }}} int buffer_get_field */
969 /* if we're restricting writes to the base directory,
970  * check whether the file falls within the dir
971  * returns 1 if OK, otherwise 0
972  */
973 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
975   assert(file != NULL);
977   if (!config_write_base_only
978       || sock == NULL /* journal replay */
979       || config_base_dir == NULL)
980     return 1;
982   if (strstr(file, "../") != NULL) goto err;
984   /* relative paths without "../" are ok */
985   if (*file != '/') return 1;
987   /* file must be of the format base + "/" + <1+ char filename> */
988   if (strlen(file) < _config_base_dir_len + 2) goto err;
989   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
990   if (*(file + _config_base_dir_len) != '/') goto err;
992   return 1;
994 err:
995   if (sock != NULL && sock->fd >= 0)
996     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
998   return 0;
999 } /* }}} static int check_file_access */
1001 /* when using a base dir, convert relative paths to absolute paths.
1002  * if necessary, modifies the "filename" pointer to point
1003  * to the new path created in "tmp".  "tmp" is provided
1004  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1005  *
1006  * this allows us to optimize for the expected case (absolute path)
1007  * with a no-op.
1008  */
1009 static void get_abs_path(char **filename, char *tmp)
1011   assert(tmp != NULL);
1012   assert(filename != NULL && *filename != NULL);
1014   if (config_base_dir == NULL || **filename == '/')
1015     return;
1017   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1018   *filename = tmp;
1019 } /* }}} static int get_abs_path */
1021 /* returns 1 if we have the required privilege level,
1022  * otherwise issue an error to the user on sock */
1023 static int has_privilege (listen_socket_t *sock, /* {{{ */
1024                           socket_privilege priv)
1026   if (sock == NULL) /* journal replay */
1027     return 1;
1029   if (sock->privilege >= priv)
1030     return 1;
1032   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1033 } /* }}} static int has_privilege */
1035 static int flush_file (const char *filename) /* {{{ */
1037   cache_item_t *ci;
1039   pthread_mutex_lock (&cache_lock);
1041   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1042   if (ci == NULL)
1043   {
1044     pthread_mutex_unlock (&cache_lock);
1045     return (ENOENT);
1046   }
1048   if (ci->values_num > 0)
1049   {
1050     /* Enqueue at head */
1051     enqueue_cache_item (ci, HEAD);
1052     pthread_cond_wait(&ci->flushed, &cache_lock);
1053   }
1055   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1056    * may have been purged during our cond_wait() */
1058   pthread_mutex_unlock(&cache_lock);
1060   return (0);
1061 } /* }}} int flush_file */
1063 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1064     char *buffer, size_t buffer_size)
1066   int status;
1067   char **help_text;
1068   char *command;
1070   char *help_help[2] =
1071   {
1072     "Command overview\n"
1073     ,
1074     "HELP [<command>]\n"
1075     "FLUSH <filename>\n"
1076     "FLUSHALL\n"
1077     "PENDING <filename>\n"
1078     "FORGET <filename>\n"
1079     "UPDATE <filename> <values> [<values> ...]\n"
1080     "BATCH\n"
1081     "STATS\n"
1082   };
1084   char *help_flush[2] =
1085   {
1086     "Help for FLUSH\n"
1087     ,
1088     "Usage: FLUSH <filename>\n"
1089     "\n"
1090     "Adds the given filename to the head of the update queue and returns\n"
1091     "after is has been dequeued.\n"
1092   };
1094   char *help_flushall[2] =
1095   {
1096     "Help for FLUSHALL\n"
1097     ,
1098     "Usage: FLUSHALL\n"
1099     "\n"
1100     "Triggers writing of all pending updates.  Returns immediately.\n"
1101   };
1103   char *help_pending[2] =
1104   {
1105     "Help for PENDING\n"
1106     ,
1107     "Usage: PENDING <filename>\n"
1108     "\n"
1109     "Shows any 'pending' updates for a file, in order.\n"
1110     "The updates shown have not yet been written to the underlying RRD file.\n"
1111   };
1113   char *help_forget[2] =
1114   {
1115     "Help for FORGET\n"
1116     ,
1117     "Usage: FORGET <filename>\n"
1118     "\n"
1119     "Removes the file completely from the cache.\n"
1120     "Any pending updates for the file will be lost.\n"
1121   };
1123   char *help_update[2] =
1124   {
1125     "Help for UPDATE\n"
1126     ,
1127     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1128     "\n"
1129     "Adds the given file to the internal cache if it is not yet known and\n"
1130     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1131     "for details.\n"
1132     "\n"
1133     "Each <values> has the following form:\n"
1134     "  <values> = <time>:<value>[:<value>[...]]\n"
1135     "See the rrdupdate(1) manpage for details.\n"
1136   };
1138   char *help_stats[2] =
1139   {
1140     "Help for STATS\n"
1141     ,
1142     "Usage: STATS\n"
1143     "\n"
1144     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1145     "a description of the values.\n"
1146   };
1148   char *help_batch[2] =
1149   {
1150     "Help for BATCH\n"
1151     ,
1152     "The 'BATCH' command permits the client to initiate a bulk load\n"
1153     "   of commands to rrdcached.\n"
1154     "\n"
1155     "Usage:\n"
1156     "\n"
1157     "    client: BATCH\n"
1158     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1159     "    client: command #1\n"
1160     "    client: command #2\n"
1161     "    client: ... and so on\n"
1162     "    client: .\n"
1163     "    server: 2 errors\n"
1164     "    server: 7 message for command #7\n"
1165     "    server: 9 message for command #9\n"
1166     "\n"
1167     "For more information, consult the rrdcached(1) documentation.\n"
1168   };
1170   status = buffer_get_field (&buffer, &buffer_size, &command);
1171   if (status != 0)
1172     help_text = help_help;
1173   else
1174   {
1175     if (strcasecmp (command, "update") == 0)
1176       help_text = help_update;
1177     else if (strcasecmp (command, "flush") == 0)
1178       help_text = help_flush;
1179     else if (strcasecmp (command, "flushall") == 0)
1180       help_text = help_flushall;
1181     else if (strcasecmp (command, "pending") == 0)
1182       help_text = help_pending;
1183     else if (strcasecmp (command, "forget") == 0)
1184       help_text = help_forget;
1185     else if (strcasecmp (command, "stats") == 0)
1186       help_text = help_stats;
1187     else if (strcasecmp (command, "batch") == 0)
1188       help_text = help_batch;
1189     else
1190       help_text = help_help;
1191   }
1193   add_response_info(sock, help_text[1]);
1194   return send_response(sock, RESP_OK, help_text[0]);
1195 } /* }}} int handle_request_help */
1197 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1199   uint64_t copy_queue_length;
1200   uint64_t copy_updates_received;
1201   uint64_t copy_flush_received;
1202   uint64_t copy_updates_written;
1203   uint64_t copy_data_sets_written;
1204   uint64_t copy_journal_bytes;
1205   uint64_t copy_journal_rotate;
1207   uint64_t tree_nodes_number;
1208   uint64_t tree_depth;
1210   pthread_mutex_lock (&stats_lock);
1211   copy_queue_length       = stats_queue_length;
1212   copy_updates_received   = stats_updates_received;
1213   copy_flush_received     = stats_flush_received;
1214   copy_updates_written    = stats_updates_written;
1215   copy_data_sets_written  = stats_data_sets_written;
1216   copy_journal_bytes      = stats_journal_bytes;
1217   copy_journal_rotate     = stats_journal_rotate;
1218   pthread_mutex_unlock (&stats_lock);
1220   pthread_mutex_lock (&cache_lock);
1221   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1222   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1223   pthread_mutex_unlock (&cache_lock);
1225   add_response_info(sock,
1226                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1227   add_response_info(sock,
1228                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1229   add_response_info(sock,
1230                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1231   add_response_info(sock,
1232                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1233   add_response_info(sock,
1234                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1235   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1236   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1237   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1238   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1240   send_response(sock, RESP_OK, "Statistics follow\n");
1242   return (0);
1243 } /* }}} int handle_request_stats */
1245 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1246     char *buffer, size_t buffer_size)
1248   char *file, file_tmp[PATH_MAX];
1249   int status;
1251   status = buffer_get_field (&buffer, &buffer_size, &file);
1252   if (status != 0)
1253   {
1254     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1255   }
1256   else
1257   {
1258     pthread_mutex_lock(&stats_lock);
1259     stats_flush_received++;
1260     pthread_mutex_unlock(&stats_lock);
1262     get_abs_path(&file, file_tmp);
1263     if (!check_file_access(file, sock)) return 0;
1265     status = flush_file (file);
1266     if (status == 0)
1267       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1268     else if (status == ENOENT)
1269     {
1270       /* no file in our tree; see whether it exists at all */
1271       struct stat statbuf;
1273       memset(&statbuf, 0, sizeof(statbuf));
1274       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1275         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1276       else
1277         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1278     }
1279     else if (status < 0)
1280       return send_response(sock, RESP_ERR, "Internal error.\n");
1281     else
1282       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1283   }
1285   /* NOTREACHED */
1286   assert(1==0);
1287 } /* }}} int handle_request_flush */
1289 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1291   int status;
1293   status = has_privilege(sock, PRIV_HIGH);
1294   if (status <= 0)
1295     return status;
1297   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1299   pthread_mutex_lock(&cache_lock);
1300   flush_old_values(-1);
1301   pthread_mutex_unlock(&cache_lock);
1303   return send_response(sock, RESP_OK, "Started flush.\n");
1304 } /* }}} static int handle_request_flushall */
1306 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1307                                   char *buffer, size_t buffer_size)
1309   int status;
1310   char *file, file_tmp[PATH_MAX];
1311   cache_item_t *ci;
1313   status = buffer_get_field(&buffer, &buffer_size, &file);
1314   if (status != 0)
1315     return send_response(sock, RESP_ERR,
1316                          "Usage: PENDING <filename>\n");
1318   status = has_privilege(sock, PRIV_HIGH);
1319   if (status <= 0)
1320     return status;
1322   get_abs_path(&file, file_tmp);
1324   pthread_mutex_lock(&cache_lock);
1325   ci = g_tree_lookup(cache_tree, file);
1326   if (ci == NULL)
1327   {
1328     pthread_mutex_unlock(&cache_lock);
1329     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1330   }
1332   for (int i=0; i < ci->values_num; i++)
1333     add_response_info(sock, "%s\n", ci->values[i]);
1335   pthread_mutex_unlock(&cache_lock);
1336   return send_response(sock, RESP_OK, "updates pending\n");
1337 } /* }}} static int handle_request_pending */
1339 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1340                                  char *buffer, size_t buffer_size)
1342   int status;
1343   gboolean found;
1344   char *file, file_tmp[PATH_MAX];
1346   status = buffer_get_field(&buffer, &buffer_size, &file);
1347   if (status != 0)
1348     return send_response(sock, RESP_ERR,
1349                          "Usage: FORGET <filename>\n");
1351   status = has_privilege(sock, PRIV_HIGH);
1352   if (status <= 0)
1353     return status;
1355   get_abs_path(&file, file_tmp);
1356   if (!check_file_access(file, sock)) return 0;
1358   pthread_mutex_lock(&cache_lock);
1359   found = g_tree_remove(cache_tree, file);
1360   pthread_mutex_unlock(&cache_lock);
1362   if (found == TRUE)
1363   {
1364     if (sock != NULL)
1365       journal_write("forget", file);
1367     return send_response(sock, RESP_OK, "Gone!\n");
1368   }
1369   else
1370     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1372   /* NOTREACHED */
1373   assert(1==0);
1374 } /* }}} static int handle_request_forget */
1376 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1377                                   time_t now,
1378                                   char *buffer, size_t buffer_size)
1380   char *file, file_tmp[PATH_MAX];
1381   int values_num = 0;
1382   int status;
1383   char orig_buf[CMD_MAX];
1385   cache_item_t *ci;
1387   status = has_privilege(sock, PRIV_HIGH);
1388   if (status <= 0)
1389     return status;
1391   /* save it for the journal later */
1392   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1394   status = buffer_get_field (&buffer, &buffer_size, &file);
1395   if (status != 0)
1396     return send_response(sock, RESP_ERR,
1397                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1399   pthread_mutex_lock(&stats_lock);
1400   stats_updates_received++;
1401   pthread_mutex_unlock(&stats_lock);
1403   get_abs_path(&file, file_tmp);
1404   if (!check_file_access(file, sock)) return 0;
1406   pthread_mutex_lock (&cache_lock);
1407   ci = g_tree_lookup (cache_tree, file);
1409   if (ci == NULL) /* {{{ */
1410   {
1411     struct stat statbuf;
1413     /* don't hold the lock while we setup; stat(2) might block */
1414     pthread_mutex_unlock(&cache_lock);
1416     memset (&statbuf, 0, sizeof (statbuf));
1417     status = stat (file, &statbuf);
1418     if (status != 0)
1419     {
1420       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1422       status = errno;
1423       if (status == ENOENT)
1424         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1425       else
1426         return send_response(sock, RESP_ERR,
1427                              "stat failed with error %i.\n", status);
1428     }
1429     if (!S_ISREG (statbuf.st_mode))
1430       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1432     if (access(file, R_OK|W_OK) != 0)
1433       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1434                            file, rrd_strerror(errno));
1436     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1437     if (ci == NULL)
1438     {
1439       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1441       return send_response(sock, RESP_ERR, "malloc failed.\n");
1442     }
1443     memset (ci, 0, sizeof (cache_item_t));
1445     ci->file = strdup (file);
1446     if (ci->file == NULL)
1447     {
1448       free (ci);
1449       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1451       return send_response(sock, RESP_ERR, "strdup failed.\n");
1452     }
1454     wipe_ci_values(ci, now);
1455     ci->flags = CI_FLAGS_IN_TREE;
1456     pthread_cond_init(&ci->flushed, NULL);
1458     pthread_mutex_lock(&cache_lock);
1459     g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1460   } /* }}} */
1461   assert (ci != NULL);
1463   /* don't re-write updates in replay mode */
1464   if (sock != NULL)
1465     journal_write("update", orig_buf);
1467   while (buffer_size > 0)
1468   {
1469     char **temp;
1470     char *value;
1471     time_t stamp;
1472     char *eostamp;
1474     status = buffer_get_field (&buffer, &buffer_size, &value);
1475     if (status != 0)
1476     {
1477       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1478       break;
1479     }
1481     /* make sure update time is always moving forward */
1482     stamp = strtol(value, &eostamp, 10);
1483     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1484     {
1485       pthread_mutex_unlock(&cache_lock);
1486       return send_response(sock, RESP_ERR,
1487                            "Cannot find timestamp in '%s'!\n", value);
1488     }
1489     else if (stamp <= ci->last_update_stamp)
1490     {
1491       pthread_mutex_unlock(&cache_lock);
1492       return send_response(sock, RESP_ERR,
1493                            "illegal attempt to update using time %ld when last"
1494                            " update time is %ld (minimum one second step)\n",
1495                            stamp, ci->last_update_stamp);
1496     }
1497     else
1498       ci->last_update_stamp = stamp;
1500     temp = (char **) realloc (ci->values,
1501         sizeof (char *) * (ci->values_num + 1));
1502     if (temp == NULL)
1503     {
1504       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1505       continue;
1506     }
1507     ci->values = temp;
1509     ci->values[ci->values_num] = strdup (value);
1510     if (ci->values[ci->values_num] == NULL)
1511     {
1512       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1513       continue;
1514     }
1515     ci->values_num++;
1517     values_num++;
1518   }
1520   if (((now - ci->last_flush_time) >= config_write_interval)
1521       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1522       && (ci->values_num > 0))
1523   {
1524     enqueue_cache_item (ci, TAIL);
1525   }
1527   pthread_mutex_unlock (&cache_lock);
1529   if (values_num < 1)
1530     return send_response(sock, RESP_ERR, "No values updated.\n");
1531   else
1532     return send_response(sock, RESP_OK,
1533                          "errors, enqueued %i value(s).\n", values_num);
1535   /* NOTREACHED */
1536   assert(1==0);
1538 } /* }}} int handle_request_update */
1540 /* we came across a "WROTE" entry during journal replay.
1541  * throw away any values that we have accumulated for this file
1542  */
1543 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1545   int i;
1546   cache_item_t *ci;
1547   const char *file = buffer;
1549   pthread_mutex_lock(&cache_lock);
1551   ci = g_tree_lookup(cache_tree, file);
1552   if (ci == NULL)
1553   {
1554     pthread_mutex_unlock(&cache_lock);
1555     return (0);
1556   }
1558   if (ci->values)
1559   {
1560     for (i=0; i < ci->values_num; i++)
1561       free(ci->values[i]);
1563     free(ci->values);
1564   }
1566   wipe_ci_values(ci, now);
1567   remove_from_queue(ci);
1569   pthread_mutex_unlock(&cache_lock);
1570   return (0);
1571 } /* }}} int handle_request_wrote */
1573 /* start "BATCH" processing */
1574 static int batch_start (listen_socket_t *sock) /* {{{ */
1576   int status;
1577   if (sock->batch_start)
1578     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1580   status = send_response(sock, RESP_OK,
1581                          "Go ahead.  End with dot '.' on its own line.\n");
1582   sock->batch_start = time(NULL);
1583   sock->batch_cmd = 0;
1585   return status;
1586 } /* }}} static int batch_start */
1588 /* finish "BATCH" processing and return results to the client */
1589 static int batch_done (listen_socket_t *sock) /* {{{ */
1591   assert(sock->batch_start);
1592   sock->batch_start = 0;
1593   sock->batch_cmd  = 0;
1594   return send_response(sock, RESP_OK, "errors\n");
1595 } /* }}} static int batch_done */
1597 /* if sock==NULL, we are in journal replay mode */
1598 static int handle_request (listen_socket_t *sock, /* {{{ */
1599                            time_t now,
1600                            char *buffer, size_t buffer_size)
1602   char *buffer_ptr;
1603   char *command;
1604   int status;
1606   assert (buffer[buffer_size - 1] == '\0');
1608   buffer_ptr = buffer;
1609   command = NULL;
1610   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1611   if (status != 0)
1612   {
1613     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1614     return (-1);
1615   }
1617   if (sock != NULL && sock->batch_start)
1618     sock->batch_cmd++;
1620   if (strcasecmp (command, "update") == 0)
1621     return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1622   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1623   {
1624     /* this is only valid in replay mode */
1625     return (handle_request_wrote (buffer_ptr, now));
1626   }
1627   else if (strcasecmp (command, "flush") == 0)
1628     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1629   else if (strcasecmp (command, "flushall") == 0)
1630     return (handle_request_flushall(sock));
1631   else if (strcasecmp (command, "pending") == 0)
1632     return (handle_request_pending(sock, buffer_ptr, buffer_size));
1633   else if (strcasecmp (command, "forget") == 0)
1634     return (handle_request_forget(sock, buffer_ptr, buffer_size));
1635   else if (strcasecmp (command, "stats") == 0)
1636     return (handle_request_stats (sock));
1637   else if (strcasecmp (command, "help") == 0)
1638     return (handle_request_help (sock, buffer_ptr, buffer_size));
1639   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1640     return batch_start(sock);
1641   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1642     return batch_done(sock);
1643   else
1644     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1646   /* NOTREACHED */
1647   assert(1==0);
1648 } /* }}} int handle_request */
1650 /* MUST NOT hold journal_lock before calling this */
1651 static void journal_rotate(void) /* {{{ */
1653   FILE *old_fh = NULL;
1654   int new_fd;
1656   if (journal_cur == NULL || journal_old == NULL)
1657     return;
1659   pthread_mutex_lock(&journal_lock);
1661   /* we rotate this way (rename before close) so that the we can release
1662    * the journal lock as fast as possible.  Journal writes to the new
1663    * journal can proceed immediately after the new file is opened.  The
1664    * fclose can then block without affecting new updates.
1665    */
1666   if (journal_fh != NULL)
1667   {
1668     old_fh = journal_fh;
1669     journal_fh = NULL;
1670     rename(journal_cur, journal_old);
1671     ++stats_journal_rotate;
1672   }
1674   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1675                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1676   if (new_fd >= 0)
1677   {
1678     journal_fh = fdopen(new_fd, "a");
1679     if (journal_fh == NULL)
1680       close(new_fd);
1681   }
1683   pthread_mutex_unlock(&journal_lock);
1685   if (old_fh != NULL)
1686     fclose(old_fh);
1688   if (journal_fh == NULL)
1689   {
1690     RRDD_LOG(LOG_CRIT,
1691              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1692              journal_cur, rrd_strerror(errno));
1694     RRDD_LOG(LOG_ERR,
1695              "JOURNALING DISABLED: All values will be flushed at shutdown");
1696     config_flush_at_shutdown = 1;
1697   }
1699 } /* }}} static void journal_rotate */
1701 static void journal_done(void) /* {{{ */
1703   if (journal_cur == NULL)
1704     return;
1706   pthread_mutex_lock(&journal_lock);
1707   if (journal_fh != NULL)
1708   {
1709     fclose(journal_fh);
1710     journal_fh = NULL;
1711   }
1713   if (config_flush_at_shutdown)
1714   {
1715     RRDD_LOG(LOG_INFO, "removing journals");
1716     unlink(journal_old);
1717     unlink(journal_cur);
1718   }
1719   else
1720   {
1721     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1722              "journals will be used at next startup");
1723   }
1725   pthread_mutex_unlock(&journal_lock);
1727 } /* }}} static void journal_done */
1729 static int journal_write(char *cmd, char *args) /* {{{ */
1731   int chars;
1733   if (journal_fh == NULL)
1734     return 0;
1736   pthread_mutex_lock(&journal_lock);
1737   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1738   pthread_mutex_unlock(&journal_lock);
1740   if (chars > 0)
1741   {
1742     pthread_mutex_lock(&stats_lock);
1743     stats_journal_bytes += chars;
1744     pthread_mutex_unlock(&stats_lock);
1745   }
1747   return chars;
1748 } /* }}} static int journal_write */
1750 static int journal_replay (const char *file) /* {{{ */
1752   FILE *fh;
1753   int entry_cnt = 0;
1754   int fail_cnt = 0;
1755   uint64_t line = 0;
1756   char entry[CMD_MAX];
1757   time_t now;
1759   if (file == NULL) return 0;
1761   {
1762     char *reason;
1763     int status = 0;
1764     struct stat statbuf;
1766     memset(&statbuf, 0, sizeof(statbuf));
1767     if (stat(file, &statbuf) != 0)
1768     {
1769       if (errno == ENOENT)
1770         return 0;
1772       reason = "stat error";
1773       status = errno;
1774     }
1775     else if (!S_ISREG(statbuf.st_mode))
1776     {
1777       reason = "not a regular file";
1778       status = EPERM;
1779     }
1780     if (statbuf.st_uid != daemon_uid)
1781     {
1782       reason = "not owned by daemon user";
1783       status = EACCES;
1784     }
1785     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1786     {
1787       reason = "must not be user/group writable";
1788       status = EACCES;
1789     }
1791     if (status != 0)
1792     {
1793       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1794                file, rrd_strerror(status), reason);
1795       return 0;
1796     }
1797   }
1799   fh = fopen(file, "r");
1800   if (fh == NULL)
1801   {
1802     if (errno != ENOENT)
1803       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1804                file, rrd_strerror(errno));
1805     return 0;
1806   }
1807   else
1808     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1810   now = time(NULL);
1812   while(!feof(fh))
1813   {
1814     size_t entry_len;
1816     ++line;
1817     if (fgets(entry, sizeof(entry), fh) == NULL)
1818       break;
1819     entry_len = strlen(entry);
1821     /* check \n termination in case journal writing crashed mid-line */
1822     if (entry_len == 0)
1823       continue;
1824     else if (entry[entry_len - 1] != '\n')
1825     {
1826       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1827       ++fail_cnt;
1828       continue;
1829     }
1831     entry[entry_len - 1] = '\0';
1833     if (handle_request(NULL, now, entry, entry_len) == 0)
1834       ++entry_cnt;
1835     else
1836       ++fail_cnt;
1837   }
1839   fclose(fh);
1841   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1842            entry_cnt, fail_cnt);
1844   return entry_cnt > 0 ? 1 : 0;
1845 } /* }}} static int journal_replay */
1847 static void journal_init(void) /* {{{ */
1849   int had_journal = 0;
1851   if (journal_cur == NULL) return;
1853   pthread_mutex_lock(&journal_lock);
1855   RRDD_LOG(LOG_INFO, "checking for journal files");
1857   had_journal += journal_replay(journal_old);
1858   had_journal += journal_replay(journal_cur);
1860   /* it must have been a crash.  start a flush */
1861   if (had_journal && config_flush_at_shutdown)
1862     flush_old_values(-1);
1864   pthread_mutex_unlock(&journal_lock);
1865   journal_rotate();
1867   RRDD_LOG(LOG_INFO, "journal processing complete");
1869 } /* }}} static void journal_init */
1871 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1873   assert(sock != NULL);
1875   free(sock->rbuf);  sock->rbuf = NULL;
1876   free(sock->wbuf);  sock->wbuf = NULL;
1877   free(sock);
1878 } /* }}} void free_listen_socket */
1880 static void close_connection(listen_socket_t *sock) /* {{{ */
1882   if (sock->fd >= 0)
1883   {
1884     close(sock->fd);
1885     sock->fd = -1;
1886   }
1888   free_listen_socket(sock);
1890 } /* }}} void close_connection */
1892 static void *connection_thread_main (void *args) /* {{{ */
1894   listen_socket_t *sock;
1895   int i;
1896   int fd;
1898   sock = (listen_socket_t *) args;
1899   fd = sock->fd;
1901   /* init read buffers */
1902   sock->next_read = sock->next_cmd = 0;
1903   sock->rbuf = malloc(RBUF_SIZE);
1904   if (sock->rbuf == NULL)
1905   {
1906     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1907     close_connection(sock);
1908     return NULL;
1909   }
1911   pthread_mutex_lock (&connection_threads_lock);
1912   {
1913     pthread_t *temp;
1915     temp = (pthread_t *) realloc (connection_threads,
1916         sizeof (pthread_t) * (connection_threads_num + 1));
1917     if (temp == NULL)
1918     {
1919       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc(++) failed.");
1920     }
1921     else
1922     {
1923       connection_threads = temp;
1924       connection_threads[connection_threads_num] = pthread_self ();
1925       connection_threads_num++;
1926     }
1927   }
1928   pthread_mutex_unlock (&connection_threads_lock);
1930   while (do_shutdown == 0)
1931   {
1932     char *cmd;
1933     ssize_t cmd_len;
1934     ssize_t rbytes;
1935     time_t now;
1937     struct pollfd pollfd;
1938     int status;
1940     pollfd.fd = fd;
1941     pollfd.events = POLLIN | POLLPRI;
1942     pollfd.revents = 0;
1944     status = poll (&pollfd, 1, /* timeout = */ 500);
1945     if (do_shutdown)
1946       break;
1947     else if (status == 0) /* timeout */
1948       continue;
1949     else if (status < 0) /* error */
1950     {
1951       status = errno;
1952       if (status != EINTR)
1953         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1954       continue;
1955     }
1957     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1958       break;
1959     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1960     {
1961       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1962           "poll(2) returned something unexpected: %#04hx",
1963           pollfd.revents);
1964       break;
1965     }
1967     rbytes = read(fd, sock->rbuf + sock->next_read,
1968                   RBUF_SIZE - sock->next_read);
1969     if (rbytes < 0)
1970     {
1971       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1972       break;
1973     }
1974     else if (rbytes == 0)
1975       break; /* eof */
1977     sock->next_read += rbytes;
1979     if (sock->batch_start)
1980       now = sock->batch_start;
1981     else
1982       now = time(NULL);
1984     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1985     {
1986       status = handle_request (sock, now, cmd, cmd_len+1);
1987       if (status != 0)
1988         goto out_close;
1989     }
1990   }
1992 out_close:
1993   close_connection(sock);
1995   /* Remove this thread from the connection threads list */
1996   pthread_mutex_lock (&connection_threads_lock);
1997   {
1998     pthread_t self;
1999     pthread_t *temp;
2001     /* Find out own index in the array */
2002     self = pthread_self ();
2003     for (i = 0; i < connection_threads_num; i++)
2004       if (pthread_equal (connection_threads[i], self) != 0)
2005         break;
2006     assert (i < connection_threads_num);
2008     /* Move the trailing threads forward. */
2009     if (i < (connection_threads_num - 1))
2010     {
2011       memmove (connection_threads + i,
2012                connection_threads + i + 1,
2013                sizeof (pthread_t) * (connection_threads_num - i - 1));
2014     }
2016     connection_threads_num--;
2018     temp = realloc(connection_threads,
2019                    sizeof(*connection_threads) * connection_threads_num);
2020     if (connection_threads_num > 0 && temp == NULL)
2021       RRDD_LOG(LOG_ERR, "connection_thread_main: realloc(--) failed.");
2022     else
2023       connection_threads = temp;
2024   }
2025   pthread_mutex_unlock (&connection_threads_lock);
2027   return (NULL);
2028 } /* }}} void *connection_thread_main */
2030 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2032   int fd;
2033   struct sockaddr_un sa;
2034   listen_socket_t *temp;
2035   int status;
2036   const char *path;
2038   path = sock->addr;
2039   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2040     path += strlen("unix:");
2042   temp = (listen_socket_t *) realloc (listen_fds,
2043       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2044   if (temp == NULL)
2045   {
2046     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2047     return (-1);
2048   }
2049   listen_fds = temp;
2050   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2052   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2053   if (fd < 0)
2054   {
2055     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2056              rrd_strerror(errno));
2057     return (-1);
2058   }
2060   memset (&sa, 0, sizeof (sa));
2061   sa.sun_family = AF_UNIX;
2062   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2064   /* if we've gotten this far, we own the pid file.  any daemon started
2065    * with the same args must not be alive.  therefore, ensure that we can
2066    * create the socket...
2067    */
2068   unlink(path);
2070   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2071   if (status != 0)
2072   {
2073     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2074              path, rrd_strerror(errno));
2075     close (fd);
2076     return (-1);
2077   }
2079   status = listen (fd, /* backlog = */ 10);
2080   if (status != 0)
2081   {
2082     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2083              path, rrd_strerror(errno));
2084     close (fd);
2085     unlink (path);
2086     return (-1);
2087   }
2089   listen_fds[listen_fds_num].fd = fd;
2090   listen_fds[listen_fds_num].family = PF_UNIX;
2091   strncpy(listen_fds[listen_fds_num].addr, path,
2092           sizeof (listen_fds[listen_fds_num].addr) - 1);
2093   listen_fds_num++;
2095   return (0);
2096 } /* }}} int open_listen_socket_unix */
2098 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2100   struct addrinfo ai_hints;
2101   struct addrinfo *ai_res;
2102   struct addrinfo *ai_ptr;
2103   char addr_copy[NI_MAXHOST];
2104   char *addr;
2105   char *port;
2106   int status;
2108   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2109   addr_copy[sizeof (addr_copy) - 1] = 0;
2110   addr = addr_copy;
2112   memset (&ai_hints, 0, sizeof (ai_hints));
2113   ai_hints.ai_flags = 0;
2114 #ifdef AI_ADDRCONFIG
2115   ai_hints.ai_flags |= AI_ADDRCONFIG;
2116 #endif
2117   ai_hints.ai_family = AF_UNSPEC;
2118   ai_hints.ai_socktype = SOCK_STREAM;
2120   port = NULL;
2121   if (*addr == '[') /* IPv6+port format */
2122   {
2123     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2124     addr++;
2126     port = strchr (addr, ']');
2127     if (port == NULL)
2128     {
2129       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2130       return (-1);
2131     }
2132     *port = 0;
2133     port++;
2135     if (*port == ':')
2136       port++;
2137     else if (*port == 0)
2138       port = NULL;
2139     else
2140     {
2141       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2142       return (-1);
2143     }
2144   } /* if (*addr = ']') */
2145   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2146   {
2147     port = rindex(addr, ':');
2148     if (port != NULL)
2149     {
2150       *port = 0;
2151       port++;
2152     }
2153   }
2154   ai_res = NULL;
2155   status = getaddrinfo (addr,
2156                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2157                         &ai_hints, &ai_res);
2158   if (status != 0)
2159   {
2160     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2161              addr, gai_strerror (status));
2162     return (-1);
2163   }
2165   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2166   {
2167     int fd;
2168     listen_socket_t *temp;
2169     int one = 1;
2171     temp = (listen_socket_t *) realloc (listen_fds,
2172         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2173     if (temp == NULL)
2174     {
2175       fprintf (stderr,
2176                "rrdcached: open_listen_socket_network: realloc failed.\n");
2177       continue;
2178     }
2179     listen_fds = temp;
2180     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2182     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2183     if (fd < 0)
2184     {
2185       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2186                rrd_strerror(errno));
2187       continue;
2188     }
2190     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2192     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2193     if (status != 0)
2194     {
2195       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2196                sock->addr, rrd_strerror(errno));
2197       close (fd);
2198       continue;
2199     }
2201     status = listen (fd, /* backlog = */ 10);
2202     if (status != 0)
2203     {
2204       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2205                sock->addr, rrd_strerror(errno));
2206       close (fd);
2207       freeaddrinfo(ai_res);
2208       return (-1);
2209     }
2211     listen_fds[listen_fds_num].fd = fd;
2212     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2213     listen_fds_num++;
2214   } /* for (ai_ptr) */
2216   freeaddrinfo(ai_res);
2217   return (0);
2218 } /* }}} static int open_listen_socket_network */
2220 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2222   assert(sock != NULL);
2223   assert(sock->addr != NULL);
2225   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2226       || sock->addr[0] == '/')
2227     return (open_listen_socket_unix(sock));
2228   else
2229     return (open_listen_socket_network(sock));
2230 } /* }}} int open_listen_socket */
2232 static int close_listen_sockets (void) /* {{{ */
2234   size_t i;
2236   for (i = 0; i < listen_fds_num; i++)
2237   {
2238     close (listen_fds[i].fd);
2240     if (listen_fds[i].family == PF_UNIX)
2241       unlink(listen_fds[i].addr);
2242   }
2244   free (listen_fds);
2245   listen_fds = NULL;
2246   listen_fds_num = 0;
2248   return (0);
2249 } /* }}} int close_listen_sockets */
2251 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2253   struct pollfd *pollfds;
2254   int pollfds_num;
2255   int status;
2256   int i;
2258   if (listen_fds_num < 1)
2259   {
2260     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2261     return (NULL);
2262   }
2264   pollfds_num = listen_fds_num;
2265   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2266   if (pollfds == NULL)
2267   {
2268     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2269     return (NULL);
2270   }
2271   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2273   RRDD_LOG(LOG_INFO, "listening for connections");
2275   while (do_shutdown == 0)
2276   {
2277     for (i = 0; i < pollfds_num; i++)
2278     {
2279       pollfds[i].fd = listen_fds[i].fd;
2280       pollfds[i].events = POLLIN | POLLPRI;
2281       pollfds[i].revents = 0;
2282     }
2284     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2285     if (do_shutdown)
2286       break;
2287     else if (status == 0) /* timeout */
2288       continue;
2289     else if (status < 0) /* error */
2290     {
2291       status = errno;
2292       if (status != EINTR)
2293       {
2294         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2295       }
2296       continue;
2297     }
2299     for (i = 0; i < pollfds_num; i++)
2300     {
2301       listen_socket_t *client_sock;
2302       struct sockaddr_storage client_sa;
2303       socklen_t client_sa_size;
2304       pthread_t tid;
2305       pthread_attr_t attr;
2307       if (pollfds[i].revents == 0)
2308         continue;
2310       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2311       {
2312         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2313             "poll(2) returned something unexpected for listen FD #%i.",
2314             pollfds[i].fd);
2315         continue;
2316       }
2318       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2319       if (client_sock == NULL)
2320       {
2321         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2322         continue;
2323       }
2324       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2326       client_sa_size = sizeof (client_sa);
2327       client_sock->fd = accept (pollfds[i].fd,
2328           (struct sockaddr *) &client_sa, &client_sa_size);
2329       if (client_sock->fd < 0)
2330       {
2331         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2332         free(client_sock);
2333         continue;
2334       }
2336       pthread_attr_init (&attr);
2337       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2339       status = pthread_create (&tid, &attr, connection_thread_main,
2340                                client_sock);
2341       if (status != 0)
2342       {
2343         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2344         close_connection(client_sock);
2345         continue;
2346       }
2347     } /* for (pollfds_num) */
2348   } /* while (do_shutdown == 0) */
2350   RRDD_LOG(LOG_INFO, "starting shutdown");
2352   close_listen_sockets ();
2354   pthread_mutex_lock (&connection_threads_lock);
2355   while (connection_threads_num > 0)
2356   {
2357     pthread_t wait_for;
2359     wait_for = connection_threads[0];
2361     pthread_mutex_unlock (&connection_threads_lock);
2362     pthread_join (wait_for, /* retval = */ NULL);
2363     pthread_mutex_lock (&connection_threads_lock);
2364   }
2365   pthread_mutex_unlock (&connection_threads_lock);
2367   free(pollfds);
2369   return (NULL);
2370 } /* }}} void *listen_thread_main */
2372 static int daemonize (void) /* {{{ */
2374   int pid_fd;
2375   char *base_dir;
2377   daemon_uid = geteuid();
2379   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2380   if (pid_fd < 0)
2381     pid_fd = check_pidfile();
2382   if (pid_fd < 0)
2383     return pid_fd;
2385   /* open all the listen sockets */
2386   if (config_listen_address_list_len > 0)
2387   {
2388     for (int i = 0; i < config_listen_address_list_len; i++)
2389     {
2390       open_listen_socket (config_listen_address_list[i]);
2391       free_listen_socket (config_listen_address_list[i]);
2392     }
2394     free(config_listen_address_list);
2395   }
2396   else
2397   {
2398     listen_socket_t sock;
2399     memset(&sock, 0, sizeof(sock));
2400     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2401     open_listen_socket (&sock);
2402   }
2404   if (listen_fds_num < 1)
2405   {
2406     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2407     goto error;
2408   }
2410   if (!stay_foreground)
2411   {
2412     pid_t child;
2414     child = fork ();
2415     if (child < 0)
2416     {
2417       fprintf (stderr, "daemonize: fork(2) failed.\n");
2418       goto error;
2419     }
2420     else if (child > 0)
2421       exit(0);
2423     /* Become session leader */
2424     setsid ();
2426     /* Open the first three file descriptors to /dev/null */
2427     close (2);
2428     close (1);
2429     close (0);
2431     open ("/dev/null", O_RDWR);
2432     dup (0);
2433     dup (0);
2434   } /* if (!stay_foreground) */
2436   /* Change into the /tmp directory. */
2437   base_dir = (config_base_dir != NULL)
2438     ? config_base_dir
2439     : "/tmp";
2441   if (chdir (base_dir) != 0)
2442   {
2443     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2444     goto error;
2445   }
2447   install_signal_handlers();
2449   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2450   RRDD_LOG(LOG_INFO, "starting up");
2452   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2453                                 (GDestroyNotify) free_cache_item);
2454   if (cache_tree == NULL)
2455   {
2456     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2457     goto error;
2458   }
2460   return write_pidfile (pid_fd);
2462 error:
2463   remove_pidfile();
2464   return -1;
2465 } /* }}} int daemonize */
2467 static int cleanup (void) /* {{{ */
2469   do_shutdown++;
2471   pthread_cond_signal (&cache_cond);
2472   pthread_join (queue_thread, /* return = */ NULL);
2474   remove_pidfile ();
2476   free(config_base_dir);
2477   free(config_pid_file);
2478   free(journal_cur);
2479   free(journal_old);
2481   pthread_mutex_lock(&cache_lock);
2482   g_tree_destroy(cache_tree);
2484   RRDD_LOG(LOG_INFO, "goodbye");
2485   closelog ();
2487   return (0);
2488 } /* }}} int cleanup */
2490 static int read_options (int argc, char **argv) /* {{{ */
2492   int option;
2493   int status = 0;
2495   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2496   {
2497     switch (option)
2498     {
2499       case 'g':
2500         stay_foreground=1;
2501         break;
2503       case 'L':
2504       case 'l':
2505       {
2506         listen_socket_t **temp;
2507         listen_socket_t *new;
2509         new = malloc(sizeof(listen_socket_t));
2510         if (new == NULL)
2511         {
2512           fprintf(stderr, "read_options: malloc failed.\n");
2513           return(2);
2514         }
2515         memset(new, 0, sizeof(listen_socket_t));
2517         temp = (listen_socket_t **) realloc (config_listen_address_list,
2518             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2519         if (temp == NULL)
2520         {
2521           fprintf (stderr, "read_options: realloc failed.\n");
2522           return (2);
2523         }
2524         config_listen_address_list = temp;
2526         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2527         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2529         temp[config_listen_address_list_len] = new;
2530         config_listen_address_list_len++;
2531       }
2532       break;
2534       case 'f':
2535       {
2536         int temp;
2538         temp = atoi (optarg);
2539         if (temp > 0)
2540           config_flush_interval = temp;
2541         else
2542         {
2543           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2544           status = 3;
2545         }
2546       }
2547       break;
2549       case 'w':
2550       {
2551         int temp;
2553         temp = atoi (optarg);
2554         if (temp > 0)
2555           config_write_interval = temp;
2556         else
2557         {
2558           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2559           status = 2;
2560         }
2561       }
2562       break;
2564       case 'z':
2565       {
2566         int temp;
2568         temp = atoi(optarg);
2569         if (temp > 0)
2570           config_write_jitter = temp;
2571         else
2572         {
2573           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2574           status = 2;
2575         }
2577         break;
2578       }
2580       case 'B':
2581         config_write_base_only = 1;
2582         break;
2584       case 'b':
2585       {
2586         size_t len;
2587         char base_realpath[PATH_MAX];
2589         if (config_base_dir != NULL)
2590           free (config_base_dir);
2591         config_base_dir = strdup (optarg);
2592         if (config_base_dir == NULL)
2593         {
2594           fprintf (stderr, "read_options: strdup failed.\n");
2595           return (3);
2596         }
2598         /* make sure that the base directory is not resolved via
2599          * symbolic links.  this makes some performance-enhancing
2600          * assumptions possible (we don't have to resolve paths
2601          * that start with a "/")
2602          */
2603         if (realpath(config_base_dir, base_realpath) == NULL)
2604         {
2605           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2606           return 5;
2607         }
2608         else if (strncmp(config_base_dir,
2609                          base_realpath, sizeof(base_realpath)) != 0)
2610         {
2611           fprintf(stderr,
2612                   "Base directory (-b) resolved via file system links!\n"
2613                   "Please consult rrdcached '-b' documentation!\n"
2614                   "Consider specifying the real directory (%s)\n",
2615                   base_realpath);
2616           return 5;
2617         }
2619         len = strlen (config_base_dir);
2620         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2621         {
2622           config_base_dir[len - 1] = 0;
2623           len--;
2624         }
2626         if (len < 1)
2627         {
2628           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2629           return (4);
2630         }
2632         _config_base_dir_len = len;
2633       }
2634       break;
2636       case 'p':
2637       {
2638         if (config_pid_file != NULL)
2639           free (config_pid_file);
2640         config_pid_file = strdup (optarg);
2641         if (config_pid_file == NULL)
2642         {
2643           fprintf (stderr, "read_options: strdup failed.\n");
2644           return (3);
2645         }
2646       }
2647       break;
2649       case 'F':
2650         config_flush_at_shutdown = 1;
2651         break;
2653       case 'j':
2654       {
2655         struct stat statbuf;
2656         const char *dir = optarg;
2658         status = stat(dir, &statbuf);
2659         if (status != 0)
2660         {
2661           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2662           return 6;
2663         }
2665         if (!S_ISDIR(statbuf.st_mode)
2666             || access(dir, R_OK|W_OK|X_OK) != 0)
2667         {
2668           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2669                   errno ? rrd_strerror(errno) : "");
2670           return 6;
2671         }
2673         journal_cur = malloc(PATH_MAX + 1);
2674         journal_old = malloc(PATH_MAX + 1);
2675         if (journal_cur == NULL || journal_old == NULL)
2676         {
2677           fprintf(stderr, "malloc failure for journal files\n");
2678           return 6;
2679         }
2680         else 
2681         {
2682           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2683           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2684         }
2685       }
2686       break;
2688       case 'h':
2689       case '?':
2690         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2691             "\n"
2692             "Usage: rrdcached [options]\n"
2693             "\n"
2694             "Valid options are:\n"
2695             "  -l <address>  Socket address to listen to.\n"
2696             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2697             "  -w <seconds>  Interval in which to write data.\n"
2698             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2699             "  -f <seconds>  Interval in which to flush dead data.\n"
2700             "  -p <file>     Location of the PID-file.\n"
2701             "  -b <dir>      Base directory to change to.\n"
2702             "  -B            Restrict file access to paths within -b <dir>\n"
2703             "  -g            Do not fork and run in the foreground.\n"
2704             "  -j <dir>      Directory in which to create the journal files.\n"
2705             "  -F            Always flush all updates at shutdown\n"
2706             "\n"
2707             "For more information and a detailed description of all options "
2708             "please refer\n"
2709             "to the rrdcached(1) manual page.\n",
2710             VERSION);
2711         status = -1;
2712         break;
2713     } /* switch (option) */
2714   } /* while (getopt) */
2716   /* advise the user when values are not sane */
2717   if (config_flush_interval < 2 * config_write_interval)
2718     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2719             " 2x write interval (-w) !\n");
2720   if (config_write_jitter > config_write_interval)
2721     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2722             " write interval (-w) !\n");
2724   if (config_write_base_only && config_base_dir == NULL)
2725     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2726             "  Consult the rrdcached documentation\n");
2728   if (journal_cur == NULL)
2729     config_flush_at_shutdown = 1;
2731   return (status);
2732 } /* }}} int read_options */
2734 int main (int argc, char **argv)
2736   int status;
2738   status = read_options (argc, argv);
2739   if (status != 0)
2740   {
2741     if (status < 0)
2742       status = 0;
2743     return (status);
2744   }
2746   status = daemonize ();
2747   if (status != 0)
2748   {
2749     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2750     return (1);
2751   }
2753   journal_init();
2755   /* start the queue thread */
2756   memset (&queue_thread, 0, sizeof (queue_thread));
2757   status = pthread_create (&queue_thread,
2758                            NULL, /* attr */
2759                            queue_thread_main,
2760                            NULL); /* args */
2761   if (status != 0)
2762   {
2763     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2764     cleanup();
2765     return (1);
2766   }
2768   listen_thread_main (NULL);
2769   cleanup ();
2771   return (0);
2772 } /* int main */
2774 /*
2775  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2776  */