From bdbde949ad0bd244574efcec596675e1ef889c43 Mon Sep 17 00:00:00 2001 From: Sebastian Harl Date: Thu, 2 Feb 2012 07:44:48 +0100 Subject: [PATCH] plugin: Introduced basic support for per-plugin intervals. This is based on a newly introduced "plugin context", which stores plugin- related settings (currently the plugin interval) for each registered callback. The context is initialized when loading the plugin (LoadPlugin), setting the interval to the value of the newly introduced "Interval" option of the LoadPlugin block, if specified. The context is set (using thread-specific data) and restored before and after calling any callback. This way, single plugins don't need to take special care in order to benefit from the new feature. plugin.c will handle (most) situations automatically. VALUE_LIST_INIT() has been changed to honor the plugin interval settings (if any). As a helper, the new 'plugin_interval' macro may be used to access the current plugin's interval (read-only). Thanks to Florian for the initial idea! --- src/collectd.c | 2 + src/configfile.c | 52 +++++++++++++++- src/configfile.h | 3 + src/plugin.c | 155 ++++++++++++++++++++++++++++++++++++++++++++++- src/plugin.h | 21 ++++++- 5 files changed, 229 insertions(+), 4 deletions(-) diff --git a/src/collectd.c b/src/collectd.c index d33d1d66..c3dc7381 100644 --- a/src/collectd.c +++ b/src/collectd.c @@ -468,6 +468,8 @@ int main (int argc, char **argv) if (optind < argc) exit_usage (1); + plugin_init_ctx (); + /* * Read options from the config file, the environment and the command * line (in that order, with later options overwriting previous ones in diff --git a/src/configfile.c b/src/configfile.c index 7c8347b1..2a03035c 100644 --- a/src/configfile.c +++ b/src/configfile.c @@ -46,6 +46,7 @@ typedef struct cf_callback int (*callback) (const char *, const char *); const char **keys; int keys_num; + plugin_ctx_t ctx; struct cf_callback *next; } cf_callback_t; @@ -53,6 +54,7 @@ typedef struct cf_complex_callback_s { char *type; int (*callback) (oconfig_item_t *); + plugin_ctx_t ctx; struct cf_complex_callback_s *next; } cf_complex_callback_t; @@ -128,6 +130,7 @@ static int cf_dispatch (const char *type, const char *orig_key, const char *orig_value) { cf_callback_t *cf_cb; + plugin_ctx_t old_ctx; char *key; char *value; int ret; @@ -156,6 +159,8 @@ static int cf_dispatch (const char *type, const char *orig_key, ret = -1; + old_ctx = plugin_set_ctx (cf_cb->ctx); + for (i = 0; i < cf_cb->keys_num; i++) { if ((cf_cb->keys[i] != NULL) @@ -166,6 +171,8 @@ static int cf_dispatch (const char *type, const char *orig_key, } } + plugin_set_ctx (old_ctx); + if (i >= cf_cb->keys_num) WARNING ("Plugin `%s' did not register for value `%s'.", type, key); @@ -244,6 +251,7 @@ static int dispatch_loadplugin (const oconfig_item_t *ci) int i; const char *name; unsigned int flags = 0; + plugin_ctx_t ctx; assert (strcasecmp (ci->key, "LoadPlugin") == 0); if (ci->values_num != 1) @@ -252,6 +260,7 @@ static int dispatch_loadplugin (const oconfig_item_t *ci) return (-1); name = ci->values[0].value.string; + ctx.interval = 0; /* * XXX: Magic at work: @@ -271,6 +280,16 @@ static int dispatch_loadplugin (const oconfig_item_t *ci) for (i = 0; i < ci->children_num; ++i) { if (strcasecmp("Globals", ci->children[i].key) == 0) cf_util_get_flag (ci->children + i, &flags, PLUGIN_FLAGS_GLOBAL); + else if (strcasecmp ("Interval", ci->children[i].key) == 0) { + double interval = 0.0; + + if (cf_util_get_double (ci->children + i, &interval) != 0) { + /* cf_util_get_double will log an error */ + continue; + } + + ctx.interval = DOUBLE_TO_CDTIME_T (interval); + } else { WARNING("Ignoring unknown LoadPlugin option \"%s\" " "for plugin \"%s\"", @@ -278,6 +297,7 @@ static int dispatch_loadplugin (const oconfig_item_t *ci) } } + plugin_set_ctx (ctx); return (plugin_load (name, (uint32_t) flags)); } /* int dispatch_value_loadplugin */ @@ -357,8 +377,18 @@ static int dispatch_block_plugin (oconfig_item_t *ci) /* Check for a complex callback first */ for (cb = complex_callback_head; cb != NULL; cb = cb->next) + { if (strcasecmp (name, cb->type) == 0) - return (cb->callback (ci)); + { + plugin_ctx_t old_ctx; + int ret_val; + + old_ctx = plugin_set_ctx (cb->ctx); + ret_val = (cb->callback (ci)); + plugin_set_ctx (old_ctx); + return (ret_val); + } + } /* Hm, no complex plugin found. Dispatch the values one by one */ for (i = 0; i < ci->children_num; i++) @@ -884,6 +914,7 @@ void cf_register (const char *type, cf_cb->callback = callback; cf_cb->keys = keys; cf_cb->keys_num = keys_num; + cf_cb->ctx = plugin_get_ctx (); cf_cb->next = first_callback; first_callback = cf_cb; @@ -907,6 +938,8 @@ int cf_register_complex (const char *type, int (*callback) (oconfig_item_t *)) new->callback = callback; new->next = NULL; + new->ctx = plugin_get_ctx (); + if (complex_callback_head == NULL) { complex_callback_head = new; @@ -1015,6 +1048,23 @@ int cf_util_get_int (const oconfig_item_t *ci, int *ret_value) /* {{{ */ return (0); } /* }}} int cf_util_get_int */ +int cf_util_get_double (const oconfig_item_t *ci, double *ret_value) /* {{{ */ +{ + if ((ci == NULL) || (ret_value == NULL)) + return (EINVAL); + + if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_NUMBER)) + { + ERROR ("cf_util_get_double: The %s option requires " + "exactly one numeric argument.", ci->key); + return (-1); + } + + *ret_value = ci->values[0].value.number; + + return (0); +} /* }}} int cf_util_get_double */ + int cf_util_get_boolean (const oconfig_item_t *ci, _Bool *ret_bool) /* {{{ */ { if ((ci == NULL) || (ret_bool == NULL)) diff --git a/src/configfile.h b/src/configfile.h index e63a0ea0..59ea5542 100644 --- a/src/configfile.h +++ b/src/configfile.h @@ -100,6 +100,9 @@ int cf_util_get_string_buffer (const oconfig_item_t *ci, char *buffer, /* Assures the config option is a number and returns it as an int. */ int cf_util_get_int (const oconfig_item_t *ci, int *ret_value); +/* Assures the config option is a number and returns it as a double. */ +int cf_util_get_double (const oconfig_item_t *ci, double *ret_value); + /* Assures the config option is a boolean and assignes it to `ret_bool'. * Otherwise, `ret_bool' is not changed and non-zero is returned. */ int cf_util_get_boolean (const oconfig_item_t *ci, _Bool *ret_bool); diff --git a/src/plugin.c b/src/plugin.c index 91c40b6b..7d66bd87 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -45,6 +45,7 @@ struct callback_func_s { void *cf_callback; user_data_t cf_udata; + plugin_ctx_t cf_ctx; }; typedef struct callback_func_s callback_func_t; @@ -57,6 +58,7 @@ struct read_func_s * The `rf_super' member MUST be the first one in this structure! */ #define rf_callback rf_super.cf_callback #define rf_udata rf_super.cf_udata +#define rf_ctx rf_super.cf_ctx callback_func_t rf_super; char rf_group[DATA_MAX_NAME_LEN]; char rf_name[DATA_MAX_NAME_LEN]; @@ -93,6 +95,8 @@ static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER; static pthread_t *read_threads = NULL; static int read_threads_num = 0; +static pthread_key_t plugin_ctx_key; + /* * Static functions */ @@ -246,6 +250,8 @@ static int create_register_callback (llist_t **list, /* {{{ */ cf->cf_udata = *ud; } + cf->cf_ctx = plugin_get_ctx (); + return (register_callback (list, name, cf)); } /* }}} int create_register_callback */ @@ -346,6 +352,7 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) while (read_loop != 0) { read_func_t *rf; + plugin_ctx_t old_ctx; cdtime_t now; int status; int rf_type; @@ -423,6 +430,8 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) DEBUG ("plugin_read_thread: Handling `%s'.", rf->rf_name); + old_ctx = plugin_set_ctx (rf->rf_ctx); + if (rf_type == RF_SIMPLE) { int (*callback) (void); @@ -440,6 +449,8 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) status = (*callback) (&rf->rf_udata); } + plugin_set_ctx (old_ctx); + /* If the function signals failure, we will increase the * intervals in which it will be called. */ if (status != 0) @@ -773,12 +784,39 @@ static int plugin_insert_read (read_func_t *rf) return (0); } /* int plugin_insert_read */ +static int read_cb_wrapper (user_data_t *ud) +{ + int (*callback) (void); + + if (ud == NULL) + return -1; + + callback = ud->data; + return callback(); +} /* int read_cb_wrapper */ + int plugin_register_read (const char *name, int (*callback) (void)) { read_func_t *rf; + plugin_ctx_t ctx = plugin_get_ctx (); int status; + if (ctx.interval != 0) { + struct timespec interval; + user_data_t user_data; + + DEBUG ("plugin_register_read: plugin_interval = %.3f", + CDTIME_T_TO_DOUBLE(plugin_interval)); + + user_data.data = callback; + user_data.free_func = NULL; + + CDTIME_T_TO_TIMESPEC (ctx.interval, &interval); + return plugin_register_complex_read (/* group = */ NULL, + name, read_cb_wrapper, &interval, &user_data); + } + rf = malloc (sizeof (*rf)); if (rf == NULL) { @@ -790,6 +828,7 @@ int plugin_register_read (const char *name, rf->rf_callback = (void *) callback; rf->rf_udata.data = NULL; rf->rf_udata.free_func = NULL; + rf->rf_ctx = ctx; rf->rf_group[0] = '\0'; sstrncpy (rf->rf_name, name, sizeof (rf->rf_name)); rf->rf_type = RF_SIMPLE; @@ -810,6 +849,7 @@ int plugin_register_complex_read (const char *group, const char *name, user_data_t *user_data) { read_func_t *rf; + plugin_ctx_t ctx = plugin_get_ctx (); int status; rf = malloc (sizeof (*rf)); @@ -831,6 +871,10 @@ int plugin_register_complex_read (const char *group, const char *name, { rf->rf_interval = *interval; } + else if (ctx.interval != 0) + { + CDTIME_T_TO_TIMESPEC (ctx.interval, &rf->rf_interval); + } rf->rf_effective_interval = rf->rf_interval; /* Set user data */ @@ -844,6 +888,8 @@ int plugin_register_complex_read (const char *group, const char *name, rf->rf_udata = *user_data; } + rf->rf_ctx = ctx; + status = plugin_insert_read (rf); if (status != 0) sfree (rf); @@ -1121,10 +1167,13 @@ void plugin_init_all (void) { callback_func_t *cf; plugin_init_cb callback; + plugin_ctx_t old_ctx; cf = le->value; + old_ctx = plugin_set_ctx (cf->cf_ctx); callback = cf->cf_callback; status = (*callback) (); + plugin_set_ctx (old_ctx); if (status != 0) { @@ -1177,11 +1226,14 @@ int plugin_read_all_once (void) while (42) { read_func_t *rf; + plugin_ctx_t old_ctx; rf = c_heap_get_root (read_heap); if (rf == NULL) break; + old_ctx = plugin_set_ctx (rf->rf_ctx); + if (rf->rf_type == RF_SIMPLE) { int (*callback) (void); @@ -1197,6 +1249,8 @@ int plugin_read_all_once (void) status = (*callback) (&rf->rf_udata); } + plugin_set_ctx (old_ctx); + if (status != 0) { NOTICE ("read-function of plugin `%s' failed.", @@ -1242,6 +1296,7 @@ int plugin_write (const char *plugin, /* {{{ */ { callback_func_t *cf = le->value; plugin_write_cb callback; + plugin_ctx_t old_ctx = plugin_set_ctx (cf->cf_ctx); DEBUG ("plugin: plugin_write: Writing values via %s.", le->key); callback = cf->cf_callback; @@ -1251,6 +1306,8 @@ int plugin_write (const char *plugin, /* {{{ */ else success++; + plugin_set_ctx (old_ctx); + le = le->next; } @@ -1263,6 +1320,7 @@ int plugin_write (const char *plugin, /* {{{ */ { callback_func_t *cf; plugin_write_cb callback; + plugin_ctx_t old_ctx; le = llist_head (list_write); while (le != NULL) @@ -1278,9 +1336,13 @@ int plugin_write (const char *plugin, /* {{{ */ cf = le->value; + old_ctx = plugin_set_ctx (cf->cf_ctx); + DEBUG ("plugin: plugin_write: Writing values via %s.", le->key); callback = cf->cf_callback; status = (*callback) (ds, vl, &cf->cf_udata); + + plugin_set_ctx (old_ctx); } return (status); @@ -1298,6 +1360,7 @@ int plugin_flush (const char *plugin, cdtime_t timeout, const char *identifier) { callback_func_t *cf; plugin_flush_cb callback; + plugin_ctx_t old_ctx; if ((plugin != NULL) && (strcmp (plugin, le->key) != 0)) @@ -1307,10 +1370,13 @@ int plugin_flush (const char *plugin, cdtime_t timeout, const char *identifier) } cf = le->value; + old_ctx = plugin_set_ctx (cf->cf_ctx); callback = cf->cf_callback; (*callback) (timeout, identifier, &cf->cf_udata); + plugin_set_ctx (old_ctx); + le = le->next; } return (0); @@ -1343,8 +1409,10 @@ void plugin_shutdown_all (void) { callback_func_t *cf; plugin_shutdown_cb callback; + plugin_ctx_t old_ctx; cf = le->value; + old_ctx = plugin_set_ctx (cf->cf_ctx); callback = cf->cf_callback; /* Advance the pointer before calling the callback allows @@ -1354,6 +1422,8 @@ void plugin_shutdown_all (void) le = le->next; (*callback) (); + + plugin_set_ctx (old_ctx); } /* Write plugins which use the `user_data' pointer usually need the @@ -1382,12 +1452,15 @@ int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */ { callback_func_t *cf; plugin_missing_cb callback; + plugin_ctx_t old_ctx; int status; cf = le->value; + old_ctx = plugin_set_ctx (cf->cf_ctx); callback = cf->cf_callback; status = (*callback) (vl, &cf->cf_udata); + plugin_set_ctx (old_ctx); if (status != 0) { if (status < 0) @@ -1462,8 +1535,14 @@ int plugin_dispatch_values (value_list_t *vl) if (vl->time == 0) vl->time = cdtime (); - if (vl->interval <= 0) - vl->interval = interval_g; + if (vl->interval <= 0) { + plugin_ctx_t ctx = plugin_get_ctx (); + + if (ctx.interval != 0) + vl->interval = ctx.interval; + else + vl->interval = interval_g; + } DEBUG ("plugin_dispatch_values: time = %.3f; interval = %.3f; " "host = %s; " @@ -1651,11 +1730,14 @@ int plugin_dispatch_notification (const notification_t *notif) { callback_func_t *cf; plugin_notification_cb callback; + plugin_ctx_t old_ctx; int status; cf = le->value; + old_ctx = plugin_set_ctx (cf->cf_ctx); callback = cf->cf_callback; status = (*callback) (notif, &cf->cf_udata); + plugin_set_ctx (old_ctx); if (status != 0) { WARNING ("plugin_dispatch_notification: Notification " @@ -1696,12 +1778,15 @@ void plugin_log (int level, const char *format, ...) { callback_func_t *cf; plugin_log_cb callback; + plugin_ctx_t old_ctx; cf = le->value; + old_ctx = plugin_set_ctx (cf->cf_ctx); callback = cf->cf_callback; (*callback) (level, msg, &cf->cf_udata); + plugin_set_ctx (old_ctx); le = le->next; } } /* void plugin_log */ @@ -1894,4 +1979,70 @@ int plugin_notification_meta_free (notification_meta_t *n) return (0); } /* int plugin_notification_meta_free */ +static void plugin_ctx_destructor (void *ctx) +{ + sfree (ctx); +} /* void plugin_ctx_destructor */ + +static plugin_ctx_t ctx_init = { /* interval = */ 0 }; + +static plugin_ctx_t *plugin_ctx_create (void) +{ + plugin_ctx_t *ctx; + + ctx = malloc (sizeof (*ctx)); + if (ctx == NULL) { + char errbuf[1024]; + ERROR ("Failed to allocate plugin context: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + return NULL; + } + + *ctx = ctx_init; + pthread_setspecific (plugin_ctx_key, ctx); + DEBUG("Created new plugin context."); + return (ctx); +} /* int plugin_ctx_create */ + +void plugin_init_ctx (void) +{ + pthread_key_create (&plugin_ctx_key, plugin_ctx_destructor); +} /* void plugin_init_ctx */ + +plugin_ctx_t plugin_get_ctx (void) +{ + plugin_ctx_t *ctx; + + ctx = pthread_getspecific (plugin_ctx_key); + + if (ctx == NULL) { + ctx = plugin_ctx_create (); + /* this must no happen -- exit() instead? */ + if (ctx == NULL) + return ctx_init; + } + + return (*ctx); +} /* plugin_ctx_t plugin_get_ctx */ + +plugin_ctx_t plugin_set_ctx (plugin_ctx_t ctx) +{ + plugin_ctx_t *c; + plugin_ctx_t old; + + c = pthread_getspecific (plugin_ctx_key); + + if (c == NULL) { + c = plugin_ctx_create (); + /* this must no happen -- exit() instead? */ + if (c == NULL) + return ctx_init; + } + + old = *c; + *c = ctx; + + return (old); +} /* void plugin_set_ctx */ + /* vim: set sw=8 ts=8 noet fdm=marker : */ diff --git a/src/plugin.h b/src/plugin.h index 86d40340..a2f7f098 100644 --- a/src/plugin.h +++ b/src/plugin.h @@ -65,6 +65,8 @@ #define NOTIF_WARNING 2 #define NOTIF_OKAY 4 +#define plugin_interval (plugin_get_ctx().interval) + /* * Public data types */ @@ -97,7 +99,9 @@ struct value_list_s }; typedef struct value_list_s value_list_t; -#define VALUE_LIST_INIT { NULL, 0, 0, interval_g, "localhost", "", "", "", "", NULL } +#define VALUE_LIST_INIT { NULL, 0, 0, \ + (plugin_interval > 0) ? plugin_interval : interval_g, \ + "localhost", "", "", "", "", NULL } #define VALUE_LIST_STATIC { NULL, 0, 0, 0, "localhost", "", "", "", "", NULL } struct data_source_s @@ -161,6 +165,12 @@ struct user_data_s }; typedef struct user_data_s user_data_t; +struct plugin_ctx_s +{ + cdtime_t interval; +}; +typedef struct plugin_ctx_s plugin_ctx_t; + /* * Callback types */ @@ -359,4 +369,13 @@ int plugin_notification_meta_copy (notification_t *dst, int plugin_notification_meta_free (notification_meta_t *n); +/* + * Plugin context management. + */ + +void plugin_init_ctx (void); + +plugin_ctx_t plugin_get_ctx (void); +plugin_ctx_t plugin_set_ctx (plugin_ctx_t ctx); + #endif /* PLUGIN_H */ -- 2.30.2