index 19460032ad911d9c98613ec5faebf959da769e7c..0ea5bf767f0e45fd88a28a1bdb19020e97dccb48 100644 (file)
--- a/program/src/rrd_daemon.c
+++ b/program/src/rrd_daemon.c
/**
* RRDTool - src/rrd_daemon.c
- * Copyright (C) 2008,2009 Florian octo Forster
+ * Copyright (C) 2008-2010 Florian octo Forster
* Copyright (C) 2008,2009 Kevin Brintnall
*
* This program is free software; you can redistribute it and/or modify it
* Now for some includes..
*/
/* {{{ */
-#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
-#include "../win32/config.h"
-#else
-#ifdef HAVE_CONFIG_H
-#include "../rrd_config.h"
-#endif
-#endif
-#include "rrd.h"
+#include "rrd_tool.h"
#include "rrd_client.h"
+#include "unused.h"
#include <stdlib.h>
#include <sys/time.h>
#include <time.h>
#include <libgen.h>
+#include <grp.h>
+
+#ifdef HAVE_LIBWRAP
+#include <tcpd.h>
+#endif /* HAVE_LIBWRAP */
#include <glib-2.0/glib.h>
/* }}} */
-#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
-
-#ifndef __GNUC__
-# define __attribute__(x) /**/
-#endif
+#define RRDD_LOG(severity, ...) \
+ do { \
+ if (stay_foreground) { \
+ fprintf(stderr, __VA_ARGS__); \
+ fprintf(stderr, "\n"); } \
+ syslog ((severity), __VA_ARGS__); \
+ } while (0)
/*
* Types
ssize_t wbuf_len;
uint32_t permissions;
+
+ gid_t socket_group;
+ mode_t socket_permissions;
};
typedef struct listen_socket_s listen_socket_t;
struct command_s;
typedef struct command_s command_t;
/* note: guard against "unused" warnings in the handlers */
-#define DISPATCH_PROTO listen_socket_t *sock __attribute__((unused)),\
- time_t now __attribute__((unused)),\
- char *buffer __attribute__((unused)),\
- size_t buffer_size __attribute__((unused))
+#define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\
+ time_t UNUSED(now),\
+ char UNUSED(*buffer),\
+ size_t UNUSED(buffer_size)
-#define HANDLER_PROTO command_t *cmd __attribute__((unused)),\
+#define HANDLER_PROTO command_t UNUSED(*cmd),\
DISPATCH_PROTO
struct command_s {
{
char *file;
char **values;
- size_t values_num;
+ size_t values_num; /* number of valid pointers */
+ size_t values_alloc; /* number of allocated pointers */
time_t last_flush_time;
- time_t last_update_stamp;
+ double last_update_stamp;
#define CI_FLAGS_IN_TREE (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)
int flags;
size_t files_num;
} journal_set;
-/* max length of socket command or response */
-#define CMD_MAX 4096
-#define RBUF_SIZE (CMD_MAX*2)
+#define RBUF_SIZE (RRD_CMD_MAX*2)
/*
* Variables
static listen_socket_t *listen_fds = NULL;
static size_t listen_fds_num = 0;
+static listen_socket_t default_socket;
+
enum {
RUNNING, /* normal operation */
FLUSHING, /* flushing remaining values */
static char *config_base_dir = NULL;
static size_t _config_base_dir_len = 0;
static int config_write_base_only = 0;
+static size_t config_alloc_chunk = 1;
static listen_socket_t **config_listen_address_list = NULL;
static size_t config_listen_address_list_len = 0;
static uint64_t stats_journal_rotate = 0;
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
+static int opt_no_overwrite = 0; /* default for the daemon */
+
/* Journaled updates */
+#define JOURNAL_REPLAY(s) ((s) == NULL)
#define JOURNAL_BASE "rrd.journal"
static journal_set *journal_cur = NULL;
static journal_set *journal_old = NULL;
static void sig_common (const char *sig) /* {{{ */
{
RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
- state = FLUSHING;
+ if (state == RUNNING) {
+ state = FLUSHING;
+ }
pthread_cond_broadcast(&flush_cond);
pthread_cond_broadcast(&queue_cond);
} /* }}} void sig_common */
-static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
+static void sig_int_handler (int UNUSED(s)) /* {{{ */
{
sig_common("INT");
} /* }}} void sig_int_handler */
-static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
+static void sig_term_handler (int UNUSED(s)) /* {{{ */
{
sig_common("TERM");
} /* }}} void sig_term_handler */
-static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
+static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
{
config_flush_at_shutdown = 1;
sig_common("USR1");
} /* }}} void sig_usr1_handler */
-static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
+static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
{
config_flush_at_shutdown = 0;
sig_common("USR2");
@@ -525,10 +536,10 @@ static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
{
va_list argp;
- char buffer[CMD_MAX];
+ char buffer[RRD_CMD_MAX];
int len;
- if (sock == NULL) return 0; /* journal replay mode */
+ if (JOURNAL_REPLAY(sock)) return 0;
if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
va_start(argp, fmt);
char *fmt, ...) /* {{{ */
{
va_list argp;
- char buffer[CMD_MAX];
+ char buffer[RRD_CMD_MAX];
int lines;
ssize_t wrote;
int rclen, len;
- if (sock == NULL) return rc; /* journal replay mode */
+ if (JOURNAL_REPLAY(sock)) return rc;
if (sock->batch_start)
{
else
lines = -1;
- rclen = sprintf(buffer, "%d ", lines);
+ rclen = snprintf(buffer, sizeof buffer, "%d ", lines);
va_start(argp, fmt);
#ifdef HAVE_VSNPRINTF
len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
{
ci->values = NULL;
ci->values_num = 0;
+ ci->values_alloc = 0;
ci->last_flush_time = when;
if (config_write_jitter > 0)
for (k = 0; k < cfd.keys_num; k++)
{
+ gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
/* should never fail, since we have held the cache_lock
* the entire time */
- assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
+ assert(status == TRUE);
}
if (cfd.keys != NULL)
return (0);
} /* int flush_old_values */
-static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
+static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
{
struct timeval now;
struct timespec next_flush;
return NULL;
} /* void *flush_thread_main */
-static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
+static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
{
pthread_mutex_lock (&cache_lock);
@@ -1043,7 +1056,7 @@ static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
assert(file != NULL);
if (!config_write_base_only
- || sock == NULL /* journal replay */
+ || JOURNAL_REPLAY(sock)
|| config_base_dir == NULL)
return 1;
if (found == TRUE)
{
- if (sock != NULL)
+ if (!JOURNAL_REPLAY(sock))
journal_write("forget", file);
return send_response(sock, RESP_OK, "Gone!\n");
char *file, file_tmp[PATH_MAX];
int values_num = 0;
int status;
- char orig_buf[CMD_MAX];
+ char orig_buf[RRD_CMD_MAX];
cache_item_t *ci;
/* save it for the journal later */
- if (sock != NULL)
- strncpy(orig_buf, buffer, buffer_size);
+ if (!JOURNAL_REPLAY(sock))
+ strncpy(orig_buf, buffer, min(RRD_CMD_MAX,buffer_size));
status = buffer_get_field (&buffer, &buffer_size, &file);
if (status != 0)
assert (ci != NULL);
/* don't re-write updates in replay mode */
- if (sock != NULL)
+ if (!JOURNAL_REPLAY(sock))
journal_write("update", orig_buf);
while (buffer_size > 0)
{
char *value;
- time_t stamp;
+ double stamp;
char *eostamp;
status = buffer_get_field (&buffer, &buffer_size, &value);
break;
}
- /* make sure update time is always moving forward */
- stamp = strtol(value, &eostamp, 10);
+ /* make sure update time is always moving forward. We use double here since
+ update does support subsecond precision for timestamps ... */
+ stamp = strtod(value, &eostamp);
if (eostamp == value || eostamp == NULL || *eostamp != ':')
{
pthread_mutex_unlock(&cache_lock);
{
pthread_mutex_unlock(&cache_lock);
return send_response(sock, RESP_ERR,
- "illegal attempt to update using time %ld when last"
- " update time is %ld (minimum one second step)\n",
+ "illegal attempt to update using time %lf when last"
+ " update time is %lf (minimum one second step)\n",
stamp, ci->last_update_stamp);
}
else
ci->last_update_stamp = stamp;
- if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
+ if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
+ &ci->values_alloc, config_alloc_chunk))
{
RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
continue;
} /* }}} int handle_request_update */
+static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
+{
+ char *file, file_tmp[PATH_MAX];
+ char *cf;
+
+ char *start_str;
+ char *end_str;
+ time_t start_tm;
+ time_t end_tm;
+
+ unsigned long step;
+ unsigned long ds_cnt;
+ char **ds_namv;
+ rrd_value_t *data;
+
+ int status;
+ unsigned long i;
+ time_t t;
+ rrd_value_t *data_ptr;
+
+ file = NULL;
+ cf = NULL;
+ start_str = NULL;
+ end_str = NULL;
+
+ /* Read the arguments */
+ do /* while (0) */
+ {
+ status = buffer_get_field (&buffer, &buffer_size, &file);
+ if (status != 0)
+ break;
+
+ status = buffer_get_field (&buffer, &buffer_size, &cf);
+ if (status != 0)
+ break;
+
+ status = buffer_get_field (&buffer, &buffer_size, &start_str);
+ if (status != 0)
+ {
+ start_str = NULL;
+ status = 0;
+ break;
+ }
+
+ status = buffer_get_field (&buffer, &buffer_size, &end_str);
+ if (status != 0)
+ {
+ end_str = NULL;
+ status = 0;
+ break;
+ }
+ } while (0);
+
+ if (status != 0)
+ return (syntax_error(sock,cmd));
+
+ get_abs_path(&file, file_tmp);
+ if (!check_file_access(file, sock)) return 0;
+
+ status = flush_file (file);
+ if ((status != 0) && (status != ENOENT))
+ return (send_response (sock, RESP_ERR,
+ "flush_file (%s) failed with status %i.\n", file, status));
+
+ t = time (NULL); /* "now" */
+
+ /* Parse start time */
+ if (start_str != NULL)
+ {
+ char *endptr;
+ long value;
+
+ endptr = NULL;
+ errno = 0;
+ value = strtol (start_str, &endptr, /* base = */ 0);
+ if ((endptr == start_str) || (errno != 0))
+ return (send_response(sock, RESP_ERR,
+ "Cannot parse start time `%s': Only simple integers are allowed.\n",
+ start_str));
+
+ if (value > 0)
+ start_tm = (time_t) value;
+ else
+ start_tm = (time_t) (t + value);
+ }
+ else
+ {
+ start_tm = t - 86400;
+ }
+
+ /* Parse end time */
+ if (end_str != NULL)
+ {
+ char *endptr;
+ long value;
+
+ endptr = NULL;
+ errno = 0;
+ value = strtol (end_str, &endptr, /* base = */ 0);
+ if ((endptr == end_str) || (errno != 0))
+ return (send_response(sock, RESP_ERR,
+ "Cannot parse end time `%s': Only simple integers are allowed.\n",
+ end_str));
+
+ if (value > 0)
+ end_tm = (time_t) value;
+ else
+ end_tm = (time_t) (t + value);
+ }
+ else
+ {
+ end_tm = t;
+ }
+
+ step = -1;
+ ds_cnt = 0;
+ ds_namv = NULL;
+ data = NULL;
+
+ status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
+ &ds_cnt, &ds_namv, &data);
+ if (status != 0)
+ return (send_response(sock, RESP_ERR,
+ "rrd_fetch_r failed: %s\n", rrd_get_error ()));
+
+ add_response_info (sock, "FlushVersion: %lu\n", 1);
+ add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
+ add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
+ add_response_info (sock, "Step: %lu\n", step);
+ add_response_info (sock, "DSCount: %lu\n", ds_cnt);
+
+#define SSTRCAT(buffer,str,buffer_fill) do { \
+ size_t str_len = strlen (str); \
+ if ((buffer_fill + str_len) > sizeof (buffer)) \
+ str_len = sizeof (buffer) - buffer_fill; \
+ if (str_len > 0) { \
+ strncpy (buffer + buffer_fill, str, str_len); \
+ buffer_fill += str_len; \
+ assert (buffer_fill <= sizeof (buffer)); \
+ if (buffer_fill == sizeof (buffer)) \
+ buffer[buffer_fill - 1] = 0; \
+ else \
+ buffer[buffer_fill] = 0; \
+ } \
+ } while (0)
+
+ { /* Add list of DS names */
+ char linebuf[1024];
+ size_t linebuf_fill;
+
+ memset (linebuf, 0, sizeof (linebuf));
+ linebuf_fill = 0;
+ for (i = 0; i < ds_cnt; i++)
+ {
+ if (i > 0)
+ SSTRCAT (linebuf, " ", linebuf_fill);
+ SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
+ rrd_freemem(ds_namv[i]);
+ }
+ rrd_freemem(ds_namv);
+ add_response_info (sock, "DSName: %s\n", linebuf);
+ }
+
+ /* Add the actual data */
+ assert (step > 0);
+ data_ptr = data;
+ for (t = start_tm + step; t <= end_tm; t += step)
+ {
+ char linebuf[1024];
+ size_t linebuf_fill;
+ char tmp[128];
+
+ memset (linebuf, 0, sizeof (linebuf));
+ linebuf_fill = 0;
+ for (i = 0; i < ds_cnt; i++)
+ {
+ snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
+ tmp[sizeof (tmp) - 1] = 0;
+ SSTRCAT (linebuf, tmp, linebuf_fill);
+
+ data_ptr++;
+ }
+
+ add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
+ } /* for (t) */
+ rrd_freemem(data);
+
+ return (send_response (sock, RESP_OK, "Success\n"));
+#undef SSTRCAT
+} /* }}} int handle_request_fetch */
+
/* we came across a "WROTE" entry during journal replay.
* throw away any values that we have accumulated for this file
*/
return (0);
} /* }}} int handle_request_wrote */
+static int handle_request_info (HANDLER_PROTO) /* {{{ */
+{
+ char *file, file_tmp[PATH_MAX];
+ int status;
+ rrd_info_t *info;
+
+ /* obtain filename */
+ status = buffer_get_field(&buffer, &buffer_size, &file);
+ if (status != 0)
+ return syntax_error(sock,cmd);
+ /* get full pathname */
+ get_abs_path(&file, file_tmp);
+ if (!check_file_access(file, sock)) {
+ return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
+ }
+ /* get data */
+ rrd_clear_error ();
+ info = rrd_info_r(file);
+ if(!info) {
+ return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
+ }
+ for (rrd_info_t *data = info; data != NULL; data = data->next) {
+ switch (data->type) {
+ case RD_I_VAL:
+ if (isnan(data->value.u_val))
+ add_response_info(sock,"%s %d NaN\n",data->key, data->type);
+ else
+ add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
+ break;
+ case RD_I_CNT:
+ add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
+ break;
+ case RD_I_INT:
+ add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
+ break;
+ case RD_I_STR:
+ add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
+ break;
+ case RD_I_BLO:
+ add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
+ break;
+ }
+ }
+
+ rrd_info_free(info);
+
+ return send_response(sock, RESP_OK, "Info for %s follows\n",file);
+} /* }}} static int handle_request_info */
+
+static int handle_request_first (HANDLER_PROTO) /* {{{ */
+{
+ char *i, *file, file_tmp[PATH_MAX];
+ int status;
+ int idx;
+ time_t t;
+
+ /* obtain filename */
+ status = buffer_get_field(&buffer, &buffer_size, &file);
+ if (status != 0)
+ return syntax_error(sock,cmd);
+ /* get full pathname */
+ get_abs_path(&file, file_tmp);
+ if (!check_file_access(file, sock)) {
+ return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
+ }
+
+ status = buffer_get_field(&buffer, &buffer_size, &i);
+ if (status != 0)
+ return syntax_error(sock,cmd);
+ idx = atoi(i);
+ if(idx<0) {
+ return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
+ }
+
+ /* get data */
+ rrd_clear_error ();
+ t = rrd_first_r(file,idx);
+ if(t<1) {
+ return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
+ }
+ return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
+} /* }}} static int handle_request_first */
+
+
+static int handle_request_last (HANDLER_PROTO) /* {{{ */
+{
+ char *file, file_tmp[PATH_MAX];
+ int status;
+ time_t t, from_file, step;
+ rrd_file_t * rrd_file;
+ cache_item_t * ci;
+ rrd_t rrd;
+
+ /* obtain filename */
+ status = buffer_get_field(&buffer, &buffer_size, &file);
+ if (status != 0)
+ return syntax_error(sock,cmd);
+ /* get full pathname */
+ get_abs_path(&file, file_tmp);
+ if (!check_file_access(file, sock)) {
+ return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
+ }
+ rrd_clear_error();
+ rrd_init(&rrd);
+ rrd_file = rrd_open(file,&rrd,RRD_READONLY);
+ if(!rrd_file) {
+ return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
+ }
+ from_file = rrd.live_head->last_up;
+ step = rrd.stat_head->pdp_step;
+ rrd_close(rrd_file);
+ pthread_mutex_lock(&cache_lock);
+ ci = g_tree_lookup(cache_tree, file);
+ if (ci)
+ t = ci->last_update_stamp;
+ else
+ t = from_file;
+ pthread_mutex_unlock(&cache_lock);
+ t -= t % step;
+ rrd_free(&rrd);
+ if(t<1) {
+ return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
+ }
+ return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
+} /* }}} static int handle_request_last */
+
+static int handle_request_create (HANDLER_PROTO) /* {{{ */
+{
+ char *file, file_tmp[PATH_MAX];
+ char *tok;
+ int ac = 0;
+ char *av[128];
+ int status;
+ unsigned long step = 300;
+ time_t last_up = time(NULL)-10;
+ int no_overwrite = opt_no_overwrite;
+
+
+ /* obtain filename */
+ status = buffer_get_field(&buffer, &buffer_size, &file);
+ if (status != 0)
+ return syntax_error(sock,cmd);
+ /* get full pathname */
+ get_abs_path(&file, file_tmp);
+ if (!check_file_access(file, sock)) {
+ return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
+ }
+ RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
+
+ while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
+ if( ! strncmp(tok,"-b",2) ) {
+ status = buffer_get_field(&buffer, &buffer_size, &tok );
+ if (status != 0) return syntax_error(sock,cmd);
+ last_up = (time_t) atol(tok);
+ continue;
+ }
+ if( ! strncmp(tok,"-s",2) ) {
+ status = buffer_get_field(&buffer, &buffer_size, &tok );
+ if (status != 0) return syntax_error(sock,cmd);
+ step = atol(tok);
+ continue;
+ }
+ if( ! strncmp(tok,"-O",2) ) {
+ no_overwrite = 1;
+ continue;
+ }
+ if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
+ if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
+ return syntax_error(sock,cmd);
+ }
+ if(step<1) {
+ return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
+ }
+ if (last_up < 3600 * 24 * 365 * 10) {
+ return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
+ }
+
+ rrd_clear_error ();
+ status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
+
+ if(!status) {
+ return send_response(sock, RESP_OK, "RRD created OK\n");
+ }
+ return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
+} /* }}} static int handle_request_create */
+
/* start "BATCH" processing */
static int batch_start (HANDLER_PROTO) /* {{{ */
{
NULL,
NULL
},
+ {
+ "FETCH",
+ handle_request_fetch,
+ CMD_CONTEXT_CLIENT,
+ "FETCH <file> <CF> [<start> [<end>]]\n"
+ ,
+ "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
+ },
+ {
+ "INFO",
+ handle_request_info,
+ CMD_CONTEXT_CLIENT,
+ "INFO <filename>\n",
+ "The INFO command retrieves information about a specified RRD file.\n"
+ "This is returned in standard rrdinfo format, a sequence of lines\n"
+ "with the format <keyname> = <value>\n"
+ "Note that this is the data as of the last update of the RRD file itself,\n"
+ "not the last time data was received via rrdcached, so there may be pending\n"
+ "updates in the queue. If this bothers you, then first run a FLUSH.\n"
+ },
+ {
+ "FIRST",
+ handle_request_first,
+ CMD_CONTEXT_CLIENT,
+ "FIRST <filename> <rra index>\n",
+ "The FIRST command retrieves the first data time for a specified RRA in\n"
+ "an RRD file.\n"
+ },
+ {
+ "LAST",
+ handle_request_last,
+ CMD_CONTEXT_CLIENT,
+ "LAST <filename>\n",
+ "The LAST command retrieves the last update time for a specified RRD file.\n"
+ "Note that this is the time of the last update of the RRD file itself, not\n"
+ "the last time data was received via rrdcached, so there may be pending\n"
+ "updates in the queue. If this bothers you, then first run a FLUSH.\n"
+ },
+ {
+ "CREATE",
+ handle_request_create,
+ CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
+ "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
+ "The CREATE command will create an RRD file, overwriting any existing file\n"
+ "unless the -O option is given or rrdcached was started with the -O option.\n"
+ "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
+ "not acceptable) and the step is in seconds (default is 300).\n"
+ "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
+ },
{
"QUIT",
handle_request_quit,
{
ssize_t i;
- if (sock == NULL) /* journal replay */
+ if (JOURNAL_REPLAY(sock))
return (1);
if (cmd == NULL)
return (0);
} /* }}} int socket_permission_add */
+static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
+{
+ sock->permissions = 0;
+} /* }}} socket_permission_clear */
+
+static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
+ listen_socket_t *src)
+{
+ dest->permissions = src->permissions;
+} /* }}} socket_permission_copy */
+
+static void socket_permission_set_all (listen_socket_t *sock) /* {{{ */
+{
+ size_t i;
+
+ sock->permissions = 0;
+ for (i = 0; i < list_of_commands_len; i++)
+ sock->permissions |= (1 << i);
+} /* }}} void socket_permission_set_all */
+
/* check whether commands are received in the expected context */
static int command_check_context(listen_socket_t *sock, command_t *cmd)
{
- if (sock == NULL)
+ if (JOURNAL_REPLAY(sock))
return (cmd->context & CMD_CONTEXT_JOURNAL);
else if (sock->batch_start)
return (cmd->context & CMD_CONTEXT_BATCH);
if (help && (help->syntax || help->help))
{
- char tmp[CMD_MAX];
+ char tmp[RRD_CMD_MAX];
snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
resp_txt = tmp;
return send_response(sock, RESP_OK, resp_txt);
} /* }}} int handle_request_help */
-/* if sock==NULL, we are in journal replay mode */
static int handle_request (DISPATCH_PROTO) /* {{{ */
{
char *buffer_ptr = buffer;
int entry_cnt = 0;
int fail_cnt = 0;
uint64_t line = 0;
- char entry[CMD_MAX];
+ char entry[RRD_CMD_MAX];
time_t now;
if (file == NULL) return 0;
}
dir = opendir(journal_dir);
+ if (!dir) {
+ RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
+ return;
+ }
while ((dent = readdir(dir)) != NULL)
{
/* looks like a journal file? */
}
pthread_mutex_lock (&connection_threads_lock);
+#ifdef HAVE_LIBWRAP
+ /* LIBWRAP does not support multiple threads! By putting this code
+ inside pthread_mutex_lock we do not have to worry about request_info
+ getting overwritten by another thread.
+ */
+ struct request_info req;
+ request_init(&req, RQ_DAEMON, "rrdcached\0", RQ_FILE, fd, NULL );
+ fromhost(&req);
+ if(!hosts_access(&req)) {
+ RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
+ pthread_mutex_unlock (&connection_threads_lock);
+ close_connection(sock);
+ return NULL;
+ }
+#endif /* HAVE_LIBWRAP */
connection_threads_num++;
pthread_mutex_unlock (&connection_threads_lock);
return (-1);
}
+ /* tweak the sockets group ownership */
+ if (sock->socket_group != (gid_t)-1)
+ {
+ if ( (chown(path, getuid(), sock->socket_group) != 0) ||
+ (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
+ {
+ fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
+ }
+ }
+
+ if (sock->socket_permissions != (mode_t)-1)
+ {
+ if (chmod(path, sock->socket_permissions) != 0)
+ fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
+ (unsigned int)sock->socket_permissions, strerror(errno));
+ }
+
status = listen (fd, /* backlog = */ 10);
if (status != 0)
{
return (open_listen_socket_network(sock));
} /* }}} int open_listen_socket */
+#ifndef SD_LISTEN_FDS_START
+# define SD_LISTEN_FDS_START 3
+#endif
+/*
+ * returns number of descriptors passed from systemd
+ */
+static int open_listen_sockets_systemd(void) /* {{{ */
+{
+ listen_socket_t *temp;
+ struct sockaddr_un sa;
+ socklen_t l;
+ int sd_fd;
+ const char *env;
+ unsigned long n;
+
+ /* check if it for us */
+ env = getenv("LISTEN_PID");
+ if (!env)
+ return 0;
+
+ n = strtoul(env, NULL, 10);
+ if (!n || n == ULONG_MAX || (pid_t)n != getpid())
+ return 0;
+
+ /* get the number of passed descriptors */
+ env = getenv("LISTEN_FDS");
+ if (!env)
+ return 0;
+
+ n = strtoul(env, NULL, 10);
+ if (!n || n == ULONG_MAX)
+ return 0;
+
+ temp = (listen_socket_t *) rrd_realloc (listen_fds,
+ sizeof (listen_fds[0]) * (listen_fds_num + n));
+ if (temp == NULL)
+ {
+ fprintf (stderr, "rrdcached: open_listen_socket_systemd: realloc failed.\n");
+ return 0;
+ }
+ listen_fds = temp;
+
+ for (unsigned int i = 0; i < n; i++)
+ {
+ sd_fd = SD_LISTEN_FDS_START + i;
+
+ l = sizeof(sa);
+ memset(&sa, 0, l);
+ if (getsockname(sd_fd, &sa, &l) < 0)
+ {
+ fprintf(stderr, "open_listen_sockets_systemd: problem getting fd %d: %s\n", sd_fd, rrd_strerror (errno));
+ return i;
+ }
+
+ listen_fds[listen_fds_num].fd = sd_fd;
+ listen_fds[listen_fds_num].family = sa.sun_family;
+ listen_fds_num++;
+ }
+
+ return n;
+} /* }}} open_listen_sockets_systemd */
+
+static void open_listen_sockets_traditional(void) /* {{{ */
+{
+ if (config_listen_address_list_len > 0)
+ {
+ for (size_t i = 0; i < config_listen_address_list_len; i++)
+ open_listen_socket (config_listen_address_list[i]);
+
+ rrd_free_ptrs((void ***) &config_listen_address_list,
+ &config_listen_address_list_len);
+ }
+ else
+ {
+ strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
+ sizeof(default_socket.addr) - 1);
+ default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
+
+ if (default_socket.permissions == 0)
+ socket_permission_set_all (&default_socket);
+
+ open_listen_socket (&default_socket);
+ }
+} /* }}} open_list_sockets_traditional */
+
static int close_listen_sockets (void) /* {{{ */
{
size_t i;
return (0);
} /* }}} int close_listen_sockets */
-static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
+static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
{
struct pollfd *pollfds;
int pollfds_num;
if (pid_fd < 0)
return pid_fd;
- /* open all the listen sockets */
- if (config_listen_address_list_len > 0)
- {
- for (size_t i = 0; i < config_listen_address_list_len; i++)
- open_listen_socket (config_listen_address_list[i]);
+ /* gather sockets passed from systemd;
+ * if none, open all the listen sockets from config or default */
- rrd_free_ptrs((void ***) &config_listen_address_list,
- &config_listen_address_list_len);
- }
- else
- {
- listen_socket_t sock;
- memset(&sock, 0, sizeof(sock));
- strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
- open_listen_socket (&sock);
- }
+ if (!(open_listen_sockets_systemd() > 0))
+ open_listen_sockets_traditional();
if (listen_fds_num < 1)
{
int option;
int status = 0;
- char **permissions = NULL;
- size_t permissions_len = 0;
+ socket_permission_clear (&default_socket);
+
+ default_socket.socket_group = (gid_t)-1;
+ default_socket.socket_permissions = (mode_t)-1;
- while ((option = getopt(argc, argv, "gl:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
+ while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
{
switch (option)
{
+ case 'O':
+ opt_no_overwrite = 1;
+ break;
+
case 'g':
stay_foreground=1;
break;
strncpy(new->addr, optarg, sizeof(new->addr)-1);
/* Add permissions to the socket {{{ */
- if (permissions_len != 0)
+ if (default_socket.permissions != 0)
{
- size_t i;
- for (i = 0; i < permissions_len; i++)
- {
- status = socket_permission_add (new, permissions[i]);
- if (status != 0)
- {
- fprintf (stderr, "read_options: Adding permission \"%s\" to "
- "socket failed. Most likely, this permission doesn't "
- "exist. Check your command line.\n", permissions[i]);
- status = 4;
- }
- }
+ socket_permission_copy (new, &default_socket);
}
- else /* if (permissions_len == 0) */
+ else /* if (default_socket.permissions == 0) */
{
/* Add permission for ALL commands to the socket. */
- size_t i;
- for (i = 0; i < list_of_commands_len; i++)
- {
- status = socket_permission_add (new, list_of_commands[i].cmd);
- if (status != 0)
- {
- fprintf (stderr, "read_options: Adding permission \"%s\" to "
- "socket failed. This should never happen, ever! Sorry.\n",
- permissions[i]);
- status = 4;
- }
- }
+ socket_permission_set_all (new);
}
/* }}} Done adding permissions. */
+ new->socket_group = default_socket.socket_group;
+ new->socket_permissions = default_socket.socket_permissions;
+
if (!rrd_add_ptr((void ***)&config_listen_address_list,
&config_listen_address_list_len, new))
{
}
break;
+ /* set socket group permissions */
+ case 's':
+ {
+ gid_t group_gid;
+ struct group *grp;
+
+ group_gid = strtoul(optarg, NULL, 10);
+ if (errno != EINVAL && group_gid>0)
+ {
+ /* we were passed a number */
+ grp = getgrgid(group_gid);
+ }
+ else
+ {
+ grp = getgrnam(optarg);
+ }
+
+ if (grp)
+ {
+ default_socket.socket_group = grp->gr_gid;
+ }
+ else
+ {
+ /* no idea what the user wanted... */
+ fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
+ return (5);
+ }
+ }
+ break;
+
+ /* set socket file permissions */
+ case 'm':
+ {
+ long tmp;
+ char *endptr = NULL;
+
+ tmp = strtol (optarg, &endptr, 8);
+ if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
+ || (tmp > 07777) || (tmp < 0)) {
+ fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
+ optarg);
+ return (5);
+ }
+
+ default_socket.socket_permissions = (mode_t)tmp;
+ }
+ break;
+
case 'P':
{
char *optcopy;
char *dummy;
char *ptr;
- rrd_free_ptrs ((void *) &permissions, &permissions_len);
+ socket_permission_clear (&default_socket);
optcopy = strdup (optarg);
dummy = optcopy;
while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
{
dummy = NULL;
- rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
+ status = socket_permission_add (&default_socket, ptr);
+ if (status != 0)
+ {
+ fprintf (stderr, "read_options: Adding permission \"%s\" to "
+ "socket failed. Most likely, this permission doesn't "
+ "exist. Check your command line.\n", ptr);
+ status = 4;
+ }
}
free (optcopy);
case 'j':
{
- const char *dir = journal_dir = strdup(optarg);
-
- status = rrd_mkdir_p(dir, 0777);
- if (status != 0)
- {
- fprintf(stderr, "Failed to create journal directory '%s': %s\n",
- dir, rrd_strerror(errno));
- return 6;
- }
+ char journal_dir_actual[PATH_MAX];
+ journal_dir = realpath((const char *)optarg, journal_dir_actual);
+ if (journal_dir)
+ {
+ // if we were able to properly resolve the path, lets have a copy
+ // for use outside this block.
+ journal_dir = strdup(journal_dir);
+ status = rrd_mkdir_p(journal_dir, 0777);
+ if (status != 0)
+ {
+ fprintf(stderr, "Failed to create journal directory '%s': %s\n",
+ journal_dir, rrd_strerror(errno));
+ return 6;
+ }
+ if (access(journal_dir, R_OK|W_OK|X_OK) != 0)
+ {
+ fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
+ errno ? rrd_strerror(errno) : "");
+ return 6;
+ }
+ } else {
+ fprintf(stderr, "Unable to resolve journal path (%s,%s)\n", optarg,
+ errno ? rrd_strerror(errno) : "");
+ return 6;
+ }
+ }
+ break;
- if (access(dir, R_OK|W_OK|X_OK) != 0)
+ case 'a':
+ {
+ int temp = atoi(optarg);
+ if (temp > 0)
+ config_alloc_chunk = temp;
+ else
{
- fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
- errno ? rrd_strerror(errno) : "");
- return 6;
+ fprintf(stderr, "Invalid allocation size: %s\n", optarg);
+ return 10;
}
}
break;
"\n"
"Valid options are:\n"
" -l <address> Socket address to listen to.\n"
+ " Default: "RRDCACHED_DEFAULT_ADDRESS"\n"
" -P <perms> Sets the permissions to assign to all following "
"sockets\n"
" -w <seconds> Interval in which to write data.\n"
" -g Do not fork and run in the foreground.\n"
" -j <dir> Directory in which to create the journal files.\n"
" -F Always flush all updates at shutdown\n"
+ " -s <id|name> Group owner of all following UNIX sockets\n"
+ " (the socket will also have read/write permissions "
+ "for that group)\n"
+ " -m <mode> File permissions (octal) of all following UNIX "
+ "sockets\n"
+ " -a <size> Memory allocation chunk size. Default is 1.\n"
+ " -O Do not allow CREATE commands to overwrite existing\n"
+ " files, even if asked to.\n"
"\n"
"For more information and a detailed description of all options "
"please refer\n"
"to the rrdcached(1) manual page.\n",
VERSION);
- status = -1;
+ if (option == 'h')
+ status = -1;
+ else
+ status = 1;
break;
} /* switch (option) */
} /* while (getopt) */
if (journal_dir == NULL)
config_flush_at_shutdown = 1;
- rrd_free_ptrs ((void *) &permissions, &permissions_len);
-
return (status);
} /* }}} int read_options */