Code

plugin: Introduced basic support for per-plugin intervals.
authorSebastian Harl <sh@tokkee.org>
Thu, 2 Feb 2012 06:44:48 +0000 (07:44 +0100)
committerSebastian Harl <sh@tokkee.org>
Fri, 3 Feb 2012 14:04:30 +0000 (15:04 +0100)
This is based on a newly introduced "plugin context", which stores plugin-
related settings (currently the plugin interval) for each registered callback.
The context is initialized when loading the plugin (LoadPlugin), setting the
interval to the value of the newly introduced "Interval" option of the
LoadPlugin block, if specified.

The context is set (using thread-specific data) and restored before and after
calling any callback.

This way, single plugins don't need to take special care in order to benefit
from the new feature. plugin.c will handle (most) situations automatically.

VALUE_LIST_INIT() has been changed to honor the plugin interval settings (if
any). As a helper, the new 'plugin_interval' macro may be used to access the
current plugin's interval (read-only).

Thanks to Florian for the initial idea!

src/collectd.c
src/configfile.c
src/configfile.h
src/plugin.c
src/plugin.h

index d33d1d6685ff4755d9ab460a526313b468d6f71d..c3dc7381baee532a6ef153050f6fba016337240c 100644 (file)
@@ -468,6 +468,8 @@ int main (int argc, char **argv)
        if (optind < argc)
                exit_usage (1);
 
+       plugin_init_ctx ();
+
        /*
         * Read options from the config file, the environment and the command
         * line (in that order, with later options overwriting previous ones in
index 7c8347b15b20a1aa8d73011a1255105f5937035e..2a03035c6659ce4c85edc9e979b3f99ff440b206 100644 (file)
@@ -46,6 +46,7 @@ typedef struct cf_callback
        int  (*callback) (const char *, const char *);
        const char **keys;
        int    keys_num;
+       plugin_ctx_t ctx;
        struct cf_callback *next;
 } cf_callback_t;
 
@@ -53,6 +54,7 @@ typedef struct cf_complex_callback_s
 {
        char *type;
        int (*callback) (oconfig_item_t *);
+       plugin_ctx_t ctx;
        struct cf_complex_callback_s *next;
 } cf_complex_callback_t;
 
@@ -128,6 +130,7 @@ static int cf_dispatch (const char *type, const char *orig_key,
                const char *orig_value)
 {
        cf_callback_t *cf_cb;
+       plugin_ctx_t old_ctx;
        char *key;
        char *value;
        int ret;
@@ -156,6 +159,8 @@ static int cf_dispatch (const char *type, const char *orig_key,
 
        ret = -1;
 
+       old_ctx = plugin_set_ctx (cf_cb->ctx);
+
        for (i = 0; i < cf_cb->keys_num; i++)
        {
                if ((cf_cb->keys[i] != NULL)
@@ -166,6 +171,8 @@ static int cf_dispatch (const char *type, const char *orig_key,
                }
        }
 
+       plugin_set_ctx (old_ctx);
+
        if (i >= cf_cb->keys_num)
                WARNING ("Plugin `%s' did not register for value `%s'.", type, key);
 
@@ -244,6 +251,7 @@ static int dispatch_loadplugin (const oconfig_item_t *ci)
        int i;
        const char *name;
        unsigned int flags = 0;
+       plugin_ctx_t ctx;
        assert (strcasecmp (ci->key, "LoadPlugin") == 0);
 
        if (ci->values_num != 1)
@@ -252,6 +260,7 @@ static int dispatch_loadplugin (const oconfig_item_t *ci)
                return (-1);
 
        name = ci->values[0].value.string;
+       ctx.interval = 0;
 
        /*
         * XXX: Magic at work:
@@ -271,6 +280,16 @@ static int dispatch_loadplugin (const oconfig_item_t *ci)
        for (i = 0; i < ci->children_num; ++i) {
                if (strcasecmp("Globals", ci->children[i].key) == 0)
                        cf_util_get_flag (ci->children + i, &flags, PLUGIN_FLAGS_GLOBAL);
+               else if (strcasecmp ("Interval", ci->children[i].key) == 0) {
+                       double interval = 0.0;
+
+                       if (cf_util_get_double (ci->children + i, &interval) != 0) {
+                               /* cf_util_get_double will log an error */
+                               continue;
+                       }
+
+                       ctx.interval = DOUBLE_TO_CDTIME_T (interval);
+               }
                else {
                        WARNING("Ignoring unknown LoadPlugin option \"%s\" "
                                        "for plugin \"%s\"",
@@ -278,6 +297,7 @@ static int dispatch_loadplugin (const oconfig_item_t *ci)
                }
        }
 
+       plugin_set_ctx (ctx);
        return (plugin_load (name, (uint32_t) flags));
 } /* int dispatch_value_loadplugin */
 
@@ -357,8 +377,18 @@ static int dispatch_block_plugin (oconfig_item_t *ci)
 
        /* Check for a complex callback first */
        for (cb = complex_callback_head; cb != NULL; cb = cb->next)
+       {
                if (strcasecmp (name, cb->type) == 0)
-                       return (cb->callback (ci));
+               {
+                       plugin_ctx_t old_ctx;
+                       int ret_val;
+
+                       old_ctx = plugin_set_ctx (cb->ctx);
+                       ret_val = (cb->callback (ci));
+                       plugin_set_ctx (old_ctx);
+                       return (ret_val);
+               }
+       }
 
        /* Hm, no complex plugin found. Dispatch the values one by one */
        for (i = 0; i < ci->children_num; i++)
@@ -884,6 +914,7 @@ void cf_register (const char *type,
        cf_cb->callback = callback;
        cf_cb->keys     = keys;
        cf_cb->keys_num = keys_num;
+       cf_cb->ctx      = plugin_get_ctx ();
 
        cf_cb->next = first_callback;
        first_callback = cf_cb;
@@ -907,6 +938,8 @@ int cf_register_complex (const char *type, int (*callback) (oconfig_item_t *))
        new->callback = callback;
        new->next = NULL;
 
+       new->ctx = plugin_get_ctx ();
+
        if (complex_callback_head == NULL)
        {
                complex_callback_head = new;
@@ -1015,6 +1048,23 @@ int cf_util_get_int (const oconfig_item_t *ci, int *ret_value) /* {{{ */
        return (0);
 } /* }}} int cf_util_get_int */
 
+int cf_util_get_double (const oconfig_item_t *ci, double *ret_value) /* {{{ */
+{
+       if ((ci == NULL) || (ret_value == NULL))
+               return (EINVAL);
+
+       if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_NUMBER))
+       {
+               ERROR ("cf_util_get_double: The %s option requires "
+                               "exactly one numeric argument.", ci->key);
+               return (-1);
+       }
+
+       *ret_value = ci->values[0].value.number;
+
+       return (0);
+} /* }}} int cf_util_get_double */
+
 int cf_util_get_boolean (const oconfig_item_t *ci, _Bool *ret_bool) /* {{{ */
 {
        if ((ci == NULL) || (ret_bool == NULL))
index e63a0ea047533257bde40be867bc0394060a884f..59ea55428e43e9752a91c3c9e82675fc0d7ba91b 100644 (file)
@@ -100,6 +100,9 @@ int cf_util_get_string_buffer (const oconfig_item_t *ci, char *buffer,
 /* Assures the config option is a number and returns it as an int. */
 int cf_util_get_int (const oconfig_item_t *ci, int *ret_value);
 
+/* Assures the config option is a number and returns it as a double. */
+int cf_util_get_double (const oconfig_item_t *ci, double *ret_value);
+
 /* Assures the config option is a boolean and assignes it to `ret_bool'.
  * Otherwise, `ret_bool' is not changed and non-zero is returned. */
 int cf_util_get_boolean (const oconfig_item_t *ci, _Bool *ret_bool);
index 91c40b6bad115203cb021f95bd919583310c9460..7d66bd87670953cdfacbacd75ecfaeaa3cd4cc41 100644 (file)
@@ -45,6 +45,7 @@ struct callback_func_s
 {
        void *cf_callback;
        user_data_t cf_udata;
+       plugin_ctx_t cf_ctx;
 };
 typedef struct callback_func_s callback_func_t;
 
@@ -57,6 +58,7 @@ struct read_func_s
         * The `rf_super' member MUST be the first one in this structure! */
 #define rf_callback rf_super.cf_callback
 #define rf_udata rf_super.cf_udata
+#define rf_ctx rf_super.cf_ctx
        callback_func_t rf_super;
        char rf_group[DATA_MAX_NAME_LEN];
        char rf_name[DATA_MAX_NAME_LEN];
@@ -93,6 +95,8 @@ static pthread_cond_t  read_cond = PTHREAD_COND_INITIALIZER;
 static pthread_t      *read_threads = NULL;
 static int             read_threads_num = 0;
 
+static pthread_key_t   plugin_ctx_key;
+
 /*
  * Static functions
  */
@@ -246,6 +250,8 @@ static int create_register_callback (llist_t **list, /* {{{ */
                cf->cf_udata = *ud;
        }
 
+       cf->cf_ctx = plugin_get_ctx ();
+
        return (register_callback (list, name, cf));
 } /* }}} int create_register_callback */
 
@@ -346,6 +352,7 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
        while (read_loop != 0)
        {
                read_func_t *rf;
+               plugin_ctx_t old_ctx;
                cdtime_t now;
                int status;
                int rf_type;
@@ -423,6 +430,8 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
 
                DEBUG ("plugin_read_thread: Handling `%s'.", rf->rf_name);
 
+               old_ctx = plugin_set_ctx (rf->rf_ctx);
+
                if (rf_type == RF_SIMPLE)
                {
                        int (*callback) (void);
@@ -440,6 +449,8 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
                        status = (*callback) (&rf->rf_udata);
                }
 
+               plugin_set_ctx (old_ctx);
+
                /* If the function signals failure, we will increase the
                 * intervals in which it will be called. */
                if (status != 0)
@@ -773,12 +784,39 @@ static int plugin_insert_read (read_func_t *rf)
        return (0);
 } /* int plugin_insert_read */
 
+static int read_cb_wrapper (user_data_t *ud)
+{
+       int (*callback) (void);
+
+       if (ud == NULL)
+               return -1;
+
+       callback = ud->data;
+       return callback();
+} /* int read_cb_wrapper */
+
 int plugin_register_read (const char *name,
                int (*callback) (void))
 {
        read_func_t *rf;
+       plugin_ctx_t ctx = plugin_get_ctx ();
        int status;
 
+       if (ctx.interval != 0) {
+               struct timespec interval;
+               user_data_t user_data;
+
+               DEBUG ("plugin_register_read: plugin_interval = %.3f",
+                               CDTIME_T_TO_DOUBLE(plugin_interval));
+
+               user_data.data = callback;
+               user_data.free_func = NULL;
+
+               CDTIME_T_TO_TIMESPEC (ctx.interval, &interval);
+               return plugin_register_complex_read (/* group = */ NULL,
+                               name, read_cb_wrapper, &interval, &user_data);
+       }
+
        rf = malloc (sizeof (*rf));
        if (rf == NULL)
        {
@@ -790,6 +828,7 @@ int plugin_register_read (const char *name,
        rf->rf_callback = (void *) callback;
        rf->rf_udata.data = NULL;
        rf->rf_udata.free_func = NULL;
+       rf->rf_ctx = ctx;
        rf->rf_group[0] = '\0';
        sstrncpy (rf->rf_name, name, sizeof (rf->rf_name));
        rf->rf_type = RF_SIMPLE;
@@ -810,6 +849,7 @@ int plugin_register_complex_read (const char *group, const char *name,
                user_data_t *user_data)
 {
        read_func_t *rf;
+       plugin_ctx_t ctx = plugin_get_ctx ();
        int status;
 
        rf = malloc (sizeof (*rf));
@@ -831,6 +871,10 @@ int plugin_register_complex_read (const char *group, const char *name,
        {
                rf->rf_interval = *interval;
        }
+       else if (ctx.interval != 0)
+       {
+               CDTIME_T_TO_TIMESPEC (ctx.interval, &rf->rf_interval);
+       }
        rf->rf_effective_interval = rf->rf_interval;
 
        /* Set user data */
@@ -844,6 +888,8 @@ int plugin_register_complex_read (const char *group, const char *name,
                rf->rf_udata = *user_data;
        }
 
+       rf->rf_ctx = ctx;
+
        status = plugin_insert_read (rf);
        if (status != 0)
                sfree (rf);
@@ -1121,10 +1167,13 @@ void plugin_init_all (void)
        {
                callback_func_t *cf;
                plugin_init_cb callback;
+               plugin_ctx_t old_ctx;
 
                cf = le->value;
+               old_ctx = plugin_set_ctx (cf->cf_ctx);
                callback = cf->cf_callback;
                status = (*callback) ();
+               plugin_set_ctx (old_ctx);
 
                if (status != 0)
                {
@@ -1177,11 +1226,14 @@ int plugin_read_all_once (void)
        while (42)
        {
                read_func_t *rf;
+               plugin_ctx_t old_ctx;
 
                rf = c_heap_get_root (read_heap);
                if (rf == NULL)
                        break;
 
+               old_ctx = plugin_set_ctx (rf->rf_ctx);
+
                if (rf->rf_type == RF_SIMPLE)
                {
                        int (*callback) (void);
@@ -1197,6 +1249,8 @@ int plugin_read_all_once (void)
                        status = (*callback) (&rf->rf_udata);
                }
 
+               plugin_set_ctx (old_ctx);
+
                if (status != 0)
                {
                        NOTICE ("read-function of plugin `%s' failed.",
@@ -1242,6 +1296,7 @@ int plugin_write (const char *plugin, /* {{{ */
     {
       callback_func_t *cf = le->value;
       plugin_write_cb callback;
+      plugin_ctx_t old_ctx = plugin_set_ctx (cf->cf_ctx);
 
       DEBUG ("plugin: plugin_write: Writing values via %s.", le->key);
       callback = cf->cf_callback;
@@ -1251,6 +1306,8 @@ int plugin_write (const char *plugin, /* {{{ */
       else
         success++;
 
+      plugin_set_ctx (old_ctx);
+
       le = le->next;
     }
 
@@ -1263,6 +1320,7 @@ int plugin_write (const char *plugin, /* {{{ */
   {
     callback_func_t *cf;
     plugin_write_cb callback;
+    plugin_ctx_t old_ctx;
 
     le = llist_head (list_write);
     while (le != NULL)
@@ -1278,9 +1336,13 @@ int plugin_write (const char *plugin, /* {{{ */
 
     cf = le->value;
 
+    old_ctx = plugin_set_ctx (cf->cf_ctx);
+
     DEBUG ("plugin: plugin_write: Writing values via %s.", le->key);
     callback = cf->cf_callback;
     status = (*callback) (ds, vl, &cf->cf_udata);
+
+    plugin_set_ctx (old_ctx);
   }
 
   return (status);
@@ -1298,6 +1360,7 @@ int plugin_flush (const char *plugin, cdtime_t timeout, const char *identifier)
   {
     callback_func_t *cf;
     plugin_flush_cb callback;
+    plugin_ctx_t old_ctx;
 
     if ((plugin != NULL)
         && (strcmp (plugin, le->key) != 0))
@@ -1307,10 +1370,13 @@ int plugin_flush (const char *plugin, cdtime_t timeout, const char *identifier)
     }
 
     cf = le->value;
+    old_ctx = plugin_set_ctx (cf->cf_ctx);
     callback = cf->cf_callback;
 
     (*callback) (timeout, identifier, &cf->cf_udata);
 
+    plugin_set_ctx (old_ctx);
+
     le = le->next;
   }
   return (0);
@@ -1343,8 +1409,10 @@ void plugin_shutdown_all (void)
        {
                callback_func_t *cf;
                plugin_shutdown_cb callback;
+               plugin_ctx_t old_ctx;
 
                cf = le->value;
+               old_ctx = plugin_set_ctx (cf->cf_ctx);
                callback = cf->cf_callback;
 
                /* Advance the pointer before calling the callback allows
@@ -1354,6 +1422,8 @@ void plugin_shutdown_all (void)
                le = le->next;
 
                (*callback) ();
+
+               plugin_set_ctx (old_ctx);
        }
 
        /* Write plugins which use the `user_data' pointer usually need the
@@ -1382,12 +1452,15 @@ int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */
   {
     callback_func_t *cf;
     plugin_missing_cb callback;
+    plugin_ctx_t old_ctx;
     int status;
 
     cf = le->value;
+    old_ctx = plugin_set_ctx (cf->cf_ctx);
     callback = cf->cf_callback;
 
     status = (*callback) (vl, &cf->cf_udata);
+    plugin_set_ctx (old_ctx);
     if (status != 0)
     {
       if (status < 0)
@@ -1462,8 +1535,14 @@ int plugin_dispatch_values (value_list_t *vl)
        if (vl->time == 0)
                vl->time = cdtime ();
 
-       if (vl->interval <= 0)
-               vl->interval = interval_g;
+       if (vl->interval <= 0) {
+               plugin_ctx_t ctx = plugin_get_ctx ();
+
+               if (ctx.interval != 0)
+                       vl->interval = ctx.interval;
+               else
+                       vl->interval = interval_g;
+       }
 
        DEBUG ("plugin_dispatch_values: time = %.3f; interval = %.3f; "
                        "host = %s; "
@@ -1651,11 +1730,14 @@ int plugin_dispatch_notification (const notification_t *notif)
        {
                callback_func_t *cf;
                plugin_notification_cb callback;
+               plugin_ctx_t old_ctx;
                int status;
 
                cf = le->value;
+               old_ctx = plugin_set_ctx (cf->cf_ctx);
                callback = cf->cf_callback;
                status = (*callback) (notif, &cf->cf_udata);
+               plugin_set_ctx (old_ctx);
                if (status != 0)
                {
                        WARNING ("plugin_dispatch_notification: Notification "
@@ -1696,12 +1778,15 @@ void plugin_log (int level, const char *format, ...)
        {
                callback_func_t *cf;
                plugin_log_cb callback;
+               plugin_ctx_t old_ctx;
 
                cf = le->value;
+               old_ctx = plugin_set_ctx (cf->cf_ctx);
                callback = cf->cf_callback;
 
                (*callback) (level, msg, &cf->cf_udata);
 
+               plugin_set_ctx (old_ctx);
                le = le->next;
        }
 } /* void plugin_log */
@@ -1894,4 +1979,70 @@ int plugin_notification_meta_free (notification_meta_t *n)
   return (0);
 } /* int plugin_notification_meta_free */
 
+static void plugin_ctx_destructor (void *ctx)
+{
+       sfree (ctx);
+} /* void plugin_ctx_destructor */
+
+static plugin_ctx_t ctx_init = { /* interval = */ 0 };
+
+static plugin_ctx_t *plugin_ctx_create (void)
+{
+       plugin_ctx_t *ctx;
+
+       ctx = malloc (sizeof (*ctx));
+       if (ctx == NULL) {
+               char errbuf[1024];
+               ERROR ("Failed to allocate plugin context: %s",
+                               sstrerror (errno, errbuf, sizeof (errbuf)));
+               return NULL;
+       }
+
+       *ctx = ctx_init;
+       pthread_setspecific (plugin_ctx_key, ctx);
+       DEBUG("Created new plugin context.");
+       return (ctx);
+} /* int plugin_ctx_create */
+
+void plugin_init_ctx (void)
+{
+       pthread_key_create (&plugin_ctx_key, plugin_ctx_destructor);
+} /* void plugin_init_ctx */
+
+plugin_ctx_t plugin_get_ctx (void)
+{
+       plugin_ctx_t *ctx;
+
+       ctx = pthread_getspecific (plugin_ctx_key);
+
+       if (ctx == NULL) {
+               ctx = plugin_ctx_create ();
+               /* this must no happen -- exit() instead? */
+               if (ctx == NULL)
+                       return ctx_init;
+       }
+
+       return (*ctx);
+} /* plugin_ctx_t plugin_get_ctx */
+
+plugin_ctx_t plugin_set_ctx (plugin_ctx_t ctx)
+{
+       plugin_ctx_t *c;
+       plugin_ctx_t old;
+
+       c = pthread_getspecific (plugin_ctx_key);
+
+       if (c == NULL) {
+               c = plugin_ctx_create ();
+               /* this must no happen -- exit() instead? */
+               if (c == NULL)
+                       return ctx_init;
+       }
+
+       old = *c;
+       *c = ctx;
+
+       return (old);
+} /* void plugin_set_ctx */
+
 /* vim: set sw=8 ts=8 noet fdm=marker : */
index 86d403400e88af2c04e987548aed48ad5d98fd57..a2f7f098113f84a1453c0a37077e2e0bc224c37b 100644 (file)
@@ -65,6 +65,8 @@
 #define NOTIF_WARNING 2
 #define NOTIF_OKAY    4
 
+#define plugin_interval (plugin_get_ctx().interval)
+
 /*
  * Public data types
  */
@@ -97,7 +99,9 @@ struct value_list_s
 };
 typedef struct value_list_s value_list_t;
 
-#define VALUE_LIST_INIT { NULL, 0, 0, interval_g, "localhost", "", "", "", "", NULL }
+#define VALUE_LIST_INIT { NULL, 0, 0, \
+       (plugin_interval > 0) ? plugin_interval : interval_g, \
+       "localhost", "", "", "", "", NULL }
 #define VALUE_LIST_STATIC { NULL, 0, 0, 0, "localhost", "", "", "", "", NULL }
 
 struct data_source_s
@@ -161,6 +165,12 @@ struct user_data_s
 };
 typedef struct user_data_s user_data_t;
 
+struct plugin_ctx_s
+{
+       cdtime_t interval;
+};
+typedef struct plugin_ctx_s plugin_ctx_t;
+
 /*
  * Callback types
  */
@@ -359,4 +369,13 @@ int plugin_notification_meta_copy (notification_t *dst,
 
 int plugin_notification_meta_free (notification_meta_t *n);
 
+/*
+ * Plugin context management.
+ */
+
+void plugin_init_ctx (void);
+
+plugin_ctx_t plugin_get_ctx (void);
+plugin_ctx_t plugin_set_ctx (plugin_ctx_t ctx);
+
 #endif /* PLUGIN_H */