1 /**
2 * collectd - src/write_riemann.c
3 * Copyright (C) 2012,2013 Pierre-Yves Ritschard
4 * Copyright (C) 2013 Florian octo Forster
5 * Copyright (C) 2015,2016 Gergely Nagy
6 *
7 * Permission is hereby granted, free of charge, to any person obtaining a
8 * copy of this software and associated documentation files (the "Software"),
9 * to deal in the Software without restriction, including without limitation
10 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11 * and/or sell copies of the Software, and to permit persons to whom the
12 * Software is furnished to do so, subject to the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be included in
15 * all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23 * DEALINGS IN THE SOFTWARE.
24 *
25 * Authors:
26 * Pierre-Yves Ritschard <pyr at spootnik.org>
27 * Florian octo Forster <octo at collectd.org>
28 * Gergely Nagy <algernon at madhouse-project.org>
29 */
31 #include <riemann/riemann-client.h>
32 #include <errno.h>
33 #include <pthread.h>
35 #include "collectd.h"
36 #include "plugin.h"
37 #include "common.h"
38 #include "configfile.h"
39 #include "utils_cache.h"
40 #include "utils_complain.h"
41 #include "write_riemann_threshold.h"
/* Built-in defaults, used when the configuration does not override them. */
#define RIEMANN_HOST		"localhost"	/* node used when host->node is NULL */
#define RIEMANN_PORT		5555		/* port used when host->port is 0 */
#define RIEMANN_TTL_FACTOR	2.0		/* initial value of host->ttl_factor */
#define RIEMANN_BATCH_MAX	8192		/* initial value of host->batch_max */
48 struct riemann_host {
49 c_complain_t init_complaint;
50 char *name;
51 char *event_service_prefix;
52 pthread_mutex_t lock;
53 _Bool batch_mode;
54 _Bool notifications;
55 _Bool check_thresholds;
56 _Bool store_rates;
57 _Bool always_append_ds;
58 char *node;
59 int port;
60 riemann_client_type_t client_type;
61 riemann_client_t *client;
62 double ttl_factor;
63 cdtime_t batch_init;
64 int batch_max;
65 int reference_count;
66 riemann_message_t *batch_msg;
67 char *tls_ca_file;
68 char *tls_cert_file;
69 char *tls_key_file;
70 struct timeval timeout;
71 };
/* Tags added to every event that is built. */
static char **riemann_tags = NULL;
static size_t riemann_tags_num = 0;

/*
 * String attributes added to every event, stored flat as consecutive
 * key/value entries: [key0, value0, key1, value1, ...].
 */
static char **riemann_attrs = NULL;
static size_t riemann_attrs_num = 0;
78 /* host->lock must be held when calling this function. */
79 static int wrr_connect(struct riemann_host *host) /* {{{ */
80 {
81 char const *node;
82 int port;
84 if (host->client)
85 return 0;
87 node = (host->node != NULL) ? host->node : RIEMANN_HOST;
88 port = (host->port) ? host->port : RIEMANN_PORT;
90 host->client = NULL;
92 host->client = riemann_client_create(host->client_type, node, port,
93 RIEMANN_CLIENT_OPTION_TLS_CA_FILE, host->tls_ca_file,
94 RIEMANN_CLIENT_OPTION_TLS_CERT_FILE, host->tls_cert_file,
95 RIEMANN_CLIENT_OPTION_TLS_KEY_FILE, host->tls_key_file,
96 RIEMANN_CLIENT_OPTION_NONE);
97 if (host->client == NULL) {
98 c_complain (LOG_ERR, &host->init_complaint,
99 "write_riemann plugin: Unable to connect to Riemann at %s:%d",
100 node, port);
101 return -1;
102 }
103 if (host->timeout.tv_sec != 0) {
104 if (riemann_client_set_timeout(host->client, &host->timeout) != 0) {
105 riemann_client_free(host->client);
106 host->client = NULL;
107 c_complain (LOG_ERR, &host->init_complaint,
108 "write_riemann plugin: Unable to connect to Riemann at %s:%d",
109 node, port);
110 return -1;
111 }
112 }
114 c_release (LOG_INFO, &host->init_complaint,
115 "write_riemann plugin: Successfully connected to %s:%d",
116 node, port);
118 return 0;
119 } /* }}} int wrr_connect */
121 /* host->lock must be held when calling this function. */
122 static int wrr_disconnect(struct riemann_host *host) /* {{{ */
123 {
124 if (!host->client)
125 return (0);
127 riemann_client_free(host->client);
128 host->client = NULL;
130 return (0);
131 } /* }}} int wrr_disconnect */
133 /**
134 * Function to send messages to riemann.
135 *
136 * Acquires the host lock, disconnects on errors.
137 */
138 static int wrr_send(struct riemann_host *host, riemann_message_t *msg) /* {{{ */
139 {
140 int status = 0;
141 pthread_mutex_lock (&host->lock);
143 status = wrr_connect(host);
144 if (status != 0) {
145 pthread_mutex_unlock(&host->lock);
146 return status;
147 }
149 status = riemann_client_send_message(host->client, msg);
150 if (status != 0) {
151 wrr_disconnect(host);
152 pthread_mutex_unlock(&host->lock);
153 return status;
154 }
156 /*
157 * For TCP we need to receive message acknowledgemenent.
158 */
159 if (host->client_type != RIEMANN_CLIENT_UDP)
160 {
161 riemann_message_t *response;
163 response = riemann_client_recv_message(host->client);
165 if (response == NULL)
166 {
167 wrr_disconnect(host);
168 pthread_mutex_unlock(&host->lock);
169 return errno;
170 }
171 riemann_message_free(response);
172 }
174 pthread_mutex_unlock (&host->lock);
175 return 0;
176 } /* }}} int wrr_send */
178 static riemann_message_t *wrr_notification_to_message(struct riemann_host *host, /* {{{ */
179 notification_t const *n)
180 {
181 riemann_message_t *msg;
182 riemann_event_t *event;
183 char service_buffer[6 * DATA_MAX_NAME_LEN];
184 char const *severity;
185 notification_meta_t *meta;
186 size_t i;
188 switch (n->severity)
189 {
190 case NOTIF_OKAY: severity = "ok"; break;
191 case NOTIF_WARNING: severity = "warning"; break;
192 case NOTIF_FAILURE: severity = "critical"; break;
193 default: severity = "unknown";
194 }
196 format_name(service_buffer, sizeof(service_buffer),
197 /* host = */ "", n->plugin, n->plugin_instance,
198 n->type, n->type_instance);
200 event = riemann_event_create(RIEMANN_EVENT_FIELD_HOST, n->host,
201 RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(n->time),
202 RIEMANN_EVENT_FIELD_TAGS, "notification", NULL,
203 RIEMANN_EVENT_FIELD_STATE, severity,
204 RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
205 RIEMANN_EVENT_FIELD_NONE);
207 if (n->host[0] != 0)
208 riemann_event_string_attribute_add(event, "host", n->host);
209 if (n->plugin[0] != 0)
210 riemann_event_string_attribute_add(event, "plugin", n->plugin);
211 if (n->plugin_instance[0] != 0)
212 riemann_event_string_attribute_add(event, "plugin_instance", n->plugin_instance);
214 if (n->type[0] != 0)
215 riemann_event_string_attribute_add(event, "type", n->type);
216 if (n->type_instance[0] != 0)
217 riemann_event_string_attribute_add(event, "type_instance", n->type_instance);
219 for (i = 0; i < riemann_attrs_num; i += 2)
220 riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i+1]);
222 for (i = 0; i < riemann_tags_num; i++)
223 riemann_event_tag_add(event, riemann_tags[i]);
225 if (n->message[0] != 0)
226 riemann_event_string_attribute_add(event, "description", n->message);
228 /* Pull in values from threshold and add extra attributes */
229 for (meta = n->meta; meta != NULL; meta = meta->next)
230 {
231 if (strcasecmp("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE)
232 {
233 riemann_event_set(event,
234 RIEMANN_EVENT_FIELD_METRIC_D,
235 (double) meta->nm_value.nm_double,
236 RIEMANN_EVENT_FIELD_NONE);
237 continue;
238 }
240 if (meta->type == NM_TYPE_STRING) {
241 riemann_event_string_attribute_add(event, meta->name, meta->nm_value.nm_string);
242 continue;
243 }
244 }
246 msg = riemann_message_create_with_events(event, NULL);
247 if (msg == NULL)
248 {
249 ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
250 riemann_event_free (event);
251 return (NULL);
252 }
254 DEBUG("write_riemann plugin: Successfully created message for notification: "
255 "host = \"%s\", service = \"%s\", state = \"%s\"",
256 event->host, event->service, event->state);
257 return (msg);
258 } /* }}} riemann_message_t *wrr_notification_to_message */
260 static riemann_event_t *wrr_value_to_event(struct riemann_host const *host, /* {{{ */
261 data_set_t const *ds,
262 value_list_t const *vl, size_t index,
263 gauge_t const *rates,
264 int status)
265 {
266 riemann_event_t *event;
267 char name_buffer[5 * DATA_MAX_NAME_LEN];
268 char service_buffer[6 * DATA_MAX_NAME_LEN];
269 size_t i;
271 event = riemann_event_new();
272 if (event == NULL)
273 {
274 ERROR("write_riemann plugin: riemann_event_new() failed.");
275 return (NULL);
276 }
278 format_name(name_buffer, sizeof(name_buffer),
279 /* host = */ "", vl->plugin, vl->plugin_instance,
280 vl->type, vl->type_instance);
281 if (host->always_append_ds || (ds->ds_num > 1))
282 {
283 if (host->event_service_prefix == NULL)
284 ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s",
285 &name_buffer[1], ds->ds[index].name);
286 else
287 ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s",
288 host->event_service_prefix, &name_buffer[1], ds->ds[index].name);
289 }
290 else
291 {
292 if (host->event_service_prefix == NULL)
293 sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer));
294 else
295 ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
296 host->event_service_prefix, &name_buffer[1]);
297 }
299 riemann_event_set(event,
300 RIEMANN_EVENT_FIELD_HOST, vl->host,
301 RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(vl->time),
302 RIEMANN_EVENT_FIELD_TTL, (float) CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
303 RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES,
304 "plugin", vl->plugin,
305 "type", vl->type,
306 "ds_name", ds->ds[index].name,
307 NULL,
308 RIEMANN_EVENT_FIELD_SERVICE, service_buffer,
309 RIEMANN_EVENT_FIELD_NONE);
311 if (host->check_thresholds) {
312 const char *state = NULL;
314 switch (status) {
315 case STATE_OKAY:
316 state = "ok";
317 break;
318 case STATE_ERROR:
319 state = "critical";
320 break;
321 case STATE_WARNING:
322 state = "warning";
323 break;
324 case STATE_MISSING:
325 state = "unknown";
326 break;
327 }
328 if (state)
329 riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state,
330 RIEMANN_EVENT_FIELD_NONE);
331 }
333 if (vl->plugin_instance[0] != 0)
334 riemann_event_string_attribute_add(event, "plugin_instance", vl->plugin_instance);
335 if (vl->type_instance[0] != 0)
336 riemann_event_string_attribute_add(event, "type_instance", vl->type_instance);
338 if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL))
339 {
340 char ds_type[DATA_MAX_NAME_LEN];
342 ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
343 DS_TYPE_TO_STRING(ds->ds[index].type));
344 riemann_event_string_attribute_add(event, "ds_type", ds_type);
345 }
346 else
347 {
348 riemann_event_string_attribute_add(event, "ds_type",
349 DS_TYPE_TO_STRING(ds->ds[index].type));
350 }
352 {
353 char ds_index[DATA_MAX_NAME_LEN];
355 ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
356 riemann_event_string_attribute_add(event, "ds_index", ds_index);
357 }
359 for (i = 0; i < riemann_attrs_num; i += 2)
360 riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i +1]);
362 for (i = 0; i < riemann_tags_num; i++)
363 riemann_event_tag_add(event, riemann_tags[i]);
365 if (ds->ds[index].type == DS_TYPE_GAUGE)
366 {
367 riemann_event_set(event,
368 RIEMANN_EVENT_FIELD_METRIC_D,
369 (double) vl->values[index].gauge,
370 RIEMANN_EVENT_FIELD_NONE);
371 }
372 else if (rates != NULL)
373 {
374 riemann_event_set(event,
375 RIEMANN_EVENT_FIELD_METRIC_D,
376 (double) rates[index],
377 RIEMANN_EVENT_FIELD_NONE);
378 }
379 else
380 {
381 int64_t metric;
383 if (ds->ds[index].type == DS_TYPE_DERIVE)
384 metric = (int64_t) vl->values[index].derive;
385 else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
386 metric = (int64_t) vl->values[index].absolute;
387 else
388 metric = (int64_t) vl->values[index].counter;
390 riemann_event_set(event,
391 RIEMANN_EVENT_FIELD_METRIC_S64,
392 (int64_t) metric,
393 RIEMANN_EVENT_FIELD_NONE);
394 }
396 DEBUG("write_riemann plugin: Successfully created message for metric: "
397 "host = \"%s\", service = \"%s\"",
398 event->host, event->service);
399 return (event);
400 } /* }}} riemann_event_t *wrr_value_to_event */
402 static riemann_message_t *wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */
403 data_set_t const *ds,
404 value_list_t const *vl,
405 int *statuses)
406 {
407 riemann_message_t *msg;
408 size_t i;
409 gauge_t *rates = NULL;
411 /* Initialize the Msg structure. */
412 msg = riemann_message_new();
413 if (msg == NULL)
414 {
415 ERROR ("write_riemann plugin: riemann_message_new failed.");
416 return (NULL);
417 }
419 if (host->store_rates)
420 {
421 rates = uc_get_rate(ds, vl);
422 if (rates == NULL)
423 {
424 ERROR("write_riemann plugin: uc_get_rate failed.");
425 riemann_message_free(msg);
426 return (NULL);
427 }
428 }
430 for (i = 0; i < vl->values_len; i++)
431 {
432 riemann_event_t *event;
434 event = wrr_value_to_event(host, ds, vl,
435 (int) i, rates, statuses[i]);
436 if (event == NULL)
437 {
438 riemann_message_free(msg);
439 sfree(rates);
440 return (NULL);
441 }
442 riemann_message_append_events(msg, event, NULL);
443 }
445 sfree(rates);
446 return (msg);
447 } /* }}} riemann_message_t *wrr_value_list_to_message */
449 /*
450 * Always call while holding host->lock !
451 */
452 static int wrr_batch_flush_nolock(cdtime_t timeout,
453 struct riemann_host *host)
454 {
455 cdtime_t now;
456 int status = 0;
458 if (timeout > 0) {
459 now = cdtime();
460 if ((host->batch_init + timeout) > now)
461 return status;
462 }
463 wrr_send(host, host->batch_msg);
464 riemann_message_free(host->batch_msg);
466 if (host->client_type != RIEMANN_CLIENT_UDP)
467 {
468 riemann_message_t *response;
470 response = riemann_client_recv_message(host->client);
472 if (!response)
473 {
474 wrr_disconnect(host);
475 return errno;
476 }
478 riemann_message_free(response);
479 }
481 host->batch_init = cdtime();
482 host->batch_msg = NULL;
483 return status;
484 }
486 static int wrr_batch_flush(cdtime_t timeout,
487 const char *identifier __attribute__((unused)),
488 user_data_t *user_data)
489 {
490 struct riemann_host *host;
491 int status;
493 if (user_data == NULL)
494 return (-EINVAL);
496 host = user_data->data;
497 pthread_mutex_lock(&host->lock);
498 status = wrr_batch_flush_nolock(timeout, host);
499 if (status != 0)
500 c_complain (LOG_ERR, &host->init_complaint,
501 "write_riemann plugin: riemann_client_send failed with status %i",
502 status);
503 else
504 c_release (LOG_DEBUG, &host->init_complaint, "write_riemann plugin: batch sent.");
506 pthread_mutex_unlock(&host->lock);
507 return status;
508 }
510 static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
511 data_set_t const *ds,
512 value_list_t const *vl,
513 int *statuses)
514 {
515 riemann_message_t *msg;
516 size_t len;
517 int ret;
519 msg = wrr_value_list_to_message(host, ds, vl, statuses);
520 if (msg == NULL)
521 return -1;
523 pthread_mutex_lock(&host->lock);
525 if (host->batch_msg == NULL) {
526 host->batch_msg = msg;
527 } else {
528 int status;
530 status = riemann_message_append_events_n(host->batch_msg,
531 msg->n_events,
532 msg->events);
533 msg->n_events = 0;
534 msg->events = NULL;
536 riemann_message_free(msg);
538 if (status != 0) {
539 pthread_mutex_unlock(&host->lock);
540 ERROR("write_riemann plugin: out of memory");
541 return -1;
542 }
543 }
545 len = riemann_message_get_packed_size(host->batch_msg);
546 ret = 0;
547 if ((host->batch_max < 0) || (((size_t) host->batch_max) <= len)) {
548 ret = wrr_batch_flush_nolock(0, host);
549 }
551 pthread_mutex_unlock(&host->lock);
552 return ret;
553 } /* }}} riemann_message_t *wrr_batch_add_value_list */
555 static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */
556 {
557 int status;
558 struct riemann_host *host = ud->data;
559 riemann_message_t *msg;
561 if (!host->notifications)
562 return 0;
564 /*
565 * Never batch for notifications, send them ASAP
566 */
567 msg = wrr_notification_to_message(host, n);
568 if (msg == NULL)
569 return (-1);
571 status = wrr_send(host, msg);
572 if (status != 0)
573 c_complain (LOG_ERR, &host->init_complaint,
574 "write_riemann plugin: riemann_client_send failed with status %i",
575 status);
576 else
577 c_release (LOG_DEBUG, &host->init_complaint,
578 "write_riemann plugin: riemann_client_send succeeded");
580 riemann_message_free(msg);
581 return (status);
582 } /* }}} int wrr_notification */
584 static int wrr_write(const data_set_t *ds, /* {{{ */
585 const value_list_t *vl,
586 user_data_t *ud)
587 {
588 int status = 0;
589 int statuses[vl->values_len];
590 struct riemann_host *host = ud->data;
591 riemann_message_t *msg;
593 if (host->check_thresholds) {
594 status = write_riemann_threshold_check(ds, vl, statuses);
595 if (status != 0)
596 return status;
597 } else {
598 memset (statuses, 0, sizeof (statuses));
599 }
601 if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
602 wrr_batch_add_value_list(host, ds, vl, statuses);
603 } else {
604 msg = wrr_value_list_to_message(host, ds, vl, statuses);
605 if (msg == NULL)
606 return (-1);
608 status = wrr_send(host, msg);
610 riemann_message_free(msg);
611 }
612 return status;
613 } /* }}} int wrr_write */
615 static void wrr_free(void *p) /* {{{ */
616 {
617 struct riemann_host *host = p;
619 if (host == NULL)
620 return;
622 pthread_mutex_lock(&host->lock);
624 host->reference_count--;
625 if (host->reference_count > 0)
626 {
627 pthread_mutex_unlock(&host->lock);
628 return;
629 }
631 wrr_disconnect(host);
633 pthread_mutex_destroy(&host->lock);
634 sfree(host);
635 } /* }}} void wrr_free */
637 static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
638 {
639 struct riemann_host *host = NULL;
640 int status = 0;
641 int i;
642 oconfig_item_t *child;
643 char callback_name[DATA_MAX_NAME_LEN];
644 user_data_t ud;
646 if ((host = calloc(1, sizeof(*host))) == NULL) {
647 ERROR ("write_riemann plugin: calloc failed.");
648 return ENOMEM;
649 }
650 pthread_mutex_init(&host->lock, NULL);
651 C_COMPLAIN_INIT (&host->init_complaint);
652 host->reference_count = 1;
653 host->node = NULL;
654 host->port = 0;
655 host->notifications = 1;
656 host->check_thresholds = 0;
657 host->store_rates = 1;
658 host->always_append_ds = 0;
659 host->batch_mode = 1;
660 host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
661 host->batch_init = cdtime();
662 host->ttl_factor = RIEMANN_TTL_FACTOR;
663 host->client = NULL;
664 host->client_type = RIEMANN_CLIENT_TCP;
665 host->timeout.tv_sec = 0;
666 host->timeout.tv_usec = 0;
668 status = cf_util_get_string(ci, &host->name);
669 if (status != 0) {
670 WARNING("write_riemann plugin: Required host name is missing.");
671 wrr_free(host);
672 return -1;
673 }
675 for (i = 0; i < ci->children_num; i++) {
676 /*
677 * The code here could be simplified but makes room
678 * for easy adding of new options later on.
679 */
680 child = &ci->children[i];
681 status = 0;
683 if (strcasecmp("Host", child->key) == 0) {
684 status = cf_util_get_string(child, &host->node);
685 if (status != 0)
686 break;
687 } else if (strcasecmp("Notifications", child->key) == 0) {
688 status = cf_util_get_boolean(child, &host->notifications);
689 if (status != 0)
690 break;
691 } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
692 status = cf_util_get_string(child, &host->event_service_prefix);
693 if (status != 0)
694 break;
695 } else if (strcasecmp("CheckThresholds", child->key) == 0) {
696 status = cf_util_get_boolean(child, &host->check_thresholds);
697 if (status != 0)
698 break;
699 } else if (strcasecmp("Batch", child->key) == 0) {
700 status = cf_util_get_boolean(child, &host->batch_mode);
701 if (status != 0)
702 break;
703 } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
704 status = cf_util_get_int(child, &host->batch_max);
705 if (status != 0)
706 break;
707 } else if (strcasecmp("Timeout", child->key) == 0) {
708 status = cf_util_get_int(child, (int *)&host->timeout.tv_sec);
709 if (status != 0)
710 break;
711 } else if (strcasecmp("Port", child->key) == 0) {
712 host->port = cf_util_get_port_number(child);
713 if (host->port == -1) {
714 ERROR("write_riemann plugin: Invalid argument "
715 "configured for the \"Port\" "
716 "option.");
717 break;
718 }
719 } else if (strcasecmp("Protocol", child->key) == 0) {
720 char tmp[16];
721 status = cf_util_get_string_buffer(child,
722 tmp, sizeof(tmp));
723 if (status != 0)
724 {
725 ERROR("write_riemann plugin: cf_util_get_"
726 "string_buffer failed with "
727 "status %i.", status);
728 break;
729 }
731 if (strcasecmp("UDP", tmp) == 0)
732 host->client_type = RIEMANN_CLIENT_UDP;
733 else if (strcasecmp("TCP", tmp) == 0)
734 host->client_type = RIEMANN_CLIENT_TCP;
735 else if (strcasecmp("TLS", tmp) == 0)
736 host->client_type = RIEMANN_CLIENT_TLS;
737 else
738 WARNING("write_riemann plugin: The value "
739 "\"%s\" is not valid for the "
740 "\"Protocol\" option. Use "
741 "either \"UDP\", \"TCP\" or \"TLS\".",
742 tmp);
743 } else if (strcasecmp("TLSCAFile", child->key) == 0) {
744 status = cf_util_get_string(child, &host->tls_ca_file);
745 if (status != 0)
746 {
747 ERROR("write_riemann plugin: cf_util_get_"
748 "string_buffer failed with "
749 "status %i.", status);
750 break;
751 }
752 } else if (strcasecmp("TLSCertFile", child->key) == 0) {
753 status = cf_util_get_string(child, &host->tls_cert_file);
754 if (status != 0)
755 {
756 ERROR("write_riemann plugin: cf_util_get_"
757 "string_buffer failed with "
758 "status %i.", status);
759 break;
760 }
761 } else if (strcasecmp("TLSKeyFile", child->key) == 0) {
762 status = cf_util_get_string(child, &host->tls_key_file);
763 if (status != 0)
764 {
765 ERROR("write_riemann plugin: cf_util_get_"
766 "string_buffer failed with "
767 "status %i.", status);
768 break;
769 }
770 } else if (strcasecmp("StoreRates", child->key) == 0) {
771 status = cf_util_get_boolean(child, &host->store_rates);
772 if (status != 0)
773 break;
774 } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
775 status = cf_util_get_boolean(child,
776 &host->always_append_ds);
777 if (status != 0)
778 break;
779 } else if (strcasecmp("TTLFactor", child->key) == 0) {
780 double tmp = NAN;
781 status = cf_util_get_double(child, &tmp);
782 if (status != 0)
783 break;
784 if (tmp >= 2.0) {
785 host->ttl_factor = tmp;
786 } else if (tmp >= 1.0) {
787 NOTICE("write_riemann plugin: The configured "
788 "TTLFactor is very small "
789 "(%.1f). A value of 2.0 or "
790 "greater is recommended.",
791 tmp);
792 host->ttl_factor = tmp;
793 } else if (tmp > 0.0) {
794 WARNING("write_riemann plugin: The configured "
795 "TTLFactor is too small to be "
796 "useful (%.1f). I'll use it "
797 "since the user knows best, "
798 "but under protest.",
799 tmp);
800 host->ttl_factor = tmp;
801 } else { /* zero, negative and NAN */
802 ERROR("write_riemann plugin: The configured "
803 "TTLFactor is invalid (%.1f).",
804 tmp);
805 }
806 } else {
807 WARNING("write_riemann plugin: ignoring unknown config "
808 "option: \"%s\"", child->key);
809 }
810 }
811 if (status != 0) {
812 wrr_free(host);
813 return status;
814 }
816 ssnprintf(callback_name, sizeof(callback_name), "write_riemann/%s",
817 host->name);
818 ud.data = host;
819 ud.free_func = wrr_free;
821 pthread_mutex_lock(&host->lock);
823 status = plugin_register_write(callback_name, wrr_write, &ud);
825 if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
826 ud.free_func = NULL;
827 plugin_register_flush(callback_name, wrr_batch_flush, &ud);
828 }
829 if (status != 0)
830 WARNING("write_riemann plugin: plugin_register_write (\"%s\") "
831 "failed with status %i.",
832 callback_name, status);
833 else /* success */
834 host->reference_count++;
836 status = plugin_register_notification(callback_name,
837 wrr_notification, &ud);
838 if (status != 0)
839 WARNING("write_riemann plugin: plugin_register_notification (\"%s\") "
840 "failed with status %i.",
841 callback_name, status);
842 else /* success */
843 host->reference_count++;
845 if (host->reference_count <= 1)
846 {
847 /* Both callbacks failed => free memory.
848 * We need to unlock here, because riemann_free() will lock.
849 * This is not a race condition, because we're the only one
850 * holding a reference. */
851 pthread_mutex_unlock(&host->lock);
852 wrr_free(host);
853 return (-1);
854 }
856 host->reference_count--;
857 pthread_mutex_unlock(&host->lock);
859 return status;
860 } /* }}} int wrr_config_node */
862 static int wrr_config(oconfig_item_t *ci) /* {{{ */
863 {
864 int i;
865 oconfig_item_t *child;
866 int status;
868 for (i = 0; i < ci->children_num; i++) {
869 child = &ci->children[i];
871 if (strcasecmp("Node", child->key) == 0) {
872 wrr_config_node (child);
873 } else if (strcasecmp(child->key, "attribute") == 0) {
874 char *key = NULL;
875 char *val = NULL;
877 if (child->values_num != 2) {
878 WARNING("riemann attributes need both a key and a value.");
879 return (-1);
880 }
881 if (child->values[0].type != OCONFIG_TYPE_STRING ||
882 child->values[1].type != OCONFIG_TYPE_STRING) {
883 WARNING("riemann attribute needs string arguments.");
884 return (-1);
885 }
886 if ((key = strdup(child->values[0].value.string)) == NULL) {
887 WARNING("cannot allocate memory for attribute key.");
888 return (-1);
889 }
890 if ((val = strdup(child->values[1].value.string)) == NULL) {
891 WARNING("cannot allocate memory for attribute value.");
892 sfree(key);
893 return (-1);
894 }
895 strarray_add(&riemann_attrs, &riemann_attrs_num, key);
896 strarray_add(&riemann_attrs, &riemann_attrs_num, val);
897 DEBUG("write_riemann: got attr: %s => %s", key, val);
898 sfree(key);
899 sfree(val);
900 } else if (strcasecmp(child->key, "tag") == 0) {
901 char *tmp = NULL;
902 status = cf_util_get_string(child, &tmp);
903 if (status != 0)
904 continue;
906 strarray_add(&riemann_tags, &riemann_tags_num, tmp);
907 DEBUG("write_riemann plugin: Got tag: %s", tmp);
908 sfree(tmp);
909 } else {
910 WARNING("write_riemann plugin: Ignoring unknown "
911 "configuration option \"%s\" at top level.",
912 child->key);
913 }
914 }
915 return (0);
916 } /* }}} int wrr_config */
918 void module_register(void)
919 {
920 plugin_register_complex_config("write_riemann", wrr_config);
921 }
923 /* vim: set sw=8 sts=8 ts=8 noet : */