summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: 677a95a)
raw | patch | inline | side by side (parent: 677a95a)
author | Sebastian Harl <sh@tokkee.org> | |
Fri, 6 May 2016 22:15:49 +0000 (00:15 +0200) | ||
committer | Sebastian Harl <sh@tokkee.org> | |
Mon, 30 May 2016 21:44:19 +0000 (23:44 +0200) |
This is a more powerful function to query values from collectd's cache.
proto/collectd.proto | patch | blob | history | |
proto/types.proto | patch | blob | history | |
src/grpc.cc | patch | blob | history |
diff --git a/proto/collectd.proto b/proto/collectd.proto
index ba60793b66f1e1e564c1a23918f828fa2c72f050..608fcbb1657e1476d5ca705e18b8c43abe1c54af 100644 (file)
--- a/proto/collectd.proto
+++ b/proto/collectd.proto
// collectd - proto/collectd.proto
-// Copyright (C) 2015 Sebastian Harl
+// Copyright (C) 2015-2016 Sebastian Harl
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// Dispatch collected values to collectd.
rpc DispatchValues(DispatchValuesRequest) returns (DispatchValuesReply);
- // Retrieve a list of all values available in collectd's value cache.
- rpc ListValues(ListValuesRequest) returns (ListValuesReply);
+ // Query a list of values available from collectd's value cache.
+ rpc QueryValues(QueryValuesRequest) returns (QueryValuesReply);
}
// The arguments to DispatchValues.
message DispatchValuesReply {
}
-// The arguments to ListValues.
-message ListValuesRequest {
+// The arguments to QueryValues.
+message QueryValuesRequest {
+ // Query by the fields of the identifier. Only return values matching the
+ // specified shell wildcard patterns (see fnmatch(3)). Use '*' to match
+ // any value.
+ collectd.types.Identifier identifier = 1;
}
-// The response from ListValues.
-message ListValuesReply {
- message Value {
- string name = 1;
- google.protobuf.Timestamp time = 2;
- }
-
- repeated Value value = 1;
+// The response from QueryValues.
+message QueryValuesReply {
+ repeated collectd.types.ValueList values = 1;
}
diff --git a/proto/types.proto b/proto/types.proto
index 6e6714b6ce8aec1dd1a7c29ab81607b0841322a6..4a852e42de37b19fca66a58f5ba586f6f4c9afbc 100644 (file)
--- a/proto/types.proto
+++ b/proto/types.proto
// collectd - proto/types.proto
-// Copyright (C) 2015 Sebastian Harl
+// Copyright (C) 2015-2016 Sebastian Harl
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
diff --git a/src/grpc.cc b/src/grpc.cc
index 45fb02902ad0434a1c4487741fe3d7f3e766208d..4e47098629364b4b06806fa8fc808a5be9e83005 100644 (file)
--- a/src/grpc.cc
+++ b/src/grpc.cc
/**
* collectd - src/grpc.cc
- * Copyright (C) 2015 Sebastian Harl
+ * Copyright (C) 2015-2016 Sebastian Harl
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
#include "collectd.grpc.pb.h"
extern "C" {
+#include <fnmatch.h>
#include <stdbool.h>
#include <pthread.h>
using collectd::DispatchValuesRequest;
using collectd::DispatchValuesReply;
-using collectd::ListValuesRequest;
-using collectd::ListValuesReply;
+using collectd::QueryValuesRequest;
+using collectd::QueryValuesReply;
using google::protobuf::util::TimeUtil;
+/*
+ * helper functions
+ */
+
+static bool ident_matches(const value_list_t *vl, const value_list_t *matcher)
+{
+ if (fnmatch(matcher->host, vl->host, 0))
+ return false;
+
+ if (fnmatch(matcher->plugin, vl->plugin, 0))
+ return false;
+ if (fnmatch(matcher->plugin_instance, vl->plugin_instance, 0))
+ return false;
+
+ if (fnmatch(matcher->type, vl->type, 0))
+ return false;
+ if (fnmatch(matcher->type_instance, vl->type_instance, 0))
+ return false;
+
+ return true;
+} /* ident_matches */
+
/*
* proto conversion
*/
-static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg, value_list_t *vl)
+static void marshal_ident(const value_list_t *vl, collectd::types::Identifier *msg)
+{
+ msg->set_host(vl->host);
+ msg->set_plugin(vl->plugin);
+ if (vl->plugin_instance[0] != '\0')
+ msg->set_plugin_instance(vl->plugin_instance);
+ msg->set_type(vl->type);
+ if (vl->type_instance[0] != '\0')
+ msg->set_type_instance(vl->type_instance);
+} /* marshal_ident */
+
+static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg, value_list_t *vl,
+ bool require_fields)
{
std::string s;
s = msg.host();
- if (!s.length())
+ if (!s.length() && require_fields)
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
grpc::string("missing host name"));
sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
s = msg.plugin();
- if (!s.length())
+ if (!s.length() && require_fields)
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
grpc::string("missing plugin name"));
sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
s = msg.type();
- if (!s.length())
+ if (!s.length() && require_fields)
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
grpc::string("missing type name"));
sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
@@ -93,19 +128,59 @@ static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg, valu
return grpc::Status::OK;
} /* unmarshal_ident() */
+static grpc::Status marshal_value_list(const value_list_t *vl, collectd::types::ValueList *msg)
+{
+ auto id = msg->mutable_identifier();
+ marshal_ident(vl, id);
+
+ auto ds = plugin_get_ds(vl->type);
+ if ((ds == NULL) || (ds->ds_num != vl->values_len)) {
+ return grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to retrieve data-set for values"));
+ }
+
+ auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(vl->time));
+ auto d = TimeUtil::NanosecondsToDuration(CDTIME_T_TO_NS(vl->interval));
+ msg->set_allocated_time(new google::protobuf::Timestamp(t));
+ msg->set_allocated_interval(new google::protobuf::Duration(d));
+
+ for (size_t i = 0; i < vl->values_len; ++i) {
+ auto v = msg->add_value();
+ switch (ds->ds[i].type) {
+ case DS_TYPE_COUNTER:
+ v->set_counter(vl->values[i].counter);
+ break;
+ case DS_TYPE_GAUGE:
+ v->set_gauge(vl->values[i].gauge);
+ break;
+ case DS_TYPE_DERIVE:
+ v->set_derive(vl->values[i].derive);
+ break;
+ case DS_TYPE_ABSOLUTE:
+ v->set_absolute(vl->values[i].absolute);
+ break;
+ default:
+ return grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("unknown value type"));
+ }
+ }
+
+ return grpc::Status::OK;
+} /* marshal_value_list */
+
static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, value_list_t *vl)
{
vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
vl->interval = NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
- auto status = unmarshal_ident(msg.identifier(), vl);
+ auto status = unmarshal_ident(msg.identifier(), vl, true);
if (!status.ok())
return status;
value_t *values = NULL;
size_t values_len = 0;
- status = grpc::Status::OK;
+ status = grpc::Status::OK;
for (auto v : msg.value()) {
value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
if (!val) {
} /* Process(): DispatchValues */
static grpc::Status Process(grpc::ServerContext *ctx,
- ListValuesRequest request, ListValuesReply *reply)
+ QueryValuesRequest request, QueryValuesReply *reply)
{
- char **names = NULL;
- cdtime_t *times = NULL;
- size_t i, n = 0;
+ uc_iter_t *iter;
+ char *name = NULL;
+
+ value_list_t matcher;
+ auto status = unmarshal_ident(request.identifier(), &matcher, false);
+ if (!status.ok())
+ return status;
- if (uc_get_names(&names, ×, &n))
+ if ((iter = uc_get_iterator()) == NULL) {
return grpc::Status(grpc::StatusCode::INTERNAL,
- grpc::string("failed to retrieve values"));
-
- for (i = 0; i < n; i++) {
- auto v = reply->add_value();
- auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(times[i]));
- v->set_name(names[i]);
- v->set_allocated_time(new google::protobuf::Timestamp(t));
- sfree(names[i]);
+ grpc::string("failed to query values: cannot create iterator"));
}
- sfree(names);
- sfree(times);
+
+ while (uc_iterator_next(iter, &name) == 0) {
+ value_list_t res;
+ if (parse_identifier_vl(name, &res) != 0)
+ return grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to parse identifier"));
+
+ if (!ident_matches(&res, &matcher))
+ continue;
+
+ if (uc_iterator_get_time(iter, &res.time) < 0)
+ return grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to retrieve value timestamp"));
+ if (uc_iterator_get_interval(iter, &res.interval) < 0)
+ return grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to retrieve value interval"));
+ if (uc_iterator_get_values(iter, &res.values, &res.values_len) < 0)
+ return grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to retrieve values"));
+
+ auto vl = reply->add_values();
+ status = marshal_value_list(&res, vl);
+ free(res.values);
+ if (!status.ok())
+ return status;
+ }
+
+ uc_iterator_destroy(iter);
return grpc::Status::OK;
-} /* Process(): ListValues */
+} /* Process(): QueryValues */
class Call
{
// Register request types.
new RpcCall<DispatchValuesRequest, DispatchValuesReply>(&service_,
&Collectd::AsyncService::RequestDispatchValues, cq_.get());
- new RpcCall<ListValuesRequest, ListValuesReply>(&service_,
- &Collectd::AsyncService::RequestListValues, cq_.get());
+ new RpcCall<QueryValuesRequest, QueryValuesReply>(&service_,
+ &Collectd::AsyncService::RequestQueryValues, cq_.get());
while (true) {
void *req = NULL;