diff --git a/src/amqp.c b/src/amqp.c
index 187582c46885e080dc18937ecb7aca7a5d83d19d..9bc3175cebaf0dd25c5a1188f4099e10426ccaac 100644 (file)
--- a/src/amqp.c
+++ b/src/amqp.c
#include "utils_format_json.h"
#include "utils_format_graphite.h"
-#include <pthread.h>
-
#include <amqp.h>
#include <amqp_framing.h>
switch (r.reply_type)
{
case AMQP_RESPONSE_NORMAL:
- sstrncpy (buffer, "Success", sizeof (buffer));
+ sstrncpy (buffer, "Success", buffer_size);
break;
case AMQP_RESPONSE_NONE:
- sstrncpy (buffer, "Missing RPC reply type", sizeof (buffer));
+ sstrncpy (buffer, "Missing RPC reply type", buffer_size);
break;
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
return (sstrerror (r.library_error, buffer, buffer_size));
#endif
else
- sstrncpy (buffer, "End of stream", sizeof (buffer));
+ sstrncpy (buffer, "End of stream", buffer_size);
break;
case AMQP_RESPONSE_SERVER_EXCEPTION:
/* type = */ amqp_cstring_bytes (conf->exchange_type),
/* passive = */ 0,
/* durable = */ 0,
+#if defined(AMQP_VERSION) && AMQP_VERSION >= 0x00060000
+ /* auto delete = */ 0,
+ /* internal = */ 0,
+#endif
/* arguments = */ argument_table);
if ((ed_ret == NULL) && camqp_is_error (conf))
{
continue;
}
- status = camqp_read_header (conf);
+ camqp_read_header (conf);
amqp_maybe_release_buffers (conf->connection);
} /* while (subscriber_threads_running) */
if (tmp == NULL)
{
ERROR ("amqp plugin: realloc failed.");
- camqp_config_free (conf);
+ sfree (subscriber_threads);
return (ENOMEM);
}
subscriber_threads = tmp;
char errbuf[1024];
ERROR ("amqp plugin: pthread_create failed: %s",
sstrerror (status, errbuf, sizeof (errbuf)));
- camqp_config_free (conf);
return (status);
}
static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
const char *buffer, const char *routing_key)
{
- amqp_basic_properties_t props;
int status;
status = camqp_connect (conf);
if (status != 0)
return (status);
- memset (&props, 0, sizeof (props));
- props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG
- | AMQP_BASIC_DELIVERY_MODE_FLAG
- | AMQP_BASIC_APP_ID_FLAG;
+ amqp_basic_properties_t props = {
+ ._flags = AMQP_BASIC_CONTENT_TYPE_FLAG
+ | AMQP_BASIC_DELIVERY_MODE_FLAG
+ | AMQP_BASIC_APP_ID_FLAG,
+ .delivery_mode = conf->delivery_mode,
+ .app_id = amqp_cstring_bytes("collectd")
+ };
+
if (conf->format == CAMQP_FORMAT_COMMAND)
props.content_type = amqp_cstring_bytes("text/collectd");
else if (conf->format == CAMQP_FORMAT_JSON)
props.content_type = amqp_cstring_bytes("text/graphite");
else
assert (23 == 42);
- props.delivery_mode = conf->delivery_mode;
- props.app_id = amqp_cstring_bytes("collectd");
status = amqp_basic_publish(conf->connection,
/* channel = */ 1,
if ((ds == NULL) || (vl == NULL) || (conf == NULL))
return (EINVAL);
- memset (buffer, 0, sizeof (buffer));
-
if (conf->routing_key != NULL)
{
sstrncpy (routing_key, conf->routing_key, sizeof (routing_key));
int status;
int i;
- conf = malloc (sizeof (*conf));
+ conf = calloc (1, sizeof (*conf));
if (conf == NULL)
{
- ERROR ("amqp plugin: malloc failed.");
+ ERROR ("amqp plugin: calloc failed.");
return (ENOMEM);
}
/* Initialize "conf" {{{ */
- memset (conf, 0, sizeof (*conf));
conf->publish = publish;
conf->name = NULL;
conf->format = CAMQP_FORMAT_COMMAND;
conf->password = NULL;
conf->exchange = NULL;
conf->routing_key = NULL;
- conf->connection_retry_delay = 60;
+ conf->connection_retry_delay = 0;
/* publish only */
conf->delivery_mode = CAMQP_DM_VOLATILE;