Code

aggregation plugin: First working prototype.
[collectd.git] / src / aggregation.c
1 /**
2  * collectd - src/aggregation.c
3  * Copyright (C) 2012       Florian Forster
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Florian Forster <octo at collectd.org>
25  **/
27 #include "collectd.h"
28 #include "plugin.h"
29 #include "common.h"
30 #include "configfile.h"
31 #include "meta_data.h"
32 #include "utils_cache.h" /* for uc_get_rate() */
33 #include "utils_vl_lookup.h"
35 #include <pthread.h>
37 struct aggregation_s /* {{{ */
38 {
39   identifier_t ident;
41   _Bool calc_num;
42   _Bool calc_sum;
43   _Bool calc_average;
44   _Bool calc_min;
45   _Bool calc_max;
46   _Bool calc_stddev;
47 }; /* }}} */
48 typedef struct aggregation_s aggregation_t;
50 struct agg_instance_s;
51 typedef struct agg_instance_s agg_instance_t;
52 struct agg_instance_s /* {{{ */
53 {
54   identifier_t ident;
56   int ds_type;
58   derive_t num;
59   gauge_t sum;
60   gauge_t squares_sum;
62   gauge_t min;
63   gauge_t max;
65   rate_to_value_state_t *state_num;
66   rate_to_value_state_t *state_sum;
67   rate_to_value_state_t *state_average;
68   rate_to_value_state_t *state_min;
69   rate_to_value_state_t *state_max;
70   rate_to_value_state_t *state_stddev;
72   agg_instance_t *next;
73 }; /* }}} */
75 static lookup_t *lookup = NULL;
77 static pthread_mutex_t agg_instance_list_lock = PTHREAD_MUTEX_INITIALIZER;
78 static agg_instance_t *agg_instance_list_head = NULL;
80 static void agg_destroy (aggregation_t *agg) /* {{{ */
81 {
82   sfree (agg);
83 } /* }}} void agg_destroy */
85 /* Frees all dynamically allocated memory within the instance. */
86 static void agg_instance_destroy (agg_instance_t *inst) /* {{{ */
87 {
88   if (inst == NULL)
89     return;
91   /* Remove this instance from the global list of instances. */
92   pthread_mutex_lock (&agg_instance_list_lock);
93   if (agg_instance_list_head == inst)
94     agg_instance_list_head = inst->next;
95   else if (agg_instance_list_head != NULL)
96   {
97     agg_instance_t *prev = agg_instance_list_head;
98     while ((prev != NULL) && (prev->next != inst))
99       prev = prev->next;
100     if (prev != NULL)
101       prev->next = inst->next;
102   }
103   pthread_mutex_unlock (&agg_instance_list_lock);
105   sfree (inst->state_num);
106   sfree (inst->state_sum);
107   sfree (inst->state_average);
108   sfree (inst->state_min);
109   sfree (inst->state_max);
110   sfree (inst->state_stddev);
112   memset (inst, 0, sizeof (*inst));
113   inst->ds_type = -1;
114   inst->min = NAN;
115   inst->max = NAN;
116 } /* }}} void agg_instance_destroy */
118 /* Create a new aggregation instance. */
119 static agg_instance_t *agg_instance_create (value_list_t const *vl, /* {{{ */
120     aggregation_t *agg)
122   agg_instance_t *inst;
124   DEBUG ("aggregation plugin: Creating new instance.");
126   inst = malloc (sizeof (*inst));
127   if (inst == NULL)
128   {
129     ERROR ("aggregation plugin: malloc() failed.");
130     return (NULL);
131   }
132   memset (inst, 0, sizeof (*inst));
134 #define COPY_FIELD(fld) do { \
135   sstrncpy (inst->ident.fld, \
136       LU_IS_ANY (agg->ident.fld) ? vl->fld : agg->ident.fld, \
137       sizeof (inst->ident.fld)); \
138 } while (0)
140   COPY_FIELD (host);
141   COPY_FIELD (plugin);
142   COPY_FIELD (plugin_instance);
143   COPY_FIELD (type);
144   COPY_FIELD (type_instance);
146 #undef COPY_FIELD
148   inst->ds_type = -1;
149   inst->min = NAN;
150   inst->max = NAN;
152 #define INIT_STATE(field) do { \
153   inst->state_ ## field = NULL; \
154   if (agg->calc_ ## field) { \
155     inst->state_ ## field = malloc (sizeof (*inst->state_ ## field)); \
156     if (inst->state_ ## field == NULL) { \
157       agg_instance_destroy (inst); \
158       ERROR ("aggregation plugin: malloc() failed."); \
159       return (NULL); \
160     } \
161     memset (inst->state_ ## field, 0, sizeof (*inst->state_ ## field)); \
162   } \
163 } while (0)
165   INIT_STATE (num);
166   INIT_STATE (sum);
167   INIT_STATE (average);
168   INIT_STATE (min);
169   INIT_STATE (max);
170   INIT_STATE (stddev);
172 #undef INIT_STATE
174   pthread_mutex_lock (&agg_instance_list_lock);
175   inst->next = agg_instance_list_head;
176   agg_instance_list_head = inst;
177   pthread_mutex_unlock (&agg_instance_list_lock);
179   return (inst);
180 } /* }}} agg_instance_t *agg_instance_create */
182 /* Update the num, sum, min, max, ... fields of the aggregation instance, if
183  * the rate of the value list is available. Value lists with more than one data
184  * source are not supported and will return an error. Returns zero on success
185  * and non-zero otherwise. */
186 static int agg_instance_update (agg_instance_t *inst, /* {{{ */
187     data_set_t const *ds, value_list_t const *vl)
189   gauge_t *rate;
191   if (ds->ds_num != 1)
192     return (-1);
194   rate = uc_get_rate (ds, vl);
195   if (rate == NULL)
196   {
197     ERROR ("aggregation plugin: uc_get_rate() failed.");
198     return (-1);
199   }
201   if (isnan (rate[0]))
202   {
203     sfree (rate);
204     return (0);
205   }
207   inst->num++;
208   inst->sum += rate[0];
209   inst->squares_sum += (rate[0] * rate[0]);
211   if (isnan (inst->min) || (inst->min > rate[0]))
212     inst->min = rate[0];
213   if (isnan (inst->max) || (inst->max < rate[0]))
214     inst->max = rate[0];
216   sfree (rate);
217   return (0);
218 } /* }}} int agg_instance_update */
220 /* lookup_class_callback_t for utils_vl_lookup */
221 static void *agg_lookup_class_callback ( /* {{{ */
222     __attribute__((unused)) data_set_t const *ds,
223     value_list_t const *vl, void *user_class)
225   return (agg_instance_create (vl, (aggregation_t *) user_class));
226 } /* }}} void *agg_class_callback */
228 /* lookup_obj_callback_t for utils_vl_lookup */
229 static int agg_lookup_obj_callback (data_set_t const *ds, /* {{{ */
230     value_list_t const *vl,
231     __attribute__((unused)) void *user_class,
232     void *user_obj)
234   return (agg_instance_update ((agg_instance_t *) user_obj, ds, vl));
235 } /* }}} int agg_lookup_obj_callback */
237 /* lookup_free_class_callback_t for utils_vl_lookup */
238 static void agg_lookup_free_class_callback (void *user_class) /* {{{ */
240   agg_destroy ((aggregation_t *) user_class);
241 } /* }}} void agg_lookup_free_class_callback */
243 /* lookup_free_obj_callback_t for utils_vl_lookup */
244 static void agg_lookup_free_obj_callback (void *user_obj) /* {{{ */
246   agg_instance_destroy ((agg_instance_t *) user_obj);
247 } /* }}} void agg_lookup_free_obj_callback */
249 /*
250  * <Plugin "aggregation">
251  *   <Aggregation>
252  *     Host "/any/"
253  *     Plugin "cpu"
254  *     PluginInstance "/all/"
255  *     Type "cpu"
256  *     TypeInstance "/any/"
257  *
258  *     CalculateNum true
259  *     CalculateSum true
260  *     CalculateAverage true
261  *     CalculateMinimum true
262  *     CalculateMaximum true
263  *     CalculateStddev true
264  *   </Aggregation>
265  * </Plugin>
266  */
267 static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
269   aggregation_t *agg;
270   int status;
271   int i;
273   agg = malloc (sizeof (*agg));
274   if (agg == NULL)
275   {
276     ERROR ("aggregation plugin: malloc failed.");
277     return (-1);
278   }
279   memset (agg, 0, sizeof (*agg));
281   for (i = 0; i < ci->children_num; i++)
282   {
283     oconfig_item_t *child = ci->children + i;
285     if (strcasecmp ("Host", child->key) == 0)
286       cf_util_get_string_buffer (child, agg->ident.host,
287           sizeof (agg->ident.host));
288     else if (strcasecmp ("Plugin", child->key) == 0)
289       cf_util_get_string_buffer (child, agg->ident.plugin,
290           sizeof (agg->ident.plugin));
291     else if (strcasecmp ("PluginInstance", child->key) == 0)
292       cf_util_get_string_buffer (child, agg->ident.plugin_instance,
293           sizeof (agg->ident.plugin_instance));
294     else if (strcasecmp ("Type", child->key) == 0)
295       cf_util_get_string_buffer (child, agg->ident.type,
296           sizeof (agg->ident.type));
297     else if (strcasecmp ("TypeInstance", child->key) == 0)
298       cf_util_get_string_buffer (child, agg->ident.type_instance,
299           sizeof (agg->ident.type_instance));
300     else if (strcasecmp ("CalculateNum", child->key) == 0)
301       cf_util_get_boolean (child, &agg->calc_num);
302     else if (strcasecmp ("CalculateSum", child->key) == 0)
303       cf_util_get_boolean (child, &agg->calc_sum);
304     else if (strcasecmp ("CalculateAverage", child->key) == 0)
305       cf_util_get_boolean (child, &agg->calc_average);
306     else if (strcasecmp ("CalculateMinimum", child->key) == 0)
307       cf_util_get_boolean (child, &agg->calc_min);
308     else if (strcasecmp ("CalculateMaximum", child->key) == 0)
309       cf_util_get_boolean (child, &agg->calc_max);
310     else if (strcasecmp ("CalculateStddev", child->key) == 0)
311       cf_util_get_boolean (child, &agg->calc_stddev);
312     else
313       WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
314           "<Aggregation /> blocks and will be ignored.", child->key);
315   }
317   /* TODO(octo): Check identifier:
318    * - At least one wildcard.
319    * - Type is set.
320    */
322   status = lookup_add (lookup, &agg->ident, agg);
323   if (status != 0)
324   {
325     ERROR ("aggregation plugin: lookup_add failed with status %i.", status);
326     sfree (agg);
327     return (-1);
328   }
330   return (0);
331 } /* }}} int agg_config_aggregation */
333 static int agg_config (oconfig_item_t *ci) /* {{{ */
335   int i;
337   if (lookup == NULL)
338   {
339     lookup = lookup_create (agg_lookup_class_callback,
340         agg_lookup_obj_callback,
341         agg_lookup_free_class_callback,
342         agg_lookup_free_obj_callback);
343     if (lookup == NULL)
344     {
345       ERROR ("aggregation plugin: lookup_create failed.");
346       return (-1);
347     }
348   }
350   for (i = 0; i < ci->children_num; i++)
351   {
352     oconfig_item_t *child = ci->children + i;
354     if (strcasecmp ("Aggregation", child->key) == 0)
355       agg_config_aggregation (child);
356     else
357       WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
358           "<Plugin aggregation /> blocks and will be ignored.", child->key);
359   }
361   return (0);
362 } /* }}} int agg_config */
364 static int agg_read (void) /* {{{ */
366   agg_instance_t *this;
367   size_t i = 0;
369   pthread_mutex_lock (&agg_instance_list_lock);
370   for (this = agg_instance_list_head; this != NULL; this = this->next)
371   {
372     DEBUG ("aggregation plugin: Handling instance: host = \"%s\", "
373         "plugin = \"%s\", plugin_instance = \"%s\", "
374         "type = \"%s\", type_instance = \"%s\"",
375         this->ident.host,
376         this->ident.plugin, this->ident.plugin_instance,
377         this->ident.type, this->ident.type_instance);
378     i++;
379   }
380   pthread_mutex_unlock (&agg_instance_list_lock);
382   DEBUG ("aggregation plugin: There are currently %zu instances.", i);
384   return (0);
385 } /* }}} int agg_read */
387 static int agg_write (data_set_t const *ds, value_list_t const *vl, /* {{{ */
388     __attribute__((unused)) user_data_t *user_data)
390   _Bool created_by_aggregation = 0;
391   int status;
393   /* Ignore values that were created by the aggregation plugin to avoid weird
394    * effects. */
395   (void) meta_data_get_boolean (vl->meta, "aggregation:created",
396       &created_by_aggregation);
397   if (created_by_aggregation)
398     return (0);
400   if (lookup == NULL)
401     status = ENOENT;
402   else
403   {
404     status = lookup_search (lookup, ds, vl);
405     if (status > 0)
406       status = 0;
407   }
409   return (status);
410 } /* }}} int agg_write */
412 void module_register (void)
414   plugin_register_complex_config ("aggregation", agg_config);
415   plugin_register_read ("aggregation", agg_read);
416   plugin_register_write ("aggregation", agg_write, /* user_data = */ NULL);
419 /* vim: set sw=2 sts=2 tw=78 et fdm=marker : */