summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: 5e0906a)
raw | patch | inline | side by side (parent: 5e0906a)
author | Florian Forster <octo@collectd.org> | |
Tue, 1 Dec 2015 20:08:49 +0000 (21:08 +0100) | ||
committer | Florian Forster <octo@collectd.org> | |
Tue, 1 Dec 2015 20:09:32 +0000 (21:09 +0100) |
Fixes: #1283
src/write_kafka.c | patch | blob | history |
diff --git a/src/write_kafka.c b/src/write_kafka.c
index a5977aba786770e1e29489b9590a26ed21561c0c..9bc958f9a1bb56e40ffcd1f934ae6a24f0070c5e 100644 (file)
--- a/src/write_kafka.c
+++ b/src/write_kafka.c
rd_kafka_topic_t *topic;
rd_kafka_conf_t *kafka_conf;
rd_kafka_t *kafka;
- int has_key;
- uint32_t key;
+ char *key;
char *prefix;
char *postfix;
char escape_char;
user_data_t *ud)
{
int status = 0;
- uint32_t key;
+ void *key;
+ size_t keylen = 0;
char buffer[8192];
size_t bfree = sizeof(buffer);
size_t bfill = 0;
return -1;
}
- /*
- * We partition our stream by metric name
- */
- if (ctx->has_key)
- key = ctx->key;
- else
- key = rand();
+ key = ctx->key;
+ if (key != NULL)
+ keylen = strlen (key);
rd_kafka_produce(ctx->topic, RD_KAFKA_PARTITION_UA,
RD_KAFKA_MSG_F_COPY, buffer, blen,
- &key, sizeof(key), NULL);
+ key, keylen, NULL);
return status;
} /* }}} int kafka_write */
@@ -318,19 +314,7 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{
}
} else if (strcasecmp ("Key", child->key) == 0) {
- char *tmp_buf = NULL;
- status = cf_util_get_string(child, &tmp_buf);
- if (status != 0) {
- WARNING("write_kafka plugin: invalid key supplied");
- break;
- }
-
- if (strcasecmp(tmp_buf, "Random") != 0) {
- tctx->has_key = 1;
- tctx->key = crc32_buffer((u_char *)tmp_buf, strlen(tmp_buf));
- }
- sfree(tmp_buf);
-
+ cf_util_get_string (child, &tctx->key);
} else if (strcasecmp ("Format", child->key) == 0) {
status = cf_util_get_string(child, &key);
if (status != 0)