From 111f69addbf605ff94a4e3bf6c63163e8736c9a4 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Thu, 5 Aug 2010 15:01:03 +0200 Subject: [PATCH] amqp plugin: First step towards subscribing to data via AMQP. --- src/amqp.c | 415 +++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 404 insertions(+), 11 deletions(-) diff --git a/src/amqp.c b/src/amqp.c index eccdaff1..1924ce73 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -84,6 +84,10 @@ static const char *def_password = "guest"; static const char *def_exchange = "amq.fanout"; static const char *def_routingkey = "collectd"; +static pthread_t *subscriber_threads = NULL; +static size_t subscriber_threads_num = 0; +static _Bool subscriber_threads_running = 1; + #define CONF(c,f) (((c)->f != NULL) ? (c)->f : def_##f) /* @@ -126,6 +130,196 @@ static void camqp_config_free (void *ptr) /* {{{ */ sfree (conf); } /* }}} void camqp_config_free */ +static char *camqp_bytes_cstring (amqp_bytes_t *in) /* {{{ */ +{ + char *ret; + + if ((in == NULL) || (in->bytes == NULL)) + return (NULL); + + ret = malloc (in->len + 1); + if (ret == NULL) + return (NULL); + + memcpy (ret, in->bytes, in->len); + ret[in->len] = 0; + + return (ret); +} /* }}} char *camqp_bytes_cstring */ + +static _Bool camqp_is_error (camqp_config_t *conf) /* {{{ */ +{ + amqp_rpc_reply_t r; + + r = amqp_get_rpc_reply (conf->connection); + if (r.reply_type == AMQP_RESPONSE_NORMAL) + return (0); + + return (1); +} /* }}} _Bool camqp_is_error */ + +static char *camqp_strerror (camqp_config_t *conf, /* {{{ */ + char *buffer, size_t buffer_size) +{ + amqp_rpc_reply_t r; + + r = amqp_get_rpc_reply (conf->connection); + switch (r.reply_type) + { + case AMQP_RESPONSE_NORMAL: + sstrncpy (buffer, "Success", sizeof (buffer)); + break; + + case AMQP_RESPONSE_NONE: + sstrncpy (buffer, "Missing RPC reply type", sizeof (buffer)); + break; + + case AMQP_RESPONSE_LIBRARY_EXCEPTION: + if (r.library_errno) + return (sstrerror (r.library_errno, buffer, buffer_size)); + else + sstrncpy (buffer, "End of stream", sizeof (buffer)); + break; + + case AMQP_RESPONSE_SERVER_EXCEPTION: + if (r.reply.id == AMQP_CONNECTION_CLOSE_METHOD) + { + amqp_connection_close_t *m = r.reply.decoded; + char *tmp = camqp_bytes_cstring (&m->reply_text); + ssnprintf (buffer, buffer_size, "Server connection error %d: %s", + m->reply_code, tmp); + sfree (tmp); + } + else if (r.reply.id == AMQP_CHANNEL_CLOSE_METHOD) + { + amqp_channel_close_t *m = r.reply.decoded; + char *tmp = camqp_bytes_cstring (&m->reply_text); + ssnprintf (buffer, buffer_size, "Server channel error %d: %s", + m->reply_code, tmp); + sfree (tmp); + } + else + { + ssnprintf (buffer, buffer_size, "Server error method %#"PRIx32, + r.reply.id); + } + break; + + default: + ssnprintf (buffer, buffer_size, "Unknown reply type %i", + (int) r.reply_type); + } + + return (buffer); +} /* }}} char *camqp_strerror */ + +static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */ +{ + amqp_queue_declare_ok_t *qd_ret; + amqp_basic_consume_ok_t *cm_ret; + + qd_ret = amqp_queue_declare (conf->connection, + /* channel = */ CAMQP_CHANNEL, + /* queue = */ (conf->queue != NULL) + ? amqp_cstring_bytes (conf->queue) + : AMQP_EMPTY_BYTES, + /* passive = */ 0, + /* durable = */ 0, + /* exclusive = */ 0, + /* auto_delete = */ 1, + /* arguments = */ AMQP_EMPTY_TABLE); + if (qd_ret == NULL) + { + ERROR ("amqp plugin: amqp_queue_declare failed."); + camqp_close_connection (conf); + return (-1); + } + + if (conf->queue == NULL) + { + conf->queue = camqp_bytes_cstring (&qd_ret->queue); + if (conf->queue == NULL) + { + ERROR ("amqp plugin: camqp_bytes_cstring failed."); + camqp_close_connection (conf); + return (-1); + } + + INFO ("amqp plugin: Created queue \"%s\".", conf->queue); + } + DEBUG ("amqp plugin: Successfully created queue \"%s\".", conf->queue); + + /* bind to an exchange */ + if (conf->exchange != NULL) + { + amqp_queue_bind_ok_t *qb_ret; + + /* create the exchange */ + if (conf->exchange_type != NULL) + { + amqp_exchange_declare_ok_t *ed_ret; + + ed_ret = amqp_exchange_declare (conf->connection, + /* channel = */ CAMQP_CHANNEL, + /* exchange = */ amqp_cstring_bytes (conf->exchange), + /* type = */ amqp_cstring_bytes (conf->exchange_type), + /* passive = */ 0, + /* durable = */ 0, + /* auto_delete = */ 1, + /* arguments = */ AMQP_EMPTY_TABLE); + if ((ed_ret == NULL) && camqp_is_error (conf)) + { + char errbuf[1024]; + ERROR ("amqp plugin: amqp_exchange_declare failed: %s", + camqp_strerror (conf, errbuf, sizeof (errbuf))); + camqp_close_connection (conf); + return (-1); + } + } + + DEBUG ("amqp plugin: queue = %s; exchange = %s; routing_key = %s;", + conf->queue, conf->exchange, CONF (conf, routingkey)); + + assert (conf->queue != NULL); + qb_ret = amqp_queue_bind (conf->connection, + /* channel = */ CAMQP_CHANNEL, + /* queue = */ amqp_cstring_bytes (conf->queue), + /* exchange = */ amqp_cstring_bytes (conf->exchange), +#if 1 + /* routing_key = */ amqp_cstring_bytes (CONF (conf, routingkey)), +#else + /* routing_key = */ AMQP_EMPTY_BYTES, +#endif + /* arguments = */ AMQP_EMPTY_TABLE); + if ((qb_ret == NULL) && camqp_is_error (conf)) + { + char errbuf[1024]; + ERROR ("amqp plugin: amqp_queue_bind failed: %s", + camqp_strerror (conf, errbuf, sizeof (errbuf))); + camqp_close_connection (conf); + return (-1); + } + } /* if (conf->exchange != NULL) */ + + cm_ret = amqp_basic_consume (conf->connection, + /* channel = */ CAMQP_CHANNEL, + /* queue = */ amqp_cstring_bytes (conf->queue), + /* consumer_tag = */ AMQP_EMPTY_BYTES, + /* no_local = */ 0, + /* no_ack = */ 1, + /* exclusive = */ 0); + if ((cm_ret == NULL) && camqp_is_error (conf)) + { + char errbuf[1024]; + ERROR ("amqp plugin: amqp_basic_consume failed: %s", + camqp_strerror (conf, errbuf, sizeof (errbuf))); + camqp_close_connection (conf); + return (-1); + } + + return (0); +} /* }}} int camqp_setup_queue */ + static int camqp_connect (camqp_config_t *conf) /* {{{ */ { amqp_rpc_reply_t reply; @@ -186,9 +380,172 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" " "on %s:%i.", CONF(conf, vhost), CONF(conf, host), conf->port); + + if (!conf->publish) + return (camqp_setup_queue (conf)); return (0); } /* }}} int camqp_connect */ +static int camqp_read_body (camqp_config_t *conf, /* {{{ */ + size_t body_size) +{ + char body[body_size + 1]; + char *body_ptr; + size_t received; + amqp_frame_t frame; + int status; + + memset (body, 0, sizeof (body)); + body_ptr = &body[0]; + received = 0; + + while (received < body_size) + { + status = amqp_simple_wait_frame (conf->connection, &frame); + if (status < 0) + { + char errbuf[1024]; + status = (-1) * status; + ERROR ("amqp plugin: amqp_simple_wait_frame failed: %s", + sstrerror (status, errbuf, sizeof (errbuf))); + camqp_close_connection (conf); + return (status); + } + + if (frame.frame_type != AMQP_FRAME_BODY) + { + NOTICE ("amqp plugin: Unexpected frame type: %#"PRIx8, + frame.frame_type); + return (-1); + } + + if ((body_size - received) < frame.payload.body_fragment.len) + { + WARNING ("amqp plugin: Body is larger than indicated by header."); + return (-1); + } + + memcpy (body_ptr, frame.payload.body_fragment.bytes, + frame.payload.body_fragment.len); + body_ptr += frame.payload.body_fragment.len; + received += frame.payload.body_fragment.len; + } /* while (received < body_size) */ + + DEBUG ("amqp plugin: camqp_read_body: body = %s", body); + + return (0); +} /* }}} int camqp_read_body */ + +static int camqp_read_header (camqp_config_t *conf) /* {{{ */ +{ + int status; + amqp_frame_t frame; + + status = amqp_simple_wait_frame (conf->connection, &frame); + if (status < 0) + { + char errbuf[1024]; + status = (-1) * status; + ERROR ("amqp plugin: amqp_simple_wait_frame failed: %s", + sstrerror (status, errbuf, sizeof (errbuf))); + camqp_close_connection (conf); + return (status); + } + + if (frame.frame_type != AMQP_FRAME_HEADER) + { + NOTICE ("amqp plugin: Unexpected frame type: %#"PRIx8, + frame.frame_type); + return (-1); + } + + return (camqp_read_body (conf, frame.payload.properties.body_size)); +} /* }}} int camqp_read_header */ + +static void *camqp_subscribe_thread (void *user_data) /* {{{ */ +{ + camqp_config_t *conf = user_data; + int status; + + while (subscriber_threads_running) + { + amqp_frame_t frame; + + status = camqp_connect (conf); + if (status != 0) + { + ERROR ("amqp plugin: camqp_connect failed. " + "Will sleep for %i seconds.", interval_g); + sleep (interval_g); + continue; + } + + status = amqp_simple_wait_frame (conf->connection, &frame); + if (status < 0) + { + ERROR ("amqp plugin: amqp_simple_wait_frame failed. " + "Will sleep for %i seconds.", interval_g); + camqp_close_connection (conf); + sleep (interval_g); + continue; + } + + if (frame.frame_type != AMQP_FRAME_METHOD) + { + DEBUG ("amqp plugin: Unexpected frame type: %#"PRIx8, + frame.frame_type); + continue; + } + + if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) + { + DEBUG ("amqp plugin: Unexpected method id: %#"PRIx32, + frame.payload.method.id); + continue; + } + + status = camqp_read_header (conf); + + amqp_maybe_release_buffers (conf->connection); + } /* while (subscriber_threads_running) */ + + camqp_config_free (conf); + pthread_exit (NULL); +} /* }}} void *camqp_subscribe_thread */ + +static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */ +{ + int status; + pthread_t *tmp; + + tmp = realloc (subscriber_threads, + sizeof (*subscriber_threads) * (subscriber_threads_num + 1)); + if (tmp == NULL) + { + ERROR ("amqp plugin: realloc failed."); + camqp_config_free (conf); + return (ENOMEM); + } + subscriber_threads = tmp; + tmp = subscriber_threads + subscriber_threads_num; + memset (tmp, 0, sizeof (*tmp)); + + status = pthread_create (tmp, /* attr = */ NULL, + camqp_subscribe_thread, conf); + if (status != 0) + { + char errbuf[1024]; + ERROR ("amqp plugin: pthread_create failed: %s", + sstrerror (status, errbuf, sizeof (errbuf))); + camqp_config_free (conf); + return (status); + } + + subscriber_threads_num++; + + return (0); +} /* }}} int camqp_subscribe_init */ + static int camqp_write_locked (camqp_config_t *conf, /* {{{ */ const char *buffer) { @@ -297,7 +654,7 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ oconfig_item_t *child = ci->children + i; if (strcasecmp ("Host", child->key) == 0) - status = cf_util_get_string (ci, &conf->host); + status = cf_util_get_string (child, &conf->host); else if (strcasecmp ("Port", child->key) == 0) { status = cf_util_get_port_number (child); @@ -308,30 +665,30 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ } } else if (strcasecmp ("VHost", child->key) == 0) - status = cf_util_get_string (ci, &conf->vhost); + status = cf_util_get_string (child, &conf->vhost); else if (strcasecmp ("User", child->key) == 0) - status = cf_util_get_string (ci, &conf->user); + status = cf_util_get_string (child, &conf->user); else if (strcasecmp ("Password", child->key) == 0) - status = cf_util_get_string (ci, &conf->password); + status = cf_util_get_string (child, &conf->password); else if (strcasecmp ("Exchange", child->key) == 0) - status = cf_util_get_string (ci, &conf->exchange); + status = cf_util_get_string (child, &conf->exchange); else if ((strcasecmp ("ExchangeType", child->key) == 0) && !publish) - status = cf_util_get_string (ci, &conf->exchange_type); + status = cf_util_get_string (child, &conf->exchange_type); else if ((strcasecmp ("Queue", child->key) == 0) && !publish) - status = cf_util_get_string (ci, &conf->queue); + status = cf_util_get_string (child, &conf->queue); else if (strcasecmp ("RoutingKey", child->key) == 0) - status = cf_util_get_string (ci, &conf->routingkey); + status = cf_util_get_string (child, &conf->routingkey); else if (strcasecmp ("Persistent", child->key) == 0) { _Bool tmp = 0; - status = cf_util_get_boolean (ci, &tmp); + status = cf_util_get_boolean (child, &tmp); if (tmp) conf->delivery_mode = CAMQP_DM_PERSISTENT; else conf->delivery_mode = CAMQP_DM_VOLATILE; } else if (strcasecmp ("StoreRates", child->key) == 0) - status = cf_util_get_boolean (ci, &conf->store_rates); + status = cf_util_get_boolean (child, &conf->store_rates); else WARNING ("amqp plugin: Ignoring unknown " "configuration option \"%s\".", child->key); @@ -357,6 +714,12 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ return (status); } + if (conf->exchange != NULL) + { + DEBUG ("amqp plugin: camqp_config_connection: exchange = %s;", + conf->exchange); + } + if (publish) { char cbname[128]; @@ -371,6 +734,15 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ return (status); } } + else + { + status = camqp_subscribe_init (conf); + if (status != 0) + { + camqp_config_free (conf); + return (status); + } + } return (0); } /* }}} int camqp_config_connection */ @@ -385,6 +757,8 @@ static int camqp_config (oconfig_item_t *ci) /* {{{ */ if (strcasecmp ("Publish", child->key) == 0) camqp_config_connection (child, /* publish = */ 1); + else if (strcasecmp ("Subscribe", child->key) == 0) + camqp_config_connection (child, /* publish = */ 0); else WARNING ("amqp plugin: Ignoring unknown config option \"%s\".", child->key); @@ -395,7 +769,26 @@ static int camqp_config (oconfig_item_t *ci) /* {{{ */ static int shutdown (void) /* {{{ */ { - /* FIXME: Set a global shutdown variable here. */ + size_t i; + + DEBUG ("amqp plugin: Shutting down %zu subscriber threads.", + subscriber_threads_num); + + subscriber_threads_running = 0; + for (i = 0; i < subscriber_threads_num; i++) + { + /* FIXME: Sending a signal is not very elegant here. Maybe find out how + * to use a timeout in the thread and check for the variable in regular + * intervals. */ + pthread_kill (subscriber_threads[i], SIGTERM); + pthread_join (subscriber_threads[i], /* retval = */ NULL); + } + + subscriber_threads_num = 0; + sfree (subscriber_threads); + + DEBUG ("amqp plugin: All subscriber threads exited."); + return (0); } /* }}} int shutdown */ -- 2.30.2