diff --git a/src/write_http.c b/src/write_http.c
index 27337813affbc45dc6d9728f41f3427c78e3ac05..41615d3f0ccbd0be9c1cefba46708a4ace10e680 100644 (file)
--- a/src/write_http.c
+++ b/src/write_http.c
**/
#include "collectd.h"
+
#include "plugin.h"
#include "common.h"
-#include "utils_cache.h"
#include "utils_format_json.h"
-
-#if HAVE_PTHREAD_H
-# include <pthread.h>
-#endif
+#include "utils_format_kairosdb.h"
#include <curl/curl.h>
time_t low_speed_time;
int timeout;
-#define WH_FORMAT_COMMAND 0
-#define WH_FORMAT_JSON 1
+#define WH_FORMAT_COMMAND 0
+#define WH_FORMAT_JSON 1
+#define WH_FORMAT_KAIROSDB 2
int format;
+ _Bool send_metrics;
+ _Bool send_notifications;
CURL *curl;
struct curl_slist *headers;
static void wh_reset_buffer (wh_callback_t *cb) /* {{{ */
{
+ if ((cb == NULL) || (cb->send_buffer == NULL))
+ return;
+
memset (cb->send_buffer, 0, cb->send_buffer_size);
cb->send_buffer_free = cb->send_buffer_size;
cb->send_buffer_fill = 0;
cb->send_buffer_init_time = cdtime ();
- if (cb->format == WH_FORMAT_JSON)
+ if (cb->format == WH_FORMAT_JSON || cb->format == WH_FORMAT_KAIROSDB)
{
format_json_initialize (cb->send_buffer,
&cb->send_buffer_fill,
}
} /* }}} wh_reset_buffer */
-static int wh_send_buffer (wh_callback_t *cb) /* {{{ */
+/* must hold cb->send_lock when calling */
+static int wh_post_nolock (wh_callback_t *cb, char const *data) /* {{{ */
{
int status = 0;
- curl_easy_setopt (cb->curl, CURLOPT_POSTFIELDS, cb->send_buffer);
+ curl_easy_setopt (cb->curl, CURLOPT_POSTFIELDS, data);
status = curl_easy_perform (cb->curl);
wh_log_http_error (cb);
status, cb->curl_errbuf);
}
return (status);
-} /* }}} wh_send_buffer */
+} /* }}} wh_post_nolock */
static int wh_callback_init (wh_callback_t *cb) /* {{{ */
{
curl_easy_setopt (cb->curl, CURLOPT_USERAGENT, COLLECTD_USERAGENT);
cb->headers = curl_slist_append (cb->headers, "Accept: */*");
- if (cb->format == WH_FORMAT_JSON)
+ if (cb->format == WH_FORMAT_JSON || cb->format == WH_FORMAT_KAIROSDB)
cb->headers = curl_slist_append (cb->headers, "Content-Type: application/json");
else
cb->headers = curl_slist_append (cb->headers, "Content-Type: text/plain");
if (cb->pass != NULL)
credentials_size += strlen (cb->pass);
- cb->credentials = (char *) malloc (credentials_size);
+ cb->credentials = malloc (credentials_size);
if (cb->credentials == NULL)
{
ERROR ("curl plugin: malloc failed.");
if (cb->format == WH_FORMAT_COMMAND)
{
- if (cb->send_buffer_fill <= 0)
+ if (cb->send_buffer_fill == 0)
{
cb->send_buffer_init_time = cdtime ();
return (0);
}
- status = wh_send_buffer (cb);
+ status = wh_post_nolock (cb, cb->send_buffer);
wh_reset_buffer (cb);
}
- else if (cb->format == WH_FORMAT_JSON)
+ else if (cb->format == WH_FORMAT_JSON || cb->format == WH_FORMAT_KAIROSDB)
{
if (cb->send_buffer_fill <= 2)
{
return (status);
}
- status = wh_send_buffer (cb);
+ status = wh_post_nolock (cb, cb->send_buffer);
wh_reset_buffer (cb);
}
else
pthread_mutex_lock (&cb->send_lock);
- if (cb->curl == NULL)
+ if (wh_callback_init (cb) != 0)
{
- status = wh_callback_init (cb);
- if (status != 0)
- {
- ERROR ("write_http plugin: wh_callback_init failed.");
- pthread_mutex_unlock (&cb->send_lock);
- return (-1);
- }
+ ERROR ("write_http plugin: wh_callback_init failed.");
+ pthread_mutex_unlock (&cb->send_lock);
+ return (-1);
}
status = wh_flush_nolock (timeout, cb);
cb = data;
- wh_flush_nolock (/* timeout = */ 0, cb);
+ if (cb->send_buffer != NULL)
+ wh_flush_nolock (/* timeout = */ 0, cb);
if (cb->curl != NULL)
{
@@ -361,7 +362,11 @@ static int wh_write_command (const data_set_t *ds, const value_list_t *vl, /* {{
int status;
- if (0 != strcmp (ds->type, vl->type)) {
+ /* sanity checks, primarily to make static analyzers happy. */
+ if ((cb == NULL) || (cb->send_buffer == NULL))
+ return -1;
+
+ if (strcmp (ds->type, vl->type) == 0) {
ERROR ("write_http plugin: DS type does not match "
"value list type");
return -1;
@@ -396,16 +401,11 @@ static int wh_write_command (const data_set_t *ds, const value_list_t *vl, /* {{
}
pthread_mutex_lock (&cb->send_lock);
-
- if (cb->curl == NULL)
+ if (wh_callback_init (cb) != 0)
{
- status = wh_callback_init (cb);
- if (status != 0)
- {
- ERROR ("write_http plugin: wh_callback_init failed.");
- pthread_mutex_unlock (&cb->send_lock);
- return (-1);
- }
+ ERROR ("write_http plugin: wh_callback_init failed.");
+ pthread_mutex_unlock (&cb->send_lock);
+ return (-1);
}
if (command_len >= cb->send_buffer_free)
@@ -443,6 +443,55 @@ static int wh_write_json (const data_set_t *ds, const value_list_t *vl, /* {{{ *
{
int status;
+ pthread_mutex_lock (&cb->send_lock);
+ if (wh_callback_init (cb) != 0)
+ {
+ ERROR ("write_http plugin: wh_callback_init failed.");
+ pthread_mutex_unlock (&cb->send_lock);
+ return (-1);
+ }
+
+ status = format_json_value_list (cb->send_buffer,
+ &cb->send_buffer_fill,
+ &cb->send_buffer_free,
+ ds, vl, cb->store_rates);
+ if (status == -ENOMEM)
+ {
+ status = wh_flush_nolock (/* timeout = */ 0, cb);
+ if (status != 0)
+ {
+ wh_reset_buffer (cb);
+ pthread_mutex_unlock (&cb->send_lock);
+ return (status);
+ }
+
+ status = format_json_value_list (cb->send_buffer,
+ &cb->send_buffer_fill,
+ &cb->send_buffer_free,
+ ds, vl, cb->store_rates);
+ }
+ if (status != 0)
+ {
+ pthread_mutex_unlock (&cb->send_lock);
+ return (status);
+ }
+
+ DEBUG ("write_http plugin: <%s> buffer %zu/%zu (%g%%)",
+ cb->location,
+ cb->send_buffer_fill, cb->send_buffer_size,
+ 100.0 * ((double) cb->send_buffer_fill) / ((double) cb->send_buffer_size));
+
+ /* Check if we have enough space for this command. */
+ pthread_mutex_unlock (&cb->send_lock);
+
+ return (0);
+} /* }}} int wh_write_json */
+
+static int wh_write_kairosdb (const data_set_t *ds, const value_list_t *vl, /* {{{ */
+ wh_callback_t *cb)
+{
+ int status;
+
pthread_mutex_lock (&cb->send_lock);
if (cb->curl == NULL)
@@ -456,11 +505,11 @@ static int wh_write_json (const data_set_t *ds, const value_list_t *vl, /* {{{ *
}
}
- status = format_json_value_list (cb->send_buffer,
+ status = format_kairosdb_value_list (cb->send_buffer,
&cb->send_buffer_fill,
&cb->send_buffer_free,
ds, vl, cb->store_rates);
- if (status == (-ENOMEM))
+ if (status == -ENOMEM)
{
status = wh_flush_nolock (/* timeout = */ 0, cb);
if (status != 0)
@@ -470,7 +519,7 @@ static int wh_write_json (const data_set_t *ds, const value_list_t *vl, /* {{{ *
return (status);
}
- status = format_json_value_list (cb->send_buffer,
+ status = format_kairosdb_value_list (cb->send_buffer,
&cb->send_buffer_fill,
&cb->send_buffer_free,
ds, vl, cb->store_rates);
@@ -490,7 +539,7 @@ static int wh_write_json (const data_set_t *ds, const value_list_t *vl, /* {{{ *
pthread_mutex_unlock (&cb->send_lock);
return (0);
-} /* }}} int wh_write_json */
+} /* }}} int wh_write_kairosdb */
static int wh_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
user_data_t *user_data)
return (-EINVAL);
cb = user_data->data;
+ assert (cb->send_metrics);
- if (cb->format == WH_FORMAT_JSON)
+ switch(cb->format) {
+ case WH_FORMAT_JSON:
status = wh_write_json (ds, vl, cb);
- else
+ break;
+ case WH_FORMAT_KAIROSDB:
+ status = wh_write_kairosdb (ds, vl, cb);
+ break;
+ default:
status = wh_write_command (ds, vl, cb);
-
+ break;
+ }
return (status);
} /* }}} int wh_write */
+static int wh_notify (notification_t const *n, user_data_t *ud) /* {{{ */
+{
+ wh_callback_t *cb;
+ char alert[4096];
+ int status;
+
+ if ((ud == NULL) || (ud->data == NULL))
+ return (EINVAL);
+
+ cb = ud->data;
+ assert (cb->send_notifications);
+
+ status = format_json_notification (alert, sizeof (alert), n);
+ if (status != 0)
+ {
+ ERROR ("write_http plugin: formatting notification failed");
+ return status;
+ }
+
+ pthread_mutex_lock (&cb->send_lock);
+ if (wh_callback_init (cb) != 0)
+ {
+ ERROR ("write_http plugin: wh_callback_init failed.");
+ pthread_mutex_unlock (&cb->send_lock);
+ return (-1);
+ }
+
+ status = wh_post_nolock (cb, alert);
+ pthread_mutex_unlock (&cb->send_lock);
+
+ return (status);
+} /* }}} int wh_notify */
+
static int config_set_format (wh_callback_t *cb, /* {{{ */
oconfig_item_t *ci)
{
cb->format = WH_FORMAT_COMMAND;
else if (strcasecmp ("JSON", string) == 0)
cb->format = WH_FORMAT_JSON;
+ else if (strcasecmp ("KAIROSDB", string) == 0)
+ cb->format = WH_FORMAT_KAIROSDB;
else
{
ERROR ("write_http plugin: Invalid format string: %s",
{
wh_callback_t *cb;
int buffer_size = 0;
- user_data_t user_data;
char callback_name[DATA_MAX_NAME_LEN];
int status = 0;
- int i;
- cb = malloc (sizeof (*cb));
+ cb = calloc (1, sizeof (*cb));
if (cb == NULL)
{
- ERROR ("write_http plugin: malloc failed.");
+ ERROR ("write_http plugin: calloc failed.");
return (-1);
}
- memset (cb, 0, sizeof (*cb));
cb->verify_peer = 1;
cb->verify_host = 1;
cb->format = WH_FORMAT_COMMAND;
cb->timeout = 0;
cb->log_http_error = 0;
cb->headers = NULL;
-
+ cb->send_metrics = 1;
+ cb->send_notifications = 0;
pthread_mutex_init (&cb->send_lock, /* attr = */ NULL);
if (strcasecmp ("URL", ci->key) == 0)
cf_util_get_string (ci, &cb->location);
- for (i = 0; i < ci->children_num; i++)
+ for (int i = 0; i < ci->children_num; i++)
{
oconfig_item_t *child = ci->children + i;
}
else if (strcasecmp ("Format", child->key) == 0)
status = config_set_format (cb, child);
+ else if (strcasecmp ("Metrics", child->key) == 0)
+ cf_util_get_boolean (child, &cb->send_metrics);
+ else if (strcasecmp ("Notifications", child->key) == 0)
+ cf_util_get_boolean (child, &cb->send_notifications);
else if (strcasecmp ("StoreRates", child->key) == 0)
status = cf_util_get_boolean (child, &cb->store_rates);
else if (strcasecmp ("BufferSize", child->key) == 0)
return (-1);
}
+ if (!cb->send_metrics && !cb->send_notifications)
+ {
+ ERROR ("write_http plugin: Neither metrics nor notifications "
+ "are enabled for \"%s\".", cb->name);
+ wh_callback_free (cb);
+ return (-1);
+ }
+
if (cb->low_speed_limit > 0)
cb->low_speed_time = CDTIME_T_TO_TIME_T(plugin_get_interval());
DEBUG ("write_http: Registering write callback '%s' with URL '%s'",
callback_name, cb->location);
- memset (&user_data, 0, sizeof (user_data));
- user_data.data = cb;
- user_data.free_func = NULL;
+ user_data_t user_data = {
+ .data = cb
+ };
+
plugin_register_flush (callback_name, wh_flush, &user_data);
user_data.free_func = wh_callback_free;
- plugin_register_write (callback_name, wh_write, &user_data);
+
+ if (cb->send_metrics)
+ {
+ plugin_register_write (callback_name, wh_write, &user_data);
+ user_data.free_func = NULL;
+
+ plugin_register_flush (callback_name, wh_flush, &user_data);
+ }
+
+ if (cb->send_notifications)
+ {
+ plugin_register_notification (callback_name, wh_notify, &user_data);
+ user_data.free_func = NULL;
+ }
return (0);
} /* }}} int wh_config_node */
static int wh_config (oconfig_item_t *ci) /* {{{ */
{
- int i;
-
- for (i = 0; i < ci->children_num; i++)
+ for (int i = 0; i < ci->children_num; i++)
{
oconfig_item_t *child = ci->children + i;