Code

Merge remote-tracking branch 'github/pr/387'
[collectd.git] / src / aggregation.c
index db33c177c039fa1676d9fcdc690b47d428f9a82b..0c0f19d6fe1585301443754bf6bc8933511d8bc5 100644 (file)
  **/
 
 #include "collectd.h"
+
+#include <pthread.h>
+
 #include "plugin.h"
 #include "common.h"
 #include "configfile.h"
 #include "meta_data.h"
 #include "utils_cache.h" /* for uc_get_rate() */
+#include "utils_subst.h"
 #include "utils_vl_lookup.h"
 
-#include <pthread.h>
+#define AGG_MATCHES_ALL(str) (strcmp ("/.*/", str) == 0)
+#define AGG_FUNC_PLACEHOLDER "%{aggregation}"
 
 struct aggregation_s /* {{{ */
 {
   identifier_t ident;
+  unsigned int group_by;
+
+  unsigned int regex_fields;
+
+  char *set_host;
+  char *set_plugin;
+  char *set_plugin_instance;
+  char *set_type_instance;
 
   _Bool calc_num;
   _Bool calc_sum;
@@ -78,6 +91,23 @@ static lookup_t *lookup = NULL;
 static pthread_mutex_t agg_instance_list_lock = PTHREAD_MUTEX_INITIALIZER;
 static agg_instance_t *agg_instance_list_head = NULL;
 
+static _Bool agg_is_regex (char const *str) /* {{{ */
+{
+  size_t len;
+
+  if (str == NULL)
+    return (0);
+
+  len = strlen (str);
+  if (len < 3)
+    return (0);
+
+  if ((str[0] == '/') && (str[len - 1] == '/'))
+    return (1);
+  else
+    return (0);
+} /* }}} _Bool agg_is_regex */
+
 static void agg_destroy (aggregation_t *agg) /* {{{ */
 {
   sfree (agg);
@@ -116,6 +146,92 @@ static void agg_instance_destroy (agg_instance_t *inst) /* {{{ */
   inst->max = NAN;
 } /* }}} void agg_instance_destroy */
 
+static int agg_instance_create_name (agg_instance_t *inst, /* {{{ */
+    value_list_t const *vl, aggregation_t const *agg)
+{
+#define COPY_FIELD(buffer, buffer_size, field, group_mask, all_value) do { \
+  if (agg->set_ ## field != NULL) \
+    sstrncpy (buffer, agg->set_ ## field, buffer_size); \
+  else if ((agg->regex_fields & group_mask) \
+      && (agg->group_by & group_mask)) \
+    sstrncpy (buffer, vl->field, buffer_size); \
+  else if ((agg->regex_fields & group_mask) \
+      && (AGG_MATCHES_ALL (agg->ident.field))) \
+    sstrncpy (buffer, all_value, buffer_size); \
+  else \
+    sstrncpy (buffer, agg->ident.field, buffer_size); \
+} while (0)
+
+  /* Host */
+  COPY_FIELD (inst->ident.host, sizeof (inst->ident.host),
+      host, LU_GROUP_BY_HOST, "global");
+
+  /* Plugin */
+  if (agg->set_plugin != NULL)
+    sstrncpy (inst->ident.plugin, agg->set_plugin,
+        sizeof (inst->ident.plugin));
+  else
+    sstrncpy (inst->ident.plugin, "aggregation", sizeof (inst->ident.plugin));
+
+  /* Plugin instance */
+  if (agg->set_plugin_instance != NULL)
+    sstrncpy (inst->ident.plugin_instance, agg->set_plugin_instance,
+        sizeof (inst->ident.plugin_instance));
+  else
+  {
+    char tmp_plugin[DATA_MAX_NAME_LEN];
+    char tmp_plugin_instance[DATA_MAX_NAME_LEN] = "";
+
+    if ((agg->regex_fields & LU_GROUP_BY_PLUGIN)
+        && (agg->group_by & LU_GROUP_BY_PLUGIN))
+      sstrncpy (tmp_plugin, vl->plugin, sizeof (tmp_plugin));
+    else if ((agg->regex_fields & LU_GROUP_BY_PLUGIN)
+        && (AGG_MATCHES_ALL (agg->ident.plugin)))
+      sstrncpy (tmp_plugin, "", sizeof (tmp_plugin));
+    else
+      sstrncpy (tmp_plugin, agg->ident.plugin, sizeof (tmp_plugin));
+
+    if ((agg->regex_fields & LU_GROUP_BY_PLUGIN_INSTANCE)
+        && (agg->group_by & LU_GROUP_BY_PLUGIN_INSTANCE))
+      sstrncpy (tmp_plugin_instance, vl->plugin_instance,
+          sizeof (tmp_plugin_instance));
+    else if ((agg->regex_fields & LU_GROUP_BY_PLUGIN_INSTANCE)
+        && (AGG_MATCHES_ALL (agg->ident.plugin_instance)))
+      sstrncpy (tmp_plugin_instance, "", sizeof (tmp_plugin_instance));
+    else
+      sstrncpy (tmp_plugin_instance, agg->ident.plugin_instance,
+          sizeof (tmp_plugin_instance));
+
+    if ((strcmp ("", tmp_plugin) == 0)
+        && (strcmp ("", tmp_plugin_instance) == 0))
+      sstrncpy (inst->ident.plugin_instance, AGG_FUNC_PLACEHOLDER,
+          sizeof (inst->ident.plugin_instance));
+    else if (strcmp ("", tmp_plugin) != 0)
+      ssnprintf (inst->ident.plugin_instance,
+          sizeof (inst->ident.plugin_instance),
+          "%s-%s", tmp_plugin, AGG_FUNC_PLACEHOLDER);
+    else if (strcmp ("", tmp_plugin_instance) != 0)
+      ssnprintf (inst->ident.plugin_instance,
+          sizeof (inst->ident.plugin_instance),
+          "%s-%s", tmp_plugin_instance, AGG_FUNC_PLACEHOLDER);
+    else
+      ssnprintf (inst->ident.plugin_instance,
+          sizeof (inst->ident.plugin_instance),
+          "%s-%s-%s", tmp_plugin, tmp_plugin_instance, AGG_FUNC_PLACEHOLDER);
+  }
+
+  /* Type */
+  sstrncpy (inst->ident.type, agg->ident.type, sizeof (inst->ident.type));
+
+  /* Type instance */
+  COPY_FIELD (inst->ident.type_instance, sizeof (inst->ident.type_instance),
+      type_instance, LU_GROUP_BY_TYPE_INSTANCE, "");
+
+#undef COPY_FIELD
+
+  return (0);
+} /* }}} int agg_instance_create_name */
+
 /* Create a new aggregation instance. */
 static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */
     value_list_t const *vl, aggregation_t *agg)
@@ -135,19 +251,7 @@ static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */
 
   inst->ds_type = ds->ds[0].type;
 
-#define COPY_FIELD(fld) do { \
-  sstrncpy (inst->ident.fld, \
-      LU_IS_ANY (agg->ident.fld) ? vl->fld : agg->ident.fld, \
-      sizeof (inst->ident.fld)); \
-} while (0)
-
-  COPY_FIELD (host);
-  COPY_FIELD (plugin);
-  COPY_FIELD (plugin_instance);
-  COPY_FIELD (type);
-  COPY_FIELD (type_instance);
-
-#undef COPY_FIELD
+  agg_instance_create_name (inst, vl, agg);
 
   inst->min = NAN;
   inst->max = NAN;
@@ -240,8 +344,8 @@ static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */
   int status;
 
   if (pi_prefix[0] != 0)
-    ssnprintf (vl->plugin_instance, sizeof (vl->plugin_instance), "%s-%s",
-        pi_prefix, func);
+    subst_string (vl->plugin_instance, sizeof (vl->plugin_instance),
+        pi_prefix, AGG_FUNC_PLACEHOLDER, func);
   else
     sstrncpy (vl->plugin_instance, func, sizeof (vl->plugin_instance));
 
@@ -263,7 +367,7 @@ static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */
   vl->values = &v;
   vl->values_len = 1;
 
-  plugin_dispatch_values_secure (vl);
+  plugin_dispatch_values (vl);
 
   vl->values = NULL;
   vl->values_len = 0;
@@ -274,7 +378,6 @@ static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */
 static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */
 {
   value_list_t vl = VALUE_LIST_INIT;
-  char pi_prefix[DATA_MAX_NAME_LEN];
 
   /* Pre-set all the fields in the value list that will not change per
    * aggregation type (sum, average, ...). The struct will be re-used and must
@@ -291,39 +394,16 @@ static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */
   }
   meta_data_add_boolean (vl.meta, "aggregation:created", 1);
 
-  if (LU_IS_ALL (inst->ident.host))
-    sstrncpy (vl.host, "global", sizeof (vl.host));
-  else
-    sstrncpy (vl.host, inst->ident.host, sizeof (vl.host));
-
-  sstrncpy (vl.plugin, "aggregation", sizeof (vl.plugin));
-
-  if (LU_IS_ALL (inst->ident.plugin))
-  {
-    if (LU_IS_ALL (inst->ident.plugin_instance))
-      sstrncpy (pi_prefix, "", sizeof (pi_prefix));
-    else
-      sstrncpy (pi_prefix, inst->ident.plugin_instance, sizeof (pi_prefix));
-  }
-  else
-  {
-    if (LU_IS_ALL (inst->ident.plugin_instance))
-      sstrncpy (pi_prefix, inst->ident.plugin, sizeof (pi_prefix));
-    else
-      ssnprintf (pi_prefix, sizeof (pi_prefix),
-          "%s-%s", inst->ident.plugin, inst->ident.plugin_instance);
-  }
-
+  sstrncpy (vl.host, inst->ident.host, sizeof (vl.host));
+  sstrncpy (vl.plugin, inst->ident.plugin, sizeof (vl.plugin));
   sstrncpy (vl.type, inst->ident.type, sizeof (vl.type));
-
-  if (!LU_IS_ALL (inst->ident.type_instance))
-    sstrncpy (vl.type_instance, inst->ident.type_instance,
-        sizeof (vl.type_instance));
+  sstrncpy (vl.type_instance, inst->ident.type_instance,
+      sizeof (vl.type_instance));
 
 #define READ_FUNC(func, rate) do { \
   if (inst->state_ ## func != NULL) { \
     agg_instance_read_func (inst, #func, rate, \
-        inst->state_ ## func, &vl, pi_prefix, t); \
+        inst->state_ ## func, &vl, inst->ident.plugin_instance, t); \
   } \
 } while (0)
 
@@ -424,14 +504,13 @@ static int agg_config_handle_group_by (oconfig_item_t const *ci, /* {{{ */
     value = ci->values[i].value.string;
 
     if (strcasecmp ("Host", value) == 0)
-      sstrncpy (agg->ident.host, LU_ANY, sizeof (agg->ident.host));
+      agg->group_by |= LU_GROUP_BY_HOST;
     else if (strcasecmp ("Plugin", value) == 0)
-      sstrncpy (agg->ident.plugin, LU_ANY, sizeof (agg->ident.plugin));
+      agg->group_by |= LU_GROUP_BY_PLUGIN;
     else if (strcasecmp ("PluginInstance", value) == 0)
-      sstrncpy (agg->ident.plugin_instance, LU_ANY,
-          sizeof (agg->ident.plugin_instance));
+      agg->group_by |= LU_GROUP_BY_PLUGIN_INSTANCE;
     else if (strcasecmp ("TypeInstance", value) == 0)
-      sstrncpy (agg->ident.type_instance, LU_ANY, sizeof (agg->ident.type_instance));
+      agg->group_by |= LU_GROUP_BY_TYPE_INSTANCE;
     else if (strcasecmp ("Type", value) == 0)
       ERROR ("aggregation plugin: Grouping by type is not supported.");
     else
@@ -457,12 +536,12 @@ static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
   }
   memset (agg, 0, sizeof (*agg));
 
-  sstrncpy (agg->ident.host, LU_ALL, sizeof (agg->ident.host));
-  sstrncpy (agg->ident.plugin, LU_ALL, sizeof (agg->ident.plugin));
-  sstrncpy (agg->ident.plugin_instance, LU_ALL,
+  sstrncpy (agg->ident.host, "/.*/", sizeof (agg->ident.host));
+  sstrncpy (agg->ident.plugin, "/.*/", sizeof (agg->ident.plugin));
+  sstrncpy (agg->ident.plugin_instance, "/.*/",
       sizeof (agg->ident.plugin_instance));
-  sstrncpy (agg->ident.type, LU_ALL, sizeof (agg->ident.type));
-  sstrncpy (agg->ident.type_instance, LU_ALL,
+  sstrncpy (agg->ident.type, "/.*/", sizeof (agg->ident.type));
+  sstrncpy (agg->ident.type_instance, "/.*/",
       sizeof (agg->ident.type_instance));
 
   for (i = 0; i < ci->children_num; i++)
@@ -484,6 +563,14 @@ static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
     else if (strcasecmp ("TypeInstance", child->key) == 0)
       cf_util_get_string_buffer (child, agg->ident.type_instance,
           sizeof (agg->ident.type_instance));
+    else if (strcasecmp ("SetHost", child->key) == 0)
+      cf_util_get_string (child, &agg->set_host);
+    else if (strcasecmp ("SetPlugin", child->key) == 0)
+      cf_util_get_string (child, &agg->set_plugin);
+    else if (strcasecmp ("SetPluginInstance", child->key) == 0)
+      cf_util_get_string (child, &agg->set_plugin_instance);
+    else if (strcasecmp ("SetTypeInstance", child->key) == 0)
+      cf_util_get_string (child, &agg->set_type_instance);
     else if (strcasecmp ("GroupBy", child->key) == 0)
       agg_config_handle_group_by (child, agg);
     else if (strcasecmp ("CalculateNum", child->key) == 0)
@@ -503,9 +590,18 @@ static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
           "<Aggregation /> blocks and will be ignored.", child->key);
   }
 
+  if (agg_is_regex (agg->ident.host))
+    agg->regex_fields |= LU_GROUP_BY_HOST;
+  if (agg_is_regex (agg->ident.plugin))
+    agg->regex_fields |= LU_GROUP_BY_PLUGIN;
+  if (agg_is_regex (agg->ident.plugin_instance))
+    agg->regex_fields |= LU_GROUP_BY_PLUGIN_INSTANCE;
+  if (agg_is_regex (agg->ident.type_instance))
+    agg->regex_fields |= LU_GROUP_BY_TYPE_INSTANCE;
+
   /* Sanity checking */
   is_valid = 1;
-  if (LU_IS_ALL (agg->ident.type)) /* {{{ */
+  if (strcmp ("/.*/", agg->ident.type) == 0) /* {{{ */
   {
     ERROR ("aggregation plugin: It appears you did not specify the required "
         "\"Type\" option in this aggregation. "
@@ -518,20 +614,31 @@ static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
   else if (strchr (agg->ident.type, '/') != NULL)
   {
     ERROR ("aggregation plugin: The \"Type\" may not contain the '/' "
-        "character. Especially, it may not be a wildcard. The current "
+        "character. Especially, it may not be a regex. The current "
         "value is \"%s\".", agg->ident.type);
     is_valid = 0;
   } /* }}} */
 
-  if (!LU_IS_ALL (agg->ident.host) /* {{{ */
-      && !LU_IS_ALL (agg->ident.plugin)
-      && !LU_IS_ALL (agg->ident.plugin_instance)
-      && !LU_IS_ALL (agg->ident.type_instance))
+  /* Check that there is at least one regex field without a grouping. {{{ */
+  if ((agg->regex_fields & ~agg->group_by) == 0)
   {
     ERROR ("aggregation plugin: An aggregation must contain at least one "
         "wildcard. This is achieved by leaving at least one of the \"Host\", "
         "\"Plugin\", \"PluginInstance\" and \"TypeInstance\" options blank "
-        "and not grouping by that field. "
+        "or using a regular expression and not grouping by that field. "
+        "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
+        "Type \"%s\", TypeInstance \"%s\")",
+        agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
+        agg->ident.type, agg->ident.type_instance);
+    is_valid = 0;
+  } /* }}} */
+
+  /* Check that all grouping fields are regular expressions. {{{ */
+  if (agg->group_by & ~agg->regex_fields)
+  {
+    ERROR ("aggregation plugin: Only wildcard fields (fields for which a "
+        "regular expression is configured or which are left blank) can be "
+        "specified in the \"GroupBy\" option. "
         "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
         "Type \"%s\", TypeInstance \"%s\")",
         agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
@@ -557,7 +664,7 @@ static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
     return (-1);
   } /* }}} */
 
-  status = lookup_add (lookup, &agg->ident, agg);
+  status = lookup_add (lookup, &agg->ident, agg->group_by, agg);
   if (status != 0)
   {
     ERROR ("aggregation plugin: lookup_add failed with status %i.", status);