Code

Added new WriteQueueLengthLimit (drop values when bigger)
[collectd.git] / src / plugin.c
index 942f8bfe50438066adc80b46ac4175c990f9a642..4e106a5b1eb5193c3188ff87934feff0f4d4fa49 100644 (file)
  **/
 
 #include "collectd.h"
-#include "utils_complain.h"
-
-#include <ltdl.h>
-
-#if HAVE_PTHREAD_H
-# include <pthread.h>
-#endif
-
 #include "common.h"
 #include "plugin.h"
 #include "configfile.h"
+#include "filter_chain.h"
 #include "utils_avltree.h"
+#include "utils_cache.h"
+#include "utils_complain.h"
 #include "utils_llist.h"
 #include "utils_heap.h"
-#include "utils_cache.h"
-#include "filter_chain.h"
+#include "utils_time.h"
+
+#if HAVE_PTHREAD_H
+# include <pthread.h>
+#endif
+
+#include <ltdl.h>
 
 /*
  * Private structures
@@ -61,11 +61,11 @@ struct read_func_s
 #define rf_ctx rf_super.cf_ctx
        callback_func_t rf_super;
        char rf_group[DATA_MAX_NAME_LEN];
-       char rf_name[DATA_MAX_NAME_LEN];
+       char *rf_name;
        int rf_type;
-       struct timespec rf_interval;
-       struct timespec rf_effective_interval;
-       struct timespec rf_next_read;
+       cdtime_t rf_interval;
+       cdtime_t rf_effective_interval;
+       cdtime_t rf_next_read;
 };
 typedef struct read_func_s read_func_t;
 
@@ -81,6 +81,8 @@ struct write_queue_s
 /*
  * Private variables
  */
+static c_avl_tree_t *plugins_loaded = NULL;
+
 static llist_t *list_init;
 static llist_t *list_write;
 static llist_t *list_flush;
@@ -106,6 +108,7 @@ static int             read_threads_num = 0;
 
 static write_queue_t  *write_queue_head;
 static write_queue_t  *write_queue_tail;
+static long           write_queue_size = 0;
 static _Bool           write_loop = 1;
 static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t  write_cond = PTHREAD_COND_INITIALIZER;
@@ -115,6 +118,11 @@ static size_t          write_threads_num = 0;
 static pthread_key_t   plugin_ctx_key;
 static _Bool           plugin_ctx_key_initialized = 0;
 
+static long            writequeuelengthlimit_high = 0;
+static long            writequeuelengthlimit_low = 0;
+
+static unsigned int    random_seed;
+
 /*
  * Static functions
  */
@@ -360,13 +368,6 @@ static int plugin_load_file (char *file, uint32_t flags)
        return (0);
 }
 
-static _Bool timeout_reached(struct timespec timeout)
-{
-       struct timeval now;
-       gettimeofday(&now, NULL);
-       return (now.tv_sec >= timeout.tv_sec && now.tv_usec >= (timeout.tv_nsec / 1000));
-}
-
 static void *plugin_read_thread (void __attribute__((unused)) *args)
 {
        while (read_loop != 0)
@@ -392,18 +393,15 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
                }
                pthread_mutex_unlock (&read_lock);
 
-               if ((rf->rf_interval.tv_sec == 0) && (rf->rf_interval.tv_nsec == 0))
+               if (rf->rf_interval == 0)
                {
                        /* this should not happen, because the interval is set
                         * for each plugin when loading it
                         * XXX: issue a warning? */
-                       now = cdtime ();
-
-                       CDTIME_T_TO_TIMESPEC (plugin_get_interval (), &rf->rf_interval);
-
+                       rf->rf_interval = plugin_get_interval ();
                        rf->rf_effective_interval = rf->rf_interval;
 
-                       CDTIME_T_TO_TIMESPEC (now, &rf->rf_next_read);
+                       rf->rf_next_read = cdtime ();
                }
 
                /* sleep until this entry is due,
@@ -415,11 +413,15 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
                 * pthread_cond_timedwait returns. */
                rc = 0;
                while ((read_loop != 0)
-                               && !timeout_reached(rf->rf_next_read)
+                               && (cdtime () < rf->rf_next_read)
                                && rc == 0)
                {
+                       struct timespec ts = { 0 };
+
+                       CDTIME_T_TO_TIMESPEC (rf->rf_next_read, &ts);
+
                        rc = pthread_cond_timedwait (&read_cond, &read_lock,
-                               &rf->rf_next_read);
+                               &ts);
                }
 
                /* Must hold `read_lock' when accessing `rf->rf_type'. */
@@ -443,6 +445,7 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
                {
                        DEBUG ("plugin_read_thread: Destroying the `%s' "
                                        "callback.", rf->rf_name);
+                       sfree (rf->rf_name);
                        destroy_callback ((callback_func_t *) rf);
                        rf = NULL;
                        continue;
@@ -475,20 +478,14 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
                 * intervals in which it will be called. */
                if (status != 0)
                {
-                       rf->rf_effective_interval.tv_sec *= 2;
-                       rf->rf_effective_interval.tv_nsec *= 2;
-                       NORMALIZE_TIMESPEC (rf->rf_effective_interval);
-
-                       if (rf->rf_effective_interval.tv_sec >= 86400)
-                       {
-                               rf->rf_effective_interval.tv_sec = 86400;
-                               rf->rf_effective_interval.tv_nsec = 0;
-                       }
+                       rf->rf_effective_interval *= 2;
+                       if (rf->rf_effective_interval > TIME_T_TO_CDTIME_T (86400))
+                               rf->rf_effective_interval = TIME_T_TO_CDTIME_T (86400);
 
                        NOTICE ("read-function of plugin `%s' failed. "
-                                       "Will suspend it for %i seconds.",
+                                       "Will suspend it for %.3f seconds.",
                                        rf->rf_name,
-                                       (int) rf->rf_effective_interval.tv_sec);
+                                       CDTIME_T_TO_DOUBLE (rf->rf_effective_interval));
                }
                else
                {
@@ -500,32 +497,26 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
                now = cdtime ();
 
                DEBUG ("plugin_read_thread: Effective interval of the "
-                               "%s plugin is %i.%09i.",
+                               "%s plugin is %.3f seconds.",
                                rf->rf_name,
-                               (int) rf->rf_effective_interval.tv_sec,
-                               (int) rf->rf_effective_interval.tv_nsec);
+                               CDTIME_T_TO_DOUBLE (rf->rf_effective_interval));
 
                /* Calculate the next (absolute) time at which this function
                 * should be called. */
-               rf->rf_next_read.tv_sec = rf->rf_next_read.tv_sec
-                       + rf->rf_effective_interval.tv_sec;
-               rf->rf_next_read.tv_nsec = rf->rf_next_read.tv_nsec
-                       + rf->rf_effective_interval.tv_nsec;
-               NORMALIZE_TIMESPEC (rf->rf_next_read);
+               rf->rf_next_read += rf->rf_effective_interval;
 
                /* Check, if `rf_next_read' is in the past. */
-               if (TIMESPEC_TO_CDTIME_T (&rf->rf_next_read) < now)
+               if (rf->rf_next_read < now)
                {
                        /* `rf_next_read' is in the past. Insert `now'
                         * so this value doesn't trail off into the
                         * past too much. */
-                       CDTIME_T_TO_TIMESPEC (now, &rf->rf_next_read);
+                       rf->rf_next_read = now;
                }
 
-               DEBUG ("plugin_read_thread: Next read of the %s plugin at %i.%09i.",
+               DEBUG ("plugin_read_thread: Next read of the %s plugin at %.3f.",
                                rf->rf_name,
-                               (int) rf->rf_next_read.tv_sec,
-                               (int) rf->rf_next_read.tv_nsec);
+                               CDTIME_T_TO_DOUBLE (rf->rf_next_read));
 
                /* Re-insert this read function into the heap again. */
                c_heap_insert (read_heap, rf);
@@ -685,11 +676,13 @@ static int plugin_write_enqueue (value_list_t const *vl) /* {{{ */
        {
                write_queue_head = q;
                write_queue_tail = q;
+               write_queue_size = 1;
        }
        else
        {
                write_queue_tail->next = q;
                write_queue_tail = q;
+               write_queue_size += 1;
        }
 
        pthread_cond_signal (&write_cond);
@@ -716,8 +709,11 @@ static value_list_t *plugin_write_dequeue (void) /* {{{ */
 
        q = write_queue_head;
        write_queue_head = q->next;
-       if (write_queue_head == NULL)
+       write_queue_size -= 1;
+       if (write_queue_head == NULL) {
                write_queue_tail = NULL;
+               write_queue_size = 0; /* Maybe instead of setting write_queue_size to 0, we should assert(write_queue_size == 0) ? */
+               }
 
        pthread_mutex_unlock (&write_lock);
 
@@ -820,6 +816,7 @@ static void stop_write_threads (void) /* {{{ */
        }
        write_queue_head = NULL;
        write_queue_tail = NULL;
+       write_queue_size = 0;
        pthread_mutex_unlock (&write_lock);
 
        if (i > 0)
@@ -848,8 +845,52 @@ void plugin_set_dir (const char *dir)
        }
 }
 
+static _Bool plugin_is_loaded (char const *name)
+{
+       int status;
+
+       if (plugins_loaded == NULL)
+               plugins_loaded = c_avl_create ((void *) strcasecmp);
+       assert (plugins_loaded != NULL);
+
+       status = c_avl_get (plugins_loaded, name, /* ret_value = */ NULL);
+       return (status == 0);
+}
+
+static int plugin_mark_loaded (char const *name)
+{
+       char *name_copy;
+       int status;
+
+       name_copy = strdup (name);
+       if (name_copy == NULL)
+               return (ENOMEM);
+
+       status = c_avl_insert (plugins_loaded,
+                       /* key = */ name_copy, /* value = */ NULL);
+       return (status);
+}
+
+static void plugin_free_loaded ()
+{
+       void *key;
+       void *value;
+
+       if (plugins_loaded == NULL)
+               return;
+
+       while (c_avl_pick (plugins_loaded, &key, &value) == 0)
+       {
+               sfree (key);
+               assert (value == NULL);
+       }
+
+       c_avl_destroy (plugins_loaded);
+       plugins_loaded = NULL;
+}
+
 #define BUFSIZE 512
-int plugin_load (const char *type, uint32_t flags)
+int plugin_load (char const *plugin_name, uint32_t flags)
 {
        DIR  *dh;
        const char *dir;
@@ -861,15 +902,38 @@ int plugin_load (const char *type, uint32_t flags)
        struct dirent *de;
        int status;
 
+       if (plugin_name == NULL)
+               return (EINVAL);
+
+       /* Check if plugin is already loaded and don't do anything in this
+        * case. */
+       if (plugin_is_loaded (plugin_name))
+               return (0);
+
        dir = plugin_get_dir ();
        ret = 1;
 
+       /*
+        * XXX: Magic at work:
+        *
+        * Some of the language bindings, for example the Python and Perl
+        * plugins, need to be able to export symbols to the scripts they run.
+        * For this to happen, the "Globals" flag needs to be set.
+        * Unfortunately, this technical detail is hard to explain to the
+        * average user and she shouldn't have to worry about this, ideally.
+        * So in order to save everyone's sanity use a different default for a
+        * handful of special plugins. --octo
+        */
+       if ((strcasecmp ("perl", plugin_name) == 0)
+                       || (strcasecmp ("python", plugin_name) == 0))
+               flags |= PLUGIN_FLAGS_GLOBAL;
+
        /* `cpu' should not match `cpufreq'. To solve this we add `.so' to the
         * type when matching the filename */
-       status = ssnprintf (typename, sizeof (typename), "%s.so", type);
+       status = ssnprintf (typename, sizeof (typename), "%s.so", plugin_name);
        if ((status < 0) || ((size_t) status >= sizeof (typename)))
        {
-               WARNING ("plugin_load: Filename too long: \"%s.so\"", type);
+               WARNING ("plugin_load: Filename too long: \"%s.so\"", plugin_name);
                return (-1);
        }
        typename_len = strlen (typename);
@@ -916,13 +980,14 @@ int plugin_load (const char *type, uint32_t flags)
                if (status == 0)
                {
                        /* success */
+                       plugin_mark_loaded (plugin_name);
                        ret = 0;
                        break;
                }
                else
                {
                        ERROR ("plugin_load: Load plugin \"%s\" failed with "
-                                       "status %i.", type, status);
+                                       "status %i.", plugin_name, status);
                }
        }
 
@@ -930,7 +995,7 @@ int plugin_load (const char *type, uint32_t flags)
 
        if (filename[0] == 0)
                ERROR ("plugin_load: Could not find plugin \"%s\" in %s",
-                               type, dir);
+                               plugin_name, dir);
 
        return (ret);
 }
@@ -967,13 +1032,9 @@ static int plugin_compare_read_func (const void *arg0, const void *arg1)
        rf0 = arg0;
        rf1 = arg1;
 
-       if (rf0->rf_next_read.tv_sec < rf1->rf_next_read.tv_sec)
-               return (-1);
-       else if (rf0->rf_next_read.tv_sec > rf1->rf_next_read.tv_sec)
-               return (1);
-       else if (rf0->rf_next_read.tv_nsec < rf1->rf_next_read.tv_nsec)
+       if (rf0->rf_next_read < rf1->rf_next_read)
                return (-1);
-       else if (rf0->rf_next_read.tv_nsec > rf1->rf_next_read.tv_nsec)
+       else if (rf0->rf_next_read > rf1->rf_next_read)
                return (1);
        else
                return (0);
@@ -987,6 +1048,9 @@ static int plugin_insert_read (read_func_t *rf)
        int status;
        llentry_t *le;
 
+       rf->rf_next_read = cdtime ();
+       rf->rf_effective_interval = rf->rf_interval;
+
        pthread_mutex_lock (&read_lock);
 
        if (read_list == NULL)
@@ -1048,43 +1112,12 @@ static int plugin_insert_read (read_func_t *rf)
        return (0);
 } /* int plugin_insert_read */
 
-static int read_cb_wrapper (user_data_t *ud)
-{
-       int (*callback) (void);
-
-       if (ud == NULL)
-               return -1;
-
-       callback = ud->data;
-       return callback();
-} /* int read_cb_wrapper */
-
 int plugin_register_read (const char *name,
                int (*callback) (void))
 {
        read_func_t *rf;
-       plugin_ctx_t ctx = plugin_get_ctx ();
        int status;
 
-       if (ctx.interval != 0) {
-               /* If ctx.interval is not zero (== use the plugin or global
-                * interval), we need to use the "complex" read callback,
-                * because only that allows to specify a different interval.
-                * Wrap the callback using read_cb_wrapper(). */
-               struct timespec interval;
-               user_data_t user_data;
-
-               user_data.data = callback;
-               user_data.free_func = NULL;
-
-               CDTIME_T_TO_TIMESPEC (ctx.interval, &interval);
-               return plugin_register_complex_read (/* group = */ NULL,
-                               name, read_cb_wrapper, &interval, &user_data);
-       }
-
-       DEBUG ("plugin_register_read: default_interval = %.3f",
-                       CDTIME_T_TO_DOUBLE(plugin_get_interval ()));
-
        rf = malloc (sizeof (*rf));
        if (rf == NULL)
        {
@@ -1096,13 +1129,11 @@ int plugin_register_read (const char *name,
        rf->rf_callback = (void *) callback;
        rf->rf_udata.data = NULL;
        rf->rf_udata.free_func = NULL;
-       rf->rf_ctx = ctx;
+       rf->rf_ctx = plugin_get_ctx ();
        rf->rf_group[0] = '\0';
-       sstrncpy (rf->rf_name, name, sizeof (rf->rf_name));
+       rf->rf_name = strdup (name);
        rf->rf_type = RF_SIMPLE;
-       rf->rf_interval.tv_sec = 0;
-       rf->rf_interval.tv_nsec = 0;
-       rf->rf_effective_interval = rf->rf_interval;
+       rf->rf_interval = plugin_get_interval ();
 
        status = plugin_insert_read (rf);
        if (status != 0)
@@ -1117,7 +1148,6 @@ int plugin_register_complex_read (const char *group, const char *name,
                user_data_t *user_data)
 {
        read_func_t *rf;
-       plugin_ctx_t ctx = plugin_get_ctx ();
        int status;
 
        rf = malloc (sizeof (*rf));
@@ -1133,21 +1163,12 @@ int plugin_register_complex_read (const char *group, const char *name,
                sstrncpy (rf->rf_group, group, sizeof (rf->rf_group));
        else
                rf->rf_group[0] = '\0';
-       sstrncpy (rf->rf_name, name, sizeof (rf->rf_name));
+       rf->rf_name = strdup (name);
        rf->rf_type = RF_COMPLEX;
        if (interval != NULL)
-       {
-               rf->rf_interval = *interval;
-       }
-       else if (ctx.interval != 0)
-       {
-               CDTIME_T_TO_TIMESPEC (ctx.interval, &rf->rf_interval);
-       }
-       rf->rf_effective_interval = rf->rf_interval;
-
-       DEBUG ("plugin_register_read: interval = %i.%09i",
-                       (int) rf->rf_interval.tv_sec,
-                       (int) rf->rf_interval.tv_nsec);
+               rf->rf_interval = TIMESPEC_TO_CDTIME_T (interval);
+       else
+               rf->rf_interval = plugin_get_interval ();
 
        /* Set user data */
        if (user_data == NULL)
@@ -1160,7 +1181,7 @@ int plugin_register_complex_read (const char *group, const char *name,
                rf->rf_udata = *user_data;
        }
 
-       rf->rf_ctx = ctx;
+       rf->rf_ctx = plugin_get_ctx ();
 
        status = plugin_insert_read (rf);
        if (status != 0)
@@ -1427,6 +1448,61 @@ void plugin_init_all (void)
        chain_name = global_option_get ("PostCacheChain");
        post_cache_chain = fc_chain_get_by_name (chain_name);
 
+       {
+               const char *str_writequeuelengthlimithigh = global_option_get ("WriteQueueLengthLimitHigh");
+               const char *str_writequeuelengthlimitlow = global_option_get ("WriteQueueLengthLimitLow");
+
+               writequeuelengthlimit_high = 0;
+               writequeuelengthlimit_low = 0;
+
+               if(NULL != str_writequeuelengthlimithigh) {
+                       errno = 0;
+                       /* get high limit */
+                       writequeuelengthlimit_high = strtol(str_writequeuelengthlimithigh, NULL, 10);
+                       if ((errno == ERANGE && (writequeuelengthlimit_high == LONG_MAX || writequeuelengthlimit_high == LONG_MIN))
+                                       || (errno != 0 && writequeuelengthlimit_high == 0)
+                          ) {
+                               writequeuelengthlimit_high = 0;
+                               ERROR("Config 'WriteQueueLengthLimitHigh' accepts one integer value only. Running with no limit !");
+                       }
+                       if(writequeuelengthlimit_high < 0) {
+                               ERROR("Config 'WriteQueueLengthLimitHigh' accepts positive values only. Running with no limit !");
+                               writequeuelengthlimit_high = 0;
+                       }
+               }
+
+               if((writequeuelengthlimit_high > 0) && (NULL != str_writequeuelengthlimitlow)) {
+                       errno = 0;
+                       /* get low limit */
+                       writequeuelengthlimit_low = strtol(str_writequeuelengthlimitlow, NULL, 10);
+                       if ((errno == ERANGE && (writequeuelengthlimit_low == LONG_MAX || writequeuelengthlimit_low == LONG_MIN))
+                                       || (errno != 0 && writequeuelengthlimit_low == 0)
+                          ) {
+                               writequeuelengthlimit_low = 0;
+                               ERROR("Config 'WriteQueueLengthLimitLow' accepts one integer value only. Using default low limit instead");
+                       }
+
+                       if(writequeuelengthlimit_low < 0) {
+                               ERROR("Config 'WriteQueueLengthLimitLow' accepts positive values only. Using default low limit instead");
+                               writequeuelengthlimit_low = 0;
+                       } else if(writequeuelengthlimit_low > writequeuelengthlimit_high) {
+                               ERROR("Config 'WriteQueueLengthLimitLow' (%ld) cannot be bigger than high limit (%ld). Using default low limit instead",
+                                               writequeuelengthlimit_low, writequeuelengthlimit_high);
+                               writequeuelengthlimit_low = 0;
+                       }
+               }
+               /* Check/compute low limit if not/badly defined */
+               if(writequeuelengthlimit_high > 0) {
+                       if(0 == writequeuelengthlimit_low) {
+                               writequeuelengthlimit_low = .5 * writequeuelengthlimit_high;
+                       }
+                       INFO("Config 'WriteQueueLengthLimit*' : Running with limits high=%ld low=%ld", writequeuelengthlimit_high, writequeuelengthlimit_low);
+                       random_seed = time(0);
+               } else {
+                       writequeuelengthlimit_low = 0; /* This should be useless, but in case... */
+               }
+       }
+
        {
                char const *tmp = global_option_get ("WriteThreads");
                int num = atoi (tmp);
@@ -1719,6 +1795,8 @@ void plugin_shutdown_all (void)
        destroy_all_callbacks (&list_notification);
        destroy_all_callbacks (&list_shutdown);
        destroy_all_callbacks (&list_log);
+
+       plugin_free_loaded ();
 } /* void plugin_shutdown_all */
 
 int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */
@@ -1942,15 +2020,60 @@ static int plugin_dispatch_values_internal (value_list_t *vl)
 int plugin_dispatch_values (value_list_t const *vl)
 {
        int status;
+       int wq_size = write_queue_size;
+       /* We store write_queue_size in a local variable because other threads may update write_queue_size.
+        * Having this in a local variable (like a cache) is better : we do not need a lock */
+       short metric_will_be_dropped = 0;
+
+       if((writequeuelengthlimit_high > 0) && (wq_size > writequeuelengthlimit_low)) {
+               if(wq_size >= writequeuelengthlimit_high) {
+                       /* if high == low, we come here too */
+                       metric_will_be_dropped = 1;
+               } else {
+                       /* here, high != low */
+                       long probability_to_drop;
+                       long n;
+
+                       probability_to_drop = (wq_size - writequeuelengthlimit_low);
+
+                       /* We use rand_r with its bad RNG because it's enough for playing dices.
+                        * There is no security consideration here so rand_r() should be enough here.
+                        */
+                       n = rand_r(&random_seed) % (writequeuelengthlimit_high - writequeuelengthlimit_low) ;
+
+                       /* Let's have X = high - low.
+                        *   n is in range [0..X]
+                        *   probability_to_drop is in range [1..X[
+                        *   probability_to_drop gets bigger when wq_size gets bigger.
+                        */
+                       if(n <= probability_to_drop) {
+                               metric_will_be_dropped = 1;
+                       }
+               }
+       }
 
-       status = plugin_write_enqueue (vl);
-       if (status != 0)
+       if( ! metric_will_be_dropped) {
+               status = plugin_write_enqueue (vl);
+               if (status != 0)
+               {
+                       char errbuf[1024];
+                       ERROR ("plugin_dispatch_values: plugin_write_enqueue failed "
+                                       "with status %i (%s).", status,
+                                       sstrerror (status, errbuf, sizeof (errbuf)));
+                       return (status);
+               }
+       }
+       else
        {
-               char errbuf[1024];
-               ERROR ("plugin_dispatch_values: plugin_write_enqueue failed "
-                               "with status %i (%s).", status,
-                               sstrerror (status, errbuf, sizeof (errbuf)));
-               return (status);
+               /* If you want to count dropped metrics, don't forget to add a lock here */
+               /* dropped_metrics++; */
+               ERROR ("plugin_dispatch_values: value dropped (write queue %ld > %ld) : time = %.3f; interval = %.3f ; %s/%s%s%s/%s%s%s",
+                       write_queue_size, writequeuelengthlimit_low,
+                       CDTIME_T_TO_DOUBLE (vl->time),
+                       CDTIME_T_TO_DOUBLE (vl->interval),
+                       vl->host,
+                       vl->plugin, vl->plugin_instance[0]?"-":"", vl->plugin_instance,
+                       vl->type, vl->type_instance[0]?"-":"", vl->type_instance);
        }
 
        return (0);
@@ -2078,6 +2201,12 @@ const data_set_t *plugin_get_ds (const char *name)
 {
        data_set_t *ds;
 
+       if (data_sets == NULL)
+       {
+               ERROR ("plugin_get_ds: No data sets are defined yet.");
+               return (NULL);
+       }
+
        if (c_avl_get (data_sets, name, (void *) &ds) != 0)
        {
                DEBUG ("No such dataset registered: %s", name);