X-Git-Url: https://git.tokkee.org/?a=blobdiff_plain;f=src%2Fperl.c;h=96e85622445da09a7b9ad8deae514e215d279e6f;hb=ef493fef09db7227dcaedc2a3cae4a4a1ee4e1a9;hp=05e4b522d9c8c9fc7744b2ed6ea00effd5d463b7;hpb=cad67e6fcbaf2cad7703ef4f83a92dbd87d44cde;p=collectd.git diff --git a/src/perl.c b/src/perl.c index 05e4b522..96e85622 100644 --- a/src/perl.c +++ b/src/perl.c @@ -1,6 +1,6 @@ /** * collectd - src/perl.c - * Copyright (C) 2007 Sebastian Harl + * Copyright (C) 2007, 2008 Sebastian Harl * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -60,8 +60,9 @@ #define PLUGIN_WRITE 2 #define PLUGIN_SHUTDOWN 3 #define PLUGIN_LOG 4 +#define PLUGIN_NOTIF 5 -#define PLUGIN_TYPES 5 +#define PLUGIN_TYPES 6 #define PLUGIN_DATASET 255 @@ -76,6 +77,7 @@ void boot_DynaLoader (PerlInterpreter *, CV *); static XS (Collectd_plugin_register_ds); static XS (Collectd_plugin_unregister_ds); static XS (Collectd_plugin_dispatch_values); +static XS (Collectd_plugin_dispatch_notification); static XS (Collectd_plugin_log); static XS (Collectd_call_by_name); @@ -112,6 +114,9 @@ typedef struct { * point to the "base" thread */ static c_ithread_list_t *perl_threads = NULL; +/* the key used to store each pthread's ithread */ +static pthread_key_t perl_thr_key; + static int perl_argc = 0; static char **perl_argv = NULL; @@ -125,6 +130,8 @@ static struct { { "Collectd::plugin_register_data_set", Collectd_plugin_register_ds }, { "Collectd::plugin_unregister_data_set", Collectd_plugin_unregister_ds }, { "Collectd::plugin_dispatch_values", Collectd_plugin_dispatch_values }, + { "Collectd::plugin_dispatch_notification", + Collectd_plugin_dispatch_notification }, { "Collectd::plugin_log", Collectd_plugin_log }, { "Collectd::call_by_name", Collectd_call_by_name }, { "", NULL } @@ -140,6 +147,7 @@ struct { { "Collectd::TYPE_WRITE", PLUGIN_WRITE }, { "Collectd::TYPE_SHUTDOWN", PLUGIN_SHUTDOWN }, { "Collectd::TYPE_LOG", PLUGIN_LOG }, + { "Collectd::TYPE_NOTIF", PLUGIN_NOTIF }, { "Collectd::TYPE_DATASET", PLUGIN_DATASET }, { "Collectd::DS_TYPE_COUNTER", DS_TYPE_COUNTER }, { "Collectd::DS_TYPE_GAUGE", DS_TYPE_GAUGE }, @@ -148,9 +156,30 @@ struct { { "Collectd::LOG_NOTICE", LOG_NOTICE }, { "Collectd::LOG_INFO", LOG_INFO }, { "Collectd::LOG_DEBUG", LOG_DEBUG }, + { "Collectd::NOTIF_FAILURE", NOTIF_FAILURE }, + { "Collectd::NOTIF_WARNING", NOTIF_WARNING }, + { "Collectd::NOTIF_OKAY", NOTIF_OKAY }, { "", 0 } }; +struct { + char name[64]; + char *var; +} g_strings[] = +{ + { "Collectd::hostname_g", hostname_g }, + { "", NULL } +}; + +struct { + char name[64]; + int *var; +} g_integers[] = +{ + { "Collectd::interval_g", &interval_g }, + { "", NULL } +}; + /* * Helper functions for data type conversion. */ @@ -345,6 +374,43 @@ static int value_list2hv (pTHX_ value_list_t *vl, data_set_t *ds, HV *hash) return 0; } /* static int value2av (value_list_t *, data_set_t *, HV *) */ +static int notification2hv (pTHX_ notification_t *n, HV *hash) +{ + if (NULL == hv_store (hash, "severity", 8, newSViv (n->severity), 0)) + return -1; + + if (0 != n->time) + if (NULL == hv_store (hash, "time", 4, newSViv (n->time), 0)) + return -1; + + if ('\0' != *n->message) + if (NULL == hv_store (hash, "message", 7, newSVpv (n->message, 0), 0)) + return -1; + + if ('\0' != *n->host) + if (NULL == hv_store (hash, "host", 4, newSVpv (n->host, 0), 0)) + return -1; + + if ('\0' != *n->plugin) + if (NULL == hv_store (hash, "plugin", 6, newSVpv (n->plugin, 0), 0)) + return -1; + + if ('\0' != *n->plugin_instance) + if (NULL == hv_store (hash, "plugin_instance", 15, + newSVpv (n->plugin_instance, 0), 0)) + return -1; + + if ('\0' != *n->type) + if (NULL == hv_store (hash, "type", 4, newSVpv (n->type, 0), 0)) + return -1; + + if ('\0' != *n->type_instance) + if (NULL == hv_store (hash, "type_instance", 13, + newSVpv (n->type_instance, 0), 0)) + return -1; + return 0; +} /* static int notification2hv (notification_t *, HV *) */ + /* * Internal functions. */ @@ -355,7 +421,7 @@ static char *get_module_name (char *buf, size_t buf_len, const char *module) { status = snprintf (buf, buf_len, "%s", module); else status = snprintf (buf, buf_len, "%s::%s", base_name, module); - if ((status < 0) || (status >= buf_len)) + if ((status < 0) || ((unsigned int)status >= buf_len)) return (NULL); buf[buf_len - 1] = '\0'; return (buf); @@ -367,6 +433,7 @@ static char *get_module_name (char *buf, size_t buf_len, const char *module) { static int pplugin_register_data_set (pTHX_ char *name, AV *dataset) { int len = -1; + int ret = 0; int i = 0; data_source_t *ds = NULL; @@ -407,7 +474,12 @@ static int pplugin_register_data_set (pTHX_ char *name, AV *dataset) set->ds_num = len + 1; set->ds = ds; - return plugin_register_data_set (set); + + ret = plugin_register_data_set (set); + + free (ds); + free (set); + return ret; } /* static int pplugin_register_data_set (char *, SV *) */ /* @@ -489,8 +561,7 @@ static int pplugin_dispatch_values (pTHX_ char *name, HV *values) list.plugin[DATA_MAX_NAME_LEN - 1] = '\0'; } - if (NULL != (tmp = hv_fetch (values, - "plugin_instance", 15, 0))) { + if (NULL != (tmp = hv_fetch (values, "plugin_instance", 15, 0))) { strncpy (list.plugin_instance, SvPV_nolen (*tmp), DATA_MAX_NAME_LEN); list.plugin_instance[DATA_MAX_NAME_LEN - 1] = '\0'; } @@ -506,6 +577,71 @@ static int pplugin_dispatch_values (pTHX_ char *name, HV *values) return ret; } /* static int pplugin_dispatch_values (char *, HV *) */ +/* + * Dispatch a notification. + * + * notification: + * { + * severity => $severity, + * time => $time, + * message => $msg, + * host => $host, + * plugin => $plugin, + * type => $type, + * plugin_instance => $instance, + * type_instance => $type_instance + * } + */ +static int pplugin_dispatch_notification (pTHX_ HV *notif) +{ + notification_t n; + + SV **tmp = NULL; + + if (NULL == notif) + return -1; + + memset (&n, 0, sizeof (n)); + + if (NULL != (tmp = hv_fetch (notif, "severity", 8, 0))) + n.severity = SvIV (*tmp); + else + n.severity = NOTIF_FAILURE; + + if (NULL != (tmp = hv_fetch (notif, "time", 4, 0))) + n.time = (time_t)SvIV (*tmp); + else + n.time = time (NULL); + + if (NULL != (tmp = hv_fetch (notif, "message", 7, 0))) + strncpy (n.message, SvPV_nolen (*tmp), sizeof (n.message)); + n.message[sizeof (n.message) - 1] = '\0'; + + if (NULL != (tmp = hv_fetch (notif, "host", 4, 0))) + strncpy (n.host, SvPV_nolen (*tmp), sizeof (n.host)); + else + strncpy (n.host, hostname_g, sizeof (n.host)); + n.host[sizeof (n.host) - 1] = '\0'; + + if (NULL != (tmp = hv_fetch (notif, "plugin", 6, 0))) + strncpy (n.plugin, SvPV_nolen (*tmp), sizeof (n.plugin)); + n.plugin[sizeof (n.plugin) - 1] = '\0'; + + if (NULL != (tmp = hv_fetch (notif, "plugin_instance", 15, 0))) + strncpy (n.plugin_instance, SvPV_nolen (*tmp), + sizeof (n.plugin_instance)); + n.plugin_instance[sizeof (n.plugin_instance) - 1] = '\0'; + + if (NULL != (tmp = hv_fetch (notif, "type", 4, 0))) + strncpy (n.type, SvPV_nolen (*tmp), sizeof (n.type)); + n.type[sizeof (n.type) - 1] = '\0'; + + if (NULL != (tmp = hv_fetch (notif, "type_instance", 13, 0))) + strncpy (n.type_instance, SvPV_nolen (*tmp), sizeof (n.type_instance)); + n.type_instance[sizeof (n.type_instance) - 1] = '\0'; + return plugin_dispatch_notification (&n); +} /* static int pplugin_dispatch_notification (HV *) */ + /* * Call all working functions of the given type. */ @@ -564,11 +700,19 @@ static int pplugin_call_all (pTHX_ int type, ...) ds = va_arg (ap, data_set_t *); vl = va_arg (ap, value_list_t *); - if (-1 == data_set2av (aTHX_ ds, pds)) - return -1; + if (-1 == data_set2av (aTHX_ ds, pds)) { + av_clear (pds); + av_undef (pds); + pds = Nullav; + ret = -1; + } - if (-1 == value_list2hv (aTHX_ vl, ds, pvl)) - return -1; + if (-1 == value_list2hv (aTHX_ vl, ds, pvl)) { + hv_clear (pvl); + hv_undef (pvl); + pvl = Nullhv; + ret = -1; + } XPUSHs (sv_2mortal (newSVpv (ds->type, 0))); XPUSHs (sv_2mortal (newRV_noinc ((SV *)pds))); @@ -583,6 +727,34 @@ static int pplugin_call_all (pTHX_ int type, ...) XPUSHs (sv_2mortal (newSViv (va_arg (ap, int)))); XPUSHs (sv_2mortal (newSVpv (va_arg (ap, char *), 0))); } + else if (PLUGIN_NOTIF == type) { + /* + * $_[0] = + * { + * severity => $severity, + * time => $time, + * message => $msg, + * host => $host, + * plugin => $plugin, + * type => $type, + * plugin_instance => $instance, + * type_instance => $type_instance + * }; + */ + notification_t *n; + HV *notif = newHV (); + + n = va_arg (ap, notification_t *); + + if (-1 == notification2hv (aTHX_ n, notif)) { + hv_clear (notif); + hv_undef (notif); + notif = Nullhv; + ret = -1; + } + + XPUSHs (sv_2mortal (newRV_noinc ((SV *)notif))); + } PUTBACK; @@ -667,7 +839,7 @@ static XS (Collectd_plugin_unregister_ds) log_debug ("Collectd::plugin_unregister_data_set: type = \"%s\"", SvPV_nolen (ST (0))); - if (0 == pplugin_unregister_data_set (SvPV_nolen (ST (1)))) + if (0 == pplugin_unregister_data_set (SvPV_nolen (ST (0)))) XSRETURN_YES; else XSRETURN_EMPTY; @@ -718,6 +890,43 @@ static XS (Collectd_plugin_dispatch_values) XSRETURN_EMPTY; } /* static XS (Collectd_plugin_dispatch_values) */ +/* + * Collectd::plugin_dispatch_notification (notif). + * + * notif: + * notification to dispatch + */ +static XS (Collectd_plugin_dispatch_notification) +{ + SV *notif = NULL; + + int ret = 0; + + dXSARGS; + + if (1 != items) { + log_err ("Usage: Collectd::plugin_dispatch_notification(notif)"); + XSRETURN_EMPTY; + } + + log_debug ("Collectd::plugin_dispatch_notification: notif = \"%s\"", + SvPV_nolen (ST (0))); + + notif = ST (0); + + if (! (SvROK (notif) && (SVt_PVHV == SvTYPE (SvRV (notif))))) { + log_err ("Collectd::plugin_dispatch_notification: Invalid notif."); + XSRETURN_EMPTY; + } + + ret = pplugin_dispatch_notification (aTHX_ (HV *)SvRV (notif)); + + if (0 == ret) + XSRETURN_YES; + else + XSRETURN_EMPTY; +} /* static XS (Collectd_plugin_dispatch_notification) */ + /* * Collectd::plugin_log (level, message). * @@ -775,10 +984,68 @@ static XS (Collectd_call_by_name) * This has been inspired by Perl's ithreads introduced in version 5.6.0. */ +/* must be called with perl_threads->mutex locked */ +static void c_ithread_destroy (c_ithread_t *ithread) +{ + dTHXa (ithread->interp); + + assert (NULL != perl_threads); + + PERL_SET_CONTEXT (aTHX); + log_debug ("Shutting down Perl interpreter %p...", aTHX); + +#if COLLECT_DEBUG + sv_report_used (); + + --perl_threads->number_of_threads; +#endif /* COLLECT_DEBUG */ + + perl_destruct (aTHX); + perl_free (aTHX); + + if (NULL == ithread->prev) + perl_threads->head = ithread->next; + else + ithread->prev->next = ithread->next; + + if (NULL == ithread->next) + perl_threads->tail = ithread->prev; + else + ithread->next->prev = ithread->prev; + + sfree (ithread); + return; +} /* static void c_ithread_destroy (c_ithread_t *) */ + +static void c_ithread_destructor (void *arg) +{ + c_ithread_t *ithread = (c_ithread_t *)arg; + c_ithread_t *t = NULL; + + if (NULL == perl_threads) + return; + + pthread_mutex_lock (&perl_threads->mutex); + + for (t = perl_threads->head; NULL != t; t = t->next) + if (t == ithread) + break; + + /* the ithread no longer exists */ + if (NULL == t) + return; + + c_ithread_destroy (ithread); + + pthread_mutex_unlock (&perl_threads->mutex); + return; +} /* static void c_ithread_destructor (void *) */ + /* must be called with perl_threads->mutex locked */ static c_ithread_t *c_ithread_create (PerlInterpreter *base) { c_ithread_t *t = NULL; + dTHXa (NULL); assert (NULL != perl_threads); @@ -789,6 +1056,14 @@ static c_ithread_t *c_ithread_create (PerlInterpreter *base) ? NULL : perl_clone (base, CLONEf_KEEP_PTR_TABLE); + aTHX = t->interp; + + if (NULL != base) { + av_clear (PL_endav); + av_undef (PL_endav); + PL_endav = Nullav; + } + #if COLLECT_DEBUG ++perl_threads->number_of_threads; #endif /* COLLECT_DEBUG */ @@ -805,6 +1080,8 @@ static c_ithread_t *c_ithread_create (PerlInterpreter *base) } perl_threads->tail = t; + + pthread_setspecific (perl_thr_key, (const void *)t); return t; } /* static c_ithread_t *c_ithread_create (PerlInterpreter *) */ @@ -829,7 +1106,7 @@ static int perl_init (void) aTHX = t->interp; } - log_debug ("perl_init: c_ithread: interp = %p (active threads: %i)\n", + log_debug ("perl_init: c_ithread: interp = %p (active threads: %i)", aTHX, perl_threads->number_of_threads); return pplugin_call_all (aTHX_ PLUGIN_INIT); } /* static int perl_init (void) */ @@ -851,7 +1128,7 @@ static int perl_read (void) aTHX = t->interp; } - log_debug ("perl_read: c_ithread: interp = %p (active threads: %i)\n", + log_debug ("perl_read: c_ithread: interp = %p (active threads: %i)", aTHX, perl_threads->number_of_threads); return pplugin_call_all (aTHX_ PLUGIN_READ); } /* static int perl_read (void) */ @@ -873,7 +1150,7 @@ static int perl_write (const data_set_t *ds, const value_list_t *vl) aTHX = t->interp; } - log_debug ("perl_write: c_ithread: interp = %p (active threads: %i)\n", + log_debug ("perl_write: c_ithread: interp = %p (active threads: %i)", aTHX, perl_threads->number_of_threads); return pplugin_call_all (aTHX_ PLUGIN_WRITE, ds, vl); } /* static int perl_write (const data_set_t *, const value_list_t *) */ @@ -899,6 +1176,25 @@ static void perl_log (int level, const char *msg) return; } /* static void perl_log (int, const char *) */ +static int perl_notify (const notification_t *notif) +{ + dTHX; + + if (NULL == perl_threads) + return 0; + + if (NULL == aTHX) { + c_ithread_t *t = NULL; + + pthread_mutex_lock (&perl_threads->mutex); + t = c_ithread_create (perl_threads->head->interp); + pthread_mutex_unlock (&perl_threads->mutex); + + aTHX = t->interp; + } + return pplugin_call_all (aTHX_ PLUGIN_NOTIF, notif); +} /* static int perl_notify (const notification_t *) */ + static int perl_shutdown (void) { c_ithread_t *t = NULL; @@ -922,10 +1218,11 @@ static int perl_shutdown (void) aTHX = t->interp; } - log_debug ("perl_shutdown: c_ithread: interp = %p (active threads: %i)\n", + log_debug ("perl_shutdown: c_ithread: interp = %p (active threads: %i)", aTHX, perl_threads->number_of_threads); plugin_unregister_log ("perl"); + plugin_unregister_notification ("perl"); plugin_unregister_init ("perl"); plugin_unregister_read ("perl"); plugin_unregister_write ("perl"); @@ -936,35 +1233,72 @@ static int perl_shutdown (void) t = perl_threads->tail; while (NULL != t) { - aTHX = t->interp; - PERL_SET_CONTEXT (aTHX); - -#if COLLECT_DEBUG - sv_report_used (); -#endif /* COLLECT_DEBUG */ - - perl_destruct (aTHX); - perl_free (aTHX); + c_ithread_t *thr = t; + /* the pointer has to be advanced before destroying + * the thread as this will free the memory */ t = t->prev; - sfree (t); + c_ithread_destroy (thr); } pthread_mutex_unlock (&perl_threads->mutex); + pthread_mutex_destroy (&perl_threads->mutex); sfree (perl_threads); + pthread_key_delete (perl_thr_key); + PERL_SYS_TERM (); plugin_unregister_shutdown ("perl"); return ret; } /* static void perl_shutdown (void) */ +/* + * Access functions for global variables. + * + * These functions implement the "magic" used to access + * the global variables from Perl. + */ + +static int g_pv_get (pTHX_ SV *var, MAGIC *mg) +{ + char *pv = mg->mg_ptr; + sv_setpv (var, pv); + return 0; +} /* static int g_pv_get (pTHX_ SV *, MAGIC *) */ + +static int g_pv_set (pTHX_ SV *var, MAGIC *mg) +{ + char *pv = mg->mg_ptr; + strncpy (pv, SvPV_nolen (var), DATA_MAX_NAME_LEN); + pv[DATA_MAX_NAME_LEN - 1] = '\0'; + return 0; +} /* static int g_pv_set (pTHX_ SV *, MAGIC *) */ + +static int g_iv_get (pTHX_ SV *var, MAGIC *mg) +{ + int *iv = (int *)mg->mg_ptr; + sv_setiv (var, *iv); + return 0; +} /* static int g_iv_get (pTHX_ SV *, MAGIC *) */ + +static int g_iv_set (pTHX_ SV *var, MAGIC *mg) +{ + int *iv = (int *)mg->mg_ptr; + *iv = (int)SvIV (var); + return 0; +} /* static int g_iv_set (pTHX_ SV *, MAGIC *) */ + +static MGVTBL g_pv_vtbl = { g_pv_get, g_pv_set, NULL, NULL, NULL, NULL, NULL }; +static MGVTBL g_iv_vtbl = { g_iv_get, g_iv_set, NULL, NULL, NULL, NULL, NULL }; + /* bootstrap the Collectd module */ static void xs_init (pTHX) { HV *stash = NULL; + SV *tmp = NULL; char *file = __FILE__; int i = 0; @@ -983,6 +1317,25 @@ static void xs_init (pTHX) /* export "constants" */ for (i = 0; '\0' != constants[i].name[0]; ++i) newCONSTSUB (stash, constants[i].name, newSViv (constants[i].value)); + + /* export global variables + * by adding "magic" to the SV's representing the globale variables + * perl is able to automagically call the get/set function when + * accessing any such variable (this is basically the same as using + * tie() in Perl) */ + /* global strings */ + for (i = 0; '\0' != g_strings[i].name[0]; ++i) { + tmp = get_sv (g_strings[i].name, 1); + sv_magicext (tmp, NULL, PERL_MAGIC_ext, &g_pv_vtbl, + g_strings[i].var, 0); + } + + /* global integers */ + for (i = 0; '\0' != g_integers[i].name[0]; ++i) { + tmp = get_sv (g_integers[i].name, 1); + sv_magicext (tmp, NULL, PERL_MAGIC_ext, &g_iv_vtbl, + (char *)g_integers[i].var, 0); + } return; } /* static void xs_init (pTHX) */ @@ -1004,6 +1357,13 @@ static int init_pi (int argc, char **argv) } #endif /* COLLECT_DEBUG */ + if (0 != pthread_key_create (&perl_thr_key, c_ithread_destructor)) { + log_err ("init_pi: pthread_key_create failed"); + + /* this must not happen - cowardly giving up if it does */ + exit (1); + } + PERL_SYS_INIT3 (&argc, &argv, &environ); perl_threads = (c_ithread_list_t *)smalloc (sizeof (c_ithread_list_t)); @@ -1018,7 +1378,7 @@ static int init_pi (int argc, char **argv) perl_threads->tail = perl_threads->head; if (NULL == (perl_threads->head->interp = perl_alloc ())) { - log_err ("module_register: Not enough memory."); + log_err ("init_pi: Not enough memory."); exit (3); } @@ -1030,7 +1390,7 @@ static int init_pi (int argc, char **argv) PL_exit_flags |= PERL_EXIT_DESTRUCT_END; if (0 != perl_parse (aTHX_ xs_init, argc, argv, NULL)) { - log_err ("module_register: Unable to bootstrap Collectd."); + log_err ("init_pi: Unable to bootstrap Collectd."); exit (1); } @@ -1040,6 +1400,7 @@ static int init_pi (int argc, char **argv) perl_run (aTHX); plugin_register_log ("perl", perl_log); + plugin_register_notification ("perl", perl_notify); plugin_register_init ("perl", perl_init); plugin_register_read ("perl", perl_read); @@ -1059,8 +1420,10 @@ static int perl_config_loadplugin (pTHX_ oconfig_item_t *ci) char *value = NULL; if ((0 != ci->children_num) || (1 != ci->values_num) - || (OCONFIG_TYPE_STRING != ci->values[0].type)) + || (OCONFIG_TYPE_STRING != ci->values[0].type)) { + log_err ("LoadPlugin expects a single string argument."); return 1; + } value = ci->values[0].value.string; @@ -1089,8 +1452,10 @@ static int perl_config_basename (pTHX_ oconfig_item_t *ci) char *value = NULL; if ((0 != ci->children_num) || (1 != ci->values_num) - || (OCONFIG_TYPE_STRING != ci->values[0].type)) + || (OCONFIG_TYPE_STRING != ci->values[0].type)) { + log_err ("BaseName expects a single string argument."); return 1; + } value = ci->values[0].value.string; @@ -1108,8 +1473,15 @@ static int perl_config_enabledebugger (pTHX_ oconfig_item_t *ci) char *value = NULL; if ((0 != ci->children_num) || (1 != ci->values_num) - || (OCONFIG_TYPE_STRING != ci->values[0].type)) + || (OCONFIG_TYPE_STRING != ci->values[0].type)) { + log_err ("EnableDebugger expects a single string argument."); + return 1; + } + + if (NULL != perl_threads) { + log_warn ("EnableDebugger has no effects if used after LoadPlugin."); return 1; + } value = ci->values[0].value.string; @@ -1142,8 +1514,10 @@ static int perl_config_includedir (pTHX_ oconfig_item_t *ci) char *value = NULL; if ((0 != ci->children_num) || (1 != ci->values_num) - || (OCONFIG_TYPE_STRING != ci->values[0].type)) + || (OCONFIG_TYPE_STRING != ci->values[0].type)) { + log_err ("IncludeDir expects a single string argument."); return 1; + } value = ci->values[0].value.string;