X-Git-Url: https://git.tokkee.org/?a=blobdiff_plain;f=src%2Fplugin.c;h=809c140f7e0d1ee8f9047e39d5a7c17eed4cac8a;hb=1e74bb66c61d965c4aa261adfaf058f3356cff09;hp=cdd56bd79dd8f2f4d0c6374c7d33fe5ebbe224d7;hpb=3427c2e266c04d67848bda913caa730a395c7295;p=collectd.git diff --git a/src/plugin.c b/src/plugin.c index cdd56bd7..809c140f 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -45,6 +45,7 @@ struct callback_func_s { void *cf_callback; user_data_t cf_udata; + plugin_ctx_t cf_ctx; }; typedef struct callback_func_s callback_func_t; @@ -57,6 +58,7 @@ struct read_func_s * The `rf_super' member MUST be the first one in this structure! */ #define rf_callback rf_super.cf_callback #define rf_udata rf_super.cf_udata +#define rf_ctx rf_super.cf_ctx callback_func_t rf_super; char rf_group[DATA_MAX_NAME_LEN]; char rf_name[DATA_MAX_NAME_LEN]; @@ -93,6 +95,9 @@ static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER; static pthread_t *read_threads = NULL; static int read_threads_num = 0; +static pthread_key_t plugin_ctx_key; +static _Bool plugin_ctx_key_initialized = 0; + /* * Static functions */ @@ -246,6 +251,8 @@ static int create_register_callback (llist_t **list, /* {{{ */ cf->cf_udata = *ud; } + cf->cf_ctx = plugin_get_ctx (); + return (register_callback (list, name, cf)); } /* }}} int create_register_callback */ @@ -291,7 +298,7 @@ static int plugin_load_file (char *file, uint32_t flags) dlh = lt_dlopenadvise(file, advise); lt_dladvise_destroy(&advise); } else { - dlh = lt_dlopen (file); + dlh = lt_dlopen (file); } #else /* if LIBTOOL_VERSION == 1 */ if (flags & PLUGIN_FLAGS_GLOBAL) @@ -346,33 +353,34 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) while (read_loop != 0) { read_func_t *rf; + plugin_ctx_t old_ctx; cdtime_t now; int status; int rf_type; int rc; - /* Get the read function that needs to be read next. */ + /* Get the read function that needs to be read next. + * We don't need to hold "read_lock" for the heap, but we need + * to call c_heap_get_root() and pthread_cond_wait() in the + * same protected block. */ + pthread_mutex_lock (&read_lock); rf = c_heap_get_root (read_heap); if (rf == NULL) { - struct timespec abstime; - - now = cdtime (); - - CDTIME_T_TO_TIMESPEC (now + interval_g, &abstime); - - pthread_mutex_lock (&read_lock); - pthread_cond_timedwait (&read_cond, &read_lock, - &abstime); - pthread_mutex_unlock (&read_lock); + pthread_cond_wait (&read_cond, &read_lock); + pthread_mutex_unlock (&read_lock); continue; } + pthread_mutex_unlock (&read_lock); if ((rf->rf_interval.tv_sec == 0) && (rf->rf_interval.tv_nsec == 0)) { + /* this should not happen, because the interval is set + * for each plugin when loading it + * XXX: issue a warning? */ now = cdtime (); - CDTIME_T_TO_TIMESPEC (interval_g, &rf->rf_interval); + CDTIME_T_TO_TIMESPEC (plugin_get_interval (), &rf->rf_interval); rf->rf_effective_interval = rf->rf_interval; @@ -423,6 +431,8 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) DEBUG ("plugin_read_thread: Handling `%s'.", rf->rf_name); + old_ctx = plugin_set_ctx (rf->rf_ctx); + if (rf_type == RF_SIMPLE) { int (*callback) (void); @@ -440,6 +450,8 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) status = (*callback) (&rf->rf_udata); } + plugin_set_ctx (old_ctx); + /* If the function signals failure, we will increase the * intervals in which it will be called. */ if (status != 0) @@ -714,6 +726,9 @@ static int plugin_insert_read (read_func_t *rf) int status; llentry_t *le; + cdtime_t now = cdtime (); + CDTIME_T_TO_TIMESPEC (now, &rf->rf_next_read); + pthread_mutex_lock (&read_lock); if (read_list == NULL) @@ -769,16 +784,49 @@ static int plugin_insert_read (read_func_t *rf) /* This does not fail. */ llist_append (read_list, le); + /* Wake up all the read threads. */ + pthread_cond_broadcast (&read_cond); pthread_mutex_unlock (&read_lock); return (0); } /* int plugin_insert_read */ +static int read_cb_wrapper (user_data_t *ud) +{ + int (*callback) (void); + + if (ud == NULL) + return -1; + + callback = ud->data; + return callback(); +} /* int read_cb_wrapper */ + int plugin_register_read (const char *name, int (*callback) (void)) { read_func_t *rf; + plugin_ctx_t ctx = plugin_get_ctx (); int status; + if (ctx.interval != 0) { + /* If ctx.interval is not zero (== use the plugin or global + * interval), we need to use the "complex" read callback, + * because only that allows to specify a different interval. + * Wrap the callback using read_cb_wrapper(). */ + struct timespec interval; + user_data_t user_data; + + user_data.data = callback; + user_data.free_func = NULL; + + CDTIME_T_TO_TIMESPEC (ctx.interval, &interval); + return plugin_register_complex_read (/* group = */ NULL, + name, read_cb_wrapper, &interval, &user_data); + } + + DEBUG ("plugin_register_read: default_interval = %.3f", + CDTIME_T_TO_DOUBLE(plugin_get_interval ())); + rf = malloc (sizeof (*rf)); if (rf == NULL) { @@ -790,6 +838,7 @@ int plugin_register_read (const char *name, rf->rf_callback = (void *) callback; rf->rf_udata.data = NULL; rf->rf_udata.free_func = NULL; + rf->rf_ctx = ctx; rf->rf_group[0] = '\0'; sstrncpy (rf->rf_name, name, sizeof (rf->rf_name)); rf->rf_type = RF_SIMPLE; @@ -810,6 +859,7 @@ int plugin_register_complex_read (const char *group, const char *name, user_data_t *user_data) { read_func_t *rf; + plugin_ctx_t ctx = plugin_get_ctx (); int status; rf = malloc (sizeof (*rf)); @@ -831,8 +881,16 @@ int plugin_register_complex_read (const char *group, const char *name, { rf->rf_interval = *interval; } + else if (ctx.interval != 0) + { + CDTIME_T_TO_TIMESPEC (ctx.interval, &rf->rf_interval); + } rf->rf_effective_interval = rf->rf_interval; + DEBUG ("plugin_register_read: interval = %i.%09i", + (int) rf->rf_interval.tv_sec, + (int) rf->rf_interval.tv_nsec); + /* Set user data */ if (user_data == NULL) { @@ -844,6 +902,8 @@ int plugin_register_complex_read (const char *group, const char *name, rf->rf_udata = *user_data; } + rf->rf_ctx = ctx; + status = plugin_insert_read (rf); if (status != 0) sfree (rf); @@ -1121,10 +1181,13 @@ void plugin_init_all (void) { callback_func_t *cf; plugin_init_cb callback; + plugin_ctx_t old_ctx; cf = le->value; + old_ctx = plugin_set_ctx (cf->cf_ctx); callback = cf->cf_callback; status = (*callback) (); + plugin_set_ctx (old_ctx); if (status != 0) { @@ -1177,11 +1240,14 @@ int plugin_read_all_once (void) while (42) { read_func_t *rf; + plugin_ctx_t old_ctx; rf = c_heap_get_root (read_heap); if (rf == NULL) break; + old_ctx = plugin_set_ctx (rf->rf_ctx); + if (rf->rf_type == RF_SIMPLE) { int (*callback) (void); @@ -1197,6 +1263,8 @@ int plugin_read_all_once (void) status = (*callback) (&rf->rf_udata); } + plugin_set_ctx (old_ctx); + if (status != 0) { NOTICE ("read-function of plugin `%s' failed.", @@ -1243,6 +1311,9 @@ int plugin_write (const char *plugin, /* {{{ */ callback_func_t *cf = le->value; plugin_write_cb callback; + /* do not switch plugin context; rather keep the context (interval) + * information of the calling read plugin */ + DEBUG ("plugin: plugin_write: Writing values via %s.", le->key); callback = cf->cf_callback; status = (*callback) (ds, vl, &cf->cf_udata); @@ -1278,6 +1349,9 @@ int plugin_write (const char *plugin, /* {{{ */ cf = le->value; + /* do not switch plugin context; rather keep the context (interval) + * information of the calling read plugin */ + DEBUG ("plugin: plugin_write: Writing values via %s.", le->key); callback = cf->cf_callback; status = (*callback) (ds, vl, &cf->cf_udata); @@ -1298,6 +1372,7 @@ int plugin_flush (const char *plugin, cdtime_t timeout, const char *identifier) { callback_func_t *cf; plugin_flush_cb callback; + plugin_ctx_t old_ctx; if ((plugin != NULL) && (strcmp (plugin, le->key) != 0)) @@ -1307,10 +1382,13 @@ int plugin_flush (const char *plugin, cdtime_t timeout, const char *identifier) } cf = le->value; + old_ctx = plugin_set_ctx (cf->cf_ctx); callback = cf->cf_callback; (*callback) (timeout, identifier, &cf->cf_udata); + plugin_set_ctx (old_ctx); + le = le->next; } return (0); @@ -1343,8 +1421,10 @@ void plugin_shutdown_all (void) { callback_func_t *cf; plugin_shutdown_cb callback; + plugin_ctx_t old_ctx; cf = le->value; + old_ctx = plugin_set_ctx (cf->cf_ctx); callback = cf->cf_callback; /* Advance the pointer before calling the callback allows @@ -1354,6 +1434,8 @@ void plugin_shutdown_all (void) le = le->next; (*callback) (); + + plugin_set_ctx (old_ctx); } /* Write plugins which use the `user_data' pointer usually need the @@ -1382,12 +1464,15 @@ int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */ { callback_func_t *cf; plugin_missing_cb callback; + plugin_ctx_t old_ctx; int status; cf = le->value; + old_ctx = plugin_set_ctx (cf->cf_ctx); callback = cf->cf_callback; status = (*callback) (vl, &cf->cf_udata); + plugin_set_ctx (old_ctx); if (status != 0) { if (status < 0) @@ -1463,7 +1548,25 @@ int plugin_dispatch_values (value_list_t *vl) vl->time = cdtime (); if (vl->interval <= 0) - vl->interval = interval_g; + { + plugin_ctx_t ctx = plugin_get_ctx (); + + if (ctx.interval != 0) + vl->interval = ctx.interval; + else + { + char name[6 * DATA_MAX_NAME_LEN]; + FORMAT_VL (name, sizeof (name), vl); + ERROR ("plugin_dispatch_values: Unable to determine " + "interval from context for " + "value list \"%s\". " + "This indicates a broken plugin. " + "Please report this problem to the " + "collectd mailing list or at " + ".", name); + vl->interval = cf_get_default_interval (); + } + } DEBUG ("plugin_dispatch_values: time = %.3f; interval = %.3f; " "host = %s; " @@ -1653,6 +1756,9 @@ int plugin_dispatch_notification (const notification_t *notif) plugin_notification_cb callback; int status; + /* do not switch plugin context; rather keep the context + * (interval) information of the calling plugin */ + cf = le->value; callback = cf->cf_callback; status = (*callback) (notif, &cf->cf_udata); @@ -1700,6 +1806,9 @@ void plugin_log (int level, const char *format, ...) cf = le->value; callback = cf->cf_callback; + /* do not switch plugin context; rather keep the context + * (interval) information of the calling plugin */ + (*callback) (level, msg, &cf->cf_udata); le = le->next; @@ -1932,4 +2041,122 @@ int plugin_notification_meta_free (notification_meta_t *n) return (0); } /* int plugin_notification_meta_free */ +static void plugin_ctx_destructor (void *ctx) +{ + sfree (ctx); +} /* void plugin_ctx_destructor */ + +static plugin_ctx_t ctx_init = { /* interval = */ 0 }; + +static plugin_ctx_t *plugin_ctx_create (void) +{ + plugin_ctx_t *ctx; + + ctx = malloc (sizeof (*ctx)); + if (ctx == NULL) { + char errbuf[1024]; + ERROR ("Failed to allocate plugin context: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + return NULL; + } + + *ctx = ctx_init; + assert (plugin_ctx_key_initialized); + pthread_setspecific (plugin_ctx_key, ctx); + DEBUG("Created new plugin context."); + return (ctx); +} /* int plugin_ctx_create */ + +void plugin_init_ctx (void) +{ + pthread_key_create (&plugin_ctx_key, plugin_ctx_destructor); + plugin_ctx_key_initialized = 1; +} /* void plugin_init_ctx */ + +plugin_ctx_t plugin_get_ctx (void) +{ + plugin_ctx_t *ctx; + + assert (plugin_ctx_key_initialized); + ctx = pthread_getspecific (plugin_ctx_key); + + if (ctx == NULL) { + ctx = plugin_ctx_create (); + /* this must no happen -- exit() instead? */ + if (ctx == NULL) + return ctx_init; + } + + return (*ctx); +} /* plugin_ctx_t plugin_get_ctx */ + +plugin_ctx_t plugin_set_ctx (plugin_ctx_t ctx) +{ + plugin_ctx_t *c; + plugin_ctx_t old; + + assert (plugin_ctx_key_initialized); + c = pthread_getspecific (plugin_ctx_key); + + if (c == NULL) { + c = plugin_ctx_create (); + /* this must no happen -- exit() instead? */ + if (c == NULL) + return ctx_init; + } + + old = *c; + *c = ctx; + + return (old); +} /* void plugin_set_ctx */ + +cdtime_t plugin_get_interval (void) +{ + cdtime_t interval; + + interval = plugin_get_ctx().interval; + if (interval > 0) + return interval; + + return cf_get_default_interval (); +} /* cdtime_t plugin_get_interval */ + +typedef struct { + plugin_ctx_t ctx; + void *(*start_routine) (void *); + void *arg; +} plugin_thread_t; + +static void *plugin_thread_start (void *arg) +{ + plugin_thread_t *plugin_thread = arg; + + void *(*start_routine) (void *) = plugin_thread->start_routine; + void *plugin_arg = plugin_thread->arg; + + plugin_set_ctx (plugin_thread->ctx); + + free (plugin_thread); + + return start_routine (plugin_arg); +} /* void *plugin_thread_start */ + +int plugin_thread_create (pthread_t *thread, const pthread_attr_t *attr, + void *(*start_routine) (void *), void *arg) +{ + plugin_thread_t *plugin_thread; + + plugin_thread = malloc (sizeof (*plugin_thread)); + if (plugin_thread == NULL) + return -1; + + plugin_thread->ctx = plugin_get_ctx (); + plugin_thread->start_routine = start_routine; + plugin_thread->arg = arg; + + return pthread_create (thread, attr, + plugin_thread_start, plugin_thread); +} /* int plugin_thread_create */ + /* vim: set sw=8 ts=8 noet fdm=marker : */