Code

grpc plugin: Add a plugin providing a gRPC server.
[collectd.git] / src / grpc.cc
1 /**
2  * collectd - src/grpc.cc
3  * Copyright (C) 2015 Sebastian Harl
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Sebastian Harl <sh at tokkee.org>
25  **/
27 #include <grpc++/grpc++.h>
28 #include <google/protobuf/util/time_util.h>
30 #include "collectd.grpc.pb.h"
32 extern "C" {
33 #include <stdbool.h>
34 #include <pthread.h>
36 #include "collectd.h"
37 #include "common.h"
38 #include "configfile.h"
39 #include "plugin.h"
40 }
42 using google::protobuf::util::TimeUtil;
44 /*
45  * proto conversion
46  */
48 static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, value_list_t *vl)
49 {
50         vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
51         vl->interval = NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
53         std::string s;
55         s = msg.host();
56         if (!s.length())
57                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
58                                 grpc::string("missing host name"));
59         sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
61         s = msg.plugin();
62         if (!s.length())
63                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
64                                 grpc::string("missing plugin name"));
65         sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
67         s = msg.type();
68         if (!s.length())
69                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
70                                 grpc::string("missing type name"));
71         sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
73         s = msg.plugin_instance();
74         if (s.length())
75                 sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
77         s = msg.type_instance();
78         if (s.length())
79                 sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
81         value_t *values = NULL;
82         size_t values_len = 0;
83         auto status = grpc::Status::OK;
85         for (auto v : msg.value()) {
86                 value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
87                 if (!val) {
88                         status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
89                                         grpc::string("failed to allocate values array"));
90                         break;
91                 }
93                 values = val;
94                 val = values + values_len;
95                 values_len++;
97                 switch (v.value_case()) {
98                 case collectd::types::Value::ValueCase::kCounter:
99                         val->counter = counter_t(v.counter());
100                         break;
101                 case collectd::types::Value::ValueCase::kGauge:
102                         val->gauge = gauge_t(v.gauge());
103                         break;
104                 case collectd::types::Value::ValueCase::kDerive:
105                         val->derive = derive_t(v.derive());
106                         break;
107                 case collectd::types::Value::ValueCase::kAbsolute:
108                         val->absolute = absolute_t(v.absolute());
109                         break;
110                 default:
111                         status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
112                                         grpc::string("unkown value type"));
113                         break;
114                 }
116                 if (!status.ok())
117                         break;
118         }
119         if (status.ok()) {
120                 vl->values = values;
121                 vl->values_len = values_len;
122         }
123         else if (values) {
124                 free(values);
125         }
127         return status;
128 } /* unmarshal_value_list() */
130 /*
131  * request call objects
132  */
134 class Call
136 public:
137         Call(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
138                 : service_(service), cq_(cq), status_(CREATE)
139         { }
141         virtual ~Call()
142         { }
144         void Handle()
145         {
146                 if (status_ == CREATE) {
147                         Create();
148                         status_ = PROCESS;
149                 }
150                 else if (status_ == PROCESS) {
151                         Process();
152                         status_ = FINISH;
153                 }
154                 else {
155                         GPR_ASSERT(status_ == FINISH);
156                         Finish();
157                 }
158         } /* Handle() */
160 protected:
161         virtual void Create() = 0;
162         virtual void Process() = 0;
163         virtual void Finish() = 0;
165         collectd::Collectd::AsyncService *service_;
166         grpc::ServerCompletionQueue *cq_;
167         grpc::ServerContext ctx_;
169 private:
170         enum CallStatus { CREATE, PROCESS, FINISH };
171         CallStatus status_;
172 }; /* class Call */
174 class DispatchValuesCall : public Call
176 public:
177         DispatchValuesCall(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
178                 : Call(service, cq), responder_(&ctx_)
179         {
180                 Handle();
181         } /* DispatchValuesCall() */
183         virtual ~DispatchValuesCall()
184         { }
186 private:
187         void Create()
188         {
189                 service_->RequestDispatchValues(&ctx_, &request_, &responder_, cq_, cq_, this);
190         } /* Create() */
192         void Process()
193         {
194                 // Add a new request object to the queue.
195                 new DispatchValuesCall(service_, cq_);
197                 value_list_t vl = VALUE_LIST_INIT;
198                 auto status = unmarshal_value_list(request_.values(), &vl);
199                 if (!status.ok()) {
200                         responder_.Finish(reply_, status, this);
201                         return;
202                 }
204                 if (plugin_dispatch_values(&vl))
205                         status = grpc::Status(grpc::StatusCode::INTERNAL,
206                                         grpc::string("failed to enqueue values for writing"));
208                 responder_.Finish(reply_, status, this);
209         } /* Process() */
211         void Finish()
212         {
213                 delete this;
214         } /* Finish() */
216         collectd::DispatchValuesRequest request_;
217         collectd::DispatchValuesReply reply_;
219         grpc::ServerAsyncResponseWriter<collectd::DispatchValuesReply> responder_;
220 };
222 /*
223  * gRPC server implementation
224  */
226 class CollectdServer final
228 public:
229         void Start()
230         {
231                 // TODO: make configurable
232                 std::string addr("0.0.0.0:50051");
234                 // TODO: make configurable
235                 auto auth = grpc::InsecureServerCredentials();
237                 grpc::ServerBuilder builder;
238                 builder.AddListeningPort(addr, auth);
239                 builder.RegisterAsyncService(&service_);
240                 cq_ = builder.AddCompletionQueue();
241                 server_ = builder.BuildAndStart();
243                 INFO("grpc: Listening on %s", addr.c_str());
244         } /* Start() */
246         void Shutdown()
247         {
248                 server_->Shutdown();
249                 cq_->Shutdown();
250         } /* Shutdown() */
252         void Mainloop()
253         {
254                 // Register request types.
255                 new DispatchValuesCall(&service_, cq_.get());
257                 while (true) {
258                         void *req = NULL;
259                         bool ok = false;
261                         if (!cq_->Next(&req, &ok))
262                                 break; // Queue shut down.
263                         if (!ok) {
264                                 ERROR("grpc: Failed to read from queue");
265                                 break;
266                         }
268                         static_cast<Call *>(req)->Handle();
269                 }
270         } /* Mainloop() */
272 private:
273         collectd::Collectd::AsyncService service_;
275         std::unique_ptr<grpc::Server> server_;
276         std::unique_ptr<grpc::ServerCompletionQueue> cq_;
277 }; /* class CollectdServer */
279 static CollectdServer *server = nullptr;
281 /*
282  * collectd plugin interface
283  */
285 extern "C" {
286         static pthread_t *workers;
287         static size_t workers_num;
289         static void *worker_thread(void *arg)
290         {
291                 CollectdServer *s = (CollectdServer *)arg;
292                 s->Mainloop();
293                 return NULL;
294         } /* worker_thread() */
296         static int c_grpc_init(void)
297         {
298                 server = new CollectdServer();
299                 size_t i;
301                 if (! server) {
302                         ERROR("grpc: Failed to create server");
303                         return -1;
304                 }
306                 workers = (pthread_t *)calloc(5, sizeof(*workers));
307                 if (! workers) {
308                         delete server;
309                         server = nullptr;
311                         ERROR("grpc: Failed to allocate worker threads");
312                         return -1;
313                 }
314                 workers_num = 5;
316                 server->Start();
317                 for (i = 0; i < workers_num; i++) {
318                         pthread_create(&workers[i], /* attr = */ NULL,
319                                         worker_thread, server);
320                 }
321                 INFO("grpc: Started %zu workers", workers_num);
322                 return 0;
323         } /* c_grpc_init() */
325         static int c_grpc_shutdown(void)
326         {
327                 size_t i;
329                 if (!server)
330                         return -1;
332                 server->Shutdown();
334                 INFO("grpc: Waiting for %zu workers to terminate", workers_num);
335                 for (i = 0; i < workers_num; i++)
336                         pthread_join(workers[i], NULL);
337                 free(workers);
338                 workers = NULL;
339                 workers_num = 0;
341                 delete server;
342                 server = nullptr;
344                 return 0;
345         } /* c_grpc_shutdown() */
347         void module_register(void)
348         {
349                 plugin_register_init("grpc", c_grpc_init);
350                 plugin_register_shutdown("grpc", c_grpc_shutdown);
351         } /* module_register() */
352 } /* extern "C" */
354 /* vim: set sw=4 ts=4 tw=78 noexpandtab : */