Code

* Add utility functions to allocate pointers in variable size chunks.
[rrdtool-all.git] / program / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008,2009 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #ifdef HAVE_STDINT_H
81 #  include <stdint.h>
82 #endif
83 #include <unistd.h>
84 #include <strings.h>
85 #include <inttypes.h>
86 #include <sys/socket.h>
88 #else
90 #endif
91 #include <stdio.h>
92 #include <string.h>
94 #include <sys/types.h>
95 #include <sys/stat.h>
96 #include <dirent.h>
97 #include <fcntl.h>
98 #include <signal.h>
99 #include <sys/un.h>
100 #include <netdb.h>
101 #include <poll.h>
102 #include <syslog.h>
103 #include <pthread.h>
104 #include <errno.h>
105 #include <assert.h>
106 #include <sys/time.h>
107 #include <time.h>
108 #include <libgen.h>
109 #include <grp.h>
111 #include <glib-2.0/glib.h>
112 /* }}} */
114 #define RRDD_LOG(severity, ...) \
115   do { \
116     if (stay_foreground) \
117       fprintf(stderr, __VA_ARGS__); \
118     syslog ((severity), __VA_ARGS__); \
119   } while (0)
121 #ifndef __GNUC__
122 # define __attribute__(x) /**/
123 #endif
125 /*
126  * Types
127  */
128 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
130 struct listen_socket_s
132   int fd;
133   char addr[PATH_MAX + 1];
134   int family;
136   /* state for BATCH processing */
137   time_t batch_start;
138   int batch_cmd;
140   /* buffered IO */
141   char *rbuf;
142   off_t next_cmd;
143   off_t next_read;
145   char *wbuf;
146   ssize_t wbuf_len;
148   uint32_t permissions;
150   gid_t  socket_group;
151   mode_t socket_permissions;
152 };
153 typedef struct listen_socket_s listen_socket_t;
155 struct command_s;
156 typedef struct command_s command_t;
157 /* note: guard against "unused" warnings in the handlers */
158 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
159                         time_t now              __attribute__((unused)),\
160                         char  *buffer           __attribute__((unused)),\
161                         size_t buffer_size      __attribute__((unused))
163 #define HANDLER_PROTO   command_t *cmd          __attribute__((unused)),\
164                         DISPATCH_PROTO
166 struct command_s {
167   char   *cmd;
168   int (*handler)(HANDLER_PROTO);
170   char  context;                /* where we expect to see it */
171 #define CMD_CONTEXT_CLIENT      (1<<0)
172 #define CMD_CONTEXT_BATCH       (1<<1)
173 #define CMD_CONTEXT_JOURNAL     (1<<2)
174 #define CMD_CONTEXT_ANY         (0x7f)
176   char *syntax;
177   char *help;
178 };
180 struct cache_item_s;
181 typedef struct cache_item_s cache_item_t;
182 struct cache_item_s
184   char *file;
185   char **values;
186   size_t values_num;            /* number of valid pointers */
187   size_t values_alloc;          /* number of allocated pointers */
188   time_t last_flush_time;
189   time_t last_update_stamp;
190 #define CI_FLAGS_IN_TREE  (1<<0)
191 #define CI_FLAGS_IN_QUEUE (1<<1)
192   int flags;
193   pthread_cond_t  flushed;
194   cache_item_t *prev;
195   cache_item_t *next;
196 };
198 struct callback_flush_data_s
200   time_t now;
201   time_t abs_timeout;
202   char **keys;
203   size_t keys_num;
204 };
205 typedef struct callback_flush_data_s callback_flush_data_t;
207 enum queue_side_e
209   HEAD,
210   TAIL
211 };
212 typedef enum queue_side_e queue_side_t;
214 /* describe a set of journal files */
215 typedef struct {
216   char **files;
217   size_t files_num;
218 } journal_set;
220 /* max length of socket command or response */
221 #define CMD_MAX 4096
222 #define RBUF_SIZE (CMD_MAX*2)
224 /*
225  * Variables
226  */
227 static int stay_foreground = 0;
228 static uid_t daemon_uid;
230 static listen_socket_t *listen_fds = NULL;
231 static size_t listen_fds_num = 0;
233 enum {
234   RUNNING,              /* normal operation */
235   FLUSHING,             /* flushing remaining values */
236   SHUTDOWN              /* shutting down */
237 } state = RUNNING;
239 static pthread_t *queue_threads;
240 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
241 static int config_queue_threads = 4;
243 static pthread_t flush_thread;
244 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
246 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
247 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
248 static int connection_threads_num = 0;
250 /* Cache stuff */
251 static GTree          *cache_tree = NULL;
252 static cache_item_t   *cache_queue_head = NULL;
253 static cache_item_t   *cache_queue_tail = NULL;
254 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
256 static int config_write_interval = 300;
257 static int config_write_jitter   = 0;
258 static int config_flush_interval = 3600;
259 static int config_flush_at_shutdown = 0;
260 static char *config_pid_file = NULL;
261 static char *config_base_dir = NULL;
262 static size_t _config_base_dir_len = 0;
263 static int config_write_base_only = 0;
264 static size_t config_alloc_chunk = 1;
266 static listen_socket_t **config_listen_address_list = NULL;
267 static size_t config_listen_address_list_len = 0;
269 static uint64_t stats_queue_length = 0;
270 static uint64_t stats_updates_received = 0;
271 static uint64_t stats_flush_received = 0;
272 static uint64_t stats_updates_written = 0;
273 static uint64_t stats_data_sets_written = 0;
274 static uint64_t stats_journal_bytes = 0;
275 static uint64_t stats_journal_rotate = 0;
276 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
278 /* Journaled updates */
279 #define JOURNAL_REPLAY(s) ((s) == NULL)
280 #define JOURNAL_BASE "rrd.journal"
281 static journal_set *journal_cur = NULL;
282 static journal_set *journal_old = NULL;
283 static char *journal_dir = NULL;
284 static FILE *journal_fh = NULL;         /* current journal file handle */
285 static long  journal_size = 0;          /* current journal size */
286 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
287 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
288 static int journal_write(char *cmd, char *args);
289 static void journal_done(void);
290 static void journal_rotate(void);
292 /* prototypes for forward refernces */
293 static int handle_request_help (HANDLER_PROTO);
295 /* 
296  * Functions
297  */
298 static void sig_common (const char *sig) /* {{{ */
300   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
301   state = FLUSHING;
302   pthread_cond_broadcast(&flush_cond);
303   pthread_cond_broadcast(&queue_cond);
304 } /* }}} void sig_common */
306 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
308   sig_common("INT");
309 } /* }}} void sig_int_handler */
311 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
313   sig_common("TERM");
314 } /* }}} void sig_term_handler */
316 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
318   config_flush_at_shutdown = 1;
319   sig_common("USR1");
320 } /* }}} void sig_usr1_handler */
322 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
324   config_flush_at_shutdown = 0;
325   sig_common("USR2");
326 } /* }}} void sig_usr2_handler */
328 static void install_signal_handlers(void) /* {{{ */
330   /* These structures are static, because `sigaction' behaves weird if the are
331    * overwritten.. */
332   static struct sigaction sa_int;
333   static struct sigaction sa_term;
334   static struct sigaction sa_pipe;
335   static struct sigaction sa_usr1;
336   static struct sigaction sa_usr2;
338   /* Install signal handlers */
339   memset (&sa_int, 0, sizeof (sa_int));
340   sa_int.sa_handler = sig_int_handler;
341   sigaction (SIGINT, &sa_int, NULL);
343   memset (&sa_term, 0, sizeof (sa_term));
344   sa_term.sa_handler = sig_term_handler;
345   sigaction (SIGTERM, &sa_term, NULL);
347   memset (&sa_pipe, 0, sizeof (sa_pipe));
348   sa_pipe.sa_handler = SIG_IGN;
349   sigaction (SIGPIPE, &sa_pipe, NULL);
351   memset (&sa_pipe, 0, sizeof (sa_usr1));
352   sa_usr1.sa_handler = sig_usr1_handler;
353   sigaction (SIGUSR1, &sa_usr1, NULL);
355   memset (&sa_usr2, 0, sizeof (sa_usr2));
356   sa_usr2.sa_handler = sig_usr2_handler;
357   sigaction (SIGUSR2, &sa_usr2, NULL);
359 } /* }}} void install_signal_handlers */
361 static int open_pidfile(char *action, int oflag) /* {{{ */
363   int fd;
364   const char *file;
365   char *file_copy, *dir;
367   file = (config_pid_file != NULL)
368     ? config_pid_file
369     : LOCALSTATEDIR "/run/rrdcached.pid";
371   /* dirname may modify its argument */
372   file_copy = strdup(file);
373   if (file_copy == NULL)
374   {
375     fprintf(stderr, "rrdcached: strdup(): %s\n",
376         rrd_strerror(errno));
377     return -1;
378   }
380   dir = dirname(file_copy);
381   if (rrd_mkdir_p(dir, 0777) != 0)
382   {
383     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
384         dir, rrd_strerror(errno));
385     return -1;
386   }
388   free(file_copy);
390   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
391   if (fd < 0)
392     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
393             action, file, rrd_strerror(errno));
395   return(fd);
396 } /* }}} static int open_pidfile */
398 /* check existing pid file to see whether a daemon is running */
399 static int check_pidfile(void)
401   int pid_fd;
402   pid_t pid;
403   char pid_str[16];
405   pid_fd = open_pidfile("open", O_RDWR);
406   if (pid_fd < 0)
407     return pid_fd;
409   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
410     return -1;
412   pid = atoi(pid_str);
413   if (pid <= 0)
414     return -1;
416   /* another running process that we can signal COULD be
417    * a competing rrdcached */
418   if (pid != getpid() && kill(pid, 0) == 0)
419   {
420     fprintf(stderr,
421             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
422     close(pid_fd);
423     return -1;
424   }
426   lseek(pid_fd, 0, SEEK_SET);
427   if (ftruncate(pid_fd, 0) == -1)
428   {
429     fprintf(stderr,
430             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
431     close(pid_fd);
432     return -1;
433   }
435   fprintf(stderr,
436           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
437           "rrdcached: starting normally.\n", pid);
439   return pid_fd;
440 } /* }}} static int check_pidfile */
442 static int write_pidfile (int fd) /* {{{ */
444   pid_t pid;
445   FILE *fh;
447   pid = getpid ();
449   fh = fdopen (fd, "w");
450   if (fh == NULL)
451   {
452     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
453     close(fd);
454     return (-1);
455   }
457   fprintf (fh, "%i\n", (int) pid);
458   fclose (fh);
460   return (0);
461 } /* }}} int write_pidfile */
463 static int remove_pidfile (void) /* {{{ */
465   char *file;
466   int status;
468   file = (config_pid_file != NULL)
469     ? config_pid_file
470     : LOCALSTATEDIR "/run/rrdcached.pid";
472   status = unlink (file);
473   if (status == 0)
474     return (0);
475   return (errno);
476 } /* }}} int remove_pidfile */
478 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
480   char *eol;
482   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
483                sock->next_read - sock->next_cmd);
485   if (eol == NULL)
486   {
487     /* no commands left, move remainder back to front of rbuf */
488     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
489             sock->next_read - sock->next_cmd);
490     sock->next_read -= sock->next_cmd;
491     sock->next_cmd = 0;
492     *len = 0;
493     return NULL;
494   }
495   else
496   {
497     char *cmd = sock->rbuf + sock->next_cmd;
498     *eol = '\0';
500     sock->next_cmd = eol - sock->rbuf + 1;
502     if (eol > sock->rbuf && *(eol-1) == '\r')
503       *(--eol) = '\0'; /* handle "\r\n" EOL */
505     *len = eol - cmd;
507     return cmd;
508   }
510   /* NOTREACHED */
511   assert(1==0);
512 } /* }}} char *next_cmd */
514 /* add the characters directly to the write buffer */
515 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
517   char *new_buf;
519   assert(sock != NULL);
521   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
522   if (new_buf == NULL)
523   {
524     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
525     return -1;
526   }
528   strncpy(new_buf + sock->wbuf_len, str, len + 1);
530   sock->wbuf = new_buf;
531   sock->wbuf_len += len;
533   return 0;
534 } /* }}} static int add_to_wbuf */
536 /* add the text to the "extra" info that's sent after the status line */
537 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
539   va_list argp;
540   char buffer[CMD_MAX];
541   int len;
543   if (JOURNAL_REPLAY(sock)) return 0;
544   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
546   va_start(argp, fmt);
547 #ifdef HAVE_VSNPRINTF
548   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
549 #else
550   len = vsprintf(buffer, fmt, argp);
551 #endif
552   va_end(argp);
553   if (len < 0)
554   {
555     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
556     return -1;
557   }
559   return add_to_wbuf(sock, buffer, len);
560 } /* }}} static int add_response_info */
562 static int count_lines(char *str) /* {{{ */
564   int lines = 0;
566   if (str != NULL)
567   {
568     while ((str = strchr(str, '\n')) != NULL)
569     {
570       ++lines;
571       ++str;
572     }
573   }
575   return lines;
576 } /* }}} static int count_lines */
578 /* send the response back to the user.
579  * returns 0 on success, -1 on error
580  * write buffer is always zeroed after this call */
581 static int send_response (listen_socket_t *sock, response_code rc,
582                           char *fmt, ...) /* {{{ */
584   va_list argp;
585   char buffer[CMD_MAX];
586   int lines;
587   ssize_t wrote;
588   int rclen, len;
590   if (JOURNAL_REPLAY(sock)) return rc;
592   if (sock->batch_start)
593   {
594     if (rc == RESP_OK)
595       return rc; /* no response on success during BATCH */
596     lines = sock->batch_cmd;
597   }
598   else if (rc == RESP_OK)
599     lines = count_lines(sock->wbuf);
600   else
601     lines = -1;
603   rclen = sprintf(buffer, "%d ", lines);
604   va_start(argp, fmt);
605 #ifdef HAVE_VSNPRINTF
606   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
607 #else
608   len = vsprintf(buffer+rclen, fmt, argp);
609 #endif
610   va_end(argp);
611   if (len < 0)
612     return -1;
614   len += rclen;
616   /* append the result to the wbuf, don't write to the user */
617   if (sock->batch_start)
618     return add_to_wbuf(sock, buffer, len);
620   /* first write must be complete */
621   if (len != write(sock->fd, buffer, len))
622   {
623     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
624     return -1;
625   }
627   if (sock->wbuf != NULL && rc == RESP_OK)
628   {
629     wrote = 0;
630     while (wrote < sock->wbuf_len)
631     {
632       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
633       if (wb <= 0)
634       {
635         RRDD_LOG(LOG_INFO, "send_response: could not write results");
636         return -1;
637       }
638       wrote += wb;
639     }
640   }
642   free(sock->wbuf); sock->wbuf = NULL;
643   sock->wbuf_len = 0;
645   return 0;
646 } /* }}} */
648 static void wipe_ci_values(cache_item_t *ci, time_t when)
650   ci->values = NULL;
651   ci->values_num = 0;
652   ci->values_alloc = 0;
654   ci->last_flush_time = when;
655   if (config_write_jitter > 0)
656     ci->last_flush_time += (rrd_random() % config_write_jitter);
659 /* remove_from_queue
660  * remove a "cache_item_t" item from the queue.
661  * must hold 'cache_lock' when calling this
662  */
663 static void remove_from_queue(cache_item_t *ci) /* {{{ */
665   if (ci == NULL) return;
666   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
668   if (ci->prev == NULL)
669     cache_queue_head = ci->next; /* reset head */
670   else
671     ci->prev->next = ci->next;
673   if (ci->next == NULL)
674     cache_queue_tail = ci->prev; /* reset the tail */
675   else
676     ci->next->prev = ci->prev;
678   ci->next = ci->prev = NULL;
679   ci->flags &= ~CI_FLAGS_IN_QUEUE;
681   pthread_mutex_lock (&stats_lock);
682   assert (stats_queue_length > 0);
683   stats_queue_length--;
684   pthread_mutex_unlock (&stats_lock);
686 } /* }}} static void remove_from_queue */
688 /* free the resources associated with the cache_item_t
689  * must hold cache_lock when calling this function
690  */
691 static void *free_cache_item(cache_item_t *ci) /* {{{ */
693   if (ci == NULL) return NULL;
695   remove_from_queue(ci);
697   for (size_t i=0; i < ci->values_num; i++)
698     free(ci->values[i]);
700   free (ci->values);
701   free (ci->file);
703   /* in case anyone is waiting */
704   pthread_cond_broadcast(&ci->flushed);
705   pthread_cond_destroy(&ci->flushed);
707   free (ci);
709   return NULL;
710 } /* }}} static void *free_cache_item */
712 /*
713  * enqueue_cache_item:
714  * `cache_lock' must be acquired before calling this function!
715  */
716 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
717     queue_side_t side)
719   if (ci == NULL)
720     return (-1);
722   if (ci->values_num == 0)
723     return (0);
725   if (side == HEAD)
726   {
727     if (cache_queue_head == ci)
728       return 0;
730     /* remove if further down in queue */
731     remove_from_queue(ci);
733     ci->prev = NULL;
734     ci->next = cache_queue_head;
735     if (ci->next != NULL)
736       ci->next->prev = ci;
737     cache_queue_head = ci;
739     if (cache_queue_tail == NULL)
740       cache_queue_tail = cache_queue_head;
741   }
742   else /* (side == TAIL) */
743   {
744     /* We don't move values back in the list.. */
745     if (ci->flags & CI_FLAGS_IN_QUEUE)
746       return (0);
748     assert (ci->next == NULL);
749     assert (ci->prev == NULL);
751     ci->prev = cache_queue_tail;
753     if (cache_queue_tail == NULL)
754       cache_queue_head = ci;
755     else
756       cache_queue_tail->next = ci;
758     cache_queue_tail = ci;
759   }
761   ci->flags |= CI_FLAGS_IN_QUEUE;
763   pthread_cond_signal(&queue_cond);
764   pthread_mutex_lock (&stats_lock);
765   stats_queue_length++;
766   pthread_mutex_unlock (&stats_lock);
768   return (0);
769 } /* }}} int enqueue_cache_item */
771 /*
772  * tree_callback_flush:
773  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
774  * while this is in progress.
775  */
776 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
777     gpointer data)
779   cache_item_t *ci;
780   callback_flush_data_t *cfd;
782   ci = (cache_item_t *) value;
783   cfd = (callback_flush_data_t *) data;
785   if (ci->flags & CI_FLAGS_IN_QUEUE)
786     return FALSE;
788   if (ci->values_num > 0
789       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
790   {
791     enqueue_cache_item (ci, TAIL);
792   }
793   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
794       && (ci->values_num <= 0))
795   {
796     assert ((char *) key == ci->file);
797     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
798     {
799       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
800       return (FALSE);
801     }
802   }
804   return (FALSE);
805 } /* }}} gboolean tree_callback_flush */
807 static int flush_old_values (int max_age)
809   callback_flush_data_t cfd;
810   size_t k;
812   memset (&cfd, 0, sizeof (cfd));
813   /* Pass the current time as user data so that we don't need to call
814    * `time' for each node. */
815   cfd.now = time (NULL);
816   cfd.keys = NULL;
817   cfd.keys_num = 0;
819   if (max_age > 0)
820     cfd.abs_timeout = cfd.now - max_age;
821   else
822     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
824   /* `tree_callback_flush' will return the keys of all values that haven't
825    * been touched in the last `config_flush_interval' seconds in `cfd'.
826    * The char*'s in this array point to the same memory as ci->file, so we
827    * don't need to free them separately. */
828   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
830   for (k = 0; k < cfd.keys_num; k++)
831   {
832     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
833     /* should never fail, since we have held the cache_lock
834      * the entire time */
835     assert(status == TRUE);
836   }
838   if (cfd.keys != NULL)
839   {
840     free (cfd.keys);
841     cfd.keys = NULL;
842   }
844   return (0);
845 } /* int flush_old_values */
847 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
849   struct timeval now;
850   struct timespec next_flush;
851   int status;
853   gettimeofday (&now, NULL);
854   next_flush.tv_sec = now.tv_sec + config_flush_interval;
855   next_flush.tv_nsec = 1000 * now.tv_usec;
857   pthread_mutex_lock(&cache_lock);
859   while (state == RUNNING)
860   {
861     gettimeofday (&now, NULL);
862     if ((now.tv_sec > next_flush.tv_sec)
863         || ((now.tv_sec == next_flush.tv_sec)
864           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
865     {
866       RRDD_LOG(LOG_DEBUG, "flushing old values");
868       /* Determine the time of the next cache flush. */
869       next_flush.tv_sec = now.tv_sec + config_flush_interval;
871       /* Flush all values that haven't been written in the last
872        * `config_write_interval' seconds. */
873       flush_old_values (config_write_interval);
875       /* unlock the cache while we rotate so we don't block incoming
876        * updates if the fsync() blocks on disk I/O */
877       pthread_mutex_unlock(&cache_lock);
878       journal_rotate();
879       pthread_mutex_lock(&cache_lock);
880     }
882     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
883     if (status != 0 && status != ETIMEDOUT)
884     {
885       RRDD_LOG (LOG_ERR, "flush_thread_main: "
886                 "pthread_cond_timedwait returned %i.", status);
887     }
888   }
890   if (config_flush_at_shutdown)
891     flush_old_values (-1); /* flush everything */
893   state = SHUTDOWN;
895   pthread_mutex_unlock(&cache_lock);
897   return NULL;
898 } /* void *flush_thread_main */
900 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
902   pthread_mutex_lock (&cache_lock);
904   while (state != SHUTDOWN
905          || (cache_queue_head != NULL && config_flush_at_shutdown))
906   {
907     cache_item_t *ci;
908     char *file;
909     char **values;
910     size_t values_num;
911     int status;
913     /* Now, check if there's something to store away. If not, wait until
914      * something comes in. */
915     if (cache_queue_head == NULL)
916     {
917       status = pthread_cond_wait (&queue_cond, &cache_lock);
918       if ((status != 0) && (status != ETIMEDOUT))
919       {
920         RRDD_LOG (LOG_ERR, "queue_thread_main: "
921             "pthread_cond_wait returned %i.", status);
922       }
923     }
925     /* Check if a value has arrived. This may be NULL if we timed out or there
926      * was an interrupt such as a signal. */
927     if (cache_queue_head == NULL)
928       continue;
930     ci = cache_queue_head;
932     /* copy the relevant parts */
933     file = strdup (ci->file);
934     if (file == NULL)
935     {
936       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
937       continue;
938     }
940     assert(ci->values != NULL);
941     assert(ci->values_num > 0);
943     values = ci->values;
944     values_num = ci->values_num;
946     wipe_ci_values(ci, time(NULL));
947     remove_from_queue(ci);
949     pthread_mutex_unlock (&cache_lock);
951     rrd_clear_error ();
952     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
953     if (status != 0)
954     {
955       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
956           "rrd_update_r (%s) failed with status %i. (%s)",
957           file, status, rrd_get_error());
958     }
960     journal_write("wrote", file);
962     /* Search again in the tree.  It's possible someone issued a "FORGET"
963      * while we were writing the update values. */
964     pthread_mutex_lock(&cache_lock);
965     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
966     if (ci)
967       pthread_cond_broadcast(&ci->flushed);
968     pthread_mutex_unlock(&cache_lock);
970     if (status == 0)
971     {
972       pthread_mutex_lock (&stats_lock);
973       stats_updates_written++;
974       stats_data_sets_written += values_num;
975       pthread_mutex_unlock (&stats_lock);
976     }
978     rrd_free_ptrs((void ***) &values, &values_num);
979     free(file);
981     pthread_mutex_lock (&cache_lock);
982   }
983   pthread_mutex_unlock (&cache_lock);
985   return (NULL);
986 } /* }}} void *queue_thread_main */
988 static int buffer_get_field (char **buffer_ret, /* {{{ */
989     size_t *buffer_size_ret, char **field_ret)
991   char *buffer;
992   size_t buffer_pos;
993   size_t buffer_size;
994   char *field;
995   size_t field_size;
996   int status;
998   buffer = *buffer_ret;
999   buffer_pos = 0;
1000   buffer_size = *buffer_size_ret;
1001   field = *buffer_ret;
1002   field_size = 0;
1004   if (buffer_size <= 0)
1005     return (-1);
1007   /* This is ensured by `handle_request'. */
1008   assert (buffer[buffer_size - 1] == '\0');
1010   status = -1;
1011   while (buffer_pos < buffer_size)
1012   {
1013     /* Check for end-of-field or end-of-buffer */
1014     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1015     {
1016       field[field_size] = 0;
1017       field_size++;
1018       buffer_pos++;
1019       status = 0;
1020       break;
1021     }
1022     /* Handle escaped characters. */
1023     else if (buffer[buffer_pos] == '\\')
1024     {
1025       if (buffer_pos >= (buffer_size - 1))
1026         break;
1027       buffer_pos++;
1028       field[field_size] = buffer[buffer_pos];
1029       field_size++;
1030       buffer_pos++;
1031     }
1032     /* Normal operation */ 
1033     else
1034     {
1035       field[field_size] = buffer[buffer_pos];
1036       field_size++;
1037       buffer_pos++;
1038     }
1039   } /* while (buffer_pos < buffer_size) */
1041   if (status != 0)
1042     return (status);
1044   *buffer_ret = buffer + buffer_pos;
1045   *buffer_size_ret = buffer_size - buffer_pos;
1046   *field_ret = field;
1048   return (0);
1049 } /* }}} int buffer_get_field */
1051 /* if we're restricting writes to the base directory,
1052  * check whether the file falls within the dir
1053  * returns 1 if OK, otherwise 0
1054  */
1055 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1057   assert(file != NULL);
1059   if (!config_write_base_only
1060       || JOURNAL_REPLAY(sock)
1061       || config_base_dir == NULL)
1062     return 1;
1064   if (strstr(file, "../") != NULL) goto err;
1066   /* relative paths without "../" are ok */
1067   if (*file != '/') return 1;
1069   /* file must be of the format base + "/" + <1+ char filename> */
1070   if (strlen(file) < _config_base_dir_len + 2) goto err;
1071   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1072   if (*(file + _config_base_dir_len) != '/') goto err;
1074   return 1;
1076 err:
1077   if (sock != NULL && sock->fd >= 0)
1078     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1080   return 0;
1081 } /* }}} static int check_file_access */
1083 /* when using a base dir, convert relative paths to absolute paths.
1084  * if necessary, modifies the "filename" pointer to point
1085  * to the new path created in "tmp".  "tmp" is provided
1086  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1087  *
1088  * this allows us to optimize for the expected case (absolute path)
1089  * with a no-op.
1090  */
1091 static void get_abs_path(char **filename, char *tmp)
1093   assert(tmp != NULL);
1094   assert(filename != NULL && *filename != NULL);
1096   if (config_base_dir == NULL || **filename == '/')
1097     return;
1099   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1100   *filename = tmp;
1101 } /* }}} static int get_abs_path */
1103 static int flush_file (const char *filename) /* {{{ */
1105   cache_item_t *ci;
1107   pthread_mutex_lock (&cache_lock);
1109   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1110   if (ci == NULL)
1111   {
1112     pthread_mutex_unlock (&cache_lock);
1113     return (ENOENT);
1114   }
1116   if (ci->values_num > 0)
1117   {
1118     /* Enqueue at head */
1119     enqueue_cache_item (ci, HEAD);
1120     pthread_cond_wait(&ci->flushed, &cache_lock);
1121   }
1123   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1124    * may have been purged during our cond_wait() */
1126   pthread_mutex_unlock(&cache_lock);
1128   return (0);
1129 } /* }}} int flush_file */
1131 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1133   char *err = "Syntax error.\n";
1135   if (cmd && cmd->syntax)
1136     err = cmd->syntax;
1138   return send_response(sock, RESP_ERR, "Usage: %s", err);
1139 } /* }}} static int syntax_error() */
1141 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1143   uint64_t copy_queue_length;
1144   uint64_t copy_updates_received;
1145   uint64_t copy_flush_received;
1146   uint64_t copy_updates_written;
1147   uint64_t copy_data_sets_written;
1148   uint64_t copy_journal_bytes;
1149   uint64_t copy_journal_rotate;
1151   uint64_t tree_nodes_number;
1152   uint64_t tree_depth;
1154   pthread_mutex_lock (&stats_lock);
1155   copy_queue_length       = stats_queue_length;
1156   copy_updates_received   = stats_updates_received;
1157   copy_flush_received     = stats_flush_received;
1158   copy_updates_written    = stats_updates_written;
1159   copy_data_sets_written  = stats_data_sets_written;
1160   copy_journal_bytes      = stats_journal_bytes;
1161   copy_journal_rotate     = stats_journal_rotate;
1162   pthread_mutex_unlock (&stats_lock);
1164   pthread_mutex_lock (&cache_lock);
1165   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1166   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1167   pthread_mutex_unlock (&cache_lock);
1169   add_response_info(sock,
1170                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1171   add_response_info(sock,
1172                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1173   add_response_info(sock,
1174                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1175   add_response_info(sock,
1176                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1177   add_response_info(sock,
1178                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1179   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1180   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1181   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1182   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1184   send_response(sock, RESP_OK, "Statistics follow\n");
1186   return (0);
1187 } /* }}} int handle_request_stats */
1189 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1191   char *file, file_tmp[PATH_MAX];
1192   int status;
1194   status = buffer_get_field (&buffer, &buffer_size, &file);
1195   if (status != 0)
1196   {
1197     return syntax_error(sock,cmd);
1198   }
1199   else
1200   {
1201     pthread_mutex_lock(&stats_lock);
1202     stats_flush_received++;
1203     pthread_mutex_unlock(&stats_lock);
1205     get_abs_path(&file, file_tmp);
1206     if (!check_file_access(file, sock)) return 0;
1208     status = flush_file (file);
1209     if (status == 0)
1210       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1211     else if (status == ENOENT)
1212     {
1213       /* no file in our tree; see whether it exists at all */
1214       struct stat statbuf;
1216       memset(&statbuf, 0, sizeof(statbuf));
1217       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1218         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1219       else
1220         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1221     }
1222     else if (status < 0)
1223       return send_response(sock, RESP_ERR, "Internal error.\n");
1224     else
1225       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1226   }
1228   /* NOTREACHED */
1229   assert(1==0);
1230 } /* }}} int handle_request_flush */
1232 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1234   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1236   pthread_mutex_lock(&cache_lock);
1237   flush_old_values(-1);
1238   pthread_mutex_unlock(&cache_lock);
1240   return send_response(sock, RESP_OK, "Started flush.\n");
1241 } /* }}} static int handle_request_flushall */
1243 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1245   int status;
1246   char *file, file_tmp[PATH_MAX];
1247   cache_item_t *ci;
1249   status = buffer_get_field(&buffer, &buffer_size, &file);
1250   if (status != 0)
1251     return syntax_error(sock,cmd);
1253   get_abs_path(&file, file_tmp);
1255   pthread_mutex_lock(&cache_lock);
1256   ci = g_tree_lookup(cache_tree, file);
1257   if (ci == NULL)
1258   {
1259     pthread_mutex_unlock(&cache_lock);
1260     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1261   }
1263   for (size_t i=0; i < ci->values_num; i++)
1264     add_response_info(sock, "%s\n", ci->values[i]);
1266   pthread_mutex_unlock(&cache_lock);
1267   return send_response(sock, RESP_OK, "updates pending\n");
1268 } /* }}} static int handle_request_pending */
1270 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1272   int status;
1273   gboolean found;
1274   char *file, file_tmp[PATH_MAX];
1276   status = buffer_get_field(&buffer, &buffer_size, &file);
1277   if (status != 0)
1278     return syntax_error(sock,cmd);
1280   get_abs_path(&file, file_tmp);
1281   if (!check_file_access(file, sock)) return 0;
1283   pthread_mutex_lock(&cache_lock);
1284   found = g_tree_remove(cache_tree, file);
1285   pthread_mutex_unlock(&cache_lock);
1287   if (found == TRUE)
1288   {
1289     if (!JOURNAL_REPLAY(sock))
1290       journal_write("forget", file);
1292     return send_response(sock, RESP_OK, "Gone!\n");
1293   }
1294   else
1295     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1297   /* NOTREACHED */
1298   assert(1==0);
1299 } /* }}} static int handle_request_forget */
1301 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1303   cache_item_t *ci;
1305   pthread_mutex_lock(&cache_lock);
1307   ci = cache_queue_head;
1308   while (ci != NULL)
1309   {
1310     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1311     ci = ci->next;
1312   }
1314   pthread_mutex_unlock(&cache_lock);
1316   return send_response(sock, RESP_OK, "in queue.\n");
1317 } /* }}} int handle_request_queue */
1319 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1321   char *file, file_tmp[PATH_MAX];
1322   int values_num = 0;
1323   int status;
1324   char orig_buf[CMD_MAX];
1326   cache_item_t *ci;
1328   /* save it for the journal later */
1329   if (!JOURNAL_REPLAY(sock))
1330     strncpy(orig_buf, buffer, buffer_size);
1332   status = buffer_get_field (&buffer, &buffer_size, &file);
1333   if (status != 0)
1334     return syntax_error(sock,cmd);
1336   pthread_mutex_lock(&stats_lock);
1337   stats_updates_received++;
1338   pthread_mutex_unlock(&stats_lock);
1340   get_abs_path(&file, file_tmp);
1341   if (!check_file_access(file, sock)) return 0;
1343   pthread_mutex_lock (&cache_lock);
1344   ci = g_tree_lookup (cache_tree, file);
1346   if (ci == NULL) /* {{{ */
1347   {
1348     struct stat statbuf;
1349     cache_item_t *tmp;
1351     /* don't hold the lock while we setup; stat(2) might block */
1352     pthread_mutex_unlock(&cache_lock);
1354     memset (&statbuf, 0, sizeof (statbuf));
1355     status = stat (file, &statbuf);
1356     if (status != 0)
1357     {
1358       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1360       status = errno;
1361       if (status == ENOENT)
1362         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1363       else
1364         return send_response(sock, RESP_ERR,
1365                              "stat failed with error %i.\n", status);
1366     }
1367     if (!S_ISREG (statbuf.st_mode))
1368       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1370     if (access(file, R_OK|W_OK) != 0)
1371       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1372                            file, rrd_strerror(errno));
1374     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1375     if (ci == NULL)
1376     {
1377       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1379       return send_response(sock, RESP_ERR, "malloc failed.\n");
1380     }
1381     memset (ci, 0, sizeof (cache_item_t));
1383     ci->file = strdup (file);
1384     if (ci->file == NULL)
1385     {
1386       free (ci);
1387       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1389       return send_response(sock, RESP_ERR, "strdup failed.\n");
1390     }
1392     wipe_ci_values(ci, now);
1393     ci->flags = CI_FLAGS_IN_TREE;
1394     pthread_cond_init(&ci->flushed, NULL);
1396     pthread_mutex_lock(&cache_lock);
1398     /* another UPDATE might have added this entry in the meantime */
1399     tmp = g_tree_lookup (cache_tree, file);
1400     if (tmp == NULL)
1401       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1402     else
1403     {
1404       free_cache_item (ci);
1405       ci = tmp;
1406     }
1408     /* state may have changed while we were unlocked */
1409     if (state == SHUTDOWN)
1410       return -1;
1411   } /* }}} */
1412   assert (ci != NULL);
1414   /* don't re-write updates in replay mode */
1415   if (!JOURNAL_REPLAY(sock))
1416     journal_write("update", orig_buf);
1418   while (buffer_size > 0)
1419   {
1420     char *value;
1421     time_t stamp;
1422     char *eostamp;
1424     status = buffer_get_field (&buffer, &buffer_size, &value);
1425     if (status != 0)
1426     {
1427       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1428       break;
1429     }
1431     /* make sure update time is always moving forward */
1432     stamp = strtol(value, &eostamp, 10);
1433     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1434     {
1435       pthread_mutex_unlock(&cache_lock);
1436       return send_response(sock, RESP_ERR,
1437                            "Cannot find timestamp in '%s'!\n", value);
1438     }
1439     else if (stamp <= ci->last_update_stamp)
1440     {
1441       pthread_mutex_unlock(&cache_lock);
1442       return send_response(sock, RESP_ERR,
1443                            "illegal attempt to update using time %ld when last"
1444                            " update time is %ld (minimum one second step)\n",
1445                            stamp, ci->last_update_stamp);
1446     }
1447     else
1448       ci->last_update_stamp = stamp;
1450     if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1451                               &ci->values_alloc, config_alloc_chunk))
1452     {
1453       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1454       continue;
1455     }
1457     values_num++;
1458   }
1460   if (((now - ci->last_flush_time) >= config_write_interval)
1461       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1462       && (ci->values_num > 0))
1463   {
1464     enqueue_cache_item (ci, TAIL);
1465   }
1467   pthread_mutex_unlock (&cache_lock);
1469   if (values_num < 1)
1470     return send_response(sock, RESP_ERR, "No values updated.\n");
1471   else
1472     return send_response(sock, RESP_OK,
1473                          "errors, enqueued %i value(s).\n", values_num);
1475   /* NOTREACHED */
1476   assert(1==0);
1478 } /* }}} int handle_request_update */
1480 /* we came across a "WROTE" entry during journal replay.
1481  * throw away any values that we have accumulated for this file
1482  */
1483 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1485   cache_item_t *ci;
1486   const char *file = buffer;
1488   pthread_mutex_lock(&cache_lock);
1490   ci = g_tree_lookup(cache_tree, file);
1491   if (ci == NULL)
1492   {
1493     pthread_mutex_unlock(&cache_lock);
1494     return (0);
1495   }
1497   if (ci->values)
1498     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1500   wipe_ci_values(ci, now);
1501   remove_from_queue(ci);
1503   pthread_mutex_unlock(&cache_lock);
1504   return (0);
1505 } /* }}} int handle_request_wrote */
1507 /* start "BATCH" processing */
1508 static int batch_start (HANDLER_PROTO) /* {{{ */
1510   int status;
1511   if (sock->batch_start)
1512     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1514   status = send_response(sock, RESP_OK,
1515                          "Go ahead.  End with dot '.' on its own line.\n");
1516   sock->batch_start = time(NULL);
1517   sock->batch_cmd = 0;
1519   return status;
1520 } /* }}} static int batch_start */
1522 /* finish "BATCH" processing and return results to the client */
1523 static int batch_done (HANDLER_PROTO) /* {{{ */
1525   assert(sock->batch_start);
1526   sock->batch_start = 0;
1527   sock->batch_cmd  = 0;
1528   return send_response(sock, RESP_OK, "errors\n");
1529 } /* }}} static int batch_done */
1531 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1533   return -1;
1534 } /* }}} static int handle_request_quit */
1536 static command_t list_of_commands[] = { /* {{{ */
1537   {
1538     "UPDATE",
1539     handle_request_update,
1540     CMD_CONTEXT_ANY,
1541     "UPDATE <filename> <values> [<values> ...]\n"
1542     ,
1543     "Adds the given file to the internal cache if it is not yet known and\n"
1544     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1545     "for details.\n"
1546     "\n"
1547     "Each <values> has the following form:\n"
1548     "  <values> = <time>:<value>[:<value>[...]]\n"
1549     "See the rrdupdate(1) manpage for details.\n"
1550   },
1551   {
1552     "WROTE",
1553     handle_request_wrote,
1554     CMD_CONTEXT_JOURNAL,
1555     NULL,
1556     NULL
1557   },
1558   {
1559     "FLUSH",
1560     handle_request_flush,
1561     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1562     "FLUSH <filename>\n"
1563     ,
1564     "Adds the given filename to the head of the update queue and returns\n"
1565     "after it has been dequeued.\n"
1566   },
1567   {
1568     "FLUSHALL",
1569     handle_request_flushall,
1570     CMD_CONTEXT_CLIENT,
1571     "FLUSHALL\n"
1572     ,
1573     "Triggers writing of all pending updates.  Returns immediately.\n"
1574   },
1575   {
1576     "PENDING",
1577     handle_request_pending,
1578     CMD_CONTEXT_CLIENT,
1579     "PENDING <filename>\n"
1580     ,
1581     "Shows any 'pending' updates for a file, in order.\n"
1582     "The updates shown have not yet been written to the underlying RRD file.\n"
1583   },
1584   {
1585     "FORGET",
1586     handle_request_forget,
1587     CMD_CONTEXT_ANY,
1588     "FORGET <filename>\n"
1589     ,
1590     "Removes the file completely from the cache.\n"
1591     "Any pending updates for the file will be lost.\n"
1592   },
1593   {
1594     "QUEUE",
1595     handle_request_queue,
1596     CMD_CONTEXT_CLIENT,
1597     "QUEUE\n"
1598     ,
1599         "Shows all files in the output queue.\n"
1600     "The output is zero or more lines in the following format:\n"
1601     "(where <num_vals> is the number of values to be written)\n"
1602     "\n"
1603     "<num_vals> <filename>\n"
1604   },
1605   {
1606     "STATS",
1607     handle_request_stats,
1608     CMD_CONTEXT_CLIENT,
1609     "STATS\n"
1610     ,
1611     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1612     "a description of the values.\n"
1613   },
1614   {
1615     "HELP",
1616     handle_request_help,
1617     CMD_CONTEXT_CLIENT,
1618     "HELP [<command>]\n",
1619     NULL, /* special! */
1620   },
1621   {
1622     "BATCH",
1623     batch_start,
1624     CMD_CONTEXT_CLIENT,
1625     "BATCH\n"
1626     ,
1627     "The 'BATCH' command permits the client to initiate a bulk load\n"
1628     "   of commands to rrdcached.\n"
1629     "\n"
1630     "Usage:\n"
1631     "\n"
1632     "    client: BATCH\n"
1633     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1634     "    client: command #1\n"
1635     "    client: command #2\n"
1636     "    client: ... and so on\n"
1637     "    client: .\n"
1638     "    server: 2 errors\n"
1639     "    server: 7 message for command #7\n"
1640     "    server: 9 message for command #9\n"
1641     "\n"
1642     "For more information, consult the rrdcached(1) documentation.\n"
1643   },
1644   {
1645     ".",   /* BATCH terminator */
1646     batch_done,
1647     CMD_CONTEXT_BATCH,
1648     NULL,
1649     NULL
1650   },
1651   {
1652     "QUIT",
1653     handle_request_quit,
1654     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1655     "QUIT\n"
1656     ,
1657     "Disconnect from rrdcached.\n"
1658   }
1659 }; /* }}} command_t list_of_commands[] */
1660 static size_t list_of_commands_len = sizeof (list_of_commands)
1661   / sizeof (list_of_commands[0]);
1663 static command_t *find_command(char *cmd)
1665   size_t i;
1667   for (i = 0; i < list_of_commands_len; i++)
1668     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1669       return (&list_of_commands[i]);
1670   return NULL;
1673 /* We currently use the index in the `list_of_commands' array as a bit position
1674  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1675  * outside these functions so that switching to a more elegant storage method
1676  * is easily possible. */
1677 static ssize_t find_command_index (const char *cmd) /* {{{ */
1679   size_t i;
1681   for (i = 0; i < list_of_commands_len; i++)
1682     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1683       return ((ssize_t) i);
1684   return (-1);
1685 } /* }}} ssize_t find_command_index */
1687 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1688     const char *cmd)
1690   ssize_t i;
1692   if (JOURNAL_REPLAY(sock))
1693     return (1);
1695   if (cmd == NULL)
1696     return (-1);
1698   if ((strcasecmp ("QUIT", cmd) == 0)
1699       || (strcasecmp ("HELP", cmd) == 0))
1700     return (1);
1701   else if (strcmp (".", cmd) == 0)
1702     cmd = "BATCH";
1704   i = find_command_index (cmd);
1705   if (i < 0)
1706     return (-1);
1707   assert (i < 32);
1709   if ((sock->permissions & (1 << i)) != 0)
1710     return (1);
1711   return (0);
1712 } /* }}} int socket_permission_check */
1714 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1715     const char *cmd)
1717   ssize_t i;
1719   i = find_command_index (cmd);
1720   if (i < 0)
1721     return (-1);
1722   assert (i < 32);
1724   sock->permissions |= (1 << i);
1725   return (0);
1726 } /* }}} int socket_permission_add */
1728 /* check whether commands are received in the expected context */
1729 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1731   if (JOURNAL_REPLAY(sock))
1732     return (cmd->context & CMD_CONTEXT_JOURNAL);
1733   else if (sock->batch_start)
1734     return (cmd->context & CMD_CONTEXT_BATCH);
1735   else
1736     return (cmd->context & CMD_CONTEXT_CLIENT);
1738   /* NOTREACHED */
1739   assert(1==0);
1742 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1744   int status;
1745   char *cmd_str;
1746   char *resp_txt;
1747   command_t *help = NULL;
1749   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1750   if (status == 0)
1751     help = find_command(cmd_str);
1753   if (help && (help->syntax || help->help))
1754   {
1755     char tmp[CMD_MAX];
1757     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1758     resp_txt = tmp;
1760     if (help->syntax)
1761       add_response_info(sock, "Usage: %s\n", help->syntax);
1763     if (help->help)
1764       add_response_info(sock, "%s\n", help->help);
1765   }
1766   else
1767   {
1768     size_t i;
1770     resp_txt = "Command overview\n";
1772     for (i = 0; i < list_of_commands_len; i++)
1773     {
1774       if (list_of_commands[i].syntax == NULL)
1775         continue;
1776       add_response_info (sock, "%s", list_of_commands[i].syntax);
1777     }
1778   }
1780   return send_response(sock, RESP_OK, resp_txt);
1781 } /* }}} int handle_request_help */
1783 static int handle_request (DISPATCH_PROTO) /* {{{ */
1785   char *buffer_ptr = buffer;
1786   char *cmd_str = NULL;
1787   command_t *cmd = NULL;
1788   int status;
1790   assert (buffer[buffer_size - 1] == '\0');
1792   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1793   if (status != 0)
1794   {
1795     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1796     return (-1);
1797   }
1799   if (sock != NULL && sock->batch_start)
1800     sock->batch_cmd++;
1802   cmd = find_command(cmd_str);
1803   if (!cmd)
1804     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1806   if (!socket_permission_check (sock, cmd->cmd))
1807     return send_response(sock, RESP_ERR, "Permission denied.\n");
1809   if (!command_check_context(sock, cmd))
1810     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1812   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1813 } /* }}} int handle_request */
1815 static void journal_set_free (journal_set *js) /* {{{ */
1817   if (js == NULL)
1818     return;
1820   rrd_free_ptrs((void ***) &js->files, &js->files_num);
1822   free(js);
1823 } /* }}} journal_set_free */
1825 static void journal_set_remove (journal_set *js) /* {{{ */
1827   if (js == NULL)
1828     return;
1830   for (uint i=0; i < js->files_num; i++)
1831   {
1832     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1833     unlink(js->files[i]);
1834   }
1835 } /* }}} journal_set_remove */
1837 /* close current journal file handle.
1838  * MUST hold journal_lock before calling */
1839 static void journal_close(void) /* {{{ */
1841   if (journal_fh != NULL)
1842   {
1843     if (fclose(journal_fh) != 0)
1844       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1845   }
1847   journal_fh = NULL;
1848   journal_size = 0;
1849 } /* }}} journal_close */
1851 /* MUST hold journal_lock before calling */
1852 static void journal_new_file(void) /* {{{ */
1854   struct timeval now;
1855   int  new_fd;
1856   char new_file[PATH_MAX + 1];
1858   assert(journal_dir != NULL);
1859   assert(journal_cur != NULL);
1861   journal_close();
1863   gettimeofday(&now, NULL);
1864   /* this format assures that the files sort in strcmp() order */
1865   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1866            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1868   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1869                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1870   if (new_fd < 0)
1871     goto error;
1873   journal_fh = fdopen(new_fd, "a");
1874   if (journal_fh == NULL)
1875     goto error;
1877   journal_size = ftell(journal_fh);
1878   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1880   /* record the file in the journal set */
1881   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1883   return;
1885 error:
1886   RRDD_LOG(LOG_CRIT,
1887            "JOURNALING DISABLED: Error while trying to create %s : %s",
1888            new_file, rrd_strerror(errno));
1889   RRDD_LOG(LOG_CRIT,
1890            "JOURNALING DISABLED: All values will be flushed at shutdown");
1892   close(new_fd);
1893   config_flush_at_shutdown = 1;
1895 } /* }}} journal_new_file */
1897 /* MUST NOT hold journal_lock before calling this */
1898 static void journal_rotate(void) /* {{{ */
1900   journal_set *old_js = NULL;
1902   if (journal_dir == NULL)
1903     return;
1905   RRDD_LOG(LOG_DEBUG, "rotating journals");
1907   pthread_mutex_lock(&stats_lock);
1908   ++stats_journal_rotate;
1909   pthread_mutex_unlock(&stats_lock);
1911   pthread_mutex_lock(&journal_lock);
1913   journal_close();
1915   /* rotate the journal sets */
1916   old_js = journal_old;
1917   journal_old = journal_cur;
1918   journal_cur = calloc(1, sizeof(journal_set));
1920   if (journal_cur != NULL)
1921     journal_new_file();
1922   else
1923     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1925   pthread_mutex_unlock(&journal_lock);
1927   journal_set_remove(old_js);
1928   journal_set_free  (old_js);
1930 } /* }}} static void journal_rotate */
1932 /* MUST hold journal_lock when calling */
1933 static void journal_done(void) /* {{{ */
1935   if (journal_cur == NULL)
1936     return;
1938   journal_close();
1940   if (config_flush_at_shutdown)
1941   {
1942     RRDD_LOG(LOG_INFO, "removing journals");
1943     journal_set_remove(journal_old);
1944     journal_set_remove(journal_cur);
1945   }
1946   else
1947   {
1948     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1949              "journals will be used at next startup");
1950   }
1952   journal_set_free(journal_cur);
1953   journal_set_free(journal_old);
1954   free(journal_dir);
1956 } /* }}} static void journal_done */
1958 static int journal_write(char *cmd, char *args) /* {{{ */
1960   int chars;
1962   if (journal_fh == NULL)
1963     return 0;
1965   pthread_mutex_lock(&journal_lock);
1966   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1967   journal_size += chars;
1969   if (journal_size > JOURNAL_MAX)
1970     journal_new_file();
1972   pthread_mutex_unlock(&journal_lock);
1974   if (chars > 0)
1975   {
1976     pthread_mutex_lock(&stats_lock);
1977     stats_journal_bytes += chars;
1978     pthread_mutex_unlock(&stats_lock);
1979   }
1981   return chars;
1982 } /* }}} static int journal_write */
1984 static int journal_replay (const char *file) /* {{{ */
1986   FILE *fh;
1987   int entry_cnt = 0;
1988   int fail_cnt = 0;
1989   uint64_t line = 0;
1990   char entry[CMD_MAX];
1991   time_t now;
1993   if (file == NULL) return 0;
1995   {
1996     char *reason = "unknown error";
1997     int status = 0;
1998     struct stat statbuf;
2000     memset(&statbuf, 0, sizeof(statbuf));
2001     if (stat(file, &statbuf) != 0)
2002     {
2003       reason = "stat error";
2004       status = errno;
2005     }
2006     else if (!S_ISREG(statbuf.st_mode))
2007     {
2008       reason = "not a regular file";
2009       status = EPERM;
2010     }
2011     if (statbuf.st_uid != daemon_uid)
2012     {
2013       reason = "not owned by daemon user";
2014       status = EACCES;
2015     }
2016     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2017     {
2018       reason = "must not be user/group writable";
2019       status = EACCES;
2020     }
2022     if (status != 0)
2023     {
2024       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2025                file, rrd_strerror(status), reason);
2026       return 0;
2027     }
2028   }
2030   fh = fopen(file, "r");
2031   if (fh == NULL)
2032   {
2033     if (errno != ENOENT)
2034       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2035                file, rrd_strerror(errno));
2036     return 0;
2037   }
2038   else
2039     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2041   now = time(NULL);
2043   while(!feof(fh))
2044   {
2045     size_t entry_len;
2047     ++line;
2048     if (fgets(entry, sizeof(entry), fh) == NULL)
2049       break;
2050     entry_len = strlen(entry);
2052     /* check \n termination in case journal writing crashed mid-line */
2053     if (entry_len == 0)
2054       continue;
2055     else if (entry[entry_len - 1] != '\n')
2056     {
2057       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2058       ++fail_cnt;
2059       continue;
2060     }
2062     entry[entry_len - 1] = '\0';
2064     if (handle_request(NULL, now, entry, entry_len) == 0)
2065       ++entry_cnt;
2066     else
2067       ++fail_cnt;
2068   }
2070   fclose(fh);
2072   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2073            entry_cnt, fail_cnt);
2075   return entry_cnt > 0 ? 1 : 0;
2076 } /* }}} static int journal_replay */
2078 static int journal_sort(const void *v1, const void *v2)
2080   char **jn1 = (char **) v1;
2081   char **jn2 = (char **) v2;
2083   return strcmp(*jn1,*jn2);
2086 static void journal_init(void) /* {{{ */
2088   int had_journal = 0;
2089   DIR *dir;
2090   struct dirent *dent;
2091   char path[PATH_MAX+1];
2093   if (journal_dir == NULL) return;
2095   pthread_mutex_lock(&journal_lock);
2097   journal_cur = calloc(1, sizeof(journal_set));
2098   if (journal_cur == NULL)
2099   {
2100     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2101     return;
2102   }
2104   RRDD_LOG(LOG_INFO, "checking for journal files");
2106   /* Handle old journal files during transition.  This gives them the
2107    * correct sort order.  TODO: remove after first release
2108    */
2109   {
2110     char old_path[PATH_MAX+1];
2111     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2112     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2113     rename(old_path, path);
2115     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2116     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2117     rename(old_path, path);
2118   }
2120   dir = opendir(journal_dir);
2121   while ((dent = readdir(dir)) != NULL)
2122   {
2123     /* looks like a journal file? */
2124     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2125       continue;
2127     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2129     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2130     {
2131       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2132                dent->d_name);
2133       break;
2134     }
2135   }
2136   closedir(dir);
2138   qsort(journal_cur->files, journal_cur->files_num,
2139         sizeof(journal_cur->files[0]), journal_sort);
2141   for (uint i=0; i < journal_cur->files_num; i++)
2142     had_journal += journal_replay(journal_cur->files[i]);
2144   journal_new_file();
2146   /* it must have been a crash.  start a flush */
2147   if (had_journal && config_flush_at_shutdown)
2148     flush_old_values(-1);
2150   pthread_mutex_unlock(&journal_lock);
2152   RRDD_LOG(LOG_INFO, "journal processing complete");
2154 } /* }}} static void journal_init */
2156 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2158   assert(sock != NULL);
2160   free(sock->rbuf);  sock->rbuf = NULL;
2161   free(sock->wbuf);  sock->wbuf = NULL;
2162   free(sock);
2163 } /* }}} void free_listen_socket */
2165 static void close_connection(listen_socket_t *sock) /* {{{ */
2167   if (sock->fd >= 0)
2168   {
2169     close(sock->fd);
2170     sock->fd = -1;
2171   }
2173   free_listen_socket(sock);
2175 } /* }}} void close_connection */
2177 static void *connection_thread_main (void *args) /* {{{ */
2179   listen_socket_t *sock;
2180   int fd;
2182   sock = (listen_socket_t *) args;
2183   fd = sock->fd;
2185   /* init read buffers */
2186   sock->next_read = sock->next_cmd = 0;
2187   sock->rbuf = malloc(RBUF_SIZE);
2188   if (sock->rbuf == NULL)
2189   {
2190     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2191     close_connection(sock);
2192     return NULL;
2193   }
2195   pthread_mutex_lock (&connection_threads_lock);
2196   connection_threads_num++;
2197   pthread_mutex_unlock (&connection_threads_lock);
2199   while (state == RUNNING)
2200   {
2201     char *cmd;
2202     ssize_t cmd_len;
2203     ssize_t rbytes;
2204     time_t now;
2206     struct pollfd pollfd;
2207     int status;
2209     pollfd.fd = fd;
2210     pollfd.events = POLLIN | POLLPRI;
2211     pollfd.revents = 0;
2213     status = poll (&pollfd, 1, /* timeout = */ 500);
2214     if (state != RUNNING)
2215       break;
2216     else if (status == 0) /* timeout */
2217       continue;
2218     else if (status < 0) /* error */
2219     {
2220       status = errno;
2221       if (status != EINTR)
2222         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2223       continue;
2224     }
2226     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2227       break;
2228     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2229     {
2230       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2231           "poll(2) returned something unexpected: %#04hx",
2232           pollfd.revents);
2233       break;
2234     }
2236     rbytes = read(fd, sock->rbuf + sock->next_read,
2237                   RBUF_SIZE - sock->next_read);
2238     if (rbytes < 0)
2239     {
2240       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2241       break;
2242     }
2243     else if (rbytes == 0)
2244       break; /* eof */
2246     sock->next_read += rbytes;
2248     if (sock->batch_start)
2249       now = sock->batch_start;
2250     else
2251       now = time(NULL);
2253     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2254     {
2255       status = handle_request (sock, now, cmd, cmd_len+1);
2256       if (status != 0)
2257         goto out_close;
2258     }
2259   }
2261 out_close:
2262   close_connection(sock);
2264   /* Remove this thread from the connection threads list */
2265   pthread_mutex_lock (&connection_threads_lock);
2266   connection_threads_num--;
2267   if (connection_threads_num <= 0)
2268     pthread_cond_broadcast(&connection_threads_done);
2269   pthread_mutex_unlock (&connection_threads_lock);
2271   return (NULL);
2272 } /* }}} void *connection_thread_main */
2274 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2276   int fd;
2277   struct sockaddr_un sa;
2278   listen_socket_t *temp;
2279   int status;
2280   const char *path;
2281   char *path_copy, *dir;
2283   path = sock->addr;
2284   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2285     path += strlen("unix:");
2287   /* dirname may modify its argument */
2288   path_copy = strdup(path);
2289   if (path_copy == NULL)
2290   {
2291     fprintf(stderr, "rrdcached: strdup(): %s\n",
2292         rrd_strerror(errno));
2293     return (-1);
2294   }
2296   dir = dirname(path_copy);
2297   if (rrd_mkdir_p(dir, 0777) != 0)
2298   {
2299     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2300         dir, rrd_strerror(errno));
2301     return (-1);
2302   }
2304   free(path_copy);
2306   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2307       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2308   if (temp == NULL)
2309   {
2310     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2311     return (-1);
2312   }
2313   listen_fds = temp;
2314   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2316   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2317   if (fd < 0)
2318   {
2319     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2320              rrd_strerror(errno));
2321     return (-1);
2322   }
2324   memset (&sa, 0, sizeof (sa));
2325   sa.sun_family = AF_UNIX;
2326   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2328   /* if we've gotten this far, we own the pid file.  any daemon started
2329    * with the same args must not be alive.  therefore, ensure that we can
2330    * create the socket...
2331    */
2332   unlink(path);
2334   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2335   if (status != 0)
2336   {
2337     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2338              path, rrd_strerror(errno));
2339     close (fd);
2340     return (-1);
2341   }
2343   /* tweak the sockets group ownership */
2344   if (sock->socket_group != (gid_t)-1)
2345   {
2346     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2347          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2348     {
2349       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2350     }
2351   }
2353   if (sock->socket_permissions != (mode_t)-1)
2354   {
2355     if (chmod(path, sock->socket_permissions) != 0)
2356       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2357           (unsigned int)sock->socket_permissions, strerror(errno));
2358   }
2360   status = listen (fd, /* backlog = */ 10);
2361   if (status != 0)
2362   {
2363     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2364              path, rrd_strerror(errno));
2365     close (fd);
2366     unlink (path);
2367     return (-1);
2368   }
2370   listen_fds[listen_fds_num].fd = fd;
2371   listen_fds[listen_fds_num].family = PF_UNIX;
2372   strncpy(listen_fds[listen_fds_num].addr, path,
2373           sizeof (listen_fds[listen_fds_num].addr) - 1);
2374   listen_fds_num++;
2376   return (0);
2377 } /* }}} int open_listen_socket_unix */
2379 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2381   struct addrinfo ai_hints;
2382   struct addrinfo *ai_res;
2383   struct addrinfo *ai_ptr;
2384   char addr_copy[NI_MAXHOST];
2385   char *addr;
2386   char *port;
2387   int status;
2389   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2390   addr_copy[sizeof (addr_copy) - 1] = 0;
2391   addr = addr_copy;
2393   memset (&ai_hints, 0, sizeof (ai_hints));
2394   ai_hints.ai_flags = 0;
2395 #ifdef AI_ADDRCONFIG
2396   ai_hints.ai_flags |= AI_ADDRCONFIG;
2397 #endif
2398   ai_hints.ai_family = AF_UNSPEC;
2399   ai_hints.ai_socktype = SOCK_STREAM;
2401   port = NULL;
2402   if (*addr == '[') /* IPv6+port format */
2403   {
2404     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2405     addr++;
2407     port = strchr (addr, ']');
2408     if (port == NULL)
2409     {
2410       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2411       return (-1);
2412     }
2413     *port = 0;
2414     port++;
2416     if (*port == ':')
2417       port++;
2418     else if (*port == 0)
2419       port = NULL;
2420     else
2421     {
2422       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2423       return (-1);
2424     }
2425   } /* if (*addr == '[') */
2426   else
2427   {
2428     port = rindex(addr, ':');
2429     if (port != NULL)
2430     {
2431       *port = 0;
2432       port++;
2433     }
2434   }
2435   ai_res = NULL;
2436   status = getaddrinfo (addr,
2437                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2438                         &ai_hints, &ai_res);
2439   if (status != 0)
2440   {
2441     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2442              addr, gai_strerror (status));
2443     return (-1);
2444   }
2446   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2447   {
2448     int fd;
2449     listen_socket_t *temp;
2450     int one = 1;
2452     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2453         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2454     if (temp == NULL)
2455     {
2456       fprintf (stderr,
2457                "rrdcached: open_listen_socket_network: realloc failed.\n");
2458       continue;
2459     }
2460     listen_fds = temp;
2461     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2463     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2464     if (fd < 0)
2465     {
2466       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2467                rrd_strerror(errno));
2468       continue;
2469     }
2471     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2473     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2474     if (status != 0)
2475     {
2476       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2477                sock->addr, rrd_strerror(errno));
2478       close (fd);
2479       continue;
2480     }
2482     status = listen (fd, /* backlog = */ 10);
2483     if (status != 0)
2484     {
2485       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2486                sock->addr, rrd_strerror(errno));
2487       close (fd);
2488       freeaddrinfo(ai_res);
2489       return (-1);
2490     }
2492     listen_fds[listen_fds_num].fd = fd;
2493     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2494     listen_fds_num++;
2495   } /* for (ai_ptr) */
2497   freeaddrinfo(ai_res);
2498   return (0);
2499 } /* }}} static int open_listen_socket_network */
2501 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2503   assert(sock != NULL);
2504   assert(sock->addr != NULL);
2506   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2507       || sock->addr[0] == '/')
2508     return (open_listen_socket_unix(sock));
2509   else
2510     return (open_listen_socket_network(sock));
2511 } /* }}} int open_listen_socket */
2513 static int close_listen_sockets (void) /* {{{ */
2515   size_t i;
2517   for (i = 0; i < listen_fds_num; i++)
2518   {
2519     close (listen_fds[i].fd);
2521     if (listen_fds[i].family == PF_UNIX)
2522       unlink(listen_fds[i].addr);
2523   }
2525   free (listen_fds);
2526   listen_fds = NULL;
2527   listen_fds_num = 0;
2529   return (0);
2530 } /* }}} int close_listen_sockets */
2532 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2534   struct pollfd *pollfds;
2535   int pollfds_num;
2536   int status;
2537   int i;
2539   if (listen_fds_num < 1)
2540   {
2541     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2542     return (NULL);
2543   }
2545   pollfds_num = listen_fds_num;
2546   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2547   if (pollfds == NULL)
2548   {
2549     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2550     return (NULL);
2551   }
2552   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2554   RRDD_LOG(LOG_INFO, "listening for connections");
2556   while (state == RUNNING)
2557   {
2558     for (i = 0; i < pollfds_num; i++)
2559     {
2560       pollfds[i].fd = listen_fds[i].fd;
2561       pollfds[i].events = POLLIN | POLLPRI;
2562       pollfds[i].revents = 0;
2563     }
2565     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2566     if (state != RUNNING)
2567       break;
2568     else if (status == 0) /* timeout */
2569       continue;
2570     else if (status < 0) /* error */
2571     {
2572       status = errno;
2573       if (status != EINTR)
2574       {
2575         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2576       }
2577       continue;
2578     }
2580     for (i = 0; i < pollfds_num; i++)
2581     {
2582       listen_socket_t *client_sock;
2583       struct sockaddr_storage client_sa;
2584       socklen_t client_sa_size;
2585       pthread_t tid;
2586       pthread_attr_t attr;
2588       if (pollfds[i].revents == 0)
2589         continue;
2591       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2592       {
2593         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2594             "poll(2) returned something unexpected for listen FD #%i.",
2595             pollfds[i].fd);
2596         continue;
2597       }
2599       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2600       if (client_sock == NULL)
2601       {
2602         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2603         continue;
2604       }
2605       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2607       client_sa_size = sizeof (client_sa);
2608       client_sock->fd = accept (pollfds[i].fd,
2609           (struct sockaddr *) &client_sa, &client_sa_size);
2610       if (client_sock->fd < 0)
2611       {
2612         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2613         free(client_sock);
2614         continue;
2615       }
2617       pthread_attr_init (&attr);
2618       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2620       status = pthread_create (&tid, &attr, connection_thread_main,
2621                                client_sock);
2622       if (status != 0)
2623       {
2624         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2625         close_connection(client_sock);
2626         continue;
2627       }
2628     } /* for (pollfds_num) */
2629   } /* while (state == RUNNING) */
2631   RRDD_LOG(LOG_INFO, "starting shutdown");
2633   close_listen_sockets ();
2635   pthread_mutex_lock (&connection_threads_lock);
2636   while (connection_threads_num > 0)
2637     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2638   pthread_mutex_unlock (&connection_threads_lock);
2640   free(pollfds);
2642   return (NULL);
2643 } /* }}} void *listen_thread_main */
2645 static int daemonize (void) /* {{{ */
2647   int pid_fd;
2648   char *base_dir;
2650   daemon_uid = geteuid();
2652   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2653   if (pid_fd < 0)
2654     pid_fd = check_pidfile();
2655   if (pid_fd < 0)
2656     return pid_fd;
2658   /* open all the listen sockets */
2659   if (config_listen_address_list_len > 0)
2660   {
2661     for (size_t i = 0; i < config_listen_address_list_len; i++)
2662       open_listen_socket (config_listen_address_list[i]);
2664     rrd_free_ptrs((void ***) &config_listen_address_list,
2665                   &config_listen_address_list_len);
2666   }
2667   else
2668   {
2669     listen_socket_t sock;
2670     memset(&sock, 0, sizeof(sock));
2671     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2672     open_listen_socket (&sock);
2673   }
2675   if (listen_fds_num < 1)
2676   {
2677     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2678     goto error;
2679   }
2681   if (!stay_foreground)
2682   {
2683     pid_t child;
2685     child = fork ();
2686     if (child < 0)
2687     {
2688       fprintf (stderr, "daemonize: fork(2) failed.\n");
2689       goto error;
2690     }
2691     else if (child > 0)
2692       exit(0);
2694     /* Become session leader */
2695     setsid ();
2697     /* Open the first three file descriptors to /dev/null */
2698     close (2);
2699     close (1);
2700     close (0);
2702     open ("/dev/null", O_RDWR);
2703     if (dup(0) == -1 || dup(0) == -1){
2704         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2705     }
2706   } /* if (!stay_foreground) */
2708   /* Change into the /tmp directory. */
2709   base_dir = (config_base_dir != NULL)
2710     ? config_base_dir
2711     : "/tmp";
2713   if (chdir (base_dir) != 0)
2714   {
2715     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2716     goto error;
2717   }
2719   install_signal_handlers();
2721   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2722   RRDD_LOG(LOG_INFO, "starting up");
2724   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2725                                 (GDestroyNotify) free_cache_item);
2726   if (cache_tree == NULL)
2727   {
2728     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2729     goto error;
2730   }
2732   return write_pidfile (pid_fd);
2734 error:
2735   remove_pidfile();
2736   return -1;
2737 } /* }}} int daemonize */
2739 static int cleanup (void) /* {{{ */
2741   pthread_cond_broadcast (&flush_cond);
2742   pthread_join (flush_thread, NULL);
2744   pthread_cond_broadcast (&queue_cond);
2745   for (int i = 0; i < config_queue_threads; i++)
2746     pthread_join (queue_threads[i], NULL);
2748   if (config_flush_at_shutdown)
2749   {
2750     assert(cache_queue_head == NULL);
2751     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2752   }
2754   free(queue_threads);
2755   free(config_base_dir);
2757   pthread_mutex_lock(&cache_lock);
2758   g_tree_destroy(cache_tree);
2760   pthread_mutex_lock(&journal_lock);
2761   journal_done();
2763   RRDD_LOG(LOG_INFO, "goodbye");
2764   closelog ();
2766   remove_pidfile ();
2767   free(config_pid_file);
2769   return (0);
2770 } /* }}} int cleanup */
2772 static int read_options (int argc, char **argv) /* {{{ */
2774   int option;
2775   int status = 0;
2777   char **permissions = NULL;
2778   size_t permissions_len = 0;
2780   gid_t  socket_group = (gid_t)-1;
2781   mode_t socket_permissions = (mode_t)-1;
2783   while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:m:h?")) != -1)
2784   {
2785     switch (option)
2786     {
2787       case 'g':
2788         stay_foreground=1;
2789         break;
2791       case 'l':
2792       {
2793         listen_socket_t *new;
2795         new = malloc(sizeof(listen_socket_t));
2796         if (new == NULL)
2797         {
2798           fprintf(stderr, "read_options: malloc failed.\n");
2799           return(2);
2800         }
2801         memset(new, 0, sizeof(listen_socket_t));
2803         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2805         /* Add permissions to the socket {{{ */
2806         if (permissions_len != 0)
2807         {
2808           size_t i;
2809           for (i = 0; i < permissions_len; i++)
2810           {
2811             status = socket_permission_add (new, permissions[i]);
2812             if (status != 0)
2813             {
2814               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2815                   "socket failed. Most likely, this permission doesn't "
2816                   "exist. Check your command line.\n", permissions[i]);
2817               status = 4;
2818             }
2819           }
2820         }
2821         else /* if (permissions_len == 0) */
2822         {
2823           /* Add permission for ALL commands to the socket. */
2824           size_t i;
2825           for (i = 0; i < list_of_commands_len; i++)
2826           {
2827             status = socket_permission_add (new, list_of_commands[i].cmd);
2828             if (status != 0)
2829             {
2830               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2831                   "socket failed. This should never happen, ever! Sorry.\n",
2832                   permissions[i]);
2833               status = 4;
2834             }
2835           }
2836         }
2837         /* }}} Done adding permissions. */
2839         new->socket_group = socket_group;
2840         new->socket_permissions = socket_permissions;
2842         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2843                          &config_listen_address_list_len, new))
2844         {
2845           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2846           return (2);
2847         }
2848       }
2849       break;
2851       /* set socket group permissions */
2852       case 's':
2853       {
2854         gid_t group_gid;
2855         struct group *grp;
2857         group_gid = strtoul(optarg, NULL, 10);
2858         if (errno != EINVAL && group_gid>0)
2859         {
2860           /* we were passed a number */
2861           grp = getgrgid(group_gid);
2862         }
2863         else
2864         {
2865           grp = getgrnam(optarg);
2866         }
2868         if (grp)
2869         {
2870           socket_group = grp->gr_gid;
2871         }
2872         else
2873         {
2874           /* no idea what the user wanted... */
2875           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
2876           return (5);
2877         }
2878       }
2879       break;
2881       /* set socket file permissions */
2882       case 'm':
2883       {
2884         long  tmp;
2885         char *endptr = NULL;
2887         tmp = strtol (optarg, &endptr, 8);
2888         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
2889             || (tmp > 07777) || (tmp < 0)) {
2890           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
2891               optarg);
2892           return (5);
2893         }
2895         socket_permissions = (mode_t)tmp;
2896       }
2897       break;
2899       case 'P':
2900       {
2901         char *optcopy;
2902         char *saveptr;
2903         char *dummy;
2904         char *ptr;
2906         rrd_free_ptrs ((void *) &permissions, &permissions_len);
2908         optcopy = strdup (optarg);
2909         dummy = optcopy;
2910         saveptr = NULL;
2911         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2912         {
2913           dummy = NULL;
2914           rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2915         }
2917         free (optcopy);
2918       }
2919       break;
2921       case 'f':
2922       {
2923         int temp;
2925         temp = atoi (optarg);
2926         if (temp > 0)
2927           config_flush_interval = temp;
2928         else
2929         {
2930           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2931           status = 3;
2932         }
2933       }
2934       break;
2936       case 'w':
2937       {
2938         int temp;
2940         temp = atoi (optarg);
2941         if (temp > 0)
2942           config_write_interval = temp;
2943         else
2944         {
2945           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2946           status = 2;
2947         }
2948       }
2949       break;
2951       case 'z':
2952       {
2953         int temp;
2955         temp = atoi(optarg);
2956         if (temp > 0)
2957           config_write_jitter = temp;
2958         else
2959         {
2960           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2961           status = 2;
2962         }
2964         break;
2965       }
2967       case 't':
2968       {
2969         int threads;
2970         threads = atoi(optarg);
2971         if (threads >= 1)
2972           config_queue_threads = threads;
2973         else
2974         {
2975           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2976           return 1;
2977         }
2978       }
2979       break;
2981       case 'B':
2982         config_write_base_only = 1;
2983         break;
2985       case 'b':
2986       {
2987         size_t len;
2988         char base_realpath[PATH_MAX];
2990         if (config_base_dir != NULL)
2991           free (config_base_dir);
2992         config_base_dir = strdup (optarg);
2993         if (config_base_dir == NULL)
2994         {
2995           fprintf (stderr, "read_options: strdup failed.\n");
2996           return (3);
2997         }
2999         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3000         {
3001           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3002               config_base_dir, rrd_strerror (errno));
3003           return (3);
3004         }
3006         /* make sure that the base directory is not resolved via
3007          * symbolic links.  this makes some performance-enhancing
3008          * assumptions possible (we don't have to resolve paths
3009          * that start with a "/")
3010          */
3011         if (realpath(config_base_dir, base_realpath) == NULL)
3012         {
3013           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3014               "%s\n", config_base_dir, rrd_strerror(errno));
3015           return 5;
3016         }
3018         len = strlen (config_base_dir);
3019         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3020         {
3021           config_base_dir[len - 1] = 0;
3022           len--;
3023         }
3025         if (len < 1)
3026         {
3027           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3028           return (4);
3029         }
3031         _config_base_dir_len = len;
3033         len = strlen (base_realpath);
3034         while ((len > 0) && (base_realpath[len - 1] == '/'))
3035         {
3036           base_realpath[len - 1] = '\0';
3037           len--;
3038         }
3040         if (strncmp(config_base_dir,
3041                          base_realpath, sizeof(base_realpath)) != 0)
3042         {
3043           fprintf(stderr,
3044                   "Base directory (-b) resolved via file system links!\n"
3045                   "Please consult rrdcached '-b' documentation!\n"
3046                   "Consider specifying the real directory (%s)\n",
3047                   base_realpath);
3048           return 5;
3049         }
3050       }
3051       break;
3053       case 'p':
3054       {
3055         if (config_pid_file != NULL)
3056           free (config_pid_file);
3057         config_pid_file = strdup (optarg);
3058         if (config_pid_file == NULL)
3059         {
3060           fprintf (stderr, "read_options: strdup failed.\n");
3061           return (3);
3062         }
3063       }
3064       break;
3066       case 'F':
3067         config_flush_at_shutdown = 1;
3068         break;
3070       case 'j':
3071       {
3072         const char *dir = journal_dir = strdup(optarg);
3074         status = rrd_mkdir_p(dir, 0777);
3075         if (status != 0)
3076         {
3077           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3078               dir, rrd_strerror(errno));
3079           return 6;
3080         }
3082         if (access(dir, R_OK|W_OK|X_OK) != 0)
3083         {
3084           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3085                   errno ? rrd_strerror(errno) : "");
3086           return 6;
3087         }
3088       }
3089       break;
3091       case 'm':
3092       {
3093         int temp = atoi(optarg);
3094         if (temp > 0)
3095           config_alloc_chunk = temp;
3096         else
3097         {
3098           fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3099           status = 10;
3100         }
3101       }
3102       break;
3104       case 'h':
3105       case '?':
3106         printf ("RRDCacheD %s\n"
3107             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3108             "\n"
3109             "Usage: rrdcached [options]\n"
3110             "\n"
3111             "Valid options are:\n"
3112             "  -l <address>  Socket address to listen to.\n"
3113             "  -P <perms>    Sets the permissions to assign to all following "
3114                             "sockets\n"
3115             "  -w <seconds>  Interval in which to write data.\n"
3116             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3117             "  -t <threads>  Number of write threads.\n"
3118             "  -f <seconds>  Interval in which to flush dead data.\n"
3119             "  -p <file>     Location of the PID-file.\n"
3120             "  -b <dir>      Base directory to change to.\n"
3121             "  -B            Restrict file access to paths within -b <dir>\n"
3122             "  -g            Do not fork and run in the foreground.\n"
3123             "  -j <dir>      Directory in which to create the journal files.\n"
3124             "  -F            Always flush all updates at shutdown\n"
3125             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3126             "                (the socket will also have read/write permissions "
3127                             "for that group)\n"
3128             "  -m <mode>     File permissions (octal) of all following UNIX "
3129                             "sockets\n"
3130             "\n"
3131             "For more information and a detailed description of all options "
3132             "please refer\n"
3133             "to the rrdcached(1) manual page.\n",
3134             VERSION);
3135         if (option == 'h')
3136           status = -1;
3137         else
3138           status = 1;
3139         break;
3140     } /* switch (option) */
3141   } /* while (getopt) */
3143   /* advise the user when values are not sane */
3144   if (config_flush_interval < 2 * config_write_interval)
3145     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3146             " 2x write interval (-w) !\n");
3147   if (config_write_jitter > config_write_interval)
3148     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3149             " write interval (-w) !\n");
3151   if (config_write_base_only && config_base_dir == NULL)
3152     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3153             "  Consult the rrdcached documentation\n");
3155   if (journal_dir == NULL)
3156     config_flush_at_shutdown = 1;
3158   rrd_free_ptrs ((void *) &permissions, &permissions_len);
3160   return (status);
3161 } /* }}} int read_options */
3163 int main (int argc, char **argv)
3165   int status;
3167   status = read_options (argc, argv);
3168   if (status != 0)
3169   {
3170     if (status < 0)
3171       status = 0;
3172     return (status);
3173   }
3175   status = daemonize ();
3176   if (status != 0)
3177   {
3178     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3179     return (1);
3180   }
3182   journal_init();
3184   /* start the queue threads */
3185   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3186   if (queue_threads == NULL)
3187   {
3188     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3189     cleanup();
3190     return (1);
3191   }
3192   for (int i = 0; i < config_queue_threads; i++)
3193   {
3194     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3195     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3196     if (status != 0)
3197     {
3198       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3199       cleanup();
3200       return (1);
3201     }
3202   }
3204   /* start the flush thread */
3205   memset(&flush_thread, 0, sizeof(flush_thread));
3206   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3207   if (status != 0)
3208   {
3209     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3210     cleanup();
3211     return (1);
3212   }
3214   listen_thread_main (NULL);
3215   cleanup ();
3217   return (0);
3218 } /* int main */
3220 /*
3221  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3222  */