From 2320afb2d74ee94e24442c669a19dae3f8fa9858 Mon Sep 17 00:00:00 2001 From: Thomas Meson Date: Sat, 11 Aug 2012 00:11:54 +0200 Subject: [PATCH] amqp plugin: add support for Graphite output This commit implements "Graphite format" for AMQP Plugin. The AMQP plugin will be able to directly output a valid Graphite metric format ( \n). This is very useful when the Graphite server is directly reading from an AMQP broker. You can then avoid having a proxy somewhere doing the conversion between PUTVAL or JSON metrics into Graphite format. Signed-off-by: Florian Forster --- src/Makefile.am | 3 +- src/amqp.c | 46 +++++++- src/collectd.conf.pod | 25 ++++ src/utils_format_graphite.c | 225 ++++++++++++++++++++++++++++++++++++ src/utils_format_graphite.h | 33 ++++++ 5 files changed, 329 insertions(+), 3 deletions(-) create mode 100644 src/utils_format_graphite.c create mode 100644 src/utils_format_graphite.h diff --git a/src/Makefile.am b/src/Makefile.am index f106fa1a..f8c1f798 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -124,7 +124,8 @@ if BUILD_PLUGIN_AMQP pkglib_LTLIBRARIES += amqp.la amqp_la_SOURCES = amqp.c \ utils_cmd_putval.c utils_cmd_putval.h \ - utils_format_json.c utils_format_json.h + utils_format_json.c utils_format_json.h \ + utils_format_graphite.c utils_format_graphite.h amqp_la_LDFLAGS = -module -avoid-version $(BUILD_WITH_LIBRABBITMQ_LDFLAGS) amqp_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBRABBITMQ_CPPFLAGS) amqp_la_LIBADD = $(BUILD_WITH_LIBRABBITMQ_LIBS) diff --git a/src/amqp.c b/src/amqp.c index 55d2a2ce..c9929dc0 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -31,6 +31,7 @@ #include "plugin.h" #include "utils_cmd_putval.h" #include "utils_format_json.h" +#include "utils_format_graphite.h" #include @@ -42,8 +43,9 @@ #define CAMQP_DM_VOLATILE 1 #define CAMQP_DM_PERSISTENT 2 -#define CAMQP_FORMAT_COMMAND 1 -#define CAMQP_FORMAT_JSON 2 +#define CAMQP_FORMAT_COMMAND 1 +#define CAMQP_FORMAT_JSON 2 +#define CAMQP_FORMAT_GRAPHITE 3 #define CAMQP_CHANNEL 1 @@ -68,6 +70,10 @@ struct camqp_config_s uint8_t delivery_mode; _Bool store_rates; int format; + /* publish & graphite format only */ + char *prefix; + char *postfix; + char escape_char; /* subscribe only */ char *exchange_type; @@ -129,6 +135,9 @@ static void camqp_config_free (void *ptr) /* {{{ */ sfree (conf->exchange_type); sfree (conf->queue); sfree (conf->routing_key); + sfree (conf->prefix); + sfree (conf->postfix); + sfree (conf); } /* }}} void camqp_config_free */ @@ -698,6 +707,8 @@ static int camqp_write_locked (camqp_config_t *conf, /* {{{ */ props.content_type = amqp_cstring_bytes("text/collectd"); else if (conf->format == CAMQP_FORMAT_JSON) props.content_type = amqp_cstring_bytes("application/json"); + else if (conf->format == CAMQP_FORMAT_GRAPHITE) + props.content_type = amqp_cstring_bytes("text/graphite"); else assert (23 == 42); props.delivery_mode = conf->delivery_mode; @@ -776,6 +787,17 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates); format_json_finalize (buffer, &bfill, &bfree); } + else if (conf->format == CAMQP_FORMAT_GRAPHITE) + { + status = format_graphite (buffer, sizeof (buffer), ds, vl, + conf->prefix, conf->postfix, conf->escape_char); + if (status != 0) + { + ERROR ("amqp plugin: format_graphite failed with status %i.", + status); + return (status); + } + } else { ERROR ("amqp plugin: Invalid format (%i).", conf->format); @@ -808,6 +830,8 @@ static int camqp_config_set_format (oconfig_item_t *ci, /* {{{ */ conf->format = CAMQP_FORMAT_COMMAND; else if (strcasecmp ("JSON", string) == 0) conf->format = CAMQP_FORMAT_JSON; + else if (strcasecmp ("Graphite", string) == 0) + conf->format = CAMQP_FORMAT_GRAPHITE; else { WARNING ("amqp plugin: Invalid format string: %s", @@ -848,6 +872,10 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ /* publish only */ conf->delivery_mode = CAMQP_DM_VOLATILE; conf->store_rates = 0; + /* publish & graphite only */ + conf->prefix = NULL; + conf->postfix = NULL; + conf->escape_char = '_'; /* subscribe only */ conf->exchange_type = NULL; conf->queue = NULL; @@ -905,6 +933,20 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ status = cf_util_get_boolean (child, &conf->store_rates); else if ((strcasecmp ("Format", child->key) == 0) && publish) status = camqp_config_set_format (child, conf); + else if ((strcasecmp ("GraphitePrefix", child->key) == 0) && publish) + status = cf_util_get_string (child, &conf->prefix); + else if ((strcasecmp ("GraphitePostfix", child->key) == 0) && publish) + status = cf_util_get_string (child, &conf->postfix); + else if ((strcasecmp ("GraphiteEscapeChar", child->key) == 0) && publish) + { + char *tmp_buff = NULL; + status = cf_util_get_string (child, &tmp_buff); + if (strlen (tmp_buff) > 1) + WARNING ("amqp plugin: The option \"GraphiteEscapeChar\" handles " + "only one character. Others will be ignored."); + conf->escape_char = tmp_buff[0]; + sfree (tmp_buff); + } else WARNING ("amqp plugin: Ignoring unknown " "configuration option \"%s\".", child->key); diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index e275112f..5eb3af47 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -210,6 +210,8 @@ possibly filtering or messages. # Persistent false # Format "command" # StoreRates false + # GraphitePrefix "collectd." + # GraphiteEscapeChar "_" # Receive values from an AMQP broker @@ -310,6 +312,10 @@ If set to B, the values are encoded in the I, an easy and straight forward exchange format. The C header field will be set to C. +If set to B, values are encoded in the I format, which is +" \n". The C header field will be set to +C. + A subscribing client I use the C header field to determine how to decode the values. Currently, the I itself can only decode the B format. @@ -324,6 +330,25 @@ using the internal value cache. Please note that currently this option is only used if the B option has been set to B. +=item B (Publish and B=I only) + +A prefix can be added in the metric name when outputting in the I format. +It's added before the I name. +Metric name will be "" + +=item B (Publish and B=I only) + +A postfix can be added in the metric name when outputting in the I format. +It's added after the I name. +Metric name will be "" + +=item B (Publish and B=I only) + +Specify a character to replace dots (.) in the host part of the metric name. +In I metric name, dots are used as separators between different +metric parts (host, plugin, type). +Default is "_" (I). + =back =head2 Plugin C diff --git a/src/utils_format_graphite.c b/src/utils_format_graphite.c new file mode 100644 index 00000000..b0ebc1eb --- /dev/null +++ b/src/utils_format_graphite.c @@ -0,0 +1,225 @@ +/** + * collectd - src/utils_format_graphite.c + * Copyright (C) 2012 Thomas Meson + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Author: + * Thomas Meson + **/ + +#include "collectd.h" +#include "plugin.h" +#include "common.h" + +#include "utils_cache.h" +#include "utils_format_json.h" +#include "utils_parse_option.h" + +/* Utils functions to format data sets in graphite format. + * Largely taken from write_graphite.c as it remains the same formatting */ + +static int gr_format_values (char *ret, size_t ret_len, + int ds_num, const data_set_t *ds, const value_list_t *vl) +{ + size_t offset = 0; + int status; + + assert (0 == strcmp (ds->type, vl->type)); + + memset (ret, 0, ret_len); + +#define BUFFER_ADD(...) do { \ + status = ssnprintf (ret + offset, ret_len - offset, \ + __VA_ARGS__); \ + if (status < 1) \ + { \ + return (-1); \ + } \ + else if (((size_t) status) >= (ret_len - offset)) \ + { \ + return (-1); \ + } \ + else \ + offset += ((size_t) status); \ +} while (0) + + if (ds->ds[ds_num].type == DS_TYPE_GAUGE) + BUFFER_ADD ("%f", vl->values[ds_num].gauge); + else if (ds->ds[ds_num].type == DS_TYPE_COUNTER) + BUFFER_ADD ("%llu", vl->values[ds_num].counter); + else if (ds->ds[ds_num].type == DS_TYPE_DERIVE) + BUFFER_ADD ("%"PRIi64, vl->values[ds_num].derive); + else if (ds->ds[ds_num].type == DS_TYPE_ABSOLUTE) + BUFFER_ADD ("%"PRIu64, vl->values[ds_num].absolute); + else + { + ERROR ("gr_format_values plugin: Unknown data source type: %i", + ds->ds[ds_num].type); + return (-1); + } + +#undef BUFFER_ADD + + return (0); +} + +static void copy_escape_part (char *dst, const char *src, size_t dst_len, + char escape_char) +{ + size_t i; + + memset (dst, 0, dst_len); + + if (src == NULL) + return; + + for (i = 0; i < dst_len; i++) + { + if (src[i] == 0) + { + dst[i] = 0; + break; + } + + if ((src[i] == '.') + || isspace ((int) src[i]) + || iscntrl ((int) src[i])) + dst[i] = escape_char; + else + dst[i] = src[i]; + } +} + +static int gr_format_name (char *ret, int ret_len, + const value_list_t *vl, + const char *ds_name, + char *prefix, + char *postfix, + char escape_char) +{ + char n_host[DATA_MAX_NAME_LEN]; + char n_plugin[DATA_MAX_NAME_LEN]; + char n_plugin_instance[DATA_MAX_NAME_LEN]; + char n_type[DATA_MAX_NAME_LEN]; + char n_type_instance[DATA_MAX_NAME_LEN]; + + char tmp_plugin[2 * DATA_MAX_NAME_LEN + 1]; + char tmp_type[2 * DATA_MAX_NAME_LEN + 1]; + + if (prefix == NULL) + prefix = ""; + + if (postfix == NULL) + postfix = ""; + + copy_escape_part (n_host, vl->host, + sizeof (n_host), escape_char); + copy_escape_part (n_plugin, vl->plugin, + sizeof (n_plugin), escape_char); + copy_escape_part (n_plugin_instance, vl->plugin_instance, + sizeof (n_plugin_instance), escape_char); + copy_escape_part (n_type, vl->type, + sizeof (n_type), escape_char); + copy_escape_part (n_type_instance, vl->type_instance, + sizeof (n_type_instance), escape_char); + + if (n_plugin_instance[0] != '\0') + ssnprintf (tmp_plugin, sizeof (tmp_plugin), "%s%c%s", + n_plugin, + '-', + n_plugin_instance); + else + sstrncpy (tmp_plugin, n_plugin, sizeof (tmp_plugin)); + + if (n_type_instance[0] != '\0') + ssnprintf (tmp_type, sizeof (tmp_type), "%s%c%s", + n_type, + '-', + n_type_instance); + else + sstrncpy (tmp_type, n_type, sizeof (tmp_type)); + + if (ds_name != NULL) + ssnprintf (ret, ret_len, "%s%s%s.%s.%s.%s", + prefix, n_host, postfix, tmp_plugin, tmp_type, ds_name); + else + ssnprintf (ret, ret_len, "%s%s%s.%s.%s", + prefix, n_host, postfix, tmp_plugin, tmp_type); + + return (0); +} + +int format_graphite (char *buffer, size_t buffer_size, + const data_set_t *ds, const value_list_t *vl, char *prefix, + char *postfix, char escape_char) +{ + int status = 0; + int i; + int buffer_pos = 0; + + for (i = 0; i < ds->ds_num; i++) + { + const char *ds_name = NULL; + char key[10*DATA_MAX_NAME_LEN]; + char values[512]; + size_t message_len; + char message[1024]; + + ds_name = ds->ds[i].name; + + /* Copy the identifier to `key' and escape it. */ + status = gr_format_name (key, sizeof (key), vl, ds_name, + prefix, postfix, escape_char); + if (status != 0) + { + ERROR ("amqp plugin: error with gr_format_name"); + return (status); + } + + escape_string (key, sizeof (key)); + /* Convert the values to an ASCII representation and put that into + * `values'. */ + status = gr_format_values (values, sizeof (values), i, ds, vl); + if (status != 0) + { + ERROR ("format_graphite: error with gr_format_values"); + return (status); + } + + /* Compute the graphite command */ + message_len = (size_t) ssnprintf (message, sizeof (message), + "%s %s %u\r\n", + key, + values, + (unsigned int) CDTIME_T_TO_TIME_T (vl->time)); + if (message_len >= sizeof (message)) { + ERROR ("format_graphite: message buffer too small: " + "Need %zu bytes.", message_len + 1); + return (-ENOMEM); + } + + /* Append it in case we got multiple data set */ + if ((buffer_pos + message_len) >= buffer_size) + { + ERROR ("format_graphite: target buffer too small"); + return (-ENOMEM); + } + memcpy((void *) (buffer + buffer_pos), message, message_len); + buffer_pos += message_len; + } + return (status); +} /* int format_graphite */ + +/* vim: set sw=2 sts=2 et fdm=marker : */ diff --git a/src/utils_format_graphite.h b/src/utils_format_graphite.h new file mode 100644 index 00000000..a3c4d85c --- /dev/null +++ b/src/utils_format_graphite.h @@ -0,0 +1,33 @@ +/** + * collectd - src/utils_format_graphite.h + * Copyright (C) 2012 Thomas Meson + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Author: + * Thomas Meson + **/ + +#ifndef UTILS_FORMAT_GRAPHITE_H +#define UTILS_FORMAT_GRAPHITE_H 1 + +#include "collectd.h" +#include "plugin.h" + +int format_graphite (char *buffer, + size_t buffer_size, const data_set_t *ds, + const value_list_t *vl, const char *prefix, + const char *postfix, const char escape_char); + +#endif /* UTILS_FORMAT_GRAPHITE_H */ -- 2.30.2