1 /*
2 * SysDB - src/core/plugin.c
3 * Copyright (C) 2012 Sebastian 'tokkee' Harl <sh@tokkee.org>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 *
15 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
16 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
17 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR
19 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
28 #if HAVE_CONFIG_H
29 # include "config.h"
30 #endif /* HAVE_CONFIG_H */
32 #include "sysdb.h"
33 #include "core/plugin.h"
34 #include "core/time.h"
35 #include "utils/error.h"
36 #include "utils/llist.h"
37 #include "utils/strbuf.h"
39 #include <assert.h>
41 #include <errno.h>
43 #include <stdarg.h>
44 #include <stdio.h>
45 #include <string.h>
46 #include <strings.h>
47 #include <unistd.h>
49 #include <ltdl.h>
51 #include <pthread.h>
/*
 * INFO_GET(i, attr):
 * Expands to the member 'attr' of the info object pointed to by 'i' or, if
 * that member is NULL, to the string literal "<attr> not set". Note that
 * 'i' is evaluated twice.
 */
#define INFO_GET(i, attr) \
	(((i)->attr) ? ((i)->attr) : (#attr" not set"))
57 /*
58 * private data types
59 */
61 typedef struct {
62 sdb_object_t super;
63 sdb_plugin_ctx_t public;
65 sdb_plugin_info_t info;
66 lt_dlhandle handle;
68 /* The usage count differs from the object's ref count
69 * in that it provides higher level information about how
70 * the plugin is in use. */
71 size_t use_cnt;
72 } ctx_t;
73 #define CTX_INIT { SDB_OBJECT_INIT, \
74 SDB_PLUGIN_CTX_INIT, SDB_PLUGIN_INFO_INIT, NULL, 0 }
76 #define CTX(obj) ((ctx_t *)(obj))
78 typedef struct {
79 sdb_object_t super;
80 void *cb_callback;
81 sdb_object_t *cb_user_data;
82 ctx_t *cb_ctx;
83 } callback_t;
84 #define CB_INIT { SDB_OBJECT_INIT, \
85 /* callback = */ NULL, /* user_data = */ NULL, \
86 SDB_PLUGIN_CTX_INIT }
87 #define CB(obj) ((callback_t *)(obj))
88 #define CONST_CB(obj) ((const callback_t *)(obj))
90 typedef struct {
91 callback_t super;
92 #define ccb_callback super.cb_callback
93 #define ccb_user_data super.cb_user_data
94 #define ccb_ctx super.cb_ctx
95 sdb_time_t ccb_interval;
96 sdb_time_t ccb_next_update;
97 } collector_t;
98 #define CCB(obj) ((collector_t *)(obj))
99 #define CONST_CCB(obj) ((const collector_t *)(obj))
101 typedef struct {
102 callback_t super; /* cb_callback will always be NULL */
103 #define w_user_data super.cb_user_data
104 #define w_ctx super.cb_ctx
105 sdb_store_writer_t impl;
106 } writer_t;
107 #define WRITER(obj) ((writer_t *)(obj))
109 typedef struct {
110 callback_t super; /* cb_callback will always be NULL */
111 #define r_user_data super.cb_user_data
112 #define r_ctx super.cb_ctx
113 sdb_store_reader_t impl;
114 } reader_t;
115 #define READER(obj) ((reader_t *)(obj))
117 typedef struct {
118 callback_t super; /* cb_callback will always be NULL */
119 #define ts_user_data super.cb_user_data
120 #define ts_ctx super.cb_ctx
121 sdb_timeseries_fetcher_t impl;
122 } ts_fetcher_t;
123 #define TS_FETCHER(obj) ((ts_fetcher_t *)(obj))
125 /*
126 * private variables
127 */
129 static sdb_plugin_ctx_t plugin_default_ctx = SDB_PLUGIN_CTX_INIT;
130 static sdb_plugin_info_t plugin_default_info = SDB_PLUGIN_INFO_INIT;
132 static pthread_key_t plugin_ctx_key;
133 static bool plugin_ctx_key_initialized = 0;
135 /* a list of the plugin contexts of all registered plugins */
136 static sdb_llist_t *all_plugins = NULL;
138 static sdb_llist_t *config_list = NULL;
139 static sdb_llist_t *init_list = NULL;
140 static sdb_llist_t *collector_list = NULL;
141 static sdb_llist_t *cname_list = NULL;
142 static sdb_llist_t *shutdown_list = NULL;
143 static sdb_llist_t *log_list = NULL;
144 static sdb_llist_t *timeseries_fetcher_list = NULL;
145 static sdb_llist_t *writer_list = NULL;
146 static sdb_llist_t *reader_list = NULL;
148 static struct {
149 const char *type;
150 sdb_llist_t **list;
151 } all_lists[] = {
152 { "config", &config_list },
153 { "init", &init_list },
154 { "collector", &collector_list },
155 { "cname", &cname_list },
156 { "shutdown", &shutdown_list },
157 { "log", &log_list },
158 { "timeseries fetcher", ×eries_fetcher_list },
159 { "store writer", &writer_list },
160 { "store reader", &reader_list },
161 };
163 /*
164 * private helper functions
165 */
167 static void
168 plugin_info_clear(sdb_plugin_info_t *info)
169 {
170 sdb_plugin_info_t empty_info = SDB_PLUGIN_INFO_INIT;
171 if (! info)
172 return;
174 if (info->plugin_name)
175 free(info->plugin_name);
176 if (info->filename)
177 free(info->filename);
179 if (info->description)
180 free(info->description);
181 if (info->copyright)
182 free(info->copyright);
183 if (info->license)
184 free(info->license);
186 *info = empty_info;
187 } /* plugin_info_clear */
189 static void
190 ctx_key_init(void)
191 {
192 if (plugin_ctx_key_initialized)
193 return;
195 pthread_key_create(&plugin_ctx_key, /* destructor */ NULL);
196 plugin_ctx_key_initialized = 1;
197 } /* ctx_key_init */
199 static int
200 plugin_cmp_next_update(const sdb_object_t *a, const sdb_object_t *b)
201 {
202 const collector_t *ccb1 = (const collector_t *)a;
203 const collector_t *ccb2 = (const collector_t *)b;
205 assert(ccb1 && ccb2);
207 return (ccb1->ccb_next_update > ccb2->ccb_next_update)
208 ? 1 : (ccb1->ccb_next_update < ccb2->ccb_next_update)
209 ? -1 : 0;
210 } /* plugin_cmp_next_update */
212 static int
213 plugin_lookup_by_name(const sdb_object_t *obj, const void *id)
214 {
215 const callback_t *cb = CONST_CB(obj);
216 const char *name = id;
218 assert(cb && id);
220 /* when a plugin was registered from outside a plugin (e.g. the core),
221 * we don't have a plugin context */
222 if (! cb->cb_ctx)
223 return 1;
225 if (!strcasecmp(cb->cb_ctx->info.plugin_name, name))
226 return 0;
227 return 1;
228 } /* plugin_lookup_by_name */
230 /* since this function is called from sdb_plugin_reconfigure_finish()
231 * when iterating through all_plugins, we may not do any additional
232 * modifications to all_plugins except for the optional removal */
233 static void
234 plugin_unregister_by_name(const char *plugin_name)
235 {
236 sdb_object_t *obj;
237 size_t i;
239 for (i = 0; i < SDB_STATIC_ARRAY_LEN(all_lists); ++i) {
240 const char *type = all_lists[i].type;
241 sdb_llist_t *list = *all_lists[i].list;
243 while (1) {
244 callback_t *cb;
246 cb = CB(sdb_llist_remove(list,
247 plugin_lookup_by_name, plugin_name));
248 if (! cb)
249 break;
251 assert(cb->cb_ctx);
253 sdb_log(SDB_LOG_INFO, "core: Unregistering "
254 "%s callback '%s' (module %s)", type, cb->super.name,
255 cb->cb_ctx->info.plugin_name);
256 sdb_object_deref(SDB_OBJ(cb));
257 }
258 }
260 obj = sdb_llist_search_by_name(all_plugins, plugin_name);
261 /* when called from sdb_plugin_reconfigure_finish, the object has already
262 * been removed from the list */
263 if (obj && (obj->ref_cnt <= 1)) {
264 sdb_llist_remove_by_name(all_plugins, plugin_name);
265 sdb_object_deref(obj);
266 }
267 /* else: other callbacks still reference it */
268 } /* plugin_unregister_by_name */
270 /*
271 * store writer wrapper for performing database queries:
272 * It wraps another store writer, adding extra logic as needed.
273 */
275 typedef struct {
276 sdb_object_t super;
277 sdb_store_writer_t *w;
278 sdb_object_t *ud;
279 } query_writer_t;
280 #define QUERY_WRITER_INIT(w, ud) { SDB_OBJECT_INIT, (w), (ud) }
281 #define QUERY_WRITER(obj) ((query_writer_t *)(obj))
283 static int
284 query_store_host(sdb_store_host_t *host, sdb_object_t *user_data)
285 {
286 query_writer_t *qw = QUERY_WRITER(user_data);
287 return qw->w->store_host(host, qw->ud);
288 } /* query_store_host */
290 static int
291 query_store_service(sdb_store_service_t *service, sdb_object_t *user_data)
292 {
293 query_writer_t *qw = QUERY_WRITER(user_data);
294 return qw->w->store_service(service, qw->ud);
295 } /* query_store_service */
297 static int
298 query_store_metric(sdb_store_metric_t *metric, sdb_object_t *user_data)
299 {
300 query_writer_t *qw = QUERY_WRITER(user_data);
301 sdb_timeseries_info_t *infos[metric->stores_num];
302 sdb_metric_store_t stores[metric->stores_num];
304 const sdb_metric_store_t *orig_stores = metric->stores;
305 int status;
306 size_t i;
308 for (i = 0; i < metric->stores_num; i++) {
309 /* TODO: Make this optional using query options. */
310 sdb_metric_store_t *s = stores + i;
311 *s = metric->stores[i];
312 infos[i] = sdb_plugin_describe_timeseries(s->type, s->id);
313 s->info = infos[i];
314 }
316 metric->stores = stores;
317 status = qw->w->store_metric(metric, qw->ud);
318 metric->stores = orig_stores;
320 for (i = 0; i < metric->stores_num; i++)
321 sdb_timeseries_info_destroy(infos[i]);
322 return status;
323 } /* query_store_metric */
325 static int
326 query_store_attribute(sdb_store_attribute_t *attr, sdb_object_t *user_data)
327 {
328 query_writer_t *qw = QUERY_WRITER(user_data);
329 return qw->w->store_attribute(attr, qw->ud);
330 } /* query_store_attribute */
332 static sdb_store_writer_t query_writer = {
333 query_store_host, query_store_service,
334 query_store_metric, query_store_attribute,
335 };
337 /*
338 * private types
339 */
341 static int
342 ctx_init(sdb_object_t *obj, va_list __attribute__((unused)) ap)
343 {
344 ctx_t *ctx = CTX(obj);
346 assert(ctx);
348 ctx->public = plugin_default_ctx;
349 ctx->info = plugin_default_info;
350 ctx->handle = NULL;
351 ctx->use_cnt = 1;
352 return 0;
353 } /* ctx_init */
355 static void
356 ctx_destroy(sdb_object_t *obj)
357 {
358 ctx_t *ctx = CTX(obj);
360 if (ctx->handle) {
361 const char *err;
363 sdb_log(SDB_LOG_INFO, "core: Unloading module %s",
364 ctx->info.plugin_name);
366 lt_dlerror();
367 lt_dlclose(ctx->handle);
368 if ((err = lt_dlerror()))
369 sdb_log(SDB_LOG_WARNING, "core: Failed to unload module %s: %s",
370 ctx->info.plugin_name, err);
371 }
373 plugin_info_clear(&ctx->info);
374 } /* ctx_destroy */
376 static sdb_type_t ctx_type = {
377 sizeof(ctx_t),
379 ctx_init,
380 ctx_destroy
381 };
383 static ctx_t *
384 ctx_get(void)
385 {
386 if (! plugin_ctx_key_initialized)
387 ctx_key_init();
388 return pthread_getspecific(plugin_ctx_key);
389 } /* ctx_get */
391 static ctx_t *
392 ctx_set(ctx_t *new)
393 {
394 ctx_t *old;
396 if (! plugin_ctx_key_initialized)
397 ctx_key_init();
399 old = pthread_getspecific(plugin_ctx_key);
400 if (old)
401 sdb_object_deref(SDB_OBJ(old));
402 if (new)
403 sdb_object_ref(SDB_OBJ(new));
404 pthread_setspecific(plugin_ctx_key, new);
405 return old;
406 } /* ctx_set */
408 static ctx_t *
409 ctx_create(const char *name)
410 {
411 ctx_t *ctx;
413 ctx = CTX(sdb_object_create(name, ctx_type));
414 if (! ctx)
415 return NULL;
417 if (! plugin_ctx_key_initialized)
418 ctx_key_init();
419 ctx_set(ctx);
420 return ctx;
421 } /* ctx_create */
423 /*
424 * plugin_init_ok:
425 * Checks whether the registration of a new plugin identified by 'obj' is
426 * okay. It consumes the first two arguments of 'ap'.
427 */
428 static bool
429 plugin_init_ok(sdb_object_t *obj, va_list ap)
430 {
431 sdb_llist_t **list = va_arg(ap, sdb_llist_t **);
432 const char *type = va_arg(ap, const char *);
434 assert(list); assert(type);
436 if (sdb_llist_search_by_name(*list, obj->name)) {
437 sdb_log(SDB_LOG_WARNING, "core: %s callback '%s' "
438 "has already been registered. Ignoring newly "
439 "registered version.", type, obj->name);
440 return 0;
441 }
442 return 1;
443 } /* plugin_init_ok */
445 static int
446 plugin_cb_init(sdb_object_t *obj, va_list ap)
447 {
448 void *callback;
449 sdb_object_t *ud;
451 if (! plugin_init_ok(obj, ap))
452 return -1;
454 callback = va_arg(ap, void *);
455 ud = va_arg(ap, sdb_object_t *);
457 /* cb_ctx may be NULL if the plugin was not registered by a plugin */
459 CB(obj)->cb_callback = callback;
460 CB(obj)->cb_ctx = ctx_get();
461 sdb_object_ref(SDB_OBJ(CB(obj)->cb_ctx));
463 sdb_object_ref(ud);
464 CB(obj)->cb_user_data = ud;
465 return 0;
466 } /* plugin_cb_init */
468 static void
469 plugin_cb_destroy(sdb_object_t *obj)
470 {
471 assert(obj);
472 sdb_object_deref(CB(obj)->cb_user_data);
473 sdb_object_deref(SDB_OBJ(CB(obj)->cb_ctx));
474 } /* plugin_cb_destroy */
476 static sdb_type_t callback_type = {
477 sizeof(callback_t),
479 plugin_cb_init,
480 plugin_cb_destroy
481 };
483 static sdb_type_t collector_type = {
484 sizeof(collector_t),
486 plugin_cb_init,
487 plugin_cb_destroy
488 };
490 static int
491 plugin_writer_init(sdb_object_t *obj, va_list ap)
492 {
493 sdb_store_writer_t *impl;
494 sdb_object_t *ud;
496 if (! plugin_init_ok(obj, ap))
497 return -1;
499 impl = va_arg(ap, sdb_store_writer_t *);
500 ud = va_arg(ap, sdb_object_t *);
501 assert(impl);
503 if ((! impl->store_host) || (! impl->store_service)
504 || (! impl->store_metric) || (! impl->store_attribute)) {
505 sdb_log(SDB_LOG_ERR, "core: store writer callback '%s' "
506 "does not fully implement the writer interface.",
507 obj->name);
508 return -1;
509 }
511 /* ctx may be NULL if the callback was not registered by a plugin */
513 WRITER(obj)->impl = *impl;
514 WRITER(obj)->w_ctx = ctx_get();
515 sdb_object_ref(SDB_OBJ(WRITER(obj)->w_ctx));
517 sdb_object_ref(ud);
518 WRITER(obj)->w_user_data = ud;
519 return 0;
520 } /* plugin_writer_init */
522 static void
523 plugin_writer_destroy(sdb_object_t *obj)
524 {
525 assert(obj);
526 sdb_object_deref(WRITER(obj)->w_user_data);
527 sdb_object_deref(SDB_OBJ(WRITER(obj)->w_ctx));
528 } /* plugin_writer_destroy */
530 static sdb_type_t writer_type = {
531 sizeof(writer_t),
533 plugin_writer_init,
534 plugin_writer_destroy
535 };
537 static int
538 plugin_reader_init(sdb_object_t *obj, va_list ap)
539 {
540 sdb_store_reader_t *impl;
541 sdb_object_t *ud;
543 if (! plugin_init_ok(obj, ap))
544 return -1;
546 impl = va_arg(ap, sdb_store_reader_t *);
547 ud = va_arg(ap, sdb_object_t *);
548 assert(impl);
550 if ((! impl->prepare_query) || (! impl->execute_query)) {
551 sdb_log(SDB_LOG_ERR, "core: store reader callback '%s' "
552 "does not fully implement the reader interface.",
553 obj->name);
554 return -1;
555 }
557 /* ctx may be NULL if the callback was not registered by a plugin */
559 READER(obj)->impl = *impl;
560 READER(obj)->r_ctx = ctx_get();
561 sdb_object_ref(SDB_OBJ(READER(obj)->r_ctx));
563 sdb_object_ref(ud);
564 READER(obj)->r_user_data = ud;
565 return 0;
566 } /* plugin_reader_init */
568 static void
569 plugin_reader_destroy(sdb_object_t *obj)
570 {
571 assert(obj);
572 sdb_object_deref(READER(obj)->r_user_data);
573 sdb_object_deref(SDB_OBJ(READER(obj)->r_ctx));
574 } /* plugin_reader_destroy */
576 static sdb_type_t reader_type = {
577 sizeof(reader_t),
579 plugin_reader_init,
580 plugin_reader_destroy
581 };
583 static int
584 plugin_ts_fetcher_init(sdb_object_t *obj, va_list ap)
585 {
586 sdb_timeseries_fetcher_t *impl;
587 sdb_object_t *ud;
589 if (! plugin_init_ok(obj, ap))
590 return -1;
592 impl = va_arg(ap, sdb_timeseries_fetcher_t *);
593 ud = va_arg(ap, sdb_object_t *);
594 assert(impl);
596 if ((! impl->describe) || (! impl->fetch)) {
597 sdb_log(SDB_LOG_ERR, "core: timeseries fetcher callback '%s' "
598 "does not fully implement the interface.",
599 obj->name);
600 return -1;
601 }
603 /* ctx may be NULL if the callback was not registered by a plugin */
605 TS_FETCHER(obj)->impl = *impl;
606 TS_FETCHER(obj)->ts_ctx = ctx_get();
607 sdb_object_ref(SDB_OBJ(TS_FETCHER(obj)->ts_ctx));
609 sdb_object_ref(ud);
610 TS_FETCHER(obj)->ts_user_data = ud;
611 return 0;
612 } /* plugin_ts_fetcher_init */
614 static void
615 plugin_ts_fetcher_destroy(sdb_object_t *obj)
616 {
617 assert(obj);
618 sdb_object_deref(TS_FETCHER(obj)->ts_user_data);
619 sdb_object_deref(SDB_OBJ(TS_FETCHER(obj)->ts_ctx));
620 } /* plugin_ts_fetcher_destroy */
622 static sdb_type_t ts_fetcher_type = {
623 sizeof(ts_fetcher_t),
625 plugin_ts_fetcher_init,
626 plugin_ts_fetcher_destroy
627 };
629 static int
630 module_init(const char *name, lt_dlhandle lh, sdb_plugin_info_t *info)
631 {
632 int (*mod_init)(sdb_plugin_info_t *);
633 int status;
635 mod_init = (int (*)(sdb_plugin_info_t *))lt_dlsym(lh, "sdb_module_init");
636 if (! mod_init) {
637 sdb_log(SDB_LOG_ERR, "core: Failed to load plugin '%s': "
638 "could not find symbol 'sdb_module_init'", name);
639 return -1;
640 }
642 status = mod_init(info);
643 if (status) {
644 sdb_log(SDB_LOG_ERR, "core: Failed to initialize "
645 "module '%s'", name);
646 plugin_unregister_by_name(name);
647 return -1;
648 }
649 return 0;
650 } /* module_init */
652 static int
653 module_load(const char *basedir, const char *name,
654 const sdb_plugin_ctx_t *plugin_ctx)
655 {
656 char base_name[name ? strlen(name) + 1 : 1];
657 const char *name_ptr;
658 char *tmp;
660 char filename[1024];
661 lt_dlhandle lh;
663 ctx_t *ctx;
665 int status;
667 assert(name);
669 base_name[0] = '\0';
670 name_ptr = name;
672 while ((tmp = strstr(name_ptr, "::"))) {
673 strncat(base_name, name_ptr, (size_t)(tmp - name_ptr));
674 strcat(base_name, "/");
675 name_ptr = tmp + strlen("::");
676 }
677 strcat(base_name, name_ptr);
679 if (! basedir)
680 basedir = PKGLIBDIR;
682 snprintf(filename, sizeof(filename), "%s/%s.so", basedir, base_name);
683 filename[sizeof(filename) - 1] = '\0';
685 if (access(filename, R_OK)) {
686 char errbuf[1024];
687 sdb_log(SDB_LOG_ERR, "core: Failed to load plugin '%s' (%s): %s",
688 name, filename, sdb_strerror(errno, errbuf, sizeof(errbuf)));
689 return -1;
690 }
692 lt_dlinit();
693 lt_dlerror();
695 lh = lt_dlopen(filename);
696 if (! lh) {
697 sdb_log(SDB_LOG_ERR, "core: Failed to load plugin '%s': %s"
698 "The most common cause for this problem are missing "
699 "dependencies.\n", name, lt_dlerror());
700 return -1;
701 }
703 if (ctx_get())
704 sdb_log(SDB_LOG_WARNING, "core: Discarding old plugin context");
706 ctx = ctx_create(name);
707 if (! ctx) {
708 sdb_log(SDB_LOG_ERR, "core: Failed to initialize plugin context");
709 return -1;
710 }
712 ctx->info.plugin_name = strdup(name);
713 ctx->info.filename = strdup(filename);
714 ctx->handle = lh;
716 if (plugin_ctx)
717 ctx->public = *plugin_ctx;
719 if ((status = module_init(name, lh, &ctx->info))) {
720 sdb_object_deref(SDB_OBJ(ctx));
721 return status;
722 }
724 /* compare minor version */
725 if ((ctx->info.version < 0)
726 || ((int)(ctx->info.version / 100) != (int)(SDB_VERSION / 100)))
727 sdb_log(SDB_LOG_WARNING, "core: WARNING: version of "
728 "plugin '%s' (%i.%i.%i) does not match our version "
729 "(%i.%i.%i); this might cause problems",
730 name, SDB_VERSION_DECODE(ctx->info.version),
731 SDB_VERSION_DECODE(SDB_VERSION));
733 sdb_llist_append(all_plugins, SDB_OBJ(ctx));
735 sdb_log(SDB_LOG_INFO, "core: Successfully loaded "
736 "plugin %s v%i (%s)", ctx->info.plugin_name,
737 ctx->info.plugin_version,
738 INFO_GET(&ctx->info, description));
739 sdb_log(SDB_LOG_INFO, "core: Plugin %s: %s, License: %s",
740 ctx->info.plugin_name,
741 INFO_GET(&ctx->info, copyright),
742 INFO_GET(&ctx->info, license));
744 /* any registered callbacks took ownership of the context */
745 sdb_object_deref(SDB_OBJ(ctx));
747 /* reset */
748 ctx_set(NULL);
749 return 0;
750 } /* module_load */
752 static char *
753 plugin_get_name(const char *name, char *buf, size_t bufsize)
754 {
755 ctx_t *ctx = ctx_get();
757 if (ctx)
758 snprintf(buf, bufsize, "%s::%s", ctx->info.plugin_name, name);
759 else
760 snprintf(buf, bufsize, "core::%s", name);
761 return buf;
762 } /* plugin_get_name */
764 static int
765 plugin_add_impl(sdb_llist_t **list, sdb_type_t T, const char *type,
766 const char *name, void *impl, sdb_object_t *user_data)
767 {
768 sdb_object_t *obj;
770 if ((! name) || (! impl))
771 return -1;
773 assert(list);
775 if (! *list)
776 *list = sdb_llist_create();
777 if (! *list)
778 return -1;
780 obj = sdb_object_create(name, T, list, type, impl, user_data);
781 if (! obj)
782 return -1;
784 if (sdb_llist_append(*list, obj)) {
785 sdb_object_deref(obj);
786 return -1;
787 }
789 /* pass control to the list */
790 sdb_object_deref(obj);
792 sdb_log(SDB_LOG_INFO, "core: Registered %s callback '%s'.",
793 type, name);
794 return 0;
795 } /* plugin_add_impl */
797 /*
798 * object meta-data
799 */
801 typedef struct {
802 int obj_type;
803 sdb_time_t last_update;
804 sdb_time_t interval;
805 } interval_fetcher_t;
807 static int
808 interval_fetcher_host(sdb_store_host_t *host, sdb_object_t *user_data)
809 {
810 interval_fetcher_t *lu = SDB_OBJ_WRAPPER(user_data)->data;
811 lu->obj_type = SDB_HOST;
812 lu->last_update = host->last_update;
813 return 0;
814 } /* interval_fetcher_host */
816 static int
817 interval_fetcher_service(sdb_store_service_t *svc, sdb_object_t *user_data)
818 {
819 interval_fetcher_t *lu = SDB_OBJ_WRAPPER(user_data)->data;
820 lu->obj_type = SDB_SERVICE;
821 lu->last_update = svc->last_update;
822 return 0;
823 } /* interval_fetcher_service */
825 static int
826 interval_fetcher_metric(sdb_store_metric_t *metric, sdb_object_t *user_data)
827 {
828 interval_fetcher_t *lu = SDB_OBJ_WRAPPER(user_data)->data;
829 lu->obj_type = SDB_METRIC;
830 lu->last_update = metric->last_update;
831 return 0;
832 } /* interval_fetcher_metric */
834 static int
835 interval_fetcher_attr(sdb_store_attribute_t *attr, sdb_object_t *user_data)
836 {
837 interval_fetcher_t *lu = SDB_OBJ_WRAPPER(user_data)->data;
838 lu->obj_type = SDB_ATTRIBUTE;
839 lu->last_update = attr->last_update;
840 return 0;
841 } /* interval_fetcher_attr */
843 static sdb_store_writer_t interval_fetcher = {
844 interval_fetcher_host, interval_fetcher_service,
845 interval_fetcher_metric, interval_fetcher_attr,
846 };
848 static int
849 get_interval(int obj_type, const char *hostname,
850 int parent_type, const char *parent, const char *name,
851 sdb_time_t last_update, sdb_time_t *interval_out)
852 {
853 sdb_ast_fetch_t fetch = SDB_AST_FETCH_INIT;
854 char hn[hostname ? strlen(hostname) + 1 : 1];
855 char pn[parent ? strlen(parent) + 1 : 1];
856 char n[strlen(name) + 1];
857 int status;
859 interval_fetcher_t lu = { 0, 0, 0 };
860 sdb_object_wrapper_t obj = SDB_OBJECT_WRAPPER_STATIC(&lu);
861 sdb_time_t interval;
863 assert(name);
865 if (hostname)
866 strncpy(hn, hostname, sizeof(hn));
867 if (parent)
868 strncpy(pn, parent, sizeof(pn));
869 strncpy(n, name, sizeof(n));
871 fetch.obj_type = obj_type;
872 fetch.hostname = hostname ? hn : NULL;
873 fetch.parent_type = parent_type;
874 fetch.parent = parent ? pn : NULL;
875 fetch.name = n;
877 status = sdb_plugin_query(SDB_AST_NODE(&fetch),
878 &interval_fetcher, SDB_OBJ(&obj), NULL);
879 if ((status < 0) || (lu.obj_type != obj_type) || (lu.last_update == 0)) {
880 *interval_out = 0;
881 return 0;
882 }
884 if (lu.last_update >= last_update) {
885 if (lu.last_update > last_update)
886 sdb_log(SDB_LOG_DEBUG, "memstore: Cannot update %s '%s' - "
887 "value too old (%"PRIsdbTIME" < %"PRIsdbTIME")",
888 SDB_STORE_TYPE_TO_NAME(obj_type), name,
889 lu.last_update, last_update);
890 *interval_out = lu.interval;
891 return 1;
892 }
894 interval = last_update - lu.last_update;
895 if (lu.interval && interval)
896 interval = (sdb_time_t)((0.9 * (double)lu.interval)
897 + (0.1 * (double)interval));
898 *interval_out = interval;
899 return 0;
900 } /* get_interval */
902 static void
903 get_backend(char **backends, size_t *backends_num)
904 {
905 const sdb_plugin_info_t *info;
907 info = sdb_plugin_current();
908 if ((! info) || (! info->plugin_name) || (! *info->plugin_name)) {
909 *backends_num = 0;
910 return;
911 }
913 backends[0] = info->plugin_name;
914 *backends_num = 1;
915 } /* get_backend */
917 /*
918 * public API
919 */
921 int
922 sdb_plugin_load(const char *basedir, const char *name,
923 const sdb_plugin_ctx_t *plugin_ctx)
924 {
925 ctx_t *ctx;
927 int status;
929 if ((! name) || (! *name))
930 return -1;
932 if (! all_plugins) {
933 if (! (all_plugins = sdb_llist_create())) {
934 sdb_log(SDB_LOG_ERR, "core: Failed to load plugin '%s': "
935 "internal error while creating linked list", name);
936 return -1;
937 }
938 }
940 ctx = CTX(sdb_llist_search_by_name(all_plugins, name));
941 if (ctx) {
942 /* plugin already loaded */
943 if (! ctx->use_cnt) {
944 /* reloading plugin */
945 ctx_t *old_ctx = ctx_set(ctx);
947 status = module_init(ctx->info.plugin_name, ctx->handle, NULL);
948 if (status)
949 return status;
951 sdb_log(SDB_LOG_INFO, "core: Successfully reloaded plugin "
952 "'%s' (%s)", ctx->info.plugin_name,
953 INFO_GET(&ctx->info, description));
954 ctx_set(old_ctx);
955 }
956 ++ctx->use_cnt;
957 return 0;
958 }
960 return module_load(basedir, name, plugin_ctx);
961 } /* sdb_plugin_load */
963 int
964 sdb_plugin_set_info(sdb_plugin_info_t *info, int type, ...)
965 {
966 va_list ap;
968 if (! info)
969 return -1;
971 va_start(ap, type);
973 switch (type) {
974 case SDB_PLUGIN_INFO_DESC:
975 {
976 char *desc = va_arg(ap, char *);
977 if (desc) {
978 if (info->description)
979 free(info->description);
980 info->description = strdup(desc);
981 }
982 }
983 break;
984 case SDB_PLUGIN_INFO_COPYRIGHT:
985 {
986 char *copyright = va_arg(ap, char *);
987 if (copyright)
988 info->copyright = strdup(copyright);
989 }
990 break;
991 case SDB_PLUGIN_INFO_LICENSE:
992 {
993 char *license = va_arg(ap, char *);
994 if (license) {
995 if (info->license)
996 free(info->license);
997 info->license = strdup(license);
998 }
999 }
1000 break;
1001 case SDB_PLUGIN_INFO_VERSION:
1002 {
1003 int version = va_arg(ap, int);
1004 info->version = version;
1005 }
1006 break;
1007 case SDB_PLUGIN_INFO_PLUGIN_VERSION:
1008 {
1009 int version = va_arg(ap, int);
1010 info->plugin_version = version;
1011 }
1012 break;
1013 default:
1014 va_end(ap);
1015 return -1;
1016 }
1018 va_end(ap);
1019 return 0;
1020 } /* sdb_plugin_set_info */
1022 int
1023 sdb_plugin_register_config(sdb_plugin_config_cb callback)
1024 {
1025 ctx_t *ctx = ctx_get();
1027 if (! ctx) {
1028 sdb_log(SDB_LOG_ERR, "core: Invalid attempt to register a "
1029 "config callback from outside a plugin");
1030 return -1;
1031 }
1032 return plugin_add_impl(&config_list, callback_type, "config",
1033 ctx->info.plugin_name, (void *)callback, NULL);
1034 } /* sdb_plugin_register_config */
1036 int
1037 sdb_plugin_register_init(const char *name, sdb_plugin_init_cb callback,
1038 sdb_object_t *user_data)
1039 {
1040 char cb_name[1024];
1041 return plugin_add_impl(&init_list, callback_type, "init",
1042 plugin_get_name(name, cb_name, sizeof(cb_name)),
1043 (void *)callback, user_data);
1044 } /* sdb_plugin_register_init */
1046 int
1047 sdb_plugin_register_shutdown(const char *name, sdb_plugin_shutdown_cb callback,
1048 sdb_object_t *user_data)
1049 {
1050 char cb_name[1024];
1051 return plugin_add_impl(&shutdown_list, callback_type, "shutdown",
1052 plugin_get_name(name, cb_name, sizeof(cb_name)),
1053 (void *)callback, user_data);
1054 } /* sdb_plugin_register_shutdown */
1056 int
1057 sdb_plugin_register_log(const char *name, sdb_plugin_log_cb callback,
1058 sdb_object_t *user_data)
1059 {
1060 char cb_name[1024];
1061 return plugin_add_impl(&log_list, callback_type, "log",
1062 plugin_get_name(name, cb_name, sizeof(cb_name)),
1063 callback, user_data);
1064 } /* sdb_plugin_register_log */
1066 int
1067 sdb_plugin_register_cname(const char *name, sdb_plugin_cname_cb callback,
1068 sdb_object_t *user_data)
1069 {
1070 char cb_name[1024];
1071 return plugin_add_impl(&cname_list, callback_type, "cname",
1072 plugin_get_name(name, cb_name, sizeof(cb_name)),
1073 callback, user_data);
1074 } /* sdb_plugin_register_cname */
1076 int
1077 sdb_plugin_register_collector(const char *name, sdb_plugin_collector_cb callback,
1078 const sdb_time_t *interval, sdb_object_t *user_data)
1079 {
1080 char cb_name[1024];
1081 sdb_object_t *obj;
1083 if ((! name) || (! callback))
1084 return -1;
1086 if (! collector_list)
1087 collector_list = sdb_llist_create();
1088 if (! collector_list)
1089 return -1;
1091 plugin_get_name(name, cb_name, sizeof(cb_name));
1093 obj = sdb_object_create(cb_name, collector_type,
1094 &collector_list, "collector", callback, user_data);
1095 if (! obj)
1096 return -1;
1098 if (interval)
1099 CCB(obj)->ccb_interval = *interval;
1100 else {
1101 ctx_t *ctx = ctx_get();
1103 if (! ctx) {
1104 sdb_log(SDB_LOG_ERR, "core: Cannot determine interval "
1105 "for collector %s; none specified and no plugin "
1106 "context found", cb_name);
1107 return -1;
1108 }
1110 CCB(obj)->ccb_interval = ctx->public.interval;
1111 }
1113 if (! (CCB(obj)->ccb_next_update = sdb_gettime())) {
1114 char errbuf[1024];
1115 sdb_log(SDB_LOG_ERR, "core: Failed to determine current "
1116 "time: %s", sdb_strerror(errno, errbuf, sizeof(errbuf)));
1117 sdb_object_deref(obj);
1118 return -1;
1119 }
1121 if (sdb_llist_insert_sorted(collector_list, obj,
1122 plugin_cmp_next_update)) {
1123 sdb_object_deref(obj);
1124 return -1;
1125 }
1127 /* pass control to the list */
1128 sdb_object_deref(obj);
1130 sdb_log(SDB_LOG_INFO, "core: Registered collector callback '%s' "
1131 "(interval = %.3fs).", cb_name,
1132 SDB_TIME_TO_DOUBLE(CCB(obj)->ccb_interval));
1133 return 0;
1134 } /* sdb_plugin_register_collector */
1136 int
1137 sdb_plugin_register_timeseries_fetcher(const char *name,
1138 sdb_timeseries_fetcher_t *fetcher, sdb_object_t *user_data)
1139 {
1140 return plugin_add_impl(×eries_fetcher_list, ts_fetcher_type, "time-series fetcher",
1141 name, fetcher, user_data);
1142 } /* sdb_plugin_register_timeseries_fetcher */
1144 int
1145 sdb_plugin_register_writer(const char *name,
1146 sdb_store_writer_t *writer, sdb_object_t *user_data)
1147 {
1148 char cb_name[1024];
1149 return plugin_add_impl(&writer_list, writer_type, "store writer",
1150 plugin_get_name(name, cb_name, sizeof(cb_name)),
1151 writer, user_data);
1152 } /* sdb_store_register_writer */
1154 int
1155 sdb_plugin_register_reader(const char *name,
1156 sdb_store_reader_t *reader, sdb_object_t *user_data)
1157 {
1158 char cb_name[1024];
1159 return plugin_add_impl(&reader_list, reader_type, "store reader",
1160 plugin_get_name(name, cb_name, sizeof(cb_name)),
1161 reader, user_data);
1162 } /* sdb_plugin_register_reader */
1164 void
1165 sdb_plugin_unregister_all(void)
1166 {
1167 size_t i;
1169 for (i = 0; i < SDB_STATIC_ARRAY_LEN(all_lists); ++i) {
1170 const char *type = all_lists[i].type;
1171 sdb_llist_t *list = *all_lists[i].list;
1173 size_t len = sdb_llist_len(list);
1175 if (! len)
1176 continue;
1178 sdb_llist_clear(list);
1179 sdb_log(SDB_LOG_INFO, "core: Unregistered %zu %s callback%s",
1180 len, type, len == 1 ? "" : "s");
1181 }
1182 } /* sdb_plugin_unregister_all */
1184 sdb_plugin_ctx_t
1185 sdb_plugin_get_ctx(void)
1186 {
1187 ctx_t *c;
1189 c = ctx_get();
1190 if (! c) {
1191 sdb_plugin_log(SDB_LOG_ERR, "core: Invalid read access to plugin "
1192 "context outside a plugin");
1193 return plugin_default_ctx;
1194 }
1195 return c->public;
1196 } /* sdb_plugin_get_ctx */
1198 int
1199 sdb_plugin_set_ctx(sdb_plugin_ctx_t ctx, sdb_plugin_ctx_t *old)
1200 {
1201 ctx_t *c;
1203 c = ctx_get();
1204 if (! c) {
1205 sdb_plugin_log(SDB_LOG_ERR, "core: Invalid write access to plugin "
1206 "context outside a plugin");
1207 return -1;
1208 }
1210 if (old)
1211 *old = c->public;
1212 c->public = ctx;
1213 return 0;
1214 } /* sdb_plugin_set_ctx */
1216 const sdb_plugin_info_t *
1217 sdb_plugin_current(void)
1218 {
1219 ctx_t *ctx = ctx_get();
1221 if (! ctx)
1222 return NULL;
1223 return &ctx->info;
1224 } /* sdb_plugin_current */
1226 int
1227 sdb_plugin_configure(const char *name, oconfig_item_t *ci)
1228 {
1229 callback_t *plugin;
1230 sdb_plugin_config_cb callback;
1232 ctx_t *old_ctx;
1234 int status;
1236 if ((! name) || (! ci))
1237 return -1;
1239 plugin = CB(sdb_llist_search_by_name(config_list, name));
1240 if (! plugin) {
1241 ctx_t *ctx = CTX(sdb_llist_search_by_name(all_plugins, name));
1242 if (! ctx)
1243 sdb_log(SDB_LOG_ERR, "core: Cannot configure unknown "
1244 "plugin '%s'. Missing 'LoadPlugin \"%s\"'?",
1245 name, name);
1246 else
1247 sdb_log(SDB_LOG_ERR, "core: Plugin '%s' did not register "
1248 "a config callback.", name);
1249 errno = ENOENT;
1250 return -1;
1251 }
1253 old_ctx = ctx_set(plugin->cb_ctx);
1254 callback = (sdb_plugin_config_cb)plugin->cb_callback;
1255 status = callback(ci);
1256 ctx_set(old_ctx);
1257 return status;
1258 } /* sdb_plugin_configure */
1260 int
1261 sdb_plugin_reconfigure_init(void)
1262 {
1263 sdb_llist_iter_t *iter;
1265 iter = sdb_llist_get_iter(config_list);
1266 if (config_list && (! iter))
1267 return -1;
1269 /* deconfigure all plugins */
1270 while (sdb_llist_iter_has_next(iter)) {
1271 callback_t *plugin;
1272 sdb_plugin_config_cb callback;
1273 ctx_t *old_ctx;
1275 plugin = CB(sdb_llist_iter_get_next(iter));
1276 old_ctx = ctx_set(plugin->cb_ctx);
1277 callback = (sdb_plugin_config_cb)plugin->cb_callback;
1278 callback(NULL);
1279 ctx_set(old_ctx);
1280 }
1281 sdb_llist_iter_destroy(iter);
1283 iter = sdb_llist_get_iter(all_plugins);
1284 if (all_plugins && (! iter))
1285 return -1;
1287 /* record all plugins as being unused */
1288 while (sdb_llist_iter_has_next(iter))
1289 CTX(sdb_llist_iter_get_next(iter))->use_cnt = 0;
1290 sdb_llist_iter_destroy(iter);
1292 sdb_plugin_unregister_all();
1293 return 0;
1294 } /* sdb_plugin_reconfigure_init */
1296 int
1297 sdb_plugin_reconfigure_finish(void)
1298 {
1299 sdb_llist_iter_t *iter;
1301 iter = sdb_llist_get_iter(all_plugins);
1302 if (all_plugins && (! iter))
1303 return -1;
1305 while (sdb_llist_iter_has_next(iter)) {
1306 ctx_t *ctx = CTX(sdb_llist_iter_get_next(iter));
1307 if (ctx->use_cnt)
1308 continue;
1310 sdb_log(SDB_LOG_INFO, "core: Module %s no longer in use",
1311 ctx->info.plugin_name);
1312 sdb_llist_iter_remove_current(iter);
1313 plugin_unregister_by_name(ctx->info.plugin_name);
1314 sdb_object_deref(SDB_OBJ(ctx));
1315 }
1316 sdb_llist_iter_destroy(iter);
1317 return 0;
1318 } /* sdb_plugin_reconfigure_finish */
1320 int
1321 sdb_plugin_init_all(void)
1322 {
1323 sdb_llist_iter_t *iter;
1324 int ret = 0;
1326 iter = sdb_llist_get_iter(init_list);
1327 while (sdb_llist_iter_has_next(iter)) {
1328 callback_t *cb;
1329 sdb_plugin_init_cb callback;
1330 ctx_t *old_ctx;
1332 sdb_object_t *obj = sdb_llist_iter_get_next(iter);
1333 assert(obj);
1334 cb = CB(obj);
1336 callback = (sdb_plugin_init_cb)cb->cb_callback;
1338 old_ctx = ctx_set(cb->cb_ctx);
1339 if (callback(cb->cb_user_data)) {
1340 sdb_log(SDB_LOG_ERR, "core: Failed to initialize plugin "
1341 "'%s'. Unregistering all callbacks.", obj->name);
1342 ctx_set(old_ctx);
1343 plugin_unregister_by_name(cb->cb_ctx->info.plugin_name);
1344 ++ret;
1345 }
1346 else
1347 ctx_set(old_ctx);
1348 }
1349 sdb_llist_iter_destroy(iter);
1350 return ret;
1351 } /* sdb_plugin_init_all */
1353 int
1354 sdb_plugin_shutdown_all(void)
1355 {
1356 sdb_llist_iter_t *iter;
1357 int ret = 0;
1359 iter = sdb_llist_get_iter(shutdown_list);
1360 while (sdb_llist_iter_has_next(iter)) {
1361 callback_t *cb;
1362 sdb_plugin_shutdown_cb callback;
1363 ctx_t *old_ctx;
1365 sdb_object_t *obj = sdb_llist_iter_get_next(iter);
1366 assert(obj);
1367 cb = CB(obj);
1369 callback = (sdb_plugin_shutdown_cb)cb->cb_callback;
1371 old_ctx = ctx_set(cb->cb_ctx);
1372 if (callback(cb->cb_user_data)) {
1373 sdb_log(SDB_LOG_ERR, "core: Failed to shutdown plugin '%s'.",
1374 obj->name);
1375 ++ret;
1376 }
1377 ctx_set(old_ctx);
1378 }
1379 sdb_llist_iter_destroy(iter);
1380 return ret;
1381 } /* sdb_plugin_shutdown_all */
/*
 * sdb_plugin_collector_loop:
 * Main loop running the registered collector callbacks for as long as
 * loop->do_loop is set.
 *
 * Each iteration takes one object off collector_list, sleeps until that
 * object's next update time is reached (if it lies in the future), runs the
 * callback inside the collector's context, advances the object's next update
 * time by its interval (or loop->default_interval if it has none) and puts
 * the object back into collector_list using plugin_cmp_next_update.
 *
 * Returns:
 *  - 0 if the loop ended because loop->do_loop was cleared
 *  - -1 if there is no collector list, 'loop' is NULL, no object could be
 *    taken off the list, sleeping failed with an error other than EINTR, or
 *    the object could not be re-inserted into the list
 */
int
sdb_plugin_collector_loop(sdb_plugin_loop_t *loop)
{
	if (! collector_list) {
		/* NOTE(review): "Quiting" below is a typo for "Quitting"; left
		 * as is because it is part of the emitted log message */
		sdb_log(SDB_LOG_WARNING, "core: No collectors registered. "
				"Quiting main loop.");
		return -1;
	}

	if (! loop)
		return -1;

	while (loop->do_loop) {
		sdb_plugin_collector_cb callback;
		ctx_t *old_ctx;

		sdb_time_t interval, now;

		/* presumably the entry that is due next, given that objects are
		 * always re-inserted using plugin_cmp_next_update -- TODO confirm */
		sdb_object_t *obj = sdb_llist_shift(collector_list);
		if (! obj)
			return -1;

		callback = (sdb_plugin_collector_cb)CCB(obj)->ccb_callback;

		if (! (now = sdb_gettime())) {
			char errbuf[1024];
			sdb_log(SDB_LOG_ERR, "core: Failed to determine current "
					"time in collector main loop: %s",
					sdb_strerror(errno, errbuf, sizeof(errbuf)));
			/* without a usable current time, act as if the collector
			 * was due right now */
			now = CCB(obj)->ccb_next_update;
		}

		if (now < CCB(obj)->ccb_next_update) {
			/* not due yet: sleep for the remaining time */
			interval = CCB(obj)->ccb_next_update - now;

			errno = 0;
			/* 'interval' is passed as both input and output so that the
			 * sleep is retried with the updated value after EINTR */
			while (loop->do_loop && sdb_sleep(interval, &interval)) {
				if (errno != EINTR) {
					char errbuf[1024];
					sdb_log(SDB_LOG_ERR, "core: Failed to sleep "
							"in collector main loop: %s",
							sdb_strerror(errno, errbuf, sizeof(errbuf)));
					/* put the collector back before giving up */
					sdb_llist_insert_sorted(collector_list, obj,
							plugin_cmp_next_update);
					sdb_object_deref(obj);
					return -1;
				}
				errno = 0;
			}

			if (! loop->do_loop) {
				/* put back; don't worry about errors */
				sdb_llist_insert_sorted(collector_list, obj,
						plugin_cmp_next_update);
				sdb_object_deref(obj);
				return 0;
			}
		}

		/* run the collector inside its own context */
		old_ctx = ctx_set(CCB(obj)->ccb_ctx);
		if (callback(CCB(obj)->ccb_user_data)) {
			/* XXX: a failing collector callback is currently ignored */
		}
		ctx_set(old_ctx);

		/* the collector's own interval takes precedence over the
		 * loop's default interval */
		interval = CCB(obj)->ccb_interval;
		if (! interval)
			interval = loop->default_interval;
		if (! interval) {
			sdb_log(SDB_LOG_WARNING, "core: No interval configured "
					"for plugin '%s'; skipping any further "
					"iterations.", obj->name);
			/* the object is not re-inserted into the list, so this
			 * collector will not be picked up again */
			sdb_object_deref(obj);
			continue;
		}

		CCB(obj)->ccb_next_update += interval;

		if (! (now = sdb_gettime())) {
			char errbuf[1024];
			sdb_log(SDB_LOG_ERR, "core: Failed to determine current "
					"time in collector main loop: %s",
					sdb_strerror(errno, errbuf, sizeof(errbuf)));
			now = CCB(obj)->ccb_next_update;
		}

		if (now > CCB(obj)->ccb_next_update) {
			/* the scheduled time has already passed; schedule the
			 * next run for the current time instead */
			sdb_log(SDB_LOG_WARNING, "core: Plugin '%s' took too "
					"long; skipping iterations to keep up.",
					obj->name);
			CCB(obj)->ccb_next_update = now;
		}

		if (sdb_llist_insert_sorted(collector_list, obj,
					plugin_cmp_next_update)) {
			sdb_log(SDB_LOG_ERR, "core: Failed to re-insert "
					"plugin '%s' into collector list. Unable to further "
					"use the plugin.",
					obj->name);
			sdb_object_deref(obj);
			return -1;
		}

		/* pass control back to the list */
		sdb_object_deref(obj);
	}
	return 0;
} /* sdb_plugin_collector_loop */
1492 char *
1493 sdb_plugin_cname(char *hostname)
1494 {
1495 sdb_llist_iter_t *iter;
1497 if (! hostname)
1498 return NULL;
1500 if (! cname_list)
1501 return hostname;
1503 iter = sdb_llist_get_iter(cname_list);
1504 while (sdb_llist_iter_has_next(iter)) {
1505 sdb_plugin_cname_cb callback;
1506 char *cname;
1508 sdb_object_t *obj = sdb_llist_iter_get_next(iter);
1509 assert(obj);
1511 callback = (sdb_plugin_cname_cb)CB(obj)->cb_callback;
1512 cname = callback(hostname, CB(obj)->cb_user_data);
1513 if (cname) {
1514 free(hostname);
1515 hostname = cname;
1516 }
1517 /* else: don't change hostname */
1518 }
1519 sdb_llist_iter_destroy(iter);
1520 return hostname;
1521 } /* sdb_plugin_cname */
1523 int
1524 sdb_plugin_log(int prio, const char *msg)
1525 {
1526 sdb_llist_iter_t *iter;
1527 int ret = -1;
1529 bool logged = 0;
1531 if (! msg)
1532 return 0;
1534 iter = sdb_llist_get_iter(log_list);
1535 while (sdb_llist_iter_has_next(iter)) {
1536 sdb_plugin_log_cb callback;
1537 int tmp;
1539 sdb_object_t *obj = sdb_llist_iter_get_next(iter);
1540 assert(obj);
1542 callback = (sdb_plugin_log_cb)CB(obj)->cb_callback;
1543 tmp = callback(prio, msg, CB(obj)->cb_user_data);
1544 if (tmp > ret)
1545 ret = tmp;
1547 if (CB(obj)->cb_ctx)
1548 logged = 1;
1549 /* else: this is an internally registered callback */
1550 }
1551 sdb_llist_iter_destroy(iter);
1553 if (! logged)
1554 return fprintf(stderr, "[%s] %s\n", SDB_LOG_PRIO_TO_STRING(prio), msg);
1555 return ret;
1556 } /* sdb_plugin_log */
1558 int
1559 sdb_plugin_vlogf(int prio, const char *fmt, va_list ap)
1560 {
1561 sdb_strbuf_t *buf;
1562 int ret;
1564 if (! fmt)
1565 return 0;
1567 buf = sdb_strbuf_create(64);
1568 if (! buf) {
1569 ret = fprintf(stderr, "[%s] ", SDB_LOG_PRIO_TO_STRING(prio));
1570 ret += vfprintf(stderr, fmt, ap);
1571 return ret;
1572 }
1574 if (sdb_strbuf_vsprintf(buf, fmt, ap) < 0) {
1575 sdb_strbuf_destroy(buf);
1576 return -1;
1577 }
1579 ret = sdb_plugin_log(prio, sdb_strbuf_string(buf));
1580 sdb_strbuf_destroy(buf);
1581 return ret;
1582 } /* sdb_plugin_vlogf */
/*
 * sdb_plugin_logf:
 * printf-style front-end for sdb_plugin_vlogf(): collects the variable
 * arguments and forwards them together with 'prio' and 'fmt'.
 *
 * Returns 0 if 'fmt' is NULL and the result of sdb_plugin_vlogf() otherwise.
 */
int
sdb_plugin_logf(int prio, const char *fmt, ...)
{
	va_list args;
	int written = 0;

	if (fmt != NULL) {
		va_start(args, fmt);
		written = sdb_plugin_vlogf(prio, fmt, args);
		va_end(args);
	}
	return written;
} /* sdb_plugin_logf */
1599 sdb_timeseries_t *
1600 sdb_plugin_fetch_timeseries(const char *type, const char *id,
1601 sdb_timeseries_opts_t *opts)
1602 {
1603 ts_fetcher_t *fetcher;
1604 sdb_timeseries_t *ts;
1606 ctx_t *old_ctx;
1608 if ((! type) || (! id) || (! opts))
1609 return NULL;
1611 fetcher = TS_FETCHER(sdb_llist_search_by_name(timeseries_fetcher_list, type));
1612 if (! fetcher) {
1613 sdb_log(SDB_LOG_ERR, "core: Cannot fetch time-series of type %s: "
1614 "no such plugin loaded", type);
1615 errno = ENOENT;
1616 return NULL;
1617 }
1619 old_ctx = ctx_set(fetcher->ts_ctx);
1620 ts = fetcher->impl.fetch(id, opts, fetcher->ts_user_data);
1621 ctx_set(old_ctx);
1622 return ts;
1623 } /* sdb_plugin_fetch_timeseries */
1625 sdb_timeseries_info_t *
1626 sdb_plugin_describe_timeseries(const char *type, const char *id)
1627 {
1628 ts_fetcher_t *fetcher;
1629 sdb_timeseries_info_t *ts_info;
1631 ctx_t *old_ctx;
1633 if ((! type) || (! id))
1634 return NULL;
1636 fetcher = TS_FETCHER(sdb_llist_search_by_name(timeseries_fetcher_list, type));
1637 if (! fetcher) {
1638 sdb_log(SDB_LOG_ERR, "core: Cannot describe time-series of type %s: "
1639 "no such plugin loaded", type);
1640 errno = ENOENT;
1641 return NULL;
1642 }
1644 old_ctx = ctx_set(fetcher->ts_ctx);
1645 ts_info = fetcher->impl.describe(id, fetcher->ts_user_data);
1646 ctx_set(old_ctx);
1647 return ts_info;
1648 } /* sdb_plugin_describe_timeseries */
1650 int
1651 sdb_plugin_query(sdb_ast_node_t *ast,
1652 sdb_store_writer_t *w, sdb_object_t *wd, sdb_strbuf_t *errbuf)
1653 {
1654 query_writer_t qw = QUERY_WRITER_INIT(w, wd);
1655 reader_t *reader;
1656 sdb_object_t *q;
1658 size_t n = sdb_llist_len(reader_list);
1659 int status = 0;
1661 if (! ast)
1662 return 0;
1664 if ((ast->type != SDB_AST_TYPE_FETCH)
1665 && (ast->type != SDB_AST_TYPE_LIST)
1666 && (ast->type != SDB_AST_TYPE_LOOKUP)) {
1667 sdb_log(SDB_LOG_ERR, "core: Cannot execute query of type %s",
1668 SDB_AST_TYPE_TO_STRING(ast));
1669 sdb_strbuf_sprintf(errbuf, "Cannot execute query of type %s",
1670 SDB_AST_TYPE_TO_STRING(ast));
1671 return -1;
1672 }
1674 if (n != 1) {
1675 char *msg = (n > 0)
1676 ? "Cannot execute query: multiple readers not supported"
1677 : "Cannot execute query: no readers registered";
1678 sdb_strbuf_sprintf(errbuf, "%s", msg);
1679 sdb_log(SDB_LOG_ERR, "core: %s", msg);
1680 return -1;
1681 }
1683 reader = READER(sdb_llist_get(reader_list, 0));
1684 assert(reader);
1686 q = reader->impl.prepare_query(ast, errbuf, reader->r_user_data);
1687 if (q)
1688 status = reader->impl.execute_query(q, &query_writer, SDB_OBJ(&qw),
1689 errbuf, reader->r_user_data);
1690 else
1691 status = -1;
1693 sdb_object_deref(SDB_OBJ(q));
1694 sdb_object_deref(SDB_OBJ(reader));
1695 return status;
1696 } /* sdb_plugin_query */
1698 int
1699 sdb_plugin_store_host(const char *name, sdb_time_t last_update)
1700 {
1701 sdb_store_host_t host = SDB_STORE_HOST_INIT;
1702 char *backends[1];
1703 char *cname;
1705 sdb_llist_iter_t *iter;
1706 int status = 0;
1708 if (! name)
1709 return -1;
1711 if (! sdb_llist_len(writer_list)) {
1712 sdb_log(SDB_LOG_ERR, "core: Cannot store host: "
1713 "no writers registered");
1714 return -1;
1715 }
1717 cname = sdb_plugin_cname(strdup(name));
1718 if (! cname) {
1719 sdb_log(SDB_LOG_ERR, "core: strdup failed");
1720 return -1;
1721 }
1723 host.name = cname;
1724 host.last_update = last_update ? last_update : sdb_gettime();
1725 if (get_interval(SDB_HOST, NULL, -1, NULL, cname,
1726 host.last_update, &host.interval)) {
1727 free(cname);
1728 return 1;
1729 }
1730 host.backends = (const char * const *)backends;
1731 get_backend(backends, &host.backends_num);
1733 iter = sdb_llist_get_iter(writer_list);
1734 while (sdb_llist_iter_has_next(iter)) {
1735 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
1736 int s;
1737 assert(writer);
1738 s = writer->impl.store_host(&host, writer->w_user_data);
1739 if (((s > 0) && (status >= 0)) || (s < 0))
1740 status = s;
1741 }
1742 sdb_llist_iter_destroy(iter);
1743 free(cname);
1744 return status;
1745 } /* sdb_plugin_store_host */
1747 int
1748 sdb_plugin_store_service(const char *hostname, const char *name,
1749 sdb_time_t last_update)
1750 {
1751 sdb_store_service_t service = SDB_STORE_SERVICE_INIT;
1752 char *backends[1];
1753 char *cname;
1755 sdb_llist_iter_t *iter;
1756 sdb_data_t d;
1758 int status = 0;
1760 if ((! hostname) || (! name))
1761 return -1;
1763 if (! sdb_llist_len(writer_list)) {
1764 sdb_log(SDB_LOG_ERR, "core: Cannot store service: "
1765 "no writers registered");
1766 return -1;
1767 }
1769 cname = sdb_plugin_cname(strdup(hostname));
1770 if (! cname) {
1771 sdb_log(SDB_LOG_ERR, "core: strdup failed");
1772 return -1;
1773 }
1775 service.hostname = cname;
1776 service.name = name;
1777 service.last_update = last_update ? last_update : sdb_gettime();
1778 if (get_interval(SDB_SERVICE, cname, -1, NULL, name,
1779 service.last_update, &service.interval)) {
1780 free(cname);
1781 return 1;
1782 }
1783 service.backends = (const char * const *)backends;
1784 get_backend(backends, &service.backends_num);
1786 iter = sdb_llist_get_iter(writer_list);
1787 while (sdb_llist_iter_has_next(iter)) {
1788 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
1789 int s;
1790 assert(writer);
1791 s = writer->impl.store_service(&service, writer->w_user_data);
1792 if (((s > 0) && (status >= 0)) || (s < 0))
1793 status = s;
1794 }
1795 sdb_llist_iter_destroy(iter);
1797 if (! status) {
1798 /* record the hostname as an attribute */
1799 d.type = SDB_TYPE_STRING;
1800 d.data.string = cname;
1801 if (sdb_plugin_store_service_attribute(cname, name,
1802 "hostname", &d, service.last_update))
1803 status = -1;
1804 }
1806 free(cname);
1807 return status;
1808 } /* sdb_plugin_store_service */
1810 int
1811 sdb_plugin_store_metric(const char *hostname, const char *name,
1812 sdb_metric_store_t *store, sdb_time_t last_update)
1813 {
1814 sdb_store_metric_t metric = SDB_STORE_METRIC_INIT;
1815 char *backends[1];
1816 char *cname;
1818 sdb_llist_iter_t *iter;
1819 sdb_data_t d;
1821 int status = 0;
1823 if ((! hostname) || (! name))
1824 return -1;
1826 if (! sdb_llist_len(writer_list)) {
1827 sdb_log(SDB_LOG_ERR, "core: Cannot store metric: "
1828 "no writers registered");
1829 return -1;
1830 }
1832 cname = sdb_plugin_cname(strdup(hostname));
1833 if (! cname) {
1834 sdb_log(SDB_LOG_ERR, "core: strdup failed");
1835 return -1;
1836 }
1838 if (store && ((! store->type) || (! store->id)))
1839 store = NULL;
1841 metric.hostname = cname;
1842 metric.name = name;
1843 if (store) {
1844 if (store->last_update < last_update)
1845 store->last_update = last_update;
1846 metric.stores = store;
1847 metric.stores_num = 1;
1848 }
1849 metric.last_update = last_update ? last_update : sdb_gettime();
1850 if (get_interval(SDB_METRIC, cname, -1, NULL, name,
1851 metric.last_update, &metric.interval)) {
1852 free(cname);
1853 return 1;
1854 }
1855 metric.backends = (const char * const *)backends;
1856 get_backend(backends, &metric.backends_num);
1858 iter = sdb_llist_get_iter(writer_list);
1859 while (sdb_llist_iter_has_next(iter)) {
1860 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
1861 int s;
1862 assert(writer);
1863 s = writer->impl.store_metric(&metric, writer->w_user_data);
1864 if (((s > 0) && (status >= 0)) || (s < 0))
1865 status = s;
1866 }
1867 sdb_llist_iter_destroy(iter);
1869 if (! status) {
1870 /* record the hostname as an attribute */
1871 d.type = SDB_TYPE_STRING;
1872 d.data.string = cname;
1873 if (sdb_plugin_store_metric_attribute(cname, name,
1874 "hostname", &d, metric.last_update))
1875 status = -1;
1876 }
1878 free(cname);
1879 return status;
1880 } /* sdb_plugin_store_metric */
1882 int
1883 sdb_plugin_store_attribute(const char *hostname, const char *key,
1884 const sdb_data_t *value, sdb_time_t last_update)
1885 {
1886 sdb_store_attribute_t attr = SDB_STORE_ATTRIBUTE_INIT;
1887 char *backends[1];
1888 char *cname;
1890 sdb_llist_iter_t *iter;
1891 int status = 0;
1893 if ((! hostname) || (! key) || (! value))
1894 return -1;
1896 if (! sdb_llist_len(writer_list)) {
1897 sdb_log(SDB_LOG_ERR, "core: Cannot store attribute: "
1898 "no writers registered");
1899 return -1;
1900 }
1902 cname = sdb_plugin_cname(strdup(hostname));
1903 if (! cname) {
1904 sdb_log(SDB_LOG_ERR, "core: strdup failed");
1905 return -1;
1906 }
1908 attr.parent_type = SDB_HOST;
1909 attr.parent = cname;
1910 attr.key = key;
1911 attr.value = *value;
1912 attr.last_update = last_update ? last_update : sdb_gettime();
1913 if (get_interval(SDB_ATTRIBUTE, cname, -1, NULL, key,
1914 attr.last_update, &attr.interval)) {
1915 free(cname);
1916 return 1;
1917 }
1918 attr.backends = (const char * const *)backends;
1919 get_backend(backends, &attr.backends_num);
1921 iter = sdb_llist_get_iter(writer_list);
1922 while (sdb_llist_iter_has_next(iter)) {
1923 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
1924 int s;
1925 assert(writer);
1926 s = writer->impl.store_attribute(&attr, writer->w_user_data);
1927 if (((s > 0) && (status >= 0)) || (s < 0))
1928 status = s;
1929 }
1930 sdb_llist_iter_destroy(iter);
1931 free(cname);
1932 return status;
1933 } /* sdb_plugin_store_attribute */
1935 int
1936 sdb_plugin_store_service_attribute(const char *hostname, const char *service,
1937 const char *key, const sdb_data_t *value, sdb_time_t last_update)
1938 {
1939 sdb_store_attribute_t attr = SDB_STORE_ATTRIBUTE_INIT;
1940 char *backends[1];
1941 char *cname;
1943 sdb_llist_iter_t *iter;
1944 int status = 0;
1946 if ((! hostname) || (! service) || (! key) || (! value))
1947 return -1;
1949 if (! sdb_llist_len(writer_list)) {
1950 sdb_log(SDB_LOG_ERR, "core: Cannot store service attribute: "
1951 "no writers registered");
1952 return -1;
1953 }
1955 cname = sdb_plugin_cname(strdup(hostname));
1956 if (! cname) {
1957 sdb_log(SDB_LOG_ERR, "core: strdup failed");
1958 return -1;
1959 }
1961 attr.hostname = cname;
1962 attr.parent_type = SDB_SERVICE;
1963 attr.parent = service;
1964 attr.key = key;
1965 attr.value = *value;
1966 attr.last_update = last_update ? last_update : sdb_gettime();
1967 if (get_interval(SDB_ATTRIBUTE, cname, SDB_SERVICE, service, key,
1968 attr.last_update, &attr.interval)) {
1969 free(cname);
1970 return 1;
1971 }
1972 attr.backends = (const char * const *)backends;
1973 get_backend(backends, &attr.backends_num);
1975 iter = sdb_llist_get_iter(writer_list);
1976 while (sdb_llist_iter_has_next(iter)) {
1977 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
1978 int s;
1979 assert(writer);
1980 s = writer->impl.store_attribute(&attr, writer->w_user_data);
1981 if (((s > 0) && (status >= 0)) || (s < 0))
1982 status = s;
1983 }
1984 sdb_llist_iter_destroy(iter);
1985 free(cname);
1986 return status;
1987 } /* sdb_plugin_store_service_attribute */
1989 int
1990 sdb_plugin_store_metric_attribute(const char *hostname, const char *metric,
1991 const char *key, const sdb_data_t *value, sdb_time_t last_update)
1992 {
1993 sdb_store_attribute_t attr = SDB_STORE_ATTRIBUTE_INIT;
1994 char *backends[1];
1995 char *cname;
1997 sdb_llist_iter_t *iter;
1998 int status = 0;
2000 if ((! hostname) || (! metric) || (! key) || (! value))
2001 return -1;
2003 if (! sdb_llist_len(writer_list)) {
2004 sdb_log(SDB_LOG_ERR, "core: Cannot store metric attribute: "
2005 "no writers registered");
2006 return -1;
2007 }
2009 cname = sdb_plugin_cname(strdup(hostname));
2010 if (! cname) {
2011 sdb_log(SDB_LOG_ERR, "core: strdup failed");
2012 return -1;
2013 }
2015 attr.hostname = cname;
2016 attr.parent_type = SDB_METRIC;
2017 attr.parent = metric;
2018 attr.key = key;
2019 attr.value = *value;
2020 attr.last_update = last_update ? last_update : sdb_gettime();
2021 if (get_interval(SDB_ATTRIBUTE, cname, SDB_METRIC, metric, key,
2022 attr.last_update, &attr.interval)) {
2023 free(cname);
2024 return 1;
2025 }
2026 attr.backends = (const char * const *)backends;
2027 get_backend(backends, &attr.backends_num);
2029 iter = sdb_llist_get_iter(writer_list);
2030 while (sdb_llist_iter_has_next(iter)) {
2031 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
2032 int s;
2033 assert(writer);
2034 s = writer->impl.store_attribute(&attr, writer->w_user_data);
2035 if (((s > 0) && (status >= 0)) || (s < 0))
2036 status = s;
2037 }
2038 sdb_llist_iter_destroy(iter);
2039 free(cname);
2040 return status;
2041 } /* sdb_plugin_store_metric_attribute */
2043 /* vim: set tw=78 sw=4 ts=4 noexpandtab : */