Code

Add support for durable AMQP queues
authorDavid Blundell <david.blundell@100percentit.com>
Fri, 23 May 2014 19:13:52 +0000 (20:13 +0100)
committerMarc Fournier <marc.fournier@camptocamp.com>
Fri, 20 Jun 2014 14:44:28 +0000 (16:44 +0200)
The collectd amqp plugin allows setting messages as persistent but is hardcoded to create or use transient queues that are set to auto_delete.  This commit adds support for QueueDurable and QueueAutoDelete boolean values in the config file and changes the queue creation as appropriate.

If the values are not set in the config file, QueueDurable defaults to false and QueueAutoDelete defaults to true.

Tested with RabbitMQ 3.3.1

src/amqp.c

index edd4f749396bbb14c501792b62db70d188ade0de..3f33ff78386e593d6d48559628f7503d9dbfeb1b 100644 (file)
@@ -79,6 +79,8 @@ struct camqp_config_s
     /* subscribe only */
     char   *exchange_type;
     char   *queue;
+    _Bool   queue_durable;
+    _Bool   queue_auto_delete;
 
     amqp_connection_state_t connection;
     pthread_mutex_t lock;
@@ -314,9 +316,9 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
             ? amqp_cstring_bytes (conf->queue)
             : AMQP_EMPTY_BYTES,
             /* passive     = */ 0,
-            /* durable     = */ 0,
+            /* durable     = */ conf->queue_durable,
             /* exclusive   = */ 0,
-            /* auto_delete = */ 1,
+            /* auto_delete = */ conf->queue_auto_delete,
             /* arguments   = */ AMQP_EMPTY_TABLE);
     if (qd_ret == NULL)
     {
@@ -885,6 +887,8 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
     /* subscribe only */
     conf->exchange_type = NULL;
     conf->queue = NULL;
+    conf->queue_durable = 0;
+    conf->queue_auto_delete = 1;
     /* general */
     conf->connection = NULL;
     pthread_mutex_init (&conf->lock, /* attr = */ NULL);
@@ -924,6 +928,10 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
             status = cf_util_get_string (child, &conf->exchange_type);
         else if ((strcasecmp ("Queue", child->key) == 0) && !publish)
             status = cf_util_get_string (child, &conf->queue);
+        else if (strcasecmp ("QueueDurable", child->key) == 0)
+            status = cf_util_get_boolean (child, &conf->queue_durable);
+        else if (strcasecmp ("QueueAutoDelete", child->key) == 0)
+            status = cf_util_get_boolean (child, &conf->queue_auto_delete);
         else if (strcasecmp ("RoutingKey", child->key) == 0)
             status = cf_util_get_string (child, &conf->routing_key);
         else if ((strcasecmp ("Persistent", child->key) == 0) && publish)