Code

mqtt plugin: Change some default values.
[collectd.git] / src / mqtt.c
index bfa2fdb9f4ec69eef00c42a565b7539f885c3582..b5e40aec4909563b96fffd4426ac28c3452e8017 100644 (file)
@@ -1,6 +1,7 @@
 /**
  * collectd - src/mqtt.c
- * Copyright (C) 2014       Marc Falzon <marc at baha dot mu>
+ * Copyright (C) 2014       Marc Falzon
+ * Copyright (C) 2014,2015  Florian octo Forster
  *
  * Permission is hereby granted, free of charge, to any person obtaining a
  * copy of this software and associated documentation files (the "Software"),
  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
  * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ *   Marc Falzon <marc at baha dot mu>
+ *   Florian octo Forster <octo at collectd.org>
  **/
 
 // Reference: http://mosquitto.org/api/files/mosquitto-h.html
 #define MQTT_DEFAULT_HOST           "localhost"
 #define MQTT_DEFAULT_PORT           1883
 #define MQTT_DEFAULT_TOPIC_PREFIX   "collectd"
+#define MQTT_DEFAULT_TOPIC          "collectd/#"
+#ifndef MQTT_KEEPALIVE
+# define MQTT_KEEPALIVE 60
+#endif
+
 
 /*
  * Data types
  */
 struct mqtt_client_conf
 {
+    _Bool               publish;
     char               *name;
 
     struct mosquitto   *mosq;
@@ -52,20 +63,33 @@ struct mqtt_client_conf
 
     char               *host;
     int                 port;
+    char               *client_id;
     char               *username;
     char               *password;
+    int                 qos;
 
-    char               *client_id;
+    /* For publishing */
     char               *topic_prefix;
     _Bool               store_rates;
     _Bool               retain;
-    int qos;
+
+    /* For subscribing */
+    pthread_t           thread;
+    _Bool               loop;
+    char               *topic;
+    _Bool               clean_session;
 
     c_complain_t        complaint_cantpublish;
     pthread_mutex_t     lock;
 };
 typedef struct mqtt_client_conf mqtt_client_conf_t;
 
+static mqtt_client_conf_t **subscribers = NULL;
+static size_t subscribers_num = 0;
+
+/*
+ * Functions
+ */
 static char const *mosquitto_strerror (int code)
 {
     switch (code)
@@ -108,9 +132,87 @@ static void mqtt_free (mqtt_client_conf_t *conf)
     sfree (conf);
 }
 
-/*
- * Functions
- */
+static char *strip_prefix (char *topic)
+{
+    size_t num;
+    size_t i;
+
+    num = 0;
+    for (i = 0; topic[i] != 0; i++)
+        if (topic[i] == '/')
+            num++;
+
+    if (num < 2)
+        return (NULL);
+
+    while (num > 2)
+    {
+        char *tmp = strchr (topic, '/');
+        if (tmp == NULL)
+            return (NULL);
+        topic = tmp + 1;
+        num--;
+    }
+
+    return (topic);
+}
+
+static void on_message (__attribute__((unused)) void *arg,
+        const struct mosquitto_message *msg)
+{
+    value_list_t vl = VALUE_LIST_INIT;
+    data_set_t const *ds;
+    char *topic;
+    char *name;
+    char *payload;
+    int status;
+
+    if ((msg->payloadlen <= 0) || (msg->payload[msg->payloadlen - 1] != 0))
+        return;
+
+    topic = strdup (msg->topic);
+    name = strip_prefix (topic);
+
+    status = parse_identifier_vl (name, &vl);
+    if (status != 0)
+    {
+        ERROR ("mqtt plugin: Unable to parse topic \"%s\".", topic);
+        sfree (topic);
+        return;
+    }
+    sfree (topic);
+
+    ds = plugin_get_ds (vl.type);
+    if (ds == NULL)
+    {
+        ERROR ("mqtt plugin: Unknown type: \"%s\".", vl.type);
+        return;
+    }
+
+    vl.values = calloc (ds->ds_num, sizeof (*vl.values));
+    if (vl.values == NULL)
+    {
+        ERROR ("mqtt plugin: calloc failed.");
+        return;
+    }
+    vl.values_len = ds->ds_num;
+
+    payload = strdup ((void *) msg->payload);
+    DEBUG ("mqtt plugin: payload = \"%s\"", payload);
+    status = parse_values (payload, &vl, ds);
+    if (status != 0)
+    {
+        ERROR ("mqtt plugin: Unable to parse payload \"%s\".", payload);
+        sfree (payload);
+        sfree (vl.values);
+        return;
+    }
+    sfree (payload);
+
+    plugin_dispatch_values (&vl);
+    sfree (vl.values);
+} /* void on_message */
+
 /* must hold conf->lock when calling. */
 static int mqtt_reconnect (mqtt_client_conf_t *conf)
 {
@@ -179,7 +281,7 @@ static int mqtt_connect (mqtt_client_conf_t *conf)
     }
 
     status = mosquitto_connect (conf->mosq, conf->host, conf->port,
-            /* keepalive = */ 10, /* clean session = */ 1);
+            /* keepalive = */ MQTT_KEEPALIVE, /* clean session = */ conf->clean_session);
     if (status != MOSQ_ERR_SUCCESS)
     {
         char errbuf[1024];
@@ -193,10 +295,68 @@ static int mqtt_connect (mqtt_client_conf_t *conf)
         return (-1);
     }
 
+    if (!conf->publish)
+    {
+        mosquitto_message_callback_set (conf->mosq, on_message);
+
+        status = mosquitto_subscribe (conf->mosq, /* mid = */ NULL,
+                conf->topic, conf->qos);
+        if (status != MOSQ_ERR_SUCCESS)
+        {
+            ERROR ("mqtt plugin: Subscribing to \"%s\" failed: %s",
+                    conf->topic, mosquitto_strerror (status));
+
+            mosquitto_disconnect (conf->mosq);
+            mosquitto_destroy (conf->mosq);
+            conf->mosq = NULL;
+            return (-1);
+        }
+    }
+
     conf->connected = 1;
     return (0);
 } /* mqtt_connect */
 
+static void *subscribers_thread (void *arg)
+{
+    mqtt_client_conf_t *conf = arg;
+    int status;
+
+    conf->loop = 1;
+
+    while (conf->loop)
+    {
+        status = mqtt_connect (conf);
+        if (status != 0)
+        {
+            sleep (1);
+            continue;
+        }
+
+        /* The documentation says "0" would map to the default (1000ms), but
+         * that does not work on some versions. */
+        status = mosquitto_loop (conf->mosq, /* timeout = */ 1000 /* ms */);
+        if (status == MOSQ_ERR_CONN_LOST)
+        {
+            conf->connected = 0;
+            continue;
+        }
+        else if (status != MOSQ_ERR_SUCCESS)
+        {
+            ERROR ("mqtt plugin: mosquitto_loop failed: %s",
+                    mosquitto_strerror (status));
+            mosquitto_destroy (conf->mosq);
+            conf->mosq = NULL;
+            conf->connected = 0;
+            continue;
+        }
+
+        DEBUG ("mqtt plugin: mosquitto_loop succeeded.");
+    } /* while (conf->loop) */
+
+    pthread_exit (0);
+} /* void *subscribers_thread */
+
 static int publish (mqtt_client_conf_t *conf, char const *topic,
     void const *payload, size_t payload_len)
 {
@@ -301,18 +461,19 @@ static int mqtt_write (const data_set_t *ds, const value_list_t *vl,
  * <Publish "name">
  *   Host "example.com"
  *   Port 1883
- *   Prefix "collectd"
  *   ClientId "collectd"
  *   User "guest"
  *   Password "secret"
+ *   Prefix "collectd"
  *   StoreRates true
  *   Retain false
  *   QoS 0
  * </Publish>
  */
-static int mqtt_config_broker (oconfig_item_t *ci)
+static int mqtt_config_publisher (oconfig_item_t *ci)
 {
     mqtt_client_conf_t *conf;
+    char cb_name[1024];
     user_data_t user_data;
     int status;
     int i;
@@ -323,6 +484,7 @@ static int mqtt_config_broker (oconfig_item_t *ci)
         ERROR ("mqtt plugin: malloc failed.");
         return (-1);
     }
+    conf->publish = 1;
 
     conf->name = NULL;
     status = cf_util_get_string (ci, &conf->name);
@@ -335,7 +497,10 @@ static int mqtt_config_broker (oconfig_item_t *ci)
     conf->host = strdup (MQTT_DEFAULT_HOST);
     conf->port = MQTT_DEFAULT_PORT;
     conf->client_id = NULL;
+    conf->qos = 0;
     conf->topic_prefix = strdup (MQTT_DEFAULT_TOPIC_PREFIX);
+    conf->store_rates = 1;
+
     C_COMPLAIN_INIT (&conf->complaint_cantpublish);
 
     for (i = 0; i < ci->children_num; i++)
@@ -351,18 +516,100 @@ static int mqtt_config_broker (oconfig_item_t *ci)
             else
                 conf->port = tmp;
         }
-        else if (strcasecmp ("Prefix", child->key) == 0)
-            cf_util_get_string (child, &conf->topic_prefix);
         else if (strcasecmp ("ClientId", child->key) == 0)
             cf_util_get_string (child, &conf->client_id);
         else if (strcasecmp ("User", child->key) == 0)
             cf_util_get_string (child, &conf->username);
         else if (strcasecmp ("Password", child->key) == 0)
             cf_util_get_string (child, &conf->password);
+        else if (strcasecmp ("QoS", child->key) == 0)
+        {
+            int tmp = -1;
+            status = cf_util_get_int (child, &tmp);
+            if ((status != 0) || (tmp < 0) || (tmp > 2))
+                ERROR ("mqtt plugin: Not a valid QoS setting.");
+            else
+                conf->qos = tmp;
+        }
+        else if (strcasecmp ("Prefix", child->key) == 0)
+            cf_util_get_string (child, &conf->topic_prefix);
         else if (strcasecmp ("StoreRates", child->key) == 0)
             cf_util_get_boolean (child, &conf->store_rates);
         else if (strcasecmp ("Retain", child->key) == 0)
             cf_util_get_boolean (child, &conf->retain);
+        else
+            ERROR ("mqtt plugin: Unknown config option: %s", child->key);
+    }
+
+    ssnprintf (cb_name, sizeof (cb_name), "mqtt/%s", conf->name);
+    memset (&user_data, 0, sizeof (user_data));
+    user_data.data = conf;
+
+    plugin_register_write (cb_name, mqtt_write, &user_data);
+    return (0);
+} /* mqtt_config_publisher */
+
+/*
+ * <Subscribe "name">
+ *   Host "example.com"
+ *   Port 1883
+ *   ClientId "collectd"
+ *   User "guest"
+ *   Password "secret"
+ *   Topic "collectd/#"
+ * </Publish>
+ */
+static int mqtt_config_subscriber (oconfig_item_t *ci)
+{
+    mqtt_client_conf_t **tmp;
+    mqtt_client_conf_t *conf;
+    int status;
+    int i;
+
+    conf = calloc (1, sizeof (*conf));
+    if (conf == NULL)
+    {
+        ERROR ("mqtt plugin: malloc failed.");
+        return (-1);
+    }
+    conf->publish = 0;
+
+    conf->name = NULL;
+    status = cf_util_get_string (ci, &conf->name);
+    if (status != 0)
+    {
+        mqtt_free (conf);
+        return (status);
+    }
+
+    conf->host = strdup (MQTT_DEFAULT_HOST);
+    conf->port = MQTT_DEFAULT_PORT;
+    conf->client_id = NULL;
+    conf->qos = 2;
+    conf->topic = strdup (MQTT_DEFAULT_TOPIC);
+    conf->clean_session = 1;
+
+    C_COMPLAIN_INIT (&conf->complaint_cantpublish);
+
+    for (i = 0; i < ci->children_num; i++)
+    {
+        oconfig_item_t *child = ci->children + i;
+        if (strcasecmp ("Host", child->key) == 0)
+            cf_util_get_string (child, &conf->host);
+        else if (strcasecmp ("Port", child->key) == 0)
+        {
+            int tmp = cf_util_get_port_number (child);
+            if (tmp < 0)
+                ERROR ("mqtt plugin: Invalid port number.");
+            else
+                conf->port = tmp;
+        }
+        else if (strcasecmp ("ClientId", child->key) == 0)
+            cf_util_get_string (child, &conf->client_id);
+        else if (strcasecmp ("User", child->key) == 0)
+            cf_util_get_string (child, &conf->username);
+        else if (strcasecmp ("Password", child->key) == 0)
+            cf_util_get_string (child, &conf->password);
         else if (strcasecmp ("QoS", child->key) == 0)
         {
             int tmp = -1;
@@ -372,25 +619,36 @@ static int mqtt_config_broker (oconfig_item_t *ci)
             else
                 conf->qos = tmp;
         }
+        else if (strcasecmp ("Topic", child->key) == 0)
+            cf_util_get_string (child, &conf->topic);
+        else if (strcasecmp ("CleanSession", child->key) == 0)
+            cf_util_get_boolean (child, &conf->clean_session);
         else
             ERROR ("mqtt plugin: Unknown config option: %s", child->key);
     }
 
-    memset (&user_data, 0, sizeof (user_data));
-    user_data.data = conf;
-
-    DEBUG ("mqtt plugin: successfully connected to broker \"%s:%d\"",
-        conf->host, conf->port);
+    tmp = realloc (subscribers, sizeof (*subscribers) * subscribers_num);
+    if (tmp == NULL)
+    {
+        ERROR ("mqtt plugin: realloc failed.");
+        mqtt_free (conf);
+        return (-1);
+    }
+    subscribers = tmp;
+    subscribers[subscribers_num] = conf;
+    subscribers_num++;
 
-    plugin_register_write ("mqtt", mqtt_write, &user_data);
     return (0);
-} /* mqtt_config_broker */
+} /* mqtt_config_subscriber */
 
 /*
  * <Plugin mqtt>
  *   <Publish "name">
  *     # ...
  *   </Publish>
+ *   <Subscribe "name">
+ *     # ...
+ *   </Subscribe>
  * </Plugin>
  */
 static int mqtt_config (oconfig_item_t *ci)
@@ -402,7 +660,9 @@ static int mqtt_config (oconfig_item_t *ci)
         oconfig_item_t *child = ci->children + i;
 
         if (strcasecmp ("Publish", child->key) == 0)
-            mqtt_config_broker (child);
+            mqtt_config_publisher (child);
+        else if (strcasecmp ("Subscribe", child->key) == 0)
+            mqtt_config_subscriber (child);
         else
             ERROR ("mqtt plugin: Unknown config option: %s", child->key);
     }
@@ -412,7 +672,29 @@ static int mqtt_config (oconfig_item_t *ci)
 
 static int mqtt_init (void)
 {
-    mosquitto_lib_init();
+    size_t i;
+
+    mosquitto_lib_init ();
+
+    for (i = 0; i < subscribers_num; i++)
+    {
+        int status;
+
+        if (subscribers[i]->loop)
+            continue;
+
+        status = plugin_thread_create (&subscribers[i]->thread,
+                /* attrs = */ NULL,
+                /* func  = */ subscribers_thread,
+                /* args  = */ subscribers[i]);
+        if (status != 0)
+        {
+            char errbuf[1024];
+            ERROR ("mqtt plugin: pthread_create failed: %s",
+                    sstrerror (errno, errbuf, sizeof (errbuf)));
+            continue;
+        }
+    }
 
     return (0);
 } /* mqtt_init */