Code

Imported upstream version 1.4.8
[pkg-rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008,2009 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #include "rrd_tool.h"
67 #include "rrd_client.h"
68 #include "unused.h"
70 #include <stdlib.h>
72 #ifndef WIN32
73 #ifdef HAVE_STDINT_H
74 #  include <stdint.h>
75 #endif
76 #include <unistd.h>
77 #include <strings.h>
78 #include <inttypes.h>
79 #include <sys/socket.h>
81 #else
83 #endif
84 #include <stdio.h>
85 #include <string.h>
87 #include <sys/types.h>
88 #include <sys/stat.h>
89 #include <dirent.h>
90 #include <fcntl.h>
91 #include <signal.h>
92 #include <sys/un.h>
93 #include <netdb.h>
94 #include <poll.h>
95 #include <syslog.h>
96 #include <pthread.h>
97 #include <errno.h>
98 #include <assert.h>
99 #include <sys/time.h>
100 #include <time.h>
101 #include <libgen.h>
102 #include <grp.h>
104 #ifdef HAVE_LIBWRAP
105 #include <tcpd.h>
106 #endif /* HAVE_LIBWRAP */
108 #include <glib.h>
109 /* }}} */
111 #define RRDD_LOG(severity, ...) \
112   do { \
113     if (stay_foreground) { \
114       fprintf(stderr, __VA_ARGS__); \
115       fprintf(stderr, "\n"); } \
116     syslog ((severity), __VA_ARGS__); \
117   } while (0)
119 /*
120  * Types
121  */
122 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
124 struct listen_socket_s
126   int fd;
127   char addr[PATH_MAX + 1];
128   int family;
130   /* state for BATCH processing */
131   time_t batch_start;
132   int batch_cmd;
134   /* buffered IO */
135   char *rbuf;
136   off_t next_cmd;
137   off_t next_read;
139   char *wbuf;
140   ssize_t wbuf_len;
142   uint32_t permissions;
144   gid_t  socket_group;
145   mode_t socket_permissions;
146 };
147 typedef struct listen_socket_s listen_socket_t;
149 struct command_s;
150 typedef struct command_s command_t;
151 /* note: guard against "unused" warnings in the handlers */
152 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
153                         time_t UNUSED(now),\
154                         char  UNUSED(*buffer),\
155                         size_t UNUSED(buffer_size)
157 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
158                         DISPATCH_PROTO
160 struct command_s {
161   char   *cmd;
162   int (*handler)(HANDLER_PROTO);
164   char  context;                /* where we expect to see it */
165 #define CMD_CONTEXT_CLIENT      (1<<0)
166 #define CMD_CONTEXT_BATCH       (1<<1)
167 #define CMD_CONTEXT_JOURNAL     (1<<2)
168 #define CMD_CONTEXT_ANY         (0x7f)
170   char *syntax;
171   char *help;
172 };
174 struct cache_item_s;
175 typedef struct cache_item_s cache_item_t;
176 struct cache_item_s
178   char *file;
179   char **values;
180   size_t values_num;
181   time_t last_flush_time;
182   double last_update_stamp;
183 #define CI_FLAGS_IN_TREE  (1<<0)
184 #define CI_FLAGS_IN_QUEUE (1<<1)
185   int flags;
186   pthread_cond_t  flushed;
187   cache_item_t *prev;
188   cache_item_t *next;
189 };
191 struct callback_flush_data_s
193   time_t now;
194   time_t abs_timeout;
195   char **keys;
196   size_t keys_num;
197 };
198 typedef struct callback_flush_data_s callback_flush_data_t;
200 enum queue_side_e
202   HEAD,
203   TAIL
204 };
205 typedef enum queue_side_e queue_side_t;
207 /* describe a set of journal files */
208 typedef struct {
209   char **files;
210   size_t files_num;
211 } journal_set;
213 /* max length of socket command or response */
214 #define CMD_MAX 4096
215 #define RBUF_SIZE (CMD_MAX*2)
217 /*
218  * Variables
219  */
220 static int stay_foreground = 0;
221 static uid_t daemon_uid;
223 static listen_socket_t *listen_fds = NULL;
224 static size_t listen_fds_num = 0;
226 static listen_socket_t default_socket;
228 enum {
229   RUNNING,              /* normal operation */
230   FLUSHING,             /* flushing remaining values */
231   SHUTDOWN              /* shutting down */
232 } state = RUNNING;
234 static pthread_t *queue_threads;
235 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
236 static int config_queue_threads = 4;
238 static pthread_t flush_thread;
239 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
241 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
242 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
243 static int connection_threads_num = 0;
245 /* Cache stuff */
246 static GTree          *cache_tree = NULL;
247 static cache_item_t   *cache_queue_head = NULL;
248 static cache_item_t   *cache_queue_tail = NULL;
249 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
251 static int config_write_interval = 300;
252 static int config_write_jitter   = 0;
253 static int config_flush_interval = 3600;
254 static int config_flush_at_shutdown = 0;
255 static char *config_pid_file = NULL;
256 static char *config_base_dir = NULL;
257 static size_t _config_base_dir_len = 0;
258 static int config_write_base_only = 0;
260 static listen_socket_t **config_listen_address_list = NULL;
261 static size_t config_listen_address_list_len = 0;
263 static uint64_t stats_queue_length = 0;
264 static uint64_t stats_updates_received = 0;
265 static uint64_t stats_flush_received = 0;
266 static uint64_t stats_updates_written = 0;
267 static uint64_t stats_data_sets_written = 0;
268 static uint64_t stats_journal_bytes = 0;
269 static uint64_t stats_journal_rotate = 0;
270 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
272 /* Journaled updates */
273 #define JOURNAL_REPLAY(s) ((s) == NULL)
274 #define JOURNAL_BASE "rrd.journal"
275 static journal_set *journal_cur = NULL;
276 static journal_set *journal_old = NULL;
277 static char *journal_dir = NULL;
278 static FILE *journal_fh = NULL;         /* current journal file handle */
279 static long  journal_size = 0;          /* current journal size */
280 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
281 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
282 static int journal_write(char *cmd, char *args);
283 static void journal_done(void);
284 static void journal_rotate(void);
286 /* prototypes for forward refernces */
287 static int handle_request_help (HANDLER_PROTO);
289 /* 
290  * Functions
291  */
292 static void sig_common (const char *sig) /* {{{ */
294   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
295   if (state == RUNNING) {
296       state = FLUSHING;
297   }
298   pthread_cond_broadcast(&flush_cond);
299   pthread_cond_broadcast(&queue_cond);
300 } /* }}} void sig_common */
302 static void sig_int_handler (int UNUSED(s)) /* {{{ */
304   sig_common("INT");
305 } /* }}} void sig_int_handler */
307 static void sig_term_handler (int UNUSED(s)) /* {{{ */
309   sig_common("TERM");
310 } /* }}} void sig_term_handler */
312 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
314   config_flush_at_shutdown = 1;
315   sig_common("USR1");
316 } /* }}} void sig_usr1_handler */
318 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
320   config_flush_at_shutdown = 0;
321   sig_common("USR2");
322 } /* }}} void sig_usr2_handler */
324 static void install_signal_handlers(void) /* {{{ */
326   /* These structures are static, because `sigaction' behaves weird if the are
327    * overwritten.. */
328   static struct sigaction sa_int;
329   static struct sigaction sa_term;
330   static struct sigaction sa_pipe;
331   static struct sigaction sa_usr1;
332   static struct sigaction sa_usr2;
334   /* Install signal handlers */
335   memset (&sa_int, 0, sizeof (sa_int));
336   sa_int.sa_handler = sig_int_handler;
337   sigaction (SIGINT, &sa_int, NULL);
339   memset (&sa_term, 0, sizeof (sa_term));
340   sa_term.sa_handler = sig_term_handler;
341   sigaction (SIGTERM, &sa_term, NULL);
343   memset (&sa_pipe, 0, sizeof (sa_pipe));
344   sa_pipe.sa_handler = SIG_IGN;
345   sigaction (SIGPIPE, &sa_pipe, NULL);
347   memset (&sa_pipe, 0, sizeof (sa_usr1));
348   sa_usr1.sa_handler = sig_usr1_handler;
349   sigaction (SIGUSR1, &sa_usr1, NULL);
351   memset (&sa_usr2, 0, sizeof (sa_usr2));
352   sa_usr2.sa_handler = sig_usr2_handler;
353   sigaction (SIGUSR2, &sa_usr2, NULL);
355 } /* }}} void install_signal_handlers */
357 static int open_pidfile(char *action, int oflag) /* {{{ */
359   int fd;
360   const char *file;
361   char *file_copy, *dir;
363   file = (config_pid_file != NULL)
364     ? config_pid_file
365     : LOCALSTATEDIR "/run/rrdcached.pid";
367   /* dirname may modify its argument */
368   file_copy = strdup(file);
369   if (file_copy == NULL)
370   {
371     fprintf(stderr, "rrdcached: strdup(): %s\n",
372         rrd_strerror(errno));
373     return -1;
374   }
376   dir = dirname(file_copy);
377   if (rrd_mkdir_p(dir, 0777) != 0)
378   {
379     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
380         dir, rrd_strerror(errno));
381     return -1;
382   }
384   free(file_copy);
386   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
387   if (fd < 0)
388     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
389             action, file, rrd_strerror(errno));
391   return(fd);
392 } /* }}} static int open_pidfile */
394 /* check existing pid file to see whether a daemon is running */
395 static int check_pidfile(void)
397   int pid_fd;
398   pid_t pid;
399   char pid_str[16];
401   pid_fd = open_pidfile("open", O_RDWR);
402   if (pid_fd < 0)
403     return pid_fd;
405   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
406     return -1;
408   pid = atoi(pid_str);
409   if (pid <= 0)
410     return -1;
412   /* another running process that we can signal COULD be
413    * a competing rrdcached */
414   if (pid != getpid() && kill(pid, 0) == 0)
415   {
416     fprintf(stderr,
417             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
418     close(pid_fd);
419     return -1;
420   }
422   lseek(pid_fd, 0, SEEK_SET);
423   if (ftruncate(pid_fd, 0) == -1)
424   {
425     fprintf(stderr,
426             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
427     close(pid_fd);
428     return -1;
429   }
431   fprintf(stderr,
432           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
433           "rrdcached: starting normally.\n", pid);
435   return pid_fd;
436 } /* }}} static int check_pidfile */
438 static int write_pidfile (int fd) /* {{{ */
440   pid_t pid;
441   FILE *fh;
443   pid = getpid ();
445   fh = fdopen (fd, "w");
446   if (fh == NULL)
447   {
448     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
449     close(fd);
450     return (-1);
451   }
453   fprintf (fh, "%i\n", (int) pid);
454   fclose (fh);
456   return (0);
457 } /* }}} int write_pidfile */
459 static int remove_pidfile (void) /* {{{ */
461   char *file;
462   int status;
464   file = (config_pid_file != NULL)
465     ? config_pid_file
466     : LOCALSTATEDIR "/run/rrdcached.pid";
468   status = unlink (file);
469   if (status == 0)
470     return (0);
471   return (errno);
472 } /* }}} int remove_pidfile */
474 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
476   char *eol;
478   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
479                sock->next_read - sock->next_cmd);
481   if (eol == NULL)
482   {
483     /* no commands left, move remainder back to front of rbuf */
484     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
485             sock->next_read - sock->next_cmd);
486     sock->next_read -= sock->next_cmd;
487     sock->next_cmd = 0;
488     *len = 0;
489     return NULL;
490   }
491   else
492   {
493     char *cmd = sock->rbuf + sock->next_cmd;
494     *eol = '\0';
496     sock->next_cmd = eol - sock->rbuf + 1;
498     if (eol > sock->rbuf && *(eol-1) == '\r')
499       *(--eol) = '\0'; /* handle "\r\n" EOL */
501     *len = eol - cmd;
503     return cmd;
504   }
506   /* NOTREACHED */
507   assert(1==0);
508 } /* }}} char *next_cmd */
510 /* add the characters directly to the write buffer */
511 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
513   char *new_buf;
515   assert(sock != NULL);
517   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
518   if (new_buf == NULL)
519   {
520     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
521     return -1;
522   }
524   strncpy(new_buf + sock->wbuf_len, str, len + 1);
526   sock->wbuf = new_buf;
527   sock->wbuf_len += len;
529   return 0;
530 } /* }}} static int add_to_wbuf */
532 /* add the text to the "extra" info that's sent after the status line */
533 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
535   va_list argp;
536   char buffer[CMD_MAX];
537   int len;
539   if (JOURNAL_REPLAY(sock)) return 0;
540   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
542   va_start(argp, fmt);
543 #ifdef HAVE_VSNPRINTF
544   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
545 #else
546   len = vsprintf(buffer, fmt, argp);
547 #endif
548   va_end(argp);
549   if (len < 0)
550   {
551     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
552     return -1;
553   }
555   return add_to_wbuf(sock, buffer, len);
556 } /* }}} static int add_response_info */
558 static int count_lines(char *str) /* {{{ */
560   int lines = 0;
562   if (str != NULL)
563   {
564     while ((str = strchr(str, '\n')) != NULL)
565     {
566       ++lines;
567       ++str;
568     }
569   }
571   return lines;
572 } /* }}} static int count_lines */
574 /* send the response back to the user.
575  * returns 0 on success, -1 on error
576  * write buffer is always zeroed after this call */
577 static int send_response (listen_socket_t *sock, response_code rc,
578                           char *fmt, ...) /* {{{ */
580   va_list argp;
581   char buffer[CMD_MAX];
582   int lines;
583   ssize_t wrote;
584   int rclen, len;
586   if (JOURNAL_REPLAY(sock)) return rc;
588   if (sock->batch_start)
589   {
590     if (rc == RESP_OK)
591       return rc; /* no response on success during BATCH */
592     lines = sock->batch_cmd;
593   }
594   else if (rc == RESP_OK)
595     lines = count_lines(sock->wbuf);
596   else
597     lines = -1;
599   rclen = sprintf(buffer, "%d ", lines);
600   va_start(argp, fmt);
601 #ifdef HAVE_VSNPRINTF
602   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
603 #else
604   len = vsprintf(buffer+rclen, fmt, argp);
605 #endif
606   va_end(argp);
607   if (len < 0)
608     return -1;
610   len += rclen;
612   /* append the result to the wbuf, don't write to the user */
613   if (sock->batch_start)
614     return add_to_wbuf(sock, buffer, len);
616   /* first write must be complete */
617   if (len != write(sock->fd, buffer, len))
618   {
619     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
620     return -1;
621   }
623   if (sock->wbuf != NULL && rc == RESP_OK)
624   {
625     wrote = 0;
626     while (wrote < sock->wbuf_len)
627     {
628       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
629       if (wb <= 0)
630       {
631         RRDD_LOG(LOG_INFO, "send_response: could not write results");
632         return -1;
633       }
634       wrote += wb;
635     }
636   }
638   free(sock->wbuf); sock->wbuf = NULL;
639   sock->wbuf_len = 0;
641   return 0;
642 } /* }}} */
644 static void wipe_ci_values(cache_item_t *ci, time_t when)
646   ci->values = NULL;
647   ci->values_num = 0;
649   ci->last_flush_time = when;
650   if (config_write_jitter > 0)
651     ci->last_flush_time += (rrd_random() % config_write_jitter);
654 /* remove_from_queue
655  * remove a "cache_item_t" item from the queue.
656  * must hold 'cache_lock' when calling this
657  */
658 static void remove_from_queue(cache_item_t *ci) /* {{{ */
660   if (ci == NULL) return;
661   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
663   if (ci->prev == NULL)
664     cache_queue_head = ci->next; /* reset head */
665   else
666     ci->prev->next = ci->next;
668   if (ci->next == NULL)
669     cache_queue_tail = ci->prev; /* reset the tail */
670   else
671     ci->next->prev = ci->prev;
673   ci->next = ci->prev = NULL;
674   ci->flags &= ~CI_FLAGS_IN_QUEUE;
676   pthread_mutex_lock (&stats_lock);
677   assert (stats_queue_length > 0);
678   stats_queue_length--;
679   pthread_mutex_unlock (&stats_lock);
681 } /* }}} static void remove_from_queue */
683 /* free the resources associated with the cache_item_t
684  * must hold cache_lock when calling this function
685  */
686 static void *free_cache_item(cache_item_t *ci) /* {{{ */
688   if (ci == NULL) return NULL;
690   remove_from_queue(ci);
692   for (size_t i=0; i < ci->values_num; i++)
693     free(ci->values[i]);
695   free (ci->values);
696   free (ci->file);
698   /* in case anyone is waiting */
699   pthread_cond_broadcast(&ci->flushed);
700   pthread_cond_destroy(&ci->flushed);
702   free (ci);
704   return NULL;
705 } /* }}} static void *free_cache_item */
707 /*
708  * enqueue_cache_item:
709  * `cache_lock' must be acquired before calling this function!
710  */
711 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
712     queue_side_t side)
714   if (ci == NULL)
715     return (-1);
717   if (ci->values_num == 0)
718     return (0);
720   if (side == HEAD)
721   {
722     if (cache_queue_head == ci)
723       return 0;
725     /* remove if further down in queue */
726     remove_from_queue(ci);
728     ci->prev = NULL;
729     ci->next = cache_queue_head;
730     if (ci->next != NULL)
731       ci->next->prev = ci;
732     cache_queue_head = ci;
734     if (cache_queue_tail == NULL)
735       cache_queue_tail = cache_queue_head;
736   }
737   else /* (side == TAIL) */
738   {
739     /* We don't move values back in the list.. */
740     if (ci->flags & CI_FLAGS_IN_QUEUE)
741       return (0);
743     assert (ci->next == NULL);
744     assert (ci->prev == NULL);
746     ci->prev = cache_queue_tail;
748     if (cache_queue_tail == NULL)
749       cache_queue_head = ci;
750     else
751       cache_queue_tail->next = ci;
753     cache_queue_tail = ci;
754   }
756   ci->flags |= CI_FLAGS_IN_QUEUE;
758   pthread_cond_signal(&queue_cond);
759   pthread_mutex_lock (&stats_lock);
760   stats_queue_length++;
761   pthread_mutex_unlock (&stats_lock);
763   return (0);
764 } /* }}} int enqueue_cache_item */
766 /*
767  * tree_callback_flush:
768  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
769  * while this is in progress.
770  */
771 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
772     gpointer data)
774   cache_item_t *ci;
775   callback_flush_data_t *cfd;
777   ci = (cache_item_t *) value;
778   cfd = (callback_flush_data_t *) data;
780   if (ci->flags & CI_FLAGS_IN_QUEUE)
781     return FALSE;
783   if (ci->values_num > 0
784       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
785   {
786     enqueue_cache_item (ci, TAIL);
787   }
788   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
789       && (ci->values_num <= 0))
790   {
791     assert ((char *) key == ci->file);
792     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
793     {
794       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
795       return (FALSE);
796     }
797   }
799   return (FALSE);
800 } /* }}} gboolean tree_callback_flush */
802 static int flush_old_values (int max_age)
804   callback_flush_data_t cfd;
805   size_t k;
807   memset (&cfd, 0, sizeof (cfd));
808   /* Pass the current time as user data so that we don't need to call
809    * `time' for each node. */
810   cfd.now = time (NULL);
811   cfd.keys = NULL;
812   cfd.keys_num = 0;
814   if (max_age > 0)
815     cfd.abs_timeout = cfd.now - max_age;
816   else
817     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
819   /* `tree_callback_flush' will return the keys of all values that haven't
820    * been touched in the last `config_flush_interval' seconds in `cfd'.
821    * The char*'s in this array point to the same memory as ci->file, so we
822    * don't need to free them separately. */
823   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
825   for (k = 0; k < cfd.keys_num; k++)
826   {
827     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
828     /* should never fail, since we have held the cache_lock
829      * the entire time */
830     assert(status == TRUE);
831   }
833   if (cfd.keys != NULL)
834   {
835     free (cfd.keys);
836     cfd.keys = NULL;
837   }
839   return (0);
840 } /* int flush_old_values */
842 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
844   struct timeval now;
845   struct timespec next_flush;
846   int status;
848   gettimeofday (&now, NULL);
849   next_flush.tv_sec = now.tv_sec + config_flush_interval;
850   next_flush.tv_nsec = 1000 * now.tv_usec;
852   pthread_mutex_lock(&cache_lock);
854   while (state == RUNNING)
855   {
856     gettimeofday (&now, NULL);
857     if ((now.tv_sec > next_flush.tv_sec)
858         || ((now.tv_sec == next_flush.tv_sec)
859           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
860     {
861       RRDD_LOG(LOG_DEBUG, "flushing old values");
863       /* Determine the time of the next cache flush. */
864       next_flush.tv_sec = now.tv_sec + config_flush_interval;
866       /* Flush all values that haven't been written in the last
867        * `config_write_interval' seconds. */
868       flush_old_values (config_write_interval);
870       /* unlock the cache while we rotate so we don't block incoming
871        * updates if the fsync() blocks on disk I/O */
872       pthread_mutex_unlock(&cache_lock);
873       journal_rotate();
874       pthread_mutex_lock(&cache_lock);
875     }
877     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
878     if (status != 0 && status != ETIMEDOUT)
879     {
880       RRDD_LOG (LOG_ERR, "flush_thread_main: "
881                 "pthread_cond_timedwait returned %i.", status);
882     }
883   }
885   if (config_flush_at_shutdown)
886     flush_old_values (-1); /* flush everything */
888   state = SHUTDOWN;
890   pthread_mutex_unlock(&cache_lock);
892   return NULL;
893 } /* void *flush_thread_main */
895 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
897   pthread_mutex_lock (&cache_lock);
899   while (state != SHUTDOWN
900          || (cache_queue_head != NULL && config_flush_at_shutdown))
901   {
902     cache_item_t *ci;
903     char *file;
904     char **values;
905     size_t values_num;
906     int status;
908     /* Now, check if there's something to store away. If not, wait until
909      * something comes in. */
910     if (cache_queue_head == NULL)
911     {
912       status = pthread_cond_wait (&queue_cond, &cache_lock);
913       if ((status != 0) && (status != ETIMEDOUT))
914       {
915         RRDD_LOG (LOG_ERR, "queue_thread_main: "
916             "pthread_cond_wait returned %i.", status);
917       }
918     }
920     /* Check if a value has arrived. This may be NULL if we timed out or there
921      * was an interrupt such as a signal. */
922     if (cache_queue_head == NULL)
923       continue;
925     ci = cache_queue_head;
927     /* copy the relevant parts */
928     file = strdup (ci->file);
929     if (file == NULL)
930     {
931       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
932       continue;
933     }
935     assert(ci->values != NULL);
936     assert(ci->values_num > 0);
938     values = ci->values;
939     values_num = ci->values_num;
941     wipe_ci_values(ci, time(NULL));
942     remove_from_queue(ci);
944     pthread_mutex_unlock (&cache_lock);
946     rrd_clear_error ();
947     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
948     if (status != 0)
949     {
950       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
951           "rrd_update_r (%s) failed with status %i. (%s)",
952           file, status, rrd_get_error());
953     }
955     journal_write("wrote", file);
957     /* Search again in the tree.  It's possible someone issued a "FORGET"
958      * while we were writing the update values. */
959     pthread_mutex_lock(&cache_lock);
960     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
961     if (ci)
962       pthread_cond_broadcast(&ci->flushed);
963     pthread_mutex_unlock(&cache_lock);
965     if (status == 0)
966     {
967       pthread_mutex_lock (&stats_lock);
968       stats_updates_written++;
969       stats_data_sets_written += values_num;
970       pthread_mutex_unlock (&stats_lock);
971     }
973     rrd_free_ptrs((void ***) &values, &values_num);
974     free(file);
976     pthread_mutex_lock (&cache_lock);
977   }
978   pthread_mutex_unlock (&cache_lock);
980   return (NULL);
981 } /* }}} void *queue_thread_main */
983 static int buffer_get_field (char **buffer_ret, /* {{{ */
984     size_t *buffer_size_ret, char **field_ret)
986   char *buffer;
987   size_t buffer_pos;
988   size_t buffer_size;
989   char *field;
990   size_t field_size;
991   int status;
993   buffer = *buffer_ret;
994   buffer_pos = 0;
995   buffer_size = *buffer_size_ret;
996   field = *buffer_ret;
997   field_size = 0;
999   if (buffer_size <= 0)
1000     return (-1);
1002   /* This is ensured by `handle_request'. */
1003   assert (buffer[buffer_size - 1] == '\0');
1005   status = -1;
1006   while (buffer_pos < buffer_size)
1007   {
1008     /* Check for end-of-field or end-of-buffer */
1009     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1010     {
1011       field[field_size] = 0;
1012       field_size++;
1013       buffer_pos++;
1014       status = 0;
1015       break;
1016     }
1017     /* Handle escaped characters. */
1018     else if (buffer[buffer_pos] == '\\')
1019     {
1020       if (buffer_pos >= (buffer_size - 1))
1021         break;
1022       buffer_pos++;
1023       field[field_size] = buffer[buffer_pos];
1024       field_size++;
1025       buffer_pos++;
1026     }
1027     /* Normal operation */ 
1028     else
1029     {
1030       field[field_size] = buffer[buffer_pos];
1031       field_size++;
1032       buffer_pos++;
1033     }
1034   } /* while (buffer_pos < buffer_size) */
1036   if (status != 0)
1037     return (status);
1039   *buffer_ret = buffer + buffer_pos;
1040   *buffer_size_ret = buffer_size - buffer_pos;
1041   *field_ret = field;
1043   return (0);
1044 } /* }}} int buffer_get_field */
1046 /* if we're restricting writes to the base directory,
1047  * check whether the file falls within the dir
1048  * returns 1 if OK, otherwise 0
1049  */
1050 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1052   assert(file != NULL);
1054   if (!config_write_base_only
1055       || JOURNAL_REPLAY(sock)
1056       || config_base_dir == NULL)
1057     return 1;
1059   if (strstr(file, "../") != NULL) goto err;
1061   /* relative paths without "../" are ok */
1062   if (*file != '/') return 1;
1064   /* file must be of the format base + "/" + <1+ char filename> */
1065   if (strlen(file) < _config_base_dir_len + 2) goto err;
1066   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1067   if (*(file + _config_base_dir_len) != '/') goto err;
1069   return 1;
1071 err:
1072   if (sock != NULL && sock->fd >= 0)
1073     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1075   return 0;
1076 } /* }}} static int check_file_access */
1078 /* when using a base dir, convert relative paths to absolute paths.
1079  * if necessary, modifies the "filename" pointer to point
1080  * to the new path created in "tmp".  "tmp" is provided
1081  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1082  *
1083  * this allows us to optimize for the expected case (absolute path)
1084  * with a no-op.
1085  */
1086 static void get_abs_path(char **filename, char *tmp)
1088   assert(tmp != NULL);
1089   assert(filename != NULL && *filename != NULL);
1091   if (config_base_dir == NULL || **filename == '/')
1092     return;
1094   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1095   *filename = tmp;
1096 } /* }}} static int get_abs_path */
1098 static int flush_file (const char *filename) /* {{{ */
1100   cache_item_t *ci;
1102   pthread_mutex_lock (&cache_lock);
1104   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1105   if (ci == NULL)
1106   {
1107     pthread_mutex_unlock (&cache_lock);
1108     return (ENOENT);
1109   }
1111   if (ci->values_num > 0)
1112   {
1113     /* Enqueue at head */
1114     enqueue_cache_item (ci, HEAD);
1115     pthread_cond_wait(&ci->flushed, &cache_lock);
1116   }
1118   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1119    * may have been purged during our cond_wait() */
1121   pthread_mutex_unlock(&cache_lock);
1123   return (0);
1124 } /* }}} int flush_file */
1126 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1128   char *err = "Syntax error.\n";
1130   if (cmd && cmd->syntax)
1131     err = cmd->syntax;
1133   return send_response(sock, RESP_ERR, "Usage: %s", err);
1134 } /* }}} static int syntax_error() */
1136 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1138   uint64_t copy_queue_length;
1139   uint64_t copy_updates_received;
1140   uint64_t copy_flush_received;
1141   uint64_t copy_updates_written;
1142   uint64_t copy_data_sets_written;
1143   uint64_t copy_journal_bytes;
1144   uint64_t copy_journal_rotate;
1146   uint64_t tree_nodes_number;
1147   uint64_t tree_depth;
1149   pthread_mutex_lock (&stats_lock);
1150   copy_queue_length       = stats_queue_length;
1151   copy_updates_received   = stats_updates_received;
1152   copy_flush_received     = stats_flush_received;
1153   copy_updates_written    = stats_updates_written;
1154   copy_data_sets_written  = stats_data_sets_written;
1155   copy_journal_bytes      = stats_journal_bytes;
1156   copy_journal_rotate     = stats_journal_rotate;
1157   pthread_mutex_unlock (&stats_lock);
1159   pthread_mutex_lock (&cache_lock);
1160   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1161   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1162   pthread_mutex_unlock (&cache_lock);
1164   add_response_info(sock,
1165                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1166   add_response_info(sock,
1167                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1168   add_response_info(sock,
1169                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1170   add_response_info(sock,
1171                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1172   add_response_info(sock,
1173                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1174   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1175   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1176   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1177   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1179   send_response(sock, RESP_OK, "Statistics follow\n");
1181   return (0);
1182 } /* }}} int handle_request_stats */
1184 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1186   char *file, file_tmp[PATH_MAX];
1187   int status;
1189   status = buffer_get_field (&buffer, &buffer_size, &file);
1190   if (status != 0)
1191   {
1192     return syntax_error(sock,cmd);
1193   }
1194   else
1195   {
1196     pthread_mutex_lock(&stats_lock);
1197     stats_flush_received++;
1198     pthread_mutex_unlock(&stats_lock);
1200     get_abs_path(&file, file_tmp);
1201     if (!check_file_access(file, sock)) return 0;
1203     status = flush_file (file);
1204     if (status == 0)
1205       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1206     else if (status == ENOENT)
1207     {
1208       /* no file in our tree; see whether it exists at all */
1209       struct stat statbuf;
1211       memset(&statbuf, 0, sizeof(statbuf));
1212       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1213         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1214       else
1215         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1216     }
1217     else if (status < 0)
1218       return send_response(sock, RESP_ERR, "Internal error.\n");
1219     else
1220       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1221   }
1223   /* NOTREACHED */
1224   assert(1==0);
1225 } /* }}} int handle_request_flush */
1227 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1229   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1231   pthread_mutex_lock(&cache_lock);
1232   flush_old_values(-1);
1233   pthread_mutex_unlock(&cache_lock);
1235   return send_response(sock, RESP_OK, "Started flush.\n");
1236 } /* }}} static int handle_request_flushall */
1238 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1240   int status;
1241   char *file, file_tmp[PATH_MAX];
1242   cache_item_t *ci;
1244   status = buffer_get_field(&buffer, &buffer_size, &file);
1245   if (status != 0)
1246     return syntax_error(sock,cmd);
1248   get_abs_path(&file, file_tmp);
1250   pthread_mutex_lock(&cache_lock);
1251   ci = g_tree_lookup(cache_tree, file);
1252   if (ci == NULL)
1253   {
1254     pthread_mutex_unlock(&cache_lock);
1255     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1256   }
1258   for (size_t i=0; i < ci->values_num; i++)
1259     add_response_info(sock, "%s\n", ci->values[i]);
1261   pthread_mutex_unlock(&cache_lock);
1262   return send_response(sock, RESP_OK, "updates pending\n");
1263 } /* }}} static int handle_request_pending */
1265 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1267   int status;
1268   gboolean found;
1269   char *file, file_tmp[PATH_MAX];
1271   status = buffer_get_field(&buffer, &buffer_size, &file);
1272   if (status != 0)
1273     return syntax_error(sock,cmd);
1275   get_abs_path(&file, file_tmp);
1276   if (!check_file_access(file, sock)) return 0;
1278   pthread_mutex_lock(&cache_lock);
1279   found = g_tree_remove(cache_tree, file);
1280   pthread_mutex_unlock(&cache_lock);
1282   if (found == TRUE)
1283   {
1284     if (!JOURNAL_REPLAY(sock))
1285       journal_write("forget", file);
1287     return send_response(sock, RESP_OK, "Gone!\n");
1288   }
1289   else
1290     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1292   /* NOTREACHED */
1293   assert(1==0);
1294 } /* }}} static int handle_request_forget */
1296 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1298   cache_item_t *ci;
1300   pthread_mutex_lock(&cache_lock);
1302   ci = cache_queue_head;
1303   while (ci != NULL)
1304   {
1305     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1306     ci = ci->next;
1307   }
1309   pthread_mutex_unlock(&cache_lock);
1311   return send_response(sock, RESP_OK, "in queue.\n");
1312 } /* }}} int handle_request_queue */
1314 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1316   char *file, file_tmp[PATH_MAX];
1317   int values_num = 0;
1318   int status;
1319   char orig_buf[CMD_MAX];
1321   cache_item_t *ci;
1323   /* save it for the journal later */
1324   if (!JOURNAL_REPLAY(sock))
1325     strncpy(orig_buf, buffer, min(CMD_MAX,buffer_size));
1327   status = buffer_get_field (&buffer, &buffer_size, &file);
1328   if (status != 0)
1329     return syntax_error(sock,cmd);
1331   pthread_mutex_lock(&stats_lock);
1332   stats_updates_received++;
1333   pthread_mutex_unlock(&stats_lock);
1335   get_abs_path(&file, file_tmp);
1336   if (!check_file_access(file, sock)) return 0;
1338   pthread_mutex_lock (&cache_lock);
1339   ci = g_tree_lookup (cache_tree, file);
1341   if (ci == NULL) /* {{{ */
1342   {
1343     struct stat statbuf;
1344     cache_item_t *tmp;
1346     /* don't hold the lock while we setup; stat(2) might block */
1347     pthread_mutex_unlock(&cache_lock);
1349     memset (&statbuf, 0, sizeof (statbuf));
1350     status = stat (file, &statbuf);
1351     if (status != 0)
1352     {
1353       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1355       status = errno;
1356       if (status == ENOENT)
1357         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1358       else
1359         return send_response(sock, RESP_ERR,
1360                              "stat failed with error %i.\n", status);
1361     }
1362     if (!S_ISREG (statbuf.st_mode))
1363       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1365     if (access(file, R_OK|W_OK) != 0)
1366       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1367                            file, rrd_strerror(errno));
1369     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1370     if (ci == NULL)
1371     {
1372       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1374       return send_response(sock, RESP_ERR, "malloc failed.\n");
1375     }
1376     memset (ci, 0, sizeof (cache_item_t));
1378     ci->file = strdup (file);
1379     if (ci->file == NULL)
1380     {
1381       free (ci);
1382       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1384       return send_response(sock, RESP_ERR, "strdup failed.\n");
1385     }
1387     wipe_ci_values(ci, now);
1388     ci->flags = CI_FLAGS_IN_TREE;
1389     pthread_cond_init(&ci->flushed, NULL);
1391     pthread_mutex_lock(&cache_lock);
1393     /* another UPDATE might have added this entry in the meantime */
1394     tmp = g_tree_lookup (cache_tree, file);
1395     if (tmp == NULL)
1396       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1397     else
1398     {
1399       free_cache_item (ci);
1400       ci = tmp;
1401     }
1403     /* state may have changed while we were unlocked */
1404     if (state == SHUTDOWN)
1405       return -1;
1406   } /* }}} */
1407   assert (ci != NULL);
1409   /* don't re-write updates in replay mode */
1410   if (!JOURNAL_REPLAY(sock))
1411     journal_write("update", orig_buf);
1413   while (buffer_size > 0)
1414   {
1415     char *value;
1416     double stamp;
1417     char *eostamp;
1419     status = buffer_get_field (&buffer, &buffer_size, &value);
1420     if (status != 0)
1421     {
1422       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1423       break;
1424     }
1426     /* make sure update time is always moving forward. We use double here since
1427        update does support subsecond precision for timestamps ... */
1428     stamp = strtod(value, &eostamp);
1429     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1430     {
1431       pthread_mutex_unlock(&cache_lock);
1432       return send_response(sock, RESP_ERR,
1433                            "Cannot find timestamp in '%s'!\n", value);
1434     }
1435     else if (stamp <= ci->last_update_stamp)
1436     {
1437       pthread_mutex_unlock(&cache_lock);
1438       return send_response(sock, RESP_ERR,
1439                            "illegal attempt to update using time %lf when last"
1440                            " update time is %lf (minimum one second step)\n",
1441                            stamp, ci->last_update_stamp);
1442     }
1443     else
1444       ci->last_update_stamp = stamp;
1446     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1447     {
1448       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1449       continue;
1450     }
1452     values_num++;
1453   }
1455   if (((now - ci->last_flush_time) >= config_write_interval)
1456       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1457       && (ci->values_num > 0))
1458   {
1459     enqueue_cache_item (ci, TAIL);
1460   }
1462   pthread_mutex_unlock (&cache_lock);
1464   if (values_num < 1)
1465     return send_response(sock, RESP_ERR, "No values updated.\n");
1466   else
1467     return send_response(sock, RESP_OK,
1468                          "errors, enqueued %i value(s).\n", values_num);
1470   /* NOTREACHED */
1471   assert(1==0);
1473 } /* }}} int handle_request_update */
1475 /* we came across a "WROTE" entry during journal replay.
1476  * throw away any values that we have accumulated for this file
1477  */
1478 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1480   cache_item_t *ci;
1481   const char *file = buffer;
1483   pthread_mutex_lock(&cache_lock);
1485   ci = g_tree_lookup(cache_tree, file);
1486   if (ci == NULL)
1487   {
1488     pthread_mutex_unlock(&cache_lock);
1489     return (0);
1490   }
1492   if (ci->values)
1493     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1495   wipe_ci_values(ci, now);
1496   remove_from_queue(ci);
1498   pthread_mutex_unlock(&cache_lock);
1499   return (0);
1500 } /* }}} int handle_request_wrote */
1502 /* start "BATCH" processing */
1503 static int batch_start (HANDLER_PROTO) /* {{{ */
1505   int status;
1506   if (sock->batch_start)
1507     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1509   status = send_response(sock, RESP_OK,
1510                          "Go ahead.  End with dot '.' on its own line.\n");
1511   sock->batch_start = time(NULL);
1512   sock->batch_cmd = 0;
1514   return status;
1515 } /* }}} static int batch_start */
1517 /* finish "BATCH" processing and return results to the client */
1518 static int batch_done (HANDLER_PROTO) /* {{{ */
1520   assert(sock->batch_start);
1521   sock->batch_start = 0;
1522   sock->batch_cmd  = 0;
1523   return send_response(sock, RESP_OK, "errors\n");
1524 } /* }}} static int batch_done */
1526 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1528   return -1;
1529 } /* }}} static int handle_request_quit */
1531 static command_t list_of_commands[] = { /* {{{ */
1532   {
1533     "UPDATE",
1534     handle_request_update,
1535     CMD_CONTEXT_ANY,
1536     "UPDATE <filename> <values> [<values> ...]\n"
1537     ,
1538     "Adds the given file to the internal cache if it is not yet known and\n"
1539     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1540     "for details.\n"
1541     "\n"
1542     "Each <values> has the following form:\n"
1543     "  <values> = <time>:<value>[:<value>[...]]\n"
1544     "See the rrdupdate(1) manpage for details.\n"
1545   },
1546   {
1547     "WROTE",
1548     handle_request_wrote,
1549     CMD_CONTEXT_JOURNAL,
1550     NULL,
1551     NULL
1552   },
1553   {
1554     "FLUSH",
1555     handle_request_flush,
1556     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1557     "FLUSH <filename>\n"
1558     ,
1559     "Adds the given filename to the head of the update queue and returns\n"
1560     "after it has been dequeued.\n"
1561   },
1562   {
1563     "FLUSHALL",
1564     handle_request_flushall,
1565     CMD_CONTEXT_CLIENT,
1566     "FLUSHALL\n"
1567     ,
1568     "Triggers writing of all pending updates.  Returns immediately.\n"
1569   },
1570   {
1571     "PENDING",
1572     handle_request_pending,
1573     CMD_CONTEXT_CLIENT,
1574     "PENDING <filename>\n"
1575     ,
1576     "Shows any 'pending' updates for a file, in order.\n"
1577     "The updates shown have not yet been written to the underlying RRD file.\n"
1578   },
1579   {
1580     "FORGET",
1581     handle_request_forget,
1582     CMD_CONTEXT_ANY,
1583     "FORGET <filename>\n"
1584     ,
1585     "Removes the file completely from the cache.\n"
1586     "Any pending updates for the file will be lost.\n"
1587   },
1588   {
1589     "QUEUE",
1590     handle_request_queue,
1591     CMD_CONTEXT_CLIENT,
1592     "QUEUE\n"
1593     ,
1594         "Shows all files in the output queue.\n"
1595     "The output is zero or more lines in the following format:\n"
1596     "(where <num_vals> is the number of values to be written)\n"
1597     "\n"
1598     "<num_vals> <filename>\n"
1599   },
1600   {
1601     "STATS",
1602     handle_request_stats,
1603     CMD_CONTEXT_CLIENT,
1604     "STATS\n"
1605     ,
1606     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1607     "a description of the values.\n"
1608   },
1609   {
1610     "HELP",
1611     handle_request_help,
1612     CMD_CONTEXT_CLIENT,
1613     "HELP [<command>]\n",
1614     NULL, /* special! */
1615   },
1616   {
1617     "BATCH",
1618     batch_start,
1619     CMD_CONTEXT_CLIENT,
1620     "BATCH\n"
1621     ,
1622     "The 'BATCH' command permits the client to initiate a bulk load\n"
1623     "   of commands to rrdcached.\n"
1624     "\n"
1625     "Usage:\n"
1626     "\n"
1627     "    client: BATCH\n"
1628     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1629     "    client: command #1\n"
1630     "    client: command #2\n"
1631     "    client: ... and so on\n"
1632     "    client: .\n"
1633     "    server: 2 errors\n"
1634     "    server: 7 message for command #7\n"
1635     "    server: 9 message for command #9\n"
1636     "\n"
1637     "For more information, consult the rrdcached(1) documentation.\n"
1638   },
1639   {
1640     ".",   /* BATCH terminator */
1641     batch_done,
1642     CMD_CONTEXT_BATCH,
1643     NULL,
1644     NULL
1645   },
1646   {
1647     "QUIT",
1648     handle_request_quit,
1649     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1650     "QUIT\n"
1651     ,
1652     "Disconnect from rrdcached.\n"
1653   }
1654 }; /* }}} command_t list_of_commands[] */
1655 static size_t list_of_commands_len = sizeof (list_of_commands)
1656   / sizeof (list_of_commands[0]);
1658 static command_t *find_command(char *cmd)
1660   size_t i;
1662   for (i = 0; i < list_of_commands_len; i++)
1663     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1664       return (&list_of_commands[i]);
1665   return NULL;
1668 /* We currently use the index in the `list_of_commands' array as a bit position
1669  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1670  * outside these functions so that switching to a more elegant storage method
1671  * is easily possible. */
1672 static ssize_t find_command_index (const char *cmd) /* {{{ */
1674   size_t i;
1676   for (i = 0; i < list_of_commands_len; i++)
1677     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1678       return ((ssize_t) i);
1679   return (-1);
1680 } /* }}} ssize_t find_command_index */
1682 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1683     const char *cmd)
1685   ssize_t i;
1687   if (JOURNAL_REPLAY(sock))
1688     return (1);
1690   if (cmd == NULL)
1691     return (-1);
1693   if ((strcasecmp ("QUIT", cmd) == 0)
1694       || (strcasecmp ("HELP", cmd) == 0))
1695     return (1);
1696   else if (strcmp (".", cmd) == 0)
1697     cmd = "BATCH";
1699   i = find_command_index (cmd);
1700   if (i < 0)
1701     return (-1);
1702   assert (i < 32);
1704   if ((sock->permissions & (1 << i)) != 0)
1705     return (1);
1706   return (0);
1707 } /* }}} int socket_permission_check */
1709 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1710     const char *cmd)
1712   ssize_t i;
1714   i = find_command_index (cmd);
1715   if (i < 0)
1716     return (-1);
1717   assert (i < 32);
1719   sock->permissions |= (1 << i);
1720   return (0);
1721 } /* }}} int socket_permission_add */
1723 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
1725   sock->permissions = 0;
1726 } /* }}} socket_permission_clear */
1728 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
1729     listen_socket_t *src)
1731   dest->permissions = src->permissions;
1732 } /* }}} socket_permission_copy */
1734 static void socket_permission_set_all (listen_socket_t *sock) /* {{{ */
1736   size_t i;
1738   sock->permissions = 0;
1739   for (i = 0; i < list_of_commands_len; i++)
1740     sock->permissions |= (1 << i);
1741 } /* }}} void socket_permission_set_all */
1743 /* check whether commands are received in the expected context */
1744 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1746   if (JOURNAL_REPLAY(sock))
1747     return (cmd->context & CMD_CONTEXT_JOURNAL);
1748   else if (sock->batch_start)
1749     return (cmd->context & CMD_CONTEXT_BATCH);
1750   else
1751     return (cmd->context & CMD_CONTEXT_CLIENT);
1753   /* NOTREACHED */
1754   assert(1==0);
1757 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1759   int status;
1760   char *cmd_str;
1761   char *resp_txt;
1762   command_t *help = NULL;
1764   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1765   if (status == 0)
1766     help = find_command(cmd_str);
1768   if (help && (help->syntax || help->help))
1769   {
1770     char tmp[CMD_MAX];
1772     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1773     resp_txt = tmp;
1775     if (help->syntax)
1776       add_response_info(sock, "Usage: %s\n", help->syntax);
1778     if (help->help)
1779       add_response_info(sock, "%s\n", help->help);
1780   }
1781   else
1782   {
1783     size_t i;
1785     resp_txt = "Command overview\n";
1787     for (i = 0; i < list_of_commands_len; i++)
1788     {
1789       if (list_of_commands[i].syntax == NULL)
1790         continue;
1791       add_response_info (sock, "%s", list_of_commands[i].syntax);
1792     }
1793   }
1795   return send_response(sock, RESP_OK, resp_txt);
1796 } /* }}} int handle_request_help */
1798 static int handle_request (DISPATCH_PROTO) /* {{{ */
1800   char *buffer_ptr = buffer;
1801   char *cmd_str = NULL;
1802   command_t *cmd = NULL;
1803   int status;
1805   assert (buffer[buffer_size - 1] == '\0');
1807   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1808   if (status != 0)
1809   {
1810     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1811     return (-1);
1812   }
1814   if (sock != NULL && sock->batch_start)
1815     sock->batch_cmd++;
1817   cmd = find_command(cmd_str);
1818   if (!cmd)
1819     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1821   if (!socket_permission_check (sock, cmd->cmd))
1822     return send_response(sock, RESP_ERR, "Permission denied.\n");
1824   if (!command_check_context(sock, cmd))
1825     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1827   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1828 } /* }}} int handle_request */
1830 static void journal_set_free (journal_set *js) /* {{{ */
1832   if (js == NULL)
1833     return;
1835   rrd_free_ptrs((void ***) &js->files, &js->files_num);
1837   free(js);
1838 } /* }}} journal_set_free */
1840 static void journal_set_remove (journal_set *js) /* {{{ */
1842   if (js == NULL)
1843     return;
1845   for (uint i=0; i < js->files_num; i++)
1846   {
1847     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1848     unlink(js->files[i]);
1849   }
1850 } /* }}} journal_set_remove */
1852 /* close current journal file handle.
1853  * MUST hold journal_lock before calling */
1854 static void journal_close(void) /* {{{ */
1856   if (journal_fh != NULL)
1857   {
1858     if (fclose(journal_fh) != 0)
1859       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1860   }
1862   journal_fh = NULL;
1863   journal_size = 0;
1864 } /* }}} journal_close */
1866 /* MUST hold journal_lock before calling */
1867 static void journal_new_file(void) /* {{{ */
1869   struct timeval now;
1870   int  new_fd;
1871   char new_file[PATH_MAX + 1];
1873   assert(journal_dir != NULL);
1874   assert(journal_cur != NULL);
1876   journal_close();
1878   gettimeofday(&now, NULL);
1879   /* this format assures that the files sort in strcmp() order */
1880   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1881            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1883   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1884                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1885   if (new_fd < 0)
1886     goto error;
1888   journal_fh = fdopen(new_fd, "a");
1889   if (journal_fh == NULL)
1890     goto error;
1892   journal_size = ftell(journal_fh);
1893   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1895   /* record the file in the journal set */
1896   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1898   return;
1900 error:
1901   RRDD_LOG(LOG_CRIT,
1902            "JOURNALING DISABLED: Error while trying to create %s : %s",
1903            new_file, rrd_strerror(errno));
1904   RRDD_LOG(LOG_CRIT,
1905            "JOURNALING DISABLED: All values will be flushed at shutdown");
1907   close(new_fd);
1908   config_flush_at_shutdown = 1;
1910 } /* }}} journal_new_file */
1912 /* MUST NOT hold journal_lock before calling this */
1913 static void journal_rotate(void) /* {{{ */
1915   journal_set *old_js = NULL;
1917   if (journal_dir == NULL)
1918     return;
1920   RRDD_LOG(LOG_DEBUG, "rotating journals");
1922   pthread_mutex_lock(&stats_lock);
1923   ++stats_journal_rotate;
1924   pthread_mutex_unlock(&stats_lock);
1926   pthread_mutex_lock(&journal_lock);
1928   journal_close();
1930   /* rotate the journal sets */
1931   old_js = journal_old;
1932   journal_old = journal_cur;
1933   journal_cur = calloc(1, sizeof(journal_set));
1935   if (journal_cur != NULL)
1936     journal_new_file();
1937   else
1938     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1940   pthread_mutex_unlock(&journal_lock);
1942   journal_set_remove(old_js);
1943   journal_set_free  (old_js);
1945 } /* }}} static void journal_rotate */
1947 /* MUST hold journal_lock when calling */
1948 static void journal_done(void) /* {{{ */
1950   if (journal_cur == NULL)
1951     return;
1953   journal_close();
1955   if (config_flush_at_shutdown)
1956   {
1957     RRDD_LOG(LOG_INFO, "removing journals");
1958     journal_set_remove(journal_old);
1959     journal_set_remove(journal_cur);
1960   }
1961   else
1962   {
1963     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1964              "journals will be used at next startup");
1965   }
1967   journal_set_free(journal_cur);
1968   journal_set_free(journal_old);
1969   free(journal_dir);
1971 } /* }}} static void journal_done */
1973 static int journal_write(char *cmd, char *args) /* {{{ */
1975   int chars;
1977   if (journal_fh == NULL)
1978     return 0;
1980   pthread_mutex_lock(&journal_lock);
1981   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1982   journal_size += chars;
1984   if (journal_size > JOURNAL_MAX)
1985     journal_new_file();
1987   pthread_mutex_unlock(&journal_lock);
1989   if (chars > 0)
1990   {
1991     pthread_mutex_lock(&stats_lock);
1992     stats_journal_bytes += chars;
1993     pthread_mutex_unlock(&stats_lock);
1994   }
1996   return chars;
1997 } /* }}} static int journal_write */
1999 static int journal_replay (const char *file) /* {{{ */
2001   FILE *fh;
2002   int entry_cnt = 0;
2003   int fail_cnt = 0;
2004   uint64_t line = 0;
2005   char entry[CMD_MAX];
2006   time_t now;
2008   if (file == NULL) return 0;
2010   {
2011     char *reason = "unknown error";
2012     int status = 0;
2013     struct stat statbuf;
2015     memset(&statbuf, 0, sizeof(statbuf));
2016     if (stat(file, &statbuf) != 0)
2017     {
2018       reason = "stat error";
2019       status = errno;
2020     }
2021     else if (!S_ISREG(statbuf.st_mode))
2022     {
2023       reason = "not a regular file";
2024       status = EPERM;
2025     }
2026     if (statbuf.st_uid != daemon_uid)
2027     {
2028       reason = "not owned by daemon user";
2029       status = EACCES;
2030     }
2031     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2032     {
2033       reason = "must not be user/group writable";
2034       status = EACCES;
2035     }
2037     if (status != 0)
2038     {
2039       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2040                file, rrd_strerror(status), reason);
2041       return 0;
2042     }
2043   }
2045   fh = fopen(file, "r");
2046   if (fh == NULL)
2047   {
2048     if (errno != ENOENT)
2049       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2050                file, rrd_strerror(errno));
2051     return 0;
2052   }
2053   else
2054     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2056   now = time(NULL);
2058   while(!feof(fh))
2059   {
2060     size_t entry_len;
2062     ++line;
2063     if (fgets(entry, sizeof(entry), fh) == NULL)
2064       break;
2065     entry_len = strlen(entry);
2067     /* check \n termination in case journal writing crashed mid-line */
2068     if (entry_len == 0)
2069       continue;
2070     else if (entry[entry_len - 1] != '\n')
2071     {
2072       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2073       ++fail_cnt;
2074       continue;
2075     }
2077     entry[entry_len - 1] = '\0';
2079     if (handle_request(NULL, now, entry, entry_len) == 0)
2080       ++entry_cnt;
2081     else
2082       ++fail_cnt;
2083   }
2085   fclose(fh);
2087   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2088            entry_cnt, fail_cnt);
2090   return entry_cnt > 0 ? 1 : 0;
2091 } /* }}} static int journal_replay */
2093 static int journal_sort(const void *v1, const void *v2)
2095   char **jn1 = (char **) v1;
2096   char **jn2 = (char **) v2;
2098   return strcmp(*jn1,*jn2);
2101 static void journal_init(void) /* {{{ */
2103   int had_journal = 0;
2104   DIR *dir;
2105   struct dirent *dent;
2106   char path[PATH_MAX+1];
2108   if (journal_dir == NULL) return;
2110   pthread_mutex_lock(&journal_lock);
2112   journal_cur = calloc(1, sizeof(journal_set));
2113   if (journal_cur == NULL)
2114   {
2115     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2116     return;
2117   }
2119   RRDD_LOG(LOG_INFO, "checking for journal files");
2121   /* Handle old journal files during transition.  This gives them the
2122    * correct sort order.  TODO: remove after first release
2123    */
2124   {
2125     char old_path[PATH_MAX+1];
2126     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2127     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2128     rename(old_path, path);
2130     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2131     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2132     rename(old_path, path);
2133   }
2135   dir = opendir(journal_dir);
2136   if (!dir) {
2137     RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2138     return;
2139   }
2140   while ((dent = readdir(dir)) != NULL)
2141   {
2142     /* looks like a journal file? */
2143     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2144       continue;
2146     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2148     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2149     {
2150       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2151                dent->d_name);
2152       break;
2153     }
2154   }
2155   closedir(dir);
2157   qsort(journal_cur->files, journal_cur->files_num,
2158         sizeof(journal_cur->files[0]), journal_sort);
2160   for (uint i=0; i < journal_cur->files_num; i++)
2161     had_journal += journal_replay(journal_cur->files[i]);
2163   journal_new_file();
2165   /* it must have been a crash.  start a flush */
2166   if (had_journal && config_flush_at_shutdown)
2167     flush_old_values(-1);
2169   pthread_mutex_unlock(&journal_lock);
2171   RRDD_LOG(LOG_INFO, "journal processing complete");
2173 } /* }}} static void journal_init */
2175 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2177   assert(sock != NULL);
2179   free(sock->rbuf);  sock->rbuf = NULL;
2180   free(sock->wbuf);  sock->wbuf = NULL;
2181   free(sock);
2182 } /* }}} void free_listen_socket */
2184 static void close_connection(listen_socket_t *sock) /* {{{ */
2186   if (sock->fd >= 0)
2187   {
2188     close(sock->fd);
2189     sock->fd = -1;
2190   }
2192   free_listen_socket(sock);
2194 } /* }}} void close_connection */
2196 static void *connection_thread_main (void *args) /* {{{ */
2198   listen_socket_t *sock;
2199   int fd;
2201   sock = (listen_socket_t *) args;
2202   fd = sock->fd;
2204   /* init read buffers */
2205   sock->next_read = sock->next_cmd = 0;
2206   sock->rbuf = malloc(RBUF_SIZE);
2207   if (sock->rbuf == NULL)
2208   {
2209     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2210     close_connection(sock);
2211     return NULL;
2212   }
2214   pthread_mutex_lock (&connection_threads_lock);
2215 #ifdef HAVE_LIBWRAP
2216   /* LIBWRAP does not support multiple threads! By putting this code
2217      inside pthread_mutex_lock we do not have to worry about request_info
2218      getting overwritten by another thread.
2219   */
2220   struct request_info req;
2221   request_init(&req, RQ_DAEMON, "rrdcached\0", RQ_FILE, fd, NULL );
2222   fromhost(&req);
2223   if(!hosts_access(&req)) {
2224     RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2225     pthread_mutex_unlock (&connection_threads_lock);
2226     close_connection(sock);
2227     return NULL;
2228   }
2229 #endif /* HAVE_LIBWRAP */
2230   connection_threads_num++;
2231   pthread_mutex_unlock (&connection_threads_lock);
2233   while (state == RUNNING)
2234   {
2235     char *cmd;
2236     ssize_t cmd_len;
2237     ssize_t rbytes;
2238     time_t now;
2240     struct pollfd pollfd;
2241     int status;
2243     pollfd.fd = fd;
2244     pollfd.events = POLLIN | POLLPRI;
2245     pollfd.revents = 0;
2247     status = poll (&pollfd, 1, /* timeout = */ 500);
2248     if (state != RUNNING)
2249       break;
2250     else if (status == 0) /* timeout */
2251       continue;
2252     else if (status < 0) /* error */
2253     {
2254       status = errno;
2255       if (status != EINTR)
2256         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2257       continue;
2258     }
2260     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2261       break;
2262     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2263     {
2264       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2265           "poll(2) returned something unexpected: %#04hx",
2266           pollfd.revents);
2267       break;
2268     }
2270     rbytes = read(fd, sock->rbuf + sock->next_read,
2271                   RBUF_SIZE - sock->next_read);
2272     if (rbytes < 0)
2273     {
2274       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2275       break;
2276     }
2277     else if (rbytes == 0)
2278       break; /* eof */
2280     sock->next_read += rbytes;
2282     if (sock->batch_start)
2283       now = sock->batch_start;
2284     else
2285       now = time(NULL);
2287     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2288     {
2289       status = handle_request (sock, now, cmd, cmd_len+1);
2290       if (status != 0)
2291         goto out_close;
2292     }
2293   }
2295 out_close:
2296   close_connection(sock);
2298   /* Remove this thread from the connection threads list */
2299   pthread_mutex_lock (&connection_threads_lock);
2300   connection_threads_num--;
2301   if (connection_threads_num <= 0)
2302     pthread_cond_broadcast(&connection_threads_done);
2303   pthread_mutex_unlock (&connection_threads_lock);
2305   return (NULL);
2306 } /* }}} void *connection_thread_main */
2308 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2310   int fd;
2311   struct sockaddr_un sa;
2312   listen_socket_t *temp;
2313   int status;
2314   const char *path;
2315   char *path_copy, *dir;
2317   path = sock->addr;
2318   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2319     path += strlen("unix:");
2321   /* dirname may modify its argument */
2322   path_copy = strdup(path);
2323   if (path_copy == NULL)
2324   {
2325     fprintf(stderr, "rrdcached: strdup(): %s\n",
2326         rrd_strerror(errno));
2327     return (-1);
2328   }
2330   dir = dirname(path_copy);
2331   if (rrd_mkdir_p(dir, 0777) != 0)
2332   {
2333     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2334         dir, rrd_strerror(errno));
2335     return (-1);
2336   }
2338   free(path_copy);
2340   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2341       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2342   if (temp == NULL)
2343   {
2344     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2345     return (-1);
2346   }
2347   listen_fds = temp;
2348   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2350   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2351   if (fd < 0)
2352   {
2353     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2354              rrd_strerror(errno));
2355     return (-1);
2356   }
2358   memset (&sa, 0, sizeof (sa));
2359   sa.sun_family = AF_UNIX;
2360   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2362   /* if we've gotten this far, we own the pid file.  any daemon started
2363    * with the same args must not be alive.  therefore, ensure that we can
2364    * create the socket...
2365    */
2366   unlink(path);
2368   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2369   if (status != 0)
2370   {
2371     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2372              path, rrd_strerror(errno));
2373     close (fd);
2374     return (-1);
2375   }
2377   /* tweak the sockets group ownership */
2378   if (sock->socket_group != (gid_t)-1)
2379   {
2380     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2381          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2382     {
2383       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2384     }
2385   }
2387   if (sock->socket_permissions != (mode_t)-1)
2388   {
2389     if (chmod(path, sock->socket_permissions) != 0)
2390       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2391           (unsigned int)sock->socket_permissions, strerror(errno));
2392   }
2394   status = listen (fd, /* backlog = */ 10);
2395   if (status != 0)
2396   {
2397     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2398              path, rrd_strerror(errno));
2399     close (fd);
2400     unlink (path);
2401     return (-1);
2402   }
2404   listen_fds[listen_fds_num].fd = fd;
2405   listen_fds[listen_fds_num].family = PF_UNIX;
2406   strncpy(listen_fds[listen_fds_num].addr, path,
2407           sizeof (listen_fds[listen_fds_num].addr) - 1);
2408   listen_fds_num++;
2410   return (0);
2411 } /* }}} int open_listen_socket_unix */
2413 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2415   struct addrinfo ai_hints;
2416   struct addrinfo *ai_res;
2417   struct addrinfo *ai_ptr;
2418   char addr_copy[NI_MAXHOST];
2419   char *addr;
2420   char *port;
2421   int status;
2423   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2424   addr_copy[sizeof (addr_copy) - 1] = 0;
2425   addr = addr_copy;
2427   memset (&ai_hints, 0, sizeof (ai_hints));
2428   ai_hints.ai_flags = 0;
2429 #ifdef AI_ADDRCONFIG
2430   ai_hints.ai_flags |= AI_ADDRCONFIG;
2431 #endif
2432   ai_hints.ai_family = AF_UNSPEC;
2433   ai_hints.ai_socktype = SOCK_STREAM;
2435   port = NULL;
2436   if (*addr == '[') /* IPv6+port format */
2437   {
2438     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2439     addr++;
2441     port = strchr (addr, ']');
2442     if (port == NULL)
2443     {
2444       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2445       return (-1);
2446     }
2447     *port = 0;
2448     port++;
2450     if (*port == ':')
2451       port++;
2452     else if (*port == 0)
2453       port = NULL;
2454     else
2455     {
2456       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2457       return (-1);
2458     }
2459   } /* if (*addr == '[') */
2460   else
2461   {
2462     port = rindex(addr, ':');
2463     if (port != NULL)
2464     {
2465       *port = 0;
2466       port++;
2467     }
2468   }
2469   ai_res = NULL;
2470   status = getaddrinfo (addr,
2471                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2472                         &ai_hints, &ai_res);
2473   if (status != 0)
2474   {
2475     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2476              addr, gai_strerror (status));
2477     return (-1);
2478   }
2480   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2481   {
2482     int fd;
2483     listen_socket_t *temp;
2484     int one = 1;
2486     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2487         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2488     if (temp == NULL)
2489     {
2490       fprintf (stderr,
2491                "rrdcached: open_listen_socket_network: realloc failed.\n");
2492       continue;
2493     }
2494     listen_fds = temp;
2495     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2497     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2498     if (fd < 0)
2499     {
2500       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2501                rrd_strerror(errno));
2502       continue;
2503     }
2505     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2507     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2508     if (status != 0)
2509     {
2510       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2511                sock->addr, rrd_strerror(errno));
2512       close (fd);
2513       continue;
2514     }
2516     status = listen (fd, /* backlog = */ 10);
2517     if (status != 0)
2518     {
2519       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2520                sock->addr, rrd_strerror(errno));
2521       close (fd);
2522       freeaddrinfo(ai_res);
2523       return (-1);
2524     }
2526     listen_fds[listen_fds_num].fd = fd;
2527     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2528     listen_fds_num++;
2529   } /* for (ai_ptr) */
2531   freeaddrinfo(ai_res);
2532   return (0);
2533 } /* }}} static int open_listen_socket_network */
2535 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2537   assert(sock != NULL);
2538   assert(sock->addr != NULL);
2540   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2541       || sock->addr[0] == '/')
2542     return (open_listen_socket_unix(sock));
2543   else
2544     return (open_listen_socket_network(sock));
2545 } /* }}} int open_listen_socket */
2547 static int close_listen_sockets (void) /* {{{ */
2549   size_t i;
2551   for (i = 0; i < listen_fds_num; i++)
2552   {
2553     close (listen_fds[i].fd);
2555     if (listen_fds[i].family == PF_UNIX)
2556       unlink(listen_fds[i].addr);
2557   }
2559   free (listen_fds);
2560   listen_fds = NULL;
2561   listen_fds_num = 0;
2563   return (0);
2564 } /* }}} int close_listen_sockets */
2566 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2568   struct pollfd *pollfds;
2569   int pollfds_num;
2570   int status;
2571   int i;
2573   if (listen_fds_num < 1)
2574   {
2575     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2576     return (NULL);
2577   }
2579   pollfds_num = listen_fds_num;
2580   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2581   if (pollfds == NULL)
2582   {
2583     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2584     return (NULL);
2585   }
2586   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2588   RRDD_LOG(LOG_INFO, "listening for connections");
2590   while (state == RUNNING)
2591   {
2592     for (i = 0; i < pollfds_num; i++)
2593     {
2594       pollfds[i].fd = listen_fds[i].fd;
2595       pollfds[i].events = POLLIN | POLLPRI;
2596       pollfds[i].revents = 0;
2597     }
2599     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2600     if (state != RUNNING)
2601       break;
2602     else if (status == 0) /* timeout */
2603       continue;
2604     else if (status < 0) /* error */
2605     {
2606       status = errno;
2607       if (status != EINTR)
2608       {
2609         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2610       }
2611       continue;
2612     }
2614     for (i = 0; i < pollfds_num; i++)
2615     {
2616       listen_socket_t *client_sock;
2617       struct sockaddr_storage client_sa;
2618       socklen_t client_sa_size;
2619       pthread_t tid;
2620       pthread_attr_t attr;
2622       if (pollfds[i].revents == 0)
2623         continue;
2625       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2626       {
2627         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2628             "poll(2) returned something unexpected for listen FD #%i.",
2629             pollfds[i].fd);
2630         continue;
2631       }
2633       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2634       if (client_sock == NULL)
2635       {
2636         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2637         continue;
2638       }
2639       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2641       client_sa_size = sizeof (client_sa);
2642       client_sock->fd = accept (pollfds[i].fd,
2643           (struct sockaddr *) &client_sa, &client_sa_size);
2644       if (client_sock->fd < 0)
2645       {
2646         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2647         free(client_sock);
2648         continue;
2649       }
2651       pthread_attr_init (&attr);
2652       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2654       status = pthread_create (&tid, &attr, connection_thread_main,
2655                                client_sock);
2656       if (status != 0)
2657       {
2658         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2659         close_connection(client_sock);
2660         continue;
2661       }
2662     } /* for (pollfds_num) */
2663   } /* while (state == RUNNING) */
2665   RRDD_LOG(LOG_INFO, "starting shutdown");
2667   close_listen_sockets ();
2669   pthread_mutex_lock (&connection_threads_lock);
2670   while (connection_threads_num > 0)
2671     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2672   pthread_mutex_unlock (&connection_threads_lock);
2674   free(pollfds);
2676   return (NULL);
2677 } /* }}} void *listen_thread_main */
2679 static int daemonize (void) /* {{{ */
2681   int pid_fd;
2682   char *base_dir;
2684   daemon_uid = geteuid();
2686   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2687   if (pid_fd < 0)
2688     pid_fd = check_pidfile();
2689   if (pid_fd < 0)
2690     return pid_fd;
2692   /* open all the listen sockets */
2693   if (config_listen_address_list_len > 0)
2694   {
2695     for (size_t i = 0; i < config_listen_address_list_len; i++)
2696       open_listen_socket (config_listen_address_list[i]);
2698     rrd_free_ptrs((void ***) &config_listen_address_list,
2699                   &config_listen_address_list_len);
2700   }
2701   else
2702   {
2703     strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
2704         sizeof(default_socket.addr) - 1);
2705     default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
2707     if (default_socket.permissions == 0)
2708       socket_permission_set_all (&default_socket);
2710     open_listen_socket (&default_socket);
2711   }
2713   if (listen_fds_num < 1)
2714   {
2715     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2716     goto error;
2717   }
2719   if (!stay_foreground)
2720   {
2721     pid_t child;
2723     child = fork ();
2724     if (child < 0)
2725     {
2726       fprintf (stderr, "daemonize: fork(2) failed.\n");
2727       goto error;
2728     }
2729     else if (child > 0)
2730       exit(0);
2732     /* Become session leader */
2733     setsid ();
2735     /* Open the first three file descriptors to /dev/null */
2736     close (2);
2737     close (1);
2738     close (0);
2740     open ("/dev/null", O_RDWR);
2741     if (dup(0) == -1 || dup(0) == -1){
2742         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2743     }
2744   } /* if (!stay_foreground) */
2746   /* Change into the /tmp directory. */
2747   base_dir = (config_base_dir != NULL)
2748     ? config_base_dir
2749     : "/tmp";
2751   if (chdir (base_dir) != 0)
2752   {
2753     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2754     goto error;
2755   }
2757   install_signal_handlers();
2759   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2760   RRDD_LOG(LOG_INFO, "starting up");
2762   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2763                                 (GDestroyNotify) free_cache_item);
2764   if (cache_tree == NULL)
2765   {
2766     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2767     goto error;
2768   }
2770   return write_pidfile (pid_fd);
2772 error:
2773   remove_pidfile();
2774   return -1;
2775 } /* }}} int daemonize */
2777 static int cleanup (void) /* {{{ */
2779   pthread_cond_broadcast (&flush_cond);
2780   pthread_join (flush_thread, NULL);
2782   pthread_cond_broadcast (&queue_cond);
2783   for (int i = 0; i < config_queue_threads; i++)
2784     pthread_join (queue_threads[i], NULL);
2786   if (config_flush_at_shutdown)
2787   {
2788     assert(cache_queue_head == NULL);
2789     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2790   }
2792   free(queue_threads);
2793   free(config_base_dir);
2795   pthread_mutex_lock(&cache_lock);
2796   g_tree_destroy(cache_tree);
2798   pthread_mutex_lock(&journal_lock);
2799   journal_done();
2801   RRDD_LOG(LOG_INFO, "goodbye");
2802   closelog ();
2804   remove_pidfile ();
2805   free(config_pid_file);
2807   return (0);
2808 } /* }}} int cleanup */
2810 static int read_options (int argc, char **argv) /* {{{ */
2812   int option;
2813   int status = 0;
2815   socket_permission_clear (&default_socket);
2817   default_socket.socket_group = (gid_t)-1;
2818   default_socket.socket_permissions = (mode_t)-1;
2820   while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2821   {
2822     switch (option)
2823     {
2824       case 'g':
2825         stay_foreground=1;
2826         break;
2828       case 'l':
2829       {
2830         listen_socket_t *new;
2832         new = malloc(sizeof(listen_socket_t));
2833         if (new == NULL)
2834         {
2835           fprintf(stderr, "read_options: malloc failed.\n");
2836           return(2);
2837         }
2838         memset(new, 0, sizeof(listen_socket_t));
2840         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2842         /* Add permissions to the socket {{{ */
2843         if (default_socket.permissions != 0)
2844         {
2845           socket_permission_copy (new, &default_socket);
2846         }
2847         else /* if (default_socket.permissions == 0) */
2848         {
2849           /* Add permission for ALL commands to the socket. */
2850           socket_permission_set_all (new);
2851         }
2852         /* }}} Done adding permissions. */
2854         new->socket_group = default_socket.socket_group;
2855         new->socket_permissions = default_socket.socket_permissions;
2857         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2858                          &config_listen_address_list_len, new))
2859         {
2860           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2861           return (2);
2862         }
2863       }
2864       break;
2866       /* set socket group permissions */
2867       case 's':
2868       {
2869         gid_t group_gid;
2870         struct group *grp;
2872         group_gid = strtoul(optarg, NULL, 10);
2873         if (errno != EINVAL && group_gid>0)
2874         {
2875           /* we were passed a number */
2876           grp = getgrgid(group_gid);
2877         }
2878         else
2879         {
2880           grp = getgrnam(optarg);
2881         }
2883         if (grp)
2884         {
2885           default_socket.socket_group = grp->gr_gid;
2886         }
2887         else
2888         {
2889           /* no idea what the user wanted... */
2890           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
2891           return (5);
2892         }
2893       }
2894       break;
2896       /* set socket file permissions */
2897       case 'm':
2898       {
2899         long  tmp;
2900         char *endptr = NULL;
2902         tmp = strtol (optarg, &endptr, 8);
2903         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
2904             || (tmp > 07777) || (tmp < 0)) {
2905           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
2906               optarg);
2907           return (5);
2908         }
2910         default_socket.socket_permissions = (mode_t)tmp;
2911       }
2912       break;
2914       case 'P':
2915       {
2916         char *optcopy;
2917         char *saveptr;
2918         char *dummy;
2919         char *ptr;
2921         socket_permission_clear (&default_socket);
2923         optcopy = strdup (optarg);
2924         dummy = optcopy;
2925         saveptr = NULL;
2926         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2927         {
2928           dummy = NULL;
2929           status = socket_permission_add (&default_socket, ptr);
2930           if (status != 0)
2931           {
2932             fprintf (stderr, "read_options: Adding permission \"%s\" to "
2933                 "socket failed. Most likely, this permission doesn't "
2934                 "exist. Check your command line.\n", ptr);
2935             status = 4;
2936           }
2937         }
2939         free (optcopy);
2940       }
2941       break;
2943       case 'f':
2944       {
2945         int temp;
2947         temp = atoi (optarg);
2948         if (temp > 0)
2949           config_flush_interval = temp;
2950         else
2951         {
2952           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2953           status = 3;
2954         }
2955       }
2956       break;
2958       case 'w':
2959       {
2960         int temp;
2962         temp = atoi (optarg);
2963         if (temp > 0)
2964           config_write_interval = temp;
2965         else
2966         {
2967           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2968           status = 2;
2969         }
2970       }
2971       break;
2973       case 'z':
2974       {
2975         int temp;
2977         temp = atoi(optarg);
2978         if (temp > 0)
2979           config_write_jitter = temp;
2980         else
2981         {
2982           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2983           status = 2;
2984         }
2986         break;
2987       }
2989       case 't':
2990       {
2991         int threads;
2992         threads = atoi(optarg);
2993         if (threads >= 1)
2994           config_queue_threads = threads;
2995         else
2996         {
2997           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2998           return 1;
2999         }
3000       }
3001       break;
3003       case 'B':
3004         config_write_base_only = 1;
3005         break;
3007       case 'b':
3008       {
3009         size_t len;
3010         char base_realpath[PATH_MAX];
3012         if (config_base_dir != NULL)
3013           free (config_base_dir);
3014         config_base_dir = strdup (optarg);
3015         if (config_base_dir == NULL)
3016         {
3017           fprintf (stderr, "read_options: strdup failed.\n");
3018           return (3);
3019         }
3021         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3022         {
3023           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3024               config_base_dir, rrd_strerror (errno));
3025           return (3);
3026         }
3028         /* make sure that the base directory is not resolved via
3029          * symbolic links.  this makes some performance-enhancing
3030          * assumptions possible (we don't have to resolve paths
3031          * that start with a "/")
3032          */
3033         if (realpath(config_base_dir, base_realpath) == NULL)
3034         {
3035           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3036               "%s\n", config_base_dir, rrd_strerror(errno));
3037           return 5;
3038         }
3040         len = strlen (config_base_dir);
3041         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3042         {
3043           config_base_dir[len - 1] = 0;
3044           len--;
3045         }
3047         if (len < 1)
3048         {
3049           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3050           return (4);
3051         }
3053         _config_base_dir_len = len;
3055         len = strlen (base_realpath);
3056         while ((len > 0) && (base_realpath[len - 1] == '/'))
3057         {
3058           base_realpath[len - 1] = '\0';
3059           len--;
3060         }
3062         if (strncmp(config_base_dir,
3063                          base_realpath, sizeof(base_realpath)) != 0)
3064         {
3065           fprintf(stderr,
3066                   "Base directory (-b) resolved via file system links!\n"
3067                   "Please consult rrdcached '-b' documentation!\n"
3068                   "Consider specifying the real directory (%s)\n",
3069                   base_realpath);
3070           return 5;
3071         }
3072       }
3073       break;
3075       case 'p':
3076       {
3077         if (config_pid_file != NULL)
3078           free (config_pid_file);
3079         config_pid_file = strdup (optarg);
3080         if (config_pid_file == NULL)
3081         {
3082           fprintf (stderr, "read_options: strdup failed.\n");
3083           return (3);
3084         }
3085       }
3086       break;
3088       case 'F':
3089         config_flush_at_shutdown = 1;
3090         break;
3092       case 'j':
3093       {
3094         char journal_dir_actual[PATH_MAX];
3095         const char *dir;
3096         if (realpath((const char *)optarg, journal_dir_actual) == NULL)
3097         {
3098           fprintf(stderr, "Failed to canonicalize the journal directory '%s': %s\n",
3099               optarg, rrd_strerror(errno));
3100           return 7;
3101         }
3102         dir = journal_dir = strdup(journal_dir_actual);
3103         if (dir == NULL) {
3104           fprintf (stderr, "read_options: strdup failed.\n");
3105           return (3);
3106         }
3108         status = rrd_mkdir_p(dir, 0777);
3109         if (status != 0)
3110         {
3111           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3112               dir, rrd_strerror(errno));
3113           return 6;
3114         }
3116         if (access(dir, R_OK|W_OK|X_OK) != 0)
3117         {
3118           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3119                   errno ? rrd_strerror(errno) : "");
3120           return 6;
3121         }
3122       }
3123       break;
3125       case 'h':
3126       case '?':
3127         printf ("RRDCacheD %s\n"
3128             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3129             "\n"
3130             "Usage: rrdcached [options]\n"
3131             "\n"
3132             "Valid options are:\n"
3133             "  -l <address>  Socket address to listen to.\n"
3134             "                Default: "RRDCACHED_DEFAULT_ADDRESS"\n"
3135             "  -P <perms>    Sets the permissions to assign to all following "
3136                             "sockets\n"
3137             "  -w <seconds>  Interval in which to write data.\n"
3138             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3139             "  -t <threads>  Number of write threads.\n"
3140             "  -f <seconds>  Interval in which to flush dead data.\n"
3141             "  -p <file>     Location of the PID-file.\n"
3142             "  -b <dir>      Base directory to change to.\n"
3143             "  -B            Restrict file access to paths within -b <dir>\n"
3144             "  -g            Do not fork and run in the foreground.\n"
3145             "  -j <dir>      Directory in which to create the journal files.\n"
3146             "  -F            Always flush all updates at shutdown\n"
3147             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3148             "                (the socket will also have read/write permissions "
3149                             "for that group)\n"
3150             "  -m <mode>     File permissions (octal) of all following UNIX "
3151                             "sockets\n"
3152             "\n"
3153             "For more information and a detailed description of all options "
3154             "please refer\n"
3155             "to the rrdcached(1) manual page.\n",
3156             VERSION);
3157         if (option == 'h')
3158           status = -1;
3159         else
3160           status = 1;
3161         break;
3162     } /* switch (option) */
3163   } /* while (getopt) */
3165   /* advise the user when values are not sane */
3166   if (config_flush_interval < 2 * config_write_interval)
3167     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3168             " 2x write interval (-w) !\n");
3169   if (config_write_jitter > config_write_interval)
3170     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3171             " write interval (-w) !\n");
3173   if (config_write_base_only && config_base_dir == NULL)
3174     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3175             "  Consult the rrdcached documentation\n");
3177   if (journal_dir == NULL)
3178     config_flush_at_shutdown = 1;
3180   return (status);
3181 } /* }}} int read_options */
3183 int main (int argc, char **argv)
3185   int status;
3187   status = read_options (argc, argv);
3188   if (status != 0)
3189   {
3190     if (status < 0)
3191       status = 0;
3192     return (status);
3193   }
3195   status = daemonize ();
3196   if (status != 0)
3197   {
3198     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3199     return (1);
3200   }
3202   journal_init();
3204   /* start the queue threads */
3205   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3206   if (queue_threads == NULL)
3207   {
3208     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3209     cleanup();
3210     return (1);
3211   }
3212   for (int i = 0; i < config_queue_threads; i++)
3213   {
3214     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3215     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3216     if (status != 0)
3217     {
3218       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3219       cleanup();
3220       return (1);
3221     }
3222   }
3224   /* start the flush thread */
3225   memset(&flush_thread, 0, sizeof(flush_thread));
3226   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3227   if (status != 0)
3228   {
3229     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3230     cleanup();
3231     return (1);
3232   }
3234   listen_thread_main (NULL);
3235   cleanup ();
3237   return (0);
3238 } /* int main */
3240 /*
3241  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3242  */