summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: 91ca6eb)
raw | patch | inline | side by side (parent: 91ca6eb)
author | Florian Forster <octo@leeloo.lan.home.verplant.org> | |
Fri, 6 Aug 2010 13:01:40 +0000 (15:01 +0200) | ||
committer | Florian Forster <octo@leeloo.lan.home.verplant.org> | |
Fri, 6 Aug 2010 13:03:08 +0000 (15:03 +0200) |
By default, the publishing code now creates "command" (i.e. PUTVAL) output.
For now this is easier to parse, so use this for the subscribing code.
(For now, anyways. I guess JSON will come later, too.)
For now this is easier to parse, so use this for the subscribing code.
(For now, anyways. I guess JSON will come later, too.)
src/Makefile.am | patch | blob | history | |
src/amqp.c | patch | blob | history |
diff --git a/src/Makefile.am b/src/Makefile.am
index 222d91685c8314d464012857470c4ef0d9c45d41..d4d6a1badb799c0c7ae662666ff7bba705b99d25 100644 (file)
--- a/src/Makefile.am
+++ b/src/Makefile.am
if BUILD_PLUGIN_AMQP
pkglib_LTLIBRARIES += amqp.la
-amqp_la_SOURCES = amqp.c utils_format_json.c utils_format_json.h
+amqp_la_SOURCES = amqp.c \
+ utils_cmd_putval.c utils_cmd_putval.h \
+ utils_format_json.c utils_format_json.h
amqp_la_LDFLAGS = -module -avoid-version $(BUILD_WITH_LIBRABBITMQ_LDFLAGS)
amqp_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBRABBITMQ_CPPFLAGS)
amqp_la_LIBADD = $(BUILD_WITH_LIBRABBITMQ_LIBS)
diff --git a/src/amqp.c b/src/amqp.c
index eb54795069f1bcc17f39d2571981d50137f49e99..84bcc066efcf41a93a14e11d82640e62c0dc6e14 100644 (file)
--- a/src/amqp.c
+++ b/src/amqp.c
#include "collectd.h"
#include "common.h"
#include "plugin.h"
+#include "utils_cmd_putval.h"
#include "utils_format_json.h"
#include <amqp.h>
#define CAMQP_DM_VOLATILE 1
#define CAMQP_DM_PERSISTENT 2
+#define CAMQP_FORMAT_COMMAND 1
+#define CAMQP_FORMAT_JSON 2
+
#define CAMQP_CHANNEL 1
/*
{
_Bool publish;
char *name;
+ int format;
char *host;
int port;
return (0);
} /* }}} int camqp_connect */
+static int shutdown (void) /* {{{ */
+{
+ size_t i;
+
+ DEBUG ("amqp plugin: Shutting down %zu subscriber threads.",
+ subscriber_threads_num);
+
+ subscriber_threads_running = 0;
+ for (i = 0; i < subscriber_threads_num; i++)
+ {
+ /* FIXME: Sending a signal is not very elegant here. Maybe find out how
+ * to use a timeout in the thread and check for the variable in regular
+ * intervals. */
+ pthread_kill (subscriber_threads[i], SIGTERM);
+ pthread_join (subscriber_threads[i], /* retval = */ NULL);
+ }
+
+ subscriber_threads_num = 0;
+ sfree (subscriber_threads);
+
+ DEBUG ("amqp plugin: All subscriber threads exited.");
+
+ return (0);
+} /* }}} int shutdown */
+
+/*
+ * Subscribing code
+ */
static int camqp_read_body (camqp_config_t *conf, /* {{{ */
size_t body_size)
{
return (0);
} /* }}} int camqp_subscribe_init */
+/*
+ * Publishing code
+ */
static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
const char *buffer)
{
@@ -604,9 +640,27 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
bfree = sizeof (buffer);
bfill = 0;
- format_json_initialize (buffer, &bfill, &bfree);
- format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates);
- format_json_finalize (buffer, &bfill, &bfree);
+ if (conf->format == CAMQP_FORMAT_COMMAND)
+ {
+ status = create_putval (buffer, sizeof (buffer), ds, vl);
+ if (status != 0)
+ {
+ ERROR ("amqp plugin: create_putval failed with status %i.",
+ status);
+ return (status);
+ }
+ }
+ else if (conf->format == CAMQP_FORMAT_JSON)
+ {
+ format_json_initialize (buffer, &bfill, &bfree);
+ format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates);
+ format_json_finalize (buffer, &bfill, &bfree);
+ }
+ else
+ {
+ ERROR ("amqp plugin: Invalid format (%i).", conf->format);
+ return (-1);
+ }
pthread_mutex_lock (&conf->lock);
status = camqp_write_locked (conf, buffer);
@@ -615,6 +669,36 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
return (status);
} /* }}} int camqp_write */
+/*
+ * Config handling
+ */
+static int camqp_config_set_format (oconfig_item_t *ci, /* {{{ */
+ camqp_config_t *conf)
+{
+ char *string;
+ int status;
+
+ string = NULL;
+ status = cf_util_get_string (ci, &string);
+ if (status != 0)
+ return (status);
+
+ assert (string != NULL);
+ if (strcasecmp ("Command", string) == 0)
+ conf->format = CAMQP_FORMAT_COMMAND;
+ else if (strcasecmp ("JSON", string) == 0)
+ conf->format = CAMQP_FORMAT_JSON;
+ else
+ {
+ WARNING ("amqp plugin: Invalid format string: %s",
+ string);
+ }
+
+ free (string);
+
+ return (0);
+} /* }}} int config_set_string */
+
static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
_Bool publish)
{
memset (conf, 0, sizeof (*conf));
conf->publish = publish;
conf->name = NULL;
+ conf->format = CAMQP_FORMAT_COMMAND;
conf->host = NULL;
conf->port = 5672;
conf->vhost = NULL;
{
oconfig_item_t *child = ci->children + i;
- if (strcasecmp ("Host", child->key) == 0)
+ if (strcasecmp ("Format", child->key) == 0)
+ status = camqp_config_set_format (child, conf);
+ else if (strcasecmp ("Host", child->key) == 0)
status = cf_util_get_string (child, &conf->host);
else if (strcasecmp ("Port", child->key) == 0)
{
return (0);
} /* }}} int camqp_config */
-static int shutdown (void) /* {{{ */
-{
- size_t i;
-
- DEBUG ("amqp plugin: Shutting down %zu subscriber threads.",
- subscriber_threads_num);
-
- subscriber_threads_running = 0;
- for (i = 0; i < subscriber_threads_num; i++)
- {
- /* FIXME: Sending a signal is not very elegant here. Maybe find out how
- * to use a timeout in the thread and check for the variable in regular
- * intervals. */
- pthread_kill (subscriber_threads[i], SIGTERM);
- pthread_join (subscriber_threads[i], /* retval = */ NULL);
- }
-
- subscriber_threads_num = 0;
- sfree (subscriber_threads);
-
- DEBUG ("amqp plugin: All subscriber threads exited.");
-
- return (0);
-} /* }}} int shutdown */
-
void module_register (void)
{
plugin_register_complex_config ("amqp", camqp_config);