Code

Merge branch 'ym/limit_write_queue_length'
authorFlorian Forster <octo@collectd.org>
Sat, 13 Jul 2013 09:24:39 +0000 (11:24 +0200)
committerFlorian Forster <octo@collectd.org>
Sat, 13 Jul 2013 09:24:39 +0000 (11:24 +0200)
1  2 
src/plugin.c

diff --combined src/plugin.c
index 2ad866e3bb2fbcb44012ff997b62932563bdd349,12f002cf5baba8d3a77b6285e63cca253e04d6a8..6c7aa057cf2d272b126eadf6308a76188d3f3b1a
@@@ -31,6 -31,7 +31,7 @@@
  #include "utils_llist.h"
  #include "utils_heap.h"
  #include "utils_time.h"
+ #include "utils_random.h"
  
  #if HAVE_PTHREAD_H
  # include <pthread.h>
@@@ -108,6 -109,7 +109,7 @@@ static int             read_threads_nu
  
  static write_queue_t  *write_queue_head;
  static write_queue_t  *write_queue_tail;
+ static long            write_queue_length = 0;
  static _Bool           write_loop = 1;
  static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;
  static pthread_cond_t  write_cond = PTHREAD_COND_INITIALIZER;
@@@ -117,6 -119,9 +119,9 @@@ static size_t          write_threads_nu
  static pthread_key_t   plugin_ctx_key;
  static _Bool           plugin_ctx_key_initialized = 0;
  
+ static long            write_limit_high = 0;
+ static long            write_limit_low = 0;
  /*
   * Static functions
   */
@@@ -670,11 -675,13 +675,13 @@@ static int plugin_write_enqueue (value_
        {
                write_queue_head = q;
                write_queue_tail = q;
+               write_queue_length = 1;
        }
        else
        {
                write_queue_tail->next = q;
                write_queue_tail = q;
+               write_queue_length += 1;
        }
  
        pthread_cond_signal (&write_cond);
@@@ -701,8 -708,11 +708,11 @@@ static value_list_t *plugin_write_deque
  
        q = write_queue_head;
        write_queue_head = q->next;
-       if (write_queue_head == NULL)
+       write_queue_length -= 1;
+       if (write_queue_head == NULL) {
                write_queue_tail = NULL;
+               assert(0 == write_queue_length);
+               }
  
        pthread_mutex_unlock (&write_lock);
  
@@@ -805,6 -815,7 +815,7 @@@ static void stop_write_threads (void) /
        }
        write_queue_head = NULL;
        write_queue_tail = NULL;
+       write_queue_length = 0;
        pthread_mutex_unlock (&write_lock);
  
        if (i > 0)
@@@ -1206,27 -1217,6 +1217,27 @@@ int plugin_register_shutdown (const cha
                                (void *) callback, /* user_data = */ NULL));
  } /* int plugin_register_shutdown */
  
 +static void plugin_free_data_sets (void)
 +{
 +      void *key;
 +      void *value;
 +
 +      if (data_sets == NULL)
 +              return;
 +
 +      while (c_avl_pick (data_sets, &key, &value) == 0)
 +      {
 +              data_set_t *ds = value;
 +              /* key is a pointer to ds->type */
 +
 +              sfree (ds->ds);
 +              sfree (ds);
 +      }
 +
 +      c_avl_destroy (data_sets);
 +      data_sets = NULL;
 +} /* void plugin_free_data_sets */
 +
  int plugin_register_data_set (const data_set_t *ds)
  {
        data_set_t *ds_copy;
@@@ -1444,7 -1434,8 +1455,8 @@@ int plugin_unregister_notification (con
  
  void plugin_init_all (void)
  {
-       const char *chain_name;
+       char const *chain_name;
+       long write_threads_num;
        llentry_t *le;
        int status;
  
        chain_name = global_option_get ("PostCacheChain");
        post_cache_chain = fc_chain_get_by_name (chain_name);
  
+       write_limit_high = global_option_get_long ("WriteQueueLimitHigh",
+                       /* default = */ 0);
+       if (write_limit_high < 0)
        {
-               char const *tmp = global_option_get ("WriteThreads");
-               int num = atoi (tmp);
+               ERROR ("WriteQueueLimitHigh must be positive or zero.");
+               write_limit_high = 0;
+       }
  
-               if (num < 1)
-                       num = 5;
+       write_limit_low = global_option_get_long ("WriteQueueLimitLow",
+                       /* default = */ write_limit_high / 2);
+       if (write_limit_low < 0)
+       {
+               ERROR ("WriteQueueLimitLow must be positive or zero.");
+               write_limit_low = write_limit_high / 2;
+       }
+       else if (write_limit_low > write_limit_high)
+       {
+               ERROR ("WriteQueueLimitLow must not be larger than "
+                               "WriteQueueLimitHigh.");
+               write_limit_low = write_limit_high;
+       }
  
-               start_write_threads ((size_t) num);
+       write_threads_num = global_option_get_long ("WriteThreads",
+                       /* default = */ 5);
+       if (write_threads_num < 1)
+       {
+               ERROR ("WriteThreads must be positive.");
+               write_threads_num = 5;
        }
  
+       start_write_threads ((size_t) write_threads_num);
        if ((list_init == NULL) && (read_heap == NULL))
                return;
  
@@@ -1751,7 -1764,6 +1785,7 @@@ void plugin_shutdown_all (void
        destroy_all_callbacks (&list_log);
  
        plugin_free_loaded ();
 +      plugin_free_data_sets ();
  } /* void plugin_shutdown_all */
  
  int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */
@@@ -1972,10 -1984,76 +2006,76 @@@ static int plugin_dispatch_values_inter
        return (0);
  } /* int plugin_dispatch_values_internal */
  
+ static double get_drop_probability (void) /* {{{ */
+ {
+       long pos;
+       long size;
+       long wql;
+       pthread_mutex_lock (&write_lock);
+       wql = write_queue_length;
+       pthread_mutex_unlock (&write_lock);
+       if (wql < write_limit_low)
+               return (0.0);
+       if (wql >= write_limit_high)
+               return (1.0);
+       pos = 1 + wql - write_limit_low;
+       size = 1 + write_limit_high - write_limit_low;
+       return (((double) pos) / ((double) size));
+ } /* }}} double get_drop_probability */
+ static _Bool check_drop_value (void) /* {{{ */
+ {
+       static cdtime_t last_message_time = 0;
+       static pthread_mutex_t last_message_lock = PTHREAD_MUTEX_INITIALIZER;
+       double p;
+       double q;
+       int status;
+       if (write_limit_high == 0)
+               return (0);
+       p = get_drop_probability ();
+       if (p == 0.0)
+               return (0);
+       status = pthread_mutex_trylock (&last_message_lock);
+       if (status == 0)
+       {
+               cdtime_t now;
+               now = cdtime ();
+               if ((now - last_message_time) > TIME_T_TO_CDTIME_T (1))
+               {
+                       last_message_time = now;
+                       ERROR ("plugin_dispatch_values: Low water mark "
+                                       "reached. Dropping %.0f%% of metrics.",
+                                       100.0 * p);
+               }
+               pthread_mutex_unlock (&last_message_lock);
+       }
+       if (p == 1.0)
+               return (1);
+       q = cdrand_d ();
+       if (q > p)
+               return (1);
+       else
+               return (0);
+ } /* }}} _Bool check_drop_value */
  int plugin_dispatch_values (value_list_t const *vl)
  {
        int status;
  
+       if (check_drop_value ())
+               return (0);
        status = plugin_write_enqueue (vl);
        if (status != 0)
        {