Code

tail plugin: Rename DSType from Latency to Distribution.
[collectd.git] / src / statsd.c
index 72b8e2b6342b2beb92c71e1322a0e836649521ad..491fe4239d35e4748a34c00931aed3643cc74aff 100644 (file)
  */
 
 #include "collectd.h"
+
 #include "plugin.h"
 #include "common.h"
-#include "configfile.h"
 #include "utils_avltree.h"
-#include "utils_complain.h"
 #include "utils_latency.h"
 
-#include <pthread.h>
-
 #include <sys/types.h>
-#include <sys/socket.h>
 #include <netdb.h>
 #include <poll.h>
 
@@ -65,6 +61,7 @@ struct statsd_metric_s
 {
   metric_type_t type;
   double value;
+  derive_t counter;
   latency_counter_t *latency;
   c_avl_tree_t *set;
   unsigned long updates_num;
@@ -89,6 +86,7 @@ static _Bool conf_delete_sets     = 0;
 static double *conf_timer_percentile = NULL;
 static size_t  conf_timer_percentile_num = 0;
 
+static _Bool conf_counter_sum     = 0;
 static _Bool conf_timer_lower     = 0;
 static _Bool conf_timer_upper     = 0;
 static _Bool conf_timer_sum       = 0;
@@ -126,14 +124,13 @@ static statsd_metric_t *statsd_metric_lookup_unsafe (char const *name, /* {{{ */
     return (NULL);
   }
 
-  metric = malloc (sizeof (*metric));
+  metric = calloc (1, sizeof (*metric));
   if (metric == NULL)
   {
-    ERROR ("statsd plugin: malloc failed.");
+    ERROR ("statsd plugin: calloc failed.");
     sfree (key_copy);
     return (NULL);
   }
-  memset (metric, 0, sizeof (*metric));
 
   metric->type = type;
   metric->latency = NULL;
@@ -195,6 +192,35 @@ static int statsd_metric_add (char const *name, double delta, /* {{{ */
   return (0);
 } /* }}} int statsd_metric_add */
 
+static void statsd_metric_free (statsd_metric_t *metric) /* {{{ */
+{
+  if (metric == NULL)
+    return;
+
+  if (metric->latency != NULL)
+  {
+    latency_counter_destroy (metric->latency);
+    metric->latency = NULL;
+  }
+
+  if (metric->set != NULL)
+  {
+    void *key;
+    void *value;
+
+    while (c_avl_pick (metric->set, &key, &value) == 0)
+    {
+      sfree (key);
+      assert (value == NULL);
+    }
+
+    c_avl_destroy (metric->set);
+    metric->set = NULL;
+  }
+
+  sfree (metric);
+} /* }}} void statsd_metric_free */
+
 static int statsd_parse_value (char const *str, value_t *ret_value) /* {{{ */
 {
   char *endptr = NULL;
@@ -233,6 +259,8 @@ static int statsd_handle_counter (char const *name, /* {{{ */
   if (status != 0)
     return (status);
 
+  /* Changes to the counter are added to (statsd_metric_t*)->value. ->counter is
+   * only updated in statsd_metric_submit_unsafe(). */
   return (statsd_metric_add (name, (double) (value.gauge / scale.gauge),
         STATSD_COUNTER));
 } /* }}} int statsd_handle_counter */
@@ -255,19 +283,35 @@ static int statsd_handle_gauge (char const *name, /* {{{ */
 } /* }}} int statsd_handle_gauge */
 
 static int statsd_handle_timer (char const *name, /* {{{ */
-    char const *value_str)
+    char const *value_str,
+    char const *extra)
 {
   statsd_metric_t *metric;
   value_t value_ms;
+  value_t scale;
   cdtime_t value;
   int status;
 
+  if ((extra != NULL) && (extra[0] != '@'))
+    return (-1);
+
+  scale.gauge = 1.0;
+  if (extra != NULL)
+  {
+    status = statsd_parse_value (extra + 1, &scale);
+    if (status != 0)
+      return (status);
+
+    if (!isfinite (scale.gauge) || (scale.gauge <= 0.0) || (scale.gauge > 1.0))
+      return (-1);
+  }
+
   value_ms.derive = 0;
   status = statsd_parse_value (value_str, &value_ms);
   if (status != 0)
     return (status);
 
-  value = MS_TO_CDTIME_T (value_ms.gauge);
+  value = MS_TO_CDTIME_T (value_ms.gauge / scale.gauge);
 
   pthread_mutex_lock (&metrics_lock);
 
@@ -311,7 +355,7 @@ static int statsd_handle_set (char const *name, /* {{{ */
 
   /* Make sure metric->set exists. */
   if (metric->set == NULL)
-    metric->set = c_avl_create ((void *) strcmp);
+    metric->set = c_avl_create ((int (*) (const void *, const void *)) strcmp);
 
   if (metric->set == NULL)
   {
@@ -377,15 +421,15 @@ static int statsd_parse_line (char *buffer) /* {{{ */
 
   if (strcmp ("c", type) == 0)
     return (statsd_handle_counter (name, value, extra));
+  else if (strcmp ("ms", type) == 0)
+    return (statsd_handle_timer (name, value, extra));
 
-  /* extra is only valid for counters */
+  /* extra is only valid for counters and timers */
   if (extra != NULL)
     return (-1);
 
   if (strcmp ("g", type) == 0)
     return (statsd_handle_gauge (name, value));
-  else if (strcmp ("ms", type) == 0)
-    return (statsd_handle_timer (name, value));
   else if (strcmp ("s", type) == 0)
     return (statsd_handle_set (name, value));
   else
@@ -456,22 +500,18 @@ static int statsd_network_init (struct pollfd **ret_fds, /* {{{ */
   struct pollfd *fds = NULL;
   size_t fds_num = 0;
 
-  struct addrinfo ai_hints;
-  struct addrinfo *ai_list = NULL;
-  struct addrinfo *ai_ptr;
+  struct addrinfo *ai_list;
   int status;
 
   char const *node = (conf_node != NULL) ? conf_node : STATSD_DEFAULT_NODE;
   char const *service = (conf_service != NULL)
     ? conf_service : STATSD_DEFAULT_SERVICE;
 
-  memset (&ai_hints, 0, sizeof (ai_hints));
-  ai_hints.ai_flags = AI_PASSIVE;
-#ifdef AI_ADDRCONFIG
-  ai_hints.ai_flags |= AI_ADDRCONFIG;
-#endif
-  ai_hints.ai_family = AF_UNSPEC;
-  ai_hints.ai_socktype = SOCK_DGRAM;
+  struct addrinfo ai_hints = {
+    .ai_family = AF_UNSPEC,
+    .ai_flags = AI_PASSIVE | AI_ADDRCONFIG,
+    .ai_socktype = SOCK_DGRAM
+  };
 
   status = getaddrinfo (node, service, &ai_hints, &ai_list);
   if (status != 0)
@@ -481,7 +521,7 @@ static int statsd_network_init (struct pollfd **ret_fds, /* {{{ */
     return (status);
   }
 
-  for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
+  for (struct addrinfo *ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
   {
     int fd;
     struct pollfd *tmp;
@@ -517,6 +557,7 @@ static int statsd_network_init (struct pollfd **ret_fds, /* {{{ */
     if (tmp == NULL)
     {
       ERROR ("statsd plugin: realloc failed.");
+      close (fd);
       continue;
     }
     fds = tmp;
@@ -547,7 +588,6 @@ static void *statsd_network_thread (void *args) /* {{{ */
   struct pollfd *fds = NULL;
   size_t fds_num = 0;
   int status;
-  size_t i;
 
   status = statsd_network_init (&fds, &fds_num);
   if (status != 0)
@@ -571,7 +611,7 @@ static void *statsd_network_thread (void *args) /* {{{ */
       break;
     }
 
-    for (i = 0; i < fds_num; i++)
+    for (size_t i = 0; i < fds_num; i++)
     {
       if ((fds[i].revents & (POLLIN | POLLPRI)) == 0)
         continue;
@@ -582,7 +622,7 @@ static void *statsd_network_thread (void *args) /* {{{ */
   } /* while (!network_thread_shutdown) */
 
   /* Clean up */
-  for (i = 0; i < fds_num; i++)
+  for (size_t i = 0; i < fds_num; i++)
     close (fds[i].fd);
   sfree (fds);
 
@@ -622,9 +662,7 @@ static int statsd_config_timer_percentile (oconfig_item_t *ci) /* {{{ */
 
 static int statsd_config (oconfig_item_t *ci) /* {{{ */
 {
-  int i;
-
-  for (i = 0; i < ci->children_num; i++)
+  for (int i = 0; i < ci->children_num; i++)
   {
     oconfig_item_t *child = ci->children + i;
 
@@ -640,6 +678,8 @@ static int statsd_config (oconfig_item_t *ci) /* {{{ */
       cf_util_get_boolean (child, &conf_delete_gauges);
     else if (strcasecmp ("DeleteSets", child->key) == 0)
       cf_util_get_boolean (child, &conf_delete_sets);
+    else if (strcasecmp ("CounterSum", child->key) == 0)
+      cf_util_get_boolean (child, &conf_counter_sum);
     else if (strcasecmp ("TimerLower", child->key) == 0)
       cf_util_get_boolean (child, &conf_timer_lower);
     else if (strcasecmp ("TimerUpper", child->key) == 0)
@@ -662,7 +702,7 @@ static int statsd_init (void) /* {{{ */
 {
   pthread_mutex_lock (&metrics_lock);
   if (metrics_tree == NULL)
-    metrics_tree = c_avl_create ((void *) strcmp);
+    metrics_tree = c_avl_create ((int (*) (const void *, const void *)) strcmp);
 
   if (!network_thread_running)
   {
@@ -710,15 +750,12 @@ static int statsd_metric_clear_set_unsafe (statsd_metric_t *metric) /* {{{ */
 } /* }}} int statsd_metric_clear_set_unsafe */
 
 /* Must hold metrics_lock when calling this function. */
-static int statsd_metric_submit_unsafe (char const *name, /* {{{ */
-    statsd_metric_t const *metric)
+static int statsd_metric_submit_unsafe (char const *name, statsd_metric_t *metric) /* {{{ */
 {
-  value_t values[1];
   value_list_t vl = VALUE_LIST_INIT;
 
-  vl.values = values;
+  vl.values = &(value_t) { .gauge = NAN };
   vl.values_len = 1;
-  sstrncpy (vl.host, hostname_g, sizeof (vl.host));
   sstrncpy (vl.plugin, "statsd", sizeof (vl.plugin));
 
   if (metric->type == STATSD_GAUGE)
@@ -733,53 +770,55 @@ static int statsd_metric_submit_unsafe (char const *name, /* {{{ */
   sstrncpy (vl.type_instance, name, sizeof (vl.type_instance));
 
   if (metric->type == STATSD_GAUGE)
-    values[0].gauge = (gauge_t) metric->value;
+    vl.values[0].gauge = (gauge_t) metric->value;
   else if (metric->type == STATSD_TIMER)
   {
-    size_t i;
-
-    if (metric->updates_num == 0)
-      return (0);
+    _Bool have_events = (metric->updates_num > 0);
 
+    /* Make sure all timer metrics share the *same* timestamp. */
     vl.time = cdtime ();
 
     ssnprintf (vl.type_instance, sizeof (vl.type_instance),
         "%s-average", name);
-    values[0].gauge = CDTIME_T_TO_DOUBLE (
-        latency_counter_get_average (metric->latency));
+    vl.values[0].gauge = have_events
+      ? CDTIME_T_TO_DOUBLE (latency_counter_get_average (metric->latency))
+      : NAN;
     plugin_dispatch_values (&vl);
 
     if (conf_timer_lower) {
       ssnprintf (vl.type_instance, sizeof (vl.type_instance),
           "%s-lower", name);
-      values[0].gauge = CDTIME_T_TO_DOUBLE (
-          latency_counter_get_min (metric->latency));
+      vl.values[0].gauge = have_events
+        ? CDTIME_T_TO_DOUBLE (latency_counter_get_min (metric->latency))
+        : NAN;
       plugin_dispatch_values (&vl);
     }
 
     if (conf_timer_upper) {
       ssnprintf (vl.type_instance, sizeof (vl.type_instance),
           "%s-upper", name);
-      values[0].gauge = CDTIME_T_TO_DOUBLE (
-          latency_counter_get_max (metric->latency));
+      vl.values[0].gauge = have_events
+        ? CDTIME_T_TO_DOUBLE (latency_counter_get_max (metric->latency))
+        : NAN;
       plugin_dispatch_values (&vl);
     }
 
     if (conf_timer_sum) {
       ssnprintf (vl.type_instance, sizeof (vl.type_instance),
           "%s-sum", name);
-      values[0].gauge = CDTIME_T_TO_DOUBLE (
-          latency_counter_get_sum (metric->latency));
+      vl.values[0].gauge = have_events
+        ? CDTIME_T_TO_DOUBLE (latency_counter_get_sum (metric->latency))
+        : NAN;
       plugin_dispatch_values (&vl);
     }
 
-    for (i = 0; i < conf_timer_percentile_num; i++)
+    for (size_t i = 0; i < conf_timer_percentile_num; i++)
     {
       ssnprintf (vl.type_instance, sizeof (vl.type_instance),
           "%s-percentile-%.0f", name, conf_timer_percentile[i]);
-      values[0].gauge = CDTIME_T_TO_DOUBLE (
-          latency_counter_get_percentile (
-            metric->latency, conf_timer_percentile[i]));
+      vl.values[0].gauge = have_events
+        ? CDTIME_T_TO_DOUBLE (latency_counter_get_percentile (metric->latency, conf_timer_percentile[i]))
+        : NAN;
       plugin_dispatch_values (&vl);
     }
 
@@ -789,7 +828,7 @@ static int statsd_metric_submit_unsafe (char const *name, /* {{{ */
       sstrncpy (vl.type, "gauge", sizeof (vl.type));
       ssnprintf (vl.type_instance, sizeof (vl.type_instance),
           "%s-count", name);
-      values[0].gauge = latency_counter_get_num (metric->latency);
+      vl.values[0].gauge = latency_counter_get_num (metric->latency);
       plugin_dispatch_values (&vl);
     }
 
@@ -799,12 +838,34 @@ static int statsd_metric_submit_unsafe (char const *name, /* {{{ */
   else if (metric->type == STATSD_SET)
   {
     if (metric->set == NULL)
-      values[0].gauge = 0.0;
+      vl.values[0].gauge = 0.0;
     else
-      values[0].gauge = (gauge_t) c_avl_size (metric->set);
+      vl.values[0].gauge = (gauge_t) c_avl_size (metric->set);
+  }
+  else { /* STATSD_COUNTER */
+    gauge_t delta = nearbyint (metric->value);
+
+    /* Etsy's statsd writes counters as two metrics: a rate and the change since
+     * the last write. Since collectd does not reset its DERIVE metrics to zero,
+     * this makes little sense, but we're dispatching a "count" metric here
+     * anyway - if requested by the user - for compatibility reasons. */
+    if (conf_counter_sum)
+    {
+      sstrncpy (vl.type, "count", sizeof (vl.type));
+      vl.values[0].gauge = delta;
+      plugin_dispatch_values (&vl);
+
+      /* restore vl.type */
+      sstrncpy (vl.type, "derive", sizeof (vl.type));
+    }
+
+    /* Rather than resetting value to zero, subtract delta so we correctly keep
+     * track of residuals. */
+    metric->value   -= delta;
+    metric->counter += (derive_t) delta;
+
+    vl.values[0].derive = metric->counter;
   }
-  else
-    values[0].derive = (derive_t) metric->value;
 
   return (plugin_dispatch_values (&vl));
 } /* }}} int statsd_metric_submit_unsafe */
@@ -817,7 +878,6 @@ static int statsd_read (void) /* {{{ */
 
   char **to_be_deleted = NULL;
   size_t to_be_deleted_num = 0;
-  size_t i;
 
   pthread_mutex_lock (&metrics_lock);
 
@@ -852,7 +912,7 @@ static int statsd_read (void) /* {{{ */
   }
   c_avl_iterator_destroy (iter);
 
-  for (i = 0; i < to_be_deleted_num; i++)
+  for (size_t i = 0; i < to_be_deleted_num; i++)
   {
     int status;
 
@@ -866,7 +926,7 @@ static int statsd_read (void) /* {{{ */
     }
 
     sfree (name);
-    sfree (metric);
+    statsd_metric_free (metric);
   }
 
   pthread_mutex_unlock (&metrics_lock);
@@ -881,8 +941,6 @@ static int statsd_shutdown (void) /* {{{ */
   void *key;
   void *value;
 
-  pthread_mutex_lock (&metrics_lock);
-
   if (network_thread_running)
   {
     network_thread_shutdown = 1;
@@ -891,10 +949,12 @@ static int statsd_shutdown (void) /* {{{ */
   }
   network_thread_running = 0;
 
+  pthread_mutex_lock (&metrics_lock);
+
   while (c_avl_pick (metrics_tree, &key, &value) == 0)
   {
     sfree (key);
-    sfree (value);
+    statsd_metric_free (value);
   }
   c_avl_destroy (metrics_tree);
   metrics_tree = NULL;