1 /**
2 * collectd - src/aggregation.c
3 * Copyright (C) 2012 Florian Forster
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a
6 * copy of this software and associated documentation files (the "Software"),
7 * to deal in the Software without restriction, including without limitation
8 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9 * and/or sell copies of the Software, and to permit persons to whom the
10 * Software is furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21 * DEALINGS IN THE SOFTWARE.
22 *
23 * Authors:
24 * Florian Forster <octo at collectd.org>
25 **/
27 #include "collectd.h"
29 #include <pthread.h>
31 #include "plugin.h"
32 #include "common.h"
33 #include "configfile.h"
34 #include "meta_data.h"
35 #include "utils_cache.h" /* for uc_get_rate() */
36 #include "utils_subst.h"
37 #include "utils_vl_lookup.h"
/* Evaluates to true if `str` is exactly the match-everything pattern. */
#define AGG_MATCHES_ALL(str) (strcmp ("/.*/", (str)) == 0)
/* Token inside an instance's plugin instance that is substituted with the
 * name of the aggregation function when the values are dispatched. */
#define AGG_FUNC_PLACEHOLDER "%{aggregation}"
42 struct aggregation_s /* {{{ */
43 {
44 identifier_t ident;
45 unsigned int group_by;
47 unsigned int regex_fields;
49 char *set_host;
50 char *set_plugin;
51 char *set_plugin_instance;
52 char *set_type_instance;
54 _Bool calc_num;
55 _Bool calc_sum;
56 _Bool calc_average;
57 _Bool calc_min;
58 _Bool calc_max;
59 _Bool calc_stddev;
60 }; /* }}} */
61 typedef struct aggregation_s aggregation_t;
63 struct agg_instance_s;
64 typedef struct agg_instance_s agg_instance_t;
65 struct agg_instance_s /* {{{ */
66 {
67 pthread_mutex_t lock;
68 identifier_t ident;
70 int ds_type;
72 derive_t num;
73 gauge_t sum;
74 gauge_t squares_sum;
76 gauge_t min;
77 gauge_t max;
79 rate_to_value_state_t *state_num;
80 rate_to_value_state_t *state_sum;
81 rate_to_value_state_t *state_average;
82 rate_to_value_state_t *state_min;
83 rate_to_value_state_t *state_max;
84 rate_to_value_state_t *state_stddev;
86 agg_instance_t *next;
87 }; /* }}} */
89 static lookup_t *lookup = NULL;
91 static pthread_mutex_t agg_instance_list_lock = PTHREAD_MUTEX_INITIALIZER;
92 static agg_instance_t *agg_instance_list_head = NULL;
/* Returns true if `str` has the form "/.../", i.e. it is at least three
 * characters long and both starts and ends with a slash. A NULL pointer is
 * not a regex. */
static _Bool agg_is_regex (char const *str) /* {{{ */
{
  size_t length;

  if (str == NULL)
    return (0);

  length = strlen (str);
  return ((length >= 3) && (str[0] == '/') && (str[length - 1] == '/'));
} /* }}} _Bool agg_is_regex */
111 static void agg_destroy (aggregation_t *agg) /* {{{ */
112 {
113 sfree (agg);
114 } /* }}} void agg_destroy */
116 /* Frees all dynamically allocated memory within the instance. */
117 static void agg_instance_destroy (agg_instance_t *inst) /* {{{ */
118 {
119 if (inst == NULL)
120 return;
122 /* Remove this instance from the global list of instances. */
123 pthread_mutex_lock (&agg_instance_list_lock);
124 if (agg_instance_list_head == inst)
125 agg_instance_list_head = inst->next;
126 else if (agg_instance_list_head != NULL)
127 {
128 agg_instance_t *prev = agg_instance_list_head;
129 while ((prev != NULL) && (prev->next != inst))
130 prev = prev->next;
131 if (prev != NULL)
132 prev->next = inst->next;
133 }
134 pthread_mutex_unlock (&agg_instance_list_lock);
136 sfree (inst->state_num);
137 sfree (inst->state_sum);
138 sfree (inst->state_average);
139 sfree (inst->state_min);
140 sfree (inst->state_max);
141 sfree (inst->state_stddev);
143 memset (inst, 0, sizeof (*inst));
144 inst->ds_type = -1;
145 inst->min = NAN;
146 inst->max = NAN;
147 } /* }}} void agg_instance_destroy */
149 static int agg_instance_create_name (agg_instance_t *inst, /* {{{ */
150 value_list_t const *vl, aggregation_t const *agg)
151 {
152 #define COPY_FIELD(buffer, buffer_size, field, group_mask, all_value) do { \
153 if (agg->set_ ## field != NULL) \
154 sstrncpy (buffer, agg->set_ ## field, buffer_size); \
155 else if ((agg->regex_fields & group_mask) \
156 && (agg->group_by & group_mask)) \
157 sstrncpy (buffer, vl->field, buffer_size); \
158 else if ((agg->regex_fields & group_mask) \
159 && (AGG_MATCHES_ALL (agg->ident.field))) \
160 sstrncpy (buffer, all_value, buffer_size); \
161 else \
162 sstrncpy (buffer, agg->ident.field, buffer_size); \
163 } while (0)
165 /* Host */
166 COPY_FIELD (inst->ident.host, sizeof (inst->ident.host),
167 host, LU_GROUP_BY_HOST, "global");
169 /* Plugin */
170 if (agg->set_plugin != NULL)
171 sstrncpy (inst->ident.plugin, agg->set_plugin,
172 sizeof (inst->ident.plugin));
173 else
174 sstrncpy (inst->ident.plugin, "aggregation", sizeof (inst->ident.plugin));
176 /* Plugin instance */
177 if (agg->set_plugin_instance != NULL)
178 sstrncpy (inst->ident.plugin_instance, agg->set_plugin_instance,
179 sizeof (inst->ident.plugin_instance));
180 else
181 {
182 char tmp_plugin[DATA_MAX_NAME_LEN];
183 char tmp_plugin_instance[DATA_MAX_NAME_LEN] = "";
185 if ((agg->regex_fields & LU_GROUP_BY_PLUGIN)
186 && (agg->group_by & LU_GROUP_BY_PLUGIN))
187 sstrncpy (tmp_plugin, vl->plugin, sizeof (tmp_plugin));
188 else if ((agg->regex_fields & LU_GROUP_BY_PLUGIN)
189 && (AGG_MATCHES_ALL (agg->ident.plugin)))
190 sstrncpy (tmp_plugin, "", sizeof (tmp_plugin));
191 else
192 sstrncpy (tmp_plugin, agg->ident.plugin, sizeof (tmp_plugin));
194 if ((agg->regex_fields & LU_GROUP_BY_PLUGIN_INSTANCE)
195 && (agg->group_by & LU_GROUP_BY_PLUGIN_INSTANCE))
196 sstrncpy (tmp_plugin_instance, vl->plugin_instance,
197 sizeof (tmp_plugin_instance));
198 else if ((agg->regex_fields & LU_GROUP_BY_PLUGIN_INSTANCE)
199 && (AGG_MATCHES_ALL (agg->ident.plugin_instance)))
200 sstrncpy (tmp_plugin_instance, "", sizeof (tmp_plugin_instance));
201 else
202 sstrncpy (tmp_plugin_instance, agg->ident.plugin_instance,
203 sizeof (tmp_plugin_instance));
205 if ((strcmp ("", tmp_plugin) == 0)
206 && (strcmp ("", tmp_plugin_instance) == 0))
207 sstrncpy (inst->ident.plugin_instance, AGG_FUNC_PLACEHOLDER,
208 sizeof (inst->ident.plugin_instance));
209 else if (strcmp ("", tmp_plugin) != 0)
210 ssnprintf (inst->ident.plugin_instance,
211 sizeof (inst->ident.plugin_instance),
212 "%s-%s", tmp_plugin, AGG_FUNC_PLACEHOLDER);
213 else if (strcmp ("", tmp_plugin_instance) != 0)
214 ssnprintf (inst->ident.plugin_instance,
215 sizeof (inst->ident.plugin_instance),
216 "%s-%s", tmp_plugin_instance, AGG_FUNC_PLACEHOLDER);
217 else
218 ssnprintf (inst->ident.plugin_instance,
219 sizeof (inst->ident.plugin_instance),
220 "%s-%s-%s", tmp_plugin, tmp_plugin_instance, AGG_FUNC_PLACEHOLDER);
221 }
223 /* Type */
224 sstrncpy (inst->ident.type, agg->ident.type, sizeof (inst->ident.type));
226 /* Type instance */
227 COPY_FIELD (inst->ident.type_instance, sizeof (inst->ident.type_instance),
228 type_instance, LU_GROUP_BY_TYPE_INSTANCE, "");
230 #undef COPY_FIELD
232 return (0);
233 } /* }}} int agg_instance_create_name */
235 /* Create a new aggregation instance. */
236 static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */
237 value_list_t const *vl, aggregation_t *agg)
238 {
239 agg_instance_t *inst;
241 DEBUG ("aggregation plugin: Creating new instance.");
243 inst = malloc (sizeof (*inst));
244 if (inst == NULL)
245 {
246 ERROR ("aggregation plugin: malloc() failed.");
247 return (NULL);
248 }
249 memset (inst, 0, sizeof (*inst));
250 pthread_mutex_init (&inst->lock, /* attr = */ NULL);
252 inst->ds_type = ds->ds[0].type;
254 agg_instance_create_name (inst, vl, agg);
256 inst->min = NAN;
257 inst->max = NAN;
259 #define INIT_STATE(field) do { \
260 inst->state_ ## field = NULL; \
261 if (agg->calc_ ## field) { \
262 inst->state_ ## field = malloc (sizeof (*inst->state_ ## field)); \
263 if (inst->state_ ## field == NULL) { \
264 agg_instance_destroy (inst); \
265 ERROR ("aggregation plugin: malloc() failed."); \
266 return (NULL); \
267 } \
268 memset (inst->state_ ## field, 0, sizeof (*inst->state_ ## field)); \
269 } \
270 } while (0)
272 INIT_STATE (num);
273 INIT_STATE (sum);
274 INIT_STATE (average);
275 INIT_STATE (min);
276 INIT_STATE (max);
277 INIT_STATE (stddev);
279 #undef INIT_STATE
281 pthread_mutex_lock (&agg_instance_list_lock);
282 inst->next = agg_instance_list_head;
283 agg_instance_list_head = inst;
284 pthread_mutex_unlock (&agg_instance_list_lock);
286 return (inst);
287 } /* }}} agg_instance_t *agg_instance_create */
289 /* Update the num, sum, min, max, ... fields of the aggregation instance, if
290 * the rate of the value list is available. Value lists with more than one data
291 * source are not supported and will return an error. Returns zero on success
292 * and non-zero otherwise. */
293 static int agg_instance_update (agg_instance_t *inst, /* {{{ */
294 data_set_t const *ds, value_list_t const *vl)
295 {
296 gauge_t *rate;
298 if (ds->ds_num != 1)
299 {
300 ERROR ("aggregation plugin: The \"%s\" type (data set) has more than one "
301 "data source. This is currently not supported by this plugin. "
302 "Sorry.", ds->type);
303 return (EINVAL);
304 }
306 rate = uc_get_rate (ds, vl);
307 if (rate == NULL)
308 {
309 char ident[6 * DATA_MAX_NAME_LEN];
310 FORMAT_VL (ident, sizeof (ident), vl);
311 ERROR ("aggregation plugin: Unable to read the current rate of \"%s\".",
312 ident);
313 return (ENOENT);
314 }
316 if (isnan (rate[0]))
317 {
318 sfree (rate);
319 return (0);
320 }
322 pthread_mutex_lock (&inst->lock);
324 inst->num++;
325 inst->sum += rate[0];
326 inst->squares_sum += (rate[0] * rate[0]);
328 if (isnan (inst->min) || (inst->min > rate[0]))
329 inst->min = rate[0];
330 if (isnan (inst->max) || (inst->max < rate[0]))
331 inst->max = rate[0];
333 pthread_mutex_unlock (&inst->lock);
335 sfree (rate);
336 return (0);
337 } /* }}} int agg_instance_update */
339 static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */
340 char const *func, gauge_t rate, rate_to_value_state_t *state,
341 value_list_t *vl, char const *pi_prefix, cdtime_t t)
342 {
343 value_t v;
344 int status;
346 if (pi_prefix[0] != 0)
347 subst_string (vl->plugin_instance, sizeof (vl->plugin_instance),
348 pi_prefix, AGG_FUNC_PLACEHOLDER, func);
349 else
350 sstrncpy (vl->plugin_instance, func, sizeof (vl->plugin_instance));
352 memset (&v, 0, sizeof (v));
353 status = rate_to_value (&v, rate, state, inst->ds_type, t);
354 if (status != 0)
355 {
356 /* If this is the first iteration and rate_to_value() was asked to return a
357 * COUNTER or a DERIVE, it will return EAGAIN. Catch this and handle
358 * gracefully. */
359 if (status == EAGAIN)
360 return (0);
362 WARNING ("aggregation plugin: rate_to_value failed with status %i.",
363 status);
364 return (-1);
365 }
367 vl->values = &v;
368 vl->values_len = 1;
370 plugin_dispatch_values (vl);
372 vl->values = NULL;
373 vl->values_len = 0;
375 return (0);
376 } /* }}} int agg_instance_read_func */
378 static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */
379 {
380 value_list_t vl = VALUE_LIST_INIT;
382 /* Pre-set all the fields in the value list that will not change per
383 * aggregation type (sum, average, ...). The struct will be re-used and must
384 * therefore be dispatched using the "secure" function. */
386 vl.time = t;
387 vl.interval = 0;
389 vl.meta = meta_data_create ();
390 if (vl.meta == NULL)
391 {
392 ERROR ("aggregation plugin: meta_data_create failed.");
393 return (-1);
394 }
395 meta_data_add_boolean (vl.meta, "aggregation:created", 1);
397 sstrncpy (vl.host, inst->ident.host, sizeof (vl.host));
398 sstrncpy (vl.plugin, inst->ident.plugin, sizeof (vl.plugin));
399 sstrncpy (vl.type, inst->ident.type, sizeof (vl.type));
400 sstrncpy (vl.type_instance, inst->ident.type_instance,
401 sizeof (vl.type_instance));
403 #define READ_FUNC(func, rate) do { \
404 if (inst->state_ ## func != NULL) { \
405 agg_instance_read_func (inst, #func, rate, \
406 inst->state_ ## func, &vl, inst->ident.plugin_instance, t); \
407 } \
408 } while (0)
410 pthread_mutex_lock (&inst->lock);
412 READ_FUNC (num, (gauge_t) inst->num);
414 /* All other aggregations are only defined when there have been any values
415 * at all. */
416 if (inst->num > 0)
417 {
418 READ_FUNC (sum, inst->sum);
419 READ_FUNC (average, (inst->sum / ((gauge_t) inst->num)));
420 READ_FUNC (min, inst->min);
421 READ_FUNC (max, inst->max);
422 READ_FUNC (stddev, sqrt((((gauge_t) inst->num) * inst->squares_sum)
423 - (inst->sum * inst->sum)) / ((gauge_t) inst->num));
424 }
426 /* Reset internal state. */
427 inst->num = 0;
428 inst->sum = 0.0;
429 inst->squares_sum = 0.0;
430 inst->min = NAN;
431 inst->max = NAN;
433 pthread_mutex_unlock (&inst->lock);
435 meta_data_destroy (vl.meta);
436 vl.meta = NULL;
438 return (0);
439 } /* }}} int agg_instance_read */
441 /* lookup_class_callback_t for utils_vl_lookup */
442 static void *agg_lookup_class_callback ( /* {{{ */
443 data_set_t const *ds, value_list_t const *vl, void *user_class)
444 {
445 return (agg_instance_create (ds, vl, (aggregation_t *) user_class));
446 } /* }}} void *agg_class_callback */
448 /* lookup_obj_callback_t for utils_vl_lookup */
449 static int agg_lookup_obj_callback (data_set_t const *ds, /* {{{ */
450 value_list_t const *vl,
451 __attribute__((unused)) void *user_class,
452 void *user_obj)
453 {
454 return (agg_instance_update ((agg_instance_t *) user_obj, ds, vl));
455 } /* }}} int agg_lookup_obj_callback */
457 /* lookup_free_class_callback_t for utils_vl_lookup */
458 static void agg_lookup_free_class_callback (void *user_class) /* {{{ */
459 {
460 agg_destroy ((aggregation_t *) user_class);
461 } /* }}} void agg_lookup_free_class_callback */
463 /* lookup_free_obj_callback_t for utils_vl_lookup */
464 static void agg_lookup_free_obj_callback (void *user_obj) /* {{{ */
465 {
466 agg_instance_destroy ((agg_instance_t *) user_obj);
467 } /* }}} void agg_lookup_free_obj_callback */
469 /*
470 * <Plugin "aggregation">
471 * <Aggregation>
472 * Plugin "cpu"
473 * Type "cpu"
474 *
475 * GroupBy Host
476 * GroupBy TypeInstance
477 *
478 * CalculateNum true
479 * CalculateSum true
480 * CalculateAverage true
481 * CalculateMinimum true
482 * CalculateMaximum true
483 * CalculateStddev true
484 * </Aggregation>
485 * </Plugin>
486 */
487 static int agg_config_handle_group_by (oconfig_item_t const *ci, /* {{{ */
488 aggregation_t *agg)
489 {
490 int i;
492 for (i = 0; i < ci->values_num; i++)
493 {
494 char const *value;
496 if (ci->values[i].type != OCONFIG_TYPE_STRING)
497 {
498 ERROR ("aggregation plugin: Argument %i of the \"GroupBy\" option "
499 "is not a string.", i + 1);
500 continue;
501 }
503 value = ci->values[i].value.string;
505 if (strcasecmp ("Host", value) == 0)
506 agg->group_by |= LU_GROUP_BY_HOST;
507 else if (strcasecmp ("Plugin", value) == 0)
508 agg->group_by |= LU_GROUP_BY_PLUGIN;
509 else if (strcasecmp ("PluginInstance", value) == 0)
510 agg->group_by |= LU_GROUP_BY_PLUGIN_INSTANCE;
511 else if (strcasecmp ("TypeInstance", value) == 0)
512 agg->group_by |= LU_GROUP_BY_TYPE_INSTANCE;
513 else if (strcasecmp ("Type", value) == 0)
514 ERROR ("aggregation plugin: Grouping by type is not supported.");
515 else
516 WARNING ("aggregation plugin: The \"%s\" argument to the \"GroupBy\" "
517 "option is invalid and will be ignored.", value);
518 } /* for (ci->values) */
520 return (0);
521 } /* }}} int agg_config_handle_group_by */
523 static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
524 {
525 aggregation_t *agg;
526 _Bool is_valid;
527 int status;
528 int i;
530 agg = malloc (sizeof (*agg));
531 if (agg == NULL)
532 {
533 ERROR ("aggregation plugin: malloc failed.");
534 return (-1);
535 }
536 memset (agg, 0, sizeof (*agg));
538 sstrncpy (agg->ident.host, "/.*/", sizeof (agg->ident.host));
539 sstrncpy (agg->ident.plugin, "/.*/", sizeof (agg->ident.plugin));
540 sstrncpy (agg->ident.plugin_instance, "/.*/",
541 sizeof (agg->ident.plugin_instance));
542 sstrncpy (agg->ident.type, "/.*/", sizeof (agg->ident.type));
543 sstrncpy (agg->ident.type_instance, "/.*/",
544 sizeof (agg->ident.type_instance));
546 for (i = 0; i < ci->children_num; i++)
547 {
548 oconfig_item_t *child = ci->children + i;
550 if (strcasecmp ("Host", child->key) == 0)
551 cf_util_get_string_buffer (child, agg->ident.host,
552 sizeof (agg->ident.host));
553 else if (strcasecmp ("Plugin", child->key) == 0)
554 cf_util_get_string_buffer (child, agg->ident.plugin,
555 sizeof (agg->ident.plugin));
556 else if (strcasecmp ("PluginInstance", child->key) == 0)
557 cf_util_get_string_buffer (child, agg->ident.plugin_instance,
558 sizeof (agg->ident.plugin_instance));
559 else if (strcasecmp ("Type", child->key) == 0)
560 cf_util_get_string_buffer (child, agg->ident.type,
561 sizeof (agg->ident.type));
562 else if (strcasecmp ("TypeInstance", child->key) == 0)
563 cf_util_get_string_buffer (child, agg->ident.type_instance,
564 sizeof (agg->ident.type_instance));
565 else if (strcasecmp ("SetHost", child->key) == 0)
566 cf_util_get_string (child, &agg->set_host);
567 else if (strcasecmp ("SetPlugin", child->key) == 0)
568 cf_util_get_string (child, &agg->set_plugin);
569 else if (strcasecmp ("SetPluginInstance", child->key) == 0)
570 cf_util_get_string (child, &agg->set_plugin_instance);
571 else if (strcasecmp ("SetTypeInstance", child->key) == 0)
572 cf_util_get_string (child, &agg->set_type_instance);
573 else if (strcasecmp ("GroupBy", child->key) == 0)
574 agg_config_handle_group_by (child, agg);
575 else if (strcasecmp ("CalculateNum", child->key) == 0)
576 cf_util_get_boolean (child, &agg->calc_num);
577 else if (strcasecmp ("CalculateSum", child->key) == 0)
578 cf_util_get_boolean (child, &agg->calc_sum);
579 else if (strcasecmp ("CalculateAverage", child->key) == 0)
580 cf_util_get_boolean (child, &agg->calc_average);
581 else if (strcasecmp ("CalculateMinimum", child->key) == 0)
582 cf_util_get_boolean (child, &agg->calc_min);
583 else if (strcasecmp ("CalculateMaximum", child->key) == 0)
584 cf_util_get_boolean (child, &agg->calc_max);
585 else if (strcasecmp ("CalculateStddev", child->key) == 0)
586 cf_util_get_boolean (child, &agg->calc_stddev);
587 else
588 WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
589 "<Aggregation /> blocks and will be ignored.", child->key);
590 }
592 if (agg_is_regex (agg->ident.host))
593 agg->regex_fields |= LU_GROUP_BY_HOST;
594 if (agg_is_regex (agg->ident.plugin))
595 agg->regex_fields |= LU_GROUP_BY_PLUGIN;
596 if (agg_is_regex (agg->ident.plugin_instance))
597 agg->regex_fields |= LU_GROUP_BY_PLUGIN_INSTANCE;
598 if (agg_is_regex (agg->ident.type_instance))
599 agg->regex_fields |= LU_GROUP_BY_TYPE_INSTANCE;
601 /* Sanity checking */
602 is_valid = 1;
603 if (strcmp ("/.*/", agg->ident.type) == 0) /* {{{ */
604 {
605 ERROR ("aggregation plugin: It appears you did not specify the required "
606 "\"Type\" option in this aggregation. "
607 "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
608 "Type \"%s\", TypeInstance \"%s\")",
609 agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
610 agg->ident.type, agg->ident.type_instance);
611 is_valid = 0;
612 }
613 else if (strchr (agg->ident.type, '/') != NULL)
614 {
615 ERROR ("aggregation plugin: The \"Type\" may not contain the '/' "
616 "character. Especially, it may not be a regex. The current "
617 "value is \"%s\".", agg->ident.type);
618 is_valid = 0;
619 } /* }}} */
621 /* Check that there is at least one regex field without a grouping. {{{ */
622 if ((agg->regex_fields & ~agg->group_by) == 0)
623 {
624 ERROR ("aggregation plugin: An aggregation must contain at least one "
625 "wildcard. This is achieved by leaving at least one of the \"Host\", "
626 "\"Plugin\", \"PluginInstance\" and \"TypeInstance\" options blank "
627 "or using a regular expression and not grouping by that field. "
628 "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
629 "Type \"%s\", TypeInstance \"%s\")",
630 agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
631 agg->ident.type, agg->ident.type_instance);
632 is_valid = 0;
633 } /* }}} */
635 /* Check that all grouping fields are regular expressions. {{{ */
636 if (agg->group_by & ~agg->regex_fields)
637 {
638 ERROR ("aggregation plugin: Only wildcard fields (fields for which a "
639 "regular expression is configured or which are left blank) can be "
640 "specified in the \"GroupBy\" option. "
641 "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
642 "Type \"%s\", TypeInstance \"%s\")",
643 agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
644 agg->ident.type, agg->ident.type_instance);
645 is_valid = 0;
646 } /* }}} */
648 if (!agg->calc_num && !agg->calc_sum && !agg->calc_average /* {{{ */
649 && !agg->calc_min && !agg->calc_max && !agg->calc_stddev)
650 {
651 ERROR ("aggregation plugin: No aggregation function has been specified. "
652 "Without this, I don't know what I should be calculating. "
653 "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
654 "Type \"%s\", TypeInstance \"%s\")",
655 agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
656 agg->ident.type, agg->ident.type_instance);
657 is_valid = 0;
658 } /* }}} */
660 if (!is_valid) /* {{{ */
661 {
662 sfree (agg);
663 return (-1);
664 } /* }}} */
666 status = lookup_add (lookup, &agg->ident, agg->group_by, agg);
667 if (status != 0)
668 {
669 ERROR ("aggregation plugin: lookup_add failed with status %i.", status);
670 sfree (agg);
671 return (-1);
672 }
674 DEBUG ("aggregation plugin: Successfully added aggregation: "
675 "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
676 "Type \"%s\", TypeInstance \"%s\")",
677 agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
678 agg->ident.type, agg->ident.type_instance);
679 return (0);
680 } /* }}} int agg_config_aggregation */
682 static int agg_config (oconfig_item_t *ci) /* {{{ */
683 {
684 int i;
686 pthread_mutex_lock (&agg_instance_list_lock);
688 if (lookup == NULL)
689 {
690 lookup = lookup_create (agg_lookup_class_callback,
691 agg_lookup_obj_callback,
692 agg_lookup_free_class_callback,
693 agg_lookup_free_obj_callback);
694 if (lookup == NULL)
695 {
696 pthread_mutex_unlock (&agg_instance_list_lock);
697 ERROR ("aggregation plugin: lookup_create failed.");
698 return (-1);
699 }
700 }
702 for (i = 0; i < ci->children_num; i++)
703 {
704 oconfig_item_t *child = ci->children + i;
706 if (strcasecmp ("Aggregation", child->key) == 0)
707 agg_config_aggregation (child);
708 else
709 WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
710 "<Plugin aggregation /> blocks and will be ignored.", child->key);
711 }
713 pthread_mutex_unlock (&agg_instance_list_lock);
715 return (0);
716 } /* }}} int agg_config */
718 static int agg_read (void) /* {{{ */
719 {
720 agg_instance_t *this;
721 cdtime_t t;
722 int success;
724 t = cdtime ();
725 success = 0;
727 pthread_mutex_lock (&agg_instance_list_lock);
729 /* agg_instance_list_head only holds data, after the "write" callback has
730 * been called with a matching value list at least once. So on startup,
731 * there's a race between the aggregations read() and write() callback. If
732 * the read() callback is called first, agg_instance_list_head is NULL and
733 * "success" may be zero. This is expected and should not result in an error.
734 * Therefore we need to handle this case separately. */
735 if (agg_instance_list_head == NULL)
736 {
737 pthread_mutex_unlock (&agg_instance_list_lock);
738 return (0);
739 }
741 for (this = agg_instance_list_head; this != NULL; this = this->next)
742 {
743 int status;
745 status = agg_instance_read (this, t);
746 if (status != 0)
747 WARNING ("aggregation plugin: Reading an aggregation instance "
748 "failed with status %i.", status);
749 else
750 success++;
751 }
753 pthread_mutex_unlock (&agg_instance_list_lock);
755 return ((success > 0) ? 0 : -1);
756 } /* }}} int agg_read */
758 static int agg_write (data_set_t const *ds, value_list_t const *vl, /* {{{ */
759 __attribute__((unused)) user_data_t *user_data)
760 {
761 _Bool created_by_aggregation = 0;
762 int status;
764 /* Ignore values that were created by the aggregation plugin to avoid weird
765 * effects. */
766 (void) meta_data_get_boolean (vl->meta, "aggregation:created",
767 &created_by_aggregation);
768 if (created_by_aggregation)
769 return (0);
771 if (lookup == NULL)
772 status = ENOENT;
773 else
774 {
775 status = lookup_search (lookup, ds, vl);
776 if (status > 0)
777 status = 0;
778 }
780 return (status);
781 } /* }}} int agg_write */
783 void module_register (void)
784 {
785 plugin_register_complex_config ("aggregation", agg_config);
786 plugin_register_read ("aggregation", agg_read);
787 plugin_register_write ("aggregation", agg_write, /* user_data = */ NULL);
788 }
790 /* vim: set sw=2 sts=2 tw=78 et fdm=marker : */