summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: 9f0f594)
raw | patch | inline | side by side (parent: 9f0f594)
author | Florian Forster <octo@collectd.org> | |
Wed, 10 Aug 2016 12:42:59 +0000 (14:42 +0200) | ||
committer | Florian Forster <octo@collectd.org> | |
Wed, 10 Aug 2016 13:02:46 +0000 (15:02 +0200) |
proto/collectd.proto | patch | blob | history | |
src/grpc.cc | patch | blob | history |
diff --git a/proto/collectd.proto b/proto/collectd.proto
index 8fea4f4f03153ef7e2d47201f8876fb942e95aa8..0ff6e771342db0834ae7f7d04a7ee5f8249dc659 100644 (file)
--- a/proto/collectd.proto
+++ b/proto/collectd.proto
service Collectd {
// Query a list of values available from collectd's value cache.
rpc QueryValues(QueryValuesRequest) returns (stream QueryValuesResponse);
-}
-service Dispatch {
// DispatchValues sends a stream of ValueLists to the server.
rpc DispatchValues(stream DispatchValuesRequest) returns (DispatchValuesResponse);
}
diff --git a/src/grpc.cc b/src/grpc.cc
index 5a5899dd65f7c7fd0bbb4dd36570bb26727f0db5..53b8fa53a87e52a8d1d2fabae54e9656feb9947a 100644 (file)
--- a/src/grpc.cc
+++ b/src/grpc.cc
}
using collectd::Collectd;
-using collectd::Dispatch;
using collectd::DispatchValuesRequest;
using collectd::DispatchValuesResponse;
}
std::queue<value_list_t> value_lists;
- err = this->read(&match, &value_lists);
+ err = this->queryValuesRead(&match, &value_lists);
if (err.ok()) {
- err = this->write(ctx, writer, &value_lists);
+ err = this->queryValuesWrite(ctx, writer, &value_lists);
}
while (!value_lists.empty()) {
return err;
}
+ grpc::Status DispatchValues(grpc::ServerContext *ctx,
+ grpc::ServerReader<DispatchValuesRequest> *reader,
+ DispatchValuesResponse *res) override {
+ DispatchValuesRequest req;
+
+ while (reader->Read(&req)) {
+ value_list_t vl = VALUE_LIST_INIT;
+ auto status = unmarshal_value_list(req.value_list(), &vl);
+ if (!status.ok())
+ return status;
+
+ if (plugin_dispatch_values(&vl))
+ return grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to enqueue values for writing"));
+ }
+
+ res->Clear();
+ return grpc::Status::OK;
+ }
+
private:
- grpc::Status read(value_list_t const *match, std::queue<value_list_t> *value_lists) {
+ grpc::Status queryValuesRead(value_list_t const *match, std::queue<value_list_t> *value_lists) {
uc_iter_t *iter;
if ((iter = uc_get_iterator()) == NULL) {
return grpc::Status(grpc::StatusCode::INTERNAL,
return status;
}
- grpc::Status write(grpc::ServerContext *ctx,
+ grpc::Status queryValuesWrite(grpc::ServerContext *ctx,
grpc::ServerWriter<QueryValuesResponse> *writer,
std::queue<value_list_t> *value_lists) {
while (!value_lists->empty()) {
}
};
-/*
- * Dispatch service
- */
-class DispatchImpl : public collectd::Dispatch::Service {
-public:
- grpc::Status DispatchValues(grpc::ServerContext *ctx,
- grpc::ServerReader<DispatchValuesRequest> *reader,
- DispatchValuesResponse *res) override {
- DispatchValuesRequest req;
-
- while (reader->Read(&req)) {
- value_list_t vl = VALUE_LIST_INIT;
- auto status = unmarshal_value_list(req.value_list(), &vl);
- if (!status.ok())
- return status;
-
- if (plugin_dispatch_values(&vl))
- return grpc::Status(grpc::StatusCode::INTERNAL,
- grpc::string("failed to enqueue values for writing"));
- }
-
- res->Clear();
- return grpc::Status::OK;
- }
-};
-
/*
* gRPC server implementation
*/
}
builder.RegisterService(&collectd_service_);
- builder.RegisterService(&dispatch_service_);
server_ = builder.BuildAndStart();
} /* Start() */
private:
CollectdImpl collectd_service_;
- DispatchImpl dispatch_service_;
std::unique_ptr<grpc::Server> server_;
}; /* class CollectdServer */