summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: 90760db)
raw | patch | inline | side by side (parent: 90760db)
author | Sebastian Harl <sh@tokkee.org> | |
Mon, 16 Feb 2009 13:59:57 +0000 (14:59 +0100) | ||
committer | Sebastian Harl <sh@tokkee.org> | |
Mon, 16 Feb 2009 14:22:05 +0000 (15:22 +0100) |
fc_register() may now be used to register matches and targets with collectd.
The function takes three arguments: type, name, proc. 'type' has to be any of
FC_MATCH or FC_TARGET, 'name' specifies the name of the target / match and
'proc' is a hash of callbacks (specified by name just like any other
callbacks). There are three types of callbacks: create, destroy and match /
invoke - the former two being optional.
The match / invoke callback has to return any of the following statuses:
FC_MATCH_NO_MATCH, FC_MATCH_MATCHES, FC_TARGET_CONTINUE, FC_TARGET_STOP or
FC_TARGET_RETURN. If none of those constants are used, the behavior is
undefined.
The function takes three arguments: type, name, proc. 'type' has to be any of
FC_MATCH or FC_TARGET, 'name' specifies the name of the target / match and
'proc' is a hash of callbacks (specified by name just like any other
callbacks). There are three types of callbacks: create, destroy and match /
invoke - the former two being optional.
The match / invoke callback has to return any of the following statuses:
FC_MATCH_NO_MATCH, FC_MATCH_MATCHES, FC_TARGET_CONTINUE, FC_TARGET_STOP or
FC_TARGET_RETURN. If none of those constants are used, the behavior is
undefined.
bindings/perl/Collectd.pm | patch | blob | history | |
src/perl.c | patch | blob | history |
index 6e53b0e70e033bca363cad78a444dbff43ed4a24..007367d179bca4e281a1422b116778fe51e207f0 100644 (file)
LOG_INFO
LOG_DEBUG
) ],
+ 'filter_chain' => [ qw(
+ fc_register
+ FC_MATCH_NO_MATCH
+ FC_MATCH_MATCHES
+ FC_TARGET_CONTINUE
+ FC_TARGET_STOP
+ FC_TARGET_RETURN
+ ) ],
+ 'fc_types' => [ qw(
+ FC_MATCH
+ FC_TARGET
+ ) ],
'notif' => [ qw(
NOTIF_FAILURE
NOTIF_WARNING
Exporter::export_ok_tags ('all');
my @plugins : shared = ();
+my @fc_plugins : shared = ();
my %cf_callbacks : shared = ();
my %types = (
TYPE_FLUSH, "flush"
);
+my %fc_types = (
+ FC_MATCH, "match",
+ FC_TARGET, "target"
+);
+
+my %fc_exec_names = (
+ FC_MATCH, "match",
+ FC_TARGET, "invoke"
+);
+
foreach my $type (keys %types) {
$plugins[$type] = &share ({});
}
+foreach my $type (keys %fc_types) {
+ $fc_plugins[$type] = &share ({});
+}
+
sub _log {
my $caller = shift;
my $lvl = shift;
plugin_flush (timeout => $timeout);
}
+sub fc_call {
+ my $type = shift;
+ my $name = shift;
+ my $cb_type = shift;
+
+ my %proc;
+
+ our $cb_name = undef;
+ my $status;
+
+ if (! ((defined $type) && (defined $name) && (defined $cb_type))) {
+ ERROR ("Usage: Collectd::fc_call(type, name, cb_type, ...)");
+ return;
+ }
+
+ if (! defined $fc_plugins[$type]) {
+ ERROR ("Collectd::fc_call: Invalid type \"$type\"");
+ return;
+ }
+
+ if (! defined $fc_plugins[$type]->{$name}) {
+ ERROR ("Collectd::fc_call: Unknown "
+ . ($type == FC_MATCH ? "match" : "target")
+ . " \"$name\"");
+ return;
+ }
+
+ DEBUG ("Collectd::fc_call: "
+ . "type = \"$type\", name = \"$name\", cb_type = \"$cb_type\"");
+
+ {
+ lock %{$fc_plugins[$type]};
+ %proc = %{$fc_plugins[$type]->{$name}};
+ }
+
+ if (FC_CB_EXEC == $cb_type) {
+ $cb_name = $proc{$fc_exec_names{$type}};
+ }
+ elsif (FC_CB_CREATE == $cb_type) {
+ if (defined $proc{'create'}) {
+ $cb_name = $proc{'create'};
+ }
+ else {
+ return 1;
+ }
+ }
+ elsif (FC_CB_DESTROY == $cb_type) {
+ if (defined $proc{'destroy'}) {
+ $cb_name = $proc{'destroy'};
+ }
+ else {
+ return 1;
+ }
+ }
+
+ $status = call_by_name (@_);
+
+ if ($status < 0) {
+ my $err = undef;
+
+ if ($@) {
+ $err = $@;
+ }
+ else {
+ $err = "callback returned false";
+ }
+
+ ERROR ("Execution of fc callback \"$cb_name\" failed: $err");
+ return;
+ }
+ return $status;
+}
+
+sub fc_register {
+ my $type = shift;
+ my $name = shift;
+ my $proc = shift;
+
+ my %fc : shared;
+
+ DEBUG ("Collectd::fc_register: "
+ . "type = \"$type\", name = \"$name\", proc = \"$proc\"");
+
+ if (! ((defined $type) && (defined $name) && (defined $proc))) {
+ ERROR ("Usage: Collectd::fc_register(type, name, proc)");
+ return;
+ }
+
+ if (! defined $fc_plugins[$type]) {
+ ERROR ("Collectd::fc_register: Invalid type \"$type\"");
+ return;
+ }
+
+ if (("HASH" ne ref ($proc)) || (! defined $proc->{$fc_exec_names{$type}})
+ || ("" ne ref ($proc->{$fc_exec_names{$type}}))) {
+ ERROR ("Collectd::fc_register: Invalid proc.");
+ return;
+ }
+
+ for my $p (qw( create destroy )) {
+ if ((defined $proc->{$p}) && ("" ne ref ($proc->{$p}))) {
+ ERROR ("Collectd::fc_register: Invalid proc.");
+ return;
+ }
+ }
+
+ %fc = %$proc;
+
+ foreach my $p (keys %fc) {
+ my $pkg = scalar caller;
+
+ if ($p !~ m/^(create|destroy|$fc_exec_names{$type})$/) {
+ next;
+ }
+
+ if ($fc{$p} !~ m/^$pkg\:\:/) {
+ $fc{$p} = $pkg . "::" . $fc{$p};
+ }
+ }
+
+ lock %{$fc_plugins[$type]};
+ if (defined $fc_plugins[$type]->{$name}) {
+ WARNING ("Collectd::fc_register: Overwriting previous "
+ . "definition of match \"$name\".");
+ }
+
+ if (! _fc_register ($type, $name)) {
+ ERROR ("Collectd::fc_register: Failed to register \"$name\".");
+ return;
+ }
+
+ $fc_plugins[$type]->{$name} = \%fc;
+ return 1;
+}
+
sub _plugin_dispatch_config {
my $plugin = shift;
my $config = shift;
diff --git a/src/perl.c b/src/perl.c
index 9f91d0181013a7e8493a7526f3405b989b793617..142e9c0eb34aaf4524d40f85ed5cc4d14d89222c 100644 (file)
--- a/src/perl.c
+++ b/src/perl.c
#include "plugin.h"
#include "common.h"
+#include "filter_chain.h"
+
#include <pthread.h>
#if !defined(USE_ITHREADS)
#define PLUGIN_CONFIG 254
#define PLUGIN_DATASET 255
+#define FC_MATCH 0
+#define FC_TARGET 1
+
+#define FC_TYPES 2
+
+#define FC_CB_CREATE 0
+#define FC_CB_DESTROY 1
+#define FC_CB_EXEC 2
+
+#define FC_CB_TYPES 3
+
#define log_debug(...) DEBUG ("perl: " __VA_ARGS__)
#define log_info(...) INFO ("perl: " __VA_ARGS__)
#define log_warn(...) WARNING ("perl: " __VA_ARGS__)
static XS (Collectd__plugin_flush);
static XS (Collectd_plugin_dispatch_notification);
static XS (Collectd_plugin_log);
+static XS (Collectd__fc_register);
static XS (Collectd_call_by_name);
/*
pthread_mutex_t mutex;
} c_ithread_list_t;
+/* name / user_data for Perl matches / targets */
+typedef struct {
+ char *name;
+ SV *user_data;
+} pfc_user_data_t;
+
+#define PFC_USER_DATA_FREE(data) \
+ do { \
+ sfree ((data)->name); \
+ if (NULL != (data)->user_data) \
+ sv_free ((data)->user_data); \
+ sfree (data); \
+ } while (0)
+
/*
* private variables
*/
{ "Collectd::plugin_dispatch_notification",
Collectd_plugin_dispatch_notification },
{ "Collectd::plugin_log", Collectd_plugin_log },
+ { "Collectd::_fc_register", Collectd__fc_register },
{ "Collectd::call_by_name", Collectd_call_by_name },
{ "", NULL }
};
int value;
} constants[] =
{
- { "Collectd::TYPE_INIT", PLUGIN_INIT },
- { "Collectd::TYPE_READ", PLUGIN_READ },
- { "Collectd::TYPE_WRITE", PLUGIN_WRITE },
- { "Collectd::TYPE_SHUTDOWN", PLUGIN_SHUTDOWN },
- { "Collectd::TYPE_LOG", PLUGIN_LOG },
- { "Collectd::TYPE_NOTIF", PLUGIN_NOTIF },
- { "Collectd::TYPE_FLUSH", PLUGIN_FLUSH },
- { "Collectd::TYPE_CONFIG", PLUGIN_CONFIG },
- { "Collectd::TYPE_DATASET", PLUGIN_DATASET },
- { "Collectd::DS_TYPE_COUNTER", DS_TYPE_COUNTER },
- { "Collectd::DS_TYPE_GAUGE", DS_TYPE_GAUGE },
- { "Collectd::LOG_ERR", LOG_ERR },
- { "Collectd::LOG_WARNING", LOG_WARNING },
- { "Collectd::LOG_NOTICE", LOG_NOTICE },
- { "Collectd::LOG_INFO", LOG_INFO },
- { "Collectd::LOG_DEBUG", LOG_DEBUG },
- { "Collectd::NOTIF_FAILURE", NOTIF_FAILURE },
- { "Collectd::NOTIF_WARNING", NOTIF_WARNING },
- { "Collectd::NOTIF_OKAY", NOTIF_OKAY },
+ { "Collectd::TYPE_INIT", PLUGIN_INIT },
+ { "Collectd::TYPE_READ", PLUGIN_READ },
+ { "Collectd::TYPE_WRITE", PLUGIN_WRITE },
+ { "Collectd::TYPE_SHUTDOWN", PLUGIN_SHUTDOWN },
+ { "Collectd::TYPE_LOG", PLUGIN_LOG },
+ { "Collectd::TYPE_NOTIF", PLUGIN_NOTIF },
+ { "Collectd::TYPE_FLUSH", PLUGIN_FLUSH },
+ { "Collectd::TYPE_CONFIG", PLUGIN_CONFIG },
+ { "Collectd::TYPE_DATASET", PLUGIN_DATASET },
+ { "Collectd::DS_TYPE_COUNTER", DS_TYPE_COUNTER },
+ { "Collectd::DS_TYPE_GAUGE", DS_TYPE_GAUGE },
+ { "Collectd::LOG_ERR", LOG_ERR },
+ { "Collectd::LOG_WARNING", LOG_WARNING },
+ { "Collectd::LOG_NOTICE", LOG_NOTICE },
+ { "Collectd::LOG_INFO", LOG_INFO },
+ { "Collectd::LOG_DEBUG", LOG_DEBUG },
+ { "Collectd::FC_MATCH", FC_MATCH },
+ { "Collectd::FC_TARGET", FC_TARGET },
+ { "Collectd::FC_CB_CREATE", FC_CB_CREATE },
+ { "Collectd::FC_CB_DESTROY", FC_CB_DESTROY },
+ { "Collectd::FC_CB_EXEC", FC_CB_EXEC },
+ { "Collectd::FC_MATCH_NO_MATCH", FC_MATCH_NO_MATCH },
+ { "Collectd::FC_MATCH_MATCHES", FC_MATCH_MATCHES },
+ { "Collectd::FC_TARGET_CONTINUE", FC_TARGET_CONTINUE },
+ { "Collectd::FC_TARGET_STOP", FC_TARGET_STOP },
+ { "Collectd::FC_TARGET_RETURN", FC_TARGET_RETURN },
+ { "Collectd::NOTIF_FAILURE", NOTIF_FAILURE },
+ { "Collectd::NOTIF_WARNING", NOTIF_WARNING },
+ { "Collectd::NOTIF_OKAY", NOTIF_OKAY },
{ "", 0 }
};
return ret;
} /* static int pplugin_call_all (int, ...) */
+/*
+ * collectd's perl interpreter based thread implementation.
+ *
+ * This has been inspired by Perl's ithreads introduced in version 5.6.0.
+ */
+
+/* must be called with perl_threads->mutex locked */
+static void c_ithread_destroy (c_ithread_t *ithread)
+{
+ dTHXa (ithread->interp);
+
+ assert (NULL != perl_threads);
+
+ PERL_SET_CONTEXT (aTHX);
+ log_debug ("Shutting down Perl interpreter %p...", aTHX);
+
+#if COLLECT_DEBUG
+ sv_report_used ();
+
+ --perl_threads->number_of_threads;
+#endif /* COLLECT_DEBUG */
+
+ perl_destruct (aTHX);
+ perl_free (aTHX);
+
+ if (NULL == ithread->prev)
+ perl_threads->head = ithread->next;
+ else
+ ithread->prev->next = ithread->next;
+
+ if (NULL == ithread->next)
+ perl_threads->tail = ithread->prev;
+ else
+ ithread->next->prev = ithread->prev;
+
+ sfree (ithread);
+ return;
+} /* static void c_ithread_destroy (c_ithread_t *) */
+
+static void c_ithread_destructor (void *arg)
+{
+ c_ithread_t *ithread = (c_ithread_t *)arg;
+ c_ithread_t *t = NULL;
+
+ if (NULL == perl_threads)
+ return;
+
+ pthread_mutex_lock (&perl_threads->mutex);
+
+ for (t = perl_threads->head; NULL != t; t = t->next)
+ if (t == ithread)
+ break;
+
+ /* the ithread no longer exists */
+ if (NULL == t)
+ return;
+
+ c_ithread_destroy (ithread);
+
+ pthread_mutex_unlock (&perl_threads->mutex);
+ return;
+} /* static void c_ithread_destructor (void *) */
+
+/* must be called with perl_threads->mutex locked */
+static c_ithread_t *c_ithread_create (PerlInterpreter *base)
+{
+ c_ithread_t *t = NULL;
+ dTHXa (NULL);
+
+ assert (NULL != perl_threads);
+
+ t = (c_ithread_t *)smalloc (sizeof (c_ithread_t));
+ memset (t, 0, sizeof (c_ithread_t));
+
+ t->interp = (NULL == base)
+ ? NULL
+ : perl_clone (base, CLONEf_KEEP_PTR_TABLE);
+
+ aTHX = t->interp;
+
+ if ((NULL != base) && (NULL != PL_endav)) {
+ av_clear (PL_endav);
+ av_undef (PL_endav);
+ PL_endav = Nullav;
+ }
+
+#if COLLECT_DEBUG
+ ++perl_threads->number_of_threads;
+#endif /* COLLECT_DEBUG */
+
+ t->next = NULL;
+
+ if (NULL == perl_threads->tail) {
+ perl_threads->head = t;
+ t->prev = NULL;
+ }
+ else {
+ perl_threads->tail->next = t;
+ t->prev = perl_threads->tail;
+ }
+
+ perl_threads->tail = t;
+
+ pthread_setspecific (perl_thr_key, (const void *)t);
+ return t;
+} /* static c_ithread_t *c_ithread_create (PerlInterpreter *) */
+
+/*
+ * Filter chains implementation.
+ */
+
+static int fc_call (pTHX_ int type, int cb_type, pfc_user_data_t *data, ...)
+{
+ int retvals = 0;
+
+ va_list ap;
+ int ret = 0;
+
+ notification_meta_t **meta = NULL;
+ AV *pmeta = NULL;
+
+ dSP;
+
+ if ((type < 0) || (type >= FC_TYPES))
+ return -1;
+
+ if ((cb_type < 0) || (cb_type >= FC_CB_TYPES))
+ return -1;
+
+ va_start (ap, data);
+
+ ENTER;
+ SAVETMPS;
+
+ PUSHMARK (SP);
+
+ XPUSHs (sv_2mortal (newSViv ((IV)type)));
+ XPUSHs (sv_2mortal (newSVpv (data->name, 0)));
+ XPUSHs (sv_2mortal (newSViv ((IV)cb_type)));
+
+ if (FC_CB_CREATE == cb_type) {
+ /*
+ * $_[0] = $ci;
+ * $_[1] = $user_data;
+ */
+ oconfig_item_t *ci;
+ HV *config = newHV ();
+
+ ci = va_arg (ap, oconfig_item_t *);
+
+ if (0 != oconfig_item2hv (aTHX_ ci, config)) {
+ hv_clear (config);
+ hv_undef (config);
+ config = (HV *)&PL_sv_undef;
+ ret = -1;
+ }
+
+ XPUSHs (sv_2mortal (newRV_noinc ((SV *)config)));
+ }
+ else if (FC_CB_DESTROY == cb_type) {
+ /*
+ * $_[1] = $user_data;
+ */
+
+ /* nothing to be done - the user data pointer
+ * is pushed onto the stack later */
+ }
+ else if (FC_CB_EXEC == cb_type) {
+ /*
+ * $_[0] = $ds;
+ * $_[1] = $vl;
+ * $_[2] = $meta;
+ * $_[3] = $user_data;
+ */
+ data_set_t *ds;
+ value_list_t *vl;
+
+ AV *pds = newAV ();
+ HV *pvl = newHV ();
+
+ ds = va_arg (ap, data_set_t *);
+ vl = va_arg (ap, value_list_t *);
+ meta = va_arg (ap, notification_meta_t **);
+
+ if (0 != data_set2av (aTHX_ ds, pds)) {
+ av_clear (pds);
+ av_undef (pds);
+ pds = (AV *)&PL_sv_undef;
+ ret = -1;
+ }
+
+ if (0 != value_list2hv (aTHX_ vl, ds, pvl)) {
+ hv_clear (pvl);
+ hv_undef (pvl);
+ pvl = (HV *)&PL_sv_undef;
+ ret = -1;
+ }
+
+ if (NULL != meta) {
+ pmeta = newAV ();
+
+ if (0 != notification_meta2av (aTHX_ *meta, pmeta)) {
+ av_clear (pmeta);
+ av_undef (pmeta);
+ pmeta = (AV *)&PL_sv_undef;
+ ret = -1;
+ }
+ }
+ else {
+ pmeta = (AV *)&PL_sv_undef;
+ }
+
+ XPUSHs (sv_2mortal (newRV_noinc ((SV *)pds)));
+ XPUSHs (sv_2mortal (newRV_noinc ((SV *)pvl)));
+ XPUSHs (sv_2mortal (newRV_noinc ((SV *)pmeta)));
+ }
+
+ XPUSHs (sv_2mortal (newRV_inc (data->user_data)));
+
+ PUTBACK;
+
+ retvals = call_pv ("Collectd::fc_call", G_SCALAR);
+
+ if ((FC_CB_EXEC == cb_type) && (meta != NULL)) {
+ assert (pmeta != NULL);
+
+ plugin_notification_meta_free (*meta);
+ av2notification_meta (aTHX_ pmeta, meta);
+ }
+
+ SPAGAIN;
+ if (0 < retvals) {
+ SV *tmp = POPs;
+
+ /* the exec callbacks return a status, while
+ * the others return a boolean value */
+ if (FC_CB_EXEC == cb_type)
+ ret = SvIV (tmp);
+ else if (! SvTRUE (tmp))
+ ret = -1;
+ }
+
+ PUTBACK;
+ FREETMPS;
+ LEAVE;
+
+ va_end (ap);
+ return ret;
+} /* static int fc_call (int, int, pfc_user_data_t *, ...) */
+
+static int fc_create (int type, const oconfig_item_t *ci, void **user_data)
+{
+ pfc_user_data_t *data;
+
+ int ret = 0;
+
+ dTHX;
+
+ if (NULL == perl_threads)
+ return 0;
+
+ if (NULL == aTHX) {
+ c_ithread_t *t = NULL;
+
+ pthread_mutex_lock (&perl_threads->mutex);
+ t = c_ithread_create (perl_threads->head->interp);
+ pthread_mutex_unlock (&perl_threads->mutex);
+
+ aTHX = t->interp;
+ }
+
+ log_debug ("fc_create: c_ithread: interp = %p (active threads: %i)",
+ aTHX, perl_threads->number_of_threads);
+
+ if ((1 != ci->values_num)
+ || (OCONFIG_TYPE_STRING != ci->values[0].type)) {
+ log_warn ("A \"%s\" block expects a single string argument.",
+ (FC_MATCH == type) ? "Match" : "Target");
+ return -1;
+ }
+
+ data = (pfc_user_data_t *)smalloc (sizeof (*data));
+ data->name = sstrdup (ci->values[0].value.string);
+ data->user_data = newSV (0);
+
+ ret = fc_call (aTHX_ type, FC_CB_CREATE, data, ci);
+
+ if (0 != ret)
+ PFC_USER_DATA_FREE (data);
+ else
+ *user_data = data;
+ return ret;
+} /* static int fc_create (int, const oconfig_item_t *, void **) */
+
+static int fc_destroy (int type, void **user_data)
+{
+ pfc_user_data_t *data = *(pfc_user_data_t **)user_data;
+
+ int ret = 0;
+
+ dTHX;
+
+ if ((NULL == perl_threads) || (NULL == data))
+ return 0;
+
+ if (NULL == aTHX) {
+ c_ithread_t *t = NULL;
+
+ pthread_mutex_lock (&perl_threads->mutex);
+ t = c_ithread_create (perl_threads->head->interp);
+ pthread_mutex_unlock (&perl_threads->mutex);
+
+ aTHX = t->interp;
+ }
+
+ log_debug ("fc_destroy: c_ithread: interp = %p (active threads: %i)",
+ aTHX, perl_threads->number_of_threads);
+
+ ret = fc_call (aTHX_ type, FC_CB_DESTROY, data);
+
+ PFC_USER_DATA_FREE (data);
+ *user_data = NULL;
+ return ret;
+} /* static int fc_destroy (int, void **) */
+
+static int fc_exec (int type, const data_set_t *ds, const value_list_t *vl,
+ notification_meta_t **meta, void **user_data)
+{
+ pfc_user_data_t *data = *(pfc_user_data_t **)user_data;
+
+ dTHX;
+
+ if (NULL == perl_threads)
+ return 0;
+
+ assert (NULL != data);
+
+ if (NULL == aTHX) {
+ c_ithread_t *t = NULL;
+
+ pthread_mutex_lock (&perl_threads->mutex);
+ t = c_ithread_create (perl_threads->head->interp);
+ pthread_mutex_unlock (&perl_threads->mutex);
+
+ aTHX = t->interp;
+ }
+
+ log_debug ("fc_exec: c_ithread: interp = %p (active threads: %i)",
+ aTHX, perl_threads->number_of_threads);
+
+ return fc_call (aTHX_ type, FC_CB_EXEC, data, ds, vl, meta);
+} /* static int fc_exec (int, const data_set_t *, const value_list_t *,
+ notification_meta_t **, void **) */
+
+static int pmatch_create (const oconfig_item_t *ci, void **user_data)
+{
+ return fc_create (FC_MATCH, ci, user_data);
+} /* static int pmatch_create (const oconfig_item_t *, void **) */
+
+static int pmatch_destroy (void **user_data)
+{
+ return fc_destroy (FC_MATCH, user_data);
+} /* static int pmatch_destroy (void **) */
+
+static int pmatch_match (const data_set_t *ds, const value_list_t *vl,
+ notification_meta_t **meta, void **user_data)
+{
+ return fc_exec (FC_MATCH, ds, vl, meta, user_data);
+} /* static int pmatch_match (const data_set_t *, const value_list_t *,
+ notification_meta_t **, void **) */
+
+static match_proc_t pmatch = {
+ pmatch_create, pmatch_destroy, pmatch_match
+};
+
+static int ptarget_create (const oconfig_item_t *ci, void **user_data)
+{
+ return fc_create (FC_TARGET, ci, user_data);
+} /* static int ptarget_create (const oconfig_item_t *, void **) */
+
+static int ptarget_destroy (void **user_data)
+{
+ return fc_destroy (FC_TARGET, user_data);
+} /* static int ptarget_destroy (void **) */
+
+static int ptarget_invoke (const data_set_t *ds, value_list_t *vl,
+ notification_meta_t **meta, void **user_data)
+{
+ return fc_exec (FC_TARGET, ds, vl, meta, user_data);
+} /* static int ptarget_invoke (const data_set_t *, value_list_t *,
+ notification_meta_t **, void **) */
+
+static target_proc_t ptarget = {
+ ptarget_create, ptarget_destroy, ptarget_invoke
+};
+
/*
* Exported Perl API.
*/
XSRETURN_YES;
} /* static XS (Collectd_plugin_log) */
+/*
+ * Collectd::_fc_register (type, name)
+ *
+ * type:
+ * match | target
+ *
+ * name:
+ * name of the match
+ */
+static XS (Collectd__fc_register)
+{
+ int type;
+ char *name;
+
+ int ret = 0;
+
+ dXSARGS;
+
+ if (2 != items) {
+ log_err ("Usage: Collectd::_fc_register(type, name)");
+ XSRETURN_EMPTY;
+ }
+
+ type = SvIV (ST (0));
+ name = SvPV_nolen (ST (1));
+
+ if (FC_MATCH == type)
+ ret = fc_register_match (name, pmatch);
+ else if (FC_TARGET == type)
+ ret = fc_register_target (name, ptarget);
+
+ if (0 == ret)
+ XSRETURN_YES;
+ else
+ XSRETURN_EMPTY;
+} /* static XS (Collectd_fc_register) */
+
/*
* Collectd::call_by_name (...).
*
call_pv (name, 0);
} /* static XS (Collectd_call_by_name) */
-/*
- * collectd's perl interpreter based thread implementation.
- *
- * This has been inspired by Perl's ithreads introduced in version 5.6.0.
- */
-
-/* must be called with perl_threads->mutex locked */
-static void c_ithread_destroy (c_ithread_t *ithread)
-{
- dTHXa (ithread->interp);
-
- assert (NULL != perl_threads);
-
- PERL_SET_CONTEXT (aTHX);
- log_debug ("Shutting down Perl interpreter %p...", aTHX);
-
-#if COLLECT_DEBUG
- sv_report_used ();
-
- --perl_threads->number_of_threads;
-#endif /* COLLECT_DEBUG */
-
- perl_destruct (aTHX);
- perl_free (aTHX);
-
- if (NULL == ithread->prev)
- perl_threads->head = ithread->next;
- else
- ithread->prev->next = ithread->next;
-
- if (NULL == ithread->next)
- perl_threads->tail = ithread->prev;
- else
- ithread->next->prev = ithread->prev;
-
- sfree (ithread);
- return;
-} /* static void c_ithread_destroy (c_ithread_t *) */
-
-static void c_ithread_destructor (void *arg)
-{
- c_ithread_t *ithread = (c_ithread_t *)arg;
- c_ithread_t *t = NULL;
-
- if (NULL == perl_threads)
- return;
-
- pthread_mutex_lock (&perl_threads->mutex);
-
- for (t = perl_threads->head; NULL != t; t = t->next)
- if (t == ithread)
- break;
-
- /* the ithread no longer exists */
- if (NULL == t)
- return;
-
- c_ithread_destroy (ithread);
-
- pthread_mutex_unlock (&perl_threads->mutex);
- return;
-} /* static void c_ithread_destructor (void *) */
-
-/* must be called with perl_threads->mutex locked */
-static c_ithread_t *c_ithread_create (PerlInterpreter *base)
-{
- c_ithread_t *t = NULL;
- dTHXa (NULL);
-
- assert (NULL != perl_threads);
-
- t = (c_ithread_t *)smalloc (sizeof (c_ithread_t));
- memset (t, 0, sizeof (c_ithread_t));
-
- t->interp = (NULL == base)
- ? NULL
- : perl_clone (base, CLONEf_KEEP_PTR_TABLE);
-
- aTHX = t->interp;
-
- if ((NULL != base) && (NULL != PL_endav)) {
- av_clear (PL_endav);
- av_undef (PL_endav);
- PL_endav = Nullav;
- }
-
-#if COLLECT_DEBUG
- ++perl_threads->number_of_threads;
-#endif /* COLLECT_DEBUG */
-
- t->next = NULL;
-
- if (NULL == perl_threads->tail) {
- perl_threads->head = t;
- t->prev = NULL;
- }
- else {
- perl_threads->tail->next = t;
- t->prev = perl_threads->tail;
- }
-
- perl_threads->tail = t;
-
- pthread_setspecific (perl_thr_key, (const void *)t);
- return t;
-} /* static c_ithread_t *c_ithread_create (PerlInterpreter *) */
-
/*
* Interface to collectd.
*/