X-Git-Url: https://git.tokkee.org/?a=blobdiff_plain;f=src%2Fmqtt.c;h=1b71d423d458109247e095cd8b5ce97de30b0ce4;hb=6fa9a91c383a96f5e857239946590b8ff1ffe519;hp=acc514c52ec0ae85110e72fa1465bedc3ea42ec2;hpb=82bd9cf8f273d70ec11165c783da35cbc577aaed;p=collectd.git diff --git a/src/mqtt.c b/src/mqtt.c index acc514c5..1b71d423 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -90,6 +90,7 @@ static size_t subscribers_num = 0; /* * Functions */ +#if LIBMOSQUITTO_MAJOR == 0 static char const *mosquitto_strerror (int code) { switch (code) @@ -113,6 +114,9 @@ static char const *mosquitto_strerror (int code) return "UNKNOWN ERROR CODE"; } +#else +/* provided by libmosquitto */ +#endif static void mqtt_free (mqtt_client_conf_t *conf) { @@ -157,7 +161,12 @@ static char *strip_prefix (char *topic) return (topic); } -static void on_message (__attribute__((unused)) void *arg, +static void on_message ( +#if LIBMOSQUITTO_MAJOR == 0 +#else + __attribute__((unused)) struct mosquitto *m, +#endif + __attribute__((unused)) void *arg, const struct mosquitto_message *msg) { value_list_t vl = VALUE_LIST_INIT; @@ -167,7 +176,8 @@ static void on_message (__attribute__((unused)) void *arg, char *payload; int status; - if ((msg->payloadlen <= 0) || (msg->payload[msg->payloadlen - 1] != 0)) + if ((msg->payloadlen <= 0) + || (((uint8_t *) msg->payload)[msg->payloadlen - 1] != 0)) return; topic = strdup (msg->topic); @@ -256,7 +266,11 @@ static int mqtt_connect (mqtt_client_conf_t *conf) else client_id = hostname_g; +#if LIBMOSQUITTO_MAJOR == 0 conf->mosq = mosquitto_new (client_id, /* user data = */ conf); +#else + conf->mosq = mosquitto_new (client_id, conf->clean_session, /* user data = */ conf); +#endif if (conf->mosq == NULL) { ERROR ("mqtt plugin: mosquitto_new failed"); @@ -280,8 +294,12 @@ static int mqtt_connect (mqtt_client_conf_t *conf) } } +#if LIBMOSQUITTO_MAJOR == 0 status = mosquitto_connect (conf->mosq, conf->host, conf->port, /* keepalive = */ MQTT_KEEPALIVE, /* clean session = */ conf->clean_session); +#else + status = mosquitto_connect (conf->mosq, conf->host, conf->port, MQTT_KEEPALIVE); +#endif if (status != MOSQ_ERR_SUCCESS) { char errbuf[1024]; @@ -299,7 +317,8 @@ static int mqtt_connect (mqtt_client_conf_t *conf) { mosquitto_message_callback_set (conf->mosq, on_message); - status = mosquitto_subscribe (conf->mosq, /* mid = */ NULL, + status = mosquitto_subscribe (conf->mosq, + /* message_id = */ NULL, conf->topic, conf->qos); if (status != MOSQ_ERR_SUCCESS) { @@ -335,7 +354,13 @@ static void *subscribers_thread (void *arg) /* The documentation says "0" would map to the default (1000ms), but * that does not work on some versions. */ +#if LIBMOSQUITTO_MAJOR == 0 status = mosquitto_loop (conf->mosq, /* timeout = */ 1000 /* ms */); +#else + status = mosquitto_loop (conf->mosq, + /* timeout[ms] = */ 1000, + /* max_packets = */ 100); +#endif if (status == MOSQ_ERR_CONN_LOST) { conf->connected = 0; @@ -371,21 +396,22 @@ static int publish (mqtt_client_conf_t *conf, char const *topic, return (status); } - status = mosquitto_publish(conf->mosq, - /* message id */ NULL, - topic, + status = mosquitto_publish(conf->mosq, /* message_id */ NULL, topic, +#if LIBMOSQUITTO_MAJOR == 0 (uint32_t) payload_len, payload, - /* qos */ conf->qos, - /* retain */ conf->retain); +#else + (int) payload_len, payload, +#endif + conf->qos, conf->retain); if (status != MOSQ_ERR_SUCCESS) { char errbuf[1024]; c_complain (LOG_ERR, - &conf->complaint_cantpublish, - "plugin mqtt: mosquitto_publish failed: %s", - status == MOSQ_ERR_ERRNO ? - sstrerror(errno, errbuf, sizeof (errbuf)) : - mosquitto_strerror(status)); + &conf->complaint_cantpublish, + "mqtt plugin: mosquitto_publish failed: %s", + (status == MOSQ_ERR_ERRNO) + ? sstrerror(errno, errbuf, sizeof (errbuf)) + : mosquitto_strerror(status)); /* Mark our connection "down" regardless of the error as a safety * measure; we will try to reconnect the next time we have to publish a * message */ @@ -497,7 +523,16 @@ static int mqtt_config_publisher (oconfig_item_t *ci) conf->host = strdup (MQTT_DEFAULT_HOST); conf->port = MQTT_DEFAULT_PORT; conf->client_id = NULL; + conf->qos = 0; conf->topic_prefix = strdup (MQTT_DEFAULT_TOPIC_PREFIX); + conf->store_rates = 1; + + status = pthread_mutex_init (&conf->lock, NULL); + if (status != 0) + { + mqtt_free (conf); + return (status); + } C_COMPLAIN_INIT (&conf->complaint_cantpublish); @@ -583,7 +618,16 @@ static int mqtt_config_subscriber (oconfig_item_t *ci) conf->host = strdup (MQTT_DEFAULT_HOST); conf->port = MQTT_DEFAULT_PORT; conf->client_id = NULL; + conf->qos = 2; conf->topic = strdup (MQTT_DEFAULT_TOPIC); + conf->clean_session = 1; + + status = pthread_mutex_init (&conf->lock, NULL); + if (status != 0) + { + mqtt_free (conf); + return (status); + } C_COMPLAIN_INIT (&conf->complaint_cantpublish);