diff --git a/src/write_kafka.c b/src/write_kafka.c
index 75da6aaafefdc6d2bacb3de0c1fae5bdc0eb0ef0..a719cd376d18a5cd6fb690117133977ddfff13fd 100644 (file)
--- a/src/write_kafka.c
+++ b/src/write_kafka.c
static int32_t kafka_partition(const rd_kafka_topic_t *, const void *, size_t,
int32_t, void *, void *);
-#if defined HAVE_LIBRDKAFKA_LOGGER || defined HAVE_LIBRDKAFKA_LOG_CB
+/* Version 0.9.0 of librdkafka deprecates rd_kafka_set_logger() in favor of
+ * rd_kafka_conf_set_log_cb(). This is to make sure we're not using the
+ * deprecated function. */
+#ifdef HAVE_LIBRDKAFKA_LOG_CB
+# undef HAVE_LIBRDKAFKA_LOGGER
+#endif
+
+#if defined(HAVE_LIBRDKAFKA_LOGGER) || defined(HAVE_LIBRDKAFKA_LOG_CB)
static void kafka_log(const rd_kafka_t *, int, const char *, const char *);
static void kafka_log(const rd_kafka_t *rkt, int level,
return hash;
}
+/* 31 bit -> 4 byte -> 8 byte hex string + null byte */
+#define KAFKA_RANDOM_KEY_SIZE 9
+#define KAFKA_RANDOM_KEY_BUFFER (char[KAFKA_RANDOM_KEY_SIZE]) {""}
+static char *kafka_random_key(char buffer[static KAFKA_RANDOM_KEY_SIZE])
+{
+ ssnprintf(buffer, KAFKA_RANDOM_KEY_SIZE, "%08lX", (unsigned long) mrand48());
+ return buffer;
+}
+
static int32_t kafka_partition(const rd_kafka_topic_t *rkt,
const void *keydata, size_t keylen,
int32_t partition_cnt, void *p, void *m)
switch (ctx->format) {
case KAFKA_FORMAT_COMMAND:
- status = create_putval(buffer, sizeof(buffer), ds, vl);
+ status = cmd_create_putval(buffer, sizeof(buffer), ds, vl);
if (status != 0) {
- ERROR("write_kafka plugin: create_putval failed with status %i.",
+ ERROR("write_kafka plugin: cmd_create_putval failed with status %i.",
status);
return status;
}
return -1;
}
- key = ctx->key;
- if (key != NULL)
- keylen = strlen (key);
- else
- keylen = 0;
+ key = (ctx->key != NULL)
+ ? ctx->key
+ : kafka_random_key(KAFKA_RANDOM_KEY_BUFFER);
+ keylen = strlen (key);
rd_kafka_produce(ctx->topic, RD_KAFKA_PARTITION_UA,
RD_KAFKA_MSG_F_COPY, buffer, blen,
@@ -319,8 +334,12 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{
}
} else if (strcasecmp ("Key", child->key) == 0) {
- cf_util_get_string (child, &tctx->key);
- assert (tctx->key != NULL);
+ if (cf_util_get_string (child, &tctx->key) != 0)
+ continue;
+ if (strcasecmp ("Random", tctx->key) == 0) {
+ sfree(tctx->key);
+ tctx->key = strdup (kafka_random_key (KAFKA_RANDOM_KEY_BUFFER));
+ }
} else if (strcasecmp ("Format", child->key) == 0) {
status = cf_util_get_string(child, &key);
if (status != 0)
@@ -357,6 +376,10 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{
status = cf_util_get_flag (child, &tctx->graphite_flags,
GRAPHITE_ALWAYS_APPEND_DS);
+ } else if (strcasecmp ("GraphitePreserveSeparator", child->key) == 0) {
+ status = cf_util_get_flag (child, &tctx->graphite_flags,
+ GRAPHITE_PRESERVE_SEPARATOR);
+
} else if (strcasecmp ("GraphitePrefix", child->key) == 0) {
status = cf_util_get_string (child, &tctx->prefix);
} else if (strcasecmp ("GraphitePostfix", child->key) == 0) {