1 /**
2 * collectd - src/write_graphite.c
3 * Copyright (C) 2011 Scott Sanders
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; only version 2 of the License is applicable.
8 *
9 * This program is distributed in the hope that it will be useful, but
10 * WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 *
18 * Author:
19 * Scott Sanders <scott@jssjr.com>
20 *
21 * based on the excellent write_http plugin
22 **/
24 /* write_graphite plugin configuation example
25 *
26 * <Plugin write_graphite>
27 * <Carbon "local-agent">
28 * Host "localhost"
29 * Port 2003
30 * Prefix "collectd"
31 * </Carbon>
32 * </Plugin>
33 */
35 #include "collectd.h"
36 #include "common.h"
37 #include "plugin.h"
38 #include "configfile.h"
40 #include "utils_cache.h"
41 #include "utils_parse_option.h"
43 /* Folks without pthread will need to disable this plugin. */
44 #include <pthread.h>
46 #include <sys/socket.h>
47 #include <sys/stat.h>
48 #include <sys/types.h>
50 #include <netinet/in.h>
51 #include <netdb.h>
53 #ifndef WG_FORMAT_NAME
54 #define WG_FORMAT_NAME(ret, ret_len, vl, prefix, name) \
55 wg_format_name (ret, ret_len, (vl)->host, (vl)->plugin, (vl)->plugin_instance, \
56 (vl)->type, (vl)->type_instance, prefix, name)
57 #endif
59 /*
60 * Private variables
61 */
62 struct wg_callback
63 {
64 char *name;
66 int sock_fd;
67 struct hostent *server;
69 char *host;
70 int port;
71 char *prefix;
73 char send_buf[4096];
74 size_t send_buf_free;
75 size_t send_buf_fill;
76 cdtime_t send_buf_init_time;
78 pthread_mutex_t send_lock;
79 };
82 /*
83 * Functions
84 */
85 static void wg_reset_buffer (struct wg_callback *cb)
86 {
87 memset (cb->send_buf, 0, sizeof (cb->send_buf));
88 cb->send_buf_free = sizeof (cb->send_buf);
89 cb->send_buf_fill = 0;
90 cb->send_buf_init_time = cdtime ();
91 }
93 static int wg_send_buffer (struct wg_callback *cb)
94 {
95 int status = 0;
97 status = write (cb->sock_fd, cb->send_buf, strlen (cb->send_buf));
98 if (status < 0)
99 {
100 ERROR ("write_graphite plugin: send failed with "
101 "status %i (%s)",
102 status,
103 strerror (errno));
105 pthread_mutex_lock (&cb->send_lock);
107 DEBUG ("write_graphite plugin: closing socket and restting fd "
108 "so reinit will occur");
109 close (cb->sock_fd);
110 cb->sock_fd = -1;
112 pthread_mutex_unlock (&cb->send_lock);
114 return (-1);
115 }
116 return (0);
117 }
119 static int wg_flush_nolock (cdtime_t timeout, struct wg_callback *cb)
120 {
121 int status;
123 DEBUG ("write_graphite plugin: wg_flush_nolock: timeout = %.3f; "
124 "send_buf_fill = %zu;",
125 (double)timeout,
126 cb->send_buf_fill);
128 /* timeout == 0 => flush unconditionally */
129 if (timeout > 0)
130 {
131 cdtime_t now;
133 now = cdtime ();
134 if ((cb->send_buf_init_time + timeout) > now)
135 return (0);
136 }
138 if (cb->send_buf_fill <= 0)
139 {
140 cb->send_buf_init_time = cdtime ();
141 return (0);
142 }
144 status = wg_send_buffer (cb);
145 wg_reset_buffer (cb);
147 return (status);
148 }
150 static int wg_callback_init (struct wg_callback *cb)
151 {
152 int status;
154 struct sockaddr_in serv_addr;
156 if (cb->sock_fd > 0)
157 return (0);
159 cb->sock_fd = socket (AF_INET, SOCK_STREAM, 0);
160 if (cb->sock_fd < 0)
161 {
162 ERROR ("write_graphite plugin: socket failed: %s", strerror (errno));
163 return (-1);
164 }
165 cb->server = gethostbyname(cb->host);
166 if (cb->server == NULL)
167 {
168 ERROR ("write_graphite plugin: no such host");
169 return (-1);
170 }
171 memset (&serv_addr, 0, sizeof (serv_addr));
172 serv_addr.sin_family = AF_INET;
173 memcpy (&serv_addr.sin_addr.s_addr,
174 cb->server->h_addr,
175 cb->server->h_length);
176 serv_addr.sin_port = htons(cb->port);
178 status = connect(cb->sock_fd, (struct sockaddr *) &serv_addr, sizeof(serv_addr));
179 if (status < 0)
180 {
181 char errbuf[1024];
182 sstrerror (errno, errbuf, sizeof (errbuf));
183 ERROR ("write_graphite plugin: connect failed: %s", errbuf);
184 close (cb->sock_fd);
185 cb->sock_fd = -1;
186 return (-1);
187 }
189 wg_reset_buffer (cb);
191 return (0);
192 }
194 static void wg_callback_free (void *data)
195 {
196 struct wg_callback *cb;
198 if (data == NULL)
199 return;
201 cb = data;
203 wg_flush_nolock (/* timeout = */ 0, cb);
205 close(cb->sock_fd);
206 sfree(cb->name);
207 sfree(cb->host);
208 sfree(cb->prefix);
210 sfree(cb);
211 }
213 static int wg_flush (cdtime_t timeout,
214 const char *identifier __attribute__((unused)),
215 user_data_t *user_data)
216 {
217 struct wg_callback *cb;
218 int status;
220 if (user_data == NULL)
221 return (-EINVAL);
223 cb = user_data->data;
225 pthread_mutex_lock (&cb->send_lock);
227 if (cb->sock_fd < 0)
228 {
229 status = wg_callback_init (cb);
230 if (status != 0)
231 {
232 ERROR ("write_graphite plugin: wg_callback_init failed.");
233 pthread_mutex_unlock (&cb->send_lock);
234 return (-1);
235 }
236 }
238 status = wg_flush_nolock (timeout, cb);
239 pthread_mutex_unlock (&cb->send_lock);
241 return (status);
242 }
244 static int wg_format_values (char *ret, size_t ret_len,
245 int ds_num, const data_set_t *ds, const value_list_t *vl,
246 _Bool store_rates)
247 {
248 size_t offset = 0;
249 int status;
250 gauge_t *rates = NULL;
252 assert (0 == strcmp (ds->type, vl->type));
254 memset (ret, 0, ret_len);
256 #define BUFFER_ADD(...) do { \
257 status = ssnprintf (ret + offset, ret_len - offset, \
258 __VA_ARGS__); \
259 if (status < 1) \
260 { \
261 sfree (rates); \
262 return (-1); \
263 } \
264 else if (((size_t) status) >= (ret_len - offset)) \
265 { \
266 sfree (rates); \
267 return (-1); \
268 } \
269 else \
270 offset += ((size_t) status); \
271 } while (0)
273 if (ds->ds[ds_num].type == DS_TYPE_GAUGE)
274 BUFFER_ADD ("%f", vl->values[ds_num].gauge);
275 else if (store_rates)
276 {
277 if (rates == NULL)
278 rates = uc_get_rate (ds, vl);
279 if (rates == NULL)
280 {
281 WARNING ("format_values: "
282 "uc_get_rate failed.");
283 return (-1);
284 }
285 BUFFER_ADD ("%g", rates[ds_num]);
286 }
287 else if (ds->ds[ds_num].type == DS_TYPE_COUNTER)
288 BUFFER_ADD ("%llu", vl->values[ds_num].counter);
289 else if (ds->ds[ds_num].type == DS_TYPE_DERIVE)
290 BUFFER_ADD ("%"PRIi64, vl->values[ds_num].derive);
291 else if (ds->ds[ds_num].type == DS_TYPE_ABSOLUTE)
292 BUFFER_ADD ("%"PRIu64, vl->values[ds_num].absolute);
293 else
294 {
295 ERROR ("format_values plugin: Unknown data source type: %i",
296 ds->ds[ds_num].type);
297 sfree (rates);
298 return (-1);
299 }
301 #undef BUFFER_ADD
303 sfree (rates);
304 return (0);
305 }
307 static int normalize_hostname (char *dst, const char *src)
308 {
309 size_t i;
311 int reps = 0;
313 for (i = 0; i < strlen(src) ; i++)
314 {
315 if (src[i] == '.')
316 {
317 dst[i] = '_';
318 ++reps;
319 }
320 else
321 dst[i] = src[i];
322 }
323 dst[i] = '\0';
325 return reps;
326 }
328 static int wg_format_name (char *ret, int ret_len,
329 const char *hostname,
330 const char *plugin, const char *plugin_instance,
331 const char *type, const char *type_instance,
332 const char *prefix, const char *ds_name)
333 {
334 int status;
335 char *n_hostname;
337 assert (plugin != NULL);
338 assert (type != NULL);
340 if ((n_hostname = malloc(strlen(hostname)+1)) == NULL)
341 {
342 ERROR ("Unable to allocate memory for normalized hostname buffer");
343 return (-1);
344 }
346 if (normalize_hostname(n_hostname, hostname) == -1)
347 {
348 ERROR ("Unable to normalize hostname");
349 return (-1);
350 }
352 if ((plugin_instance == NULL) || (strlen (plugin_instance) == 0))
353 {
354 if ((type_instance == NULL) || (strlen (type_instance) == 0))
355 {
356 if ((ds_name == NULL) || (strlen (ds_name) == 0))
357 status = ssnprintf (ret, ret_len, "%s.%s.%s.%s",
358 prefix, n_hostname, plugin, type);
359 else
360 status = ssnprintf (ret, ret_len, "%s.%s.%s.%s.%s",
361 prefix, n_hostname, plugin, type, ds_name);
362 }
363 else
364 {
365 if ((ds_name == NULL) || (strlen (ds_name) == 0))
366 status = ssnprintf (ret, ret_len, "%s.%s.%s.%s-%s",
367 prefix, n_hostname, plugin, type,
368 type_instance);
369 else
370 status = ssnprintf (ret, ret_len, "%s.%s.%s.%s-%s.%s",
371 prefix, n_hostname, plugin, type,
372 type_instance, ds_name);
373 }
374 }
375 else
376 {
377 if ((type_instance == NULL) || (strlen (type_instance) == 0))
378 {
379 if ((ds_name == NULL) || (strlen (ds_name) == 0))
380 status = ssnprintf (ret, ret_len, "%s.%s.%s.%s.%s",
381 prefix, n_hostname, plugin,
382 plugin_instance, type);
383 else
384 status = ssnprintf (ret, ret_len, "%s.%s.%s.%s.%s.%s",
385 prefix, n_hostname, plugin,
386 plugin_instance, type, ds_name);
387 }
388 else
389 {
390 if ((ds_name == NULL) || (strlen (ds_name) == 0))
391 status = ssnprintf (ret, ret_len, "%s.%s.%s.%s.%s-%s",
392 prefix, n_hostname, plugin,
393 plugin_instance, type, type_instance);
394 else
395 status = ssnprintf (ret, ret_len, "%s.%s.%s.%s.%s-%s.%s",
396 prefix, n_hostname, plugin,
397 plugin_instance, type, type_instance, ds_name);
398 }
399 }
401 sfree(n_hostname);
403 if ((status < 1) || (status >= ret_len))
404 return (-1);
405 return (0);
406 }
408 static int wg_send_message (const char* key, const char* value, cdtime_t time, struct wg_callback *cb)
409 {
410 int status;
411 size_t message_len;
412 char message[1024];
414 message_len = (size_t) ssnprintf (message, sizeof (message),
415 "%s %s %.0f\n",
416 key,
417 value,
418 CDTIME_T_TO_DOUBLE(time));
419 if (message_len >= sizeof (message)) {
420 ERROR ("write_graphite plugin: message buffer too small: "
421 "Need %zu bytes.", message_len + 1);
422 return (-1);
423 }
426 pthread_mutex_lock (&cb->send_lock);
428 if (cb->sock_fd < 0)
429 {
430 status = wg_callback_init (cb);
431 if (status != 0)
432 {
433 ERROR ("write_graphite plugin: wg_callback_init failed.");
434 pthread_mutex_unlock (&cb->send_lock);
435 return (-1);
436 }
437 }
439 if (message_len >= cb->send_buf_free)
440 {
441 status = wg_flush_nolock (/* timeout = */ 0, cb);
442 if (status != 0)
443 {
444 pthread_mutex_unlock (&cb->send_lock);
445 return (status);
446 }
447 }
448 assert (message_len < cb->send_buf_free);
450 /* `message_len + 1' because `message_len' does not include the
451 * trailing null byte. Neither does `send_buffer_fill'. */
452 memcpy (cb->send_buf + cb->send_buf_fill,
453 message, message_len + 1);
454 cb->send_buf_fill += message_len;
455 cb->send_buf_free -= message_len;
457 DEBUG ("write_graphite plugin: <%s:%d> buf %zu/%zu (%g%%) \"%s\"",
458 cb->host,
459 cb->port,
460 cb->send_buf_fill, sizeof (cb->send_buf),
461 100.0 * ((double) cb->send_buf_fill) / ((double) sizeof (cb->send_buf)),
462 message);
464 /* Check if we have enough space for this message. */
465 pthread_mutex_unlock (&cb->send_lock);
467 return (0);
468 }
470 static int wg_write_messages (const data_set_t *ds, const value_list_t *vl,
471 struct wg_callback *cb)
472 {
473 char key[10*DATA_MAX_NAME_LEN];
474 char values[512];
476 int status, i;
478 if (0 != strcmp (ds->type, vl->type))
479 {
480 ERROR ("write_graphite plugin: DS type does not match "
481 "value list type");
482 return -1;
483 }
485 if (ds->ds_num > 1)
486 {
487 for (i = 0; i < ds->ds_num; i++)
488 {
489 /* Copy the identifier to `key' and escape it. */
490 status = WG_FORMAT_NAME (key, sizeof (key), vl, cb->prefix, ds->ds[i].name);
491 if (status != 0)
492 {
493 ERROR ("write_graphite plugin: error with format_name");
494 return (status);
495 }
497 escape_string (key, sizeof (key));
498 /* Convert the values to an ASCII representation and put that into
499 * `values'. */
500 status = wg_format_values (values, sizeof (values), i, ds, vl, 0);
501 if (status != 0)
502 {
503 ERROR ("write_graphite plugin: error with "
504 "wg_format_values");
505 return (status);
506 }
508 /* Send the message to graphite */
509 status = wg_send_message (key, values, vl->time, cb);
510 if (status != 0)
511 {
512 ERROR ("write_graphite plugin: error with "
513 "wg_send_message");
514 return (status);
515 }
516 }
517 }
518 else
519 {
520 /* Copy the identifier to `key' and escape it. */
521 status = WG_FORMAT_NAME (key, sizeof (key), vl, cb->prefix, NULL);
522 if (status != 0)
523 {
524 ERROR ("write_graphite plugin: error with format_name");
525 return (status);
526 }
528 escape_string (key, sizeof (key));
529 /* Convert the values to an ASCII representation and put that into
530 * `values'. */
531 status = wg_format_values (values, sizeof (values), 0, ds, vl, 0);
532 if (status != 0)
533 {
534 ERROR ("write_graphite plugin: error with "
535 "wg_format_values");
536 return (status);
537 }
539 /* Send the message to graphite */
540 status = wg_send_message (key, values, vl->time, cb);
541 if (status != 0)
542 {
543 ERROR ("write_graphite plugin: error with "
544 "wg_send_message");
545 return (status);
546 }
547 }
549 return (0);
550 }
552 static int wg_write (const data_set_t *ds, const value_list_t *vl,
553 user_data_t *user_data)
554 {
555 struct wg_callback *cb;
556 int status;
558 if (user_data == NULL)
559 return (-EINVAL);
561 cb = user_data->data;
563 status = wg_write_messages (ds, vl, cb);
565 return (status);
566 }
568 static int config_set_number (int *dest,
569 oconfig_item_t *ci)
570 {
571 if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_NUMBER))
572 {
573 WARNING ("write_graphite plugin: The `%s' config option "
574 "needs exactly one numeric argument.", ci->key);
575 return (-1);
576 }
578 *dest = ci->values[0].value.number;
580 return (0);
581 }
583 static int config_set_string (char **ret_string,
584 oconfig_item_t *ci)
585 {
586 char *string;
588 if ((ci->values_num != 1)
589 || (ci->values[0].type != OCONFIG_TYPE_STRING))
590 {
591 WARNING ("write_graphite plugin: The `%s' config option "
592 "needs exactly one string argument.", ci->key);
593 return (-1);
594 }
596 string = strdup (ci->values[0].value.string);
597 if (string == NULL)
598 {
599 ERROR ("write_graphite plugin: strdup failed.");
600 return (-1);
601 }
603 if (*ret_string != NULL)
604 sfree (*ret_string);
605 *ret_string = string;
607 return (0);
608 }
610 static int wg_config_carbon (oconfig_item_t *ci)
611 {
612 struct wg_callback *cb;
613 user_data_t user_data;
614 int i;
616 cb = malloc (sizeof (*cb));
617 if (cb == NULL)
618 {
619 ERROR ("write_graphite plugin: malloc failed.");
620 return (-1);
621 }
622 memset (cb, 0, sizeof (*cb));
623 cb->sock_fd = -1;
624 cb->host = NULL;
625 cb->name = NULL;
626 cb->port = 2003;
627 cb->prefix = NULL;
628 cb->server = NULL;
630 pthread_mutex_init (&cb->send_lock, /* attr = */ NULL);
632 config_set_string (&cb->name, ci);
633 if (cb->name == NULL)
634 return (-1);
636 for (i = 0; i < ci->children_num; i++)
637 {
638 oconfig_item_t *child = ci->children + i;
640 if (strcasecmp ("Host", child->key) == 0)
641 config_set_string (&cb->host, child);
642 else if (strcasecmp ("Port", child->key) == 0)
643 config_set_number (&cb->port, child);
644 else if (strcasecmp ("Prefix", child->key) == 0)
645 config_set_string (&cb->prefix, child);
646 else
647 {
648 ERROR ("write_graphite plugin: Invalid configuration "
649 "option: %s.", child->key);
650 }
651 }
653 DEBUG ("write_graphite: Registering write callback to carbon agent "
654 "%s:%d", cb->host, cb->port);
656 memset (&user_data, 0, sizeof (user_data));
657 user_data.data = cb;
658 user_data.free_func = NULL;
659 plugin_register_flush ("write_graphite", wg_flush, &user_data);
661 user_data.free_func = wg_callback_free;
662 plugin_register_write ("write_graphite", wg_write, &user_data);
664 return (0);
665 }
667 static int wg_config (oconfig_item_t *ci)
668 {
669 int i;
671 for (i = 0; i < ci->children_num; i++)
672 {
673 oconfig_item_t *child = ci->children + i;
675 if (strcasecmp ("Carbon", child->key) == 0)
676 wg_config_carbon (child);
677 else
678 {
679 ERROR ("write_graphite plugin: Invalid configuration "
680 "option: %s.", child->key);
681 }
682 }
684 return (0);
685 }
687 void module_register (void)
688 {
689 plugin_register_complex_config ("write_graphite", wg_config);
690 }
692 /* vim: set sw=4 ts=4 sts=4 tw=78 et : */