Code

core: Add support for reader/query plugins.
authorSebastian Harl <sh@tokkee.org>
Wed, 5 Aug 2015 08:38:48 +0000 (10:38 +0200)
committerSebastian Harl <sh@tokkee.org>
Wed, 5 Aug 2015 08:42:04 +0000 (10:42 +0200)
A reader plugin can be used to query a data-store based on a query described
by an AST. The plugin module provides a generic function for this purpose.

At the moment, only a single reader plugin can be used at a time. Supporting
multiple readers requires a good merge strategy. The challenge is to handle
filters and query conditions correctly which may evaluate differently after
two stores have been merged.

src/core/plugin.c
src/core/store.c
src/include/core/plugin.h
src/include/core/store.h

index 3ba98df..26b1408 100644 (file)
@@ -106,6 +106,14 @@ typedef struct {
 } writer_t;
 #define WRITER(obj) ((writer_t *)(obj))
 
+typedef struct {
+       callback_t super; /* cb_callback will always be NULL */
+#define r_user_data super.cb_user_data
+#define r_ctx super.cb_ctx
+       sdb_store_reader_t impl;
+} reader_t;
+#define READER(obj) ((reader_t *)(obj))
+
 /*
  * private variables
  */
@@ -127,6 +135,7 @@ static sdb_llist_t      *shutdown_list = NULL;
 static sdb_llist_t      *log_list = NULL;
 static sdb_llist_t      *ts_fetcher_list = NULL;
 static sdb_llist_t      *writer_list = NULL;
+static sdb_llist_t      *reader_list = NULL;
 
 static struct {
        const char   *type;
@@ -140,6 +149,7 @@ static struct {
        { "log",                &log_list },
        { "timeseries fetcher", &ts_fetcher_list },
        { "store writer",       &writer_list },
+       { "store reader",       &reader_list },
 };
 
 /*
@@ -436,6 +446,53 @@ static sdb_type_t writer_type = {
        plugin_writer_destroy
 };
 
+static int
+plugin_reader_init(sdb_object_t *obj, va_list ap)
+{
+       sdb_store_reader_t *impl = va_arg(ap, sdb_store_reader_t *);
+       sdb_object_t       *ud   = va_arg(ap, sdb_object_t *);
+
+       assert(impl);
+
+       if ((! impl->prepare_query) || (! impl->execute_query)) {
+               sdb_log(SDB_LOG_ERR, "core: store reader callback '%s' "
+                               "does not fully implement the reader interface.",
+                               obj->name);
+               return -1;
+       }
+       if (sdb_llist_search_by_name(reader_list, obj->name)) {
+               sdb_log(SDB_LOG_WARNING, "core: store reader callback '%s' "
+                               "has already been registered. Ignoring newly "
+                               "registered version.", obj->name);
+               return -1;
+       }
+
+       /* ctx may be NULL if the callback was not registered by a plugin */
+
+       READER(obj)->impl = *impl;
+       READER(obj)->r_ctx  = ctx_get();
+       sdb_object_ref(SDB_OBJ(READER(obj)->r_ctx));
+
+       sdb_object_ref(ud);
+       READER(obj)->r_user_data = ud;
+       return 0;
+} /* plugin_reader_init */
+
+static void
+plugin_reader_destroy(sdb_object_t *obj)
+{
+       assert(obj);
+       sdb_object_deref(READER(obj)->r_user_data);
+       sdb_object_deref(SDB_OBJ(READER(obj)->r_ctx));
+} /* plugin_reader_destroy */
+
+static sdb_type_t reader_type = {
+       sizeof(reader_t),
+
+       plugin_reader_init,
+       plugin_reader_destroy
+};
+
 static int
 module_init(const char *name, lt_dlhandle lh, sdb_plugin_info_t *info)
 {
@@ -867,6 +924,41 @@ sdb_plugin_register_writer(const char *name,
        return 0;
 } /* sdb_store_register_writer */
 
+int
+sdb_plugin_register_reader(const char *name,
+               sdb_store_reader_t *reader, sdb_object_t *user_data)
+{
+       char cb_name[1024];
+       sdb_object_t *obj;
+
+       if ((! name) || (! reader))
+               return -1;
+
+       if (! reader_list)
+               reader_list = sdb_llist_create();
+       if (! reader_list)
+               return -1;
+
+       plugin_get_name(name, cb_name, sizeof(cb_name));
+
+       obj = sdb_object_create(cb_name, reader_type,
+                       reader, user_data);
+       if (! obj)
+               return -1;
+
+       if (sdb_llist_append(reader_list, obj)) {
+               sdb_object_deref(obj);
+               return -1;
+       }
+
+       /* pass control to the list */
+       sdb_object_deref(obj);
+
+       sdb_log(SDB_LOG_INFO, "core: Registered store reader callback '%s'.",
+                       cb_name);
+       return 0;
+} /* sdb_plugin_register_reader */
+
 void
 sdb_plugin_unregister_all(void)
 {
@@ -1330,6 +1422,40 @@ sdb_plugin_fetch_timeseries(const char *type, const char *id,
        return ts;
 } /* sdb_plugin_fetch_timeseries */
 
+int
+sdb_plugin_query(sdb_ast_node_t *ast, sdb_strbuf_t *buf, sdb_strbuf_t *errbuf)
+{
+       size_t n = sdb_llist_len(reader_list);
+       reader_t *reader;
+       sdb_object_t *q;
+       int status = 0;
+
+       if (! ast)
+               return 0;
+
+       if (n != 1) {
+               char *msg = (n > 0)
+                       ? "Cannot execute query: multiple readers not supported"
+                       : "Cannot execute query: no readers registered";
+               sdb_strbuf_sprintf(errbuf, "%s", msg);
+               sdb_log(SDB_LOG_ERR, "core: %s", msg);
+               return -1;
+       }
+
+       reader = READER(sdb_llist_get(reader_list, 0));
+       assert(reader);
+
+       q = reader->impl.prepare_query(ast, errbuf, reader->r_user_data);
+       if (q)
+               status = reader->impl.execute_query(q, buf, errbuf, reader->r_user_data);
+       else
+               status = -1;
+
+       sdb_object_deref(SDB_OBJ(q));
+       sdb_object_deref(SDB_OBJ(reader));
+       return status;
+} /* sdb_plugin_query */
+
 int
 sdb_plugin_store_host(const char *name, sdb_time_t last_update)
 {
index f672c12..2d32e0d 100644 (file)
@@ -831,6 +831,31 @@ static sdb_store_writer_t store_writer = {
        store_attribute, store_service_attr, store_metric_attr,
 };
 
+/*
+ * TODO: let prepare and execute accept a store object as their user_data
+ * object
+ */
+
+static sdb_object_t *
+prepare_query(sdb_ast_node_t *ast,
+               sdb_strbuf_t __attribute__((unused)) *errbuf,
+               sdb_object_t __attribute__((unused)) *user_data)
+{
+       return SDB_OBJ(sdb_store_query_prepare(ast));
+} /* prepare_query */
+
+static int
+execute_query(sdb_object_t *q,
+               sdb_strbuf_t *buf, sdb_strbuf_t *errbuf,
+               sdb_object_t __attribute__((unused)) *user_data)
+{
+       return sdb_store_query_execute(QUERY(q), buf, errbuf);
+} /* execute_query */
+
+static sdb_store_reader_t store_reader = {
+       prepare_query, execute_query,
+};
+
 /*
  * public API
  */
index 1155a95..6b5a9f4 100644 (file)
@@ -267,7 +267,7 @@ sdb_plugin_register_ts_fetcher(const char *name,
  *
  * Arguments:
  *  - user_data: If specified, this will be passed on to each call of the
- *    callback. The function will take ownership of the object, that is,
+ *    callbacks. The function will take ownership of the object, that is,
  *    increment the reference count by one. In case the caller does not longer
  *    use the object for other purposes, it should thus deref it.
  */
@@ -275,6 +275,22 @@ int
 sdb_plugin_register_writer(const char *name,
                sdb_store_writer_t *writer, sdb_object_t *user_data);
 
+/*
+ * sdb_plugin_register_reader:
+ * Register a "reader" implementation for querying the store. It is invalid to
+ * register an incomplete reader which does not implement all of the reader
+ * interface.
+ *
+ * Arguments:
+ *  - user_data: If specified, this will be passed on to each call of the
+ *    callbacks. The function will take ownership of the object, that is,
+ *    increment the reference count by one. In case the caller does not longer
+ *    use the object for other purposes, it should thus deref it.
+ */
+int
+sdb_plugin_register_reader(const char *name,
+               sdb_store_reader_t *reader, sdb_object_t *user_data);
+
 /*
  * sdb_plugin_unregister_all:
  * Unregister all registered plugins and destruct their user-data objects.
@@ -420,6 +436,18 @@ sdb_timeseries_t *
 sdb_plugin_fetch_timeseries(const char *type, const char *id,
                sdb_timeseries_opts_t *opts);
 
+/*
+ * sdb_plugin_query:
+ * Query the store using the query specified by 'ast'. The result will be
+ * written to 'buf' and any errors will be written to 'errbuf'.
+ *
+ * Returns:
+ *  - 0 on success
+ *  - a negative value else
+ */
+int
+sdb_plugin_query(sdb_ast_node_t *ast, sdb_strbuf_t *buf, sdb_strbuf_t *errbuf);
+
 /*
  * sdb_plugin_store_host, sdb_plugin_store_service, sdb_plugin_store_metric,
  * sdb_plugin_store_attribute, sdb_plugin_store_service_attribute,
index 289bc2f..e851dd9 100644 (file)
@@ -229,6 +229,32 @@ typedef struct {
                        sdb_object_t *user_data);
 } sdb_store_writer_t;
 
+/*
+ * A store reader describes the interface to query a store implementation.
+ */
+typedef struct {
+       /*
+        * prepare_query:
+        * Prepare the query described by 'ast' for execution.
+        */
+       sdb_object_t *(*prepare_query)(sdb_ast_node_t *ast,
+                       sdb_strbuf_t *errbuf, sdb_object_t *user_data);
+
+       /*
+        * execute_query:
+        * Execute a previously prepared query. The callback may expect that only
+        * queries prepared by its respective prepare callback will be passed to
+        * this function.
+        *
+        * TODO: Instead of letting the executor write directly to a string buffer
+        *       (which cannot easily be merged with other results), let it hand
+        *       all objects to a store-writer.
+        */
+       int (*execute_query)(sdb_object_t *q,
+                       sdb_strbuf_t *buf, sdb_strbuf_t *errbuf,
+                       sdb_object_t *user_data);
+} sdb_store_reader_t;
+
 /*
  * sdb_store_init:
  * Initialize the store sub-system. This function has to be called before