ddbbe9e381a09a6e9f2976ae063422a93901b236
1 /*
2 * collectd - src/riemann.c
3 *
4 * Copyright (C) 2012 Pierre-Yves Ritschard <pyr@spootnik.org>
5 *
6 * Permission to use, copy, modify, and distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
9 *
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER
15 * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
16 * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 *
18 */
20 #include "collectd.h"
21 #include "plugin.h"
22 #include "common.h"
23 #include "configfile.h"
24 #include "riemann.pb-c.h"
26 #include <sys/socket.h>
27 #include <arpa/inet.h>
28 #include <errno.h>
29 #include <netdb.h>
30 #include <inttypes.h>
31 #include <pthread.h>
/* Default number of seconds added to a value's interval to form the event TTL. */
#define RIEMANN_DELAY		1
/* Default destination port when a Host block has no "Port" option. */
#define RIEMANN_PORT		5555
/* Maximum number of user-configured "Tag" options. */
#define RIEMANN_EXTRA_TAGS	32
/*
 * Tags riemann_write() may attach on its own: three that are always present
 * (data source type, plugin, data source name) plus up to three optional ones
 * (plugin instance, type, type instance).
 */
#define RIEMANN_BASE_TAGS	6
/*
 * Capacity of the per-event tag array.  It must hold the built-in tags and
 * every user tag at the same time; the previous fixed value of 37 was one
 * slot short of that worst case.
 */
#define RIEMANN_MAX_TAGS	(RIEMANN_BASE_TAGS + RIEMANN_EXTRA_TAGS)
/* Bit in riemann_host.flags: set once the socket in `s` has been connected. */
#define F_CONNECT	0x01

/*
 * State kept for one configured Riemann destination.  An instance is handed
 * to the write callback as its user data.
 */
struct riemann_host {
	struct riemann_host	*next;		/* not referenced by the code in this file */
	u_int8_t		 flags;		/* F_* bits */
	pthread_mutex_t		 lock;		/* taken while establishing the connection */
	int			 delay;		/* seconds added to the interval to form the TTL */
	char			 name[DATA_MAX_NAME_LEN]; /* name passed to getaddrinfo() */
	int			 port;		/* destination port, host byte order */
	int			 s;		/* socket descriptor, meaningful when F_CONNECT is set */
};
49 struct riemann_event {
50 Event ev;
51 char service[DATA_MAX_NAME_LEN];
52 const char *tags[RIEMANN_MAX_TAGS];
53 };
55 char *riemann_tags[RIEMANN_EXTRA_TAGS];
56 int riemann_tagcount;
58 int riemann_write(const data_set_t *, const value_list_t *, user_data_t *);
59 int riemann_connect(struct riemann_host *);
60 void riemann_free(void *);
61 int riemann_config_host(oconfig_item_t *);
62 int riemann_config(oconfig_item_t *);
63 void module_register(void);
65 /*
66 * Functions
67 */
68 int
69 riemann_write(const data_set_t *ds,
70 const value_list_t *vl,
71 user_data_t *ud)
72 {
73 int i, j;
74 int status;
75 struct riemann_host *host = ud->data;
76 Msg msg = MSG__INIT;
77 Event *ev;
78 struct riemann_event *event_tab, *event;
79 u_char *buf;
80 size_t len;
82 if ((status = riemann_connect(host)) != 0)
83 return status;
85 msg.n_events = vl->values_len;
87 /*
88 * Get rid of allocations up front
89 */
90 if ((msg.events = calloc(msg.n_events, sizeof(*msg.events))) == NULL ||
91 (event_tab = calloc(msg.n_events, sizeof(*event_tab))) == NULL) {
92 free(msg.events);
93 free(event_tab);
94 return ENOMEM;
95 }
97 /*
98 * Now produce valid protobuf structures
99 */
100 for (i = 0; i < vl->values_len; i++) {
101 event = &event_tab[i];
102 event__init(&event->ev);
104 ev = &event->ev;
105 event__init(ev);
106 ev->host = host->name;
107 ev->has_time = 1;
108 ev->time = CDTIME_T_TO_TIME_T(vl->time);
109 ev->has_ttl = 1;
110 ev->ttl = CDTIME_T_TO_TIME_T(vl->interval) + host->delay;
111 ev->n_tags = 3;
112 ev->tags = (char **)event->tags;
113 event->tags[0] = DS_TYPE_TO_STRING(ds->ds[i].type);
114 event->tags[1] = vl->plugin;
115 event->tags[2] = ds->ds[i].name;
116 if (vl->plugin_instance && strlen(vl->plugin_instance)) {
117 event->tags[ev->n_tags++] = vl->plugin_instance;
118 }
119 if (vl->type && strlen(vl->type)) {
120 event->tags[ev->n_tags++] = vl->type;
121 }
122 if (vl->type_instance && strlen(vl->type_instance)) {
123 event->tags[ev->n_tags++] = vl->type_instance;
124 }
126 /* add user defined extra tags */
127 for (j = 0; j < riemann_tagcount; j++)
128 event->tags[ev->n_tags++] = riemann_tags[j];
130 switch (ds->ds[i].type) {
131 case DS_TYPE_COUNTER:
132 ev->has_metric_sint64 = 1;
133 ev->metric_sint64 = vl->values[i].counter;
134 break;
135 case DS_TYPE_GAUGE:
136 ev->has_metric_d = 1;
137 ev->metric_d = vl->values[i].gauge;
138 break;
139 case DS_TYPE_DERIVE:
140 ev->has_metric_sint64 = 1;
141 ev->metric_sint64 = vl->values[i].derive;
142 break;
143 case DS_TYPE_ABSOLUTE:
144 ev->has_metric_sint64 = 1;
145 ev->metric_sint64 = vl->values[i].absolute;
146 break;
147 default:
148 WARNING("riemann_write: unknown metric type: %d",
149 ds->ds[i].type);
150 break;
151 }
152 ssnprintf(event->service, sizeof(event->service),
153 "%s-%s-%s-%s-%s", vl->plugin, vl->plugin_instance,
154 vl->type, vl->type_instance, ds->ds[i].name);
155 ev->service = event->service;
156 DEBUG("riemann_write: %s ready to send", ev->service);
157 msg.events[i] = ev;
158 }
160 /*
161 * we have now packed a bunch of events, let's pack them
162 */
163 len = msg__get_packed_size(&msg);
164 DEBUG("riemann_write: packed size computed: %ld", len);
165 if ((buf = calloc(1, len)) == NULL) {
166 WARNING("riemann_write: failing to alloc buf!");
167 sfree(msg.events);
168 return ENOMEM;
169 }
171 /*
172 * prepend full size to beginning of buffer
173 */
174 msg__pack(&msg, buf);
175 sfree(msg.events);
177 /*
178 * we're now ready to send
179 */
180 if (write(host->s, buf, len) != len) {
181 WARNING("riemann_write: could not send out full packet");
182 return -1;
183 }
184 free(buf);
185 return 0;
186 }
/*
 * riemann_connect - make sure `host` has a connected datagram socket.
 *
 * Resolves host->name and tries each returned address in turn until one can
 * be connected to host->port.  On success the descriptor is stored in
 * host->s and F_CONNECT is set in host->flags.
 *
 * Returns 0 when the host is (already) connected and -1 otherwise.
 */
int
riemann_connect(struct riemann_host *host)
{
	int			 e;
	int			 fd;
	int			 status = -1;
	struct addrinfo		*ai, *res, hints;
	struct sockaddr_in	*sin4;
	struct sockaddr_in6	*sin6;

	if (host->flags & F_CONNECT)
		return 0;

	memset(&hints, 0, sizeof(hints));
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_DGRAM;

	if ((e = getaddrinfo(host->name, NULL, &hints, &res)) != 0) {
		WARNING("could not resolve host \"%s\": %s",
		    host->name, gai_strerror(e));
		return -1;
	}

	pthread_mutex_lock(&host->lock);

	/*
	 * check that another thread did not already successfully connect
	 * while we were resolving; every path below leaves through `out`
	 * so the lock is always released
	 */
	if (host->flags & F_CONNECT) {
		status = 0;
		goto out;
	}

	for (ai = res; ai != NULL; ai = ai->ai_next) {
		/* The lookup was done without a service, so fill in the port. */
		if (ai->ai_family == AF_INET) {
			sin4 = (struct sockaddr_in *)ai->ai_addr;
			sin4->sin_port = htons(host->port);
		} else if (ai->ai_family == AF_INET6) {
			sin6 = (struct sockaddr_in6 *)ai->ai_addr;
			sin6->sin6_port = htons(host->port);
		} else {
			WARNING("riemann_connect: unsupported address family");
			continue;
		}

		if ((fd = socket(ai->ai_family, SOCK_DGRAM, 0)) == -1) {
			WARNING("riemann_connect: could not open socket");
			continue;
		}

		if (connect(fd, ai->ai_addr, ai->ai_addrlen) != 0) {
			/* This address did not work out; try the next one. */
			close(fd);
			continue;
		}

		host->s = fd;
		host->flags |= F_CONNECT;
		DEBUG("got a succesful connection for: %s", host->name);
		status = 0;
		break;
	}

	/* F_CONNECT was clear on entry to the loop and is only set on success. */
	if (status != 0)
		WARNING("riemann_connect: no suitable hosts found");

out:
	pthread_mutex_unlock(&host->lock);
	freeaddrinfo(res);
	return status;
}
265 void
266 riemann_free(void *p)
267 {
268 struct riemann_host *host = p;
270 if (host->flags & F_CONNECT)
271 close(host->s);
272 sfree(host);
273 }
275 int
276 riemann_config_host(oconfig_item_t *ci)
277 {
278 struct riemann_host *host = NULL;
279 int status = 0;
280 int i;
281 oconfig_item_t *child;
282 char cb_name[DATA_MAX_NAME_LEN];
283 user_data_t ud;
285 if (ci->values_num != 1 ||
286 ci->values[0].type != OCONFIG_TYPE_STRING) {
287 WARNING("riemann hosts need one string argument");
288 return -1;
289 }
291 if ((host = calloc(1, sizeof (*host))) == NULL) {
292 WARNING("riemann host allocation failed");
293 return ENOMEM;
294 }
296 if (cf_util_get_string_buffer(ci, host->name,
297 sizeof(host->name)) != 0) {
298 WARNING("riemann host name too long");
299 sfree(host);
300 return -1;
301 }
303 host->port = RIEMANN_PORT;
304 host->delay = RIEMANN_DELAY;
305 for (i = 0; i < ci->children_num; i++) {
306 /*
307 * The code here could be simplified but makes room
308 * for easy adding of new options later on.
309 */
310 child = &ci->children[i];
311 status = 0;
313 if (strcasecmp(child->key, "port") == 0) {
314 if ((status = cf_util_get_port_number(child)) < 0) {
315 WARNING("invalid port number");
316 break;
317 }
318 host->port = status;
319 status = 0;
320 } else if (strcasecmp(child->key, "delay") == 0) {
321 if ((status = cf_util_get_int(ci, &host->delay)) != 0)
322 break;
323 } else {
324 WARNING("riemann plugin: ignoring unknown config "
325 "option: \"%s\"", child->key);
326 }
327 }
328 if (status != 0) {
329 sfree(host);
330 return status;
331 }
333 pthread_mutex_init(&host->lock, NULL);
334 ssnprintf(cb_name, sizeof(cb_name), "riemann/%s:%d", host->name, host->port);
335 DEBUG("riemann cb_name: %s", cb_name);
336 ud.data = host;
337 ud.free_func = riemann_free;
339 if ((status = plugin_register_write(cb_name, riemann_write, &ud)) != 0)
340 riemann_free(host);
342 return status;
343 }
345 int
346 riemann_config(oconfig_item_t *ci)
347 {
348 int i;
349 char *newtag;
350 oconfig_item_t *child;
352 for (i = 0; i < ci->children_num; i++) {
353 child = &ci->children[i];
355 if (strcasecmp(child->key, "host") == 0) {
356 riemann_config_host(child);
357 } else if (strcasecmp(child->key, "tag") == 0) {
358 if (riemann_tagcount >= RIEMANN_EXTRA_TAGS) {
359 WARNING("riemann plugin: too many tags");
360 return -1;
361 }
362 newtag = NULL;
363 cf_util_get_string(child, &newtag);
364 if (newtag == NULL)
365 return -1;
366 riemann_tags[riemann_tagcount++] = newtag;
367 DEBUG("riemann_config: got tag: %s", newtag);
369 } else {
370 WARNING ("riemann plugin: Ignoring unknown "
371 "configuration option \"%s\" at top level.",
372 child->key);
373 }
374 }
375 return (0);
376 }
378 void
379 module_register(void)
380 {
381 DEBUG("riemann: module_register");
383 plugin_register_complex_config ("riemann", riemann_config);
384 }