diff --git a/src/write_kafka.c b/src/write_kafka.c
index ba76d71f6c727e4f1b19a5514e74e1e6eb36926e..a2947d15b34fc84eedac2332a315128bf6a27c6b 100644 (file)
--- a/src/write_kafka.c
+++ b/src/write_kafka.c
int32_t partition_cnt, void *p, void *m)
{
u_int32_t key = *((u_int32_t *)keydata );
+ u_int32_t target = key % partition_cnt;
+ int32_t i = partition_cnt;
- return key % partition_cnt;
+ while (--i > 0 && !rd_kafka_topic_partition_available(rkt, target)) {
+ target = (target + 1) % partition_cnt;
+ }
+ return target;
}
static int kafka_write(const data_set_t *ds, /* {{{ */
@@ -240,7 +245,7 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{
goto errout;
}
key = child->values[0].value.string;
- val = child->values[0].value.string;
+ val = child->values[1].value.string;
ret = rd_kafka_topic_conf_set(tctx->conf,key, val,
errbuf, sizeof(errbuf));
if (ret != RD_KAFKA_CONF_OK) {