1 /**
2 * collectd - src/mqtt.c
3 * Copyright (C) 2014 Marc Falzon
4 * Copyright (C) 2014,2015 Florian octo Forster
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a
7 * copy of this software and associated documentation files (the "Software"),
8 * to deal in the Software without restriction, including without limitation
9 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10 * and/or sell copies of the Software, and to permit persons to whom the
11 * Software is furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22 * DEALINGS IN THE SOFTWARE.
23 *
24 * Authors:
25 * Marc Falzon <marc at baha dot mu>
26 * Florian octo Forster <octo at collectd.org>
27 **/
29 // Reference: http://mosquitto.org/api/files/mosquitto-h.html
32 #include "collectd.h"
33 #include "common.h"
34 #include "plugin.h"
35 #include "utils_cache.h"
36 #include "utils_complain.h"
38 #include <pthread.h>
40 #include <mosquitto.h>
42 #define MQTT_MAX_TOPIC_SIZE 1024
43 #define MQTT_MAX_MESSAGE_SIZE MQTT_MAX_TOPIC_SIZE + 1024
44 #define MQTT_DEFAULT_HOST "localhost"
45 #define MQTT_DEFAULT_PORT 1883
46 #define MQTT_DEFAULT_TOPIC_PREFIX "collectd"
47 #define MQTT_DEFAULT_TOPIC "collectd/#"
48 #ifndef MQTT_KEEPALIVE
49 # define MQTT_KEEPALIVE 60
50 #endif
53 /*
54 * Data types
55 */
56 struct mqtt_client_conf
57 {
58 _Bool publish;
59 char *name;
61 struct mosquitto *mosq;
62 _Bool connected;
64 char *host;
65 int port;
66 char *client_id;
67 char *username;
68 char *password;
69 int qos;
71 /* For publishing */
72 char *topic_prefix;
73 _Bool store_rates;
74 _Bool retain;
76 /* For subscribing */
77 pthread_t thread;
78 _Bool loop;
79 char *topic;
80 _Bool clean_session;
82 c_complain_t complaint_cantpublish;
83 pthread_mutex_t lock;
84 };
85 typedef struct mqtt_client_conf mqtt_client_conf_t;
87 static mqtt_client_conf_t **subscribers = NULL;
88 static size_t subscribers_num = 0;
90 /*
91 * Functions
92 */
93 #if LIBMOSQUITTO_MAJOR == 0
94 static char const *mosquitto_strerror (int code)
95 {
96 switch (code)
97 {
98 case MOSQ_ERR_SUCCESS: return "MOSQ_ERR_SUCCESS";
99 case MOSQ_ERR_NOMEM: return "MOSQ_ERR_NOMEM";
100 case MOSQ_ERR_PROTOCOL: return "MOSQ_ERR_PROTOCOL";
101 case MOSQ_ERR_INVAL: return "MOSQ_ERR_INVAL";
102 case MOSQ_ERR_NO_CONN: return "MOSQ_ERR_NO_CONN";
103 case MOSQ_ERR_CONN_REFUSED: return "MOSQ_ERR_CONN_REFUSED";
104 case MOSQ_ERR_NOT_FOUND: return "MOSQ_ERR_NOT_FOUND";
105 case MOSQ_ERR_CONN_LOST: return "MOSQ_ERR_CONN_LOST";
106 case MOSQ_ERR_SSL: return "MOSQ_ERR_SSL";
107 case MOSQ_ERR_PAYLOAD_SIZE: return "MOSQ_ERR_PAYLOAD_SIZE";
108 case MOSQ_ERR_NOT_SUPPORTED: return "MOSQ_ERR_NOT_SUPPORTED";
109 case MOSQ_ERR_AUTH: return "MOSQ_ERR_AUTH";
110 case MOSQ_ERR_ACL_DENIED: return "MOSQ_ERR_ACL_DENIED";
111 case MOSQ_ERR_UNKNOWN: return "MOSQ_ERR_UNKNOWN";
112 case MOSQ_ERR_ERRNO: return "MOSQ_ERR_ERRNO";
113 }
115 return "UNKNOWN ERROR CODE";
116 }
117 #else
118 /* provided by libmosquitto */
119 #endif
121 static void mqtt_free (mqtt_client_conf_t *conf)
122 {
123 if (conf == NULL)
124 return;
126 if (conf->connected)
127 (void) mosquitto_disconnect (conf->mosq);
128 conf->connected = 0;
129 (void) mosquitto_destroy (conf->mosq);
131 sfree (conf->host);
132 sfree (conf->username);
133 sfree (conf->password);
134 sfree (conf->client_id);
135 sfree (conf->topic_prefix);
136 sfree (conf);
137 }
139 static char *strip_prefix (char *topic)
140 {
141 size_t num;
142 size_t i;
144 num = 0;
145 for (i = 0; topic[i] != 0; i++)
146 if (topic[i] == '/')
147 num++;
149 if (num < 2)
150 return (NULL);
152 while (num > 2)
153 {
154 char *tmp = strchr (topic, '/');
155 if (tmp == NULL)
156 return (NULL);
157 topic = tmp + 1;
158 num--;
159 }
161 return (topic);
162 }
164 static void on_message (
165 #if LIBMOSQUITTO_MAJOR == 0
166 #else
167 __attribute__((unused)) struct mosquitto *m,
168 #endif
169 __attribute__((unused)) void *arg,
170 const struct mosquitto_message *msg)
171 {
172 value_list_t vl = VALUE_LIST_INIT;
173 data_set_t const *ds;
174 char *topic;
175 char *name;
176 char *payload;
177 int status;
179 if ((msg->payloadlen <= 0)
180 || (((uint8_t *) msg->payload)[msg->payloadlen - 1] != 0))
181 return;
183 topic = strdup (msg->topic);
184 name = strip_prefix (topic);
186 status = parse_identifier_vl (name, &vl);
187 if (status != 0)
188 {
189 ERROR ("mqtt plugin: Unable to parse topic \"%s\".", topic);
190 sfree (topic);
191 return;
192 }
193 sfree (topic);
195 ds = plugin_get_ds (vl.type);
196 if (ds == NULL)
197 {
198 ERROR ("mqtt plugin: Unknown type: \"%s\".", vl.type);
199 return;
200 }
202 vl.values = calloc (ds->ds_num, sizeof (*vl.values));
203 if (vl.values == NULL)
204 {
205 ERROR ("mqtt plugin: calloc failed.");
206 return;
207 }
208 vl.values_len = ds->ds_num;
210 payload = strdup ((void *) msg->payload);
211 DEBUG ("mqtt plugin: payload = \"%s\"", payload);
212 status = parse_values (payload, &vl, ds);
213 if (status != 0)
214 {
215 ERROR ("mqtt plugin: Unable to parse payload \"%s\".", payload);
216 sfree (payload);
217 sfree (vl.values);
218 return;
219 }
220 sfree (payload);
222 plugin_dispatch_values (&vl);
223 sfree (vl.values);
224 } /* void on_message */
226 /* must hold conf->lock when calling. */
227 static int mqtt_reconnect (mqtt_client_conf_t *conf)
228 {
229 int status;
231 if (conf->connected)
232 return (0);
234 status = mosquitto_reconnect (conf->mosq);
235 if (status != MOSQ_ERR_SUCCESS)
236 {
237 char errbuf[1024];
238 ERROR ("mqtt_connect_broker: mosquitto_connect failed: %s",
239 (status == MOSQ_ERR_ERRNO)
240 ? sstrerror(errno, errbuf, sizeof (errbuf))
241 : mosquitto_strerror (status));
242 return (-1);
243 }
245 conf->connected = 1;
247 c_release (LOG_INFO,
248 &conf->complaint_cantpublish,
249 "mqtt plugin: successfully reconnected to broker \"%s:%d\"",
250 conf->host, conf->port);
252 return (0);
253 } /* mqtt_reconnect */
255 /* must hold conf->lock when calling. */
256 static int mqtt_connect (mqtt_client_conf_t *conf)
257 {
258 char const *client_id;
259 int status;
261 if (conf->mosq != NULL)
262 return mqtt_reconnect (conf);
264 if (conf->client_id)
265 client_id = conf->client_id;
266 else
267 client_id = hostname_g;
269 #if LIBMOSQUITTO_MAJOR == 0
270 conf->mosq = mosquitto_new (client_id, /* user data = */ conf);
271 #else
272 conf->mosq = mosquitto_new (client_id, conf->clean_session, /* user data = */ conf);
273 #endif
274 if (conf->mosq == NULL)
275 {
276 ERROR ("mqtt plugin: mosquitto_new failed");
277 return (-1);
278 }
280 if (conf->username && conf->password)
281 {
282 status = mosquitto_username_pw_set (conf->mosq, conf->username, conf->password);
283 if (status != MOSQ_ERR_SUCCESS)
284 {
285 char errbuf[1024];
286 ERROR ("mqtt plugin: mosquitto_username_pw_set failed: %s",
287 (status == MOSQ_ERR_ERRNO)
288 ? sstrerror (errno, errbuf, sizeof (errbuf))
289 : mosquitto_strerror (status));
291 mosquitto_destroy (conf->mosq);
292 conf->mosq = NULL;
293 return (-1);
294 }
295 }
297 #if LIBMOSQUITTO_MAJOR == 0
298 status = mosquitto_connect (conf->mosq, conf->host, conf->port,
299 /* keepalive = */ MQTT_KEEPALIVE, /* clean session = */ conf->clean_session);
300 #else
301 status = mosquitto_connect (conf->mosq, conf->host, conf->port, MQTT_KEEPALIVE);
302 #endif
303 if (status != MOSQ_ERR_SUCCESS)
304 {
305 char errbuf[1024];
306 ERROR ("mqtt plugin: mosquitto_connect failed: %s",
307 (status == MOSQ_ERR_ERRNO)
308 ? sstrerror (errno, errbuf, sizeof (errbuf))
309 : mosquitto_strerror (status));
311 mosquitto_destroy (conf->mosq);
312 conf->mosq = NULL;
313 return (-1);
314 }
316 if (!conf->publish)
317 {
318 mosquitto_message_callback_set (conf->mosq, on_message);
320 status = mosquitto_subscribe (conf->mosq,
321 /* message_id = */ NULL,
322 conf->topic, conf->qos);
323 if (status != MOSQ_ERR_SUCCESS)
324 {
325 ERROR ("mqtt plugin: Subscribing to \"%s\" failed: %s",
326 conf->topic, mosquitto_strerror (status));
328 mosquitto_disconnect (conf->mosq);
329 mosquitto_destroy (conf->mosq);
330 conf->mosq = NULL;
331 return (-1);
332 }
333 }
335 conf->connected = 1;
336 return (0);
337 } /* mqtt_connect */
339 static void *subscribers_thread (void *arg)
340 {
341 mqtt_client_conf_t *conf = arg;
342 int status;
344 conf->loop = 1;
346 while (conf->loop)
347 {
348 status = mqtt_connect (conf);
349 if (status != 0)
350 {
351 sleep (1);
352 continue;
353 }
355 /* The documentation says "0" would map to the default (1000ms), but
356 * that does not work on some versions. */
357 #if LIBMOSQUITTO_MAJOR == 0
358 status = mosquitto_loop (conf->mosq, /* timeout = */ 1000 /* ms */);
359 #else
360 status = mosquitto_loop (conf->mosq,
361 /* timeout[ms] = */ 1000,
362 /* max_packets = */ 100);
363 #endif
364 if (status == MOSQ_ERR_CONN_LOST)
365 {
366 conf->connected = 0;
367 continue;
368 }
369 else if (status != MOSQ_ERR_SUCCESS)
370 {
371 ERROR ("mqtt plugin: mosquitto_loop failed: %s",
372 mosquitto_strerror (status));
373 mosquitto_destroy (conf->mosq);
374 conf->mosq = NULL;
375 conf->connected = 0;
376 continue;
377 }
379 DEBUG ("mqtt plugin: mosquitto_loop succeeded.");
380 } /* while (conf->loop) */
382 pthread_exit (0);
383 } /* void *subscribers_thread */
385 static int publish (mqtt_client_conf_t *conf, char const *topic,
386 void const *payload, size_t payload_len)
387 {
388 int status;
390 pthread_mutex_lock (&conf->lock);
392 status = mqtt_connect (conf);
393 if (status != 0) {
394 pthread_mutex_unlock (&conf->lock);
395 ERROR ("mqtt plugin: unable to reconnect to broker");
396 return (status);
397 }
399 status = mosquitto_publish(conf->mosq, /* message_id */ NULL, topic,
400 #if LIBMOSQUITTO_MAJOR == 0
401 (uint32_t) payload_len, payload,
402 #else
403 (int) payload_len, payload,
404 #endif
405 conf->qos, conf->retain);
406 if (status != MOSQ_ERR_SUCCESS)
407 {
408 char errbuf[1024];
409 c_complain (LOG_ERR,
410 &conf->complaint_cantpublish,
411 "plugin mqtt: mosquitto_publish failed: %s",
412 status == MOSQ_ERR_ERRNO ?
413 sstrerror(errno, errbuf, sizeof (errbuf)) :
414 mosquitto_strerror(status));
415 /* Mark our connection "down" regardless of the error as a safety
416 * measure; we will try to reconnect the next time we have to publish a
417 * message */
418 conf->connected = 0;
420 pthread_mutex_unlock (&conf->lock);
421 return (-1);
422 }
424 pthread_mutex_unlock (&conf->lock);
425 return (0);
426 } /* int publish */
428 static int format_topic (char *buf, size_t buf_len,
429 data_set_t const *ds, value_list_t const *vl,
430 mqtt_client_conf_t *conf)
431 {
432 char name[MQTT_MAX_TOPIC_SIZE];
433 int status;
435 if ((conf->topic_prefix == NULL) || (conf->topic_prefix[0] == 0))
436 return (FORMAT_VL (buf, buf_len, vl));
438 status = FORMAT_VL (name, sizeof (name), vl);
439 if (status != 0)
440 return (status);
442 status = ssnprintf (buf, buf_len, "%s/%s", conf->topic_prefix, name);
443 if ((status < 0) || (((size_t) status) >= buf_len))
444 return (ENOMEM);
446 return (0);
447 } /* int format_topic */
449 static int mqtt_write (const data_set_t *ds, const value_list_t *vl,
450 user_data_t *user_data)
451 {
452 mqtt_client_conf_t *conf;
453 char topic[MQTT_MAX_TOPIC_SIZE];
454 char payload[MQTT_MAX_MESSAGE_SIZE];
455 int status = 0;
457 if ((user_data == NULL) || (user_data->data == NULL))
458 return (EINVAL);
459 conf = user_data->data;
461 status = format_topic (topic, sizeof (topic), ds, vl, conf);
462 if (status != 0)
463 {
464 ERROR ("mqtt plugin: format_topic failed with status %d.", status);
465 return (status);
466 }
468 status = format_values (payload, sizeof (payload),
469 ds, vl, conf->store_rates);
470 if (status != 0)
471 {
472 ERROR ("mqtt plugin: format_values failed with status %d.", status);
473 return (status);
474 }
476 status = publish (conf, topic, payload, strlen (payload) + 1);
477 if (status != 0)
478 {
479 ERROR ("mqtt plugin: publish failed: %s", mosquitto_strerror (status));
480 return (status);
481 }
483 return (status);
484 } /* mqtt_write */
486 /*
487 * <Publish "name">
488 * Host "example.com"
489 * Port 1883
490 * ClientId "collectd"
491 * User "guest"
492 * Password "secret"
493 * Prefix "collectd"
494 * StoreRates true
495 * Retain false
496 * QoS 0
497 * </Publish>
498 */
499 static int mqtt_config_publisher (oconfig_item_t *ci)
500 {
501 mqtt_client_conf_t *conf;
502 char cb_name[1024];
503 user_data_t user_data;
504 int status;
505 int i;
507 conf = calloc (1, sizeof (*conf));
508 if (conf == NULL)
509 {
510 ERROR ("mqtt plugin: malloc failed.");
511 return (-1);
512 }
513 conf->publish = 1;
515 conf->name = NULL;
516 status = cf_util_get_string (ci, &conf->name);
517 if (status != 0)
518 {
519 mqtt_free (conf);
520 return (status);
521 }
523 conf->host = strdup (MQTT_DEFAULT_HOST);
524 conf->port = MQTT_DEFAULT_PORT;
525 conf->client_id = NULL;
526 conf->qos = 0;
527 conf->topic_prefix = strdup (MQTT_DEFAULT_TOPIC_PREFIX);
528 conf->store_rates = 1;
530 C_COMPLAIN_INIT (&conf->complaint_cantpublish);
532 for (i = 0; i < ci->children_num; i++)
533 {
534 oconfig_item_t *child = ci->children + i;
535 if (strcasecmp ("Host", child->key) == 0)
536 cf_util_get_string (child, &conf->host);
537 else if (strcasecmp ("Port", child->key) == 0)
538 {
539 int tmp = cf_util_get_port_number (child);
540 if (tmp < 0)
541 ERROR ("mqtt plugin: Invalid port number.");
542 else
543 conf->port = tmp;
544 }
545 else if (strcasecmp ("ClientId", child->key) == 0)
546 cf_util_get_string (child, &conf->client_id);
547 else if (strcasecmp ("User", child->key) == 0)
548 cf_util_get_string (child, &conf->username);
549 else if (strcasecmp ("Password", child->key) == 0)
550 cf_util_get_string (child, &conf->password);
551 else if (strcasecmp ("QoS", child->key) == 0)
552 {
553 int tmp = -1;
554 status = cf_util_get_int (child, &tmp);
555 if ((status != 0) || (tmp < 0) || (tmp > 2))
556 ERROR ("mqtt plugin: Not a valid QoS setting.");
557 else
558 conf->qos = tmp;
559 }
560 else if (strcasecmp ("Prefix", child->key) == 0)
561 cf_util_get_string (child, &conf->topic_prefix);
562 else if (strcasecmp ("StoreRates", child->key) == 0)
563 cf_util_get_boolean (child, &conf->store_rates);
564 else if (strcasecmp ("Retain", child->key) == 0)
565 cf_util_get_boolean (child, &conf->retain);
566 else
567 ERROR ("mqtt plugin: Unknown config option: %s", child->key);
568 }
570 ssnprintf (cb_name, sizeof (cb_name), "mqtt/%s", conf->name);
571 memset (&user_data, 0, sizeof (user_data));
572 user_data.data = conf;
574 plugin_register_write (cb_name, mqtt_write, &user_data);
575 return (0);
576 } /* mqtt_config_publisher */
578 /*
579 * <Subscribe "name">
580 * Host "example.com"
581 * Port 1883
582 * ClientId "collectd"
583 * User "guest"
584 * Password "secret"
585 * Topic "collectd/#"
586 * </Publish>
587 */
588 static int mqtt_config_subscriber (oconfig_item_t *ci)
589 {
590 mqtt_client_conf_t **tmp;
591 mqtt_client_conf_t *conf;
592 int status;
593 int i;
595 conf = calloc (1, sizeof (*conf));
596 if (conf == NULL)
597 {
598 ERROR ("mqtt plugin: malloc failed.");
599 return (-1);
600 }
601 conf->publish = 0;
603 conf->name = NULL;
604 status = cf_util_get_string (ci, &conf->name);
605 if (status != 0)
606 {
607 mqtt_free (conf);
608 return (status);
609 }
611 conf->host = strdup (MQTT_DEFAULT_HOST);
612 conf->port = MQTT_DEFAULT_PORT;
613 conf->client_id = NULL;
614 conf->qos = 2;
615 conf->topic = strdup (MQTT_DEFAULT_TOPIC);
616 conf->clean_session = 1;
618 C_COMPLAIN_INIT (&conf->complaint_cantpublish);
620 for (i = 0; i < ci->children_num; i++)
621 {
622 oconfig_item_t *child = ci->children + i;
623 if (strcasecmp ("Host", child->key) == 0)
624 cf_util_get_string (child, &conf->host);
625 else if (strcasecmp ("Port", child->key) == 0)
626 {
627 int tmp = cf_util_get_port_number (child);
628 if (tmp < 0)
629 ERROR ("mqtt plugin: Invalid port number.");
630 else
631 conf->port = tmp;
632 }
633 else if (strcasecmp ("ClientId", child->key) == 0)
634 cf_util_get_string (child, &conf->client_id);
635 else if (strcasecmp ("User", child->key) == 0)
636 cf_util_get_string (child, &conf->username);
637 else if (strcasecmp ("Password", child->key) == 0)
638 cf_util_get_string (child, &conf->password);
639 else if (strcasecmp ("QoS", child->key) == 0)
640 {
641 int tmp = -1;
642 status = cf_util_get_int (child, &tmp);
643 if ((status != 0) || (tmp < 0) || (tmp > 2))
644 ERROR ("mqtt plugin: Not a valid QoS setting.");
645 else
646 conf->qos = tmp;
647 }
648 else if (strcasecmp ("Topic", child->key) == 0)
649 cf_util_get_string (child, &conf->topic);
650 else if (strcasecmp ("CleanSession", child->key) == 0)
651 cf_util_get_boolean (child, &conf->clean_session);
652 else
653 ERROR ("mqtt plugin: Unknown config option: %s", child->key);
654 }
656 tmp = realloc (subscribers, sizeof (*subscribers) * subscribers_num);
657 if (tmp == NULL)
658 {
659 ERROR ("mqtt plugin: realloc failed.");
660 mqtt_free (conf);
661 return (-1);
662 }
663 subscribers = tmp;
664 subscribers[subscribers_num] = conf;
665 subscribers_num++;
667 return (0);
668 } /* mqtt_config_subscriber */
670 /*
671 * <Plugin mqtt>
672 * <Publish "name">
673 * # ...
674 * </Publish>
675 * <Subscribe "name">
676 * # ...
677 * </Subscribe>
678 * </Plugin>
679 */
680 static int mqtt_config (oconfig_item_t *ci)
681 {
682 int i;
684 for (i = 0; i < ci->children_num; i++)
685 {
686 oconfig_item_t *child = ci->children + i;
688 if (strcasecmp ("Publish", child->key) == 0)
689 mqtt_config_publisher (child);
690 else if (strcasecmp ("Subscribe", child->key) == 0)
691 mqtt_config_subscriber (child);
692 else
693 ERROR ("mqtt plugin: Unknown config option: %s", child->key);
694 }
696 return (0);
697 } /* int mqtt_config */
699 static int mqtt_init (void)
700 {
701 size_t i;
703 mosquitto_lib_init ();
705 for (i = 0; i < subscribers_num; i++)
706 {
707 int status;
709 if (subscribers[i]->loop)
710 continue;
712 status = plugin_thread_create (&subscribers[i]->thread,
713 /* attrs = */ NULL,
714 /* func = */ subscribers_thread,
715 /* args = */ subscribers[i]);
716 if (status != 0)
717 {
718 char errbuf[1024];
719 ERROR ("mqtt plugin: pthread_create failed: %s",
720 sstrerror (errno, errbuf, sizeof (errbuf)));
721 continue;
722 }
723 }
725 return (0);
726 } /* mqtt_init */
728 void module_register (void)
729 {
730 plugin_register_complex_config ("mqtt", mqtt_config);
731 plugin_register_init ("mqtt", mqtt_init);
732 } /* void module_register */
734 /* vim: set sw=4 sts=4 et fdm=marker : */