summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: 96179cb)
raw | patch | inline | side by side (parent: 96179cb)
author | Florian Forster <octo@collectd.org> | |
Thu, 20 Nov 2014 17:25:21 +0000 (18:25 +0100) | ||
committer | Florian Forster <octo@collectd.org> | |
Mon, 6 Jul 2015 12:07:10 +0000 (14:07 +0200) |
The payload now includes all data sources and the timestamp. The payload
is formatted with the existing format_values() function, which removed
quite a couple of lines of code.
is formatted with the existing format_values() function, which removed
quite a couple of lines of code.
src/mqtt.c | patch | blob | history |
diff --git a/src/mqtt.c b/src/mqtt.c
index 5c844a760d5bcc393c7ae9bfade22cb001c356f8..ef46e529375c9d3171562f5c1332c9541aa19dbb 100644 (file)
--- a/src/mqtt.c
+++ b/src/mqtt.c
return (0);
} /* mqtt_publish_message */
-static int mqtt_format_metric_value (char *buf, size_t buf_len,
- const data_set_t *data_set, const value_list_t *vl, int ds_num)
+static int format_topic (char *buf, size_t buf_len,
+ data_set_t const *ds, value_list_t const *vl,
+ mqtt_client_conf_t *conf)
{
- gauge_t *rates = NULL;
- gauge_t *value = NULL;
- size_t metric_value_len;
- int status = 0;
-
- memset (buf, 0, buf_len);
-
- if (data_set->ds[ds_num].type == DS_TYPE_GAUGE)
- value = &vl->values[ds_num].gauge;
- else {
- rates = uc_get_rate (data_set, vl);
- value = &rates[ds_num];
- }
-
- metric_value_len = ssnprintf (buf, buf_len, "%f", *value);
-
- if (metric_value_len >= buf_len)
- return (-ENOMEM);
-
- if (rates)
- sfree (rates);
-
- return (status);
-} /* mqtt_format_metric_value */
-
-static int mqtt_format_message_topic (char *buf, size_t buf_len,
- char const *prefix, const value_list_t *vl, const char *ds_name)
-{
- size_t topic_buf_len;
-
- memset (buf, 0, buf_len);
-
- /*
- MQTT message topic format:
- [<prefix>/]<hostname>/<plugin>/<plugin instance>/<type>/<type instance>/<ds>/
- */
- topic_buf_len = (size_t) ssnprintf (buf, buf_len,
- "%s/%s/%s/%s/%s/%s/%s",
- prefix,
- vl->host,
- vl->plugin,
- vl->plugin_instance[0] != '\0' ? vl->plugin_instance : "(null)",
- vl->type,
- vl->type_instance[0] != '\0' ? vl->type_instance : "(null)",
- ds_name);
-
- if (topic_buf_len >= buf_len)
- {
- ERROR ("mqtt_format_message_topic: topic buffer too small: "
- "Need %zu bytes.", topic_buf_len + 1);
- return (-ENOMEM);
- }
-
- return (0);
-} /* mqtt_format_message_topic */
-
-static int mqtt_format_payload (char *buf, size_t buf_len,
- const data_set_t *data_set, const value_list_t *vl, int ds_num)
-{
- char metric_path[10 * DATA_MAX_NAME_LEN];
- char metric_value[512];
- size_t payload_buf_len;
- int status = 0;
+ char name[MQTT_MAX_TOPIC_SIZE];
+ int status;
- memset (buf, 0, buf_len);
-
- ssnprintf (metric_path, sizeof (metric_path),
- "%s.%s%s%s.%s%s%s%s%s",
- vl->host,
- vl->plugin,
- vl->plugin_instance[0] != '\0' ? "." : "",
- vl->plugin_instance[0] != '\0' ? vl->plugin_instance : "",
- vl->type,
- vl->type_instance[0] != '\0' ? "." : "",
- vl->type_instance[0] != '\0' ? vl->type_instance : "",
- strcmp(data_set->ds[ds_num].name, "value") != 0 ? "." : "",
- strcmp(data_set->ds[ds_num].name, "value") != 0 ?
- data_set->ds[ds_num].name : "");
-
- status = mqtt_format_metric_value (metric_value,
- sizeof (metric_value),
- data_set,
- vl,
- ds_num);
+ if ((conf->topic_prefix == NULL) || (conf->topic_prefix[0] == 0))
+ return (FORMAT_VL (buf, buf_len, vl));
+ status = FORMAT_VL (name, sizeof (name), vl);
if (status != 0)
- {
- ERROR ("mqtt_format_payload: error with mqtt_format_metric_value");
return (status);
- }
- payload_buf_len = (size_t) ssnprintf (buf, buf_len,
- "%s %s %u",
- metric_path,
- metric_value,
- (unsigned int) CDTIME_T_TO_TIME_T (vl->time));
+ status = ssnprintf (buf, buf_len, "%s/%s", conf->topic_prefix, name);
+ if ((status < 0) || (((size_t) status) >= buf_len))
+ return (ENOMEM);
- if (payload_buf_len >= buf_len)
- {
- ERROR ("mqtt_format_payload: payload buffer too small: "
- "Need %zu bytes.", payload_buf_len + 1);
- return (-ENOMEM);
- }
-
- return (status);
-} /* mqtt_format_payload */
+ return (0);
+} /* int format_topic */
-static int mqtt_write (const data_set_t *data_set, const value_list_t *vl,
+static int mqtt_write (const data_set_t *ds, const value_list_t *vl,
user_data_t *user_data)
{
- struct mqtt_client_conf *conf;
- char msg_topic[MQTT_MAX_TOPIC_SIZE];
- char msg_payload[MQTT_MAX_MESSAGE_SIZE];
+ mqtt_client_conf_t *conf;
+ char topic[MQTT_MAX_TOPIC_SIZE];
+ char payload[MQTT_MAX_MESSAGE_SIZE];
int status = 0;
- int i;
+ _Bool const store_rates = 0; /* TODO: Config option */
- if (user_data == NULL)
+ if ((user_data == NULL) || (user_data->data == NULL))
return (EINVAL);
-
conf = user_data->data;
- if (!conf->connected)
+ status = format_topic (topic, sizeof (topic), ds, vl, conf);
{
- status = mqtt_reconnect_broker (conf);
+ ERROR ("mqtt plugin: format_topic failed with status %d.", status);
+ return (status);
+ }
- if (status != 0) {
- ERROR ("plugin mqtt: unable to reconnect to broker");
- return (status);
- }
+ status = format_values (payload, sizeof (payload),
+ ds, vl, store_rates);
+ if (status != 0)
+ {
+ ERROR ("mqtt plugin: format_values failed with status %d.", status);
+ return (status);
}
- for (i = 0; i < data_set->ds_num; i++)
+ status = publish (conf, topic, payload, sizeof (payload));
+ if (status != 0)
{
- status = mqtt_format_message_topic (msg_topic, sizeof (msg_topic),
- conf->topic_prefix, vl, data_set->ds[i].name);
- if (status != 0)
- {
- ERROR ("plugin mqtt: error with mqtt_format_message_topic");
- return (status);
- }
-
- status = mqtt_format_payload (msg_payload,
- sizeof (msg_payload),
- data_set,
- vl,
- i);
-
- if (status != 0)
- {
- ERROR ("mqtt_write: error with mqtt_format_payload");
- return (status);
- }
-
- status = mqtt_publish_message (conf,
- msg_topic,
- msg_payload,
- sizeof (msg_payload));
- if (status != 0)
- {
- ERROR ("plugin mqtt: unable to publish message");
- return (status);
- }
-
- DEBUG ("\x1B[36m[debug]\x1B[0m\x1B[37m mqtt_write[%02X]\x1B[0m "
- "published message: topic=%s payload=%s",
- (unsigned)pthread_self(),
- msg_topic,
- msg_payload);
+ ERROR ("mqtt plugin: publish failed: %s", mosquitto_strerror (status));
+ return (status);
}
return (status);
static int mqtt_config (oconfig_item_t *ci)
{
- struct mqtt_client_conf *conf;
+ mqtt_client_conf_t *conf;
user_data_t user_data;
- char errbuf[1024];
int status;
- DEBUG ("\x1B[36m[debug]\x1B[0m\x1B[37m mqtt_config[%02X]\x1B[0m ",
- (unsigned)pthread_self());
-
- conf = malloc (sizeof (*conf));
+ conf = calloc (1, sizeof (*conf));
if (conf == NULL)
{
- ERROR ("write_mqtt plugin: malloc failed.");
+ ERROR ("mqtt plugin: malloc failed.");
return (-1);
}
- memset (conf, 0, sizeof (*conf));
-
conf->connected = false;
conf->host = MQTT_DEFAULT_HOST;
conf->port = MQTT_DEFAULT_PORT;
status = mosquitto_connect (conf->mosq, conf->host, conf->port,
/* keepalive = */ 10, /* clean session = */ 1);
- if (status != MOSQ_ERR_SUCCESS) {
- ERROR ("mqtt_config: mosquitto_connect failed: %s",
- (status == MOSQ_ERR_ERRNO ?
- sstrerror(errno, errbuf, sizeof (errbuf)) :
- mosquitto_strerror (status)));
+ if (status != MOSQ_ERR_SUCCESS)
+ {
+ char errbuf[1024];
+ ERROR ("mqtt plugin: mosquitto_connect failed: %s",
+ (status == MOSQ_ERR_ERRNO)
+ ? sstrerror (errno, errbuf, sizeof (errbuf))
+ : mosquitto_strerror (status));
+ free (conf);
return (-1);
}