1 /**
2 * collectd - src/mqtt.c
3 * Copyright (C) 2014 Marc Falzon
4 * Copyright (C) 2014,2015 Florian octo Forster
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a
7 * copy of this software and associated documentation files (the "Software"),
8 * to deal in the Software without restriction, including without limitation
9 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10 * and/or sell copies of the Software, and to permit persons to whom the
11 * Software is furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22 * DEALINGS IN THE SOFTWARE.
23 *
24 * Authors:
25 * Marc Falzon <marc at baha dot mu>
26 * Florian octo Forster <octo at collectd.org>
27 * Jan-Piet Mens <jpmens at gmail.com>
28 **/
30 // Reference: http://mosquitto.org/api/files/mosquitto-h.html
32 #include "collectd.h"
34 #include "common.h"
35 #include "plugin.h"
36 #include "utils_complain.h"
38 #include <mosquitto.h>
40 #define MQTT_MAX_TOPIC_SIZE 1024
41 #define MQTT_MAX_MESSAGE_SIZE MQTT_MAX_TOPIC_SIZE + 1024
42 #define MQTT_DEFAULT_HOST "localhost"
43 #define MQTT_DEFAULT_PORT 1883
44 #define MQTT_DEFAULT_TOPIC_PREFIX "collectd"
45 #define MQTT_DEFAULT_TOPIC "collectd/#"
46 #ifndef MQTT_KEEPALIVE
47 #define MQTT_KEEPALIVE 60
48 #endif
49 #ifndef SSL_VERIFY_PEER
50 #define SSL_VERIFY_PEER 1
51 #endif
53 /*
54 * Data types
55 */
56 struct mqtt_client_conf {
57 _Bool publish;
58 char *name;
60 struct mosquitto *mosq;
61 _Bool connected;
63 char *host;
64 int port;
65 char *client_id;
66 char *username;
67 char *password;
68 int qos;
69 char *cacertificatefile;
70 char *certificatefile;
71 char *certificatekeyfile;
72 char *tlsprotocol;
73 char *ciphersuite;
75 /* For publishing */
76 char *topic_prefix;
77 _Bool store_rates;
78 _Bool retain;
80 /* For subscribing */
81 pthread_t thread;
82 _Bool loop;
83 char *topic;
84 _Bool clean_session;
86 c_complain_t complaint_cantpublish;
87 pthread_mutex_t lock;
88 };
89 typedef struct mqtt_client_conf mqtt_client_conf_t;
91 static mqtt_client_conf_t **subscribers = NULL;
92 static size_t subscribers_num = 0;
94 /*
95 * Functions
96 */
97 #if LIBMOSQUITTO_MAJOR == 0
98 static char const *mosquitto_strerror(int code) {
99 switch (code) {
100 case MOSQ_ERR_SUCCESS:
101 return "MOSQ_ERR_SUCCESS";
102 case MOSQ_ERR_NOMEM:
103 return "MOSQ_ERR_NOMEM";
104 case MOSQ_ERR_PROTOCOL:
105 return "MOSQ_ERR_PROTOCOL";
106 case MOSQ_ERR_INVAL:
107 return "MOSQ_ERR_INVAL";
108 case MOSQ_ERR_NO_CONN:
109 return "MOSQ_ERR_NO_CONN";
110 case MOSQ_ERR_CONN_REFUSED:
111 return "MOSQ_ERR_CONN_REFUSED";
112 case MOSQ_ERR_NOT_FOUND:
113 return "MOSQ_ERR_NOT_FOUND";
114 case MOSQ_ERR_CONN_LOST:
115 return "MOSQ_ERR_CONN_LOST";
116 case MOSQ_ERR_SSL:
117 return "MOSQ_ERR_SSL";
118 case MOSQ_ERR_PAYLOAD_SIZE:
119 return "MOSQ_ERR_PAYLOAD_SIZE";
120 case MOSQ_ERR_NOT_SUPPORTED:
121 return "MOSQ_ERR_NOT_SUPPORTED";
122 case MOSQ_ERR_AUTH:
123 return "MOSQ_ERR_AUTH";
124 case MOSQ_ERR_ACL_DENIED:
125 return "MOSQ_ERR_ACL_DENIED";
126 case MOSQ_ERR_UNKNOWN:
127 return "MOSQ_ERR_UNKNOWN";
128 case MOSQ_ERR_ERRNO:
129 return "MOSQ_ERR_ERRNO";
130 }
132 return "UNKNOWN ERROR CODE";
133 }
134 #else
135 /* provided by libmosquitto */
136 #endif
138 static void mqtt_free(mqtt_client_conf_t *conf) {
139 if (conf == NULL)
140 return;
142 if (conf->connected)
143 (void)mosquitto_disconnect(conf->mosq);
144 conf->connected = 0;
145 (void)mosquitto_destroy(conf->mosq);
147 sfree(conf->host);
148 sfree(conf->username);
149 sfree(conf->password);
150 sfree(conf->client_id);
151 sfree(conf->topic_prefix);
152 sfree(conf);
153 }
155 static char *strip_prefix(char *topic) {
156 size_t num = 0;
158 for (size_t i = 0; topic[i] != 0; i++)
159 if (topic[i] == '/')
160 num++;
162 if (num < 2)
163 return (NULL);
165 while (num > 2) {
166 char *tmp = strchr(topic, '/');
167 if (tmp == NULL)
168 return (NULL);
169 topic = tmp + 1;
170 num--;
171 }
173 return (topic);
174 }
176 static void on_message(
177 #if LIBMOSQUITTO_MAJOR == 0
178 #else
179 __attribute__((unused)) struct mosquitto *m,
180 #endif
181 __attribute__((unused)) void *arg, const struct mosquitto_message *msg) {
182 value_list_t vl = VALUE_LIST_INIT;
183 data_set_t const *ds;
184 char *topic;
185 char *name;
186 char *payload;
187 int status;
189 if (msg->payloadlen <= 0) {
190 DEBUG("mqtt plugin: message has empty payload");
191 return;
192 }
194 topic = strdup(msg->topic);
195 name = strip_prefix(topic);
197 status = parse_identifier_vl(name, &vl);
198 if (status != 0) {
199 ERROR("mqtt plugin: Unable to parse topic \"%s\".", topic);
200 sfree(topic);
201 return;
202 }
203 sfree(topic);
205 ds = plugin_get_ds(vl.type);
206 if (ds == NULL) {
207 ERROR("mqtt plugin: Unknown type: \"%s\".", vl.type);
208 return;
209 }
211 vl.values = calloc(ds->ds_num, sizeof(*vl.values));
212 if (vl.values == NULL) {
213 ERROR("mqtt plugin: calloc failed.");
214 return;
215 }
216 vl.values_len = ds->ds_num;
218 payload = malloc(msg->payloadlen + 1);
219 if (payload == NULL) {
220 ERROR("mqtt plugin: malloc for payload buffer failed.");
221 sfree(vl.values);
222 return;
223 }
224 memmove(payload, msg->payload, msg->payloadlen);
225 payload[msg->payloadlen] = 0;
227 DEBUG("mqtt plugin: payload = \"%s\"", payload);
228 status = parse_values(payload, &vl, ds);
229 if (status != 0) {
230 ERROR("mqtt plugin: Unable to parse payload \"%s\".", payload);
231 sfree(payload);
232 sfree(vl.values);
233 return;
234 }
235 sfree(payload);
237 plugin_dispatch_values(&vl);
238 sfree(vl.values);
239 } /* void on_message */
241 /* must hold conf->lock when calling. */
242 static int mqtt_reconnect(mqtt_client_conf_t *conf) {
243 int status;
245 if (conf->connected)
246 return (0);
248 status = mosquitto_reconnect(conf->mosq);
249 if (status != MOSQ_ERR_SUCCESS) {
250 char errbuf[1024];
251 ERROR("mqtt_connect_broker: mosquitto_connect failed: %s",
252 (status == MOSQ_ERR_ERRNO) ? sstrerror(errno, errbuf, sizeof(errbuf))
253 : mosquitto_strerror(status));
254 return (-1);
255 }
257 conf->connected = 1;
259 c_release(LOG_INFO, &conf->complaint_cantpublish,
260 "mqtt plugin: successfully reconnected to broker \"%s:%d\"",
261 conf->host, conf->port);
263 return (0);
264 } /* mqtt_reconnect */
266 /* must hold conf->lock when calling. */
267 static int mqtt_connect(mqtt_client_conf_t *conf) {
268 char const *client_id;
269 int status;
271 if (conf->mosq != NULL)
272 return mqtt_reconnect(conf);
274 if (conf->client_id)
275 client_id = conf->client_id;
276 else
277 client_id = hostname_g;
279 #if LIBMOSQUITTO_MAJOR == 0
280 conf->mosq = mosquitto_new(client_id, /* user data = */ conf);
281 #else
282 conf->mosq =
283 mosquitto_new(client_id, conf->clean_session, /* user data = */ conf);
284 #endif
285 if (conf->mosq == NULL) {
286 ERROR("mqtt plugin: mosquitto_new failed");
287 return (-1);
288 }
290 #if LIBMOSQUITTO_MAJOR != 0
291 if (conf->cacertificatefile) {
292 status = mosquitto_tls_set(conf->mosq, conf->cacertificatefile, NULL,
293 conf->certificatefile, conf->certificatekeyfile,
294 /* pw_callback */ NULL);
295 if (status != MOSQ_ERR_SUCCESS) {
296 ERROR("mqtt plugin: cannot mosquitto_tls_set: %s",
297 mosquitto_strerror(status));
298 mosquitto_destroy(conf->mosq);
299 conf->mosq = NULL;
300 return (-1);
301 }
303 status = mosquitto_tls_opts_set(conf->mosq, SSL_VERIFY_PEER,
304 conf->tlsprotocol, conf->ciphersuite);
305 if (status != MOSQ_ERR_SUCCESS) {
306 ERROR("mqtt plugin: cannot mosquitto_tls_opts_set: %s",
307 mosquitto_strerror(status));
308 mosquitto_destroy(conf->mosq);
309 conf->mosq = NULL;
310 return (-1);
311 }
313 status = mosquitto_tls_insecure_set(conf->mosq, false);
314 if (status != MOSQ_ERR_SUCCESS) {
315 ERROR("mqtt plugin: cannot mosquitto_tls_insecure_set: %s",
316 mosquitto_strerror(status));
317 mosquitto_destroy(conf->mosq);
318 conf->mosq = NULL;
319 return (-1);
320 }
321 }
322 #endif
324 if (conf->username && conf->password) {
325 status =
326 mosquitto_username_pw_set(conf->mosq, conf->username, conf->password);
327 if (status != MOSQ_ERR_SUCCESS) {
328 char errbuf[1024];
329 ERROR("mqtt plugin: mosquitto_username_pw_set failed: %s",
330 (status == MOSQ_ERR_ERRNO)
331 ? sstrerror(errno, errbuf, sizeof(errbuf))
332 : mosquitto_strerror(status));
334 mosquitto_destroy(conf->mosq);
335 conf->mosq = NULL;
336 return (-1);
337 }
338 }
340 #if LIBMOSQUITTO_MAJOR == 0
341 status = mosquitto_connect(conf->mosq, conf->host, conf->port,
342 /* keepalive = */ MQTT_KEEPALIVE,
343 /* clean session = */ conf->clean_session);
344 #else
345 status =
346 mosquitto_connect(conf->mosq, conf->host, conf->port, MQTT_KEEPALIVE);
347 #endif
348 if (status != MOSQ_ERR_SUCCESS) {
349 char errbuf[1024];
350 ERROR("mqtt plugin: mosquitto_connect failed: %s",
351 (status == MOSQ_ERR_ERRNO) ? sstrerror(errno, errbuf, sizeof(errbuf))
352 : mosquitto_strerror(status));
354 mosquitto_destroy(conf->mosq);
355 conf->mosq = NULL;
356 return (-1);
357 }
359 if (!conf->publish) {
360 mosquitto_message_callback_set(conf->mosq, on_message);
362 status =
363 mosquitto_subscribe(conf->mosq,
364 /* message_id = */ NULL, conf->topic, conf->qos);
365 if (status != MOSQ_ERR_SUCCESS) {
366 ERROR("mqtt plugin: Subscribing to \"%s\" failed: %s", conf->topic,
367 mosquitto_strerror(status));
369 mosquitto_disconnect(conf->mosq);
370 mosquitto_destroy(conf->mosq);
371 conf->mosq = NULL;
372 return (-1);
373 }
374 }
376 conf->connected = 1;
377 return (0);
378 } /* mqtt_connect */
380 static void *subscribers_thread(void *arg) {
381 mqtt_client_conf_t *conf = arg;
382 int status;
384 conf->loop = 1;
386 while (conf->loop) {
387 status = mqtt_connect(conf);
388 if (status != 0) {
389 sleep(1);
390 continue;
391 }
393 /* The documentation says "0" would map to the default (1000ms), but
394 * that does not work on some versions. */
395 #if LIBMOSQUITTO_MAJOR == 0
396 status = mosquitto_loop(conf->mosq, /* timeout = */ 1000 /* ms */);
397 #else
398 status = mosquitto_loop(conf->mosq,
399 /* timeout[ms] = */ 1000,
400 /* max_packets = */ 100);
401 #endif
402 if (status == MOSQ_ERR_CONN_LOST) {
403 conf->connected = 0;
404 continue;
405 } else if (status != MOSQ_ERR_SUCCESS) {
406 ERROR("mqtt plugin: mosquitto_loop failed: %s",
407 mosquitto_strerror(status));
408 mosquitto_destroy(conf->mosq);
409 conf->mosq = NULL;
410 conf->connected = 0;
411 continue;
412 }
414 DEBUG("mqtt plugin: mosquitto_loop succeeded.");
415 } /* while (conf->loop) */
417 pthread_exit(0);
418 } /* void *subscribers_thread */
420 static int publish(mqtt_client_conf_t *conf, char const *topic,
421 void const *payload, size_t payload_len) {
422 int status;
424 pthread_mutex_lock(&conf->lock);
426 status = mqtt_connect(conf);
427 if (status != 0) {
428 pthread_mutex_unlock(&conf->lock);
429 ERROR("mqtt plugin: unable to reconnect to broker");
430 return (status);
431 }
433 status = mosquitto_publish(conf->mosq, /* message_id */ NULL, topic,
434 #if LIBMOSQUITTO_MAJOR == 0
435 (uint32_t)payload_len, payload,
436 #else
437 (int)payload_len, payload,
438 #endif
439 conf->qos, conf->retain);
440 if (status != MOSQ_ERR_SUCCESS) {
441 char errbuf[1024];
442 c_complain(LOG_ERR, &conf->complaint_cantpublish,
443 "mqtt plugin: mosquitto_publish failed: %s",
444 (status == MOSQ_ERR_ERRNO)
445 ? sstrerror(errno, errbuf, sizeof(errbuf))
446 : mosquitto_strerror(status));
447 /* Mark our connection "down" regardless of the error as a safety
448 * measure; we will try to reconnect the next time we have to publish a
449 * message */
450 conf->connected = 0;
451 mosquitto_disconnect(conf->mosq);
453 pthread_mutex_unlock(&conf->lock);
454 return (-1);
455 }
457 pthread_mutex_unlock(&conf->lock);
458 return (0);
459 } /* int publish */
461 static int format_topic(char *buf, size_t buf_len, data_set_t const *ds,
462 value_list_t const *vl, mqtt_client_conf_t *conf) {
463 char name[MQTT_MAX_TOPIC_SIZE];
464 int status;
465 char *c;
467 if ((conf->topic_prefix == NULL) || (conf->topic_prefix[0] == 0))
468 return (FORMAT_VL(buf, buf_len, vl));
470 status = FORMAT_VL(name, sizeof(name), vl);
471 if (status != 0)
472 return (status);
474 status = ssnprintf(buf, buf_len, "%s/%s", conf->topic_prefix, name);
475 if ((status < 0) || (((size_t)status) >= buf_len))
476 return (ENOMEM);
478 while((c = strchr(buf, '#')) || (c = strchr(buf, '+'))) {
479 *c = '_';
480 }
482 return (0);
483 } /* int format_topic */
485 static int mqtt_write(const data_set_t *ds, const value_list_t *vl,
486 user_data_t *user_data) {
487 mqtt_client_conf_t *conf;
488 char topic[MQTT_MAX_TOPIC_SIZE];
489 char payload[MQTT_MAX_MESSAGE_SIZE];
490 int status = 0;
492 if ((user_data == NULL) || (user_data->data == NULL))
493 return (EINVAL);
494 conf = user_data->data;
496 status = format_topic(topic, sizeof(topic), ds, vl, conf);
497 if (status != 0) {
498 ERROR("mqtt plugin: format_topic failed with status %d.", status);
499 return (status);
500 }
502 status = format_values(payload, sizeof(payload), ds, vl, conf->store_rates);
503 if (status != 0) {
504 ERROR("mqtt plugin: format_values failed with status %d.", status);
505 return (status);
506 }
508 status = publish(conf, topic, payload, strlen(payload) + 1);
509 if (status != 0) {
510 ERROR("mqtt plugin: publish failed: %s", mosquitto_strerror(status));
511 return (status);
512 }
514 return (status);
515 } /* mqtt_write */
517 /*
518 * <Publish "name">
519 * Host "example.com"
520 * Port 1883
521 * ClientId "collectd"
522 * User "guest"
523 * Password "secret"
524 * Prefix "collectd"
525 * StoreRates true
526 * Retain false
527 * QoS 0
528 * CACert "ca.pem" Enables TLS if set
529 * CertificateFile "client-cert.pem" optional
530 * CertificateKeyFile "client-key.pem" optional
531 * TLSProtocol "tlsv1.2" optional
532 * </Publish>
533 */
534 static int mqtt_config_publisher(oconfig_item_t *ci) {
535 mqtt_client_conf_t *conf;
536 char cb_name[1024];
537 int status;
539 conf = calloc(1, sizeof(*conf));
540 if (conf == NULL) {
541 ERROR("mqtt plugin: calloc failed.");
542 return (-1);
543 }
544 conf->publish = 1;
546 conf->name = NULL;
547 status = cf_util_get_string(ci, &conf->name);
548 if (status != 0) {
549 mqtt_free(conf);
550 return (status);
551 }
553 conf->host = strdup(MQTT_DEFAULT_HOST);
554 conf->port = MQTT_DEFAULT_PORT;
555 conf->client_id = NULL;
556 conf->qos = 0;
557 conf->topic_prefix = strdup(MQTT_DEFAULT_TOPIC_PREFIX);
558 conf->store_rates = 1;
560 status = pthread_mutex_init(&conf->lock, NULL);
561 if (status != 0) {
562 mqtt_free(conf);
563 return (status);
564 }
566 C_COMPLAIN_INIT(&conf->complaint_cantpublish);
568 for (int i = 0; i < ci->children_num; i++) {
569 oconfig_item_t *child = ci->children + i;
570 if (strcasecmp("Host", child->key) == 0)
571 cf_util_get_string(child, &conf->host);
572 else if (strcasecmp("Port", child->key) == 0) {
573 int tmp = cf_util_get_port_number(child);
574 if (tmp < 0)
575 ERROR("mqtt plugin: Invalid port number.");
576 else
577 conf->port = tmp;
578 } else if (strcasecmp("ClientId", child->key) == 0)
579 cf_util_get_string(child, &conf->client_id);
580 else if (strcasecmp("User", child->key) == 0)
581 cf_util_get_string(child, &conf->username);
582 else if (strcasecmp("Password", child->key) == 0)
583 cf_util_get_string(child, &conf->password);
584 else if (strcasecmp("QoS", child->key) == 0) {
585 int tmp = -1;
586 status = cf_util_get_int(child, &tmp);
587 if ((status != 0) || (tmp < 0) || (tmp > 2))
588 ERROR("mqtt plugin: Not a valid QoS setting.");
589 else
590 conf->qos = tmp;
591 } else if (strcasecmp("Prefix", child->key) == 0)
592 cf_util_get_string(child, &conf->topic_prefix);
593 else if (strcasecmp("StoreRates", child->key) == 0)
594 cf_util_get_boolean(child, &conf->store_rates);
595 else if (strcasecmp("Retain", child->key) == 0)
596 cf_util_get_boolean(child, &conf->retain);
597 else if (strcasecmp("CACert", child->key) == 0)
598 cf_util_get_string(child, &conf->cacertificatefile);
599 else if (strcasecmp("CertificateFile", child->key) == 0)
600 cf_util_get_string(child, &conf->certificatefile);
601 else if (strcasecmp("CertificateKeyFile", child->key) == 0)
602 cf_util_get_string(child, &conf->certificatekeyfile);
603 else if (strcasecmp("TLSProtocol", child->key) == 0)
604 cf_util_get_string(child, &conf->tlsprotocol);
605 else if (strcasecmp("CipherSuite", child->key) == 0)
606 cf_util_get_string(child, &conf->ciphersuite);
607 else
608 ERROR("mqtt plugin: Unknown config option: %s", child->key);
609 }
611 ssnprintf(cb_name, sizeof(cb_name), "mqtt/%s", conf->name);
612 plugin_register_write(cb_name, mqtt_write, &(user_data_t){
613 .data = conf,
614 });
615 return (0);
616 } /* mqtt_config_publisher */
618 /*
619 * <Subscribe "name">
620 * Host "example.com"
621 * Port 1883
622 * ClientId "collectd"
623 * User "guest"
624 * Password "secret"
625 * Topic "collectd/#"
626 * </Subscribe>
627 */
628 static int mqtt_config_subscriber(oconfig_item_t *ci) {
629 mqtt_client_conf_t **tmp;
630 mqtt_client_conf_t *conf;
631 int status;
633 conf = calloc(1, sizeof(*conf));
634 if (conf == NULL) {
635 ERROR("mqtt plugin: calloc failed.");
636 return (-1);
637 }
638 conf->publish = 0;
640 conf->name = NULL;
641 status = cf_util_get_string(ci, &conf->name);
642 if (status != 0) {
643 mqtt_free(conf);
644 return (status);
645 }
647 conf->host = strdup(MQTT_DEFAULT_HOST);
648 conf->port = MQTT_DEFAULT_PORT;
649 conf->client_id = NULL;
650 conf->qos = 2;
651 conf->topic = strdup(MQTT_DEFAULT_TOPIC);
652 conf->clean_session = 1;
654 status = pthread_mutex_init(&conf->lock, NULL);
655 if (status != 0) {
656 mqtt_free(conf);
657 return (status);
658 }
660 C_COMPLAIN_INIT(&conf->complaint_cantpublish);
662 for (int i = 0; i < ci->children_num; i++) {
663 oconfig_item_t *child = ci->children + i;
664 if (strcasecmp("Host", child->key) == 0)
665 cf_util_get_string(child, &conf->host);
666 else if (strcasecmp("Port", child->key) == 0) {
667 status = cf_util_get_port_number(child);
668 if (status < 0)
669 ERROR("mqtt plugin: Invalid port number.");
670 else
671 conf->port = status;
672 } else if (strcasecmp("ClientId", child->key) == 0)
673 cf_util_get_string(child, &conf->client_id);
674 else if (strcasecmp("User", child->key) == 0)
675 cf_util_get_string(child, &conf->username);
676 else if (strcasecmp("Password", child->key) == 0)
677 cf_util_get_string(child, &conf->password);
678 else if (strcasecmp("QoS", child->key) == 0) {
679 int qos = -1;
680 status = cf_util_get_int(child, &qos);
681 if ((status != 0) || (qos < 0) || (qos > 2))
682 ERROR("mqtt plugin: Not a valid QoS setting.");
683 else
684 conf->qos = qos;
685 } else if (strcasecmp("Topic", child->key) == 0)
686 cf_util_get_string(child, &conf->topic);
687 else if (strcasecmp("CleanSession", child->key) == 0)
688 cf_util_get_boolean(child, &conf->clean_session);
689 else
690 ERROR("mqtt plugin: Unknown config option: %s", child->key);
691 }
693 tmp = realloc(subscribers, sizeof(*subscribers) * (subscribers_num + 1));
694 if (tmp == NULL) {
695 ERROR("mqtt plugin: realloc failed.");
696 mqtt_free(conf);
697 return (-1);
698 }
699 subscribers = tmp;
700 subscribers[subscribers_num] = conf;
701 subscribers_num++;
703 return (0);
704 } /* mqtt_config_subscriber */
706 /*
707 * <Plugin mqtt>
708 * <Publish "name">
709 * # ...
710 * </Publish>
711 * <Subscribe "name">
712 * # ...
713 * </Subscribe>
714 * </Plugin>
715 */
716 static int mqtt_config(oconfig_item_t *ci) {
717 for (int i = 0; i < ci->children_num; i++) {
718 oconfig_item_t *child = ci->children + i;
720 if (strcasecmp("Publish", child->key) == 0)
721 mqtt_config_publisher(child);
722 else if (strcasecmp("Subscribe", child->key) == 0)
723 mqtt_config_subscriber(child);
724 else
725 ERROR("mqtt plugin: Unknown config option: %s", child->key);
726 }
728 return (0);
729 } /* int mqtt_config */
731 static int mqtt_init(void) {
732 mosquitto_lib_init();
734 for (size_t i = 0; i < subscribers_num; i++) {
735 int status;
737 if (subscribers[i]->loop)
738 continue;
740 status = plugin_thread_create(&subscribers[i]->thread,
741 /* attrs = */ NULL,
742 /* func = */ subscribers_thread,
743 /* args = */ subscribers[i],
744 /* name = */ "mqtt");
745 if (status != 0) {
746 char errbuf[1024];
747 ERROR("mqtt plugin: pthread_create failed: %s",
748 sstrerror(errno, errbuf, sizeof(errbuf)));
749 continue;
750 }
751 }
753 return (0);
754 } /* mqtt_init */
756 void module_register(void) {
757 plugin_register_complex_config("mqtt", mqtt_config);
758 plugin_register_init("mqtt", mqtt_init);
759 } /* void module_register */