Code

mqtt plugin: Add support for libmosquitto, major version >0.
authorFlorian Forster <octo@collectd.org>
Wed, 8 Jul 2015 11:09:26 +0000 (13:09 +0200)
committerFlorian Forster <octo@collectd.org>
Wed, 8 Jul 2015 11:09:26 +0000 (13:09 +0200)
src/mqtt.c

index b5e40aec4909563b96fffd4426ac28c3452e8017..210d38cbef7d9b0fd89e2863eee88042bc3ebd2a 100644 (file)
@@ -90,6 +90,7 @@ static size_t subscribers_num = 0;
 /*
  * Functions
  */
+#if LIBMOSQUITTO_MAJOR == 0
 static char const *mosquitto_strerror (int code)
 {
     switch (code)
@@ -113,6 +114,9 @@ static char const *mosquitto_strerror (int code)
 
     return "UNKNOWN ERROR CODE";
 }
+#else
+/* provided by libmosquitto */
+#endif
 
 static void mqtt_free (mqtt_client_conf_t *conf)
 {
@@ -157,7 +161,12 @@ static char *strip_prefix (char *topic)
     return (topic);
 }
 
-static void on_message (__attribute__((unused)) void *arg,
+static void on_message (
+#if LIBMOSQUITTO_MAJOR == 0
+#else
+        __attribute__((unused)) struct mosquitto *m,
+#endif
+        __attribute__((unused)) void *arg,
         const struct mosquitto_message *msg)
 {
     value_list_t vl = VALUE_LIST_INIT;
@@ -167,7 +176,8 @@ static void on_message (__attribute__((unused)) void *arg,
     char *payload;
     int status;
 
-    if ((msg->payloadlen <= 0) || (msg->payload[msg->payloadlen - 1] != 0))
+    if ((msg->payloadlen <= 0)
+            || (((uint8_t *) msg->payload)[msg->payloadlen - 1] != 0))
         return;
 
     topic = strdup (msg->topic);
@@ -256,7 +266,11 @@ static int mqtt_connect (mqtt_client_conf_t *conf)
     else
         client_id = hostname_g;
 
+#if LIBMOSQUITTO_MAJOR == 0
     conf->mosq = mosquitto_new (client_id, /* user data = */ conf);
+#else
+    conf->mosq = mosquitto_new (client_id, conf->clean_session, /* user data = */ conf);
+#endif
     if (conf->mosq == NULL)
     {
         ERROR ("mqtt plugin: mosquitto_new failed");
@@ -280,8 +294,12 @@ static int mqtt_connect (mqtt_client_conf_t *conf)
         }
     }
 
+#if LIBMOSQUITTO_MAJOR == 0
     status = mosquitto_connect (conf->mosq, conf->host, conf->port,
             /* keepalive = */ MQTT_KEEPALIVE, /* clean session = */ conf->clean_session);
+#else
+    status = mosquitto_connect (conf->mosq, conf->host, conf->port, MQTT_KEEPALIVE);
+#endif
     if (status != MOSQ_ERR_SUCCESS)
     {
         char errbuf[1024];
@@ -299,7 +317,8 @@ static int mqtt_connect (mqtt_client_conf_t *conf)
     {
         mosquitto_message_callback_set (conf->mosq, on_message);
 
-        status = mosquitto_subscribe (conf->mosq, /* mid = */ NULL,
+        status = mosquitto_subscribe (conf->mosq,
+                /* message_id = */ NULL,
                 conf->topic, conf->qos);
         if (status != MOSQ_ERR_SUCCESS)
         {
@@ -335,7 +354,13 @@ static void *subscribers_thread (void *arg)
 
         /* The documentation says "0" would map to the default (1000ms), but
          * that does not work on some versions. */
+#if LIBMOSQUITTO_MAJOR == 0
         status = mosquitto_loop (conf->mosq, /* timeout = */ 1000 /* ms */);
+#else
+        status = mosquitto_loop (conf->mosq,
+                /* timeout[ms] = */ 1000,
+                /* max_packets = */  100);
+#endif
         if (status == MOSQ_ERR_CONN_LOST)
         {
             conf->connected = 0;
@@ -371,12 +396,13 @@ static int publish (mqtt_client_conf_t *conf, char const *topic,
         return (status);
     }
 
-    status = mosquitto_publish(conf->mosq,
-            /* message id */ NULL,
-            topic,
+    status = mosquitto_publish(conf->mosq, /* message_id */ NULL, topic,
+#if LIBMOSQUITTO_MAJOR == 0
             (uint32_t) payload_len, payload,
-            /* qos */ conf->qos,
-            /* retain */ conf->retain);
+#else
+            (int) payload_len, payload,
+#endif
+            conf->qos, conf->retain);
     if (status != MOSQ_ERR_SUCCESS)
     {
         char errbuf[1024];