diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c
index f997d3ceddbd4060add089914cf95b80b86b3fcf..17153ce98be3309f985b885b486e3a0fc7b5fd6d 100644 (file)
--- a/src/rrd_daemon.c
+++ b/src/rrd_daemon.c
/**
* RRDTool - src/rrd_daemon.c
- * Copyright (C) 2008 Florian octo Forster
- * Copyright (C) 2008 Kevin Brintnall
+ * Copyright (C) 2008,2009 Florian octo Forster
+ * Copyright (C) 2008,2009 Kevin Brintnall
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Now for some includes..
*/
/* {{{ */
-#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
-#include "../win32/config.h"
-#else
-#ifdef HAVE_CONFIG_H
-#include "../rrd_config.h"
-#endif
-#endif
-
-#include "rrd.h"
+#include "rrd_tool.h"
#include "rrd_client.h"
+#include "unused.h"
#include <stdlib.h>
#ifndef WIN32
-#include <stdint.h>
+#ifdef HAVE_STDINT_H
+# include <stdint.h>
+#endif
#include <unistd.h>
#include <strings.h>
#include <inttypes.h>
-# include <sys/socket.h>
+#include <sys/socket.h>
#else
#include <sys/types.h>
#include <sys/stat.h>
+#include <dirent.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/un.h>
#include <assert.h>
#include <sys/time.h>
#include <time.h>
+#include <libgen.h>
+#include <grp.h>
+
+#ifdef HAVE_LIBWRAP
+#include <tcpd.h>
+#endif /* HAVE_LIBWRAP */
#include <glib-2.0/glib.h>
/* }}} */
-#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
-
-#ifndef __GNUC__
-# define __attribute__(x) /**/
-#endif
+#define RRDD_LOG(severity, ...) \
+ do { \
+ if (stay_foreground) { \
+ fprintf(stderr, __VA_ARGS__); \
+ fprintf(stderr, "\n"); } \
+ syslog ((severity), __VA_ARGS__); \
+ } while (0)
/*
* Types
*/
-typedef enum
-{
- PRIV_LOW,
- PRIV_HIGH
-} socket_privilege;
-
typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
struct listen_socket_s
int fd;
char addr[PATH_MAX + 1];
int family;
- socket_privilege privilege;
/* state for BATCH processing */
time_t batch_start;
char *wbuf;
ssize_t wbuf_len;
+
+ uint32_t permissions;
+
+ gid_t socket_group;
+ mode_t socket_permissions;
};
typedef struct listen_socket_s listen_socket_t;
-struct command;
+struct command_s;
+typedef struct command_s command_t;
/* note: guard against "unused" warnings in the handlers */
-#define DISPATCH_PROTO listen_socket_t *sock __attribute__((unused)),\
- time_t now __attribute__((unused)),\
- char *buffer __attribute__((unused)),\
- size_t buffer_size __attribute__((unused))
+#define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\
+ time_t UNUSED(now),\
+ char UNUSED(*buffer),\
+ size_t UNUSED(buffer_size)
-#define HANDLER_PROTO struct command *cmd __attribute__((unused)),\
+#define HANDLER_PROTO command_t UNUSED(*cmd),\
DISPATCH_PROTO
-struct command {
+struct command_s {
char *cmd;
int (*handler)(HANDLER_PROTO);
- socket_privilege min_priv;
char context; /* where we expect to see it */
#define CMD_CONTEXT_CLIENT (1<<0)
char **values;
size_t values_num;
time_t last_flush_time;
- time_t last_update_stamp;
+ double last_update_stamp;
#define CI_FLAGS_IN_TREE (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)
int flags;
};
typedef enum queue_side_e queue_side_t;
+/* describe a set of journal files */
+typedef struct {
+ char **files;
+ size_t files_num;
+} journal_set;
+
/* max length of socket command or response */
#define CMD_MAX 4096
#define RBUF_SIZE (CMD_MAX*2)
static listen_socket_t *listen_fds = NULL;
static size_t listen_fds_num = 0;
-static int do_shutdown = 0;
+static listen_socket_t default_socket;
+
+enum {
+ RUNNING, /* normal operation */
+ FLUSHING, /* flushing remaining values */
+ SHUTDOWN /* shutting down */
+} state = RUNNING;
static pthread_t *queue_threads;
static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
/* Journaled updates */
-static char *journal_cur = NULL;
-static char *journal_old = NULL;
-static FILE *journal_fh = NULL;
+#define JOURNAL_REPLAY(s) ((s) == NULL)
+#define JOURNAL_BASE "rrd.journal"
+static journal_set *journal_cur = NULL;
+static journal_set *journal_old = NULL;
+static char *journal_dir = NULL;
+static FILE *journal_fh = NULL; /* current journal file handle */
+static long journal_size = 0; /* current journal size */
+#define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
static int journal_write(char *cmd, char *args);
static void journal_done(void);
static void sig_common (const char *sig) /* {{{ */
{
RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
- do_shutdown++;
+ state = FLUSHING;
pthread_cond_broadcast(&flush_cond);
pthread_cond_broadcast(&queue_cond);
} /* }}} void sig_common */
-static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
+static void sig_int_handler (int UNUSED(s)) /* {{{ */
{
sig_common("INT");
} /* }}} void sig_int_handler */
-static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
+static void sig_term_handler (int UNUSED(s)) /* {{{ */
{
sig_common("TERM");
} /* }}} void sig_term_handler */
-static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
+static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
{
config_flush_at_shutdown = 1;
sig_common("USR1");
} /* }}} void sig_usr1_handler */
-static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
+static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
{
config_flush_at_shutdown = 0;
sig_common("USR2");
static int open_pidfile(char *action, int oflag) /* {{{ */
{
int fd;
- char *file;
+ const char *file;
+ char *file_copy, *dir;
file = (config_pid_file != NULL)
? config_pid_file
: LOCALSTATEDIR "/run/rrdcached.pid";
+ /* dirname may modify its argument */
+ file_copy = strdup(file);
+ if (file_copy == NULL)
+ {
+ fprintf(stderr, "rrdcached: strdup(): %s\n",
+ rrd_strerror(errno));
+ return -1;
+ }
+
+ dir = dirname(file_copy);
+ if (rrd_mkdir_p(dir, 0777) != 0)
+ {
+ fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
+ dir, rrd_strerror(errno));
+ return -1;
+ }
+
+ free(file_copy);
+
fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
if (fd < 0)
fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
/* NOTREACHED */
assert(1==0);
-}
+} /* }}} char *next_cmd */
/* add the characters directly to the write buffer */
static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
char buffer[CMD_MAX];
int len;
- if (sock == NULL) return 0; /* journal replay mode */
+ if (JOURNAL_REPLAY(sock)) return 0;
if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
va_start(argp, fmt);
ssize_t wrote;
int rclen, len;
- if (sock == NULL) return rc; /* journal replay mode */
+ if (JOURNAL_REPLAY(sock)) return rc;
if (sock->batch_start)
{
/* in case anyone is waiting */
pthread_cond_broadcast(&ci->flushed);
+ pthread_cond_destroy(&ci->flushed);
free (ci);
if (ci->flags & CI_FLAGS_IN_QUEUE)
return FALSE;
- if ((ci->last_flush_time <= cfd->abs_timeout)
- && (ci->values_num > 0))
- {
- enqueue_cache_item (ci, TAIL);
- }
- else if ((do_shutdown != 0)
- && (ci->values_num > 0))
+ if (ci->values_num > 0
+ && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
{
enqueue_cache_item (ci, TAIL);
}
for (k = 0; k < cfd.keys_num; k++)
{
+ gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
/* should never fail, since we have held the cache_lock
* the entire time */
- assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
+ assert(status == TRUE);
}
if (cfd.keys != NULL)
return (0);
} /* int flush_old_values */
-static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
+static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
{
struct timeval now;
struct timespec next_flush;
pthread_mutex_lock(&cache_lock);
- while (!do_shutdown)
+ while (state == RUNNING)
{
gettimeofday (&now, NULL);
if ((now.tv_sec > next_flush.tv_sec)
|| ((now.tv_sec == next_flush.tv_sec)
&& ((1000 * now.tv_usec) > next_flush.tv_nsec)))
{
+ RRDD_LOG(LOG_DEBUG, "flushing old values");
+
+ /* Determine the time of the next cache flush. */
+ next_flush.tv_sec = now.tv_sec + config_flush_interval;
+
/* Flush all values that haven't been written in the last
* `config_write_interval' seconds. */
flush_old_values (config_write_interval);
- /* Determine the time of the next cache flush. */
- next_flush.tv_sec =
- now.tv_sec + next_flush.tv_sec % config_flush_interval;
-
/* unlock the cache while we rotate so we don't block incoming
* updates if the fsync() blocks on disk I/O */
pthread_mutex_unlock(&cache_lock);
if (config_flush_at_shutdown)
flush_old_values (-1); /* flush everything */
+ state = SHUTDOWN;
+
pthread_mutex_unlock(&cache_lock);
return NULL;
} /* void *flush_thread_main */
-static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
+static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
{
pthread_mutex_lock (&cache_lock);
- while (!do_shutdown
+ while (state != SHUTDOWN
|| (cache_queue_head != NULL && config_flush_at_shutdown))
{
cache_item_t *ci;
int status;
/* Now, check if there's something to store away. If not, wait until
- * something comes in. if we are shutting down, do not wait around. */
- if (cache_queue_head == NULL && !do_shutdown)
+ * something comes in. */
+ if (cache_queue_head == NULL)
{
status = pthread_cond_wait (&queue_cond, &cache_lock);
if ((status != 0) && (status != ETIMEDOUT))
}
journal_write("wrote", file);
- pthread_cond_broadcast(&ci->flushed);
- rrd_free_ptrs((void ***) &values, &values_num);
- free(file);
+ /* Search again in the tree. It's possible someone issued a "FORGET"
+ * while we were writing the update values. */
+ pthread_mutex_lock(&cache_lock);
+ ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
+ if (ci)
+ pthread_cond_broadcast(&ci->flushed);
+ pthread_mutex_unlock(&cache_lock);
if (status == 0)
{
pthread_mutex_unlock (&stats_lock);
}
+ rrd_free_ptrs((void ***) &values, &values_num);
+ free(file);
+
pthread_mutex_lock (&cache_lock);
}
pthread_mutex_unlock (&cache_lock);
@@ -1004,7 +1050,7 @@ static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
assert(file != NULL);
if (!config_write_base_only
- || sock == NULL /* journal replay */
+ || JOURNAL_REPLAY(sock)
|| config_base_dir == NULL)
return 1;
*filename = tmp;
} /* }}} static int get_abs_path */
-/* returns 1 if we have the required privilege level,
- * otherwise issue an error to the user on sock */
-static int has_privilege (listen_socket_t *sock, /* {{{ */
- socket_privilege priv)
-{
- if (sock == NULL) /* journal replay */
- return 1;
-
- if (sock->privilege >= priv)
- return 1;
-
- return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
-} /* }}} static int has_privilege */
-
static int flush_file (const char *filename) /* {{{ */
{
cache_item_t *ci;
return (0);
} /* }}} int flush_file */
-static int syntax_error(listen_socket_t *sock, struct command *cmd) /* {{{ */
+static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
{
char *err = "Syntax error.\n";
if (found == TRUE)
{
- if (sock != NULL)
+ if (!JOURNAL_REPLAY(sock))
journal_write("forget", file);
return send_response(sock, RESP_OK, "Gone!\n");
cache_item_t *ci;
/* save it for the journal later */
- strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
+ if (!JOURNAL_REPLAY(sock))
+ strncpy(orig_buf, buffer, min(CMD_MAX,buffer_size));
status = buffer_get_field (&buffer, &buffer_size, &file);
if (status != 0)
if (ci == NULL) /* {{{ */
{
struct stat statbuf;
+ cache_item_t *tmp;
/* don't hold the lock while we setup; stat(2) might block */
pthread_mutex_unlock(&cache_lock);
pthread_cond_init(&ci->flushed, NULL);
pthread_mutex_lock(&cache_lock);
- g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
+
+ /* another UPDATE might have added this entry in the meantime */
+ tmp = g_tree_lookup (cache_tree, file);
+ if (tmp == NULL)
+ g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
+ else
+ {
+ free_cache_item (ci);
+ ci = tmp;
+ }
+
+ /* state may have changed while we were unlocked */
+ if (state == SHUTDOWN)
+ return -1;
} /* }}} */
assert (ci != NULL);
/* don't re-write updates in replay mode */
- if (sock != NULL)
+ if (!JOURNAL_REPLAY(sock))
journal_write("update", orig_buf);
while (buffer_size > 0)
{
char *value;
- time_t stamp;
+ double stamp;
char *eostamp;
status = buffer_get_field (&buffer, &buffer_size, &value);
break;
}
- /* make sure update time is always moving forward */
- stamp = strtol(value, &eostamp, 10);
+ /* make sure update time is always moving forward. We use double here since
+ update does support subsecond precision for timestamps ... */
+ stamp = strtod(value, &eostamp);
if (eostamp == value || eostamp == NULL || *eostamp != ':')
{
pthread_mutex_unlock(&cache_lock);
{
pthread_mutex_unlock(&cache_lock);
return send_response(sock, RESP_ERR,
- "illegal attempt to update using time %ld when last"
- " update time is %ld (minimum one second step)\n",
+ "illegal attempt to update using time %lf when last"
+ " update time is %lf (minimum one second step)\n",
stamp, ci->last_update_stamp);
}
else
return -1;
} /* }}} static int handle_request_quit */
-struct command COMMANDS[] = {
+static command_t list_of_commands[] = { /* {{{ */
{
"UPDATE",
handle_request_update,
- PRIV_HIGH,
CMD_CONTEXT_ANY,
"UPDATE <filename> <values> [<values> ...]\n"
,
{
"WROTE",
handle_request_wrote,
- PRIV_HIGH,
CMD_CONTEXT_JOURNAL,
NULL,
NULL
{
"FLUSH",
handle_request_flush,
- PRIV_LOW,
CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
"FLUSH <filename>\n"
,
{
"FLUSHALL",
handle_request_flushall,
- PRIV_HIGH,
CMD_CONTEXT_CLIENT,
"FLUSHALL\n"
,
{
"PENDING",
handle_request_pending,
- PRIV_HIGH,
CMD_CONTEXT_CLIENT,
"PENDING <filename>\n"
,
{
"FORGET",
handle_request_forget,
- PRIV_HIGH,
CMD_CONTEXT_ANY,
"FORGET <filename>\n"
,
{
"QUEUE",
handle_request_queue,
- PRIV_LOW,
CMD_CONTEXT_CLIENT,
"QUEUE\n"
,
{
"STATS",
handle_request_stats,
- PRIV_LOW,
CMD_CONTEXT_CLIENT,
"STATS\n"
,
{
"HELP",
handle_request_help,
- PRIV_LOW,
CMD_CONTEXT_CLIENT,
"HELP [<command>]\n",
NULL, /* special! */
{
"BATCH",
batch_start,
- PRIV_LOW,
CMD_CONTEXT_CLIENT,
"BATCH\n"
,
{
".", /* BATCH terminator */
batch_done,
- PRIV_LOW,
CMD_CONTEXT_BATCH,
NULL,
NULL
{
"QUIT",
handle_request_quit,
- PRIV_LOW,
CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
"QUIT\n"
,
"Disconnect from rrdcached.\n"
- },
- {NULL,NULL,0,0,NULL,NULL} /* LAST ENTRY */
-};
+ }
+}; /* }}} command_t list_of_commands[] */
+static size_t list_of_commands_len = sizeof (list_of_commands)
+ / sizeof (list_of_commands[0]);
-static struct command *find_command(char *cmd)
+static command_t *find_command(char *cmd)
{
- struct command *c = COMMANDS;
-
- while (c->cmd != NULL)
- {
- if (strcasecmp(cmd, c->cmd) == 0)
- break;
- c++;
- }
+ size_t i;
- if (c->cmd == NULL)
- return NULL;
- else
- return c;
+ for (i = 0; i < list_of_commands_len; i++)
+ if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
+ return (&list_of_commands[i]);
+ return NULL;
}
+/* We currently use the index in the `list_of_commands' array as a bit position
+ * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
+ * outside these functions so that switching to a more elegant storage method
+ * is easily possible. */
+static ssize_t find_command_index (const char *cmd) /* {{{ */
+{
+ size_t i;
+
+ for (i = 0; i < list_of_commands_len; i++)
+ if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
+ return ((ssize_t) i);
+ return (-1);
+} /* }}} ssize_t find_command_index */
+
+static int socket_permission_check (listen_socket_t *sock, /* {{{ */
+ const char *cmd)
+{
+ ssize_t i;
+
+ if (JOURNAL_REPLAY(sock))
+ return (1);
+
+ if (cmd == NULL)
+ return (-1);
+
+ if ((strcasecmp ("QUIT", cmd) == 0)
+ || (strcasecmp ("HELP", cmd) == 0))
+ return (1);
+ else if (strcmp (".", cmd) == 0)
+ cmd = "BATCH";
+
+ i = find_command_index (cmd);
+ if (i < 0)
+ return (-1);
+ assert (i < 32);
+
+ if ((sock->permissions & (1 << i)) != 0)
+ return (1);
+ return (0);
+} /* }}} int socket_permission_check */
+
+static int socket_permission_add (listen_socket_t *sock, /* {{{ */
+ const char *cmd)
+{
+ ssize_t i;
+
+ i = find_command_index (cmd);
+ if (i < 0)
+ return (-1);
+ assert (i < 32);
+
+ sock->permissions |= (1 << i);
+ return (0);
+} /* }}} int socket_permission_add */
+
+static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
+{
+ sock->permissions = 0;
+} /* }}} socket_permission_clear */
+
+static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
+ listen_socket_t *src)
+{
+ dest->permissions = src->permissions;
+} /* }}} socket_permission_copy */
+
+static void socket_permission_set_all (listen_socket_t *sock) /* {{{ */
+{
+ size_t i;
+
+ sock->permissions = 0;
+ for (i = 0; i < list_of_commands_len; i++)
+ sock->permissions |= (1 << i);
+} /* }}} void socket_permission_set_all */
+
/* check whether commands are received in the expected context */
-static int command_check_context(listen_socket_t *sock, struct command *cmd)
+static int command_check_context(listen_socket_t *sock, command_t *cmd)
{
- if (sock == NULL)
+ if (JOURNAL_REPLAY(sock))
return (cmd->context & CMD_CONTEXT_JOURNAL);
else if (sock->batch_start)
return (cmd->context & CMD_CONTEXT_BATCH);
int status;
char *cmd_str;
char *resp_txt;
- struct command *help = NULL;
+ command_t *help = NULL;
status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
if (status == 0)
}
else
{
- help = COMMANDS;
+ size_t i;
+
resp_txt = "Command overview\n";
- while (help->cmd)
+ for (i = 0; i < list_of_commands_len; i++)
{
- if (help->syntax)
- add_response_info(sock, "%s", help->syntax);
- help++;
+ if (list_of_commands[i].syntax == NULL)
+ continue;
+ add_response_info (sock, "%s", list_of_commands[i].syntax);
}
}
return send_response(sock, RESP_OK, resp_txt);
} /* }}} int handle_request_help */
-/* if sock==NULL, we are in journal replay mode */
static int handle_request (DISPATCH_PROTO) /* {{{ */
{
char *buffer_ptr = buffer;
char *cmd_str = NULL;
- struct command *cmd = NULL;
+ command_t *cmd = NULL;
int status;
assert (buffer[buffer_size - 1] == '\0');
if (!cmd)
return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
- status = has_privilege(sock, cmd->min_priv);
- if (status <= 0)
- return status;
+ if (!socket_permission_check (sock, cmd->cmd))
+ return send_response(sock, RESP_ERR, "Permission denied.\n");
if (!command_check_context(sock, cmd))
return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
} /* }}} int handle_request */
-/* MUST NOT hold journal_lock before calling this */
-static void journal_rotate(void) /* {{{ */
+static void journal_set_free (journal_set *js) /* {{{ */
{
- FILE *old_fh = NULL;
- int new_fd;
-
- if (journal_cur == NULL || journal_old == NULL)
+ if (js == NULL)
return;
- pthread_mutex_lock(&journal_lock);
+ rrd_free_ptrs((void ***) &js->files, &js->files_num);
- /* we rotate this way (rename before close) so that the we can release
- * the journal lock as fast as possible. Journal writes to the new
- * journal can proceed immediately after the new file is opened. The
- * fclose can then block without affecting new updates.
- */
- if (journal_fh != NULL)
+ free(js);
+} /* }}} journal_set_free */
+
+static void journal_set_remove (journal_set *js) /* {{{ */
+{
+ if (js == NULL)
+ return;
+
+ for (uint i=0; i < js->files_num; i++)
{
- old_fh = journal_fh;
- journal_fh = NULL;
- rename(journal_cur, journal_old);
- ++stats_journal_rotate;
+ RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
+ unlink(js->files[i]);
}
+} /* }}} journal_set_remove */
- new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
- S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
- if (new_fd >= 0)
+/* close current journal file handle.
+ * MUST hold journal_lock before calling */
+static void journal_close(void) /* {{{ */
+{
+ if (journal_fh != NULL)
{
- journal_fh = fdopen(new_fd, "a");
- if (journal_fh == NULL)
- close(new_fd);
+ if (fclose(journal_fh) != 0)
+ RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
}
- pthread_mutex_unlock(&journal_lock);
+ journal_fh = NULL;
+ journal_size = 0;
+} /* }}} journal_close */
+
+/* MUST hold journal_lock before calling */
+static void journal_new_file(void) /* {{{ */
+{
+ struct timeval now;
+ int new_fd;
+ char new_file[PATH_MAX + 1];
+
+ assert(journal_dir != NULL);
+ assert(journal_cur != NULL);
+
+ journal_close();
- if (old_fh != NULL)
- fclose(old_fh);
+ gettimeofday(&now, NULL);
+ /* this format assures that the files sort in strcmp() order */
+ snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
+ journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
+ new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
+ S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
+ if (new_fd < 0)
+ goto error;
+
+ journal_fh = fdopen(new_fd, "a");
if (journal_fh == NULL)
- {
- RRDD_LOG(LOG_CRIT,
- "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
- journal_cur, rrd_strerror(errno));
+ goto error;
- RRDD_LOG(LOG_ERR,
- "JOURNALING DISABLED: All values will be flushed at shutdown");
- config_flush_at_shutdown = 1;
- }
+ journal_size = ftell(journal_fh);
+ RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
+
+ /* record the file in the journal set */
+ rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
+
+ return;
+
+error:
+ RRDD_LOG(LOG_CRIT,
+ "JOURNALING DISABLED: Error while trying to create %s : %s",
+ new_file, rrd_strerror(errno));
+ RRDD_LOG(LOG_CRIT,
+ "JOURNALING DISABLED: All values will be flushed at shutdown");
+
+ close(new_fd);
+ config_flush_at_shutdown = 1;
+
+} /* }}} journal_new_file */
+
+/* MUST NOT hold journal_lock before calling this */
+static void journal_rotate(void) /* {{{ */
+{
+ journal_set *old_js = NULL;
+
+ if (journal_dir == NULL)
+ return;
+
+ RRDD_LOG(LOG_DEBUG, "rotating journals");
+
+ pthread_mutex_lock(&stats_lock);
+ ++stats_journal_rotate;
+ pthread_mutex_unlock(&stats_lock);
+
+ pthread_mutex_lock(&journal_lock);
+
+ journal_close();
+
+ /* rotate the journal sets */
+ old_js = journal_old;
+ journal_old = journal_cur;
+ journal_cur = calloc(1, sizeof(journal_set));
+
+ if (journal_cur != NULL)
+ journal_new_file();
+ else
+ RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
+
+ pthread_mutex_unlock(&journal_lock);
+
+ journal_set_remove(old_js);
+ journal_set_free (old_js);
} /* }}} static void journal_rotate */
+/* MUST hold journal_lock when calling */
static void journal_done(void) /* {{{ */
{
if (journal_cur == NULL)
return;
- pthread_mutex_lock(&journal_lock);
- if (journal_fh != NULL)
- {
- fclose(journal_fh);
- journal_fh = NULL;
- }
+ journal_close();
if (config_flush_at_shutdown)
{
RRDD_LOG(LOG_INFO, "removing journals");
- unlink(journal_old);
- unlink(journal_cur);
+ journal_set_remove(journal_old);
+ journal_set_remove(journal_cur);
}
else
{
"journals will be used at next startup");
}
- pthread_mutex_unlock(&journal_lock);
+ journal_set_free(journal_cur);
+ journal_set_free(journal_old);
+ free(journal_dir);
} /* }}} static void journal_done */
pthread_mutex_lock(&journal_lock);
chars = fprintf(journal_fh, "%s %s\n", cmd, args);
+ journal_size += chars;
+
+ if (journal_size > JOURNAL_MAX)
+ journal_new_file();
+
pthread_mutex_unlock(&journal_lock);
if (chars > 0)
memset(&statbuf, 0, sizeof(statbuf));
if (stat(file, &statbuf) != 0)
{
- if (errno == ENOENT)
- return 0;
-
reason = "stat error";
status = errno;
}
return entry_cnt > 0 ? 1 : 0;
} /* }}} static int journal_replay */
+static int journal_sort(const void *v1, const void *v2)
+{
+ char **jn1 = (char **) v1;
+ char **jn2 = (char **) v2;
+
+ return strcmp(*jn1,*jn2);
+}
+
static void journal_init(void) /* {{{ */
{
int had_journal = 0;
+ DIR *dir;
+ struct dirent *dent;
+ char path[PATH_MAX+1];
- if (journal_cur == NULL) return;
+ if (journal_dir == NULL) return;
pthread_mutex_lock(&journal_lock);
+ journal_cur = calloc(1, sizeof(journal_set));
+ if (journal_cur == NULL)
+ {
+ RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
+ return;
+ }
+
RRDD_LOG(LOG_INFO, "checking for journal files");
- had_journal += journal_replay(journal_old);
- had_journal += journal_replay(journal_cur);
+ /* Handle old journal files during transition. This gives them the
+ * correct sort order. TODO: remove after first release
+ */
+ {
+ char old_path[PATH_MAX+1];
+ snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
+ snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
+ rename(old_path, path);
+
+ snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
+ snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
+ rename(old_path, path);
+ }
+
+ dir = opendir(journal_dir);
+ if (!dir) {
+ RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
+ return;
+ }
+ while ((dent = readdir(dir)) != NULL)
+ {
+ /* looks like a journal file? */
+ if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
+ continue;
+
+ snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
+
+ if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
+ {
+ RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
+ dent->d_name);
+ break;
+ }
+ }
+ closedir(dir);
+
+ qsort(journal_cur->files, journal_cur->files_num,
+ sizeof(journal_cur->files[0]), journal_sort);
+
+ for (uint i=0; i < journal_cur->files_num; i++)
+ had_journal += journal_replay(journal_cur->files[i]);
+
+ journal_new_file();
/* it must have been a crash. start a flush */
if (had_journal && config_flush_at_shutdown)
flush_old_values(-1);
pthread_mutex_unlock(&journal_lock);
- journal_rotate();
RRDD_LOG(LOG_INFO, "journal processing complete");
}
pthread_mutex_lock (&connection_threads_lock);
+#ifdef HAVE_LIBWRAP
+ /* LIBWRAP does not support multiple threads! By putting this code
+ inside pthread_mutex_lock we do not have to worry about request_info
+ getting overwritten by another thread.
+ */
+ struct request_info req;
+ request_init(&req, RQ_DAEMON, "rrdcached\0", RQ_FILE, fd, NULL );
+ fromhost(&req);
+ if(!hosts_access(&req)) {
+ RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
+ pthread_mutex_unlock (&connection_threads_lock);
+ close_connection(sock);
+ return NULL;
+ }
+#endif /* HAVE_LIBWRAP */
connection_threads_num++;
pthread_mutex_unlock (&connection_threads_lock);
- while (do_shutdown == 0)
+ while (state == RUNNING)
{
char *cmd;
ssize_t cmd_len;
pollfd.revents = 0;
status = poll (&pollfd, 1, /* timeout = */ 500);
- if (do_shutdown)
+ if (state != RUNNING)
break;
else if (status == 0) /* timeout */
continue;
listen_socket_t *temp;
int status;
const char *path;
+ char *path_copy, *dir;
path = sock->addr;
if (strncmp(path, "unix:", strlen("unix:")) == 0)
path += strlen("unix:");
+ /* dirname may modify its argument */
+ path_copy = strdup(path);
+ if (path_copy == NULL)
+ {
+ fprintf(stderr, "rrdcached: strdup(): %s\n",
+ rrd_strerror(errno));
+ return (-1);
+ }
+
+ dir = dirname(path_copy);
+ if (rrd_mkdir_p(dir, 0777) != 0)
+ {
+ fprintf(stderr, "Failed to create socket directory '%s': %s\n",
+ dir, rrd_strerror(errno));
+ return (-1);
+ }
+
+ free(path_copy);
+
temp = (listen_socket_t *) rrd_realloc (listen_fds,
sizeof (listen_fds[0]) * (listen_fds_num + 1));
if (temp == NULL)
return (-1);
}
+ /* tweak the sockets group ownership */
+ if (sock->socket_group != (gid_t)-1)
+ {
+ if ( (chown(path, getuid(), sock->socket_group) != 0) ||
+ (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
+ {
+ fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
+ }
+ }
+
+ if (sock->socket_permissions != (mode_t)-1)
+ {
+ if (chmod(path, sock->socket_permissions) != 0)
+ fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
+ (unsigned int)sock->socket_permissions, strerror(errno));
+ }
+
status = listen (fd, /* backlog = */ 10);
if (status != 0)
{
fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
return (-1);
}
- } /* if (*addr = ']') */
- else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
+ } /* if (*addr == '[') */
+ else
{
port = rindex(addr, ':');
if (port != NULL)
return (0);
} /* }}} int close_listen_sockets */
-static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
+static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
{
struct pollfd *pollfds;
int pollfds_num;
@@ -2305,7 +2585,7 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
RRDD_LOG(LOG_INFO, "listening for connections");
- while (do_shutdown == 0)
+ while (state == RUNNING)
{
for (i = 0; i < pollfds_num; i++)
{
@@ -2315,7 +2595,7 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
}
status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
- if (do_shutdown)
+ if (state != RUNNING)
break;
else if (status == 0) /* timeout */
continue;
@@ -2378,7 +2658,7 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
continue;
}
} /* for (pollfds_num) */
- } /* while (do_shutdown == 0) */
+ } /* while (state == RUNNING) */
RRDD_LOG(LOG_INFO, "starting shutdown");
}
else
{
- listen_socket_t sock;
- memset(&sock, 0, sizeof(sock));
- strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
- open_listen_socket (&sock);
+ strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
+ sizeof(default_socket.addr) - 1);
+ default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
+
+ if (default_socket.permissions == 0)
+ socket_permission_set_all (&default_socket);
+
+ open_listen_socket (&default_socket);
}
if (listen_fds_num < 1)
static int cleanup (void) /* {{{ */
{
- do_shutdown++;
-
pthread_cond_broadcast (&flush_cond);
pthread_join (flush_thread, NULL);
RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
}
- journal_done();
- remove_pidfile ();
-
free(queue_threads);
free(config_base_dir);
- free(config_pid_file);
- free(journal_cur);
- free(journal_old);
pthread_mutex_lock(&cache_lock);
g_tree_destroy(cache_tree);
+ pthread_mutex_lock(&journal_lock);
+ journal_done();
+
RRDD_LOG(LOG_INFO, "goodbye");
closelog ();
+ remove_pidfile ();
+ free(config_pid_file);
+
return (0);
} /* }}} int cleanup */
int option;
int status = 0;
- while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
+ socket_permission_clear (&default_socket);
+
+ default_socket.socket_group = (gid_t)-1;
+ default_socket.socket_permissions = (mode_t)-1;
+
+ while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
{
switch (option)
{
stay_foreground=1;
break;
- case 'L':
case 'l':
{
listen_socket_t *new;
memset(new, 0, sizeof(listen_socket_t));
strncpy(new->addr, optarg, sizeof(new->addr)-1);
- new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
+
+ /* Add permissions to the socket {{{ */
+ if (default_socket.permissions != 0)
+ {
+ socket_permission_copy (new, &default_socket);
+ }
+ else /* if (default_socket.permissions == 0) */
+ {
+ /* Add permission for ALL commands to the socket. */
+ socket_permission_set_all (new);
+ }
+ /* }}} Done adding permissions. */
+
+ new->socket_group = default_socket.socket_group;
+ new->socket_permissions = default_socket.socket_permissions;
if (!rrd_add_ptr((void ***)&config_listen_address_list,
&config_listen_address_list_len, new))
}
break;
+ /* set socket group permissions */
+ case 's':
+ {
+ gid_t group_gid;
+ struct group *grp;
+
+ group_gid = strtoul(optarg, NULL, 10);
+ if (errno != EINVAL && group_gid>0)
+ {
+ /* we were passed a number */
+ grp = getgrgid(group_gid);
+ }
+ else
+ {
+ grp = getgrnam(optarg);
+ }
+
+ if (grp)
+ {
+ default_socket.socket_group = grp->gr_gid;
+ }
+ else
+ {
+ /* no idea what the user wanted... */
+ fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
+ return (5);
+ }
+ }
+ break;
+
+ /* set socket file permissions */
+ case 'm':
+ {
+ long tmp;
+ char *endptr = NULL;
+
+ tmp = strtol (optarg, &endptr, 8);
+ if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
+ || (tmp > 07777) || (tmp < 0)) {
+ fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
+ optarg);
+ return (5);
+ }
+
+ default_socket.socket_permissions = (mode_t)tmp;
+ }
+ break;
+
+ case 'P':
+ {
+ char *optcopy;
+ char *saveptr;
+ char *dummy;
+ char *ptr;
+
+ socket_permission_clear (&default_socket);
+
+ optcopy = strdup (optarg);
+ dummy = optcopy;
+ saveptr = NULL;
+ while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
+ {
+ dummy = NULL;
+ status = socket_permission_add (&default_socket, ptr);
+ if (status != 0)
+ {
+ fprintf (stderr, "read_options: Adding permission \"%s\" to "
+ "socket failed. Most likely, this permission doesn't "
+ "exist. Check your command line.\n", ptr);
+ status = 4;
+ }
+ }
+
+ free (optcopy);
+ }
+ break;
+
case 'f':
{
int temp;
return (3);
}
+ if (rrd_mkdir_p (config_base_dir, 0777) != 0)
+ {
+ fprintf (stderr, "Failed to create base directory '%s': %s\n",
+ config_base_dir, rrd_strerror (errno));
+ return (3);
+ }
+
/* make sure that the base directory is not resolved via
* symbolic links. this makes some performance-enhancing
* assumptions possible (we don't have to resolve paths
*/
if (realpath(config_base_dir, base_realpath) == NULL)
{
- fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
- return 5;
- }
- else if (strncmp(config_base_dir,
- base_realpath, sizeof(base_realpath)) != 0)
- {
- fprintf(stderr,
- "Base directory (-b) resolved via file system links!\n"
- "Please consult rrdcached '-b' documentation!\n"
- "Consider specifying the real directory (%s)\n",
- base_realpath);
+ fprintf (stderr, "Failed to canonicalize the base directory '%s': "
+ "%s\n", config_base_dir, rrd_strerror(errno));
return 5;
}
}
_config_base_dir_len = len;
+
+ len = strlen (base_realpath);
+ while ((len > 0) && (base_realpath[len - 1] == '/'))
+ {
+ base_realpath[len - 1] = '\0';
+ len--;
+ }
+
+ if (strncmp(config_base_dir,
+ base_realpath, sizeof(base_realpath)) != 0)
+ {
+ fprintf(stderr,
+ "Base directory (-b) resolved via file system links!\n"
+ "Please consult rrdcached '-b' documentation!\n"
+ "Consider specifying the real directory (%s)\n",
+ base_realpath);
+ return 5;
+ }
}
break;
case 'j':
{
- struct stat statbuf;
- const char *dir = optarg;
+ char journal_dir_actual[PATH_MAX];
+ const char *dir;
+ dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
- status = stat(dir, &statbuf);
+ status = rrd_mkdir_p(dir, 0777);
if (status != 0)
{
- fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
+ fprintf(stderr, "Failed to create journal directory '%s': %s\n",
+ dir, rrd_strerror(errno));
return 6;
}
- if (!S_ISDIR(statbuf.st_mode)
- || access(dir, R_OK|W_OK|X_OK) != 0)
+ if (access(dir, R_OK|W_OK|X_OK) != 0)
{
fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
errno ? rrd_strerror(errno) : "");
return 6;
}
-
- journal_cur = malloc(PATH_MAX + 1);
- journal_old = malloc(PATH_MAX + 1);
- if (journal_cur == NULL || journal_old == NULL)
- {
- fprintf(stderr, "malloc failure for journal files\n");
- return 6;
- }
- else
- {
- snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
- snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
- }
}
break;
case 'h':
case '?':
- printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
+ printf ("RRDCacheD %s\n"
+ "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
"\n"
"Usage: rrdcached [options]\n"
"\n"
"Valid options are:\n"
" -l <address> Socket address to listen to.\n"
- " -L <address> Socket address to listen to ('FLUSH' only).\n"
+ " Default: "RRDCACHED_DEFAULT_ADDRESS"\n"
+ " -P <perms> Sets the permissions to assign to all following "
+ "sockets\n"
" -w <seconds> Interval in which to write data.\n"
" -z <delay> Delay writes up to <delay> seconds to spread load\n"
" -t <threads> Number of write threads.\n"
" -g Do not fork and run in the foreground.\n"
" -j <dir> Directory in which to create the journal files.\n"
" -F Always flush all updates at shutdown\n"
+ " -s <id|name> Group owner of all following UNIX sockets\n"
+ " (the socket will also have read/write permissions "
+ "for that group)\n"
+ " -m <mode> File permissions (octal) of all following UNIX "
+ "sockets\n"
"\n"
"For more information and a detailed description of all options "
"please refer\n"
"to the rrdcached(1) manual page.\n",
VERSION);
- status = -1;
+ if (option == 'h')
+ status = -1;
+ else
+ status = 1;
break;
} /* switch (option) */
} /* while (getopt) */
fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
" Consult the rrdcached documentation\n");
- if (journal_cur == NULL)
+ if (journal_dir == NULL)
config_flush_at_shutdown = 1;
return (status);