summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: f6a1edd)
raw | patch | inline | side by side (parent: f6a1edd)
author | Sebastian Harl <sh@tokkee.org> | |
Wed, 5 Aug 2015 08:38:48 +0000 (10:38 +0200) | ||
committer | Sebastian Harl <sh@tokkee.org> | |
Wed, 5 Aug 2015 08:42:04 +0000 (10:42 +0200) |
A reader plugin can be used to query a data-store based on a query described
by an AST. The plugin module provides a generic function for this purpose.
At the moment, only a single reader plugin can be used at a time. Supporting
multiple readers requires a good merge strategy. The challenge is to handle
filters and query conditions correctly which may evaluate differently after
two stores have been merged.
by an AST. The plugin module provides a generic function for this purpose.
At the moment, only a single reader plugin can be used at a time. Supporting
multiple readers requires a good merge strategy. The challenge is to handle
filters and query conditions correctly which may evaluate differently after
two stores have been merged.
src/core/plugin.c | patch | blob | history | |
src/core/store.c | patch | blob | history | |
src/include/core/plugin.h | patch | blob | history | |
src/include/core/store.h | patch | blob | history |
diff --git a/src/core/plugin.c b/src/core/plugin.c
index 3ba98df263cb7a3e257fbcac2a7dd364005ca979..26b1408001717d0a7fb2aa9d953a3c3a0125a8f9 100644 (file)
--- a/src/core/plugin.c
+++ b/src/core/plugin.c
} writer_t;
#define WRITER(obj) ((writer_t *)(obj))
+typedef struct {
+ callback_t super; /* cb_callback will always be NULL */
+#define r_user_data super.cb_user_data
+#define r_ctx super.cb_ctx
+ sdb_store_reader_t impl;
+} reader_t;
+#define READER(obj) ((reader_t *)(obj))
+
/*
* private variables
*/
static sdb_llist_t *log_list = NULL;
static sdb_llist_t *ts_fetcher_list = NULL;
static sdb_llist_t *writer_list = NULL;
+static sdb_llist_t *reader_list = NULL;
static struct {
const char *type;
{ "log", &log_list },
{ "timeseries fetcher", &ts_fetcher_list },
{ "store writer", &writer_list },
+ { "store reader", &reader_list },
};
/*
plugin_writer_destroy
};
+static int
+plugin_reader_init(sdb_object_t *obj, va_list ap)
+{
+ sdb_store_reader_t *impl = va_arg(ap, sdb_store_reader_t *);
+ sdb_object_t *ud = va_arg(ap, sdb_object_t *);
+
+ assert(impl);
+
+ if ((! impl->prepare_query) || (! impl->execute_query)) {
+ sdb_log(SDB_LOG_ERR, "core: store reader callback '%s' "
+ "does not fully implement the reader interface.",
+ obj->name);
+ return -1;
+ }
+ if (sdb_llist_search_by_name(reader_list, obj->name)) {
+ sdb_log(SDB_LOG_WARNING, "core: store reader callback '%s' "
+ "has already been registered. Ignoring newly "
+ "registered version.", obj->name);
+ return -1;
+ }
+
+ /* ctx may be NULL if the callback was not registered by a plugin */
+
+ READER(obj)->impl = *impl;
+ READER(obj)->r_ctx = ctx_get();
+ sdb_object_ref(SDB_OBJ(READER(obj)->r_ctx));
+
+ sdb_object_ref(ud);
+ READER(obj)->r_user_data = ud;
+ return 0;
+} /* plugin_reader_init */
+
+static void
+plugin_reader_destroy(sdb_object_t *obj)
+{
+ assert(obj);
+ sdb_object_deref(READER(obj)->r_user_data);
+ sdb_object_deref(SDB_OBJ(READER(obj)->r_ctx));
+} /* plugin_reader_destroy */
+
+static sdb_type_t reader_type = {
+ sizeof(reader_t),
+
+ plugin_reader_init,
+ plugin_reader_destroy
+};
+
static int
module_init(const char *name, lt_dlhandle lh, sdb_plugin_info_t *info)
{
return 0;
} /* sdb_store_register_writer */
+int
+sdb_plugin_register_reader(const char *name,
+ sdb_store_reader_t *reader, sdb_object_t *user_data)
+{
+ char cb_name[1024];
+ sdb_object_t *obj;
+
+ if ((! name) || (! reader))
+ return -1;
+
+ if (! reader_list)
+ reader_list = sdb_llist_create();
+ if (! reader_list)
+ return -1;
+
+ plugin_get_name(name, cb_name, sizeof(cb_name));
+
+ obj = sdb_object_create(cb_name, reader_type,
+ reader, user_data);
+ if (! obj)
+ return -1;
+
+ if (sdb_llist_append(reader_list, obj)) {
+ sdb_object_deref(obj);
+ return -1;
+ }
+
+ /* pass control to the list */
+ sdb_object_deref(obj);
+
+ sdb_log(SDB_LOG_INFO, "core: Registered store reader callback '%s'.",
+ cb_name);
+ return 0;
+} /* sdb_plugin_register_reader */
+
void
sdb_plugin_unregister_all(void)
{
return ts;
} /* sdb_plugin_fetch_timeseries */
+int
+sdb_plugin_query(sdb_ast_node_t *ast, sdb_strbuf_t *buf, sdb_strbuf_t *errbuf)
+{
+ size_t n = sdb_llist_len(reader_list);
+ reader_t *reader;
+ sdb_object_t *q;
+ int status = 0;
+
+ if (! ast)
+ return 0;
+
+ if (n != 1) {
+ char *msg = (n > 0)
+ ? "Cannot execute query: multiple readers not supported"
+ : "Cannot execute query: no readers registered";
+ sdb_strbuf_sprintf(errbuf, "%s", msg);
+ sdb_log(SDB_LOG_ERR, "core: %s", msg);
+ return -1;
+ }
+
+ reader = READER(sdb_llist_get(reader_list, 0));
+ assert(reader);
+
+ q = reader->impl.prepare_query(ast, errbuf, reader->r_user_data);
+ if (q)
+ status = reader->impl.execute_query(q, buf, errbuf, reader->r_user_data);
+ else
+ status = -1;
+
+ sdb_object_deref(SDB_OBJ(q));
+ sdb_object_deref(SDB_OBJ(reader));
+ return status;
+} /* sdb_plugin_query */
+
int
sdb_plugin_store_host(const char *name, sdb_time_t last_update)
{
diff --git a/src/core/store.c b/src/core/store.c
index f672c120a3c646b3893f06583a717b13ad9f22cf..2d32e0d2cdbbb7419c54094d607c3eb771d99a8f 100644 (file)
--- a/src/core/store.c
+++ b/src/core/store.c
store_attribute, store_service_attr, store_metric_attr,
};
+/*
+ * TODO: let prepare and execute accept a store object as their user_data
+ * object
+ */
+
+static sdb_object_t *
+prepare_query(sdb_ast_node_t *ast,
+ sdb_strbuf_t __attribute__((unused)) *errbuf,
+ sdb_object_t __attribute__((unused)) *user_data)
+{
+ return SDB_OBJ(sdb_store_query_prepare(ast));
+} /* prepare_query */
+
+static int
+execute_query(sdb_object_t *q,
+ sdb_strbuf_t *buf, sdb_strbuf_t *errbuf,
+ sdb_object_t __attribute__((unused)) *user_data)
+{
+ return sdb_store_query_execute(QUERY(q), buf, errbuf);
+} /* execute_query */
+
+static sdb_store_reader_t store_reader = {
+ prepare_query, execute_query,
+};
+
/*
* public API
*/
index 1155a957cd3f06fc186b16d4250662c25aaaa9dd..6b5a9f46268b928c4284e5910afe3117f964d25c 100644 (file)
*
* Arguments:
* - user_data: If specified, this will be passed on to each call of the
- * callback. The function will take ownership of the object, that is,
+ * callbacks. The function will take ownership of the object, that is,
* increment the reference count by one. In case the caller does not longer
* use the object for other purposes, it should thus deref it.
*/
sdb_plugin_register_writer(const char *name,
sdb_store_writer_t *writer, sdb_object_t *user_data);
+/*
+ * sdb_plugin_register_reader:
+ * Register a "reader" implementation for querying the store. It is invalid to
+ * register an incomplete reader which does not implement all of the reader
+ * interface.
+ *
+ * Arguments:
+ * - user_data: If specified, this will be passed on to each call of the
+ * callbacks. The function will take ownership of the object, that is,
+ * increment the reference count by one. In case the caller does not longer
+ * use the object for other purposes, it should thus deref it.
+ */
+int
+sdb_plugin_register_reader(const char *name,
+ sdb_store_reader_t *reader, sdb_object_t *user_data);
+
/*
* sdb_plugin_unregister_all:
* Unregister all registered plugins and destruct their user-data objects.
sdb_plugin_fetch_timeseries(const char *type, const char *id,
sdb_timeseries_opts_t *opts);
+/*
+ * sdb_plugin_query:
+ * Query the store using the query specified by 'ast'. The result will be
+ * written to 'buf' and any errors will be written to 'errbuf'.
+ *
+ * Returns:
+ * - 0 on success
+ * - a negative value else
+ */
+int
+sdb_plugin_query(sdb_ast_node_t *ast, sdb_strbuf_t *buf, sdb_strbuf_t *errbuf);
+
/*
* sdb_plugin_store_host, sdb_plugin_store_service, sdb_plugin_store_metric,
* sdb_plugin_store_attribute, sdb_plugin_store_service_attribute,
index 289bc2f88684ef6a82be2d63935d1b5bcb85d22a..e851dd9f955ee38cb3bec993fe753aa3e4f5a537 100644 (file)
--- a/src/include/core/store.h
+++ b/src/include/core/store.h
sdb_object_t *user_data);
} sdb_store_writer_t;
+/*
+ * A store reader describes the interface to query a store implementation.
+ */
+typedef struct {
+ /*
+ * prepare_query:
+ * Prepare the query described by 'ast' for execution.
+ */
+ sdb_object_t *(*prepare_query)(sdb_ast_node_t *ast,
+ sdb_strbuf_t *errbuf, sdb_object_t *user_data);
+
+ /*
+ * execute_query:
+ * Execute a previously prepared query. The callback may expect that only
+ * queries prepared by its respective prepare callback will be passed to
+ * this function.
+ *
+ * TODO: Instead of letting the executor write directly to a string buffer
+ * (which cannot easily be merged with other results), let it hand
+ * all objects to a store-writer.
+ */
+ int (*execute_query)(sdb_object_t *q,
+ sdb_strbuf_t *buf, sdb_strbuf_t *errbuf,
+ sdb_object_t *user_data);
+} sdb_store_reader_t;
+
/*
* sdb_store_init:
* Initialize the store sub-system. This function has to be called before