summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: 5d2f373)
raw | patch | inline | side by side (parent: 5d2f373)
author | Florian Forster <octo@collectd.org> | |
Fri, 21 Nov 2014 16:41:17 +0000 (17:41 +0100) | ||
committer | Florian Forster <octo@collectd.org> | |
Mon, 6 Jul 2015 12:07:11 +0000 (14:07 +0200) |
Publisher and subscriber should now be able to send metrics to one another.
src/mqtt.c | patch | blob | history |
diff --git a/src/mqtt.c b/src/mqtt.c
index bfa2fdb9f4ec69eef00c42a565b7539f885c3582..7a7ba59dfb57b06d850ebb5f13619c5291b5f8c2 100644 (file)
--- a/src/mqtt.c
+++ b/src/mqtt.c
#define MQTT_DEFAULT_HOST "localhost"
#define MQTT_DEFAULT_PORT 1883
#define MQTT_DEFAULT_TOPIC_PREFIX "collectd"
+#define MQTT_DEFAULT_TOPIC "collectd/#"
/*
* Data types
*/
struct mqtt_client_conf
{
+ _Bool publish;
char *name;
struct mosquitto *mosq;
char *host;
int port;
+ char *client_id;
char *username;
char *password;
+ int qos;
- char *client_id;
+ /* For publishing */
char *topic_prefix;
_Bool store_rates;
_Bool retain;
- int qos;
+
+ /* For subscribing */
+ pthread_t thread;
+ _Bool loop;
+ char *topic;
+ _Bool clean_session;
c_complain_t complaint_cantpublish;
pthread_mutex_t lock;
};
typedef struct mqtt_client_conf mqtt_client_conf_t;
+static mqtt_client_conf_t **subscribers = NULL;
+static size_t subscribers_num = 0;
+
+/*
+ * Functions
+ */
static char const *mosquitto_strerror (int code)
{
switch (code)
sfree (conf);
}
-/*
- * Functions
- */
+static char *strip_prefix (char *topic)
+{
+ size_t num;
+ size_t i;
+
+ num = 0;
+ for (i = 0; topic[i] != 0; i++)
+ if (topic[i] == '/')
+ num++;
+
+ if (num < 2)
+ return (NULL);
+
+ while (num > 2)
+ {
+ char *tmp = strchr (topic, '/');
+ if (tmp == NULL)
+ return (NULL);
+ topic = tmp + 1;
+ num--;
+ }
+
+ return (topic);
+}
+
+static void on_message (__attribute__((unused)) void *arg,
+ const struct mosquitto_message *msg)
+{
+ value_list_t vl = VALUE_LIST_INIT;
+ data_set_t const *ds;
+ char *topic;
+ char *name;
+ char *payload;
+ int status;
+
+ if ((msg->payloadlen <= 0) || (msg->payload[msg->payloadlen - 1] != 0))
+ return;
+
+ topic = strdup (msg->topic);
+ name = strip_prefix (topic);
+
+ status = parse_identifier_vl (name, &vl);
+ if (status != 0)
+ {
+ ERROR ("mqtt plugin: Unable to parse topic \"%s\".", topic);
+ sfree (topic);
+ return;
+ }
+ sfree (topic);
+
+ ds = plugin_get_ds (vl.type);
+ if (ds == NULL)
+ {
+ ERROR ("mqtt plugin: Unknown type: \"%s\".", vl.type);
+ return;
+ }
+
+ vl.values = calloc (ds->ds_num, sizeof (*vl.values));
+ if (vl.values == NULL)
+ {
+ ERROR ("mqtt plugin: calloc failed.");
+ return;
+ }
+ vl.values_len = ds->ds_num;
+
+ payload = strdup ((void *) msg->payload);
+ DEBUG ("mqtt plugin: payload = \"%s\"", payload);
+ status = parse_values (payload, &vl, ds);
+ if (status != 0)
+ {
+ ERROR ("mqtt plugin: Unable to parse payload \"%s\".", payload);
+ sfree (payload);
+ sfree (vl.values);
+ return;
+ }
+ sfree (payload);
+
+ plugin_dispatch_values (&vl);
+ sfree (vl.values);
+} /* void on_message */
+
/* must hold conf->lock when calling. */
static int mqtt_reconnect (mqtt_client_conf_t *conf)
{
}
status = mosquitto_connect (conf->mosq, conf->host, conf->port,
- /* keepalive = */ 10, /* clean session = */ 1);
+ /* keepalive = */ 10, /* clean session = */ conf->clean_session);
if (status != MOSQ_ERR_SUCCESS)
{
char errbuf[1024];
return (-1);
}
+ if (!conf->publish)
+ {
+ mosquitto_message_callback_set (conf->mosq, on_message);
+
+ status = mosquitto_subscribe (conf->mosq, /* mid = */ NULL,
+ conf->topic, conf->qos);
+ if (status != MOSQ_ERR_SUCCESS)
+ {
+ ERROR ("mqtt plugin: Subscribing to \"%s\" failed: %s",
+ conf->topic, mosquitto_strerror (status));
+
+ mosquitto_disconnect (conf->mosq);
+ mosquitto_destroy (conf->mosq);
+ conf->mosq = NULL;
+ return (-1);
+ }
+ }
+
conf->connected = 1;
return (0);
} /* mqtt_connect */
+static void *subscribers_thread (void *arg)
+{
+ mqtt_client_conf_t *conf = arg;
+ int status;
+
+ conf->loop = 1;
+
+ while (conf->loop)
+ {
+ status = mqtt_connect (conf);
+ if (status != 0)
+ {
+ sleep (1);
+ continue;
+ }
+
+ /* The documentation says "0" would map to the default (1000ms), but
+ * that does not work on some versions. */
+ status = mosquitto_loop (conf->mosq, /* timeout = */ 1000 /* ms */);
+ if (status == MOSQ_ERR_CONN_LOST)
+ {
+ conf->connected = 0;
+ continue;
+ }
+ else if (status != MOSQ_ERR_SUCCESS)
+ {
+ ERROR ("mqtt plugin: mosquitto_loop failed: %s",
+ mosquitto_strerror (status));
+ mosquitto_destroy (conf->mosq);
+ conf->mosq = NULL;
+ conf->connected = 0;
+ continue;
+ }
+
+ DEBUG ("mqtt plugin: mosquitto_loop succeeded.");
+ } /* while (conf->loop) */
+
+ pthread_exit (0);
+} /* void *subscribers_thread */
+
static int publish (mqtt_client_conf_t *conf, char const *topic,
void const *payload, size_t payload_len)
{
* <Publish "name">
* Host "example.com"
* Port 1883
- * Prefix "collectd"
* ClientId "collectd"
* User "guest"
* Password "secret"
+ * Prefix "collectd"
* StoreRates true
* Retain false
* QoS 0
* </Publish>
*/
-static int mqtt_config_broker (oconfig_item_t *ci)
+static int mqtt_config_publisher (oconfig_item_t *ci)
{
mqtt_client_conf_t *conf;
user_data_t user_data;
ERROR ("mqtt plugin: malloc failed.");
return (-1);
}
+ conf->publish = 1;
conf->name = NULL;
status = cf_util_get_string (ci, &conf->name);
conf->port = MQTT_DEFAULT_PORT;
conf->client_id = NULL;
conf->topic_prefix = strdup (MQTT_DEFAULT_TOPIC_PREFIX);
+
C_COMPLAIN_INIT (&conf->complaint_cantpublish);
for (i = 0; i < ci->children_num; i++)
else
conf->port = tmp;
}
- else if (strcasecmp ("Prefix", child->key) == 0)
- cf_util_get_string (child, &conf->topic_prefix);
else if (strcasecmp ("ClientId", child->key) == 0)
cf_util_get_string (child, &conf->client_id);
else if (strcasecmp ("User", child->key) == 0)
cf_util_get_string (child, &conf->username);
else if (strcasecmp ("Password", child->key) == 0)
cf_util_get_string (child, &conf->password);
+ else if (strcasecmp ("QoS", child->key) == 0)
+ {
+ int tmp = -1;
+ status = cf_util_get_int (child, &tmp);
+ if ((status != 0) || (tmp < 0) || (tmp > 2))
+ ERROR ("mqtt plugin: Not a valid QoS setting.");
+ else
+ conf->qos = tmp;
+ }
+ else if (strcasecmp ("Prefix", child->key) == 0)
+ cf_util_get_string (child, &conf->topic_prefix);
else if (strcasecmp ("StoreRates", child->key) == 0)
cf_util_get_boolean (child, &conf->store_rates);
else if (strcasecmp ("Retain", child->key) == 0)
cf_util_get_boolean (child, &conf->retain);
+ else
+ ERROR ("mqtt plugin: Unknown config option: %s", child->key);
+ }
+
+ memset (&user_data, 0, sizeof (user_data));
+ user_data.data = conf;
+
+ plugin_register_write ("mqtt", mqtt_write, &user_data);
+ return (0);
+} /* mqtt_config_publisher */
+
+/*
+ * <Subscribe "name">
+ * Host "example.com"
+ * Port 1883
+ * ClientId "collectd"
+ * User "guest"
+ * Password "secret"
+ * Topic "collectd/#"
+ * </Publish>
+ */
+static int mqtt_config_subscriber (oconfig_item_t *ci)
+{
+ mqtt_client_conf_t **tmp;
+ mqtt_client_conf_t *conf;
+ int status;
+ int i;
+
+ conf = calloc (1, sizeof (*conf));
+ if (conf == NULL)
+ {
+ ERROR ("mqtt plugin: malloc failed.");
+ return (-1);
+ }
+ conf->publish = 0;
+
+ conf->name = NULL;
+ status = cf_util_get_string (ci, &conf->name);
+ if (status != 0)
+ {
+ mqtt_free (conf);
+ return (status);
+ }
+
+ conf->host = strdup (MQTT_DEFAULT_HOST);
+ conf->port = MQTT_DEFAULT_PORT;
+ conf->client_id = NULL;
+ conf->topic = strdup (MQTT_DEFAULT_TOPIC);
+
+ C_COMPLAIN_INIT (&conf->complaint_cantpublish);
+
+ for (i = 0; i < ci->children_num; i++)
+ {
+ oconfig_item_t *child = ci->children + i;
+ if (strcasecmp ("Host", child->key) == 0)
+ cf_util_get_string (child, &conf->host);
+ else if (strcasecmp ("Port", child->key) == 0)
+ {
+ int tmp = cf_util_get_port_number (child);
+ if (tmp < 0)
+ ERROR ("mqtt plugin: Invalid port number.");
+ else
+ conf->port = tmp;
+ }
+ else if (strcasecmp ("ClientId", child->key) == 0)
+ cf_util_get_string (child, &conf->client_id);
+ else if (strcasecmp ("User", child->key) == 0)
+ cf_util_get_string (child, &conf->username);
+ else if (strcasecmp ("Password", child->key) == 0)
+ cf_util_get_string (child, &conf->password);
else if (strcasecmp ("QoS", child->key) == 0)
{
int tmp = -1;
else
conf->qos = tmp;
}
+ else if (strcasecmp ("Topic", child->key) == 0)
+ cf_util_get_string (child, &conf->topic);
+ else if (strcasecmp ("CleanSession", child->key) == 0)
+ cf_util_get_boolean (child, &conf->clean_session);
else
ERROR ("mqtt plugin: Unknown config option: %s", child->key);
}
- memset (&user_data, 0, sizeof (user_data));
- user_data.data = conf;
-
- DEBUG ("mqtt plugin: successfully connected to broker \"%s:%d\"",
- conf->host, conf->port);
+ tmp = realloc (subscribers, sizeof (*subscribers) * subscribers_num);
+ if (tmp == NULL)
+ {
+ ERROR ("mqtt plugin: realloc failed.");
+ mqtt_free (conf);
+ return (-1);
+ }
+ subscribers = tmp;
+ subscribers[subscribers_num] = conf;
+ subscribers_num++;
- plugin_register_write ("mqtt", mqtt_write, &user_data);
return (0);
-} /* mqtt_config_broker */
+} /* mqtt_config_subscriber */
/*
* <Plugin mqtt>
* <Publish "name">
* # ...
* </Publish>
+ * <Subscribe "name">
+ * # ...
+ * </Subscribe>
* </Plugin>
*/
static int mqtt_config (oconfig_item_t *ci)
oconfig_item_t *child = ci->children + i;
if (strcasecmp ("Publish", child->key) == 0)
- mqtt_config_broker (child);
+ mqtt_config_publisher (child);
+ else if (strcasecmp ("Subscribe", child->key) == 0)
+ mqtt_config_subscriber (child);
else
ERROR ("mqtt plugin: Unknown config option: %s", child->key);
}
static int mqtt_init (void)
{
- mosquitto_lib_init();
+ size_t i;
+
+ mosquitto_lib_init ();
+
+ for (i = 0; i < subscribers_num; i++)
+ {
+ int status;
+
+ if (subscribers[i]->loop)
+ continue;
+
+ status = plugin_thread_create (&subscribers[i]->thread,
+ /* attrs = */ NULL,
+ /* func = */ subscribers_thread,
+ /* args = */ subscribers[i]);
+ if (status != 0)
+ {
+ char errbuf[1024];
+ ERROR ("mqtt plugin: pthread_create failed: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ continue;
+ }
+ }
return (0);
} /* mqtt_init */