From 4f05de6dce917250505eaa937aed4a6e0892e62e Mon Sep 17 00:00:00 2001 From: Aurelien ROUGEMONT Date: Wed, 27 Jan 2016 17:06:45 +0100 Subject: [PATCH] Add KAIROSDB format to write_http plugin --- src/Makefile.am | 3 +- src/collectd.conf.pod | 5 +- src/utils_format_kairosdb.c | 379 ++++++++++++++++++++++++++++++++++++ src/utils_format_kairosdb.h | 45 +++++ src/write_http.c | 80 +++++++- 5 files changed, 501 insertions(+), 11 deletions(-) create mode 100644 src/utils_format_kairosdb.c create mode 100644 src/utils_format_kairosdb.h diff --git a/src/Makefile.am b/src/Makefile.am index 641e2fae..93045e23 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1221,7 +1221,8 @@ endif if BUILD_PLUGIN_WRITE_HTTP pkglib_LTLIBRARIES += write_http.la write_http_la_SOURCES = write_http.c \ - utils_format_json.c utils_format_json.h + utils_format_json.c utils_format_json.h \ + utils_format_kairosdb.c utils_format_kairosdb.h write_http_la_CFLAGS = $(AM_CFLAGS) $(BUILD_WITH_LIBCURL_CFLAGS) write_http_la_LDFLAGS = $(PLUGIN_LDFLAGS) write_http_la_LIBADD = $(BUILD_WITH_LIBCURL_LIBS) diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index fffef712..04c654a1 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -7809,11 +7809,12 @@ Define which SSL protocol version must be used. By default C will attempt to figure out the remote SSL protocol version. See L for more details. -=item B B|B +=item B B|B|B Format of the output to generate. If set to B, will create output that is understood by the I and I plugins. When set to B, will -create output in the I (JSON). +create output in the I (JSON). When set to KAIROSDB +, will create output in the KairosDB format. Defaults to B. diff --git a/src/utils_format_kairosdb.c b/src/utils_format_kairosdb.c new file mode 100644 index 00000000..281ce524 --- /dev/null +++ b/src/utils_format_kairosdb.c @@ -0,0 +1,379 @@ +/** + * collectd - src/utils_format_kairosdb.c + * Copyright (C) 2016 Aurelien beorn Rougemont + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * Authors: + * Aurelien beorn Rougemont + **/ + + +#include "collectd.h" +#include "plugin.h" +#include "common.h" + +#include "utils_cache.h" +#include "utils_format_kairosdb.h" + + +/* This is the KAIROSDB format for write_http output + * + * Target format + * [ + * { + * "name":"collectd.vmem" + * "datapoints": + * [ + * [1453897164060, 97.000000] + * ], + * "tags": + * { + * "host": "fqdn.domain.tld", + * "plugin_instance": "vmpage_number", + * "type": "kernel_stack", + * "ds": "value" + * "" + * } + * } + * ] + */ + +static int kairosdb_escape_string (char *buffer, size_t buffer_size, /* {{{ */ + const char *string) +{ + size_t src_pos; + size_t dst_pos; + + if ((buffer == NULL) || (string == NULL)) + return (-EINVAL); + + if (buffer_size < 3) + return (-ENOMEM); + + dst_pos = 0; + +#define BUFFER_ADD(c) do { \ + if (dst_pos >= (buffer_size - 1)) { \ + buffer[buffer_size - 1] = 0; \ + return (-ENOMEM); \ + } \ + buffer[dst_pos] = (c); \ + dst_pos++; \ +} while (0) + + /* Escape special characters */ + /* authorize -_. and alpha num but also escapes " */ + BUFFER_ADD ('"'); + for (src_pos = 0; string[src_pos] != 0; src_pos++) + { + if (isalnum(string[src_pos]) || + 0x2d == string[src_pos] || + 0x2e == string[src_pos] || + 0x5f == string[src_pos]) + BUFFER_ADD (tolower(string[src_pos])); + } /* for */ + BUFFER_ADD ('"'); + buffer[dst_pos] = 0; + +#undef BUFFER_ADD + + return (0); +} /* }}} int kairosdb_escape_string */ + +static int values_to_kairosdb (char *buffer, size_t buffer_size, /* {{{ */ + const data_set_t *ds, const value_list_t *vl, int store_rates, + int ds_idx) +{ + size_t offset = 0; + gauge_t *rates = NULL; + + memset (buffer, 0, buffer_size); + +#define BUFFER_ADD(...) do { \ + int status; \ + status = ssnprintf (buffer + offset, buffer_size - offset, \ + __VA_ARGS__); \ + if (status < 1) \ + { \ + sfree(rates); \ + return (-1); \ + } \ + else if (((size_t) status) >= (buffer_size - offset)) \ + { \ + sfree(rates); \ + return (-ENOMEM); \ + } \ + else \ + offset += ((size_t) status); \ +} while (0) + + if (ds->ds[ds_idx].type == DS_TYPE_GAUGE) + { + if (isfinite (vl->values[ds_idx].gauge)) + { + BUFFER_ADD ("[["); + BUFFER_ADD ("%"PRIu64, CDTIME_T_TO_MS (vl->time)); + BUFFER_ADD (","); + BUFFER_ADD (JSON_GAUGE_FORMAT, vl->values[ds_idx].gauge); + } + else + { + DEBUG ("utils_format_kairosdb: invalid vl->values[ds_idx].gauge for %s|%s|%s|%s|%s", + vl->plugin, + vl->plugin_instance, + vl->type, + vl->type_instance, + ds->ds[ds_idx].name); + return (-1); + } + } + else if (store_rates) + { + if (rates == NULL) + rates = uc_get_rate (ds, vl); + if (rates == NULL) + { + WARNING ("utils_format_kairosdb: uc_get_rate failed for %s|%s|%s|%s|%s", + vl->plugin, + vl->plugin_instance, + vl->type, + vl->type_instance, + ds->ds[ds_idx].name); + + return (-1); + } + + if (isfinite (rates[ds_idx])) + { + BUFFER_ADD ("[["); + BUFFER_ADD ("%"PRIu64, CDTIME_T_TO_MS (vl->time)); + BUFFER_ADD (","); + BUFFER_ADD (JSON_GAUGE_FORMAT, rates[ds_idx]); + } + else + { + WARNING ("utils_format_kairosdb: invalid rates[ds_idx] for %s|%s|%s|%s|%s", + vl->plugin, + vl->plugin_instance, + vl->type, + vl->type_instance, + ds->ds[ds_idx].name); + sfree(rates); + return (-1); + } + } + else if (ds->ds[ds_idx].type == DS_TYPE_COUNTER) + { + BUFFER_ADD ("[["); + BUFFER_ADD ("%"PRIu64, CDTIME_T_TO_MS (vl->time)); + BUFFER_ADD (","); + BUFFER_ADD ("%llu", vl->values[ds_idx].counter); + } + else if (ds->ds[ds_idx].type == DS_TYPE_DERIVE) + { + BUFFER_ADD ("[["); + BUFFER_ADD ("%"PRIu64, CDTIME_T_TO_MS (vl->time)); + BUFFER_ADD (","); + BUFFER_ADD ("%"PRIi64, vl->values[ds_idx].derive); + } + else if (ds->ds[ds_idx].type == DS_TYPE_ABSOLUTE) + { + BUFFER_ADD ("[["); + BUFFER_ADD ("%"PRIu64, CDTIME_T_TO_MS (vl->time)); + BUFFER_ADD (","); + BUFFER_ADD ("%"PRIu64, vl->values[ds_idx].absolute); + } + else + { + ERROR ("format_kairosdb: Unknown data source type: %i", + ds->ds[ds_idx].type); + sfree (rates); + return (-1); + } + BUFFER_ADD ("]]"); + +#undef BUFFER_ADD + + DEBUG ("format_kairosdb: values_to_kairosdb: buffer = %s;", buffer); + sfree(rates); + return (0); +} /* }}} int values_to_kairosdb */ + +static int value_list_to_kairosdb (char *buffer, size_t buffer_size, /* {{{ */ + const data_set_t *ds, const value_list_t *vl, int store_rates) +{ + char temp[512]; + size_t offset = 0; + int status; + int i=0; + + memset (buffer, 0, buffer_size); + +#define BUFFER_ADD(...) do { \ + status = ssnprintf (buffer + offset, buffer_size - offset, \ + __VA_ARGS__); \ + if (status < 1) \ + return (-1); \ + else if (((size_t) status) >= (buffer_size - offset)) \ + return (-ENOMEM); \ + else \ + offset += ((size_t) status); \ +} while (0) + +#define BUFFER_ADD_KEYVAL(key, value) do { \ + status = kairosdb_escape_string (temp, sizeof (temp), (value)); \ + if (status != 0) \ + return (status); \ + BUFFER_ADD (",\"%s\": %s", (key), temp); \ +} while (0) + + for (i = 0; i < ds->ds_num; i++) + { + /* All value lists have a leading comma. The first one will be replaced with + * a square bracket in `format_kairosdb_finalize'. */ + BUFFER_ADD (",{"); + + BUFFER_ADD ("\"name\":\"collectd"); + + BUFFER_ADD (".%s", vl->plugin); + + status = values_to_kairosdb (temp, sizeof (temp), ds, vl, store_rates, i); + if (status != 0) + return (status); + + BUFFER_ADD ("\", \"datapoints\": %s", temp); + + /* + * Now adds meta data to metric as tags + */ + + memset (temp, 0, sizeof(temp)); + + BUFFER_ADD (", \"tags\":\{"); + + BUFFER_ADD ("\"host\": \"%s\"", vl->host); + if (strlen(vl->plugin_instance)) + BUFFER_ADD_KEYVAL ("plugin_instance", vl->plugin_instance); + BUFFER_ADD_KEYVAL ("type", vl->type); + if (strlen(vl->type_instance)) + BUFFER_ADD_KEYVAL ("type_instance", vl->type_instance); + if (ds->ds_num != 1) + BUFFER_ADD_KEYVAL ("ds", ds->ds[i].name); + BUFFER_ADD ("}}"); + } /* for ds->ds_num */ + +#undef BUFFER_ADD_KEYVAL +#undef BUFFER_ADD + + DEBUG ("format_kairosdb: value_list_to_kairosdb: buffer = %s;", buffer); + + return (0); +} /* }}} int value_list_to_kairosdb */ + +static int format_kairosdb_value_list_nocheck (char *buffer, /* {{{ */ + size_t *ret_buffer_fill, size_t *ret_buffer_free, + const data_set_t *ds, const value_list_t *vl, + int store_rates, size_t temp_size) +{ + char temp[temp_size]; + int status; + + status = value_list_to_kairosdb (temp, sizeof (temp), ds, vl, store_rates); + if (status != 0) + return (status); + temp_size = strlen (temp); + + memcpy (buffer + (*ret_buffer_fill), temp, temp_size + 1); + (*ret_buffer_fill) += temp_size; + (*ret_buffer_free) -= temp_size; + + return (0); +} /* }}} int format_kairosdb_value_list_nocheck */ + +int format_kairosdb_initialize (char *buffer, /* {{{ */ + size_t *ret_buffer_fill, size_t *ret_buffer_free) +{ + size_t buffer_fill; + size_t buffer_free; + + if ((buffer == NULL) || (ret_buffer_fill == NULL) || (ret_buffer_free == NULL)) + return (-EINVAL); + + buffer_fill = *ret_buffer_fill; + buffer_free = *ret_buffer_free; + + buffer_free = buffer_fill + buffer_free; + buffer_fill = 0; + + if (buffer_free < 3) + return (-ENOMEM); + + memset (buffer, 0, buffer_free); + *ret_buffer_fill = buffer_fill; + *ret_buffer_free = buffer_free; + + return (0); +} /* }}} int format_kairosdb_initialize */ + +int format_kairosdb_finalize (char *buffer, /* {{{ */ + size_t *ret_buffer_fill, size_t *ret_buffer_free) +{ + size_t pos; + + if ((buffer == NULL) || (ret_buffer_fill == NULL) || (ret_buffer_free == NULL)) + return (-EINVAL); + + if (*ret_buffer_free < 2) + return (-ENOMEM); + + /* Replace the leading comma added in `value_list_to_kairosdb' with a square + * bracket. */ + if (buffer[0] != ',') + return (-EINVAL); + buffer[0] = '['; + + pos = *ret_buffer_fill; + buffer[pos] = ']'; + buffer[pos+1] = 0; + + (*ret_buffer_fill)++; + (*ret_buffer_free)--; + + return (0); +} /* }}} int format_kairosdb_finalize */ + +int format_kairosdb_value_list (char *buffer, /* {{{ */ + size_t *ret_buffer_fill, size_t *ret_buffer_free, + const data_set_t *ds, const value_list_t *vl, int store_rates) +{ + if ((buffer == NULL) + || (ret_buffer_fill == NULL) || (ret_buffer_free == NULL) + || (ds == NULL) || (vl == NULL)) + return (-EINVAL); + + if (*ret_buffer_free < 3) + return (-ENOMEM); + + return (format_kairosdb_value_list_nocheck (buffer, + ret_buffer_fill, ret_buffer_free, ds, vl, + store_rates, (*ret_buffer_free) - 2)); +} /* }}} int format_kairosdb_value_list */ + +/* vim: set sw=2 sts=2 et fdm=marker : */ diff --git a/src/utils_format_kairosdb.h b/src/utils_format_kairosdb.h new file mode 100644 index 00000000..23cc862e --- /dev/null +++ b/src/utils_format_kairosdb.h @@ -0,0 +1,45 @@ +/** + * collectd - src/utils_format_kairosdb.h + * Copyright (C) 2016 Aurelien Rougemont + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * Authors: + * Aurelien beorn Rougemont + **/ + +#ifndef UTILS_FORMAT_KAIROSDB_H +#define UTILS_FORMAT_KAIROSDB_H 1 + +#include "collectd.h" +#include "plugin.h" + +#ifndef JSON_GAUGE_FORMAT +# define JSON_GAUGE_FORMAT GAUGE_FORMAT +#endif + +int format_kairosdb_initialize (char *buffer, + size_t *ret_buffer_fill, size_t *ret_buffer_free); +int format_kairosdb_value_list (char *buffer, + size_t *ret_buffer_fill, size_t *ret_buffer_free, + const data_set_t *ds, const value_list_t *vl, int store_rates); +int format_kairosdb_finalize (char *buffer, + size_t *ret_buffer_fill, size_t *ret_buffer_free); + +#endif /* UTILS_FORMAT_KAIROSDB_H */ diff --git a/src/write_http.c b/src/write_http.c index aea79352..ac6ef2f5 100644 --- a/src/write_http.c +++ b/src/write_http.c @@ -27,6 +27,7 @@ #include "plugin.h" #include "common.h" #include "utils_format_json.h" +#include "utils_format_kairosdb.h" #include @@ -59,8 +60,9 @@ struct wh_callback_s time_t low_speed_time; int timeout; -#define WH_FORMAT_COMMAND 0 -#define WH_FORMAT_JSON 1 +#define WH_FORMAT_COMMAND 0 +#define WH_FORMAT_JSON 1 +#define WH_FORMAT_KAIROSDB 2 int format; CURL *curl; @@ -97,7 +99,7 @@ static void wh_reset_buffer (wh_callback_t *cb) /* {{{ */ cb->send_buffer_fill = 0; cb->send_buffer_init_time = cdtime (); - if (cb->format == WH_FORMAT_JSON) + if (cb->format == WH_FORMAT_JSON || cb->format == WH_FORMAT_KAIROSDB) { format_json_initialize (cb->send_buffer, &cb->send_buffer_fill, @@ -152,7 +154,7 @@ static int wh_callback_init (wh_callback_t *cb) /* {{{ */ curl_easy_setopt (cb->curl, CURLOPT_USERAGENT, COLLECTD_USERAGENT); cb->headers = curl_slist_append (cb->headers, "Accept: */*"); - if (cb->format == WH_FORMAT_JSON) + if (cb->format == WH_FORMAT_JSON || cb->format == WH_FORMAT_KAIROSDB) cb->headers = curl_slist_append (cb->headers, "Content-Type: application/json"); else cb->headers = curl_slist_append (cb->headers, "Content-Type: text/plain"); @@ -244,7 +246,7 @@ static int wh_flush_nolock (cdtime_t timeout, wh_callback_t *cb) /* {{{ */ status = wh_send_buffer (cb); wh_reset_buffer (cb); } - else if (cb->format == WH_FORMAT_JSON) + else if (cb->format == WH_FORMAT_JSON || cb->format == WH_FORMAT_KAIROSDB) { if (cb->send_buffer_fill <= 2) { @@ -487,6 +489,60 @@ static int wh_write_json (const data_set_t *ds, const value_list_t *vl, /* {{{ * return (0); } /* }}} int wh_write_json */ +static int wh_write_kairosdb (const data_set_t *ds, const value_list_t *vl, /* {{{ */ + wh_callback_t *cb) +{ + int status; + + pthread_mutex_lock (&cb->send_lock); + + if (cb->curl == NULL) + { + status = wh_callback_init (cb); + if (status != 0) + { + ERROR ("write_http plugin: wh_callback_init failed."); + pthread_mutex_unlock (&cb->send_lock); + return (-1); + } + } + + status = format_kairosdb_value_list (cb->send_buffer, + &cb->send_buffer_fill, + &cb->send_buffer_free, + ds, vl, cb->store_rates); + if (status == -ENOMEM) + { + status = wh_flush_nolock (/* timeout = */ 0, cb); + if (status != 0) + { + wh_reset_buffer (cb); + pthread_mutex_unlock (&cb->send_lock); + return (status); + } + + status = format_kairosdb_value_list (cb->send_buffer, + &cb->send_buffer_fill, + &cb->send_buffer_free, + ds, vl, cb->store_rates); + } + if (status != 0) + { + pthread_mutex_unlock (&cb->send_lock); + return (status); + } + + DEBUG ("write_http plugin: <%s> buffer %zu/%zu (%g%%)", + cb->location, + cb->send_buffer_fill, cb->send_buffer_size, + 100.0 * ((double) cb->send_buffer_fill) / ((double) cb->send_buffer_size)); + + /* Check if we have enough space for this command. */ + pthread_mutex_unlock (&cb->send_lock); + + return (0); +} /* }}} int wh_write_kairosdb */ + static int wh_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ user_data_t *user_data) { @@ -498,11 +554,17 @@ static int wh_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ cb = user_data->data; - if (cb->format == WH_FORMAT_JSON) + switch(cb->format) { + case WH_FORMAT_JSON: status = wh_write_json (ds, vl, cb); - else + break; + case WH_FORMAT_KAIROSDB: + status = wh_write_kairosdb (ds, vl, cb); + break; + default: status = wh_write_command (ds, vl, cb); - + break; + } return (status); } /* }}} int wh_write */ @@ -524,6 +586,8 @@ static int config_set_format (wh_callback_t *cb, /* {{{ */ cb->format = WH_FORMAT_COMMAND; else if (strcasecmp ("JSON", string) == 0) cb->format = WH_FORMAT_JSON; + else if (strcasecmp ("KAIROSDB", string) == 0) + cb->format = WH_FORMAT_KAIROSDB; else { ERROR ("write_http plugin: Invalid format string: %s", -- 2.30.2