From: Sebastien Pahl Date: Tue, 9 Feb 2010 16:15:55 +0000 (+0100) Subject: First implentation of an amqp output plugin X-Git-Tag: collectd-5.0.0-beta0~28^2~38 X-Git-Url: https://git.tokkee.org/?a=commitdiff_plain;h=cd423f6a7706e7445ca56d7221e8cc84f6d466cc;p=collectd.git First implentation of an amqp output plugin It's ugly for now but it works. --- diff --git a/configure.in b/configure.in index fc12c088..28e71b6a 100644 --- a/configure.in +++ b/configure.in @@ -1279,6 +1279,57 @@ then fi AM_CONDITIONAL(BUILD_WITH_LIBKVM_OPENFILES, test "x$with_kvm_openfiles" = "xyes") +# --with-librabbitmq {{{ +with_librabbitmq_cppflags="" +with_librabbitmq_ldflags="" +AC_ARG_WITH(librabbitmq, [AS_HELP_STRING([--with-librabbitmq@<:@=PREFIX@:>@], [Path to librabbitmq.])], +[ + if test "x$withval" != "xno" && test "x$withval" != "xyes" + then + with_librabbitmq_cppflags="-I$withval/include" + with_librabbitmq_ldflags="-L$withval/lib" + with_librabbitmq="yes" + else + with_librabbitmq="$withval" + fi +], +[ + with_librabbitmq="yes" +]) +if test "x$with_librabbitmq" = "xyes" +then + SAVE_CPPFLAGS="$CPPFLAGS" + CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags" + + AC_CHECK_HEADERS(amqp.h, [with_librabbitmq="yes"], [with_librabbitmq="no (amqp.h not found)"]) + + CPPFLAGS="$SAVE_CPPFLAGS" +fi +if test "x$with_librabbitmq" = "xyes" +then + SAVE_CPPFLAGS="$CPPFLAGS" + SAVE_LDFLAGS="$LDFLAGS" + CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags" + LDFLAGS="$LDFLAGS $with_librabbitmq_ldflags" + + AC_CHECK_LIB(rabbitmq, amqp_basic_publish, [with_librabbitmq="yes"], [with_librabbitmq="no (Symbol 'amqp_basic_publish' not found)"]) + + CPPFLAGS="$SAVE_CPPFLAGS" + LDFLAGS="$SAVE_LDFLAGS" +fi +if test "x$with_librabbitmq" = "xyes" +then + BUILD_WITH_LIBRABBITMQ_CPPFLAGS="$with_librabbitmq_cppflags" + BUILD_WITH_LIBRABBITMQ_LDFLAGS="$with_librabbitmq_ldflags" + BUILD_WITH_LIBRABBITMQ_LIBS="-lrabbitmq" + AC_SUBST(BUILD_WITH_LIBRABBITMQ_CPPFLAGS) + AC_SUBST(BUILD_WITH_LIBRABBITMQ_LDFLAGS) + AC_SUBST(BUILD_WITH_LIBRABBITMQ_LIBS) + AC_DEFINE(HAVE_LIBRABBITMQ, 1, [Define if librabbitmq is present and usable.]) +fi +AM_CONDITIONAL(BUILD_WITH_LIBRABBITMQ, test "x$with_librabbitmq" = "xyes") +# }}} + # --with-libcurl {{{ with_curl_config="curl-config" with_curl_cflags="" @@ -4403,6 +4454,7 @@ AC_ARG_ENABLE([all-plugins], m4_divert_once([HELP_ENABLE], []) +AC_PLUGIN([amqp], [$with_librabbitmq], [AMQP output plugin]) AC_PLUGIN([apache], [$with_libcurl], [Apache httpd statistics]) AC_PLUGIN([apcups], [yes], [Statistics of UPSes by APC]) AC_PLUGIN([apple_sensors], [$with_libiokit], [Apple's hardware sensors]) @@ -4699,6 +4751,7 @@ Configuration: libperl . . . . . . . $with_libperl libpq . . . . . . . . $with_libpq libpthread . . . . . $with_libpthread + librabbitmq . . . . . $with_librabbitmq librouteros . . . . . $with_librouteros librrd . . . . . . . $with_librrd libsensors . . . . . $with_libsensors @@ -4723,6 +4776,7 @@ Configuration: perl . . . . . . . . $with_perl_bindings Modules: + amqp . . . . . . . $enable_amqp apache . . . . . . . $enable_apache apcups . . . . . . . $enable_apcups apple_sensors . . . . $enable_apple_sensors diff --git a/src/Makefile.am b/src/Makefile.am index 00d0e20e..cabdc04d 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -110,6 +110,16 @@ pkglib_LTLIBRARIES = BUILT_SOURCES = CLEANFILES = +if BUILD_PLUGIN_AMQP +pkglib_LTLIBRARIES += amqp.la +amqp_la_SOURCES = amqp.c utils_format_json.c utils_format_json.h +amqp_la_LDFLAGS = -module -avoid-version +amqp_la_CFLAGS = $(BUILD_WITH_LIBRABBITMQ_CFLAGS) +amqp_la_LIBADD = $(BUILD_WITH_LIBRABBITMQ_LIBS) +collectd_LDADD += "-dlopen" amqp.la +collectd_DEPENDENCIES += amqp.la +endif + if BUILD_PLUGIN_APACHE pkglib_LTLIBRARIES += apache.la apache_la_SOURCES = apache.c diff --git a/src/amqp.c b/src/amqp.c new file mode 100644 index 00000000..0b470e10 --- /dev/null +++ b/src/amqp.c @@ -0,0 +1,181 @@ +/* +** +** collectd-amqp +** Copyright (c) <2009> +** +** Permission is hereby granted, free of charge, to any person +** obtaining a copy of this software and associated documentation +** files (the "Software"), to deal in the Software without +** restriction, including without limitation the rights to use, +** copy, modify, merge, publish, distribute, sublicense, and/or sell +** copies of the Software, and to permit persons to whom the +** Software is furnished to do so, subject to the following +** conditions: +** +** The above copyright notice and this permission notice shall be +** included in all copies or substantial portions of the Software. +** +** THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +** EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +** OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +** NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +** HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +** WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +** FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +** OTHER DEALINGS IN THE SOFTWARE. +** +*/ + +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include + +#define PLUGIN_NAME "amqp" + +static int port; +static char *host = NULL; +static char *vhost = NULL; +static char *user = NULL; +static char *password = NULL; +static char *exchange = NULL; +static char *routingkey = NULL; + +static const char *config_keys[] = +{ + "Host", + "Port", + "VHost", + "User", + "Password", + "Exchange", + "RoutingKey" +}; + +static int config_keys_num = STATIC_ARRAY_SIZE(config_keys); + +static void config_free(char *var) +{ + if (var != NULL) + free(var); +} + +static int config_set(char **var, const char *value) +{ + config_free(*var); + if ((*var = strdup(value)) == NULL) + return (1); + return (0); +} + +static int config(const char *key, const char *value) +{ + if (strcasecmp(key, "host") == 0) + return (config_set(&host, value)); + else if(strcasecmp(key, "port") == 0) + { + port = atoi(value); + return (0); + } + else if (strcasecmp(key, "vhost") == 0) + return (config_set(&vhost, value)); + else if (strcasecmp(key, "user") == 0) + return (config_set(&user, value)); + else if (strcasecmp(key, "password") == 0) + return (config_set(&password, value)); + else if (strcasecmp(key, "exchange") == 0) + return (config_set(&exchange, value)); + else if (strcasecmp(key, "routingkey") == 0) + return (config_set(&routingkey, value)); + return (-1); +} + +static int amqp_write(const data_set_t *ds, const value_list_t *vl, user_data_t *user_data) +{ + int error; + int sockfd; + size_t bfree; + size_t bfill; + char buffer[4096]; + amqp_rpc_reply_t reply; + amqp_connection_state_t conn; + amqp_basic_properties_t props; + + conn = amqp_new_connection(); + if ((sockfd = amqp_open_socket(host, port)) < 0) + { + amqp_destroy_connection(conn); + return (1); + } + amqp_set_sockfd(conn, sockfd); + reply = amqp_login(conn, vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, user, password); + if (reply.reply_type != AMQP_RESPONSE_NORMAL) + { + amqp_destroy_connection(conn); + close(sockfd); + return (1); + } + amqp_channel_open(conn, 1); + if (amqp_rpc_reply.reply_type != AMQP_RESPONSE_NORMAL) + { + amqp_connection_close(conn, AMQP_REPLY_SUCCESS); + amqp_destroy_connection(conn); + close(sockfd); + return (1); + } + error = 0; + memset(buffer, 0, sizeof(buffer)); + bfree = sizeof(buffer); + bfill = 0; + format_json_initialize(buffer, &bfill, &bfree); + format_json_value_list(buffer, &bfill, &bfree, ds, vl); + format_json_finalize(buffer, &bfill, &bfree); + props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; + props.content_type = amqp_cstring_bytes("application/json"); + props.delivery_mode = 2; // persistent delivery mode + error = amqp_basic_publish(conn, + 1, + amqp_cstring_bytes(exchange), + amqp_cstring_bytes(routingkey), + 0, + 0, + &props, + amqp_cstring_bytes(buffer)); + reply = amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS); + if (reply.reply_type != AMQP_RESPONSE_NORMAL) + error = 1; + reply = amqp_connection_close(conn, AMQP_REPLY_SUCCESS); + if (reply.reply_type != AMQP_RESPONSE_NORMAL) + error = 1; + amqp_destroy_connection(conn); + if (close(sockfd) < 0) + error = 1; + return (error); +} + +static int shutdown(void) +{ + config_free(host); + config_free(vhost); + config_free(user); + config_free(password); + config_free(exchange); + config_free(routingkey); + return (0); +} + +void module_register(void) +{ + plugin_register_config(PLUGIN_NAME, config, config_keys, config_keys_num); + plugin_register_write(PLUGIN_NAME, amqp_write, NULL); + plugin_register_shutdown(PLUGIN_NAME, shutdown); +} +