author | Florian Forster <octo@leeloo.lan.home.verplant.org> | |
Sat, 6 Nov 2010 11:15:44 +0000 (12:15 +0100) | ||
committer | Florian Forster <octo@leeloo.lan.home.verplant.org> | |
Sat, 6 Nov 2010 11:15:44 +0000 (12:15 +0100) |
37 files changed:
index e83c2f8407e4479f4b17b7bcd492babadb41e9c7..c57f90b2d0c38aa625f48e280dac736570ec0284 100644 (file)
--- a/AUTHORS
+++ b/AUTHORS
Antony Dovgal <tony at daylessday.org>
- memcached plugin.
+Aurélien Reynaud <collectd at wattapower.net>
+ - LPAR plugin.
+ - Various fixes for AIX, HP-UX and Solaris.
+
Bruno Prémont <bonbons at linux-vserver.org>
- BIND plugin.
- Many bugreports and -fixes in various plugins,
Scott Garrett <sgarrett at technomancer.com>
- tape plugin.
+Sebastien Pahl <sebastien.pahl at dotcloud.com>
+ - AMQP plugin.
+
Simon Kuhnle <simon at blarzwurst.de>
- OpenBSD code for the cpu and memory plugins.
index 0c7a4221369fb496cbd79e87cd22031886cc3427..2ed8934fc92e43842dcab3ed7df8808bc5aa4bfb 100644 (file)
--- a/README
+++ b/README
- load
System load average over the last 1, 5 and 15 minutes.
+ - lpar
+ Detailed CPU statistics of the “Logical Partitions” virtualization
+ technique built into IBM's POWER processors.
+
- libvirt
CPU, disk and network I/O statistics from virtual machines.
* Output can be written or sent to various destinations by the following
plugins:
+ - amqp
+ Sends JSON-encoded data to an Advanced Message Queuing Protocol (AMQP)
+ server, such as RabbitMQ.
+
- csv
Write to comma separated values (CSV) files. This needs lots of
diskspace but is extremely portable and can be analysed with almost
Used to capture packets by the `dns' plugin.
<http://www.tcpdump.org/>
+ * libperfstat (optional)
+ Used by various plugins to gather statistics under AIX.
+
* libperl (optional)
Obviously used by the `perl' plugin. The library has to be compiled with
ithread support (introduced in Perl 5.6.0).
Used by the `python' plugin. Currently, only 2.3 ≦ Python < 3 is supported.
<http://www.python.org/>
+ * librabbitmq (optional; also called “rabbitmq-c”)
+ Used by the AMQP plugin for AMQP connections, for example to RabbitMQ.
+ <http://hg.rabbitmq.com/rabbitmq-c/>
+
* librouteros (optional)
Used by the `routeros' plugin to connect to a device running `RouterOS'.
<http://verplant.org/librouteros/>
index f1b5d859874c0affc08143e1a536ca6f99747dc6..ca3b5d2349805a505ae22df4faad8c7b9feda529 100644 (file)
}
}
-sub plugin_flush_one {
- my $timeout = shift;
- my $name = shift;
-
- WARNING ("Collectd::plugin_flush_one is deprecated - "
- . "use Collectd::plugin_flush instead.");
-
- if (! (defined ($timeout) && defined ($name))) {
- ERROR ("Usage: Collectd::plugin_flush_one(timeout, name)");
- return;
- }
-
- plugin_flush (plugins => $name, timeout => $timeout);
-}
-
-sub plugin_flush_all {
- my $timeout = shift;
-
- WARNING ("Collectd::plugin_flush_all is deprecated - "
- . "use Collectd::plugin_flush instead.");
-
- if (! defined ($timeout)) {
- ERROR ("Usage: Collectd::plugin_flush_all(timeout)");
- return;
- }
-
- plugin_flush (timeout => $timeout);
-}
-
sub fc_call {
my $type = shift;
my $name = shift;
diff --git a/configure.in b/configure.in
index ba11dc3c249139d343ff0215eeef5245ef98bc1b..af584425ae890d9c9087471de9b8c943e49703d1 100644 (file)
--- a/configure.in
+++ b/configure.in
then
AC_DEFINE(_POSIX_PTHREAD_SEMANTICS, 1, [Define to enforce POSIX thread semantics under Solaris.])
fi
+if test "x$ac_system" = "xAIX"
+then
+ AC_DEFINE(_THREAD_SAFE_ERRNO, 1, [Define to use the thread-safe version of errno under AIX.])
+fi
# Where to install .pc files.
pkgconfigdir="${libdir}/pkgconfig"
if test "x$with_perfstat" = "xyes"
then
AC_DEFINE(HAVE_PERFSTAT, 1, [Define to 1 if you have the 'perfstat' library (-lperfstat)])
+ # struct members pertaining to donation have been added to libperfstat somewhere between AIX5.3ML5 and AIX5.3ML9
+ AC_CHECK_MEMBER([perfstat_partition_type_t.b.donate_enabled], [], [], [[#include <libperfstat.h]])
+ if test "x$av_cv_member_perfstat_partition_type_t_b_donate_enabled" = "xyes"
+ then
+ AC_DEFINE(PERFSTAT_SUPPORTS_DONATION, 1, [Define to 1 if your version of the 'perfstat' library supports donation])
+ fi
fi
AM_CONDITIONAL(BUILD_WITH_PERFSTAT, test "x$with_perfstat" = "xyes")
fi
# }}} --with-python
+# --with-librabbitmq {{{
+with_librabbitmq_cppflags=""
+with_librabbitmq_ldflags=""
+AC_ARG_WITH(librabbitmq, [AS_HELP_STRING([--with-librabbitmq@<:@=PREFIX@:>@], [Path to librabbitmq.])],
+[
+ if test "x$withval" != "xno" && test "x$withval" != "xyes"
+ then
+ with_librabbitmq_cppflags="-I$withval/include"
+ with_librabbitmq_ldflags="-L$withval/lib"
+ with_librabbitmq="yes"
+ else
+ with_librabbitmq="$withval"
+ fi
+],
+[
+ with_librabbitmq="yes"
+])
+if test "x$with_librabbitmq" = "xyes"
+then
+ SAVE_CPPFLAGS="$CPPFLAGS"
+ CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags"
+
+ AC_CHECK_HEADERS(amqp.h, [with_librabbitmq="yes"], [with_librabbitmq="no (amqp.h not found)"])
+
+ CPPFLAGS="$SAVE_CPPFLAGS"
+fi
+if test "x$with_librabbitmq" = "xyes"
+then
+ SAVE_CPPFLAGS="$CPPFLAGS"
+ SAVE_LDFLAGS="$LDFLAGS"
+ CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags"
+ LDFLAGS="$LDFLAGS $with_librabbitmq_ldflags"
+
+ AC_CHECK_LIB(rabbitmq, amqp_basic_publish, [with_librabbitmq="yes"], [with_librabbitmq="no (Symbol 'amqp_basic_publish' not found)"])
+
+ CPPFLAGS="$SAVE_CPPFLAGS"
+ LDFLAGS="$SAVE_LDFLAGS"
+fi
+if test "x$with_librabbitmq" = "xyes"
+then
+ BUILD_WITH_LIBRABBITMQ_CPPFLAGS="$with_librabbitmq_cppflags"
+ BUILD_WITH_LIBRABBITMQ_LDFLAGS="$with_librabbitmq_ldflags"
+ BUILD_WITH_LIBRABBITMQ_LIBS="-lrabbitmq"
+ AC_SUBST(BUILD_WITH_LIBRABBITMQ_CPPFLAGS)
+ AC_SUBST(BUILD_WITH_LIBRABBITMQ_LDFLAGS)
+ AC_SUBST(BUILD_WITH_LIBRABBITMQ_LIBS)
+ AC_DEFINE(HAVE_LIBRABBITMQ, 1, [Define if librabbitmq is present and usable.])
+fi
+AM_CONDITIONAL(BUILD_WITH_LIBRABBITMQ, test "x$with_librabbitmq" = "xyes")
+# }}}
+
# --with-librouteros {{{
AC_ARG_WITH(librouteros, [AS_HELP_STRING([--with-librouteros@<:@=PREFIX@:>@], [Path to librouteros.])],
[
fi
if test "x$have_sysctlbyname" = "xyes"
then
+ plugin_contextswitch="yes"
plugin_cpu="yes"
plugin_memory="yes"
plugin_tcpconns="yes"
m4_divert_once([HELP_ENABLE], [])
+AC_PLUGIN([amqp], [$with_librabbitmq], [AMQP output plugin])
AC_PLUGIN([apache], [$with_libcurl], [Apache httpd statistics])
AC_PLUGIN([apcups], [yes], [Statistics of UPSes by APC])
AC_PLUGIN([apple_sensors], [$with_libiokit], [Apple's hardware sensors])
AC_PLUGIN([libvirt], [$plugin_libvirt], [Virtual machine statistics])
AC_PLUGIN([load], [$plugin_load], [System load])
AC_PLUGIN([logfile], [yes], [File logging plugin])
+AC_PLUGIN([lpar], [$with_perfstat], [AIX logical partitions statistics])
AC_PLUGIN([madwifi], [$have_linux_wireless_h], [Madwifi wireless statistics])
AC_PLUGIN([match_empty_counter], [yes], [The empty counter match])
AC_PLUGIN([match_hashed], [yes], [The hashed match])
libperl . . . . . . . $with_libperl
libpq . . . . . . . . $with_libpq
libpthread . . . . . $with_libpthread
+ librabbitmq . . . . . $with_librabbitmq
librouteros . . . . . $with_librouteros
librrd . . . . . . . $with_librrd
libsensors . . . . . $with_libsensors
perl . . . . . . . . $with_perl_bindings
Modules:
+ amqp . . . . . . . $enable_amqp
apache . . . . . . . $enable_apache
apcups . . . . . . . $enable_apcups
apple_sensors . . . . $enable_apple_sensors
libvirt . . . . . . . $enable_libvirt
load . . . . . . . . $enable_load
logfile . . . . . . . $enable_logfile
+ lpar... . . . . . . . $enable_lpar
madwifi . . . . . . . $enable_madwifi
match_empty_counter . $enable_match_empty_counter
match_hashed . . . . $enable_match_hashed
index 9c5e3d1dfe2839f08bb73c251843fb8c23b97c53..3bb3d8b125de6d9a2ac0209f917685a560332277 100644 (file)
<Type ps_cputime>
Module PsCputime
</Type>
+<Type ps_disk_octets>
+ Module GenericIO
+ DataSources read write
+ DSName "read Read "
+ DSName write Written
+ RRDTitle "Process disk traffic ({instance})"
+ RRDVerticalLabel "Bytes per second"
+# RRDOptions ...
+ RRDFormat "%5.1lf%s"
+</Type>
<Type ps_rss>
DataSources value
DSName value RSS
diff --git a/contrib/collection3/lib/Collectd/Graph/Common.pm b/contrib/collection3/lib/Collectd/Graph/Common.pm
index f88c22b5420aeb9e961c208dc688b6f9f463f8e0..c6e250819430c4330f5bf00ba48c847ab2b12819 100644 (file)
for (my $i = 0; $i < @files; $i++)
{
my $file = $files[$i];
- my $key = $file->{'plugin_instance'} || '';
+ my $key1 = $file->{'hostname'} || '';
+ my $key2 = $file->{'plugin_instance'} || '';
+ my $key = "$key1-$key2";
$data->{$key} ||= [];
push (@{$data->{$key}}, $file);
diff --git a/src/Makefile.am b/src/Makefile.am
index 025476afeb8866140c2fcde2c23eb941ee6e1016..e64e167cf11e59fceac7339d95507290ed9bb272 100644 (file)
--- a/src/Makefile.am
+++ b/src/Makefile.am
BUILT_SOURCES =
CLEANFILES =
+if BUILD_PLUGIN_AMQP
+pkglib_LTLIBRARIES += amqp.la
+amqp_la_SOURCES = amqp.c \
+ utils_cmd_putval.c utils_cmd_putval.h \
+ utils_format_json.c utils_format_json.h
+amqp_la_LDFLAGS = -module -avoid-version $(BUILD_WITH_LIBRABBITMQ_LDFLAGS)
+amqp_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBRABBITMQ_CPPFLAGS)
+amqp_la_LIBADD = $(BUILD_WITH_LIBRABBITMQ_LIBS)
+collectd_LDADD += "-dlopen" amqp.la
+collectd_DEPENDENCIES += amqp.la
+endif
+
if BUILD_PLUGIN_APACHE
pkglib_LTLIBRARIES += apache.la
apache_la_SOURCES = apache.c
collectd_DEPENDENCIES += logfile.la
endif
+if BUILD_PLUGIN_LPAR
+pkglib_LTLIBRARIES += lpar.la
+lpar_la_SOURCES = lpar.c
+lpar_la_LDFLAGS = -module -avoid-version
+collectd_LDADD += "-dlopen" lpar.la
+collectd_DEPENDENCIES += lpar.la
+lpar_la_LIBADD = -lperfstat
+endif
+
if BUILD_PLUGIN_MADWIFI
pkglib_LTLIBRARIES += madwifi.la
madwifi_la_SOURCES = madwifi.c madwifi.h
diff --git a/src/amqp.c b/src/amqp.c
--- /dev/null
+++ b/src/amqp.c
@@ -0,0 +1,939 @@
+/**
+ * collectd - src/amqp.c
+ * Copyright (C) 2009 Sebastien Pahl
+ * Copyright (C) 2010 Florian Forster
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ * Sebastien Pahl <sebastien.pahl at dotcloud.com>
+ * Florian Forster <octo at verplant.org>
+ **/
+
+#include "collectd.h"
+#include "common.h"
+#include "plugin.h"
+#include "utils_cmd_putval.h"
+#include "utils_format_json.h"
+
+#include <pthread.h>
+
+#include <amqp.h>
+#include <amqp_framing.h>
+
+/* Defines for the delivery mode. I have no idea why they're not defined by the
+ * library.. */
+#define CAMQP_DM_VOLATILE 1
+#define CAMQP_DM_PERSISTENT 2
+
+#define CAMQP_FORMAT_COMMAND 1
+#define CAMQP_FORMAT_JSON 2
+
+#define CAMQP_CHANNEL 1
+
+/*
+ * Data types
+ */
+struct camqp_config_s
+{
+ _Bool publish;
+ char *name;
+
+ char *host;
+ int port;
+ char *vhost;
+ char *user;
+ char *password;
+
+ char *exchange;
+ char *routing_key;
+
+ /* publish only */
+ uint8_t delivery_mode;
+ _Bool store_rates;
+ int format;
+
+ /* subscribe only */
+ char *exchange_type;
+ char *queue;
+
+ amqp_connection_state_t connection;
+ pthread_mutex_t lock;
+};
+typedef struct camqp_config_s camqp_config_t;
+
+/*
+ * Global variables
+ */
+static const char *def_host = "localhost";
+static const char *def_vhost = "/";
+static const char *def_user = "guest";
+static const char *def_password = "guest";
+static const char *def_exchange = "amq.fanout";
+
+static pthread_t *subscriber_threads = NULL;
+static size_t subscriber_threads_num = 0;
+static _Bool subscriber_threads_running = 1;
+
+#define CONF(c,f) (((c)->f != NULL) ? (c)->f : def_##f)
+
+/*
+ * Functions
+ */
+static void camqp_close_connection (camqp_config_t *conf) /* {{{ */
+{
+ int sockfd;
+
+ if ((conf == NULL) || (conf->connection == NULL))
+ return;
+
+ sockfd = amqp_get_sockfd (conf->connection);
+ amqp_channel_close (conf->connection, CAMQP_CHANNEL, AMQP_REPLY_SUCCESS);
+ amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
+ amqp_destroy_connection (conf->connection);
+ close (sockfd);
+ conf->connection = NULL;
+} /* }}} void camqp_close_connection */
+
+static void camqp_config_free (void *ptr) /* {{{ */
+{
+ camqp_config_t *conf = ptr;
+
+ if (conf == NULL)
+ return;
+
+ camqp_close_connection (conf);
+
+ sfree (conf->name);
+ sfree (conf->host);
+ sfree (conf->vhost);
+ sfree (conf->user);
+ sfree (conf->password);
+ sfree (conf->exchange);
+ sfree (conf->exchange_type);
+ sfree (conf->queue);
+ sfree (conf->routing_key);
+
+ sfree (conf);
+} /* }}} void camqp_config_free */
+
+static char *camqp_bytes_cstring (amqp_bytes_t *in) /* {{{ */
+{
+ char *ret;
+
+ if ((in == NULL) || (in->bytes == NULL))
+ return (NULL);
+
+ ret = malloc (in->len + 1);
+ if (ret == NULL)
+ return (NULL);
+
+ memcpy (ret, in->bytes, in->len);
+ ret[in->len] = 0;
+
+ return (ret);
+} /* }}} char *camqp_bytes_cstring */
+
+static _Bool camqp_is_error (camqp_config_t *conf) /* {{{ */
+{
+ amqp_rpc_reply_t r;
+
+ r = amqp_get_rpc_reply (conf->connection);
+ if (r.reply_type == AMQP_RESPONSE_NORMAL)
+ return (0);
+
+ return (1);
+} /* }}} _Bool camqp_is_error */
+
+static char *camqp_strerror (camqp_config_t *conf, /* {{{ */
+ char *buffer, size_t buffer_size)
+{
+ amqp_rpc_reply_t r;
+
+ r = amqp_get_rpc_reply (conf->connection);
+ switch (r.reply_type)
+ {
+ case AMQP_RESPONSE_NORMAL:
+ sstrncpy (buffer, "Success", sizeof (buffer));
+ break;
+
+ case AMQP_RESPONSE_NONE:
+ sstrncpy (buffer, "Missing RPC reply type", sizeof (buffer));
+ break;
+
+ case AMQP_RESPONSE_LIBRARY_EXCEPTION:
+ if (r.library_errno)
+ return (sstrerror (r.library_errno, buffer, buffer_size));
+ else
+ sstrncpy (buffer, "End of stream", sizeof (buffer));
+ break;
+
+ case AMQP_RESPONSE_SERVER_EXCEPTION:
+ if (r.reply.id == AMQP_CONNECTION_CLOSE_METHOD)
+ {
+ amqp_connection_close_t *m = r.reply.decoded;
+ char *tmp = camqp_bytes_cstring (&m->reply_text);
+ ssnprintf (buffer, buffer_size, "Server connection error %d: %s",
+ m->reply_code, tmp);
+ sfree (tmp);
+ }
+ else if (r.reply.id == AMQP_CHANNEL_CLOSE_METHOD)
+ {
+ amqp_channel_close_t *m = r.reply.decoded;
+ char *tmp = camqp_bytes_cstring (&m->reply_text);
+ ssnprintf (buffer, buffer_size, "Server channel error %d: %s",
+ m->reply_code, tmp);
+ sfree (tmp);
+ }
+ else
+ {
+ ssnprintf (buffer, buffer_size, "Server error method %#"PRIx32,
+ r.reply.id);
+ }
+ break;
+
+ default:
+ ssnprintf (buffer, buffer_size, "Unknown reply type %i",
+ (int) r.reply_type);
+ }
+
+ return (buffer);
+} /* }}} char *camqp_strerror */
+
+static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */
+{
+ amqp_exchange_declare_ok_t *ed_ret;
+
+ if (conf->exchange_type == NULL)
+ return (0);
+
+ ed_ret = amqp_exchange_declare (conf->connection,
+ /* channel = */ CAMQP_CHANNEL,
+ /* exchange = */ amqp_cstring_bytes (conf->exchange),
+ /* type = */ amqp_cstring_bytes (conf->exchange_type),
+ /* passive = */ 0,
+ /* durable = */ 0,
+ /* auto_delete = */ 1,
+ /* arguments = */ AMQP_EMPTY_TABLE);
+ if ((ed_ret == NULL) && camqp_is_error (conf))
+ {
+ char errbuf[1024];
+ ERROR ("amqp plugin: amqp_exchange_declare failed: %s",
+ camqp_strerror (conf, errbuf, sizeof (errbuf)));
+ camqp_close_connection (conf);
+ return (-1);
+ }
+
+ INFO ("amqp plugin: Successfully created exchange \"%s\" "
+ "with type \"%s\".",
+ conf->exchange, conf->exchange_type);
+
+ return (0);
+} /* }}} int camqp_create_exchange */
+
+static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
+{
+ amqp_queue_declare_ok_t *qd_ret;
+ amqp_basic_consume_ok_t *cm_ret;
+
+ qd_ret = amqp_queue_declare (conf->connection,
+ /* channel = */ CAMQP_CHANNEL,
+ /* queue = */ (conf->queue != NULL)
+ ? amqp_cstring_bytes (conf->queue)
+ : AMQP_EMPTY_BYTES,
+ /* passive = */ 0,
+ /* durable = */ 0,
+ /* exclusive = */ 0,
+ /* auto_delete = */ 1,
+ /* arguments = */ AMQP_EMPTY_TABLE);
+ if (qd_ret == NULL)
+ {
+ ERROR ("amqp plugin: amqp_queue_declare failed.");
+ camqp_close_connection (conf);
+ return (-1);
+ }
+
+ if (conf->queue == NULL)
+ {
+ conf->queue = camqp_bytes_cstring (&qd_ret->queue);
+ if (conf->queue == NULL)
+ {
+ ERROR ("amqp plugin: camqp_bytes_cstring failed.");
+ camqp_close_connection (conf);
+ return (-1);
+ }
+
+ INFO ("amqp plugin: Created queue \"%s\".", conf->queue);
+ }
+ DEBUG ("amqp plugin: Successfully created queue \"%s\".", conf->queue);
+
+ /* bind to an exchange */
+ if (conf->exchange != NULL)
+ {
+ amqp_queue_bind_ok_t *qb_ret;
+
+ assert (conf->queue != NULL);
+ qb_ret = amqp_queue_bind (conf->connection,
+ /* channel = */ CAMQP_CHANNEL,
+ /* queue = */ amqp_cstring_bytes (conf->queue),
+ /* exchange = */ amqp_cstring_bytes (conf->exchange),
+ /* routing_key = */ (conf->routing_key != NULL)
+ ? amqp_cstring_bytes (conf->routing_key)
+ : AMQP_EMPTY_BYTES,
+ /* arguments = */ AMQP_EMPTY_TABLE);
+ if ((qb_ret == NULL) && camqp_is_error (conf))
+ {
+ char errbuf[1024];
+ ERROR ("amqp plugin: amqp_queue_bind failed: %s",
+ camqp_strerror (conf, errbuf, sizeof (errbuf)));
+ camqp_close_connection (conf);
+ return (-1);
+ }
+
+ DEBUG ("amqp plugin: Successfully bound queue \"%s\" to exchange \"%s\".",
+ conf->queue, conf->exchange);
+ } /* if (conf->exchange != NULL) */
+
+ cm_ret = amqp_basic_consume (conf->connection,
+ /* channel = */ CAMQP_CHANNEL,
+ /* queue = */ amqp_cstring_bytes (conf->queue),
+ /* consumer_tag = */ AMQP_EMPTY_BYTES,
+ /* no_local = */ 0,
+ /* no_ack = */ 1,
+ /* exclusive = */ 0);
+ if ((cm_ret == NULL) && camqp_is_error (conf))
+ {
+ char errbuf[1024];
+ ERROR ("amqp plugin: amqp_basic_consume failed: %s",
+ camqp_strerror (conf, errbuf, sizeof (errbuf)));
+ camqp_close_connection (conf);
+ return (-1);
+ }
+
+ return (0);
+} /* }}} int camqp_setup_queue */
+
+static int camqp_connect (camqp_config_t *conf) /* {{{ */
+{
+ amqp_rpc_reply_t reply;
+ int sockfd;
+ int status;
+
+ if (conf->connection != NULL)
+ return (0);
+
+ conf->connection = amqp_new_connection ();
+ if (conf->connection == NULL)
+ {
+ ERROR ("amqp plugin: amqp_new_connection failed.");
+ return (ENOMEM);
+ }
+
+ sockfd = amqp_open_socket (CONF(conf, host), conf->port);
+ if (sockfd < 0)
+ {
+ char errbuf[1024];
+ status = (-1) * sockfd;
+ ERROR ("amqp plugin: amqp_open_socket failed: %s",
+ sstrerror (status, errbuf, sizeof (errbuf)));
+ amqp_destroy_connection (conf->connection);
+ conf->connection = NULL;
+ return (status);
+ }
+ amqp_set_sockfd (conf->connection, sockfd);
+
+ reply = amqp_login (conf->connection, CONF(conf, vhost),
+ /* channel max = */ 0,
+ /* frame max = */ 131072,
+ /* heartbeat = */ 0,
+ /* authentication = */ AMQP_SASL_METHOD_PLAIN,
+ CONF(conf, user), CONF(conf, password));
+ if (reply.reply_type != AMQP_RESPONSE_NORMAL)
+ {
+ ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
+ CONF(conf, vhost), CONF(conf, user));
+ amqp_destroy_connection (conf->connection);
+ close (sockfd);
+ conf->connection = NULL;
+ return (1);
+ }
+
+ amqp_channel_open (conf->connection, /* channel = */ 1);
+ /* FIXME: Is checking "reply.reply_type" really correct here? How does
+ * it get set? --octo */
+ if (reply.reply_type != AMQP_RESPONSE_NORMAL)
+ {
+ ERROR ("amqp plugin: amqp_channel_open failed.");
+ amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
+ amqp_destroy_connection (conf->connection);
+ close(sockfd);
+ conf->connection = NULL;
+ return (1);
+ }
+
+ INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
+ "on %s:%i.", CONF(conf, vhost), CONF(conf, host), conf->port);
+
+ status = camqp_create_exchange (conf);
+ if (status != 0)
+ return (status);
+
+ if (!conf->publish)
+ return (camqp_setup_queue (conf));
+ return (0);
+} /* }}} int camqp_connect */
+
+static int camqp_shutdown (void) /* {{{ */
+{
+ size_t i;
+
+ DEBUG ("amqp plugin: Shutting down %zu subscriber threads.",
+ subscriber_threads_num);
+
+ subscriber_threads_running = 0;
+ for (i = 0; i < subscriber_threads_num; i++)
+ {
+ /* FIXME: Sending a signal is not very elegant here. Maybe find out how
+ * to use a timeout in the thread and check for the variable in regular
+ * intervals. */
+ pthread_kill (subscriber_threads[i], SIGTERM);
+ pthread_join (subscriber_threads[i], /* retval = */ NULL);
+ }
+
+ subscriber_threads_num = 0;
+ sfree (subscriber_threads);
+
+ DEBUG ("amqp plugin: All subscriber threads exited.");
+
+ return (0);
+} /* }}} int camqp_shutdown */
+
+/*
+ * Subscribing code
+ */
+static int camqp_read_body (camqp_config_t *conf, /* {{{ */
+ size_t body_size, const char *content_type)
+{
+ char body[body_size + 1];
+ char *body_ptr;
+ size_t received;
+ amqp_frame_t frame;
+ int status;
+
+ memset (body, 0, sizeof (body));
+ body_ptr = &body[0];
+ received = 0;
+
+ while (received < body_size)
+ {
+ status = amqp_simple_wait_frame (conf->connection, &frame);
+ if (status < 0)
+ {
+ char errbuf[1024];
+ status = (-1) * status;
+ ERROR ("amqp plugin: amqp_simple_wait_frame failed: %s",
+ sstrerror (status, errbuf, sizeof (errbuf)));
+ camqp_close_connection (conf);
+ return (status);
+ }
+
+ if (frame.frame_type != AMQP_FRAME_BODY)
+ {
+ NOTICE ("amqp plugin: Unexpected frame type: %#"PRIx8,
+ frame.frame_type);
+ return (-1);
+ }
+
+ if ((body_size - received) < frame.payload.body_fragment.len)
+ {
+ WARNING ("amqp plugin: Body is larger than indicated by header.");
+ return (-1);
+ }
+
+ memcpy (body_ptr, frame.payload.body_fragment.bytes,
+ frame.payload.body_fragment.len);
+ body_ptr += frame.payload.body_fragment.len;
+ received += frame.payload.body_fragment.len;
+ } /* while (received < body_size) */
+
+ if (strcasecmp ("text/collectd", content_type) == 0)
+ {
+ status = handle_putval (stderr, body);
+ if (status != 0)
+ ERROR ("amqp plugin: handle_putval failed with status %i.",
+ status);
+ return (status);
+ }
+ else if (strcasecmp ("application/json", content_type) == 0)
+ {
+ ERROR ("amqp plugin: camqp_read_body: Parsing JSON data has not "
+ "been implemented yet. FIXME!");
+ return (0);
+ }
+ else
+ {
+ ERROR ("amqp plugin: camqp_read_body: Unknown content type \"%s\".",
+ content_type);
+ return (EINVAL);
+ }
+
+ /* not reached */
+ return (0);
+} /* }}} int camqp_read_body */
+
+static int camqp_read_header (camqp_config_t *conf) /* {{{ */
+{
+ int status;
+ amqp_frame_t frame;
+ amqp_basic_properties_t *properties;
+ char *content_type;
+
+ status = amqp_simple_wait_frame (conf->connection, &frame);
+ if (status < 0)
+ {
+ char errbuf[1024];
+ status = (-1) * status;
+ ERROR ("amqp plugin: amqp_simple_wait_frame failed: %s",
+ sstrerror (status, errbuf, sizeof (errbuf)));
+ camqp_close_connection (conf);
+ return (status);
+ }
+
+ if (frame.frame_type != AMQP_FRAME_HEADER)
+ {
+ NOTICE ("amqp plugin: Unexpected frame type: %#"PRIx8,
+ frame.frame_type);
+ return (-1);
+ }
+
+ properties = frame.payload.properties.decoded;
+ content_type = camqp_bytes_cstring (&properties->content_type);
+ if (content_type == NULL)
+ {
+ ERROR ("amqp plugin: Unable to determine content type.");
+ return (-1);
+ }
+
+ status = camqp_read_body (conf,
+ (size_t) frame.payload.properties.body_size,
+ content_type);
+
+ sfree (content_type);
+ return (status);
+} /* }}} int camqp_read_header */
+
+static void *camqp_subscribe_thread (void *user_data) /* {{{ */
+{
+ camqp_config_t *conf = user_data;
+ int status;
+
+ while (subscriber_threads_running)
+ {
+ amqp_frame_t frame;
+
+ status = camqp_connect (conf);
+ if (status != 0)
+ {
+ ERROR ("amqp plugin: camqp_connect failed. "
+ "Will sleep for %i seconds.", interval_g);
+ sleep (interval_g);
+ continue;
+ }
+
+ status = amqp_simple_wait_frame (conf->connection, &frame);
+ if (status < 0)
+ {
+ ERROR ("amqp plugin: amqp_simple_wait_frame failed. "
+ "Will sleep for %i seconds.", interval_g);
+ camqp_close_connection (conf);
+ sleep (interval_g);
+ continue;
+ }
+
+ if (frame.frame_type != AMQP_FRAME_METHOD)
+ {
+ DEBUG ("amqp plugin: Unexpected frame type: %#"PRIx8,
+ frame.frame_type);
+ continue;
+ }
+
+ if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
+ {
+ DEBUG ("amqp plugin: Unexpected method id: %#"PRIx32,
+ frame.payload.method.id);
+ continue;
+ }
+
+ status = camqp_read_header (conf);
+
+ amqp_maybe_release_buffers (conf->connection);
+ } /* while (subscriber_threads_running) */
+
+ camqp_config_free (conf);
+ pthread_exit (NULL);
+} /* }}} void *camqp_subscribe_thread */
+
+static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */
+{
+ int status;
+ pthread_t *tmp;
+
+ tmp = realloc (subscriber_threads,
+ sizeof (*subscriber_threads) * (subscriber_threads_num + 1));
+ if (tmp == NULL)
+ {
+ ERROR ("amqp plugin: realloc failed.");
+ camqp_config_free (conf);
+ return (ENOMEM);
+ }
+ subscriber_threads = tmp;
+ tmp = subscriber_threads + subscriber_threads_num;
+ memset (tmp, 0, sizeof (*tmp));
+
+ status = pthread_create (tmp, /* attr = */ NULL,
+ camqp_subscribe_thread, conf);
+ if (status != 0)
+ {
+ char errbuf[1024];
+ ERROR ("amqp plugin: pthread_create failed: %s",
+ sstrerror (status, errbuf, sizeof (errbuf)));
+ camqp_config_free (conf);
+ return (status);
+ }
+
+ subscriber_threads_num++;
+
+ return (0);
+} /* }}} int camqp_subscribe_init */
+
+/*
+ * Publishing code
+ */
+/* XXX: You must hold "conf->lock" when calling this function! */
+static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
+ const char *buffer, const char *routing_key)
+{
+ amqp_basic_properties_t props;
+ int status;
+
+ status = camqp_connect (conf);
+ if (status != 0)
+ return (status);
+
+ memset (&props, 0, sizeof (props));
+ props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG
+ | AMQP_BASIC_DELIVERY_MODE_FLAG
+ | AMQP_BASIC_APP_ID_FLAG;
+ if (conf->format == CAMQP_FORMAT_COMMAND)
+ props.content_type = amqp_cstring_bytes("text/collectd");
+ else if (conf->format == CAMQP_FORMAT_JSON)
+ props.content_type = amqp_cstring_bytes("application/json");
+ else
+ assert (23 == 42);
+ props.delivery_mode = conf->delivery_mode;
+ props.app_id = amqp_cstring_bytes("collectd");
+
+ status = amqp_basic_publish(conf->connection,
+ /* channel = */ 1,
+ amqp_cstring_bytes(CONF(conf, exchange)),
+ amqp_cstring_bytes (routing_key),
+ /* mandatory = */ 0,
+ /* immediate = */ 0,
+ &props,
+ amqp_cstring_bytes(buffer));
+ if (status != 0)
+ {
+ ERROR ("amqp plugin: amqp_basic_publish failed with status %i.",
+ status);
+ camqp_close_connection (conf);
+ }
+
+ return (status);
+} /* }}} int camqp_write_locked */
+
+static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
+ user_data_t *user_data)
+{
+ camqp_config_t *conf = user_data->data;
+ char routing_key[6 * DATA_MAX_NAME_LEN];
+ char buffer[4096];
+ int status;
+
+ if ((ds == NULL) || (vl == NULL) || (conf == NULL))
+ return (EINVAL);
+
+ memset (buffer, 0, sizeof (buffer));
+
+ if (conf->routing_key != NULL)
+ {
+ sstrncpy (routing_key, conf->routing_key, sizeof (routing_key));
+ }
+ else
+ {
+ size_t i;
+ ssnprintf (routing_key, sizeof (routing_key), "collectd/%s/%s/%s/%s/%s",
+ vl->host,
+ vl->plugin, vl->plugin_instance,
+ vl->type, vl->type_instance);
+
+ /* Switch slashes (the only character forbidden by collectd) and dots
+ * (the separation character used by AMQP). */
+ for (i = 0; routing_key[i] != 0; i++)
+ {
+ if (routing_key[i] == '.')
+ routing_key[i] = '/';
+ else if (routing_key[i] == '/')
+ routing_key[i] = '.';
+ }
+ }
+
+ if (conf->format == CAMQP_FORMAT_COMMAND)
+ {
+ status = create_putval (buffer, sizeof (buffer), ds, vl);
+ if (status != 0)
+ {
+ ERROR ("amqp plugin: create_putval failed with status %i.",
+ status);
+ return (status);
+ }
+ }
+ else if (conf->format == CAMQP_FORMAT_JSON)
+ {
+ size_t bfree = sizeof (buffer);
+ size_t bfill = 0;
+
+ format_json_initialize (buffer, &bfill, &bfree);
+ format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates);
+ format_json_finalize (buffer, &bfill, &bfree);
+ }
+ else
+ {
+ ERROR ("amqp plugin: Invalid format (%i).", conf->format);
+ return (-1);
+ }
+
+ pthread_mutex_lock (&conf->lock);
+ status = camqp_write_locked (conf, buffer, routing_key);
+ pthread_mutex_unlock (&conf->lock);
+
+ return (status);
+} /* }}} int camqp_write */
+
+/*
+ * Config handling
+ */
+static int camqp_config_set_format (oconfig_item_t *ci, /* {{{ */
+ camqp_config_t *conf)
+{
+ char *string;
+ int status;
+
+ string = NULL;
+ status = cf_util_get_string (ci, &string);
+ if (status != 0)
+ return (status);
+
+ assert (string != NULL);
+ if (strcasecmp ("Command", string) == 0)
+ conf->format = CAMQP_FORMAT_COMMAND;
+ else if (strcasecmp ("JSON", string) == 0)
+ conf->format = CAMQP_FORMAT_JSON;
+ else
+ {
+ WARNING ("amqp plugin: Invalid format string: %s",
+ string);
+ }
+
+ free (string);
+
+ return (0);
+} /* }}} int config_set_string */
+
+static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
+ _Bool publish)
+{
+ camqp_config_t *conf;
+ int status;
+ int i;
+
+ conf = malloc (sizeof (*conf));
+ if (conf == NULL)
+ {
+ ERROR ("amqp plugin: malloc failed.");
+ return (ENOMEM);
+ }
+
+ /* Initialize "conf" {{{ */
+ memset (conf, 0, sizeof (*conf));
+ conf->publish = publish;
+ conf->name = NULL;
+ conf->format = CAMQP_FORMAT_COMMAND;
+ conf->host = NULL;
+ conf->port = 5672;
+ conf->vhost = NULL;
+ conf->user = NULL;
+ conf->password = NULL;
+ conf->exchange = NULL;
+ conf->routing_key = NULL;
+ /* publish only */
+ conf->delivery_mode = CAMQP_DM_VOLATILE;
+ conf->store_rates = 0;
+ /* subscribe only */
+ conf->exchange_type = NULL;
+ conf->queue = NULL;
+ /* general */
+ conf->connection = NULL;
+ pthread_mutex_init (&conf->lock, /* attr = */ NULL);
+ /* }}} */
+
+ status = cf_util_get_string (ci, &conf->name);
+ if (status != 0)
+ {
+ sfree (conf);
+ return (status);
+ }
+
+ for (i = 0; i < ci->children_num; i++)
+ {
+ oconfig_item_t *child = ci->children + i;
+
+ if (strcasecmp ("Host", child->key) == 0)
+ status = cf_util_get_string (child, &conf->host);
+ else if (strcasecmp ("Port", child->key) == 0)
+ {
+ status = cf_util_get_port_number (child);
+ if (status > 0)
+ {
+ conf->port = status;
+ status = 0;
+ }
+ }
+ else if (strcasecmp ("VHost", child->key) == 0)
+ status = cf_util_get_string (child, &conf->vhost);
+ else if (strcasecmp ("User", child->key) == 0)
+ status = cf_util_get_string (child, &conf->user);
+ else if (strcasecmp ("Password", child->key) == 0)
+ status = cf_util_get_string (child, &conf->password);
+ else if (strcasecmp ("Exchange", child->key) == 0)
+ status = cf_util_get_string (child, &conf->exchange);
+ else if ((strcasecmp ("ExchangeType", child->key) == 0) && !publish)
+ status = cf_util_get_string (child, &conf->exchange_type);
+ else if ((strcasecmp ("Queue", child->key) == 0) && !publish)
+ status = cf_util_get_string (child, &conf->queue);
+ else if (strcasecmp ("RoutingKey", child->key) == 0)
+ status = cf_util_get_string (child, &conf->routing_key);
+ else if ((strcasecmp ("Persistent", child->key) == 0) && publish)
+ {
+ _Bool tmp = 0;
+ status = cf_util_get_boolean (child, &tmp);
+ if (tmp)
+ conf->delivery_mode = CAMQP_DM_PERSISTENT;
+ else
+ conf->delivery_mode = CAMQP_DM_VOLATILE;
+ }
+ else if ((strcasecmp ("StoreRates", child->key) == 0) && publish)
+ status = cf_util_get_boolean (child, &conf->store_rates);
+ else if ((strcasecmp ("Format", child->key) == 0) && publish)
+ status = camqp_config_set_format (child, conf);
+ else
+ WARNING ("amqp plugin: Ignoring unknown "
+ "configuration option \"%s\".", child->key);
+
+ if (status != 0)
+ break;
+ } /* for (i = 0; i < ci->children_num; i++) */
+
+ if ((status == 0) && (conf->exchange == NULL))
+ {
+ if (conf->exchange_type != NULL)
+ WARNING ("amqp plugin: The option \"ExchangeType\" was given "
+ "without the \"Exchange\" option. It will be ignored.");
+
+ if (!publish && (conf->routing_key != NULL))
+ WARNING ("amqp plugin: The option \"RoutingKey\" was given "
+ "without the \"Exchange\" option. It will be ignored.");
+
+ }
+
+ if (status != 0)
+ {
+ camqp_config_free (conf);
+ return (status);
+ }
+
+ if (conf->exchange != NULL)
+ {
+ DEBUG ("amqp plugin: camqp_config_connection: exchange = %s;",
+ conf->exchange);
+ }
+
+ if (publish)
+ {
+ char cbname[128];
+ user_data_t ud = { conf, camqp_config_free };
+
+ ssnprintf (cbname, sizeof (cbname), "amqp/%s", conf->name);
+
+ status = plugin_register_write (cbname, camqp_write, &ud);
+ if (status != 0)
+ {
+ camqp_config_free (conf);
+ return (status);
+ }
+ }
+ else
+ {
+ status = camqp_subscribe_init (conf);
+ if (status != 0)
+ {
+ camqp_config_free (conf);
+ return (status);
+ }
+ }
+
+ return (0);
+} /* }}} int camqp_config_connection */
+
+static int camqp_config (oconfig_item_t *ci) /* {{{ */
+{
+ int i;
+
+ for (i = 0; i < ci->children_num; i++)
+ {
+ oconfig_item_t *child = ci->children + i;
+
+ if (strcasecmp ("Publish", child->key) == 0)
+ camqp_config_connection (child, /* publish = */ 1);
+ else if (strcasecmp ("Subscribe", child->key) == 0)
+ camqp_config_connection (child, /* publish = */ 0);
+ else
+ WARNING ("amqp plugin: Ignoring unknown config option \"%s\".",
+ child->key);
+ } /* for (ci->children_num) */
+
+ return (0);
+} /* }}} int camqp_config */
+
+void module_register (void)
+{
+ plugin_register_complex_config ("amqp", camqp_config);
+ plugin_register_shutdown ("amqp", camqp_shutdown);
+} /* void module_register */
+
+/* vim: set sw=4 sts=4 et fdm=marker : */
diff --git a/src/apache.c b/src/apache.c
index 3d6d957c5c2a721a3ce328fd19e27c2439e39a8c..506ba84ec23e58d9396e9165454c3a08a6e33be3 100644 (file)
--- a/src/apache.c
+++ b/src/apache.c
/**
* collectd - src/apache.c
- * Copyright (C) 2006-2009 Florian octo Forster
+ * Copyright (C) 2006-2010 Florian octo Forster
* Copyright (C) 2007 Florent EppO Monbillard
* Copyright (C) 2009 Amit Gupta
*
st->server_type = APACHE;
else if (strstr (buf, "lighttpd") != NULL)
st->server_type = LIGHTTPD;
+ else if (strstr (buf, "IBM_HTTP_Server") != NULL)
+ st->server_type = APACHE;
else
{
const char *hdr = buf;
{
int status = 0;
int i;
- oconfig_item_t *lci = NULL; /* legacy config */
for (i = 0; i < ci->children_num; i++)
{
oconfig_item_t *child = ci->children + i;
- if (strcasecmp ("Instance", child->key) == 0 && child->children_num > 0)
+ if (strcasecmp ("Instance", child->key) == 0)
config_add (child);
else
- {
- /* legacy mode - convert to <Instance ...> config */
- if (lci == NULL)
- {
- lci = malloc (sizeof(*lci));
- if (lci == NULL)
- {
- ERROR ("apache plugin: malloc failed.");
- return (-1);
- }
- memset (lci, '\0', sizeof (*lci));
- }
-
- lci->children_num++;
- lci->children =
- realloc (lci->children,
- lci->children_num * sizeof (*child));
- if (lci->children == NULL)
- {
- ERROR ("apache plugin: realloc failed.");
- return (-1);
- }
- memcpy (&lci->children[lci->children_num-1], child, sizeof (*child));
- }
+ WARNING ("apache plugin: The configuration option "
+ "\"%s\" is not allowed here. Did you "
+ "forget to add an <Instance /> block "
+ "around the configuration?",
+ child->key);
} /* for (ci->children) */
- if (lci)
- {
- /* create a <Instance ""> entry */
- lci->key = "Instance";
- lci->values_num = 1;
- lci->values = (oconfig_value_t *) malloc (lci->values_num * sizeof (oconfig_value_t));
- lci->values[0].type = OCONFIG_TYPE_STRING;
- lci->values[0].value.string = "";
-
- status = config_add (lci);
- sfree (lci->values);
- sfree (lci->children);
- sfree (lci);
- }
-
- return status;
+ return (status);
} /* int config */
/* initialize curl for each host */
st->server_type = APACHE;
else if (strcasecmp(st->server, "lighttpd") == 0)
st->server_type = LIGHTTPD;
+ else if (strcasecmp(st->server, "ibm_http_server") == 0)
+ st->server_type = APACHE;
else
WARNING ("apache plugin: Unknown `Server' setting: %s",
st->server);
diff --git a/src/collectd-perl.pod b/src/collectd-perl.pod
index 5c11b65281cea66df6cf786f102e7abbb363ee0a..6b44722cfe808605603a549ac3690c5998bb1eb7 100644 (file)
--- a/src/collectd-perl.pod
+++ b/src/collectd-perl.pod
@@ -376,11 +376,6 @@ is found (and the number of values matches the number of data-sources) then the
type, data-set and value-list is passed to all write-callbacks that are
registered with the daemon.
-B<Note>: Prior to version 4.4 of collectd, the data-set type used to be passed
-as the first argument to B<plugin_register>. This syntax is still supported
-for backwards compatibility but has been deprecated and will be removed in
-some future version of collectd.
-
=item B<plugin_write> ([B<plugins> => I<...>][, B<datasets> => I<...>],
B<valuelists> => I<...>)
the B<plugins> and B<identifiers> arguments may either be a string or a
reference to an array of strings.
-=item B<plugin_flush_one> (I<timeout>, I<plugin>)
-
-This is identical to using "plugin_flush (timeout =E<gt> I<timeout>, plugins
-=E<gt> I<plugin>".
-
-B<Note>: Starting with version 4.5 of collectd, B<plugin_flush_one> has been
-deprecated and will be removed in some future version of collectd. Use
-B<plugin_flush> instead.
-
-=item B<plugin_flush_all> (I<timeout>)
-
-This is identical to using "plugin_flush (timeout =E<gt> I<timeout>)".
-
-B<Note>: Starting with version 4.5 of collectd, B<plugin_flush_all> has been
-deprecated and will be removed in some future version of collectd. Use
-B<plugin_flush> instead.
-
=item B<plugin_dispatch_notification> (I<notification>)
Submits a I<notification> to the daemon which will then pass it to all
index b9408a3d94957c022dda3ca7ba8025a7e6b2013a..267296cfd6009a82904198f3b999b3dfe5aa3ab7 100644 (file)
--- a/src/collectd-python.pod
+++ b/src/collectd-python.pod
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the "Software"),
+# to deal in the Software without restriction, including without limitation
+# the rights to use, copy, modify, merge, publish, distribute, sublicense,
+# and/or sell copies of the Software, and to permit persons to whom the
+# Software is furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+
=head1 NAME
collectd-python - Documentation of collectd's C<python plugin>
=item
-This plugin is not compatible with python3. Trying to compile it with python3
-will fail because of the ways string, unicode and bytearray behavior was
-changed.
-
-=item
-
Not all aspects of the collectd API are accessible from python. This includes
-but is not limited to meta-data, filters and data sets.
+but is not limited to filters and data sets.
=back
diff --git a/src/collectd.conf.in b/src/collectd.conf.in
index cc125ddf2ebb7fdd88e8431c24dedd330f789959..42addd270084f09c41f9ead41de8704773d22e18 100644 (file)
--- a/src/collectd.conf.in
+++ b/src/collectd.conf.in
# to missing dependencies or because they have been deactivated explicitly. #
##############################################################################
+#@BUILD_PLUGIN_AMQP_TRUE@LoadPlugin amqp
#@BUILD_PLUGIN_APACHE_TRUE@LoadPlugin apache
#@BUILD_PLUGIN_APCUPS_TRUE@LoadPlugin apcups
#@BUILD_PLUGIN_APPLE_SENSORS_TRUE@LoadPlugin apple_sensors
# ription of those options is available in the collectd.conf(5) manual page. #
##############################################################################
+#<Plugin "amqp">
+# <Publish "name">
+# Host "localhost"
+# Port "5672"
+# VHost "/"
+# User "guest"
+# Password "guest"
+# Exchange "amq.fanout"
+# RoutingKey "collectd"
+# Persistent false
+# StoreRates false
+# </Publish>
+#</Plugin>
+
#<Plugin apache>
-# URL "http://localhost/status?auto"
-# User "www-user"
-# Password "secret"
-# CACert "/etc/ssl/ca.crt"
+# <Instance "local">
+# URL "http://localhost/status?auto"
+# User "www-user"
+# Password "secret"
+# CACert "/etc/ssl/ca.crt"
+# </Instance>
#</Plugin>
#<Plugin apcups>
diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod
index af07cdf23017a2afbf6881ebe16563d86e6ff290..1da35982d032d716474d14a208679a23ff6ba212 100644 (file)
--- a/src/collectd.conf.pod
+++ b/src/collectd.conf.pod
F<README> file shipped with the sourcecode and hopefully binary packets as
well.
+=head2 Plugin C<amqp>
+
+The I<AMQMP plugin> can be used to communicate with other instances of
+I<collectd> or third party applications using an AMQP message broker. Values
+are sent to or received from the broker, which handles routing, queueing and
+possibly filtering or messages.
+
+ <Plugin "amqp">
+ # Send values to an AMQP broker
+ <Publish "some_name">
+ Host "localhost"
+ Port "5672"
+ VHost "/"
+ User "guest"
+ Password "guest"
+ Exchange "amq.fanout"
+ # ExchangeType "fanout"
+ # RoutingKey "collectd"
+ # Persistent false
+ # Format "command"
+ # StoreRates false
+ </Publish>
+
+ # Receive values from an AMQP broker
+ <Subscribe "some_name">
+ Host "localhost"
+ Port "5672"
+ VHost "/"
+ User "guest"
+ Password "guest"
+ Exchange "amq.fanout"
+ # ExchangeType "fanout"
+ # Queue "queue_name"
+ # RoutingKey "collectd.#"
+ </Subscribe>
+ </Plugin>
+
+The plugin's configuration consists of a number of I<Publish> and I<Subscribe>
+blocks, which configure sending and receiving of values respectively. The two
+blocks are very similar, so unless otherwise noted, an option can be used in
+either block. The name given in the blocks starting tag is only used for
+reporting messages, but may be used to support I<flushing> of certain
+I<Publish> blocks in the future.
+
+=over 4
+
+=item B<Host> I<Host>
+
+Hostname or IP-address of the AMQP broker. Defaults to the default behavior of
+the underlying communications library, I<rabbitmq-c>, which is "localhost".
+
+=item B<Port> I<Port>
+
+Service name or port number on which the AMQP broker accepts connections. This
+argument must be a string, even if the numeric form is used. Defaults to
+"5672".
+
+=item B<VHost> I<VHost>
+
+Name of the I<virtual host> on the AMQP broker to use. Defaults to "/".
+
+=item B<User> I<User>
+
+=item B<Password> I<Password>
+
+Credentials used to authenticate to the AMQP broker. By default "guest"/"guest"
+is used.
+
+=item B<Exchange> I<Exchange>
+
+In I<Publish> blocks, this option specifies the I<exchange> to send values to.
+By default, "amq.fanout" will be used.
+
+In I<Subscribe> blocks this option is optional. If given, a I<binding> between
+the given exchange and the I<queue> is created, using the I<routing key> if
+configured. See the B<Queue> and B<RoutingKey> options below.
+
+=item B<ExchangeType> I<Type>
+
+If given, the plugin will try to create the configured I<exchange> with this
+I<type> after connecting. When in a I<Subscribe> block, the I<queue> will then
+be bound to this exchange.
+
+=item B<Queue> I<Queue> (Subscribe only)
+
+Configures the I<queue> name to subscribe to. If no queue name was configures
+explicitly, a unique queue name will be created by the broker.
+
+=item B<RoutingKey> I<Key>
+
+In I<Publish> blocks, this configures the routing key to set on all outgoing
+messages. If not given, the routing key will be computed from the I<identifier>
+of the value. The host, plugin, type and the two instances are concatenated
+together using dots as the separator and all containing dots replaced with
+slashes. For example "collectd.host/example/com.cpu.0.cpu.user". This makes it
+possible to receive only specific values using a "topic" exchange.
+
+In I<Subscribe> blocks, configures the I<routing key> used when creating a
+I<binding> between an I<exchange> and the I<queue>. The usual wildcards can be
+used to filter messages when using a "topic" exchange. If you're only
+interested in CPU statistics, you could use the routing key "collectd.*.cpu.#"
+for example.
+
+=item B<Persistent> B<true>|B<false> (Publish only)
+
+Selects the I<delivery method> to use. If set to B<true>, the I<persistent>
+mode will be used, i.e. delivery is guaranteed. If set to B<false> (the
+default), the I<transient> delivery mode will be used, i.e. messages may be
+lost due to high load, overflowing queues or similar issues.
+
+=item B<Format> B<Command>|B<JSON> (Publish only)
+
+Selects the format in which messages are sent to the broker. If set to
+B<Command> (the default), values are sent as C<PUTVAL> commands which are
+identical to the syntax used by the I<Exec> and I<UnixSock plugins>. In this
+case, the C<Content-Type> header field will be set to C<text/collectd>.
+
+If set to B<JSON>, the values are encoded in the I<JavaScript Object Notation>,
+an easy and straight forward exchange format. The C<Content-Type> header field
+will be set to C<application/json>.
+
+A subscribing client I<should> use the C<Content-Type> header field to
+determine how to decode the values. Currently, the I<AMQP plugin> itself can
+only decode the B<Command> format.
+
+=item B<StoreRates> B<true>|B<false> (Publish only)
+
+Determines whether or not C<COUNTER>, C<DERIVE> and C<ABSOLUTE> data sources
+are converted to a I<rate> (i.e. a C<GAUGE> value). If set to B<false> (the
+default), no conversion is performed. Otherwise the conversion is performed
+using the internal value cache.
+
+Please note that currently this option is only used if the B<Format> option has
+been set to B<JSON>.
+
+=back
+
=head2 Plugin C<apache>
To configure the C<apache>-plugin you first need to configure the Apache
also supported. It introduces a new field, called C<BusyServers>, to count the
number of currently connected clients. This field is also supported.
-The following options are accepted by the C<apache>-plugin:
+The configuration of the I<Apache> plugin consists of one or more
+C<E<lt>InstanceE<nbsp>/E<gt>> blocks. Each block requires one string argument
+as the instance name. For example:
+
+ <Plugin "apache">
+ <Instance "www1">
+ URL "http://www1.example.com/mod_status?auto"
+ </Instance>
+ <Instance "www2">
+ URL "http://www2.example.com/mod_status?auto"
+ </Instance>
+ </Plugin>
+
+The instance name will be used as the I<plugin instance>. To emulate the old
+(versionE<nbsp>4) behavior, you can use an empty string (""). In order for the
+plugin to work correctly, each instance name must be unique. This is not
+enforced by the plugin and it is your responsibility to ensure it.
+
+The following options are accepted within each I<Instance> block:
=over 4
Sets the URL of the C<mod_status> output. This needs to be the output generated
by C<ExtendedStatus on> and it needs to be the machine readable output
-generated by appending the C<?auto> argument.
+generated by appending the C<?auto> argument. This option is I<mandatory>.
=item B<User> I<Username>
=item B<MaxPacketSize> I<1024-65535>
Set the maximum size for datagrams received over the network. Packets larger
-than this will be truncated.
+than this will be truncated. Defaults to 1452E<nbsp>bytes.
=item B<Forward> I<true|false>
diff --git a/src/collectd.h b/src/collectd.h
index 8849b30b221ffa776e6e0303acab8b9919e20b02..93d356e3e09ebf3edec48a467d925e6b855e83c0 100644 (file)
--- a/src/collectd.h
+++ b/src/collectd.h
#if HAVE_STDINT_H
# include <stdint.h>
#endif
-#if HAVE_STDBOOL_H
-# include <stdbool.h>
-#else
-# ifndef HAVE__BOOL
-# ifdef __cplusplus
-typedef bool _Bool;
-# else
-# define _Bool signed char
-# endif
-# endif
-# define bool _Bool
-# define false 0
-# define true 1
-# define __bool_true_false_are_defined 1
-#endif
#if HAVE_UNISTD_H
# include <unistd.h>
#endif
# include <kstat.h>
#endif
-#if HAVE_SENSORS_SENSORS_H
-# include <sensors/sensors.h>
-#endif
-
#ifndef PACKAGE_NAME
#define PACKAGE_NAME "collectd"
#endif
diff --git a/src/contextswitch.c b/src/contextswitch.c
index 06055ca59372299616a6f8b1bf105d30e34760f4..c207318f9d62425fbfb18e7e86727bce62b65bcb 100644 (file)
--- a/src/contextswitch.c
+++ b/src/contextswitch.c
/**
* collectd - src/contextswitch.c
* Copyright (C) 2009 Patrik Weiskircher
+ * Copyright (C) 2010 Kimo Rosenbaum
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
*
* Authors:
* Patrik Weiskircher <weiskircher at inqnet.at>
+ * Kimo Rosenbaum <http://github.com/kimor79>
**/
#include "collectd.h"
#include "common.h"
#include "plugin.h"
-#if !KERNEL_LINUX
+#ifdef HAVE_SYS_SYSCTL_H
+# include <sys/sysctl.h>
+#endif
+
+#if HAVE_SYSCTLBYNAME
+/* no global variables */
+/* #endif HAVE_SYSCTLBYNAME */
+
+#elif KERNEL_LINUX
+/* no global variables */
+/* #endif KERNEL_LINUX */
+
+#else
# error "No applicable input method."
#endif
static int cs_read (void)
{
+#if HAVE_SYSCTLBYNAME
+ int value = 0;
+ size_t value_len = sizeof (value);
+ int status;
+
+ status = sysctlbyname ("vm.stats.sys.v_swtch",
+ &value, &value_len,
+ /* new pointer = */ NULL, /* new length = */ 0);
+ if (status != 0)
+ {
+ ERROR("contextswitch plugin: sysctlbyname "
+ "(vm.stats.sys.v_swtch) failed");
+ return (-1);
+ }
+
+ cs_submit (value);
+/* #endif HAVE_SYSCTLBYNAME */
+
+#elif KERNEL_LINUX
FILE *fh;
char buffer[64];
int numfields;
if (status == -2)
ERROR ("contextswitch plugin: Unable to find context switch value.");
+#endif /* KERNEL_LINUX */
return status;
}
diff --git a/src/curl.c b/src/curl.c
index a533e147b1b6e2af1dcf00ff7634667b20bf4784..8b95c80febb279a1c6f2e0579787274bb113967f 100644 (file)
--- a/src/curl.c
+++ b/src/curl.c
vl.values = values;
vl.values_len = 1;
- vl.time = time (NULL);
sstrncpy (vl.host, hostname_g, sizeof (vl.host));
sstrncpy (vl.plugin, "curl", sizeof (vl.plugin));
sstrncpy (vl.plugin_instance, wp->instance, sizeof (vl.plugin_instance));
@@ -596,7 +595,6 @@ static void cc_submit_response_time (const web_page_t *wp, double seconds) /* {{
vl.values = values;
vl.values_len = 1;
- vl.time = time (NULL);
sstrncpy (vl.host, hostname_g, sizeof (vl.host));
sstrncpy (vl.plugin, "curl", sizeof (vl.plugin));
sstrncpy (vl.plugin_instance, wp->instance, sizeof (vl.plugin_instance));
diff --git a/src/curl_json.c b/src/curl_json.c
index 21deed61a1e1192ca12bfdf11208a0de4582c461..433764e285014902dd7bbe7f53371ed2cf364a7b 100644 (file)
--- a/src/curl_json.c
+++ b/src/curl_json.c
return (0);
status = yajl_parse(db->yajl, (unsigned char *)buf, len);
- if (status == yajl_status_ok)
- {
- status = yajl_parse_complete(db->yajl);
- return (len);
- }
- else if (status == yajl_status_insufficient_data)
- return (len);
-
- if (status != yajl_status_ok)
+ if ((status != yajl_status_ok)
+ && (status != yajl_status_insufficient_data))
{
unsigned char *msg =
- yajl_get_error(db->yajl, 1, (unsigned char *)buf, len);
+ yajl_get_error(db->yajl, /* verbose = */ 1,
+ /* jsonText = */ (unsigned char *) buf, (unsigned int) len);
ERROR ("curl_json plugin: yajl_parse failed: %s", msg);
yajl_free_error(db->yajl, msg);
return (0); /* abort write callback */
if (db->yajl == NULL)
{
ERROR ("curl_json plugin: yajl_alloc failed.");
+ db->yajl = yprev;
return (-1);
}
status = curl_easy_perform (curl);
-
- yajl_free (db->yajl);
- db->yajl = yprev;
+ if (status != 0)
+ {
+ ERROR ("curl_json plugin: curl_easy_perform failed with status %i: %s (%s)",
+ status, db->curl_errbuf, url);
+ yajl_free (db->yajl);
+ db->yajl = yprev;
+ return (-1);
+ }
curl_easy_getinfo(curl, CURLINFO_EFFECTIVE_URL, &url);
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &rc);
/* The response code is zero if a non-HTTP transport was used. */
if ((rc != 0) && (rc != 200))
{
- ERROR ("curl_json plugin: curl_easy_perform failed with response code %ld (%s)",
- rc, url);
+ ERROR ("curl_json plugin: curl_easy_perform failed with "
+ "response code %ld (%s)", rc, url);
+ yajl_free (db->yajl);
+ db->yajl = yprev;
return (-1);
}
- if (status != 0)
+ status = yajl_parse_complete (db->yajl);
+ if (status != yajl_status_ok)
{
- ERROR ("curl_json plugin: curl_easy_perform failed with status %i: %s (%s)",
- status, db->curl_errbuf, url);
+ unsigned char *errmsg;
+
+ errmsg = yajl_get_error (db->yajl, /* verbose = */ 0,
+ /* jsonText = */ NULL, /* jsonTextLen = */ 0);
+ ERROR ("curl_json plugin: yajl_parse_complete failed: %s",
+ (char *) errmsg);
+ yajl_free_error (db->yajl, errmsg);
+ yajl_free (db->yajl);
+ db->yajl = yprev;
return (-1);
}
+ yajl_free (db->yajl);
+ db->yajl = yprev;
return (0);
} /* }}} int cj_curl_perform */
diff --git a/src/df.c b/src/df.c
index b2be8e5eb77a40f55abb499934956eaefe05a3f7..4b3cba019cea9219289cc8adef7633a05e30a9ba 100644 (file)
--- a/src/df.c
+++ b/src/df.c
static ignorelist_t *il_mountpoint = NULL;
static ignorelist_t *il_fstype = NULL;
-static _Bool by_device = false;
-static _Bool report_inodes = false;
+static _Bool by_device = 0;
+static _Bool report_inodes = 0;
static int df_init (void)
{
else if (strcasecmp (key, "ReportByDevice") == 0)
{
if (IS_TRUE (value))
- by_device = true;
+ by_device = 1;
return (0);
}
else if (strcasecmp (key, "ReportInodes") == 0)
{
if (IS_TRUE (value))
- report_inodes = true;
+ report_inodes = 1;
else
- report_inodes = false;
+ report_inodes = 0;
return (0);
}
diff --git a/src/lpar.c b/src/lpar.c
--- /dev/null
+++ b/src/lpar.c
@@ -0,0 +1,273 @@
+/**
+ * collectd - src/lpar.c
+ * Copyright (C) 2010 Aurélien Reynaud
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * Authors:
+ * Aurélien Reynaud <collectd at wattapower.net>
+ **/
+
+#include "collectd.h"
+#include "common.h"
+#include "plugin.h"
+
+#include <sys/protosw.h>
+#include <libperfstat.h>
+#include <sys/utsname.h>
+
+/* XINTFRAC was defined in libperfstat.h somewhere between AIX 5.3 and 6.1 */
+#ifndef XINTFRAC
+# include <sys/systemcfg.h>
+# define XINTFRAC ((double)(_system_configuration.Xint) / \
+ (double)(_system_configuration.Xfrac))
+#endif
+
+#define CLOCKTICKS_TO_TICKS(cticks) ((cticks) / XINTFRAC)
+
+static const char *config_keys[] =
+{
+ "CpuPoolStats",
+ "ReportBySerial"
+};
+static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
+
+static _Bool pool_stats = 0;
+static _Bool report_by_serial = 0;
+#if PERFSTAT_SUPPORTS_DONATION
+static _Bool donate_flag = 0;
+#endif
+static char serial[SYS_NMLN];
+
+static perfstat_partition_total_t lparstats_old;
+
+static int lpar_config (const char *key, const char *value)
+{
+ if (strcasecmp ("CpuPoolStats", key) == 0)
+ {
+ if (IS_TRUE (value))
+ pool_stats = 1;
+ else
+ pool_stats = 0;
+ }
+ else if (strcasecmp ("ReportBySerial", key) == 0)
+ {
+ if (IS_TRUE (value))
+ report_by_serial = 1;
+ else
+ report_by_serial = 0;
+ }
+ else
+ {
+ return (-1);
+ }
+
+ return (0);
+} /* int lpar_config */
+
+static int lpar_init (void)
+{
+ int status;
+
+ /* Retrieve the initial metrics. Returns the number of structures filled. */
+ status = perfstat_partition_total (/* name = */ NULL, /* (must be NULL) */
+ &lparstats_old, sizeof (perfstat_partition_total_t),
+ /* number = */ 1 /* (must be 1) */);
+ if (status != 1)
+ {
+ char errbuf[1024];
+ ERROR ("lpar plugin: perfstat_partition_total failed: %s (%i)",
+ sstrerror (errno, errbuf, sizeof (errbuf)),
+ status);
+ return (-1);
+ }
+
+#if PERFSTAT_SUPPORTS_DONATION
+ if (!lparstats_old.type.b.shared_enabled
+ && lparstats_old.type.b.donate_enabled)
+ {
+ donate_flag = 1;
+ }
+#endif
+
+ if (pool_stats && !lparstats_old.type.b.pool_util_authority)
+ {
+ WARNING ("lpar plugin: This partition does not have pool authority. "
+ "Disabling CPU pool statistics collection.");
+ pool_stats = 0;
+ }
+
+ return (0);
+} /* int lpar_init */
+
+static void lpar_submit (const char *type_instance, double value)
+{
+ value_t values[1];
+ value_list_t vl = VALUE_LIST_INIT;
+
+ values[0].gauge = (gauge_t)value;
+
+ vl.values = values;
+ vl.values_len = 1;
+ if (report_by_serial)
+ {
+ sstrncpy (vl.host, serial, sizeof (vl.host));
+ sstrncpy (vl.plugin_instance, hostname_g, sizeof (vl.plugin));
+ }
+ else
+ {
+ sstrncpy (vl.host, hostname_g, sizeof (vl.host));
+ }
+ sstrncpy (vl.plugin, "lpar", sizeof (vl.plugin));
+ sstrncpy (vl.type, "vcpu", sizeof (vl.type));
+ sstrncpy (vl.type_instance, type_instance, sizeof (vl.type_instance));
+
+ plugin_dispatch_values (&vl);
+} /* void lpar_submit */
+
+static int lpar_read (void)
+{
+ perfstat_partition_total_t lparstats;
+ int status;
+ struct utsname name;
+ u_longlong_t ticks;
+ u_longlong_t user_ticks, syst_ticks, wait_ticks, idle_ticks;
+ u_longlong_t consumed_ticks;
+ double entitled_proc_capacity;
+
+ /* An LPAR has the same serial number as the physical system it is currently
+ running on. It is a convenient way of tracking LPARs as they are moved
+ from chassis to chassis through Live Partition Mobility (LPM). */
+ if (uname (&name) != 0)
+ {
+ ERROR ("lpar plugin: uname failed.");
+ return (-1);
+ }
+ sstrncpy (serial, name.machine, sizeof (serial));
+
+ /* Retrieve the current metrics. Returns the number of structures filled. */
+ status = perfstat_partition_total (/* name = */ NULL, /* (must be NULL) */
+ &lparstats, sizeof (perfstat_partition_total_t),
+ /* number = */ 1 /* (must be 1) */);
+ if (status != 1)
+ {
+ char errbuf[1024];
+ ERROR ("lpar plugin: perfstat_partition_total failed: %s (%i)",
+ sstrerror (errno, errbuf, sizeof (errbuf)),
+ status);
+ return (-1);
+ }
+
+ /* Number of ticks since we last run. */
+ ticks = lparstats.timebase_last - lparstats_old.timebase_last;
+ if (ticks == 0)
+ {
+ /* The stats have not been updated. Return now to avoid
+ * dividing by zero */
+ return (0);
+ }
+
+ /*
+ * On a shared partition, we're "entitled" to a certain amount of
+ * processing power, for example 250/100 of a physical CPU. Processing
+ * capacity not used by the partition may be assigned to a different
+ * partition by the hypervisor, so "idle" is hopefully a very small
+ * number.
+ *
+ * A dedicated partition may donate its CPUs to another partition and
+ * may steal ticks from somewhere else (another partition or maybe the
+ * shared pool, I don't know --octo).
+ */
+
+ /* entitled_proc_capacity is in 1/100th of a CPU */
+ entitled_proc_capacity = 0.01 * ((double) lparstats.entitled_proc_capacity);
+ lpar_submit ("entitled", entitled_proc_capacity);
+
+ /* The number of ticks actually spent in the various states */
+ user_ticks = lparstats.puser - lparstats_old.puser;
+ syst_ticks = lparstats.psys - lparstats_old.psys;
+ wait_ticks = lparstats.pwait - lparstats_old.pwait;
+ idle_ticks = lparstats.pidle - lparstats_old.pidle;
+ consumed_ticks = user_ticks + syst_ticks + wait_ticks + idle_ticks;
+
+ lpar_submit ("user", (double) user_ticks / (double) ticks);
+ lpar_submit ("system", (double) syst_ticks / (double) ticks);
+ lpar_submit ("wait", (double) wait_ticks / (double) ticks);
+ lpar_submit ("idle", (double) idle_ticks / (double) ticks);
+
+#if PERFSTAT_SUPPORTS_DONATION
+ if (donate_flag)
+ {
+ /* donated => ticks given to another partition
+ * stolen => ticks received from another partition */
+ u_longlong_t idle_donated_ticks, busy_donated_ticks;
+ u_longlong_t idle_stolen_ticks, busy_stolen_ticks;
+
+ /* FYI: PURR == Processor Utilization of Resources Register
+ * SPURR == Scaled PURR */
+ idle_donated_ticks = lparstats.idle_donated_purr - lparstats_old.idle_donated_purr;
+ busy_donated_ticks = lparstats.busy_donated_purr - lparstats_old.busy_donated_purr;
+ idle_stolen_ticks = lparstats.idle_stolen_purr - lparstats_old.idle_stolen_purr;
+ busy_stolen_ticks = lparstats.busy_stolen_purr - lparstats_old.busy_stolen_purr;
+
+ lpar_submit ("idle_donated", (double) idle_donated_ticks / (double) ticks);
+ lpar_submit ("busy_donated", (double) busy_donated_ticks / (double) ticks);
+ lpar_submit ("idle_stolen", (double) idle_stolen_ticks / (double) ticks);
+ lpar_submit ("busy_stolen", (double) busy_stolen_ticks / (double) ticks);
+
+ /* Donated ticks will be accounted for as stolen ticks in other LPARs */
+ consumed_ticks += idle_stolen_ticks + busy_stolen_ticks;
+ }
+#endif
+
+ lpar_submit ("consumed", (double) consumed_ticks / (double) ticks);
+
+ if (pool_stats)
+ {
+ char typinst[DATA_MAX_NAME_LEN];
+ u_longlong_t pool_idle_cticks;
+ double pool_idle_cpus;
+ double pool_busy_cpus;
+
+ /* We're calculating "busy" from "idle" and the total number of
+ * CPUs, because the "busy" member didn't exist in early versions
+ * of libperfstat. It was added somewhere between AIX 5.3 ML5 and ML9. */
+ pool_idle_cticks = lparstats.pool_idle_time - lparstats_old.pool_idle_time;
+ pool_idle_cpus = CLOCKTICKS_TO_TICKS ((double) pool_idle_cticks) / (double) ticks;
+ pool_busy_cpus = ((double) lparstats.phys_cpus_pool) - pool_idle_cpus;
+ if (pool_busy_cpus < 0.0)
+ pool_busy_cpus = 0.0;
+
+ ssnprintf (typinst, sizeof (typinst), "pool-%X-busy", lparstats.pool_id);
+ lpar_submit (typinst, pool_busy_cpus);
+
+ ssnprintf (typinst, sizeof (typinst), "pool-%X-idle", lparstats.pool_id);
+ lpar_submit (typinst, pool_idle_cpus);
+ }
+
+ memcpy (&lparstats_old, &lparstats, sizeof (lparstats_old));
+
+ return (0);
+} /* int lpar_read */
+
+void module_register (void)
+{
+ plugin_register_config ("lpar", lpar_config,
+ config_keys, config_keys_num);
+ plugin_register_init ("lpar", lpar_init);
+ plugin_register_read ("lpar", lpar_read);
+} /* void module_register */
+
+/* vim: set sw=8 noet : */
+
diff --git a/src/match_value.c b/src/match_value.c
index 9f02226bdae67109f94a4139cb2f6b60f26cdb34..ae6282c4afa4a54083a745ba14bcaa01873d3eca 100644 (file)
--- a/src/match_value.c
+++ b/src/match_value.c
*/
static void mv_free_match (mv_match_t *m) /* {{{ */
{
+ int i;
+
if (m == NULL)
return;
+ if (m->data_sources != NULL)
+ {
+ for (i = 0; i < m->data_sources_num; ++i)
+ free(m->data_sources[i]);
+ free(m->data_sources);
+ }
+
free (m);
} /* }}} void mv_free_match */
diff --git a/src/memcachec.c b/src/memcachec.c
index d066501cdcba73afb56363dae167b9dd78353e10..8f51e22f01dd3c2d728d0cbc0a8e2cc61fba99e1 100644 (file)
--- a/src/memcachec.c
+++ b/src/memcachec.c
vl.values = values;
vl.values_len = 1;
- vl.time = time (NULL);
sstrncpy (vl.host, hostname_g, sizeof (vl.host));
sstrncpy (vl.plugin, "memcachec", sizeof (vl.plugin));
sstrncpy (vl.plugin_instance, wp->instance, sizeof (vl.plugin_instance));
diff --git a/src/memcached.c b/src/memcached.c
index 348591fd160b79c0f64308732698e79ec9ab4789..8490bf661b358d793a0701c779ad2758587b2df7 100644 (file)
--- a/src/memcached.c
+++ b/src/memcached.c
vl.values = values;
vl.values_len = 2;
- vl.time = time (NULL);
sstrncpy (vl.host, hostname_g, sizeof (vl.host));
sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin));
sstrncpy (vl.type, type, sizeof (vl.type));
vl.values = values;
vl.values_len = 1;
- vl.time = time (NULL);
sstrncpy (vl.host, hostname_g, sizeof (vl.host));
sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin));
sstrncpy (vl.type, type, sizeof (vl.type));
vl.values = values;
vl.values_len = 2;
- vl.time = time (NULL);
sstrncpy (vl.host, hostname_g, sizeof (vl.host));
sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin));
sstrncpy (vl.type, type, sizeof (vl.type));
diff --git a/src/mysql.c b/src/mysql.c
index 48ad528b0f279b80f51df44cb64578d93ec0d7ea..a01bbe4018d3f63f973da07c3c5196f1f46e2597 100644 (file)
--- a/src/mysql.c
+++ b/src/mysql.c
struct mysql_database_s /* {{{ */
{
- /* instance == NULL => legacy mode */
char *instance;
char *host;
char *user;
int err;
if ((err = mysql_ping (db->con)) != 0)
{
- WARNING ("mysql_ping failed for %s: %s",
- (db->instance != NULL)
- ? db->instance
- : "<legacy>",
+ /* Assured by "mysql_config_database" */
+ assert (db->instance != NULL);
+ WARNING ("mysql_ping failed for instance \"%s\": %s",
+ db->instance,
mysql_error (db->con));
db->state = 0;
}
static void set_host (mysql_database_t *db, char *buf, size_t buflen)
{
- /* XXX legacy mode - use hostname_g */
- if (db->instance == NULL)
+ if ((db->host == NULL)
+ || (strcmp ("", db->host) == 0)
+ || (strcmp ("localhost", db->host) == 0))
sstrncpy (buf, hostname_g, buflen);
else
- {
- if ((db->host == NULL)
- || (strcmp ("", db->host) == 0)
- || (strcmp ("localhost", db->host) == 0))
- sstrncpy (buf, hostname_g, buflen);
- else
- sstrncpy (buf, db->host, buflen);
- }
-}
-
-static void set_plugin_instance (mysql_database_t *db,
- char *buf, size_t buflen)
-{
- /* XXX legacy mode - no plugin_instance */
- if (db->instance == NULL)
- sstrncpy (buf, "", buflen);
- else
- sstrncpy (buf, db->instance, buflen);
-}
+ sstrncpy (buf, db->host, buflen);
+} /* void set_host */
static void submit (const char *type, const char *type_instance,
value_t *values, size_t values_len, mysql_database_t *db)
set_host (db, vl.host, sizeof (vl.host));
sstrncpy (vl.plugin, "mysql", sizeof (vl.plugin));
- set_plugin_instance (db, vl.plugin_instance, sizeof (vl.plugin_instance));
+
+ /* Assured by "mysql_config_database" */
+ assert (db->instance != NULL);
+ sstrncpy (vl.plugin_instance, db->instance, sizeof (vl.plugin_instance));
sstrncpy (vl.type, type, sizeof (vl.type));
if (type_instance != NULL)
sql = row[SLAVE_SQL_RUNNING_IDX];
set_host (db, n.host, sizeof (n.host));
- set_plugin_instance (db,
- n.plugin_instance, sizeof (n.plugin_instance));
+
+ /* Assured by "mysql_config_database" */
+ assert (db->instance != NULL);
+ sstrncpy (n.plugin_instance, db->instance, sizeof (n.plugin_instance));
if (((io == NULL) || (strcasecmp (io, "yes") != 0))
&& (db->slave_io_running))
diff --git a/src/netapp.c b/src/netapp.c
index 317b0fe40d51936e924244a23321f7b7f4b29e68..c50b3dbdbcb707860d7a4658d211e844f1b3fe3a 100644 (file)
--- a/src/netapp.c
+++ b/src/netapp.c
const char *plugin_inst,
const char *type, const char *type_inst,
value_t *values, int values_len,
- time_t timestamp)
+ time_t timestamp, int interval)
{
value_list_t vl = VALUE_LIST_INIT;
if (timestamp > 0)
vl.time = timestamp;
+ if (interval > 0)
+ vl.interval = interval;
+
if (host != NULL)
sstrncpy (vl.host, host, sizeof (vl.host));
else
static int submit_two_counters (const char *host, const char *plugin_inst, /* {{{ */
const char *type, const char *type_inst, counter_t val0, counter_t val1,
- time_t timestamp)
+ time_t timestamp, int interval)
{
value_t values[2];
@@ -600,23 +603,23 @@ static int submit_two_counters (const char *host, const char *plugin_inst, /* {{
values[1].counter = val1;
return (submit_values (host, plugin_inst, type, type_inst,
- values, 2, timestamp));
+ values, 2, timestamp, interval));
} /* }}} int submit_two_counters */
static int submit_counter (const char *host, const char *plugin_inst, /* {{{ */
- const char *type, const char *type_inst, counter_t counter, time_t timestamp)
+ const char *type, const char *type_inst, counter_t counter, time_t timestamp, int interval)
{
value_t v;
v.counter = counter;
return (submit_values (host, plugin_inst, type, type_inst,
- &v, 1, timestamp));
+ &v, 1, timestamp, interval));
} /* }}} int submit_counter */
static int submit_two_gauge (const char *host, const char *plugin_inst, /* {{{ */
const char *type, const char *type_inst, gauge_t val0, gauge_t val1,
- time_t timestamp)
+ time_t timestamp, int interval)
{
value_t values[2];
@@ -624,18 +627,18 @@ static int submit_two_gauge (const char *host, const char *plugin_inst, /* {{{ *
values[1].gauge = val1;
return (submit_values (host, plugin_inst, type, type_inst,
- values, 2, timestamp));
+ values, 2, timestamp, interval));
} /* }}} int submit_two_gauge */
static int submit_double (const char *host, const char *plugin_inst, /* {{{ */
- const char *type, const char *type_inst, double d, time_t timestamp)
+ const char *type, const char *type_inst, double d, time_t timestamp, int interval)
{
value_t v;
v.gauge = (gauge_t) d;
return (submit_values (host, plugin_inst, type, type_inst,
- &v, 1, timestamp));
+ &v, 1, timestamp, interval));
} /* }}} int submit_uint64 */
/* Calculate hit ratio from old and new counters and submit the resulting
uint64_t new_misses,
uint64_t old_hits,
uint64_t old_misses,
- time_t timestamp)
+ time_t timestamp,
+ int interval)
{
value_t v;
}
return (submit_values (host, plugin_inst, "cache_ratio", type_inst,
- &v, 1, timestamp));
+ &v, 1, timestamp, interval));
} /* }}} int submit_cache_ratio */
/* Submits all the caches used by WAFL. Uses "submit_cache_ratio". */
static int submit_wafl_data (const char *hostname, const char *instance, /* {{{ */
- cfg_wafl_t *old_data, const cfg_wafl_t *new_data)
+ cfg_wafl_t *old_data, const cfg_wafl_t *new_data, int interval)
{
/* Submit requested counters */
if (HAS_ALL_FLAGS (old_data->flags, CFG_WAFL_NAME_CACHE | HAVE_WAFL_NAME_CACHE)
@@ -677,28 +681,28 @@ static int submit_wafl_data (const char *hostname, const char *instance, /* {{{
submit_cache_ratio (hostname, instance, "name_cache_hit",
new_data->name_cache_hit, new_data->name_cache_miss,
old_data->name_cache_hit, old_data->name_cache_miss,
- new_data->timestamp);
+ new_data->timestamp, interval);
if (HAS_ALL_FLAGS (old_data->flags, CFG_WAFL_DIR_CACHE | HAVE_WAFL_FIND_DIR)
&& HAS_ALL_FLAGS (new_data->flags, HAVE_WAFL_FIND_DIR))
submit_cache_ratio (hostname, instance, "find_dir_hit",
new_data->find_dir_hit, new_data->find_dir_miss,
old_data->find_dir_hit, old_data->find_dir_miss,
- new_data->timestamp);
+ new_data->timestamp, interval);
if (HAS_ALL_FLAGS (old_data->flags, CFG_WAFL_BUF_CACHE | HAVE_WAFL_BUF_HASH)
&& HAS_ALL_FLAGS (new_data->flags, HAVE_WAFL_BUF_HASH))
submit_cache_ratio (hostname, instance, "buf_hash_hit",
new_data->buf_hash_hit, new_data->buf_hash_miss,
old_data->buf_hash_hit, old_data->buf_hash_miss,
- new_data->timestamp);
+ new_data->timestamp, interval);
if (HAS_ALL_FLAGS (old_data->flags, CFG_WAFL_INODE_CACHE | HAVE_WAFL_INODE_CACHE)
&& HAS_ALL_FLAGS (new_data->flags, HAVE_WAFL_INODE_CACHE))
submit_cache_ratio (hostname, instance, "inode_cache_hit",
new_data->inode_cache_hit, new_data->inode_cache_miss,
old_data->inode_cache_hit, old_data->inode_cache_miss,
- new_data->timestamp);
+ new_data->timestamp, interval);
/* Clear old HAVE_* flags */
old_data->flags &= ~HAVE_WAFL_ALL;
* update flags appropriately. */
static int submit_volume_perf_data (const char *hostname, /* {{{ */
data_volume_perf_t *old_data,
- const data_volume_perf_t *new_data)
+ const data_volume_perf_t *new_data, int interval)
{
char plugin_instance[DATA_MAX_NAME_LEN];
&& HAS_ALL_FLAGS (new_data->flags, HAVE_VOLUME_PERF_BYTES_READ | HAVE_VOLUME_PERF_BYTES_WRITE))
{
submit_two_counters (hostname, plugin_instance, "disk_octets", /* type instance = */ NULL,
- (counter_t) new_data->read_bytes, (counter_t) new_data->write_bytes, new_data->timestamp);
+ (counter_t) new_data->read_bytes, (counter_t) new_data->write_bytes, new_data->timestamp, interval);
}
/* Check for and submit disk-operations values */
&& HAS_ALL_FLAGS (new_data->flags, HAVE_VOLUME_PERF_OPS_READ | HAVE_VOLUME_PERF_OPS_WRITE))
{
submit_two_counters (hostname, plugin_instance, "disk_ops", /* type instance = */ NULL,
- (counter_t) new_data->read_ops, (counter_t) new_data->write_ops, new_data->timestamp);
+ (counter_t) new_data->read_ops, (counter_t) new_data->write_ops, new_data->timestamp, interval);
}
/* Check for, calculate and submit disk-latency values */
}
submit_two_gauge (hostname, plugin_instance, "disk_latency", /* type instance = */ NULL,
- latency_per_op_read, latency_per_op_write, new_data->timestamp);
+ latency_per_op_read, latency_per_op_write, new_data->timestamp, interval);
}
/* Clear all HAVE_* flags. */
*/
/* Data corresponding to <WAFL /> */
static int cna_handle_wafl_data (const char *hostname, cfg_wafl_t *cfg_wafl, /* {{{ */
- na_elem_t *data)
+ na_elem_t *data, int interval)
{
cfg_wafl_t perf_data;
const char *plugin_inst;
}
}
- return (submit_wafl_data (hostname, plugin_inst, cfg_wafl, &perf_data));
+ return (submit_wafl_data (hostname, plugin_inst, cfg_wafl, &perf_data, interval));
} /* }}} void cna_handle_wafl_data */
static int cna_setup_wafl (cfg_wafl_t *cw) /* {{{ */
return (-1);
}
- status = cna_handle_wafl_data (host->name, host->cfg_wafl, data);
+ status = cna_handle_wafl_data (host->name, host->cfg_wafl, data, host->interval);
if (status == 0)
host->cfg_wafl->interval.last_read = now;
/* Data corresponding to <Disks /> */
static int cna_handle_disk_data (const char *hostname, /* {{{ */
- cfg_disk_t *cfg_disk, na_elem_t *data)
+ cfg_disk_t *cfg_disk, na_elem_t *data, int interval)
{
time_t timestamp;
na_elem_t *instances;
if ((cfg_disk->flags & CFG_DISK_BUSIEST) && (worst_disk != NULL))
submit_double (hostname, "system", "percent", "disk_busy",
- worst_disk->disk_busy_percent, timestamp);
+ worst_disk->disk_busy_percent, timestamp, interval);
return (0);
} /* }}} int cna_handle_disk_data */
return (-1);
}
- status = cna_handle_disk_data (host->name, host->cfg_disk, data);
+ status = cna_handle_disk_data (host->name, host->cfg_disk, data, host->interval);
if (status == 0)
host->cfg_disk->interval.last_read = now;
/* Data corresponding to <VolumePerf /> */
static int cna_handle_volume_perf_data (const char *hostname, /* {{{ */
- cfg_volume_perf_t *cvp, na_elem_t *data)
+ cfg_volume_perf_t *cvp, na_elem_t *data, int interval)
{
time_t timestamp;
na_elem_t *elem_instances;
}
} /* for (elem_counter) */
- submit_volume_perf_data (hostname, v, &perf_data);
+ submit_volume_perf_data (hostname, v, &perf_data, interval);
} /* for (volume) */
return (0);
return (-1);
}
- status = cna_handle_volume_perf_data (host->name, host->cfg_volume_perf, data);
+ status = cna_handle_volume_perf_data (host->name, host->cfg_volume_perf, data, host->interval);
if (status == 0)
host->cfg_volume_perf->interval.last_read = now;
/* Data corresponding to <VolumeUsage /> */
static int cna_submit_volume_usage_data (const char *hostname, /* {{{ */
- cfg_volume_usage_t *cfg_volume)
+ cfg_volume_usage_t *cfg_volume, int interval)
{
data_volume_usage_t *v;
if (HAS_ALL_FLAGS (v->flags, HAVE_VOLUME_USAGE_NORM_FREE))
submit_double (hostname, /* plugin instance = */ plugin_instance,
"df_complex", "free",
- (double) norm_free, /* timestamp = */ 0);
+ (double) norm_free, /* timestamp = */ 0, interval);
if (HAS_ALL_FLAGS (v->flags, HAVE_VOLUME_USAGE_SIS_SAVED))
submit_double (hostname, /* plugin instance = */ plugin_instance,
"df_complex", "sis_saved",
- (double) sis_saved, /* timestamp = */ 0);
+ (double) sis_saved, /* timestamp = */ 0, interval);
if (HAS_ALL_FLAGS (v->flags, HAVE_VOLUME_USAGE_NORM_USED))
submit_double (hostname, /* plugin instance = */ plugin_instance,
"df_complex", "used",
- (double) norm_used, /* timestamp = */ 0);
+ (double) norm_used, /* timestamp = */ 0, interval);
if (HAS_ALL_FLAGS (v->flags, HAVE_VOLUME_USAGE_SNAP_RSVD))
submit_double (hostname, /* plugin instance = */ plugin_instance,
"df_complex", "snap_reserved",
- (double) snap_reserve_free, /* timestamp = */ 0);
+ (double) snap_reserve_free, /* timestamp = */ 0, interval);
if (HAS_ALL_FLAGS (v->flags, HAVE_VOLUME_USAGE_SNAP_USED | HAVE_VOLUME_USAGE_SNAP_RSVD))
submit_double (hostname, /* plugin instance = */ plugin_instance,
"df_complex", "snap_reserve_used",
- (double) snap_reserve_used, /* timestamp = */ 0);
+ (double) snap_reserve_used, /* timestamp = */ 0, interval);
if (HAS_ALL_FLAGS (v->flags, HAVE_VOLUME_USAGE_SNAP_USED))
submit_double (hostname, /* plugin instance = */ plugin_instance,
"df_complex", "snap_normal_used",
- (double) snap_norm_used, /* timestamp = */ 0);
+ (double) snap_norm_used, /* timestamp = */ 0, interval);
/* Clear all the HAVE_* flags */
v->flags &= ~HAVE_VOLUME_USAGE_ALL;
} /* }}} end of 32-bit workaround */
} /* for (elem_volume) */
- return (cna_submit_volume_usage_data (host->name, cfg_volume));
+ return (cna_submit_volume_usage_data (host->name, cfg_volume, host->interval));
} /* }}} int cna_handle_volume_usage_data */
static int cna_setup_volume_usage (cfg_volume_usage_t *cvu) /* {{{ */
/* Data corresponding to <System /> */
static int cna_handle_system_data (const char *hostname, /* {{{ */
- cfg_system_t *cfg_system, na_elem_t *data)
+ cfg_system_t *cfg_system, na_elem_t *data, int interval)
{
na_elem_t *instances;
na_elem_t *counter;
&& (value > 0) && (strlen(name) > 4)
&& (!strcmp(name + strlen(name) - 4, "_ops"))) {
submit_counter (hostname, instance, "disk_ops_complex", name,
- (counter_t) value, timestamp);
+ (counter_t) value, timestamp, interval);
}
} /* for (counter) */
if ((cfg_system->flags & CFG_SYSTEM_DISK)
&& (HAS_ALL_FLAGS (counter_flags, 0x01 | 0x02)))
submit_two_counters (hostname, instance, "disk_octets", NULL,
- disk_read, disk_written, timestamp);
+ disk_read, disk_written, timestamp, interval);
if ((cfg_system->flags & CFG_SYSTEM_NET)
&& (HAS_ALL_FLAGS (counter_flags, 0x04 | 0x08)))
submit_two_counters (hostname, instance, "if_octets", NULL,
- net_recv, net_sent, timestamp);
+ net_recv, net_sent, timestamp, interval);
if ((cfg_system->flags & CFG_SYSTEM_CPU)
&& (HAS_ALL_FLAGS (counter_flags, 0x10 | 0x20)))
{
submit_counter (hostname, instance, "cpu", "system",
- cpu_busy, timestamp);
+ cpu_busy, timestamp, interval);
submit_counter (hostname, instance, "cpu", "idle",
- cpu_total - cpu_busy, timestamp);
+ cpu_total - cpu_busy, timestamp, interval);
}
return (0);
return (-1);
}
- status = cna_handle_system_data (host->name, host->cfg_system, data);
+ status = cna_handle_system_data (host->name, host->cfg_system, data, host->interval);
if (status == 0)
host->cfg_system->interval.last_read = now;
diff --git a/src/network.c b/src/network.c
index 73e6d92d2cf197cd55175b6fc8ee9a0c24b55915..3ad11778e407f8bc369add540d64a3c131f95b1c 100644 (file)
--- a/src/network.c
+++ b/src/network.c
#include "utils_fbhash.h"
#include "utils_avltree.h"
#include "utils_cache.h"
+#include "utils_complain.h"
#include "network.h"
* Private variables
*/
static int network_config_ttl = 0;
-static size_t network_config_packet_size = 1024;
+static size_t network_config_packet_size = 1452;
static int network_config_forward = 0;
static int network_config_stats = 0;
/* This is a value we already sent. Don't allow it to be received again in
* order to avoid looping. */
if ((status == 0) && (time_sent >= ((uint64_t) vl->time)))
- return (false);
+ return (0);
- return (true);
+ return (1);
} /* }}} _Bool check_receive_okay */
static _Bool check_send_okay (const value_list_t *vl) /* {{{ */
{
- _Bool received = false;
+ _Bool received = 0;
int status;
if (network_config_forward != 0)
- return (true);
+ return (1);
if (vl->meta == NULL)
- return (true);
+ return (1);
status = meta_data_get_boolean (vl->meta, "network:received", &received);
if (status == -ENOENT)
- return (true);
+ return (1);
else if (status != 0)
{
ERROR ("network plugin: check_send_okay: meta_data_get_boolean failed "
"with status %i.", status);
- return (true);
+ return (1);
}
/* By default, only *send* value lists that were not *received* by the
return (-ENOMEM);
}
- status = meta_data_add_boolean (vl->meta, "network:received", true);
+ status = meta_data_add_boolean (vl->meta, "network:received", 1);
if (status != 0)
{
ERROR ("network plugin: meta_data_add_boolean failed.");
static int parse_part_sign_sha256 (sockent_t *se, /* {{{ */
void **ret_buffer, size_t *ret_buffer_len, int flags)
{
+ static c_complain_t complain_no_users = C_COMPLAIN_INIT_STATIC;
+
char *buffer;
size_t buffer_len;
size_t buffer_offset;
if (se->data.server.userdb == NULL)
{
- NOTICE ("network plugin: Received signed network packet but can't verify "
- "it because no user DB has been configured. Will accept it.");
+ c_complain (LOG_NOTICE, &complain_no_users,
+ "network plugin: Received signed network packet but can't verify it "
+ "because no user DB has been configured. Will accept it.");
return (0);
}
static int network_init (void)
{
- static _Bool have_init = false;
+ static _Bool have_init = 0;
/* Check if we were already initialized. If so, just return - there's
* nothing more to do (for now, that is). */
if (have_init)
return (0);
- have_init = true;
+ have_init = 1;
#if HAVE_LIBGCRYPT
gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
diff --git a/src/nginx.c b/src/nginx.c
index 697684277a153911fdc43474cdc74bdab5b62d1e..36d3d8d2bed85f9de5867f9f381f0102e85f51c0 100644 (file)
--- a/src/nginx.c
+++ b/src/nginx.c
static CURL *curl = NULL;
-#define ABUFFER_SIZE 16384
-static char nginx_buffer[ABUFFER_SIZE];
-static int nginx_buffer_len = 0;
-static char nginx_curl_error[CURL_ERROR_SIZE];
+static char nginx_buffer[16384];
+static size_t nginx_buffer_len = 0;
+static char nginx_curl_error[CURL_ERROR_SIZE];
static const char *config_keys[] =
{
{
size_t len = size * nmemb;
- if ((nginx_buffer_len + len) >= ABUFFER_SIZE)
+ /* Check if the data fits into the memory. If not, truncate it. */
+ if ((nginx_buffer_len + len) >= sizeof (nginx_buffer))
{
- len = (ABUFFER_SIZE - 1) - nginx_buffer_len;
+ assert (sizeof (nginx_buffer) > nginx_buffer_len);
+ len = (sizeof (nginx_buffer) - 1) - nginx_buffer_len;
}
if (len <= 0)
return (len);
- memcpy (nginx_buffer + nginx_buffer_len, (char *) buf, len);
+ memcpy (&nginx_buffer[nginx_buffer_len], buf, len);
nginx_buffer_len += len;
- nginx_buffer[nginx_buffer_len] = '\0';
+ nginx_buffer[nginx_buffer_len] = 0;
return (len);
}
diff --git a/src/notify_email.c b/src/notify_email.c
index 62e1c486a3601f20db464b1e2083d96cc46544d3..0aed27f12bb2ac8b41b5fa4e80d586e0c71f8bca 100644 (file)
--- a/src/notify_email.c
+++ b/src/notify_email.c
/**
* collectd - src/notify_email.c
* Copyright (C) 2008 Oleg King
+ * Copyright (C) 2010 Florian Forster
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
*
* Authors:
* Oleg King <king2 at kaluga.ru>
+ * Florian Forster <octo at collectd.org>
**/
#include "collectd.h"
#include <auth-client.h>
#include <libesmtp.h>
+#include <pthread.h>
#define MAXSTRING 256
static int recipients_len = 0;
static smtp_session_t session;
+static pthread_mutex_t session_lock = PTHREAD_MUTEX_INITIALIZER;
static smtp_message_t message;
static auth_context_t authctx = NULL;
{
char server[MAXSTRING];
+ ssnprintf(server, sizeof (server), "%s:%i",
+ (smtp_host == NULL) ? DEFAULT_SMTP_HOST : smtp_host,
+ smtp_port);
+
+ pthread_mutex_lock (&session_lock);
+
auth_client_init();
- if (!(session = smtp_create_session ())) {
+
+ session = smtp_create_session ();
+ if (session == NULL) {
+ pthread_mutex_unlock (&session_lock);
ERROR ("notify_email plugin: cannot create SMTP session");
return (-1);
}
smtp_set_monitorcb (session, monitor_cb, NULL, 1);
smtp_set_hostname (session, hostname_g);
- ssnprintf(server, sizeof (server), "%s:%i",
- (smtp_host == NULL) ? DEFAULT_SMTP_HOST : smtp_host,
- smtp_port);
smtp_set_server (session, server);
if (smtp_user && smtp_password) {
}
if ( !smtp_auth_set_context (session, authctx)) {
+ pthread_mutex_unlock (&session_lock);
ERROR ("notify_email plugin: cannot set SMTP auth context");
return (-1);
}
+ pthread_mutex_unlock (&session_lock);
return (0);
} /* int notify_email_init */
static int notify_email_shutdown (void)
{
- smtp_destroy_session (session);
- auth_destroy_context (authctx);
+ pthread_mutex_lock (&session_lock);
+
+ if (session != NULL)
+ smtp_destroy_session (session);
+ session = NULL;
+
+ if (authctx != NULL)
+ auth_destroy_context (authctx);
+ authctx = NULL;
+
auth_client_exit();
+
+ pthread_mutex_unlock (&session_lock);
return (0);
} /* int notify_email_shutdown */
n->host,
n->message);
+ pthread_mutex_lock (&session_lock);
+
+ if (session == NULL) {
+ /* Initialization failed or we're in the process of shutting down. */
+ pthread_mutex_unlock (&session_lock);
+ return (-1);
+ }
+
if (!(message = smtp_add_message (session))) {
+ pthread_mutex_unlock (&session_lock);
ERROR ("notify_email plugin: cannot set SMTP message");
return (-1);
}
char buf[MAXSTRING];
ERROR ("notify_email plugin: SMTP server problem: %s",
smtp_strerror (smtp_errno (), buf, sizeof buf));
+ pthread_mutex_unlock (&session_lock);
return (-1);
} else {
const smtp_status_t *status;
smtp_enumerate_recipients (message, print_recipient_status, NULL);
}
+ pthread_mutex_unlock (&session_lock);
return (0);
} /* int notify_email_notification */
diff --git a/src/perl.c b/src/perl.c
index a2f5da299bf92ae9679d3fd69b478f691540e3e4..726058044b80931843069737022b300473dd6ca9 100644 (file)
--- a/src/perl.c
+++ b/src/perl.c
#include "configfile.h"
+#if HAVE_STDBOOL_H
+# include <stdbool.h>
+#endif
+
#include <EXTERN.h>
#include <perl.h>
static XS (Collectd_plugin_dispatch_values)
{
SV *values = NULL;
- int values_idx = 0;
int ret = 0;
dXSARGS;
- if (2 == items) {
- log_warn ("Collectd::plugin_dispatch_values with two arguments "
- "is deprecated - pass the type through values->{type}.");
- values_idx = 1;
- }
- else if (1 != items) {
+ if (1 != items) {
log_err ("Usage: Collectd::plugin_dispatch_values(values)");
XSRETURN_EMPTY;
}
log_debug ("Collectd::plugin_dispatch_values: values=\"%s\"",
- SvPV_nolen (ST (values_idx)));
+ SvPV_nolen (ST (/* stack index = */ 0)));
- values = ST (values_idx);
+ values = ST (/* stack index = */ 0);
+ /* Make sure the argument is a hash reference. */
if (! (SvROK (values) && (SVt_PVHV == SvTYPE (SvRV (values))))) {
log_err ("Collectd::plugin_dispatch_values: Invalid values.");
XSRETURN_EMPTY;
}
- if (((2 == items) && (NULL == ST (0))) || (NULL == values))
- XSRETURN_EMPTY;
-
- if ((2 == items) && (NULL == hv_store ((HV *)SvRV (values), "type", 4,
- newSVsv (ST (0)), 0))) {
- log_err ("Collectd::plugin_dispatch_values: Could not store type.");
+ if (NULL == values)
XSRETURN_EMPTY;
- }
ret = pplugin_dispatch_values (aTHX_ (HV *)SvRV (values));
diff --git a/src/plugin.c b/src/plugin.c
index af894d54724c804a0bed9ae512b74fd533f9a18d..65d3875ebe5ae4b211dcc3f38addc3cd9491c26c 100644 (file)
--- a/src/plugin.c
+++ b/src/plugin.c
}
case NM_TYPE_BOOLEAN:
{
- meta->nm_value.nm_boolean = *((bool *) value);
+ meta->nm_value.nm_boolean = *((_Bool *) value);
break;
}
default:
int plugin_notification_meta_add_boolean (notification_t *n,
const char *name,
- bool value)
+ _Bool value)
{
return (plugin_notification_meta_add (n, name, NM_TYPE_BOOLEAN, &value));
}
diff --git a/src/plugin.h b/src/plugin.h
index 8b9449ee1dd64ec76436446bbbbaa9edd38a898b..d78aa4f82ba11c255a6c3c4d053cb4bd1504b38b 100644 (file)
--- a/src/plugin.h
+++ b/src/plugin.h
int64_t nm_signed_int;
uint64_t nm_unsigned_int;
double nm_double;
- bool nm_boolean;
+ _Bool nm_boolean;
} nm_value;
struct notification_meta_s *next;
} notification_meta_t;
double value);
int plugin_notification_meta_add_boolean (notification_t *n,
const char *name,
- bool value);
+ _Bool value);
int plugin_notification_meta_copy (notification_t *dst,
const notification_t *src);
diff --git a/src/python.c b/src/python.c
index 8772cd1fc23623d52f8ab8d96042ee86b18bc2bb..c056b5bf076ebee47b4490ff033a204dbf8f922e 100644 (file)
--- a/src/python.c
+++ b/src/python.c
static int cpy_config(oconfig_item_t *ci) {
int i;
+ char *argv = "";
PyObject *sys, *tb;
PyObject *sys_path;
PyObject *module;
cpy_log_exception("python initialization");
return 1;
}
+ PySys_SetArgv(1, &argv);
+ PyList_SetSlice(sys_path, 0, 1, NULL);
+
#ifdef IS_PY3K
module = PyImport_ImportModule("collectd");
#else
diff --git a/src/rrdtool.c b/src/rrdtool.c
index 4655b96ee77c79a74dca16f9f412bd2df55d3cd7..cb8ad593cbe51d4f9a36f22734fc273c8f724810 100644 (file)
--- a/src/rrdtool.c
+++ b/src/rrdtool.c
pthread_mutex_lock (&queue_lock);
/* Wait for values to arrive */
- while (true)
+ while (42)
{
struct timespec ts_wait;
&ts_wait);
if (status == ETIMEDOUT)
break;
- } /* while (true) */
+ } /* while (42) */
/* XXX: If you need to lock both, cache_lock and queue_lock, at
* the same time, ALWAYS lock `cache_lock' first! */
diff --git a/src/ted.c b/src/ted.c
index 8dc00e5a161ad928883b086d100a451e48d1db6f..bf519bbee53ec38c34ddae669c707523e2fbf617 100644 (file)
--- a/src/ted.c
+++ b/src/ted.c
values[0].gauge = value;
- vl.time = time (NULL);
vl.values = values;
vl.values_len = 1;
sstrncpy (vl.host, hostname_g, sizeof (vl.host));
diff --git a/src/types.db b/src/types.db
index f75e60074ed3165c402bc629db8f5c8d95936164..0484983294ba523ff5b167c2c6f78e87c59dab7d 100644 (file)
--- a/src/types.db
+++ b/src/types.db
cpu value:COUNTER:0:4294967295
current value:GAUGE:U:U
current_connections value:GAUGE:0:U
+current_sessions value:GAUGE:0:U
delay seconds:GAUGE:-1000000:1000000
derive value:DERIVE:0:U
df used:GAUGE:0:1125899906842623, free:GAUGE:0:1125899906842623
mysql_locks value:COUNTER:0:U
mysql_log_position value:COUNTER:0:4294967295
mysql_octets rx:COUNTER:0:4294967295, tx:COUNTER:0:4294967295
-mysql_qcache hits:COUNTER:0:U, inserts:COUNTER:0:U, not_cached:COUNTER:0:U, lowmem_prunes:COUNTER:0:U, queries_in_cache:GAUGE:0:U
-mysql_threads running:GAUGE:0:U, connected:GAUGE:0:U, cached:GAUGE:0:U, created:COUNTER:0:U
nfs_procedure value:COUNTER:0:4294967295
nginx_connections value:GAUGE:0:U
nginx_requests value:COUNTER:0:134217728
total_values value:DERIVE:0:U
uptime value:GAUGE:0:4294967295
users users:GAUGE:0:65535
+vcpu value:GAUGE:0:U
virt_cpu_total ns:COUNTER:0:256000000000
virt_vcpu ns:COUNTER:0:1000000000
vmpage_action value:COUNTER:0:4294967295
vs_memory value:GAUGE:0:9223372036854775807
vs_processes value:GAUGE:0:65535
vs_threads value:GAUGE:0:65535
-pinba_view req_per_sec:GAUGE:0:U, req_time:GAUGE:0:U, ru_utime:GAUGE:0:U, ru_stime:GAUGE:0:U, doc_size:GAUGE:0:U, mem_peak:GAUGE:0:U
diff --git a/src/utils_cache.c b/src/utils_cache.c
index 69ea864b99dbf1ddb2f35f8e82ae4078b0c8e9e6..aeb662d55bc09e63c18ae856ac182aa04fc6b369 100644 (file)
--- a/src/utils_cache.c
+++ b/src/utils_cache.c
}
/* Check if the entry has been updated in the meantime */
- if ((n.time - ce->last_update) < (2 * ce->interval))
+ if ((n.time - ce->last_update) < (timeout_g * ce->interval))
{
ce->state = STATE_OKAY;
pthread_mutex_unlock (&cache_lock);
diff --git a/src/utils_match.c b/src/utils_match.c
index 4d4b57d08051b31e91dceb0a8bf43a4e67695dfe..062bcfe37fd152851eb11eb03464daabe0b53ae2 100644 (file)
--- a/src/utils_match.c
+++ b/src/utils_match.c
return (NULL);
memset (obj, '\0', sizeof (cu_match_t));
- status = regcomp (&obj->regex, regex, REG_EXTENDED);
+ status = regcomp (&obj->regex, regex, REG_EXTENDED | REG_NEWLINE);
if (status != 0)
{
ERROR ("Compiling the regular expression \"%s\" failed.", regex);
diff --git a/src/utils_tail.c b/src/utils_tail.c
index 904a52122e7e40d4d6ebcc47788d218fe99888fa..5b7551d3a18f4925cb0403f454350373c8c7c6e8 100644 (file)
--- a/src/utils_tail.c
+++ b/src/utils_tail.c
while (42)
{
+ size_t len;
+
status = cu_tail_readline (obj, buf, buflen);
if (status != 0)
{
if (buf[0] == 0)
break;
+ len = strlen (buf);
+ while (len > 0) {
+ if (buf[len - 1] != '\n')
+ break;
+ buf[len - 1] = '\0';
+ }
+
status = callback (data, buf, buflen);
if (status != 0)
{