Code

mqtt plugin: Add support for libmosquitto, major version >0.
[collectd.git] / src / mqtt.c
index 7f785dc86b8a89249141ea37a3866e45cd23978f..210d38cbef7d9b0fd89e2863eee88042bc3ebd2a 100644 (file)
 #define MQTT_DEFAULT_PORT           1883
 #define MQTT_DEFAULT_TOPIC_PREFIX   "collectd"
 #define MQTT_DEFAULT_TOPIC          "collectd/#"
+#ifndef MQTT_KEEPALIVE
+# define MQTT_KEEPALIVE 60
+#endif
+
 
 /*
  * Data types
@@ -86,6 +90,7 @@ static size_t subscribers_num = 0;
 /*
  * Functions
  */
+#if LIBMOSQUITTO_MAJOR == 0
 static char const *mosquitto_strerror (int code)
 {
     switch (code)
@@ -109,6 +114,9 @@ static char const *mosquitto_strerror (int code)
 
     return "UNKNOWN ERROR CODE";
 }
+#else
+/* provided by libmosquitto */
+#endif
 
 static void mqtt_free (mqtt_client_conf_t *conf)
 {
@@ -153,7 +161,12 @@ static char *strip_prefix (char *topic)
     return (topic);
 }
 
-static void on_message (__attribute__((unused)) void *arg,
+static void on_message (
+#if LIBMOSQUITTO_MAJOR == 0
+#else
+        __attribute__((unused)) struct mosquitto *m,
+#endif
+        __attribute__((unused)) void *arg,
         const struct mosquitto_message *msg)
 {
     value_list_t vl = VALUE_LIST_INIT;
@@ -163,7 +176,8 @@ static void on_message (__attribute__((unused)) void *arg,
     char *payload;
     int status;
 
-    if ((msg->payloadlen <= 0) || (msg->payload[msg->payloadlen - 1] != 0))
+    if ((msg->payloadlen <= 0)
+            || (((uint8_t *) msg->payload)[msg->payloadlen - 1] != 0))
         return;
 
     topic = strdup (msg->topic);
@@ -252,7 +266,11 @@ static int mqtt_connect (mqtt_client_conf_t *conf)
     else
         client_id = hostname_g;
 
+#if LIBMOSQUITTO_MAJOR == 0
     conf->mosq = mosquitto_new (client_id, /* user data = */ conf);
+#else
+    conf->mosq = mosquitto_new (client_id, conf->clean_session, /* user data = */ conf);
+#endif
     if (conf->mosq == NULL)
     {
         ERROR ("mqtt plugin: mosquitto_new failed");
@@ -276,8 +294,12 @@ static int mqtt_connect (mqtt_client_conf_t *conf)
         }
     }
 
+#if LIBMOSQUITTO_MAJOR == 0
     status = mosquitto_connect (conf->mosq, conf->host, conf->port,
-            /* keepalive = */ 10, /* clean session = */ conf->clean_session);
+            /* keepalive = */ MQTT_KEEPALIVE, /* clean session = */ conf->clean_session);
+#else
+    status = mosquitto_connect (conf->mosq, conf->host, conf->port, MQTT_KEEPALIVE);
+#endif
     if (status != MOSQ_ERR_SUCCESS)
     {
         char errbuf[1024];
@@ -295,7 +317,8 @@ static int mqtt_connect (mqtt_client_conf_t *conf)
     {
         mosquitto_message_callback_set (conf->mosq, on_message);
 
-        status = mosquitto_subscribe (conf->mosq, /* mid = */ NULL,
+        status = mosquitto_subscribe (conf->mosq,
+                /* message_id = */ NULL,
                 conf->topic, conf->qos);
         if (status != MOSQ_ERR_SUCCESS)
         {
@@ -331,7 +354,13 @@ static void *subscribers_thread (void *arg)
 
         /* The documentation says "0" would map to the default (1000ms), but
          * that does not work on some versions. */
+#if LIBMOSQUITTO_MAJOR == 0
         status = mosquitto_loop (conf->mosq, /* timeout = */ 1000 /* ms */);
+#else
+        status = mosquitto_loop (conf->mosq,
+                /* timeout[ms] = */ 1000,
+                /* max_packets = */  100);
+#endif
         if (status == MOSQ_ERR_CONN_LOST)
         {
             conf->connected = 0;
@@ -367,12 +396,13 @@ static int publish (mqtt_client_conf_t *conf, char const *topic,
         return (status);
     }
 
-    status = mosquitto_publish(conf->mosq,
-            /* message id */ NULL,
-            topic,
+    status = mosquitto_publish(conf->mosq, /* message_id */ NULL, topic,
+#if LIBMOSQUITTO_MAJOR == 0
             (uint32_t) payload_len, payload,
-            /* qos */ conf->qos,
-            /* retain */ conf->retain);
+#else
+            (int) payload_len, payload,
+#endif
+            conf->qos, conf->retain);
     if (status != MOSQ_ERR_SUCCESS)
     {
         char errbuf[1024];
@@ -469,6 +499,7 @@ static int mqtt_write (const data_set_t *ds, const value_list_t *vl,
 static int mqtt_config_publisher (oconfig_item_t *ci)
 {
     mqtt_client_conf_t *conf;
+    char cb_name[1024];
     user_data_t user_data;
     int status;
     int i;
@@ -492,7 +523,9 @@ static int mqtt_config_publisher (oconfig_item_t *ci)
     conf->host = strdup (MQTT_DEFAULT_HOST);
     conf->port = MQTT_DEFAULT_PORT;
     conf->client_id = NULL;
+    conf->qos = 0;
     conf->topic_prefix = strdup (MQTT_DEFAULT_TOPIC_PREFIX);
+    conf->store_rates = 1;
 
     C_COMPLAIN_INIT (&conf->complaint_cantpublish);
 
@@ -534,10 +567,11 @@ static int mqtt_config_publisher (oconfig_item_t *ci)
             ERROR ("mqtt plugin: Unknown config option: %s", child->key);
     }
 
+    ssnprintf (cb_name, sizeof (cb_name), "mqtt/%s", conf->name);
     memset (&user_data, 0, sizeof (user_data));
     user_data.data = conf;
 
-    plugin_register_write ("mqtt", mqtt_write, &user_data);
+    plugin_register_write (cb_name, mqtt_write, &user_data);
     return (0);
 } /* mqtt_config_publisher */
 
@@ -577,7 +611,9 @@ static int mqtt_config_subscriber (oconfig_item_t *ci)
     conf->host = strdup (MQTT_DEFAULT_HOST);
     conf->port = MQTT_DEFAULT_PORT;
     conf->client_id = NULL;
+    conf->qos = 2;
     conf->topic = strdup (MQTT_DEFAULT_TOPIC);
+    conf->clean_session = 1;
 
     C_COMPLAIN_INIT (&conf->complaint_cantpublish);