Code

This patch allows for multiple RRD writer threads to service the queue.
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
78 #include <stdint.h>
79 #include <stdio.h>
80 #include <unistd.h>
81 #include <string.h>
82 #include <strings.h>
83 #include <stdint.h>
84 #include <inttypes.h>
86 #include <sys/types.h>
87 #include <sys/stat.h>
88 #include <fcntl.h>
89 #include <signal.h>
90 #include <sys/socket.h>
91 #include <sys/un.h>
92 #include <netdb.h>
93 #include <poll.h>
94 #include <syslog.h>
95 #include <pthread.h>
96 #include <errno.h>
97 #include <assert.h>
98 #include <sys/time.h>
99 #include <time.h>
101 #include <glib-2.0/glib.h>
102 /* }}} */
104 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
106 #ifndef __GNUC__
107 # define __attribute__(x) /**/
108 #endif
110 /*
111  * Types
112  */
113 typedef enum
115   PRIV_LOW,
116   PRIV_HIGH
117 } socket_privilege;
119 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
121 struct listen_socket_s
123   int fd;
124   char addr[PATH_MAX + 1];
125   int family;
126   socket_privilege privilege;
128   /* state for BATCH processing */
129   time_t batch_start;
130   int batch_cmd;
132   /* buffered IO */
133   char *rbuf;
134   off_t next_cmd;
135   off_t next_read;
137   char *wbuf;
138   ssize_t wbuf_len;
139 };
140 typedef struct listen_socket_s listen_socket_t;
142 struct cache_item_s;
143 typedef struct cache_item_s cache_item_t;
144 struct cache_item_s
146   char *file;
147   char **values;
148   int values_num;
149   time_t last_flush_time;
150   time_t last_update_stamp;
151 #define CI_FLAGS_IN_TREE  (1<<0)
152 #define CI_FLAGS_IN_QUEUE (1<<1)
153   int flags;
154   pthread_cond_t  flushed;
155   cache_item_t *prev;
156   cache_item_t *next;
157 };
159 struct callback_flush_data_s
161   time_t now;
162   time_t abs_timeout;
163   char **keys;
164   size_t keys_num;
165 };
166 typedef struct callback_flush_data_s callback_flush_data_t;
168 enum queue_side_e
170   HEAD,
171   TAIL
172 };
173 typedef enum queue_side_e queue_side_t;
175 /* max length of socket command or response */
176 #define CMD_MAX 4096
177 #define RBUF_SIZE (CMD_MAX*2)
179 /*
180  * Variables
181  */
182 static int stay_foreground = 0;
183 static uid_t daemon_uid;
185 static listen_socket_t *listen_fds = NULL;
186 static size_t listen_fds_num = 0;
188 static int do_shutdown = 0;
190 static pthread_t *queue_threads;
191 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
192 static int config_queue_threads = 4;
194 static pthread_t flush_thread;
195 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
197 static pthread_t *connection_threads = NULL;
198 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
199 static int connection_threads_num = 0;
201 /* Cache stuff */
202 static GTree          *cache_tree = NULL;
203 static cache_item_t   *cache_queue_head = NULL;
204 static cache_item_t   *cache_queue_tail = NULL;
205 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
207 static int config_write_interval = 300;
208 static int config_write_jitter   = 0;
209 static int config_flush_interval = 3600;
210 static int config_flush_at_shutdown = 0;
211 static char *config_pid_file = NULL;
212 static char *config_base_dir = NULL;
213 static size_t _config_base_dir_len = 0;
214 static int config_write_base_only = 0;
216 static listen_socket_t **config_listen_address_list = NULL;
217 static int config_listen_address_list_len = 0;
219 static uint64_t stats_queue_length = 0;
220 static uint64_t stats_updates_received = 0;
221 static uint64_t stats_flush_received = 0;
222 static uint64_t stats_updates_written = 0;
223 static uint64_t stats_data_sets_written = 0;
224 static uint64_t stats_journal_bytes = 0;
225 static uint64_t stats_journal_rotate = 0;
226 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
228 /* Journaled updates */
229 static char *journal_cur = NULL;
230 static char *journal_old = NULL;
231 static FILE *journal_fh = NULL;
232 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
233 static int journal_write(char *cmd, char *args);
234 static void journal_done(void);
235 static void journal_rotate(void);
237 /* 
238  * Functions
239  */
240 static void sig_common (const char *sig) /* {{{ */
242   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
243   do_shutdown++;
244   pthread_cond_broadcast(&flush_cond);
245   pthread_cond_broadcast(&queue_cond);
246 } /* }}} void sig_common */
248 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
250   sig_common("INT");
251 } /* }}} void sig_int_handler */
253 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
255   sig_common("TERM");
256 } /* }}} void sig_term_handler */
258 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
260   config_flush_at_shutdown = 1;
261   sig_common("USR1");
262 } /* }}} void sig_usr1_handler */
264 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
266   config_flush_at_shutdown = 0;
267   sig_common("USR2");
268 } /* }}} void sig_usr2_handler */
270 static void install_signal_handlers(void) /* {{{ */
272   /* These structures are static, because `sigaction' behaves weird if the are
273    * overwritten.. */
274   static struct sigaction sa_int;
275   static struct sigaction sa_term;
276   static struct sigaction sa_pipe;
277   static struct sigaction sa_usr1;
278   static struct sigaction sa_usr2;
280   /* Install signal handlers */
281   memset (&sa_int, 0, sizeof (sa_int));
282   sa_int.sa_handler = sig_int_handler;
283   sigaction (SIGINT, &sa_int, NULL);
285   memset (&sa_term, 0, sizeof (sa_term));
286   sa_term.sa_handler = sig_term_handler;
287   sigaction (SIGTERM, &sa_term, NULL);
289   memset (&sa_pipe, 0, sizeof (sa_pipe));
290   sa_pipe.sa_handler = SIG_IGN;
291   sigaction (SIGPIPE, &sa_pipe, NULL);
293   memset (&sa_pipe, 0, sizeof (sa_usr1));
294   sa_usr1.sa_handler = sig_usr1_handler;
295   sigaction (SIGUSR1, &sa_usr1, NULL);
297   memset (&sa_usr2, 0, sizeof (sa_usr2));
298   sa_usr2.sa_handler = sig_usr2_handler;
299   sigaction (SIGUSR2, &sa_usr2, NULL);
301 } /* }}} void install_signal_handlers */
303 static int open_pidfile(char *action, int oflag) /* {{{ */
305   int fd;
306   char *file;
308   file = (config_pid_file != NULL)
309     ? config_pid_file
310     : LOCALSTATEDIR "/run/rrdcached.pid";
312   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
313   if (fd < 0)
314     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
315             action, file, rrd_strerror(errno));
317   return(fd);
318 } /* }}} static int open_pidfile */
320 /* check existing pid file to see whether a daemon is running */
321 static int check_pidfile(void)
323   int pid_fd;
324   pid_t pid;
325   char pid_str[16];
327   pid_fd = open_pidfile("open", O_RDWR);
328   if (pid_fd < 0)
329     return pid_fd;
331   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
332     return -1;
334   pid = atoi(pid_str);
335   if (pid <= 0)
336     return -1;
338   /* another running process that we can signal COULD be
339    * a competing rrdcached */
340   if (pid != getpid() && kill(pid, 0) == 0)
341   {
342     fprintf(stderr,
343             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
344     close(pid_fd);
345     return -1;
346   }
348   lseek(pid_fd, 0, SEEK_SET);
349   ftruncate(pid_fd, 0);
351   fprintf(stderr,
352           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
353           "rrdcached: starting normally.\n", pid);
355   return pid_fd;
356 } /* }}} static int check_pidfile */
358 static int write_pidfile (int fd) /* {{{ */
360   pid_t pid;
361   FILE *fh;
363   pid = getpid ();
365   fh = fdopen (fd, "w");
366   if (fh == NULL)
367   {
368     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
369     close(fd);
370     return (-1);
371   }
373   fprintf (fh, "%i\n", (int) pid);
374   fclose (fh);
376   return (0);
377 } /* }}} int write_pidfile */
379 static int remove_pidfile (void) /* {{{ */
381   char *file;
382   int status;
384   file = (config_pid_file != NULL)
385     ? config_pid_file
386     : LOCALSTATEDIR "/run/rrdcached.pid";
388   status = unlink (file);
389   if (status == 0)
390     return (0);
391   return (errno);
392 } /* }}} int remove_pidfile */
394 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
396   char *eol;
398   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
399                sock->next_read - sock->next_cmd);
401   if (eol == NULL)
402   {
403     /* no commands left, move remainder back to front of rbuf */
404     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
405             sock->next_read - sock->next_cmd);
406     sock->next_read -= sock->next_cmd;
407     sock->next_cmd = 0;
408     *len = 0;
409     return NULL;
410   }
411   else
412   {
413     char *cmd = sock->rbuf + sock->next_cmd;
414     *eol = '\0';
416     sock->next_cmd = eol - sock->rbuf + 1;
418     if (eol > sock->rbuf && *(eol-1) == '\r')
419       *(--eol) = '\0'; /* handle "\r\n" EOL */
421     *len = eol - cmd;
423     return cmd;
424   }
426   /* NOTREACHED */
427   assert(1==0);
430 /* add the characters directly to the write buffer */
431 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
433   char *new_buf;
435   assert(sock != NULL);
437   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
438   if (new_buf == NULL)
439   {
440     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
441     return -1;
442   }
444   strncpy(new_buf + sock->wbuf_len, str, len + 1);
446   sock->wbuf = new_buf;
447   sock->wbuf_len += len;
449   return 0;
450 } /* }}} static int add_to_wbuf */
452 /* add the text to the "extra" info that's sent after the status line */
453 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
455   va_list argp;
456   char buffer[CMD_MAX];
457   int len;
459   if (sock == NULL) return 0; /* journal replay mode */
460   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
462   va_start(argp, fmt);
463 #ifdef HAVE_VSNPRINTF
464   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
465 #else
466   len = vsprintf(buffer, fmt, argp);
467 #endif
468   va_end(argp);
469   if (len < 0)
470   {
471     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
472     return -1;
473   }
475   return add_to_wbuf(sock, buffer, len);
476 } /* }}} static int add_response_info */
478 static int count_lines(char *str) /* {{{ */
480   int lines = 0;
482   if (str != NULL)
483   {
484     while ((str = strchr(str, '\n')) != NULL)
485     {
486       ++lines;
487       ++str;
488     }
489   }
491   return lines;
492 } /* }}} static int count_lines */
494 /* send the response back to the user.
495  * returns 0 on success, -1 on error
496  * write buffer is always zeroed after this call */
497 static int send_response (listen_socket_t *sock, response_code rc,
498                           char *fmt, ...) /* {{{ */
500   va_list argp;
501   char buffer[CMD_MAX];
502   int lines;
503   ssize_t wrote;
504   int rclen, len;
506   if (sock == NULL) return rc;  /* journal replay mode */
508   if (sock->batch_start)
509   {
510     if (rc == RESP_OK)
511       return rc; /* no response on success during BATCH */
512     lines = sock->batch_cmd;
513   }
514   else if (rc == RESP_OK)
515     lines = count_lines(sock->wbuf);
516   else
517     lines = -1;
519   rclen = sprintf(buffer, "%d ", lines);
520   va_start(argp, fmt);
521 #ifdef HAVE_VSNPRINTF
522   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
523 #else
524   len = vsprintf(buffer+rclen, fmt, argp);
525 #endif
526   va_end(argp);
527   if (len < 0)
528     return -1;
530   len += rclen;
532   /* append the result to the wbuf, don't write to the user */
533   if (sock->batch_start)
534     return add_to_wbuf(sock, buffer, len);
536   /* first write must be complete */
537   if (len != write(sock->fd, buffer, len))
538   {
539     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
540     return -1;
541   }
543   if (sock->wbuf != NULL && rc == RESP_OK)
544   {
545     wrote = 0;
546     while (wrote < sock->wbuf_len)
547     {
548       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
549       if (wb <= 0)
550       {
551         RRDD_LOG(LOG_INFO, "send_response: could not write results");
552         return -1;
553       }
554       wrote += wb;
555     }
556   }
558   free(sock->wbuf); sock->wbuf = NULL;
559   sock->wbuf_len = 0;
561   return 0;
562 } /* }}} */
564 static void wipe_ci_values(cache_item_t *ci, time_t when)
566   ci->values = NULL;
567   ci->values_num = 0;
569   ci->last_flush_time = when;
570   if (config_write_jitter > 0)
571     ci->last_flush_time += (random() % config_write_jitter);
574 /* remove_from_queue
575  * remove a "cache_item_t" item from the queue.
576  * must hold 'cache_lock' when calling this
577  */
578 static void remove_from_queue(cache_item_t *ci) /* {{{ */
580   if (ci == NULL) return;
581   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
583   if (ci->prev == NULL)
584     cache_queue_head = ci->next; /* reset head */
585   else
586     ci->prev->next = ci->next;
588   if (ci->next == NULL)
589     cache_queue_tail = ci->prev; /* reset the tail */
590   else
591     ci->next->prev = ci->prev;
593   ci->next = ci->prev = NULL;
594   ci->flags &= ~CI_FLAGS_IN_QUEUE;
596   pthread_mutex_lock (&stats_lock);
597   assert (stats_queue_length > 0);
598   stats_queue_length--;
599   pthread_mutex_unlock (&stats_lock);
601 } /* }}} static void remove_from_queue */
603 /* free the resources associated with the cache_item_t
604  * must hold cache_lock when calling this function
605  */
606 static void *free_cache_item(cache_item_t *ci) /* {{{ */
608   if (ci == NULL) return NULL;
610   remove_from_queue(ci);
612   for (int i=0; i < ci->values_num; i++)
613     free(ci->values[i]);
615   free (ci->values);
616   free (ci->file);
618   /* in case anyone is waiting */
619   pthread_cond_broadcast(&ci->flushed);
621   free (ci);
623   return NULL;
624 } /* }}} static void *free_cache_item */
626 /*
627  * enqueue_cache_item:
628  * `cache_lock' must be acquired before calling this function!
629  */
630 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
631     queue_side_t side)
633   if (ci == NULL)
634     return (-1);
636   if (ci->values_num == 0)
637     return (0);
639   if (side == HEAD)
640   {
641     if (cache_queue_head == ci)
642       return 0;
644     /* remove if further down in queue */
645     remove_from_queue(ci);
647     ci->prev = NULL;
648     ci->next = cache_queue_head;
649     if (ci->next != NULL)
650       ci->next->prev = ci;
651     cache_queue_head = ci;
653     if (cache_queue_tail == NULL)
654       cache_queue_tail = cache_queue_head;
655   }
656   else /* (side == TAIL) */
657   {
658     /* We don't move values back in the list.. */
659     if (ci->flags & CI_FLAGS_IN_QUEUE)
660       return (0);
662     assert (ci->next == NULL);
663     assert (ci->prev == NULL);
665     ci->prev = cache_queue_tail;
667     if (cache_queue_tail == NULL)
668       cache_queue_head = ci;
669     else
670       cache_queue_tail->next = ci;
672     cache_queue_tail = ci;
673   }
675   ci->flags |= CI_FLAGS_IN_QUEUE;
677   pthread_cond_signal(&queue_cond);
678   pthread_mutex_lock (&stats_lock);
679   stats_queue_length++;
680   pthread_mutex_unlock (&stats_lock);
682   return (0);
683 } /* }}} int enqueue_cache_item */
685 /*
686  * tree_callback_flush:
687  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
688  * while this is in progress.
689  */
690 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
691     gpointer data)
693   cache_item_t *ci;
694   callback_flush_data_t *cfd;
696   ci = (cache_item_t *) value;
697   cfd = (callback_flush_data_t *) data;
699   if (ci->flags & CI_FLAGS_IN_QUEUE)
700     return FALSE;
702   if ((ci->last_flush_time <= cfd->abs_timeout)
703       && (ci->values_num > 0))
704   {
705     enqueue_cache_item (ci, TAIL);
706   }
707   else if ((do_shutdown != 0)
708       && (ci->values_num > 0))
709   {
710     enqueue_cache_item (ci, TAIL);
711   }
712   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
713       && (ci->values_num <= 0))
714   {
715     char **temp;
717     temp = (char **) rrd_realloc (cfd->keys,
718         sizeof (char *) * (cfd->keys_num + 1));
719     if (temp == NULL)
720     {
721       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
722       return (FALSE);
723     }
724     cfd->keys = temp;
725     /* Make really sure this points to the _same_ place */
726     assert ((char *) key == ci->file);
727     cfd->keys[cfd->keys_num] = (char *) key;
728     cfd->keys_num++;
729   }
731   return (FALSE);
732 } /* }}} gboolean tree_callback_flush */
734 static int flush_old_values (int max_age)
736   callback_flush_data_t cfd;
737   size_t k;
739   memset (&cfd, 0, sizeof (cfd));
740   /* Pass the current time as user data so that we don't need to call
741    * `time' for each node. */
742   cfd.now = time (NULL);
743   cfd.keys = NULL;
744   cfd.keys_num = 0;
746   if (max_age > 0)
747     cfd.abs_timeout = cfd.now - max_age;
748   else
749     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
751   /* `tree_callback_flush' will return the keys of all values that haven't
752    * been touched in the last `config_flush_interval' seconds in `cfd'.
753    * The char*'s in this array point to the same memory as ci->file, so we
754    * don't need to free them separately. */
755   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
757   for (k = 0; k < cfd.keys_num; k++)
758   {
759     /* should never fail, since we have held the cache_lock
760      * the entire time */
761     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
762   }
764   if (cfd.keys != NULL)
765   {
766     free (cfd.keys);
767     cfd.keys = NULL;
768   }
770   return (0);
771 } /* int flush_old_values */
773 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
775   struct timeval now;
776   struct timespec next_flush;
777   int status;
779   gettimeofday (&now, NULL);
780   next_flush.tv_sec = now.tv_sec + config_flush_interval;
781   next_flush.tv_nsec = 1000 * now.tv_usec;
783   pthread_mutex_lock(&cache_lock);
785   while (!do_shutdown)
786   {
787     gettimeofday (&now, NULL);
788     if ((now.tv_sec > next_flush.tv_sec)
789         || ((now.tv_sec == next_flush.tv_sec)
790           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
791     {
792       /* Flush all values that haven't been written in the last
793        * `config_write_interval' seconds. */
794       flush_old_values (config_write_interval);
796       /* Determine the time of the next cache flush. */
797       next_flush.tv_sec =
798         now.tv_sec + next_flush.tv_sec % config_flush_interval;
800       /* unlock the cache while we rotate so we don't block incoming
801        * updates if the fsync() blocks on disk I/O */
802       pthread_mutex_unlock(&cache_lock);
803       journal_rotate();
804       pthread_mutex_lock(&cache_lock);
805     }
807     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
808     if (status != 0 && status != ETIMEDOUT)
809     {
810       RRDD_LOG (LOG_ERR, "flush_thread_main: "
811                 "pthread_cond_timedwait returned %i.", status);
812     }
813   }
815   if (config_flush_at_shutdown)
816     flush_old_values (-1); /* flush everything */
818   pthread_mutex_unlock(&cache_lock);
820   return NULL;
821 } /* void *flush_thread_main */
823 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
825   pthread_mutex_lock (&cache_lock);
827   while (!do_shutdown
828          || (cache_queue_head != NULL && config_flush_at_shutdown))
829   {
830     cache_item_t *ci;
831     char *file;
832     char **values;
833     int values_num;
834     int status;
835     int i;
837     /* Now, check if there's something to store away. If not, wait until
838      * something comes in.  if we are shutting down, do not wait around.  */
839     if (cache_queue_head == NULL && !do_shutdown)
840     {
841       status = pthread_cond_wait (&queue_cond, &cache_lock);
842       if ((status != 0) && (status != ETIMEDOUT))
843       {
844         RRDD_LOG (LOG_ERR, "queue_thread_main: "
845             "pthread_cond_wait returned %i.", status);
846       }
847     }
849     /* Check if a value has arrived. This may be NULL if we timed out or there
850      * was an interrupt such as a signal. */
851     if (cache_queue_head == NULL)
852       continue;
854     ci = cache_queue_head;
856     /* copy the relevant parts */
857     file = strdup (ci->file);
858     if (file == NULL)
859     {
860       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
861       continue;
862     }
864     assert(ci->values != NULL);
865     assert(ci->values_num > 0);
867     values = ci->values;
868     values_num = ci->values_num;
870     wipe_ci_values(ci, time(NULL));
871     remove_from_queue(ci);
873     pthread_mutex_unlock (&cache_lock);
875     rrd_clear_error ();
876     status = rrd_update_r (file, NULL, values_num, (void *) values);
877     if (status != 0)
878     {
879       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
880           "rrd_update_r (%s) failed with status %i. (%s)",
881           file, status, rrd_get_error());
882     }
884     journal_write("wrote", file);
885     pthread_cond_broadcast(&ci->flushed);
887     for (i = 0; i < values_num; i++)
888       free (values[i]);
890     free(values);
891     free(file);
893     if (status == 0)
894     {
895       pthread_mutex_lock (&stats_lock);
896       stats_updates_written++;
897       stats_data_sets_written += values_num;
898       pthread_mutex_unlock (&stats_lock);
899     }
901     pthread_mutex_lock (&cache_lock);
902   }
903   pthread_mutex_unlock (&cache_lock);
905   return (NULL);
906 } /* }}} void *queue_thread_main */
908 static int buffer_get_field (char **buffer_ret, /* {{{ */
909     size_t *buffer_size_ret, char **field_ret)
911   char *buffer;
912   size_t buffer_pos;
913   size_t buffer_size;
914   char *field;
915   size_t field_size;
916   int status;
918   buffer = *buffer_ret;
919   buffer_pos = 0;
920   buffer_size = *buffer_size_ret;
921   field = *buffer_ret;
922   field_size = 0;
924   if (buffer_size <= 0)
925     return (-1);
927   /* This is ensured by `handle_request'. */
928   assert (buffer[buffer_size - 1] == '\0');
930   status = -1;
931   while (buffer_pos < buffer_size)
932   {
933     /* Check for end-of-field or end-of-buffer */
934     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
935     {
936       field[field_size] = 0;
937       field_size++;
938       buffer_pos++;
939       status = 0;
940       break;
941     }
942     /* Handle escaped characters. */
943     else if (buffer[buffer_pos] == '\\')
944     {
945       if (buffer_pos >= (buffer_size - 1))
946         break;
947       buffer_pos++;
948       field[field_size] = buffer[buffer_pos];
949       field_size++;
950       buffer_pos++;
951     }
952     /* Normal operation */ 
953     else
954     {
955       field[field_size] = buffer[buffer_pos];
956       field_size++;
957       buffer_pos++;
958     }
959   } /* while (buffer_pos < buffer_size) */
961   if (status != 0)
962     return (status);
964   *buffer_ret = buffer + buffer_pos;
965   *buffer_size_ret = buffer_size - buffer_pos;
966   *field_ret = field;
968   return (0);
969 } /* }}} int buffer_get_field */
971 /* if we're restricting writes to the base directory,
972  * check whether the file falls within the dir
973  * returns 1 if OK, otherwise 0
974  */
975 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
977   assert(file != NULL);
979   if (!config_write_base_only
980       || sock == NULL /* journal replay */
981       || config_base_dir == NULL)
982     return 1;
984   if (strstr(file, "../") != NULL) goto err;
986   /* relative paths without "../" are ok */
987   if (*file != '/') return 1;
989   /* file must be of the format base + "/" + <1+ char filename> */
990   if (strlen(file) < _config_base_dir_len + 2) goto err;
991   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
992   if (*(file + _config_base_dir_len) != '/') goto err;
994   return 1;
996 err:
997   if (sock != NULL && sock->fd >= 0)
998     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1000   return 0;
1001 } /* }}} static int check_file_access */
1003 /* when using a base dir, convert relative paths to absolute paths.
1004  * if necessary, modifies the "filename" pointer to point
1005  * to the new path created in "tmp".  "tmp" is provided
1006  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1007  *
1008  * this allows us to optimize for the expected case (absolute path)
1009  * with a no-op.
1010  */
1011 static void get_abs_path(char **filename, char *tmp)
1013   assert(tmp != NULL);
1014   assert(filename != NULL && *filename != NULL);
1016   if (config_base_dir == NULL || **filename == '/')
1017     return;
1019   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1020   *filename = tmp;
1021 } /* }}} static int get_abs_path */
1023 /* returns 1 if we have the required privilege level,
1024  * otherwise issue an error to the user on sock */
1025 static int has_privilege (listen_socket_t *sock, /* {{{ */
1026                           socket_privilege priv)
1028   if (sock == NULL) /* journal replay */
1029     return 1;
1031   if (sock->privilege >= priv)
1032     return 1;
1034   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1035 } /* }}} static int has_privilege */
1037 static int flush_file (const char *filename) /* {{{ */
1039   cache_item_t *ci;
1041   pthread_mutex_lock (&cache_lock);
1043   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1044   if (ci == NULL)
1045   {
1046     pthread_mutex_unlock (&cache_lock);
1047     return (ENOENT);
1048   }
1050   if (ci->values_num > 0)
1051   {
1052     /* Enqueue at head */
1053     enqueue_cache_item (ci, HEAD);
1054     pthread_cond_wait(&ci->flushed, &cache_lock);
1055   }
1057   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1058    * may have been purged during our cond_wait() */
1060   pthread_mutex_unlock(&cache_lock);
1062   return (0);
1063 } /* }}} int flush_file */
1065 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1066     char *buffer, size_t buffer_size)
1068   int status;
1069   char **help_text;
1070   char *command;
1072   char *help_help[2] =
1073   {
1074     "Command overview\n"
1075     ,
1076     "HELP [<command>]\n"
1077     "FLUSH <filename>\n"
1078     "FLUSHALL\n"
1079     "PENDING <filename>\n"
1080     "FORGET <filename>\n"
1081     "QUEUE\n"
1082     "UPDATE <filename> <values> [<values> ...]\n"
1083     "BATCH\n"
1084     "STATS\n"
1085     "QUIT\n"
1086   };
1088   char *help_flush[2] =
1089   {
1090     "Help for FLUSH\n"
1091     ,
1092     "Usage: FLUSH <filename>\n"
1093     "\n"
1094     "Adds the given filename to the head of the update queue and returns\n"
1095     "after is has been dequeued.\n"
1096   };
1098   char *help_flushall[2] =
1099   {
1100     "Help for FLUSHALL\n"
1101     ,
1102     "Usage: FLUSHALL\n"
1103     "\n"
1104     "Triggers writing of all pending updates.  Returns immediately.\n"
1105   };
1107   char *help_pending[2] =
1108   {
1109     "Help for PENDING\n"
1110     ,
1111     "Usage: PENDING <filename>\n"
1112     "\n"
1113     "Shows any 'pending' updates for a file, in order.\n"
1114     "The updates shown have not yet been written to the underlying RRD file.\n"
1115   };
1117   char *help_forget[2] =
1118   {
1119     "Help for FORGET\n"
1120     ,
1121     "Usage: FORGET <filename>\n"
1122     "\n"
1123     "Removes the file completely from the cache.\n"
1124     "Any pending updates for the file will be lost.\n"
1125   };
1127   char *help_queue[2] =
1128   {
1129     "Help for QUEUE\n"
1130     ,
1131     "Shows all files in the output queue.\n"
1132     "The output is zero or more lines in the following format:\n"
1133     "(where <num_vals> is the number of values to be written)\n"
1134     "\n"
1135     "<num_vals> <filename>\n"
1136     "\n"
1137   };
1139   char *help_update[2] =
1140   {
1141     "Help for UPDATE\n"
1142     ,
1143     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1144     "\n"
1145     "Adds the given file to the internal cache if it is not yet known and\n"
1146     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1147     "for details.\n"
1148     "\n"
1149     "Each <values> has the following form:\n"
1150     "  <values> = <time>:<value>[:<value>[...]]\n"
1151     "See the rrdupdate(1) manpage for details.\n"
1152   };
1154   char *help_stats[2] =
1155   {
1156     "Help for STATS\n"
1157     ,
1158     "Usage: STATS\n"
1159     "\n"
1160     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1161     "a description of the values.\n"
1162   };
1164   char *help_batch[2] =
1165   {
1166     "Help for BATCH\n"
1167     ,
1168     "The 'BATCH' command permits the client to initiate a bulk load\n"
1169     "   of commands to rrdcached.\n"
1170     "\n"
1171     "Usage:\n"
1172     "\n"
1173     "    client: BATCH\n"
1174     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1175     "    client: command #1\n"
1176     "    client: command #2\n"
1177     "    client: ... and so on\n"
1178     "    client: .\n"
1179     "    server: 2 errors\n"
1180     "    server: 7 message for command #7\n"
1181     "    server: 9 message for command #9\n"
1182     "\n"
1183     "For more information, consult the rrdcached(1) documentation.\n"
1184   };
1186   char *help_quit[2] =
1187   {
1188     "Help for QUIT\n"
1189     ,
1190     "Disconnect from rrdcached.\n"
1191   };
1193   status = buffer_get_field (&buffer, &buffer_size, &command);
1194   if (status != 0)
1195     help_text = help_help;
1196   else
1197   {
1198     if (strcasecmp (command, "update") == 0)
1199       help_text = help_update;
1200     else if (strcasecmp (command, "flush") == 0)
1201       help_text = help_flush;
1202     else if (strcasecmp (command, "flushall") == 0)
1203       help_text = help_flushall;
1204     else if (strcasecmp (command, "pending") == 0)
1205       help_text = help_pending;
1206     else if (strcasecmp (command, "forget") == 0)
1207       help_text = help_forget;
1208     else if (strcasecmp (command, "queue") == 0)
1209       help_text = help_queue;
1210     else if (strcasecmp (command, "stats") == 0)
1211       help_text = help_stats;
1212     else if (strcasecmp (command, "batch") == 0)
1213       help_text = help_batch;
1214     else if (strcasecmp (command, "quit") == 0)
1215       help_text = help_quit;
1216     else
1217       help_text = help_help;
1218   }
1220   add_response_info(sock, help_text[1]);
1221   return send_response(sock, RESP_OK, help_text[0]);
1222 } /* }}} int handle_request_help */
1224 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1226   uint64_t copy_queue_length;
1227   uint64_t copy_updates_received;
1228   uint64_t copy_flush_received;
1229   uint64_t copy_updates_written;
1230   uint64_t copy_data_sets_written;
1231   uint64_t copy_journal_bytes;
1232   uint64_t copy_journal_rotate;
1234   uint64_t tree_nodes_number;
1235   uint64_t tree_depth;
1237   pthread_mutex_lock (&stats_lock);
1238   copy_queue_length       = stats_queue_length;
1239   copy_updates_received   = stats_updates_received;
1240   copy_flush_received     = stats_flush_received;
1241   copy_updates_written    = stats_updates_written;
1242   copy_data_sets_written  = stats_data_sets_written;
1243   copy_journal_bytes      = stats_journal_bytes;
1244   copy_journal_rotate     = stats_journal_rotate;
1245   pthread_mutex_unlock (&stats_lock);
1247   pthread_mutex_lock (&cache_lock);
1248   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1249   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1250   pthread_mutex_unlock (&cache_lock);
1252   add_response_info(sock,
1253                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1254   add_response_info(sock,
1255                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1256   add_response_info(sock,
1257                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1258   add_response_info(sock,
1259                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1260   add_response_info(sock,
1261                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1262   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1263   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1264   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1265   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1267   send_response(sock, RESP_OK, "Statistics follow\n");
1269   return (0);
1270 } /* }}} int handle_request_stats */
1272 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1273     char *buffer, size_t buffer_size)
1275   char *file, file_tmp[PATH_MAX];
1276   int status;
1278   status = buffer_get_field (&buffer, &buffer_size, &file);
1279   if (status != 0)
1280   {
1281     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1282   }
1283   else
1284   {
1285     pthread_mutex_lock(&stats_lock);
1286     stats_flush_received++;
1287     pthread_mutex_unlock(&stats_lock);
1289     get_abs_path(&file, file_tmp);
1290     if (!check_file_access(file, sock)) return 0;
1292     status = flush_file (file);
1293     if (status == 0)
1294       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1295     else if (status == ENOENT)
1296     {
1297       /* no file in our tree; see whether it exists at all */
1298       struct stat statbuf;
1300       memset(&statbuf, 0, sizeof(statbuf));
1301       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1302         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1303       else
1304         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1305     }
1306     else if (status < 0)
1307       return send_response(sock, RESP_ERR, "Internal error.\n");
1308     else
1309       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1310   }
1312   /* NOTREACHED */
1313   assert(1==0);
1314 } /* }}} int handle_request_flush */
1316 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1318   int status;
1320   status = has_privilege(sock, PRIV_HIGH);
1321   if (status <= 0)
1322     return status;
1324   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1326   pthread_mutex_lock(&cache_lock);
1327   flush_old_values(-1);
1328   pthread_mutex_unlock(&cache_lock);
1330   return send_response(sock, RESP_OK, "Started flush.\n");
1331 } /* }}} static int handle_request_flushall */
1333 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1334                                   char *buffer, size_t buffer_size)
1336   int status;
1337   char *file, file_tmp[PATH_MAX];
1338   cache_item_t *ci;
1340   status = buffer_get_field(&buffer, &buffer_size, &file);
1341   if (status != 0)
1342     return send_response(sock, RESP_ERR,
1343                          "Usage: PENDING <filename>\n");
1345   status = has_privilege(sock, PRIV_HIGH);
1346   if (status <= 0)
1347     return status;
1349   get_abs_path(&file, file_tmp);
1351   pthread_mutex_lock(&cache_lock);
1352   ci = g_tree_lookup(cache_tree, file);
1353   if (ci == NULL)
1354   {
1355     pthread_mutex_unlock(&cache_lock);
1356     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1357   }
1359   for (int i=0; i < ci->values_num; i++)
1360     add_response_info(sock, "%s\n", ci->values[i]);
1362   pthread_mutex_unlock(&cache_lock);
1363   return send_response(sock, RESP_OK, "updates pending\n");
1364 } /* }}} static int handle_request_pending */
1366 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1367                                  char *buffer, size_t buffer_size)
1369   int status;
1370   gboolean found;
1371   char *file, file_tmp[PATH_MAX];
1373   status = buffer_get_field(&buffer, &buffer_size, &file);
1374   if (status != 0)
1375     return send_response(sock, RESP_ERR,
1376                          "Usage: FORGET <filename>\n");
1378   status = has_privilege(sock, PRIV_HIGH);
1379   if (status <= 0)
1380     return status;
1382   get_abs_path(&file, file_tmp);
1383   if (!check_file_access(file, sock)) return 0;
1385   pthread_mutex_lock(&cache_lock);
1386   found = g_tree_remove(cache_tree, file);
1387   pthread_mutex_unlock(&cache_lock);
1389   if (found == TRUE)
1390   {
1391     if (sock != NULL)
1392       journal_write("forget", file);
1394     return send_response(sock, RESP_OK, "Gone!\n");
1395   }
1396   else
1397     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1399   /* NOTREACHED */
1400   assert(1==0);
1401 } /* }}} static int handle_request_forget */
1403 static int handle_request_queue (listen_socket_t *sock) /* {{{ */
1405   cache_item_t *ci;
1407   pthread_mutex_lock(&cache_lock);
1409   ci = cache_queue_head;
1410   while (ci != NULL)
1411   {
1412     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1413     ci = ci->next;
1414   }
1416   pthread_mutex_unlock(&cache_lock);
1418   return send_response(sock, RESP_OK, "in queue.\n");
1419 } /* }}} int handle_request_queue */
1421 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1422                                   time_t now,
1423                                   char *buffer, size_t buffer_size)
1425   char *file, file_tmp[PATH_MAX];
1426   int values_num = 0;
1427   int status;
1428   char orig_buf[CMD_MAX];
1430   cache_item_t *ci;
1432   status = has_privilege(sock, PRIV_HIGH);
1433   if (status <= 0)
1434     return status;
1436   /* save it for the journal later */
1437   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1439   status = buffer_get_field (&buffer, &buffer_size, &file);
1440   if (status != 0)
1441     return send_response(sock, RESP_ERR,
1442                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1444   pthread_mutex_lock(&stats_lock);
1445   stats_updates_received++;
1446   pthread_mutex_unlock(&stats_lock);
1448   get_abs_path(&file, file_tmp);
1449   if (!check_file_access(file, sock)) return 0;
1451   pthread_mutex_lock (&cache_lock);
1452   ci = g_tree_lookup (cache_tree, file);
1454   if (ci == NULL) /* {{{ */
1455   {
1456     struct stat statbuf;
1458     /* don't hold the lock while we setup; stat(2) might block */
1459     pthread_mutex_unlock(&cache_lock);
1461     memset (&statbuf, 0, sizeof (statbuf));
1462     status = stat (file, &statbuf);
1463     if (status != 0)
1464     {
1465       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1467       status = errno;
1468       if (status == ENOENT)
1469         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1470       else
1471         return send_response(sock, RESP_ERR,
1472                              "stat failed with error %i.\n", status);
1473     }
1474     if (!S_ISREG (statbuf.st_mode))
1475       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1477     if (access(file, R_OK|W_OK) != 0)
1478       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1479                            file, rrd_strerror(errno));
1481     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1482     if (ci == NULL)
1483     {
1484       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1486       return send_response(sock, RESP_ERR, "malloc failed.\n");
1487     }
1488     memset (ci, 0, sizeof (cache_item_t));
1490     ci->file = strdup (file);
1491     if (ci->file == NULL)
1492     {
1493       free (ci);
1494       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1496       return send_response(sock, RESP_ERR, "strdup failed.\n");
1497     }
1499     wipe_ci_values(ci, now);
1500     ci->flags = CI_FLAGS_IN_TREE;
1501     pthread_cond_init(&ci->flushed, NULL);
1503     pthread_mutex_lock(&cache_lock);
1504     g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1505   } /* }}} */
1506   assert (ci != NULL);
1508   /* don't re-write updates in replay mode */
1509   if (sock != NULL)
1510     journal_write("update", orig_buf);
1512   while (buffer_size > 0)
1513   {
1514     char **temp;
1515     char *value;
1516     time_t stamp;
1517     char *eostamp;
1519     status = buffer_get_field (&buffer, &buffer_size, &value);
1520     if (status != 0)
1521     {
1522       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1523       break;
1524     }
1526     /* make sure update time is always moving forward */
1527     stamp = strtol(value, &eostamp, 10);
1528     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1529     {
1530       pthread_mutex_unlock(&cache_lock);
1531       return send_response(sock, RESP_ERR,
1532                            "Cannot find timestamp in '%s'!\n", value);
1533     }
1534     else if (stamp <= ci->last_update_stamp)
1535     {
1536       pthread_mutex_unlock(&cache_lock);
1537       return send_response(sock, RESP_ERR,
1538                            "illegal attempt to update using time %ld when last"
1539                            " update time is %ld (minimum one second step)\n",
1540                            stamp, ci->last_update_stamp);
1541     }
1542     else
1543       ci->last_update_stamp = stamp;
1545     temp = (char **) rrd_realloc (ci->values,
1546         sizeof (char *) * (ci->values_num + 1));
1547     if (temp == NULL)
1548     {
1549       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1550       continue;
1551     }
1552     ci->values = temp;
1554     ci->values[ci->values_num] = strdup (value);
1555     if (ci->values[ci->values_num] == NULL)
1556     {
1557       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1558       continue;
1559     }
1560     ci->values_num++;
1562     values_num++;
1563   }
1565   if (((now - ci->last_flush_time) >= config_write_interval)
1566       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1567       && (ci->values_num > 0))
1568   {
1569     enqueue_cache_item (ci, TAIL);
1570   }
1572   pthread_mutex_unlock (&cache_lock);
1574   if (values_num < 1)
1575     return send_response(sock, RESP_ERR, "No values updated.\n");
1576   else
1577     return send_response(sock, RESP_OK,
1578                          "errors, enqueued %i value(s).\n", values_num);
1580   /* NOTREACHED */
1581   assert(1==0);
1583 } /* }}} int handle_request_update */
1585 /* we came across a "WROTE" entry during journal replay.
1586  * throw away any values that we have accumulated for this file
1587  */
1588 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1590   int i;
1591   cache_item_t *ci;
1592   const char *file = buffer;
1594   pthread_mutex_lock(&cache_lock);
1596   ci = g_tree_lookup(cache_tree, file);
1597   if (ci == NULL)
1598   {
1599     pthread_mutex_unlock(&cache_lock);
1600     return (0);
1601   }
1603   if (ci->values)
1604   {
1605     for (i=0; i < ci->values_num; i++)
1606       free(ci->values[i]);
1608     free(ci->values);
1609   }
1611   wipe_ci_values(ci, now);
1612   remove_from_queue(ci);
1614   pthread_mutex_unlock(&cache_lock);
1615   return (0);
1616 } /* }}} int handle_request_wrote */
1618 /* start "BATCH" processing */
1619 static int batch_start (listen_socket_t *sock) /* {{{ */
1621   int status;
1622   if (sock->batch_start)
1623     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1625   status = send_response(sock, RESP_OK,
1626                          "Go ahead.  End with dot '.' on its own line.\n");
1627   sock->batch_start = time(NULL);
1628   sock->batch_cmd = 0;
1630   return status;
1631 } /* }}} static int batch_start */
1633 /* finish "BATCH" processing and return results to the client */
1634 static int batch_done (listen_socket_t *sock) /* {{{ */
1636   assert(sock->batch_start);
1637   sock->batch_start = 0;
1638   sock->batch_cmd  = 0;
1639   return send_response(sock, RESP_OK, "errors\n");
1640 } /* }}} static int batch_done */
1642 /* if sock==NULL, we are in journal replay mode */
1643 static int handle_request (listen_socket_t *sock, /* {{{ */
1644                            time_t now,
1645                            char *buffer, size_t buffer_size)
1647   char *buffer_ptr;
1648   char *command;
1649   int status;
1651   assert (buffer[buffer_size - 1] == '\0');
1653   buffer_ptr = buffer;
1654   command = NULL;
1655   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1656   if (status != 0)
1657   {
1658     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1659     return (-1);
1660   }
1662   if (sock != NULL && sock->batch_start)
1663     sock->batch_cmd++;
1665   if (strcasecmp (command, "update") == 0)
1666     return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1667   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1668   {
1669     /* this is only valid in replay mode */
1670     return (handle_request_wrote (buffer_ptr, now));
1671   }
1672   else if (strcasecmp (command, "flush") == 0)
1673     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1674   else if (strcasecmp (command, "flushall") == 0)
1675     return (handle_request_flushall(sock));
1676   else if (strcasecmp (command, "pending") == 0)
1677     return (handle_request_pending(sock, buffer_ptr, buffer_size));
1678   else if (strcasecmp (command, "forget") == 0)
1679     return (handle_request_forget(sock, buffer_ptr, buffer_size));
1680   else if (strcasecmp (command, "queue") == 0)
1681     return (handle_request_queue(sock));
1682   else if (strcasecmp (command, "stats") == 0)
1683     return (handle_request_stats (sock));
1684   else if (strcasecmp (command, "help") == 0)
1685     return (handle_request_help (sock, buffer_ptr, buffer_size));
1686   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1687     return batch_start(sock);
1688   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1689     return batch_done(sock);
1690   else if (strcasecmp (command, "quit") == 0)
1691     return -1;
1692   else
1693     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1695   /* NOTREACHED */
1696   assert(1==0);
1697 } /* }}} int handle_request */
1699 /* MUST NOT hold journal_lock before calling this */
1700 static void journal_rotate(void) /* {{{ */
1702   FILE *old_fh = NULL;
1703   int new_fd;
1705   if (journal_cur == NULL || journal_old == NULL)
1706     return;
1708   pthread_mutex_lock(&journal_lock);
1710   /* we rotate this way (rename before close) so that the we can release
1711    * the journal lock as fast as possible.  Journal writes to the new
1712    * journal can proceed immediately after the new file is opened.  The
1713    * fclose can then block without affecting new updates.
1714    */
1715   if (journal_fh != NULL)
1716   {
1717     old_fh = journal_fh;
1718     journal_fh = NULL;
1719     rename(journal_cur, journal_old);
1720     ++stats_journal_rotate;
1721   }
1723   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1724                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1725   if (new_fd >= 0)
1726   {
1727     journal_fh = fdopen(new_fd, "a");
1728     if (journal_fh == NULL)
1729       close(new_fd);
1730   }
1732   pthread_mutex_unlock(&journal_lock);
1734   if (old_fh != NULL)
1735     fclose(old_fh);
1737   if (journal_fh == NULL)
1738   {
1739     RRDD_LOG(LOG_CRIT,
1740              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1741              journal_cur, rrd_strerror(errno));
1743     RRDD_LOG(LOG_ERR,
1744              "JOURNALING DISABLED: All values will be flushed at shutdown");
1745     config_flush_at_shutdown = 1;
1746   }
1748 } /* }}} static void journal_rotate */
1750 static void journal_done(void) /* {{{ */
1752   if (journal_cur == NULL)
1753     return;
1755   pthread_mutex_lock(&journal_lock);
1756   if (journal_fh != NULL)
1757   {
1758     fclose(journal_fh);
1759     journal_fh = NULL;
1760   }
1762   if (config_flush_at_shutdown)
1763   {
1764     RRDD_LOG(LOG_INFO, "removing journals");
1765     unlink(journal_old);
1766     unlink(journal_cur);
1767   }
1768   else
1769   {
1770     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1771              "journals will be used at next startup");
1772   }
1774   pthread_mutex_unlock(&journal_lock);
1776 } /* }}} static void journal_done */
1778 static int journal_write(char *cmd, char *args) /* {{{ */
1780   int chars;
1782   if (journal_fh == NULL)
1783     return 0;
1785   pthread_mutex_lock(&journal_lock);
1786   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1787   pthread_mutex_unlock(&journal_lock);
1789   if (chars > 0)
1790   {
1791     pthread_mutex_lock(&stats_lock);
1792     stats_journal_bytes += chars;
1793     pthread_mutex_unlock(&stats_lock);
1794   }
1796   return chars;
1797 } /* }}} static int journal_write */
1799 static int journal_replay (const char *file) /* {{{ */
1801   FILE *fh;
1802   int entry_cnt = 0;
1803   int fail_cnt = 0;
1804   uint64_t line = 0;
1805   char entry[CMD_MAX];
1806   time_t now;
1808   if (file == NULL) return 0;
1810   {
1811     char *reason = "unknown error";
1812     int status = 0;
1813     struct stat statbuf;
1815     memset(&statbuf, 0, sizeof(statbuf));
1816     if (stat(file, &statbuf) != 0)
1817     {
1818       if (errno == ENOENT)
1819         return 0;
1821       reason = "stat error";
1822       status = errno;
1823     }
1824     else if (!S_ISREG(statbuf.st_mode))
1825     {
1826       reason = "not a regular file";
1827       status = EPERM;
1828     }
1829     if (statbuf.st_uid != daemon_uid)
1830     {
1831       reason = "not owned by daemon user";
1832       status = EACCES;
1833     }
1834     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1835     {
1836       reason = "must not be user/group writable";
1837       status = EACCES;
1838     }
1840     if (status != 0)
1841     {
1842       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1843                file, rrd_strerror(status), reason);
1844       return 0;
1845     }
1846   }
1848   fh = fopen(file, "r");
1849   if (fh == NULL)
1850   {
1851     if (errno != ENOENT)
1852       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1853                file, rrd_strerror(errno));
1854     return 0;
1855   }
1856   else
1857     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1859   now = time(NULL);
1861   while(!feof(fh))
1862   {
1863     size_t entry_len;
1865     ++line;
1866     if (fgets(entry, sizeof(entry), fh) == NULL)
1867       break;
1868     entry_len = strlen(entry);
1870     /* check \n termination in case journal writing crashed mid-line */
1871     if (entry_len == 0)
1872       continue;
1873     else if (entry[entry_len - 1] != '\n')
1874     {
1875       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1876       ++fail_cnt;
1877       continue;
1878     }
1880     entry[entry_len - 1] = '\0';
1882     if (handle_request(NULL, now, entry, entry_len) == 0)
1883       ++entry_cnt;
1884     else
1885       ++fail_cnt;
1886   }
1888   fclose(fh);
1890   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1891            entry_cnt, fail_cnt);
1893   return entry_cnt > 0 ? 1 : 0;
1894 } /* }}} static int journal_replay */
1896 static void journal_init(void) /* {{{ */
1898   int had_journal = 0;
1900   if (journal_cur == NULL) return;
1902   pthread_mutex_lock(&journal_lock);
1904   RRDD_LOG(LOG_INFO, "checking for journal files");
1906   had_journal += journal_replay(journal_old);
1907   had_journal += journal_replay(journal_cur);
1909   /* it must have been a crash.  start a flush */
1910   if (had_journal && config_flush_at_shutdown)
1911     flush_old_values(-1);
1913   pthread_mutex_unlock(&journal_lock);
1914   journal_rotate();
1916   RRDD_LOG(LOG_INFO, "journal processing complete");
1918 } /* }}} static void journal_init */
1920 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1922   assert(sock != NULL);
1924   free(sock->rbuf);  sock->rbuf = NULL;
1925   free(sock->wbuf);  sock->wbuf = NULL;
1926   free(sock);
1927 } /* }}} void free_listen_socket */
1929 static void close_connection(listen_socket_t *sock) /* {{{ */
1931   if (sock->fd >= 0)
1932   {
1933     close(sock->fd);
1934     sock->fd = -1;
1935   }
1937   free_listen_socket(sock);
1939 } /* }}} void close_connection */
1941 static void *connection_thread_main (void *args) /* {{{ */
1943   listen_socket_t *sock;
1944   int i;
1945   int fd;
1947   sock = (listen_socket_t *) args;
1948   fd = sock->fd;
1950   /* init read buffers */
1951   sock->next_read = sock->next_cmd = 0;
1952   sock->rbuf = malloc(RBUF_SIZE);
1953   if (sock->rbuf == NULL)
1954   {
1955     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1956     close_connection(sock);
1957     return NULL;
1958   }
1960   pthread_mutex_lock (&connection_threads_lock);
1961   {
1962     pthread_t *temp;
1964     temp = (pthread_t *) rrd_realloc (connection_threads,
1965         sizeof (pthread_t) * (connection_threads_num + 1));
1966     if (temp == NULL)
1967     {
1968       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc(++) failed.");
1969     }
1970     else
1971     {
1972       connection_threads = temp;
1973       connection_threads[connection_threads_num] = pthread_self ();
1974       connection_threads_num++;
1975     }
1976   }
1977   pthread_mutex_unlock (&connection_threads_lock);
1979   while (do_shutdown == 0)
1980   {
1981     char *cmd;
1982     ssize_t cmd_len;
1983     ssize_t rbytes;
1984     time_t now;
1986     struct pollfd pollfd;
1987     int status;
1989     pollfd.fd = fd;
1990     pollfd.events = POLLIN | POLLPRI;
1991     pollfd.revents = 0;
1993     status = poll (&pollfd, 1, /* timeout = */ 500);
1994     if (do_shutdown)
1995       break;
1996     else if (status == 0) /* timeout */
1997       continue;
1998     else if (status < 0) /* error */
1999     {
2000       status = errno;
2001       if (status != EINTR)
2002         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2003       continue;
2004     }
2006     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2007       break;
2008     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2009     {
2010       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2011           "poll(2) returned something unexpected: %#04hx",
2012           pollfd.revents);
2013       break;
2014     }
2016     rbytes = read(fd, sock->rbuf + sock->next_read,
2017                   RBUF_SIZE - sock->next_read);
2018     if (rbytes < 0)
2019     {
2020       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2021       break;
2022     }
2023     else if (rbytes == 0)
2024       break; /* eof */
2026     sock->next_read += rbytes;
2028     if (sock->batch_start)
2029       now = sock->batch_start;
2030     else
2031       now = time(NULL);
2033     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2034     {
2035       status = handle_request (sock, now, cmd, cmd_len+1);
2036       if (status != 0)
2037         goto out_close;
2038     }
2039   }
2041 out_close:
2042   close_connection(sock);
2044   /* Remove this thread from the connection threads list */
2045   pthread_mutex_lock (&connection_threads_lock);
2046   {
2047     pthread_t self;
2048     pthread_t *temp;
2050     /* Find out own index in the array */
2051     self = pthread_self ();
2052     for (i = 0; i < connection_threads_num; i++)
2053       if (pthread_equal (connection_threads[i], self) != 0)
2054         break;
2055     assert (i < connection_threads_num);
2057     /* Move the trailing threads forward. */
2058     if (i < (connection_threads_num - 1))
2059     {
2060       memmove (connection_threads + i,
2061                connection_threads + i + 1,
2062                sizeof (pthread_t) * (connection_threads_num - i - 1));
2063     }
2065     connection_threads_num--;
2067     temp = rrd_realloc(connection_threads,
2068                    sizeof(*connection_threads) * connection_threads_num);
2069     if (connection_threads_num > 0 && temp == NULL)
2070       RRDD_LOG(LOG_ERR, "connection_thread_main: realloc(--) failed.");
2071     else
2072       connection_threads = temp;
2073   }
2074   pthread_mutex_unlock (&connection_threads_lock);
2076   return (NULL);
2077 } /* }}} void *connection_thread_main */
2079 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2081   int fd;
2082   struct sockaddr_un sa;
2083   listen_socket_t *temp;
2084   int status;
2085   const char *path;
2087   path = sock->addr;
2088   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2089     path += strlen("unix:");
2091   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2092       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2093   if (temp == NULL)
2094   {
2095     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2096     return (-1);
2097   }
2098   listen_fds = temp;
2099   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2101   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2102   if (fd < 0)
2103   {
2104     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2105              rrd_strerror(errno));
2106     return (-1);
2107   }
2109   memset (&sa, 0, sizeof (sa));
2110   sa.sun_family = AF_UNIX;
2111   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2113   /* if we've gotten this far, we own the pid file.  any daemon started
2114    * with the same args must not be alive.  therefore, ensure that we can
2115    * create the socket...
2116    */
2117   unlink(path);
2119   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2120   if (status != 0)
2121   {
2122     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2123              path, rrd_strerror(errno));
2124     close (fd);
2125     return (-1);
2126   }
2128   status = listen (fd, /* backlog = */ 10);
2129   if (status != 0)
2130   {
2131     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2132              path, rrd_strerror(errno));
2133     close (fd);
2134     unlink (path);
2135     return (-1);
2136   }
2138   listen_fds[listen_fds_num].fd = fd;
2139   listen_fds[listen_fds_num].family = PF_UNIX;
2140   strncpy(listen_fds[listen_fds_num].addr, path,
2141           sizeof (listen_fds[listen_fds_num].addr) - 1);
2142   listen_fds_num++;
2144   return (0);
2145 } /* }}} int open_listen_socket_unix */
2147 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2149   struct addrinfo ai_hints;
2150   struct addrinfo *ai_res;
2151   struct addrinfo *ai_ptr;
2152   char addr_copy[NI_MAXHOST];
2153   char *addr;
2154   char *port;
2155   int status;
2157   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2158   addr_copy[sizeof (addr_copy) - 1] = 0;
2159   addr = addr_copy;
2161   memset (&ai_hints, 0, sizeof (ai_hints));
2162   ai_hints.ai_flags = 0;
2163 #ifdef AI_ADDRCONFIG
2164   ai_hints.ai_flags |= AI_ADDRCONFIG;
2165 #endif
2166   ai_hints.ai_family = AF_UNSPEC;
2167   ai_hints.ai_socktype = SOCK_STREAM;
2169   port = NULL;
2170   if (*addr == '[') /* IPv6+port format */
2171   {
2172     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2173     addr++;
2175     port = strchr (addr, ']');
2176     if (port == NULL)
2177     {
2178       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2179       return (-1);
2180     }
2181     *port = 0;
2182     port++;
2184     if (*port == ':')
2185       port++;
2186     else if (*port == 0)
2187       port = NULL;
2188     else
2189     {
2190       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2191       return (-1);
2192     }
2193   } /* if (*addr = ']') */
2194   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2195   {
2196     port = rindex(addr, ':');
2197     if (port != NULL)
2198     {
2199       *port = 0;
2200       port++;
2201     }
2202   }
2203   ai_res = NULL;
2204   status = getaddrinfo (addr,
2205                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2206                         &ai_hints, &ai_res);
2207   if (status != 0)
2208   {
2209     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2210              addr, gai_strerror (status));
2211     return (-1);
2212   }
2214   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2215   {
2216     int fd;
2217     listen_socket_t *temp;
2218     int one = 1;
2220     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2221         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2222     if (temp == NULL)
2223     {
2224       fprintf (stderr,
2225                "rrdcached: open_listen_socket_network: realloc failed.\n");
2226       continue;
2227     }
2228     listen_fds = temp;
2229     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2231     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2232     if (fd < 0)
2233     {
2234       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2235                rrd_strerror(errno));
2236       continue;
2237     }
2239     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2241     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2242     if (status != 0)
2243     {
2244       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2245                sock->addr, rrd_strerror(errno));
2246       close (fd);
2247       continue;
2248     }
2250     status = listen (fd, /* backlog = */ 10);
2251     if (status != 0)
2252     {
2253       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2254                sock->addr, rrd_strerror(errno));
2255       close (fd);
2256       freeaddrinfo(ai_res);
2257       return (-1);
2258     }
2260     listen_fds[listen_fds_num].fd = fd;
2261     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2262     listen_fds_num++;
2263   } /* for (ai_ptr) */
2265   freeaddrinfo(ai_res);
2266   return (0);
2267 } /* }}} static int open_listen_socket_network */
2269 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2271   assert(sock != NULL);
2272   assert(sock->addr != NULL);
2274   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2275       || sock->addr[0] == '/')
2276     return (open_listen_socket_unix(sock));
2277   else
2278     return (open_listen_socket_network(sock));
2279 } /* }}} int open_listen_socket */
2281 static int close_listen_sockets (void) /* {{{ */
2283   size_t i;
2285   for (i = 0; i < listen_fds_num; i++)
2286   {
2287     close (listen_fds[i].fd);
2289     if (listen_fds[i].family == PF_UNIX)
2290       unlink(listen_fds[i].addr);
2291   }
2293   free (listen_fds);
2294   listen_fds = NULL;
2295   listen_fds_num = 0;
2297   return (0);
2298 } /* }}} int close_listen_sockets */
2300 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2302   struct pollfd *pollfds;
2303   int pollfds_num;
2304   int status;
2305   int i;
2307   if (listen_fds_num < 1)
2308   {
2309     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2310     return (NULL);
2311   }
2313   pollfds_num = listen_fds_num;
2314   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2315   if (pollfds == NULL)
2316   {
2317     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2318     return (NULL);
2319   }
2320   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2322   RRDD_LOG(LOG_INFO, "listening for connections");
2324   while (do_shutdown == 0)
2325   {
2326     for (i = 0; i < pollfds_num; i++)
2327     {
2328       pollfds[i].fd = listen_fds[i].fd;
2329       pollfds[i].events = POLLIN | POLLPRI;
2330       pollfds[i].revents = 0;
2331     }
2333     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2334     if (do_shutdown)
2335       break;
2336     else if (status == 0) /* timeout */
2337       continue;
2338     else if (status < 0) /* error */
2339     {
2340       status = errno;
2341       if (status != EINTR)
2342       {
2343         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2344       }
2345       continue;
2346     }
2348     for (i = 0; i < pollfds_num; i++)
2349     {
2350       listen_socket_t *client_sock;
2351       struct sockaddr_storage client_sa;
2352       socklen_t client_sa_size;
2353       pthread_t tid;
2354       pthread_attr_t attr;
2356       if (pollfds[i].revents == 0)
2357         continue;
2359       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2360       {
2361         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2362             "poll(2) returned something unexpected for listen FD #%i.",
2363             pollfds[i].fd);
2364         continue;
2365       }
2367       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2368       if (client_sock == NULL)
2369       {
2370         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2371         continue;
2372       }
2373       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2375       client_sa_size = sizeof (client_sa);
2376       client_sock->fd = accept (pollfds[i].fd,
2377           (struct sockaddr *) &client_sa, &client_sa_size);
2378       if (client_sock->fd < 0)
2379       {
2380         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2381         free(client_sock);
2382         continue;
2383       }
2385       pthread_attr_init (&attr);
2386       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2388       status = pthread_create (&tid, &attr, connection_thread_main,
2389                                client_sock);
2390       if (status != 0)
2391       {
2392         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2393         close_connection(client_sock);
2394         continue;
2395       }
2396     } /* for (pollfds_num) */
2397   } /* while (do_shutdown == 0) */
2399   RRDD_LOG(LOG_INFO, "starting shutdown");
2401   close_listen_sockets ();
2403   pthread_mutex_lock (&connection_threads_lock);
2404   while (connection_threads_num > 0)
2405   {
2406     pthread_t wait_for;
2408     wait_for = connection_threads[0];
2410     pthread_mutex_unlock (&connection_threads_lock);
2411     pthread_join (wait_for, /* retval = */ NULL);
2412     pthread_mutex_lock (&connection_threads_lock);
2413   }
2414   pthread_mutex_unlock (&connection_threads_lock);
2416   free(pollfds);
2418   return (NULL);
2419 } /* }}} void *listen_thread_main */
2421 static int daemonize (void) /* {{{ */
2423   int pid_fd;
2424   char *base_dir;
2426   daemon_uid = geteuid();
2428   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2429   if (pid_fd < 0)
2430     pid_fd = check_pidfile();
2431   if (pid_fd < 0)
2432     return pid_fd;
2434   /* open all the listen sockets */
2435   if (config_listen_address_list_len > 0)
2436   {
2437     for (int i = 0; i < config_listen_address_list_len; i++)
2438     {
2439       open_listen_socket (config_listen_address_list[i]);
2440       free_listen_socket (config_listen_address_list[i]);
2441     }
2443     free(config_listen_address_list);
2444   }
2445   else
2446   {
2447     listen_socket_t sock;
2448     memset(&sock, 0, sizeof(sock));
2449     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2450     open_listen_socket (&sock);
2451   }
2453   if (listen_fds_num < 1)
2454   {
2455     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2456     goto error;
2457   }
2459   if (!stay_foreground)
2460   {
2461     pid_t child;
2463     child = fork ();
2464     if (child < 0)
2465     {
2466       fprintf (stderr, "daemonize: fork(2) failed.\n");
2467       goto error;
2468     }
2469     else if (child > 0)
2470       exit(0);
2472     /* Become session leader */
2473     setsid ();
2475     /* Open the first three file descriptors to /dev/null */
2476     close (2);
2477     close (1);
2478     close (0);
2480     open ("/dev/null", O_RDWR);
2481     dup (0);
2482     dup (0);
2483   } /* if (!stay_foreground) */
2485   /* Change into the /tmp directory. */
2486   base_dir = (config_base_dir != NULL)
2487     ? config_base_dir
2488     : "/tmp";
2490   if (chdir (base_dir) != 0)
2491   {
2492     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2493     goto error;
2494   }
2496   install_signal_handlers();
2498   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2499   RRDD_LOG(LOG_INFO, "starting up");
2501   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2502                                 (GDestroyNotify) free_cache_item);
2503   if (cache_tree == NULL)
2504   {
2505     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2506     goto error;
2507   }
2509   return write_pidfile (pid_fd);
2511 error:
2512   remove_pidfile();
2513   return -1;
2514 } /* }}} int daemonize */
2516 static int cleanup (void) /* {{{ */
2518   do_shutdown++;
2520   pthread_cond_broadcast (&flush_cond);
2521   pthread_join (flush_thread, NULL);
2523   pthread_cond_broadcast (&queue_cond);
2524   for (int i = 0; i < config_queue_threads; i++)
2525     pthread_join (queue_threads[i], NULL);
2527   if (config_flush_at_shutdown)
2528   {
2529     assert(cache_queue_head == NULL);
2530     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2531   }
2533   journal_done();
2534   remove_pidfile ();
2536   free(queue_threads);
2537   free(config_base_dir);
2538   free(config_pid_file);
2539   free(journal_cur);
2540   free(journal_old);
2542   pthread_mutex_lock(&cache_lock);
2543   g_tree_destroy(cache_tree);
2545   RRDD_LOG(LOG_INFO, "goodbye");
2546   closelog ();
2548   return (0);
2549 } /* }}} int cleanup */
2551 static int read_options (int argc, char **argv) /* {{{ */
2553   int option;
2554   int status = 0;
2556   while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
2557   {
2558     switch (option)
2559     {
2560       case 'g':
2561         stay_foreground=1;
2562         break;
2564       case 'L':
2565       case 'l':
2566       {
2567         listen_socket_t **temp;
2568         listen_socket_t *new;
2570         new = malloc(sizeof(listen_socket_t));
2571         if (new == NULL)
2572         {
2573           fprintf(stderr, "read_options: malloc failed.\n");
2574           return(2);
2575         }
2576         memset(new, 0, sizeof(listen_socket_t));
2578         temp = (listen_socket_t **) rrd_realloc (config_listen_address_list,
2579             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2580         if (temp == NULL)
2581         {
2582           fprintf (stderr, "read_options: realloc failed.\n");
2583           return (2);
2584         }
2585         config_listen_address_list = temp;
2587         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2588         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2590         temp[config_listen_address_list_len] = new;
2591         config_listen_address_list_len++;
2592       }
2593       break;
2595       case 'f':
2596       {
2597         int temp;
2599         temp = atoi (optarg);
2600         if (temp > 0)
2601           config_flush_interval = temp;
2602         else
2603         {
2604           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2605           status = 3;
2606         }
2607       }
2608       break;
2610       case 'w':
2611       {
2612         int temp;
2614         temp = atoi (optarg);
2615         if (temp > 0)
2616           config_write_interval = temp;
2617         else
2618         {
2619           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2620           status = 2;
2621         }
2622       }
2623       break;
2625       case 'z':
2626       {
2627         int temp;
2629         temp = atoi(optarg);
2630         if (temp > 0)
2631           config_write_jitter = temp;
2632         else
2633         {
2634           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2635           status = 2;
2636         }
2638         break;
2639       }
2641       case 't':
2642       {
2643         int threads;
2644         threads = atoi(optarg);
2645         if (threads >= 1)
2646           config_queue_threads = threads;
2647         else
2648         {
2649           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2650           return 1;
2651         }
2652       }
2653       break;
2655       case 'B':
2656         config_write_base_only = 1;
2657         break;
2659       case 'b':
2660       {
2661         size_t len;
2662         char base_realpath[PATH_MAX];
2664         if (config_base_dir != NULL)
2665           free (config_base_dir);
2666         config_base_dir = strdup (optarg);
2667         if (config_base_dir == NULL)
2668         {
2669           fprintf (stderr, "read_options: strdup failed.\n");
2670           return (3);
2671         }
2673         /* make sure that the base directory is not resolved via
2674          * symbolic links.  this makes some performance-enhancing
2675          * assumptions possible (we don't have to resolve paths
2676          * that start with a "/")
2677          */
2678         if (realpath(config_base_dir, base_realpath) == NULL)
2679         {
2680           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2681           return 5;
2682         }
2683         else if (strncmp(config_base_dir,
2684                          base_realpath, sizeof(base_realpath)) != 0)
2685         {
2686           fprintf(stderr,
2687                   "Base directory (-b) resolved via file system links!\n"
2688                   "Please consult rrdcached '-b' documentation!\n"
2689                   "Consider specifying the real directory (%s)\n",
2690                   base_realpath);
2691           return 5;
2692         }
2694         len = strlen (config_base_dir);
2695         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2696         {
2697           config_base_dir[len - 1] = 0;
2698           len--;
2699         }
2701         if (len < 1)
2702         {
2703           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2704           return (4);
2705         }
2707         _config_base_dir_len = len;
2708       }
2709       break;
2711       case 'p':
2712       {
2713         if (config_pid_file != NULL)
2714           free (config_pid_file);
2715         config_pid_file = strdup (optarg);
2716         if (config_pid_file == NULL)
2717         {
2718           fprintf (stderr, "read_options: strdup failed.\n");
2719           return (3);
2720         }
2721       }
2722       break;
2724       case 'F':
2725         config_flush_at_shutdown = 1;
2726         break;
2728       case 'j':
2729       {
2730         struct stat statbuf;
2731         const char *dir = optarg;
2733         status = stat(dir, &statbuf);
2734         if (status != 0)
2735         {
2736           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2737           return 6;
2738         }
2740         if (!S_ISDIR(statbuf.st_mode)
2741             || access(dir, R_OK|W_OK|X_OK) != 0)
2742         {
2743           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2744                   errno ? rrd_strerror(errno) : "");
2745           return 6;
2746         }
2748         journal_cur = malloc(PATH_MAX + 1);
2749         journal_old = malloc(PATH_MAX + 1);
2750         if (journal_cur == NULL || journal_old == NULL)
2751         {
2752           fprintf(stderr, "malloc failure for journal files\n");
2753           return 6;
2754         }
2755         else 
2756         {
2757           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2758           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2759         }
2760       }
2761       break;
2763       case 'h':
2764       case '?':
2765         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2766             "\n"
2767             "Usage: rrdcached [options]\n"
2768             "\n"
2769             "Valid options are:\n"
2770             "  -l <address>  Socket address to listen to.\n"
2771             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2772             "  -w <seconds>  Interval in which to write data.\n"
2773             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2774             "  -t <threads>  Number of write threads.\n"
2775             "  -f <seconds>  Interval in which to flush dead data.\n"
2776             "  -p <file>     Location of the PID-file.\n"
2777             "  -b <dir>      Base directory to change to.\n"
2778             "  -B            Restrict file access to paths within -b <dir>\n"
2779             "  -g            Do not fork and run in the foreground.\n"
2780             "  -j <dir>      Directory in which to create the journal files.\n"
2781             "  -F            Always flush all updates at shutdown\n"
2782             "\n"
2783             "For more information and a detailed description of all options "
2784             "please refer\n"
2785             "to the rrdcached(1) manual page.\n",
2786             VERSION);
2787         status = -1;
2788         break;
2789     } /* switch (option) */
2790   } /* while (getopt) */
2792   /* advise the user when values are not sane */
2793   if (config_flush_interval < 2 * config_write_interval)
2794     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2795             " 2x write interval (-w) !\n");
2796   if (config_write_jitter > config_write_interval)
2797     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2798             " write interval (-w) !\n");
2800   if (config_write_base_only && config_base_dir == NULL)
2801     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2802             "  Consult the rrdcached documentation\n");
2804   if (journal_cur == NULL)
2805     config_flush_at_shutdown = 1;
2807   return (status);
2808 } /* }}} int read_options */
2810 int main (int argc, char **argv)
2812   int status;
2814   status = read_options (argc, argv);
2815   if (status != 0)
2816   {
2817     if (status < 0)
2818       status = 0;
2819     return (status);
2820   }
2822   status = daemonize ();
2823   if (status != 0)
2824   {
2825     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2826     return (1);
2827   }
2829   journal_init();
2831   /* start the queue threads */
2832   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
2833   if (queue_threads == NULL)
2834   {
2835     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
2836     cleanup();
2837     return (1);
2838   }
2839   for (int i = 0; i < config_queue_threads; i++)
2840   {
2841     memset (&queue_threads[i], 0, sizeof (*queue_threads));
2842     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
2843     if (status != 0)
2844     {
2845       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2846       cleanup();
2847       return (1);
2848     }
2849   }
2851   /* start the flush thread */
2852   memset(&flush_thread, 0, sizeof(flush_thread));
2853   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
2854   if (status != 0)
2855   {
2856     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
2857     cleanup();
2858     return (1);
2859   }
2861   listen_thread_main (NULL);
2862   cleanup ();
2864   return (0);
2865 } /* int main */
2867 /*
2868  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2869  */