Code

patches: Added bts770681_riemann_ack.
[pkg-collectd.git] / debian / patches / bts770681_riemann_ack.dpatch
1 #! /bin/sh /usr/share/dpatch/dpatch-run
2 ## bts770681_riemann_ack.dpatch by John-John Tedro <udoprog@spotify.com>
3 ##
4 ## DP: write_riemann plugin: Receive acknowledge message when using TCP.
5 ## DP:
6 ## DP: Not receiving an acknowledge message when communicating with riemann
7 ## DP: over TCP will cause the riemann instance to eventually hang for
8 ## DP: extended periods of time because of resource exhaustion.
9 ## DP:
10 ## DP: Upstream bug report:
11 ## DP: https://github.com/collectd/collectd/pull/425
13 @DPATCH@
15 diff a/src/write_riemann.c b/src/write_riemann.c
16 --- a/src/write_riemann.c
17 +++ b/src/write_riemann.c
18 @@ -176,32 +176,30 @@ riemann_disconnect (struct riemann_host *host)
19         return (0);
20  }
21  
22 -static int
23 -riemann_send(struct riemann_host *host, Msg const *msg)
24 +static inline int
25 +riemann_send_msg(struct riemann_host *host, const Msg *msg)
26  {
27 -       u_char *buffer;
28 +       int status = 0;
29 +       u_char *buffer = NULL;
30         size_t  buffer_len;
31 -       int status;
32 -
33 -       pthread_mutex_lock (&host->lock);
34  
35         status = riemann_connect (host);
36 +
37         if (status != 0)
38 -       {
39 -               pthread_mutex_unlock (&host->lock);
40                 return status;
41 -       }
42  
43         buffer_len = msg__get_packed_size(msg);
44 +
45         if (host->use_tcp)
46                 buffer_len += 4;
47  
48         buffer = malloc (buffer_len);
49 +
50         if (buffer == NULL) {
51 -               pthread_mutex_unlock (&host->lock);
52                 ERROR ("write_riemann plugin: malloc failed.");
53                 return ENOMEM;
54         }
55 +
56         memset (buffer, 0, buffer_len);
57  
58         if (host->use_tcp)
59 @@ -216,26 +214,105 @@ riemann_send(struct riemann_host *host, Msg const *msg)
60         }
61  
62         status = (int) swrite (host->s, buffer, buffer_len);
63 +
64         if (status != 0)
65         {
66                 char errbuf[1024];
67  
68 -               riemann_disconnect (host);
69 -               pthread_mutex_unlock (&host->lock);
70 -
71                 ERROR ("write_riemann plugin: Sending to Riemann at %s:%s failed: %s",
72                                 (host->node != NULL) ? host->node : RIEMANN_HOST,
73                                 (host->service != NULL) ? host->service : RIEMANN_PORT,
74                                 sstrerror (errno, errbuf, sizeof (errbuf)));
75 +
76                 sfree (buffer);
77                 return -1;
78         }
79  
80 -       pthread_mutex_unlock (&host->lock);
81         sfree (buffer);
82         return 0;
83  }
84  
85 +static inline int
86 +riemann_recv_ack(struct riemann_host *host)
87 +{
88 +       int status = 0;
89 +       Msg *msg = NULL;
90 +       uint32_t header;
91 +
92 +       status = (int) sread (host->s, &header, 4);
93 +
94 +       if (status != 0)
95 +               return -1;
96 +
97 +       size_t size = ntohl(header);
98 +
99 +       // Buffer on the stack since acknowledges are typically small.
100 +       u_char buffer[size];
101 +       memset (buffer, 0, size);
103 +       status = (int) sread (host->s, buffer, size);
105 +       if (status != 0)
106 +               return status;
108 +       msg = msg__unpack (NULL, size, buffer);
110 +       if (msg == NULL)
111 +               return -1;
113 +       if (!msg->ok)
114 +       {
115 +               ERROR ("write_riemann plugin: Sending to Riemann at %s:%s acknowledgement message reported error: %s",
116 +                               (host->node != NULL) ? host->node : RIEMANN_HOST,
117 +                               (host->service != NULL) ? host->service : RIEMANN_PORT,
118 +                               msg->error);
120 +               msg__free_unpacked(msg, NULL);
121 +               return -1;
122 +       }
124 +       msg__free_unpacked (msg, NULL);
125 +       return 0;
126 +}
128 +/**
129 + * Function to send messages (Msg) to riemann.
130 + *
131 + * Acquires the host lock, disconnects on errors.
132 + */
133 +static int
134 +riemann_send(struct riemann_host *host, Msg const *msg)
135 +{
136 +       int status = 0;
137 +       pthread_mutex_lock (&host->lock);
139 +       status = riemann_send_msg(host, msg);
141 +       if (status != 0) {
142 +               riemann_disconnect (host);
143 +               pthread_mutex_unlock (&host->lock);
144 +               return status;
145 +       }
147 +       /*
148 +        * For TCP we need to receive message acknowledgemenent.
149 +        */
150 +       if (host->use_tcp)
151 +       {
152 +               status = riemann_recv_ack(host);
154 +               if (status != 0)
155 +               {
156 +                       riemann_disconnect (host);
157 +                       pthread_mutex_unlock (&host->lock);
158 +                       return status;
159 +               }
160 +       }
162 +       pthread_mutex_unlock (&host->lock);
163 +       return 0;
164 +}
166  static int riemann_event_add_tag (Event *event, char const *tag) /* {{{ */
167  {
168         return (strarray_add (&event->tags, &event->n_tags, tag));