From: Sebastian Harl Date: Thu, 29 Oct 2015 22:15:46 +0000 (+0100) Subject: grpc plugin: Make listeners and worker threads configurable. X-Git-Tag: collectd-5.6.0~246^2~19 X-Git-Url: https://git.tokkee.org/?a=commitdiff_plain;h=1faf439320a1a875cbbd44179aec6641d0b7854b;p=collectd.git grpc plugin: Make listeners and worker threads configurable. --- diff --git a/src/grpc.cc b/src/grpc.cc index eb9e4e85..9523fc25 100644 --- a/src/grpc.cc +++ b/src/grpc.cc @@ -37,6 +37,14 @@ extern "C" { #include "common.h" #include "configfile.h" #include "plugin.h" + + typedef struct { + char *addr; + char *port; + } listener_t; + + static listener_t *listeners; + static size_t listeners_num; } using google::protobuf::util::TimeUtil; @@ -228,19 +236,30 @@ class CollectdServer final public: void Start() { - // TODO: make configurable - std::string addr("0.0.0.0:50051"); - // TODO: make configurable auto auth = grpc::InsecureServerCredentials(); grpc::ServerBuilder builder; - builder.AddListeningPort(addr, auth); + + if (!listeners_num) { + std::string default_addr("0.0.0.0:50051"); + builder.AddListeningPort(default_addr, auth); + INFO("grpc: Listening on %s", default_addr.c_str()); + } + else { + size_t i; + for (i = 0; i < listeners_num; i++) { + auto l = listeners[i]; + std::string addr(l.addr); + addr += std::string(":") + std::string(l.port); + builder.AddListeningPort(addr, auth); + INFO("grpc: Listening on %s", addr.c_str()); + } + } + builder.RegisterAsyncService(&service_); cq_ = builder.AddCompletionQueue(); server_ = builder.BuildAndStart(); - - INFO("grpc: Listening on %s", addr.c_str()); } /* Start() */ void Shutdown() @@ -284,7 +303,7 @@ static CollectdServer *server = nullptr; extern "C" { static pthread_t *workers; - static size_t workers_num; + static size_t workers_num = 5; static void *worker_thread(void *arg) { @@ -293,6 +312,66 @@ extern "C" { return NULL; } /* worker_thread() */ + static int c_grpc_config_listen(oconfig_item_t *ci) + { + listener_t *listener; + int i; + + if ((ci->values_num != 2) + || (ci->values[0].type != OCONFIG_TYPE_STRING) + || (ci->values[1].type != OCONFIG_TYPE_STRING)) { + ERROR("grpc: The `%s` config option needs exactly " + "two string argument (address and port).", ci->key); + return -1; + } + + listener = (listener_t *)realloc(listeners, + (listeners_num + 1) * sizeof(*listeners)); + if (!listener) { + ERROR("grpc: Failed to allocate listeners"); + return -1; + } + listeners = listener; + listener = listeners + listeners_num; + listeners_num++; + + listener->addr = strdup(ci->values[0].value.string); + listener->port = strdup(ci->values[1].value.string); + + for (i = 0; i < ci->children_num; i++) { + oconfig_item_t *child = ci->children + i; + WARNING("grpc: Option `%s` not allowed in <%s> block.", + child->key, ci->key); + } + + return 0; + } /* c_grpc_config_listen() */ + + static int c_grpc_config(oconfig_item_t *ci) + { + int i; + + for (i = 0; i < ci->children_num; i++) { + oconfig_item_t *child = ci->children + i; + + if (!strcasecmp("Listen", child->key)) { + if (c_grpc_config_listen(child)) + return -1; + } + else if (!strcasecmp("WorkerThreads", child->key)) { + int n; + if (cf_util_get_int(child, &n)) + return -1; + workers_num = (size_t)n; + } + else { + WARNING("grpc: Option `%s` not allowed here.", child->key); + } + } + + return 0; + } /* c_grpc_config() */ + static int c_grpc_init(void) { server = new CollectdServer(); @@ -303,7 +382,7 @@ extern "C" { return -1; } - workers = (pthread_t *)calloc(5, sizeof(*workers)); + workers = (pthread_t *)calloc(workers_num, sizeof(*workers)); if (! workers) { delete server; server = nullptr; @@ -311,7 +390,6 @@ extern "C" { ERROR("grpc: Failed to allocate worker threads"); return -1; } - workers_num = 5; server->Start(); for (i = 0; i < workers_num; i++) { @@ -346,6 +424,7 @@ extern "C" { void module_register(void) { + plugin_register_complex_config("grpc", c_grpc_config); plugin_register_init("grpc", c_grpc_init); plugin_register_shutdown("grpc", c_grpc_shutdown); } /* module_register() */