0ed02603283036d0060b16c75849e87272372260
1 /**
2 * collectd - src/write_riemann.c
3 * Copyright (C) 2012,2013 Pierre-Yves Ritschard
4 * Copyright (C) 2013 Florian octo Forster
5 * Copyright (C) 2015,2016 Gergely Nagy
6 *
7 * Permission is hereby granted, free of charge, to any person obtaining a
8 * copy of this software and associated documentation files (the "Software"),
9 * to deal in the Software without restriction, including without limitation
10 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11 * and/or sell copies of the Software, and to permit persons to whom the
12 * Software is furnished to do so, subject to the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be included in
15 * all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23 * DEALINGS IN THE SOFTWARE.
24 *
25 * Authors:
26 * Pierre-Yves Ritschard <pyr at spootnik.org>
27 * Florian octo Forster <octo at collectd.org>
28 * Gergely Nagy <algernon at madhouse-project.org>
29 */
31 #include "collectd.h"
33 #include "common.h"
34 #include "plugin.h"
35 #include "utils_cache.h"
36 #include "utils_complain.h"
37 #include "write_riemann_threshold.h"
39 #include <errno.h>
40 #include <riemann/riemann-client.h>
42 #define RIEMANN_HOST "localhost"
43 #define RIEMANN_PORT 5555
44 #define RIEMANN_TTL_FACTOR 2.0
45 #define RIEMANN_BATCH_MAX 8192
47 struct riemann_host {
48 c_complain_t init_complaint;
49 char *name;
50 char *event_service_prefix;
51 pthread_mutex_t lock;
52 _Bool batch_mode;
53 _Bool notifications;
54 _Bool check_thresholds;
55 _Bool store_rates;
56 _Bool always_append_ds;
57 char *node;
58 int port;
59 riemann_client_type_t client_type;
60 riemann_client_t *client;
61 double ttl_factor;
62 cdtime_t batch_init;
63 int batch_max;
64 int batch_timeout;
65 int reference_count;
66 riemann_message_t *batch_msg;
67 char *tls_ca_file;
68 char *tls_cert_file;
69 char *tls_key_file;
70 struct timeval timeout;
71 };
73 static char **riemann_tags;
74 static size_t riemann_tags_num;
75 static char **riemann_attrs;
76 static size_t riemann_attrs_num;
78 /* host->lock must be held when calling this function. */
79 static int wrr_connect(struct riemann_host *host) /* {{{ */
80 {
81 char const *node;
82 int port;
84 if (host->client)
85 return 0;
87 node = (host->node != NULL) ? host->node : RIEMANN_HOST;
88 port = (host->port) ? host->port : RIEMANN_PORT;
90 host->client = NULL;
92 host->client = riemann_client_create(
93 host->client_type, node, port, RIEMANN_CLIENT_OPTION_TLS_CA_FILE,
94 host->tls_ca_file, RIEMANN_CLIENT_OPTION_TLS_CERT_FILE,
95 host->tls_cert_file, RIEMANN_CLIENT_OPTION_TLS_KEY_FILE,
96 host->tls_key_file, RIEMANN_CLIENT_OPTION_NONE);
97 if (host->client == NULL) {
98 c_complain(LOG_ERR, &host->init_complaint,
99 "write_riemann plugin: Unable to connect to Riemann at %s:%d",
100 node, port);
101 return -1;
102 }
103 if (host->timeout.tv_sec != 0) {
104 if (riemann_client_set_timeout(host->client, &host->timeout) != 0) {
105 riemann_client_free(host->client);
106 host->client = NULL;
107 c_complain(LOG_ERR, &host->init_complaint,
108 "write_riemann plugin: Unable to connect to Riemann at %s:%d",
109 node, port);
110 return -1;
111 }
112 }
114 set_sock_opts(riemann_client_get_fd(host->client));
116 c_release(LOG_INFO, &host->init_complaint,
117 "write_riemann plugin: Successfully connected to %s:%d", node,
118 port);
120 return 0;
121 } /* }}} int wrr_connect */
123 /* host->lock must be held when calling this function. */
124 static int wrr_disconnect(struct riemann_host *host) /* {{{ */
125 {
126 if (!host->client)
127 return (0);
129 riemann_client_free(host->client);
130 host->client = NULL;
132 return (0);
133 } /* }}} int wrr_disconnect */
135 /**
136 * Function to send messages to riemann.
137 *
138 * Acquires the host lock, disconnects on errors.
139 */
140 static int wrr_send_nolock(struct riemann_host *host,
141 riemann_message_t *msg) /* {{{ */
142 {
143 int status = 0;
145 status = wrr_connect(host);
146 if (status != 0) {
147 return status;
148 }
150 status = riemann_client_send_message(host->client, msg);
151 if (status != 0) {
152 wrr_disconnect(host);
153 return status;
154 }
156 /*
157 * For TCP we need to receive message acknowledgemenent.
158 */
159 if (host->client_type != RIEMANN_CLIENT_UDP) {
160 riemann_message_t *response;
162 response = riemann_client_recv_message(host->client);
164 if (response == NULL) {
165 wrr_disconnect(host);
166 return errno;
167 }
168 riemann_message_free(response);
169 }
171 return 0;
172 } /* }}} int wrr_send */
174 static int wrr_send(struct riemann_host *host, riemann_message_t *msg) {
175 int status = 0;
177 pthread_mutex_lock(&host->lock);
178 status = wrr_send_nolock(host, msg);
179 pthread_mutex_unlock(&host->lock);
180 return status;
181 }
183 static riemann_message_t *
184 wrr_notification_to_message(struct riemann_host *host, /* {{{ */
185 notification_t const *n) {
186 riemann_message_t *msg;
187 riemann_event_t *event;
188 char service_buffer[6 * DATA_MAX_NAME_LEN];
189 char const *severity;
191 switch (n->severity) {
192 case NOTIF_OKAY:
193 severity = "ok";
194 break;
195 case NOTIF_WARNING:
196 severity = "warning";
197 break;
198 case NOTIF_FAILURE:
199 severity = "critical";
200 break;
201 default:
202 severity = "unknown";
203 }
205 format_name(service_buffer, sizeof(service_buffer),
206 /* host = */ "", n->plugin, n->plugin_instance, n->type,
207 n->type_instance);
209 event = riemann_event_create(
210 RIEMANN_EVENT_FIELD_HOST, n->host, RIEMANN_EVENT_FIELD_TIME,
211 (int64_t)CDTIME_T_TO_TIME_T(n->time), RIEMANN_EVENT_FIELD_TAGS,
212 "notification", NULL, RIEMANN_EVENT_FIELD_STATE, severity,
213 RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
214 RIEMANN_EVENT_FIELD_NONE);
216 if (n->host[0] != 0)
217 riemann_event_string_attribute_add(event, "host", n->host);
218 if (n->plugin[0] != 0)
219 riemann_event_string_attribute_add(event, "plugin", n->plugin);
220 if (n->plugin_instance[0] != 0)
221 riemann_event_string_attribute_add(event, "plugin_instance",
222 n->plugin_instance);
224 if (n->type[0] != 0)
225 riemann_event_string_attribute_add(event, "type", n->type);
226 if (n->type_instance[0] != 0)
227 riemann_event_string_attribute_add(event, "type_instance",
228 n->type_instance);
230 for (size_t i = 0; i < riemann_attrs_num; i += 2)
231 riemann_event_string_attribute_add(event, riemann_attrs[i],
232 riemann_attrs[i + 1]);
234 for (size_t i = 0; i < riemann_tags_num; i++)
235 riemann_event_tag_add(event, riemann_tags[i]);
237 if (n->message[0] != 0)
238 riemann_event_string_attribute_add(event, "description", n->message);
240 /* Pull in values from threshold and add extra attributes */
241 for (notification_meta_t *meta = n->meta; meta != NULL; meta = meta->next) {
242 if (strcasecmp("CurrentValue", meta->name) == 0 &&
243 meta->type == NM_TYPE_DOUBLE) {
244 riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D,
245 (double)meta->nm_value.nm_double,
246 RIEMANN_EVENT_FIELD_NONE);
247 continue;
248 }
250 if (meta->type == NM_TYPE_STRING) {
251 riemann_event_string_attribute_add(event, meta->name,
252 meta->nm_value.nm_string);
253 continue;
254 }
255 }
257 msg = riemann_message_create_with_events(event, NULL);
258 if (msg == NULL) {
259 ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
260 riemann_event_free(event);
261 return (NULL);
262 }
264 DEBUG("write_riemann plugin: Successfully created message for notification: "
265 "host = \"%s\", service = \"%s\", state = \"%s\"",
266 event->host, event->service, event->state);
267 return (msg);
268 } /* }}} riemann_message_t *wrr_notification_to_message */
270 static riemann_event_t *
271 wrr_value_to_event(struct riemann_host const *host, /* {{{ */
272 data_set_t const *ds, value_list_t const *vl, size_t index,
273 gauge_t const *rates, int status) {
274 riemann_event_t *event;
275 char name_buffer[5 * DATA_MAX_NAME_LEN];
276 char service_buffer[6 * DATA_MAX_NAME_LEN];
277 size_t i;
279 event = riemann_event_new();
280 if (event == NULL) {
281 ERROR("write_riemann plugin: riemann_event_new() failed.");
282 return (NULL);
283 }
285 format_name(name_buffer, sizeof(name_buffer),
286 /* host = */ "", vl->plugin, vl->plugin_instance, vl->type,
287 vl->type_instance);
288 if (host->always_append_ds || (ds->ds_num > 1)) {
289 if (host->event_service_prefix == NULL)
290 ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s",
291 &name_buffer[1], ds->ds[index].name);
292 else
293 ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s",
294 host->event_service_prefix, &name_buffer[1],
295 ds->ds[index].name);
296 } else {
297 if (host->event_service_prefix == NULL)
298 sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer));
299 else
300 ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
301 host->event_service_prefix, &name_buffer[1]);
302 }
304 riemann_event_set(
305 event, RIEMANN_EVENT_FIELD_HOST, vl->host, RIEMANN_EVENT_FIELD_TIME,
306 (int64_t)CDTIME_T_TO_TIME_T(vl->time), RIEMANN_EVENT_FIELD_TTL,
307 (float)CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
308 RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES, "plugin", vl->plugin, "type",
309 vl->type, "ds_name", ds->ds[index].name, NULL,
310 RIEMANN_EVENT_FIELD_SERVICE, service_buffer, RIEMANN_EVENT_FIELD_NONE);
312 if (host->check_thresholds) {
313 const char *state = NULL;
315 switch (status) {
316 case STATE_OKAY:
317 state = "ok";
318 break;
319 case STATE_ERROR:
320 state = "critical";
321 break;
322 case STATE_WARNING:
323 state = "warning";
324 break;
325 case STATE_MISSING:
326 state = "unknown";
327 break;
328 }
329 if (state)
330 riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state,
331 RIEMANN_EVENT_FIELD_NONE);
332 }
334 if (vl->plugin_instance[0] != 0)
335 riemann_event_string_attribute_add(event, "plugin_instance",
336 vl->plugin_instance);
337 if (vl->type_instance[0] != 0)
338 riemann_event_string_attribute_add(event, "type_instance",
339 vl->type_instance);
341 if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL)) {
342 char ds_type[DATA_MAX_NAME_LEN];
344 ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
345 DS_TYPE_TO_STRING(ds->ds[index].type));
346 riemann_event_string_attribute_add(event, "ds_type", ds_type);
347 } else {
348 riemann_event_string_attribute_add(event, "ds_type",
349 DS_TYPE_TO_STRING(ds->ds[index].type));
350 }
352 {
353 char ds_index[DATA_MAX_NAME_LEN];
355 ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
356 riemann_event_string_attribute_add(event, "ds_index", ds_index);
357 }
359 for (i = 0; i < riemann_attrs_num; i += 2)
360 riemann_event_string_attribute_add(event, riemann_attrs[i],
361 riemann_attrs[i + 1]);
363 for (i = 0; i < riemann_tags_num; i++)
364 riemann_event_tag_add(event, riemann_tags[i]);
366 if (ds->ds[index].type == DS_TYPE_GAUGE) {
367 riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D,
368 (double)vl->values[index].gauge,
369 RIEMANN_EVENT_FIELD_NONE);
370 } else if (rates != NULL) {
371 riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D, (double)rates[index],
372 RIEMANN_EVENT_FIELD_NONE);
373 } else {
374 int64_t metric;
376 if (ds->ds[index].type == DS_TYPE_DERIVE)
377 metric = (int64_t)vl->values[index].derive;
378 else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
379 metric = (int64_t)vl->values[index].absolute;
380 else
381 metric = (int64_t)vl->values[index].counter;
383 riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_S64, (int64_t)metric,
384 RIEMANN_EVENT_FIELD_NONE);
385 }
387 DEBUG("write_riemann plugin: Successfully created message for metric: "
388 "host = \"%s\", service = \"%s\"",
389 event->host, event->service);
390 return (event);
391 } /* }}} riemann_event_t *wrr_value_to_event */
393 static riemann_message_t *
394 wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */
395 data_set_t const *ds, value_list_t const *vl,
396 int *statuses) {
397 riemann_message_t *msg;
398 size_t i;
399 gauge_t *rates = NULL;
401 /* Initialize the Msg structure. */
402 msg = riemann_message_new();
403 if (msg == NULL) {
404 ERROR("write_riemann plugin: riemann_message_new failed.");
405 return (NULL);
406 }
408 if (host->store_rates) {
409 rates = uc_get_rate(ds, vl);
410 if (rates == NULL) {
411 ERROR("write_riemann plugin: uc_get_rate failed.");
412 riemann_message_free(msg);
413 return (NULL);
414 }
415 }
417 for (i = 0; i < vl->values_len; i++) {
418 riemann_event_t *event;
420 event = wrr_value_to_event(host, ds, vl, (int)i, rates, statuses[i]);
421 if (event == NULL) {
422 riemann_message_free(msg);
423 sfree(rates);
424 return (NULL);
425 }
426 riemann_message_append_events(msg, event, NULL);
427 }
429 sfree(rates);
430 return (msg);
431 } /* }}} riemann_message_t *wrr_value_list_to_message */
433 /*
434 * Always call while holding host->lock !
435 */
436 static int wrr_batch_flush_nolock(cdtime_t timeout, struct riemann_host *host) {
437 cdtime_t now;
438 int status = 0;
440 now = cdtime();
441 if (timeout > 0) {
442 if ((host->batch_init + timeout) > now) {
443 return status;
444 }
445 }
446 wrr_send_nolock(host, host->batch_msg);
447 riemann_message_free(host->batch_msg);
449 host->batch_init = now;
450 host->batch_msg = NULL;
451 return status;
452 }
454 static int wrr_batch_flush(cdtime_t timeout,
455 const char *identifier __attribute__((unused)),
456 user_data_t *user_data) {
457 struct riemann_host *host;
458 int status;
460 if (user_data == NULL)
461 return (-EINVAL);
463 host = user_data->data;
464 pthread_mutex_lock(&host->lock);
465 status = wrr_batch_flush_nolock(timeout, host);
466 if (status != 0)
467 c_complain(
468 LOG_ERR, &host->init_complaint,
469 "write_riemann plugin: riemann_client_send failed with status %i",
470 status);
471 else
472 c_release(LOG_DEBUG, &host->init_complaint,
473 "write_riemann plugin: batch sent.");
475 pthread_mutex_unlock(&host->lock);
476 return status;
477 }
479 static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
480 data_set_t const *ds,
481 value_list_t const *vl, int *statuses) {
482 riemann_message_t *msg;
483 size_t len;
484 int ret;
485 cdtime_t timeout;
487 msg = wrr_value_list_to_message(host, ds, vl, statuses);
488 if (msg == NULL)
489 return -1;
491 pthread_mutex_lock(&host->lock);
493 if (host->batch_msg == NULL) {
494 host->batch_msg = msg;
495 } else {
496 int status;
498 status = riemann_message_append_events_n(host->batch_msg, msg->n_events,
499 msg->events);
500 msg->n_events = 0;
501 msg->events = NULL;
503 riemann_message_free(msg);
505 if (status != 0) {
506 pthread_mutex_unlock(&host->lock);
507 ERROR("write_riemann plugin: out of memory");
508 return -1;
509 }
510 }
512 len = riemann_message_get_packed_size(host->batch_msg);
513 ret = 0;
514 if ((host->batch_max < 0) || (((size_t)host->batch_max) <= len)) {
515 ret = wrr_batch_flush_nolock(0, host);
516 } else {
517 if (host->batch_timeout > 0) {
518 timeout = TIME_T_TO_CDTIME_T((time_t)host->batch_timeout);
519 ret = wrr_batch_flush_nolock(timeout, host);
520 }
521 }
523 pthread_mutex_unlock(&host->lock);
524 return ret;
525 } /* }}} riemann_message_t *wrr_batch_add_value_list */
527 static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */
528 {
529 int status;
530 struct riemann_host *host = ud->data;
531 riemann_message_t *msg;
533 if (!host->notifications)
534 return 0;
536 /*
537 * Never batch for notifications, send them ASAP
538 */
539 msg = wrr_notification_to_message(host, n);
540 if (msg == NULL)
541 return (-1);
543 status = wrr_send(host, msg);
544 if (status != 0)
545 c_complain(
546 LOG_ERR, &host->init_complaint,
547 "write_riemann plugin: riemann_client_send failed with status %i",
548 status);
549 else
550 c_release(LOG_DEBUG, &host->init_complaint,
551 "write_riemann plugin: riemann_client_send succeeded");
553 riemann_message_free(msg);
554 return (status);
555 } /* }}} int wrr_notification */
557 static int wrr_write(const data_set_t *ds, /* {{{ */
558 const value_list_t *vl, user_data_t *ud) {
559 int status = 0;
560 int statuses[vl->values_len];
561 struct riemann_host *host = ud->data;
562 riemann_message_t *msg;
564 if (host->check_thresholds) {
565 status = write_riemann_threshold_check(ds, vl, statuses);
566 if (status != 0)
567 return status;
568 } else {
569 memset(statuses, 0, sizeof(statuses));
570 }
572 if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
573 wrr_batch_add_value_list(host, ds, vl, statuses);
574 } else {
575 msg = wrr_value_list_to_message(host, ds, vl, statuses);
576 if (msg == NULL)
577 return (-1);
579 status = wrr_send(host, msg);
581 riemann_message_free(msg);
582 }
583 return status;
584 } /* }}} int wrr_write */
586 static void wrr_free(void *p) /* {{{ */
587 {
588 struct riemann_host *host = p;
590 if (host == NULL)
591 return;
593 pthread_mutex_lock(&host->lock);
595 host->reference_count--;
596 if (host->reference_count > 0) {
597 pthread_mutex_unlock(&host->lock);
598 return;
599 }
601 wrr_disconnect(host);
603 pthread_mutex_destroy(&host->lock);
604 sfree(host);
605 } /* }}} void wrr_free */
607 static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
608 {
609 struct riemann_host *host = NULL;
610 int status = 0;
611 int i;
612 oconfig_item_t *child;
613 char callback_name[DATA_MAX_NAME_LEN];
614 user_data_t ud;
616 if ((host = calloc(1, sizeof(*host))) == NULL) {
617 ERROR("write_riemann plugin: calloc failed.");
618 return ENOMEM;
619 }
620 pthread_mutex_init(&host->lock, NULL);
621 C_COMPLAIN_INIT(&host->init_complaint);
622 host->reference_count = 1;
623 host->node = NULL;
624 host->port = 0;
625 host->notifications = 1;
626 host->check_thresholds = 0;
627 host->store_rates = 1;
628 host->always_append_ds = 0;
629 host->batch_mode = 1;
630 host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
631 host->batch_init = cdtime();
632 host->batch_timeout = 0;
633 host->ttl_factor = RIEMANN_TTL_FACTOR;
634 host->client = NULL;
635 host->client_type = RIEMANN_CLIENT_TCP;
636 host->timeout.tv_sec = 0;
637 host->timeout.tv_usec = 0;
639 status = cf_util_get_string(ci, &host->name);
640 if (status != 0) {
641 WARNING("write_riemann plugin: Required host name is missing.");
642 wrr_free(host);
643 return -1;
644 }
646 for (i = 0; i < ci->children_num; i++) {
647 /*
648 * The code here could be simplified but makes room
649 * for easy adding of new options later on.
650 */
651 child = &ci->children[i];
652 status = 0;
654 if (strcasecmp("Host", child->key) == 0) {
655 status = cf_util_get_string(child, &host->node);
656 if (status != 0)
657 break;
658 } else if (strcasecmp("Notifications", child->key) == 0) {
659 status = cf_util_get_boolean(child, &host->notifications);
660 if (status != 0)
661 break;
662 } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
663 status = cf_util_get_string(child, &host->event_service_prefix);
664 if (status != 0)
665 break;
666 } else if (strcasecmp("CheckThresholds", child->key) == 0) {
667 status = cf_util_get_boolean(child, &host->check_thresholds);
668 if (status != 0)
669 break;
670 } else if (strcasecmp("Batch", child->key) == 0) {
671 status = cf_util_get_boolean(child, &host->batch_mode);
672 if (status != 0)
673 break;
674 } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
675 status = cf_util_get_int(child, &host->batch_max);
676 if (status != 0)
677 break;
678 } else if (strcasecmp("BatchFlushTimeout", child->key) == 0) {
679 status = cf_util_get_int(child, &host->batch_timeout);
680 if (status != 0)
681 break;
682 } else if (strcasecmp("Timeout", child->key) == 0) {
683 status = cf_util_get_int(child, (int *)&host->timeout.tv_sec);
684 if (status != 0)
685 break;
686 } else if (strcasecmp("Port", child->key) == 0) {
687 host->port = cf_util_get_port_number(child);
688 if (host->port == -1) {
689 ERROR("write_riemann plugin: Invalid argument "
690 "configured for the \"Port\" "
691 "option.");
692 break;
693 }
694 } else if (strcasecmp("Protocol", child->key) == 0) {
695 char tmp[16];
696 status = cf_util_get_string_buffer(child, tmp, sizeof(tmp));
697 if (status != 0) {
698 ERROR("write_riemann plugin: cf_util_get_"
699 "string_buffer failed with "
700 "status %i.",
701 status);
702 break;
703 }
705 if (strcasecmp("UDP", tmp) == 0)
706 host->client_type = RIEMANN_CLIENT_UDP;
707 else if (strcasecmp("TCP", tmp) == 0)
708 host->client_type = RIEMANN_CLIENT_TCP;
709 else if (strcasecmp("TLS", tmp) == 0)
710 host->client_type = RIEMANN_CLIENT_TLS;
711 else
712 WARNING("write_riemann plugin: The value "
713 "\"%s\" is not valid for the "
714 "\"Protocol\" option. Use "
715 "either \"UDP\", \"TCP\" or \"TLS\".",
716 tmp);
717 } else if (strcasecmp("TLSCAFile", child->key) == 0) {
718 status = cf_util_get_string(child, &host->tls_ca_file);
719 if (status != 0) {
720 ERROR("write_riemann plugin: cf_util_get_"
721 "string_buffer failed with "
722 "status %i.",
723 status);
724 break;
725 }
726 } else if (strcasecmp("TLSCertFile", child->key) == 0) {
727 status = cf_util_get_string(child, &host->tls_cert_file);
728 if (status != 0) {
729 ERROR("write_riemann plugin: cf_util_get_"
730 "string_buffer failed with "
731 "status %i.",
732 status);
733 break;
734 }
735 } else if (strcasecmp("TLSKeyFile", child->key) == 0) {
736 status = cf_util_get_string(child, &host->tls_key_file);
737 if (status != 0) {
738 ERROR("write_riemann plugin: cf_util_get_"
739 "string_buffer failed with "
740 "status %i.",
741 status);
742 break;
743 }
744 } else if (strcasecmp("StoreRates", child->key) == 0) {
745 status = cf_util_get_boolean(child, &host->store_rates);
746 if (status != 0)
747 break;
748 } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
749 status = cf_util_get_boolean(child, &host->always_append_ds);
750 if (status != 0)
751 break;
752 } else if (strcasecmp("TTLFactor", child->key) == 0) {
753 double tmp = NAN;
754 status = cf_util_get_double(child, &tmp);
755 if (status != 0)
756 break;
757 if (tmp >= 2.0) {
758 host->ttl_factor = tmp;
759 } else if (tmp >= 1.0) {
760 NOTICE("write_riemann plugin: The configured "
761 "TTLFactor is very small "
762 "(%.1f). A value of 2.0 or "
763 "greater is recommended.",
764 tmp);
765 host->ttl_factor = tmp;
766 } else if (tmp > 0.0) {
767 WARNING("write_riemann plugin: The configured "
768 "TTLFactor is too small to be "
769 "useful (%.1f). I'll use it "
770 "since the user knows best, "
771 "but under protest.",
772 tmp);
773 host->ttl_factor = tmp;
774 } else { /* zero, negative and NAN */
775 ERROR("write_riemann plugin: The configured "
776 "TTLFactor is invalid (%.1f).",
777 tmp);
778 }
779 } else {
780 WARNING("write_riemann plugin: ignoring unknown config "
781 "option: \"%s\"",
782 child->key);
783 }
784 }
785 if (status != 0) {
786 wrr_free(host);
787 return status;
788 }
790 ssnprintf(callback_name, sizeof(callback_name), "write_riemann/%s",
791 host->name);
792 ud.data = host;
793 ud.free_func = wrr_free;
795 pthread_mutex_lock(&host->lock);
797 status = plugin_register_write(callback_name, wrr_write, &ud);
799 if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
800 ud.free_func = NULL;
801 plugin_register_flush(callback_name, wrr_batch_flush, &ud);
802 }
803 if (status != 0)
804 WARNING("write_riemann plugin: plugin_register_write (\"%s\") "
805 "failed with status %i.",
806 callback_name, status);
807 else /* success */
808 host->reference_count++;
810 status = plugin_register_notification(callback_name, wrr_notification, &ud);
811 if (status != 0)
812 WARNING("write_riemann plugin: plugin_register_notification (\"%s\") "
813 "failed with status %i.",
814 callback_name, status);
815 else /* success */
816 host->reference_count++;
818 if (host->reference_count <= 1) {
819 /* Both callbacks failed => free memory.
820 * We need to unlock here, because riemann_free() will lock.
821 * This is not a race condition, because we're the only one
822 * holding a reference. */
823 pthread_mutex_unlock(&host->lock);
824 wrr_free(host);
825 return (-1);
826 }
828 host->reference_count--;
829 pthread_mutex_unlock(&host->lock);
831 return status;
832 } /* }}} int wrr_config_node */
834 static int wrr_config(oconfig_item_t *ci) /* {{{ */
835 {
836 int i;
837 oconfig_item_t *child;
838 int status;
840 for (i = 0; i < ci->children_num; i++) {
841 child = &ci->children[i];
843 if (strcasecmp("Node", child->key) == 0) {
844 wrr_config_node(child);
845 } else if (strcasecmp(child->key, "attribute") == 0) {
846 char *key = NULL;
847 char *val = NULL;
849 if (child->values_num != 2) {
850 WARNING("riemann attributes need both a key and a value.");
851 return (-1);
852 }
853 if (child->values[0].type != OCONFIG_TYPE_STRING ||
854 child->values[1].type != OCONFIG_TYPE_STRING) {
855 WARNING("riemann attribute needs string arguments.");
856 return (-1);
857 }
858 if ((key = strdup(child->values[0].value.string)) == NULL) {
859 WARNING("cannot allocate memory for attribute key.");
860 return (-1);
861 }
862 if ((val = strdup(child->values[1].value.string)) == NULL) {
863 WARNING("cannot allocate memory for attribute value.");
864 sfree(key);
865 return (-1);
866 }
867 strarray_add(&riemann_attrs, &riemann_attrs_num, key);
868 strarray_add(&riemann_attrs, &riemann_attrs_num, val);
869 DEBUG("write_riemann: got attr: %s => %s", key, val);
870 sfree(key);
871 sfree(val);
872 } else if (strcasecmp(child->key, "tag") == 0) {
873 char *tmp = NULL;
874 status = cf_util_get_string(child, &tmp);
875 if (status != 0)
876 continue;
878 strarray_add(&riemann_tags, &riemann_tags_num, tmp);
879 DEBUG("write_riemann plugin: Got tag: %s", tmp);
880 sfree(tmp);
881 } else {
882 WARNING("write_riemann plugin: Ignoring unknown "
883 "configuration option \"%s\" at top level.",
884 child->key);
885 }
886 }
887 return (0);
888 } /* }}} int wrr_config */
890 void module_register(void) {
891 plugin_register_complex_config("write_riemann", wrr_config);
892 }
894 /* vim: set sw=8 sts=8 ts=8 noet : */