Code

amqp plugin: Implement persistent connection handling.
authorFlorian Forster <octo@leeloo.lan.home.verplant.org>
Wed, 4 Aug 2010 17:31:44 +0000 (19:31 +0200)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Wed, 4 Aug 2010 17:31:44 +0000 (19:31 +0200)
A connection is now opened on the first value to be written and is kept
open for as long as possible. This is hopefully much more efficient than
opening a new connection for each value.

The value_list_t is now converted to JSON first, before locking the
global connection object, in order to hold the lock for the least
possible time.

src/amqp.c

index 489936484c0fddbce178f7a3c91d1ac2e1cd7e8d..ecbf338e2fb0f4e5fcfc75aa154eb7d4ede76cef 100644 (file)
 #include <stdlib.h>
 #include <unistd.h>
 #include <strings.h>
+#include <pthread.h>
 
-#include <collectd.h>
-#include <common.h>
-#include <plugin.h>
-#include <utils_format_json.h>
+#include "collectd.h"
+#include "common.h"
+#include "plugin.h"
+#include "utils_format_json.h"
 
 #include <amqp.h>
 #include <amqp_framing.h>
 
-#define PLUGIN_NAME "amqp"
-
 static int  port;
 static char *host       = NULL;
 static char *vhost      = NULL;
@@ -49,6 +48,9 @@ static char *password   = NULL;
 static char *exchange   = NULL;
 static char *routingkey = NULL;
 
+static amqp_connection_state_t amqp_conn = NULL;
+static pthread_mutex_t amqp_conn_lock = PTHREAD_MUTEX_INITIALIZER;
+
 static const char *config_keys[] =
 {
     "Host",
@@ -102,61 +104,74 @@ static int config(const char *key, const char *value)
     return (-1);
 }
 
-static int amqp_write(const data_set_t *ds, const value_list_t *vl, user_data_t *user_data)
+static int amqp_write_locked (const char *buffer)
 {
-    int error;
-    int sockfd;
-    size_t bfree;
-    size_t bfill;
-    char buffer[4096];
     amqp_rpc_reply_t reply;
-    amqp_connection_state_t conn;
     amqp_basic_properties_t props;
+    int status;
 
-    /* TODO: Don't create a new connection for each value that is to be dispatched. */
-    conn = amqp_new_connection();
-    if ((sockfd = amqp_open_socket(host, port)) < 0)
-    {
-        ERROR ("amqp plugin: amqp_open_socket failed.");
-        amqp_destroy_connection(conn);
-        return (1);
-    }
-    amqp_set_sockfd(conn, sockfd);
-    reply = amqp_login(conn, vhost,
-            /* channel max = */      0,
-            /* frame max = */   131072,
-            /* heartbeat = */        0,
-            /* authentication: */ AMQP_SASL_METHOD_PLAIN, user, password);
-    if (reply.reply_type != AMQP_RESPONSE_NORMAL)
+    if (amqp_conn == NULL)
     {
-        ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
-                vhost, user);
-        amqp_destroy_connection(conn);
-        close(sockfd);
-        return (1);
-    }
-    amqp_channel_open(conn, 1);
-    if (reply.reply_type != AMQP_RESPONSE_NORMAL)
-    {
-        ERROR ("amqp plugin: amqp_channel_open failed.");
-        amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
-        amqp_destroy_connection(conn);
-        close(sockfd);
-        return (1);
-    }
-    error = 0;
-    memset(buffer, 0, sizeof(buffer));
-    bfree = sizeof(buffer);
-    bfill = 0;
-    format_json_initialize(buffer, &bfill, &bfree);
-    /* TODO: Possibly add a config option "StoreRates" and pass the value along here. */
-    format_json_value_list(buffer, &bfill, &bfree, ds, vl,
-            /* rates = */ 0);
-    format_json_finalize(buffer, &bfill, &bfree);
+        int sockfd;
+
+        amqp_conn = amqp_new_connection ();
+        if (amqp_conn == NULL)
+        {
+            ERROR ("amqp plugin: amqp_new_connection failed.");
+            return (ENOMEM);
+        }
+
+        sockfd = amqp_open_socket (host, port);
+        if (sockfd < 0)
+        {
+            char errbuf[1024];
+            status = (-1) * sockfd;
+            ERROR ("amqp plugin: amqp_open_socket failed: %s",
+                    sstrerror (status, errbuf, sizeof (errbuf)));
+            amqp_destroy_connection(amqp_conn);
+            amqp_conn = NULL;
+            return (status);
+        }
+
+        amqp_set_sockfd (amqp_conn, sockfd);
+
+        reply = amqp_login(amqp_conn, vhost,
+                /* channel max = */      0,
+                /* frame max = */   131072,
+                /* heartbeat = */        0,
+                /* authentication: */ AMQP_SASL_METHOD_PLAIN, user, password);
+        if (reply.reply_type != AMQP_RESPONSE_NORMAL)
+        {
+            ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
+                    vhost, user);
+            amqp_destroy_connection(amqp_conn);
+            close(sockfd);
+            amqp_conn = NULL;
+            return (1);
+        }
+
+        amqp_channel_open (amqp_conn, /* channel = */ 1);
+        /* FIXME: Is checking "reply.reply_type" really correct here? How does
+         * it get set? --octo */
+        if (reply.reply_type != AMQP_RESPONSE_NORMAL)
+        {
+            ERROR ("amqp plugin: amqp_channel_open failed.");
+            amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
+            amqp_destroy_connection(amqp_conn);
+            close(sockfd);
+            amqp_conn = NULL;
+            return (1);
+        }
+
+        INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
+                "on %s:%i.", vhost, host, port);
+    } /* if (amqp_conn == NULL) */
+
     props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
     props.content_type = amqp_cstring_bytes("application/json");
     props.delivery_mode = 2; /* persistent delivery mode */
-    error = amqp_basic_publish(conn,
+
+    status = amqp_basic_publish(amqp_conn,
                 /* channel = */ 1,
                 amqp_cstring_bytes(exchange),
                 amqp_cstring_bytes(routingkey),
@@ -164,40 +179,82 @@ static int amqp_write(const data_set_t *ds, const value_list_t *vl, user_data_t
                 /* immediate = */ 0,
                 &props,
                 amqp_cstring_bytes(buffer));
-    if (error != 0)
+    if (status != 0)
     {
+        int sockfd;
+
         ERROR ("amqp plugin: amqp_basic_publish failed with status %i.",
-                error);
+                status);
+
+        sockfd = amqp_get_sockfd (amqp_conn);
+        amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS);
+        amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
+        amqp_destroy_connection (amqp_conn);
+        close(sockfd);
+        amqp_conn = NULL;
     }
 
-    reply = amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
-    if (reply.reply_type != AMQP_RESPONSE_NORMAL)
-        error = 1;
-    reply = amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
-    if (reply.reply_type != AMQP_RESPONSE_NORMAL)
-        error = 1;
-    amqp_destroy_connection(conn);
-    if (close(sockfd) < 0)
-        error = 1;
-    return (error);
-}
+    return (status);
+} /* int amqp_write_locked */
+
+static int amqp_write (const data_set_t *ds, const value_list_t *vl,
+        __attribute__((unused)) user_data_t *user_data)
+{
+    char buffer[4096];
+    size_t bfree;
+    size_t bfill;
+    int status;
+
+    if ((ds == NULL) || (vl == NULL))
+        return (EINVAL);
+
+    memset (buffer, 0, sizeof (buffer));
+    bfree = sizeof (buffer);
+    bfill = 0;
+
+    format_json_initialize(buffer, &bfill, &bfree);
+    /* TODO: Possibly add a config option "StoreRates" and pass the value along here. */
+    format_json_value_list(buffer, &bfill, &bfree, ds, vl, /* rates = */ 0);
+    format_json_finalize(buffer, &bfill, &bfree);
+
+    pthread_mutex_lock (&amqp_conn_lock);
+    status = amqp_write_locked (buffer);
+    pthread_mutex_unlock (&amqp_conn_lock);
+
+    return (status);
+} /* int amqp_write */
 
 static int shutdown(void)
 {
+    pthread_mutex_lock (&amqp_conn_lock);
+    if (amqp_conn != NULL)
+    {
+        int sockfd;
+
+        sockfd = amqp_get_sockfd (amqp_conn);
+        amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS);
+        amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
+        amqp_destroy_connection (amqp_conn);
+        close(sockfd);
+        amqp_conn = NULL;
+    }
+    pthread_mutex_unlock (&amqp_conn_lock);
+
     sfree(host);
     sfree(vhost);
     sfree(user);
     sfree(password);
     sfree(exchange);
     sfree(routingkey);
+
     return (0);
 }
 
 void module_register(void)
 {
-    plugin_register_config(PLUGIN_NAME, config, config_keys, config_keys_num);
-    plugin_register_write(PLUGIN_NAME, amqp_write, NULL);
-    plugin_register_shutdown(PLUGIN_NAME, shutdown);
+    plugin_register_config("amqp", config, config_keys, config_keys_num);
+    plugin_register_write("amqp", amqp_write, NULL);
+    plugin_register_shutdown("amqp", shutdown);
 }
 
 /* vim: set sw=4 sts=4 et : */