diff --git a/src/write_redis.c b/src/write_redis.c
index 5dd674cf24796b4c78de42378a92dccf981be0b6..135a458785ba55da955ea5c3c206a1467c3f4269 100644 (file)
--- a/src/write_redis.c
+++ b/src/write_redis.c
/**
* collectd - src/write_redis.c
- * Copyright (C) 2010 Florian Forster
+ * Copyright (C) 2010-2015 Florian Forster
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
**/
#include "collectd.h"
+
#include "plugin.h"
#include "common.h"
#include "configfile.h"
-#include <pthread.h>
-#include <credis.h>
+#include <sys/time.h>
+#include <hiredis/hiredis.h>
+
+#ifndef REDIS_DEFAULT_PREFIX
+# define REDIS_DEFAULT_PREFIX "collectd/"
+#endif
struct wr_node_s
{
char *host;
int port;
- int timeout;
+ struct timeval timeout;
+ char *prefix;
+ int database;
+ int max_set_size;
+ _Bool store_rates;
- REDIS conn;
+ redisContext *conn;
pthread_mutex_t lock;
};
typedef struct wr_node_s wr_node_t;
wr_node_t *node = ud->data;
char ident[512];
char key[512];
- char value[512];
+ char value[512] = { 0 };
+ char time[24];
size_t value_size;
char *value_ptr;
int status;
+ redisReply *rr;
status = FORMAT_VL (ident, sizeof (ident), vl);
if (status != 0)
return (status);
- ssnprintf (key, sizeof (key), "collectd/%s", ident);
+ ssnprintf (key, sizeof (key), "%s%s",
+ (node->prefix != NULL) ? node->prefix : REDIS_DEFAULT_PREFIX,
+ ident);
+ ssnprintf (time, sizeof (time), "%.9f", CDTIME_T_TO_DOUBLE(vl->time));
- memset (value, 0, sizeof (value));
value_size = sizeof (value);
value_ptr = &value[0];
-
- status = format_values (value_ptr, value_size, ds, vl, /* store rates = */ 0);
+ status = format_values (value_ptr, value_size, ds, vl, node->store_rates);
if (status != 0)
return (status);
if (node->conn == NULL)
{
- node->conn = credis_connect (node->host, node->port, node->timeout);
+ node->conn = redisConnectWithTimeout ((char *)node->host, node->port, node->timeout);
if (node->conn == NULL)
{
- ERROR ("write_redis plugin: Connecting to host \"%s\" (port %i) failed.",
+ ERROR ("write_redis plugin: Connecting to host \"%s\" (port %i) failed: Unkown reason",
(node->host != NULL) ? node->host : "localhost",
(node->port != 0) ? node->port : 6379);
pthread_mutex_unlock (&node->lock);
return (-1);
}
+ else if (node->conn->err)
+ {
+ ERROR ("write_redis plugin: Connecting to host \"%s\" (port %i) failed: %s",
+ (node->host != NULL) ? node->host : "localhost",
+ (node->port != 0) ? node->port : 6379,
+ node->conn->errstr);
+ pthread_mutex_unlock (&node->lock);
+ return (-1);
+ }
+
+ rr = redisCommand(node->conn, "SELECT %d", node->database);
+ if (rr == NULL)
+ WARNING("SELECT command error. database:%d message:%s", node->database, node->conn->errstr);
+ else
+ freeReplyObject (rr);
}
- /* "credis_zadd" doesn't handle a NULL pointer gracefully, so I'd rather
- * have a meaningful assertion message than a normal segmentation fault. */
- assert (node->conn != NULL);
- status = credis_zadd (node->conn, key, (double) vl->time, value);
+ rr = redisCommand (node->conn, "ZADD %s %s %s", key, time, value);
+ if (rr == NULL)
+ WARNING("ZADD command error. key:%s message:%s", key, node->conn->errstr);
+ else
+ freeReplyObject (rr);
- credis_sadd (node->conn, "collectd/values", ident);
+ if (node->max_set_size >= 0)
+ {
+ rr = redisCommand (node->conn, "ZREMRANGEBYRANK %s %d %d", key, 0, (-1 * node->max_set_size) - 1);
+ if (rr == NULL)
+ WARNING("ZREMRANGEBYRANK command error. key:%s message:%s", key, node->conn->errstr);
+ else
+ freeReplyObject (rr);
+ }
+
+ /* TODO(octo): This is more overhead than necessary. Use the cache and
+ * metadata to determine if it is a new metric and call SADD only once for
+ * each metric. */
+ rr = redisCommand (node->conn, "SADD %svalues %s",
+ (node->prefix != NULL) ? node->prefix : REDIS_DEFAULT_PREFIX,
+ ident);
+ if (rr==NULL)
+ WARNING("SADD command error. ident:%s message:%s", ident, node->conn->errstr);
+ else
+ freeReplyObject (rr);
pthread_mutex_unlock (&node->lock);
if (node->conn != NULL)
{
- credis_close (node->conn);
+ redisFree (node->conn);
node->conn = NULL;
}
static int wr_config_node (oconfig_item_t *ci) /* {{{ */
{
wr_node_t *node;
+ int timeout;
int status;
- int i;
- node = malloc (sizeof (*node));
+ node = calloc (1, sizeof (*node));
if (node == NULL)
return (ENOMEM);
- memset (node, 0, sizeof (*node));
node->host = NULL;
node->port = 0;
- node->timeout = 1000;
+ node->timeout.tv_sec = 0;
+ node->timeout.tv_usec = 1000;
node->conn = NULL;
+ node->prefix = NULL;
+ node->database = 0;
+ node->max_set_size = -1;
+ node->store_rates = 1;
pthread_mutex_init (&node->lock, /* attr = */ NULL);
status = cf_util_get_string_buffer (ci, node->name, sizeof (node->name));
return (status);
}
- for (i = 0; i < ci->children_num; i++)
+ for (int i = 0; i < ci->children_num; i++)
{
oconfig_item_t *child = ci->children + i;
status = 0;
}
}
- else if (strcasecmp ("Timeout", child->key) == 0)
- status = cf_util_get_int (child, &node->timeout);
+ else if (strcasecmp ("Timeout", child->key) == 0) {
+ status = cf_util_get_int (child, &timeout);
+ if (status == 0) node->timeout.tv_usec = timeout;
+ }
+ else if (strcasecmp ("Prefix", child->key) == 0) {
+ status = cf_util_get_string (child, &node->prefix);
+ }
+ else if (strcasecmp ("Database", child->key) == 0) {
+ status = cf_util_get_int (child, &node->database);
+ }
+ else if (strcasecmp ("MaxSetSize", child->key) == 0) {
+ status = cf_util_get_int (child, &node->max_set_size);
+ }
+ else if (strcasecmp ("StoreRates", child->key) == 0) {
+ status = cf_util_get_boolean (child, &node->store_rates);
+ }
else
WARNING ("write_redis plugin: Ignoring unknown config option \"%s\".",
child->key);
static int wr_config (oconfig_item_t *ci) /* {{{ */
{
- int i;
-
- for (i = 0; i < ci->children_num; i++)
+ for (int i = 0; i < ci->children_num; i++)
{
oconfig_item_t *child = ci->children + i;