From: Vincent Bernat Date: Wed, 10 Dec 2014 14:41:49 +0000 (+0100) Subject: write_kafka: check for partition availability before selecting one X-Git-Tag: collectd-5.5.0~104^2 X-Git-Url: https://git.tokkee.org/?a=commitdiff_plain;h=19c45a0a9443b55aa850d57c46a73349552b9835;p=collectd.git write_kafka: check for partition availability before selecting one When a partition is unavailable, sending to it will just lead to a lost metric. Therefore, after selecting the partition, check if it is available. If not, select the next one until we tried them all. A future iteration may use consistent hashing to avoid to double the work done on a partition when the previous one is unavailable. --- diff --git a/src/write_kafka.c b/src/write_kafka.c index ba76d71f..e1d2f122 100644 --- a/src/write_kafka.c +++ b/src/write_kafka.c @@ -76,8 +76,13 @@ static int32_t kafka_partition(const rd_kafka_topic_t *rkt, int32_t partition_cnt, void *p, void *m) { u_int32_t key = *((u_int32_t *)keydata ); + u_int32_t target = key % partition_cnt; + int32_t i = partition_cnt; - return key % partition_cnt; + while (--i > 0 && !rd_kafka_topic_partition_available(rkt, target)) { + target = (target + 1) % partition_cnt; + } + return target; } static int kafka_write(const data_set_t *ds, /* {{{ */