X-Git-Url: https://git.tokkee.org/?a=blobdiff_plain;f=src%2Fplugin.c;h=4e106a5b1eb5193c3188ff87934feff0f4d4fa49;hb=1115921a0d8f73c08fd505344af4266105e1155d;hp=942f8bfe50438066adc80b46ac4175c990f9a642;hpb=c9ca810479718e02eeecfda9155f06585a0362fc;p=collectd.git diff --git a/src/plugin.c b/src/plugin.c index 942f8bfe..4e106a5b 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -21,22 +21,22 @@ **/ #include "collectd.h" -#include "utils_complain.h" - -#include - -#if HAVE_PTHREAD_H -# include -#endif - #include "common.h" #include "plugin.h" #include "configfile.h" +#include "filter_chain.h" #include "utils_avltree.h" +#include "utils_cache.h" +#include "utils_complain.h" #include "utils_llist.h" #include "utils_heap.h" -#include "utils_cache.h" -#include "filter_chain.h" +#include "utils_time.h" + +#if HAVE_PTHREAD_H +# include +#endif + +#include /* * Private structures @@ -61,11 +61,11 @@ struct read_func_s #define rf_ctx rf_super.cf_ctx callback_func_t rf_super; char rf_group[DATA_MAX_NAME_LEN]; - char rf_name[DATA_MAX_NAME_LEN]; + char *rf_name; int rf_type; - struct timespec rf_interval; - struct timespec rf_effective_interval; - struct timespec rf_next_read; + cdtime_t rf_interval; + cdtime_t rf_effective_interval; + cdtime_t rf_next_read; }; typedef struct read_func_s read_func_t; @@ -81,6 +81,8 @@ struct write_queue_s /* * Private variables */ +static c_avl_tree_t *plugins_loaded = NULL; + static llist_t *list_init; static llist_t *list_write; static llist_t *list_flush; @@ -106,6 +108,7 @@ static int read_threads_num = 0; static write_queue_t *write_queue_head; static write_queue_t *write_queue_tail; +static long write_queue_size = 0; static _Bool write_loop = 1; static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER; @@ -115,6 +118,11 @@ static size_t write_threads_num = 0; static pthread_key_t plugin_ctx_key; static _Bool plugin_ctx_key_initialized = 0; +static long writequeuelengthlimit_high = 0; +static long writequeuelengthlimit_low = 0; + +static unsigned int random_seed; + /* * Static functions */ @@ -360,13 +368,6 @@ static int plugin_load_file (char *file, uint32_t flags) return (0); } -static _Bool timeout_reached(struct timespec timeout) -{ - struct timeval now; - gettimeofday(&now, NULL); - return (now.tv_sec >= timeout.tv_sec && now.tv_usec >= (timeout.tv_nsec / 1000)); -} - static void *plugin_read_thread (void __attribute__((unused)) *args) { while (read_loop != 0) @@ -392,18 +393,15 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) } pthread_mutex_unlock (&read_lock); - if ((rf->rf_interval.tv_sec == 0) && (rf->rf_interval.tv_nsec == 0)) + if (rf->rf_interval == 0) { /* this should not happen, because the interval is set * for each plugin when loading it * XXX: issue a warning? */ - now = cdtime (); - - CDTIME_T_TO_TIMESPEC (plugin_get_interval (), &rf->rf_interval); - + rf->rf_interval = plugin_get_interval (); rf->rf_effective_interval = rf->rf_interval; - CDTIME_T_TO_TIMESPEC (now, &rf->rf_next_read); + rf->rf_next_read = cdtime (); } /* sleep until this entry is due, @@ -415,11 +413,15 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) * pthread_cond_timedwait returns. */ rc = 0; while ((read_loop != 0) - && !timeout_reached(rf->rf_next_read) + && (cdtime () < rf->rf_next_read) && rc == 0) { + struct timespec ts = { 0 }; + + CDTIME_T_TO_TIMESPEC (rf->rf_next_read, &ts); + rc = pthread_cond_timedwait (&read_cond, &read_lock, - &rf->rf_next_read); + &ts); } /* Must hold `read_lock' when accessing `rf->rf_type'. */ @@ -443,6 +445,7 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) { DEBUG ("plugin_read_thread: Destroying the `%s' " "callback.", rf->rf_name); + sfree (rf->rf_name); destroy_callback ((callback_func_t *) rf); rf = NULL; continue; @@ -475,20 +478,14 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) * intervals in which it will be called. */ if (status != 0) { - rf->rf_effective_interval.tv_sec *= 2; - rf->rf_effective_interval.tv_nsec *= 2; - NORMALIZE_TIMESPEC (rf->rf_effective_interval); - - if (rf->rf_effective_interval.tv_sec >= 86400) - { - rf->rf_effective_interval.tv_sec = 86400; - rf->rf_effective_interval.tv_nsec = 0; - } + rf->rf_effective_interval *= 2; + if (rf->rf_effective_interval > TIME_T_TO_CDTIME_T (86400)) + rf->rf_effective_interval = TIME_T_TO_CDTIME_T (86400); NOTICE ("read-function of plugin `%s' failed. " - "Will suspend it for %i seconds.", + "Will suspend it for %.3f seconds.", rf->rf_name, - (int) rf->rf_effective_interval.tv_sec); + CDTIME_T_TO_DOUBLE (rf->rf_effective_interval)); } else { @@ -500,32 +497,26 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) now = cdtime (); DEBUG ("plugin_read_thread: Effective interval of the " - "%s plugin is %i.%09i.", + "%s plugin is %.3f seconds.", rf->rf_name, - (int) rf->rf_effective_interval.tv_sec, - (int) rf->rf_effective_interval.tv_nsec); + CDTIME_T_TO_DOUBLE (rf->rf_effective_interval)); /* Calculate the next (absolute) time at which this function * should be called. */ - rf->rf_next_read.tv_sec = rf->rf_next_read.tv_sec - + rf->rf_effective_interval.tv_sec; - rf->rf_next_read.tv_nsec = rf->rf_next_read.tv_nsec - + rf->rf_effective_interval.tv_nsec; - NORMALIZE_TIMESPEC (rf->rf_next_read); + rf->rf_next_read += rf->rf_effective_interval; /* Check, if `rf_next_read' is in the past. */ - if (TIMESPEC_TO_CDTIME_T (&rf->rf_next_read) < now) + if (rf->rf_next_read < now) { /* `rf_next_read' is in the past. Insert `now' * so this value doesn't trail off into the * past too much. */ - CDTIME_T_TO_TIMESPEC (now, &rf->rf_next_read); + rf->rf_next_read = now; } - DEBUG ("plugin_read_thread: Next read of the %s plugin at %i.%09i.", + DEBUG ("plugin_read_thread: Next read of the %s plugin at %.3f.", rf->rf_name, - (int) rf->rf_next_read.tv_sec, - (int) rf->rf_next_read.tv_nsec); + CDTIME_T_TO_DOUBLE (rf->rf_next_read)); /* Re-insert this read function into the heap again. */ c_heap_insert (read_heap, rf); @@ -685,11 +676,13 @@ static int plugin_write_enqueue (value_list_t const *vl) /* {{{ */ { write_queue_head = q; write_queue_tail = q; + write_queue_size = 1; } else { write_queue_tail->next = q; write_queue_tail = q; + write_queue_size += 1; } pthread_cond_signal (&write_cond); @@ -716,8 +709,11 @@ static value_list_t *plugin_write_dequeue (void) /* {{{ */ q = write_queue_head; write_queue_head = q->next; - if (write_queue_head == NULL) + write_queue_size -= 1; + if (write_queue_head == NULL) { write_queue_tail = NULL; + write_queue_size = 0; /* Maybe instead of setting write_queue_size to 0, we should assert(write_queue_size == 0) ? */ + } pthread_mutex_unlock (&write_lock); @@ -820,6 +816,7 @@ static void stop_write_threads (void) /* {{{ */ } write_queue_head = NULL; write_queue_tail = NULL; + write_queue_size = 0; pthread_mutex_unlock (&write_lock); if (i > 0) @@ -848,8 +845,52 @@ void plugin_set_dir (const char *dir) } } +static _Bool plugin_is_loaded (char const *name) +{ + int status; + + if (plugins_loaded == NULL) + plugins_loaded = c_avl_create ((void *) strcasecmp); + assert (plugins_loaded != NULL); + + status = c_avl_get (plugins_loaded, name, /* ret_value = */ NULL); + return (status == 0); +} + +static int plugin_mark_loaded (char const *name) +{ + char *name_copy; + int status; + + name_copy = strdup (name); + if (name_copy == NULL) + return (ENOMEM); + + status = c_avl_insert (plugins_loaded, + /* key = */ name_copy, /* value = */ NULL); + return (status); +} + +static void plugin_free_loaded () +{ + void *key; + void *value; + + if (plugins_loaded == NULL) + return; + + while (c_avl_pick (plugins_loaded, &key, &value) == 0) + { + sfree (key); + assert (value == NULL); + } + + c_avl_destroy (plugins_loaded); + plugins_loaded = NULL; +} + #define BUFSIZE 512 -int plugin_load (const char *type, uint32_t flags) +int plugin_load (char const *plugin_name, uint32_t flags) { DIR *dh; const char *dir; @@ -861,15 +902,38 @@ int plugin_load (const char *type, uint32_t flags) struct dirent *de; int status; + if (plugin_name == NULL) + return (EINVAL); + + /* Check if plugin is already loaded and don't do anything in this + * case. */ + if (plugin_is_loaded (plugin_name)) + return (0); + dir = plugin_get_dir (); ret = 1; + /* + * XXX: Magic at work: + * + * Some of the language bindings, for example the Python and Perl + * plugins, need to be able to export symbols to the scripts they run. + * For this to happen, the "Globals" flag needs to be set. + * Unfortunately, this technical detail is hard to explain to the + * average user and she shouldn't have to worry about this, ideally. + * So in order to save everyone's sanity use a different default for a + * handful of special plugins. --octo + */ + if ((strcasecmp ("perl", plugin_name) == 0) + || (strcasecmp ("python", plugin_name) == 0)) + flags |= PLUGIN_FLAGS_GLOBAL; + /* `cpu' should not match `cpufreq'. To solve this we add `.so' to the * type when matching the filename */ - status = ssnprintf (typename, sizeof (typename), "%s.so", type); + status = ssnprintf (typename, sizeof (typename), "%s.so", plugin_name); if ((status < 0) || ((size_t) status >= sizeof (typename))) { - WARNING ("plugin_load: Filename too long: \"%s.so\"", type); + WARNING ("plugin_load: Filename too long: \"%s.so\"", plugin_name); return (-1); } typename_len = strlen (typename); @@ -916,13 +980,14 @@ int plugin_load (const char *type, uint32_t flags) if (status == 0) { /* success */ + plugin_mark_loaded (plugin_name); ret = 0; break; } else { ERROR ("plugin_load: Load plugin \"%s\" failed with " - "status %i.", type, status); + "status %i.", plugin_name, status); } } @@ -930,7 +995,7 @@ int plugin_load (const char *type, uint32_t flags) if (filename[0] == 0) ERROR ("plugin_load: Could not find plugin \"%s\" in %s", - type, dir); + plugin_name, dir); return (ret); } @@ -967,13 +1032,9 @@ static int plugin_compare_read_func (const void *arg0, const void *arg1) rf0 = arg0; rf1 = arg1; - if (rf0->rf_next_read.tv_sec < rf1->rf_next_read.tv_sec) - return (-1); - else if (rf0->rf_next_read.tv_sec > rf1->rf_next_read.tv_sec) - return (1); - else if (rf0->rf_next_read.tv_nsec < rf1->rf_next_read.tv_nsec) + if (rf0->rf_next_read < rf1->rf_next_read) return (-1); - else if (rf0->rf_next_read.tv_nsec > rf1->rf_next_read.tv_nsec) + else if (rf0->rf_next_read > rf1->rf_next_read) return (1); else return (0); @@ -987,6 +1048,9 @@ static int plugin_insert_read (read_func_t *rf) int status; llentry_t *le; + rf->rf_next_read = cdtime (); + rf->rf_effective_interval = rf->rf_interval; + pthread_mutex_lock (&read_lock); if (read_list == NULL) @@ -1048,43 +1112,12 @@ static int plugin_insert_read (read_func_t *rf) return (0); } /* int plugin_insert_read */ -static int read_cb_wrapper (user_data_t *ud) -{ - int (*callback) (void); - - if (ud == NULL) - return -1; - - callback = ud->data; - return callback(); -} /* int read_cb_wrapper */ - int plugin_register_read (const char *name, int (*callback) (void)) { read_func_t *rf; - plugin_ctx_t ctx = plugin_get_ctx (); int status; - if (ctx.interval != 0) { - /* If ctx.interval is not zero (== use the plugin or global - * interval), we need to use the "complex" read callback, - * because only that allows to specify a different interval. - * Wrap the callback using read_cb_wrapper(). */ - struct timespec interval; - user_data_t user_data; - - user_data.data = callback; - user_data.free_func = NULL; - - CDTIME_T_TO_TIMESPEC (ctx.interval, &interval); - return plugin_register_complex_read (/* group = */ NULL, - name, read_cb_wrapper, &interval, &user_data); - } - - DEBUG ("plugin_register_read: default_interval = %.3f", - CDTIME_T_TO_DOUBLE(plugin_get_interval ())); - rf = malloc (sizeof (*rf)); if (rf == NULL) { @@ -1096,13 +1129,11 @@ int plugin_register_read (const char *name, rf->rf_callback = (void *) callback; rf->rf_udata.data = NULL; rf->rf_udata.free_func = NULL; - rf->rf_ctx = ctx; + rf->rf_ctx = plugin_get_ctx (); rf->rf_group[0] = '\0'; - sstrncpy (rf->rf_name, name, sizeof (rf->rf_name)); + rf->rf_name = strdup (name); rf->rf_type = RF_SIMPLE; - rf->rf_interval.tv_sec = 0; - rf->rf_interval.tv_nsec = 0; - rf->rf_effective_interval = rf->rf_interval; + rf->rf_interval = plugin_get_interval (); status = plugin_insert_read (rf); if (status != 0) @@ -1117,7 +1148,6 @@ int plugin_register_complex_read (const char *group, const char *name, user_data_t *user_data) { read_func_t *rf; - plugin_ctx_t ctx = plugin_get_ctx (); int status; rf = malloc (sizeof (*rf)); @@ -1133,21 +1163,12 @@ int plugin_register_complex_read (const char *group, const char *name, sstrncpy (rf->rf_group, group, sizeof (rf->rf_group)); else rf->rf_group[0] = '\0'; - sstrncpy (rf->rf_name, name, sizeof (rf->rf_name)); + rf->rf_name = strdup (name); rf->rf_type = RF_COMPLEX; if (interval != NULL) - { - rf->rf_interval = *interval; - } - else if (ctx.interval != 0) - { - CDTIME_T_TO_TIMESPEC (ctx.interval, &rf->rf_interval); - } - rf->rf_effective_interval = rf->rf_interval; - - DEBUG ("plugin_register_read: interval = %i.%09i", - (int) rf->rf_interval.tv_sec, - (int) rf->rf_interval.tv_nsec); + rf->rf_interval = TIMESPEC_TO_CDTIME_T (interval); + else + rf->rf_interval = plugin_get_interval (); /* Set user data */ if (user_data == NULL) @@ -1160,7 +1181,7 @@ int plugin_register_complex_read (const char *group, const char *name, rf->rf_udata = *user_data; } - rf->rf_ctx = ctx; + rf->rf_ctx = plugin_get_ctx (); status = plugin_insert_read (rf); if (status != 0) @@ -1427,6 +1448,61 @@ void plugin_init_all (void) chain_name = global_option_get ("PostCacheChain"); post_cache_chain = fc_chain_get_by_name (chain_name); + { + const char *str_writequeuelengthlimithigh = global_option_get ("WriteQueueLengthLimitHigh"); + const char *str_writequeuelengthlimitlow = global_option_get ("WriteQueueLengthLimitLow"); + + writequeuelengthlimit_high = 0; + writequeuelengthlimit_low = 0; + + if(NULL != str_writequeuelengthlimithigh) { + errno = 0; + /* get high limit */ + writequeuelengthlimit_high = strtol(str_writequeuelengthlimithigh, NULL, 10); + if ((errno == ERANGE && (writequeuelengthlimit_high == LONG_MAX || writequeuelengthlimit_high == LONG_MIN)) + || (errno != 0 && writequeuelengthlimit_high == 0) + ) { + writequeuelengthlimit_high = 0; + ERROR("Config 'WriteQueueLengthLimitHigh' accepts one integer value only. Running with no limit !"); + } + if(writequeuelengthlimit_high < 0) { + ERROR("Config 'WriteQueueLengthLimitHigh' accepts positive values only. Running with no limit !"); + writequeuelengthlimit_high = 0; + } + } + + if((writequeuelengthlimit_high > 0) && (NULL != str_writequeuelengthlimitlow)) { + errno = 0; + /* get low limit */ + writequeuelengthlimit_low = strtol(str_writequeuelengthlimitlow, NULL, 10); + if ((errno == ERANGE && (writequeuelengthlimit_low == LONG_MAX || writequeuelengthlimit_low == LONG_MIN)) + || (errno != 0 && writequeuelengthlimit_low == 0) + ) { + writequeuelengthlimit_low = 0; + ERROR("Config 'WriteQueueLengthLimitLow' accepts one integer value only. Using default low limit instead"); + } + + if(writequeuelengthlimit_low < 0) { + ERROR("Config 'WriteQueueLengthLimitLow' accepts positive values only. Using default low limit instead"); + writequeuelengthlimit_low = 0; + } else if(writequeuelengthlimit_low > writequeuelengthlimit_high) { + ERROR("Config 'WriteQueueLengthLimitLow' (%ld) cannot be bigger than high limit (%ld). Using default low limit instead", + writequeuelengthlimit_low, writequeuelengthlimit_high); + writequeuelengthlimit_low = 0; + } + } + /* Check/compute low limit if not/badly defined */ + if(writequeuelengthlimit_high > 0) { + if(0 == writequeuelengthlimit_low) { + writequeuelengthlimit_low = .5 * writequeuelengthlimit_high; + } + INFO("Config 'WriteQueueLengthLimit*' : Running with limits high=%ld low=%ld", writequeuelengthlimit_high, writequeuelengthlimit_low); + random_seed = time(0); + } else { + writequeuelengthlimit_low = 0; /* This should be useless, but in case... */ + } + } + { char const *tmp = global_option_get ("WriteThreads"); int num = atoi (tmp); @@ -1719,6 +1795,8 @@ void plugin_shutdown_all (void) destroy_all_callbacks (&list_notification); destroy_all_callbacks (&list_shutdown); destroy_all_callbacks (&list_log); + + plugin_free_loaded (); } /* void plugin_shutdown_all */ int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */ @@ -1942,15 +2020,60 @@ static int plugin_dispatch_values_internal (value_list_t *vl) int plugin_dispatch_values (value_list_t const *vl) { int status; + int wq_size = write_queue_size; + /* We store write_queue_size in a local variable because other threads may update write_queue_size. + * Having this in a local variable (like a cache) is better : we do not need a lock */ + short metric_will_be_dropped = 0; + + if((writequeuelengthlimit_high > 0) && (wq_size > writequeuelengthlimit_low)) { + if(wq_size >= writequeuelengthlimit_high) { + /* if high == low, we come here too */ + metric_will_be_dropped = 1; + } else { + /* here, high != low */ + long probability_to_drop; + long n; + + probability_to_drop = (wq_size - writequeuelengthlimit_low); + + /* We use rand_r with its bad RNG because it's enough for playing dices. + * There is no security consideration here so rand_r() should be enough here. + */ + n = rand_r(&random_seed) % (writequeuelengthlimit_high - writequeuelengthlimit_low) ; + + /* Let's have X = high - low. + * n is in range [0..X] + * probability_to_drop is in range [1..X[ + * probability_to_drop gets bigger when wq_size gets bigger. + */ + if(n <= probability_to_drop) { + metric_will_be_dropped = 1; + } + } + } - status = plugin_write_enqueue (vl); - if (status != 0) + if( ! metric_will_be_dropped) { + status = plugin_write_enqueue (vl); + if (status != 0) + { + char errbuf[1024]; + ERROR ("plugin_dispatch_values: plugin_write_enqueue failed " + "with status %i (%s).", status, + sstrerror (status, errbuf, sizeof (errbuf))); + return (status); + } + } + else { - char errbuf[1024]; - ERROR ("plugin_dispatch_values: plugin_write_enqueue failed " - "with status %i (%s).", status, - sstrerror (status, errbuf, sizeof (errbuf))); - return (status); + /* If you want to count dropped metrics, don't forget to add a lock here */ + /* dropped_metrics++; */ + ERROR ("plugin_dispatch_values: value dropped (write queue %ld > %ld) : time = %.3f; interval = %.3f ; %s/%s%s%s/%s%s%s", + write_queue_size, writequeuelengthlimit_low, + CDTIME_T_TO_DOUBLE (vl->time), + CDTIME_T_TO_DOUBLE (vl->interval), + vl->host, + vl->plugin, vl->plugin_instance[0]?"-":"", vl->plugin_instance, + vl->type, vl->type_instance[0]?"-":"", vl->type_instance); } return (0); @@ -2078,6 +2201,12 @@ const data_set_t *plugin_get_ds (const char *name) { data_set_t *ds; + if (data_sets == NULL) + { + ERROR ("plugin_get_ds: No data sets are defined yet."); + return (NULL); + } + if (c_avl_get (data_sets, name, (void *) &ds) != 0) { DEBUG ("No such dataset registered: %s", name);