diff --git a/src/grpc.cc b/src/grpc.cc
index ca3314ecf10cf743828d076c79b4e2217968fc91..8b76954d6748090a795d2eaf09fcfe7a8110f7c0 100644 (file)
--- a/src/grpc.cc
+++ b/src/grpc.cc
#include <fstream>
#include <iostream>
+#include <queue>
#include <vector>
#include "collectd.grpc.pb.h"
#include "collectd.h"
#include "common.h"
-#include "configfile.h"
#include "plugin.h"
#include "daemon/utils_cache.h"
}
using collectd::Collectd;
-using collectd::Dispatch;
using collectd::DispatchValuesRequest;
-using collectd::DispatchValuesReply;
+using collectd::DispatchValuesResponse;
using collectd::QueryValuesRequest;
-using collectd::QueryValuesReply;
+using collectd::QueryValuesResponse;
using google::protobuf::util::TimeUtil;
@@ -258,189 +257,119 @@ static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg,
} /* unmarshal_value_list() */
/*
- * request call-backs and call objects
+ * Collectd service
*/
-static grpc::Status DispatchValue(grpc::ServerContext *ctx, DispatchValuesRequest request, DispatchValuesReply *reply)
-{
- value_list_t vl = VALUE_LIST_INIT;
- auto status = unmarshal_value_list(request.value_list(), &vl);
- if (!status.ok())
- return status;
-
- if (plugin_dispatch_values(&vl))
- status = grpc::Status(grpc::StatusCode::INTERNAL,
- grpc::string("failed to enqueue values for writing"));
+class CollectdImpl : public collectd::Collectd::Service {
+public:
+ grpc::Status QueryValues(grpc::ServerContext *ctx, QueryValuesRequest const *req, grpc::ServerWriter<QueryValuesResponse> *writer) override {
+ value_list_t match;
+ auto status = unmarshal_ident(req->identifier(), &match, false);
+ if (!status.ok()) {
+ return status;
+ }
- reply->Clear();
- return status;
-} /* grpc::Status DispatchValue */
+ std::queue<value_list_t> value_lists;
+ status = this->queryValuesRead(&match, &value_lists);
+ if (status.ok()) {
+ status = this->queryValuesWrite(ctx, writer, &value_lists);
+ }
-static grpc::Status QueryValues(grpc::ServerContext *ctx, QueryValuesRequest req, QueryValuesReply *res)
-{
- uc_iter_t *iter;
- char *name = NULL;
+ while (!value_lists.empty()) {
+ auto vl = value_lists.front();
+ value_lists.pop();
+ sfree(vl.values);
+ }
- value_list_t matcher;
- auto status = unmarshal_ident(req.identifier(), &matcher, false);
- if (!status.ok())
return status;
-
- if ((iter = uc_get_iterator()) == NULL) {
- return grpc::Status(grpc::StatusCode::INTERNAL,
- grpc::string("failed to query values: cannot create iterator"));
}
- status = grpc::Status::OK;
- while (uc_iterator_next(iter, &name) == 0) {
- value_list_t vl;
- if (parse_identifier_vl(name, &vl) != 0) {
- status = grpc::Status(grpc::StatusCode::INTERNAL,
- grpc::string("failed to parse identifier"));
- break;
- }
+ grpc::Status DispatchValues(grpc::ServerContext *ctx,
+ grpc::ServerReader<DispatchValuesRequest> *reader,
+ DispatchValuesResponse *res) override {
+ DispatchValuesRequest req;
- if (!ident_matches(&vl, &matcher))
- continue;
+ while (reader->Read(&req)) {
+ value_list_t vl = VALUE_LIST_INIT;
+ auto status = unmarshal_value_list(req.value_list(), &vl);
+ if (!status.ok())
+ return status;
- if (uc_iterator_get_time(iter, &vl.time) < 0) {
- status = grpc::Status(grpc::StatusCode::INTERNAL,
- grpc::string("failed to retrieve value timestamp"));
- break;
- }
- if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
- status = grpc::Status(grpc::StatusCode::INTERNAL,
- grpc::string("failed to retrieve value interval"));
- break;
- }
- if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
- status = grpc::Status(grpc::StatusCode::INTERNAL,
- grpc::string("failed to retrieve values"));
- break;
+ if (plugin_dispatch_values(&vl))
+ return grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to enqueue values for writing"));
}
- auto pb_vl = res->add_value_lists();
- status = marshal_value_list(&vl, pb_vl);
- free(vl.values);
- if (!status.ok())
- break;
+ res->Clear();
+ return grpc::Status::OK;
}
- uc_iterator_destroy(iter);
+private:
+ grpc::Status queryValuesRead(value_list_t const *match, std::queue<value_list_t> *value_lists) {
+ uc_iter_t *iter;
+ if ((iter = uc_get_iterator()) == NULL) {
+ return grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to query values: cannot create iterator"));
+ }
- return status;
-} /* grpc::Status QueryValues */
+ grpc::Status status = grpc::Status::OK;
+ char *name = NULL;
+ while (uc_iterator_next(iter, &name) == 0) {
+ value_list_t vl;
+ if (parse_identifier_vl(name, &vl) != 0) {
+ status = grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to parse identifier"));
+ break;
+ }
-// CallData is the abstract base class for asynchronous calls.
-class CallData {
-public:
- virtual ~CallData() {}
- virtual void process(bool ok) = 0;
+ if (!ident_matches(&vl, match))
+ continue;
-protected:
- CallData() {}
+ if (uc_iterator_get_time(iter, &vl.time) < 0) {
+ status = grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to retrieve value timestamp"));
+ break;
+ }
+ if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
+ status = grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to retrieve value interval"));
+ break;
+ }
+ if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
+ status = grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to retrieve values"));
+ break;
+ }
-private:
- CallData(const CallData&) = delete;
- CallData& operator=(const CallData&) = delete;
-};
+ value_lists->push(vl);
+ } // while (uc_iterator_next(iter, &name) == 0)
-/*
- * Collectd service
- */
-// QueryValuesCallData holds the state and implements the logic for QueryValues calls.
-class QueryValuesCallData : public CallData {
-public:
- QueryValuesCallData(Collectd::AsyncService* service, grpc::ServerCompletionQueue* cq)
- : cq_(cq), service_(service), writer_(&context_) {
- // As part of the initialization, we *request* that the system start
- // processing QueryValues requests. In this request, "this" acts as
- // the tag uniquely identifying the request (so that different
- // QueryValuesCallData instances can serve different requests
- // concurrently), in this case the memory address of this
- // QueryValuesCallData instance.
- service->RequestQueryValues(&context_, &request_, &writer_, cq_, cq_, this);
+ uc_iterator_destroy(iter);
+ return status;
}
- void process(bool ok) final {
- if (done_) {
- delete this;
- } else {
- // Spawn a new QueryValuesCallData instance to serve new clients
- // while we process the one for this QueryValuesCallData. The
- // instance will deallocate itself as part of its FINISH state.
- new QueryValuesCallData(service_, cq_);
+ grpc::Status queryValuesWrite(grpc::ServerContext *ctx,
+ grpc::ServerWriter<QueryValuesResponse> *writer,
+ std::queue<value_list_t> *value_lists) {
+ while (!value_lists->empty()) {
+ auto vl = value_lists->front();
+ QueryValuesResponse res;
+ res.Clear();
- auto status = QueryValues(&context_, request_, &response_);
+ auto status = marshal_value_list(&vl, res.mutable_value_list());
if (!status.ok()) {
- writer_.FinishWithError(status, this);
- } else {
- writer_.Finish(response_, grpc::Status::OK, this);
+ return status;
}
- done_ = true;
- }
- }
-
-private:
- bool done_ = false;
- grpc::ServerContext context_;
- grpc::ServerCompletionQueue* cq_;
- Collectd::AsyncService* service_;
- QueryValuesRequest request_;
- QueryValuesReply response_;
- grpc::ServerAsyncResponseWriter<QueryValuesReply> writer_;
-};
-
-/*
- * Dispatch service
- */
-// DispatchValuesCallData holds the state and implements the logic for DispatchValues calls.
-class DispatchValuesCallData : public CallData {
-public:
- DispatchValuesCallData(Dispatch::AsyncService* service, grpc::ServerCompletionQueue* cq)
- : cq_(cq), service_(service), reader_(&context_) {
- process(true);
- }
+ if (!writer->Write(res)) {
+ return grpc::Status::CANCELLED;
+ }
- void process(bool ok) final {
- if (status == Status::INIT) {
- service_->RequestDispatchValues(&context_, &reader_, cq_, cq_, this);
- status = Status::CALL;
- } else if (status == Status::CALL) {
- reader_.Read(&request_, this);
- status = Status::READ;
- } else if (status == Status::READ && ok) {
- (void) DispatchValue(&context_, request_, &response_);
-
- reader_.Read(&request_, this);
- } else if (status == Status::READ) {
- response_.Clear();
-
- status = Status::DONE;
- } else if (status == Status::DONE) {
- new DispatchValuesCallData(service_, cq_);
- delete this;
- } else {
- ERROR("grpc: DispatchValuesCallData: invalid state");
+ value_lists->pop();
+ sfree(vl.values);
}
- }
-private:
- enum class Status {
- INIT,
- CALL,
- READ,
- DONE,
- };
- Status status = Status::INIT;
-
- grpc::ServerContext context_;
- grpc::ServerCompletionQueue* cq_;
- Dispatch::AsyncService* service_;
-
- DispatchValuesRequest request_;
- DispatchValuesReply response_;
- grpc::ServerAsyncReader<DispatchValuesReply, DispatchValuesRequest> reader_;
+ return grpc::Status::OK;
+ }
};
/*
}
}
- cq_ = builder.AddCompletionQueue();
-
builder.RegisterService(&collectd_service_);
- builder.RegisterService(&dispatch_service_);
server_ = builder.BuildAndStart();
- new QueryValuesCallData(&collectd_service_, cq_.get());
- new DispatchValuesCallData(&dispatch_service_, cq_.get());
} /* Start() */
void Shutdown()
{
server_->Shutdown();
- cq_->Shutdown();
} /* Shutdown() */
- void Mainloop()
- {
- while (true) {
- void *tag = NULL;
- bool ok = false;
+private:
+ CollectdImpl collectd_service_;
+
+ std::unique_ptr<grpc::Server> server_;
+}; /* class CollectdServer */
+
+class CollectdClient final
+{
+public:
+ CollectdClient(std::shared_ptr<grpc::ChannelInterface> channel) : stub_(Collectd::NewStub(channel)) {
+ }
+
+ int DispatchValues(value_list_t const *vl) {
+ grpc::ClientContext ctx;
- // Block waiting to read the next event from the completion queue.
- // The event is uniquely identified by its tag, which in this case
- // is the memory address of a CallData instance.
- if (!cq_->Next(&tag, &ok))
- break; // Queue shut down.
+ DispatchValuesRequest req;
+ auto status = marshal_value_list(vl, req.mutable_value_list());
+ if (!status.ok()) {
+ ERROR("grpc: Marshalling value_list_t failed.");
+ return -1;
+ }
- static_cast<CallData*>(tag)->process(ok);
+ DispatchValuesResponse res;
+ auto stream = stub_->DispatchValues(&ctx, &res);
+ if (!stream->Write(req)) {
+ NOTICE("grpc: Broken stream.");
+ /* intentionally not returning. */
}
- } /* Mainloop() */
-private:
- Collectd::AsyncService collectd_service_;
- Dispatch::AsyncService dispatch_service_;
+ stream->WritesDone();
+ status = stream->Finish();
+ if (!status.ok()) {
+ ERROR ("grpc: Error while closing stream.");
+ return -1;
+ }
- std::unique_ptr<grpc::Server> server_;
- std::unique_ptr<grpc::ServerCompletionQueue> cq_;
-}; /* class CollectdServer */
+ return 0;
+ } /* int DispatchValues */
+
+private:
+ std::unique_ptr<Collectd::Stub> stub_;
+};
static CollectdServer *server = nullptr;
* collectd plugin interface
*/
extern "C" {
- static pthread_t *workers;
- static size_t workers_num = 5;
+ static void c_grpc_destroy_write_callback (void *ptr) {
+ delete (CollectdClient *) ptr;
+ }
- static void *worker_thread(void *arg)
- {
- CollectdServer *s = (CollectdServer *)arg;
- s->Mainloop();
- return NULL;
- } /* worker_thread() */
+ static int c_grpc_write(__attribute__((unused)) data_set_t const *ds,
+ value_list_t const *vl,
+ user_data_t *ud) {
+ CollectdClient *c = (CollectdClient *) ud->data;
+ return c->DispatchValues(vl);
+ }
static int c_grpc_config_listen(oconfig_item_t *ci)
{
return -1;
}
}
- else if (!strcasecmp("SSLRootCerts", child->key)) {
+ else if (!strcasecmp("SSLCACertificateFile", child->key)) {
char *certs = NULL;
if (cf_util_get_string(child, &certs)) {
ERROR("grpc: Option `%s` expects a string value",
}
ssl_opts->pem_root_certs = read_file(certs);
}
- else if (!strcasecmp("SSLServerKey", child->key)) {
+ else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
char *key = NULL;
if (cf_util_get_string(child, &key)) {
ERROR("grpc: Option `%s` expects a string value",
}
pkcp.private_key = read_file(key);
}
- else if (!strcasecmp("SSLServerCert", child->key)) {
+ else if (!strcasecmp("SSLCertificateFile", child->key)) {
char *cert = NULL;
if (cf_util_get_string(child, &cert)) {
ERROR("grpc: Option `%s` expects a string value",
return 0;
} /* c_grpc_config_listen() */
+ static int c_grpc_config_server(oconfig_item_t *ci)
+ {
+ if ((ci->values_num != 2)
+ || (ci->values[0].type != OCONFIG_TYPE_STRING)
+ || (ci->values[1].type != OCONFIG_TYPE_STRING)) {
+ ERROR("grpc: The `%s` config option needs exactly "
+ "two string argument (address and port).", ci->key);
+ return -1;
+ }
+
+ grpc::SslCredentialsOptions ssl_opts;
+ bool use_ssl = false;
+
+ for (int i = 0; i < ci->children_num; i++) {
+ oconfig_item_t *child = ci->children + i;
+
+ if (!strcasecmp("EnableSSL", child->key)) {
+ if (cf_util_get_boolean(child, &use_ssl)) {
+ return -1;
+ }
+ }
+ else if (!strcasecmp("SSLCACertificateFile", child->key)) {
+ char *certs = NULL;
+ if (cf_util_get_string(child, &certs)) {
+ return -1;
+ }
+ ssl_opts.pem_root_certs = read_file(certs);
+ }
+ else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
+ char *key = NULL;
+ if (cf_util_get_string(child, &key)) {
+ return -1;
+ }
+ ssl_opts.pem_private_key = read_file(key);
+ }
+ else if (!strcasecmp("SSLCertificateFile", child->key)) {
+ char *cert = NULL;
+ if (cf_util_get_string(child, &cert)) {
+ return -1;
+ }
+ ssl_opts.pem_cert_chain = read_file(cert);
+ }
+ else {
+ WARNING("grpc: Option `%s` not allowed in <%s> block.",
+ child->key, ci->key);
+ }
+ }
+
+ auto node = grpc::string(ci->values[0].value.string);
+ auto service = grpc::string(ci->values[1].value.string);
+ auto addr = node + ":" + service;
+
+ CollectdClient *client;
+ if (use_ssl) {
+ auto channel_creds = grpc::SslCredentials(ssl_opts);
+ auto channel = grpc::CreateChannel(addr, channel_creds);
+ client = new CollectdClient(channel);
+ } else {
+ auto channel = grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
+ client = new CollectdClient(channel);
+ }
+
+ auto callback_name = grpc::string("grpc/") + addr;
+ user_data_t ud = {
+ .data = client,
+ .free_func = c_grpc_destroy_write_callback,
+ };
+
+ plugin_register_write (callback_name.c_str(), c_grpc_write, &ud);
+ return 0;
+ } /* c_grpc_config_server() */
+
static int c_grpc_config(oconfig_item_t *ci)
{
int i;
if (c_grpc_config_listen(child))
return -1;
}
- else if (!strcasecmp("WorkerThreads", child->key)) {
- int n;
- if (cf_util_get_int(child, &n))
+ else if (!strcasecmp("Server", child->key)) {
+ if (c_grpc_config_server(child))
return -1;
- workers_num = (size_t)n;
}
+
else {
WARNING("grpc: Option `%s` not allowed here.", child->key);
}
static int c_grpc_init(void)
{
server = new CollectdServer();
- size_t i;
-
- if (! server) {
+ if (!server) {
ERROR("grpc: Failed to create server");
return -1;
}
- workers = (pthread_t *)calloc(workers_num, sizeof(*workers));
- if (! workers) {
- delete server;
- server = nullptr;
-
- ERROR("grpc: Failed to allocate worker threads");
- return -1;
- }
-
server->Start();
- for (i = 0; i < workers_num; i++) {
- plugin_thread_create(&workers[i], /* attr = */ NULL,
- worker_thread, server);
- }
- INFO("grpc: Started %zu workers", workers_num);
return 0;
} /* c_grpc_init() */
static int c_grpc_shutdown(void)
{
- size_t i;
-
if (!server)
- return -1;
+ return 0;
server->Shutdown();
- INFO("grpc: Waiting for %zu workers to terminate", workers_num);
- for (i = 0; i < workers_num; i++)
- pthread_join(workers[i], NULL);
- free(workers);
- workers = NULL;
- workers_num = 0;
-
delete server;
server = nullptr;