From adc387a3aad391700eb1bae48f1d95a804ea4e8a Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Wed, 4 Aug 2010 19:31:44 +0200 Subject: [PATCH] amqp plugin: Implement persistent connection handling. A connection is now opened on the first value to be written and is kept open for as long as possible. This is hopefully much more efficient than opening a new connection for each value. The value_list_t is now converted to JSON first, before locking the global connection object, in order to hold the lock for the least possible time. --- src/amqp.c | 195 ++++++++++++++++++++++++++++++++++------------------- 1 file changed, 126 insertions(+), 69 deletions(-) diff --git a/src/amqp.c b/src/amqp.c index 48993648..ecbf338e 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -30,17 +30,16 @@ #include #include #include +#include -#include -#include -#include -#include +#include "collectd.h" +#include "common.h" +#include "plugin.h" +#include "utils_format_json.h" #include #include -#define PLUGIN_NAME "amqp" - static int port; static char *host = NULL; static char *vhost = NULL; @@ -49,6 +48,9 @@ static char *password = NULL; static char *exchange = NULL; static char *routingkey = NULL; +static amqp_connection_state_t amqp_conn = NULL; +static pthread_mutex_t amqp_conn_lock = PTHREAD_MUTEX_INITIALIZER; + static const char *config_keys[] = { "Host", @@ -102,61 +104,74 @@ static int config(const char *key, const char *value) return (-1); } -static int amqp_write(const data_set_t *ds, const value_list_t *vl, user_data_t *user_data) +static int amqp_write_locked (const char *buffer) { - int error; - int sockfd; - size_t bfree; - size_t bfill; - char buffer[4096]; amqp_rpc_reply_t reply; - amqp_connection_state_t conn; amqp_basic_properties_t props; + int status; - /* TODO: Don't create a new connection for each value that is to be dispatched. */ - conn = amqp_new_connection(); - if ((sockfd = amqp_open_socket(host, port)) < 0) - { - ERROR ("amqp plugin: amqp_open_socket failed."); - amqp_destroy_connection(conn); - return (1); - } - amqp_set_sockfd(conn, sockfd); - reply = amqp_login(conn, vhost, - /* channel max = */ 0, - /* frame max = */ 131072, - /* heartbeat = */ 0, - /* authentication: */ AMQP_SASL_METHOD_PLAIN, user, password); - if (reply.reply_type != AMQP_RESPONSE_NORMAL) + if (amqp_conn == NULL) { - ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.", - vhost, user); - amqp_destroy_connection(conn); - close(sockfd); - return (1); - } - amqp_channel_open(conn, 1); - if (reply.reply_type != AMQP_RESPONSE_NORMAL) - { - ERROR ("amqp plugin: amqp_channel_open failed."); - amqp_connection_close(conn, AMQP_REPLY_SUCCESS); - amqp_destroy_connection(conn); - close(sockfd); - return (1); - } - error = 0; - memset(buffer, 0, sizeof(buffer)); - bfree = sizeof(buffer); - bfill = 0; - format_json_initialize(buffer, &bfill, &bfree); - /* TODO: Possibly add a config option "StoreRates" and pass the value along here. */ - format_json_value_list(buffer, &bfill, &bfree, ds, vl, - /* rates = */ 0); - format_json_finalize(buffer, &bfill, &bfree); + int sockfd; + + amqp_conn = amqp_new_connection (); + if (amqp_conn == NULL) + { + ERROR ("amqp plugin: amqp_new_connection failed."); + return (ENOMEM); + } + + sockfd = amqp_open_socket (host, port); + if (sockfd < 0) + { + char errbuf[1024]; + status = (-1) * sockfd; + ERROR ("amqp plugin: amqp_open_socket failed: %s", + sstrerror (status, errbuf, sizeof (errbuf))); + amqp_destroy_connection(amqp_conn); + amqp_conn = NULL; + return (status); + } + + amqp_set_sockfd (amqp_conn, sockfd); + + reply = amqp_login(amqp_conn, vhost, + /* channel max = */ 0, + /* frame max = */ 131072, + /* heartbeat = */ 0, + /* authentication: */ AMQP_SASL_METHOD_PLAIN, user, password); + if (reply.reply_type != AMQP_RESPONSE_NORMAL) + { + ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.", + vhost, user); + amqp_destroy_connection(amqp_conn); + close(sockfd); + amqp_conn = NULL; + return (1); + } + + amqp_channel_open (amqp_conn, /* channel = */ 1); + /* FIXME: Is checking "reply.reply_type" really correct here? How does + * it get set? --octo */ + if (reply.reply_type != AMQP_RESPONSE_NORMAL) + { + ERROR ("amqp plugin: amqp_channel_open failed."); + amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS); + amqp_destroy_connection(amqp_conn); + close(sockfd); + amqp_conn = NULL; + return (1); + } + + INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" " + "on %s:%i.", vhost, host, port); + } /* if (amqp_conn == NULL) */ + props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; props.content_type = amqp_cstring_bytes("application/json"); props.delivery_mode = 2; /* persistent delivery mode */ - error = amqp_basic_publish(conn, + + status = amqp_basic_publish(amqp_conn, /* channel = */ 1, amqp_cstring_bytes(exchange), amqp_cstring_bytes(routingkey), @@ -164,40 +179,82 @@ static int amqp_write(const data_set_t *ds, const value_list_t *vl, user_data_t /* immediate = */ 0, &props, amqp_cstring_bytes(buffer)); - if (error != 0) + if (status != 0) { + int sockfd; + ERROR ("amqp plugin: amqp_basic_publish failed with status %i.", - error); + status); + + sockfd = amqp_get_sockfd (amqp_conn); + amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS); + amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS); + amqp_destroy_connection (amqp_conn); + close(sockfd); + amqp_conn = NULL; } - reply = amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS); - if (reply.reply_type != AMQP_RESPONSE_NORMAL) - error = 1; - reply = amqp_connection_close(conn, AMQP_REPLY_SUCCESS); - if (reply.reply_type != AMQP_RESPONSE_NORMAL) - error = 1; - amqp_destroy_connection(conn); - if (close(sockfd) < 0) - error = 1; - return (error); -} + return (status); +} /* int amqp_write_locked */ + +static int amqp_write (const data_set_t *ds, const value_list_t *vl, + __attribute__((unused)) user_data_t *user_data) +{ + char buffer[4096]; + size_t bfree; + size_t bfill; + int status; + + if ((ds == NULL) || (vl == NULL)) + return (EINVAL); + + memset (buffer, 0, sizeof (buffer)); + bfree = sizeof (buffer); + bfill = 0; + + format_json_initialize(buffer, &bfill, &bfree); + /* TODO: Possibly add a config option "StoreRates" and pass the value along here. */ + format_json_value_list(buffer, &bfill, &bfree, ds, vl, /* rates = */ 0); + format_json_finalize(buffer, &bfill, &bfree); + + pthread_mutex_lock (&amqp_conn_lock); + status = amqp_write_locked (buffer); + pthread_mutex_unlock (&amqp_conn_lock); + + return (status); +} /* int amqp_write */ static int shutdown(void) { + pthread_mutex_lock (&amqp_conn_lock); + if (amqp_conn != NULL) + { + int sockfd; + + sockfd = amqp_get_sockfd (amqp_conn); + amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS); + amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS); + amqp_destroy_connection (amqp_conn); + close(sockfd); + amqp_conn = NULL; + } + pthread_mutex_unlock (&amqp_conn_lock); + sfree(host); sfree(vhost); sfree(user); sfree(password); sfree(exchange); sfree(routingkey); + return (0); } void module_register(void) { - plugin_register_config(PLUGIN_NAME, config, config_keys, config_keys_num); - plugin_register_write(PLUGIN_NAME, amqp_write, NULL); - plugin_register_shutdown(PLUGIN_NAME, shutdown); + plugin_register_config("amqp", config, config_keys, config_keys_num); + plugin_register_write("amqp", amqp_write, NULL); + plugin_register_shutdown("amqp", shutdown); } /* vim: set sw=4 sts=4 et : */ -- 2.30.2