1 /**
2 * collectd - src/aggregation.c
3 * Copyright (C) 2012 Florian Forster
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a
6 * copy of this software and associated documentation files (the "Software"),
7 * to deal in the Software without restriction, including without limitation
8 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9 * and/or sell copies of the Software, and to permit persons to whom the
10 * Software is furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21 * DEALINGS IN THE SOFTWARE.
22 *
23 * Authors:
24 * Florian Forster <octo at collectd.org>
25 **/
27 #include "collectd.h"
29 #include "common.h"
30 #include "meta_data.h"
31 #include "plugin.h"
32 #include "utils_cache.h" /* for uc_get_rate() */
33 #include "utils_subst.h"
34 #include "utils_vl_lookup.h"
36 #define AGG_MATCHES_ALL(str) (strcmp("/.*/", str) == 0)
37 #define AGG_FUNC_PLACEHOLDER "%{aggregation}"
39 struct aggregation_s /* {{{ */
40 {
41 lookup_identifier_t ident;
42 unsigned int group_by;
44 unsigned int regex_fields;
46 char *set_host;
47 char *set_plugin;
48 char *set_plugin_instance;
49 char *set_type_instance;
51 _Bool calc_num;
52 _Bool calc_sum;
53 _Bool calc_average;
54 _Bool calc_min;
55 _Bool calc_max;
56 _Bool calc_stddev;
57 }; /* }}} */
58 typedef struct aggregation_s aggregation_t;
60 struct agg_instance_s;
61 typedef struct agg_instance_s agg_instance_t;
62 struct agg_instance_s /* {{{ */
63 {
64 pthread_mutex_t lock;
65 lookup_identifier_t ident;
67 int ds_type;
69 derive_t num;
70 gauge_t sum;
71 gauge_t squares_sum;
73 gauge_t min;
74 gauge_t max;
76 rate_to_value_state_t *state_num;
77 rate_to_value_state_t *state_sum;
78 rate_to_value_state_t *state_average;
79 rate_to_value_state_t *state_min;
80 rate_to_value_state_t *state_max;
81 rate_to_value_state_t *state_stddev;
83 agg_instance_t *next;
84 }; /* }}} */
86 static lookup_t *lookup = NULL;
88 static pthread_mutex_t agg_instance_list_lock = PTHREAD_MUTEX_INITIALIZER;
89 static agg_instance_t *agg_instance_list_head = NULL;
/* Returns true when STR has the form "/.../" with at least one character
 * between the two slashes. NULL and strings shorter than three characters
 * are never considered regular expressions. */
static _Bool agg_is_regex(char const *str) /* {{{ */
{
  if (str == NULL)
    return 0;

  size_t const length = strlen(str);
  _Bool const delimited =
      (length >= 3) && (str[0] == '/') && (str[length - 1] == '/');

  return delimited;
} /* }}} _Bool agg_is_regex */
108 static void agg_destroy(aggregation_t *agg) /* {{{ */
109 {
110 sfree(agg);
111 } /* }}} void agg_destroy */
113 /* Frees all dynamically allocated memory within the instance. */
114 static void agg_instance_destroy(agg_instance_t *inst) /* {{{ */
115 {
116 if (inst == NULL)
117 return;
119 /* Remove this instance from the global list of instances. */
120 pthread_mutex_lock(&agg_instance_list_lock);
121 if (agg_instance_list_head == inst)
122 agg_instance_list_head = inst->next;
123 else if (agg_instance_list_head != NULL) {
124 agg_instance_t *prev = agg_instance_list_head;
125 while ((prev != NULL) && (prev->next != inst))
126 prev = prev->next;
127 if (prev != NULL)
128 prev->next = inst->next;
129 }
130 pthread_mutex_unlock(&agg_instance_list_lock);
132 sfree(inst->state_num);
133 sfree(inst->state_sum);
134 sfree(inst->state_average);
135 sfree(inst->state_min);
136 sfree(inst->state_max);
137 sfree(inst->state_stddev);
139 memset(inst, 0, sizeof(*inst));
140 inst->ds_type = -1;
141 inst->min = NAN;
142 inst->max = NAN;
143 } /* }}} void agg_instance_destroy */
145 static int agg_instance_create_name(agg_instance_t *inst, /* {{{ */
146 value_list_t const *vl,
147 aggregation_t const *agg) {
148 #define COPY_FIELD(buffer, buffer_size, field, group_mask, all_value) \
149 do { \
150 if (agg->set_##field != NULL) \
151 sstrncpy(buffer, agg->set_##field, buffer_size); \
152 else if ((agg->regex_fields & group_mask) && (agg->group_by & group_mask)) \
153 sstrncpy(buffer, vl->field, buffer_size); \
154 else if ((agg->regex_fields & group_mask) && \
155 (AGG_MATCHES_ALL(agg->ident.field))) \
156 sstrncpy(buffer, all_value, buffer_size); \
157 else \
158 sstrncpy(buffer, agg->ident.field, buffer_size); \
159 } while (0)
161 /* Host */
162 COPY_FIELD(inst->ident.host, sizeof(inst->ident.host), host, LU_GROUP_BY_HOST,
163 "global");
165 /* Plugin */
166 if (agg->set_plugin != NULL)
167 sstrncpy(inst->ident.plugin, agg->set_plugin, sizeof(inst->ident.plugin));
168 else
169 sstrncpy(inst->ident.plugin, "aggregation", sizeof(inst->ident.plugin));
171 /* Plugin instance */
172 if (agg->set_plugin_instance != NULL)
173 sstrncpy(inst->ident.plugin_instance, agg->set_plugin_instance,
174 sizeof(inst->ident.plugin_instance));
175 else {
176 char tmp_plugin[DATA_MAX_NAME_LEN];
177 char tmp_plugin_instance[DATA_MAX_NAME_LEN] = "";
179 if ((agg->regex_fields & LU_GROUP_BY_PLUGIN) &&
180 (agg->group_by & LU_GROUP_BY_PLUGIN))
181 sstrncpy(tmp_plugin, vl->plugin, sizeof(tmp_plugin));
182 else if ((agg->regex_fields & LU_GROUP_BY_PLUGIN) &&
183 (AGG_MATCHES_ALL(agg->ident.plugin)))
184 sstrncpy(tmp_plugin, "", sizeof(tmp_plugin));
185 else
186 sstrncpy(tmp_plugin, agg->ident.plugin, sizeof(tmp_plugin));
188 if ((agg->regex_fields & LU_GROUP_BY_PLUGIN_INSTANCE) &&
189 (agg->group_by & LU_GROUP_BY_PLUGIN_INSTANCE))
190 sstrncpy(tmp_plugin_instance, vl->plugin_instance,
191 sizeof(tmp_plugin_instance));
192 else if ((agg->regex_fields & LU_GROUP_BY_PLUGIN_INSTANCE) &&
193 (AGG_MATCHES_ALL(agg->ident.plugin_instance)))
194 sstrncpy(tmp_plugin_instance, "", sizeof(tmp_plugin_instance));
195 else
196 sstrncpy(tmp_plugin_instance, agg->ident.plugin_instance,
197 sizeof(tmp_plugin_instance));
199 if ((strcmp("", tmp_plugin) == 0) && (strcmp("", tmp_plugin_instance) == 0))
200 sstrncpy(inst->ident.plugin_instance, AGG_FUNC_PLACEHOLDER,
201 sizeof(inst->ident.plugin_instance));
202 else if (strcmp("", tmp_plugin) != 0)
203 ssnprintf(inst->ident.plugin_instance,
204 sizeof(inst->ident.plugin_instance), "%s-%s", tmp_plugin,
205 AGG_FUNC_PLACEHOLDER);
206 else if (strcmp("", tmp_plugin_instance) != 0)
207 ssnprintf(inst->ident.plugin_instance,
208 sizeof(inst->ident.plugin_instance), "%s-%s",
209 tmp_plugin_instance, AGG_FUNC_PLACEHOLDER);
210 else
211 ssnprintf(inst->ident.plugin_instance,
212 sizeof(inst->ident.plugin_instance), "%s-%s-%s", tmp_plugin,
213 tmp_plugin_instance, AGG_FUNC_PLACEHOLDER);
214 }
216 /* Type */
217 sstrncpy(inst->ident.type, agg->ident.type, sizeof(inst->ident.type));
219 /* Type instance */
220 COPY_FIELD(inst->ident.type_instance, sizeof(inst->ident.type_instance),
221 type_instance, LU_GROUP_BY_TYPE_INSTANCE, "");
223 #undef COPY_FIELD
225 return (0);
226 } /* }}} int agg_instance_create_name */
228 /* Create a new aggregation instance. */
229 static agg_instance_t *agg_instance_create(data_set_t const *ds, /* {{{ */
230 value_list_t const *vl,
231 aggregation_t *agg) {
232 agg_instance_t *inst;
234 DEBUG("aggregation plugin: Creating new instance.");
236 inst = calloc(1, sizeof(*inst));
237 if (inst == NULL) {
238 ERROR("aggregation plugin: calloc() failed.");
239 return (NULL);
240 }
241 pthread_mutex_init(&inst->lock, /* attr = */ NULL);
243 inst->ds_type = ds->ds[0].type;
245 agg_instance_create_name(inst, vl, agg);
247 inst->min = NAN;
248 inst->max = NAN;
250 #define INIT_STATE(field) \
251 do { \
252 inst->state_##field = NULL; \
253 if (agg->calc_##field) { \
254 inst->state_##field = calloc(1, sizeof(*inst->state_##field)); \
255 if (inst->state_##field == NULL) { \
256 agg_instance_destroy(inst); \
257 free(inst); \
258 ERROR("aggregation plugin: calloc() failed."); \
259 return (NULL); \
260 } \
261 } \
262 } while (0)
264 INIT_STATE(num);
265 INIT_STATE(sum);
266 INIT_STATE(average);
267 INIT_STATE(min);
268 INIT_STATE(max);
269 INIT_STATE(stddev);
271 #undef INIT_STATE
273 pthread_mutex_lock(&agg_instance_list_lock);
274 inst->next = agg_instance_list_head;
275 agg_instance_list_head = inst;
276 pthread_mutex_unlock(&agg_instance_list_lock);
278 return (inst);
279 } /* }}} agg_instance_t *agg_instance_create */
281 /* Update the num, sum, min, max, ... fields of the aggregation instance, if
282 * the rate of the value list is available. Value lists with more than one data
283 * source are not supported and will return an error. Returns zero on success
284 * and non-zero otherwise. */
285 static int agg_instance_update(agg_instance_t *inst, /* {{{ */
286 data_set_t const *ds, value_list_t const *vl) {
287 gauge_t *rate;
289 if (ds->ds_num != 1) {
290 ERROR("aggregation plugin: The \"%s\" type (data set) has more than one "
291 "data source. This is currently not supported by this plugin. "
292 "Sorry.",
293 ds->type);
294 return (EINVAL);
295 }
297 rate = uc_get_rate(ds, vl);
298 if (rate == NULL) {
299 char ident[6 * DATA_MAX_NAME_LEN];
300 FORMAT_VL(ident, sizeof(ident), vl);
301 ERROR("aggregation plugin: Unable to read the current rate of \"%s\".",
302 ident);
303 return (ENOENT);
304 }
306 if (isnan(rate[0])) {
307 sfree(rate);
308 return (0);
309 }
311 pthread_mutex_lock(&inst->lock);
313 inst->num++;
314 inst->sum += rate[0];
315 inst->squares_sum += (rate[0] * rate[0]);
317 if (isnan(inst->min) || (inst->min > rate[0]))
318 inst->min = rate[0];
319 if (isnan(inst->max) || (inst->max < rate[0]))
320 inst->max = rate[0];
322 pthread_mutex_unlock(&inst->lock);
324 sfree(rate);
325 return (0);
326 } /* }}} int agg_instance_update */
328 static int agg_instance_read_func(agg_instance_t *inst, /* {{{ */
329 char const *func, gauge_t rate,
330 rate_to_value_state_t *state,
331 value_list_t *vl, char const *pi_prefix,
332 cdtime_t t) {
333 value_t v;
334 int status;
336 if (pi_prefix[0] != 0)
337 subst_string(vl->plugin_instance, sizeof(vl->plugin_instance), pi_prefix,
338 AGG_FUNC_PLACEHOLDER, func);
339 else
340 sstrncpy(vl->plugin_instance, func, sizeof(vl->plugin_instance));
342 status = rate_to_value(&v, rate, state, inst->ds_type, t);
343 if (status != 0) {
344 /* If this is the first iteration and rate_to_value() was asked to return a
345 * COUNTER or a DERIVE, it will return EAGAIN. Catch this and handle
346 * gracefully. */
347 if (status == EAGAIN)
348 return (0);
350 WARNING("aggregation plugin: rate_to_value failed with status %i.", status);
351 return (-1);
352 }
354 vl->values = &v;
355 vl->values_len = 1;
357 plugin_dispatch_values(vl);
359 vl->values = NULL;
360 vl->values_len = 0;
362 return (0);
363 } /* }}} int agg_instance_read_func */
365 static int agg_instance_read(agg_instance_t *inst, cdtime_t t) /* {{{ */
366 {
367 value_list_t vl = VALUE_LIST_INIT;
369 /* Pre-set all the fields in the value list that will not change per
370 * aggregation type (sum, average, ...). The struct will be re-used and must
371 * therefore be dispatched using the "secure" function. */
373 vl.time = t;
374 vl.interval = 0;
376 vl.meta = meta_data_create();
377 if (vl.meta == NULL) {
378 ERROR("aggregation plugin: meta_data_create failed.");
379 return (-1);
380 }
381 meta_data_add_boolean(vl.meta, "aggregation:created", 1);
383 sstrncpy(vl.host, inst->ident.host, sizeof(vl.host));
384 sstrncpy(vl.plugin, inst->ident.plugin, sizeof(vl.plugin));
385 sstrncpy(vl.type, inst->ident.type, sizeof(vl.type));
386 sstrncpy(vl.type_instance, inst->ident.type_instance,
387 sizeof(vl.type_instance));
389 #define READ_FUNC(func, rate) \
390 do { \
391 if (inst->state_##func != NULL) { \
392 agg_instance_read_func(inst, #func, rate, inst->state_##func, &vl, \
393 inst->ident.plugin_instance, t); \
394 } \
395 } while (0)
397 pthread_mutex_lock(&inst->lock);
399 READ_FUNC(num, (gauge_t)inst->num);
401 /* All other aggregations are only defined when there have been any values
402 * at all. */
403 if (inst->num > 0) {
404 READ_FUNC(sum, inst->sum);
405 READ_FUNC(average, (inst->sum / ((gauge_t)inst->num)));
406 READ_FUNC(min, inst->min);
407 READ_FUNC(max, inst->max);
408 READ_FUNC(stddev, sqrt((((gauge_t)inst->num) * inst->squares_sum) -
409 (inst->sum * inst->sum)) /
410 ((gauge_t)inst->num));
411 }
413 /* Reset internal state. */
414 inst->num = 0;
415 inst->sum = 0.0;
416 inst->squares_sum = 0.0;
417 inst->min = NAN;
418 inst->max = NAN;
420 pthread_mutex_unlock(&inst->lock);
422 meta_data_destroy(vl.meta);
423 vl.meta = NULL;
425 return (0);
426 } /* }}} int agg_instance_read */
428 /* lookup_class_callback_t for utils_vl_lookup */
429 static void *agg_lookup_class_callback(/* {{{ */
430 data_set_t const *ds,
431 value_list_t const *vl,
432 void *user_class) {
433 return (agg_instance_create(ds, vl, (aggregation_t *)user_class));
434 } /* }}} void *agg_class_callback */
436 /* lookup_obj_callback_t for utils_vl_lookup */
437 static int agg_lookup_obj_callback(data_set_t const *ds, /* {{{ */
438 value_list_t const *vl,
439 __attribute__((unused)) void *user_class,
440 void *user_obj) {
441 return (agg_instance_update((agg_instance_t *)user_obj, ds, vl));
442 } /* }}} int agg_lookup_obj_callback */
444 /* lookup_free_class_callback_t for utils_vl_lookup */
445 static void agg_lookup_free_class_callback(void *user_class) /* {{{ */
446 {
447 agg_destroy((aggregation_t *)user_class);
448 } /* }}} void agg_lookup_free_class_callback */
450 /* lookup_free_obj_callback_t for utils_vl_lookup */
451 static void agg_lookup_free_obj_callback(void *user_obj) /* {{{ */
452 {
453 agg_instance_destroy((agg_instance_t *)user_obj);
454 } /* }}} void agg_lookup_free_obj_callback */
456 /*
457 * <Plugin "aggregation">
458 * <Aggregation>
459 * Plugin "cpu"
460 * Type "cpu"
461 *
462 * GroupBy Host
463 * GroupBy TypeInstance
464 *
465 * CalculateNum true
466 * CalculateSum true
467 * CalculateAverage true
468 * CalculateMinimum true
469 * CalculateMaximum true
470 * CalculateStddev true
471 * </Aggregation>
472 * </Plugin>
473 */
474 static int agg_config_handle_group_by(oconfig_item_t const *ci, /* {{{ */
475 aggregation_t *agg) {
476 for (int i = 0; i < ci->values_num; i++) {
477 char const *value;
479 if (ci->values[i].type != OCONFIG_TYPE_STRING) {
480 ERROR("aggregation plugin: Argument %i of the \"GroupBy\" option "
481 "is not a string.",
482 i + 1);
483 continue;
484 }
486 value = ci->values[i].value.string;
488 if (strcasecmp("Host", value) == 0)
489 agg->group_by |= LU_GROUP_BY_HOST;
490 else if (strcasecmp("Plugin", value) == 0)
491 agg->group_by |= LU_GROUP_BY_PLUGIN;
492 else if (strcasecmp("PluginInstance", value) == 0)
493 agg->group_by |= LU_GROUP_BY_PLUGIN_INSTANCE;
494 else if (strcasecmp("TypeInstance", value) == 0)
495 agg->group_by |= LU_GROUP_BY_TYPE_INSTANCE;
496 else if (strcasecmp("Type", value) == 0)
497 ERROR("aggregation plugin: Grouping by type is not supported.");
498 else
499 WARNING("aggregation plugin: The \"%s\" argument to the \"GroupBy\" "
500 "option is invalid and will be ignored.",
501 value);
502 } /* for (ci->values) */
504 return (0);
505 } /* }}} int agg_config_handle_group_by */
507 static int agg_config_aggregation(oconfig_item_t *ci) /* {{{ */
508 {
509 aggregation_t *agg;
510 _Bool is_valid;
511 int status;
513 agg = calloc(1, sizeof(*agg));
514 if (agg == NULL) {
515 ERROR("aggregation plugin: calloc failed.");
516 return (-1);
517 }
519 sstrncpy(agg->ident.host, "/.*/", sizeof(agg->ident.host));
520 sstrncpy(agg->ident.plugin, "/.*/", sizeof(agg->ident.plugin));
521 sstrncpy(agg->ident.plugin_instance, "/.*/",
522 sizeof(agg->ident.plugin_instance));
523 sstrncpy(agg->ident.type, "/.*/", sizeof(agg->ident.type));
524 sstrncpy(agg->ident.type_instance, "/.*/", sizeof(agg->ident.type_instance));
526 for (int i = 0; i < ci->children_num; i++) {
527 oconfig_item_t *child = ci->children + i;
529 if (strcasecmp("Host", child->key) == 0)
530 cf_util_get_string_buffer(child, agg->ident.host,
531 sizeof(agg->ident.host));
532 else if (strcasecmp("Plugin", child->key) == 0)
533 cf_util_get_string_buffer(child, agg->ident.plugin,
534 sizeof(agg->ident.plugin));
535 else if (strcasecmp("PluginInstance", child->key) == 0)
536 cf_util_get_string_buffer(child, agg->ident.plugin_instance,
537 sizeof(agg->ident.plugin_instance));
538 else if (strcasecmp("Type", child->key) == 0)
539 cf_util_get_string_buffer(child, agg->ident.type,
540 sizeof(agg->ident.type));
541 else if (strcasecmp("TypeInstance", child->key) == 0)
542 cf_util_get_string_buffer(child, agg->ident.type_instance,
543 sizeof(agg->ident.type_instance));
544 else if (strcasecmp("SetHost", child->key) == 0)
545 cf_util_get_string(child, &agg->set_host);
546 else if (strcasecmp("SetPlugin", child->key) == 0)
547 cf_util_get_string(child, &agg->set_plugin);
548 else if (strcasecmp("SetPluginInstance", child->key) == 0)
549 cf_util_get_string(child, &agg->set_plugin_instance);
550 else if (strcasecmp("SetTypeInstance", child->key) == 0)
551 cf_util_get_string(child, &agg->set_type_instance);
552 else if (strcasecmp("GroupBy", child->key) == 0)
553 agg_config_handle_group_by(child, agg);
554 else if (strcasecmp("CalculateNum", child->key) == 0)
555 cf_util_get_boolean(child, &agg->calc_num);
556 else if (strcasecmp("CalculateSum", child->key) == 0)
557 cf_util_get_boolean(child, &agg->calc_sum);
558 else if (strcasecmp("CalculateAverage", child->key) == 0)
559 cf_util_get_boolean(child, &agg->calc_average);
560 else if (strcasecmp("CalculateMinimum", child->key) == 0)
561 cf_util_get_boolean(child, &agg->calc_min);
562 else if (strcasecmp("CalculateMaximum", child->key) == 0)
563 cf_util_get_boolean(child, &agg->calc_max);
564 else if (strcasecmp("CalculateStddev", child->key) == 0)
565 cf_util_get_boolean(child, &agg->calc_stddev);
566 else
567 WARNING("aggregation plugin: The \"%s\" key is not allowed inside "
568 "<Aggregation /> blocks and will be ignored.",
569 child->key);
570 }
572 if (agg_is_regex(agg->ident.host))
573 agg->regex_fields |= LU_GROUP_BY_HOST;
574 if (agg_is_regex(agg->ident.plugin))
575 agg->regex_fields |= LU_GROUP_BY_PLUGIN;
576 if (agg_is_regex(agg->ident.plugin_instance))
577 agg->regex_fields |= LU_GROUP_BY_PLUGIN_INSTANCE;
578 if (agg_is_regex(agg->ident.type_instance))
579 agg->regex_fields |= LU_GROUP_BY_TYPE_INSTANCE;
581 /* Sanity checking */
582 is_valid = 1;
583 if (strcmp("/.*/", agg->ident.type) == 0) /* {{{ */
584 {
585 ERROR("aggregation plugin: It appears you did not specify the required "
586 "\"Type\" option in this aggregation. "
587 "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
588 "Type \"%s\", TypeInstance \"%s\")",
589 agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
590 agg->ident.type, agg->ident.type_instance);
591 is_valid = 0;
592 } else if (strchr(agg->ident.type, '/') != NULL) {
593 ERROR("aggregation plugin: The \"Type\" may not contain the '/' "
594 "character. Especially, it may not be a regex. The current "
595 "value is \"%s\".",
596 agg->ident.type);
597 is_valid = 0;
598 } /* }}} */
600 /* Check that there is at least one regex field without a grouping. {{{ */
601 if ((agg->regex_fields & ~agg->group_by) == 0) {
602 ERROR("aggregation plugin: An aggregation must contain at least one "
603 "wildcard. This is achieved by leaving at least one of the \"Host\", "
604 "\"Plugin\", \"PluginInstance\" and \"TypeInstance\" options blank "
605 "or using a regular expression and not grouping by that field. "
606 "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
607 "Type \"%s\", TypeInstance \"%s\")",
608 agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
609 agg->ident.type, agg->ident.type_instance);
610 is_valid = 0;
611 } /* }}} */
613 /* Check that all grouping fields are regular expressions. {{{ */
614 if (agg->group_by & ~agg->regex_fields) {
615 ERROR("aggregation plugin: Only wildcard fields (fields for which a "
616 "regular expression is configured or which are left blank) can be "
617 "specified in the \"GroupBy\" option. "
618 "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
619 "Type \"%s\", TypeInstance \"%s\")",
620 agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
621 agg->ident.type, agg->ident.type_instance);
622 is_valid = 0;
623 } /* }}} */
625 if (!agg->calc_num && !agg->calc_sum && !agg->calc_average /* {{{ */
626 && !agg->calc_min && !agg->calc_max && !agg->calc_stddev) {
627 ERROR("aggregation plugin: No aggregation function has been specified. "
628 "Without this, I don't know what I should be calculating. "
629 "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
630 "Type \"%s\", TypeInstance \"%s\")",
631 agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
632 agg->ident.type, agg->ident.type_instance);
633 is_valid = 0;
634 } /* }}} */
636 if (!is_valid) /* {{{ */
637 {
638 sfree(agg);
639 return (-1);
640 } /* }}} */
642 status = lookup_add(lookup, &agg->ident, agg->group_by, agg);
643 if (status != 0) {
644 ERROR("aggregation plugin: lookup_add failed with status %i.", status);
645 sfree(agg);
646 return (-1);
647 }
649 DEBUG("aggregation plugin: Successfully added aggregation: "
650 "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
651 "Type \"%s\", TypeInstance \"%s\")",
652 agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
653 agg->ident.type, agg->ident.type_instance);
654 return (0);
655 } /* }}} int agg_config_aggregation */
657 static int agg_config(oconfig_item_t *ci) /* {{{ */
658 {
659 pthread_mutex_lock(&agg_instance_list_lock);
661 if (lookup == NULL) {
662 lookup = lookup_create(agg_lookup_class_callback, agg_lookup_obj_callback,
663 agg_lookup_free_class_callback,
664 agg_lookup_free_obj_callback);
665 if (lookup == NULL) {
666 pthread_mutex_unlock(&agg_instance_list_lock);
667 ERROR("aggregation plugin: lookup_create failed.");
668 return (-1);
669 }
670 }
672 for (int i = 0; i < ci->children_num; i++) {
673 oconfig_item_t *child = ci->children + i;
675 if (strcasecmp("Aggregation", child->key) == 0)
676 agg_config_aggregation(child);
677 else
678 WARNING("aggregation plugin: The \"%s\" key is not allowed inside "
679 "<Plugin aggregation /> blocks and will be ignored.",
680 child->key);
681 }
683 pthread_mutex_unlock(&agg_instance_list_lock);
685 return (0);
686 } /* }}} int agg_config */
688 static int agg_read(void) /* {{{ */
689 {
690 cdtime_t t;
691 int success;
693 t = cdtime();
694 success = 0;
696 pthread_mutex_lock(&agg_instance_list_lock);
698 /* agg_instance_list_head only holds data, after the "write" callback has
699 * been called with a matching value list at least once. So on startup,
700 * there's a race between the aggregations read() and write() callback. If
701 * the read() callback is called first, agg_instance_list_head is NULL and
702 * "success" may be zero. This is expected and should not result in an error.
703 * Therefore we need to handle this case separately. */
704 if (agg_instance_list_head == NULL) {
705 pthread_mutex_unlock(&agg_instance_list_lock);
706 return (0);
707 }
709 for (agg_instance_t *this = agg_instance_list_head; this != NULL;
710 this = this->next) {
711 int status;
713 status = agg_instance_read(this, t);
714 if (status != 0)
715 WARNING("aggregation plugin: Reading an aggregation instance "
716 "failed with status %i.",
717 status);
718 else
719 success++;
720 }
722 pthread_mutex_unlock(&agg_instance_list_lock);
724 return ((success > 0) ? 0 : -1);
725 } /* }}} int agg_read */
727 static int agg_write(data_set_t const *ds, value_list_t const *vl, /* {{{ */
728 __attribute__((unused)) user_data_t *user_data) {
729 _Bool created_by_aggregation = 0;
730 int status;
732 /* Ignore values that were created by the aggregation plugin to avoid weird
733 * effects. */
734 (void)meta_data_get_boolean(vl->meta, "aggregation:created",
735 &created_by_aggregation);
736 if (created_by_aggregation)
737 return (0);
739 if (lookup == NULL)
740 status = ENOENT;
741 else {
742 status = lookup_search(lookup, ds, vl);
743 if (status > 0)
744 status = 0;
745 }
747 return (status);
748 } /* }}} int agg_write */
750 void module_register(void) {
751 plugin_register_complex_config("aggregation", agg_config);
752 plugin_register_read("aggregation", agg_read);
753 plugin_register_write("aggregation", agg_write, /* user_data = */ NULL);
754 }
756 /* vim: set sw=2 sts=2 tw=78 et fdm=marker : */