1 /*
2 * SysDB - src/core/plugin.c
3 * Copyright (C) 2012 Sebastian 'tokkee' Harl <sh@tokkee.org>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 *
15 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
16 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
17 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR
19 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
28 #if HAVE_CONFIG_H
29 # include "config.h"
30 #endif /* HAVE_CONFIG_H */
32 #include "sysdb.h"
33 #include "core/plugin.h"
34 #include "core/time.h"
35 #include "utils/error.h"
36 #include "utils/llist.h"
37 #include "utils/strbuf.h"
39 #include <assert.h>
41 #include <errno.h>
43 #include <stdarg.h>
44 #include <stdio.h>
45 #include <string.h>
46 #include <strings.h>
47 #include <unistd.h>
49 #include <ltdl.h>
51 #include <pthread.h>
53 /* helper to access info attributes */
54 #define INFO_GET(i, attr) \
55 ((i)->attr ? (i)->attr : #attr" not set")
57 /*
58 * private data types
59 */
61 typedef struct {
62 sdb_object_t super;
63 sdb_plugin_ctx_t public;
65 sdb_plugin_info_t info;
66 lt_dlhandle handle;
68 /* The usage count differs from the object's ref count
69 * in that it provides higher level information about how
70 * the plugin is in use. */
71 size_t use_cnt;
72 } ctx_t;
73 #define CTX_INIT { SDB_OBJECT_INIT, \
74 SDB_PLUGIN_CTX_INIT, SDB_PLUGIN_INFO_INIT, NULL, 0 }
76 #define CTX(obj) ((ctx_t *)(obj))
78 typedef struct {
79 sdb_object_t super;
80 void *cb_callback;
81 sdb_object_t *cb_user_data;
82 ctx_t *cb_ctx;
83 } callback_t;
84 #define CB_INIT { SDB_OBJECT_INIT, \
85 /* callback = */ NULL, /* user_data = */ NULL, \
86 SDB_PLUGIN_CTX_INIT }
87 #define CB(obj) ((callback_t *)(obj))
88 #define CONST_CB(obj) ((const callback_t *)(obj))
90 typedef struct {
91 callback_t super;
92 #define ccb_callback super.cb_callback
93 #define ccb_user_data super.cb_user_data
94 #define ccb_ctx super.cb_ctx
95 sdb_time_t ccb_interval;
96 sdb_time_t ccb_next_update;
97 } collector_t;
98 #define CCB(obj) ((collector_t *)(obj))
99 #define CONST_CCB(obj) ((const collector_t *)(obj))
101 typedef struct {
102 callback_t super; /* cb_callback will always be NULL */
103 #define w_user_data super.cb_user_data
104 #define w_ctx super.cb_ctx
105 sdb_store_writer_t impl;
106 } writer_t;
107 #define WRITER(obj) ((writer_t *)(obj))
109 typedef struct {
110 callback_t super; /* cb_callback will always be NULL */
111 #define r_user_data super.cb_user_data
112 #define r_ctx super.cb_ctx
113 sdb_store_reader_t impl;
114 } reader_t;
115 #define READER(obj) ((reader_t *)(obj))
117 typedef struct {
118 callback_t super; /* cb_callback will always be NULL */
119 #define ts_user_data super.cb_user_data
120 #define ts_ctx super.cb_ctx
121 sdb_timeseries_fetcher_t impl;
122 } ts_fetcher_t;
123 #define TS_FETCHER(obj) ((ts_fetcher_t *)(obj))
125 /*
126 * private variables
127 */
129 static sdb_plugin_ctx_t plugin_default_ctx = SDB_PLUGIN_CTX_INIT;
130 static sdb_plugin_info_t plugin_default_info = SDB_PLUGIN_INFO_INIT;
132 static pthread_key_t plugin_ctx_key;
133 static bool plugin_ctx_key_initialized = 0;
135 /* a list of the plugin contexts of all registered plugins */
136 static sdb_llist_t *all_plugins = NULL;
138 static sdb_llist_t *config_list = NULL;
139 static sdb_llist_t *init_list = NULL;
140 static sdb_llist_t *collector_list = NULL;
141 static sdb_llist_t *cname_list = NULL;
142 static sdb_llist_t *shutdown_list = NULL;
143 static sdb_llist_t *log_list = NULL;
144 static sdb_llist_t *timeseries_fetcher_list = NULL;
145 static sdb_llist_t *writer_list = NULL;
146 static sdb_llist_t *reader_list = NULL;
148 static struct {
149 const char *type;
150 sdb_llist_t **list;
151 } all_lists[] = {
152 { "config", &config_list },
153 { "init", &init_list },
154 { "collector", &collector_list },
155 { "cname", &cname_list },
156 { "shutdown", &shutdown_list },
157 { "log", &log_list },
158 { "timeseries fetcher", ×eries_fetcher_list },
159 { "store writer", &writer_list },
160 { "store reader", &reader_list },
161 };
163 /*
164 * private helper functions
165 */
167 static void
168 plugin_info_clear(sdb_plugin_info_t *info)
169 {
170 sdb_plugin_info_t empty_info = SDB_PLUGIN_INFO_INIT;
171 if (! info)
172 return;
174 if (info->plugin_name)
175 free(info->plugin_name);
176 if (info->filename)
177 free(info->filename);
179 if (info->description)
180 free(info->description);
181 if (info->copyright)
182 free(info->copyright);
183 if (info->license)
184 free(info->license);
186 *info = empty_info;
187 } /* plugin_info_clear */
189 static void
190 ctx_key_init(void)
191 {
192 if (plugin_ctx_key_initialized)
193 return;
195 pthread_key_create(&plugin_ctx_key, /* destructor */ NULL);
196 plugin_ctx_key_initialized = 1;
197 } /* ctx_key_init */
199 static int
200 plugin_cmp_next_update(const sdb_object_t *a, const sdb_object_t *b)
201 {
202 const collector_t *ccb1 = (const collector_t *)a;
203 const collector_t *ccb2 = (const collector_t *)b;
205 assert(ccb1 && ccb2);
207 return (ccb1->ccb_next_update > ccb2->ccb_next_update)
208 ? 1 : (ccb1->ccb_next_update < ccb2->ccb_next_update)
209 ? -1 : 0;
210 } /* plugin_cmp_next_update */
212 static int
213 plugin_lookup_by_name(const sdb_object_t *obj, const void *id)
214 {
215 const callback_t *cb = CONST_CB(obj);
216 const char *name = id;
218 assert(cb && id);
220 /* when a plugin was registered from outside a plugin (e.g. the core),
221 * we don't have a plugin context */
222 if (! cb->cb_ctx)
223 return 1;
225 if (!strcasecmp(cb->cb_ctx->info.plugin_name, name))
226 return 0;
227 return 1;
228 } /* plugin_lookup_by_name */
230 /* since this function is called from sdb_plugin_reconfigure_finish()
231 * when iterating through all_plugins, we may not do any additional
232 * modifications to all_plugins except for the optional removal */
233 static void
234 plugin_unregister_by_name(const char *plugin_name)
235 {
236 sdb_object_t *obj;
237 size_t i;
239 for (i = 0; i < SDB_STATIC_ARRAY_LEN(all_lists); ++i) {
240 const char *type = all_lists[i].type;
241 sdb_llist_t *list = *all_lists[i].list;
243 while (1) {
244 callback_t *cb;
246 cb = CB(sdb_llist_remove(list,
247 plugin_lookup_by_name, plugin_name));
248 if (! cb)
249 break;
251 assert(cb->cb_ctx);
253 sdb_log(SDB_LOG_INFO, "core: Unregistering "
254 "%s callback '%s' (module %s)", type, cb->super.name,
255 cb->cb_ctx->info.plugin_name);
256 sdb_object_deref(SDB_OBJ(cb));
257 }
258 }
260 obj = sdb_llist_search_by_name(all_plugins, plugin_name);
261 /* when called from sdb_plugin_reconfigure_finish, the object has already
262 * been removed from the list */
263 if (obj && (obj->ref_cnt <= 1)) {
264 sdb_llist_remove_by_name(all_plugins, plugin_name);
265 sdb_object_deref(obj);
266 }
267 /* else: other callbacks still reference it */
268 } /* plugin_unregister_by_name */
270 /*
271 * store writer wrapper for performing database queries:
272 * It wraps another store writer, adding extra logic as needed.
273 */
275 typedef struct {
276 sdb_object_t super;
277 sdb_store_writer_t *w;
278 sdb_object_t *ud;
279 sdb_query_opts_t opts;
280 } query_writer_t;
281 #define QUERY_WRITER_INIT(w, ud) { \
282 SDB_OBJECT_INIT, \
283 (w), (ud), \
284 SDB_DEFAULT_QUERY_OPTS \
285 }
286 #define QUERY_WRITER(obj) ((query_writer_t *)(obj))
288 static int
289 query_store_host(sdb_store_host_t *host, sdb_object_t *user_data)
290 {
291 query_writer_t *qw = QUERY_WRITER(user_data);
292 return qw->w->store_host(host, qw->ud);
293 } /* query_store_host */
295 static int
296 query_store_service(sdb_store_service_t *service, sdb_object_t *user_data)
297 {
298 query_writer_t *qw = QUERY_WRITER(user_data);
299 return qw->w->store_service(service, qw->ud);
300 } /* query_store_service */
302 static int
303 query_store_metric(sdb_store_metric_t *metric, sdb_object_t *user_data)
304 {
305 query_writer_t *qw = QUERY_WRITER(user_data);
306 sdb_timeseries_info_t *infos[metric->stores_num];
307 sdb_metric_store_t stores[metric->stores_num];
309 const sdb_metric_store_t *orig_stores = metric->stores;
310 int status;
311 size_t i;
313 if (! qw->opts.describe_timeseries)
314 /* nothing further to do */
315 return qw->w->store_metric(metric, qw->ud);
317 for (i = 0; i < metric->stores_num; i++) {
318 sdb_metric_store_t *s = stores + i;
319 *s = metric->stores[i];
320 infos[i] = sdb_plugin_describe_timeseries(s->type, s->id);
321 s->info = infos[i];
322 }
324 metric->stores = stores;
325 status = qw->w->store_metric(metric, qw->ud);
326 metric->stores = orig_stores;
328 for (i = 0; i < metric->stores_num; i++)
329 sdb_timeseries_info_destroy(infos[i]);
330 return status;
331 } /* query_store_metric */
333 static int
334 query_store_attribute(sdb_store_attribute_t *attr, sdb_object_t *user_data)
335 {
336 query_writer_t *qw = QUERY_WRITER(user_data);
337 return qw->w->store_attribute(attr, qw->ud);
338 } /* query_store_attribute */
340 static sdb_store_writer_t query_writer = {
341 query_store_host, query_store_service,
342 query_store_metric, query_store_attribute,
343 };
345 /*
346 * private types
347 */
349 static int
350 ctx_init(sdb_object_t *obj, va_list __attribute__((unused)) ap)
351 {
352 ctx_t *ctx = CTX(obj);
354 assert(ctx);
356 ctx->public = plugin_default_ctx;
357 ctx->info = plugin_default_info;
358 ctx->handle = NULL;
359 ctx->use_cnt = 1;
360 return 0;
361 } /* ctx_init */
363 static void
364 ctx_destroy(sdb_object_t *obj)
365 {
366 ctx_t *ctx = CTX(obj);
368 if (ctx->handle) {
369 const char *err;
371 sdb_log(SDB_LOG_INFO, "core: Unloading module %s",
372 ctx->info.plugin_name);
374 lt_dlerror();
375 lt_dlclose(ctx->handle);
376 if ((err = lt_dlerror()))
377 sdb_log(SDB_LOG_WARNING, "core: Failed to unload module %s: %s",
378 ctx->info.plugin_name, err);
379 }
381 plugin_info_clear(&ctx->info);
382 } /* ctx_destroy */
384 static sdb_type_t ctx_type = {
385 sizeof(ctx_t),
387 ctx_init,
388 ctx_destroy
389 };
391 static ctx_t *
392 ctx_get(void)
393 {
394 if (! plugin_ctx_key_initialized)
395 ctx_key_init();
396 return pthread_getspecific(plugin_ctx_key);
397 } /* ctx_get */
399 static ctx_t *
400 ctx_set(ctx_t *new)
401 {
402 ctx_t *old;
404 if (! plugin_ctx_key_initialized)
405 ctx_key_init();
407 old = pthread_getspecific(plugin_ctx_key);
408 if (old)
409 sdb_object_deref(SDB_OBJ(old));
410 if (new)
411 sdb_object_ref(SDB_OBJ(new));
412 pthread_setspecific(plugin_ctx_key, new);
413 return old;
414 } /* ctx_set */
416 static ctx_t *
417 ctx_create(const char *name)
418 {
419 ctx_t *ctx;
421 ctx = CTX(sdb_object_create(name, ctx_type));
422 if (! ctx)
423 return NULL;
425 if (! plugin_ctx_key_initialized)
426 ctx_key_init();
427 ctx_set(ctx);
428 return ctx;
429 } /* ctx_create */
431 /*
432 * plugin_init_ok:
433 * Checks whether the registration of a new plugin identified by 'obj' is
434 * okay. It consumes the first two arguments of 'ap'.
435 */
436 static bool
437 plugin_init_ok(sdb_object_t *obj, va_list ap)
438 {
439 sdb_llist_t **list = va_arg(ap, sdb_llist_t **);
440 const char *type = va_arg(ap, const char *);
442 assert(list); assert(type);
444 if (sdb_llist_search_by_name(*list, obj->name)) {
445 sdb_log(SDB_LOG_WARNING, "core: %s callback '%s' "
446 "has already been registered. Ignoring newly "
447 "registered version.", type, obj->name);
448 return 0;
449 }
450 return 1;
451 } /* plugin_init_ok */
453 static int
454 plugin_cb_init(sdb_object_t *obj, va_list ap)
455 {
456 void *callback;
457 sdb_object_t *ud;
459 if (! plugin_init_ok(obj, ap))
460 return -1;
462 callback = va_arg(ap, void *);
463 ud = va_arg(ap, sdb_object_t *);
465 /* cb_ctx may be NULL if the plugin was not registered by a plugin */
467 CB(obj)->cb_callback = callback;
468 CB(obj)->cb_ctx = ctx_get();
469 sdb_object_ref(SDB_OBJ(CB(obj)->cb_ctx));
471 sdb_object_ref(ud);
472 CB(obj)->cb_user_data = ud;
473 return 0;
474 } /* plugin_cb_init */
476 static void
477 plugin_cb_destroy(sdb_object_t *obj)
478 {
479 assert(obj);
480 sdb_object_deref(CB(obj)->cb_user_data);
481 sdb_object_deref(SDB_OBJ(CB(obj)->cb_ctx));
482 } /* plugin_cb_destroy */
484 static sdb_type_t callback_type = {
485 sizeof(callback_t),
487 plugin_cb_init,
488 plugin_cb_destroy
489 };
491 static sdb_type_t collector_type = {
492 sizeof(collector_t),
494 plugin_cb_init,
495 plugin_cb_destroy
496 };
498 static int
499 plugin_writer_init(sdb_object_t *obj, va_list ap)
500 {
501 sdb_store_writer_t *impl;
502 sdb_object_t *ud;
504 if (! plugin_init_ok(obj, ap))
505 return -1;
507 impl = va_arg(ap, sdb_store_writer_t *);
508 ud = va_arg(ap, sdb_object_t *);
509 assert(impl);
511 if ((! impl->store_host) || (! impl->store_service)
512 || (! impl->store_metric) || (! impl->store_attribute)) {
513 sdb_log(SDB_LOG_ERR, "core: store writer callback '%s' "
514 "does not fully implement the writer interface.",
515 obj->name);
516 return -1;
517 }
519 /* ctx may be NULL if the callback was not registered by a plugin */
521 WRITER(obj)->impl = *impl;
522 WRITER(obj)->w_ctx = ctx_get();
523 sdb_object_ref(SDB_OBJ(WRITER(obj)->w_ctx));
525 sdb_object_ref(ud);
526 WRITER(obj)->w_user_data = ud;
527 return 0;
528 } /* plugin_writer_init */
530 static void
531 plugin_writer_destroy(sdb_object_t *obj)
532 {
533 assert(obj);
534 sdb_object_deref(WRITER(obj)->w_user_data);
535 sdb_object_deref(SDB_OBJ(WRITER(obj)->w_ctx));
536 } /* plugin_writer_destroy */
538 static sdb_type_t writer_type = {
539 sizeof(writer_t),
541 plugin_writer_init,
542 plugin_writer_destroy
543 };
545 static int
546 plugin_reader_init(sdb_object_t *obj, va_list ap)
547 {
548 sdb_store_reader_t *impl;
549 sdb_object_t *ud;
551 if (! plugin_init_ok(obj, ap))
552 return -1;
554 impl = va_arg(ap, sdb_store_reader_t *);
555 ud = va_arg(ap, sdb_object_t *);
556 assert(impl);
558 if ((! impl->prepare_query) || (! impl->execute_query)) {
559 sdb_log(SDB_LOG_ERR, "core: store reader callback '%s' "
560 "does not fully implement the reader interface.",
561 obj->name);
562 return -1;
563 }
565 /* ctx may be NULL if the callback was not registered by a plugin */
567 READER(obj)->impl = *impl;
568 READER(obj)->r_ctx = ctx_get();
569 sdb_object_ref(SDB_OBJ(READER(obj)->r_ctx));
571 sdb_object_ref(ud);
572 READER(obj)->r_user_data = ud;
573 return 0;
574 } /* plugin_reader_init */
576 static void
577 plugin_reader_destroy(sdb_object_t *obj)
578 {
579 assert(obj);
580 sdb_object_deref(READER(obj)->r_user_data);
581 sdb_object_deref(SDB_OBJ(READER(obj)->r_ctx));
582 } /* plugin_reader_destroy */
584 static sdb_type_t reader_type = {
585 sizeof(reader_t),
587 plugin_reader_init,
588 plugin_reader_destroy
589 };
591 static int
592 plugin_ts_fetcher_init(sdb_object_t *obj, va_list ap)
593 {
594 sdb_timeseries_fetcher_t *impl;
595 sdb_object_t *ud;
597 if (! plugin_init_ok(obj, ap))
598 return -1;
600 impl = va_arg(ap, sdb_timeseries_fetcher_t *);
601 ud = va_arg(ap, sdb_object_t *);
602 assert(impl);
604 if ((! impl->describe) || (! impl->fetch)) {
605 sdb_log(SDB_LOG_ERR, "core: timeseries fetcher callback '%s' "
606 "does not fully implement the interface.",
607 obj->name);
608 return -1;
609 }
611 /* ctx may be NULL if the callback was not registered by a plugin */
613 TS_FETCHER(obj)->impl = *impl;
614 TS_FETCHER(obj)->ts_ctx = ctx_get();
615 sdb_object_ref(SDB_OBJ(TS_FETCHER(obj)->ts_ctx));
617 sdb_object_ref(ud);
618 TS_FETCHER(obj)->ts_user_data = ud;
619 return 0;
620 } /* plugin_ts_fetcher_init */
622 static void
623 plugin_ts_fetcher_destroy(sdb_object_t *obj)
624 {
625 assert(obj);
626 sdb_object_deref(TS_FETCHER(obj)->ts_user_data);
627 sdb_object_deref(SDB_OBJ(TS_FETCHER(obj)->ts_ctx));
628 } /* plugin_ts_fetcher_destroy */
630 static sdb_type_t ts_fetcher_type = {
631 sizeof(ts_fetcher_t),
633 plugin_ts_fetcher_init,
634 plugin_ts_fetcher_destroy
635 };
637 static int
638 module_init(const char *name, lt_dlhandle lh, sdb_plugin_info_t *info)
639 {
640 int (*mod_init)(sdb_plugin_info_t *);
641 int status;
643 mod_init = (int (*)(sdb_plugin_info_t *))lt_dlsym(lh, "sdb_module_init");
644 if (! mod_init) {
645 sdb_log(SDB_LOG_ERR, "core: Failed to load plugin '%s': "
646 "could not find symbol 'sdb_module_init'", name);
647 return -1;
648 }
650 status = mod_init(info);
651 if (status) {
652 sdb_log(SDB_LOG_ERR, "core: Failed to initialize "
653 "module '%s'", name);
654 plugin_unregister_by_name(name);
655 return -1;
656 }
657 return 0;
658 } /* module_init */
660 static int
661 module_load(const char *basedir, const char *name,
662 const sdb_plugin_ctx_t *plugin_ctx)
663 {
664 char base_name[name ? strlen(name) + 1 : 1];
665 const char *name_ptr;
666 char *tmp;
668 char filename[1024];
669 lt_dlhandle lh;
671 ctx_t *ctx;
673 int status;
675 assert(name);
677 base_name[0] = '\0';
678 name_ptr = name;
680 while ((tmp = strstr(name_ptr, "::"))) {
681 strncat(base_name, name_ptr, (size_t)(tmp - name_ptr));
682 strcat(base_name, "/");
683 name_ptr = tmp + strlen("::");
684 }
685 strcat(base_name, name_ptr);
687 if (! basedir)
688 basedir = PKGLIBDIR;
690 snprintf(filename, sizeof(filename), "%s/%s.so", basedir, base_name);
691 filename[sizeof(filename) - 1] = '\0';
693 if (access(filename, R_OK)) {
694 char errbuf[1024];
695 sdb_log(SDB_LOG_ERR, "core: Failed to load plugin '%s' (%s): %s",
696 name, filename, sdb_strerror(errno, errbuf, sizeof(errbuf)));
697 return -1;
698 }
700 lt_dlinit();
701 lt_dlerror();
703 lh = lt_dlopen(filename);
704 if (! lh) {
705 sdb_log(SDB_LOG_ERR, "core: Failed to load plugin '%s': %s"
706 "The most common cause for this problem are missing "
707 "dependencies.\n", name, lt_dlerror());
708 return -1;
709 }
711 if (ctx_get())
712 sdb_log(SDB_LOG_WARNING, "core: Discarding old plugin context");
714 ctx = ctx_create(name);
715 if (! ctx) {
716 sdb_log(SDB_LOG_ERR, "core: Failed to initialize plugin context");
717 return -1;
718 }
720 ctx->info.plugin_name = strdup(name);
721 ctx->info.filename = strdup(filename);
722 ctx->handle = lh;
724 if (plugin_ctx)
725 ctx->public = *plugin_ctx;
727 if ((status = module_init(name, lh, &ctx->info))) {
728 sdb_object_deref(SDB_OBJ(ctx));
729 return status;
730 }
732 /* compare minor version */
733 if ((ctx->info.version < 0)
734 || ((int)(ctx->info.version / 100) != (int)(SDB_VERSION / 100)))
735 sdb_log(SDB_LOG_WARNING, "core: WARNING: version of "
736 "plugin '%s' (%i.%i.%i) does not match our version "
737 "(%i.%i.%i); this might cause problems",
738 name, SDB_VERSION_DECODE(ctx->info.version),
739 SDB_VERSION_DECODE(SDB_VERSION));
741 sdb_llist_append(all_plugins, SDB_OBJ(ctx));
743 sdb_log(SDB_LOG_INFO, "core: Successfully loaded "
744 "plugin %s v%i (%s)", ctx->info.plugin_name,
745 ctx->info.plugin_version,
746 INFO_GET(&ctx->info, description));
747 sdb_log(SDB_LOG_INFO, "core: Plugin %s: %s, License: %s",
748 ctx->info.plugin_name,
749 INFO_GET(&ctx->info, copyright),
750 INFO_GET(&ctx->info, license));
752 /* any registered callbacks took ownership of the context */
753 sdb_object_deref(SDB_OBJ(ctx));
755 /* reset */
756 ctx_set(NULL);
757 return 0;
758 } /* module_load */
760 static char *
761 plugin_get_name(const char *name, char *buf, size_t bufsize)
762 {
763 ctx_t *ctx = ctx_get();
765 if (ctx)
766 snprintf(buf, bufsize, "%s::%s", ctx->info.plugin_name, name);
767 else
768 snprintf(buf, bufsize, "core::%s", name);
769 return buf;
770 } /* plugin_get_name */
772 static int
773 plugin_add_impl(sdb_llist_t **list, sdb_type_t T, const char *type,
774 const char *name, void *impl, sdb_object_t *user_data)
775 {
776 sdb_object_t *obj;
778 if ((! name) || (! impl))
779 return -1;
781 assert(list);
783 if (! *list)
784 *list = sdb_llist_create();
785 if (! *list)
786 return -1;
788 obj = sdb_object_create(name, T, list, type, impl, user_data);
789 if (! obj)
790 return -1;
792 if (sdb_llist_append(*list, obj)) {
793 sdb_object_deref(obj);
794 return -1;
795 }
797 /* pass control to the list */
798 sdb_object_deref(obj);
800 sdb_log(SDB_LOG_INFO, "core: Registered %s callback '%s'.",
801 type, name);
802 return 0;
803 } /* plugin_add_impl */
805 /*
806 * object meta-data
807 */
809 typedef struct {
810 int obj_type;
811 sdb_time_t last_update;
812 sdb_time_t interval;
813 } interval_fetcher_t;
815 static int
816 interval_fetcher_host(sdb_store_host_t *host, sdb_object_t *user_data)
817 {
818 interval_fetcher_t *lu = SDB_OBJ_WRAPPER(user_data)->data;
819 lu->obj_type = SDB_HOST;
820 lu->last_update = host->last_update;
821 return 0;
822 } /* interval_fetcher_host */
824 static int
825 interval_fetcher_service(sdb_store_service_t *svc, sdb_object_t *user_data)
826 {
827 interval_fetcher_t *lu = SDB_OBJ_WRAPPER(user_data)->data;
828 lu->obj_type = SDB_SERVICE;
829 lu->last_update = svc->last_update;
830 return 0;
831 } /* interval_fetcher_service */
833 static int
834 interval_fetcher_metric(sdb_store_metric_t *metric, sdb_object_t *user_data)
835 {
836 interval_fetcher_t *lu = SDB_OBJ_WRAPPER(user_data)->data;
837 lu->obj_type = SDB_METRIC;
838 lu->last_update = metric->last_update;
839 return 0;
840 } /* interval_fetcher_metric */
842 static int
843 interval_fetcher_attr(sdb_store_attribute_t *attr, sdb_object_t *user_data)
844 {
845 interval_fetcher_t *lu = SDB_OBJ_WRAPPER(user_data)->data;
846 lu->obj_type = SDB_ATTRIBUTE;
847 lu->last_update = attr->last_update;
848 return 0;
849 } /* interval_fetcher_attr */
851 static sdb_store_writer_t interval_fetcher = {
852 interval_fetcher_host, interval_fetcher_service,
853 interval_fetcher_metric, interval_fetcher_attr,
854 };
856 static int
857 get_interval(int obj_type, const char *hostname,
858 int parent_type, const char *parent, const char *name,
859 sdb_time_t last_update, sdb_time_t *interval_out)
860 {
861 sdb_ast_fetch_t fetch = SDB_AST_FETCH_INIT;
862 char hn[hostname ? strlen(hostname) + 1 : 1];
863 char pn[parent ? strlen(parent) + 1 : 1];
864 char n[strlen(name) + 1];
865 int status;
867 interval_fetcher_t lu = { 0, 0, 0 };
868 sdb_object_wrapper_t obj = SDB_OBJECT_WRAPPER_STATIC(&lu);
869 sdb_time_t interval;
871 assert(name);
873 if (hostname)
874 strncpy(hn, hostname, sizeof(hn));
875 if (parent)
876 strncpy(pn, parent, sizeof(pn));
877 strncpy(n, name, sizeof(n));
879 fetch.obj_type = obj_type;
880 fetch.hostname = hostname ? hn : NULL;
881 fetch.parent_type = parent_type;
882 fetch.parent = parent ? pn : NULL;
883 fetch.name = n;
885 status = sdb_plugin_query(SDB_AST_NODE(&fetch),
886 &interval_fetcher, SDB_OBJ(&obj), NULL, NULL);
887 if ((status < 0) || (lu.obj_type != obj_type) || (lu.last_update == 0)) {
888 *interval_out = 0;
889 return 0;
890 }
892 if (lu.last_update >= last_update) {
893 if (lu.last_update > last_update)
894 sdb_log(SDB_LOG_DEBUG, "memstore: Cannot update %s '%s' - "
895 "value too old (%"PRIsdbTIME" < %"PRIsdbTIME")",
896 SDB_STORE_TYPE_TO_NAME(obj_type), name,
897 lu.last_update, last_update);
898 *interval_out = lu.interval;
899 return 1;
900 }
902 interval = last_update - lu.last_update;
903 if (lu.interval && interval)
904 interval = (sdb_time_t)((0.9 * (double)lu.interval)
905 + (0.1 * (double)interval));
906 *interval_out = interval;
907 return 0;
908 } /* get_interval */
910 static void
911 get_backend(char **backends, size_t *backends_num)
912 {
913 const sdb_plugin_info_t *info;
915 info = sdb_plugin_current();
916 if ((! info) || (! info->plugin_name) || (! *info->plugin_name)) {
917 *backends_num = 0;
918 return;
919 }
921 backends[0] = info->plugin_name;
922 *backends_num = 1;
923 } /* get_backend */
925 /*
926 * public API
927 */
929 int
930 sdb_plugin_load(const char *basedir, const char *name,
931 const sdb_plugin_ctx_t *plugin_ctx)
932 {
933 ctx_t *ctx;
935 int status;
937 if ((! name) || (! *name))
938 return -1;
940 if (! all_plugins) {
941 if (! (all_plugins = sdb_llist_create())) {
942 sdb_log(SDB_LOG_ERR, "core: Failed to load plugin '%s': "
943 "internal error while creating linked list", name);
944 return -1;
945 }
946 }
948 ctx = CTX(sdb_llist_search_by_name(all_plugins, name));
949 if (ctx) {
950 /* plugin already loaded */
951 if (! ctx->use_cnt) {
952 /* reloading plugin */
953 ctx_t *old_ctx = ctx_set(ctx);
955 status = module_init(ctx->info.plugin_name, ctx->handle, NULL);
956 if (status)
957 return status;
959 sdb_log(SDB_LOG_INFO, "core: Successfully reloaded plugin "
960 "'%s' (%s)", ctx->info.plugin_name,
961 INFO_GET(&ctx->info, description));
962 ctx_set(old_ctx);
963 }
964 ++ctx->use_cnt;
965 return 0;
966 }
968 return module_load(basedir, name, plugin_ctx);
969 } /* sdb_plugin_load */
971 int
972 sdb_plugin_set_info(sdb_plugin_info_t *info, int type, ...)
973 {
974 va_list ap;
976 if (! info)
977 return -1;
979 va_start(ap, type);
981 switch (type) {
982 case SDB_PLUGIN_INFO_DESC:
983 {
984 char *desc = va_arg(ap, char *);
985 if (desc) {
986 if (info->description)
987 free(info->description);
988 info->description = strdup(desc);
989 }
990 }
991 break;
992 case SDB_PLUGIN_INFO_COPYRIGHT:
993 {
994 char *copyright = va_arg(ap, char *);
995 if (copyright)
996 info->copyright = strdup(copyright);
997 }
998 break;
999 case SDB_PLUGIN_INFO_LICENSE:
1000 {
1001 char *license = va_arg(ap, char *);
1002 if (license) {
1003 if (info->license)
1004 free(info->license);
1005 info->license = strdup(license);
1006 }
1007 }
1008 break;
1009 case SDB_PLUGIN_INFO_VERSION:
1010 {
1011 int version = va_arg(ap, int);
1012 info->version = version;
1013 }
1014 break;
1015 case SDB_PLUGIN_INFO_PLUGIN_VERSION:
1016 {
1017 int version = va_arg(ap, int);
1018 info->plugin_version = version;
1019 }
1020 break;
1021 default:
1022 va_end(ap);
1023 return -1;
1024 }
1026 va_end(ap);
1027 return 0;
1028 } /* sdb_plugin_set_info */
1030 int
1031 sdb_plugin_register_config(sdb_plugin_config_cb callback)
1032 {
1033 ctx_t *ctx = ctx_get();
1035 if (! ctx) {
1036 sdb_log(SDB_LOG_ERR, "core: Invalid attempt to register a "
1037 "config callback from outside a plugin");
1038 return -1;
1039 }
1040 return plugin_add_impl(&config_list, callback_type, "config",
1041 ctx->info.plugin_name, (void *)callback, NULL);
1042 } /* sdb_plugin_register_config */
1044 int
1045 sdb_plugin_register_init(const char *name, sdb_plugin_init_cb callback,
1046 sdb_object_t *user_data)
1047 {
1048 char cb_name[1024];
1049 return plugin_add_impl(&init_list, callback_type, "init",
1050 plugin_get_name(name, cb_name, sizeof(cb_name)),
1051 (void *)callback, user_data);
1052 } /* sdb_plugin_register_init */
1054 int
1055 sdb_plugin_register_shutdown(const char *name, sdb_plugin_shutdown_cb callback,
1056 sdb_object_t *user_data)
1057 {
1058 char cb_name[1024];
1059 return plugin_add_impl(&shutdown_list, callback_type, "shutdown",
1060 plugin_get_name(name, cb_name, sizeof(cb_name)),
1061 (void *)callback, user_data);
1062 } /* sdb_plugin_register_shutdown */
1064 int
1065 sdb_plugin_register_log(const char *name, sdb_plugin_log_cb callback,
1066 sdb_object_t *user_data)
1067 {
1068 char cb_name[1024];
1069 return plugin_add_impl(&log_list, callback_type, "log",
1070 plugin_get_name(name, cb_name, sizeof(cb_name)),
1071 callback, user_data);
1072 } /* sdb_plugin_register_log */
1074 int
1075 sdb_plugin_register_cname(const char *name, sdb_plugin_cname_cb callback,
1076 sdb_object_t *user_data)
1077 {
1078 char cb_name[1024];
1079 return plugin_add_impl(&cname_list, callback_type, "cname",
1080 plugin_get_name(name, cb_name, sizeof(cb_name)),
1081 callback, user_data);
1082 } /* sdb_plugin_register_cname */
1084 int
1085 sdb_plugin_register_collector(const char *name, sdb_plugin_collector_cb callback,
1086 const sdb_time_t *interval, sdb_object_t *user_data)
1087 {
1088 char cb_name[1024];
1089 sdb_object_t *obj;
1091 if ((! name) || (! callback))
1092 return -1;
1094 if (! collector_list)
1095 collector_list = sdb_llist_create();
1096 if (! collector_list)
1097 return -1;
1099 plugin_get_name(name, cb_name, sizeof(cb_name));
1101 obj = sdb_object_create(cb_name, collector_type,
1102 &collector_list, "collector", callback, user_data);
1103 if (! obj)
1104 return -1;
1106 if (interval)
1107 CCB(obj)->ccb_interval = *interval;
1108 else {
1109 ctx_t *ctx = ctx_get();
1111 if (! ctx) {
1112 sdb_log(SDB_LOG_ERR, "core: Cannot determine interval "
1113 "for collector %s; none specified and no plugin "
1114 "context found", cb_name);
1115 return -1;
1116 }
1118 CCB(obj)->ccb_interval = ctx->public.interval;
1119 }
1121 if (! (CCB(obj)->ccb_next_update = sdb_gettime())) {
1122 char errbuf[1024];
1123 sdb_log(SDB_LOG_ERR, "core: Failed to determine current "
1124 "time: %s", sdb_strerror(errno, errbuf, sizeof(errbuf)));
1125 sdb_object_deref(obj);
1126 return -1;
1127 }
1129 if (sdb_llist_insert_sorted(collector_list, obj,
1130 plugin_cmp_next_update)) {
1131 sdb_object_deref(obj);
1132 return -1;
1133 }
1135 /* pass control to the list */
1136 sdb_object_deref(obj);
1138 sdb_log(SDB_LOG_INFO, "core: Registered collector callback '%s' "
1139 "(interval = %.3fs).", cb_name,
1140 SDB_TIME_TO_DOUBLE(CCB(obj)->ccb_interval));
1141 return 0;
1142 } /* sdb_plugin_register_collector */
1144 int
1145 sdb_plugin_register_timeseries_fetcher(const char *name,
1146 sdb_timeseries_fetcher_t *fetcher, sdb_object_t *user_data)
1147 {
1148 return plugin_add_impl(×eries_fetcher_list, ts_fetcher_type, "time-series fetcher",
1149 name, fetcher, user_data);
1150 } /* sdb_plugin_register_timeseries_fetcher */
1152 int
1153 sdb_plugin_register_writer(const char *name,
1154 sdb_store_writer_t *writer, sdb_object_t *user_data)
1155 {
1156 char cb_name[1024];
1157 return plugin_add_impl(&writer_list, writer_type, "store writer",
1158 plugin_get_name(name, cb_name, sizeof(cb_name)),
1159 writer, user_data);
1160 } /* sdb_store_register_writer */
1162 int
1163 sdb_plugin_register_reader(const char *name,
1164 sdb_store_reader_t *reader, sdb_object_t *user_data)
1165 {
1166 char cb_name[1024];
1167 return plugin_add_impl(&reader_list, reader_type, "store reader",
1168 plugin_get_name(name, cb_name, sizeof(cb_name)),
1169 reader, user_data);
1170 } /* sdb_plugin_register_reader */
1172 void
1173 sdb_plugin_unregister_all(void)
1174 {
1175 size_t i;
1177 for (i = 0; i < SDB_STATIC_ARRAY_LEN(all_lists); ++i) {
1178 const char *type = all_lists[i].type;
1179 sdb_llist_t *list = *all_lists[i].list;
1181 size_t len = sdb_llist_len(list);
1183 if (! len)
1184 continue;
1186 sdb_llist_clear(list);
1187 sdb_log(SDB_LOG_INFO, "core: Unregistered %zu %s callback%s",
1188 len, type, len == 1 ? "" : "s");
1189 }
1190 } /* sdb_plugin_unregister_all */
1192 sdb_plugin_ctx_t
1193 sdb_plugin_get_ctx(void)
1194 {
1195 ctx_t *c;
1197 c = ctx_get();
1198 if (! c) {
1199 sdb_log(SDB_LOG_ERR, "core: Invalid read access to plugin "
1200 "context outside a plugin");
1201 return plugin_default_ctx;
1202 }
1203 return c->public;
1204 } /* sdb_plugin_get_ctx */
1206 int
1207 sdb_plugin_set_ctx(sdb_plugin_ctx_t ctx, sdb_plugin_ctx_t *old)
1208 {
1209 ctx_t *c;
1211 c = ctx_get();
1212 if (! c) {
1213 sdb_log(SDB_LOG_ERR, "core: Invalid write access to plugin "
1214 "context outside a plugin");
1215 return -1;
1216 }
1218 if (old)
1219 *old = c->public;
1220 c->public = ctx;
1221 return 0;
1222 } /* sdb_plugin_set_ctx */
1224 const sdb_plugin_info_t *
1225 sdb_plugin_current(void)
1226 {
1227 ctx_t *ctx = ctx_get();
1229 if (! ctx)
1230 return NULL;
1231 return &ctx->info;
1232 } /* sdb_plugin_current */
1234 int
1235 sdb_plugin_configure(const char *name, oconfig_item_t *ci)
1236 {
1237 callback_t *plugin;
1238 sdb_plugin_config_cb callback;
1240 ctx_t *old_ctx;
1242 int status;
1244 if ((! name) || (! ci))
1245 return -1;
1247 plugin = CB(sdb_llist_search_by_name(config_list, name));
1248 if (! plugin) {
1249 ctx_t *ctx = CTX(sdb_llist_search_by_name(all_plugins, name));
1250 if (! ctx)
1251 sdb_log(SDB_LOG_ERR, "core: Cannot configure unknown "
1252 "plugin '%s'. Missing 'LoadPlugin \"%s\"'?",
1253 name, name);
1254 else
1255 sdb_log(SDB_LOG_ERR, "core: Plugin '%s' did not register "
1256 "a config callback.", name);
1257 errno = ENOENT;
1258 return -1;
1259 }
1261 old_ctx = ctx_set(plugin->cb_ctx);
1262 callback = (sdb_plugin_config_cb)plugin->cb_callback;
1263 status = callback(ci);
1264 ctx_set(old_ctx);
1265 return status;
1266 } /* sdb_plugin_configure */
1268 int
1269 sdb_plugin_reconfigure_init(void)
1270 {
1271 sdb_llist_iter_t *iter;
1273 iter = sdb_llist_get_iter(config_list);
1274 if (config_list && (! iter))
1275 return -1;
1277 /* deconfigure all plugins */
1278 while (sdb_llist_iter_has_next(iter)) {
1279 callback_t *plugin;
1280 sdb_plugin_config_cb callback;
1281 ctx_t *old_ctx;
1283 plugin = CB(sdb_llist_iter_get_next(iter));
1284 old_ctx = ctx_set(plugin->cb_ctx);
1285 callback = (sdb_plugin_config_cb)plugin->cb_callback;
1286 callback(NULL);
1287 ctx_set(old_ctx);
1288 }
1289 sdb_llist_iter_destroy(iter);
1291 iter = sdb_llist_get_iter(all_plugins);
1292 if (all_plugins && (! iter))
1293 return -1;
1295 /* record all plugins as being unused */
1296 while (sdb_llist_iter_has_next(iter))
1297 CTX(sdb_llist_iter_get_next(iter))->use_cnt = 0;
1298 sdb_llist_iter_destroy(iter);
1300 sdb_plugin_unregister_all();
1301 return 0;
1302 } /* sdb_plugin_reconfigure_init */
1304 int
1305 sdb_plugin_reconfigure_finish(void)
1306 {
1307 sdb_llist_iter_t *iter;
1309 iter = sdb_llist_get_iter(all_plugins);
1310 if (all_plugins && (! iter))
1311 return -1;
1313 while (sdb_llist_iter_has_next(iter)) {
1314 ctx_t *ctx = CTX(sdb_llist_iter_get_next(iter));
1315 if (ctx->use_cnt)
1316 continue;
1318 sdb_log(SDB_LOG_INFO, "core: Module %s no longer in use",
1319 ctx->info.plugin_name);
1320 sdb_llist_iter_remove_current(iter);
1321 plugin_unregister_by_name(ctx->info.plugin_name);
1322 sdb_object_deref(SDB_OBJ(ctx));
1323 }
1324 sdb_llist_iter_destroy(iter);
1325 return 0;
1326 } /* sdb_plugin_reconfigure_finish */
1328 int
1329 sdb_plugin_init_all(void)
1330 {
1331 sdb_llist_iter_t *iter;
1332 int ret = 0;
1334 iter = sdb_llist_get_iter(init_list);
1335 while (sdb_llist_iter_has_next(iter)) {
1336 callback_t *cb;
1337 sdb_plugin_init_cb callback;
1338 ctx_t *old_ctx;
1340 sdb_object_t *obj = sdb_llist_iter_get_next(iter);
1341 assert(obj);
1342 cb = CB(obj);
1344 callback = (sdb_plugin_init_cb)cb->cb_callback;
1346 old_ctx = ctx_set(cb->cb_ctx);
1347 if (callback(cb->cb_user_data)) {
1348 sdb_log(SDB_LOG_ERR, "core: Failed to initialize plugin "
1349 "'%s'. Unregistering all callbacks.", obj->name);
1350 ctx_set(old_ctx);
1351 plugin_unregister_by_name(cb->cb_ctx->info.plugin_name);
1352 ++ret;
1353 }
1354 else
1355 ctx_set(old_ctx);
1356 }
1357 sdb_llist_iter_destroy(iter);
1358 return ret;
1359 } /* sdb_plugin_init_all */
1361 int
1362 sdb_plugin_shutdown_all(void)
1363 {
1364 sdb_llist_iter_t *iter;
1365 int ret = 0;
1367 iter = sdb_llist_get_iter(shutdown_list);
1368 while (sdb_llist_iter_has_next(iter)) {
1369 callback_t *cb;
1370 sdb_plugin_shutdown_cb callback;
1371 ctx_t *old_ctx;
1373 sdb_object_t *obj = sdb_llist_iter_get_next(iter);
1374 assert(obj);
1375 cb = CB(obj);
1377 callback = (sdb_plugin_shutdown_cb)cb->cb_callback;
1379 old_ctx = ctx_set(cb->cb_ctx);
1380 if (callback(cb->cb_user_data)) {
1381 sdb_log(SDB_LOG_ERR, "core: Failed to shutdown plugin '%s'.",
1382 obj->name);
1383 ++ret;
1384 }
1385 ctx_set(old_ctx);
1386 }
1387 sdb_llist_iter_destroy(iter);
1388 return ret;
1389 } /* sdb_plugin_shutdown_all */
1391 int
1392 sdb_plugin_collector_loop(sdb_plugin_loop_t *loop)
1393 {
1394 if (! collector_list) {
1395 sdb_log(SDB_LOG_WARNING, "core: No collectors registered. "
1396 "Quiting main loop.");
1397 return -1;
1398 }
1400 if (! loop)
1401 return -1;
1403 while (loop->do_loop) {
1404 sdb_plugin_collector_cb callback;
1405 ctx_t *old_ctx;
1407 sdb_time_t interval, now;
1409 sdb_object_t *obj = sdb_llist_shift(collector_list);
1410 if (! obj)
1411 return -1;
1413 callback = (sdb_plugin_collector_cb)CCB(obj)->ccb_callback;
1415 if (! (now = sdb_gettime())) {
1416 char errbuf[1024];
1417 sdb_log(SDB_LOG_ERR, "core: Failed to determine current "
1418 "time in collector main loop: %s",
1419 sdb_strerror(errno, errbuf, sizeof(errbuf)));
1420 now = CCB(obj)->ccb_next_update;
1421 }
1423 if (now < CCB(obj)->ccb_next_update) {
1424 interval = CCB(obj)->ccb_next_update - now;
1426 errno = 0;
1427 while (loop->do_loop && sdb_sleep(interval, &interval)) {
1428 if (errno != EINTR) {
1429 char errbuf[1024];
1430 sdb_log(SDB_LOG_ERR, "core: Failed to sleep "
1431 "in collector main loop: %s",
1432 sdb_strerror(errno, errbuf, sizeof(errbuf)));
1433 sdb_llist_insert_sorted(collector_list, obj,
1434 plugin_cmp_next_update);
1435 sdb_object_deref(obj);
1436 return -1;
1437 }
1438 errno = 0;
1439 }
1441 if (! loop->do_loop) {
1442 /* put back; don't worry about errors */
1443 sdb_llist_insert_sorted(collector_list, obj,
1444 plugin_cmp_next_update);
1445 sdb_object_deref(obj);
1446 return 0;
1447 }
1448 }
1450 old_ctx = ctx_set(CCB(obj)->ccb_ctx);
1451 if (callback(CCB(obj)->ccb_user_data)) {
1452 /* XXX */
1453 }
1454 ctx_set(old_ctx);
1456 interval = CCB(obj)->ccb_interval;
1457 if (! interval)
1458 interval = loop->default_interval;
1459 if (! interval) {
1460 sdb_log(SDB_LOG_WARNING, "core: No interval configured "
1461 "for plugin '%s'; skipping any further "
1462 "iterations.", obj->name);
1463 sdb_object_deref(obj);
1464 continue;
1465 }
1467 CCB(obj)->ccb_next_update += interval;
1469 if (! (now = sdb_gettime())) {
1470 char errbuf[1024];
1471 sdb_log(SDB_LOG_ERR, "core: Failed to determine current "
1472 "time in collector main loop: %s",
1473 sdb_strerror(errno, errbuf, sizeof(errbuf)));
1474 now = CCB(obj)->ccb_next_update;
1475 }
1477 if (now > CCB(obj)->ccb_next_update) {
1478 sdb_log(SDB_LOG_WARNING, "core: Plugin '%s' took too "
1479 "long; skipping iterations to keep up.",
1480 obj->name);
1481 CCB(obj)->ccb_next_update = now;
1482 }
1484 if (sdb_llist_insert_sorted(collector_list, obj,
1485 plugin_cmp_next_update)) {
1486 sdb_log(SDB_LOG_ERR, "core: Failed to re-insert "
1487 "plugin '%s' into collector list. Unable to further "
1488 "use the plugin.",
1489 obj->name);
1490 sdb_object_deref(obj);
1491 return -1;
1492 }
1494 /* pass control back to the list */
1495 sdb_object_deref(obj);
1496 }
1497 return 0;
1498 } /* sdb_plugin_read_loop */
1500 char *
1501 sdb_plugin_cname(char *hostname)
1502 {
1503 sdb_llist_iter_t *iter;
1505 if (! hostname)
1506 return NULL;
1508 if (! cname_list)
1509 return hostname;
1511 iter = sdb_llist_get_iter(cname_list);
1512 while (sdb_llist_iter_has_next(iter)) {
1513 sdb_plugin_cname_cb callback;
1514 char *cname;
1516 sdb_object_t *obj = sdb_llist_iter_get_next(iter);
1517 assert(obj);
1519 callback = (sdb_plugin_cname_cb)CB(obj)->cb_callback;
1520 cname = callback(hostname, CB(obj)->cb_user_data);
1521 if (cname) {
1522 free(hostname);
1523 hostname = cname;
1524 }
1525 /* else: don't change hostname */
1526 }
1527 sdb_llist_iter_destroy(iter);
1528 return hostname;
1529 } /* sdb_plugin_cname */
1531 int
1532 sdb_plugin_log(int prio, const char *msg)
1533 {
1534 sdb_llist_iter_t *iter;
1535 int ret = -1;
1537 bool logged = 0;
1539 if (! msg)
1540 return 0;
1542 iter = sdb_llist_get_iter(log_list);
1543 while (sdb_llist_iter_has_next(iter)) {
1544 sdb_plugin_log_cb callback;
1545 int tmp;
1547 sdb_object_t *obj = sdb_llist_iter_get_next(iter);
1548 assert(obj);
1550 callback = (sdb_plugin_log_cb)CB(obj)->cb_callback;
1551 tmp = callback(prio, msg, CB(obj)->cb_user_data);
1552 if (tmp > ret)
1553 ret = tmp;
1555 if (CB(obj)->cb_ctx)
1556 logged = 1;
1557 /* else: this is an internally registered callback */
1558 }
1559 sdb_llist_iter_destroy(iter);
1561 if (! logged)
1562 return fprintf(stderr, "[%s] %s\n", SDB_LOG_PRIO_TO_STRING(prio), msg);
1563 return ret;
1564 } /* sdb_plugin_log */
1566 int
1567 sdb_plugin_vlogf(int prio, const char *fmt, va_list ap)
1568 {
1569 sdb_strbuf_t *buf;
1570 int ret;
1572 if (! fmt)
1573 return 0;
1575 buf = sdb_strbuf_create(64);
1576 if (! buf) {
1577 ret = fprintf(stderr, "[%s] ", SDB_LOG_PRIO_TO_STRING(prio));
1578 ret += vfprintf(stderr, fmt, ap);
1579 return ret;
1580 }
1582 if (sdb_strbuf_vsprintf(buf, fmt, ap) < 0) {
1583 sdb_strbuf_destroy(buf);
1584 return -1;
1585 }
1587 ret = sdb_plugin_log(prio, sdb_strbuf_string(buf));
1588 sdb_strbuf_destroy(buf);
1589 return ret;
1590 } /* sdb_plugin_vlogf */
1592 int
1593 sdb_plugin_logf(int prio, const char *fmt, ...)
1594 {
1595 va_list ap;
1596 int ret;
1598 if (! fmt)
1599 return 0;
1601 va_start(ap, fmt);
1602 ret = sdb_plugin_vlogf(prio, fmt, ap);
1603 va_end(ap);
1604 return ret;
1605 } /* sdb_plugin_logf */
1607 sdb_timeseries_t *
1608 sdb_plugin_fetch_timeseries(const char *type, const char *id,
1609 sdb_timeseries_opts_t *opts)
1610 {
1611 ts_fetcher_t *fetcher;
1612 sdb_timeseries_t *ts;
1614 ctx_t *old_ctx;
1616 if ((! type) || (! id) || (! opts))
1617 return NULL;
1619 fetcher = TS_FETCHER(sdb_llist_search_by_name(timeseries_fetcher_list, type));
1620 if (! fetcher) {
1621 sdb_log(SDB_LOG_ERR, "core: Cannot fetch time-series of type %s: "
1622 "no such plugin loaded", type);
1623 errno = ENOENT;
1624 return NULL;
1625 }
1627 old_ctx = ctx_set(fetcher->ts_ctx);
1628 ts = fetcher->impl.fetch(id, opts, fetcher->ts_user_data);
1629 ctx_set(old_ctx);
1630 return ts;
1631 } /* sdb_plugin_fetch_timeseries */
1633 sdb_timeseries_info_t *
1634 sdb_plugin_describe_timeseries(const char *type, const char *id)
1635 {
1636 ts_fetcher_t *fetcher;
1637 sdb_timeseries_info_t *ts_info;
1639 ctx_t *old_ctx;
1641 if ((! type) || (! id))
1642 return NULL;
1644 fetcher = TS_FETCHER(sdb_llist_search_by_name(timeseries_fetcher_list, type));
1645 if (! fetcher) {
1646 sdb_log(SDB_LOG_ERR, "core: Cannot describe time-series of type %s: "
1647 "no such plugin loaded", type);
1648 errno = ENOENT;
1649 return NULL;
1650 }
1652 old_ctx = ctx_set(fetcher->ts_ctx);
1653 ts_info = fetcher->impl.describe(id, fetcher->ts_user_data);
1654 ctx_set(old_ctx);
1655 return ts_info;
1656 } /* sdb_plugin_describe_timeseries */
1658 int
1659 sdb_plugin_query(sdb_ast_node_t *ast,
1660 sdb_store_writer_t *w, sdb_object_t *wd,
1661 sdb_query_opts_t *opts, sdb_strbuf_t *errbuf)
1662 {
1663 query_writer_t qw = QUERY_WRITER_INIT(w, wd);
1664 reader_t *reader;
1665 sdb_object_t *q;
1667 size_t n = sdb_llist_len(reader_list);
1668 int status = 0;
1670 if (! ast)
1671 return 0;
1673 if (opts)
1674 qw.opts = *opts;
1676 if ((ast->type != SDB_AST_TYPE_FETCH)
1677 && (ast->type != SDB_AST_TYPE_LIST)
1678 && (ast->type != SDB_AST_TYPE_LOOKUP)) {
1679 sdb_log(SDB_LOG_ERR, "core: Cannot execute query of type %s",
1680 SDB_AST_TYPE_TO_STRING(ast));
1681 sdb_strbuf_sprintf(errbuf, "Cannot execute query of type %s",
1682 SDB_AST_TYPE_TO_STRING(ast));
1683 return -1;
1684 }
1686 if (n != 1) {
1687 char *msg = (n > 0)
1688 ? "Cannot execute query: multiple readers not supported"
1689 : "Cannot execute query: no readers registered";
1690 sdb_strbuf_sprintf(errbuf, "%s", msg);
1691 sdb_log(SDB_LOG_ERR, "core: %s", msg);
1692 return -1;
1693 }
1695 reader = READER(sdb_llist_get(reader_list, 0));
1696 assert(reader);
1698 q = reader->impl.prepare_query(ast, errbuf, reader->r_user_data);
1699 if (q)
1700 status = reader->impl.execute_query(q, &query_writer, SDB_OBJ(&qw),
1701 errbuf, reader->r_user_data);
1702 else
1703 status = -1;
1705 sdb_object_deref(SDB_OBJ(q));
1706 sdb_object_deref(SDB_OBJ(reader));
1707 return status;
1708 } /* sdb_plugin_query */
1710 int
1711 sdb_plugin_store_host(const char *name, sdb_time_t last_update)
1712 {
1713 sdb_store_host_t host = SDB_STORE_HOST_INIT;
1714 char *backends[1];
1715 char *cname;
1717 sdb_llist_iter_t *iter;
1718 int status = 0;
1720 if (! name)
1721 return -1;
1723 if (! sdb_llist_len(writer_list)) {
1724 sdb_log(SDB_LOG_ERR, "core: Cannot store host: "
1725 "no writers registered");
1726 return -1;
1727 }
1729 cname = sdb_plugin_cname(strdup(name));
1730 if (! cname) {
1731 sdb_log(SDB_LOG_ERR, "core: strdup failed");
1732 return -1;
1733 }
1735 host.name = cname;
1736 host.last_update = last_update ? last_update : sdb_gettime();
1737 if (get_interval(SDB_HOST, NULL, -1, NULL, cname,
1738 host.last_update, &host.interval)) {
1739 free(cname);
1740 return 1;
1741 }
1742 host.backends = (const char * const *)backends;
1743 get_backend(backends, &host.backends_num);
1745 iter = sdb_llist_get_iter(writer_list);
1746 while (sdb_llist_iter_has_next(iter)) {
1747 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
1748 int s;
1749 assert(writer);
1750 s = writer->impl.store_host(&host, writer->w_user_data);
1751 if (((s > 0) && (status >= 0)) || (s < 0))
1752 status = s;
1753 }
1754 sdb_llist_iter_destroy(iter);
1755 free(cname);
1756 return status;
1757 } /* sdb_plugin_store_host */
1759 int
1760 sdb_plugin_store_service(const char *hostname, const char *name,
1761 sdb_time_t last_update)
1762 {
1763 sdb_store_service_t service = SDB_STORE_SERVICE_INIT;
1764 char *backends[1];
1765 char *cname;
1767 sdb_llist_iter_t *iter;
1768 sdb_data_t d;
1770 int status = 0;
1772 if ((! hostname) || (! name))
1773 return -1;
1775 if (! sdb_llist_len(writer_list)) {
1776 sdb_log(SDB_LOG_ERR, "core: Cannot store service: "
1777 "no writers registered");
1778 return -1;
1779 }
1781 cname = sdb_plugin_cname(strdup(hostname));
1782 if (! cname) {
1783 sdb_log(SDB_LOG_ERR, "core: strdup failed");
1784 return -1;
1785 }
1787 service.hostname = cname;
1788 service.name = name;
1789 service.last_update = last_update ? last_update : sdb_gettime();
1790 if (get_interval(SDB_SERVICE, cname, -1, NULL, name,
1791 service.last_update, &service.interval)) {
1792 free(cname);
1793 return 1;
1794 }
1795 service.backends = (const char * const *)backends;
1796 get_backend(backends, &service.backends_num);
1798 iter = sdb_llist_get_iter(writer_list);
1799 while (sdb_llist_iter_has_next(iter)) {
1800 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
1801 int s;
1802 assert(writer);
1803 s = writer->impl.store_service(&service, writer->w_user_data);
1804 if (((s > 0) && (status >= 0)) || (s < 0))
1805 status = s;
1806 }
1807 sdb_llist_iter_destroy(iter);
1809 if (! status) {
1810 /* record the hostname as an attribute */
1811 d.type = SDB_TYPE_STRING;
1812 d.data.string = cname;
1813 if (sdb_plugin_store_service_attribute(cname, name,
1814 "hostname", &d, service.last_update))
1815 status = -1;
1816 }
1818 free(cname);
1819 return status;
1820 } /* sdb_plugin_store_service */
1822 int
1823 sdb_plugin_store_metric(const char *hostname, const char *name,
1824 sdb_metric_store_t *store, sdb_time_t last_update)
1825 {
1826 sdb_store_metric_t metric = SDB_STORE_METRIC_INIT;
1827 char *backends[1];
1828 char *cname;
1830 sdb_llist_iter_t *iter;
1831 sdb_data_t d;
1833 int status = 0;
1835 if ((! hostname) || (! name))
1836 return -1;
1838 if (! sdb_llist_len(writer_list)) {
1839 sdb_log(SDB_LOG_ERR, "core: Cannot store metric: "
1840 "no writers registered");
1841 return -1;
1842 }
1844 cname = sdb_plugin_cname(strdup(hostname));
1845 if (! cname) {
1846 sdb_log(SDB_LOG_ERR, "core: strdup failed");
1847 return -1;
1848 }
1850 if (store && ((! store->type) || (! store->id)))
1851 store = NULL;
1853 metric.hostname = cname;
1854 metric.name = name;
1855 if (store) {
1856 if (store->last_update < last_update)
1857 store->last_update = last_update;
1858 metric.stores = store;
1859 metric.stores_num = 1;
1860 }
1861 metric.last_update = last_update ? last_update : sdb_gettime();
1862 if (get_interval(SDB_METRIC, cname, -1, NULL, name,
1863 metric.last_update, &metric.interval)) {
1864 free(cname);
1865 return 1;
1866 }
1867 metric.backends = (const char * const *)backends;
1868 get_backend(backends, &metric.backends_num);
1870 iter = sdb_llist_get_iter(writer_list);
1871 while (sdb_llist_iter_has_next(iter)) {
1872 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
1873 int s;
1874 assert(writer);
1875 s = writer->impl.store_metric(&metric, writer->w_user_data);
1876 if (((s > 0) && (status >= 0)) || (s < 0))
1877 status = s;
1878 }
1879 sdb_llist_iter_destroy(iter);
1881 if (! status) {
1882 /* record the hostname as an attribute */
1883 d.type = SDB_TYPE_STRING;
1884 d.data.string = cname;
1885 if (sdb_plugin_store_metric_attribute(cname, name,
1886 "hostname", &d, metric.last_update))
1887 status = -1;
1888 }
1890 free(cname);
1891 return status;
1892 } /* sdb_plugin_store_metric */
1894 int
1895 sdb_plugin_store_attribute(const char *hostname, const char *key,
1896 const sdb_data_t *value, sdb_time_t last_update)
1897 {
1898 sdb_store_attribute_t attr = SDB_STORE_ATTRIBUTE_INIT;
1899 char *backends[1];
1900 char *cname;
1902 sdb_llist_iter_t *iter;
1903 int status = 0;
1905 if ((! hostname) || (! key) || (! value))
1906 return -1;
1908 if (! sdb_llist_len(writer_list)) {
1909 sdb_log(SDB_LOG_ERR, "core: Cannot store attribute: "
1910 "no writers registered");
1911 return -1;
1912 }
1914 cname = sdb_plugin_cname(strdup(hostname));
1915 if (! cname) {
1916 sdb_log(SDB_LOG_ERR, "core: strdup failed");
1917 return -1;
1918 }
1920 attr.parent_type = SDB_HOST;
1921 attr.parent = cname;
1922 attr.key = key;
1923 attr.value = *value;
1924 attr.last_update = last_update ? last_update : sdb_gettime();
1925 if (get_interval(SDB_ATTRIBUTE, cname, -1, NULL, key,
1926 attr.last_update, &attr.interval)) {
1927 free(cname);
1928 return 1;
1929 }
1930 attr.backends = (const char * const *)backends;
1931 get_backend(backends, &attr.backends_num);
1933 iter = sdb_llist_get_iter(writer_list);
1934 while (sdb_llist_iter_has_next(iter)) {
1935 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
1936 int s;
1937 assert(writer);
1938 s = writer->impl.store_attribute(&attr, writer->w_user_data);
1939 if (((s > 0) && (status >= 0)) || (s < 0))
1940 status = s;
1941 }
1942 sdb_llist_iter_destroy(iter);
1943 free(cname);
1944 return status;
1945 } /* sdb_plugin_store_attribute */
1947 int
1948 sdb_plugin_store_service_attribute(const char *hostname, const char *service,
1949 const char *key, const sdb_data_t *value, sdb_time_t last_update)
1950 {
1951 sdb_store_attribute_t attr = SDB_STORE_ATTRIBUTE_INIT;
1952 char *backends[1];
1953 char *cname;
1955 sdb_llist_iter_t *iter;
1956 int status = 0;
1958 if ((! hostname) || (! service) || (! key) || (! value))
1959 return -1;
1961 if (! sdb_llist_len(writer_list)) {
1962 sdb_log(SDB_LOG_ERR, "core: Cannot store service attribute: "
1963 "no writers registered");
1964 return -1;
1965 }
1967 cname = sdb_plugin_cname(strdup(hostname));
1968 if (! cname) {
1969 sdb_log(SDB_LOG_ERR, "core: strdup failed");
1970 return -1;
1971 }
1973 attr.hostname = cname;
1974 attr.parent_type = SDB_SERVICE;
1975 attr.parent = service;
1976 attr.key = key;
1977 attr.value = *value;
1978 attr.last_update = last_update ? last_update : sdb_gettime();
1979 if (get_interval(SDB_ATTRIBUTE, cname, SDB_SERVICE, service, key,
1980 attr.last_update, &attr.interval)) {
1981 free(cname);
1982 return 1;
1983 }
1984 attr.backends = (const char * const *)backends;
1985 get_backend(backends, &attr.backends_num);
1987 iter = sdb_llist_get_iter(writer_list);
1988 while (sdb_llist_iter_has_next(iter)) {
1989 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
1990 int s;
1991 assert(writer);
1992 s = writer->impl.store_attribute(&attr, writer->w_user_data);
1993 if (((s > 0) && (status >= 0)) || (s < 0))
1994 status = s;
1995 }
1996 sdb_llist_iter_destroy(iter);
1997 free(cname);
1998 return status;
1999 } /* sdb_plugin_store_service_attribute */
2001 int
2002 sdb_plugin_store_metric_attribute(const char *hostname, const char *metric,
2003 const char *key, const sdb_data_t *value, sdb_time_t last_update)
2004 {
2005 sdb_store_attribute_t attr = SDB_STORE_ATTRIBUTE_INIT;
2006 char *backends[1];
2007 char *cname;
2009 sdb_llist_iter_t *iter;
2010 int status = 0;
2012 if ((! hostname) || (! metric) || (! key) || (! value))
2013 return -1;
2015 if (! sdb_llist_len(writer_list)) {
2016 sdb_log(SDB_LOG_ERR, "core: Cannot store metric attribute: "
2017 "no writers registered");
2018 return -1;
2019 }
2021 cname = sdb_plugin_cname(strdup(hostname));
2022 if (! cname) {
2023 sdb_log(SDB_LOG_ERR, "core: strdup failed");
2024 return -1;
2025 }
2027 attr.hostname = cname;
2028 attr.parent_type = SDB_METRIC;
2029 attr.parent = metric;
2030 attr.key = key;
2031 attr.value = *value;
2032 attr.last_update = last_update ? last_update : sdb_gettime();
2033 if (get_interval(SDB_ATTRIBUTE, cname, SDB_METRIC, metric, key,
2034 attr.last_update, &attr.interval)) {
2035 free(cname);
2036 return 1;
2037 }
2038 attr.backends = (const char * const *)backends;
2039 get_backend(backends, &attr.backends_num);
2041 iter = sdb_llist_get_iter(writer_list);
2042 while (sdb_llist_iter_has_next(iter)) {
2043 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
2044 int s;
2045 assert(writer);
2046 s = writer->impl.store_attribute(&attr, writer->w_user_data);
2047 if (((s > 0) && (status >= 0)) || (s < 0))
2048 status = s;
2049 }
2050 sdb_llist_iter_destroy(iter);
2051 free(cname);
2052 return status;
2053 } /* sdb_plugin_store_metric_attribute */
2055 /* vim: set tw=78 sw=4 ts=4 noexpandtab : */