1 /**
2 * collectd - src/write_riemann.c
3 * Copyright (C) 2012,2013 Pierre-Yves Ritschard
4 * Copyright (C) 2013 Florian octo Forster
5 * Copyright (C) 2015,2016 Gergely Nagy
6 *
7 * Permission is hereby granted, free of charge, to any person obtaining a
8 * copy of this software and associated documentation files (the "Software"),
9 * to deal in the Software without restriction, including without limitation
10 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11 * and/or sell copies of the Software, and to permit persons to whom the
12 * Software is furnished to do so, subject to the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be included in
15 * all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23 * DEALINGS IN THE SOFTWARE.
24 *
25 * Authors:
26 * Pierre-Yves Ritschard <pyr at spootnik.org>
27 * Florian octo Forster <octo at collectd.org>
28 * Gergely Nagy <algernon at madhouse-project.org>
29 */
31 #include <riemann/riemann-client.h>
32 #include <errno.h>
33 #include <pthread.h>
35 #include "collectd.h"
36 #include "plugin.h"
37 #include "common.h"
38 #include "configfile.h"
39 #include "utils_cache.h"
40 #include "write_riemann_threshold.h"
42 #define RIEMANN_HOST "localhost"
43 #define RIEMANN_PORT 5555
44 #define RIEMANN_TTL_FACTOR 2.0
45 #define RIEMANN_BATCH_MAX 8192
47 struct riemann_host {
48 char *name;
49 char *event_service_prefix;
50 pthread_mutex_t lock;
51 _Bool batch_mode;
52 _Bool notifications;
53 _Bool check_thresholds;
54 _Bool store_rates;
55 _Bool always_append_ds;
56 char *node;
57 int port;
58 riemann_client_type_t client_type;
59 riemann_client_t *client;
60 double ttl_factor;
61 cdtime_t batch_init;
62 int batch_max;
63 int reference_count;
64 riemann_message_t *batch_msg;
65 char *tls_ca_file;
66 char *tls_cert_file;
67 char *tls_key_file;
68 };
70 static char **riemann_tags;
71 static size_t riemann_tags_num;
72 static char **riemann_attrs;
73 static size_t riemann_attrs_num;
75 /* host->lock must be held when calling this function. */
76 static int wrr_connect(struct riemann_host *host) /* {{{ */
77 {
78 char const *node;
79 int port;
81 if (host->client)
82 return 0;
84 node = (host->node != NULL) ? host->node : RIEMANN_HOST;
85 port = (host->port) ? host->port : RIEMANN_PORT;
87 host->client = NULL;
89 host->client = riemann_client_create(host->client_type, node, port,
90 RIEMANN_CLIENT_OPTION_TLS_CA_FILE, host->tls_ca_file,
91 RIEMANN_CLIENT_OPTION_TLS_CERT_FILE, host->tls_cert_file,
92 RIEMANN_CLIENT_OPTION_TLS_KEY_FILE, host->tls_key_file,
93 RIEMANN_CLIENT_OPTION_NONE);
94 if (host->client == NULL) {
95 WARNING("write_riemann plugin: Unable to connect to Riemann at %s:%d",
96 node, port);
97 return -1;
98 }
99 DEBUG("write_riemann plugin: got a successful connection for: %s:%d",
100 node, port);
102 return 0;
103 } /* }}} int wrr_connect */
105 /* host->lock must be held when calling this function. */
106 static int wrr_disconnect(struct riemann_host *host) /* {{{ */
107 {
108 if (!host->client)
109 return (0);
111 riemann_client_free(host->client);
112 host->client = NULL;
114 return (0);
115 } /* }}} int wrr_disconnect */
117 /**
118 * Function to send messages to riemann.
119 *
120 * Acquires the host lock, disconnects on errors.
121 */
122 static int wrr_send(struct riemann_host *host, riemann_message_t *msg) /* {{{ */
123 {
124 int status = 0;
125 pthread_mutex_lock (&host->lock);
127 status = wrr_connect(host);
128 if (status != 0)
129 return status;
131 status = riemann_client_send_message(host->client, msg);
132 if (status != 0) {
133 wrr_disconnect(host);
134 pthread_mutex_unlock(&host->lock);
135 return status;
136 }
138 /*
139 * For TCP we need to receive message acknowledgemenent.
140 */
141 if (host->client_type != RIEMANN_CLIENT_UDP)
142 {
143 riemann_message_t *response;
145 response = riemann_client_recv_message(host->client);
147 if (response == NULL)
148 {
149 wrr_disconnect(host);
150 pthread_mutex_unlock(&host->lock);
151 return errno;
152 }
153 riemann_message_free(response);
154 }
156 pthread_mutex_unlock (&host->lock);
157 return 0;
158 } /* }}} int wrr_send */
160 static riemann_message_t *wrr_notification_to_message(struct riemann_host *host, /* {{{ */
161 notification_t const *n)
162 {
163 riemann_message_t *msg;
164 riemann_event_t *event;
165 char service_buffer[6 * DATA_MAX_NAME_LEN];
166 char const *severity;
167 notification_meta_t *meta;
168 size_t i;
170 switch (n->severity)
171 {
172 case NOTIF_OKAY: severity = "ok"; break;
173 case NOTIF_WARNING: severity = "warning"; break;
174 case NOTIF_FAILURE: severity = "critical"; break;
175 default: severity = "unknown";
176 }
178 format_name(service_buffer, sizeof(service_buffer),
179 /* host = */ "", n->plugin, n->plugin_instance,
180 n->type, n->type_instance);
182 event = riemann_event_create(RIEMANN_EVENT_FIELD_HOST, n->host,
183 RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(n->time),
184 RIEMANN_EVENT_FIELD_TAGS, "notification", NULL,
185 RIEMANN_EVENT_FIELD_STATE, severity,
186 RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
187 RIEMANN_EVENT_FIELD_NONE);
189 if (n->host[0] != 0)
190 riemann_event_attribute_add(event,
191 riemann_attribute_create("host", n->host));
192 if (n->plugin[0] != 0)
193 riemann_event_attribute_add(event,
194 riemann_attribute_create("plugin", n->plugin));
195 if (n->plugin_instance[0] != 0)
196 riemann_event_attribute_add(event,
197 riemann_attribute_create("plugin_instance",
198 n->plugin_instance));
200 if (n->type[0] != 0)
201 riemann_event_attribute_add(event,
202 riemann_attribute_create("type", n->type));
203 if (n->type_instance[0] != 0)
204 riemann_event_attribute_add(event,
205 riemann_attribute_create("type_instance",
206 n->type_instance));
208 for (i = 0; i < riemann_attrs_num; i += 2)
209 riemann_event_attribute_add(event,
210 riemann_attribute_create(riemann_attrs[i],
211 riemann_attrs[i +1]));
213 for (i = 0; i < riemann_tags_num; i++)
214 riemann_event_tag_add(event, riemann_tags[i]);
216 if (n->message[0] != 0)
217 riemann_event_attribute_add(event,
218 riemann_attribute_create("description", n->message));
220 /* Pull in values from threshold and add extra attributes */
221 for (meta = n->meta; meta != NULL; meta = meta->next)
222 {
223 if (strcasecmp("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE)
224 {
225 riemann_event_set(event,
226 RIEMANN_EVENT_FIELD_METRIC_D,
227 (double) meta->nm_value.nm_double,
228 RIEMANN_EVENT_FIELD_NONE);
229 continue;
230 }
232 if (meta->type == NM_TYPE_STRING) {
233 riemann_event_attribute_add(event,
234 riemann_attribute_create(meta->name,
235 meta->nm_value.nm_string));
236 continue;
237 }
238 }
240 msg = riemann_message_create_with_events(event, NULL);
241 if (msg == NULL)
242 {
243 ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
244 riemann_event_free (event);
245 return (NULL);
246 }
248 DEBUG("write_riemann plugin: Successfully created message for notification: "
249 "host = \"%s\", service = \"%s\", state = \"%s\"",
250 event->host, event->service, event->state);
251 return (msg);
252 } /* }}} riemann_message_t *wrr_notification_to_message */
254 static riemann_event_t *wrr_value_to_event(struct riemann_host const *host, /* {{{ */
255 data_set_t const *ds,
256 value_list_t const *vl, size_t index,
257 gauge_t const *rates,
258 int status)
259 {
260 riemann_event_t *event;
261 char name_buffer[5 * DATA_MAX_NAME_LEN];
262 char service_buffer[6 * DATA_MAX_NAME_LEN];
263 size_t i;
265 event = riemann_event_new();
266 if (event == NULL)
267 {
268 ERROR("write_riemann plugin: riemann_event_new() failed.");
269 return (NULL);
270 }
272 format_name(name_buffer, sizeof(name_buffer),
273 /* host = */ "", vl->plugin, vl->plugin_instance,
274 vl->type, vl->type_instance);
275 if (host->always_append_ds || (ds->ds_num > 1))
276 {
277 if (host->event_service_prefix == NULL)
278 ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s",
279 &name_buffer[1], ds->ds[index].name);
280 else
281 ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s",
282 host->event_service_prefix, &name_buffer[1], ds->ds[index].name);
283 }
284 else
285 {
286 if (host->event_service_prefix == NULL)
287 sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer));
288 else
289 ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
290 host->event_service_prefix, &name_buffer[1]);
291 }
293 riemann_event_set(event,
294 RIEMANN_EVENT_FIELD_HOST, vl->host,
295 RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(vl->time),
296 RIEMANN_EVENT_FIELD_TTL, (float) CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
297 RIEMANN_EVENT_FIELD_ATTRIBUTES,
298 riemann_attribute_create("plugin", vl->plugin),
299 riemann_attribute_create("type", vl->type),
300 riemann_attribute_create("ds_name", ds->ds[index].name),
301 NULL,
302 RIEMANN_EVENT_FIELD_SERVICE, service_buffer,
303 RIEMANN_EVENT_FIELD_NONE);
305 if (host->check_thresholds) {
306 const char *state = NULL;
308 switch (status) {
309 case STATE_OKAY:
310 state = "ok";
311 break;
312 case STATE_ERROR:
313 state = "critical";
314 break;
315 case STATE_WARNING:
316 state = "warning";
317 break;
318 case STATE_MISSING:
319 state = "unknown";
320 break;
321 }
322 if (state)
323 riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state,
324 RIEMANN_EVENT_FIELD_NONE);
325 }
327 if (vl->plugin_instance[0] != 0)
328 riemann_event_attribute_add(event,
329 riemann_attribute_create("plugin_instance",
330 vl->plugin_instance));
331 if (vl->type_instance[0] != 0)
332 riemann_event_attribute_add(event,
333 riemann_attribute_create("type_instance",
334 vl->type_instance));
336 if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL))
337 {
338 char ds_type[DATA_MAX_NAME_LEN];
340 ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
341 DS_TYPE_TO_STRING(ds->ds[index].type));
342 riemann_event_attribute_add(event,
343 riemann_attribute_create("ds_type", ds_type));
344 }
345 else
346 {
347 riemann_event_attribute_add(event,
348 riemann_attribute_create("ds_type",
349 DS_TYPE_TO_STRING(ds->ds[index].type)));
350 }
352 {
353 char ds_index[DATA_MAX_NAME_LEN];
355 ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
356 riemann_event_attribute_add(event,
357 riemann_attribute_create("ds_index", ds_index));
358 }
360 for (i = 0; i < riemann_attrs_num; i += 2)
361 riemann_event_attribute_add(event,
362 riemann_attribute_create(riemann_attrs[i],
363 riemann_attrs[i +1]));
365 for (i = 0; i < riemann_tags_num; i++)
366 riemann_event_tag_add(event, riemann_tags[i]);
368 if (ds->ds[index].type == DS_TYPE_GAUGE)
369 {
370 riemann_event_set(event,
371 RIEMANN_EVENT_FIELD_METRIC_D,
372 (double) vl->values[index].gauge,
373 RIEMANN_EVENT_FIELD_NONE);
374 }
375 else if (rates != NULL)
376 {
377 riemann_event_set(event,
378 RIEMANN_EVENT_FIELD_METRIC_D,
379 (double) rates[index],
380 RIEMANN_EVENT_FIELD_NONE);
381 }
382 else
383 {
384 int64_t metric;
386 if (ds->ds[index].type == DS_TYPE_DERIVE)
387 metric = (int64_t) vl->values[index].derive;
388 else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
389 metric = (int64_t) vl->values[index].absolute;
390 else
391 metric = (int64_t) vl->values[index].counter;
393 riemann_event_set(event,
394 RIEMANN_EVENT_FIELD_METRIC_S64,
395 (int64_t) metric,
396 RIEMANN_EVENT_FIELD_NONE);
397 }
399 DEBUG("write_riemann plugin: Successfully created message for metric: "
400 "host = \"%s\", service = \"%s\"",
401 event->host, event->service);
402 return (event);
403 } /* }}} riemann_event_t *wrr_value_to_event */
405 static riemann_message_t *wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */
406 data_set_t const *ds,
407 value_list_t const *vl,
408 int *statuses)
409 {
410 riemann_message_t *msg;
411 size_t i;
412 gauge_t *rates = NULL;
414 /* Initialize the Msg structure. */
415 msg = riemann_message_new();
416 if (msg == NULL)
417 {
418 ERROR ("write_riemann plugin: riemann_message_new failed.");
419 return (NULL);
420 }
422 if (host->store_rates)
423 {
424 rates = uc_get_rate(ds, vl);
425 if (rates == NULL)
426 {
427 ERROR("write_riemann plugin: uc_get_rate failed.");
428 riemann_message_free(msg);
429 return (NULL);
430 }
431 }
433 for (i = 0; i < vl->values_len; i++)
434 {
435 riemann_event_t *event;
437 event = wrr_value_to_event(host, ds, vl,
438 (int) i, rates, statuses[i]);
439 if (event == NULL)
440 {
441 riemann_message_free(msg);
442 sfree(rates);
443 return (NULL);
444 }
445 riemann_message_append_events(msg, event, NULL);
446 }
448 sfree(rates);
449 return (msg);
450 } /* }}} riemann_message_t *wrr_value_list_to_message */
452 /*
453 * Always call while holding host->lock !
454 */
455 static int wrr_batch_flush_nolock(cdtime_t timeout,
456 struct riemann_host *host)
457 {
458 cdtime_t now;
459 int status = 0;
461 if (timeout > 0) {
462 now = cdtime();
463 if ((host->batch_init + timeout) > now)
464 return status;
465 }
466 wrr_send(host, host->batch_msg);
467 riemann_message_free(host->batch_msg);
469 if (host->client_type != RIEMANN_CLIENT_UDP)
470 {
471 riemann_message_t *response;
473 response = riemann_client_recv_message(host->client);
475 if (!response)
476 {
477 wrr_disconnect(host);
478 return errno;
479 }
481 riemann_message_free(response);
482 }
484 host->batch_init = cdtime();
485 host->batch_msg = NULL;
486 return status;
487 }
489 static int wrr_batch_flush(cdtime_t timeout,
490 const char *identifier __attribute__((unused)),
491 user_data_t *user_data)
492 {
493 struct riemann_host *host;
494 int status;
496 if (user_data == NULL)
497 return (-EINVAL);
499 host = user_data->data;
500 pthread_mutex_lock(&host->lock);
501 status = wrr_batch_flush_nolock(timeout, host);
502 if (status != 0)
503 ERROR("write_riemann plugin: riemann_client_send failed with status %i",
504 status);
506 pthread_mutex_unlock(&host->lock);
507 return status;
508 }
510 static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
511 data_set_t const *ds,
512 value_list_t const *vl,
513 int *statuses)
514 {
515 riemann_message_t *msg;
516 size_t len;
517 int ret;
519 msg = wrr_value_list_to_message(host, ds, vl, statuses);
520 if (msg == NULL)
521 return -1;
523 pthread_mutex_lock(&host->lock);
525 if (host->batch_msg == NULL) {
526 host->batch_msg = msg;
527 } else {
528 int status;
530 status = riemann_message_append_events_n(host->batch_msg,
531 msg->n_events,
532 msg->events);
533 msg->n_events = 0;
534 msg->events = NULL;
536 riemann_message_free(msg);
538 if (status != 0) {
539 pthread_mutex_unlock(&host->lock);
540 ERROR("write_riemann plugin: out of memory");
541 return -1;
542 }
543 }
545 len = protobuf_c_message_get_packed_size((const ProtobufCMessage*)(host->batch_msg));
546 ret = 0;
547 if ((host->batch_max < 0) || (((size_t) host->batch_max) <= len)) {
548 ret = wrr_batch_flush_nolock(0, host);
549 }
551 pthread_mutex_unlock(&host->lock);
552 return ret;
553 } /* }}} riemann_message_t *wrr_batch_add_value_list */
555 static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */
556 {
557 int status;
558 struct riemann_host *host = ud->data;
559 riemann_message_t *msg;
561 if (!host->notifications)
562 return 0;
564 /*
565 * Never batch for notifications, send them ASAP
566 */
567 msg = wrr_notification_to_message(host, n);
568 if (msg == NULL)
569 return (-1);
571 status = wrr_send(host, msg);
572 if (status != 0)
573 ERROR("write_riemann plugin: riemann_client_send failed with status %i",
574 status);
576 riemann_message_free(msg);
577 return (status);
578 } /* }}} int wrr_notification */
580 static int wrr_write(const data_set_t *ds, /* {{{ */
581 const value_list_t *vl,
582 user_data_t *ud)
583 {
584 int status = 0;
585 int statuses[vl->values_len];
586 struct riemann_host *host = ud->data;
587 riemann_message_t *msg;
589 if (host->check_thresholds) {
590 status = write_riemann_threshold_check(ds, vl, statuses);
591 if (status != 0)
592 return status;
593 } else {
594 memset (statuses, 0, sizeof (statuses));
595 }
597 if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
598 wrr_batch_add_value_list(host, ds, vl, statuses);
599 } else {
600 msg = wrr_value_list_to_message(host, ds, vl, statuses);
601 if (msg == NULL)
602 return (-1);
604 status = wrr_send(host, msg);
605 if (status != 0)
606 ERROR("write_riemann plugin: riemann_client_send failed with status %i",
607 status);
609 riemann_message_free(msg);
610 }
611 return status;
612 } /* }}} int wrr_write */
614 static void wrr_free(void *p) /* {{{ */
615 {
616 struct riemann_host *host = p;
618 if (host == NULL)
619 return;
621 pthread_mutex_lock(&host->lock);
623 host->reference_count--;
624 if (host->reference_count > 0)
625 {
626 pthread_mutex_unlock(&host->lock);
627 return;
628 }
630 wrr_disconnect(host);
632 pthread_mutex_destroy(&host->lock);
633 sfree(host);
634 } /* }}} void wrr_free */
636 static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
637 {
638 struct riemann_host *host = NULL;
639 int status = 0;
640 int i;
641 oconfig_item_t *child;
642 char callback_name[DATA_MAX_NAME_LEN];
643 user_data_t ud;
645 if ((host = calloc(1, sizeof(*host))) == NULL) {
646 ERROR ("write_riemann plugin: calloc failed.");
647 return ENOMEM;
648 }
649 pthread_mutex_init(&host->lock, NULL);
650 host->reference_count = 1;
651 host->node = NULL;
652 host->port = 0;
653 host->notifications = 1;
654 host->check_thresholds = 0;
655 host->store_rates = 1;
656 host->always_append_ds = 0;
657 host->batch_mode = 1;
658 host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
659 host->batch_init = cdtime();
660 host->ttl_factor = RIEMANN_TTL_FACTOR;
661 host->client = NULL;
662 host->client_type = RIEMANN_CLIENT_TCP;
664 status = cf_util_get_string(ci, &host->name);
665 if (status != 0) {
666 WARNING("write_riemann plugin: Required host name is missing.");
667 wrr_free(host);
668 return -1;
669 }
671 for (i = 0; i < ci->children_num; i++) {
672 /*
673 * The code here could be simplified but makes room
674 * for easy adding of new options later on.
675 */
676 child = &ci->children[i];
677 status = 0;
679 if (strcasecmp("Host", child->key) == 0) {
680 status = cf_util_get_string(child, &host->node);
681 if (status != 0)
682 break;
683 } else if (strcasecmp("Notifications", child->key) == 0) {
684 status = cf_util_get_boolean(child, &host->notifications);
685 if (status != 0)
686 break;
687 } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
688 status = cf_util_get_string(child, &host->event_service_prefix);
689 if (status != 0)
690 break;
691 } else if (strcasecmp("CheckThresholds", child->key) == 0) {
692 status = cf_util_get_boolean(child, &host->check_thresholds);
693 if (status != 0)
694 break;
695 } else if (strcasecmp("Batch", child->key) == 0) {
696 status = cf_util_get_boolean(child, &host->batch_mode);
697 if (status != 0)
698 break;
699 } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
700 status = cf_util_get_int(child, &host->batch_max);
701 if (status != 0)
702 break;
703 } else if (strcasecmp("Port", child->key) == 0) {
704 host->port = cf_util_get_port_number(child);
705 if (host->port == -1) {
706 ERROR("write_riemann plugin: Invalid argument "
707 "configured for the \"Port\" "
708 "option.");
709 break;
710 }
711 } else if (strcasecmp("Protocol", child->key) == 0) {
712 char tmp[16];
713 status = cf_util_get_string_buffer(child,
714 tmp, sizeof(tmp));
715 if (status != 0)
716 {
717 ERROR("write_riemann plugin: cf_util_get_"
718 "string_buffer failed with "
719 "status %i.", status);
720 break;
721 }
723 if (strcasecmp("UDP", tmp) == 0)
724 host->client_type = RIEMANN_CLIENT_UDP;
725 else if (strcasecmp("TCP", tmp) == 0)
726 host->client_type = RIEMANN_CLIENT_TCP;
727 else if (strcasecmp("TLS", tmp) == 0)
728 host->client_type = RIEMANN_CLIENT_TLS;
729 else
730 WARNING("write_riemann plugin: The value "
731 "\"%s\" is not valid for the "
732 "\"Protocol\" option. Use "
733 "either \"UDP\", \"TCP\" or \"TLS\".",
734 tmp);
735 } else if (strcasecmp("TLSCAFile", child->key) == 0) {
736 status = cf_util_get_string(child, &host->tls_ca_file);
737 if (status != 0)
738 {
739 ERROR("write_riemann plugin: cf_util_get_"
740 "string_buffer failed with "
741 "status %i.", status);
742 break;
743 }
744 } else if (strcasecmp("TLSCertFile", child->key) == 0) {
745 status = cf_util_get_string(child, &host->tls_cert_file);
746 if (status != 0)
747 {
748 ERROR("write_riemann plugin: cf_util_get_"
749 "string_buffer failed with "
750 "status %i.", status);
751 break;
752 }
753 } else if (strcasecmp("TLSKeyFile", child->key) == 0) {
754 status = cf_util_get_string(child, &host->tls_key_file);
755 if (status != 0)
756 {
757 ERROR("write_riemann plugin: cf_util_get_"
758 "string_buffer failed with "
759 "status %i.", status);
760 break;
761 }
762 } else if (strcasecmp("StoreRates", child->key) == 0) {
763 status = cf_util_get_boolean(child, &host->store_rates);
764 if (status != 0)
765 break;
766 } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
767 status = cf_util_get_boolean(child,
768 &host->always_append_ds);
769 if (status != 0)
770 break;
771 } else if (strcasecmp("TTLFactor", child->key) == 0) {
772 double tmp = NAN;
773 status = cf_util_get_double(child, &tmp);
774 if (status != 0)
775 break;
776 if (tmp >= 2.0) {
777 host->ttl_factor = tmp;
778 } else if (tmp >= 1.0) {
779 NOTICE("write_riemann plugin: The configured "
780 "TTLFactor is very small "
781 "(%.1f). A value of 2.0 or "
782 "greater is recommended.",
783 tmp);
784 host->ttl_factor = tmp;
785 } else if (tmp > 0.0) {
786 WARNING("write_riemann plugin: The configured "
787 "TTLFactor is too small to be "
788 "useful (%.1f). I'll use it "
789 "since the user knows best, "
790 "but under protest.",
791 tmp);
792 host->ttl_factor = tmp;
793 } else { /* zero, negative and NAN */
794 ERROR("write_riemann plugin: The configured "
795 "TTLFactor is invalid (%.1f).",
796 tmp);
797 }
798 } else {
799 WARNING("write_riemann plugin: ignoring unknown config "
800 "option: \"%s\"", child->key);
801 }
802 }
803 if (status != 0) {
804 wrr_free(host);
805 return status;
806 }
808 ssnprintf(callback_name, sizeof(callback_name), "write_riemann/%s",
809 host->name);
810 ud.data = host;
811 ud.free_func = wrr_free;
813 pthread_mutex_lock(&host->lock);
815 status = plugin_register_write(callback_name, wrr_write, &ud);
817 if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
818 ud.free_func = NULL;
819 plugin_register_flush(callback_name, wrr_batch_flush, &ud);
820 }
821 if (status != 0)
822 WARNING("write_riemann plugin: plugin_register_write (\"%s\") "
823 "failed with status %i.",
824 callback_name, status);
825 else /* success */
826 host->reference_count++;
828 status = plugin_register_notification(callback_name,
829 wrr_notification, &ud);
830 if (status != 0)
831 WARNING("write_riemann plugin: plugin_register_notification (\"%s\") "
832 "failed with status %i.",
833 callback_name, status);
834 else /* success */
835 host->reference_count++;
837 if (host->reference_count <= 1)
838 {
839 /* Both callbacks failed => free memory.
840 * We need to unlock here, because riemann_free() will lock.
841 * This is not a race condition, because we're the only one
842 * holding a reference. */
843 pthread_mutex_unlock(&host->lock);
844 wrr_free(host);
845 return (-1);
846 }
848 host->reference_count--;
849 pthread_mutex_unlock(&host->lock);
851 return status;
852 } /* }}} int wrr_config_node */
854 static int wrr_config(oconfig_item_t *ci) /* {{{ */
855 {
856 int i;
857 oconfig_item_t *child;
858 int status;
860 for (i = 0; i < ci->children_num; i++) {
861 child = &ci->children[i];
863 if (strcasecmp("Node", child->key) == 0) {
864 wrr_config_node (child);
865 } else if (strcasecmp(child->key, "attribute") == 0) {
866 char *key = NULL;
867 char *val = NULL;
869 if (child->values_num != 2) {
870 WARNING("riemann attributes need both a key and a value.");
871 return (-1);
872 }
873 if (child->values[0].type != OCONFIG_TYPE_STRING ||
874 child->values[1].type != OCONFIG_TYPE_STRING) {
875 WARNING("riemann attribute needs string arguments.");
876 return (-1);
877 }
878 if ((key = strdup(child->values[0].value.string)) == NULL) {
879 WARNING("cannot allocate memory for attribute key.");
880 return (-1);
881 }
882 if ((val = strdup(child->values[1].value.string)) == NULL) {
883 WARNING("cannot allocate memory for attribute value.");
884 sfree(key);
885 return (-1);
886 }
887 strarray_add(&riemann_attrs, &riemann_attrs_num, key);
888 strarray_add(&riemann_attrs, &riemann_attrs_num, val);
889 DEBUG("write_riemann: got attr: %s => %s", key, val);
890 sfree(key);
891 sfree(val);
892 } else if (strcasecmp(child->key, "tag") == 0) {
893 char *tmp = NULL;
894 status = cf_util_get_string(child, &tmp);
895 if (status != 0)
896 continue;
898 strarray_add(&riemann_tags, &riemann_tags_num, tmp);
899 DEBUG("write_riemann plugin: Got tag: %s", tmp);
900 sfree(tmp);
901 } else {
902 WARNING("write_riemann plugin: Ignoring unknown "
903 "configuration option \"%s\" at top level.",
904 child->key);
905 }
906 }
907 return (0);
908 } /* }}} int wrr_config */
910 void module_register(void)
911 {
912 plugin_register_complex_config("write_riemann", wrr_config);
913 }
915 /* vim: set sw=8 sts=8 ts=8 noet : */