From b96f58e73819f3928919dfa2da63829fcae92f86 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Fri, 6 Aug 2010 15:45:13 +0200 Subject: [PATCH] amqp plugin: Improve handling of the "routing key". If no routing key is configured, the publish code will now create one based on the value's identifier. The subscribing code will no longer use a default but use an empty routing key if nothing was configured. --- src/amqp.c | 59 +++++++++++++++++++++++++++++++++++------------------- 1 file changed, 38 insertions(+), 21 deletions(-) diff --git a/src/amqp.c b/src/amqp.c index d3a53d7a..6be483e2 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -67,7 +67,7 @@ struct camqp_config_s char *password; char *exchange; - char *routingkey; + char *routing_key; /* publish only */ uint8_t delivery_mode; @@ -90,7 +90,6 @@ static const char *def_vhost = "/"; static const char *def_user = "guest"; static const char *def_password = "guest"; static const char *def_exchange = "amq.fanout"; -static const char *def_routingkey = "collectd"; static pthread_t *subscriber_threads = NULL; static size_t subscriber_threads_num = 0; @@ -133,7 +132,7 @@ static void camqp_config_free (void *ptr) /* {{{ */ sfree (conf->exchange); sfree (conf->exchange_type); sfree (conf->queue); - sfree (conf->routingkey); + sfree (conf->routing_key); sfree (conf); } /* }}} void camqp_config_free */ @@ -285,19 +284,14 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */ } } - DEBUG ("amqp plugin: queue = %s; exchange = %s; routing_key = %s;", - conf->queue, conf->exchange, CONF (conf, routingkey)); - assert (conf->queue != NULL); qb_ret = amqp_queue_bind (conf->connection, /* channel = */ CAMQP_CHANNEL, /* queue = */ amqp_cstring_bytes (conf->queue), /* exchange = */ amqp_cstring_bytes (conf->exchange), -#if 1 - /* routing_key = */ amqp_cstring_bytes (CONF (conf, routingkey)), -#else - /* routing_key = */ AMQP_EMPTY_BYTES, -#endif + /* routing_key = */ (conf->routing_key != NULL) + ? amqp_cstring_bytes (conf->routing_key) + : AMQP_EMPTY_BYTES, /* arguments = */ AMQP_EMPTY_TABLE); if ((qb_ret == NULL) && camqp_is_error (conf)) { @@ -609,7 +603,7 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */ * Publishing code */ static int camqp_write_locked (camqp_config_t *conf, /* {{{ */ - const char *buffer) + const char *buffer, const char *routing_key) { amqp_basic_properties_t props; int status; @@ -629,7 +623,7 @@ static int camqp_write_locked (camqp_config_t *conf, /* {{{ */ status = amqp_basic_publish(conf->connection, /* channel = */ 1, amqp_cstring_bytes(CONF(conf, exchange)), - amqp_cstring_bytes(CONF(conf, routingkey)), + amqp_cstring_bytes (routing_key), /* mandatory = */ 0, /* immediate = */ 0, &props, @@ -648,17 +642,37 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ user_data_t *user_data) { camqp_config_t *conf = user_data->data; + char routing_key[6 * DATA_MAX_NAME_LEN]; char buffer[4096]; - size_t bfree; - size_t bfill; int status; if ((ds == NULL) || (vl == NULL) || (conf == NULL)) return (EINVAL); memset (buffer, 0, sizeof (buffer)); - bfree = sizeof (buffer); - bfill = 0; + + if (conf->routing_key != NULL) + { + sstrncpy (routing_key, conf->routing_key, sizeof (routing_key)); + } + else + { + size_t i; + ssnprintf (routing_key, sizeof (routing_key), "collectd/%s/%s/%s/%s/%s", + vl->host, + vl->plugin, vl->plugin_instance, + vl->type, vl->type_instance); + + /* Switch slashes (the only character forbidden by collectd) and dots + * (the separation character used by AMQP). */ + for (i = 0; routing_key[i] != 0; i++) + { + if (routing_key[i] == '.') + routing_key[i] = '/'; + else if (routing_key[i] == '/') + routing_key[i] = '.'; + } + } if (conf->format == CAMQP_FORMAT_COMMAND) { @@ -672,6 +686,9 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ } else if (conf->format == CAMQP_FORMAT_JSON) { + size_t bfree = sizeof (buffer); + size_t bfill = 0; + format_json_initialize (buffer, &bfill, &bfree); format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates); format_json_finalize (buffer, &bfill, &bfree); @@ -683,7 +700,7 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ } pthread_mutex_lock (&conf->lock); - status = camqp_write_locked (conf, buffer); + status = camqp_write_locked (conf, buffer, routing_key); pthread_mutex_unlock (&conf->lock); return (status); @@ -744,7 +761,7 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ conf->user = NULL; conf->password = NULL; conf->exchange = NULL; - conf->routingkey = NULL; + conf->routing_key = NULL; /* publish only */ conf->delivery_mode = CAMQP_DM_VOLATILE; conf->store_rates = 0; @@ -793,7 +810,7 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ else if ((strcasecmp ("Queue", child->key) == 0) && !publish) status = cf_util_get_string (child, &conf->queue); else if (strcasecmp ("RoutingKey", child->key) == 0) - status = cf_util_get_string (child, &conf->routingkey); + status = cf_util_get_string (child, &conf->routing_key); else if ((strcasecmp ("Persistent", child->key) == 0) && publish) { _Bool tmp = 0; @@ -815,7 +832,7 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ if ((status == 0) && !publish && (conf->exchange == NULL)) { - if (conf->routingkey != NULL) + if (conf->routing_key != NULL) WARNING ("amqp plugin: The option \"RoutingKey\" was given " "without the \"Exchange\" option. It will be ignored."); -- 2.30.2