X-Git-Url: https://git.tokkee.org/?p=sysdb.git;a=blobdiff_plain;f=src%2Fcore%2Fstore_exec.c;h=d4e308806bac8ece40c9a12a20885df768c2dffe;hp=c377129b825e8c6e42a2a690e6953f3a21749337;hb=9a96acd759c31211aa512e174339a9c178c4eb83;hpb=3603db13105aefac4d6ceb356bcfd118a92fa654 diff --git a/src/core/store_exec.c b/src/core/store_exec.c index c377129..d4e3088 100644 --- a/src/core/store_exec.c +++ b/src/core/store_exec.c @@ -43,8 +43,10 @@ */ typedef struct { - sdb_store_json_formatter_t *f; sdb_store_obj_t *current_host; + + sdb_store_writer_t *w; + sdb_object_t *wd; } iter_t; static int @@ -55,7 +57,7 @@ maybe_emit_host(iter_t *iter, sdb_store_obj_t *obj) if (iter->current_host == obj->parent) return 0; iter->current_host = obj->parent; - return sdb_store_json_emit(iter->f, obj->parent); + return sdb_store_emit(obj->parent, iter->w, iter->wd); } /* maybe_emit_host */ static int @@ -65,7 +67,7 @@ list_tojson(sdb_store_obj_t *obj, { iter_t *iter = user_data; maybe_emit_host(iter, obj); - return sdb_store_json_emit(iter->f, obj); + return sdb_store_emit(obj, iter->w, iter->wd); } /* list_tojson */ static int @@ -74,7 +76,7 @@ lookup_tojson(sdb_store_obj_t *obj, sdb_store_matcher_t *filter, { iter_t *iter = user_data; maybe_emit_host(iter, obj); - return sdb_store_json_emit_full(iter->f, obj, filter); + return sdb_store_emit_full(obj, filter, iter->w, iter->wd); } /* lookup_tojson */ /* @@ -82,16 +84,14 @@ lookup_tojson(sdb_store_obj_t *obj, sdb_store_matcher_t *filter, */ static int -exec_fetch(sdb_store_t *store, sdb_strbuf_t *buf, sdb_strbuf_t *errbuf, +exec_fetch(sdb_store_t *store, + sdb_store_writer_t *w, sdb_object_t *wd, sdb_strbuf_t *errbuf, int type, const char *hostname, const char *name, sdb_store_matcher_t *filter) { - uint32_t res_type = htonl(SDB_CONNECTION_FETCH); - sdb_store_obj_t *host; sdb_store_obj_t *obj; - sdb_store_json_formatter_t *f; int status = 0; if ((! name) || ((type == SDB_HOST) && hostname) @@ -133,132 +133,64 @@ exec_fetch(sdb_store_t *store, sdb_strbuf_t *buf, sdb_strbuf_t *errbuf, } host = NULL; - f = sdb_store_json_formatter(buf, type, /* flags = */ 0); - if (! f) { - char err[1024]; - sdb_log(SDB_LOG_ERR, "frontend: Failed to create " - "JSON formatter to handle FETCH command: %s", - sdb_strerror(errno, err, sizeof(err))); - - sdb_strbuf_sprintf(errbuf, "Out of memory"); - sdb_object_deref(SDB_OBJ(obj)); - return -1; - } - - sdb_strbuf_memcpy(buf, &res_type, sizeof(uint32_t)); if (type != SDB_HOST) - status = sdb_store_json_emit(f, obj->parent); - if (status || sdb_store_json_emit_full(f, obj, filter)) { + status = sdb_store_emit(obj->parent, w, wd); + if (status || sdb_store_emit_full(obj, filter, w, wd)) { sdb_log(SDB_LOG_ERR, "frontend: Failed to serialize " "%s %s.%s to JSON", SDB_STORE_TYPE_TO_NAME(type), hostname, name); sdb_strbuf_sprintf(errbuf, "Out of memory"); - sdb_object_deref(SDB_OBJ(f)); sdb_object_deref(SDB_OBJ(obj)); return -1; } sdb_object_deref(SDB_OBJ(obj)); - sdb_store_json_finish(f); - sdb_object_deref(SDB_OBJ(f)); - return SDB_CONNECTION_DATA; } /* exec_fetch */ static int -exec_list(sdb_store_t *store, sdb_strbuf_t *buf, sdb_strbuf_t *errbuf, +exec_list(sdb_store_t *store, + sdb_store_writer_t *w, sdb_object_t *wd, sdb_strbuf_t *errbuf, int type, sdb_store_matcher_t *filter) { - uint32_t res_type = htonl(SDB_CONNECTION_LIST); - iter_t iter = { NULL, NULL }; - - iter.f = sdb_store_json_formatter(buf, type, SDB_WANT_ARRAY); - if (! iter.f) { - char err[1024]; - sdb_log(SDB_LOG_ERR, "frontend: Failed to create " - "JSON formatter to handle LIST command: %s", - sdb_strerror(errno, err, sizeof(err))); - - sdb_strbuf_sprintf(errbuf, "Out of memory"); - return -1; - } + iter_t iter = { NULL, w, wd }; - sdb_strbuf_memcpy(buf, &res_type, sizeof(uint32_t)); if (sdb_store_scan(store, type, /* m = */ NULL, filter, list_tojson, &iter)) { sdb_log(SDB_LOG_ERR, "frontend: Failed to serialize " "store to JSON"); sdb_strbuf_sprintf(errbuf, "Out of memory"); - sdb_object_deref(SDB_OBJ(iter.f)); return -1; } - sdb_store_json_finish(iter.f); - sdb_object_deref(SDB_OBJ(iter.f)); - return SDB_CONNECTION_DATA; } /* exec_list */ static int -exec_lookup(sdb_store_t *store, sdb_strbuf_t *buf, sdb_strbuf_t *errbuf, +exec_lookup(sdb_store_t *store, + sdb_store_writer_t *w, sdb_object_t *wd, sdb_strbuf_t *errbuf, int type, sdb_store_matcher_t *m, sdb_store_matcher_t *filter) { - uint32_t res_type = htonl(SDB_CONNECTION_LOOKUP); - iter_t iter = { NULL, NULL }; - - iter.f = sdb_store_json_formatter(buf, type, SDB_WANT_ARRAY); - if (! iter.f) { - char err[1024]; - sdb_log(SDB_LOG_ERR, "frontend: Failed to create " - "JSON formatter to handle LOOKUP command: %s", - sdb_strerror(errno, err, sizeof(err))); - - sdb_strbuf_sprintf(errbuf, "Out of memory"); - return -1; - } - - sdb_strbuf_memcpy(buf, &res_type, sizeof(uint32_t)); + iter_t iter = { NULL, w, wd }; if (sdb_store_scan(store, type, m, filter, lookup_tojson, &iter)) { sdb_log(SDB_LOG_ERR, "frontend: Failed to lookup %ss", SDB_STORE_TYPE_TO_NAME(type)); sdb_strbuf_sprintf(errbuf, "Failed to lookup %ss", SDB_STORE_TYPE_TO_NAME(type)); - sdb_object_deref(SDB_OBJ(iter.f)); return -1; } - sdb_store_json_finish(iter.f); - sdb_object_deref(SDB_OBJ(iter.f)); - return SDB_CONNECTION_DATA; } /* exec_lookup */ -static int -exec_timeseries(sdb_store_t *store, sdb_strbuf_t *buf, sdb_strbuf_t *errbuf, - const char *hostname, const char *metric, - sdb_timeseries_opts_t *opts) -{ - uint32_t res_type = htonl(SDB_CONNECTION_TIMESERIES); - - sdb_strbuf_memcpy(buf, &res_type, sizeof(uint32_t)); - if (sdb_store_fetch_timeseries(store, hostname, metric, opts, buf)) { - sdb_log(SDB_LOG_ERR, "frontend: Failed to fetch time-series"); - sdb_strbuf_sprintf(errbuf, "Failed to fetch time-series"); - return -1; - } - - return SDB_CONNECTION_DATA; -} /* exec_timeseries */ - /* * public API */ int sdb_store_query_execute(sdb_store_t *store, sdb_store_query_t *q, - sdb_strbuf_t *buf, sdb_strbuf_t *errbuf) + sdb_store_writer_t *w, sdb_object_t *wd, sdb_strbuf_t *errbuf) { - sdb_timeseries_opts_t ts_opts; sdb_ast_node_t *ast; if (! q) @@ -271,25 +203,18 @@ sdb_store_query_execute(sdb_store_t *store, sdb_store_query_t *q, ast = q->ast; switch (ast->type) { case SDB_AST_TYPE_FETCH: - return exec_fetch(store, buf, errbuf, SDB_AST_FETCH(ast)->obj_type, + return exec_fetch(store, w, wd, errbuf, SDB_AST_FETCH(ast)->obj_type, SDB_AST_FETCH(ast)->hostname, SDB_AST_FETCH(ast)->name, q->filter); case SDB_AST_TYPE_LIST: - return exec_list(store, buf, errbuf, SDB_AST_LIST(ast)->obj_type, + return exec_list(store, w, wd, errbuf, SDB_AST_LIST(ast)->obj_type, q->filter); case SDB_AST_TYPE_LOOKUP: - return exec_lookup(store, buf, errbuf, SDB_AST_LOOKUP(ast)->obj_type, + return exec_lookup(store, w, wd, errbuf, SDB_AST_LOOKUP(ast)->obj_type, q->matcher, q->filter); - case SDB_AST_TYPE_TIMESERIES: - ts_opts.start = SDB_AST_TIMESERIES(ast)->start; - ts_opts.end = SDB_AST_TIMESERIES(ast)->end; - return exec_timeseries(store, buf, errbuf, - SDB_AST_TIMESERIES(ast)->hostname, - SDB_AST_TIMESERIES(ast)->metric, &ts_opts); - default: sdb_log(SDB_LOG_ERR, "store: Invalid query of type %s", SDB_AST_TYPE_TO_STRING(ast));