summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: 4e3117f)
raw | patch | inline | side by side (parent: 4e3117f)
author | Florian Forster <octo@collectd.org> | |
Mon, 21 Jan 2013 09:27:55 +0000 (10:27 +0100) | ||
committer | Florian Forster <octo@collectd.org> | |
Mon, 21 Jan 2013 09:27:55 +0000 (10:27 +0100) |
This fixes Github issue #75.
diff --git a/src/collectd.conf.in b/src/collectd.conf.in
index dead93b6154985d67cfb7269e5fcbbc1ae5ad70a..e7428fb0d0a0cc58bafd139794e1da8f55eadc8e 100644 (file)
--- a/src/collectd.conf.in
+++ b/src/collectd.conf.in
#Timeout 2
#ReadThreads 5
+#WriteThreads 5
##############################################################################
# Logging #
diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod
index 01346e14ca29e275351659575dc7e7ff05eb352d..74a8cfc7e2629f050fbd48f2514978c6bd4843c9 100644 (file)
--- a/src/collectd.conf.pod
+++ b/src/collectd.conf.pod
Number of threads to start for reading plugins. The default value is B<5>, but
you may want to increase this if you have more than five plugins that take a
-long time to read. Mostly those are plugin that do network-IO. Setting this to
-a value higher than the number of plugins you've loaded is totally useless.
+long time to read. Mostly those are plugins that do network-IO. Setting this to
+a value higher than the number of registered read callbacks is not recommended.
+
+=item B<WriteThreads> I<Num>
+
+Number of threads to start for dispatching value lists to write plugins. The
+default value is B<5>, but you may want to increase this if you have more than
+five plugins that may take relatively long to write to.
=item B<Hostname> I<Name>
diff --git a/src/configfile.c b/src/configfile.c
index b16ae476de9e9e79600e96dacf5467e30bab1a71..ac5e8edcb94f2bfda66c5eefab67dfab3c66e6db 100644 (file)
--- a/src/configfile.c
+++ b/src/configfile.c
{"FQDNLookup", NULL, "true"},
{"Interval", NULL, NULL},
{"ReadThreads", NULL, "5"},
+ {"WriteThreads", NULL, "5"},
{"Timeout", NULL, "2"},
{"PreCacheChain", NULL, "PreCache"},
{"PostCacheChain", NULL, "PostCache"}
diff --git a/src/plugin.c b/src/plugin.c
index bbede051bbfaea4972c120b4c3e4f79f5e777fe8..0aa33b1dce924e4d88329d50c68c0cf400f1265c 100644 (file)
--- a/src/plugin.c
+++ b/src/plugin.c
/**
* collectd - src/plugin.c
- * Copyright (C) 2005-2011 Florian octo Forster
+ * Copyright (C) 2005-2013 Florian octo Forster
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
};
typedef struct read_func_s read_func_t;
+struct write_queue_s;
+typedef struct write_queue_s write_queue_t;
+struct write_queue_s
+{
+ value_list_t *vl;
+ write_queue_t *next;
+};
+
/*
* Private variables
*/
static pthread_t *read_threads = NULL;
static int read_threads_num = 0;
+static write_queue_t *write_queue_head;
+static write_queue_t *write_queue_tail;
+static _Bool write_loop = 1;
+static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER;
+static pthread_t *write_threads = NULL;
+static size_t write_threads_num = 0;
+
static pthread_key_t plugin_ctx_key;
static _Bool plugin_ctx_key_initialized = 0;
/*
* Static functions
*/
+static int plugin_dispatch_values_internal (value_list_t *vl);
+
static const char *plugin_get_dir (void)
{
if (plugindir == NULL)
read_threads_num = 0;
} /* void stop_read_threads */
+static void plugin_value_list_free (value_list_t *vl) /* {{{ */
+{
+ if (vl == NULL)
+ return;
+
+ meta_data_destroy (vl->meta);
+ sfree (vl->values);
+ sfree (vl);
+} /* }}} void plugin_value_list_free */
+
+static value_list_t *plugin_value_list_clone (value_list_t const *vl_orig) /* {{{ */
+{
+ value_list_t *vl;
+
+ if (vl_orig == NULL)
+ return (NULL);
+
+ vl = malloc (sizeof (*vl));
+ if (vl == NULL)
+ return (NULL);
+ memcpy (vl, vl_orig, sizeof (*vl));
+
+ vl->values = calloc (vl_orig->values_len, sizeof (*vl->values));
+ if (vl->values == NULL)
+ {
+ plugin_value_list_free (vl);
+ return (NULL);
+ }
+ memcpy (vl->values, vl_orig->values,
+ vl_orig->values_len * sizeof (*vl->values));
+
+ vl->meta = meta_data_clone (vl->meta);
+ if ((vl_orig->meta != NULL) && (vl->meta == NULL))
+ {
+ plugin_value_list_free (vl);
+ return (NULL);
+ }
+
+ return (vl);
+} /* }}} value_list_t *plugin_value_list_clone */
+
+static int plugin_write_enqueue (value_list_t const *vl) /* {{{ */
+{
+ write_queue_t *q;
+
+ q = malloc (sizeof (*q));
+ if (q == NULL)
+ return (ENOMEM);
+ q->next = NULL;
+
+ q->vl = plugin_value_list_clone (vl);
+ if (q->vl == NULL)
+ {
+ sfree (q);
+ return (ENOMEM);
+ }
+
+ pthread_mutex_lock (&write_lock);
+
+ if (write_queue_tail == NULL)
+ {
+ write_queue_head = q;
+ write_queue_tail = q;
+ }
+ else
+ {
+ write_queue_tail->next = q;
+ write_queue_tail = q;
+ }
+
+ pthread_cond_signal (&write_cond);
+ pthread_mutex_unlock (&write_lock);
+
+ return (0);
+} /* }}} int plugin_write_enqueue */
+
+static value_list_t *plugin_write_dequeue (void) /* {{{ */
+{
+ write_queue_t *q;
+ value_list_t *vl;
+
+ pthread_mutex_lock (&write_lock);
+
+ while (write_loop && (write_queue_head == NULL))
+ pthread_cond_wait (&write_cond, &write_lock);
+
+ if (write_queue_head == NULL)
+ {
+ pthread_mutex_unlock (&write_lock);
+ return (NULL);
+ }
+
+ q = write_queue_head;
+ write_queue_head = q->next;
+ if (write_queue_head == NULL)
+ write_queue_tail = NULL;
+
+ pthread_mutex_unlock (&write_lock);
+
+ vl = q->vl;
+ sfree (q);
+ return (vl);
+} /* }}} value_list_t *plugin_write_dequeue */
+
+static void *plugin_write_thread (void __attribute__((unused)) *args) /* {{{ */
+{
+ while (write_loop)
+ {
+ value_list_t *vl = plugin_write_dequeue ();
+ if (vl == NULL)
+ continue;
+
+ plugin_dispatch_values_internal (vl);
+
+ plugin_value_list_free (vl);
+ }
+
+ pthread_exit (NULL);
+ return ((void *) 0);
+} /* }}} void *plugin_write_thread */
+
+static void start_write_threads (size_t num) /* {{{ */
+{
+ size_t i;
+
+ if (write_threads != NULL)
+ return;
+
+ write_threads = (pthread_t *) calloc (num, sizeof (pthread_t));
+ if (write_threads == NULL)
+ {
+ ERROR ("plugin: start_write_threads: calloc failed.");
+ return;
+ }
+
+ write_threads_num = 0;
+ for (i = 0; i < num; i++)
+ {
+ int status;
+
+ status = pthread_create (write_threads + write_threads_num,
+ /* attr = */ NULL,
+ plugin_write_thread,
+ /* arg = */ NULL);
+ if (status != 0)
+ {
+ char errbuf[1024];
+ ERROR ("plugin: start_write_threads: pthread_create failed "
+ "with status %i (%s).", status,
+ sstrerror (status, errbuf, sizeof (errbuf)));
+ return;
+ }
+
+ write_threads_num++;
+ } /* for (i) */
+} /* }}} void start_write_threads */
+
+static void stop_write_threads (void) /* {{{ */
+{
+ write_queue_t *q;
+ int i;
+
+ if (write_threads == NULL)
+ return;
+
+ INFO ("collectd: Stopping %zu write threads.", write_threads_num);
+
+ pthread_mutex_lock (&write_lock);
+ write_loop = 0;
+ DEBUG ("plugin: stop_write_threads: Signalling `write_cond'");
+ pthread_cond_broadcast (&write_cond);
+ pthread_mutex_unlock (&write_lock);
+
+ for (i = 0; i < write_threads_num; i++)
+ {
+ if (pthread_join (write_threads[i], NULL) != 0)
+ {
+ ERROR ("plugin: stop_write_threads: pthread_join failed.");
+ }
+ write_threads[i] = (pthread_t) 0;
+ }
+ sfree (write_threads);
+ write_threads_num = 0;
+
+ pthread_mutex_lock (&write_lock);
+ i = 0;
+ for (q = write_queue_head; q != NULL; q = q->next)
+ {
+ plugin_value_list_free (q->vl);
+ sfree (q);
+ i++;
+ }
+ write_queue_head = NULL;
+ write_queue_tail = NULL;
+ pthread_mutex_unlock (&write_lock);
+
+ if (i > 0)
+ {
+ WARNING ("plugin: %i value list%s left after shutting down "
+ "the write threads.",
+ i, (i == 1) ? " was" : "s were");
+ }
+} /* }}} void stop_write_threads */
+
/*
* Public functions
*/
chain_name = global_option_get ("PostCacheChain");
post_cache_chain = fc_chain_get_by_name (chain_name);
+ {
+ char const *tmp = global_option_get ("WriteThreads");
+ int num = atoi (tmp);
+
+ if (num < 1)
+ num = 5;
+
+ start_write_threads ((size_t) num);
+ }
if ((list_init == NULL) && (read_heap == NULL))
return;
plugin_set_ctx (old_ctx);
}
+ stop_write_threads ();
+
/* Write plugins which use the `user_data' pointer usually need the
* same data available to the flush callback. If this is the case, set
* the free_function to NULL when registering the flush callback and to
return (0);
} /* int }}} plugin_dispatch_missing */
-int plugin_dispatch_values (value_list_t *vl)
+static int plugin_dispatch_values_internal (value_list_t *vl)
{
int status;
static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;
}
return (0);
-} /* int plugin_dispatch_values */
+} /* int plugin_dispatch_values_internal */
-int plugin_dispatch_values_secure (const value_list_t *vl)
+int plugin_dispatch_values (value_list_t const *vl)
{
- value_list_t vl_copy;
- int status;
-
- if (vl == NULL)
- return EINVAL;
-
- memcpy (&vl_copy, vl, sizeof (vl_copy));
-
- /* Write callbacks must not change the values and meta pointers, so we can
- * savely skip copying those and make this more efficient. */
- if ((pre_cache_chain == NULL) && (post_cache_chain == NULL))
- return (plugin_dispatch_values (&vl_copy));
-
- /* Set pointers to NULL, just to be on the save side. */
- vl_copy.values = NULL;
- vl_copy.meta = NULL;
-
- vl_copy.values = malloc (sizeof (*vl_copy.values) * vl->values_len);
- if (vl_copy.values == NULL)
- {
- ERROR ("plugin_dispatch_values_secure: malloc failed.");
- return (ENOMEM);
- }
- memcpy (vl_copy.values, vl->values, sizeof (*vl_copy.values) * vl->values_len);
-
- if (vl->meta != NULL)
- {
- vl_copy.meta = meta_data_clone (vl->meta);
- if (vl_copy.meta == NULL)
- {
- ERROR ("plugin_dispatch_values_secure: meta_data_clone failed.");
- free (vl_copy.values);
- return (ENOMEM);
- }
- } /* if (vl->meta) */
+ int status;
- status = plugin_dispatch_values (&vl_copy);
+ status = plugin_write_enqueue (vl);
+ if (status != 0)
+ {
+ char errbuf[1024];
+ ERROR ("plugin_dispatch_values: plugin_write_enqueue failed "
+ "with status %i (%s).", status,
+ sstrerror (status, errbuf, sizeof (errbuf)));
+ return (status);
+ }
- meta_data_destroy (vl_copy.meta);
- free (vl_copy.values);
+ return (0);
+}
- return (status);
+int plugin_dispatch_values_secure (const value_list_t *vl)
+{
+ return (plugin_dispatch_values (vl));
} /* int plugin_dispatch_values_secure */
int plugin_dispatch_notification (const notification_t *notif)
diff --git a/src/plugin.h b/src/plugin.h
index 0f35de5615b9357b616b1bbb60d9a1b7c069f894..c28709e00a0458b100c0ce1bd68c488ed41ba8c9 100644 (file)
--- a/src/plugin.h
+++ b/src/plugin.h
* `vl' Value list of the values that have been read by a `read'
* function.
*/
-int plugin_dispatch_values (value_list_t *vl);
+int plugin_dispatch_values (value_list_t const *vl);
int plugin_dispatch_values_secure (const value_list_t *vl);
int plugin_dispatch_missing (const value_list_t *vl);