diff --git a/src/write_kafka.c b/src/write_kafka.c
index c5c7e3df9b3bac47b78b3fd216b13655b308b2c1..424acac95044f4f5c0bac57b52d5b772ddbe19c9 100644 (file)
--- a/src/write_kafka.c
+++ b/src/write_kafka.c
#include "utils_format_json.h"
#include "utils_crc32.h"
-#include <sys/types.h>
+#include <stdint.h>
#include <librdkafka/rdkafka.h>
#include <pthread.h>
#include <zlib.h>
#define KAFKA_FORMAT_JSON 0
#define KAFKA_FORMAT_COMMAND 1
#define KAFKA_FORMAT_GRAPHITE 2
- u_int8_t format;
+ uint8_t format;
unsigned int graphite_flags;
_Bool store_rates;
rd_kafka_topic_conf_t *conf;
rd_kafka_conf_t *kafka_conf;
rd_kafka_t *kafka;
int has_key;
- u_int32_t key;
+ uint32_t key;
char *prefix;
char *postfix;
char escape_char;
const void *keydata, size_t keylen,
int32_t partition_cnt, void *p, void *m)
{
- u_int32_t key = *((u_int32_t *)keydata );
- u_int32_t target = key % partition_cnt;
+ uint32_t key = *((uint32_t *)keydata );
+ uint32_t target = key % partition_cnt;
int32_t i = partition_cnt;
while (--i > 0 && !rd_kafka_topic_partition_available(rkt, target)) {
}
rd_kafka_conf_destroy(ctx->kafka_conf);
+ ctx->kafka_conf = NULL;
+
INFO ("write_kafka plugin: created KAFKA handle : %s", rd_kafka_name(ctx->kafka));
#ifdef HAVE_LIBRDKAFKA_LOGGER
}
rd_kafka_topic_conf_destroy(ctx->conf);
+ ctx->conf = NULL;
+
INFO ("write_kafka plugin: handle created for topic : %s", rd_kafka_topic_name(ctx->topic));
}
user_data_t *ud)
{
int status = 0;
- u_int32_t key;
+ uint32_t key;
char buffer[8192];
size_t bfree = sizeof(buffer);
size_t bfill = 0;
}
if (conf != NULL)
rd_kafka_conf_destroy(conf);
- return (0);
+ return (0);
errout:
if (conf != NULL)
rd_kafka_conf_destroy(conf);