X-Git-Url: https://git.tokkee.org/?a=blobdiff_plain;f=src%2Fwrite_riemann.c;h=2936dfa46f5ab91f158d3092baa164cdeab39140;hb=bed961ab910f1cf5d96d1e60af0227431a38bbf3;hp=f7b0388d9e37c3fecadc9c37e6ad710102d006c5;hpb=d555842142065378f041c2ea29e808386c463a16;p=collectd.git diff --git a/src/write_riemann.c b/src/write_riemann.c index f7b0388d..2936dfa4 100644 --- a/src/write_riemann.c +++ b/src/write_riemann.c @@ -37,6 +37,7 @@ #include "common.h" #include "configfile.h" #include "utils_cache.h" +#include "utils_complain.h" #include "write_riemann_threshold.h" #define RIEMANN_HOST "localhost" @@ -45,6 +46,7 @@ #define RIEMANN_BATCH_MAX 8192 struct riemann_host { + c_complain_t init_complaint; char *name; char *event_service_prefix; pthread_mutex_t lock; @@ -60,11 +62,13 @@ struct riemann_host { double ttl_factor; cdtime_t batch_init; int batch_max; + int batch_timeout; int reference_count; riemann_message_t *batch_msg; char *tls_ca_file; char *tls_cert_file; char *tls_key_file; + struct timeval timeout; }; static char **riemann_tags; @@ -92,12 +96,25 @@ static int wrr_connect(struct riemann_host *host) /* {{{ */ RIEMANN_CLIENT_OPTION_TLS_KEY_FILE, host->tls_key_file, RIEMANN_CLIENT_OPTION_NONE); if (host->client == NULL) { - WARNING("write_riemann plugin: Unable to connect to Riemann at %s:%d", - node, port); + c_complain (LOG_ERR, &host->init_complaint, + "write_riemann plugin: Unable to connect to Riemann at %s:%d", + node, port); return -1; } - DEBUG("write_riemann plugin: got a successful connection for: %s:%d", - node, port); + if (host->timeout.tv_sec != 0) { + if (riemann_client_set_timeout(host->client, &host->timeout) != 0) { + riemann_client_free(host->client); + host->client = NULL; + c_complain (LOG_ERR, &host->init_complaint, + "write_riemann plugin: Unable to connect to Riemann at %s:%d", + node, port); + return -1; + } + } + + c_release (LOG_INFO, &host->init_complaint, + "write_riemann plugin: Successfully connected to %s:%d", + node, port); return 0; } /* }}} int wrr_connect */ @@ -119,19 +136,18 @@ static int wrr_disconnect(struct riemann_host *host) /* {{{ */ * * Acquires the host lock, disconnects on errors. */ -static int wrr_send(struct riemann_host *host, riemann_message_t *msg) /* {{{ */ +static int wrr_send_nolock(struct riemann_host *host, riemann_message_t *msg) /* {{{ */ { int status = 0; - pthread_mutex_lock (&host->lock); status = wrr_connect(host); - if (status != 0) + if (status != 0) { return status; + } status = riemann_client_send_message(host->client, msg); if (status != 0) { wrr_disconnect(host); - pthread_mutex_unlock(&host->lock); return status; } @@ -147,16 +163,24 @@ static int wrr_send(struct riemann_host *host, riemann_message_t *msg) /* {{{ */ if (response == NULL) { wrr_disconnect(host); - pthread_mutex_unlock(&host->lock); return errno; } riemann_message_free(response); } - pthread_mutex_unlock (&host->lock); return 0; } /* }}} int wrr_send */ +static int wrr_send(struct riemann_host *host, riemann_message_t *msg) +{ + int status = 0; + + pthread_mutex_lock (&host->lock); + status = wrr_send_nolock(host, msg); + pthread_mutex_unlock (&host->lock); + return status; +} + static riemann_message_t *wrr_notification_to_message(struct riemann_host *host, /* {{{ */ notification_t const *n) { @@ -187,35 +211,25 @@ static riemann_message_t *wrr_notification_to_message(struct riemann_host *host, RIEMANN_EVENT_FIELD_NONE); if (n->host[0] != 0) - riemann_event_attribute_add(event, - riemann_attribute_create("host", n->host)); + riemann_event_string_attribute_add(event, "host", n->host); if (n->plugin[0] != 0) - riemann_event_attribute_add(event, - riemann_attribute_create("plugin", n->plugin)); + riemann_event_string_attribute_add(event, "plugin", n->plugin); if (n->plugin_instance[0] != 0) - riemann_event_attribute_add(event, - riemann_attribute_create("plugin_instance", - n->plugin_instance)); + riemann_event_string_attribute_add(event, "plugin_instance", n->plugin_instance); if (n->type[0] != 0) - riemann_event_attribute_add(event, - riemann_attribute_create("type", n->type)); + riemann_event_string_attribute_add(event, "type", n->type); if (n->type_instance[0] != 0) - riemann_event_attribute_add(event, - riemann_attribute_create("type_instance", - n->type_instance)); + riemann_event_string_attribute_add(event, "type_instance", n->type_instance); for (i = 0; i < riemann_attrs_num; i += 2) - riemann_event_attribute_add(event, - riemann_attribute_create(riemann_attrs[i], - riemann_attrs[i +1])); + riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i+1]); for (i = 0; i < riemann_tags_num; i++) riemann_event_tag_add(event, riemann_tags[i]); if (n->message[0] != 0) - riemann_event_attribute_add(event, - riemann_attribute_create("description", n->message)); + riemann_event_string_attribute_add(event, "description", n->message); /* Pull in values from threshold and add extra attributes */ for (meta = n->meta; meta != NULL; meta = meta->next) @@ -230,9 +244,7 @@ static riemann_message_t *wrr_notification_to_message(struct riemann_host *host, } if (meta->type == NM_TYPE_STRING) { - riemann_event_attribute_add(event, - riemann_attribute_create(meta->name, - meta->nm_value.nm_string)); + riemann_event_string_attribute_add(event, meta->name, meta->nm_value.nm_string); continue; } } @@ -294,10 +306,10 @@ static riemann_event_t *wrr_value_to_event(struct riemann_host const *host, /* { RIEMANN_EVENT_FIELD_HOST, vl->host, RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(vl->time), RIEMANN_EVENT_FIELD_TTL, (float) CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor, - RIEMANN_EVENT_FIELD_ATTRIBUTES, - riemann_attribute_create("plugin", vl->plugin), - riemann_attribute_create("type", vl->type), - riemann_attribute_create("ds_name", ds->ds[index].name), + RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES, + "plugin", vl->plugin, + "type", vl->type, + "ds_name", ds->ds[index].name, NULL, RIEMANN_EVENT_FIELD_SERVICE, service_buffer, RIEMANN_EVENT_FIELD_NONE); @@ -325,13 +337,9 @@ static riemann_event_t *wrr_value_to_event(struct riemann_host const *host, /* { } if (vl->plugin_instance[0] != 0) - riemann_event_attribute_add(event, - riemann_attribute_create("plugin_instance", - vl->plugin_instance)); + riemann_event_string_attribute_add(event, "plugin_instance", vl->plugin_instance); if (vl->type_instance[0] != 0) - riemann_event_attribute_add(event, - riemann_attribute_create("type_instance", - vl->type_instance)); + riemann_event_string_attribute_add(event, "type_instance", vl->type_instance); if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL)) { @@ -339,28 +347,23 @@ static riemann_event_t *wrr_value_to_event(struct riemann_host const *host, /* { ssnprintf(ds_type, sizeof(ds_type), "%s:rate", DS_TYPE_TO_STRING(ds->ds[index].type)); - riemann_event_attribute_add(event, - riemann_attribute_create("ds_type", ds_type)); + riemann_event_string_attribute_add(event, "ds_type", ds_type); } else { - riemann_event_attribute_add(event, - riemann_attribute_create("ds_type", - DS_TYPE_TO_STRING(ds->ds[index].type))); + riemann_event_string_attribute_add(event, "ds_type", + DS_TYPE_TO_STRING(ds->ds[index].type)); } { char ds_index[DATA_MAX_NAME_LEN]; ssnprintf(ds_index, sizeof(ds_index), "%zu", index); - riemann_event_attribute_add(event, - riemann_attribute_create("ds_index", ds_index)); + riemann_event_string_attribute_add(event, "ds_index", ds_index); } for (i = 0; i < riemann_attrs_num; i += 2) - riemann_event_attribute_add(event, - riemann_attribute_create(riemann_attrs[i], - riemann_attrs[i +1])); + riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i +1]); for (i = 0; i < riemann_tags_num; i++) riemann_event_tag_add(event, riemann_tags[i]); @@ -458,30 +461,16 @@ static int wrr_batch_flush_nolock(cdtime_t timeout, cdtime_t now; int status = 0; + now = cdtime(); if (timeout > 0) { - now = cdtime(); - if ((host->batch_init + timeout) > now) + if ((host->batch_init + timeout) > now) { return status; + } } - wrr_send(host, host->batch_msg); + wrr_send_nolock(host, host->batch_msg); riemann_message_free(host->batch_msg); - if (host->client_type != RIEMANN_CLIENT_UDP) - { - riemann_message_t *response; - - response = riemann_client_recv_message(host->client); - - if (!response) - { - wrr_disconnect(host); - return errno; - } - - riemann_message_free(response); - } - - host->batch_init = cdtime(); + host->batch_init = now; host->batch_msg = NULL; return status; } @@ -500,8 +489,11 @@ static int wrr_batch_flush(cdtime_t timeout, pthread_mutex_lock(&host->lock); status = wrr_batch_flush_nolock(timeout, host); if (status != 0) - ERROR("write_riemann plugin: riemann_client_send failed with status %i", - status); + c_complain (LOG_ERR, &host->init_complaint, + "write_riemann plugin: riemann_client_send failed with status %i", + status); + else + c_release (LOG_DEBUG, &host->init_complaint, "write_riemann plugin: batch sent."); pthread_mutex_unlock(&host->lock); return status; @@ -515,6 +507,7 @@ static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */ riemann_message_t *msg; size_t len; int ret; + cdtime_t timeout; msg = wrr_value_list_to_message(host, ds, vl, statuses); if (msg == NULL) @@ -542,11 +535,16 @@ static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */ } } - len = protobuf_c_message_get_packed_size((const ProtobufCMessage*)(host->batch_msg)); + len = riemann_message_get_packed_size(host->batch_msg); ret = 0; if ((host->batch_max < 0) || (((size_t) host->batch_max) <= len)) { ret = wrr_batch_flush_nolock(0, host); - } + } else { + if (host->batch_timeout > 0) { + timeout = TIME_T_TO_CDTIME_T((time_t)host->batch_timeout); + ret = wrr_batch_flush_nolock(timeout, host); + } + } pthread_mutex_unlock(&host->lock); return ret; @@ -570,8 +568,12 @@ static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */ status = wrr_send(host, msg); if (status != 0) - ERROR("write_riemann plugin: riemann_client_send failed with status %i", - status); + c_complain (LOG_ERR, &host->init_complaint, + "write_riemann plugin: riemann_client_send failed with status %i", + status); + else + c_release (LOG_DEBUG, &host->init_complaint, + "write_riemann plugin: riemann_client_send succeeded"); riemann_message_free(msg); return (status); @@ -595,16 +597,13 @@ static int wrr_write(const data_set_t *ds, /* {{{ */ } if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) { - wrr_batch_add_value_list(host, ds, vl, statuses); + wrr_batch_add_value_list(host, ds, vl, statuses); } else { msg = wrr_value_list_to_message(host, ds, vl, statuses); if (msg == NULL) return (-1); status = wrr_send(host, msg); - if (status != 0) - ERROR("write_riemann plugin: riemann_client_send failed with status %i", - status); riemann_message_free(msg); } @@ -647,6 +646,7 @@ static int wrr_config_node(oconfig_item_t *ci) /* {{{ */ return ENOMEM; } pthread_mutex_init(&host->lock, NULL); + C_COMPLAIN_INIT (&host->init_complaint); host->reference_count = 1; host->node = NULL; host->port = 0; @@ -657,9 +657,12 @@ static int wrr_config_node(oconfig_item_t *ci) /* {{{ */ host->batch_mode = 1; host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */ host->batch_init = cdtime(); + host->batch_timeout = 0; host->ttl_factor = RIEMANN_TTL_FACTOR; host->client = NULL; host->client_type = RIEMANN_CLIENT_TCP; + host->timeout.tv_sec = 0; + host->timeout.tv_usec = 0; status = cf_util_get_string(ci, &host->name); if (status != 0) { @@ -700,6 +703,14 @@ static int wrr_config_node(oconfig_item_t *ci) /* {{{ */ status = cf_util_get_int(child, &host->batch_max); if (status != 0) break; + } else if (strcasecmp("BatchFlushTimeout", child->key) == 0) { + status = cf_util_get_int(child, &host->batch_timeout); + if (status != 0) + break; + } else if (strcasecmp("Timeout", child->key) == 0) { + status = cf_util_get_int(child, (int *)&host->timeout.tv_sec); + if (status != 0) + break; } else if (strcasecmp("Port", child->key) == 0) { host->port = cf_util_get_port_number(child); if (host->port == -1) {