From 0af81ad699d88660e5efd0da377cc439a9335f4c Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Mon, 9 Mar 2009 18:45:06 +0100 Subject: [PATCH] gmond plugin: Add a configuration. --- src/collectd.conf.in | 14 ++ src/collectd.conf.pod | 82 ++++++++++ src/gmond.c | 369 +++++++++++++++++++++++++++++++++--------- 3 files changed, 393 insertions(+), 72 deletions(-) diff --git a/src/collectd.conf.in b/src/collectd.conf.in index 1f56f1fc..ae8913fd 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -244,6 +244,20 @@ FQDNLookup true # # +# +# MCReceiveFrom "239.2.11.71" "8649" +# +# Type "swap" +# TypeInstance "total" +# DataSource "value" +# +# +# Type "swap" +# TypeInstance "free" +# DataSource "value" +# +# + # # Host "127.0.0.1" # Port "7634" diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index d3240c94..11f9b724 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -959,6 +959,88 @@ Controls whether or not to recurse into subdirectories. Enabled by default. =back +=head2 Plugin C + +The I plugin received the multicast traffic sent by B, the +statistics collection daemon of Ganglia. Mappings for the standard "metrics" +are built-in, custom mappings may be added via B blocks, see below. + +Synopsis: + + + MCReceiveFrom "239.2.11.71" "8649" + + Type "swap" + TypeInstance "total" + DataSource "value" + + + Type "swap" + TypeInstance "free" + DataSource "value" + + + +The following metrics are built-in: + +=over 4 + +=item * + +load_one, load_five, load_fifteen + +=item * + +cpu_user, cpu_system, cpu_idle, cpu_nice, cpu_wio + +=item * + +mem_free, mem_shared, mem_buffers, mem_cached, mem_total + +=item * + +bytes_in, bytes_out + +=item * + +pkts_in, pkts_out + +=back + +Available configuration options: + +=over 4 + +=item B I [I] + +Sets sets the multicast group and UDP port to which to subscribe. + +Default: B<239.2.11.71>E/EB<8649> + +=item EB IE + +These blocks add a new metric conversion to the internal table. I, the +string argument to the B block, is the metric name as used by Ganglia. + +=over 4 + +=item B I + +Type to map this metric to. Required. + +=item B I + +Type-instance to use. Optional. + +=item B I + +Data source to map this metric to. If the configured type has exactly one data +source, this is optional. Otherwise the option is required. + +=back + +=back + =head2 Plugin C To get values from B collectd connects to B (127.0.0.1), diff --git a/src/gmond.c b/src/gmond.c index a15099b5..93bbc95d 100644 --- a/src/gmond.c +++ b/src/gmond.c @@ -25,8 +25,6 @@ #include "configfile.h" #include "utils_avltree.h" -#include "network.h" - #if HAVE_PTHREAD_H # include #endif @@ -80,14 +78,20 @@ typedef struct staging_entry_s staging_entry_t; struct metric_map_s { - const char *ganglia_name; - const char *type; - const char *type_instance; - int ds_type; - size_t value_index; + char *ganglia_name; + char *type; + char *type_instance; + char *ds_name; + int ds_type; + int ds_index; }; typedef struct metric_map_s metric_map_t; +#define MC_RECEIVE_GROUP_DEFAULT "239.2.11.71" +static char *mc_receive_group = NULL; +#define MC_RECEIVE_PORT_DEFAULT "8649" +static char *mc_receive_port = NULL; + static struct pollfd *mc_receive_sockets = NULL; static size_t mc_receive_sockets_num = 0; @@ -99,31 +103,111 @@ static int mc_receive_thread_loop = 0; static int mc_receive_thread_running = 0; static pthread_t mc_receive_thread_id; -static metric_map_t metric_map[] = -{ - { "load_one", "load", "", DS_TYPE_GAUGE, 0 }, - { "load_five", "load", "", DS_TYPE_GAUGE, 1 }, - { "load_fifteen", "load", "", DS_TYPE_GAUGE, 2 }, - { "cpu_user", "cpu", "user", DS_TYPE_COUNTER, 0 }, - { "cpu_system", "cpu", "system", DS_TYPE_COUNTER, 0 }, - { "cpu_idle", "cpu", "idle", DS_TYPE_COUNTER, 0 }, - { "cpu_nice", "cpu", "nice", DS_TYPE_COUNTER, 0 }, - { "cpu_wio", "cpu", "wait", DS_TYPE_COUNTER, 0 }, - { "mem_free", "memory", "free", DS_TYPE_GAUGE, 0 }, - { "mem_shared", "memory", "shared", DS_TYPE_GAUGE, 0 }, - { "mem_buffers", "memory", "buffered", DS_TYPE_GAUGE, 0 }, - { "mem_cached", "memory", "cached", DS_TYPE_GAUGE, 0 }, - { "mem_total", "memory", "total", DS_TYPE_GAUGE, 0 }, - { "bytes_in", "if_octets", "", DS_TYPE_COUNTER, 0 }, - { "bytes_out", "if_octets", "", DS_TYPE_COUNTER, 1 }, - { "pkts_in", "if_packets", "", DS_TYPE_COUNTER, 0 }, - { "pkts_out", "if_packets", "", DS_TYPE_COUNTER, 1 } +static metric_map_t metric_map_default[] = +{ /*---------------+-------------+-----------+-------------+------+-----* + * ganglia_name ! type ! type_inst ! data_source ! type ! idx * + *---------------+-------------+-----------+-------------+------+-----*/ + { "load_one", "load", "", "shortterm", -1, -1 }, + { "load_five", "load", "", "midterm", -1, -1 }, + { "load_fifteen", "load", "", "longterm", -1, -1 }, + { "cpu_user", "cpu", "user", "value", -1, -1 }, + { "cpu_system", "cpu", "system", "value", -1, -1 }, + { "cpu_idle", "cpu", "idle", "value", -1, -1 }, + { "cpu_nice", "cpu", "nice", "value", -1, -1 }, + { "cpu_wio", "cpu", "wait", "value", -1, -1 }, + { "mem_free", "memory", "free", "value", -1, -1 }, + { "mem_shared", "memory", "shared", "value", -1, -1 }, + { "mem_buffers", "memory", "buffered", "value", -1, -1 }, + { "mem_cached", "memory", "cached", "value", -1, -1 }, + { "mem_total", "memory", "total", "value", -1, -1 }, + { "bytes_in", "if_octets", "", "rx", -1, -1 }, + { "bytes_out", "if_octets", "", "tx", -1, -1 }, + { "pkts_in", "if_packets", "", "rx", -1, -1 }, + { "pkts_out", "if_packets", "", "tx", -1, -1 } }; -static size_t metric_map_len = STATIC_ARRAY_SIZE (metric_map); +static size_t metric_map_len_default = STATIC_ARRAY_SIZE (metric_map_default); + +static metric_map_t *metric_map = NULL; +static size_t metric_map_len = 0; static c_avl_tree_t *staging_tree; static pthread_mutex_t staging_lock = PTHREAD_MUTEX_INITIALIZER; +static metric_map_t *metric_lookup (const char *key) /* {{{ */ +{ + metric_map_t *map; + size_t map_len; + size_t i; + + /* Search the user-supplied table first.. */ + map = metric_map; + map_len = metric_map_len; + for (i = 0; i < map_len; i++) + if (strcmp (map[i].ganglia_name, key) == 0) + break; + + /* .. and fall back to the built-in table if nothing is found. */ + if (i >= map_len) + { + map = metric_map_default; + map_len = metric_map_len_default; + + for (i = 0; i < map_len; i++) + if (strcmp (map[i].ganglia_name, key) == 0) + break; + } + + if (i >= map_len) + return (NULL); + + /* Look up the DS type and ds_index. */ + if ((map[i].ds_type < 0) || (map[i].ds_index < 0)) /* {{{ */ + { + const data_set_t *ds; + + ds = plugin_get_ds (map[i].type); + if (ds == NULL) + { + WARNING ("gmond plugin: Type not defined: %s", map[i].type); + return (NULL); + } + + if ((map[i].ds_name == NULL) && (ds->ds_num != 1)) + { + WARNING ("gmond plugin: No data source name defined for metric %s, " + "but type %s has more than one data source.", + map[i].ganglia_name, map[i].type); + return (NULL); + } + + if (map[i].ds_name == NULL) + { + map[i].ds_index = 0; + } + else + { + int j; + + for (j = 0; j < ds->ds_num; j++) + if (strcasecmp (ds->ds[j].name, map[i].ds_name) == 0) + break; + + if (j >= ds->ds_num) + { + WARNING ("gmond plugin: There is no data source " + "named `%s' in type `%s'.", + map[i].ds_name, ds->type); + return (NULL); + } + map[i].ds_index = j; + } + + map[i].ds_type = ds->ds[map[i].ds_index].type; + } /* }}} if ((map[i].ds_type < 0) || (map[i].ds_index < 0)) */ + + return (map + i); +} /* }}} metric_map_t *metric_lookup */ + static int create_sockets (socket_entry_t **ret_sockets, /* {{{ */ size_t *ret_sockets_num, const char *node, const char *service, int listen) @@ -341,7 +425,8 @@ static staging_entry_t *staging_entry_get (const char *host, /* {{{ */ if (staging_tree == NULL) return (NULL); - ssnprintf (key, sizeof (key), "%s/%s/%s", host, type, type_instance); + ssnprintf (key, sizeof (key), "%s/%s/%s", host, type, + (type_instance != NULL) ? type_instance : ""); se = NULL; status = c_avl_get (staging_tree, key, (void *) &se); @@ -418,7 +503,7 @@ static int staging_entry_submit (const char *host, const char *name, /* {{{ */ static int staging_entry_update (const char *host, const char *name, /* {{{ */ const char *type, const char *type_instance, - int value_index, int ds_type, value_t value) + int ds_index, int ds_type, value_t value) { const data_set_t *ds; staging_entry_t *se; @@ -430,10 +515,10 @@ static int staging_entry_update (const char *host, const char *name, /* {{{ */ return (-1); } - if (ds->ds_num <= value_index) + if (ds->ds_num <= ds_index) { ERROR ("gmond plugin: Invalid index %i: %s has only %i data source(s).", - value_index, ds->type, ds->ds_num); + ds_index, ds->type, ds->ds_num); return (-1); } @@ -453,13 +538,10 @@ static int staging_entry_update (const char *host, const char *name, /* {{{ */ } if (ds_type == DS_TYPE_COUNTER) - se->vl.values[value_index].counter += value.counter; + se->vl.values[ds_index].counter += value.counter; else if (ds_type == DS_TYPE_GAUGE) - se->vl.values[value_index].gauge = value.gauge; - se->flags |= (0x01 << value_index); - - DEBUG ("gmond plugin: key = %s; flags = %i;", - se->key, se->flags); + se->vl.values[ds_index].gauge = value.gauge; + se->flags |= (0x01 << ds_index); /* Check if all values have been set and submit if so. */ if (se->flags == ((0x01 << se->vl.values_len) - 1)) @@ -479,12 +561,11 @@ static int mc_handle_value_msg (Ganglia_value_msg *msg) /* {{{ */ { const char *host; const char *name; + metric_map_t *map; value_t value_counter; value_t value_gauge; - size_t i; - /* Fill in `host', `name', `value_counter', and `value_gauge' according to * the value type, or return with an error. */ switch (msg->id) /* {{{ */ @@ -561,21 +642,14 @@ static int mc_handle_value_msg (Ganglia_value_msg *msg) /* {{{ */ assert (host != NULL); assert (name != NULL); - for (i = 0; i < metric_map_len; i++) - { - if (strcmp (name, metric_map[i].ganglia_name) != 0) - continue; - + map = metric_lookup (name); + if (map != NULL) return (staging_entry_update (host, name, - metric_map[i].type, metric_map[i].type_instance, - metric_map[i].value_index, metric_map[i].ds_type, - (metric_map[i].ds_type == DS_TYPE_COUNTER) - ? value_counter - : value_gauge)); - } + map->type, map->type_instance, + map->ds_index, map->ds_type, + (map->ds_type == DS_TYPE_COUNTER) ? value_counter : value_gauge)); DEBUG ("gmond plugin: Cannot find a translation for %s.", name); - return (-1); } /* }}} int mc_handle_value_msg */ @@ -588,31 +662,25 @@ static int mc_handle_metadata_msg (Ganglia_metadata_msg *msg) /* {{{ */ Ganglia_metadatadef msg_meta; staging_entry_t *se; const data_set_t *ds; - size_t i; + metric_map_t *map; msg_meta = msg->Ganglia_metadata_msg_u.gfull; if (msg_meta.metric.tmax <= 0) return (-1); - for (i = 0; i < metric_map_len; i++) - { - if (strcmp (msg_meta.metric_id.name, metric_map[i].ganglia_name) == 0) - break; - } - - if (i >= metric_map_len) + map = metric_lookup (msg_meta.metric_id.name); + if (map == NULL) { DEBUG ("gmond plugin: Not handling meta data %s.", msg_meta.metric_id.name); return (0); } - ds = plugin_get_ds (metric_map[i].type); + ds = plugin_get_ds (map->type); if (ds == NULL) { - WARNING ("gmond plugin: Could not find data set %s.", - metric_map[i].type); + WARNING ("gmond plugin: Could not find data set %s.", map->type); return (-1); } @@ -622,7 +690,7 @@ static int mc_handle_metadata_msg (Ganglia_metadata_msg *msg) /* {{{ */ pthread_mutex_lock (&staging_lock); se = staging_entry_get (msg_meta.metric_id.host, msg_meta.metric_id.name, - metric_map[i].type, metric_map[i].type_instance, + map->type, map->type_instance, ds->ds_num); if (se != NULL) se->vl.interval = (int) msg_meta.metric.tmax; @@ -726,7 +794,9 @@ static void *mc_receive_thread (void *arg) /* {{{ */ mc_receive_socket_entries = NULL; status = create_sockets (&mc_receive_socket_entries, &mc_receive_sockets_num, - "239.2.11.71", "8649", /* listen = */ 1); + (mc_receive_group != NULL) ? mc_receive_group : MC_RECEIVE_GROUP_DEFAULT, + (mc_receive_port != NULL) ? mc_receive_port : MC_RECEIVE_PORT_DEFAULT, + /* listen = */ 1); if (status != 0) { ERROR ("gmond plugin: create_sockets failed."); @@ -816,23 +886,177 @@ static int mc_receive_thread_stop (void) /* {{{ */ } /* }}} int mc_receive_thread_stop */ /* - * TODO: Config: + * Config: * * * MCReceiveFrom "239.2.11.71" "8649" - * MCSendTo "239.2.11.71" "8649" * * Type "load" * [TypeInstance "foo"] - * [Index 0] + * [DataSource "bar"] * * */ +static int gmond_config_set_string (oconfig_item_t *ci, char **str) /* {{{ */ +{ + char *tmp; + + if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING)) + { + WARNING ("gmond plugin: The `%s' option needs " + "exactly one string argument.", ci->key); + return (-1); + } + + tmp = strdup (ci->values[0].value.string); + if (tmp == NULL) + { + ERROR ("gmond plugin: strdup failed."); + return (-1); + } + + sfree (*str); + *str = tmp; + return (0); +} /* }}} int gmond_config_set_string */ + +static int gmond_config_add_metric (oconfig_item_t *ci) /* {{{ */ +{ + metric_map_t *map; + int i; + + if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING)) + { + WARNING ("gmond plugin: `Metric' blocks need " + "exactly one string argument."); + return (-1); + } + + map = realloc (metric_map, (metric_map_len + 1) * sizeof (*metric_map)); + if (map == NULL) + { + ERROR ("gmond plugin: realloc failed."); + return (-1); + } + metric_map = map; + map = metric_map + metric_map_len; + + memset (map, 0, sizeof (*map)); + map->type = NULL; + map->type_instance = NULL; + map->ds_name = NULL; + map->ds_type = -1; + map->ds_index = -1; + + map->ganglia_name = strdup (ci->values[0].value.string); + if (map->ganglia_name == NULL) + { + ERROR ("gmond plugin: strdup failed."); + return (-1); + } + + for (i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + if (strcasecmp ("Type", child->key) == 0) + gmond_config_set_string (child, &map->type); + else if (strcasecmp ("TypeInstance", child->key) == 0) + gmond_config_set_string (child, &map->type_instance); + else if (strcasecmp ("DataSource", child->key) == 0) + gmond_config_set_string (child, &map->ds_name); + else + { + WARNING ("gmond plugin: Unknown configuration option `%s' ignored.", + child->key); + } + } + + if (map->type == NULL) + { + ERROR ("gmond plugin: No type is set for metric %s.", + map->ganglia_name); + sfree (map->ganglia_name); + sfree (map->type_instance); + return (-1); + } + + metric_map_len++; + return (0); +} /* }}} int gmond_config_add_metric */ + +static int gmond_config_set_address (oconfig_item_t *ci, /* {{{ */ + char **ret_addr, char **ret_port) +{ + char *addr; + char *port; + + if ((ci->values_num != 1) && (ci->values_num != 2)) + { + WARNING ("gmond plugin: The `%s' config option needs " + "one or two string arguments.", + ci->key); + return (-1); + } + if ((ci->values[0].type != OCONFIG_TYPE_STRING) + || ((ci->values_num == 2) + && (ci->values[1].type != OCONFIG_TYPE_STRING))) + { + WARNING ("gmond plugin: The `%s' config option needs " + "one or two string arguments.", + ci->key); + return (-1); + } + + addr = strdup (ci->values[0].value.string); + if (ci->values_num == 2) + port = strdup (ci->values[1].value.string); + else + port = NULL; + + if ((addr == NULL) || ((ci->values_num == 2) && (port == NULL))) + { + ERROR ("gmond plugin: strdup failed."); + sfree (addr); + sfree (port); + return (-1); + } + + sfree (*ret_addr); + sfree (*ret_port); + + *ret_addr = addr; + *ret_port = port; + + return (0); +} /* }}} int gmond_config_set_address */ + +static int gmond_config (oconfig_item_t *ci) /* {{{ */ +{ + int i; + + for (i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + if (strcasecmp ("MCReceiveFrom", child->key) == 0) + gmond_config_set_address (child, &mc_receive_group, &mc_receive_port); + else if (strcasecmp ("Metric", child->key) == 0) + gmond_config_add_metric (child); + else + { + WARNING ("gmond plugin: Unknown configuration option `%s' ignored.", + child->key); + } + } + + return (0); +} /* }}} int gmond_config */ -static int gmond_init (void) +static int gmond_init (void) /* {{{ */ { create_sockets (&mc_send_sockets, &mc_send_sockets_num, - "239.2.11.71", "8649", /* listen = */ 0); + (mc_receive_group != NULL) ? mc_receive_group : MC_RECEIVE_GROUP_DEFAULT, + (mc_receive_port != NULL) ? mc_receive_port : MC_RECEIVE_PORT_DEFAULT, + /* listen = */ 0); staging_tree = c_avl_create ((void *) strcmp); if (staging_tree == NULL) @@ -844,9 +1068,9 @@ static int gmond_init (void) mc_receive_thread_start (); return (0); -} /* int gmond_init */ +} /* }}} int gmond_init */ -static int gmond_shutdown (void) +static int gmond_shutdown (void) /* {{{ */ { size_t i; @@ -864,10 +1088,11 @@ static int gmond_shutdown (void) return (0); -} /* int gmond_shutdown */ +} /* }}} int gmond_shutdown */ void module_register (void) { + plugin_register_complex_config ("gmond", gmond_config); plugin_register_init ("gmond", gmond_init); plugin_register_shutdown ("gmond", gmond_shutdown); } -- 2.30.2