From 57cea0b217d7ee6e8d8f146d6c018421559503c3 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Wed, 10 Aug 2016 14:42:59 +0200 Subject: [PATCH] grpc plugin: Move all functions to a single service again. --- proto/collectd.proto | 2 -- src/grpc.cc | 57 +++++++++++++++++++------------------------- 2 files changed, 24 insertions(+), 35 deletions(-) diff --git a/proto/collectd.proto b/proto/collectd.proto index 8fea4f4f..0ff6e771 100644 --- a/proto/collectd.proto +++ b/proto/collectd.proto @@ -32,9 +32,7 @@ import "types.proto"; service Collectd { // Query a list of values available from collectd's value cache. rpc QueryValues(QueryValuesRequest) returns (stream QueryValuesResponse); -} -service Dispatch { // DispatchValues sends a stream of ValueLists to the server. rpc DispatchValues(stream DispatchValuesRequest) returns (DispatchValuesResponse); } diff --git a/src/grpc.cc b/src/grpc.cc index 5a5899dd..53b8fa53 100644 --- a/src/grpc.cc +++ b/src/grpc.cc @@ -47,7 +47,6 @@ extern "C" { } using collectd::Collectd; -using collectd::Dispatch; using collectd::DispatchValuesRequest; using collectd::DispatchValuesResponse; @@ -271,9 +270,9 @@ public: } std::queue value_lists; - err = this->read(&match, &value_lists); + err = this->queryValuesRead(&match, &value_lists); if (err.ok()) { - err = this->write(ctx, writer, &value_lists); + err = this->queryValuesWrite(ctx, writer, &value_lists); } while (!value_lists.empty()) { @@ -285,8 +284,28 @@ public: return err; } + grpc::Status DispatchValues(grpc::ServerContext *ctx, + grpc::ServerReader *reader, + DispatchValuesResponse *res) override { + DispatchValuesRequest req; + + while (reader->Read(&req)) { + value_list_t vl = VALUE_LIST_INIT; + auto status = unmarshal_value_list(req.value_list(), &vl); + if (!status.ok()) + return status; + + if (plugin_dispatch_values(&vl)) + return grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("failed to enqueue values for writing")); + } + + res->Clear(); + return grpc::Status::OK; + } + private: - grpc::Status read(value_list_t const *match, std::queue *value_lists) { + grpc::Status queryValuesRead(value_list_t const *match, std::queue *value_lists) { uc_iter_t *iter; if ((iter = uc_get_iterator()) == NULL) { return grpc::Status(grpc::StatusCode::INTERNAL, @@ -329,7 +348,7 @@ private: return status; } - grpc::Status write(grpc::ServerContext *ctx, + grpc::Status queryValuesWrite(grpc::ServerContext *ctx, grpc::ServerWriter *writer, std::queue *value_lists) { while (!value_lists->empty()) { @@ -354,32 +373,6 @@ private: } }; -/* - * Dispatch service - */ -class DispatchImpl : public collectd::Dispatch::Service { -public: - grpc::Status DispatchValues(grpc::ServerContext *ctx, - grpc::ServerReader *reader, - DispatchValuesResponse *res) override { - DispatchValuesRequest req; - - while (reader->Read(&req)) { - value_list_t vl = VALUE_LIST_INIT; - auto status = unmarshal_value_list(req.value_list(), &vl); - if (!status.ok()) - return status; - - if (plugin_dispatch_values(&vl)) - return grpc::Status(grpc::StatusCode::INTERNAL, - grpc::string("failed to enqueue values for writing")); - } - - res->Clear(); - return grpc::Status::OK; - } -}; - /* * gRPC server implementation */ @@ -413,7 +406,6 @@ public: } builder.RegisterService(&collectd_service_); - builder.RegisterService(&dispatch_service_); server_ = builder.BuildAndStart(); } /* Start() */ @@ -425,7 +417,6 @@ public: private: CollectdImpl collectd_service_; - DispatchImpl dispatch_service_; std::unique_ptr server_; }; /* class CollectdServer */ -- 2.30.2