summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: 28144b1)
raw | patch | inline | side by side (parent: 28144b1)
author | Florian Forster <octo@leeloo.lan.home.verplant.org> | |
Fri, 6 Aug 2010 13:45:13 +0000 (15:45 +0200) | ||
committer | Florian Forster <octo@leeloo.lan.home.verplant.org> | |
Fri, 6 Aug 2010 13:45:13 +0000 (15:45 +0200) |
If no routing key is configured, the publish code will now create one based
on the value's identifier. The subscribing code will no longer use a
default but use an empty routing key if nothing was configured.
on the value's identifier. The subscribing code will no longer use a
default but use an empty routing key if nothing was configured.
src/amqp.c | patch | blob | history |
diff --git a/src/amqp.c b/src/amqp.c
index d3a53d7ab2a1844057f1d940fe911357ac38e0ef..6be483e27f7da935c49f2ba52f067215c12d6bd3 100644 (file)
--- a/src/amqp.c
+++ b/src/amqp.c
char *password;
char *exchange;
- char *routingkey;
+ char *routing_key;
/* publish only */
uint8_t delivery_mode;
static const char *def_user = "guest";
static const char *def_password = "guest";
static const char *def_exchange = "amq.fanout";
-static const char *def_routingkey = "collectd";
static pthread_t *subscriber_threads = NULL;
static size_t subscriber_threads_num = 0;
sfree (conf->exchange);
sfree (conf->exchange_type);
sfree (conf->queue);
- sfree (conf->routingkey);
+ sfree (conf->routing_key);
sfree (conf);
} /* }}} void camqp_config_free */
}
}
- DEBUG ("amqp plugin: queue = %s; exchange = %s; routing_key = %s;",
- conf->queue, conf->exchange, CONF (conf, routingkey));
-
assert (conf->queue != NULL);
qb_ret = amqp_queue_bind (conf->connection,
/* channel = */ CAMQP_CHANNEL,
/* queue = */ amqp_cstring_bytes (conf->queue),
/* exchange = */ amqp_cstring_bytes (conf->exchange),
-#if 1
- /* routing_key = */ amqp_cstring_bytes (CONF (conf, routingkey)),
-#else
- /* routing_key = */ AMQP_EMPTY_BYTES,
-#endif
+ /* routing_key = */ (conf->routing_key != NULL)
+ ? amqp_cstring_bytes (conf->routing_key)
+ : AMQP_EMPTY_BYTES,
/* arguments = */ AMQP_EMPTY_TABLE);
if ((qb_ret == NULL) && camqp_is_error (conf))
{
* Publishing code
*/
static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
- const char *buffer)
+ const char *buffer, const char *routing_key)
{
amqp_basic_properties_t props;
int status;
status = amqp_basic_publish(conf->connection,
/* channel = */ 1,
amqp_cstring_bytes(CONF(conf, exchange)),
- amqp_cstring_bytes(CONF(conf, routingkey)),
+ amqp_cstring_bytes (routing_key),
/* mandatory = */ 0,
/* immediate = */ 0,
&props,
@@ -648,17 +642,37 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
user_data_t *user_data)
{
camqp_config_t *conf = user_data->data;
+ char routing_key[6 * DATA_MAX_NAME_LEN];
char buffer[4096];
- size_t bfree;
- size_t bfill;
int status;
if ((ds == NULL) || (vl == NULL) || (conf == NULL))
return (EINVAL);
memset (buffer, 0, sizeof (buffer));
- bfree = sizeof (buffer);
- bfill = 0;
+
+ if (conf->routing_key != NULL)
+ {
+ sstrncpy (routing_key, conf->routing_key, sizeof (routing_key));
+ }
+ else
+ {
+ size_t i;
+ ssnprintf (routing_key, sizeof (routing_key), "collectd/%s/%s/%s/%s/%s",
+ vl->host,
+ vl->plugin, vl->plugin_instance,
+ vl->type, vl->type_instance);
+
+ /* Switch slashes (the only character forbidden by collectd) and dots
+ * (the separation character used by AMQP). */
+ for (i = 0; routing_key[i] != 0; i++)
+ {
+ if (routing_key[i] == '.')
+ routing_key[i] = '/';
+ else if (routing_key[i] == '/')
+ routing_key[i] = '.';
+ }
+ }
if (conf->format == CAMQP_FORMAT_COMMAND)
{
}
else if (conf->format == CAMQP_FORMAT_JSON)
{
+ size_t bfree = sizeof (buffer);
+ size_t bfill = 0;
+
format_json_initialize (buffer, &bfill, &bfree);
format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates);
format_json_finalize (buffer, &bfill, &bfree);
}
pthread_mutex_lock (&conf->lock);
- status = camqp_write_locked (conf, buffer);
+ status = camqp_write_locked (conf, buffer, routing_key);
pthread_mutex_unlock (&conf->lock);
return (status);
conf->user = NULL;
conf->password = NULL;
conf->exchange = NULL;
- conf->routingkey = NULL;
+ conf->routing_key = NULL;
/* publish only */
conf->delivery_mode = CAMQP_DM_VOLATILE;
conf->store_rates = 0;
else if ((strcasecmp ("Queue", child->key) == 0) && !publish)
status = cf_util_get_string (child, &conf->queue);
else if (strcasecmp ("RoutingKey", child->key) == 0)
- status = cf_util_get_string (child, &conf->routingkey);
+ status = cf_util_get_string (child, &conf->routing_key);
else if ((strcasecmp ("Persistent", child->key) == 0) && publish)
{
_Bool tmp = 0;
if ((status == 0) && !publish && (conf->exchange == NULL))
{
- if (conf->routingkey != NULL)
+ if (conf->routing_key != NULL)
WARNING ("amqp plugin: The option \"RoutingKey\" was given "
"without the \"Exchange\" option. It will be ignored.");