X-Git-Url: https://git.tokkee.org/?a=blobdiff_plain;f=src%2Famqp.c;h=4206bdc1e3ee36b99192452b377943e007fb8af8;hb=bc5073cf25c5cea3a322f70a3a9ba50c67460638;hp=24bc4881f6659049ca465d249c312332d0912aba;hpb=ef4a3db895a0aba7107c0f1c6c2ef2a7f128aaf8;p=collectd.git diff --git a/src/amqp.c b/src/amqp.c index 24bc4881..4206bdc1 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -23,7 +23,7 @@ * * Authors: * Sebastien Pahl - * Florian Forster + * Florian Forster **/ #include "collectd.h" @@ -80,6 +80,9 @@ struct camqp_config_s char *exchange; char *routing_key; + /* Number of seconds to wait before connection is retried */ + int connection_retry_delay; + /* publish only */ uint8_t delivery_mode; _Bool store_rates; @@ -93,6 +96,8 @@ struct camqp_config_s /* subscribe only */ char *exchange_type; char *queue; + _Bool queue_durable; + _Bool queue_auto_delete; amqp_connection_state_t connection; pthread_mutex_t lock; @@ -194,11 +199,11 @@ static char *camqp_strerror (camqp_config_t *conf, /* {{{ */ switch (r.reply_type) { case AMQP_RESPONSE_NORMAL: - sstrncpy (buffer, "Success", sizeof (buffer)); + sstrncpy (buffer, "Success", buffer_size); break; case AMQP_RESPONSE_NONE: - sstrncpy (buffer, "Missing RPC reply type", sizeof (buffer)); + sstrncpy (buffer, "Missing RPC reply type", buffer_size); break; case AMQP_RESPONSE_LIBRARY_EXCEPTION: @@ -210,7 +215,7 @@ static char *camqp_strerror (camqp_config_t *conf, /* {{{ */ return (sstrerror (r.library_error, buffer, buffer_size)); #endif else - sstrncpy (buffer, "End of stream", sizeof (buffer)); + sstrncpy (buffer, "End of stream", buffer_size); break; case AMQP_RESPONSE_SERVER_EXCEPTION: @@ -332,9 +337,9 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */ ? amqp_cstring_bytes (conf->queue) : AMQP_EMPTY_BYTES, /* passive = */ 0, - /* durable = */ 0, + /* durable = */ conf->queue_durable, /* exclusive = */ 0, - /* auto_delete = */ 1, + /* auto_delete = */ conf->queue_auto_delete, /* arguments = */ AMQP_EMPTY_TABLE); if (qd_ret == NULL) { @@ -407,6 +412,8 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */ static int camqp_connect (camqp_config_t *conf) /* {{{ */ { + static time_t last_connect_time = 0; + amqp_rpc_reply_t reply; int status; #ifdef HAVE_AMQP_TCP_SOCKET @@ -418,6 +425,19 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ if (conf->connection != NULL) return (0); + time_t now = time(NULL); + if (now < (last_connect_time + conf->connection_retry_delay)) + { + DEBUG("amqp plugin: skipping connection retry, " + "ConnectionRetryDelay: %d", conf->connection_retry_delay); + return(1); + } + else + { + DEBUG ("amqp plugin: retrying connection"); + last_connect_time = now; + } + conf->connection = amqp_new_connection (); if (conf->connection == NULL) { @@ -696,7 +716,7 @@ static void *camqp_subscribe_thread (void *user_data) /* {{{ */ continue; } - status = camqp_read_header (conf); + camqp_read_header (conf); amqp_maybe_release_buffers (conf->connection); } /* while (subscriber_threads_running) */ @@ -791,7 +811,7 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ { camqp_config_t *conf = user_data->data; char routing_key[6 * DATA_MAX_NAME_LEN]; - char buffer[4096]; + char buffer[8192]; int status; if ((ds == NULL) || (vl == NULL) || (conf == NULL)) @@ -924,9 +944,12 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ conf->password = NULL; conf->exchange = NULL; conf->routing_key = NULL; + conf->connection_retry_delay = 0; + /* publish only */ conf->delivery_mode = CAMQP_DM_VOLATILE; conf->store_rates = 0; + conf->graphite_flags = 0; /* publish & graphite only */ conf->prefix = NULL; conf->postfix = NULL; @@ -934,6 +957,8 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ /* subscribe only */ conf->exchange_type = NULL; conf->queue = NULL; + conf->queue_durable = 0; + conf->queue_auto_delete = 1; /* general */ conf->connection = NULL; pthread_mutex_init (&conf->lock, /* attr = */ NULL); @@ -973,6 +998,10 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ status = cf_util_get_string (child, &conf->exchange_type); else if ((strcasecmp ("Queue", child->key) == 0) && !publish) status = cf_util_get_string (child, &conf->queue); + else if ((strcasecmp ("QueueDurable", child->key) == 0) && !publish) + status = cf_util_get_boolean (child, &conf->queue_durable); + else if ((strcasecmp ("QueueAutoDelete", child->key) == 0) && !publish) + status = cf_util_get_boolean (child, &conf->queue_auto_delete); else if (strcasecmp ("RoutingKey", child->key) == 0) status = cf_util_get_string (child, &conf->routing_key); else if ((strcasecmp ("Persistent", child->key) == 0) && publish) @@ -992,6 +1021,12 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ } else if ((strcasecmp ("Format", child->key) == 0) && publish) status = camqp_config_set_format (child, conf); + else if ((strcasecmp ("GraphiteSeparateInstances", child->key) == 0) && publish) + status = cf_util_get_flag (child, &conf->graphite_flags, + GRAPHITE_SEPARATE_INSTANCES); + else if ((strcasecmp ("GraphiteAlwaysAppendDS", child->key) == 0) && publish) + status = cf_util_get_flag (child, &conf->graphite_flags, + GRAPHITE_ALWAYS_APPEND_DS); else if ((strcasecmp ("GraphitePrefix", child->key) == 0) && publish) status = cf_util_get_string (child, &conf->prefix); else if ((strcasecmp ("GraphitePostfix", child->key) == 0) && publish) @@ -1006,6 +1041,8 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ conf->escape_char = tmp_buff[0]; sfree (tmp_buff); } + else if (strcasecmp ("ConnectionRetryDelay", child->key) == 0) + status = cf_util_get_int (child, &conf->connection_retry_delay); else WARNING ("amqp plugin: Ignoring unknown " "configuration option \"%s\".", child->key);