X-Git-Url: https://git.tokkee.org/?a=blobdiff_plain;f=src%2Fplugin.c;h=30a1ff1a304911f91e571fe56cd22c4d15535078;hb=480d66bbe1970d6cbb68765878f2ee6187bbd5b2;hp=4e106a5b1eb5193c3188ff87934feff0f4d4fa49;hpb=1115921a0d8f73c08fd505344af4266105e1155d;p=collectd.git diff --git a/src/plugin.c b/src/plugin.c index 4e106a5b..30a1ff1a 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -1,19 +1,24 @@ /** * collectd - src/plugin.c - * Copyright (C) 2005-2013 Florian octo Forster + * Copyright (C) 2005-2014 Florian octo Forster * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; only version 2 of the License is applicable. + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. * - * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. * * Authors: * Florian octo Forster @@ -31,6 +36,7 @@ #include "utils_llist.h" #include "utils_heap.h" #include "utils_time.h" +#include "utils_random.h" #if HAVE_PTHREAD_H # include @@ -108,7 +114,7 @@ static int read_threads_num = 0; static write_queue_t *write_queue_head; static write_queue_t *write_queue_tail; -static long write_queue_size = 0; +static long write_queue_length = 0; static _Bool write_loop = 1; static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER; @@ -118,10 +124,8 @@ static size_t write_threads_num = 0; static pthread_key_t plugin_ctx_key; static _Bool plugin_ctx_key_initialized = 0; -static long writequeuelengthlimit_high = 0; -static long writequeuelengthlimit_low = 0; - -static unsigned int random_seed; +static long write_limit_high = 0; +static long write_limit_low = 0; /* * Static functions @@ -676,13 +680,13 @@ static int plugin_write_enqueue (value_list_t const *vl) /* {{{ */ { write_queue_head = q; write_queue_tail = q; - write_queue_size = 1; + write_queue_length = 1; } else { write_queue_tail->next = q; write_queue_tail = q; - write_queue_size += 1; + write_queue_length += 1; } pthread_cond_signal (&write_cond); @@ -709,10 +713,10 @@ static value_list_t *plugin_write_dequeue (void) /* {{{ */ q = write_queue_head; write_queue_head = q->next; - write_queue_size -= 1; + write_queue_length -= 1; if (write_queue_head == NULL) { write_queue_tail = NULL; - write_queue_size = 0; /* Maybe instead of setting write_queue_size to 0, we should assert(write_queue_size == 0) ? */ + assert(0 == write_queue_length); } pthread_mutex_unlock (&write_lock); @@ -816,7 +820,7 @@ static void stop_write_threads (void) /* {{{ */ } write_queue_head = NULL; write_queue_tail = NULL; - write_queue_size = 0; + write_queue_length = 0; pthread_mutex_unlock (&write_lock); if (i > 0) @@ -1218,6 +1222,27 @@ int plugin_register_shutdown (const char *name, (void *) callback, /* user_data = */ NULL)); } /* int plugin_register_shutdown */ +static void plugin_free_data_sets (void) +{ + void *key; + void *value; + + if (data_sets == NULL) + return; + + while (c_avl_pick (data_sets, &key, &value) == 0) + { + data_set_t *ds = value; + /* key is a pointer to ds->type */ + + sfree (ds->ds); + sfree (ds); + } + + c_avl_destroy (data_sets); + data_sets = NULL; +} /* void plugin_free_data_sets */ + int plugin_register_data_set (const data_set_t *ds) { data_set_t *ds_copy; @@ -1435,7 +1460,8 @@ int plugin_unregister_notification (const char *name) void plugin_init_all (void) { - const char *chain_name; + char const *chain_name; + long write_threads_num; llentry_t *le; int status; @@ -1448,71 +1474,38 @@ void plugin_init_all (void) chain_name = global_option_get ("PostCacheChain"); post_cache_chain = fc_chain_get_by_name (chain_name); + write_limit_high = global_option_get_long ("WriteQueueLimitHigh", + /* default = */ 0); + if (write_limit_high < 0) { - const char *str_writequeuelengthlimithigh = global_option_get ("WriteQueueLengthLimitHigh"); - const char *str_writequeuelengthlimitlow = global_option_get ("WriteQueueLengthLimitLow"); - - writequeuelengthlimit_high = 0; - writequeuelengthlimit_low = 0; - - if(NULL != str_writequeuelengthlimithigh) { - errno = 0; - /* get high limit */ - writequeuelengthlimit_high = strtol(str_writequeuelengthlimithigh, NULL, 10); - if ((errno == ERANGE && (writequeuelengthlimit_high == LONG_MAX || writequeuelengthlimit_high == LONG_MIN)) - || (errno != 0 && writequeuelengthlimit_high == 0) - ) { - writequeuelengthlimit_high = 0; - ERROR("Config 'WriteQueueLengthLimitHigh' accepts one integer value only. Running with no limit !"); - } - if(writequeuelengthlimit_high < 0) { - ERROR("Config 'WriteQueueLengthLimitHigh' accepts positive values only. Running with no limit !"); - writequeuelengthlimit_high = 0; - } - } - - if((writequeuelengthlimit_high > 0) && (NULL != str_writequeuelengthlimitlow)) { - errno = 0; - /* get low limit */ - writequeuelengthlimit_low = strtol(str_writequeuelengthlimitlow, NULL, 10); - if ((errno == ERANGE && (writequeuelengthlimit_low == LONG_MAX || writequeuelengthlimit_low == LONG_MIN)) - || (errno != 0 && writequeuelengthlimit_low == 0) - ) { - writequeuelengthlimit_low = 0; - ERROR("Config 'WriteQueueLengthLimitLow' accepts one integer value only. Using default low limit instead"); - } - - if(writequeuelengthlimit_low < 0) { - ERROR("Config 'WriteQueueLengthLimitLow' accepts positive values only. Using default low limit instead"); - writequeuelengthlimit_low = 0; - } else if(writequeuelengthlimit_low > writequeuelengthlimit_high) { - ERROR("Config 'WriteQueueLengthLimitLow' (%ld) cannot be bigger than high limit (%ld). Using default low limit instead", - writequeuelengthlimit_low, writequeuelengthlimit_high); - writequeuelengthlimit_low = 0; - } - } - /* Check/compute low limit if not/badly defined */ - if(writequeuelengthlimit_high > 0) { - if(0 == writequeuelengthlimit_low) { - writequeuelengthlimit_low = .5 * writequeuelengthlimit_high; - } - INFO("Config 'WriteQueueLengthLimit*' : Running with limits high=%ld low=%ld", writequeuelengthlimit_high, writequeuelengthlimit_low); - random_seed = time(0); - } else { - writequeuelengthlimit_low = 0; /* This should be useless, but in case... */ - } + ERROR ("WriteQueueLimitHigh must be positive or zero."); + write_limit_high = 0; } + write_limit_low = global_option_get_long ("WriteQueueLimitLow", + /* default = */ write_limit_high / 2); + if (write_limit_low < 0) { - char const *tmp = global_option_get ("WriteThreads"); - int num = atoi (tmp); - - if (num < 1) - num = 5; + ERROR ("WriteQueueLimitLow must be positive or zero."); + write_limit_low = write_limit_high / 2; + } + else if (write_limit_low > write_limit_high) + { + ERROR ("WriteQueueLimitLow must not be larger than " + "WriteQueueLimitHigh."); + write_limit_low = write_limit_high; + } - start_write_threads ((size_t) num); + write_threads_num = global_option_get_long ("WriteThreads", + /* default = */ 5); + if (write_threads_num < 1) + { + ERROR ("WriteThreads must be positive."); + write_threads_num = 5; } + start_write_threads ((size_t) write_threads_num); + if ((list_init == NULL) && (read_heap == NULL)) return; @@ -1797,6 +1790,7 @@ void plugin_shutdown_all (void) destroy_all_callbacks (&list_log); plugin_free_loaded (); + plugin_free_data_sets (); } /* void plugin_shutdown_all */ int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */ @@ -2017,68 +2011,148 @@ static int plugin_dispatch_values_internal (value_list_t *vl) return (0); } /* int plugin_dispatch_values_internal */ -int plugin_dispatch_values (value_list_t const *vl) +static double get_drop_probability (void) /* {{{ */ { + long pos; + long size; + long wql; + + pthread_mutex_lock (&write_lock); + wql = write_queue_length; + pthread_mutex_unlock (&write_lock); + + if (wql < write_limit_low) + return (0.0); + if (wql >= write_limit_high) + return (1.0); + + pos = 1 + wql - write_limit_low; + size = 1 + write_limit_high - write_limit_low; + + return (((double) pos) / ((double) size)); +} /* }}} double get_drop_probability */ + +static _Bool check_drop_value (void) /* {{{ */ +{ + static cdtime_t last_message_time = 0; + static pthread_mutex_t last_message_lock = PTHREAD_MUTEX_INITIALIZER; + + double p; + double q; int status; - int wq_size = write_queue_size; - /* We store write_queue_size in a local variable because other threads may update write_queue_size. - * Having this in a local variable (like a cache) is better : we do not need a lock */ - short metric_will_be_dropped = 0; - - if((writequeuelengthlimit_high > 0) && (wq_size > writequeuelengthlimit_low)) { - if(wq_size >= writequeuelengthlimit_high) { - /* if high == low, we come here too */ - metric_will_be_dropped = 1; - } else { - /* here, high != low */ - long probability_to_drop; - long n; - - probability_to_drop = (wq_size - writequeuelengthlimit_low); - - /* We use rand_r with its bad RNG because it's enough for playing dices. - * There is no security consideration here so rand_r() should be enough here. - */ - n = rand_r(&random_seed) % (writequeuelengthlimit_high - writequeuelengthlimit_low) ; - - /* Let's have X = high - low. - * n is in range [0..X] - * probability_to_drop is in range [1..X[ - * probability_to_drop gets bigger when wq_size gets bigger. - */ - if(n <= probability_to_drop) { - metric_will_be_dropped = 1; - } - } - } - if( ! metric_will_be_dropped) { - status = plugin_write_enqueue (vl); - if (status != 0) + if (write_limit_high == 0) + return (0); + + p = get_drop_probability (); + if (p == 0.0) + return (0); + + status = pthread_mutex_trylock (&last_message_lock); + if (status == 0) + { + cdtime_t now; + + now = cdtime (); + if ((now - last_message_time) > TIME_T_TO_CDTIME_T (1)) { - char errbuf[1024]; - ERROR ("plugin_dispatch_values: plugin_write_enqueue failed " - "with status %i (%s).", status, - sstrerror (status, errbuf, sizeof (errbuf))); - return (status); + last_message_time = now; + ERROR ("plugin_dispatch_values: Low water mark " + "reached. Dropping %.0f%% of metrics.", + 100.0 * p); } + pthread_mutex_unlock (&last_message_lock); } + + if (p == 1.0) + return (1); + + q = cdrand_d (); + if (q > p) + return (1); else + return (0); +} /* }}} _Bool check_drop_value */ + +int plugin_dispatch_values (value_list_t const *vl) +{ + int status; + + if (check_drop_value ()) + return (0); + + status = plugin_write_enqueue (vl); + if (status != 0) { - /* If you want to count dropped metrics, don't forget to add a lock here */ - /* dropped_metrics++; */ - ERROR ("plugin_dispatch_values: value dropped (write queue %ld > %ld) : time = %.3f; interval = %.3f ; %s/%s%s%s/%s%s%s", - write_queue_size, writequeuelengthlimit_low, - CDTIME_T_TO_DOUBLE (vl->time), - CDTIME_T_TO_DOUBLE (vl->interval), - vl->host, - vl->plugin, vl->plugin_instance[0]?"-":"", vl->plugin_instance, - vl->type, vl->type_instance[0]?"-":"", vl->type_instance); + char errbuf[1024]; + ERROR ("plugin_dispatch_values: plugin_write_enqueue failed " + "with status %i (%s).", status, + sstrerror (status, errbuf, sizeof (errbuf))); + return (status); } return (0); } +__attribute__((sentinel)) +int plugin_dispatch_multivalue (value_list_t const *template, /* {{{ */ + _Bool store_percentage, ...) +{ + value_list_t *vl; + int failed = 0; + gauge_t sum = 0.0; + va_list ap; + + assert (template->values_len == 1); + + va_start (ap, store_percentage); + while (42) + { + char const *name; + gauge_t value; + + name = va_arg (ap, char const *); + if (name == NULL) + break; + + value = va_arg (ap, gauge_t); + if (!isnan (value)) + sum += value; + } + va_end (ap); + + vl = plugin_value_list_clone (template); + /* plugin_value_list_clone makes sure vl->time is set to non-zero. */ + if (store_percentage) + sstrncpy (vl->type, "percent", sizeof (vl->type)); + + va_start (ap, store_percentage); + while (42) + { + char const *name; + int status; + + /* Set the type instance. */ + name = va_arg (ap, char const *); + if (name == NULL) + break; + sstrncpy (vl->type_instance, name, sizeof (vl->type_instance)); + + /* Set the value. */ + vl->values[0].gauge = va_arg (ap, gauge_t); + if (store_percentage) + vl->values[0].gauge *= 100.0 / sum; + + status = plugin_write_enqueue (vl); + if (status != 0) + failed++; + } + va_end (ap); + + plugin_value_list_free (vl); + return (failed); +} /* }}} int plugin_dispatch_multivalue */ + int plugin_dispatch_notification (const notification_t *notif) { llentry_t *le;