1 #! /bin/sh /usr/share/dpatch/dpatch-run
2 ## bts770681_riemann_ack.dpatch by John-John Tedro <udoprog@spotify.com>
3 ##
4 ## DP: write_riemann plugin: Receive acknowledge message when using TCP.
5 ## DP:
6 ## DP: Not receiving an acknowledge message when communicating with riemann
7 ## DP: over TCP will cause the riemann instance to eventually hang for
8 ## DP: extended periods of time because of resource exhaustion.
9 ## DP:
10 ## DP: Upstream bug report:
11 ## DP: https://github.com/collectd/collectd/pull/425
13 @DPATCH@
15 diff a/src/write_riemann.c b/src/write_riemann.c
16 --- a/src/write_riemann.c
17 +++ b/src/write_riemann.c
18 @@ -176,32 +176,30 @@ riemann_disconnect (struct riemann_host *host)
19 return (0);
20 }
22 -static int
23 -riemann_send(struct riemann_host *host, Msg const *msg)
24 +static inline int
25 +riemann_send_msg(struct riemann_host *host, const Msg *msg)
26 {
27 - u_char *buffer;
28 + int status = 0;
29 + u_char *buffer = NULL;
30 size_t buffer_len;
31 - int status;
32 -
33 - pthread_mutex_lock (&host->lock);
35 status = riemann_connect (host);
36 +
37 if (status != 0)
38 - {
39 - pthread_mutex_unlock (&host->lock);
40 return status;
41 - }
43 buffer_len = msg__get_packed_size(msg);
44 +
45 if (host->use_tcp)
46 buffer_len += 4;
48 buffer = malloc (buffer_len);
49 +
50 if (buffer == NULL) {
51 - pthread_mutex_unlock (&host->lock);
52 ERROR ("write_riemann plugin: malloc failed.");
53 return ENOMEM;
54 }
55 +
56 memset (buffer, 0, buffer_len);
58 if (host->use_tcp)
59 @@ -216,26 +214,105 @@ riemann_send(struct riemann_host *host, Msg const *msg)
60 }
62 status = (int) swrite (host->s, buffer, buffer_len);
63 +
64 if (status != 0)
65 {
66 char errbuf[1024];
68 - riemann_disconnect (host);
69 - pthread_mutex_unlock (&host->lock);
70 -
71 ERROR ("write_riemann plugin: Sending to Riemann at %s:%s failed: %s",
72 (host->node != NULL) ? host->node : RIEMANN_HOST,
73 (host->service != NULL) ? host->service : RIEMANN_PORT,
74 sstrerror (errno, errbuf, sizeof (errbuf)));
75 +
76 sfree (buffer);
77 return -1;
78 }
80 - pthread_mutex_unlock (&host->lock);
81 sfree (buffer);
82 return 0;
83 }
85 +static inline int
86 +riemann_recv_ack(struct riemann_host *host)
87 +{
88 + int status = 0;
89 + Msg *msg = NULL;
90 + uint32_t header;
91 +
92 + status = (int) sread (host->s, &header, 4);
93 +
94 + if (status != 0)
95 + return -1;
96 +
97 + size_t size = ntohl(header);
98 +
99 + // Buffer on the stack since acknowledges are typically small.
100 + u_char buffer[size];
101 + memset (buffer, 0, size);
102 +
103 + status = (int) sread (host->s, buffer, size);
104 +
105 + if (status != 0)
106 + return status;
107 +
108 + msg = msg__unpack (NULL, size, buffer);
109 +
110 + if (msg == NULL)
111 + return -1;
112 +
113 + if (!msg->ok)
114 + {
115 + ERROR ("write_riemann plugin: Sending to Riemann at %s:%s acknowledgement message reported error: %s",
116 + (host->node != NULL) ? host->node : RIEMANN_HOST,
117 + (host->service != NULL) ? host->service : RIEMANN_PORT,
118 + msg->error);
119 +
120 + msg__free_unpacked(msg, NULL);
121 + return -1;
122 + }
123 +
124 + msg__free_unpacked (msg, NULL);
125 + return 0;
126 +}
127 +
128 +/**
129 + * Function to send messages (Msg) to riemann.
130 + *
131 + * Acquires the host lock, disconnects on errors.
132 + */
133 +static int
134 +riemann_send(struct riemann_host *host, Msg const *msg)
135 +{
136 + int status = 0;
137 + pthread_mutex_lock (&host->lock);
138 +
139 + status = riemann_send_msg(host, msg);
140 +
141 + if (status != 0) {
142 + riemann_disconnect (host);
143 + pthread_mutex_unlock (&host->lock);
144 + return status;
145 + }
146 +
147 + /*
148 + * For TCP we need to receive message acknowledgemenent.
149 + */
150 + if (host->use_tcp)
151 + {
152 + status = riemann_recv_ack(host);
153 +
154 + if (status != 0)
155 + {
156 + riemann_disconnect (host);
157 + pthread_mutex_unlock (&host->lock);
158 + return status;
159 + }
160 + }
161 +
162 + pthread_mutex_unlock (&host->lock);
163 + return 0;
164 +}
165 +
166 static int riemann_event_add_tag (Event *event, char const *tag) /* {{{ */
167 {
168 return (strarray_add (&event->tags, &event->n_tags, tag));