1 /*
2 * SysDB - src/plugins/timeseries/rrdtool.c
3 * Copyright (C) 2014-2016 Sebastian 'tokkee' Harl <sh@tokkee.org>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 *
15 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
16 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
17 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR
19 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
28 #if HAVE_CONFIG_H
29 # include "config.h"
30 #endif /* HAVE_CONFIG_H */
32 #include "sysdb.h"
33 #include "core/plugin.h"
34 #include "utils/error.h"
36 #include "liboconfig/utils.h"
38 #include <errno.h>
39 #include <stdlib.h>
40 #include <string.h>
41 #include <strings.h>
42 #include <rrd.h>
43 #ifdef HAVE_RRD_CLIENT_H
44 # include <rrd_client.h>
45 #endif
47 SDB_PLUGIN_MAGIC;
49 /* Current versions of RRDtool do not support multiple RRDCacheD client
50 * connections. Use this to guard against multiple configured RRDCacheD
51 * instances. */
52 static bool rrdcached_in_use = 0;
54 static bool
55 rrdcached_connect(char *addr)
56 {
57 #ifdef HAVE_RRD_CLIENT_H
58 rrd_clear_error();
59 if (! rrdc_is_connected(addr)) {
60 if (rrdc_connect(addr)) {
61 sdb_log(SDB_LOG_ERR, "Failed to connectd to RRDCacheD at %s: %s",
62 addr, rrd_get_error());
63 return 0;
64 }
65 }
66 #else
67 sdb_log(SDB_LOG_ERR, "Callback called with RRDCacheD address "
68 "but your build of SysDB does not support that");
69 return 0;
70 #endif
71 return 1;
72 } /* rrdcached_connect */
74 /*
75 * plugin API
76 */
78 static sdb_timeseries_info_t *
79 sdb_rrd_describe(const char *id, sdb_object_t *user_data)
80 {
81 rrd_info_t *info, *iter;
82 char filename[strlen(id) + 1];
83 sdb_timeseries_info_t *ts_info;
85 strncpy(filename, id, sizeof(filename));
87 if (user_data) {
88 /* -> use RRDCacheD */
89 char *addr = SDB_OBJ_WRAPPER(user_data)->data;
91 if (! rrdcached_connect(addr))
92 return NULL;
94 #ifdef HAVE_RRD_CLIENT_H
95 /* TODO: detect and use rrdc_info if possible */
96 sdb_log(SDB_LOG_ERR, "DESCRIBE not yet supported via RRDCacheD");
97 return NULL;
98 #endif
99 }
100 else {
101 rrd_clear_error();
102 info = rrd_info_r(filename);
103 }
104 if (! info) {
105 sdb_log(SDB_LOG_ERR, "Failed to extract header information from '%s': %s",
106 filename, rrd_get_error());
107 return NULL;
108 }
110 ts_info = calloc(1, sizeof(*ts_info));
111 if (! ts_info) {
112 sdb_log(SDB_LOG_ERR, "Failed to allocate memory");
113 rrd_info_free(info);
114 return NULL;
115 }
117 for (iter = info; iter != NULL; iter = iter->next) {
118 size_t len, n, m;
119 char *ds_name;
120 char **tmp;
122 /* Parse the DS name. The raw value is not exposed via the rrd_info
123 * interface. */
124 n = strlen("ds[");
125 if (strncmp(iter->key, "ds[", n))
126 continue;
128 len = strlen(iter->key);
129 m = strlen("].index");
130 if ((len < m) || strcmp(iter->key + len - m, "].index"))
131 continue;
133 ds_name = iter->key + n;
134 len -= n;
135 ds_name[len - m] = '\0';
137 /* Append the new datum. */
138 tmp = realloc(ts_info->data_names,
139 (ts_info->data_names_len + 1) * sizeof(*ts_info->data_names));
140 if (! tmp) {
141 sdb_log(SDB_LOG_ERR, "Failed to allocate memory");
142 sdb_timeseries_info_destroy(ts_info);
143 rrd_info_free(info);
144 return NULL;
145 }
147 ts_info->data_names = tmp;
148 ts_info->data_names[ts_info->data_names_len] = strdup(ds_name);
149 if (! ts_info->data_names[ts_info->data_names_len]) {
150 sdb_log(SDB_LOG_ERR, "Failed to allocate memory");
151 sdb_timeseries_info_destroy(ts_info);
152 rrd_info_free(info);
153 return NULL;
154 }
155 ts_info->data_names_len++;
156 }
157 rrd_info_free(info);
159 return ts_info;
160 } /* sdb_rrd_describe */
162 static int
163 copy_data(sdb_timeseries_t *ts, rrd_value_t *data, time_t step,
164 size_t ds_cnt, char **ds_names)
165 {
166 time_t start = SDB_TIME_TO_SECS(ts->start);
167 time_t end = SDB_TIME_TO_SECS(ts->end);
168 time_t t;
170 ssize_t ds_target[ds_cnt];
171 size_t i, j;
173 /* determine the target index of each data-source,
174 * or -1 if the data-source isn't wanted */
175 for (i = 0; i < ds_cnt; i++) {
176 ds_target[i] = -1;
177 for (j = 0; j < ts->data_names_len; j++) {
178 if (!strcmp(ds_names[i], ts->data_names[j])) {
179 ds_target[i] = j;
180 break;
181 }
182 }
183 }
185 /* check if any wanted data-sources is missing */
186 for (i = 0; i < ts->data_names_len; i++) {
187 bool found = false;
189 for (j = 0; j < ds_cnt; j++) {
190 if (!strcmp(ts->data_names[i], ds_names[j])) {
191 found = true;
192 break;
193 }
194 }
195 if (!found) {
196 sdb_log(SDB_LOG_ERR, "Requested data-source '%s' not found",
197 ts->data_names[i]);
198 return -1;
199 }
200 }
202 for (t = start; t <= end; t += step) {
203 i = (size_t)(t - start) / (size_t)step;
205 for (j = 0; j < ds_cnt; ++j) {
206 if (ds_target[j] >= 0) {
207 size_t x = (size_t)ds_target[j];
208 ts->data[x][i].timestamp = SECS_TO_SDB_TIME(t);
209 ts->data[x][i].value = *data;
210 }
211 ++data;
212 }
213 }
214 return 0;
215 }
217 static sdb_timeseries_t *
218 sdb_rrd_fetch(const char *id, sdb_timeseries_opts_t *opts,
219 sdb_object_t *user_data)
220 {
221 sdb_timeseries_t *ts;
223 time_t start = (time_t)SDB_TIME_TO_SECS(opts->start);
224 time_t end = (time_t)SDB_TIME_TO_SECS(opts->end);
226 unsigned long step = 0;
227 unsigned long ds_cnt = 0;
228 unsigned long val_cnt = 0;
229 char **ds_namv = NULL;
230 rrd_value_t *data = NULL;
232 if (user_data) {
233 /* -> use RRDCacheD */
234 char *addr = SDB_OBJ_WRAPPER(user_data)->data;
236 if (! rrdcached_connect(addr))
237 return NULL;
239 #ifdef HAVE_RRD_CLIENT_H
240 if (rrdc_flush(id)) {
241 sdb_log(SDB_LOG_ERR, "Failed to flush '%s' through RRDCacheD: %s",
242 id, rrd_get_error());
243 return NULL;
244 }
245 #endif
246 }
248 #define FREE_RRD_DATA() \
249 do { \
250 size_t i; \
251 for (i = 0; i < ds_cnt; ++i) \
252 rrd_freemem(ds_namv[i]); \
253 rrd_freemem(ds_namv); \
254 rrd_freemem(data); \
255 } while (0)
257 /* limit to about 1000 data-points for now
258 * TODO: make this configurable */
259 step = (end - start) / 1000;
261 if (rrd_fetch_r(id, "AVERAGE", &start, &end, &step,
262 &ds_cnt, &ds_namv, &data)) {
263 char errbuf[1024];
264 sdb_strerror(errno, errbuf, sizeof(errbuf));
265 sdb_log(SDB_LOG_ERR, "Failed to fetch data from %s: %s", id, errbuf);
266 return NULL;
267 }
269 val_cnt = (unsigned long)(end - start) / step;
271 /* RRDtool does not support fetching specific data-sources, so we'll have
272 * to filter the requested ones after fetching them all */
273 if (opts->data_names && opts->data_names_len)
274 ts = sdb_timeseries_create(opts->data_names_len,
275 (const char * const *)opts->data_names, val_cnt);
276 else
277 ts = sdb_timeseries_create(ds_cnt, (const char * const *)ds_namv, val_cnt);
278 if (! ts) {
279 char errbuf[1024];
280 sdb_strerror(errno, errbuf, sizeof(errbuf));
281 sdb_log(SDB_LOG_ERR, "Failed to allocate time-series object: %s", errbuf);
282 FREE_RRD_DATA();
283 return NULL;
284 }
286 ts->start = SECS_TO_SDB_TIME(start + (time_t)step);
287 ts->end = SECS_TO_SDB_TIME(end);
289 if (copy_data(ts, data, (time_t)step, (size_t)ds_cnt, ds_namv) < 0) {
290 FREE_RRD_DATA();
291 sdb_timeseries_destroy(ts);
292 return NULL;
293 }
295 FREE_RRD_DATA();
296 return ts;
297 } /* sdb_rrd_fetch */
299 static sdb_timeseries_fetcher_t fetcher_impl = {
300 sdb_rrd_describe, sdb_rrd_fetch,
301 };
303 static int
304 sdb_rrdcached_shutdown(sdb_object_t __attribute__((unused)) *user_data)
305 {
306 #ifdef HAVE_RRD_CLIENT_H
307 rrdc_disconnect();
308 #endif
309 return 0;
310 } /* sdb_rrdcached_shutdown */
312 static int
313 sdb_rrd_config_rrdcached(oconfig_item_t *ci)
314 {
315 sdb_object_t *ud;
316 char *addr = NULL;
318 if (rrdcached_in_use) {
319 sdb_log(SDB_LOG_ERR, "RRDCacheD does not support multiple connections");
320 return -1;
321 }
323 #ifndef HAVE_RRD_CLIENT_H
324 sdb_log(SDB_LOG_ERR, "RRDCacheD client support not available in your SysDB build");
325 return -1;
326 #else
327 if (oconfig_get_string(ci, &addr)) {
328 sdb_log(SDB_LOG_ERR, "RRDCacheD requires a single string argument\n"
329 "\tUsage <RRDCacheD ADDR>");
330 return -1;
331 }
332 if ((*addr != '/') && strncmp(addr, "unix:", strlen("unix:"))) {
333 /* XXX: add (optional) support for rrdc_fetch if available */
334 sdb_log(SDB_LOG_ERR, "RRDCacheD only supports local (UNIX socket) addresses");
335 return -1;
336 }
338 addr = strdup(addr);
339 if (! addr) {
340 char errbuf[1024];
341 sdb_log(SDB_LOG_ERR, "Failed to duplicate string: %s",
342 sdb_strerror(errno, errbuf, sizeof(errbuf)));
343 return -1;
344 }
345 if (ci->children_num)
346 sdb_log(SDB_LOG_WARNING, "RRDCacheD does not support any child config options");
348 ud = sdb_object_create_wrapper("rrdcached-addr", addr, free);
349 if (! ud) {
350 char errbuf[1024];
351 sdb_log(SDB_LOG_ERR, "Failed to create user-data object: %s",
352 sdb_strerror(errno, errbuf, sizeof(errbuf)));
353 free(addr);
354 return -1;
355 }
357 sdb_plugin_register_timeseries_fetcher("rrdcached", &fetcher_impl, ud);
358 sdb_plugin_register_shutdown("rrdcached", sdb_rrdcached_shutdown, NULL);
359 sdb_object_deref(ud);
360 rrdcached_in_use = 1;
361 return 0;
362 #endif
363 } /* sdb_rrd_config_rrdcached */
365 static int
366 sdb_rrd_config(oconfig_item_t *ci)
367 {
368 int i;
370 if (! ci) { /* reconfigure */
371 rrdcached_in_use = 0;
372 return 0;
373 }
375 for (i = 0; i < ci->children_num; ++i) {
376 oconfig_item_t *child = ci->children + i;
378 if (! strcasecmp(child->key, "RRDCacheD"))
379 sdb_rrd_config_rrdcached(child);
380 else
381 sdb_log(SDB_LOG_WARNING, "Ignoring unknown config option '%s'.", child->key);
382 }
383 return 0;
384 } /* sdb_rrd_config */
386 int
387 sdb_module_init(sdb_plugin_info_t *info)
388 {
389 sdb_plugin_set_info(info, SDB_PLUGIN_INFO_DESC,
390 "fetch time-series from RRD files");
391 sdb_plugin_set_info(info, SDB_PLUGIN_INFO_COPYRIGHT,
392 "Copyright (C) 2014 Sebastian 'tokkee' Harl <sh@tokkee.org>");
393 sdb_plugin_set_info(info, SDB_PLUGIN_INFO_LICENSE, "BSD");
394 sdb_plugin_set_info(info, SDB_PLUGIN_INFO_VERSION, SDB_VERSION);
395 sdb_plugin_set_info(info, SDB_PLUGIN_INFO_PLUGIN_VERSION, SDB_VERSION);
397 sdb_plugin_register_timeseries_fetcher("rrdtool", &fetcher_impl, NULL);
398 sdb_plugin_register_config(sdb_rrd_config);
399 return 0;
400 } /* sdb_module_init */
402 /* vim: set tw=78 sw=4 ts=4 noexpandtab : */