1924ce735416b8890b57c17f6c63f2ab456ef46c
1 /**
2 * collectd - src/amqp.c
3 * Copyright (C) 2009 Sebastien Pahl
4 * Copyright (C) 2010 Florian Forster
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a
7 * copy of this software and associated documentation files (the "Software"),
8 * to deal in the Software without restriction, including without limitation
9 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10 * and/or sell copies of the Software, and to permit persons to whom the
11 * Software is furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22 * DEALINGS IN THE SOFTWARE.
23 *
24 * Authors:
25 * Sebastien Pahl <sebastien.pahl at dotcloud.com>
26 * Florian Forster <octo at verplant.org>
27 **/
29 #include <stdint.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32 #include <strings.h>
33 #include <pthread.h>
35 #include "collectd.h"
36 #include "common.h"
37 #include "plugin.h"
38 #include "utils_format_json.h"
40 #include <amqp.h>
41 #include <amqp_framing.h>
/* Delivery modes assigned to "delivery_mode" of published messages. The
 * library headers do not provide symbolic names for these values, so they are
 * defined here. */
#define CAMQP_DM_VOLATILE 1
#define CAMQP_DM_PERSISTENT 2

/* Channel number passed to the channel, queue, exchange and consume calls. */
#define CAMQP_CHANNEL 1
50 /*
51 * Data types
52 */
53 struct camqp_config_s
54 {
55 _Bool publish;
56 char *name;
58 char *host;
59 int port;
60 char *vhost;
61 char *user;
62 char *password;
64 char *exchange;
65 char *exchange_type;
66 char *queue;
67 char *routingkey;
68 uint8_t delivery_mode;
70 _Bool store_rates;
72 amqp_connection_state_t connection;
73 pthread_mutex_t lock;
74 };
75 typedef struct camqp_config_s camqp_config_t;
77 /*
78 * Global variables
79 */
/* Fallback values substituted by CONF() when the corresponding member of a
 * connection's configuration is NULL. */
static const char *def_host = "localhost";
static const char *def_vhost = "/";
static const char *def_user = "guest";
static const char *def_password = "guest";
static const char *def_exchange = "amq.fanout";
static const char *def_routingkey = "collectd";

/* Handles of the subscriber threads created so far, and how many there are. */
static pthread_t *subscriber_threads = NULL;
static size_t subscriber_threads_num = 0;
/* Loop condition of the subscriber threads; cleared when the plugin shuts
 * down. */
static _Bool subscriber_threads_running = 1;

/* Yields member "f" of configuration "c", or the matching "def_<f>" default
 * when that member is NULL. */
#define CONF(c,f) (((c)->f != NULL) ? (c)->f : def_##f)
93 /*
94 * Functions
95 */
96 static void camqp_close_connection (camqp_config_t *conf) /* {{{ */
97 {
98 int sockfd;
100 if ((conf == NULL) || (conf->connection == NULL))
101 return;
103 sockfd = amqp_get_sockfd (conf->connection);
104 amqp_channel_close (conf->connection, CAMQP_CHANNEL, AMQP_REPLY_SUCCESS);
105 amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
106 amqp_destroy_connection (conf->connection);
107 close (sockfd);
108 conf->connection = NULL;
109 } /* }}} void camqp_close_connection */
111 static void camqp_config_free (void *ptr) /* {{{ */
112 {
113 camqp_config_t *conf = ptr;
115 if (conf == NULL)
116 return;
118 camqp_close_connection (conf);
120 sfree (conf->name);
121 sfree (conf->host);
122 sfree (conf->vhost);
123 sfree (conf->user);
124 sfree (conf->password);
125 sfree (conf->exchange);
126 sfree (conf->exchange_type);
127 sfree (conf->queue);
128 sfree (conf->routingkey);
130 sfree (conf);
131 } /* }}} void camqp_config_free */
133 static char *camqp_bytes_cstring (amqp_bytes_t *in) /* {{{ */
134 {
135 char *ret;
137 if ((in == NULL) || (in->bytes == NULL))
138 return (NULL);
140 ret = malloc (in->len + 1);
141 if (ret == NULL)
142 return (NULL);
144 memcpy (ret, in->bytes, in->len);
145 ret[in->len] = 0;
147 return (ret);
148 } /* }}} char *camqp_bytes_cstring */
150 static _Bool camqp_is_error (camqp_config_t *conf) /* {{{ */
151 {
152 amqp_rpc_reply_t r;
154 r = amqp_get_rpc_reply (conf->connection);
155 if (r.reply_type == AMQP_RESPONSE_NORMAL)
156 return (0);
158 return (1);
159 } /* }}} _Bool camqp_is_error */
161 static char *camqp_strerror (camqp_config_t *conf, /* {{{ */
162 char *buffer, size_t buffer_size)
163 {
164 amqp_rpc_reply_t r;
166 r = amqp_get_rpc_reply (conf->connection);
167 switch (r.reply_type)
168 {
169 case AMQP_RESPONSE_NORMAL:
170 sstrncpy (buffer, "Success", sizeof (buffer));
171 break;
173 case AMQP_RESPONSE_NONE:
174 sstrncpy (buffer, "Missing RPC reply type", sizeof (buffer));
175 break;
177 case AMQP_RESPONSE_LIBRARY_EXCEPTION:
178 if (r.library_errno)
179 return (sstrerror (r.library_errno, buffer, buffer_size));
180 else
181 sstrncpy (buffer, "End of stream", sizeof (buffer));
182 break;
184 case AMQP_RESPONSE_SERVER_EXCEPTION:
185 if (r.reply.id == AMQP_CONNECTION_CLOSE_METHOD)
186 {
187 amqp_connection_close_t *m = r.reply.decoded;
188 char *tmp = camqp_bytes_cstring (&m->reply_text);
189 ssnprintf (buffer, buffer_size, "Server connection error %d: %s",
190 m->reply_code, tmp);
191 sfree (tmp);
192 }
193 else if (r.reply.id == AMQP_CHANNEL_CLOSE_METHOD)
194 {
195 amqp_channel_close_t *m = r.reply.decoded;
196 char *tmp = camqp_bytes_cstring (&m->reply_text);
197 ssnprintf (buffer, buffer_size, "Server channel error %d: %s",
198 m->reply_code, tmp);
199 sfree (tmp);
200 }
201 else
202 {
203 ssnprintf (buffer, buffer_size, "Server error method %#"PRIx32,
204 r.reply.id);
205 }
206 break;
208 default:
209 ssnprintf (buffer, buffer_size, "Unknown reply type %i",
210 (int) r.reply_type);
211 }
213 return (buffer);
214 } /* }}} char *camqp_strerror */
216 static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
217 {
218 amqp_queue_declare_ok_t *qd_ret;
219 amqp_basic_consume_ok_t *cm_ret;
221 qd_ret = amqp_queue_declare (conf->connection,
222 /* channel = */ CAMQP_CHANNEL,
223 /* queue = */ (conf->queue != NULL)
224 ? amqp_cstring_bytes (conf->queue)
225 : AMQP_EMPTY_BYTES,
226 /* passive = */ 0,
227 /* durable = */ 0,
228 /* exclusive = */ 0,
229 /* auto_delete = */ 1,
230 /* arguments = */ AMQP_EMPTY_TABLE);
231 if (qd_ret == NULL)
232 {
233 ERROR ("amqp plugin: amqp_queue_declare failed.");
234 camqp_close_connection (conf);
235 return (-1);
236 }
238 if (conf->queue == NULL)
239 {
240 conf->queue = camqp_bytes_cstring (&qd_ret->queue);
241 if (conf->queue == NULL)
242 {
243 ERROR ("amqp plugin: camqp_bytes_cstring failed.");
244 camqp_close_connection (conf);
245 return (-1);
246 }
248 INFO ("amqp plugin: Created queue \"%s\".", conf->queue);
249 }
250 DEBUG ("amqp plugin: Successfully created queue \"%s\".", conf->queue);
252 /* bind to an exchange */
253 if (conf->exchange != NULL)
254 {
255 amqp_queue_bind_ok_t *qb_ret;
257 /* create the exchange */
258 if (conf->exchange_type != NULL)
259 {
260 amqp_exchange_declare_ok_t *ed_ret;
262 ed_ret = amqp_exchange_declare (conf->connection,
263 /* channel = */ CAMQP_CHANNEL,
264 /* exchange = */ amqp_cstring_bytes (conf->exchange),
265 /* type = */ amqp_cstring_bytes (conf->exchange_type),
266 /* passive = */ 0,
267 /* durable = */ 0,
268 /* auto_delete = */ 1,
269 /* arguments = */ AMQP_EMPTY_TABLE);
270 if ((ed_ret == NULL) && camqp_is_error (conf))
271 {
272 char errbuf[1024];
273 ERROR ("amqp plugin: amqp_exchange_declare failed: %s",
274 camqp_strerror (conf, errbuf, sizeof (errbuf)));
275 camqp_close_connection (conf);
276 return (-1);
277 }
278 }
280 DEBUG ("amqp plugin: queue = %s; exchange = %s; routing_key = %s;",
281 conf->queue, conf->exchange, CONF (conf, routingkey));
283 assert (conf->queue != NULL);
284 qb_ret = amqp_queue_bind (conf->connection,
285 /* channel = */ CAMQP_CHANNEL,
286 /* queue = */ amqp_cstring_bytes (conf->queue),
287 /* exchange = */ amqp_cstring_bytes (conf->exchange),
288 #if 1
289 /* routing_key = */ amqp_cstring_bytes (CONF (conf, routingkey)),
290 #else
291 /* routing_key = */ AMQP_EMPTY_BYTES,
292 #endif
293 /* arguments = */ AMQP_EMPTY_TABLE);
294 if ((qb_ret == NULL) && camqp_is_error (conf))
295 {
296 char errbuf[1024];
297 ERROR ("amqp plugin: amqp_queue_bind failed: %s",
298 camqp_strerror (conf, errbuf, sizeof (errbuf)));
299 camqp_close_connection (conf);
300 return (-1);
301 }
302 } /* if (conf->exchange != NULL) */
304 cm_ret = amqp_basic_consume (conf->connection,
305 /* channel = */ CAMQP_CHANNEL,
306 /* queue = */ amqp_cstring_bytes (conf->queue),
307 /* consumer_tag = */ AMQP_EMPTY_BYTES,
308 /* no_local = */ 0,
309 /* no_ack = */ 1,
310 /* exclusive = */ 0);
311 if ((cm_ret == NULL) && camqp_is_error (conf))
312 {
313 char errbuf[1024];
314 ERROR ("amqp plugin: amqp_basic_consume failed: %s",
315 camqp_strerror (conf, errbuf, sizeof (errbuf)));
316 camqp_close_connection (conf);
317 return (-1);
318 }
320 return (0);
321 } /* }}} int camqp_setup_queue */
323 static int camqp_connect (camqp_config_t *conf) /* {{{ */
324 {
325 amqp_rpc_reply_t reply;
326 int sockfd;
327 int status;
329 if (conf->connection != NULL)
330 return (0);
332 conf->connection = amqp_new_connection ();
333 if (conf->connection == NULL)
334 {
335 ERROR ("amqp plugin: amqp_new_connection failed.");
336 return (ENOMEM);
337 }
339 sockfd = amqp_open_socket (CONF(conf, host), conf->port);
340 if (sockfd < 0)
341 {
342 char errbuf[1024];
343 status = (-1) * sockfd;
344 ERROR ("amqp plugin: amqp_open_socket failed: %s",
345 sstrerror (status, errbuf, sizeof (errbuf)));
346 amqp_destroy_connection (conf->connection);
347 conf->connection = NULL;
348 return (status);
349 }
350 amqp_set_sockfd (conf->connection, sockfd);
352 reply = amqp_login (conf->connection, CONF(conf, vhost),
353 /* channel max = */ 0,
354 /* frame max = */ 131072,
355 /* heartbeat = */ 0,
356 /* authentication = */ AMQP_SASL_METHOD_PLAIN,
357 CONF(conf, user), CONF(conf, password));
358 if (reply.reply_type != AMQP_RESPONSE_NORMAL)
359 {
360 ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
361 CONF(conf, vhost), CONF(conf, user));
362 amqp_destroy_connection (conf->connection);
363 close (sockfd);
364 conf->connection = NULL;
365 return (1);
366 }
368 amqp_channel_open (conf->connection, /* channel = */ 1);
369 /* FIXME: Is checking "reply.reply_type" really correct here? How does
370 * it get set? --octo */
371 if (reply.reply_type != AMQP_RESPONSE_NORMAL)
372 {
373 ERROR ("amqp plugin: amqp_channel_open failed.");
374 amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
375 amqp_destroy_connection (conf->connection);
376 close(sockfd);
377 conf->connection = NULL;
378 return (1);
379 }
381 INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
382 "on %s:%i.", CONF(conf, vhost), CONF(conf, host), conf->port);
384 if (!conf->publish)
385 return (camqp_setup_queue (conf));
386 return (0);
387 } /* }}} int camqp_connect */
389 static int camqp_read_body (camqp_config_t *conf, /* {{{ */
390 size_t body_size)
391 {
392 char body[body_size + 1];
393 char *body_ptr;
394 size_t received;
395 amqp_frame_t frame;
396 int status;
398 memset (body, 0, sizeof (body));
399 body_ptr = &body[0];
400 received = 0;
402 while (received < body_size)
403 {
404 status = amqp_simple_wait_frame (conf->connection, &frame);
405 if (status < 0)
406 {
407 char errbuf[1024];
408 status = (-1) * status;
409 ERROR ("amqp plugin: amqp_simple_wait_frame failed: %s",
410 sstrerror (status, errbuf, sizeof (errbuf)));
411 camqp_close_connection (conf);
412 return (status);
413 }
415 if (frame.frame_type != AMQP_FRAME_BODY)
416 {
417 NOTICE ("amqp plugin: Unexpected frame type: %#"PRIx8,
418 frame.frame_type);
419 return (-1);
420 }
422 if ((body_size - received) < frame.payload.body_fragment.len)
423 {
424 WARNING ("amqp plugin: Body is larger than indicated by header.");
425 return (-1);
426 }
428 memcpy (body_ptr, frame.payload.body_fragment.bytes,
429 frame.payload.body_fragment.len);
430 body_ptr += frame.payload.body_fragment.len;
431 received += frame.payload.body_fragment.len;
432 } /* while (received < body_size) */
434 DEBUG ("amqp plugin: camqp_read_body: body = %s", body);
436 return (0);
437 } /* }}} int camqp_read_body */
439 static int camqp_read_header (camqp_config_t *conf) /* {{{ */
440 {
441 int status;
442 amqp_frame_t frame;
444 status = amqp_simple_wait_frame (conf->connection, &frame);
445 if (status < 0)
446 {
447 char errbuf[1024];
448 status = (-1) * status;
449 ERROR ("amqp plugin: amqp_simple_wait_frame failed: %s",
450 sstrerror (status, errbuf, sizeof (errbuf)));
451 camqp_close_connection (conf);
452 return (status);
453 }
455 if (frame.frame_type != AMQP_FRAME_HEADER)
456 {
457 NOTICE ("amqp plugin: Unexpected frame type: %#"PRIx8,
458 frame.frame_type);
459 return (-1);
460 }
462 return (camqp_read_body (conf, frame.payload.properties.body_size));
463 } /* }}} int camqp_read_header */
465 static void *camqp_subscribe_thread (void *user_data) /* {{{ */
466 {
467 camqp_config_t *conf = user_data;
468 int status;
470 while (subscriber_threads_running)
471 {
472 amqp_frame_t frame;
474 status = camqp_connect (conf);
475 if (status != 0)
476 {
477 ERROR ("amqp plugin: camqp_connect failed. "
478 "Will sleep for %i seconds.", interval_g);
479 sleep (interval_g);
480 continue;
481 }
483 status = amqp_simple_wait_frame (conf->connection, &frame);
484 if (status < 0)
485 {
486 ERROR ("amqp plugin: amqp_simple_wait_frame failed. "
487 "Will sleep for %i seconds.", interval_g);
488 camqp_close_connection (conf);
489 sleep (interval_g);
490 continue;
491 }
493 if (frame.frame_type != AMQP_FRAME_METHOD)
494 {
495 DEBUG ("amqp plugin: Unexpected frame type: %#"PRIx8,
496 frame.frame_type);
497 continue;
498 }
500 if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
501 {
502 DEBUG ("amqp plugin: Unexpected method id: %#"PRIx32,
503 frame.payload.method.id);
504 continue;
505 }
507 status = camqp_read_header (conf);
509 amqp_maybe_release_buffers (conf->connection);
510 } /* while (subscriber_threads_running) */
512 camqp_config_free (conf);
513 pthread_exit (NULL);
514 } /* }}} void *camqp_subscribe_thread */
516 static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */
517 {
518 int status;
519 pthread_t *tmp;
521 tmp = realloc (subscriber_threads,
522 sizeof (*subscriber_threads) * (subscriber_threads_num + 1));
523 if (tmp == NULL)
524 {
525 ERROR ("amqp plugin: realloc failed.");
526 camqp_config_free (conf);
527 return (ENOMEM);
528 }
529 subscriber_threads = tmp;
530 tmp = subscriber_threads + subscriber_threads_num;
531 memset (tmp, 0, sizeof (*tmp));
533 status = pthread_create (tmp, /* attr = */ NULL,
534 camqp_subscribe_thread, conf);
535 if (status != 0)
536 {
537 char errbuf[1024];
538 ERROR ("amqp plugin: pthread_create failed: %s",
539 sstrerror (status, errbuf, sizeof (errbuf)));
540 camqp_config_free (conf);
541 return (status);
542 }
544 subscriber_threads_num++;
546 return (0);
547 } /* }}} int camqp_subscribe_init */
549 static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
550 const char *buffer)
551 {
552 amqp_basic_properties_t props;
553 int status;
555 status = camqp_connect (conf);
556 if (status != 0)
557 return (status);
559 memset (&props, 0, sizeof (props));
560 props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG
561 | AMQP_BASIC_DELIVERY_MODE_FLAG
562 | AMQP_BASIC_APP_ID_FLAG;
563 props.content_type = amqp_cstring_bytes("application/json");
564 props.delivery_mode = conf->delivery_mode;
565 props.app_id = amqp_cstring_bytes("collectd");
567 status = amqp_basic_publish(conf->connection,
568 /* channel = */ 1,
569 amqp_cstring_bytes(CONF(conf, exchange)),
570 amqp_cstring_bytes(CONF(conf, routingkey)),
571 /* mandatory = */ 0,
572 /* immediate = */ 0,
573 &props,
574 amqp_cstring_bytes(buffer));
575 if (status != 0)
576 {
577 ERROR ("amqp plugin: amqp_basic_publish failed with status %i.",
578 status);
579 camqp_close_connection (conf);
580 }
582 return (status);
583 } /* }}} int camqp_write_locked */
585 static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
586 user_data_t *user_data)
587 {
588 camqp_config_t *conf = user_data->data;
589 char buffer[4096];
590 size_t bfree;
591 size_t bfill;
592 int status;
594 if ((ds == NULL) || (vl == NULL) || (conf == NULL))
595 return (EINVAL);
597 memset (buffer, 0, sizeof (buffer));
598 bfree = sizeof (buffer);
599 bfill = 0;
601 format_json_initialize (buffer, &bfill, &bfree);
602 format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates);
603 format_json_finalize (buffer, &bfill, &bfree);
605 pthread_mutex_lock (&conf->lock);
606 status = camqp_write_locked (conf, buffer);
607 pthread_mutex_unlock (&conf->lock);
609 return (status);
610 } /* }}} int camqp_write */
612 static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
613 _Bool publish)
614 {
615 camqp_config_t *conf;
616 int status;
617 int i;
619 conf = malloc (sizeof (*conf));
620 if (conf == NULL)
621 {
622 ERROR ("amqp plugin: malloc failed.");
623 return (ENOMEM);
624 }
626 /* Initialize "conf" {{{ */
627 memset (conf, 0, sizeof (*conf));
628 conf->publish = publish;
629 conf->name = NULL;
630 conf->host = NULL;
631 conf->port = 5672;
632 conf->vhost = NULL;
633 conf->user = NULL;
634 conf->password = NULL;
635 conf->exchange = NULL;
636 conf->exchange_type = NULL;
637 conf->queue = NULL;
638 conf->routingkey = NULL;
639 conf->delivery_mode = CAMQP_DM_VOLATILE;
640 conf->store_rates = 0;
641 conf->connection = NULL;
642 pthread_mutex_init (&conf->lock, /* attr = */ NULL);
643 /* }}} */
645 status = cf_util_get_string (ci, &conf->name);
646 if (status != 0)
647 {
648 sfree (conf);
649 return (status);
650 }
652 for (i = 0; i < ci->children_num; i++)
653 {
654 oconfig_item_t *child = ci->children + i;
656 if (strcasecmp ("Host", child->key) == 0)
657 status = cf_util_get_string (child, &conf->host);
658 else if (strcasecmp ("Port", child->key) == 0)
659 {
660 status = cf_util_get_port_number (child);
661 if (status > 0)
662 {
663 conf->port = status;
664 status = 0;
665 }
666 }
667 else if (strcasecmp ("VHost", child->key) == 0)
668 status = cf_util_get_string (child, &conf->vhost);
669 else if (strcasecmp ("User", child->key) == 0)
670 status = cf_util_get_string (child, &conf->user);
671 else if (strcasecmp ("Password", child->key) == 0)
672 status = cf_util_get_string (child, &conf->password);
673 else if (strcasecmp ("Exchange", child->key) == 0)
674 status = cf_util_get_string (child, &conf->exchange);
675 else if ((strcasecmp ("ExchangeType", child->key) == 0) && !publish)
676 status = cf_util_get_string (child, &conf->exchange_type);
677 else if ((strcasecmp ("Queue", child->key) == 0) && !publish)
678 status = cf_util_get_string (child, &conf->queue);
679 else if (strcasecmp ("RoutingKey", child->key) == 0)
680 status = cf_util_get_string (child, &conf->routingkey);
681 else if (strcasecmp ("Persistent", child->key) == 0)
682 {
683 _Bool tmp = 0;
684 status = cf_util_get_boolean (child, &tmp);
685 if (tmp)
686 conf->delivery_mode = CAMQP_DM_PERSISTENT;
687 else
688 conf->delivery_mode = CAMQP_DM_VOLATILE;
689 }
690 else if (strcasecmp ("StoreRates", child->key) == 0)
691 status = cf_util_get_boolean (child, &conf->store_rates);
692 else
693 WARNING ("amqp plugin: Ignoring unknown "
694 "configuration option \"%s\".", child->key);
696 if (status != 0)
697 break;
698 } /* for (i = 0; i < ci->children_num; i++) */
700 if ((status == 0) && !publish && (conf->exchange == NULL))
701 {
702 if (conf->routingkey != NULL)
703 WARNING ("amqp plugin: The option \"RoutingKey\" was given "
704 "without the \"Exchange\" option. It will be ignored.");
706 if (conf->exchange_type != NULL)
707 WARNING ("amqp plugin: The option \"ExchangeType\" was given "
708 "without the \"Exchange\" option. It will be ignored.");
709 }
711 if (status != 0)
712 {
713 camqp_config_free (conf);
714 return (status);
715 }
717 if (conf->exchange != NULL)
718 {
719 DEBUG ("amqp plugin: camqp_config_connection: exchange = %s;",
720 conf->exchange);
721 }
723 if (publish)
724 {
725 char cbname[128];
726 user_data_t ud = { conf, camqp_config_free };
728 ssnprintf (cbname, sizeof (cbname), "amqp/%s", conf->name);
730 status = plugin_register_write (cbname, camqp_write, &ud);
731 if (status != 0)
732 {
733 camqp_config_free (conf);
734 return (status);
735 }
736 }
737 else
738 {
739 status = camqp_subscribe_init (conf);
740 if (status != 0)
741 {
742 camqp_config_free (conf);
743 return (status);
744 }
745 }
747 return (0);
748 } /* }}} int camqp_config_connection */
750 static int camqp_config (oconfig_item_t *ci) /* {{{ */
751 {
752 int i;
754 for (i = 0; i < ci->children_num; i++)
755 {
756 oconfig_item_t *child = ci->children + i;
758 if (strcasecmp ("Publish", child->key) == 0)
759 camqp_config_connection (child, /* publish = */ 1);
760 else if (strcasecmp ("Subscribe", child->key) == 0)
761 camqp_config_connection (child, /* publish = */ 0);
762 else
763 WARNING ("amqp plugin: Ignoring unknown config option \"%s\".",
764 child->key);
765 } /* for (ci->children_num) */
767 return (0);
768 } /* }}} int camqp_config */
770 static int shutdown (void) /* {{{ */
771 {
772 size_t i;
774 DEBUG ("amqp plugin: Shutting down %zu subscriber threads.",
775 subscriber_threads_num);
777 subscriber_threads_running = 0;
778 for (i = 0; i < subscriber_threads_num; i++)
779 {
780 /* FIXME: Sending a signal is not very elegant here. Maybe find out how
781 * to use a timeout in the thread and check for the variable in regular
782 * intervals. */
783 pthread_kill (subscriber_threads[i], SIGTERM);
784 pthread_join (subscriber_threads[i], /* retval = */ NULL);
785 }
787 subscriber_threads_num = 0;
788 sfree (subscriber_threads);
790 DEBUG ("amqp plugin: All subscriber threads exited.");
792 return (0);
793 } /* }}} int shutdown */
795 void module_register (void)
796 {
797 plugin_register_complex_config ("amqp", camqp_config);
798 plugin_register_shutdown ("amqp", shutdown);
799 } /* void module_register */
801 /* vim: set sw=4 sts=4 et fdm=marker : */