summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: bc01677)
raw | patch | inline | side by side (parent: bc01677)
author | Florian Forster <octo@leeloo.lan.home.verplant.org> | |
Wed, 4 Aug 2010 17:31:44 +0000 (19:31 +0200) | ||
committer | Florian Forster <octo@leeloo.lan.home.verplant.org> | |
Wed, 4 Aug 2010 17:31:44 +0000 (19:31 +0200) |
A connection is now opened on the first value to be written and is kept
open for as long as possible. This is hopefully much more efficient than
opening a new connection for each value.
The value_list_t is now converted to JSON first, before locking the
global connection object, in order to hold the lock for the least
possible time.
open for as long as possible. This is hopefully much more efficient than
opening a new connection for each value.
The value_list_t is now converted to JSON first, before locking the
global connection object, in order to hold the lock for the least
possible time.
src/amqp.c | patch | blob | history |
diff --git a/src/amqp.c b/src/amqp.c
index 489936484c0fddbce178f7a3c91d1ac2e1cd7e8d..ecbf338e2fb0f4e5fcfc75aa154eb7d4ede76cef 100644 (file)
--- a/src/amqp.c
+++ b/src/amqp.c
#include <stdlib.h>
#include <unistd.h>
#include <strings.h>
+#include <pthread.h>
-#include <collectd.h>
-#include <common.h>
-#include <plugin.h>
-#include <utils_format_json.h>
+#include "collectd.h"
+#include "common.h"
+#include "plugin.h"
+#include "utils_format_json.h"
#include <amqp.h>
#include <amqp_framing.h>
-#define PLUGIN_NAME "amqp"
-
static int port;
static char *host = NULL;
static char *vhost = NULL;
static char *exchange = NULL;
static char *routingkey = NULL;
+static amqp_connection_state_t amqp_conn = NULL;
+static pthread_mutex_t amqp_conn_lock = PTHREAD_MUTEX_INITIALIZER;
+
static const char *config_keys[] =
{
"Host",
return (-1);
}
-static int amqp_write(const data_set_t *ds, const value_list_t *vl, user_data_t *user_data)
+static int amqp_write_locked (const char *buffer)
{
- int error;
- int sockfd;
- size_t bfree;
- size_t bfill;
- char buffer[4096];
amqp_rpc_reply_t reply;
- amqp_connection_state_t conn;
amqp_basic_properties_t props;
+ int status;
- /* TODO: Don't create a new connection for each value that is to be dispatched. */
- conn = amqp_new_connection();
- if ((sockfd = amqp_open_socket(host, port)) < 0)
- {
- ERROR ("amqp plugin: amqp_open_socket failed.");
- amqp_destroy_connection(conn);
- return (1);
- }
- amqp_set_sockfd(conn, sockfd);
- reply = amqp_login(conn, vhost,
- /* channel max = */ 0,
- /* frame max = */ 131072,
- /* heartbeat = */ 0,
- /* authentication: */ AMQP_SASL_METHOD_PLAIN, user, password);
- if (reply.reply_type != AMQP_RESPONSE_NORMAL)
+ if (amqp_conn == NULL)
{
- ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
- vhost, user);
- amqp_destroy_connection(conn);
- close(sockfd);
- return (1);
- }
- amqp_channel_open(conn, 1);
- if (reply.reply_type != AMQP_RESPONSE_NORMAL)
- {
- ERROR ("amqp plugin: amqp_channel_open failed.");
- amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
- amqp_destroy_connection(conn);
- close(sockfd);
- return (1);
- }
- error = 0;
- memset(buffer, 0, sizeof(buffer));
- bfree = sizeof(buffer);
- bfill = 0;
- format_json_initialize(buffer, &bfill, &bfree);
- /* TODO: Possibly add a config option "StoreRates" and pass the value along here. */
- format_json_value_list(buffer, &bfill, &bfree, ds, vl,
- /* rates = */ 0);
- format_json_finalize(buffer, &bfill, &bfree);
+ int sockfd;
+
+ amqp_conn = amqp_new_connection ();
+ if (amqp_conn == NULL)
+ {
+ ERROR ("amqp plugin: amqp_new_connection failed.");
+ return (ENOMEM);
+ }
+
+ sockfd = amqp_open_socket (host, port);
+ if (sockfd < 0)
+ {
+ char errbuf[1024];
+ status = (-1) * sockfd;
+ ERROR ("amqp plugin: amqp_open_socket failed: %s",
+ sstrerror (status, errbuf, sizeof (errbuf)));
+ amqp_destroy_connection(amqp_conn);
+ amqp_conn = NULL;
+ return (status);
+ }
+
+ amqp_set_sockfd (amqp_conn, sockfd);
+
+ reply = amqp_login(amqp_conn, vhost,
+ /* channel max = */ 0,
+ /* frame max = */ 131072,
+ /* heartbeat = */ 0,
+ /* authentication: */ AMQP_SASL_METHOD_PLAIN, user, password);
+ if (reply.reply_type != AMQP_RESPONSE_NORMAL)
+ {
+ ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
+ vhost, user);
+ amqp_destroy_connection(amqp_conn);
+ close(sockfd);
+ amqp_conn = NULL;
+ return (1);
+ }
+
+ amqp_channel_open (amqp_conn, /* channel = */ 1);
+ /* FIXME: Is checking "reply.reply_type" really correct here? How does
+ * it get set? --octo */
+ if (reply.reply_type != AMQP_RESPONSE_NORMAL)
+ {
+ ERROR ("amqp plugin: amqp_channel_open failed.");
+ amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
+ amqp_destroy_connection(amqp_conn);
+ close(sockfd);
+ amqp_conn = NULL;
+ return (1);
+ }
+
+ INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
+ "on %s:%i.", vhost, host, port);
+ } /* if (amqp_conn == NULL) */
+
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
props.content_type = amqp_cstring_bytes("application/json");
props.delivery_mode = 2; /* persistent delivery mode */
- error = amqp_basic_publish(conn,
+
+ status = amqp_basic_publish(amqp_conn,
/* channel = */ 1,
amqp_cstring_bytes(exchange),
amqp_cstring_bytes(routingkey),
@@ -164,40 +179,82 @@ static int amqp_write(const data_set_t *ds, const value_list_t *vl, user_data_t
/* immediate = */ 0,
&props,
amqp_cstring_bytes(buffer));
- if (error != 0)
+ if (status != 0)
{
+ int sockfd;
+
ERROR ("amqp plugin: amqp_basic_publish failed with status %i.",
- error);
+ status);
+
+ sockfd = amqp_get_sockfd (amqp_conn);
+ amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS);
+ amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
+ amqp_destroy_connection (amqp_conn);
+ close(sockfd);
+ amqp_conn = NULL;
}
- reply = amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
- if (reply.reply_type != AMQP_RESPONSE_NORMAL)
- error = 1;
- reply = amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
- if (reply.reply_type != AMQP_RESPONSE_NORMAL)
- error = 1;
- amqp_destroy_connection(conn);
- if (close(sockfd) < 0)
- error = 1;
- return (error);
-}
+ return (status);
+} /* int amqp_write_locked */
+
+static int amqp_write (const data_set_t *ds, const value_list_t *vl,
+ __attribute__((unused)) user_data_t *user_data)
+{
+ char buffer[4096];
+ size_t bfree;
+ size_t bfill;
+ int status;
+
+ if ((ds == NULL) || (vl == NULL))
+ return (EINVAL);
+
+ memset (buffer, 0, sizeof (buffer));
+ bfree = sizeof (buffer);
+ bfill = 0;
+
+ format_json_initialize(buffer, &bfill, &bfree);
+ /* TODO: Possibly add a config option "StoreRates" and pass the value along here. */
+ format_json_value_list(buffer, &bfill, &bfree, ds, vl, /* rates = */ 0);
+ format_json_finalize(buffer, &bfill, &bfree);
+
+ pthread_mutex_lock (&amqp_conn_lock);
+ status = amqp_write_locked (buffer);
+ pthread_mutex_unlock (&amqp_conn_lock);
+
+ return (status);
+} /* int amqp_write */
static int shutdown(void)
{
+ pthread_mutex_lock (&amqp_conn_lock);
+ if (amqp_conn != NULL)
+ {
+ int sockfd;
+
+ sockfd = amqp_get_sockfd (amqp_conn);
+ amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS);
+ amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
+ amqp_destroy_connection (amqp_conn);
+ close(sockfd);
+ amqp_conn = NULL;
+ }
+ pthread_mutex_unlock (&amqp_conn_lock);
+
sfree(host);
sfree(vhost);
sfree(user);
sfree(password);
sfree(exchange);
sfree(routingkey);
+
return (0);
}
void module_register(void)
{
- plugin_register_config(PLUGIN_NAME, config, config_keys, config_keys_num);
- plugin_register_write(PLUGIN_NAME, amqp_write, NULL);
- plugin_register_shutdown(PLUGIN_NAME, shutdown);
+ plugin_register_config("amqp", config, config_keys, config_keys_num);
+ plugin_register_write("amqp", amqp_write, NULL);
+ plugin_register_shutdown("amqp", shutdown);
}
/* vim: set sw=4 sts=4 et : */