X-Git-Url: https://git.tokkee.org/?a=blobdiff_plain;f=src%2Fplugin.c;h=6c7aa057cf2d272b126eadf6308a76188d3f3b1a;hb=13cd1495b95cb1cbebcbf6abebc171ebcddc192c;hp=4c6a0322f18200f92caee47adbe41ab089ee94f5;hpb=d11c1f68458465f1c9dcfd0396704b7fba7d5804;p=collectd.git diff --git a/src/plugin.c b/src/plugin.c index 4c6a0322..6c7aa057 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -31,6 +31,7 @@ #include "utils_llist.h" #include "utils_heap.h" #include "utils_time.h" +#include "utils_random.h" #if HAVE_PTHREAD_H # include @@ -61,7 +62,7 @@ struct read_func_s #define rf_ctx rf_super.cf_ctx callback_func_t rf_super; char rf_group[DATA_MAX_NAME_LEN]; - char rf_name[DATA_MAX_NAME_LEN]; + char *rf_name; int rf_type; cdtime_t rf_interval; cdtime_t rf_effective_interval; @@ -81,6 +82,8 @@ struct write_queue_s /* * Private variables */ +static c_avl_tree_t *plugins_loaded = NULL; + static llist_t *list_init; static llist_t *list_write; static llist_t *list_flush; @@ -106,6 +109,7 @@ static int read_threads_num = 0; static write_queue_t *write_queue_head; static write_queue_t *write_queue_tail; +static long write_queue_length = 0; static _Bool write_loop = 1; static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER; @@ -115,6 +119,9 @@ static size_t write_threads_num = 0; static pthread_key_t plugin_ctx_key; static _Bool plugin_ctx_key_initialized = 0; +static long write_limit_high = 0; +static long write_limit_low = 0; + /* * Static functions */ @@ -437,6 +444,7 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) { DEBUG ("plugin_read_thread: Destroying the `%s' " "callback.", rf->rf_name); + sfree (rf->rf_name); destroy_callback ((callback_func_t *) rf); rf = NULL; continue; @@ -667,11 +675,13 @@ static int plugin_write_enqueue (value_list_t const *vl) /* {{{ */ { write_queue_head = q; write_queue_tail = q; + write_queue_length = 1; } else { write_queue_tail->next = q; write_queue_tail = q; + write_queue_length += 1; } pthread_cond_signal (&write_cond); @@ -698,8 +708,11 @@ static value_list_t *plugin_write_dequeue (void) /* {{{ */ q = write_queue_head; write_queue_head = q->next; - if (write_queue_head == NULL) + write_queue_length -= 1; + if (write_queue_head == NULL) { write_queue_tail = NULL; + assert(0 == write_queue_length); + } pthread_mutex_unlock (&write_lock); @@ -802,6 +815,7 @@ static void stop_write_threads (void) /* {{{ */ } write_queue_head = NULL; write_queue_tail = NULL; + write_queue_length = 0; pthread_mutex_unlock (&write_lock); if (i > 0) @@ -830,8 +844,52 @@ void plugin_set_dir (const char *dir) } } +static _Bool plugin_is_loaded (char const *name) +{ + int status; + + if (plugins_loaded == NULL) + plugins_loaded = c_avl_create ((void *) strcasecmp); + assert (plugins_loaded != NULL); + + status = c_avl_get (plugins_loaded, name, /* ret_value = */ NULL); + return (status == 0); +} + +static int plugin_mark_loaded (char const *name) +{ + char *name_copy; + int status; + + name_copy = strdup (name); + if (name_copy == NULL) + return (ENOMEM); + + status = c_avl_insert (plugins_loaded, + /* key = */ name_copy, /* value = */ NULL); + return (status); +} + +static void plugin_free_loaded () +{ + void *key; + void *value; + + if (plugins_loaded == NULL) + return; + + while (c_avl_pick (plugins_loaded, &key, &value) == 0) + { + sfree (key); + assert (value == NULL); + } + + c_avl_destroy (plugins_loaded); + plugins_loaded = NULL; +} + #define BUFSIZE 512 -int plugin_load (const char *type, uint32_t flags) +int plugin_load (char const *plugin_name, uint32_t flags) { DIR *dh; const char *dir; @@ -843,15 +901,38 @@ int plugin_load (const char *type, uint32_t flags) struct dirent *de; int status; + if (plugin_name == NULL) + return (EINVAL); + + /* Check if plugin is already loaded and don't do anything in this + * case. */ + if (plugin_is_loaded (plugin_name)) + return (0); + dir = plugin_get_dir (); ret = 1; + /* + * XXX: Magic at work: + * + * Some of the language bindings, for example the Python and Perl + * plugins, need to be able to export symbols to the scripts they run. + * For this to happen, the "Globals" flag needs to be set. + * Unfortunately, this technical detail is hard to explain to the + * average user and she shouldn't have to worry about this, ideally. + * So in order to save everyone's sanity use a different default for a + * handful of special plugins. --octo + */ + if ((strcasecmp ("perl", plugin_name) == 0) + || (strcasecmp ("python", plugin_name) == 0)) + flags |= PLUGIN_FLAGS_GLOBAL; + /* `cpu' should not match `cpufreq'. To solve this we add `.so' to the * type when matching the filename */ - status = ssnprintf (typename, sizeof (typename), "%s.so", type); + status = ssnprintf (typename, sizeof (typename), "%s.so", plugin_name); if ((status < 0) || ((size_t) status >= sizeof (typename))) { - WARNING ("plugin_load: Filename too long: \"%s.so\"", type); + WARNING ("plugin_load: Filename too long: \"%s.so\"", plugin_name); return (-1); } typename_len = strlen (typename); @@ -898,13 +979,14 @@ int plugin_load (const char *type, uint32_t flags) if (status == 0) { /* success */ + plugin_mark_loaded (plugin_name); ret = 0; break; } else { ERROR ("plugin_load: Load plugin \"%s\" failed with " - "status %i.", type, status); + "status %i.", plugin_name, status); } } @@ -912,7 +994,7 @@ int plugin_load (const char *type, uint32_t flags) if (filename[0] == 0) ERROR ("plugin_load: Could not find plugin \"%s\" in %s", - type, dir); + plugin_name, dir); return (ret); } @@ -1048,7 +1130,7 @@ int plugin_register_read (const char *name, rf->rf_udata.free_func = NULL; rf->rf_ctx = plugin_get_ctx (); rf->rf_group[0] = '\0'; - sstrncpy (rf->rf_name, name, sizeof (rf->rf_name)); + rf->rf_name = strdup (name); rf->rf_type = RF_SIMPLE; rf->rf_interval = plugin_get_interval (); @@ -1080,7 +1162,7 @@ int plugin_register_complex_read (const char *group, const char *name, sstrncpy (rf->rf_group, group, sizeof (rf->rf_group)); else rf->rf_group[0] = '\0'; - sstrncpy (rf->rf_name, name, sizeof (rf->rf_name)); + rf->rf_name = strdup (name); rf->rf_type = RF_COMPLEX; if (interval != NULL) rf->rf_interval = TIMESPEC_TO_CDTIME_T (interval); @@ -1135,6 +1217,27 @@ int plugin_register_shutdown (const char *name, (void *) callback, /* user_data = */ NULL)); } /* int plugin_register_shutdown */ +static void plugin_free_data_sets (void) +{ + void *key; + void *value; + + if (data_sets == NULL) + return; + + while (c_avl_pick (data_sets, &key, &value) == 0) + { + data_set_t *ds = value; + /* key is a pointer to ds->type */ + + sfree (ds->ds); + sfree (ds); + } + + c_avl_destroy (data_sets); + data_sets = NULL; +} /* void plugin_free_data_sets */ + int plugin_register_data_set (const data_set_t *ds) { data_set_t *ds_copy; @@ -1352,7 +1455,8 @@ int plugin_unregister_notification (const char *name) void plugin_init_all (void) { - const char *chain_name; + char const *chain_name; + long write_threads_num; llentry_t *le; int status; @@ -1365,16 +1469,38 @@ void plugin_init_all (void) chain_name = global_option_get ("PostCacheChain"); post_cache_chain = fc_chain_get_by_name (chain_name); + write_limit_high = global_option_get_long ("WriteQueueLimitHigh", + /* default = */ 0); + if (write_limit_high < 0) { - char const *tmp = global_option_get ("WriteThreads"); - int num = atoi (tmp); + ERROR ("WriteQueueLimitHigh must be positive or zero."); + write_limit_high = 0; + } - if (num < 1) - num = 5; + write_limit_low = global_option_get_long ("WriteQueueLimitLow", + /* default = */ write_limit_high / 2); + if (write_limit_low < 0) + { + ERROR ("WriteQueueLimitLow must be positive or zero."); + write_limit_low = write_limit_high / 2; + } + else if (write_limit_low > write_limit_high) + { + ERROR ("WriteQueueLimitLow must not be larger than " + "WriteQueueLimitHigh."); + write_limit_low = write_limit_high; + } - start_write_threads ((size_t) num); + write_threads_num = global_option_get_long ("WriteThreads", + /* default = */ 5); + if (write_threads_num < 1) + { + ERROR ("WriteThreads must be positive."); + write_threads_num = 5; } + start_write_threads ((size_t) write_threads_num); + if ((list_init == NULL) && (read_heap == NULL)) return; @@ -1657,6 +1783,9 @@ void plugin_shutdown_all (void) destroy_all_callbacks (&list_notification); destroy_all_callbacks (&list_shutdown); destroy_all_callbacks (&list_log); + + plugin_free_loaded (); + plugin_free_data_sets (); } /* void plugin_shutdown_all */ int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */ @@ -1877,10 +2006,76 @@ static int plugin_dispatch_values_internal (value_list_t *vl) return (0); } /* int plugin_dispatch_values_internal */ +static double get_drop_probability (void) /* {{{ */ +{ + long pos; + long size; + long wql; + + pthread_mutex_lock (&write_lock); + wql = write_queue_length; + pthread_mutex_unlock (&write_lock); + + if (wql < write_limit_low) + return (0.0); + if (wql >= write_limit_high) + return (1.0); + + pos = 1 + wql - write_limit_low; + size = 1 + write_limit_high - write_limit_low; + + return (((double) pos) / ((double) size)); +} /* }}} double get_drop_probability */ + +static _Bool check_drop_value (void) /* {{{ */ +{ + static cdtime_t last_message_time = 0; + static pthread_mutex_t last_message_lock = PTHREAD_MUTEX_INITIALIZER; + + double p; + double q; + int status; + + if (write_limit_high == 0) + return (0); + + p = get_drop_probability (); + if (p == 0.0) + return (0); + + status = pthread_mutex_trylock (&last_message_lock); + if (status == 0) + { + cdtime_t now; + + now = cdtime (); + if ((now - last_message_time) > TIME_T_TO_CDTIME_T (1)) + { + last_message_time = now; + ERROR ("plugin_dispatch_values: Low water mark " + "reached. Dropping %.0f%% of metrics.", + 100.0 * p); + } + pthread_mutex_unlock (&last_message_lock); + } + + if (p == 1.0) + return (1); + + q = cdrand_d (); + if (q > p) + return (1); + else + return (0); +} /* }}} _Bool check_drop_value */ + int plugin_dispatch_values (value_list_t const *vl) { int status; + if (check_drop_value ()) + return (0); + status = plugin_write_enqueue (vl); if (status != 0) {