eb9e4e85a25feaaec369697fd3b073aad203cb9d
1 /**
2 * collectd - src/grpc.cc
3 * Copyright (C) 2015 Sebastian Harl
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a
6 * copy of this software and associated documentation files (the "Software"),
7 * to deal in the Software without restriction, including without limitation
8 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9 * and/or sell copies of the Software, and to permit persons to whom the
10 * Software is furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21 * DEALINGS IN THE SOFTWARE.
22 *
23 * Authors:
24 * Sebastian Harl <sh at tokkee.org>
25 **/
27 #include <grpc++/grpc++.h>
28 #include <google/protobuf/util/time_util.h>
30 #include "collectd.grpc.pb.h"
32 extern "C" {
33 #include <stdbool.h>
34 #include <pthread.h>
36 #include "collectd.h"
37 #include "common.h"
38 #include "configfile.h"
39 #include "plugin.h"
40 }
42 using google::protobuf::util::TimeUtil;
44 /*
45 * proto conversion
46 */
48 static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, value_list_t *vl)
49 {
50 vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
51 vl->interval = NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
53 std::string s;
55 s = msg.host();
56 if (!s.length())
57 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
58 grpc::string("missing host name"));
59 sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
61 s = msg.plugin();
62 if (!s.length())
63 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
64 grpc::string("missing plugin name"));
65 sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
67 s = msg.type();
68 if (!s.length())
69 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
70 grpc::string("missing type name"));
71 sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
73 s = msg.plugin_instance();
74 if (s.length())
75 sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
77 s = msg.type_instance();
78 if (s.length())
79 sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
81 value_t *values = NULL;
82 size_t values_len = 0;
83 auto status = grpc::Status::OK;
85 for (auto v : msg.value()) {
86 value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
87 if (!val) {
88 status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
89 grpc::string("failed to allocate values array"));
90 break;
91 }
93 values = val;
94 val = values + values_len;
95 values_len++;
97 switch (v.value_case()) {
98 case collectd::types::Value::ValueCase::kCounter:
99 val->counter = counter_t(v.counter());
100 break;
101 case collectd::types::Value::ValueCase::kGauge:
102 val->gauge = gauge_t(v.gauge());
103 break;
104 case collectd::types::Value::ValueCase::kDerive:
105 val->derive = derive_t(v.derive());
106 break;
107 case collectd::types::Value::ValueCase::kAbsolute:
108 val->absolute = absolute_t(v.absolute());
109 break;
110 default:
111 status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
112 grpc::string("unkown value type"));
113 break;
114 }
116 if (!status.ok())
117 break;
118 }
119 if (status.ok()) {
120 vl->values = values;
121 vl->values_len = values_len;
122 }
123 else if (values) {
124 free(values);
125 }
127 return status;
128 } /* unmarshal_value_list() */
130 /*
131 * request call objects
132 */
134 class Call
135 {
136 public:
137 Call(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
138 : service_(service), cq_(cq), status_(CREATE)
139 { }
141 virtual ~Call()
142 { }
144 void Handle()
145 {
146 if (status_ == CREATE) {
147 Create();
148 status_ = PROCESS;
149 }
150 else if (status_ == PROCESS) {
151 Process();
152 status_ = FINISH;
153 }
154 else {
155 GPR_ASSERT(status_ == FINISH);
156 Finish();
157 }
158 } /* Handle() */
160 protected:
161 virtual void Create() = 0;
162 virtual void Process() = 0;
163 virtual void Finish() = 0;
165 collectd::Collectd::AsyncService *service_;
166 grpc::ServerCompletionQueue *cq_;
167 grpc::ServerContext ctx_;
169 private:
170 enum CallStatus { CREATE, PROCESS, FINISH };
171 CallStatus status_;
172 }; /* class Call */
174 class DispatchValuesCall : public Call
175 {
176 public:
177 DispatchValuesCall(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
178 : Call(service, cq), responder_(&ctx_)
179 {
180 Handle();
181 } /* DispatchValuesCall() */
183 virtual ~DispatchValuesCall()
184 { }
186 private:
187 void Create()
188 {
189 service_->RequestDispatchValues(&ctx_, &request_, &responder_, cq_, cq_, this);
190 } /* Create() */
192 void Process()
193 {
194 // Add a new request object to the queue.
195 new DispatchValuesCall(service_, cq_);
197 value_list_t vl = VALUE_LIST_INIT;
198 auto status = unmarshal_value_list(request_.values(), &vl);
199 if (!status.ok()) {
200 responder_.Finish(reply_, status, this);
201 return;
202 }
204 if (plugin_dispatch_values(&vl))
205 status = grpc::Status(grpc::StatusCode::INTERNAL,
206 grpc::string("failed to enqueue values for writing"));
208 responder_.Finish(reply_, status, this);
209 } /* Process() */
211 void Finish()
212 {
213 delete this;
214 } /* Finish() */
216 collectd::DispatchValuesRequest request_;
217 collectd::DispatchValuesReply reply_;
219 grpc::ServerAsyncResponseWriter<collectd::DispatchValuesReply> responder_;
220 };
222 /*
223 * gRPC server implementation
224 */
226 class CollectdServer final
227 {
228 public:
229 void Start()
230 {
231 // TODO: make configurable
232 std::string addr("0.0.0.0:50051");
234 // TODO: make configurable
235 auto auth = grpc::InsecureServerCredentials();
237 grpc::ServerBuilder builder;
238 builder.AddListeningPort(addr, auth);
239 builder.RegisterAsyncService(&service_);
240 cq_ = builder.AddCompletionQueue();
241 server_ = builder.BuildAndStart();
243 INFO("grpc: Listening on %s", addr.c_str());
244 } /* Start() */
246 void Shutdown()
247 {
248 server_->Shutdown();
249 cq_->Shutdown();
250 } /* Shutdown() */
252 void Mainloop()
253 {
254 // Register request types.
255 new DispatchValuesCall(&service_, cq_.get());
257 while (true) {
258 void *req = NULL;
259 bool ok = false;
261 if (!cq_->Next(&req, &ok))
262 break; // Queue shut down.
263 if (!ok) {
264 ERROR("grpc: Failed to read from queue");
265 break;
266 }
268 static_cast<Call *>(req)->Handle();
269 }
270 } /* Mainloop() */
272 private:
273 collectd::Collectd::AsyncService service_;
275 std::unique_ptr<grpc::Server> server_;
276 std::unique_ptr<grpc::ServerCompletionQueue> cq_;
277 }; /* class CollectdServer */
279 static CollectdServer *server = nullptr;
281 /*
282 * collectd plugin interface
283 */
285 extern "C" {
286 static pthread_t *workers;
287 static size_t workers_num;
289 static void *worker_thread(void *arg)
290 {
291 CollectdServer *s = (CollectdServer *)arg;
292 s->Mainloop();
293 return NULL;
294 } /* worker_thread() */
296 static int c_grpc_init(void)
297 {
298 server = new CollectdServer();
299 size_t i;
301 if (! server) {
302 ERROR("grpc: Failed to create server");
303 return -1;
304 }
306 workers = (pthread_t *)calloc(5, sizeof(*workers));
307 if (! workers) {
308 delete server;
309 server = nullptr;
311 ERROR("grpc: Failed to allocate worker threads");
312 return -1;
313 }
314 workers_num = 5;
316 server->Start();
317 for (i = 0; i < workers_num; i++) {
318 pthread_create(&workers[i], /* attr = */ NULL,
319 worker_thread, server);
320 }
321 INFO("grpc: Started %zu workers", workers_num);
322 return 0;
323 } /* c_grpc_init() */
325 static int c_grpc_shutdown(void)
326 {
327 size_t i;
329 if (!server)
330 return -1;
332 server->Shutdown();
334 INFO("grpc: Waiting for %zu workers to terminate", workers_num);
335 for (i = 0; i < workers_num; i++)
336 pthread_join(workers[i], NULL);
337 free(workers);
338 workers = NULL;
339 workers_num = 0;
341 delete server;
342 server = nullptr;
344 return 0;
345 } /* c_grpc_shutdown() */
347 void module_register(void)
348 {
349 plugin_register_init("grpc", c_grpc_init);
350 plugin_register_shutdown("grpc", c_grpc_shutdown);
351 } /* module_register() */
352 } /* extern "C" */
354 /* vim: set sw=4 ts=4 tw=78 noexpandtab : */