63e89186cb3eb45a7a741304d0745851b0886317
1 /**
2 * collectd - src/write_riemann.c
3 * Copyright (C) 2012,2013 Pierre-Yves Ritschard
4 * Copyright (C) 2013 Florian octo Forster
5 * Copyright (C) 2015,2016 Gergely Nagy
6 *
7 * Permission is hereby granted, free of charge, to any person obtaining a
8 * copy of this software and associated documentation files (the "Software"),
9 * to deal in the Software without restriction, including without limitation
10 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11 * and/or sell copies of the Software, and to permit persons to whom the
12 * Software is furnished to do so, subject to the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be included in
15 * all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23 * DEALINGS IN THE SOFTWARE.
24 *
25 * Authors:
26 * Pierre-Yves Ritschard <pyr at spootnik.org>
27 * Florian octo Forster <octo at collectd.org>
28 * Gergely Nagy <algernon at madhouse-project.org>
29 */
31 #include <riemann/riemann-client.h>
32 #include <errno.h>
33 #include <pthread.h>
35 #include "collectd.h"
36 #include "plugin.h"
37 #include "common.h"
38 #include "configfile.h"
39 #include "utils_cache.h"
40 #include "write_riemann_threshold.h"
/* Server address used when a node has no "Host" option. */
#define RIEMANN_HOST "localhost"
/* Server port used when a node has no "Port" option (i.e. port is 0). */
#define RIEMANN_PORT 5555
/* Default multiplier applied to a value's interval to obtain the event TTL. */
#define RIEMANN_TTL_FACTOR 2.0
/* Default packed-size threshold of a batch at which it is flushed. */
#define RIEMANN_BATCH_MAX 8192
/*
 * Per-node state of the plugin: configuration taken from one "Node" block
 * plus the connection and the pending batch for that node.
 */
struct riemann_host {
	/* Name given to the "Node" block; used to build the callback name. */
	char *name;
	/* Optional string prepended to the service name of value events. */
	char *event_service_prefix;
	/* Held while the client, the batch or the reference count is touched. */
	pthread_mutex_t lock;
	/* Collect value events into a batch (only used for non-UDP clients). */
	_Bool batch_mode;
	/* Forward notifications; when false they are silently dropped. */
	_Bool notifications;
	/* Run the threshold check and set the event state from its result. */
	_Bool check_thresholds;
	/* Convert values to rates via uc_get_rate() before sending. */
	_Bool store_rates;
	/* Append the data source name to the service even for single-DS types. */
	_Bool always_append_ds;
	/* Server address; NULL selects RIEMANN_HOST. */
	char *node;
	/* Server port; 0 selects RIEMANN_PORT. */
	int port;
	/* Transport selected by the "Protocol" option (UDP, TCP or TLS). */
	riemann_client_type_t client_type;
	/* Connection handle; NULL while disconnected. */
	riemann_client_t *client;
	/* Event TTL is the value's interval multiplied by this factor. */
	double ttl_factor;
	/* Time of node creation or of the last batch flush. */
	cdtime_t batch_init;
	/* Flush the batch once its packed size reaches this; negative values
	 * cause a flush after every added value list. */
	int batch_max;
	/* Number of owners; the host is torn down when it drops to zero. */
	int reference_count;
	/* Events collected since the last flush; NULL when nothing is pending. */
	riemann_message_t *batch_msg;
	/* TLS settings handed to riemann_client_create(). */
	char *tls_ca_file;
	char *tls_cert_file;
	char *tls_key_file;
	/* Client timeout; only applied when tv_sec is non-zero. */
	struct timeval timeout;
};
/* Tags from the top-level "Tag" options; added to every event. */
static char **riemann_tags;
static size_t riemann_tags_num;
/* Top-level "Attribute" options stored as a flat list of pairs: the key at
 * index i (even) is followed by its value at index i + 1. */
static char **riemann_attrs;
static size_t riemann_attrs_num;
76 /* host->lock must be held when calling this function. */
77 static int wrr_connect(struct riemann_host *host) /* {{{ */
78 {
79 char const *node;
80 int port;
82 if (host->client)
83 return 0;
85 node = (host->node != NULL) ? host->node : RIEMANN_HOST;
86 port = (host->port) ? host->port : RIEMANN_PORT;
88 host->client = NULL;
90 host->client = riemann_client_create(host->client_type, node, port,
91 RIEMANN_CLIENT_OPTION_TLS_CA_FILE, host->tls_ca_file,
92 RIEMANN_CLIENT_OPTION_TLS_CERT_FILE, host->tls_cert_file,
93 RIEMANN_CLIENT_OPTION_TLS_KEY_FILE, host->tls_key_file,
94 RIEMANN_CLIENT_OPTION_NONE);
95 if (host->client == NULL) {
96 WARNING("write_riemann plugin: Unable to connect to Riemann at %s:%d",
97 node, port);
98 return -1;
99 }
100 if (host->timeout.tv_sec != 0) {
101 if (riemann_client_set_timeout(host->client, &host->timeout) != 0) {
102 riemann_client_free(host->client);
103 host->client = NULL;
104 WARNING("write_riemann plugin: Unable to connect to Riemann at %s:%d",
105 node, port);
106 return -1;
107 }
108 }
109 DEBUG("write_riemann plugin: got a successful connection for: %s:%d",
110 node, port);
112 return 0;
113 } /* }}} int wrr_connect */
115 /* host->lock must be held when calling this function. */
116 static int wrr_disconnect(struct riemann_host *host) /* {{{ */
117 {
118 if (!host->client)
119 return (0);
121 riemann_client_free(host->client);
122 host->client = NULL;
124 return (0);
125 } /* }}} int wrr_disconnect */
127 /**
128 * Function to send messages to riemann.
129 *
130 * Acquires the host lock, disconnects on errors.
131 */
132 static int wrr_send(struct riemann_host *host, riemann_message_t *msg) /* {{{ */
133 {
134 int status = 0;
135 pthread_mutex_lock (&host->lock);
137 status = wrr_connect(host);
138 if (status != 0) {
139 pthread_mutex_unlock(&host->lock);
140 return status;
141 }
143 status = riemann_client_send_message(host->client, msg);
144 if (status != 0) {
145 wrr_disconnect(host);
146 pthread_mutex_unlock(&host->lock);
147 return status;
148 }
150 /*
151 * For TCP we need to receive message acknowledgemenent.
152 */
153 if (host->client_type != RIEMANN_CLIENT_UDP)
154 {
155 riemann_message_t *response;
157 response = riemann_client_recv_message(host->client);
159 if (response == NULL)
160 {
161 wrr_disconnect(host);
162 pthread_mutex_unlock(&host->lock);
163 return errno;
164 }
165 riemann_message_free(response);
166 }
168 pthread_mutex_unlock (&host->lock);
169 return 0;
170 } /* }}} int wrr_send */
172 static riemann_message_t *wrr_notification_to_message(struct riemann_host *host, /* {{{ */
173 notification_t const *n)
174 {
175 riemann_message_t *msg;
176 riemann_event_t *event;
177 char service_buffer[6 * DATA_MAX_NAME_LEN];
178 char const *severity;
179 notification_meta_t *meta;
180 size_t i;
182 switch (n->severity)
183 {
184 case NOTIF_OKAY: severity = "ok"; break;
185 case NOTIF_WARNING: severity = "warning"; break;
186 case NOTIF_FAILURE: severity = "critical"; break;
187 default: severity = "unknown";
188 }
190 format_name(service_buffer, sizeof(service_buffer),
191 /* host = */ "", n->plugin, n->plugin_instance,
192 n->type, n->type_instance);
194 event = riemann_event_create(RIEMANN_EVENT_FIELD_HOST, n->host,
195 RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(n->time),
196 RIEMANN_EVENT_FIELD_TAGS, "notification", NULL,
197 RIEMANN_EVENT_FIELD_STATE, severity,
198 RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
199 RIEMANN_EVENT_FIELD_NONE);
201 if (n->host[0] != 0)
202 riemann_event_string_attribute_add(event, "host", n->host);
203 if (n->plugin[0] != 0)
204 riemann_event_string_attribute_add(event, "plugin", n->plugin);
205 if (n->plugin_instance[0] != 0)
206 riemann_event_string_attribute_add(event, "plugin_instance", n->plugin_instance);
208 if (n->type[0] != 0)
209 riemann_event_string_attribute_add(event, "type", n->type);
210 if (n->type_instance[0] != 0)
211 riemann_event_string_attribute_add(event, "type_instance", n->type_instance);
213 for (i = 0; i < riemann_attrs_num; i += 2)
214 riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i+1]);
216 for (i = 0; i < riemann_tags_num; i++)
217 riemann_event_tag_add(event, riemann_tags[i]);
219 if (n->message[0] != 0)
220 riemann_event_string_attribute_add(event, "description", n->message);
222 /* Pull in values from threshold and add extra attributes */
223 for (meta = n->meta; meta != NULL; meta = meta->next)
224 {
225 if (strcasecmp("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE)
226 {
227 riemann_event_set(event,
228 RIEMANN_EVENT_FIELD_METRIC_D,
229 (double) meta->nm_value.nm_double,
230 RIEMANN_EVENT_FIELD_NONE);
231 continue;
232 }
234 if (meta->type == NM_TYPE_STRING) {
235 riemann_event_string_attribute_add(event, meta->name, meta->nm_value.nm_string);
236 continue;
237 }
238 }
240 msg = riemann_message_create_with_events(event, NULL);
241 if (msg == NULL)
242 {
243 ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
244 riemann_event_free (event);
245 return (NULL);
246 }
248 DEBUG("write_riemann plugin: Successfully created message for notification: "
249 "host = \"%s\", service = \"%s\", state = \"%s\"",
250 event->host, event->service, event->state);
251 return (msg);
252 } /* }}} riemann_message_t *wrr_notification_to_message */
254 static riemann_event_t *wrr_value_to_event(struct riemann_host const *host, /* {{{ */
255 data_set_t const *ds,
256 value_list_t const *vl, size_t index,
257 gauge_t const *rates,
258 int status)
259 {
260 riemann_event_t *event;
261 char name_buffer[5 * DATA_MAX_NAME_LEN];
262 char service_buffer[6 * DATA_MAX_NAME_LEN];
263 size_t i;
265 event = riemann_event_new();
266 if (event == NULL)
267 {
268 ERROR("write_riemann plugin: riemann_event_new() failed.");
269 return (NULL);
270 }
272 format_name(name_buffer, sizeof(name_buffer),
273 /* host = */ "", vl->plugin, vl->plugin_instance,
274 vl->type, vl->type_instance);
275 if (host->always_append_ds || (ds->ds_num > 1))
276 {
277 if (host->event_service_prefix == NULL)
278 ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s",
279 &name_buffer[1], ds->ds[index].name);
280 else
281 ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s",
282 host->event_service_prefix, &name_buffer[1], ds->ds[index].name);
283 }
284 else
285 {
286 if (host->event_service_prefix == NULL)
287 sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer));
288 else
289 ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
290 host->event_service_prefix, &name_buffer[1]);
291 }
293 riemann_event_set(event,
294 RIEMANN_EVENT_FIELD_HOST, vl->host,
295 RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(vl->time),
296 RIEMANN_EVENT_FIELD_TTL, (float) CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
297 RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES,
298 "plugin", vl->plugin,
299 "type", vl->type,
300 "ds_name", ds->ds[index].name,
301 NULL,
302 RIEMANN_EVENT_FIELD_SERVICE, service_buffer,
303 RIEMANN_EVENT_FIELD_NONE);
305 if (host->check_thresholds) {
306 const char *state = NULL;
308 switch (status) {
309 case STATE_OKAY:
310 state = "ok";
311 break;
312 case STATE_ERROR:
313 state = "critical";
314 break;
315 case STATE_WARNING:
316 state = "warning";
317 break;
318 case STATE_MISSING:
319 state = "unknown";
320 break;
321 }
322 if (state)
323 riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state,
324 RIEMANN_EVENT_FIELD_NONE);
325 }
327 if (vl->plugin_instance[0] != 0)
328 riemann_event_string_attribute_add(event, "plugin_instance", vl->plugin_instance);
329 if (vl->type_instance[0] != 0)
330 riemann_event_string_attribute_add(event, "type_instance", vl->type_instance);
332 if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL))
333 {
334 char ds_type[DATA_MAX_NAME_LEN];
336 ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
337 DS_TYPE_TO_STRING(ds->ds[index].type));
338 riemann_event_string_attribute_add(event, "ds_type", ds_type);
339 }
340 else
341 {
342 riemann_event_string_attribute_add(event, "ds_type",
343 DS_TYPE_TO_STRING(ds->ds[index].type));
344 }
346 {
347 char ds_index[DATA_MAX_NAME_LEN];
349 ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
350 riemann_event_string_attribute_add(event, "ds_index", ds_index);
351 }
353 for (i = 0; i < riemann_attrs_num; i += 2)
354 riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i +1]);
356 for (i = 0; i < riemann_tags_num; i++)
357 riemann_event_tag_add(event, riemann_tags[i]);
359 if (ds->ds[index].type == DS_TYPE_GAUGE)
360 {
361 riemann_event_set(event,
362 RIEMANN_EVENT_FIELD_METRIC_D,
363 (double) vl->values[index].gauge,
364 RIEMANN_EVENT_FIELD_NONE);
365 }
366 else if (rates != NULL)
367 {
368 riemann_event_set(event,
369 RIEMANN_EVENT_FIELD_METRIC_D,
370 (double) rates[index],
371 RIEMANN_EVENT_FIELD_NONE);
372 }
373 else
374 {
375 int64_t metric;
377 if (ds->ds[index].type == DS_TYPE_DERIVE)
378 metric = (int64_t) vl->values[index].derive;
379 else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
380 metric = (int64_t) vl->values[index].absolute;
381 else
382 metric = (int64_t) vl->values[index].counter;
384 riemann_event_set(event,
385 RIEMANN_EVENT_FIELD_METRIC_S64,
386 (int64_t) metric,
387 RIEMANN_EVENT_FIELD_NONE);
388 }
390 DEBUG("write_riemann plugin: Successfully created message for metric: "
391 "host = \"%s\", service = \"%s\"",
392 event->host, event->service);
393 return (event);
394 } /* }}} riemann_event_t *wrr_value_to_event */
396 static riemann_message_t *wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */
397 data_set_t const *ds,
398 value_list_t const *vl,
399 int *statuses)
400 {
401 riemann_message_t *msg;
402 size_t i;
403 gauge_t *rates = NULL;
405 /* Initialize the Msg structure. */
406 msg = riemann_message_new();
407 if (msg == NULL)
408 {
409 ERROR ("write_riemann plugin: riemann_message_new failed.");
410 return (NULL);
411 }
413 if (host->store_rates)
414 {
415 rates = uc_get_rate(ds, vl);
416 if (rates == NULL)
417 {
418 ERROR("write_riemann plugin: uc_get_rate failed.");
419 riemann_message_free(msg);
420 return (NULL);
421 }
422 }
424 for (i = 0; i < vl->values_len; i++)
425 {
426 riemann_event_t *event;
428 event = wrr_value_to_event(host, ds, vl,
429 (int) i, rates, statuses[i]);
430 if (event == NULL)
431 {
432 riemann_message_free(msg);
433 sfree(rates);
434 return (NULL);
435 }
436 riemann_message_append_events(msg, event, NULL);
437 }
439 sfree(rates);
440 return (msg);
441 } /* }}} riemann_message_t *wrr_value_list_to_message */
443 /*
444 * Always call while holding host->lock !
445 */
446 static int wrr_batch_flush_nolock(cdtime_t timeout,
447 struct riemann_host *host)
448 {
449 cdtime_t now;
450 int status = 0;
452 if (timeout > 0) {
453 now = cdtime();
454 if ((host->batch_init + timeout) > now)
455 return status;
456 }
457 wrr_send(host, host->batch_msg);
458 riemann_message_free(host->batch_msg);
460 if (host->client_type != RIEMANN_CLIENT_UDP)
461 {
462 riemann_message_t *response;
464 response = riemann_client_recv_message(host->client);
466 if (!response)
467 {
468 wrr_disconnect(host);
469 return errno;
470 }
472 riemann_message_free(response);
473 }
475 host->batch_init = cdtime();
476 host->batch_msg = NULL;
477 return status;
478 }
480 static int wrr_batch_flush(cdtime_t timeout,
481 const char *identifier __attribute__((unused)),
482 user_data_t *user_data)
483 {
484 struct riemann_host *host;
485 int status;
487 if (user_data == NULL)
488 return (-EINVAL);
490 host = user_data->data;
491 pthread_mutex_lock(&host->lock);
492 status = wrr_batch_flush_nolock(timeout, host);
493 if (status != 0)
494 ERROR("write_riemann plugin: riemann_client_send failed with status %i",
495 status);
497 pthread_mutex_unlock(&host->lock);
498 return status;
499 }
501 static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
502 data_set_t const *ds,
503 value_list_t const *vl,
504 int *statuses)
505 {
506 riemann_message_t *msg;
507 size_t len;
508 int ret;
510 msg = wrr_value_list_to_message(host, ds, vl, statuses);
511 if (msg == NULL)
512 return -1;
514 pthread_mutex_lock(&host->lock);
516 if (host->batch_msg == NULL) {
517 host->batch_msg = msg;
518 } else {
519 int status;
521 status = riemann_message_append_events_n(host->batch_msg,
522 msg->n_events,
523 msg->events);
524 msg->n_events = 0;
525 msg->events = NULL;
527 riemann_message_free(msg);
529 if (status != 0) {
530 pthread_mutex_unlock(&host->lock);
531 ERROR("write_riemann plugin: out of memory");
532 return -1;
533 }
534 }
536 len = riemann_message_get_packed_size(host->batch_msg);
537 ret = 0;
538 if ((host->batch_max < 0) || (((size_t) host->batch_max) <= len)) {
539 ret = wrr_batch_flush_nolock(0, host);
540 }
542 pthread_mutex_unlock(&host->lock);
543 return ret;
544 } /* }}} riemann_message_t *wrr_batch_add_value_list */
546 static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */
547 {
548 int status;
549 struct riemann_host *host = ud->data;
550 riemann_message_t *msg;
552 if (!host->notifications)
553 return 0;
555 /*
556 * Never batch for notifications, send them ASAP
557 */
558 msg = wrr_notification_to_message(host, n);
559 if (msg == NULL)
560 return (-1);
562 status = wrr_send(host, msg);
563 if (status != 0)
564 ERROR("write_riemann plugin: riemann_client_send failed with status %i",
565 status);
567 riemann_message_free(msg);
568 return (status);
569 } /* }}} int wrr_notification */
571 static int wrr_write(const data_set_t *ds, /* {{{ */
572 const value_list_t *vl,
573 user_data_t *ud)
574 {
575 int status = 0;
576 int statuses[vl->values_len];
577 struct riemann_host *host = ud->data;
578 riemann_message_t *msg;
580 if (host->check_thresholds) {
581 status = write_riemann_threshold_check(ds, vl, statuses);
582 if (status != 0)
583 return status;
584 } else {
585 memset (statuses, 0, sizeof (statuses));
586 }
588 if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
589 wrr_batch_add_value_list(host, ds, vl, statuses);
590 } else {
591 msg = wrr_value_list_to_message(host, ds, vl, statuses);
592 if (msg == NULL)
593 return (-1);
595 status = wrr_send(host, msg);
596 if (status != 0)
597 ERROR("write_riemann plugin: riemann_client_send failed with status %i",
598 status);
600 riemann_message_free(msg);
601 }
602 return status;
603 } /* }}} int wrr_write */
605 static void wrr_free(void *p) /* {{{ */
606 {
607 struct riemann_host *host = p;
609 if (host == NULL)
610 return;
612 pthread_mutex_lock(&host->lock);
614 host->reference_count--;
615 if (host->reference_count > 0)
616 {
617 pthread_mutex_unlock(&host->lock);
618 return;
619 }
621 wrr_disconnect(host);
623 pthread_mutex_destroy(&host->lock);
624 sfree(host);
625 } /* }}} void wrr_free */
627 static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
628 {
629 struct riemann_host *host = NULL;
630 int status = 0;
631 int i;
632 oconfig_item_t *child;
633 char callback_name[DATA_MAX_NAME_LEN];
634 user_data_t ud;
636 if ((host = calloc(1, sizeof(*host))) == NULL) {
637 ERROR ("write_riemann plugin: calloc failed.");
638 return ENOMEM;
639 }
640 pthread_mutex_init(&host->lock, NULL);
641 host->reference_count = 1;
642 host->node = NULL;
643 host->port = 0;
644 host->notifications = 1;
645 host->check_thresholds = 0;
646 host->store_rates = 1;
647 host->always_append_ds = 0;
648 host->batch_mode = 1;
649 host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
650 host->batch_init = cdtime();
651 host->ttl_factor = RIEMANN_TTL_FACTOR;
652 host->client = NULL;
653 host->client_type = RIEMANN_CLIENT_TCP;
654 host->timeout.tv_sec = 0;
655 host->timeout.tv_usec = 0;
657 status = cf_util_get_string(ci, &host->name);
658 if (status != 0) {
659 WARNING("write_riemann plugin: Required host name is missing.");
660 wrr_free(host);
661 return -1;
662 }
664 for (i = 0; i < ci->children_num; i++) {
665 /*
666 * The code here could be simplified but makes room
667 * for easy adding of new options later on.
668 */
669 child = &ci->children[i];
670 status = 0;
672 if (strcasecmp("Host", child->key) == 0) {
673 status = cf_util_get_string(child, &host->node);
674 if (status != 0)
675 break;
676 } else if (strcasecmp("Notifications", child->key) == 0) {
677 status = cf_util_get_boolean(child, &host->notifications);
678 if (status != 0)
679 break;
680 } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
681 status = cf_util_get_string(child, &host->event_service_prefix);
682 if (status != 0)
683 break;
684 } else if (strcasecmp("CheckThresholds", child->key) == 0) {
685 status = cf_util_get_boolean(child, &host->check_thresholds);
686 if (status != 0)
687 break;
688 } else if (strcasecmp("Batch", child->key) == 0) {
689 status = cf_util_get_boolean(child, &host->batch_mode);
690 if (status != 0)
691 break;
692 } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
693 status = cf_util_get_int(child, &host->batch_max);
694 if (status != 0)
695 break;
696 } else if (strcasecmp("Timeout", child->key) == 0) {
697 status = cf_util_get_int(child, (int *)&host->timeout.tv_sec);
698 if (status != 0)
699 break;
700 } else if (strcasecmp("Port", child->key) == 0) {
701 host->port = cf_util_get_port_number(child);
702 if (host->port == -1) {
703 ERROR("write_riemann plugin: Invalid argument "
704 "configured for the \"Port\" "
705 "option.");
706 break;
707 }
708 } else if (strcasecmp("Protocol", child->key) == 0) {
709 char tmp[16];
710 status = cf_util_get_string_buffer(child,
711 tmp, sizeof(tmp));
712 if (status != 0)
713 {
714 ERROR("write_riemann plugin: cf_util_get_"
715 "string_buffer failed with "
716 "status %i.", status);
717 break;
718 }
720 if (strcasecmp("UDP", tmp) == 0)
721 host->client_type = RIEMANN_CLIENT_UDP;
722 else if (strcasecmp("TCP", tmp) == 0)
723 host->client_type = RIEMANN_CLIENT_TCP;
724 else if (strcasecmp("TLS", tmp) == 0)
725 host->client_type = RIEMANN_CLIENT_TLS;
726 else
727 WARNING("write_riemann plugin: The value "
728 "\"%s\" is not valid for the "
729 "\"Protocol\" option. Use "
730 "either \"UDP\", \"TCP\" or \"TLS\".",
731 tmp);
732 } else if (strcasecmp("TLSCAFile", child->key) == 0) {
733 status = cf_util_get_string(child, &host->tls_ca_file);
734 if (status != 0)
735 {
736 ERROR("write_riemann plugin: cf_util_get_"
737 "string_buffer failed with "
738 "status %i.", status);
739 break;
740 }
741 } else if (strcasecmp("TLSCertFile", child->key) == 0) {
742 status = cf_util_get_string(child, &host->tls_cert_file);
743 if (status != 0)
744 {
745 ERROR("write_riemann plugin: cf_util_get_"
746 "string_buffer failed with "
747 "status %i.", status);
748 break;
749 }
750 } else if (strcasecmp("TLSKeyFile", child->key) == 0) {
751 status = cf_util_get_string(child, &host->tls_key_file);
752 if (status != 0)
753 {
754 ERROR("write_riemann plugin: cf_util_get_"
755 "string_buffer failed with "
756 "status %i.", status);
757 break;
758 }
759 } else if (strcasecmp("StoreRates", child->key) == 0) {
760 status = cf_util_get_boolean(child, &host->store_rates);
761 if (status != 0)
762 break;
763 } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
764 status = cf_util_get_boolean(child,
765 &host->always_append_ds);
766 if (status != 0)
767 break;
768 } else if (strcasecmp("TTLFactor", child->key) == 0) {
769 double tmp = NAN;
770 status = cf_util_get_double(child, &tmp);
771 if (status != 0)
772 break;
773 if (tmp >= 2.0) {
774 host->ttl_factor = tmp;
775 } else if (tmp >= 1.0) {
776 NOTICE("write_riemann plugin: The configured "
777 "TTLFactor is very small "
778 "(%.1f). A value of 2.0 or "
779 "greater is recommended.",
780 tmp);
781 host->ttl_factor = tmp;
782 } else if (tmp > 0.0) {
783 WARNING("write_riemann plugin: The configured "
784 "TTLFactor is too small to be "
785 "useful (%.1f). I'll use it "
786 "since the user knows best, "
787 "but under protest.",
788 tmp);
789 host->ttl_factor = tmp;
790 } else { /* zero, negative and NAN */
791 ERROR("write_riemann plugin: The configured "
792 "TTLFactor is invalid (%.1f).",
793 tmp);
794 }
795 } else {
796 WARNING("write_riemann plugin: ignoring unknown config "
797 "option: \"%s\"", child->key);
798 }
799 }
800 if (status != 0) {
801 wrr_free(host);
802 return status;
803 }
805 ssnprintf(callback_name, sizeof(callback_name), "write_riemann/%s",
806 host->name);
807 ud.data = host;
808 ud.free_func = wrr_free;
810 pthread_mutex_lock(&host->lock);
812 status = plugin_register_write(callback_name, wrr_write, &ud);
814 if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
815 ud.free_func = NULL;
816 plugin_register_flush(callback_name, wrr_batch_flush, &ud);
817 }
818 if (status != 0)
819 WARNING("write_riemann plugin: plugin_register_write (\"%s\") "
820 "failed with status %i.",
821 callback_name, status);
822 else /* success */
823 host->reference_count++;
825 status = plugin_register_notification(callback_name,
826 wrr_notification, &ud);
827 if (status != 0)
828 WARNING("write_riemann plugin: plugin_register_notification (\"%s\") "
829 "failed with status %i.",
830 callback_name, status);
831 else /* success */
832 host->reference_count++;
834 if (host->reference_count <= 1)
835 {
836 /* Both callbacks failed => free memory.
837 * We need to unlock here, because riemann_free() will lock.
838 * This is not a race condition, because we're the only one
839 * holding a reference. */
840 pthread_mutex_unlock(&host->lock);
841 wrr_free(host);
842 return (-1);
843 }
845 host->reference_count--;
846 pthread_mutex_unlock(&host->lock);
848 return status;
849 } /* }}} int wrr_config_node */
851 static int wrr_config(oconfig_item_t *ci) /* {{{ */
852 {
853 int i;
854 oconfig_item_t *child;
855 int status;
857 for (i = 0; i < ci->children_num; i++) {
858 child = &ci->children[i];
860 if (strcasecmp("Node", child->key) == 0) {
861 wrr_config_node (child);
862 } else if (strcasecmp(child->key, "attribute") == 0) {
863 char *key = NULL;
864 char *val = NULL;
866 if (child->values_num != 2) {
867 WARNING("riemann attributes need both a key and a value.");
868 return (-1);
869 }
870 if (child->values[0].type != OCONFIG_TYPE_STRING ||
871 child->values[1].type != OCONFIG_TYPE_STRING) {
872 WARNING("riemann attribute needs string arguments.");
873 return (-1);
874 }
875 if ((key = strdup(child->values[0].value.string)) == NULL) {
876 WARNING("cannot allocate memory for attribute key.");
877 return (-1);
878 }
879 if ((val = strdup(child->values[1].value.string)) == NULL) {
880 WARNING("cannot allocate memory for attribute value.");
881 sfree(key);
882 return (-1);
883 }
884 strarray_add(&riemann_attrs, &riemann_attrs_num, key);
885 strarray_add(&riemann_attrs, &riemann_attrs_num, val);
886 DEBUG("write_riemann: got attr: %s => %s", key, val);
887 sfree(key);
888 sfree(val);
889 } else if (strcasecmp(child->key, "tag") == 0) {
890 char *tmp = NULL;
891 status = cf_util_get_string(child, &tmp);
892 if (status != 0)
893 continue;
895 strarray_add(&riemann_tags, &riemann_tags_num, tmp);
896 DEBUG("write_riemann plugin: Got tag: %s", tmp);
897 sfree(tmp);
898 } else {
899 WARNING("write_riemann plugin: Ignoring unknown "
900 "configuration option \"%s\" at top level.",
901 child->key);
902 }
903 }
904 return (0);
905 } /* }}} int wrr_config */
/*
 * Plugin entry point: registers wrr_config() as the complex configuration
 * callback for the "write_riemann" plugin.
 */
void module_register(void)
{
	plugin_register_complex_config("write_riemann", wrr_config);
}
912 /* vim: set sw=8 sts=8 ts=8 noet : */