Code

plugin: Dynamically populate timeseries information when querying metrics.
[sysdb.git] / src / core / plugin.c
1 /*
2  * SysDB - src/core/plugin.c
3  * Copyright (C) 2012 Sebastian 'tokkee' Harl <sh@tokkee.org>
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
16  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
17  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR
19  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
28 #if HAVE_CONFIG_H
29 #       include "config.h"
30 #endif /* HAVE_CONFIG_H */
32 #include "sysdb.h"
33 #include "core/plugin.h"
34 #include "core/time.h"
35 #include "utils/error.h"
36 #include "utils/llist.h"
37 #include "utils/strbuf.h"
39 #include <assert.h>
41 #include <errno.h>
43 #include <stdarg.h>
44 #include <stdio.h>
45 #include <string.h>
46 #include <strings.h>
47 #include <unistd.h>
49 #include <ltdl.h>
51 #include <pthread.h>
/*
 * INFO_GET:
 * Evaluates to member 'attr' of the info object pointed to by 'i' or, if that
 * member is unset (NULL), to the string literal "<attr> not set". The
 * argument 'i' may be evaluated twice.
 */
#define INFO_GET(i, attr) \
	(((i)->attr) ? ((i)->attr) : (#attr " not set"))
57 /*
58  * private data types
59  */
61 typedef struct {
62         sdb_object_t super;
63         sdb_plugin_ctx_t public;
65         sdb_plugin_info_t info;
66         lt_dlhandle handle;
68         /* The usage count differs from the object's ref count
69          * in that it provides higher level information about how
70          * the plugin is in use. */
71         size_t use_cnt;
72 } ctx_t;
73 #define CTX_INIT { SDB_OBJECT_INIT, \
74         SDB_PLUGIN_CTX_INIT, SDB_PLUGIN_INFO_INIT, NULL, 0 }
76 #define CTX(obj) ((ctx_t *)(obj))
78 typedef struct {
79         sdb_object_t super;
80         void *cb_callback;
81         sdb_object_t *cb_user_data;
82         ctx_t *cb_ctx;
83 } callback_t;
84 #define CB_INIT { SDB_OBJECT_INIT, \
85         /* callback = */ NULL, /* user_data = */ NULL, \
86         SDB_PLUGIN_CTX_INIT }
87 #define CB(obj) ((callback_t *)(obj))
88 #define CONST_CB(obj) ((const callback_t *)(obj))
90 typedef struct {
91         callback_t super;
92 #define ccb_callback super.cb_callback
93 #define ccb_user_data super.cb_user_data
94 #define ccb_ctx super.cb_ctx
95         sdb_time_t ccb_interval;
96         sdb_time_t ccb_next_update;
97 } collector_t;
98 #define CCB(obj) ((collector_t *)(obj))
99 #define CONST_CCB(obj) ((const collector_t *)(obj))
101 typedef struct {
102         callback_t super; /* cb_callback will always be NULL */
103 #define w_user_data super.cb_user_data
104 #define w_ctx super.cb_ctx
105         sdb_store_writer_t impl;
106 } writer_t;
107 #define WRITER(obj) ((writer_t *)(obj))
109 typedef struct {
110         callback_t super; /* cb_callback will always be NULL */
111 #define r_user_data super.cb_user_data
112 #define r_ctx super.cb_ctx
113         sdb_store_reader_t impl;
114 } reader_t;
115 #define READER(obj) ((reader_t *)(obj))
117 typedef struct {
118         callback_t super; /* cb_callback will always be NULL */
119 #define ts_user_data super.cb_user_data
120 #define ts_ctx super.cb_ctx
121         sdb_timeseries_fetcher_t impl;
122 } ts_fetcher_t;
123 #define TS_FETCHER(obj) ((ts_fetcher_t *)(obj))
125 /*
126  * private variables
127  */
129 static sdb_plugin_ctx_t  plugin_default_ctx  = SDB_PLUGIN_CTX_INIT;
130 static sdb_plugin_info_t plugin_default_info = SDB_PLUGIN_INFO_INIT;
132 static pthread_key_t     plugin_ctx_key;
133 static bool              plugin_ctx_key_initialized = 0;
135 /* a list of the plugin contexts of all registered plugins */
136 static sdb_llist_t      *all_plugins = NULL;
138 static sdb_llist_t      *config_list = NULL;
139 static sdb_llist_t      *init_list = NULL;
140 static sdb_llist_t      *collector_list = NULL;
141 static sdb_llist_t      *cname_list = NULL;
142 static sdb_llist_t      *shutdown_list = NULL;
143 static sdb_llist_t      *log_list = NULL;
144 static sdb_llist_t      *timeseries_fetcher_list = NULL;
145 static sdb_llist_t      *writer_list = NULL;
146 static sdb_llist_t      *reader_list = NULL;
148 static struct {
149         const char   *type;
150         sdb_llist_t **list;
151 } all_lists[] = {
152         { "config",             &config_list },
153         { "init",               &init_list },
154         { "collector",          &collector_list },
155         { "cname",              &cname_list },
156         { "shutdown",           &shutdown_list },
157         { "log",                &log_list },
158         { "timeseries fetcher", &timeseries_fetcher_list },
159         { "store writer",       &writer_list },
160         { "store reader",       &reader_list },
161 };
163 /*
164  * private helper functions
165  */
167 static void
168 plugin_info_clear(sdb_plugin_info_t *info)
170         sdb_plugin_info_t empty_info = SDB_PLUGIN_INFO_INIT;
171         if (! info)
172                 return;
174         if (info->plugin_name)
175                 free(info->plugin_name);
176         if (info->filename)
177                 free(info->filename);
179         if (info->description)
180                 free(info->description);
181         if (info->copyright)
182                 free(info->copyright);
183         if (info->license)
184                 free(info->license);
186         *info = empty_info;
187 } /* plugin_info_clear */
189 static void
190 ctx_key_init(void)
192         if (plugin_ctx_key_initialized)
193                 return;
195         pthread_key_create(&plugin_ctx_key, /* destructor */ NULL);
196         plugin_ctx_key_initialized = 1;
197 } /* ctx_key_init */
199 static int
200 plugin_cmp_next_update(const sdb_object_t *a, const sdb_object_t *b)
202         const collector_t *ccb1 = (const collector_t *)a;
203         const collector_t *ccb2 = (const collector_t *)b;
205         assert(ccb1 && ccb2);
207         return (ccb1->ccb_next_update > ccb2->ccb_next_update)
208                 ? 1 : (ccb1->ccb_next_update < ccb2->ccb_next_update)
209                 ? -1 : 0;
210 } /* plugin_cmp_next_update */
212 static int
213 plugin_lookup_by_name(const sdb_object_t *obj, const void *id)
215         const callback_t *cb = CONST_CB(obj);
216         const char *name = id;
218         assert(cb && id);
220         /* when a plugin was registered from outside a plugin (e.g. the core),
221          * we don't have a plugin context */
222         if (! cb->cb_ctx)
223                 return 1;
225         if (!strcasecmp(cb->cb_ctx->info.plugin_name, name))
226                 return 0;
227         return 1;
228 } /* plugin_lookup_by_name */
230 /* since this function is called from sdb_plugin_reconfigure_finish()
231  * when iterating through all_plugins, we may not do any additional
232  * modifications to all_plugins except for the optional removal */
233 static void
234 plugin_unregister_by_name(const char *plugin_name)
236         sdb_object_t *obj;
237         size_t i;
239         for (i = 0; i < SDB_STATIC_ARRAY_LEN(all_lists); ++i) {
240                 const char  *type =  all_lists[i].type;
241                 sdb_llist_t *list = *all_lists[i].list;
243                 while (1) {
244                         callback_t *cb;
246                         cb = CB(sdb_llist_remove(list,
247                                                 plugin_lookup_by_name, plugin_name));
248                         if (! cb)
249                                 break;
251                         assert(cb->cb_ctx);
253                         sdb_log(SDB_LOG_INFO, "core: Unregistering "
254                                         "%s callback '%s' (module %s)", type, cb->super.name,
255                                         cb->cb_ctx->info.plugin_name);
256                         sdb_object_deref(SDB_OBJ(cb));
257                 }
258         }
260         obj = sdb_llist_search_by_name(all_plugins, plugin_name);
261         /* when called from sdb_plugin_reconfigure_finish, the object has already
262          * been removed from the list */
263         if (obj && (obj->ref_cnt <= 1)) {
264                 sdb_llist_remove_by_name(all_plugins, plugin_name);
265                 sdb_object_deref(obj);
266         }
267         /* else: other callbacks still reference it */
268 } /* plugin_unregister_by_name */
270 /*
271  * store writer wrapper for performing database queries:
272  * It wraps another store writer, adding extra logic as needed.
273  */
275 typedef struct {
276         sdb_object_t super;
277         sdb_store_writer_t *w;
278         sdb_object_t *ud;
279 } query_writer_t;
280 #define QUERY_WRITER_INIT(w, ud) { SDB_OBJECT_INIT, (w), (ud) }
281 #define QUERY_WRITER(obj) ((query_writer_t *)(obj))
283 static int
284 query_store_host(sdb_store_host_t *host, sdb_object_t *user_data)
286         query_writer_t *qw = QUERY_WRITER(user_data);
287         return qw->w->store_host(host, qw->ud);
288 } /* query_store_host */
290 static int
291 query_store_service(sdb_store_service_t *service, sdb_object_t *user_data)
293         query_writer_t *qw = QUERY_WRITER(user_data);
294         return qw->w->store_service(service, qw->ud);
295 } /* query_store_service */
297 static int
298 query_store_metric(sdb_store_metric_t *metric, sdb_object_t *user_data)
300         query_writer_t *qw = QUERY_WRITER(user_data);
301         sdb_timeseries_info_t *infos[metric->stores_num];
302         sdb_metric_store_t stores[metric->stores_num];
304         const sdb_metric_store_t *orig_stores = metric->stores;
305         int status;
306         size_t i;
308         for (i = 0; i < metric->stores_num; i++) {
309                 /* TODO: Make this optional using query options. */
310                 sdb_metric_store_t *s = stores + i;
311                 *s = metric->stores[i];
312                 infos[i] = sdb_plugin_describe_timeseries(s->type, s->id);
313                 s->info = infos[i];
314         }
316         metric->stores = stores;
317         status = qw->w->store_metric(metric, qw->ud);
318         metric->stores = orig_stores;
320         for (i = 0; i < metric->stores_num; i++)
321                 sdb_timeseries_info_destroy(infos[i]);
322         return status;
323 } /* query_store_metric */
325 static int
326 query_store_attribute(sdb_store_attribute_t *attr, sdb_object_t *user_data)
328         query_writer_t *qw = QUERY_WRITER(user_data);
329         return qw->w->store_attribute(attr, qw->ud);
330 } /* query_store_attribute */
332 static sdb_store_writer_t query_writer = {
333         query_store_host, query_store_service,
334         query_store_metric, query_store_attribute,
335 };
337 /*
338  * private types
339  */
341 static int
342 ctx_init(sdb_object_t *obj, va_list __attribute__((unused)) ap)
344         ctx_t *ctx = CTX(obj);
346         assert(ctx);
348         ctx->public = plugin_default_ctx;
349         ctx->info = plugin_default_info;
350         ctx->handle = NULL;
351         ctx->use_cnt = 1;
352         return 0;
353 } /* ctx_init */
355 static void
356 ctx_destroy(sdb_object_t *obj)
358         ctx_t *ctx = CTX(obj);
360         if (ctx->handle) {
361                 const char *err;
363                 sdb_log(SDB_LOG_INFO, "core: Unloading module %s",
364                                 ctx->info.plugin_name);
366                 lt_dlerror();
367                 lt_dlclose(ctx->handle);
368                 if ((err = lt_dlerror()))
369                         sdb_log(SDB_LOG_WARNING, "core: Failed to unload module %s: %s",
370                                         ctx->info.plugin_name, err);
371         }
373         plugin_info_clear(&ctx->info);
374 } /* ctx_destroy */
376 static sdb_type_t ctx_type = {
377         sizeof(ctx_t),
379         ctx_init,
380         ctx_destroy
381 };
383 static ctx_t *
384 ctx_get(void)
386         if (! plugin_ctx_key_initialized)
387                 ctx_key_init();
388         return pthread_getspecific(plugin_ctx_key);
389 } /* ctx_get */
391 static ctx_t *
392 ctx_set(ctx_t *new)
394         ctx_t *old;
396         if (! plugin_ctx_key_initialized)
397                 ctx_key_init();
399         old = pthread_getspecific(plugin_ctx_key);
400         if (old)
401                 sdb_object_deref(SDB_OBJ(old));
402         if (new)
403                 sdb_object_ref(SDB_OBJ(new));
404         pthread_setspecific(plugin_ctx_key, new);
405         return old;
406 } /* ctx_set */
408 static ctx_t *
409 ctx_create(const char *name)
411         ctx_t *ctx;
413         ctx = CTX(sdb_object_create(name, ctx_type));
414         if (! ctx)
415                 return NULL;
417         if (! plugin_ctx_key_initialized)
418                 ctx_key_init();
419         ctx_set(ctx);
420         return ctx;
421 } /* ctx_create */
423 /*
424  * plugin_init_ok:
425  * Checks whether the registration of a new plugin identified by 'obj' is
426  * okay. It consumes the first two arguments of 'ap'.
427  */
428 static bool
429 plugin_init_ok(sdb_object_t *obj, va_list ap)
431         sdb_llist_t **list = va_arg(ap, sdb_llist_t **);
432         const char *type = va_arg(ap, const char *);
434         assert(list); assert(type);
436         if (sdb_llist_search_by_name(*list, obj->name)) {
437                 sdb_log(SDB_LOG_WARNING, "core: %s callback '%s' "
438                                 "has already been registered. Ignoring newly "
439                                 "registered version.", type, obj->name);
440                 return 0;
441         }
442         return 1;
443 } /* plugin_init_ok */
445 static int
446 plugin_cb_init(sdb_object_t *obj, va_list ap)
448         void *callback;
449         sdb_object_t *ud;
451         if (! plugin_init_ok(obj, ap))
452                 return -1;
454         callback = va_arg(ap, void *);
455         ud = va_arg(ap, sdb_object_t *);
457         /* cb_ctx may be NULL if the plugin was not registered by a plugin */
459         CB(obj)->cb_callback = callback;
460         CB(obj)->cb_ctx      = ctx_get();
461         sdb_object_ref(SDB_OBJ(CB(obj)->cb_ctx));
463         sdb_object_ref(ud);
464         CB(obj)->cb_user_data = ud;
465         return 0;
466 } /* plugin_cb_init */
468 static void
469 plugin_cb_destroy(sdb_object_t *obj)
471         assert(obj);
472         sdb_object_deref(CB(obj)->cb_user_data);
473         sdb_object_deref(SDB_OBJ(CB(obj)->cb_ctx));
474 } /* plugin_cb_destroy */
476 static sdb_type_t callback_type = {
477         sizeof(callback_t),
479         plugin_cb_init,
480         plugin_cb_destroy
481 };
483 static sdb_type_t collector_type = {
484         sizeof(collector_t),
486         plugin_cb_init,
487         plugin_cb_destroy
488 };
490 static int
491 plugin_writer_init(sdb_object_t *obj, va_list ap)
493         sdb_store_writer_t *impl;
494         sdb_object_t *ud;
496         if (! plugin_init_ok(obj, ap))
497                 return -1;
499         impl = va_arg(ap, sdb_store_writer_t *);
500         ud = va_arg(ap, sdb_object_t *);
501         assert(impl);
503         if ((! impl->store_host) || (! impl->store_service)
504                         || (! impl->store_metric) || (! impl->store_attribute)) {
505                 sdb_log(SDB_LOG_ERR, "core: store writer callback '%s' "
506                                 "does not fully implement the writer interface.",
507                                 obj->name);
508                 return -1;
509         }
511         /* ctx may be NULL if the callback was not registered by a plugin */
513         WRITER(obj)->impl = *impl;
514         WRITER(obj)->w_ctx  = ctx_get();
515         sdb_object_ref(SDB_OBJ(WRITER(obj)->w_ctx));
517         sdb_object_ref(ud);
518         WRITER(obj)->w_user_data = ud;
519         return 0;
520 } /* plugin_writer_init */
522 static void
523 plugin_writer_destroy(sdb_object_t *obj)
525         assert(obj);
526         sdb_object_deref(WRITER(obj)->w_user_data);
527         sdb_object_deref(SDB_OBJ(WRITER(obj)->w_ctx));
528 } /* plugin_writer_destroy */
530 static sdb_type_t writer_type = {
531         sizeof(writer_t),
533         plugin_writer_init,
534         plugin_writer_destroy
535 };
537 static int
538 plugin_reader_init(sdb_object_t *obj, va_list ap)
540         sdb_store_reader_t *impl;
541         sdb_object_t *ud;
543         if (! plugin_init_ok(obj, ap))
544                 return -1;
546         impl = va_arg(ap, sdb_store_reader_t *);
547         ud = va_arg(ap, sdb_object_t *);
548         assert(impl);
550         if ((! impl->prepare_query) || (! impl->execute_query)) {
551                 sdb_log(SDB_LOG_ERR, "core: store reader callback '%s' "
552                                 "does not fully implement the reader interface.",
553                                 obj->name);
554                 return -1;
555         }
557         /* ctx may be NULL if the callback was not registered by a plugin */
559         READER(obj)->impl = *impl;
560         READER(obj)->r_ctx  = ctx_get();
561         sdb_object_ref(SDB_OBJ(READER(obj)->r_ctx));
563         sdb_object_ref(ud);
564         READER(obj)->r_user_data = ud;
565         return 0;
566 } /* plugin_reader_init */
568 static void
569 plugin_reader_destroy(sdb_object_t *obj)
571         assert(obj);
572         sdb_object_deref(READER(obj)->r_user_data);
573         sdb_object_deref(SDB_OBJ(READER(obj)->r_ctx));
574 } /* plugin_reader_destroy */
576 static sdb_type_t reader_type = {
577         sizeof(reader_t),
579         plugin_reader_init,
580         plugin_reader_destroy
581 };
583 static int
584 plugin_ts_fetcher_init(sdb_object_t *obj, va_list ap)
586         sdb_timeseries_fetcher_t *impl;
587         sdb_object_t *ud;
589         if (! plugin_init_ok(obj, ap))
590                 return -1;
592         impl = va_arg(ap, sdb_timeseries_fetcher_t *);
593         ud = va_arg(ap, sdb_object_t *);
594         assert(impl);
596         if ((! impl->describe) || (! impl->fetch)) {
597                 sdb_log(SDB_LOG_ERR, "core: timeseries fetcher callback '%s' "
598                                 "does not fully implement the interface.",
599                                 obj->name);
600                 return -1;
601         }
603         /* ctx may be NULL if the callback was not registered by a plugin */
605         TS_FETCHER(obj)->impl = *impl;
606         TS_FETCHER(obj)->ts_ctx  = ctx_get();
607         sdb_object_ref(SDB_OBJ(TS_FETCHER(obj)->ts_ctx));
609         sdb_object_ref(ud);
610         TS_FETCHER(obj)->ts_user_data = ud;
611         return 0;
612 } /* plugin_ts_fetcher_init */
614 static void
615 plugin_ts_fetcher_destroy(sdb_object_t *obj)
617         assert(obj);
618         sdb_object_deref(TS_FETCHER(obj)->ts_user_data);
619         sdb_object_deref(SDB_OBJ(TS_FETCHER(obj)->ts_ctx));
620 } /* plugin_ts_fetcher_destroy */
622 static sdb_type_t ts_fetcher_type = {
623         sizeof(ts_fetcher_t),
625         plugin_ts_fetcher_init,
626         plugin_ts_fetcher_destroy
627 };
629 static int
630 module_init(const char *name, lt_dlhandle lh, sdb_plugin_info_t *info)
632         int (*mod_init)(sdb_plugin_info_t *);
633         int status;
635         mod_init = (int (*)(sdb_plugin_info_t *))lt_dlsym(lh, "sdb_module_init");
636         if (! mod_init) {
637                 sdb_log(SDB_LOG_ERR, "core: Failed to load plugin '%s': "
638                                 "could not find symbol 'sdb_module_init'", name);
639                 return -1;
640         }
642         status = mod_init(info);
643         if (status) {
644                 sdb_log(SDB_LOG_ERR, "core: Failed to initialize "
645                                 "module '%s'", name);
646                 plugin_unregister_by_name(name);
647                 return -1;
648         }
649         return 0;
650 } /* module_init */
652 static int
653 module_load(const char *basedir, const char *name,
654                 const sdb_plugin_ctx_t *plugin_ctx)
656         char  base_name[name ? strlen(name) + 1 : 1];
657         const char *name_ptr;
658         char *tmp;
660         char filename[1024];
661         lt_dlhandle lh;
663         ctx_t *ctx;
665         int status;
667         assert(name);
669         base_name[0] = '\0';
670         name_ptr = name;
672         while ((tmp = strstr(name_ptr, "::"))) {
673                 strncat(base_name, name_ptr, (size_t)(tmp - name_ptr));
674                 strcat(base_name, "/");
675                 name_ptr = tmp + strlen("::");
676         }
677         strcat(base_name, name_ptr);
679         if (! basedir)
680                 basedir = PKGLIBDIR;
682         snprintf(filename, sizeof(filename), "%s/%s.so", basedir, base_name);
683         filename[sizeof(filename) - 1] = '\0';
685         if (access(filename, R_OK)) {
686                 char errbuf[1024];
687                 sdb_log(SDB_LOG_ERR, "core: Failed to load plugin '%s' (%s): %s",
688                                 name, filename, sdb_strerror(errno, errbuf, sizeof(errbuf)));
689                 return -1;
690         }
692         lt_dlinit();
693         lt_dlerror();
695         lh = lt_dlopen(filename);
696         if (! lh) {
697                 sdb_log(SDB_LOG_ERR, "core: Failed to load plugin '%s': %s"
698                                 "The most common cause for this problem are missing "
699                                 "dependencies.\n", name, lt_dlerror());
700                 return -1;
701         }
703         if (ctx_get())
704                 sdb_log(SDB_LOG_WARNING, "core: Discarding old plugin context");
706         ctx = ctx_create(name);
707         if (! ctx) {
708                 sdb_log(SDB_LOG_ERR, "core: Failed to initialize plugin context");
709                 return -1;
710         }
712         ctx->info.plugin_name = strdup(name);
713         ctx->info.filename = strdup(filename);
714         ctx->handle = lh;
716         if (plugin_ctx)
717                 ctx->public = *plugin_ctx;
719         if ((status = module_init(name, lh, &ctx->info))) {
720                 sdb_object_deref(SDB_OBJ(ctx));
721                 return status;
722         }
724         /* compare minor version */
725         if ((ctx->info.version < 0)
726                         || ((int)(ctx->info.version / 100) != (int)(SDB_VERSION / 100)))
727                 sdb_log(SDB_LOG_WARNING, "core: WARNING: version of "
728                                 "plugin '%s' (%i.%i.%i) does not match our version "
729                                 "(%i.%i.%i); this might cause problems",
730                                 name, SDB_VERSION_DECODE(ctx->info.version),
731                                 SDB_VERSION_DECODE(SDB_VERSION));
733         sdb_llist_append(all_plugins, SDB_OBJ(ctx));
735         sdb_log(SDB_LOG_INFO, "core: Successfully loaded "
736                         "plugin %s v%i (%s)", ctx->info.plugin_name,
737                         ctx->info.plugin_version,
738                         INFO_GET(&ctx->info, description));
739         sdb_log(SDB_LOG_INFO, "core: Plugin %s: %s, License: %s",
740                         ctx->info.plugin_name,
741                         INFO_GET(&ctx->info, copyright),
742                         INFO_GET(&ctx->info, license));
744         /* any registered callbacks took ownership of the context */
745         sdb_object_deref(SDB_OBJ(ctx));
747         /* reset */
748         ctx_set(NULL);
749         return 0;
750 } /* module_load */
752 static char *
753 plugin_get_name(const char *name, char *buf, size_t bufsize)
755         ctx_t *ctx = ctx_get();
757         if (ctx)
758                 snprintf(buf, bufsize, "%s::%s", ctx->info.plugin_name, name);
759         else
760                 snprintf(buf, bufsize, "core::%s", name);
761         return buf;
762 } /* plugin_get_name */
764 static int
765 plugin_add_impl(sdb_llist_t **list, sdb_type_t T, const char *type,
766                 const char *name, void *impl, sdb_object_t *user_data)
768         sdb_object_t *obj;
770         if ((! name) || (! impl))
771                 return -1;
773         assert(list);
775         if (! *list)
776                 *list = sdb_llist_create();
777         if (! *list)
778                 return -1;
780         obj = sdb_object_create(name, T, list, type, impl, user_data);
781         if (! obj)
782                 return -1;
784         if (sdb_llist_append(*list, obj)) {
785                 sdb_object_deref(obj);
786                 return -1;
787         }
789         /* pass control to the list */
790         sdb_object_deref(obj);
792         sdb_log(SDB_LOG_INFO, "core: Registered %s callback '%s'.",
793                         type, name);
794         return 0;
795 } /* plugin_add_impl */
797 /*
798  * object meta-data
799  */
801 typedef struct {
802         int obj_type;
803         sdb_time_t last_update;
804         sdb_time_t interval;
805 } interval_fetcher_t;
807 static int
808 interval_fetcher_host(sdb_store_host_t *host, sdb_object_t *user_data)
810         interval_fetcher_t *lu = SDB_OBJ_WRAPPER(user_data)->data;
811         lu->obj_type = SDB_HOST;
812         lu->last_update = host->last_update;
813         return 0;
814 } /* interval_fetcher_host */
816 static int
817 interval_fetcher_service(sdb_store_service_t *svc, sdb_object_t *user_data)
819         interval_fetcher_t *lu = SDB_OBJ_WRAPPER(user_data)->data;
820         lu->obj_type = SDB_SERVICE;
821         lu->last_update = svc->last_update;
822         return 0;
823 } /* interval_fetcher_service */
825 static int
826 interval_fetcher_metric(sdb_store_metric_t *metric, sdb_object_t *user_data)
828         interval_fetcher_t *lu = SDB_OBJ_WRAPPER(user_data)->data;
829         lu->obj_type = SDB_METRIC;
830         lu->last_update = metric->last_update;
831         return 0;
832 } /* interval_fetcher_metric */
834 static int
835 interval_fetcher_attr(sdb_store_attribute_t *attr, sdb_object_t *user_data)
837         interval_fetcher_t *lu = SDB_OBJ_WRAPPER(user_data)->data;
838         lu->obj_type = SDB_ATTRIBUTE;
839         lu->last_update = attr->last_update;
840         return 0;
841 } /* interval_fetcher_attr */
843 static sdb_store_writer_t interval_fetcher = {
844         interval_fetcher_host, interval_fetcher_service,
845         interval_fetcher_metric, interval_fetcher_attr,
846 };
848 static int
849 get_interval(int obj_type, const char *hostname,
850                 int parent_type, const char *parent, const char *name,
851                 sdb_time_t last_update, sdb_time_t *interval_out)
853         sdb_ast_fetch_t fetch = SDB_AST_FETCH_INIT;
854         char hn[hostname ? strlen(hostname) + 1 : 1];
855         char pn[parent ? strlen(parent) + 1 : 1];
856         char n[strlen(name) + 1];
857         int status;
859         interval_fetcher_t lu = { 0, 0, 0 };
860         sdb_object_wrapper_t obj = SDB_OBJECT_WRAPPER_STATIC(&lu);
861         sdb_time_t interval;
863         assert(name);
865         if (hostname)
866                 strncpy(hn, hostname, sizeof(hn));
867         if (parent)
868                 strncpy(pn, parent, sizeof(pn));
869         strncpy(n, name, sizeof(n));
871         fetch.obj_type = obj_type;
872         fetch.hostname = hostname ? hn : NULL;
873         fetch.parent_type = parent_type;
874         fetch.parent = parent ? pn : NULL;
875         fetch.name = n;
877         status = sdb_plugin_query(SDB_AST_NODE(&fetch),
878                         &interval_fetcher, SDB_OBJ(&obj), NULL);
879         if ((status < 0) || (lu.obj_type != obj_type) || (lu.last_update == 0)) {
880                 *interval_out = 0;
881                 return 0;
882         }
884         if (lu.last_update >= last_update) {
885                 if (lu.last_update > last_update)
886                         sdb_log(SDB_LOG_DEBUG, "memstore: Cannot update %s '%s' - "
887                                         "value too old (%"PRIsdbTIME" < %"PRIsdbTIME")",
888                                         SDB_STORE_TYPE_TO_NAME(obj_type), name,
889                                         lu.last_update, last_update);
890                 *interval_out = lu.interval;
891                 return 1;
892         }
894         interval = last_update - lu.last_update;
895         if (lu.interval && interval)
896                 interval = (sdb_time_t)((0.9 * (double)lu.interval)
897                                 + (0.1 * (double)interval));
898         *interval_out = interval;
899         return 0;
900 } /* get_interval */
902 static void
903 get_backend(char **backends, size_t *backends_num)
905         const sdb_plugin_info_t *info;
907         info = sdb_plugin_current();
908         if ((! info) || (! info->plugin_name) || (! *info->plugin_name)) {
909                 *backends_num = 0;
910                 return;
911         }
913         backends[0] = info->plugin_name;
914         *backends_num = 1;
915 } /* get_backend */
917 /*
918  * public API
919  */
921 int
922 sdb_plugin_load(const char *basedir, const char *name,
923                 const sdb_plugin_ctx_t *plugin_ctx)
925         ctx_t *ctx;
927         int status;
929         if ((! name) || (! *name))
930                 return -1;
932         if (! all_plugins) {
933                 if (! (all_plugins = sdb_llist_create())) {
934                         sdb_log(SDB_LOG_ERR, "core: Failed to load plugin '%s': "
935                                         "internal error while creating linked list", name);
936                         return -1;
937                 }
938         }
940         ctx = CTX(sdb_llist_search_by_name(all_plugins, name));
941         if (ctx) {
942                 /* plugin already loaded */
943                 if (! ctx->use_cnt) {
944                         /* reloading plugin */
945                         ctx_t *old_ctx = ctx_set(ctx);
947                         status = module_init(ctx->info.plugin_name, ctx->handle, NULL);
948                         if (status)
949                                 return status;
951                         sdb_log(SDB_LOG_INFO, "core: Successfully reloaded plugin "
952                                         "'%s' (%s)", ctx->info.plugin_name,
953                                         INFO_GET(&ctx->info, description));
954                         ctx_set(old_ctx);
955                 }
956                 ++ctx->use_cnt;
957                 return 0;
958         }
960         return module_load(basedir, name, plugin_ctx);
961 } /* sdb_plugin_load */
963 int
964 sdb_plugin_set_info(sdb_plugin_info_t *info, int type, ...)
966         va_list ap;
968         if (! info)
969                 return -1;
971         va_start(ap, type);
973         switch (type) {
974                 case SDB_PLUGIN_INFO_DESC:
975                         {
976                                 char *desc = va_arg(ap, char *);
977                                 if (desc) {
978                                         if (info->description)
979                                                 free(info->description);
980                                         info->description = strdup(desc);
981                                 }
982                         }
983                         break;
984                 case SDB_PLUGIN_INFO_COPYRIGHT:
985                         {
986                                 char *copyright = va_arg(ap, char *);
987                                 if (copyright)
988                                         info->copyright = strdup(copyright);
989                         }
990                         break;
991                 case SDB_PLUGIN_INFO_LICENSE:
992                         {
993                                 char *license = va_arg(ap, char *);
994                                 if (license) {
995                                         if (info->license)
996                                                 free(info->license);
997                                         info->license = strdup(license);
998                                 }
999                         }
1000                         break;
1001                 case SDB_PLUGIN_INFO_VERSION:
1002                         {
1003                                 int version = va_arg(ap, int);
1004                                 info->version = version;
1005                         }
1006                         break;
1007                 case SDB_PLUGIN_INFO_PLUGIN_VERSION:
1008                         {
1009                                 int version = va_arg(ap, int);
1010                                 info->plugin_version = version;
1011                         }
1012                         break;
1013                 default:
1014                         va_end(ap);
1015                         return -1;
1016         }
1018         va_end(ap);
1019         return 0;
1020 } /* sdb_plugin_set_info */
1022 int
1023 sdb_plugin_register_config(sdb_plugin_config_cb callback)
1025         ctx_t *ctx = ctx_get();
1027         if (! ctx) {
1028                 sdb_log(SDB_LOG_ERR, "core: Invalid attempt to register a "
1029                                 "config callback from outside a plugin");
1030                 return -1;
1031         }
1032         return plugin_add_impl(&config_list, callback_type, "config",
1033                         ctx->info.plugin_name, (void *)callback, NULL);
1034 } /* sdb_plugin_register_config */
1036 int
1037 sdb_plugin_register_init(const char *name, sdb_plugin_init_cb callback,
1038                 sdb_object_t *user_data)
1040         char cb_name[1024];
1041         return plugin_add_impl(&init_list, callback_type, "init",
1042                         plugin_get_name(name, cb_name, sizeof(cb_name)),
1043                         (void *)callback, user_data);
1044 } /* sdb_plugin_register_init */
1046 int
1047 sdb_plugin_register_shutdown(const char *name, sdb_plugin_shutdown_cb callback,
1048                 sdb_object_t *user_data)
1050         char cb_name[1024];
1051         return plugin_add_impl(&shutdown_list, callback_type, "shutdown",
1052                         plugin_get_name(name, cb_name, sizeof(cb_name)),
1053                         (void *)callback, user_data);
1054 } /* sdb_plugin_register_shutdown */
1056 int
1057 sdb_plugin_register_log(const char *name, sdb_plugin_log_cb callback,
1058                 sdb_object_t *user_data)
1060         char cb_name[1024];
1061         return plugin_add_impl(&log_list, callback_type, "log",
1062                         plugin_get_name(name, cb_name, sizeof(cb_name)),
1063                         callback, user_data);
1064 } /* sdb_plugin_register_log */
1066 int
1067 sdb_plugin_register_cname(const char *name, sdb_plugin_cname_cb callback,
1068                 sdb_object_t *user_data)
1070         char cb_name[1024];
1071         return plugin_add_impl(&cname_list, callback_type, "cname",
1072                         plugin_get_name(name, cb_name, sizeof(cb_name)),
1073                         callback, user_data);
1074 } /* sdb_plugin_register_cname */
1076 int
1077 sdb_plugin_register_collector(const char *name, sdb_plugin_collector_cb callback,
1078                 const sdb_time_t *interval, sdb_object_t *user_data)
1080         char cb_name[1024];
1081         sdb_object_t *obj;
1083         if ((! name) || (! callback))
1084                 return -1;
1086         if (! collector_list)
1087                 collector_list = sdb_llist_create();
1088         if (! collector_list)
1089                 return -1;
1091         plugin_get_name(name, cb_name, sizeof(cb_name));
1093         obj = sdb_object_create(cb_name, collector_type,
1094                         &collector_list, "collector", callback, user_data);
1095         if (! obj)
1096                 return -1;
1098         if (interval)
1099                 CCB(obj)->ccb_interval = *interval;
1100         else {
1101                 ctx_t *ctx = ctx_get();
1103                 if (! ctx) {
1104                         sdb_log(SDB_LOG_ERR, "core: Cannot determine interval "
1105                                         "for collector %s; none specified and no plugin "
1106                                         "context found", cb_name);
1107                         return -1;
1108                 }
1110                 CCB(obj)->ccb_interval = ctx->public.interval;
1111         }
1113         if (! (CCB(obj)->ccb_next_update = sdb_gettime())) {
1114                 char errbuf[1024];
1115                 sdb_log(SDB_LOG_ERR, "core: Failed to determine current "
1116                                 "time: %s", sdb_strerror(errno, errbuf, sizeof(errbuf)));
1117                 sdb_object_deref(obj);
1118                 return -1;
1119         }
1121         if (sdb_llist_insert_sorted(collector_list, obj,
1122                                 plugin_cmp_next_update)) {
1123                 sdb_object_deref(obj);
1124                 return -1;
1125         }
1127         /* pass control to the list */
1128         sdb_object_deref(obj);
1130         sdb_log(SDB_LOG_INFO, "core: Registered collector callback '%s' "
1131                         "(interval = %.3fs).", cb_name,
1132                         SDB_TIME_TO_DOUBLE(CCB(obj)->ccb_interval));
1133         return 0;
1134 } /* sdb_plugin_register_collector */
1136 int
1137 sdb_plugin_register_timeseries_fetcher(const char *name,
1138                 sdb_timeseries_fetcher_t *fetcher, sdb_object_t *user_data)
1140         return plugin_add_impl(&timeseries_fetcher_list, ts_fetcher_type, "time-series fetcher",
1141                         name, fetcher, user_data);
1142 } /* sdb_plugin_register_timeseries_fetcher */
1144 int
1145 sdb_plugin_register_writer(const char *name,
1146                 sdb_store_writer_t *writer, sdb_object_t *user_data)
1148         char cb_name[1024];
1149         return plugin_add_impl(&writer_list, writer_type, "store writer",
1150                         plugin_get_name(name, cb_name, sizeof(cb_name)),
1151                         writer, user_data);
1152 } /* sdb_store_register_writer */
1154 int
1155 sdb_plugin_register_reader(const char *name,
1156                 sdb_store_reader_t *reader, sdb_object_t *user_data)
1158         char cb_name[1024];
1159         return plugin_add_impl(&reader_list, reader_type, "store reader",
1160                         plugin_get_name(name, cb_name, sizeof(cb_name)),
1161                         reader, user_data);
1162 } /* sdb_plugin_register_reader */
1164 void
1165 sdb_plugin_unregister_all(void)
1167         size_t i;
1169         for (i = 0; i < SDB_STATIC_ARRAY_LEN(all_lists); ++i) {
1170                 const char  *type =  all_lists[i].type;
1171                 sdb_llist_t *list = *all_lists[i].list;
1173                 size_t len = sdb_llist_len(list);
1175                 if (! len)
1176                         continue;
1178                 sdb_llist_clear(list);
1179                 sdb_log(SDB_LOG_INFO, "core: Unregistered %zu %s callback%s",
1180                                 len, type, len == 1 ? "" : "s");
1181         }
1182 } /* sdb_plugin_unregister_all */
1184 sdb_plugin_ctx_t
1185 sdb_plugin_get_ctx(void)
1187         ctx_t *c;
1189         c = ctx_get();
1190         if (! c) {
1191                 sdb_plugin_log(SDB_LOG_ERR, "core: Invalid read access to plugin "
1192                                 "context outside a plugin");
1193                 return plugin_default_ctx;
1194         }
1195         return c->public;
1196 } /* sdb_plugin_get_ctx */
1198 int
1199 sdb_plugin_set_ctx(sdb_plugin_ctx_t ctx, sdb_plugin_ctx_t *old)
1201         ctx_t *c;
1203         c = ctx_get();
1204         if (! c) {
1205                 sdb_plugin_log(SDB_LOG_ERR, "core: Invalid write access to plugin "
1206                                 "context outside a plugin");
1207                 return -1;
1208         }
1210         if (old)
1211                 *old = c->public;
1212         c->public = ctx;
1213         return 0;
1214 } /* sdb_plugin_set_ctx */
1216 const sdb_plugin_info_t *
1217 sdb_plugin_current(void)
1219         ctx_t *ctx = ctx_get();
1221         if (! ctx)
1222                 return NULL;
1223         return &ctx->info;
1224 } /* sdb_plugin_current */
1226 int
1227 sdb_plugin_configure(const char *name, oconfig_item_t *ci)
1229         callback_t *plugin;
1230         sdb_plugin_config_cb callback;
1232         ctx_t *old_ctx;
1234         int status;
1236         if ((! name) || (! ci))
1237                 return -1;
1239         plugin = CB(sdb_llist_search_by_name(config_list, name));
1240         if (! plugin) {
1241                 ctx_t *ctx = CTX(sdb_llist_search_by_name(all_plugins, name));
1242                 if (! ctx)
1243                         sdb_log(SDB_LOG_ERR, "core: Cannot configure unknown "
1244                                         "plugin '%s'. Missing 'LoadPlugin \"%s\"'?",
1245                                         name, name);
1246                 else
1247                         sdb_log(SDB_LOG_ERR, "core: Plugin '%s' did not register "
1248                                         "a config callback.", name);
1249                 errno = ENOENT;
1250                 return -1;
1251         }
1253         old_ctx = ctx_set(plugin->cb_ctx);
1254         callback = (sdb_plugin_config_cb)plugin->cb_callback;
1255         status = callback(ci);
1256         ctx_set(old_ctx);
1257         return status;
1258 } /* sdb_plugin_configure */
1260 int
1261 sdb_plugin_reconfigure_init(void)
1263         sdb_llist_iter_t *iter;
1265         iter = sdb_llist_get_iter(config_list);
1266         if (config_list && (! iter))
1267                 return -1;
1269         /* deconfigure all plugins */
1270         while (sdb_llist_iter_has_next(iter)) {
1271                 callback_t *plugin;
1272                 sdb_plugin_config_cb callback;
1273                 ctx_t *old_ctx;
1275                 plugin = CB(sdb_llist_iter_get_next(iter));
1276                 old_ctx = ctx_set(plugin->cb_ctx);
1277                 callback = (sdb_plugin_config_cb)plugin->cb_callback;
1278                 callback(NULL);
1279                 ctx_set(old_ctx);
1280         }
1281         sdb_llist_iter_destroy(iter);
1283         iter = sdb_llist_get_iter(all_plugins);
1284         if (all_plugins && (! iter))
1285                 return -1;
1287         /* record all plugins as being unused */
1288         while (sdb_llist_iter_has_next(iter))
1289                 CTX(sdb_llist_iter_get_next(iter))->use_cnt = 0;
1290         sdb_llist_iter_destroy(iter);
1292         sdb_plugin_unregister_all();
1293         return 0;
1294 } /* sdb_plugin_reconfigure_init */
1296 int
1297 sdb_plugin_reconfigure_finish(void)
1299         sdb_llist_iter_t *iter;
1301         iter = sdb_llist_get_iter(all_plugins);
1302         if (all_plugins && (! iter))
1303                 return -1;
1305         while (sdb_llist_iter_has_next(iter)) {
1306                 ctx_t *ctx = CTX(sdb_llist_iter_get_next(iter));
1307                 if (ctx->use_cnt)
1308                         continue;
1310                 sdb_log(SDB_LOG_INFO, "core: Module %s no longer in use",
1311                                 ctx->info.plugin_name);
1312                 sdb_llist_iter_remove_current(iter);
1313                 plugin_unregister_by_name(ctx->info.plugin_name);
1314                 sdb_object_deref(SDB_OBJ(ctx));
1315         }
1316         sdb_llist_iter_destroy(iter);
1317         return 0;
1318 } /* sdb_plugin_reconfigure_finish */
1320 int
1321 sdb_plugin_init_all(void)
1323         sdb_llist_iter_t *iter;
1324         int ret = 0;
1326         iter = sdb_llist_get_iter(init_list);
1327         while (sdb_llist_iter_has_next(iter)) {
1328                 callback_t *cb;
1329                 sdb_plugin_init_cb callback;
1330                 ctx_t *old_ctx;
1332                 sdb_object_t *obj = sdb_llist_iter_get_next(iter);
1333                 assert(obj);
1334                 cb = CB(obj);
1336                 callback = (sdb_plugin_init_cb)cb->cb_callback;
1338                 old_ctx = ctx_set(cb->cb_ctx);
1339                 if (callback(cb->cb_user_data)) {
1340                         sdb_log(SDB_LOG_ERR, "core: Failed to initialize plugin "
1341                                         "'%s'. Unregistering all callbacks.", obj->name);
1342                         ctx_set(old_ctx);
1343                         plugin_unregister_by_name(cb->cb_ctx->info.plugin_name);
1344                         ++ret;
1345                 }
1346                 else
1347                         ctx_set(old_ctx);
1348         }
1349         sdb_llist_iter_destroy(iter);
1350         return ret;
1351 } /* sdb_plugin_init_all */
1353 int
1354 sdb_plugin_shutdown_all(void)
1356         sdb_llist_iter_t *iter;
1357         int ret = 0;
1359         iter = sdb_llist_get_iter(shutdown_list);
1360         while (sdb_llist_iter_has_next(iter)) {
1361                 callback_t *cb;
1362                 sdb_plugin_shutdown_cb callback;
1363                 ctx_t *old_ctx;
1365                 sdb_object_t *obj = sdb_llist_iter_get_next(iter);
1366                 assert(obj);
1367                 cb = CB(obj);
1369                 callback = (sdb_plugin_shutdown_cb)cb->cb_callback;
1371                 old_ctx = ctx_set(cb->cb_ctx);
1372                 if (callback(cb->cb_user_data)) {
1373                         sdb_log(SDB_LOG_ERR, "core: Failed to shutdown plugin '%s'.",
1374                                         obj->name);
1375                         ++ret;
1376                 }
1377                 ctx_set(old_ctx);
1378         }
1379         sdb_llist_iter_destroy(iter);
1380         return ret;
1381 } /* sdb_plugin_shutdown_all */
1383 int
1384 sdb_plugin_collector_loop(sdb_plugin_loop_t *loop)
1386         if (! collector_list) {
1387                 sdb_log(SDB_LOG_WARNING, "core: No collectors registered. "
1388                                 "Quiting main loop.");
1389                 return -1;
1390         }
1392         if (! loop)
1393                 return -1;
1395         while (loop->do_loop) {
1396                 sdb_plugin_collector_cb callback;
1397                 ctx_t *old_ctx;
1399                 sdb_time_t interval, now;
1401                 sdb_object_t *obj = sdb_llist_shift(collector_list);
1402                 if (! obj)
1403                         return -1;
1405                 callback = (sdb_plugin_collector_cb)CCB(obj)->ccb_callback;
1407                 if (! (now = sdb_gettime())) {
1408                         char errbuf[1024];
1409                         sdb_log(SDB_LOG_ERR, "core: Failed to determine current "
1410                                         "time in collector main loop: %s",
1411                                         sdb_strerror(errno, errbuf, sizeof(errbuf)));
1412                         now = CCB(obj)->ccb_next_update;
1413                 }
1415                 if (now < CCB(obj)->ccb_next_update) {
1416                         interval = CCB(obj)->ccb_next_update - now;
1418                         errno = 0;
1419                         while (loop->do_loop && sdb_sleep(interval, &interval)) {
1420                                 if (errno != EINTR) {
1421                                         char errbuf[1024];
1422                                         sdb_log(SDB_LOG_ERR, "core: Failed to sleep "
1423                                                         "in collector main loop: %s",
1424                                                         sdb_strerror(errno, errbuf, sizeof(errbuf)));
1425                                         sdb_llist_insert_sorted(collector_list, obj,
1426                                                         plugin_cmp_next_update);
1427                                         sdb_object_deref(obj);
1428                                         return -1;
1429                                 }
1430                                 errno = 0;
1431                         }
1433                         if (! loop->do_loop) {
1434                                 /* put back; don't worry about errors */
1435                                 sdb_llist_insert_sorted(collector_list, obj,
1436                                                 plugin_cmp_next_update);
1437                                 sdb_object_deref(obj);
1438                                 return 0;
1439                         }
1440                 }
1442                 old_ctx = ctx_set(CCB(obj)->ccb_ctx);
1443                 if (callback(CCB(obj)->ccb_user_data)) {
1444                         /* XXX */
1445                 }
1446                 ctx_set(old_ctx);
1448                 interval = CCB(obj)->ccb_interval;
1449                 if (! interval)
1450                         interval = loop->default_interval;
1451                 if (! interval) {
1452                         sdb_log(SDB_LOG_WARNING, "core: No interval configured "
1453                                         "for plugin '%s'; skipping any further "
1454                                         "iterations.", obj->name);
1455                         sdb_object_deref(obj);
1456                         continue;
1457                 }
1459                 CCB(obj)->ccb_next_update += interval;
1461                 if (! (now = sdb_gettime())) {
1462                         char errbuf[1024];
1463                         sdb_log(SDB_LOG_ERR, "core: Failed to determine current "
1464                                         "time in collector main loop: %s",
1465                                         sdb_strerror(errno, errbuf, sizeof(errbuf)));
1466                         now = CCB(obj)->ccb_next_update;
1467                 }
1469                 if (now > CCB(obj)->ccb_next_update) {
1470                         sdb_log(SDB_LOG_WARNING, "core: Plugin '%s' took too "
1471                                         "long; skipping iterations to keep up.",
1472                                         obj->name);
1473                         CCB(obj)->ccb_next_update = now;
1474                 }
1476                 if (sdb_llist_insert_sorted(collector_list, obj,
1477                                         plugin_cmp_next_update)) {
1478                         sdb_log(SDB_LOG_ERR, "core: Failed to re-insert "
1479                                         "plugin '%s' into collector list. Unable to further "
1480                                         "use the plugin.",
1481                                         obj->name);
1482                         sdb_object_deref(obj);
1483                         return -1;
1484                 }
1486                 /* pass control back to the list */
1487                 sdb_object_deref(obj);
1488         }
1489         return 0;
1490 } /* sdb_plugin_read_loop */
1492 char *
1493 sdb_plugin_cname(char *hostname)
1495         sdb_llist_iter_t *iter;
1497         if (! hostname)
1498                 return NULL;
1500         if (! cname_list)
1501                 return hostname;
1503         iter = sdb_llist_get_iter(cname_list);
1504         while (sdb_llist_iter_has_next(iter)) {
1505                 sdb_plugin_cname_cb callback;
1506                 char *cname;
1508                 sdb_object_t *obj = sdb_llist_iter_get_next(iter);
1509                 assert(obj);
1511                 callback = (sdb_plugin_cname_cb)CB(obj)->cb_callback;
1512                 cname = callback(hostname, CB(obj)->cb_user_data);
1513                 if (cname) {
1514                         free(hostname);
1515                         hostname = cname;
1516                 }
1517                 /* else: don't change hostname */
1518         }
1519         sdb_llist_iter_destroy(iter);
1520         return hostname;
1521 } /* sdb_plugin_cname */
1523 int
1524 sdb_plugin_log(int prio, const char *msg)
1526         sdb_llist_iter_t *iter;
1527         int ret = -1;
1529         bool logged = 0;
1531         if (! msg)
1532                 return 0;
1534         iter = sdb_llist_get_iter(log_list);
1535         while (sdb_llist_iter_has_next(iter)) {
1536                 sdb_plugin_log_cb callback;
1537                 int tmp;
1539                 sdb_object_t *obj = sdb_llist_iter_get_next(iter);
1540                 assert(obj);
1542                 callback = (sdb_plugin_log_cb)CB(obj)->cb_callback;
1543                 tmp = callback(prio, msg, CB(obj)->cb_user_data);
1544                 if (tmp > ret)
1545                         ret = tmp;
1547                 if (CB(obj)->cb_ctx)
1548                         logged = 1;
1549                 /* else: this is an internally registered callback */
1550         }
1551         sdb_llist_iter_destroy(iter);
1553         if (! logged)
1554                 return fprintf(stderr, "[%s] %s\n", SDB_LOG_PRIO_TO_STRING(prio), msg);
1555         return ret;
1556 } /* sdb_plugin_log */
1558 int
1559 sdb_plugin_vlogf(int prio, const char *fmt, va_list ap)
1561         sdb_strbuf_t *buf;
1562         int ret;
1564         if (! fmt)
1565                 return 0;
1567         buf = sdb_strbuf_create(64);
1568         if (! buf) {
1569                 ret = fprintf(stderr, "[%s] ", SDB_LOG_PRIO_TO_STRING(prio));
1570                 ret += vfprintf(stderr, fmt, ap);
1571                 return ret;
1572         }
1574         if (sdb_strbuf_vsprintf(buf, fmt, ap) < 0) {
1575                 sdb_strbuf_destroy(buf);
1576                 return -1;
1577         }
1579         ret = sdb_plugin_log(prio, sdb_strbuf_string(buf));
1580         sdb_strbuf_destroy(buf);
1581         return ret;
1582 } /* sdb_plugin_vlogf */
/*
 * sdb_plugin_logf:
 * printf-style front end for sdb_plugin_vlogf(): collects the variadic
 * arguments and forwards them together with `prio' and `fmt'.
 *
 * Returns 0 if `fmt' is NULL, else the return value of sdb_plugin_vlogf().
 */
int
sdb_plugin_logf(int prio, const char *fmt, ...)
{
	va_list args;
	int status = 0;

	if (fmt) {
		va_start(args, fmt);
		status = sdb_plugin_vlogf(prio, fmt, args);
		va_end(args);
	}
	return status;
} /* sdb_plugin_logf */
1599 sdb_timeseries_t *
1600 sdb_plugin_fetch_timeseries(const char *type, const char *id,
1601                 sdb_timeseries_opts_t *opts)
1603         ts_fetcher_t *fetcher;
1604         sdb_timeseries_t *ts;
1606         ctx_t *old_ctx;
1608         if ((! type) || (! id) || (! opts))
1609                 return NULL;
1611         fetcher = TS_FETCHER(sdb_llist_search_by_name(timeseries_fetcher_list, type));
1612         if (! fetcher) {
1613                 sdb_log(SDB_LOG_ERR, "core: Cannot fetch time-series of type %s: "
1614                                 "no such plugin loaded", type);
1615                 errno = ENOENT;
1616                 return NULL;
1617         }
1619         old_ctx = ctx_set(fetcher->ts_ctx);
1620         ts = fetcher->impl.fetch(id, opts, fetcher->ts_user_data);
1621         ctx_set(old_ctx);
1622         return ts;
1623 } /* sdb_plugin_fetch_timeseries */
1625 sdb_timeseries_info_t *
1626 sdb_plugin_describe_timeseries(const char *type, const char *id)
1628         ts_fetcher_t *fetcher;
1629         sdb_timeseries_info_t *ts_info;
1631         ctx_t *old_ctx;
1633         if ((! type) || (! id))
1634                 return NULL;
1636         fetcher = TS_FETCHER(sdb_llist_search_by_name(timeseries_fetcher_list, type));
1637         if (! fetcher) {
1638                 sdb_log(SDB_LOG_ERR, "core: Cannot describe time-series of type %s: "
1639                                 "no such plugin loaded", type);
1640                 errno = ENOENT;
1641                 return NULL;
1642         }
1644         old_ctx = ctx_set(fetcher->ts_ctx);
1645         ts_info = fetcher->impl.describe(id, fetcher->ts_user_data);
1646         ctx_set(old_ctx);
1647         return ts_info;
1648 } /* sdb_plugin_describe_timeseries */
1650 int
1651 sdb_plugin_query(sdb_ast_node_t *ast,
1652                 sdb_store_writer_t *w, sdb_object_t *wd, sdb_strbuf_t *errbuf)
1654         query_writer_t qw = QUERY_WRITER_INIT(w, wd);
1655         reader_t *reader;
1656         sdb_object_t *q;
1658         size_t n = sdb_llist_len(reader_list);
1659         int status = 0;
1661         if (! ast)
1662                 return 0;
1664         if ((ast->type != SDB_AST_TYPE_FETCH)
1665                         && (ast->type != SDB_AST_TYPE_LIST)
1666                         && (ast->type != SDB_AST_TYPE_LOOKUP)) {
1667                 sdb_log(SDB_LOG_ERR, "core: Cannot execute query of type %s",
1668                                 SDB_AST_TYPE_TO_STRING(ast));
1669                 sdb_strbuf_sprintf(errbuf, "Cannot execute query of type %s",
1670                                 SDB_AST_TYPE_TO_STRING(ast));
1671                 return -1;
1672         }
1674         if (n != 1) {
1675                 char *msg = (n > 0)
1676                         ? "Cannot execute query: multiple readers not supported"
1677                         : "Cannot execute query: no readers registered";
1678                 sdb_strbuf_sprintf(errbuf, "%s", msg);
1679                 sdb_log(SDB_LOG_ERR, "core: %s", msg);
1680                 return -1;
1681         }
1683         reader = READER(sdb_llist_get(reader_list, 0));
1684         assert(reader);
1686         q = reader->impl.prepare_query(ast, errbuf, reader->r_user_data);
1687         if (q)
1688                 status = reader->impl.execute_query(q, &query_writer, SDB_OBJ(&qw),
1689                                 errbuf, reader->r_user_data);
1690         else
1691                 status = -1;
1693         sdb_object_deref(SDB_OBJ(q));
1694         sdb_object_deref(SDB_OBJ(reader));
1695         return status;
1696 } /* sdb_plugin_query */
1698 int
1699 sdb_plugin_store_host(const char *name, sdb_time_t last_update)
1701         sdb_store_host_t host = SDB_STORE_HOST_INIT;
1702         char *backends[1];
1703         char *cname;
1705         sdb_llist_iter_t *iter;
1706         int status = 0;
1708         if (! name)
1709                 return -1;
1711         if (! sdb_llist_len(writer_list)) {
1712                 sdb_log(SDB_LOG_ERR, "core: Cannot store host: "
1713                                 "no writers registered");
1714                 return -1;
1715         }
1717         cname = sdb_plugin_cname(strdup(name));
1718         if (! cname) {
1719                 sdb_log(SDB_LOG_ERR, "core: strdup failed");
1720                 return -1;
1721         }
1723         host.name = cname;
1724         host.last_update = last_update ? last_update : sdb_gettime();
1725         if (get_interval(SDB_HOST, NULL, -1, NULL, cname,
1726                                 host.last_update, &host.interval)) {
1727                 free(cname);
1728                 return 1;
1729         }
1730         host.backends = (const char * const *)backends;
1731         get_backend(backends, &host.backends_num);
1733         iter = sdb_llist_get_iter(writer_list);
1734         while (sdb_llist_iter_has_next(iter)) {
1735                 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
1736                 int s;
1737                 assert(writer);
1738                 s = writer->impl.store_host(&host, writer->w_user_data);
1739                 if (((s > 0) && (status >= 0)) || (s < 0))
1740                         status = s;
1741         }
1742         sdb_llist_iter_destroy(iter);
1743         free(cname);
1744         return status;
1745 } /* sdb_plugin_store_host */
1747 int
1748 sdb_plugin_store_service(const char *hostname, const char *name,
1749                 sdb_time_t last_update)
1751         sdb_store_service_t service = SDB_STORE_SERVICE_INIT;
1752         char *backends[1];
1753         char *cname;
1755         sdb_llist_iter_t *iter;
1756         sdb_data_t d;
1758         int status = 0;
1760         if ((! hostname) || (! name))
1761                 return -1;
1763         if (! sdb_llist_len(writer_list)) {
1764                 sdb_log(SDB_LOG_ERR, "core: Cannot store service: "
1765                                 "no writers registered");
1766                 return -1;
1767         }
1769         cname = sdb_plugin_cname(strdup(hostname));
1770         if (! cname) {
1771                 sdb_log(SDB_LOG_ERR, "core: strdup failed");
1772                 return -1;
1773         }
1775         service.hostname = cname;
1776         service.name = name;
1777         service.last_update = last_update ? last_update : sdb_gettime();
1778         if (get_interval(SDB_SERVICE, cname, -1, NULL, name,
1779                                 service.last_update, &service.interval)) {
1780                 free(cname);
1781                 return 1;
1782         }
1783         service.backends = (const char * const *)backends;
1784         get_backend(backends, &service.backends_num);
1786         iter = sdb_llist_get_iter(writer_list);
1787         while (sdb_llist_iter_has_next(iter)) {
1788                 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
1789                 int s;
1790                 assert(writer);
1791                 s = writer->impl.store_service(&service, writer->w_user_data);
1792                 if (((s > 0) && (status >= 0)) || (s < 0))
1793                         status = s;
1794         }
1795         sdb_llist_iter_destroy(iter);
1797         if (! status) {
1798                 /* record the hostname as an attribute */
1799                 d.type = SDB_TYPE_STRING;
1800                 d.data.string = cname;
1801                 if (sdb_plugin_store_service_attribute(cname, name,
1802                                         "hostname", &d, service.last_update))
1803                         status = -1;
1804         }
1806         free(cname);
1807         return status;
1808 } /* sdb_plugin_store_service */
1810 int
1811 sdb_plugin_store_metric(const char *hostname, const char *name,
1812                 sdb_metric_store_t *store, sdb_time_t last_update)
1814         sdb_store_metric_t metric = SDB_STORE_METRIC_INIT;
1815         char *backends[1];
1816         char *cname;
1818         sdb_llist_iter_t *iter;
1819         sdb_data_t d;
1821         int status = 0;
1823         if ((! hostname) || (! name))
1824                 return -1;
1826         if (! sdb_llist_len(writer_list)) {
1827                 sdb_log(SDB_LOG_ERR, "core: Cannot store metric: "
1828                                 "no writers registered");
1829                 return -1;
1830         }
1832         cname = sdb_plugin_cname(strdup(hostname));
1833         if (! cname) {
1834                 sdb_log(SDB_LOG_ERR, "core: strdup failed");
1835                 return -1;
1836         }
1838         if (store && ((! store->type) || (! store->id)))
1839                 store = NULL;
1841         metric.hostname = cname;
1842         metric.name = name;
1843         if (store) {
1844                 if (store->last_update < last_update)
1845                         store->last_update = last_update;
1846                 metric.stores = store;
1847                 metric.stores_num = 1;
1848         }
1849         metric.last_update = last_update ? last_update : sdb_gettime();
1850         if (get_interval(SDB_METRIC, cname, -1, NULL, name,
1851                                 metric.last_update, &metric.interval)) {
1852                 free(cname);
1853                 return 1;
1854         }
1855         metric.backends = (const char * const *)backends;
1856         get_backend(backends, &metric.backends_num);
1858         iter = sdb_llist_get_iter(writer_list);
1859         while (sdb_llist_iter_has_next(iter)) {
1860                 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
1861                 int s;
1862                 assert(writer);
1863                 s = writer->impl.store_metric(&metric, writer->w_user_data);
1864                 if (((s > 0) && (status >= 0)) || (s < 0))
1865                         status = s;
1866         }
1867         sdb_llist_iter_destroy(iter);
1869         if (! status) {
1870                 /* record the hostname as an attribute */
1871                 d.type = SDB_TYPE_STRING;
1872                 d.data.string = cname;
1873                 if (sdb_plugin_store_metric_attribute(cname, name,
1874                                         "hostname", &d, metric.last_update))
1875                         status = -1;
1876         }
1878         free(cname);
1879         return status;
1880 } /* sdb_plugin_store_metric */
1882 int
1883 sdb_plugin_store_attribute(const char *hostname, const char *key,
1884                 const sdb_data_t *value, sdb_time_t last_update)
1886         sdb_store_attribute_t attr = SDB_STORE_ATTRIBUTE_INIT;
1887         char *backends[1];
1888         char *cname;
1890         sdb_llist_iter_t *iter;
1891         int status = 0;
1893         if ((! hostname) || (! key) || (! value))
1894                 return -1;
1896         if (! sdb_llist_len(writer_list)) {
1897                 sdb_log(SDB_LOG_ERR, "core: Cannot store attribute: "
1898                                 "no writers registered");
1899                 return -1;
1900         }
1902         cname = sdb_plugin_cname(strdup(hostname));
1903         if (! cname) {
1904                 sdb_log(SDB_LOG_ERR, "core: strdup failed");
1905                 return -1;
1906         }
1908         attr.parent_type = SDB_HOST;
1909         attr.parent = cname;
1910         attr.key = key;
1911         attr.value = *value;
1912         attr.last_update = last_update ? last_update : sdb_gettime();
1913         if (get_interval(SDB_ATTRIBUTE, cname, -1, NULL, key,
1914                                 attr.last_update, &attr.interval)) {
1915                 free(cname);
1916                 return 1;
1917         }
1918         attr.backends = (const char * const *)backends;
1919         get_backend(backends, &attr.backends_num);
1921         iter = sdb_llist_get_iter(writer_list);
1922         while (sdb_llist_iter_has_next(iter)) {
1923                 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
1924                 int s;
1925                 assert(writer);
1926                 s = writer->impl.store_attribute(&attr, writer->w_user_data);
1927                 if (((s > 0) && (status >= 0)) || (s < 0))
1928                         status = s;
1929         }
1930         sdb_llist_iter_destroy(iter);
1931         free(cname);
1932         return status;
1933 } /* sdb_plugin_store_attribute */
1935 int
1936 sdb_plugin_store_service_attribute(const char *hostname, const char *service,
1937                 const char *key, const sdb_data_t *value, sdb_time_t last_update)
1939         sdb_store_attribute_t attr = SDB_STORE_ATTRIBUTE_INIT;
1940         char *backends[1];
1941         char *cname;
1943         sdb_llist_iter_t *iter;
1944         int status = 0;
1946         if ((! hostname) || (! service) || (! key) || (! value))
1947                 return -1;
1949         if (! sdb_llist_len(writer_list)) {
1950                 sdb_log(SDB_LOG_ERR, "core: Cannot store service attribute: "
1951                                 "no writers registered");
1952                 return -1;
1953         }
1955         cname = sdb_plugin_cname(strdup(hostname));
1956         if (! cname) {
1957                 sdb_log(SDB_LOG_ERR, "core: strdup failed");
1958                 return -1;
1959         }
1961         attr.hostname = cname;
1962         attr.parent_type = SDB_SERVICE;
1963         attr.parent = service;
1964         attr.key = key;
1965         attr.value = *value;
1966         attr.last_update = last_update ? last_update : sdb_gettime();
1967         if (get_interval(SDB_ATTRIBUTE, cname, SDB_SERVICE, service, key,
1968                                 attr.last_update, &attr.interval)) {
1969                 free(cname);
1970                 return 1;
1971         }
1972         attr.backends = (const char * const *)backends;
1973         get_backend(backends, &attr.backends_num);
1975         iter = sdb_llist_get_iter(writer_list);
1976         while (sdb_llist_iter_has_next(iter)) {
1977                 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
1978                 int s;
1979                 assert(writer);
1980                 s = writer->impl.store_attribute(&attr, writer->w_user_data);
1981                 if (((s > 0) && (status >= 0)) || (s < 0))
1982                         status = s;
1983         }
1984         sdb_llist_iter_destroy(iter);
1985         free(cname);
1986         return status;
1987 } /* sdb_plugin_store_service_attribute */
1989 int
1990 sdb_plugin_store_metric_attribute(const char *hostname, const char *metric,
1991                 const char *key, const sdb_data_t *value, sdb_time_t last_update)
1993         sdb_store_attribute_t attr = SDB_STORE_ATTRIBUTE_INIT;
1994         char *backends[1];
1995         char *cname;
1997         sdb_llist_iter_t *iter;
1998         int status = 0;
2000         if ((! hostname) || (! metric) || (! key) || (! value))
2001                 return -1;
2003         if (! sdb_llist_len(writer_list)) {
2004                 sdb_log(SDB_LOG_ERR, "core: Cannot store metric attribute: "
2005                                 "no writers registered");
2006                 return -1;
2007         }
2009         cname = sdb_plugin_cname(strdup(hostname));
2010         if (! cname) {
2011                 sdb_log(SDB_LOG_ERR, "core: strdup failed");
2012                 return -1;
2013         }
2015         attr.hostname = cname;
2016         attr.parent_type = SDB_METRIC;
2017         attr.parent = metric;
2018         attr.key = key;
2019         attr.value = *value;
2020         attr.last_update = last_update ? last_update : sdb_gettime();
2021         if (get_interval(SDB_ATTRIBUTE, cname, SDB_METRIC, metric, key,
2022                                 attr.last_update, &attr.interval)) {
2023                 free(cname);
2024                 return 1;
2025         }
2026         attr.backends = (const char * const *)backends;
2027         get_backend(backends, &attr.backends_num);
2029         iter = sdb_llist_get_iter(writer_list);
2030         while (sdb_llist_iter_has_next(iter)) {
2031                 writer_t *writer = WRITER(sdb_llist_iter_get_next(iter));
2032                 int s;
2033                 assert(writer);
2034                 s = writer->impl.store_attribute(&attr, writer->w_user_data);
2035                 if (((s > 0) && (status >= 0)) || (s < 0))
2036                         status = s;
2037         }
2038         sdb_llist_iter_destroy(iter);
2039         free(cname);
2040         return status;
2041 } /* sdb_plugin_store_metric_attribute */
2043 /* vim: set tw=78 sw=4 ts=4 noexpandtab : */