diff --git a/src/amqp.c b/src/amqp.c
index 1e23a563daccc430e3c6cda5331e5e3f9d2b9406..89f051e81960668c957abfe72239c542ed4681d2 100644 (file)
--- a/src/amqp.c
+++ b/src/amqp.c
*
* Authors:
* Sebastien Pahl <sebastien.pahl at dotcloud.com>
- * Florian Forster <octo at verplant.org>
+ * Florian Forster <octo at collectd.org>
**/
#include "collectd.h"
+
#include "common.h"
#include "plugin.h"
#include "utils_cmd_putval.h"
#include "utils_format_json.h"
#include "utils_format_graphite.h"
-#include <pthread.h>
-
#include <amqp.h>
#include <amqp_framing.h>
char *exchange;
char *routing_key;
+ /* Number of seconds to wait before connection is retried */
+ int connection_retry_delay;
+
/* publish only */
uint8_t delivery_mode;
_Bool store_rates;
/* subscribe only */
char *exchange_type;
char *queue;
+ _Bool queue_durable;
+ _Bool queue_auto_delete;
amqp_connection_state_t connection;
pthread_mutex_t lock;
? amqp_cstring_bytes (conf->queue)
: AMQP_EMPTY_BYTES,
/* passive = */ 0,
- /* durable = */ 0,
+ /* durable = */ conf->queue_durable,
/* exclusive = */ 0,
- /* auto_delete = */ 1,
+ /* auto_delete = */ conf->queue_auto_delete,
/* arguments = */ AMQP_EMPTY_TABLE);
if (qd_ret == NULL)
{
static int camqp_connect (camqp_config_t *conf) /* {{{ */
{
+ static time_t last_connect_time = 0;
+
amqp_rpc_reply_t reply;
int status;
#ifdef HAVE_AMQP_TCP_SOCKET
if (conf->connection != NULL)
return (0);
+ time_t now = time(NULL);
+ if (now < (last_connect_time + conf->connection_retry_delay))
+ {
+ DEBUG("amqp plugin: skipping connection retry, "
+ "ConnectionRetryDelay: %d", conf->connection_retry_delay);
+ return(1);
+ }
+ else
+ {
+ DEBUG ("amqp plugin: retrying connection");
+ last_connect_time = now;
+ }
+
conf->connection = amqp_new_connection ();
if (conf->connection == NULL)
{
static int camqp_shutdown (void) /* {{{ */
{
- size_t i;
-
DEBUG ("amqp plugin: Shutting down %zu subscriber threads.",
subscriber_threads_num);
subscriber_threads_running = 0;
- for (i = 0; i < subscriber_threads_num; i++)
+ for (size_t i = 0; i < subscriber_threads_num; i++)
{
/* FIXME: Sending a signal is not very elegant here. Maybe find out how
* to use a timeout in the thread and check for the variable in regular
if (tmp == NULL)
{
ERROR ("amqp plugin: realloc failed.");
- camqp_config_free (conf);
+ sfree (subscriber_threads);
return (ENOMEM);
}
subscriber_threads = tmp;
char errbuf[1024];
ERROR ("amqp plugin: pthread_create failed: %s",
sstrerror (status, errbuf, sizeof (errbuf)));
- camqp_config_free (conf);
return (status);
}
static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
const char *buffer, const char *routing_key)
{
- amqp_basic_properties_t props;
int status;
status = camqp_connect (conf);
if (status != 0)
return (status);
- memset (&props, 0, sizeof (props));
- props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG
- | AMQP_BASIC_DELIVERY_MODE_FLAG
- | AMQP_BASIC_APP_ID_FLAG;
+ amqp_basic_properties_t props = {
+ ._flags = AMQP_BASIC_CONTENT_TYPE_FLAG
+ | AMQP_BASIC_DELIVERY_MODE_FLAG
+ | AMQP_BASIC_APP_ID_FLAG,
+ .delivery_mode = conf->delivery_mode,
+ .app_id = amqp_cstring_bytes("collectd")
+ };
+
if (conf->format == CAMQP_FORMAT_COMMAND)
props.content_type = amqp_cstring_bytes("text/collectd");
else if (conf->format == CAMQP_FORMAT_JSON)
props.content_type = amqp_cstring_bytes("text/graphite");
else
assert (23 == 42);
- props.delivery_mode = conf->delivery_mode;
- props.app_id = amqp_cstring_bytes("collectd");
status = amqp_basic_publish(conf->connection,
/* channel = */ 1,
@@ -791,21 +808,18 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
{
camqp_config_t *conf = user_data->data;
char routing_key[6 * DATA_MAX_NAME_LEN];
- char buffer[4096];
+ char buffer[8192];
int status;
if ((ds == NULL) || (vl == NULL) || (conf == NULL))
return (EINVAL);
- memset (buffer, 0, sizeof (buffer));
-
if (conf->routing_key != NULL)
{
sstrncpy (routing_key, conf->routing_key, sizeof (routing_key));
}
else
{
- size_t i;
ssnprintf (routing_key, sizeof (routing_key), "collectd/%s/%s/%s/%s/%s",
vl->host,
vl->plugin, vl->plugin_instance,
/* Switch slashes (the only character forbidden by collectd) and dots
* (the separation character used by AMQP). */
- for (i = 0; routing_key[i] != 0; i++)
+ for (size_t i = 0; routing_key[i] != 0; i++)
{
if (routing_key[i] == '.')
routing_key[i] = '/';
{
camqp_config_t *conf;
int status;
- int i;
- conf = malloc (sizeof (*conf));
+ conf = calloc (1, sizeof (*conf));
if (conf == NULL)
{
- ERROR ("amqp plugin: malloc failed.");
+ ERROR ("amqp plugin: calloc failed.");
return (ENOMEM);
}
/* Initialize "conf" {{{ */
- memset (conf, 0, sizeof (*conf));
conf->publish = publish;
conf->name = NULL;
conf->format = CAMQP_FORMAT_COMMAND;
conf->password = NULL;
conf->exchange = NULL;
conf->routing_key = NULL;
+ conf->connection_retry_delay = 0;
+
/* publish only */
conf->delivery_mode = CAMQP_DM_VOLATILE;
conf->store_rates = 0;
/* subscribe only */
conf->exchange_type = NULL;
conf->queue = NULL;
+ conf->queue_durable = 0;
+ conf->queue_auto_delete = 1;
/* general */
conf->connection = NULL;
pthread_mutex_init (&conf->lock, /* attr = */ NULL);
return (status);
}
- for (i = 0; i < ci->children_num; i++)
+ for (int i = 0; i < ci->children_num; i++)
{
oconfig_item_t *child = ci->children + i;
status = cf_util_get_string (child, &conf->exchange_type);
else if ((strcasecmp ("Queue", child->key) == 0) && !publish)
status = cf_util_get_string (child, &conf->queue);
+ else if ((strcasecmp ("QueueDurable", child->key) == 0) && !publish)
+ status = cf_util_get_boolean (child, &conf->queue_durable);
+ else if ((strcasecmp ("QueueAutoDelete", child->key) == 0) && !publish)
+ status = cf_util_get_boolean (child, &conf->queue_auto_delete);
else if (strcasecmp ("RoutingKey", child->key) == 0)
status = cf_util_get_string (child, &conf->routing_key);
else if ((strcasecmp ("Persistent", child->key) == 0) && publish)
conf->escape_char = tmp_buff[0];
sfree (tmp_buff);
}
+ else if (strcasecmp ("ConnectionRetryDelay", child->key) == 0)
+ status = cf_util_get_int (child, &conf->connection_retry_delay);
else
WARNING ("amqp plugin: Ignoring unknown "
"configuration option \"%s\".", child->key);
static int camqp_config (oconfig_item_t *ci) /* {{{ */
{
- int i;
-
- for (i = 0; i < ci->children_num; i++)
+ for (int i = 0; i < ci->children_num; i++)
{
oconfig_item_t *child = ci->children + i;