Code

When specifying a relative path (-j option) rrd_cached would segfault when
[rrdtool-all.git] / program / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008,2009 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
76 #include "unused.h"
78 #include <stdlib.h>
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 #  include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
89 #else
91 #endif
92 #include <stdio.h>
93 #include <string.h>
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
112 #include <glib-2.0/glib.h>
113 /* }}} */
115 #define RRDD_LOG(severity, ...) \
116   do { \
117     if (stay_foreground) \
118       fprintf(stderr, __VA_ARGS__); \
119     syslog ((severity), __VA_ARGS__); \
120   } while (0)
122 /*
123  * Types
124  */
125 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
127 struct listen_socket_s
129   int fd;
130   char addr[PATH_MAX + 1];
131   int family;
133   /* state for BATCH processing */
134   time_t batch_start;
135   int batch_cmd;
137   /* buffered IO */
138   char *rbuf;
139   off_t next_cmd;
140   off_t next_read;
142   char *wbuf;
143   ssize_t wbuf_len;
145   uint32_t permissions;
147   gid_t  socket_group;
148   mode_t socket_permissions;
149 };
150 typedef struct listen_socket_s listen_socket_t;
152 struct command_s;
153 typedef struct command_s command_t;
154 /* note: guard against "unused" warnings in the handlers */
155 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
156                         time_t UNUSED(now),\
157                         char  UNUSED(*buffer),\
158                         size_t UNUSED(buffer_size)
160 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
161                         DISPATCH_PROTO
163 struct command_s {
164   char   *cmd;
165   int (*handler)(HANDLER_PROTO);
167   char  context;                /* where we expect to see it */
168 #define CMD_CONTEXT_CLIENT      (1<<0)
169 #define CMD_CONTEXT_BATCH       (1<<1)
170 #define CMD_CONTEXT_JOURNAL     (1<<2)
171 #define CMD_CONTEXT_ANY         (0x7f)
173   char *syntax;
174   char *help;
175 };
177 struct cache_item_s;
178 typedef struct cache_item_s cache_item_t;
179 struct cache_item_s
181   char *file;
182   char **values;
183   size_t values_num;
184   time_t last_flush_time;
185   time_t last_update_stamp;
186 #define CI_FLAGS_IN_TREE  (1<<0)
187 #define CI_FLAGS_IN_QUEUE (1<<1)
188   int flags;
189   pthread_cond_t  flushed;
190   cache_item_t *prev;
191   cache_item_t *next;
192 };
194 struct callback_flush_data_s
196   time_t now;
197   time_t abs_timeout;
198   char **keys;
199   size_t keys_num;
200 };
201 typedef struct callback_flush_data_s callback_flush_data_t;
203 enum queue_side_e
205   HEAD,
206   TAIL
207 };
208 typedef enum queue_side_e queue_side_t;
210 /* describe a set of journal files */
211 typedef struct {
212   char **files;
213   size_t files_num;
214 } journal_set;
216 /* max length of socket command or response */
217 #define CMD_MAX 4096
218 #define RBUF_SIZE (CMD_MAX*2)
220 /*
221  * Variables
222  */
223 static int stay_foreground = 0;
224 static uid_t daemon_uid;
226 static listen_socket_t *listen_fds = NULL;
227 static size_t listen_fds_num = 0;
229 enum {
230   RUNNING,              /* normal operation */
231   FLUSHING,             /* flushing remaining values */
232   SHUTDOWN              /* shutting down */
233 } state = RUNNING;
235 static pthread_t *queue_threads;
236 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
237 static int config_queue_threads = 4;
239 static pthread_t flush_thread;
240 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
242 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
243 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
244 static int connection_threads_num = 0;
246 /* Cache stuff */
247 static GTree          *cache_tree = NULL;
248 static cache_item_t   *cache_queue_head = NULL;
249 static cache_item_t   *cache_queue_tail = NULL;
250 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
252 static int config_write_interval = 300;
253 static int config_write_jitter   = 0;
254 static int config_flush_interval = 3600;
255 static int config_flush_at_shutdown = 0;
256 static char *config_pid_file = NULL;
257 static char *config_base_dir = NULL;
258 static size_t _config_base_dir_len = 0;
259 static int config_write_base_only = 0;
261 static listen_socket_t **config_listen_address_list = NULL;
262 static size_t config_listen_address_list_len = 0;
264 static uint64_t stats_queue_length = 0;
265 static uint64_t stats_updates_received = 0;
266 static uint64_t stats_flush_received = 0;
267 static uint64_t stats_updates_written = 0;
268 static uint64_t stats_data_sets_written = 0;
269 static uint64_t stats_journal_bytes = 0;
270 static uint64_t stats_journal_rotate = 0;
271 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
273 /* Journaled updates */
274 #define JOURNAL_REPLAY(s) ((s) == NULL)
275 #define JOURNAL_BASE "rrd.journal"
276 static journal_set *journal_cur = NULL;
277 static journal_set *journal_old = NULL;
278 static char *journal_dir = NULL;
279 static FILE *journal_fh = NULL;         /* current journal file handle */
280 static long  journal_size = 0;          /* current journal size */
281 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
282 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
283 static int journal_write(char *cmd, char *args);
284 static void journal_done(void);
285 static void journal_rotate(void);
287 /* prototypes for forward refernces */
288 static int handle_request_help (HANDLER_PROTO);
290 /* 
291  * Functions
292  */
293 static void sig_common (const char *sig) /* {{{ */
295   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
296   state = FLUSHING;
297   pthread_cond_broadcast(&flush_cond);
298   pthread_cond_broadcast(&queue_cond);
299 } /* }}} void sig_common */
301 static void sig_int_handler (int UNUSED(s)) /* {{{ */
303   sig_common("INT");
304 } /* }}} void sig_int_handler */
306 static void sig_term_handler (int UNUSED(s)) /* {{{ */
308   sig_common("TERM");
309 } /* }}} void sig_term_handler */
311 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
313   config_flush_at_shutdown = 1;
314   sig_common("USR1");
315 } /* }}} void sig_usr1_handler */
317 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
319   config_flush_at_shutdown = 0;
320   sig_common("USR2");
321 } /* }}} void sig_usr2_handler */
323 static void install_signal_handlers(void) /* {{{ */
325   /* These structures are static, because `sigaction' behaves weird if the are
326    * overwritten.. */
327   static struct sigaction sa_int;
328   static struct sigaction sa_term;
329   static struct sigaction sa_pipe;
330   static struct sigaction sa_usr1;
331   static struct sigaction sa_usr2;
333   /* Install signal handlers */
334   memset (&sa_int, 0, sizeof (sa_int));
335   sa_int.sa_handler = sig_int_handler;
336   sigaction (SIGINT, &sa_int, NULL);
338   memset (&sa_term, 0, sizeof (sa_term));
339   sa_term.sa_handler = sig_term_handler;
340   sigaction (SIGTERM, &sa_term, NULL);
342   memset (&sa_pipe, 0, sizeof (sa_pipe));
343   sa_pipe.sa_handler = SIG_IGN;
344   sigaction (SIGPIPE, &sa_pipe, NULL);
346   memset (&sa_pipe, 0, sizeof (sa_usr1));
347   sa_usr1.sa_handler = sig_usr1_handler;
348   sigaction (SIGUSR1, &sa_usr1, NULL);
350   memset (&sa_usr2, 0, sizeof (sa_usr2));
351   sa_usr2.sa_handler = sig_usr2_handler;
352   sigaction (SIGUSR2, &sa_usr2, NULL);
354 } /* }}} void install_signal_handlers */
356 static int open_pidfile(char *action, int oflag) /* {{{ */
358   int fd;
359   const char *file;
360   char *file_copy, *dir;
362   file = (config_pid_file != NULL)
363     ? config_pid_file
364     : LOCALSTATEDIR "/run/rrdcached.pid";
366   /* dirname may modify its argument */
367   file_copy = strdup(file);
368   if (file_copy == NULL)
369   {
370     fprintf(stderr, "rrdcached: strdup(): %s\n",
371         rrd_strerror(errno));
372     return -1;
373   }
375   dir = dirname(file_copy);
376   if (rrd_mkdir_p(dir, 0777) != 0)
377   {
378     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
379         dir, rrd_strerror(errno));
380     return -1;
381   }
383   free(file_copy);
385   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
386   if (fd < 0)
387     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
388             action, file, rrd_strerror(errno));
390   return(fd);
391 } /* }}} static int open_pidfile */
393 /* check existing pid file to see whether a daemon is running */
394 static int check_pidfile(void)
396   int pid_fd;
397   pid_t pid;
398   char pid_str[16];
400   pid_fd = open_pidfile("open", O_RDWR);
401   if (pid_fd < 0)
402     return pid_fd;
404   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
405     return -1;
407   pid = atoi(pid_str);
408   if (pid <= 0)
409     return -1;
411   /* another running process that we can signal COULD be
412    * a competing rrdcached */
413   if (pid != getpid() && kill(pid, 0) == 0)
414   {
415     fprintf(stderr,
416             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
417     close(pid_fd);
418     return -1;
419   }
421   lseek(pid_fd, 0, SEEK_SET);
422   if (ftruncate(pid_fd, 0) == -1)
423   {
424     fprintf(stderr,
425             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
426     close(pid_fd);
427     return -1;
428   }
430   fprintf(stderr,
431           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
432           "rrdcached: starting normally.\n", pid);
434   return pid_fd;
435 } /* }}} static int check_pidfile */
437 static int write_pidfile (int fd) /* {{{ */
439   pid_t pid;
440   FILE *fh;
442   pid = getpid ();
444   fh = fdopen (fd, "w");
445   if (fh == NULL)
446   {
447     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
448     close(fd);
449     return (-1);
450   }
452   fprintf (fh, "%i\n", (int) pid);
453   fclose (fh);
455   return (0);
456 } /* }}} int write_pidfile */
458 static int remove_pidfile (void) /* {{{ */
460   char *file;
461   int status;
463   file = (config_pid_file != NULL)
464     ? config_pid_file
465     : LOCALSTATEDIR "/run/rrdcached.pid";
467   status = unlink (file);
468   if (status == 0)
469     return (0);
470   return (errno);
471 } /* }}} int remove_pidfile */
473 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
475   char *eol;
477   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
478                sock->next_read - sock->next_cmd);
480   if (eol == NULL)
481   {
482     /* no commands left, move remainder back to front of rbuf */
483     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
484             sock->next_read - sock->next_cmd);
485     sock->next_read -= sock->next_cmd;
486     sock->next_cmd = 0;
487     *len = 0;
488     return NULL;
489   }
490   else
491   {
492     char *cmd = sock->rbuf + sock->next_cmd;
493     *eol = '\0';
495     sock->next_cmd = eol - sock->rbuf + 1;
497     if (eol > sock->rbuf && *(eol-1) == '\r')
498       *(--eol) = '\0'; /* handle "\r\n" EOL */
500     *len = eol - cmd;
502     return cmd;
503   }
505   /* NOTREACHED */
506   assert(1==0);
507 } /* }}} char *next_cmd */
509 /* add the characters directly to the write buffer */
510 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
512   char *new_buf;
514   assert(sock != NULL);
516   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
517   if (new_buf == NULL)
518   {
519     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
520     return -1;
521   }
523   strncpy(new_buf + sock->wbuf_len, str, len + 1);
525   sock->wbuf = new_buf;
526   sock->wbuf_len += len;
528   return 0;
529 } /* }}} static int add_to_wbuf */
531 /* add the text to the "extra" info that's sent after the status line */
532 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
534   va_list argp;
535   char buffer[CMD_MAX];
536   int len;
538   if (JOURNAL_REPLAY(sock)) return 0;
539   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
541   va_start(argp, fmt);
542 #ifdef HAVE_VSNPRINTF
543   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
544 #else
545   len = vsprintf(buffer, fmt, argp);
546 #endif
547   va_end(argp);
548   if (len < 0)
549   {
550     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
551     return -1;
552   }
554   return add_to_wbuf(sock, buffer, len);
555 } /* }}} static int add_response_info */
557 static int count_lines(char *str) /* {{{ */
559   int lines = 0;
561   if (str != NULL)
562   {
563     while ((str = strchr(str, '\n')) != NULL)
564     {
565       ++lines;
566       ++str;
567     }
568   }
570   return lines;
571 } /* }}} static int count_lines */
573 /* send the response back to the user.
574  * returns 0 on success, -1 on error
575  * write buffer is always zeroed after this call */
576 static int send_response (listen_socket_t *sock, response_code rc,
577                           char *fmt, ...) /* {{{ */
579   va_list argp;
580   char buffer[CMD_MAX];
581   int lines;
582   ssize_t wrote;
583   int rclen, len;
585   if (JOURNAL_REPLAY(sock)) return rc;
587   if (sock->batch_start)
588   {
589     if (rc == RESP_OK)
590       return rc; /* no response on success during BATCH */
591     lines = sock->batch_cmd;
592   }
593   else if (rc == RESP_OK)
594     lines = count_lines(sock->wbuf);
595   else
596     lines = -1;
598   rclen = sprintf(buffer, "%d ", lines);
599   va_start(argp, fmt);
600 #ifdef HAVE_VSNPRINTF
601   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
602 #else
603   len = vsprintf(buffer+rclen, fmt, argp);
604 #endif
605   va_end(argp);
606   if (len < 0)
607     return -1;
609   len += rclen;
611   /* append the result to the wbuf, don't write to the user */
612   if (sock->batch_start)
613     return add_to_wbuf(sock, buffer, len);
615   /* first write must be complete */
616   if (len != write(sock->fd, buffer, len))
617   {
618     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
619     return -1;
620   }
622   if (sock->wbuf != NULL && rc == RESP_OK)
623   {
624     wrote = 0;
625     while (wrote < sock->wbuf_len)
626     {
627       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
628       if (wb <= 0)
629       {
630         RRDD_LOG(LOG_INFO, "send_response: could not write results");
631         return -1;
632       }
633       wrote += wb;
634     }
635   }
637   free(sock->wbuf); sock->wbuf = NULL;
638   sock->wbuf_len = 0;
640   return 0;
641 } /* }}} */
643 static void wipe_ci_values(cache_item_t *ci, time_t when)
645   ci->values = NULL;
646   ci->values_num = 0;
648   ci->last_flush_time = when;
649   if (config_write_jitter > 0)
650     ci->last_flush_time += (rrd_random() % config_write_jitter);
653 /* remove_from_queue
654  * remove a "cache_item_t" item from the queue.
655  * must hold 'cache_lock' when calling this
656  */
657 static void remove_from_queue(cache_item_t *ci) /* {{{ */
659   if (ci == NULL) return;
660   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
662   if (ci->prev == NULL)
663     cache_queue_head = ci->next; /* reset head */
664   else
665     ci->prev->next = ci->next;
667   if (ci->next == NULL)
668     cache_queue_tail = ci->prev; /* reset the tail */
669   else
670     ci->next->prev = ci->prev;
672   ci->next = ci->prev = NULL;
673   ci->flags &= ~CI_FLAGS_IN_QUEUE;
675   pthread_mutex_lock (&stats_lock);
676   assert (stats_queue_length > 0);
677   stats_queue_length--;
678   pthread_mutex_unlock (&stats_lock);
680 } /* }}} static void remove_from_queue */
682 /* free the resources associated with the cache_item_t
683  * must hold cache_lock when calling this function
684  */
685 static void *free_cache_item(cache_item_t *ci) /* {{{ */
687   if (ci == NULL) return NULL;
689   remove_from_queue(ci);
691   for (size_t i=0; i < ci->values_num; i++)
692     free(ci->values[i]);
694   free (ci->values);
695   free (ci->file);
697   /* in case anyone is waiting */
698   pthread_cond_broadcast(&ci->flushed);
699   pthread_cond_destroy(&ci->flushed);
701   free (ci);
703   return NULL;
704 } /* }}} static void *free_cache_item */
706 /*
707  * enqueue_cache_item:
708  * `cache_lock' must be acquired before calling this function!
709  */
710 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
711     queue_side_t side)
713   if (ci == NULL)
714     return (-1);
716   if (ci->values_num == 0)
717     return (0);
719   if (side == HEAD)
720   {
721     if (cache_queue_head == ci)
722       return 0;
724     /* remove if further down in queue */
725     remove_from_queue(ci);
727     ci->prev = NULL;
728     ci->next = cache_queue_head;
729     if (ci->next != NULL)
730       ci->next->prev = ci;
731     cache_queue_head = ci;
733     if (cache_queue_tail == NULL)
734       cache_queue_tail = cache_queue_head;
735   }
736   else /* (side == TAIL) */
737   {
738     /* We don't move values back in the list.. */
739     if (ci->flags & CI_FLAGS_IN_QUEUE)
740       return (0);
742     assert (ci->next == NULL);
743     assert (ci->prev == NULL);
745     ci->prev = cache_queue_tail;
747     if (cache_queue_tail == NULL)
748       cache_queue_head = ci;
749     else
750       cache_queue_tail->next = ci;
752     cache_queue_tail = ci;
753   }
755   ci->flags |= CI_FLAGS_IN_QUEUE;
757   pthread_cond_signal(&queue_cond);
758   pthread_mutex_lock (&stats_lock);
759   stats_queue_length++;
760   pthread_mutex_unlock (&stats_lock);
762   return (0);
763 } /* }}} int enqueue_cache_item */
765 /*
766  * tree_callback_flush:
767  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
768  * while this is in progress.
769  */
770 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
771     gpointer data)
773   cache_item_t *ci;
774   callback_flush_data_t *cfd;
776   ci = (cache_item_t *) value;
777   cfd = (callback_flush_data_t *) data;
779   if (ci->flags & CI_FLAGS_IN_QUEUE)
780     return FALSE;
782   if (ci->values_num > 0
783       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
784   {
785     enqueue_cache_item (ci, TAIL);
786   }
787   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
788       && (ci->values_num <= 0))
789   {
790     assert ((char *) key == ci->file);
791     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
792     {
793       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
794       return (FALSE);
795     }
796   }
798   return (FALSE);
799 } /* }}} gboolean tree_callback_flush */
801 static int flush_old_values (int max_age)
803   callback_flush_data_t cfd;
804   size_t k;
806   memset (&cfd, 0, sizeof (cfd));
807   /* Pass the current time as user data so that we don't need to call
808    * `time' for each node. */
809   cfd.now = time (NULL);
810   cfd.keys = NULL;
811   cfd.keys_num = 0;
813   if (max_age > 0)
814     cfd.abs_timeout = cfd.now - max_age;
815   else
816     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
818   /* `tree_callback_flush' will return the keys of all values that haven't
819    * been touched in the last `config_flush_interval' seconds in `cfd'.
820    * The char*'s in this array point to the same memory as ci->file, so we
821    * don't need to free them separately. */
822   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
824   for (k = 0; k < cfd.keys_num; k++)
825   {
826     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
827     /* should never fail, since we have held the cache_lock
828      * the entire time */
829     assert(status == TRUE);
830   }
832   if (cfd.keys != NULL)
833   {
834     free (cfd.keys);
835     cfd.keys = NULL;
836   }
838   return (0);
839 } /* int flush_old_values */
841 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
843   struct timeval now;
844   struct timespec next_flush;
845   int status;
847   gettimeofday (&now, NULL);
848   next_flush.tv_sec = now.tv_sec + config_flush_interval;
849   next_flush.tv_nsec = 1000 * now.tv_usec;
851   pthread_mutex_lock(&cache_lock);
853   while (state == RUNNING)
854   {
855     gettimeofday (&now, NULL);
856     if ((now.tv_sec > next_flush.tv_sec)
857         || ((now.tv_sec == next_flush.tv_sec)
858           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
859     {
860       RRDD_LOG(LOG_DEBUG, "flushing old values");
862       /* Determine the time of the next cache flush. */
863       next_flush.tv_sec = now.tv_sec + config_flush_interval;
865       /* Flush all values that haven't been written in the last
866        * `config_write_interval' seconds. */
867       flush_old_values (config_write_interval);
869       /* unlock the cache while we rotate so we don't block incoming
870        * updates if the fsync() blocks on disk I/O */
871       pthread_mutex_unlock(&cache_lock);
872       journal_rotate();
873       pthread_mutex_lock(&cache_lock);
874     }
876     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
877     if (status != 0 && status != ETIMEDOUT)
878     {
879       RRDD_LOG (LOG_ERR, "flush_thread_main: "
880                 "pthread_cond_timedwait returned %i.", status);
881     }
882   }
884   if (config_flush_at_shutdown)
885     flush_old_values (-1); /* flush everything */
887   state = SHUTDOWN;
889   pthread_mutex_unlock(&cache_lock);
891   return NULL;
892 } /* void *flush_thread_main */
894 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
896   pthread_mutex_lock (&cache_lock);
898   while (state != SHUTDOWN
899          || (cache_queue_head != NULL && config_flush_at_shutdown))
900   {
901     cache_item_t *ci;
902     char *file;
903     char **values;
904     size_t values_num;
905     int status;
907     /* Now, check if there's something to store away. If not, wait until
908      * something comes in. */
909     if (cache_queue_head == NULL)
910     {
911       status = pthread_cond_wait (&queue_cond, &cache_lock);
912       if ((status != 0) && (status != ETIMEDOUT))
913       {
914         RRDD_LOG (LOG_ERR, "queue_thread_main: "
915             "pthread_cond_wait returned %i.", status);
916       }
917     }
919     /* Check if a value has arrived. This may be NULL if we timed out or there
920      * was an interrupt such as a signal. */
921     if (cache_queue_head == NULL)
922       continue;
924     ci = cache_queue_head;
926     /* copy the relevant parts */
927     file = strdup (ci->file);
928     if (file == NULL)
929     {
930       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
931       continue;
932     }
934     assert(ci->values != NULL);
935     assert(ci->values_num > 0);
937     values = ci->values;
938     values_num = ci->values_num;
940     wipe_ci_values(ci, time(NULL));
941     remove_from_queue(ci);
943     pthread_mutex_unlock (&cache_lock);
945     rrd_clear_error ();
946     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
947     if (status != 0)
948     {
949       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
950           "rrd_update_r (%s) failed with status %i. (%s)",
951           file, status, rrd_get_error());
952     }
954     journal_write("wrote", file);
956     /* Search again in the tree.  It's possible someone issued a "FORGET"
957      * while we were writing the update values. */
958     pthread_mutex_lock(&cache_lock);
959     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
960     if (ci)
961       pthread_cond_broadcast(&ci->flushed);
962     pthread_mutex_unlock(&cache_lock);
964     if (status == 0)
965     {
966       pthread_mutex_lock (&stats_lock);
967       stats_updates_written++;
968       stats_data_sets_written += values_num;
969       pthread_mutex_unlock (&stats_lock);
970     }
972     rrd_free_ptrs((void ***) &values, &values_num);
973     free(file);
975     pthread_mutex_lock (&cache_lock);
976   }
977   pthread_mutex_unlock (&cache_lock);
979   return (NULL);
980 } /* }}} void *queue_thread_main */
982 static int buffer_get_field (char **buffer_ret, /* {{{ */
983     size_t *buffer_size_ret, char **field_ret)
985   char *buffer;
986   size_t buffer_pos;
987   size_t buffer_size;
988   char *field;
989   size_t field_size;
990   int status;
992   buffer = *buffer_ret;
993   buffer_pos = 0;
994   buffer_size = *buffer_size_ret;
995   field = *buffer_ret;
996   field_size = 0;
998   if (buffer_size <= 0)
999     return (-1);
1001   /* This is ensured by `handle_request'. */
1002   assert (buffer[buffer_size - 1] == '\0');
1004   status = -1;
1005   while (buffer_pos < buffer_size)
1006   {
1007     /* Check for end-of-field or end-of-buffer */
1008     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1009     {
1010       field[field_size] = 0;
1011       field_size++;
1012       buffer_pos++;
1013       status = 0;
1014       break;
1015     }
1016     /* Handle escaped characters. */
1017     else if (buffer[buffer_pos] == '\\')
1018     {
1019       if (buffer_pos >= (buffer_size - 1))
1020         break;
1021       buffer_pos++;
1022       field[field_size] = buffer[buffer_pos];
1023       field_size++;
1024       buffer_pos++;
1025     }
1026     /* Normal operation */ 
1027     else
1028     {
1029       field[field_size] = buffer[buffer_pos];
1030       field_size++;
1031       buffer_pos++;
1032     }
1033   } /* while (buffer_pos < buffer_size) */
1035   if (status != 0)
1036     return (status);
1038   *buffer_ret = buffer + buffer_pos;
1039   *buffer_size_ret = buffer_size - buffer_pos;
1040   *field_ret = field;
1042   return (0);
1043 } /* }}} int buffer_get_field */
1045 /* if we're restricting writes to the base directory,
1046  * check whether the file falls within the dir
1047  * returns 1 if OK, otherwise 0
1048  */
1049 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1051   assert(file != NULL);
1053   if (!config_write_base_only
1054       || JOURNAL_REPLAY(sock)
1055       || config_base_dir == NULL)
1056     return 1;
1058   if (strstr(file, "../") != NULL) goto err;
1060   /* relative paths without "../" are ok */
1061   if (*file != '/') return 1;
1063   /* file must be of the format base + "/" + <1+ char filename> */
1064   if (strlen(file) < _config_base_dir_len + 2) goto err;
1065   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1066   if (*(file + _config_base_dir_len) != '/') goto err;
1068   return 1;
1070 err:
1071   if (sock != NULL && sock->fd >= 0)
1072     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1074   return 0;
1075 } /* }}} static int check_file_access */
1077 /* when using a base dir, convert relative paths to absolute paths.
1078  * if necessary, modifies the "filename" pointer to point
1079  * to the new path created in "tmp".  "tmp" is provided
1080  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1081  *
1082  * this allows us to optimize for the expected case (absolute path)
1083  * with a no-op.
1084  */
1085 static void get_abs_path(char **filename, char *tmp)
1087   assert(tmp != NULL);
1088   assert(filename != NULL && *filename != NULL);
1090   if (config_base_dir == NULL || **filename == '/')
1091     return;
1093   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1094   *filename = tmp;
1095 } /* }}} static int get_abs_path */
1097 static int flush_file (const char *filename) /* {{{ */
1099   cache_item_t *ci;
1101   pthread_mutex_lock (&cache_lock);
1103   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1104   if (ci == NULL)
1105   {
1106     pthread_mutex_unlock (&cache_lock);
1107     return (ENOENT);
1108   }
1110   if (ci->values_num > 0)
1111   {
1112     /* Enqueue at head */
1113     enqueue_cache_item (ci, HEAD);
1114     pthread_cond_wait(&ci->flushed, &cache_lock);
1115   }
1117   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1118    * may have been purged during our cond_wait() */
1120   pthread_mutex_unlock(&cache_lock);
1122   return (0);
1123 } /* }}} int flush_file */
1125 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1127   char *err = "Syntax error.\n";
1129   if (cmd && cmd->syntax)
1130     err = cmd->syntax;
1132   return send_response(sock, RESP_ERR, "Usage: %s", err);
1133 } /* }}} static int syntax_error() */
1135 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1137   uint64_t copy_queue_length;
1138   uint64_t copy_updates_received;
1139   uint64_t copy_flush_received;
1140   uint64_t copy_updates_written;
1141   uint64_t copy_data_sets_written;
1142   uint64_t copy_journal_bytes;
1143   uint64_t copy_journal_rotate;
1145   uint64_t tree_nodes_number;
1146   uint64_t tree_depth;
1148   pthread_mutex_lock (&stats_lock);
1149   copy_queue_length       = stats_queue_length;
1150   copy_updates_received   = stats_updates_received;
1151   copy_flush_received     = stats_flush_received;
1152   copy_updates_written    = stats_updates_written;
1153   copy_data_sets_written  = stats_data_sets_written;
1154   copy_journal_bytes      = stats_journal_bytes;
1155   copy_journal_rotate     = stats_journal_rotate;
1156   pthread_mutex_unlock (&stats_lock);
1158   pthread_mutex_lock (&cache_lock);
1159   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1160   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1161   pthread_mutex_unlock (&cache_lock);
1163   add_response_info(sock,
1164                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1165   add_response_info(sock,
1166                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1167   add_response_info(sock,
1168                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1169   add_response_info(sock,
1170                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1171   add_response_info(sock,
1172                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1173   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1174   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1175   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1176   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1178   send_response(sock, RESP_OK, "Statistics follow\n");
1180   return (0);
1181 } /* }}} int handle_request_stats */
1183 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1185   char *file, file_tmp[PATH_MAX];
1186   int status;
1188   status = buffer_get_field (&buffer, &buffer_size, &file);
1189   if (status != 0)
1190   {
1191     return syntax_error(sock,cmd);
1192   }
1193   else
1194   {
1195     pthread_mutex_lock(&stats_lock);
1196     stats_flush_received++;
1197     pthread_mutex_unlock(&stats_lock);
1199     get_abs_path(&file, file_tmp);
1200     if (!check_file_access(file, sock)) return 0;
1202     status = flush_file (file);
1203     if (status == 0)
1204       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1205     else if (status == ENOENT)
1206     {
1207       /* no file in our tree; see whether it exists at all */
1208       struct stat statbuf;
1210       memset(&statbuf, 0, sizeof(statbuf));
1211       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1212         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1213       else
1214         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1215     }
1216     else if (status < 0)
1217       return send_response(sock, RESP_ERR, "Internal error.\n");
1218     else
1219       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1220   }
1222   /* NOTREACHED */
1223   assert(1==0);
1224 } /* }}} int handle_request_flush */
1226 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1228   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1230   pthread_mutex_lock(&cache_lock);
1231   flush_old_values(-1);
1232   pthread_mutex_unlock(&cache_lock);
1234   return send_response(sock, RESP_OK, "Started flush.\n");
1235 } /* }}} static int handle_request_flushall */
1237 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1239   int status;
1240   char *file, file_tmp[PATH_MAX];
1241   cache_item_t *ci;
1243   status = buffer_get_field(&buffer, &buffer_size, &file);
1244   if (status != 0)
1245     return syntax_error(sock,cmd);
1247   get_abs_path(&file, file_tmp);
1249   pthread_mutex_lock(&cache_lock);
1250   ci = g_tree_lookup(cache_tree, file);
1251   if (ci == NULL)
1252   {
1253     pthread_mutex_unlock(&cache_lock);
1254     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1255   }
1257   for (size_t i=0; i < ci->values_num; i++)
1258     add_response_info(sock, "%s\n", ci->values[i]);
1260   pthread_mutex_unlock(&cache_lock);
1261   return send_response(sock, RESP_OK, "updates pending\n");
1262 } /* }}} static int handle_request_pending */
1264 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1266   int status;
1267   gboolean found;
1268   char *file, file_tmp[PATH_MAX];
1270   status = buffer_get_field(&buffer, &buffer_size, &file);
1271   if (status != 0)
1272     return syntax_error(sock,cmd);
1274   get_abs_path(&file, file_tmp);
1275   if (!check_file_access(file, sock)) return 0;
1277   pthread_mutex_lock(&cache_lock);
1278   found = g_tree_remove(cache_tree, file);
1279   pthread_mutex_unlock(&cache_lock);
1281   if (found == TRUE)
1282   {
1283     if (!JOURNAL_REPLAY(sock))
1284       journal_write("forget", file);
1286     return send_response(sock, RESP_OK, "Gone!\n");
1287   }
1288   else
1289     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1291   /* NOTREACHED */
1292   assert(1==0);
1293 } /* }}} static int handle_request_forget */
1295 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1297   cache_item_t *ci;
1299   pthread_mutex_lock(&cache_lock);
1301   ci = cache_queue_head;
1302   while (ci != NULL)
1303   {
1304     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1305     ci = ci->next;
1306   }
1308   pthread_mutex_unlock(&cache_lock);
1310   return send_response(sock, RESP_OK, "in queue.\n");
1311 } /* }}} int handle_request_queue */
1313 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1315   char *file, file_tmp[PATH_MAX];
1316   int values_num = 0;
1317   int status;
1318   char orig_buf[CMD_MAX];
1320   cache_item_t *ci;
1322   /* save it for the journal later */
1323   if (!JOURNAL_REPLAY(sock))
1324     strncpy(orig_buf, buffer, buffer_size);
1326   status = buffer_get_field (&buffer, &buffer_size, &file);
1327   if (status != 0)
1328     return syntax_error(sock,cmd);
1330   pthread_mutex_lock(&stats_lock);
1331   stats_updates_received++;
1332   pthread_mutex_unlock(&stats_lock);
1334   get_abs_path(&file, file_tmp);
1335   if (!check_file_access(file, sock)) return 0;
1337   pthread_mutex_lock (&cache_lock);
1338   ci = g_tree_lookup (cache_tree, file);
1340   if (ci == NULL) /* {{{ */
1341   {
1342     struct stat statbuf;
1343     cache_item_t *tmp;
1345     /* don't hold the lock while we setup; stat(2) might block */
1346     pthread_mutex_unlock(&cache_lock);
1348     memset (&statbuf, 0, sizeof (statbuf));
1349     status = stat (file, &statbuf);
1350     if (status != 0)
1351     {
1352       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1354       status = errno;
1355       if (status == ENOENT)
1356         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1357       else
1358         return send_response(sock, RESP_ERR,
1359                              "stat failed with error %i.\n", status);
1360     }
1361     if (!S_ISREG (statbuf.st_mode))
1362       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1364     if (access(file, R_OK|W_OK) != 0)
1365       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1366                            file, rrd_strerror(errno));
1368     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1369     if (ci == NULL)
1370     {
1371       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1373       return send_response(sock, RESP_ERR, "malloc failed.\n");
1374     }
1375     memset (ci, 0, sizeof (cache_item_t));
1377     ci->file = strdup (file);
1378     if (ci->file == NULL)
1379     {
1380       free (ci);
1381       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1383       return send_response(sock, RESP_ERR, "strdup failed.\n");
1384     }
1386     wipe_ci_values(ci, now);
1387     ci->flags = CI_FLAGS_IN_TREE;
1388     pthread_cond_init(&ci->flushed, NULL);
1390     pthread_mutex_lock(&cache_lock);
1392     /* another UPDATE might have added this entry in the meantime */
1393     tmp = g_tree_lookup (cache_tree, file);
1394     if (tmp == NULL)
1395       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1396     else
1397     {
1398       free_cache_item (ci);
1399       ci = tmp;
1400     }
1402     /* state may have changed while we were unlocked */
1403     if (state == SHUTDOWN)
1404       return -1;
1405   } /* }}} */
1406   assert (ci != NULL);
1408   /* don't re-write updates in replay mode */
1409   if (!JOURNAL_REPLAY(sock))
1410     journal_write("update", orig_buf);
1412   while (buffer_size > 0)
1413   {
1414     char *value;
1415     time_t stamp;
1416     char *eostamp;
1418     status = buffer_get_field (&buffer, &buffer_size, &value);
1419     if (status != 0)
1420     {
1421       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1422       break;
1423     }
1425     /* make sure update time is always moving forward */
1426     stamp = strtol(value, &eostamp, 10);
1427     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1428     {
1429       pthread_mutex_unlock(&cache_lock);
1430       return send_response(sock, RESP_ERR,
1431                            "Cannot find timestamp in '%s'!\n", value);
1432     }
1433     else if (stamp <= ci->last_update_stamp)
1434     {
1435       pthread_mutex_unlock(&cache_lock);
1436       return send_response(sock, RESP_ERR,
1437                            "illegal attempt to update using time %ld when last"
1438                            " update time is %ld (minimum one second step)\n",
1439                            stamp, ci->last_update_stamp);
1440     }
1441     else
1442       ci->last_update_stamp = stamp;
1444     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1445     {
1446       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1447       continue;
1448     }
1450     values_num++;
1451   }
1453   if (((now - ci->last_flush_time) >= config_write_interval)
1454       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1455       && (ci->values_num > 0))
1456   {
1457     enqueue_cache_item (ci, TAIL);
1458   }
1460   pthread_mutex_unlock (&cache_lock);
1462   if (values_num < 1)
1463     return send_response(sock, RESP_ERR, "No values updated.\n");
1464   else
1465     return send_response(sock, RESP_OK,
1466                          "errors, enqueued %i value(s).\n", values_num);
1468   /* NOTREACHED */
1469   assert(1==0);
1471 } /* }}} int handle_request_update */
1473 /* we came across a "WROTE" entry during journal replay.
1474  * throw away any values that we have accumulated for this file
1475  */
1476 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1478   cache_item_t *ci;
1479   const char *file = buffer;
1481   pthread_mutex_lock(&cache_lock);
1483   ci = g_tree_lookup(cache_tree, file);
1484   if (ci == NULL)
1485   {
1486     pthread_mutex_unlock(&cache_lock);
1487     return (0);
1488   }
1490   if (ci->values)
1491     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1493   wipe_ci_values(ci, now);
1494   remove_from_queue(ci);
1496   pthread_mutex_unlock(&cache_lock);
1497   return (0);
1498 } /* }}} int handle_request_wrote */
1500 /* start "BATCH" processing */
1501 static int batch_start (HANDLER_PROTO) /* {{{ */
1503   int status;
1504   if (sock->batch_start)
1505     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1507   status = send_response(sock, RESP_OK,
1508                          "Go ahead.  End with dot '.' on its own line.\n");
1509   sock->batch_start = time(NULL);
1510   sock->batch_cmd = 0;
1512   return status;
1513 } /* }}} static int batch_start */
1515 /* finish "BATCH" processing and return results to the client */
1516 static int batch_done (HANDLER_PROTO) /* {{{ */
1518   assert(sock->batch_start);
1519   sock->batch_start = 0;
1520   sock->batch_cmd  = 0;
1521   return send_response(sock, RESP_OK, "errors\n");
1522 } /* }}} static int batch_done */
1524 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1526   return -1;
1527 } /* }}} static int handle_request_quit */
1529 static command_t list_of_commands[] = { /* {{{ */
1530   {
1531     "UPDATE",
1532     handle_request_update,
1533     CMD_CONTEXT_ANY,
1534     "UPDATE <filename> <values> [<values> ...]\n"
1535     ,
1536     "Adds the given file to the internal cache if it is not yet known and\n"
1537     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1538     "for details.\n"
1539     "\n"
1540     "Each <values> has the following form:\n"
1541     "  <values> = <time>:<value>[:<value>[...]]\n"
1542     "See the rrdupdate(1) manpage for details.\n"
1543   },
1544   {
1545     "WROTE",
1546     handle_request_wrote,
1547     CMD_CONTEXT_JOURNAL,
1548     NULL,
1549     NULL
1550   },
1551   {
1552     "FLUSH",
1553     handle_request_flush,
1554     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1555     "FLUSH <filename>\n"
1556     ,
1557     "Adds the given filename to the head of the update queue and returns\n"
1558     "after it has been dequeued.\n"
1559   },
1560   {
1561     "FLUSHALL",
1562     handle_request_flushall,
1563     CMD_CONTEXT_CLIENT,
1564     "FLUSHALL\n"
1565     ,
1566     "Triggers writing of all pending updates.  Returns immediately.\n"
1567   },
1568   {
1569     "PENDING",
1570     handle_request_pending,
1571     CMD_CONTEXT_CLIENT,
1572     "PENDING <filename>\n"
1573     ,
1574     "Shows any 'pending' updates for a file, in order.\n"
1575     "The updates shown have not yet been written to the underlying RRD file.\n"
1576   },
1577   {
1578     "FORGET",
1579     handle_request_forget,
1580     CMD_CONTEXT_ANY,
1581     "FORGET <filename>\n"
1582     ,
1583     "Removes the file completely from the cache.\n"
1584     "Any pending updates for the file will be lost.\n"
1585   },
1586   {
1587     "QUEUE",
1588     handle_request_queue,
1589     CMD_CONTEXT_CLIENT,
1590     "QUEUE\n"
1591     ,
1592         "Shows all files in the output queue.\n"
1593     "The output is zero or more lines in the following format:\n"
1594     "(where <num_vals> is the number of values to be written)\n"
1595     "\n"
1596     "<num_vals> <filename>\n"
1597   },
1598   {
1599     "STATS",
1600     handle_request_stats,
1601     CMD_CONTEXT_CLIENT,
1602     "STATS\n"
1603     ,
1604     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1605     "a description of the values.\n"
1606   },
1607   {
1608     "HELP",
1609     handle_request_help,
1610     CMD_CONTEXT_CLIENT,
1611     "HELP [<command>]\n",
1612     NULL, /* special! */
1613   },
1614   {
1615     "BATCH",
1616     batch_start,
1617     CMD_CONTEXT_CLIENT,
1618     "BATCH\n"
1619     ,
1620     "The 'BATCH' command permits the client to initiate a bulk load\n"
1621     "   of commands to rrdcached.\n"
1622     "\n"
1623     "Usage:\n"
1624     "\n"
1625     "    client: BATCH\n"
1626     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1627     "    client: command #1\n"
1628     "    client: command #2\n"
1629     "    client: ... and so on\n"
1630     "    client: .\n"
1631     "    server: 2 errors\n"
1632     "    server: 7 message for command #7\n"
1633     "    server: 9 message for command #9\n"
1634     "\n"
1635     "For more information, consult the rrdcached(1) documentation.\n"
1636   },
1637   {
1638     ".",   /* BATCH terminator */
1639     batch_done,
1640     CMD_CONTEXT_BATCH,
1641     NULL,
1642     NULL
1643   },
1644   {
1645     "QUIT",
1646     handle_request_quit,
1647     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1648     "QUIT\n"
1649     ,
1650     "Disconnect from rrdcached.\n"
1651   }
1652 }; /* }}} command_t list_of_commands[] */
1653 static size_t list_of_commands_len = sizeof (list_of_commands)
1654   / sizeof (list_of_commands[0]);
1656 static command_t *find_command(char *cmd)
1658   size_t i;
1660   for (i = 0; i < list_of_commands_len; i++)
1661     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1662       return (&list_of_commands[i]);
1663   return NULL;
1666 /* We currently use the index in the `list_of_commands' array as a bit position
1667  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1668  * outside these functions so that switching to a more elegant storage method
1669  * is easily possible. */
1670 static ssize_t find_command_index (const char *cmd) /* {{{ */
1672   size_t i;
1674   for (i = 0; i < list_of_commands_len; i++)
1675     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1676       return ((ssize_t) i);
1677   return (-1);
1678 } /* }}} ssize_t find_command_index */
1680 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1681     const char *cmd)
1683   ssize_t i;
1685   if (JOURNAL_REPLAY(sock))
1686     return (1);
1688   if (cmd == NULL)
1689     return (-1);
1691   if ((strcasecmp ("QUIT", cmd) == 0)
1692       || (strcasecmp ("HELP", cmd) == 0))
1693     return (1);
1694   else if (strcmp (".", cmd) == 0)
1695     cmd = "BATCH";
1697   i = find_command_index (cmd);
1698   if (i < 0)
1699     return (-1);
1700   assert (i < 32);
1702   if ((sock->permissions & (1 << i)) != 0)
1703     return (1);
1704   return (0);
1705 } /* }}} int socket_permission_check */
1707 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1708     const char *cmd)
1710   ssize_t i;
1712   i = find_command_index (cmd);
1713   if (i < 0)
1714     return (-1);
1715   assert (i < 32);
1717   sock->permissions |= (1 << i);
1718   return (0);
1719 } /* }}} int socket_permission_add */
1721 /* check whether commands are received in the expected context */
1722 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1724   if (JOURNAL_REPLAY(sock))
1725     return (cmd->context & CMD_CONTEXT_JOURNAL);
1726   else if (sock->batch_start)
1727     return (cmd->context & CMD_CONTEXT_BATCH);
1728   else
1729     return (cmd->context & CMD_CONTEXT_CLIENT);
1731   /* NOTREACHED */
1732   assert(1==0);
1735 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1737   int status;
1738   char *cmd_str;
1739   char *resp_txt;
1740   command_t *help = NULL;
1742   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1743   if (status == 0)
1744     help = find_command(cmd_str);
1746   if (help && (help->syntax || help->help))
1747   {
1748     char tmp[CMD_MAX];
1750     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1751     resp_txt = tmp;
1753     if (help->syntax)
1754       add_response_info(sock, "Usage: %s\n", help->syntax);
1756     if (help->help)
1757       add_response_info(sock, "%s\n", help->help);
1758   }
1759   else
1760   {
1761     size_t i;
1763     resp_txt = "Command overview\n";
1765     for (i = 0; i < list_of_commands_len; i++)
1766     {
1767       if (list_of_commands[i].syntax == NULL)
1768         continue;
1769       add_response_info (sock, "%s", list_of_commands[i].syntax);
1770     }
1771   }
1773   return send_response(sock, RESP_OK, resp_txt);
1774 } /* }}} int handle_request_help */
1776 static int handle_request (DISPATCH_PROTO) /* {{{ */
1778   char *buffer_ptr = buffer;
1779   char *cmd_str = NULL;
1780   command_t *cmd = NULL;
1781   int status;
1783   assert (buffer[buffer_size - 1] == '\0');
1785   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1786   if (status != 0)
1787   {
1788     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1789     return (-1);
1790   }
1792   if (sock != NULL && sock->batch_start)
1793     sock->batch_cmd++;
1795   cmd = find_command(cmd_str);
1796   if (!cmd)
1797     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1799   if (!socket_permission_check (sock, cmd->cmd))
1800     return send_response(sock, RESP_ERR, "Permission denied.\n");
1802   if (!command_check_context(sock, cmd))
1803     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1805   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1806 } /* }}} int handle_request */
1808 static void journal_set_free (journal_set *js) /* {{{ */
1810   if (js == NULL)
1811     return;
1813   rrd_free_ptrs((void ***) &js->files, &js->files_num);
1815   free(js);
1816 } /* }}} journal_set_free */
1818 static void journal_set_remove (journal_set *js) /* {{{ */
1820   if (js == NULL)
1821     return;
1823   for (uint i=0; i < js->files_num; i++)
1824   {
1825     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1826     unlink(js->files[i]);
1827   }
1828 } /* }}} journal_set_remove */
1830 /* close current journal file handle.
1831  * MUST hold journal_lock before calling */
1832 static void journal_close(void) /* {{{ */
1834   if (journal_fh != NULL)
1835   {
1836     if (fclose(journal_fh) != 0)
1837       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1838   }
1840   journal_fh = NULL;
1841   journal_size = 0;
1842 } /* }}} journal_close */
1844 /* MUST hold journal_lock before calling */
1845 static void journal_new_file(void) /* {{{ */
1847   struct timeval now;
1848   int  new_fd;
1849   char new_file[PATH_MAX + 1];
1851   assert(journal_dir != NULL);
1852   assert(journal_cur != NULL);
1854   journal_close();
1856   gettimeofday(&now, NULL);
1857   /* this format assures that the files sort in strcmp() order */
1858   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1859            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1861   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1862                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1863   if (new_fd < 0)
1864     goto error;
1866   journal_fh = fdopen(new_fd, "a");
1867   if (journal_fh == NULL)
1868     goto error;
1870   journal_size = ftell(journal_fh);
1871   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1873   /* record the file in the journal set */
1874   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1876   return;
1878 error:
1879   RRDD_LOG(LOG_CRIT,
1880            "JOURNALING DISABLED: Error while trying to create %s : %s",
1881            new_file, rrd_strerror(errno));
1882   RRDD_LOG(LOG_CRIT,
1883            "JOURNALING DISABLED: All values will be flushed at shutdown");
1885   close(new_fd);
1886   config_flush_at_shutdown = 1;
1888 } /* }}} journal_new_file */
1890 /* MUST NOT hold journal_lock before calling this */
1891 static void journal_rotate(void) /* {{{ */
1893   journal_set *old_js = NULL;
1895   if (journal_dir == NULL)
1896     return;
1898   RRDD_LOG(LOG_DEBUG, "rotating journals");
1900   pthread_mutex_lock(&stats_lock);
1901   ++stats_journal_rotate;
1902   pthread_mutex_unlock(&stats_lock);
1904   pthread_mutex_lock(&journal_lock);
1906   journal_close();
1908   /* rotate the journal sets */
1909   old_js = journal_old;
1910   journal_old = journal_cur;
1911   journal_cur = calloc(1, sizeof(journal_set));
1913   if (journal_cur != NULL)
1914     journal_new_file();
1915   else
1916     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1918   pthread_mutex_unlock(&journal_lock);
1920   journal_set_remove(old_js);
1921   journal_set_free  (old_js);
1923 } /* }}} static void journal_rotate */
1925 /* MUST hold journal_lock when calling */
1926 static void journal_done(void) /* {{{ */
1928   if (journal_cur == NULL)
1929     return;
1931   journal_close();
1933   if (config_flush_at_shutdown)
1934   {
1935     RRDD_LOG(LOG_INFO, "removing journals");
1936     journal_set_remove(journal_old);
1937     journal_set_remove(journal_cur);
1938   }
1939   else
1940   {
1941     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1942              "journals will be used at next startup");
1943   }
1945   journal_set_free(journal_cur);
1946   journal_set_free(journal_old);
1947   free(journal_dir);
1949 } /* }}} static void journal_done */
1951 static int journal_write(char *cmd, char *args) /* {{{ */
1953   int chars;
1955   if (journal_fh == NULL)
1956     return 0;
1958   pthread_mutex_lock(&journal_lock);
1959   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1960   journal_size += chars;
1962   if (journal_size > JOURNAL_MAX)
1963     journal_new_file();
1965   pthread_mutex_unlock(&journal_lock);
1967   if (chars > 0)
1968   {
1969     pthread_mutex_lock(&stats_lock);
1970     stats_journal_bytes += chars;
1971     pthread_mutex_unlock(&stats_lock);
1972   }
1974   return chars;
1975 } /* }}} static int journal_write */
1977 static int journal_replay (const char *file) /* {{{ */
1979   FILE *fh;
1980   int entry_cnt = 0;
1981   int fail_cnt = 0;
1982   uint64_t line = 0;
1983   char entry[CMD_MAX];
1984   time_t now;
1986   if (file == NULL) return 0;
1988   {
1989     char *reason = "unknown error";
1990     int status = 0;
1991     struct stat statbuf;
1993     memset(&statbuf, 0, sizeof(statbuf));
1994     if (stat(file, &statbuf) != 0)
1995     {
1996       reason = "stat error";
1997       status = errno;
1998     }
1999     else if (!S_ISREG(statbuf.st_mode))
2000     {
2001       reason = "not a regular file";
2002       status = EPERM;
2003     }
2004     if (statbuf.st_uid != daemon_uid)
2005     {
2006       reason = "not owned by daemon user";
2007       status = EACCES;
2008     }
2009     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2010     {
2011       reason = "must not be user/group writable";
2012       status = EACCES;
2013     }
2015     if (status != 0)
2016     {
2017       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2018                file, rrd_strerror(status), reason);
2019       return 0;
2020     }
2021   }
2023   fh = fopen(file, "r");
2024   if (fh == NULL)
2025   {
2026     if (errno != ENOENT)
2027       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2028                file, rrd_strerror(errno));
2029     return 0;
2030   }
2031   else
2032     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2034   now = time(NULL);
2036   while(!feof(fh))
2037   {
2038     size_t entry_len;
2040     ++line;
2041     if (fgets(entry, sizeof(entry), fh) == NULL)
2042       break;
2043     entry_len = strlen(entry);
2045     /* check \n termination in case journal writing crashed mid-line */
2046     if (entry_len == 0)
2047       continue;
2048     else if (entry[entry_len - 1] != '\n')
2049     {
2050       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2051       ++fail_cnt;
2052       continue;
2053     }
2055     entry[entry_len - 1] = '\0';
2057     if (handle_request(NULL, now, entry, entry_len) == 0)
2058       ++entry_cnt;
2059     else
2060       ++fail_cnt;
2061   }
2063   fclose(fh);
2065   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2066            entry_cnt, fail_cnt);
2068   return entry_cnt > 0 ? 1 : 0;
2069 } /* }}} static int journal_replay */
2071 static int journal_sort(const void *v1, const void *v2)
2073   char **jn1 = (char **) v1;
2074   char **jn2 = (char **) v2;
2076   return strcmp(*jn1,*jn2);
2079 static void journal_init(void) /* {{{ */
2081   int had_journal = 0;
2082   DIR *dir;
2083   struct dirent *dent;
2084   char path[PATH_MAX+1];
2086   if (journal_dir == NULL) return;
2088   pthread_mutex_lock(&journal_lock);
2090   journal_cur = calloc(1, sizeof(journal_set));
2091   if (journal_cur == NULL)
2092   {
2093     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2094     return;
2095   }
2097   RRDD_LOG(LOG_INFO, "checking for journal files");
2099   /* Handle old journal files during transition.  This gives them the
2100    * correct sort order.  TODO: remove after first release
2101    */
2102   {
2103     char old_path[PATH_MAX+1];
2104     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2105     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2106     rename(old_path, path);
2108     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2109     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2110     rename(old_path, path);
2111   }
2113   dir = opendir(journal_dir);
2114   if (!dir) {
2115     RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2116     return;
2117   }
2118   while ((dent = readdir(dir)) != NULL)
2119   {
2120     /* looks like a journal file? */
2121     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2122       continue;
2124     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2126     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2127     {
2128       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2129                dent->d_name);
2130       break;
2131     }
2132   }
2133   closedir(dir);
2135   qsort(journal_cur->files, journal_cur->files_num,
2136         sizeof(journal_cur->files[0]), journal_sort);
2138   for (uint i=0; i < journal_cur->files_num; i++)
2139     had_journal += journal_replay(journal_cur->files[i]);
2141   journal_new_file();
2143   /* it must have been a crash.  start a flush */
2144   if (had_journal && config_flush_at_shutdown)
2145     flush_old_values(-1);
2147   pthread_mutex_unlock(&journal_lock);
2149   RRDD_LOG(LOG_INFO, "journal processing complete");
2151 } /* }}} static void journal_init */
2153 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2155   assert(sock != NULL);
2157   free(sock->rbuf);  sock->rbuf = NULL;
2158   free(sock->wbuf);  sock->wbuf = NULL;
2159   free(sock);
2160 } /* }}} void free_listen_socket */
2162 static void close_connection(listen_socket_t *sock) /* {{{ */
2164   if (sock->fd >= 0)
2165   {
2166     close(sock->fd);
2167     sock->fd = -1;
2168   }
2170   free_listen_socket(sock);
2172 } /* }}} void close_connection */
2174 static void *connection_thread_main (void *args) /* {{{ */
2176   listen_socket_t *sock;
2177   int fd;
2179   sock = (listen_socket_t *) args;
2180   fd = sock->fd;
2182   /* init read buffers */
2183   sock->next_read = sock->next_cmd = 0;
2184   sock->rbuf = malloc(RBUF_SIZE);
2185   if (sock->rbuf == NULL)
2186   {
2187     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2188     close_connection(sock);
2189     return NULL;
2190   }
2192   pthread_mutex_lock (&connection_threads_lock);
2193   connection_threads_num++;
2194   pthread_mutex_unlock (&connection_threads_lock);
2196   while (state == RUNNING)
2197   {
2198     char *cmd;
2199     ssize_t cmd_len;
2200     ssize_t rbytes;
2201     time_t now;
2203     struct pollfd pollfd;
2204     int status;
2206     pollfd.fd = fd;
2207     pollfd.events = POLLIN | POLLPRI;
2208     pollfd.revents = 0;
2210     status = poll (&pollfd, 1, /* timeout = */ 500);
2211     if (state != RUNNING)
2212       break;
2213     else if (status == 0) /* timeout */
2214       continue;
2215     else if (status < 0) /* error */
2216     {
2217       status = errno;
2218       if (status != EINTR)
2219         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2220       continue;
2221     }
2223     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2224       break;
2225     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2226     {
2227       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2228           "poll(2) returned something unexpected: %#04hx",
2229           pollfd.revents);
2230       break;
2231     }
2233     rbytes = read(fd, sock->rbuf + sock->next_read,
2234                   RBUF_SIZE - sock->next_read);
2235     if (rbytes < 0)
2236     {
2237       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2238       break;
2239     }
2240     else if (rbytes == 0)
2241       break; /* eof */
2243     sock->next_read += rbytes;
2245     if (sock->batch_start)
2246       now = sock->batch_start;
2247     else
2248       now = time(NULL);
2250     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2251     {
2252       status = handle_request (sock, now, cmd, cmd_len+1);
2253       if (status != 0)
2254         goto out_close;
2255     }
2256   }
2258 out_close:
2259   close_connection(sock);
2261   /* Remove this thread from the connection threads list */
2262   pthread_mutex_lock (&connection_threads_lock);
2263   connection_threads_num--;
2264   if (connection_threads_num <= 0)
2265     pthread_cond_broadcast(&connection_threads_done);
2266   pthread_mutex_unlock (&connection_threads_lock);
2268   return (NULL);
2269 } /* }}} void *connection_thread_main */
2271 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2273   int fd;
2274   struct sockaddr_un sa;
2275   listen_socket_t *temp;
2276   int status;
2277   const char *path;
2278   char *path_copy, *dir;
2280   path = sock->addr;
2281   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2282     path += strlen("unix:");
2284   /* dirname may modify its argument */
2285   path_copy = strdup(path);
2286   if (path_copy == NULL)
2287   {
2288     fprintf(stderr, "rrdcached: strdup(): %s\n",
2289         rrd_strerror(errno));
2290     return (-1);
2291   }
2293   dir = dirname(path_copy);
2294   if (rrd_mkdir_p(dir, 0777) != 0)
2295   {
2296     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2297         dir, rrd_strerror(errno));
2298     return (-1);
2299   }
2301   free(path_copy);
2303   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2304       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2305   if (temp == NULL)
2306   {
2307     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2308     return (-1);
2309   }
2310   listen_fds = temp;
2311   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2313   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2314   if (fd < 0)
2315   {
2316     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2317              rrd_strerror(errno));
2318     return (-1);
2319   }
2321   memset (&sa, 0, sizeof (sa));
2322   sa.sun_family = AF_UNIX;
2323   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2325   /* if we've gotten this far, we own the pid file.  any daemon started
2326    * with the same args must not be alive.  therefore, ensure that we can
2327    * create the socket...
2328    */
2329   unlink(path);
2331   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2332   if (status != 0)
2333   {
2334     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2335              path, rrd_strerror(errno));
2336     close (fd);
2337     return (-1);
2338   }
2340   /* tweak the sockets group ownership */
2341   if (sock->socket_group != (gid_t)-1)
2342   {
2343     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2344          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2345     {
2346       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2347     }
2348   }
2350   if (sock->socket_permissions != (mode_t)-1)
2351   {
2352     if (chmod(path, sock->socket_permissions) != 0)
2353       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2354           (unsigned int)sock->socket_permissions, strerror(errno));
2355   }
2357   status = listen (fd, /* backlog = */ 10);
2358   if (status != 0)
2359   {
2360     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2361              path, rrd_strerror(errno));
2362     close (fd);
2363     unlink (path);
2364     return (-1);
2365   }
2367   listen_fds[listen_fds_num].fd = fd;
2368   listen_fds[listen_fds_num].family = PF_UNIX;
2369   strncpy(listen_fds[listen_fds_num].addr, path,
2370           sizeof (listen_fds[listen_fds_num].addr) - 1);
2371   listen_fds_num++;
2373   return (0);
2374 } /* }}} int open_listen_socket_unix */
2376 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2378   struct addrinfo ai_hints;
2379   struct addrinfo *ai_res;
2380   struct addrinfo *ai_ptr;
2381   char addr_copy[NI_MAXHOST];
2382   char *addr;
2383   char *port;
2384   int status;
2386   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2387   addr_copy[sizeof (addr_copy) - 1] = 0;
2388   addr = addr_copy;
2390   memset (&ai_hints, 0, sizeof (ai_hints));
2391   ai_hints.ai_flags = 0;
2392 #ifdef AI_ADDRCONFIG
2393   ai_hints.ai_flags |= AI_ADDRCONFIG;
2394 #endif
2395   ai_hints.ai_family = AF_UNSPEC;
2396   ai_hints.ai_socktype = SOCK_STREAM;
2398   port = NULL;
2399   if (*addr == '[') /* IPv6+port format */
2400   {
2401     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2402     addr++;
2404     port = strchr (addr, ']');
2405     if (port == NULL)
2406     {
2407       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2408       return (-1);
2409     }
2410     *port = 0;
2411     port++;
2413     if (*port == ':')
2414       port++;
2415     else if (*port == 0)
2416       port = NULL;
2417     else
2418     {
2419       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2420       return (-1);
2421     }
2422   } /* if (*addr == '[') */
2423   else
2424   {
2425     port = rindex(addr, ':');
2426     if (port != NULL)
2427     {
2428       *port = 0;
2429       port++;
2430     }
2431   }
2432   ai_res = NULL;
2433   status = getaddrinfo (addr,
2434                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2435                         &ai_hints, &ai_res);
2436   if (status != 0)
2437   {
2438     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2439              addr, gai_strerror (status));
2440     return (-1);
2441   }
2443   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2444   {
2445     int fd;
2446     listen_socket_t *temp;
2447     int one = 1;
2449     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2450         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2451     if (temp == NULL)
2452     {
2453       fprintf (stderr,
2454                "rrdcached: open_listen_socket_network: realloc failed.\n");
2455       continue;
2456     }
2457     listen_fds = temp;
2458     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2460     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2461     if (fd < 0)
2462     {
2463       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2464                rrd_strerror(errno));
2465       continue;
2466     }
2468     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2470     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2471     if (status != 0)
2472     {
2473       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2474                sock->addr, rrd_strerror(errno));
2475       close (fd);
2476       continue;
2477     }
2479     status = listen (fd, /* backlog = */ 10);
2480     if (status != 0)
2481     {
2482       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2483                sock->addr, rrd_strerror(errno));
2484       close (fd);
2485       freeaddrinfo(ai_res);
2486       return (-1);
2487     }
2489     listen_fds[listen_fds_num].fd = fd;
2490     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2491     listen_fds_num++;
2492   } /* for (ai_ptr) */
2494   freeaddrinfo(ai_res);
2495   return (0);
2496 } /* }}} static int open_listen_socket_network */
2498 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2500   assert(sock != NULL);
2501   assert(sock->addr != NULL);
2503   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2504       || sock->addr[0] == '/')
2505     return (open_listen_socket_unix(sock));
2506   else
2507     return (open_listen_socket_network(sock));
2508 } /* }}} int open_listen_socket */
2510 static int close_listen_sockets (void) /* {{{ */
2512   size_t i;
2514   for (i = 0; i < listen_fds_num; i++)
2515   {
2516     close (listen_fds[i].fd);
2518     if (listen_fds[i].family == PF_UNIX)
2519       unlink(listen_fds[i].addr);
2520   }
2522   free (listen_fds);
2523   listen_fds = NULL;
2524   listen_fds_num = 0;
2526   return (0);
2527 } /* }}} int close_listen_sockets */
2529 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2531   struct pollfd *pollfds;
2532   int pollfds_num;
2533   int status;
2534   int i;
2536   if (listen_fds_num < 1)
2537   {
2538     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2539     return (NULL);
2540   }
2542   pollfds_num = listen_fds_num;
2543   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2544   if (pollfds == NULL)
2545   {
2546     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2547     return (NULL);
2548   }
2549   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2551   RRDD_LOG(LOG_INFO, "listening for connections");
2553   while (state == RUNNING)
2554   {
2555     for (i = 0; i < pollfds_num; i++)
2556     {
2557       pollfds[i].fd = listen_fds[i].fd;
2558       pollfds[i].events = POLLIN | POLLPRI;
2559       pollfds[i].revents = 0;
2560     }
2562     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2563     if (state != RUNNING)
2564       break;
2565     else if (status == 0) /* timeout */
2566       continue;
2567     else if (status < 0) /* error */
2568     {
2569       status = errno;
2570       if (status != EINTR)
2571       {
2572         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2573       }
2574       continue;
2575     }
2577     for (i = 0; i < pollfds_num; i++)
2578     {
2579       listen_socket_t *client_sock;
2580       struct sockaddr_storage client_sa;
2581       socklen_t client_sa_size;
2582       pthread_t tid;
2583       pthread_attr_t attr;
2585       if (pollfds[i].revents == 0)
2586         continue;
2588       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2589       {
2590         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2591             "poll(2) returned something unexpected for listen FD #%i.",
2592             pollfds[i].fd);
2593         continue;
2594       }
2596       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2597       if (client_sock == NULL)
2598       {
2599         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2600         continue;
2601       }
2602       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2604       client_sa_size = sizeof (client_sa);
2605       client_sock->fd = accept (pollfds[i].fd,
2606           (struct sockaddr *) &client_sa, &client_sa_size);
2607       if (client_sock->fd < 0)
2608       {
2609         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2610         free(client_sock);
2611         continue;
2612       }
2614       pthread_attr_init (&attr);
2615       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2617       status = pthread_create (&tid, &attr, connection_thread_main,
2618                                client_sock);
2619       if (status != 0)
2620       {
2621         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2622         close_connection(client_sock);
2623         continue;
2624       }
2625     } /* for (pollfds_num) */
2626   } /* while (state == RUNNING) */
2628   RRDD_LOG(LOG_INFO, "starting shutdown");
2630   close_listen_sockets ();
2632   pthread_mutex_lock (&connection_threads_lock);
2633   while (connection_threads_num > 0)
2634     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2635   pthread_mutex_unlock (&connection_threads_lock);
2637   free(pollfds);
2639   return (NULL);
2640 } /* }}} void *listen_thread_main */
2642 static int daemonize (void) /* {{{ */
2644   int pid_fd;
2645   char *base_dir;
2647   daemon_uid = geteuid();
2649   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2650   if (pid_fd < 0)
2651     pid_fd = check_pidfile();
2652   if (pid_fd < 0)
2653     return pid_fd;
2655   /* open all the listen sockets */
2656   if (config_listen_address_list_len > 0)
2657   {
2658     for (size_t i = 0; i < config_listen_address_list_len; i++)
2659       open_listen_socket (config_listen_address_list[i]);
2661     rrd_free_ptrs((void ***) &config_listen_address_list,
2662                   &config_listen_address_list_len);
2663   }
2664   else
2665   {
2666     listen_socket_t sock;
2667     memset(&sock, 0, sizeof(sock));
2668     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2669     open_listen_socket (&sock);
2670   }
2672   if (listen_fds_num < 1)
2673   {
2674     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2675     goto error;
2676   }
2678   if (!stay_foreground)
2679   {
2680     pid_t child;
2682     child = fork ();
2683     if (child < 0)
2684     {
2685       fprintf (stderr, "daemonize: fork(2) failed.\n");
2686       goto error;
2687     }
2688     else if (child > 0)
2689       exit(0);
2691     /* Become session leader */
2692     setsid ();
2694     /* Open the first three file descriptors to /dev/null */
2695     close (2);
2696     close (1);
2697     close (0);
2699     open ("/dev/null", O_RDWR);
2700     if (dup(0) == -1 || dup(0) == -1){
2701         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2702     }
2703   } /* if (!stay_foreground) */
2705   /* Change into the /tmp directory. */
2706   base_dir = (config_base_dir != NULL)
2707     ? config_base_dir
2708     : "/tmp";
2710   if (chdir (base_dir) != 0)
2711   {
2712     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2713     goto error;
2714   }
2716   install_signal_handlers();
2718   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2719   RRDD_LOG(LOG_INFO, "starting up");
2721   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2722                                 (GDestroyNotify) free_cache_item);
2723   if (cache_tree == NULL)
2724   {
2725     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2726     goto error;
2727   }
2729   return write_pidfile (pid_fd);
2731 error:
2732   remove_pidfile();
2733   return -1;
2734 } /* }}} int daemonize */
2736 static int cleanup (void) /* {{{ */
2738   pthread_cond_broadcast (&flush_cond);
2739   pthread_join (flush_thread, NULL);
2741   pthread_cond_broadcast (&queue_cond);
2742   for (int i = 0; i < config_queue_threads; i++)
2743     pthread_join (queue_threads[i], NULL);
2745   if (config_flush_at_shutdown)
2746   {
2747     assert(cache_queue_head == NULL);
2748     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2749   }
2751   free(queue_threads);
2752   free(config_base_dir);
2754   pthread_mutex_lock(&cache_lock);
2755   g_tree_destroy(cache_tree);
2757   pthread_mutex_lock(&journal_lock);
2758   journal_done();
2760   RRDD_LOG(LOG_INFO, "goodbye");
2761   closelog ();
2763   remove_pidfile ();
2764   free(config_pid_file);
2766   return (0);
2767 } /* }}} int cleanup */
2769 static int read_options (int argc, char **argv) /* {{{ */
2771   int option;
2772   int status = 0;
2774   char **permissions = NULL;
2775   size_t permissions_len = 0;
2777   gid_t  socket_group = (gid_t)-1;
2778   mode_t socket_permissions = (mode_t)-1;
2780   while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2781   {
2782     switch (option)
2783     {
2784       case 'g':
2785         stay_foreground=1;
2786         break;
2788       case 'l':
2789       {
2790         listen_socket_t *new;
2792         new = malloc(sizeof(listen_socket_t));
2793         if (new == NULL)
2794         {
2795           fprintf(stderr, "read_options: malloc failed.\n");
2796           return(2);
2797         }
2798         memset(new, 0, sizeof(listen_socket_t));
2800         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2802         /* Add permissions to the socket {{{ */
2803         if (permissions_len != 0)
2804         {
2805           size_t i;
2806           for (i = 0; i < permissions_len; i++)
2807           {
2808             status = socket_permission_add (new, permissions[i]);
2809             if (status != 0)
2810             {
2811               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2812                   "socket failed. Most likely, this permission doesn't "
2813                   "exist. Check your command line.\n", permissions[i]);
2814               status = 4;
2815             }
2816           }
2817         }
2818         else /* if (permissions_len == 0) */
2819         {
2820           /* Add permission for ALL commands to the socket. */
2821           size_t i;
2822           for (i = 0; i < list_of_commands_len; i++)
2823           {
2824             status = socket_permission_add (new, list_of_commands[i].cmd);
2825             if (status != 0)
2826             {
2827               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2828                   "socket failed. This should never happen, ever! Sorry.\n",
2829                   permissions[i]);
2830               status = 4;
2831             }
2832           }
2833         }
2834         /* }}} Done adding permissions. */
2836         new->socket_group = socket_group;
2837         new->socket_permissions = socket_permissions;
2839         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2840                          &config_listen_address_list_len, new))
2841         {
2842           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2843           return (2);
2844         }
2845       }
2846       break;
2848       /* set socket group permissions */
2849       case 's':
2850       {
2851         gid_t group_gid;
2852         struct group *grp;
2854         group_gid = strtoul(optarg, NULL, 10);
2855         if (errno != EINVAL && group_gid>0)
2856         {
2857           /* we were passed a number */
2858           grp = getgrgid(group_gid);
2859         }
2860         else
2861         {
2862           grp = getgrnam(optarg);
2863         }
2865         if (grp)
2866         {
2867           socket_group = grp->gr_gid;
2868         }
2869         else
2870         {
2871           /* no idea what the user wanted... */
2872           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
2873           return (5);
2874         }
2875       }
2876       break;
2878       /* set socket file permissions */
2879       case 'm':
2880       {
2881         long  tmp;
2882         char *endptr = NULL;
2884         tmp = strtol (optarg, &endptr, 8);
2885         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
2886             || (tmp > 07777) || (tmp < 0)) {
2887           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
2888               optarg);
2889           return (5);
2890         }
2892         socket_permissions = (mode_t)tmp;
2893       }
2894       break;
2896       case 'P':
2897       {
2898         char *optcopy;
2899         char *saveptr;
2900         char *dummy;
2901         char *ptr;
2903         rrd_free_ptrs ((void *) &permissions, &permissions_len);
2905         optcopy = strdup (optarg);
2906         dummy = optcopy;
2907         saveptr = NULL;
2908         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2909         {
2910           dummy = NULL;
2911           rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2912         }
2914         free (optcopy);
2915       }
2916       break;
2918       case 'f':
2919       {
2920         int temp;
2922         temp = atoi (optarg);
2923         if (temp > 0)
2924           config_flush_interval = temp;
2925         else
2926         {
2927           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2928           status = 3;
2929         }
2930       }
2931       break;
2933       case 'w':
2934       {
2935         int temp;
2937         temp = atoi (optarg);
2938         if (temp > 0)
2939           config_write_interval = temp;
2940         else
2941         {
2942           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2943           status = 2;
2944         }
2945       }
2946       break;
2948       case 'z':
2949       {
2950         int temp;
2952         temp = atoi(optarg);
2953         if (temp > 0)
2954           config_write_jitter = temp;
2955         else
2956         {
2957           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2958           status = 2;
2959         }
2961         break;
2962       }
2964       case 't':
2965       {
2966         int threads;
2967         threads = atoi(optarg);
2968         if (threads >= 1)
2969           config_queue_threads = threads;
2970         else
2971         {
2972           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2973           return 1;
2974         }
2975       }
2976       break;
2978       case 'B':
2979         config_write_base_only = 1;
2980         break;
2982       case 'b':
2983       {
2984         size_t len;
2985         char base_realpath[PATH_MAX];
2987         if (config_base_dir != NULL)
2988           free (config_base_dir);
2989         config_base_dir = strdup (optarg);
2990         if (config_base_dir == NULL)
2991         {
2992           fprintf (stderr, "read_options: strdup failed.\n");
2993           return (3);
2994         }
2996         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
2997         {
2998           fprintf (stderr, "Failed to create base directory '%s': %s\n",
2999               config_base_dir, rrd_strerror (errno));
3000           return (3);
3001         }
3003         /* make sure that the base directory is not resolved via
3004          * symbolic links.  this makes some performance-enhancing
3005          * assumptions possible (we don't have to resolve paths
3006          * that start with a "/")
3007          */
3008         if (realpath(config_base_dir, base_realpath) == NULL)
3009         {
3010           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3011               "%s\n", config_base_dir, rrd_strerror(errno));
3012           return 5;
3013         }
3015         len = strlen (config_base_dir);
3016         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3017         {
3018           config_base_dir[len - 1] = 0;
3019           len--;
3020         }
3022         if (len < 1)
3023         {
3024           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3025           return (4);
3026         }
3028         _config_base_dir_len = len;
3030         len = strlen (base_realpath);
3031         while ((len > 0) && (base_realpath[len - 1] == '/'))
3032         {
3033           base_realpath[len - 1] = '\0';
3034           len--;
3035         }
3037         if (strncmp(config_base_dir,
3038                          base_realpath, sizeof(base_realpath)) != 0)
3039         {
3040           fprintf(stderr,
3041                   "Base directory (-b) resolved via file system links!\n"
3042                   "Please consult rrdcached '-b' documentation!\n"
3043                   "Consider specifying the real directory (%s)\n",
3044                   base_realpath);
3045           return 5;
3046         }
3047       }
3048       break;
3050       case 'p':
3051       {
3052         if (config_pid_file != NULL)
3053           free (config_pid_file);
3054         config_pid_file = strdup (optarg);
3055         if (config_pid_file == NULL)
3056         {
3057           fprintf (stderr, "read_options: strdup failed.\n");
3058           return (3);
3059         }
3060       }
3061       break;
3063       case 'F':
3064         config_flush_at_shutdown = 1;
3065         break;
3067       case 'j':
3068       {
3069         char journal_dir_actual[PATH_MAX];
3070         const char *dir;
3071         dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3073         status = rrd_mkdir_p(dir, 0777);
3074         if (status != 0)
3075         {
3076           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3077               dir, rrd_strerror(errno));
3078           return 6;
3079         }
3081         if (access(dir, R_OK|W_OK|X_OK) != 0)
3082         {
3083           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3084                   errno ? rrd_strerror(errno) : "");
3085           return 6;
3086         }
3087       }
3088       break;
3090       case 'h':
3091       case '?':
3092         printf ("RRDCacheD %s\n"
3093             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3094             "\n"
3095             "Usage: rrdcached [options]\n"
3096             "\n"
3097             "Valid options are:\n"
3098             "  -l <address>  Socket address to listen to.\n"
3099             "  -P <perms>    Sets the permissions to assign to all following "
3100                             "sockets\n"
3101             "  -w <seconds>  Interval in which to write data.\n"
3102             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3103             "  -t <threads>  Number of write threads.\n"
3104             "  -f <seconds>  Interval in which to flush dead data.\n"
3105             "  -p <file>     Location of the PID-file.\n"
3106             "  -b <dir>      Base directory to change to.\n"
3107             "  -B            Restrict file access to paths within -b <dir>\n"
3108             "  -g            Do not fork and run in the foreground.\n"
3109             "  -j <dir>      Directory in which to create the journal files.\n"
3110             "  -F            Always flush all updates at shutdown\n"
3111             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3112             "                (the socket will also have read/write permissions "
3113                             "for that group)\n"
3114             "  -m <mode>     File permissions (octal) of all following UNIX "
3115                             "sockets\n"
3116             "\n"
3117             "For more information and a detailed description of all options "
3118             "please refer\n"
3119             "to the rrdcached(1) manual page.\n",
3120             VERSION);
3121         if (option == 'h')
3122           status = -1;
3123         else
3124           status = 1;
3125         break;
3126     } /* switch (option) */
3127   } /* while (getopt) */
3129   /* advise the user when values are not sane */
3130   if (config_flush_interval < 2 * config_write_interval)
3131     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3132             " 2x write interval (-w) !\n");
3133   if (config_write_jitter > config_write_interval)
3134     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3135             " write interval (-w) !\n");
3137   if (config_write_base_only && config_base_dir == NULL)
3138     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3139             "  Consult the rrdcached documentation\n");
3141   if (journal_dir == NULL)
3142     config_flush_at_shutdown = 1;
3144   rrd_free_ptrs ((void *) &permissions, &permissions_len);
3146   return (status);
3147 } /* }}} int read_options */
3149 int main (int argc, char **argv)
3151   int status;
3153   status = read_options (argc, argv);
3154   if (status != 0)
3155   {
3156     if (status < 0)
3157       status = 0;
3158     return (status);
3159   }
3161   status = daemonize ();
3162   if (status != 0)
3163   {
3164     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3165     return (1);
3166   }
3168   journal_init();
3170   /* start the queue threads */
3171   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3172   if (queue_threads == NULL)
3173   {
3174     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3175     cleanup();
3176     return (1);
3177   }
3178   for (int i = 0; i < config_queue_threads; i++)
3179   {
3180     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3181     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3182     if (status != 0)
3183     {
3184       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3185       cleanup();
3186       return (1);
3187     }
3188   }
3190   /* start the flush thread */
3191   memset(&flush_thread, 0, sizeof(flush_thread));
3192   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3193   if (status != 0)
3194   {
3195     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3196     cleanup();
3197     return (1);
3198   }
3200   listen_thread_main (NULL);
3201   cleanup ();
3203   return (0);
3204 } /* int main */
3206 /*
3207  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3208  */