84df548511079d2afdd66eb2144b8042b6aa6074
1 /**
2 * collectd - src/aggregation.c
3 * Copyright (C) 2012 Florian Forster
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a
6 * copy of this software and associated documentation files (the "Software"),
7 * to deal in the Software without restriction, including without limitation
8 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9 * and/or sell copies of the Software, and to permit persons to whom the
10 * Software is furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21 * DEALINGS IN THE SOFTWARE.
22 *
23 * Authors:
24 * Florian Forster <octo at collectd.org>
25 **/
27 #include "collectd.h"
29 #include "plugin.h"
30 #include "common.h"
31 #include "configfile.h"
32 #include "meta_data.h"
33 #include "utils_cache.h" /* for uc_get_rate() */
34 #include "utils_subst.h"
35 #include "utils_vl_lookup.h"
37 #define AGG_MATCHES_ALL(str) (strcmp ("/.*/", str) == 0)
38 #define AGG_FUNC_PLACEHOLDER "%{aggregation}"
40 struct aggregation_s /* {{{ */
41 {
42 identifier_t ident;
43 unsigned int group_by;
45 unsigned int regex_fields;
47 char *set_host;
48 char *set_plugin;
49 char *set_plugin_instance;
50 char *set_type_instance;
52 _Bool calc_num;
53 _Bool calc_sum;
54 _Bool calc_average;
55 _Bool calc_min;
56 _Bool calc_max;
57 _Bool calc_stddev;
58 }; /* }}} */
59 typedef struct aggregation_s aggregation_t;
61 struct agg_instance_s;
62 typedef struct agg_instance_s agg_instance_t;
63 struct agg_instance_s /* {{{ */
64 {
65 pthread_mutex_t lock;
66 identifier_t ident;
68 int ds_type;
70 derive_t num;
71 gauge_t sum;
72 gauge_t squares_sum;
74 gauge_t min;
75 gauge_t max;
77 rate_to_value_state_t *state_num;
78 rate_to_value_state_t *state_sum;
79 rate_to_value_state_t *state_average;
80 rate_to_value_state_t *state_min;
81 rate_to_value_state_t *state_max;
82 rate_to_value_state_t *state_stddev;
84 agg_instance_t *next;
85 }; /* }}} */
87 static lookup_t *lookup = NULL;
89 static pthread_mutex_t agg_instance_list_lock = PTHREAD_MUTEX_INITIALIZER;
90 static agg_instance_t *agg_instance_list_head = NULL;
92 static _Bool agg_is_regex (char const *str) /* {{{ */
93 {
94 size_t len;
96 if (str == NULL)
97 return (0);
99 len = strlen (str);
100 if (len < 3)
101 return (0);
103 if ((str[0] == '/') && (str[len - 1] == '/'))
104 return (1);
105 else
106 return (0);
107 } /* }}} _Bool agg_is_regex */
109 static void agg_destroy (aggregation_t *agg) /* {{{ */
110 {
111 sfree (agg);
112 } /* }}} void agg_destroy */
114 /* Frees all dynamically allocated memory within the instance. */
115 static void agg_instance_destroy (agg_instance_t *inst) /* {{{ */
116 {
117 if (inst == NULL)
118 return;
120 /* Remove this instance from the global list of instances. */
121 pthread_mutex_lock (&agg_instance_list_lock);
122 if (agg_instance_list_head == inst)
123 agg_instance_list_head = inst->next;
124 else if (agg_instance_list_head != NULL)
125 {
126 agg_instance_t *prev = agg_instance_list_head;
127 while ((prev != NULL) && (prev->next != inst))
128 prev = prev->next;
129 if (prev != NULL)
130 prev->next = inst->next;
131 }
132 pthread_mutex_unlock (&agg_instance_list_lock);
134 sfree (inst->state_num);
135 sfree (inst->state_sum);
136 sfree (inst->state_average);
137 sfree (inst->state_min);
138 sfree (inst->state_max);
139 sfree (inst->state_stddev);
141 memset (inst, 0, sizeof (*inst));
142 inst->ds_type = -1;
143 inst->min = NAN;
144 inst->max = NAN;
145 } /* }}} void agg_instance_destroy */
147 static int agg_instance_create_name (agg_instance_t *inst, /* {{{ */
148 value_list_t const *vl, aggregation_t const *agg)
149 {
150 #define COPY_FIELD(buffer, buffer_size, field, group_mask, all_value) do { \
151 if (agg->set_ ## field != NULL) \
152 sstrncpy (buffer, agg->set_ ## field, buffer_size); \
153 else if ((agg->regex_fields & group_mask) \
154 && (agg->group_by & group_mask)) \
155 sstrncpy (buffer, vl->field, buffer_size); \
156 else if ((agg->regex_fields & group_mask) \
157 && (AGG_MATCHES_ALL (agg->ident.field))) \
158 sstrncpy (buffer, all_value, buffer_size); \
159 else \
160 sstrncpy (buffer, agg->ident.field, buffer_size); \
161 } while (0)
163 /* Host */
164 COPY_FIELD (inst->ident.host, sizeof (inst->ident.host),
165 host, LU_GROUP_BY_HOST, "global");
167 /* Plugin */
168 if (agg->set_plugin != NULL)
169 sstrncpy (inst->ident.plugin, agg->set_plugin,
170 sizeof (inst->ident.plugin));
171 else
172 sstrncpy (inst->ident.plugin, "aggregation", sizeof (inst->ident.plugin));
174 /* Plugin instance */
175 if (agg->set_plugin_instance != NULL)
176 sstrncpy (inst->ident.plugin_instance, agg->set_plugin_instance,
177 sizeof (inst->ident.plugin_instance));
178 else
179 {
180 char tmp_plugin[DATA_MAX_NAME_LEN];
181 char tmp_plugin_instance[DATA_MAX_NAME_LEN] = "";
183 if ((agg->regex_fields & LU_GROUP_BY_PLUGIN)
184 && (agg->group_by & LU_GROUP_BY_PLUGIN))
185 sstrncpy (tmp_plugin, vl->plugin, sizeof (tmp_plugin));
186 else if ((agg->regex_fields & LU_GROUP_BY_PLUGIN)
187 && (AGG_MATCHES_ALL (agg->ident.plugin)))
188 sstrncpy (tmp_plugin, "", sizeof (tmp_plugin));
189 else
190 sstrncpy (tmp_plugin, agg->ident.plugin, sizeof (tmp_plugin));
192 if ((agg->regex_fields & LU_GROUP_BY_PLUGIN_INSTANCE)
193 && (agg->group_by & LU_GROUP_BY_PLUGIN_INSTANCE))
194 sstrncpy (tmp_plugin_instance, vl->plugin_instance,
195 sizeof (tmp_plugin_instance));
196 else if ((agg->regex_fields & LU_GROUP_BY_PLUGIN_INSTANCE)
197 && (AGG_MATCHES_ALL (agg->ident.plugin_instance)))
198 sstrncpy (tmp_plugin_instance, "", sizeof (tmp_plugin_instance));
199 else
200 sstrncpy (tmp_plugin_instance, agg->ident.plugin_instance,
201 sizeof (tmp_plugin_instance));
203 if ((strcmp ("", tmp_plugin) == 0)
204 && (strcmp ("", tmp_plugin_instance) == 0))
205 sstrncpy (inst->ident.plugin_instance, AGG_FUNC_PLACEHOLDER,
206 sizeof (inst->ident.plugin_instance));
207 else if (strcmp ("", tmp_plugin) != 0)
208 ssnprintf (inst->ident.plugin_instance,
209 sizeof (inst->ident.plugin_instance),
210 "%s-%s", tmp_plugin, AGG_FUNC_PLACEHOLDER);
211 else if (strcmp ("", tmp_plugin_instance) != 0)
212 ssnprintf (inst->ident.plugin_instance,
213 sizeof (inst->ident.plugin_instance),
214 "%s-%s", tmp_plugin_instance, AGG_FUNC_PLACEHOLDER);
215 else
216 ssnprintf (inst->ident.plugin_instance,
217 sizeof (inst->ident.plugin_instance),
218 "%s-%s-%s", tmp_plugin, tmp_plugin_instance, AGG_FUNC_PLACEHOLDER);
219 }
221 /* Type */
222 sstrncpy (inst->ident.type, agg->ident.type, sizeof (inst->ident.type));
224 /* Type instance */
225 COPY_FIELD (inst->ident.type_instance, sizeof (inst->ident.type_instance),
226 type_instance, LU_GROUP_BY_TYPE_INSTANCE, "");
228 #undef COPY_FIELD
230 return (0);
231 } /* }}} int agg_instance_create_name */
233 /* Create a new aggregation instance. */
234 static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */
235 value_list_t const *vl, aggregation_t *agg)
236 {
237 agg_instance_t *inst;
239 DEBUG ("aggregation plugin: Creating new instance.");
241 inst = calloc (1, sizeof (*inst));
242 if (inst == NULL)
243 {
244 ERROR ("aggregation plugin: calloc() failed.");
245 return (NULL);
246 }
247 pthread_mutex_init (&inst->lock, /* attr = */ NULL);
249 inst->ds_type = ds->ds[0].type;
251 agg_instance_create_name (inst, vl, agg);
253 inst->min = NAN;
254 inst->max = NAN;
256 #define INIT_STATE(field) do { \
257 inst->state_ ## field = NULL; \
258 if (agg->calc_ ## field) { \
259 inst->state_ ## field = calloc (1, sizeof (*inst->state_ ## field)); \
260 if (inst->state_ ## field == NULL) { \
261 agg_instance_destroy (inst); \
262 free (inst); \
263 ERROR ("aggregation plugin: calloc() failed."); \
264 return (NULL); \
265 } \
266 } \
267 } while (0)
269 INIT_STATE (num);
270 INIT_STATE (sum);
271 INIT_STATE (average);
272 INIT_STATE (min);
273 INIT_STATE (max);
274 INIT_STATE (stddev);
276 #undef INIT_STATE
278 pthread_mutex_lock (&agg_instance_list_lock);
279 inst->next = agg_instance_list_head;
280 agg_instance_list_head = inst;
281 pthread_mutex_unlock (&agg_instance_list_lock);
283 return (inst);
284 } /* }}} agg_instance_t *agg_instance_create */
286 /* Update the num, sum, min, max, ... fields of the aggregation instance, if
287 * the rate of the value list is available. Value lists with more than one data
288 * source are not supported and will return an error. Returns zero on success
289 * and non-zero otherwise. */
290 static int agg_instance_update (agg_instance_t *inst, /* {{{ */
291 data_set_t const *ds, value_list_t const *vl)
292 {
293 gauge_t *rate;
295 if (ds->ds_num != 1)
296 {
297 ERROR ("aggregation plugin: The \"%s\" type (data set) has more than one "
298 "data source. This is currently not supported by this plugin. "
299 "Sorry.", ds->type);
300 return (EINVAL);
301 }
303 rate = uc_get_rate (ds, vl);
304 if (rate == NULL)
305 {
306 char ident[6 * DATA_MAX_NAME_LEN];
307 FORMAT_VL (ident, sizeof (ident), vl);
308 ERROR ("aggregation plugin: Unable to read the current rate of \"%s\".",
309 ident);
310 return (ENOENT);
311 }
313 if (isnan (rate[0]))
314 {
315 sfree (rate);
316 return (0);
317 }
319 pthread_mutex_lock (&inst->lock);
321 inst->num++;
322 inst->sum += rate[0];
323 inst->squares_sum += (rate[0] * rate[0]);
325 if (isnan (inst->min) || (inst->min > rate[0]))
326 inst->min = rate[0];
327 if (isnan (inst->max) || (inst->max < rate[0]))
328 inst->max = rate[0];
330 pthread_mutex_unlock (&inst->lock);
332 sfree (rate);
333 return (0);
334 } /* }}} int agg_instance_update */
336 static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */
337 char const *func, gauge_t rate, rate_to_value_state_t *state,
338 value_list_t *vl, char const *pi_prefix, cdtime_t t)
339 {
340 value_t v = { 0 } ;
341 int status;
343 if (pi_prefix[0] != 0)
344 subst_string (vl->plugin_instance, sizeof (vl->plugin_instance),
345 pi_prefix, AGG_FUNC_PLACEHOLDER, func);
346 else
347 sstrncpy (vl->plugin_instance, func, sizeof (vl->plugin_instance));
349 status = rate_to_value (&v, rate, state, inst->ds_type, t);
350 if (status != 0)
351 {
352 /* If this is the first iteration and rate_to_value() was asked to return a
353 * COUNTER or a DERIVE, it will return EAGAIN. Catch this and handle
354 * gracefully. */
355 if (status == EAGAIN)
356 return (0);
358 WARNING ("aggregation plugin: rate_to_value failed with status %i.",
359 status);
360 return (-1);
361 }
363 vl->values = &v;
364 vl->values_len = 1;
366 plugin_dispatch_values (vl);
368 vl->values = NULL;
369 vl->values_len = 0;
371 return (0);
372 } /* }}} int agg_instance_read_func */
374 static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */
375 {
376 value_list_t vl = VALUE_LIST_INIT;
378 /* Pre-set all the fields in the value list that will not change per
379 * aggregation type (sum, average, ...). The struct will be re-used and must
380 * therefore be dispatched using the "secure" function. */
382 vl.time = t;
383 vl.interval = 0;
385 vl.meta = meta_data_create ();
386 if (vl.meta == NULL)
387 {
388 ERROR ("aggregation plugin: meta_data_create failed.");
389 return (-1);
390 }
391 meta_data_add_boolean (vl.meta, "aggregation:created", 1);
393 sstrncpy (vl.host, inst->ident.host, sizeof (vl.host));
394 sstrncpy (vl.plugin, inst->ident.plugin, sizeof (vl.plugin));
395 sstrncpy (vl.type, inst->ident.type, sizeof (vl.type));
396 sstrncpy (vl.type_instance, inst->ident.type_instance,
397 sizeof (vl.type_instance));
399 #define READ_FUNC(func, rate) do { \
400 if (inst->state_ ## func != NULL) { \
401 agg_instance_read_func (inst, #func, rate, \
402 inst->state_ ## func, &vl, inst->ident.plugin_instance, t); \
403 } \
404 } while (0)
406 pthread_mutex_lock (&inst->lock);
408 READ_FUNC (num, (gauge_t) inst->num);
410 /* All other aggregations are only defined when there have been any values
411 * at all. */
412 if (inst->num > 0)
413 {
414 READ_FUNC (sum, inst->sum);
415 READ_FUNC (average, (inst->sum / ((gauge_t) inst->num)));
416 READ_FUNC (min, inst->min);
417 READ_FUNC (max, inst->max);
418 READ_FUNC (stddev, sqrt((((gauge_t) inst->num) * inst->squares_sum)
419 - (inst->sum * inst->sum)) / ((gauge_t) inst->num));
420 }
422 /* Reset internal state. */
423 inst->num = 0;
424 inst->sum = 0.0;
425 inst->squares_sum = 0.0;
426 inst->min = NAN;
427 inst->max = NAN;
429 pthread_mutex_unlock (&inst->lock);
431 meta_data_destroy (vl.meta);
432 vl.meta = NULL;
434 return (0);
435 } /* }}} int agg_instance_read */
437 /* lookup_class_callback_t for utils_vl_lookup */
438 static void *agg_lookup_class_callback ( /* {{{ */
439 data_set_t const *ds, value_list_t const *vl, void *user_class)
440 {
441 return (agg_instance_create (ds, vl, (aggregation_t *) user_class));
442 } /* }}} void *agg_class_callback */
444 /* lookup_obj_callback_t for utils_vl_lookup */
445 static int agg_lookup_obj_callback (data_set_t const *ds, /* {{{ */
446 value_list_t const *vl,
447 __attribute__((unused)) void *user_class,
448 void *user_obj)
449 {
450 return (agg_instance_update ((agg_instance_t *) user_obj, ds, vl));
451 } /* }}} int agg_lookup_obj_callback */
453 /* lookup_free_class_callback_t for utils_vl_lookup */
454 static void agg_lookup_free_class_callback (void *user_class) /* {{{ */
455 {
456 agg_destroy ((aggregation_t *) user_class);
457 } /* }}} void agg_lookup_free_class_callback */
459 /* lookup_free_obj_callback_t for utils_vl_lookup */
460 static void agg_lookup_free_obj_callback (void *user_obj) /* {{{ */
461 {
462 agg_instance_destroy ((agg_instance_t *) user_obj);
463 } /* }}} void agg_lookup_free_obj_callback */
465 /*
466 * <Plugin "aggregation">
467 * <Aggregation>
468 * Plugin "cpu"
469 * Type "cpu"
470 *
471 * GroupBy Host
472 * GroupBy TypeInstance
473 *
474 * CalculateNum true
475 * CalculateSum true
476 * CalculateAverage true
477 * CalculateMinimum true
478 * CalculateMaximum true
479 * CalculateStddev true
480 * </Aggregation>
481 * </Plugin>
482 */
483 static int agg_config_handle_group_by (oconfig_item_t const *ci, /* {{{ */
484 aggregation_t *agg)
485 {
486 int i;
488 for (i = 0; i < ci->values_num; i++)
489 {
490 char const *value;
492 if (ci->values[i].type != OCONFIG_TYPE_STRING)
493 {
494 ERROR ("aggregation plugin: Argument %i of the \"GroupBy\" option "
495 "is not a string.", i + 1);
496 continue;
497 }
499 value = ci->values[i].value.string;
501 if (strcasecmp ("Host", value) == 0)
502 agg->group_by |= LU_GROUP_BY_HOST;
503 else if (strcasecmp ("Plugin", value) == 0)
504 agg->group_by |= LU_GROUP_BY_PLUGIN;
505 else if (strcasecmp ("PluginInstance", value) == 0)
506 agg->group_by |= LU_GROUP_BY_PLUGIN_INSTANCE;
507 else if (strcasecmp ("TypeInstance", value) == 0)
508 agg->group_by |= LU_GROUP_BY_TYPE_INSTANCE;
509 else if (strcasecmp ("Type", value) == 0)
510 ERROR ("aggregation plugin: Grouping by type is not supported.");
511 else
512 WARNING ("aggregation plugin: The \"%s\" argument to the \"GroupBy\" "
513 "option is invalid and will be ignored.", value);
514 } /* for (ci->values) */
516 return (0);
517 } /* }}} int agg_config_handle_group_by */
519 static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
520 {
521 aggregation_t *agg;
522 _Bool is_valid;
523 int status;
524 int i;
526 agg = calloc (1, sizeof (*agg));
527 if (agg == NULL)
528 {
529 ERROR ("aggregation plugin: calloc failed.");
530 return (-1);
531 }
533 sstrncpy (agg->ident.host, "/.*/", sizeof (agg->ident.host));
534 sstrncpy (agg->ident.plugin, "/.*/", sizeof (agg->ident.plugin));
535 sstrncpy (agg->ident.plugin_instance, "/.*/",
536 sizeof (agg->ident.plugin_instance));
537 sstrncpy (agg->ident.type, "/.*/", sizeof (agg->ident.type));
538 sstrncpy (agg->ident.type_instance, "/.*/",
539 sizeof (agg->ident.type_instance));
541 for (i = 0; i < ci->children_num; i++)
542 {
543 oconfig_item_t *child = ci->children + i;
545 if (strcasecmp ("Host", child->key) == 0)
546 cf_util_get_string_buffer (child, agg->ident.host,
547 sizeof (agg->ident.host));
548 else if (strcasecmp ("Plugin", child->key) == 0)
549 cf_util_get_string_buffer (child, agg->ident.plugin,
550 sizeof (agg->ident.plugin));
551 else if (strcasecmp ("PluginInstance", child->key) == 0)
552 cf_util_get_string_buffer (child, agg->ident.plugin_instance,
553 sizeof (agg->ident.plugin_instance));
554 else if (strcasecmp ("Type", child->key) == 0)
555 cf_util_get_string_buffer (child, agg->ident.type,
556 sizeof (agg->ident.type));
557 else if (strcasecmp ("TypeInstance", child->key) == 0)
558 cf_util_get_string_buffer (child, agg->ident.type_instance,
559 sizeof (agg->ident.type_instance));
560 else if (strcasecmp ("SetHost", child->key) == 0)
561 cf_util_get_string (child, &agg->set_host);
562 else if (strcasecmp ("SetPlugin", child->key) == 0)
563 cf_util_get_string (child, &agg->set_plugin);
564 else if (strcasecmp ("SetPluginInstance", child->key) == 0)
565 cf_util_get_string (child, &agg->set_plugin_instance);
566 else if (strcasecmp ("SetTypeInstance", child->key) == 0)
567 cf_util_get_string (child, &agg->set_type_instance);
568 else if (strcasecmp ("GroupBy", child->key) == 0)
569 agg_config_handle_group_by (child, agg);
570 else if (strcasecmp ("CalculateNum", child->key) == 0)
571 cf_util_get_boolean (child, &agg->calc_num);
572 else if (strcasecmp ("CalculateSum", child->key) == 0)
573 cf_util_get_boolean (child, &agg->calc_sum);
574 else if (strcasecmp ("CalculateAverage", child->key) == 0)
575 cf_util_get_boolean (child, &agg->calc_average);
576 else if (strcasecmp ("CalculateMinimum", child->key) == 0)
577 cf_util_get_boolean (child, &agg->calc_min);
578 else if (strcasecmp ("CalculateMaximum", child->key) == 0)
579 cf_util_get_boolean (child, &agg->calc_max);
580 else if (strcasecmp ("CalculateStddev", child->key) == 0)
581 cf_util_get_boolean (child, &agg->calc_stddev);
582 else
583 WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
584 "<Aggregation /> blocks and will be ignored.", child->key);
585 }
587 if (agg_is_regex (agg->ident.host))
588 agg->regex_fields |= LU_GROUP_BY_HOST;
589 if (agg_is_regex (agg->ident.plugin))
590 agg->regex_fields |= LU_GROUP_BY_PLUGIN;
591 if (agg_is_regex (agg->ident.plugin_instance))
592 agg->regex_fields |= LU_GROUP_BY_PLUGIN_INSTANCE;
593 if (agg_is_regex (agg->ident.type_instance))
594 agg->regex_fields |= LU_GROUP_BY_TYPE_INSTANCE;
596 /* Sanity checking */
597 is_valid = 1;
598 if (strcmp ("/.*/", agg->ident.type) == 0) /* {{{ */
599 {
600 ERROR ("aggregation plugin: It appears you did not specify the required "
601 "\"Type\" option in this aggregation. "
602 "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
603 "Type \"%s\", TypeInstance \"%s\")",
604 agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
605 agg->ident.type, agg->ident.type_instance);
606 is_valid = 0;
607 }
608 else if (strchr (agg->ident.type, '/') != NULL)
609 {
610 ERROR ("aggregation plugin: The \"Type\" may not contain the '/' "
611 "character. Especially, it may not be a regex. The current "
612 "value is \"%s\".", agg->ident.type);
613 is_valid = 0;
614 } /* }}} */
616 /* Check that there is at least one regex field without a grouping. {{{ */
617 if ((agg->regex_fields & ~agg->group_by) == 0)
618 {
619 ERROR ("aggregation plugin: An aggregation must contain at least one "
620 "wildcard. This is achieved by leaving at least one of the \"Host\", "
621 "\"Plugin\", \"PluginInstance\" and \"TypeInstance\" options blank "
622 "or using a regular expression and not grouping by that field. "
623 "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
624 "Type \"%s\", TypeInstance \"%s\")",
625 agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
626 agg->ident.type, agg->ident.type_instance);
627 is_valid = 0;
628 } /* }}} */
630 /* Check that all grouping fields are regular expressions. {{{ */
631 if (agg->group_by & ~agg->regex_fields)
632 {
633 ERROR ("aggregation plugin: Only wildcard fields (fields for which a "
634 "regular expression is configured or which are left blank) can be "
635 "specified in the \"GroupBy\" option. "
636 "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
637 "Type \"%s\", TypeInstance \"%s\")",
638 agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
639 agg->ident.type, agg->ident.type_instance);
640 is_valid = 0;
641 } /* }}} */
643 if (!agg->calc_num && !agg->calc_sum && !agg->calc_average /* {{{ */
644 && !agg->calc_min && !agg->calc_max && !agg->calc_stddev)
645 {
646 ERROR ("aggregation plugin: No aggregation function has been specified. "
647 "Without this, I don't know what I should be calculating. "
648 "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
649 "Type \"%s\", TypeInstance \"%s\")",
650 agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
651 agg->ident.type, agg->ident.type_instance);
652 is_valid = 0;
653 } /* }}} */
655 if (!is_valid) /* {{{ */
656 {
657 sfree (agg);
658 return (-1);
659 } /* }}} */
661 status = lookup_add (lookup, &agg->ident, agg->group_by, agg);
662 if (status != 0)
663 {
664 ERROR ("aggregation plugin: lookup_add failed with status %i.", status);
665 sfree (agg);
666 return (-1);
667 }
669 DEBUG ("aggregation plugin: Successfully added aggregation: "
670 "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
671 "Type \"%s\", TypeInstance \"%s\")",
672 agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
673 agg->ident.type, agg->ident.type_instance);
674 return (0);
675 } /* }}} int agg_config_aggregation */
677 static int agg_config (oconfig_item_t *ci) /* {{{ */
678 {
679 int i;
681 pthread_mutex_lock (&agg_instance_list_lock);
683 if (lookup == NULL)
684 {
685 lookup = lookup_create (agg_lookup_class_callback,
686 agg_lookup_obj_callback,
687 agg_lookup_free_class_callback,
688 agg_lookup_free_obj_callback);
689 if (lookup == NULL)
690 {
691 pthread_mutex_unlock (&agg_instance_list_lock);
692 ERROR ("aggregation plugin: lookup_create failed.");
693 return (-1);
694 }
695 }
697 for (i = 0; i < ci->children_num; i++)
698 {
699 oconfig_item_t *child = ci->children + i;
701 if (strcasecmp ("Aggregation", child->key) == 0)
702 agg_config_aggregation (child);
703 else
704 WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
705 "<Plugin aggregation /> blocks and will be ignored.", child->key);
706 }
708 pthread_mutex_unlock (&agg_instance_list_lock);
710 return (0);
711 } /* }}} int agg_config */
713 static int agg_read (void) /* {{{ */
714 {
715 agg_instance_t *this;
716 cdtime_t t;
717 int success;
719 t = cdtime ();
720 success = 0;
722 pthread_mutex_lock (&agg_instance_list_lock);
724 /* agg_instance_list_head only holds data, after the "write" callback has
725 * been called with a matching value list at least once. So on startup,
726 * there's a race between the aggregations read() and write() callback. If
727 * the read() callback is called first, agg_instance_list_head is NULL and
728 * "success" may be zero. This is expected and should not result in an error.
729 * Therefore we need to handle this case separately. */
730 if (agg_instance_list_head == NULL)
731 {
732 pthread_mutex_unlock (&agg_instance_list_lock);
733 return (0);
734 }
736 for (this = agg_instance_list_head; this != NULL; this = this->next)
737 {
738 int status;
740 status = agg_instance_read (this, t);
741 if (status != 0)
742 WARNING ("aggregation plugin: Reading an aggregation instance "
743 "failed with status %i.", status);
744 else
745 success++;
746 }
748 pthread_mutex_unlock (&agg_instance_list_lock);
750 return ((success > 0) ? 0 : -1);
751 } /* }}} int agg_read */
753 static int agg_write (data_set_t const *ds, value_list_t const *vl, /* {{{ */
754 __attribute__((unused)) user_data_t *user_data)
755 {
756 _Bool created_by_aggregation = 0;
757 int status;
759 /* Ignore values that were created by the aggregation plugin to avoid weird
760 * effects. */
761 (void) meta_data_get_boolean (vl->meta, "aggregation:created",
762 &created_by_aggregation);
763 if (created_by_aggregation)
764 return (0);
766 if (lookup == NULL)
767 status = ENOENT;
768 else
769 {
770 status = lookup_search (lookup, ds, vl);
771 if (status > 0)
772 status = 0;
773 }
775 return (status);
776 } /* }}} int agg_write */
778 void module_register (void)
779 {
780 plugin_register_complex_config ("aggregation", agg_config);
781 plugin_register_read ("aggregation", agg_read);
782 plugin_register_write ("aggregation", agg_write, /* user_data = */ NULL);
783 }
785 /* vim: set sw=2 sts=2 tw=78 et fdm=marker : */