From: Florian Forster Date: Thu, 20 Nov 2014 17:19:37 +0000 (+0100) Subject: mqtt plugin: Concurrency fixes, pick up conf->lock in publish. X-Git-Tag: collectd-5.6.0~637^2~18 X-Git-Url: https://git.tokkee.org/?a=commitdiff_plain;h=96179cb2a421f98dedfc7bfe0d56ab50c2caf56b;p=collectd.git mqtt plugin: Concurrency fixes, pick up conf->lock in publish. --- diff --git a/src/mqtt.c b/src/mqtt.c index 7adc7f71..5c844a76 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -55,6 +55,7 @@ struct mqtt_client_conf c_complain_t complaint_cantpublish; pthread_mutex_t lock; }; +typedef struct mqtt_client_conf mqtt_client_conf_t; static char const *mosquitto_strerror (int code) { @@ -83,22 +84,20 @@ static char const *mosquitto_strerror (int code) /* * Functions */ -static int mqtt_reconnect_broker (struct mqtt_client_conf *conf) +/* must hold conf->lock when calling. */ +static int mqtt_reconnect_broker (mqtt_client_conf_t *conf) { int status; if (conf->connected) return (0); - pthread_mutex_lock (&conf->lock); - status = mosquitto_reconnect (conf->mosq); - - if (status != MOSQ_ERR_SUCCESS) { + if (status != MOSQ_ERR_SUCCESS) + { ERROR ("mqtt_connect_broker: mosquitto_connect failed: %s", (status == MOSQ_ERR_ERRNO ? strerror(errno) : mosquitto_strerror (status))); - pthread_mutex_unlock (&conf->lock); return (-1); } @@ -109,17 +108,24 @@ static int mqtt_reconnect_broker (struct mqtt_client_conf *conf) "mqtt plugin: successfully reconnected to broker \"%s:%d\"", conf->host, conf->port); - pthread_mutex_unlock (&conf->lock); - return (0); } /* mqtt_reconnect_broker */ -static int mqtt_publish_message (struct mqtt_client_conf *conf, char *topic, +static int mqtt_publish_message (mqtt_client_conf_t *conf, char *topic, void const *payload, size_t payload_len) { char errbuf[1024]; int status; + pthread_mutex_lock (&conf->lock); + + status = mqtt_reconnect_broker (conf); + if (status != 0) { + pthread_mutex_unlock (&conf->lock); + ERROR ("mqtt plugin: unable to reconnect to broker"); + return (status); + } + status = mosquitto_publish(conf->mosq, /* message id */ NULL, topic, @@ -127,7 +133,6 @@ static int mqtt_publish_message (struct mqtt_client_conf *conf, char *topic, payload, /* qos */ 0, /* retain */ false); - if (status != MOSQ_ERR_SUCCESS) { c_complain (LOG_ERR, @@ -142,9 +147,11 @@ static int mqtt_publish_message (struct mqtt_client_conf *conf, char *topic, */ conf->connected = false; + pthread_mutex_unlock (&conf->lock); return (-1); } + pthread_mutex_unlock (&conf->lock); return (0); } /* mqtt_publish_message */