Code

store::network: Add a store writer plugin sending objects over the network.
authorSebastian Harl <sh@tokkee.org>
Sat, 17 Jan 2015 00:27:07 +0000 (01:27 +0100)
committerSebastian Harl <sh@tokkee.org>
Sat, 17 Jan 2015 19:31:47 +0000 (11:31 -0800)
The plugin supports sending data to one or more other SysDB instances using
the wire-format of the STORE command.

configure.ac
src/Makefile.am
src/plugins/store/network.c [new file with mode: 0644]

index 59f4ee7c413c0e5c5cdfeb2f66309d0e86d5d89e..325a733a2a5d526694bbdd5b58a7cf8d452d4414 100644 (file)
@@ -672,6 +672,11 @@ Time-series fetchers:])
 AC_SDB_PLUGIN([timeseries-rrdtool], [$rrdtool_default],
                [fetch time-series data from RRD files])
 
+m4_divert_once([HELP_ENABLE], [
+Store writers:])
+AC_SDB_PLUGIN([store-network], [yes],
+               [send stored objects to a remote instance])
+
 m4_divert_once([HELP_ENABLE], [
 Plugins:])
 
@@ -727,6 +732,9 @@ AC_MSG_RESULT()
 AC_MSG_RESULT([  Time-series fetchers:])
 AC_MSG_RESULT([    rrdtool:  . . . . . . . . . $enable_timeseries_rrdtool])
 AC_MSG_RESULT()
+AC_MSG_RESULT([  Store writers:])
+AC_MSG_RESULT([    network:  . . . . . . . . . $enable_store_network])
+AC_MSG_RESULT()
 AC_MSG_RESULT([  Plugins:])
 AC_MSG_RESULT([    cname::dns: . . . . . . . . $enable_cname_dns])
 AC_MSG_RESULT([    syslog: . . . . . . . . . . $enable_syslog])
index f467f2f9bf075db5f47fe34a7d57014c695d2958..7cff3e8a5b5dafc05568cc41c6ab8f435de6a46d 100644 (file)
@@ -153,6 +153,7 @@ pkgbackendcollectdlibdir = $(pkgbackendlibdir)/collectd
 pkgbackendpuppetlibdir = $(pkgbackendlibdir)/puppet
 pkgcnamelibdir = $(pkglibdir)/cname
 pkgtimeserieslibdir = $(pkglibdir)/timeseries
+pkgstorelibdir = $(pkglibdir)/store
 
 pkglib_LTLIBRARIES =
 pkgbackendlib_LTLIBRARIES =
@@ -160,6 +161,7 @@ pkgbackendcollectdlib_LTLIBRARIES =
 pkgbackendpuppetlib_LTLIBRARIES =
 pkgcnamelib_LTLIBRARIES =
 pkgtimeserieslib_LTLIBRARIES =
+pkgstorelib_LTLIBRARIES =
 
 if BUILD_PLUGIN_CNAMEDNS
 pkgcnamelib_LTLIBRARIES += plugins/cname/dns.la
@@ -218,6 +220,14 @@ sysdbd_LDADD += -dlopen plugins/timeseries/rrdtool.la
 sysdbd_DEPENDENCIES += plugins/timeseries/rrdtool.la
 endif
 
+if BUILD_PLUGIN_STORENETWORK
+pkgstorelib_LTLIBRARIES += plugins/store/network.la
+plugins_store_network_la_SOURCES = plugins/store/network.c
+plugins_store_network_la_LDFLAGS = $(AM_LDFLAGS) libsysdbclient.la -module -avoid-version
+sysdbd_LDADD += -dlopen plugins/store/network.la
+sysdbd_DEPENDENCIES += plugins/store/network.la
+endif
+
 include/client/sysdb.h: include/client/sysdb.h.in ../version
        source ../version; sed \
            -e "s/@SDB_VERSION_MAJOR@/$$VERSION_MAJOR/g" \
diff --git a/src/plugins/store/network.c b/src/plugins/store/network.c
new file mode 100644 (file)
index 0000000..c9aaec5
--- /dev/null
@@ -0,0 +1,329 @@
+/*
+ * SysDB - src/plugins/backend/store/network.c
+ * Copyright (C) 2015 Sebastian 'tokkee' Harl <sh@tokkee.org>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+ * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+ * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#if HAVE_CONFIG_H
+#      include "config.h"
+#endif
+
+#include "sysdb.h"
+#include "core/plugin.h"
+#include "client/sock.h"
+#include "utils/error.h"
+#include "utils/proto.h"
+#include "utils/os.h"
+
+#include "liboconfig/utils.h"
+
+#include <errno.h>
+
+#include <stdlib.h>
+#include <string.h>
+#include <strings.h>
+
+SDB_PLUGIN_MAGIC;
+
+/*
+ * private data types
+ */
+
+typedef struct {
+       sdb_client_t *client;
+       char *addr;
+       char *username;
+} user_data_t;
+#define UD(obj) ((user_data_t *)(obj))
+#define CLIENT(obj) UD(SDB_OBJ_WRAPPER(obj)->data)->client
+
+static void
+user_data_destroy(void *obj)
+{
+       user_data_t *ud = UD(obj);
+
+       if (! ud)
+               return;
+
+       if (ud->client)
+               sdb_client_destroy(ud->client);
+       ud->client = NULL;
+       if (ud->addr)
+               free(ud->addr);
+       ud->addr = NULL;
+       if (ud->username)
+               free(ud->username);
+       ud->username = NULL;
+} /* user_data_destroy */
+
+/*
+ * store writer implementation
+ */
+
+static int
+store_rpc(sdb_client_t *client, const char *msg, size_t msg_len)
+{
+       sdb_strbuf_t *buf = sdb_strbuf_create(128);
+       uint32_t rstatus = 0;
+       ssize_t status;
+
+       status = sdb_client_rpc(client, SDB_CONNECTION_STORE,
+                       (uint32_t)msg_len, msg, &rstatus, buf);
+       if (status < 0)
+               sdb_log(SDB_LOG_ERR, "store::network: %s", sdb_strbuf_string(buf));
+       else if (rstatus != SDB_CONNECTION_OK) {
+               sdb_log(SDB_LOG_ERR, "store::network: Failed to send object: %s",
+                               sdb_strbuf_string(buf));
+               status = -1;
+       }
+
+       sdb_strbuf_destroy(buf);
+       if (status < 0)
+               return -1;
+       return 0;
+} /* store_rpc */
+
+static int
+store_host(const char *name, sdb_time_t last_update, sdb_object_t *user_data)
+{
+       sdb_proto_host_t host = { last_update, name };
+       size_t len = sdb_proto_marshal_host(NULL, 0, &host);
+       char buf[len];
+
+       sdb_proto_marshal_host(buf, len, &host);
+       return store_rpc(CLIENT(user_data), buf, len);
+} /* store_host */
+
+static int
+store_service(const char *hostname, const char *name, sdb_time_t last_update,
+               sdb_object_t *user_data)
+{
+       sdb_proto_service_t svc = { last_update, hostname, name };
+       ssize_t len = sdb_proto_marshal_service(NULL, 0, &svc);
+       char buf[len];
+
+       sdb_proto_marshal_service(buf, len, &svc);
+       return store_rpc(CLIENT(user_data), buf, len);
+} /* store_service */
+
+static int
+store_metric(const char *hostname, const char *name,
+               sdb_metric_store_t *store, sdb_time_t last_update,
+               sdb_object_t *user_data)
+{
+       sdb_proto_metric_t metric = {
+               last_update, hostname, name,
+               store ? store->type : NULL, store ? store->id : NULL,
+       };
+       size_t len = sdb_proto_marshal_metric(NULL, 0, &metric);
+       char buf[len];
+
+       sdb_proto_marshal_metric(buf, len, &metric);
+       return store_rpc(CLIENT(user_data), buf, len);
+} /* store_metric */
+
+static int
+store_attr(const char *hostname, const char *key, const sdb_data_t *value,
+               sdb_time_t last_update, sdb_object_t *user_data)
+{
+       sdb_proto_attribute_t attr = {
+               last_update, SDB_HOST, NULL, hostname, key, *value,
+       };
+       size_t len = sdb_proto_marshal_attribute(NULL, 0, &attr);
+       char buf[len];
+
+       sdb_proto_marshal_attribute(buf, len, &attr);
+       return store_rpc(CLIENT(user_data), buf, len);
+} /* store_attr */
+
+static int
+store_service_attr(const char *hostname, const char *service,
+               const char *key, const sdb_data_t *value, sdb_time_t last_update,
+               sdb_object_t *user_data)
+{
+       sdb_proto_attribute_t attr = {
+               last_update, SDB_SERVICE, hostname, service, key, *value,
+       };
+       size_t len = sdb_proto_marshal_attribute(NULL, 0, &attr);
+       char buf[len];
+
+       sdb_proto_marshal_attribute(buf, len, &attr);
+       return store_rpc(CLIENT(user_data), buf, len);
+} /* store_service_attr */
+
+static int
+store_metric_attr(const char *hostname, const char *metric,
+               const char *key, const sdb_data_t *value, sdb_time_t last_update,
+               sdb_object_t *user_data)
+{
+       sdb_proto_attribute_t attr = {
+               last_update, SDB_METRIC, hostname, metric, key, *value,
+       };
+       size_t len = sdb_proto_marshal_attribute(NULL, 0, &attr);
+       char buf[len];
+
+       sdb_proto_marshal_attribute(buf, len, &attr);
+       return store_rpc(CLIENT(user_data), buf, len);
+} /* store_metric_attr */
+
+static sdb_store_writer_t store_impl = {
+       store_host, store_service, store_metric,
+       store_attr, store_service_attr, store_metric_attr,
+};
+
+/*
+ * plugin API
+ */
+
+static int
+store_init(sdb_object_t *user_data)
+{
+       user_data_t *ud;
+
+       if (! user_data)
+               return -1;
+
+       ud = SDB_OBJ_WRAPPER(user_data)->data;
+       if (sdb_client_connect(ud->client, ud->username)) {
+               sdb_log(SDB_LOG_ERR, "store::network: Failed to connect "
+                               "to SysDB at %s as user %s", ud->addr, ud->username);
+               return -1;
+       }
+
+       sdb_log(SDB_LOG_ERR, "store::network: Successfully connected "
+                       "to SysDB at %s as user %s", ud->addr, ud->username);
+       return 0;
+} /* store_init */
+
+static int
+store_config_server(oconfig_item_t *ci)
+{
+       sdb_object_t *user_data;
+       user_data_t *ud;
+
+       int i;
+
+       ud = calloc(1, sizeof(*ud));
+       if (! ud) {
+               char errbuf[1024];
+               sdb_log(SDB_LOG_ERR, "store::network: Failed to allocate "
+                               "a user-data object: %s",
+                               sdb_strerror(errno, errbuf, sizeof(errbuf)));
+               return -1;
+       }
+
+       if (oconfig_get_string(ci, &ud->addr)) {
+               sdb_log(SDB_LOG_ERR, "store::network: Server requires "
+                               "a single string argument\n\tUsage: <Server ADDRESS>");
+               user_data_destroy(ud);
+               return -1;
+       }
+       ud->addr = strdup(ud->addr);
+       if (! ud->addr) {
+               sdb_log(SDB_LOG_ERR, "store::network: Failed to duplicate "
+                               "a string");
+               user_data_destroy(ud);
+               return -1;
+       }
+
+       ud->client = sdb_client_create(ud->addr);
+       if (! ud->client) {
+               char errbuf[1024];
+               sdb_log(SDB_LOG_ERR, "store::network: Failed to create client "
+                               "connecting to '%s': %s", ud->addr,
+                               sdb_strerror(errno, errbuf, sizeof(errbuf)));
+               user_data_destroy(ud);
+               return -1;
+       }
+
+       for (i = 0; i < ci->children_num; ++i) {
+               oconfig_item_t *child = ci->children + i;
+
+               if (! strcasecmp(child->key, "Username"))
+                       oconfig_get_string(child, &ud->username);
+               else
+                       sdb_log(SDB_LOG_WARNING, "store::network: Ignoring "
+                                       "unknown config option '%s' inside <Server %s>.",
+                                       child->key, ud->addr);
+       }
+
+       if (ud->username)
+               ud->username = strdup(ud->username);
+       if (! ud->username)
+               ud->username = sdb_get_current_user();
+
+       user_data = sdb_object_create_wrapper("store-network-userdata", ud,
+                       user_data_destroy);
+       if (! user_data) {
+               char errbuf[1024];
+               sdb_log(SDB_LOG_ERR, "store::network: Failed to allocate "
+                               "a user-data wrapper object: %s",
+                               sdb_strerror(errno, errbuf, sizeof(errbuf)));
+               user_data_destroy(ud);
+               return -1;
+       }
+
+       sdb_plugin_register_init(ud->addr, store_init, user_data);
+       sdb_plugin_register_writer(ud->addr, &store_impl, user_data);
+       sdb_object_deref(user_data);
+       return 0;
+} /* store_config_server */
+
+static int
+store_config(oconfig_item_t *ci)
+{
+       int i;
+
+       if (! ci) /* nothing to do to deconfigure this plugin */
+               return 0;
+
+       for (i = 0; i < ci->children_num; ++i) {
+               oconfig_item_t *child = ci->children + i;
+
+               if (! strcasecmp(child->key, "Server"))
+                       store_config_server(child);
+               else
+                       sdb_log(SDB_LOG_WARNING, "store::network: Ignoring "
+                                       "unknown config option '%s'.", child->key);
+       }
+       return 0;
+} /* store_config */
+
+int
+sdb_module_init(sdb_plugin_info_t *info)
+{
+       sdb_plugin_set_info(info, SDB_PLUGIN_INFO_DESC,
+                       "send stored objects to a remote SysDB instance");
+       sdb_plugin_set_info(info, SDB_PLUGIN_INFO_COPYRIGHT,
+                       "Copyright (C) 2015 Sebastian 'tokkee' Harl <sh@tokkee.org>");
+       sdb_plugin_set_info(info, SDB_PLUGIN_INFO_LICENSE, "BSD");
+       sdb_plugin_set_info(info, SDB_PLUGIN_INFO_VERSION, SDB_VERSION);
+       sdb_plugin_set_info(info, SDB_PLUGIN_INFO_PLUGIN_VERSION, SDB_VERSION);
+
+       sdb_plugin_register_config(store_config);
+       return 0;
+} /* sdb_module_init */
+
+/* vim: set tw=78 sw=4 ts=4 noexpandtab : */
+