Code

write_riemann plugin: Receive acknowledge message when using TCP.
authorJohn-John Tedro <udoprog@spotify.com>
Tue, 17 Sep 2013 13:40:13 +0000 (15:40 +0200)
committerMarc Fournier <marc.fournier@camptocamp.com>
Mon, 3 Mar 2014 14:29:52 +0000 (15:29 +0100)
Not receiving an acknowledge message when communicating with riemann over TCP
will cause the riemann instance to eventually hang for extended periods of time
because of resource exhaustion.

Took the time to reaorganize the riemann_send function to simplify locking.

src/write_riemann.c

index 15bb23787c59f7b4f01d543c621a75fee2be42f4..fdd160b5d3681ff7e787120a49bedaafd998f55c 100644 (file)
@@ -176,32 +176,30 @@ riemann_disconnect (struct riemann_host *host)
        return (0);
 }
 
-static int
-riemann_send(struct riemann_host *host, Msg const *msg)
+static inline int
+riemann_send_msg(struct riemann_host *host, const Msg *msg)
 {
-       u_char *buffer;
+       int status = 0;
+       u_char *buffer = NULL;
        size_t  buffer_len;
-       int status;
-
-       pthread_mutex_lock (&host->lock);
 
        status = riemann_connect (host);
+
        if (status != 0)
-       {
-               pthread_mutex_unlock (&host->lock);
                return status;
-       }
 
        buffer_len = msg__get_packed_size(msg);
+
        if (host->use_tcp)
                buffer_len += 4;
 
        buffer = malloc (buffer_len);
+
        if (buffer == NULL) {
-               pthread_mutex_unlock (&host->lock);
                ERROR ("write_riemann plugin: malloc failed.");
                return ENOMEM;
        }
+
        memset (buffer, 0, buffer_len);
 
        if (host->use_tcp)
@@ -216,26 +214,105 @@ riemann_send(struct riemann_host *host, Msg const *msg)
        }
 
        status = (int) swrite (host->s, buffer, buffer_len);
+
        if (status != 0)
        {
                char errbuf[1024];
 
-               riemann_disconnect (host);
-               pthread_mutex_unlock (&host->lock);
-
                ERROR ("write_riemann plugin: Sending to Riemann at %s:%s failed: %s",
                                (host->node != NULL) ? host->node : RIEMANN_HOST,
                                (host->service != NULL) ? host->service : RIEMANN_PORT,
                                sstrerror (errno, errbuf, sizeof (errbuf)));
+
                sfree (buffer);
                return -1;
        }
 
-       pthread_mutex_unlock (&host->lock);
        sfree (buffer);
        return 0;
 }
 
+static inline int
+riemann_recv_ack(struct riemann_host *host)
+{
+       int status = 0;
+       Msg *msg = NULL;
+       uint32_t header;
+
+       status = (int) sread (host->s, &header, 4);
+
+       if (status != 0)
+               return -1;
+
+       size_t size = ntohl(header);
+
+       // Buffer on the stack since acknowledges are typically small.
+       u_char buffer[size];
+       memset (buffer, 0, size);
+
+       status = (int) sread (host->s, buffer, size);
+
+       if (status != 0)
+               return status;
+
+       msg = msg__unpack (NULL, size, buffer);
+
+       if (msg == NULL)
+               return -1;
+
+       if (!msg->ok)
+       {
+               ERROR ("write_riemann plugin: Sending to Riemann at %s:%s acknowledgement message reported error: %s",
+                               (host->node != NULL) ? host->node : RIEMANN_HOST,
+                               (host->service != NULL) ? host->service : RIEMANN_PORT,
+                               msg->error);
+
+               msg__free_unpacked(msg, NULL);
+               return -1;
+       }
+
+       msg__free_unpacked (msg, NULL);
+       return 0;
+}
+
+/**
+ * Function to send messages (Msg) to riemann.
+ *
+ * Acquires the host lock, disconnects on errors.
+ */
+static int
+riemann_send(struct riemann_host *host, Msg const *msg)
+{
+       int status = 0;
+       pthread_mutex_lock (&host->lock);
+
+       status = riemann_send_msg(host, msg);
+
+       if (status != 0) {
+               riemann_disconnect (host);
+               pthread_mutex_unlock (&host->lock);
+               return status;
+       }
+
+       /*
+        * For TCP we need to receive message acknowledgemenent.
+        */
+       if (host->use_tcp)
+       {
+               status = riemann_recv_ack(host);
+
+               if (status != 0)
+               {
+                       riemann_disconnect (host);
+                       pthread_mutex_unlock (&host->lock);
+                       return status;
+               }
+       }
+
+       pthread_mutex_unlock (&host->lock);
+       return 0;
+}
+
 static int riemann_event_add_tag (Event *event, char const *tag) /* {{{ */
 {
        return (strarray_add (&event->tags, &event->n_tags, tag));