1 /**
2 * collectd - src/amqp.c
3 * Copyright (C) 2009 Sebastien Pahl
4 * Copyright (C) 2010-2012 Florian Forster
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a
7 * copy of this software and associated documentation files (the "Software"),
8 * to deal in the Software without restriction, including without limitation
9 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10 * and/or sell copies of the Software, and to permit persons to whom the
11 * Software is furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22 * DEALINGS IN THE SOFTWARE.
23 *
24 * Authors:
25 * Sebastien Pahl <sebastien.pahl at dotcloud.com>
26 * Florian Forster <octo at verplant.org>
27 **/
29 #include "collectd.h"
30 #include "common.h"
31 #include "plugin.h"
32 #include "utils_cmd_putval.h"
33 #include "utils_format_json.h"
34 #include "utils_format_graphite.h"
36 #include <pthread.h>
38 #include <amqp.h>
39 #include <amqp_framing.h>
/* Delivery mode values. The library does not provide named constants for
 * them, so they are spelled out here. */
enum
{
  CAMQP_DM_VOLATILE   = 1,
  CAMQP_DM_PERSISTENT = 2
};

/* Serialization formats that can be selected for published messages. */
enum
{
  CAMQP_FORMAT_COMMAND  = 1,
  CAMQP_FORMAT_JSON     = 2,
  CAMQP_FORMAT_GRAPHITE = 3
};

/* Channel number used for the AMQP operations in this file. */
enum
{
  CAMQP_CHANNEL = 1
};
52 /*
53 * Data types
54 */
55 struct camqp_config_s
56 {
57 _Bool publish;
58 char *name;
60 char *host;
61 int port;
62 char *vhost;
63 char *user;
64 char *password;
66 char *exchange;
67 char *routing_key;
69 /* publish only */
70 uint8_t delivery_mode;
71 _Bool store_rates;
72 int format;
73 /* publish & graphite format only */
74 char *prefix;
75 char *postfix;
76 char escape_char;
78 /* subscribe only */
79 char *exchange_type;
80 char *queue;
82 amqp_connection_state_t connection;
83 pthread_mutex_t lock;
84 };
85 typedef struct camqp_config_s camqp_config_t;
87 /*
88 * Global variables
89 */
/* Fallback values picked by CONF() when the matching option is not set. */
static const char *def_host     = "localhost";
static const char *def_vhost    = "/";
static const char *def_user     = "guest";
static const char *def_password = "guest";
static const char *def_exchange = "amq.fanout";

/* Bookkeeping for the subscriber threads started by this plugin. */
static pthread_t *subscriber_threads         = NULL;
static size_t     subscriber_threads_num     = 0;
static _Bool      subscriber_threads_running = 1;

/* Evaluates to the configured member "f" of "c", or to the default "def_<f>"
 * when that member is NULL. */
#define CONF(c,f) (((c)->f != NULL) ? (c)->f : def_##f)
102 /*
103 * Functions
104 */
105 static void camqp_close_connection (camqp_config_t *conf) /* {{{ */
106 {
107 int sockfd;
109 if ((conf == NULL) || (conf->connection == NULL))
110 return;
112 sockfd = amqp_get_sockfd (conf->connection);
113 amqp_channel_close (conf->connection, CAMQP_CHANNEL, AMQP_REPLY_SUCCESS);
114 amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
115 amqp_destroy_connection (conf->connection);
116 close (sockfd);
117 conf->connection = NULL;
118 } /* }}} void camqp_close_connection */
120 static void camqp_config_free (void *ptr) /* {{{ */
121 {
122 camqp_config_t *conf = ptr;
124 if (conf == NULL)
125 return;
127 camqp_close_connection (conf);
129 sfree (conf->name);
130 sfree (conf->host);
131 sfree (conf->vhost);
132 sfree (conf->user);
133 sfree (conf->password);
134 sfree (conf->exchange);
135 sfree (conf->exchange_type);
136 sfree (conf->queue);
137 sfree (conf->routing_key);
138 sfree (conf->prefix);
139 sfree (conf->postfix);
142 sfree (conf);
143 } /* }}} void camqp_config_free */
145 static char *camqp_bytes_cstring (amqp_bytes_t *in) /* {{{ */
146 {
147 char *ret;
149 if ((in == NULL) || (in->bytes == NULL))
150 return (NULL);
152 ret = malloc (in->len + 1);
153 if (ret == NULL)
154 return (NULL);
156 memcpy (ret, in->bytes, in->len);
157 ret[in->len] = 0;
159 return (ret);
160 } /* }}} char *camqp_bytes_cstring */
162 static _Bool camqp_is_error (camqp_config_t *conf) /* {{{ */
163 {
164 amqp_rpc_reply_t r;
166 r = amqp_get_rpc_reply (conf->connection);
167 if (r.reply_type == AMQP_RESPONSE_NORMAL)
168 return (0);
170 return (1);
171 } /* }}} _Bool camqp_is_error */
173 static char *camqp_strerror (camqp_config_t *conf, /* {{{ */
174 char *buffer, size_t buffer_size)
175 {
176 amqp_rpc_reply_t r;
178 r = amqp_get_rpc_reply (conf->connection);
179 switch (r.reply_type)
180 {
181 case AMQP_RESPONSE_NORMAL:
182 sstrncpy (buffer, "Success", sizeof (buffer));
183 break;
185 case AMQP_RESPONSE_NONE:
186 sstrncpy (buffer, "Missing RPC reply type", sizeof (buffer));
187 break;
189 case AMQP_RESPONSE_LIBRARY_EXCEPTION:
190 #if HAVE_AMQP_RPC_REPLY_T_LIBRARY_ERRNO
191 if (r.library_errno)
192 return (sstrerror (r.library_errno, buffer, buffer_size));
193 #else
194 if (r.library_error)
195 return (sstrerror (r.library_error, buffer, buffer_size));
196 #endif
197 else
198 sstrncpy (buffer, "End of stream", sizeof (buffer));
199 break;
201 case AMQP_RESPONSE_SERVER_EXCEPTION:
202 if (r.reply.id == AMQP_CONNECTION_CLOSE_METHOD)
203 {
204 amqp_connection_close_t *m = r.reply.decoded;
205 char *tmp = camqp_bytes_cstring (&m->reply_text);
206 ssnprintf (buffer, buffer_size, "Server connection error %d: %s",
207 m->reply_code, tmp);
208 sfree (tmp);
209 }
210 else if (r.reply.id == AMQP_CHANNEL_CLOSE_METHOD)
211 {
212 amqp_channel_close_t *m = r.reply.decoded;
213 char *tmp = camqp_bytes_cstring (&m->reply_text);
214 ssnprintf (buffer, buffer_size, "Server channel error %d: %s",
215 m->reply_code, tmp);
216 sfree (tmp);
217 }
218 else
219 {
220 ssnprintf (buffer, buffer_size, "Server error method %#"PRIx32,
221 r.reply.id);
222 }
223 break;
225 default:
226 ssnprintf (buffer, buffer_size, "Unknown reply type %i",
227 (int) r.reply_type);
228 }
230 return (buffer);
231 } /* }}} char *camqp_strerror */
233 #if HAVE_AMQP_RPC_REPLY_T_LIBRARY_ERRNO
234 static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */
235 {
236 amqp_exchange_declare_ok_t *ed_ret;
238 if (conf->exchange_type == NULL)
239 return (0);
241 ed_ret = amqp_exchange_declare (conf->connection,
242 /* channel = */ CAMQP_CHANNEL,
243 /* exchange = */ amqp_cstring_bytes (conf->exchange),
244 /* type = */ amqp_cstring_bytes (conf->exchange_type),
245 /* passive = */ 0,
246 /* durable = */ 0,
247 /* auto_delete = */ 1,
248 /* arguments = */ AMQP_EMPTY_TABLE);
249 if ((ed_ret == NULL) && camqp_is_error (conf))
250 {
251 char errbuf[1024];
252 ERROR ("amqp plugin: amqp_exchange_declare failed: %s",
253 camqp_strerror (conf, errbuf, sizeof (errbuf)));
254 camqp_close_connection (conf);
255 return (-1);
256 }
258 INFO ("amqp plugin: Successfully created exchange \"%s\" "
259 "with type \"%s\".",
260 conf->exchange, conf->exchange_type);
262 return (0);
263 } /* }}} int camqp_create_exchange */
264 #else
265 static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */
266 {
267 amqp_exchange_declare_ok_t *ed_ret;
268 amqp_table_t argument_table;
269 struct amqp_table_entry_t_ argument_table_entries[1];
271 if (conf->exchange_type == NULL)
272 return (0);
274 /* Valid arguments: "auto_delete", "internal" */
275 argument_table.num_entries = STATIC_ARRAY_SIZE (argument_table_entries);
276 argument_table.entries = argument_table_entries;
277 argument_table_entries[0].key = amqp_cstring_bytes ("auto_delete");
278 argument_table_entries[0].value.kind = AMQP_FIELD_KIND_BOOLEAN;
279 argument_table_entries[0].value.value.boolean = 1;
281 ed_ret = amqp_exchange_declare (conf->connection,
282 /* channel = */ CAMQP_CHANNEL,
283 /* exchange = */ amqp_cstring_bytes (conf->exchange),
284 /* type = */ amqp_cstring_bytes (conf->exchange_type),
285 /* passive = */ 0,
286 /* durable = */ 0,
287 /* arguments = */ argument_table);
288 if ((ed_ret == NULL) && camqp_is_error (conf))
289 {
290 char errbuf[1024];
291 ERROR ("amqp plugin: amqp_exchange_declare failed: %s",
292 camqp_strerror (conf, errbuf, sizeof (errbuf)));
293 camqp_close_connection (conf);
294 return (-1);
295 }
297 INFO ("amqp plugin: Successfully created exchange \"%s\" "
298 "with type \"%s\".",
299 conf->exchange, conf->exchange_type);
301 return (0);
302 } /* }}} int camqp_create_exchange */
303 #endif
305 static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
306 {
307 amqp_queue_declare_ok_t *qd_ret;
308 amqp_basic_consume_ok_t *cm_ret;
310 qd_ret = amqp_queue_declare (conf->connection,
311 /* channel = */ CAMQP_CHANNEL,
312 /* queue = */ (conf->queue != NULL)
313 ? amqp_cstring_bytes (conf->queue)
314 : AMQP_EMPTY_BYTES,
315 /* passive = */ 0,
316 /* durable = */ 0,
317 /* exclusive = */ 0,
318 /* auto_delete = */ 1,
319 /* arguments = */ AMQP_EMPTY_TABLE);
320 if (qd_ret == NULL)
321 {
322 ERROR ("amqp plugin: amqp_queue_declare failed.");
323 camqp_close_connection (conf);
324 return (-1);
325 }
327 if (conf->queue == NULL)
328 {
329 conf->queue = camqp_bytes_cstring (&qd_ret->queue);
330 if (conf->queue == NULL)
331 {
332 ERROR ("amqp plugin: camqp_bytes_cstring failed.");
333 camqp_close_connection (conf);
334 return (-1);
335 }
337 INFO ("amqp plugin: Created queue \"%s\".", conf->queue);
338 }
339 DEBUG ("amqp plugin: Successfully created queue \"%s\".", conf->queue);
341 /* bind to an exchange */
342 if (conf->exchange != NULL)
343 {
344 amqp_queue_bind_ok_t *qb_ret;
346 assert (conf->queue != NULL);
347 qb_ret = amqp_queue_bind (conf->connection,
348 /* channel = */ CAMQP_CHANNEL,
349 /* queue = */ amqp_cstring_bytes (conf->queue),
350 /* exchange = */ amqp_cstring_bytes (conf->exchange),
351 /* routing_key = */ (conf->routing_key != NULL)
352 ? amqp_cstring_bytes (conf->routing_key)
353 : AMQP_EMPTY_BYTES,
354 /* arguments = */ AMQP_EMPTY_TABLE);
355 if ((qb_ret == NULL) && camqp_is_error (conf))
356 {
357 char errbuf[1024];
358 ERROR ("amqp plugin: amqp_queue_bind failed: %s",
359 camqp_strerror (conf, errbuf, sizeof (errbuf)));
360 camqp_close_connection (conf);
361 return (-1);
362 }
364 DEBUG ("amqp plugin: Successfully bound queue \"%s\" to exchange \"%s\".",
365 conf->queue, conf->exchange);
366 } /* if (conf->exchange != NULL) */
368 cm_ret = amqp_basic_consume (conf->connection,
369 /* channel = */ CAMQP_CHANNEL,
370 /* queue = */ amqp_cstring_bytes (conf->queue),
371 /* consumer_tag = */ AMQP_EMPTY_BYTES,
372 /* no_local = */ 0,
373 /* no_ack = */ 1,
374 /* exclusive = */ 0,
375 /* arguments = */ AMQP_EMPTY_TABLE
376 );
377 if ((cm_ret == NULL) && camqp_is_error (conf))
378 {
379 char errbuf[1024];
380 ERROR ("amqp plugin: amqp_basic_consume failed: %s",
381 camqp_strerror (conf, errbuf, sizeof (errbuf)));
382 camqp_close_connection (conf);
383 return (-1);
384 }
386 return (0);
387 } /* }}} int camqp_setup_queue */
389 static int camqp_connect (camqp_config_t *conf) /* {{{ */
390 {
391 amqp_rpc_reply_t reply;
392 int sockfd;
393 int status;
395 if (conf->connection != NULL)
396 return (0);
398 conf->connection = amqp_new_connection ();
399 if (conf->connection == NULL)
400 {
401 ERROR ("amqp plugin: amqp_new_connection failed.");
402 return (ENOMEM);
403 }
405 sockfd = amqp_open_socket (CONF(conf, host), conf->port);
406 if (sockfd < 0)
407 {
408 char errbuf[1024];
409 status = (-1) * sockfd;
410 ERROR ("amqp plugin: amqp_open_socket failed: %s",
411 sstrerror (status, errbuf, sizeof (errbuf)));
412 amqp_destroy_connection (conf->connection);
413 conf->connection = NULL;
414 return (status);
415 }
416 amqp_set_sockfd (conf->connection, sockfd);
418 reply = amqp_login (conf->connection, CONF(conf, vhost),
419 /* channel max = */ 0,
420 /* frame max = */ 131072,
421 /* heartbeat = */ 0,
422 /* authentication = */ AMQP_SASL_METHOD_PLAIN,
423 CONF(conf, user), CONF(conf, password));
424 if (reply.reply_type != AMQP_RESPONSE_NORMAL)
425 {
426 ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
427 CONF(conf, vhost), CONF(conf, user));
428 amqp_destroy_connection (conf->connection);
429 close (sockfd);
430 conf->connection = NULL;
431 return (1);
432 }
434 amqp_channel_open (conf->connection, /* channel = */ 1);
435 /* FIXME: Is checking "reply.reply_type" really correct here? How does
436 * it get set? --octo */
437 if (reply.reply_type != AMQP_RESPONSE_NORMAL)
438 {
439 ERROR ("amqp plugin: amqp_channel_open failed.");
440 amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
441 amqp_destroy_connection (conf->connection);
442 close(sockfd);
443 conf->connection = NULL;
444 return (1);
445 }
447 INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
448 "on %s:%i.", CONF(conf, vhost), CONF(conf, host), conf->port);
450 status = camqp_create_exchange (conf);
451 if (status != 0)
452 return (status);
454 if (!conf->publish)
455 return (camqp_setup_queue (conf));
456 return (0);
457 } /* }}} int camqp_connect */
459 static int camqp_shutdown (void) /* {{{ */
460 {
461 size_t i;
463 DEBUG ("amqp plugin: Shutting down %zu subscriber threads.",
464 subscriber_threads_num);
466 subscriber_threads_running = 0;
467 for (i = 0; i < subscriber_threads_num; i++)
468 {
469 /* FIXME: Sending a signal is not very elegant here. Maybe find out how
470 * to use a timeout in the thread and check for the variable in regular
471 * intervals. */
472 pthread_kill (subscriber_threads[i], SIGTERM);
473 pthread_join (subscriber_threads[i], /* retval = */ NULL);
474 }
476 subscriber_threads_num = 0;
477 sfree (subscriber_threads);
479 DEBUG ("amqp plugin: All subscriber threads exited.");
481 return (0);
482 } /* }}} int camqp_shutdown */
484 /*
485 * Subscribing code
486 */
487 static int camqp_read_body (camqp_config_t *conf, /* {{{ */
488 size_t body_size, const char *content_type)
489 {
490 char body[body_size + 1];
491 char *body_ptr;
492 size_t received;
493 amqp_frame_t frame;
494 int status;
496 memset (body, 0, sizeof (body));
497 body_ptr = &body[0];
498 received = 0;
500 while (received < body_size)
501 {
502 status = amqp_simple_wait_frame (conf->connection, &frame);
503 if (status < 0)
504 {
505 char errbuf[1024];
506 status = (-1) * status;
507 ERROR ("amqp plugin: amqp_simple_wait_frame failed: %s",
508 sstrerror (status, errbuf, sizeof (errbuf)));
509 camqp_close_connection (conf);
510 return (status);
511 }
513 if (frame.frame_type != AMQP_FRAME_BODY)
514 {
515 NOTICE ("amqp plugin: Unexpected frame type: %#"PRIx8,
516 frame.frame_type);
517 return (-1);
518 }
520 if ((body_size - received) < frame.payload.body_fragment.len)
521 {
522 WARNING ("amqp plugin: Body is larger than indicated by header.");
523 return (-1);
524 }
526 memcpy (body_ptr, frame.payload.body_fragment.bytes,
527 frame.payload.body_fragment.len);
528 body_ptr += frame.payload.body_fragment.len;
529 received += frame.payload.body_fragment.len;
530 } /* while (received < body_size) */
532 if (strcasecmp ("text/collectd", content_type) == 0)
533 {
534 status = handle_putval (stderr, body);
535 if (status != 0)
536 ERROR ("amqp plugin: handle_putval failed with status %i.",
537 status);
538 return (status);
539 }
540 else if (strcasecmp ("application/json", content_type) == 0)
541 {
542 ERROR ("amqp plugin: camqp_read_body: Parsing JSON data has not "
543 "been implemented yet. FIXME!");
544 return (0);
545 }
546 else
547 {
548 ERROR ("amqp plugin: camqp_read_body: Unknown content type \"%s\".",
549 content_type);
550 return (EINVAL);
551 }
553 /* not reached */
554 return (0);
555 } /* }}} int camqp_read_body */
557 static int camqp_read_header (camqp_config_t *conf) /* {{{ */
558 {
559 int status;
560 amqp_frame_t frame;
561 amqp_basic_properties_t *properties;
562 char *content_type;
564 status = amqp_simple_wait_frame (conf->connection, &frame);
565 if (status < 0)
566 {
567 char errbuf[1024];
568 status = (-1) * status;
569 ERROR ("amqp plugin: amqp_simple_wait_frame failed: %s",
570 sstrerror (status, errbuf, sizeof (errbuf)));
571 camqp_close_connection (conf);
572 return (status);
573 }
575 if (frame.frame_type != AMQP_FRAME_HEADER)
576 {
577 NOTICE ("amqp plugin: Unexpected frame type: %#"PRIx8,
578 frame.frame_type);
579 return (-1);
580 }
582 properties = frame.payload.properties.decoded;
583 content_type = camqp_bytes_cstring (&properties->content_type);
584 if (content_type == NULL)
585 {
586 ERROR ("amqp plugin: Unable to determine content type.");
587 return (-1);
588 }
590 status = camqp_read_body (conf,
591 (size_t) frame.payload.properties.body_size,
592 content_type);
594 sfree (content_type);
595 return (status);
596 } /* }}} int camqp_read_header */
598 static void *camqp_subscribe_thread (void *user_data) /* {{{ */
599 {
600 camqp_config_t *conf = user_data;
601 int status;
603 cdtime_t interval = plugin_get_interval ();
605 while (subscriber_threads_running)
606 {
607 amqp_frame_t frame;
609 status = camqp_connect (conf);
610 if (status != 0)
611 {
612 struct timespec ts_interval;
613 ERROR ("amqp plugin: camqp_connect failed. "
614 "Will sleep for %.3f seconds.",
615 CDTIME_T_TO_DOUBLE (interval));
616 CDTIME_T_TO_TIMESPEC (interval, &ts_interval);
617 nanosleep (&ts_interval, /* remaining = */ NULL);
618 continue;
619 }
621 status = amqp_simple_wait_frame (conf->connection, &frame);
622 if (status < 0)
623 {
624 struct timespec ts_interval;
625 ERROR ("amqp plugin: amqp_simple_wait_frame failed. "
626 "Will sleep for %.3f seconds.",
627 CDTIME_T_TO_DOUBLE (interval));
628 camqp_close_connection (conf);
629 CDTIME_T_TO_TIMESPEC (interval, &ts_interval);
630 nanosleep (&ts_interval, /* remaining = */ NULL);
631 continue;
632 }
634 if (frame.frame_type != AMQP_FRAME_METHOD)
635 {
636 DEBUG ("amqp plugin: Unexpected frame type: %#"PRIx8,
637 frame.frame_type);
638 continue;
639 }
641 if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
642 {
643 DEBUG ("amqp plugin: Unexpected method id: %#"PRIx32,
644 frame.payload.method.id);
645 continue;
646 }
648 status = camqp_read_header (conf);
650 amqp_maybe_release_buffers (conf->connection);
651 } /* while (subscriber_threads_running) */
653 camqp_config_free (conf);
654 pthread_exit (NULL);
655 return (NULL);
656 } /* }}} void *camqp_subscribe_thread */
658 static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */
659 {
660 int status;
661 pthread_t *tmp;
663 tmp = realloc (subscriber_threads,
664 sizeof (*subscriber_threads) * (subscriber_threads_num + 1));
665 if (tmp == NULL)
666 {
667 ERROR ("amqp plugin: realloc failed.");
668 camqp_config_free (conf);
669 return (ENOMEM);
670 }
671 subscriber_threads = tmp;
672 tmp = subscriber_threads + subscriber_threads_num;
673 memset (tmp, 0, sizeof (*tmp));
675 status = plugin_thread_create (tmp, /* attr = */ NULL,
676 camqp_subscribe_thread, conf);
677 if (status != 0)
678 {
679 char errbuf[1024];
680 ERROR ("amqp plugin: pthread_create failed: %s",
681 sstrerror (status, errbuf, sizeof (errbuf)));
682 camqp_config_free (conf);
683 return (status);
684 }
686 subscriber_threads_num++;
688 return (0);
689 } /* }}} int camqp_subscribe_init */
691 /*
692 * Publishing code
693 */
694 /* XXX: You must hold "conf->lock" when calling this function! */
695 static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
696 const char *buffer, const char *routing_key)
697 {
698 amqp_basic_properties_t props;
699 int status;
701 status = camqp_connect (conf);
702 if (status != 0)
703 return (status);
705 memset (&props, 0, sizeof (props));
706 props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG
707 | AMQP_BASIC_DELIVERY_MODE_FLAG
708 | AMQP_BASIC_APP_ID_FLAG;
709 if (conf->format == CAMQP_FORMAT_COMMAND)
710 props.content_type = amqp_cstring_bytes("text/collectd");
711 else if (conf->format == CAMQP_FORMAT_JSON)
712 props.content_type = amqp_cstring_bytes("application/json");
713 else if (conf->format == CAMQP_FORMAT_GRAPHITE)
714 props.content_type = amqp_cstring_bytes("text/graphite");
715 else
716 assert (23 == 42);
717 props.delivery_mode = conf->delivery_mode;
718 props.app_id = amqp_cstring_bytes("collectd");
720 status = amqp_basic_publish(conf->connection,
721 /* channel = */ 1,
722 amqp_cstring_bytes(CONF(conf, exchange)),
723 amqp_cstring_bytes (routing_key),
724 /* mandatory = */ 0,
725 /* immediate = */ 0,
726 &props,
727 amqp_cstring_bytes(buffer));
728 if (status != 0)
729 {
730 ERROR ("amqp plugin: amqp_basic_publish failed with status %i.",
731 status);
732 camqp_close_connection (conf);
733 }
735 return (status);
736 } /* }}} int camqp_write_locked */
738 static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
739 user_data_t *user_data)
740 {
741 camqp_config_t *conf = user_data->data;
742 char routing_key[6 * DATA_MAX_NAME_LEN];
743 char buffer[4096];
744 int status;
746 if ((ds == NULL) || (vl == NULL) || (conf == NULL))
747 return (EINVAL);
749 memset (buffer, 0, sizeof (buffer));
751 if (conf->routing_key != NULL)
752 {
753 sstrncpy (routing_key, conf->routing_key, sizeof (routing_key));
754 }
755 else
756 {
757 size_t i;
758 ssnprintf (routing_key, sizeof (routing_key), "collectd/%s/%s/%s/%s/%s",
759 vl->host,
760 vl->plugin, vl->plugin_instance,
761 vl->type, vl->type_instance);
763 /* Switch slashes (the only character forbidden by collectd) and dots
764 * (the separation character used by AMQP). */
765 for (i = 0; routing_key[i] != 0; i++)
766 {
767 if (routing_key[i] == '.')
768 routing_key[i] = '/';
769 else if (routing_key[i] == '/')
770 routing_key[i] = '.';
771 }
772 }
774 if (conf->format == CAMQP_FORMAT_COMMAND)
775 {
776 status = create_putval (buffer, sizeof (buffer), ds, vl);
777 if (status != 0)
778 {
779 ERROR ("amqp plugin: create_putval failed with status %i.",
780 status);
781 return (status);
782 }
783 }
784 else if (conf->format == CAMQP_FORMAT_JSON)
785 {
786 size_t bfree = sizeof (buffer);
787 size_t bfill = 0;
789 format_json_initialize (buffer, &bfill, &bfree);
790 format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates);
791 format_json_finalize (buffer, &bfill, &bfree);
792 }
793 else if (conf->format == CAMQP_FORMAT_GRAPHITE)
794 {
795 status = format_graphite (buffer, sizeof (buffer), ds, vl,
796 conf->prefix, conf->postfix, conf->escape_char,
797 conf->store_rates);
798 if (status != 0)
799 {
800 ERROR ("amqp plugin: format_graphite failed with status %i.",
801 status);
802 return (status);
803 }
804 }
805 else
806 {
807 ERROR ("amqp plugin: Invalid format (%i).", conf->format);
808 return (-1);
809 }
811 pthread_mutex_lock (&conf->lock);
812 status = camqp_write_locked (conf, buffer, routing_key);
813 pthread_mutex_unlock (&conf->lock);
815 return (status);
816 } /* }}} int camqp_write */
818 /*
819 * Config handling
820 */
821 static int camqp_config_set_format (oconfig_item_t *ci, /* {{{ */
822 camqp_config_t *conf)
823 {
824 char *string;
825 int status;
827 string = NULL;
828 status = cf_util_get_string (ci, &string);
829 if (status != 0)
830 return (status);
832 assert (string != NULL);
833 if (strcasecmp ("Command", string) == 0)
834 conf->format = CAMQP_FORMAT_COMMAND;
835 else if (strcasecmp ("JSON", string) == 0)
836 conf->format = CAMQP_FORMAT_JSON;
837 else if (strcasecmp ("Graphite", string) == 0)
838 conf->format = CAMQP_FORMAT_GRAPHITE;
839 else
840 {
841 WARNING ("amqp plugin: Invalid format string: %s",
842 string);
843 }
845 free (string);
847 return (0);
848 } /* }}} int config_set_string */
850 static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
851 _Bool publish)
852 {
853 camqp_config_t *conf;
854 int status;
855 int i;
857 conf = malloc (sizeof (*conf));
858 if (conf == NULL)
859 {
860 ERROR ("amqp plugin: malloc failed.");
861 return (ENOMEM);
862 }
864 /* Initialize "conf" {{{ */
865 memset (conf, 0, sizeof (*conf));
866 conf->publish = publish;
867 conf->name = NULL;
868 conf->format = CAMQP_FORMAT_COMMAND;
869 conf->host = NULL;
870 conf->port = 5672;
871 conf->vhost = NULL;
872 conf->user = NULL;
873 conf->password = NULL;
874 conf->exchange = NULL;
875 conf->routing_key = NULL;
876 /* publish only */
877 conf->delivery_mode = CAMQP_DM_VOLATILE;
878 conf->store_rates = 0;
879 /* publish & graphite only */
880 conf->prefix = NULL;
881 conf->postfix = NULL;
882 conf->escape_char = '_';
883 /* subscribe only */
884 conf->exchange_type = NULL;
885 conf->queue = NULL;
886 /* general */
887 conf->connection = NULL;
888 pthread_mutex_init (&conf->lock, /* attr = */ NULL);
889 /* }}} */
891 status = cf_util_get_string (ci, &conf->name);
892 if (status != 0)
893 {
894 sfree (conf);
895 return (status);
896 }
898 for (i = 0; i < ci->children_num; i++)
899 {
900 oconfig_item_t *child = ci->children + i;
902 if (strcasecmp ("Host", child->key) == 0)
903 status = cf_util_get_string (child, &conf->host);
904 else if (strcasecmp ("Port", child->key) == 0)
905 {
906 status = cf_util_get_port_number (child);
907 if (status > 0)
908 {
909 conf->port = status;
910 status = 0;
911 }
912 }
913 else if (strcasecmp ("VHost", child->key) == 0)
914 status = cf_util_get_string (child, &conf->vhost);
915 else if (strcasecmp ("User", child->key) == 0)
916 status = cf_util_get_string (child, &conf->user);
917 else if (strcasecmp ("Password", child->key) == 0)
918 status = cf_util_get_string (child, &conf->password);
919 else if (strcasecmp ("Exchange", child->key) == 0)
920 status = cf_util_get_string (child, &conf->exchange);
921 else if ((strcasecmp ("ExchangeType", child->key) == 0) && !publish)
922 status = cf_util_get_string (child, &conf->exchange_type);
923 else if ((strcasecmp ("Queue", child->key) == 0) && !publish)
924 status = cf_util_get_string (child, &conf->queue);
925 else if (strcasecmp ("RoutingKey", child->key) == 0)
926 status = cf_util_get_string (child, &conf->routing_key);
927 else if ((strcasecmp ("Persistent", child->key) == 0) && publish)
928 {
929 _Bool tmp = 0;
930 status = cf_util_get_boolean (child, &tmp);
931 if (tmp)
932 conf->delivery_mode = CAMQP_DM_PERSISTENT;
933 else
934 conf->delivery_mode = CAMQP_DM_VOLATILE;
935 }
936 else if ((strcasecmp ("StoreRates", child->key) == 0) && publish)
937 status = cf_util_get_boolean (child, &conf->store_rates);
938 else if ((strcasecmp ("Format", child->key) == 0) && publish)
939 status = camqp_config_set_format (child, conf);
940 else if ((strcasecmp ("GraphitePrefix", child->key) == 0) && publish)
941 status = cf_util_get_string (child, &conf->prefix);
942 else if ((strcasecmp ("GraphitePostfix", child->key) == 0) && publish)
943 status = cf_util_get_string (child, &conf->postfix);
944 else if ((strcasecmp ("GraphiteEscapeChar", child->key) == 0) && publish)
945 {
946 char *tmp_buff = NULL;
947 status = cf_util_get_string (child, &tmp_buff);
948 if (strlen (tmp_buff) > 1)
949 WARNING ("amqp plugin: The option \"GraphiteEscapeChar\" handles "
950 "only one character. Others will be ignored.");
951 conf->escape_char = tmp_buff[0];
952 sfree (tmp_buff);
953 }
954 else
955 WARNING ("amqp plugin: Ignoring unknown "
956 "configuration option \"%s\".", child->key);
958 if (status != 0)
959 break;
960 } /* for (i = 0; i < ci->children_num; i++) */
962 if ((status == 0) && (conf->exchange == NULL))
963 {
964 if (conf->exchange_type != NULL)
965 WARNING ("amqp plugin: The option \"ExchangeType\" was given "
966 "without the \"Exchange\" option. It will be ignored.");
968 if (!publish && (conf->routing_key != NULL))
969 WARNING ("amqp plugin: The option \"RoutingKey\" was given "
970 "without the \"Exchange\" option. It will be ignored.");
972 }
974 if (status != 0)
975 {
976 camqp_config_free (conf);
977 return (status);
978 }
980 if (conf->exchange != NULL)
981 {
982 DEBUG ("amqp plugin: camqp_config_connection: exchange = %s;",
983 conf->exchange);
984 }
986 if (publish)
987 {
988 char cbname[128];
989 user_data_t ud = { conf, camqp_config_free };
991 ssnprintf (cbname, sizeof (cbname), "amqp/%s", conf->name);
993 status = plugin_register_write (cbname, camqp_write, &ud);
994 if (status != 0)
995 {
996 camqp_config_free (conf);
997 return (status);
998 }
999 }
1000 else
1001 {
1002 status = camqp_subscribe_init (conf);
1003 if (status != 0)
1004 {
1005 camqp_config_free (conf);
1006 return (status);
1007 }
1008 }
1010 return (0);
1011 } /* }}} int camqp_config_connection */
1013 static int camqp_config (oconfig_item_t *ci) /* {{{ */
1014 {
1015 int i;
1017 for (i = 0; i < ci->children_num; i++)
1018 {
1019 oconfig_item_t *child = ci->children + i;
1021 if (strcasecmp ("Publish", child->key) == 0)
1022 camqp_config_connection (child, /* publish = */ 1);
1023 else if (strcasecmp ("Subscribe", child->key) == 0)
1024 camqp_config_connection (child, /* publish = */ 0);
1025 else
1026 WARNING ("amqp plugin: Ignoring unknown config option \"%s\".",
1027 child->key);
1028 } /* for (ci->children_num) */
1030 return (0);
1031 } /* }}} int camqp_config */
1033 void module_register (void)
1034 {
1035 plugin_register_complex_config ("amqp", camqp_config);
1036 plugin_register_shutdown ("amqp", camqp_shutdown);
1037 } /* void module_register */
1039 /* vim: set sw=4 sts=4 et fdm=marker : */