Code

Add support for multiple metric data-stores.
authorSebastian Harl <sh@tokkee.org>
Mon, 5 Sep 2016 02:36:56 +0000 (22:36 -0400)
committerSebastian Harl <sh@tokkee.org>
Tue, 6 Sep 2016 02:21:10 +0000 (22:21 -0400)
It did not make sense to only store one and let conflicting values override
each other on each iteration. Instead, track each separately, along with the
respective last_update timestamp. This will allow to read the right data on
access.

At the moment, when requesting a timeseries, the most up to date data store
will be used.

The network plugin does not support sending multiple data stores at the
moment. It will only send the first.

src/core/memstore-private.h
src/core/memstore.c
src/core/plugin.c
src/core/store_json.c
src/frontend/query.c
src/include/core/store.h
src/plugins/store/network.c

index 1afb66ad736668038b0794a525ecc7657e595a34..784912bbe44223bf6613db9d4ff7653793dcd7d3 100644 (file)
@@ -80,15 +80,18 @@ typedef struct {
 #define SVC(obj) ((service_t *)(obj))
 #define CONST_SVC(obj) ((const service_t *)(obj))
 
+typedef struct {
+       char *type;
+       char *id;
+       sdb_time_t last_update;
+} metric_store_t;
 typedef struct {
        sdb_memstore_obj_t super;
 
        sdb_avltree_t *attributes;
-       struct {
-               char *type;
-               char *id;
-               sdb_time_t last_update;
-       } store;
+
+       metric_store_t *stores;
+       size_t stores_num;
 } metric_t;
 #define METRIC(obj) ((metric_t *)(obj))
 
index 317a1761491486cdb6295862383fab4fd6520a1d..87f9512745446d0f079ca6792fb12ce40168b1ca 100644 (file)
@@ -216,7 +216,8 @@ metric_init(sdb_object_t *obj, va_list ap)
        if (! sobj->attributes)
                return -1;
 
-       sobj->store.type = sobj->store.id = NULL;
+       sobj->stores = NULL;
+       sobj->stores_num = 0;
        return 0;
 } /* metric_init */
 
@@ -224,17 +225,24 @@ static void
 metric_destroy(sdb_object_t *obj)
 {
        metric_t *sobj = METRIC(obj);
-       assert(obj);
+       size_t i;
 
+       assert(obj);
        store_obj_destroy(obj);
 
        if (sobj->attributes)
                sdb_avltree_destroy(sobj->attributes);
 
-       if (sobj->store.type)
-               free(sobj->store.type);
-       if (sobj->store.id)
-               free(sobj->store.id);
+       for (i = 0; i < sobj->stores_num; ++i) {
+               if (sobj->stores[i].type)
+                       free(sobj->stores[i].type);
+               if (sobj->stores[i].id)
+                       free(sobj->stores[i].id);
+       }
+       if (sobj->stores)
+               free(sobj->stores);
+       sobj->stores = NULL;
+       sobj->stores_num = 0;
 } /* metric_destroy */
 
 static int
@@ -401,41 +409,82 @@ store_obj(store_obj_t *obj, sdb_memstore_obj_t **updated_obj)
 } /* store_obj */
 
 static int
-store_metric_store(metric_t *metric, sdb_store_metric_t *m)
+store_metric_update_store(metric_store_t *store,
+               const sdb_metric_store_t __attribute__((unused)) *s,
+               sdb_time_t last_update)
 {
-       char *type = metric->store.type;
-       char *id = metric->store.id;
-
-       if (! m->store.last_update)
-               m->store.last_update = metric->store.last_update;
-       else if (m->store.last_update < metric->store.last_update)
+       if (last_update <= store->last_update)
                return 0;
+       store->last_update = last_update;
+       return 0;
+} /* store_metric_update_store */
 
-       if ((! metric->store.type) || strcasecmp(metric->store.type, m->store.type)) {
-               if (! (type = strdup(m->store.type)))
-                       return -1;
-       }
-       if ((! metric->store.id) || strcasecmp(metric->store.id, m->store.id)) {
-               if (! (id = strdup(m->store.id))) {
-                       if (type != metric->store.type)
-                               free(type);
-                       return -1;
-               }
+static int
+store_metric_add_store(metric_t *metric, const sdb_metric_store_t *s,
+               sdb_time_t last_update)
+{
+       char *type = strdup(s->type);
+       char *id = strdup(s->id);
+
+       metric_store_t *new;
+
+       if ((! type) || (! id)) {
+               if (type)
+                       free(type);
+               if (id)
+                       free(id);
+               return -1;
        }
 
-       if (type != metric->store.type) {
-               if (metric->store.type)
-                       free(metric->store.type);
-               metric->store.type = type;
+       new = realloc(metric->stores,
+                       (metric->stores_num + 1) * sizeof(*metric->stores));
+       if (! new) {
+               free(type);
+               free(id);
+               return -1;
        }
-       if (id != metric->store.id) {
-               if (metric->store.id)
-                       free(metric->store.id);
-               metric->store.id = id;
+
+       metric->stores = new;
+       new = metric->stores + metric->stores_num;
+       metric->stores_num++;
+
+       new->type = type;
+       new->id = id;
+       new->last_update = last_update;
+       return 0;
+} /* store_metric_add_store */
+
+static int
+store_metric_stores(metric_t *metric, sdb_store_metric_t *m)
+{
+       size_t i;
+
+       if (! m->stores_num)
+               return 0;
+
+       for (i = 0; i < m->stores_num; ++i) {
+               sdb_time_t last_update = m->stores[i].last_update;
+               size_t j;
+
+               if (last_update < m->last_update)
+                       last_update = m->last_update;
+
+               for (j = 0; j < metric->stores_num; ++j) {
+                       if ((! strcasecmp(metric->stores[j].type, m->stores[i].type))
+                                       && (! strcasecmp(metric->stores[j].id, m->stores[i].id))) {
+                               if (store_metric_update_store(metric->stores + j,
+                                                       m->stores + i, last_update) < 0)
+                                       return -1;
+                               break;
+                       }
+               }
+
+               if (j >= metric->stores_num)
+                       if (store_metric_add_store(metric, m->stores + i, last_update) < 0)
+                               return -1;
        }
-       metric->store.last_update = m->store.last_update;
        return 0;
-} /* store_metric_store */
+} /* store_metric_stores */
 
 /* The store's host_lock has to be acquired before calling this function. */
 static sdb_avltree_t *
@@ -622,12 +671,14 @@ store_metric(sdb_store_metric_t *metric, sdb_object_t *user_data)
        host_t *host;
 
        int status = 0;
+       size_t i;
 
        if ((! metric) || (! metric->hostname) || (! metric->name))
                return -1;
 
-       if ((metric->store.type != NULL) != (metric->store.id != NULL))
-               return -1;
+       for (i = 0; i < metric->stores_num; ++i)
+               if ((metric->stores[i].type == NULL) || (metric->stores[i].id == NULL))
+                       return -1;
 
        pthread_rwlock_wrlock(&st->host_lock);
        host = HOST(sdb_avltree_lookup(st->hosts, metric->hostname));
@@ -655,9 +706,8 @@ store_metric(sdb_store_metric_t *metric, sdb_object_t *user_data)
        }
 
        assert(new);
-       if (metric->store.type && metric->store.id)
-               if (store_metric_store(METRIC(new), metric))
-                       status = -1;
+       if (store_metric_stores(METRIC(new), metric))
+               status = -1;
        pthread_rwlock_unlock(&st->host_lock);
        return status;
 } /* store_metric */
@@ -727,14 +777,16 @@ sdb_memstore_metric(sdb_memstore_t *store, const char *hostname, const char *nam
                sdb_time_t last_update, sdb_time_t interval)
 {
        sdb_store_metric_t metric = {
-               hostname, name,
-               { NULL, NULL, 0 },
+               hostname, name, /* stores */ NULL, 0,
                last_update, interval, NULL, 0,
        };
        if (metric_store) {
-               metric.store.type = metric_store->type;
-               metric.store.id = metric_store->id;
-               metric.store.last_update = metric_store->last_update;
+               metric.stores = &(const sdb_metric_store_t){
+                       metric_store->type,
+                       metric_store->id,
+                       metric_store->last_update,
+               };
+               metric.stores_num = 1;
        }
        return store_metric(&metric, SDB_OBJ(store));
 } /* sdb_memstore_metric */
@@ -860,7 +912,7 @@ sdb_memstore_get_field(sdb_memstore_obj_t *obj, int field, sdb_data_t *res)
                        if (obj->type != SDB_METRIC)
                                return -1;
                        tmp.type = SDB_TYPE_BOOLEAN;
-                       tmp.data.boolean = METRIC(obj)->store.type != NULL;
+                       tmp.data.boolean = METRIC(obj)->stores_num > 0;
                default:
                        return -1;
        }
@@ -1002,19 +1054,25 @@ sdb_memstore_emit(sdb_memstore_obj_t *obj, sdb_store_writer_t *w, sdb_object_t *
                }
        case SDB_METRIC:
                {
+                       sdb_metric_store_t metric_stores[METRIC(obj)->stores_num];
                        sdb_store_metric_t metric = {
                                obj->parent ? obj->parent->_name : NULL,
                                obj->_name,
-                               {
-                                       METRIC(obj)->store.type,
-                                       METRIC(obj)->store.id,
-                                       METRIC(obj)->store.last_update,
-                               },
+                               metric_stores,
+                               METRIC(obj)->stores_num,
                                obj->last_update,
                                obj->interval,
                                (const char * const *)obj->backends,
                                obj->backends_num,
                        };
+                       size_t i;
+
+                       for (i = 0; i < METRIC(obj)->stores_num; ++i) {
+                               metric_stores[i].type = METRIC(obj)->stores[i].type;
+                               metric_stores[i].id = METRIC(obj)->stores[i].id;
+                               metric_stores[i].last_update = METRIC(obj)->stores[i].last_update;
+                       }
+
                        if (! w->store_metric)
                                return -1;
                        return w->store_metric(&metric, wd);
index 92fa8866981f6e5b4623100498c04d7e5567f0d8..0bf785d571a29b57ac9ecba7d9f34dd1dc6d6b5a 100644 (file)
@@ -1735,9 +1735,8 @@ sdb_plugin_store_metric(const char *hostname, const char *name,
        if (store) {
                if (store->last_update < last_update)
                        store->last_update = last_update;
-               metric.store.type = store->type;
-               metric.store.id = store->id;
-               metric.store.last_update = store->last_update;
+               metric.stores = store;
+               metric.stores_num = 1;
        }
        metric.last_update = last_update ? last_update : sdb_gettime();
        if (get_interval(SDB_METRIC, cname, -1, NULL, name,
index 92666089870016beea0b0307f846d458b258b8fc..56baa46d476d33f9f83974046ecbb982d375a2fa 100644 (file)
@@ -326,7 +326,7 @@ emit_metric(sdb_store_metric_t *metric, sdb_object_t *user_data)
                        metric->name,
 
                        /* value */ NULL,
-                       /* timeseries */ metric->store.type != NULL,
+                       /* timeseries */ metric->stores_num > 0,
 
                        metric->last_update,
                        metric->interval,
index 3f33a9914b7bfd342727f73e4ae74134ef9c1b11..302c586a6c227de98b7e3ae035f58224a135a7de 100644 (file)
@@ -65,15 +65,23 @@ static int
 metric_fetcher_metric(sdb_store_metric_t *metric, sdb_object_t *user_data)
 {
        metric_store_t *st = SDB_OBJ_WRAPPER(user_data)->data;
+       sdb_time_t last_update = 0;
+       size_t idx = 0, i;
 
-       if ((! metric->store.type) || (! metric->store.id))
-               return 0;
-
-       st->type = strdup(metric->store.type);
-       st->id = strdup(metric->store.id);
-       st->last_update = metric->store.last_update;
-       if ((! st->type) || (! st->id))
+       if (! metric->stores_num)
                return -1;
+
+       /* Find the most up to date data store.
+        * TODO: Consider merging multiple results? */
+       for (i = 0; i < metric->stores_num; ++i) {
+               if (metric->stores[i].last_update > last_update) {
+                       last_update = metric->stores[i].last_update;
+                       idx = i;
+               }
+       }
+       st->type = strdup(metric->stores[idx].type);
+       st->id = strdup(metric->stores[idx].id);
+       st->last_update = metric->stores[idx].last_update;
        return 0;
 } /* metric_fetcher_metric */
 
index e432cab428130252e517b72b0855fc210468c5c7..3f20429b9f3e37eccdf40c2938c111ad163203b1 100644 (file)
@@ -134,14 +134,17 @@ typedef struct {
 typedef struct {
        const char *hostname;
        const char *name;
-       sdb_metric_store_t store;
+
+       /* All data stores providing this metric. */
+       const sdb_metric_store_t *stores;
+       size_t stores_num;
 
        sdb_time_t last_update;
        sdb_time_t interval;
        const char * const *backends;
        size_t backends_num;
 } sdb_store_metric_t;
-#define SDB_STORE_METRIC_INIT { NULL, NULL, { NULL, NULL, 0 }, 0, 0, NULL, 0 }
+#define SDB_STORE_METRIC_INIT { NULL, NULL, NULL, 0, 0, 0, NULL, 0 }
 
 /*
  * sdb_store_attribute_t represents a stored attribute.
index 77012b95bf9cab931c6a02e7f06685cd57e0c68d..79df1070d1ba10e02b054b5ed3769cd0c0d8ec91 100644 (file)
@@ -149,7 +149,10 @@ store_metric(sdb_store_metric_t *metric, sdb_object_t *user_data)
 {
        sdb_proto_metric_t m = {
                metric->last_update, metric->hostname, metric->name,
-               metric->store.type, metric->store.id, metric->store.last_update,
+               /* TODO: Add support for sending all data stores. */
+               metric->stores_num ? metric->stores[0].type : NULL,
+               metric->stores_num ? metric->stores[0].id : NULL,
+               metric->stores_num ? metric->stores[0].last_update : 0,
        };
        size_t len = sdb_proto_marshal_metric(NULL, 0, &m);
        char buf[len];