1 /**
2 * collectd - src/write_kafka.c
3 * Copyright (C) 2014 Pierre-Yves Ritschard
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a
6 * copy of this software and associated documentation files (the "Software"),
7 * to deal in the Software without restriction, including without limitation
8 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9 * and/or sell copies of the Software, and to permit persons to whom the
10 * Software is furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21 * DEALINGS IN THE SOFTWARE.
22 *
23 * Authors:
24 * Pierre-Yves Ritschard <pyr at spootnik.org>
25 */
#include "collectd.h"

#include "common.h"
#include "plugin.h"
#include "utils_cmd_putval.h"
#include "utils_format_graphite.h"
#include "utils_format_json.h"
#include "utils_random.h"

#include <errno.h>
#include <librdkafka/rdkafka.h>
#include <stdint.h>
#include <string.h>
/* Per-topic state: one context is allocated per <Topic> block and handed to
 * the write callback as user data. Access is serialized via `lock`. */
struct kafka_topic_context {
#define KAFKA_FORMAT_JSON 0
#define KAFKA_FORMAT_COMMAND 1
#define KAFKA_FORMAT_GRAPHITE 2
  uint8_t format;              /* one of the KAFKA_FORMAT_* values above */
  unsigned int graphite_flags; /* GRAPHITE_* flags for format_graphite() */
  _Bool store_rates;           /* convert counters to rates in JSON output */
  rd_kafka_topic_conf_t *conf; /* topic conf template; NULL once topic made */
  rd_kafka_topic_t *topic;     /* created lazily in kafka_handle() */
  rd_kafka_conf_t *kafka_conf; /* handle conf template; NULL once handle made */
  rd_kafka_t *kafka;           /* producer handle, created lazily */
  char *key;                   /* fixed partition key, or NULL for random */
  char *prefix;                /* Graphite metric prefix (may be NULL) */
  char *postfix;               /* Graphite metric postfix (may be NULL) */
  char escape_char;            /* Graphite escape character, default '.' */
  char *topic_name;            /* kafka topic to produce to */
  pthread_mutex_t lock;        /* guards lazy handle/topic creation */
};
/* Forward declarations for the callbacks wired up during configuration. */
static int kafka_handle(struct kafka_topic_context *);
static int kafka_write(const data_set_t *, const value_list_t *, user_data_t *);
static int32_t kafka_partition(const rd_kafka_topic_t *, const void *, size_t,
                               int32_t, void *, void *);

/* Version 0.9.0 of librdkafka deprecates rd_kafka_set_logger() in favor of
 * rd_kafka_conf_set_log_cb(). This is to make sure we're not using the
 * deprecated function. */
#ifdef HAVE_LIBRDKAFKA_LOG_CB
#undef HAVE_LIBRDKAFKA_LOGGER
#endif
#if defined(HAVE_LIBRDKAFKA_LOGGER) || defined(HAVE_LIBRDKAFKA_LOG_CB)
static void kafka_log(const rd_kafka_t *, int, const char *, const char *);

/* Bridges librdkafka log output into collectd's logging facility.
 * `rkt` and `fac` (facility) are ignored; `level` is forwarded as-is
 * to plugin_log(). */
static void kafka_log(const rd_kafka_t *rkt, int level, const char *fac,
                      const char *msg) {
  plugin_log(level, "%s", msg);
}
#endif
/* djb2-style string hash (h = h * 33 + c), folding the key bytes in
 * reverse order (last byte first). Used to pick a partition for a key. */
static uint32_t kafka_hash(const char *keydata, size_t keylen) {
  uint32_t h = 5381;
  while (keylen-- > 0)
    h = (h * 33) + (uint32_t)keydata[keylen];
  return h;
}
/* 31 bit -> 4 byte -> 8 byte hex string + null byte */
#define KAFKA_RANDOM_KEY_SIZE 9
/* Compound literal: yields a fresh zero-initialized char buffer at every
 * use site, so kafka_random_key() can be called without a named array. */
#define KAFKA_RANDOM_KEY_BUFFER \
  (char[KAFKA_RANDOM_KEY_SIZE]) { "" }

/* Writes a random value (from collectd's cdrand_u()) as an 8-character
 * uppercase hex string into `buffer` and returns `buffer`. */
static char *kafka_random_key(char buffer[static KAFKA_RANDOM_KEY_SIZE]) {
  ssnprintf(buffer, KAFKA_RANDOM_KEY_SIZE, "%08" PRIX32, cdrand_u());
  return buffer;
}
96 static int32_t kafka_partition(const rd_kafka_topic_t *rkt, const void *keydata,
97 size_t keylen, int32_t partition_cnt, void *p,
98 void *m) {
99 uint32_t key = kafka_hash(keydata, keylen);
100 uint32_t target = key % partition_cnt;
101 int32_t i = partition_cnt;
103 while (--i > 0 && !rd_kafka_topic_partition_available(rkt, target)) {
104 target = (target + 1) % partition_cnt;
105 }
106 return target;
107 }
109 static int kafka_handle(struct kafka_topic_context *ctx) /* {{{ */
110 {
111 char errbuf[1024];
112 rd_kafka_conf_t *conf;
113 rd_kafka_topic_conf_t *topic_conf;
115 if (ctx->kafka != NULL && ctx->topic != NULL)
116 return (0);
118 if (ctx->kafka == NULL) {
119 if ((conf = rd_kafka_conf_dup(ctx->kafka_conf)) == NULL) {
120 ERROR("write_kafka plugin: cannot duplicate kafka config");
121 return (1);
122 }
124 if ((ctx->kafka = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errbuf,
125 sizeof(errbuf))) == NULL) {
126 ERROR("write_kafka plugin: cannot create kafka handle.");
127 return 1;
128 }
130 rd_kafka_conf_destroy(ctx->kafka_conf);
131 ctx->kafka_conf = NULL;
133 INFO("write_kafka plugin: created KAFKA handle : %s",
134 rd_kafka_name(ctx->kafka));
136 #if defined(HAVE_LIBRDKAFKA_LOGGER) && !defined(HAVE_LIBRDKAFKA_LOG_CB)
137 rd_kafka_set_logger(ctx->kafka, kafka_log);
138 #endif
139 }
141 if (ctx->topic == NULL) {
142 if ((topic_conf = rd_kafka_topic_conf_dup(ctx->conf)) == NULL) {
143 ERROR("write_kafka plugin: cannot duplicate kafka topic config");
144 return 1;
145 }
147 if ((ctx->topic = rd_kafka_topic_new(ctx->kafka, ctx->topic_name,
148 topic_conf)) == NULL) {
149 ERROR("write_kafka plugin: cannot create topic : %s\n",
150 rd_kafka_err2str(rd_kafka_errno2err(errno)));
151 return errno;
152 }
154 rd_kafka_topic_conf_destroy(ctx->conf);
155 ctx->conf = NULL;
157 INFO("write_kafka plugin: handle created for topic : %s",
158 rd_kafka_topic_name(ctx->topic));
159 }
161 return (0);
163 } /* }}} int kafka_handle */
165 static int kafka_write(const data_set_t *ds, /* {{{ */
166 const value_list_t *vl, user_data_t *ud) {
167 int status = 0;
168 void *key;
169 size_t keylen = 0;
170 char buffer[8192];
171 size_t bfree = sizeof(buffer);
172 size_t bfill = 0;
173 size_t blen = 0;
174 struct kafka_topic_context *ctx = ud->data;
176 if ((ds == NULL) || (vl == NULL) || (ctx == NULL))
177 return EINVAL;
179 pthread_mutex_lock(&ctx->lock);
180 status = kafka_handle(ctx);
181 pthread_mutex_unlock(&ctx->lock);
182 if (status != 0)
183 return status;
185 bzero(buffer, sizeof(buffer));
187 switch (ctx->format) {
188 case KAFKA_FORMAT_COMMAND:
189 status = create_putval(buffer, sizeof(buffer), ds, vl);
190 if (status != 0) {
191 ERROR("write_kafka plugin: create_putval failed with status %i.", status);
192 return status;
193 }
194 blen = strlen(buffer);
195 break;
196 case KAFKA_FORMAT_JSON:
197 format_json_initialize(buffer, &bfill, &bfree);
198 format_json_value_list(buffer, &bfill, &bfree, ds, vl, ctx->store_rates);
199 format_json_finalize(buffer, &bfill, &bfree);
200 blen = strlen(buffer);
201 break;
202 case KAFKA_FORMAT_GRAPHITE:
203 status =
204 format_graphite(buffer, sizeof(buffer), ds, vl, ctx->prefix,
205 ctx->postfix, ctx->escape_char, ctx->graphite_flags);
206 if (status != 0) {
207 ERROR("write_kafka plugin: format_graphite failed with status %i.",
208 status);
209 return status;
210 }
211 blen = strlen(buffer);
212 break;
213 default:
214 ERROR("write_kafka plugin: invalid format %i.", ctx->format);
215 return -1;
216 }
218 key =
219 (ctx->key != NULL) ? ctx->key : kafka_random_key(KAFKA_RANDOM_KEY_BUFFER);
220 keylen = strlen(key);
222 rd_kafka_produce(ctx->topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
223 buffer, blen, key, keylen, NULL);
225 return status;
226 } /* }}} int kafka_write */
228 static void kafka_topic_context_free(void *p) /* {{{ */
229 {
230 struct kafka_topic_context *ctx = p;
232 if (ctx == NULL)
233 return;
235 if (ctx->topic_name != NULL)
236 sfree(ctx->topic_name);
237 if (ctx->topic != NULL)
238 rd_kafka_topic_destroy(ctx->topic);
239 if (ctx->conf != NULL)
240 rd_kafka_topic_conf_destroy(ctx->conf);
241 if (ctx->kafka_conf != NULL)
242 rd_kafka_conf_destroy(ctx->kafka_conf);
243 if (ctx->kafka != NULL)
244 rd_kafka_destroy(ctx->kafka);
246 sfree(ctx);
247 } /* }}} void kafka_topic_context_free */
249 static void kafka_config_topic(rd_kafka_conf_t *conf,
250 oconfig_item_t *ci) /* {{{ */
251 {
252 int status;
253 struct kafka_topic_context *tctx;
254 char *key = NULL;
255 char *val;
256 char callback_name[DATA_MAX_NAME_LEN];
257 char errbuf[1024];
258 oconfig_item_t *child;
259 rd_kafka_conf_res_t ret;
261 if ((tctx = calloc(1, sizeof(*tctx))) == NULL) {
262 ERROR("write_kafka plugin: calloc failed.");
263 return;
264 }
266 tctx->escape_char = '.';
267 tctx->store_rates = 1;
268 tctx->format = KAFKA_FORMAT_JSON;
269 tctx->key = NULL;
271 if ((tctx->kafka_conf = rd_kafka_conf_dup(conf)) == NULL) {
272 sfree(tctx);
273 ERROR("write_kafka plugin: cannot allocate memory for kafka config");
274 return;
275 }
277 #ifdef HAVE_LIBRDKAFKA_LOG_CB
278 rd_kafka_conf_set_log_cb(tctx->kafka_conf, kafka_log);
279 #endif
281 if ((tctx->conf = rd_kafka_topic_conf_new()) == NULL) {
282 rd_kafka_conf_destroy(tctx->kafka_conf);
283 sfree(tctx);
284 ERROR("write_kafka plugin: cannot create topic configuration.");
285 return;
286 }
288 if (ci->values_num != 1) {
289 WARNING("kafka topic name needed.");
290 goto errout;
291 }
293 if (ci->values[0].type != OCONFIG_TYPE_STRING) {
294 WARNING("kafka topic needs a string argument.");
295 goto errout;
296 }
298 if ((tctx->topic_name = strdup(ci->values[0].value.string)) == NULL) {
299 ERROR("write_kafka plugin: cannot copy topic name.");
300 goto errout;
301 }
303 for (int i = 0; i < ci->children_num; i++) {
304 /*
305 * The code here could be simplified but makes room
306 * for easy adding of new options later on.
307 */
308 child = &ci->children[i];
309 status = 0;
311 if (strcasecmp("Property", child->key) == 0) {
312 if (child->values_num != 2) {
313 WARNING("kafka properties need both a key and a value.");
314 goto errout;
315 }
316 if (child->values[0].type != OCONFIG_TYPE_STRING ||
317 child->values[1].type != OCONFIG_TYPE_STRING) {
318 WARNING("kafka properties needs string arguments.");
319 goto errout;
320 }
321 key = child->values[0].value.string;
322 val = child->values[1].value.string;
323 ret =
324 rd_kafka_topic_conf_set(tctx->conf, key, val, errbuf, sizeof(errbuf));
325 if (ret != RD_KAFKA_CONF_OK) {
326 WARNING("cannot set kafka topic property %s to %s: %s.", key, val,
327 errbuf);
328 goto errout;
329 }
331 } else if (strcasecmp("Key", child->key) == 0) {
332 if (cf_util_get_string(child, &tctx->key) != 0)
333 continue;
334 if (strcasecmp("Random", tctx->key) == 0) {
335 sfree(tctx->key);
336 tctx->key = strdup(kafka_random_key(KAFKA_RANDOM_KEY_BUFFER));
337 }
338 } else if (strcasecmp("Format", child->key) == 0) {
339 status = cf_util_get_string(child, &key);
340 if (status != 0)
341 goto errout;
343 assert(key != NULL);
345 if (strcasecmp(key, "Command") == 0) {
346 tctx->format = KAFKA_FORMAT_COMMAND;
348 } else if (strcasecmp(key, "Graphite") == 0) {
349 tctx->format = KAFKA_FORMAT_GRAPHITE;
351 } else if (strcasecmp(key, "Json") == 0) {
352 tctx->format = KAFKA_FORMAT_JSON;
354 } else {
355 WARNING("write_kafka plugin: Invalid format string: %s", key);
356 }
358 sfree(key);
360 } else if (strcasecmp("StoreRates", child->key) == 0) {
361 status = cf_util_get_boolean(child, &tctx->store_rates);
362 (void)cf_util_get_flag(child, &tctx->graphite_flags,
363 GRAPHITE_STORE_RATES);
365 } else if (strcasecmp("GraphiteSeparateInstances", child->key) == 0) {
366 status = cf_util_get_flag(child, &tctx->graphite_flags,
367 GRAPHITE_SEPARATE_INSTANCES);
369 } else if (strcasecmp("GraphiteAlwaysAppendDS", child->key) == 0) {
370 status = cf_util_get_flag(child, &tctx->graphite_flags,
371 GRAPHITE_ALWAYS_APPEND_DS);
373 } else if (strcasecmp("GraphitePrefix", child->key) == 0) {
374 status = cf_util_get_string(child, &tctx->prefix);
375 } else if (strcasecmp("GraphitePostfix", child->key) == 0) {
376 status = cf_util_get_string(child, &tctx->postfix);
377 } else if (strcasecmp("GraphiteEscapeChar", child->key) == 0) {
378 char *tmp_buff = NULL;
379 status = cf_util_get_string(child, &tmp_buff);
380 if (strlen(tmp_buff) > 1)
381 WARNING("write_kafka plugin: The option \"GraphiteEscapeChar\" handles "
382 "only one character. Others will be ignored.");
383 tctx->escape_char = tmp_buff[0];
384 sfree(tmp_buff);
385 } else {
386 WARNING("write_kafka plugin: Invalid directive: %s.", child->key);
387 }
389 if (status != 0)
390 break;
391 }
393 rd_kafka_topic_conf_set_partitioner_cb(tctx->conf, kafka_partition);
394 rd_kafka_topic_conf_set_opaque(tctx->conf, tctx);
396 ssnprintf(callback_name, sizeof(callback_name), "write_kafka/%s",
397 tctx->topic_name);
399 user_data_t ud = {.data = tctx, .free_func = kafka_topic_context_free};
401 status = plugin_register_write(callback_name, kafka_write, &ud);
402 if (status != 0) {
403 WARNING("write_kafka plugin: plugin_register_write (\"%s\") "
404 "failed with status %i.",
405 callback_name, status);
406 goto errout;
407 }
409 pthread_mutex_init(&tctx->lock, /* attr = */ NULL);
411 return;
412 errout:
413 if (tctx->topic_name != NULL)
414 free(tctx->topic_name);
415 if (tctx->conf != NULL)
416 rd_kafka_topic_conf_destroy(tctx->conf);
417 if (tctx->kafka_conf != NULL)
418 rd_kafka_conf_destroy(tctx->kafka_conf);
419 sfree(tctx);
420 } /* }}} int kafka_config_topic */
422 static int kafka_config(oconfig_item_t *ci) /* {{{ */
423 {
424 oconfig_item_t *child;
425 rd_kafka_conf_t *conf;
426 rd_kafka_conf_res_t ret;
427 char errbuf[1024];
429 if ((conf = rd_kafka_conf_new()) == NULL) {
430 WARNING("cannot allocate kafka configuration.");
431 return -1;
432 }
433 for (int i = 0; i < ci->children_num; i++) {
434 child = &ci->children[i];
436 if (strcasecmp("Topic", child->key) == 0) {
437 kafka_config_topic(conf, child);
438 } else if (strcasecmp(child->key, "Property") == 0) {
439 char *key = NULL;
440 char *val = NULL;
442 if (child->values_num != 2) {
443 WARNING("kafka properties need both a key and a value.");
444 goto errout;
445 }
446 if (child->values[0].type != OCONFIG_TYPE_STRING ||
447 child->values[1].type != OCONFIG_TYPE_STRING) {
448 WARNING("kafka properties needs string arguments.");
449 goto errout;
450 }
451 if ((key = strdup(child->values[0].value.string)) == NULL) {
452 WARNING("cannot allocate memory for attribute key.");
453 goto errout;
454 }
455 if ((val = strdup(child->values[1].value.string)) == NULL) {
456 WARNING("cannot allocate memory for attribute value.");
457 sfree(key);
458 goto errout;
459 }
460 ret = rd_kafka_conf_set(conf, key, val, errbuf, sizeof(errbuf));
461 if (ret != RD_KAFKA_CONF_OK) {
462 WARNING("cannot set kafka property %s to %s: %s", key, val, errbuf);
463 sfree(key);
464 sfree(val);
465 goto errout;
466 }
467 sfree(key);
468 sfree(val);
469 } else {
470 WARNING("write_kafka plugin: Ignoring unknown "
471 "configuration option \"%s\" at top level.",
472 child->key);
473 }
474 }
475 if (conf != NULL)
476 rd_kafka_conf_destroy(conf);
477 return (0);
478 errout:
479 if (conf != NULL)
480 rd_kafka_conf_destroy(conf);
481 return -1;
482 } /* }}} int kafka_config */
/* Plugin entry point: registers the complex-config callback; everything
 * else (write callbacks, kafka handles) is set up from kafka_config(). */
void module_register(void) {
  plugin_register_complex_config("write_kafka", kafka_config);
}