From: Sebastian Harl Date: Tue, 6 Oct 2015 19:47:34 +0000 (+0200) Subject: Build a more generic/powerful query API which writes to a store-writer. X-Git-Tag: sysdb-0.8.0~14 X-Git-Url: https://git.tokkee.org/?p=sysdb.git;a=commitdiff_plain;h=9a96acd759c31211aa512e174339a9c178c4eb83 Build a more generic/powerful query API which writes to a store-writer. Instead of letting the query implementation write JSON directly, let them emit objects to a store-writer. This allows for more powerful and centralized post-processing of the data and avoids code-duplication by moving all logic of how to write out the data to the core. Also, this had a couple of nice side effects and allowed for further simplication: - The TIMESERIES command is now handled by the front-end (based on a FETCH of the respective metric data); query plugin no longer have to implement this. - All protocol specific information is now handled by the frontend; query plugins no longer have to handle this (response-type). - Further separation of the memory-store and generic store code. --- diff --git a/src/core/plugin.c b/src/core/plugin.c index 8977699..3611be4 100644 --- a/src/core/plugin.c +++ b/src/core/plugin.c @@ -1437,7 +1437,8 @@ sdb_plugin_fetch_timeseries(const char *type, const char *id, } /* sdb_plugin_fetch_timeseries */ int -sdb_plugin_query(sdb_ast_node_t *ast, sdb_strbuf_t *buf, sdb_strbuf_t *errbuf) +sdb_plugin_query(sdb_ast_node_t *ast, + sdb_store_writer_t *w, sdb_object_t *wd, sdb_strbuf_t *errbuf) { size_t n = sdb_llist_len(reader_list); reader_t *reader; @@ -1449,8 +1450,7 @@ sdb_plugin_query(sdb_ast_node_t *ast, sdb_strbuf_t *buf, sdb_strbuf_t *errbuf) if ((ast->type != SDB_AST_TYPE_FETCH) && (ast->type != SDB_AST_TYPE_LIST) - && (ast->type != SDB_AST_TYPE_LOOKUP) - && (ast->type != SDB_AST_TYPE_TIMESERIES)) { + && (ast->type != SDB_AST_TYPE_LOOKUP)) { sdb_log(SDB_LOG_ERR, "core: Cannot execute query of type %s", SDB_AST_TYPE_TO_STRING(ast)); sdb_strbuf_sprintf(errbuf, "Cannot execute query of type %s", @@ -1472,7 +1472,8 @@ sdb_plugin_query(sdb_ast_node_t *ast, sdb_strbuf_t *buf, sdb_strbuf_t *errbuf) q = reader->impl.prepare_query(ast, errbuf, reader->r_user_data); if (q) - status = reader->impl.execute_query(q, buf, errbuf, reader->r_user_data); + status = reader->impl.execute_query(q, w, SDB_OBJ(wd), + errbuf, reader->r_user_data); else status = -1; diff --git a/src/core/store.c b/src/core/store.c index dfa6ac9..fa72955 100644 --- a/src/core/store.c +++ b/src/core/store.c @@ -689,11 +689,11 @@ prepare_query(sdb_ast_node_t *ast, static int execute_query(sdb_object_t *q, - sdb_strbuf_t *buf, sdb_strbuf_t *errbuf, + sdb_store_writer_t *w, sdb_object_t *wd, sdb_strbuf_t *errbuf, sdb_object_t *user_data) { return sdb_store_query_execute(SDB_STORE(user_data), - QUERY(q), buf, errbuf); + QUERY(q), w, wd, errbuf); } /* execute_query */ sdb_store_reader_t sdb_store_reader = { @@ -903,75 +903,6 @@ sdb_store_get_attr(sdb_store_obj_t *obj, const char *name, sdb_data_t *res, return 0; } /* sdb_store_get_attr */ -/* TODO: sdb_store_fetch_timeseries should move into the plugin module */ - -int -sdb_store_fetch_timeseries(sdb_store_t *store, - const char *hostname, const char *metric, - sdb_timeseries_opts_t *opts, sdb_strbuf_t *buf) -{ - sdb_avltree_t *metrics; - host_t *host; - sdb_metric_t *m; - - sdb_timeseries_t *ts; - - int status = 0; - - if ((! store) || (! hostname) || (! metric) || (! opts) || (! buf)) - return -1; - - pthread_rwlock_rdlock(&store->host_lock); - host = HOST(sdb_avltree_lookup(store->hosts, hostname)); - metrics = get_host_children(host, SDB_METRIC); - sdb_object_deref(SDB_OBJ(host)); - if (! metrics) { - sdb_log(SDB_LOG_ERR, "store: Failed to fetch time-series '%s/%s' " - "- host '%s' not found", hostname, metric, hostname); - pthread_rwlock_unlock(&store->host_lock); - return -1; - } - - m = METRIC(sdb_avltree_lookup(metrics, metric)); - if (! m) { - sdb_log(SDB_LOG_ERR, "store: Failed to fetch time-series '%s/%s' " - "- metric '%s' not found", hostname, metric, metric); - pthread_rwlock_unlock(&store->host_lock); - return -1; - } - - if ((! m->store.type) || (! m->store.id)) { - sdb_log(SDB_LOG_ERR, "store: Failed to fetch time-series '%s/%s' " - "- no data-store configured for the stored metric", - hostname, metric); - sdb_object_deref(SDB_OBJ(m)); - pthread_rwlock_unlock(&store->host_lock); - return -1; - } - - { - char type[strlen(m->store.type) + 1]; - char id[strlen(m->store.id) + 1]; - - strncpy(type, m->store.type, sizeof(type)); - strncpy(id, m->store.id, sizeof(id)); - pthread_rwlock_unlock(&store->host_lock); - - ts = sdb_plugin_fetch_timeseries(type, id, opts); - if (! ts) { - sdb_log(SDB_LOG_ERR, "store: Failed to fetch time-series '%s/%s' " - "- %s fetcher callback returned no data for '%s'", - hostname, metric, type, id); - status = -1; - } - } - - sdb_timeseries_tojson(ts, buf); - sdb_object_deref(SDB_OBJ(m)); - sdb_timeseries_destroy(ts); - return status; -} /* sdb_store_fetch_timeseries */ - int sdb_store_scan(sdb_store_t *store, int type, sdb_store_matcher_t *m, sdb_store_matcher_t *filter, diff --git a/src/core/store_exec.c b/src/core/store_exec.c index 6cf3a82..d4e3088 100644 --- a/src/core/store_exec.c +++ b/src/core/store_exec.c @@ -84,16 +84,14 @@ lookup_tojson(sdb_store_obj_t *obj, sdb_store_matcher_t *filter, */ static int -exec_fetch(sdb_store_t *store, sdb_strbuf_t *buf, sdb_strbuf_t *errbuf, +exec_fetch(sdb_store_t *store, + sdb_store_writer_t *w, sdb_object_t *wd, sdb_strbuf_t *errbuf, int type, const char *hostname, const char *name, sdb_store_matcher_t *filter) { - uint32_t res_type = htonl(SDB_CONNECTION_FETCH); - sdb_store_obj_t *host; sdb_store_obj_t *obj; - sdb_store_json_formatter_t *f; int status = 0; if ((! name) || ((type == SDB_HOST) && hostname) @@ -135,137 +133,64 @@ exec_fetch(sdb_store_t *store, sdb_strbuf_t *buf, sdb_strbuf_t *errbuf, } host = NULL; - f = sdb_store_json_formatter(buf, type, /* flags = */ 0); - if (! f) { - char err[1024]; - sdb_log(SDB_LOG_ERR, "frontend: Failed to create " - "JSON formatter to handle FETCH command: %s", - sdb_strerror(errno, err, sizeof(err))); - - sdb_strbuf_sprintf(errbuf, "Out of memory"); - sdb_object_deref(SDB_OBJ(obj)); - return -1; - } - - sdb_strbuf_memcpy(buf, &res_type, sizeof(uint32_t)); if (type != SDB_HOST) - status = sdb_store_emit(obj->parent, - &sdb_store_json_writer, SDB_OBJ(f)); - if (status || sdb_store_emit_full(obj, filter, - &sdb_store_json_writer, SDB_OBJ(f))) { + status = sdb_store_emit(obj->parent, w, wd); + if (status || sdb_store_emit_full(obj, filter, w, wd)) { sdb_log(SDB_LOG_ERR, "frontend: Failed to serialize " "%s %s.%s to JSON", SDB_STORE_TYPE_TO_NAME(type), hostname, name); sdb_strbuf_sprintf(errbuf, "Out of memory"); - sdb_object_deref(SDB_OBJ(f)); sdb_object_deref(SDB_OBJ(obj)); return -1; } sdb_object_deref(SDB_OBJ(obj)); - sdb_store_json_finish(f); - sdb_object_deref(SDB_OBJ(f)); - return SDB_CONNECTION_DATA; } /* exec_fetch */ static int -exec_list(sdb_store_t *store, sdb_strbuf_t *buf, sdb_strbuf_t *errbuf, +exec_list(sdb_store_t *store, + sdb_store_writer_t *w, sdb_object_t *wd, sdb_strbuf_t *errbuf, int type, sdb_store_matcher_t *filter) { - uint32_t res_type = htonl(SDB_CONNECTION_LIST); - iter_t iter = { NULL, &sdb_store_json_writer, NULL }; - sdb_store_json_formatter_t *f; + iter_t iter = { NULL, w, wd }; - f = sdb_store_json_formatter(buf, type, SDB_WANT_ARRAY); - if (! f) { - char err[1024]; - sdb_log(SDB_LOG_ERR, "frontend: Failed to create " - "JSON formatter to handle LIST command: %s", - sdb_strerror(errno, err, sizeof(err))); - - sdb_strbuf_sprintf(errbuf, "Out of memory"); - return -1; - } - - iter.wd = SDB_OBJ(f); - sdb_strbuf_memcpy(buf, &res_type, sizeof(uint32_t)); if (sdb_store_scan(store, type, /* m = */ NULL, filter, list_tojson, &iter)) { sdb_log(SDB_LOG_ERR, "frontend: Failed to serialize " "store to JSON"); sdb_strbuf_sprintf(errbuf, "Out of memory"); - sdb_object_deref(SDB_OBJ(f)); return -1; } - sdb_store_json_finish(f); - sdb_object_deref(SDB_OBJ(f)); - return SDB_CONNECTION_DATA; } /* exec_list */ static int -exec_lookup(sdb_store_t *store, sdb_strbuf_t *buf, sdb_strbuf_t *errbuf, +exec_lookup(sdb_store_t *store, + sdb_store_writer_t *w, sdb_object_t *wd, sdb_strbuf_t *errbuf, int type, sdb_store_matcher_t *m, sdb_store_matcher_t *filter) { - uint32_t res_type = htonl(SDB_CONNECTION_LOOKUP); - iter_t iter = { NULL, &sdb_store_json_writer, NULL }; - sdb_store_json_formatter_t *f; - - f = sdb_store_json_formatter(buf, type, SDB_WANT_ARRAY); - if (! f) { - char err[1024]; - sdb_log(SDB_LOG_ERR, "frontend: Failed to create " - "JSON formatter to handle LOOKUP command: %s", - sdb_strerror(errno, err, sizeof(err))); + iter_t iter = { NULL, w, wd }; - sdb_strbuf_sprintf(errbuf, "Out of memory"); - return -1; - } - - iter.wd = SDB_OBJ(f); - sdb_strbuf_memcpy(buf, &res_type, sizeof(uint32_t)); if (sdb_store_scan(store, type, m, filter, lookup_tojson, &iter)) { sdb_log(SDB_LOG_ERR, "frontend: Failed to lookup %ss", SDB_STORE_TYPE_TO_NAME(type)); sdb_strbuf_sprintf(errbuf, "Failed to lookup %ss", SDB_STORE_TYPE_TO_NAME(type)); - sdb_object_deref(SDB_OBJ(f)); return -1; } - sdb_store_json_finish(f); - sdb_object_deref(SDB_OBJ(f)); - return SDB_CONNECTION_DATA; } /* exec_lookup */ -static int -exec_timeseries(sdb_store_t *store, sdb_strbuf_t *buf, sdb_strbuf_t *errbuf, - const char *hostname, const char *metric, - sdb_timeseries_opts_t *opts) -{ - uint32_t res_type = htonl(SDB_CONNECTION_TIMESERIES); - - sdb_strbuf_memcpy(buf, &res_type, sizeof(uint32_t)); - if (sdb_store_fetch_timeseries(store, hostname, metric, opts, buf)) { - sdb_log(SDB_LOG_ERR, "frontend: Failed to fetch time-series"); - sdb_strbuf_sprintf(errbuf, "Failed to fetch time-series"); - return -1; - } - - return SDB_CONNECTION_DATA; -} /* exec_timeseries */ - /* * public API */ int sdb_store_query_execute(sdb_store_t *store, sdb_store_query_t *q, - sdb_strbuf_t *buf, sdb_strbuf_t *errbuf) + sdb_store_writer_t *w, sdb_object_t *wd, sdb_strbuf_t *errbuf) { - sdb_timeseries_opts_t ts_opts; sdb_ast_node_t *ast; if (! q) @@ -278,25 +203,18 @@ sdb_store_query_execute(sdb_store_t *store, sdb_store_query_t *q, ast = q->ast; switch (ast->type) { case SDB_AST_TYPE_FETCH: - return exec_fetch(store, buf, errbuf, SDB_AST_FETCH(ast)->obj_type, + return exec_fetch(store, w, wd, errbuf, SDB_AST_FETCH(ast)->obj_type, SDB_AST_FETCH(ast)->hostname, SDB_AST_FETCH(ast)->name, q->filter); case SDB_AST_TYPE_LIST: - return exec_list(store, buf, errbuf, SDB_AST_LIST(ast)->obj_type, + return exec_list(store, w, wd, errbuf, SDB_AST_LIST(ast)->obj_type, q->filter); case SDB_AST_TYPE_LOOKUP: - return exec_lookup(store, buf, errbuf, SDB_AST_LOOKUP(ast)->obj_type, + return exec_lookup(store, w, wd, errbuf, SDB_AST_LOOKUP(ast)->obj_type, q->matcher, q->filter); - case SDB_AST_TYPE_TIMESERIES: - ts_opts.start = SDB_AST_TIMESERIES(ast)->start; - ts_opts.end = SDB_AST_TIMESERIES(ast)->end; - return exec_timeseries(store, buf, errbuf, - SDB_AST_TIMESERIES(ast)->hostname, - SDB_AST_TIMESERIES(ast)->metric, &ts_opts); - default: sdb_log(SDB_LOG_ERR, "store: Invalid query of type %s", SDB_AST_TYPE_TO_STRING(ast)); diff --git a/src/frontend/query.c b/src/frontend/query.c index 56cd4ec..9a157fb 100644 --- a/src/frontend/query.c +++ b/src/frontend/query.c @@ -43,6 +43,51 @@ #include #include +/* + * metric fetcher: + * Implements the callbacks necessary to read a metric object. + * TODO: FETCH should allow to ignore child elements (attributes); then, we'd + * only need store_host/store_metric. + */ + +typedef struct { + char *type; + char *id; +} metric_store_t; + +static int +metric_fetcher_host(sdb_store_host_t __attribute__((unused)) *host, + sdb_object_t __attribute__((unused)) *user_data) +{ + return 0; +} /* metric_fetcher_host */ + +static int +metric_fetcher_metric(sdb_store_metric_t *metric, sdb_object_t *user_data) +{ + metric_store_t *st = SDB_OBJ_WRAPPER(user_data)->data; + + if ((! metric->store.type) || (! metric->store.id)) + return 0; + + st->type = strdup(metric->store.type); + st->id = strdup(metric->store.id); + if ((! st->type) || (! st->id)) + return -1; + return 0; +} /* metric_fetcher_metric */ + +static int +metric_fetcher_attr(sdb_store_attribute_t __attribute__((unused)) *attr, + sdb_object_t __attribute__((unused)) *user_data) +{ + return 0; +} /* metric_fetcher_attr */ + +static sdb_store_writer_t metric_fetcher = { + metric_fetcher_host, NULL, metric_fetcher_metric, metric_fetcher_attr, +}; + /* * private helper functions */ @@ -59,6 +104,45 @@ sstrlen(const char *s) return s ? strlen(s) : 0; } /* sstrlen */ +static int +exec_query(sdb_ast_node_t *ast, sdb_strbuf_t *buf, sdb_strbuf_t *errbuf) +{ + sdb_store_json_formatter_t *f; + int type = 0, flags = 0; + uint32_t res_type = 0; + int status; + + switch (ast->type) { + case SDB_AST_TYPE_FETCH: + type = SDB_AST_FETCH(ast)->obj_type; + res_type = htonl(SDB_CONNECTION_FETCH); + break; + case SDB_AST_TYPE_LIST: + type = SDB_AST_LIST(ast)->obj_type; + flags = SDB_WANT_ARRAY; + res_type = htonl(SDB_CONNECTION_LIST); + break; + case SDB_AST_TYPE_LOOKUP: + type = SDB_AST_LOOKUP(ast)->obj_type; + flags = SDB_WANT_ARRAY; + res_type = htonl(SDB_CONNECTION_LOOKUP); + break; + default: + sdb_strbuf_sprintf(errbuf, "invalid command %s (%#x)", + SDB_AST_TYPE_TO_STRING(ast), ast->type); + return -1; + } + + f = sdb_store_json_formatter(buf, type, flags); + sdb_strbuf_memcpy(buf, &res_type, sizeof(res_type)); + status = sdb_plugin_query(ast, &sdb_store_json_writer, SDB_OBJ(f), errbuf); + if (status < 0) + sdb_strbuf_clear(buf); + sdb_store_json_finish(f); + sdb_object_deref(SDB_OBJ(f)); + return status; +} /* exec_query */ + static int exec_store(sdb_ast_store_t *st, sdb_strbuf_t *buf, sdb_strbuf_t *errbuf) { @@ -145,7 +229,58 @@ exec_store(sdb_ast_store_t *st, sdb_strbuf_t *buf, sdb_strbuf_t *errbuf) } /* exec_store */ static int -exec_query(sdb_conn_t *conn, sdb_ast_node_t *ast) +exec_timeseries(sdb_ast_timeseries_t *ts, sdb_strbuf_t *buf, sdb_strbuf_t *errbuf) +{ + metric_store_t st = { NULL, NULL }; + sdb_object_wrapper_t obj = SDB_OBJECT_WRAPPER_STATIC(&st); + sdb_ast_fetch_t fetch = SDB_AST_FETCH_INIT; + sdb_timeseries_opts_t opts = { 0, 0 }; + sdb_timeseries_t *series = NULL; + int status; + + if ((! ts) || (! ts->hostname) || (! ts->metric)) + return -1; + + fetch.obj_type = SDB_METRIC; + fetch.hostname = strdup(ts->hostname); + fetch.name = strdup(ts->metric); + opts.start = ts->start; + opts.end = ts->end; + + status = sdb_plugin_query(SDB_AST_NODE(&fetch), + &metric_fetcher, SDB_OBJ(&obj), errbuf); + if ((status < 0) || (! st.type) || (! st.id)) { + sdb_log(SDB_LOG_ERR, "frontend: Failed to fetch time-series '%s/%s' " + "- no data-store configured for the stored metric", + ts->hostname, ts->metric); + status = -1; + } + if (status >= 0) { + series = sdb_plugin_fetch_timeseries(st.type, st.id, &opts); + if (! series) { + sdb_log(SDB_LOG_ERR, "frontend: Failed to fetch time-series '%s/%s' " + "- %s fetcher callback returned no data for '%s'", + ts->hostname, ts->metric, st.type, st.id); + status = -1; + } + } + + if (status >= 0) { + sdb_timeseries_tojson(series, buf); + sdb_timeseries_destroy(series); + } + + free(fetch.hostname); + free(fetch.name); + if (st.type) + free(st.type); + if (st.id) + free(st.id); + return status; +} /* exec_timeseries */ + +static int +exec_cmd(sdb_conn_t *conn, sdb_ast_node_t *ast) { sdb_strbuf_t *buf; int status; @@ -160,10 +295,14 @@ exec_query(sdb_conn_t *conn, sdb_ast_node_t *ast) sdb_strbuf_sprintf(conn->errbuf, "Out of memory"); return -1; } + if (ast->type == SDB_AST_TYPE_STORE) status = exec_store(SDB_AST_STORE(ast), buf, conn->errbuf); + else if (ast->type == SDB_AST_TYPE_TIMESERIES) + status = exec_timeseries(SDB_AST_TIMESERIES(ast), buf, conn->errbuf); else - status = sdb_plugin_query(ast, buf, conn->errbuf); + status = exec_query(ast, buf, conn->errbuf); + if (status < 0) { char query[conn->cmd_len + 1]; strncpy(query, sdb_strbuf_string(conn->buf), conn->cmd_len); @@ -176,7 +315,7 @@ exec_query(sdb_conn_t *conn, sdb_ast_node_t *ast) sdb_strbuf_destroy(buf); return status < 0 ? status : 0; -} /* exec_query */ +} /* exec_cmd */ /* * public API @@ -227,7 +366,7 @@ sdb_conn_query(sdb_conn_t *conn) } if (ast) { - status = exec_query(conn, ast); + status = exec_cmd(conn, ast); sdb_object_deref(SDB_OBJ(ast)); } sdb_llist_destroy(parsetree); @@ -266,7 +405,7 @@ sdb_conn_fetch(sdb_conn_t *conn) hostname[0] ? strdup(hostname) : NULL, name[0] ? strdup(name) : NULL, /* filter = */ NULL); - status = exec_query(conn, ast); + status = exec_cmd(conn, ast); sdb_object_deref(SDB_OBJ(ast)); return status; } /* sdb_conn_fetch */ @@ -292,7 +431,7 @@ sdb_conn_list(sdb_conn_t *conn) } ast = sdb_ast_list_create((int)type, /* filter = */ NULL); - status = exec_query(conn, ast); + status = exec_cmd(conn, ast); sdb_object_deref(SDB_OBJ(ast)); return status; } /* sdb_conn_list */ @@ -336,7 +475,7 @@ sdb_conn_lookup(sdb_conn_t *conn) } ast = sdb_ast_lookup_create((int)type, m, /* filter = */ NULL); - status = exec_query(conn, ast); + status = exec_cmd(conn, ast); if (! ast) sdb_object_deref(SDB_OBJ(m)); sdb_object_deref(SDB_OBJ(ast)); @@ -443,7 +582,7 @@ sdb_conn_store(sdb_conn_t *conn) status = sdb_parser_analyze(ast, conn->errbuf); if (! status) - status = exec_query(conn, ast); + status = exec_cmd(conn, ast); sdb_object_deref(SDB_OBJ(ast)); return status; } /* sdb_conn_store */ diff --git a/src/include/core/plugin.h b/src/include/core/plugin.h index 6b5a9f4..6c4a2e1 100644 --- a/src/include/core/plugin.h +++ b/src/include/core/plugin.h @@ -446,7 +446,8 @@ sdb_plugin_fetch_timeseries(const char *type, const char *id, * - a negative value else */ int -sdb_plugin_query(sdb_ast_node_t *ast, sdb_strbuf_t *buf, sdb_strbuf_t *errbuf); +sdb_plugin_query(sdb_ast_node_t *ast, + sdb_store_writer_t *w, sdb_object_t *wd, sdb_strbuf_t *errbuf); /* * sdb_plugin_store_host, sdb_plugin_store_service, sdb_plugin_store_metric, diff --git a/src/include/core/store.h b/src/include/core/store.h index f42d217..0cee913 100644 --- a/src/include/core/store.h +++ b/src/include/core/store.h @@ -288,15 +288,12 @@ typedef struct { * execute_query: * Execute a previously prepared query. The callback may expect that only * queries prepared by its respective prepare callback will be passed to - * this function. - * - * TODO: Instead of letting the executor write directly to a string buffer - * (which cannot easily be merged with other results), let it hand - * all objects to a store-writer. + * this function. The query result will be passed back via the specified + * store writer. */ int (*execute_query)(sdb_object_t *q, - sdb_strbuf_t *buf, sdb_strbuf_t *errbuf, - sdb_object_t *user_data); + sdb_store_writer_t *w, sdb_object_t *wd, + sdb_strbuf_t *errbuf, sdb_object_t *user_data); } sdb_store_reader_t; /* @@ -462,7 +459,7 @@ sdb_store_query_prepare_matcher(sdb_ast_node_t *ast); */ int sdb_store_query_execute(sdb_store_t *store, sdb_store_query_t *m, - sdb_strbuf_t *buf, sdb_strbuf_t *errbuf); + sdb_store_writer_t *w, sdb_object_t *wd, sdb_strbuf_t *errbuf); /* * sdb_store_expr_create: