summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: 978ca8b)
raw | patch | inline | side by side (parent: 978ca8b)
author | oetiker <oetiker@a5681a0c-68f1-0310-ab6d-d61299d08faa> | |
Mon, 29 Mar 2010 17:03:57 +0000 (17:03 +0000) | ||
committer | oetiker <oetiker@a5681a0c-68f1-0310-ab6d-d61299d08faa> | |
Mon, 29 Mar 2010 17:03:57 +0000 (17:03 +0000) |
version of) rrdfetch(1).
This has advantages over calling "FLUSH" from within the "client",
especially if the daemon is accessed using a network socket. For one, it
makes it easy to separate collecting and storing of data on one side and
creating graphs on another, possibly more public server. Without this
command this is only possible using networked file systems and similar
techniques.
When talking to an instance of RRDCacheD via a network socket, only
relative pathnames are allowed. If the RRD file is to be accessed
afterwards (why else would one call "FLUSH"?), the client has to be in a
specific directory so the *same* relative path can be used. If the file
is on a share mounted via the network, the required CWD may differ from
the CWD of the server, making developing and deploying solutions using
separated storing and graphing unnecessarily hard.
The data can be accessed using "rrdc_fetch" which should be a drop-in
replacement for "rrd_fetch_r". This makes it easy for programs using the
RRDtool C API to use this new functionality. -- Florian Forster
git-svn-id: svn://svn.oetiker.ch/rrdtool/trunk@2059 a5681a0c-68f1-0310-ab6d-d61299d08faa
This has advantages over calling "FLUSH" from within the "client",
especially if the daemon is accessed using a network socket. For one, it
makes it easy to separate collecting and storing of data on one side and
creating graphs on another, possibly more public server. Without this
command this is only possible using networked file systems and similar
techniques.
When talking to an instance of RRDCacheD via a network socket, only
relative pathnames are allowed. If the RRD file is to be accessed
afterwards (why else would one call "FLUSH"?), the client has to be in a
specific directory so the *same* relative path can be used. If the file
is on a share mounted via the network, the required CWD may differ from
the CWD of the server, making developing and deploying solutions using
separated storing and graphing unnecessarily hard.
The data can be accessed using "rrdc_fetch" which should be a drop-in
replacement for "rrd_fetch_r". This makes it easy for programs using the
RRDtool C API to use this new functionality. -- Florian Forster
git-svn-id: svn://svn.oetiker.ch/rrdtool/trunk@2059 a5681a0c-68f1-0310-ab6d-d61299d08faa
program/doc/rrdcached.pod | patch | blob | history | |
program/src/rrd_client.c | patch | blob | history | |
program/src/rrd_client.h | patch | blob | history | |
program/src/rrd_daemon.c | patch | blob | history |
index cf15590de3c22517f924d610da987d5d18927c72..3f72a23cd28a40385c8dc9c7cf942279d94ce1bb 100644 (file)
Shows any "pending" updates for a file, in order. The updates shown have
not yet been written to the underlying RRD file.
+=item B<FETCH> I<filename> I<CF> [I<start> [I<end>]]
+
+Calls C<rrd_fetch> with the specified arguments and returns the result in text
+form. If necessary, the file is flushed to disk first. The client side function
+C<rrdc_fetch> (declared in C<rrd_client.h>) parses the output and behaves just
+like C<rrd_fetch_r> for easy integration of remote queries.
+
=item B<FORGET> I<filename>
Removes I<filename> from the cache. Any pending updates B<WILL BE LOST>.
index 0b69000f26b1e01e0469289f31ef0f8c510fe776..04b54a8042ef29f27a70a68ae0af3e18c40d4a6c 100644 (file)
--- a/program/src/rrd_client.c
+++ b/program/src/rrd_client.c
/**
* RRDTool - src/rrd_client.c
- * Copyright (C) 2008 Florian octo Forster
+ * Copyright (C) 2008-2010 Florian octo Forster
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
return (ret);
} /* }}} char *get_path */
+static size_t strsplit (char *string, char **fields, size_t size) /* {{{ */
+{
+ size_t i;
+ char *ptr;
+ char *saveptr;
+
+ i = 0;
+ ptr = string;
+ saveptr = NULL;
+ while ((fields[i] = strtok_r (ptr, " \t\r\n", &saveptr)) != NULL)
+ {
+ ptr = NULL;
+ i++;
+
+ if (i >= size)
+ break;
+ }
+
+ return (i);
+} /* }}} size_t strsplit */
+
+static int parse_header (char *line, /* {{{ */
+ char **ret_key, char **ret_value)
+{
+ char *tmp;
+
+ *ret_key = line;
+
+ tmp = strchr (line, ':');
+ if (tmp == NULL)
+ return (-1);
+
+ do
+ {
+ *tmp = 0;
+ tmp++;
+ }
+ while ((tmp[0] == ' ') || (tmp[0] == '\t'));
+
+ if (*tmp == 0)
+ return (-1);
+
+ *ret_value = tmp;
+ return (0);
+} /* }}} int parse_header */
+
+static int parse_ulong_header (char *line, /* {{{ */
+ char **ret_key, unsigned long *ret_value)
+{
+ char *str_value;
+ char *endptr;
+ int status;
+
+ str_value = NULL;
+ status = parse_header (line, ret_key, &str_value);
+ if (status != 0)
+ return (status);
+
+ endptr = NULL;
+ errno = 0;
+ *ret_value = (unsigned long) strtol (str_value, &endptr, /* base = */ 0);
+ if ((endptr == str_value) || (errno != 0))
+ return (-1);
+
+ return (0);
+} /* }}} int parse_ulong_header */
+
+static int parse_char_array_header (char *line, /* {{{ */
+ char **ret_key, char **array, size_t array_len, int alloc)
+{
+ char *tmp_array[array_len];
+ char *value;
+ size_t num;
+ int status;
+
+ value = NULL;
+ status = parse_header (line, ret_key, &value);
+ if (status != 0)
+ return (-1);
+
+ num = strsplit (value, tmp_array, array_len);
+ if (num != array_len)
+ return (-1);
+
+ if (alloc == 0)
+ {
+ memcpy (array, tmp_array, sizeof (tmp_array));
+ }
+ else
+ {
+ size_t i;
+
+ for (i = 0; i < array_len; i++)
+ array[i] = strdup (tmp_array[i]);
+ }
+
+ return (0);
+} /* }}} int parse_char_array_header */
+
+static int parse_value_array_header (char *line, /* {{{ */
+ time_t *ret_time, rrd_value_t *array, size_t array_len)
+{
+ char *str_key;
+ char *str_array[array_len];
+ char *endptr;
+ int status;
+ size_t i;
+
+ str_key = NULL;
+ status = parse_char_array_header (line, &str_key,
+ str_array, array_len, /* alloc = */ 0);
+ if (status != 0)
+ return (-1);
+
+ errno = 0;
+ endptr = NULL;
+ *ret_time = (time_t) strtol (str_key, &endptr, /* base = */ 10);
+ if ((endptr == str_key) || (errno != 0))
+ return (-1);
+
+ for (i = 0; i < array_len; i++)
+ {
+ endptr = NULL;
+ array[i] = (rrd_value_t) strtod (str_array[i], &endptr);
+ if ((endptr == str_array[i]) || (errno != 0))
+ return (-1);
+ }
+
+ return (0);
+} /* }}} int parse_value_array_header */
+
/* One must hold `lock' when calling `close_connection'. */
static void close_connection (void) /* {{{ */
{
return (status);
} /* }}} int rrdc_flush */
+int rrdc_fetch (const char *filename, /* {{{ */
+ const char *cf,
+ time_t *ret_start, time_t *ret_end,
+ unsigned long *ret_step,
+ unsigned long *ret_ds_num,
+ char ***ret_ds_names,
+ rrd_value_t **ret_data)
+{
+ char buffer[4096];
+ char *buffer_ptr;
+ size_t buffer_free;
+ size_t buffer_size;
+ rrdc_response_t *res;
+ char path_buffer[PATH_MAX];
+ char *path_ptr;
+
+ char *str_tmp;
+ unsigned long flush_version;
+
+ time_t start;
+ time_t end;
+ unsigned long step;
+ unsigned long ds_num;
+ char **ds_names;
+
+ rrd_value_t *data;
+ size_t data_size;
+ size_t data_fill;
+
+ int status;
+ size_t current_line;
+ time_t t;
+
+ if ((filename == NULL) || (cf == NULL))
+ return (-1);
+
+ /* Send request {{{ */
+ memset (buffer, 0, sizeof (buffer));
+ buffer_ptr = &buffer[0];
+ buffer_free = sizeof (buffer);
+
+ status = buffer_add_string ("FETCH", &buffer_ptr, &buffer_free);
+ if (status != 0)
+ return (ENOBUFS);
+
+ /* change to path for rrdcached */
+ path_ptr = get_path (filename, path_buffer);
+ if (path_ptr == NULL)
+ return (EINVAL);
+
+ status = buffer_add_string (path_ptr, &buffer_ptr, &buffer_free);
+ if (status != 0)
+ return (ENOBUFS);
+
+ status = buffer_add_string (cf, &buffer_ptr, &buffer_free);
+ if (status != 0)
+ return (ENOBUFS);
+
+ if ((ret_start != NULL) && (*ret_start > 0))
+ {
+ char tmp[64];
+ snprintf (tmp, sizeof (tmp), "%lu", (unsigned long) *ret_start);
+ tmp[sizeof (tmp) - 1] = 0;
+ status = buffer_add_string (tmp, &buffer_ptr, &buffer_free);
+ if (status != 0)
+ return (ENOBUFS);
+
+ if ((ret_end != NULL) && (*ret_end > 0))
+ {
+ snprintf (tmp, sizeof (tmp), "%lu", (unsigned long) *ret_end);
+ tmp[sizeof (tmp) - 1] = 0;
+ status = buffer_add_string (tmp, &buffer_ptr, &buffer_free);
+ if (status != 0)
+ return (ENOBUFS);
+ }
+ }
+
+ assert (buffer_free < sizeof (buffer));
+ buffer_size = sizeof (buffer) - buffer_free;
+ assert (buffer[buffer_size - 1] == ' ');
+ buffer[buffer_size - 1] = '\n';
+
+ res = NULL;
+ status = request (buffer, buffer_size, &res);
+ if (status != 0)
+ return (status);
+
+ status = res->status;
+ if (status < 0)
+ {
+ rrd_set_error ("rrdcached: %s", res->message);
+ response_free (res);
+ return (status);
+ }
+ /* }}} Send request */
+
+ ds_names = NULL;
+ ds_num = 0;
+ data = NULL;
+ current_line = 0;
+
+ /* Macros to make error handling a little easier (i. e. less to type and
+ * read. `BAIL_OUT' sets the error message, frees all dynamically allocated
+ * variables and returns the provided status code. */
+#define BAIL_OUT(status, ...) do { \
+ rrd_set_error ("rrdc_fetch: " __VA_ARGS__); \
+ free (data); \
+ if (ds_names != 0) { size_t k; for (k = 0; k < ds_num; k++) free (ds_names[k]); } \
+ free (ds_names); \
+ response_free (res); \
+ return (status); \
+ } while (0)
+
+#define READ_NUMERIC_FIELD(name,type,var) do { \
+ char *key; \
+ unsigned long value; \
+ assert (current_line < res->lines_num); \
+ status = parse_ulong_header (res->lines[current_line], &key, &value); \
+ if (status != 0) \
+ BAIL_OUT (-1, "Unable to parse header `%s'", name); \
+ if (strcasecmp (key, name) != 0) \
+ BAIL_OUT (-1, "Unexpected header line: Expected `%s', got `%s'", name, key); \
+ var = (type) value; \
+ current_line++; \
+ } while (0)
+
+ if (res->lines_num < 1)
+ BAIL_OUT (-1, "Premature end of response packet");
+
+ /* We're making some very strong assumptions about the fields below. We
+ * therefore check the version of the `flush' command first, so that later
+ * versions can change the order of fields and it's easier to implement
+ * backwards compatibility. */
+ READ_NUMERIC_FIELD ("FlushVersion", unsigned long, flush_version);
+ if (flush_version != 1)
+ BAIL_OUT (-1, "Don't know how to handle flush format version %lu.",
+ flush_version);
+
+ if (res->lines_num < 5)
+ BAIL_OUT (-1, "Premature end of response packet");
+
+ READ_NUMERIC_FIELD ("Start", time_t, start);
+ READ_NUMERIC_FIELD ("End", time_t, end);
+ if (start >= end)
+ BAIL_OUT (-1, "Malformed start and end times: start = %lu; end = %lu;",
+ (unsigned long) start,
+ (unsigned long) end);
+
+ READ_NUMERIC_FIELD ("Step", unsigned long, step);
+ if (step < 1)
+ BAIL_OUT (-1, "Invalid number for Step: %lu", step);
+
+ READ_NUMERIC_FIELD ("DSCount", unsigned long, ds_num);
+ if (ds_num < 1)
+ BAIL_OUT (-1, "Invalid number for DSCount: %lu", ds_num);
+
+ /* It's time to allocate some memory */
+ ds_names = calloc ((size_t) ds_num, sizeof (*ds_names));
+ if (ds_names == NULL)
+ BAIL_OUT (-1, "Out of memory");
+
+ status = parse_char_array_header (res->lines[current_line],
+ &str_tmp, ds_names, (size_t) ds_num, /* alloc = */ 1);
+ if (status != 0)
+ BAIL_OUT (-1, "Unable to parse header `DSName'");
+ if (strcasecmp ("DSName", str_tmp) != 0)
+ BAIL_OUT (-1, "Unexpected header line: Expected `DSName', got `%s'", str_tmp);
+ current_line++;
+
+ data_size = ds_num * (end - start) / step;
+ if (data_size < 1)
+ BAIL_OUT (-1, "No data returned or headers invalid.");
+
+ if (res->lines_num != (6 + (data_size / ds_num)))
+ BAIL_OUT (-1, "Got %zu lines, expected %zu",
+ res->lines_num, (6 + (data_size / ds_num)));
+
+ data = calloc (data_size, sizeof (*data));
+ if (data == NULL)
+ BAIL_OUT (-1, "Out of memory");
+
+
+ data_fill = 0;
+ for (t = start + step; t <= end; t += step, current_line++)
+ {
+ time_t tmp;
+
+ assert (current_line < res->lines_num);
+
+ status = parse_value_array_header (res->lines[current_line],
+ &tmp, data + data_fill, (size_t) ds_num);
+ if (status != 0)
+ BAIL_OUT (-1, "Cannot parse value line");
+
+ data_fill += (size_t) ds_num;
+ }
+
+ *ret_start = start;
+ *ret_end = end;
+ *ret_step = step;
+ *ret_ds_num = ds_num;
+ *ret_ds_names = ds_names;
+ *ret_data = data;
+
+ response_free (res);
+ return (0);
+#undef READ_NUMERIC_FIELD
+#undef BAIL_OUT
+} /* }}} int rrdc_flush */
/* convenience function; if there is a daemon specified, or if we can
* detect one from the environment, then flush the file. Otherwise, no-op
index 6c48dec8e953823e147c66887ca6241b597f7373..58f5473c7cf4d3becefa8dcf6a7792350df35035 100644 (file)
--- a/program/src/rrd_client.h
+++ b/program/src/rrd_client.h
/**
* RRDTool - src/rrd_client.h
- * Copyright (C) 2008 Florian octo Forster
+ * Copyright (C) 2008-2010 Florian octo Forster
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
int rrdc_flush (const char *filename);
int rrdc_flush_if_daemon (const char *opt_daemon, const char *filename);
+int rrdc_fetch (const char *filename,
+ const char *cf,
+ time_t *ret_start, time_t *ret_end,
+ unsigned long *ret_step,
+ unsigned long *ret_ds_num,
+ char ***ret_ds_names,
+ rrd_value_t **ret_data);
+
#else
# define rrdc_flush_if_daemon(a,b) 0
# define rrdc_connect(a) 0
index ef6d83cb31fc86d7dcc28cad062d4cdba6249981..248095899b4c4ea2f36b0af562eb1cee07e7b5d4 100644 (file)
--- a/program/src/rrd_daemon.c
+++ b/program/src/rrd_daemon.c
/**
* RRDTool - src/rrd_daemon.c
- * Copyright (C) 2008,2009 Florian octo Forster
+ * Copyright (C) 2008-2010 Florian octo Forster
* Copyright (C) 2008,2009 Kevin Brintnall
*
* This program is free software; you can redistribute it and/or modify it
} /* }}} int handle_request_update */
+static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
+{
+ char *file;
+ char *cf;
+
+ char *start_str;
+ char *end_str;
+ rrd_time_value_t start_tv;
+ rrd_time_value_t end_tv;
+ time_t start_tm;
+ time_t end_tm;
+
+ unsigned long step;
+ unsigned long ds_cnt;
+ char **ds_namv;
+ rrd_value_t *data;
+
+ int status;
+ unsigned long i;
+ time_t t;
+ rrd_value_t *data_ptr;
+
+ file = NULL;
+ cf = NULL;
+ start_str = NULL;
+ end_str = NULL;
+
+ /* Read the arguments */
+ do /* while (0) */
+ {
+ status = buffer_get_field (&buffer, &buffer_size, &file);
+ if (status != 0)
+ break;
+
+ status = buffer_get_field (&buffer, &buffer_size, &cf);
+ if (status != 0)
+ break;
+
+ status = buffer_get_field (&buffer, &buffer_size, &start_str);
+ if (status != 0)
+ {
+ start_str = NULL;
+ status = 0;
+ break;
+ }
+
+ status = buffer_get_field (&buffer, &buffer_size, &end_str);
+ if (status != 0)
+ {
+ end_str = NULL;
+ status = 0;
+ break;
+ }
+ } while (0);
+
+ if (status != 0)
+ return (syntax_error(sock,cmd));
+
+ status = flush_file (file);
+ if ((status != 0) && (status != ENOENT))
+ return (send_response (sock, RESP_ERR,
+ "flush_file (%s) failed with status %i.\n", file, status));
+
+ /* Parse start time */
+ if (start_str != NULL)
+ {
+ const char *errmsg;
+
+ errmsg = rrd_parsetime (start_str, &start_tv);
+ if (errmsg != NULL)
+ return (send_response(sock, RESP_ERR,
+ "Cannot parse start time `%s': %s\n", start_str, errmsg));
+ }
+ else
+ rrd_parsetime ("-86400", &start_tv);
+
+ /* Parse end time */
+ if (end_str != NULL)
+ {
+ const char *errmsg;
+
+ errmsg = rrd_parsetime (end_str, &end_tv);
+ if (errmsg != NULL)
+ return (send_response(sock, RESP_ERR,
+ "Cannot parse end time `%s': %s\n", end_str, errmsg));
+ }
+ else
+ rrd_parsetime ("now", &end_tv);
+
+ start_tm = 0;
+ end_tm = 0;
+ status = rrd_proc_start_end (&start_tv, &end_tv, &start_tm, &end_tm);
+ if (status != 0)
+ return (send_response(sock, RESP_ERR,
+ "rrd_proc_start_end failed: %s\n", rrd_get_error ()));
+
+ step = -1;
+ ds_cnt = 0;
+ ds_namv = NULL;
+ data = NULL;
+
+ status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
+ &ds_cnt, &ds_namv, &data);
+ if (status != 0)
+ return (send_response(sock, RESP_ERR,
+ "rrd_fetch_r failed: %s\n", rrd_get_error ()));
+
+ add_response_info (sock, "FlushVersion: %lu\n", 1);
+ add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
+ add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
+ add_response_info (sock, "Step: %lu\n", step);
+ add_response_info (sock, "DSCount: %lu\n", ds_cnt);
+
+#define SSTRCAT(buffer,str,buffer_fill) do { \
+ size_t str_len = strlen (str); \
+ if ((buffer_fill + str_len) > sizeof (buffer)) \
+ str_len = sizeof (buffer) - buffer_fill; \
+ if (str_len > 0) { \
+ strncpy (buffer + buffer_fill, str, str_len); \
+ buffer_fill += str_len; \
+ assert (buffer_fill <= sizeof (buffer)); \
+ if (buffer_fill == sizeof (buffer)) \
+ buffer[buffer_fill - 1] = 0; \
+ else \
+ buffer[buffer_fill] = 0; \
+ } \
+ } while (0)
+
+ { /* Add list of DS names */
+ char linebuf[1024];
+ size_t linebuf_fill;
+
+ memset (linebuf, 0, sizeof (linebuf));
+ linebuf_fill = 0;
+ for (i = 0; i < ds_cnt; i++)
+ {
+ if (i > 0)
+ SSTRCAT (linebuf, " ", linebuf_fill);
+ SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
+ }
+ add_response_info (sock, "DSName: %s\n", linebuf);
+ }
+
+ /* Add the actual data */
+ assert (step > 0);
+ data_ptr = data;
+ for (t = start_tm + step; t <= end_tm; t += step)
+ {
+ char linebuf[1024];
+ size_t linebuf_fill;
+ char tmp[128];
+
+ memset (linebuf, 0, sizeof (linebuf));
+ linebuf_fill = 0;
+ for (i = 0; i < ds_cnt; i++)
+ {
+ snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
+ tmp[sizeof (tmp) - 1] = 0;
+ SSTRCAT (linebuf, tmp, linebuf_fill);
+
+ data_ptr++;
+ }
+
+ add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
+ } /* for (t) */
+
+ return (send_response (sock, RESP_OK, "Success\n"));
+#undef SSTRCAT
+} /* }}} int handle_request_fetch */
+
/* we came across a "WROTE" entry during journal replay.
* throw away any values that we have accumulated for this file
*/
NULL,
NULL
},
+ {
+ "FETCH",
+ handle_request_fetch,
+ CMD_CONTEXT_CLIENT,
+ "FETCH <file> <CF> [<start> [<end>]]\n"
+ ,
+ "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
+ },
{
"QUIT",
handle_request_quit,