From: John-John Tedro Date: Tue, 17 Sep 2013 13:40:13 +0000 (+0200) Subject: write_riemann plugin: Receive acknowledge message when using TCP. X-Git-Tag: collectd-5.5.0~343^2 X-Git-Url: https://git.tokkee.org/?a=commitdiff_plain;h=78c0678a23963a829e41f03b5540bcb3dc585b59;p=collectd.git write_riemann plugin: Receive acknowledge message when using TCP. Not receiving an acknowledge message when communicating with riemann over TCP will cause the riemann instance to eventually hang for extended periods of time because of resource exhaustion. Took the time to reaorganize the riemann_send function to simplify locking. --- diff --git a/src/write_riemann.c b/src/write_riemann.c index 3345d044..3261e629 100644 --- a/src/write_riemann.c +++ b/src/write_riemann.c @@ -178,32 +178,30 @@ riemann_disconnect (struct riemann_host *host) return (0); } -static int -riemann_send(struct riemann_host *host, Msg const *msg) +static inline int +riemann_send_msg(struct riemann_host *host, const Msg *msg) { - u_char *buffer; + int status = 0; + u_char *buffer = NULL; size_t buffer_len; - int status; - - pthread_mutex_lock (&host->lock); status = riemann_connect (host); + if (status != 0) - { - pthread_mutex_unlock (&host->lock); return status; - } buffer_len = msg__get_packed_size(msg); + if (host->use_tcp) buffer_len += 4; buffer = malloc (buffer_len); + if (buffer == NULL) { - pthread_mutex_unlock (&host->lock); ERROR ("write_riemann plugin: malloc failed."); return ENOMEM; } + memset (buffer, 0, buffer_len); if (host->use_tcp) @@ -218,26 +216,105 @@ riemann_send(struct riemann_host *host, Msg const *msg) } status = (int) swrite (host->s, buffer, buffer_len); + if (status != 0) { char errbuf[1024]; - riemann_disconnect (host); - pthread_mutex_unlock (&host->lock); - ERROR ("write_riemann plugin: Sending to Riemann at %s:%s failed: %s", (host->node != NULL) ? host->node : RIEMANN_HOST, (host->service != NULL) ? host->service : RIEMANN_PORT, sstrerror (errno, errbuf, sizeof (errbuf))); + sfree (buffer); return -1; } - pthread_mutex_unlock (&host->lock); sfree (buffer); return 0; } +static inline int +riemann_recv_ack(struct riemann_host *host) +{ + int status = 0; + Msg *msg = NULL; + uint32_t header; + + status = (int) sread (host->s, &header, 4); + + if (status != 0) + return -1; + + size_t size = ntohl(header); + + // Buffer on the stack since acknowledges are typically small. + u_char buffer[size]; + memset (buffer, 0, size); + + status = (int) sread (host->s, buffer, size); + + if (status != 0) + return status; + + msg = msg__unpack (NULL, size, buffer); + + if (msg == NULL) + return -1; + + if (!msg->ok) + { + ERROR ("write_riemann plugin: Sending to Riemann at %s:%s acknowledgement message reported error: %s", + (host->node != NULL) ? host->node : RIEMANN_HOST, + (host->service != NULL) ? host->service : RIEMANN_PORT, + msg->error); + + msg__free_unpacked(msg, NULL); + return -1; + } + + msg__free_unpacked (msg, NULL); + return 0; +} + +/** + * Function to send messages (Msg) to riemann. + * + * Acquires the host lock, disconnects on errors. + */ +static int +riemann_send(struct riemann_host *host, Msg const *msg) +{ + int status = 0; + pthread_mutex_lock (&host->lock); + + status = riemann_send_msg(host, msg); + + if (status != 0) { + riemann_disconnect (host); + pthread_mutex_unlock (&host->lock); + return status; + } + + /* + * For TCP we need to receive message acknowledgemenent. + */ + if (host->use_tcp) + { + status = riemann_recv_ack(host); + + if (status != 0) + { + riemann_disconnect (host); + pthread_mutex_unlock (&host->lock); + return status; + } + } + + pthread_mutex_unlock (&host->lock); + return 0; +} + static int riemann_event_add_tag (Event *event, char const *tag) /* {{{ */ { return (strarray_add (&event->tags, &event->n_tags, tag));