1 /**
2 * collectd - src/write_riemann.c
3 * Copyright (C) 2012,2013 Pierre-Yves Ritschard
4 * Copyright (C) 2013 Florian octo Forster
5 * Copyright (C) 2015,2016 Gergely Nagy
6 *
7 * Permission is hereby granted, free of charge, to any person obtaining a
8 * copy of this software and associated documentation files (the "Software"),
9 * to deal in the Software without restriction, including without limitation
10 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11 * and/or sell copies of the Software, and to permit persons to whom the
12 * Software is furnished to do so, subject to the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be included in
15 * all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23 * DEALINGS IN THE SOFTWARE.
24 *
25 * Authors:
26 * Pierre-Yves Ritschard <pyr at spootnik.org>
27 * Florian octo Forster <octo at collectd.org>
28 * Gergely Nagy <algernon at madhouse-project.org>
29 */
31 #include "collectd.h"
34 #include "plugin.h"
35 #include "common.h"
36 #include "configfile.h"
37 #include "utils_cache.h"
38 #include "utils_complain.h"
39 #include "write_riemann_threshold.h"
41 #include <errno.h>
42 #include <riemann/riemann-client.h>
/* Server name used by wrr_connect() when no "Host" option was configured. */
#define RIEMANN_HOST "localhost"
/* Port used by wrr_connect() when no "Port" option was configured. */
#define RIEMANN_PORT 5555
/* Default multiplier applied to a value's interval to obtain the event TTL. */
#define RIEMANN_TTL_FACTOR 2.0
/* Default "BatchMaxSize": a batch is flushed once its packed size, as
 * reported by riemann_message_get_packed_size(), reaches this value. */
#define RIEMANN_BATCH_MAX 8192
/*
 * State of one configured <Node>: its options, the client connection and the
 * batch of events waiting to be sent. The same instance is handed to the
 * write, flush and notification callbacks registered by wrr_config_node().
 */
struct riemann_host {
  c_complain_t init_complaint; /* state for c_complain() / c_release() */
  char *name;                  /* node name; used in "write_riemann/<name>" */
  char *event_service_prefix;  /* "EventServicePrefix"; NULL if unset */
  pthread_mutex_t lock;        /* held around client and batch_msg access */
  _Bool batch_mode;            /* "Batch"; only honoured for non-UDP clients */
  _Bool notifications;         /* "Notifications"; 0 disables wrr_notification */
  _Bool check_thresholds;      /* "CheckThresholds"; sets the event state */
  _Bool store_rates;           /* "StoreRates"; fetch rates via uc_get_rate() */
  _Bool always_append_ds;      /* "AlwaysAppendDS"; add DS name to service */
  char *node;                  /* "Host"; NULL means RIEMANN_HOST */
  int port;                    /* "Port"; 0 means RIEMANN_PORT */
  riemann_client_type_t client_type; /* "Protocol": UDP, TCP or TLS */
  riemann_client_t *client;    /* NULL while disconnected */
  double ttl_factor;           /* "TTLFactor"; scales the value interval */
  cdtime_t batch_init;         /* set at configuration time and on each flush */
  int batch_max;               /* "BatchMaxSize"; compared to the packed size */
  int batch_timeout;           /* "BatchFlushTimeout"; converted via
                                * TIME_T_TO_CDTIME_T before use */
  int reference_count;         /* wrr_free() releases the host at zero */
  riemann_message_t *batch_msg; /* pending batch; NULL when empty */
  char *tls_ca_file;           /* "TLSCAFile", passed to riemann_client_create */
  char *tls_cert_file;         /* "TLSCertFile", passed to riemann_client_create */
  char *tls_key_file;          /* "TLSKeyFile", passed to riemann_client_create */
  struct timeval timeout;      /* "Timeout" fills tv_sec; applied to the client
                                * only when tv_sec is non-zero */
};
/* Tags from the top-level "Tag" options; added to every event and
 * notification that is built. */
static char **riemann_tags;
static size_t riemann_tags_num;
/* Attributes from the top-level "Attribute" options, stored flat as
 * alternating key / value entries. riemann_attrs_num counts array entries,
 * not pairs; readers walk the array two entries at a time. */
static char **riemann_attrs;
static size_t riemann_attrs_num;
80 /* host->lock must be held when calling this function. */
81 static int wrr_connect(struct riemann_host *host) /* {{{ */
82 {
83 char const *node;
84 int port;
86 if (host->client)
87 return 0;
89 node = (host->node != NULL) ? host->node : RIEMANN_HOST;
90 port = (host->port) ? host->port : RIEMANN_PORT;
92 host->client = NULL;
94 host->client = riemann_client_create(host->client_type, node, port,
95 RIEMANN_CLIENT_OPTION_TLS_CA_FILE, host->tls_ca_file,
96 RIEMANN_CLIENT_OPTION_TLS_CERT_FILE, host->tls_cert_file,
97 RIEMANN_CLIENT_OPTION_TLS_KEY_FILE, host->tls_key_file,
98 RIEMANN_CLIENT_OPTION_NONE);
99 if (host->client == NULL) {
100 c_complain (LOG_ERR, &host->init_complaint,
101 "write_riemann plugin: Unable to connect to Riemann at %s:%d",
102 node, port);
103 return -1;
104 }
105 if (host->timeout.tv_sec != 0) {
106 if (riemann_client_set_timeout(host->client, &host->timeout) != 0) {
107 riemann_client_free(host->client);
108 host->client = NULL;
109 c_complain (LOG_ERR, &host->init_complaint,
110 "write_riemann plugin: Unable to connect to Riemann at %s:%d",
111 node, port);
112 return -1;
113 }
114 }
116 c_release (LOG_INFO, &host->init_complaint,
117 "write_riemann plugin: Successfully connected to %s:%d",
118 node, port);
120 return 0;
121 } /* }}} int wrr_connect */
123 /* host->lock must be held when calling this function. */
124 static int wrr_disconnect(struct riemann_host *host) /* {{{ */
125 {
126 if (!host->client)
127 return (0);
129 riemann_client_free(host->client);
130 host->client = NULL;
132 return (0);
133 } /* }}} int wrr_disconnect */
135 /**
136 * Function to send messages to riemann.
137 *
138 * Acquires the host lock, disconnects on errors.
139 */
140 static int wrr_send_nolock(struct riemann_host *host, riemann_message_t *msg) /* {{{ */
141 {
142 int status = 0;
144 status = wrr_connect(host);
145 if (status != 0) {
146 return status;
147 }
149 status = riemann_client_send_message(host->client, msg);
150 if (status != 0) {
151 wrr_disconnect(host);
152 return status;
153 }
155 /*
156 * For TCP we need to receive message acknowledgemenent.
157 */
158 if (host->client_type != RIEMANN_CLIENT_UDP)
159 {
160 riemann_message_t *response;
162 response = riemann_client_recv_message(host->client);
164 if (response == NULL)
165 {
166 wrr_disconnect(host);
167 return errno;
168 }
169 riemann_message_free(response);
170 }
172 return 0;
173 } /* }}} int wrr_send */
175 static int wrr_send(struct riemann_host *host, riemann_message_t *msg)
176 {
177 int status = 0;
179 pthread_mutex_lock (&host->lock);
180 status = wrr_send_nolock(host, msg);
181 pthread_mutex_unlock (&host->lock);
182 return status;
183 }
185 static riemann_message_t *wrr_notification_to_message(struct riemann_host *host, /* {{{ */
186 notification_t const *n)
187 {
188 riemann_message_t *msg;
189 riemann_event_t *event;
190 char service_buffer[6 * DATA_MAX_NAME_LEN];
191 char const *severity;
192 notification_meta_t *meta;
193 size_t i;
195 switch (n->severity)
196 {
197 case NOTIF_OKAY: severity = "ok"; break;
198 case NOTIF_WARNING: severity = "warning"; break;
199 case NOTIF_FAILURE: severity = "critical"; break;
200 default: severity = "unknown";
201 }
203 format_name(service_buffer, sizeof(service_buffer),
204 /* host = */ "", n->plugin, n->plugin_instance,
205 n->type, n->type_instance);
207 event = riemann_event_create(RIEMANN_EVENT_FIELD_HOST, n->host,
208 RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(n->time),
209 RIEMANN_EVENT_FIELD_TAGS, "notification", NULL,
210 RIEMANN_EVENT_FIELD_STATE, severity,
211 RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
212 RIEMANN_EVENT_FIELD_NONE);
214 if (n->host[0] != 0)
215 riemann_event_string_attribute_add(event, "host", n->host);
216 if (n->plugin[0] != 0)
217 riemann_event_string_attribute_add(event, "plugin", n->plugin);
218 if (n->plugin_instance[0] != 0)
219 riemann_event_string_attribute_add(event, "plugin_instance", n->plugin_instance);
221 if (n->type[0] != 0)
222 riemann_event_string_attribute_add(event, "type", n->type);
223 if (n->type_instance[0] != 0)
224 riemann_event_string_attribute_add(event, "type_instance", n->type_instance);
226 for (i = 0; i < riemann_attrs_num; i += 2)
227 riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i+1]);
229 for (i = 0; i < riemann_tags_num; i++)
230 riemann_event_tag_add(event, riemann_tags[i]);
232 if (n->message[0] != 0)
233 riemann_event_string_attribute_add(event, "description", n->message);
235 /* Pull in values from threshold and add extra attributes */
236 for (meta = n->meta; meta != NULL; meta = meta->next)
237 {
238 if (strcasecmp("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE)
239 {
240 riemann_event_set(event,
241 RIEMANN_EVENT_FIELD_METRIC_D,
242 (double) meta->nm_value.nm_double,
243 RIEMANN_EVENT_FIELD_NONE);
244 continue;
245 }
247 if (meta->type == NM_TYPE_STRING) {
248 riemann_event_string_attribute_add(event, meta->name, meta->nm_value.nm_string);
249 continue;
250 }
251 }
253 msg = riemann_message_create_with_events(event, NULL);
254 if (msg == NULL)
255 {
256 ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
257 riemann_event_free (event);
258 return (NULL);
259 }
261 DEBUG("write_riemann plugin: Successfully created message for notification: "
262 "host = \"%s\", service = \"%s\", state = \"%s\"",
263 event->host, event->service, event->state);
264 return (msg);
265 } /* }}} riemann_message_t *wrr_notification_to_message */
267 static riemann_event_t *wrr_value_to_event(struct riemann_host const *host, /* {{{ */
268 data_set_t const *ds,
269 value_list_t const *vl, size_t index,
270 gauge_t const *rates,
271 int status)
272 {
273 riemann_event_t *event;
274 char name_buffer[5 * DATA_MAX_NAME_LEN];
275 char service_buffer[6 * DATA_MAX_NAME_LEN];
276 size_t i;
278 event = riemann_event_new();
279 if (event == NULL)
280 {
281 ERROR("write_riemann plugin: riemann_event_new() failed.");
282 return (NULL);
283 }
285 format_name(name_buffer, sizeof(name_buffer),
286 /* host = */ "", vl->plugin, vl->plugin_instance,
287 vl->type, vl->type_instance);
288 if (host->always_append_ds || (ds->ds_num > 1))
289 {
290 if (host->event_service_prefix == NULL)
291 ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s",
292 &name_buffer[1], ds->ds[index].name);
293 else
294 ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s",
295 host->event_service_prefix, &name_buffer[1], ds->ds[index].name);
296 }
297 else
298 {
299 if (host->event_service_prefix == NULL)
300 sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer));
301 else
302 ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
303 host->event_service_prefix, &name_buffer[1]);
304 }
306 riemann_event_set(event,
307 RIEMANN_EVENT_FIELD_HOST, vl->host,
308 RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(vl->time),
309 RIEMANN_EVENT_FIELD_TTL, (float) CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
310 RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES,
311 "plugin", vl->plugin,
312 "type", vl->type,
313 "ds_name", ds->ds[index].name,
314 NULL,
315 RIEMANN_EVENT_FIELD_SERVICE, service_buffer,
316 RIEMANN_EVENT_FIELD_NONE);
318 if (host->check_thresholds) {
319 const char *state = NULL;
321 switch (status) {
322 case STATE_OKAY:
323 state = "ok";
324 break;
325 case STATE_ERROR:
326 state = "critical";
327 break;
328 case STATE_WARNING:
329 state = "warning";
330 break;
331 case STATE_MISSING:
332 state = "unknown";
333 break;
334 }
335 if (state)
336 riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state,
337 RIEMANN_EVENT_FIELD_NONE);
338 }
340 if (vl->plugin_instance[0] != 0)
341 riemann_event_string_attribute_add(event, "plugin_instance", vl->plugin_instance);
342 if (vl->type_instance[0] != 0)
343 riemann_event_string_attribute_add(event, "type_instance", vl->type_instance);
345 if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL))
346 {
347 char ds_type[DATA_MAX_NAME_LEN];
349 ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
350 DS_TYPE_TO_STRING(ds->ds[index].type));
351 riemann_event_string_attribute_add(event, "ds_type", ds_type);
352 }
353 else
354 {
355 riemann_event_string_attribute_add(event, "ds_type",
356 DS_TYPE_TO_STRING(ds->ds[index].type));
357 }
359 {
360 char ds_index[DATA_MAX_NAME_LEN];
362 ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
363 riemann_event_string_attribute_add(event, "ds_index", ds_index);
364 }
366 for (i = 0; i < riemann_attrs_num; i += 2)
367 riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i +1]);
369 for (i = 0; i < riemann_tags_num; i++)
370 riemann_event_tag_add(event, riemann_tags[i]);
372 if (ds->ds[index].type == DS_TYPE_GAUGE)
373 {
374 riemann_event_set(event,
375 RIEMANN_EVENT_FIELD_METRIC_D,
376 (double) vl->values[index].gauge,
377 RIEMANN_EVENT_FIELD_NONE);
378 }
379 else if (rates != NULL)
380 {
381 riemann_event_set(event,
382 RIEMANN_EVENT_FIELD_METRIC_D,
383 (double) rates[index],
384 RIEMANN_EVENT_FIELD_NONE);
385 }
386 else
387 {
388 int64_t metric;
390 if (ds->ds[index].type == DS_TYPE_DERIVE)
391 metric = (int64_t) vl->values[index].derive;
392 else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
393 metric = (int64_t) vl->values[index].absolute;
394 else
395 metric = (int64_t) vl->values[index].counter;
397 riemann_event_set(event,
398 RIEMANN_EVENT_FIELD_METRIC_S64,
399 (int64_t) metric,
400 RIEMANN_EVENT_FIELD_NONE);
401 }
403 DEBUG("write_riemann plugin: Successfully created message for metric: "
404 "host = \"%s\", service = \"%s\"",
405 event->host, event->service);
406 return (event);
407 } /* }}} riemann_event_t *wrr_value_to_event */
409 static riemann_message_t *wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */
410 data_set_t const *ds,
411 value_list_t const *vl,
412 int *statuses)
413 {
414 riemann_message_t *msg;
415 size_t i;
416 gauge_t *rates = NULL;
418 /* Initialize the Msg structure. */
419 msg = riemann_message_new();
420 if (msg == NULL)
421 {
422 ERROR ("write_riemann plugin: riemann_message_new failed.");
423 return (NULL);
424 }
426 if (host->store_rates)
427 {
428 rates = uc_get_rate(ds, vl);
429 if (rates == NULL)
430 {
431 ERROR("write_riemann plugin: uc_get_rate failed.");
432 riemann_message_free(msg);
433 return (NULL);
434 }
435 }
437 for (i = 0; i < vl->values_len; i++)
438 {
439 riemann_event_t *event;
441 event = wrr_value_to_event(host, ds, vl,
442 (int) i, rates, statuses[i]);
443 if (event == NULL)
444 {
445 riemann_message_free(msg);
446 sfree(rates);
447 return (NULL);
448 }
449 riemann_message_append_events(msg, event, NULL);
450 }
452 sfree(rates);
453 return (msg);
454 } /* }}} riemann_message_t *wrr_value_list_to_message */
456 /*
457 * Always call while holding host->lock !
458 */
459 static int wrr_batch_flush_nolock(cdtime_t timeout,
460 struct riemann_host *host)
461 {
462 cdtime_t now;
463 int status = 0;
465 now = cdtime();
466 if (timeout > 0) {
467 if ((host->batch_init + timeout) > now) {
468 return status;
469 }
470 }
471 wrr_send_nolock(host, host->batch_msg);
472 riemann_message_free(host->batch_msg);
474 host->batch_init = now;
475 host->batch_msg = NULL;
476 return status;
477 }
479 static int wrr_batch_flush(cdtime_t timeout,
480 const char *identifier __attribute__((unused)),
481 user_data_t *user_data)
482 {
483 struct riemann_host *host;
484 int status;
486 if (user_data == NULL)
487 return (-EINVAL);
489 host = user_data->data;
490 pthread_mutex_lock(&host->lock);
491 status = wrr_batch_flush_nolock(timeout, host);
492 if (status != 0)
493 c_complain (LOG_ERR, &host->init_complaint,
494 "write_riemann plugin: riemann_client_send failed with status %i",
495 status);
496 else
497 c_release (LOG_DEBUG, &host->init_complaint, "write_riemann plugin: batch sent.");
499 pthread_mutex_unlock(&host->lock);
500 return status;
501 }
503 static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
504 data_set_t const *ds,
505 value_list_t const *vl,
506 int *statuses)
507 {
508 riemann_message_t *msg;
509 size_t len;
510 int ret;
511 cdtime_t timeout;
513 msg = wrr_value_list_to_message(host, ds, vl, statuses);
514 if (msg == NULL)
515 return -1;
517 pthread_mutex_lock(&host->lock);
519 if (host->batch_msg == NULL) {
520 host->batch_msg = msg;
521 } else {
522 int status;
524 status = riemann_message_append_events_n(host->batch_msg,
525 msg->n_events,
526 msg->events);
527 msg->n_events = 0;
528 msg->events = NULL;
530 riemann_message_free(msg);
532 if (status != 0) {
533 pthread_mutex_unlock(&host->lock);
534 ERROR("write_riemann plugin: out of memory");
535 return -1;
536 }
537 }
539 len = riemann_message_get_packed_size(host->batch_msg);
540 ret = 0;
541 if ((host->batch_max < 0) || (((size_t) host->batch_max) <= len)) {
542 ret = wrr_batch_flush_nolock(0, host);
543 } else {
544 if (host->batch_timeout > 0) {
545 timeout = TIME_T_TO_CDTIME_T((time_t)host->batch_timeout);
546 ret = wrr_batch_flush_nolock(timeout, host);
547 }
548 }
550 pthread_mutex_unlock(&host->lock);
551 return ret;
552 } /* }}} riemann_message_t *wrr_batch_add_value_list */
554 static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */
555 {
556 int status;
557 struct riemann_host *host = ud->data;
558 riemann_message_t *msg;
560 if (!host->notifications)
561 return 0;
563 /*
564 * Never batch for notifications, send them ASAP
565 */
566 msg = wrr_notification_to_message(host, n);
567 if (msg == NULL)
568 return (-1);
570 status = wrr_send(host, msg);
571 if (status != 0)
572 c_complain (LOG_ERR, &host->init_complaint,
573 "write_riemann plugin: riemann_client_send failed with status %i",
574 status);
575 else
576 c_release (LOG_DEBUG, &host->init_complaint,
577 "write_riemann plugin: riemann_client_send succeeded");
579 riemann_message_free(msg);
580 return (status);
581 } /* }}} int wrr_notification */
583 static int wrr_write(const data_set_t *ds, /* {{{ */
584 const value_list_t *vl,
585 user_data_t *ud)
586 {
587 int status = 0;
588 int statuses[vl->values_len];
589 struct riemann_host *host = ud->data;
590 riemann_message_t *msg;
592 if (host->check_thresholds) {
593 status = write_riemann_threshold_check(ds, vl, statuses);
594 if (status != 0)
595 return status;
596 } else {
597 memset (statuses, 0, sizeof (statuses));
598 }
600 if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
601 wrr_batch_add_value_list(host, ds, vl, statuses);
602 } else {
603 msg = wrr_value_list_to_message(host, ds, vl, statuses);
604 if (msg == NULL)
605 return (-1);
607 status = wrr_send(host, msg);
609 riemann_message_free(msg);
610 }
611 return status;
612 } /* }}} int wrr_write */
614 static void wrr_free(void *p) /* {{{ */
615 {
616 struct riemann_host *host = p;
618 if (host == NULL)
619 return;
621 pthread_mutex_lock(&host->lock);
623 host->reference_count--;
624 if (host->reference_count > 0)
625 {
626 pthread_mutex_unlock(&host->lock);
627 return;
628 }
630 wrr_disconnect(host);
632 pthread_mutex_destroy(&host->lock);
633 sfree(host);
634 } /* }}} void wrr_free */
636 static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
637 {
638 struct riemann_host *host = NULL;
639 int status = 0;
640 int i;
641 oconfig_item_t *child;
642 char callback_name[DATA_MAX_NAME_LEN];
643 user_data_t ud;
645 if ((host = calloc(1, sizeof(*host))) == NULL) {
646 ERROR ("write_riemann plugin: calloc failed.");
647 return ENOMEM;
648 }
649 pthread_mutex_init(&host->lock, NULL);
650 C_COMPLAIN_INIT (&host->init_complaint);
651 host->reference_count = 1;
652 host->node = NULL;
653 host->port = 0;
654 host->notifications = 1;
655 host->check_thresholds = 0;
656 host->store_rates = 1;
657 host->always_append_ds = 0;
658 host->batch_mode = 1;
659 host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
660 host->batch_init = cdtime();
661 host->batch_timeout = 0;
662 host->ttl_factor = RIEMANN_TTL_FACTOR;
663 host->client = NULL;
664 host->client_type = RIEMANN_CLIENT_TCP;
665 host->timeout.tv_sec = 0;
666 host->timeout.tv_usec = 0;
668 status = cf_util_get_string(ci, &host->name);
669 if (status != 0) {
670 WARNING("write_riemann plugin: Required host name is missing.");
671 wrr_free(host);
672 return -1;
673 }
675 for (i = 0; i < ci->children_num; i++) {
676 /*
677 * The code here could be simplified but makes room
678 * for easy adding of new options later on.
679 */
680 child = &ci->children[i];
681 status = 0;
683 if (strcasecmp("Host", child->key) == 0) {
684 status = cf_util_get_string(child, &host->node);
685 if (status != 0)
686 break;
687 } else if (strcasecmp("Notifications", child->key) == 0) {
688 status = cf_util_get_boolean(child, &host->notifications);
689 if (status != 0)
690 break;
691 } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
692 status = cf_util_get_string(child, &host->event_service_prefix);
693 if (status != 0)
694 break;
695 } else if (strcasecmp("CheckThresholds", child->key) == 0) {
696 status = cf_util_get_boolean(child, &host->check_thresholds);
697 if (status != 0)
698 break;
699 } else if (strcasecmp("Batch", child->key) == 0) {
700 status = cf_util_get_boolean(child, &host->batch_mode);
701 if (status != 0)
702 break;
703 } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
704 status = cf_util_get_int(child, &host->batch_max);
705 if (status != 0)
706 break;
707 } else if (strcasecmp("BatchFlushTimeout", child->key) == 0) {
708 status = cf_util_get_int(child, &host->batch_timeout);
709 if (status != 0)
710 break;
711 } else if (strcasecmp("Timeout", child->key) == 0) {
712 status = cf_util_get_int(child, (int *)&host->timeout.tv_sec);
713 if (status != 0)
714 break;
715 } else if (strcasecmp("Port", child->key) == 0) {
716 host->port = cf_util_get_port_number(child);
717 if (host->port == -1) {
718 ERROR("write_riemann plugin: Invalid argument "
719 "configured for the \"Port\" "
720 "option.");
721 break;
722 }
723 } else if (strcasecmp("Protocol", child->key) == 0) {
724 char tmp[16];
725 status = cf_util_get_string_buffer(child,
726 tmp, sizeof(tmp));
727 if (status != 0)
728 {
729 ERROR("write_riemann plugin: cf_util_get_"
730 "string_buffer failed with "
731 "status %i.", status);
732 break;
733 }
735 if (strcasecmp("UDP", tmp) == 0)
736 host->client_type = RIEMANN_CLIENT_UDP;
737 else if (strcasecmp("TCP", tmp) == 0)
738 host->client_type = RIEMANN_CLIENT_TCP;
739 else if (strcasecmp("TLS", tmp) == 0)
740 host->client_type = RIEMANN_CLIENT_TLS;
741 else
742 WARNING("write_riemann plugin: The value "
743 "\"%s\" is not valid for the "
744 "\"Protocol\" option. Use "
745 "either \"UDP\", \"TCP\" or \"TLS\".",
746 tmp);
747 } else if (strcasecmp("TLSCAFile", child->key) == 0) {
748 status = cf_util_get_string(child, &host->tls_ca_file);
749 if (status != 0)
750 {
751 ERROR("write_riemann plugin: cf_util_get_"
752 "string_buffer failed with "
753 "status %i.", status);
754 break;
755 }
756 } else if (strcasecmp("TLSCertFile", child->key) == 0) {
757 status = cf_util_get_string(child, &host->tls_cert_file);
758 if (status != 0)
759 {
760 ERROR("write_riemann plugin: cf_util_get_"
761 "string_buffer failed with "
762 "status %i.", status);
763 break;
764 }
765 } else if (strcasecmp("TLSKeyFile", child->key) == 0) {
766 status = cf_util_get_string(child, &host->tls_key_file);
767 if (status != 0)
768 {
769 ERROR("write_riemann plugin: cf_util_get_"
770 "string_buffer failed with "
771 "status %i.", status);
772 break;
773 }
774 } else if (strcasecmp("StoreRates", child->key) == 0) {
775 status = cf_util_get_boolean(child, &host->store_rates);
776 if (status != 0)
777 break;
778 } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
779 status = cf_util_get_boolean(child,
780 &host->always_append_ds);
781 if (status != 0)
782 break;
783 } else if (strcasecmp("TTLFactor", child->key) == 0) {
784 double tmp = NAN;
785 status = cf_util_get_double(child, &tmp);
786 if (status != 0)
787 break;
788 if (tmp >= 2.0) {
789 host->ttl_factor = tmp;
790 } else if (tmp >= 1.0) {
791 NOTICE("write_riemann plugin: The configured "
792 "TTLFactor is very small "
793 "(%.1f). A value of 2.0 or "
794 "greater is recommended.",
795 tmp);
796 host->ttl_factor = tmp;
797 } else if (tmp > 0.0) {
798 WARNING("write_riemann plugin: The configured "
799 "TTLFactor is too small to be "
800 "useful (%.1f). I'll use it "
801 "since the user knows best, "
802 "but under protest.",
803 tmp);
804 host->ttl_factor = tmp;
805 } else { /* zero, negative and NAN */
806 ERROR("write_riemann plugin: The configured "
807 "TTLFactor is invalid (%.1f).",
808 tmp);
809 }
810 } else {
811 WARNING("write_riemann plugin: ignoring unknown config "
812 "option: \"%s\"", child->key);
813 }
814 }
815 if (status != 0) {
816 wrr_free(host);
817 return status;
818 }
820 ssnprintf(callback_name, sizeof(callback_name), "write_riemann/%s",
821 host->name);
822 ud.data = host;
823 ud.free_func = wrr_free;
825 pthread_mutex_lock(&host->lock);
827 status = plugin_register_write(callback_name, wrr_write, &ud);
829 if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
830 ud.free_func = NULL;
831 plugin_register_flush(callback_name, wrr_batch_flush, &ud);
832 }
833 if (status != 0)
834 WARNING("write_riemann plugin: plugin_register_write (\"%s\") "
835 "failed with status %i.",
836 callback_name, status);
837 else /* success */
838 host->reference_count++;
840 status = plugin_register_notification(callback_name,
841 wrr_notification, &ud);
842 if (status != 0)
843 WARNING("write_riemann plugin: plugin_register_notification (\"%s\") "
844 "failed with status %i.",
845 callback_name, status);
846 else /* success */
847 host->reference_count++;
849 if (host->reference_count <= 1)
850 {
851 /* Both callbacks failed => free memory.
852 * We need to unlock here, because riemann_free() will lock.
853 * This is not a race condition, because we're the only one
854 * holding a reference. */
855 pthread_mutex_unlock(&host->lock);
856 wrr_free(host);
857 return (-1);
858 }
860 host->reference_count--;
861 pthread_mutex_unlock(&host->lock);
863 return status;
864 } /* }}} int wrr_config_node */
866 static int wrr_config(oconfig_item_t *ci) /* {{{ */
867 {
868 int i;
869 oconfig_item_t *child;
870 int status;
872 for (i = 0; i < ci->children_num; i++) {
873 child = &ci->children[i];
875 if (strcasecmp("Node", child->key) == 0) {
876 wrr_config_node (child);
877 } else if (strcasecmp(child->key, "attribute") == 0) {
878 char *key = NULL;
879 char *val = NULL;
881 if (child->values_num != 2) {
882 WARNING("riemann attributes need both a key and a value.");
883 return (-1);
884 }
885 if (child->values[0].type != OCONFIG_TYPE_STRING ||
886 child->values[1].type != OCONFIG_TYPE_STRING) {
887 WARNING("riemann attribute needs string arguments.");
888 return (-1);
889 }
890 if ((key = strdup(child->values[0].value.string)) == NULL) {
891 WARNING("cannot allocate memory for attribute key.");
892 return (-1);
893 }
894 if ((val = strdup(child->values[1].value.string)) == NULL) {
895 WARNING("cannot allocate memory for attribute value.");
896 sfree(key);
897 return (-1);
898 }
899 strarray_add(&riemann_attrs, &riemann_attrs_num, key);
900 strarray_add(&riemann_attrs, &riemann_attrs_num, val);
901 DEBUG("write_riemann: got attr: %s => %s", key, val);
902 sfree(key);
903 sfree(val);
904 } else if (strcasecmp(child->key, "tag") == 0) {
905 char *tmp = NULL;
906 status = cf_util_get_string(child, &tmp);
907 if (status != 0)
908 continue;
910 strarray_add(&riemann_tags, &riemann_tags_num, tmp);
911 DEBUG("write_riemann plugin: Got tag: %s", tmp);
912 sfree(tmp);
913 } else {
914 WARNING("write_riemann plugin: Ignoring unknown "
915 "configuration option \"%s\" at top level.",
916 child->key);
917 }
918 }
919 return (0);
920 } /* }}} int wrr_config */
922 void module_register(void)
923 {
924 plugin_register_complex_config("write_riemann", wrr_config);
925 }
927 /* vim: set sw=8 sts=8 ts=8 noet : */