1 /**
2 * collectd - src/write_sensu.c
3 * Copyright (C) 2015 Fabrice A. Marie
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a
6 * copy of this software and associated documentation files (the "Software"),
7 * to deal in the Software without restriction, including without limitation
8 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9 * and/or sell copies of the Software, and to permit persons to whom the
10 * Software is furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21 * DEALINGS IN THE SOFTWARE.
22 *
23 * Authors:
24 * Fabrice A. Marie <fabrice at kibinlabs.com>
25 */
27 #define _GNU_SOURCE
29 #include "collectd.h"
31 #include "plugin.h"
32 #include "common.h"
33 #include "configfile.h"
34 #include "utils_cache.h"
35 #include <arpa/inet.h>
36 #include <errno.h>
37 #include <netdb.h>
38 #include <inttypes.h>
39 #include <stddef.h>
41 #include <stdlib.h>
/* Defaults used by this plugin when a node has no "Host" / "Port" option:
 * sensu_connect() resolves these, and sensu_send_msg() prints them in its
 * error message. */
#define SENSU_HOST "localhost"
#define SENSU_PORT "3030"
45 #ifdef HAVE_ASPRINTF
46 #define my_asprintf asprintf
47 #define my_vasprintf vasprintf
48 #else
49 /*
50 * asprintf() is available from Solaris 10 update 11.
51 * For older versions, use asprintf() portable implementation from
52 * https://github.com/littlstar/asprintf.c/blob/master/
53 * copyright (c) 2014 joseph werle <joseph.werle@gmail.com> under MIT license.
54 */
/*
 * Portable vasprintf() replacement.
 *
 * Formats "fmt"/"args" into a freshly malloc()ed, NUL-terminated buffer and
 * stores it in *str; the caller owns the buffer and must free() it.
 *
 * Returns the number of characters written (excluding the terminator), or -1
 * if formatting or allocation fails. On failure nothing stays allocated and
 * *str is set to NULL.
 */
static int my_vasprintf(char **str, const char *fmt, va_list args) {
	va_list probe;
	char *buf;
	int size;
	int written;

	/* Measure on a copy so that "args" is still unconsumed for the real pass. */
	va_copy(probe, args);
	size = vsnprintf(NULL, 0, fmt, probe);
	va_end(probe);
	if (size < 0) {
		*str = NULL;
		return -1;
	}

	/* One extra byte for the terminating '\0'. */
	buf = malloc((size_t) size + 1);
	if (buf == NULL) {
		*str = NULL;
		return -1;
	}

	/* Bounded second pass: never writes past the buffer sized above. */
	written = vsnprintf(buf, (size_t) size + 1, fmt, args);
	if ((written < 0) || (written > size)) {
		free(buf);
		*str = NULL;
		return -1;
	}

	*str = buf;
	return written;
}

/*
 * Portable asprintf() replacement; see my_vasprintf() for the contract.
 */
static int my_asprintf(char **str, const char *fmt, ...) {
	int size;
	va_list args;

	/* init variadic arguments */
	va_start(args, fmt);
	/* format and get size */
	size = my_vasprintf(str, fmt, args);
	/* toss args */
	va_end(args);
	return size;
}
92 #endif
/* Growable array of heap-allocated strings. Entries are appended with
 * add_str_to_list() and everything is released by free_str_list(). */
struct str_list {
	int nb_strs;  /* number of valid entries in strs */
	char **strs;  /* the entries; initialised to NULL by the users in this file */
};
/* Per-<Node> state: configuration parsed by sensu_config_node() plus the
 * connection state used by sensu_connect() / sensu_send(). */
struct sensu_host {
	char *name;                  /* node name; used to build the callback name */
	char *event_service_prefix;  /* optional prefix for metric service names */
	struct str_list metric_handlers;        /* "MetricHandler" options */
	struct str_list notification_handlers;  /* "NotificationHandler" options */
/* Set in flags once getaddrinfo() succeeded and res holds the result. */
#define F_READY 0x01
	uint8_t flags;
	pthread_mutex_t lock;        /* taken by the write/notification callbacks */
	_Bool notifications;         /* register the notification callback */
	_Bool metrics;               /* register the write callback */
	_Bool store_rates;           /* fetch rates via uc_get_rate() when writing */
	_Bool always_append_ds;      /* always append the data source name */
	char *separator;             /* separator between name parts, default "/" */
	char *node;                  /* "Host" option; NULL means SENSU_HOST */
	char *service;               /* "Port" option; NULL means SENSU_PORT */
	int s;                       /* socket; sensu_connect()/sensu_close_socket() use -1 for "closed" */
	struct addrinfo *res;        /* cached getaddrinfo() result */
	int reference_count;         /* dropped by sensu_free(); host is freed at 0 */
};
/* JSON fragment built by sensu_config() from all "Tag" options via
 * build_json_str_list("tags", ...); appended to every generated message. */
static char *sensu_tags = NULL;
/* Flat array filled by sensu_config() from "Attribute" options: key at even
 * indices, the matching value at the following odd index. */
static char **sensu_attrs = NULL;
/* Number of strings in sensu_attrs (keys and values counted separately). */
static size_t sensu_attrs_num;
123 static int add_str_to_list(struct str_list *strs,
124 const char *str_to_add) /* {{{ */
125 {
126 char **old_strs_ptr = strs->strs;
127 char *newstr = strdup(str_to_add);
128 if (newstr == NULL) {
129 ERROR("write_sensu plugin: Unable to alloc memory");
130 return -1;
131 }
132 strs->strs = realloc(strs->strs, sizeof(char *) *(strs->nb_strs + 1));
133 if (strs->strs == NULL) {
134 strs->strs = old_strs_ptr;
135 free(newstr);
136 ERROR("write_sensu plugin: Unable to alloc memory");
137 return -1;
138 }
139 strs->strs[strs->nb_strs] = newstr;
140 strs->nb_strs++;
141 return 0;
142 }
143 /* }}} int add_str_to_list */
145 static void free_str_list(struct str_list *strs) /* {{{ */
146 {
147 for (int i=0; i<strs->nb_strs; i++)
148 free(strs->strs[i]);
149 free(strs->strs);
150 }
151 /* }}} void free_str_list */
/*
 * Opens a TCP connection to the configured node and stores the socket in
 * host->s.
 *
 * The address is resolved on first use (or after F_READY was cleared) and the
 * getaddrinfo() result is cached in host->res. Each returned address is tried
 * in order until one connects.
 *
 * Returns 0 with host->s connected, or -1 (host->s == -1) if resolution or
 * every connection attempt failed.
 */
static int sensu_connect(struct sensu_host *host) /* {{{ */
{
	// Resolve the target if we haven't done already
	if ((host->flags & F_READY) == 0) {
		char const *target_node = (host->node != NULL) ? host->node : SENSU_HOST;
		char const *target_service = (host->service != NULL) ? host->service : SENSU_PORT;
		struct addrinfo hints = {
			.ai_family = AF_INET,
			.ai_flags = AI_ADDRCONFIG,
			.ai_socktype = SOCK_STREAM
		};
		int gai_status;

		host->res = NULL;
		gai_status = getaddrinfo(target_node, target_service, &hints, &(host->res));
		if (gai_status != 0) {
			ERROR("write_sensu plugin: Unable to resolve host \"%s\": %s",
					target_node, gai_strerror(gai_status));
			return -1;
		}
		DEBUG("write_sensu plugin: successfully resolved host/port: %s/%s",
				target_node, target_service);
		host->flags |= F_READY;
	}

	host->s = -1;
	/* Walk the candidates until one of them yields a connected socket. */
	for (struct addrinfo *cand = host->res;
			(cand != NULL) && (host->s < 0);
			cand = cand->ai_next) {
		host->s = socket(cand->ai_family, cand->ai_socktype, cand->ai_protocol);
		if (host->s == -1)
			continue;

		// Set very low close() lingering
		struct linger linger_opt;
		linger_opt.l_onoff = 1;
		linger_opt.l_linger = 3;
		if (setsockopt(host->s, SOL_SOCKET, SO_LINGER, &linger_opt, sizeof linger_opt) != 0)
			WARNING("write_sensu plugin: failed to set socket close() lingering");

		set_sock_opts(host->s);

		if (connect(host->s, cand->ai_addr, cand->ai_addrlen) == 0) {
			DEBUG("write_sensu plugin: connected");
		} else {
			/* This address did not work; release the socket and move on. */
			close(host->s);
			host->s = -1;
		}
	}

	if (host->s < 0) {
		WARNING("write_sensu plugin: Unable to connect to sensu client");
		return -1;
	}
	return 0;
} /* }}} int sensu_connect */
218 static void sensu_close_socket(struct sensu_host *host) /* {{{ */
219 {
220 if (host->s != -1)
221 close(host->s);
222 host->s = -1;
224 } /* }}} void sensu_close_socket */
226 static char *build_json_str_list(const char *tag, struct str_list const *list) /* {{{ */
227 {
228 int res;
229 char *ret_str = NULL;
230 char *temp_str;
231 if (list->nb_strs == 0) {
232 ret_str = malloc(sizeof(char));
233 if (ret_str == NULL) {
234 ERROR("write_sensu plugin: Unable to alloc memory");
235 return NULL;
236 }
237 ret_str[0] = '\0';
238 }
240 res = my_asprintf(&temp_str, "\"%s\": [\"%s\"", tag, list->strs[0]);
241 if (res == -1) {
242 ERROR("write_sensu plugin: Unable to alloc memory");
243 free(ret_str);
244 return NULL;
245 }
246 for (int i=1; i<list->nb_strs; i++) {
247 res = my_asprintf(&ret_str, "%s, \"%s\"", temp_str, list->strs[i]);
248 free(temp_str);
249 if (res == -1) {
250 ERROR("write_sensu plugin: Unable to alloc memory");
251 return NULL;
252 }
253 temp_str = ret_str;
254 }
255 res = my_asprintf(&ret_str, "%s]", temp_str);
256 free(temp_str);
257 if (res == -1) {
258 ERROR("write_sensu plugin: Unable to alloc memory");
259 return NULL;
260 }
262 return ret_str;
263 } /* }}} char *build_json_str_list*/
/*
 * Builds "<hostname><sep><plugin>[-<plugin_instance>]<sep><type>[-<type_instance>]"
 * in ret, which has room for ret_len bytes.
 *
 * plugin and type must not be NULL (asserted). The instance parts are only
 * emitted when they are non-NULL and non-empty.
 *
 * Returns 0 on success (ret is NUL-terminated) or ENOBUFS as soon as a part
 * does not fit; in that case ret holds the parts copied so far and is not
 * terminated.
 */
static int sensu_format_name2(char *ret, int ret_len,
		const char *hostname,
		const char *plugin, const char *plugin_instance,
		const char *type, const char *type_instance,
		const char *separator)
{
	const char *parts[9];
	size_t parts_num = 0;
	char *cursor = ret;
	size_t remaining = (size_t) ret_len;

	assert (plugin != NULL);
	assert (type != NULL);

	/* Collect the pieces in output order first, then copy them in one loop. */
	parts[parts_num++] = hostname;
	parts[parts_num++] = separator;
	parts[parts_num++] = plugin;
	if ((plugin_instance != NULL) && (plugin_instance[0] != 0))
	{
		parts[parts_num++] = "-";
		parts[parts_num++] = plugin_instance;
	}
	parts[parts_num++] = separator;
	parts[parts_num++] = type;
	if ((type_instance != NULL) && (type_instance[0] != 0))
	{
		parts[parts_num++] = "-";
		parts[parts_num++] = type_instance;
	}

	for (size_t k = 0; k < parts_num; k++)
	{
		size_t part_len = strlen (parts[k]);
		/* ">=" keeps one byte in reserve for the terminator. */
		if (part_len >= remaining)
			return (ENOBUFS);
		memcpy (cursor, parts[k], part_len);
		cursor += part_len;
		remaining -= part_len;
	}

	assert (remaining > 0);
	cursor[0] = 0;

	return (0);
} /* int sensu_format_name2 */
/*
 * Replaces, in place, every '(', ')', ' ', '"', '\'' and '+' in orig_name
 * with '_'. Some plugins (e.g. ipmi) generate such characters in metric
 * names. The string length never changes.
 */
static void in_place_replace_sensu_name_reserved(char *orig_name) /* {{{ */
{
	static const char reserved[] = "() \"'+";

	for (char *cursor = orig_name; *cursor != '\0'; cursor++) {
		if (strchr(reserved, *cursor) != NULL)
			*cursor = '_';
	}
} /* }}} void in_place_replace_sensu_name_reserved */
326 static char *sensu_value_to_json(struct sensu_host const *host, /* {{{ */
327 data_set_t const *ds,
328 value_list_t const *vl, size_t index,
329 gauge_t const *rates,
330 int status)
331 {
332 char name_buffer[5 * DATA_MAX_NAME_LEN];
333 char service_buffer[6 * DATA_MAX_NAME_LEN];
334 char *ret_str;
335 char *temp_str;
336 char *value_str;
337 int res;
338 // First part of the JSON string
339 const char *part1 = "{\"name\": \"collectd\", \"type\": \"metric\"";
341 char *handlers_str = build_json_str_list("handlers", &(host->metric_handlers));
342 if (handlers_str == NULL) {
343 ERROR("write_sensu plugin: Unable to alloc memory");
344 return NULL;
345 }
347 // incorporate the handlers
348 if (strlen(handlers_str) == 0) {
349 free(handlers_str);
350 ret_str = strdup(part1);
351 if (ret_str == NULL) {
352 ERROR("write_sensu plugin: Unable to alloc memory");
353 return NULL;
354 }
355 }
356 else {
357 res = my_asprintf(&ret_str, "%s, %s", part1, handlers_str);
358 free(handlers_str);
359 if (res == -1) {
360 ERROR("write_sensu plugin: Unable to alloc memory");
361 return NULL;
362 }
363 }
365 // incorporate the plugin name information
366 res = my_asprintf(&temp_str, "%s, \"collectd_plugin\": \"%s\"", ret_str, vl->plugin);
367 free(ret_str);
368 if (res == -1) {
369 ERROR("write_sensu plugin: Unable to alloc memory");
370 return NULL;
371 }
372 ret_str = temp_str;
374 // incorporate the plugin type
375 res = my_asprintf(&temp_str, "%s, \"collectd_plugin_type\": \"%s\"", ret_str, vl->type);
376 free(ret_str);
377 if (res == -1) {
378 ERROR("write_sensu plugin: Unable to alloc memory");
379 return NULL;
380 }
381 ret_str = temp_str;
383 // incorporate the plugin instance if any
384 if (vl->plugin_instance[0] != 0) {
385 res = my_asprintf(&temp_str, "%s, \"collectd_plugin_instance\": \"%s\"", ret_str, vl->plugin_instance);
386 free(ret_str);
387 if (res == -1) {
388 ERROR("write_sensu plugin: Unable to alloc memory");
389 return NULL;
390 }
391 ret_str = temp_str;
392 }
394 // incorporate the plugin type instance if any
395 if (vl->type_instance[0] != 0) {
396 res = my_asprintf(&temp_str, "%s, \"collectd_plugin_type_instance\": \"%s\"", ret_str, vl->type_instance);
397 free(ret_str);
398 if (res == -1) {
399 ERROR("write_sensu plugin: Unable to alloc memory");
400 return NULL;
401 }
402 ret_str = temp_str;
403 }
405 // incorporate the data source type
406 if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL)) {
407 char ds_type[DATA_MAX_NAME_LEN];
408 ssnprintf (ds_type, sizeof (ds_type), "%s:rate", DS_TYPE_TO_STRING(ds->ds[index].type));
409 res = my_asprintf(&temp_str, "%s, \"collectd_data_source_type\": \"%s\"", ret_str, ds_type);
410 free(ret_str);
411 if (res == -1) {
412 ERROR("write_sensu plugin: Unable to alloc memory");
413 return NULL;
414 }
415 ret_str = temp_str;
416 } else {
417 res = my_asprintf(&temp_str, "%s, \"collectd_data_source_type\": \"%s\"", ret_str, DS_TYPE_TO_STRING(ds->ds[index].type));
418 free(ret_str);
419 if (res == -1) {
420 ERROR("write_sensu plugin: Unable to alloc memory");
421 return NULL;
422 }
423 ret_str = temp_str;
424 }
426 // incorporate the data source name
427 res = my_asprintf(&temp_str, "%s, \"collectd_data_source_name\": \"%s\"", ret_str, ds->ds[index].name);
428 free(ret_str);
429 if (res == -1) {
430 ERROR("write_sensu plugin: Unable to alloc memory");
431 return NULL;
432 }
433 ret_str = temp_str;
435 // incorporate the data source index
436 {
437 char ds_index[DATA_MAX_NAME_LEN];
438 ssnprintf (ds_index, sizeof (ds_index), "%zu", index);
439 res = my_asprintf(&temp_str, "%s, \"collectd_data_source_index\": %s", ret_str, ds_index);
440 free(ret_str);
441 if (res == -1) {
442 ERROR("write_sensu plugin: Unable to alloc memory");
443 return NULL;
444 }
445 ret_str = temp_str;
446 }
448 // add key value attributes from config if any
449 for (size_t i = 0; i < sensu_attrs_num; i += 2) {
450 res = my_asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, sensu_attrs[i], sensu_attrs[i+1]);
451 free(ret_str);
452 if (res == -1) {
453 ERROR("write_sensu plugin: Unable to alloc memory");
454 return NULL;
455 }
456 ret_str = temp_str;
457 }
459 // incorporate sensu tags from config if any
460 if ((sensu_tags != NULL) && (strlen(sensu_tags) != 0)) {
461 res = my_asprintf(&temp_str, "%s, %s", ret_str, sensu_tags);
462 free(ret_str);
463 if (res == -1) {
464 ERROR("write_sensu plugin: Unable to alloc memory");
465 return NULL;
466 }
467 ret_str = temp_str;
468 }
470 // calculate the value and set to a string
471 if (ds->ds[index].type == DS_TYPE_GAUGE) {
472 res = my_asprintf(&value_str, GAUGE_FORMAT, vl->values[index].gauge);
473 if (res == -1) {
474 free(ret_str);
475 ERROR("write_sensu plugin: Unable to alloc memory");
476 return NULL;
477 }
478 } else if (rates != NULL) {
479 res = my_asprintf(&value_str, GAUGE_FORMAT, rates[index]);
480 if (res == -1) {
481 free(ret_str);
482 ERROR("write_sensu plugin: Unable to alloc memory");
483 return NULL;
484 }
485 } else {
486 if (ds->ds[index].type == DS_TYPE_DERIVE) {
487 res = my_asprintf(&value_str, "%"PRIi64, vl->values[index].derive);
488 if (res == -1) {
489 free(ret_str);
490 ERROR("write_sensu plugin: Unable to alloc memory");
491 return NULL;
492 }
493 }
494 else if (ds->ds[index].type == DS_TYPE_ABSOLUTE) {
495 res = my_asprintf(&value_str, "%"PRIu64, vl->values[index].absolute);
496 if (res == -1) {
497 free(ret_str);
498 ERROR("write_sensu plugin: Unable to alloc memory");
499 return NULL;
500 }
501 }
502 else {
503 res = my_asprintf(&value_str, "%llu", vl->values[index].counter);
504 if (res == -1) {
505 free(ret_str);
506 ERROR("write_sensu plugin: Unable to alloc memory");
507 return NULL;
508 }
509 }
510 }
512 // Generate the full service name
513 sensu_format_name2(name_buffer, sizeof(name_buffer),
514 vl->host, vl->plugin, vl->plugin_instance,
515 vl->type, vl->type_instance, host->separator);
516 if (host->always_append_ds || (ds->ds_num > 1)) {
517 if (host->event_service_prefix == NULL)
518 ssnprintf(service_buffer, sizeof(service_buffer), "%s.%s",
519 name_buffer, ds->ds[index].name);
520 else
521 ssnprintf(service_buffer, sizeof(service_buffer), "%s%s.%s",
522 host->event_service_prefix, name_buffer, ds->ds[index].name);
523 } else {
524 if (host->event_service_prefix == NULL)
525 sstrncpy(service_buffer, name_buffer, sizeof(service_buffer));
526 else
527 ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
528 host->event_service_prefix, name_buffer);
529 }
531 // Replace collectd sensor name reserved characters so that time series DB is happy
532 in_place_replace_sensu_name_reserved(service_buffer);
534 // finalize the buffer by setting the output and closing curly bracket
535 res = my_asprintf(&temp_str, "%s, \"output\": \"%s %s %lld\"}\n",ret_str, service_buffer, value_str, (long long)CDTIME_T_TO_TIME_T(vl->time));
536 free(ret_str);
537 free(value_str);
538 if (res == -1) {
539 ERROR("write_sensu plugin: Unable to alloc memory");
540 return NULL;
541 }
542 ret_str = temp_str;
544 DEBUG("write_sensu plugin: Successfully created json for metric: "
545 "host = \"%s\", service = \"%s\"",
546 vl->host, service_buffer);
547 return ret_str;
548 } /* }}} char *sensu_value_to_json */
550 /*
551 * Uses replace_str2() implementation from
552 * http://creativeandcritical.net/str-replace-c/
553 * copyright (c) Laird Shaw, under public domain.
554 */
/*
 * Returns a newly malloc()ed copy of str in which every non-overlapping
 * occurrence of old (scanning left to right) is replaced by new.
 *
 * An empty old matches nothing, so the result is then a plain copy of str.
 * Returns NULL if memory cannot be allocated or the result size would
 * overflow size_t. The caller must free() the result.
 */
static char *replace_str(const char *str, const char *old, /* {{{ */
		const char *new)
{
	const char *p, *q;
	size_t oldlen = strlen(old);
	size_t newlen = strlen(new);
	size_t srclen = strlen(str);
	size_t count = 0;
	size_t retlen;
	char *ret, *r;

	/* First pass: count the occurrences. The empty pattern is skipped
	 * because strstr() would match it forever at the same position. */
	if (oldlen > 0) {
		for (p = str; (q = strstr(p, old)) != NULL; p = q + oldlen)
			count++;
	}

	/* Guard the size computation when the string grows. */
	if ((newlen > oldlen)
			&& (count > (SIZE_MAX - srclen - 1) / (newlen - oldlen)))
		return NULL;
	retlen = srclen - count * oldlen + count * newlen;

	ret = malloc(retlen + 1);
	if (ret == NULL)
		return NULL;

	/* Second pass: copy the text between matches and the replacements.
	 * The first pass found exactly "count" matches, so strstr() cannot
	 * return NULL here. */
	r = ret;
	p = str;
	for (size_t i = 0; i < count; i++) {
		q = strstr(p, old);
		size_t l = (size_t) (q - p);
		memcpy(r, p, l);
		r += l;
		memcpy(r, new, newlen);
		r += newlen;
		p = q + oldlen;
	}
	/* Copy the tail including its terminating '\0'. */
	memcpy(r, p, strlen(p) + 1);

	return ret;
} /* }}} char *replace_str */
/*
 * Returns a malloc()ed copy of message that is safe to embed in a JSON
 * string: backslash becomes \\, double quote becomes \" and newline becomes
 * the two-character escape \n.
 *
 * Backslashes are handled first so that the backslashes introduced by the
 * later steps are not escaped again.
 *
 * Returns NULL (after logging) on allocation failure; the caller must free()
 * the result.
 */
static char *replace_json_reserved(const char *message) /* {{{ */
{
	char *msg = replace_str(message, "\\", "\\\\");
	if (msg == NULL) {
		ERROR("write_sensu plugin: Unable to alloc memory");
		return NULL;
	}
	char *tmp = replace_str(msg, "\"", "\\\"");
	free(msg);
	if (tmp == NULL) {
		ERROR("write_sensu plugin: Unable to alloc memory");
		return NULL;
	}
	/* A raw line feed is not allowed inside a JSON string and backslash
	 * followed by a line feed is not a valid escape; emit "\n" instead. */
	msg = replace_str(tmp, "\n", "\\n");
	free(tmp);
	if (msg == NULL) {
		ERROR("write_sensu plugin: Unable to alloc memory");
		return NULL;
	}
	return msg;
} /* }}} char *replace_json_reserved */
635 static char *sensu_notification_to_json(struct sensu_host *host, /* {{{ */
636 notification_t const *n)
637 {
638 char service_buffer[6 * DATA_MAX_NAME_LEN];
639 char const *severity;
640 char *ret_str;
641 char *temp_str;
642 int status;
643 size_t i;
644 int res;
645 // add the severity/status
646 switch (n->severity) {
647 case NOTIF_OKAY:
648 severity = "OK";
649 status = 0;
650 break;
651 case NOTIF_WARNING:
652 severity = "WARNING";
653 status = 1;
654 break;
655 case NOTIF_FAILURE:
656 severity = "CRITICAL";
657 status = 2;
658 break;
659 default:
660 severity = "UNKNOWN";
661 status = 3;
662 }
663 res = my_asprintf(&temp_str, "{\"status\": %d", status);
664 if (res == -1) {
665 ERROR("write_sensu plugin: Unable to alloc memory");
666 return NULL;
667 }
668 ret_str = temp_str;
670 // incorporate the timestamp
671 res = my_asprintf(&temp_str, "%s, \"timestamp\": %lld", ret_str, (long long)CDTIME_T_TO_TIME_T(n->time));
672 free(ret_str);
673 if (res == -1) {
674 ERROR("write_sensu plugin: Unable to alloc memory");
675 return NULL;
676 }
677 ret_str = temp_str;
679 char *handlers_str = build_json_str_list("handlers", &(host->notification_handlers));
680 if (handlers_str == NULL) {
681 ERROR("write_sensu plugin: Unable to alloc memory");
682 free(ret_str);
683 return NULL;
684 }
685 // incorporate the handlers
686 if (strlen(handlers_str) != 0) {
687 res = my_asprintf(&temp_str, "%s, %s", ret_str, handlers_str);
688 free(ret_str);
689 free(handlers_str);
690 if (res == -1) {
691 ERROR("write_sensu plugin: Unable to alloc memory");
692 return NULL;
693 }
694 ret_str = temp_str;
695 } else {
696 free(handlers_str);
697 }
699 // incorporate the plugin name information if any
700 if (n->plugin[0] != 0) {
701 res = my_asprintf(&temp_str, "%s, \"collectd_plugin\": \"%s\"", ret_str, n->plugin);
702 free(ret_str);
703 if (res == -1) {
704 ERROR("write_sensu plugin: Unable to alloc memory");
705 return NULL;
706 }
707 ret_str = temp_str;
708 }
710 // incorporate the plugin type if any
711 if (n->type[0] != 0) {
712 res = my_asprintf(&temp_str, "%s, \"collectd_plugin_type\": \"%s\"", ret_str, n->type);
713 free(ret_str);
714 if (res == -1) {
715 ERROR("write_sensu plugin: Unable to alloc memory");
716 return NULL;
717 }
718 ret_str = temp_str;
719 }
721 // incorporate the plugin instance if any
722 if (n->plugin_instance[0] != 0) {
723 res = my_asprintf(&temp_str, "%s, \"collectd_plugin_instance\": \"%s\"", ret_str, n->plugin_instance);
724 free(ret_str);
725 if (res == -1) {
726 ERROR("write_sensu plugin: Unable to alloc memory");
727 return NULL;
728 }
729 ret_str = temp_str;
730 }
732 // incorporate the plugin type instance if any
733 if (n->type_instance[0] != 0) {
734 res = my_asprintf(&temp_str, "%s, \"collectd_plugin_type_instance\": \"%s\"", ret_str, n->type_instance);
735 free(ret_str);
736 if (res == -1) {
737 ERROR("write_sensu plugin: Unable to alloc memory");
738 return NULL;
739 }
740 ret_str = temp_str;
741 }
743 // add key value attributes from config if any
744 for (i = 0; i < sensu_attrs_num; i += 2) {
745 res = my_asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, sensu_attrs[i], sensu_attrs[i+1]);
746 free(ret_str);
747 if (res == -1) {
748 ERROR("write_sensu plugin: Unable to alloc memory");
749 return NULL;
750 }
751 ret_str = temp_str;
752 }
754 // incorporate sensu tags from config if any
755 if ((sensu_tags != NULL) && (strlen(sensu_tags) != 0)) {
756 res = my_asprintf(&temp_str, "%s, %s", ret_str, sensu_tags);
757 free(ret_str);
758 if (res == -1) {
759 ERROR("write_sensu plugin: Unable to alloc memory");
760 return NULL;
761 }
762 ret_str = temp_str;
763 }
765 // incorporate the service name
766 sensu_format_name2(service_buffer, sizeof(service_buffer),
767 /* host */ "", n->plugin, n->plugin_instance,
768 n->type, n->type_instance, host->separator);
769 // replace sensu event name chars that are considered illegal
770 in_place_replace_sensu_name_reserved(service_buffer);
771 res = my_asprintf(&temp_str, "%s, \"name\": \"%s\"", ret_str, &service_buffer[1]);
772 free(ret_str);
773 if (res == -1) {
774 ERROR("write_sensu plugin: Unable to alloc memory");
775 return NULL;
776 }
777 ret_str = temp_str;
779 // incorporate the check output
780 if (n->message[0] != 0) {
781 char *msg = replace_json_reserved(n->message);
782 if (msg == NULL) {
783 ERROR("write_sensu plugin: Unable to alloc memory");
784 free(ret_str);
785 return NULL;
786 }
787 res = my_asprintf(&temp_str, "%s, \"output\": \"%s - %s\"", ret_str, severity, msg);
788 free(ret_str);
789 free(msg);
790 if (res == -1) {
791 ERROR("write_sensu plugin: Unable to alloc memory");
792 return NULL;
793 }
794 ret_str = temp_str;
795 }
797 // Pull in values from threshold and add extra attributes
798 for (notification_meta_t *meta = n->meta; meta != NULL; meta = meta->next) {
799 if (strcasecmp("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE) {
800 res = my_asprintf(&temp_str, "%s, \"current_value\": \"%.8f\"", ret_str, meta->nm_value.nm_double);
801 free(ret_str);
802 if (res == -1) {
803 ERROR("write_sensu plugin: Unable to alloc memory");
804 return NULL;
805 }
806 ret_str = temp_str;
807 }
808 if (meta->type == NM_TYPE_STRING) {
809 res = my_asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, meta->name, meta->nm_value.nm_string);
810 free(ret_str);
811 if (res == -1) {
812 ERROR("write_sensu plugin: Unable to alloc memory");
813 return NULL;
814 }
815 ret_str = temp_str;
816 }
817 }
819 // close the curly bracket
820 res = my_asprintf(&temp_str, "%s}\n", ret_str);
821 free(ret_str);
822 if (res == -1) {
823 ERROR("write_sensu plugin: Unable to alloc memory");
824 return NULL;
825 }
826 ret_str = temp_str;
828 DEBUG("write_sensu plugin: Successfully created JSON for notification: "
829 "host = \"%s\", service = \"%s\", state = \"%s\"",
830 n->host, service_buffer, severity);
831 return ret_str;
832 } /* }}} char *sensu_notification_to_json */
834 static int sensu_send_msg(struct sensu_host *host, const char *msg) /* {{{ */
835 {
836 int status = 0;
837 size_t buffer_len;
839 status = sensu_connect(host);
840 if (status != 0)
841 return status;
843 buffer_len = strlen(msg);
845 status = (int) swrite(host->s, msg, buffer_len);
846 sensu_close_socket(host);
848 if (status != 0) {
849 char errbuf[1024];
850 ERROR("write_sensu plugin: Sending to Sensu at %s:%s failed: %s",
851 (host->node != NULL) ? host->node : SENSU_HOST,
852 (host->service != NULL) ? host->service : SENSU_PORT,
853 sstrerror(errno, errbuf, sizeof(errbuf)));
854 return -1;
855 }
857 return 0;
858 } /* }}} int sensu_send_msg */
861 static int sensu_send(struct sensu_host *host, char const *msg) /* {{{ */
862 {
863 int status = 0;
865 status = sensu_send_msg(host, msg);
866 if (status != 0) {
867 host->flags &= ~F_READY;
868 if (host->res != NULL) {
869 freeaddrinfo(host->res);
870 host->res = NULL;
871 }
872 return status;
873 }
875 return 0;
876 } /* }}} int sensu_send */
879 static int sensu_write(const data_set_t *ds, /* {{{ */
880 const value_list_t *vl,
881 user_data_t *ud)
882 {
883 int status = 0;
884 int statuses[vl->values_len];
885 struct sensu_host *host = ud->data;
886 gauge_t *rates = NULL;
887 char *msg;
889 pthread_mutex_lock(&host->lock);
890 memset(statuses, 0, vl->values_len * sizeof(*statuses));
892 if (host->store_rates) {
893 rates = uc_get_rate(ds, vl);
894 if (rates == NULL) {
895 ERROR("write_sensu plugin: uc_get_rate failed.");
896 pthread_mutex_unlock(&host->lock);
897 return -1;
898 }
899 }
900 for (size_t i = 0; i < vl->values_len; i++) {
901 msg = sensu_value_to_json(host, ds, vl, (int) i, rates, statuses[i]);
902 if (msg == NULL) {
903 sfree(rates);
904 pthread_mutex_unlock(&host->lock);
905 return -1;
906 }
907 status = sensu_send(host, msg);
908 free(msg);
909 if (status != 0) {
910 ERROR("write_sensu plugin: sensu_send failed with status %i", status);
911 pthread_mutex_unlock(&host->lock);
912 sfree(rates);
913 return status;
914 }
915 }
916 sfree(rates);
917 pthread_mutex_unlock(&host->lock);
918 return status;
919 } /* }}} int sensu_write */
921 static int sensu_notification(const notification_t *n, user_data_t *ud) /* {{{ */
922 {
923 int status;
924 struct sensu_host *host = ud->data;
925 char *msg;
927 pthread_mutex_lock(&host->lock);
929 msg = sensu_notification_to_json(host, n);
930 if (msg == NULL) {
931 pthread_mutex_unlock(&host->lock);
932 return -1;
933 }
935 status = sensu_send(host, msg);
936 free(msg);
937 if (status != 0)
938 ERROR("write_sensu plugin: sensu_send failed with status %i", status);
939 pthread_mutex_unlock(&host->lock);
941 return status;
942 } /* }}} int sensu_notification */
944 static void sensu_free(void *p) /* {{{ */
945 {
946 struct sensu_host *host = p;
948 if (host == NULL)
949 return;
951 pthread_mutex_lock(&host->lock);
953 host->reference_count--;
954 if (host->reference_count > 0) {
955 pthread_mutex_unlock(&host->lock);
956 return;
957 }
959 sensu_close_socket(host);
960 if (host->res != NULL) {
961 freeaddrinfo(host->res);
962 host->res = NULL;
963 }
964 sfree(host->service);
965 sfree(host->event_service_prefix);
966 sfree(host->name);
967 sfree(host->node);
968 sfree(host->separator);
969 free_str_list(&(host->metric_handlers));
970 free_str_list(&(host->notification_handlers));
971 pthread_mutex_destroy(&host->lock);
972 sfree(host);
973 } /* }}} void sensu_free */
976 static int sensu_config_node(oconfig_item_t *ci) /* {{{ */
977 {
978 struct sensu_host *host = NULL;
979 int status = 0;
980 oconfig_item_t *child;
981 char callback_name[DATA_MAX_NAME_LEN];
982 user_data_t ud;
984 if ((host = calloc(1, sizeof(*host))) == NULL) {
985 ERROR("write_sensu plugin: calloc failed.");
986 return ENOMEM;
987 }
988 pthread_mutex_init(&host->lock, NULL);
989 host->reference_count = 1;
990 host->node = NULL;
991 host->service = NULL;
992 host->notifications = 0;
993 host->metrics = 0;
994 host->store_rates = 1;
995 host->always_append_ds = 0;
996 host->metric_handlers.nb_strs = 0;
997 host->metric_handlers.strs = NULL;
998 host->notification_handlers.nb_strs = 0;
999 host->notification_handlers.strs = NULL;
1000 host->separator = strdup("/");
1001 if (host->separator == NULL) {
1002 ERROR("write_sensu plugin: Unable to alloc memory");
1003 sensu_free(host);
1004 return -1;
1005 }
1007 status = cf_util_get_string(ci, &host->name);
1008 if (status != 0) {
1009 WARNING("write_sensu plugin: Required host name is missing.");
1010 sensu_free(host);
1011 return -1;
1012 }
1014 for (int i = 0; i < ci->children_num; i++) {
1015 child = &ci->children[i];
1016 status = 0;
1018 if (strcasecmp("Host", child->key) == 0) {
1019 status = cf_util_get_string(child, &host->node);
1020 if (status != 0)
1021 break;
1022 } else if (strcasecmp("Notifications", child->key) == 0) {
1023 status = cf_util_get_boolean(child, &host->notifications);
1024 if (status != 0)
1025 break;
1026 } else if (strcasecmp("Metrics", child->key) == 0) {
1027 status = cf_util_get_boolean(child, &host->metrics);
1028 if (status != 0)
1029 break;
1030 } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
1031 status = cf_util_get_string(child, &host->event_service_prefix);
1032 if (status != 0)
1033 break;
1034 } else if (strcasecmp("Separator", child->key) == 0) {
1035 status = cf_util_get_string(child, &host->separator);
1036 if (status != 0)
1037 break;
1038 } else if (strcasecmp("MetricHandler", child->key) == 0) {
1039 char *temp_str = NULL;
1040 status = cf_util_get_string(child, &temp_str);
1041 if (status != 0)
1042 break;
1043 status = add_str_to_list(&(host->metric_handlers), temp_str);
1044 free(temp_str);
1045 if (status != 0)
1046 break;
1047 } else if (strcasecmp("NotificationHandler", child->key) == 0) {
1048 char *temp_str = NULL;
1049 status = cf_util_get_string(child, &temp_str);
1050 if (status != 0)
1051 break;
1052 status = add_str_to_list(&(host->notification_handlers), temp_str);
1053 free(temp_str);
1054 if (status != 0)
1055 break;
1056 } else if (strcasecmp("Port", child->key) == 0) {
1057 status = cf_util_get_service(child, &host->service);
1058 if (status != 0) {
1059 ERROR("write_sensu plugin: Invalid argument "
1060 "configured for the \"Port\" "
1061 "option.");
1062 break;
1063 }
1064 } else if (strcasecmp("StoreRates", child->key) == 0) {
1065 status = cf_util_get_boolean(child, &host->store_rates);
1066 if (status != 0)
1067 break;
1068 } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
1069 status = cf_util_get_boolean(child,
1070 &host->always_append_ds);
1071 if (status != 0)
1072 break;
1073 } else {
1074 WARNING("write_sensu plugin: ignoring unknown config "
1075 "option: \"%s\"", child->key);
1076 }
1077 }
1078 if (status != 0) {
1079 sensu_free(host);
1080 return status;
1081 }
1083 if (host->metrics && (host->metric_handlers.nb_strs == 0)) {
1084 sensu_free(host);
1085 WARNING("write_sensu plugin: metrics enabled but no MetricHandler defined. Giving up.");
1086 return -1;
1087 }
1089 if (host->notifications && (host->notification_handlers.nb_strs == 0)) {
1090 sensu_free(host);
1091 WARNING("write_sensu plugin: notifications enabled but no NotificationHandler defined. Giving up.");
1092 return -1;
1093 }
1095 if ((host->notification_handlers.nb_strs > 0) && (host->notifications == 0)) {
1096 WARNING("write_sensu plugin: NotificationHandler given so forcing notifications to be enabled");
1097 host->notifications = 1;
1098 }
1100 if ((host->metric_handlers.nb_strs > 0) && (host->metrics == 0)) {
1101 WARNING("write_sensu plugin: MetricHandler given so forcing metrics to be enabled");
1102 host->metrics = 1;
1103 }
1105 if (!(host->notifications || host->metrics)) {
1106 WARNING("write_sensu plugin: neither metrics nor notifications enabled. Giving up.");
1107 sensu_free(host);
1108 return -1;
1109 }
1111 ssnprintf(callback_name, sizeof(callback_name), "write_sensu/%s", host->name);
1112 ud.data = host;
1113 ud.free_func = sensu_free;
1115 pthread_mutex_lock(&host->lock);
1117 if (host->metrics) {
1118 status = plugin_register_write(callback_name, sensu_write, &ud);
1119 if (status != 0)
1120 WARNING("write_sensu plugin: plugin_register_write (\"%s\") "
1121 "failed with status %i.",
1122 callback_name, status);
1123 else /* success */
1124 host->reference_count++;
1125 }
1127 if (host->notifications) {
1128 status = plugin_register_notification(callback_name, sensu_notification, &ud);
1129 if (status != 0)
1130 WARNING("write_sensu plugin: plugin_register_notification (\"%s\") "
1131 "failed with status %i.",
1132 callback_name, status);
1133 else
1134 host->reference_count++;
1135 }
1137 if (host->reference_count <= 1) {
1138 /* Both callbacks failed => free memory.
1139 * We need to unlock here, because sensu_free() will lock.
1140 * This is not a race condition, because we're the only one
1141 * holding a reference. */
1142 pthread_mutex_unlock(&host->lock);
1143 sensu_free(host);
1144 return -1;
1145 }
1147 host->reference_count--;
1148 pthread_mutex_unlock(&host->lock);
1150 return status;
1151 } /* }}} int sensu_config_node */
1153 static int sensu_config(oconfig_item_t *ci) /* {{{ */
1154 {
1155 oconfig_item_t *child;
1156 int status;
1157 struct str_list sensu_tags_arr;
1159 sensu_tags_arr.nb_strs = 0;
1160 sensu_tags_arr.strs = NULL;
1162 for (int i = 0; i < ci->children_num; i++) {
1163 child = &ci->children[i];
1165 if (strcasecmp("Node", child->key) == 0) {
1166 sensu_config_node(child);
1167 } else if (strcasecmp(child->key, "attribute") == 0) {
1168 if (child->values_num != 2) {
1169 WARNING("sensu attributes need both a key and a value.");
1170 continue;
1171 }
1172 if (child->values[0].type != OCONFIG_TYPE_STRING ||
1173 child->values[1].type != OCONFIG_TYPE_STRING) {
1174 WARNING("sensu attribute needs string arguments.");
1175 continue;
1176 }
1178 strarray_add(&sensu_attrs, &sensu_attrs_num, child->values[0].value.string);
1179 strarray_add(&sensu_attrs, &sensu_attrs_num, child->values[1].value.string);
1181 DEBUG("write_sensu plugin: New attribute: %s => %s",
1182 child->values[0].value.string,
1183 child->values[1].value.string);
1184 } else if (strcasecmp(child->key, "tag") == 0) {
1185 char *tmp = NULL;
1186 status = cf_util_get_string(child, &tmp);
1187 if (status != 0)
1188 continue;
1190 status = add_str_to_list(&sensu_tags_arr, tmp);
1191 sfree(tmp);
1192 if (status != 0)
1193 continue;
1194 DEBUG("write_sensu plugin: Got tag: %s", tmp);
1195 } else {
1196 WARNING("write_sensu plugin: Ignoring unknown "
1197 "configuration option \"%s\" at top level.",
1198 child->key);
1199 }
1200 }
1201 if (sensu_tags_arr.nb_strs > 0) {
1202 sfree (sensu_tags);
1203 sensu_tags = build_json_str_list("tags", &sensu_tags_arr);
1204 free_str_list(&sensu_tags_arr);
1205 if (sensu_tags == NULL) {
1206 ERROR("write_sensu plugin: Unable to alloc memory");
1207 return -1;
1208 }
1209 }
1210 return 0;
1211 } /* }}} int sensu_config */
/* Plugin entry point: registers sensu_config() as the complex-config
 * callback for the "write_sensu" plugin block. The write and notification
 * callbacks are registered later, per node, by sensu_config_node(). */
void module_register(void)
{
	plugin_register_complex_config("write_sensu", sensu_config);
}
1218 /* vim: set sw=8 sts=8 ts=8 noet : */