Code

Merge pull request #647 from manuelluis/mlsr/threshold-missing_time
[collectd.git] / src / write_kafka.c
index ec9009eb6c1a52a9b4d7dff20ac763fc00efa38f..97db42657055999fa114cd1c70ca9f882ba4325c 100644 (file)
@@ -28,7 +28,6 @@
 #include "utils_format_graphite.h"
 #include "utils_format_json.h"
 #include "utils_crc32.h"
-#include "riemann.pb-c.h"
 
 #include <sys/types.h>
 #include <librdkafka/rdkafka.h>
@@ -56,6 +55,13 @@ struct kafka_topic_context {
 static int kafka_write(const data_set_t *, const value_list_t *, user_data_t *);
 static int32_t kafka_partition(const rd_kafka_topic_t *, const void *, size_t,
                                int32_t, void *, void *);
+static void kafka_log(const rd_kafka_t *, int, const char *, const char *);
+
+static void kafka_log(const rd_kafka_t *rkt, int level,
+                      const char *fac, const char *msg)
+{
+    plugin_log(level, "%s", msg);
+}
 
 static int32_t kafka_partition(const rd_kafka_topic_t *rkt,
                                const void *keydata, size_t keylen,
@@ -170,6 +176,7 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{
     tctx->escape_char = '.';
     tctx->store_rates = 1;
 
+    rd_kafka_conf_set_log_cb(conf, kafka_log);
     if ((tctx->kafka = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                                     errbuf, sizeof(errbuf))) == NULL) {
         sfree(tctx);