From bb7b99fec92dc9ea1c7ce70e050f4d97fec47ded Mon Sep 17 00:00:00 2001 From: Sebastian Harl Date: Sun, 4 Sep 2016 22:36:56 -0400 Subject: [PATCH] Add support for multiple metric data-stores. It did not make sense to only store one and let conflicting values override each other on each iteration. Instead, track each separately, along with the respective last_update timestamp. This will allow to read the right data on access. At the moment, when requesting a timeseries, the most up to date data store will be used. The network plugin does not support sending multiple data stores at the moment. It will only send the first. --- src/core/memstore-private.h | 13 +-- src/core/memstore.c | 156 +++++++++++++++++++++++++----------- src/core/plugin.c | 5 +- src/core/store_json.c | 2 +- src/frontend/query.c | 22 +++-- src/include/core/store.h | 7 +- src/plugins/store/network.c | 5 +- 7 files changed, 142 insertions(+), 68 deletions(-) diff --git a/src/core/memstore-private.h b/src/core/memstore-private.h index 1afb66a..784912b 100644 --- a/src/core/memstore-private.h +++ b/src/core/memstore-private.h @@ -80,15 +80,18 @@ typedef struct { #define SVC(obj) ((service_t *)(obj)) #define CONST_SVC(obj) ((const service_t *)(obj)) +typedef struct { + char *type; + char *id; + sdb_time_t last_update; +} metric_store_t; typedef struct { sdb_memstore_obj_t super; sdb_avltree_t *attributes; - struct { - char *type; - char *id; - sdb_time_t last_update; - } store; + + metric_store_t *stores; + size_t stores_num; } metric_t; #define METRIC(obj) ((metric_t *)(obj)) diff --git a/src/core/memstore.c b/src/core/memstore.c index 317a176..87f9512 100644 --- a/src/core/memstore.c +++ b/src/core/memstore.c @@ -216,7 +216,8 @@ metric_init(sdb_object_t *obj, va_list ap) if (! sobj->attributes) return -1; - sobj->store.type = sobj->store.id = NULL; + sobj->stores = NULL; + sobj->stores_num = 0; return 0; } /* metric_init */ @@ -224,17 +225,24 @@ static void metric_destroy(sdb_object_t *obj) { metric_t *sobj = METRIC(obj); - assert(obj); + size_t i; + assert(obj); store_obj_destroy(obj); if (sobj->attributes) sdb_avltree_destroy(sobj->attributes); - if (sobj->store.type) - free(sobj->store.type); - if (sobj->store.id) - free(sobj->store.id); + for (i = 0; i < sobj->stores_num; ++i) { + if (sobj->stores[i].type) + free(sobj->stores[i].type); + if (sobj->stores[i].id) + free(sobj->stores[i].id); + } + if (sobj->stores) + free(sobj->stores); + sobj->stores = NULL; + sobj->stores_num = 0; } /* metric_destroy */ static int @@ -401,41 +409,82 @@ store_obj(store_obj_t *obj, sdb_memstore_obj_t **updated_obj) } /* store_obj */ static int -store_metric_store(metric_t *metric, sdb_store_metric_t *m) +store_metric_update_store(metric_store_t *store, + const sdb_metric_store_t __attribute__((unused)) *s, + sdb_time_t last_update) { - char *type = metric->store.type; - char *id = metric->store.id; - - if (! m->store.last_update) - m->store.last_update = metric->store.last_update; - else if (m->store.last_update < metric->store.last_update) + if (last_update <= store->last_update) return 0; + store->last_update = last_update; + return 0; +} /* store_metric_update_store */ - if ((! metric->store.type) || strcasecmp(metric->store.type, m->store.type)) { - if (! (type = strdup(m->store.type))) - return -1; - } - if ((! metric->store.id) || strcasecmp(metric->store.id, m->store.id)) { - if (! (id = strdup(m->store.id))) { - if (type != metric->store.type) - free(type); - return -1; - } +static int +store_metric_add_store(metric_t *metric, const sdb_metric_store_t *s, + sdb_time_t last_update) +{ + char *type = strdup(s->type); + char *id = strdup(s->id); + + metric_store_t *new; + + if ((! type) || (! id)) { + if (type) + free(type); + if (id) + free(id); + return -1; } - if (type != metric->store.type) { - if (metric->store.type) - free(metric->store.type); - metric->store.type = type; + new = realloc(metric->stores, + (metric->stores_num + 1) * sizeof(*metric->stores)); + if (! new) { + free(type); + free(id); + return -1; } - if (id != metric->store.id) { - if (metric->store.id) - free(metric->store.id); - metric->store.id = id; + + metric->stores = new; + new = metric->stores + metric->stores_num; + metric->stores_num++; + + new->type = type; + new->id = id; + new->last_update = last_update; + return 0; +} /* store_metric_add_store */ + +static int +store_metric_stores(metric_t *metric, sdb_store_metric_t *m) +{ + size_t i; + + if (! m->stores_num) + return 0; + + for (i = 0; i < m->stores_num; ++i) { + sdb_time_t last_update = m->stores[i].last_update; + size_t j; + + if (last_update < m->last_update) + last_update = m->last_update; + + for (j = 0; j < metric->stores_num; ++j) { + if ((! strcasecmp(metric->stores[j].type, m->stores[i].type)) + && (! strcasecmp(metric->stores[j].id, m->stores[i].id))) { + if (store_metric_update_store(metric->stores + j, + m->stores + i, last_update) < 0) + return -1; + break; + } + } + + if (j >= metric->stores_num) + if (store_metric_add_store(metric, m->stores + i, last_update) < 0) + return -1; } - metric->store.last_update = m->store.last_update; return 0; -} /* store_metric_store */ +} /* store_metric_stores */ /* The store's host_lock has to be acquired before calling this function. */ static sdb_avltree_t * @@ -622,12 +671,14 @@ store_metric(sdb_store_metric_t *metric, sdb_object_t *user_data) host_t *host; int status = 0; + size_t i; if ((! metric) || (! metric->hostname) || (! metric->name)) return -1; - if ((metric->store.type != NULL) != (metric->store.id != NULL)) - return -1; + for (i = 0; i < metric->stores_num; ++i) + if ((metric->stores[i].type == NULL) || (metric->stores[i].id == NULL)) + return -1; pthread_rwlock_wrlock(&st->host_lock); host = HOST(sdb_avltree_lookup(st->hosts, metric->hostname)); @@ -655,9 +706,8 @@ store_metric(sdb_store_metric_t *metric, sdb_object_t *user_data) } assert(new); - if (metric->store.type && metric->store.id) - if (store_metric_store(METRIC(new), metric)) - status = -1; + if (store_metric_stores(METRIC(new), metric)) + status = -1; pthread_rwlock_unlock(&st->host_lock); return status; } /* store_metric */ @@ -727,14 +777,16 @@ sdb_memstore_metric(sdb_memstore_t *store, const char *hostname, const char *nam sdb_time_t last_update, sdb_time_t interval) { sdb_store_metric_t metric = { - hostname, name, - { NULL, NULL, 0 }, + hostname, name, /* stores */ NULL, 0, last_update, interval, NULL, 0, }; if (metric_store) { - metric.store.type = metric_store->type; - metric.store.id = metric_store->id; - metric.store.last_update = metric_store->last_update; + metric.stores = &(const sdb_metric_store_t){ + metric_store->type, + metric_store->id, + metric_store->last_update, + }; + metric.stores_num = 1; } return store_metric(&metric, SDB_OBJ(store)); } /* sdb_memstore_metric */ @@ -860,7 +912,7 @@ sdb_memstore_get_field(sdb_memstore_obj_t *obj, int field, sdb_data_t *res) if (obj->type != SDB_METRIC) return -1; tmp.type = SDB_TYPE_BOOLEAN; - tmp.data.boolean = METRIC(obj)->store.type != NULL; + tmp.data.boolean = METRIC(obj)->stores_num > 0; default: return -1; } @@ -1002,19 +1054,25 @@ sdb_memstore_emit(sdb_memstore_obj_t *obj, sdb_store_writer_t *w, sdb_object_t * } case SDB_METRIC: { + sdb_metric_store_t metric_stores[METRIC(obj)->stores_num]; sdb_store_metric_t metric = { obj->parent ? obj->parent->_name : NULL, obj->_name, - { - METRIC(obj)->store.type, - METRIC(obj)->store.id, - METRIC(obj)->store.last_update, - }, + metric_stores, + METRIC(obj)->stores_num, obj->last_update, obj->interval, (const char * const *)obj->backends, obj->backends_num, }; + size_t i; + + for (i = 0; i < METRIC(obj)->stores_num; ++i) { + metric_stores[i].type = METRIC(obj)->stores[i].type; + metric_stores[i].id = METRIC(obj)->stores[i].id; + metric_stores[i].last_update = METRIC(obj)->stores[i].last_update; + } + if (! w->store_metric) return -1; return w->store_metric(&metric, wd); diff --git a/src/core/plugin.c b/src/core/plugin.c index 92fa886..0bf785d 100644 --- a/src/core/plugin.c +++ b/src/core/plugin.c @@ -1735,9 +1735,8 @@ sdb_plugin_store_metric(const char *hostname, const char *name, if (store) { if (store->last_update < last_update) store->last_update = last_update; - metric.store.type = store->type; - metric.store.id = store->id; - metric.store.last_update = store->last_update; + metric.stores = store; + metric.stores_num = 1; } metric.last_update = last_update ? last_update : sdb_gettime(); if (get_interval(SDB_METRIC, cname, -1, NULL, name, diff --git a/src/core/store_json.c b/src/core/store_json.c index 9266608..56baa46 100644 --- a/src/core/store_json.c +++ b/src/core/store_json.c @@ -326,7 +326,7 @@ emit_metric(sdb_store_metric_t *metric, sdb_object_t *user_data) metric->name, /* value */ NULL, - /* timeseries */ metric->store.type != NULL, + /* timeseries */ metric->stores_num > 0, metric->last_update, metric->interval, diff --git a/src/frontend/query.c b/src/frontend/query.c index 3f33a99..302c586 100644 --- a/src/frontend/query.c +++ b/src/frontend/query.c @@ -65,15 +65,23 @@ static int metric_fetcher_metric(sdb_store_metric_t *metric, sdb_object_t *user_data) { metric_store_t *st = SDB_OBJ_WRAPPER(user_data)->data; + sdb_time_t last_update = 0; + size_t idx = 0, i; - if ((! metric->store.type) || (! metric->store.id)) - return 0; - - st->type = strdup(metric->store.type); - st->id = strdup(metric->store.id); - st->last_update = metric->store.last_update; - if ((! st->type) || (! st->id)) + if (! metric->stores_num) return -1; + + /* Find the most up to date data store. + * TODO: Consider merging multiple results? */ + for (i = 0; i < metric->stores_num; ++i) { + if (metric->stores[i].last_update > last_update) { + last_update = metric->stores[i].last_update; + idx = i; + } + } + st->type = strdup(metric->stores[idx].type); + st->id = strdup(metric->stores[idx].id); + st->last_update = metric->stores[idx].last_update; return 0; } /* metric_fetcher_metric */ diff --git a/src/include/core/store.h b/src/include/core/store.h index e432cab..3f20429 100644 --- a/src/include/core/store.h +++ b/src/include/core/store.h @@ -134,14 +134,17 @@ typedef struct { typedef struct { const char *hostname; const char *name; - sdb_metric_store_t store; + + /* All data stores providing this metric. */ + const sdb_metric_store_t *stores; + size_t stores_num; sdb_time_t last_update; sdb_time_t interval; const char * const *backends; size_t backends_num; } sdb_store_metric_t; -#define SDB_STORE_METRIC_INIT { NULL, NULL, { NULL, NULL, 0 }, 0, 0, NULL, 0 } +#define SDB_STORE_METRIC_INIT { NULL, NULL, NULL, 0, 0, 0, NULL, 0 } /* * sdb_store_attribute_t represents a stored attribute. diff --git a/src/plugins/store/network.c b/src/plugins/store/network.c index 77012b9..79df107 100644 --- a/src/plugins/store/network.c +++ b/src/plugins/store/network.c @@ -149,7 +149,10 @@ store_metric(sdb_store_metric_t *metric, sdb_object_t *user_data) { sdb_proto_metric_t m = { metric->last_update, metric->hostname, metric->name, - metric->store.type, metric->store.id, metric->store.last_update, + /* TODO: Add support for sending all data stores. */ + metric->stores_num ? metric->stores[0].type : NULL, + metric->stores_num ? metric->stores[0].id : NULL, + metric->stores_num ? metric->stores[0].last_update : 0, }; size_t len = sdb_proto_marshal_metric(NULL, 0, &m); char buf[len]; -- 2.30.2