author | Florian Forster <octo@collectd.org> | |
Wed, 7 Aug 2013 07:40:26 +0000 (09:40 +0200) | ||
committer | Florian Forster <octo@collectd.org> | |
Wed, 7 Aug 2013 07:40:26 +0000 (09:40 +0200) |
configure.in | patch | blob | history | |
src/Makefile.am | patch | blob | history | |
src/collectd.conf.in | patch | blob | history | |
src/collectd.conf.pod | patch | blob | history | |
src/rrdtool.c | patch | blob | history | |
src/sigrok.c | [new file with mode: 0644] | patch | blob |
src/statsd.c | [new file with mode: 0644] | patch | blob |
src/types.db | patch | blob | history | |
src/utils_latency.c | [new file with mode: 0644] | patch | blob |
src/utils_latency.h | [new file with mode: 0644] | patch | blob |
diff --git a/configure.in b/configure.in
index 27c6cd316b916a0047cfe389face9b5b40d0af9b..dbdc0589a0fee67ca9566f43c07ecb40012c3ae2 100644 (file)
--- a/configure.in
+++ b/configure.in
AM_CONDITIONAL(BUILD_WITH_LM_SENSORS, test "x$with_libsensors" = "xyes")
# }}}
+# --with-libsigrok {{{
+with_libsigrok_cflags=""
+with_libsigrok_ldflags=""
+AC_ARG_WITH(libsigrok, [AS_HELP_STRING([--with-libsigrok@<:@=PREFIX@:>@], [Path to libsigrok.])],
+[
+ if test "x$withval" = "xno"
+ then
+ with_libsigrok="no"
+ else
+ with_libsigrok="yes"
+ if test "x$withval" != "xyes"
+ then
+ with_libsigrok_cflags="-I$withval/include"
+ with_libsigrok_ldflags="-L$withval/lib"
+ fi
+ fi
+],[])
+
+# libsigrok has a glib dependency
+if test "x$with_libsigrok" = "xyes"
+then
+ if test -z "m4_ifdef([AM_PATH_GLIB_2_0], [yes], [])"
+ then
+ with_libsigrok="no (glib not available)"
+ else
+ AM_PATH_GLIB_2_0([2.28.0],
+ [with_libsigrok_cflags="$with_libsigrok_cflags $GLIB_CFLAGS"; with_libsigrok_ldflags="$with_libsigrok_ldflags $GLIB_LIBS"])
+ fi
+fi
+
+# libsigrok headers
+if test "x$with_libsigrok" = "xyes"
+then
+ SAVE_CPPFLAGS="$CPPFLAGS"
+ CPPFLAGS="$CPPFLAGS $with_libsigrok_cflags"
+
+ AC_CHECK_HEADERS(libsigrok/libsigrok.h, [], [with_libsigrok="no (libsigrok/libsigrok.h not found)"])
+
+ CPPFLAGS="$SAVE_CPPFLAGS"
+fi
+
+# libsigrok library
+if test "x$with_libsigrok" = "xyes"
+then
+ SAVE_CPPFLAGS="$CPPFLAGS"
+ SAVE_LDFLAGS="$LDFLAGS"
+ CPPFLAGS="$CPPFLAGS $with_libsigrok_cflags"
+ LDFLAGS="$LDFLAGS $with_libsigrok_ldflags"
+
+ AC_CHECK_LIB(sigrok, sr_init,
+ [
+ AC_DEFINE(HAVE_LIBSIGROK, 1, [Define to 1 if you have the sigrok library (-lsigrok).])
+ ],
+ [with_libsigrok="no (libsigrok not found)"])
+
+ CPPFLAGS="$SAVE_CPPFLAGS"
+ LDFLAGS="$SAVE_LDFLAGS"
+fi
+if test "x$with_libsigrok" = "xyes"
+then
+ BUILD_WITH_LIBSIGROK_CFLAGS="$with_libsigrok_cflags"
+ BUILD_WITH_LIBSIGROK_LDFLAGS="$with_libsigrok_ldflags"
+ AC_SUBST(BUILD_WITH_LIBSIGROK_CFLAGS)
+ AC_SUBST(BUILD_WITH_LIBSIGROK_LDFLAGS)
+fi
+AM_CONDITIONAL(BUILD_WITH_LIBSIGROK, test "x$with_libsigrok" = "xyes")
+# }}}
+
# --with-libstatgrab {{{
with_libstatgrab_cflags=""
with_libstatgrab_ldflags=""
AC_PLUGIN([rrdtool], [$with_librrd], [RRDTool output plugin])
AC_PLUGIN([sensors], [$with_libsensors], [lm_sensors statistics])
AC_PLUGIN([serial], [$plugin_serial], [serial port traffic])
+AC_PLUGIN([sigrok], [$with_libsigrok], [sigrok acquisition sources])
AC_PLUGIN([snmp], [$with_libnetsnmp], [SNMP querying plugin])
+AC_PLUGIN([statsd], [yes], [StatsD plugin])
AC_PLUGIN([swap], [$plugin_swap], [Swap usage statistics])
AC_PLUGIN([syslog], [$have_syslog], [Syslog logging plugin])
AC_PLUGIN([table], [yes], [Parsing of tabular data])
librouteros . . . . . $with_librouteros
librrd . . . . . . . $with_librrd
libsensors . . . . . $with_libsensors
+ libsigrok . . . . . $with_libsigrok
libstatgrab . . . . . $with_libstatgrab
libtokyotyrant . . . $with_libtokyotyrant
libupsclient . . . . $with_libupsclient
rrdtool . . . . . . . $enable_rrdtool
sensors . . . . . . . $enable_sensors
serial . . . . . . . $enable_serial
+ sigrok . . . . . . . $enable_sigrok
snmp . . . . . . . . $enable_snmp
+ statsd . . . . . . . $enable_statsd
swap . . . . . . . . $enable_swap
syslog . . . . . . . $enable_syslog
table . . . . . . . . $enable_table
diff --git a/src/Makefile.am b/src/Makefile.am
index c3e596d4a5fea92d6853521fb9ee19458c5a94d4..f35b45a35c335cc6302df127f6f87a0891c5cacd 100644 (file)
--- a/src/Makefile.am
+++ b/src/Makefile.am
collectd_DEPENDENCIES += serial.la
endif
+if BUILD_PLUGIN_SIGROK
+pkglib_LTLIBRARIES += sigrok.la
+sigrok_la_SOURCES = sigrok.c
+sigrok_la_CFLAGS = $(AM_CFLAGS) $(BUILD_WITH_LIBSIGROK_CFLAGS)
+sigrok_la_LDFLAGS = -module -avoid-version $(BUILD_WITH_LIBSIGROK_LDFLAGS)
+sigrok_la_LIBADD = -lsigrok
+collectd_LDADD += "-dlopen" sigrok.la
+collectd_DEPENDENCIES += sigrok.la
+endif
+
if BUILD_PLUGIN_SNMP
pkglib_LTLIBRARIES += snmp.la
snmp_la_SOURCES = snmp.c
collectd_DEPENDENCIES += snmp.la
endif
+if BUILD_PLUGIN_STATSD
+pkglib_LTLIBRARIES += statsd.la
+statsd_la_SOURCES = statsd.c \
+ utils_latency.h utils_latency.c
+statsd_la_LDFLAGS = -module -avoid-version
+statsd_la_LIBADD = -lpthread
+collectd_LDADD += "-dlopen" statsd.la
+collectd_DEPENDENCIES += statsd.la
+endif
+
if BUILD_PLUGIN_SWAP
pkglib_LTLIBRARIES += swap.la
swap_la_SOURCES = swap.c
diff --git a/src/collectd.conf.in b/src/collectd.conf.in
index 80aba6a46fbc2e84cc341d3ef33234015de6fa40..e8d4d28ef026c1c09546cdc0d143de52a19bbed5 100644 (file)
--- a/src/collectd.conf.in
+++ b/src/collectd.conf.in
@LOAD_PLUGIN_RRDTOOL@LoadPlugin rrdtool
#@BUILD_PLUGIN_SENSORS_TRUE@LoadPlugin sensors
#@BUILD_PLUGIN_SERIAL_TRUE@LoadPlugin serial
+#@BUILD_PLUGIN_SIGROK_TRUE@LoadPlugin sigrok
#@BUILD_PLUGIN_SNMP_TRUE@LoadPlugin snmp
+#@BUILD_PLUGIN_STATSD_TRUE@LoadPlugin statsd
#@BUILD_PLUGIN_SWAP_TRUE@LoadPlugin swap
#@BUILD_PLUGIN_TABLE_TRUE@LoadPlugin table
#@BUILD_PLUGIN_TAIL_TRUE@LoadPlugin tail
# IgnoreSelected false
#</Plugin>
+#<Plugin sigrok>
+# LogLevel 3
+# <Device "AC Voltage">
+# Driver "fluke-dmm"
+# MinimumInterval 10
+# Conn "/dev/ttyUSB2"
+# </Device>
+# <Device "Sound Level">
+# Driver "cem-dt-885x"
+# Conn "/dev/ttyUSB1"
+# </Device>
+#</Plugin>
+
#<Plugin snmp>
# <Data "powerplus_voltge_input">
# Type "voltage"
# </Host>
#</Plugin>
+#<Plugin statsd>
+# Host "::"
+# Port "8125"
+# DeleteCounters false
+# DeleteTimers false
+# DeleteGauges false
+# DeleteSets false
+# TimerPercentile 90.0
+#</Plugin>
+
#<Plugin "swap">
# ReportByDevice false
# ReportBytes true
diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod
index 11db1ccdc3d1e07c49f7c6efc93761ba43c0e21b..1b73c417d9ebfc54211eefc252116d733a0b9c96 100644 (file)
--- a/src/collectd.conf.pod
+++ b/src/collectd.conf.pod
=back
+=head2 Plugin C<sigrok>
+
+The I<sigrok plugin> uses I<libsigrok> to retrieve measurements from any device
+supported by the L<sigrok|http://sigrok.org/> project.
+
+B<Synopsis>
+
+ <Plugin sigrok>
+ LogLevel 3
+ <Device "AC Voltage">
+ Driver "fluke-dmm"
+ MinimumInterval 10
+ Conn "/dev/ttyUSB2"
+ </Device>
+ <Device "Sound Level">
+ Driver "cem-dt-885x"
+ Conn "/dev/ttyUSB1"
+ </Device>
+ </Plugin>
+
+=over 4
+
+=item B<LogLevel> B<0-5>
+
+The I<sigrok> logging level to pass on to the I<collectd> log, as a number
+between B<0> and B<5> (inclusive). These levels correspond to C<None>,
+C<Errors>, C<Warnings>, C<Informational>, C<Debug >and C<Spew>, respectively.
+The default is B<2> (C<Warnings>). The I<sigrok> log messages, regardless of
+their level, are always submitted to I<collectd> at its INFO log level.
+
+=item E<lt>B<Device> I<Name>E<gt>
+
+A sigrok-supported device, uniquely identified by this section's options. The
+I<Name> is passed to I<collectd> as the I<plugin instance>.
+
+=item B<Driver> I<DriverName>
+
+The sigrok driver to use for this device.
+
+=item B<Conn> I<ConnectionSpec>
+
+If the device cannot be auto-discovered, or more than one might be discovered
+by the driver, I<ConnectionSpec> specifies the connection string to the device.
+It can be of the form of a device path (e.g.E<nbsp>C</dev/ttyUSB2>), or, in
+case of a non-serial USB-connected device, the USB I<VendorID>B<.>I<ProductID>
+separated by a period (e.g.E<nbsp>C<0403.6001>). A USB device can also be
+specified as I<Bus>B<.>I<Address> (e.g.E<nbsp>C<1.41>).
+
+=item B<SerialComm> I<SerialSpec>
+
+For serial devices with non-standard port settings, this option can be used
+to specify them in a form understood by I<sigrok>, e.g.E<nbsp>C<9600/8n1>.
+This should not be necessary; drivers know how to communicate with devices they
+support.
+
+=item B<MinimumInterval> I<Seconds>
+
+Specifies the minimum time between measurement dispatches to I<collectd>, in
+seconds. Since some I<sigrok> supported devices can acquire measurements many
+times per second, it may be necessary to throttle these. For example, the
+I<RRD plugin> cannot process writes more than once per second.
+
+The default B<MinimumInterval> is B<0>, meaning measurements received from the
+device are always dispatched to I<collectd>. When throttled, unused
+measurements are discarded.
+
+=back
+
=head2 Plugin C<snmp>
Since the configuration of the C<snmp plugin> is a little more complicated than
other plugins, its documentation has been moved to an own manpage,
L<collectd-snmp(5)>. Please see there for details.
+=head2 Plugin C<statsd>
+
+The I<statsd plugin> listens to a UDP socket, reads "events" in the statsd
+protocol and dispatches rates or other aggregates of these numbers
+periodically.
+
+The plugin implements the I<Counter>, I<Timer>, I<Gauge> and I<Set> types which
+are dispatched as the I<collectd> types C<derive>, C<latency>, C<gauge> and
+C<objects> respectively.
+
+The following configuration options are valid:
+
+=over 4
+
+=item B<Host> I<Host>
+
+Bind to the hostname / address I<Host>. By default, the plugin will bind to the
+"any" address, i.e. accept packets sent to any of the hosts addresses.
+
+=item B<Port> I<Port>
+
+UDP port to listen to. This can be either a service name or a port number.
+Defaults to C<8125>.
+
+=item B<DeleteCounters> B<false>|B<true>
+
+=item B<DeleteTimers> B<false>|B<true>
+
+=item B<DeleteGauges> B<false>|B<true>
+
+=item B<DeleteSets> B<false>|B<true>
+
+These options control what happens if metrics are not updated in an interval.
+If set to B<False>, the default, metrics are dispatched unchanged, i.e. the
+rate of counters and size of sets will be zero, timers report C<NaN> and gauges
+are unchanged. If set to B<True>, the such metrics are not dispatched and
+removed from the internal cache.
+
+=item B<TimerPercentile> I<Percent>
+
+Calculate and dispatch the configured percentile, i.e. compute the latency, so
+that I<Percent> of all reported timers are smaller than or equal to the
+computed latency. This is useful for cutting off the long tail latency, as it's
+often done in I<Service Level Agreements> (SLAs).
+
+If not specified, no percentile is calculated / dispatched.
+
+=back
+
=head2 Plugin C<swap>
The I<Swap plugin> collects information about used and available swap space. On
diff --git a/src/rrdtool.c b/src/rrdtool.c
index 80833902a21e7b05656c3d6baa4fb91ffd5d170d..2f28329f0fbef43f17c70b3b7136a6b4f187e2b8 100644 (file)
--- a/src/rrdtool.c
+++ b/src/rrdtool.c
} /* int srrd_update */
#endif /* !HAVE_THREADSAFE_LIBRRD */
-static int value_list_to_string (char *buffer, int buffer_len,
+static int value_list_to_string_multiple (char *buffer, int buffer_len,
const data_set_t *ds, const value_list_t *vl)
{
int offset;
offset += status;
} /* for ds->ds_num */
+ return (0);
+} /* int value_list_to_string_multiple */
+
+static int value_list_to_string (char *buffer, int buffer_len,
+ const data_set_t *ds, const value_list_t *vl)
+{
+ int status;
+ time_t tt;
+
+ if (ds->ds_num != 1)
+ return (value_list_to_string_multiple (buffer, buffer_len,
+ ds, vl));
+
+ tt = CDTIME_T_TO_TIME_T (vl->time);
+ switch (ds->ds[0].type)
+ {
+ case DS_TYPE_DERIVE:
+ status = ssnprintf (buffer, buffer_len, "%u:%"PRIi64,
+ (unsigned) tt, vl->values[0].derive);
+ break;
+ case DS_TYPE_GAUGE:
+ status = ssnprintf (buffer, buffer_len, "%u:%lf",
+ (unsigned) tt, vl->values[0].gauge);
+ break;
+ case DS_TYPE_COUNTER:
+ status = ssnprintf (buffer, buffer_len, "%u:%llu",
+ (unsigned) tt, vl->values[0].counter);
+ break;
+ case DS_TYPE_ABSOLUTE:
+ status = ssnprintf (buffer, buffer_len, "%u:%"PRIu64,
+ (unsigned) tt, vl->values[0].absolute);
+ break;
+ default:
+ return (EINVAL);
+ }
+
+ if ((status < 1) || (status >= buffer_len))
+ return (ENOMEM);
+
return (0);
} /* int value_list_to_string */
diff --git a/src/sigrok.c b/src/sigrok.c
--- /dev/null
+++ b/src/sigrok.c
@@ -0,0 +1,399 @@
+/*
+ * collectd - src/sigrok.c
+ * Copyright (C) 2013 Bert Vermeulen <bert@biot.com>
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "collectd.h"
+#include "common.h"
+#include "plugin.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <pthread.h>
+
+#include <glib.h>
+#include <libsigrok/libsigrok.h>
+
+/* Minimum interval between dispatches coming from this plugin. The RRD
+ * plugin, at least, complains when written to with sub-second intervals.*/
+#define DEFAULT_MIN_DISPATCH_INTERVAL TIME_T_TO_CDTIME_T(0)
+
+static pthread_t sr_thread;
+static int sr_thread_running = FALSE;
+GSList *config_devices;
+static int num_devices;
+static int loglevel = SR_LOG_WARN;
+static struct sr_context *sr_ctx;
+
+struct config_device {
+ char *name;
+ char *driver;
+ char *conn;
+ char *serialcomm;
+ struct sr_dev_inst *sdi;
+ cdtime_t min_dispatch_interval;
+ cdtime_t last_dispatch;
+};
+
+
+static int sigrok_log_callback(void*cb_data __attribute__((unused)),
+ int msg_loglevel, const char *format, va_list args)
+{
+ char s[512];
+
+ if (msg_loglevel <= loglevel) {
+ vsnprintf(s, 512, format, args);
+ plugin_log(LOG_INFO, "sigrok plugin: %s", s);
+ }
+
+ return 0;
+}
+
+static int sigrok_config_device(oconfig_item_t *ci)
+{
+ struct config_device *cfdev;
+ int i;
+
+ if (!(cfdev = malloc(sizeof(struct config_device)))) {
+ ERROR("sigrok plugin: malloc() failed.");
+ return -1;
+ }
+ memset(cfdev, 0, sizeof(*cfdev));
+ if (cf_util_get_string(ci, &cfdev->name)) {
+ free(cfdev);
+ WARNING("sigrok plugin: Invalid device name.");
+ return -1;
+ }
+ cfdev->min_dispatch_interval = DEFAULT_MIN_DISPATCH_INTERVAL;
+
+ for (i = 0; i < ci->children_num; i++) {
+ oconfig_item_t *item = ci->children + i;
+ if (!strcasecmp(item->key, "driver"))
+ cf_util_get_string(item, &cfdev->driver);
+ else if (!strcasecmp(item->key, "conn"))
+ cf_util_get_string(item, &cfdev->conn);
+ else if (!strcasecmp(item->key, "serialcomm"))
+ cf_util_get_string(item, &cfdev->serialcomm);
+ else if (!strcasecmp(item->key, "minimuminterval"))
+ cf_util_get_cdtime(item, &cfdev->min_dispatch_interval);
+ else
+ WARNING("sigrok plugin: Invalid keyword \"%s\".",
+ item->key);
+ }
+
+ config_devices = g_slist_append(config_devices, cfdev);
+
+ return 0;
+}
+
+static int sigrok_config(oconfig_item_t *ci)
+{
+ int i;
+
+ for (i = 0; i < ci->children_num; i++) {
+ oconfig_item_t *item = ci->children + i;
+ if (strcasecmp("LogLevel", item->key) == 0) {
+ int status;
+ int tmp = -1;
+
+ status = cf_util_get_int (item, &tmp);
+ if (status != 0)
+ continue;
+ else if ((tmp < 0) || (tmp > 5)) {
+ ERROR ("sigrok plugin: The \"LogLevel\" "
+ "configuration option expects "
+ "an integer between 0 and 5 "
+ "(inclusive); you provided %i.",
+ tmp);
+ continue;
+ }
+ loglevel = tmp;
+ } else if (!strcasecmp(item->key, "Device"))
+ sigrok_config_device(item);
+ else
+ WARNING("sigrok plugin: Invalid keyword \"%s\".",
+ item->key);
+ }
+
+ return 0;
+}
+
+static char *sigrok_value_type(const struct sr_datafeed_analog *analog)
+{
+ char *s;
+
+ if (analog->mq == SR_MQ_VOLTAGE)
+ s = "voltage";
+ else if (analog->mq == SR_MQ_CURRENT)
+ s = "current";
+ else if (analog->mq == SR_MQ_FREQUENCY)
+ s = "frequency";
+ else if (analog->mq == SR_MQ_POWER)
+ s = "power";
+ else if (analog->mq == SR_MQ_TEMPERATURE)
+ s = "temperature";
+ else if (analog->mq == SR_MQ_RELATIVE_HUMIDITY)
+ s = "humidity";
+ else if (analog->mq == SR_MQ_SOUND_PRESSURE_LEVEL)
+ s = "spl";
+ else
+ s = "gauge";
+
+ return s;
+}
+
+static void sigrok_feed_callback(const struct sr_dev_inst *sdi,
+ const struct sr_datafeed_packet *packet, void *cb_data)
+{
+ const struct sr_datafeed_analog *analog;
+ struct config_device *cfdev;
+ GSList *l;
+ value_t value;
+ value_list_t vl = VALUE_LIST_INIT;
+
+ /* Find this device's configuration. */
+ cfdev = NULL;
+ for (l = config_devices; l; l = l->next) {
+ cfdev = l->data;
+ if (cfdev->sdi == sdi) {
+ /* Found it. */
+ break;
+ }
+ cfdev = NULL;
+ }
+
+ if (!cfdev) {
+ ERROR("sigrok plugin: Received data from driver \"%s\" but "
+ "can't find a configuration / device matching "
+ "it.", sdi->driver->name);
+ return;
+ }
+
+ if (packet->type == SR_DF_END) {
+ /* TODO: try to restart acquisition after a delay? */
+ WARNING("sigrok plugin: acquisition for \"%s\" ended.",
+ cfdev->name);
+ return;
+ }
+
+ if (packet->type != SR_DF_ANALOG)
+ return;
+
+ if ((cfdev->min_dispatch_interval != 0)
+ && ((cdtime() - cfdev->last_dispatch)
+ < cfdev->min_dispatch_interval))
+ return;
+
+ /* Ignore all but the first sample on the first probe. */
+ analog = packet->payload;
+ value.gauge = analog->data[0];
+ vl.values = &value;
+ vl.values_len = 1;
+ sstrncpy(vl.host, hostname_g, sizeof(vl.host));
+ sstrncpy(vl.plugin, "sigrok", sizeof(vl.plugin));
+ ssnprintf(vl.plugin_instance, sizeof(vl.plugin_instance),
+ "%s", cfdev->name);
+ sstrncpy(vl.type, sigrok_value_type(analog), sizeof(vl.type));
+
+ plugin_dispatch_values(&vl);
+ cfdev->last_dispatch = cdtime();
+}
+
+static void sigrok_free_drvopts(struct sr_config *src)
+{
+ g_variant_unref(src->data);
+ g_free(src);
+}
+
+static int sigrok_init_driver(struct config_device *cfdev,
+ struct sr_dev_driver *drv)
+{
+ struct sr_config *src;
+ GSList *devlist, *drvopts;
+ char hwident[512];
+
+ if (sr_driver_init(sr_ctx, drv) != SR_OK)
+ /* Error was logged by libsigrok. */
+ return -1;
+
+ drvopts = NULL;
+ if (cfdev->conn) {
+ if (!(src = malloc(sizeof(struct sr_config))))
+ return -1;
+ src->key = SR_CONF_CONN;
+ src->data = g_variant_new_string(cfdev->conn);
+ drvopts = g_slist_append(drvopts, src);
+ }
+ if (cfdev->serialcomm) {
+ if (!(src = malloc(sizeof(struct sr_config))))
+ return -1;
+ src->key = SR_CONF_SERIALCOMM;
+ src->data = g_variant_new_string(cfdev->serialcomm);
+ drvopts = g_slist_append(drvopts, src);
+ }
+ devlist = sr_driver_scan(drv, drvopts);
+ g_slist_free_full(drvopts, (GDestroyNotify)sigrok_free_drvopts);
+ if (!devlist) {
+ /* Not an error, but the user should know about it. */
+ WARNING("sigrok plugin: No device found for \"%s\".",
+ cfdev->name);
+ return 0;
+ }
+
+ if (g_slist_length(devlist) > 1) {
+ INFO("sigrok plugin: %d sigrok devices for device entry "
+ "\"%s\": must be 1.",
+ g_slist_length(devlist), cfdev->name);
+ return -1;
+ }
+ cfdev->sdi = devlist->data;
+ g_slist_free(devlist);
+ ssnprintf(hwident, sizeof(hwident), "%s %s %s",
+ cfdev->sdi->vendor ? cfdev->sdi->vendor : "",
+ cfdev->sdi->model ? cfdev->sdi->model : "",
+ cfdev->sdi->version ? cfdev->sdi->version : "");
+ INFO("sigrok plugin: Device \"%s\" is a %s", cfdev->name, hwident);
+
+ if (sr_dev_open(cfdev->sdi) != SR_OK)
+ return -1;
+
+ if (sr_session_dev_add(cfdev->sdi) != SR_OK)
+ return -1;
+
+ return 1;
+}
+
+static void *sigrok_read_thread(void *arg __attribute__((unused)))
+{
+ struct sr_dev_driver *drv, **drvlist;
+ GSList *l;
+ struct config_device *cfdev;
+ int ret, i;
+
+ sr_log_callback_set(sigrok_log_callback, NULL);
+ sr_log_loglevel_set(loglevel);
+
+ if ((ret = sr_init(&sr_ctx)) != SR_OK) {
+ ERROR("sigrok plugin: Failed to initialize libsigrok: %s.",
+ sr_strerror(ret));
+ return NULL;
+ }
+
+ if (!sr_session_new())
+ return NULL;
+
+ num_devices = 0;
+ drvlist = sr_driver_list();
+ for (l = config_devices; l; l = l->next) {
+ cfdev = l->data;
+ drv = NULL;
+ for (i = 0; drvlist[i]; i++) {
+ if (!strcmp(drvlist[i]->name, cfdev->driver)) {
+ drv = drvlist[i];
+ break;
+ }
+ }
+ if (!drv) {
+ ERROR("sigrok plugin: Unknown driver \"%s\".",
+ cfdev->driver);
+ return NULL;
+ }
+
+ if ((ret = sigrok_init_driver(cfdev, drv)) < 0)
+ /* Error was already logged. */
+ return NULL;
+
+ num_devices += ret;
+ }
+
+ if (num_devices > 0) {
+ /* Do this only when we're sure there's hardware to talk to. */
+ if (sr_session_datafeed_callback_add(sigrok_feed_callback, NULL)
+ != SR_OK)
+ return NULL;
+
+ /* Start acquisition on all devices. */
+ if (sr_session_start() != SR_OK)
+ return NULL;
+
+ /* Main loop, runs forever. */
+ sr_session_run();
+
+ sr_session_stop();
+ sr_session_dev_remove_all();
+ }
+
+ sr_session_destroy();
+
+ sr_exit(sr_ctx);
+
+ pthread_exit(NULL);
+ sr_thread_running = FALSE;
+
+ return NULL;
+}
+
+static int sigrok_init(void)
+{
+ int status;
+
+ if (sr_thread_running) {
+ ERROR("sigrok plugin: Thread already running.");
+ return -1;
+ }
+
+ if ((status = plugin_thread_create(&sr_thread, NULL, sigrok_read_thread,
+ NULL)) != 0) {
+ ERROR("sigrok plugin: Failed to create thread: %s.",
+ strerror(status));
+ return -1;
+ }
+ sr_thread_running = TRUE;
+
+ return 0;
+}
+
+static int sigrok_shutdown(void)
+{
+ struct config_device *cfdev;
+ GSList *l;
+
+ if (sr_thread_running) {
+ pthread_cancel(sr_thread);
+ pthread_join(sr_thread, NULL);
+ }
+
+ for (l = config_devices; l; l = l->next) {
+ cfdev = l->data;
+ free(cfdev->name);
+ free(cfdev->driver);
+ free(cfdev->conn);
+ free(cfdev->serialcomm);
+ free(cfdev);
+ }
+ g_slist_free(config_devices);
+
+ return 0;
+}
+
+void module_register(void)
+{
+ plugin_register_complex_config("sigrok", sigrok_config);
+ plugin_register_init("sigrok", sigrok_init);
+ plugin_register_shutdown("sigrok", sigrok_shutdown);
+}
diff --git a/src/statsd.c b/src/statsd.c
--- /dev/null
+++ b/src/statsd.c
@@ -0,0 +1,861 @@
+/**
+ * collectd - src/statsd.c
+ *
+ * Copyright (C) 2013 Florian octo Forster
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER
+ * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
+ * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ *
+ * Authors:
+ * Florian octo Forster <octo at collectd.org>
+ */
+
+#include "collectd.h"
+#include "plugin.h"
+#include "common.h"
+#include "configfile.h"
+#include "utils_avltree.h"
+#include "utils_complain.h"
+#include "utils_latency.h"
+
+#include <pthread.h>
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <poll.h>
+
+#ifndef STATSD_DEFAULT_NODE
+# define STATSD_DEFAULT_NODE NULL
+#endif
+
+#ifndef STATSD_DEFAULT_SERVICE
+# define STATSD_DEFAULT_SERVICE "8125"
+#endif
+
+enum metric_type_e
+{
+ STATSD_COUNTER,
+ STATSD_TIMER,
+ STATSD_GAUGE,
+ STATSD_SET
+};
+typedef enum metric_type_e metric_type_t;
+
+struct statsd_metric_s
+{
+ metric_type_t type;
+ double value;
+ latency_counter_t *latency;
+ c_avl_tree_t *set;
+ unsigned long updates_num;
+};
+typedef struct statsd_metric_s statsd_metric_t;
+
+static c_avl_tree_t *metrics_tree = NULL;
+static pthread_mutex_t metrics_lock = PTHREAD_MUTEX_INITIALIZER;
+
+static pthread_t network_thread;
+static _Bool network_thread_running = 0;
+static _Bool network_thread_shutdown = 0;
+
+static char *conf_node = NULL;
+static char *conf_service = NULL;
+
+static _Bool conf_delete_counters = 0;
+static _Bool conf_delete_timers = 0;
+static _Bool conf_delete_gauges = 0;
+static _Bool conf_delete_sets = 0;
+
+static double *conf_timer_percentile = NULL;
+static size_t conf_timer_percentile_num = 0;
+
+/* Must hold metrics_lock when calling this function. */
+static statsd_metric_t *statsd_metric_lookup_unsafe (char const *name, /* {{{ */
+ metric_type_t type)
+{
+ char key[DATA_MAX_NAME_LEN + 2];
+ char *key_copy;
+ statsd_metric_t *metric;
+ int status;
+
+ switch (type)
+ {
+ case STATSD_COUNTER: key[0] = 'c'; break;
+ case STATSD_TIMER: key[0] = 't'; break;
+ case STATSD_GAUGE: key[0] = 'g'; break;
+ case STATSD_SET: key[0] = 's'; break;
+ default: return (NULL);
+ }
+
+ key[1] = ':';
+ sstrncpy (&key[2], name, sizeof (key) - 2);
+
+ status = c_avl_get (metrics_tree, key, (void *) &metric);
+ if (status == 0)
+ return (metric);
+
+ key_copy = strdup (key);
+ if (key_copy == NULL)
+ {
+ ERROR ("statsd plugin: strdup failed.");
+ return (NULL);
+ }
+
+ metric = malloc (sizeof (*metric));
+ if (metric == NULL)
+ {
+ ERROR ("statsd plugin: malloc failed.");
+ sfree (key_copy);
+ return (NULL);
+ }
+ memset (metric, 0, sizeof (*metric));
+
+ metric->type = type;
+ metric->latency = NULL;
+ metric->set = NULL;
+
+ status = c_avl_insert (metrics_tree, key_copy, metric);
+ if (status != 0)
+ {
+ ERROR ("statsd plugin: c_avl_insert failed.");
+ sfree (key_copy);
+ sfree (metric);
+ return (NULL);
+ }
+
+ return (metric);
+} /* }}} statsd_metric_lookup_unsafe */
+
+static int statsd_metric_set (char const *name, double value, /* {{{ */
+ metric_type_t type)
+{
+ statsd_metric_t *metric;
+
+ pthread_mutex_lock (&metrics_lock);
+
+ metric = statsd_metric_lookup_unsafe (name, type);
+ if (metric == NULL)
+ {
+ pthread_mutex_unlock (&metrics_lock);
+ return (-1);
+ }
+
+ metric->value = value;
+ metric->updates_num++;
+
+ pthread_mutex_unlock (&metrics_lock);
+
+ return (0);
+} /* }}} int statsd_metric_set */
+
+static int statsd_metric_add (char const *name, double delta, /* {{{ */
+ metric_type_t type)
+{
+ statsd_metric_t *metric;
+
+ pthread_mutex_lock (&metrics_lock);
+
+ metric = statsd_metric_lookup_unsafe (name, type);
+ if (metric == NULL)
+ {
+ pthread_mutex_unlock (&metrics_lock);
+ return (-1);
+ }
+
+ metric->value += delta;
+ metric->updates_num++;
+
+ pthread_mutex_unlock (&metrics_lock);
+
+ return (0);
+} /* }}} int statsd_metric_add */
+
+static int statsd_parse_value (char const *str, value_t *ret_value) /* {{{ */
+{
+ char *endptr = NULL;
+
+ ret_value->gauge = (gauge_t) strtod (str, &endptr);
+ if ((str == endptr) || ((endptr != NULL) && (*endptr != 0)))
+ return (-1);
+
+ return (0);
+} /* }}} int statsd_parse_value */
+
+static int statsd_handle_counter (char const *name, /* {{{ */
+ char const *value_str,
+ char const *extra)
+{
+ value_t value;
+ value_t scale;
+ int status;
+
+ if ((extra != NULL) && (extra[0] != '@'))
+ return (-1);
+
+ scale.gauge = 1.0;
+ if (extra != NULL)
+ {
+ status = statsd_parse_value (extra + 1, &scale);
+ if (status != 0)
+ return (status);
+
+ if (!isfinite (scale.gauge) || (scale.gauge <= 0.0) || (scale.gauge > 1.0))
+ return (-1);
+ }
+
+ value.gauge = 1.0;
+ status = statsd_parse_value (value_str, &value);
+ if (status != 0)
+ return (status);
+
+ return (statsd_metric_add (name, (double) (value.gauge / scale.gauge),
+ STATSD_COUNTER));
+} /* }}} int statsd_handle_counter */
+
+static int statsd_handle_gauge (char const *name, /* {{{ */
+ char const *value_str)
+{
+ value_t value;
+ int status;
+
+ value.gauge = 0;
+ status = statsd_parse_value (value_str, &value);
+ if (status != 0)
+ return (status);
+
+ if ((value_str[0] == '+') || (value_str[0] == '-'))
+ return (statsd_metric_add (name, (double) value.gauge, STATSD_GAUGE));
+ else
+ return (statsd_metric_set (name, (double) value.gauge, STATSD_GAUGE));
+} /* }}} int statsd_handle_gauge */
+
+static int statsd_handle_timer (char const *name, /* {{{ */
+ char const *value_str)
+{
+ statsd_metric_t *metric;
+ value_t value_ms;
+ cdtime_t value;
+ int status;
+
+ value_ms.derive = 0;
+ status = statsd_parse_value (value_str, &value_ms);
+ if (status != 0)
+ return (status);
+
+ value = MS_TO_CDTIME_T (value_ms.gauge);
+
+ pthread_mutex_lock (&metrics_lock);
+
+ metric = statsd_metric_lookup_unsafe (name, STATSD_TIMER);
+ if (metric == NULL)
+ {
+ pthread_mutex_unlock (&metrics_lock);
+ return (-1);
+ }
+
+ if (metric->latency == NULL)
+ metric->latency = latency_counter_create ();
+ if (metric->latency == NULL)
+ {
+ pthread_mutex_unlock (&metrics_lock);
+ return (-1);
+ }
+
+ latency_counter_add (metric->latency, value);
+ metric->updates_num++;
+
+ pthread_mutex_unlock (&metrics_lock);
+ return (0);
+} /* }}} int statsd_handle_timer */
+
+static int statsd_handle_set (char const *name, /* {{{ */
+ char const *set_key_orig)
+{
+ statsd_metric_t *metric = NULL;
+ char *set_key;
+ int status;
+
+ pthread_mutex_lock (&metrics_lock);
+
+ metric = statsd_metric_lookup_unsafe (name, STATSD_SET);
+ if (metric == NULL)
+ {
+ pthread_mutex_unlock (&metrics_lock);
+ return (-1);
+ }
+
+ /* Make sure metric->set exists. */
+ if (metric->set == NULL)
+ metric->set = c_avl_create ((void *) strcmp);
+
+ if (metric->set == NULL)
+ {
+ pthread_mutex_unlock (&metrics_lock);
+ ERROR ("statsd plugin: c_avl_create failed.");
+ return (-1);
+ }
+
+ set_key = strdup (set_key_orig);
+ if (set_key == NULL)
+ {
+ pthread_mutex_unlock (&metrics_lock);
+ ERROR ("statsd plugin: strdup failed.");
+ return (-1);
+ }
+
+ status = c_avl_insert (metric->set, set_key, /* value = */ NULL);
+ if (status < 0)
+ {
+ pthread_mutex_unlock (&metrics_lock);
+ if (status < 0)
+ ERROR ("statsd plugin: c_avl_insert (\"%s\") failed with status %i.",
+ set_key, status);
+ sfree (set_key);
+ return (-1);
+ }
+ else if (status > 0) /* key already exists */
+ {
+ sfree (set_key);
+ }
+
+ metric->updates_num++;
+
+ pthread_mutex_unlock (&metrics_lock);
+ return (0);
+} /* }}} int statsd_handle_set */
+
+static int statsd_parse_line (char *buffer) /* {{{ */
+{
+ char *name = buffer;
+ char *value;
+ char *type;
+ char *extra;
+
+ type = strchr (name, '|');
+ if (type == NULL)
+ return (-1);
+ *type = 0;
+ type++;
+
+ value = strrchr (name, ':');
+ if (value == NULL)
+ return (-1);
+ *value = 0;
+ value++;
+
+ extra = strchr (type, '|');
+ if (extra != NULL)
+ {
+ *extra = 0;
+ extra++;
+ }
+
+ if (strcmp ("c", type) == 0)
+ return (statsd_handle_counter (name, value, extra));
+
+ /* extra is only valid for counters */
+ if (extra != NULL)
+ return (-1);
+
+ if (strcmp ("g", type) == 0)
+ return (statsd_handle_gauge (name, value));
+ else if (strcmp ("ms", type) == 0)
+ return (statsd_handle_timer (name, value));
+ else if (strcmp ("s", type) == 0)
+ return (statsd_handle_set (name, value));
+ else
+ return (-1);
+} /* }}} void statsd_parse_line */
+
+static void statsd_parse_buffer (char *buffer) /* {{{ */
+{
+ while (buffer != NULL)
+ {
+ char orig[64];
+ char *next;
+ int status;
+
+ next = strchr (buffer, '\n');
+ if (next != NULL)
+ {
+ *next = 0;
+ next++;
+ }
+
+ if (*buffer == 0)
+ {
+ buffer = next;
+ continue;
+ }
+
+ sstrncpy (orig, buffer, sizeof (orig));
+
+ status = statsd_parse_line (buffer);
+ if (status != 0)
+ ERROR ("statsd plugin: Unable to parse line: \"%s\"", orig);
+
+ buffer = next;
+ }
+} /* }}} void statsd_parse_buffer */
+
+static void statsd_network_read (int fd) /* {{{ */
+{
+ char buffer[4096];
+ size_t buffer_size;
+ ssize_t status;
+
+ status = recv (fd, buffer, sizeof (buffer), /* flags = */ MSG_DONTWAIT);
+ if (status < 0)
+ {
+ char errbuf[1024];
+
+ if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
+ return;
+
+ ERROR ("statsd plugin: recv(2) failed: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ return;
+ }
+
+ buffer_size = (size_t) status;
+ if (buffer_size >= sizeof (buffer))
+ buffer_size = sizeof (buffer) - 1;
+ buffer[buffer_size] = 0;
+
+ statsd_parse_buffer (buffer);
+} /* }}} void statsd_network_read */
+
+static int statsd_network_init (struct pollfd **ret_fds, /* {{{ */
+ size_t *ret_fds_num)
+{
+ struct pollfd *fds = NULL;
+ size_t fds_num = 0;
+
+ struct addrinfo ai_hints;
+ struct addrinfo *ai_list = NULL;
+ struct addrinfo *ai_ptr;
+ int status;
+
+ char const *node = (conf_node != NULL) ? conf_node : STATSD_DEFAULT_NODE;
+ char const *service = (conf_service != NULL)
+ ? conf_service : STATSD_DEFAULT_SERVICE;
+
+ memset (&ai_hints, 0, sizeof (ai_hints));
+ ai_hints.ai_flags = AI_PASSIVE;
+#ifdef AI_ADDRCONFIG
+ ai_hints.ai_flags |= AI_ADDRCONFIG;
+#endif
+ ai_hints.ai_family = AF_UNSPEC;
+ ai_hints.ai_socktype = SOCK_DGRAM;
+
+ status = getaddrinfo (node, service, &ai_hints, &ai_list);
+ if (status != 0)
+ {
+ ERROR ("statsd plugin: getaddrinfo (\"%s\", \"%s\") failed: %s",
+ node, service, gai_strerror (status));
+ return (status);
+ }
+
+ for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
+ {
+ int fd;
+ struct pollfd *tmp;
+
+ char dbg_node[NI_MAXHOST];
+ char dbg_service[NI_MAXSERV];
+
+ fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
+ if (fd < 0)
+ {
+ char errbuf[1024];
+ ERROR ("statsd plugin: socket(2) failed: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ continue;
+ }
+
+ getnameinfo (ai_ptr->ai_addr, ai_ptr->ai_addrlen,
+ dbg_node, sizeof (dbg_node), dbg_service, sizeof (dbg_service),
+ NI_DGRAM | NI_NUMERICHOST | NI_NUMERICSERV);
+ DEBUG ("statsd plugin: Trying to bind to [%s]:%s ...", dbg_node, dbg_service);
+
+ status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
+ if (status != 0)
+ {
+ char errbuf[1024];
+ ERROR ("statsd plugin: bind(2) failed: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ close (fd);
+ continue;
+ }
+
+ tmp = realloc (fds, sizeof (*fds) * (fds_num + 1));
+ if (tmp == NULL)
+ {
+ ERROR ("statsd plugin: realloc failed.");
+ continue;
+ }
+ fds = tmp;
+ tmp = fds + fds_num;
+ fds_num++;
+
+ memset (tmp, 0, sizeof (*tmp));
+ tmp->fd = fd;
+ tmp->events = POLLIN | POLLPRI;
+ }
+
+ freeaddrinfo (ai_list);
+
+ if (fds_num == 0)
+ {
+ ERROR ("statsd plugin: Unable to create listening socket for [%s]:%s.",
+ (node != NULL) ? node : "::", service);
+ return (ENOENT);
+ }
+
+ *ret_fds = fds;
+ *ret_fds_num = fds_num;
+ return (0);
+} /* }}} int statsd_network_init */
+
+static void *statsd_network_thread (void *args) /* {{{ */
+{
+ struct pollfd *fds = NULL;
+ size_t fds_num = 0;
+ int status;
+ size_t i;
+
+ status = statsd_network_init (&fds, &fds_num);
+ if (status != 0)
+ {
+ ERROR ("statsd plugin: Unable to open listening sockets.");
+ pthread_exit ((void *) 0);
+ }
+
+ while (!network_thread_shutdown)
+ {
+ status = poll (fds, (nfds_t) fds_num, /* timeout = */ -1);
+ if (status < 0)
+ {
+ char errbuf[1024];
+
+ if ((errno == EINTR) || (errno == EAGAIN))
+ continue;
+
+ ERROR ("statsd plugin: poll(2) failed: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ break;
+ }
+
+ for (i = 0; i < fds_num; i++)
+ {
+ if ((fds[i].revents & (POLLIN | POLLPRI)) == 0)
+ continue;
+
+ statsd_network_read (fds[i].fd);
+ fds[i].revents = 0;
+ }
+ } /* while (!network_thread_shutdown) */
+
+ /* Clean up */
+ for (i = 0; i < fds_num; i++)
+ close (fds[i].fd);
+ sfree (fds);
+
+ return ((void *) 0);
+} /* }}} void *statsd_network_thread */
+
+static int statsd_config_timer_percentile (oconfig_item_t *ci) /* {{{ */
+{
+ double percent = NAN;
+ double *tmp;
+ int status;
+
+ status = cf_util_get_double (ci, &percent);
+ if (status != 0)
+ return (status);
+
+ if ((percent <= 0.0) || (percent >= 100))
+ {
+ ERROR ("statsd plugin: The value for \"%s\" must be between 0 and 100, "
+ "exclusively.", ci->key);
+ return (ERANGE);
+ }
+
+ tmp = realloc (conf_timer_percentile,
+ sizeof (*conf_timer_percentile) * (conf_timer_percentile_num + 1));
+ if (tmp == NULL)
+ {
+ ERROR ("statsd plugin: realloc failed.");
+ return (ENOMEM);
+ }
+ conf_timer_percentile = tmp;
+ conf_timer_percentile[conf_timer_percentile_num] = percent;
+ conf_timer_percentile_num++;
+
+ return (0);
+} /* }}} int statsd_config_timer_percentile */
+
+static int statsd_config (oconfig_item_t *ci) /* {{{ */
+{
+ int i;
+
+ for (i = 0; i < ci->children_num; i++)
+ {
+ oconfig_item_t *child = ci->children + i;
+
+ if (strcasecmp ("Host", child->key) == 0)
+ cf_util_get_string (child, &conf_node);
+ else if (strcasecmp ("Port", child->key) == 0)
+ cf_util_get_service (child, &conf_service);
+ else if (strcasecmp ("DeleteCounters", child->key) == 0)
+ cf_util_get_boolean (child, &conf_delete_counters);
+ else if (strcasecmp ("DeleteTimers", child->key) == 0)
+ cf_util_get_boolean (child, &conf_delete_timers);
+ else if (strcasecmp ("DeleteGauges", child->key) == 0)
+ cf_util_get_boolean (child, &conf_delete_gauges);
+ else if (strcasecmp ("DeleteSets", child->key) == 0)
+ cf_util_get_boolean (child, &conf_delete_sets);
+ else if (strcasecmp ("TimerPercentile", child->key) == 0)
+ statsd_config_timer_percentile (child);
+ else
+ ERROR ("statsd plugin: The \"%s\" config option is not valid.",
+ child->key);
+ }
+
+ return (0);
+} /* }}} int statsd_config */
+
+static int statsd_init (void) /* {{{ */
+{
+ pthread_mutex_lock (&metrics_lock);
+ if (metrics_tree == NULL)
+ metrics_tree = c_avl_create ((void *) strcmp);
+
+ if (!network_thread_running)
+ {
+ int status;
+
+ status = pthread_create (&network_thread,
+ /* attr = */ NULL,
+ statsd_network_thread,
+ /* args = */ NULL);
+ if (status != 0)
+ {
+ char errbuf[1024];
+ pthread_mutex_unlock (&metrics_lock);
+ ERROR ("statsd plugin: pthread_create failed: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ return (status);
+ }
+ }
+ network_thread_running = 1;
+
+ pthread_mutex_unlock (&metrics_lock);
+
+ return (0);
+} /* }}} int statsd_init */
+
+/* Must hold metrics_lock when calling this function. */
+static int statsd_metric_clear_set_unsafe (statsd_metric_t *metric) /* {{{ */
+{
+ void *key;
+ void *value;
+
+ if ((metric == NULL) || (metric->type != STATSD_SET))
+ return (EINVAL);
+
+ if (metric->set == NULL)
+ return (0);
+
+ while (c_avl_pick (metric->set, &key, &value) == 0)
+ {
+ sfree (key);
+ sfree (value);
+ }
+
+ return (0);
+} /* }}} int statsd_metric_clear_set_unsafe */
+
+/* Must hold metrics_lock when calling this function. */
+static int statsd_metric_submit_unsafe (char const *name, /* {{{ */
+ statsd_metric_t const *metric)
+{
+ value_t values[1];
+ value_list_t vl = VALUE_LIST_INIT;
+
+ vl.values = values;
+ vl.values_len = 1;
+ sstrncpy (vl.host, hostname_g, sizeof (vl.host));
+ sstrncpy (vl.plugin, "statsd", sizeof (vl.plugin));
+
+ if (metric->type == STATSD_GAUGE)
+ sstrncpy (vl.type, "gauge", sizeof (vl.type));
+ else if (metric->type == STATSD_TIMER)
+ sstrncpy (vl.type, "latency", sizeof (vl.type));
+ else if (metric->type == STATSD_SET)
+ sstrncpy (vl.type, "objects", sizeof (vl.type));
+ else /* if (metric->type == STATSD_COUNTER) */
+ sstrncpy (vl.type, "derive", sizeof (vl.type));
+
+ sstrncpy (vl.type_instance, name, sizeof (vl.type_instance));
+
+ if (metric->type == STATSD_GAUGE)
+ values[0].gauge = (gauge_t) metric->value;
+ else if (metric->type == STATSD_TIMER)
+ {
+ size_t i;
+
+ if (metric->updates_num == 0)
+ return (0);
+
+ vl.time = cdtime ();
+
+ ssnprintf (vl.type_instance, sizeof (vl.type_instance),
+ "%s-average", name);
+ values[0].gauge = CDTIME_T_TO_DOUBLE (
+ latency_counter_get_average (metric->latency));
+ plugin_dispatch_values (&vl);
+
+ for (i = 0; i < conf_timer_percentile_num; i++)
+ {
+ ssnprintf (vl.type_instance, sizeof (vl.type_instance),
+ "%s-percentile-%.0f", name, conf_timer_percentile[i]);
+ values[0].gauge = CDTIME_T_TO_DOUBLE (
+ latency_counter_get_percentile (
+ metric->latency, conf_timer_percentile[i]));
+ plugin_dispatch_values (&vl);
+ }
+
+ latency_counter_reset (metric->latency);
+ return (0);
+ }
+ else if (metric->type == STATSD_SET)
+ {
+ if (metric->set == NULL)
+ values[0].gauge = 0.0;
+ else
+ values[0].gauge = (gauge_t) c_avl_size (metric->set);
+ }
+ else
+ values[0].derive = (derive_t) metric->value;
+
+ return (plugin_dispatch_values (&vl));
+} /* }}} int statsd_metric_submit_unsafe */
+
+static int statsd_read (void) /* {{{ */
+{
+ c_avl_iterator_t *iter;
+ char *name;
+ statsd_metric_t *metric;
+
+ char **to_be_deleted = NULL;
+ size_t to_be_deleted_num = 0;
+ size_t i;
+
+ pthread_mutex_lock (&metrics_lock);
+
+ if (metrics_tree == NULL)
+ {
+ pthread_mutex_unlock (&metrics_lock);
+ return (0);
+ }
+
+ iter = c_avl_get_iterator (metrics_tree);
+ while (c_avl_iterator_next (iter, (void *) &name, (void *) &metric) == 0)
+ {
+ if ((metric->updates_num == 0)
+ && ((conf_delete_counters && (metric->type == STATSD_COUNTER))
+ || (conf_delete_timers && (metric->type == STATSD_TIMER))
+ || (conf_delete_gauges && (metric->type == STATSD_GAUGE))
+ || (conf_delete_sets && (metric->type == STATSD_SET))))
+ {
+ DEBUG ("statsd plugin: Deleting metric \"%s\".", name);
+ strarray_add (&to_be_deleted, &to_be_deleted_num, name);
+ continue;
+ }
+
+ /* Names have a prefix, e.g. "c:", which determines the (statsd) type.
+ * Remove this here. */
+ statsd_metric_submit_unsafe (name + 2, metric);
+
+ /* Reset the metric. */
+ metric->updates_num = 0;
+ if (metric->type == STATSD_SET)
+ statsd_metric_clear_set_unsafe (metric);
+ }
+ c_avl_iterator_destroy (iter);
+
+ for (i = 0; i < to_be_deleted_num; i++)
+ {
+ int status;
+
+ status = c_avl_remove (metrics_tree, to_be_deleted[i],
+ (void *) &name, (void *) &metric);
+ if (status != 0)
+ {
+ ERROR ("stats plugin: c_avl_remove (\"%s\") failed with status %i.",
+ to_be_deleted[i], status);
+ continue;
+ }
+
+ sfree (name);
+ sfree (metric);
+ }
+
+ pthread_mutex_unlock (&metrics_lock);
+
+ strarray_free (to_be_deleted, to_be_deleted_num);
+
+ return (0);
+} /* }}} int statsd_read */
+
+static int statsd_shutdown (void) /* {{{ */
+{
+ void *key;
+ void *value;
+
+ pthread_mutex_lock (&metrics_lock);
+
+ if (network_thread_running)
+ {
+ network_thread_shutdown = 1;
+ pthread_kill (network_thread, SIGTERM);
+ pthread_join (network_thread, /* retval = */ NULL);
+ }
+ network_thread_running = 0;
+
+ while (c_avl_pick (metrics_tree, &key, &value) == 0)
+ {
+ sfree (key);
+ sfree (value);
+ }
+ c_avl_destroy (metrics_tree);
+ metrics_tree = NULL;
+
+ sfree (conf_node);
+ sfree (conf_service);
+
+ pthread_mutex_unlock (&metrics_lock);
+
+ return (0);
+} /* }}} int statsd_shutdown */
+
+void module_register (void)
+{
+ plugin_register_complex_config ("statsd", statsd_config);
+ plugin_register_init ("statsd", statsd_init);
+ plugin_register_read ("statsd", statsd_read);
+ plugin_register_shutdown ("statsd", statsd_shutdown);
+}
+
+/* vim: set sw=2 sts=2 et fdm=marker : */
diff --git a/src/types.db b/src/types.db
index f1d30f0d1a17df08e41124ce73bac2fde3c0e240..723b477d72e45e1c4eac75ef719a4aa7d06ec641 100644 (file)
--- a/src/types.db
+++ b/src/types.db
snr value:GAUGE:0:U
spam_check value:GAUGE:0:U
spam_score value:GAUGE:U:U
+spl value:GAUGE:U:U
swap_io value:DERIVE:0:U
swap value:GAUGE:0:1099511627776
tcp_connections value:GAUGE:0:4294967295
-temperature value:GAUGE:-273.15:U
+temperature value:GAUGE:U:U
threads value:GAUGE:0:U
time_dispersion value:GAUGE:-1000000:1000000
timeleft value:GAUGE:0:U
diff --git a/src/utils_latency.c b/src/utils_latency.c
--- /dev/null
+++ b/src/utils_latency.c
@@ -0,0 +1,173 @@
+/**
+ * collectd - src/utils_latency.c
+ * Copyright (C) 2013 Florian Forster
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ * Florian Forster <ff at octo.it>
+ **/
+
+#include "collectd.h"
+#include "utils_latency.h"
+#include "common.h"
+
+#ifndef LATENCY_HISTOGRAM_SIZE
+# define LATENCY_HISTOGRAM_SIZE 1000
+#endif
+
+struct latency_counter_s
+{
+ cdtime_t start_time;
+
+ cdtime_t sum;
+ size_t num;
+
+ cdtime_t min;
+ cdtime_t max;
+
+ int histogram[LATENCY_HISTOGRAM_SIZE];
+};
+
+latency_counter_t *latency_counter_create () /* {{{ */
+{
+ latency_counter_t *lc;
+
+ lc = malloc (sizeof (*lc));
+ if (lc == NULL)
+ return (NULL);
+
+ latency_counter_reset (lc);
+ return (lc);
+} /* }}} latency_counter_t *latency_counter_create */
+
+void latency_counter_destroy (latency_counter_t *lc) /* {{{ */
+{
+ sfree (lc);
+} /* }}} void latency_counter_destroy */
+
+void latency_counter_add (latency_counter_t *lc, cdtime_t latency) /* {{{ */
+{
+ size_t latency_ms;
+
+ if ((lc == NULL) || (latency == 0))
+ return;
+
+ lc->sum += latency;
+ lc->num++;
+
+ if ((lc->min == 0) && (lc->max == 0))
+ lc->min = lc->max = latency;
+ if (lc->min > latency)
+ lc->min = latency;
+ if (lc->max < latency)
+ lc->max = latency;
+
+ /* A latency of _exactly_ 1.0 ms should be stored in the buffer 0, so
+ * subtract one from the cdtime_t value so that exactly 1.0 ms get sorted
+ * accordingly. */
+ latency_ms = (size_t) CDTIME_T_TO_MS (latency - 1);
+ if (latency_ms < STATIC_ARRAY_SIZE (lc->histogram))
+ lc->histogram[latency_ms]++;
+} /* }}} void latency_counter_add */
+
+void latency_counter_reset (latency_counter_t *lc) /* {{{ */
+{
+ if (lc == NULL)
+ return;
+
+ memset (lc, 0, sizeof (*lc));
+ lc->start_time = cdtime ();
+} /* }}} void latency_counter_reset */
+
+cdtime_t latency_counter_get_min (latency_counter_t *lc) /* {{{ */
+{
+ if (lc == NULL)
+ return (0);
+ return (lc->min);
+} /* }}} cdtime_t latency_counter_get_min */
+
+cdtime_t latency_counter_get_max (latency_counter_t *lc) /* {{{ */
+{
+ if (lc == NULL)
+ return (0);
+ return (lc->max);
+} /* }}} cdtime_t latency_counter_get_max */
+
+cdtime_t latency_counter_get_average (latency_counter_t *lc) /* {{{ */
+{
+ double average;
+
+ if (lc == NULL)
+ return (0);
+
+ average = CDTIME_T_TO_DOUBLE (lc->sum) / ((double) lc->num);
+ return (DOUBLE_TO_CDTIME_T (average));
+} /* }}} cdtime_t latency_counter_get_average */
+
+cdtime_t latency_counter_get_percentile (latency_counter_t *lc,
+ double percent)
+{
+ double percent_upper;
+ double percent_lower;
+ double ms_upper;
+ double ms_lower;
+ double ms_interpolated;
+ int sum;
+ size_t i;
+
+ if ((lc == NULL) || !((percent > 0.0) && (percent < 100.0)))
+ return (0);
+
+ /* Find index i so that at least "percent" events are within i+1 ms. */
+ percent_upper = 0.0;
+ percent_lower = 0.0;
+ sum = 0;
+ for (i = 0; i < LATENCY_HISTOGRAM_SIZE; i++)
+ {
+ percent_lower = percent_upper;
+ sum += lc->histogram[i];
+ if (sum == 0)
+ percent_upper = 0.0;
+ else
+ percent_upper = 100.0 * ((double) sum) / ((double) lc->num);
+
+ if (percent_upper >= percent)
+ break;
+ }
+
+ if (i >= LATENCY_HISTOGRAM_SIZE)
+ return (0);
+
+ assert (percent_upper >= percent);
+ assert (percent_lower < percent);
+
+ ms_upper = (double) (i + 1);
+ ms_lower = (double) i;
+ if (i == 0)
+ return (MS_TO_CDTIME_T (ms_upper));
+
+ ms_interpolated = (((percent_upper - percent) * ms_lower)
+ + ((percent - percent_lower) * ms_upper))
+ / (percent_upper - percent_lower);
+
+ return (MS_TO_CDTIME_T (ms_interpolated));
+} /* }}} cdtime_t latency_counter_get_percentile */
+
+/* vim: set sw=2 sts=2 et fdm=marker : */
diff --git a/src/utils_latency.h b/src/utils_latency.h
--- /dev/null
+++ b/src/utils_latency.h
@@ -0,0 +1,45 @@
+/**
+ * collectd - src/utils_latency.h
+ * Copyright (C) 2013 Florian Forster
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ * Florian Forster <ff at octo.it>
+ **/
+
+#include "collectd.h"
+#include "utils_time.h"
+
+struct latency_counter_s;
+typedef struct latency_counter_s latency_counter_t;
+
+latency_counter_t *latency_counter_create ();
+void latency_counter_destroy (latency_counter_t *lc);
+
+void latency_counter_add (latency_counter_t *lc, cdtime_t latency);
+void latency_counter_reset (latency_counter_t *lc);
+
+cdtime_t latency_counter_get_min (latency_counter_t *lc);
+cdtime_t latency_counter_get_max (latency_counter_t *lc);
+cdtime_t latency_counter_get_average (latency_counter_t *lc);
+cdtime_t latency_counter_get_percentile (latency_counter_t *lc,
+ double percent);
+
+/* vim: set sw=2 sts=2 et : */