Code

bee21a2d7e6e248cb49f36b9983694892caed058
[rrdtool-all.git] / program / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
101 /*
102  * Types
103  */
104 typedef enum
106   PRIV_LOW,
107   PRIV_HIGH
108 } socket_privilege;
110 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
112 struct listen_socket_s
114   int fd;
115   char addr[PATH_MAX + 1];
116   int family;
117   socket_privilege privilege;
119   /* state for BATCH processing */
120   time_t batch_start;
121   int batch_cmd;
123   /* buffered IO */
124   char *rbuf;
125   off_t next_cmd;
126   off_t next_read;
128   char *wbuf;
129   ssize_t wbuf_len;
130 };
131 typedef struct listen_socket_s listen_socket_t;
133 struct cache_item_s;
134 typedef struct cache_item_s cache_item_t;
135 struct cache_item_s
137   char *file;
138   char **values;
139   int values_num;
140   time_t last_flush_time;
141   time_t last_update_stamp;
142 #define CI_FLAGS_IN_TREE  (1<<0)
143 #define CI_FLAGS_IN_QUEUE (1<<1)
144   int flags;
145   pthread_cond_t  flushed;
146   cache_item_t *prev;
147   cache_item_t *next;
148 };
150 struct callback_flush_data_s
152   time_t now;
153   time_t abs_timeout;
154   char **keys;
155   size_t keys_num;
156 };
157 typedef struct callback_flush_data_s callback_flush_data_t;
159 enum queue_side_e
161   HEAD,
162   TAIL
163 };
164 typedef enum queue_side_e queue_side_t;
166 /* max length of socket command or response */
167 #define CMD_MAX 4096
168 #define RBUF_SIZE (CMD_MAX*2)
170 /*
171  * Variables
172  */
173 static int stay_foreground = 0;
174 static uid_t daemon_uid;
176 static listen_socket_t *listen_fds = NULL;
177 static size_t listen_fds_num = 0;
179 static int do_shutdown = 0;
181 static pthread_t queue_thread;
183 static pthread_t *connection_threads = NULL;
184 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
185 static int connection_threads_num = 0;
187 /* Cache stuff */
188 static GTree          *cache_tree = NULL;
189 static cache_item_t   *cache_queue_head = NULL;
190 static cache_item_t   *cache_queue_tail = NULL;
191 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
192 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
194 static int config_write_interval = 300;
195 static int config_write_jitter   = 0;
196 static int config_flush_interval = 3600;
197 static int config_flush_at_shutdown = 0;
198 static char *config_pid_file = NULL;
199 static char *config_base_dir = NULL;
200 static size_t _config_base_dir_len = 0;
201 static int config_write_base_only = 0;
203 static listen_socket_t **config_listen_address_list = NULL;
204 static int config_listen_address_list_len = 0;
206 static uint64_t stats_queue_length = 0;
207 static uint64_t stats_updates_received = 0;
208 static uint64_t stats_flush_received = 0;
209 static uint64_t stats_updates_written = 0;
210 static uint64_t stats_data_sets_written = 0;
211 static uint64_t stats_journal_bytes = 0;
212 static uint64_t stats_journal_rotate = 0;
213 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
215 /* Journaled updates */
216 static char *journal_cur = NULL;
217 static char *journal_old = NULL;
218 static FILE *journal_fh = NULL;
219 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
220 static int journal_write(char *cmd, char *args);
221 static void journal_done(void);
222 static void journal_rotate(void);
224 /* 
225  * Functions
226  */
227 static void sig_common (const char *sig) /* {{{ */
229   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
230   do_shutdown++;
231   pthread_cond_broadcast(&cache_cond);
232 } /* }}} void sig_common */
234 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
236   sig_common("INT");
237 } /* }}} void sig_int_handler */
239 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
241   sig_common("TERM");
242 } /* }}} void sig_term_handler */
244 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
246   config_flush_at_shutdown = 1;
247   sig_common("USR1");
248 } /* }}} void sig_usr1_handler */
250 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
252   config_flush_at_shutdown = 0;
253   sig_common("USR2");
254 } /* }}} void sig_usr2_handler */
256 static void install_signal_handlers(void) /* {{{ */
258   /* These structures are static, because `sigaction' behaves weird if the are
259    * overwritten.. */
260   static struct sigaction sa_int;
261   static struct sigaction sa_term;
262   static struct sigaction sa_pipe;
263   static struct sigaction sa_usr1;
264   static struct sigaction sa_usr2;
266   /* Install signal handlers */
267   memset (&sa_int, 0, sizeof (sa_int));
268   sa_int.sa_handler = sig_int_handler;
269   sigaction (SIGINT, &sa_int, NULL);
271   memset (&sa_term, 0, sizeof (sa_term));
272   sa_term.sa_handler = sig_term_handler;
273   sigaction (SIGTERM, &sa_term, NULL);
275   memset (&sa_pipe, 0, sizeof (sa_pipe));
276   sa_pipe.sa_handler = SIG_IGN;
277   sigaction (SIGPIPE, &sa_pipe, NULL);
279   memset (&sa_pipe, 0, sizeof (sa_usr1));
280   sa_usr1.sa_handler = sig_usr1_handler;
281   sigaction (SIGUSR1, &sa_usr1, NULL);
283   memset (&sa_usr2, 0, sizeof (sa_usr2));
284   sa_usr2.sa_handler = sig_usr2_handler;
285   sigaction (SIGUSR2, &sa_usr2, NULL);
287 } /* }}} void install_signal_handlers */
289 static int open_pidfile(char *action, int oflag) /* {{{ */
291   int fd;
292   char *file;
294   file = (config_pid_file != NULL)
295     ? config_pid_file
296     : LOCALSTATEDIR "/run/rrdcached.pid";
298   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
299   if (fd < 0)
300     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
301             action, file, rrd_strerror(errno));
303   return(fd);
304 } /* }}} static int open_pidfile */
306 /* check existing pid file to see whether a daemon is running */
307 static int check_pidfile(void)
309   int pid_fd;
310   pid_t pid;
311   char pid_str[16];
313   pid_fd = open_pidfile("open", O_RDWR);
314   if (pid_fd < 0)
315     return pid_fd;
317   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
318     return -1;
320   pid = atoi(pid_str);
321   if (pid <= 0)
322     return -1;
324   /* another running process that we can signal COULD be
325    * a competing rrdcached */
326   if (pid != getpid() && kill(pid, 0) == 0)
327   {
328     fprintf(stderr,
329             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
330     close(pid_fd);
331     return -1;
332   }
334   lseek(pid_fd, 0, SEEK_SET);
335   ftruncate(pid_fd, 0);
337   fprintf(stderr,
338           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
339           "rrdcached: starting normally.\n", pid);
341   return pid_fd;
342 } /* }}} static int check_pidfile */
344 static int write_pidfile (int fd) /* {{{ */
346   pid_t pid;
347   FILE *fh;
349   pid = getpid ();
351   fh = fdopen (fd, "w");
352   if (fh == NULL)
353   {
354     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
355     close(fd);
356     return (-1);
357   }
359   fprintf (fh, "%i\n", (int) pid);
360   fclose (fh);
362   return (0);
363 } /* }}} int write_pidfile */
365 static int remove_pidfile (void) /* {{{ */
367   char *file;
368   int status;
370   file = (config_pid_file != NULL)
371     ? config_pid_file
372     : LOCALSTATEDIR "/run/rrdcached.pid";
374   status = unlink (file);
375   if (status == 0)
376     return (0);
377   return (errno);
378 } /* }}} int remove_pidfile */
380 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
382   char *eol;
384   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
385                sock->next_read - sock->next_cmd);
387   if (eol == NULL)
388   {
389     /* no commands left, move remainder back to front of rbuf */
390     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
391             sock->next_read - sock->next_cmd);
392     sock->next_read -= sock->next_cmd;
393     sock->next_cmd = 0;
394     *len = 0;
395     return NULL;
396   }
397   else
398   {
399     char *cmd = sock->rbuf + sock->next_cmd;
400     *eol = '\0';
402     sock->next_cmd = eol - sock->rbuf + 1;
404     if (eol > sock->rbuf && *(eol-1) == '\r')
405       *(--eol) = '\0'; /* handle "\r\n" EOL */
407     *len = eol - cmd;
409     return cmd;
410   }
412   /* NOTREACHED */
413   assert(1==0);
416 /* add the characters directly to the write buffer */
417 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
419   char *new_buf;
421   assert(sock != NULL);
423   new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
424   if (new_buf == NULL)
425   {
426     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
427     return -1;
428   }
430   strncpy(new_buf + sock->wbuf_len, str, len + 1);
432   sock->wbuf = new_buf;
433   sock->wbuf_len += len;
435   return 0;
436 } /* }}} static int add_to_wbuf */
438 /* add the text to the "extra" info that's sent after the status line */
439 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
441   va_list argp;
442   char buffer[CMD_MAX];
443   int len;
445   if (sock == NULL) return 0; /* journal replay mode */
446   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
448   va_start(argp, fmt);
449 #ifdef HAVE_VSNPRINTF
450   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
451 #else
452   len = vsprintf(buffer, fmt, argp);
453 #endif
454   va_end(argp);
455   if (len < 0)
456   {
457     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
458     return -1;
459   }
461   return add_to_wbuf(sock, buffer, len);
462 } /* }}} static int add_response_info */
464 static int count_lines(char *str) /* {{{ */
466   int lines = 0;
468   if (str != NULL)
469   {
470     while ((str = strchr(str, '\n')) != NULL)
471     {
472       ++lines;
473       ++str;
474     }
475   }
477   return lines;
478 } /* }}} static int count_lines */
480 /* send the response back to the user.
481  * returns 0 on success, -1 on error
482  * write buffer is always zeroed after this call */
483 static int send_response (listen_socket_t *sock, response_code rc,
484                           char *fmt, ...) /* {{{ */
486   va_list argp;
487   char buffer[CMD_MAX];
488   int lines;
489   ssize_t wrote;
490   int rclen, len;
492   if (sock == NULL) return rc;  /* journal replay mode */
494   if (sock->batch_start)
495   {
496     if (rc == RESP_OK)
497       return rc; /* no response on success during BATCH */
498     lines = sock->batch_cmd;
499   }
500   else if (rc == RESP_OK)
501     lines = count_lines(sock->wbuf);
502   else
503     lines = -1;
505   rclen = sprintf(buffer, "%d ", lines);
506   va_start(argp, fmt);
507 #ifdef HAVE_VSNPRINTF
508   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
509 #else
510   len = vsprintf(buffer+rclen, fmt, argp);
511 #endif
512   va_end(argp);
513   if (len < 0)
514     return -1;
516   len += rclen;
518   /* append the result to the wbuf, don't write to the user */
519   if (sock->batch_start)
520     return add_to_wbuf(sock, buffer, len);
522   /* first write must be complete */
523   if (len != write(sock->fd, buffer, len))
524   {
525     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
526     return -1;
527   }
529   if (sock->wbuf != NULL && rc == RESP_OK)
530   {
531     wrote = 0;
532     while (wrote < sock->wbuf_len)
533     {
534       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
535       if (wb <= 0)
536       {
537         RRDD_LOG(LOG_INFO, "send_response: could not write results");
538         return -1;
539       }
540       wrote += wb;
541     }
542   }
544   free(sock->wbuf); sock->wbuf = NULL;
545   sock->wbuf_len = 0;
547   return 0;
548 } /* }}} */
550 static void wipe_ci_values(cache_item_t *ci, time_t when)
552   ci->values = NULL;
553   ci->values_num = 0;
555   ci->last_flush_time = when;
556   if (config_write_jitter > 0)
557     ci->last_flush_time += (random() % config_write_jitter);
560 /* remove_from_queue
561  * remove a "cache_item_t" item from the queue.
562  * must hold 'cache_lock' when calling this
563  */
564 static void remove_from_queue(cache_item_t *ci) /* {{{ */
566   if (ci == NULL) return;
567   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
569   if (ci->prev == NULL)
570     cache_queue_head = ci->next; /* reset head */
571   else
572     ci->prev->next = ci->next;
574   if (ci->next == NULL)
575     cache_queue_tail = ci->prev; /* reset the tail */
576   else
577     ci->next->prev = ci->prev;
579   ci->next = ci->prev = NULL;
580   ci->flags &= ~CI_FLAGS_IN_QUEUE;
581 } /* }}} static void remove_from_queue */
583 /* remove an entry from the tree and free all its resources.
584  * must hold 'cache lock' while calling this.
585  * returns 0 on success, otherwise errno */
586 static int forget_file(const char *file)
588   cache_item_t *ci;
590   ci = g_tree_lookup(cache_tree, file);
591   if (ci == NULL)
592     return ENOENT;
594   g_tree_remove (cache_tree, file);
595   remove_from_queue(ci);
597   for (int i=0; i < ci->values_num; i++)
598     free(ci->values[i]);
600   free (ci->values);
601   free (ci->file);
603   /* in case anyone is waiting */
604   pthread_cond_broadcast(&ci->flushed);
606   free (ci);
608   return 0;
609 } /* }}} static int forget_file */
611 /*
612  * enqueue_cache_item:
613  * `cache_lock' must be acquired before calling this function!
614  */
615 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
616     queue_side_t side)
618   if (ci == NULL)
619     return (-1);
621   if (ci->values_num == 0)
622     return (0);
624   if (side == HEAD)
625   {
626     if (cache_queue_head == ci)
627       return 0;
629     /* remove if further down in queue */
630     remove_from_queue(ci);
632     ci->prev = NULL;
633     ci->next = cache_queue_head;
634     if (ci->next != NULL)
635       ci->next->prev = ci;
636     cache_queue_head = ci;
638     if (cache_queue_tail == NULL)
639       cache_queue_tail = cache_queue_head;
640   }
641   else /* (side == TAIL) */
642   {
643     /* We don't move values back in the list.. */
644     if (ci->flags & CI_FLAGS_IN_QUEUE)
645       return (0);
647     assert (ci->next == NULL);
648     assert (ci->prev == NULL);
650     ci->prev = cache_queue_tail;
652     if (cache_queue_tail == NULL)
653       cache_queue_head = ci;
654     else
655       cache_queue_tail->next = ci;
657     cache_queue_tail = ci;
658   }
660   ci->flags |= CI_FLAGS_IN_QUEUE;
662   pthread_cond_broadcast(&cache_cond);
663   pthread_mutex_lock (&stats_lock);
664   stats_queue_length++;
665   pthread_mutex_unlock (&stats_lock);
667   return (0);
668 } /* }}} int enqueue_cache_item */
670 /*
671  * tree_callback_flush:
672  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
673  * while this is in progress.
674  */
675 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
676     gpointer data)
678   cache_item_t *ci;
679   callback_flush_data_t *cfd;
681   ci = (cache_item_t *) value;
682   cfd = (callback_flush_data_t *) data;
684   if (ci->flags & CI_FLAGS_IN_QUEUE)
685     return FALSE;
687   if ((ci->last_flush_time <= cfd->abs_timeout)
688       && (ci->values_num > 0))
689   {
690     enqueue_cache_item (ci, TAIL);
691   }
692   else if ((do_shutdown != 0)
693       && (ci->values_num > 0))
694   {
695     enqueue_cache_item (ci, TAIL);
696   }
697   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
698       && (ci->values_num <= 0))
699   {
700     char **temp;
702     temp = (char **) realloc (cfd->keys,
703         sizeof (char *) * (cfd->keys_num + 1));
704     if (temp == NULL)
705     {
706       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
707       return (FALSE);
708     }
709     cfd->keys = temp;
710     /* Make really sure this points to the _same_ place */
711     assert ((char *) key == ci->file);
712     cfd->keys[cfd->keys_num] = (char *) key;
713     cfd->keys_num++;
714   }
716   return (FALSE);
717 } /* }}} gboolean tree_callback_flush */
719 static int flush_old_values (int max_age)
721   callback_flush_data_t cfd;
722   size_t k;
724   memset (&cfd, 0, sizeof (cfd));
725   /* Pass the current time as user data so that we don't need to call
726    * `time' for each node. */
727   cfd.now = time (NULL);
728   cfd.keys = NULL;
729   cfd.keys_num = 0;
731   if (max_age > 0)
732     cfd.abs_timeout = cfd.now - max_age;
733   else
734     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
736   /* `tree_callback_flush' will return the keys of all values that haven't
737    * been touched in the last `config_flush_interval' seconds in `cfd'.
738    * The char*'s in this array point to the same memory as ci->file, so we
739    * don't need to free them separately. */
740   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
742   for (k = 0; k < cfd.keys_num; k++)
743   {
744     /* should never fail, since we have held the cache_lock
745      * the entire time */
746     assert( forget_file(cfd.keys[k]) == 0 );
747   }
749   if (cfd.keys != NULL)
750   {
751     free (cfd.keys);
752     cfd.keys = NULL;
753   }
755   return (0);
756 } /* int flush_old_values */
758 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
760   struct timeval now;
761   struct timespec next_flush;
762   int final_flush = 0; /* make sure we only flush once on shutdown */
764   gettimeofday (&now, NULL);
765   next_flush.tv_sec = now.tv_sec + config_flush_interval;
766   next_flush.tv_nsec = 1000 * now.tv_usec;
768   pthread_mutex_lock (&cache_lock);
769   while ((do_shutdown == 0) || (cache_queue_head != NULL))
770   {
771     cache_item_t *ci;
772     char *file;
773     char **values;
774     int values_num;
775     int status;
776     int i;
778     /* First, check if it's time to do the cache flush. */
779     gettimeofday (&now, NULL);
780     if ((now.tv_sec > next_flush.tv_sec)
781         || ((now.tv_sec == next_flush.tv_sec)
782           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
783     {
784       /* Flush all values that haven't been written in the last
785        * `config_write_interval' seconds. */
786       flush_old_values (config_write_interval);
788       /* Determine the time of the next cache flush. */
789       next_flush.tv_sec =
790         now.tv_sec + next_flush.tv_sec % config_flush_interval;
792       /* unlock the cache while we rotate so we don't block incoming
793        * updates if the fsync() blocks on disk I/O */
794       pthread_mutex_unlock(&cache_lock);
795       journal_rotate();
796       pthread_mutex_lock(&cache_lock);
797     }
799     /* Now, check if there's something to store away. If not, wait until
800      * something comes in or it's time to do the cache flush.  if we are
801      * shutting down, do not wait around.  */
802     if (cache_queue_head == NULL && !do_shutdown)
803     {
804       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
805       if ((status != 0) && (status != ETIMEDOUT))
806       {
807         RRDD_LOG (LOG_ERR, "queue_thread_main: "
808             "pthread_cond_timedwait returned %i.", status);
809       }
810     }
812     /* We're about to shut down */
813     if (do_shutdown != 0 && !final_flush++)
814     {
815       if (config_flush_at_shutdown)
816         flush_old_values (-1); /* flush everything */
817       else
818         break;
819     }
821     /* Check if a value has arrived. This may be NULL if we timed out or there
822      * was an interrupt such as a signal. */
823     if (cache_queue_head == NULL)
824       continue;
826     ci = cache_queue_head;
828     /* copy the relevant parts */
829     file = strdup (ci->file);
830     if (file == NULL)
831     {
832       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
833       continue;
834     }
836     assert(ci->values != NULL);
837     assert(ci->values_num > 0);
839     values = ci->values;
840     values_num = ci->values_num;
842     wipe_ci_values(ci, time(NULL));
843     remove_from_queue(ci);
845     pthread_mutex_lock (&stats_lock);
846     assert (stats_queue_length > 0);
847     stats_queue_length--;
848     pthread_mutex_unlock (&stats_lock);
850     pthread_mutex_unlock (&cache_lock);
852     rrd_clear_error ();
853     status = rrd_update_r (file, NULL, values_num, (void *) values);
854     if (status != 0)
855     {
856       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
857           "rrd_update_r (%s) failed with status %i. (%s)",
858           file, status, rrd_get_error());
859     }
861     journal_write("wrote", file);
862     pthread_cond_broadcast(&ci->flushed);
864     for (i = 0; i < values_num; i++)
865       free (values[i]);
867     free(values);
868     free(file);
870     if (status == 0)
871     {
872       pthread_mutex_lock (&stats_lock);
873       stats_updates_written++;
874       stats_data_sets_written += values_num;
875       pthread_mutex_unlock (&stats_lock);
876     }
878     pthread_mutex_lock (&cache_lock);
880     /* We're about to shut down */
881     if (do_shutdown != 0 && !final_flush++)
882     {
883       if (config_flush_at_shutdown)
884           flush_old_values (-1); /* flush everything */
885       else
886         break;
887     }
888   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
889   pthread_mutex_unlock (&cache_lock);
891   if (config_flush_at_shutdown)
892   {
893     assert(cache_queue_head == NULL);
894     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
895   }
897   journal_done();
899   return (NULL);
900 } /* }}} void *queue_thread_main */
902 static int buffer_get_field (char **buffer_ret, /* {{{ */
903     size_t *buffer_size_ret, char **field_ret)
905   char *buffer;
906   size_t buffer_pos;
907   size_t buffer_size;
908   char *field;
909   size_t field_size;
910   int status;
912   buffer = *buffer_ret;
913   buffer_pos = 0;
914   buffer_size = *buffer_size_ret;
915   field = *buffer_ret;
916   field_size = 0;
918   if (buffer_size <= 0)
919     return (-1);
921   /* This is ensured by `handle_request'. */
922   assert (buffer[buffer_size - 1] == '\0');
924   status = -1;
925   while (buffer_pos < buffer_size)
926   {
927     /* Check for end-of-field or end-of-buffer */
928     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
929     {
930       field[field_size] = 0;
931       field_size++;
932       buffer_pos++;
933       status = 0;
934       break;
935     }
936     /* Handle escaped characters. */
937     else if (buffer[buffer_pos] == '\\')
938     {
939       if (buffer_pos >= (buffer_size - 1))
940         break;
941       buffer_pos++;
942       field[field_size] = buffer[buffer_pos];
943       field_size++;
944       buffer_pos++;
945     }
946     /* Normal operation */ 
947     else
948     {
949       field[field_size] = buffer[buffer_pos];
950       field_size++;
951       buffer_pos++;
952     }
953   } /* while (buffer_pos < buffer_size) */
955   if (status != 0)
956     return (status);
958   *buffer_ret = buffer + buffer_pos;
959   *buffer_size_ret = buffer_size - buffer_pos;
960   *field_ret = field;
962   return (0);
963 } /* }}} int buffer_get_field */
965 /* if we're restricting writes to the base directory,
966  * check whether the file falls within the dir
967  * returns 1 if OK, otherwise 0
968  */
969 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
971   assert(file != NULL);
973   if (!config_write_base_only
974       || sock == NULL /* journal replay */
975       || config_base_dir == NULL)
976     return 1;
978   if (strstr(file, "../") != NULL) goto err;
980   /* relative paths without "../" are ok */
981   if (*file != '/') return 1;
983   /* file must be of the format base + "/" + <1+ char filename> */
984   if (strlen(file) < _config_base_dir_len + 2) goto err;
985   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
986   if (*(file + _config_base_dir_len) != '/') goto err;
988   return 1;
990 err:
991   if (sock != NULL && sock->fd >= 0)
992     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
994   return 0;
995 } /* }}} static int check_file_access */
997 /* when using a base dir, convert relative paths to absolute paths.
998  * if necessary, modifies the "filename" pointer to point
999  * to the new path created in "tmp".  "tmp" is provided
1000  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1001  *
1002  * this allows us to optimize for the expected case (absolute path)
1003  * with a no-op.
1004  */
1005 static void get_abs_path(char **filename, char *tmp)
1007   assert(tmp != NULL);
1008   assert(filename != NULL && *filename != NULL);
1010   if (config_base_dir == NULL || **filename == '/')
1011     return;
1013   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1014   *filename = tmp;
1015 } /* }}} static int get_abs_path */
1017 /* returns 1 if we have the required privilege level,
1018  * otherwise issue an error to the user on sock */
1019 static int has_privilege (listen_socket_t *sock, /* {{{ */
1020                           socket_privilege priv)
1022   if (sock == NULL) /* journal replay */
1023     return 1;
1025   if (sock->privilege >= priv)
1026     return 1;
1028   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1029 } /* }}} static int has_privilege */
1031 static int flush_file (const char *filename) /* {{{ */
1033   cache_item_t *ci;
1035   pthread_mutex_lock (&cache_lock);
1037   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1038   if (ci == NULL)
1039   {
1040     pthread_mutex_unlock (&cache_lock);
1041     return (ENOENT);
1042   }
1044   if (ci->values_num > 0)
1045   {
1046     /* Enqueue at head */
1047     enqueue_cache_item (ci, HEAD);
1048     pthread_cond_wait(&ci->flushed, &cache_lock);
1049   }
1051   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1052    * may have been purged during our cond_wait() */
1054   pthread_mutex_unlock(&cache_lock);
1056   return (0);
1057 } /* }}} int flush_file */
1059 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1060     char *buffer, size_t buffer_size)
1062   int status;
1063   char **help_text;
1064   char *command;
1066   char *help_help[2] =
1067   {
1068     "Command overview\n"
1069     ,
1070     "HELP [<command>]\n"
1071     "FLUSH <filename>\n"
1072     "FLUSHALL\n"
1073     "PENDING <filename>\n"
1074     "FORGET <filename>\n"
1075     "UPDATE <filename> <values> [<values> ...]\n"
1076     "BATCH\n"
1077     "STATS\n"
1078   };
1080   char *help_flush[2] =
1081   {
1082     "Help for FLUSH\n"
1083     ,
1084     "Usage: FLUSH <filename>\n"
1085     "\n"
1086     "Adds the given filename to the head of the update queue and returns\n"
1087     "after is has been dequeued.\n"
1088   };
1090   char *help_flushall[2] =
1091   {
1092     "Help for FLUSHALL\n"
1093     ,
1094     "Usage: FLUSHALL\n"
1095     "\n"
1096     "Triggers writing of all pending updates.  Returns immediately.\n"
1097   };
1099   char *help_pending[2] =
1100   {
1101     "Help for PENDING\n"
1102     ,
1103     "Usage: PENDING <filename>\n"
1104     "\n"
1105     "Shows any 'pending' updates for a file, in order.\n"
1106     "The updates shown have not yet been written to the underlying RRD file.\n"
1107   };
1109   char *help_forget[2] =
1110   {
1111     "Help for FORGET\n"
1112     ,
1113     "Usage: FORGET <filename>\n"
1114     "\n"
1115     "Removes the file completely from the cache.\n"
1116     "Any pending updates for the file will be lost.\n"
1117   };
1119   char *help_update[2] =
1120   {
1121     "Help for UPDATE\n"
1122     ,
1123     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1124     "\n"
1125     "Adds the given file to the internal cache if it is not yet known and\n"
1126     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1127     "for details.\n"
1128     "\n"
1129     "Each <values> has the following form:\n"
1130     "  <values> = <time>:<value>[:<value>[...]]\n"
1131     "See the rrdupdate(1) manpage for details.\n"
1132   };
1134   char *help_stats[2] =
1135   {
1136     "Help for STATS\n"
1137     ,
1138     "Usage: STATS\n"
1139     "\n"
1140     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1141     "a description of the values.\n"
1142   };
1144   char *help_batch[2] =
1145   {
1146     "Help for BATCH\n"
1147     ,
1148     "The 'BATCH' command permits the client to initiate a bulk load\n"
1149     "   of commands to rrdcached.\n"
1150     "\n"
1151     "Usage:\n"
1152     "\n"
1153     "    client: BATCH\n"
1154     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1155     "    client: command #1\n"
1156     "    client: command #2\n"
1157     "    client: ... and so on\n"
1158     "    client: .\n"
1159     "    server: 2 errors\n"
1160     "    server: 7 message for command #7\n"
1161     "    server: 9 message for command #9\n"
1162     "\n"
1163     "For more information, consult the rrdcached(1) documentation.\n"
1164   };
1166   status = buffer_get_field (&buffer, &buffer_size, &command);
1167   if (status != 0)
1168     help_text = help_help;
1169   else
1170   {
1171     if (strcasecmp (command, "update") == 0)
1172       help_text = help_update;
1173     else if (strcasecmp (command, "flush") == 0)
1174       help_text = help_flush;
1175     else if (strcasecmp (command, "flushall") == 0)
1176       help_text = help_flushall;
1177     else if (strcasecmp (command, "pending") == 0)
1178       help_text = help_pending;
1179     else if (strcasecmp (command, "forget") == 0)
1180       help_text = help_forget;
1181     else if (strcasecmp (command, "stats") == 0)
1182       help_text = help_stats;
1183     else if (strcasecmp (command, "batch") == 0)
1184       help_text = help_batch;
1185     else
1186       help_text = help_help;
1187   }
1189   add_response_info(sock, help_text[1]);
1190   return send_response(sock, RESP_OK, help_text[0]);
1191 } /* }}} int handle_request_help */
1193 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1195   uint64_t copy_queue_length;
1196   uint64_t copy_updates_received;
1197   uint64_t copy_flush_received;
1198   uint64_t copy_updates_written;
1199   uint64_t copy_data_sets_written;
1200   uint64_t copy_journal_bytes;
1201   uint64_t copy_journal_rotate;
1203   uint64_t tree_nodes_number;
1204   uint64_t tree_depth;
1206   pthread_mutex_lock (&stats_lock);
1207   copy_queue_length       = stats_queue_length;
1208   copy_updates_received   = stats_updates_received;
1209   copy_flush_received     = stats_flush_received;
1210   copy_updates_written    = stats_updates_written;
1211   copy_data_sets_written  = stats_data_sets_written;
1212   copy_journal_bytes      = stats_journal_bytes;
1213   copy_journal_rotate     = stats_journal_rotate;
1214   pthread_mutex_unlock (&stats_lock);
1216   pthread_mutex_lock (&cache_lock);
1217   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1218   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1219   pthread_mutex_unlock (&cache_lock);
1221   add_response_info(sock,
1222                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1223   add_response_info(sock,
1224                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1225   add_response_info(sock,
1226                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1227   add_response_info(sock,
1228                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1229   add_response_info(sock,
1230                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1231   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1232   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1233   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1234   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1236   send_response(sock, RESP_OK, "Statistics follow\n");
1238   return (0);
1239 } /* }}} int handle_request_stats */
1241 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1242     char *buffer, size_t buffer_size)
1244   char *file, file_tmp[PATH_MAX];
1245   int status;
1247   status = buffer_get_field (&buffer, &buffer_size, &file);
1248   if (status != 0)
1249   {
1250     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1251   }
1252   else
1253   {
1254     pthread_mutex_lock(&stats_lock);
1255     stats_flush_received++;
1256     pthread_mutex_unlock(&stats_lock);
1258     get_abs_path(&file, file_tmp);
1259     if (!check_file_access(file, sock)) return 0;
1261     status = flush_file (file);
1262     if (status == 0)
1263       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1264     else if (status == ENOENT)
1265     {
1266       /* no file in our tree; see whether it exists at all */
1267       struct stat statbuf;
1269       memset(&statbuf, 0, sizeof(statbuf));
1270       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1271         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1272       else
1273         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1274     }
1275     else if (status < 0)
1276       return send_response(sock, RESP_ERR, "Internal error.\n");
1277     else
1278       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1279   }
1281   /* NOTREACHED */
1282   assert(1==0);
1283 } /* }}} int handle_request_flush */
1285 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1287   int status;
1289   status = has_privilege(sock, PRIV_HIGH);
1290   if (status <= 0)
1291     return status;
1293   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1295   pthread_mutex_lock(&cache_lock);
1296   flush_old_values(-1);
1297   pthread_mutex_unlock(&cache_lock);
1299   return send_response(sock, RESP_OK, "Started flush.\n");
1300 } /* }}} static int handle_request_flushall */
1302 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1303                                   char *buffer, size_t buffer_size)
1305   int status;
1306   char *file, file_tmp[PATH_MAX];
1307   cache_item_t *ci;
1309   status = buffer_get_field(&buffer, &buffer_size, &file);
1310   if (status != 0)
1311     return send_response(sock, RESP_ERR,
1312                          "Usage: PENDING <filename>\n");
1314   status = has_privilege(sock, PRIV_HIGH);
1315   if (status <= 0)
1316     return status;
1318   get_abs_path(&file, file_tmp);
1320   pthread_mutex_lock(&cache_lock);
1321   ci = g_tree_lookup(cache_tree, file);
1322   if (ci == NULL)
1323   {
1324     pthread_mutex_unlock(&cache_lock);
1325     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1326   }
1328   for (int i=0; i < ci->values_num; i++)
1329     add_response_info(sock, "%s\n", ci->values[i]);
1331   pthread_mutex_unlock(&cache_lock);
1332   return send_response(sock, RESP_OK, "updates pending\n");
1333 } /* }}} static int handle_request_pending */
1335 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1336                                  char *buffer, size_t buffer_size)
1338   int status;
1339   char *file, file_tmp[PATH_MAX];
1341   status = buffer_get_field(&buffer, &buffer_size, &file);
1342   if (status != 0)
1343     return send_response(sock, RESP_ERR,
1344                          "Usage: FORGET <filename>\n");
1346   status = has_privilege(sock, PRIV_HIGH);
1347   if (status <= 0)
1348     return status;
1350   get_abs_path(&file, file_tmp);
1351   if (!check_file_access(file, sock)) return 0;
1353   pthread_mutex_lock(&cache_lock);
1354   status = forget_file(file);
1355   pthread_mutex_unlock(&cache_lock);
1357   if (status == 0)
1358   {
1359     if (sock != NULL)
1360       journal_write("forget", file);
1362     return send_response(sock, RESP_OK, "Gone!\n");
1363   }
1364   else
1365     return send_response(sock, RESP_ERR, "cannot forget: %s\n",
1366                          status < 0 ? "Internal error" : rrd_strerror(status));
1368   /* NOTREACHED */
1369   assert(1==0);
1370 } /* }}} static int handle_request_forget */
1372 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1373                                   time_t now,
1374                                   char *buffer, size_t buffer_size)
1376   char *file, file_tmp[PATH_MAX];
1377   int values_num = 0;
1378   int bad_timestamps = 0;
1379   int status;
1380   char orig_buf[CMD_MAX];
1382   cache_item_t *ci;
1384   status = has_privilege(sock, PRIV_HIGH);
1385   if (status <= 0)
1386     return status;
1388   /* save it for the journal later */
1389   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1391   status = buffer_get_field (&buffer, &buffer_size, &file);
1392   if (status != 0)
1393     return send_response(sock, RESP_ERR,
1394                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1396   pthread_mutex_lock(&stats_lock);
1397   stats_updates_received++;
1398   pthread_mutex_unlock(&stats_lock);
1400   get_abs_path(&file, file_tmp);
1401   if (!check_file_access(file, sock)) return 0;
1403   pthread_mutex_lock (&cache_lock);
1404   ci = g_tree_lookup (cache_tree, file);
1406   if (ci == NULL) /* {{{ */
1407   {
1408     struct stat statbuf;
1410     /* don't hold the lock while we setup; stat(2) might block */
1411     pthread_mutex_unlock(&cache_lock);
1413     memset (&statbuf, 0, sizeof (statbuf));
1414     status = stat (file, &statbuf);
1415     if (status != 0)
1416     {
1417       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1419       status = errno;
1420       if (status == ENOENT)
1421         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1422       else
1423         return send_response(sock, RESP_ERR,
1424                              "stat failed with error %i.\n", status);
1425     }
1426     if (!S_ISREG (statbuf.st_mode))
1427       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1429     if (access(file, R_OK|W_OK) != 0)
1430       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1431                            file, rrd_strerror(errno));
1433     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1434     if (ci == NULL)
1435     {
1436       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1438       return send_response(sock, RESP_ERR, "malloc failed.\n");
1439     }
1440     memset (ci, 0, sizeof (cache_item_t));
1442     ci->file = strdup (file);
1443     if (ci->file == NULL)
1444     {
1445       free (ci);
1446       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1448       return send_response(sock, RESP_ERR, "strdup failed.\n");
1449     }
1451     wipe_ci_values(ci, now);
1452     ci->flags = CI_FLAGS_IN_TREE;
1453     ci->flushed = PTHREAD_COND_INITIALIZER;
1455     pthread_mutex_lock(&cache_lock);
1456     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1457   } /* }}} */
1458   assert (ci != NULL);
1460   /* don't re-write updates in replay mode */
1461   if (sock != NULL)
1462     journal_write("update", orig_buf);
1464   while (buffer_size > 0)
1465   {
1466     char **temp;
1467     char *value;
1468     time_t stamp;
1469     char *eostamp;
1471     status = buffer_get_field (&buffer, &buffer_size, &value);
1472     if (status != 0)
1473     {
1474       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1475       break;
1476     }
1478     /* make sure update time is always moving forward */
1479     stamp = strtol(value, &eostamp, 10);
1480     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1481     {
1482       ++bad_timestamps;
1483       add_response_info(sock, "Cannot find timestamp in '%s'!\n", value);
1484       continue;
1485     }
1486     else if (stamp <= ci->last_update_stamp)
1487     {
1488       ++bad_timestamps;
1489       add_response_info(sock,
1490                         "illegal attempt to update using time %ld when"
1491                         " last update time is %ld (minimum one second step)\n",
1492                         stamp, ci->last_update_stamp);
1493       continue;
1494     }
1495     else
1496       ci->last_update_stamp = stamp;
1498     temp = (char **) realloc (ci->values,
1499         sizeof (char *) * (ci->values_num + 1));
1500     if (temp == NULL)
1501     {
1502       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1503       continue;
1504     }
1505     ci->values = temp;
1507     ci->values[ci->values_num] = strdup (value);
1508     if (ci->values[ci->values_num] == NULL)
1509     {
1510       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1511       continue;
1512     }
1513     ci->values_num++;
1515     values_num++;
1516   }
1518   if (((now - ci->last_flush_time) >= config_write_interval)
1519       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1520       && (ci->values_num > 0))
1521   {
1522     enqueue_cache_item (ci, TAIL);
1523   }
1525   pthread_mutex_unlock (&cache_lock);
1527   if (values_num < 1)
1528   {
1529     /* journal replay mode */
1530     if (sock == NULL) return RESP_ERR;
1532     /* if we had only one update attempt, then return the full
1533        error message... try to get the most information out
1534        of the limited error space allowed by the protocol
1535     */
1536     if (bad_timestamps == 1)
1537       return send_response(sock, RESP_ERR, "%s", sock->wbuf);
1538     else
1539       return send_response(sock, RESP_ERR,
1540                            "No values updated (%d bad timestamps).\n",
1541                            bad_timestamps);
1542   }
1543   else
1544     return send_response(sock, RESP_OK,
1545                          "errors, enqueued %i value(s).\n", values_num);
1547   /* NOTREACHED */
1548   assert(1==0);
1550 } /* }}} int handle_request_update */
1552 /* we came across a "WROTE" entry during journal replay.
1553  * throw away any values that we have accumulated for this file
1554  */
1555 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1557   int i;
1558   cache_item_t *ci;
1559   const char *file = buffer;
1561   pthread_mutex_lock(&cache_lock);
1563   ci = g_tree_lookup(cache_tree, file);
1564   if (ci == NULL)
1565   {
1566     pthread_mutex_unlock(&cache_lock);
1567     return (0);
1568   }
1570   if (ci->values)
1571   {
1572     for (i=0; i < ci->values_num; i++)
1573       free(ci->values[i]);
1575     free(ci->values);
1576   }
1578   wipe_ci_values(ci, now);
1579   remove_from_queue(ci);
1581   pthread_mutex_unlock(&cache_lock);
1582   return (0);
1583 } /* }}} int handle_request_wrote */
1585 /* start "BATCH" processing */
1586 static int batch_start (listen_socket_t *sock) /* {{{ */
1588   int status;
1589   if (sock->batch_start)
1590     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1592   status = send_response(sock, RESP_OK,
1593                          "Go ahead.  End with dot '.' on its own line.\n");
1594   sock->batch_start = time(NULL);
1595   sock->batch_cmd = 0;
1597   return status;
1598 } /* }}} static int batch_start */
1600 /* finish "BATCH" processing and return results to the client */
1601 static int batch_done (listen_socket_t *sock) /* {{{ */
1603   assert(sock->batch_start);
1604   sock->batch_start = 0;
1605   sock->batch_cmd  = 0;
1606   return send_response(sock, RESP_OK, "errors\n");
1607 } /* }}} static int batch_done */
1609 /* if sock==NULL, we are in journal replay mode */
1610 static int handle_request (listen_socket_t *sock, /* {{{ */
1611                            time_t now,
1612                            char *buffer, size_t buffer_size)
1614   char *buffer_ptr;
1615   char *command;
1616   int status;
1618   assert (buffer[buffer_size - 1] == '\0');
1620   buffer_ptr = buffer;
1621   command = NULL;
1622   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1623   if (status != 0)
1624   {
1625     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1626     return (-1);
1627   }
1629   if (sock != NULL && sock->batch_start)
1630     sock->batch_cmd++;
1632   if (strcasecmp (command, "update") == 0)
1633     return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1634   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1635   {
1636     /* this is only valid in replay mode */
1637     return (handle_request_wrote (buffer_ptr, now));
1638   }
1639   else if (strcasecmp (command, "flush") == 0)
1640     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1641   else if (strcasecmp (command, "flushall") == 0)
1642     return (handle_request_flushall(sock));
1643   else if (strcasecmp (command, "pending") == 0)
1644     return (handle_request_pending(sock, buffer_ptr, buffer_size));
1645   else if (strcasecmp (command, "forget") == 0)
1646     return (handle_request_forget(sock, buffer_ptr, buffer_size));
1647   else if (strcasecmp (command, "stats") == 0)
1648     return (handle_request_stats (sock));
1649   else if (strcasecmp (command, "help") == 0)
1650     return (handle_request_help (sock, buffer_ptr, buffer_size));
1651   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1652     return batch_start(sock);
1653   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1654     return batch_done(sock);
1655   else
1656     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1658   /* NOTREACHED */
1659   assert(1==0);
1660 } /* }}} int handle_request */
1662 /* MUST NOT hold journal_lock before calling this */
1663 static void journal_rotate(void) /* {{{ */
1665   FILE *old_fh = NULL;
1666   int new_fd;
1668   if (journal_cur == NULL || journal_old == NULL)
1669     return;
1671   pthread_mutex_lock(&journal_lock);
1673   /* we rotate this way (rename before close) so that the we can release
1674    * the journal lock as fast as possible.  Journal writes to the new
1675    * journal can proceed immediately after the new file is opened.  The
1676    * fclose can then block without affecting new updates.
1677    */
1678   if (journal_fh != NULL)
1679   {
1680     old_fh = journal_fh;
1681     journal_fh = NULL;
1682     rename(journal_cur, journal_old);
1683     ++stats_journal_rotate;
1684   }
1686   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1687                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1688   if (new_fd >= 0)
1689   {
1690     journal_fh = fdopen(new_fd, "a");
1691     if (journal_fh == NULL)
1692       close(new_fd);
1693   }
1695   pthread_mutex_unlock(&journal_lock);
1697   if (old_fh != NULL)
1698     fclose(old_fh);
1700   if (journal_fh == NULL)
1701   {
1702     RRDD_LOG(LOG_CRIT,
1703              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1704              journal_cur, rrd_strerror(errno));
1706     RRDD_LOG(LOG_ERR,
1707              "JOURNALING DISABLED: All values will be flushed at shutdown");
1708     config_flush_at_shutdown = 1;
1709   }
1711 } /* }}} static void journal_rotate */
1713 static void journal_done(void) /* {{{ */
1715   if (journal_cur == NULL)
1716     return;
1718   pthread_mutex_lock(&journal_lock);
1719   if (journal_fh != NULL)
1720   {
1721     fclose(journal_fh);
1722     journal_fh = NULL;
1723   }
1725   if (config_flush_at_shutdown)
1726   {
1727     RRDD_LOG(LOG_INFO, "removing journals");
1728     unlink(journal_old);
1729     unlink(journal_cur);
1730   }
1731   else
1732   {
1733     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1734              "journals will be used at next startup");
1735   }
1737   pthread_mutex_unlock(&journal_lock);
1739 } /* }}} static void journal_done */
1741 static int journal_write(char *cmd, char *args) /* {{{ */
1743   int chars;
1745   if (journal_fh == NULL)
1746     return 0;
1748   pthread_mutex_lock(&journal_lock);
1749   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1750   pthread_mutex_unlock(&journal_lock);
1752   if (chars > 0)
1753   {
1754     pthread_mutex_lock(&stats_lock);
1755     stats_journal_bytes += chars;
1756     pthread_mutex_unlock(&stats_lock);
1757   }
1759   return chars;
1760 } /* }}} static int journal_write */
1762 static int journal_replay (const char *file) /* {{{ */
1764   FILE *fh;
1765   int entry_cnt = 0;
1766   int fail_cnt = 0;
1767   uint64_t line = 0;
1768   char entry[CMD_MAX];
1769   time_t now;
1771   if (file == NULL) return 0;
1773   {
1774     char *reason;
1775     int status = 0;
1776     struct stat statbuf;
1778     memset(&statbuf, 0, sizeof(statbuf));
1779     if (stat(file, &statbuf) != 0)
1780     {
1781       if (errno == ENOENT)
1782         return 0;
1784       reason = "stat error";
1785       status = errno;
1786     }
1787     else if (!S_ISREG(statbuf.st_mode))
1788     {
1789       reason = "not a regular file";
1790       status = EPERM;
1791     }
1792     if (statbuf.st_uid != daemon_uid)
1793     {
1794       reason = "not owned by daemon user";
1795       status = EACCES;
1796     }
1797     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1798     {
1799       reason = "must not be user/group writable";
1800       status = EACCES;
1801     }
1803     if (status != 0)
1804     {
1805       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1806                file, rrd_strerror(status), reason);
1807       return 0;
1808     }
1809   }
1811   fh = fopen(file, "r");
1812   if (fh == NULL)
1813   {
1814     if (errno != ENOENT)
1815       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1816                file, rrd_strerror(errno));
1817     return 0;
1818   }
1819   else
1820     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1822   now = time(NULL);
1824   while(!feof(fh))
1825   {
1826     size_t entry_len;
1828     ++line;
1829     if (fgets(entry, sizeof(entry), fh) == NULL)
1830       break;
1831     entry_len = strlen(entry);
1833     /* check \n termination in case journal writing crashed mid-line */
1834     if (entry_len == 0)
1835       continue;
1836     else if (entry[entry_len - 1] != '\n')
1837     {
1838       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1839       ++fail_cnt;
1840       continue;
1841     }
1843     entry[entry_len - 1] = '\0';
1845     if (handle_request(NULL, now, entry, entry_len) == 0)
1846       ++entry_cnt;
1847     else
1848       ++fail_cnt;
1849   }
1851   fclose(fh);
1853   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1854            entry_cnt, fail_cnt);
1856   return entry_cnt > 0 ? 1 : 0;
1857 } /* }}} static int journal_replay */
1859 static void journal_init(void) /* {{{ */
1861   int had_journal = 0;
1863   if (journal_cur == NULL) return;
1865   pthread_mutex_lock(&journal_lock);
1867   RRDD_LOG(LOG_INFO, "checking for journal files");
1869   had_journal += journal_replay(journal_old);
1870   had_journal += journal_replay(journal_cur);
1872   /* it must have been a crash.  start a flush */
1873   if (had_journal && config_flush_at_shutdown)
1874     flush_old_values(-1);
1876   pthread_mutex_unlock(&journal_lock);
1877   journal_rotate();
1879   RRDD_LOG(LOG_INFO, "journal processing complete");
1881 } /* }}} static void journal_init */
1883 static void close_connection(listen_socket_t *sock)
1885   close(sock->fd) ;  sock->fd   = -1;
1886   free(sock->rbuf);  sock->rbuf = NULL;
1887   free(sock->wbuf);  sock->wbuf = NULL;
1889   free(sock);
1892 static void *connection_thread_main (void *args) /* {{{ */
1894   pthread_t self;
1895   listen_socket_t *sock;
1896   int i;
1897   int fd;
1899   sock = (listen_socket_t *) args;
1900   fd = sock->fd;
1902   /* init read buffers */
1903   sock->next_read = sock->next_cmd = 0;
1904   sock->rbuf = malloc(RBUF_SIZE);
1905   if (sock->rbuf == NULL)
1906   {
1907     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1908     close_connection(sock);
1909     return NULL;
1910   }
1912   pthread_mutex_lock (&connection_threads_lock);
1913   {
1914     pthread_t *temp;
1916     temp = (pthread_t *) realloc (connection_threads,
1917         sizeof (pthread_t) * (connection_threads_num + 1));
1918     if (temp == NULL)
1919     {
1920       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1921     }
1922     else
1923     {
1924       connection_threads = temp;
1925       connection_threads[connection_threads_num] = pthread_self ();
1926       connection_threads_num++;
1927     }
1928   }
1929   pthread_mutex_unlock (&connection_threads_lock);
1931   while (do_shutdown == 0)
1932   {
1933     char *cmd;
1934     ssize_t cmd_len;
1935     ssize_t rbytes;
1936     time_t now;
1938     struct pollfd pollfd;
1939     int status;
1941     pollfd.fd = fd;
1942     pollfd.events = POLLIN | POLLPRI;
1943     pollfd.revents = 0;
1945     status = poll (&pollfd, 1, /* timeout = */ 500);
1946     if (do_shutdown)
1947       break;
1948     else if (status == 0) /* timeout */
1949       continue;
1950     else if (status < 0) /* error */
1951     {
1952       status = errno;
1953       if (status != EINTR)
1954         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1955       continue;
1956     }
1958     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1959       break;
1960     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1961     {
1962       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1963           "poll(2) returned something unexpected: %#04hx",
1964           pollfd.revents);
1965       break;
1966     }
1968     rbytes = read(fd, sock->rbuf + sock->next_read,
1969                   RBUF_SIZE - sock->next_read);
1970     if (rbytes < 0)
1971     {
1972       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1973       break;
1974     }
1975     else if (rbytes == 0)
1976       break; /* eof */
1978     sock->next_read += rbytes;
1980     if (sock->batch_start)
1981       now = sock->batch_start;
1982     else
1983       now = time(NULL);
1985     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1986     {
1987       status = handle_request (sock, now, cmd, cmd_len+1);
1988       if (status != 0)
1989         goto out_close;
1990     }
1991   }
1993 out_close:
1994   close_connection(sock);
1996   self = pthread_self ();
1997   /* Remove this thread from the connection threads list */
1998   pthread_mutex_lock (&connection_threads_lock);
1999   /* Find out own index in the array */
2000   for (i = 0; i < connection_threads_num; i++)
2001     if (pthread_equal (connection_threads[i], self) != 0)
2002       break;
2003   assert (i < connection_threads_num);
2005   /* Move the trailing threads forward. */
2006   if (i < (connection_threads_num - 1))
2007   {
2008     memmove (connection_threads + i,
2009         connection_threads + i + 1,
2010         sizeof (pthread_t) * (connection_threads_num - i - 1));
2011   }
2013   connection_threads_num--;
2014   pthread_mutex_unlock (&connection_threads_lock);
2016   return (NULL);
2017 } /* }}} void *connection_thread_main */
2019 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2021   int fd;
2022   struct sockaddr_un sa;
2023   listen_socket_t *temp;
2024   int status;
2025   const char *path;
2027   path = sock->addr;
2028   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2029     path += strlen("unix:");
2031   temp = (listen_socket_t *) realloc (listen_fds,
2032       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2033   if (temp == NULL)
2034   {
2035     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2036     return (-1);
2037   }
2038   listen_fds = temp;
2039   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2041   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2042   if (fd < 0)
2043   {
2044     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2045              rrd_strerror(errno));
2046     return (-1);
2047   }
2049   memset (&sa, 0, sizeof (sa));
2050   sa.sun_family = AF_UNIX;
2051   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2053   /* if we've gotten this far, we own the pid file.  any daemon started
2054    * with the same args must not be alive.  therefore, ensure that we can
2055    * create the socket...
2056    */
2057   unlink(path);
2059   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2060   if (status != 0)
2061   {
2062     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2063              path, rrd_strerror(errno));
2064     close (fd);
2065     return (-1);
2066   }
2068   status = listen (fd, /* backlog = */ 10);
2069   if (status != 0)
2070   {
2071     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2072              path, rrd_strerror(errno));
2073     close (fd);
2074     unlink (path);
2075     return (-1);
2076   }
2078   listen_fds[listen_fds_num].fd = fd;
2079   listen_fds[listen_fds_num].family = PF_UNIX;
2080   strncpy(listen_fds[listen_fds_num].addr, path,
2081           sizeof (listen_fds[listen_fds_num].addr) - 1);
2082   listen_fds_num++;
2084   return (0);
2085 } /* }}} int open_listen_socket_unix */
2087 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2089   struct addrinfo ai_hints;
2090   struct addrinfo *ai_res;
2091   struct addrinfo *ai_ptr;
2092   char addr_copy[NI_MAXHOST];
2093   char *addr;
2094   char *port;
2095   int status;
2097   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2098   addr_copy[sizeof (addr_copy) - 1] = 0;
2099   addr = addr_copy;
2101   memset (&ai_hints, 0, sizeof (ai_hints));
2102   ai_hints.ai_flags = 0;
2103 #ifdef AI_ADDRCONFIG
2104   ai_hints.ai_flags |= AI_ADDRCONFIG;
2105 #endif
2106   ai_hints.ai_family = AF_UNSPEC;
2107   ai_hints.ai_socktype = SOCK_STREAM;
2109   port = NULL;
2110   if (*addr == '[') /* IPv6+port format */
2111   {
2112     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2113     addr++;
2115     port = strchr (addr, ']');
2116     if (port == NULL)
2117     {
2118       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2119       return (-1);
2120     }
2121     *port = 0;
2122     port++;
2124     if (*port == ':')
2125       port++;
2126     else if (*port == 0)
2127       port = NULL;
2128     else
2129     {
2130       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2131       return (-1);
2132     }
2133   } /* if (*addr = ']') */
2134   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2135   {
2136     port = rindex(addr, ':');
2137     if (port != NULL)
2138     {
2139       *port = 0;
2140       port++;
2141     }
2142   }
2143   ai_res = NULL;
2144   status = getaddrinfo (addr,
2145                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2146                         &ai_hints, &ai_res);
2147   if (status != 0)
2148   {
2149     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2150              addr, gai_strerror (status));
2151     return (-1);
2152   }
2154   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2155   {
2156     int fd;
2157     listen_socket_t *temp;
2158     int one = 1;
2160     temp = (listen_socket_t *) realloc (listen_fds,
2161         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2162     if (temp == NULL)
2163     {
2164       fprintf (stderr,
2165                "rrdcached: open_listen_socket_network: realloc failed.\n");
2166       continue;
2167     }
2168     listen_fds = temp;
2169     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2171     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2172     if (fd < 0)
2173     {
2174       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2175                rrd_strerror(errno));
2176       continue;
2177     }
2179     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2181     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2182     if (status != 0)
2183     {
2184       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2185                sock->addr, rrd_strerror(errno));
2186       close (fd);
2187       continue;
2188     }
2190     status = listen (fd, /* backlog = */ 10);
2191     if (status != 0)
2192     {
2193       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2194                sock->addr, rrd_strerror(errno));
2195       close (fd);
2196       return (-1);
2197     }
2199     listen_fds[listen_fds_num].fd = fd;
2200     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2201     listen_fds_num++;
2202   } /* for (ai_ptr) */
2204   return (0);
2205 } /* }}} static int open_listen_socket_network */
2207 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2209   assert(sock != NULL);
2210   assert(sock->addr != NULL);
2212   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2213       || sock->addr[0] == '/')
2214     return (open_listen_socket_unix(sock));
2215   else
2216     return (open_listen_socket_network(sock));
2217 } /* }}} int open_listen_socket */
2219 static int close_listen_sockets (void) /* {{{ */
2221   size_t i;
2223   for (i = 0; i < listen_fds_num; i++)
2224   {
2225     close (listen_fds[i].fd);
2227     if (listen_fds[i].family == PF_UNIX)
2228       unlink(listen_fds[i].addr);
2229   }
2231   free (listen_fds);
2232   listen_fds = NULL;
2233   listen_fds_num = 0;
2235   return (0);
2236 } /* }}} int close_listen_sockets */
2238 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2240   struct pollfd *pollfds;
2241   int pollfds_num;
2242   int status;
2243   int i;
2245   if (listen_fds_num < 1)
2246   {
2247     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2248     return (NULL);
2249   }
2251   pollfds_num = listen_fds_num;
2252   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2253   if (pollfds == NULL)
2254   {
2255     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2256     return (NULL);
2257   }
2258   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2260   RRDD_LOG(LOG_INFO, "listening for connections");
2262   while (do_shutdown == 0)
2263   {
2264     assert (pollfds_num == ((int) listen_fds_num));
2265     for (i = 0; i < pollfds_num; i++)
2266     {
2267       pollfds[i].fd = listen_fds[i].fd;
2268       pollfds[i].events = POLLIN | POLLPRI;
2269       pollfds[i].revents = 0;
2270     }
2272     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2273     if (do_shutdown)
2274       break;
2275     else if (status == 0) /* timeout */
2276       continue;
2277     else if (status < 0) /* error */
2278     {
2279       status = errno;
2280       if (status != EINTR)
2281       {
2282         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2283       }
2284       continue;
2285     }
2287     for (i = 0; i < pollfds_num; i++)
2288     {
2289       listen_socket_t *client_sock;
2290       struct sockaddr_storage client_sa;
2291       socklen_t client_sa_size;
2292       pthread_t tid;
2293       pthread_attr_t attr;
2295       if (pollfds[i].revents == 0)
2296         continue;
2298       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2299       {
2300         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2301             "poll(2) returned something unexpected for listen FD #%i.",
2302             pollfds[i].fd);
2303         continue;
2304       }
2306       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2307       if (client_sock == NULL)
2308       {
2309         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2310         continue;
2311       }
2312       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2314       client_sa_size = sizeof (client_sa);
2315       client_sock->fd = accept (pollfds[i].fd,
2316           (struct sockaddr *) &client_sa, &client_sa_size);
2317       if (client_sock->fd < 0)
2318       {
2319         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2320         free(client_sock);
2321         continue;
2322       }
2324       pthread_attr_init (&attr);
2325       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2327       status = pthread_create (&tid, &attr, connection_thread_main,
2328                                client_sock);
2329       if (status != 0)
2330       {
2331         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2332         close_connection(client_sock);
2333         continue;
2334       }
2335     } /* for (pollfds_num) */
2336   } /* while (do_shutdown == 0) */
2338   RRDD_LOG(LOG_INFO, "starting shutdown");
2340   close_listen_sockets ();
2342   pthread_mutex_lock (&connection_threads_lock);
2343   while (connection_threads_num > 0)
2344   {
2345     pthread_t wait_for;
2347     wait_for = connection_threads[0];
2349     pthread_mutex_unlock (&connection_threads_lock);
2350     pthread_join (wait_for, /* retval = */ NULL);
2351     pthread_mutex_lock (&connection_threads_lock);
2352   }
2353   pthread_mutex_unlock (&connection_threads_lock);
2355   return (NULL);
2356 } /* }}} void *listen_thread_main */
2358 static int daemonize (void) /* {{{ */
2360   int pid_fd;
2361   char *base_dir;
2363   daemon_uid = geteuid();
2365   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2366   if (pid_fd < 0)
2367     pid_fd = check_pidfile();
2368   if (pid_fd < 0)
2369     return pid_fd;
2371   /* open all the listen sockets */
2372   if (config_listen_address_list_len > 0)
2373   {
2374     for (int i = 0; i < config_listen_address_list_len; i++)
2375       open_listen_socket (config_listen_address_list[i]);
2376   }
2377   else
2378   {
2379     listen_socket_t sock;
2380     memset(&sock, 0, sizeof(sock));
2381     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2382     open_listen_socket (&sock);
2383   }
2385   if (listen_fds_num < 1)
2386   {
2387     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2388     goto error;
2389   }
2391   if (!stay_foreground)
2392   {
2393     pid_t child;
2395     child = fork ();
2396     if (child < 0)
2397     {
2398       fprintf (stderr, "daemonize: fork(2) failed.\n");
2399       goto error;
2400     }
2401     else if (child > 0)
2402       exit(0);
2404     /* Become session leader */
2405     setsid ();
2407     /* Open the first three file descriptors to /dev/null */
2408     close (2);
2409     close (1);
2410     close (0);
2412     open ("/dev/null", O_RDWR);
2413     dup (0);
2414     dup (0);
2415   } /* if (!stay_foreground) */
2417   /* Change into the /tmp directory. */
2418   base_dir = (config_base_dir != NULL)
2419     ? config_base_dir
2420     : "/tmp";
2422   if (chdir (base_dir) != 0)
2423   {
2424     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2425     goto error;
2426   }
2428   install_signal_handlers();
2430   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2431   RRDD_LOG(LOG_INFO, "starting up");
2433   cache_tree = g_tree_new ((GCompareFunc) strcmp);
2434   if (cache_tree == NULL)
2435   {
2436     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2437     goto error;
2438   }
2440   return write_pidfile (pid_fd);
2442 error:
2443   remove_pidfile();
2444   return -1;
2445 } /* }}} int daemonize */
2447 static int cleanup (void) /* {{{ */
2449   do_shutdown++;
2451   pthread_cond_signal (&cache_cond);
2452   pthread_join (queue_thread, /* return = */ NULL);
2454   remove_pidfile ();
2456   RRDD_LOG(LOG_INFO, "goodbye");
2457   closelog ();
2459   return (0);
2460 } /* }}} int cleanup */
2462 static int read_options (int argc, char **argv) /* {{{ */
2464   int option;
2465   int status = 0;
2467   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2468   {
2469     switch (option)
2470     {
2471       case 'g':
2472         stay_foreground=1;
2473         break;
2475       case 'L':
2476       case 'l':
2477       {
2478         listen_socket_t **temp;
2479         listen_socket_t *new;
2481         new = malloc(sizeof(listen_socket_t));
2482         if (new == NULL)
2483         {
2484           fprintf(stderr, "read_options: malloc failed.\n");
2485           return(2);
2486         }
2487         memset(new, 0, sizeof(listen_socket_t));
2489         temp = (listen_socket_t **) realloc (config_listen_address_list,
2490             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2491         if (temp == NULL)
2492         {
2493           fprintf (stderr, "read_options: realloc failed.\n");
2494           return (2);
2495         }
2496         config_listen_address_list = temp;
2498         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2499         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2501         temp[config_listen_address_list_len] = new;
2502         config_listen_address_list_len++;
2503       }
2504       break;
2506       case 'f':
2507       {
2508         int temp;
2510         temp = atoi (optarg);
2511         if (temp > 0)
2512           config_flush_interval = temp;
2513         else
2514         {
2515           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2516           status = 3;
2517         }
2518       }
2519       break;
2521       case 'w':
2522       {
2523         int temp;
2525         temp = atoi (optarg);
2526         if (temp > 0)
2527           config_write_interval = temp;
2528         else
2529         {
2530           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2531           status = 2;
2532         }
2533       }
2534       break;
2536       case 'z':
2537       {
2538         int temp;
2540         temp = atoi(optarg);
2541         if (temp > 0)
2542           config_write_jitter = temp;
2543         else
2544         {
2545           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2546           status = 2;
2547         }
2549         break;
2550       }
2552       case 'B':
2553         config_write_base_only = 1;
2554         break;
2556       case 'b':
2557       {
2558         size_t len;
2559         char base_realpath[PATH_MAX];
2561         if (config_base_dir != NULL)
2562           free (config_base_dir);
2563         config_base_dir = strdup (optarg);
2564         if (config_base_dir == NULL)
2565         {
2566           fprintf (stderr, "read_options: strdup failed.\n");
2567           return (3);
2568         }
2570         /* make sure that the base directory is not resolved via
2571          * symbolic links.  this makes some performance-enhancing
2572          * assumptions possible (we don't have to resolve paths
2573          * that start with a "/")
2574          */
2575         if (realpath(config_base_dir, base_realpath) == NULL)
2576         {
2577           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2578           return 5;
2579         }
2580         else if (strncmp(config_base_dir,
2581                          base_realpath, sizeof(base_realpath)) != 0)
2582         {
2583           fprintf(stderr,
2584                   "Base directory (-b) resolved via file system links!\n"
2585                   "Please consult rrdcached '-b' documentation!\n"
2586                   "Consider specifying the real directory (%s)\n",
2587                   base_realpath);
2588           return 5;
2589         }
2591         len = strlen (config_base_dir);
2592         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2593         {
2594           config_base_dir[len - 1] = 0;
2595           len--;
2596         }
2598         if (len < 1)
2599         {
2600           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2601           return (4);
2602         }
2604         _config_base_dir_len = len;
2605       }
2606       break;
2608       case 'p':
2609       {
2610         if (config_pid_file != NULL)
2611           free (config_pid_file);
2612         config_pid_file = strdup (optarg);
2613         if (config_pid_file == NULL)
2614         {
2615           fprintf (stderr, "read_options: strdup failed.\n");
2616           return (3);
2617         }
2618       }
2619       break;
2621       case 'F':
2622         config_flush_at_shutdown = 1;
2623         break;
2625       case 'j':
2626       {
2627         struct stat statbuf;
2628         const char *dir = optarg;
2630         status = stat(dir, &statbuf);
2631         if (status != 0)
2632         {
2633           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2634           return 6;
2635         }
2637         if (!S_ISDIR(statbuf.st_mode)
2638             || access(dir, R_OK|W_OK|X_OK) != 0)
2639         {
2640           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2641                   errno ? rrd_strerror(errno) : "");
2642           return 6;
2643         }
2645         journal_cur = malloc(PATH_MAX + 1);
2646         journal_old = malloc(PATH_MAX + 1);
2647         if (journal_cur == NULL || journal_old == NULL)
2648         {
2649           fprintf(stderr, "malloc failure for journal files\n");
2650           return 6;
2651         }
2652         else 
2653         {
2654           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2655           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2656         }
2657       }
2658       break;
2660       case 'h':
2661       case '?':
2662         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2663             "\n"
2664             "Usage: rrdcached [options]\n"
2665             "\n"
2666             "Valid options are:\n"
2667             "  -l <address>  Socket address to listen to.\n"
2668             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2669             "  -w <seconds>  Interval in which to write data.\n"
2670             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2671             "  -f <seconds>  Interval in which to flush dead data.\n"
2672             "  -p <file>     Location of the PID-file.\n"
2673             "  -b <dir>      Base directory to change to.\n"
2674             "  -B            Restrict file access to paths within -b <dir>\n"
2675             "  -g            Do not fork and run in the foreground.\n"
2676             "  -j <dir>      Directory in which to create the journal files.\n"
2677             "  -F            Always flush all updates at shutdown\n"
2678             "\n"
2679             "For more information and a detailed description of all options "
2680             "please refer\n"
2681             "to the rrdcached(1) manual page.\n",
2682             VERSION);
2683         status = -1;
2684         break;
2685     } /* switch (option) */
2686   } /* while (getopt) */
2688   /* advise the user when values are not sane */
2689   if (config_flush_interval < 2 * config_write_interval)
2690     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2691             " 2x write interval (-w) !\n");
2692   if (config_write_jitter > config_write_interval)
2693     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2694             " write interval (-w) !\n");
2696   if (config_write_base_only && config_base_dir == NULL)
2697     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2698             "  Consult the rrdcached documentation\n");
2700   if (journal_cur == NULL)
2701     config_flush_at_shutdown = 1;
2703   return (status);
2704 } /* }}} int read_options */
2706 int main (int argc, char **argv)
2708   int status;
2710   status = read_options (argc, argv);
2711   if (status != 0)
2712   {
2713     if (status < 0)
2714       status = 0;
2715     return (status);
2716   }
2718   status = daemonize ();
2719   if (status != 0)
2720   {
2721     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2722     return (1);
2723   }
2725   journal_init();
2727   /* start the queue thread */
2728   memset (&queue_thread, 0, sizeof (queue_thread));
2729   status = pthread_create (&queue_thread,
2730                            NULL, /* attr */
2731                            queue_thread_main,
2732                            NULL); /* args */
2733   if (status != 0)
2734   {
2735     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2736     cleanup();
2737     return (1);
2738   }
2740   listen_thread_main (NULL);
2741   cleanup ();
2743   return (0);
2744 } /* int main */
2746 /*
2747  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2748  */