Code

grpc plugin: Make listeners and worker threads configurable.
authorSebastian Harl <sh@tokkee.org>
Thu, 29 Oct 2015 22:15:46 +0000 (23:15 +0100)
committerSebastian Harl <sh@tokkee.org>
Mon, 30 May 2016 21:44:19 +0000 (23:44 +0200)
src/grpc.cc

index eb9e4e85a25feaaec369697fd3b073aad203cb9d..9523fc2513dd2cf6804a2fdec8ab2029bbeb04c8 100644 (file)
@@ -37,6 +37,14 @@ extern "C" {
 #include "common.h"
 #include "configfile.h"
 #include "plugin.h"
+
+       typedef struct {
+               char *addr;
+               char *port;
+       } listener_t;
+
+       static listener_t *listeners;
+       static size_t listeners_num;
 }
 
 using google::protobuf::util::TimeUtil;
@@ -228,19 +236,30 @@ class CollectdServer final
 public:
        void Start()
        {
-               // TODO: make configurable
-               std::string addr("0.0.0.0:50051");
-
                // TODO: make configurable
                auto auth = grpc::InsecureServerCredentials();
 
                grpc::ServerBuilder builder;
-               builder.AddListeningPort(addr, auth);
+
+               if (!listeners_num) {
+                       std::string default_addr("0.0.0.0:50051");
+                       builder.AddListeningPort(default_addr, auth);
+                       INFO("grpc: Listening on %s", default_addr.c_str());
+               }
+               else {
+                       size_t i;
+                       for (i = 0; i < listeners_num; i++) {
+                               auto l = listeners[i];
+                               std::string addr(l.addr);
+                               addr += std::string(":") + std::string(l.port);
+                               builder.AddListeningPort(addr, auth);
+                               INFO("grpc: Listening on %s", addr.c_str());
+                       }
+               }
+
                builder.RegisterAsyncService(&service_);
                cq_ = builder.AddCompletionQueue();
                server_ = builder.BuildAndStart();
-
-               INFO("grpc: Listening on %s", addr.c_str());
        } /* Start() */
 
        void Shutdown()
@@ -284,7 +303,7 @@ static CollectdServer *server = nullptr;
 
 extern "C" {
        static pthread_t *workers;
-       static size_t workers_num;
+       static size_t workers_num = 5;
 
        static void *worker_thread(void *arg)
        {
@@ -293,6 +312,66 @@ extern "C" {
                return NULL;
        } /* worker_thread() */
 
+       static int c_grpc_config_listen(oconfig_item_t *ci)
+       {
+               listener_t *listener;
+               int i;
+
+               if ((ci->values_num != 2)
+                               || (ci->values[0].type != OCONFIG_TYPE_STRING)
+                               || (ci->values[1].type != OCONFIG_TYPE_STRING)) {
+                       ERROR("grpc: The `%s` config option needs exactly "
+                                       "two string argument (address and port).", ci->key);
+                       return -1;
+               }
+
+               listener = (listener_t *)realloc(listeners,
+                               (listeners_num + 1) * sizeof(*listeners));
+               if (!listener) {
+                       ERROR("grpc: Failed to allocate listeners");
+                       return -1;
+               }
+               listeners = listener;
+               listener = listeners + listeners_num;
+               listeners_num++;
+
+               listener->addr = strdup(ci->values[0].value.string);
+               listener->port = strdup(ci->values[1].value.string);
+
+               for (i = 0; i < ci->children_num; i++) {
+                       oconfig_item_t *child = ci->children + i;
+                       WARNING("grpc: Option `%s` not allowed in <%s> block.",
+                                       child->key, ci->key);
+               }
+
+               return 0;
+       } /* c_grpc_config_listen() */
+
+       static int c_grpc_config(oconfig_item_t *ci)
+       {
+               int i;
+
+               for (i = 0; i < ci->children_num; i++) {
+                       oconfig_item_t *child = ci->children + i;
+
+                       if (!strcasecmp("Listen", child->key)) {
+                               if (c_grpc_config_listen(child))
+                                       return -1;
+                       }
+                       else if (!strcasecmp("WorkerThreads", child->key)) {
+                               int n;
+                               if (cf_util_get_int(child, &n))
+                                       return -1;
+                               workers_num = (size_t)n;
+                       }
+                       else {
+                               WARNING("grpc: Option `%s` not allowed here.", child->key);
+                       }
+               }
+
+               return 0;
+       } /* c_grpc_config() */
+
        static int c_grpc_init(void)
        {
                server = new CollectdServer();
@@ -303,7 +382,7 @@ extern "C" {
                        return -1;
                }
 
-               workers = (pthread_t *)calloc(5, sizeof(*workers));
+               workers = (pthread_t *)calloc(workers_num, sizeof(*workers));
                if (! workers) {
                        delete server;
                        server = nullptr;
@@ -311,7 +390,6 @@ extern "C" {
                        ERROR("grpc: Failed to allocate worker threads");
                        return -1;
                }
-               workers_num = 5;
 
                server->Start();
                for (i = 0; i < workers_num; i++) {
@@ -346,6 +424,7 @@ extern "C" {
 
        void module_register(void)
        {
+               plugin_register_complex_config("grpc", c_grpc_config);
                plugin_register_init("grpc", c_grpc_init);
                plugin_register_shutdown("grpc", c_grpc_shutdown);
        } /* module_register() */