summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: 6743e8a)
raw | patch | inline | side by side (parent: 6743e8a)
author | Florian Forster <octo@collectd.org> | |
Fri, 12 Aug 2016 13:22:49 +0000 (15:22 +0200) | ||
committer | Florian Forster <octo@collectd.org> | |
Fri, 12 Aug 2016 14:14:29 +0000 (16:14 +0200) |
This allows the gRPC plugin to send metrics, via gRPC, to another
collectd instance or other implementation of the "Collectd" gRPC
service.
collectd instance or other implementation of the "Collectd" gRPC
service.
src/collectd.conf.in | patch | blob | history | |
src/collectd.conf.pod | patch | blob | history | |
src/grpc.cc | patch | blob | history |
diff --git a/src/collectd.conf.in b/src/collectd.conf.in
index e06465bf44ac82c256c1303abb41df3dd3bb49e8..e1e02a8a59f7643c697dcc3050747e7329e93383 100644 (file)
--- a/src/collectd.conf.in
+++ b/src/collectd.conf.in
#</Plugin>
#<Plugin grpc>
-# <Listen "0.0.0.0" "50051">
+# <Server "example.com" "50051">
# EnableSSL true
# SSLRootCerts "/path/to/root.pem"
# SSLServerCert "/path/to/server.pem"
# SSLServerKey "/path/to/server.key"
+# </Server>
+# <Listen "0.0.0.0" "50051">
+# EnableSSL true
+# SSLCACertificateFile "/path/to/root.pem"
+# SSLCertificateFile "/path/to/client.pem"
+# SSLCertificateKeyFile "/path/to/client.key"
# </Listen>
#</Plugin>
diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod
index 6d1547a63f3b523ebfe222b969e706a48c65fa31..d8faba58dd278aa03b7f698e5f63c870697bbd39 100644 (file)
--- a/src/collectd.conf.pod
+++ b/src/collectd.conf.pod
=over 4
+=item B<Server> I<Host> I<Port>
+
+The B<Server> statement sets the address of a server to which to send metrics
+via the C<DispatchValues> function.
+
+The argument I<Host> may be a hostname, an IPv4 address, or an IPv6 address.
+
+Optionally, B<Server> may be specified as a configuration block which supports
+the following options:
+
+=over 4
+
+=item B<EnableSSL> B<false>|B<true>
+
+Whether to require SSL for outgoing connections. Default: false.
+
+=item B<SSLCACertificateFile> I<Filename>
+
+=item B<SSLCertificateFile> I<Filename>
+
+=item B<SSLCertificateKeyFile> I<Filename>
+
+Filenames specifying SSL certificate and key material to be used with SSL
+connections.
+
+=back
+
=item B<Listen> I<Host> I<Port>
The B<Listen> statement sets the network address to bind to. When multiple
diff --git a/src/grpc.cc b/src/grpc.cc
index a38abc1dedeb079a0e54f72acc6f1fb608c3dfe8..70c8e6c5e0c988ce23928fe5c95b964d33d3ac75 100644 (file)
--- a/src/grpc.cc
+++ b/src/grpc.cc
std::unique_ptr<grpc::Server> server_;
}; /* class CollectdServer */
+class CollectdClient final
+{
+public:
+ CollectdClient(std::shared_ptr<grpc::ChannelInterface> channel) : stub_(Collectd::NewStub(channel)) {
+ }
+
+ int DispatchValues(value_list_t const *vl) {
+ grpc::ClientContext ctx;
+
+ DispatchValuesRequest req;
+ auto status = marshal_value_list(vl, req.mutable_value_list());
+ if (!status.ok()) {
+ ERROR("grpc: Marshalling value_list_t failed.");
+ return -1;
+ }
+
+ DispatchValuesResponse res;
+ auto stream = stub_->DispatchValues(&ctx, &res);
+ if (!stream->Write(req)) {
+ NOTICE("grpc: Broken stream.");
+ /* intentionally not returning. */
+ }
+
+ stream->WritesDone();
+ status = stream->Finish();
+ if (!status.ok()) {
+ ERROR ("grpc: Error while closing stream.");
+ return -1;
+ }
+
+ return 0;
+ } /* int DispatchValues */
+
+private:
+ std::unique_ptr<Collectd::Stub> stub_;
+};
+
static CollectdServer *server = nullptr;
/*
* collectd plugin interface
*/
extern "C" {
+ static void c_grpc_destroy_write_callback (void *ptr) {
+ delete (CollectdClient *) ptr;
+ }
+
+ static int c_grpc_write(__attribute__((unused)) data_set_t const *ds,
+ value_list_t const *vl,
+ user_data_t *ud) {
+ CollectdClient *c = (CollectdClient *) ud->data;
+ return c->DispatchValues(vl);
+ }
+
static int c_grpc_config_listen(oconfig_item_t *ci)
{
if ((ci->values_num != 2)
return 0;
} /* c_grpc_config_listen() */
+ static int c_grpc_config_server(oconfig_item_t *ci)
+ {
+ if ((ci->values_num != 2)
+ || (ci->values[0].type != OCONFIG_TYPE_STRING)
+ || (ci->values[1].type != OCONFIG_TYPE_STRING)) {
+ ERROR("grpc: The `%s` config option needs exactly "
+ "two string argument (address and port).", ci->key);
+ return -1;
+ }
+
+ grpc::SslCredentialsOptions ssl_opts;
+ bool use_ssl = false;
+
+ for (int i = 0; i < ci->children_num; i++) {
+ oconfig_item_t *child = ci->children + i;
+
+ if (!strcasecmp("EnableSSL", child->key)) {
+ if (cf_util_get_boolean(child, &use_ssl)) {
+ return -1;
+ }
+ }
+ else if (!strcasecmp("SSLCACertificateFile", child->key)) {
+ char *certs = NULL;
+ if (cf_util_get_string(child, &certs)) {
+ return -1;
+ }
+ ssl_opts.pem_root_certs = read_file(certs);
+ }
+ else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
+ char *key = NULL;
+ if (cf_util_get_string(child, &key)) {
+ return -1;
+ }
+ ssl_opts.pem_private_key = read_file(key);
+ }
+ else if (!strcasecmp("SSLCertificateFile", child->key)) {
+ char *cert = NULL;
+ if (cf_util_get_string(child, &cert)) {
+ return -1;
+ }
+ ssl_opts.pem_cert_chain = read_file(cert);
+ }
+ else {
+ WARNING("grpc: Option `%s` not allowed in <%s> block.",
+ child->key, ci->key);
+ }
+ }
+
+ auto node = grpc::string(ci->values[0].value.string);
+ auto service = grpc::string(ci->values[1].value.string);
+ auto addr = node + ":" + service;
+
+ CollectdClient *client;
+ if (use_ssl) {
+ auto channel_creds = grpc::SslCredentials(ssl_opts);
+ auto channel = grpc::CreateChannel(addr, channel_creds);
+ client = new CollectdClient(channel);
+ } else {
+ auto channel = grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
+ client = new CollectdClient(channel);
+ }
+
+ auto callback_name = grpc::string("grpc/") + addr;
+ user_data_t ud = {
+ .data = client,
+ .free_func = c_grpc_destroy_write_callback,
+ };
+
+ plugin_register_write (callback_name.c_str(), c_grpc_write, &ud);
+ return 0;
+ } /* c_grpc_config_server() */
+
static int c_grpc_config(oconfig_item_t *ci)
{
int i;
if (c_grpc_config_listen(child))
return -1;
}
+ else if (!strcasecmp("Server", child->key)) {
+ if (c_grpc_config_server(child))
+ return -1;
+ }
+
else {
WARNING("grpc: Option `%s` not allowed here.", child->key);
}