From 1811b98c6d2f6356612fbc5539b8190111a0af73 Mon Sep 17 00:00:00 2001 From: Saikrishna Arcot Date: Sun, 9 Apr 2017 08:40:15 -0700 Subject: [PATCH] Update mongodb write code to use latest API (Fixes: #492) (#2236) Update mongodb write code to use latest API (Fixes: #492) --- Makefile.am | 3 +- build.sh | 7 +- configure.ac | 50 +++++--- src/write_mongodb.c | 294 +++++++++++++++++++++++++++----------------- 4 files changed, 212 insertions(+), 142 deletions(-) diff --git a/Makefile.am b/Makefile.am index 23e6a24f..1f18d231 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1764,9 +1764,8 @@ endif if BUILD_PLUGIN_WRITE_MONGODB pkglib_LTLIBRARIES += write_mongodb.la write_mongodb_la_SOURCES = src/write_mongodb.c -write_mongodb_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBMONGOC_CPPFLAGS) +write_mongodb_la_CFLAGS = $(AM_CFLAGS) $(BUILD_WITH_LIBMONGOC_CFLAGS) write_mongodb_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBMONGOC_LDFLAGS) -write_mongodb_la_LIBADD = -lmongoc endif if BUILD_PLUGIN_WRITE_PROMETHEUS diff --git a/build.sh b/build.sh index 465eff9d..40f5361d 100755 --- a/build.sh +++ b/build.sh @@ -18,12 +18,7 @@ EOF done } -check_for_application lex bison autoheader aclocal automake autoconf - -# Actually we don't need the pkg-config executable, but we need the M4 macros. -# We check for `pkg-config' here and hope that M4 macros will then be -# available, too. -check_for_application pkg-config +check_for_application lex bison autoheader aclocal automake autoconf pkg-config libtoolize="" libtoolize --version >/dev/null 2>/dev/null diff --git a/configure.ac b/configure.ac index b6a47dcf..320572d0 100644 --- a/configure.ac +++ b/configure.ac @@ -3310,52 +3310,62 @@ AC_ARG_WITH([libmongoc], else if test "x$withval" = "xno"; then with_libmongoc="no" else - with_libmongoc="yes" - LIBMONGOC_CPPFLAGS="$LIBMONGOC_CPPFLAGS -I$withval/include" - LIBMONGOC_LDFLAGS="$LIBMONGOC_LDFLAGS -L$withval/lib" + with_libmongoc="no" fi; fi ], [with_libmongoc="yes"] ) -SAVE_CPPFLAGS="$CPPFLAGS" -SAVE_LDFLAGS="$LDFLAGS" - -CPPFLAGS="$CPPFLAGS $LIBMONGOC_CPPFLAGS" -LDFLAGS="$LDFLAGS $LIBMONGOC_LDFLAGS" +if test "x$with_libmongoc" = "xyes"; then + PKG_CHECK_MODULES([LIBMONGOC], [libmongoc-1.0], + [with_libmongoc="yes"], + [with_libmongoc="no (pkg-config could not find libmongoc)"] + ) +fi if test "x$with_libmongoc" = "xyes"; then - if test "x$LIBMONGOC_CPPFLAGS" != "x"; then - AC_MSG_NOTICE([libmongoc CPPFLAGS: $LIBMONGOC_CPPFLAGS]) + SAVE_CPPFLAGS="$CPPFLAGS" + + CPPFLAGS="$CPPFLAGS $LIBMONGOC_CFLAGS" + + if test "x$CPPFLAGS" != "x"; then + AC_MSG_NOTICE([libmongoc CPPFLAGS: $LIBMONGOC_CFLAGS]) fi - AC_CHECK_HEADERS([mongo.h], + AC_CHECK_HEADERS([mongoc.h], [with_libmongoc="yes"], - [with_libmongoc="no ('mongo.h' not found)"], - [[#define MONGO_HAVE_STDINT 1]] + [with_libmongoc="no ('mongoc.h' not found)"] ) + + CPPFLAGS="$SAVE_CPPFLAGS" fi if test "x$with_libmongoc" = "xyes"; then + SAVE_CPPFLAGS="$CPPFLAGS" + SAVE_LDFLAGS="$LDFLAGS" + + CPPFLAGS="$CPPFLAGS $LIBMONGOC_CFLAGS" + LDFLAGS="$LDFLAGS $LIBMONGOC_LDFLAGS" + if test "x$LIBMONGOC_LDFLAGS" != "x"; then AC_MSG_NOTICE([libmongoc LDFLAGS: $LIBMONGOC_LDFLAGS]) fi - AC_CHECK_LIB([mongoc], [mongo_run_command], + AC_CHECK_LIB([mongoc-1.0], [mongoc_init], [with_libmongoc="yes"], - [with_libmongoc="no (symbol 'mongo_run_command' not found)"] + [with_libmongoc="no (symbol 'mongoc_init' not found)"] ) -fi -CPPFLAGS="$SAVE_CPPFLAGS" -LDFLAGS="$SAVE_LDFLAGS" + CPPFLAGS="$SAVE_CPPFLAGS" + LDFLAGS="$SAVE_LDFLAGS" +fi if test "x$with_libmongoc" = "xyes"; then - BUILD_WITH_LIBMONGOC_CPPFLAGS="$LIBMONGOC_CPPFLAGS" + BUILD_WITH_LIBMONGOC_CFLAGS="$LIBMONGOC_CFLAGS" BUILD_WITH_LIBMONGOC_LDFLAGS="$LIBMONGOC_LDFLAGS" fi -AC_SUBST([BUILD_WITH_LIBMONGOC_CPPFLAGS]) +AC_SUBST([BUILD_WITH_LIBMONGOC_CFLAGS]) AC_SUBST([BUILD_WITH_LIBMONGOC_LDFLAGS]) # }}} diff --git a/src/write_mongodb.c b/src/write_mongodb.c index 10f78322..66dc8e07 100644 --- a/src/write_mongodb.c +++ b/src/write_mongodb.c @@ -3,6 +3,7 @@ * Copyright (C) 2010-2013 Florian Forster * Copyright (C) 2010 Akkarit Sangpetch * Copyright (C) 2012 Chris Lundquist + * Copyright (C) 2017 Saikrishna Arcot * * Permission is hereby granted, free of charge, to any person obtaining a * copy of this software and associated documentation files (the "Software"), @@ -26,6 +27,7 @@ * Florian Forster * Akkarit Sangpetch * Chris Lundquist + * Saikrishna Arcot **/ #include "collectd.h" @@ -34,13 +36,7 @@ #include "plugin.h" #include "utils_cache.h" -#define MONGO_HAVE_STDINT 1 -#include - -#if (MONGO_MAJOR == 0) && (MONGO_MINOR < 8) -#define bson_alloc() bson_create() -#define bson_dealloc(b) bson_dispose(b) -#endif +#include struct wm_node_s { char name[DATA_MAX_NAME_LEN]; @@ -55,8 +51,10 @@ struct wm_node_s { char *passwd; _Bool store_rates; + _Bool connected; - mongo conn[1]; + mongoc_client_t *client; + mongoc_database_t *database; pthread_mutex_t lock; }; typedef struct wm_node_s wm_node_t; @@ -64,170 +62,235 @@ typedef struct wm_node_s wm_node_t; /* * Functions */ -static bson *wm_create_bson(const data_set_t *ds, /* {{{ */ - const value_list_t *vl, _Bool store_rates) { - bson *ret; +static bson_t *wm_create_bson(const data_set_t *ds, /* {{{ */ + const value_list_t *vl, _Bool store_rates) { + bson_t *ret; + bson_t subarray; gauge_t *rates; - ret = bson_alloc(); /* matched by bson_dealloc() */ - if (ret == NULL) { - ERROR("write_mongodb plugin: bson_create failed."); - return (NULL); + ret = bson_new(); + if (!ret) { + ERROR("write_mongodb plugin: bson_new failed."); + return NULL; } if (store_rates) { rates = uc_get_rate(ds, vl); if (rates == NULL) { ERROR("write_mongodb plugin: uc_get_rate() failed."); - return (NULL); + bson_free(ret); + return NULL; } } else { rates = NULL; } - bson_init(ret); /* matched by bson_destroy() */ - bson_append_date(ret, "time", (bson_date_t)CDTIME_T_TO_MS(vl->time)); - bson_append_string(ret, "host", vl->host); - bson_append_string(ret, "plugin", vl->plugin); - bson_append_string(ret, "plugin_instance", vl->plugin_instance); - bson_append_string(ret, "type", vl->type); - bson_append_string(ret, "type_instance", vl->type_instance); + BSON_APPEND_DATE_TIME(ret, "timestamp", CDTIME_T_TO_MS(vl->time)); + BSON_APPEND_UTF8(ret, "host", vl->host); + BSON_APPEND_UTF8(ret, "plugin", vl->plugin); + BSON_APPEND_UTF8(ret, "plugin_instance", vl->plugin_instance); + BSON_APPEND_UTF8(ret, "type", vl->type); + BSON_APPEND_UTF8(ret, "type_instance", vl->type_instance); - bson_append_start_array(ret, "values"); /* {{{ */ + BSON_APPEND_ARRAY_BEGIN(ret, "values", &subarray); /* {{{ */ for (int i = 0; i < ds->ds_num; i++) { char key[16]; ssnprintf(key, sizeof(key), "%i", i); if (ds->ds[i].type == DS_TYPE_GAUGE) - bson_append_double(ret, key, vl->values[i].gauge); + BSON_APPEND_DOUBLE(&subarray, key, vl->values[i].gauge); else if (store_rates) - bson_append_double(ret, key, (double)rates[i]); + BSON_APPEND_DOUBLE(&subarray, key, (double)rates[i]); else if (ds->ds[i].type == DS_TYPE_COUNTER) - bson_append_long(ret, key, vl->values[i].counter); + BSON_APPEND_INT64(&subarray, key, vl->values[i].counter); else if (ds->ds[i].type == DS_TYPE_DERIVE) - bson_append_long(ret, key, vl->values[i].derive); + BSON_APPEND_INT64(&subarray, key, vl->values[i].derive); else if (ds->ds[i].type == DS_TYPE_ABSOLUTE) - bson_append_long(ret, key, vl->values[i].absolute); - else - assert(23 == 42); + BSON_APPEND_INT64(&subarray, key, vl->values[i].absolute); + else { + ERROR("write_mongodb plugin: Unknown ds_type %d for index %d", + ds->ds[i].type, i); + bson_free(ret); + return NULL; + } } - bson_append_finish_array(ret); /* }}} values */ + bson_append_array_end(ret, &subarray); /* }}} values */ - bson_append_start_array(ret, "dstypes"); /* {{{ */ + BSON_APPEND_ARRAY_BEGIN(ret, "dstypes", &subarray); /* {{{ */ for (int i = 0; i < ds->ds_num; i++) { char key[16]; ssnprintf(key, sizeof(key), "%i", i); if (store_rates) - bson_append_string(ret, key, "gauge"); + BSON_APPEND_UTF8(&subarray, key, "gauge"); else - bson_append_string(ret, key, DS_TYPE_TO_STRING(ds->ds[i].type)); + BSON_APPEND_UTF8(&subarray, key, DS_TYPE_TO_STRING(ds->ds[i].type)); } - bson_append_finish_array(ret); /* }}} dstypes */ + bson_append_array_end(ret, &subarray); /* }}} dstypes */ - bson_append_start_array(ret, "dsnames"); /* {{{ */ + BSON_APPEND_ARRAY_BEGIN(ret, "dsnames", &subarray); /* {{{ */ for (int i = 0; i < ds->ds_num; i++) { char key[16]; ssnprintf(key, sizeof(key), "%i", i); - bson_append_string(ret, key, ds->ds[i].name); + BSON_APPEND_UTF8(&subarray, key, ds->ds[i].name); } - bson_append_finish_array(ret); /* }}} dsnames */ - - bson_finish(ret); + bson_append_array_end(ret, &subarray); /* }}} dsnames */ sfree(rates); - return (ret); + + size_t error_location; + if (!bson_validate(ret, BSON_VALIDATE_UTF8, &error_location)) { + ERROR("write_mongodb plugin: Error in generated BSON document " + "at byte %zu", error_location); + bson_free(ret); + return NULL; + } + + return ret; } /* }}} bson *wm_create_bson */ -static int wm_write(const data_set_t *ds, /* {{{ */ - const value_list_t *vl, user_data_t *ud) { - wm_node_t *node = ud->data; - char collection_name[512]; - bson *bson_record; - int status; +static int wm_initialize(wm_node_t *node) /* {{{ */ +{ + char *uri; + size_t uri_length; + char const *format_string; - ssnprintf(collection_name, sizeof(collection_name), "collectd.%s", - vl->plugin); + if (node->connected) { + return 0; + } - bson_record = wm_create_bson(ds, vl, node->store_rates); - if (bson_record == NULL) - return (ENOMEM); + INFO("write_mongodb plugin: Connecting to [%s]:%i", + (node->host != NULL) ? node->host : "localhost", + (node->port != 0) ? node->port : MONGOC_DEFAULT_PORT); + + if ((node->db != NULL) && (node->user != NULL) && (node->passwd != NULL)) { + format_string = "mongodb://%s:%s@%s:%d/?authSource=%s"; + uri_length = strlen(format_string) + strlen(node->user) + + strlen(node->passwd) + strlen(node->host) + 5 + + strlen(node->db) + 1; + if ((uri = calloc(sizeof(char), uri_length)) == NULL) { + ERROR("write_mongodb plugin: Not enough memory to assemble " + "authentication string."); + mongoc_client_destroy(node->client); + node->client = NULL; + node->connected = 0; + return -1; + } + ssnprintf(uri, uri_length, format_string, node->user, node->passwd, + node->host, node->port, node->db); - pthread_mutex_lock(&node->lock); + node->client = mongoc_client_new(uri); + if (!node->client) { + ERROR("write_mongodb plugin: Authenticating to [%s]%i for database " + "\"%s\" as user \"%s\" failed.", + (node->host != NULL) ? node->host : "localhost", + (node->port != 0) ? node->port : MONGOC_DEFAULT_PORT, node->db, + node->user); + node->connected = 0; + sfree(uri); + return -1; + } + } else { + format_string = "mongodb://%s:%d"; + uri_length = strlen(format_string) + strlen(node->host) + 5 + 1; + if ((uri = calloc(sizeof(char), uri_length)) == NULL) { + ERROR("write_mongodb plugin: Not enough memory to assemble " + "authentication string."); + mongoc_client_destroy(node->client); + node->client = NULL; + node->connected = 0; + return -1; + } + snprintf(uri, uri_length, format_string, node->host, node->port); - if (!mongo_is_connected(node->conn)) { - INFO("write_mongodb plugin: Connecting to [%s]:%i", - (node->host != NULL) ? node->host : "localhost", - (node->port != 0) ? node->port : MONGO_DEFAULT_PORT); - status = mongo_connect(node->conn, node->host, node->port); - if (status != MONGO_OK) { + node->client = mongoc_client_new(uri); + if (!node->client) { ERROR("write_mongodb plugin: Connecting to [%s]:%i failed.", (node->host != NULL) ? node->host : "localhost", - (node->port != 0) ? node->port : MONGO_DEFAULT_PORT); - mongo_destroy(node->conn); - pthread_mutex_unlock(&node->lock); - return (-1); + (node->port != 0) ? node->port : MONGOC_DEFAULT_PORT); + node->connected = 0; + sfree(uri); + return -1; } + } + sfree(uri); + + node->database = mongoc_client_get_database(node->client, "collectd"); + if (!node->database) { + ERROR("write_mongodb plugin: error creating/getting database"); + mongoc_client_destroy(node->client); + node->client = NULL; + node->connected = 0; + return -1; + } - if ((node->db != NULL) && (node->user != NULL) && (node->passwd != NULL)) { - status = mongo_cmd_authenticate(node->conn, node->db, node->user, - node->passwd); - if (status != MONGO_OK) { - ERROR("write_mongodb plugin: Authenticating to [%s]%i for database " - "\"%s\" as user \"%s\" failed.", - (node->host != NULL) ? node->host : "localhost", - (node->port != 0) ? node->port : MONGO_DEFAULT_PORT, node->db, - node->user); - mongo_destroy(node->conn); - pthread_mutex_unlock(&node->lock); - return (-1); - } - } + node->connected = 1; + return 0; +} /* }}} int wm_initialize */ - if (node->timeout > 0) { - status = mongo_set_op_timeout(node->conn, node->timeout); - if (status != MONGO_OK) { - WARNING("write_mongodb plugin: mongo_set_op_timeout(%i) failed: %s", - node->timeout, node->conn->errstr); - } - } +static int wm_write(const data_set_t *ds, /* {{{ */ + const value_list_t *vl, user_data_t *ud) { + wm_node_t *node = ud->data; + mongoc_collection_t *collection = NULL; + bson_t *bson_record; + bson_error_t error; + int status; + + bson_record = wm_create_bson(ds, vl, node->store_rates); + if (!bson_record) { + ERROR("write_mongodb plugin: error making insert bson"); + return -1; } - /* Assert if the connection has been established */ - assert(mongo_is_connected(node->conn)); - -#if MONGO_MINOR >= 6 - /* There was an API change in 0.6.0 as linked below */ - /* https://github.com/mongodb/mongo-c-driver/blob/master/HISTORY.md */ - status = mongo_insert(node->conn, collection_name, bson_record, NULL); -#else - status = mongo_insert(node->conn, collection_name, bson_record); -#endif - - if (status != MONGO_OK) { - ERROR("write_mongodb plugin: error inserting record: %d", node->conn->err); - if (node->conn->err != MONGO_BSON_INVALID) - ERROR("write_mongodb plugin: %s", node->conn->errstr); - else - ERROR("write_mongodb plugin: Invalid BSON structure, error = %#x", - (unsigned int)bson_record->err); + pthread_mutex_lock(&node->lock); + if (wm_initialize(node) < 0) { + ERROR("write_mongodb plugin: error making connection to server"); + pthread_mutex_unlock(&node->lock); + bson_free(bson_record); + return -1; + } - /* Disconnect except on data errors. */ - if ((node->conn->err != MONGO_BSON_INVALID) && - (node->conn->err != MONGO_BSON_NOT_FINISHED)) - mongo_destroy(node->conn); + collection = + mongoc_client_get_collection(node->client, "collectd", vl->plugin); + if (!collection) { + ERROR("write_mongodb plugin: error creating/getting collection"); + mongoc_database_destroy(node->database); + mongoc_client_destroy(node->client); + node->database = NULL; + node->client = NULL; + node->connected = 0; + pthread_mutex_unlock(&node->lock); + bson_free(bson_record); + return -1; } - pthread_mutex_unlock(&node->lock); + status = mongoc_collection_insert(collection, MONGOC_INSERT_NONE, bson_record, + NULL, &error); + + if (!status) { + ERROR("write_mongodb plugin: error inserting record: %s", error.message); + mongoc_database_destroy(node->database); + mongoc_client_destroy(node->client); + node->database = NULL; + node->client = NULL; + node->connected = 0; + pthread_mutex_unlock(&node->lock); + bson_free(bson_record); + mongoc_collection_destroy(collection); + return -1; + } /* free our resource as not to leak memory */ - bson_destroy(bson_record); /* matches bson_init() */ - bson_dealloc(bson_record); /* matches bson_alloc() */ + mongoc_collection_destroy(collection); - return (0); + pthread_mutex_unlock(&node->lock); + + bson_free(bson_record); + + return 0; } /* }}} int wm_write */ static void wm_config_free(void *ptr) /* {{{ */ @@ -237,8 +300,11 @@ static void wm_config_free(void *ptr) /* {{{ */ if (node == NULL) return; - if (mongo_is_connected(node->conn)) - mongo_destroy(node->conn); + mongoc_database_destroy(node->database); + mongoc_client_destroy(node->client); + node->database = NULL; + node->client = NULL; + node->connected = 0; sfree(node->host); sfree(node); @@ -252,7 +318,7 @@ static int wm_config_node(oconfig_item_t *ci) /* {{{ */ node = calloc(1, sizeof(*node)); if (node == NULL) return (ENOMEM); - mongo_init(node->conn); + mongoc_init(); node->host = NULL; node->store_rates = 1; pthread_mutex_init(&node->lock, /* attr = */ NULL); -- 2.30.2