diff --git a/src/mqtt.c b/src/mqtt.c
index 98b1751db7fafcc9b537aeca7ac2efa95df21465..1b71d423d458109247e095cd8b5ce97de30b0ce4 100644 (file)
--- a/src/mqtt.c
+++ b/src/mqtt.c
/*
* Functions
*/
/*
* Functions
*/
+#if LIBMOSQUITTO_MAJOR == 0
static char const *mosquitto_strerror (int code)
{
switch (code)
static char const *mosquitto_strerror (int code)
{
switch (code)
return "UNKNOWN ERROR CODE";
}
return "UNKNOWN ERROR CODE";
}
+#else
+/* provided by libmosquitto */
+#endif
static void mqtt_free (mqtt_client_conf_t *conf)
{
static void mqtt_free (mqtt_client_conf_t *conf)
{
return (topic);
}
return (topic);
}
-static void on_message (__attribute__((unused)) void *arg,
+static void on_message (
+#if LIBMOSQUITTO_MAJOR == 0
+#else
+ __attribute__((unused)) struct mosquitto *m,
+#endif
+ __attribute__((unused)) void *arg,
const struct mosquitto_message *msg)
{
value_list_t vl = VALUE_LIST_INIT;
const struct mosquitto_message *msg)
{
value_list_t vl = VALUE_LIST_INIT;
char *payload;
int status;
char *payload;
int status;
- if ((msg->payloadlen <= 0) || (msg->payload[msg->payloadlen - 1] != 0))
+ if ((msg->payloadlen <= 0)
+ || (((uint8_t *) msg->payload)[msg->payloadlen - 1] != 0))
return;
topic = strdup (msg->topic);
return;
topic = strdup (msg->topic);
else
client_id = hostname_g;
else
client_id = hostname_g;
+#if LIBMOSQUITTO_MAJOR == 0
conf->mosq = mosquitto_new (client_id, /* user data = */ conf);
conf->mosq = mosquitto_new (client_id, /* user data = */ conf);
+#else
+ conf->mosq = mosquitto_new (client_id, conf->clean_session, /* user data = */ conf);
+#endif
if (conf->mosq == NULL)
{
ERROR ("mqtt plugin: mosquitto_new failed");
if (conf->mosq == NULL)
{
ERROR ("mqtt plugin: mosquitto_new failed");
}
}
}
}
+#if LIBMOSQUITTO_MAJOR == 0
status = mosquitto_connect (conf->mosq, conf->host, conf->port,
/* keepalive = */ MQTT_KEEPALIVE, /* clean session = */ conf->clean_session);
status = mosquitto_connect (conf->mosq, conf->host, conf->port,
/* keepalive = */ MQTT_KEEPALIVE, /* clean session = */ conf->clean_session);
+#else
+ status = mosquitto_connect (conf->mosq, conf->host, conf->port, MQTT_KEEPALIVE);
+#endif
if (status != MOSQ_ERR_SUCCESS)
{
char errbuf[1024];
if (status != MOSQ_ERR_SUCCESS)
{
char errbuf[1024];
{
mosquitto_message_callback_set (conf->mosq, on_message);
{
mosquitto_message_callback_set (conf->mosq, on_message);
- status = mosquitto_subscribe (conf->mosq, /* mid = */ NULL,
+ status = mosquitto_subscribe (conf->mosq,
+ /* message_id = */ NULL,
conf->topic, conf->qos);
if (status != MOSQ_ERR_SUCCESS)
{
conf->topic, conf->qos);
if (status != MOSQ_ERR_SUCCESS)
{
/* The documentation says "0" would map to the default (1000ms), but
* that does not work on some versions. */
/* The documentation says "0" would map to the default (1000ms), but
* that does not work on some versions. */
+#if LIBMOSQUITTO_MAJOR == 0
status = mosquitto_loop (conf->mosq, /* timeout = */ 1000 /* ms */);
status = mosquitto_loop (conf->mosq, /* timeout = */ 1000 /* ms */);
+#else
+ status = mosquitto_loop (conf->mosq,
+ /* timeout[ms] = */ 1000,
+ /* max_packets = */ 100);
+#endif
if (status == MOSQ_ERR_CONN_LOST)
{
conf->connected = 0;
if (status == MOSQ_ERR_CONN_LOST)
{
conf->connected = 0;
return (status);
}
return (status);
}
- status = mosquitto_publish(conf->mosq,
- /* message id */ NULL,
- topic,
+ status = mosquitto_publish(conf->mosq, /* message_id */ NULL, topic,
+#if LIBMOSQUITTO_MAJOR == 0
(uint32_t) payload_len, payload,
(uint32_t) payload_len, payload,
- /* qos */ conf->qos,
- /* retain */ conf->retain);
+#else
+ (int) payload_len, payload,
+#endif
+ conf->qos, conf->retain);
if (status != MOSQ_ERR_SUCCESS)
{
char errbuf[1024];
c_complain (LOG_ERR,
if (status != MOSQ_ERR_SUCCESS)
{
char errbuf[1024];
c_complain (LOG_ERR,
- &conf->complaint_cantpublish,
- "plugin mqtt: mosquitto_publish failed: %s",
- status == MOSQ_ERR_ERRNO ?
- sstrerror(errno, errbuf, sizeof (errbuf)) :
- mosquitto_strerror(status));
+ &conf->complaint_cantpublish,
+ "mqtt plugin: mosquitto_publish failed: %s",
+ (status == MOSQ_ERR_ERRNO)
+ ? sstrerror(errno, errbuf, sizeof (errbuf))
+ : mosquitto_strerror(status));
/* Mark our connection "down" regardless of the error as a safety
* measure; we will try to reconnect the next time we have to publish a
* message */
/* Mark our connection "down" regardless of the error as a safety
* measure; we will try to reconnect the next time we have to publish a
* message */
static int mqtt_config_publisher (oconfig_item_t *ci)
{
mqtt_client_conf_t *conf;
static int mqtt_config_publisher (oconfig_item_t *ci)
{
mqtt_client_conf_t *conf;
+ char cb_name[1024];
user_data_t user_data;
int status;
int i;
user_data_t user_data;
int status;
int i;
conf->host = strdup (MQTT_DEFAULT_HOST);
conf->port = MQTT_DEFAULT_PORT;
conf->client_id = NULL;
conf->host = strdup (MQTT_DEFAULT_HOST);
conf->port = MQTT_DEFAULT_PORT;
conf->client_id = NULL;
+ conf->qos = 0;
conf->topic_prefix = strdup (MQTT_DEFAULT_TOPIC_PREFIX);
conf->topic_prefix = strdup (MQTT_DEFAULT_TOPIC_PREFIX);
+ conf->store_rates = 1;
+
+ status = pthread_mutex_init (&conf->lock, NULL);
+ if (status != 0)
+ {
+ mqtt_free (conf);
+ return (status);
+ }
C_COMPLAIN_INIT (&conf->complaint_cantpublish);
C_COMPLAIN_INIT (&conf->complaint_cantpublish);
ERROR ("mqtt plugin: Unknown config option: %s", child->key);
}
ERROR ("mqtt plugin: Unknown config option: %s", child->key);
}
+ ssnprintf (cb_name, sizeof (cb_name), "mqtt/%s", conf->name);
memset (&user_data, 0, sizeof (user_data));
user_data.data = conf;
memset (&user_data, 0, sizeof (user_data));
user_data.data = conf;
- plugin_register_write ("mqtt", mqtt_write, &user_data);
+ plugin_register_write (cb_name, mqtt_write, &user_data);
return (0);
} /* mqtt_config_publisher */
return (0);
} /* mqtt_config_publisher */
conf->host = strdup (MQTT_DEFAULT_HOST);
conf->port = MQTT_DEFAULT_PORT;
conf->client_id = NULL;
conf->host = strdup (MQTT_DEFAULT_HOST);
conf->port = MQTT_DEFAULT_PORT;
conf->client_id = NULL;
+ conf->qos = 2;
conf->topic = strdup (MQTT_DEFAULT_TOPIC);
conf->topic = strdup (MQTT_DEFAULT_TOPIC);
+ conf->clean_session = 1;
+
+ status = pthread_mutex_init (&conf->lock, NULL);
+ if (status != 0)
+ {
+ mqtt_free (conf);
+ return (status);
+ }
C_COMPLAIN_INIT (&conf->complaint_cantpublish);
C_COMPLAIN_INIT (&conf->complaint_cantpublish);