X-Git-Url: https://git.tokkee.org/?a=blobdiff_plain;f=src%2Fwrite_graphite.c;h=6b16bec215539245220fa662352604aad4ecef83;hb=f7f4cc14a2eb055b6097a2946fd15bd2b8ac39e4;hp=9f7e5e74c75cbafba981e1caaee3c2c9d3a8eac6;hpb=6c16a604aacf313751cac123ddc04dbf626788f8;p=collectd.git diff --git a/src/write_graphite.c b/src/write_graphite.c index 9f7e5e74..6b16bec2 100644 --- a/src/write_graphite.c +++ b/src/write_graphite.c @@ -1,6 +1,10 @@ /** * collectd - src/write_graphite.c - * Copyright (C) 2011 Scott Sanders + * Copyright (C) 2012 Pierre-Yves Ritschard + * Copyright (C) 2011 Scott Sanders + * Copyright (C) 2009 Paul Sadauskas + * Copyright (C) 2009 Doug MacEachern + * Copyright (C) 2007-2012 Florian octo Forster * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -15,10 +19,14 @@ * with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA * - * Author: - * Scott Sanders + * Authors: + * Florian octo Forster + * Doug MacEachern + * Paul Sadauskas + * Scott Sanders + * Pierre-Yves Ritschard * - * based on the excellent write_http plugin + * Based on the write_http plugin. **/ /* write_graphite plugin configuation example @@ -26,7 +34,7 @@ * * * Host "localhost" - * Port 2003 + * Port "2003" * Prefix "collectd" * * @@ -44,21 +52,27 @@ #include #include -#include -#include - -#include #include #ifndef WG_FORMAT_NAME -#define WG_FORMAT_NAME(ret, ret_len, vl, prefix, name) \ - wg_format_name (ret, ret_len, (vl)->host, (vl)->plugin, \ - (vl)->plugin_instance, (vl)->type, \ - (vl)->type_instance, prefix, name) +#define WG_FORMAT_NAME(ret, ret_len, vl, cb, ds_name) \ + wg_format_name (ret, ret_len, (vl)->host, \ + (vl)->plugin, (vl)->plugin_instance, \ + (vl)->type, (vl)->type_instance, \ + (cb)->prefix, (cb)->postfix, \ + ds_name, (cb)->escape_char) +#endif + +#ifndef WG_DEFAULT_NODE +# define WG_DEFAULT_NODE "localhost" +#endif + +#ifndef WG_DEFAULT_SERVICE +# define WG_DEFAULT_SERVICE "2003" #endif #ifndef WG_SEND_BUF_SIZE -#define WG_SEND_BUF_SIZE 4096 +# define WG_SEND_BUF_SIZE 4096 #endif /* @@ -67,11 +81,12 @@ struct wg_callback { int sock_fd; - struct hostent *server; - char *host; - int port; + char *node; + char *service; char *prefix; + char *postfix; + char escape_char; char send_buf[WG_SEND_BUF_SIZE]; size_t send_buf_free; @@ -105,20 +120,15 @@ static int wg_send_buffer (struct wg_callback *cb) status, strerror (errno)); - pthread_mutex_lock (&cb->send_lock); - - DEBUG ("write_graphite plugin: closing socket and restting fd " - "so reinit will occur"); close (cb->sock_fd); cb->sock_fd = -1; - pthread_mutex_unlock (&cb->send_lock); - return (-1); } return (0); } +/* NOTE: You must hold cb->send_lock when calling this function! */ static int wg_flush_nolock (cdtime_t timeout, struct wg_callback *cb) { int status; @@ -152,42 +162,62 @@ static int wg_flush_nolock (cdtime_t timeout, struct wg_callback *cb) static int wg_callback_init (struct wg_callback *cb) { + struct addrinfo ai_hints; + struct addrinfo *ai_list; + struct addrinfo *ai_ptr; int status; - struct sockaddr_in serv_addr; + const char *node = cb->node ? cb->node : WG_DEFAULT_NODE; + const char *service = cb->service ? cb->service : WG_DEFAULT_SERVICE; if (cb->sock_fd > 0) return (0); - cb->sock_fd = socket (AF_INET, SOCK_STREAM, 0); - if (cb->sock_fd < 0) + memset (&ai_hints, 0, sizeof (ai_hints)); +#ifdef AI_ADDRCONFIG + ai_hints.ai_flags |= AI_ADDRCONFIG; +#endif + ai_hints.ai_family = AF_UNSPEC; + ai_hints.ai_socktype = SOCK_STREAM; + + ai_list = NULL; + + status = getaddrinfo (node, service, &ai_hints, &ai_list); + if (status != 0) { - ERROR ("write_graphite plugin: socket failed: %s", strerror (errno)); + ERROR ("write_graphite plugin: getaddrinfo (%s, %s) failed: %s", + node, service, gai_strerror (status)); return (-1); } - cb->server = gethostbyname(cb->host); - if (cb->server == NULL) + + assert (ai_list != NULL); + for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) { - ERROR ("write_graphite plugin: no such host"); - return (-1); + cb->sock_fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, + ai_ptr->ai_protocol); + if (cb->sock_fd < 0) + continue; + + status = connect (cb->sock_fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen); + if (status != 0) + { + close (cb->sock_fd); + cb->sock_fd = -1; + continue; + } + + break; } - memset (&serv_addr, 0, sizeof (serv_addr)); - serv_addr.sin_family = AF_INET; - memcpy (&serv_addr.sin_addr.s_addr, - cb->server->h_addr, - cb->server->h_length); - serv_addr.sin_port = htons(cb->port); - - status = connect(cb->sock_fd, - (struct sockaddr *) &serv_addr, - sizeof(serv_addr)); - if (status < 0) + + freeaddrinfo (ai_list); + + if (cb->sock_fd < 0) { char errbuf[1024]; - sstrerror (errno, errbuf, sizeof (errbuf)); - ERROR ("write_graphite plugin: connect failed: %s", errbuf); + ERROR ("write_graphite plugin: Connecting to %s:%s failed. " + "The last error was: %s", node, service, + sstrerror (errno, errbuf, sizeof (errbuf))); close (cb->sock_fd); - cb->sock_fd = -1; return (-1); } @@ -205,11 +235,19 @@ static void wg_callback_free (void *data) cb = data; + pthread_mutex_lock (&cb->send_lock); + wg_flush_nolock (/* timeout = */ 0, cb); close(cb->sock_fd); - sfree(cb->host); + cb->sock_fd = -1; + + sfree(cb->node); + sfree(cb->service); sfree(cb->prefix); + sfree(cb->postfix); + + pthread_mutex_destroy (&cb->send_lock); sfree(cb); } @@ -308,114 +346,99 @@ static int wg_format_values (char *ret, size_t ret_len, return (0); } -static int mangle_dots (char *dst, const char *src) +static void wg_copy_escape_part (char *dst, const char *src, size_t dst_len, + char escape_char) { size_t i; - int reps = 0; + memset (dst, 0, dst_len); + + if (src == NULL) + return; - for (i = 0; i < strlen(src) ; i++) + for (i = 0; i < dst_len; i++) { - if (src[i] == '.') - { - dst[i] = '_'; - ++reps; - } + if ((src[i] == '.') + || isspace ((int) src[i]) + || iscntrl ((int) src[i])) + dst[i] = escape_char; else dst[i] = src[i]; - } - dst[i] = '\0'; - return reps; + if (src[i] == 0) + break; + } } static int wg_format_name (char *ret, int ret_len, const char *hostname, const char *plugin, const char *plugin_instance, const char *type, const char *type_instance, - const char *prefix, const char *ds_name) + const char *prefix, const char *postfix, + const char *ds_name, char escape_char) { + char n_hostname[DATA_MAX_NAME_LEN]; + char n_plugin[DATA_MAX_NAME_LEN]; + char n_plugin_instance[DATA_MAX_NAME_LEN]; + char n_type[DATA_MAX_NAME_LEN]; + char n_type_instance[DATA_MAX_NAME_LEN]; int status; - char *n_hostname = 0; - char *n_ds_name = 0; + assert (hostname != NULL); assert (plugin != NULL); assert (type != NULL); - - if ((n_hostname = malloc(strlen(hostname)+1)) == NULL) + assert (ds_name != NULL); + + if (prefix == NULL) + prefix = ""; + + if (postfix == NULL) + postfix = ""; + + wg_copy_escape_part (n_hostname, hostname, + sizeof (n_hostname), escape_char); + wg_copy_escape_part (n_plugin, plugin, + sizeof (n_plugin), escape_char); + wg_copy_escape_part (n_plugin_instance, plugin_instance, + sizeof (n_plugin_instance), escape_char); + wg_copy_escape_part (n_type, type, + sizeof (n_type), escape_char); + wg_copy_escape_part (n_type_instance, type_instance, + sizeof (n_type_instance), escape_char); + + if (n_plugin_instance[0] == '\0') { - ERROR ("Unable to allocate memory for normalized hostname buffer"); - return (-1); - } - - if (mangle_dots(n_hostname, hostname) == -1) - { - ERROR ("Unable to normalize hostname"); - return (-1); - } - - if (ds_name && ds_name[0] != '\0') { - if (mangle_dots(n_ds_name, ds_name) == -1) + if (n_type_instance[0] == '\0') { - ERROR ("Unable to normalize datasource name"); - return (-1); - } - } - - if ((plugin_instance == NULL) || (plugin_instance[0] == '\0')) - { - if ((type_instance == NULL) || (type_instance[0] == '\0')) - { - if ((ds_name == NULL) || (ds_name[0] == '\0')) - status = ssnprintf (ret, ret_len, "%s.%s.%s.%s", - prefix, n_hostname, plugin, type); - else - status = ssnprintf (ret, ret_len, "%s.%s.%s.%s.%s", - prefix, n_hostname, plugin, type, n_ds_name); + status = ssnprintf (ret, ret_len, "%s%s%s.%s.%s.%s", + prefix, n_hostname, postfix, n_plugin, n_type, ds_name); } else { - if ((ds_name == NULL) || (ds_name[0] == '\0')) - status = ssnprintf (ret, ret_len, "%s.%s.%s.%s-%s", - prefix, n_hostname, plugin, type, - type_instance); - else - status = ssnprintf (ret, ret_len, "%s.%s.%s.%s-%s.%s", - prefix, n_hostname, plugin, type, - type_instance, n_ds_name); + status = ssnprintf (ret, ret_len, "%s%s%s.%s.%s-%s.%s", + prefix, n_hostname, postfix, n_plugin, n_type, + n_type_instance, ds_name); } } else { - if ((type_instance == NULL) || (type_instance[0] == '\0')) + if (n_type_instance[0] == '\0') { - if ((ds_name == NULL) || (ds_name[0] == '\0')) - status = ssnprintf (ret, ret_len, "%s.%s.%s.%s.%s", - prefix, n_hostname, plugin, - plugin_instance, type); - else - status = ssnprintf (ret, ret_len, "%s.%s.%s.%s.%s.%s", - prefix, n_hostname, plugin, - plugin_instance, type, n_ds_name); + status = ssnprintf (ret, ret_len, "%s%s%s.%s.%s.%s.%s", + prefix, n_hostname, postfix, n_plugin, + n_plugin_instance, n_type, ds_name); } else { - if ((ds_name == NULL) || (ds_name[0] == '\0')) - status = ssnprintf (ret, ret_len, "%s.%s.%s.%s.%s-%s", - prefix, n_hostname, plugin, - plugin_instance, type, type_instance); - else - status = ssnprintf (ret, ret_len, "%s.%s.%s.%s.%s-%s.%s", - prefix, n_hostname, plugin, - plugin_instance, type, type_instance, n_ds_name); + status = ssnprintf (ret, ret_len, "%s%s%s.%s.%s.%s-%s.%s", + prefix, n_hostname, postfix, n_plugin, + n_plugin_instance, n_type, n_type_instance, ds_name); } } - sfree(n_hostname); - sfree(n_ds_name); - if ((status < 1) || (status >= ret_len)) return (-1); + return (0); } @@ -469,9 +492,9 @@ static int wg_send_message (const char* key, const char* value, cb->send_buf_fill += message_len; cb->send_buf_free -= message_len; - DEBUG ("write_graphite plugin: <%s:%d> buf %zu/%zu (%g%%) \"%s\"", - cb->host, - cb->port, + DEBUG ("write_graphite plugin: <%s:%s> buf %zu/%zu (%g%%) \"%s\"", + cb->node, + cb->service, cb->send_buf_fill, sizeof (cb->send_buf), 100.0 * ((double) cb->send_buf_fill) / ((double) sizeof (cb->send_buf)), message); @@ -502,8 +525,7 @@ static int wg_write_messages (const data_set_t *ds, const value_list_t *vl, for (i = 0; i < ds->ds_num; i++) { /* Copy the identifier to `key' and escape it. */ - status = WG_FORMAT_NAME (key, sizeof (key), vl, cb->prefix, \ - ds->ds[i].name); + status = WG_FORMAT_NAME (key, sizeof (key), vl, cb, ds->ds[i].name); if (status != 0) { ERROR ("write_graphite plugin: error with format_name"); @@ -534,7 +556,7 @@ static int wg_write_messages (const data_set_t *ds, const value_list_t *vl, else { /* Copy the identifier to `key' and escape it. */ - status = WG_FORMAT_NAME (key, sizeof (key), vl, cb->prefix, NULL); + status = WG_FORMAT_NAME (key, sizeof (key), vl, cb, NULL); if (status != 0) { ERROR ("write_graphite plugin: error with format_name"); @@ -581,44 +603,17 @@ static int wg_write (const data_set_t *ds, const value_list_t *vl, return (status); } -static int config_set_number (int *dest, +static int config_set_char (char *dest, oconfig_item_t *ci) { - if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_NUMBER)) - { - WARNING ("write_graphite plugin: The `%s' config option " - "needs exactly one numeric argument.", ci->key); - return (-1); - } - - *dest = ci->values[0].value.number; - - return (0); -} - -static int config_set_string (char **ret_string, - oconfig_item_t *ci) -{ - char *string; - - if ((ci->values_num != 1) - || (ci->values[0].type != OCONFIG_TYPE_STRING)) + if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING)) { WARNING ("write_graphite plugin: The `%s' config option " "needs exactly one string argument.", ci->key); return (-1); } - string = strdup (ci->values[0].value.string); - if (string == NULL) - { - ERROR ("write_graphite plugin: strdup failed."); - return (-1); - } - - if (*ret_string != NULL) - sfree (*ret_string); - *ret_string = string; + *dest = ci->values[0].value.string[0]; return (0); } @@ -637,10 +632,11 @@ static int wg_config_carbon (oconfig_item_t *ci) } memset (cb, 0, sizeof (*cb)); cb->sock_fd = -1; - cb->host = NULL; - cb->port = 2003; + cb->node = NULL; + cb->service = NULL; cb->prefix = NULL; - cb->server = NULL; + cb->postfix = NULL; + cb->escape_char = '_'; pthread_mutex_init (&cb->send_lock, /* attr = */ NULL); @@ -649,11 +645,15 @@ static int wg_config_carbon (oconfig_item_t *ci) oconfig_item_t *child = ci->children + i; if (strcasecmp ("Host", child->key) == 0) - config_set_string (&cb->host, child); + cf_util_get_string (child, &cb->node); else if (strcasecmp ("Port", child->key) == 0) - config_set_number (&cb->port, child); + cf_util_get_string (child, &cb->service); else if (strcasecmp ("Prefix", child->key) == 0) - config_set_string (&cb->prefix, child); + cf_util_get_string (child, &cb->prefix); + else if (strcasecmp ("Postfix", child->key) == 0) + cf_util_get_string (child, &cb->postfix); + else if (strcasecmp ("EscapeCharacter", child->key) == 0) + config_set_char (&cb->escape_char, child); else { ERROR ("write_graphite plugin: Invalid configuration " @@ -661,8 +661,9 @@ static int wg_config_carbon (oconfig_item_t *ci) } } - DEBUG ("write_graphite: Registering write callback to carbon agent " - "%s:%d", cb->host, cb->port); + DEBUG ("write_graphite: Registering write callback to carbon agent %s:%s", + cb->node ? cb->node : WG_DEFAULT_NODE, + cb->service ? cb->service : WG_DEFAULT_SERVICE); memset (&user_data, 0, sizeof (user_data)); user_data.data = cb;