From: Florian Forster Date: Sat, 13 Jul 2013 09:19:54 +0000 (+0200) Subject: src/plugin.c: Some fixes for write limits. X-Git-Tag: collectd-5.4.0~21^2 X-Git-Url: https://git.tokkee.org/?a=commitdiff_plain;h=d189964960c406ddaff60e47b4ae01fcc38bec45;p=collectd.git src/plugin.c: Some fixes for write limits. * Log an error once per second. * Coding style fixes. * Separate function for calculating drop probability. --- diff --git a/src/plugin.c b/src/plugin.c index 7ccfc4dd..12f002cf 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -121,7 +121,6 @@ static _Bool plugin_ctx_key_initialized = 0; static long write_limit_high = 0; static long write_limit_low = 0; -cdtime_t last_drop_time = 0; /* * Static functions @@ -1985,54 +1984,75 @@ static int plugin_dispatch_values_internal (value_list_t *vl) return (0); } /* int plugin_dispatch_values_internal */ -static _Bool drop_metric(void) { - _Bool drop = 0; - int wq_len = write_queue_length; - /* We store write_queue_length in a local variable because other threads may update write_queue_length. - * Having this in a local variable (like a cache) is better : we do not need a lock */ - - if(wq_len < write_limit_low) return(0); - - if((write_limit_high > 0) && (wq_len > write_limit_low)) { - if(wq_len >= write_limit_high) { - /* if high == low, we come here too */ - drop = 1; - } else { - /* here, high != low */ - long probability_to_drop; - long n; - - probability_to_drop = (wq_len - write_limit_low); - - n = cdrand_range(write_limit_low, write_limit_high); - - /* Let's have X = high - low. - * n is in range [0..X] - * probability_to_drop is in range [1..X[ - * probability_to_drop gets bigger when wq_len gets bigger. - */ - if(n <= probability_to_drop) { - drop = 1; - } - } - } - if(drop) { - cdtime_t now = cdtime(); - if((now - last_drop_time) > TIME_T_TO_CDTIME_T (60)) { - last_drop_time = now; - /* If you want to count dropped metrics, don't forget to add a lock here */ - /* dropped_metrics++; */ - ERROR ("plugin_dispatch_values : Low water mark reached, dropping a metric"); +static double get_drop_probability (void) /* {{{ */ +{ + long pos; + long size; + long wql; + + pthread_mutex_lock (&write_lock); + wql = write_queue_length; + pthread_mutex_unlock (&write_lock); + + if (wql < write_limit_low) + return (0.0); + if (wql >= write_limit_high) + return (1.0); + + pos = 1 + wql - write_limit_low; + size = 1 + write_limit_high - write_limit_low; + + return (((double) pos) / ((double) size)); +} /* }}} double get_drop_probability */ + +static _Bool check_drop_value (void) /* {{{ */ +{ + static cdtime_t last_message_time = 0; + static pthread_mutex_t last_message_lock = PTHREAD_MUTEX_INITIALIZER; + + double p; + double q; + int status; + + if (write_limit_high == 0) + return (0); + + p = get_drop_probability (); + if (p == 0.0) + return (0); + + status = pthread_mutex_trylock (&last_message_lock); + if (status == 0) + { + cdtime_t now; + + now = cdtime (); + if ((now - last_message_time) > TIME_T_TO_CDTIME_T (1)) + { + last_message_time = now; + ERROR ("plugin_dispatch_values: Low water mark " + "reached. Dropping %.0f%% of metrics.", + 100.0 * p); } + pthread_mutex_unlock (&last_message_lock); } - return(drop); -} + + if (p == 1.0) + return (1); + + q = cdrand_d (); + if (q > p) + return (1); + else + return (0); +} /* }}} _Bool check_drop_value */ int plugin_dispatch_values (value_list_t const *vl) { int status; - if(drop_metric ()) return(0); + if (check_drop_value ()) + return (0); status = plugin_write_enqueue (vl); if (status != 0)