Code

Under most circumstances, rrdcached can detect a stale pid file.
[rrdtool-all.git] / program / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
101 /*
102  * Types
103  */
104 typedef enum
106   PRIV_LOW,
107   PRIV_HIGH
108 } socket_privilege;
110 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
112 struct listen_socket_s
114   int fd;
115   char addr[PATH_MAX + 1];
116   int family;
117   socket_privilege privilege;
119   /* state for BATCH processing */
120   time_t batch_start;
121   int batch_cmd;
123   /* buffered IO */
124   char *rbuf;
125   off_t next_cmd;
126   off_t next_read;
128   char *wbuf;
129   ssize_t wbuf_len;
130 };
131 typedef struct listen_socket_s listen_socket_t;
133 struct cache_item_s;
134 typedef struct cache_item_s cache_item_t;
135 struct cache_item_s
137   char *file;
138   char **values;
139   int values_num;
140   time_t last_flush_time;
141   time_t last_update_stamp;
142 #define CI_FLAGS_IN_TREE  (1<<0)
143 #define CI_FLAGS_IN_QUEUE (1<<1)
144   int flags;
145   pthread_cond_t  flushed;
146   cache_item_t *prev;
147   cache_item_t *next;
148 };
150 struct callback_flush_data_s
152   time_t now;
153   time_t abs_timeout;
154   char **keys;
155   size_t keys_num;
156 };
157 typedef struct callback_flush_data_s callback_flush_data_t;
159 enum queue_side_e
161   HEAD,
162   TAIL
163 };
164 typedef enum queue_side_e queue_side_t;
166 /* max length of socket command or response */
167 #define CMD_MAX 4096
168 #define RBUF_SIZE (CMD_MAX*2)
170 /*
171  * Variables
172  */
173 static int stay_foreground = 0;
174 static uid_t daemon_uid;
176 static listen_socket_t *listen_fds = NULL;
177 static size_t listen_fds_num = 0;
179 static int do_shutdown = 0;
181 static pthread_t queue_thread;
183 static pthread_t *connection_threads = NULL;
184 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
185 static int connection_threads_num = 0;
187 /* Cache stuff */
188 static GTree          *cache_tree = NULL;
189 static cache_item_t   *cache_queue_head = NULL;
190 static cache_item_t   *cache_queue_tail = NULL;
191 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
192 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
194 static int config_write_interval = 300;
195 static int config_write_jitter   = 0;
196 static int config_flush_interval = 3600;
197 static int config_flush_at_shutdown = 0;
198 static char *config_pid_file = NULL;
199 static char *config_base_dir = NULL;
200 static size_t _config_base_dir_len = 0;
201 static int config_write_base_only = 0;
203 static listen_socket_t **config_listen_address_list = NULL;
204 static int config_listen_address_list_len = 0;
206 static uint64_t stats_queue_length = 0;
207 static uint64_t stats_updates_received = 0;
208 static uint64_t stats_flush_received = 0;
209 static uint64_t stats_updates_written = 0;
210 static uint64_t stats_data_sets_written = 0;
211 static uint64_t stats_journal_bytes = 0;
212 static uint64_t stats_journal_rotate = 0;
213 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
215 /* Journaled updates */
216 static char *journal_cur = NULL;
217 static char *journal_old = NULL;
218 static FILE *journal_fh = NULL;
219 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
220 static int journal_write(char *cmd, char *args);
221 static void journal_done(void);
222 static void journal_rotate(void);
224 /* 
225  * Functions
226  */
227 static void sig_common (const char *sig) /* {{{ */
229   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
230   do_shutdown++;
231   pthread_cond_broadcast(&cache_cond);
232 } /* }}} void sig_common */
234 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
236   sig_common("INT");
237 } /* }}} void sig_int_handler */
239 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
241   sig_common("TERM");
242 } /* }}} void sig_term_handler */
244 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
246   config_flush_at_shutdown = 1;
247   sig_common("USR1");
248 } /* }}} void sig_usr1_handler */
250 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
252   config_flush_at_shutdown = 0;
253   sig_common("USR2");
254 } /* }}} void sig_usr2_handler */
256 static void install_signal_handlers(void) /* {{{ */
258   /* These structures are static, because `sigaction' behaves weird if the are
259    * overwritten.. */
260   static struct sigaction sa_int;
261   static struct sigaction sa_term;
262   static struct sigaction sa_pipe;
263   static struct sigaction sa_usr1;
264   static struct sigaction sa_usr2;
266   /* Install signal handlers */
267   memset (&sa_int, 0, sizeof (sa_int));
268   sa_int.sa_handler = sig_int_handler;
269   sigaction (SIGINT, &sa_int, NULL);
271   memset (&sa_term, 0, sizeof (sa_term));
272   sa_term.sa_handler = sig_term_handler;
273   sigaction (SIGTERM, &sa_term, NULL);
275   memset (&sa_pipe, 0, sizeof (sa_pipe));
276   sa_pipe.sa_handler = SIG_IGN;
277   sigaction (SIGPIPE, &sa_pipe, NULL);
279   memset (&sa_pipe, 0, sizeof (sa_usr1));
280   sa_usr1.sa_handler = sig_usr1_handler;
281   sigaction (SIGUSR1, &sa_usr1, NULL);
283   memset (&sa_usr2, 0, sizeof (sa_usr2));
284   sa_usr2.sa_handler = sig_usr2_handler;
285   sigaction (SIGUSR2, &sa_usr2, NULL);
287 } /* }}} void install_signal_handlers */
289 static int open_pidfile(char *action, int oflag) /* {{{ */
291   int fd;
292   char *file;
294   file = (config_pid_file != NULL)
295     ? config_pid_file
296     : LOCALSTATEDIR "/run/rrdcached.pid";
298   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
299   if (fd < 0)
300     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
301             action, file, rrd_strerror(errno));
303   return(fd);
304 } /* }}} static int open_pidfile */
306 /* check existing pid file to see whether a daemon is running */
307 static int check_pidfile(void)
309   int pid_fd;
310   pid_t pid;
311   char pid_str[16];
313   pid_fd = open_pidfile("open", O_RDWR);
314   if (pid_fd < 0)
315     return pid_fd;
317   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
318     return -1;
320   pid = atoi(pid_str);
321   if (pid <= 0)
322     return -1;
324   /* another running process that we can signal COULD be
325    * a competing rrdcached */
326   if (pid != getpid() && kill(pid, 0) == 0)
327   {
328     fprintf(stderr,
329             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
330     close(pid_fd);
331     return -1;
332   }
334   lseek(pid_fd, 0, SEEK_SET);
335   ftruncate(pid_fd, 0);
337   fprintf(stderr,
338           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
339           "rrdcached: starting normally.\n", pid);
341   return pid_fd;
342 } /* }}} static int check_pidfile */
344 static int write_pidfile (int fd) /* {{{ */
346   pid_t pid;
347   FILE *fh;
349   pid = getpid ();
351   fh = fdopen (fd, "w");
352   if (fh == NULL)
353   {
354     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
355     close(fd);
356     return (-1);
357   }
359   fprintf (fh, "%i\n", (int) pid);
360   fclose (fh);
362   return (0);
363 } /* }}} int write_pidfile */
365 static int remove_pidfile (void) /* {{{ */
367   char *file;
368   int status;
370   file = (config_pid_file != NULL)
371     ? config_pid_file
372     : LOCALSTATEDIR "/run/rrdcached.pid";
374   status = unlink (file);
375   if (status == 0)
376     return (0);
377   return (errno);
378 } /* }}} int remove_pidfile */
380 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
382   char *eol;
384   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
385                sock->next_read - sock->next_cmd);
387   if (eol == NULL)
388   {
389     /* no commands left, move remainder back to front of rbuf */
390     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
391             sock->next_read - sock->next_cmd);
392     sock->next_read -= sock->next_cmd;
393     sock->next_cmd = 0;
394     *len = 0;
395     return NULL;
396   }
397   else
398   {
399     char *cmd = sock->rbuf + sock->next_cmd;
400     *eol = '\0';
402     sock->next_cmd = eol - sock->rbuf + 1;
404     if (eol > sock->rbuf && *(eol-1) == '\r')
405       *(--eol) = '\0'; /* handle "\r\n" EOL */
407     *len = eol - cmd;
409     return cmd;
410   }
412   /* NOTREACHED */
413   assert(1==0);
416 /* add the characters directly to the write buffer */
417 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
419   char *new_buf;
421   assert(sock != NULL);
423   new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
424   if (new_buf == NULL)
425   {
426     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
427     return -1;
428   }
430   strncpy(new_buf + sock->wbuf_len, str, len + 1);
432   sock->wbuf = new_buf;
433   sock->wbuf_len += len;
435   return 0;
436 } /* }}} static int add_to_wbuf */
438 /* add the text to the "extra" info that's sent after the status line */
439 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
441   va_list argp;
442   char buffer[CMD_MAX];
443   int len;
445   if (sock == NULL) return 0; /* journal replay mode */
446   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
448   va_start(argp, fmt);
449 #ifdef HAVE_VSNPRINTF
450   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
451 #else
452   len = vsprintf(buffer, fmt, argp);
453 #endif
454   va_end(argp);
455   if (len < 0)
456   {
457     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
458     return -1;
459   }
461   return add_to_wbuf(sock, buffer, len);
462 } /* }}} static int add_response_info */
464 static int count_lines(char *str) /* {{{ */
466   int lines = 0;
468   if (str != NULL)
469   {
470     while ((str = strchr(str, '\n')) != NULL)
471     {
472       ++lines;
473       ++str;
474     }
475   }
477   return lines;
478 } /* }}} static int count_lines */
480 /* send the response back to the user.
481  * returns 0 on success, -1 on error
482  * write buffer is always zeroed after this call */
483 static int send_response (listen_socket_t *sock, response_code rc,
484                           char *fmt, ...) /* {{{ */
486   va_list argp;
487   char buffer[CMD_MAX];
488   int lines;
489   ssize_t wrote;
490   int rclen, len;
492   if (sock == NULL) return rc;  /* journal replay mode */
494   if (sock->batch_start)
495   {
496     if (rc == RESP_OK)
497       return rc; /* no response on success during BATCH */
498     lines = sock->batch_cmd;
499   }
500   else if (rc == RESP_OK)
501     lines = count_lines(sock->wbuf);
502   else
503     lines = -1;
505   rclen = sprintf(buffer, "%d ", lines);
506   va_start(argp, fmt);
507 #ifdef HAVE_VSNPRINTF
508   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
509 #else
510   len = vsprintf(buffer+rclen, fmt, argp);
511 #endif
512   va_end(argp);
513   if (len < 0)
514     return -1;
516   len += rclen;
518   /* append the result to the wbuf, don't write to the user */
519   if (sock->batch_start)
520     return add_to_wbuf(sock, buffer, len);
522   /* first write must be complete */
523   if (len != write(sock->fd, buffer, len))
524   {
525     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
526     return -1;
527   }
529   if (sock->wbuf != NULL && rc == RESP_OK)
530   {
531     wrote = 0;
532     while (wrote < sock->wbuf_len)
533     {
534       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
535       if (wb <= 0)
536       {
537         RRDD_LOG(LOG_INFO, "send_response: could not write results");
538         return -1;
539       }
540       wrote += wb;
541     }
542   }
544   free(sock->wbuf); sock->wbuf = NULL;
545   sock->wbuf_len = 0;
547   return 0;
548 } /* }}} */
550 static void wipe_ci_values(cache_item_t *ci, time_t when)
552   ci->values = NULL;
553   ci->values_num = 0;
555   ci->last_flush_time = when;
556   if (config_write_jitter > 0)
557     ci->last_flush_time += (random() % config_write_jitter);
560 /* remove_from_queue
561  * remove a "cache_item_t" item from the queue.
562  * must hold 'cache_lock' when calling this
563  */
564 static void remove_from_queue(cache_item_t *ci) /* {{{ */
566   if (ci == NULL) return;
568   if (ci->prev == NULL)
569     cache_queue_head = ci->next; /* reset head */
570   else
571     ci->prev->next = ci->next;
573   if (ci->next == NULL)
574     cache_queue_tail = ci->prev; /* reset the tail */
575   else
576     ci->next->prev = ci->prev;
578   ci->next = ci->prev = NULL;
579   ci->flags &= ~CI_FLAGS_IN_QUEUE;
580 } /* }}} static void remove_from_queue */
582 /* remove an entry from the tree and free all its resources.
583  * must hold 'cache lock' while calling this.
584  * returns 0 on success, otherwise errno */
585 static int forget_file(const char *file)
587   cache_item_t *ci;
589   ci = g_tree_lookup(cache_tree, file);
590   if (ci == NULL)
591     return ENOENT;
593   g_tree_remove (cache_tree, file);
594   remove_from_queue(ci);
596   for (int i=0; i < ci->values_num; i++)
597     free(ci->values[i]);
599   free (ci->values);
600   free (ci->file);
602   /* in case anyone is waiting */
603   pthread_cond_broadcast(&ci->flushed);
605   free (ci);
607   return 0;
608 } /* }}} static int forget_file */
610 /*
611  * enqueue_cache_item:
612  * `cache_lock' must be acquired before calling this function!
613  */
614 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
615     queue_side_t side)
617   if (ci == NULL)
618     return (-1);
620   if (ci->values_num == 0)
621     return (0);
623   if (side == HEAD)
624   {
625     if (cache_queue_head == ci)
626       return 0;
628     /* remove from the double linked list */
629     if (ci->flags & CI_FLAGS_IN_QUEUE)
630       remove_from_queue(ci);
632     ci->prev = NULL;
633     ci->next = cache_queue_head;
634     if (ci->next != NULL)
635       ci->next->prev = ci;
636     cache_queue_head = ci;
638     if (cache_queue_tail == NULL)
639       cache_queue_tail = cache_queue_head;
640   }
641   else /* (side == TAIL) */
642   {
643     /* We don't move values back in the list.. */
644     if (ci->flags & CI_FLAGS_IN_QUEUE)
645       return (0);
647     assert (ci->next == NULL);
648     assert (ci->prev == NULL);
650     ci->prev = cache_queue_tail;
652     if (cache_queue_tail == NULL)
653       cache_queue_head = ci;
654     else
655       cache_queue_tail->next = ci;
657     cache_queue_tail = ci;
658   }
660   ci->flags |= CI_FLAGS_IN_QUEUE;
662   pthread_cond_broadcast(&cache_cond);
663   pthread_mutex_lock (&stats_lock);
664   stats_queue_length++;
665   pthread_mutex_unlock (&stats_lock);
667   return (0);
668 } /* }}} int enqueue_cache_item */
670 /*
671  * tree_callback_flush:
672  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
673  * while this is in progress.
674  */
675 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
676     gpointer data)
678   cache_item_t *ci;
679   callback_flush_data_t *cfd;
681   ci = (cache_item_t *) value;
682   cfd = (callback_flush_data_t *) data;
684   if ((ci->last_flush_time <= cfd->abs_timeout)
685       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
686       && (ci->values_num > 0))
687   {
688     enqueue_cache_item (ci, TAIL);
689   }
690   else if ((do_shutdown != 0)
691       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
692       && (ci->values_num > 0))
693   {
694     enqueue_cache_item (ci, TAIL);
695   }
696   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
697       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
698       && (ci->values_num <= 0))
699   {
700     char **temp;
702     temp = (char **) realloc (cfd->keys,
703         sizeof (char *) * (cfd->keys_num + 1));
704     if (temp == NULL)
705     {
706       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
707       return (FALSE);
708     }
709     cfd->keys = temp;
710     /* Make really sure this points to the _same_ place */
711     assert ((char *) key == ci->file);
712     cfd->keys[cfd->keys_num] = (char *) key;
713     cfd->keys_num++;
714   }
716   return (FALSE);
717 } /* }}} gboolean tree_callback_flush */
719 static int flush_old_values (int max_age)
721   callback_flush_data_t cfd;
722   size_t k;
724   memset (&cfd, 0, sizeof (cfd));
725   /* Pass the current time as user data so that we don't need to call
726    * `time' for each node. */
727   cfd.now = time (NULL);
728   cfd.keys = NULL;
729   cfd.keys_num = 0;
731   if (max_age > 0)
732     cfd.abs_timeout = cfd.now - max_age;
733   else
734     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
736   /* `tree_callback_flush' will return the keys of all values that haven't
737    * been touched in the last `config_flush_interval' seconds in `cfd'.
738    * The char*'s in this array point to the same memory as ci->file, so we
739    * don't need to free them separately. */
740   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
742   for (k = 0; k < cfd.keys_num; k++)
743   {
744     /* should never fail, since we have held the cache_lock
745      * the entire time */
746     assert( forget_file(cfd.keys[k]) == 0 );
747   }
749   if (cfd.keys != NULL)
750   {
751     free (cfd.keys);
752     cfd.keys = NULL;
753   }
755   return (0);
756 } /* int flush_old_values */
758 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
760   struct timeval now;
761   struct timespec next_flush;
762   int final_flush = 0; /* make sure we only flush once on shutdown */
764   gettimeofday (&now, NULL);
765   next_flush.tv_sec = now.tv_sec + config_flush_interval;
766   next_flush.tv_nsec = 1000 * now.tv_usec;
768   pthread_mutex_lock (&cache_lock);
769   while ((do_shutdown == 0) || (cache_queue_head != NULL))
770   {
771     cache_item_t *ci;
772     char *file;
773     char **values;
774     int values_num;
775     int status;
776     int i;
778     /* First, check if it's time to do the cache flush. */
779     gettimeofday (&now, NULL);
780     if ((now.tv_sec > next_flush.tv_sec)
781         || ((now.tv_sec == next_flush.tv_sec)
782           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
783     {
784       /* Flush all values that haven't been written in the last
785        * `config_write_interval' seconds. */
786       flush_old_values (config_write_interval);
788       /* Determine the time of the next cache flush. */
789       next_flush.tv_sec =
790         now.tv_sec + next_flush.tv_sec % config_flush_interval;
792       /* unlock the cache while we rotate so we don't block incoming
793        * updates if the fsync() blocks on disk I/O */
794       pthread_mutex_unlock(&cache_lock);
795       journal_rotate();
796       pthread_mutex_lock(&cache_lock);
797     }
799     /* Now, check if there's something to store away. If not, wait until
800      * something comes in or it's time to do the cache flush.  if we are
801      * shutting down, do not wait around.  */
802     if (cache_queue_head == NULL && !do_shutdown)
803     {
804       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
805       if ((status != 0) && (status != ETIMEDOUT))
806       {
807         RRDD_LOG (LOG_ERR, "queue_thread_main: "
808             "pthread_cond_timedwait returned %i.", status);
809       }
810     }
812     /* We're about to shut down */
813     if (do_shutdown != 0 && !final_flush++)
814     {
815       if (config_flush_at_shutdown)
816         flush_old_values (-1); /* flush everything */
817       else
818         break;
819     }
821     /* Check if a value has arrived. This may be NULL if we timed out or there
822      * was an interrupt such as a signal. */
823     if (cache_queue_head == NULL)
824       continue;
826     ci = cache_queue_head;
828     /* copy the relevant parts */
829     file = strdup (ci->file);
830     if (file == NULL)
831     {
832       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
833       continue;
834     }
836     assert(ci->values != NULL);
837     assert(ci->values_num > 0);
839     values = ci->values;
840     values_num = ci->values_num;
842     wipe_ci_values(ci, time(NULL));
843     remove_from_queue(ci);
845     pthread_mutex_lock (&stats_lock);
846     assert (stats_queue_length > 0);
847     stats_queue_length--;
848     pthread_mutex_unlock (&stats_lock);
850     pthread_mutex_unlock (&cache_lock);
852     rrd_clear_error ();
853     status = rrd_update_r (file, NULL, values_num, (void *) values);
854     if (status != 0)
855     {
856       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
857           "rrd_update_r (%s) failed with status %i. (%s)",
858           file, status, rrd_get_error());
859     }
861     journal_write("wrote", file);
862     pthread_cond_broadcast(&ci->flushed);
864     for (i = 0; i < values_num; i++)
865       free (values[i]);
867     free(values);
868     free(file);
870     if (status == 0)
871     {
872       pthread_mutex_lock (&stats_lock);
873       stats_updates_written++;
874       stats_data_sets_written += values_num;
875       pthread_mutex_unlock (&stats_lock);
876     }
878     pthread_mutex_lock (&cache_lock);
880     /* We're about to shut down */
881     if (do_shutdown != 0 && !final_flush++)
882     {
883       if (config_flush_at_shutdown)
884           flush_old_values (-1); /* flush everything */
885       else
886         break;
887     }
888   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
889   pthread_mutex_unlock (&cache_lock);
891   if (config_flush_at_shutdown)
892   {
893     assert(cache_queue_head == NULL);
894     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
895   }
897   journal_done();
899   return (NULL);
900 } /* }}} void *queue_thread_main */
902 static int buffer_get_field (char **buffer_ret, /* {{{ */
903     size_t *buffer_size_ret, char **field_ret)
905   char *buffer;
906   size_t buffer_pos;
907   size_t buffer_size;
908   char *field;
909   size_t field_size;
910   int status;
912   buffer = *buffer_ret;
913   buffer_pos = 0;
914   buffer_size = *buffer_size_ret;
915   field = *buffer_ret;
916   field_size = 0;
918   if (buffer_size <= 0)
919     return (-1);
921   /* This is ensured by `handle_request'. */
922   assert (buffer[buffer_size - 1] == '\0');
924   status = -1;
925   while (buffer_pos < buffer_size)
926   {
927     /* Check for end-of-field or end-of-buffer */
928     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
929     {
930       field[field_size] = 0;
931       field_size++;
932       buffer_pos++;
933       status = 0;
934       break;
935     }
936     /* Handle escaped characters. */
937     else if (buffer[buffer_pos] == '\\')
938     {
939       if (buffer_pos >= (buffer_size - 1))
940         break;
941       buffer_pos++;
942       field[field_size] = buffer[buffer_pos];
943       field_size++;
944       buffer_pos++;
945     }
946     /* Normal operation */ 
947     else
948     {
949       field[field_size] = buffer[buffer_pos];
950       field_size++;
951       buffer_pos++;
952     }
953   } /* while (buffer_pos < buffer_size) */
955   if (status != 0)
956     return (status);
958   *buffer_ret = buffer + buffer_pos;
959   *buffer_size_ret = buffer_size - buffer_pos;
960   *field_ret = field;
962   return (0);
963 } /* }}} int buffer_get_field */
965 /* if we're restricting writes to the base directory,
966  * check whether the file falls within the dir
967  * returns 1 if OK, otherwise 0
968  */
969 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
971   assert(file != NULL);
973   if (!config_write_base_only
974       || sock == NULL /* journal replay */
975       || config_base_dir == NULL)
976     return 1;
978   if (strstr(file, "../") != NULL) goto err;
980   /* relative paths without "../" are ok */
981   if (*file != '/') return 1;
983   /* file must be of the format base + "/" + <1+ char filename> */
984   if (strlen(file) < _config_base_dir_len + 2) goto err;
985   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
986   if (*(file + _config_base_dir_len) != '/') goto err;
988   return 1;
990 err:
991   if (sock != NULL && sock->fd >= 0)
992     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
994   return 0;
995 } /* }}} static int check_file_access */
997 /* when using a base dir, convert relative paths to absolute paths.
998  * if necessary, modifies the "filename" pointer to point
999  * to the new path created in "tmp".  "tmp" is provided
1000  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1001  *
1002  * this allows us to optimize for the expected case (absolute path)
1003  * with a no-op.
1004  */
1005 static void get_abs_path(char **filename, char *tmp)
1007   assert(tmp != NULL);
1008   assert(filename != NULL && *filename != NULL);
1010   if (config_base_dir == NULL || **filename == '/')
1011     return;
1013   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1014   *filename = tmp;
1015 } /* }}} static int get_abs_path */
1017 /* returns 1 if we have the required privilege level,
1018  * otherwise issue an error to the user on sock */
1019 static int has_privilege (listen_socket_t *sock, /* {{{ */
1020                           socket_privilege priv)
1022   if (sock == NULL) /* journal replay */
1023     return 1;
1025   if (sock->privilege >= priv)
1026     return 1;
1028   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1029 } /* }}} static int has_privilege */
1031 static int flush_file (const char *filename) /* {{{ */
1033   cache_item_t *ci;
1035   pthread_mutex_lock (&cache_lock);
1037   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1038   if (ci == NULL)
1039   {
1040     pthread_mutex_unlock (&cache_lock);
1041     return (ENOENT);
1042   }
1044   if (ci->values_num > 0)
1045   {
1046     /* Enqueue at head */
1047     enqueue_cache_item (ci, HEAD);
1048     pthread_cond_wait(&ci->flushed, &cache_lock);
1049   }
1051   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1052    * may have been purged during our cond_wait() */
1054   pthread_mutex_unlock(&cache_lock);
1056   return (0);
1057 } /* }}} int flush_file */
1059 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1060     char *buffer, size_t buffer_size)
1062   int status;
1063   char **help_text;
1064   char *command;
1066   char *help_help[2] =
1067   {
1068     "Command overview\n"
1069     ,
1070     "HELP [<command>]\n"
1071     "FLUSH <filename>\n"
1072     "FLUSHALL\n"
1073     "PENDING <filename>\n"
1074     "FORGET <filename>\n"
1075     "UPDATE <filename> <values> [<values> ...]\n"
1076     "BATCH\n"
1077     "STATS\n"
1078   };
1080   char *help_flush[2] =
1081   {
1082     "Help for FLUSH\n"
1083     ,
1084     "Usage: FLUSH <filename>\n"
1085     "\n"
1086     "Adds the given filename to the head of the update queue and returns\n"
1087     "after is has been dequeued.\n"
1088   };
1090   char *help_flushall[2] =
1091   {
1092     "Help for FLUSHALL\n"
1093     ,
1094     "Usage: FLUSHALL\n"
1095     "\n"
1096     "Triggers writing of all pending updates.  Returns immediately.\n"
1097   };
1099   char *help_pending[2] =
1100   {
1101     "Help for PENDING\n"
1102     ,
1103     "Usage: PENDING <filename>\n"
1104     "\n"
1105     "Shows any 'pending' updates for a file, in order.\n"
1106     "The updates shown have not yet been written to the underlying RRD file.\n"
1107   };
1109   char *help_forget[2] =
1110   {
1111     "Help for FORGET\n"
1112     ,
1113     "Usage: FORGET <filename>\n"
1114     "\n"
1115     "Removes the file completely from the cache.\n"
1116     "Any pending updates for the file will be lost.\n"
1117   };
1119   char *help_update[2] =
1120   {
1121     "Help for UPDATE\n"
1122     ,
1123     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1124     "\n"
1125     "Adds the given file to the internal cache if it is not yet known and\n"
1126     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1127     "for details.\n"
1128     "\n"
1129     "Each <values> has the following form:\n"
1130     "  <values> = <time>:<value>[:<value>[...]]\n"
1131     "See the rrdupdate(1) manpage for details.\n"
1132   };
1134   char *help_stats[2] =
1135   {
1136     "Help for STATS\n"
1137     ,
1138     "Usage: STATS\n"
1139     "\n"
1140     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1141     "a description of the values.\n"
1142   };
1144   char *help_batch[2] =
1145   {
1146     "Help for BATCH\n"
1147     ,
1148     "The 'BATCH' command permits the client to initiate a bulk load\n"
1149     "   of commands to rrdcached.\n"
1150     "\n"
1151     "Usage:\n"
1152     "\n"
1153     "    client: BATCH\n"
1154     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1155     "    client: command #1\n"
1156     "    client: command #2\n"
1157     "    client: ... and so on\n"
1158     "    client: .\n"
1159     "    server: 2 errors\n"
1160     "    server: 7 message for command #7\n"
1161     "    server: 9 message for command #9\n"
1162     "\n"
1163     "For more information, consult the rrdcached(1) documentation.\n"
1164   };
1166   status = buffer_get_field (&buffer, &buffer_size, &command);
1167   if (status != 0)
1168     help_text = help_help;
1169   else
1170   {
1171     if (strcasecmp (command, "update") == 0)
1172       help_text = help_update;
1173     else if (strcasecmp (command, "flush") == 0)
1174       help_text = help_flush;
1175     else if (strcasecmp (command, "flushall") == 0)
1176       help_text = help_flushall;
1177     else if (strcasecmp (command, "pending") == 0)
1178       help_text = help_pending;
1179     else if (strcasecmp (command, "forget") == 0)
1180       help_text = help_forget;
1181     else if (strcasecmp (command, "stats") == 0)
1182       help_text = help_stats;
1183     else if (strcasecmp (command, "batch") == 0)
1184       help_text = help_batch;
1185     else
1186       help_text = help_help;
1187   }
1189   add_response_info(sock, help_text[1]);
1190   return send_response(sock, RESP_OK, help_text[0]);
1191 } /* }}} int handle_request_help */
1193 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1195   uint64_t copy_queue_length;
1196   uint64_t copy_updates_received;
1197   uint64_t copy_flush_received;
1198   uint64_t copy_updates_written;
1199   uint64_t copy_data_sets_written;
1200   uint64_t copy_journal_bytes;
1201   uint64_t copy_journal_rotate;
1203   uint64_t tree_nodes_number;
1204   uint64_t tree_depth;
1206   pthread_mutex_lock (&stats_lock);
1207   copy_queue_length       = stats_queue_length;
1208   copy_updates_received   = stats_updates_received;
1209   copy_flush_received     = stats_flush_received;
1210   copy_updates_written    = stats_updates_written;
1211   copy_data_sets_written  = stats_data_sets_written;
1212   copy_journal_bytes      = stats_journal_bytes;
1213   copy_journal_rotate     = stats_journal_rotate;
1214   pthread_mutex_unlock (&stats_lock);
1216   pthread_mutex_lock (&cache_lock);
1217   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1218   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1219   pthread_mutex_unlock (&cache_lock);
1221   add_response_info(sock,
1222                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1223   add_response_info(sock,
1224                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1225   add_response_info(sock,
1226                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1227   add_response_info(sock,
1228                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1229   add_response_info(sock,
1230                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1231   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1232   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1233   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1234   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1236   send_response(sock, RESP_OK, "Statistics follow\n");
1238   return (0);
1239 } /* }}} int handle_request_stats */
1241 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1242     char *buffer, size_t buffer_size)
1244   char *file, file_tmp[PATH_MAX];
1245   int status;
1247   status = buffer_get_field (&buffer, &buffer_size, &file);
1248   if (status != 0)
1249   {
1250     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1251   }
1252   else
1253   {
1254     pthread_mutex_lock(&stats_lock);
1255     stats_flush_received++;
1256     pthread_mutex_unlock(&stats_lock);
1258     get_abs_path(&file, file_tmp);
1259     if (!check_file_access(file, sock)) return 0;
1261     status = flush_file (file);
1262     if (status == 0)
1263       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1264     else if (status == ENOENT)
1265     {
1266       /* no file in our tree; see whether it exists at all */
1267       struct stat statbuf;
1269       memset(&statbuf, 0, sizeof(statbuf));
1270       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1271         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1272       else
1273         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1274     }
1275     else if (status < 0)
1276       return send_response(sock, RESP_ERR, "Internal error.\n");
1277     else
1278       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1279   }
1281   /* NOTREACHED */
1282   assert(1==0);
1283 } /* }}} int handle_request_flush */
1285 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1287   int status;
1289   status = has_privilege(sock, PRIV_HIGH);
1290   if (status <= 0)
1291     return status;
1293   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1295   pthread_mutex_lock(&cache_lock);
1296   flush_old_values(-1);
1297   pthread_mutex_unlock(&cache_lock);
1299   return send_response(sock, RESP_OK, "Started flush.\n");
1300 } /* }}} static int handle_request_flushall */
1302 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1303                                   char *buffer, size_t buffer_size)
1305   int status;
1306   char *file, file_tmp[PATH_MAX];
1307   cache_item_t *ci;
1309   status = buffer_get_field(&buffer, &buffer_size, &file);
1310   if (status != 0)
1311     return send_response(sock, RESP_ERR,
1312                          "Usage: PENDING <filename>\n");
1314   status = has_privilege(sock, PRIV_HIGH);
1315   if (status <= 0)
1316     return status;
1318   get_abs_path(&file, file_tmp);
1320   pthread_mutex_lock(&cache_lock);
1321   ci = g_tree_lookup(cache_tree, file);
1322   if (ci == NULL)
1323   {
1324     pthread_mutex_unlock(&cache_lock);
1325     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1326   }
1328   for (int i=0; i < ci->values_num; i++)
1329     add_response_info(sock, "%s\n", ci->values[i]);
1331   pthread_mutex_unlock(&cache_lock);
1332   return send_response(sock, RESP_OK, "updates pending\n");
1333 } /* }}} static int handle_request_pending */
1335 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1336                                  char *buffer, size_t buffer_size)
1338   int status;
1339   char *file, file_tmp[PATH_MAX];
1341   status = buffer_get_field(&buffer, &buffer_size, &file);
1342   if (status != 0)
1343     return send_response(sock, RESP_ERR,
1344                          "Usage: FORGET <filename>\n");
1346   status = has_privilege(sock, PRIV_HIGH);
1347   if (status <= 0)
1348     return status;
1350   get_abs_path(&file, file_tmp);
1351   if (!check_file_access(file, sock)) return 0;
1353   pthread_mutex_lock(&cache_lock);
1354   status = forget_file(file);
1355   pthread_mutex_unlock(&cache_lock);
1357   if (status == 0)
1358   {
1359     if (sock != NULL)
1360       journal_write("forget", file);
1362     return send_response(sock, RESP_OK, "Gone!\n");
1363   }
1364   else
1365     return send_response(sock, RESP_ERR, "cannot forget: %s\n",
1366                          status < 0 ? "Internal error" : rrd_strerror(status));
1368   /* NOTREACHED */
1369   assert(1==0);
1370 } /* }}} static int handle_request_forget */
1372 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1373                                   time_t now,
1374                                   char *buffer, size_t buffer_size)
1376   char *file, file_tmp[PATH_MAX];
1377   int values_num = 0;
1378   int bad_timestamps = 0;
1379   int status;
1380   char orig_buf[CMD_MAX];
1382   cache_item_t *ci;
1384   status = has_privilege(sock, PRIV_HIGH);
1385   if (status <= 0)
1386     return status;
1388   /* save it for the journal later */
1389   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1391   status = buffer_get_field (&buffer, &buffer_size, &file);
1392   if (status != 0)
1393     return send_response(sock, RESP_ERR,
1394                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1396   pthread_mutex_lock(&stats_lock);
1397   stats_updates_received++;
1398   pthread_mutex_unlock(&stats_lock);
1400   get_abs_path(&file, file_tmp);
1401   if (!check_file_access(file, sock)) return 0;
1403   pthread_mutex_lock (&cache_lock);
1404   ci = g_tree_lookup (cache_tree, file);
1406   if (ci == NULL) /* {{{ */
1407   {
1408     struct stat statbuf;
1410     /* don't hold the lock while we setup; stat(2) might block */
1411     pthread_mutex_unlock(&cache_lock);
1413     memset (&statbuf, 0, sizeof (statbuf));
1414     status = stat (file, &statbuf);
1415     if (status != 0)
1416     {
1417       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1419       status = errno;
1420       if (status == ENOENT)
1421         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1422       else
1423         return send_response(sock, RESP_ERR,
1424                              "stat failed with error %i.\n", status);
1425     }
1426     if (!S_ISREG (statbuf.st_mode))
1427       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1429     if (access(file, R_OK|W_OK) != 0)
1430       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1431                            file, rrd_strerror(errno));
1433     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1434     if (ci == NULL)
1435     {
1436       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1438       return send_response(sock, RESP_ERR, "malloc failed.\n");
1439     }
1440     memset (ci, 0, sizeof (cache_item_t));
1442     ci->file = strdup (file);
1443     if (ci->file == NULL)
1444     {
1445       free (ci);
1446       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1448       return send_response(sock, RESP_ERR, "strdup failed.\n");
1449     }
1451     wipe_ci_values(ci, now);
1452     ci->flags = CI_FLAGS_IN_TREE;
1454     pthread_mutex_lock(&cache_lock);
1455     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1456   } /* }}} */
1457   assert (ci != NULL);
1459   /* don't re-write updates in replay mode */
1460   if (sock != NULL)
1461     journal_write("update", orig_buf);
1463   while (buffer_size > 0)
1464   {
1465     char **temp;
1466     char *value;
1467     time_t stamp;
1468     char *eostamp;
1470     status = buffer_get_field (&buffer, &buffer_size, &value);
1471     if (status != 0)
1472     {
1473       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1474       break;
1475     }
1477     /* make sure update time is always moving forward */
1478     stamp = strtol(value, &eostamp, 10);
1479     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1480     {
1481       ++bad_timestamps;
1482       add_response_info(sock, "Cannot find timestamp in '%s'!\n", value);
1483       continue;
1484     }
1485     else if (stamp <= ci->last_update_stamp)
1486     {
1487       ++bad_timestamps;
1488       add_response_info(sock,
1489                         "illegal attempt to update using time %ld when"
1490                         " last update time is %ld (minimum one second step)\n",
1491                         stamp, ci->last_update_stamp);
1492       continue;
1493     }
1494     else
1495       ci->last_update_stamp = stamp;
1497     temp = (char **) realloc (ci->values,
1498         sizeof (char *) * (ci->values_num + 1));
1499     if (temp == NULL)
1500     {
1501       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1502       continue;
1503     }
1504     ci->values = temp;
1506     ci->values[ci->values_num] = strdup (value);
1507     if (ci->values[ci->values_num] == NULL)
1508     {
1509       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1510       continue;
1511     }
1512     ci->values_num++;
1514     values_num++;
1515   }
1517   if (((now - ci->last_flush_time) >= config_write_interval)
1518       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1519       && (ci->values_num > 0))
1520   {
1521     enqueue_cache_item (ci, TAIL);
1522   }
1524   pthread_mutex_unlock (&cache_lock);
1526   if (values_num < 1)
1527   {
1528     /* if we had only one update attempt, then return the full
1529        error message... try to get the most information out
1530        of the limited error space allowed by the protocol
1531     */
1532     if (bad_timestamps == 1)
1533       return send_response(sock, RESP_ERR, "%s", sock->wbuf);
1534     else
1535       return send_response(sock, RESP_ERR,
1536                            "No values updated (%d bad timestamps).\n",
1537                            bad_timestamps);
1538   }
1539   else
1540     return send_response(sock, RESP_OK,
1541                          "errors, enqueued %i value(s).\n", values_num);
1543   /* NOTREACHED */
1544   assert(1==0);
1546 } /* }}} int handle_request_update */
1548 /* we came across a "WROTE" entry during journal replay.
1549  * throw away any values that we have accumulated for this file
1550  */
1551 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1553   int i;
1554   cache_item_t *ci;
1555   const char *file = buffer;
1557   pthread_mutex_lock(&cache_lock);
1559   ci = g_tree_lookup(cache_tree, file);
1560   if (ci == NULL)
1561   {
1562     pthread_mutex_unlock(&cache_lock);
1563     return (0);
1564   }
1566   if (ci->values)
1567   {
1568     for (i=0; i < ci->values_num; i++)
1569       free(ci->values[i]);
1571     free(ci->values);
1572   }
1574   wipe_ci_values(ci, now);
1575   remove_from_queue(ci);
1577   pthread_mutex_unlock(&cache_lock);
1578   return (0);
1579 } /* }}} int handle_request_wrote */
1581 /* start "BATCH" processing */
1582 static int batch_start (listen_socket_t *sock) /* {{{ */
1584   int status;
1585   if (sock->batch_start)
1586     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1588   status = send_response(sock, RESP_OK,
1589                          "Go ahead.  End with dot '.' on its own line.\n");
1590   sock->batch_start = time(NULL);
1591   sock->batch_cmd = 0;
1593   return status;
1594 } /* }}} static int batch_start */
1596 /* finish "BATCH" processing and return results to the client */
1597 static int batch_done (listen_socket_t *sock) /* {{{ */
1599   assert(sock->batch_start);
1600   sock->batch_start = 0;
1601   sock->batch_cmd  = 0;
1602   return send_response(sock, RESP_OK, "errors\n");
1603 } /* }}} static int batch_done */
1605 /* if sock==NULL, we are in journal replay mode */
1606 static int handle_request (listen_socket_t *sock, /* {{{ */
1607                            time_t now,
1608                            char *buffer, size_t buffer_size)
1610   char *buffer_ptr;
1611   char *command;
1612   int status;
1614   assert (buffer[buffer_size - 1] == '\0');
1616   buffer_ptr = buffer;
1617   command = NULL;
1618   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1619   if (status != 0)
1620   {
1621     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1622     return (-1);
1623   }
1625   if (sock != NULL && sock->batch_start)
1626     sock->batch_cmd++;
1628   if (strcasecmp (command, "update") == 0)
1629     return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1630   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1631   {
1632     /* this is only valid in replay mode */
1633     return (handle_request_wrote (buffer_ptr, now));
1634   }
1635   else if (strcasecmp (command, "flush") == 0)
1636     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1637   else if (strcasecmp (command, "flushall") == 0)
1638     return (handle_request_flushall(sock));
1639   else if (strcasecmp (command, "pending") == 0)
1640     return (handle_request_pending(sock, buffer_ptr, buffer_size));
1641   else if (strcasecmp (command, "forget") == 0)
1642     return (handle_request_forget(sock, buffer_ptr, buffer_size));
1643   else if (strcasecmp (command, "stats") == 0)
1644     return (handle_request_stats (sock));
1645   else if (strcasecmp (command, "help") == 0)
1646     return (handle_request_help (sock, buffer_ptr, buffer_size));
1647   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1648     return batch_start(sock);
1649   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1650     return batch_done(sock);
1651   else
1652     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1654   /* NOTREACHED */
1655   assert(1==0);
1656 } /* }}} int handle_request */
1658 /* MUST NOT hold journal_lock before calling this */
1659 static void journal_rotate(void) /* {{{ */
1661   FILE *old_fh = NULL;
1662   int new_fd;
1664   if (journal_cur == NULL || journal_old == NULL)
1665     return;
1667   pthread_mutex_lock(&journal_lock);
1669   /* we rotate this way (rename before close) so that the we can release
1670    * the journal lock as fast as possible.  Journal writes to the new
1671    * journal can proceed immediately after the new file is opened.  The
1672    * fclose can then block without affecting new updates.
1673    */
1674   if (journal_fh != NULL)
1675   {
1676     old_fh = journal_fh;
1677     journal_fh = NULL;
1678     rename(journal_cur, journal_old);
1679     ++stats_journal_rotate;
1680   }
1682   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1683                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1684   if (new_fd >= 0)
1685   {
1686     journal_fh = fdopen(new_fd, "a");
1687     if (journal_fh == NULL)
1688       close(new_fd);
1689   }
1691   pthread_mutex_unlock(&journal_lock);
1693   if (old_fh != NULL)
1694     fclose(old_fh);
1696   if (journal_fh == NULL)
1697   {
1698     RRDD_LOG(LOG_CRIT,
1699              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1700              journal_cur, rrd_strerror(errno));
1702     RRDD_LOG(LOG_ERR,
1703              "JOURNALING DISABLED: All values will be flushed at shutdown");
1704     config_flush_at_shutdown = 1;
1705   }
1707 } /* }}} static void journal_rotate */
1709 static void journal_done(void) /* {{{ */
1711   if (journal_cur == NULL)
1712     return;
1714   pthread_mutex_lock(&journal_lock);
1715   if (journal_fh != NULL)
1716   {
1717     fclose(journal_fh);
1718     journal_fh = NULL;
1719   }
1721   if (config_flush_at_shutdown)
1722   {
1723     RRDD_LOG(LOG_INFO, "removing journals");
1724     unlink(journal_old);
1725     unlink(journal_cur);
1726   }
1727   else
1728   {
1729     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1730              "journals will be used at next startup");
1731   }
1733   pthread_mutex_unlock(&journal_lock);
1735 } /* }}} static void journal_done */
1737 static int journal_write(char *cmd, char *args) /* {{{ */
1739   int chars;
1741   if (journal_fh == NULL)
1742     return 0;
1744   pthread_mutex_lock(&journal_lock);
1745   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1746   pthread_mutex_unlock(&journal_lock);
1748   if (chars > 0)
1749   {
1750     pthread_mutex_lock(&stats_lock);
1751     stats_journal_bytes += chars;
1752     pthread_mutex_unlock(&stats_lock);
1753   }
1755   return chars;
1756 } /* }}} static int journal_write */
1758 static int journal_replay (const char *file) /* {{{ */
1760   FILE *fh;
1761   int entry_cnt = 0;
1762   int fail_cnt = 0;
1763   uint64_t line = 0;
1764   char entry[CMD_MAX];
1765   time_t now;
1767   if (file == NULL) return 0;
1769   {
1770     char *reason;
1771     int status = 0;
1772     struct stat statbuf;
1774     memset(&statbuf, 0, sizeof(statbuf));
1775     if (stat(file, &statbuf) != 0)
1776     {
1777       if (errno == ENOENT)
1778         return 0;
1780       reason = "stat error";
1781       status = errno;
1782     }
1783     else if (!S_ISREG(statbuf.st_mode))
1784     {
1785       reason = "not a regular file";
1786       status = EPERM;
1787     }
1788     if (statbuf.st_uid != daemon_uid)
1789     {
1790       reason = "not owned by daemon user";
1791       status = EACCES;
1792     }
1793     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1794     {
1795       reason = "must not be user/group writable";
1796       status = EACCES;
1797     }
1799     if (status != 0)
1800     {
1801       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1802                file, rrd_strerror(status), reason);
1803       return 0;
1804     }
1805   }
1807   fh = fopen(file, "r");
1808   if (fh == NULL)
1809   {
1810     if (errno != ENOENT)
1811       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1812                file, rrd_strerror(errno));
1813     return 0;
1814   }
1815   else
1816     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1818   now = time(NULL);
1820   while(!feof(fh))
1821   {
1822     size_t entry_len;
1824     ++line;
1825     if (fgets(entry, sizeof(entry), fh) == NULL)
1826       break;
1827     entry_len = strlen(entry);
1829     /* check \n termination in case journal writing crashed mid-line */
1830     if (entry_len == 0)
1831       continue;
1832     else if (entry[entry_len - 1] != '\n')
1833     {
1834       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1835       ++fail_cnt;
1836       continue;
1837     }
1839     entry[entry_len - 1] = '\0';
1841     if (handle_request(NULL, now, entry, entry_len) == 0)
1842       ++entry_cnt;
1843     else
1844       ++fail_cnt;
1845   }
1847   fclose(fh);
1849   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1850            entry_cnt, fail_cnt);
1852   return entry_cnt > 0 ? 1 : 0;
1853 } /* }}} static int journal_replay */
1855 static void journal_init(void) /* {{{ */
1857   int had_journal = 0;
1859   if (journal_cur == NULL) return;
1861   pthread_mutex_lock(&journal_lock);
1863   RRDD_LOG(LOG_INFO, "checking for journal files");
1865   had_journal += journal_replay(journal_old);
1866   had_journal += journal_replay(journal_cur);
1868   /* it must have been a crash.  start a flush */
1869   if (had_journal && config_flush_at_shutdown)
1870     flush_old_values(-1);
1872   pthread_mutex_unlock(&journal_lock);
1873   journal_rotate();
1875   RRDD_LOG(LOG_INFO, "journal processing complete");
1877 } /* }}} static void journal_init */
1879 static void close_connection(listen_socket_t *sock)
1881   close(sock->fd) ;  sock->fd   = -1;
1882   free(sock->rbuf);  sock->rbuf = NULL;
1883   free(sock->wbuf);  sock->wbuf = NULL;
1885   free(sock);
1888 static void *connection_thread_main (void *args) /* {{{ */
1890   pthread_t self;
1891   listen_socket_t *sock;
1892   int i;
1893   int fd;
1895   sock = (listen_socket_t *) args;
1896   fd = sock->fd;
1898   /* init read buffers */
1899   sock->next_read = sock->next_cmd = 0;
1900   sock->rbuf = malloc(RBUF_SIZE);
1901   if (sock->rbuf == NULL)
1902   {
1903     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1904     close_connection(sock);
1905     return NULL;
1906   }
1908   pthread_mutex_lock (&connection_threads_lock);
1909   {
1910     pthread_t *temp;
1912     temp = (pthread_t *) realloc (connection_threads,
1913         sizeof (pthread_t) * (connection_threads_num + 1));
1914     if (temp == NULL)
1915     {
1916       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1917     }
1918     else
1919     {
1920       connection_threads = temp;
1921       connection_threads[connection_threads_num] = pthread_self ();
1922       connection_threads_num++;
1923     }
1924   }
1925   pthread_mutex_unlock (&connection_threads_lock);
1927   while (do_shutdown == 0)
1928   {
1929     char *cmd;
1930     ssize_t cmd_len;
1931     ssize_t rbytes;
1932     time_t now;
1934     struct pollfd pollfd;
1935     int status;
1937     pollfd.fd = fd;
1938     pollfd.events = POLLIN | POLLPRI;
1939     pollfd.revents = 0;
1941     status = poll (&pollfd, 1, /* timeout = */ 500);
1942     if (do_shutdown)
1943       break;
1944     else if (status == 0) /* timeout */
1945       continue;
1946     else if (status < 0) /* error */
1947     {
1948       status = errno;
1949       if (status != EINTR)
1950         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1951       continue;
1952     }
1954     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1955       break;
1956     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1957     {
1958       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1959           "poll(2) returned something unexpected: %#04hx",
1960           pollfd.revents);
1961       break;
1962     }
1964     rbytes = read(fd, sock->rbuf + sock->next_read,
1965                   RBUF_SIZE - sock->next_read);
1966     if (rbytes < 0)
1967     {
1968       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1969       break;
1970     }
1971     else if (rbytes == 0)
1972       break; /* eof */
1974     sock->next_read += rbytes;
1976     if (sock->batch_start)
1977       now = sock->batch_start;
1978     else
1979       now = time(NULL);
1981     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1982     {
1983       status = handle_request (sock, now, cmd, cmd_len+1);
1984       if (status != 0)
1985         goto out_close;
1986     }
1987   }
1989 out_close:
1990   close_connection(sock);
1992   self = pthread_self ();
1993   /* Remove this thread from the connection threads list */
1994   pthread_mutex_lock (&connection_threads_lock);
1995   /* Find out own index in the array */
1996   for (i = 0; i < connection_threads_num; i++)
1997     if (pthread_equal (connection_threads[i], self) != 0)
1998       break;
1999   assert (i < connection_threads_num);
2001   /* Move the trailing threads forward. */
2002   if (i < (connection_threads_num - 1))
2003   {
2004     memmove (connection_threads + i,
2005         connection_threads + i + 1,
2006         sizeof (pthread_t) * (connection_threads_num - i - 1));
2007   }
2009   connection_threads_num--;
2010   pthread_mutex_unlock (&connection_threads_lock);
2012   return (NULL);
2013 } /* }}} void *connection_thread_main */
2015 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2017   int fd;
2018   struct sockaddr_un sa;
2019   listen_socket_t *temp;
2020   int status;
2021   const char *path;
2023   path = sock->addr;
2024   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2025     path += strlen("unix:");
2027   temp = (listen_socket_t *) realloc (listen_fds,
2028       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2029   if (temp == NULL)
2030   {
2031     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
2032     return (-1);
2033   }
2034   listen_fds = temp;
2035   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2037   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2038   if (fd < 0)
2039   {
2040     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
2041     return (-1);
2042   }
2044   memset (&sa, 0, sizeof (sa));
2045   sa.sun_family = AF_UNIX;
2046   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2048   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2049   if (status != 0)
2050   {
2051     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
2052     close (fd);
2053     unlink (path);
2054     return (-1);
2055   }
2057   status = listen (fd, /* backlog = */ 10);
2058   if (status != 0)
2059   {
2060     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
2061     close (fd);
2062     unlink (path);
2063     return (-1);
2064   }
2066   listen_fds[listen_fds_num].fd = fd;
2067   listen_fds[listen_fds_num].family = PF_UNIX;
2068   strncpy(listen_fds[listen_fds_num].addr, path,
2069           sizeof (listen_fds[listen_fds_num].addr) - 1);
2070   listen_fds_num++;
2072   return (0);
2073 } /* }}} int open_listen_socket_unix */
2075 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2077   struct addrinfo ai_hints;
2078   struct addrinfo *ai_res;
2079   struct addrinfo *ai_ptr;
2080   char addr_copy[NI_MAXHOST];
2081   char *addr;
2082   char *port;
2083   int status;
2085   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2086   addr_copy[sizeof (addr_copy) - 1] = 0;
2087   addr = addr_copy;
2089   memset (&ai_hints, 0, sizeof (ai_hints));
2090   ai_hints.ai_flags = 0;
2091 #ifdef AI_ADDRCONFIG
2092   ai_hints.ai_flags |= AI_ADDRCONFIG;
2093 #endif
2094   ai_hints.ai_family = AF_UNSPEC;
2095   ai_hints.ai_socktype = SOCK_STREAM;
2097   port = NULL;
2098   if (*addr == '[') /* IPv6+port format */
2099   {
2100     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2101     addr++;
2103     port = strchr (addr, ']');
2104     if (port == NULL)
2105     {
2106       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
2107           sock->addr);
2108       return (-1);
2109     }
2110     *port = 0;
2111     port++;
2113     if (*port == ':')
2114       port++;
2115     else if (*port == 0)
2116       port = NULL;
2117     else
2118     {
2119       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
2120           port);
2121       return (-1);
2122     }
2123   } /* if (*addr = ']') */
2124   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2125   {
2126     port = rindex(addr, ':');
2127     if (port != NULL)
2128     {
2129       *port = 0;
2130       port++;
2131     }
2132   }
2133   ai_res = NULL;
2134   status = getaddrinfo (addr,
2135                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2136                         &ai_hints, &ai_res);
2137   if (status != 0)
2138   {
2139     RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
2140         "%s", addr, gai_strerror (status));
2141     return (-1);
2142   }
2144   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2145   {
2146     int fd;
2147     listen_socket_t *temp;
2148     int one = 1;
2150     temp = (listen_socket_t *) realloc (listen_fds,
2151         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2152     if (temp == NULL)
2153     {
2154       RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
2155       continue;
2156     }
2157     listen_fds = temp;
2158     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2160     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2161     if (fd < 0)
2162     {
2163       RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
2164       continue;
2165     }
2167     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2169     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2170     if (status != 0)
2171     {
2172       RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
2173       close (fd);
2174       continue;
2175     }
2177     status = listen (fd, /* backlog = */ 10);
2178     if (status != 0)
2179     {
2180       RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
2181       close (fd);
2182       return (-1);
2183     }
2185     listen_fds[listen_fds_num].fd = fd;
2186     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2187     listen_fds_num++;
2188   } /* for (ai_ptr) */
2190   return (0);
2191 } /* }}} static int open_listen_socket_network */
2193 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2195   assert(sock != NULL);
2196   assert(sock->addr != NULL);
2198   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2199       || sock->addr[0] == '/')
2200     return (open_listen_socket_unix(sock));
2201   else
2202     return (open_listen_socket_network(sock));
2203 } /* }}} int open_listen_socket */
2205 static int close_listen_sockets (void) /* {{{ */
2207   size_t i;
2209   for (i = 0; i < listen_fds_num; i++)
2210   {
2211     close (listen_fds[i].fd);
2213     if (listen_fds[i].family == PF_UNIX)
2214       unlink(listen_fds[i].addr);
2215   }
2217   free (listen_fds);
2218   listen_fds = NULL;
2219   listen_fds_num = 0;
2221   return (0);
2222 } /* }}} int close_listen_sockets */
2224 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2226   struct pollfd *pollfds;
2227   int pollfds_num;
2228   int status;
2229   int i;
2231   for (i = 0; i < config_listen_address_list_len; i++)
2232     open_listen_socket (config_listen_address_list[i]);
2234   if (config_listen_address_list_len < 1)
2235   {
2236     listen_socket_t sock;
2237     memset(&sock, 0, sizeof(sock));
2238     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2239     open_listen_socket (&sock);
2240   }
2242   if (listen_fds_num < 1)
2243   {
2244     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
2245         "could be opened. Sorry.");
2246     return (NULL);
2247   }
2249   pollfds_num = listen_fds_num;
2250   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2251   if (pollfds == NULL)
2252   {
2253     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2254     return (NULL);
2255   }
2256   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2258   RRDD_LOG(LOG_INFO, "listening for connections");
2260   while (do_shutdown == 0)
2261   {
2262     assert (pollfds_num == ((int) listen_fds_num));
2263     for (i = 0; i < pollfds_num; i++)
2264     {
2265       pollfds[i].fd = listen_fds[i].fd;
2266       pollfds[i].events = POLLIN | POLLPRI;
2267       pollfds[i].revents = 0;
2268     }
2270     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2271     if (do_shutdown)
2272       break;
2273     else if (status == 0) /* timeout */
2274       continue;
2275     else if (status < 0) /* error */
2276     {
2277       status = errno;
2278       if (status != EINTR)
2279       {
2280         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2281       }
2282       continue;
2283     }
2285     for (i = 0; i < pollfds_num; i++)
2286     {
2287       listen_socket_t *client_sock;
2288       struct sockaddr_storage client_sa;
2289       socklen_t client_sa_size;
2290       pthread_t tid;
2291       pthread_attr_t attr;
2293       if (pollfds[i].revents == 0)
2294         continue;
2296       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2297       {
2298         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2299             "poll(2) returned something unexpected for listen FD #%i.",
2300             pollfds[i].fd);
2301         continue;
2302       }
2304       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2305       if (client_sock == NULL)
2306       {
2307         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2308         continue;
2309       }
2310       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2312       client_sa_size = sizeof (client_sa);
2313       client_sock->fd = accept (pollfds[i].fd,
2314           (struct sockaddr *) &client_sa, &client_sa_size);
2315       if (client_sock->fd < 0)
2316       {
2317         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2318         free(client_sock);
2319         continue;
2320       }
2322       pthread_attr_init (&attr);
2323       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2325       status = pthread_create (&tid, &attr, connection_thread_main,
2326                                client_sock);
2327       if (status != 0)
2328       {
2329         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2330         close_connection(client_sock);
2331         continue;
2332       }
2333     } /* for (pollfds_num) */
2334   } /* while (do_shutdown == 0) */
2336   RRDD_LOG(LOG_INFO, "starting shutdown");
2338   close_listen_sockets ();
2340   pthread_mutex_lock (&connection_threads_lock);
2341   while (connection_threads_num > 0)
2342   {
2343     pthread_t wait_for;
2345     wait_for = connection_threads[0];
2347     pthread_mutex_unlock (&connection_threads_lock);
2348     pthread_join (wait_for, /* retval = */ NULL);
2349     pthread_mutex_lock (&connection_threads_lock);
2350   }
2351   pthread_mutex_unlock (&connection_threads_lock);
2353   return (NULL);
2354 } /* }}} void *listen_thread_main */
2356 static int daemonize (void) /* {{{ */
2358   int status;
2359   int pid_fd;
2360   char *base_dir;
2362   daemon_uid = geteuid();
2364   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2365   if (pid_fd < 0)
2366     pid_fd = check_pidfile();
2367   if (pid_fd < 0)
2368     return pid_fd;
2370   if (!stay_foreground)
2371   {
2372     pid_t child;
2374     child = fork ();
2375     if (child < 0)
2376     {
2377       fprintf (stderr, "daemonize: fork(2) failed.\n");
2378       return (-1);
2379     }
2380     else if (child > 0)
2381     {
2382       return (1);
2383     }
2385     /* Become session leader */
2386     setsid ();
2388     /* Open the first three file descriptors to /dev/null */
2389     close (2);
2390     close (1);
2391     close (0);
2393     open ("/dev/null", O_RDWR);
2394     dup (0);
2395     dup (0);
2396   } /* if (!stay_foreground) */
2398   /* Change into the /tmp directory. */
2399   base_dir = (config_base_dir != NULL)
2400     ? config_base_dir
2401     : "/tmp";
2402   status = chdir (base_dir);
2403   if (status != 0)
2404   {
2405     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2406     return (-1);
2407   }
2409   install_signal_handlers();
2411   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2412   RRDD_LOG(LOG_INFO, "starting up");
2414   cache_tree = g_tree_new ((GCompareFunc) strcmp);
2415   if (cache_tree == NULL)
2416   {
2417     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2418     return (-1);
2419   }
2421   status = write_pidfile (pid_fd);
2422   return status;
2423 } /* }}} int daemonize */
2425 static int cleanup (void) /* {{{ */
2427   do_shutdown++;
2429   pthread_cond_signal (&cache_cond);
2430   pthread_join (queue_thread, /* return = */ NULL);
2432   remove_pidfile ();
2434   RRDD_LOG(LOG_INFO, "goodbye");
2435   closelog ();
2437   return (0);
2438 } /* }}} int cleanup */
2440 static int read_options (int argc, char **argv) /* {{{ */
2442   int option;
2443   int status = 0;
2445   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2446   {
2447     switch (option)
2448     {
2449       case 'g':
2450         stay_foreground=1;
2451         break;
2453       case 'L':
2454       case 'l':
2455       {
2456         listen_socket_t **temp;
2457         listen_socket_t *new;
2459         new = malloc(sizeof(listen_socket_t));
2460         if (new == NULL)
2461         {
2462           fprintf(stderr, "read_options: malloc failed.\n");
2463           return(2);
2464         }
2465         memset(new, 0, sizeof(listen_socket_t));
2467         temp = (listen_socket_t **) realloc (config_listen_address_list,
2468             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2469         if (temp == NULL)
2470         {
2471           fprintf (stderr, "read_options: realloc failed.\n");
2472           return (2);
2473         }
2474         config_listen_address_list = temp;
2476         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2477         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2479         temp[config_listen_address_list_len] = new;
2480         config_listen_address_list_len++;
2481       }
2482       break;
2484       case 'f':
2485       {
2486         int temp;
2488         temp = atoi (optarg);
2489         if (temp > 0)
2490           config_flush_interval = temp;
2491         else
2492         {
2493           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2494           status = 3;
2495         }
2496       }
2497       break;
2499       case 'w':
2500       {
2501         int temp;
2503         temp = atoi (optarg);
2504         if (temp > 0)
2505           config_write_interval = temp;
2506         else
2507         {
2508           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2509           status = 2;
2510         }
2511       }
2512       break;
2514       case 'z':
2515       {
2516         int temp;
2518         temp = atoi(optarg);
2519         if (temp > 0)
2520           config_write_jitter = temp;
2521         else
2522         {
2523           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2524           status = 2;
2525         }
2527         break;
2528       }
2530       case 'B':
2531         config_write_base_only = 1;
2532         break;
2534       case 'b':
2535       {
2536         size_t len;
2537         char base_realpath[PATH_MAX];
2539         if (config_base_dir != NULL)
2540           free (config_base_dir);
2541         config_base_dir = strdup (optarg);
2542         if (config_base_dir == NULL)
2543         {
2544           fprintf (stderr, "read_options: strdup failed.\n");
2545           return (3);
2546         }
2548         /* make sure that the base directory is not resolved via
2549          * symbolic links.  this makes some performance-enhancing
2550          * assumptions possible (we don't have to resolve paths
2551          * that start with a "/")
2552          */
2553         if (realpath(config_base_dir, base_realpath) == NULL)
2554         {
2555           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2556           return 5;
2557         }
2558         else if (strncmp(config_base_dir,
2559                          base_realpath, sizeof(base_realpath)) != 0)
2560         {
2561           fprintf(stderr,
2562                   "Base directory (-b) resolved via file system links!\n"
2563                   "Please consult rrdcached '-b' documentation!\n"
2564                   "Consider specifying the real directory (%s)\n",
2565                   base_realpath);
2566           return 5;
2567         }
2569         len = strlen (config_base_dir);
2570         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2571         {
2572           config_base_dir[len - 1] = 0;
2573           len--;
2574         }
2576         if (len < 1)
2577         {
2578           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2579           return (4);
2580         }
2582         _config_base_dir_len = len;
2583       }
2584       break;
2586       case 'p':
2587       {
2588         if (config_pid_file != NULL)
2589           free (config_pid_file);
2590         config_pid_file = strdup (optarg);
2591         if (config_pid_file == NULL)
2592         {
2593           fprintf (stderr, "read_options: strdup failed.\n");
2594           return (3);
2595         }
2596       }
2597       break;
2599       case 'F':
2600         config_flush_at_shutdown = 1;
2601         break;
2603       case 'j':
2604       {
2605         struct stat statbuf;
2606         const char *dir = optarg;
2608         status = stat(dir, &statbuf);
2609         if (status != 0)
2610         {
2611           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2612           return 6;
2613         }
2615         if (!S_ISDIR(statbuf.st_mode)
2616             || access(dir, R_OK|W_OK|X_OK) != 0)
2617         {
2618           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2619                   errno ? rrd_strerror(errno) : "");
2620           return 6;
2621         }
2623         journal_cur = malloc(PATH_MAX + 1);
2624         journal_old = malloc(PATH_MAX + 1);
2625         if (journal_cur == NULL || journal_old == NULL)
2626         {
2627           fprintf(stderr, "malloc failure for journal files\n");
2628           return 6;
2629         }
2630         else 
2631         {
2632           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2633           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2634         }
2635       }
2636       break;
2638       case 'h':
2639       case '?':
2640         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2641             "\n"
2642             "Usage: rrdcached [options]\n"
2643             "\n"
2644             "Valid options are:\n"
2645             "  -l <address>  Socket address to listen to.\n"
2646             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2647             "  -w <seconds>  Interval in which to write data.\n"
2648             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2649             "  -f <seconds>  Interval in which to flush dead data.\n"
2650             "  -p <file>     Location of the PID-file.\n"
2651             "  -b <dir>      Base directory to change to.\n"
2652             "  -B            Restrict file access to paths within -b <dir>\n"
2653             "  -g            Do not fork and run in the foreground.\n"
2654             "  -j <dir>      Directory in which to create the journal files.\n"
2655             "  -F            Always flush all updates at shutdown\n"
2656             "\n"
2657             "For more information and a detailed description of all options "
2658             "please refer\n"
2659             "to the rrdcached(1) manual page.\n",
2660             VERSION);
2661         status = -1;
2662         break;
2663     } /* switch (option) */
2664   } /* while (getopt) */
2666   /* advise the user when values are not sane */
2667   if (config_flush_interval < 2 * config_write_interval)
2668     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2669             " 2x write interval (-w) !\n");
2670   if (config_write_jitter > config_write_interval)
2671     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2672             " write interval (-w) !\n");
2674   if (config_write_base_only && config_base_dir == NULL)
2675     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2676             "  Consult the rrdcached documentation\n");
2678   if (journal_cur == NULL)
2679     config_flush_at_shutdown = 1;
2681   return (status);
2682 } /* }}} int read_options */
2684 int main (int argc, char **argv)
2686   int status;
2688   status = read_options (argc, argv);
2689   if (status != 0)
2690   {
2691     if (status < 0)
2692       status = 0;
2693     return (status);
2694   }
2696   status = daemonize ();
2697   if (status == 1)
2698   {
2699     struct sigaction sigchld;
2701     memset (&sigchld, 0, sizeof (sigchld));
2702     sigchld.sa_handler = SIG_IGN;
2703     sigaction (SIGCHLD, &sigchld, NULL);
2705     return (0);
2706   }
2707   else if (status != 0)
2708   {
2709     fprintf (stderr, "daemonize failed, exiting.\n");
2710     return (1);
2711   }
2713   journal_init();
2715   /* start the queue thread */
2716   memset (&queue_thread, 0, sizeof (queue_thread));
2717   status = pthread_create (&queue_thread,
2718                            NULL, /* attr */
2719                            queue_thread_main,
2720                            NULL); /* args */
2721   if (status != 0)
2722   {
2723     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2724     cleanup();
2725     return (1);
2726   }
2728   listen_thread_main (NULL);
2729   cleanup ();
2731   return (0);
2732 } /* int main */
2734 /*
2735  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2736  */