From 03ed4b711ea23607b255e8a187070133e63184cb Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Sat, 23 Jun 2012 20:50:27 +0200 Subject: [PATCH] aggregation plugin: First working prototype. --- configure.in | 2 + src/Makefile.am | 10 ++ src/aggregation.c | 419 +++++++++++++++++++++++++++++++++++++++++++ src/collectd.conf.in | 18 ++ 4 files changed, 449 insertions(+) create mode 100644 src/aggregation.c diff --git a/configure.in b/configure.in index 1cfc2fca..cc3d4c4e 100644 --- a/configure.in +++ b/configure.in @@ -4774,6 +4774,7 @@ AC_ARG_ENABLE([all-plugins], m4_divert_once([HELP_ENABLE], []) +AC_PLUGIN([aggregation], [yes], [Aggregation plugin]) AC_PLUGIN([amqp], [$with_librabbitmq], [AMQP output plugin]) AC_PLUGIN([apache], [$with_libcurl], [Apache httpd statistics]) AC_PLUGIN([apcups], [yes], [Statistics of UPSes by APC]) @@ -5106,6 +5107,7 @@ Configuration: perl . . . . . . . . $with_perl_bindings Modules: + aggregation . . . . . $enable_aggregation amqp . . . . . . . $enable_amqp apache . . . . . . . $enable_apache apcups . . . . . . . $enable_apcups diff --git a/src/Makefile.am b/src/Makefile.am index e110d88b..9d528c11 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -120,6 +120,16 @@ pkglib_LTLIBRARIES = BUILT_SOURCES = CLEANFILES = +if BUILD_PLUGIN_AGGREGATION +pkglib_LTLIBRARIES += aggregation.la +aggregation_la_SOURCES = aggregation.c \ + utils_vl_lookup.c utils_vl_lookup.h +aggregation_la_LDFLAGS = -module -avoid-version +aggregation_la_LIBADD = +collectd_LDADD += "-dlopen" aggregation.la +collectd_DEPENDENCIES += aggregation.la +endif + if BUILD_PLUGIN_AMQP pkglib_LTLIBRARIES += amqp.la amqp_la_SOURCES = amqp.c \ diff --git a/src/aggregation.c b/src/aggregation.c new file mode 100644 index 00000000..2e13766e --- /dev/null +++ b/src/aggregation.c @@ -0,0 +1,419 @@ +/** + * collectd - src/aggregation.c + * Copyright (C) 2012 Florian Forster + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * Authors: + * Florian Forster + **/ + +#include "collectd.h" +#include "plugin.h" +#include "common.h" +#include "configfile.h" +#include "meta_data.h" +#include "utils_cache.h" /* for uc_get_rate() */ +#include "utils_vl_lookup.h" + +#include + +struct aggregation_s /* {{{ */ +{ + identifier_t ident; + + _Bool calc_num; + _Bool calc_sum; + _Bool calc_average; + _Bool calc_min; + _Bool calc_max; + _Bool calc_stddev; +}; /* }}} */ +typedef struct aggregation_s aggregation_t; + +struct agg_instance_s; +typedef struct agg_instance_s agg_instance_t; +struct agg_instance_s /* {{{ */ +{ + identifier_t ident; + + int ds_type; + + derive_t num; + gauge_t sum; + gauge_t squares_sum; + + gauge_t min; + gauge_t max; + + rate_to_value_state_t *state_num; + rate_to_value_state_t *state_sum; + rate_to_value_state_t *state_average; + rate_to_value_state_t *state_min; + rate_to_value_state_t *state_max; + rate_to_value_state_t *state_stddev; + + agg_instance_t *next; +}; /* }}} */ + +static lookup_t *lookup = NULL; + +static pthread_mutex_t agg_instance_list_lock = PTHREAD_MUTEX_INITIALIZER; +static agg_instance_t *agg_instance_list_head = NULL; + +static void agg_destroy (aggregation_t *agg) /* {{{ */ +{ + sfree (agg); +} /* }}} void agg_destroy */ + +/* Frees all dynamically allocated memory within the instance. */ +static void agg_instance_destroy (agg_instance_t *inst) /* {{{ */ +{ + if (inst == NULL) + return; + + /* Remove this instance from the global list of instances. */ + pthread_mutex_lock (&agg_instance_list_lock); + if (agg_instance_list_head == inst) + agg_instance_list_head = inst->next; + else if (agg_instance_list_head != NULL) + { + agg_instance_t *prev = agg_instance_list_head; + while ((prev != NULL) && (prev->next != inst)) + prev = prev->next; + if (prev != NULL) + prev->next = inst->next; + } + pthread_mutex_unlock (&agg_instance_list_lock); + + sfree (inst->state_num); + sfree (inst->state_sum); + sfree (inst->state_average); + sfree (inst->state_min); + sfree (inst->state_max); + sfree (inst->state_stddev); + + memset (inst, 0, sizeof (*inst)); + inst->ds_type = -1; + inst->min = NAN; + inst->max = NAN; +} /* }}} void agg_instance_destroy */ + +/* Create a new aggregation instance. */ +static agg_instance_t *agg_instance_create (value_list_t const *vl, /* {{{ */ + aggregation_t *agg) +{ + agg_instance_t *inst; + + DEBUG ("aggregation plugin: Creating new instance."); + + inst = malloc (sizeof (*inst)); + if (inst == NULL) + { + ERROR ("aggregation plugin: malloc() failed."); + return (NULL); + } + memset (inst, 0, sizeof (*inst)); + +#define COPY_FIELD(fld) do { \ + sstrncpy (inst->ident.fld, \ + LU_IS_ANY (agg->ident.fld) ? vl->fld : agg->ident.fld, \ + sizeof (inst->ident.fld)); \ +} while (0) + + COPY_FIELD (host); + COPY_FIELD (plugin); + COPY_FIELD (plugin_instance); + COPY_FIELD (type); + COPY_FIELD (type_instance); + +#undef COPY_FIELD + + inst->ds_type = -1; + inst->min = NAN; + inst->max = NAN; + +#define INIT_STATE(field) do { \ + inst->state_ ## field = NULL; \ + if (agg->calc_ ## field) { \ + inst->state_ ## field = malloc (sizeof (*inst->state_ ## field)); \ + if (inst->state_ ## field == NULL) { \ + agg_instance_destroy (inst); \ + ERROR ("aggregation plugin: malloc() failed."); \ + return (NULL); \ + } \ + memset (inst->state_ ## field, 0, sizeof (*inst->state_ ## field)); \ + } \ +} while (0) + + INIT_STATE (num); + INIT_STATE (sum); + INIT_STATE (average); + INIT_STATE (min); + INIT_STATE (max); + INIT_STATE (stddev); + +#undef INIT_STATE + + pthread_mutex_lock (&agg_instance_list_lock); + inst->next = agg_instance_list_head; + agg_instance_list_head = inst; + pthread_mutex_unlock (&agg_instance_list_lock); + + return (inst); +} /* }}} agg_instance_t *agg_instance_create */ + +/* Update the num, sum, min, max, ... fields of the aggregation instance, if + * the rate of the value list is available. Value lists with more than one data + * source are not supported and will return an error. Returns zero on success + * and non-zero otherwise. */ +static int agg_instance_update (agg_instance_t *inst, /* {{{ */ + data_set_t const *ds, value_list_t const *vl) +{ + gauge_t *rate; + + if (ds->ds_num != 1) + return (-1); + + rate = uc_get_rate (ds, vl); + if (rate == NULL) + { + ERROR ("aggregation plugin: uc_get_rate() failed."); + return (-1); + } + + if (isnan (rate[0])) + { + sfree (rate); + return (0); + } + + inst->num++; + inst->sum += rate[0]; + inst->squares_sum += (rate[0] * rate[0]); + + if (isnan (inst->min) || (inst->min > rate[0])) + inst->min = rate[0]; + if (isnan (inst->max) || (inst->max < rate[0])) + inst->max = rate[0]; + + sfree (rate); + return (0); +} /* }}} int agg_instance_update */ + +/* lookup_class_callback_t for utils_vl_lookup */ +static void *agg_lookup_class_callback ( /* {{{ */ + __attribute__((unused)) data_set_t const *ds, + value_list_t const *vl, void *user_class) +{ + return (agg_instance_create (vl, (aggregation_t *) user_class)); +} /* }}} void *agg_class_callback */ + +/* lookup_obj_callback_t for utils_vl_lookup */ +static int agg_lookup_obj_callback (data_set_t const *ds, /* {{{ */ + value_list_t const *vl, + __attribute__((unused)) void *user_class, + void *user_obj) +{ + return (agg_instance_update ((agg_instance_t *) user_obj, ds, vl)); +} /* }}} int agg_lookup_obj_callback */ + +/* lookup_free_class_callback_t for utils_vl_lookup */ +static void agg_lookup_free_class_callback (void *user_class) /* {{{ */ +{ + agg_destroy ((aggregation_t *) user_class); +} /* }}} void agg_lookup_free_class_callback */ + +/* lookup_free_obj_callback_t for utils_vl_lookup */ +static void agg_lookup_free_obj_callback (void *user_obj) /* {{{ */ +{ + agg_instance_destroy ((agg_instance_t *) user_obj); +} /* }}} void agg_lookup_free_obj_callback */ + +/* + * + * + * Host "/any/" + * Plugin "cpu" + * PluginInstance "/all/" + * Type "cpu" + * TypeInstance "/any/" + * + * CalculateNum true + * CalculateSum true + * CalculateAverage true + * CalculateMinimum true + * CalculateMaximum true + * CalculateStddev true + * + * + */ +static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */ +{ + aggregation_t *agg; + int status; + int i; + + agg = malloc (sizeof (*agg)); + if (agg == NULL) + { + ERROR ("aggregation plugin: malloc failed."); + return (-1); + } + memset (agg, 0, sizeof (*agg)); + + for (i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp ("Host", child->key) == 0) + cf_util_get_string_buffer (child, agg->ident.host, + sizeof (agg->ident.host)); + else if (strcasecmp ("Plugin", child->key) == 0) + cf_util_get_string_buffer (child, agg->ident.plugin, + sizeof (agg->ident.plugin)); + else if (strcasecmp ("PluginInstance", child->key) == 0) + cf_util_get_string_buffer (child, agg->ident.plugin_instance, + sizeof (agg->ident.plugin_instance)); + else if (strcasecmp ("Type", child->key) == 0) + cf_util_get_string_buffer (child, agg->ident.type, + sizeof (agg->ident.type)); + else if (strcasecmp ("TypeInstance", child->key) == 0) + cf_util_get_string_buffer (child, agg->ident.type_instance, + sizeof (agg->ident.type_instance)); + else if (strcasecmp ("CalculateNum", child->key) == 0) + cf_util_get_boolean (child, &agg->calc_num); + else if (strcasecmp ("CalculateSum", child->key) == 0) + cf_util_get_boolean (child, &agg->calc_sum); + else if (strcasecmp ("CalculateAverage", child->key) == 0) + cf_util_get_boolean (child, &agg->calc_average); + else if (strcasecmp ("CalculateMinimum", child->key) == 0) + cf_util_get_boolean (child, &agg->calc_min); + else if (strcasecmp ("CalculateMaximum", child->key) == 0) + cf_util_get_boolean (child, &agg->calc_max); + else if (strcasecmp ("CalculateStddev", child->key) == 0) + cf_util_get_boolean (child, &agg->calc_stddev); + else + WARNING ("aggregation plugin: The \"%s\" key is not allowed inside " + " blocks and will be ignored.", child->key); + } + + /* TODO(octo): Check identifier: + * - At least one wildcard. + * - Type is set. + */ + + status = lookup_add (lookup, &agg->ident, agg); + if (status != 0) + { + ERROR ("aggregation plugin: lookup_add failed with status %i.", status); + sfree (agg); + return (-1); + } + + return (0); +} /* }}} int agg_config_aggregation */ + +static int agg_config (oconfig_item_t *ci) /* {{{ */ +{ + int i; + + if (lookup == NULL) + { + lookup = lookup_create (agg_lookup_class_callback, + agg_lookup_obj_callback, + agg_lookup_free_class_callback, + agg_lookup_free_obj_callback); + if (lookup == NULL) + { + ERROR ("aggregation plugin: lookup_create failed."); + return (-1); + } + } + + for (i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp ("Aggregation", child->key) == 0) + agg_config_aggregation (child); + else + WARNING ("aggregation plugin: The \"%s\" key is not allowed inside " + " blocks and will be ignored.", child->key); + } + + return (0); +} /* }}} int agg_config */ + +static int agg_read (void) /* {{{ */ +{ + agg_instance_t *this; + size_t i = 0; + + pthread_mutex_lock (&agg_instance_list_lock); + for (this = agg_instance_list_head; this != NULL; this = this->next) + { + DEBUG ("aggregation plugin: Handling instance: host = \"%s\", " + "plugin = \"%s\", plugin_instance = \"%s\", " + "type = \"%s\", type_instance = \"%s\"", + this->ident.host, + this->ident.plugin, this->ident.plugin_instance, + this->ident.type, this->ident.type_instance); + i++; + } + pthread_mutex_unlock (&agg_instance_list_lock); + + DEBUG ("aggregation plugin: There are currently %zu instances.", i); + + return (0); +} /* }}} int agg_read */ + +static int agg_write (data_set_t const *ds, value_list_t const *vl, /* {{{ */ + __attribute__((unused)) user_data_t *user_data) +{ + _Bool created_by_aggregation = 0; + int status; + + /* Ignore values that were created by the aggregation plugin to avoid weird + * effects. */ + (void) meta_data_get_boolean (vl->meta, "aggregation:created", + &created_by_aggregation); + if (created_by_aggregation) + return (0); + + if (lookup == NULL) + status = ENOENT; + else + { + status = lookup_search (lookup, ds, vl); + if (status > 0) + status = 0; + } + + return (status); +} /* }}} int agg_write */ + +void module_register (void) +{ + plugin_register_complex_config ("aggregation", agg_config); + plugin_register_read ("aggregation", agg_read); + plugin_register_write ("aggregation", agg_write, /* user_data = */ NULL); +} + +/* vim: set sw=2 sts=2 tw=78 et fdm=marker : */ diff --git a/src/collectd.conf.in b/src/collectd.conf.in index f3ef6759..102e116d 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -52,6 +52,7 @@ # to missing dependencies or because they have been deactivated explicitly. # ############################################################################## +#@BUILD_PLUGIN_AGGREGATION_TRUE@LoadPlugin aggregation #@BUILD_PLUGIN_AMQP_TRUE@LoadPlugin amqp #@BUILD_PLUGIN_APACHE_TRUE@LoadPlugin apache #@BUILD_PLUGIN_APCUPS_TRUE@LoadPlugin apcups @@ -161,6 +162,23 @@ # ription of those options is available in the collectd.conf(5) manual page. # ############################################################################## +# +# +# Host "/any/" +# Plugin "example" +# PluginInstance "/all/" +# Type "gauge" +# TypeInstance "/any/" +# +# CalculateNum false +# CalculateSum false +# CalculateAverage true +# CalculateMinimum false +# CalculateMaximum false +# CalculateStddev false +# +# + # # # Host "localhost" -- 2.30.2