Code

src/plugin.c: Some fixes for write limits.
[collectd.git] / src / plugin.c
index f23ef07122540085d77c2a9fa4fee2ea187f26da..12f002cf5baba8d3a77b6285e63cca253e04d6a8 100644 (file)
@@ -31,6 +31,7 @@
 #include "utils_llist.h"
 #include "utils_heap.h"
 #include "utils_time.h"
+#include "utils_random.h"
 
 #if HAVE_PTHREAD_H
 # include <pthread.h>
@@ -61,7 +62,7 @@ struct read_func_s
 #define rf_ctx rf_super.cf_ctx
        callback_func_t rf_super;
        char rf_group[DATA_MAX_NAME_LEN];
-       char rf_name[DATA_MAX_NAME_LEN];
+       char *rf_name;
        int rf_type;
        cdtime_t rf_interval;
        cdtime_t rf_effective_interval;
@@ -81,6 +82,8 @@ struct write_queue_s
 /*
  * Private variables
  */
+static c_avl_tree_t *plugins_loaded = NULL;
+
 static llist_t *list_init;
 static llist_t *list_write;
 static llist_t *list_flush;
@@ -106,6 +109,7 @@ static int             read_threads_num = 0;
 
 static write_queue_t  *write_queue_head;
 static write_queue_t  *write_queue_tail;
+static long            write_queue_length = 0;
 static _Bool           write_loop = 1;
 static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t  write_cond = PTHREAD_COND_INITIALIZER;
@@ -115,6 +119,9 @@ static size_t          write_threads_num = 0;
 static pthread_key_t   plugin_ctx_key;
 static _Bool           plugin_ctx_key_initialized = 0;
 
+static long            write_limit_high = 0;
+static long            write_limit_low = 0;
+
 /*
  * Static functions
  */
@@ -437,6 +444,7 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
                {
                        DEBUG ("plugin_read_thread: Destroying the `%s' "
                                        "callback.", rf->rf_name);
+                       sfree (rf->rf_name);
                        destroy_callback ((callback_func_t *) rf);
                        rf = NULL;
                        continue;
@@ -667,11 +675,13 @@ static int plugin_write_enqueue (value_list_t const *vl) /* {{{ */
        {
                write_queue_head = q;
                write_queue_tail = q;
+               write_queue_length = 1;
        }
        else
        {
                write_queue_tail->next = q;
                write_queue_tail = q;
+               write_queue_length += 1;
        }
 
        pthread_cond_signal (&write_cond);
@@ -698,8 +708,11 @@ static value_list_t *plugin_write_dequeue (void) /* {{{ */
 
        q = write_queue_head;
        write_queue_head = q->next;
-       if (write_queue_head == NULL)
+       write_queue_length -= 1;
+       if (write_queue_head == NULL) {
                write_queue_tail = NULL;
+               assert(0 == write_queue_length);
+               }
 
        pthread_mutex_unlock (&write_lock);
 
@@ -802,6 +815,7 @@ static void stop_write_threads (void) /* {{{ */
        }
        write_queue_head = NULL;
        write_queue_tail = NULL;
+       write_queue_length = 0;
        pthread_mutex_unlock (&write_lock);
 
        if (i > 0)
@@ -830,8 +844,52 @@ void plugin_set_dir (const char *dir)
        }
 }
 
+static _Bool plugin_is_loaded (char const *name)
+{
+       int status;
+
+       if (plugins_loaded == NULL)
+               plugins_loaded = c_avl_create ((void *) strcasecmp);
+       assert (plugins_loaded != NULL);
+
+       status = c_avl_get (plugins_loaded, name, /* ret_value = */ NULL);
+       return (status == 0);
+}
+
+static int plugin_mark_loaded (char const *name)
+{
+       char *name_copy;
+       int status;
+
+       name_copy = strdup (name);
+       if (name_copy == NULL)
+               return (ENOMEM);
+
+       status = c_avl_insert (plugins_loaded,
+                       /* key = */ name_copy, /* value = */ NULL);
+       return (status);
+}
+
+static void plugin_free_loaded ()
+{
+       void *key;
+       void *value;
+
+       if (plugins_loaded == NULL)
+               return;
+
+       while (c_avl_pick (plugins_loaded, &key, &value) == 0)
+       {
+               sfree (key);
+               assert (value == NULL);
+       }
+
+       c_avl_destroy (plugins_loaded);
+       plugins_loaded = NULL;
+}
+
 #define BUFSIZE 512
-int plugin_load (const char *type, uint32_t flags)
+int plugin_load (char const *plugin_name, uint32_t flags)
 {
        DIR  *dh;
        const char *dir;
@@ -843,15 +901,38 @@ int plugin_load (const char *type, uint32_t flags)
        struct dirent *de;
        int status;
 
+       if (plugin_name == NULL)
+               return (EINVAL);
+
+       /* Check if plugin is already loaded and don't do anything in this
+        * case. */
+       if (plugin_is_loaded (plugin_name))
+               return (0);
+
        dir = plugin_get_dir ();
        ret = 1;
 
+       /*
+        * XXX: Magic at work:
+        *
+        * Some of the language bindings, for example the Python and Perl
+        * plugins, need to be able to export symbols to the scripts they run.
+        * For this to happen, the "Globals" flag needs to be set.
+        * Unfortunately, this technical detail is hard to explain to the
+        * average user and she shouldn't have to worry about this, ideally.
+        * So in order to save everyone's sanity use a different default for a
+        * handful of special plugins. --octo
+        */
+       if ((strcasecmp ("perl", plugin_name) == 0)
+                       || (strcasecmp ("python", plugin_name) == 0))
+               flags |= PLUGIN_FLAGS_GLOBAL;
+
        /* `cpu' should not match `cpufreq'. To solve this we add `.so' to the
         * type when matching the filename */
-       status = ssnprintf (typename, sizeof (typename), "%s.so", type);
+       status = ssnprintf (typename, sizeof (typename), "%s.so", plugin_name);
        if ((status < 0) || ((size_t) status >= sizeof (typename)))
        {
-               WARNING ("plugin_load: Filename too long: \"%s.so\"", type);
+               WARNING ("plugin_load: Filename too long: \"%s.so\"", plugin_name);
                return (-1);
        }
        typename_len = strlen (typename);
@@ -898,13 +979,14 @@ int plugin_load (const char *type, uint32_t flags)
                if (status == 0)
                {
                        /* success */
+                       plugin_mark_loaded (plugin_name);
                        ret = 0;
                        break;
                }
                else
                {
                        ERROR ("plugin_load: Load plugin \"%s\" failed with "
-                                       "status %i.", type, status);
+                                       "status %i.", plugin_name, status);
                }
        }
 
@@ -912,7 +994,7 @@ int plugin_load (const char *type, uint32_t flags)
 
        if (filename[0] == 0)
                ERROR ("plugin_load: Could not find plugin \"%s\" in %s",
-                               type, dir);
+                               plugin_name, dir);
 
        return (ret);
 }
@@ -965,6 +1047,9 @@ static int plugin_insert_read (read_func_t *rf)
        int status;
        llentry_t *le;
 
+       rf->rf_next_read = cdtime ();
+       rf->rf_effective_interval = rf->rf_interval;
+
        pthread_mutex_lock (&read_lock);
 
        if (read_list == NULL)
@@ -1026,43 +1111,12 @@ static int plugin_insert_read (read_func_t *rf)
        return (0);
 } /* int plugin_insert_read */
 
-static int read_cb_wrapper (user_data_t *ud)
-{
-       int (*callback) (void);
-
-       if (ud == NULL)
-               return -1;
-
-       callback = ud->data;
-       return callback();
-} /* int read_cb_wrapper */
-
 int plugin_register_read (const char *name,
                int (*callback) (void))
 {
        read_func_t *rf;
-       plugin_ctx_t ctx = plugin_get_ctx ();
        int status;
 
-       if (ctx.interval != 0) {
-               /* If ctx.interval is not zero (== use the plugin or global
-                * interval), we need to use the "complex" read callback,
-                * because only that allows to specify a different interval.
-                * Wrap the callback using read_cb_wrapper(). */
-               struct timespec interval;
-               user_data_t user_data;
-
-               user_data.data = callback;
-               user_data.free_func = NULL;
-
-               CDTIME_T_TO_TIMESPEC (ctx.interval, &interval);
-               return plugin_register_complex_read (/* group = */ NULL,
-                               name, read_cb_wrapper, &interval, &user_data);
-       }
-
-       DEBUG ("plugin_register_read: default_interval = %.3f",
-                       CDTIME_T_TO_DOUBLE(plugin_get_interval ()));
-
        rf = malloc (sizeof (*rf));
        if (rf == NULL)
        {
@@ -1074,12 +1128,11 @@ int plugin_register_read (const char *name,
        rf->rf_callback = (void *) callback;
        rf->rf_udata.data = NULL;
        rf->rf_udata.free_func = NULL;
-       rf->rf_ctx = ctx;
+       rf->rf_ctx = plugin_get_ctx ();
        rf->rf_group[0] = '\0';
-       sstrncpy (rf->rf_name, name, sizeof (rf->rf_name));
+       rf->rf_name = strdup (name);
        rf->rf_type = RF_SIMPLE;
-       rf->rf_interval = 0;
-       rf->rf_effective_interval = rf->rf_interval;
+       rf->rf_interval = plugin_get_interval ();
 
        status = plugin_insert_read (rf);
        if (status != 0)
@@ -1094,7 +1147,6 @@ int plugin_register_complex_read (const char *group, const char *name,
                user_data_t *user_data)
 {
        read_func_t *rf;
-       plugin_ctx_t ctx = plugin_get_ctx ();
        int status;
 
        rf = malloc (sizeof (*rf));
@@ -1110,20 +1162,12 @@ int plugin_register_complex_read (const char *group, const char *name,
                sstrncpy (rf->rf_group, group, sizeof (rf->rf_group));
        else
                rf->rf_group[0] = '\0';
-       sstrncpy (rf->rf_name, name, sizeof (rf->rf_name));
+       rf->rf_name = strdup (name);
        rf->rf_type = RF_COMPLEX;
        if (interval != NULL)
-       {
                rf->rf_interval = TIMESPEC_TO_CDTIME_T (interval);
-       }
-       else if (ctx.interval != 0)
-       {
-               rf->rf_interval = ctx.interval;
-       }
-       rf->rf_effective_interval = rf->rf_interval;
-
-       DEBUG ("plugin_register_read: interval = %.3f",
-                       CDTIME_T_TO_DOUBLE (rf->rf_interval));
+       else
+               rf->rf_interval = plugin_get_interval ();
 
        /* Set user data */
        if (user_data == NULL)
@@ -1136,7 +1180,7 @@ int plugin_register_complex_read (const char *group, const char *name,
                rf->rf_udata = *user_data;
        }
 
-       rf->rf_ctx = ctx;
+       rf->rf_ctx = plugin_get_ctx ();
 
        status = plugin_insert_read (rf);
        if (status != 0)
@@ -1390,7 +1434,8 @@ int plugin_unregister_notification (const char *name)
 
 void plugin_init_all (void)
 {
-       const char *chain_name;
+       char const *chain_name;
+       long write_threads_num;
        llentry_t *le;
        int status;
 
@@ -1403,16 +1448,38 @@ void plugin_init_all (void)
        chain_name = global_option_get ("PostCacheChain");
        post_cache_chain = fc_chain_get_by_name (chain_name);
 
+       write_limit_high = global_option_get_long ("WriteQueueLimitHigh",
+                       /* default = */ 0);
+       if (write_limit_high < 0)
        {
-               char const *tmp = global_option_get ("WriteThreads");
-               int num = atoi (tmp);
+               ERROR ("WriteQueueLimitHigh must be positive or zero.");
+               write_limit_high = 0;
+       }
 
-               if (num < 1)
-                       num = 5;
+       write_limit_low = global_option_get_long ("WriteQueueLimitLow",
+                       /* default = */ write_limit_high / 2);
+       if (write_limit_low < 0)
+       {
+               ERROR ("WriteQueueLimitLow must be positive or zero.");
+               write_limit_low = write_limit_high / 2;
+       }
+       else if (write_limit_low > write_limit_high)
+       {
+               ERROR ("WriteQueueLimitLow must not be larger than "
+                               "WriteQueueLimitHigh.");
+               write_limit_low = write_limit_high;
+       }
 
-               start_write_threads ((size_t) num);
+       write_threads_num = global_option_get_long ("WriteThreads",
+                       /* default = */ 5);
+       if (write_threads_num < 1)
+       {
+               ERROR ("WriteThreads must be positive.");
+               write_threads_num = 5;
        }
 
+       start_write_threads ((size_t) write_threads_num);
+
        if ((list_init == NULL) && (read_heap == NULL))
                return;
 
@@ -1695,6 +1762,8 @@ void plugin_shutdown_all (void)
        destroy_all_callbacks (&list_notification);
        destroy_all_callbacks (&list_shutdown);
        destroy_all_callbacks (&list_log);
+
+       plugin_free_loaded ();
 } /* void plugin_shutdown_all */
 
 int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */
@@ -1915,10 +1984,76 @@ static int plugin_dispatch_values_internal (value_list_t *vl)
        return (0);
 } /* int plugin_dispatch_values_internal */
 
+static double get_drop_probability (void) /* {{{ */
+{
+       long pos;
+       long size;
+       long wql;
+
+       pthread_mutex_lock (&write_lock);
+       wql = write_queue_length;
+       pthread_mutex_unlock (&write_lock);
+
+       if (wql < write_limit_low)
+               return (0.0);
+       if (wql >= write_limit_high)
+               return (1.0);
+
+       pos = 1 + wql - write_limit_low;
+       size = 1 + write_limit_high - write_limit_low;
+
+       return (((double) pos) / ((double) size));
+} /* }}} double get_drop_probability */
+
+static _Bool check_drop_value (void) /* {{{ */
+{
+       static cdtime_t last_message_time = 0;
+       static pthread_mutex_t last_message_lock = PTHREAD_MUTEX_INITIALIZER;
+
+       double p;
+       double q;
+       int status;
+
+       if (write_limit_high == 0)
+               return (0);
+
+       p = get_drop_probability ();
+       if (p == 0.0)
+               return (0);
+
+       status = pthread_mutex_trylock (&last_message_lock);
+       if (status == 0)
+       {
+               cdtime_t now;
+
+               now = cdtime ();
+               if ((now - last_message_time) > TIME_T_TO_CDTIME_T (1))
+               {
+                       last_message_time = now;
+                       ERROR ("plugin_dispatch_values: Low water mark "
+                                       "reached. Dropping %.0f%% of metrics.",
+                                       100.0 * p);
+               }
+               pthread_mutex_unlock (&last_message_lock);
+       }
+
+       if (p == 1.0)
+               return (1);
+
+       q = cdrand_d ();
+       if (q > p)
+               return (1);
+       else
+               return (0);
+} /* }}} _Bool check_drop_value */
+
 int plugin_dispatch_values (value_list_t const *vl)
 {
        int status;
 
+       if (check_drop_value ())
+               return (0);
+
        status = plugin_write_enqueue (vl);
        if (status != 0)
        {
@@ -2054,6 +2189,12 @@ const data_set_t *plugin_get_ds (const char *name)
 {
        data_set_t *ds;
 
+       if (data_sets == NULL)
+       {
+               ERROR ("plugin_get_ds: No data sets are defined yet.");
+               return (NULL);
+       }
+
        if (c_avl_get (data_sets, name, (void *) &ds) != 0)
        {
                DEBUG ("No such dataset registered: %s", name);