diff --git a/src/plugin.c b/src/plugin.c
index a749d1e07b18502127406469471cd728fe9121bd..bbede051bbfaea4972c120b4c3e4f79f5e777fe8 100644 (file)
--- a/src/plugin.c
+++ b/src/plugin.c
{
void *cf_callback;
user_data_t cf_udata;
+ plugin_ctx_t cf_ctx;
};
typedef struct callback_func_s callback_func_t;
* The `rf_super' member MUST be the first one in this structure! */
#define rf_callback rf_super.cf_callback
#define rf_udata rf_super.cf_udata
+#define rf_ctx rf_super.cf_ctx
callback_func_t rf_super;
char rf_group[DATA_MAX_NAME_LEN];
char rf_name[DATA_MAX_NAME_LEN];
static pthread_t *read_threads = NULL;
static int read_threads_num = 0;
+static pthread_key_t plugin_ctx_key;
+static _Bool plugin_ctx_key_initialized = 0;
+
/*
* Static functions
*/
cf->cf_udata = *ud;
}
+ cf->cf_ctx = plugin_get_ctx ();
+
return (register_callback (list, name, cf));
} /* }}} int create_register_callback */
dlh = lt_dlopenadvise(file, advise);
lt_dladvise_destroy(&advise);
} else {
- dlh = lt_dlopen (file);
+ dlh = lt_dlopen (file);
}
#else /* if LIBTOOL_VERSION == 1 */
if (flags & PLUGIN_FLAGS_GLOBAL)
while (read_loop != 0)
{
read_func_t *rf;
+ plugin_ctx_t old_ctx;
cdtime_t now;
int status;
int rf_type;
int rc;
- /* Get the read function that needs to be read next. */
+ /* Get the read function that needs to be read next.
+ * We don't need to hold "read_lock" for the heap, but we need
+ * to call c_heap_get_root() and pthread_cond_wait() in the
+ * same protected block. */
+ pthread_mutex_lock (&read_lock);
rf = c_heap_get_root (read_heap);
if (rf == NULL)
{
- struct timespec abstime;
-
- now = cdtime ();
-
- CDTIME_T_TO_TIMESPEC (now + interval_g, &abstime);
-
- pthread_mutex_lock (&read_lock);
- pthread_cond_timedwait (&read_cond, &read_lock,
- &abstime);
- pthread_mutex_unlock (&read_lock);
+ pthread_cond_wait (&read_cond, &read_lock);
+ pthread_mutex_unlock (&read_lock);
continue;
}
+ pthread_mutex_unlock (&read_lock);
if ((rf->rf_interval.tv_sec == 0) && (rf->rf_interval.tv_nsec == 0))
{
+ /* this should not happen, because the interval is set
+ * for each plugin when loading it
+ * XXX: issue a warning? */
now = cdtime ();
- CDTIME_T_TO_TIMESPEC (interval_g, &rf->rf_interval);
+ CDTIME_T_TO_TIMESPEC (plugin_get_interval (), &rf->rf_interval);
rf->rf_effective_interval = rf->rf_interval;
DEBUG ("plugin_read_thread: Handling `%s'.", rf->rf_name);
+ old_ctx = plugin_set_ctx (rf->rf_ctx);
+
if (rf_type == RF_SIMPLE)
{
int (*callback) (void);
status = (*callback) (&rf->rf_udata);
}
+ plugin_set_ctx (old_ctx);
+
/* If the function signals failure, we will increase the
* intervals in which it will be called. */
if (status != 0)
/* This does not fail. */
llist_append (read_list, le);
+ /* Wake up all the read threads. */
+ pthread_cond_broadcast (&read_cond);
pthread_mutex_unlock (&read_lock);
return (0);
} /* int plugin_insert_read */
+static int read_cb_wrapper (user_data_t *ud)
+{
+ int (*callback) (void);
+
+ if (ud == NULL)
+ return -1;
+
+ callback = ud->data;
+ return callback();
+} /* int read_cb_wrapper */
+
int plugin_register_read (const char *name,
int (*callback) (void))
{
read_func_t *rf;
+ plugin_ctx_t ctx = plugin_get_ctx ();
int status;
+ if (ctx.interval != 0) {
+ /* If ctx.interval is not zero (== use the plugin or global
+ * interval), we need to use the "complex" read callback,
+ * because only that allows to specify a different interval.
+ * Wrap the callback using read_cb_wrapper(). */
+ struct timespec interval;
+ user_data_t user_data;
+
+ user_data.data = callback;
+ user_data.free_func = NULL;
+
+ CDTIME_T_TO_TIMESPEC (ctx.interval, &interval);
+ return plugin_register_complex_read (/* group = */ NULL,
+ name, read_cb_wrapper, &interval, &user_data);
+ }
+
+ DEBUG ("plugin_register_read: default_interval = %.3f",
+ CDTIME_T_TO_DOUBLE(plugin_get_interval ()));
+
rf = malloc (sizeof (*rf));
if (rf == NULL)
{
rf->rf_callback = (void *) callback;
rf->rf_udata.data = NULL;
rf->rf_udata.free_func = NULL;
+ rf->rf_ctx = ctx;
rf->rf_group[0] = '\0';
sstrncpy (rf->rf_name, name, sizeof (rf->rf_name));
rf->rf_type = RF_SIMPLE;
user_data_t *user_data)
{
read_func_t *rf;
+ plugin_ctx_t ctx = plugin_get_ctx ();
int status;
rf = malloc (sizeof (*rf));
{
rf->rf_interval = *interval;
}
+ else if (ctx.interval != 0)
+ {
+ CDTIME_T_TO_TIMESPEC (ctx.interval, &rf->rf_interval);
+ }
rf->rf_effective_interval = rf->rf_interval;
+ DEBUG ("plugin_register_read: interval = %i.%09i",
+ (int) rf->rf_interval.tv_sec,
+ (int) rf->rf_interval.tv_nsec);
+
/* Set user data */
if (user_data == NULL)
{
rf->rf_udata = *user_data;
}
+ rf->rf_ctx = ctx;
+
status = plugin_insert_read (rf);
if (status != 0)
sfree (rf);
{
callback_func_t *cf;
plugin_init_cb callback;
+ plugin_ctx_t old_ctx;
cf = le->value;
+ old_ctx = plugin_set_ctx (cf->cf_ctx);
callback = cf->cf_callback;
status = (*callback) ();
+ plugin_set_ctx (old_ctx);
if (status != 0)
{
while (42)
{
read_func_t *rf;
+ plugin_ctx_t old_ctx;
rf = c_heap_get_root (read_heap);
if (rf == NULL)
break;
+ old_ctx = plugin_set_ctx (rf->rf_ctx);
+
if (rf->rf_type == RF_SIMPLE)
{
int (*callback) (void);
status = (*callback) (&rf->rf_udata);
}
+ plugin_set_ctx (old_ctx);
+
if (status != 0)
{
NOTICE ("read-function of plugin `%s' failed.",
callback_func_t *cf = le->value;
plugin_write_cb callback;
+ /* do not switch plugin context; rather keep the context (interval)
+ * information of the calling read plugin */
+
DEBUG ("plugin: plugin_write: Writing values via %s.", le->key);
callback = cf->cf_callback;
status = (*callback) (ds, vl, &cf->cf_udata);
cf = le->value;
+ /* do not switch plugin context; rather keep the context (interval)
+ * information of the calling read plugin */
+
DEBUG ("plugin: plugin_write: Writing values via %s.", le->key);
callback = cf->cf_callback;
status = (*callback) (ds, vl, &cf->cf_udata);
@@ -1298,6 +1369,7 @@ int plugin_flush (const char *plugin, cdtime_t timeout, const char *identifier)
{
callback_func_t *cf;
plugin_flush_cb callback;
+ plugin_ctx_t old_ctx;
if ((plugin != NULL)
&& (strcmp (plugin, le->key) != 0))
@@ -1307,10 +1379,13 @@ int plugin_flush (const char *plugin, cdtime_t timeout, const char *identifier)
}
cf = le->value;
+ old_ctx = plugin_set_ctx (cf->cf_ctx);
callback = cf->cf_callback;
(*callback) (timeout, identifier, &cf->cf_udata);
+ plugin_set_ctx (old_ctx);
+
le = le->next;
}
return (0);
{
callback_func_t *cf;
plugin_shutdown_cb callback;
+ plugin_ctx_t old_ctx;
cf = le->value;
+ old_ctx = plugin_set_ctx (cf->cf_ctx);
callback = cf->cf_callback;
/* Advance the pointer before calling the callback allows
le = le->next;
(*callback) ();
+
+ plugin_set_ctx (old_ctx);
}
/* Write plugins which use the `user_data' pointer usually need the
{
callback_func_t *cf;
plugin_missing_cb callback;
+ plugin_ctx_t old_ctx;
int status;
cf = le->value;
+ old_ctx = plugin_set_ctx (cf->cf_ctx);
callback = cf->cf_callback;
status = (*callback) (vl, &cf->cf_udata);
+ plugin_set_ctx (old_ctx);
if (status != 0)
{
if (status < 0)
vl->time = cdtime ();
if (vl->interval <= 0)
- vl->interval = interval_g;
+ {
+ plugin_ctx_t ctx = plugin_get_ctx ();
+
+ if (ctx.interval != 0)
+ vl->interval = ctx.interval;
+ else
+ {
+ char name[6 * DATA_MAX_NAME_LEN];
+ FORMAT_VL (name, sizeof (name), vl);
+ ERROR ("plugin_dispatch_values: Unable to determine "
+ "interval from context for "
+ "value list \"%s\". "
+ "This indicates a broken plugin. "
+ "Please report this problem to the "
+ "collectd mailing list or at "
+ "<http://collectd.org/bugs/>.", name);
+ vl->interval = cf_get_default_interval ();
+ }
+ }
DEBUG ("plugin_dispatch_values: time = %.3f; interval = %.3f; "
"host = %s; "
return (0);
} /* int plugin_dispatch_values */
+int plugin_dispatch_values_secure (const value_list_t *vl)
+{
+ value_list_t vl_copy;
+ int status;
+
+ if (vl == NULL)
+ return EINVAL;
+
+ memcpy (&vl_copy, vl, sizeof (vl_copy));
+
+ /* Write callbacks must not change the values and meta pointers, so we can
+ * savely skip copying those and make this more efficient. */
+ if ((pre_cache_chain == NULL) && (post_cache_chain == NULL))
+ return (plugin_dispatch_values (&vl_copy));
+
+ /* Set pointers to NULL, just to be on the save side. */
+ vl_copy.values = NULL;
+ vl_copy.meta = NULL;
+
+ vl_copy.values = malloc (sizeof (*vl_copy.values) * vl->values_len);
+ if (vl_copy.values == NULL)
+ {
+ ERROR ("plugin_dispatch_values_secure: malloc failed.");
+ return (ENOMEM);
+ }
+ memcpy (vl_copy.values, vl->values, sizeof (*vl_copy.values) * vl->values_len);
+
+ if (vl->meta != NULL)
+ {
+ vl_copy.meta = meta_data_clone (vl->meta);
+ if (vl_copy.meta == NULL)
+ {
+ ERROR ("plugin_dispatch_values_secure: meta_data_clone failed.");
+ free (vl_copy.values);
+ return (ENOMEM);
+ }
+ } /* if (vl->meta) */
+
+ status = plugin_dispatch_values (&vl_copy);
+
+ meta_data_destroy (vl_copy.meta);
+ free (vl_copy.values);
+
+ return (status);
+} /* int plugin_dispatch_values_secure */
+
int plugin_dispatch_notification (const notification_t *notif)
{
llentry_t *le;
plugin_notification_cb callback;
int status;
+ /* do not switch plugin context; rather keep the context
+ * (interval) information of the calling plugin */
+
cf = le->value;
callback = cf->cf_callback;
status = (*callback) (notif, &cf->cf_udata);
cf = le->value;
callback = cf->cf_callback;
+ /* do not switch plugin context; rather keep the context
+ * (interval) information of the calling plugin */
+
(*callback) (level, msg, &cf->cf_udata);
le = le->next;
}
} /* void plugin_log */
+int parse_log_severity (const char *severity)
+{
+ int log_level = -1;
+
+ if ((0 == strcasecmp (severity, "emerg"))
+ || (0 == strcasecmp (severity, "alert"))
+ || (0 == strcasecmp (severity, "crit"))
+ || (0 == strcasecmp (severity, "err")))
+ log_level = LOG_ERR;
+ else if (0 == strcasecmp (severity, "warning"))
+ log_level = LOG_WARNING;
+ else if (0 == strcasecmp (severity, "notice"))
+ log_level = LOG_NOTICE;
+ else if (0 == strcasecmp (severity, "info"))
+ log_level = LOG_INFO;
+#if COLLECT_DEBUG
+ else if (0 == strcasecmp (severity, "debug"))
+ log_level = LOG_DEBUG;
+#endif /* COLLECT_DEBUG */
+
+ return (log_level);
+} /* int parse_log_severity */
+
+int parse_notif_severity (const char *severity)
+{
+ int notif_severity = -1;
+
+ if (strcasecmp (severity, "FAILURE") == 0)
+ notif_severity = NOTIF_FAILURE;
+ else if (strcmp (severity, "OKAY") == 0)
+ notif_severity = NOTIF_OKAY;
+ else if ((strcmp (severity, "WARNING") == 0)
+ || (strcmp (severity, "WARN") == 0))
+ notif_severity = NOTIF_WARNING;
+
+ return (notif_severity);
+} /* int parse_notif_severity */
+
const data_set_t *plugin_get_ds (const char *name)
{
data_set_t *ds;
return (0);
} /* int plugin_notification_meta_free */
+static void plugin_ctx_destructor (void *ctx)
+{
+ sfree (ctx);
+} /* void plugin_ctx_destructor */
+
+static plugin_ctx_t ctx_init = { /* interval = */ 0 };
+
+static plugin_ctx_t *plugin_ctx_create (void)
+{
+ plugin_ctx_t *ctx;
+
+ ctx = malloc (sizeof (*ctx));
+ if (ctx == NULL) {
+ char errbuf[1024];
+ ERROR ("Failed to allocate plugin context: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ return NULL;
+ }
+
+ *ctx = ctx_init;
+ assert (plugin_ctx_key_initialized);
+ pthread_setspecific (plugin_ctx_key, ctx);
+ DEBUG("Created new plugin context.");
+ return (ctx);
+} /* int plugin_ctx_create */
+
+void plugin_init_ctx (void)
+{
+ pthread_key_create (&plugin_ctx_key, plugin_ctx_destructor);
+ plugin_ctx_key_initialized = 1;
+} /* void plugin_init_ctx */
+
+plugin_ctx_t plugin_get_ctx (void)
+{
+ plugin_ctx_t *ctx;
+
+ assert (plugin_ctx_key_initialized);
+ ctx = pthread_getspecific (plugin_ctx_key);
+
+ if (ctx == NULL) {
+ ctx = plugin_ctx_create ();
+ /* this must no happen -- exit() instead? */
+ if (ctx == NULL)
+ return ctx_init;
+ }
+
+ return (*ctx);
+} /* plugin_ctx_t plugin_get_ctx */
+
+plugin_ctx_t plugin_set_ctx (plugin_ctx_t ctx)
+{
+ plugin_ctx_t *c;
+ plugin_ctx_t old;
+
+ assert (plugin_ctx_key_initialized);
+ c = pthread_getspecific (plugin_ctx_key);
+
+ if (c == NULL) {
+ c = plugin_ctx_create ();
+ /* this must no happen -- exit() instead? */
+ if (c == NULL)
+ return ctx_init;
+ }
+
+ old = *c;
+ *c = ctx;
+
+ return (old);
+} /* void plugin_set_ctx */
+
+cdtime_t plugin_get_interval (void)
+{
+ cdtime_t interval;
+
+ interval = plugin_get_ctx().interval;
+ if (interval > 0)
+ return interval;
+
+ return cf_get_default_interval ();
+} /* cdtime_t plugin_get_interval */
+
+typedef struct {
+ plugin_ctx_t ctx;
+ void *(*start_routine) (void *);
+ void *arg;
+} plugin_thread_t;
+
+static void *plugin_thread_start (void *arg)
+{
+ plugin_thread_t *plugin_thread = arg;
+
+ void *(*start_routine) (void *) = plugin_thread->start_routine;
+ void *plugin_arg = plugin_thread->arg;
+
+ plugin_set_ctx (plugin_thread->ctx);
+
+ free (plugin_thread);
+
+ return start_routine (plugin_arg);
+} /* void *plugin_thread_start */
+
+int plugin_thread_create (pthread_t *thread, const pthread_attr_t *attr,
+ void *(*start_routine) (void *), void *arg)
+{
+ plugin_thread_t *plugin_thread;
+
+ plugin_thread = malloc (sizeof (*plugin_thread));
+ if (plugin_thread == NULL)
+ return -1;
+
+ plugin_thread->ctx = plugin_get_ctx ();
+ plugin_thread->start_routine = start_routine;
+ plugin_thread->arg = arg;
+
+ return pthread_create (thread, attr,
+ plugin_thread_start, plugin_thread);
+} /* int plugin_thread_create */
+
/* vim: set sw=8 ts=8 noet fdm=marker : */