From cb1de9491fd5fb20eeec551487edf85e97b17579 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Fri, 25 Jan 2013 11:08:43 +0100 Subject: [PATCH] write_riemann plugin: Implement communication over TCP. --- src/collectd.conf.in | 1 + src/collectd.conf.pod | 11 +++++++++-- src/write_riemann.c | 40 ++++++++++++++++++++++++++++++++++++++-- 3 files changed, 48 insertions(+), 4 deletions(-) diff --git a/src/collectd.conf.in b/src/collectd.conf.in index 9f0390e4..699c0e0a 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -1065,6 +1065,7 @@ # # Host "localhost" # Port 5555 +# Protocol UDP # StoreRates true # AlwaysAppendDS false # diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 5ada55ab..aa2871d8 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -5546,7 +5546,9 @@ Synopsis: Host "localhost" Port "5555" - StoreRates false + Protocol UDP + StoreRates true + AlwaysAppendDS false Delay 10 Tag "foobar" @@ -5573,7 +5575,12 @@ Hostname or address to connect to. Defaults to C. Service name or port number to connect to. Defaults to C<5555>. -=item B B|B +=item B B|B + +Specify the protocol to use when communicating with I. Defaults to +B. + +=item B B|B If set to B (the default), convert counter values to rates. If set to B counter values are stored as is, i.e. as an increasing integer number. diff --git a/src/write_riemann.c b/src/write_riemann.c index 62d75f36..df0c3733 100644 --- a/src/write_riemann.c +++ b/src/write_riemann.c @@ -47,6 +47,7 @@ struct riemann_host { _Bool always_append_ds; char *node; char *service; + _Bool use_tcp; int s; int reference_count; @@ -118,6 +119,9 @@ riemann_send(struct riemann_host *host, Msg const *msg) } buffer_len = msg__get_packed_size(msg); + if (host->use_tcp) + buffer_len += 4; + buffer = malloc (buffer_len); if (buffer == NULL) { pthread_mutex_unlock (&host->lock); @@ -126,7 +130,16 @@ riemann_send(struct riemann_host *host, Msg const *msg) } memset (buffer, 0, buffer_len); - msg__pack(msg, buffer); + if (host->use_tcp) + { + uint32_t length = htonl ((uint32_t) (buffer_len - 4)); + memcpy (buffer, &length, 4); + msg__pack(msg, buffer + 4); + } + else + { + msg__pack(msg, buffer); + } status = (int) swrite (host->s, buffer, buffer_len); if (status != 0) @@ -460,7 +473,7 @@ riemann_connect(struct riemann_host *host) memset(&hints, 0, sizeof(hints)); memset(&service, 0, sizeof(service)); hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_DGRAM; + hints.ai_socktype = host->use_tcp ? SOCK_STREAM : SOCK_DGRAM; #ifdef AI_ADDRCONFIG hints.ai_flags |= AI_ADDRCONFIG; #endif @@ -562,6 +575,7 @@ riemann_config_node(oconfig_item_t *ci) host->service = NULL; host->store_rates = 1; host->always_append_ds = 0; + host->use_tcp = 0; status = cf_util_get_string (ci, &host->name); if (status != 0) { @@ -590,6 +604,28 @@ riemann_config_node(oconfig_item_t *ci) "option."); break; } + } else if (strcasecmp ("Protocol", child->key) == 0) { + char tmp[16]; + status = cf_util_get_string_buffer (child, + tmp, sizeof (tmp)); + if (status != 0) + { + ERROR ("write_riemann plugin: cf_util_get_" + "string_buffer failed with " + "status %i.", status); + break; + } + + if (strcasecmp ("UDP", tmp) == 0) + host->use_tcp = 0; + else if (strcasecmp ("TCP", tmp) == 0) + host->use_tcp = 1; + else + WARNING ("write_riemann plugin: The value " + "\"%s\" is not valid for the " + "\"Protocol\" option. Use " + "either \"UDP\" or \"TCP\".", + tmp); } else if (strcasecmp ("StoreRates", child->key) == 0) { status = cf_util_get_boolean (child, &host->store_rates); if (status != 0) -- 2.30.2