76fc2092d022ff8588a2b5f11739e8653fa5c1c4
1 /*
2 * SysDB - src/core/plugin.c
3 * Copyright (C) 2012 Sebastian 'tokkee' Harl <sh@tokkee.org>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 *
15 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
16 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
17 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR
19 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
28 #if HAVE_CONFIG_H
29 # include "config.h"
30 #endif /* HAVE_CONFIG_H */
32 #include "sysdb.h"
33 #include "core/plugin.h"
34 #include "core/time.h"
35 #include "utils/error.h"
36 #include "utils/llist.h"
37 #include "utils/strbuf.h"
39 #include <assert.h>
41 #include <errno.h>
43 #include <stdarg.h>
44 #include <stdio.h>
45 #include <string.h>
46 #include <strings.h>
47 #include <unistd.h>
49 #include <ltdl.h>
51 #include <pthread.h>
53 /* helper to access info attributes */
54 #define INFO_GET(i, attr) \
55 ((i)->attr ? (i)->attr : #attr" not set")
57 /*
58 * private data types
59 */
61 typedef struct {
62 sdb_object_t super;
63 sdb_plugin_ctx_t public;
65 sdb_plugin_info_t info;
66 lt_dlhandle handle;
68 /* The usage count differs from the object's ref count
69 * in that it provides higher level information about how
70 * the plugin is in use. */
71 size_t use_cnt;
72 } ctx_t;
73 #define CTX_INIT { SDB_OBJECT_INIT, \
74 SDB_PLUGIN_CTX_INIT, SDB_PLUGIN_INFO_INIT, NULL, 0 }
76 #define CTX(obj) ((ctx_t *)(obj))
78 typedef struct {
79 sdb_object_t super;
80 void *cb_callback;
81 sdb_object_t *cb_user_data;
82 ctx_t *cb_ctx;
83 } callback_t;
84 #define CB_INIT { SDB_OBJECT_INIT, \
85 /* callback = */ NULL, /* user_data = */ NULL, \
86 SDB_PLUGIN_CTX_INIT }
87 #define CB(obj) ((callback_t *)(obj))
88 #define CONST_CB(obj) ((const callback_t *)(obj))
90 typedef struct {
91 callback_t super;
92 #define ccb_callback super.cb_callback
93 #define ccb_user_data super.cb_user_data
94 #define ccb_ctx super.cb_ctx
95 sdb_time_t ccb_interval;
96 sdb_time_t ccb_next_update;
97 } collector_t;
98 #define CCB(obj) ((collector_t *)(obj))
99 #define CONST_CCB(obj) ((const collector_t *)(obj))
101 typedef struct {
102 callback_t super; /* cb_callback will always be NULL */
103 #define w_user_data super.cb_user_data
104 #define w_ctx super.cb_ctx
105 sdb_store_writer_t impl;
106 } writer_t;
107 #define WRITER(obj) ((writer_t *)(obj))
109 typedef struct {
110 callback_t super; /* cb_callback will always be NULL */
111 #define r_user_data super.cb_user_data
112 #define r_ctx super.cb_ctx
113 sdb_store_reader_t impl;
114 } reader_t;
115 #define READER(obj) ((reader_t *)(obj))
117 typedef struct {
118 callback_t super; /* cb_callback will always be NULL */
119 #define ts_user_data super.cb_user_data
120 #define ts_ctx super.cb_ctx
121 sdb_timeseries_fetcher_t impl;
122 } ts_fetcher_t;
123 #define TS_FETCHER(obj) ((ts_fetcher_t *)(obj))
125 /*
126 * private variables
127 */
129 static sdb_plugin_ctx_t plugin_default_ctx = SDB_PLUGIN_CTX_INIT;
130 static sdb_plugin_info_t plugin_default_info = SDB_PLUGIN_INFO_INIT;
132 static pthread_key_t plugin_ctx_key;
133 static bool plugin_ctx_key_initialized = 0;
135 /* a list of the plugin contexts of all registered plugins */
136 static sdb_llist_t *all_plugins = NULL;
138 static sdb_llist_t *config_list = NULL;
139 static sdb_llist_t *init_list = NULL;
140 static sdb_llist_t *collector_list = NULL;
141 static sdb_llist_t *cname_list = NULL;
142 static sdb_llist_t *shutdown_list = NULL;
143 static sdb_llist_t *log_list = NULL;
144 static sdb_llist_t *timeseries_fetcher_list = NULL;
145 static sdb_llist_t *writer_list = NULL;
146 static sdb_llist_t *reader_list = NULL;
148 static struct {
149 const char *type;
150 sdb_llist_t **list;
151 } all_lists[] = {
152 { "config", &config_list },
153 { "init", &init_list },
154 { "collector", &collector_list },
155 { "cname", &cname_list },
156 { "shutdown", &shutdown_list },
157 { "log", &log_list },
158 { "timeseries fetcher", ×eries_fetcher_list },
159 { "store writer", &writer_list },
160 { "store reader", &reader_list },
161 };
163 /*
164 * private helper functions
165 */
167 static void
168 plugin_info_clear(sdb_plugin_info_t *info)
169 {
170 sdb_plugin_info_t empty_info = SDB_PLUGIN_INFO_INIT;
171 if (! info)
172 return;
174 if (info->plugin_name)
175 free(info->plugin_name);
176 if (info->filename)
177 free(info->filename);
179 if (info->description)
180 free(info->description);
181 if (info->copyright)
182 free(info->copyright);
183 if (info->license)
184 free(info->license);
186 *info = empty_info;
187 } /* plugin_info_clear */
189 static void
190 ctx_key_init(void)
191 {
192 if (plugin_ctx_key_initialized)
193 return;
195 pthread_key_create(&plugin_ctx_key, /* destructor */ NULL);
196 plugin_ctx_key_initialized = 1;
197 } /* ctx_key_init */
199 static int
200 plugin_cmp_next_update(const sdb_object_t *a, const sdb_object_t *b)
201 {
202 const collector_t *ccb1 = (const collector_t *)a;
203 const collector_t *ccb2 = (const collector_t *)b;
205 assert(ccb1 && ccb2);
207 return (ccb1->ccb_next_update > ccb2->ccb_next_update)
208 ? 1 : (ccb1->ccb_next_update < ccb2->ccb_next_update)
209 ? -1 : 0;
210 } /* plugin_cmp_next_update */
212 static int
213 plugin_lookup_by_name(const sdb_object_t *obj, const void *id)
214 {
215 const callback_t *cb = CONST_CB(obj);
216 const char *name = id;
218 assert(cb && id);
220 /* when a plugin was registered from outside a plugin (e.g. the core),
221 * we don't have a plugin context */
222 if (! cb->cb_ctx)
223 return 1;
225 if (!strcasecmp(cb->cb_ctx->info.plugin_name, name))
226 return 0;
227 return 1;
228 } /* plugin_lookup_by_name */
230 /* since this function is called from sdb_plugin_reconfigure_finish()
231 * when iterating through all_plugins, we may not do any additional
232 * modifications to all_plugins except for the optional removal */
233 static void
234 plugin_unregister_by_name(const char *plugin_name)
235 {
236 sdb_object_t *obj;
237 size_t i;
239 for (i = 0; i < SDB_STATIC_ARRAY_LEN(all_lists); ++i) {
240 const char *type = all_lists[i].type;
241 sdb_llist_t *list = *all_lists[i].list;
243 while (1) {
244 callback_t *cb;
246 cb = CB(sdb_llist_remove(list,
247 plugin_lookup_by_name, plugin_name));
248 if (! cb)
249 break;
251 assert(cb->cb_ctx);
253 sdb_log(SDB_LOG_INFO, "core: Unregistering "
254 "%s callback '%s' (module %s)", type, cb->super.name,
255 cb->cb_ctx->info.plugin_name);
256 sdb_object_deref(SDB_OBJ(cb));
257 }
258 }
260 obj = sdb_llist_search_by_name(all_plugins, plugin_name);
261 /* when called from sdb_plugin_reconfigure_finish, the object has already
262 * been removed from the list */
263 if (obj && (obj->ref_cnt <= 1)) {
264 sdb_llist_remove_by_name(all_plugins, plugin_name);
265 sdb_object_deref(obj);
266 }
267 /* else: other callbacks still reference it */
268 } /* plugin_unregister_by_name */
270 /*
271 * store writer wrapper for performing database queries:
272 * It wraps another store writer, adding extra logic as needed.
273 */
275 typedef struct {
276 sdb_object_t super;
277 sdb_store_writer_t *w;
278 sdb_object_t *ud;
279 sdb_query_opts_t opts;
280 } query_writer_t;
281 #define QUERY_WRITER_INIT(w, ud) { \
282 SDB_OBJECT_INIT, \
283 (w), (ud), \
284 SDB_DEFAULT_QUERY_OPTS \
285 }
286 #define QUERY_WRITER(obj) ((query_writer_t *)(obj))
288 static int
289 query_store_host(sdb_store_host_t *host, sdb_object_t *user_data)
290 {
291 query_writer_t *qw = QUERY_WRITER(user_data);
292 return qw->w->store_host(host, qw->ud);
293 } /* query_store_host */
295 static int
296 query_store_service(sdb_store_service_t *service, sdb_object_t *user_data)
297 {
298 query_writer_t *qw = QUERY_WRITER(user_data);
299 return qw->w->store_service(service, qw->ud);
300 } /* query_store_service */
302 static int
303 query_store_metric(sdb_store_metric_t *metric, sdb_object_t *user_data)
304 {
305 query_writer_t *qw = QUERY_WRITER(user_data);
306 sdb_timeseries_info_t *infos[metric->stores_num];
307 sdb_metric_store_t stores[metric->stores_num];
309 const sdb_metric_store_t *orig_stores = metric->stores;
310 int status;
311 size_t i;
313 if (! qw->opts.describe_timeseries)
314 /* nothing further to do */
315 return qw->w->store_metric(metric, qw->ud);
317 for (i = 0; i < metric->stores_num; i++) {
318 sdb_metric_store_t *s = stores + i;
319 *s = metric->stores[i];
320 infos[i] = sdb_plugin_describe_timeseries(s->type, s->id);
321 s->info = infos[i];
322 }
324 metric->stores = stores;
325 status = qw->w->store_metric(metric, qw->ud);
326 metric->stores = orig_stores;
328 for (i = 0; i < metric->stores_num; i++)
329 sdb_timeseries_info_destroy(infos[i]);
330 return status;
331 } /* query_store_metric */
333 static int
334 query_store_attribute(sdb_store_attribute_t *attr, sdb_object_t *user_data)
335 {
336 query_writer_t *qw = QUERY_WRITER(user_data);
337 return qw->w->store_attribute(attr, qw->ud);
338 } /* query_store_attribute */
340 static sdb_store_writer_t query_writer = {
341 query_store_host, query_store_service,
342 query_store_metric, query_store_attribute,
343 };
345 /*
346 * private types
347 */
349 static int
350 ctx_init(sdb_object_t *obj, va_list __attribute__((unused)) ap)
351 {
352 ctx_t *ctx = CTX(obj);
354 assert(ctx);
356 ctx->public = plugin_default_ctx;
357 ctx->info = plugin_default_info;
358 ctx->handle = NULL;
359 ctx->use_cnt = 1;
360 return 0;
361 } /* ctx_init */
363 static void
364 ctx_destroy(sdb_object_t *obj)
365 {
366 ctx_t *ctx = CTX(obj);
368 if (ctx->handle) {
369 const char *err;
371 sdb_log(SDB_LOG_INFO, "core: Unloading module %s",
372 ctx->info.plugin_name);
374 lt_dlerror();
375 lt_dlclose(ctx->handle);
376 if ((err = lt_dlerror()))
377 sdb_log(SDB_LOG_WARNING, "core: Failed to unload module %s: %s",
378 ctx->info.plugin_name, err);
379 }
381 plugin_info_clear(&ctx->info);
382 } /* ctx_destroy */
384 static sdb_type_t ctx_type = {
385 sizeof(ctx_t),
387 ctx_init,
388 ctx_destroy
389 };
391 static ctx_t *
392 ctx_get(void)
393 {
394 if (! plugin_ctx_key_initialized)
395 ctx_key_init();
396 return pthread_getspecific(plugin_ctx_key);
397 } /* ctx_get */
399 static ctx_t *
400 ctx_set(ctx_t *new)
401 {
402 ctx_t *old;
404 if (! plugin_ctx_key_initialized)
405 ctx_key_init();
407 old = pthread_getspecific(plugin_ctx_key);
408 if (old)
409 sdb_object_deref(SDB_OBJ(old));
410 if (new)
411 sdb_object_ref(SDB_OBJ(new));
412 pthread_setspecific(plugin_ctx_key, new);
413 return old;
414 } /* ctx_set */
416 static ctx_t *
417 ctx_create(const char *name)
418 {
419 ctx_t *ctx;
421 ctx = CTX(sdb_object_create(name, ctx_type));
422 if (! ctx)
423 return NULL;
425 if (! plugin_ctx_key_initialized)
426 ctx_key_init();
427 ctx_set(ctx);
428 return ctx;
429 } /* ctx_create */
431 /*
432 * plugin_init_ok:
433 * Checks whether the registration of a new plugin identified by 'obj' is
434 * okay. It consumes the first two arguments of 'ap'.
435 */
436 static bool
437 plugin_init_ok(sdb_object_t *obj, va_list ap)
438 {
439 sdb_llist_t **list = va_arg(ap, sdb_llist_t **);
440 const char *type = va_arg(ap, const char *);
442 assert(list); assert(type);
444 if (sdb_llist_search_by_name(*list, obj->name)) {
445 sdb_log(SDB_LOG_WARNING, "core: %s callback '%s' "
446 "has already been registered. Ignoring newly "
447 "registered version.", type, obj->name);
448 return 0;
449 }
450 return 1;
451 } /* plugin_init_ok */
453 static int
454 plugin_cb_init(sdb_object_t *obj, va_list ap)
455 {
456 void *callback;
457 sdb_object_t *ud;
459 if (! plugin_init_ok(obj, ap))
460 return -1;
462 callback = va_arg(ap, void *);
463 ud = va_arg(ap, sdb_object_t *);
465 /* cb_ctx may be NULL if the plugin was not registered by a plugin */
467 CB(obj)->cb_callback = callback;
468 CB(obj)->cb_ctx = ctx_get();
469 sdb_object_ref(SDB_OBJ(CB(obj)->cb_ctx));
471 sdb_object_ref(ud);
472 CB(obj)->cb_user_data = ud;
473 return 0;
474 } /* plugin_cb_init */
476 static void
477 plugin_cb_destroy(sdb_object_t *obj)
478 {
479 assert(obj);
480 sdb_object_deref(CB(obj)->cb_user_data);
481 sdb_object_deref(SDB_OBJ(CB(obj)->cb_ctx));
482 } /* plugin_cb_destroy */
484 static sdb_type_t callback_type = {
485 sizeof(callback_t),
487 plugin_cb_init,
488 plugin_cb_destroy
489 };
491 static sdb_type_t collector_type = {
492 sizeof(collector_t),
494 plugin_cb_init,
495 plugin_cb_destroy
496 };
498 static int
499 plugin_writer_init(sdb_object_t *obj, va_list ap)
500 {
501 sdb_store_writer_t *impl;
502 sdb_object_t *ud;
504 if (! plugin_init_ok(obj, ap))
505 return -1;
507 impl = va_arg(ap, sdb_store_writer_t *);
508 ud = va_arg(ap, sdb_object_t *);
509 assert(impl);
511 if ((! impl->store_host) || (! impl->store_service)
512 || (! impl->store_metric) || (! impl->store_attribute)) {
513 sdb_log(SDB_LOG_ERR, "core: store writer callback '%s' "
514 "does not fully implement the writer interface.",
515 obj->name);
516 return -1;
517 }
519 /* ctx may be NULL if the callback was not registered by a plugin */
521 WRITER(obj)->impl = *impl;
522 WRITER(obj)->w_ctx = ctx_get();
523 sdb_object_ref(SDB_OBJ(WRITER(obj)->w_ctx));
525 sdb_object_ref(ud);
526 WRITER(obj)->w_user_data = ud;
527 return 0;
528 } /* plugin_writer_init */
530 static void
531 plugin_writer_destroy(sdb_object_t *obj)
532 {
533 assert(obj);
534 sdb_object_deref(WRITER(obj)->w_user_data);
535 sdb_object_deref(SDB_OBJ(WRITER(obj)->w_ctx));
536 } /* plugin_writer_destroy */
538 static sdb_type_t writer_type = {
539 sizeof(writer_t),
541 plugin_writer_init,
542 plugin_writer_destroy
543 };
545 static int
546 plugin_reader_init(sdb_object_t *obj, va_list ap)
547 {
548 sdb_store_reader_t *impl;
549 sdb_object_t *ud;
551 if (! plugin_init_ok(obj, ap))
552 return -1;
554 impl = va_arg(ap, sdb_store_reader_t *);
555 ud = va_arg(ap, sdb_object_t *);
556 assert(impl);
558 if ((! impl->prepare_query) || (! impl->execute_query)) {
559 sdb_log(SDB_LOG_ERR, "core: store reader callback '%s' "
560 "does not fully implement the reader interface.",
561 obj->name);
562 return -1;
563 }
565 /* ctx may be NULL if the callback was not registered by a plugin */
567 READER(obj)->impl = *impl;
568 READER(obj)->r_ctx = ctx_get();
569 sdb_object_ref(SDB_OBJ(READER(obj)->r_ctx));
571 sdb_object_ref(ud);
572 READER(obj)->r_user_data = ud;
573 return 0;
574 } /* plugin_reader_init */
576 static void
577 plugin_reader_destroy(sdb_object_t *obj)
578 {
579 assert(obj);
580 sdb_object_deref(READER(obj)->r_user_data);
581 sdb_object_deref(SDB_OBJ(READER(obj)->r_ctx));
582 } /* plugin_reader_destroy */
584 static sdb_type_t reader_type = {
585 sizeof(reader_t),
587 plugin_reader_init,
588 plugin_reader_destroy
589 };
591 static int
592 plugin_ts_fetcher_init(sdb_object_t *obj, va_list ap)
593 {
594 sdb_timeseries_fetcher_t *impl;
595 sdb_object_t *ud;
597 if (! plugin_init_ok(obj, ap))
598 return -1;
600 impl = va_arg(ap, sdb_timeseries_fetcher_t *);
601 ud = va_arg(ap, sdb_object_t *);
602 assert(impl);
604 if ((! impl->describe) || (! impl->fetch)) {
605 sdb_log(SDB_LOG_ERR, "core: timeseries fetcher callback '%s' "
606 "does not fully implement the interface.",
607 obj->name);
608 return -1;
609 }
611 /* ctx may be NULL if the callback was not registered by a plugin */
613 TS_FETCHER(obj)->impl = *impl;
614 TS_FETCHER(obj)->ts_ctx = ctx_get();
615 sdb_object_ref(SDB_OBJ(TS_FETCHER(obj)->ts_ctx));
617 sdb_object_ref(ud);
618 TS_FETCHER(obj)->ts_user_data = ud;
619 return 0;
620 } /* plugin_ts_fetcher_init */
622 static void
623 plugin_ts_fetcher_destroy(sdb_object_t *obj)
624 {
625 assert(obj);
626 sdb_object_deref(TS_FETCHER(obj)->ts_user_data);
627 sdb_object_deref(SDB_OBJ(TS_FETCHER(obj)->ts_ctx));
628 } /* plugin_ts_fetcher_destroy */
630 static sdb_type_t ts_fetcher_type = {
631 sizeof(ts_fetcher_t),
633 plugin_ts_fetcher_init,
634 plugin_ts_fetcher_destroy
635 };
637 static int
638 module_init(const char *name, lt_dlhandle lh, sdb_plugin_info_t *info)
639 {
640 int (*mod_init)(sdb_plugin_info_t *);
641 int status;
643 mod_init = (int (*)(sdb_plugin_info_t *))lt_dlsym(lh, "sdb_module_init");
644 if (! mod_init) {
645 sdb_log(SDB_LOG_ERR, "core: Failed to load plugin '%s': "
646 "could not find symbol 'sdb_module_init'", name);
647 return -1;
648 }
650 status = mod_init(info);
651 if (status) {
652 sdb_log(SDB_LOG_ERR, "core: Failed to initialize "
653 "module '%s'", name);
654 plugin_unregister_by_name(name);
655 return -1;
656 }
657 return 0;
658 } /* module_init */
660 static int
661 module_load(const char *basedir, const char *name,
662 const sdb_plugin_ctx_t *plugin_ctx)
663 {
664 char base_name[name ? strlen(name) + 1 : 1];
665 const char *name_ptr;
666 char *tmp;
668 char filename[1024];
669 lt_dlhandle lh;
671 ctx_t *ctx;
673 int status;
675 assert(name);
677 base_name[0] = '\0';
678 name_ptr = name;
680 while ((tmp = strstr(name_ptr, "::"))) {
681 strncat(base_name, name_ptr, (size_t)(tmp - name_ptr));
682 strcat(base_name, "/");
683 name_ptr = tmp + strlen("::");
684 }
685 strcat(base_name, name_ptr);
687 if (! basedir)
688 basedir = PKGLIBDIR;
690 snprintf(filename, sizeof(filename), "%s/%s.so", basedir, base_name);
691 filename[sizeof(filename) - 1] = '\0';
693 if (access(filename, R_OK)) {
694 char errbuf[1024];
695 sdb_log(SDB_LOG_ERR, "core: Failed to load plugin '%s' (%s): %s",
696 name, filename, sdb_strerror(errno, errbuf, sizeof(errbuf)));
697 return -1;
698 }
700 lt_dlinit();
701 lt_dlerror();
703 lh = lt_dlopen(filename);
704 if (! lh) {
705 sdb_log(SDB_LOG_ERR, "core: Failed to load plugin '%s': %s"
706 "The most common cause for this problem are missing "
707 "dependencies.\n", name, lt_dlerror());
708 return -1;
709 }
711 if (ctx_get())
712 sdb_log(SDB_LOG_WARNING, "core: Discarding old plugin context");
714 ctx = ctx_create(name);
715 if (! ctx) {
716 sdb_log(SDB_LOG_ERR, "core: Failed to initialize plugin context");
717 return -1;
718 }
720 ctx->info.plugin_name = strdup(name);
721 ctx->info.filename = strdup(filename);
722 ctx->handle = lh;
724 if (plugin_ctx)
725 ctx->public = *plugin_ctx;
727 if ((status = module_init(name, lh, &ctx->info))) {
728 sdb_object_deref(SDB_OBJ(ctx));
729 ctx_set(NULL);
730 return status;
731 }
733 /* compare minor version */
734 if ((ctx->info.version < 0)
735 || ((int)(ctx->info.version / 100) != (int)(SDB_VERSION / 100)))
736 sdb_log(SDB_LOG_WARNING, "core: WARNING: version of "
737 "plugin '%s' (%i.%i.%i) does not match our version "
738 "(%i.%i.%i); this might cause problems",
739 name, SDB_VERSION_DECODE(ctx->info.version),
740 SDB_VERSION_DECODE(SDB_VERSION));
742 sdb_llist_append(all_plugins, SDB_OBJ(ctx));
744 sdb_log(SDB_LOG_INFO, "core: Successfully loaded "
745 "plugin %s v%i (%s)", ctx->info.plugin_name,
746 ctx->info.plugin_version,
747 INFO_GET(&ctx->info, description));
748 sdb_log(SDB_LOG_INFO, "core: Plugin %s: %s, License: %s",
749 ctx->info.plugin_name,
750 INFO_GET(&ctx->info, copyright),
751 INFO_GET(&ctx->info, license));
753 /* any registered callbacks took ownership of the context */
754 sdb_object_deref(SDB_OBJ(ctx));
756 /* reset */
757 ctx_set(NULL);
758 return 0;
759 } /* module_load */
761 static char *
762 plugin_get_name(const char *name, char *buf, size_t bufsize)
763 {
764 ctx_t *ctx = ctx_get();
766 if (ctx)
767 snprintf(buf, bufsize, "%s::%s", ctx->info.plugin_name, name);
768 else
769 snprintf(buf, bufsize, "core::%s", name);
770 return buf;
771 } /* plugin_get_name */
773 static int
774 plugin_add_impl(sdb_llist_t **list, sdb_type_t T, const char *type,
775 const char *name, void *impl, sdb_object_t *user_data)
776 {
777 sdb_object_t *obj;
779 if ((! name) || (! impl))
780 return -1;
782 assert(list);
784 if (! *list)
785 *list = sdb_llist_create();
786 if (! *list)
787 return -1;
789 obj = sdb_object_create(name, T, list, type, impl, user_data);
790 if (! obj)
791 return -1;
793 if (sdb_llist_append(*list, obj)) {
794 sdb_object_deref(obj);
795 return -1;
796 }
798 /* pass control to the list */
799 sdb_object_deref(obj);
801 sdb_log(SDB_LOG_INFO, "core: Registered %s callback '%s'.",
802 type, name);
803 return 0;
804 } /* plugin_add_impl */
806 /*
807 * object meta-data
808 */
810 typedef struct {
811 int obj_type;
812 sdb_time_t last_update;
813 sdb_time_t interval;
814 } interval_fetcher_t;
816 static int
817 interval_fetcher_host(sdb_store_host_t *host, sdb_object_t *user_data)
818 {
819 interval_fetcher_t *lu = SDB_OBJ_WRAPPER(user_data)->data;
820 lu->obj_type = SDB_HOST;
821 lu->last_update = host->last_update;
822 return 0;
823 } /* interval_fetcher_host */
825 static int
826 interval_fetcher_service(sdb_store_service_t *svc, sdb_object_t *user_data)
827 {
828 interval_fetcher_t *lu = SDB_OBJ_WRAPPER(user_data)->data;
829 lu->obj_type = SDB_SERVICE;
830 lu->last_update = svc->last_update;
831 return 0;
832 } /* interval_fetcher_service */
834 static int
835 interval_fetcher_metric(sdb_store_metric_t *metric, sdb_object_t *user_data)
836 {
837 interval_fetcher_t *lu = SDB_OBJ_WRAPPER(user_data)->data;
838 lu->obj_type = SDB_METRIC;
839 lu->last_update = metric->last_update;
840 return 0;
841 } /* interval_fetcher_metric */
843 static int
844 interval_fetcher_attr(sdb_store_attribute_t *attr, sdb_object_t *user_data)
845 {
846 interval_fetcher_t *lu = SDB_OBJ_WRAPPER(user_data)->data;
847 lu->obj_type = SDB_ATTRIBUTE;
848 lu->last_update = attr->last_update;
849 return 0;
850 } /* interval_fetcher_attr */
852 static sdb_store_writer_t interval_fetcher = {
853 interval_fetcher_host, interval_fetcher_service,
854 interval_fetcher_metric, interval_fetcher_attr,
855 };
857 static int
858 get_interval(int obj_type, const char *hostname,
859 int parent_type, const char *parent, const char *name,
860 sdb_time_t last_update, sdb_time_t *interval_out)
861 {
862 sdb_ast_fetch_t fetch = SDB_AST_FETCH_INIT;
863 char hn[hostname ? strlen(hostname) + 1 : 1];
864 char pn[parent ? strlen(parent) + 1 : 1];
865 char n[strlen(name) + 1];
866 int status;
868 interval_fetcher_t lu = { 0, 0, 0 };
869 sdb_object_wrapper_t obj = SDB_OBJECT_WRAPPER_STATIC(&lu);
870 sdb_time_t interval;
872 assert(name);
874 if (hostname)
875 strncpy(hn, hostname, sizeof(hn));
876 if (parent)
877 strncpy(pn, parent, sizeof(pn));
878 strncpy(n, name, sizeof(n));
880 fetch.obj_type = obj_type;
881 fetch.hostname = hostname ? hn : NULL;
882 fetch.parent_type = parent_type;
883 fetch.parent = parent ? pn : NULL;
884 fetch.name = n;
886 status = sdb_plugin_query(SDB_AST_NODE(&fetch),
887 &interval_fetcher, SDB_OBJ(&obj), NULL, NULL);
888 if ((status < 0) || (lu.obj_type != obj_type) || (lu.last_update == 0)) {
889 *interval_out = 0;
890 return 0;
891 }
893 if (lu.last_update >= last_update) {
894 if (lu.last_update > last_update)
895 sdb_log(SDB_LOG_DEBUG, "memstore: Cannot update %s '%s' - "
896 "value too old (%"PRIsdbTIME" < %"PRIsdbTIME")",
897 SDB_STORE_TYPE_TO_NAME(obj_type), name,
898 lu.last_update, last_update);
899 *interval_out = lu.interval;
900 return 1;
901 }
903 interval = last_update - lu.last_update;
904 if (lu.interval && interval)
905 interval = (sdb_time_t)((0.9 * (double)lu.interval)
906 + (0.1 * (double)interval));
907 *interval_out = interval;
908 return 0;
909 } /* get_interval */
911 static void
912 get_backend(char **backends, size_t *backends_num)
913 {
914 const sdb_plugin_info_t *info;
916 info = sdb_plugin_current();
917 if ((! info) || (! info->plugin_name) || (! *info->plugin_name)) {
918 *backends_num = 0;
919 return;
920 }
922 backends[0] = info->plugin_name;
923 *backends_num = 1;
924 } /* get_backend */
926 /*
927 * public API
928 */
930 int
931 sdb_plugin_load(const char *basedir, const char *name,
932 const sdb_plugin_ctx_t *plugin_ctx)
933 {
934 ctx_t *ctx;
936 int status;
938 if ((! name) || (! *name))
939 return -1;
941 if (! all_plugins) {
942 if (! (all_plugins = sdb_llist_create())) {
943 sdb_log(SDB_LOG_ERR, "core: Failed to load plugin '%s': "
944 "internal error while creating linked list", name);
945 return -1;
946 }
947 }
949 ctx = CTX(sdb_llist_search_by_name(all_plugins, name));
950 if (ctx) {
951 /* plugin already loaded */
952 if (! ctx->use_cnt) {
953 /* reloading plugin */
954 ctx_t *old_ctx = ctx_set(ctx);
956 status = module_init(ctx->info.plugin_name, ctx->handle, NULL);
957 ctx_set(old_ctx);
958 if (status)
959 return status;
961 sdb_log(SDB_LOG_INFO, "core: Successfully reloaded plugin "
962 "'%s' (%s)", ctx->info.plugin_name,
963 INFO_GET(&ctx->info, description));
964 }
965 ++ctx->use_cnt;
966 return 0;
967 }
969 return module_load(basedir, name, plugin_ctx);
970 } /* sdb_plugin_load */
972 int
973 sdb_plugin_set_info(sdb_plugin_info_t *info, int type, ...)
974 {
975 va_list ap;
977 if (! info)
978 return -1;
980 va_start(ap, type);
982 switch (type) {
983 case SDB_PLUGIN_INFO_DESC:
984 {
985 char *desc = va_arg(ap, char *);
986 if (desc) {
987 if (info->description)
988 free(info->description);
989 info->description = strdup(desc);
990 }
991 }
992 break;
993 case SDB_PLUGIN_INFO_COPYRIGHT:
994 {
995 char *copyright = va_arg(ap, char *);
996 if (copyright)
997 info->copyright = strdup(copyright);
998 }
999 break;
1000 case SDB_PLUGIN_INFO_LICENSE:
1001 {
1002 char *license = va_arg(ap, char *);
1003 if (license) {
1004 if (info->license)
1005 free(info->license);
1006 info->license = strdup(license);
1007 }
1008 }
1009 break;
1010 case SDB_PLUGIN_INFO_VERSION:
1011 {
1012 int version = va_arg(ap, int);
1013 info->version = version;
1014 }
1015 break;
1016 case SDB_PLUGIN_INFO_PLUGIN_VERSION:
1017 {
1018 int version = va_arg(ap, int);
1019 info->plugin_version = version;
1020 }
1021 break;
1022 default:
1023 va_end(ap);
1024 return -1;
1025 }
1027 va_end(ap);
1028 return 0;
1029 } /* sdb_plugin_set_info */
1031 int
1032 sdb_plugin_register_config(sdb_plugin_config_cb callback)
1033 {
1034 ctx_t *ctx = ctx_get();
1036 if (! ctx) {
1037 sdb_log(SDB_LOG_ERR, "core: Invalid attempt to register a "
1038 "config callback from outside a plugin");
1039 return -1;
1040 }
1041 return plugin_add_impl(&config_list, callback_type, "config",
1042 ctx->info.plugin_name, (void *)callback, NULL);
1043 } /* sdb_plugin_register_config */
1045 int
1046 sdb_plugin_register_init(const char *name, sdb_plugin_init_cb callback,
1047 sdb_object_t *user_data)
1048 {
1049 char cb_name[1024];
1050 return plugin_add_impl(&init_list, callback_type, "init",
1051 plugin_get_name(name, cb_name, sizeof(cb_name)),
1052 (void *)callback, user_data);
1053 } /* sdb_plugin_register_init */
1055 int
1056 sdb_plugin_register_shutdown(const char *name, sdb_plugin_shutdown_cb callback,
1057 sdb_object_t *user_data)
1058 {
1059 char cb_name[1024];
1060 return plugin_add_impl(&shutdown_list, callback_type, "shutdown",
1061 plugin_get_name(name, cb_name, sizeof(cb_name)),
1062 (void *)callback, user_data);
1063 } /* sdb_plugin_register_shutdown */
1065 int
1066 sdb_plugin_register_log(const char *name, sdb_plugin_log_cb callback,
1067 sdb_object_t *user_data)
1068 {
1069 char cb_name[1024];
1070 return plugin_add_impl(&log_list, callback_type, "log",
1071 plugin_get_name(name, cb_name, sizeof(cb_name)),
1072 callback, user_data);
1073 } /* sdb_plugin_register_log */
1075 int
1076 sdb_plugin_register_cname(const char *name, sdb_plugin_cname_cb callback,
1077 sdb_object_t *user_data)
1078 {
1079 char cb_name[1024];
1080 return plugin_add_impl(&cname_list, callback_type, "cname",
1081 plugin_get_name(name, cb_name, sizeof(cb_name)),
1082 callback, user_data);
1083 } /* sdb_plugin_register_cname */
1085 int
1086 sdb_plugin_register_collector(const char *name, sdb_plugin_collector_cb callback,
1087 const sdb_time_t *interval, sdb_object_t *user_data)
1088 {
1089 char cb_name[1024];
1090 sdb_object_t *obj;
1092 if ((! name) || (! callback))
1093 return -1;
1095 if (! collector_list)
1096 collector_list = sdb_llist_create();
1097 if (! collector_list)
1098 return -1;
1100 plugin_get_name(name, cb_name, sizeof(cb_name));
1102 obj = sdb_object_create(cb_name, collector_type,
1103 &collector_list, "collector", callback, user_data);
1104 if (! obj)
1105 return -1;
1107 if (interval)
1108 CCB(obj)->ccb_interval = *interval;
1109 else {
1110 ctx_t *ctx = ctx_get();
1112 if (! ctx) {
1113 sdb_log(SDB_LOG_ERR, "core: Cannot determine interval "
1114 "for collector %s; none specified and no plugin "
1115 "context found", cb_name);
1116 return -1;
1117 }
1119 CCB(obj)->ccb_interval = ctx->public.interval;
1120 }
1122 if (! (CCB(obj)->ccb_next_update = sdb_gettime())) {
1123 char errbuf[1024];
1124 sdb_log(SDB_LOG_ERR, "core: Failed to determine current "
1125 "time: %s", sdb_strerror(errno, errbuf, sizeof(errbuf)));
1126 sdb_object_deref(obj);
1127 return -1;
1128 }
1130 if (sdb_llist_insert_sorted(collector_list, obj,
1131 plugin_cmp_next_update)) {
1132 sdb_object_deref(obj);
1133 return -1;
1134 }
1136 /* pass control to the list */
1137 sdb_object_deref(obj);
1139 sdb_log(SDB_LOG_INFO, "core: Registered collector callback '%s' "
1140 "(interval = %.3fs).", cb_name,
1141 SDB_TIME_TO_DOUBLE(CCB(obj)->ccb_interval));
1142 return 0;
1143 } /* sdb_plugin_register_collector */
1145 int
1146 sdb_plugin_register_timeseries_fetcher(const char *name,
1147 sdb_timeseries_fetcher_t *fetcher, sdb_object_t *user_data)
1148 {
1149 return plugin_add_impl(×eries_fetcher_list, ts_fetcher_type, "time-series fetcher",
1150 name, fetcher, user_data);
1151 } /* sdb_plugin_register_timeseries_fetcher */
1153 int
1154 sdb_plugin_register_writer(const char *name,
1155 sdb_store_writer_t *writer, sdb_object_t *user_data)
1156 {
1157 char cb_name[1024];
1158 return plugin_add_impl(&writer_list, writer_type, "store writer",
1159 plugin_get_name(name, cb_name, sizeof(cb_name)),
1160 writer, user_data);
1161 } /* sdb_store_register_writer */
1163 int
1164 sdb_plugin_register_reader(const char *name,
1165 sdb_store_reader_t *reader, sdb_object_t *user_data)
1166 {
1167 char cb_name[1024];
1168 return plugin_add_impl(&reader_list, reader_type, "store reader",
1169 plugin_get_name(name, cb_name, sizeof(cb_name)),
1170 reader, user_data);
1171 } /* sdb_plugin_register_reader */
1173 void
1174 sdb_plugin_unregister_all(void)
1175 {
1176 size_t i;
1178 for (i = 0; i < SDB_STATIC_ARRAY_LEN(all_lists); ++i) {
1179 const char *type = all_lists[i].type;
1180 sdb_llist_t *list = *all_lists[i].list;
1182 size_t len = sdb_llist_len(list);
1184 if (! len)
1185 continue;
1187 sdb_llist_clear(list);
1188 sdb_log(SDB_LOG_INFO, "core: Unregistered %zu %s callback%s",
1189 len, type, len == 1 ? "" : "s");
1190 }
1191 } /* sdb_plugin_unregister_all */
1193 sdb_plugin_ctx_t
1194 sdb_plugin_get_ctx(void)
1195 {
1196 ctx_t *c;
1198 c = ctx_get();
1199 if (! c) {
1200 sdb_log(SDB_LOG_ERR, "core: Invalid read access to plugin "
1201 "context outside a plugin");
1202 return plugin_default_ctx;
1203 }
1204 return c->public;
1205 } /* sdb_plugin_get_ctx */
1207 int
1208 sdb_plugin_set_ctx(sdb_plugin_ctx_t ctx, sdb_plugin_ctx_t *old)
1209 {
1210 ctx_t *c;
1212 c = ctx_get();
1213 if (! c) {
1214 sdb_log(SDB_LOG_ERR, "core: Invalid write access to plugin "
1215 "context outside a plugin");
1216 return -1;
1217 }
1219 if (old)
1220 *old = c->public;
1221 c->public = ctx;
1222 return 0;
1223 } /* sdb_plugin_set_ctx */
1225 const sdb_plugin_info_t *
1226 sdb_plugin_current(void)
1227 {
1228 ctx_t *ctx = ctx_get();
1230 if (! ctx)
1231 return NULL;
1232 return &ctx->info;
1233 } /* sdb_plugin_current */
1235 int
1236 sdb_plugin_configure(const char *name, oconfig_item_t *ci)
1237 {
1238 callback_t *plugin;
1239 sdb_plugin_config_cb callback;
1241 ctx_t *old_ctx;
1243 int status;
1245 if ((! name) || (! ci))
1246 return -1;
1248 plugin = CB(sdb_llist_search_by_name(config_list, name));
1249 if (! plugin) {
1250 ctx_t *ctx = CTX(sdb_llist_search_by_name(all_plugins, name));
1251 if (! ctx)
1252 sdb_log(SDB_LOG_ERR, "core: Cannot configure unknown "
1253 "plugin '%s'. Missing 'LoadPlugin \"%s\"'?",
1254 name, name);
1255 else
1256 sdb_log(SDB_LOG_ERR, "core: Plugin '%s' did not register "
1257 "a config callback.", name);
1258 errno = ENOENT;
1259 return -1;
1260 }
1262 old_ctx = ctx_set(plugin->cb_ctx);
1263 callback = (sdb_plugin_config_cb)plugin->cb_callback;
1264 status = callback(ci);
1265 ctx_set(old_ctx);
1266 return status;
1267 } /* sdb_plugin_configure */
1269 int
1270 sdb_plugin_reconfigure_init(void)
1271 {
1272 sdb_llist_iter_t *iter;
1274 iter = sdb_llist_get_iter(config_list);
1275 if (config_list && (! iter))
1276 return -1;
1278 /* deconfigure all plugins */
1279 while (sdb_llist_iter_has_next(iter)) {
1280 callback_t *plugin;
1281 sdb_plugin_config_cb callback;
1282 ctx_t *old_ctx;
1284 plugin = CB(sdb_llist_iter_get_next(iter));
1285 old_ctx = ctx_set(plugin->cb_ctx);
1286 callback = (sdb_plugin_config_cb)plugin->cb_callback;
1287 callback(NULL);
1288 ctx_set(old_ctx);
1289 }
1290 sdb_llist_iter_destroy(iter);
1292 iter = sdb_llist_get_iter(all_plugins);
1293 if (all_plugins && (! iter))
1294 return -1;
1296 /* record all plugins as being unused */
1297 while (sdb_llist_iter_has_next(iter))
1298 CTX(sdb_llist_iter_get_next(iter))->use_cnt = 0;
1299 sdb_llist_iter_destroy(iter);
1301 sdb_plugin_unregister_all();
1302 return 0;
1303 } /* sdb_plugin_reconfigure_init */
1305 int
1306 sdb_plugin_reconfigure_finish(void)
1307 {
1308 sdb_llist_iter_t *iter;
1310 iter = sdb_llist_get_iter(all_plugins);
1311 if (all_plugins && (! iter))
1312 return -1;
1314 while (sdb_llist_iter_has_next(iter)) {
1315 ctx_t *ctx = CTX(sdb_llist_iter_get_next(iter));
1316 if (ctx->use_cnt)
1317 continue;
1319 sdb_log(SDB_LOG_INFO, "core: Module %s no longer in use",
1320 ctx->info.plugin_name);
1321 sdb_llist_iter_remove_current(iter);
1322 plugin_unregister_by_name(ctx->info.plugin_name);
1323 sdb_object_deref(SDB_OBJ(ctx));
1324 }
1325 sdb_llist_iter_destroy(iter);
1326 return 0;
1327 } /* sdb_plugin_reconfigure_finish */
1329 int
1330 sdb_plugin_init_all(void)
1331 {
1332 sdb_llist_iter_t *iter;
1333 int ret = 0;
1335 iter = sdb_llist_get_iter(init_list);
1336 while (sdb_llist_iter_has_next(iter)) {
1337 callback_t *cb;
1338 sdb_plugin_init_cb callback;
1339 ctx_t *old_ctx;
1341 sdb_object_t *obj = sdb_llist_iter_get_next(iter);
1342 assert(obj);
1343 cb = CB(obj);
1345 callback = (sdb_plugin_init_cb)cb->cb_callback;
1347 old_ctx = ctx_set(cb->cb_ctx);
1348 if (callback(cb->cb_user_data)) {
1349 sdb_log(SDB_LOG_ERR, "core: Failed to initialize plugin "
1350 "'%s'. Unregistering all callbacks.", obj->name);
1351 ctx_set(old_ctx);
1352 plugin_unregister_by_name(cb->cb_ctx->info.plugin_name);
1353 ++ret;
1354 }
1355 else
1356 ctx_set(old_ctx);
1357 }
1358 sdb_llist_iter_destroy(iter);
1359 return ret;
1360 } /* sdb_plugin_init_all */
1362 int
1363 sdb_plugin_shutdown_all(void)
1364 {
1365 sdb_llist_iter_t *iter;
1366 int ret = 0;
1368 iter = sdb_llist_get_iter(shutdown_list);
1369 while (sdb_llist_iter_has_next(iter)) {
1370 callback_t *cb;
1371 sdb_plugin_shutdown_cb callback;
1372 ctx_t *old_ctx;
1374 sdb_object_t *obj = sdb_llist_iter_get_next(iter);
1375 assert(obj);
1376 cb = CB(obj);
1378 callback = (sdb_plugin_shutdown_cb)cb->cb_callback;
1380 old_ctx = ctx_set(cb->cb_ctx);
1381 if (callback(cb->cb_user_data)) {
1382 sdb_log(SDB_LOG_ERR, "core: Failed to shutdown plugin '%s'.",
1383 obj->name);
1384 ++ret;
1385 }
1386 ctx_set(old_ctx);
1387 }
1388 sdb_llist_iter_destroy(iter);
1389 return ret;
1390 } /* sdb_plugin_shutdown_all */
1392 int
1393 sdb_plugin_collector_loop(sdb_plugin_loop_t *loop)
1394 {
1395 if (! collector_list) {
1396 sdb_log(SDB_LOG_WARNING, "core: No collectors registered. "
1397 "Quiting main loop.");
1398 return -1;
1399 }
1401 if (! loop)
1402 return -1;
1404 while (loop->do_loop) {
1405 sdb_plugin_collector_cb callback;
1406 ctx_t *old_ctx;
1408 sdb_time_t interval, now;
1410 sdb_object_t *obj = sdb_llist_shift(collector_list);
1411 if (! obj)
1412 return -1;
1414 callback = (sdb_plugin_collector_cb)CCB(obj)->ccb_callback;
1416 if (! (now = sdb_gettime())) {
1417 char errbuf[1024];
1418 sdb_log(SDB_LOG_ERR, "core: Failed to determine current "
1419 "time in collector main loop: %s",
1420 sdb_strerror(errno, errbuf, sizeof(errbuf)));
1421 now = CCB(obj)->ccb_next_update;
1422 }
1424 if (now < CCB(obj)->ccb_next_update) {
1425 interval = CCB(obj)->ccb_next_update - now;
1427 errno = 0;
1428 while (loop->do_loop && sdb_sleep(interval, &interval)) {
1429 if (errno != EINTR) {
1430 char errbuf[1024];
1431 sdb_log(SDB_LOG_ERR, "core: Failed to sleep "
1432 "in collector main loop: %s",
1433 sdb_strerror(errno, errbuf, sizeof(errbuf)));
1434 sdb_llist_insert_sorted(collector_list, obj,
1435 plugin_cmp_next_update);
1436 sdb_object_deref(obj);
1437 return -1;
1438 }
1439 errno = 0;
1440 }
1442 if (! loop->do_loop) {
1443 /* put back; don't worry about errors */
1444 sdb_llist_insert_sorted(collector_list, obj,
1445 plugin_cmp_next_update);
1446 sdb_object_deref(obj);
1447 return 0;
1448 }
1449 }
1451 old_ctx = ctx_set(CCB(obj)->ccb_ctx);
1452 if (callback(CCB(obj)->ccb_user_data)) {
1453 /* XXX */
1454 }
1455 ctx_set(old_ctx);
1457 interval = CCB(obj)->ccb_interval;
1458 if (! interval)
1459 interval = loop->default_interval;
1460 if (! interval) {
1461 sdb_log(SDB_LOG_WARNING, "core: No interval configured "
1462 "for plugin '%s'; skipping any further "
1463 "iterations.", obj->name);
1464 sdb_object_deref(obj);
1465 continue;
1466 }
1468 CCB(obj)->ccb_next_update += interval;
1470 if (! (now = sdb_gettime())) {
1471 char errbuf[1024];
1472 sdb_log(SDB_LOG_ERR, "core: Failed to determine current "
1473 "time in collector main loop: %s",
1474 sdb_strerror(errno, errbuf, sizeof(errbuf)));
1475 now = CCB(obj)->ccb_next_update;
1476 }
1478 if (now > CCB(obj)->ccb_next_update) {
1479 sdb_log(SDB_LOG_WARNING, "core: Plugin '%s' took too "
1480 "long; skipping iterations to keep up.",
1481 obj->name);
1482 CCB(obj)->ccb_next_update = now;
1483 }
1485 if (sdb_llist_insert_sorted(collector_list, obj,
1486 plugin_cmp_next_update)) {
1487 sdb_log(SDB_LOG_ERR, "core: Failed to re-insert "
1488 "plugin '%s' into collector list. Unable to further "
1489 "use the plugin.",
1490 obj->name);
1491 sdb_object_deref(obj);
1492 return -1;
1493 }
1495 /* pass control back to the list */
1496 sdb_object_deref(obj);
1497 }
1498 return 0;
1499 } /* sdb_plugin_read_loop */
1501 char *
1502 sdb_plugin_cname(char *hostname)
1503 {
1504 sdb_llist_iter_t *iter;
1506 if (! hostname)
1507 return NULL;
1509 if (! cname_list)
1510 return hostname;
1512 iter = sdb_llist_get_iter(cname_list);
1513 while (sdb_llist_iter_has_next(iter)) {
1514 sdb_plugin_cname_cb callback;
1515 char *cname;
1517 sdb_object_t *obj = sdb_llist_iter_get_next(iter);
1518 assert(obj);
1520 callback = (sdb_plugin_cname_cb)CB(obj)->cb_callback;
1521 cname = callback(hostname, CB(obj)->cb_user_data);
1522 if (cname) {
1523 free(hostname);
1524 hostname = cname;
1525 }
1526 /* else: don't change hostname */
1527 }
1528 sdb_llist_iter_destroy(iter);
1529 return hostname;
1530 } /* sdb_plugin_cname */
1532 int
1533 sdb_plugin_log(int prio, const char *msg)
1534 {
1535 sdb_llist_iter_t *iter;
1536 int ret = -1;
1538 bool logged = 0;
1540 if (! msg)
1541 return 0;
1543 iter = sdb_llist_get_iter(log_list);
1544 while (sdb_llist_iter_has_next(iter)) {
1545 sdb_plugin_log_cb callback;
1546 int tmp;
1548 sdb_object_t *obj = sdb_llist_iter_get_next(iter);
1549 assert(obj);
1551 callback = (sdb_plugin_log_cb)CB(obj)->cb_callback;
1552 tmp = callback(prio, msg, CB(obj)->cb_user_data);
1553 if (tmp > ret)
1554 ret = tmp;
1556 if (CB(obj)->cb_ctx)
1557 logged = 1;
1558 /* else: this is an internally registered callback */
1559 }
1560 sdb_llist_iter_destroy(iter);
1562 if (! logged)
1563 return fprintf(stderr, "[%s] %s\n", SDB_LOG_PRIO_TO_STRING(prio), msg);
1564 return ret;
1565 } /* sdb_plugin_log */
1567 int
1568 sdb_plugin_vlogf(int prio, const char *fmt, va_list ap)
1569 {
1570 sdb_strbuf_t *buf;
1571 int ret;
1573 if (! fmt)
1574 return 0;
1576 buf = sdb_strbuf_create(64);
1577 if (! buf) {
1578 ret = fprintf(stderr, "[%s] ", SDB_LOG_PRIO_TO_STRING(prio));
1579 ret += vfprintf(stderr, fmt, ap);
1580 return ret;
1581 }
1583 if (sdb_strbuf_vsprintf(buf, fmt, ap) < 0) {
1584 sdb_strbuf_destroy(buf);
1585 return -1;
1586 }
1588 ret = sdb_plugin_log(prio, sdb_strbuf_string(buf));
1589 sdb_strbuf_destroy(buf);
1590 return ret;
1591 } /* sdb_plugin_vlogf */
1593 int
1594 sdb_plugin_logf(int prio, const char *fmt, ...)
1595 {
1596 va_list ap;
1597 int ret;
1599 if (! fmt)
1600 return 0;
1602 va_start(ap, fmt);
1603 ret = sdb_plugin_vlogf(prio, fmt, ap);
1604 va_end(ap);
1605 return ret;
1606 } /* sdb_plugin_logf */
1608 sdb_timeseries_t *
1609 sdb_plugin_fetch_timeseries(const char *type, const char *id,
1610 sdb_timeseries_opts_t *opts)
1611 {
1612 ts_fetcher_t *fetcher;
1613 sdb_timeseries_t *ts;
1615 ctx_t *old_ctx;
1617 if ((! type) || (! id) || (! opts))
1618 return NULL;
1620 fetcher = TS_FETCHER(sdb_llist_search_by_name(timeseries_fetcher_list, type));
1621 if (! fetcher) {
1622 sdb_log(SDB_LOG_ERR, "core: Cannot fetch time-series of type %s: "
1623 "no such plugin loaded", type);
1624 errno = ENOENT;
1625 return NULL;
1626 }
1628 old_ctx = ctx_set(fetcher->ts_ctx);
1629 ts = fetcher->impl.fetch(id, opts, fetcher->ts_user_data);
1630 ctx_set(old_ctx);
1631 return ts;
1632 } /* sdb_plugin_fetch_timeseries */
1634 sdb_timeseries_info_t *
1635 sdb_plugin_describe_timeseries(const char *type, const char *id)
1636 {
1637 ts_fetcher_t *fetcher;
1638 sdb_timeseries_info_t *ts_info;
1640 ctx_t *old_ctx;
1642 if ((! type) || (! id))
1643 return NULL;
1645 fetcher = TS_FETCHER(sdb_llist_search_by_name(timeseries_fetcher_list, type));
1646 if (! fetcher) {
1647 sdb_log(SDB_LOG_ERR, "core: Cannot describe time-series of type %s: "
1648 "no such plugin loaded", type);
1649 errno = ENOENT;
1650 return NULL;
1651 }
1653 old_ctx = ctx_set(fetcher->ts_ctx);
1654 ts_info = fetcher->impl.describe(id, fetcher->ts_user_data);
1655 ctx_set(old_ctx);
1656 return ts_info;
1657 } /* sdb_plugin_describe_timeseries */
1659 int
1660 sdb_plugin_query(sdb_ast_node_t *ast,
1661 sdb_store_writer_t *w, sdb_object_t *wd,
1662 sdb_query_opts_t *opts, sdb_strbuf_t *errbuf)
1663 {
1664 query_writer_t qw = QUERY_WRITER_INIT(w, wd);
1665 reader_t *reader;
1666 sdb_object_t *q;
1668 size_t n = sdb_llist_len(reader_list);
1669 int status = 0;
1671 if (! ast)
1672 return 0;
1674 if (opts)
1675 qw.opts = *opts;
1677 if ((ast->type != SDB_AST_TYPE_FETCH)
1678 && (ast->type != SDB_AST_TYPE_LIST)
1679 && (ast->type != SDB_AST_TYPE_LOOKUP)) {
1680 sdb_log(SDB_LOG_ERR, "core: Cannot execute query of type %s",
1681 SDB_AST_TYPE_TO_STRING(ast));
1682 sdb_strbuf_sprintf(errbuf, "Cannot execute query of type %s",
1683 SDB_AST_TYPE_TO_STRING(ast));
1684 return -1;
1685 }
1687 if (n != 1) {
1688 char *msg = (n > 0)
1689 ? "Cannot execute query: multiple readers not supported"
1690 : "Cannot execute query: no readers registered";
1691 sdb_strbuf_sprintf(errbuf, "%s", msg);
1692 sdb_log(SDB_LOG_ERR, "core: %s", msg);
1693 return -1;
1694 }
1696 reader = READER(sdb_llist_get(reader_list, 0));
1697 assert(reader);
1699 q = reader->impl.prepare_query(ast, errbuf, reader->r_user_data);
1700 if (q)
1701 status = reader->impl.execute_query(q, &query_writer, SDB_OBJ(&qw),
1702 errbuf, reader->r_user_data);
1703 else
1704 status = -1;
1706 sdb_object_deref(SDB_OBJ(q));
1707 sdb_object_deref(SDB_OBJ(reader));
1708 return status;
1709 } /* sdb_plugin_query */
1711 int
1712 sdb_plugin_store_host(const char *name, sdb_time_t last_update)
1713 {
1714 sdb_store_host_t host = SDB_STORE_HOST_INIT;
1715 char *backends[1];
1716 char *cname;
1718 sdb_llist_iter_t *iter;
1719 int status = 0;
1721 if (! name)
1722 return -1;
1724 if (! sdb_llist_len(writer_list)) {
1725 sdb_log(SDB_LOG_ERR, "core: Cannot store host: "
1726 "no writers registered");
1727 return -1;
1728 }
1730 cname = sdb_plugin_cname(strdup(name));
1731 if (! cname) {
1732 sdb_log(SDB_LOG_ERR, "core: strdup failed");
1733 return -1;
1734 }
1736 host.name = cname;
1737 host.last_update = last_update ? last_update : sdb_gettime();
1738 if (get_interval(SDB_HOST, NULL, -1, NULL, cname,
1739 host.last_update, &host.interval)) {
1740 free(cname);
1741 return 1;
1742 }
1743 host.backends = (const char * const *)backends;
1744 get_backend(backends, &host.backends_num);
1746 iter = sdb_llist_get_iter(writer_list);
1747 while (sdb_llist_iter_has_next(iter)) {
1748 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
1749 int s;
1750 assert(writer);
1751 s = writer->impl.store_host(&host, writer->w_user_data);
1752 if (((s > 0) && (status >= 0)) || (s < 0))
1753 status = s;
1754 }
1755 sdb_llist_iter_destroy(iter);
1756 free(cname);
1757 return status;
1758 } /* sdb_plugin_store_host */
1760 int
1761 sdb_plugin_store_service(const char *hostname, const char *name,
1762 sdb_time_t last_update)
1763 {
1764 sdb_store_service_t service = SDB_STORE_SERVICE_INIT;
1765 char *backends[1];
1766 char *cname;
1768 sdb_llist_iter_t *iter;
1769 sdb_data_t d;
1771 int status = 0;
1773 if ((! hostname) || (! name))
1774 return -1;
1776 if (! sdb_llist_len(writer_list)) {
1777 sdb_log(SDB_LOG_ERR, "core: Cannot store service: "
1778 "no writers registered");
1779 return -1;
1780 }
1782 cname = sdb_plugin_cname(strdup(hostname));
1783 if (! cname) {
1784 sdb_log(SDB_LOG_ERR, "core: strdup failed");
1785 return -1;
1786 }
1788 service.hostname = cname;
1789 service.name = name;
1790 service.last_update = last_update ? last_update : sdb_gettime();
1791 if (get_interval(SDB_SERVICE, cname, -1, NULL, name,
1792 service.last_update, &service.interval)) {
1793 free(cname);
1794 return 1;
1795 }
1796 service.backends = (const char * const *)backends;
1797 get_backend(backends, &service.backends_num);
1799 iter = sdb_llist_get_iter(writer_list);
1800 while (sdb_llist_iter_has_next(iter)) {
1801 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
1802 int s;
1803 assert(writer);
1804 s = writer->impl.store_service(&service, writer->w_user_data);
1805 if (((s > 0) && (status >= 0)) || (s < 0))
1806 status = s;
1807 }
1808 sdb_llist_iter_destroy(iter);
1810 if (! status) {
1811 /* record the hostname as an attribute */
1812 d.type = SDB_TYPE_STRING;
1813 d.data.string = cname;
1814 if (sdb_plugin_store_service_attribute(cname, name,
1815 "hostname", &d, service.last_update))
1816 status = -1;
1817 }
1819 free(cname);
1820 return status;
1821 } /* sdb_plugin_store_service */
1823 int
1824 sdb_plugin_store_metric(const char *hostname, const char *name,
1825 sdb_metric_store_t *store, sdb_time_t last_update)
1826 {
1827 sdb_store_metric_t metric = SDB_STORE_METRIC_INIT;
1828 char *backends[1];
1829 char *cname;
1831 sdb_llist_iter_t *iter;
1832 sdb_data_t d;
1834 int status = 0;
1836 if ((! hostname) || (! name))
1837 return -1;
1839 if (! sdb_llist_len(writer_list)) {
1840 sdb_log(SDB_LOG_ERR, "core: Cannot store metric: "
1841 "no writers registered");
1842 return -1;
1843 }
1845 cname = sdb_plugin_cname(strdup(hostname));
1846 if (! cname) {
1847 sdb_log(SDB_LOG_ERR, "core: strdup failed");
1848 return -1;
1849 }
1851 if (store && ((! store->type) || (! store->id)))
1852 store = NULL;
1854 metric.hostname = cname;
1855 metric.name = name;
1856 if (store) {
1857 if (store->last_update < last_update)
1858 store->last_update = last_update;
1859 metric.stores = store;
1860 metric.stores_num = 1;
1861 }
1862 metric.last_update = last_update ? last_update : sdb_gettime();
1863 if (get_interval(SDB_METRIC, cname, -1, NULL, name,
1864 metric.last_update, &metric.interval)) {
1865 free(cname);
1866 return 1;
1867 }
1868 metric.backends = (const char * const *)backends;
1869 get_backend(backends, &metric.backends_num);
1871 iter = sdb_llist_get_iter(writer_list);
1872 while (sdb_llist_iter_has_next(iter)) {
1873 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
1874 int s;
1875 assert(writer);
1876 s = writer->impl.store_metric(&metric, writer->w_user_data);
1877 if (((s > 0) && (status >= 0)) || (s < 0))
1878 status = s;
1879 }
1880 sdb_llist_iter_destroy(iter);
1882 if (! status) {
1883 /* record the hostname as an attribute */
1884 d.type = SDB_TYPE_STRING;
1885 d.data.string = cname;
1886 if (sdb_plugin_store_metric_attribute(cname, name,
1887 "hostname", &d, metric.last_update))
1888 status = -1;
1889 }
1891 free(cname);
1892 return status;
1893 } /* sdb_plugin_store_metric */
1895 int
1896 sdb_plugin_store_attribute(const char *hostname, const char *key,
1897 const sdb_data_t *value, sdb_time_t last_update)
1898 {
1899 sdb_store_attribute_t attr = SDB_STORE_ATTRIBUTE_INIT;
1900 char *backends[1];
1901 char *cname;
1903 sdb_llist_iter_t *iter;
1904 int status = 0;
1906 if ((! hostname) || (! key) || (! value))
1907 return -1;
1909 if (! sdb_llist_len(writer_list)) {
1910 sdb_log(SDB_LOG_ERR, "core: Cannot store attribute: "
1911 "no writers registered");
1912 return -1;
1913 }
1915 cname = sdb_plugin_cname(strdup(hostname));
1916 if (! cname) {
1917 sdb_log(SDB_LOG_ERR, "core: strdup failed");
1918 return -1;
1919 }
1921 attr.parent_type = SDB_HOST;
1922 attr.parent = cname;
1923 attr.key = key;
1924 attr.value = *value;
1925 attr.last_update = last_update ? last_update : sdb_gettime();
1926 if (get_interval(SDB_ATTRIBUTE, cname, -1, NULL, key,
1927 attr.last_update, &attr.interval)) {
1928 free(cname);
1929 return 1;
1930 }
1931 attr.backends = (const char * const *)backends;
1932 get_backend(backends, &attr.backends_num);
1934 iter = sdb_llist_get_iter(writer_list);
1935 while (sdb_llist_iter_has_next(iter)) {
1936 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
1937 int s;
1938 assert(writer);
1939 s = writer->impl.store_attribute(&attr, writer->w_user_data);
1940 if (((s > 0) && (status >= 0)) || (s < 0))
1941 status = s;
1942 }
1943 sdb_llist_iter_destroy(iter);
1944 free(cname);
1945 return status;
1946 } /* sdb_plugin_store_attribute */
1948 int
1949 sdb_plugin_store_service_attribute(const char *hostname, const char *service,
1950 const char *key, const sdb_data_t *value, sdb_time_t last_update)
1951 {
1952 sdb_store_attribute_t attr = SDB_STORE_ATTRIBUTE_INIT;
1953 char *backends[1];
1954 char *cname;
1956 sdb_llist_iter_t *iter;
1957 int status = 0;
1959 if ((! hostname) || (! service) || (! key) || (! value))
1960 return -1;
1962 if (! sdb_llist_len(writer_list)) {
1963 sdb_log(SDB_LOG_ERR, "core: Cannot store service attribute: "
1964 "no writers registered");
1965 return -1;
1966 }
1968 cname = sdb_plugin_cname(strdup(hostname));
1969 if (! cname) {
1970 sdb_log(SDB_LOG_ERR, "core: strdup failed");
1971 return -1;
1972 }
1974 attr.hostname = cname;
1975 attr.parent_type = SDB_SERVICE;
1976 attr.parent = service;
1977 attr.key = key;
1978 attr.value = *value;
1979 attr.last_update = last_update ? last_update : sdb_gettime();
1980 if (get_interval(SDB_ATTRIBUTE, cname, SDB_SERVICE, service, key,
1981 attr.last_update, &attr.interval)) {
1982 free(cname);
1983 return 1;
1984 }
1985 attr.backends = (const char * const *)backends;
1986 get_backend(backends, &attr.backends_num);
1988 iter = sdb_llist_get_iter(writer_list);
1989 while (sdb_llist_iter_has_next(iter)) {
1990 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
1991 int s;
1992 assert(writer);
1993 s = writer->impl.store_attribute(&attr, writer->w_user_data);
1994 if (((s > 0) && (status >= 0)) || (s < 0))
1995 status = s;
1996 }
1997 sdb_llist_iter_destroy(iter);
1998 free(cname);
1999 return status;
2000 } /* sdb_plugin_store_service_attribute */
2002 int
2003 sdb_plugin_store_metric_attribute(const char *hostname, const char *metric,
2004 const char *key, const sdb_data_t *value, sdb_time_t last_update)
2005 {
2006 sdb_store_attribute_t attr = SDB_STORE_ATTRIBUTE_INIT;
2007 char *backends[1];
2008 char *cname;
2010 sdb_llist_iter_t *iter;
2011 int status = 0;
2013 if ((! hostname) || (! metric) || (! key) || (! value))
2014 return -1;
2016 if (! sdb_llist_len(writer_list)) {
2017 sdb_log(SDB_LOG_ERR, "core: Cannot store metric attribute: "
2018 "no writers registered");
2019 return -1;
2020 }
2022 cname = sdb_plugin_cname(strdup(hostname));
2023 if (! cname) {
2024 sdb_log(SDB_LOG_ERR, "core: strdup failed");
2025 return -1;
2026 }
2028 attr.hostname = cname;
2029 attr.parent_type = SDB_METRIC;
2030 attr.parent = metric;
2031 attr.key = key;
2032 attr.value = *value;
2033 attr.last_update = last_update ? last_update : sdb_gettime();
2034 if (get_interval(SDB_ATTRIBUTE, cname, SDB_METRIC, metric, key,
2035 attr.last_update, &attr.interval)) {
2036 free(cname);
2037 return 1;
2038 }
2039 attr.backends = (const char * const *)backends;
2040 get_backend(backends, &attr.backends_num);
2042 iter = sdb_llist_get_iter(writer_list);
2043 while (sdb_llist_iter_has_next(iter)) {
2044 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
2045 int s;
2046 assert(writer);
2047 s = writer->impl.store_attribute(&attr, writer->w_user_data);
2048 if (((s > 0) && (status >= 0)) || (s < 0))
2049 status = s;
2050 }
2051 sdb_llist_iter_destroy(iter);
2052 free(cname);
2053 return status;
2054 } /* sdb_plugin_store_metric_attribute */
2056 /* vim: set tw=78 sw=4 ts=4 noexpandtab : */