Code

grpc plugin: Make RPC call implementation more modular.
authorSebastian Harl <sh@tokkee.org>
Mon, 2 Nov 2015 23:03:11 +0000 (00:03 +0100)
committerSebastian Harl <sh@tokkee.org>
Mon, 30 May 2016 21:44:19 +0000 (23:44 +0200)
Use a template class for the RPC call object implementation to handle all
common functionality. The actual RPC implementation has been moved into
overloaded functions, one for each RPC. This approach only works as long as
the request and response type pairs are unique per call. That's common
practice, though, and in case there's an exception, it would have to fall back
to the previous approach (one class per call).

src/grpc.cc

index abdd6b02e3b710423e92fac9a9924048ae46c36b..2f79f42d2b75ae4f3f119eac275b9d098fc65d5e 100644 (file)
@@ -49,6 +49,13 @@ extern "C" {
        static size_t listeners_num;
 }
 
+using collectd::Collectd;
+
+using collectd::DispatchValuesRequest;
+using collectd::DispatchValuesReply;
+using collectd::ListValuesRequest;
+using collectd::ListValuesReply;
+
 using google::protobuf::util::TimeUtil;
 
 /*
@@ -138,13 +145,51 @@ static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg,
 } /* unmarshal_value_list() */
 
 /*
- * request call objects
+ * request call-backs and call objects
  */
 
+static grpc::Status Process(grpc::ServerContext *ctx,
+               DispatchValuesRequest request, DispatchValuesReply *reply)
+{
+       value_list_t vl = VALUE_LIST_INIT;
+       auto status = unmarshal_value_list(request.values(), &vl);
+       if (!status.ok())
+               return status;
+
+       if (plugin_dispatch_values(&vl))
+               status = grpc::Status(grpc::StatusCode::INTERNAL,
+                               grpc::string("failed to enqueue values for writing"));
+       return status;
+} /* Process(): DispatchValues */
+
+static grpc::Status Process(grpc::ServerContext *ctx,
+               ListValuesRequest request, ListValuesReply *reply)
+{
+       char **names = NULL;
+       cdtime_t *times = NULL;
+       size_t i, n = 0;
+
+       if (uc_get_names(&names, &times, &n))
+               return grpc::Status(grpc::StatusCode::INTERNAL,
+                               grpc::string("failed to retrieve values"));
+
+       for (i = 0; i < n; i++) {
+               auto v = reply->add_value();
+               auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(times[i]));
+               v->set_name(names[i]);
+               v->set_allocated_time(new google::protobuf::Timestamp(t));
+               sfree(names[i]);
+       }
+       sfree(names);
+       sfree(times);
+
+       return grpc::Status::OK;
+} /* Process(): ListValues */
+
 class Call
 {
 public:
-       Call(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
+       Call(Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
                : service_(service), cq_(cq), status_(CREATE)
        { }
 
@@ -172,7 +217,7 @@ protected:
        virtual void Process() = 0;
        virtual void Finish() = 0;
 
-       collectd::Collectd::AsyncService *service_;
+       Collectd::AsyncService *service_;
        grpc::ServerCompletionQueue *cq_;
        grpc::ServerContext ctx_;
 
@@ -181,40 +226,35 @@ private:
        CallStatus status_;
 }; /* class Call */
 
-class DispatchValuesCall : public Call
+template<typename RequestT, typename ReplyT>
+class RpcCall final : public Call
 {
+       typedef void (Collectd::AsyncService::*CreatorT)(grpc::ServerContext *,
+                       RequestT *, grpc::ServerAsyncResponseWriter<ReplyT> *,
+                       grpc::CompletionQueue *, grpc::ServerCompletionQueue *, void *);
+
 public:
-       DispatchValuesCall(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
-               : Call(service, cq), responder_(&ctx_)
+       RpcCall(Collectd::AsyncService *service,
+                       CreatorT creator, grpc::ServerCompletionQueue *cq)
+               : Call(service, cq), creator_(creator), responder_(&ctx_)
        {
                Handle();
-       } /* DispatchValuesCall() */
+       } /* RpcCall() */
 
-       virtual ~DispatchValuesCall()
+       virtual ~RpcCall()
        { }
 
 private:
        void Create()
        {
-               service_->RequestDispatchValues(&ctx_, &request_, &responder_, cq_, cq_, this);
+               (service_->*creator_)(&ctx_, &request_, &responder_, cq_, cq_, this);
        } /* Create() */
 
        void Process()
        {
                // Add a new request object to the queue.
-               new DispatchValuesCall(service_, cq_);
-
-               value_list_t vl = VALUE_LIST_INIT;
-               auto status = unmarshal_value_list(request_.values(), &vl);
-               if (!status.ok()) {
-                       responder_.Finish(reply_, status, this);
-                       return;
-               }
-
-               if (plugin_dispatch_values(&vl))
-                       status = grpc::Status(grpc::StatusCode::INTERNAL,
-                                       grpc::string("failed to enqueue values for writing"));
-
+               new RpcCall<RequestT, ReplyT>(service_, creator_, cq_);
+               grpc::Status status = ::Process(&ctx_, request_, &reply_);
                responder_.Finish(reply_, status, this);
        } /* Process() */
 
@@ -223,67 +263,13 @@ private:
                delete this;
        } /* Finish() */
 
-       collectd::DispatchValuesRequest request_;
-       collectd::DispatchValuesReply reply_;
-
-       grpc::ServerAsyncResponseWriter<collectd::DispatchValuesReply> responder_;
-}; /* class DispatchValuesCall */
-
-class ListValuesCall : public Call
-{
-public:
-       ListValuesCall(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
-               : Call(service, cq), responder_(&ctx_)
-       {
-               Handle();
-       } /* ListValuesCall() */
-
-       virtual ~ListValuesCall()
-       { }
-
-private:
-       void Create()
-       {
-               service_->RequestListValues(&ctx_, &request_, &responder_, cq_, cq_, this);
-       } /* Create() */
-
-       void Process()
-       {
-               new ListValuesCall(service_, cq_);
-
-               char **names = NULL;
-               cdtime_t *times = NULL;
-               size_t i, n = 0;
-
-               auto status = grpc::Status::OK;
-               if (uc_get_names(&names, &times, &n)) {
-                       status = grpc::Status(grpc::StatusCode::INTERNAL,
-                                       grpc::string("failed to retrieve values"));
-               }
-
-               for (i = 0; i < n; i++) {
-                       auto v = reply_.add_value();
-                       auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(times[i]));
-                       v->set_name(names[i]);
-                       v->set_allocated_time(new google::protobuf::Timestamp(t));
-                       sfree(names[i]);
-               }
-               sfree(names);
-               sfree(times);
-
-               responder_.Finish(reply_, status, this);
-       } /* Process() */
-
-       void Finish()
-       {
-               delete this;
-       } /* Finish() */
+       CreatorT creator_;
 
-       collectd::ListValuesRequest request_;
-       collectd::ListValuesReply reply_;
+       RequestT request_;
+       ReplyT reply_;
 
-       grpc::ServerAsyncResponseWriter<collectd::ListValuesReply> responder_;
-}; /* class ListValuesCall */
+       grpc::ServerAsyncResponseWriter<ReplyT> responder_;
+}; /* class RpcCall */
 
 /*
  * gRPC server implementation
@@ -329,8 +315,10 @@ public:
        void Mainloop()
        {
                // Register request types.
-               new DispatchValuesCall(&service_, cq_.get());
-               new ListValuesCall(&service_, cq_.get());
+               new RpcCall<DispatchValuesRequest, DispatchValuesReply>(&service_,
+                               &Collectd::AsyncService::RequestDispatchValues, cq_.get());
+               new RpcCall<ListValuesRequest, ListValuesReply>(&service_,
+                               &Collectd::AsyncService::RequestListValues, cq_.get());
 
                while (true) {
                        void *req = NULL;
@@ -348,7 +336,7 @@ public:
        } /* Mainloop() */
 
 private:
-       collectd::Collectd::AsyncService service_;
+       Collectd::AsyncService service_;
 
        std::unique_ptr<grpc::Server> server_;
        std::unique_ptr<grpc::ServerCompletionQueue> cq_;