Code

Merge branch 'sh/plugin_interval'
[collectd.git] / src / plugin.c
index cdd56bd79dd8f2f4d0c6374c7d33fe5ebbe224d7..c69046c3fab5f2e4192814994fafb6a15d6bd781 100644 (file)
@@ -45,6 +45,7 @@ struct callback_func_s
 {
        void *cf_callback;
        user_data_t cf_udata;
+       plugin_ctx_t cf_ctx;
 };
 typedef struct callback_func_s callback_func_t;
 
@@ -57,6 +58,7 @@ struct read_func_s
         * The `rf_super' member MUST be the first one in this structure! */
 #define rf_callback rf_super.cf_callback
 #define rf_udata rf_super.cf_udata
+#define rf_ctx rf_super.cf_ctx
        callback_func_t rf_super;
        char rf_group[DATA_MAX_NAME_LEN];
        char rf_name[DATA_MAX_NAME_LEN];
@@ -93,6 +95,9 @@ static pthread_cond_t  read_cond = PTHREAD_COND_INITIALIZER;
 static pthread_t      *read_threads = NULL;
 static int             read_threads_num = 0;
 
+static pthread_key_t   plugin_ctx_key;
+static _Bool           plugin_ctx_key_initialized = 0;
+
 /*
  * Static functions
  */
@@ -246,6 +251,8 @@ static int create_register_callback (llist_t **list, /* {{{ */
                cf->cf_udata = *ud;
        }
 
+       cf->cf_ctx = plugin_get_ctx ();
+
        return (register_callback (list, name, cf));
 } /* }}} int create_register_callback */
 
@@ -291,7 +298,7 @@ static int plugin_load_file (char *file, uint32_t flags)
                dlh = lt_dlopenadvise(file, advise);
                lt_dladvise_destroy(&advise);
        } else {
-               dlh = lt_dlopen (file);
+               dlh = lt_dlopen (file);
        }
 #else /* if LIBTOOL_VERSION == 1 */
        if (flags & PLUGIN_FLAGS_GLOBAL)
@@ -346,33 +353,34 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
        while (read_loop != 0)
        {
                read_func_t *rf;
+               plugin_ctx_t old_ctx;
                cdtime_t now;
                int status;
                int rf_type;
                int rc;
 
-               /* Get the read function that needs to be read next. */
+               /* Get the read function that needs to be read next.
+                * We don't need to hold "read_lock" for the heap, but we need
+                * to call c_heap_get_root() and pthread_cond_wait() in the
+                * same protected block. */
+               pthread_mutex_lock (&read_lock);
                rf = c_heap_get_root (read_heap);
                if (rf == NULL)
                {
-                       struct timespec abstime;
-
-                       now = cdtime ();
-
-                       CDTIME_T_TO_TIMESPEC (now + interval_g, &abstime);
-
-                       pthread_mutex_lock (&read_lock);
-                       pthread_cond_timedwait (&read_cond, &read_lock,
-                                       &abstime);
-                       pthread_mutex_unlock (&read_lock);
+                       pthread_cond_wait (&read_cond, &read_lock);
+                        pthread_mutex_unlock (&read_lock);
                        continue;
                }
+               pthread_mutex_unlock (&read_lock);
 
                if ((rf->rf_interval.tv_sec == 0) && (rf->rf_interval.tv_nsec == 0))
                {
+                       /* this should not happen, because the interval is set
+                        * for each plugin when loading it
+                        * XXX: issue a warning? */
                        now = cdtime ();
 
-                       CDTIME_T_TO_TIMESPEC (interval_g, &rf->rf_interval);
+                       CDTIME_T_TO_TIMESPEC (plugin_get_interval (), &rf->rf_interval);
 
                        rf->rf_effective_interval = rf->rf_interval;
 
@@ -423,6 +431,8 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
 
                DEBUG ("plugin_read_thread: Handling `%s'.", rf->rf_name);
 
+               old_ctx = plugin_set_ctx (rf->rf_ctx);
+
                if (rf_type == RF_SIMPLE)
                {
                        int (*callback) (void);
@@ -440,6 +450,8 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
                        status = (*callback) (&rf->rf_udata);
                }
 
+               plugin_set_ctx (old_ctx);
+
                /* If the function signals failure, we will increase the
                 * intervals in which it will be called. */
                if (status != 0)
@@ -769,16 +781,49 @@ static int plugin_insert_read (read_func_t *rf)
        /* This does not fail. */
        llist_append (read_list, le);
 
+       /* Wake up all the read threads. */
+       pthread_cond_broadcast (&read_cond);
        pthread_mutex_unlock (&read_lock);
        return (0);
 } /* int plugin_insert_read */
 
+static int read_cb_wrapper (user_data_t *ud)
+{
+       int (*callback) (void);
+
+       if (ud == NULL)
+               return -1;
+
+       callback = ud->data;
+       return callback();
+} /* int read_cb_wrapper */
+
 int plugin_register_read (const char *name,
                int (*callback) (void))
 {
        read_func_t *rf;
+       plugin_ctx_t ctx = plugin_get_ctx ();
        int status;
 
+       if (ctx.interval != 0) {
+               /* If ctx.interval is not zero (== use the plugin or global
+                * interval), we need to use the "complex" read callback,
+                * because only that allows to specify a different interval.
+                * Wrap the callback using read_cb_wrapper(). */
+               struct timespec interval;
+               user_data_t user_data;
+
+               user_data.data = callback;
+               user_data.free_func = NULL;
+
+               CDTIME_T_TO_TIMESPEC (ctx.interval, &interval);
+               return plugin_register_complex_read (/* group = */ NULL,
+                               name, read_cb_wrapper, &interval, &user_data);
+       }
+
+       DEBUG ("plugin_register_read: default_interval = %.3f",
+                       CDTIME_T_TO_DOUBLE(plugin_get_interval ()));
+
        rf = malloc (sizeof (*rf));
        if (rf == NULL)
        {
@@ -790,6 +835,7 @@ int plugin_register_read (const char *name,
        rf->rf_callback = (void *) callback;
        rf->rf_udata.data = NULL;
        rf->rf_udata.free_func = NULL;
+       rf->rf_ctx = ctx;
        rf->rf_group[0] = '\0';
        sstrncpy (rf->rf_name, name, sizeof (rf->rf_name));
        rf->rf_type = RF_SIMPLE;
@@ -810,6 +856,7 @@ int plugin_register_complex_read (const char *group, const char *name,
                user_data_t *user_data)
 {
        read_func_t *rf;
+       plugin_ctx_t ctx = plugin_get_ctx ();
        int status;
 
        rf = malloc (sizeof (*rf));
@@ -831,8 +878,16 @@ int plugin_register_complex_read (const char *group, const char *name,
        {
                rf->rf_interval = *interval;
        }
+       else if (ctx.interval != 0)
+       {
+               CDTIME_T_TO_TIMESPEC (ctx.interval, &rf->rf_interval);
+       }
        rf->rf_effective_interval = rf->rf_interval;
 
+       DEBUG ("plugin_register_read: interval = %i.%09i",
+                       (int) rf->rf_interval.tv_sec,
+                       (int) rf->rf_interval.tv_nsec);
+
        /* Set user data */
        if (user_data == NULL)
        {
@@ -844,6 +899,8 @@ int plugin_register_complex_read (const char *group, const char *name,
                rf->rf_udata = *user_data;
        }
 
+       rf->rf_ctx = ctx;
+
        status = plugin_insert_read (rf);
        if (status != 0)
                sfree (rf);
@@ -1121,10 +1178,13 @@ void plugin_init_all (void)
        {
                callback_func_t *cf;
                plugin_init_cb callback;
+               plugin_ctx_t old_ctx;
 
                cf = le->value;
+               old_ctx = plugin_set_ctx (cf->cf_ctx);
                callback = cf->cf_callback;
                status = (*callback) ();
+               plugin_set_ctx (old_ctx);
 
                if (status != 0)
                {
@@ -1177,11 +1237,14 @@ int plugin_read_all_once (void)
        while (42)
        {
                read_func_t *rf;
+               plugin_ctx_t old_ctx;
 
                rf = c_heap_get_root (read_heap);
                if (rf == NULL)
                        break;
 
+               old_ctx = plugin_set_ctx (rf->rf_ctx);
+
                if (rf->rf_type == RF_SIMPLE)
                {
                        int (*callback) (void);
@@ -1197,6 +1260,8 @@ int plugin_read_all_once (void)
                        status = (*callback) (&rf->rf_udata);
                }
 
+               plugin_set_ctx (old_ctx);
+
                if (status != 0)
                {
                        NOTICE ("read-function of plugin `%s' failed.",
@@ -1243,6 +1308,9 @@ int plugin_write (const char *plugin, /* {{{ */
       callback_func_t *cf = le->value;
       plugin_write_cb callback;
 
+      /* do not switch plugin context; rather keep the context (interval)
+       * information of the calling read plugin */
+
       DEBUG ("plugin: plugin_write: Writing values via %s.", le->key);
       callback = cf->cf_callback;
       status = (*callback) (ds, vl, &cf->cf_udata);
@@ -1278,6 +1346,9 @@ int plugin_write (const char *plugin, /* {{{ */
 
     cf = le->value;
 
+    /* do not switch plugin context; rather keep the context (interval)
+     * information of the calling read plugin */
+
     DEBUG ("plugin: plugin_write: Writing values via %s.", le->key);
     callback = cf->cf_callback;
     status = (*callback) (ds, vl, &cf->cf_udata);
@@ -1298,6 +1369,7 @@ int plugin_flush (const char *plugin, cdtime_t timeout, const char *identifier)
   {
     callback_func_t *cf;
     plugin_flush_cb callback;
+    plugin_ctx_t old_ctx;
 
     if ((plugin != NULL)
         && (strcmp (plugin, le->key) != 0))
@@ -1307,10 +1379,13 @@ int plugin_flush (const char *plugin, cdtime_t timeout, const char *identifier)
     }
 
     cf = le->value;
+    old_ctx = plugin_set_ctx (cf->cf_ctx);
     callback = cf->cf_callback;
 
     (*callback) (timeout, identifier, &cf->cf_udata);
 
+    plugin_set_ctx (old_ctx);
+
     le = le->next;
   }
   return (0);
@@ -1343,8 +1418,10 @@ void plugin_shutdown_all (void)
        {
                callback_func_t *cf;
                plugin_shutdown_cb callback;
+               plugin_ctx_t old_ctx;
 
                cf = le->value;
+               old_ctx = plugin_set_ctx (cf->cf_ctx);
                callback = cf->cf_callback;
 
                /* Advance the pointer before calling the callback allows
@@ -1354,6 +1431,8 @@ void plugin_shutdown_all (void)
                le = le->next;
 
                (*callback) ();
+
+               plugin_set_ctx (old_ctx);
        }
 
        /* Write plugins which use the `user_data' pointer usually need the
@@ -1382,12 +1461,15 @@ int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */
   {
     callback_func_t *cf;
     plugin_missing_cb callback;
+    plugin_ctx_t old_ctx;
     int status;
 
     cf = le->value;
+    old_ctx = plugin_set_ctx (cf->cf_ctx);
     callback = cf->cf_callback;
 
     status = (*callback) (vl, &cf->cf_udata);
+    plugin_set_ctx (old_ctx);
     if (status != 0)
     {
       if (status < 0)
@@ -1463,7 +1545,25 @@ int plugin_dispatch_values (value_list_t *vl)
                vl->time = cdtime ();
 
        if (vl->interval <= 0)
-               vl->interval = interval_g;
+       {
+               plugin_ctx_t ctx = plugin_get_ctx ();
+
+               if (ctx.interval != 0)
+                       vl->interval = ctx.interval;
+               else
+               {
+                       char name[6 * DATA_MAX_NAME_LEN];
+                       FORMAT_VL (name, sizeof (name), vl);
+                       ERROR ("plugin_dispatch_values: Unable to determine "
+                                       "interval from context for "
+                                       "value list \"%s\". "
+                                       "This indicates a broken plugin. "
+                                       "Please report this problem to the "
+                                       "collectd mailing list or at "
+                                       "<http://collectd.org/bugs/>.", name);
+                       vl->interval = interval_g;
+               }
+       }
 
        DEBUG ("plugin_dispatch_values: time = %.3f; interval = %.3f; "
                        "host = %s; "
@@ -1653,6 +1753,9 @@ int plugin_dispatch_notification (const notification_t *notif)
                plugin_notification_cb callback;
                int status;
 
+               /* do not switch plugin context; rather keep the context
+                * (interval) information of the calling plugin */
+
                cf = le->value;
                callback = cf->cf_callback;
                status = (*callback) (notif, &cf->cf_udata);
@@ -1700,6 +1803,9 @@ void plugin_log (int level, const char *format, ...)
                cf = le->value;
                callback = cf->cf_callback;
 
+               /* do not switch plugin context; rather keep the context
+                * (interval) information of the calling plugin */
+
                (*callback) (level, msg, &cf->cf_udata);
 
                le = le->next;
@@ -1932,4 +2038,122 @@ int plugin_notification_meta_free (notification_meta_t *n)
   return (0);
 } /* int plugin_notification_meta_free */
 
+static void plugin_ctx_destructor (void *ctx)
+{
+       sfree (ctx);
+} /* void plugin_ctx_destructor */
+
+static plugin_ctx_t ctx_init = { /* interval = */ 0 };
+
+static plugin_ctx_t *plugin_ctx_create (void)
+{
+       plugin_ctx_t *ctx;
+
+       ctx = malloc (sizeof (*ctx));
+       if (ctx == NULL) {
+               char errbuf[1024];
+               ERROR ("Failed to allocate plugin context: %s",
+                               sstrerror (errno, errbuf, sizeof (errbuf)));
+               return NULL;
+       }
+
+       *ctx = ctx_init;
+       assert (plugin_ctx_key_initialized);
+       pthread_setspecific (plugin_ctx_key, ctx);
+       DEBUG("Created new plugin context.");
+       return (ctx);
+} /* int plugin_ctx_create */
+
+void plugin_init_ctx (void)
+{
+       pthread_key_create (&plugin_ctx_key, plugin_ctx_destructor);
+       plugin_ctx_key_initialized = 1;
+} /* void plugin_init_ctx */
+
+plugin_ctx_t plugin_get_ctx (void)
+{
+       plugin_ctx_t *ctx;
+
+       assert (plugin_ctx_key_initialized);
+       ctx = pthread_getspecific (plugin_ctx_key);
+
+       if (ctx == NULL) {
+               ctx = plugin_ctx_create ();
+               /* this must no happen -- exit() instead? */
+               if (ctx == NULL)
+                       return ctx_init;
+       }
+
+       return (*ctx);
+} /* plugin_ctx_t plugin_get_ctx */
+
+plugin_ctx_t plugin_set_ctx (plugin_ctx_t ctx)
+{
+       plugin_ctx_t *c;
+       plugin_ctx_t old;
+
+       assert (plugin_ctx_key_initialized);
+       c = pthread_getspecific (plugin_ctx_key);
+
+       if (c == NULL) {
+               c = plugin_ctx_create ();
+               /* this must no happen -- exit() instead? */
+               if (c == NULL)
+                       return ctx_init;
+       }
+
+       old = *c;
+       *c = ctx;
+
+       return (old);
+} /* void plugin_set_ctx */
+
+cdtime_t plugin_get_interval (void)
+{
+       cdtime_t interval;
+
+       interval = plugin_get_ctx().interval;
+       if (interval > 0)
+               return interval;
+
+       return cf_get_default_interval ();
+} /* cdtime_t plugin_get_interval */
+
+typedef struct {
+       plugin_ctx_t ctx;
+       void *(*start_routine) (void *);
+       void *arg;
+} plugin_thread_t;
+
+static void *plugin_thread_start (void *arg)
+{
+       plugin_thread_t *plugin_thread = arg;
+
+       void *(*start_routine) (void *) = plugin_thread->start_routine;
+       void *plugin_arg = plugin_thread->arg;
+
+       plugin_set_ctx (plugin_thread->ctx);
+
+       free (plugin_thread);
+
+       return start_routine (plugin_arg);
+} /* void *plugin_thread_start */
+
+int plugin_thread_create (pthread_t *thread, const pthread_attr_t *attr,
+               void *(*start_routine) (void *), void *arg)
+{
+       plugin_thread_t *plugin_thread;
+
+       plugin_thread = malloc (sizeof (*plugin_thread));
+       if (plugin_thread == NULL)
+               return -1;
+
+       plugin_thread->ctx           = plugin_get_ctx ();
+       plugin_thread->start_routine = start_routine;
+       plugin_thread->arg           = arg;
+
+       return pthread_create (thread, attr,
+                       plugin_thread_start, plugin_thread);
+} /* int plugin_thread_create */
+
 /* vim: set sw=8 ts=8 noet fdm=marker : */