1 /**
2 * collectd - src/write_riemann.c
3 * Copyright (C) 2012,2013 Pierre-Yves Ritschard
4 * Copyright (C) 2013 Florian octo Forster
5 * Copyright (C) 2015,2016 Gergely Nagy
6 *
7 * Permission is hereby granted, free of charge, to any person obtaining a
8 * copy of this software and associated documentation files (the "Software"),
9 * to deal in the Software without restriction, including without limitation
10 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11 * and/or sell copies of the Software, and to permit persons to whom the
12 * Software is furnished to do so, subject to the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be included in
15 * all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23 * DEALINGS IN THE SOFTWARE.
24 *
25 * Authors:
26 * Pierre-Yves Ritschard <pyr at spootnik.org>
27 * Florian octo Forster <octo at collectd.org>
28 * Gergely Nagy <algernon at madhouse-project.org>
29 */
31 #include <riemann/riemann-client.h>
32 #include <errno.h>
34 #include "collectd.h"
35 #include "plugin.h"
36 #include "common.h"
37 #include "configfile.h"
38 #include "utils_cache.h"
39 #include "utils_complain.h"
40 #include "write_riemann_threshold.h"
42 #define RIEMANN_HOST "localhost"
43 #define RIEMANN_PORT 5555
44 #define RIEMANN_TTL_FACTOR 2.0
45 #define RIEMANN_BATCH_MAX 8192
47 struct riemann_host {
48 c_complain_t init_complaint;
49 char *name;
50 char *event_service_prefix;
51 pthread_mutex_t lock;
52 _Bool batch_mode;
53 _Bool notifications;
54 _Bool check_thresholds;
55 _Bool store_rates;
56 _Bool always_append_ds;
57 char *node;
58 int port;
59 riemann_client_type_t client_type;
60 riemann_client_t *client;
61 double ttl_factor;
62 cdtime_t batch_init;
63 int batch_max;
64 int batch_timeout;
65 int reference_count;
66 riemann_message_t *batch_msg;
67 char *tls_ca_file;
68 char *tls_cert_file;
69 char *tls_key_file;
70 struct timeval timeout;
71 };
73 static char **riemann_tags;
74 static size_t riemann_tags_num;
75 static char **riemann_attrs;
76 static size_t riemann_attrs_num;
78 /* host->lock must be held when calling this function. */
79 static int wrr_connect(struct riemann_host *host) /* {{{ */
80 {
81 char const *node;
82 int port;
84 if (host->client)
85 return 0;
87 node = (host->node != NULL) ? host->node : RIEMANN_HOST;
88 port = (host->port) ? host->port : RIEMANN_PORT;
90 host->client = NULL;
92 host->client = riemann_client_create(host->client_type, node, port,
93 RIEMANN_CLIENT_OPTION_TLS_CA_FILE, host->tls_ca_file,
94 RIEMANN_CLIENT_OPTION_TLS_CERT_FILE, host->tls_cert_file,
95 RIEMANN_CLIENT_OPTION_TLS_KEY_FILE, host->tls_key_file,
96 RIEMANN_CLIENT_OPTION_NONE);
97 if (host->client == NULL) {
98 c_complain (LOG_ERR, &host->init_complaint,
99 "write_riemann plugin: Unable to connect to Riemann at %s:%d",
100 node, port);
101 return -1;
102 }
103 if (host->timeout.tv_sec != 0) {
104 if (riemann_client_set_timeout(host->client, &host->timeout) != 0) {
105 riemann_client_free(host->client);
106 host->client = NULL;
107 c_complain (LOG_ERR, &host->init_complaint,
108 "write_riemann plugin: Unable to connect to Riemann at %s:%d",
109 node, port);
110 return -1;
111 }
112 }
114 c_release (LOG_INFO, &host->init_complaint,
115 "write_riemann plugin: Successfully connected to %s:%d",
116 node, port);
118 return 0;
119 } /* }}} int wrr_connect */
121 /* host->lock must be held when calling this function. */
122 static int wrr_disconnect(struct riemann_host *host) /* {{{ */
123 {
124 if (!host->client)
125 return (0);
127 riemann_client_free(host->client);
128 host->client = NULL;
130 return (0);
131 } /* }}} int wrr_disconnect */
133 /**
134 * Function to send messages to riemann.
135 *
136 * Acquires the host lock, disconnects on errors.
137 */
138 static int wrr_send_nolock(struct riemann_host *host, riemann_message_t *msg) /* {{{ */
139 {
140 int status = 0;
142 status = wrr_connect(host);
143 if (status != 0) {
144 return status;
145 }
147 status = riemann_client_send_message(host->client, msg);
148 if (status != 0) {
149 wrr_disconnect(host);
150 return status;
151 }
153 /*
154 * For TCP we need to receive message acknowledgemenent.
155 */
156 if (host->client_type != RIEMANN_CLIENT_UDP)
157 {
158 riemann_message_t *response;
160 response = riemann_client_recv_message(host->client);
162 if (response == NULL)
163 {
164 wrr_disconnect(host);
165 return errno;
166 }
167 riemann_message_free(response);
168 }
170 return 0;
171 } /* }}} int wrr_send */
173 static int wrr_send(struct riemann_host *host, riemann_message_t *msg)
174 {
175 int status = 0;
177 pthread_mutex_lock (&host->lock);
178 status = wrr_send_nolock(host, msg);
179 pthread_mutex_unlock (&host->lock);
180 return status;
181 }
183 static riemann_message_t *wrr_notification_to_message(struct riemann_host *host, /* {{{ */
184 notification_t const *n)
185 {
186 riemann_message_t *msg;
187 riemann_event_t *event;
188 char service_buffer[6 * DATA_MAX_NAME_LEN];
189 char const *severity;
190 notification_meta_t *meta;
191 size_t i;
193 switch (n->severity)
194 {
195 case NOTIF_OKAY: severity = "ok"; break;
196 case NOTIF_WARNING: severity = "warning"; break;
197 case NOTIF_FAILURE: severity = "critical"; break;
198 default: severity = "unknown";
199 }
201 format_name(service_buffer, sizeof(service_buffer),
202 /* host = */ "", n->plugin, n->plugin_instance,
203 n->type, n->type_instance);
205 event = riemann_event_create(RIEMANN_EVENT_FIELD_HOST, n->host,
206 RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(n->time),
207 RIEMANN_EVENT_FIELD_TAGS, "notification", NULL,
208 RIEMANN_EVENT_FIELD_STATE, severity,
209 RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
210 RIEMANN_EVENT_FIELD_NONE);
212 if (n->host[0] != 0)
213 riemann_event_string_attribute_add(event, "host", n->host);
214 if (n->plugin[0] != 0)
215 riemann_event_string_attribute_add(event, "plugin", n->plugin);
216 if (n->plugin_instance[0] != 0)
217 riemann_event_string_attribute_add(event, "plugin_instance", n->plugin_instance);
219 if (n->type[0] != 0)
220 riemann_event_string_attribute_add(event, "type", n->type);
221 if (n->type_instance[0] != 0)
222 riemann_event_string_attribute_add(event, "type_instance", n->type_instance);
224 for (i = 0; i < riemann_attrs_num; i += 2)
225 riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i+1]);
227 for (i = 0; i < riemann_tags_num; i++)
228 riemann_event_tag_add(event, riemann_tags[i]);
230 if (n->message[0] != 0)
231 riemann_event_string_attribute_add(event, "description", n->message);
233 /* Pull in values from threshold and add extra attributes */
234 for (meta = n->meta; meta != NULL; meta = meta->next)
235 {
236 if (strcasecmp("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE)
237 {
238 riemann_event_set(event,
239 RIEMANN_EVENT_FIELD_METRIC_D,
240 (double) meta->nm_value.nm_double,
241 RIEMANN_EVENT_FIELD_NONE);
242 continue;
243 }
245 if (meta->type == NM_TYPE_STRING) {
246 riemann_event_string_attribute_add(event, meta->name, meta->nm_value.nm_string);
247 continue;
248 }
249 }
251 msg = riemann_message_create_with_events(event, NULL);
252 if (msg == NULL)
253 {
254 ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
255 riemann_event_free (event);
256 return (NULL);
257 }
259 DEBUG("write_riemann plugin: Successfully created message for notification: "
260 "host = \"%s\", service = \"%s\", state = \"%s\"",
261 event->host, event->service, event->state);
262 return (msg);
263 } /* }}} riemann_message_t *wrr_notification_to_message */
265 static riemann_event_t *wrr_value_to_event(struct riemann_host const *host, /* {{{ */
266 data_set_t const *ds,
267 value_list_t const *vl, size_t index,
268 gauge_t const *rates,
269 int status)
270 {
271 riemann_event_t *event;
272 char name_buffer[5 * DATA_MAX_NAME_LEN];
273 char service_buffer[6 * DATA_MAX_NAME_LEN];
274 size_t i;
276 event = riemann_event_new();
277 if (event == NULL)
278 {
279 ERROR("write_riemann plugin: riemann_event_new() failed.");
280 return (NULL);
281 }
283 format_name(name_buffer, sizeof(name_buffer),
284 /* host = */ "", vl->plugin, vl->plugin_instance,
285 vl->type, vl->type_instance);
286 if (host->always_append_ds || (ds->ds_num > 1))
287 {
288 if (host->event_service_prefix == NULL)
289 ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s",
290 &name_buffer[1], ds->ds[index].name);
291 else
292 ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s",
293 host->event_service_prefix, &name_buffer[1], ds->ds[index].name);
294 }
295 else
296 {
297 if (host->event_service_prefix == NULL)
298 sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer));
299 else
300 ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
301 host->event_service_prefix, &name_buffer[1]);
302 }
304 riemann_event_set(event,
305 RIEMANN_EVENT_FIELD_HOST, vl->host,
306 RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(vl->time),
307 RIEMANN_EVENT_FIELD_TTL, (float) CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
308 RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES,
309 "plugin", vl->plugin,
310 "type", vl->type,
311 "ds_name", ds->ds[index].name,
312 NULL,
313 RIEMANN_EVENT_FIELD_SERVICE, service_buffer,
314 RIEMANN_EVENT_FIELD_NONE);
316 if (host->check_thresholds) {
317 const char *state = NULL;
319 switch (status) {
320 case STATE_OKAY:
321 state = "ok";
322 break;
323 case STATE_ERROR:
324 state = "critical";
325 break;
326 case STATE_WARNING:
327 state = "warning";
328 break;
329 case STATE_MISSING:
330 state = "unknown";
331 break;
332 }
333 if (state)
334 riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state,
335 RIEMANN_EVENT_FIELD_NONE);
336 }
338 if (vl->plugin_instance[0] != 0)
339 riemann_event_string_attribute_add(event, "plugin_instance", vl->plugin_instance);
340 if (vl->type_instance[0] != 0)
341 riemann_event_string_attribute_add(event, "type_instance", vl->type_instance);
343 if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL))
344 {
345 char ds_type[DATA_MAX_NAME_LEN];
347 ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
348 DS_TYPE_TO_STRING(ds->ds[index].type));
349 riemann_event_string_attribute_add(event, "ds_type", ds_type);
350 }
351 else
352 {
353 riemann_event_string_attribute_add(event, "ds_type",
354 DS_TYPE_TO_STRING(ds->ds[index].type));
355 }
357 {
358 char ds_index[DATA_MAX_NAME_LEN];
360 ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
361 riemann_event_string_attribute_add(event, "ds_index", ds_index);
362 }
364 for (i = 0; i < riemann_attrs_num; i += 2)
365 riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i +1]);
367 for (i = 0; i < riemann_tags_num; i++)
368 riemann_event_tag_add(event, riemann_tags[i]);
370 if (ds->ds[index].type == DS_TYPE_GAUGE)
371 {
372 riemann_event_set(event,
373 RIEMANN_EVENT_FIELD_METRIC_D,
374 (double) vl->values[index].gauge,
375 RIEMANN_EVENT_FIELD_NONE);
376 }
377 else if (rates != NULL)
378 {
379 riemann_event_set(event,
380 RIEMANN_EVENT_FIELD_METRIC_D,
381 (double) rates[index],
382 RIEMANN_EVENT_FIELD_NONE);
383 }
384 else
385 {
386 int64_t metric;
388 if (ds->ds[index].type == DS_TYPE_DERIVE)
389 metric = (int64_t) vl->values[index].derive;
390 else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
391 metric = (int64_t) vl->values[index].absolute;
392 else
393 metric = (int64_t) vl->values[index].counter;
395 riemann_event_set(event,
396 RIEMANN_EVENT_FIELD_METRIC_S64,
397 (int64_t) metric,
398 RIEMANN_EVENT_FIELD_NONE);
399 }
401 DEBUG("write_riemann plugin: Successfully created message for metric: "
402 "host = \"%s\", service = \"%s\"",
403 event->host, event->service);
404 return (event);
405 } /* }}} riemann_event_t *wrr_value_to_event */
407 static riemann_message_t *wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */
408 data_set_t const *ds,
409 value_list_t const *vl,
410 int *statuses)
411 {
412 riemann_message_t *msg;
413 size_t i;
414 gauge_t *rates = NULL;
416 /* Initialize the Msg structure. */
417 msg = riemann_message_new();
418 if (msg == NULL)
419 {
420 ERROR ("write_riemann plugin: riemann_message_new failed.");
421 return (NULL);
422 }
424 if (host->store_rates)
425 {
426 rates = uc_get_rate(ds, vl);
427 if (rates == NULL)
428 {
429 ERROR("write_riemann plugin: uc_get_rate failed.");
430 riemann_message_free(msg);
431 return (NULL);
432 }
433 }
435 for (i = 0; i < vl->values_len; i++)
436 {
437 riemann_event_t *event;
439 event = wrr_value_to_event(host, ds, vl,
440 (int) i, rates, statuses[i]);
441 if (event == NULL)
442 {
443 riemann_message_free(msg);
444 sfree(rates);
445 return (NULL);
446 }
447 riemann_message_append_events(msg, event, NULL);
448 }
450 sfree(rates);
451 return (msg);
452 } /* }}} riemann_message_t *wrr_value_list_to_message */
454 /*
455 * Always call while holding host->lock !
456 */
457 static int wrr_batch_flush_nolock(cdtime_t timeout,
458 struct riemann_host *host)
459 {
460 cdtime_t now;
461 int status = 0;
463 now = cdtime();
464 if (timeout > 0) {
465 if ((host->batch_init + timeout) > now) {
466 return status;
467 }
468 }
469 wrr_send_nolock(host, host->batch_msg);
470 riemann_message_free(host->batch_msg);
472 host->batch_init = now;
473 host->batch_msg = NULL;
474 return status;
475 }
477 static int wrr_batch_flush(cdtime_t timeout,
478 const char *identifier __attribute__((unused)),
479 user_data_t *user_data)
480 {
481 struct riemann_host *host;
482 int status;
484 if (user_data == NULL)
485 return (-EINVAL);
487 host = user_data->data;
488 pthread_mutex_lock(&host->lock);
489 status = wrr_batch_flush_nolock(timeout, host);
490 if (status != 0)
491 c_complain (LOG_ERR, &host->init_complaint,
492 "write_riemann plugin: riemann_client_send failed with status %i",
493 status);
494 else
495 c_release (LOG_DEBUG, &host->init_complaint, "write_riemann plugin: batch sent.");
497 pthread_mutex_unlock(&host->lock);
498 return status;
499 }
501 static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
502 data_set_t const *ds,
503 value_list_t const *vl,
504 int *statuses)
505 {
506 riemann_message_t *msg;
507 size_t len;
508 int ret;
509 cdtime_t timeout;
511 msg = wrr_value_list_to_message(host, ds, vl, statuses);
512 if (msg == NULL)
513 return -1;
515 pthread_mutex_lock(&host->lock);
517 if (host->batch_msg == NULL) {
518 host->batch_msg = msg;
519 } else {
520 int status;
522 status = riemann_message_append_events_n(host->batch_msg,
523 msg->n_events,
524 msg->events);
525 msg->n_events = 0;
526 msg->events = NULL;
528 riemann_message_free(msg);
530 if (status != 0) {
531 pthread_mutex_unlock(&host->lock);
532 ERROR("write_riemann plugin: out of memory");
533 return -1;
534 }
535 }
537 len = riemann_message_get_packed_size(host->batch_msg);
538 ret = 0;
539 if ((host->batch_max < 0) || (((size_t) host->batch_max) <= len)) {
540 ret = wrr_batch_flush_nolock(0, host);
541 } else {
542 if (host->batch_timeout > 0) {
543 timeout = TIME_T_TO_CDTIME_T((time_t)host->batch_timeout);
544 ret = wrr_batch_flush_nolock(timeout, host);
545 }
546 }
548 pthread_mutex_unlock(&host->lock);
549 return ret;
550 } /* }}} riemann_message_t *wrr_batch_add_value_list */
552 static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */
553 {
554 int status;
555 struct riemann_host *host = ud->data;
556 riemann_message_t *msg;
558 if (!host->notifications)
559 return 0;
561 /*
562 * Never batch for notifications, send them ASAP
563 */
564 msg = wrr_notification_to_message(host, n);
565 if (msg == NULL)
566 return (-1);
568 status = wrr_send(host, msg);
569 if (status != 0)
570 c_complain (LOG_ERR, &host->init_complaint,
571 "write_riemann plugin: riemann_client_send failed with status %i",
572 status);
573 else
574 c_release (LOG_DEBUG, &host->init_complaint,
575 "write_riemann plugin: riemann_client_send succeeded");
577 riemann_message_free(msg);
578 return (status);
579 } /* }}} int wrr_notification */
581 static int wrr_write(const data_set_t *ds, /* {{{ */
582 const value_list_t *vl,
583 user_data_t *ud)
584 {
585 int status = 0;
586 int statuses[vl->values_len];
587 struct riemann_host *host = ud->data;
588 riemann_message_t *msg;
590 if (host->check_thresholds) {
591 status = write_riemann_threshold_check(ds, vl, statuses);
592 if (status != 0)
593 return status;
594 } else {
595 memset (statuses, 0, sizeof (statuses));
596 }
598 if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
599 wrr_batch_add_value_list(host, ds, vl, statuses);
600 } else {
601 msg = wrr_value_list_to_message(host, ds, vl, statuses);
602 if (msg == NULL)
603 return (-1);
605 status = wrr_send(host, msg);
607 riemann_message_free(msg);
608 }
609 return status;
610 } /* }}} int wrr_write */
612 static void wrr_free(void *p) /* {{{ */
613 {
614 struct riemann_host *host = p;
616 if (host == NULL)
617 return;
619 pthread_mutex_lock(&host->lock);
621 host->reference_count--;
622 if (host->reference_count > 0)
623 {
624 pthread_mutex_unlock(&host->lock);
625 return;
626 }
628 wrr_disconnect(host);
630 pthread_mutex_destroy(&host->lock);
631 sfree(host);
632 } /* }}} void wrr_free */
634 static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
635 {
636 struct riemann_host *host = NULL;
637 int status = 0;
638 int i;
639 oconfig_item_t *child;
640 char callback_name[DATA_MAX_NAME_LEN];
641 user_data_t ud;
643 if ((host = calloc(1, sizeof(*host))) == NULL) {
644 ERROR ("write_riemann plugin: calloc failed.");
645 return ENOMEM;
646 }
647 pthread_mutex_init(&host->lock, NULL);
648 C_COMPLAIN_INIT (&host->init_complaint);
649 host->reference_count = 1;
650 host->node = NULL;
651 host->port = 0;
652 host->notifications = 1;
653 host->check_thresholds = 0;
654 host->store_rates = 1;
655 host->always_append_ds = 0;
656 host->batch_mode = 1;
657 host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
658 host->batch_init = cdtime();
659 host->batch_timeout = 0;
660 host->ttl_factor = RIEMANN_TTL_FACTOR;
661 host->client = NULL;
662 host->client_type = RIEMANN_CLIENT_TCP;
663 host->timeout.tv_sec = 0;
664 host->timeout.tv_usec = 0;
666 status = cf_util_get_string(ci, &host->name);
667 if (status != 0) {
668 WARNING("write_riemann plugin: Required host name is missing.");
669 wrr_free(host);
670 return -1;
671 }
673 for (i = 0; i < ci->children_num; i++) {
674 /*
675 * The code here could be simplified but makes room
676 * for easy adding of new options later on.
677 */
678 child = &ci->children[i];
679 status = 0;
681 if (strcasecmp("Host", child->key) == 0) {
682 status = cf_util_get_string(child, &host->node);
683 if (status != 0)
684 break;
685 } else if (strcasecmp("Notifications", child->key) == 0) {
686 status = cf_util_get_boolean(child, &host->notifications);
687 if (status != 0)
688 break;
689 } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
690 status = cf_util_get_string(child, &host->event_service_prefix);
691 if (status != 0)
692 break;
693 } else if (strcasecmp("CheckThresholds", child->key) == 0) {
694 status = cf_util_get_boolean(child, &host->check_thresholds);
695 if (status != 0)
696 break;
697 } else if (strcasecmp("Batch", child->key) == 0) {
698 status = cf_util_get_boolean(child, &host->batch_mode);
699 if (status != 0)
700 break;
701 } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
702 status = cf_util_get_int(child, &host->batch_max);
703 if (status != 0)
704 break;
705 } else if (strcasecmp("BatchFlushTimeout", child->key) == 0) {
706 status = cf_util_get_int(child, &host->batch_timeout);
707 if (status != 0)
708 break;
709 } else if (strcasecmp("Timeout", child->key) == 0) {
710 status = cf_util_get_int(child, (int *)&host->timeout.tv_sec);
711 if (status != 0)
712 break;
713 } else if (strcasecmp("Port", child->key) == 0) {
714 host->port = cf_util_get_port_number(child);
715 if (host->port == -1) {
716 ERROR("write_riemann plugin: Invalid argument "
717 "configured for the \"Port\" "
718 "option.");
719 break;
720 }
721 } else if (strcasecmp("Protocol", child->key) == 0) {
722 char tmp[16];
723 status = cf_util_get_string_buffer(child,
724 tmp, sizeof(tmp));
725 if (status != 0)
726 {
727 ERROR("write_riemann plugin: cf_util_get_"
728 "string_buffer failed with "
729 "status %i.", status);
730 break;
731 }
733 if (strcasecmp("UDP", tmp) == 0)
734 host->client_type = RIEMANN_CLIENT_UDP;
735 else if (strcasecmp("TCP", tmp) == 0)
736 host->client_type = RIEMANN_CLIENT_TCP;
737 else if (strcasecmp("TLS", tmp) == 0)
738 host->client_type = RIEMANN_CLIENT_TLS;
739 else
740 WARNING("write_riemann plugin: The value "
741 "\"%s\" is not valid for the "
742 "\"Protocol\" option. Use "
743 "either \"UDP\", \"TCP\" or \"TLS\".",
744 tmp);
745 } else if (strcasecmp("TLSCAFile", child->key) == 0) {
746 status = cf_util_get_string(child, &host->tls_ca_file);
747 if (status != 0)
748 {
749 ERROR("write_riemann plugin: cf_util_get_"
750 "string_buffer failed with "
751 "status %i.", status);
752 break;
753 }
754 } else if (strcasecmp("TLSCertFile", child->key) == 0) {
755 status = cf_util_get_string(child, &host->tls_cert_file);
756 if (status != 0)
757 {
758 ERROR("write_riemann plugin: cf_util_get_"
759 "string_buffer failed with "
760 "status %i.", status);
761 break;
762 }
763 } else if (strcasecmp("TLSKeyFile", child->key) == 0) {
764 status = cf_util_get_string(child, &host->tls_key_file);
765 if (status != 0)
766 {
767 ERROR("write_riemann plugin: cf_util_get_"
768 "string_buffer failed with "
769 "status %i.", status);
770 break;
771 }
772 } else if (strcasecmp("StoreRates", child->key) == 0) {
773 status = cf_util_get_boolean(child, &host->store_rates);
774 if (status != 0)
775 break;
776 } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
777 status = cf_util_get_boolean(child,
778 &host->always_append_ds);
779 if (status != 0)
780 break;
781 } else if (strcasecmp("TTLFactor", child->key) == 0) {
782 double tmp = NAN;
783 status = cf_util_get_double(child, &tmp);
784 if (status != 0)
785 break;
786 if (tmp >= 2.0) {
787 host->ttl_factor = tmp;
788 } else if (tmp >= 1.0) {
789 NOTICE("write_riemann plugin: The configured "
790 "TTLFactor is very small "
791 "(%.1f). A value of 2.0 or "
792 "greater is recommended.",
793 tmp);
794 host->ttl_factor = tmp;
795 } else if (tmp > 0.0) {
796 WARNING("write_riemann plugin: The configured "
797 "TTLFactor is too small to be "
798 "useful (%.1f). I'll use it "
799 "since the user knows best, "
800 "but under protest.",
801 tmp);
802 host->ttl_factor = tmp;
803 } else { /* zero, negative and NAN */
804 ERROR("write_riemann plugin: The configured "
805 "TTLFactor is invalid (%.1f).",
806 tmp);
807 }
808 } else {
809 WARNING("write_riemann plugin: ignoring unknown config "
810 "option: \"%s\"", child->key);
811 }
812 }
813 if (status != 0) {
814 wrr_free(host);
815 return status;
816 }
818 ssnprintf(callback_name, sizeof(callback_name), "write_riemann/%s",
819 host->name);
820 ud.data = host;
821 ud.free_func = wrr_free;
823 pthread_mutex_lock(&host->lock);
825 status = plugin_register_write(callback_name, wrr_write, &ud);
827 if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
828 ud.free_func = NULL;
829 plugin_register_flush(callback_name, wrr_batch_flush, &ud);
830 }
831 if (status != 0)
832 WARNING("write_riemann plugin: plugin_register_write (\"%s\") "
833 "failed with status %i.",
834 callback_name, status);
835 else /* success */
836 host->reference_count++;
838 status = plugin_register_notification(callback_name,
839 wrr_notification, &ud);
840 if (status != 0)
841 WARNING("write_riemann plugin: plugin_register_notification (\"%s\") "
842 "failed with status %i.",
843 callback_name, status);
844 else /* success */
845 host->reference_count++;
847 if (host->reference_count <= 1)
848 {
849 /* Both callbacks failed => free memory.
850 * We need to unlock here, because riemann_free() will lock.
851 * This is not a race condition, because we're the only one
852 * holding a reference. */
853 pthread_mutex_unlock(&host->lock);
854 wrr_free(host);
855 return (-1);
856 }
858 host->reference_count--;
859 pthread_mutex_unlock(&host->lock);
861 return status;
862 } /* }}} int wrr_config_node */
864 static int wrr_config(oconfig_item_t *ci) /* {{{ */
865 {
866 int i;
867 oconfig_item_t *child;
868 int status;
870 for (i = 0; i < ci->children_num; i++) {
871 child = &ci->children[i];
873 if (strcasecmp("Node", child->key) == 0) {
874 wrr_config_node (child);
875 } else if (strcasecmp(child->key, "attribute") == 0) {
876 char *key = NULL;
877 char *val = NULL;
879 if (child->values_num != 2) {
880 WARNING("riemann attributes need both a key and a value.");
881 return (-1);
882 }
883 if (child->values[0].type != OCONFIG_TYPE_STRING ||
884 child->values[1].type != OCONFIG_TYPE_STRING) {
885 WARNING("riemann attribute needs string arguments.");
886 return (-1);
887 }
888 if ((key = strdup(child->values[0].value.string)) == NULL) {
889 WARNING("cannot allocate memory for attribute key.");
890 return (-1);
891 }
892 if ((val = strdup(child->values[1].value.string)) == NULL) {
893 WARNING("cannot allocate memory for attribute value.");
894 sfree(key);
895 return (-1);
896 }
897 strarray_add(&riemann_attrs, &riemann_attrs_num, key);
898 strarray_add(&riemann_attrs, &riemann_attrs_num, val);
899 DEBUG("write_riemann: got attr: %s => %s", key, val);
900 sfree(key);
901 sfree(val);
902 } else if (strcasecmp(child->key, "tag") == 0) {
903 char *tmp = NULL;
904 status = cf_util_get_string(child, &tmp);
905 if (status != 0)
906 continue;
908 strarray_add(&riemann_tags, &riemann_tags_num, tmp);
909 DEBUG("write_riemann plugin: Got tag: %s", tmp);
910 sfree(tmp);
911 } else {
912 WARNING("write_riemann plugin: Ignoring unknown "
913 "configuration option \"%s\" at top level.",
914 child->key);
915 }
916 }
917 return (0);
918 } /* }}} int wrr_config */
920 void module_register(void)
921 {
922 plugin_register_complex_config("write_riemann", wrr_config);
923 }
925 /* vim: set sw=8 sts=8 ts=8 noet : */