Code

print \n for log messages while runing rrdcached in the foreground ... suggested...
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008,2009 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
76 #include "unused.h"
78 #include <stdlib.h>
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 #  include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
89 #else
91 #endif
92 #include <stdio.h>
93 #include <string.h>
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
112 #include <glib-2.0/glib.h>
113 /* }}} */
115 #define RRDD_LOG(severity, ...) \
116   do { \
117     if (stay_foreground) { \
118       fprintf(stderr, __VA_ARGS__); \
119       fprintf(stderr, "\n"); } \
120     syslog ((severity), __VA_ARGS__); \
121   } while (0)
123 /*
124  * Types
125  */
126 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
128 struct listen_socket_s
130   int fd;
131   char addr[PATH_MAX + 1];
132   int family;
134   /* state for BATCH processing */
135   time_t batch_start;
136   int batch_cmd;
138   /* buffered IO */
139   char *rbuf;
140   off_t next_cmd;
141   off_t next_read;
143   char *wbuf;
144   ssize_t wbuf_len;
146   uint32_t permissions;
148   gid_t  socket_group;
149   mode_t socket_permissions;
150 };
151 typedef struct listen_socket_s listen_socket_t;
153 struct command_s;
154 typedef struct command_s command_t;
155 /* note: guard against "unused" warnings in the handlers */
156 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
157                         time_t UNUSED(now),\
158                         char  UNUSED(*buffer),\
159                         size_t UNUSED(buffer_size)
161 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
162                         DISPATCH_PROTO
164 struct command_s {
165   char   *cmd;
166   int (*handler)(HANDLER_PROTO);
168   char  context;                /* where we expect to see it */
169 #define CMD_CONTEXT_CLIENT      (1<<0)
170 #define CMD_CONTEXT_BATCH       (1<<1)
171 #define CMD_CONTEXT_JOURNAL     (1<<2)
172 #define CMD_CONTEXT_ANY         (0x7f)
174   char *syntax;
175   char *help;
176 };
178 struct cache_item_s;
179 typedef struct cache_item_s cache_item_t;
180 struct cache_item_s
182   char *file;
183   char **values;
184   size_t values_num;
185   time_t last_flush_time;
186   time_t last_update_stamp;
187 #define CI_FLAGS_IN_TREE  (1<<0)
188 #define CI_FLAGS_IN_QUEUE (1<<1)
189   int flags;
190   pthread_cond_t  flushed;
191   cache_item_t *prev;
192   cache_item_t *next;
193 };
195 struct callback_flush_data_s
197   time_t now;
198   time_t abs_timeout;
199   char **keys;
200   size_t keys_num;
201 };
202 typedef struct callback_flush_data_s callback_flush_data_t;
204 enum queue_side_e
206   HEAD,
207   TAIL
208 };
209 typedef enum queue_side_e queue_side_t;
211 /* describe a set of journal files */
212 typedef struct {
213   char **files;
214   size_t files_num;
215 } journal_set;
217 /* max length of socket command or response */
218 #define CMD_MAX 4096
219 #define RBUF_SIZE (CMD_MAX*2)
221 /*
222  * Variables
223  */
224 static int stay_foreground = 0;
225 static uid_t daemon_uid;
227 static listen_socket_t *listen_fds = NULL;
228 static size_t listen_fds_num = 0;
230 enum {
231   RUNNING,              /* normal operation */
232   FLUSHING,             /* flushing remaining values */
233   SHUTDOWN              /* shutting down */
234 } state = RUNNING;
236 static pthread_t *queue_threads;
237 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
238 static int config_queue_threads = 4;
240 static pthread_t flush_thread;
241 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
243 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
244 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
245 static int connection_threads_num = 0;
247 /* Cache stuff */
248 static GTree          *cache_tree = NULL;
249 static cache_item_t   *cache_queue_head = NULL;
250 static cache_item_t   *cache_queue_tail = NULL;
251 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
253 static int config_write_interval = 300;
254 static int config_write_jitter   = 0;
255 static int config_flush_interval = 3600;
256 static int config_flush_at_shutdown = 0;
257 static char *config_pid_file = NULL;
258 static char *config_base_dir = NULL;
259 static size_t _config_base_dir_len = 0;
260 static int config_write_base_only = 0;
262 static listen_socket_t **config_listen_address_list = NULL;
263 static size_t config_listen_address_list_len = 0;
265 static uint64_t stats_queue_length = 0;
266 static uint64_t stats_updates_received = 0;
267 static uint64_t stats_flush_received = 0;
268 static uint64_t stats_updates_written = 0;
269 static uint64_t stats_data_sets_written = 0;
270 static uint64_t stats_journal_bytes = 0;
271 static uint64_t stats_journal_rotate = 0;
272 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
274 /* Journaled updates */
275 #define JOURNAL_REPLAY(s) ((s) == NULL)
276 #define JOURNAL_BASE "rrd.journal"
277 static journal_set *journal_cur = NULL;
278 static journal_set *journal_old = NULL;
279 static char *journal_dir = NULL;
280 static FILE *journal_fh = NULL;         /* current journal file handle */
281 static long  journal_size = 0;          /* current journal size */
282 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
283 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
284 static int journal_write(char *cmd, char *args);
285 static void journal_done(void);
286 static void journal_rotate(void);
288 /* prototypes for forward refernces */
289 static int handle_request_help (HANDLER_PROTO);
291 /* 
292  * Functions
293  */
294 static void sig_common (const char *sig) /* {{{ */
296   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
297   state = FLUSHING;
298   pthread_cond_broadcast(&flush_cond);
299   pthread_cond_broadcast(&queue_cond);
300 } /* }}} void sig_common */
302 static void sig_int_handler (int UNUSED(s)) /* {{{ */
304   sig_common("INT");
305 } /* }}} void sig_int_handler */
307 static void sig_term_handler (int UNUSED(s)) /* {{{ */
309   sig_common("TERM");
310 } /* }}} void sig_term_handler */
312 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
314   config_flush_at_shutdown = 1;
315   sig_common("USR1");
316 } /* }}} void sig_usr1_handler */
318 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
320   config_flush_at_shutdown = 0;
321   sig_common("USR2");
322 } /* }}} void sig_usr2_handler */
324 static void install_signal_handlers(void) /* {{{ */
326   /* These structures are static, because `sigaction' behaves weird if the are
327    * overwritten.. */
328   static struct sigaction sa_int;
329   static struct sigaction sa_term;
330   static struct sigaction sa_pipe;
331   static struct sigaction sa_usr1;
332   static struct sigaction sa_usr2;
334   /* Install signal handlers */
335   memset (&sa_int, 0, sizeof (sa_int));
336   sa_int.sa_handler = sig_int_handler;
337   sigaction (SIGINT, &sa_int, NULL);
339   memset (&sa_term, 0, sizeof (sa_term));
340   sa_term.sa_handler = sig_term_handler;
341   sigaction (SIGTERM, &sa_term, NULL);
343   memset (&sa_pipe, 0, sizeof (sa_pipe));
344   sa_pipe.sa_handler = SIG_IGN;
345   sigaction (SIGPIPE, &sa_pipe, NULL);
347   memset (&sa_pipe, 0, sizeof (sa_usr1));
348   sa_usr1.sa_handler = sig_usr1_handler;
349   sigaction (SIGUSR1, &sa_usr1, NULL);
351   memset (&sa_usr2, 0, sizeof (sa_usr2));
352   sa_usr2.sa_handler = sig_usr2_handler;
353   sigaction (SIGUSR2, &sa_usr2, NULL);
355 } /* }}} void install_signal_handlers */
357 static int open_pidfile(char *action, int oflag) /* {{{ */
359   int fd;
360   const char *file;
361   char *file_copy, *dir;
363   file = (config_pid_file != NULL)
364     ? config_pid_file
365     : LOCALSTATEDIR "/run/rrdcached.pid";
367   /* dirname may modify its argument */
368   file_copy = strdup(file);
369   if (file_copy == NULL)
370   {
371     fprintf(stderr, "rrdcached: strdup(): %s\n",
372         rrd_strerror(errno));
373     return -1;
374   }
376   dir = dirname(file_copy);
377   if (rrd_mkdir_p(dir, 0777) != 0)
378   {
379     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
380         dir, rrd_strerror(errno));
381     return -1;
382   }
384   free(file_copy);
386   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
387   if (fd < 0)
388     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
389             action, file, rrd_strerror(errno));
391   return(fd);
392 } /* }}} static int open_pidfile */
394 /* check existing pid file to see whether a daemon is running */
395 static int check_pidfile(void)
397   int pid_fd;
398   pid_t pid;
399   char pid_str[16];
401   pid_fd = open_pidfile("open", O_RDWR);
402   if (pid_fd < 0)
403     return pid_fd;
405   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
406     return -1;
408   pid = atoi(pid_str);
409   if (pid <= 0)
410     return -1;
412   /* another running process that we can signal COULD be
413    * a competing rrdcached */
414   if (pid != getpid() && kill(pid, 0) == 0)
415   {
416     fprintf(stderr,
417             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
418     close(pid_fd);
419     return -1;
420   }
422   lseek(pid_fd, 0, SEEK_SET);
423   if (ftruncate(pid_fd, 0) == -1)
424   {
425     fprintf(stderr,
426             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
427     close(pid_fd);
428     return -1;
429   }
431   fprintf(stderr,
432           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
433           "rrdcached: starting normally.\n", pid);
435   return pid_fd;
436 } /* }}} static int check_pidfile */
438 static int write_pidfile (int fd) /* {{{ */
440   pid_t pid;
441   FILE *fh;
443   pid = getpid ();
445   fh = fdopen (fd, "w");
446   if (fh == NULL)
447   {
448     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
449     close(fd);
450     return (-1);
451   }
453   fprintf (fh, "%i\n", (int) pid);
454   fclose (fh);
456   return (0);
457 } /* }}} int write_pidfile */
459 static int remove_pidfile (void) /* {{{ */
461   char *file;
462   int status;
464   file = (config_pid_file != NULL)
465     ? config_pid_file
466     : LOCALSTATEDIR "/run/rrdcached.pid";
468   status = unlink (file);
469   if (status == 0)
470     return (0);
471   return (errno);
472 } /* }}} int remove_pidfile */
474 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
476   char *eol;
478   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
479                sock->next_read - sock->next_cmd);
481   if (eol == NULL)
482   {
483     /* no commands left, move remainder back to front of rbuf */
484     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
485             sock->next_read - sock->next_cmd);
486     sock->next_read -= sock->next_cmd;
487     sock->next_cmd = 0;
488     *len = 0;
489     return NULL;
490   }
491   else
492   {
493     char *cmd = sock->rbuf + sock->next_cmd;
494     *eol = '\0';
496     sock->next_cmd = eol - sock->rbuf + 1;
498     if (eol > sock->rbuf && *(eol-1) == '\r')
499       *(--eol) = '\0'; /* handle "\r\n" EOL */
501     *len = eol - cmd;
503     return cmd;
504   }
506   /* NOTREACHED */
507   assert(1==0);
508 } /* }}} char *next_cmd */
510 /* add the characters directly to the write buffer */
511 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
513   char *new_buf;
515   assert(sock != NULL);
517   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
518   if (new_buf == NULL)
519   {
520     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
521     return -1;
522   }
524   strncpy(new_buf + sock->wbuf_len, str, len + 1);
526   sock->wbuf = new_buf;
527   sock->wbuf_len += len;
529   return 0;
530 } /* }}} static int add_to_wbuf */
532 /* add the text to the "extra" info that's sent after the status line */
533 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
535   va_list argp;
536   char buffer[CMD_MAX];
537   int len;
539   if (JOURNAL_REPLAY(sock)) return 0;
540   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
542   va_start(argp, fmt);
543 #ifdef HAVE_VSNPRINTF
544   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
545 #else
546   len = vsprintf(buffer, fmt, argp);
547 #endif
548   va_end(argp);
549   if (len < 0)
550   {
551     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
552     return -1;
553   }
555   return add_to_wbuf(sock, buffer, len);
556 } /* }}} static int add_response_info */
558 static int count_lines(char *str) /* {{{ */
560   int lines = 0;
562   if (str != NULL)
563   {
564     while ((str = strchr(str, '\n')) != NULL)
565     {
566       ++lines;
567       ++str;
568     }
569   }
571   return lines;
572 } /* }}} static int count_lines */
574 /* send the response back to the user.
575  * returns 0 on success, -1 on error
576  * write buffer is always zeroed after this call */
577 static int send_response (listen_socket_t *sock, response_code rc,
578                           char *fmt, ...) /* {{{ */
580   va_list argp;
581   char buffer[CMD_MAX];
582   int lines;
583   ssize_t wrote;
584   int rclen, len;
586   if (JOURNAL_REPLAY(sock)) return rc;
588   if (sock->batch_start)
589   {
590     if (rc == RESP_OK)
591       return rc; /* no response on success during BATCH */
592     lines = sock->batch_cmd;
593   }
594   else if (rc == RESP_OK)
595     lines = count_lines(sock->wbuf);
596   else
597     lines = -1;
599   rclen = sprintf(buffer, "%d ", lines);
600   va_start(argp, fmt);
601 #ifdef HAVE_VSNPRINTF
602   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
603 #else
604   len = vsprintf(buffer+rclen, fmt, argp);
605 #endif
606   va_end(argp);
607   if (len < 0)
608     return -1;
610   len += rclen;
612   /* append the result to the wbuf, don't write to the user */
613   if (sock->batch_start)
614     return add_to_wbuf(sock, buffer, len);
616   /* first write must be complete */
617   if (len != write(sock->fd, buffer, len))
618   {
619     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
620     return -1;
621   }
623   if (sock->wbuf != NULL && rc == RESP_OK)
624   {
625     wrote = 0;
626     while (wrote < sock->wbuf_len)
627     {
628       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
629       if (wb <= 0)
630       {
631         RRDD_LOG(LOG_INFO, "send_response: could not write results");
632         return -1;
633       }
634       wrote += wb;
635     }
636   }
638   free(sock->wbuf); sock->wbuf = NULL;
639   sock->wbuf_len = 0;
641   return 0;
642 } /* }}} */
644 static void wipe_ci_values(cache_item_t *ci, time_t when)
646   ci->values = NULL;
647   ci->values_num = 0;
649   ci->last_flush_time = when;
650   if (config_write_jitter > 0)
651     ci->last_flush_time += (rrd_random() % config_write_jitter);
654 /* remove_from_queue
655  * remove a "cache_item_t" item from the queue.
656  * must hold 'cache_lock' when calling this
657  */
658 static void remove_from_queue(cache_item_t *ci) /* {{{ */
660   if (ci == NULL) return;
661   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
663   if (ci->prev == NULL)
664     cache_queue_head = ci->next; /* reset head */
665   else
666     ci->prev->next = ci->next;
668   if (ci->next == NULL)
669     cache_queue_tail = ci->prev; /* reset the tail */
670   else
671     ci->next->prev = ci->prev;
673   ci->next = ci->prev = NULL;
674   ci->flags &= ~CI_FLAGS_IN_QUEUE;
676   pthread_mutex_lock (&stats_lock);
677   assert (stats_queue_length > 0);
678   stats_queue_length--;
679   pthread_mutex_unlock (&stats_lock);
681 } /* }}} static void remove_from_queue */
683 /* free the resources associated with the cache_item_t
684  * must hold cache_lock when calling this function
685  */
686 static void *free_cache_item(cache_item_t *ci) /* {{{ */
688   if (ci == NULL) return NULL;
690   remove_from_queue(ci);
692   for (size_t i=0; i < ci->values_num; i++)
693     free(ci->values[i]);
695   free (ci->values);
696   free (ci->file);
698   /* in case anyone is waiting */
699   pthread_cond_broadcast(&ci->flushed);
700   pthread_cond_destroy(&ci->flushed);
702   free (ci);
704   return NULL;
705 } /* }}} static void *free_cache_item */
707 /*
708  * enqueue_cache_item:
709  * `cache_lock' must be acquired before calling this function!
710  */
711 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
712     queue_side_t side)
714   if (ci == NULL)
715     return (-1);
717   if (ci->values_num == 0)
718     return (0);
720   if (side == HEAD)
721   {
722     if (cache_queue_head == ci)
723       return 0;
725     /* remove if further down in queue */
726     remove_from_queue(ci);
728     ci->prev = NULL;
729     ci->next = cache_queue_head;
730     if (ci->next != NULL)
731       ci->next->prev = ci;
732     cache_queue_head = ci;
734     if (cache_queue_tail == NULL)
735       cache_queue_tail = cache_queue_head;
736   }
737   else /* (side == TAIL) */
738   {
739     /* We don't move values back in the list.. */
740     if (ci->flags & CI_FLAGS_IN_QUEUE)
741       return (0);
743     assert (ci->next == NULL);
744     assert (ci->prev == NULL);
746     ci->prev = cache_queue_tail;
748     if (cache_queue_tail == NULL)
749       cache_queue_head = ci;
750     else
751       cache_queue_tail->next = ci;
753     cache_queue_tail = ci;
754   }
756   ci->flags |= CI_FLAGS_IN_QUEUE;
758   pthread_cond_signal(&queue_cond);
759   pthread_mutex_lock (&stats_lock);
760   stats_queue_length++;
761   pthread_mutex_unlock (&stats_lock);
763   return (0);
764 } /* }}} int enqueue_cache_item */
766 /*
767  * tree_callback_flush:
768  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
769  * while this is in progress.
770  */
771 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
772     gpointer data)
774   cache_item_t *ci;
775   callback_flush_data_t *cfd;
777   ci = (cache_item_t *) value;
778   cfd = (callback_flush_data_t *) data;
780   if (ci->flags & CI_FLAGS_IN_QUEUE)
781     return FALSE;
783   if (ci->values_num > 0
784       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
785   {
786     enqueue_cache_item (ci, TAIL);
787   }
788   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
789       && (ci->values_num <= 0))
790   {
791     assert ((char *) key == ci->file);
792     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
793     {
794       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
795       return (FALSE);
796     }
797   }
799   return (FALSE);
800 } /* }}} gboolean tree_callback_flush */
802 static int flush_old_values (int max_age)
804   callback_flush_data_t cfd;
805   size_t k;
807   memset (&cfd, 0, sizeof (cfd));
808   /* Pass the current time as user data so that we don't need to call
809    * `time' for each node. */
810   cfd.now = time (NULL);
811   cfd.keys = NULL;
812   cfd.keys_num = 0;
814   if (max_age > 0)
815     cfd.abs_timeout = cfd.now - max_age;
816   else
817     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
819   /* `tree_callback_flush' will return the keys of all values that haven't
820    * been touched in the last `config_flush_interval' seconds in `cfd'.
821    * The char*'s in this array point to the same memory as ci->file, so we
822    * don't need to free them separately. */
823   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
825   for (k = 0; k < cfd.keys_num; k++)
826   {
827     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
828     /* should never fail, since we have held the cache_lock
829      * the entire time */
830     assert(status == TRUE);
831   }
833   if (cfd.keys != NULL)
834   {
835     free (cfd.keys);
836     cfd.keys = NULL;
837   }
839   return (0);
840 } /* int flush_old_values */
842 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
844   struct timeval now;
845   struct timespec next_flush;
846   int status;
848   gettimeofday (&now, NULL);
849   next_flush.tv_sec = now.tv_sec + config_flush_interval;
850   next_flush.tv_nsec = 1000 * now.tv_usec;
852   pthread_mutex_lock(&cache_lock);
854   while (state == RUNNING)
855   {
856     gettimeofday (&now, NULL);
857     if ((now.tv_sec > next_flush.tv_sec)
858         || ((now.tv_sec == next_flush.tv_sec)
859           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
860     {
861       RRDD_LOG(LOG_DEBUG, "flushing old values");
863       /* Determine the time of the next cache flush. */
864       next_flush.tv_sec = now.tv_sec + config_flush_interval;
866       /* Flush all values that haven't been written in the last
867        * `config_write_interval' seconds. */
868       flush_old_values (config_write_interval);
870       /* unlock the cache while we rotate so we don't block incoming
871        * updates if the fsync() blocks on disk I/O */
872       pthread_mutex_unlock(&cache_lock);
873       journal_rotate();
874       pthread_mutex_lock(&cache_lock);
875     }
877     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
878     if (status != 0 && status != ETIMEDOUT)
879     {
880       RRDD_LOG (LOG_ERR, "flush_thread_main: "
881                 "pthread_cond_timedwait returned %i.", status);
882     }
883   }
885   if (config_flush_at_shutdown)
886     flush_old_values (-1); /* flush everything */
888   state = SHUTDOWN;
890   pthread_mutex_unlock(&cache_lock);
892   return NULL;
893 } /* void *flush_thread_main */
895 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
897   pthread_mutex_lock (&cache_lock);
899   while (state != SHUTDOWN
900          || (cache_queue_head != NULL && config_flush_at_shutdown))
901   {
902     cache_item_t *ci;
903     char *file;
904     char **values;
905     size_t values_num;
906     int status;
908     /* Now, check if there's something to store away. If not, wait until
909      * something comes in. */
910     if (cache_queue_head == NULL)
911     {
912       status = pthread_cond_wait (&queue_cond, &cache_lock);
913       if ((status != 0) && (status != ETIMEDOUT))
914       {
915         RRDD_LOG (LOG_ERR, "queue_thread_main: "
916             "pthread_cond_wait returned %i.", status);
917       }
918     }
920     /* Check if a value has arrived. This may be NULL if we timed out or there
921      * was an interrupt such as a signal. */
922     if (cache_queue_head == NULL)
923       continue;
925     ci = cache_queue_head;
927     /* copy the relevant parts */
928     file = strdup (ci->file);
929     if (file == NULL)
930     {
931       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
932       continue;
933     }
935     assert(ci->values != NULL);
936     assert(ci->values_num > 0);
938     values = ci->values;
939     values_num = ci->values_num;
941     wipe_ci_values(ci, time(NULL));
942     remove_from_queue(ci);
944     pthread_mutex_unlock (&cache_lock);
946     rrd_clear_error ();
947     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
948     if (status != 0)
949     {
950       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
951           "rrd_update_r (%s) failed with status %i. (%s)",
952           file, status, rrd_get_error());
953     }
955     journal_write("wrote", file);
957     /* Search again in the tree.  It's possible someone issued a "FORGET"
958      * while we were writing the update values. */
959     pthread_mutex_lock(&cache_lock);
960     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
961     if (ci)
962       pthread_cond_broadcast(&ci->flushed);
963     pthread_mutex_unlock(&cache_lock);
965     if (status == 0)
966     {
967       pthread_mutex_lock (&stats_lock);
968       stats_updates_written++;
969       stats_data_sets_written += values_num;
970       pthread_mutex_unlock (&stats_lock);
971     }
973     rrd_free_ptrs((void ***) &values, &values_num);
974     free(file);
976     pthread_mutex_lock (&cache_lock);
977   }
978   pthread_mutex_unlock (&cache_lock);
980   return (NULL);
981 } /* }}} void *queue_thread_main */
983 static int buffer_get_field (char **buffer_ret, /* {{{ */
984     size_t *buffer_size_ret, char **field_ret)
986   char *buffer;
987   size_t buffer_pos;
988   size_t buffer_size;
989   char *field;
990   size_t field_size;
991   int status;
993   buffer = *buffer_ret;
994   buffer_pos = 0;
995   buffer_size = *buffer_size_ret;
996   field = *buffer_ret;
997   field_size = 0;
999   if (buffer_size <= 0)
1000     return (-1);
1002   /* This is ensured by `handle_request'. */
1003   assert (buffer[buffer_size - 1] == '\0');
1005   status = -1;
1006   while (buffer_pos < buffer_size)
1007   {
1008     /* Check for end-of-field or end-of-buffer */
1009     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1010     {
1011       field[field_size] = 0;
1012       field_size++;
1013       buffer_pos++;
1014       status = 0;
1015       break;
1016     }
1017     /* Handle escaped characters. */
1018     else if (buffer[buffer_pos] == '\\')
1019     {
1020       if (buffer_pos >= (buffer_size - 1))
1021         break;
1022       buffer_pos++;
1023       field[field_size] = buffer[buffer_pos];
1024       field_size++;
1025       buffer_pos++;
1026     }
1027     /* Normal operation */ 
1028     else
1029     {
1030       field[field_size] = buffer[buffer_pos];
1031       field_size++;
1032       buffer_pos++;
1033     }
1034   } /* while (buffer_pos < buffer_size) */
1036   if (status != 0)
1037     return (status);
1039   *buffer_ret = buffer + buffer_pos;
1040   *buffer_size_ret = buffer_size - buffer_pos;
1041   *field_ret = field;
1043   return (0);
1044 } /* }}} int buffer_get_field */
1046 /* if we're restricting writes to the base directory,
1047  * check whether the file falls within the dir
1048  * returns 1 if OK, otherwise 0
1049  */
1050 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1052   assert(file != NULL);
1054   if (!config_write_base_only
1055       || JOURNAL_REPLAY(sock)
1056       || config_base_dir == NULL)
1057     return 1;
1059   if (strstr(file, "../") != NULL) goto err;
1061   /* relative paths without "../" are ok */
1062   if (*file != '/') return 1;
1064   /* file must be of the format base + "/" + <1+ char filename> */
1065   if (strlen(file) < _config_base_dir_len + 2) goto err;
1066   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1067   if (*(file + _config_base_dir_len) != '/') goto err;
1069   return 1;
1071 err:
1072   if (sock != NULL && sock->fd >= 0)
1073     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1075   return 0;
1076 } /* }}} static int check_file_access */
1078 /* when using a base dir, convert relative paths to absolute paths.
1079  * if necessary, modifies the "filename" pointer to point
1080  * to the new path created in "tmp".  "tmp" is provided
1081  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1082  *
1083  * this allows us to optimize for the expected case (absolute path)
1084  * with a no-op.
1085  */
1086 static void get_abs_path(char **filename, char *tmp)
1088   assert(tmp != NULL);
1089   assert(filename != NULL && *filename != NULL);
1091   if (config_base_dir == NULL || **filename == '/')
1092     return;
1094   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1095   *filename = tmp;
1096 } /* }}} static int get_abs_path */
1098 static int flush_file (const char *filename) /* {{{ */
1100   cache_item_t *ci;
1102   pthread_mutex_lock (&cache_lock);
1104   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1105   if (ci == NULL)
1106   {
1107     pthread_mutex_unlock (&cache_lock);
1108     return (ENOENT);
1109   }
1111   if (ci->values_num > 0)
1112   {
1113     /* Enqueue at head */
1114     enqueue_cache_item (ci, HEAD);
1115     pthread_cond_wait(&ci->flushed, &cache_lock);
1116   }
1118   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1119    * may have been purged during our cond_wait() */
1121   pthread_mutex_unlock(&cache_lock);
1123   return (0);
1124 } /* }}} int flush_file */
1126 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1128   char *err = "Syntax error.\n";
1130   if (cmd && cmd->syntax)
1131     err = cmd->syntax;
1133   return send_response(sock, RESP_ERR, "Usage: %s", err);
1134 } /* }}} static int syntax_error() */
1136 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1138   uint64_t copy_queue_length;
1139   uint64_t copy_updates_received;
1140   uint64_t copy_flush_received;
1141   uint64_t copy_updates_written;
1142   uint64_t copy_data_sets_written;
1143   uint64_t copy_journal_bytes;
1144   uint64_t copy_journal_rotate;
1146   uint64_t tree_nodes_number;
1147   uint64_t tree_depth;
1149   pthread_mutex_lock (&stats_lock);
1150   copy_queue_length       = stats_queue_length;
1151   copy_updates_received   = stats_updates_received;
1152   copy_flush_received     = stats_flush_received;
1153   copy_updates_written    = stats_updates_written;
1154   copy_data_sets_written  = stats_data_sets_written;
1155   copy_journal_bytes      = stats_journal_bytes;
1156   copy_journal_rotate     = stats_journal_rotate;
1157   pthread_mutex_unlock (&stats_lock);
1159   pthread_mutex_lock (&cache_lock);
1160   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1161   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1162   pthread_mutex_unlock (&cache_lock);
1164   add_response_info(sock,
1165                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1166   add_response_info(sock,
1167                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1168   add_response_info(sock,
1169                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1170   add_response_info(sock,
1171                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1172   add_response_info(sock,
1173                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1174   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1175   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1176   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1177   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1179   send_response(sock, RESP_OK, "Statistics follow\n");
1181   return (0);
1182 } /* }}} int handle_request_stats */
1184 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1186   char *file, file_tmp[PATH_MAX];
1187   int status;
1189   status = buffer_get_field (&buffer, &buffer_size, &file);
1190   if (status != 0)
1191   {
1192     return syntax_error(sock,cmd);
1193   }
1194   else
1195   {
1196     pthread_mutex_lock(&stats_lock);
1197     stats_flush_received++;
1198     pthread_mutex_unlock(&stats_lock);
1200     get_abs_path(&file, file_tmp);
1201     if (!check_file_access(file, sock)) return 0;
1203     status = flush_file (file);
1204     if (status == 0)
1205       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1206     else if (status == ENOENT)
1207     {
1208       /* no file in our tree; see whether it exists at all */
1209       struct stat statbuf;
1211       memset(&statbuf, 0, sizeof(statbuf));
1212       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1213         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1214       else
1215         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1216     }
1217     else if (status < 0)
1218       return send_response(sock, RESP_ERR, "Internal error.\n");
1219     else
1220       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1221   }
1223   /* NOTREACHED */
1224   assert(1==0);
1225 } /* }}} int handle_request_flush */
1227 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1229   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1231   pthread_mutex_lock(&cache_lock);
1232   flush_old_values(-1);
1233   pthread_mutex_unlock(&cache_lock);
1235   return send_response(sock, RESP_OK, "Started flush.\n");
1236 } /* }}} static int handle_request_flushall */
1238 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1240   int status;
1241   char *file, file_tmp[PATH_MAX];
1242   cache_item_t *ci;
1244   status = buffer_get_field(&buffer, &buffer_size, &file);
1245   if (status != 0)
1246     return syntax_error(sock,cmd);
1248   get_abs_path(&file, file_tmp);
1250   pthread_mutex_lock(&cache_lock);
1251   ci = g_tree_lookup(cache_tree, file);
1252   if (ci == NULL)
1253   {
1254     pthread_mutex_unlock(&cache_lock);
1255     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1256   }
1258   for (size_t i=0; i < ci->values_num; i++)
1259     add_response_info(sock, "%s\n", ci->values[i]);
1261   pthread_mutex_unlock(&cache_lock);
1262   return send_response(sock, RESP_OK, "updates pending\n");
1263 } /* }}} static int handle_request_pending */
1265 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1267   int status;
1268   gboolean found;
1269   char *file, file_tmp[PATH_MAX];
1271   status = buffer_get_field(&buffer, &buffer_size, &file);
1272   if (status != 0)
1273     return syntax_error(sock,cmd);
1275   get_abs_path(&file, file_tmp);
1276   if (!check_file_access(file, sock)) return 0;
1278   pthread_mutex_lock(&cache_lock);
1279   found = g_tree_remove(cache_tree, file);
1280   pthread_mutex_unlock(&cache_lock);
1282   if (found == TRUE)
1283   {
1284     if (!JOURNAL_REPLAY(sock))
1285       journal_write("forget", file);
1287     return send_response(sock, RESP_OK, "Gone!\n");
1288   }
1289   else
1290     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1292   /* NOTREACHED */
1293   assert(1==0);
1294 } /* }}} static int handle_request_forget */
1296 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1298   cache_item_t *ci;
1300   pthread_mutex_lock(&cache_lock);
1302   ci = cache_queue_head;
1303   while (ci != NULL)
1304   {
1305     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1306     ci = ci->next;
1307   }
1309   pthread_mutex_unlock(&cache_lock);
1311   return send_response(sock, RESP_OK, "in queue.\n");
1312 } /* }}} int handle_request_queue */
1314 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1316   char *file, file_tmp[PATH_MAX];
1317   int values_num = 0;
1318   int status;
1319   char orig_buf[CMD_MAX];
1321   cache_item_t *ci;
1323   /* save it for the journal later */
1324   if (!JOURNAL_REPLAY(sock))
1325     strncpy(orig_buf, buffer, buffer_size);
1327   status = buffer_get_field (&buffer, &buffer_size, &file);
1328   if (status != 0)
1329     return syntax_error(sock,cmd);
1331   pthread_mutex_lock(&stats_lock);
1332   stats_updates_received++;
1333   pthread_mutex_unlock(&stats_lock);
1335   get_abs_path(&file, file_tmp);
1336   if (!check_file_access(file, sock)) return 0;
1338   pthread_mutex_lock (&cache_lock);
1339   ci = g_tree_lookup (cache_tree, file);
1341   if (ci == NULL) /* {{{ */
1342   {
1343     struct stat statbuf;
1344     cache_item_t *tmp;
1346     /* don't hold the lock while we setup; stat(2) might block */
1347     pthread_mutex_unlock(&cache_lock);
1349     memset (&statbuf, 0, sizeof (statbuf));
1350     status = stat (file, &statbuf);
1351     if (status != 0)
1352     {
1353       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1355       status = errno;
1356       if (status == ENOENT)
1357         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1358       else
1359         return send_response(sock, RESP_ERR,
1360                              "stat failed with error %i.\n", status);
1361     }
1362     if (!S_ISREG (statbuf.st_mode))
1363       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1365     if (access(file, R_OK|W_OK) != 0)
1366       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1367                            file, rrd_strerror(errno));
1369     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1370     if (ci == NULL)
1371     {
1372       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1374       return send_response(sock, RESP_ERR, "malloc failed.\n");
1375     }
1376     memset (ci, 0, sizeof (cache_item_t));
1378     ci->file = strdup (file);
1379     if (ci->file == NULL)
1380     {
1381       free (ci);
1382       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1384       return send_response(sock, RESP_ERR, "strdup failed.\n");
1385     }
1387     wipe_ci_values(ci, now);
1388     ci->flags = CI_FLAGS_IN_TREE;
1389     pthread_cond_init(&ci->flushed, NULL);
1391     pthread_mutex_lock(&cache_lock);
1393     /* another UPDATE might have added this entry in the meantime */
1394     tmp = g_tree_lookup (cache_tree, file);
1395     if (tmp == NULL)
1396       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1397     else
1398     {
1399       free_cache_item (ci);
1400       ci = tmp;
1401     }
1403     /* state may have changed while we were unlocked */
1404     if (state == SHUTDOWN)
1405       return -1;
1406   } /* }}} */
1407   assert (ci != NULL);
1409   /* don't re-write updates in replay mode */
1410   if (!JOURNAL_REPLAY(sock))
1411     journal_write("update", orig_buf);
1413   while (buffer_size > 0)
1414   {
1415     char *value;
1416     time_t stamp;
1417     char *eostamp;
1419     status = buffer_get_field (&buffer, &buffer_size, &value);
1420     if (status != 0)
1421     {
1422       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1423       break;
1424     }
1426     /* make sure update time is always moving forward */
1427     stamp = strtol(value, &eostamp, 10);
1428     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1429     {
1430       pthread_mutex_unlock(&cache_lock);
1431       return send_response(sock, RESP_ERR,
1432                            "Cannot find timestamp in '%s'!\n", value);
1433     }
1434     else if (stamp <= ci->last_update_stamp)
1435     {
1436       pthread_mutex_unlock(&cache_lock);
1437       return send_response(sock, RESP_ERR,
1438                            "illegal attempt to update using time %ld when last"
1439                            " update time is %ld (minimum one second step)\n",
1440                            stamp, ci->last_update_stamp);
1441     }
1442     else
1443       ci->last_update_stamp = stamp;
1445     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1446     {
1447       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1448       continue;
1449     }
1451     values_num++;
1452   }
1454   if (((now - ci->last_flush_time) >= config_write_interval)
1455       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1456       && (ci->values_num > 0))
1457   {
1458     enqueue_cache_item (ci, TAIL);
1459   }
1461   pthread_mutex_unlock (&cache_lock);
1463   if (values_num < 1)
1464     return send_response(sock, RESP_ERR, "No values updated.\n");
1465   else
1466     return send_response(sock, RESP_OK,
1467                          "errors, enqueued %i value(s).\n", values_num);
1469   /* NOTREACHED */
1470   assert(1==0);
1472 } /* }}} int handle_request_update */
1474 /* we came across a "WROTE" entry during journal replay.
1475  * throw away any values that we have accumulated for this file
1476  */
1477 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1479   cache_item_t *ci;
1480   const char *file = buffer;
1482   pthread_mutex_lock(&cache_lock);
1484   ci = g_tree_lookup(cache_tree, file);
1485   if (ci == NULL)
1486   {
1487     pthread_mutex_unlock(&cache_lock);
1488     return (0);
1489   }
1491   if (ci->values)
1492     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1494   wipe_ci_values(ci, now);
1495   remove_from_queue(ci);
1497   pthread_mutex_unlock(&cache_lock);
1498   return (0);
1499 } /* }}} int handle_request_wrote */
1501 /* start "BATCH" processing */
1502 static int batch_start (HANDLER_PROTO) /* {{{ */
1504   int status;
1505   if (sock->batch_start)
1506     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1508   status = send_response(sock, RESP_OK,
1509                          "Go ahead.  End with dot '.' on its own line.\n");
1510   sock->batch_start = time(NULL);
1511   sock->batch_cmd = 0;
1513   return status;
1514 } /* }}} static int batch_start */
1516 /* finish "BATCH" processing and return results to the client */
1517 static int batch_done (HANDLER_PROTO) /* {{{ */
1519   assert(sock->batch_start);
1520   sock->batch_start = 0;
1521   sock->batch_cmd  = 0;
1522   return send_response(sock, RESP_OK, "errors\n");
1523 } /* }}} static int batch_done */
1525 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1527   return -1;
1528 } /* }}} static int handle_request_quit */
1530 static command_t list_of_commands[] = { /* {{{ */
1531   {
1532     "UPDATE",
1533     handle_request_update,
1534     CMD_CONTEXT_ANY,
1535     "UPDATE <filename> <values> [<values> ...]\n"
1536     ,
1537     "Adds the given file to the internal cache if it is not yet known and\n"
1538     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1539     "for details.\n"
1540     "\n"
1541     "Each <values> has the following form:\n"
1542     "  <values> = <time>:<value>[:<value>[...]]\n"
1543     "See the rrdupdate(1) manpage for details.\n"
1544   },
1545   {
1546     "WROTE",
1547     handle_request_wrote,
1548     CMD_CONTEXT_JOURNAL,
1549     NULL,
1550     NULL
1551   },
1552   {
1553     "FLUSH",
1554     handle_request_flush,
1555     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1556     "FLUSH <filename>\n"
1557     ,
1558     "Adds the given filename to the head of the update queue and returns\n"
1559     "after it has been dequeued.\n"
1560   },
1561   {
1562     "FLUSHALL",
1563     handle_request_flushall,
1564     CMD_CONTEXT_CLIENT,
1565     "FLUSHALL\n"
1566     ,
1567     "Triggers writing of all pending updates.  Returns immediately.\n"
1568   },
1569   {
1570     "PENDING",
1571     handle_request_pending,
1572     CMD_CONTEXT_CLIENT,
1573     "PENDING <filename>\n"
1574     ,
1575     "Shows any 'pending' updates for a file, in order.\n"
1576     "The updates shown have not yet been written to the underlying RRD file.\n"
1577   },
1578   {
1579     "FORGET",
1580     handle_request_forget,
1581     CMD_CONTEXT_ANY,
1582     "FORGET <filename>\n"
1583     ,
1584     "Removes the file completely from the cache.\n"
1585     "Any pending updates for the file will be lost.\n"
1586   },
1587   {
1588     "QUEUE",
1589     handle_request_queue,
1590     CMD_CONTEXT_CLIENT,
1591     "QUEUE\n"
1592     ,
1593         "Shows all files in the output queue.\n"
1594     "The output is zero or more lines in the following format:\n"
1595     "(where <num_vals> is the number of values to be written)\n"
1596     "\n"
1597     "<num_vals> <filename>\n"
1598   },
1599   {
1600     "STATS",
1601     handle_request_stats,
1602     CMD_CONTEXT_CLIENT,
1603     "STATS\n"
1604     ,
1605     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1606     "a description of the values.\n"
1607   },
1608   {
1609     "HELP",
1610     handle_request_help,
1611     CMD_CONTEXT_CLIENT,
1612     "HELP [<command>]\n",
1613     NULL, /* special! */
1614   },
1615   {
1616     "BATCH",
1617     batch_start,
1618     CMD_CONTEXT_CLIENT,
1619     "BATCH\n"
1620     ,
1621     "The 'BATCH' command permits the client to initiate a bulk load\n"
1622     "   of commands to rrdcached.\n"
1623     "\n"
1624     "Usage:\n"
1625     "\n"
1626     "    client: BATCH\n"
1627     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1628     "    client: command #1\n"
1629     "    client: command #2\n"
1630     "    client: ... and so on\n"
1631     "    client: .\n"
1632     "    server: 2 errors\n"
1633     "    server: 7 message for command #7\n"
1634     "    server: 9 message for command #9\n"
1635     "\n"
1636     "For more information, consult the rrdcached(1) documentation.\n"
1637   },
1638   {
1639     ".",   /* BATCH terminator */
1640     batch_done,
1641     CMD_CONTEXT_BATCH,
1642     NULL,
1643     NULL
1644   },
1645   {
1646     "QUIT",
1647     handle_request_quit,
1648     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1649     "QUIT\n"
1650     ,
1651     "Disconnect from rrdcached.\n"
1652   }
1653 }; /* }}} command_t list_of_commands[] */
1654 static size_t list_of_commands_len = sizeof (list_of_commands)
1655   / sizeof (list_of_commands[0]);
1657 static command_t *find_command(char *cmd)
1659   size_t i;
1661   for (i = 0; i < list_of_commands_len; i++)
1662     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1663       return (&list_of_commands[i]);
1664   return NULL;
1667 /* We currently use the index in the `list_of_commands' array as a bit position
1668  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1669  * outside these functions so that switching to a more elegant storage method
1670  * is easily possible. */
1671 static ssize_t find_command_index (const char *cmd) /* {{{ */
1673   size_t i;
1675   for (i = 0; i < list_of_commands_len; i++)
1676     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1677       return ((ssize_t) i);
1678   return (-1);
1679 } /* }}} ssize_t find_command_index */
1681 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1682     const char *cmd)
1684   ssize_t i;
1686   if (JOURNAL_REPLAY(sock))
1687     return (1);
1689   if (cmd == NULL)
1690     return (-1);
1692   if ((strcasecmp ("QUIT", cmd) == 0)
1693       || (strcasecmp ("HELP", cmd) == 0))
1694     return (1);
1695   else if (strcmp (".", cmd) == 0)
1696     cmd = "BATCH";
1698   i = find_command_index (cmd);
1699   if (i < 0)
1700     return (-1);
1701   assert (i < 32);
1703   if ((sock->permissions & (1 << i)) != 0)
1704     return (1);
1705   return (0);
1706 } /* }}} int socket_permission_check */
1708 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1709     const char *cmd)
1711   ssize_t i;
1713   i = find_command_index (cmd);
1714   if (i < 0)
1715     return (-1);
1716   assert (i < 32);
1718   sock->permissions |= (1 << i);
1719   return (0);
1720 } /* }}} int socket_permission_add */
1722 /* check whether commands are received in the expected context */
1723 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1725   if (JOURNAL_REPLAY(sock))
1726     return (cmd->context & CMD_CONTEXT_JOURNAL);
1727   else if (sock->batch_start)
1728     return (cmd->context & CMD_CONTEXT_BATCH);
1729   else
1730     return (cmd->context & CMD_CONTEXT_CLIENT);
1732   /* NOTREACHED */
1733   assert(1==0);
1736 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1738   int status;
1739   char *cmd_str;
1740   char *resp_txt;
1741   command_t *help = NULL;
1743   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1744   if (status == 0)
1745     help = find_command(cmd_str);
1747   if (help && (help->syntax || help->help))
1748   {
1749     char tmp[CMD_MAX];
1751     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1752     resp_txt = tmp;
1754     if (help->syntax)
1755       add_response_info(sock, "Usage: %s\n", help->syntax);
1757     if (help->help)
1758       add_response_info(sock, "%s\n", help->help);
1759   }
1760   else
1761   {
1762     size_t i;
1764     resp_txt = "Command overview\n";
1766     for (i = 0; i < list_of_commands_len; i++)
1767     {
1768       if (list_of_commands[i].syntax == NULL)
1769         continue;
1770       add_response_info (sock, "%s", list_of_commands[i].syntax);
1771     }
1772   }
1774   return send_response(sock, RESP_OK, resp_txt);
1775 } /* }}} int handle_request_help */
1777 static int handle_request (DISPATCH_PROTO) /* {{{ */
1779   char *buffer_ptr = buffer;
1780   char *cmd_str = NULL;
1781   command_t *cmd = NULL;
1782   int status;
1784   assert (buffer[buffer_size - 1] == '\0');
1786   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1787   if (status != 0)
1788   {
1789     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1790     return (-1);
1791   }
1793   if (sock != NULL && sock->batch_start)
1794     sock->batch_cmd++;
1796   cmd = find_command(cmd_str);
1797   if (!cmd)
1798     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1800   if (!socket_permission_check (sock, cmd->cmd))
1801     return send_response(sock, RESP_ERR, "Permission denied.\n");
1803   if (!command_check_context(sock, cmd))
1804     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1806   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1807 } /* }}} int handle_request */
1809 static void journal_set_free (journal_set *js) /* {{{ */
1811   if (js == NULL)
1812     return;
1814   rrd_free_ptrs((void ***) &js->files, &js->files_num);
1816   free(js);
1817 } /* }}} journal_set_free */
1819 static void journal_set_remove (journal_set *js) /* {{{ */
1821   if (js == NULL)
1822     return;
1824   for (uint i=0; i < js->files_num; i++)
1825   {
1826     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1827     unlink(js->files[i]);
1828   }
1829 } /* }}} journal_set_remove */
1831 /* close current journal file handle.
1832  * MUST hold journal_lock before calling */
1833 static void journal_close(void) /* {{{ */
1835   if (journal_fh != NULL)
1836   {
1837     if (fclose(journal_fh) != 0)
1838       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1839   }
1841   journal_fh = NULL;
1842   journal_size = 0;
1843 } /* }}} journal_close */
1845 /* MUST hold journal_lock before calling */
1846 static void journal_new_file(void) /* {{{ */
1848   struct timeval now;
1849   int  new_fd;
1850   char new_file[PATH_MAX + 1];
1852   assert(journal_dir != NULL);
1853   assert(journal_cur != NULL);
1855   journal_close();
1857   gettimeofday(&now, NULL);
1858   /* this format assures that the files sort in strcmp() order */
1859   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1860            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1862   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1863                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1864   if (new_fd < 0)
1865     goto error;
1867   journal_fh = fdopen(new_fd, "a");
1868   if (journal_fh == NULL)
1869     goto error;
1871   journal_size = ftell(journal_fh);
1872   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1874   /* record the file in the journal set */
1875   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1877   return;
1879 error:
1880   RRDD_LOG(LOG_CRIT,
1881            "JOURNALING DISABLED: Error while trying to create %s : %s",
1882            new_file, rrd_strerror(errno));
1883   RRDD_LOG(LOG_CRIT,
1884            "JOURNALING DISABLED: All values will be flushed at shutdown");
1886   close(new_fd);
1887   config_flush_at_shutdown = 1;
1889 } /* }}} journal_new_file */
1891 /* MUST NOT hold journal_lock before calling this */
1892 static void journal_rotate(void) /* {{{ */
1894   journal_set *old_js = NULL;
1896   if (journal_dir == NULL)
1897     return;
1899   RRDD_LOG(LOG_DEBUG, "rotating journals");
1901   pthread_mutex_lock(&stats_lock);
1902   ++stats_journal_rotate;
1903   pthread_mutex_unlock(&stats_lock);
1905   pthread_mutex_lock(&journal_lock);
1907   journal_close();
1909   /* rotate the journal sets */
1910   old_js = journal_old;
1911   journal_old = journal_cur;
1912   journal_cur = calloc(1, sizeof(journal_set));
1914   if (journal_cur != NULL)
1915     journal_new_file();
1916   else
1917     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1919   pthread_mutex_unlock(&journal_lock);
1921   journal_set_remove(old_js);
1922   journal_set_free  (old_js);
1924 } /* }}} static void journal_rotate */
1926 /* MUST hold journal_lock when calling */
1927 static void journal_done(void) /* {{{ */
1929   if (journal_cur == NULL)
1930     return;
1932   journal_close();
1934   if (config_flush_at_shutdown)
1935   {
1936     RRDD_LOG(LOG_INFO, "removing journals");
1937     journal_set_remove(journal_old);
1938     journal_set_remove(journal_cur);
1939   }
1940   else
1941   {
1942     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1943              "journals will be used at next startup");
1944   }
1946   journal_set_free(journal_cur);
1947   journal_set_free(journal_old);
1948   free(journal_dir);
1950 } /* }}} static void journal_done */
1952 static int journal_write(char *cmd, char *args) /* {{{ */
1954   int chars;
1956   if (journal_fh == NULL)
1957     return 0;
1959   pthread_mutex_lock(&journal_lock);
1960   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1961   journal_size += chars;
1963   if (journal_size > JOURNAL_MAX)
1964     journal_new_file();
1966   pthread_mutex_unlock(&journal_lock);
1968   if (chars > 0)
1969   {
1970     pthread_mutex_lock(&stats_lock);
1971     stats_journal_bytes += chars;
1972     pthread_mutex_unlock(&stats_lock);
1973   }
1975   return chars;
1976 } /* }}} static int journal_write */
1978 static int journal_replay (const char *file) /* {{{ */
1980   FILE *fh;
1981   int entry_cnt = 0;
1982   int fail_cnt = 0;
1983   uint64_t line = 0;
1984   char entry[CMD_MAX];
1985   time_t now;
1987   if (file == NULL) return 0;
1989   {
1990     char *reason = "unknown error";
1991     int status = 0;
1992     struct stat statbuf;
1994     memset(&statbuf, 0, sizeof(statbuf));
1995     if (stat(file, &statbuf) != 0)
1996     {
1997       reason = "stat error";
1998       status = errno;
1999     }
2000     else if (!S_ISREG(statbuf.st_mode))
2001     {
2002       reason = "not a regular file";
2003       status = EPERM;
2004     }
2005     if (statbuf.st_uid != daemon_uid)
2006     {
2007       reason = "not owned by daemon user";
2008       status = EACCES;
2009     }
2010     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2011     {
2012       reason = "must not be user/group writable";
2013       status = EACCES;
2014     }
2016     if (status != 0)
2017     {
2018       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2019                file, rrd_strerror(status), reason);
2020       return 0;
2021     }
2022   }
2024   fh = fopen(file, "r");
2025   if (fh == NULL)
2026   {
2027     if (errno != ENOENT)
2028       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2029                file, rrd_strerror(errno));
2030     return 0;
2031   }
2032   else
2033     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2035   now = time(NULL);
2037   while(!feof(fh))
2038   {
2039     size_t entry_len;
2041     ++line;
2042     if (fgets(entry, sizeof(entry), fh) == NULL)
2043       break;
2044     entry_len = strlen(entry);
2046     /* check \n termination in case journal writing crashed mid-line */
2047     if (entry_len == 0)
2048       continue;
2049     else if (entry[entry_len - 1] != '\n')
2050     {
2051       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2052       ++fail_cnt;
2053       continue;
2054     }
2056     entry[entry_len - 1] = '\0';
2058     if (handle_request(NULL, now, entry, entry_len) == 0)
2059       ++entry_cnt;
2060     else
2061       ++fail_cnt;
2062   }
2064   fclose(fh);
2066   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2067            entry_cnt, fail_cnt);
2069   return entry_cnt > 0 ? 1 : 0;
2070 } /* }}} static int journal_replay */
2072 static int journal_sort(const void *v1, const void *v2)
2074   char **jn1 = (char **) v1;
2075   char **jn2 = (char **) v2;
2077   return strcmp(*jn1,*jn2);
2080 static void journal_init(void) /* {{{ */
2082   int had_journal = 0;
2083   DIR *dir;
2084   struct dirent *dent;
2085   char path[PATH_MAX+1];
2087   if (journal_dir == NULL) return;
2089   pthread_mutex_lock(&journal_lock);
2091   journal_cur = calloc(1, sizeof(journal_set));
2092   if (journal_cur == NULL)
2093   {
2094     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2095     return;
2096   }
2098   RRDD_LOG(LOG_INFO, "checking for journal files");
2100   /* Handle old journal files during transition.  This gives them the
2101    * correct sort order.  TODO: remove after first release
2102    */
2103   {
2104     char old_path[PATH_MAX+1];
2105     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2106     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2107     rename(old_path, path);
2109     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2110     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2111     rename(old_path, path);
2112   }
2114   dir = opendir(journal_dir);
2115   if (!dir) {
2116     RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2117     return;
2118   }
2119   while ((dent = readdir(dir)) != NULL)
2120   {
2121     /* looks like a journal file? */
2122     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2123       continue;
2125     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2127     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2128     {
2129       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2130                dent->d_name);
2131       break;
2132     }
2133   }
2134   closedir(dir);
2136   qsort(journal_cur->files, journal_cur->files_num,
2137         sizeof(journal_cur->files[0]), journal_sort);
2139   for (uint i=0; i < journal_cur->files_num; i++)
2140     had_journal += journal_replay(journal_cur->files[i]);
2142   journal_new_file();
2144   /* it must have been a crash.  start a flush */
2145   if (had_journal && config_flush_at_shutdown)
2146     flush_old_values(-1);
2148   pthread_mutex_unlock(&journal_lock);
2150   RRDD_LOG(LOG_INFO, "journal processing complete");
2152 } /* }}} static void journal_init */
2154 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2156   assert(sock != NULL);
2158   free(sock->rbuf);  sock->rbuf = NULL;
2159   free(sock->wbuf);  sock->wbuf = NULL;
2160   free(sock);
2161 } /* }}} void free_listen_socket */
2163 static void close_connection(listen_socket_t *sock) /* {{{ */
2165   if (sock->fd >= 0)
2166   {
2167     close(sock->fd);
2168     sock->fd = -1;
2169   }
2171   free_listen_socket(sock);
2173 } /* }}} void close_connection */
2175 static void *connection_thread_main (void *args) /* {{{ */
2177   listen_socket_t *sock;
2178   int fd;
2180   sock = (listen_socket_t *) args;
2181   fd = sock->fd;
2183   /* init read buffers */
2184   sock->next_read = sock->next_cmd = 0;
2185   sock->rbuf = malloc(RBUF_SIZE);
2186   if (sock->rbuf == NULL)
2187   {
2188     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2189     close_connection(sock);
2190     return NULL;
2191   }
2193   pthread_mutex_lock (&connection_threads_lock);
2194   connection_threads_num++;
2195   pthread_mutex_unlock (&connection_threads_lock);
2197   while (state == RUNNING)
2198   {
2199     char *cmd;
2200     ssize_t cmd_len;
2201     ssize_t rbytes;
2202     time_t now;
2204     struct pollfd pollfd;
2205     int status;
2207     pollfd.fd = fd;
2208     pollfd.events = POLLIN | POLLPRI;
2209     pollfd.revents = 0;
2211     status = poll (&pollfd, 1, /* timeout = */ 500);
2212     if (state != RUNNING)
2213       break;
2214     else if (status == 0) /* timeout */
2215       continue;
2216     else if (status < 0) /* error */
2217     {
2218       status = errno;
2219       if (status != EINTR)
2220         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2221       continue;
2222     }
2224     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2225       break;
2226     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2227     {
2228       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2229           "poll(2) returned something unexpected: %#04hx",
2230           pollfd.revents);
2231       break;
2232     }
2234     rbytes = read(fd, sock->rbuf + sock->next_read,
2235                   RBUF_SIZE - sock->next_read);
2236     if (rbytes < 0)
2237     {
2238       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2239       break;
2240     }
2241     else if (rbytes == 0)
2242       break; /* eof */
2244     sock->next_read += rbytes;
2246     if (sock->batch_start)
2247       now = sock->batch_start;
2248     else
2249       now = time(NULL);
2251     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2252     {
2253       status = handle_request (sock, now, cmd, cmd_len+1);
2254       if (status != 0)
2255         goto out_close;
2256     }
2257   }
2259 out_close:
2260   close_connection(sock);
2262   /* Remove this thread from the connection threads list */
2263   pthread_mutex_lock (&connection_threads_lock);
2264   connection_threads_num--;
2265   if (connection_threads_num <= 0)
2266     pthread_cond_broadcast(&connection_threads_done);
2267   pthread_mutex_unlock (&connection_threads_lock);
2269   return (NULL);
2270 } /* }}} void *connection_thread_main */
2272 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2274   int fd;
2275   struct sockaddr_un sa;
2276   listen_socket_t *temp;
2277   int status;
2278   const char *path;
2279   char *path_copy, *dir;
2281   path = sock->addr;
2282   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2283     path += strlen("unix:");
2285   /* dirname may modify its argument */
2286   path_copy = strdup(path);
2287   if (path_copy == NULL)
2288   {
2289     fprintf(stderr, "rrdcached: strdup(): %s\n",
2290         rrd_strerror(errno));
2291     return (-1);
2292   }
2294   dir = dirname(path_copy);
2295   if (rrd_mkdir_p(dir, 0777) != 0)
2296   {
2297     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2298         dir, rrd_strerror(errno));
2299     return (-1);
2300   }
2302   free(path_copy);
2304   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2305       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2306   if (temp == NULL)
2307   {
2308     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2309     return (-1);
2310   }
2311   listen_fds = temp;
2312   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2314   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2315   if (fd < 0)
2316   {
2317     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2318              rrd_strerror(errno));
2319     return (-1);
2320   }
2322   memset (&sa, 0, sizeof (sa));
2323   sa.sun_family = AF_UNIX;
2324   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2326   /* if we've gotten this far, we own the pid file.  any daemon started
2327    * with the same args must not be alive.  therefore, ensure that we can
2328    * create the socket...
2329    */
2330   unlink(path);
2332   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2333   if (status != 0)
2334   {
2335     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2336              path, rrd_strerror(errno));
2337     close (fd);
2338     return (-1);
2339   }
2341   /* tweak the sockets group ownership */
2342   if (sock->socket_group != (gid_t)-1)
2343   {
2344     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2345          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2346     {
2347       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2348     }
2349   }
2351   if (sock->socket_permissions != (mode_t)-1)
2352   {
2353     if (chmod(path, sock->socket_permissions) != 0)
2354       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2355           (unsigned int)sock->socket_permissions, strerror(errno));
2356   }
2358   status = listen (fd, /* backlog = */ 10);
2359   if (status != 0)
2360   {
2361     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2362              path, rrd_strerror(errno));
2363     close (fd);
2364     unlink (path);
2365     return (-1);
2366   }
2368   listen_fds[listen_fds_num].fd = fd;
2369   listen_fds[listen_fds_num].family = PF_UNIX;
2370   strncpy(listen_fds[listen_fds_num].addr, path,
2371           sizeof (listen_fds[listen_fds_num].addr) - 1);
2372   listen_fds_num++;
2374   return (0);
2375 } /* }}} int open_listen_socket_unix */
2377 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2379   struct addrinfo ai_hints;
2380   struct addrinfo *ai_res;
2381   struct addrinfo *ai_ptr;
2382   char addr_copy[NI_MAXHOST];
2383   char *addr;
2384   char *port;
2385   int status;
2387   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2388   addr_copy[sizeof (addr_copy) - 1] = 0;
2389   addr = addr_copy;
2391   memset (&ai_hints, 0, sizeof (ai_hints));
2392   ai_hints.ai_flags = 0;
2393 #ifdef AI_ADDRCONFIG
2394   ai_hints.ai_flags |= AI_ADDRCONFIG;
2395 #endif
2396   ai_hints.ai_family = AF_UNSPEC;
2397   ai_hints.ai_socktype = SOCK_STREAM;
2399   port = NULL;
2400   if (*addr == '[') /* IPv6+port format */
2401   {
2402     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2403     addr++;
2405     port = strchr (addr, ']');
2406     if (port == NULL)
2407     {
2408       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2409       return (-1);
2410     }
2411     *port = 0;
2412     port++;
2414     if (*port == ':')
2415       port++;
2416     else if (*port == 0)
2417       port = NULL;
2418     else
2419     {
2420       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2421       return (-1);
2422     }
2423   } /* if (*addr == '[') */
2424   else
2425   {
2426     port = rindex(addr, ':');
2427     if (port != NULL)
2428     {
2429       *port = 0;
2430       port++;
2431     }
2432   }
2433   ai_res = NULL;
2434   status = getaddrinfo (addr,
2435                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2436                         &ai_hints, &ai_res);
2437   if (status != 0)
2438   {
2439     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2440              addr, gai_strerror (status));
2441     return (-1);
2442   }
2444   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2445   {
2446     int fd;
2447     listen_socket_t *temp;
2448     int one = 1;
2450     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2451         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2452     if (temp == NULL)
2453     {
2454       fprintf (stderr,
2455                "rrdcached: open_listen_socket_network: realloc failed.\n");
2456       continue;
2457     }
2458     listen_fds = temp;
2459     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2461     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2462     if (fd < 0)
2463     {
2464       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2465                rrd_strerror(errno));
2466       continue;
2467     }
2469     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2471     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2472     if (status != 0)
2473     {
2474       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2475                sock->addr, rrd_strerror(errno));
2476       close (fd);
2477       continue;
2478     }
2480     status = listen (fd, /* backlog = */ 10);
2481     if (status != 0)
2482     {
2483       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2484                sock->addr, rrd_strerror(errno));
2485       close (fd);
2486       freeaddrinfo(ai_res);
2487       return (-1);
2488     }
2490     listen_fds[listen_fds_num].fd = fd;
2491     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2492     listen_fds_num++;
2493   } /* for (ai_ptr) */
2495   freeaddrinfo(ai_res);
2496   return (0);
2497 } /* }}} static int open_listen_socket_network */
2499 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2501   assert(sock != NULL);
2502   assert(sock->addr != NULL);
2504   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2505       || sock->addr[0] == '/')
2506     return (open_listen_socket_unix(sock));
2507   else
2508     return (open_listen_socket_network(sock));
2509 } /* }}} int open_listen_socket */
2511 static int close_listen_sockets (void) /* {{{ */
2513   size_t i;
2515   for (i = 0; i < listen_fds_num; i++)
2516   {
2517     close (listen_fds[i].fd);
2519     if (listen_fds[i].family == PF_UNIX)
2520       unlink(listen_fds[i].addr);
2521   }
2523   free (listen_fds);
2524   listen_fds = NULL;
2525   listen_fds_num = 0;
2527   return (0);
2528 } /* }}} int close_listen_sockets */
2530 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2532   struct pollfd *pollfds;
2533   int pollfds_num;
2534   int status;
2535   int i;
2537   if (listen_fds_num < 1)
2538   {
2539     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2540     return (NULL);
2541   }
2543   pollfds_num = listen_fds_num;
2544   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2545   if (pollfds == NULL)
2546   {
2547     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2548     return (NULL);
2549   }
2550   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2552   RRDD_LOG(LOG_INFO, "listening for connections");
2554   while (state == RUNNING)
2555   {
2556     for (i = 0; i < pollfds_num; i++)
2557     {
2558       pollfds[i].fd = listen_fds[i].fd;
2559       pollfds[i].events = POLLIN | POLLPRI;
2560       pollfds[i].revents = 0;
2561     }
2563     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2564     if (state != RUNNING)
2565       break;
2566     else if (status == 0) /* timeout */
2567       continue;
2568     else if (status < 0) /* error */
2569     {
2570       status = errno;
2571       if (status != EINTR)
2572       {
2573         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2574       }
2575       continue;
2576     }
2578     for (i = 0; i < pollfds_num; i++)
2579     {
2580       listen_socket_t *client_sock;
2581       struct sockaddr_storage client_sa;
2582       socklen_t client_sa_size;
2583       pthread_t tid;
2584       pthread_attr_t attr;
2586       if (pollfds[i].revents == 0)
2587         continue;
2589       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2590       {
2591         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2592             "poll(2) returned something unexpected for listen FD #%i.",
2593             pollfds[i].fd);
2594         continue;
2595       }
2597       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2598       if (client_sock == NULL)
2599       {
2600         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2601         continue;
2602       }
2603       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2605       client_sa_size = sizeof (client_sa);
2606       client_sock->fd = accept (pollfds[i].fd,
2607           (struct sockaddr *) &client_sa, &client_sa_size);
2608       if (client_sock->fd < 0)
2609       {
2610         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2611         free(client_sock);
2612         continue;
2613       }
2615       pthread_attr_init (&attr);
2616       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2618       status = pthread_create (&tid, &attr, connection_thread_main,
2619                                client_sock);
2620       if (status != 0)
2621       {
2622         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2623         close_connection(client_sock);
2624         continue;
2625       }
2626     } /* for (pollfds_num) */
2627   } /* while (state == RUNNING) */
2629   RRDD_LOG(LOG_INFO, "starting shutdown");
2631   close_listen_sockets ();
2633   pthread_mutex_lock (&connection_threads_lock);
2634   while (connection_threads_num > 0)
2635     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2636   pthread_mutex_unlock (&connection_threads_lock);
2638   free(pollfds);
2640   return (NULL);
2641 } /* }}} void *listen_thread_main */
2643 static int daemonize (void) /* {{{ */
2645   int pid_fd;
2646   char *base_dir;
2648   daemon_uid = geteuid();
2650   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2651   if (pid_fd < 0)
2652     pid_fd = check_pidfile();
2653   if (pid_fd < 0)
2654     return pid_fd;
2656   /* open all the listen sockets */
2657   if (config_listen_address_list_len > 0)
2658   {
2659     for (size_t i = 0; i < config_listen_address_list_len; i++)
2660       open_listen_socket (config_listen_address_list[i]);
2662     rrd_free_ptrs((void ***) &config_listen_address_list,
2663                   &config_listen_address_list_len);
2664   }
2665   else
2666   {
2667     listen_socket_t sock;
2668     memset(&sock, 0, sizeof(sock));
2669     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2670     open_listen_socket (&sock);
2671   }
2673   if (listen_fds_num < 1)
2674   {
2675     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2676     goto error;
2677   }
2679   if (!stay_foreground)
2680   {
2681     pid_t child;
2683     child = fork ();
2684     if (child < 0)
2685     {
2686       fprintf (stderr, "daemonize: fork(2) failed.\n");
2687       goto error;
2688     }
2689     else if (child > 0)
2690       exit(0);
2692     /* Become session leader */
2693     setsid ();
2695     /* Open the first three file descriptors to /dev/null */
2696     close (2);
2697     close (1);
2698     close (0);
2700     open ("/dev/null", O_RDWR);
2701     if (dup(0) == -1 || dup(0) == -1){
2702         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2703     }
2704   } /* if (!stay_foreground) */
2706   /* Change into the /tmp directory. */
2707   base_dir = (config_base_dir != NULL)
2708     ? config_base_dir
2709     : "/tmp";
2711   if (chdir (base_dir) != 0)
2712   {
2713     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2714     goto error;
2715   }
2717   install_signal_handlers();
2719   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2720   RRDD_LOG(LOG_INFO, "starting up");
2722   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2723                                 (GDestroyNotify) free_cache_item);
2724   if (cache_tree == NULL)
2725   {
2726     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2727     goto error;
2728   }
2730   return write_pidfile (pid_fd);
2732 error:
2733   remove_pidfile();
2734   return -1;
2735 } /* }}} int daemonize */
2737 static int cleanup (void) /* {{{ */
2739   pthread_cond_broadcast (&flush_cond);
2740   pthread_join (flush_thread, NULL);
2742   pthread_cond_broadcast (&queue_cond);
2743   for (int i = 0; i < config_queue_threads; i++)
2744     pthread_join (queue_threads[i], NULL);
2746   if (config_flush_at_shutdown)
2747   {
2748     assert(cache_queue_head == NULL);
2749     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2750   }
2752   free(queue_threads);
2753   free(config_base_dir);
2755   pthread_mutex_lock(&cache_lock);
2756   g_tree_destroy(cache_tree);
2758   pthread_mutex_lock(&journal_lock);
2759   journal_done();
2761   RRDD_LOG(LOG_INFO, "goodbye");
2762   closelog ();
2764   remove_pidfile ();
2765   free(config_pid_file);
2767   return (0);
2768 } /* }}} int cleanup */
2770 static int read_options (int argc, char **argv) /* {{{ */
2772   int option;
2773   int status = 0;
2775   char **permissions = NULL;
2776   size_t permissions_len = 0;
2778   gid_t  socket_group = (gid_t)-1;
2779   mode_t socket_permissions = (mode_t)-1;
2781   while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2782   {
2783     switch (option)
2784     {
2785       case 'g':
2786         stay_foreground=1;
2787         break;
2789       case 'l':
2790       {
2791         listen_socket_t *new;
2793         new = malloc(sizeof(listen_socket_t));
2794         if (new == NULL)
2795         {
2796           fprintf(stderr, "read_options: malloc failed.\n");
2797           return(2);
2798         }
2799         memset(new, 0, sizeof(listen_socket_t));
2801         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2803         /* Add permissions to the socket {{{ */
2804         if (permissions_len != 0)
2805         {
2806           size_t i;
2807           for (i = 0; i < permissions_len; i++)
2808           {
2809             status = socket_permission_add (new, permissions[i]);
2810             if (status != 0)
2811             {
2812               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2813                   "socket failed. Most likely, this permission doesn't "
2814                   "exist. Check your command line.\n", permissions[i]);
2815               status = 4;
2816             }
2817           }
2818         }
2819         else /* if (permissions_len == 0) */
2820         {
2821           /* Add permission for ALL commands to the socket. */
2822           size_t i;
2823           for (i = 0; i < list_of_commands_len; i++)
2824           {
2825             status = socket_permission_add (new, list_of_commands[i].cmd);
2826             if (status != 0)
2827             {
2828               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2829                   "socket failed. This should never happen, ever! Sorry.\n",
2830                   permissions[i]);
2831               status = 4;
2832             }
2833           }
2834         }
2835         /* }}} Done adding permissions. */
2837         new->socket_group = socket_group;
2838         new->socket_permissions = socket_permissions;
2840         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2841                          &config_listen_address_list_len, new))
2842         {
2843           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2844           return (2);
2845         }
2846       }
2847       break;
2849       /* set socket group permissions */
2850       case 's':
2851       {
2852         gid_t group_gid;
2853         struct group *grp;
2855         group_gid = strtoul(optarg, NULL, 10);
2856         if (errno != EINVAL && group_gid>0)
2857         {
2858           /* we were passed a number */
2859           grp = getgrgid(group_gid);
2860         }
2861         else
2862         {
2863           grp = getgrnam(optarg);
2864         }
2866         if (grp)
2867         {
2868           socket_group = grp->gr_gid;
2869         }
2870         else
2871         {
2872           /* no idea what the user wanted... */
2873           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
2874           return (5);
2875         }
2876       }
2877       break;
2879       /* set socket file permissions */
2880       case 'm':
2881       {
2882         long  tmp;
2883         char *endptr = NULL;
2885         tmp = strtol (optarg, &endptr, 8);
2886         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
2887             || (tmp > 07777) || (tmp < 0)) {
2888           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
2889               optarg);
2890           return (5);
2891         }
2893         socket_permissions = (mode_t)tmp;
2894       }
2895       break;
2897       case 'P':
2898       {
2899         char *optcopy;
2900         char *saveptr;
2901         char *dummy;
2902         char *ptr;
2904         rrd_free_ptrs ((void *) &permissions, &permissions_len);
2906         optcopy = strdup (optarg);
2907         dummy = optcopy;
2908         saveptr = NULL;
2909         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2910         {
2911           dummy = NULL;
2912           rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2913         }
2915         free (optcopy);
2916       }
2917       break;
2919       case 'f':
2920       {
2921         int temp;
2923         temp = atoi (optarg);
2924         if (temp > 0)
2925           config_flush_interval = temp;
2926         else
2927         {
2928           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2929           status = 3;
2930         }
2931       }
2932       break;
2934       case 'w':
2935       {
2936         int temp;
2938         temp = atoi (optarg);
2939         if (temp > 0)
2940           config_write_interval = temp;
2941         else
2942         {
2943           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2944           status = 2;
2945         }
2946       }
2947       break;
2949       case 'z':
2950       {
2951         int temp;
2953         temp = atoi(optarg);
2954         if (temp > 0)
2955           config_write_jitter = temp;
2956         else
2957         {
2958           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2959           status = 2;
2960         }
2962         break;
2963       }
2965       case 't':
2966       {
2967         int threads;
2968         threads = atoi(optarg);
2969         if (threads >= 1)
2970           config_queue_threads = threads;
2971         else
2972         {
2973           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2974           return 1;
2975         }
2976       }
2977       break;
2979       case 'B':
2980         config_write_base_only = 1;
2981         break;
2983       case 'b':
2984       {
2985         size_t len;
2986         char base_realpath[PATH_MAX];
2988         if (config_base_dir != NULL)
2989           free (config_base_dir);
2990         config_base_dir = strdup (optarg);
2991         if (config_base_dir == NULL)
2992         {
2993           fprintf (stderr, "read_options: strdup failed.\n");
2994           return (3);
2995         }
2997         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
2998         {
2999           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3000               config_base_dir, rrd_strerror (errno));
3001           return (3);
3002         }
3004         /* make sure that the base directory is not resolved via
3005          * symbolic links.  this makes some performance-enhancing
3006          * assumptions possible (we don't have to resolve paths
3007          * that start with a "/")
3008          */
3009         if (realpath(config_base_dir, base_realpath) == NULL)
3010         {
3011           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3012               "%s\n", config_base_dir, rrd_strerror(errno));
3013           return 5;
3014         }
3016         len = strlen (config_base_dir);
3017         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3018         {
3019           config_base_dir[len - 1] = 0;
3020           len--;
3021         }
3023         if (len < 1)
3024         {
3025           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3026           return (4);
3027         }
3029         _config_base_dir_len = len;
3031         len = strlen (base_realpath);
3032         while ((len > 0) && (base_realpath[len - 1] == '/'))
3033         {
3034           base_realpath[len - 1] = '\0';
3035           len--;
3036         }
3038         if (strncmp(config_base_dir,
3039                          base_realpath, sizeof(base_realpath)) != 0)
3040         {
3041           fprintf(stderr,
3042                   "Base directory (-b) resolved via file system links!\n"
3043                   "Please consult rrdcached '-b' documentation!\n"
3044                   "Consider specifying the real directory (%s)\n",
3045                   base_realpath);
3046           return 5;
3047         }
3048       }
3049       break;
3051       case 'p':
3052       {
3053         if (config_pid_file != NULL)
3054           free (config_pid_file);
3055         config_pid_file = strdup (optarg);
3056         if (config_pid_file == NULL)
3057         {
3058           fprintf (stderr, "read_options: strdup failed.\n");
3059           return (3);
3060         }
3061       }
3062       break;
3064       case 'F':
3065         config_flush_at_shutdown = 1;
3066         break;
3068       case 'j':
3069       {
3070         char journal_dir_actual[PATH_MAX];
3071         const char *dir;
3072         dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3074         status = rrd_mkdir_p(dir, 0777);
3075         if (status != 0)
3076         {
3077           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3078               dir, rrd_strerror(errno));
3079           return 6;
3080         }
3082         if (access(dir, R_OK|W_OK|X_OK) != 0)
3083         {
3084           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3085                   errno ? rrd_strerror(errno) : "");
3086           return 6;
3087         }
3088       }
3089       break;
3091       case 'h':
3092       case '?':
3093         printf ("RRDCacheD %s\n"
3094             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3095             "\n"
3096             "Usage: rrdcached [options]\n"
3097             "\n"
3098             "Valid options are:\n"
3099             "  -l <address>  Socket address to listen to.\n"
3100             "  -P <perms>    Sets the permissions to assign to all following "
3101                             "sockets\n"
3102             "  -w <seconds>  Interval in which to write data.\n"
3103             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3104             "  -t <threads>  Number of write threads.\n"
3105             "  -f <seconds>  Interval in which to flush dead data.\n"
3106             "  -p <file>     Location of the PID-file.\n"
3107             "  -b <dir>      Base directory to change to.\n"
3108             "  -B            Restrict file access to paths within -b <dir>\n"
3109             "  -g            Do not fork and run in the foreground.\n"
3110             "  -j <dir>      Directory in which to create the journal files.\n"
3111             "  -F            Always flush all updates at shutdown\n"
3112             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3113             "                (the socket will also have read/write permissions "
3114                             "for that group)\n"
3115             "  -m <mode>     File permissions (octal) of all following UNIX "
3116                             "sockets\n"
3117             "\n"
3118             "For more information and a detailed description of all options "
3119             "please refer\n"
3120             "to the rrdcached(1) manual page.\n",
3121             VERSION);
3122         if (option == 'h')
3123           status = -1;
3124         else
3125           status = 1;
3126         break;
3127     } /* switch (option) */
3128   } /* while (getopt) */
3130   /* advise the user when values are not sane */
3131   if (config_flush_interval < 2 * config_write_interval)
3132     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3133             " 2x write interval (-w) !\n");
3134   if (config_write_jitter > config_write_interval)
3135     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3136             " write interval (-w) !\n");
3138   if (config_write_base_only && config_base_dir == NULL)
3139     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3140             "  Consult the rrdcached documentation\n");
3142   if (journal_dir == NULL)
3143     config_flush_at_shutdown = 1;
3145   rrd_free_ptrs ((void *) &permissions, &permissions_len);
3147   return (status);
3148 } /* }}} int read_options */
3150 int main (int argc, char **argv)
3152   int status;
3154   status = read_options (argc, argv);
3155   if (status != 0)
3156   {
3157     if (status < 0)
3158       status = 0;
3159     return (status);
3160   }
3162   status = daemonize ();
3163   if (status != 0)
3164   {
3165     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3166     return (1);
3167   }
3169   journal_init();
3171   /* start the queue threads */
3172   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3173   if (queue_threads == NULL)
3174   {
3175     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3176     cleanup();
3177     return (1);
3178   }
3179   for (int i = 0; i < config_queue_threads; i++)
3180   {
3181     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3182     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3183     if (status != 0)
3184     {
3185       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3186       cleanup();
3187       return (1);
3188     }
3189   }
3191   /* start the flush thread */
3192   memset(&flush_thread, 0, sizeof(flush_thread));
3193   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3194   if (status != 0)
3195   {
3196     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3197     cleanup();
3198     return (1);
3199   }
3201   listen_thread_main (NULL);
3202   cleanup ();
3204   return (0);
3205 } /* int main */
3207 /*
3208  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3209  */