X-Git-Url: https://git.tokkee.org/?a=blobdiff_plain;f=src%2Fmqtt.c;h=210d38cbef7d9b0fd89e2863eee88042bc3ebd2a;hb=416aab49a9ae19b993b747b5fca92fc3501fff5d;hp=7f785dc86b8a89249141ea37a3866e45cd23978f;hpb=72b27b2c9784a0984ff9713e7a56a7956f6181e6;p=collectd.git diff --git a/src/mqtt.c b/src/mqtt.c index 7f785dc8..210d38cb 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -45,6 +45,10 @@ #define MQTT_DEFAULT_PORT 1883 #define MQTT_DEFAULT_TOPIC_PREFIX "collectd" #define MQTT_DEFAULT_TOPIC "collectd/#" +#ifndef MQTT_KEEPALIVE +# define MQTT_KEEPALIVE 60 +#endif + /* * Data types @@ -86,6 +90,7 @@ static size_t subscribers_num = 0; /* * Functions */ +#if LIBMOSQUITTO_MAJOR == 0 static char const *mosquitto_strerror (int code) { switch (code) @@ -109,6 +114,9 @@ static char const *mosquitto_strerror (int code) return "UNKNOWN ERROR CODE"; } +#else +/* provided by libmosquitto */ +#endif static void mqtt_free (mqtt_client_conf_t *conf) { @@ -153,7 +161,12 @@ static char *strip_prefix (char *topic) return (topic); } -static void on_message (__attribute__((unused)) void *arg, +static void on_message ( +#if LIBMOSQUITTO_MAJOR == 0 +#else + __attribute__((unused)) struct mosquitto *m, +#endif + __attribute__((unused)) void *arg, const struct mosquitto_message *msg) { value_list_t vl = VALUE_LIST_INIT; @@ -163,7 +176,8 @@ static void on_message (__attribute__((unused)) void *arg, char *payload; int status; - if ((msg->payloadlen <= 0) || (msg->payload[msg->payloadlen - 1] != 0)) + if ((msg->payloadlen <= 0) + || (((uint8_t *) msg->payload)[msg->payloadlen - 1] != 0)) return; topic = strdup (msg->topic); @@ -252,7 +266,11 @@ static int mqtt_connect (mqtt_client_conf_t *conf) else client_id = hostname_g; +#if LIBMOSQUITTO_MAJOR == 0 conf->mosq = mosquitto_new (client_id, /* user data = */ conf); +#else + conf->mosq = mosquitto_new (client_id, conf->clean_session, /* user data = */ conf); +#endif if (conf->mosq == NULL) { ERROR ("mqtt plugin: mosquitto_new failed"); @@ -276,8 +294,12 @@ static int mqtt_connect (mqtt_client_conf_t *conf) } } +#if LIBMOSQUITTO_MAJOR == 0 status = mosquitto_connect (conf->mosq, conf->host, conf->port, - /* keepalive = */ 10, /* clean session = */ conf->clean_session); + /* keepalive = */ MQTT_KEEPALIVE, /* clean session = */ conf->clean_session); +#else + status = mosquitto_connect (conf->mosq, conf->host, conf->port, MQTT_KEEPALIVE); +#endif if (status != MOSQ_ERR_SUCCESS) { char errbuf[1024]; @@ -295,7 +317,8 @@ static int mqtt_connect (mqtt_client_conf_t *conf) { mosquitto_message_callback_set (conf->mosq, on_message); - status = mosquitto_subscribe (conf->mosq, /* mid = */ NULL, + status = mosquitto_subscribe (conf->mosq, + /* message_id = */ NULL, conf->topic, conf->qos); if (status != MOSQ_ERR_SUCCESS) { @@ -331,7 +354,13 @@ static void *subscribers_thread (void *arg) /* The documentation says "0" would map to the default (1000ms), but * that does not work on some versions. */ +#if LIBMOSQUITTO_MAJOR == 0 status = mosquitto_loop (conf->mosq, /* timeout = */ 1000 /* ms */); +#else + status = mosquitto_loop (conf->mosq, + /* timeout[ms] = */ 1000, + /* max_packets = */ 100); +#endif if (status == MOSQ_ERR_CONN_LOST) { conf->connected = 0; @@ -367,12 +396,13 @@ static int publish (mqtt_client_conf_t *conf, char const *topic, return (status); } - status = mosquitto_publish(conf->mosq, - /* message id */ NULL, - topic, + status = mosquitto_publish(conf->mosq, /* message_id */ NULL, topic, +#if LIBMOSQUITTO_MAJOR == 0 (uint32_t) payload_len, payload, - /* qos */ conf->qos, - /* retain */ conf->retain); +#else + (int) payload_len, payload, +#endif + conf->qos, conf->retain); if (status != MOSQ_ERR_SUCCESS) { char errbuf[1024]; @@ -469,6 +499,7 @@ static int mqtt_write (const data_set_t *ds, const value_list_t *vl, static int mqtt_config_publisher (oconfig_item_t *ci) { mqtt_client_conf_t *conf; + char cb_name[1024]; user_data_t user_data; int status; int i; @@ -492,7 +523,9 @@ static int mqtt_config_publisher (oconfig_item_t *ci) conf->host = strdup (MQTT_DEFAULT_HOST); conf->port = MQTT_DEFAULT_PORT; conf->client_id = NULL; + conf->qos = 0; conf->topic_prefix = strdup (MQTT_DEFAULT_TOPIC_PREFIX); + conf->store_rates = 1; C_COMPLAIN_INIT (&conf->complaint_cantpublish); @@ -534,10 +567,11 @@ static int mqtt_config_publisher (oconfig_item_t *ci) ERROR ("mqtt plugin: Unknown config option: %s", child->key); } + ssnprintf (cb_name, sizeof (cb_name), "mqtt/%s", conf->name); memset (&user_data, 0, sizeof (user_data)); user_data.data = conf; - plugin_register_write ("mqtt", mqtt_write, &user_data); + plugin_register_write (cb_name, mqtt_write, &user_data); return (0); } /* mqtt_config_publisher */ @@ -577,7 +611,9 @@ static int mqtt_config_subscriber (oconfig_item_t *ci) conf->host = strdup (MQTT_DEFAULT_HOST); conf->port = MQTT_DEFAULT_PORT; conf->client_id = NULL; + conf->qos = 2; conf->topic = strdup (MQTT_DEFAULT_TOPIC); + conf->clean_session = 1; C_COMPLAIN_INIT (&conf->complaint_cantpublish);