summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: c10c83d)
raw | patch | inline | side by side (parent: c10c83d)
author | Gergely Nagy <algernon@madhouse-project.org> | |
Tue, 5 Apr 2016 07:26:24 +0000 (09:26 +0200) | ||
committer | Gergely Nagy <algernon@madhouse-project.org> | |
Wed, 6 Apr 2016 07:34:22 +0000 (09:34 +0200) |
Instead of implementing a collectd-specific riemann client, use the
riemann-c-client library to talk to Riemann.
This also adds support for TLS, as an added bonus, and makes it easier to
improve the plugin later on.
Signed-off-by: Gergely Nagy <algernon@madhouse-project.org>
riemann-c-client library to talk to Riemann.
This also adds support for TLS, as an added bonus, and makes it easier to
improve the plugin later on.
Signed-off-by: Gergely Nagy <algernon@madhouse-project.org>
README | patch | blob | history | |
configure.ac | patch | blob | history | |
src/Makefile.am | patch | blob | history | |
src/write_riemann.c | patch | blob | history |
index 76b5487a442dabed57f21585c842d76d80ac9adc..5f2494544a947a364d053eebee75c180b0bd87b6 100644 (file)
--- a/README
+++ b/README
* libprotobuf-c, protoc-c (optional)
Used by the `pinba' plugin to generate a parser for the network packets
- sent by the Pinba PHP extension, and by the `write_riemann' plugin to
- generate events to be sent to a Riemann server.
+ sent by the Pinba PHP extension.
<http://code.google.com/p/protobuf-c/>
* libpython (optional)
`varnish' plugin.
<http://varnish-cache.org>
+ * riemann-c-client (optional)
+ For the `write_riemann' plugin.
+ <https://github.com/algernon/riemann-c-client>
+
Configuring / Compiling / Installing
------------------------------------
diff --git a/configure.ac b/configure.ac
index 333f29aedb22bb05f7df97a71d4da3b5a602fb75..fbd82ab99d28b8befcf915e2e46f682783319b49 100644 (file)
--- a/configure.ac
+++ b/configure.ac
[with_libnotify="no (pkg-config doesn't know libnotify)"]
)
+PKG_CHECK_MODULES([RIEMANN_C], [riemann-client >= 1.5.0],
+ [with_riemann_c="yes"],
+ [with_riemann_c="no (pbg-config doesn't know riemann-c-client)"])
+
# Check for enabled/disabled features
#
AC_PLUGIN([write_log], [yes], [Log output plugin])
AC_PLUGIN([write_mongodb], [$with_libmongoc], [MongoDB output plugin])
AC_PLUGIN([write_redis], [$with_libhiredis], [Redis output plugin])
-AC_PLUGIN([write_riemann], [$have_protoc_c], [Riemann output plugin])
+AC_PLUGIN([write_riemann], [$with_riemann_c], [Riemann output plugin])
AC_PLUGIN([write_sensu], [yes], [Sensu output plugin])
AC_PLUGIN([write_tsdb], [yes], [TSDB output plugin])
AC_PLUGIN([xmms], [$with_libxmms], [XMMS statistics])
oracle . . . . . . . $with_oracle
protobuf-c . . . . . $have_protoc_c
python . . . . . . . $with_python
+ riemann-c-client . . $with_riemann_c
Features:
daemon mode . . . . . $enable_daemon
diff --git a/src/Makefile.am b/src/Makefile.am
index 3832bd241dea8ddcc8542fe743238e822a9b290f..073b38013d2e8fcbd1dc2dc0f4eb3c1e0d563a47 100644 (file)
--- a/src/Makefile.am
+++ b/src/Makefile.am
if BUILD_PLUGIN_WRITE_RIEMANN
pkglib_LTLIBRARIES += write_riemann.la
write_riemann_la_SOURCES = write_riemann.c write_riemann_threshold.c write_riemann_threshold.h
-nodist_write_riemann_la_SOURCES = riemann.pb-c.c riemann.pb-c.h
-write_riemann_la_LDFLAGS = $(PLUGIN_LDFLAGS)
-write_riemann_la_LIBADD = -lprotobuf-c
+write_riemann_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(RIEMANN_C_LIBS)
+write_riemann_la_CFLAGS = $(AM_CFLAGS) $(RIEMANN_C_CFLAGS)
endif
if BUILD_PLUGIN_WRITE_SENSU
diff --git a/src/write_riemann.c b/src/write_riemann.c
index 8191ae256d61caec939dacd322c82fab8ca49fba..f7b0388d9e37c3fecadc9c37e6ad710102d006c5 100644 (file)
--- a/src/write_riemann.c
+++ b/src/write_riemann.c
* collectd - src/write_riemann.c
* Copyright (C) 2012,2013 Pierre-Yves Ritschard
* Copyright (C) 2013 Florian octo Forster
+ * Copyright (C) 2015,2016 Gergely Nagy
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* Authors:
* Pierre-Yves Ritschard <pyr at spootnik.org>
* Florian octo Forster <octo at collectd.org>
+ * Gergely Nagy <algernon at madhouse-project.org>
*/
-#include <arpa/inet.h>
+#include <riemann/riemann-client.h>
#include <errno.h>
-#include <netdb.h>
-#include <inttypes.h>
#include <pthread.h>
#include "collectd.h"
#include "common.h"
#include "configfile.h"
#include "utils_cache.h"
-#include "riemann.pb-c.h"
#include "write_riemann_threshold.h"
#define RIEMANN_HOST "localhost"
-#define RIEMANN_PORT "5555"
+#define RIEMANN_PORT 5555
#define RIEMANN_TTL_FACTOR 2.0
#define RIEMANN_BATCH_MAX 8192
struct riemann_host {
char *name;
char *event_service_prefix;
-#define F_CONNECT 0x01
- uint8_t flags;
pthread_mutex_t lock;
_Bool batch_mode;
_Bool notifications;
_Bool store_rates;
_Bool always_append_ds;
char *node;
- char *service;
- _Bool use_tcp;
- int s;
+ int port;
+ riemann_client_type_t client_type;
+ riemann_client_t *client;
double ttl_factor;
- Msg *batch_msg;
cdtime_t batch_init;
int batch_max;
int reference_count;
+ riemann_message_t *batch_msg;
+ char *tls_ca_file;
+ char *tls_cert_file;
+ char *tls_key_file;
};
static char **riemann_tags;
static char **riemann_attrs;
static size_t riemann_attrs_num;
-static void riemann_event_protobuf_free (Event *event) /* {{{ */
-{
- size_t i;
-
- if (event == NULL)
- return;
-
- sfree (event->state);
- sfree (event->service);
- sfree (event->host);
- sfree (event->description);
-
- strarray_free (event->tags, event->n_tags);
- event->tags = NULL;
- event->n_tags = 0;
-
- for (i = 0; i < event->n_attributes; i++)
- {
- sfree (event->attributes[i]->key);
- sfree (event->attributes[i]->value);
- sfree (event->attributes[i]);
- }
- sfree (event->attributes);
- event->n_attributes = 0;
-
- sfree (event);
-} /* }}} void riemann_event_protobuf_free */
-
-static void riemann_msg_protobuf_free(Msg *msg) /* {{{ */
-{
- size_t i;
-
- if (msg == NULL)
- return;
-
- for (i = 0; i < msg->n_events; i++)
- {
- riemann_event_protobuf_free (msg->events[i]);
- msg->events[i] = NULL;
- }
-
- sfree (msg->events);
- msg->n_events = 0;
-
- sfree (msg);
-} /* }}} void riemann_msg_protobuf_free */
-
/* host->lock must be held when calling this function. */
-static int riemann_connect(struct riemann_host *host) /* {{{ */
+static int wrr_connect(struct riemann_host *host) /* {{{ */
{
- int e;
- struct addrinfo *ai, *res, hints;
char const *node;
- char const *service;
+ int port;
- if (host->flags & F_CONNECT)
+ if (host->client)
return 0;
- memset(&hints, 0, sizeof(hints));
- memset(&service, 0, sizeof(service));
- hints.ai_family = AF_UNSPEC;
- hints.ai_socktype = host->use_tcp ? SOCK_STREAM : SOCK_DGRAM;
-#ifdef AI_ADDRCONFIG
- hints.ai_flags |= AI_ADDRCONFIG;
-#endif
-
node = (host->node != NULL) ? host->node : RIEMANN_HOST;
- service = (host->service != NULL) ? host->service : RIEMANN_PORT;
-
- if ((e = getaddrinfo(node, service, &hints, &res)) != 0) {
- ERROR ("write_riemann plugin: Unable to resolve host \"%s\": %s",
- node, gai_strerror(e));
+ port = (host->port) ? host->port : RIEMANN_PORT;
+
+ host->client = NULL;
+
+ host->client = riemann_client_create(host->client_type, node, port,
+ RIEMANN_CLIENT_OPTION_TLS_CA_FILE, host->tls_ca_file,
+ RIEMANN_CLIENT_OPTION_TLS_CERT_FILE, host->tls_cert_file,
+ RIEMANN_CLIENT_OPTION_TLS_KEY_FILE, host->tls_key_file,
+ RIEMANN_CLIENT_OPTION_NONE);
+ if (host->client == NULL) {
+ WARNING("write_riemann plugin: Unable to connect to Riemann at %s:%d",
+ node, port);
return -1;
}
+ DEBUG("write_riemann plugin: got a successful connection for: %s:%d",
+ node, port);
- host->s = -1;
- for (ai = res; ai != NULL; ai = ai->ai_next) {
- if ((host->s = socket(ai->ai_family,
- ai->ai_socktype,
- ai->ai_protocol)) == -1) {
- continue;
- }
-
- if (connect(host->s, ai->ai_addr, ai->ai_addrlen) != 0) {
- close(host->s);
- host->s = -1;
- continue;
- }
-
- host->flags |= F_CONNECT;
- DEBUG("write_riemann plugin: got a successful connection for: %s:%s",
- node, service);
- break;
- }
-
- freeaddrinfo(res);
-
- if (host->s < 0) {
- WARNING("write_riemann plugin: Unable to connect to Riemann at %s:%s",
- node, service);
- return -1;
- }
return 0;
-} /* }}} int riemann_connect */
+} /* }}} int wrr_connect */
/* host->lock must be held when calling this function. */
-static int riemann_disconnect (struct riemann_host *host) /* {{{ */
+static int wrr_disconnect(struct riemann_host *host) /* {{{ */
{
- if ((host->flags & F_CONNECT) == 0)
+ if (!host->client)
return (0);
- close (host->s);
- host->s = -1;
- host->flags &= ~F_CONNECT;
+ riemann_client_free(host->client);
+ host->client = NULL;
return (0);
-} /* }}} int riemann_disconnect */
-
-static int riemann_send_msg (struct riemann_host *host, const Msg *msg) /* {{{ */
-{
- int status = 0;
- u_char *buffer = NULL;
- size_t buffer_len;
-
- status = riemann_connect (host);
- if (status != 0)
- return status;
-
- buffer_len = msg__get_packed_size(msg);
-
- if (host->use_tcp)
- buffer_len += 4;
-
- buffer = calloc (1, buffer_len);
- if (buffer == NULL) {
- ERROR ("write_riemann plugin: calloc failed.");
- return ENOMEM;
- }
-
- if (host->use_tcp)
- {
- uint32_t length = htonl ((uint32_t) (buffer_len - 4));
- memcpy (buffer, &length, 4);
- msg__pack(msg, buffer + 4);
- }
- else
- {
- msg__pack(msg, buffer);
- }
-
- status = (int) swrite (host->s, buffer, buffer_len);
- if (status != 0)
- {
- char errbuf[1024];
- ERROR ("write_riemann plugin: Sending to Riemann at %s:%s failed: %s",
- (host->node != NULL) ? host->node : RIEMANN_HOST,
- (host->service != NULL) ? host->service : RIEMANN_PORT,
- sstrerror (errno, errbuf, sizeof (errbuf)));
- sfree (buffer);
- return -1;
- }
-
- sfree (buffer);
- return 0;
-} /* }}} int riemann_send_msg */
-
-static int riemann_recv_ack(struct riemann_host *host) /* {{{ */
-{
- int status = 0;
- Msg *msg = NULL;
- uint32_t header;
-
- status = (int) sread (host->s, &header, 4);
-
- if (status != 0)
- return -1;
-
- size_t size = ntohl(header);
-
- // Buffer on the stack since acknowledges are typically small.
- u_char buffer[size];
- memset (buffer, 0, size);
-
- status = (int) sread (host->s, buffer, size);
-
- if (status != 0)
- return status;
-
- msg = msg__unpack (NULL, size, buffer);
-
- if (msg == NULL)
- return -1;
-
- if (!msg->ok)
- {
- ERROR ("write_riemann plugin: Sending to Riemann at %s:%s acknowledgement message reported error: %s",
- (host->node != NULL) ? host->node : RIEMANN_HOST,
- (host->service != NULL) ? host->service : RIEMANN_PORT,
- msg->error);
-
- msg__free_unpacked(msg, NULL);
- return -1;
- }
-
- msg__free_unpacked (msg, NULL);
- return 0;
-} /* }}} int riemann_recv_ack */
+} /* }}} int wrr_disconnect */
/**
- * Function to send messages (Msg) to riemann.
+ * Function to send messages to riemann.
*
* Acquires the host lock, disconnects on errors.
*/
-static int riemann_send(struct riemann_host *host, Msg const *msg) /* {{{ */
+static int wrr_send(struct riemann_host *host, riemann_message_t *msg) /* {{{ */
{
int status = 0;
pthread_mutex_lock (&host->lock);
- status = riemann_send_msg(host, msg);
+ status = wrr_connect(host);
+ if (status != 0)
+ return status;
+
+ status = riemann_client_send_message(host->client, msg);
if (status != 0) {
- riemann_disconnect (host);
- pthread_mutex_unlock (&host->lock);
+ wrr_disconnect(host);
+ pthread_mutex_unlock(&host->lock);
return status;
}
/*
* For TCP we need to receive message acknowledgemenent.
*/
- if (host->use_tcp)
+ if (host->client_type != RIEMANN_CLIENT_UDP)
{
- status = riemann_recv_ack(host);
+ riemann_message_t *response;
- if (status != 0)
+ response = riemann_client_recv_message(host->client);
+
+ if (response == NULL)
{
- riemann_disconnect (host);
- pthread_mutex_unlock (&host->lock);
- return status;
+ wrr_disconnect(host);
+ pthread_mutex_unlock(&host->lock);
+ return errno;
}
+ riemann_message_free(response);
}
pthread_mutex_unlock (&host->lock);
return 0;
-} /* }}} int riemann_send */
-
-static int riemann_event_add_tag (Event *event, char const *tag) /* {{{ */
-{
- return (strarray_add (&event->tags, &event->n_tags, tag));
-} /* }}} int riemann_event_add_tag */
-
-static int riemann_event_add_attribute(Event *event, /* {{{ */
- char const *key, char const *value)
-{
- Attribute **new_attributes;
- Attribute *a;
-
- new_attributes = realloc (event->attributes,
- sizeof (*event->attributes) * (event->n_attributes + 1));
- if (new_attributes == NULL)
- {
- ERROR ("write_riemann plugin: realloc failed.");
- return (ENOMEM);
- }
- event->attributes = new_attributes;
+} /* }}} int wrr_send */
- a = malloc (sizeof (*a));
- if (a == NULL)
- {
- ERROR ("write_riemann plugin: malloc failed.");
- return (ENOMEM);
- }
- attribute__init (a);
-
- a->key = strdup (key);
- if (value != NULL)
- a->value = strdup (value);
-
- event->attributes[event->n_attributes] = a;
- event->n_attributes++;
-
- return (0);
-} /* }}} int riemann_event_add_attribute */
-
-static Msg *riemann_notification_to_protobuf(struct riemann_host *host, /* {{{ */
+static riemann_message_t *wrr_notification_to_message(struct riemann_host *host, /* {{{ */
notification_t const *n)
{
- Msg *msg;
- Event *event;
+ riemann_message_t *msg;
+ riemann_event_t *event;
char service_buffer[6 * DATA_MAX_NAME_LEN];
char const *severity;
notification_meta_t *meta;
size_t i;
- msg = calloc (1, sizeof (*msg));
- if (msg == NULL)
- {
- ERROR ("write_riemann plugin: calloc failed.");
- return (NULL);
- }
- msg__init (msg);
-
- msg->events = malloc (sizeof (*msg->events));
- if (msg->events == NULL)
- {
- ERROR ("write_riemann plugin: malloc failed.");
- sfree (msg);
- return (NULL);
- }
-
- event = calloc (1, sizeof (*event));
- if (event == NULL)
- {
- ERROR ("write_riemann plugin: calloc failed.");
- sfree (msg->events);
- sfree (msg);
- return (NULL);
- }
- event__init (event);
-
- msg->events[0] = event;
- msg->n_events = 1;
-
- event->host = strdup (n->host);
- event->time = CDTIME_T_TO_TIME_T (n->time);
- event->has_time = 1;
-
switch (n->severity)
{
case NOTIF_OKAY: severity = "ok"; break;
@@ -404,352 +174,389 @@ static Msg *riemann_notification_to_protobuf(struct riemann_host *host, /* {{{ *
case NOTIF_FAILURE: severity = "critical"; break;
default: severity = "unknown";
}
- event->state = strdup (severity);
- riemann_event_add_tag (event, "notification");
+ format_name(service_buffer, sizeof(service_buffer),
+ /* host = */ "", n->plugin, n->plugin_instance,
+ n->type, n->type_instance);
+
+ event = riemann_event_create(RIEMANN_EVENT_FIELD_HOST, n->host,
+ RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(n->time),
+ RIEMANN_EVENT_FIELD_TAGS, "notification", NULL,
+ RIEMANN_EVENT_FIELD_STATE, severity,
+ RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
+ RIEMANN_EVENT_FIELD_NONE);
+
if (n->host[0] != 0)
- riemann_event_add_attribute (event, "host", n->host);
+ riemann_event_attribute_add(event,
+ riemann_attribute_create("host", n->host));
if (n->plugin[0] != 0)
- riemann_event_add_attribute (event, "plugin", n->plugin);
+ riemann_event_attribute_add(event,
+ riemann_attribute_create("plugin", n->plugin));
if (n->plugin_instance[0] != 0)
- riemann_event_add_attribute (event, "plugin_instance",
- n->plugin_instance);
+ riemann_event_attribute_add(event,
+ riemann_attribute_create("plugin_instance",
+ n->plugin_instance));
if (n->type[0] != 0)
- riemann_event_add_attribute (event, "type", n->type);
+ riemann_event_attribute_add(event,
+ riemann_attribute_create("type", n->type));
if (n->type_instance[0] != 0)
- riemann_event_add_attribute (event, "type_instance",
- n->type_instance);
+ riemann_event_attribute_add(event,
+ riemann_attribute_create("type_instance",
+ n->type_instance));
for (i = 0; i < riemann_attrs_num; i += 2)
- riemann_event_add_attribute(event,
- riemann_attrs[i],
- riemann_attrs[i +1]);
+ riemann_event_attribute_add(event,
+ riemann_attribute_create(riemann_attrs[i],
+ riemann_attrs[i +1]));
for (i = 0; i < riemann_tags_num; i++)
- riemann_event_add_tag (event, riemann_tags[i]);
-
- format_name (service_buffer, sizeof (service_buffer),
- /* host = */ "", n->plugin, n->plugin_instance,
- n->type, n->type_instance);
- event->service = strdup (&service_buffer[1]);
+ riemann_event_tag_add(event, riemann_tags[i]);
if (n->message[0] != 0)
- riemann_event_add_attribute (event, "description", n->message);
+ riemann_event_attribute_add(event,
+ riemann_attribute_create("description", n->message));
/* Pull in values from threshold and add extra attributes */
for (meta = n->meta; meta != NULL; meta = meta->next)
{
- if (strcasecmp ("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE)
+ if (strcasecmp("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE)
{
- event->metric_d = meta->nm_value.nm_double;
- event->has_metric_d = 1;
+ riemann_event_set(event,
+ RIEMANN_EVENT_FIELD_METRIC_D,
+ (double) meta->nm_value.nm_double,
+ RIEMANN_EVENT_FIELD_NONE);
continue;
}
if (meta->type == NM_TYPE_STRING) {
- riemann_event_add_attribute (event, meta->name, meta->nm_value.nm_string);
+ riemann_event_attribute_add(event,
+ riemann_attribute_create(meta->name,
+ meta->nm_value.nm_string));
continue;
}
}
- DEBUG ("write_riemann plugin: Successfully created protobuf for notification: "
- "host = \"%s\", service = \"%s\", state = \"%s\"",
- event->host, event->service, event->state);
+ msg = riemann_message_create_with_events(event, NULL);
+ if (msg == NULL)
+ {
+ ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
+ riemann_event_free (event);
+ return (NULL);
+ }
+
+ DEBUG("write_riemann plugin: Successfully created message for notification: "
+ "host = \"%s\", service = \"%s\", state = \"%s\"",
+ event->host, event->service, event->state);
return (msg);
-} /* }}} Msg *riemann_notification_to_protobuf */
+} /* }}} riemann_message_t *wrr_notification_to_message */
-static Event *riemann_value_to_protobuf(struct riemann_host const *host, /* {{{ */
- data_set_t const *ds,
- value_list_t const *vl, size_t index,
- gauge_t const *rates,
- int status)
+static riemann_event_t *wrr_value_to_event(struct riemann_host const *host, /* {{{ */
+ data_set_t const *ds,
+ value_list_t const *vl, size_t index,
+ gauge_t const *rates,
+ int status)
{
- Event *event;
+ riemann_event_t *event;
char name_buffer[5 * DATA_MAX_NAME_LEN];
char service_buffer[6 * DATA_MAX_NAME_LEN];
- double ttl;
size_t i;
- event = calloc (1, sizeof (*event));
+ event = riemann_event_new();
if (event == NULL)
{
- ERROR ("write_riemann plugin: calloc failed.");
+ ERROR("write_riemann plugin: riemann_event_new() failed.");
return (NULL);
}
- event__init (event);
- event->host = strdup (vl->host);
- event->time = CDTIME_T_TO_TIME_T (vl->time);
- event->has_time = 1;
+ format_name(name_buffer, sizeof(name_buffer),
+ /* host = */ "", vl->plugin, vl->plugin_instance,
+ vl->type, vl->type_instance);
+ if (host->always_append_ds || (ds->ds_num > 1))
+ {
+ if (host->event_service_prefix == NULL)
+ ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s",
+ &name_buffer[1], ds->ds[index].name);
+ else
+ ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s",
+ host->event_service_prefix, &name_buffer[1], ds->ds[index].name);
+ }
+ else
+ {
+ if (host->event_service_prefix == NULL)
+ sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer));
+ else
+ ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
+ host->event_service_prefix, &name_buffer[1]);
+ }
+
+ riemann_event_set(event,
+ RIEMANN_EVENT_FIELD_HOST, vl->host,
+ RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(vl->time),
+ RIEMANN_EVENT_FIELD_TTL, (float) CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
+ RIEMANN_EVENT_FIELD_ATTRIBUTES,
+ riemann_attribute_create("plugin", vl->plugin),
+ riemann_attribute_create("type", vl->type),
+ riemann_attribute_create("ds_name", ds->ds[index].name),
+ NULL,
+ RIEMANN_EVENT_FIELD_SERVICE, service_buffer,
+ RIEMANN_EVENT_FIELD_NONE);
if (host->check_thresholds) {
+ const char *state = NULL;
+
switch (status) {
case STATE_OKAY:
- event->state = strdup("ok");
+ state = "ok";
break;
case STATE_ERROR:
- event->state = strdup("critical");
+ state = "critical";
break;
case STATE_WARNING:
- event->state = strdup("warning");
+ state = "warning";
break;
case STATE_MISSING:
- event->state = strdup("unknown");
+ state = "unknown";
break;
}
+ if (state)
+ riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state,
+ RIEMANN_EVENT_FIELD_NONE);
}
- ttl = CDTIME_T_TO_DOUBLE (vl->interval) * host->ttl_factor;
- event->ttl = (float) ttl;
- event->has_ttl = 1;
-
- riemann_event_add_attribute (event, "plugin", vl->plugin);
if (vl->plugin_instance[0] != 0)
- riemann_event_add_attribute (event, "plugin_instance",
- vl->plugin_instance);
-
- riemann_event_add_attribute (event, "type", vl->type);
+ riemann_event_attribute_add(event,
+ riemann_attribute_create("plugin_instance",
+ vl->plugin_instance));
if (vl->type_instance[0] != 0)
- riemann_event_add_attribute (event, "type_instance",
- vl->type_instance);
+ riemann_event_attribute_add(event,
+ riemann_attribute_create("type_instance",
+ vl->type_instance));
if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL))
{
char ds_type[DATA_MAX_NAME_LEN];
- ssnprintf (ds_type, sizeof (ds_type), "%s:rate",
- DS_TYPE_TO_STRING(ds->ds[index].type));
- riemann_event_add_attribute (event, "ds_type", ds_type);
+ ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
+ DS_TYPE_TO_STRING(ds->ds[index].type));
+ riemann_event_attribute_add(event,
+ riemann_attribute_create("ds_type", ds_type));
}
else
{
- riemann_event_add_attribute (event, "ds_type",
- DS_TYPE_TO_STRING(ds->ds[index].type));
+ riemann_event_attribute_add(event,
+ riemann_attribute_create("ds_type",
+ DS_TYPE_TO_STRING(ds->ds[index].type)));
}
- riemann_event_add_attribute (event, "ds_name", ds->ds[index].name);
+
{
char ds_index[DATA_MAX_NAME_LEN];
- ssnprintf (ds_index, sizeof (ds_index), "%zu", index);
- riemann_event_add_attribute (event, "ds_index", ds_index);
+ ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
+ riemann_event_attribute_add(event,
+ riemann_attribute_create("ds_index", ds_index));
}
for (i = 0; i < riemann_attrs_num; i += 2)
- riemann_event_add_attribute(event,
- riemann_attrs[i],
- riemann_attrs[i +1]);
+ riemann_event_attribute_add(event,
+ riemann_attribute_create(riemann_attrs[i],
+ riemann_attrs[i +1]));
for (i = 0; i < riemann_tags_num; i++)
- riemann_event_add_tag (event, riemann_tags[i]);
+ riemann_event_tag_add(event, riemann_tags[i]);
if (ds->ds[index].type == DS_TYPE_GAUGE)
{
- event->has_metric_d = 1;
- event->metric_d = (double) vl->values[index].gauge;
+ riemann_event_set(event,
+ RIEMANN_EVENT_FIELD_METRIC_D,
+ (double) vl->values[index].gauge,
+ RIEMANN_EVENT_FIELD_NONE);
}
else if (rates != NULL)
{
- event->has_metric_d = 1;
- event->metric_d = (double) rates[index];
+ riemann_event_set(event,
+ RIEMANN_EVENT_FIELD_METRIC_D,
+ (double) rates[index],
+ RIEMANN_EVENT_FIELD_NONE);
}
else
{
- event->has_metric_sint64 = 1;
+ int64_t metric;
+
if (ds->ds[index].type == DS_TYPE_DERIVE)
- event->metric_sint64 = (int64_t) vl->values[index].derive;
+ metric = (int64_t) vl->values[index].derive;
else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
- event->metric_sint64 = (int64_t) vl->values[index].absolute;
+ metric = (int64_t) vl->values[index].absolute;
else
- event->metric_sint64 = (int64_t) vl->values[index].counter;
- }
+ metric = (int64_t) vl->values[index].counter;
- format_name (name_buffer, sizeof (name_buffer),
- /* host = */ "", vl->plugin, vl->plugin_instance,
- vl->type, vl->type_instance);
- if (host->always_append_ds || (ds->ds_num > 1))
- {
- if (host->event_service_prefix == NULL)
- ssnprintf (service_buffer, sizeof (service_buffer), "%s/%s",
- &name_buffer[1], ds->ds[index].name);
- else
- ssnprintf (service_buffer, sizeof (service_buffer), "%s%s/%s",
- host->event_service_prefix, &name_buffer[1], ds->ds[index].name);
- }
- else
- {
- if (host->event_service_prefix == NULL)
- sstrncpy (service_buffer, &name_buffer[1], sizeof (service_buffer));
- else
- ssnprintf (service_buffer, sizeof (service_buffer), "%s%s",
- host->event_service_prefix, &name_buffer[1]);
+ riemann_event_set(event,
+ RIEMANN_EVENT_FIELD_METRIC_S64,
+ (int64_t) metric,
+ RIEMANN_EVENT_FIELD_NONE);
}
- event->service = strdup (service_buffer);
-
- DEBUG ("write_riemann plugin: Successfully created protobuf for metric: "
- "host = \"%s\", service = \"%s\"",
- event->host, event->service);
+ DEBUG("write_riemann plugin: Successfully created message for metric: "
+ "host = \"%s\", service = \"%s\"",
+ event->host, event->service);
return (event);
-} /* }}} Event *riemann_value_to_protobuf */
+} /* }}} riemann_event_t *wrr_value_to_event */
-static Msg *riemann_value_list_to_protobuf (struct riemann_host const *host, /* {{{ */
- data_set_t const *ds,
- value_list_t const *vl,
- int *statuses)
+static riemann_message_t *wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */
+ data_set_t const *ds,
+ value_list_t const *vl,
+ int *statuses)
{
- Msg *msg;
+ riemann_message_t *msg;
size_t i;
gauge_t *rates = NULL;
/* Initialize the Msg structure. */
- msg = calloc (1, sizeof (*msg));
+ msg = riemann_message_new();
if (msg == NULL)
{
- ERROR ("write_riemann plugin: calloc failed.");
- return (NULL);
- }
- msg__init (msg);
-
- /* Set up events. First, the list of pointers. */
- msg->n_events = vl->values_len;
- msg->events = calloc (msg->n_events, sizeof (*msg->events));
- if (msg->events == NULL)
- {
- ERROR ("write_riemann plugin: calloc failed.");
- riemann_msg_protobuf_free (msg);
+ ERROR ("write_riemann plugin: riemann_message_new failed.");
return (NULL);
}
if (host->store_rates)
{
- rates = uc_get_rate (ds, vl);
+ rates = uc_get_rate(ds, vl);
if (rates == NULL)
{
- ERROR ("write_riemann plugin: uc_get_rate failed.");
- riemann_msg_protobuf_free (msg);
+ ERROR("write_riemann plugin: uc_get_rate failed.");
+ riemann_message_free(msg);
return (NULL);
}
}
- for (i = 0; i < msg->n_events; i++)
+ for (i = 0; i < vl->values_len; i++)
{
- msg->events[i] = riemann_value_to_protobuf (host, ds, vl,
- (int) i, rates, statuses[i]);
- if (msg->events[i] == NULL)
+ riemann_event_t *event;
+
+ event = wrr_value_to_event(host, ds, vl,
+ (int) i, rates, statuses[i]);
+ if (event == NULL)
{
- riemann_msg_protobuf_free (msg);
- sfree (rates);
+ riemann_message_free(msg);
+ sfree(rates);
return (NULL);
}
+ riemann_message_append_events(msg, event, NULL);
}
- sfree (rates);
+ sfree(rates);
return (msg);
-} /* }}} Msg *riemann_value_list_to_protobuf */
-
+} /* }}} riemann_message_t *wrr_value_list_to_message */
/*
* Always call while holding host->lock !
*/
-static int riemann_batch_flush_nolock (cdtime_t timeout,
- struct riemann_host *host)
+static int wrr_batch_flush_nolock(cdtime_t timeout,
+ struct riemann_host *host)
{
- cdtime_t now;
- int status = 0;
+ cdtime_t now;
+ int status = 0;
- if (timeout > 0) {
- now = cdtime ();
- if ((host->batch_init + timeout) > now)
- return status;
- }
- riemann_send_msg(host, host->batch_msg);
- riemann_msg_protobuf_free(host->batch_msg);
+ if (timeout > 0) {
+ now = cdtime();
+ if ((host->batch_init + timeout) > now)
+ return status;
+ }
+ wrr_send(host, host->batch_msg);
+ riemann_message_free(host->batch_msg);
+
+ if (host->client_type != RIEMANN_CLIENT_UDP)
+ {
+ riemann_message_t *response;
- if (host->use_tcp && ((status = riemann_recv_ack(host)) != 0))
- riemann_disconnect (host);
+ response = riemann_client_recv_message(host->client);
- host->batch_init = cdtime();
- host->batch_msg = NULL;
- return status;
+ if (!response)
+ {
+ wrr_disconnect(host);
+ return errno;
+ }
+
+ riemann_message_free(response);
+ }
+
+ host->batch_init = cdtime();
+ host->batch_msg = NULL;
+ return status;
}
-static int riemann_batch_flush (cdtime_t timeout,
+static int wrr_batch_flush(cdtime_t timeout,
const char *identifier __attribute__((unused)),
user_data_t *user_data)
{
- struct riemann_host *host;
- int status;
+ struct riemann_host *host;
+ int status;
- if (user_data == NULL)
- return (-EINVAL);
+ if (user_data == NULL)
+ return (-EINVAL);
- host = user_data->data;
- pthread_mutex_lock (&host->lock);
- status = riemann_batch_flush_nolock (timeout, host);
- if (status != 0)
- ERROR ("write_riemann plugin: riemann_send failed with status %i",
- status);
+ host = user_data->data;
+ pthread_mutex_lock(&host->lock);
+ status = wrr_batch_flush_nolock(timeout, host);
+ if (status != 0)
+ ERROR("write_riemann plugin: riemann_client_send failed with status %i",
+ status);
- pthread_mutex_unlock(&host->lock);
- return status;
+ pthread_mutex_unlock(&host->lock);
+ return status;
}
-static int riemann_batch_add_value_list (struct riemann_host *host, /* {{{ */
- data_set_t const *ds,
- value_list_t const *vl,
- int *statuses)
+static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
+ data_set_t const *ds,
+ value_list_t const *vl,
+ int *statuses)
{
- size_t i;
- Event **events;
- Msg *msg;
- size_t len;
- int ret;
+ riemann_message_t *msg;
+ size_t len;
+ int ret;
- msg = riemann_value_list_to_protobuf (host, ds, vl, statuses);
- if (msg == NULL)
- return -1;
+ msg = wrr_value_list_to_message(host, ds, vl, statuses);
+ if (msg == NULL)
+ return -1;
- pthread_mutex_lock(&host->lock);
+ pthread_mutex_lock(&host->lock);
- if (host->batch_msg == NULL) {
- host->batch_msg = msg;
- } else {
- len = msg->n_events + host->batch_msg->n_events;
- events = realloc(host->batch_msg->events,
- (len * sizeof(*host->batch_msg->events)));
- if (events == NULL) {
- pthread_mutex_unlock(&host->lock);
- ERROR ("write_riemann plugin: out of memory");
- riemann_msg_protobuf_free (msg);
- return -1;
- }
- host->batch_msg->events = events;
+ if (host->batch_msg == NULL) {
+ host->batch_msg = msg;
+ } else {
+ int status;
- for (i = host->batch_msg->n_events; i < len; i++)
- host->batch_msg->events[i] = msg->events[i - host->batch_msg->n_events];
+ status = riemann_message_append_events_n(host->batch_msg,
+ msg->n_events,
+ msg->events);
+ msg->n_events = 0;
+ msg->events = NULL;
- host->batch_msg->n_events = len;
- sfree (msg->events);
- msg->n_events = 0;
- sfree (msg);
- }
+ riemann_message_free(msg);
- len = msg__get_packed_size(host->batch_msg);
- ret = 0;
- if ((host->batch_max < 0) || (((size_t) host->batch_max) <= len)) {
- ret = riemann_batch_flush_nolock(0, host);
- }
+ if (status != 0) {
+ pthread_mutex_unlock(&host->lock);
+ ERROR("write_riemann plugin: out of memory");
+ return -1;
+ }
+ }
- pthread_mutex_unlock(&host->lock);
- return ret;
-} /* }}} Msg *riemann_batch_add_value_list */
+ len = protobuf_c_message_get_packed_size((const ProtobufCMessage*)(host->batch_msg));
+ ret = 0;
+ if ((host->batch_max < 0) || (((size_t) host->batch_max) <= len)) {
+ ret = wrr_batch_flush_nolock(0, host);
+ }
+
+ pthread_mutex_unlock(&host->lock);
+ return ret;
+} /* }}} riemann_message_t *wrr_batch_add_value_list */
-static int riemann_notification(const notification_t *n, user_data_t *ud) /* {{{ */
+static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */
{
int status;
struct riemann_host *host = ud->data;
- Msg *msg;
+ riemann_message_t *msg;
if (!host->notifications)
return 0;
@@ -757,322 +564,352 @@ static int riemann_notification(const notification_t *n, user_data_t *ud) /* {{{
/*
* Never batch for notifications, send them ASAP
*/
- msg = riemann_notification_to_protobuf (host, n);
+ msg = wrr_notification_to_message(host, n);
if (msg == NULL)
return (-1);
- status = riemann_send (host, msg);
+ status = wrr_send(host, msg);
if (status != 0)
- ERROR ("write_riemann plugin: riemann_send failed with status %i",
- status);
+ ERROR("write_riemann plugin: riemann_client_send failed with status %i",
+ status);
- riemann_msg_protobuf_free (msg);
+ riemann_message_free(msg);
return (status);
-} /* }}} int riemann_notification */
+} /* }}} int wrr_notification */
-static int riemann_write(const data_set_t *ds, /* {{{ */
- const value_list_t *vl,
- user_data_t *ud)
+static int wrr_write(const data_set_t *ds, /* {{{ */
+ const value_list_t *vl,
+ user_data_t *ud)
{
int status = 0;
int statuses[vl->values_len];
struct riemann_host *host = ud->data;
+ riemann_message_t *msg;
if (host->check_thresholds) {
status = write_riemann_threshold_check(ds, vl, statuses);
- if (status != 0)
- return status;
- } else {
- memset (statuses, 0, sizeof (statuses));
- }
-
- if (host->use_tcp == 1 && host->batch_mode) {
- riemann_batch_add_value_list (host, ds, vl, statuses);
- } else {
- Msg *msg = riemann_value_list_to_protobuf (host, ds, vl, statuses);
- if (msg == NULL)
- return (-1);
-
- status = riemann_send (host, msg);
- if (status != 0)
- ERROR ("write_riemann plugin: riemann_send failed with status %i", status);
+ if (status != 0)
+ return status;
+ } else {
+ memset (statuses, 0, sizeof (statuses));
+ }
+
+ if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
+ wrr_batch_add_value_list(host, ds, vl, statuses);
+ } else {
+ msg = wrr_value_list_to_message(host, ds, vl, statuses);
+ if (msg == NULL)
+ return (-1);
- riemann_msg_protobuf_free (msg);
- }
+ status = wrr_send(host, msg);
+ if (status != 0)
+ ERROR("write_riemann plugin: riemann_client_send failed with status %i",
+ status);
- return status;
-} /* }}} int riemann_write */
+ riemann_message_free(msg);
+ }
+ return status;
+} /* }}} int wrr_write */
-static void riemann_free(void *p) /* {{{ */
+static void wrr_free(void *p) /* {{{ */
{
- struct riemann_host *host = p;
+ struct riemann_host *host = p;
- if (host == NULL)
- return;
+ if (host == NULL)
+ return;
- pthread_mutex_lock (&host->lock);
+ pthread_mutex_lock(&host->lock);
- host->reference_count--;
- if (host->reference_count > 0)
- {
- pthread_mutex_unlock (&host->lock);
- return;
- }
+ host->reference_count--;
+ if (host->reference_count > 0)
+ {
+ pthread_mutex_unlock(&host->lock);
+ return;
+ }
- riemann_disconnect (host);
+ wrr_disconnect(host);
- sfree(host->service);
- pthread_mutex_destroy (&host->lock);
- sfree(host);
-} /* }}} void riemann_free */
+ pthread_mutex_destroy(&host->lock);
+ sfree(host);
+} /* }}} void wrr_free */
-static int riemann_config_node(oconfig_item_t *ci) /* {{{ */
+static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
{
- struct riemann_host *host = NULL;
- int status = 0;
- int i;
- oconfig_item_t *child;
- char callback_name[DATA_MAX_NAME_LEN];
- user_data_t ud;
-
- if ((host = calloc(1, sizeof (*host))) == NULL) {
- ERROR ("write_riemann plugin: calloc failed.");
- return ENOMEM;
- }
- pthread_mutex_init (&host->lock, NULL);
- host->reference_count = 1;
- host->node = NULL;
- host->service = NULL;
- host->notifications = 1;
- host->check_thresholds = 0;
- host->store_rates = 1;
- host->always_append_ds = 0;
- host->use_tcp = 1;
- host->batch_mode = 1;
- host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
- host->batch_init = cdtime();
- host->ttl_factor = RIEMANN_TTL_FACTOR;
-
- status = cf_util_get_string (ci, &host->name);
- if (status != 0) {
- WARNING("write_riemann plugin: Required host name is missing.");
- riemann_free (host);
- return -1;
- }
-
- for (i = 0; i < ci->children_num; i++) {
- /*
- * The code here could be simplified but makes room
- * for easy adding of new options later on.
- */
- child = &ci->children[i];
- status = 0;
-
- if (strcasecmp ("Host", child->key) == 0) {
- status = cf_util_get_string (child, &host->node);
- if (status != 0)
- break;
- } else if (strcasecmp ("Notifications", child->key) == 0) {
- status = cf_util_get_boolean(child, &host->notifications);
- if (status != 0)
- break;
- } else if (strcasecmp ("EventServicePrefix", child->key) == 0) {
- status = cf_util_get_string (child, &host->event_service_prefix);
- if (status != 0)
- break;
- } else if (strcasecmp ("CheckThresholds", child->key) == 0) {
- status = cf_util_get_boolean(child, &host->check_thresholds);
- if (status != 0)
- break;
- } else if (strcasecmp ("Batch", child->key) == 0) {
- status = cf_util_get_boolean(child, &host->batch_mode);
- if (status != 0)
- break;
- } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
- status = cf_util_get_int(child, &host->batch_max);
- if (status != 0)
- break;
- } else if (strcasecmp ("Port", child->key) == 0) {
- status = cf_util_get_service (child, &host->service);
- if (status != 0) {
- ERROR ("write_riemann plugin: Invalid argument "
- "configured for the \"Port\" "
- "option.");
- break;
- }
- } else if (strcasecmp ("Protocol", child->key) == 0) {
- char tmp[16];
- status = cf_util_get_string_buffer (child,
- tmp, sizeof (tmp));
- if (status != 0)
- {
- ERROR ("write_riemann plugin: cf_util_get_"
- "string_buffer failed with "
- "status %i.", status);
- break;
- }
-
- if (strcasecmp ("UDP", tmp) == 0)
- host->use_tcp = 0;
- else if (strcasecmp ("TCP", tmp) == 0)
- host->use_tcp = 1;
- else
- WARNING ("write_riemann plugin: The value "
- "\"%s\" is not valid for the "
- "\"Protocol\" option. Use "
- "either \"UDP\" or \"TCP\".",
- tmp);
- } else if (strcasecmp ("StoreRates", child->key) == 0) {
- status = cf_util_get_boolean (child, &host->store_rates);
- if (status != 0)
- break;
- } else if (strcasecmp ("AlwaysAppendDS", child->key) == 0) {
- status = cf_util_get_boolean (child,
- &host->always_append_ds);
- if (status != 0)
- break;
- } else if (strcasecmp ("TTLFactor", child->key) == 0) {
- double tmp = NAN;
- status = cf_util_get_double (child, &tmp);
- if (status != 0)
- break;
- if (tmp >= 2.0) {
- host->ttl_factor = tmp;
- } else if (tmp >= 1.0) {
- NOTICE ("write_riemann plugin: The configured "
- "TTLFactor is very small "
- "(%.1f). A value of 2.0 or "
- "greater is recommended.",
- tmp);
- host->ttl_factor = tmp;
- } else if (tmp > 0.0) {
- WARNING ("write_riemann plugin: The configured "
- "TTLFactor is too small to be "
- "useful (%.1f). I'll use it "
- "since the user knows best, "
- "but under protest.",
- tmp);
- host->ttl_factor = tmp;
- } else { /* zero, negative and NAN */
- ERROR ("write_riemann plugin: The configured "
- "TTLFactor is invalid (%.1f).",
- tmp);
- }
- } else {
- WARNING("write_riemann plugin: ignoring unknown config "
- "option: \"%s\"", child->key);
- }
- }
- if (status != 0) {
- riemann_free (host);
- return status;
- }
-
- ssnprintf (callback_name, sizeof (callback_name), "write_riemann/%s",
- host->name);
- ud.data = host;
- ud.free_func = riemann_free;
-
- pthread_mutex_lock (&host->lock);
-
- status = plugin_register_write (callback_name, riemann_write, &ud);
+ struct riemann_host *host = NULL;
+ int status = 0;
+ int i;
+ oconfig_item_t *child;
+ char callback_name[DATA_MAX_NAME_LEN];
+ user_data_t ud;
+
+ if ((host = calloc(1, sizeof(*host))) == NULL) {
+ ERROR ("write_riemann plugin: calloc failed.");
+ return ENOMEM;
+ }
+ pthread_mutex_init(&host->lock, NULL);
+ host->reference_count = 1;
+ host->node = NULL;
+ host->port = 0;
+ host->notifications = 1;
+ host->check_thresholds = 0;
+ host->store_rates = 1;
+ host->always_append_ds = 0;
+ host->batch_mode = 1;
+ host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
+ host->batch_init = cdtime();
+ host->ttl_factor = RIEMANN_TTL_FACTOR;
+ host->client = NULL;
+ host->client_type = RIEMANN_CLIENT_TCP;
+
+ status = cf_util_get_string(ci, &host->name);
+ if (status != 0) {
+ WARNING("write_riemann plugin: Required host name is missing.");
+ wrr_free(host);
+ return -1;
+ }
+
+ for (i = 0; i < ci->children_num; i++) {
+ /*
+ * The code here could be simplified but makes room
+ * for easy adding of new options later on.
+ */
+ child = &ci->children[i];
+ status = 0;
+
+ if (strcasecmp("Host", child->key) == 0) {
+ status = cf_util_get_string(child, &host->node);
+ if (status != 0)
+ break;
+ } else if (strcasecmp("Notifications", child->key) == 0) {
+ status = cf_util_get_boolean(child, &host->notifications);
+ if (status != 0)
+ break;
+ } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
+ status = cf_util_get_string(child, &host->event_service_prefix);
+ if (status != 0)
+ break;
+ } else if (strcasecmp("CheckThresholds", child->key) == 0) {
+ status = cf_util_get_boolean(child, &host->check_thresholds);
+ if (status != 0)
+ break;
+ } else if (strcasecmp("Batch", child->key) == 0) {
+ status = cf_util_get_boolean(child, &host->batch_mode);
+ if (status != 0)
+ break;
+ } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
+ status = cf_util_get_int(child, &host->batch_max);
+ if (status != 0)
+ break;
+ } else if (strcasecmp("Port", child->key) == 0) {
+ host->port = cf_util_get_port_number(child);
+ if (host->port == -1) {
+ ERROR("write_riemann plugin: Invalid argument "
+ "configured for the \"Port\" "
+ "option.");
+ break;
+ }
+ } else if (strcasecmp("Protocol", child->key) == 0) {
+ char tmp[16];
+ status = cf_util_get_string_buffer(child,
+ tmp, sizeof(tmp));
+ if (status != 0)
+ {
+ ERROR("write_riemann plugin: cf_util_get_"
+ "string_buffer failed with "
+ "status %i.", status);
+ break;
+ }
- if (host->use_tcp == 1 && host->batch_mode) {
- ud.free_func = NULL;
- plugin_register_flush(callback_name, riemann_batch_flush, &ud);
+ if (strcasecmp("UDP", tmp) == 0)
+ host->client_type = RIEMANN_CLIENT_UDP;
+ else if (strcasecmp("TCP", tmp) == 0)
+ host->client_type = RIEMANN_CLIENT_TCP;
+ else if (strcasecmp("TLS", tmp) == 0)
+ host->client_type = RIEMANN_CLIENT_TLS;
+ else
+ WARNING("write_riemann plugin: The value "
+ "\"%s\" is not valid for the "
+ "\"Protocol\" option. Use "
+ "either \"UDP\", \"TCP\" or \"TLS\".",
+ tmp);
+ } else if (strcasecmp("TLSCAFile", child->key) == 0) {
+ status = cf_util_get_string(child, &host->tls_ca_file);
+ if (status != 0)
+ {
+ ERROR("write_riemann plugin: cf_util_get_"
+ "string_buffer failed with "
+ "status %i.", status);
+ break;
+ }
+ } else if (strcasecmp("TLSCertFile", child->key) == 0) {
+ status = cf_util_get_string(child, &host->tls_cert_file);
+ if (status != 0)
+ {
+ ERROR("write_riemann plugin: cf_util_get_"
+ "string_buffer failed with "
+ "status %i.", status);
+ break;
+ }
+ } else if (strcasecmp("TLSKeyFile", child->key) == 0) {
+ status = cf_util_get_string(child, &host->tls_key_file);
+ if (status != 0)
+ {
+ ERROR("write_riemann plugin: cf_util_get_"
+ "string_buffer failed with "
+ "status %i.", status);
+ break;
+ }
+ } else if (strcasecmp("StoreRates", child->key) == 0) {
+ status = cf_util_get_boolean(child, &host->store_rates);
+ if (status != 0)
+ break;
+ } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
+ status = cf_util_get_boolean(child,
+ &host->always_append_ds);
+ if (status != 0)
+ break;
+ } else if (strcasecmp("TTLFactor", child->key) == 0) {
+ double tmp = NAN;
+ status = cf_util_get_double(child, &tmp);
+ if (status != 0)
+ break;
+ if (tmp >= 2.0) {
+ host->ttl_factor = tmp;
+ } else if (tmp >= 1.0) {
+ NOTICE("write_riemann plugin: The configured "
+ "TTLFactor is very small "
+ "(%.1f). A value of 2.0 or "
+ "greater is recommended.",
+ tmp);
+ host->ttl_factor = tmp;
+ } else if (tmp > 0.0) {
+ WARNING("write_riemann plugin: The configured "
+ "TTLFactor is too small to be "
+ "useful (%.1f). I'll use it "
+ "since the user knows best, "
+ "but under protest.",
+ tmp);
+ host->ttl_factor = tmp;
+ } else { /* zero, negative and NAN */
+ ERROR("write_riemann plugin: The configured "
+ "TTLFactor is invalid (%.1f).",
+ tmp);
+ }
+ } else {
+ WARNING("write_riemann plugin: ignoring unknown config "
+ "option: \"%s\"", child->key);
+ }
+ }
+ if (status != 0) {
+ wrr_free(host);
+ return status;
+ }
+
+ ssnprintf(callback_name, sizeof(callback_name), "write_riemann/%s",
+ host->name);
+ ud.data = host;
+ ud.free_func = wrr_free;
+
+ pthread_mutex_lock(&host->lock);
+
+ status = plugin_register_write(callback_name, wrr_write, &ud);
+
+ if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
+ ud.free_func = NULL;
+ plugin_register_flush(callback_name, wrr_batch_flush, &ud);
+ }
+ if (status != 0)
+ WARNING("write_riemann plugin: plugin_register_write (\"%s\") "
+ "failed with status %i.",
+ callback_name, status);
+ else /* success */
+ host->reference_count++;
+
+ status = plugin_register_notification(callback_name,
+ wrr_notification, &ud);
+ if (status != 0)
+ WARNING("write_riemann plugin: plugin_register_notification (\"%s\") "
+ "failed with status %i.",
+ callback_name, status);
+ else /* success */
+ host->reference_count++;
+
+ if (host->reference_count <= 1)
+ {
+ /* Both callbacks failed => free memory.
+ * We need to unlock here, because riemann_free() will lock.
+ * This is not a race condition, because we're the only one
+ * holding a reference. */
+ pthread_mutex_unlock(&host->lock);
+ wrr_free(host);
+ return (-1);
}
- if (status != 0)
- WARNING ("write_riemann plugin: plugin_register_write (\"%s\") "
- "failed with status %i.",
- callback_name, status);
- else /* success */
- host->reference_count++;
-
- status = plugin_register_notification (callback_name,
- riemann_notification, &ud);
- if (status != 0)
- WARNING ("write_riemann plugin: plugin_register_notification (\"%s\") "
- "failed with status %i.",
- callback_name, status);
- else /* success */
- host->reference_count++;
- if (host->reference_count <= 1)
- {
- /* Both callbacks failed => free memory.
- * We need to unlock here, because riemann_free() will lock.
- * This is not a race condition, because we're the only one
- * holding a reference. */
- pthread_mutex_unlock (&host->lock);
- riemann_free (host);
- return (-1);
- }
+ host->reference_count--;
+ pthread_mutex_unlock(&host->lock);
- host->reference_count--;
- pthread_mutex_unlock (&host->lock);
+ return status;
+} /* }}} int wrr_config_node */
- return status;
-} /* }}} int riemann_config_node */
-
-static int riemann_config(oconfig_item_t *ci) /* {{{ */
+static int wrr_config(oconfig_item_t *ci) /* {{{ */
{
- int i;
- oconfig_item_t *child;
- int status;
-
- for (i = 0; i < ci->children_num; i++) {
- child = &ci->children[i];
-
- if (strcasecmp("Node", child->key) == 0) {
- riemann_config_node (child);
- } else if (strcasecmp(child->key, "attribute") == 0) {
- char *key = NULL;
- char *val = NULL;
-
- if (child->values_num != 2) {
- WARNING("riemann attributes need both a key and a value.");
- return (-1);
- }
- if (child->values[0].type != OCONFIG_TYPE_STRING ||
- child->values[1].type != OCONFIG_TYPE_STRING) {
- WARNING("riemann attribute needs string arguments.");
- return (-1);
- }
- if ((key = strdup(child->values[0].value.string)) == NULL) {
- WARNING("cannot allocate memory for attribute key.");
- return (-1);
- }
- if ((val = strdup(child->values[1].value.string)) == NULL) {
- WARNING("cannot allocate memory for attribute value.");
- sfree (key);
- return (-1);
- }
- strarray_add(&riemann_attrs, &riemann_attrs_num, key);
- strarray_add(&riemann_attrs, &riemann_attrs_num, val);
- DEBUG("write_riemann: got attr: %s => %s", key, val);
- sfree(key);
- sfree(val);
- } else if (strcasecmp(child->key, "tag") == 0) {
- char *tmp = NULL;
- status = cf_util_get_string(child, &tmp);
- if (status != 0)
- continue;
-
- strarray_add (&riemann_tags, &riemann_tags_num, tmp);
- DEBUG("write_riemann plugin: Got tag: %s", tmp);
- sfree (tmp);
- } else {
- WARNING ("write_riemann plugin: Ignoring unknown "
- "configuration option \"%s\" at top level.",
- child->key);
- }
- }
- return (0);
-} /* }}} int riemann_config */
+ int i;
+ oconfig_item_t *child;
+ int status;
+
+ for (i = 0; i < ci->children_num; i++) {
+ child = &ci->children[i];
+
+ if (strcasecmp("Node", child->key) == 0) {
+ wrr_config_node (child);
+ } else if (strcasecmp(child->key, "attribute") == 0) {
+ char *key = NULL;
+ char *val = NULL;
+
+ if (child->values_num != 2) {
+ WARNING("riemann attributes need both a key and a value.");
+ return (-1);
+ }
+ if (child->values[0].type != OCONFIG_TYPE_STRING ||
+ child->values[1].type != OCONFIG_TYPE_STRING) {
+ WARNING("riemann attribute needs string arguments.");
+ return (-1);
+ }
+ if ((key = strdup(child->values[0].value.string)) == NULL) {
+ WARNING("cannot allocate memory for attribute key.");
+ return (-1);
+ }
+ if ((val = strdup(child->values[1].value.string)) == NULL) {
+ WARNING("cannot allocate memory for attribute value.");
+ sfree(key);
+ return (-1);
+ }
+ strarray_add(&riemann_attrs, &riemann_attrs_num, key);
+ strarray_add(&riemann_attrs, &riemann_attrs_num, val);
+ DEBUG("write_riemann: got attr: %s => %s", key, val);
+ sfree(key);
+ sfree(val);
+ } else if (strcasecmp(child->key, "tag") == 0) {
+ char *tmp = NULL;
+ status = cf_util_get_string(child, &tmp);
+ if (status != 0)
+ continue;
+
+ strarray_add(&riemann_tags, &riemann_tags_num, tmp);
+ DEBUG("write_riemann plugin: Got tag: %s", tmp);
+ sfree(tmp);
+ } else {
+ WARNING("write_riemann plugin: Ignoring unknown "
+ "configuration option \"%s\" at top level.",
+ child->key);
+ }
+ }
+ return (0);
+} /* }}} int wrr_config */
void module_register(void)
{
- plugin_register_complex_config ("write_riemann", riemann_config);
+ plugin_register_complex_config("write_riemann", wrr_config);
}
/* vim: set sw=8 sts=8 ts=8 noet : */