summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: 6102042)
raw | patch | inline | side by side (parent: 6102042)
author | Florian Forster <octo@collectd.org> | |
Tue, 2 Aug 2016 06:14:24 +0000 (08:14 +0200) | ||
committer | Florian Forster <octo@collectd.org> | |
Wed, 10 Aug 2016 13:02:40 +0000 (15:02 +0200) |
proto/collectd.proto | patch | blob | history | |
src/grpc.cc | patch | blob | history |
diff --git a/proto/collectd.proto b/proto/collectd.proto
index 9ea73b2e9e85f87bc0deb8caf0449d30936a96b7..24aa52b7d1039416e8f967d1e68970eb3dfb553b 100644 (file)
--- a/proto/collectd.proto
+++ b/proto/collectd.proto
service Collectd {
// Query a list of values available from collectd's value cache.
- rpc QueryValues(QueryValuesRequest) returns (QueryValuesResponse);
+ rpc QueryValues(QueryValuesRequest) returns (stream QueryValuesResponse);
}
service Dispatch {
// The response from QueryValues.
message QueryValuesResponse {
- repeated collectd.types.ValueList value_lists = 1;
+ collectd.types.ValueList value_list = 1;
}
diff --git a/src/grpc.cc b/src/grpc.cc
index 9c4f2589f53289b0ce9df592ddc0be23db175508..aeb1c6d058a4fd65fbfef62ef2a0860c8aa720f6 100644 (file)
--- a/src/grpc.cc
+++ b/src/grpc.cc
#include <fstream>
#include <iostream>
+#include <queue>
#include <vector>
#include "collectd.grpc.pb.h"
@@ -275,61 +276,6 @@ static grpc::Status DispatchValue(grpc::ServerContext *ctx, DispatchValuesReques
return status;
} /* grpc::Status DispatchValue */
-static grpc::Status QueryValues(grpc::ServerContext *ctx, QueryValuesRequest req, QueryValuesResponse *res)
-{
- uc_iter_t *iter;
- char *name = NULL;
-
- value_list_t matcher;
- auto status = unmarshal_ident(req.identifier(), &matcher, false);
- if (!status.ok())
- return status;
-
- if ((iter = uc_get_iterator()) == NULL) {
- return grpc::Status(grpc::StatusCode::INTERNAL,
- grpc::string("failed to query values: cannot create iterator"));
- }
-
- status = grpc::Status::OK;
- while (uc_iterator_next(iter, &name) == 0) {
- value_list_t vl;
- if (parse_identifier_vl(name, &vl) != 0) {
- status = grpc::Status(grpc::StatusCode::INTERNAL,
- grpc::string("failed to parse identifier"));
- break;
- }
-
- if (!ident_matches(&vl, &matcher))
- continue;
-
- if (uc_iterator_get_time(iter, &vl.time) < 0) {
- status = grpc::Status(grpc::StatusCode::INTERNAL,
- grpc::string("failed to retrieve value timestamp"));
- break;
- }
- if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
- status = grpc::Status(grpc::StatusCode::INTERNAL,
- grpc::string("failed to retrieve value interval"));
- break;
- }
- if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
- status = grpc::Status(grpc::StatusCode::INTERNAL,
- grpc::string("failed to retrieve values"));
- break;
- }
-
- auto pb_vl = res->add_value_lists();
- status = marshal_value_list(&vl, pb_vl);
- free(vl.values);
- if (!status.ok())
- break;
- }
-
- uc_iterator_destroy(iter);
-
- return status;
-} /* grpc::Status QueryValues */
-
// CallData is the abstract base class for asynchronous calls.
class CallData {
public:
/*
* Collectd service
*/
+
// QueryValuesCallData holds the state and implements the logic for QueryValues calls.
class QueryValuesCallData : public CallData {
public:
QueryValuesCallData(Collectd::AsyncService* service, grpc::ServerCompletionQueue* cq)
: cq_(cq), service_(service), writer_(&context_) {
- // As part of the initialization, we *request* that the system start
- // processing QueryValues requests. In this request, "this" acts as
- // the tag uniquely identifying the request (so that different
- // QueryValuesCallData instances can serve different requests
- // concurrently), in this case the memory address of this
- // QueryValuesCallData instance.
- service->RequestQueryValues(&context_, &request_, &writer_, cq_, cq_, this);
+ process(true);
}
void process(bool ok) final {
- if (done_) {
+ if (status_ == Status::INIT) {
+ status_ = Status::READ;
+ service_->RequestQueryValues(&context_, &request_, &writer_, cq_, cq_, this);
+ } else if (status_ == Status::READ) {
+ auto err = queryValues();
+ if (!err.ok()) {
+ writer_.Finish(err, this);
+ status_ = Status::DONE;
+ return;
+ }
+ respond();
+ } else if (status_ == Status::WRITE) {
+ respond();
+ } else if (status_ == Status::DONE) {
+ new QueryValuesCallData(service_, cq_);
delete this;
} else {
- // Spawn a new QueryValuesCallData instance to serve new clients
- // while we process the one for this QueryValuesCallData. The
- // instance will deallocate itself as part of its FINISH state.
- new QueryValuesCallData(service_, cq_);
+ throw std::logic_error("Unhandled state enum.");
+ }
+ }
+
+private:
+ enum class Status {
+ INIT,
+ READ,
+ WRITE,
+ DONE,
+ };
+
+ grpc::Status queryValues (void) {
+ uc_iter_t *iter;
+ char *name = NULL;
+
+ value_list_t matcher;
+ auto status = unmarshal_ident(request_.identifier(), &matcher, false);
+ if (!status.ok())
+ return status;
+
+ if ((iter = uc_get_iterator()) == NULL) {
+ return grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to query values: cannot create iterator"));
+ }
+
+ status = grpc::Status::OK;
+ while (uc_iterator_next(iter, &name) == 0) {
+ value_list_t vl;
+ if (parse_identifier_vl(name, &vl) != 0) {
+ status = grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to parse identifier"));
+ break;
+ }
+
+ if (!ident_matches(&vl, &matcher))
+ continue;
- auto status = QueryValues(&context_, request_, &response_);
- if (!status.ok()) {
- writer_.FinishWithError(status, this);
- } else {
- writer_.Finish(response_, grpc::Status::OK, this);
+ if (uc_iterator_get_time(iter, &vl.time) < 0) {
+ status = grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to retrieve value timestamp"));
+ break;
+ }
+ if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
+ status = grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to retrieve value interval"));
+ break;
+ }
+ if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
+ status = grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to retrieve values"));
+ break;
}
- done_ = true;
+ value_lists_.push(vl);
}
+
+ uc_iterator_destroy(iter);
+ return status;
}
-private:
- bool done_ = false;
+ void respond() {
+ if (value_lists_.empty()) {
+ writer_.Finish(grpc::Status::OK, this);
+ status_ = Status::DONE;
+ return;
+ }
+
+ auto vl = value_lists_.front();
+
+ response_.Clear();
+ auto err = marshal_value_list(&vl, response_.mutable_value_list());
+ if (!err.ok()) {
+ writer_.Finish(err, this);
+ status_ = Status::DONE;
+ return;
+ }
+
+ value_lists_.pop();
+ free(vl.values);
+
+ writer_.Write(response_, this);
+ status_ = Status::WRITE;
+ }
+
+ Status status_ = Status::INIT;
grpc::ServerContext context_;
grpc::ServerCompletionQueue* cq_;
Collectd::AsyncService* service_;
QueryValuesRequest request_;
+ std::queue<value_list_t> value_lists_;
QueryValuesResponse response_;
- grpc::ServerAsyncResponseWriter<QueryValuesResponse> writer_;
+ grpc::ServerAsyncWriter<QueryValuesResponse> writer_;
};
/*