1 /**
2 * collectd - src/write_sensu.c
3 * Copyright (C) 2015 Fabrice A. Marie
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a
6 * copy of this software and associated documentation files (the "Software"),
7 * to deal in the Software without restriction, including without limitation
8 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9 * and/or sell copies of the Software, and to permit persons to whom the
10 * Software is furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21 * DEALINGS IN THE SOFTWARE.
22 *
23 * Authors:
24 * Fabrice A. Marie <fabrice at kibinlabs.com>
25 */
27 #include "collectd.h"
28 #include "plugin.h"
29 #include "common.h"
30 #include "configfile.h"
31 #include "utils_cache.h"
32 #include <sys/socket.h>
33 #include <arpa/inet.h>
34 #include <errno.h>
35 #include <netdb.h>
36 #include <inttypes.h>
37 #include <pthread.h>
38 #include <stddef.h>
40 #include <stdlib.h>
41 #ifndef HAVE_ASPRINTF
42 /*
43 * Uses asprintf() portable implementation from
44 * https://github.com/littlstar/asprintf.c/blob/master/
45 * copyright (c) 2014 joseph werle <joseph.werle@gmail.com> under MIT license.
46 */
47 #include <stdio.h>
48 #include <stdarg.h>
/*
 * Portable vasprintf(): format into a freshly malloc()ed buffer.
 *
 * On success the buffer is stored in *str (the caller must free() it) and
 * the number of characters written, excluding the terminating NUL, is
 * returned. On failure -1 is returned and *str is set to NULL, so callers
 * never see an indeterminate pointer and nothing is leaked.
 */
int vasprintf(char **str, const char *fmt, va_list args) {
	int size;
	int written;
	char *buf;
	va_list tmpa;

	*str = NULL;

	/* Measure on a copy: a va_list cannot be traversed twice. */
	va_copy(tmpa, args);
	size = vsnprintf(NULL, 0, fmt, tmpa);
	va_end(tmpa);
	if (size < 0)
		return -1;

	/* One extra byte for the terminating NUL. */
	buf = malloc((size_t) size + 1);
	if (buf == NULL)
		return -1;

	/* Bounded write: never trust the second pass to produce the same length. */
	written = vsnprintf(buf, (size_t) size + 1, fmt, args);
	if ((written < 0) || (written > size)) {
		free(buf);
		return -1;
	}

	*str = buf;
	return written;
}

/*
 * Portable asprintf(): variadic front end for vasprintf() above.
 * Same return value and ownership rules.
 */
int asprintf(char **str, const char *fmt, ...) {
	int size;
	va_list args;

	va_start(args, fmt);
	size = vasprintf(str, fmt, args);
	va_end(args);

	return size;
}
86 #endif
/* Fallback node / service passed to getaddrinfo() when the host has no
 * node or service configured. */
#define SENSU_HOST "localhost"
#define SENSU_PORT "3030"
/* Growable list of heap-allocated strings; filled by add_str_to_list()
 * and released by free_str_list(). */
struct str_list {
	int nb_strs;  /* number of valid entries in strs */
	char **strs;  /* array of nb_strs strings, each owned by the list */
};
/* State kept for one configured <Node>: options, cached address lookup
 * and the socket used for sending. */
struct sensu_host {
	char *name;                 /* string argument of the node block; used in the callback name */
	char *event_service_prefix; /* "EventServicePrefix" option; prepended to metric service names, may be NULL */
	struct str_list metric_handlers;       /* values of the "MetricHandler" options */
	struct str_list notification_handlers; /* values of the "NotificationHandler" options */
#define F_READY 0x01 /* flag bit: res holds a successful getaddrinfo() result */
	uint8_t flags;
	pthread_mutex_t lock;       /* taken by the write and notification callbacks and by sensu_free() */
	_Bool notifications;        /* "Notifications" option */
	_Bool metrics;              /* "Metrics" option */
	_Bool store_rates;          /* "StoreRates" option; when set, rates are fetched with uc_get_rate() */
	_Bool always_append_ds;     /* "AlwaysAppendDS" option; append the data source name to the service name */
	char *separator;            /* "Separator" option; defaults to "/" */
	char *node;                 /* "Host" option; SENSU_HOST is used when NULL */
	char *service;              /* "Port" option; SENSU_PORT is used when NULL */
	int s;                      /* socket descriptor, -1 when not open */
	struct addrinfo *res;       /* getaddrinfo() result, freed after a failed send */
	int reference_count;        /* decremented by sensu_free(); the host is released at zero */
};
/* JSON fragment built from the "tag" options by sensu_config(), or an empty
 * string when no tags were configured. */
static char *sensu_tags;
/* Flattened "attribute" options: keys at even indexes, values at odd ones. */
static char **sensu_attrs;
static size_t sensu_attrs_num;
/* Message logged whenever an allocation fails. */
static const char *alloc_err ="write_sensu plugin: Unable to alloc memory";
121 static int add_str_to_list(struct str_list *strs,
122 const char *str_to_add) /* {{{ */
123 {
124 char **old_strs_ptr = strs->strs;
125 char *newstr = strdup(str_to_add);
126 if (newstr == NULL) {
127 ERROR(alloc_err);
128 return -1;
129 }
130 strs->strs = realloc(strs->strs, sizeof(char *) *(strs->nb_strs + 1));
131 if (strs->strs == NULL) {
132 strs->strs = old_strs_ptr;
133 free(newstr);
134 ERROR(alloc_err);
135 return -1;
136 }
137 strs->strs[strs->nb_strs] = newstr;
138 strs->nb_strs++;
139 return 0;
140 }
141 /* }}} int add_str_to_list */
143 static void free_str_list(struct str_list *strs) /* {{{ */
144 {
145 int i;
146 for (i=0; i<strs->nb_strs; i++)
147 free(strs->strs[i]);
148 free(strs->strs);
149 }
150 /* }}} void free_str_list */
/*
 * Resolves the target (once, cached in host->res while F_READY is set) and
 * opens a connected TCP socket in host->s.
 * Returns 0 on success, -1 when resolution or every connection attempt fails.
 */
static int sensu_connect(struct sensu_host *host) /* {{{ */
{
	struct addrinfo *candidate;
	struct linger so_linger;

	/* Resolve the target if we haven't done so already. */
	if ((host->flags & F_READY) == 0) {
		struct addrinfo hints;
		char const *node = (host->node != NULL) ? host->node : SENSU_HOST;
		char const *service = (host->service != NULL) ? host->service : SENSU_PORT;
		int gai_status;

		memset(&hints, 0, sizeof(hints));
		hints.ai_family = AF_INET;
		hints.ai_socktype = SOCK_STREAM;
#ifdef AI_ADDRCONFIG
		hints.ai_flags |= AI_ADDRCONFIG;
#endif
		host->res = NULL;

		gai_status = getaddrinfo(node, service, &hints, &(host->res));
		if (gai_status != 0) {
			ERROR("write_sensu plugin: Unable to resolve host \"%s\": %s",
					node, gai_strerror(gai_status));
			return -1;
		}
		DEBUG("write_sensu plugin: successfully resolved host/port: %s/%s",
				node, service);
		host->flags |= F_READY;
	}

	/* Very low close() lingering, applied to every socket we try. */
	so_linger.l_onoff = 1;
	so_linger.l_linger = 3;

	host->s = -1;
	candidate = host->res;
	while (candidate != NULL) {
		host->s = socket(candidate->ai_family,
				candidate->ai_socktype,
				candidate->ai_protocol);
		if (host->s != -1) {
			if (setsockopt(host->s, SOL_SOCKET, SO_LINGER, &so_linger, sizeof so_linger) != 0)
				WARNING("write_sensu plugin: failed to set socket close() lingering");

			if (connect(host->s, candidate->ai_addr, candidate->ai_addrlen) == 0) {
				DEBUG("write_sensu plugin: connected");
				break;
			}

			/* This address did not work; drop the socket and try the next. */
			close(host->s);
			host->s = -1;
		}
		candidate = candidate->ai_next;
	}

	if (host->s < 0) {
		WARNING("write_sensu plugin: Unable to connect to sensu client");
		return -1;
	}
	return 0;
} /* }}} int sensu_connect */
216 static void sensu_close_socket(struct sensu_host *host) /* {{{ */
217 {
218 if (host->s != -1)
219 close(host->s);
220 host->s = -1;
222 } /* }}} void sensu_close_socket */
224 static char *build_json_str_list(const char *tag, struct str_list const *list) /* {{{ */
225 {
226 int res;
227 char *ret_str;
228 char *temp_str;
229 int i;
230 if (list->nb_strs == 0) {
231 ret_str = malloc(sizeof(char));
232 if (ret_str == NULL) {
233 ERROR(alloc_err);
234 return NULL;
235 }
236 ret_str[0] = '\0';
237 }
239 res = asprintf(&temp_str, "\"%s\": [\"%s\"", tag, list->strs[0]);
240 if (res == -1) {
241 ERROR(alloc_err);
242 return NULL;
243 }
244 for (i=1; i<list->nb_strs; i++) {
245 res = asprintf(&ret_str, "%s, \"%s\"", temp_str, list->strs[i]);
246 free(temp_str);
247 if (res == -1) {
248 ERROR(alloc_err);
249 return NULL;
250 }
251 temp_str = ret_str;
252 }
253 res = asprintf(&ret_str, "%s]", temp_str);
254 free(temp_str);
255 if (res == -1) {
256 ERROR(alloc_err);
257 return NULL;
258 }
260 return ret_str;
261 } /* }}} char *build_json_str_list*/
/*
 * Writes "<hostname><sep><plugin>[-<plugin_instance>]<sep><type>[-<type_instance>]"
 * into ret, which holds ret_len bytes.
 *
 * Returns 0 on success and ENOBUFS as soon as a piece does not fit (ret may
 * then hold a partial, unterminated name). plugin and type must not be NULL;
 * the instance arguments may be NULL or empty and are then left out.
 */
int sensu_format_name2(char *ret, int ret_len,
		const char *hostname,
		const char *plugin, const char *plugin_instance,
		const char *type, const char *type_instance,
		const char *separator)
{
	assert (plugin != NULL);
	assert (type != NULL);

	const int with_plugin_inst = (plugin_instance != NULL) && (plugin_instance[0] != 0);
	const int with_type_inst = (type_instance != NULL) && (type_instance[0] != 0);

	/* Pieces in output order; absent optional pieces become empty strings,
	 * which append nothing. */
	const char *pieces[] = {
		hostname,
		separator,
		plugin,
		with_plugin_inst ? "-" : "",
		with_plugin_inst ? plugin_instance : "",
		separator,
		type,
		with_type_inst ? "-" : "",
		with_type_inst ? type_instance : "",
	};

	char *cursor = ret;
	size_t room = (size_t) ret_len;
	size_t k;

	for (k = 0; k < sizeof (pieces) / sizeof (pieces[0]); k++)
	{
		size_t piece_len = strlen (pieces[k]);

		/* Always keep at least one byte for the terminating NUL. */
		if (piece_len >= room)
			return (ENOBUFS);
		memcpy (cursor, pieces[k], piece_len);
		cursor += piece_len;
		room -= piece_len;
	}

	assert (room > 0);
	*cursor = 0;

	return (0);
} /* int sensu_format_name2 */
/*
 * Overwrites, in place, every character of orig_name that this plugin
 * treats as reserved in names with an underscore. Some plugins (ipmi, for
 * instance) put such characters into metric names.
 */
static void in_place_replace_sensu_name_reserved(char *orig_name) /* {{{ */
{
	static const char reserved[] = "() \"'+";
	int len = strlen(orig_name);
	int pos;

	for (pos = 0; pos < len; pos++) {
		/* pos < len, so the NUL terminator is never looked up here. */
		if (strchr(reserved, orig_name[pos]) != NULL)
			orig_name[pos] = '_';
	}
} /* }}} void in_place_replace_sensu_name_reserved */
325 static char *sensu_value_to_json(struct sensu_host const *host, /* {{{ */
326 data_set_t const *ds,
327 value_list_t const *vl, size_t index,
328 gauge_t const *rates,
329 int status)
330 {
331 char name_buffer[5 * DATA_MAX_NAME_LEN];
332 char service_buffer[6 * DATA_MAX_NAME_LEN];
333 int i;
334 char *ret_str;
335 char *temp_str;
336 char *value_str;
337 int res;
338 // First part of the JSON string
339 const char *part1 = "{\"name\": \"collectd\", \"type\": \"metric\"";
341 char *handlers_str = build_json_str_list("handlers", &(host->metric_handlers));
342 if (handlers_str == NULL) {
343 ERROR(alloc_err);
344 return NULL;
345 }
347 // incorporate the handlers
348 if (strlen(handlers_str) == 0) {
349 free(handlers_str);
350 ret_str = strdup(part1);
351 if (ret_str == NULL) {
352 ERROR(alloc_err);
353 return NULL;
354 }
355 }
356 else {
357 res = asprintf(&ret_str, "%s, %s", part1, handlers_str);
358 free(handlers_str);
359 if (res == -1) {
360 ERROR(alloc_err);
361 return NULL;
362 }
363 }
365 // incorporate the plugin name information
366 res = asprintf(&temp_str, "%s, \"collectd_plugin\": \"%s\"", ret_str, vl->plugin);
367 free(ret_str);
368 if (res == -1) {
369 ERROR(alloc_err);
370 return NULL;
371 }
372 ret_str = temp_str;
374 // incorporate the plugin type
375 res = asprintf(&temp_str, "%s, \"collectd_plugin_type\": \"%s\"", ret_str, vl->type);
376 free(ret_str);
377 if (res == -1) {
378 ERROR(alloc_err);
379 return NULL;
380 }
381 ret_str = temp_str;
383 // incorporate the plugin instance if any
384 if (vl->plugin_instance[0] != 0) {
385 res = asprintf(&temp_str, "%s, \"collectd_plugin_instance\": \"%s\"", ret_str, vl->plugin_instance);
386 free(ret_str);
387 if (res == -1) {
388 ERROR(alloc_err);
389 return NULL;
390 }
391 ret_str = temp_str;
392 }
394 // incorporate the plugin type instance if any
395 if (vl->type_instance[0] != 0) {
396 res = asprintf(&temp_str, "%s, \"collectd_plugin_type_instance\": \"%s\"", ret_str, vl->type_instance);
397 free(ret_str);
398 if (res == -1) {
399 ERROR(alloc_err);
400 return NULL;
401 }
402 ret_str = temp_str;
403 }
405 // incorporate the data source type
406 if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL)) {
407 char ds_type[DATA_MAX_NAME_LEN];
408 ssnprintf (ds_type, sizeof (ds_type), "%s:rate", DS_TYPE_TO_STRING(ds->ds[index].type));
409 res = asprintf(&temp_str, "%s, \"collectd_data_source_type\": \"%s\"", ret_str, ds_type);
410 free(ret_str);
411 if (res == -1) {
412 ERROR(alloc_err);
413 return NULL;
414 }
415 ret_str = temp_str;
416 } else {
417 res = asprintf(&temp_str, "%s, \"collectd_data_source_type\": \"%s\"", ret_str, DS_TYPE_TO_STRING(ds->ds[index].type));
418 free(ret_str);
419 if (res == -1) {
420 ERROR(alloc_err);
421 return NULL;
422 }
423 ret_str = temp_str;
424 }
426 // incorporate the data source name
427 res = asprintf(&temp_str, "%s, \"collectd_data_source_name\": \"%s\"", ret_str, ds->ds[index].name);
428 free(ret_str);
429 if (res == -1) {
430 ERROR(alloc_err);
431 return NULL;
432 }
433 ret_str = temp_str;
435 // incorporate the data source index
436 {
437 char ds_index[DATA_MAX_NAME_LEN];
438 ssnprintf (ds_index, sizeof (ds_index), "%zu", index);
439 res = asprintf(&temp_str, "%s, \"collectd_data_source_index\": %s", ret_str, ds_index);
440 free(ret_str);
441 if (res == -1) {
442 ERROR(alloc_err);
443 return NULL;
444 }
445 ret_str = temp_str;
446 }
448 // add key value attributes from config if any
449 for (i = 0; i < sensu_attrs_num; i += 2) {
450 res = asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, sensu_attrs[i], sensu_attrs[i+1]);
451 free(ret_str);
452 if (res == -1) {
453 ERROR(alloc_err);
454 return NULL;
455 }
456 ret_str = temp_str;
457 }
459 // incorporate sensu tags from config if any
460 if (strlen(sensu_tags) != 0) {
461 res = asprintf(&temp_str, "%s, %s", ret_str, sensu_tags);
462 free(ret_str);
463 if (res == -1) {
464 ERROR(alloc_err);
465 return NULL;
466 }
467 ret_str = temp_str;
468 }
470 // calculate the value and set to a string
471 if (ds->ds[index].type == DS_TYPE_GAUGE) {
472 double tmp_v = (double) vl->values[index].gauge;
473 res = asprintf(&value_str, "%.8f", tmp_v, sensu_tags);
474 if (res == -1) {
475 free(ret_str);
476 ERROR(alloc_err);
477 return NULL;
478 }
479 } else if (rates != NULL) {
480 double tmp_v = (double) rates[index];
481 res = asprintf(&value_str, "%.8f", tmp_v, sensu_tags);
482 if (res == -1) {
483 free(ret_str);
484 ERROR(alloc_err);
485 return NULL;
486 }
487 } else {
488 int64_t tmp_v;
489 if (ds->ds[index].type == DS_TYPE_DERIVE)
490 tmp_v = (int64_t) vl->values[index].derive;
491 else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
492 tmp_v = (int64_t) vl->values[index].absolute;
493 else
494 tmp_v = (int64_t) vl->values[index].counter;
495 res = asprintf(&value_str, "%lld", tmp_v, sensu_tags);
496 if (res == -1) {
497 free(ret_str);
498 ERROR(alloc_err);
499 return NULL;
500 }
501 }
503 // Generate the full service name
504 sensu_format_name2(name_buffer, sizeof(name_buffer),
505 vl->host, vl->plugin, vl->plugin_instance,
506 vl->type, vl->type_instance, host->separator);
507 if (host->always_append_ds || (ds->ds_num > 1)) {
508 if (host->event_service_prefix == NULL)
509 ssnprintf(service_buffer, sizeof(service_buffer), "%s.%s",
510 name_buffer, ds->ds[index].name);
511 else
512 ssnprintf(service_buffer, sizeof(service_buffer), "%s%s.%s",
513 host->event_service_prefix, name_buffer, ds->ds[index].name);
514 } else {
515 if (host->event_service_prefix == NULL)
516 sstrncpy(service_buffer, name_buffer, sizeof(service_buffer));
517 else
518 ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
519 host->event_service_prefix, name_buffer);
520 }
522 // Replace collectd sensor name reserved characters so that time series DB is happy
523 in_place_replace_sensu_name_reserved(service_buffer);
525 // finalize the buffer by setting the output and closing curly bracket
526 res = asprintf(&temp_str, "%s, \"output\": \"%s %s %ld\"}\n", ret_str, service_buffer, value_str, CDTIME_T_TO_TIME_T(vl->time));
527 free(ret_str);
528 free(value_str);
529 if (res == -1) {
530 ERROR(alloc_err);
531 return NULL;
532 }
533 ret_str = temp_str;
535 DEBUG("write_sensu plugin: Successfully created json for metric: "
536 "host = \"%s\", service = \"%s\"",
537 vl->host, service_buffer);
538 return ret_str;
539 } /* }}} char *sensu_value_to_json */
541 /*
542 * Uses replace_str2() implementation from
543 * http://creativeandcritical.net/str-replace-c/
544 * copyright (c) Laird Shaw, under public domain.
545 */
/*
 * Returns a newly allocated copy of str in which every non-overlapping
 * occurrence of old (scanned left to right) is replaced by new.
 *
 * An empty old pattern is treated as "nothing to replace" and yields a
 * plain copy; previously it made the search loop spin forever.
 * Returns NULL if the allocation fails. The caller must free() the result.
 */
char *replace_str(const char *str, const char *old, /* {{{ */
		const char *new)
{
	const size_t oldlen = strlen(old);
	const size_t newlen = strlen(new);
	size_t count = 0;
	size_t retlen;
	const char *p, *q;
	char *ret, *r;

	/* First pass: count the matches so the result can be sized exactly. */
	if (oldlen > 0) {
		for (p = str; (q = strstr(p, old)) != NULL; p = q + oldlen)
			count++;
	}

	/* count * oldlen never exceeds strlen(str), so this cannot underflow. */
	retlen = strlen(str) - count * oldlen + count * newlen;

	ret = malloc(retlen + 1);
	if (ret == NULL)
		return NULL;

	/* Second pass: copy the text between matches, then the replacement. */
	r = ret;
	p = str;
	while (count > 0) {
		size_t l;

		q = strstr(p, old); /* cannot be NULL: it was counted above */
		l = (size_t) (q - p);
		memcpy(r, p, l);
		r += l;
		memcpy(r, new, newlen);
		r += newlen;
		p = q + oldlen;
		count--;
	}

	/* Copy the tail together with its terminating NUL. */
	memcpy(r, p, strlen(p) + 1);

	return ret;
} /* }}} char *replace_str */
605 static char *replace_json_reserved(const char *message) /* {{{ */
606 {
607 char *msg = replace_str(message, "\\", "\\\\");
608 if (msg == NULL) {
609 ERROR(alloc_err);
610 return NULL;
611 }
612 char *tmp = replace_str(msg, "\"", "\\\"");
613 free(msg);
614 if (tmp == NULL) {
615 ERROR(alloc_err);
616 return NULL;
617 }
618 msg = replace_str(tmp, "\n", "\\\n");
619 free(tmp);
620 if (msg == NULL) {
621 ERROR(alloc_err);
622 return NULL;
623 }
624 return msg;
625 } /* }}} char *replace_json_reserved */
627 static char *sensu_notification_to_json(struct sensu_host *host, /* {{{ */
628 notification_t const *n)
629 {
630 char service_buffer[6 * DATA_MAX_NAME_LEN];
631 char const *severity;
632 notification_meta_t *meta;
633 char *ret_str;
634 char *temp_str;
635 int status;
636 int i;
637 int res;
638 // add the severity/status
639 switch (n->severity) {
640 case NOTIF_OKAY:
641 severity = "OK";
642 status = 0;
643 break;
644 case NOTIF_WARNING:
645 severity = "WARNING";
646 status = 1;
647 break;
648 case NOTIF_FAILURE:
649 severity = "CRITICAL";
650 status = 2;
651 break;
652 default:
653 severity = "UNKNOWN";
654 status = 3;
655 }
656 res = asprintf(&temp_str, "{\"status\": %d", status);
657 if (res == -1) {
658 ERROR(alloc_err);
659 return NULL;
660 }
661 ret_str = temp_str;
663 // incorporate the timestamp
664 res = asprintf(&temp_str, "%s, \"timestamp\": %ld", ret_str, CDTIME_T_TO_TIME_T(n->time));
665 free(ret_str);
666 if (res == -1) {
667 ERROR(alloc_err);
668 return NULL;
669 }
670 ret_str = temp_str;
672 char *handlers_str = build_json_str_list("handlers", &(host->notification_handlers));
673 if (handlers_str == NULL) {
674 ERROR(alloc_err);
675 return NULL;
676 }
677 // incorporate the handlers
678 if (strlen(handlers_str) != 0) {
679 res = asprintf(&temp_str, "%s, %s", ret_str, handlers_str);
680 free(ret_str);
681 free(handlers_str);
682 if (res == -1) {
683 ERROR(alloc_err);
684 return NULL;
685 }
686 ret_str = temp_str;
687 } else {
688 free(handlers_str);
689 }
691 // incorporate the plugin name information if any
692 if (n->plugin[0] != 0) {
693 res = asprintf(&temp_str, "%s, \"collectd_plugin\": \"%s\"", ret_str, n->plugin);
694 free(ret_str);
695 if (res == -1) {
696 ERROR(alloc_err);
697 return NULL;
698 }
699 ret_str = temp_str;
700 }
702 // incorporate the plugin type if any
703 if (n->type[0] != 0) {
704 res = asprintf(&temp_str, "%s, \"collectd_plugin_type\": \"%s\"", ret_str, n->type);
705 free(ret_str);
706 if (res == -1) {
707 ERROR(alloc_err);
708 return NULL;
709 }
710 ret_str = temp_str;
711 }
713 // incorporate the plugin instance if any
714 if (n->plugin_instance[0] != 0) {
715 res = asprintf(&temp_str, "%s, \"collectd_plugin_instance\": \"%s\"", ret_str, n->plugin_instance);
716 free(ret_str);
717 if (res == -1) {
718 ERROR(alloc_err);
719 return NULL;
720 }
721 ret_str = temp_str;
722 }
724 // incorporate the plugin type instance if any
725 if (n->type_instance[0] != 0) {
726 res = asprintf(&temp_str, "%s, \"collectd_plugin_type_instance\": \"%s\"", ret_str, n->type_instance);
727 free(ret_str);
728 if (res == -1) {
729 ERROR(alloc_err);
730 return NULL;
731 }
732 ret_str = temp_str;
733 }
735 // add key value attributes from config if any
736 for (i = 0; i < sensu_attrs_num; i += 2) {
737 res = asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, sensu_attrs[i], sensu_attrs[i+1]);
738 free(ret_str);
739 if (res == -1) {
740 ERROR(alloc_err);
741 return NULL;
742 }
743 ret_str = temp_str;
744 }
746 // incorporate sensu tags from config if any
747 if (strlen(sensu_tags) != 0) {
748 res = asprintf(&temp_str, "%s, %s", ret_str, sensu_tags);
749 free(ret_str);
750 if (res == -1) {
751 ERROR(alloc_err);
752 return NULL;
753 }
754 ret_str = temp_str;
755 }
757 // incorporate the service name
758 sensu_format_name2(service_buffer, sizeof(service_buffer),
759 /* host */ "", n->plugin, n->plugin_instance,
760 n->type, n->type_instance, host->separator);
761 // replace sensu event name chars that are considered illegal
762 in_place_replace_sensu_name_reserved(service_buffer);
763 res = asprintf(&temp_str, "%s, \"name\": \"%s\"", ret_str, &service_buffer[1]);
764 free(ret_str);
765 if (res == -1) {
766 ERROR(alloc_err);
767 return NULL;
768 }
769 ret_str = temp_str;
771 // incorporate the check output
772 if (n->message[0] != 0) {
773 char *msg = replace_json_reserved(n->message);
774 if (msg == NULL) {
775 ERROR(alloc_err);
776 return NULL;
777 }
778 res = asprintf(&temp_str, "%s, \"output\": \"%s - %s\"", ret_str, severity, msg);
779 free(ret_str);
780 free(msg);
781 if (res == -1) {
782 ERROR(alloc_err);
783 return NULL;
784 }
785 ret_str = temp_str;
786 }
788 // Pull in values from threshold and add extra attributes
789 for (meta = n->meta; meta != NULL; meta = meta->next) {
790 if (strcasecmp("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE) {
791 res = asprintf(&temp_str, "%s, \"current_value\": \"%.8f\"", ret_str, meta->nm_value.nm_double);
792 free(ret_str);
793 if (res == -1) {
794 ERROR(alloc_err);
795 return NULL;
796 }
797 ret_str = temp_str;
798 }
799 if (meta->type == NM_TYPE_STRING) {
800 res = asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, meta->name, meta->nm_value.nm_string);
801 free(ret_str);
802 if (res == -1) {
803 ERROR(alloc_err);
804 return NULL;
805 }
806 ret_str = temp_str;
807 }
808 }
810 // close the curly bracket
811 res = asprintf(&temp_str, "%s}\n", ret_str);
812 free(ret_str);
813 if (res == -1) {
814 ERROR(alloc_err);
815 return NULL;
816 }
817 ret_str = temp_str;
819 DEBUG("write_sensu plugin: Successfully created JSON for notification: "
820 "host = \"%s\", service = \"%s\", state = \"%s\"",
821 n->host, service_buffer, severity);
822 return ret_str;
823 } /* }}} char *sensu_notification_to_json */
825 static int sensu_send_msg(struct sensu_host *host, const char *msg) /* {{{ */
826 {
827 int status = 0;
828 size_t buffer_len;
830 status = sensu_connect(host);
831 if (status != 0)
832 return status;
834 buffer_len = strlen(msg);
836 status = (int) swrite(host->s, msg, buffer_len);
837 sensu_close_socket(host);
839 if (status != 0) {
840 char errbuf[1024];
841 ERROR("write_sensu plugin: Sending to Sensu at %s:%s failed: %s",
842 (host->node != NULL) ? host->node : SENSU_HOST,
843 (host->service != NULL) ? host->service : SENSU_PORT,
844 sstrerror(errno, errbuf, sizeof(errbuf)));
845 return -1;
846 }
848 return 0;
849 } /* }}} int sensu_send_msg */
852 static int sensu_send(struct sensu_host *host, char const *msg) /* {{{ */
853 {
854 int status = 0;
856 status = sensu_send_msg(host, msg);
857 if (status != 0) {
858 host->flags &= ~F_READY;
859 if (host->res != NULL) {
860 freeaddrinfo(host->res);
861 host->res = NULL;
862 }
863 return status;
864 }
866 return 0;
867 } /* }}} int sensu_send */
870 static int sensu_write(const data_set_t *ds, /* {{{ */
871 const value_list_t *vl,
872 user_data_t *ud)
873 {
874 int status = 0;
875 int statuses[vl->values_len];
876 struct sensu_host *host = ud->data;
877 gauge_t *rates = NULL;
878 int i;
879 char *msg;
881 pthread_mutex_lock(&host->lock);
882 memset(statuses, 0, vl->values_len * sizeof(*statuses));
884 if (host->store_rates) {
885 rates = uc_get_rate(ds, vl);
886 if (rates == NULL) {
887 ERROR("write_sensu plugin: uc_get_rate failed.");
888 pthread_mutex_unlock(&host->lock);
889 return -1;
890 }
891 }
892 for (i = 0; i < (size_t) vl->values_len; i++) {
893 msg = sensu_value_to_json(host, ds, vl, (int) i, rates, statuses[i]);
894 if (msg == NULL) {
895 sfree(rates);
896 pthread_mutex_unlock(&host->lock);
897 return -1;
898 }
899 status = sensu_send(host, msg);
900 free(msg);
901 if (status != 0) {
902 ERROR("write_sensu plugin: sensu_send failed with status %i", status);
903 pthread_mutex_unlock(&host->lock);
904 sfree(rates);
905 return status;
906 }
907 }
908 sfree(rates);
909 pthread_mutex_unlock(&host->lock);
910 return status;
911 } /* }}} int sensu_write */
913 static int sensu_notification(const notification_t *n, user_data_t *ud) /* {{{ */
914 {
915 int status;
916 struct sensu_host *host = ud->data;
917 char *msg;
919 pthread_mutex_lock(&host->lock);
921 msg = sensu_notification_to_json(host, n);
922 if (msg == NULL) {
923 pthread_mutex_unlock(&host->lock);
924 return -1;
925 }
927 status = sensu_send(host, msg);
928 free(msg);
929 if (status != 0)
930 ERROR("write_sensu plugin: sensu_send failed with status %i", status);
931 pthread_mutex_unlock(&host->lock);
933 return status;
934 } /* }}} int sensu_notification */
936 static void sensu_free(void *p) /* {{{ */
937 {
938 struct sensu_host *host = p;
940 if (host == NULL)
941 return;
943 pthread_mutex_lock(&host->lock);
945 host->reference_count--;
946 if (host->reference_count > 0) {
947 pthread_mutex_unlock(&host->lock);
948 return;
949 }
951 sensu_close_socket(host);
952 if (host->res != NULL) {
953 freeaddrinfo(host->res);
954 host->res = NULL;
955 }
956 sfree(host->service);
957 sfree(host->event_service_prefix);
958 sfree(host->name);
959 sfree(host->node);
960 sfree(host->separator);
961 free_str_list(&(host->metric_handlers));
962 free_str_list(&(host->notification_handlers));
963 pthread_mutex_destroy(&host->lock);
964 sfree(host);
965 } /* }}} void sensu_free */
968 static int sensu_config_node(oconfig_item_t *ci) /* {{{ */
969 {
970 struct sensu_host *host = NULL;
971 int status = 0;
972 int i;
973 oconfig_item_t *child;
974 char callback_name[DATA_MAX_NAME_LEN];
975 user_data_t ud;
977 if ((host = calloc(1, sizeof(*host))) == NULL) {
978 ERROR("write_sensu plugin: calloc failed.");
979 return ENOMEM;
980 }
981 pthread_mutex_init(&host->lock, NULL);
982 host->reference_count = 1;
983 host->node = NULL;
984 host->service = NULL;
985 host->notifications = 0;
986 host->metrics = 0;
987 host->store_rates = 1;
988 host->always_append_ds = 0;
989 host->metric_handlers.nb_strs = 0;
990 host->metric_handlers.strs = NULL;
991 host->notification_handlers.nb_strs = 0;
992 host->notification_handlers.strs = NULL;
993 host->separator = strdup("/");
994 if (host->separator == NULL) {
995 ERROR(alloc_err);
996 sensu_free(host);
997 return -1;
998 }
1000 status = cf_util_get_string(ci, &host->name);
1001 if (status != 0) {
1002 WARNING("write_sensu plugin: Required host name is missing.");
1003 sensu_free(host);
1004 return -1;
1005 }
1007 for (i = 0; i < ci->children_num; i++) {
1008 child = &ci->children[i];
1009 status = 0;
1011 if (strcasecmp("Host", child->key) == 0) {
1012 status = cf_util_get_string(child, &host->node);
1013 if (status != 0)
1014 break;
1015 } else if (strcasecmp("Notifications", child->key) == 0) {
1016 status = cf_util_get_boolean(child, &host->notifications);
1017 if (status != 0)
1018 break;
1019 } else if (strcasecmp("Metrics", child->key) == 0) {
1020 status = cf_util_get_boolean(child, &host->metrics);
1021 if (status != 0)
1022 break;
1023 } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
1024 status = cf_util_get_string(child, &host->event_service_prefix);
1025 if (status != 0)
1026 break;
1027 } else if (strcasecmp("Separator", child->key) == 0) {
1028 status = cf_util_get_string(child, &host->separator);
1029 if (status != 0)
1030 break;
1031 } else if (strcasecmp("MetricHandler", child->key) == 0) {
1032 char *temp_str = NULL;
1033 status = cf_util_get_string(child, &temp_str);
1034 if (status != 0)
1035 break;
1036 status = add_str_to_list(&(host->metric_handlers), temp_str);
1037 free(temp_str);
1038 if (status != 0)
1039 break;
1040 } else if (strcasecmp("NotificationHandler", child->key) == 0) {
1041 char *temp_str = NULL;
1042 status = cf_util_get_string(child, &temp_str);
1043 if (status != 0)
1044 break;
1045 status = add_str_to_list(&(host->notification_handlers), temp_str);
1046 free(temp_str);
1047 if (status != 0)
1048 break;
1049 } else if (strcasecmp("Port", child->key) == 0) {
1050 status = cf_util_get_service(child, &host->service);
1051 if (status != 0) {
1052 ERROR("write_sensu plugin: Invalid argument "
1053 "configured for the \"Port\" "
1054 "option.");
1055 break;
1056 }
1057 } else if (strcasecmp("StoreRates", child->key) == 0) {
1058 status = cf_util_get_boolean(child, &host->store_rates);
1059 if (status != 0)
1060 break;
1061 } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
1062 status = cf_util_get_boolean(child,
1063 &host->always_append_ds);
1064 if (status != 0)
1065 break;
1066 } else {
1067 WARNING("write_sensu plugin: ignoring unknown config "
1068 "option: \"%s\"", child->key);
1069 }
1070 }
1071 if (status != 0) {
1072 sensu_free(host);
1073 return status;
1074 }
1076 if (host->metrics && (host->metric_handlers.nb_strs == 0)) {
1077 sensu_free(host);
1078 WARNING("write_sensu plugin: metrics enabled but no MetricHandler defined. Giving up.");
1079 return -1;
1080 }
1082 if (host->notifications && (host->notification_handlers.nb_strs == 0)) {
1083 sensu_free(host);
1084 WARNING("write_sensu plugin: notifications enabled but no NotificationHandler defined. Giving up.");
1085 return -1;
1086 }
1088 if ((host->notification_handlers.nb_strs > 0) && (host->notifications == 0)) {
1089 WARNING("write_sensu plugin: NotificationHandler given so forcing notifications to be enabled");
1090 host->notifications = 1;
1091 }
1093 if ((host->metric_handlers.nb_strs > 0) && (host->metrics == 0)) {
1094 WARNING("write_sensu plugin: MetricHandler given so forcing metrics to be enabled");
1095 host->metrics = 1;
1096 }
1098 if (!(host->notifications || host->metrics)) {
1099 WARNING("write_sensu plugin: neither metrics nor notifications enabled. Giving up.");
1100 sensu_free(host);
1101 return -1;
1102 }
1104 ssnprintf(callback_name, sizeof(callback_name), "write_sensu/%s", host->name);
1105 ud.data = host;
1106 ud.free_func = sensu_free;
1108 pthread_mutex_lock(&host->lock);
1110 if (host->metrics) {
1111 status = plugin_register_write(callback_name, sensu_write, &ud);
1112 if (status != 0)
1113 WARNING("write_sensu plugin: plugin_register_write (\"%s\") "
1114 "failed with status %i.",
1115 callback_name, status);
1116 else /* success */
1117 host->reference_count++;
1118 }
1120 if (host->notifications) {
1121 status = plugin_register_notification(callback_name, sensu_notification, &ud);
1122 if (status != 0)
1123 WARNING("write_sensu plugin: plugin_register_notification (\"%s\") "
1124 "failed with status %i.",
1125 callback_name, status);
1126 else
1127 host->reference_count++;
1128 }
1130 if (host->reference_count <= 1) {
1131 /* Both callbacks failed => free memory.
1132 * We need to unlock here, because sensu_free() will lock.
1133 * This is not a race condition, because we're the only one
1134 * holding a reference. */
1135 pthread_mutex_unlock(&host->lock);
1136 sensu_free(host);
1137 return -1;
1138 }
1140 host->reference_count--;
1141 pthread_mutex_unlock(&host->lock);
1143 return status;
1144 } /* }}} int sensu_config_node */
1146 static int sensu_config(oconfig_item_t *ci) /* {{{ */
1147 {
1148 int i;
1149 oconfig_item_t *child;
1150 int status;
1151 struct str_list sensu_tags_arr;
1153 sensu_tags_arr.nb_strs = 0;
1154 sensu_tags_arr.strs = NULL;
1155 sensu_tags = malloc(sizeof(char));
1156 if (sensu_tags == NULL) {
1157 ERROR(alloc_err);
1158 return -1;
1159 }
1160 sensu_tags[0] = '\0';
1162 for (i = 0; i < ci->children_num; i++) {
1163 child = &ci->children[i];
1165 if (strcasecmp("Node", child->key) == 0) {
1166 sensu_config_node(child);
1167 } else if (strcasecmp(child->key, "attribute") == 0) {
1168 char *key = NULL;
1169 char *val = NULL;
1171 if (child->values_num != 2) {
1172 WARNING("sensu attributes need both a key and a value.");
1173 free(sensu_tags);
1174 return -1;
1175 }
1176 if (child->values[0].type != OCONFIG_TYPE_STRING ||
1177 child->values[1].type != OCONFIG_TYPE_STRING) {
1178 WARNING("sensu attribute needs string arguments.");
1179 free(sensu_tags);
1180 return -1;
1181 }
1182 if ((key = strdup(child->values[0].value.string)) == NULL) {
1183 ERROR(alloc_err);
1184 free(sensu_tags);
1185 return -1;
1186 }
1187 if ((val = strdup(child->values[1].value.string)) == NULL) {
1188 free(sensu_tags);
1189 free(key);
1190 ERROR(alloc_err);
1191 return -1;
1192 }
1193 strarray_add(&sensu_attrs, &sensu_attrs_num, key);
1194 strarray_add(&sensu_attrs, &sensu_attrs_num, val);
1195 DEBUG("write_sensu: got attr: %s => %s", key, val);
1196 sfree(key);
1197 sfree(val);
1198 } else if (strcasecmp(child->key, "tag") == 0) {
1199 char *tmp = NULL;
1200 status = cf_util_get_string(child, &tmp);
1201 if (status != 0)
1202 continue;
1204 status = add_str_to_list(&sensu_tags_arr, tmp);
1205 sfree(tmp);
1206 if (status != 0)
1207 continue;
1208 DEBUG("write_sensu plugin: Got tag: %s", tmp);
1209 } else {
1210 WARNING("write_sensu plugin: Ignoring unknown "
1211 "configuration option \"%s\" at top level.",
1212 child->key);
1213 }
1214 }
1215 if (sensu_tags_arr.nb_strs > 0) {
1216 free(sensu_tags);
1217 sensu_tags = build_json_str_list("tags", &sensu_tags_arr);
1218 free_str_list(&sensu_tags_arr);
1219 if (sensu_tags == NULL) {
1220 ERROR(alloc_err);
1221 return -1;
1222 }
1223 }
1224 return 0;
1225 } /* }}} int sensu_config */
1227 void module_register(void)
1228 {
1229 plugin_register_complex_config("write_sensu", sensu_config);
1230 }
1232 /* vim: set sw=8 sts=8 ts=8 noet : */