From 408b3fc30cabf109333b2b86caf5edf47f5b82f5 Mon Sep 17 00:00:00 2001 From: Sebastian Harl Date: Thu, 29 Oct 2015 22:37:53 +0100 Subject: [PATCH] grpc plugin: Add a plugin providing a gRPC server. gRPC is an open source RPC framework based on protocol buffers. The plugin currently exposes one RPC end-point for dispatching values to the daemon but is easy to extend for more functionality. The plugin is written in C++ and uses an synchronous architecture which should provide maximum performance. The collectd build system has been extended to support C++ (11) and protocol buffers (3.0+). --- .gitignore | 3 + Makefile.am | 2 +- configure.ac | 100 +++++++++++- proto/Makefile.am | 1 + proto/collectd.proto | 43 +++++ proto/types.proto | 52 +++++++ src/Makefile.am | 35 +++++ src/daemon/collectd.h | 2 +- src/grpc.cc | 354 ++++++++++++++++++++++++++++++++++++++++++ 9 files changed, 587 insertions(+), 5 deletions(-) create mode 100644 proto/Makefile.am create mode 100644 proto/collectd.proto create mode 100644 proto/types.proto create mode 100644 src/grpc.cc diff --git a/.gitignore b/.gitignore index b14cc7ab..2d3a3368 100644 --- a/.gitignore +++ b/.gitignore @@ -52,6 +52,9 @@ src/liboconfig/scanner.c # protobuf stuff: src/*.pb-c.[ch] +src/*.grpc.pb.cc +src/*.pb.cc +src/*.pb.h # make dist stuff: /collectd-*.tar.gz diff --git a/Makefile.am b/Makefile.am index b79ea1ca..c63893fe 100644 --- a/Makefile.am +++ b/Makefile.am @@ -6,7 +6,7 @@ if BUILD_INCLUDED_LTDL SUBDIRS += libltdl endif -SUBDIRS += src bindings . +SUBDIRS += proto src bindings . AM_CPPFLAGS = $(LTDLINCL) diff --git a/configure.ac b/configure.ac index e2149ba4..a6f29675 100644 --- a/configure.ac +++ b/configure.ac @@ -43,6 +43,7 @@ AC_SYS_LARGEFILE # Checks for programs. # AC_PROG_CC +AC_PROG_CXX AC_PROG_CPP AC_PROG_EGREP AC_PROG_INSTALL @@ -73,6 +74,25 @@ then AC_MSG_ERROR([bison is missing and you do not have ${srcdir}/src/liboconfig/parser.c. Please install bison]) fi +AC_PATH_PROG([PROTOC], [protoc]) +have_protoc3="no" +if test "x$PROTOC" != "x"; then + AC_MSG_CHECKING([for protoc 3.0.0+]) + if $PROTOC --version | grep -q libprotoc.3; then + protoc3="yes (`$PROTOC --version`)" + have_protoc3="yes" + else + protoc3="no (`$PROTOC --version`)" + fi + AC_MSG_RESULT([$protoc3]) +fi +AC_SUBST([PROTOC]) +AM_CONDITIONAL(HAVE_PROTOC3, test "x$have_protoc3" = "xyes") + +AC_PATH_PROG([GRPC_CPP_PLUGIN], [grpc_cpp_plugin]) +AC_SUBST([GRPC_CPP_PLUGIN]) +AM_CONDITIONAL(HAVE_GRPC_CPP, test "x$GRPC_CPP_PLUGIN" != "x") + AC_CHECK_PROG([have_protoc_c], [protoc-c], [yes], [no]) if test "x$have_protoc_c" = "xno" then @@ -779,6 +799,16 @@ AC_CHECK_FUNCS(gettimeofday select strdup strtol getaddrinfo getnameinfo strchr AC_FUNC_STRERROR_R +test_cxx_flags() { + AC_LANG_PUSH(C++) + AC_LANG_CONFTEST([int main(void){}]) + $CXX -c conftest.cpp $CXXFLAGS $@ > /dev/null 2> /dev/null + ret=$? + rm -f conftest.o + AC_LANG_POP(C++) + return $ret +} + SAVE_CFLAGS="$CFLAGS" # Emulate behavior of src/Makefile.am if test "x$GCC" = "xyes" @@ -2210,6 +2240,62 @@ AC_SUBST(GCRYPT_LIBS) AM_CONDITIONAL(BUILD_WITH_LIBGCRYPT, test "x$with_libgcrypt" = "xyes") # }}} +# --with-grpc {{{ +AC_ARG_WITH(grpc, [AS_HELP_STRING([--without-grpc], [Disable gRPC (default: autodetect).])], +[ + with_grpc="$withval" +], +[ + with_grpc="yes" +]) + +if test "x$with_grpc" = "xyes" +then + if test "x$have_protoc3" != "xyes" + then + with_grpc="no (requires protoc 3.0.0+)" + else if test "x$GRPC_CPP_PLUGIN" = "x" + then + with_grpc"no (requires grpc_cpp_plugin)" + fi; fi +fi + +if test "x$with_grpc" = "xyes" +then + AC_MSG_CHECKING([whether $CXX accepts -std=c++11]) + if test_cxx_flags -std=c++11; then + AC_MSG_RESULT([yes]) + else + AC_MSG_RESULT([no]) + with_grpc="no (requires C++11 support)" + fi +fi + +if test "x$with_grpc" = "xyes" +then + AC_LANG_PUSH(C++) + SAVE_CPPFLAGS="$CPPFLAGS" + SAVE_CXXFLAGS="$CXXFLAGS" + CPPFLAGS="$CPPFLAGS -std=c++11" + CXXFLAGS="$CXXFLAGS -std=c++11" + AC_CHECK_HEADERS([grpc++/grpc++.h], [], + [with_grpc="no (grpc++/grpc++.h not found)"]) + CPPFLAGS="$SAVE_CPPFLAGS" + CXXFLAGS="$SAVE_CXXFLAGS" + AC_LANG_POP(C++) +fi +with_libgrpc="no" +if test "x$with_grpc" = "xyes" +then + AC_LANG_PUSH(C++) + AC_CHECK_LIB([grpc], [grpc_register_plugin], + [with_libgrpc="yes"], + [with_grpc="no (libgrpc not found)"], + [-lgpr -lprotobuf]) + AC_LANG_POP(C++) +fi +# }}} + # --with-libiptc {{{ AC_ARG_WITH(libiptc, [AS_HELP_STRING([--with-libiptc@<:@=PREFIX@:>@], [Path to libiptc.])], [ @@ -5394,7 +5480,7 @@ AC_DEFUN( then enable_plugin="yes" else - enable_plugin="no" + enable_plugin="$2" fi else enable_plugin="$enable_all_plugins" @@ -5411,7 +5497,7 @@ AC_DEFUN( fi else # User passed "yes" but dependency checking yielded "no" => Dependency problem. dependency_error="yes" - enable_plugin="no (dependency error)" + enable_plugin="$2 (dependency error)" fi fi AM_CONDITIONAL([BUILD_PLUGIN_]my_toupper([$1]), test "x$enable_plugin" = "xyes") @@ -5871,6 +5957,7 @@ AC_PLUGIN([fhcount], [$plugin_fhcount], [File handles statis AC_PLUGIN([filecount], [yes], [Count files in directories]) AC_PLUGIN([fscache], [$plugin_fscache], [fscache statistics]) AC_PLUGIN([gmond], [$with_libganglia], [Ganglia plugin]) +AC_PLUGIN([grpc], [$with_grpc], [gRPC plugin]) AC_PLUGIN([hddtemp], [yes], [Query hddtempd]) AC_PLUGIN([interface], [$plugin_interface], [Interface traffic statistics]) AC_PLUGIN([ipc], [$plugin_ipc], [IPC statistics]) @@ -6128,13 +6215,16 @@ AC_SUBST(LCC_VERSION_STRING) AC_CONFIG_FILES(src/libcollectdclient/collectd/lcc_features.h) AM_CFLAGS="-Wall" +AM_CXXFLAGS="-Wall" if test "x$enable_werror" != "xno" then AM_CFLAGS="$AM_CFLAGS -Werror" + AM_CXXFLAGS="$AM_CFLAGS -Werror" fi AC_SUBST([AM_CFLAGS]) +AC_SUBST([AM_CXXFLAGS]) -AC_CONFIG_FILES([Makefile src/Makefile src/daemon/Makefile src/collectd.conf src/libcollectdclient/Makefile src/libcollectdclient/libcollectdclient.pc src/liboconfig/Makefile bindings/Makefile bindings/java/Makefile]) +AC_CONFIG_FILES([Makefile proto/Makefile src/Makefile src/daemon/Makefile src/collectd.conf src/libcollectdclient/Makefile src/libcollectdclient/libcollectdclient.pc src/liboconfig/Makefile bindings/Makefile bindings/java/Makefile]) AC_OUTPUT if test "x$with_librrd" = "xyes" \ @@ -6168,6 +6258,7 @@ Configuration: Platform . . . . . . $ac_system CC . . . . . . . . . $CC CFLAGS . . . . . . . $AM_CFLAGS $CFLAGS + CXXFLAGS . . . . . . $AM_CXXFLAGS $CXXFLAGS CPP . . . . . . . . . $CPP CPPFLAGS . . . . . . $CPPFLAGS LD . . . . . . . . . $LD @@ -6184,6 +6275,7 @@ Configuration: libesmtp . . . . . . $with_libesmtp libganglia . . . . . $with_libganglia libgcrypt . . . . . . $with_libgcrypt + libgrpc . . . . . . . $with_libgrpc libhal . . . . . . . $with_libhal libhiredis . . . . . $with_libhiredis libi2c-dev . . . . . $with_libi2c @@ -6231,6 +6323,7 @@ Configuration: libyajl . . . . . . . $with_libyajl oracle . . . . . . . $with_oracle protobuf-c . . . . . $have_protoc_c + protoc 3 . . . . . . $have_protoc3 python . . . . . . . $with_python Features: @@ -6274,6 +6367,7 @@ Configuration: filecount . . . . . . $enable_filecount fscache . . . . . . . $enable_fscache gmond . . . . . . . . $enable_gmond + grpc . . . . . . . . $enable_grpc hddtemp . . . . . . . $enable_hddtemp interface . . . . . . $enable_interface ipc . . . . . . . . . $enable_ipc diff --git a/proto/Makefile.am b/proto/Makefile.am new file mode 100644 index 00000000..3c0bfd7f --- /dev/null +++ b/proto/Makefile.am @@ -0,0 +1 @@ +EXTRA_DIST = collectd.proto types.proto diff --git a/proto/collectd.proto b/proto/collectd.proto new file mode 100644 index 00000000..84db755b --- /dev/null +++ b/proto/collectd.proto @@ -0,0 +1,43 @@ +// collectd - proto/collectd.proto +// Copyright (C) 2015 Sebastian Harl +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. +// +// Authors: +// Sebastian Harl + +syntax = "proto3"; + +package collectd; + +import "types.proto"; + +service Collectd { + // Dispatch collected values to collectd. + rpc DispatchValues(DispatchValuesRequest) returns (DispatchValuesReply); +} + +// The arguments to DispatchValues. +message DispatchValuesRequest { + collectd.types.ValueList values = 1; +} + +// The response from DispatchValues. +message DispatchValuesReply { +} diff --git a/proto/types.proto b/proto/types.proto new file mode 100644 index 00000000..7f3d329d --- /dev/null +++ b/proto/types.proto @@ -0,0 +1,52 @@ +// collectd - proto/types.proto +// Copyright (C) 2015 Sebastian Harl +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. +// +// Authors: +// Sebastian Harl + +syntax = "proto3"; + +package collectd.types; + +import "google/protobuf/duration.proto"; +import "google/protobuf/timestamp.proto"; + +message Value { + oneof value { + uint64 counter = 1; + double gauge = 2; + int64 derive = 3; + uint64 absolute = 4; + }; +} + +message ValueList { + repeated Value value = 1; + + google.protobuf.Timestamp time = 2; + google.protobuf.Duration interval = 3; + + string host = 4; + string plugin = 5; + string plugin_instance = 6; + string type = 7; + string type_instance = 8; +} diff --git a/src/Makefile.am b/src/Makefile.am index d54094f6..b8b2575d 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -17,6 +17,12 @@ endif AM_CPPFLAGS += -DPLUGINDIR='"${pkglibdir}"' AM_CPPFLAGS += -DPKGDATADIR='"${pkgdatadir}"' +AUTOMAKE_OPTIONS = subdir-objects + +V_PROTOC = $(v_protoc_@AM_V@) +v_protoc_ = $(v_protoc_@AM_DEFAULT_V@) +v_protoc_0 = @echo " PROTOC " $@; + noinst_LTLIBRARIES = check_PROGRAMS = TESTS = @@ -106,6 +112,24 @@ pkglib_LTLIBRARIES = BUILT_SOURCES = CLEANFILES = +if HAVE_PROTOC3 +if HAVE_GRPC_CPP +BUILT_SOURCES += collectd.grpc.pb.cc collectd.pb.cc types.pb.cc +CLEANFILES += collectd.grpc.pb.cc collectd.pb.cc types.pb.cc \ + collectd.grpc.pb.h collectd.pb.h types.pb.h + +collectd.grpc.pb.cc: $(top_srcdir)/proto/collectd.proto $(top_srcdir)/proto/types.proto + $(V_PROTOC)@PROTOC@ -I$(top_srcdir)/proto \ + --grpc_out=$(builddir) --plugin=protoc-gen-grpc=$(GRPC_CPP_PLUGIN) $< + +collectd.pb.cc: $(top_srcdir)/proto/collectd.proto $(top_srcdir)/proto/types.proto + $(V_PROTOC)@PROTOC@ -I$(top_srcdir)/proto --cpp_out=$(builddir) $< + +types.pb.cc: $(top_srcdir)/proto/types.proto + $(V_PROTOC)@PROTOC@ -I$(top_srcdir)/proto --cpp_out=$(builddir) $< +endif +endif + if BUILD_PLUGIN_AGGREGATION pkglib_LTLIBRARIES += aggregation.la aggregation_la_SOURCES = aggregation.c \ @@ -407,6 +431,17 @@ gmond_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(GANGLIA_LDFLAGS) gmond_la_LIBADD = $(GANGLIA_LIBS) endif +if BUILD_PLUGIN_GRPC +pkglib_LTLIBRARIES += grpc.la +grpc_la_SOURCES = grpc.cc \ + collectd.grpc.pb.cc collectd.pb.cc types.pb.cc +grpc_la_CPPFLAGS = $(AM_CPPFLAGS) -std=c++11 +grpc_la_CFLAGS = $(AM_CFLAGS) +grpc_la_CXXFLAGS = $(AM_CXXFLAGS) -std=c++11 +grpc_la_LDFLAGS = $(PLUGIN_LDFLAGS) +grpc_la_LIBADD = -lgrpc++_unsecure -lgrpc -lgpr -lprotobuf -lpthread -ldl +endif + if BUILD_PLUGIN_HDDTEMP pkglib_LTLIBRARIES += hddtemp.la hddtemp_la_SOURCES = hddtemp.c diff --git a/src/daemon/collectd.h b/src/daemon/collectd.h index 90021874..8fb29455 100644 --- a/src/daemon/collectd.h +++ b/src/daemon/collectd.h @@ -271,7 +271,7 @@ typedef int _Bool; #endif #ifndef COLLECTD_USERAGENT - # define COLLECTD_USERAGENT PACKAGE_NAME"/"PACKAGE_VERSION + # define COLLECTD_USERAGENT PACKAGE_NAME "/" PACKAGE_VERSION #endif /* Only enable __attribute__() for compilers known to support it. */ diff --git a/src/grpc.cc b/src/grpc.cc new file mode 100644 index 00000000..eb9e4e85 --- /dev/null +++ b/src/grpc.cc @@ -0,0 +1,354 @@ +/** + * collectd - src/grpc.cc + * Copyright (C) 2015 Sebastian Harl + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * Authors: + * Sebastian Harl + **/ + +#include +#include + +#include "collectd.grpc.pb.h" + +extern "C" { +#include +#include + +#include "collectd.h" +#include "common.h" +#include "configfile.h" +#include "plugin.h" +} + +using google::protobuf::util::TimeUtil; + +/* + * proto conversion + */ + +static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, value_list_t *vl) +{ + vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time())); + vl->interval = NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval())); + + std::string s; + + s = msg.host(); + if (!s.length()) + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + grpc::string("missing host name")); + sstrncpy(vl->host, s.c_str(), sizeof(vl->host)); + + s = msg.plugin(); + if (!s.length()) + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + grpc::string("missing plugin name")); + sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin)); + + s = msg.type(); + if (!s.length()) + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + grpc::string("missing type name")); + sstrncpy(vl->type, s.c_str(), sizeof(vl->type)); + + s = msg.plugin_instance(); + if (s.length()) + sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance)); + + s = msg.type_instance(); + if (s.length()) + sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance)); + + value_t *values = NULL; + size_t values_len = 0; + auto status = grpc::Status::OK; + + for (auto v : msg.value()) { + value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values)); + if (!val) { + status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, + grpc::string("failed to allocate values array")); + break; + } + + values = val; + val = values + values_len; + values_len++; + + switch (v.value_case()) { + case collectd::types::Value::ValueCase::kCounter: + val->counter = counter_t(v.counter()); + break; + case collectd::types::Value::ValueCase::kGauge: + val->gauge = gauge_t(v.gauge()); + break; + case collectd::types::Value::ValueCase::kDerive: + val->derive = derive_t(v.derive()); + break; + case collectd::types::Value::ValueCase::kAbsolute: + val->absolute = absolute_t(v.absolute()); + break; + default: + status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + grpc::string("unkown value type")); + break; + } + + if (!status.ok()) + break; + } + if (status.ok()) { + vl->values = values; + vl->values_len = values_len; + } + else if (values) { + free(values); + } + + return status; +} /* unmarshal_value_list() */ + +/* + * request call objects + */ + +class Call +{ +public: + Call(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq) + : service_(service), cq_(cq), status_(CREATE) + { } + + virtual ~Call() + { } + + void Handle() + { + if (status_ == CREATE) { + Create(); + status_ = PROCESS; + } + else if (status_ == PROCESS) { + Process(); + status_ = FINISH; + } + else { + GPR_ASSERT(status_ == FINISH); + Finish(); + } + } /* Handle() */ + +protected: + virtual void Create() = 0; + virtual void Process() = 0; + virtual void Finish() = 0; + + collectd::Collectd::AsyncService *service_; + grpc::ServerCompletionQueue *cq_; + grpc::ServerContext ctx_; + +private: + enum CallStatus { CREATE, PROCESS, FINISH }; + CallStatus status_; +}; /* class Call */ + +class DispatchValuesCall : public Call +{ +public: + DispatchValuesCall(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq) + : Call(service, cq), responder_(&ctx_) + { + Handle(); + } /* DispatchValuesCall() */ + + virtual ~DispatchValuesCall() + { } + +private: + void Create() + { + service_->RequestDispatchValues(&ctx_, &request_, &responder_, cq_, cq_, this); + } /* Create() */ + + void Process() + { + // Add a new request object to the queue. + new DispatchValuesCall(service_, cq_); + + value_list_t vl = VALUE_LIST_INIT; + auto status = unmarshal_value_list(request_.values(), &vl); + if (!status.ok()) { + responder_.Finish(reply_, status, this); + return; + } + + if (plugin_dispatch_values(&vl)) + status = grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("failed to enqueue values for writing")); + + responder_.Finish(reply_, status, this); + } /* Process() */ + + void Finish() + { + delete this; + } /* Finish() */ + + collectd::DispatchValuesRequest request_; + collectd::DispatchValuesReply reply_; + + grpc::ServerAsyncResponseWriter responder_; +}; + +/* + * gRPC server implementation + */ + +class CollectdServer final +{ +public: + void Start() + { + // TODO: make configurable + std::string addr("0.0.0.0:50051"); + + // TODO: make configurable + auto auth = grpc::InsecureServerCredentials(); + + grpc::ServerBuilder builder; + builder.AddListeningPort(addr, auth); + builder.RegisterAsyncService(&service_); + cq_ = builder.AddCompletionQueue(); + server_ = builder.BuildAndStart(); + + INFO("grpc: Listening on %s", addr.c_str()); + } /* Start() */ + + void Shutdown() + { + server_->Shutdown(); + cq_->Shutdown(); + } /* Shutdown() */ + + void Mainloop() + { + // Register request types. + new DispatchValuesCall(&service_, cq_.get()); + + while (true) { + void *req = NULL; + bool ok = false; + + if (!cq_->Next(&req, &ok)) + break; // Queue shut down. + if (!ok) { + ERROR("grpc: Failed to read from queue"); + break; + } + + static_cast(req)->Handle(); + } + } /* Mainloop() */ + +private: + collectd::Collectd::AsyncService service_; + + std::unique_ptr server_; + std::unique_ptr cq_; +}; /* class CollectdServer */ + +static CollectdServer *server = nullptr; + +/* + * collectd plugin interface + */ + +extern "C" { + static pthread_t *workers; + static size_t workers_num; + + static void *worker_thread(void *arg) + { + CollectdServer *s = (CollectdServer *)arg; + s->Mainloop(); + return NULL; + } /* worker_thread() */ + + static int c_grpc_init(void) + { + server = new CollectdServer(); + size_t i; + + if (! server) { + ERROR("grpc: Failed to create server"); + return -1; + } + + workers = (pthread_t *)calloc(5, sizeof(*workers)); + if (! workers) { + delete server; + server = nullptr; + + ERROR("grpc: Failed to allocate worker threads"); + return -1; + } + workers_num = 5; + + server->Start(); + for (i = 0; i < workers_num; i++) { + pthread_create(&workers[i], /* attr = */ NULL, + worker_thread, server); + } + INFO("grpc: Started %zu workers", workers_num); + return 0; + } /* c_grpc_init() */ + + static int c_grpc_shutdown(void) + { + size_t i; + + if (!server) + return -1; + + server->Shutdown(); + + INFO("grpc: Waiting for %zu workers to terminate", workers_num); + for (i = 0; i < workers_num; i++) + pthread_join(workers[i], NULL); + free(workers); + workers = NULL; + workers_num = 0; + + delete server; + server = nullptr; + + return 0; + } /* c_grpc_shutdown() */ + + void module_register(void) + { + plugin_register_init("grpc", c_grpc_init); + plugin_register_shutdown("grpc", c_grpc_shutdown); + } /* module_register() */ +} /* extern "C" */ + +/* vim: set sw=4 ts=4 tw=78 noexpandtab : */ -- 2.30.2