summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: eca80da)
raw | patch | inline | side by side (parent: eca80da)
author | Pierre-Yves Ritschard <pyr@spootnik.org> | |
Thu, 31 Jul 2014 07:58:06 +0000 (09:58 +0200) | ||
committer | Pierre-Yves Ritschard <pyr@spootnik.org> | |
Thu, 31 Jul 2014 07:58:06 +0000 (09:58 +0200) |
configure.ac | patch | blob | history | |
src/write_kafka.c | patch | blob | history |
diff --git a/configure.ac b/configure.ac
index 59f05710246ec36a66a2ea325f35551b70e039b6..6e04af16b7b116a19f74d27e01d366162083366e 100644 (file)
--- a/configure.ac
+++ b/configure.ac
if test "x$with_librdkafka" = "xyes"
then
AC_CHECK_LIB(rdkafka, rd_kafka_new, [with_librdkafka="yes"], [with_librdkafka="no (Symbol 'rd_kafka_new' not found)"])
+ AC_CHECK_LIB(rdkafka, rd_kafka_conf_set_log_cb, [with_librdkafka_log="yes"], [with_librdkafka_log="no (Symbol 'rd_kafka_conf_set_log_cb not found)"])
fi
if test "x$with_librdkafka" = "xyes"
then
AC_SUBST(BUILD_WITH_LIBRDKAFKA_LDFLAGS)
AC_SUBST(BUILD_WITH_LIBRDKAFKA_LIBS)
AC_DEFINE(HAVE_LIBRDKAFKA, 1, [Define if librdkafka is present and usable.])
+ if test "x$with_librdkafka_log" = "xyes"
+ then
+ AC_DEFINE(HAVE_LIBRDKAFKA_LOG, 1, [Define if librdkafka log facility is present and usable.])
+ fi
fi
CPPFLAGS="$SAVE_CPPFLAGS"
LDFLAGS="$SAVE_LDFLAGS"
diff --git a/src/write_kafka.c b/src/write_kafka.c
index b74fe97ddc6ecde6d50bd3d94c981234191516b5..ff3176dd8dc3240a22733e1ba9019083a67ec243 100644 (file)
--- a/src/write_kafka.c
+++ b/src/write_kafka.c
static int kafka_write(const data_set_t *, const value_list_t *, user_data_t *);
static int32_t kafka_partition(const rd_kafka_topic_t *, const void *, size_t,
int32_t, void *, void *);
+
+#ifdef HAVE_LIBRDKAFKA_LOG
static void kafka_log(const rd_kafka_t *, int, const char *, const char *);
static void kafka_log(const rd_kafka_t *rkt, int level,
plugin_log(level, "%s", msg);
}
+#endif
+
static int32_t kafka_partition(const rd_kafka_topic_t *rkt,
const void *keydata, size_t keylen,
int32_t partition_cnt, void *p, void *m)
@@ -182,7 +186,12 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{
tctx->store_rates = 1;
tctx->format = KAFKA_FORMAT_JSON;
+#ifdef HAVE_LIBRDKAFKA_LOG
+ /*
+ * Some versions of rdkafka do not allow setting a log callback.
+ */
rd_kafka_conf_set_log_cb(conf, kafka_log);
+#endif
if ((tctx->kafka = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
errbuf, sizeof(errbuf))) == NULL) {
sfree(tctx);