From: Sebastian Harl Date: Tue, 13 Nov 2007 22:59:25 +0000 (+0100) Subject: perl plugin: Added basic multi-threading support. X-Git-Tag: collectd-4.3.0beta0~102 X-Git-Url: https://git.tokkee.org/?a=commitdiff_plain;h=c4bc5aa6fe93b7113f649869d9bf3e9fd7b12ff2;p=collectd.git perl plugin: Added basic multi-threading support. This patch maps each C thread, which accesses the perl plugin, to an interpreter based Perl thread. The implementation has been inspired by Perl's ithreads introduced in version 5.6.0 and should be compatible (as in: can be used side-by-side) with it. You can use threads::shared to share data structures between threads of either implementation. Signed-off-by: Sebastian Harl --- diff --git a/src/perl.c b/src/perl.c index ead6f4b0..889ab561 100644 --- a/src/perl.c +++ b/src/perl.c @@ -45,6 +45,8 @@ #include "plugin.h" #include "common.h" +#include + #if !defined(USE_ITHREADS) # error "Perl does not support ithreads!" #endif /* !defined(USE_ITHREADS) */ @@ -72,13 +74,35 @@ static XS (Collectd_plugin_unregister_ds); static XS (Collectd_plugin_dispatch_values); static XS (Collectd_plugin_log); +/* + * private data types + */ + +typedef struct c_ithread_s { + /* the thread's Perl interpreter */ + PerlInterpreter *interp; + + /* double linked list of threads */ + struct c_ithread_s *prev; + struct c_ithread_s *next; +} c_ithread_t; + +typedef struct { + c_ithread_t *head; + c_ithread_t *tail; + + pthread_mutex_t mutex; +} c_ithread_list_t; + /* * private variables */ -static PerlInterpreter *perl = NULL; +/* if perl_threads != NULL perl_threads->head must + * point to the "base" thread */ +static c_ithread_list_t *perl_threads = NULL; -static int perl_argc = 0; +static int perl_argc = 0; static char **perl_argv = NULL; static char base_name[DATA_MAX_NAME_LEN] = ""; @@ -705,58 +729,118 @@ static XS (Collectd_plugin_log) XSRETURN_YES; } /* static XS (Collectd_plugin_log) */ +/* + * collectd's perl interpreter based thread implementation. + * + * This has been inspired by Perl's ithreads introduced in version 5.6.0. + */ + +/* must be called with perl_threads->mutex locked */ +static c_ithread_t *c_ithread_create (PerlInterpreter *base) +{ + c_ithread_t *t = NULL; + + assert (NULL != perl_threads); + + t = (c_ithread_t *)smalloc (sizeof (c_ithread_t)); + memset (t, 0, sizeof (c_ithread_t)); + + t->interp = (NULL == base) + ? NULL + : perl_clone (base, CLONEf_KEEP_PTR_TABLE); + + t->next = NULL; + + if (NULL == perl_threads->tail) { + perl_threads->head = t; + t->prev = NULL; + } + else { + perl_threads->tail->next = t; + t->prev = perl_threads->tail; + } + + perl_threads->tail = t; + return t; +} /* static c_ithread_t *c_ithread_create (PerlInterpreter *) */ + /* * Interface to collectd. */ static int perl_init (void) { - dTHXa (NULL); + dTHX; - if (NULL == perl) + if (NULL == perl_threads) return 0; - PERL_SET_CONTEXT (perl); - aTHX = perl; + if (NULL == aTHX) { + c_ithread_t *t = NULL; + + pthread_mutex_lock (&perl_threads->mutex); + t = c_ithread_create (perl_threads->head->interp); + pthread_mutex_unlock (&perl_threads->mutex); + aTHX = t->interp; + } return pplugin_call_all (aTHX_ PLUGIN_INIT); } /* static int perl_init (void) */ static int perl_read (void) { - dTHXa (NULL); + dTHX; - if (NULL == perl) + if (NULL == perl_threads) return 0; - PERL_SET_CONTEXT (perl); - aTHX = perl; + if (NULL == aTHX) { + c_ithread_t *t = NULL; + + pthread_mutex_lock (&perl_threads->mutex); + t = c_ithread_create (perl_threads->head->interp); + pthread_mutex_unlock (&perl_threads->mutex); + aTHX = t->interp; + } return pplugin_call_all (aTHX_ PLUGIN_READ); } /* static int perl_read (void) */ static int perl_write (const data_set_t *ds, const value_list_t *vl) { - dTHXa (NULL); + dTHX; - if (NULL == perl) + if (NULL == perl_threads) return 0; - PERL_SET_CONTEXT (perl); - aTHX = perl; + if (NULL == aTHX) { + c_ithread_t *t = NULL; + + pthread_mutex_lock (&perl_threads->mutex); + t = c_ithread_create (perl_threads->head->interp); + pthread_mutex_unlock (&perl_threads->mutex); + aTHX = t->interp; + } return pplugin_call_all (aTHX_ PLUGIN_WRITE, ds, vl); } /* static int perl_write (const data_set_t *, const value_list_t *) */ static void perl_log (int level, const char *msg) { - dTHXa (NULL); + dTHX; - if (NULL == perl) + if (NULL == perl_threads) return; - PERL_SET_CONTEXT (perl); - aTHX = perl; + if (NULL == aTHX) { + c_ithread_t *t = NULL; + + pthread_mutex_lock (&perl_threads->mutex); + t = c_ithread_create (perl_threads->head->interp); + pthread_mutex_unlock (&perl_threads->mutex); + + aTHX = t->interp; + } pplugin_call_all (aTHX_ PLUGIN_LOG, level, msg); return; @@ -764,17 +848,26 @@ static void perl_log (int level, const char *msg) static int perl_shutdown (void) { + c_ithread_t *t = NULL; + int ret = 0; - dTHXa (NULL); + dTHX; plugin_unregister_complex_config ("perl"); - if (NULL == perl) + if (NULL == perl_threads) return 0; - PERL_SET_CONTEXT (perl); - aTHX = perl; + if (NULL == aTHX) { + c_ithread_t *t = NULL; + + pthread_mutex_lock (&perl_threads->mutex); + t = c_ithread_create (perl_threads->head->interp); + pthread_mutex_unlock (&perl_threads->mutex); + + aTHX = t->interp; + } plugin_unregister_log ("perl"); plugin_unregister_init ("perl"); @@ -783,13 +876,28 @@ static int perl_shutdown (void) ret = pplugin_call_all (aTHX_ PLUGIN_SHUTDOWN); + pthread_mutex_lock (&perl_threads->mutex); + t = perl_threads->tail; + + while (NULL != t) { + aTHX = t->interp; + PERL_SET_CONTEXT (aTHX); + #if COLLECT_DEBUG - sv_report_used (); + sv_report_used (); #endif /* COLLECT_DEBUG */ - perl_destruct (perl); - perl_free (perl); - perl = NULL; + perl_destruct (aTHX); + perl_free (aTHX); + + t = t->prev; + + sfree (t); + } + + pthread_mutex_unlock (&perl_threads->mutex); + + sfree (perl_threads); PERL_SYS_TERM (); @@ -827,7 +935,7 @@ static int init_pi (int argc, char **argv) { dTHXa (NULL); - if (NULL != perl) + if (NULL != perl_threads) return 0; log_info ("Initializing Perl interpreter..."); @@ -842,17 +950,30 @@ static int init_pi (int argc, char **argv) PERL_SYS_INIT3 (&argc, &argv, &environ); - if (NULL == (perl = perl_alloc ())) { + perl_threads = (c_ithread_list_t *)smalloc (sizeof (c_ithread_list_t)); + memset (perl_threads, 0, sizeof (c_ithread_list_t)); + + pthread_mutex_init (&perl_threads->mutex, NULL); + /* locking the mutex should not be necessary at this point + * but let's just do it for the sake of completeness */ + pthread_mutex_lock (&perl_threads->mutex); + + perl_threads->head = c_ithread_create (NULL); + perl_threads->tail = perl_threads->head; + + if (NULL == (perl_threads->head->interp = perl_alloc ())) { log_err ("module_register: Not enough memory."); exit (3); } - aTHX = perl; - perl_construct (perl); + aTHX = perl_threads->head->interp; + pthread_mutex_unlock (&perl_threads->mutex); + + perl_construct (aTHX); PL_exit_flags |= PERL_EXIT_DESTRUCT_END; - if (0 != perl_parse (perl, xs_init, argc, argv, NULL)) { + if (0 != perl_parse (aTHX_ xs_init, argc, argv, NULL)) { log_err ("module_register: Unable to bootstrap Collectd."); exit (1); } @@ -860,7 +981,7 @@ static int init_pi (int argc, char **argv) /* Set $0 to "collectd" because perl_parse() has to set it to "-e". */ sv_setpv (get_sv ("0", 0), "collectd"); - perl_run (perl); + perl_run (aTHX); plugin_register_log ("perl", perl_log); plugin_register_init ("perl", perl_init); @@ -893,7 +1014,10 @@ static int perl_config_loadplugin (pTHX_ oconfig_item_t *ci) } init_pi (perl_argc, perl_argv); - aTHX = perl; + assert (NULL != perl_threads); + assert (NULL != perl_threads->head); + + aTHX = perl_threads->head->interp; log_debug ("perl_config: loading perl plugin \"%s\"", value); load_module (PERL_LOADMOD_NOIMPORT, @@ -994,10 +1118,12 @@ static int perl_config (oconfig_item_t *ci) { int i = 0; - dTHXa (NULL); + dTHX; - PERL_SET_CONTEXT (perl); - aTHX = perl; + /* dTHX does not get any valid values in case Perl + * has not been initialized */ + if (NULL == perl_threads) + aTHX = NULL; for (i = 0; i < ci->children_num; ++i) { oconfig_item_t *c = ci->children + i;