Code

d31a988e76de275fe52adb50d3d23557095e4de9
[collectd.git] / src / write_riemann.c
1 /**
2  * collectd - src/write_riemann.c
3  *
4  * Copyright (C) 2012,2013  Pierre-Yves Ritschard
5  * Copyright (C) 2013       Florian octo Forster
6  *
7  * Permission to use, copy, modify, and distribute this software for any
8  * purpose with or without fee is hereby granted, provided that the above
9  * copyright notice and this permission notice appear in all copies.
10  *
11  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
12  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
14  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
15  * WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER
16  * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
17  * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18  *
19  * Authors:
20  *   Pierre-Yves Ritschard <pyr at spootnik.org>
21  *   Florian octo Forster <octo at collectd.org>
22  */
24 #include "collectd.h"
25 #include "plugin.h"
26 #include "common.h"
27 #include "configfile.h"
28 #include "utils_cache.h"
29 #include "riemann.pb-c.h"
31 #include <sys/socket.h>
32 #include <arpa/inet.h>
33 #include <errno.h>
34 #include <netdb.h>
35 #include <inttypes.h>
36 #include <pthread.h>
38 #define RIEMANN_HOST            "localhost"
39 #define RIEMANN_PORT            "5555"
41 struct riemann_host {
42         char                    *name;
43 #define F_CONNECT                0x01
44         uint8_t                  flags;
45         pthread_mutex_t          lock;
46         _Bool                    store_rates;
47         _Bool                    always_append_ds;
48         char                    *node;
49         char                    *service;
50         _Bool                    use_tcp;
51         int                      s;
53         int                      reference_count;
54 };
56 static char     **riemann_tags;
57 static size_t     riemann_tags_num;
59 static void riemann_event_protobuf_free (Event *event) /* {{{ */
60 {
61         if (event == NULL)
62                 return;
64         sfree (event->state);
65         sfree (event->service);
66         sfree (event->host);
67         sfree (event->description);
69         strarray_free (event->tags, event->n_tags);
70         event->tags = NULL;
71         event->n_tags = 0;
73         sfree (event);
74 } /* }}} void riemann_event_protobuf_free */
76 static void riemann_msg_protobuf_free (Msg *msg) /* {{{ */
77 {
78         size_t i;
80         if (msg == NULL)
81                 return;
83         for (i = 0; i < msg->n_events; i++)
84         {
85                 riemann_event_protobuf_free (msg->events[i]);
86                 msg->events[i] = NULL;
87         }
89         sfree (msg->events);
90         msg->n_events = 0;
92         sfree (msg);
93 } /* }}} void riemann_msg_protobuf_free */
95 /* host->lock must be held when calling this function. */
96 static int
97 riemann_connect(struct riemann_host *host)
98 {
99         int                      e;
100         struct addrinfo         *ai, *res, hints;
101         char const              *node;
102         char const              *service;
104         if (host->flags & F_CONNECT)
105                 return 0;
107         memset(&hints, 0, sizeof(hints));
108         memset(&service, 0, sizeof(service));
109         hints.ai_family = AF_UNSPEC;
110         hints.ai_socktype = host->use_tcp ? SOCK_STREAM : SOCK_DGRAM;
111 #ifdef AI_ADDRCONFIG
112         hints.ai_flags |= AI_ADDRCONFIG;
113 #endif
115         node = (host->node != NULL) ? host->node : RIEMANN_HOST;
116         service = (host->service != NULL) ? host->service : RIEMANN_PORT;
118         if ((e = getaddrinfo(node, service, &hints, &res)) != 0) {
119                 ERROR ("write_riemann plugin: Unable to resolve host \"%s\": %s",
120                         node, gai_strerror(e));
121                 return -1;
122         }
124         host->s = -1;
125         for (ai = res; ai != NULL; ai = ai->ai_next) {
126                 if ((host->s = socket(ai->ai_family,
127                                       ai->ai_socktype,
128                                       ai->ai_protocol)) == -1) {
129                         continue;
130                 }
132                 if (connect(host->s, ai->ai_addr, ai->ai_addrlen) != 0) {
133                         close(host->s);
134                         host->s = -1;
135                         continue;
136                 }
138                 host->flags |= F_CONNECT;
139                 DEBUG("write_riemann plugin: got a succesful connection for: %s:%s",
140                                 node, service);
141                 break;
142         }
144         freeaddrinfo(res);
146         if (host->s < 0) {
147                 WARNING("write_riemann plugin: Unable to connect to Riemann at %s:%s",
148                                 node, service);
149                 return -1;
150         }
151         return 0;
154 /* host->lock must be held when calling this function. */
155 static int
156 riemann_disconnect (struct riemann_host *host)
158         if ((host->flags & F_CONNECT) == 0)
159                 return (0);
161         close (host->s);
162         host->s = -1;
163         host->flags &= ~F_CONNECT;
165         return (0);
168 static int
169 riemann_send(struct riemann_host *host, Msg const *msg)
171         u_char *buffer;
172         size_t  buffer_len;
173         int status;
175         pthread_mutex_lock (&host->lock);
177         status = riemann_connect (host);
178         if (status != 0)
179         {
180                 pthread_mutex_unlock (&host->lock);
181                 return status;
182         }
184         buffer_len = msg__get_packed_size(msg);
185         if (host->use_tcp)
186                 buffer_len += 4;
188         buffer = malloc (buffer_len);
189         if (buffer == NULL) {
190                 pthread_mutex_unlock (&host->lock);
191                 ERROR ("write_riemann plugin: malloc failed.");
192                 return ENOMEM;
193         }
194         memset (buffer, 0, buffer_len);
196         if (host->use_tcp)
197         {
198                 uint32_t length = htonl ((uint32_t) (buffer_len - 4));
199                 memcpy (buffer, &length, 4);
200                 msg__pack(msg, buffer + 4);
201         }
202         else
203         {
204                 msg__pack(msg, buffer);
205         }
207         status = (int) swrite (host->s, buffer, buffer_len);
208         if (status != 0)
209         {
210                 char errbuf[1024];
212                 riemann_disconnect (host);
213                 pthread_mutex_unlock (&host->lock);
215                 ERROR ("write_riemann plugin: Sending to Riemann at %s:%s failed: %s",
216                                 (host->node != NULL) ? host->node : RIEMANN_HOST,
217                                 (host->service != NULL) ? host->service : RIEMANN_PORT,
218                                 sstrerror (errno, errbuf, sizeof (errbuf)));
219                 sfree (buffer);
220                 return -1;
221         }
223         pthread_mutex_unlock (&host->lock);
224         sfree (buffer);
225         return 0;
228 static int riemann_event_add_tag (Event *event, char const *tag) /* {{{ */
230         return (strarray_add (&event->tags, &event->n_tags, tag));
231 } /* }}} int riemann_event_add_tag */
233 static int riemann_event_add_attribute (Event *event, /* {{{ */
234                 char const *key, char const *value)
236         Attribute **new_attributes;
237         Attribute *a;
239         new_attributes = realloc (event->attributes,
240                         sizeof (*event->attributes) * (event->n_attributes + 1));
241         if (new_attributes == NULL)
242         {
243                 ERROR ("write_riemann plugin: realloc failed.");
244                 return (ENOMEM);
245         }
246         event->attributes = new_attributes;
248         a = malloc (sizeof (*a));
249         if (a == NULL)
250         {
251                 ERROR ("write_riemann plugin: malloc failed.");
252                 return (ENOMEM);
253         }
254         attribute__init (a);
256         a->key = strdup (key);
257         if (value != NULL)
258                 a->value = strdup (value);
260         event->attributes[event->n_attributes] = a;
261         event->n_attributes++;
263         return (0);
264 } /* }}} int riemann_event_add_attribute */
266 static Msg *riemann_notification_to_protobuf (struct riemann_host *host, /* {{{ */
267                 notification_t const *n)
269         Msg *msg;
270         Event *event;
271         char service_buffer[6 * DATA_MAX_NAME_LEN];
272         char const *severity;
273         notification_meta_t *meta;
274         int i;
276         msg = malloc (sizeof (*msg));
277         if (msg == NULL)
278         {
279                 ERROR ("write_riemann plugin: malloc failed.");
280                 return (NULL);
281         }
282         memset (msg, 0, sizeof (*msg));
283         msg__init (msg);
285         msg->events = malloc (sizeof (*msg->events));
286         if (msg->events == NULL)
287         {
288                 ERROR ("write_riemann plugin: malloc failed.");
289                 sfree (msg);
290                 return (NULL);
291         }
293         event = malloc (sizeof (*event));
294         if (event == NULL)
295         {
296                 ERROR ("write_riemann plugin: malloc failed.");
297                 sfree (msg->events);
298                 sfree (msg);
299                 return (NULL);
300         }
301         memset (event, 0, sizeof (*event));
302         event__init (event);
304         msg->events[0] = event;
305         msg->n_events = 1;
307         event->host = strdup (n->host);
308         event->time = CDTIME_T_TO_TIME_T (n->time);
309         event->has_time = 1;
311         switch (n->severity)
312         {
313                 case NOTIF_OKAY:        severity = "ok"; break;
314                 case NOTIF_WARNING:     severity = "warning"; break;
315                 case NOTIF_FAILURE:     severity = "critical"; break;
316                 default:                severity = "unknown";
317         }
318         event->state = strdup (severity);
320         riemann_event_add_tag (event, "notification");
321         if (n->host[0] != 0)
322                 riemann_event_add_attribute (event, "host", n->host);
323         if (n->plugin[0] != 0)
324                 riemann_event_add_attribute (event, "plugin", n->plugin);
325         if (n->plugin_instance[0] != 0)
326                 riemann_event_add_attribute (event, "plugin_instance",
327                                 n->plugin_instance);
329         if (n->type[0] != 0)
330                 riemann_event_add_attribute (event, "type", n->type);
331         if (n->type_instance[0] != 0)
332                 riemann_event_add_attribute (event, "type_instance",
333                                 n->type_instance);
335         for (i = 0; i < riemann_tags_num; i++)
336                 riemann_event_add_tag (event, riemann_tags[i]);
338         format_name (service_buffer, sizeof (service_buffer),
339                         /* host = */ "", n->plugin, n->plugin_instance,
340                         n->type, n->type_instance);
341         event->service = strdup (&service_buffer[1]);
343         /* Pull in values from threshold */
344         for (meta = n->meta; meta != NULL; meta = meta->next)
345         {
346                 if (strcasecmp ("CurrentValue", meta->name) != 0)
347                         continue;
349                 event->metric_d = meta->nm_value.nm_double;
350                 event->has_metric_d = 1;
351                 break;
352         }
354         DEBUG ("write_riemann plugin: Successfully created protobuf for notification: "
355                         "host = \"%s\", service = \"%s\", state = \"%s\"",
356                         event->host, event->service, event->state);
357         return (msg);
358 } /* }}} Msg *riemann_notification_to_protobuf */
360 static Event *riemann_value_to_protobuf (struct riemann_host const *host, /* {{{ */
361                 data_set_t const *ds,
362                 value_list_t const *vl, size_t index,
363                 gauge_t const *rates)
365         Event *event;
366         char name_buffer[5 * DATA_MAX_NAME_LEN];
367         char service_buffer[6 * DATA_MAX_NAME_LEN];
368         int i;
370         event = malloc (sizeof (*event));
371         if (event == NULL)
372         {
373                 ERROR ("write_riemann plugin: malloc failed.");
374                 return (NULL);
375         }
376         memset (event, 0, sizeof (*event));
377         event__init (event);
379         event->host = strdup (vl->host);
380         event->time = CDTIME_T_TO_TIME_T (vl->time);
381         event->has_time = 1;
382         event->ttl = CDTIME_T_TO_TIME_T (2 * vl->interval);
383         event->has_ttl = 1;
385         riemann_event_add_attribute (event, "plugin", vl->plugin);
386         if (vl->plugin_instance[0] != 0)
387                 riemann_event_add_attribute (event, "plugin_instance",
388                                 vl->plugin_instance);
390         riemann_event_add_attribute (event, "type", vl->type);
391         if (vl->type_instance[0] != 0)
392                 riemann_event_add_attribute (event, "type_instance",
393                                 vl->type_instance);
395         if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL))
396         {
397                 char ds_type[DATA_MAX_NAME_LEN];
399                 ssnprintf (ds_type, sizeof (ds_type), "%s:rate",
400                                 DS_TYPE_TO_STRING(ds->ds[index].type));
401                 riemann_event_add_attribute (event, "ds_type", ds_type);
402         }
403         else
404         {
405                 riemann_event_add_attribute (event, "ds_type",
406                                 DS_TYPE_TO_STRING(ds->ds[index].type));
407         }
408         riemann_event_add_attribute (event, "ds_name", ds->ds[index].name);
409         {
410                 char ds_index[DATA_MAX_NAME_LEN];
412                 ssnprintf (ds_index, sizeof (ds_index), "%zu", index);
413                 riemann_event_add_attribute (event, "ds_index", ds_index);
414         }
416         for (i = 0; i < riemann_tags_num; i++)
417                 riemann_event_add_tag (event, riemann_tags[i]);
419         if (ds->ds[index].type == DS_TYPE_GAUGE)
420         {
421                 event->has_metric_d = 1;
422                 event->metric_d = (double) vl->values[index].gauge;
423         }
424         else if (rates != NULL)
425         {
426                 event->has_metric_d = 1;
427                 event->metric_d = (double) rates[index];
428         }
429         else
430         {
431                 event->has_metric_sint64 = 1;
432                 if (ds->ds[index].type == DS_TYPE_DERIVE)
433                         event->metric_sint64 = (int64_t) vl->values[index].derive;
434                 else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
435                         event->metric_sint64 = (int64_t) vl->values[index].absolute;
436                 else
437                         event->metric_sint64 = (int64_t) vl->values[index].counter;
438         }
440         format_name (name_buffer, sizeof (name_buffer),
441                         /* host = */ "", vl->plugin, vl->plugin_instance,
442                         vl->type, vl->type_instance);
443         if (host->always_append_ds || (ds->ds_num > 1))
444                 ssnprintf (service_buffer, sizeof (service_buffer),
445                                 "%s/%s", &name_buffer[1], ds->ds[index].name);
446         else
447                 sstrncpy (service_buffer, &name_buffer[1],
448                                 sizeof (service_buffer));
450         event->service = strdup (service_buffer);
452         DEBUG ("write_riemann plugin: Successfully created protobuf for metric: "
453                         "host = \"%s\", service = \"%s\"",
454                         event->host, event->service);
455         return (event);
456 } /* }}} Event *riemann_value_to_protobuf */
458 static Msg *riemann_value_list_to_protobuf (struct riemann_host const *host, /* {{{ */
459                 data_set_t const *ds,
460                 value_list_t const *vl)
462         Msg *msg;
463         size_t i;
464         gauge_t *rates = NULL;
466         /* Initialize the Msg structure. */
467         msg = malloc (sizeof (*msg));
468         if (msg == NULL)
469         {
470                 ERROR ("write_riemann plugin: malloc failed.");
471                 return (NULL);
472         }
473         memset (msg, 0, sizeof (*msg));
474         msg__init (msg);
476         /* Set up events. First, the list of pointers. */
477         msg->n_events = (size_t) vl->values_len;
478         msg->events = calloc (msg->n_events, sizeof (*msg->events));
479         if (msg->events == NULL)
480         {
481                 ERROR ("write_riemann plugin: calloc failed.");
482                 riemann_msg_protobuf_free (msg);
483                 return (NULL);
484         }
486         if (host->store_rates)
487         {
488                 rates = uc_get_rate (ds, vl);
489                 if (rates == NULL)
490                 {
491                         ERROR ("write_riemann plugin: uc_get_rate failed.");
492                         riemann_msg_protobuf_free (msg);
493                         return (NULL);
494                 }
495         }
497         for (i = 0; i < msg->n_events; i++)
498         {
499                 msg->events[i] = riemann_value_to_protobuf (host, ds, vl,
500                                 (int) i, rates);
501                 if (msg->events[i] == NULL)
502                 {
503                         riemann_msg_protobuf_free (msg);
504                         sfree (rates);
505                         return (NULL);
506                 }
507         }
509         sfree (rates);
510         return (msg);
511 } /* }}} Msg *riemann_value_list_to_protobuf */
513 static int
514 riemann_notification(const notification_t *n, user_data_t *ud)
516         int                      status;
517         struct riemann_host     *host = ud->data;
518         Msg                     *msg;
520         msg = riemann_notification_to_protobuf (host, n);
521         if (msg == NULL)
522                 return (-1);
524         status = riemann_send (host, msg);
525         if (status != 0)
526                 ERROR ("write_riemann plugin: riemann_send failed with status %i",
527                                 status);
529         riemann_msg_protobuf_free (msg);
530         return (status);
531 } /* }}} int riemann_notification */
533 static int
534 riemann_write(const data_set_t *ds,
535               const value_list_t *vl,
536               user_data_t *ud)
538         int                      status;
539         struct riemann_host     *host = ud->data;
540         Msg                     *msg;
542         msg = riemann_value_list_to_protobuf (host, ds, vl);
543         if (msg == NULL)
544                 return (-1);
546         status = riemann_send (host, msg);
547         if (status != 0)
548                 ERROR ("write_riemann plugin: riemann_send failed with status %i",
549                                 status);
551         riemann_msg_protobuf_free (msg);
552         return status;
555 static void
556 riemann_free(void *p)
558         struct riemann_host     *host = p;
560         if (host == NULL)
561                 return;
563         pthread_mutex_lock (&host->lock);
565         host->reference_count--;
566         if (host->reference_count > 0)
567         {
568                 pthread_mutex_unlock (&host->lock);
569                 return;
570         }
572         riemann_disconnect (host);
574         sfree(host->service);
575         pthread_mutex_destroy (&host->lock);
576         sfree(host);
579 static int
580 riemann_config_node(oconfig_item_t *ci)
582         struct riemann_host     *host = NULL;
583         int                      status = 0;
584         int                      i;
585         oconfig_item_t          *child;
586         char                     callback_name[DATA_MAX_NAME_LEN];
587         user_data_t              ud;
589         if ((host = calloc(1, sizeof (*host))) == NULL) {
590                 ERROR ("write_riemann plugin: calloc failed.");
591                 return ENOMEM;
592         }
593         pthread_mutex_init (&host->lock, NULL);
594         host->reference_count = 1;
595         host->node = NULL;
596         host->service = NULL;
597         host->store_rates = 1;
598         host->always_append_ds = 0;
599         host->use_tcp = 0;
601         status = cf_util_get_string (ci, &host->name);
602         if (status != 0) {
603                 WARNING("write_riemann plugin: Required host name is missing.");
604                 riemann_free (host);
605                 return -1;
606         }
608         for (i = 0; i < ci->children_num; i++) {
609                 /*
610                  * The code here could be simplified but makes room
611                  * for easy adding of new options later on.
612                  */
613                 child = &ci->children[i];
614                 status = 0;
616                 if (strcasecmp ("Host", child->key) == 0) {
617                         status = cf_util_get_string (child, &host->node);
618                         if (status != 0)
619                                 break;
620                 } else if (strcasecmp ("Port", child->key) == 0) {
621                         status = cf_util_get_service (child, &host->service);
622                         if (status != 0) {
623                                 ERROR ("write_riemann plugin: Invalid argument "
624                                                 "configured for the \"Port\" "
625                                                 "option.");
626                                 break;
627                         }
628                 } else if (strcasecmp ("Protocol", child->key) == 0) {
629                         char tmp[16];
630                         status = cf_util_get_string_buffer (child,
631                                         tmp, sizeof (tmp));
632                         if (status != 0)
633                         {
634                                 ERROR ("write_riemann plugin: cf_util_get_"
635                                                 "string_buffer failed with "
636                                                 "status %i.", status);
637                                 break;
638                         }
640                         if (strcasecmp ("UDP", tmp) == 0)
641                                 host->use_tcp = 0;
642                         else if (strcasecmp ("TCP", tmp) == 0)
643                                 host->use_tcp = 1;
644                         else
645                                 WARNING ("write_riemann plugin: The value "
646                                                 "\"%s\" is not valid for the "
647                                                 "\"Protocol\" option. Use "
648                                                 "either \"UDP\" or \"TCP\".",
649                                                 tmp);
650                 } else if (strcasecmp ("StoreRates", child->key) == 0) {
651                         status = cf_util_get_boolean (child, &host->store_rates);
652                         if (status != 0)
653                                 break;
654                 } else if (strcasecmp ("AlwaysAppendDS", child->key) == 0) {
655                         status = cf_util_get_boolean (child,
656                                         &host->always_append_ds);
657                         if (status != 0)
658                                 break;
659                 } else {
660                         WARNING("write_riemann plugin: ignoring unknown config "
661                                 "option: \"%s\"", child->key);
662                 }
663         }
664         if (status != 0) {
665                 riemann_free (host);
666                 return status;
667         }
669         ssnprintf (callback_name, sizeof (callback_name), "write_riemann/%s",
670                         host->name);
671         ud.data = host;
672         ud.free_func = riemann_free;
674         pthread_mutex_lock (&host->lock);
676         status = plugin_register_write (callback_name, riemann_write, &ud);
677         if (status != 0)
678                 WARNING ("write_riemann plugin: plugin_register_write (\"%s\") "
679                                 "failed with status %i.",
680                                 callback_name, status);
681         else /* success */
682                 host->reference_count++;
684         status = plugin_register_notification (callback_name,
685                         riemann_notification, &ud);
686         if (status != 0)
687                 WARNING ("write_riemann plugin: plugin_register_notification (\"%s\") "
688                                 "failed with status %i.",
689                                 callback_name, status);
690         else /* success */
691                 host->reference_count++;
693         if (host->reference_count <= 1)
694         {
695                 /* Both callbacks failed => free memory.
696                  * We need to unlock here, because riemann_free() will lock.
697                  * This is not a race condition, because we're the only one
698                  * holding a reference. */
699                 pthread_mutex_unlock (&host->lock);
700                 riemann_free (host);
701                 return (-1);
702         }
704         host->reference_count--;
705         pthread_mutex_unlock (&host->lock);
707         return status;
710 static int
711 riemann_config(oconfig_item_t *ci)
713         int              i;
714         oconfig_item_t  *child;
715         int              status;
717         for (i = 0; i < ci->children_num; i++)  {
718                 child = &ci->children[i];
720                 if (strcasecmp("Node", child->key) == 0) {
721                         riemann_config_node (child);
722                 } else if (strcasecmp(child->key, "tag") == 0) {
723                         char *tmp = NULL;
724                         status = cf_util_get_string(child, &tmp);
725                         if (status != 0)
726                                 continue;
728                         strarray_add (&riemann_tags, &riemann_tags_num, tmp);
729                         DEBUG("write_riemann plugin: Got tag: %s", tmp);
730                         sfree (tmp);
731                 } else {
732                         WARNING ("write_riemann plugin: Ignoring unknown "
733                                  "configuration option \"%s\" at top level.",
734                                  child->key);
735                 }
736         }
737         return (0);
740 void
741 module_register(void)
743         plugin_register_complex_config ("write_riemann", riemann_config);
746 /* vim: set sw=8 sts=8 ts=8 noet : */