Code

amqp plugin: Fix compabitility with current librabbitmq.
authorFlorian Forster <octo@collectd.org>
Sun, 11 Mar 2012 13:47:24 +0000 (14:47 +0100)
committerFlorian Forster <octo@collectd.org>
Sun, 11 Mar 2012 13:47:24 +0000 (14:47 +0100)
In particular, add compatibility to the 0.9.1 and current development version.
Unfortunately, no version macro exists, so we need to do some autoconf trickery
:(

Fixes GitHub issue #6.

configure.in
src/amqp.c

index 265d503d5128b6cbe66f1682f2df994341149077..e30d0f8855679262b78937c88da01a23f2260238 100644 (file)
@@ -3244,26 +3244,39 @@ AC_ARG_WITH(librabbitmq, [AS_HELP_STRING([--with-librabbitmq@<:@=PREFIX@:>@], [P
 [
        with_librabbitmq="yes"
 ])
+SAVE_CPPFLAGS="$CPPFLAGS"
+SAVE_LDFLAGS="$LDFLAGS"
+CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags"
+LDFLAGS="$LDFLAGS $with_librabbitmq_ldflags"
 if test "x$with_librabbitmq" = "xyes"
 then
-       SAVE_CPPFLAGS="$CPPFLAGS"
-       CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags"
-
        AC_CHECK_HEADERS(amqp.h, [with_librabbitmq="yes"], [with_librabbitmq="no (amqp.h not found)"])
-
-       CPPFLAGS="$SAVE_CPPFLAGS"
 fi
 if test "x$with_librabbitmq" = "xyes"
 then
-       SAVE_CPPFLAGS="$CPPFLAGS"
-       SAVE_LDFLAGS="$LDFLAGS"
-       CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags"
-       LDFLAGS="$LDFLAGS $with_librabbitmq_ldflags"
-
+       # librabbitmq up to version 0.9.1 provides "library_errno", later
+       # versions use "library_error". The library does not provide a version
+       # macro :( Use "AC_CHECK_MEMBERS" (plural) for automatic defines.
+       AC_CHECK_MEMBERS([amqp_rpc_reply_t.library_errno],,,
+                        [
+#if HAVE_STDLIB_H
+# include <stdlib.h>
+#endif
+#if HAVE_STDIO_H
+# include <stdio.h>
+#endif
+#if HAVE_STDINT_H
+# include <stdint.h>
+#endif
+#if HAVE_INTTYPES_H
+# include <inttypes.h>
+#endif
+#include <amqp.h>
+                         ])
+fi
+if test "x$with_librabbitmq" = "xyes"
+then
        AC_CHECK_LIB(rabbitmq, amqp_basic_publish, [with_librabbitmq="yes"], [with_librabbitmq="no (Symbol 'amqp_basic_publish' not found)"])
-
-       CPPFLAGS="$SAVE_CPPFLAGS"
-       LDFLAGS="$SAVE_LDFLAGS"
 fi
 if test "x$with_librabbitmq" = "xyes"
 then
@@ -3275,6 +3288,8 @@ then
        AC_SUBST(BUILD_WITH_LIBRABBITMQ_LIBS)
        AC_DEFINE(HAVE_LIBRABBITMQ, 1, [Define if librabbitmq is present and usable.])
 fi
+CPPFLAGS="$SAVE_CPPFLAGS"
+LDFLAGS="$SAVE_LDFLAGS"
 AM_CONDITIONAL(BUILD_WITH_LIBRABBITMQ, test "x$with_librabbitmq" = "xyes")
 # }}}
 
index be1f709e120181efdf7ce8181c4f9032f02047ed..55d2a2ceff04f3772e43c21c4288ec6d97c1ddb2 100644 (file)
@@ -1,7 +1,7 @@
 /**
  * collectd - src/amqp.c
- * Copyright (C) 2009  Sebastien Pahl
- * Copyright (C) 2010  Florian Forster
+ * Copyright (C) 2009       Sebastien Pahl
+ * Copyright (C) 2010-2012  Florian Forster
  *
  * Permission is hereby granted, free of charge, to any person obtaining a
  * copy of this software and associated documentation files (the "Software"),
@@ -178,8 +178,13 @@ static char *camqp_strerror (camqp_config_t *conf, /* {{{ */
             break;
 
         case AMQP_RESPONSE_LIBRARY_EXCEPTION:
+#if HAVE_AMQP_RPC_REPLY_T_LIBRARY_ERRNO
             if (r.library_errno)
                 return (sstrerror (r.library_errno, buffer, buffer_size));
+#else
+            if (r.library_error)
+                return (sstrerror (r.library_error, buffer, buffer_size));
+#endif
             else
                 sstrncpy (buffer, "End of stream", sizeof (buffer));
             break;
@@ -216,6 +221,7 @@ static char *camqp_strerror (camqp_config_t *conf, /* {{{ */
     return (buffer);
 } /* }}} char *camqp_strerror */
 
+#if HAVE_AMQP_RPC_REPLY_T_LIBRARY_ERRNO
 static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */
 {
     amqp_exchange_declare_ok_t *ed_ret;
@@ -246,6 +252,46 @@ static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */
 
     return (0);
 } /* }}} int camqp_create_exchange */
+#else
+static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */
+{
+    amqp_exchange_declare_ok_t *ed_ret;
+    amqp_table_t argument_table;
+    struct amqp_table_entry_t_ argument_table_entries[1];
+
+    if (conf->exchange_type == NULL)
+        return (0);
+
+    /* Valid arguments: "auto_delete", "internal" */
+    argument_table.num_entries = STATIC_ARRAY_SIZE (argument_table_entries);
+    argument_table.entries = argument_table_entries;
+    argument_table_entries[0].key = amqp_cstring_bytes ("auto_delete");
+    argument_table_entries[0].value.kind = AMQP_FIELD_KIND_BOOLEAN;
+    argument_table_entries[0].value.value.boolean = 1;
+
+    ed_ret = amqp_exchange_declare (conf->connection,
+            /* channel     = */ CAMQP_CHANNEL,
+            /* exchange    = */ amqp_cstring_bytes (conf->exchange),
+            /* type        = */ amqp_cstring_bytes (conf->exchange_type),
+            /* passive     = */ 0,
+            /* durable     = */ 0,
+            /* arguments   = */ argument_table);
+    if ((ed_ret == NULL) && camqp_is_error (conf))
+    {
+        char errbuf[1024];
+        ERROR ("amqp plugin: amqp_exchange_declare failed: %s",
+                camqp_strerror (conf, errbuf, sizeof (errbuf)));
+        camqp_close_connection (conf);
+        return (-1);
+    }
+
+    INFO ("amqp plugin: Successfully created exchange \"%s\" "
+            "with type \"%s\".",
+            conf->exchange, conf->exchange_type);
+
+    return (0);
+} /* }}} int camqp_create_exchange */
+#endif
 
 static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
 {
@@ -316,7 +362,9 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
             /* consumer_tag = */ AMQP_EMPTY_BYTES,
             /* no_local     = */ 0,
             /* no_ack       = */ 1,
-            /* exclusive    = */ 0);
+            /* exclusive    = */ 0,
+            /* arguments    = */ AMQP_EMPTY_TABLE
+        );
     if ((cm_ret == NULL) && camqp_is_error (conf))
     {
         char errbuf[1024];