Code

amqp plugin: Improve handling of the "routing key".
authorFlorian Forster <octo@leeloo.lan.home.verplant.org>
Fri, 6 Aug 2010 13:45:13 +0000 (15:45 +0200)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Fri, 6 Aug 2010 13:45:13 +0000 (15:45 +0200)
If no routing key is configured, the publish code will now create one based
on the value's identifier. The subscribing code will no longer use a
default but use an empty routing key if nothing was configured.

src/amqp.c

index d3a53d7ab2a1844057f1d940fe911357ac38e0ef..6be483e27f7da935c49f2ba52f067215c12d6bd3 100644 (file)
@@ -67,7 +67,7 @@ struct camqp_config_s
     char   *password;
 
     char   *exchange;
-    char   *routingkey;
+    char   *routing_key;
 
     /* publish only */
     uint8_t delivery_mode;
@@ -90,7 +90,6 @@ static const char *def_vhost      = "/";
 static const char *def_user       = "guest";
 static const char *def_password   = "guest";
 static const char *def_exchange   = "amq.fanout";
-static const char *def_routingkey = "collectd";
 
 static pthread_t *subscriber_threads     = NULL;
 static size_t     subscriber_threads_num = 0;
@@ -133,7 +132,7 @@ static void camqp_config_free (void *ptr) /* {{{ */
     sfree (conf->exchange);
     sfree (conf->exchange_type);
     sfree (conf->queue);
-    sfree (conf->routingkey);
+    sfree (conf->routing_key);
 
     sfree (conf);
 } /* }}} void camqp_config_free */
@@ -285,19 +284,14 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
             }
         }
 
-        DEBUG ("amqp plugin: queue = %s; exchange = %s; routing_key = %s;",
-                conf->queue, conf->exchange, CONF (conf, routingkey));
-
         assert (conf->queue != NULL);
         qb_ret = amqp_queue_bind (conf->connection,
                 /* channel     = */ CAMQP_CHANNEL,
                 /* queue       = */ amqp_cstring_bytes (conf->queue),
                 /* exchange    = */ amqp_cstring_bytes (conf->exchange),
-#if 1
-                /* routing_key = */ amqp_cstring_bytes (CONF (conf, routingkey)),
-#else
-                /* routing_key = */ AMQP_EMPTY_BYTES,
-#endif
+                /* routing_key = */ (conf->routing_key != NULL)
+                ? amqp_cstring_bytes (conf->routing_key)
+                : AMQP_EMPTY_BYTES,
                 /* arguments   = */ AMQP_EMPTY_TABLE);
         if ((qb_ret == NULL) && camqp_is_error (conf))
         {
@@ -609,7 +603,7 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */
  * Publishing code
  */
 static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
-        const char *buffer)
+        const char *buffer, const char *routing_key)
 {
     amqp_basic_properties_t props;
     int status;
@@ -629,7 +623,7 @@ static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
     status = amqp_basic_publish(conf->connection,
                 /* channel = */ 1,
                 amqp_cstring_bytes(CONF(conf, exchange)),
-                amqp_cstring_bytes(CONF(conf, routingkey)),
+                amqp_cstring_bytes (routing_key),
                 /* mandatory = */ 0,
                 /* immediate = */ 0,
                 &props,
@@ -648,17 +642,37 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
         user_data_t *user_data)
 {
     camqp_config_t *conf = user_data->data;
+    char routing_key[6 * DATA_MAX_NAME_LEN];
     char buffer[4096];
-    size_t bfree;
-    size_t bfill;
     int status;
 
     if ((ds == NULL) || (vl == NULL) || (conf == NULL))
         return (EINVAL);
 
     memset (buffer, 0, sizeof (buffer));
-    bfree = sizeof (buffer);
-    bfill = 0;
+
+    if (conf->routing_key != NULL)
+    {
+        sstrncpy (routing_key, conf->routing_key, sizeof (routing_key));
+    }
+    else
+    {
+        size_t i;
+        ssnprintf (routing_key, sizeof (routing_key), "collectd/%s/%s/%s/%s/%s",
+                vl->host,
+                vl->plugin, vl->plugin_instance,
+                vl->type, vl->type_instance);
+
+        /* Switch slashes (the only character forbidden by collectd) and dots
+         * (the separation character used by AMQP). */
+        for (i = 0; routing_key[i] != 0; i++)
+        {
+            if (routing_key[i] == '.')
+                routing_key[i] = '/';
+            else if (routing_key[i] == '/')
+                routing_key[i] = '.';
+        }
+    }
 
     if (conf->format == CAMQP_FORMAT_COMMAND)
     {
@@ -672,6 +686,9 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
     }
     else if (conf->format == CAMQP_FORMAT_JSON)
     {
+        size_t bfree = sizeof (buffer);
+        size_t bfill = 0;
+
         format_json_initialize (buffer, &bfill, &bfree);
         format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates);
         format_json_finalize (buffer, &bfill, &bfree);
@@ -683,7 +700,7 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
     }
 
     pthread_mutex_lock (&conf->lock);
-    status = camqp_write_locked (conf, buffer);
+    status = camqp_write_locked (conf, buffer, routing_key);
     pthread_mutex_unlock (&conf->lock);
 
     return (status);
@@ -744,7 +761,7 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
     conf->user = NULL;
     conf->password = NULL;
     conf->exchange = NULL;
-    conf->routingkey = NULL;
+    conf->routing_key = NULL;
     /* publish only */
     conf->delivery_mode = CAMQP_DM_VOLATILE;
     conf->store_rates = 0;
@@ -793,7 +810,7 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
         else if ((strcasecmp ("Queue", child->key) == 0) && !publish)
             status = cf_util_get_string (child, &conf->queue);
         else if (strcasecmp ("RoutingKey", child->key) == 0)
-            status = cf_util_get_string (child, &conf->routingkey);
+            status = cf_util_get_string (child, &conf->routing_key);
         else if ((strcasecmp ("Persistent", child->key) == 0) && publish)
         {
             _Bool tmp = 0;
@@ -815,7 +832,7 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
 
     if ((status == 0) && !publish && (conf->exchange == NULL))
     {
-        if (conf->routingkey != NULL)
+        if (conf->routing_key != NULL)
             WARNING ("amqp plugin: The option \"RoutingKey\" was given "
                     "without the \"Exchange\" option. It will be ignored.");