From 8991abf98de418b8464aa0f3251b024c9292da96 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Sun, 11 Mar 2012 14:47:24 +0100 Subject: [PATCH] amqp plugin: Fix compabitility with current librabbitmq. In particular, add compatibility to the 0.9.1 and current development version. Unfortunately, no version macro exists, so we need to do some autoconf trickery :( Fixes GitHub issue #6. --- configure.in | 41 ++++++++++++++++++++++++++------------- src/amqp.c | 54 +++++++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 79 insertions(+), 16 deletions(-) diff --git a/configure.in b/configure.in index 265d503d..e30d0f88 100644 --- a/configure.in +++ b/configure.in @@ -3244,26 +3244,39 @@ AC_ARG_WITH(librabbitmq, [AS_HELP_STRING([--with-librabbitmq@<:@=PREFIX@:>@], [P [ with_librabbitmq="yes" ]) +SAVE_CPPFLAGS="$CPPFLAGS" +SAVE_LDFLAGS="$LDFLAGS" +CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags" +LDFLAGS="$LDFLAGS $with_librabbitmq_ldflags" if test "x$with_librabbitmq" = "xyes" then - SAVE_CPPFLAGS="$CPPFLAGS" - CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags" - AC_CHECK_HEADERS(amqp.h, [with_librabbitmq="yes"], [with_librabbitmq="no (amqp.h not found)"]) - - CPPFLAGS="$SAVE_CPPFLAGS" fi if test "x$with_librabbitmq" = "xyes" then - SAVE_CPPFLAGS="$CPPFLAGS" - SAVE_LDFLAGS="$LDFLAGS" - CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags" - LDFLAGS="$LDFLAGS $with_librabbitmq_ldflags" - + # librabbitmq up to version 0.9.1 provides "library_errno", later + # versions use "library_error". The library does not provide a version + # macro :( Use "AC_CHECK_MEMBERS" (plural) for automatic defines. + AC_CHECK_MEMBERS([amqp_rpc_reply_t.library_errno],,, + [ +#if HAVE_STDLIB_H +# include +#endif +#if HAVE_STDIO_H +# include +#endif +#if HAVE_STDINT_H +# include +#endif +#if HAVE_INTTYPES_H +# include +#endif +#include + ]) +fi +if test "x$with_librabbitmq" = "xyes" +then AC_CHECK_LIB(rabbitmq, amqp_basic_publish, [with_librabbitmq="yes"], [with_librabbitmq="no (Symbol 'amqp_basic_publish' not found)"]) - - CPPFLAGS="$SAVE_CPPFLAGS" - LDFLAGS="$SAVE_LDFLAGS" fi if test "x$with_librabbitmq" = "xyes" then @@ -3275,6 +3288,8 @@ then AC_SUBST(BUILD_WITH_LIBRABBITMQ_LIBS) AC_DEFINE(HAVE_LIBRABBITMQ, 1, [Define if librabbitmq is present and usable.]) fi +CPPFLAGS="$SAVE_CPPFLAGS" +LDFLAGS="$SAVE_LDFLAGS" AM_CONDITIONAL(BUILD_WITH_LIBRABBITMQ, test "x$with_librabbitmq" = "xyes") # }}} diff --git a/src/amqp.c b/src/amqp.c index be1f709e..55d2a2ce 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -1,7 +1,7 @@ /** * collectd - src/amqp.c - * Copyright (C) 2009 Sebastien Pahl - * Copyright (C) 2010 Florian Forster + * Copyright (C) 2009 Sebastien Pahl + * Copyright (C) 2010-2012 Florian Forster * * Permission is hereby granted, free of charge, to any person obtaining a * copy of this software and associated documentation files (the "Software"), @@ -178,8 +178,13 @@ static char *camqp_strerror (camqp_config_t *conf, /* {{{ */ break; case AMQP_RESPONSE_LIBRARY_EXCEPTION: +#if HAVE_AMQP_RPC_REPLY_T_LIBRARY_ERRNO if (r.library_errno) return (sstrerror (r.library_errno, buffer, buffer_size)); +#else + if (r.library_error) + return (sstrerror (r.library_error, buffer, buffer_size)); +#endif else sstrncpy (buffer, "End of stream", sizeof (buffer)); break; @@ -216,6 +221,7 @@ static char *camqp_strerror (camqp_config_t *conf, /* {{{ */ return (buffer); } /* }}} char *camqp_strerror */ +#if HAVE_AMQP_RPC_REPLY_T_LIBRARY_ERRNO static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */ { amqp_exchange_declare_ok_t *ed_ret; @@ -246,6 +252,46 @@ static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */ return (0); } /* }}} int camqp_create_exchange */ +#else +static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */ +{ + amqp_exchange_declare_ok_t *ed_ret; + amqp_table_t argument_table; + struct amqp_table_entry_t_ argument_table_entries[1]; + + if (conf->exchange_type == NULL) + return (0); + + /* Valid arguments: "auto_delete", "internal" */ + argument_table.num_entries = STATIC_ARRAY_SIZE (argument_table_entries); + argument_table.entries = argument_table_entries; + argument_table_entries[0].key = amqp_cstring_bytes ("auto_delete"); + argument_table_entries[0].value.kind = AMQP_FIELD_KIND_BOOLEAN; + argument_table_entries[0].value.value.boolean = 1; + + ed_ret = amqp_exchange_declare (conf->connection, + /* channel = */ CAMQP_CHANNEL, + /* exchange = */ amqp_cstring_bytes (conf->exchange), + /* type = */ amqp_cstring_bytes (conf->exchange_type), + /* passive = */ 0, + /* durable = */ 0, + /* arguments = */ argument_table); + if ((ed_ret == NULL) && camqp_is_error (conf)) + { + char errbuf[1024]; + ERROR ("amqp plugin: amqp_exchange_declare failed: %s", + camqp_strerror (conf, errbuf, sizeof (errbuf))); + camqp_close_connection (conf); + return (-1); + } + + INFO ("amqp plugin: Successfully created exchange \"%s\" " + "with type \"%s\".", + conf->exchange, conf->exchange_type); + + return (0); +} /* }}} int camqp_create_exchange */ +#endif static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */ { @@ -316,7 +362,9 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */ /* consumer_tag = */ AMQP_EMPTY_BYTES, /* no_local = */ 0, /* no_ack = */ 1, - /* exclusive = */ 0); + /* exclusive = */ 0, + /* arguments = */ AMQP_EMPTY_TABLE + ); if ((cm_ret == NULL) && camqp_is_error (conf)) { char errbuf[1024]; -- 2.30.2