1 /**
2 * collectd - src/write_riemann.c
3 * Copyright (C) 2012,2013 Pierre-Yves Ritschard
4 * Copyright (C) 2013 Florian octo Forster
5 * Copyright (C) 2015,2016 Gergely Nagy
6 *
7 * Permission is hereby granted, free of charge, to any person obtaining a
8 * copy of this software and associated documentation files (the "Software"),
9 * to deal in the Software without restriction, including without limitation
10 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11 * and/or sell copies of the Software, and to permit persons to whom the
12 * Software is furnished to do so, subject to the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be included in
15 * all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23 * DEALINGS IN THE SOFTWARE.
24 *
25 * Authors:
26 * Pierre-Yves Ritschard <pyr at spootnik.org>
27 * Florian octo Forster <octo at collectd.org>
28 * Gergely Nagy <algernon at madhouse-project.org>
29 */
31 #include "collectd.h"
33 #include "common.h"
34 #include "plugin.h"
35 #include "utils_cache.h"
36 #include "utils_complain.h"
37 #include "write_riemann_threshold.h"
39 #include <riemann/riemann-client.h>
/* Server name used by wrr_connect() when the node has no "Host" option. */
#define RIEMANN_HOST "localhost"
/* Port used by wrr_connect() when the node's port is left at 0. */
#define RIEMANN_PORT 5555
/* Initial value of riemann_host.ttl_factor (the value's interval is
 * multiplied by it to obtain the event TTL). */
#define RIEMANN_TTL_FACTOR 2.0
/* Initial value of riemann_host.batch_max (compared against the packed size
 * of the pending batch message). */
#define RIEMANN_BATCH_MAX 8192
/* Per-<Node> state: configuration, connection handle and pending batch. */
struct riemann_host {
  /* Complaint state shared by the connect/send error reports. */
  c_complain_t init_complaint;
  /* Node name (argument of the node block); used in the callback name. */
  char *name;
  /* Optional prefix prepended to the event service name; may be NULL. */
  char *event_service_prefix;
  /* Guards the client handle, the batch state and reference_count. */
  pthread_mutex_t lock;
  /* "Batch": collect events into batch_msg (only used for non-UDP). */
  _Bool batch_mode;
  /* "Notifications": forward notifications to Riemann. */
  _Bool notifications;
  /* "CheckThresholds": run the threshold check and set the event state. */
  _Bool check_thresholds;
  /* "StoreRates": convert values to rates via uc_get_rate(). */
  _Bool store_rates;
  /* "AlwaysAppendDS": always append the data source name to the service. */
  _Bool always_append_ds;
  /* "Host": server name; NULL selects RIEMANN_HOST. */
  char *node;
  /* "Port": server port; 0 selects RIEMANN_PORT. */
  int port;
  /* "Protocol": UDP, TCP or TLS client. */
  riemann_client_type_t client_type;
  /* Client handle; NULL while disconnected. */
  riemann_client_t *client;
  /* "TTLFactor": multiplier applied to the value interval for the TTL. */
  double ttl_factor;
  /* Time of host creation or of the last batch flush. */
  cdtime_t batch_init;
  /* "BatchMaxSize": packed-size threshold that triggers a flush. */
  int batch_max;
  /* "BatchFlushTimeout": seconds between flushes; values <= 0 disable it. */
  int batch_timeout;
  /* Number of references; wrr_free() releases the host at zero. */
  int reference_count;
  /* Pending batch message; NULL when nothing is queued. */
  riemann_message_t *batch_msg;
  /* "TLSCAFile", "TLSCertFile", "TLSKeyFile": passed to the client. */
  char *tls_ca_file;
  char *tls_cert_file;
  char *tls_key_file;
  /* "Timeout": applied to the client when tv_sec is non-zero. */
  struct timeval timeout;
};
/* Tags from the top-level "Tag" options; added to every event. */
static char **riemann_tags;
static size_t riemann_tags_num;
/* Attributes from the top-level "Attribute" options, stored as consecutive
 * key/value entries (key at even index, value at the following index). */
static char **riemann_attrs;
static size_t riemann_attrs_num;
77 /* host->lock must be held when calling this function. */
78 static int wrr_connect(struct riemann_host *host) /* {{{ */
79 {
80 char const *node;
81 int port;
83 if (host->client)
84 return 0;
86 node = (host->node != NULL) ? host->node : RIEMANN_HOST;
87 port = (host->port) ? host->port : RIEMANN_PORT;
89 host->client = NULL;
91 host->client = riemann_client_create(
92 host->client_type, node, port, RIEMANN_CLIENT_OPTION_TLS_CA_FILE,
93 host->tls_ca_file, RIEMANN_CLIENT_OPTION_TLS_CERT_FILE,
94 host->tls_cert_file, RIEMANN_CLIENT_OPTION_TLS_KEY_FILE,
95 host->tls_key_file, RIEMANN_CLIENT_OPTION_NONE);
96 if (host->client == NULL) {
97 c_complain(LOG_ERR, &host->init_complaint,
98 "write_riemann plugin: Unable to connect to Riemann at %s:%d",
99 node, port);
100 return -1;
101 }
102 #if RCC_VERSION_NUMBER >= 0x010800
103 if (host->timeout.tv_sec != 0) {
104 if (riemann_client_set_timeout(host->client, &host->timeout) != 0) {
105 riemann_client_free(host->client);
106 host->client = NULL;
107 c_complain(LOG_ERR, &host->init_complaint,
108 "write_riemann plugin: Unable to connect to Riemann at %s:%d",
109 node, port);
110 return -1;
111 }
112 }
113 #endif
115 set_sock_opts(riemann_client_get_fd(host->client));
117 c_release(LOG_INFO, &host->init_complaint,
118 "write_riemann plugin: Successfully connected to %s:%d", node,
119 port);
121 return 0;
122 } /* }}} int wrr_connect */
124 /* host->lock must be held when calling this function. */
125 static int wrr_disconnect(struct riemann_host *host) /* {{{ */
126 {
127 if (!host->client)
128 return (0);
130 riemann_client_free(host->client);
131 host->client = NULL;
133 return (0);
134 } /* }}} int wrr_disconnect */
136 /**
137 * Function to send messages to riemann.
138 *
139 * Acquires the host lock, disconnects on errors.
140 */
141 static int wrr_send_nolock(struct riemann_host *host,
142 riemann_message_t *msg) /* {{{ */
143 {
144 int status = 0;
146 status = wrr_connect(host);
147 if (status != 0) {
148 return status;
149 }
151 status = riemann_client_send_message(host->client, msg);
152 if (status != 0) {
153 wrr_disconnect(host);
154 return status;
155 }
157 /*
158 * For TCP we need to receive message acknowledgemenent.
159 */
160 if (host->client_type != RIEMANN_CLIENT_UDP) {
161 riemann_message_t *response;
163 response = riemann_client_recv_message(host->client);
165 if (response == NULL) {
166 wrr_disconnect(host);
167 return errno;
168 }
169 riemann_message_free(response);
170 }
172 return 0;
173 } /* }}} int wrr_send */
175 static int wrr_send(struct riemann_host *host, riemann_message_t *msg) {
176 int status = 0;
178 pthread_mutex_lock(&host->lock);
179 status = wrr_send_nolock(host, msg);
180 pthread_mutex_unlock(&host->lock);
181 return status;
182 }
184 static riemann_message_t *
185 wrr_notification_to_message(struct riemann_host *host, /* {{{ */
186 notification_t const *n) {
187 riemann_message_t *msg;
188 riemann_event_t *event;
189 char service_buffer[6 * DATA_MAX_NAME_LEN];
190 char const *severity;
192 switch (n->severity) {
193 case NOTIF_OKAY:
194 severity = "ok";
195 break;
196 case NOTIF_WARNING:
197 severity = "warning";
198 break;
199 case NOTIF_FAILURE:
200 severity = "critical";
201 break;
202 default:
203 severity = "unknown";
204 }
206 format_name(service_buffer, sizeof(service_buffer),
207 /* host = */ "", n->plugin, n->plugin_instance, n->type,
208 n->type_instance);
210 event = riemann_event_create(
211 RIEMANN_EVENT_FIELD_HOST, n->host, RIEMANN_EVENT_FIELD_TIME,
212 (int64_t)CDTIME_T_TO_TIME_T(n->time), RIEMANN_EVENT_FIELD_TAGS,
213 "notification", NULL, RIEMANN_EVENT_FIELD_STATE, severity,
214 RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
215 RIEMANN_EVENT_FIELD_NONE);
217 if (n->host[0] != 0)
218 riemann_event_string_attribute_add(event, "host", n->host);
219 if (n->plugin[0] != 0)
220 riemann_event_string_attribute_add(event, "plugin", n->plugin);
221 if (n->plugin_instance[0] != 0)
222 riemann_event_string_attribute_add(event, "plugin_instance",
223 n->plugin_instance);
225 if (n->type[0] != 0)
226 riemann_event_string_attribute_add(event, "type", n->type);
227 if (n->type_instance[0] != 0)
228 riemann_event_string_attribute_add(event, "type_instance",
229 n->type_instance);
231 for (size_t i = 0; i < riemann_attrs_num; i += 2)
232 riemann_event_string_attribute_add(event, riemann_attrs[i],
233 riemann_attrs[i + 1]);
235 for (size_t i = 0; i < riemann_tags_num; i++)
236 riemann_event_tag_add(event, riemann_tags[i]);
238 if (n->message[0] != 0)
239 riemann_event_string_attribute_add(event, "description", n->message);
241 /* Pull in values from threshold and add extra attributes */
242 for (notification_meta_t *meta = n->meta; meta != NULL; meta = meta->next) {
243 if (strcasecmp("CurrentValue", meta->name) == 0 &&
244 meta->type == NM_TYPE_DOUBLE) {
245 riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D,
246 (double)meta->nm_value.nm_double,
247 RIEMANN_EVENT_FIELD_NONE);
248 continue;
249 }
251 if (meta->type == NM_TYPE_STRING) {
252 riemann_event_string_attribute_add(event, meta->name,
253 meta->nm_value.nm_string);
254 continue;
255 }
256 }
258 msg = riemann_message_create_with_events(event, NULL);
259 if (msg == NULL) {
260 ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
261 riemann_event_free(event);
262 return (NULL);
263 }
265 DEBUG("write_riemann plugin: Successfully created message for notification: "
266 "host = \"%s\", service = \"%s\", state = \"%s\"",
267 event->host, event->service, event->state);
268 return (msg);
269 } /* }}} riemann_message_t *wrr_notification_to_message */
271 static riemann_event_t *
272 wrr_value_to_event(struct riemann_host const *host, /* {{{ */
273 data_set_t const *ds, value_list_t const *vl, size_t index,
274 gauge_t const *rates, int status) {
275 riemann_event_t *event;
276 char name_buffer[5 * DATA_MAX_NAME_LEN];
277 char service_buffer[6 * DATA_MAX_NAME_LEN];
278 size_t i;
280 event = riemann_event_new();
281 if (event == NULL) {
282 ERROR("write_riemann plugin: riemann_event_new() failed.");
283 return (NULL);
284 }
286 format_name(name_buffer, sizeof(name_buffer),
287 /* host = */ "", vl->plugin, vl->plugin_instance, vl->type,
288 vl->type_instance);
289 if (host->always_append_ds || (ds->ds_num > 1)) {
290 if (host->event_service_prefix == NULL)
291 ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s",
292 &name_buffer[1], ds->ds[index].name);
293 else
294 ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s",
295 host->event_service_prefix, &name_buffer[1],
296 ds->ds[index].name);
297 } else {
298 if (host->event_service_prefix == NULL)
299 sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer));
300 else
301 ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
302 host->event_service_prefix, &name_buffer[1]);
303 }
305 riemann_event_set(
306 event, RIEMANN_EVENT_FIELD_HOST, vl->host, RIEMANN_EVENT_FIELD_TIME,
307 (int64_t)CDTIME_T_TO_TIME_T(vl->time), RIEMANN_EVENT_FIELD_TTL,
308 (float)CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
309 RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES, "plugin", vl->plugin, "type",
310 vl->type, "ds_name", ds->ds[index].name, NULL,
311 RIEMANN_EVENT_FIELD_SERVICE, service_buffer, RIEMANN_EVENT_FIELD_NONE);
313 if (host->check_thresholds) {
314 const char *state = NULL;
316 switch (status) {
317 case STATE_OKAY:
318 state = "ok";
319 break;
320 case STATE_ERROR:
321 state = "critical";
322 break;
323 case STATE_WARNING:
324 state = "warning";
325 break;
326 case STATE_MISSING:
327 state = "unknown";
328 break;
329 }
330 if (state)
331 riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state,
332 RIEMANN_EVENT_FIELD_NONE);
333 }
335 if (vl->plugin_instance[0] != 0)
336 riemann_event_string_attribute_add(event, "plugin_instance",
337 vl->plugin_instance);
338 if (vl->type_instance[0] != 0)
339 riemann_event_string_attribute_add(event, "type_instance",
340 vl->type_instance);
342 if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL)) {
343 char ds_type[DATA_MAX_NAME_LEN];
345 ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
346 DS_TYPE_TO_STRING(ds->ds[index].type));
347 riemann_event_string_attribute_add(event, "ds_type", ds_type);
348 } else {
349 riemann_event_string_attribute_add(event, "ds_type",
350 DS_TYPE_TO_STRING(ds->ds[index].type));
351 }
353 {
354 char ds_index[DATA_MAX_NAME_LEN];
356 ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
357 riemann_event_string_attribute_add(event, "ds_index", ds_index);
358 }
360 for (i = 0; i < riemann_attrs_num; i += 2)
361 riemann_event_string_attribute_add(event, riemann_attrs[i],
362 riemann_attrs[i + 1]);
364 for (i = 0; i < riemann_tags_num; i++)
365 riemann_event_tag_add(event, riemann_tags[i]);
367 if (ds->ds[index].type == DS_TYPE_GAUGE) {
368 riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D,
369 (double)vl->values[index].gauge,
370 RIEMANN_EVENT_FIELD_NONE);
371 } else if (rates != NULL) {
372 riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D, (double)rates[index],
373 RIEMANN_EVENT_FIELD_NONE);
374 } else {
375 int64_t metric;
377 if (ds->ds[index].type == DS_TYPE_DERIVE)
378 metric = (int64_t)vl->values[index].derive;
379 else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
380 metric = (int64_t)vl->values[index].absolute;
381 else
382 metric = (int64_t)vl->values[index].counter;
384 riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_S64, (int64_t)metric,
385 RIEMANN_EVENT_FIELD_NONE);
386 }
388 DEBUG("write_riemann plugin: Successfully created message for metric: "
389 "host = \"%s\", service = \"%s\"",
390 event->host, event->service);
391 return (event);
392 } /* }}} riemann_event_t *wrr_value_to_event */
394 static riemann_message_t *
395 wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */
396 data_set_t const *ds, value_list_t const *vl,
397 int *statuses) {
398 riemann_message_t *msg;
399 size_t i;
400 gauge_t *rates = NULL;
402 /* Initialize the Msg structure. */
403 msg = riemann_message_new();
404 if (msg == NULL) {
405 ERROR("write_riemann plugin: riemann_message_new failed.");
406 return (NULL);
407 }
409 if (host->store_rates) {
410 rates = uc_get_rate(ds, vl);
411 if (rates == NULL) {
412 ERROR("write_riemann plugin: uc_get_rate failed.");
413 riemann_message_free(msg);
414 return (NULL);
415 }
416 }
418 for (i = 0; i < vl->values_len; i++) {
419 riemann_event_t *event;
421 event = wrr_value_to_event(host, ds, vl, (int)i, rates, statuses[i]);
422 if (event == NULL) {
423 riemann_message_free(msg);
424 sfree(rates);
425 return (NULL);
426 }
427 riemann_message_append_events(msg, event, NULL);
428 }
430 sfree(rates);
431 return (msg);
432 } /* }}} riemann_message_t *wrr_value_list_to_message */
434 /*
435 * Always call while holding host->lock !
436 */
437 static int wrr_batch_flush_nolock(cdtime_t timeout, struct riemann_host *host) {
438 cdtime_t now;
439 int status = 0;
441 now = cdtime();
442 if (timeout > 0) {
443 if ((host->batch_init + timeout) > now) {
444 return status;
445 }
446 }
447 wrr_send_nolock(host, host->batch_msg);
448 riemann_message_free(host->batch_msg);
450 host->batch_init = now;
451 host->batch_msg = NULL;
452 return status;
453 }
455 static int wrr_batch_flush(cdtime_t timeout,
456 const char *identifier __attribute__((unused)),
457 user_data_t *user_data) {
458 struct riemann_host *host;
459 int status;
461 if (user_data == NULL)
462 return (-EINVAL);
464 host = user_data->data;
465 pthread_mutex_lock(&host->lock);
466 status = wrr_batch_flush_nolock(timeout, host);
467 if (status != 0)
468 c_complain(
469 LOG_ERR, &host->init_complaint,
470 "write_riemann plugin: riemann_client_send failed with status %i",
471 status);
472 else
473 c_release(LOG_DEBUG, &host->init_complaint,
474 "write_riemann plugin: batch sent.");
476 pthread_mutex_unlock(&host->lock);
477 return status;
478 }
480 static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
481 data_set_t const *ds,
482 value_list_t const *vl, int *statuses) {
483 riemann_message_t *msg;
484 size_t len;
485 int ret;
486 cdtime_t timeout;
488 msg = wrr_value_list_to_message(host, ds, vl, statuses);
489 if (msg == NULL)
490 return -1;
492 pthread_mutex_lock(&host->lock);
494 if (host->batch_msg == NULL) {
495 host->batch_msg = msg;
496 } else {
497 int status;
499 status = riemann_message_append_events_n(host->batch_msg, msg->n_events,
500 msg->events);
501 msg->n_events = 0;
502 msg->events = NULL;
504 riemann_message_free(msg);
506 if (status != 0) {
507 pthread_mutex_unlock(&host->lock);
508 ERROR("write_riemann plugin: out of memory");
509 return -1;
510 }
511 }
513 len = riemann_message_get_packed_size(host->batch_msg);
514 ret = 0;
515 if ((host->batch_max < 0) || (((size_t)host->batch_max) <= len)) {
516 ret = wrr_batch_flush_nolock(0, host);
517 } else {
518 if (host->batch_timeout > 0) {
519 timeout = TIME_T_TO_CDTIME_T((time_t)host->batch_timeout);
520 ret = wrr_batch_flush_nolock(timeout, host);
521 }
522 }
524 pthread_mutex_unlock(&host->lock);
525 return ret;
526 } /* }}} riemann_message_t *wrr_batch_add_value_list */
528 static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */
529 {
530 int status;
531 struct riemann_host *host = ud->data;
532 riemann_message_t *msg;
534 if (!host->notifications)
535 return 0;
537 /*
538 * Never batch for notifications, send them ASAP
539 */
540 msg = wrr_notification_to_message(host, n);
541 if (msg == NULL)
542 return (-1);
544 status = wrr_send(host, msg);
545 if (status != 0)
546 c_complain(
547 LOG_ERR, &host->init_complaint,
548 "write_riemann plugin: riemann_client_send failed with status %i",
549 status);
550 else
551 c_release(LOG_DEBUG, &host->init_complaint,
552 "write_riemann plugin: riemann_client_send succeeded");
554 riemann_message_free(msg);
555 return (status);
556 } /* }}} int wrr_notification */
558 static int wrr_write(const data_set_t *ds, /* {{{ */
559 const value_list_t *vl, user_data_t *ud) {
560 int status = 0;
561 int statuses[vl->values_len];
562 struct riemann_host *host = ud->data;
563 riemann_message_t *msg;
565 if (host->check_thresholds) {
566 status = write_riemann_threshold_check(ds, vl, statuses);
567 if (status != 0)
568 return status;
569 } else {
570 memset(statuses, 0, sizeof(statuses));
571 }
573 if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
574 wrr_batch_add_value_list(host, ds, vl, statuses);
575 } else {
576 msg = wrr_value_list_to_message(host, ds, vl, statuses);
577 if (msg == NULL)
578 return (-1);
580 status = wrr_send(host, msg);
582 riemann_message_free(msg);
583 }
584 return status;
585 } /* }}} int wrr_write */
587 static void wrr_free(void *p) /* {{{ */
588 {
589 struct riemann_host *host = p;
591 if (host == NULL)
592 return;
594 pthread_mutex_lock(&host->lock);
596 host->reference_count--;
597 if (host->reference_count > 0) {
598 pthread_mutex_unlock(&host->lock);
599 return;
600 }
602 wrr_disconnect(host);
604 pthread_mutex_destroy(&host->lock);
605 sfree(host);
606 } /* }}} void wrr_free */
608 static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
609 {
610 struct riemann_host *host = NULL;
611 int status = 0;
612 int i;
613 oconfig_item_t *child;
614 char callback_name[DATA_MAX_NAME_LEN];
616 if ((host = calloc(1, sizeof(*host))) == NULL) {
617 ERROR("write_riemann plugin: calloc failed.");
618 return ENOMEM;
619 }
620 pthread_mutex_init(&host->lock, NULL);
621 C_COMPLAIN_INIT(&host->init_complaint);
622 host->reference_count = 1;
623 host->node = NULL;
624 host->port = 0;
625 host->notifications = 1;
626 host->check_thresholds = 0;
627 host->store_rates = 1;
628 host->always_append_ds = 0;
629 host->batch_mode = 1;
630 host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
631 host->batch_init = cdtime();
632 host->batch_timeout = 0;
633 host->ttl_factor = RIEMANN_TTL_FACTOR;
634 host->client = NULL;
635 host->client_type = RIEMANN_CLIENT_TCP;
636 host->timeout.tv_sec = 0;
637 host->timeout.tv_usec = 0;
639 status = cf_util_get_string(ci, &host->name);
640 if (status != 0) {
641 WARNING("write_riemann plugin: Required host name is missing.");
642 wrr_free(host);
643 return -1;
644 }
646 for (i = 0; i < ci->children_num; i++) {
647 /*
648 * The code here could be simplified but makes room
649 * for easy adding of new options later on.
650 */
651 child = &ci->children[i];
652 status = 0;
654 if (strcasecmp("Host", child->key) == 0) {
655 status = cf_util_get_string(child, &host->node);
656 if (status != 0)
657 break;
658 } else if (strcasecmp("Notifications", child->key) == 0) {
659 status = cf_util_get_boolean(child, &host->notifications);
660 if (status != 0)
661 break;
662 } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
663 status = cf_util_get_string(child, &host->event_service_prefix);
664 if (status != 0)
665 break;
666 } else if (strcasecmp("CheckThresholds", child->key) == 0) {
667 status = cf_util_get_boolean(child, &host->check_thresholds);
668 if (status != 0)
669 break;
670 } else if (strcasecmp("Batch", child->key) == 0) {
671 status = cf_util_get_boolean(child, &host->batch_mode);
672 if (status != 0)
673 break;
674 } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
675 status = cf_util_get_int(child, &host->batch_max);
676 if (status != 0)
677 break;
678 } else if (strcasecmp("BatchFlushTimeout", child->key) == 0) {
679 status = cf_util_get_int(child, &host->batch_timeout);
680 if (status != 0)
681 break;
682 } else if (strcasecmp("Timeout", child->key) == 0) {
683 #if RCC_VERSION_NUMBER >= 0x010800
684 status = cf_util_get_int(child, (int *)&host->timeout.tv_sec);
685 if (status != 0)
686 break;
687 #else
688 WARNING("write_riemann plugin: The Timeout option is not supported. "
689 "Please upgrade the Riemann client to at least 1.8.0.");
690 #endif
691 } else if (strcasecmp("Port", child->key) == 0) {
692 host->port = cf_util_get_port_number(child);
693 if (host->port == -1) {
694 ERROR("write_riemann plugin: Invalid argument "
695 "configured for the \"Port\" "
696 "option.");
697 break;
698 }
699 } else if (strcasecmp("Protocol", child->key) == 0) {
700 char tmp[16];
701 status = cf_util_get_string_buffer(child, tmp, sizeof(tmp));
702 if (status != 0) {
703 ERROR("write_riemann plugin: cf_util_get_"
704 "string_buffer failed with "
705 "status %i.",
706 status);
707 break;
708 }
710 if (strcasecmp("UDP", tmp) == 0)
711 host->client_type = RIEMANN_CLIENT_UDP;
712 else if (strcasecmp("TCP", tmp) == 0)
713 host->client_type = RIEMANN_CLIENT_TCP;
714 else if (strcasecmp("TLS", tmp) == 0)
715 host->client_type = RIEMANN_CLIENT_TLS;
716 else
717 WARNING("write_riemann plugin: The value "
718 "\"%s\" is not valid for the "
719 "\"Protocol\" option. Use "
720 "either \"UDP\", \"TCP\" or \"TLS\".",
721 tmp);
722 } else if (strcasecmp("TLSCAFile", child->key) == 0) {
723 status = cf_util_get_string(child, &host->tls_ca_file);
724 if (status != 0) {
725 ERROR("write_riemann plugin: cf_util_get_"
726 "string_buffer failed with "
727 "status %i.",
728 status);
729 break;
730 }
731 } else if (strcasecmp("TLSCertFile", child->key) == 0) {
732 status = cf_util_get_string(child, &host->tls_cert_file);
733 if (status != 0) {
734 ERROR("write_riemann plugin: cf_util_get_"
735 "string_buffer failed with "
736 "status %i.",
737 status);
738 break;
739 }
740 } else if (strcasecmp("TLSKeyFile", child->key) == 0) {
741 status = cf_util_get_string(child, &host->tls_key_file);
742 if (status != 0) {
743 ERROR("write_riemann plugin: cf_util_get_"
744 "string_buffer failed with "
745 "status %i.",
746 status);
747 break;
748 }
749 } else if (strcasecmp("StoreRates", child->key) == 0) {
750 status = cf_util_get_boolean(child, &host->store_rates);
751 if (status != 0)
752 break;
753 } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
754 status = cf_util_get_boolean(child, &host->always_append_ds);
755 if (status != 0)
756 break;
757 } else if (strcasecmp("TTLFactor", child->key) == 0) {
758 double tmp = NAN;
759 status = cf_util_get_double(child, &tmp);
760 if (status != 0)
761 break;
762 if (tmp >= 2.0) {
763 host->ttl_factor = tmp;
764 } else if (tmp >= 1.0) {
765 NOTICE("write_riemann plugin: The configured "
766 "TTLFactor is very small "
767 "(%.1f). A value of 2.0 or "
768 "greater is recommended.",
769 tmp);
770 host->ttl_factor = tmp;
771 } else if (tmp > 0.0) {
772 WARNING("write_riemann plugin: The configured "
773 "TTLFactor is too small to be "
774 "useful (%.1f). I'll use it "
775 "since the user knows best, "
776 "but under protest.",
777 tmp);
778 host->ttl_factor = tmp;
779 } else { /* zero, negative and NAN */
780 ERROR("write_riemann plugin: The configured "
781 "TTLFactor is invalid (%.1f).",
782 tmp);
783 }
784 } else {
785 WARNING("write_riemann plugin: ignoring unknown config "
786 "option: \"%s\"",
787 child->key);
788 }
789 }
790 if (status != 0) {
791 wrr_free(host);
792 return status;
793 }
795 ssnprintf(callback_name, sizeof(callback_name), "write_riemann/%s",
796 host->name);
798 user_data_t ud = {.data = host, .free_func = wrr_free};
800 pthread_mutex_lock(&host->lock);
802 status = plugin_register_write(callback_name, wrr_write, &ud);
804 if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
805 ud.free_func = NULL;
806 plugin_register_flush(callback_name, wrr_batch_flush, &ud);
807 }
808 if (status != 0)
809 WARNING("write_riemann plugin: plugin_register_write (\"%s\") "
810 "failed with status %i.",
811 callback_name, status);
812 else /* success */
813 host->reference_count++;
815 status = plugin_register_notification(callback_name, wrr_notification, &ud);
816 if (status != 0)
817 WARNING("write_riemann plugin: plugin_register_notification (\"%s\") "
818 "failed with status %i.",
819 callback_name, status);
820 else /* success */
821 host->reference_count++;
823 if (host->reference_count <= 1) {
824 /* Both callbacks failed => free memory.
825 * We need to unlock here, because riemann_free() will lock.
826 * This is not a race condition, because we're the only one
827 * holding a reference. */
828 pthread_mutex_unlock(&host->lock);
829 wrr_free(host);
830 return (-1);
831 }
833 host->reference_count--;
834 pthread_mutex_unlock(&host->lock);
836 return status;
837 } /* }}} int wrr_config_node */
839 static int wrr_config(oconfig_item_t *ci) /* {{{ */
840 {
841 int i;
842 oconfig_item_t *child;
843 int status;
845 for (i = 0; i < ci->children_num; i++) {
846 child = &ci->children[i];
848 if (strcasecmp("Node", child->key) == 0) {
849 wrr_config_node(child);
850 } else if (strcasecmp(child->key, "attribute") == 0) {
851 char *key = NULL;
852 char *val = NULL;
854 if (child->values_num != 2) {
855 WARNING("riemann attributes need both a key and a value.");
856 return (-1);
857 }
858 if (child->values[0].type != OCONFIG_TYPE_STRING ||
859 child->values[1].type != OCONFIG_TYPE_STRING) {
860 WARNING("riemann attribute needs string arguments.");
861 return (-1);
862 }
863 if ((key = strdup(child->values[0].value.string)) == NULL) {
864 WARNING("cannot allocate memory for attribute key.");
865 return (-1);
866 }
867 if ((val = strdup(child->values[1].value.string)) == NULL) {
868 WARNING("cannot allocate memory for attribute value.");
869 sfree(key);
870 return (-1);
871 }
872 strarray_add(&riemann_attrs, &riemann_attrs_num, key);
873 strarray_add(&riemann_attrs, &riemann_attrs_num, val);
874 DEBUG("write_riemann: got attr: %s => %s", key, val);
875 sfree(key);
876 sfree(val);
877 } else if (strcasecmp(child->key, "tag") == 0) {
878 char *tmp = NULL;
879 status = cf_util_get_string(child, &tmp);
880 if (status != 0)
881 continue;
883 strarray_add(&riemann_tags, &riemann_tags_num, tmp);
884 DEBUG("write_riemann plugin: Got tag: %s", tmp);
885 sfree(tmp);
886 } else {
887 WARNING("write_riemann plugin: Ignoring unknown "
888 "configuration option \"%s\" at top level.",
889 child->key);
890 }
891 }
892 return (0);
893 } /* }}} int wrr_config */
/* Plugin entry point: registers wrr_config() as the complex configuration
 * callback for the "write_riemann" plugin. */
void module_register(void) {
  plugin_register_complex_config("write_riemann", wrr_config);
}
899 /* vim: set sw=8 sts=8 ts=8 noet : */