Code

17153ce98be3309f985b885b486e3a0fc7b5fd6d
[pkg-rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008,2009 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #include "rrd_tool.h"
67 #include "rrd_client.h"
68 #include "unused.h"
70 #include <stdlib.h>
72 #ifndef WIN32
73 #ifdef HAVE_STDINT_H
74 #  include <stdint.h>
75 #endif
76 #include <unistd.h>
77 #include <strings.h>
78 #include <inttypes.h>
79 #include <sys/socket.h>
81 #else
83 #endif
84 #include <stdio.h>
85 #include <string.h>
87 #include <sys/types.h>
88 #include <sys/stat.h>
89 #include <dirent.h>
90 #include <fcntl.h>
91 #include <signal.h>
92 #include <sys/un.h>
93 #include <netdb.h>
94 #include <poll.h>
95 #include <syslog.h>
96 #include <pthread.h>
97 #include <errno.h>
98 #include <assert.h>
99 #include <sys/time.h>
100 #include <time.h>
101 #include <libgen.h>
102 #include <grp.h>
104 #ifdef HAVE_LIBWRAP
105 #include <tcpd.h>
106 #endif /* HAVE_LIBWRAP */
108 #include <glib-2.0/glib.h>
109 /* }}} */
111 #define RRDD_LOG(severity, ...) \
112   do { \
113     if (stay_foreground) { \
114       fprintf(stderr, __VA_ARGS__); \
115       fprintf(stderr, "\n"); } \
116     syslog ((severity), __VA_ARGS__); \
117   } while (0)
119 /*
120  * Types
121  */
122 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
124 struct listen_socket_s
126   int fd;
127   char addr[PATH_MAX + 1];
128   int family;
130   /* state for BATCH processing */
131   time_t batch_start;
132   int batch_cmd;
134   /* buffered IO */
135   char *rbuf;
136   off_t next_cmd;
137   off_t next_read;
139   char *wbuf;
140   ssize_t wbuf_len;
142   uint32_t permissions;
144   gid_t  socket_group;
145   mode_t socket_permissions;
146 };
147 typedef struct listen_socket_s listen_socket_t;
149 struct command_s;
150 typedef struct command_s command_t;
151 /* note: guard against "unused" warnings in the handlers */
152 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
153                         time_t UNUSED(now),\
154                         char  UNUSED(*buffer),\
155                         size_t UNUSED(buffer_size)
157 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
158                         DISPATCH_PROTO
160 struct command_s {
161   char   *cmd;
162   int (*handler)(HANDLER_PROTO);
164   char  context;                /* where we expect to see it */
165 #define CMD_CONTEXT_CLIENT      (1<<0)
166 #define CMD_CONTEXT_BATCH       (1<<1)
167 #define CMD_CONTEXT_JOURNAL     (1<<2)
168 #define CMD_CONTEXT_ANY         (0x7f)
170   char *syntax;
171   char *help;
172 };
174 struct cache_item_s;
175 typedef struct cache_item_s cache_item_t;
176 struct cache_item_s
178   char *file;
179   char **values;
180   size_t values_num;
181   time_t last_flush_time;
182   double last_update_stamp;
183 #define CI_FLAGS_IN_TREE  (1<<0)
184 #define CI_FLAGS_IN_QUEUE (1<<1)
185   int flags;
186   pthread_cond_t  flushed;
187   cache_item_t *prev;
188   cache_item_t *next;
189 };
191 struct callback_flush_data_s
193   time_t now;
194   time_t abs_timeout;
195   char **keys;
196   size_t keys_num;
197 };
198 typedef struct callback_flush_data_s callback_flush_data_t;
200 enum queue_side_e
202   HEAD,
203   TAIL
204 };
205 typedef enum queue_side_e queue_side_t;
207 /* describe a set of journal files */
208 typedef struct {
209   char **files;
210   size_t files_num;
211 } journal_set;
213 /* max length of socket command or response */
214 #define CMD_MAX 4096
215 #define RBUF_SIZE (CMD_MAX*2)
217 /*
218  * Variables
219  */
220 static int stay_foreground = 0;
221 static uid_t daemon_uid;
223 static listen_socket_t *listen_fds = NULL;
224 static size_t listen_fds_num = 0;
226 static listen_socket_t default_socket;
228 enum {
229   RUNNING,              /* normal operation */
230   FLUSHING,             /* flushing remaining values */
231   SHUTDOWN              /* shutting down */
232 } state = RUNNING;
234 static pthread_t *queue_threads;
235 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
236 static int config_queue_threads = 4;
238 static pthread_t flush_thread;
239 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
241 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
242 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
243 static int connection_threads_num = 0;
245 /* Cache stuff */
246 static GTree          *cache_tree = NULL;
247 static cache_item_t   *cache_queue_head = NULL;
248 static cache_item_t   *cache_queue_tail = NULL;
249 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
251 static int config_write_interval = 300;
252 static int config_write_jitter   = 0;
253 static int config_flush_interval = 3600;
254 static int config_flush_at_shutdown = 0;
255 static char *config_pid_file = NULL;
256 static char *config_base_dir = NULL;
257 static size_t _config_base_dir_len = 0;
258 static int config_write_base_only = 0;
260 static listen_socket_t **config_listen_address_list = NULL;
261 static size_t config_listen_address_list_len = 0;
263 static uint64_t stats_queue_length = 0;
264 static uint64_t stats_updates_received = 0;
265 static uint64_t stats_flush_received = 0;
266 static uint64_t stats_updates_written = 0;
267 static uint64_t stats_data_sets_written = 0;
268 static uint64_t stats_journal_bytes = 0;
269 static uint64_t stats_journal_rotate = 0;
270 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
272 /* Journaled updates */
273 #define JOURNAL_REPLAY(s) ((s) == NULL)
274 #define JOURNAL_BASE "rrd.journal"
275 static journal_set *journal_cur = NULL;
276 static journal_set *journal_old = NULL;
277 static char *journal_dir = NULL;
278 static FILE *journal_fh = NULL;         /* current journal file handle */
279 static long  journal_size = 0;          /* current journal size */
280 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
281 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
282 static int journal_write(char *cmd, char *args);
283 static void journal_done(void);
284 static void journal_rotate(void);
286 /* prototypes for forward refernces */
287 static int handle_request_help (HANDLER_PROTO);
289 /* 
290  * Functions
291  */
292 static void sig_common (const char *sig) /* {{{ */
294   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
295   state = FLUSHING;
296   pthread_cond_broadcast(&flush_cond);
297   pthread_cond_broadcast(&queue_cond);
298 } /* }}} void sig_common */
300 static void sig_int_handler (int UNUSED(s)) /* {{{ */
302   sig_common("INT");
303 } /* }}} void sig_int_handler */
305 static void sig_term_handler (int UNUSED(s)) /* {{{ */
307   sig_common("TERM");
308 } /* }}} void sig_term_handler */
310 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
312   config_flush_at_shutdown = 1;
313   sig_common("USR1");
314 } /* }}} void sig_usr1_handler */
316 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
318   config_flush_at_shutdown = 0;
319   sig_common("USR2");
320 } /* }}} void sig_usr2_handler */
322 static void install_signal_handlers(void) /* {{{ */
324   /* These structures are static, because `sigaction' behaves weird if the are
325    * overwritten.. */
326   static struct sigaction sa_int;
327   static struct sigaction sa_term;
328   static struct sigaction sa_pipe;
329   static struct sigaction sa_usr1;
330   static struct sigaction sa_usr2;
332   /* Install signal handlers */
333   memset (&sa_int, 0, sizeof (sa_int));
334   sa_int.sa_handler = sig_int_handler;
335   sigaction (SIGINT, &sa_int, NULL);
337   memset (&sa_term, 0, sizeof (sa_term));
338   sa_term.sa_handler = sig_term_handler;
339   sigaction (SIGTERM, &sa_term, NULL);
341   memset (&sa_pipe, 0, sizeof (sa_pipe));
342   sa_pipe.sa_handler = SIG_IGN;
343   sigaction (SIGPIPE, &sa_pipe, NULL);
345   memset (&sa_pipe, 0, sizeof (sa_usr1));
346   sa_usr1.sa_handler = sig_usr1_handler;
347   sigaction (SIGUSR1, &sa_usr1, NULL);
349   memset (&sa_usr2, 0, sizeof (sa_usr2));
350   sa_usr2.sa_handler = sig_usr2_handler;
351   sigaction (SIGUSR2, &sa_usr2, NULL);
353 } /* }}} void install_signal_handlers */
355 static int open_pidfile(char *action, int oflag) /* {{{ */
357   int fd;
358   const char *file;
359   char *file_copy, *dir;
361   file = (config_pid_file != NULL)
362     ? config_pid_file
363     : LOCALSTATEDIR "/run/rrdcached.pid";
365   /* dirname may modify its argument */
366   file_copy = strdup(file);
367   if (file_copy == NULL)
368   {
369     fprintf(stderr, "rrdcached: strdup(): %s\n",
370         rrd_strerror(errno));
371     return -1;
372   }
374   dir = dirname(file_copy);
375   if (rrd_mkdir_p(dir, 0777) != 0)
376   {
377     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
378         dir, rrd_strerror(errno));
379     return -1;
380   }
382   free(file_copy);
384   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
385   if (fd < 0)
386     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
387             action, file, rrd_strerror(errno));
389   return(fd);
390 } /* }}} static int open_pidfile */
392 /* check existing pid file to see whether a daemon is running */
393 static int check_pidfile(void)
395   int pid_fd;
396   pid_t pid;
397   char pid_str[16];
399   pid_fd = open_pidfile("open", O_RDWR);
400   if (pid_fd < 0)
401     return pid_fd;
403   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
404     return -1;
406   pid = atoi(pid_str);
407   if (pid <= 0)
408     return -1;
410   /* another running process that we can signal COULD be
411    * a competing rrdcached */
412   if (pid != getpid() && kill(pid, 0) == 0)
413   {
414     fprintf(stderr,
415             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
416     close(pid_fd);
417     return -1;
418   }
420   lseek(pid_fd, 0, SEEK_SET);
421   if (ftruncate(pid_fd, 0) == -1)
422   {
423     fprintf(stderr,
424             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
425     close(pid_fd);
426     return -1;
427   }
429   fprintf(stderr,
430           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
431           "rrdcached: starting normally.\n", pid);
433   return pid_fd;
434 } /* }}} static int check_pidfile */
436 static int write_pidfile (int fd) /* {{{ */
438   pid_t pid;
439   FILE *fh;
441   pid = getpid ();
443   fh = fdopen (fd, "w");
444   if (fh == NULL)
445   {
446     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
447     close(fd);
448     return (-1);
449   }
451   fprintf (fh, "%i\n", (int) pid);
452   fclose (fh);
454   return (0);
455 } /* }}} int write_pidfile */
457 static int remove_pidfile (void) /* {{{ */
459   char *file;
460   int status;
462   file = (config_pid_file != NULL)
463     ? config_pid_file
464     : LOCALSTATEDIR "/run/rrdcached.pid";
466   status = unlink (file);
467   if (status == 0)
468     return (0);
469   return (errno);
470 } /* }}} int remove_pidfile */
472 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
474   char *eol;
476   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
477                sock->next_read - sock->next_cmd);
479   if (eol == NULL)
480   {
481     /* no commands left, move remainder back to front of rbuf */
482     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
483             sock->next_read - sock->next_cmd);
484     sock->next_read -= sock->next_cmd;
485     sock->next_cmd = 0;
486     *len = 0;
487     return NULL;
488   }
489   else
490   {
491     char *cmd = sock->rbuf + sock->next_cmd;
492     *eol = '\0';
494     sock->next_cmd = eol - sock->rbuf + 1;
496     if (eol > sock->rbuf && *(eol-1) == '\r')
497       *(--eol) = '\0'; /* handle "\r\n" EOL */
499     *len = eol - cmd;
501     return cmd;
502   }
504   /* NOTREACHED */
505   assert(1==0);
506 } /* }}} char *next_cmd */
508 /* add the characters directly to the write buffer */
509 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
511   char *new_buf;
513   assert(sock != NULL);
515   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
516   if (new_buf == NULL)
517   {
518     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
519     return -1;
520   }
522   strncpy(new_buf + sock->wbuf_len, str, len + 1);
524   sock->wbuf = new_buf;
525   sock->wbuf_len += len;
527   return 0;
528 } /* }}} static int add_to_wbuf */
530 /* add the text to the "extra" info that's sent after the status line */
531 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
533   va_list argp;
534   char buffer[CMD_MAX];
535   int len;
537   if (JOURNAL_REPLAY(sock)) return 0;
538   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
540   va_start(argp, fmt);
541 #ifdef HAVE_VSNPRINTF
542   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
543 #else
544   len = vsprintf(buffer, fmt, argp);
545 #endif
546   va_end(argp);
547   if (len < 0)
548   {
549     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
550     return -1;
551   }
553   return add_to_wbuf(sock, buffer, len);
554 } /* }}} static int add_response_info */
556 static int count_lines(char *str) /* {{{ */
558   int lines = 0;
560   if (str != NULL)
561   {
562     while ((str = strchr(str, '\n')) != NULL)
563     {
564       ++lines;
565       ++str;
566     }
567   }
569   return lines;
570 } /* }}} static int count_lines */
572 /* send the response back to the user.
573  * returns 0 on success, -1 on error
574  * write buffer is always zeroed after this call */
575 static int send_response (listen_socket_t *sock, response_code rc,
576                           char *fmt, ...) /* {{{ */
578   va_list argp;
579   char buffer[CMD_MAX];
580   int lines;
581   ssize_t wrote;
582   int rclen, len;
584   if (JOURNAL_REPLAY(sock)) return rc;
586   if (sock->batch_start)
587   {
588     if (rc == RESP_OK)
589       return rc; /* no response on success during BATCH */
590     lines = sock->batch_cmd;
591   }
592   else if (rc == RESP_OK)
593     lines = count_lines(sock->wbuf);
594   else
595     lines = -1;
597   rclen = sprintf(buffer, "%d ", lines);
598   va_start(argp, fmt);
599 #ifdef HAVE_VSNPRINTF
600   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
601 #else
602   len = vsprintf(buffer+rclen, fmt, argp);
603 #endif
604   va_end(argp);
605   if (len < 0)
606     return -1;
608   len += rclen;
610   /* append the result to the wbuf, don't write to the user */
611   if (sock->batch_start)
612     return add_to_wbuf(sock, buffer, len);
614   /* first write must be complete */
615   if (len != write(sock->fd, buffer, len))
616   {
617     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
618     return -1;
619   }
621   if (sock->wbuf != NULL && rc == RESP_OK)
622   {
623     wrote = 0;
624     while (wrote < sock->wbuf_len)
625     {
626       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
627       if (wb <= 0)
628       {
629         RRDD_LOG(LOG_INFO, "send_response: could not write results");
630         return -1;
631       }
632       wrote += wb;
633     }
634   }
636   free(sock->wbuf); sock->wbuf = NULL;
637   sock->wbuf_len = 0;
639   return 0;
640 } /* }}} */
642 static void wipe_ci_values(cache_item_t *ci, time_t when)
644   ci->values = NULL;
645   ci->values_num = 0;
647   ci->last_flush_time = when;
648   if (config_write_jitter > 0)
649     ci->last_flush_time += (rrd_random() % config_write_jitter);
652 /* remove_from_queue
653  * remove a "cache_item_t" item from the queue.
654  * must hold 'cache_lock' when calling this
655  */
656 static void remove_from_queue(cache_item_t *ci) /* {{{ */
658   if (ci == NULL) return;
659   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
661   if (ci->prev == NULL)
662     cache_queue_head = ci->next; /* reset head */
663   else
664     ci->prev->next = ci->next;
666   if (ci->next == NULL)
667     cache_queue_tail = ci->prev; /* reset the tail */
668   else
669     ci->next->prev = ci->prev;
671   ci->next = ci->prev = NULL;
672   ci->flags &= ~CI_FLAGS_IN_QUEUE;
674   pthread_mutex_lock (&stats_lock);
675   assert (stats_queue_length > 0);
676   stats_queue_length--;
677   pthread_mutex_unlock (&stats_lock);
679 } /* }}} static void remove_from_queue */
681 /* free the resources associated with the cache_item_t
682  * must hold cache_lock when calling this function
683  */
684 static void *free_cache_item(cache_item_t *ci) /* {{{ */
686   if (ci == NULL) return NULL;
688   remove_from_queue(ci);
690   for (size_t i=0; i < ci->values_num; i++)
691     free(ci->values[i]);
693   free (ci->values);
694   free (ci->file);
696   /* in case anyone is waiting */
697   pthread_cond_broadcast(&ci->flushed);
698   pthread_cond_destroy(&ci->flushed);
700   free (ci);
702   return NULL;
703 } /* }}} static void *free_cache_item */
705 /*
706  * enqueue_cache_item:
707  * `cache_lock' must be acquired before calling this function!
708  */
709 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
710     queue_side_t side)
712   if (ci == NULL)
713     return (-1);
715   if (ci->values_num == 0)
716     return (0);
718   if (side == HEAD)
719   {
720     if (cache_queue_head == ci)
721       return 0;
723     /* remove if further down in queue */
724     remove_from_queue(ci);
726     ci->prev = NULL;
727     ci->next = cache_queue_head;
728     if (ci->next != NULL)
729       ci->next->prev = ci;
730     cache_queue_head = ci;
732     if (cache_queue_tail == NULL)
733       cache_queue_tail = cache_queue_head;
734   }
735   else /* (side == TAIL) */
736   {
737     /* We don't move values back in the list.. */
738     if (ci->flags & CI_FLAGS_IN_QUEUE)
739       return (0);
741     assert (ci->next == NULL);
742     assert (ci->prev == NULL);
744     ci->prev = cache_queue_tail;
746     if (cache_queue_tail == NULL)
747       cache_queue_head = ci;
748     else
749       cache_queue_tail->next = ci;
751     cache_queue_tail = ci;
752   }
754   ci->flags |= CI_FLAGS_IN_QUEUE;
756   pthread_cond_signal(&queue_cond);
757   pthread_mutex_lock (&stats_lock);
758   stats_queue_length++;
759   pthread_mutex_unlock (&stats_lock);
761   return (0);
762 } /* }}} int enqueue_cache_item */
764 /*
765  * tree_callback_flush:
766  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
767  * while this is in progress.
768  */
769 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
770     gpointer data)
772   cache_item_t *ci;
773   callback_flush_data_t *cfd;
775   ci = (cache_item_t *) value;
776   cfd = (callback_flush_data_t *) data;
778   if (ci->flags & CI_FLAGS_IN_QUEUE)
779     return FALSE;
781   if (ci->values_num > 0
782       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
783   {
784     enqueue_cache_item (ci, TAIL);
785   }
786   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
787       && (ci->values_num <= 0))
788   {
789     assert ((char *) key == ci->file);
790     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
791     {
792       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
793       return (FALSE);
794     }
795   }
797   return (FALSE);
798 } /* }}} gboolean tree_callback_flush */
800 static int flush_old_values (int max_age)
802   callback_flush_data_t cfd;
803   size_t k;
805   memset (&cfd, 0, sizeof (cfd));
806   /* Pass the current time as user data so that we don't need to call
807    * `time' for each node. */
808   cfd.now = time (NULL);
809   cfd.keys = NULL;
810   cfd.keys_num = 0;
812   if (max_age > 0)
813     cfd.abs_timeout = cfd.now - max_age;
814   else
815     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
817   /* `tree_callback_flush' will return the keys of all values that haven't
818    * been touched in the last `config_flush_interval' seconds in `cfd'.
819    * The char*'s in this array point to the same memory as ci->file, so we
820    * don't need to free them separately. */
821   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
823   for (k = 0; k < cfd.keys_num; k++)
824   {
825     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
826     /* should never fail, since we have held the cache_lock
827      * the entire time */
828     assert(status == TRUE);
829   }
831   if (cfd.keys != NULL)
832   {
833     free (cfd.keys);
834     cfd.keys = NULL;
835   }
837   return (0);
838 } /* int flush_old_values */
840 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
842   struct timeval now;
843   struct timespec next_flush;
844   int status;
846   gettimeofday (&now, NULL);
847   next_flush.tv_sec = now.tv_sec + config_flush_interval;
848   next_flush.tv_nsec = 1000 * now.tv_usec;
850   pthread_mutex_lock(&cache_lock);
852   while (state == RUNNING)
853   {
854     gettimeofday (&now, NULL);
855     if ((now.tv_sec > next_flush.tv_sec)
856         || ((now.tv_sec == next_flush.tv_sec)
857           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
858     {
859       RRDD_LOG(LOG_DEBUG, "flushing old values");
861       /* Determine the time of the next cache flush. */
862       next_flush.tv_sec = now.tv_sec + config_flush_interval;
864       /* Flush all values that haven't been written in the last
865        * `config_write_interval' seconds. */
866       flush_old_values (config_write_interval);
868       /* unlock the cache while we rotate so we don't block incoming
869        * updates if the fsync() blocks on disk I/O */
870       pthread_mutex_unlock(&cache_lock);
871       journal_rotate();
872       pthread_mutex_lock(&cache_lock);
873     }
875     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
876     if (status != 0 && status != ETIMEDOUT)
877     {
878       RRDD_LOG (LOG_ERR, "flush_thread_main: "
879                 "pthread_cond_timedwait returned %i.", status);
880     }
881   }
883   if (config_flush_at_shutdown)
884     flush_old_values (-1); /* flush everything */
886   state = SHUTDOWN;
888   pthread_mutex_unlock(&cache_lock);
890   return NULL;
891 } /* void *flush_thread_main */
893 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
895   pthread_mutex_lock (&cache_lock);
897   while (state != SHUTDOWN
898          || (cache_queue_head != NULL && config_flush_at_shutdown))
899   {
900     cache_item_t *ci;
901     char *file;
902     char **values;
903     size_t values_num;
904     int status;
906     /* Now, check if there's something to store away. If not, wait until
907      * something comes in. */
908     if (cache_queue_head == NULL)
909     {
910       status = pthread_cond_wait (&queue_cond, &cache_lock);
911       if ((status != 0) && (status != ETIMEDOUT))
912       {
913         RRDD_LOG (LOG_ERR, "queue_thread_main: "
914             "pthread_cond_wait returned %i.", status);
915       }
916     }
918     /* Check if a value has arrived. This may be NULL if we timed out or there
919      * was an interrupt such as a signal. */
920     if (cache_queue_head == NULL)
921       continue;
923     ci = cache_queue_head;
925     /* copy the relevant parts */
926     file = strdup (ci->file);
927     if (file == NULL)
928     {
929       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
930       continue;
931     }
933     assert(ci->values != NULL);
934     assert(ci->values_num > 0);
936     values = ci->values;
937     values_num = ci->values_num;
939     wipe_ci_values(ci, time(NULL));
940     remove_from_queue(ci);
942     pthread_mutex_unlock (&cache_lock);
944     rrd_clear_error ();
945     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
946     if (status != 0)
947     {
948       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
949           "rrd_update_r (%s) failed with status %i. (%s)",
950           file, status, rrd_get_error());
951     }
953     journal_write("wrote", file);
955     /* Search again in the tree.  It's possible someone issued a "FORGET"
956      * while we were writing the update values. */
957     pthread_mutex_lock(&cache_lock);
958     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
959     if (ci)
960       pthread_cond_broadcast(&ci->flushed);
961     pthread_mutex_unlock(&cache_lock);
963     if (status == 0)
964     {
965       pthread_mutex_lock (&stats_lock);
966       stats_updates_written++;
967       stats_data_sets_written += values_num;
968       pthread_mutex_unlock (&stats_lock);
969     }
971     rrd_free_ptrs((void ***) &values, &values_num);
972     free(file);
974     pthread_mutex_lock (&cache_lock);
975   }
976   pthread_mutex_unlock (&cache_lock);
978   return (NULL);
979 } /* }}} void *queue_thread_main */
981 static int buffer_get_field (char **buffer_ret, /* {{{ */
982     size_t *buffer_size_ret, char **field_ret)
984   char *buffer;
985   size_t buffer_pos;
986   size_t buffer_size;
987   char *field;
988   size_t field_size;
989   int status;
991   buffer = *buffer_ret;
992   buffer_pos = 0;
993   buffer_size = *buffer_size_ret;
994   field = *buffer_ret;
995   field_size = 0;
997   if (buffer_size <= 0)
998     return (-1);
1000   /* This is ensured by `handle_request'. */
1001   assert (buffer[buffer_size - 1] == '\0');
1003   status = -1;
1004   while (buffer_pos < buffer_size)
1005   {
1006     /* Check for end-of-field or end-of-buffer */
1007     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1008     {
1009       field[field_size] = 0;
1010       field_size++;
1011       buffer_pos++;
1012       status = 0;
1013       break;
1014     }
1015     /* Handle escaped characters. */
1016     else if (buffer[buffer_pos] == '\\')
1017     {
1018       if (buffer_pos >= (buffer_size - 1))
1019         break;
1020       buffer_pos++;
1021       field[field_size] = buffer[buffer_pos];
1022       field_size++;
1023       buffer_pos++;
1024     }
1025     /* Normal operation */ 
1026     else
1027     {
1028       field[field_size] = buffer[buffer_pos];
1029       field_size++;
1030       buffer_pos++;
1031     }
1032   } /* while (buffer_pos < buffer_size) */
1034   if (status != 0)
1035     return (status);
1037   *buffer_ret = buffer + buffer_pos;
1038   *buffer_size_ret = buffer_size - buffer_pos;
1039   *field_ret = field;
1041   return (0);
1042 } /* }}} int buffer_get_field */
1044 /* if we're restricting writes to the base directory,
1045  * check whether the file falls within the dir
1046  * returns 1 if OK, otherwise 0
1047  */
1048 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1050   assert(file != NULL);
1052   if (!config_write_base_only
1053       || JOURNAL_REPLAY(sock)
1054       || config_base_dir == NULL)
1055     return 1;
1057   if (strstr(file, "../") != NULL) goto err;
1059   /* relative paths without "../" are ok */
1060   if (*file != '/') return 1;
1062   /* file must be of the format base + "/" + <1+ char filename> */
1063   if (strlen(file) < _config_base_dir_len + 2) goto err;
1064   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1065   if (*(file + _config_base_dir_len) != '/') goto err;
1067   return 1;
1069 err:
1070   if (sock != NULL && sock->fd >= 0)
1071     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1073   return 0;
1074 } /* }}} static int check_file_access */
1076 /* when using a base dir, convert relative paths to absolute paths.
1077  * if necessary, modifies the "filename" pointer to point
1078  * to the new path created in "tmp".  "tmp" is provided
1079  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1080  *
1081  * this allows us to optimize for the expected case (absolute path)
1082  * with a no-op.
1083  */
1084 static void get_abs_path(char **filename, char *tmp)
1086   assert(tmp != NULL);
1087   assert(filename != NULL && *filename != NULL);
1089   if (config_base_dir == NULL || **filename == '/')
1090     return;
1092   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1093   *filename = tmp;
1094 } /* }}} static int get_abs_path */
1096 static int flush_file (const char *filename) /* {{{ */
1098   cache_item_t *ci;
1100   pthread_mutex_lock (&cache_lock);
1102   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1103   if (ci == NULL)
1104   {
1105     pthread_mutex_unlock (&cache_lock);
1106     return (ENOENT);
1107   }
1109   if (ci->values_num > 0)
1110   {
1111     /* Enqueue at head */
1112     enqueue_cache_item (ci, HEAD);
1113     pthread_cond_wait(&ci->flushed, &cache_lock);
1114   }
1116   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1117    * may have been purged during our cond_wait() */
1119   pthread_mutex_unlock(&cache_lock);
1121   return (0);
1122 } /* }}} int flush_file */
1124 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1126   char *err = "Syntax error.\n";
1128   if (cmd && cmd->syntax)
1129     err = cmd->syntax;
1131   return send_response(sock, RESP_ERR, "Usage: %s", err);
1132 } /* }}} static int syntax_error() */
1134 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1136   uint64_t copy_queue_length;
1137   uint64_t copy_updates_received;
1138   uint64_t copy_flush_received;
1139   uint64_t copy_updates_written;
1140   uint64_t copy_data_sets_written;
1141   uint64_t copy_journal_bytes;
1142   uint64_t copy_journal_rotate;
1144   uint64_t tree_nodes_number;
1145   uint64_t tree_depth;
1147   pthread_mutex_lock (&stats_lock);
1148   copy_queue_length       = stats_queue_length;
1149   copy_updates_received   = stats_updates_received;
1150   copy_flush_received     = stats_flush_received;
1151   copy_updates_written    = stats_updates_written;
1152   copy_data_sets_written  = stats_data_sets_written;
1153   copy_journal_bytes      = stats_journal_bytes;
1154   copy_journal_rotate     = stats_journal_rotate;
1155   pthread_mutex_unlock (&stats_lock);
1157   pthread_mutex_lock (&cache_lock);
1158   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1159   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1160   pthread_mutex_unlock (&cache_lock);
1162   add_response_info(sock,
1163                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1164   add_response_info(sock,
1165                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1166   add_response_info(sock,
1167                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1168   add_response_info(sock,
1169                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1170   add_response_info(sock,
1171                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1172   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1173   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1174   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1175   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1177   send_response(sock, RESP_OK, "Statistics follow\n");
1179   return (0);
1180 } /* }}} int handle_request_stats */
1182 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1184   char *file, file_tmp[PATH_MAX];
1185   int status;
1187   status = buffer_get_field (&buffer, &buffer_size, &file);
1188   if (status != 0)
1189   {
1190     return syntax_error(sock,cmd);
1191   }
1192   else
1193   {
1194     pthread_mutex_lock(&stats_lock);
1195     stats_flush_received++;
1196     pthread_mutex_unlock(&stats_lock);
1198     get_abs_path(&file, file_tmp);
1199     if (!check_file_access(file, sock)) return 0;
1201     status = flush_file (file);
1202     if (status == 0)
1203       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1204     else if (status == ENOENT)
1205     {
1206       /* no file in our tree; see whether it exists at all */
1207       struct stat statbuf;
1209       memset(&statbuf, 0, sizeof(statbuf));
1210       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1211         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1212       else
1213         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1214     }
1215     else if (status < 0)
1216       return send_response(sock, RESP_ERR, "Internal error.\n");
1217     else
1218       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1219   }
1221   /* NOTREACHED */
1222   assert(1==0);
1223 } /* }}} int handle_request_flush */
1225 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1227   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1229   pthread_mutex_lock(&cache_lock);
1230   flush_old_values(-1);
1231   pthread_mutex_unlock(&cache_lock);
1233   return send_response(sock, RESP_OK, "Started flush.\n");
1234 } /* }}} static int handle_request_flushall */
1236 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1238   int status;
1239   char *file, file_tmp[PATH_MAX];
1240   cache_item_t *ci;
1242   status = buffer_get_field(&buffer, &buffer_size, &file);
1243   if (status != 0)
1244     return syntax_error(sock,cmd);
1246   get_abs_path(&file, file_tmp);
1248   pthread_mutex_lock(&cache_lock);
1249   ci = g_tree_lookup(cache_tree, file);
1250   if (ci == NULL)
1251   {
1252     pthread_mutex_unlock(&cache_lock);
1253     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1254   }
1256   for (size_t i=0; i < ci->values_num; i++)
1257     add_response_info(sock, "%s\n", ci->values[i]);
1259   pthread_mutex_unlock(&cache_lock);
1260   return send_response(sock, RESP_OK, "updates pending\n");
1261 } /* }}} static int handle_request_pending */
1263 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1265   int status;
1266   gboolean found;
1267   char *file, file_tmp[PATH_MAX];
1269   status = buffer_get_field(&buffer, &buffer_size, &file);
1270   if (status != 0)
1271     return syntax_error(sock,cmd);
1273   get_abs_path(&file, file_tmp);
1274   if (!check_file_access(file, sock)) return 0;
1276   pthread_mutex_lock(&cache_lock);
1277   found = g_tree_remove(cache_tree, file);
1278   pthread_mutex_unlock(&cache_lock);
1280   if (found == TRUE)
1281   {
1282     if (!JOURNAL_REPLAY(sock))
1283       journal_write("forget", file);
1285     return send_response(sock, RESP_OK, "Gone!\n");
1286   }
1287   else
1288     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1290   /* NOTREACHED */
1291   assert(1==0);
1292 } /* }}} static int handle_request_forget */
1294 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1296   cache_item_t *ci;
1298   pthread_mutex_lock(&cache_lock);
1300   ci = cache_queue_head;
1301   while (ci != NULL)
1302   {
1303     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1304     ci = ci->next;
1305   }
1307   pthread_mutex_unlock(&cache_lock);
1309   return send_response(sock, RESP_OK, "in queue.\n");
1310 } /* }}} int handle_request_queue */
1312 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1314   char *file, file_tmp[PATH_MAX];
1315   int values_num = 0;
1316   int status;
1317   char orig_buf[CMD_MAX];
1319   cache_item_t *ci;
1321   /* save it for the journal later */
1322   if (!JOURNAL_REPLAY(sock))
1323     strncpy(orig_buf, buffer, min(CMD_MAX,buffer_size));
1325   status = buffer_get_field (&buffer, &buffer_size, &file);
1326   if (status != 0)
1327     return syntax_error(sock,cmd);
1329   pthread_mutex_lock(&stats_lock);
1330   stats_updates_received++;
1331   pthread_mutex_unlock(&stats_lock);
1333   get_abs_path(&file, file_tmp);
1334   if (!check_file_access(file, sock)) return 0;
1336   pthread_mutex_lock (&cache_lock);
1337   ci = g_tree_lookup (cache_tree, file);
1339   if (ci == NULL) /* {{{ */
1340   {
1341     struct stat statbuf;
1342     cache_item_t *tmp;
1344     /* don't hold the lock while we setup; stat(2) might block */
1345     pthread_mutex_unlock(&cache_lock);
1347     memset (&statbuf, 0, sizeof (statbuf));
1348     status = stat (file, &statbuf);
1349     if (status != 0)
1350     {
1351       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1353       status = errno;
1354       if (status == ENOENT)
1355         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1356       else
1357         return send_response(sock, RESP_ERR,
1358                              "stat failed with error %i.\n", status);
1359     }
1360     if (!S_ISREG (statbuf.st_mode))
1361       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1363     if (access(file, R_OK|W_OK) != 0)
1364       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1365                            file, rrd_strerror(errno));
1367     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1368     if (ci == NULL)
1369     {
1370       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1372       return send_response(sock, RESP_ERR, "malloc failed.\n");
1373     }
1374     memset (ci, 0, sizeof (cache_item_t));
1376     ci->file = strdup (file);
1377     if (ci->file == NULL)
1378     {
1379       free (ci);
1380       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1382       return send_response(sock, RESP_ERR, "strdup failed.\n");
1383     }
1385     wipe_ci_values(ci, now);
1386     ci->flags = CI_FLAGS_IN_TREE;
1387     pthread_cond_init(&ci->flushed, NULL);
1389     pthread_mutex_lock(&cache_lock);
1391     /* another UPDATE might have added this entry in the meantime */
1392     tmp = g_tree_lookup (cache_tree, file);
1393     if (tmp == NULL)
1394       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1395     else
1396     {
1397       free_cache_item (ci);
1398       ci = tmp;
1399     }
1401     /* state may have changed while we were unlocked */
1402     if (state == SHUTDOWN)
1403       return -1;
1404   } /* }}} */
1405   assert (ci != NULL);
1407   /* don't re-write updates in replay mode */
1408   if (!JOURNAL_REPLAY(sock))
1409     journal_write("update", orig_buf);
1411   while (buffer_size > 0)
1412   {
1413     char *value;
1414     double stamp;
1415     char *eostamp;
1417     status = buffer_get_field (&buffer, &buffer_size, &value);
1418     if (status != 0)
1419     {
1420       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1421       break;
1422     }
1424     /* make sure update time is always moving forward. We use double here since
1425        update does support subsecond precision for timestamps ... */
1426     stamp = strtod(value, &eostamp);
1427     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1428     {
1429       pthread_mutex_unlock(&cache_lock);
1430       return send_response(sock, RESP_ERR,
1431                            "Cannot find timestamp in '%s'!\n", value);
1432     }
1433     else if (stamp <= ci->last_update_stamp)
1434     {
1435       pthread_mutex_unlock(&cache_lock);
1436       return send_response(sock, RESP_ERR,
1437                            "illegal attempt to update using time %lf when last"
1438                            " update time is %lf (minimum one second step)\n",
1439                            stamp, ci->last_update_stamp);
1440     }
1441     else
1442       ci->last_update_stamp = stamp;
1444     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1445     {
1446       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1447       continue;
1448     }
1450     values_num++;
1451   }
1453   if (((now - ci->last_flush_time) >= config_write_interval)
1454       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1455       && (ci->values_num > 0))
1456   {
1457     enqueue_cache_item (ci, TAIL);
1458   }
1460   pthread_mutex_unlock (&cache_lock);
1462   if (values_num < 1)
1463     return send_response(sock, RESP_ERR, "No values updated.\n");
1464   else
1465     return send_response(sock, RESP_OK,
1466                          "errors, enqueued %i value(s).\n", values_num);
1468   /* NOTREACHED */
1469   assert(1==0);
1471 } /* }}} int handle_request_update */
1473 /* we came across a "WROTE" entry during journal replay.
1474  * throw away any values that we have accumulated for this file
1475  */
1476 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1478   cache_item_t *ci;
1479   const char *file = buffer;
1481   pthread_mutex_lock(&cache_lock);
1483   ci = g_tree_lookup(cache_tree, file);
1484   if (ci == NULL)
1485   {
1486     pthread_mutex_unlock(&cache_lock);
1487     return (0);
1488   }
1490   if (ci->values)
1491     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1493   wipe_ci_values(ci, now);
1494   remove_from_queue(ci);
1496   pthread_mutex_unlock(&cache_lock);
1497   return (0);
1498 } /* }}} int handle_request_wrote */
1500 /* start "BATCH" processing */
1501 static int batch_start (HANDLER_PROTO) /* {{{ */
1503   int status;
1504   if (sock->batch_start)
1505     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1507   status = send_response(sock, RESP_OK,
1508                          "Go ahead.  End with dot '.' on its own line.\n");
1509   sock->batch_start = time(NULL);
1510   sock->batch_cmd = 0;
1512   return status;
1513 } /* }}} static int batch_start */
1515 /* finish "BATCH" processing and return results to the client */
1516 static int batch_done (HANDLER_PROTO) /* {{{ */
1518   assert(sock->batch_start);
1519   sock->batch_start = 0;
1520   sock->batch_cmd  = 0;
1521   return send_response(sock, RESP_OK, "errors\n");
1522 } /* }}} static int batch_done */
1524 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1526   return -1;
1527 } /* }}} static int handle_request_quit */
1529 static command_t list_of_commands[] = { /* {{{ */
1530   {
1531     "UPDATE",
1532     handle_request_update,
1533     CMD_CONTEXT_ANY,
1534     "UPDATE <filename> <values> [<values> ...]\n"
1535     ,
1536     "Adds the given file to the internal cache if it is not yet known and\n"
1537     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1538     "for details.\n"
1539     "\n"
1540     "Each <values> has the following form:\n"
1541     "  <values> = <time>:<value>[:<value>[...]]\n"
1542     "See the rrdupdate(1) manpage for details.\n"
1543   },
1544   {
1545     "WROTE",
1546     handle_request_wrote,
1547     CMD_CONTEXT_JOURNAL,
1548     NULL,
1549     NULL
1550   },
1551   {
1552     "FLUSH",
1553     handle_request_flush,
1554     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1555     "FLUSH <filename>\n"
1556     ,
1557     "Adds the given filename to the head of the update queue and returns\n"
1558     "after it has been dequeued.\n"
1559   },
1560   {
1561     "FLUSHALL",
1562     handle_request_flushall,
1563     CMD_CONTEXT_CLIENT,
1564     "FLUSHALL\n"
1565     ,
1566     "Triggers writing of all pending updates.  Returns immediately.\n"
1567   },
1568   {
1569     "PENDING",
1570     handle_request_pending,
1571     CMD_CONTEXT_CLIENT,
1572     "PENDING <filename>\n"
1573     ,
1574     "Shows any 'pending' updates for a file, in order.\n"
1575     "The updates shown have not yet been written to the underlying RRD file.\n"
1576   },
1577   {
1578     "FORGET",
1579     handle_request_forget,
1580     CMD_CONTEXT_ANY,
1581     "FORGET <filename>\n"
1582     ,
1583     "Removes the file completely from the cache.\n"
1584     "Any pending updates for the file will be lost.\n"
1585   },
1586   {
1587     "QUEUE",
1588     handle_request_queue,
1589     CMD_CONTEXT_CLIENT,
1590     "QUEUE\n"
1591     ,
1592         "Shows all files in the output queue.\n"
1593     "The output is zero or more lines in the following format:\n"
1594     "(where <num_vals> is the number of values to be written)\n"
1595     "\n"
1596     "<num_vals> <filename>\n"
1597   },
1598   {
1599     "STATS",
1600     handle_request_stats,
1601     CMD_CONTEXT_CLIENT,
1602     "STATS\n"
1603     ,
1604     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1605     "a description of the values.\n"
1606   },
1607   {
1608     "HELP",
1609     handle_request_help,
1610     CMD_CONTEXT_CLIENT,
1611     "HELP [<command>]\n",
1612     NULL, /* special! */
1613   },
1614   {
1615     "BATCH",
1616     batch_start,
1617     CMD_CONTEXT_CLIENT,
1618     "BATCH\n"
1619     ,
1620     "The 'BATCH' command permits the client to initiate a bulk load\n"
1621     "   of commands to rrdcached.\n"
1622     "\n"
1623     "Usage:\n"
1624     "\n"
1625     "    client: BATCH\n"
1626     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1627     "    client: command #1\n"
1628     "    client: command #2\n"
1629     "    client: ... and so on\n"
1630     "    client: .\n"
1631     "    server: 2 errors\n"
1632     "    server: 7 message for command #7\n"
1633     "    server: 9 message for command #9\n"
1634     "\n"
1635     "For more information, consult the rrdcached(1) documentation.\n"
1636   },
1637   {
1638     ".",   /* BATCH terminator */
1639     batch_done,
1640     CMD_CONTEXT_BATCH,
1641     NULL,
1642     NULL
1643   },
1644   {
1645     "QUIT",
1646     handle_request_quit,
1647     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1648     "QUIT\n"
1649     ,
1650     "Disconnect from rrdcached.\n"
1651   }
1652 }; /* }}} command_t list_of_commands[] */
1653 static size_t list_of_commands_len = sizeof (list_of_commands)
1654   / sizeof (list_of_commands[0]);
1656 static command_t *find_command(char *cmd)
1658   size_t i;
1660   for (i = 0; i < list_of_commands_len; i++)
1661     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1662       return (&list_of_commands[i]);
1663   return NULL;
1666 /* We currently use the index in the `list_of_commands' array as a bit position
1667  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1668  * outside these functions so that switching to a more elegant storage method
1669  * is easily possible. */
1670 static ssize_t find_command_index (const char *cmd) /* {{{ */
1672   size_t i;
1674   for (i = 0; i < list_of_commands_len; i++)
1675     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1676       return ((ssize_t) i);
1677   return (-1);
1678 } /* }}} ssize_t find_command_index */
1680 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1681     const char *cmd)
1683   ssize_t i;
1685   if (JOURNAL_REPLAY(sock))
1686     return (1);
1688   if (cmd == NULL)
1689     return (-1);
1691   if ((strcasecmp ("QUIT", cmd) == 0)
1692       || (strcasecmp ("HELP", cmd) == 0))
1693     return (1);
1694   else if (strcmp (".", cmd) == 0)
1695     cmd = "BATCH";
1697   i = find_command_index (cmd);
1698   if (i < 0)
1699     return (-1);
1700   assert (i < 32);
1702   if ((sock->permissions & (1 << i)) != 0)
1703     return (1);
1704   return (0);
1705 } /* }}} int socket_permission_check */
1707 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1708     const char *cmd)
1710   ssize_t i;
1712   i = find_command_index (cmd);
1713   if (i < 0)
1714     return (-1);
1715   assert (i < 32);
1717   sock->permissions |= (1 << i);
1718   return (0);
1719 } /* }}} int socket_permission_add */
1721 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
1723   sock->permissions = 0;
1724 } /* }}} socket_permission_clear */
1726 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
1727     listen_socket_t *src)
1729   dest->permissions = src->permissions;
1730 } /* }}} socket_permission_copy */
1732 static void socket_permission_set_all (listen_socket_t *sock) /* {{{ */
1734   size_t i;
1736   sock->permissions = 0;
1737   for (i = 0; i < list_of_commands_len; i++)
1738     sock->permissions |= (1 << i);
1739 } /* }}} void socket_permission_set_all */
1741 /* check whether commands are received in the expected context */
1742 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1744   if (JOURNAL_REPLAY(sock))
1745     return (cmd->context & CMD_CONTEXT_JOURNAL);
1746   else if (sock->batch_start)
1747     return (cmd->context & CMD_CONTEXT_BATCH);
1748   else
1749     return (cmd->context & CMD_CONTEXT_CLIENT);
1751   /* NOTREACHED */
1752   assert(1==0);
1755 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1757   int status;
1758   char *cmd_str;
1759   char *resp_txt;
1760   command_t *help = NULL;
1762   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1763   if (status == 0)
1764     help = find_command(cmd_str);
1766   if (help && (help->syntax || help->help))
1767   {
1768     char tmp[CMD_MAX];
1770     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1771     resp_txt = tmp;
1773     if (help->syntax)
1774       add_response_info(sock, "Usage: %s\n", help->syntax);
1776     if (help->help)
1777       add_response_info(sock, "%s\n", help->help);
1778   }
1779   else
1780   {
1781     size_t i;
1783     resp_txt = "Command overview\n";
1785     for (i = 0; i < list_of_commands_len; i++)
1786     {
1787       if (list_of_commands[i].syntax == NULL)
1788         continue;
1789       add_response_info (sock, "%s", list_of_commands[i].syntax);
1790     }
1791   }
1793   return send_response(sock, RESP_OK, resp_txt);
1794 } /* }}} int handle_request_help */
1796 static int handle_request (DISPATCH_PROTO) /* {{{ */
1798   char *buffer_ptr = buffer;
1799   char *cmd_str = NULL;
1800   command_t *cmd = NULL;
1801   int status;
1803   assert (buffer[buffer_size - 1] == '\0');
1805   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1806   if (status != 0)
1807   {
1808     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1809     return (-1);
1810   }
1812   if (sock != NULL && sock->batch_start)
1813     sock->batch_cmd++;
1815   cmd = find_command(cmd_str);
1816   if (!cmd)
1817     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1819   if (!socket_permission_check (sock, cmd->cmd))
1820     return send_response(sock, RESP_ERR, "Permission denied.\n");
1822   if (!command_check_context(sock, cmd))
1823     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1825   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1826 } /* }}} int handle_request */
1828 static void journal_set_free (journal_set *js) /* {{{ */
1830   if (js == NULL)
1831     return;
1833   rrd_free_ptrs((void ***) &js->files, &js->files_num);
1835   free(js);
1836 } /* }}} journal_set_free */
1838 static void journal_set_remove (journal_set *js) /* {{{ */
1840   if (js == NULL)
1841     return;
1843   for (uint i=0; i < js->files_num; i++)
1844   {
1845     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1846     unlink(js->files[i]);
1847   }
1848 } /* }}} journal_set_remove */
1850 /* close current journal file handle.
1851  * MUST hold journal_lock before calling */
1852 static void journal_close(void) /* {{{ */
1854   if (journal_fh != NULL)
1855   {
1856     if (fclose(journal_fh) != 0)
1857       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1858   }
1860   journal_fh = NULL;
1861   journal_size = 0;
1862 } /* }}} journal_close */
1864 /* MUST hold journal_lock before calling */
1865 static void journal_new_file(void) /* {{{ */
1867   struct timeval now;
1868   int  new_fd;
1869   char new_file[PATH_MAX + 1];
1871   assert(journal_dir != NULL);
1872   assert(journal_cur != NULL);
1874   journal_close();
1876   gettimeofday(&now, NULL);
1877   /* this format assures that the files sort in strcmp() order */
1878   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1879            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1881   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1882                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1883   if (new_fd < 0)
1884     goto error;
1886   journal_fh = fdopen(new_fd, "a");
1887   if (journal_fh == NULL)
1888     goto error;
1890   journal_size = ftell(journal_fh);
1891   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1893   /* record the file in the journal set */
1894   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1896   return;
1898 error:
1899   RRDD_LOG(LOG_CRIT,
1900            "JOURNALING DISABLED: Error while trying to create %s : %s",
1901            new_file, rrd_strerror(errno));
1902   RRDD_LOG(LOG_CRIT,
1903            "JOURNALING DISABLED: All values will be flushed at shutdown");
1905   close(new_fd);
1906   config_flush_at_shutdown = 1;
1908 } /* }}} journal_new_file */
1910 /* MUST NOT hold journal_lock before calling this */
1911 static void journal_rotate(void) /* {{{ */
1913   journal_set *old_js = NULL;
1915   if (journal_dir == NULL)
1916     return;
1918   RRDD_LOG(LOG_DEBUG, "rotating journals");
1920   pthread_mutex_lock(&stats_lock);
1921   ++stats_journal_rotate;
1922   pthread_mutex_unlock(&stats_lock);
1924   pthread_mutex_lock(&journal_lock);
1926   journal_close();
1928   /* rotate the journal sets */
1929   old_js = journal_old;
1930   journal_old = journal_cur;
1931   journal_cur = calloc(1, sizeof(journal_set));
1933   if (journal_cur != NULL)
1934     journal_new_file();
1935   else
1936     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1938   pthread_mutex_unlock(&journal_lock);
1940   journal_set_remove(old_js);
1941   journal_set_free  (old_js);
1943 } /* }}} static void journal_rotate */
1945 /* MUST hold journal_lock when calling */
1946 static void journal_done(void) /* {{{ */
1948   if (journal_cur == NULL)
1949     return;
1951   journal_close();
1953   if (config_flush_at_shutdown)
1954   {
1955     RRDD_LOG(LOG_INFO, "removing journals");
1956     journal_set_remove(journal_old);
1957     journal_set_remove(journal_cur);
1958   }
1959   else
1960   {
1961     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1962              "journals will be used at next startup");
1963   }
1965   journal_set_free(journal_cur);
1966   journal_set_free(journal_old);
1967   free(journal_dir);
1969 } /* }}} static void journal_done */
1971 static int journal_write(char *cmd, char *args) /* {{{ */
1973   int chars;
1975   if (journal_fh == NULL)
1976     return 0;
1978   pthread_mutex_lock(&journal_lock);
1979   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1980   journal_size += chars;
1982   if (journal_size > JOURNAL_MAX)
1983     journal_new_file();
1985   pthread_mutex_unlock(&journal_lock);
1987   if (chars > 0)
1988   {
1989     pthread_mutex_lock(&stats_lock);
1990     stats_journal_bytes += chars;
1991     pthread_mutex_unlock(&stats_lock);
1992   }
1994   return chars;
1995 } /* }}} static int journal_write */
1997 static int journal_replay (const char *file) /* {{{ */
1999   FILE *fh;
2000   int entry_cnt = 0;
2001   int fail_cnt = 0;
2002   uint64_t line = 0;
2003   char entry[CMD_MAX];
2004   time_t now;
2006   if (file == NULL) return 0;
2008   {
2009     char *reason = "unknown error";
2010     int status = 0;
2011     struct stat statbuf;
2013     memset(&statbuf, 0, sizeof(statbuf));
2014     if (stat(file, &statbuf) != 0)
2015     {
2016       reason = "stat error";
2017       status = errno;
2018     }
2019     else if (!S_ISREG(statbuf.st_mode))
2020     {
2021       reason = "not a regular file";
2022       status = EPERM;
2023     }
2024     if (statbuf.st_uid != daemon_uid)
2025     {
2026       reason = "not owned by daemon user";
2027       status = EACCES;
2028     }
2029     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2030     {
2031       reason = "must not be user/group writable";
2032       status = EACCES;
2033     }
2035     if (status != 0)
2036     {
2037       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2038                file, rrd_strerror(status), reason);
2039       return 0;
2040     }
2041   }
2043   fh = fopen(file, "r");
2044   if (fh == NULL)
2045   {
2046     if (errno != ENOENT)
2047       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2048                file, rrd_strerror(errno));
2049     return 0;
2050   }
2051   else
2052     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2054   now = time(NULL);
2056   while(!feof(fh))
2057   {
2058     size_t entry_len;
2060     ++line;
2061     if (fgets(entry, sizeof(entry), fh) == NULL)
2062       break;
2063     entry_len = strlen(entry);
2065     /* check \n termination in case journal writing crashed mid-line */
2066     if (entry_len == 0)
2067       continue;
2068     else if (entry[entry_len - 1] != '\n')
2069     {
2070       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2071       ++fail_cnt;
2072       continue;
2073     }
2075     entry[entry_len - 1] = '\0';
2077     if (handle_request(NULL, now, entry, entry_len) == 0)
2078       ++entry_cnt;
2079     else
2080       ++fail_cnt;
2081   }
2083   fclose(fh);
2085   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2086            entry_cnt, fail_cnt);
2088   return entry_cnt > 0 ? 1 : 0;
2089 } /* }}} static int journal_replay */
2091 static int journal_sort(const void *v1, const void *v2)
2093   char **jn1 = (char **) v1;
2094   char **jn2 = (char **) v2;
2096   return strcmp(*jn1,*jn2);
2099 static void journal_init(void) /* {{{ */
2101   int had_journal = 0;
2102   DIR *dir;
2103   struct dirent *dent;
2104   char path[PATH_MAX+1];
2106   if (journal_dir == NULL) return;
2108   pthread_mutex_lock(&journal_lock);
2110   journal_cur = calloc(1, sizeof(journal_set));
2111   if (journal_cur == NULL)
2112   {
2113     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2114     return;
2115   }
2117   RRDD_LOG(LOG_INFO, "checking for journal files");
2119   /* Handle old journal files during transition.  This gives them the
2120    * correct sort order.  TODO: remove after first release
2121    */
2122   {
2123     char old_path[PATH_MAX+1];
2124     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2125     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2126     rename(old_path, path);
2128     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2129     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2130     rename(old_path, path);
2131   }
2133   dir = opendir(journal_dir);
2134   if (!dir) {
2135     RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2136     return;
2137   }
2138   while ((dent = readdir(dir)) != NULL)
2139   {
2140     /* looks like a journal file? */
2141     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2142       continue;
2144     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2146     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2147     {
2148       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2149                dent->d_name);
2150       break;
2151     }
2152   }
2153   closedir(dir);
2155   qsort(journal_cur->files, journal_cur->files_num,
2156         sizeof(journal_cur->files[0]), journal_sort);
2158   for (uint i=0; i < journal_cur->files_num; i++)
2159     had_journal += journal_replay(journal_cur->files[i]);
2161   journal_new_file();
2163   /* it must have been a crash.  start a flush */
2164   if (had_journal && config_flush_at_shutdown)
2165     flush_old_values(-1);
2167   pthread_mutex_unlock(&journal_lock);
2169   RRDD_LOG(LOG_INFO, "journal processing complete");
2171 } /* }}} static void journal_init */
2173 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2175   assert(sock != NULL);
2177   free(sock->rbuf);  sock->rbuf = NULL;
2178   free(sock->wbuf);  sock->wbuf = NULL;
2179   free(sock);
2180 } /* }}} void free_listen_socket */
2182 static void close_connection(listen_socket_t *sock) /* {{{ */
2184   if (sock->fd >= 0)
2185   {
2186     close(sock->fd);
2187     sock->fd = -1;
2188   }
2190   free_listen_socket(sock);
2192 } /* }}} void close_connection */
2194 static void *connection_thread_main (void *args) /* {{{ */
2196   listen_socket_t *sock;
2197   int fd;
2199   sock = (listen_socket_t *) args;
2200   fd = sock->fd;
2202   /* init read buffers */
2203   sock->next_read = sock->next_cmd = 0;
2204   sock->rbuf = malloc(RBUF_SIZE);
2205   if (sock->rbuf == NULL)
2206   {
2207     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2208     close_connection(sock);
2209     return NULL;
2210   }
2212   pthread_mutex_lock (&connection_threads_lock);
2213 #ifdef HAVE_LIBWRAP
2214   /* LIBWRAP does not support multiple threads! By putting this code
2215      inside pthread_mutex_lock we do not have to worry about request_info
2216      getting overwritten by another thread.
2217   */
2218   struct request_info req;
2219   request_init(&req, RQ_DAEMON, "rrdcached\0", RQ_FILE, fd, NULL );
2220   fromhost(&req);
2221   if(!hosts_access(&req)) {
2222     RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2223     pthread_mutex_unlock (&connection_threads_lock);
2224     close_connection(sock);
2225     return NULL;
2226   }
2227 #endif /* HAVE_LIBWRAP */
2228   connection_threads_num++;
2229   pthread_mutex_unlock (&connection_threads_lock);
2231   while (state == RUNNING)
2232   {
2233     char *cmd;
2234     ssize_t cmd_len;
2235     ssize_t rbytes;
2236     time_t now;
2238     struct pollfd pollfd;
2239     int status;
2241     pollfd.fd = fd;
2242     pollfd.events = POLLIN | POLLPRI;
2243     pollfd.revents = 0;
2245     status = poll (&pollfd, 1, /* timeout = */ 500);
2246     if (state != RUNNING)
2247       break;
2248     else if (status == 0) /* timeout */
2249       continue;
2250     else if (status < 0) /* error */
2251     {
2252       status = errno;
2253       if (status != EINTR)
2254         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2255       continue;
2256     }
2258     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2259       break;
2260     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2261     {
2262       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2263           "poll(2) returned something unexpected: %#04hx",
2264           pollfd.revents);
2265       break;
2266     }
2268     rbytes = read(fd, sock->rbuf + sock->next_read,
2269                   RBUF_SIZE - sock->next_read);
2270     if (rbytes < 0)
2271     {
2272       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2273       break;
2274     }
2275     else if (rbytes == 0)
2276       break; /* eof */
2278     sock->next_read += rbytes;
2280     if (sock->batch_start)
2281       now = sock->batch_start;
2282     else
2283       now = time(NULL);
2285     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2286     {
2287       status = handle_request (sock, now, cmd, cmd_len+1);
2288       if (status != 0)
2289         goto out_close;
2290     }
2291   }
2293 out_close:
2294   close_connection(sock);
2296   /* Remove this thread from the connection threads list */
2297   pthread_mutex_lock (&connection_threads_lock);
2298   connection_threads_num--;
2299   if (connection_threads_num <= 0)
2300     pthread_cond_broadcast(&connection_threads_done);
2301   pthread_mutex_unlock (&connection_threads_lock);
2303   return (NULL);
2304 } /* }}} void *connection_thread_main */
2306 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2308   int fd;
2309   struct sockaddr_un sa;
2310   listen_socket_t *temp;
2311   int status;
2312   const char *path;
2313   char *path_copy, *dir;
2315   path = sock->addr;
2316   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2317     path += strlen("unix:");
2319   /* dirname may modify its argument */
2320   path_copy = strdup(path);
2321   if (path_copy == NULL)
2322   {
2323     fprintf(stderr, "rrdcached: strdup(): %s\n",
2324         rrd_strerror(errno));
2325     return (-1);
2326   }
2328   dir = dirname(path_copy);
2329   if (rrd_mkdir_p(dir, 0777) != 0)
2330   {
2331     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2332         dir, rrd_strerror(errno));
2333     return (-1);
2334   }
2336   free(path_copy);
2338   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2339       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2340   if (temp == NULL)
2341   {
2342     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2343     return (-1);
2344   }
2345   listen_fds = temp;
2346   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2348   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2349   if (fd < 0)
2350   {
2351     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2352              rrd_strerror(errno));
2353     return (-1);
2354   }
2356   memset (&sa, 0, sizeof (sa));
2357   sa.sun_family = AF_UNIX;
2358   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2360   /* if we've gotten this far, we own the pid file.  any daemon started
2361    * with the same args must not be alive.  therefore, ensure that we can
2362    * create the socket...
2363    */
2364   unlink(path);
2366   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2367   if (status != 0)
2368   {
2369     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2370              path, rrd_strerror(errno));
2371     close (fd);
2372     return (-1);
2373   }
2375   /* tweak the sockets group ownership */
2376   if (sock->socket_group != (gid_t)-1)
2377   {
2378     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2379          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2380     {
2381       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2382     }
2383   }
2385   if (sock->socket_permissions != (mode_t)-1)
2386   {
2387     if (chmod(path, sock->socket_permissions) != 0)
2388       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2389           (unsigned int)sock->socket_permissions, strerror(errno));
2390   }
2392   status = listen (fd, /* backlog = */ 10);
2393   if (status != 0)
2394   {
2395     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2396              path, rrd_strerror(errno));
2397     close (fd);
2398     unlink (path);
2399     return (-1);
2400   }
2402   listen_fds[listen_fds_num].fd = fd;
2403   listen_fds[listen_fds_num].family = PF_UNIX;
2404   strncpy(listen_fds[listen_fds_num].addr, path,
2405           sizeof (listen_fds[listen_fds_num].addr) - 1);
2406   listen_fds_num++;
2408   return (0);
2409 } /* }}} int open_listen_socket_unix */
2411 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2413   struct addrinfo ai_hints;
2414   struct addrinfo *ai_res;
2415   struct addrinfo *ai_ptr;
2416   char addr_copy[NI_MAXHOST];
2417   char *addr;
2418   char *port;
2419   int status;
2421   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2422   addr_copy[sizeof (addr_copy) - 1] = 0;
2423   addr = addr_copy;
2425   memset (&ai_hints, 0, sizeof (ai_hints));
2426   ai_hints.ai_flags = 0;
2427 #ifdef AI_ADDRCONFIG
2428   ai_hints.ai_flags |= AI_ADDRCONFIG;
2429 #endif
2430   ai_hints.ai_family = AF_UNSPEC;
2431   ai_hints.ai_socktype = SOCK_STREAM;
2433   port = NULL;
2434   if (*addr == '[') /* IPv6+port format */
2435   {
2436     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2437     addr++;
2439     port = strchr (addr, ']');
2440     if (port == NULL)
2441     {
2442       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2443       return (-1);
2444     }
2445     *port = 0;
2446     port++;
2448     if (*port == ':')
2449       port++;
2450     else if (*port == 0)
2451       port = NULL;
2452     else
2453     {
2454       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2455       return (-1);
2456     }
2457   } /* if (*addr == '[') */
2458   else
2459   {
2460     port = rindex(addr, ':');
2461     if (port != NULL)
2462     {
2463       *port = 0;
2464       port++;
2465     }
2466   }
2467   ai_res = NULL;
2468   status = getaddrinfo (addr,
2469                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2470                         &ai_hints, &ai_res);
2471   if (status != 0)
2472   {
2473     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2474              addr, gai_strerror (status));
2475     return (-1);
2476   }
2478   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2479   {
2480     int fd;
2481     listen_socket_t *temp;
2482     int one = 1;
2484     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2485         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2486     if (temp == NULL)
2487     {
2488       fprintf (stderr,
2489                "rrdcached: open_listen_socket_network: realloc failed.\n");
2490       continue;
2491     }
2492     listen_fds = temp;
2493     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2495     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2496     if (fd < 0)
2497     {
2498       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2499                rrd_strerror(errno));
2500       continue;
2501     }
2503     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2505     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2506     if (status != 0)
2507     {
2508       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2509                sock->addr, rrd_strerror(errno));
2510       close (fd);
2511       continue;
2512     }
2514     status = listen (fd, /* backlog = */ 10);
2515     if (status != 0)
2516     {
2517       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2518                sock->addr, rrd_strerror(errno));
2519       close (fd);
2520       freeaddrinfo(ai_res);
2521       return (-1);
2522     }
2524     listen_fds[listen_fds_num].fd = fd;
2525     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2526     listen_fds_num++;
2527   } /* for (ai_ptr) */
2529   freeaddrinfo(ai_res);
2530   return (0);
2531 } /* }}} static int open_listen_socket_network */
2533 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2535   assert(sock != NULL);
2536   assert(sock->addr != NULL);
2538   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2539       || sock->addr[0] == '/')
2540     return (open_listen_socket_unix(sock));
2541   else
2542     return (open_listen_socket_network(sock));
2543 } /* }}} int open_listen_socket */
2545 static int close_listen_sockets (void) /* {{{ */
2547   size_t i;
2549   for (i = 0; i < listen_fds_num; i++)
2550   {
2551     close (listen_fds[i].fd);
2553     if (listen_fds[i].family == PF_UNIX)
2554       unlink(listen_fds[i].addr);
2555   }
2557   free (listen_fds);
2558   listen_fds = NULL;
2559   listen_fds_num = 0;
2561   return (0);
2562 } /* }}} int close_listen_sockets */
2564 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2566   struct pollfd *pollfds;
2567   int pollfds_num;
2568   int status;
2569   int i;
2571   if (listen_fds_num < 1)
2572   {
2573     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2574     return (NULL);
2575   }
2577   pollfds_num = listen_fds_num;
2578   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2579   if (pollfds == NULL)
2580   {
2581     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2582     return (NULL);
2583   }
2584   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2586   RRDD_LOG(LOG_INFO, "listening for connections");
2588   while (state == RUNNING)
2589   {
2590     for (i = 0; i < pollfds_num; i++)
2591     {
2592       pollfds[i].fd = listen_fds[i].fd;
2593       pollfds[i].events = POLLIN | POLLPRI;
2594       pollfds[i].revents = 0;
2595     }
2597     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2598     if (state != RUNNING)
2599       break;
2600     else if (status == 0) /* timeout */
2601       continue;
2602     else if (status < 0) /* error */
2603     {
2604       status = errno;
2605       if (status != EINTR)
2606       {
2607         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2608       }
2609       continue;
2610     }
2612     for (i = 0; i < pollfds_num; i++)
2613     {
2614       listen_socket_t *client_sock;
2615       struct sockaddr_storage client_sa;
2616       socklen_t client_sa_size;
2617       pthread_t tid;
2618       pthread_attr_t attr;
2620       if (pollfds[i].revents == 0)
2621         continue;
2623       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2624       {
2625         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2626             "poll(2) returned something unexpected for listen FD #%i.",
2627             pollfds[i].fd);
2628         continue;
2629       }
2631       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2632       if (client_sock == NULL)
2633       {
2634         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2635         continue;
2636       }
2637       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2639       client_sa_size = sizeof (client_sa);
2640       client_sock->fd = accept (pollfds[i].fd,
2641           (struct sockaddr *) &client_sa, &client_sa_size);
2642       if (client_sock->fd < 0)
2643       {
2644         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2645         free(client_sock);
2646         continue;
2647       }
2649       pthread_attr_init (&attr);
2650       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2652       status = pthread_create (&tid, &attr, connection_thread_main,
2653                                client_sock);
2654       if (status != 0)
2655       {
2656         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2657         close_connection(client_sock);
2658         continue;
2659       }
2660     } /* for (pollfds_num) */
2661   } /* while (state == RUNNING) */
2663   RRDD_LOG(LOG_INFO, "starting shutdown");
2665   close_listen_sockets ();
2667   pthread_mutex_lock (&connection_threads_lock);
2668   while (connection_threads_num > 0)
2669     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2670   pthread_mutex_unlock (&connection_threads_lock);
2672   free(pollfds);
2674   return (NULL);
2675 } /* }}} void *listen_thread_main */
2677 static int daemonize (void) /* {{{ */
2679   int pid_fd;
2680   char *base_dir;
2682   daemon_uid = geteuid();
2684   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2685   if (pid_fd < 0)
2686     pid_fd = check_pidfile();
2687   if (pid_fd < 0)
2688     return pid_fd;
2690   /* open all the listen sockets */
2691   if (config_listen_address_list_len > 0)
2692   {
2693     for (size_t i = 0; i < config_listen_address_list_len; i++)
2694       open_listen_socket (config_listen_address_list[i]);
2696     rrd_free_ptrs((void ***) &config_listen_address_list,
2697                   &config_listen_address_list_len);
2698   }
2699   else
2700   {
2701     strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
2702         sizeof(default_socket.addr) - 1);
2703     default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
2705     if (default_socket.permissions == 0)
2706       socket_permission_set_all (&default_socket);
2708     open_listen_socket (&default_socket);
2709   }
2711   if (listen_fds_num < 1)
2712   {
2713     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2714     goto error;
2715   }
2717   if (!stay_foreground)
2718   {
2719     pid_t child;
2721     child = fork ();
2722     if (child < 0)
2723     {
2724       fprintf (stderr, "daemonize: fork(2) failed.\n");
2725       goto error;
2726     }
2727     else if (child > 0)
2728       exit(0);
2730     /* Become session leader */
2731     setsid ();
2733     /* Open the first three file descriptors to /dev/null */
2734     close (2);
2735     close (1);
2736     close (0);
2738     open ("/dev/null", O_RDWR);
2739     if (dup(0) == -1 || dup(0) == -1){
2740         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2741     }
2742   } /* if (!stay_foreground) */
2744   /* Change into the /tmp directory. */
2745   base_dir = (config_base_dir != NULL)
2746     ? config_base_dir
2747     : "/tmp";
2749   if (chdir (base_dir) != 0)
2750   {
2751     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2752     goto error;
2753   }
2755   install_signal_handlers();
2757   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2758   RRDD_LOG(LOG_INFO, "starting up");
2760   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2761                                 (GDestroyNotify) free_cache_item);
2762   if (cache_tree == NULL)
2763   {
2764     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2765     goto error;
2766   }
2768   return write_pidfile (pid_fd);
2770 error:
2771   remove_pidfile();
2772   return -1;
2773 } /* }}} int daemonize */
2775 static int cleanup (void) /* {{{ */
2777   pthread_cond_broadcast (&flush_cond);
2778   pthread_join (flush_thread, NULL);
2780   pthread_cond_broadcast (&queue_cond);
2781   for (int i = 0; i < config_queue_threads; i++)
2782     pthread_join (queue_threads[i], NULL);
2784   if (config_flush_at_shutdown)
2785   {
2786     assert(cache_queue_head == NULL);
2787     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2788   }
2790   free(queue_threads);
2791   free(config_base_dir);
2793   pthread_mutex_lock(&cache_lock);
2794   g_tree_destroy(cache_tree);
2796   pthread_mutex_lock(&journal_lock);
2797   journal_done();
2799   RRDD_LOG(LOG_INFO, "goodbye");
2800   closelog ();
2802   remove_pidfile ();
2803   free(config_pid_file);
2805   return (0);
2806 } /* }}} int cleanup */
2808 static int read_options (int argc, char **argv) /* {{{ */
2810   int option;
2811   int status = 0;
2813   socket_permission_clear (&default_socket);
2815   default_socket.socket_group = (gid_t)-1;
2816   default_socket.socket_permissions = (mode_t)-1;
2818   while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2819   {
2820     switch (option)
2821     {
2822       case 'g':
2823         stay_foreground=1;
2824         break;
2826       case 'l':
2827       {
2828         listen_socket_t *new;
2830         new = malloc(sizeof(listen_socket_t));
2831         if (new == NULL)
2832         {
2833           fprintf(stderr, "read_options: malloc failed.\n");
2834           return(2);
2835         }
2836         memset(new, 0, sizeof(listen_socket_t));
2838         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2840         /* Add permissions to the socket {{{ */
2841         if (default_socket.permissions != 0)
2842         {
2843           socket_permission_copy (new, &default_socket);
2844         }
2845         else /* if (default_socket.permissions == 0) */
2846         {
2847           /* Add permission for ALL commands to the socket. */
2848           socket_permission_set_all (new);
2849         }
2850         /* }}} Done adding permissions. */
2852         new->socket_group = default_socket.socket_group;
2853         new->socket_permissions = default_socket.socket_permissions;
2855         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2856                          &config_listen_address_list_len, new))
2857         {
2858           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2859           return (2);
2860         }
2861       }
2862       break;
2864       /* set socket group permissions */
2865       case 's':
2866       {
2867         gid_t group_gid;
2868         struct group *grp;
2870         group_gid = strtoul(optarg, NULL, 10);
2871         if (errno != EINVAL && group_gid>0)
2872         {
2873           /* we were passed a number */
2874           grp = getgrgid(group_gid);
2875         }
2876         else
2877         {
2878           grp = getgrnam(optarg);
2879         }
2881         if (grp)
2882         {
2883           default_socket.socket_group = grp->gr_gid;
2884         }
2885         else
2886         {
2887           /* no idea what the user wanted... */
2888           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
2889           return (5);
2890         }
2891       }
2892       break;
2894       /* set socket file permissions */
2895       case 'm':
2896       {
2897         long  tmp;
2898         char *endptr = NULL;
2900         tmp = strtol (optarg, &endptr, 8);
2901         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
2902             || (tmp > 07777) || (tmp < 0)) {
2903           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
2904               optarg);
2905           return (5);
2906         }
2908         default_socket.socket_permissions = (mode_t)tmp;
2909       }
2910       break;
2912       case 'P':
2913       {
2914         char *optcopy;
2915         char *saveptr;
2916         char *dummy;
2917         char *ptr;
2919         socket_permission_clear (&default_socket);
2921         optcopy = strdup (optarg);
2922         dummy = optcopy;
2923         saveptr = NULL;
2924         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2925         {
2926           dummy = NULL;
2927           status = socket_permission_add (&default_socket, ptr);
2928           if (status != 0)
2929           {
2930             fprintf (stderr, "read_options: Adding permission \"%s\" to "
2931                 "socket failed. Most likely, this permission doesn't "
2932                 "exist. Check your command line.\n", ptr);
2933             status = 4;
2934           }
2935         }
2937         free (optcopy);
2938       }
2939       break;
2941       case 'f':
2942       {
2943         int temp;
2945         temp = atoi (optarg);
2946         if (temp > 0)
2947           config_flush_interval = temp;
2948         else
2949         {
2950           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2951           status = 3;
2952         }
2953       }
2954       break;
2956       case 'w':
2957       {
2958         int temp;
2960         temp = atoi (optarg);
2961         if (temp > 0)
2962           config_write_interval = temp;
2963         else
2964         {
2965           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2966           status = 2;
2967         }
2968       }
2969       break;
2971       case 'z':
2972       {
2973         int temp;
2975         temp = atoi(optarg);
2976         if (temp > 0)
2977           config_write_jitter = temp;
2978         else
2979         {
2980           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2981           status = 2;
2982         }
2984         break;
2985       }
2987       case 't':
2988       {
2989         int threads;
2990         threads = atoi(optarg);
2991         if (threads >= 1)
2992           config_queue_threads = threads;
2993         else
2994         {
2995           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2996           return 1;
2997         }
2998       }
2999       break;
3001       case 'B':
3002         config_write_base_only = 1;
3003         break;
3005       case 'b':
3006       {
3007         size_t len;
3008         char base_realpath[PATH_MAX];
3010         if (config_base_dir != NULL)
3011           free (config_base_dir);
3012         config_base_dir = strdup (optarg);
3013         if (config_base_dir == NULL)
3014         {
3015           fprintf (stderr, "read_options: strdup failed.\n");
3016           return (3);
3017         }
3019         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3020         {
3021           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3022               config_base_dir, rrd_strerror (errno));
3023           return (3);
3024         }
3026         /* make sure that the base directory is not resolved via
3027          * symbolic links.  this makes some performance-enhancing
3028          * assumptions possible (we don't have to resolve paths
3029          * that start with a "/")
3030          */
3031         if (realpath(config_base_dir, base_realpath) == NULL)
3032         {
3033           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3034               "%s\n", config_base_dir, rrd_strerror(errno));
3035           return 5;
3036         }
3038         len = strlen (config_base_dir);
3039         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3040         {
3041           config_base_dir[len - 1] = 0;
3042           len--;
3043         }
3045         if (len < 1)
3046         {
3047           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3048           return (4);
3049         }
3051         _config_base_dir_len = len;
3053         len = strlen (base_realpath);
3054         while ((len > 0) && (base_realpath[len - 1] == '/'))
3055         {
3056           base_realpath[len - 1] = '\0';
3057           len--;
3058         }
3060         if (strncmp(config_base_dir,
3061                          base_realpath, sizeof(base_realpath)) != 0)
3062         {
3063           fprintf(stderr,
3064                   "Base directory (-b) resolved via file system links!\n"
3065                   "Please consult rrdcached '-b' documentation!\n"
3066                   "Consider specifying the real directory (%s)\n",
3067                   base_realpath);
3068           return 5;
3069         }
3070       }
3071       break;
3073       case 'p':
3074       {
3075         if (config_pid_file != NULL)
3076           free (config_pid_file);
3077         config_pid_file = strdup (optarg);
3078         if (config_pid_file == NULL)
3079         {
3080           fprintf (stderr, "read_options: strdup failed.\n");
3081           return (3);
3082         }
3083       }
3084       break;
3086       case 'F':
3087         config_flush_at_shutdown = 1;
3088         break;
3090       case 'j':
3091       {
3092         char journal_dir_actual[PATH_MAX];
3093         const char *dir;
3094         dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3096         status = rrd_mkdir_p(dir, 0777);
3097         if (status != 0)
3098         {
3099           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3100               dir, rrd_strerror(errno));
3101           return 6;
3102         }
3104         if (access(dir, R_OK|W_OK|X_OK) != 0)
3105         {
3106           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3107                   errno ? rrd_strerror(errno) : "");
3108           return 6;
3109         }
3110       }
3111       break;
3113       case 'h':
3114       case '?':
3115         printf ("RRDCacheD %s\n"
3116             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3117             "\n"
3118             "Usage: rrdcached [options]\n"
3119             "\n"
3120             "Valid options are:\n"
3121             "  -l <address>  Socket address to listen to.\n"
3122             "                Default: "RRDCACHED_DEFAULT_ADDRESS"\n"
3123             "  -P <perms>    Sets the permissions to assign to all following "
3124                             "sockets\n"
3125             "  -w <seconds>  Interval in which to write data.\n"
3126             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3127             "  -t <threads>  Number of write threads.\n"
3128             "  -f <seconds>  Interval in which to flush dead data.\n"
3129             "  -p <file>     Location of the PID-file.\n"
3130             "  -b <dir>      Base directory to change to.\n"
3131             "  -B            Restrict file access to paths within -b <dir>\n"
3132             "  -g            Do not fork and run in the foreground.\n"
3133             "  -j <dir>      Directory in which to create the journal files.\n"
3134             "  -F            Always flush all updates at shutdown\n"
3135             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3136             "                (the socket will also have read/write permissions "
3137                             "for that group)\n"
3138             "  -m <mode>     File permissions (octal) of all following UNIX "
3139                             "sockets\n"
3140             "\n"
3141             "For more information and a detailed description of all options "
3142             "please refer\n"
3143             "to the rrdcached(1) manual page.\n",
3144             VERSION);
3145         if (option == 'h')
3146           status = -1;
3147         else
3148           status = 1;
3149         break;
3150     } /* switch (option) */
3151   } /* while (getopt) */
3153   /* advise the user when values are not sane */
3154   if (config_flush_interval < 2 * config_write_interval)
3155     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3156             " 2x write interval (-w) !\n");
3157   if (config_write_jitter > config_write_interval)
3158     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3159             " write interval (-w) !\n");
3161   if (config_write_base_only && config_base_dir == NULL)
3162     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3163             "  Consult the rrdcached documentation\n");
3165   if (journal_dir == NULL)
3166     config_flush_at_shutdown = 1;
3168   return (status);
3169 } /* }}} int read_options */
3171 int main (int argc, char **argv)
3173   int status;
3175   status = read_options (argc, argv);
3176   if (status != 0)
3177   {
3178     if (status < 0)
3179       status = 0;
3180     return (status);
3181   }
3183   status = daemonize ();
3184   if (status != 0)
3185   {
3186     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3187     return (1);
3188   }
3190   journal_init();
3192   /* start the queue threads */
3193   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3194   if (queue_threads == NULL)
3195   {
3196     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3197     cleanup();
3198     return (1);
3199   }
3200   for (int i = 0; i < config_queue_threads; i++)
3201   {
3202     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3203     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3204     if (status != 0)
3205     {
3206       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3207       cleanup();
3208       return (1);
3209     }
3210   }
3212   /* start the flush thread */
3213   memset(&flush_thread, 0, sizeof(flush_thread));
3214   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3215   if (status != 0)
3216   {
3217     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3218     cleanup();
3219     return (1);
3220   }
3222   listen_thread_main (NULL);
3223   cleanup ();
3225   return (0);
3226 } /* int main */
3228 /*
3229  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3230  */