summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: 7a66c29)
raw | patch | inline | side by side (parent: 7a66c29)
author | Florian Forster <octo@collectd.org> | |
Fri, 21 Nov 2014 11:05:39 +0000 (12:05 +0100) | ||
committer | Florian Forster <octo@collectd.org> | |
Mon, 6 Jul 2015 12:07:10 +0000 (14:07 +0200) |
Also adds support for authentication and configuring a couple of settings
(QoS, rates, retention).
(QoS, rates, retention).
src/mqtt.c | patch | blob | history |
diff --git a/src/mqtt.c b/src/mqtt.c
index 163bd4ad8a49994d8149fb540de7ad8bb14646f4..457d7bd84910a946bab5f28026a70299eae752b4 100644 (file)
--- a/src/mqtt.c
+++ b/src/mqtt.c
*/
struct mqtt_client_conf
{
- struct mosquitto *mosq;
+ char *name;
+
+ struct mosquitto *mosq;
_Bool connected;
- char *host;
+
+ char *host;
int port;
- char *client_id;
- char *topic_prefix;
+ char *username;
+ char *password;
+
+ char *client_id;
+ char *topic_prefix;
+ _Bool store_rates;
+ _Bool retain;
+ int qos;
+
c_complain_t complaint_cantpublish;
pthread_mutex_t lock;
};
return "UNKNOWN ERROR CODE";
}
+static void mqtt_free (mqtt_client_conf_t *conf)
+{
+ if (conf == NULL)
+ return;
+
+ if (conf->connected)
+ (void) mosquitto_disconnect (conf->mosq);
+ conf->connected = 0;
+ (void) mosquitto_destroy (conf->mosq);
+
+ sfree (conf->host);
+ sfree (conf->username);
+ sfree (conf->password);
+ sfree (conf->client_id);
+ sfree (conf->topic_prefix);
+ sfree (conf);
+}
+
/*
* Functions
*/
static int publish (mqtt_client_conf_t *conf, char const *topic,
void const *payload, size_t payload_len)
{
- int const qos = 0; /* TODO: Config option */
int status;
pthread_mutex_lock (&conf->lock);
/* message id */ NULL,
topic,
(uint32_t) payload_len, payload,
- /* qos */ qos,
- /* retain */ false);
+ /* qos */ conf->qos,
+ /* retain */ conf->retain);
if (status != MOSQ_ERR_SUCCESS)
{
char errbuf[1024];
char topic[MQTT_MAX_TOPIC_SIZE];
char payload[MQTT_MAX_MESSAGE_SIZE];
int status = 0;
- _Bool const store_rates = 0; /* TODO: Config option */
if ((user_data == NULL) || (user_data->data == NULL))
return (EINVAL);
}
status = format_values (payload, sizeof (payload),
- ds, vl, store_rates);
+ ds, vl, conf->store_rates);
if (status != 0)
{
ERROR ("mqtt plugin: format_values failed with status %d.", status);
} /* mqtt_write */
/*
- * <Plugin mqtt>
+ * <Publish "name">
* Host "example.com"
* Port 1883
* Prefix "collectd"
* ClientId "collectd"
- * </Plugin>
+ * User "guest"
+ * Password "secret"
+ * StoreRates true
+ * Retain false
+ * QoS 0
+ * </Publish>
*/
-static int mqtt_config (oconfig_item_t *ci)
+static int mqtt_config_broker (oconfig_item_t *ci)
{
mqtt_client_conf_t *conf;
user_data_t user_data;
return (-1);
}
- conf->connected = 0;
+ conf->name = NULL;
+ status = cf_util_get_string (ci, &conf->name);
+ if (status != 0)
+ {
+ mqtt_free (conf);
+ return (status);
+ }
+
conf->host = strdup (MQTT_DEFAULT_HOST);
conf->port = MQTT_DEFAULT_PORT;
conf->client_id = strdup (MQTT_DEFAULT_CLIENT_ID);
{
int tmp = cf_util_get_port_number (child);
if (tmp < 0)
- {
ERROR ("mqtt plugin: Invalid port number.");
- continue;
- }
- conf->port = tmp;
+ else
+ conf->port = tmp;
}
else if (strcasecmp ("Prefix", child->key) == 0)
cf_util_get_string (child, &conf->topic_prefix);
else if (strcasecmp ("ClientId", child->key) == 0)
cf_util_get_string (child, &conf->client_id);
+ else if (strcasecmp ("User", child->key) == 0)
+ cf_util_get_string (child, &conf->username);
+ else if (strcasecmp ("Password", child->key) == 0)
+ cf_util_get_string (child, &conf->password);
+ else if (strcasecmp ("StoreRates", child->key) == 0)
+ cf_util_get_boolean (child, &conf->store_rates);
+ else if (strcasecmp ("Retain", child->key) == 0)
+ cf_util_get_boolean (child, &conf->retain);
+ else if (strcasecmp ("QoS", child->key) == 0)
+ {
+ int tmp = -1;
+ status = cf_util_get_int (child, &tmp);
+ if ((status != 0) || (tmp < 0) || (tmp > 2))
+ ERROR ("mqtt plugin: Not a valid QoS setting.");
+ else
+ conf->qos = tmp;
+ }
else
ERROR ("mqtt plugin: Unknown config option: %s", child->key);
}
if (conf->mosq == NULL)
{
ERROR ("mqtt plugin: mosquitto_new failed");
- free (conf);
+ mqtt_free (conf);
return (-1);
}
+ if (conf->username && conf->password)
+ {
+ status = mosquitto_username_pw_set (conf->mosq, conf->username, conf->password);
+ if (status != MOSQ_ERR_SUCCESS)
+ {
+ char errbuf[1024];
+ ERROR ("mqtt plugin: mosquitto_username_pw_set failed: %s",
+ (status == MOSQ_ERR_ERRNO)
+ ? sstrerror (errno, errbuf, sizeof (errbuf))
+ : mosquitto_strerror (status));
+ mqtt_free (conf);
+ return (-1);
+ }
+ }
+
status = mosquitto_connect (conf->mosq, conf->host, conf->port,
/* keepalive = */ 10, /* clean session = */ 1);
if (status != MOSQ_ERR_SUCCESS)
(status == MOSQ_ERR_ERRNO)
? sstrerror (errno, errbuf, sizeof (errbuf))
: mosquitto_strerror (status));
- free (conf);
+ mqtt_free (conf);
return (-1);
}
plugin_register_write ("mqtt", mqtt_write, &user_data);
return (0);
-} /* mqtt_config */
+} /* mqtt_config_broker */
+
+/*
+ * <Plugin mqtt>
+ * <Publish "name">
+ * # ...
+ * </Publish>
+ * </Plugin>
+ */
+static int mqtt_config (oconfig_item_t *ci)
+{
+ int i;
+
+ for (i = 0; i < ci->children_num; i++)
+ {
+ oconfig_item_t *child = ci->children + i;
+
+ if (strcasecmp ("Publish", child->key) == 0)
+ mqtt_config_broker (child);
+ else
+ ERROR ("mqtt plugin: Unknown config option: %s", child->key);
+ }
+
+ return (0);
+} /* int mqtt_config */
static int mqtt_init (void)
{