diff --git a/src/perl.c b/src/perl.c
index 78e508ae4d1f1bb7c0b9ae5c9ca340b9685e79e6..682ff83284e6ddbe7b4fca0f236073b9f7f22f09 100644 (file)
--- a/src/perl.c
+++ b/src/perl.c
typedef struct c_ithread_s {
/* the thread's Perl interpreter */
PerlInterpreter *interp;
+ _Bool running; /* thread is inside pi */
+ _Bool shutdown;
+ pthread_t pthread;
/* double linked list of threads */
struct c_ithread_s *prev;
#endif /* COLLECT_DEBUG */
pthread_mutex_t mutex;
+ pthread_mutexattr_t mutexattr;
} c_ithread_list_t;
/* name / user_data for Perl matches / targets */
if (NULL == (tmp = hv_fetch (hash, "value", 5, 0))) {
log_warn ("av2notification_meta: Skipping invalid "
"meta information.");
- free ((*m)->name);
free (*m);
continue;
}
{
int retvals = 0;
+ _Bool old_running;
va_list ap;
int ret = 0;
dSP;
+ c_ithread_t *t = (c_ithread_t *)pthread_getspecific(perl_thr_key);
+ if (t == NULL) /* thread destroyed ( c_ithread_destroy*() -> log_debug() ) */
+ return 0;
+
+ old_running = t->running;
+ t->running = 1;
+
+ if (t->shutdown) {
+ t->running = old_running;
+ return 0;
+ }
+
if ((type < 0) || (type >= PLUGIN_TYPES))
return -1;
FREETMPS;
LEAVE;
+ t->running = old_running;
va_end (ap);
return ret;
} /* static int pplugin_call_all (int, ...) */
/* the ithread no longer exists */
if (NULL == t)
+ {
+ pthread_mutex_unlock (&perl_threads->mutex);
return;
+ }
c_ithread_destroy (ithread);
t->prev = perl_threads->tail;
}
+ t->pthread = pthread_self();
+ t->running = 0;
+ t->shutdown = 0;
perl_threads->tail = t;
pthread_setspecific (perl_thr_key, (const void *)t);
{
int retvals = 0;
+ _Bool old_running;
va_list ap;
int ret = 0;
dSP;
+ c_ithread_t *t = (c_ithread_t *)pthread_getspecific(perl_thr_key);
+ if (t == NULL) /* thread destroyed */
+ return 0;
+
+ old_running = t->running;
+ t->running = 1;
+
+ if (t->shutdown) {
+ t->running = old_running;
+ return 0;
+ }
+
if ((type < 0) || (type >= FC_TYPES))
return -1;
FREETMPS;
LEAVE;
+ t->running = old_running;
va_end (ap);
return ret;
} /* static int fc_call (int, int, pfc_user_data_t *, ...) */
values = ST (/* stack index = */ 0);
+ if (NULL == values)
+ XSRETURN_EMPTY;
+
/* Make sure the argument is a hash reference. */
if (! (SvROK (values) && (SVt_PVHV == SvTYPE (SvRV (values))))) {
log_err ("Collectd::plugin_dispatch_values: Invalid values.");
XSRETURN_EMPTY;
}
- if (NULL == values)
- XSRETURN_EMPTY;
-
ret = pplugin_dispatch_values (aTHX_ (HV *)SvRV (values));
if (0 == ret)
static int perl_init (void)
{
+ int status;
dTHX;
if (NULL == perl_threads)
log_debug ("perl_init: c_ithread: interp = %p (active threads: %i)",
aTHX, perl_threads->number_of_threads);
- return pplugin_call_all (aTHX_ PLUGIN_INIT);
+
+ /* Lock the base thread to avoid race conditions with c_ithread_create().
+ * See https://github.com/collectd/collectd/issues/9 and
+ * https://github.com/collectd/collectd/issues/1706 for details.
+ */
+ assert (aTHX == perl_threads->head->interp);
+ pthread_mutex_lock (&perl_threads->mutex);
+
+ status = pplugin_call_all (aTHX_ PLUGIN_INIT);
+
+ pthread_mutex_unlock (&perl_threads->mutex);
+
+ return status;
} /* static int perl_init (void) */
static int perl_read (void)
/* Lock the base thread if this is not called from one of the read threads
* to avoid race conditions with c_ithread_create(). See
- * https://github.com/collectd/collectd/issues/9 for details. */
+ * https://github.com/collectd/collectd/issues/9 for details.
+ */
+
if (aTHX == perl_threads->head->interp)
pthread_mutex_lock (&perl_threads->mutex);
return 0;
if (NULL == aTHX) {
- c_ithread_t *t = NULL;
+ t = NULL;
pthread_mutex_lock (&perl_threads->mutex);
t = c_ithread_create (perl_threads->head->interp);
t = perl_threads->tail;
while (NULL != t) {
+ struct timespec ts_wait;
c_ithread_t *thr = t;
/* the pointer has to be advanced before destroying
* the thread as this will free the memory */
t = t->prev;
+ thr->shutdown = 1;
+ if (thr->running) {
+ /* Give some time to thread to exit from pi */
+ WARNING ("perl shutdown: thread is running inside perl. Waiting.");
+ ts_wait.tv_sec = 0;
+ ts_wait.tv_nsec = 500000;
+ nanosleep (&ts_wait, NULL);
+ }
+ if (thr->running) {
+ /* This will crash collectd process later due to PERL_SYS_TERM() */
+ //ERROR ("perl shutdown: thread hangs inside perl. "
+ // "Skipped perl interpreter destroy.");
+ //continue;
+
+ ERROR ("perl shutdown: thread hangs inside perl. Thread killed.");
+ pthread_kill (thr->pthread, SIGTERM);
+ }
c_ithread_destroy (thr);
}
pthread_mutex_unlock (&perl_threads->mutex);
pthread_mutex_destroy (&perl_threads->mutex);
+ pthread_mutexattr_destroy (&perl_threads->mutexattr);
sfree (perl_threads);
perl_threads = (c_ithread_list_t *)smalloc (sizeof (c_ithread_list_t));
memset (perl_threads, 0, sizeof (c_ithread_list_t));
- pthread_mutex_init (&perl_threads->mutex, NULL);
+ pthread_mutexattr_init(&perl_threads->mutexattr);
+ pthread_mutexattr_settype(&perl_threads->mutexattr, PTHREAD_MUTEX_RECURSIVE);
+ pthread_mutex_init (&perl_threads->mutex, &perl_threads->mutexattr);
/* locking the mutex should not be necessary at this point
* but let's just do it for the sake of completeness */
pthread_mutex_lock (&perl_threads->mutex);
int current_status = 0;
if (NULL != perl_threads)
- aTHX = PERL_GET_CONTEXT;
+ {
+ if ((aTHX = PERL_GET_CONTEXT) == NULL)
+ return -1;
+ }
if (0 == strcasecmp (c->key, "LoadPlugin"))
current_status = perl_config_loadplugin (aTHX_ c);