Code

parser: Let the TIMESERIES command accept optional data-source names.
[sysdb.git] / src / core / plugin.c
1 /*
2  * SysDB - src/core/plugin.c
3  * Copyright (C) 2012 Sebastian 'tokkee' Harl <sh@tokkee.org>
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
16  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
17  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR
19  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
28 #if HAVE_CONFIG_H
29 #       include "config.h"
30 #endif /* HAVE_CONFIG_H */
32 #include "sysdb.h"
33 #include "core/plugin.h"
34 #include "core/time.h"
35 #include "utils/error.h"
36 #include "utils/llist.h"
37 #include "utils/strbuf.h"
39 #include <assert.h>
41 #include <errno.h>
43 #include <stdarg.h>
44 #include <stdio.h>
45 #include <string.h>
46 #include <strings.h>
47 #include <unistd.h>
49 #include <ltdl.h>
51 #include <pthread.h>
/*
 * INFO_GET:
 * Evaluates to the given attribute of the info object 'i' or, if that
 * attribute is NULL, to the string literal "<attr> not set".
 */
#define INFO_GET(i, attr) \
	(((i)->attr) ? ((i)->attr) : (#attr" not set"))
57 /*
58  * private data types
59  */
61 typedef struct {
62         sdb_object_t super;
63         sdb_plugin_ctx_t public;
65         sdb_plugin_info_t info;
66         lt_dlhandle handle;
68         /* The usage count differs from the object's ref count
69          * in that it provides higher level information about how
70          * the plugin is in use. */
71         size_t use_cnt;
72 } ctx_t;
73 #define CTX_INIT { SDB_OBJECT_INIT, \
74         SDB_PLUGIN_CTX_INIT, SDB_PLUGIN_INFO_INIT, NULL, 0 }
76 #define CTX(obj) ((ctx_t *)(obj))
78 typedef struct {
79         sdb_object_t super;
80         void *cb_callback;
81         sdb_object_t *cb_user_data;
82         ctx_t *cb_ctx;
83 } callback_t;
84 #define CB_INIT { SDB_OBJECT_INIT, \
85         /* callback = */ NULL, /* user_data = */ NULL, \
86         SDB_PLUGIN_CTX_INIT }
87 #define CB(obj) ((callback_t *)(obj))
88 #define CONST_CB(obj) ((const callback_t *)(obj))
90 typedef struct {
91         callback_t super;
92 #define ccb_callback super.cb_callback
93 #define ccb_user_data super.cb_user_data
94 #define ccb_ctx super.cb_ctx
95         sdb_time_t ccb_interval;
96         sdb_time_t ccb_next_update;
97 } collector_t;
98 #define CCB(obj) ((collector_t *)(obj))
99 #define CONST_CCB(obj) ((const collector_t *)(obj))
101 typedef struct {
102         callback_t super; /* cb_callback will always be NULL */
103 #define w_user_data super.cb_user_data
104 #define w_ctx super.cb_ctx
105         sdb_store_writer_t impl;
106 } writer_t;
107 #define WRITER(obj) ((writer_t *)(obj))
109 typedef struct {
110         callback_t super; /* cb_callback will always be NULL */
111 #define r_user_data super.cb_user_data
112 #define r_ctx super.cb_ctx
113         sdb_store_reader_t impl;
114 } reader_t;
115 #define READER(obj) ((reader_t *)(obj))
117 typedef struct {
118         callback_t super; /* cb_callback will always be NULL */
119 #define ts_user_data super.cb_user_data
120 #define ts_ctx super.cb_ctx
121         sdb_timeseries_fetcher_t impl;
122 } ts_fetcher_t;
123 #define TS_FETCHER(obj) ((ts_fetcher_t *)(obj))
125 /*
126  * private variables
127  */
129 static sdb_plugin_ctx_t  plugin_default_ctx  = SDB_PLUGIN_CTX_INIT;
130 static sdb_plugin_info_t plugin_default_info = SDB_PLUGIN_INFO_INIT;
132 static pthread_key_t     plugin_ctx_key;
133 static bool              plugin_ctx_key_initialized = 0;
135 /* a list of the plugin contexts of all registered plugins */
136 static sdb_llist_t      *all_plugins = NULL;
138 static sdb_llist_t      *config_list = NULL;
139 static sdb_llist_t      *init_list = NULL;
140 static sdb_llist_t      *collector_list = NULL;
141 static sdb_llist_t      *cname_list = NULL;
142 static sdb_llist_t      *shutdown_list = NULL;
143 static sdb_llist_t      *log_list = NULL;
144 static sdb_llist_t      *timeseries_fetcher_list = NULL;
145 static sdb_llist_t      *writer_list = NULL;
146 static sdb_llist_t      *reader_list = NULL;
148 static struct {
149         const char   *type;
150         sdb_llist_t **list;
151 } all_lists[] = {
152         { "config",             &config_list },
153         { "init",               &init_list },
154         { "collector",          &collector_list },
155         { "cname",              &cname_list },
156         { "shutdown",           &shutdown_list },
157         { "log",                &log_list },
158         { "timeseries fetcher", &timeseries_fetcher_list },
159         { "store writer",       &writer_list },
160         { "store reader",       &reader_list },
161 };
163 /*
164  * private helper functions
165  */
167 static void
168 plugin_info_clear(sdb_plugin_info_t *info)
170         sdb_plugin_info_t empty_info = SDB_PLUGIN_INFO_INIT;
171         if (! info)
172                 return;
174         if (info->plugin_name)
175                 free(info->plugin_name);
176         if (info->filename)
177                 free(info->filename);
179         if (info->description)
180                 free(info->description);
181         if (info->copyright)
182                 free(info->copyright);
183         if (info->license)
184                 free(info->license);
186         *info = empty_info;
187 } /* plugin_info_clear */
189 static void
190 ctx_key_init(void)
192         if (plugin_ctx_key_initialized)
193                 return;
195         pthread_key_create(&plugin_ctx_key, /* destructor */ NULL);
196         plugin_ctx_key_initialized = 1;
197 } /* ctx_key_init */
199 static int
200 plugin_cmp_next_update(const sdb_object_t *a, const sdb_object_t *b)
202         const collector_t *ccb1 = (const collector_t *)a;
203         const collector_t *ccb2 = (const collector_t *)b;
205         assert(ccb1 && ccb2);
207         return (ccb1->ccb_next_update > ccb2->ccb_next_update)
208                 ? 1 : (ccb1->ccb_next_update < ccb2->ccb_next_update)
209                 ? -1 : 0;
210 } /* plugin_cmp_next_update */
212 static int
213 plugin_lookup_by_name(const sdb_object_t *obj, const void *id)
215         const callback_t *cb = CONST_CB(obj);
216         const char *name = id;
218         assert(cb && id);
220         /* when a plugin was registered from outside a plugin (e.g. the core),
221          * we don't have a plugin context */
222         if (! cb->cb_ctx)
223                 return 1;
225         if (!strcasecmp(cb->cb_ctx->info.plugin_name, name))
226                 return 0;
227         return 1;
228 } /* plugin_lookup_by_name */
230 /* since this function is called from sdb_plugin_reconfigure_finish()
231  * when iterating through all_plugins, we may not do any additional
232  * modifications to all_plugins except for the optional removal */
233 static void
234 plugin_unregister_by_name(const char *plugin_name)
236         sdb_object_t *obj;
237         size_t i;
239         for (i = 0; i < SDB_STATIC_ARRAY_LEN(all_lists); ++i) {
240                 const char  *type =  all_lists[i].type;
241                 sdb_llist_t *list = *all_lists[i].list;
243                 while (1) {
244                         callback_t *cb;
246                         cb = CB(sdb_llist_remove(list,
247                                                 plugin_lookup_by_name, plugin_name));
248                         if (! cb)
249                                 break;
251                         assert(cb->cb_ctx);
253                         sdb_log(SDB_LOG_INFO, "Unregistering %s callback '%s' (module %s)",
254                                         type, cb->super.name, cb->cb_ctx->info.plugin_name);
255                         sdb_object_deref(SDB_OBJ(cb));
256                 }
257         }
259         obj = sdb_llist_search_by_name(all_plugins, plugin_name);
260         /* when called from sdb_plugin_reconfigure_finish, the object has already
261          * been removed from the list */
262         if (obj && (obj->ref_cnt <= 1)) {
263                 sdb_llist_remove_by_name(all_plugins, plugin_name);
264                 sdb_object_deref(obj);
265         }
266         /* else: other callbacks still reference it */
267 } /* plugin_unregister_by_name */
269 /*
270  * store writer wrapper for performing database queries:
271  * It wraps another store writer, adding extra logic as needed.
272  */
274 typedef struct {
275         sdb_object_t super;
276         sdb_store_writer_t *w;
277         sdb_object_t *ud;
278         sdb_query_opts_t opts;
279 } query_writer_t;
280 #define QUERY_WRITER_INIT(w, ud) { \
281         SDB_OBJECT_INIT, \
282         (w), (ud), \
283         SDB_DEFAULT_QUERY_OPTS \
285 #define QUERY_WRITER(obj) ((query_writer_t *)(obj))
287 static int
288 query_store_host(sdb_store_host_t *host, sdb_object_t *user_data)
290         query_writer_t *qw = QUERY_WRITER(user_data);
291         return qw->w->store_host(host, qw->ud);
292 } /* query_store_host */
294 static int
295 query_store_service(sdb_store_service_t *service, sdb_object_t *user_data)
297         query_writer_t *qw = QUERY_WRITER(user_data);
298         return qw->w->store_service(service, qw->ud);
299 } /* query_store_service */
301 static int
302 query_store_metric(sdb_store_metric_t *metric, sdb_object_t *user_data)
304         query_writer_t *qw = QUERY_WRITER(user_data);
305         sdb_timeseries_info_t *infos[metric->stores_num];
306         sdb_metric_store_t stores[metric->stores_num];
308         const sdb_metric_store_t *orig_stores = metric->stores;
309         int status;
310         size_t i;
312         if (! qw->opts.describe_timeseries)
313                 /* nothing further to do */
314                 return qw->w->store_metric(metric, qw->ud);
316         for (i = 0; i < metric->stores_num; i++) {
317                 sdb_metric_store_t *s = stores + i;
318                 *s = metric->stores[i];
319                 infos[i] = sdb_plugin_describe_timeseries(s->type, s->id);
320                 s->info = infos[i];
321         }
323         metric->stores = stores;
324         status = qw->w->store_metric(metric, qw->ud);
325         metric->stores = orig_stores;
327         for (i = 0; i < metric->stores_num; i++)
328                 sdb_timeseries_info_destroy(infos[i]);
329         return status;
330 } /* query_store_metric */
332 static int
333 query_store_attribute(sdb_store_attribute_t *attr, sdb_object_t *user_data)
335         query_writer_t *qw = QUERY_WRITER(user_data);
336         return qw->w->store_attribute(attr, qw->ud);
337 } /* query_store_attribute */
339 static sdb_store_writer_t query_writer = {
340         query_store_host, query_store_service,
341         query_store_metric, query_store_attribute,
342 };
344 /*
345  * private types
346  */
348 static int
349 ctx_init(sdb_object_t *obj, va_list __attribute__((unused)) ap)
351         ctx_t *ctx = CTX(obj);
353         assert(ctx);
355         ctx->public = plugin_default_ctx;
356         ctx->info = plugin_default_info;
357         ctx->handle = NULL;
358         ctx->use_cnt = 1;
359         return 0;
360 } /* ctx_init */
362 static void
363 ctx_destroy(sdb_object_t *obj)
365         ctx_t *ctx = CTX(obj);
367         if (ctx->handle) {
368                 const char *err;
370                 sdb_log(SDB_LOG_INFO, "Unloading module %s", ctx->info.plugin_name);
372                 lt_dlerror();
373                 lt_dlclose(ctx->handle);
374                 if ((err = lt_dlerror()))
375                         sdb_log(SDB_LOG_WARNING, "Failed to unload module %s: %s",
376                                         ctx->info.plugin_name, err);
377         }
379         plugin_info_clear(&ctx->info);
380 } /* ctx_destroy */
382 static sdb_type_t ctx_type = {
383         sizeof(ctx_t),
385         ctx_init,
386         ctx_destroy
387 };
389 static ctx_t *
390 ctx_get(void)
392         if (! plugin_ctx_key_initialized)
393                 ctx_key_init();
394         return pthread_getspecific(plugin_ctx_key);
395 } /* ctx_get */
397 static ctx_t *
398 ctx_set(ctx_t *new)
400         ctx_t *old;
402         if (! plugin_ctx_key_initialized)
403                 ctx_key_init();
405         old = pthread_getspecific(plugin_ctx_key);
406         if (old)
407                 sdb_object_deref(SDB_OBJ(old));
408         if (new)
409                 sdb_object_ref(SDB_OBJ(new));
410         pthread_setspecific(plugin_ctx_key, new);
411         return old;
412 } /* ctx_set */
414 static ctx_t *
415 ctx_create(const char *name)
417         ctx_t *ctx;
419         ctx = CTX(sdb_object_create(name, ctx_type));
420         if (! ctx)
421                 return NULL;
423         if (! plugin_ctx_key_initialized)
424                 ctx_key_init();
425         ctx_set(ctx);
426         return ctx;
427 } /* ctx_create */
429 /*
430  * plugin_init_ok:
431  * Checks whether the registration of a new plugin identified by 'obj' is
432  * okay. It consumes the first two arguments of 'ap'.
433  */
434 static bool
435 plugin_init_ok(sdb_object_t *obj, va_list ap)
437         sdb_llist_t **list = va_arg(ap, sdb_llist_t **);
438         const char *type = va_arg(ap, const char *);
440         assert(list); assert(type);
442         if (sdb_llist_search_by_name(*list, obj->name)) {
443                 sdb_log(SDB_LOG_WARNING, "%s callback '%s' has already been "
444                                 "registered. Ignoring newly registered version.",
445                                 type, obj->name);
446                 return 0;
447         }
448         return 1;
449 } /* plugin_init_ok */
451 static int
452 plugin_cb_init(sdb_object_t *obj, va_list ap)
454         void *callback;
455         sdb_object_t *ud;
457         if (! plugin_init_ok(obj, ap))
458                 return -1;
460         callback = va_arg(ap, void *);
461         ud = va_arg(ap, sdb_object_t *);
463         /* cb_ctx may be NULL if the plugin was not registered by a plugin */
465         CB(obj)->cb_callback = callback;
466         CB(obj)->cb_ctx      = ctx_get();
467         sdb_object_ref(SDB_OBJ(CB(obj)->cb_ctx));
469         sdb_object_ref(ud);
470         CB(obj)->cb_user_data = ud;
471         return 0;
472 } /* plugin_cb_init */
474 static void
475 plugin_cb_destroy(sdb_object_t *obj)
477         assert(obj);
478         sdb_object_deref(CB(obj)->cb_user_data);
479         sdb_object_deref(SDB_OBJ(CB(obj)->cb_ctx));
480 } /* plugin_cb_destroy */
482 static sdb_type_t callback_type = {
483         sizeof(callback_t),
485         plugin_cb_init,
486         plugin_cb_destroy
487 };
489 static sdb_type_t collector_type = {
490         sizeof(collector_t),
492         plugin_cb_init,
493         plugin_cb_destroy
494 };
496 static int
497 plugin_writer_init(sdb_object_t *obj, va_list ap)
499         sdb_store_writer_t *impl;
500         sdb_object_t *ud;
502         if (! plugin_init_ok(obj, ap))
503                 return -1;
505         impl = va_arg(ap, sdb_store_writer_t *);
506         ud = va_arg(ap, sdb_object_t *);
507         assert(impl);
509         if ((! impl->store_host) || (! impl->store_service)
510                         || (! impl->store_metric) || (! impl->store_attribute)) {
511                 sdb_log(SDB_LOG_ERR, "store writer callback '%s' does not fully "
512                                 "implement the writer interface.", obj->name);
513                 return -1;
514         }
516         /* ctx may be NULL if the callback was not registered by a plugin */
518         WRITER(obj)->impl = *impl;
519         WRITER(obj)->w_ctx  = ctx_get();
520         sdb_object_ref(SDB_OBJ(WRITER(obj)->w_ctx));
522         sdb_object_ref(ud);
523         WRITER(obj)->w_user_data = ud;
524         return 0;
525 } /* plugin_writer_init */
527 static void
528 plugin_writer_destroy(sdb_object_t *obj)
530         assert(obj);
531         sdb_object_deref(WRITER(obj)->w_user_data);
532         sdb_object_deref(SDB_OBJ(WRITER(obj)->w_ctx));
533 } /* plugin_writer_destroy */
535 static sdb_type_t writer_type = {
536         sizeof(writer_t),
538         plugin_writer_init,
539         plugin_writer_destroy
540 };
542 static int
543 plugin_reader_init(sdb_object_t *obj, va_list ap)
545         sdb_store_reader_t *impl;
546         sdb_object_t *ud;
548         if (! plugin_init_ok(obj, ap))
549                 return -1;
551         impl = va_arg(ap, sdb_store_reader_t *);
552         ud = va_arg(ap, sdb_object_t *);
553         assert(impl);
555         if ((! impl->prepare_query) || (! impl->execute_query)) {
556                 sdb_log(SDB_LOG_ERR, "store reader callback '%s' does not fully "
557                                 "implement the reader interface.", obj->name);
558                 return -1;
559         }
561         /* ctx may be NULL if the callback was not registered by a plugin */
563         READER(obj)->impl = *impl;
564         READER(obj)->r_ctx  = ctx_get();
565         sdb_object_ref(SDB_OBJ(READER(obj)->r_ctx));
567         sdb_object_ref(ud);
568         READER(obj)->r_user_data = ud;
569         return 0;
570 } /* plugin_reader_init */
572 static void
573 plugin_reader_destroy(sdb_object_t *obj)
575         assert(obj);
576         sdb_object_deref(READER(obj)->r_user_data);
577         sdb_object_deref(SDB_OBJ(READER(obj)->r_ctx));
578 } /* plugin_reader_destroy */
580 static sdb_type_t reader_type = {
581         sizeof(reader_t),
583         plugin_reader_init,
584         plugin_reader_destroy
585 };
587 static int
588 plugin_ts_fetcher_init(sdb_object_t *obj, va_list ap)
590         sdb_timeseries_fetcher_t *impl;
591         sdb_object_t *ud;
593         if (! plugin_init_ok(obj, ap))
594                 return -1;
596         impl = va_arg(ap, sdb_timeseries_fetcher_t *);
597         ud = va_arg(ap, sdb_object_t *);
598         assert(impl);
600         if ((! impl->describe) || (! impl->fetch)) {
601                 sdb_log(SDB_LOG_ERR, "timeseries fetcher callback '%s' does not fully "
602                                 "implement the interface.", obj->name);
603                 return -1;
604         }
606         /* ctx may be NULL if the callback was not registered by a plugin */
608         TS_FETCHER(obj)->impl = *impl;
609         TS_FETCHER(obj)->ts_ctx  = ctx_get();
610         sdb_object_ref(SDB_OBJ(TS_FETCHER(obj)->ts_ctx));
612         sdb_object_ref(ud);
613         TS_FETCHER(obj)->ts_user_data = ud;
614         return 0;
615 } /* plugin_ts_fetcher_init */
617 static void
618 plugin_ts_fetcher_destroy(sdb_object_t *obj)
620         assert(obj);
621         sdb_object_deref(TS_FETCHER(obj)->ts_user_data);
622         sdb_object_deref(SDB_OBJ(TS_FETCHER(obj)->ts_ctx));
623 } /* plugin_ts_fetcher_destroy */
625 static sdb_type_t ts_fetcher_type = {
626         sizeof(ts_fetcher_t),
628         plugin_ts_fetcher_init,
629         plugin_ts_fetcher_destroy
630 };
632 static int
633 module_init(const char *name, lt_dlhandle lh, sdb_plugin_info_t *info)
635         int (*mod_init)(sdb_plugin_info_t *);
636         int status;
638         mod_init = (int (*)(sdb_plugin_info_t *))lt_dlsym(lh, "sdb_module_init");
639         if (! mod_init) {
640                 sdb_log(SDB_LOG_ERR, "Failed to load plugin '%s': "
641                                 "could not find symbol 'sdb_module_init'", name);
642                 return -1;
643         }
645         status = mod_init(info);
646         if (status) {
647                 sdb_log(SDB_LOG_ERR, "Failed to initialize module '%s'", name);
648                 plugin_unregister_by_name(name);
649                 return -1;
650         }
651         return 0;
652 } /* module_init */
654 static int
655 module_load(const char *basedir, const char *name,
656                 const sdb_plugin_ctx_t *plugin_ctx)
658         char  base_name[name ? strlen(name) + 1 : 1];
659         const char *name_ptr;
660         char *tmp;
662         char filename[1024];
663         lt_dlhandle lh;
665         ctx_t *ctx;
667         int status;
669         assert(name);
671         base_name[0] = '\0';
672         name_ptr = name;
674         while ((tmp = strstr(name_ptr, "::"))) {
675                 strncat(base_name, name_ptr, (size_t)(tmp - name_ptr));
676                 strcat(base_name, "/");
677                 name_ptr = tmp + strlen("::");
678         }
679         strcat(base_name, name_ptr);
681         if (! basedir)
682                 basedir = PKGLIBDIR;
684         snprintf(filename, sizeof(filename), "%s/%s.so", basedir, base_name);
685         filename[sizeof(filename) - 1] = '\0';
687         if (access(filename, R_OK)) {
688                 char errbuf[1024];
689                 sdb_log(SDB_LOG_ERR, "Failed to load plugin '%s' (%s): %s",
690                                 name, filename, sdb_strerror(errno, errbuf, sizeof(errbuf)));
691                 return -1;
692         }
694         lt_dlinit();
695         lt_dlerror();
697         lh = lt_dlopen(filename);
698         if (! lh) {
699                 sdb_log(SDB_LOG_ERR, "Failed to load plugin '%s': %s"
700                                 "The most common cause for this problem are missing "
701                                 "dependencies.\n", name, lt_dlerror());
702                 return -1;
703         }
705         if (ctx_get())
706                 sdb_log(SDB_LOG_WARNING, "Discarding old plugin context");
708         ctx = ctx_create(name);
709         if (! ctx) {
710                 sdb_log(SDB_LOG_ERR, "Failed to initialize plugin context");
711                 return -1;
712         }
714         ctx->info.plugin_name = strdup(name);
715         ctx->info.filename = strdup(filename);
716         ctx->handle = lh;
718         if (plugin_ctx)
719                 ctx->public = *plugin_ctx;
721         if ((status = module_init(name, lh, &ctx->info))) {
722                 sdb_object_deref(SDB_OBJ(ctx));
723                 ctx_set(NULL);
724                 return status;
725         }
727         /* compare minor version */
728         if ((ctx->info.version < 0)
729                         || ((int)(ctx->info.version / 100) != (int)(SDB_VERSION / 100)))
730                 sdb_log(SDB_LOG_WARNING, "WARNING: plugin version (%i.%i.%i) "
731                                 "does not match core version (%i.%i.%i); "
732                                 "this might cause problems",
733                                 SDB_VERSION_DECODE(ctx->info.version),
734                                 SDB_VERSION_DECODE(SDB_VERSION));
736         sdb_llist_append(all_plugins, SDB_OBJ(ctx));
738         /* log messages will be prefixed by the plugin name */
739         sdb_log(SDB_LOG_INFO, "Successfully loaded "
740                         "version %i (%s)", ctx->info.plugin_version,
741                         INFO_GET(&ctx->info, description));
742         sdb_log(SDB_LOG_INFO, "%s, License: %s",
743                         INFO_GET(&ctx->info, copyright),
744                         INFO_GET(&ctx->info, license));
746         /* any registered callbacks took ownership of the context */
747         sdb_object_deref(SDB_OBJ(ctx));
749         /* reset */
750         ctx_set(NULL);
751         return 0;
752 } /* module_load */
754 static char *
755 plugin_get_name(const char *name, char *buf, size_t bufsize)
757         ctx_t *ctx = ctx_get();
759         if (ctx)
760                 snprintf(buf, bufsize, "%s::%s", ctx->info.plugin_name, name);
761         else
762                 snprintf(buf, bufsize, "core::%s", name);
763         return buf;
764 } /* plugin_get_name */
766 static int
767 plugin_add_impl(sdb_llist_t **list, sdb_type_t T, const char *type,
768                 const char *name, void *impl, sdb_object_t *user_data)
770         sdb_object_t *obj;
772         if ((! name) || (! impl))
773                 return -1;
775         assert(list);
777         if (! *list)
778                 *list = sdb_llist_create();
779         if (! *list)
780                 return -1;
782         obj = sdb_object_create(name, T, list, type, impl, user_data);
783         if (! obj)
784                 return -1;
786         if (sdb_llist_append(*list, obj)) {
787                 sdb_object_deref(obj);
788                 return -1;
789         }
791         /* pass control to the list */
792         sdb_object_deref(obj);
794         sdb_log(SDB_LOG_INFO, "Registered %s callback '%s'.", type, name);
795         return 0;
796 } /* plugin_add_impl */
798 /*
799  * object meta-data
800  */
802 typedef struct {
803         int obj_type;
804         sdb_time_t last_update;
805         sdb_time_t interval;
806 } interval_fetcher_t;
808 static int
809 interval_fetcher_host(sdb_store_host_t *host, sdb_object_t *user_data)
811         interval_fetcher_t *lu = SDB_OBJ_WRAPPER(user_data)->data;
812         lu->obj_type = SDB_HOST;
813         lu->last_update = host->last_update;
814         return 0;
815 } /* interval_fetcher_host */
817 static int
818 interval_fetcher_service(sdb_store_service_t *svc, sdb_object_t *user_data)
820         interval_fetcher_t *lu = SDB_OBJ_WRAPPER(user_data)->data;
821         lu->obj_type = SDB_SERVICE;
822         lu->last_update = svc->last_update;
823         return 0;
824 } /* interval_fetcher_service */
826 static int
827 interval_fetcher_metric(sdb_store_metric_t *metric, sdb_object_t *user_data)
829         interval_fetcher_t *lu = SDB_OBJ_WRAPPER(user_data)->data;
830         lu->obj_type = SDB_METRIC;
831         lu->last_update = metric->last_update;
832         return 0;
833 } /* interval_fetcher_metric */
835 static int
836 interval_fetcher_attr(sdb_store_attribute_t *attr, sdb_object_t *user_data)
838         interval_fetcher_t *lu = SDB_OBJ_WRAPPER(user_data)->data;
839         lu->obj_type = SDB_ATTRIBUTE;
840         lu->last_update = attr->last_update;
841         return 0;
842 } /* interval_fetcher_attr */
844 static sdb_store_writer_t interval_fetcher = {
845         interval_fetcher_host, interval_fetcher_service,
846         interval_fetcher_metric, interval_fetcher_attr,
847 };
849 static int
850 get_interval(int obj_type, const char *hostname,
851                 int parent_type, const char *parent, const char *name,
852                 sdb_time_t last_update, sdb_time_t *interval_out)
854         sdb_ast_fetch_t fetch = SDB_AST_FETCH_INIT;
855         char hn[hostname ? strlen(hostname) + 1 : 1];
856         char pn[parent ? strlen(parent) + 1 : 1];
857         char n[strlen(name) + 1];
858         int status;
860         interval_fetcher_t lu = { 0, 0, 0 };
861         sdb_object_wrapper_t obj = SDB_OBJECT_WRAPPER_STATIC(&lu);
862         sdb_time_t interval;
864         assert(name);
866         if (hostname)
867                 strncpy(hn, hostname, sizeof(hn));
868         if (parent)
869                 strncpy(pn, parent, sizeof(pn));
870         strncpy(n, name, sizeof(n));
872         fetch.obj_type = obj_type;
873         fetch.hostname = hostname ? hn : NULL;
874         fetch.parent_type = parent_type;
875         fetch.parent = parent ? pn : NULL;
876         fetch.name = n;
878         status = sdb_plugin_query(SDB_AST_NODE(&fetch),
879                         &interval_fetcher, SDB_OBJ(&obj), NULL, NULL);
880         if ((status < 0) || (lu.obj_type != obj_type) || (lu.last_update == 0)) {
881                 *interval_out = 0;
882                 return 0;
883         }
885         if (lu.last_update >= last_update) {
886                 if (lu.last_update > last_update)
887                         sdb_log(SDB_LOG_DEBUG, "Cannot update %s '%s' - "
888                                         "value too old (%"PRIsdbTIME" < %"PRIsdbTIME")",
889                                         SDB_STORE_TYPE_TO_NAME(obj_type), name,
890                                         lu.last_update, last_update);
891                 *interval_out = lu.interval;
892                 return 1;
893         }
895         interval = last_update - lu.last_update;
896         if (lu.interval && interval)
897                 interval = (sdb_time_t)((0.9 * (double)lu.interval)
898                                 + (0.1 * (double)interval));
899         *interval_out = interval;
900         return 0;
901 } /* get_interval */
903 static void
904 get_backend(char **backends, size_t *backends_num)
906         const sdb_plugin_info_t *info;
908         info = sdb_plugin_current();
909         if ((! info) || (! info->plugin_name) || (! *info->plugin_name)) {
910                 *backends_num = 0;
911                 return;
912         }
914         backends[0] = info->plugin_name;
915         *backends_num = 1;
916 } /* get_backend */
918 /*
919  * public API
920  */
922 int
923 sdb_plugin_load(const char *basedir, const char *name,
924                 const sdb_plugin_ctx_t *plugin_ctx)
926         ctx_t *ctx;
928         int status;
930         if ((! name) || (! *name))
931                 return -1;
933         if (! all_plugins) {
934                 if (! (all_plugins = sdb_llist_create())) {
935                         sdb_log(SDB_LOG_ERR, "Failed to load plugin '%s': "
936                                         "internal error while creating linked list", name);
937                         return -1;
938                 }
939         }
941         ctx = CTX(sdb_llist_search_by_name(all_plugins, name));
942         if (ctx) {
943                 /* plugin already loaded */
944                 if (! ctx->use_cnt) {
945                         /* reloading plugin */
946                         ctx_t *old_ctx = ctx_set(ctx);
948                         status = module_init(ctx->info.plugin_name, ctx->handle, NULL);
949                         ctx_set(old_ctx);
950                         if (status)
951                                 return status;
953                         ctx_set(old_ctx);
954                         sdb_log(SDB_LOG_INFO, "Successfully reloaded plugin "
955                                         "'%s' (%s)", ctx->info.plugin_name,
956                                         INFO_GET(&ctx->info, description));
957                 }
958                 ++ctx->use_cnt;
959                 return 0;
960         }
962         return module_load(basedir, name, plugin_ctx);
963 } /* sdb_plugin_load */
965 int
966 sdb_plugin_set_info(sdb_plugin_info_t *info, int type, ...)
968         va_list ap;
970         if (! info)
971                 return -1;
973         va_start(ap, type);
975         switch (type) {
976                 case SDB_PLUGIN_INFO_DESC:
977                         {
978                                 char *desc = va_arg(ap, char *);
979                                 if (desc) {
980                                         if (info->description)
981                                                 free(info->description);
982                                         info->description = strdup(desc);
983                                 }
984                         }
985                         break;
986                 case SDB_PLUGIN_INFO_COPYRIGHT:
987                         {
988                                 char *copyright = va_arg(ap, char *);
989                                 if (copyright)
990                                         info->copyright = strdup(copyright);
991                         }
992                         break;
993                 case SDB_PLUGIN_INFO_LICENSE:
994                         {
995                                 char *license = va_arg(ap, char *);
996                                 if (license) {
997                                         if (info->license)
998                                                 free(info->license);
999                                         info->license = strdup(license);
1000                                 }
1001                         }
1002                         break;
1003                 case SDB_PLUGIN_INFO_VERSION:
1004                         {
1005                                 int version = va_arg(ap, int);
1006                                 info->version = version;
1007                         }
1008                         break;
1009                 case SDB_PLUGIN_INFO_PLUGIN_VERSION:
1010                         {
1011                                 int version = va_arg(ap, int);
1012                                 info->plugin_version = version;
1013                         }
1014                         break;
1015                 default:
1016                         va_end(ap);
1017                         return -1;
1018         }
1020         va_end(ap);
1021         return 0;
1022 } /* sdb_plugin_set_info */
1024 int
1025 sdb_plugin_register_config(sdb_plugin_config_cb callback)
1027         ctx_t *ctx = ctx_get();
1029         if (! ctx) {
1030                 sdb_log(SDB_LOG_ERR, "Invalid attempt to register a "
1031                                 "config callback from outside a plugin");
1032                 return -1;
1033         }
1034         return plugin_add_impl(&config_list, callback_type, "config",
1035                         ctx->info.plugin_name, (void *)callback, NULL);
1036 } /* sdb_plugin_register_config */
1038 int
1039 sdb_plugin_register_init(const char *name, sdb_plugin_init_cb callback,
1040                 sdb_object_t *user_data)
1042         char cb_name[1024];
1043         return plugin_add_impl(&init_list, callback_type, "init",
1044                         plugin_get_name(name, cb_name, sizeof(cb_name)),
1045                         (void *)callback, user_data);
1046 } /* sdb_plugin_register_init */
1048 int
1049 sdb_plugin_register_shutdown(const char *name, sdb_plugin_shutdown_cb callback,
1050                 sdb_object_t *user_data)
1052         char cb_name[1024];
1053         return plugin_add_impl(&shutdown_list, callback_type, "shutdown",
1054                         plugin_get_name(name, cb_name, sizeof(cb_name)),
1055                         (void *)callback, user_data);
1056 } /* sdb_plugin_register_shutdown */
1058 int
1059 sdb_plugin_register_log(const char *name, sdb_plugin_log_cb callback,
1060                 sdb_object_t *user_data)
1062         char cb_name[1024];
1063         return plugin_add_impl(&log_list, callback_type, "log",
1064                         plugin_get_name(name, cb_name, sizeof(cb_name)),
1065                         callback, user_data);
1066 } /* sdb_plugin_register_log */
1068 int
1069 sdb_plugin_register_cname(const char *name, sdb_plugin_cname_cb callback,
1070                 sdb_object_t *user_data)
1072         char cb_name[1024];
1073         return plugin_add_impl(&cname_list, callback_type, "cname",
1074                         plugin_get_name(name, cb_name, sizeof(cb_name)),
1075                         callback, user_data);
1076 } /* sdb_plugin_register_cname */
1078 int
1079 sdb_plugin_register_collector(const char *name, sdb_plugin_collector_cb callback,
1080                 const sdb_time_t *interval, sdb_object_t *user_data)
1082         char cb_name[1024];
1083         sdb_object_t *obj;
1085         if ((! name) || (! callback))
1086                 return -1;
1088         if (! collector_list)
1089                 collector_list = sdb_llist_create();
1090         if (! collector_list)
1091                 return -1;
1093         plugin_get_name(name, cb_name, sizeof(cb_name));
1095         obj = sdb_object_create(cb_name, collector_type,
1096                         &collector_list, "collector", callback, user_data);
1097         if (! obj)
1098                 return -1;
1100         if (interval)
1101                 CCB(obj)->ccb_interval = *interval;
1102         else {
1103                 ctx_t *ctx = ctx_get();
1105                 if (! ctx) {
1106                         sdb_log(SDB_LOG_ERR, "Cannot determine interval for collector %s; "
1107                                         "none specified and no plugin context found", cb_name);
1108                         return -1;
1109                 }
1111                 CCB(obj)->ccb_interval = ctx->public.interval;
1112         }
1114         if (! (CCB(obj)->ccb_next_update = sdb_gettime())) {
1115                 char errbuf[1024];
1116                 sdb_log(SDB_LOG_ERR, "Failed to determine current time: %s",
1117                                 sdb_strerror(errno, errbuf, sizeof(errbuf)));
1118                 sdb_object_deref(obj);
1119                 return -1;
1120         }
1122         if (sdb_llist_insert_sorted(collector_list, obj,
1123                                 plugin_cmp_next_update)) {
1124                 sdb_object_deref(obj);
1125                 return -1;
1126         }
1128         /* pass control to the list */
1129         sdb_object_deref(obj);
1131         sdb_log(SDB_LOG_INFO, "Registered collector callback '%s' "
1132                         "(interval = %.3fs).", cb_name,
1133                         SDB_TIME_TO_DOUBLE(CCB(obj)->ccb_interval));
1134         return 0;
1135 } /* sdb_plugin_register_collector */
1137 int
1138 sdb_plugin_register_timeseries_fetcher(const char *name,
1139                 sdb_timeseries_fetcher_t *fetcher, sdb_object_t *user_data)
1141         return plugin_add_impl(&timeseries_fetcher_list, ts_fetcher_type, "time-series fetcher",
1142                         name, fetcher, user_data);
1143 } /* sdb_plugin_register_timeseries_fetcher */
1145 int
1146 sdb_plugin_register_writer(const char *name,
1147                 sdb_store_writer_t *writer, sdb_object_t *user_data)
1149         char cb_name[1024];
1150         return plugin_add_impl(&writer_list, writer_type, "store writer",
1151                         plugin_get_name(name, cb_name, sizeof(cb_name)),
1152                         writer, user_data);
1153 } /* sdb_store_register_writer */
1155 int
1156 sdb_plugin_register_reader(const char *name,
1157                 sdb_store_reader_t *reader, sdb_object_t *user_data)
1159         char cb_name[1024];
1160         return plugin_add_impl(&reader_list, reader_type, "store reader",
1161                         plugin_get_name(name, cb_name, sizeof(cb_name)),
1162                         reader, user_data);
1163 } /* sdb_plugin_register_reader */
1165 void
1166 sdb_plugin_unregister_all(void)
1168         size_t i;
1170         for (i = 0; i < SDB_STATIC_ARRAY_LEN(all_lists); ++i) {
1171                 const char  *type =  all_lists[i].type;
1172                 sdb_llist_t *list = *all_lists[i].list;
1174                 size_t len = sdb_llist_len(list);
1176                 if (! len)
1177                         continue;
1179                 sdb_llist_clear(list);
1180                 sdb_log(SDB_LOG_INFO, "Unregistered %zu %s callback%s",
1181                                 len, type, len == 1 ? "" : "s");
1182         }
1183 } /* sdb_plugin_unregister_all */
1185 sdb_plugin_ctx_t
1186 sdb_plugin_get_ctx(void)
1188         ctx_t *c;
1190         c = ctx_get();
1191         if (! c) {
1192                 sdb_log(SDB_LOG_ERR, "Invalid read access to plugin "
1193                                 "context outside a plugin");
1194                 return plugin_default_ctx;
1195         }
1196         return c->public;
1197 } /* sdb_plugin_get_ctx */
1199 int
1200 sdb_plugin_set_ctx(sdb_plugin_ctx_t ctx, sdb_plugin_ctx_t *old)
1202         ctx_t *c;
1204         c = ctx_get();
1205         if (! c) {
1206                 sdb_log(SDB_LOG_ERR, "Invalid write access to plugin "
1207                                 "context outside a plugin");
1208                 return -1;
1209         }
1211         if (old)
1212                 *old = c->public;
1213         c->public = ctx;
1214         return 0;
1215 } /* sdb_plugin_set_ctx */
1217 const sdb_plugin_info_t *
1218 sdb_plugin_current(void)
1220         ctx_t *ctx = ctx_get();
1222         if (! ctx)
1223                 return NULL;
1224         return &ctx->info;
1225 } /* sdb_plugin_current */
1227 int
1228 sdb_plugin_configure(const char *name, oconfig_item_t *ci)
1230         callback_t *plugin;
1231         sdb_plugin_config_cb callback;
1233         ctx_t *old_ctx;
1235         int status;
1237         if ((! name) || (! ci))
1238                 return -1;
1240         plugin = CB(sdb_llist_search_by_name(config_list, name));
1241         if (! plugin) {
1242                 ctx_t *ctx = CTX(sdb_llist_search_by_name(all_plugins, name));
1243                 if (! ctx)
1244                         sdb_log(SDB_LOG_ERR, "Cannot configure unknown plugin '%s'. "
1245                                         "Missing 'LoadPlugin \"%s\"'?",
1246                                         name, name);
1247                 else
1248                         sdb_log(SDB_LOG_ERR, "Plugin '%s' did not register "
1249                                         "a config callback.", name);
1250                 errno = ENOENT;
1251                 return -1;
1252         }
1254         old_ctx = ctx_set(plugin->cb_ctx);
1255         callback = (sdb_plugin_config_cb)plugin->cb_callback;
1256         status = callback(ci);
1257         ctx_set(old_ctx);
1258         return status;
1259 } /* sdb_plugin_configure */
1261 int
1262 sdb_plugin_reconfigure_init(void)
1264         sdb_llist_iter_t *iter;
1266         iter = sdb_llist_get_iter(config_list);
1267         if (config_list && (! iter))
1268                 return -1;
1270         /* deconfigure all plugins */
1271         while (sdb_llist_iter_has_next(iter)) {
1272                 callback_t *plugin;
1273                 sdb_plugin_config_cb callback;
1274                 ctx_t *old_ctx;
1276                 plugin = CB(sdb_llist_iter_get_next(iter));
1277                 old_ctx = ctx_set(plugin->cb_ctx);
1278                 callback = (sdb_plugin_config_cb)plugin->cb_callback;
1279                 callback(NULL);
1280                 ctx_set(old_ctx);
1281         }
1282         sdb_llist_iter_destroy(iter);
1284         iter = sdb_llist_get_iter(all_plugins);
1285         if (all_plugins && (! iter))
1286                 return -1;
1288         /* record all plugins as being unused */
1289         while (sdb_llist_iter_has_next(iter))
1290                 CTX(sdb_llist_iter_get_next(iter))->use_cnt = 0;
1291         sdb_llist_iter_destroy(iter);
1293         sdb_plugin_unregister_all();
1294         return 0;
1295 } /* sdb_plugin_reconfigure_init */
1297 int
1298 sdb_plugin_reconfigure_finish(void)
1300         sdb_llist_iter_t *iter;
1302         iter = sdb_llist_get_iter(all_plugins);
1303         if (all_plugins && (! iter))
1304                 return -1;
1306         while (sdb_llist_iter_has_next(iter)) {
1307                 ctx_t *ctx = CTX(sdb_llist_iter_get_next(iter));
1308                 if (ctx->use_cnt)
1309                         continue;
1311                 sdb_log(SDB_LOG_INFO, "Module %s no longer in use",
1312                                 ctx->info.plugin_name);
1313                 sdb_llist_iter_remove_current(iter);
1314                 plugin_unregister_by_name(ctx->info.plugin_name);
1315                 sdb_object_deref(SDB_OBJ(ctx));
1316         }
1317         sdb_llist_iter_destroy(iter);
1318         return 0;
1319 } /* sdb_plugin_reconfigure_finish */
1321 int
1322 sdb_plugin_init_all(void)
1324         sdb_llist_iter_t *iter;
1325         int ret = 0;
1327         iter = sdb_llist_get_iter(init_list);
1328         while (sdb_llist_iter_has_next(iter)) {
1329                 callback_t *cb;
1330                 sdb_plugin_init_cb callback;
1331                 ctx_t *old_ctx;
1333                 sdb_object_t *obj = sdb_llist_iter_get_next(iter);
1334                 assert(obj);
1335                 cb = CB(obj);
1337                 callback = (sdb_plugin_init_cb)cb->cb_callback;
1339                 old_ctx = ctx_set(cb->cb_ctx);
1340                 if (callback(cb->cb_user_data)) {
1341                         sdb_log(SDB_LOG_ERR, "Failed to initialize plugin '%s'. "
1342                                         "Unregistering all callbacks.", obj->name);
1343                         ctx_set(old_ctx);
1344                         plugin_unregister_by_name(cb->cb_ctx->info.plugin_name);
1345                         ++ret;
1346                 }
1347                 else
1348                         ctx_set(old_ctx);
1349         }
1350         sdb_llist_iter_destroy(iter);
1351         return ret;
1352 } /* sdb_plugin_init_all */
1354 int
1355 sdb_plugin_shutdown_all(void)
1357         sdb_llist_iter_t *iter;
1358         int ret = 0;
1360         iter = sdb_llist_get_iter(shutdown_list);
1361         while (sdb_llist_iter_has_next(iter)) {
1362                 callback_t *cb;
1363                 sdb_plugin_shutdown_cb callback;
1364                 ctx_t *old_ctx;
1366                 sdb_object_t *obj = sdb_llist_iter_get_next(iter);
1367                 assert(obj);
1368                 cb = CB(obj);
1370                 callback = (sdb_plugin_shutdown_cb)cb->cb_callback;
1372                 old_ctx = ctx_set(cb->cb_ctx);
1373                 if (callback(cb->cb_user_data)) {
1374                         sdb_log(SDB_LOG_ERR, "Failed to shutdown plugin '%s'.",
1375                                         obj->name);
1376                         ++ret;
1377                 }
1378                 ctx_set(old_ctx);
1379         }
1380         sdb_llist_iter_destroy(iter);
1381         return ret;
1382 } /* sdb_plugin_shutdown_all */
1384 int
1385 sdb_plugin_collector_loop(sdb_plugin_loop_t *loop)
1387         if (! collector_list) {
1388                 sdb_log(SDB_LOG_WARNING, "No collectors registered. "
1389                                 "Quiting main loop.");
1390                 return -1;
1391         }
1393         if (! loop)
1394                 return -1;
1396         while (loop->do_loop) {
1397                 sdb_plugin_collector_cb callback;
1398                 ctx_t *old_ctx;
1400                 sdb_time_t interval, now;
1402                 sdb_object_t *obj = sdb_llist_shift(collector_list);
1403                 if (! obj)
1404                         return -1;
1406                 callback = (sdb_plugin_collector_cb)CCB(obj)->ccb_callback;
1408                 if (! (now = sdb_gettime())) {
1409                         char errbuf[1024];
1410                         sdb_log(SDB_LOG_ERR, "Failed to determine current "
1411                                         "time in collector main loop: %s",
1412                                         sdb_strerror(errno, errbuf, sizeof(errbuf)));
1413                         now = CCB(obj)->ccb_next_update;
1414                 }
1416                 if (now < CCB(obj)->ccb_next_update) {
1417                         interval = CCB(obj)->ccb_next_update - now;
1419                         errno = 0;
1420                         while (loop->do_loop && sdb_sleep(interval, &interval)) {
1421                                 if (errno != EINTR) {
1422                                         char errbuf[1024];
1423                                         sdb_log(SDB_LOG_ERR, "Failed to sleep "
1424                                                         "in collector main loop: %s",
1425                                                         sdb_strerror(errno, errbuf, sizeof(errbuf)));
1426                                         sdb_llist_insert_sorted(collector_list, obj,
1427                                                         plugin_cmp_next_update);
1428                                         sdb_object_deref(obj);
1429                                         return -1;
1430                                 }
1431                                 errno = 0;
1432                         }
1434                         if (! loop->do_loop) {
1435                                 /* put back; don't worry about errors */
1436                                 sdb_llist_insert_sorted(collector_list, obj,
1437                                                 plugin_cmp_next_update);
1438                                 sdb_object_deref(obj);
1439                                 return 0;
1440                         }
1441                 }
1443                 old_ctx = ctx_set(CCB(obj)->ccb_ctx);
1444                 if (callback(CCB(obj)->ccb_user_data)) {
1445                         /* XXX */
1446                 }
1447                 ctx_set(old_ctx);
1449                 interval = CCB(obj)->ccb_interval;
1450                 if (! interval)
1451                         interval = loop->default_interval;
1452                 if (! interval) {
1453                         sdb_log(SDB_LOG_WARNING, "No interval configured "
1454                                         "for plugin '%s'; skipping any further "
1455                                         "iterations.", obj->name);
1456                         sdb_object_deref(obj);
1457                         continue;
1458                 }
1460                 CCB(obj)->ccb_next_update += interval;
1462                 if (! (now = sdb_gettime())) {
1463                         char errbuf[1024];
1464                         sdb_log(SDB_LOG_ERR, "Failed to determine current "
1465                                         "time in collector main loop: %s",
1466                                         sdb_strerror(errno, errbuf, sizeof(errbuf)));
1467                         now = CCB(obj)->ccb_next_update;
1468                 }
1470                 if (now > CCB(obj)->ccb_next_update) {
1471                         sdb_log(SDB_LOG_WARNING, "Plugin '%s' took too "
1472                                         "long; skipping iterations to keep up.",
1473                                         obj->name);
1474                         CCB(obj)->ccb_next_update = now;
1475                 }
1477                 if (sdb_llist_insert_sorted(collector_list, obj,
1478                                         plugin_cmp_next_update)) {
1479                         sdb_log(SDB_LOG_ERR, "Failed to re-insert plugin '%s' into "
1480                                         "collector list. Unable to further use the plugin.",
1481                                         obj->name);
1482                         sdb_object_deref(obj);
1483                         return -1;
1484                 }
1486                 /* pass control back to the list */
1487                 sdb_object_deref(obj);
1488         }
1489         return 0;
1490 } /* sdb_plugin_read_loop */
1492 char *
1493 sdb_plugin_cname(char *hostname)
1495         sdb_llist_iter_t *iter;
1497         if (! hostname)
1498                 return NULL;
1500         if (! cname_list)
1501                 return hostname;
1503         iter = sdb_llist_get_iter(cname_list);
1504         while (sdb_llist_iter_has_next(iter)) {
1505                 sdb_plugin_cname_cb callback;
1506                 char *cname;
1508                 sdb_object_t *obj = sdb_llist_iter_get_next(iter);
1509                 assert(obj);
1511                 callback = (sdb_plugin_cname_cb)CB(obj)->cb_callback;
1512                 cname = callback(hostname, CB(obj)->cb_user_data);
1513                 if (cname) {
1514                         free(hostname);
1515                         hostname = cname;
1516                 }
1517                 /* else: don't change hostname */
1518         }
1519         sdb_llist_iter_destroy(iter);
1520         return hostname;
1521 } /* sdb_plugin_cname */
1523 int
1524 sdb_plugin_log(int prio, const char *msg)
1526         const sdb_plugin_info_t *info;
1527         int ret = -1;
1529         if (! msg)
1530                 return 0;
1532         info = sdb_plugin_current();
1534         {
1535                 const char *plugin_name = info ? info->plugin_name : "";
1536                 char log_msg[strlen(msg) + strlen(plugin_name) + strlen(" plugin: ") + 1];
1538                 sdb_llist_iter_t *iter;
1539                 bool logged = 0;
1541                 if (*plugin_name)
1542                         snprintf(log_msg, sizeof(log_msg), "%s plugin: %s",
1543                                         plugin_name, msg);
1544                 else
1545                         strncpy(log_msg, msg, sizeof(log_msg));
1547                 iter = sdb_llist_get_iter(log_list);
1548                 while (sdb_llist_iter_has_next(iter)) {
1549                         sdb_plugin_log_cb callback;
1550                         int tmp;
1552                         sdb_object_t *obj = sdb_llist_iter_get_next(iter);
1553                         assert(obj);
1555                         callback = (sdb_plugin_log_cb)CB(obj)->cb_callback;
1556                         tmp = callback(prio, msg, CB(obj)->cb_user_data);
1557                         if (tmp > ret)
1558                                 ret = tmp;
1560                         if (CB(obj)->cb_ctx)
1561                                 logged = 1;
1562                         /* else: this is an internally registered callback */
1563                 }
1564                 sdb_llist_iter_destroy(iter);
1566                 if (! logged)
1567                         return fprintf(stderr, "[%s] %s\n",
1568                                         SDB_LOG_PRIO_TO_STRING(prio), log_msg);
1569         }
1570         return ret;
1571 } /* sdb_plugin_log */
1573 int
1574 sdb_plugin_vlogf(int prio, const char *fmt, va_list ap)
1576         sdb_strbuf_t *buf;
1577         int ret;
1579         if (! fmt)
1580                 return 0;
1582         buf = sdb_strbuf_create(64);
1583         if (! buf) {
1584                 ret = fprintf(stderr, "[%s] ", SDB_LOG_PRIO_TO_STRING(prio));
1585                 ret += vfprintf(stderr, fmt, ap);
1586                 return ret;
1587         }
1589         if (sdb_strbuf_vsprintf(buf, fmt, ap) < 0) {
1590                 sdb_strbuf_destroy(buf);
1591                 return -1;
1592         }
1594         ret = sdb_plugin_log(prio, sdb_strbuf_string(buf));
1595         sdb_strbuf_destroy(buf);
1596         return ret;
1597 } /* sdb_plugin_vlogf */
/*
 * sdb_plugin_logf:
 * printf-style front-end to sdb_plugin_vlogf(). Returns 0 if 'fmt' is NULL;
 * otherwise the return value of sdb_plugin_vlogf().
 */
int
sdb_plugin_logf(int prio, const char *fmt, ...)
{
	va_list args;
	int status = 0;

	if (fmt) {
		va_start(args, fmt);
		status = sdb_plugin_vlogf(prio, fmt, args);
		va_end(args);
	}
	return status;
} /* sdb_plugin_logf */
1614 sdb_timeseries_t *
1615 sdb_plugin_fetch_timeseries(const char *type, const char *id,
1616                 sdb_timeseries_opts_t *opts)
1618         ts_fetcher_t *fetcher;
1619         sdb_timeseries_t *ts;
1621         ctx_t *old_ctx;
1623         if ((! type) || (! id) || (! opts))
1624                 return NULL;
1626         fetcher = TS_FETCHER(sdb_llist_search_by_name(timeseries_fetcher_list, type));
1627         if (! fetcher) {
1628                 sdb_log(SDB_LOG_ERR, "Cannot fetch time-series of type %s: "
1629                                 "no such plugin loaded", type);
1630                 errno = ENOENT;
1631                 return NULL;
1632         }
1634         old_ctx = ctx_set(fetcher->ts_ctx);
1635         ts = fetcher->impl.fetch(id, opts, fetcher->ts_user_data);
1636         ctx_set(old_ctx);
1637         return ts;
1638 } /* sdb_plugin_fetch_timeseries */
1640 sdb_timeseries_info_t *
1641 sdb_plugin_describe_timeseries(const char *type, const char *id)
1643         ts_fetcher_t *fetcher;
1644         sdb_timeseries_info_t *ts_info;
1646         ctx_t *old_ctx;
1648         if ((! type) || (! id))
1649                 return NULL;
1651         fetcher = TS_FETCHER(sdb_llist_search_by_name(timeseries_fetcher_list, type));
1652         if (! fetcher) {
1653                 sdb_log(SDB_LOG_ERR, "Cannot describe time-series of type %s: "
1654                                 "no such plugin loaded", type);
1655                 errno = ENOENT;
1656                 return NULL;
1657         }
1659         old_ctx = ctx_set(fetcher->ts_ctx);
1660         ts_info = fetcher->impl.describe(id, fetcher->ts_user_data);
1661         ctx_set(old_ctx);
1662         return ts_info;
1663 } /* sdb_plugin_describe_timeseries */
1665 int
1666 sdb_plugin_query(sdb_ast_node_t *ast,
1667                 sdb_store_writer_t *w, sdb_object_t *wd,
1668                 sdb_query_opts_t *opts, sdb_strbuf_t *errbuf)
1670         query_writer_t qw = QUERY_WRITER_INIT(w, wd);
1671         reader_t *reader;
1672         sdb_object_t *q;
1674         size_t n = sdb_llist_len(reader_list);
1675         int status = 0;
1677         if (! ast)
1678                 return 0;
1680         if (opts)
1681                 qw.opts = *opts;
1683         if ((ast->type != SDB_AST_TYPE_FETCH)
1684                         && (ast->type != SDB_AST_TYPE_LIST)
1685                         && (ast->type != SDB_AST_TYPE_LOOKUP)) {
1686                 sdb_log(SDB_LOG_ERR, "Cannot execute query of type %s",
1687                                 SDB_AST_TYPE_TO_STRING(ast));
1688                 sdb_strbuf_sprintf(errbuf, "Cannot execute query of type %s",
1689                                 SDB_AST_TYPE_TO_STRING(ast));
1690                 return -1;
1691         }
1693         if (n != 1) {
1694                 char *msg = (n > 0)
1695                         ? "Cannot execute query: multiple readers not supported"
1696                         : "Cannot execute query: no readers registered";
1697                 sdb_strbuf_sprintf(errbuf, "%s", msg);
1698                 sdb_log(SDB_LOG_ERR, "%s", msg);
1699                 return -1;
1700         }
1702         reader = READER(sdb_llist_get(reader_list, 0));
1703         assert(reader);
1705         q = reader->impl.prepare_query(ast, errbuf, reader->r_user_data);
1706         if (q)
1707                 status = reader->impl.execute_query(q, &query_writer, SDB_OBJ(&qw),
1708                                 errbuf, reader->r_user_data);
1709         else
1710                 status = -1;
1712         sdb_object_deref(SDB_OBJ(q));
1713         sdb_object_deref(SDB_OBJ(reader));
1714         return status;
1715 } /* sdb_plugin_query */
1717 int
1718 sdb_plugin_store_host(const char *name, sdb_time_t last_update)
1720         sdb_store_host_t host = SDB_STORE_HOST_INIT;
1721         char *backends[1];
1722         char *cname;
1724         sdb_llist_iter_t *iter;
1725         int status = 0;
1727         if (! name)
1728                 return -1;
1730         if (! sdb_llist_len(writer_list)) {
1731                 sdb_log(SDB_LOG_ERR, "Cannot store host: no writers registered");
1732                 return -1;
1733         }
1735         cname = sdb_plugin_cname(strdup(name));
1736         if (! cname) {
1737                 sdb_log(SDB_LOG_ERR, "strdup failed");
1738                 return -1;
1739         }
1741         host.name = cname;
1742         host.last_update = last_update ? last_update : sdb_gettime();
1743         if (get_interval(SDB_HOST, NULL, -1, NULL, cname,
1744                                 host.last_update, &host.interval)) {
1745                 free(cname);
1746                 return 1;
1747         }
1748         host.backends = (const char * const *)backends;
1749         get_backend(backends, &host.backends_num);
1751         iter = sdb_llist_get_iter(writer_list);
1752         while (sdb_llist_iter_has_next(iter)) {
1753                 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
1754                 int s;
1755                 assert(writer);
1756                 s = writer->impl.store_host(&host, writer->w_user_data);
1757                 if (((s > 0) && (status >= 0)) || (s < 0))
1758                         status = s;
1759         }
1760         sdb_llist_iter_destroy(iter);
1761         free(cname);
1762         return status;
1763 } /* sdb_plugin_store_host */
1765 int
1766 sdb_plugin_store_service(const char *hostname, const char *name,
1767                 sdb_time_t last_update)
1769         sdb_store_service_t service = SDB_STORE_SERVICE_INIT;
1770         char *backends[1];
1771         char *cname;
1773         sdb_llist_iter_t *iter;
1774         sdb_data_t d;
1776         int status = 0;
1778         if ((! hostname) || (! name))
1779                 return -1;
1781         if (! sdb_llist_len(writer_list)) {
1782                 sdb_log(SDB_LOG_ERR, "Cannot store service: "
1783                                 "no writers registered");
1784                 return -1;
1785         }
1787         cname = sdb_plugin_cname(strdup(hostname));
1788         if (! cname) {
1789                 sdb_log(SDB_LOG_ERR, "strdup failed");
1790                 return -1;
1791         }
1793         service.hostname = cname;
1794         service.name = name;
1795         service.last_update = last_update ? last_update : sdb_gettime();
1796         if (get_interval(SDB_SERVICE, cname, -1, NULL, name,
1797                                 service.last_update, &service.interval)) {
1798                 free(cname);
1799                 return 1;
1800         }
1801         service.backends = (const char * const *)backends;
1802         get_backend(backends, &service.backends_num);
1804         iter = sdb_llist_get_iter(writer_list);
1805         while (sdb_llist_iter_has_next(iter)) {
1806                 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
1807                 int s;
1808                 assert(writer);
1809                 s = writer->impl.store_service(&service, writer->w_user_data);
1810                 if (((s > 0) && (status >= 0)) || (s < 0))
1811                         status = s;
1812         }
1813         sdb_llist_iter_destroy(iter);
1815         if (! status) {
1816                 /* record the hostname as an attribute */
1817                 d.type = SDB_TYPE_STRING;
1818                 d.data.string = cname;
1819                 if (sdb_plugin_store_service_attribute(cname, name,
1820                                         "hostname", &d, service.last_update))
1821                         status = -1;
1822         }
1824         free(cname);
1825         return status;
1826 } /* sdb_plugin_store_service */
1828 int
1829 sdb_plugin_store_metric(const char *hostname, const char *name,
1830                 sdb_metric_store_t *store, sdb_time_t last_update)
1832         sdb_store_metric_t metric = SDB_STORE_METRIC_INIT;
1833         char *backends[1];
1834         char *cname;
1836         sdb_llist_iter_t *iter;
1837         sdb_data_t d;
1839         int status = 0;
1841         if ((! hostname) || (! name))
1842                 return -1;
1844         if (! sdb_llist_len(writer_list)) {
1845                 sdb_log(SDB_LOG_ERR, "Cannot store metric: no writers registered");
1846                 return -1;
1847         }
1849         cname = sdb_plugin_cname(strdup(hostname));
1850         if (! cname) {
1851                 sdb_log(SDB_LOG_ERR, "strdup failed");
1852                 return -1;
1853         }
1855         if (store && ((! store->type) || (! store->id)))
1856                 store = NULL;
1858         metric.hostname = cname;
1859         metric.name = name;
1860         if (store) {
1861                 if (store->last_update < last_update)
1862                         store->last_update = last_update;
1863                 metric.stores = store;
1864                 metric.stores_num = 1;
1865         }
1866         metric.last_update = last_update ? last_update : sdb_gettime();
1867         if (get_interval(SDB_METRIC, cname, -1, NULL, name,
1868                                 metric.last_update, &metric.interval)) {
1869                 free(cname);
1870                 return 1;
1871         }
1872         metric.backends = (const char * const *)backends;
1873         get_backend(backends, &metric.backends_num);
1875         iter = sdb_llist_get_iter(writer_list);
1876         while (sdb_llist_iter_has_next(iter)) {
1877                 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
1878                 int s;
1879                 assert(writer);
1880                 s = writer->impl.store_metric(&metric, writer->w_user_data);
1881                 if (((s > 0) && (status >= 0)) || (s < 0))
1882                         status = s;
1883         }
1884         sdb_llist_iter_destroy(iter);
1886         if (! status) {
1887                 /* record the hostname as an attribute */
1888                 d.type = SDB_TYPE_STRING;
1889                 d.data.string = cname;
1890                 if (sdb_plugin_store_metric_attribute(cname, name,
1891                                         "hostname", &d, metric.last_update))
1892                         status = -1;
1893         }
1895         free(cname);
1896         return status;
1897 } /* sdb_plugin_store_metric */
1899 int
1900 sdb_plugin_store_attribute(const char *hostname, const char *key,
1901                 const sdb_data_t *value, sdb_time_t last_update)
1903         sdb_store_attribute_t attr = SDB_STORE_ATTRIBUTE_INIT;
1904         char *backends[1];
1905         char *cname;
1907         sdb_llist_iter_t *iter;
1908         int status = 0;
1910         if ((! hostname) || (! key) || (! value))
1911                 return -1;
1913         if (! sdb_llist_len(writer_list)) {
1914                 sdb_log(SDB_LOG_ERR, "Cannot store attribute: no writers registered");
1915                 return -1;
1916         }
1918         cname = sdb_plugin_cname(strdup(hostname));
1919         if (! cname) {
1920                 sdb_log(SDB_LOG_ERR, "strdup failed");
1921                 return -1;
1922         }
1924         attr.parent_type = SDB_HOST;
1925         attr.parent = cname;
1926         attr.key = key;
1927         attr.value = *value;
1928         attr.last_update = last_update ? last_update : sdb_gettime();
1929         if (get_interval(SDB_ATTRIBUTE, cname, -1, NULL, key,
1930                                 attr.last_update, &attr.interval)) {
1931                 free(cname);
1932                 return 1;
1933         }
1934         attr.backends = (const char * const *)backends;
1935         get_backend(backends, &attr.backends_num);
1937         iter = sdb_llist_get_iter(writer_list);
1938         while (sdb_llist_iter_has_next(iter)) {
1939                 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
1940                 int s;
1941                 assert(writer);
1942                 s = writer->impl.store_attribute(&attr, writer->w_user_data);
1943                 if (((s > 0) && (status >= 0)) || (s < 0))
1944                         status = s;
1945         }
1946         sdb_llist_iter_destroy(iter);
1947         free(cname);
1948         return status;
1949 } /* sdb_plugin_store_attribute */
1951 int
1952 sdb_plugin_store_service_attribute(const char *hostname, const char *service,
1953                 const char *key, const sdb_data_t *value, sdb_time_t last_update)
1955         sdb_store_attribute_t attr = SDB_STORE_ATTRIBUTE_INIT;
1956         char *backends[1];
1957         char *cname;
1959         sdb_llist_iter_t *iter;
1960         int status = 0;
1962         if ((! hostname) || (! service) || (! key) || (! value))
1963                 return -1;
1965         if (! sdb_llist_len(writer_list)) {
1966                 sdb_log(SDB_LOG_ERR, "Cannot store service attribute: "
1967                                 "no writers registered");
1968                 return -1;
1969         }
1971         cname = sdb_plugin_cname(strdup(hostname));
1972         if (! cname) {
1973                 sdb_log(SDB_LOG_ERR, "strdup failed");
1974                 return -1;
1975         }
1977         attr.hostname = cname;
1978         attr.parent_type = SDB_SERVICE;
1979         attr.parent = service;
1980         attr.key = key;
1981         attr.value = *value;
1982         attr.last_update = last_update ? last_update : sdb_gettime();
1983         if (get_interval(SDB_ATTRIBUTE, cname, SDB_SERVICE, service, key,
1984                                 attr.last_update, &attr.interval)) {
1985                 free(cname);
1986                 return 1;
1987         }
1988         attr.backends = (const char * const *)backends;
1989         get_backend(backends, &attr.backends_num);
1991         iter = sdb_llist_get_iter(writer_list);
1992         while (sdb_llist_iter_has_next(iter)) {
1993                 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
1994                 int s;
1995                 assert(writer);
1996                 s = writer->impl.store_attribute(&attr, writer->w_user_data);
1997                 if (((s > 0) && (status >= 0)) || (s < 0))
1998                         status = s;
1999         }
2000         sdb_llist_iter_destroy(iter);
2001         free(cname);
2002         return status;
2003 } /* sdb_plugin_store_service_attribute */
2005 int
2006 sdb_plugin_store_metric_attribute(const char *hostname, const char *metric,
2007                 const char *key, const sdb_data_t *value, sdb_time_t last_update)
2009         sdb_store_attribute_t attr = SDB_STORE_ATTRIBUTE_INIT;
2010         char *backends[1];
2011         char *cname;
2013         sdb_llist_iter_t *iter;
2014         int status = 0;
2016         if ((! hostname) || (! metric) || (! key) || (! value))
2017                 return -1;
2019         if (! sdb_llist_len(writer_list)) {
2020                 sdb_log(SDB_LOG_ERR, "Cannot store metric attribute: "
2021                                 "no writers registered");
2022                 return -1;
2023         }
2025         cname = sdb_plugin_cname(strdup(hostname));
2026         if (! cname) {
2027                 sdb_log(SDB_LOG_ERR, "strdup failed");
2028                 return -1;
2029         }
2031         attr.hostname = cname;
2032         attr.parent_type = SDB_METRIC;
2033         attr.parent = metric;
2034         attr.key = key;
2035         attr.value = *value;
2036         attr.last_update = last_update ? last_update : sdb_gettime();
2037         if (get_interval(SDB_ATTRIBUTE, cname, SDB_METRIC, metric, key,
2038                                 attr.last_update, &attr.interval)) {
2039                 free(cname);
2040                 return 1;
2041         }
2042         attr.backends = (const char * const *)backends;
2043         get_backend(backends, &attr.backends_num);
2045         iter = sdb_llist_get_iter(writer_list);
2046         while (sdb_llist_iter_has_next(iter)) {
2047                 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
2048                 int s;
2049                 assert(writer);
2050                 s = writer->impl.store_attribute(&attr, writer->w_user_data);
2051                 if (((s > 0) && (status >= 0)) || (s < 0))
2052                         status = s;
2053         }
2054         sdb_llist_iter_destroy(iter);
2055         free(cname);
2056         return status;
2057 } /* sdb_plugin_store_metric_attribute */
2059 /* vim: set tw=78 sw=4 ts=4 noexpandtab : */