summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: e7dd087)
raw | patch | inline | side by side (parent: e7dd087)
author | Florian Forster <octo@leeloo.lan.home.verplant.org> | |
Sun, 8 Aug 2010 12:45:27 +0000 (14:45 +0200) | ||
committer | Florian Forster <octo@leeloo.lan.home.verplant.org> | |
Sun, 8 Aug 2010 12:45:27 +0000 (14:45 +0200) |
src/amqp.c | patch | blob | history | |
src/collectd.conf.pod | patch | blob | history |
diff --git a/src/amqp.c b/src/amqp.c
index 7b9f41b275480ff465cfc5d6ccb0aba4b086d77c..d6cd2756a9b5b2b5312515528eeaae14677be9ca 100644 (file)
--- a/src/amqp.c
+++ b/src/amqp.c
return (buffer);
} /* }}} char *camqp_strerror */
+static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */
+{
+ amqp_exchange_declare_ok_t *ed_ret;
+
+ if (conf->exchange_type == NULL)
+ return (0);
+
+ ed_ret = amqp_exchange_declare (conf->connection,
+ /* channel = */ CAMQP_CHANNEL,
+ /* exchange = */ amqp_cstring_bytes (conf->exchange),
+ /* type = */ amqp_cstring_bytes (conf->exchange_type),
+ /* passive = */ 0,
+ /* durable = */ 0,
+ /* auto_delete = */ 1,
+ /* arguments = */ AMQP_EMPTY_TABLE);
+ if ((ed_ret == NULL) && camqp_is_error (conf))
+ {
+ char errbuf[1024];
+ ERROR ("amqp plugin: amqp_exchange_declare failed: %s",
+ camqp_strerror (conf, errbuf, sizeof (errbuf)));
+ camqp_close_connection (conf);
+ return (-1);
+ }
+
+ INFO ("amqp plugin: Successfully created exchange \"%s\" "
+ "with type \"%s\".",
+ conf->exchange, conf->exchange_type);
+
+ return (0);
+} /* }}} int camqp_create_exchange */
+
static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
{
amqp_queue_declare_ok_t *qd_ret;
{
amqp_queue_bind_ok_t *qb_ret;
- /* create the exchange */
- if (conf->exchange_type != NULL)
- {
- amqp_exchange_declare_ok_t *ed_ret;
-
- ed_ret = amqp_exchange_declare (conf->connection,
- /* channel = */ CAMQP_CHANNEL,
- /* exchange = */ amqp_cstring_bytes (conf->exchange),
- /* type = */ amqp_cstring_bytes (conf->exchange_type),
- /* passive = */ 0,
- /* durable = */ 0,
- /* auto_delete = */ 1,
- /* arguments = */ AMQP_EMPTY_TABLE);
- if ((ed_ret == NULL) && camqp_is_error (conf))
- {
- char errbuf[1024];
- ERROR ("amqp plugin: amqp_exchange_declare failed: %s",
- camqp_strerror (conf, errbuf, sizeof (errbuf)));
- camqp_close_connection (conf);
- return (-1);
- }
- }
-
assert (conf->queue != NULL);
qb_ret = amqp_queue_bind (conf->connection,
/* channel = */ CAMQP_CHANNEL,
INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
"on %s:%i.", CONF(conf, vhost), CONF(conf, host), conf->port);
+ status = camqp_create_exchange (conf);
+ if (status != 0)
+ return (status);
+
if (!conf->publish)
return (camqp_setup_queue (conf));
return (0);
diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod
index 195ad79fd37d2e6615021c7ede4d66d8e30ee47d..36245e2cfbf25a4d53293d878d2a1d77ab632fe3 100644 (file)
--- a/src/collectd.conf.pod
+++ b/src/collectd.conf.pod
User "guest"
Password "guest"
Exchange "amq.fanout"
+ # ExchangeType "fanout"
# RoutingKey "collectd"
# Persistent false
# Format "command"
@@ -230,10 +231,11 @@ In I<Subscribe> blocks this option is optional. If given, a I<binding> between
the given exchange and the I<queue> is created, using the I<routing key> if
configured. See the B<Queue> and B<RoutingKey> options below.
-=item B<ExchangeType> I<Type> (Subscribe only)
+=item B<ExchangeType> I<Type>
If given, the plugin will try to create the configured I<exchange> with this
-I<type> after connecting and bind its I<queue> to it.
+I<type> after connecting. When in a I<Subscribe> block, the I<queue> will then
+be bound to this exchange.
=item B<Queue> I<Queue> (Subscribe only)