Code

grpc plugin: Add a plugin providing a gRPC server.
authorSebastian Harl <sh@tokkee.org>
Thu, 29 Oct 2015 21:37:53 +0000 (22:37 +0100)
committerSebastian Harl <sh@tokkee.org>
Mon, 30 May 2016 21:44:19 +0000 (23:44 +0200)
gRPC is an open source RPC framework based on protocol buffers. The plugin
currently exposes one RPC end-point for dispatching values to the daemon but
is easy to extend for more functionality.

The plugin is written in C++ and uses an synchronous architecture which should
provide maximum performance. The collectd build system has been extended to
support C++ (11) and protocol buffers (3.0+).

.gitignore
Makefile.am
configure.ac
proto/Makefile.am [new file with mode: 0644]
proto/collectd.proto [new file with mode: 0644]
proto/types.proto [new file with mode: 0644]
src/Makefile.am
src/daemon/collectd.h
src/grpc.cc [new file with mode: 0644]

index b14cc7abd3678d58100952abac32260624e1fef8..2d3a33683d19fba43c699125ec5aa2cc2eacf095 100644 (file)
@@ -52,6 +52,9 @@ src/liboconfig/scanner.c
 
 # protobuf stuff:
 src/*.pb-c.[ch]
+src/*.grpc.pb.cc
+src/*.pb.cc
+src/*.pb.h
 
 # make dist stuff:
 /collectd-*.tar.gz
index b79ea1cabfbdc562cfcd46983771b1351d6a5d23..c63893fe6727945b946db37e486b2338e96dc7e4 100644 (file)
@@ -6,7 +6,7 @@ if BUILD_INCLUDED_LTDL
 SUBDIRS += libltdl
 endif
 
-SUBDIRS += src bindings .
+SUBDIRS += proto src bindings .
 
 AM_CPPFLAGS = $(LTDLINCL)
 
index e2149ba4465125c1a9ac72bc72e36138ac567b6d..a6f2967528c0d06e549273241c8997eabf4c5732 100644 (file)
@@ -43,6 +43,7 @@ AC_SYS_LARGEFILE
 # Checks for programs.
 #
 AC_PROG_CC
+AC_PROG_CXX
 AC_PROG_CPP
 AC_PROG_EGREP
 AC_PROG_INSTALL
@@ -73,6 +74,25 @@ then
        AC_MSG_ERROR([bison is missing and you do not have ${srcdir}/src/liboconfig/parser.c. Please install bison])
 fi
 
+AC_PATH_PROG([PROTOC], [protoc])
+have_protoc3="no"
+if test "x$PROTOC" != "x"; then
+       AC_MSG_CHECKING([for protoc 3.0.0+])
+       if $PROTOC --version | grep -q libprotoc.3; then
+               protoc3="yes (`$PROTOC --version`)"
+               have_protoc3="yes"
+       else
+               protoc3="no (`$PROTOC --version`)"
+       fi
+       AC_MSG_RESULT([$protoc3])
+fi
+AC_SUBST([PROTOC])
+AM_CONDITIONAL(HAVE_PROTOC3, test "x$have_protoc3" = "xyes")
+
+AC_PATH_PROG([GRPC_CPP_PLUGIN], [grpc_cpp_plugin])
+AC_SUBST([GRPC_CPP_PLUGIN])
+AM_CONDITIONAL(HAVE_GRPC_CPP, test "x$GRPC_CPP_PLUGIN" != "x")
+
 AC_CHECK_PROG([have_protoc_c], [protoc-c], [yes], [no])
 if test "x$have_protoc_c" = "xno"
 then
@@ -779,6 +799,16 @@ AC_CHECK_FUNCS(gettimeofday select strdup strtol getaddrinfo getnameinfo strchr
 
 AC_FUNC_STRERROR_R
 
+test_cxx_flags() {
+       AC_LANG_PUSH(C++)
+       AC_LANG_CONFTEST([int main(void){}])
+       $CXX -c conftest.cpp $CXXFLAGS $@ > /dev/null 2> /dev/null
+       ret=$?
+       rm -f conftest.o
+       AC_LANG_POP(C++)
+       return $ret
+}
+
 SAVE_CFLAGS="$CFLAGS"
 # Emulate behavior of src/Makefile.am
 if test "x$GCC" = "xyes"
@@ -2210,6 +2240,62 @@ AC_SUBST(GCRYPT_LIBS)
 AM_CONDITIONAL(BUILD_WITH_LIBGCRYPT, test "x$with_libgcrypt" = "xyes")
 # }}}
 
+# --with-grpc {{{
+AC_ARG_WITH(grpc, [AS_HELP_STRING([--without-grpc], [Disable gRPC (default: autodetect).])],
+[
+       with_grpc="$withval"
+],
+[
+       with_grpc="yes"
+])
+
+if test "x$with_grpc" = "xyes"
+then
+       if test "x$have_protoc3" != "xyes"
+       then
+               with_grpc="no (requires protoc 3.0.0+)"
+       else if test "x$GRPC_CPP_PLUGIN" = "x"
+       then
+               with_grpc"no (requires grpc_cpp_plugin)"
+       fi; fi
+fi
+
+if test "x$with_grpc" = "xyes"
+then
+       AC_MSG_CHECKING([whether $CXX accepts -std=c++11])
+       if test_cxx_flags -std=c++11; then
+               AC_MSG_RESULT([yes])
+       else
+               AC_MSG_RESULT([no])
+               with_grpc="no (requires C++11 support)"
+       fi
+fi
+
+if test "x$with_grpc" = "xyes"
+then
+       AC_LANG_PUSH(C++)
+       SAVE_CPPFLAGS="$CPPFLAGS"
+       SAVE_CXXFLAGS="$CXXFLAGS"
+       CPPFLAGS="$CPPFLAGS -std=c++11"
+       CXXFLAGS="$CXXFLAGS -std=c++11"
+       AC_CHECK_HEADERS([grpc++/grpc++.h], [],
+                       [with_grpc="no (grpc++/grpc++.h not found)"])
+       CPPFLAGS="$SAVE_CPPFLAGS"
+       CXXFLAGS="$SAVE_CXXFLAGS"
+       AC_LANG_POP(C++)
+fi
+with_libgrpc="no"
+if test "x$with_grpc" = "xyes"
+then
+       AC_LANG_PUSH(C++)
+       AC_CHECK_LIB([grpc], [grpc_register_plugin],
+                       [with_libgrpc="yes"],
+                       [with_grpc="no (libgrpc not found)"],
+                       [-lgpr -lprotobuf])
+       AC_LANG_POP(C++)
+fi
+# }}}
+
 # --with-libiptc {{{
 AC_ARG_WITH(libiptc, [AS_HELP_STRING([--with-libiptc@<:@=PREFIX@:>@], [Path to libiptc.])],
 [
@@ -5394,7 +5480,7 @@ AC_DEFUN(
             then
                     enable_plugin="yes"
             else
-                    enable_plugin="no"
+                    enable_plugin="$2"
             fi
         else
             enable_plugin="$enable_all_plugins"
@@ -5411,7 +5497,7 @@ AC_DEFUN(
                    fi
            else # User passed "yes" but dependency checking yielded "no" => Dependency problem.
                    dependency_error="yes"
-                   enable_plugin="no (dependency error)"
+                   enable_plugin="$2 (dependency error)"
            fi
     fi
     AM_CONDITIONAL([BUILD_PLUGIN_]my_toupper([$1]), test "x$enable_plugin" = "xyes")
@@ -5871,6 +5957,7 @@ AC_PLUGIN([fhcount],             [$plugin_fhcount],         [File handles statis
 AC_PLUGIN([filecount],           [yes],                     [Count files in directories])
 AC_PLUGIN([fscache],             [$plugin_fscache],         [fscache statistics])
 AC_PLUGIN([gmond],               [$with_libganglia],        [Ganglia plugin])
+AC_PLUGIN([grpc],                [$with_grpc],              [gRPC plugin])
 AC_PLUGIN([hddtemp],             [yes],                     [Query hddtempd])
 AC_PLUGIN([interface],           [$plugin_interface],       [Interface traffic statistics])
 AC_PLUGIN([ipc],                 [$plugin_ipc],             [IPC statistics])
@@ -6128,13 +6215,16 @@ AC_SUBST(LCC_VERSION_STRING)
 AC_CONFIG_FILES(src/libcollectdclient/collectd/lcc_features.h)
 
 AM_CFLAGS="-Wall"
+AM_CXXFLAGS="-Wall"
 if test "x$enable_werror" != "xno"
 then
         AM_CFLAGS="$AM_CFLAGS -Werror"
+        AM_CXXFLAGS="$AM_CFLAGS -Werror"
 fi
 AC_SUBST([AM_CFLAGS])
+AC_SUBST([AM_CXXFLAGS])
 
-AC_CONFIG_FILES([Makefile src/Makefile src/daemon/Makefile src/collectd.conf src/libcollectdclient/Makefile src/libcollectdclient/libcollectdclient.pc src/liboconfig/Makefile bindings/Makefile bindings/java/Makefile])
+AC_CONFIG_FILES([Makefile proto/Makefile src/Makefile src/daemon/Makefile src/collectd.conf src/libcollectdclient/Makefile src/libcollectdclient/libcollectdclient.pc src/liboconfig/Makefile bindings/Makefile bindings/java/Makefile])
 AC_OUTPUT
 
 if test "x$with_librrd" = "xyes" \
@@ -6168,6 +6258,7 @@ Configuration:
     Platform  . . . . . . $ac_system
     CC  . . . . . . . . . $CC
     CFLAGS  . . . . . . . $AM_CFLAGS $CFLAGS
+    CXXFLAGS  . . . . . . $AM_CXXFLAGS $CXXFLAGS
     CPP . . . . . . . . . $CPP
     CPPFLAGS  . . . . . . $CPPFLAGS
     LD  . . . . . . . . . $LD
@@ -6184,6 +6275,7 @@ Configuration:
     libesmtp  . . . . . . $with_libesmtp
     libganglia  . . . . . $with_libganglia
     libgcrypt . . . . . . $with_libgcrypt
+    libgrpc . . . . . . . $with_libgrpc
     libhal  . . . . . . . $with_libhal
     libhiredis  . . . . . $with_libhiredis
     libi2c-dev  . . . . . $with_libi2c
@@ -6231,6 +6323,7 @@ Configuration:
     libyajl . . . . . . . $with_libyajl
     oracle  . . . . . . . $with_oracle
     protobuf-c  . . . . . $have_protoc_c
+    protoc 3  . . . . . . $have_protoc3
     python  . . . . . . . $with_python
 
   Features:
@@ -6274,6 +6367,7 @@ Configuration:
     filecount . . . . . . $enable_filecount
     fscache . . . . . . . $enable_fscache
     gmond . . . . . . . . $enable_gmond
+    grpc  . . . . . . . . $enable_grpc
     hddtemp . . . . . . . $enable_hddtemp
     interface . . . . . . $enable_interface
     ipc . . . . . . . . . $enable_ipc
diff --git a/proto/Makefile.am b/proto/Makefile.am
new file mode 100644 (file)
index 0000000..3c0bfd7
--- /dev/null
@@ -0,0 +1 @@
+EXTRA_DIST = collectd.proto types.proto
diff --git a/proto/collectd.proto b/proto/collectd.proto
new file mode 100644 (file)
index 0000000..84db755
--- /dev/null
@@ -0,0 +1,43 @@
+// collectd - proto/collectd.proto
+// Copyright (C) 2015 Sebastian Harl
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the "Software"),
+// to deal in the Software without restriction, including without limitation
+// the rights to use, copy, modify, merge, publish, distribute, sublicense,
+// and/or sell copies of the Software, and to permit persons to whom the
+// Software is furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+//
+// Authors:
+//   Sebastian Harl <sh at tokkee.org>
+
+syntax = "proto3";
+
+package collectd;
+
+import "types.proto";
+
+service Collectd {
+       // Dispatch collected values to collectd.
+       rpc DispatchValues(DispatchValuesRequest) returns (DispatchValuesReply);
+}
+
+// The arguments to DispatchValues.
+message DispatchValuesRequest {
+       collectd.types.ValueList values = 1;
+}
+
+// The response from DispatchValues.
+message DispatchValuesReply {
+}
diff --git a/proto/types.proto b/proto/types.proto
new file mode 100644 (file)
index 0000000..7f3d329
--- /dev/null
@@ -0,0 +1,52 @@
+// collectd - proto/types.proto
+// Copyright (C) 2015 Sebastian Harl
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the "Software"),
+// to deal in the Software without restriction, including without limitation
+// the rights to use, copy, modify, merge, publish, distribute, sublicense,
+// and/or sell copies of the Software, and to permit persons to whom the
+// Software is furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+//
+// Authors:
+//   Sebastian Harl <sh at tokkee.org>
+
+syntax = "proto3";
+
+package collectd.types;
+
+import "google/protobuf/duration.proto";
+import "google/protobuf/timestamp.proto";
+
+message Value {
+       oneof value {
+               uint64 counter = 1;
+               double gauge = 2;
+               int64 derive = 3;
+               uint64 absolute = 4;
+       };
+}
+
+message ValueList {
+       repeated Value value = 1;
+
+       google.protobuf.Timestamp time = 2;
+       google.protobuf.Duration interval = 3;
+
+       string host = 4;
+       string plugin = 5;
+       string plugin_instance = 6;
+       string type = 7;
+       string type_instance = 8;
+}
index d54094f6760a4be86fcbc19bb689417e5c614178..b8b2575db61c6cb1ea8707eb632a51007b026e54 100644 (file)
@@ -17,6 +17,12 @@ endif
 AM_CPPFLAGS += -DPLUGINDIR='"${pkglibdir}"'
 AM_CPPFLAGS += -DPKGDATADIR='"${pkgdatadir}"'
 
+AUTOMAKE_OPTIONS = subdir-objects
+
+V_PROTOC = $(v_protoc_@AM_V@)
+v_protoc_ = $(v_protoc_@AM_DEFAULT_V@)
+v_protoc_0 = @echo "  PROTOC  " $@;
+
 noinst_LTLIBRARIES =
 check_PROGRAMS =
 TESTS =
@@ -106,6 +112,24 @@ pkglib_LTLIBRARIES =
 BUILT_SOURCES =
 CLEANFILES =
 
+if HAVE_PROTOC3
+if HAVE_GRPC_CPP
+BUILT_SOURCES += collectd.grpc.pb.cc collectd.pb.cc types.pb.cc
+CLEANFILES += collectd.grpc.pb.cc collectd.pb.cc types.pb.cc \
+               collectd.grpc.pb.h collectd.pb.h types.pb.h
+
+collectd.grpc.pb.cc: $(top_srcdir)/proto/collectd.proto $(top_srcdir)/proto/types.proto
+       $(V_PROTOC)@PROTOC@ -I$(top_srcdir)/proto \
+               --grpc_out=$(builddir) --plugin=protoc-gen-grpc=$(GRPC_CPP_PLUGIN) $<
+
+collectd.pb.cc: $(top_srcdir)/proto/collectd.proto $(top_srcdir)/proto/types.proto
+       $(V_PROTOC)@PROTOC@ -I$(top_srcdir)/proto --cpp_out=$(builddir) $<
+
+types.pb.cc: $(top_srcdir)/proto/types.proto
+       $(V_PROTOC)@PROTOC@ -I$(top_srcdir)/proto --cpp_out=$(builddir) $<
+endif
+endif
+
 if BUILD_PLUGIN_AGGREGATION
 pkglib_LTLIBRARIES += aggregation.la
 aggregation_la_SOURCES = aggregation.c \
@@ -407,6 +431,17 @@ gmond_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(GANGLIA_LDFLAGS)
 gmond_la_LIBADD = $(GANGLIA_LIBS)
 endif
 
+if BUILD_PLUGIN_GRPC
+pkglib_LTLIBRARIES += grpc.la
+grpc_la_SOURCES = grpc.cc \
+               collectd.grpc.pb.cc collectd.pb.cc types.pb.cc
+grpc_la_CPPFLAGS = $(AM_CPPFLAGS) -std=c++11
+grpc_la_CFLAGS = $(AM_CFLAGS)
+grpc_la_CXXFLAGS = $(AM_CXXFLAGS) -std=c++11
+grpc_la_LDFLAGS = $(PLUGIN_LDFLAGS)
+grpc_la_LIBADD = -lgrpc++_unsecure -lgrpc -lgpr -lprotobuf -lpthread -ldl
+endif
+
 if BUILD_PLUGIN_HDDTEMP
 pkglib_LTLIBRARIES += hddtemp.la
 hddtemp_la_SOURCES = hddtemp.c
index 900218742532607d7d3f545d5b06e23c96b107ec..8fb29455d55798fbc6bd0d832dd6f54b3961845d 100644 (file)
@@ -271,7 +271,7 @@ typedef int _Bool;
 #endif
 
  #ifndef COLLECTD_USERAGENT
- # define COLLECTD_USERAGENT PACKAGE_NAME"/"PACKAGE_VERSION
+ # define COLLECTD_USERAGENT PACKAGE_NAME "/" PACKAGE_VERSION
  #endif
 
 /* Only enable __attribute__() for compilers known to support it. */
diff --git a/src/grpc.cc b/src/grpc.cc
new file mode 100644 (file)
index 0000000..eb9e4e8
--- /dev/null
@@ -0,0 +1,354 @@
+/**
+ * collectd - src/grpc.cc
+ * Copyright (C) 2015 Sebastian Harl
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ *   Sebastian Harl <sh at tokkee.org>
+ **/
+
+#include <grpc++/grpc++.h>
+#include <google/protobuf/util/time_util.h>
+
+#include "collectd.grpc.pb.h"
+
+extern "C" {
+#include <stdbool.h>
+#include <pthread.h>
+
+#include "collectd.h"
+#include "common.h"
+#include "configfile.h"
+#include "plugin.h"
+}
+
+using google::protobuf::util::TimeUtil;
+
+/*
+ * proto conversion
+ */
+
+static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, value_list_t *vl)
+{
+       vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
+       vl->interval = NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
+
+       std::string s;
+
+       s = msg.host();
+       if (!s.length())
+               return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
+                               grpc::string("missing host name"));
+       sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
+
+       s = msg.plugin();
+       if (!s.length())
+               return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
+                               grpc::string("missing plugin name"));
+       sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
+
+       s = msg.type();
+       if (!s.length())
+               return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
+                               grpc::string("missing type name"));
+       sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
+
+       s = msg.plugin_instance();
+       if (s.length())
+               sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
+
+       s = msg.type_instance();
+       if (s.length())
+               sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
+
+       value_t *values = NULL;
+       size_t values_len = 0;
+       auto status = grpc::Status::OK;
+
+       for (auto v : msg.value()) {
+               value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
+               if (!val) {
+                       status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
+                                       grpc::string("failed to allocate values array"));
+                       break;
+               }
+
+               values = val;
+               val = values + values_len;
+               values_len++;
+
+               switch (v.value_case()) {
+               case collectd::types::Value::ValueCase::kCounter:
+                       val->counter = counter_t(v.counter());
+                       break;
+               case collectd::types::Value::ValueCase::kGauge:
+                       val->gauge = gauge_t(v.gauge());
+                       break;
+               case collectd::types::Value::ValueCase::kDerive:
+                       val->derive = derive_t(v.derive());
+                       break;
+               case collectd::types::Value::ValueCase::kAbsolute:
+                       val->absolute = absolute_t(v.absolute());
+                       break;
+               default:
+                       status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
+                                       grpc::string("unkown value type"));
+                       break;
+               }
+
+               if (!status.ok())
+                       break;
+       }
+       if (status.ok()) {
+               vl->values = values;
+               vl->values_len = values_len;
+       }
+       else if (values) {
+               free(values);
+       }
+
+       return status;
+} /* unmarshal_value_list() */
+
+/*
+ * request call objects
+ */
+
+class Call
+{
+public:
+       Call(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
+               : service_(service), cq_(cq), status_(CREATE)
+       { }
+
+       virtual ~Call()
+       { }
+
+       void Handle()
+       {
+               if (status_ == CREATE) {
+                       Create();
+                       status_ = PROCESS;
+               }
+               else if (status_ == PROCESS) {
+                       Process();
+                       status_ = FINISH;
+               }
+               else {
+                       GPR_ASSERT(status_ == FINISH);
+                       Finish();
+               }
+       } /* Handle() */
+
+protected:
+       virtual void Create() = 0;
+       virtual void Process() = 0;
+       virtual void Finish() = 0;
+
+       collectd::Collectd::AsyncService *service_;
+       grpc::ServerCompletionQueue *cq_;
+       grpc::ServerContext ctx_;
+
+private:
+       enum CallStatus { CREATE, PROCESS, FINISH };
+       CallStatus status_;
+}; /* class Call */
+
+class DispatchValuesCall : public Call
+{
+public:
+       DispatchValuesCall(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
+               : Call(service, cq), responder_(&ctx_)
+       {
+               Handle();
+       } /* DispatchValuesCall() */
+
+       virtual ~DispatchValuesCall()
+       { }
+
+private:
+       void Create()
+       {
+               service_->RequestDispatchValues(&ctx_, &request_, &responder_, cq_, cq_, this);
+       } /* Create() */
+
+       void Process()
+       {
+               // Add a new request object to the queue.
+               new DispatchValuesCall(service_, cq_);
+
+               value_list_t vl = VALUE_LIST_INIT;
+               auto status = unmarshal_value_list(request_.values(), &vl);
+               if (!status.ok()) {
+                       responder_.Finish(reply_, status, this);
+                       return;
+               }
+
+               if (plugin_dispatch_values(&vl))
+                       status = grpc::Status(grpc::StatusCode::INTERNAL,
+                                       grpc::string("failed to enqueue values for writing"));
+
+               responder_.Finish(reply_, status, this);
+       } /* Process() */
+
+       void Finish()
+       {
+               delete this;
+       } /* Finish() */
+
+       collectd::DispatchValuesRequest request_;
+       collectd::DispatchValuesReply reply_;
+
+       grpc::ServerAsyncResponseWriter<collectd::DispatchValuesReply> responder_;
+};
+
+/*
+ * gRPC server implementation
+ */
+
+class CollectdServer final
+{
+public:
+       void Start()
+       {
+               // TODO: make configurable
+               std::string addr("0.0.0.0:50051");
+
+               // TODO: make configurable
+               auto auth = grpc::InsecureServerCredentials();
+
+               grpc::ServerBuilder builder;
+               builder.AddListeningPort(addr, auth);
+               builder.RegisterAsyncService(&service_);
+               cq_ = builder.AddCompletionQueue();
+               server_ = builder.BuildAndStart();
+
+               INFO("grpc: Listening on %s", addr.c_str());
+       } /* Start() */
+
+       void Shutdown()
+       {
+               server_->Shutdown();
+               cq_->Shutdown();
+       } /* Shutdown() */
+
+       void Mainloop()
+       {
+               // Register request types.
+               new DispatchValuesCall(&service_, cq_.get());
+
+               while (true) {
+                       void *req = NULL;
+                       bool ok = false;
+
+                       if (!cq_->Next(&req, &ok))
+                               break; // Queue shut down.
+                       if (!ok) {
+                               ERROR("grpc: Failed to read from queue");
+                               break;
+                       }
+
+                       static_cast<Call *>(req)->Handle();
+               }
+       } /* Mainloop() */
+
+private:
+       collectd::Collectd::AsyncService service_;
+
+       std::unique_ptr<grpc::Server> server_;
+       std::unique_ptr<grpc::ServerCompletionQueue> cq_;
+}; /* class CollectdServer */
+
+static CollectdServer *server = nullptr;
+
+/*
+ * collectd plugin interface
+ */
+
+extern "C" {
+       static pthread_t *workers;
+       static size_t workers_num;
+
+       static void *worker_thread(void *arg)
+       {
+               CollectdServer *s = (CollectdServer *)arg;
+               s->Mainloop();
+               return NULL;
+       } /* worker_thread() */
+
+       static int c_grpc_init(void)
+       {
+               server = new CollectdServer();
+               size_t i;
+
+               if (! server) {
+                       ERROR("grpc: Failed to create server");
+                       return -1;
+               }
+
+               workers = (pthread_t *)calloc(5, sizeof(*workers));
+               if (! workers) {
+                       delete server;
+                       server = nullptr;
+
+                       ERROR("grpc: Failed to allocate worker threads");
+                       return -1;
+               }
+               workers_num = 5;
+
+               server->Start();
+               for (i = 0; i < workers_num; i++) {
+                       pthread_create(&workers[i], /* attr = */ NULL,
+                                       worker_thread, server);
+               }
+               INFO("grpc: Started %zu workers", workers_num);
+               return 0;
+       } /* c_grpc_init() */
+
+       static int c_grpc_shutdown(void)
+       {
+               size_t i;
+
+               if (!server)
+                       return -1;
+
+               server->Shutdown();
+
+               INFO("grpc: Waiting for %zu workers to terminate", workers_num);
+               for (i = 0; i < workers_num; i++)
+                       pthread_join(workers[i], NULL);
+               free(workers);
+               workers = NULL;
+               workers_num = 0;
+
+               delete server;
+               server = nullptr;
+
+               return 0;
+       } /* c_grpc_shutdown() */
+
+       void module_register(void)
+       {
+               plugin_register_init("grpc", c_grpc_init);
+               plugin_register_shutdown("grpc", c_grpc_shutdown);
+       } /* module_register() */
+} /* extern "C" */
+
+/* vim: set sw=4 ts=4 tw=78 noexpandtab : */