diff --git a/src/write_kafka.c b/src/write_kafka.c
index 736fddb12a993f8aeaebea16a8a7e513351bda6a..a5371d4c0a933e40d3c2c9051babf5b39383909f 100644 (file)
--- a/src/write_kafka.c
+++ b/src/write_kafka.c
rd_kafka_topic_t *topic;
rd_kafka_conf_t *kafka_conf;
rd_kafka_t *kafka;
rd_kafka_topic_t *topic;
rd_kafka_conf_t *kafka_conf;
rd_kafka_t *kafka;
- int has_key;
- uint32_t key;
+ char *key;
char *prefix;
char *postfix;
char escape_char;
char *prefix;
char *postfix;
char escape_char;
user_data_t *ud)
{
int status = 0;
user_data_t *ud)
{
int status = 0;
- uint32_t key;
+ void *key;
+ size_t keylen = 0;
char buffer[8192];
size_t bfree = sizeof(buffer);
size_t bfill = 0;
char buffer[8192];
size_t bfree = sizeof(buffer);
size_t bfill = 0;
return -1;
}
return -1;
}
- /*
- * We partition our stream by metric name
- */
- if (ctx->has_key)
- key = ctx->key;
- else
- key = rand();
+ key = ctx->key;
+ if (key != NULL)
+ keylen = strlen (key);
rd_kafka_produce(ctx->topic, RD_KAFKA_PARTITION_UA,
RD_KAFKA_MSG_F_COPY, buffer, blen,
rd_kafka_produce(ctx->topic, RD_KAFKA_PARTITION_UA,
RD_KAFKA_MSG_F_COPY, buffer, blen,
- &key, sizeof(key), NULL);
+ key, keylen, NULL);
return status;
} /* }}} int kafka_write */
return status;
} /* }}} int kafka_write */
@@ -318,19 +314,7 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{
}
} else if (strcasecmp ("Key", child->key) == 0) {
}
} else if (strcasecmp ("Key", child->key) == 0) {
- char *tmp_buf = NULL;
- status = cf_util_get_string(child, &tmp_buf);
- if (status != 0) {
- WARNING("write_kafka plugin: invalid key supplied");
- break;
- }
-
- if (strcasecmp(tmp_buf, "Random") != 0) {
- tctx->has_key = 1;
- tctx->key = crc32_buffer((u_char *)tmp_buf, strlen(tmp_buf));
- }
- sfree(tmp_buf);
-
+ cf_util_get_string (child, &tctx->key);
} else if (strcasecmp ("Format", child->key) == 0) {
status = cf_util_get_string(child, &key);
if (status != 0)
} else if (strcasecmp ("Format", child->key) == 0) {
status = cf_util_get_string(child, &key);
if (status != 0)