From: Florian Forster Date: Thu, 3 Dec 2015 12:05:03 +0000 (+0100) Subject: ceph plugin: Rewrite handling of JSON state. X-Git-Tag: collectd-5.5.1~35 X-Git-Url: https://git.tokkee.org/?a=commitdiff_plain;h=e5a9d9dee9e5ad0d47a68723bb7aa33b65355214;p=collectd.git ceph plugin: Rewrite handling of JSON state. The previous code didn't handle non-numeric map values correctly, leaking state and resulting in ridiculously long key strings. This rewrite fixes this and adds a unit test to ensure that this is actually working as intended. Fixes: #1350 --- diff --git a/src/ceph.c b/src/ceph.c index 5b83adc7..251def9c 100644 --- a/src/ceph.c +++ b/src/ceph.c @@ -135,11 +135,10 @@ struct yajl_struct { node_handler_t handler; void * handler_arg; - struct { - char key[DATA_MAX_NAME_LEN]; - int key_len; - } state[YAJL_MAX_DEPTH]; - int depth; + + char *key; + char *stack[YAJL_MAX_DEPTH]; + size_t depth; }; typedef struct yajl_struct yajl_struct; @@ -267,69 +266,72 @@ static int ceph_cb_boolean(void *ctx, int bool_val) static int ceph_cb_number(void *ctx, const char *number_val, yajl_len_t number_len) { - yajl_struct *yajl = (yajl_struct*)ctx; + yajl_struct *state = (yajl_struct*) ctx; char buffer[number_len+1]; - int i, status; char key[2 * DATA_MAX_NAME_LEN]; _Bool latency_type = 0; + size_t i; + int status; memcpy(buffer, number_val, number_len); buffer[sizeof(buffer) - 1] = 0; - sstrncpy (key, yajl->state[0].key, sizeof (key)); - for (i = 1; i < yajl->depth; i++) + memset (key, 0, sizeof (key)); + for (i = 0; i < state->depth; i++) { - /* Special case for latency metrics. */ - if ((i == yajl->depth-1) - && ((strcmp(yajl->state[i].key,"avgcount") == 0) - || (strcmp(yajl->state[i].key,"sum") == 0))) - { - /* Super-special case for filestore:JournalWrBytes. For some - * reason, Ceph schema encodes this as a count/sum pair while all - * other "Bytes" data (excluding used/capacity bytes for OSD space) - * uses a single "Derive" type. To spare further confusion, keep - * this KPI as the same type of other "Bytes". Instead of keeping - * an "average" or "rate", use the "sum" in the pair and assign - * that to the derive value. */ - if (convert_special_metrics && (i >= 2) - && (strcmp("filestore", yajl->state[i-2].key) == 0) - && (strcmp("journal_wr_bytes", yajl->state[i-1].key) == 0) - && (strcmp("avgcount", yajl->state[i].key) == 0)) - { - DEBUG("ceph plugin: Skipping avgcount for filestore.JournalWrBytes"); - yajl->depth -= 1; - return CEPH_CB_CONTINUE; - } + if (state->stack[i] == NULL) + continue; - /* Don't add "avgcount" or "sum" to the key just yet. If the - * handler function signals RETRY_AVGCOUNT, we'll add it and try - * again. */ - latency_type = 1; - break; - } + if (strlen (key) != 0) + BUFFER_ADD (key, "."); + BUFFER_ADD (key, state->stack[i]); + } + + /* Special case for latency metrics. */ + if ((strcmp ("avgcount", state->key) == 0) + || (strcmp ("sum", state->key) == 0)) + { + latency_type = 1; + /* Super-special case for filestore.journal_wr_bytes.avgcount: For + * some reason, Ceph schema encodes this as a count/sum pair while all + * other "Bytes" data (excluding used/capacity bytes for OSD space) uses + * a single "Derive" type. To spare further confusion, keep this KPI as + * the same type of other "Bytes". Instead of keeping an "average" or + * "rate", use the "sum" in the pair and assign that to the derive + * value. */ + if (convert_special_metrics && (state->depth >= 2) + && (strcmp("filestore", state->stack[state->depth - 2]) == 0) + && (strcmp("journal_wr_bytes", state->stack[state->depth - 1]) == 0) + && (strcmp("avgcount", state->key) == 0)) + { + DEBUG("ceph plugin: Skipping avgcount for filestore.JournalWrBytes"); + return CEPH_CB_CONTINUE; + } + } + else /* not a latency type */ + { BUFFER_ADD (key, "."); - BUFFER_ADD (key, yajl->state[i].key); + BUFFER_ADD (key, state->key); } - status = yajl->handler(yajl->handler_arg, buffer, key); + status = state->handler(state->handler_arg, buffer, key); if((status == RETRY_AVGCOUNT) && latency_type) { /* Add previously skipped part of the key, either "avgcount" or "sum", * and try again. */ BUFFER_ADD (key, "."); - BUFFER_ADD (key, yajl->state[yajl->depth-1].key); + BUFFER_ADD (key, state->key); - status = yajl->handler(yajl->handler_arg, buffer, key); + status = state->handler(state->handler_arg, buffer, key); } - if(status == -ENOMEM) + if (status != 0) { - ERROR("ceph plugin: memory allocation failed"); + ERROR("ceph plugin: JSON handler failed with status %d.", status); return CEPH_CB_ABORT; } - yajl->depth -= 1; return CEPH_CB_CONTINUE; } @@ -341,37 +343,52 @@ static int ceph_cb_string(void *ctx, const unsigned char *string_val, static int ceph_cb_start_map(void *ctx) { + yajl_struct *state = (yajl_struct*) ctx; + + /* Push key to the stack */ + if (state->depth == YAJL_MAX_DEPTH) + return CEPH_CB_ABORT; + + state->stack[state->depth] = state->key; + state->depth++; + state->key = NULL; + return CEPH_CB_CONTINUE; } -static int -ceph_cb_map_key(void *ctx, const unsigned char *key, yajl_len_t string_len) +static int ceph_cb_end_map(void *ctx) { - yajl_struct *yajl = (yajl_struct*)ctx; + yajl_struct *state = (yajl_struct*) ctx; - if((yajl->depth+1) >= YAJL_MAX_DEPTH) - { - ERROR("ceph plugin: depth exceeds max, aborting."); + /* Pop key from the stack */ + if (state->depth == 0) return CEPH_CB_ABORT; - } - - char buffer[string_len+1]; - - memcpy(buffer, key, string_len); - buffer[sizeof(buffer) - 1] = 0; - snprintf(yajl->state[yajl->depth].key, sizeof(buffer), "%s", buffer); - yajl->state[yajl->depth].key_len = sizeof(buffer); - yajl->depth = (yajl->depth + 1); + sfree (state->key); + state->depth--; + state->key = state->stack[state->depth]; + state->stack[state->depth] = NULL; return CEPH_CB_CONTINUE; } -static int ceph_cb_end_map(void *ctx) +static int +ceph_cb_map_key(void *ctx, const unsigned char *key, yajl_len_t string_len) { - yajl_struct *yajl = (yajl_struct*)ctx; + yajl_struct *state = (yajl_struct*) ctx; + size_t sz = ((size_t) string_len) + 1; + + sfree (state->key); + state->key = malloc (sz); + if (state->key == NULL) + { + ERROR ("ceph plugin: malloc failed."); + return CEPH_CB_ABORT; + } + + memmove (state->key, key, sz - 1); + state->key[sz - 1] = 0; - yajl->depth = (yajl->depth - 1); return CEPH_CB_CONTINUE; } diff --git a/src/ceph_test.c b/src/ceph_test.c index ffe2c7f6..ae67125f 100644 --- a/src/ceph_test.c +++ b/src/ceph_test.c @@ -22,6 +22,131 @@ #include "ceph.c" /* sic */ #include "testing.h" +struct case_s +{ + char *key; + char *value; +}; +typedef struct case_s case_t; + +struct test_s +{ + case_t *cases; + size_t cases_num; +}; +typedef struct test_s test_t; + +static int test_handler(void *user, char const *val, char const *key) +{ + test_t *t = user; + size_t i; + + char status[1024]; + _Bool ok; + + /* special case for latency metrics. */ + if (strcmp ("filestore.example_latency", key) == 0) + return RETRY_AVGCOUNT; + + snprintf (status, sizeof (status), "unexpected call: test_handler(\"%s\") = \"%s\"", key, val); + ok = 0; + + for (i = 0; i < t->cases_num; i++) + { + if (strcmp (key, t->cases[i].key) != 0) + continue; + + if (strcmp (val, t->cases[i].value) != 0) + { + snprintf (status, sizeof (status), "test_handler(\"%s\") = \"%s\", want \"%s\"", key, val, t->cases[i].value); + ok = 0; + break; + } + + snprintf (status, sizeof (status), "test_handler(\"%s\") = \"%s\"", key, val); + ok = 1; + break; + } + + OK1(ok, status); + return ok ? 0 : -1; +} + +DEF_TEST(traverse_json) +{ + char const *json = "{\n" + " \"WBThrottle\": {\n" + " \"bytes_dirtied\": {\n" + " \"type\": 2,\n" + " \"description\": \"Dirty data\",\n" + " \"nick\": \"\"\n" + " },\n" + " \"bytes_wb\": {\n" + " \"type\": 2,\n" + " \"description\": \"Written data\",\n" + " \"nick\": \"\"\n" + " },\n" + " \"ios_dirtied\": {\n" + " \"type\": 2,\n" + " \"description\": \"Dirty operations\",\n" + " \"nick\": \"\"\n" + " },\n" + " \"ios_wb\": {\n" + " \"type\": 2,\n" + " \"description\": \"Written operations\",\n" + " \"nick\": \"\"\n" + " },\n" + " \"inodes_dirtied\": {\n" + " \"type\": 2,\n" + " \"description\": \"Entries waiting for write\",\n" + " \"nick\": \"\"\n" + " },\n" + " \"inodes_wb\": {\n" + " \"type\": 10,\n" + " \"description\": \"Written entries\",\n" + " \"nick\": \"\"\n" + " }\n" + " },\n" + " \"filestore\": {\n" + " \"journal_wr_bytes\": {\n" + " \"avgcount\": 23,\n" + " \"sum\": 3117\n" + " },\n" + " \"example_latency\": {\n" + " \"avgcount\": 42,\n" + " \"sum\": 4711\n" + " }\n" + " }\n" + "}\n"; + case_t cases[] = { + {"WBThrottle.bytes_dirtied.type", "2"}, + {"WBThrottle.bytes_wb.type", "2"}, + {"WBThrottle.ios_dirtied.type", "2"}, + {"WBThrottle.ios_wb.type", "2"}, + {"WBThrottle.inodes_dirtied.type", "2"}, + {"WBThrottle.inodes_wb.type", "10"}, + {"filestore.journal_wr_bytes", "3117"}, + {"filestore.example_latency.avgcount", "42"}, + {"filestore.example_latency.sum", "4711"}, + }; + test_t t = {cases, STATIC_ARRAY_SIZE (cases)}; + + yajl_struct ctx = {test_handler, &t}; + + yajl_handle hndl; +#if HAVE_YAJL_V2 + hndl = yajl_alloc (&callbacks, NULL, &ctx); + CHECK_ZERO (traverse_json ((unsigned char *) json, (uint32_t) strlen (json), hndl)); + CHECK_ZERO (yajl_complete_parse (hndl)); +#else + hndl = yajl_alloc (&callbacks, NULL, NULL, &ctx); + CHECK_ZERO (traverse_json ((unsigned char *) json, (uint32_t) strlen (json), hndl)); + CHECK_ZERO (yajl_parse_complete (hndl)); +#endif + + return 0; +} + DEF_TEST(parse_keys) { struct { @@ -37,6 +162,7 @@ DEF_TEST(parse_keys) {"aa.bb.cc.dd.ee.ff", "Aa.bb.cc.dd.ee.ff"}, {"aa.bb.cc.dd.ee.ff.type", "Aa.bb.cc.dd.ee.ff"}, {"aa.type", "Aa.type"}, + {"WBThrottle.bytes_dirtied.type", "WBThrottle.bytesDirtied"}, }; size_t i; @@ -56,6 +182,7 @@ DEF_TEST(parse_keys) int main (void) { + RUN_TEST(traverse_json); RUN_TEST(parse_keys); END_TEST;