X-Git-Url: https://git.tokkee.org/?a=blobdiff_plain;f=src%2Fwrite_graphite.c;h=a3bead39b49147a8a72fca8ebf9c16489d095464;hb=7a6887ad7eef33e04bcb0720c213d05fd9be8a59;hp=827ec3881263a749eeb68d89c50d7a357a185ac4;hpb=efa4700ad47969749b0fca622294fd006b2d7cb8;p=collectd.git diff --git a/src/write_graphite.c b/src/write_graphite.c index 827ec388..a3bead39 100644 --- a/src/write_graphite.c +++ b/src/write_graphite.c @@ -1,6 +1,10 @@ /** * collectd - src/write_graphite.c - * Copyright (C) 2011 Scott Sanders + * Copyright (C) 2012 Pierre-Yves Ritschard + * Copyright (C) 2011 Scott Sanders + * Copyright (C) 2009 Paul Sadauskas + * Copyright (C) 2009 Doug MacEachern + * Copyright (C) 2007-2013 Florian octo Forster * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -15,10 +19,14 @@ * with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA * - * Author: - * Scott Sanders + * Authors: + * Florian octo Forster + * Doug MacEachern + * Paul Sadauskas + * Scott Sanders + * Pierre-Yves Ritschard * - * based on the excellent write_http plugin + * Based on the write_http plugin. **/ /* write_graphite plugin configuation example @@ -38,26 +46,15 @@ #include "configfile.h" #include "utils_cache.h" +#include "utils_complain.h" #include "utils_parse_option.h" /* Folks without pthread will need to disable this plugin. */ #include #include -#include -#include - -#include #include -#ifndef WG_FORMAT_NAME -#define WG_FORMAT_NAME(ret, ret_len, vl, cb, name) \ - wg_format_name (ret, ret_len, (vl)->host, (vl)->plugin, \ - (vl)->plugin_instance, (vl)->type, \ - (vl)->type_instance, (cb)->prefix, (cb)->postfix, \ - name, (cb)->dotchar) -#endif - #ifndef WG_DEFAULT_NODE # define WG_DEFAULT_NODE "localhost" #endif @@ -66,8 +63,13 @@ # define WG_DEFAULT_SERVICE "2003" #endif +#ifndef WG_DEFAULT_ESCAPE +# define WG_DEFAULT_ESCAPE '_' +#endif + +/* Ethernet - (IPv6 + TCP) = 1500 - (40 + 32) = 1428 */ #ifndef WG_SEND_BUF_SIZE -# define WG_SEND_BUF_SIZE 4096 +# define WG_SEND_BUF_SIZE 1428 #endif /* @@ -76,13 +78,16 @@ struct wg_callback { int sock_fd; - struct hostent *server; char *node; char *service; char *prefix; char *postfix; - char dotchar; + char escape_char; + + _Bool store_rates; + _Bool separate_instances; + _Bool always_append_ds; char send_buf[WG_SEND_BUF_SIZE]; size_t send_buf_free; @@ -90,6 +95,7 @@ struct wg_callback cdtime_t send_buf_init_time; pthread_mutex_t send_lock; + c_complain_t init_complaint; }; @@ -106,30 +112,26 @@ static void wg_reset_buffer (struct wg_callback *cb) static int wg_send_buffer (struct wg_callback *cb) { - int status = 0; + ssize_t status = 0; - status = write (cb->sock_fd, cb->send_buf, strlen (cb->send_buf)); + status = swrite (cb->sock_fd, cb->send_buf, strlen (cb->send_buf)); if (status < 0) { - ERROR ("write_graphite plugin: send failed with " - "status %i (%s)", - status, - strerror (errno)); + char errbuf[1024]; + ERROR ("write_graphite plugin: send failed with status %zi (%s)", + status, sstrerror (errno, errbuf, sizeof (errbuf))); - pthread_mutex_trylock (&cb->send_lock); - DEBUG ("write_graphite plugin: closing socket and restting fd " - "so reinit will occur"); close (cb->sock_fd); cb->sock_fd = -1; - pthread_mutex_unlock (&cb->send_lock); - return (-1); } + return (0); } +/* NOTE: You must hold cb->send_lock when calling this function! */ static int wg_flush_nolock (cdtime_t timeout, struct wg_callback *cb) { int status; @@ -215,12 +217,19 @@ static int wg_callback_init (struct wg_callback *cb) if (cb->sock_fd < 0) { char errbuf[1024]; - ERROR ("write_graphite plugin: Connecting to %s:%s failed. " + c_complain (LOG_ERR, &cb->init_complaint, + "write_graphite plugin: Connecting to %s:%s failed. " "The last error was: %s", node, service, sstrerror (errno, errbuf, sizeof (errbuf))); close (cb->sock_fd); return (-1); } + else + { + c_release (LOG_INFO, &cb->init_complaint, + "write_graphite plugin: Successfully connected to %s:%s.", + node, service); + } wg_reset_buffer (cb); @@ -236,14 +245,20 @@ static void wg_callback_free (void *data) cb = data; + pthread_mutex_lock (&cb->send_lock); + wg_flush_nolock (/* timeout = */ 0, cb); close(cb->sock_fd); + cb->sock_fd = -1; + sfree(cb->node); sfree(cb->service); sfree(cb->prefix); sfree(cb->postfix); + pthread_mutex_destroy (&cb->send_lock); + sfree(cb); } @@ -266,7 +281,7 @@ static int wg_flush (cdtime_t timeout, status = wg_callback_init (cb); if (status != 0) { - ERROR ("write_graphite plugin: wg_callback_init failed."); + /* An error message has already been printed. */ pthread_mutex_unlock (&cb->send_lock); return (-1); } @@ -341,127 +356,92 @@ static int wg_format_values (char *ret, size_t ret_len, return (0); } -static int swap_chars (char *dst, const char *src, - const char from, const char to) +static void wg_copy_escape_part (char *dst, const char *src, size_t dst_len, + char escape_char) { size_t i; - int reps = 0; + memset (dst, 0, dst_len); - for (i = 0; i < strlen(src) ; i++) + if (src == NULL) + return; + + for (i = 0; i < dst_len; i++) { - if (src[i] == from) + if (src[i] == 0) { - dst[i] = to; - ++reps; + dst[i] = 0; + break; } + + if ((src[i] == '.') + || isspace ((int) src[i]) + || iscntrl ((int) src[i])) + dst[i] = escape_char; else dst[i] = src[i]; } - dst[i] = '\0'; - - return reps; } static int wg_format_name (char *ret, int ret_len, - const char *hostname, - const char *plugin, const char *plugin_instance, - const char *type, const char *type_instance, - const char *prefix, const char *postfix, - const char *ds_name, const char dotchar) + const value_list_t *vl, + const struct wg_callback *cb, + const char *ds_name) { - int status; - char *n_hostname = 0; - char *n_type_instance = 0; + char n_host[DATA_MAX_NAME_LEN]; + char n_plugin[DATA_MAX_NAME_LEN]; + char n_plugin_instance[DATA_MAX_NAME_LEN]; + char n_type[DATA_MAX_NAME_LEN]; + char n_type_instance[DATA_MAX_NAME_LEN]; + + char *prefix; + char *postfix; - assert (plugin != NULL); - assert (type != NULL); + char tmp_plugin[2 * DATA_MAX_NAME_LEN + 1]; + char tmp_type[2 * DATA_MAX_NAME_LEN + 1]; + prefix = cb->prefix; if (prefix == NULL) prefix = ""; + postfix = cb->postfix; if (postfix == NULL) postfix = ""; - if ((n_hostname = malloc(strlen(hostname)+1)) == NULL) - { - ERROR ("Unable to allocate memory for normalized hostname buffer"); - return (-1); - } - - if (swap_chars(n_hostname, hostname, '.', dotchar) == -1) - { - ERROR ("Unable to normalize hostname"); - return (-1); - } - - if (type_instance && type_instance[0] != '\0') { - if ((n_type_instance = malloc(strlen(type_instance)+1)) == NULL) - { - ERROR ("Unable to allocate memory for normalized datasource name buffer"); - return (-1); - } - if (swap_chars(n_type_instance, type_instance, '.', dotchar) == -1) - { - ERROR ("Unable to normalize datasource name"); - return (-1); - } - } + wg_copy_escape_part (n_host, vl->host, + sizeof (n_host), cb->escape_char); + wg_copy_escape_part (n_plugin, vl->plugin, + sizeof (n_plugin), cb->escape_char); + wg_copy_escape_part (n_plugin_instance, vl->plugin_instance, + sizeof (n_plugin_instance), cb->escape_char); + wg_copy_escape_part (n_type, vl->type, + sizeof (n_type), cb->escape_char); + wg_copy_escape_part (n_type_instance, vl->type_instance, + sizeof (n_type_instance), cb->escape_char); + + if (n_plugin_instance[0] != '\0') + ssnprintf (tmp_plugin, sizeof (tmp_plugin), "%s%c%s", + n_plugin, + cb->separate_instances ? '.' : '-', + n_plugin_instance); + else + sstrncpy (tmp_plugin, n_plugin, sizeof (tmp_plugin)); - if ((plugin_instance == NULL) || (plugin_instance[0] == '\0')) - { - if ((n_type_instance == NULL) || (n_type_instance[0] == '\0')) - { - if ((ds_name == NULL) || (ds_name[0] == '\0')) - status = ssnprintf (ret, ret_len, "%s%s%s.%s.%s", - prefix, n_hostname, postfix, plugin, type); - else - status = ssnprintf (ret, ret_len, "%s%s%s.%s.%s.%s", - prefix, n_hostname, postfix, plugin, type, ds_name); - } - else - { - if ((ds_name == NULL) || (ds_name[0] == '\0')) - status = ssnprintf (ret, ret_len, "%s%s%s.%s.%s-%s", - prefix, n_hostname, postfix, plugin, type, - n_type_instance); - else - status = ssnprintf (ret, ret_len, "%s%s%s.%s.%s-%s.%s", - prefix, n_hostname, postfix, plugin, type, - n_type_instance, ds_name); - } - } + if (n_type_instance[0] != '\0') + ssnprintf (tmp_type, sizeof (tmp_type), "%s%c%s", + n_type, + cb->separate_instances ? '.' : '-', + n_type_instance); else - { - if ((n_type_instance == NULL) || (n_type_instance[0] == '\0')) - { - if ((ds_name == NULL) || (ds_name[0] == '\0')) - status = ssnprintf (ret, ret_len, "%s%s%s.%s.%s.%s", - prefix, n_hostname, postfix, plugin, - plugin_instance, type); - else - status = ssnprintf (ret, ret_len, "%s%s%s.%s.%s.%s.%s", - prefix, n_hostname, postfix, plugin, - plugin_instance, type, ds_name); - } - else - { - if ((ds_name == NULL) || (ds_name[0] == '\0')) - status = ssnprintf (ret, ret_len, "%s%s%s.%s.%s.%s-%s", - prefix, n_hostname, postfix, plugin, - plugin_instance, type, n_type_instance); - else - status = ssnprintf (ret, ret_len, "%s%s%s.%s.%s.%s-%s.%s", - prefix, n_hostname, postfix, plugin, - plugin_instance, type, n_type_instance, ds_name); - } - } + sstrncpy (tmp_type, n_type, sizeof (tmp_type)); - sfree(n_hostname); - sfree(n_type_instance); + if (ds_name != NULL) + ssnprintf (ret, ret_len, "%s%s%s.%s.%s.%s", + prefix, n_host, postfix, tmp_plugin, tmp_type, ds_name); + else + ssnprintf (ret, ret_len, "%s%s%s.%s.%s", + prefix, n_host, postfix, tmp_plugin, tmp_type); - if ((status < 1) || (status >= ret_len)) - return (-1); return (0); } @@ -473,17 +453,16 @@ static int wg_send_message (const char* key, const char* value, char message[1024]; message_len = (size_t) ssnprintf (message, sizeof (message), - "%s %s %.0f\n", + "%s %s %u\r\n", key, value, - CDTIME_T_TO_DOUBLE(time)); + (unsigned int) CDTIME_T_TO_TIME_T (time)); if (message_len >= sizeof (message)) { ERROR ("write_graphite plugin: message buffer too small: " "Need %zu bytes.", message_len + 1); return (-1); } - pthread_mutex_lock (&cb->send_lock); if (cb->sock_fd < 0) @@ -491,7 +470,7 @@ static int wg_send_message (const char* key, const char* value, status = wg_callback_init (cb); if (status != 0) { - ERROR ("write_graphite plugin: wg_callback_init failed."); + /* An error message has already been printed. */ pthread_mutex_unlock (&cb->send_lock); return (-1); } @@ -506,6 +485,8 @@ static int wg_send_message (const char* key, const char* value, return (status); } } + + /* Assert that we have enough space for this message. */ assert (message_len < cb->send_buf_free); /* `message_len + 1' because `message_len' does not include the @@ -515,14 +496,13 @@ static int wg_send_message (const char* key, const char* value, cb->send_buf_fill += message_len; cb->send_buf_free -= message_len; - DEBUG ("write_graphite plugin: <%s:%s> buf %zu/%zu (%g%%) \"%s\"", + DEBUG ("write_graphite plugin: [%s]:%s buf %zu/%zu (%.1f %%) \"%s\"", cb->node, cb->service, cb->send_buf_fill, sizeof (cb->send_buf), 100.0 * ((double) cb->send_buf_fill) / ((double) sizeof (cb->send_buf)), message); - /* Check if we have enough space for this message. */ pthread_mutex_unlock (&cb->send_lock); return (0); @@ -543,43 +523,15 @@ static int wg_write_messages (const data_set_t *ds, const value_list_t *vl, return -1; } - if (ds->ds_num > 1) - { - for (i = 0; i < ds->ds_num; i++) - { - /* Copy the identifier to `key' and escape it. */ - status = WG_FORMAT_NAME (key, sizeof (key), vl, cb, ds->ds[i].name); - if (status != 0) - { - ERROR ("write_graphite plugin: error with format_name"); - return (status); - } - - escape_string (key, sizeof (key)); - /* Convert the values to an ASCII representation and put that - * into `values'. */ - status = wg_format_values (values, sizeof (values), i, ds, vl, 0); - if (status != 0) - { - ERROR ("write_graphite plugin: error with " - "wg_format_values"); - return (status); - } - - /* Send the message to graphite */ - status = wg_send_message (key, values, vl->time, cb); - if (status != 0) - { - ERROR ("write_graphite plugin: error with " - "wg_send_message"); - return (status); - } - } - } - else + for (i = 0; i < ds->ds_num; i++) { + const char *ds_name = NULL; + + if (cb->always_append_ds || (ds->ds_num > 1)) + ds_name = ds->ds[i].name; + /* Copy the identifier to `key' and escape it. */ - status = WG_FORMAT_NAME (key, sizeof (key), vl, cb, NULL); + status = wg_format_name (key, sizeof (key), vl, cb, ds_name); if (status != 0) { ERROR ("write_graphite plugin: error with format_name"); @@ -589,7 +541,8 @@ static int wg_write_messages (const data_set_t *ds, const value_list_t *vl, escape_string (key, sizeof (key)); /* Convert the values to an ASCII representation and put that into * `values'. */ - status = wg_format_values (values, sizeof (values), 0, ds, vl, 0); + status = wg_format_values (values, sizeof (values), i, ds, vl, + cb->store_rates); if (status != 0) { ERROR ("write_graphite plugin: error with " @@ -601,8 +554,7 @@ static int wg_write_messages (const data_set_t *ds, const value_list_t *vl, status = wg_send_message (key, values, vl->time, cb); if (status != 0) { - ERROR ("write_graphite plugin: error with " - "wg_send_message"); + /* An error message has already been printed. */ return (status); } } @@ -617,7 +569,7 @@ static int wg_write (const data_set_t *ds, const value_list_t *vl, int status; if (user_data == NULL) - return (-EINVAL); + return (EINVAL); cb = user_data->data; @@ -629,14 +581,30 @@ static int wg_write (const data_set_t *ds, const value_list_t *vl, static int config_set_char (char *dest, oconfig_item_t *ci) { - if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING)) + char buffer[4]; + int status; + + memset (buffer, 0, sizeof (buffer)); + + status = cf_util_get_string_buffer (ci, buffer, sizeof (buffer)); + if (status != 0) + return (status); + + if (buffer[0] == 0) { - WARNING ("write_graphite plugin: The `%s' config option " - "needs exactly one string argument.", ci->key); + ERROR ("write_graphite plugin: Cannot use an empty string for the " + "\"EscapeCharacter\" option."); return (-1); } - *dest = ci->values[0].value.string[0]; + if (buffer[1] != 0) + { + WARNING ("write_graphite plugin: Only the first character of the " + "\"EscapeCharacter\" option ('%c') will be used.", + (int) buffer[0]); + } + + *dest = buffer[0]; return (0); } @@ -645,6 +613,7 @@ static int wg_config_carbon (oconfig_item_t *ci) { struct wg_callback *cb; user_data_t user_data; + char callback_name[DATA_MAX_NAME_LEN]; int i; cb = malloc (sizeof (*cb)); @@ -659,10 +628,11 @@ static int wg_config_carbon (oconfig_item_t *ci) cb->service = NULL; cb->prefix = NULL; cb->postfix = NULL; - cb->server = NULL; - cb->dotchar = '_'; + cb->escape_char = WG_DEFAULT_ESCAPE; + cb->store_rates = 1; pthread_mutex_init (&cb->send_lock, /* attr = */ NULL); + C_COMPLAIN_INIT (&cb->init_complaint); for (i = 0; i < ci->children_num; i++) { @@ -671,13 +641,19 @@ static int wg_config_carbon (oconfig_item_t *ci) if (strcasecmp ("Host", child->key) == 0) cf_util_get_string (child, &cb->node); else if (strcasecmp ("Port", child->key) == 0) - cf_util_get_string (child, &cb->service); + cf_util_get_service (child, &cb->service); else if (strcasecmp ("Prefix", child->key) == 0) cf_util_get_string (child, &cb->prefix); else if (strcasecmp ("Postfix", child->key) == 0) cf_util_get_string (child, &cb->postfix); - else if (strcasecmp ("DotCharacter", child->key) == 0) - config_set_char (&cb->dotchar, child); + else if (strcasecmp ("StoreRates", child->key) == 0) + cf_util_get_boolean (child, &cb->store_rates); + else if (strcasecmp ("SeparateInstances", child->key) == 0) + cf_util_get_boolean (child, &cb->separate_instances); + else if (strcasecmp ("AlwaysAppendDS", child->key) == 0) + cf_util_get_boolean (child, &cb->always_append_ds); + else if (strcasecmp ("EscapeCharacter", child->key) == 0) + config_set_char (&cb->escape_char, child); else { ERROR ("write_graphite plugin: Invalid configuration " @@ -685,17 +661,17 @@ static int wg_config_carbon (oconfig_item_t *ci) } } - DEBUG ("write_graphite: Registering write callback to carbon agent %s:%s", - cb->node ? cb->node : WG_DEFAULT_NODE, - cb->service ? cb->service : WG_DEFAULT_SERVICE); + ssnprintf (callback_name, sizeof (callback_name), "write_graphite/%s/%s", + cb->node != NULL ? cb->node : WG_DEFAULT_NODE, + cb->service != NULL ? cb->service : WG_DEFAULT_SERVICE); memset (&user_data, 0, sizeof (user_data)); user_data.data = cb; - user_data.free_func = NULL; - plugin_register_flush ("write_graphite", wg_flush, &user_data); - user_data.free_func = wg_callback_free; - plugin_register_write ("write_graphite", wg_write, &user_data); + plugin_register_write (callback_name, wg_write, &user_data); + + user_data.free_func = NULL; + plugin_register_flush (callback_name, wg_flush, &user_data); return (0); }