diff --git a/src/aggregation.c b/src/aggregation.c
index 2e13766e6d0f4ba400eae84abd8d52f6d2a446b6..db33c177c039fa1676d9fcdc690b47d428f9a82b 100644 (file)
--- a/src/aggregation.c
+++ b/src/aggregation.c
typedef struct agg_instance_s agg_instance_t;
struct agg_instance_s /* {{{ */
{
+ pthread_mutex_t lock;
identifier_t ident;
int ds_type;
} /* }}} void agg_instance_destroy */
/* Create a new aggregation instance. */
-static agg_instance_t *agg_instance_create (value_list_t const *vl, /* {{{ */
- aggregation_t *agg)
+static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */
+ value_list_t const *vl, aggregation_t *agg)
{
agg_instance_t *inst;
return (NULL);
}
memset (inst, 0, sizeof (*inst));
+ pthread_mutex_init (&inst->lock, /* attr = */ NULL);
+
+ inst->ds_type = ds->ds[0].type;
#define COPY_FIELD(fld) do { \
sstrncpy (inst->ident.fld, \
#undef COPY_FIELD
- inst->ds_type = -1;
inst->min = NAN;
inst->max = NAN;
gauge_t *rate;
if (ds->ds_num != 1)
- return (-1);
+ {
+ ERROR ("aggregation plugin: The \"%s\" type (data set) has more than one "
+ "data source. This is currently not supported by this plugin. "
+ "Sorry.", ds->type);
+ return (EINVAL);
+ }
rate = uc_get_rate (ds, vl);
if (rate == NULL)
{
- ERROR ("aggregation plugin: uc_get_rate() failed.");
- return (-1);
+ char ident[6 * DATA_MAX_NAME_LEN];
+ FORMAT_VL (ident, sizeof (ident), vl);
+ ERROR ("aggregation plugin: Unable to read the current rate of \"%s\".",
+ ident);
+ return (ENOENT);
}
if (isnan (rate[0]))
return (0);
}
+ pthread_mutex_lock (&inst->lock);
+
inst->num++;
inst->sum += rate[0];
inst->squares_sum += (rate[0] * rate[0]);
if (isnan (inst->max) || (inst->max < rate[0]))
inst->max = rate[0];
+ pthread_mutex_unlock (&inst->lock);
+
sfree (rate);
return (0);
} /* }}} int agg_instance_update */
+static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */
+ char const *func, gauge_t rate, rate_to_value_state_t *state,
+ value_list_t *vl, char const *pi_prefix, cdtime_t t)
+{
+ value_t v;
+ int status;
+
+ if (pi_prefix[0] != 0)
+ ssnprintf (vl->plugin_instance, sizeof (vl->plugin_instance), "%s-%s",
+ pi_prefix, func);
+ else
+ sstrncpy (vl->plugin_instance, func, sizeof (vl->plugin_instance));
+
+ memset (&v, 0, sizeof (v));
+ status = rate_to_value (&v, rate, state, inst->ds_type, t);
+ if (status != 0)
+ {
+ /* If this is the first iteration and rate_to_value() was asked to return a
+ * COUNTER or a DERIVE, it will return EAGAIN. Catch this and handle
+ * gracefully. */
+ if (status == EAGAIN)
+ return (0);
+
+ WARNING ("aggregation plugin: rate_to_value failed with status %i.",
+ status);
+ return (-1);
+ }
+
+ vl->values = &v;
+ vl->values_len = 1;
+
+ plugin_dispatch_values_secure (vl);
+
+ vl->values = NULL;
+ vl->values_len = 0;
+
+ return (0);
+} /* }}} int agg_instance_read_func */
+
+static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */
+{
+ value_list_t vl = VALUE_LIST_INIT;
+ char pi_prefix[DATA_MAX_NAME_LEN];
+
+ /* Pre-set all the fields in the value list that will not change per
+ * aggregation type (sum, average, ...). The struct will be re-used and must
+ * therefore be dispatched using the "secure" function. */
+
+ vl.time = t;
+ vl.interval = 0;
+
+ vl.meta = meta_data_create ();
+ if (vl.meta == NULL)
+ {
+ ERROR ("aggregation plugin: meta_data_create failed.");
+ return (-1);
+ }
+ meta_data_add_boolean (vl.meta, "aggregation:created", 1);
+
+ if (LU_IS_ALL (inst->ident.host))
+ sstrncpy (vl.host, "global", sizeof (vl.host));
+ else
+ sstrncpy (vl.host, inst->ident.host, sizeof (vl.host));
+
+ sstrncpy (vl.plugin, "aggregation", sizeof (vl.plugin));
+
+ if (LU_IS_ALL (inst->ident.plugin))
+ {
+ if (LU_IS_ALL (inst->ident.plugin_instance))
+ sstrncpy (pi_prefix, "", sizeof (pi_prefix));
+ else
+ sstrncpy (pi_prefix, inst->ident.plugin_instance, sizeof (pi_prefix));
+ }
+ else
+ {
+ if (LU_IS_ALL (inst->ident.plugin_instance))
+ sstrncpy (pi_prefix, inst->ident.plugin, sizeof (pi_prefix));
+ else
+ ssnprintf (pi_prefix, sizeof (pi_prefix),
+ "%s-%s", inst->ident.plugin, inst->ident.plugin_instance);
+ }
+
+ sstrncpy (vl.type, inst->ident.type, sizeof (vl.type));
+
+ if (!LU_IS_ALL (inst->ident.type_instance))
+ sstrncpy (vl.type_instance, inst->ident.type_instance,
+ sizeof (vl.type_instance));
+
+#define READ_FUNC(func, rate) do { \
+ if (inst->state_ ## func != NULL) { \
+ agg_instance_read_func (inst, #func, rate, \
+ inst->state_ ## func, &vl, pi_prefix, t); \
+ } \
+} while (0)
+
+ pthread_mutex_lock (&inst->lock);
+
+ READ_FUNC (num, (gauge_t) inst->num);
+
+ /* All other aggregations are only defined when there have been any values
+ * at all. */
+ if (inst->num > 0)
+ {
+ READ_FUNC (sum, inst->sum);
+ READ_FUNC (average, (inst->sum / ((gauge_t) inst->num)));
+ READ_FUNC (min, inst->min);
+ READ_FUNC (max, inst->max);
+ READ_FUNC (stddev, sqrt((((gauge_t) inst->num) * inst->squares_sum)
+ - (inst->sum * inst->sum)) / ((gauge_t) inst->num));
+ }
+
+ /* Reset internal state. */
+ inst->num = 0;
+ inst->sum = 0.0;
+ inst->squares_sum = 0.0;
+ inst->min = NAN;
+ inst->max = NAN;
+
+ pthread_mutex_unlock (&inst->lock);
+
+ meta_data_destroy (vl.meta);
+ vl.meta = NULL;
+
+ return (0);
+} /* }}} int agg_instance_read */
+
/* lookup_class_callback_t for utils_vl_lookup */
static void *agg_lookup_class_callback ( /* {{{ */
__attribute__((unused)) data_set_t const *ds,
value_list_t const *vl, void *user_class)
{
- return (agg_instance_create (vl, (aggregation_t *) user_class));
+ return (agg_instance_create (ds, vl, (aggregation_t *) user_class));
} /* }}} void *agg_class_callback */
/* lookup_obj_callback_t for utils_vl_lookup */
/*
* <Plugin "aggregation">
* <Aggregation>
- * Host "/any/"
* Plugin "cpu"
- * PluginInstance "/all/"
* Type "cpu"
- * TypeInstance "/any/"
+ *
+ * GroupBy Host
+ * GroupBy TypeInstance
*
* CalculateNum true
* CalculateSum true
* </Aggregation>
* </Plugin>
*/
+static int agg_config_handle_group_by (oconfig_item_t const *ci, /* {{{ */
+ aggregation_t *agg)
+{
+ int i;
+
+ for (i = 0; i < ci->values_num; i++)
+ {
+ char const *value;
+
+ if (ci->values[i].type != OCONFIG_TYPE_STRING)
+ {
+ ERROR ("aggregation plugin: Argument %i of the \"GroupBy\" option "
+ "is not a string.", i + 1);
+ continue;
+ }
+
+ value = ci->values[i].value.string;
+
+ if (strcasecmp ("Host", value) == 0)
+ sstrncpy (agg->ident.host, LU_ANY, sizeof (agg->ident.host));
+ else if (strcasecmp ("Plugin", value) == 0)
+ sstrncpy (agg->ident.plugin, LU_ANY, sizeof (agg->ident.plugin));
+ else if (strcasecmp ("PluginInstance", value) == 0)
+ sstrncpy (agg->ident.plugin_instance, LU_ANY,
+ sizeof (agg->ident.plugin_instance));
+ else if (strcasecmp ("TypeInstance", value) == 0)
+ sstrncpy (agg->ident.type_instance, LU_ANY, sizeof (agg->ident.type_instance));
+ else if (strcasecmp ("Type", value) == 0)
+ ERROR ("aggregation plugin: Grouping by type is not supported.");
+ else
+ WARNING ("aggregation plugin: The \"%s\" argument to the \"GroupBy\" "
+ "option is invalid and will be ignored.", value);
+ } /* for (ci->values) */
+
+ return (0);
+} /* }}} int agg_config_handle_group_by */
+
static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
{
aggregation_t *agg;
+ _Bool is_valid;
int status;
int i;
}
memset (agg, 0, sizeof (*agg));
+ sstrncpy (agg->ident.host, LU_ALL, sizeof (agg->ident.host));
+ sstrncpy (agg->ident.plugin, LU_ALL, sizeof (agg->ident.plugin));
+ sstrncpy (agg->ident.plugin_instance, LU_ALL,
+ sizeof (agg->ident.plugin_instance));
+ sstrncpy (agg->ident.type, LU_ALL, sizeof (agg->ident.type));
+ sstrncpy (agg->ident.type_instance, LU_ALL,
+ sizeof (agg->ident.type_instance));
+
for (i = 0; i < ci->children_num; i++)
{
oconfig_item_t *child = ci->children + i;
else if (strcasecmp ("TypeInstance", child->key) == 0)
cf_util_get_string_buffer (child, agg->ident.type_instance,
sizeof (agg->ident.type_instance));
+ else if (strcasecmp ("GroupBy", child->key) == 0)
+ agg_config_handle_group_by (child, agg);
else if (strcasecmp ("CalculateNum", child->key) == 0)
cf_util_get_boolean (child, &agg->calc_num);
else if (strcasecmp ("CalculateSum", child->key) == 0)
"<Aggregation /> blocks and will be ignored.", child->key);
}
- /* TODO(octo): Check identifier:
- * - At least one wildcard.
- * - Type is set.
- */
+ /* Sanity checking */
+ is_valid = 1;
+ if (LU_IS_ALL (agg->ident.type)) /* {{{ */
+ {
+ ERROR ("aggregation plugin: It appears you did not specify the required "
+ "\"Type\" option in this aggregation. "
+ "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
+ "Type \"%s\", TypeInstance \"%s\")",
+ agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
+ agg->ident.type, agg->ident.type_instance);
+ is_valid = 0;
+ }
+ else if (strchr (agg->ident.type, '/') != NULL)
+ {
+ ERROR ("aggregation plugin: The \"Type\" may not contain the '/' "
+ "character. Especially, it may not be a wildcard. The current "
+ "value is \"%s\".", agg->ident.type);
+ is_valid = 0;
+ } /* }}} */
+
+ if (!LU_IS_ALL (agg->ident.host) /* {{{ */
+ && !LU_IS_ALL (agg->ident.plugin)
+ && !LU_IS_ALL (agg->ident.plugin_instance)
+ && !LU_IS_ALL (agg->ident.type_instance))
+ {
+ ERROR ("aggregation plugin: An aggregation must contain at least one "
+ "wildcard. This is achieved by leaving at least one of the \"Host\", "
+ "\"Plugin\", \"PluginInstance\" and \"TypeInstance\" options blank "
+ "and not grouping by that field. "
+ "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
+ "Type \"%s\", TypeInstance \"%s\")",
+ agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
+ agg->ident.type, agg->ident.type_instance);
+ is_valid = 0;
+ } /* }}} */
+
+ if (!agg->calc_num && !agg->calc_sum && !agg->calc_average /* {{{ */
+ && !agg->calc_min && !agg->calc_max && !agg->calc_stddev)
+ {
+ ERROR ("aggregation plugin: No aggregation function has been specified. "
+ "Without this, I don't know what I should be calculating. "
+ "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
+ "Type \"%s\", TypeInstance \"%s\")",
+ agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
+ agg->ident.type, agg->ident.type_instance);
+ is_valid = 0;
+ } /* }}} */
+
+ if (!is_valid) /* {{{ */
+ {
+ sfree (agg);
+ return (-1);
+ } /* }}} */
status = lookup_add (lookup, &agg->ident, agg);
if (status != 0)
return (-1);
}
+ DEBUG ("aggregation plugin: Successfully added aggregation: "
+ "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
+ "Type \"%s\", TypeInstance \"%s\")",
+ agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
+ agg->ident.type, agg->ident.type_instance);
return (0);
} /* }}} int agg_config_aggregation */
{
int i;
+ pthread_mutex_lock (&agg_instance_list_lock);
+
if (lookup == NULL)
{
lookup = lookup_create (agg_lookup_class_callback,
agg_lookup_free_obj_callback);
if (lookup == NULL)
{
+ pthread_mutex_unlock (&agg_instance_list_lock);
ERROR ("aggregation plugin: lookup_create failed.");
return (-1);
}
"<Plugin aggregation /> blocks and will be ignored.", child->key);
}
+ pthread_mutex_unlock (&agg_instance_list_lock);
+
return (0);
} /* }}} int agg_config */
static int agg_read (void) /* {{{ */
{
agg_instance_t *this;
- size_t i = 0;
+ cdtime_t t;
+ int success;
+
+ t = cdtime ();
+ success = 0;
pthread_mutex_lock (&agg_instance_list_lock);
+
+ /* agg_instance_list_head only holds data, after the "write" callback has
+ * been called with a matching value list at least once. So on startup,
+ * there's a race between the aggregations read() and write() callback. If
+ * the read() callback is called first, agg_instance_list_head is NULL and
+ * "success" may be zero. This is expected and should not result in an error.
+ * Therefore we need to handle this case separately. */
+ if (agg_instance_list_head == NULL)
+ {
+ pthread_mutex_unlock (&agg_instance_list_lock);
+ return (0);
+ }
+
for (this = agg_instance_list_head; this != NULL; this = this->next)
{
- DEBUG ("aggregation plugin: Handling instance: host = \"%s\", "
- "plugin = \"%s\", plugin_instance = \"%s\", "
- "type = \"%s\", type_instance = \"%s\"",
- this->ident.host,
- this->ident.plugin, this->ident.plugin_instance,
- this->ident.type, this->ident.type_instance);
- i++;
+ int status;
+
+ status = agg_instance_read (this, t);
+ if (status != 0)
+ WARNING ("aggregation plugin: Reading an aggregation instance "
+ "failed with status %i.", status);
+ else
+ success++;
}
- pthread_mutex_unlock (&agg_instance_list_lock);
- DEBUG ("aggregation plugin: There are currently %zu instances.", i);
+ pthread_mutex_unlock (&agg_instance_list_lock);
- return (0);
+ return ((success > 0) ? 0 : -1);
} /* }}} int agg_read */
static int agg_write (data_set_t const *ds, value_list_t const *vl, /* {{{ */