From: Florian Forster Date: Wed, 16 Jan 2013 21:00:11 +0000 (+0100) Subject: write_riemann plugin: Rename the "riemann" plugin. X-Git-Tag: collectd-5.3.0~56^2~6 X-Git-Url: https://git.tokkee.org/?a=commitdiff_plain;h=0a116775f091004f4c7e7736f1c908dcc7851099;p=collectd.git write_riemann plugin: Rename the "riemann" plugin. --- diff --git a/configure.in b/configure.in index 005b30b6..5598a0b1 100644 --- a/configure.in +++ b/configure.in @@ -4932,7 +4932,6 @@ AC_PLUGIN([processes], [$plugin_processes], [Process statistics]) AC_PLUGIN([protocols], [$plugin_protocols], [Protocol (IP, TCP, ...) statistics]) AC_PLUGIN([python], [$with_python], [Embed a Python interpreter]) AC_PLUGIN([redis], [$with_libcredis], [Redis plugin]) -AC_PLUGIN([riemann], [$have_protoc_c], [Riemann plugin]) AC_PLUGIN([routeros], [$with_librouteros], [RouterOS plugin]) AC_PLUGIN([rrdcached], [$librrd_rrdc_update], [RRDTool output plugin]) AC_PLUGIN([rrdtool], [$with_librrd], [RRDTool output plugin]) @@ -4965,8 +4964,9 @@ AC_PLUGIN([vserver], [$plugin_vserver], [Linux VServer statistics]) AC_PLUGIN([wireless], [$plugin_wireless], [Wireless statistics]) AC_PLUGIN([write_graphite], [yes], [Graphite / Carbon output plugin]) AC_PLUGIN([write_http], [$with_libcurl], [HTTP output plugin]) -AC_PLUGIN([write_redis], [$with_libcredis], [Redis output plugin]) AC_PLUGIN([write_mongodb], [$with_libmongoc], [MongoDB output plugin]) +AC_PLUGIN([write_redis], [$with_libcredis], [Redis output plugin]) +AC_PLUGIN([write_riemann], [$have_protoc_c], [Riemann output plugin]) AC_PLUGIN([xmms], [$with_libxmms], [XMMS statistics]) AC_PLUGIN([zfs_arc], [$plugin_zfs_arc], [ZFS ARC statistics]) @@ -5266,7 +5266,6 @@ Configuration: protocols . . . . . . $enable_protocols python . . . . . . . $enable_python redis . . . . . . . . $enable_redis - riemann . . . . . . . $enable_riemann routeros . . . . . . $enable_routeros rrdcached . . . . . . $enable_rrdcached rrdtool . . . . . . . $enable_rrdtool @@ -5299,8 +5298,9 @@ Configuration: wireless . . . . . . $enable_wireless write_graphite . . . $enable_write_graphite write_http . . . . . $enable_write_http - write_redis . . . . . $enable_write_redis write_mongodb . . . . $enable_write_mongodb + write_redis . . . . . $enable_write_redis + write_riemann . . . . $enable_write_riemann xmms . . . . . . . . $enable_xmms zfs_arc . . . . . . . $enable_zfs_arc diff --git a/src/Makefile.am b/src/Makefile.am index e084930a..8bdf9f84 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -981,17 +981,6 @@ collectd_LDADD += "-dlopen" redis.la collectd_DEPENDENCIES += redis.la endif -if BUILD_PLUGIN_RIEMANN -BUILT_SOURCES += riemann.pb-c.c riemann.pb-c.h -CLEANFILES += riemann.pb-c.c riemann.pb-c.h -pkglib_LTLIBRARIES += riemann.la -riemann_la_SOURCES = riemann.c riemann.pb-c.c -riemann_la_LDFLAGS = -module -avoid-version -riemann_la_LIBADD = -lprotobuf-c -collectd_LDADD += "-dlopen" riemann.la -collectd_DEPENDENCIES += riemann.la -endif - if BUILD_PLUGIN_ROUTEROS pkglib_LTLIBRARIES += routeros.la routeros_la_SOURCES = routeros.c @@ -1347,6 +1336,17 @@ collectd_LDADD += "-dlopen" write_redis.la collectd_DEPENDENCIES += write_redis.la endif +if BUILD_PLUGIN_WRITE_RIEMANN +BUILT_SOURCES += riemann.pb-c.c riemann.pb-c.h +CLEANFILES += riemann.pb-c.c riemann.pb-c.h +pkglib_LTLIBRARIES += write_riemann.la +write_riemann_la_SOURCES = write_riemann.c riemann.pb-c.c +write_riemann_la_LDFLAGS = -module -avoid-version +write_riemann_la_LIBADD = -lprotobuf-c +collectd_LDADD += "-dlopen" write_riemann.la +collectd_DEPENDENCIES += write_riemann.la +endif + if BUILD_PLUGIN_XMMS pkglib_LTLIBRARIES += xmms.la xmms_la_SOURCES = xmms.c diff --git a/src/riemann.c b/src/riemann.c deleted file mode 100644 index c1844299..00000000 --- a/src/riemann.c +++ /dev/null @@ -1,687 +0,0 @@ -/* - * collectd - src/riemann.c - * - * Copyright (C) 2012 Pierre-Yves Ritschard - * - * Permission to use, copy, modify, and distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice appear in all copies. - * - * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - * WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER - * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING - * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - * - */ - -#include "collectd.h" -#include "plugin.h" -#include "common.h" -#include "configfile.h" -#include "utils_cache.h" -#include "riemann.pb-c.h" - -#include -#include -#include -#include -#include -#include - -#define RIEMANN_DELAY 1 -#define RIEMANN_PORT "5555" -#define RIEMANN_MAX_TAGS 37 -#define RIEMANN_EXTRA_TAGS 32 - -struct riemann_host { -#define F_CONNECT 0x01 - uint8_t flags; - pthread_mutex_t lock; - int delay; - _Bool store_rates; - char *node; - char *service; - int s; - - int reference_count; -}; - -static char **riemann_tags; -static size_t riemann_tags_num; - -static int riemann_send(struct riemann_host *, Msg const *); -static int riemann_notification(const notification_t *, user_data_t *); -static int riemann_write(const data_set_t *, const value_list_t *, user_data_t *); -static int riemann_connect(struct riemann_host *); -static int riemann_disconnect (struct riemann_host *host); -static void riemann_free(void *); -static int riemann_config_host(oconfig_item_t *); -static int riemann_config(oconfig_item_t *); -void module_register(void); - -static void riemann_event_protobuf_free (Event *event) /* {{{ */ -{ - if (event == NULL) - return; - - sfree (event->state); - sfree (event->service); - sfree (event->host); - sfree (event->description); - - strarray_free (event->tags, event->n_tags); - event->tags = NULL; - event->n_tags = 0; - - sfree (event); -} /* }}} void riemann_event_protobuf_free */ - -static void riemann_msg_protobuf_free (Msg *msg) /* {{{ */ -{ - size_t i; - - if (msg == NULL) - return; - - for (i = 0; i < msg->n_events; i++) - { - riemann_event_protobuf_free (msg->events[i]); - msg->events[i] = NULL; - } - - sfree (msg->events); - msg->n_events = 0; - - sfree (msg); -} /* }}} void riemann_msg_protobuf_free */ - -static int -riemann_send(struct riemann_host *host, Msg const *msg) -{ - u_char *buffer; - size_t buffer_len; - int status; - - pthread_mutex_lock (&host->lock); - - status = riemann_connect (host); - if (status != 0) - { - pthread_mutex_unlock (&host->lock); - return status; - } - - buffer_len = msg__get_packed_size(msg); - buffer = malloc (buffer_len); - if (buffer == NULL) { - pthread_mutex_unlock (&host->lock); - ERROR ("riemann plugin: malloc failed."); - return ENOMEM; - } - memset (buffer, 0, buffer_len); - - msg__pack(msg, buffer); - - status = (int) swrite (host->s, buffer, buffer_len); - if (status != 0) - { - char errbuf[1024]; - - riemann_disconnect (host); - pthread_mutex_unlock (&host->lock); - - ERROR ("riemann plugin: Sending to Riemann at %s:%s failed: %s", - host->node, - (host->service != NULL) ? host->service : RIEMANN_PORT, - sstrerror (errno, errbuf, sizeof (errbuf))); - sfree (buffer); - return -1; - } - - pthread_mutex_unlock (&host->lock); - sfree (buffer); - return 0; -} - -static int riemann_event_add_tag (Event *event, /* {{{ */ - char const *format, ...) -{ - va_list ap; - char buffer[1024]; - size_t ret; - - va_start (ap, format); - ret = vsnprintf (buffer, sizeof (buffer), format, ap); - if (ret >= sizeof (buffer)) - ret = sizeof (buffer) - 1; - buffer[ret] = 0; - va_end (ap); - - return (strarray_add (&event->tags, &event->n_tags, buffer)); -} /* }}} int riemann_event_add_tag */ - -static Msg *riemann_notification_to_protobuf (struct riemann_host *host, /* {{{ */ - notification_t const *n) -{ - Msg *msg; - Event *event; - char service_buffer[6 * DATA_MAX_NAME_LEN]; - char const *severity; - notification_meta_t *meta; - int i; - - msg = malloc (sizeof (*msg)); - if (msg == NULL) - { - ERROR ("riemann plugin: malloc failed."); - return (NULL); - } - memset (msg, 0, sizeof (*msg)); - msg__init (msg); - - msg->events = malloc (sizeof (*msg->events)); - if (msg->events == NULL) - { - ERROR ("riemann plugin: malloc failed."); - sfree (msg); - return (NULL); - } - - event = malloc (sizeof (*event)); - if (event == NULL) - { - ERROR ("riemann plugin: malloc failed."); - sfree (msg->events); - sfree (msg); - return (NULL); - } - memset (event, 0, sizeof (*event)); - event__init (event); - - msg->events[0] = event; - msg->n_events = 1; - - event->host = strdup (n->host); - event->time = CDTIME_T_TO_TIME_T (n->time); - event->has_time = 1; - - switch (n->severity) - { - case NOTIF_OKAY: severity = "okay"; break; - case NOTIF_WARNING: severity = "warning"; break; - case NOTIF_FAILURE: severity = "failure"; break; - default: severity = "unknown"; - } - event->state = strdup (severity); - - riemann_event_add_tag (event, "notification"); - if (n->plugin[0] != 0) - riemann_event_add_tag (event, "plugin:%s", n->plugin); - if (n->plugin_instance[0] != 0) - riemann_event_add_tag (event, "plugin_instance:%s", - n->plugin_instance); - - if (n->type[0] != 0) - riemann_event_add_tag (event, "type:%s", n->type); - if (n->type_instance[0] != 0) - riemann_event_add_tag (event, "type_instance:%s", - n->type_instance); - - for (i = 0; i < riemann_tags_num; i++) - riemann_event_add_tag (event, "%s", riemann_tags[i]); - - /* TODO: Use FORMAT_VL() here. */ - ssnprintf (service_buffer, sizeof(service_buffer), - "%s-%s-%s-%s", n->plugin, n->plugin_instance, - n->type, n->type_instance); - event->service = strdup (service_buffer); - - /* Pull in values from threshold */ - for (meta = n->meta; meta != NULL; meta = meta->next) - { - if (strcasecmp ("CurrentValue", meta->name) != 0) - continue; - - event->metric_d = meta->nm_value.nm_double; - event->has_metric_d = 1; - break; - } - - DEBUG ("riemann plugin: Successfully created protobuf for notification: " - "host = \"%s\", service = \"%s\", state = \"%s\"", - event->host, event->service, event->state); - return (msg); -} /* }}} Msg *riemann_notification_to_protobuf */ - -static Event *riemann_value_to_protobuf (struct riemann_host const *host, /* {{{ */ - data_set_t const *ds, - value_list_t const *vl, size_t index, - gauge_t const *rates) -{ - Event *event; - char service_buffer[6 * DATA_MAX_NAME_LEN]; - int i; - - event = malloc (sizeof (*event)); - if (event == NULL) - { - ERROR ("riemann plugin: malloc failed."); - return (NULL); - } - memset (event, 0, sizeof (*event)); - event__init (event); - - event->host = strdup (vl->host); - event->time = CDTIME_T_TO_TIME_T (vl->time); - event->has_time = 1; - event->ttl = CDTIME_T_TO_TIME_T (vl->interval) + host->delay; - event->has_ttl = 1; - - riemann_event_add_tag (event, "plugin:%s", vl->plugin); - if (vl->plugin_instance[0] != 0) - riemann_event_add_tag (event, "plugin_instance:%s", - vl->plugin_instance); - - riemann_event_add_tag (event, "type:%s", vl->type); - if (vl->type_instance[0] != 0) - riemann_event_add_tag (event, "type_instance:%s", - vl->type_instance); - - if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL)) - { - riemann_event_add_tag (event, "ds_type:%s:rate", - DS_TYPE_TO_STRING(ds->ds[index].type)); - } - else - { - riemann_event_add_tag (event, "ds_type:%s", - DS_TYPE_TO_STRING(ds->ds[index].type)); - } - riemann_event_add_tag (event, "ds_name:%s", ds->ds[index].name); - riemann_event_add_tag (event, "ds_index:%zu", index); - - for (i = 0; i < riemann_tags_num; i++) - riemann_event_add_tag (event, "%s", riemann_tags[i]); - - if (ds->ds[index].type == DS_TYPE_GAUGE) - { - event->has_metric_d = 1; - event->metric_d = (double) vl->values[index].gauge; - } - else if (rates != NULL) - { - event->has_metric_d = 1; - event->metric_d = (double) rates[index]; - } - else - { - event->has_metric_sint64 = 1; - if (ds->ds[index].type == DS_TYPE_DERIVE) - event->metric_sint64 = (int64_t) vl->values[index].derive; - else if (ds->ds[index].type == DS_TYPE_ABSOLUTE) - event->metric_sint64 = (int64_t) vl->values[index].absolute; - else - event->metric_sint64 = (int64_t) vl->values[index].counter; - } - - /* TODO: Use FORMAT_VL() here. */ - ssnprintf (service_buffer, sizeof(service_buffer), - "%s-%s-%s-%s-%s", vl->plugin, vl->plugin_instance, - vl->type, vl->type_instance, ds->ds[index].name); - event->service = strdup (service_buffer); - - DEBUG ("riemann plugin: Successfully created protobuf for metric: " - "host = \"%s\", service = \"%s\"", - event->host, event->service); - return (event); -} /* }}} Event *riemann_value_to_protobuf */ - -static Msg *riemann_value_list_to_protobuf (struct riemann_host const *host, /* {{{ */ - data_set_t const *ds, - value_list_t const *vl) -{ - Msg *msg; - size_t i; - gauge_t *rates = NULL; - - /* Initialize the Msg structure. */ - msg = malloc (sizeof (*msg)); - if (msg == NULL) - { - ERROR ("riemann plugin: malloc failed."); - return (NULL); - } - memset (msg, 0, sizeof (*msg)); - msg__init (msg); - - /* Set up events. First, the list of pointers. */ - msg->n_events = (size_t) vl->values_len; - msg->events = calloc (msg->n_events, sizeof (*msg->events)); - if (msg->events == NULL) - { - ERROR ("riemann plugin: calloc failed."); - riemann_msg_protobuf_free (msg); - return (NULL); - } - - if (host->store_rates) - { - rates = uc_get_rate (ds, vl); - if (rates == NULL) - { - ERROR ("riemann plugin: uc_get_rate failed."); - riemann_msg_protobuf_free (msg); - return (NULL); - } - } - - for (i = 0; i < msg->n_events; i++) - { - msg->events[i] = riemann_value_to_protobuf (host, ds, vl, - (int) i, rates); - if (msg->events[i] == NULL) - { - riemann_msg_protobuf_free (msg); - sfree (rates); - return (NULL); - } - } - - sfree (rates); - return (msg); -} /* }}} Msg *riemann_value_list_to_protobuf */ - -static int -riemann_notification(const notification_t *n, user_data_t *ud) -{ - int status; - struct riemann_host *host = ud->data; - Msg *msg; - - msg = riemann_notification_to_protobuf (host, n); - if (msg == NULL) - return (-1); - - status = riemann_send (host, msg); - if (status != 0) - ERROR ("riemann plugin: riemann_send failed with status %i", - status); - - riemann_msg_protobuf_free (msg); - return (status); -} /* }}} int riemann_notification */ - -static int -riemann_write(const data_set_t *ds, - const value_list_t *vl, - user_data_t *ud) -{ - int status; - struct riemann_host *host = ud->data; - Msg *msg; - - msg = riemann_value_list_to_protobuf (host, ds, vl); - if (msg == NULL) - return (-1); - - status = riemann_send (host, msg); - if (status != 0) - ERROR ("riemann plugin: riemann_send failed with status %i", - status); - - riemann_msg_protobuf_free (msg); - return status; -} - -/* host->lock must be held when calling this function. */ -static int -riemann_connect(struct riemann_host *host) -{ - int e; - struct addrinfo *ai, *res, hints; - char const *service; - - if (host->flags & F_CONNECT) - return 0; - - memset(&hints, 0, sizeof(hints)); - memset(&service, 0, sizeof(service)); - hints.ai_family = PF_UNSPEC; - hints.ai_socktype = SOCK_DGRAM; -#ifdef AI_ADDRCONFIG - hints.ai_flags |= AI_ADDRCONFIG; -#endif - - assert (host->node != NULL); - service = (host->service != NULL) ? host->service : RIEMANN_PORT; - - if ((e = getaddrinfo(host->node, service, &hints, &res)) != 0) { - ERROR ("riemann plugin: Unable to resolve host \"%s\": %s", - host->node, gai_strerror(e)); - return -1; - } - - host->s = -1; - for (ai = res; ai != NULL; ai = ai->ai_next) { - if ((host->s = socket(ai->ai_family, - ai->ai_socktype, - ai->ai_protocol)) == -1) { - continue; - } - - if (connect(host->s, ai->ai_addr, ai->ai_addrlen) != 0) { - close(host->s); - host->s = -1; - continue; - } - - host->flags |= F_CONNECT; - DEBUG("riemann plugin: got a succesful connection for: %s:%s", - host->node, service); - break; - } - - freeaddrinfo(res); - - if (host->s < 0) { - WARNING("riemann plugin: Unable to connect to Riemann at %s:%s", - host->node, service); - return -1; - } - return 0; -} - -/* host->lock must be held when calling this function. */ -static int -riemann_disconnect (struct riemann_host *host) -{ - if ((host->flags & F_CONNECT) == 0) - return (0); - - close (host->s); - host->s = -1; - host->flags &= ~F_CONNECT; - - return (0); -} - -static void -riemann_free(void *p) -{ - struct riemann_host *host = p; - - if (host == NULL) - return; - - pthread_mutex_lock (&host->lock); - - host->reference_count--; - if (host->reference_count > 0) - { - pthread_mutex_unlock (&host->lock); - return; - } - - riemann_disconnect (host); - - sfree(host->service); - pthread_mutex_destroy (&host->lock); - sfree(host); -} - -static int -riemann_config_host(oconfig_item_t *ci) -{ - struct riemann_host *host = NULL; - int status = 0; - int i; - oconfig_item_t *child; - char w_cb_name[DATA_MAX_NAME_LEN]; - char n_cb_name[DATA_MAX_NAME_LEN]; - user_data_t ud; - - if (ci->values_num != 1 || - ci->values[0].type != OCONFIG_TYPE_STRING) { - WARNING("riemann hosts need one string argument"); - return -1; - } - - if ((host = calloc(1, sizeof (*host))) == NULL) { - WARNING("riemann host allocation failed"); - return ENOMEM; - } - pthread_mutex_init (&host->lock, NULL); - host->reference_count = 1; - host->node = NULL; - host->service = NULL; - host->delay = RIEMANN_DELAY; - - status = cf_util_get_string (ci, &host->node); - if (status != 0) { - WARNING("riemann plugin: Required host name is missing."); - riemann_free (host); - return -1; - } - - for (i = 0; i < ci->children_num; i++) { - /* - * The code here could be simplified but makes room - * for easy adding of new options later on. - */ - child = &ci->children[i]; - status = 0; - - if (strcasecmp(child->key, "port") == 0) { - status = cf_util_get_service (child, &host->service); - if (status != 0) { - ERROR ("riemann plugin: Invalid argument " - "configured for the \"Port\" " - "option."); - break; - } - } else if (strcasecmp(child->key, "delay") == 0) { - if ((status = cf_util_get_int(ci, &host->delay)) != 0) - break; - } else if (strcasecmp ("StoreRates", child->key) == 0) { - status = cf_util_get_boolean (ci, &host->store_rates); - if (status != 0) - break; - } else { - WARNING("riemann plugin: ignoring unknown config " - "option: \"%s\"", child->key); - } - } - if (status != 0) { - riemann_free (host); - return status; - } - - ssnprintf(w_cb_name, sizeof(w_cb_name), "write-riemann/%s:%s", - host->node, - (host->service != NULL) ? host->service : RIEMANN_PORT); - ssnprintf(n_cb_name, sizeof(n_cb_name), "notification-riemann/%s:%s", - host->node, - (host->service != NULL) ? host->service : RIEMANN_PORT); - DEBUG("riemann w_cb_name: %s", w_cb_name); - DEBUG("riemann n_cb_name: %s", n_cb_name); - ud.data = host; - ud.free_func = riemann_free; - - pthread_mutex_lock (&host->lock); - - status = plugin_register_write (w_cb_name, riemann_write, &ud); - if (status != 0) - WARNING ("riemann plugin: plugin_register_write (\"%s\") " - "failed with status %i.", - w_cb_name, status); - else /* success */ - host->reference_count++; - - status = plugin_register_notification (n_cb_name, - riemann_notification, &ud); - if (status != 0) - WARNING ("riemann plugin: plugin_register_notification (\"%s\") " - "failed with status %i.", - n_cb_name, status); - else /* success */ - host->reference_count++; - - if (host->reference_count <= 1) - { - /* Both callbacks failed => free memory. - * We need to unlock here, because riemann_free() will lock. - * This is not a race condition, because we're the only one - * holding a reference. */ - pthread_mutex_unlock (&host->lock); - riemann_free (host); - return (-1); - } - - host->reference_count--; - pthread_mutex_unlock (&host->lock); - - return status; -} - -static int -riemann_config(oconfig_item_t *ci) -{ - int i; - oconfig_item_t *child; - int status; - - for (i = 0; i < ci->children_num; i++) { - child = &ci->children[i]; - - if (strcasecmp(child->key, "host") == 0) { - riemann_config_host(child); - } else if (strcasecmp(child->key, "tag") == 0) { - char *tmp = NULL; - status = cf_util_get_string(child, &tmp); - if (status != 0) - continue; - - strarray_add (&riemann_tags, &riemann_tags_num, tmp); - DEBUG("riemann plugin: Got tag: %s", tmp); - sfree (tmp); - } else { - WARNING ("riemann plugin: Ignoring unknown " - "configuration option \"%s\" at top level.", - child->key); - } - } - return (0); -} - -void -module_register(void) -{ - DEBUG("riemann: module_register"); - - plugin_register_complex_config ("riemann", riemann_config); -} - -/* vim: set sw=8 sts=8 ts=8 noet : */ diff --git a/src/write_riemann.c b/src/write_riemann.c new file mode 100644 index 00000000..c1844299 --- /dev/null +++ b/src/write_riemann.c @@ -0,0 +1,687 @@ +/* + * collectd - src/riemann.c + * + * Copyright (C) 2012 Pierre-Yves Ritschard + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER + * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING + * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + * + */ + +#include "collectd.h" +#include "plugin.h" +#include "common.h" +#include "configfile.h" +#include "utils_cache.h" +#include "riemann.pb-c.h" + +#include +#include +#include +#include +#include +#include + +#define RIEMANN_DELAY 1 +#define RIEMANN_PORT "5555" +#define RIEMANN_MAX_TAGS 37 +#define RIEMANN_EXTRA_TAGS 32 + +struct riemann_host { +#define F_CONNECT 0x01 + uint8_t flags; + pthread_mutex_t lock; + int delay; + _Bool store_rates; + char *node; + char *service; + int s; + + int reference_count; +}; + +static char **riemann_tags; +static size_t riemann_tags_num; + +static int riemann_send(struct riemann_host *, Msg const *); +static int riemann_notification(const notification_t *, user_data_t *); +static int riemann_write(const data_set_t *, const value_list_t *, user_data_t *); +static int riemann_connect(struct riemann_host *); +static int riemann_disconnect (struct riemann_host *host); +static void riemann_free(void *); +static int riemann_config_host(oconfig_item_t *); +static int riemann_config(oconfig_item_t *); +void module_register(void); + +static void riemann_event_protobuf_free (Event *event) /* {{{ */ +{ + if (event == NULL) + return; + + sfree (event->state); + sfree (event->service); + sfree (event->host); + sfree (event->description); + + strarray_free (event->tags, event->n_tags); + event->tags = NULL; + event->n_tags = 0; + + sfree (event); +} /* }}} void riemann_event_protobuf_free */ + +static void riemann_msg_protobuf_free (Msg *msg) /* {{{ */ +{ + size_t i; + + if (msg == NULL) + return; + + for (i = 0; i < msg->n_events; i++) + { + riemann_event_protobuf_free (msg->events[i]); + msg->events[i] = NULL; + } + + sfree (msg->events); + msg->n_events = 0; + + sfree (msg); +} /* }}} void riemann_msg_protobuf_free */ + +static int +riemann_send(struct riemann_host *host, Msg const *msg) +{ + u_char *buffer; + size_t buffer_len; + int status; + + pthread_mutex_lock (&host->lock); + + status = riemann_connect (host); + if (status != 0) + { + pthread_mutex_unlock (&host->lock); + return status; + } + + buffer_len = msg__get_packed_size(msg); + buffer = malloc (buffer_len); + if (buffer == NULL) { + pthread_mutex_unlock (&host->lock); + ERROR ("riemann plugin: malloc failed."); + return ENOMEM; + } + memset (buffer, 0, buffer_len); + + msg__pack(msg, buffer); + + status = (int) swrite (host->s, buffer, buffer_len); + if (status != 0) + { + char errbuf[1024]; + + riemann_disconnect (host); + pthread_mutex_unlock (&host->lock); + + ERROR ("riemann plugin: Sending to Riemann at %s:%s failed: %s", + host->node, + (host->service != NULL) ? host->service : RIEMANN_PORT, + sstrerror (errno, errbuf, sizeof (errbuf))); + sfree (buffer); + return -1; + } + + pthread_mutex_unlock (&host->lock); + sfree (buffer); + return 0; +} + +static int riemann_event_add_tag (Event *event, /* {{{ */ + char const *format, ...) +{ + va_list ap; + char buffer[1024]; + size_t ret; + + va_start (ap, format); + ret = vsnprintf (buffer, sizeof (buffer), format, ap); + if (ret >= sizeof (buffer)) + ret = sizeof (buffer) - 1; + buffer[ret] = 0; + va_end (ap); + + return (strarray_add (&event->tags, &event->n_tags, buffer)); +} /* }}} int riemann_event_add_tag */ + +static Msg *riemann_notification_to_protobuf (struct riemann_host *host, /* {{{ */ + notification_t const *n) +{ + Msg *msg; + Event *event; + char service_buffer[6 * DATA_MAX_NAME_LEN]; + char const *severity; + notification_meta_t *meta; + int i; + + msg = malloc (sizeof (*msg)); + if (msg == NULL) + { + ERROR ("riemann plugin: malloc failed."); + return (NULL); + } + memset (msg, 0, sizeof (*msg)); + msg__init (msg); + + msg->events = malloc (sizeof (*msg->events)); + if (msg->events == NULL) + { + ERROR ("riemann plugin: malloc failed."); + sfree (msg); + return (NULL); + } + + event = malloc (sizeof (*event)); + if (event == NULL) + { + ERROR ("riemann plugin: malloc failed."); + sfree (msg->events); + sfree (msg); + return (NULL); + } + memset (event, 0, sizeof (*event)); + event__init (event); + + msg->events[0] = event; + msg->n_events = 1; + + event->host = strdup (n->host); + event->time = CDTIME_T_TO_TIME_T (n->time); + event->has_time = 1; + + switch (n->severity) + { + case NOTIF_OKAY: severity = "okay"; break; + case NOTIF_WARNING: severity = "warning"; break; + case NOTIF_FAILURE: severity = "failure"; break; + default: severity = "unknown"; + } + event->state = strdup (severity); + + riemann_event_add_tag (event, "notification"); + if (n->plugin[0] != 0) + riemann_event_add_tag (event, "plugin:%s", n->plugin); + if (n->plugin_instance[0] != 0) + riemann_event_add_tag (event, "plugin_instance:%s", + n->plugin_instance); + + if (n->type[0] != 0) + riemann_event_add_tag (event, "type:%s", n->type); + if (n->type_instance[0] != 0) + riemann_event_add_tag (event, "type_instance:%s", + n->type_instance); + + for (i = 0; i < riemann_tags_num; i++) + riemann_event_add_tag (event, "%s", riemann_tags[i]); + + /* TODO: Use FORMAT_VL() here. */ + ssnprintf (service_buffer, sizeof(service_buffer), + "%s-%s-%s-%s", n->plugin, n->plugin_instance, + n->type, n->type_instance); + event->service = strdup (service_buffer); + + /* Pull in values from threshold */ + for (meta = n->meta; meta != NULL; meta = meta->next) + { + if (strcasecmp ("CurrentValue", meta->name) != 0) + continue; + + event->metric_d = meta->nm_value.nm_double; + event->has_metric_d = 1; + break; + } + + DEBUG ("riemann plugin: Successfully created protobuf for notification: " + "host = \"%s\", service = \"%s\", state = \"%s\"", + event->host, event->service, event->state); + return (msg); +} /* }}} Msg *riemann_notification_to_protobuf */ + +static Event *riemann_value_to_protobuf (struct riemann_host const *host, /* {{{ */ + data_set_t const *ds, + value_list_t const *vl, size_t index, + gauge_t const *rates) +{ + Event *event; + char service_buffer[6 * DATA_MAX_NAME_LEN]; + int i; + + event = malloc (sizeof (*event)); + if (event == NULL) + { + ERROR ("riemann plugin: malloc failed."); + return (NULL); + } + memset (event, 0, sizeof (*event)); + event__init (event); + + event->host = strdup (vl->host); + event->time = CDTIME_T_TO_TIME_T (vl->time); + event->has_time = 1; + event->ttl = CDTIME_T_TO_TIME_T (vl->interval) + host->delay; + event->has_ttl = 1; + + riemann_event_add_tag (event, "plugin:%s", vl->plugin); + if (vl->plugin_instance[0] != 0) + riemann_event_add_tag (event, "plugin_instance:%s", + vl->plugin_instance); + + riemann_event_add_tag (event, "type:%s", vl->type); + if (vl->type_instance[0] != 0) + riemann_event_add_tag (event, "type_instance:%s", + vl->type_instance); + + if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL)) + { + riemann_event_add_tag (event, "ds_type:%s:rate", + DS_TYPE_TO_STRING(ds->ds[index].type)); + } + else + { + riemann_event_add_tag (event, "ds_type:%s", + DS_TYPE_TO_STRING(ds->ds[index].type)); + } + riemann_event_add_tag (event, "ds_name:%s", ds->ds[index].name); + riemann_event_add_tag (event, "ds_index:%zu", index); + + for (i = 0; i < riemann_tags_num; i++) + riemann_event_add_tag (event, "%s", riemann_tags[i]); + + if (ds->ds[index].type == DS_TYPE_GAUGE) + { + event->has_metric_d = 1; + event->metric_d = (double) vl->values[index].gauge; + } + else if (rates != NULL) + { + event->has_metric_d = 1; + event->metric_d = (double) rates[index]; + } + else + { + event->has_metric_sint64 = 1; + if (ds->ds[index].type == DS_TYPE_DERIVE) + event->metric_sint64 = (int64_t) vl->values[index].derive; + else if (ds->ds[index].type == DS_TYPE_ABSOLUTE) + event->metric_sint64 = (int64_t) vl->values[index].absolute; + else + event->metric_sint64 = (int64_t) vl->values[index].counter; + } + + /* TODO: Use FORMAT_VL() here. */ + ssnprintf (service_buffer, sizeof(service_buffer), + "%s-%s-%s-%s-%s", vl->plugin, vl->plugin_instance, + vl->type, vl->type_instance, ds->ds[index].name); + event->service = strdup (service_buffer); + + DEBUG ("riemann plugin: Successfully created protobuf for metric: " + "host = \"%s\", service = \"%s\"", + event->host, event->service); + return (event); +} /* }}} Event *riemann_value_to_protobuf */ + +static Msg *riemann_value_list_to_protobuf (struct riemann_host const *host, /* {{{ */ + data_set_t const *ds, + value_list_t const *vl) +{ + Msg *msg; + size_t i; + gauge_t *rates = NULL; + + /* Initialize the Msg structure. */ + msg = malloc (sizeof (*msg)); + if (msg == NULL) + { + ERROR ("riemann plugin: malloc failed."); + return (NULL); + } + memset (msg, 0, sizeof (*msg)); + msg__init (msg); + + /* Set up events. First, the list of pointers. */ + msg->n_events = (size_t) vl->values_len; + msg->events = calloc (msg->n_events, sizeof (*msg->events)); + if (msg->events == NULL) + { + ERROR ("riemann plugin: calloc failed."); + riemann_msg_protobuf_free (msg); + return (NULL); + } + + if (host->store_rates) + { + rates = uc_get_rate (ds, vl); + if (rates == NULL) + { + ERROR ("riemann plugin: uc_get_rate failed."); + riemann_msg_protobuf_free (msg); + return (NULL); + } + } + + for (i = 0; i < msg->n_events; i++) + { + msg->events[i] = riemann_value_to_protobuf (host, ds, vl, + (int) i, rates); + if (msg->events[i] == NULL) + { + riemann_msg_protobuf_free (msg); + sfree (rates); + return (NULL); + } + } + + sfree (rates); + return (msg); +} /* }}} Msg *riemann_value_list_to_protobuf */ + +static int +riemann_notification(const notification_t *n, user_data_t *ud) +{ + int status; + struct riemann_host *host = ud->data; + Msg *msg; + + msg = riemann_notification_to_protobuf (host, n); + if (msg == NULL) + return (-1); + + status = riemann_send (host, msg); + if (status != 0) + ERROR ("riemann plugin: riemann_send failed with status %i", + status); + + riemann_msg_protobuf_free (msg); + return (status); +} /* }}} int riemann_notification */ + +static int +riemann_write(const data_set_t *ds, + const value_list_t *vl, + user_data_t *ud) +{ + int status; + struct riemann_host *host = ud->data; + Msg *msg; + + msg = riemann_value_list_to_protobuf (host, ds, vl); + if (msg == NULL) + return (-1); + + status = riemann_send (host, msg); + if (status != 0) + ERROR ("riemann plugin: riemann_send failed with status %i", + status); + + riemann_msg_protobuf_free (msg); + return status; +} + +/* host->lock must be held when calling this function. */ +static int +riemann_connect(struct riemann_host *host) +{ + int e; + struct addrinfo *ai, *res, hints; + char const *service; + + if (host->flags & F_CONNECT) + return 0; + + memset(&hints, 0, sizeof(hints)); + memset(&service, 0, sizeof(service)); + hints.ai_family = PF_UNSPEC; + hints.ai_socktype = SOCK_DGRAM; +#ifdef AI_ADDRCONFIG + hints.ai_flags |= AI_ADDRCONFIG; +#endif + + assert (host->node != NULL); + service = (host->service != NULL) ? host->service : RIEMANN_PORT; + + if ((e = getaddrinfo(host->node, service, &hints, &res)) != 0) { + ERROR ("riemann plugin: Unable to resolve host \"%s\": %s", + host->node, gai_strerror(e)); + return -1; + } + + host->s = -1; + for (ai = res; ai != NULL; ai = ai->ai_next) { + if ((host->s = socket(ai->ai_family, + ai->ai_socktype, + ai->ai_protocol)) == -1) { + continue; + } + + if (connect(host->s, ai->ai_addr, ai->ai_addrlen) != 0) { + close(host->s); + host->s = -1; + continue; + } + + host->flags |= F_CONNECT; + DEBUG("riemann plugin: got a succesful connection for: %s:%s", + host->node, service); + break; + } + + freeaddrinfo(res); + + if (host->s < 0) { + WARNING("riemann plugin: Unable to connect to Riemann at %s:%s", + host->node, service); + return -1; + } + return 0; +} + +/* host->lock must be held when calling this function. */ +static int +riemann_disconnect (struct riemann_host *host) +{ + if ((host->flags & F_CONNECT) == 0) + return (0); + + close (host->s); + host->s = -1; + host->flags &= ~F_CONNECT; + + return (0); +} + +static void +riemann_free(void *p) +{ + struct riemann_host *host = p; + + if (host == NULL) + return; + + pthread_mutex_lock (&host->lock); + + host->reference_count--; + if (host->reference_count > 0) + { + pthread_mutex_unlock (&host->lock); + return; + } + + riemann_disconnect (host); + + sfree(host->service); + pthread_mutex_destroy (&host->lock); + sfree(host); +} + +static int +riemann_config_host(oconfig_item_t *ci) +{ + struct riemann_host *host = NULL; + int status = 0; + int i; + oconfig_item_t *child; + char w_cb_name[DATA_MAX_NAME_LEN]; + char n_cb_name[DATA_MAX_NAME_LEN]; + user_data_t ud; + + if (ci->values_num != 1 || + ci->values[0].type != OCONFIG_TYPE_STRING) { + WARNING("riemann hosts need one string argument"); + return -1; + } + + if ((host = calloc(1, sizeof (*host))) == NULL) { + WARNING("riemann host allocation failed"); + return ENOMEM; + } + pthread_mutex_init (&host->lock, NULL); + host->reference_count = 1; + host->node = NULL; + host->service = NULL; + host->delay = RIEMANN_DELAY; + + status = cf_util_get_string (ci, &host->node); + if (status != 0) { + WARNING("riemann plugin: Required host name is missing."); + riemann_free (host); + return -1; + } + + for (i = 0; i < ci->children_num; i++) { + /* + * The code here could be simplified but makes room + * for easy adding of new options later on. + */ + child = &ci->children[i]; + status = 0; + + if (strcasecmp(child->key, "port") == 0) { + status = cf_util_get_service (child, &host->service); + if (status != 0) { + ERROR ("riemann plugin: Invalid argument " + "configured for the \"Port\" " + "option."); + break; + } + } else if (strcasecmp(child->key, "delay") == 0) { + if ((status = cf_util_get_int(ci, &host->delay)) != 0) + break; + } else if (strcasecmp ("StoreRates", child->key) == 0) { + status = cf_util_get_boolean (ci, &host->store_rates); + if (status != 0) + break; + } else { + WARNING("riemann plugin: ignoring unknown config " + "option: \"%s\"", child->key); + } + } + if (status != 0) { + riemann_free (host); + return status; + } + + ssnprintf(w_cb_name, sizeof(w_cb_name), "write-riemann/%s:%s", + host->node, + (host->service != NULL) ? host->service : RIEMANN_PORT); + ssnprintf(n_cb_name, sizeof(n_cb_name), "notification-riemann/%s:%s", + host->node, + (host->service != NULL) ? host->service : RIEMANN_PORT); + DEBUG("riemann w_cb_name: %s", w_cb_name); + DEBUG("riemann n_cb_name: %s", n_cb_name); + ud.data = host; + ud.free_func = riemann_free; + + pthread_mutex_lock (&host->lock); + + status = plugin_register_write (w_cb_name, riemann_write, &ud); + if (status != 0) + WARNING ("riemann plugin: plugin_register_write (\"%s\") " + "failed with status %i.", + w_cb_name, status); + else /* success */ + host->reference_count++; + + status = plugin_register_notification (n_cb_name, + riemann_notification, &ud); + if (status != 0) + WARNING ("riemann plugin: plugin_register_notification (\"%s\") " + "failed with status %i.", + n_cb_name, status); + else /* success */ + host->reference_count++; + + if (host->reference_count <= 1) + { + /* Both callbacks failed => free memory. + * We need to unlock here, because riemann_free() will lock. + * This is not a race condition, because we're the only one + * holding a reference. */ + pthread_mutex_unlock (&host->lock); + riemann_free (host); + return (-1); + } + + host->reference_count--; + pthread_mutex_unlock (&host->lock); + + return status; +} + +static int +riemann_config(oconfig_item_t *ci) +{ + int i; + oconfig_item_t *child; + int status; + + for (i = 0; i < ci->children_num; i++) { + child = &ci->children[i]; + + if (strcasecmp(child->key, "host") == 0) { + riemann_config_host(child); + } else if (strcasecmp(child->key, "tag") == 0) { + char *tmp = NULL; + status = cf_util_get_string(child, &tmp); + if (status != 0) + continue; + + strarray_add (&riemann_tags, &riemann_tags_num, tmp); + DEBUG("riemann plugin: Got tag: %s", tmp); + sfree (tmp); + } else { + WARNING ("riemann plugin: Ignoring unknown " + "configuration option \"%s\" at top level.", + child->key); + } + } + return (0); +} + +void +module_register(void) +{ + DEBUG("riemann: module_register"); + + plugin_register_complex_config ("riemann", riemann_config); +} + +/* vim: set sw=8 sts=8 ts=8 noet : */