Code

patches: Added bts770681_riemann_ack.
authorSebastian Harl <sh@tokkee.org>
Sun, 23 Nov 2014 12:20:44 +0000 (13:20 +0100)
committerSebastian Harl <sh@tokkee.org>
Sun, 23 Nov 2014 12:20:44 +0000 (13:20 +0100)
Upstream fix for the write_riemann plugin to avoid locking up a remote Riemann
instance. Thanks to Marc Fournier for reporting this.

Closes: #770681
debian/changelog
debian/patches/00list
debian/patches/bts770681_riemann_ack.dpatch [new file with mode: 0755]
debian/patches/collection.cgi.dpatch [changed mode: 0644->0755]

index 8d0487125d2deada7168f6c7666f65388d73ff38..eb58e134609896533ee169ee7c59639720fa0487 100644 (file)
@@ -1,3 +1,12 @@
+collectd (5.4.1-6) UNRELEASED; urgency=medium
+
+  * debian/patches:
+    - Added bts770681_riemann_ack: upstream fix for the write_riemann plugin
+      to avoid locking up a remote Riemann instance; thanks to Marc Fournier
+      for reporting this (Closes: #770681).
+
+ -- Sebastian Harl <tokkee@debian.org>  Sun, 23 Nov 2014 13:04:03 +0100
+
 collectd (5.4.1-5) unstable; urgency=medium
 
   * debian/rules:
index ae9c349118cfee31c14d359e0674fe21eefabbd3..78e7f2d8c17edd217bec1c52fe8fc4dce9721031 100644 (file)
@@ -4,3 +4,4 @@ collection.cgi.dpatch
 myplugin_includes.dpatch
 myplugin_api.dpatch
 bts559801_plugin_find_fix.dpatch
+bts770681_riemann_ack.dpatch
diff --git a/debian/patches/bts770681_riemann_ack.dpatch b/debian/patches/bts770681_riemann_ack.dpatch
new file mode 100755 (executable)
index 0000000..c48dd2a
--- /dev/null
@@ -0,0 +1,168 @@
+#! /bin/sh /usr/share/dpatch/dpatch-run
+## bts770681_riemann_ack.dpatch by John-John Tedro <udoprog@spotify.com>
+##
+## DP: write_riemann plugin: Receive acknowledge message when using TCP.
+## DP:
+## DP: Not receiving an acknowledge message when communicating with riemann
+## DP: over TCP will cause the riemann instance to eventually hang for
+## DP: extended periods of time because of resource exhaustion.
+## DP:
+## DP: Upstream bug report:
+## DP: https://github.com/collectd/collectd/pull/425
+
+@DPATCH@
+
+diff a/src/write_riemann.c b/src/write_riemann.c
+--- a/src/write_riemann.c
++++ b/src/write_riemann.c
+@@ -176,32 +176,30 @@ riemann_disconnect (struct riemann_host *host)
+       return (0);
+ }
+-static int
+-riemann_send(struct riemann_host *host, Msg const *msg)
++static inline int
++riemann_send_msg(struct riemann_host *host, const Msg *msg)
+ {
+-      u_char *buffer;
++      int status = 0;
++      u_char *buffer = NULL;
+       size_t  buffer_len;
+-      int status;
+-
+-      pthread_mutex_lock (&host->lock);
+       status = riemann_connect (host);
++
+       if (status != 0)
+-      {
+-              pthread_mutex_unlock (&host->lock);
+               return status;
+-      }
+       buffer_len = msg__get_packed_size(msg);
++
+       if (host->use_tcp)
+               buffer_len += 4;
+       buffer = malloc (buffer_len);
++
+       if (buffer == NULL) {
+-              pthread_mutex_unlock (&host->lock);
+               ERROR ("write_riemann plugin: malloc failed.");
+               return ENOMEM;
+       }
++
+       memset (buffer, 0, buffer_len);
+       if (host->use_tcp)
+@@ -216,26 +214,105 @@ riemann_send(struct riemann_host *host, Msg const *msg)
+       }
+       status = (int) swrite (host->s, buffer, buffer_len);
++
+       if (status != 0)
+       {
+               char errbuf[1024];
+-              riemann_disconnect (host);
+-              pthread_mutex_unlock (&host->lock);
+-
+               ERROR ("write_riemann plugin: Sending to Riemann at %s:%s failed: %s",
+                               (host->node != NULL) ? host->node : RIEMANN_HOST,
+                               (host->service != NULL) ? host->service : RIEMANN_PORT,
+                               sstrerror (errno, errbuf, sizeof (errbuf)));
++
+               sfree (buffer);
+               return -1;
+       }
+-      pthread_mutex_unlock (&host->lock);
+       sfree (buffer);
+       return 0;
+ }
++static inline int
++riemann_recv_ack(struct riemann_host *host)
++{
++      int status = 0;
++      Msg *msg = NULL;
++      uint32_t header;
++
++      status = (int) sread (host->s, &header, 4);
++
++      if (status != 0)
++              return -1;
++
++      size_t size = ntohl(header);
++
++      // Buffer on the stack since acknowledges are typically small.
++      u_char buffer[size];
++      memset (buffer, 0, size);
++
++      status = (int) sread (host->s, buffer, size);
++
++      if (status != 0)
++              return status;
++
++      msg = msg__unpack (NULL, size, buffer);
++
++      if (msg == NULL)
++              return -1;
++
++      if (!msg->ok)
++      {
++              ERROR ("write_riemann plugin: Sending to Riemann at %s:%s acknowledgement message reported error: %s",
++                              (host->node != NULL) ? host->node : RIEMANN_HOST,
++                              (host->service != NULL) ? host->service : RIEMANN_PORT,
++                              msg->error);
++
++              msg__free_unpacked(msg, NULL);
++              return -1;
++      }
++
++      msg__free_unpacked (msg, NULL);
++      return 0;
++}
++
++/**
++ * Function to send messages (Msg) to riemann.
++ *
++ * Acquires the host lock, disconnects on errors.
++ */
++static int
++riemann_send(struct riemann_host *host, Msg const *msg)
++{
++      int status = 0;
++      pthread_mutex_lock (&host->lock);
++
++      status = riemann_send_msg(host, msg);
++
++      if (status != 0) {
++              riemann_disconnect (host);
++              pthread_mutex_unlock (&host->lock);
++              return status;
++      }
++
++      /*
++       * For TCP we need to receive message acknowledgemenent.
++       */
++      if (host->use_tcp)
++      {
++              status = riemann_recv_ack(host);
++
++              if (status != 0)
++              {
++                      riemann_disconnect (host);
++                      pthread_mutex_unlock (&host->lock);
++                      return status;
++              }
++      }
++
++      pthread_mutex_unlock (&host->lock);
++      return 0;
++}
++
+ static int riemann_event_add_tag (Event *event, char const *tag) /* {{{ */
+ {
+       return (strarray_add (&event->tags, &event->n_tags, tag));
old mode 100644 (file)
new mode 100755 (executable)