Code

amqp plugin: Put the connecting code into a separate function.
authorFlorian Forster <octo@leeloo.lan.home.verplant.org>
Wed, 4 Aug 2010 21:08:16 +0000 (23:08 +0200)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Wed, 4 Aug 2010 21:08:16 +0000 (23:08 +0200)
src/amqp.c

index a432425330ffb74cae25d5ed59b61716a02cf881..488fbac7f9aafee4b803672059a24ce10d146053 100644 (file)
@@ -119,69 +119,79 @@ static int config(const char *key, const char *value)
     return (-1);
 }
 
-static int amqp_write_locked (const char *buffer)
+static int amqp_connect (void)
 {
     amqp_rpc_reply_t reply;
-    amqp_basic_properties_t props;
+    int sockfd;
     int status;
 
+    if (amqp_conn != NULL)
+        return (0);
+
+    amqp_conn = amqp_new_connection ();
     if (amqp_conn == NULL)
     {
-        int sockfd;
+        ERROR ("amqp plugin: amqp_new_connection failed.");
+        return (ENOMEM);
+    }
 
-        amqp_conn = amqp_new_connection ();
-        if (amqp_conn == NULL)
-        {
-            ERROR ("amqp plugin: amqp_new_connection failed.");
-            return (ENOMEM);
-        }
+    sockfd = amqp_open_socket (host, port);
+    if (sockfd < 0)
+    {
+        char errbuf[1024];
+        status = (-1) * sockfd;
+        ERROR ("amqp plugin: amqp_open_socket failed: %s",
+                sstrerror (status, errbuf, sizeof (errbuf)));
+        amqp_destroy_connection(amqp_conn);
+        amqp_conn = NULL;
+        return (status);
+    }
 
-        sockfd = amqp_open_socket (host, port);
-        if (sockfd < 0)
-        {
-            char errbuf[1024];
-            status = (-1) * sockfd;
-            ERROR ("amqp plugin: amqp_open_socket failed: %s",
-                    sstrerror (status, errbuf, sizeof (errbuf)));
-            amqp_destroy_connection(amqp_conn);
-            amqp_conn = NULL;
-            return (status);
-        }
+    amqp_set_sockfd (amqp_conn, sockfd);
 
-        amqp_set_sockfd (amqp_conn, sockfd);
+    reply = amqp_login(amqp_conn, vhost,
+            /* channel max = */      0,
+            /* frame max = */   131072,
+            /* heartbeat = */        0,
+            /* authentication: */ AMQP_SASL_METHOD_PLAIN, user, password);
+    if (reply.reply_type != AMQP_RESPONSE_NORMAL)
+    {
+        ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
+                vhost, user);
+        amqp_destroy_connection(amqp_conn);
+        close(sockfd);
+        amqp_conn = NULL;
+        return (1);
+    }
 
-        reply = amqp_login(amqp_conn, vhost,
-                /* channel max = */      0,
-                /* frame max = */   131072,
-                /* heartbeat = */        0,
-                /* authentication: */ AMQP_SASL_METHOD_PLAIN, user, password);
-        if (reply.reply_type != AMQP_RESPONSE_NORMAL)
-        {
-            ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
-                    vhost, user);
-            amqp_destroy_connection(amqp_conn);
-            close(sockfd);
-            amqp_conn = NULL;
-            return (1);
-        }
+    amqp_channel_open (amqp_conn, /* channel = */ 1);
+    /* FIXME: Is checking "reply.reply_type" really correct here? How does
+     * it get set? --octo */
+    if (reply.reply_type != AMQP_RESPONSE_NORMAL)
+    {
+        ERROR ("amqp plugin: amqp_channel_open failed.");
+        amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
+        amqp_destroy_connection(amqp_conn);
+        close(sockfd);
+        amqp_conn = NULL;
+        return (1);
+    }
 
-        amqp_channel_open (amqp_conn, /* channel = */ 1);
-        /* FIXME: Is checking "reply.reply_type" really correct here? How does
-         * it get set? --octo */
-        if (reply.reply_type != AMQP_RESPONSE_NORMAL)
-        {
-            ERROR ("amqp plugin: amqp_channel_open failed.");
-            amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
-            amqp_destroy_connection(amqp_conn);
-            close(sockfd);
-            amqp_conn = NULL;
-            return (1);
-        }
+    INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
+            "on %s:%i.", vhost, host, port);
+    return (0);
+} /* int amqp_connect */
 
-        INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
-                "on %s:%i.", vhost, host, port);
-    } /* if (amqp_conn == NULL) */
+static int amqp_write_locked (const char *buffer)
+{
+    amqp_basic_properties_t props;
+    int status;
+
+    status = amqp_connect ();
+    if (status != 0)
+        return (status);
 
+    memset (&props, 0, sizeof (props));
     props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
     props.content_type = amqp_cstring_bytes("application/json");
     props.delivery_mode = delivery_mode;
@@ -205,7 +215,7 @@ static int amqp_write_locked (const char *buffer)
         amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS);
         amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
         amqp_destroy_connection (amqp_conn);
-        close(sockfd);
+        close (sockfd);
         amqp_conn = NULL;
     }