Code

Merge branch 'collectd-5.2' into collectd-5.3
[collectd.git] / src / rrdcached.c
1 /**
2  * collectd - src/rrdcached.c
3  * Copyright (C) 2008-2013  Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at collectd.org>
20  **/
22 #include "collectd.h"
23 #include "plugin.h"
24 #include "common.h"
25 #include "utils_rrdcreate.h"
27 #undef HAVE_CONFIG_H
28 #include <rrd.h>
29 #include <rrd_client.h>
31 /*
32  * Private variables
33  */
34 static char *datadir = NULL;
35 static char *daemon_address = NULL;
36 static _Bool config_create_files = 1;
37 static _Bool config_collect_stats = 1;
38 static rrdcreate_config_t rrdcreate_config =
39 {
40         /* stepsize = */ 0,
41         /* heartbeat = */ 0,
42         /* rrarows = */ 1200,
43         /* xff = */ 0.1,
45         /* timespans = */ NULL,
46         /* timespans_num = */ 0,
48         /* consolidation_functions = */ NULL,
49         /* consolidation_functions_num = */ 0,
51         /* async = */ 0
52 };
54 /*
55  * Prototypes.
56  */
57 static int rc_write (const data_set_t *ds, const value_list_t *vl,
58     user_data_t __attribute__((unused)) *user_data);
59 static int rc_flush (__attribute__((unused)) cdtime_t timeout,
60     const char *identifier, __attribute__((unused)) user_data_t *ud);
62 static int value_list_to_string (char *buffer, int buffer_len,
63     const data_set_t *ds, const value_list_t *vl)
64 {
65   int offset;
66   int status;
67   int i;
68   time_t t;
70   assert (0 == strcmp (ds->type, vl->type));
72   memset (buffer, '\0', buffer_len);
74   t = CDTIME_T_TO_TIME_T (vl->time);
75   status = ssnprintf (buffer, buffer_len, "%lu", (unsigned long) t);
76   if ((status < 1) || (status >= buffer_len))
77     return (-1);
78   offset = status;
80   for (i = 0; i < ds->ds_num; i++)
81   {
82     if ((ds->ds[i].type != DS_TYPE_COUNTER)
83         && (ds->ds[i].type != DS_TYPE_GAUGE)
84         && (ds->ds[i].type != DS_TYPE_DERIVE)
85         && (ds->ds[i].type != DS_TYPE_ABSOLUTE))
86       return (-1);
88     if (ds->ds[i].type == DS_TYPE_COUNTER)
89     {
90       status = ssnprintf (buffer + offset, buffer_len - offset,
91           ":%llu", vl->values[i].counter);
92     }
93     else if (ds->ds[i].type == DS_TYPE_GAUGE) 
94     {
95       status = ssnprintf (buffer + offset, buffer_len - offset,
96           ":%f", vl->values[i].gauge);
97     }
98     else if (ds->ds[i].type == DS_TYPE_DERIVE) {
99       status = ssnprintf (buffer + offset, buffer_len - offset,
100           ":%"PRIi64, vl->values[i].derive);
101     }
102     else /* if (ds->ds[i].type == DS_TYPE_ABSOLUTE) */ {
103       status = ssnprintf (buffer + offset, buffer_len - offset,
104           ":%"PRIu64, vl->values[i].absolute);
105  
106     }
108     if ((status < 1) || (status >= (buffer_len - offset)))
109       return (-1);
111     offset += status;
112   } /* for ds->ds_num */
114   return (0);
115 } /* int value_list_to_string */
117 static int value_list_to_filename (char *buffer, size_t buffer_size,
118     value_list_t const *vl)
120   char const suffix[] = ".rrd";
121   int status;
122   size_t len;
124   status = FORMAT_VL (buffer, buffer_size, vl);
125   if (status != 0)
126     return (status);
128   len = strlen (buffer);
129   assert (len < buffer_size);
130   buffer += len;
131   buffer_size -= len;
133   if (buffer_size <= sizeof (suffix))
134     return (ENOMEM);
136   memcpy (buffer, suffix, sizeof (suffix));
137   return (0);
138 } /* int value_list_to_filename */
140 static int rc_config_get_int_positive (oconfig_item_t const *ci, int *ret)
142   int status;
143   int tmp = 0;
145   status = cf_util_get_int (ci, &tmp);
146   if (status != 0)
147     return (status);
148   if (tmp < 0)
149     return (EINVAL);
151   *ret = tmp;
152   return (0);
153 } /* int rc_config_get_int_positive */
155 static int rc_config_get_xff (oconfig_item_t const *ci, double *ret)
157   double value;
159   if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_NUMBER))
160   {
161     ERROR ("rrdcached plugin: The \"%s\" needs exactly one numeric argument "
162         "in the range [0.0, 1.0)", ci->key);
163     return (EINVAL);
164   }
166   value = ci->values[0].value.number;
167   if ((value >= 0.0) && (value < 1.0))
168   {
169     *ret = value;
170     return (0);
171   }
173   ERROR ("rrdcached plugin: The \"%s\" needs exactly one numeric argument "
174       "in the range [0.0, 1.0)", ci->key);
175   return (EINVAL);
176 } /* int rc_config_get_xff */
178 static int rc_config_add_timespan (int timespan)
180   int *tmp;
182   if (timespan <= 0)
183     return (EINVAL);
185   tmp = realloc (rrdcreate_config.timespans,
186       sizeof (*rrdcreate_config.timespans)
187       * (rrdcreate_config.timespans_num + 1));
188   if (tmp == NULL)
189     return (ENOMEM);
190   rrdcreate_config.timespans = tmp;
192   rrdcreate_config.timespans[rrdcreate_config.timespans_num] = timespan;
193   rrdcreate_config.timespans_num++;
195   return (0);
196 } /* int rc_config_add_timespan */
198 static int rc_config (oconfig_item_t *ci)
200   int i;
202   for (i = 0; i < ci->children_num; i++)
203   {
204     oconfig_item_t const *child = ci->children + i;
205     const char *key = child->key;
206     int status = 0;
208     if (strcasecmp ("DataDir", key) == 0)
209     {
210       status = cf_util_get_string (child, &datadir);
211       if (status == 0)
212       {
213         int len = strlen (datadir);
215         while ((len > 0) && (datadir[len - 1] == '/'))
216         {
217           len--;
218           datadir[len] = 0;
219         }
221         if (len <= 0)
222           sfree (datadir);
223       }
224     }
225     else if (strcasecmp ("DaemonAddress", key) == 0)
226       status = cf_util_get_string (child, &daemon_address);
227     else if (strcasecmp ("CreateFiles", key) == 0)
228       status = cf_util_get_boolean (child, &config_create_files);
229     else if (strcasecmp ("CreateFilesAsync", key) == 0)
230       status = cf_util_get_boolean (child, &rrdcreate_config.async);
231     else if (strcasecmp ("CollectStatistics", key) == 0)
232       status = cf_util_get_boolean (child, &config_collect_stats);
233     else if (strcasecmp ("StepSize", key) == 0)
234     {
235       int tmp = -1;
237       status = rc_config_get_int_positive (child, &tmp);
238       if (status == 0)
239         rrdcreate_config.stepsize = (unsigned long) tmp;
240     }
241     else if (strcasecmp ("HeartBeat", key) == 0)
242       status = rc_config_get_int_positive (child, &rrdcreate_config.heartbeat);
243     else if (strcasecmp ("RRARows", key) == 0)
244       status = rc_config_get_int_positive (child, &rrdcreate_config.rrarows);
245     else if (strcasecmp ("RRATimespan", key) == 0)
246     {
247       int tmp = -1;
248       status = rc_config_get_int_positive (child, &tmp);
249       if (status == 0)
250         status = rc_config_add_timespan (tmp);
251     }
252     else if (strcasecmp ("XFF", key) == 0)
253       status = rc_config_get_xff (child, &rrdcreate_config.xff);
254     else
255     {
256       WARNING ("rrdcached plugin: Ignoring invalid option %s.", key);
257       continue;
258     }
260     if (status != 0)
261       WARNING ("rrdcached plugin: Handling the \"%s\" option failed.", key);
262   }
264   if (daemon_address != NULL)
265   {
266     plugin_register_write ("rrdcached", rc_write, /* user_data = */ NULL);
267     plugin_register_flush ("rrdcached", rc_flush, /* user_data = */ NULL);
268   }
269   return (0);
270 } /* int rc_config */
272 static int rc_read (void)
274   int status;
275   rrdc_stats_t *head;
276   rrdc_stats_t *ptr;
278   value_t values[1];
279   value_list_t vl = VALUE_LIST_INIT;
281   if (daemon_address == NULL)
282     return (-1);
284   if (!config_collect_stats)
285     return (-1);
287   vl.values = values;
288   vl.values_len = 1;
290   if ((strncmp ("unix:", daemon_address, strlen ("unix:")) == 0)
291       || (daemon_address[0] == '/'))
292     sstrncpy (vl.host, hostname_g, sizeof (vl.host));
293   else
294     sstrncpy (vl.host, daemon_address, sizeof (vl.host));
295   sstrncpy (vl.plugin, "rrdcached", sizeof (vl.plugin));
297   status = rrdc_connect (daemon_address);
298   if (status != 0)
299   {
300     ERROR ("rrdcached plugin: rrdc_connect (%s) failed with status %i.",
301         daemon_address, status);
302     return (-1);
303   }
305   head = NULL;
306   status = rrdc_stats_get (&head);
307   if (status != 0)
308   {
309     ERROR ("rrdcached plugin: rrdc_stats_get failed with status %i.", status);
310     return (-1);
311   }
313   for (ptr = head; ptr != NULL; ptr = ptr->next)
314   {
315     if (ptr->type == RRDC_STATS_TYPE_GAUGE)
316       values[0].gauge = (gauge_t) ptr->value.gauge;
317     else if (ptr->type == RRDC_STATS_TYPE_COUNTER)
318       values[0].counter = (counter_t) ptr->value.counter;
319     else
320       continue;
322     if (strcasecmp ("QueueLength", ptr->name) == 0)
323     {
324       sstrncpy (vl.type, "queue_length", sizeof (vl.type));
325       sstrncpy (vl.type_instance, "", sizeof (vl.type_instance));
326     }
327     else if (strcasecmp ("UpdatesWritten", ptr->name) == 0)
328     {
329       sstrncpy (vl.type, "operations", sizeof (vl.type));
330       sstrncpy (vl.type_instance, "write-updates", sizeof (vl.type_instance));
331     }
332     else if (strcasecmp ("DataSetsWritten", ptr->name) == 0)
333     {
334       sstrncpy (vl.type, "operations", sizeof (vl.type));
335       sstrncpy (vl.type_instance, "write-data_sets",
336           sizeof (vl.type_instance));
337     }
338     else if (strcasecmp ("TreeNodesNumber", ptr->name) == 0)
339     {
340       sstrncpy (vl.type, "gauge", sizeof (vl.type));
341       sstrncpy (vl.type_instance, "tree_nodes", sizeof (vl.type_instance));
342     }
343     else if (strcasecmp ("TreeDepth", ptr->name) == 0)
344     {
345       sstrncpy (vl.type, "gauge", sizeof (vl.type));
346       sstrncpy (vl.type_instance, "tree_depth", sizeof (vl.type_instance));
347     }
348     else if (strcasecmp ("FlushesReceived", ptr->name) == 0)
349     {
350       sstrncpy (vl.type, "operations", sizeof (vl.type));
351       sstrncpy (vl.type_instance, "receive-flush", sizeof (vl.type_instance));
352     }
353     else if (strcasecmp ("JournalBytes", ptr->name) == 0)
354     {
355       sstrncpy (vl.type, "counter", sizeof (vl.type));
356       sstrncpy (vl.type_instance, "journal-bytes", sizeof (vl.type_instance));
357     }
358     else if (strcasecmp ("JournalRotate", ptr->name) == 0)
359     {
360       sstrncpy (vl.type, "counter", sizeof (vl.type));
361       sstrncpy (vl.type_instance, "journal-rotates", sizeof (vl.type_instance));
362     }
363     else if (strcasecmp ("UpdatesReceived", ptr->name) == 0)
364     {
365       sstrncpy (vl.type, "operations", sizeof (vl.type));
366       sstrncpy (vl.type_instance, "receive-update", sizeof (vl.type_instance));
367     }
368     else
369     {
370       DEBUG ("rrdcached plugin: rc_read: Unknown statistic `%s'.", ptr->name);
371       continue;
372     }
374     plugin_dispatch_values (&vl);
375   } /* for (ptr = head; ptr != NULL; ptr = ptr->next) */
377   rrdc_stats_free (head);
379   return (0);
380 } /* int rc_read */
382 static int rc_init (void)
384   if (config_collect_stats)
385     plugin_register_read ("rrdcached", rc_read);
387   return (0);
388 } /* int rc_init */
390 static int rc_write (const data_set_t *ds, const value_list_t *vl,
391     user_data_t __attribute__((unused)) *user_data)
393   char filename[PATH_MAX];
394   char values[512];
395   char *values_array[2];
396   int status;
398   if (daemon_address == NULL)
399   {
400     ERROR ("rrdcached plugin: daemon_address == NULL.");
401     plugin_unregister_write ("rrdcached");
402     return (-1);
403   }
405   if (strcmp (ds->type, vl->type) != 0)
406   {
407     ERROR ("rrdcached plugin: DS type does not match value list type");
408     return (-1);
409   }
411   if (value_list_to_filename (filename, sizeof (filename), vl) != 0)
412   {
413     ERROR ("rrdcached plugin: value_list_to_filename failed.");
414     return (-1);
415   }
417   if (value_list_to_string (values, sizeof (values), ds, vl) != 0)
418   {
419     ERROR ("rrdcached plugin: value_list_to_string failed.");
420     return (-1);
421   }
423   values_array[0] = values;
424   values_array[1] = NULL;
426   if (config_create_files)
427   {
428     struct stat statbuf;
430     status = stat (filename, &statbuf);
431     if (status != 0)
432     {
433       if (errno != ENOENT)
434       {
435         char errbuf[1024];
436         ERROR ("rrdcached plugin: stat (%s) failed: %s",
437             filename, sstrerror (errno, errbuf, sizeof (errbuf)));
438         return (-1);
439       }
441       status = cu_rrd_create_file (filename, ds, vl, &rrdcreate_config);
442       if (status != 0)
443       {
444         ERROR ("rrdcached plugin: cu_rrd_create_file (%s) failed.",
445             filename);
446         return (-1);
447       }
448       else if (rrdcreate_config.async)
449         return (0);
450     }
451   }
453   status = rrdc_connect (daemon_address);
454   if (status != 0)
455   {
456     ERROR ("rrdcached plugin: rrdc_connect (%s) failed with status %i.",
457         daemon_address, status);
458     return (-1);
459   }
461   status = rrdc_update (filename, /* values_num = */ 1, (void *) values_array);
462   if (status != 0)
463   {
464     ERROR ("rrdcached plugin: rrdc_update (%s, [%s], 1) failed with "
465         "status %i.",
466         filename, values_array[0], status);
467     return (-1);
468   }
470   return (0);
471 } /* int rc_write */
473 static int rc_flush (__attribute__((unused)) cdtime_t timeout, /* {{{ */
474     const char *identifier,
475     __attribute__((unused)) user_data_t *ud)
477   char filename[PATH_MAX + 1];
478   int status;
480   if (identifier == NULL)
481     return (EINVAL);
483   if (datadir != NULL)
484     ssnprintf (filename, sizeof (filename), "%s/%s.rrd", datadir, identifier);
485   else
486     ssnprintf (filename, sizeof (filename), "%s.rrd", identifier);
488   status = rrdc_connect (daemon_address);
489   if (status != 0)
490   {
491     ERROR ("rrdcached plugin: rrdc_connect (%s) failed with status %i.",
492         daemon_address, status);
493     return (-1);
494   }
496   status = rrdc_flush (filename);
497   if (status != 0)
498   {
499     ERROR ("rrdcached plugin: rrdc_flush (%s) failed with status %i.",
500         filename, status);
501     return (-1);
502   }
503   DEBUG ("rrdcached plugin: rrdc_flush (%s): Success.", filename);
505   return (0);
506 } /* }}} int rc_flush */
508 static int rc_shutdown (void)
510   rrdc_disconnect ();
511   return (0);
512 } /* int rc_shutdown */
514 void module_register (void)
516   plugin_register_complex_config ("rrdcached", rc_config);
517   plugin_register_init ("rrdcached", rc_init);
518   plugin_register_shutdown ("rrdcached", rc_shutdown);
519 } /* void module_register */
521 /*
522  * vim: set sw=2 sts=2 et :
523  */