summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: c263b20)
raw | patch | inline | side by side (parent: c263b20)
author | Florian Forster <octo@leeloo.lan.home.verplant.org> | |
Thu, 5 Aug 2010 07:32:47 +0000 (09:32 +0200) | ||
committer | Florian Forster <octo@leeloo.lan.home.verplant.org> | |
Thu, 5 Aug 2010 07:32:47 +0000 (09:32 +0200) |
src/amqp.c | patch | blob | history |
diff --git a/src/amqp.c b/src/amqp.c
index 1f61080cdff9c799f1f2299d91e4d7461bbfe055..748ff9f155a0207a50f859fdae6bd8164568def5 100644 (file)
--- a/src/amqp.c
+++ b/src/amqp.c
/*
* Global variables
*/
-static int port = 5672;
-static char *host = NULL;
-static char *vhost = NULL;
-static char *user = NULL;
-static char *password = NULL;
-static char *exchange = NULL;
-static char *routingkey = NULL;
-static uint8_t delivery_mode = AMQP_DM_VOLATILE;
-static _Bool store_rates = 0;
+static const char *def_host = "localhost";
+static const char *def_vhost = "/";
+static const char *def_user = "guest";
+static const char *def_password = "guest";
+static const char *def_exchange = "amq.fanout";
+static const char *def_routingkey = "collectd";
+
+static char *conf_host = NULL;
+static char *conf_vhost = NULL;
+static char *conf_user = NULL;
+static char *conf_password = NULL;
+static char *conf_exchange = NULL;
+static char *conf_routingkey = NULL;
+static int conf_port = 5672;
+static uint8_t conf_delivery_mode = AMQP_DM_VOLATILE;
+static _Bool conf_store_rates = 0;
+
+#define CONF(f) ((conf_##f != NULL) ? conf_##f : def_##f)
static amqp_connection_state_t amqp_conn = NULL;
static pthread_mutex_t amqp_conn_lock = PTHREAD_MUTEX_INITIALIZER;
static int config(const char *key, const char *value)
{
if (strcasecmp(key, "host") == 0)
- return (config_set(&host, value));
+ return (config_set(&conf_host, value));
else if(strcasecmp(key, "port") == 0)
{
int tmp;
return (1);
}
- port = tmp;
+ conf_port = tmp;
return (0);
}
else if (strcasecmp(key, "vhost") == 0)
- return (config_set(&vhost, value));
+ return (config_set(&conf_vhost, value));
else if (strcasecmp(key, "user") == 0)
- return (config_set(&user, value));
+ return (config_set(&conf_user, value));
else if (strcasecmp(key, "password") == 0)
- return (config_set(&password, value));
+ return (config_set(&conf_password, value));
else if (strcasecmp(key, "exchange") == 0)
- return (config_set(&exchange, value));
+ return (config_set(&conf_exchange, value));
else if (strcasecmp(key, "routingkey") == 0)
- return (config_set(&routingkey, value));
+ return (config_set(&conf_routingkey, value));
else if (strcasecmp ("Persistent", key) == 0)
{
if (IS_TRUE (value))
- delivery_mode = AMQP_DM_PERSISTENT;
+ conf_delivery_mode = AMQP_DM_PERSISTENT;
else
- delivery_mode = AMQP_DM_VOLATILE;
+ conf_delivery_mode = AMQP_DM_VOLATILE;
return (0);
}
else if (strcasecmp ("StoreRates", key) == 0)
{
if (IS_TRUE (value))
- store_rates = 1;
+ conf_store_rates = 1;
else
- store_rates = 0;
+ conf_store_rates = 0;
return (0);
}
return (-1);
return (ENOMEM);
}
- sockfd = amqp_open_socket (host, port);
+ sockfd = amqp_open_socket (CONF(host), conf_port);
if (sockfd < 0)
{
char errbuf[1024];
amqp_set_sockfd (amqp_conn, sockfd);
- reply = amqp_login(amqp_conn, vhost,
+ reply = amqp_login (amqp_conn, CONF(vhost),
/* channel max = */ 0,
- /* frame max = */ 131072,
- /* heartbeat = */ 0,
- /* authentication: */ AMQP_SASL_METHOD_PLAIN, user, password);
+ /* frame max = */ 131072,
+ /* heartbeat = */ 0,
+ /* authentication = */ AMQP_SASL_METHOD_PLAIN,
+ CONF(user), CONF(password));
if (reply.reply_type != AMQP_RESPONSE_NORMAL)
{
ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
- vhost, user);
- amqp_destroy_connection(amqp_conn);
- close(sockfd);
+ CONF(vhost), CONF(user));
+ amqp_destroy_connection (amqp_conn);
+ close (sockfd);
amqp_conn = NULL;
return (1);
}
}
INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
- "on %s:%i.", vhost, host, port);
+ "on %s:%i.", CONF(vhost), CONF(host), conf_port);
return (0);
} /* int amqp_connect */
return (status);
memset (&props, 0, sizeof (props));
- props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
+ props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG
+ | AMQP_BASIC_DELIVERY_MODE_FLAG
+ | AMQP_BASIC_APP_ID_FLAG;
props.content_type = amqp_cstring_bytes("application/json");
- props.delivery_mode = delivery_mode;
+ props.delivery_mode = conf_delivery_mode;
+ props.app_id = amqp_cstring_bytes("collectd");
status = amqp_basic_publish(amqp_conn,
/* channel = */ 1,
- amqp_cstring_bytes(exchange),
- amqp_cstring_bytes(routingkey),
+ amqp_cstring_bytes(CONF(exchange)),
+ amqp_cstring_bytes(CONF(routingkey)),
/* mandatory = */ 0,
/* immediate = */ 0,
&props,
bfill = 0;
format_json_initialize (buffer, &bfill, &bfree);
- format_json_value_list (buffer, &bfill, &bfree, ds, vl, store_rates);
+ format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf_store_rates);
format_json_finalize (buffer, &bfill, &bfree);
pthread_mutex_lock (&amqp_conn_lock);
}
pthread_mutex_unlock (&amqp_conn_lock);
- sfree(host);
- sfree(vhost);
- sfree(user);
- sfree(password);
- sfree(exchange);
- sfree(routingkey);
+ sfree(conf_host);
+ sfree(conf_vhost);
+ sfree(conf_user);
+ sfree(conf_password);
+ sfree(conf_exchange);
+ sfree(conf_routingkey);
return (0);
} /* int shutdown */