Code

stats plugin: Add support for sets.
authorFlorian Forster <octo@collectd.org>
Mon, 17 Jun 2013 14:47:24 +0000 (16:47 +0200)
committerFlorian Forster <octo@collectd.org>
Mon, 17 Jun 2013 14:47:24 +0000 (16:47 +0200)
src/collectd.conf.in
src/collectd.conf.pod
src/statsd.c

index 515da22ac53667ce1a3146d540580922251cde88..98a7596706eb1d1107260d92544b225f0ef3e6ef 100644 (file)
 #  Host "::"
 #  Port "8125"
 #  DeleteCounters false
-#  DeleteTimers false
-#  DeleteGauges false
+#  DeleteTimers   false
+#  DeleteGauges   false
+#  DeleteSets     false
 #</Plugin>
 
 #<Plugin "swap">
index a16f29a1fd83b1982fbaa3791ee03d1a421dbebd..39b3792d351abf7aa7d823211384e9f0ca63a694 100644 (file)
@@ -5109,8 +5109,9 @@ The I<statsd plugin> listens to a UDP socket, reads "events" in the statsd
 protocol and dispatches rates or other aggregates of these numbers
 periodically.
 
-The plugin implements the I<Counter>, I<Timer> and I<Gauge> types which are
-dispatched as C<derive>, C<latency> and C<gauge> respectively.
+The plugin implements the I<Counter>, I<Timer>, I<Gauge> and I<Set> types which
+are dispatched as the I<collectd> types C<derive>, C<latency>, C<gauge> and
+C<objects> respectively.
 
 The following configuration options are valid:
 
@@ -5132,11 +5133,13 @@ Defaults to C<8125>.
 
 =item B<DeleteGauges> B<false>|B<true>
 
+=item B<DeleteSets> B<false>|B<true>
+
 These options control what happens if metrics are not updated in an interval.
 If set to B<False>, the default, metrics are dispatched unchanged, i.e. the
-rate of counters will be zero, timers report C<NaN> and gauges are unchanged.
-If set to B<True>, the such metrics are not dispatched and removed from the
-internal cache.
+rate of counters and size of sets will be zero, timers report C<NaN> and gauges
+are unchanged. If set to B<True>, the such metrics are not dispatched and
+removed from the internal cache.
 
 =back
 
index 5457c33f90d22eea9582947cd95b3836326bf058..9443fed40f55485f797bb54a0b538ad1c6a50f09 100644 (file)
@@ -45,7 +45,8 @@ enum metric_type_e
 {
   STATSD_COUNTER,
   STATSD_TIMER,
-  STATSD_GAUGE
+  STATSD_GAUGE,
+  STATSD_SET
 };
 typedef enum metric_type_e metric_type_t;
 
@@ -53,6 +54,7 @@ struct statsd_metric_s
 {
   metric_type_t type;
   int64_t value;
+  c_avl_tree_t *set;
   unsigned long updates_num;
 };
 typedef struct statsd_metric_s statsd_metric_t;
@@ -70,6 +72,7 @@ static char *conf_service = NULL;
 static _Bool conf_delete_counters = 0;
 static _Bool conf_delete_timers   = 0;
 static _Bool conf_delete_gauges   = 0;
+static _Bool conf_delete_sets     = 0;
 
 /* Must hold metrics_lock when calling this function. */
 static int statsd_metric_set_unsafe (char const *name, int64_t value, /* {{{ */
@@ -227,14 +230,93 @@ static int statsd_handle_timer (char const *name, /* {{{ */
   return (statsd_metric_add (key, (int64_t) value.derive, STATSD_TIMER));
 } /* }}} int statsd_handle_timer */
 
-static int statsd_handle_set (char const *name __attribute__((unused)), /* {{{ */
-    char const *value_str __attribute__((unused)))
+static int statsd_handle_set (char const *key_orig, /* {{{ */
+    char const *name_orig)
 {
-  static c_complain_t c = C_COMPLAIN_INIT_STATIC;
+  char key[DATA_MAX_NAME_LEN + 2];
+  char *name;
+  statsd_metric_t *metric = NULL;
+  int status;
+
+  ssnprintf (key, sizeof (key), "s:%s", key_orig);
+
+  pthread_mutex_lock (&metrics_lock);
+
+  status = c_avl_get (metrics_tree, key, (void *) &metric);
+  if (status != 0) /* Create a new metric */
+  {
+    char *key_copy;
+
+    DEBUG ("stats plugin: Adding new metric \"%s\".", key);
+    key_copy = strdup (key);
+    if (key_copy == NULL)
+    {
+      pthread_mutex_unlock (&metrics_lock);
+      ERROR ("statsd plugin: strdup failed.");
+      return (-1);
+    }
+
+    metric = calloc (1, sizeof (*metric));
+    if (metric == NULL)
+    {
+      pthread_mutex_unlock (&metrics_lock);
+      ERROR ("statsd plugin: calloc failed.");
+      sfree (key_copy);
+      return (-1);
+    }
+    metric->type = STATSD_SET;
+    metric->set = NULL;
+
+    status = c_avl_insert (metrics_tree, key_copy, metric);
+    if (status != 0)
+    {
+      pthread_mutex_unlock (&metrics_lock);
+      ERROR ("statsd plugin: c_avl_insert (\"%s\") failed with status %i.",
+          key_copy, status);
+      sfree (key_copy);
+      sfree (metric);
+      return (-1);
+    }
+  }
+  assert (metric != NULL);
+
+  /* Make sure metric->set exists. */
+  if (metric->set == NULL)
+    metric->set = c_avl_create ((void *) strcmp);
+
+  if (metric->set == NULL)
+  {
+    pthread_mutex_unlock (&metrics_lock);
+    ERROR ("statsd plugin: c_avl_create failed.");
+    return (-1);
+  }
+
+  name = strdup (name_orig);
+  if (name == NULL)
+  {
+    pthread_mutex_unlock (&metrics_lock);
+    ERROR ("statsd plugin: strdup failed.");
+    return (-1);
+  }
 
-  c_complain (LOG_WARNING, &c,
-      "statsd plugin: Support for sets is not yet implemented.");
+  status = c_avl_insert (metric->set, name, /* value = */ NULL);
+  if (status < 0)
+  {
+    pthread_mutex_unlock (&metrics_lock);
+    if (status < 0)
+      ERROR ("statsd plugin: c_avl_insert (\"%s\") failed with status %i.",
+          name, status);
+    sfree (name);
+    return (-1);
+  }
+  else if (status > 0) /* key already exists */
+  {
+    sfree (name);
+  }
+
+  metric->updates_num++;
 
+  pthread_mutex_unlock (&metrics_lock);
   return (0);
 } /* }}} int statsd_handle_set */
 
@@ -486,6 +568,8 @@ static int statsd_config (oconfig_item_t *ci) /* {{{ */
       cf_util_get_boolean (child, &conf_delete_timers);
     else if (strcasecmp ("DeleteGauges", child->key) == 0)
       cf_util_get_boolean (child, &conf_delete_gauges);
+    else if (strcasecmp ("DeleteSets", child->key) == 0)
+      cf_util_get_boolean (child, &conf_delete_sets);
     else
       ERROR ("statsd plugin: The \"%s\" config option is not valid.",
           child->key);
@@ -524,7 +608,29 @@ static int statsd_init (void) /* {{{ */
   return (0);
 } /* }}} int statsd_init */
 
-static int statsd_metric_submit (char const *name, /* {{{ */
+/* Must hold metrics_lock when calling this function. */
+static int statsd_metric_clear_set_unsafe (statsd_metric_t *metric) /* {{{ */
+{
+  void *key;
+  void *value;
+
+  if ((metric == NULL) || (metric->type != STATSD_SET))
+    return (EINVAL);
+
+  if (metric->set == NULL)
+    return (0);
+
+  while (c_avl_pick (metric->set, &key, &value) == 0)
+  {
+    sfree (key);
+    sfree (value);
+  }
+
+  return (0);
+} /* }}} int statsd_metric_clear_set_unsafe */
+
+/* Must hold metrics_lock when calling this function. */
+static int statsd_metric_submit_unsafe (char const *name, /* {{{ */
     statsd_metric_t const *metric)
 {
   value_t values[1];
@@ -540,6 +646,13 @@ static int statsd_metric_submit (char const *name, /* {{{ */
       values[0].gauge =
         ((gauge_t) metric->value) / ((gauge_t) metric->updates_num);
   }
+  else if (metric->type == STATSD_SET)
+  {
+    if (metric->set == NULL)
+      values[0].gauge = 0.0;
+    else
+      values[0].gauge = (gauge_t) c_avl_size (metric->set);
+  }
   else
     values[0].derive = (derive_t) metric->value;
 
@@ -552,13 +665,15 @@ static int statsd_metric_submit (char const *name, /* {{{ */
     sstrncpy (vl.type, "gauge", sizeof (vl.type));
   else if (metric->type == STATSD_TIMER)
     sstrncpy (vl.type, "latency", sizeof (vl.type));
+  else if (metric->type == STATSD_SET)
+    sstrncpy (vl.type, "objects", sizeof (vl.type));
   else /* if (metric->type == STATSD_COUNTER) */
     sstrncpy (vl.type, "derive", sizeof (vl.type));
 
   sstrncpy (vl.type_instance, name, sizeof (vl.type_instance));
 
   return (plugin_dispatch_values (&vl));
-} /* }}} int statsd_metric_submit */
+} /* }}} int statsd_metric_submit_unsafe */
 
 static int statsd_read (void) /* {{{ */
 {
@@ -584,7 +699,8 @@ static int statsd_read (void) /* {{{ */
     if ((metric->updates_num == 0)
         && ((conf_delete_counters && (metric->type == STATSD_COUNTER))
           || (conf_delete_timers && (metric->type == STATSD_TIMER))
-          || (conf_delete_gauges && (metric->type == STATSD_GAUGE))))
+          || (conf_delete_gauges && (metric->type == STATSD_GAUGE))
+          || (conf_delete_sets && (metric->type == STATSD_SET))))
     {
       DEBUG ("statsd plugin: Deleting metric \"%s\".", name);
       strarray_add (&to_be_deleted, &to_be_deleted_num, name);
@@ -593,8 +709,12 @@ static int statsd_read (void) /* {{{ */
 
     /* Names have a prefix, e.g. "c:", which determines the (statsd) type.
      * Remove this here. */
-    statsd_metric_submit (name + 2, metric);
+    statsd_metric_submit_unsafe (name + 2, metric);
+
+    /* Reset the metric. */
     metric->updates_num = 0;
+    if (metric->type == STATSD_SET)
+      statsd_metric_clear_set_unsafe (metric);
   }
   c_avl_iterator_destroy (iter);