Code

Merge remote-tracking branch 'github/pr/631'
[collectd.git] / src / write_tsdb.c
index 8a793b5c2b704825ccaf8434d99ae72cf6fe24fb..b670f3ae583d3d401565c019ed5df900667f87b8 100644 (file)
  */
 
 #include "collectd.h"
+
 #include "common.h"
 #include "plugin.h"
-#include "configfile.h"
 
 #include "utils_cache.h"
-#include "utils_parse_option.h"
 
-#include <pthread.h>
-#include <sys/socket.h>
 #include <netdb.h>
 
 #ifndef WT_DEFAULT_NODE
@@ -80,10 +77,8 @@ struct wt_callback
     char     *node;
     char     *service;
     char     *host_tags;
-    char     escape_char;
 
     _Bool    store_rates;
-    _Bool    separate_instances;
     _Bool    always_append_ds;
 
     char     send_buf[WT_SEND_BUF_SIZE];
@@ -146,7 +141,7 @@ static int wt_flush_nolock(cdtime_t timeout, struct wt_callback *cb)
             return 0;
     }
 
-    if (cb->send_buf_fill <= 0)
+    if (cb->send_buf_fill == 0)
     {
         cb->send_buf_init_time = cdtime();
         return 0;
@@ -160,9 +155,7 @@ static int wt_flush_nolock(cdtime_t timeout, struct wt_callback *cb)
 
 static int wt_callback_init(struct wt_callback *cb)
 {
-    struct addrinfo ai_hints;
     struct addrinfo *ai_list;
-    struct addrinfo *ai_ptr;
     int status;
 
     const char *node = cb->node ? cb->node : WT_DEFAULT_NODE;
@@ -171,14 +164,11 @@ static int wt_callback_init(struct wt_callback *cb)
     if (cb->sock_fd > 0)
         return 0;
 
-    memset(&ai_hints, 0, sizeof(ai_hints));
-#ifdef AI_ADDRCONFIG
-    ai_hints.ai_flags    |= AI_ADDRCONFIG;
-#endif
-    ai_hints.ai_family   = AF_UNSPEC;
-    ai_hints.ai_socktype = SOCK_STREAM;
-
-    ai_list = NULL;
+    struct addrinfo ai_hints = {
+        .ai_family = AF_UNSPEC,
+        .ai_flags = AI_ADDRCONFIG,
+        .ai_socktype = SOCK_STREAM
+    };
 
     status = getaddrinfo(node, service, &ai_hints, &ai_list);
     if (status != 0)
@@ -189,13 +179,15 @@ static int wt_callback_init(struct wt_callback *cb)
     }
 
     assert (ai_list != NULL);
-    for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
+    for (struct addrinfo *ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
     {
         cb->sock_fd = socket(ai_ptr->ai_family, ai_ptr->ai_socktype,
                              ai_ptr->ai_protocol);
         if (cb->sock_fd < 0)
             continue;
 
+        set_sock_opts(cb->sock_fd);
+
         status = connect(cb->sock_fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
         if (status != 0)
         {
@@ -215,7 +207,6 @@ static int wt_callback_init(struct wt_callback *cb)
         ERROR("write_tsdb plugin: Connecting to %s:%s failed. "
               "The last error was: %s", node, service,
               sstrerror (errno, errbuf, sizeof(errbuf)));
-        close(cb->sock_fd);
         return -1;
     }
 
@@ -311,7 +302,7 @@ static int wt_format_values(char *ret, size_t ret_len,
 } while (0)
 
     if (ds->ds[ds_num].type == DS_TYPE_GAUGE)
-        BUFFER_ADD("%f", vl->values[ds_num].gauge);
+        BUFFER_ADD(GAUGE_FORMAT, vl->values[ds_num].gauge);
     else if (store_rates)
     {
         if (rates == NULL)
@@ -322,7 +313,7 @@ static int wt_format_values(char *ret, size_t ret_len,
                     "uc_get_rate failed.");
             return -1;
         }
-        BUFFER_ADD("%f", rates[ds_num]);
+        BUFFER_ADD(GAUGE_FORMAT, rates[ds_num]);
     }
     else if (ds->ds[ds_num].type == DS_TYPE_COUNTER)
         BUFFER_ADD("%llu", vl->values[ds_num].counter);
@@ -350,46 +341,59 @@ static int wt_format_name(char *ret, int ret_len,
                           const char *ds_name)
 {
     int status;
-    char *temp;
-    char *prefix;
+    char *temp = NULL;
+    const char *prefix = "";
     const char *meta_prefix = "tsdb_prefix";
 
-    status = meta_data_get_string(vl->meta, meta_prefix, &temp);
-    if (status == -ENOENT) {
-        prefix = "";
-    } else if (status < 0) {
-        sfree(temp);
-        return status;
-    } else {
-        prefix = temp;
+    if (vl->meta) {
+        status = meta_data_get_string(vl->meta, meta_prefix, &temp);
+        if (status == -ENOENT) {
+            /* defaults to empty string */
+        } else if (status < 0) {
+            sfree(temp);
+            return status;
+        } else {
+            prefix = temp;
+        }
     }
 
     if (ds_name != NULL) {
         if (vl->plugin_instance[0] == '\0') {
-            ssnprintf(ret, ret_len, "%s.%s.%s",
-                      prefix, vl->plugin, ds_name);
-        } else if (vl->type_instance == '\0') {
-            ssnprintf(ret, ret_len, "%s.%s.%s.%s.%s",
-                      prefix, vl->plugin, vl->plugin_instance,
-                      vl->type_instance, ds_name);
-        } else {
-            ssnprintf(ret, ret_len, "%s.%s.%s.%s.%s",
-                      prefix, vl->plugin, vl->plugin_instance, vl->type,
-                      ds_name);
+            if (vl->type_instance[0] == '\0') {
+                ssnprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin,
+                        vl->type, ds_name);
+            } else {
+                ssnprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin,
+                        vl->type, vl->type_instance, ds_name);
+            }
+        } else { /* vl->plugin_instance != "" */
+            if (vl->type_instance[0] == '\0') {
+                ssnprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin,
+                        vl->plugin_instance, vl->type, ds_name);
+            } else {
+                ssnprintf(ret, ret_len, "%s%s.%s.%s.%s.%s", prefix,
+                        vl->plugin, vl->plugin_instance, vl->type,
+                        vl->type_instance, ds_name);
+            }
+        }
+    } else { /* ds_name == NULL */
+        if (vl->plugin_instance[0] == '\0') {
+            if (vl->type_instance[0] == '\0') {
+                ssnprintf(ret, ret_len, "%s%s.%s", prefix, vl->plugin,
+                        vl->type);
+            } else {
+                ssnprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin,
+                        vl->type_instance, vl->type);
+            }
+        } else { /* vl->plugin_instance != "" */
+            if (vl->type_instance[0] == '\0') {
+                ssnprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin,
+                        vl->plugin_instance, vl->type);
+            } else {
+                ssnprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin,
+                        vl->plugin_instance, vl->type, vl->type_instance);
+            }
         }
-    } else if (vl->plugin_instance[0] == '\0') {
-        if (vl->type_instance[0] == '\0')
-            ssnprintf(ret, ret_len, "%s.%s.%s",
-                      prefix, vl->plugin, vl->type);
-        else
-            ssnprintf(ret, ret_len, "%s.%s.%s",
-                      prefix, vl->plugin, vl->type_instance);
-    } else if (vl->type_instance[0] == '\0') {
-        ssnprintf(ret, ret_len, "%s.%s.%s.%s",
-                  prefix, vl->plugin, vl->plugin_instance, vl->type);
-    } else {
-        ssnprintf(ret, ret_len, "%s.%s.%s.%s",
-                  prefix, vl->plugin, vl->plugin_instance, vl->type_instance);
     }
 
     sfree(temp);
@@ -401,44 +405,48 @@ static int wt_send_message (const char* key, const char* value,
                             const char* host, meta_data_t *md)
 {
     int status;
-    int message_len;
-    char *temp, *tags;
+    size_t message_len;
+    char *temp = NULL;
+    const char *tags = "";
     char message[1024];
-    const char *message_fmt;
+    const char *host_tags = cb->host_tags ? cb->host_tags : "";
     const char *meta_tsdb = "tsdb_tags";
 
     /* skip if value is NaN */
     if (value[0] == 'n')
         return 0;
 
-    status = meta_data_get_string(md, meta_tsdb, &temp);
-    if (status == -ENOENT) {
-        tags = "";
-    } else if (status < 0) {
-        ERROR("write_tsdb plugin: tags metadata get failure");
-        sfree(temp);
-        pthread_mutex_unlock(&cb->send_lock);
-        return status;
-    } else {
-        tags = temp;
+    if (md) {
+        status = meta_data_get_string(md, meta_tsdb, &temp);
+        if (status == -ENOENT) {
+            /* defaults to empty string */
+        } else if (status < 0) {
+            ERROR("write_tsdb plugin: tags metadata get failure");
+            sfree(temp);
+            pthread_mutex_unlock(&cb->send_lock);
+            return status;
+        } else {
+            tags = temp;
+        }
     }
 
-    message_fmt = "put %s %u %s fqdn=%s %s %s\r\n";
-    message_len = ssnprintf (message, sizeof(message),
-                                      message_fmt,
-                                      key,
-                                      (unsigned int)CDTIME_T_TO_TIME_T(
-                                          time),
-                                      value,
-                                      host,
-                                      tags,
-                                      cb->host_tags);
-
-    sfree(tags);
+    status = ssnprintf (message,
+                             sizeof(message),
+                             "put %s %.0f %s fqdn=%s %s %s\r\n",
+                             key,
+                             CDTIME_T_TO_DOUBLE(time),
+                             value,
+                             host,
+                             tags,
+                             host_tags);
+    sfree(temp);
+    if (status < 0)
+        return -1;
+    message_len = (size_t) status;
 
     if (message_len >= sizeof(message)) {
         ERROR("write_tsdb plugin: message buffer too small: "
-              "Need %d bytes.", message_len + 1);
+              "Need %zu bytes.", message_len + 1);
         return -1;
     }
 
@@ -494,7 +502,7 @@ static int wt_write_messages(const data_set_t *ds, const value_list_t *vl,
     char key[10*DATA_MAX_NAME_LEN];
     char values[512];
 
-    int status, i;
+    int status;
 
     if (0 != strcmp(ds->type, vl->type))
     {
@@ -503,7 +511,7 @@ static int wt_write_messages(const data_set_t *ds, const value_list_t *vl,
         return -1;
     }
 
-    for (i = 0; i < ds->ds_num; i++)
+    for (size_t i = 0; i < ds->ds_num; i++)
     {
         const char *ds_name = NULL;
 
@@ -559,61 +567,26 @@ static int wt_write(const data_set_t *ds, const value_list_t *vl,
     return status;
 }
 
-static int config_set_char(char *dest,
-                           oconfig_item_t *ci)
-{
-    char buffer[4];
-    int status;
-
-    memset(buffer, 0, sizeof(buffer));
-
-    status = cf_util_get_string_buffer(ci, buffer, sizeof(buffer));
-    if (status != 0)
-        return (status);
-
-    if (buffer[0] == 0)
-    {
-        ERROR("write_tsdb plugin: Cannot use an empty string for the "
-              "\"EscapeCharacter\" option.");
-        return -1;
-    }
-
-    if (buffer[1] != 0)
-    {
-        WARNING("write_tsdb plugin: Only the first character of the "
-                "\"EscapeCharacter\" option ('%c') will be used.",
-                (int) buffer[0]);
-    }
-
-    *dest = buffer[0];
-
-    return 0;
-}
-
 static int wt_config_tsd(oconfig_item_t *ci)
 {
     struct wt_callback *cb;
-    user_data_t user_data;
     char callback_name[DATA_MAX_NAME_LEN];
-    int i;
 
-    cb = malloc(sizeof(*cb));
+    cb = calloc(1, sizeof(*cb));
     if (cb == NULL)
     {
-        ERROR("write_tsdb plugin: malloc failed.");
+        ERROR("write_tsdb plugin: calloc failed.");
         return -1;
     }
-    memset(cb, 0, sizeof(*cb));
     cb->sock_fd = -1;
     cb->node = NULL;
     cb->service = NULL;
     cb->host_tags = NULL;
-    cb->escape_char = WT_DEFAULT_ESCAPE;
-    cb->store_rates = 1;
+    cb->store_rates = 0;
 
     pthread_mutex_init (&cb->send_lock, NULL);
 
-    for (i = 0; i < ci->children_num; i++)
+    for (int i = 0; i < ci->children_num; i++)
     {
         oconfig_item_t *child = ci->children + i;
 
@@ -625,12 +598,8 @@ static int wt_config_tsd(oconfig_item_t *ci)
             cf_util_get_string(child, &cb->host_tags);
         else if (strcasecmp("StoreRates", child->key) == 0)
             cf_util_get_boolean(child, &cb->store_rates);
-        else if (strcasecmp("SeparateInstances", child->key) == 0)
-            cf_util_get_boolean(child, &cb->separate_instances);
         else if (strcasecmp("AlwaysAppendDS", child->key) == 0)
             cf_util_get_boolean(child, &cb->always_append_ds);
-        else if (strcasecmp("EscapeCharacter", child->key) == 0)
-            config_set_char(&cb->escape_char, child);
         else
         {
             ERROR("write_tsdb plugin: Invalid configuration "
@@ -642,9 +611,11 @@ static int wt_config_tsd(oconfig_item_t *ci)
               cb->node != NULL ? cb->node : WT_DEFAULT_NODE,
               cb->service != NULL ? cb->service : WT_DEFAULT_SERVICE);
 
-    memset(&user_data, 0, sizeof(user_data));
-    user_data.data = cb;
-    user_data.free_func = wt_callback_free;
+    user_data_t user_data = {
+        .data = cb,
+        .free_func = wt_callback_free
+    };
+
     plugin_register_write(callback_name, wt_write, &user_data);
 
     user_data.free_func = NULL;
@@ -655,9 +626,7 @@ static int wt_config_tsd(oconfig_item_t *ci)
 
 static int wt_config(oconfig_item_t *ci)
 {
-    int i;
-
-    for (i = 0; i < ci->children_num; i++)
+    for (int i = 0; i < ci->children_num; i++)
     {
         oconfig_item_t *child = ci->children + i;