f836b9dfe900d74f9edac4f4d756d4a791af9c6c
1 /**
2 * collectd - src/write_riemann.c
3 * Copyright (C) 2012,2013 Pierre-Yves Ritschard
4 * Copyright (C) 2013 Florian octo Forster
5 * Copyright (C) 2015,2016 Gergely Nagy
6 *
7 * Permission is hereby granted, free of charge, to any person obtaining a
8 * copy of this software and associated documentation files (the "Software"),
9 * to deal in the Software without restriction, including without limitation
10 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11 * and/or sell copies of the Software, and to permit persons to whom the
12 * Software is furnished to do so, subject to the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be included in
15 * all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23 * DEALINGS IN THE SOFTWARE.
24 *
25 * Authors:
26 * Pierre-Yves Ritschard <pyr at spootnik.org>
27 * Florian octo Forster <octo at collectd.org>
28 * Gergely Nagy <algernon at madhouse-project.org>
29 */
31 #include <riemann/riemann-client.h>
32 #include <errno.h>
33 #include <pthread.h>
35 #include "collectd.h"
36 #include "plugin.h"
37 #include "common.h"
38 #include "configfile.h"
39 #include "utils_cache.h"
40 #include "utils_complain.h"
41 #include "write_riemann_threshold.h"
/* Built-in defaults used when the configuration does not override them. */

/* Server name used by wrr_connect() when a node has no "Host" option. */
#define RIEMANN_HOST "localhost"
/* Server port used by wrr_connect() when a node's port is left at 0. */
#define RIEMANN_PORT 5555
/* Initial ttl_factor of a node; the event TTL is the value list's interval
 * multiplied by this factor. */
#define RIEMANN_TTL_FACTOR 2.0
/* Initial batch_max of a node; the pending batch is flushed once its packed
 * size reaches this value. */
#define RIEMANN_BATCH_MAX 8192
/*
 * Per-<Node> state of the plugin: configuration, connection and the pending
 * batch. One instance is allocated by wrr_config_node() and shared by the
 * callbacks registered for that node.
 */
struct riemann_host {
	/* Complaint state handed to c_complain()/c_release() for connect and
	 * send diagnostics. */
	c_complain_t init_complaint;
	/* Argument of the <Node> block; used to build the callback name
	 * "write_riemann/<name>". */
	char *name;
	/* Optional string prepended to the service field of value events. */
	char *event_service_prefix;
	/* Must be held while touching the client and the batch state. */
	pthread_mutex_t lock;
	/* Collect value lists into batch_msg instead of sending each one
	 * (only honoured for non-UDP clients). */
	_Bool batch_mode;
	/* Forward notifications to Riemann. */
	_Bool notifications;
	/* Run write_riemann_threshold_check() and set the event state. */
	_Bool check_thresholds;
	/* Send rates obtained from uc_get_rate() instead of raw values. */
	_Bool store_rates;
	/* Append the data source name to the service even when the data set
	 * has a single data source. */
	_Bool always_append_ds;
	/* Server address; NULL selects RIEMANN_HOST. */
	char *node;
	/* Server port; 0 selects RIEMANN_PORT. */
	int port;
	/* Transport chosen by the "Protocol" option (UDP, TCP or TLS). */
	riemann_client_type_t client_type;
	/* Client handle; NULL while disconnected. */
	riemann_client_t *client;
	/* Multiplier applied to the value list interval to get the event TTL. */
	double ttl_factor;
	/* Time of node creation or of the last batch flush. */
	cdtime_t batch_init;
	/* Packed-size threshold at which the batch is flushed; a negative
	 * value makes every addition flush. */
	int batch_max;
	/* When > 0: number of seconds after which the batch is flushed. */
	int batch_timeout;
	/* Number of owners of this structure; wrr_free() releases it when the
	 * count drops to zero. */
	int reference_count;
	/* Pending batch message; NULL when nothing is batched. */
	riemann_message_t *batch_msg;
	/* TLS material handed to riemann_client_create(). */
	char *tls_ca_file;
	char *tls_cert_file;
	char *tls_key_file;
	/* Passed to riemann_client_set_timeout() when tv_sec is non-zero. */
	struct timeval timeout;
};
/* Tags from top-level "Tag" options; added to every event. */
static char **riemann_tags;
static size_t riemann_tags_num;
/* Attributes from top-level "Attribute" options, stored as alternating
 * key, value entries; added to every event. */
static char **riemann_attrs;
static size_t riemann_attrs_num;
79 /* host->lock must be held when calling this function. */
80 static int wrr_connect(struct riemann_host *host) /* {{{ */
81 {
82 char const *node;
83 int port;
85 if (host->client)
86 return 0;
88 node = (host->node != NULL) ? host->node : RIEMANN_HOST;
89 port = (host->port) ? host->port : RIEMANN_PORT;
91 host->client = NULL;
93 host->client = riemann_client_create(host->client_type, node, port,
94 RIEMANN_CLIENT_OPTION_TLS_CA_FILE, host->tls_ca_file,
95 RIEMANN_CLIENT_OPTION_TLS_CERT_FILE, host->tls_cert_file,
96 RIEMANN_CLIENT_OPTION_TLS_KEY_FILE, host->tls_key_file,
97 RIEMANN_CLIENT_OPTION_NONE);
98 if (host->client == NULL) {
99 c_complain (LOG_ERR, &host->init_complaint,
100 "write_riemann plugin: Unable to connect to Riemann at %s:%d",
101 node, port);
102 return -1;
103 }
104 if (host->timeout.tv_sec != 0) {
105 if (riemann_client_set_timeout(host->client, &host->timeout) != 0) {
106 riemann_client_free(host->client);
107 host->client = NULL;
108 c_complain (LOG_ERR, &host->init_complaint,
109 "write_riemann plugin: Unable to connect to Riemann at %s:%d",
110 node, port);
111 return -1;
112 }
113 }
115 c_release (LOG_INFO, &host->init_complaint,
116 "write_riemann plugin: Successfully connected to %s:%d",
117 node, port);
119 return 0;
120 } /* }}} int wrr_connect */
122 /* host->lock must be held when calling this function. */
123 static int wrr_disconnect(struct riemann_host *host) /* {{{ */
124 {
125 if (!host->client)
126 return (0);
128 riemann_client_free(host->client);
129 host->client = NULL;
131 return (0);
132 } /* }}} int wrr_disconnect */
134 /**
135 * Function to send messages to riemann.
136 *
137 * Acquires the host lock, disconnects on errors.
138 */
139 static int wrr_send_nolock(struct riemann_host *host, riemann_message_t *msg) /* {{{ */
140 {
141 int status = 0;
143 status = wrr_connect(host);
144 if (status != 0) {
145 return status;
146 }
148 status = riemann_client_send_message(host->client, msg);
149 if (status != 0) {
150 wrr_disconnect(host);
151 return status;
152 }
154 /*
155 * For TCP we need to receive message acknowledgemenent.
156 */
157 if (host->client_type != RIEMANN_CLIENT_UDP)
158 {
159 riemann_message_t *response;
161 response = riemann_client_recv_message(host->client);
163 if (response == NULL)
164 {
165 wrr_disconnect(host);
166 return errno;
167 }
168 riemann_message_free(response);
169 }
171 return 0;
172 } /* }}} int wrr_send */
174 static int wrr_send(struct riemann_host *host, riemann_message_t *msg)
175 {
176 int status = 0;
178 pthread_mutex_lock (&host->lock);
179 status = wrr_send_nolock(host, msg);
180 pthread_mutex_unlock (&host->lock);
181 return status;
182 }
184 static riemann_message_t *wrr_notification_to_message(struct riemann_host *host, /* {{{ */
185 notification_t const *n)
186 {
187 riemann_message_t *msg;
188 riemann_event_t *event;
189 char service_buffer[6 * DATA_MAX_NAME_LEN];
190 char const *severity;
191 notification_meta_t *meta;
192 size_t i;
194 switch (n->severity)
195 {
196 case NOTIF_OKAY: severity = "ok"; break;
197 case NOTIF_WARNING: severity = "warning"; break;
198 case NOTIF_FAILURE: severity = "critical"; break;
199 default: severity = "unknown";
200 }
202 format_name(service_buffer, sizeof(service_buffer),
203 /* host = */ "", n->plugin, n->plugin_instance,
204 n->type, n->type_instance);
206 event = riemann_event_create(RIEMANN_EVENT_FIELD_HOST, n->host,
207 RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(n->time),
208 RIEMANN_EVENT_FIELD_TAGS, "notification", NULL,
209 RIEMANN_EVENT_FIELD_STATE, severity,
210 RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
211 RIEMANN_EVENT_FIELD_NONE);
213 if (n->host[0] != 0)
214 riemann_event_string_attribute_add(event, "host", n->host);
215 if (n->plugin[0] != 0)
216 riemann_event_string_attribute_add(event, "plugin", n->plugin);
217 if (n->plugin_instance[0] != 0)
218 riemann_event_string_attribute_add(event, "plugin_instance", n->plugin_instance);
220 if (n->type[0] != 0)
221 riemann_event_string_attribute_add(event, "type", n->type);
222 if (n->type_instance[0] != 0)
223 riemann_event_string_attribute_add(event, "type_instance", n->type_instance);
225 for (i = 0; i < riemann_attrs_num; i += 2)
226 riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i+1]);
228 for (i = 0; i < riemann_tags_num; i++)
229 riemann_event_tag_add(event, riemann_tags[i]);
231 if (n->message[0] != 0)
232 riemann_event_string_attribute_add(event, "description", n->message);
234 /* Pull in values from threshold and add extra attributes */
235 for (meta = n->meta; meta != NULL; meta = meta->next)
236 {
237 if (strcasecmp("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE)
238 {
239 riemann_event_set(event,
240 RIEMANN_EVENT_FIELD_METRIC_D,
241 (double) meta->nm_value.nm_double,
242 RIEMANN_EVENT_FIELD_NONE);
243 continue;
244 }
246 if (meta->type == NM_TYPE_STRING) {
247 riemann_event_string_attribute_add(event, meta->name, meta->nm_value.nm_string);
248 continue;
249 }
250 }
252 msg = riemann_message_create_with_events(event, NULL);
253 if (msg == NULL)
254 {
255 ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
256 riemann_event_free (event);
257 return (NULL);
258 }
260 DEBUG("write_riemann plugin: Successfully created message for notification: "
261 "host = \"%s\", service = \"%s\", state = \"%s\"",
262 event->host, event->service, event->state);
263 return (msg);
264 } /* }}} riemann_message_t *wrr_notification_to_message */
266 static riemann_event_t *wrr_value_to_event(struct riemann_host const *host, /* {{{ */
267 data_set_t const *ds,
268 value_list_t const *vl, size_t index,
269 gauge_t const *rates,
270 int status)
271 {
272 riemann_event_t *event;
273 char name_buffer[5 * DATA_MAX_NAME_LEN];
274 char service_buffer[6 * DATA_MAX_NAME_LEN];
275 size_t i;
277 event = riemann_event_new();
278 if (event == NULL)
279 {
280 ERROR("write_riemann plugin: riemann_event_new() failed.");
281 return (NULL);
282 }
284 format_name(name_buffer, sizeof(name_buffer),
285 /* host = */ "", vl->plugin, vl->plugin_instance,
286 vl->type, vl->type_instance);
287 if (host->always_append_ds || (ds->ds_num > 1))
288 {
289 if (host->event_service_prefix == NULL)
290 ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s",
291 &name_buffer[1], ds->ds[index].name);
292 else
293 ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s",
294 host->event_service_prefix, &name_buffer[1], ds->ds[index].name);
295 }
296 else
297 {
298 if (host->event_service_prefix == NULL)
299 sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer));
300 else
301 ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
302 host->event_service_prefix, &name_buffer[1]);
303 }
305 riemann_event_set(event,
306 RIEMANN_EVENT_FIELD_HOST, vl->host,
307 RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(vl->time),
308 RIEMANN_EVENT_FIELD_TTL, (float) CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
309 RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES,
310 "plugin", vl->plugin,
311 "type", vl->type,
312 "ds_name", ds->ds[index].name,
313 NULL,
314 RIEMANN_EVENT_FIELD_SERVICE, service_buffer,
315 RIEMANN_EVENT_FIELD_NONE);
317 if (host->check_thresholds) {
318 const char *state = NULL;
320 switch (status) {
321 case STATE_OKAY:
322 state = "ok";
323 break;
324 case STATE_ERROR:
325 state = "critical";
326 break;
327 case STATE_WARNING:
328 state = "warning";
329 break;
330 case STATE_MISSING:
331 state = "unknown";
332 break;
333 }
334 if (state)
335 riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state,
336 RIEMANN_EVENT_FIELD_NONE);
337 }
339 if (vl->plugin_instance[0] != 0)
340 riemann_event_string_attribute_add(event, "plugin_instance", vl->plugin_instance);
341 if (vl->type_instance[0] != 0)
342 riemann_event_string_attribute_add(event, "type_instance", vl->type_instance);
344 if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL))
345 {
346 char ds_type[DATA_MAX_NAME_LEN];
348 ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
349 DS_TYPE_TO_STRING(ds->ds[index].type));
350 riemann_event_string_attribute_add(event, "ds_type", ds_type);
351 }
352 else
353 {
354 riemann_event_string_attribute_add(event, "ds_type",
355 DS_TYPE_TO_STRING(ds->ds[index].type));
356 }
358 {
359 char ds_index[DATA_MAX_NAME_LEN];
361 ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
362 riemann_event_string_attribute_add(event, "ds_index", ds_index);
363 }
365 for (i = 0; i < riemann_attrs_num; i += 2)
366 riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i +1]);
368 for (i = 0; i < riemann_tags_num; i++)
369 riemann_event_tag_add(event, riemann_tags[i]);
371 if (ds->ds[index].type == DS_TYPE_GAUGE)
372 {
373 riemann_event_set(event,
374 RIEMANN_EVENT_FIELD_METRIC_D,
375 (double) vl->values[index].gauge,
376 RIEMANN_EVENT_FIELD_NONE);
377 }
378 else if (rates != NULL)
379 {
380 riemann_event_set(event,
381 RIEMANN_EVENT_FIELD_METRIC_D,
382 (double) rates[index],
383 RIEMANN_EVENT_FIELD_NONE);
384 }
385 else
386 {
387 int64_t metric;
389 if (ds->ds[index].type == DS_TYPE_DERIVE)
390 metric = (int64_t) vl->values[index].derive;
391 else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
392 metric = (int64_t) vl->values[index].absolute;
393 else
394 metric = (int64_t) vl->values[index].counter;
396 riemann_event_set(event,
397 RIEMANN_EVENT_FIELD_METRIC_S64,
398 (int64_t) metric,
399 RIEMANN_EVENT_FIELD_NONE);
400 }
402 DEBUG("write_riemann plugin: Successfully created message for metric: "
403 "host = \"%s\", service = \"%s\"",
404 event->host, event->service);
405 return (event);
406 } /* }}} riemann_event_t *wrr_value_to_event */
408 static riemann_message_t *wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */
409 data_set_t const *ds,
410 value_list_t const *vl,
411 int *statuses)
412 {
413 riemann_message_t *msg;
414 size_t i;
415 gauge_t *rates = NULL;
417 /* Initialize the Msg structure. */
418 msg = riemann_message_new();
419 if (msg == NULL)
420 {
421 ERROR ("write_riemann plugin: riemann_message_new failed.");
422 return (NULL);
423 }
425 if (host->store_rates)
426 {
427 rates = uc_get_rate(ds, vl);
428 if (rates == NULL)
429 {
430 ERROR("write_riemann plugin: uc_get_rate failed.");
431 riemann_message_free(msg);
432 return (NULL);
433 }
434 }
436 for (i = 0; i < vl->values_len; i++)
437 {
438 riemann_event_t *event;
440 event = wrr_value_to_event(host, ds, vl,
441 (int) i, rates, statuses[i]);
442 if (event == NULL)
443 {
444 riemann_message_free(msg);
445 sfree(rates);
446 return (NULL);
447 }
448 riemann_message_append_events(msg, event, NULL);
449 }
451 sfree(rates);
452 return (msg);
453 } /* }}} riemann_message_t *wrr_value_list_to_message */
455 /*
456 * Always call while holding host->lock !
457 */
458 static int wrr_batch_flush_nolock(cdtime_t timeout,
459 struct riemann_host *host)
460 {
461 cdtime_t now;
462 int status = 0;
464 if (timeout > 0) {
465 now = cdtime();
466 if ((host->batch_init + timeout) > now)
467 return status;
468 }
469 wrr_send_nolock(host, host->batch_msg);
470 riemann_message_free(host->batch_msg);
472 if (host->client_type != RIEMANN_CLIENT_UDP)
473 {
474 riemann_message_t *response;
476 response = riemann_client_recv_message(host->client);
478 if (!response)
479 {
480 wrr_disconnect(host);
481 return errno;
482 }
484 riemann_message_free(response);
485 }
487 host->batch_init = cdtime();
488 host->batch_msg = NULL;
489 return status;
490 }
492 static int wrr_batch_flush(cdtime_t timeout,
493 const char *identifier __attribute__((unused)),
494 user_data_t *user_data)
495 {
496 struct riemann_host *host;
497 int status;
499 if (user_data == NULL)
500 return (-EINVAL);
502 host = user_data->data;
503 pthread_mutex_lock(&host->lock);
504 status = wrr_batch_flush_nolock(timeout, host);
505 if (status != 0)
506 c_complain (LOG_ERR, &host->init_complaint,
507 "write_riemann plugin: riemann_client_send failed with status %i",
508 status);
509 else
510 c_release (LOG_DEBUG, &host->init_complaint, "write_riemann plugin: batch sent.");
512 pthread_mutex_unlock(&host->lock);
513 return status;
514 }
516 static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
517 data_set_t const *ds,
518 value_list_t const *vl,
519 int *statuses)
520 {
521 riemann_message_t *msg;
522 size_t len;
523 int ret;
524 cdtime_t timeout;
526 msg = wrr_value_list_to_message(host, ds, vl, statuses);
527 if (msg == NULL)
528 return -1;
530 pthread_mutex_lock(&host->lock);
532 if (host->batch_msg == NULL) {
533 host->batch_msg = msg;
534 } else {
535 int status;
537 status = riemann_message_append_events_n(host->batch_msg,
538 msg->n_events,
539 msg->events);
540 msg->n_events = 0;
541 msg->events = NULL;
543 riemann_message_free(msg);
545 if (status != 0) {
546 pthread_mutex_unlock(&host->lock);
547 ERROR("write_riemann plugin: out of memory");
548 return -1;
549 }
550 }
552 len = riemann_message_get_packed_size(host->batch_msg);
553 ret = 0;
554 if ((host->batch_max < 0) || (((size_t) host->batch_max) <= len)) {
555 ret = wrr_batch_flush_nolock(0, host);
556 } else {
557 if (host->batch_timeout > 0) {
558 timeout = TIME_T_TO_CDTIME_T((time_t)host->batch_timeout);
559 ret = wrr_batch_flush_nolock(timeout, host);
560 }
561 }
563 pthread_mutex_unlock(&host->lock);
564 return ret;
565 } /* }}} riemann_message_t *wrr_batch_add_value_list */
567 static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */
568 {
569 int status;
570 struct riemann_host *host = ud->data;
571 riemann_message_t *msg;
573 if (!host->notifications)
574 return 0;
576 /*
577 * Never batch for notifications, send them ASAP
578 */
579 msg = wrr_notification_to_message(host, n);
580 if (msg == NULL)
581 return (-1);
583 status = wrr_send(host, msg);
584 if (status != 0)
585 c_complain (LOG_ERR, &host->init_complaint,
586 "write_riemann plugin: riemann_client_send failed with status %i",
587 status);
588 else
589 c_release (LOG_DEBUG, &host->init_complaint,
590 "write_riemann plugin: riemann_client_send succeeded");
592 riemann_message_free(msg);
593 return (status);
594 } /* }}} int wrr_notification */
596 static int wrr_write(const data_set_t *ds, /* {{{ */
597 const value_list_t *vl,
598 user_data_t *ud)
599 {
600 int status = 0;
601 int statuses[vl->values_len];
602 struct riemann_host *host = ud->data;
603 riemann_message_t *msg;
605 if (host->check_thresholds) {
606 status = write_riemann_threshold_check(ds, vl, statuses);
607 if (status != 0)
608 return status;
609 } else {
610 memset (statuses, 0, sizeof (statuses));
611 }
613 if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
614 wrr_batch_add_value_list(host, ds, vl, statuses);
615 } else {
616 msg = wrr_value_list_to_message(host, ds, vl, statuses);
617 if (msg == NULL)
618 return (-1);
620 status = wrr_send(host, msg);
622 riemann_message_free(msg);
623 }
624 return status;
625 } /* }}} int wrr_write */
627 static void wrr_free(void *p) /* {{{ */
628 {
629 struct riemann_host *host = p;
631 if (host == NULL)
632 return;
634 pthread_mutex_lock(&host->lock);
636 host->reference_count--;
637 if (host->reference_count > 0)
638 {
639 pthread_mutex_unlock(&host->lock);
640 return;
641 }
643 wrr_disconnect(host);
645 pthread_mutex_destroy(&host->lock);
646 sfree(host);
647 } /* }}} void wrr_free */
649 static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
650 {
651 struct riemann_host *host = NULL;
652 int status = 0;
653 int i;
654 oconfig_item_t *child;
655 char callback_name[DATA_MAX_NAME_LEN];
656 user_data_t ud;
658 if ((host = calloc(1, sizeof(*host))) == NULL) {
659 ERROR ("write_riemann plugin: calloc failed.");
660 return ENOMEM;
661 }
662 pthread_mutex_init(&host->lock, NULL);
663 C_COMPLAIN_INIT (&host->init_complaint);
664 host->reference_count = 1;
665 host->node = NULL;
666 host->port = 0;
667 host->notifications = 1;
668 host->check_thresholds = 0;
669 host->store_rates = 1;
670 host->always_append_ds = 0;
671 host->batch_mode = 1;
672 host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
673 host->batch_init = cdtime();
674 host->batch_timeout = 0;
675 host->ttl_factor = RIEMANN_TTL_FACTOR;
676 host->client = NULL;
677 host->client_type = RIEMANN_CLIENT_TCP;
678 host->timeout.tv_sec = 0;
679 host->timeout.tv_usec = 0;
681 status = cf_util_get_string(ci, &host->name);
682 if (status != 0) {
683 WARNING("write_riemann plugin: Required host name is missing.");
684 wrr_free(host);
685 return -1;
686 }
688 for (i = 0; i < ci->children_num; i++) {
689 /*
690 * The code here could be simplified but makes room
691 * for easy adding of new options later on.
692 */
693 child = &ci->children[i];
694 status = 0;
696 if (strcasecmp("Host", child->key) == 0) {
697 status = cf_util_get_string(child, &host->node);
698 if (status != 0)
699 break;
700 } else if (strcasecmp("Notifications", child->key) == 0) {
701 status = cf_util_get_boolean(child, &host->notifications);
702 if (status != 0)
703 break;
704 } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
705 status = cf_util_get_string(child, &host->event_service_prefix);
706 if (status != 0)
707 break;
708 } else if (strcasecmp("CheckThresholds", child->key) == 0) {
709 status = cf_util_get_boolean(child, &host->check_thresholds);
710 if (status != 0)
711 break;
712 } else if (strcasecmp("Batch", child->key) == 0) {
713 status = cf_util_get_boolean(child, &host->batch_mode);
714 if (status != 0)
715 break;
716 } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
717 status = cf_util_get_int(child, &host->batch_max);
718 if (status != 0)
719 break;
720 } else if (strcasecmp("BatchFlushTimeout", child->key) == 0) {
721 status = cf_util_get_int(child, &host->batch_timeout);
722 if (status != 0)
723 break;
724 } else if (strcasecmp("Timeout", child->key) == 0) {
725 status = cf_util_get_int(child, (int *)&host->timeout.tv_sec);
726 if (status != 0)
727 break;
728 } else if (strcasecmp("Port", child->key) == 0) {
729 host->port = cf_util_get_port_number(child);
730 if (host->port == -1) {
731 ERROR("write_riemann plugin: Invalid argument "
732 "configured for the \"Port\" "
733 "option.");
734 break;
735 }
736 } else if (strcasecmp("Protocol", child->key) == 0) {
737 char tmp[16];
738 status = cf_util_get_string_buffer(child,
739 tmp, sizeof(tmp));
740 if (status != 0)
741 {
742 ERROR("write_riemann plugin: cf_util_get_"
743 "string_buffer failed with "
744 "status %i.", status);
745 break;
746 }
748 if (strcasecmp("UDP", tmp) == 0)
749 host->client_type = RIEMANN_CLIENT_UDP;
750 else if (strcasecmp("TCP", tmp) == 0)
751 host->client_type = RIEMANN_CLIENT_TCP;
752 else if (strcasecmp("TLS", tmp) == 0)
753 host->client_type = RIEMANN_CLIENT_TLS;
754 else
755 WARNING("write_riemann plugin: The value "
756 "\"%s\" is not valid for the "
757 "\"Protocol\" option. Use "
758 "either \"UDP\", \"TCP\" or \"TLS\".",
759 tmp);
760 } else if (strcasecmp("TLSCAFile", child->key) == 0) {
761 status = cf_util_get_string(child, &host->tls_ca_file);
762 if (status != 0)
763 {
764 ERROR("write_riemann plugin: cf_util_get_"
765 "string_buffer failed with "
766 "status %i.", status);
767 break;
768 }
769 } else if (strcasecmp("TLSCertFile", child->key) == 0) {
770 status = cf_util_get_string(child, &host->tls_cert_file);
771 if (status != 0)
772 {
773 ERROR("write_riemann plugin: cf_util_get_"
774 "string_buffer failed with "
775 "status %i.", status);
776 break;
777 }
778 } else if (strcasecmp("TLSKeyFile", child->key) == 0) {
779 status = cf_util_get_string(child, &host->tls_key_file);
780 if (status != 0)
781 {
782 ERROR("write_riemann plugin: cf_util_get_"
783 "string_buffer failed with "
784 "status %i.", status);
785 break;
786 }
787 } else if (strcasecmp("StoreRates", child->key) == 0) {
788 status = cf_util_get_boolean(child, &host->store_rates);
789 if (status != 0)
790 break;
791 } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
792 status = cf_util_get_boolean(child,
793 &host->always_append_ds);
794 if (status != 0)
795 break;
796 } else if (strcasecmp("TTLFactor", child->key) == 0) {
797 double tmp = NAN;
798 status = cf_util_get_double(child, &tmp);
799 if (status != 0)
800 break;
801 if (tmp >= 2.0) {
802 host->ttl_factor = tmp;
803 } else if (tmp >= 1.0) {
804 NOTICE("write_riemann plugin: The configured "
805 "TTLFactor is very small "
806 "(%.1f). A value of 2.0 or "
807 "greater is recommended.",
808 tmp);
809 host->ttl_factor = tmp;
810 } else if (tmp > 0.0) {
811 WARNING("write_riemann plugin: The configured "
812 "TTLFactor is too small to be "
813 "useful (%.1f). I'll use it "
814 "since the user knows best, "
815 "but under protest.",
816 tmp);
817 host->ttl_factor = tmp;
818 } else { /* zero, negative and NAN */
819 ERROR("write_riemann plugin: The configured "
820 "TTLFactor is invalid (%.1f).",
821 tmp);
822 }
823 } else {
824 WARNING("write_riemann plugin: ignoring unknown config "
825 "option: \"%s\"", child->key);
826 }
827 }
828 if (status != 0) {
829 wrr_free(host);
830 return status;
831 }
833 ssnprintf(callback_name, sizeof(callback_name), "write_riemann/%s",
834 host->name);
835 ud.data = host;
836 ud.free_func = wrr_free;
838 pthread_mutex_lock(&host->lock);
840 status = plugin_register_write(callback_name, wrr_write, &ud);
842 if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
843 ud.free_func = NULL;
844 plugin_register_flush(callback_name, wrr_batch_flush, &ud);
845 }
846 if (status != 0)
847 WARNING("write_riemann plugin: plugin_register_write (\"%s\") "
848 "failed with status %i.",
849 callback_name, status);
850 else /* success */
851 host->reference_count++;
853 status = plugin_register_notification(callback_name,
854 wrr_notification, &ud);
855 if (status != 0)
856 WARNING("write_riemann plugin: plugin_register_notification (\"%s\") "
857 "failed with status %i.",
858 callback_name, status);
859 else /* success */
860 host->reference_count++;
862 if (host->reference_count <= 1)
863 {
864 /* Both callbacks failed => free memory.
865 * We need to unlock here, because riemann_free() will lock.
866 * This is not a race condition, because we're the only one
867 * holding a reference. */
868 pthread_mutex_unlock(&host->lock);
869 wrr_free(host);
870 return (-1);
871 }
873 host->reference_count--;
874 pthread_mutex_unlock(&host->lock);
876 return status;
877 } /* }}} int wrr_config_node */
879 static int wrr_config(oconfig_item_t *ci) /* {{{ */
880 {
881 int i;
882 oconfig_item_t *child;
883 int status;
885 for (i = 0; i < ci->children_num; i++) {
886 child = &ci->children[i];
888 if (strcasecmp("Node", child->key) == 0) {
889 wrr_config_node (child);
890 } else if (strcasecmp(child->key, "attribute") == 0) {
891 char *key = NULL;
892 char *val = NULL;
894 if (child->values_num != 2) {
895 WARNING("riemann attributes need both a key and a value.");
896 return (-1);
897 }
898 if (child->values[0].type != OCONFIG_TYPE_STRING ||
899 child->values[1].type != OCONFIG_TYPE_STRING) {
900 WARNING("riemann attribute needs string arguments.");
901 return (-1);
902 }
903 if ((key = strdup(child->values[0].value.string)) == NULL) {
904 WARNING("cannot allocate memory for attribute key.");
905 return (-1);
906 }
907 if ((val = strdup(child->values[1].value.string)) == NULL) {
908 WARNING("cannot allocate memory for attribute value.");
909 sfree(key);
910 return (-1);
911 }
912 strarray_add(&riemann_attrs, &riemann_attrs_num, key);
913 strarray_add(&riemann_attrs, &riemann_attrs_num, val);
914 DEBUG("write_riemann: got attr: %s => %s", key, val);
915 sfree(key);
916 sfree(val);
917 } else if (strcasecmp(child->key, "tag") == 0) {
918 char *tmp = NULL;
919 status = cf_util_get_string(child, &tmp);
920 if (status != 0)
921 continue;
923 strarray_add(&riemann_tags, &riemann_tags_num, tmp);
924 DEBUG("write_riemann plugin: Got tag: %s", tmp);
925 sfree(tmp);
926 } else {
927 WARNING("write_riemann plugin: Ignoring unknown "
928 "configuration option \"%s\" at top level.",
929 child->key);
930 }
931 }
932 return (0);
933 } /* }}} int wrr_config */
/*
 * Plugin entry point: registers wrr_config() as the complex configuration
 * callback for the "write_riemann" plugin.
 */
void module_register(void)
{
	plugin_register_complex_config("write_riemann", wrr_config);
}
940 /* vim: set sw=8 sts=8 ts=8 noet : */