Code

aggregation plugin: First working prototype.
authorFlorian Forster <octo@collectd.org>
Sat, 23 Jun 2012 18:50:27 +0000 (20:50 +0200)
committerFlorian Forster <octo@collectd.org>
Sat, 23 Jun 2012 18:50:27 +0000 (20:50 +0200)
configure.in
src/Makefile.am
src/aggregation.c [new file with mode: 0644]
src/collectd.conf.in

index 1cfc2fcae20b4f2be5a2064c0c1174ab096a5bb2..cc3d4c4ed09ba5a841c1d3c7ef60644b316f28d6 100644 (file)
@@ -4774,6 +4774,7 @@ AC_ARG_ENABLE([all-plugins],
 
 m4_divert_once([HELP_ENABLE], [])
 
+AC_PLUGIN([aggregation], [yes],                [Aggregation plugin])
 AC_PLUGIN([amqp],        [$with_librabbitmq],  [AMQP output plugin])
 AC_PLUGIN([apache],      [$with_libcurl],      [Apache httpd statistics])
 AC_PLUGIN([apcups],      [yes],                [Statistics of UPSes by APC])
@@ -5106,6 +5107,7 @@ Configuration:
     perl  . . . . . . . . $with_perl_bindings
 
   Modules:
+    aggregation . . . . . $enable_aggregation
     amqp    . . . . . . . $enable_amqp
     apache  . . . . . . . $enable_apache
     apcups  . . . . . . . $enable_apcups
index e110d88b290dbb365e4f11e7a505a847c5c21305..9d528c11c2a95acfffbbaf88b9eaba6380efd622 100644 (file)
@@ -120,6 +120,16 @@ pkglib_LTLIBRARIES =
 BUILT_SOURCES = 
 CLEANFILES = 
 
+if BUILD_PLUGIN_AGGREGATION
+pkglib_LTLIBRARIES += aggregation.la
+aggregation_la_SOURCES = aggregation.c \
+                         utils_vl_lookup.c utils_vl_lookup.h
+aggregation_la_LDFLAGS = -module -avoid-version
+aggregation_la_LIBADD =
+collectd_LDADD += "-dlopen" aggregation.la
+collectd_DEPENDENCIES += aggregation.la
+endif
+
 if BUILD_PLUGIN_AMQP
 pkglib_LTLIBRARIES += amqp.la
 amqp_la_SOURCES = amqp.c \
diff --git a/src/aggregation.c b/src/aggregation.c
new file mode 100644 (file)
index 0000000..2e13766
--- /dev/null
@@ -0,0 +1,419 @@
+/**
+ * collectd - src/aggregation.c
+ * Copyright (C) 2012       Florian Forster
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ *   Florian Forster <octo at collectd.org>
+ **/
+
+#include "collectd.h"
+#include "plugin.h"
+#include "common.h"
+#include "configfile.h"
+#include "meta_data.h"
+#include "utils_cache.h" /* for uc_get_rate() */
+#include "utils_vl_lookup.h"
+
+#include <pthread.h>
+
+struct aggregation_s /* {{{ */
+{
+  identifier_t ident;
+
+  _Bool calc_num;
+  _Bool calc_sum;
+  _Bool calc_average;
+  _Bool calc_min;
+  _Bool calc_max;
+  _Bool calc_stddev;
+}; /* }}} */
+typedef struct aggregation_s aggregation_t;
+
+struct agg_instance_s;
+typedef struct agg_instance_s agg_instance_t;
+struct agg_instance_s /* {{{ */
+{
+  identifier_t ident;
+
+  int ds_type;
+
+  derive_t num;
+  gauge_t sum;
+  gauge_t squares_sum;
+
+  gauge_t min;
+  gauge_t max;
+
+  rate_to_value_state_t *state_num;
+  rate_to_value_state_t *state_sum;
+  rate_to_value_state_t *state_average;
+  rate_to_value_state_t *state_min;
+  rate_to_value_state_t *state_max;
+  rate_to_value_state_t *state_stddev;
+
+  agg_instance_t *next;
+}; /* }}} */
+
+static lookup_t *lookup = NULL;
+
+static pthread_mutex_t agg_instance_list_lock = PTHREAD_MUTEX_INITIALIZER;
+static agg_instance_t *agg_instance_list_head = NULL;
+
+static void agg_destroy (aggregation_t *agg) /* {{{ */
+{
+  sfree (agg);
+} /* }}} void agg_destroy */
+
+/* Frees all dynamically allocated memory within the instance. */
+static void agg_instance_destroy (agg_instance_t *inst) /* {{{ */
+{
+  if (inst == NULL)
+    return;
+
+  /* Remove this instance from the global list of instances. */
+  pthread_mutex_lock (&agg_instance_list_lock);
+  if (agg_instance_list_head == inst)
+    agg_instance_list_head = inst->next;
+  else if (agg_instance_list_head != NULL)
+  {
+    agg_instance_t *prev = agg_instance_list_head;
+    while ((prev != NULL) && (prev->next != inst))
+      prev = prev->next;
+    if (prev != NULL)
+      prev->next = inst->next;
+  }
+  pthread_mutex_unlock (&agg_instance_list_lock);
+
+  sfree (inst->state_num);
+  sfree (inst->state_sum);
+  sfree (inst->state_average);
+  sfree (inst->state_min);
+  sfree (inst->state_max);
+  sfree (inst->state_stddev);
+
+  memset (inst, 0, sizeof (*inst));
+  inst->ds_type = -1;
+  inst->min = NAN;
+  inst->max = NAN;
+} /* }}} void agg_instance_destroy */
+
+/* Create a new aggregation instance. */
+static agg_instance_t *agg_instance_create (value_list_t const *vl, /* {{{ */
+    aggregation_t *agg)
+{
+  agg_instance_t *inst;
+
+  DEBUG ("aggregation plugin: Creating new instance.");
+
+  inst = malloc (sizeof (*inst));
+  if (inst == NULL)
+  {
+    ERROR ("aggregation plugin: malloc() failed.");
+    return (NULL);
+  }
+  memset (inst, 0, sizeof (*inst));
+
+#define COPY_FIELD(fld) do { \
+  sstrncpy (inst->ident.fld, \
+      LU_IS_ANY (agg->ident.fld) ? vl->fld : agg->ident.fld, \
+      sizeof (inst->ident.fld)); \
+} while (0)
+
+  COPY_FIELD (host);
+  COPY_FIELD (plugin);
+  COPY_FIELD (plugin_instance);
+  COPY_FIELD (type);
+  COPY_FIELD (type_instance);
+
+#undef COPY_FIELD
+
+  inst->ds_type = -1;
+  inst->min = NAN;
+  inst->max = NAN;
+
+#define INIT_STATE(field) do { \
+  inst->state_ ## field = NULL; \
+  if (agg->calc_ ## field) { \
+    inst->state_ ## field = malloc (sizeof (*inst->state_ ## field)); \
+    if (inst->state_ ## field == NULL) { \
+      agg_instance_destroy (inst); \
+      ERROR ("aggregation plugin: malloc() failed."); \
+      return (NULL); \
+    } \
+    memset (inst->state_ ## field, 0, sizeof (*inst->state_ ## field)); \
+  } \
+} while (0)
+
+  INIT_STATE (num);
+  INIT_STATE (sum);
+  INIT_STATE (average);
+  INIT_STATE (min);
+  INIT_STATE (max);
+  INIT_STATE (stddev);
+
+#undef INIT_STATE
+
+  pthread_mutex_lock (&agg_instance_list_lock);
+  inst->next = agg_instance_list_head;
+  agg_instance_list_head = inst;
+  pthread_mutex_unlock (&agg_instance_list_lock);
+
+  return (inst);
+} /* }}} agg_instance_t *agg_instance_create */
+
+/* Update the num, sum, min, max, ... fields of the aggregation instance, if
+ * the rate of the value list is available. Value lists with more than one data
+ * source are not supported and will return an error. Returns zero on success
+ * and non-zero otherwise. */
+static int agg_instance_update (agg_instance_t *inst, /* {{{ */
+    data_set_t const *ds, value_list_t const *vl)
+{
+  gauge_t *rate;
+
+  if (ds->ds_num != 1)
+    return (-1);
+
+  rate = uc_get_rate (ds, vl);
+  if (rate == NULL)
+  {
+    ERROR ("aggregation plugin: uc_get_rate() failed.");
+    return (-1);
+  }
+
+  if (isnan (rate[0]))
+  {
+    sfree (rate);
+    return (0);
+  }
+
+  inst->num++;
+  inst->sum += rate[0];
+  inst->squares_sum += (rate[0] * rate[0]);
+
+  if (isnan (inst->min) || (inst->min > rate[0]))
+    inst->min = rate[0];
+  if (isnan (inst->max) || (inst->max < rate[0]))
+    inst->max = rate[0];
+
+  sfree (rate);
+  return (0);
+} /* }}} int agg_instance_update */
+
+/* lookup_class_callback_t for utils_vl_lookup */
+static void *agg_lookup_class_callback ( /* {{{ */
+    __attribute__((unused)) data_set_t const *ds,
+    value_list_t const *vl, void *user_class)
+{
+  return (agg_instance_create (vl, (aggregation_t *) user_class));
+} /* }}} void *agg_class_callback */
+
+/* lookup_obj_callback_t for utils_vl_lookup */
+static int agg_lookup_obj_callback (data_set_t const *ds, /* {{{ */
+    value_list_t const *vl,
+    __attribute__((unused)) void *user_class,
+    void *user_obj)
+{
+  return (agg_instance_update ((agg_instance_t *) user_obj, ds, vl));
+} /* }}} int agg_lookup_obj_callback */
+
+/* lookup_free_class_callback_t for utils_vl_lookup */
+static void agg_lookup_free_class_callback (void *user_class) /* {{{ */
+{
+  agg_destroy ((aggregation_t *) user_class);
+} /* }}} void agg_lookup_free_class_callback */
+
+/* lookup_free_obj_callback_t for utils_vl_lookup */
+static void agg_lookup_free_obj_callback (void *user_obj) /* {{{ */
+{
+  agg_instance_destroy ((agg_instance_t *) user_obj);
+} /* }}} void agg_lookup_free_obj_callback */
+
+/*
+ * <Plugin "aggregation">
+ *   <Aggregation>
+ *     Host "/any/"
+ *     Plugin "cpu"
+ *     PluginInstance "/all/"
+ *     Type "cpu"
+ *     TypeInstance "/any/"
+ *
+ *     CalculateNum true
+ *     CalculateSum true
+ *     CalculateAverage true
+ *     CalculateMinimum true
+ *     CalculateMaximum true
+ *     CalculateStddev true
+ *   </Aggregation>
+ * </Plugin>
+ */
+static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
+{
+  aggregation_t *agg;
+  int status;
+  int i;
+
+  agg = malloc (sizeof (*agg));
+  if (agg == NULL)
+  {
+    ERROR ("aggregation plugin: malloc failed.");
+    return (-1);
+  }
+  memset (agg, 0, sizeof (*agg));
+
+  for (i = 0; i < ci->children_num; i++)
+  {
+    oconfig_item_t *child = ci->children + i;
+
+    if (strcasecmp ("Host", child->key) == 0)
+      cf_util_get_string_buffer (child, agg->ident.host,
+          sizeof (agg->ident.host));
+    else if (strcasecmp ("Plugin", child->key) == 0)
+      cf_util_get_string_buffer (child, agg->ident.plugin,
+          sizeof (agg->ident.plugin));
+    else if (strcasecmp ("PluginInstance", child->key) == 0)
+      cf_util_get_string_buffer (child, agg->ident.plugin_instance,
+          sizeof (agg->ident.plugin_instance));
+    else if (strcasecmp ("Type", child->key) == 0)
+      cf_util_get_string_buffer (child, agg->ident.type,
+          sizeof (agg->ident.type));
+    else if (strcasecmp ("TypeInstance", child->key) == 0)
+      cf_util_get_string_buffer (child, agg->ident.type_instance,
+          sizeof (agg->ident.type_instance));
+    else if (strcasecmp ("CalculateNum", child->key) == 0)
+      cf_util_get_boolean (child, &agg->calc_num);
+    else if (strcasecmp ("CalculateSum", child->key) == 0)
+      cf_util_get_boolean (child, &agg->calc_sum);
+    else if (strcasecmp ("CalculateAverage", child->key) == 0)
+      cf_util_get_boolean (child, &agg->calc_average);
+    else if (strcasecmp ("CalculateMinimum", child->key) == 0)
+      cf_util_get_boolean (child, &agg->calc_min);
+    else if (strcasecmp ("CalculateMaximum", child->key) == 0)
+      cf_util_get_boolean (child, &agg->calc_max);
+    else if (strcasecmp ("CalculateStddev", child->key) == 0)
+      cf_util_get_boolean (child, &agg->calc_stddev);
+    else
+      WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
+          "<Aggregation /> blocks and will be ignored.", child->key);
+  }
+
+  /* TODO(octo): Check identifier:
+   * - At least one wildcard.
+   * - Type is set.
+   */
+
+  status = lookup_add (lookup, &agg->ident, agg);
+  if (status != 0)
+  {
+    ERROR ("aggregation plugin: lookup_add failed with status %i.", status);
+    sfree (agg);
+    return (-1);
+  }
+
+  return (0);
+} /* }}} int agg_config_aggregation */
+
+static int agg_config (oconfig_item_t *ci) /* {{{ */
+{
+  int i;
+
+  if (lookup == NULL)
+  {
+    lookup = lookup_create (agg_lookup_class_callback,
+        agg_lookup_obj_callback,
+        agg_lookup_free_class_callback,
+        agg_lookup_free_obj_callback);
+    if (lookup == NULL)
+    {
+      ERROR ("aggregation plugin: lookup_create failed.");
+      return (-1);
+    }
+  }
+
+  for (i = 0; i < ci->children_num; i++)
+  {
+    oconfig_item_t *child = ci->children + i;
+
+    if (strcasecmp ("Aggregation", child->key) == 0)
+      agg_config_aggregation (child);
+    else
+      WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
+          "<Plugin aggregation /> blocks and will be ignored.", child->key);
+  }
+
+  return (0);
+} /* }}} int agg_config */
+
+static int agg_read (void) /* {{{ */
+{
+  agg_instance_t *this;
+  size_t i = 0;
+
+  pthread_mutex_lock (&agg_instance_list_lock);
+  for (this = agg_instance_list_head; this != NULL; this = this->next)
+  {
+    DEBUG ("aggregation plugin: Handling instance: host = \"%s\", "
+        "plugin = \"%s\", plugin_instance = \"%s\", "
+        "type = \"%s\", type_instance = \"%s\"",
+        this->ident.host,
+        this->ident.plugin, this->ident.plugin_instance,
+        this->ident.type, this->ident.type_instance);
+    i++;
+  }
+  pthread_mutex_unlock (&agg_instance_list_lock);
+
+  DEBUG ("aggregation plugin: There are currently %zu instances.", i);
+
+  return (0);
+} /* }}} int agg_read */
+
+static int agg_write (data_set_t const *ds, value_list_t const *vl, /* {{{ */
+    __attribute__((unused)) user_data_t *user_data)
+{
+  _Bool created_by_aggregation = 0;
+  int status;
+
+  /* Ignore values that were created by the aggregation plugin to avoid weird
+   * effects. */
+  (void) meta_data_get_boolean (vl->meta, "aggregation:created",
+      &created_by_aggregation);
+  if (created_by_aggregation)
+    return (0);
+
+  if (lookup == NULL)
+    status = ENOENT;
+  else
+  {
+    status = lookup_search (lookup, ds, vl);
+    if (status > 0)
+      status = 0;
+  }
+
+  return (status);
+} /* }}} int agg_write */
+
+void module_register (void)
+{
+  plugin_register_complex_config ("aggregation", agg_config);
+  plugin_register_read ("aggregation", agg_read);
+  plugin_register_write ("aggregation", agg_write, /* user_data = */ NULL);
+}
+
+/* vim: set sw=2 sts=2 tw=78 et fdm=marker : */
index f3ef6759b1fc7ee6573e7b621076954be7fbc503..102e116d26be88c81dd70da1d73158aab05dfd6f 100644 (file)
@@ -52,6 +52,7 @@
 # to missing dependencies or because they have been deactivated explicitly.  #
 ##############################################################################
 
+#@BUILD_PLUGIN_AGGREGATION_TRUE@LoadPlugin aggregation
 #@BUILD_PLUGIN_AMQP_TRUE@LoadPlugin amqp
 #@BUILD_PLUGIN_APACHE_TRUE@LoadPlugin apache
 #@BUILD_PLUGIN_APCUPS_TRUE@LoadPlugin apcups
 # ription of those options is available in the collectd.conf(5) manual page. #
 ##############################################################################
 
+#<Plugin "aggregation">
+#  <Aggregation>
+#    Host "/any/"
+#    Plugin "example"
+#    PluginInstance "/all/"
+#    Type "gauge"
+#    TypeInstance "/any/"
+#
+#    CalculateNum false
+#    CalculateSum false
+#    CalculateAverage true
+#    CalculateMinimum false
+#    CalculateMaximum false
+#    CalculateStddev false
+#  </Aggregation>
+#</Plugin>
+
 #<Plugin "amqp">
 #  <Publish "name">
 #    Host "localhost"