From fd919402b261f5e07e2348a7f22262893f41daf1 Mon Sep 17 00:00:00 2001 From: Sebastian Harl Date: Sat, 17 Jan 2015 01:27:07 +0100 Subject: [PATCH] store::network: Add a store writer plugin sending objects over the network. The plugin supports sending data to one or more other SysDB instances using the wire-format of the STORE command. --- configure.ac | 8 + src/Makefile.am | 10 ++ src/plugins/store/network.c | 329 ++++++++++++++++++++++++++++++++++++ 3 files changed, 347 insertions(+) create mode 100644 src/plugins/store/network.c diff --git a/configure.ac b/configure.ac index 59f4ee7..325a733 100644 --- a/configure.ac +++ b/configure.ac @@ -672,6 +672,11 @@ Time-series fetchers:]) AC_SDB_PLUGIN([timeseries-rrdtool], [$rrdtool_default], [fetch time-series data from RRD files]) +m4_divert_once([HELP_ENABLE], [ +Store writers:]) +AC_SDB_PLUGIN([store-network], [yes], + [send stored objects to a remote instance]) + m4_divert_once([HELP_ENABLE], [ Plugins:]) @@ -727,6 +732,9 @@ AC_MSG_RESULT() AC_MSG_RESULT([ Time-series fetchers:]) AC_MSG_RESULT([ rrdtool: . . . . . . . . . $enable_timeseries_rrdtool]) AC_MSG_RESULT() +AC_MSG_RESULT([ Store writers:]) +AC_MSG_RESULT([ network: . . . . . . . . . $enable_store_network]) +AC_MSG_RESULT() AC_MSG_RESULT([ Plugins:]) AC_MSG_RESULT([ cname::dns: . . . . . . . . $enable_cname_dns]) AC_MSG_RESULT([ syslog: . . . . . . . . . . $enable_syslog]) diff --git a/src/Makefile.am b/src/Makefile.am index f467f2f..7cff3e8 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -153,6 +153,7 @@ pkgbackendcollectdlibdir = $(pkgbackendlibdir)/collectd pkgbackendpuppetlibdir = $(pkgbackendlibdir)/puppet pkgcnamelibdir = $(pkglibdir)/cname pkgtimeserieslibdir = $(pkglibdir)/timeseries +pkgstorelibdir = $(pkglibdir)/store pkglib_LTLIBRARIES = pkgbackendlib_LTLIBRARIES = @@ -160,6 +161,7 @@ pkgbackendcollectdlib_LTLIBRARIES = pkgbackendpuppetlib_LTLIBRARIES = pkgcnamelib_LTLIBRARIES = pkgtimeserieslib_LTLIBRARIES = +pkgstorelib_LTLIBRARIES = if BUILD_PLUGIN_CNAMEDNS pkgcnamelib_LTLIBRARIES += plugins/cname/dns.la @@ -218,6 +220,14 @@ sysdbd_LDADD += -dlopen plugins/timeseries/rrdtool.la sysdbd_DEPENDENCIES += plugins/timeseries/rrdtool.la endif +if BUILD_PLUGIN_STORENETWORK +pkgstorelib_LTLIBRARIES += plugins/store/network.la +plugins_store_network_la_SOURCES = plugins/store/network.c +plugins_store_network_la_LDFLAGS = $(AM_LDFLAGS) libsysdbclient.la -module -avoid-version +sysdbd_LDADD += -dlopen plugins/store/network.la +sysdbd_DEPENDENCIES += plugins/store/network.la +endif + include/client/sysdb.h: include/client/sysdb.h.in ../version source ../version; sed \ -e "s/@SDB_VERSION_MAJOR@/$$VERSION_MAJOR/g" \ diff --git a/src/plugins/store/network.c b/src/plugins/store/network.c new file mode 100644 index 0000000..c9aaec5 --- /dev/null +++ b/src/plugins/store/network.c @@ -0,0 +1,329 @@ +/* + * SysDB - src/plugins/backend/store/network.c + * Copyright (C) 2015 Sebastian 'tokkee' Harl + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#if HAVE_CONFIG_H +# include "config.h" +#endif + +#include "sysdb.h" +#include "core/plugin.h" +#include "client/sock.h" +#include "utils/error.h" +#include "utils/proto.h" +#include "utils/os.h" + +#include "liboconfig/utils.h" + +#include + +#include +#include +#include + +SDB_PLUGIN_MAGIC; + +/* + * private data types + */ + +typedef struct { + sdb_client_t *client; + char *addr; + char *username; +} user_data_t; +#define UD(obj) ((user_data_t *)(obj)) +#define CLIENT(obj) UD(SDB_OBJ_WRAPPER(obj)->data)->client + +static void +user_data_destroy(void *obj) +{ + user_data_t *ud = UD(obj); + + if (! ud) + return; + + if (ud->client) + sdb_client_destroy(ud->client); + ud->client = NULL; + if (ud->addr) + free(ud->addr); + ud->addr = NULL; + if (ud->username) + free(ud->username); + ud->username = NULL; +} /* user_data_destroy */ + +/* + * store writer implementation + */ + +static int +store_rpc(sdb_client_t *client, const char *msg, size_t msg_len) +{ + sdb_strbuf_t *buf = sdb_strbuf_create(128); + uint32_t rstatus = 0; + ssize_t status; + + status = sdb_client_rpc(client, SDB_CONNECTION_STORE, + (uint32_t)msg_len, msg, &rstatus, buf); + if (status < 0) + sdb_log(SDB_LOG_ERR, "store::network: %s", sdb_strbuf_string(buf)); + else if (rstatus != SDB_CONNECTION_OK) { + sdb_log(SDB_LOG_ERR, "store::network: Failed to send object: %s", + sdb_strbuf_string(buf)); + status = -1; + } + + sdb_strbuf_destroy(buf); + if (status < 0) + return -1; + return 0; +} /* store_rpc */ + +static int +store_host(const char *name, sdb_time_t last_update, sdb_object_t *user_data) +{ + sdb_proto_host_t host = { last_update, name }; + size_t len = sdb_proto_marshal_host(NULL, 0, &host); + char buf[len]; + + sdb_proto_marshal_host(buf, len, &host); + return store_rpc(CLIENT(user_data), buf, len); +} /* store_host */ + +static int +store_service(const char *hostname, const char *name, sdb_time_t last_update, + sdb_object_t *user_data) +{ + sdb_proto_service_t svc = { last_update, hostname, name }; + ssize_t len = sdb_proto_marshal_service(NULL, 0, &svc); + char buf[len]; + + sdb_proto_marshal_service(buf, len, &svc); + return store_rpc(CLIENT(user_data), buf, len); +} /* store_service */ + +static int +store_metric(const char *hostname, const char *name, + sdb_metric_store_t *store, sdb_time_t last_update, + sdb_object_t *user_data) +{ + sdb_proto_metric_t metric = { + last_update, hostname, name, + store ? store->type : NULL, store ? store->id : NULL, + }; + size_t len = sdb_proto_marshal_metric(NULL, 0, &metric); + char buf[len]; + + sdb_proto_marshal_metric(buf, len, &metric); + return store_rpc(CLIENT(user_data), buf, len); +} /* store_metric */ + +static int +store_attr(const char *hostname, const char *key, const sdb_data_t *value, + sdb_time_t last_update, sdb_object_t *user_data) +{ + sdb_proto_attribute_t attr = { + last_update, SDB_HOST, NULL, hostname, key, *value, + }; + size_t len = sdb_proto_marshal_attribute(NULL, 0, &attr); + char buf[len]; + + sdb_proto_marshal_attribute(buf, len, &attr); + return store_rpc(CLIENT(user_data), buf, len); +} /* store_attr */ + +static int +store_service_attr(const char *hostname, const char *service, + const char *key, const sdb_data_t *value, sdb_time_t last_update, + sdb_object_t *user_data) +{ + sdb_proto_attribute_t attr = { + last_update, SDB_SERVICE, hostname, service, key, *value, + }; + size_t len = sdb_proto_marshal_attribute(NULL, 0, &attr); + char buf[len]; + + sdb_proto_marshal_attribute(buf, len, &attr); + return store_rpc(CLIENT(user_data), buf, len); +} /* store_service_attr */ + +static int +store_metric_attr(const char *hostname, const char *metric, + const char *key, const sdb_data_t *value, sdb_time_t last_update, + sdb_object_t *user_data) +{ + sdb_proto_attribute_t attr = { + last_update, SDB_METRIC, hostname, metric, key, *value, + }; + size_t len = sdb_proto_marshal_attribute(NULL, 0, &attr); + char buf[len]; + + sdb_proto_marshal_attribute(buf, len, &attr); + return store_rpc(CLIENT(user_data), buf, len); +} /* store_metric_attr */ + +static sdb_store_writer_t store_impl = { + store_host, store_service, store_metric, + store_attr, store_service_attr, store_metric_attr, +}; + +/* + * plugin API + */ + +static int +store_init(sdb_object_t *user_data) +{ + user_data_t *ud; + + if (! user_data) + return -1; + + ud = SDB_OBJ_WRAPPER(user_data)->data; + if (sdb_client_connect(ud->client, ud->username)) { + sdb_log(SDB_LOG_ERR, "store::network: Failed to connect " + "to SysDB at %s as user %s", ud->addr, ud->username); + return -1; + } + + sdb_log(SDB_LOG_ERR, "store::network: Successfully connected " + "to SysDB at %s as user %s", ud->addr, ud->username); + return 0; +} /* store_init */ + +static int +store_config_server(oconfig_item_t *ci) +{ + sdb_object_t *user_data; + user_data_t *ud; + + int i; + + ud = calloc(1, sizeof(*ud)); + if (! ud) { + char errbuf[1024]; + sdb_log(SDB_LOG_ERR, "store::network: Failed to allocate " + "a user-data object: %s", + sdb_strerror(errno, errbuf, sizeof(errbuf))); + return -1; + } + + if (oconfig_get_string(ci, &ud->addr)) { + sdb_log(SDB_LOG_ERR, "store::network: Server requires " + "a single string argument\n\tUsage: "); + user_data_destroy(ud); + return -1; + } + ud->addr = strdup(ud->addr); + if (! ud->addr) { + sdb_log(SDB_LOG_ERR, "store::network: Failed to duplicate " + "a string"); + user_data_destroy(ud); + return -1; + } + + ud->client = sdb_client_create(ud->addr); + if (! ud->client) { + char errbuf[1024]; + sdb_log(SDB_LOG_ERR, "store::network: Failed to create client " + "connecting to '%s': %s", ud->addr, + sdb_strerror(errno, errbuf, sizeof(errbuf))); + user_data_destroy(ud); + return -1; + } + + for (i = 0; i < ci->children_num; ++i) { + oconfig_item_t *child = ci->children + i; + + if (! strcasecmp(child->key, "Username")) + oconfig_get_string(child, &ud->username); + else + sdb_log(SDB_LOG_WARNING, "store::network: Ignoring " + "unknown config option '%s' inside .", + child->key, ud->addr); + } + + if (ud->username) + ud->username = strdup(ud->username); + if (! ud->username) + ud->username = sdb_get_current_user(); + + user_data = sdb_object_create_wrapper("store-network-userdata", ud, + user_data_destroy); + if (! user_data) { + char errbuf[1024]; + sdb_log(SDB_LOG_ERR, "store::network: Failed to allocate " + "a user-data wrapper object: %s", + sdb_strerror(errno, errbuf, sizeof(errbuf))); + user_data_destroy(ud); + return -1; + } + + sdb_plugin_register_init(ud->addr, store_init, user_data); + sdb_plugin_register_writer(ud->addr, &store_impl, user_data); + sdb_object_deref(user_data); + return 0; +} /* store_config_server */ + +static int +store_config(oconfig_item_t *ci) +{ + int i; + + if (! ci) /* nothing to do to deconfigure this plugin */ + return 0; + + for (i = 0; i < ci->children_num; ++i) { + oconfig_item_t *child = ci->children + i; + + if (! strcasecmp(child->key, "Server")) + store_config_server(child); + else + sdb_log(SDB_LOG_WARNING, "store::network: Ignoring " + "unknown config option '%s'.", child->key); + } + return 0; +} /* store_config */ + +int +sdb_module_init(sdb_plugin_info_t *info) +{ + sdb_plugin_set_info(info, SDB_PLUGIN_INFO_DESC, + "send stored objects to a remote SysDB instance"); + sdb_plugin_set_info(info, SDB_PLUGIN_INFO_COPYRIGHT, + "Copyright (C) 2015 Sebastian 'tokkee' Harl "); + sdb_plugin_set_info(info, SDB_PLUGIN_INFO_LICENSE, "BSD"); + sdb_plugin_set_info(info, SDB_PLUGIN_INFO_VERSION, SDB_VERSION); + sdb_plugin_set_info(info, SDB_PLUGIN_INFO_PLUGIN_VERSION, SDB_VERSION); + + sdb_plugin_register_config(store_config); + return 0; +} /* sdb_module_init */ + +/* vim: set tw=78 sw=4 ts=4 noexpandtab : */ + -- 2.30.2