author | Florian Forster <octo@collectd.org> | |
Sat, 13 Jul 2013 09:24:39 +0000 (11:24 +0200) | ||
committer | Florian Forster <octo@collectd.org> | |
Sat, 13 Jul 2013 09:24:39 +0000 (11:24 +0200) |
diff --git a/src/collectd.conf.in b/src/collectd.conf.in
index f4ac81a0f4d77577f4adc7a1ff12dfc4fa598ffd..80aba6a46fbc2e84cc341d3ef33234015de6fa40 100644 (file)
--- a/src/collectd.conf.in
+++ b/src/collectd.conf.in
#ReadThreads 5
#WriteThreads 5
+# Limit the size of the write queue. Default is no limit. Setting up a limit is
+# recommended for servers handling a high volume of traffic.
+#WriteQueueLimitHigh 1000000
+#WriteQueueLimitLow 800000
+
##############################################################################
# Logging #
#----------------------------------------------------------------------------#
diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod
index ddf3ac60e12da8699fdcc238674cee63aff75621..11db1ccdc3d1e07c49f7c6efc93761ba43c0e21b 100644 (file)
--- a/src/collectd.conf.pod
+++ b/src/collectd.conf.pod
default value is B<5>, but you may want to increase this if you have more than
five plugins that may take relatively long to write to.
+=item B<WriteQueueLimitHigh> I<HighNum>
+
+=item B<WriteQueueLimitLow> I<LowNum>
+
+Metrics are read by the I<read threads> and then put into a queue to be handled
+by the I<write threads>. If one of the I<write plugins> is slow (e.g. network
+timeouts, I/O saturation of the disk) this queue will grow. In order to avoid
+running into memory issues in such a case, you can limit the size of this
+queue.
+
+By default, there is no limit and memory may grow indefinitely. This is most
+likely not an issue for clients, i.e. instances that only handle the local
+metrics. For servers it is recommended to set this to a non-zero value, though.
+
+You can set the limits using B<WriteQueueLimitHigh> and B<WriteQueueLimitLow>.
+Each of them takes a numerical argument which is the number of metrics in the
+queue. If there are I<HighNum> metrics in the queue, any new metrics I<will> be
+dropped. If there are fewer than I<LowNum> metrics in the queue, all new metrics
+I<will> be enqueued. If the number of metrics currently in the queue is between
+I<LowNum> and I<HighNum>, the metric is dropped with a probability that is
+proportional to the number of metrics in the queue (i.e. it increases linearly
+until it reaches 100%.)
+
+If B<WriteQueueLimitHigh> is set to non-zero and B<WriteQueueLimitLow> is
+unset, the latter will default to half of B<WriteQueueLimitHigh>.
+
+If you do not want to randomly drop values when the queue size is between
+I<LowNum> and I<HighNum>, set B<WriteQueueLimitHigh> and
+B<WriteQueueLimitLow> to the same value.
+
=item B<Hostname> I<Name>
Sets the hostname that identifies a host. If you omit this setting, the
diff --git a/src/configfile.c b/src/configfile.c
index d6c224fd74966e23f6758ee79be11b75a67e5960..0e54f267f9fdd08c15d4d4d11a427f409d708723 100644 (file)
--- a/src/configfile.c
+++ b/src/configfile.c
{"Interval", NULL, NULL},
{"ReadThreads", NULL, "5"},
{"WriteThreads", NULL, "5"},
+ {"WriteQueueLimitHigh", NULL, NULL},
+ {"WriteQueueLimitLow", NULL, NULL},
{"Timeout", NULL, "2"},
{"AutoLoadPlugin", NULL, "false"},
{"PreCacheChain", NULL, "PreCache"},
: cf_global_options[i].def);
} /* char *global_option_get */
+/*
+ * Looks up the global option `option' and parses its value as a long integer.
+ *
+ * The number is parsed by strtol() with base 0, i.e. decimal, octal ("0"
+ * prefix) and hexadecimal ("0x" prefix) notations are accepted.
+ *
+ * Returns `default_value' if the option has no value, if the value is out of
+ * range for a long, or if the value is not a complete number (empty string,
+ * no digits at all, or trailing characters after the number). Otherwise the
+ * parsed number is returned.
+ */
+long global_option_get_long (const char *option, long default_value)
+{
+  const char *str;
+  char *endptr = NULL;
+  long value;
+
+  str = global_option_get (option);
+  if (NULL == str)
+    return (default_value);
+
+  errno = 0;
+  value = strtol (str, &endptr, /* base = */ 0);
+  if (errno != 0)
+    return (default_value);
+
+  /* strtol() is not required to set errno when no conversion could be
+   * performed; it then returns zero and leaves endptr pointing at the start
+   * of the string. Without this check a typo such as "abc" would silently
+   * be interpreted as 0. Trailing garbage ("1000k") is rejected, too. */
+  if ((endptr == str) || (*endptr != 0))
+    return (default_value);
+
+  return (value);
+} /* long global_option_get_long */
+
cdtime_t cf_get_default_interval (void)
{
char const *str = global_option_get ("Interval");
diff --git a/src/configfile.h b/src/configfile.h
index 5a719a421a66a25df0073d6bdc4f24e8cbe9ebac..c91fcd5f4abb0bf87bf2339ae4524ac8e3b1de99 100644 (file)
--- a/src/configfile.h
+++ b/src/configfile.h
int global_option_set (const char *option, const char *value);
const char *global_option_get (const char *option);
+long global_option_get_long (const char *option, long default_value);
+long global_option_get_long_in_range (const char *option, long default_value, long min, long max);
cdtime_t cf_get_default_interval (void);
diff --git a/src/plugin.c b/src/plugin.c
index 2ad866e3bb2fbcb44012ff997b62932563bdd349..6c7aa057cf2d272b126eadf6308a76188d3f3b1a 100644 (file)
--- a/src/plugin.c
+++ b/src/plugin.c
#include "utils_llist.h"
#include "utils_heap.h"
#include "utils_time.h"
+#include "utils_random.h"
#if HAVE_PTHREAD_H
# include <pthread.h>
static write_queue_t *write_queue_head;
static write_queue_t *write_queue_tail;
+static long write_queue_length = 0;
static _Bool write_loop = 1;
static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER;
static pthread_key_t plugin_ctx_key;
static _Bool plugin_ctx_key_initialized = 0;
+static long write_limit_high = 0;
+static long write_limit_low = 0;
+
/*
* Static functions
*/
{
write_queue_head = q;
write_queue_tail = q;
+ write_queue_length = 1;
}
else
{
write_queue_tail->next = q;
write_queue_tail = q;
+ write_queue_length += 1;
}
pthread_cond_signal (&write_cond);
q = write_queue_head;
write_queue_head = q->next;
- if (write_queue_head == NULL)
+ write_queue_length -= 1;
+ if (write_queue_head == NULL) {
write_queue_tail = NULL;
+ assert(0 == write_queue_length);
+ }
pthread_mutex_unlock (&write_lock);
}
write_queue_head = NULL;
write_queue_tail = NULL;
+ write_queue_length = 0;
pthread_mutex_unlock (&write_lock);
if (i > 0)
void plugin_init_all (void)
{
- const char *chain_name;
+ char const *chain_name;
+ long write_threads_num;
llentry_t *le;
int status;
chain_name = global_option_get ("PostCacheChain");
post_cache_chain = fc_chain_get_by_name (chain_name);
+ write_limit_high = global_option_get_long ("WriteQueueLimitHigh",
+ /* default = */ 0);
+ if (write_limit_high < 0)
{
- char const *tmp = global_option_get ("WriteThreads");
- int num = atoi (tmp);
+ ERROR ("WriteQueueLimitHigh must be positive or zero.");
+ write_limit_high = 0;
+ }
- if (num < 1)
- num = 5;
+ write_limit_low = global_option_get_long ("WriteQueueLimitLow",
+ /* default = */ write_limit_high / 2);
+ if (write_limit_low < 0)
+ {
+ ERROR ("WriteQueueLimitLow must be positive or zero.");
+ write_limit_low = write_limit_high / 2;
+ }
+ else if (write_limit_low > write_limit_high)
+ {
+ ERROR ("WriteQueueLimitLow must not be larger than "
+ "WriteQueueLimitHigh.");
+ write_limit_low = write_limit_high;
+ }
- start_write_threads ((size_t) num);
+ write_threads_num = global_option_get_long ("WriteThreads",
+ /* default = */ 5);
+ if (write_threads_num < 1)
+ {
+ ERROR ("WriteThreads must be positive.");
+ write_threads_num = 5;
}
+ start_write_threads ((size_t) write_threads_num);
+
if ((list_init == NULL) && (read_heap == NULL))
return;
return (0);
} /* int plugin_dispatch_values_internal */
+/*
+ * Computes the probability with which a newly dispatched metric should be
+ * dropped, based on the current length of the write queue.
+ *
+ * Returns 0.0 while the queue holds fewer than write_limit_low entries, 1.0
+ * once it holds write_limit_high entries or more, and a value that grows
+ * linearly with the queue length in between.
+ */
+static double get_drop_probability (void) /* {{{ */
+{
+  long queued;
+  double above_low;
+  double window;
+
+  /* Take a snapshot of the queue length; the counter is modified by other
+   * threads while they hold write_lock. */
+  pthread_mutex_lock (&write_lock);
+  queued = write_queue_length;
+  pthread_mutex_unlock (&write_lock);
+
+  if (queued < write_limit_low)
+    return (0.0);
+  else if (queued >= write_limit_high)
+    return (1.0);
+
+  /* The "1 +" on both sides makes the result strictly positive at the low
+   * mark and strictly less than one just below the high mark. */
+  above_low = (double) (1 + queued - write_limit_low);
+  window = (double) (1 + write_limit_high - write_limit_low);
+
+  return (above_low / window);
+} /* }}} double get_drop_probability */
+
+/*
+ * Decides whether the metric that is about to be enqueued should be dropped
+ * in order to keep the write queue from growing without bounds.
+ *
+ * Returns zero if the metric should be enqueued and non-zero if it should be
+ * dropped. Nothing is ever dropped if write_limit_high is zero (no limit).
+ * Otherwise the metric is dropped with the probability reported by
+ * get_drop_probability().
+ *
+ * While metrics are being dropped, an error message is logged at most about
+ * once per second.
+ */
+static _Bool check_drop_value (void) /* {{{ */
+{
+  static cdtime_t last_message_time = 0;
+  static pthread_mutex_t last_message_lock = PTHREAD_MUTEX_INITIALIZER;
+
+  double p;
+  double q;
+  int status;
+
+  if (write_limit_high == 0)
+    return (0);
+
+  p = get_drop_probability ();
+  if (p == 0.0)
+    return (0);
+
+  /* Rate-limit the log message. trylock is used so that dispatching threads
+   * never block on each other just to write a log line; a thread that loses
+   * the race simply skips logging. */
+  status = pthread_mutex_trylock (&last_message_lock);
+  if (status == 0)
+  {
+    cdtime_t now;
+
+    now = cdtime ();
+    if ((now - last_message_time) > TIME_T_TO_CDTIME_T (1))
+    {
+      last_message_time = now;
+      ERROR ("plugin_dispatch_values: Low water mark "
+          "reached. Dropping %.0f%% of metrics.",
+          100.0 * p);
+    }
+    pthread_mutex_unlock (&last_message_lock);
+  }
+
+  if (p == 1.0)
+    return (1);
+
+  /* Drop with probability p: the random number must fall *below* p.
+   * (Comparing with ">" would drop with probability 1 - p, i.e. drop nearly
+   * everything right above the low mark and nearly nothing right below the
+   * high mark.)
+   * NOTE(review): assumes cdrand_d() returns a uniformly distributed value
+   * in the range [0, 1) -- confirm against utils_random. */
+  q = cdrand_d ();
+  if (q < p)
+    return (1);
+  else
+    return (0);
+} /* }}} _Bool check_drop_value */
+
int plugin_dispatch_values (value_list_t const *vl)
{
int status;
+ if (check_drop_value ())
+ return (0);
+
status = plugin_write_enqueue (vl);
if (status != 0)
{