author | Florian Forster <octo@huhu.verplant.org> | |
Sat, 22 Mar 2008 08:41:42 +0000 (09:41 +0100) | ||
committer | Florian Forster <octo@huhu.verplant.org> | |
Sat, 22 Mar 2008 08:41:42 +0000 (09:41 +0100) |
Conflicts:
src/utils_cache.c
src/utils_cache.c
1 | 2 | |||
---|---|---|---|---|
configure.in | patch | | diff1 | | diff2 | | blob | history |
src/collectd.conf.pod | patch | | diff1 | | diff2 | | blob | history |
src/network.c | patch | | diff1 | | diff2 | | blob | history |
src/plugin.c | patch | | diff1 | | diff2 | | blob | history |
src/unixsock.c | patch | | diff1 | | diff2 | | blob | history |
src/utils_cache.c | patch | | diff1 | | diff2 | | blob | history |
src/utils_cache.h | patch | | diff1 | | diff2 | | blob | history |
diff --combined configure.in
index 036a9e46cba1661b6bee2cf4674d0841286d69f9,460b9316d74cc518a9391dfd26e30e3ce181f17f..e3662b765c2286748c162f4a331fcc2ff6595857
--- 1/configure.in
--- 2/configure.in
+++ b/configure.in
],
[with_rrdtool="no (symbol 'rrd_update' not found)"],
[-lm])
- ]
+ ],
[-lm])
CPPFLAGS="$SAVE_CPPFLAGS"
AC_PLUGIN([snmp], [$with_libnetsnmp], [SNMP querying plugin])
AC_PLUGIN([swap], [$plugin_swap], [Swap usage statistics])
AC_PLUGIN([syslog], [$have_syslog], [Syslog logging plugin])
+AC_PLUGIN([tail], [yes], [Parsing of logfiles])
AC_PLUGIN([tape], [$plugin_tape], [Tape drive statistics])
AC_PLUGIN([tcpconns], [$plugin_tcpconns], [TCP connection statistics])
AC_PLUGIN([unixsock], [yes], [Unixsock communication plugin])
snmp . . . . . . . $enable_snmp
swap . . . . . . . $enable_swap
syslog . . . . . . $enable_syslog
+ tail . . . . . . . $enable_tail
tape . . . . . . . $enable_tape
tcpconns . . . . . $enable_tcpconns
unixsock . . . . . $enable_unixsock
diff --combined src/collectd.conf.pod
index 7afe3f1249877208434787446d4f275f36f680f3,aa4421dc7e30bc8b25efcdae494bba8dd7e0bf09..00f56e7f3281541204edddb83ccb3897c1cde446
+++ b/src/collectd.conf.pod
Include "/etc/collectd.d/*.conf"
+ If more than one files are included by a single B<Include> option, the files
+ will be included in lexicographical order (as defined by the C<strcmp>
+ function). Thus, you can e.E<nbsp>g. use numbered prefixes to specify the
+ order in which the files are loaded.
+
To prevent loops and shooting yourself in the foot in interesting ways the
nesting is limited to a depth of 8E<nbsp>levels, which should be sufficient for
most uses. Since symlinks are followed it is still possible to crash the daemon
=back
+=head2 Plugin C<tail>
+
+The C<tail plugin> plugins follows logfiles, just like L<tail(1)> does, parses
+each line and dispatches found values. What is matched can be configured by the
+user using (extended) regular expressions, as described in L<regex(7)>.
+
+ <Plugin "tail">
+ <File "/var/log/exim4/mainlog">
+ Instance "exim"
+ <Match>
+ Regex "S=([1-9][0-9]*)"
+ DSType "CounterAdd"
+ Type "ipt_bytes"
+ Instance "total"
+ </Match>
+ <Match>
+ Regex "\\<R=local_user\\>"
+ DSType "CounterInc"
+ Type "email_count"
+ Instance "local_user"
+ </Match>
+ </File>
+ </Plugin>
+
+The config consists of one or more B<File> blocks, each of which configures one
+logfile to parse. Within each B<File> block, there are one or more B<Match>
+blocks, which configure a regular expression to search for.
+
+The B<Instance> option in the B<File> block may be used to set the plugin
+instance. So in the above example the plugin name C<tail-foo> would be used.
+This plugin instance is for all B<Match> blocks that B<follow> it, until the
+next B<Instance> option. This way you can extract several plugin instances from
+one logfile, handy when parsing syslog and the like.
+
+Each B<Match> block has the following options to describe how the match should
+be performed:
+
+=over 4
+
+=item B<Regex> I<regex>
+
+Sets the regular expression to use for matching against a line. The first
+subexpression has to match something that can be turned into a number by
+L<strtoll(3)> or L<strtod(3)>, depending on the value of C<CounterAdd>, see
+below. Because B<extended> regular expressions are used, you do not need to use
+backslashes for subexpressions! If in doubt, please consult L<regex(7)>. Due to
+collectd's config parsing you need to escape backslashes, though. So if you
+want to match literal parentheses you need to do the following:
+
+ Regex "SPAM \\(Score: (-?[0-9]+\\.[0-9]+)\\)"
+
+=item B<DSType> I<Type>
+
+Sets how the values are cumulated. I<Type> is one of:
+
+=over 4
+
+=item B<GaugeAverage>
+
+Calculate the average.
+
+=item B<GaugeMin>
+
+Use the smallest number only.
+
+=item B<GaugeMax>
+
+Use the greatest number only.
+
+=item B<GaugeLast>
+
+Use the last number found.
+
+=item B<CounterSet>
+
+The matched number is a counter. Simply sets the internal counter to this
+value.
+
+=item B<CounterAdd>
+
+Add the matched value to the internal counter.
+
+=item B<CounterInc>
+
+Increase the internal counter by one. This B<DSType> is the only one that does
+not use the matched subexpression, but simply counts the number of matched
+lines. Thus, you may use a regular expression without submatch in this case.
+
+=back
+
+As you'd expect the B<Gauge*> types interpret the submatch as a floating point
+number, using L<strtod(3)>. The B<CounterSet> and B<CounterAdd> interpret the
+submatch as an integer using L<strtoll(3)>. B<CounterInc> does not use the
+submatch at all and it may be omitted in this case.
+
+=item B<Type> I<Type>
+
+Sets the type used to dispatch this value. Detailed information about types and
+their configuration can be found in L<types.db(5)>.
+
+=item B<Instance> I<TypeInstance>
+
+This optional setting sets the type instance to use.
+
+=back
+
=head2 Plugin C<tcpconns>
The C<tcpconns plugin> counts the number of currently established TCP
diff --combined src/network.c
index f81717fed22283af360d35d2ac04c2a2a0e40ba0,b67928c7c2fe6a10aec868ba61c5c7fbe512f6c9..e15036424d880316ec58bcdffd7d9d3cc76ff2ae
--- 1/src/network.c
--- 2/src/network.c
+++ b/src/network.c
static int write_part_string (char **ret_buffer, int *ret_buffer_len,
int type, const char *str, int str_len)
{
- char *packet_ptr;
- int packet_len;
+ char *buffer;
+ int buffer_len;
- part_header_t pkg_head;
+ uint16_t pkg_type;
+ uint16_t pkg_length;
int offset;
- packet_len = sizeof (pkg_head) + str_len + 1;
- if (*ret_buffer_len < packet_len)
+ buffer_len = 2 * sizeof (uint16_t) + str_len + 1;
+ if (*ret_buffer_len < buffer_len)
return (-1);
- pkg_head.type = htons (type);
- pkg_head.length = htons (packet_len);
+ pkg_type = htons (type);
+ pkg_length = htons (buffer_len);
- packet_ptr = *ret_buffer;
+ buffer = *ret_buffer;
offset = 0;
- memcpy (packet_ptr + offset, &pkg_head, sizeof (pkg_head));
- offset += sizeof (pkg_head);
- memcpy (packet_ptr + offset, str, str_len);
+ memcpy (buffer + offset, (void *) &pkg_type, sizeof (pkg_type));
+ offset += sizeof (pkg_type);
+ memcpy (buffer + offset, (void *) &pkg_length, sizeof (pkg_length));
+ offset += sizeof (pkg_length);
+ memcpy (buffer + offset, str, str_len);
offset += str_len;
- memset (packet_ptr + offset, '\0', 1);
+ memset (buffer + offset, '\0', 1);
offset += 1;
- assert (offset == packet_len);
+ assert (offset == buffer_len);
- *ret_buffer = packet_ptr + packet_len;
- *ret_buffer_len -= packet_len;
+ *ret_buffer = buffer + buffer_len;
+ *ret_buffer_len -= buffer_len;
return (0);
} /* int write_part_string */
{
char *buffer = *ret_buffer;
int buffer_len = *ret_buffer_len;
- part_values_t pv;
+
+ uint16_t tmp16;
+ size_t exp_size;
int i;
- uint16_t h_length;
- uint16_t h_type;
- uint16_t h_num;
+ uint16_t pkg_length;
+ uint16_t pkg_type;
+ uint16_t pkg_numval;
+
+ uint8_t *pkg_types;
+ value_t *pkg_values;
if (buffer_len < (15))
{
return (-1);
}
- pv.head = (part_header_t *) buffer;
- h_length = ntohs (pv.head->length);
- h_type = ntohs (pv.head->type);
+ memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
+ buffer += sizeof (tmp16);
+ pkg_type = ntohs (tmp16);
+
+ memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
+ buffer += sizeof (tmp16);
+ pkg_length = ntohs (tmp16);
- assert (h_type == TYPE_VALUES);
+ memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
+ buffer += sizeof (tmp16);
+ pkg_numval = ntohs (tmp16);
- pv.num_values = (uint16_t *) (pv.head + 1);
- h_num = ntohs (*pv.num_values);
+ assert (pkg_type == TYPE_VALUES);
- if (h_num != ((h_length - 6) / 9))
+ exp_size = 3 * sizeof (uint16_t)
+ + pkg_numval * (sizeof (uint8_t) + sizeof (value_t));
+ if (buffer_len < exp_size)
{
- DEBUG ("`length' and `num of values' don't match");
+ WARNING ("network plugin: parse_part_values: "
+ "Packet too short: "
+ "Chunk of size %u expected, "
+ "but buffer has only %i bytes left.",
+ (unsigned int) exp_size, buffer_len);
return (-1);
}
- pv.values_types = (uint8_t *) (pv.num_values + 1);
- pv.values = (value_t *) (pv.values_types + h_num);
+ if (pkg_length != exp_size)
+ {
+ WARNING ("network plugin: parse_part_values: "
+ "Length and number of values "
+ "in the packet don't match.");
+ return (-1);
+ }
- for (i = 0; i < h_num; i++)
- if (pv.values_types[i] == DS_TYPE_COUNTER)
- pv.values[i].counter = ntohll (pv.values[i].counter);
- else
- pv.values[i].gauge = ntohd (pv.values[i].gauge);
+ pkg_types = (uint8_t *) malloc (pkg_numval * sizeof (uint8_t));
+ pkg_values = (value_t *) malloc (pkg_numval * sizeof (value_t));
+ if ((pkg_types == NULL) || (pkg_values == NULL))
+ {
+ sfree (pkg_types);
+ sfree (pkg_values);
+ ERROR ("network plugin: parse_part_values: malloc failed.");
+ return (-1);
+ }
- *ret_buffer = (void *) (pv.values + h_num);
- *ret_buffer_len = buffer_len - h_length;
- *ret_num_values = h_num;
- *ret_values = pv.values;
+ memcpy ((void *) pkg_types, (void *) buffer, pkg_numval * sizeof (uint8_t));
+ buffer += pkg_numval * sizeof (uint8_t);
+ memcpy ((void *) pkg_values, (void *) buffer, pkg_numval * sizeof (value_t));
+ buffer += pkg_numval * sizeof (value_t);
+
+ for (i = 0; i < pkg_numval; i++)
+ {
+ if (pkg_types[i] == DS_TYPE_COUNTER)
+ pkg_values[i].counter = ntohll (pkg_values[i].counter);
+ else if (pkg_types[i] == DS_TYPE_GAUGE)
+ pkg_values[i].gauge = ntohd (pkg_values[i].gauge);
+ }
+
+ *ret_buffer = buffer;
+ *ret_buffer_len = buffer_len - pkg_length;
+ *ret_num_values = pkg_numval;
+ *ret_values = pkg_values;
+
+ sfree (pkg_types);
return (0);
} /* int parse_part_values */
static int parse_part_number (void **ret_buffer, int *ret_buffer_len,
uint64_t *value)
{
- part_number_t pn;
- uint16_t len;
+ char *buffer = *ret_buffer;
+ int buffer_len = *ret_buffer_len;
- pn.head = (part_header_t *) *ret_buffer;
- pn.value = (uint64_t *) (pn.head + 1);
+ uint16_t tmp16;
+ uint64_t tmp64;
+ size_t exp_size = 2 * sizeof (uint16_t) + sizeof (uint64_t);
- len = ntohs (pn.head->length);
- if (len != 12)
- return (-1);
- if (len > *ret_buffer_len)
+ uint16_t pkg_length;
+ uint16_t pkg_type;
+
+ if (buffer_len < exp_size)
+ {
+ WARNING ("network plugin: parse_part_number: "
+ "Packet too short: "
+ "Chunk of size %u expected, "
+ "but buffer has only %i bytes left.",
+ (unsigned int) exp_size, buffer_len);
return (-1);
- *value = ntohll (*pn.value);
+ }
+
+ memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
+ buffer += sizeof (tmp16);
+ pkg_type = ntohs (tmp16);
- *ret_buffer = (void *) (pn.value + 1);
- *ret_buffer_len -= len;
+ memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
+ buffer += sizeof (tmp16);
+ pkg_length = ntohs (tmp16);
+
+ memcpy ((void *) &tmp64, buffer, sizeof (tmp64));
+ buffer += sizeof (tmp64);
+ *value = ntohll (tmp64);
+
+ *ret_buffer = buffer;
+ *ret_buffer_len = buffer_len - pkg_length;
return (0);
} /* int parse_part_number */
{
char *buffer = *ret_buffer;
int buffer_len = *ret_buffer_len;
- part_string_t ps;
- uint16_t h_length;
- uint16_t h_type;
+ uint16_t tmp16;
+ size_t header_size = 2 * sizeof (uint16_t);
- DEBUG ("network plugin: parse_part_string: ret_buffer = %p;"
- " ret_buffer_len = %i; output = %p; output_len = %i;",
- *ret_buffer, *ret_buffer_len,
- (void *) output, output_len);
+ uint16_t pkg_length;
+ uint16_t pkg_type;
- ps.head = (part_header_t *) buffer;
+ if (buffer_len < header_size)
+ {
+ WARNING ("network plugin: parse_part_string: "
+ "Packet too short: "
+ "Chunk of at least size %u expected, "
+ "but buffer has only %i bytes left.",
+ (unsigned int) header_size, buffer_len);
+ return (-1);
+ }
- h_length = ntohs (ps.head->length);
- h_type = ntohs (ps.head->type);
+ memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
+ buffer += sizeof (tmp16);
+ pkg_type = ntohs (tmp16);
- DEBUG ("network plugin: parse_part_string: length = %hu; type = %hu;",
- h_length, h_type);
+ memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
+ buffer += sizeof (tmp16);
+ pkg_length = ntohs (tmp16);
- if (buffer_len < h_length)
+ /* Check that packet fits in the input buffer */
+ if (pkg_length > buffer_len)
{
- DEBUG ("packet is too short");
+ WARNING ("network plugin: parse_part_string: "
+ "Packet too big: "
+ "Chunk of size %hu received, "
+ "but buffer has only %i bytes left.",
+ pkg_length, buffer_len);
return (-1);
}
- assert ((h_type == TYPE_HOST)
- || (h_type == TYPE_PLUGIN)
- || (h_type == TYPE_PLUGIN_INSTANCE)
- || (h_type == TYPE_TYPE)
- || (h_type == TYPE_TYPE_INSTANCE)
- || (h_type == TYPE_MESSAGE));
-
- ps.value = buffer + 4;
- if (ps.value[h_length - 5] != '\0')
+
+ /* Check that pkg_length is in the valid range */
+ if (pkg_length <= header_size)
{
- DEBUG ("String does not end with a nullbyte");
+ WARNING ("network plugin: parse_part_string: "
+ "Packet too short: "
+ "Header claims this packet is only %hu "
+ "bytes long.", pkg_length);
return (-1);
}
- if (output_len < (h_length - 4))
+ /* Check that the package data fits into the output buffer.
+ * The previous if-statement ensures that:
+ * `pkg_length > header_size' */
+ if ((pkg_length - header_size) > output_len)
{
- DEBUG ("output buffer is too small");
+ WARNING ("network plugin: parse_part_string: "
+ "Output buffer too small.");
return (-1);
}
- strcpy (output, ps.value);
- DEBUG ("network plugin: parse_part_string: output = %s", output);
+ /* All sanity checks successfull, let's copy the data over */
+ output_len = pkg_length - header_size;
+ memcpy ((void *) output, (void *) buffer, output_len);
+ buffer += output_len;
- *ret_buffer = (void *) (buffer + h_length);
- *ret_buffer_len = buffer_len - h_length;
+ /* For some very weird reason '\0' doesn't do the trick on SPARC in
+ * this statement. */
+ if (output[output_len - 1] != 0)
+ {
+ WARNING ("network plugin: parse_part_string: "
+ "Received string does not end "
+ "with a NULL-byte.");
+ return (-1);
+ }
+
+ *ret_buffer = buffer;
+ *ret_buffer_len = buffer_len - pkg_length;
return (0);
} /* int parse_part_string */
static int parse_packet (void *buffer, int buffer_len)
{
- part_header_t *header;
int status;
value_list_t vl = VALUE_LIST_INIT;
while ((status == 0) && (0 < buffer_len)
&& ((unsigned int) buffer_len > sizeof (part_header_t)))
{
- header = (part_header_t *) buffer;
+ uint16_t pkg_length;
+ uint16_t pkg_type;
+
+ memcpy ((void *) &pkg_type,
+ (void *) buffer,
+ sizeof (pkg_type));
+ memcpy ((void *) &pkg_length,
+ (void *) (buffer + sizeof (pkg_type)),
+ sizeof (pkg_length));
- if (ntohs (header->length) > buffer_len)
+ pkg_length = ntohs (pkg_length);
+ pkg_type = ntohs (pkg_type);
+
+ if (pkg_length > buffer_len)
break;
- /* Assure that this loop terminates eventually */
- if (ntohs (header->length) < 4)
+ /* Ensure that this loop terminates eventually */
+ if (pkg_length < (2 * sizeof (uint16_t)))
break;
- if (ntohs (header->type) == TYPE_VALUES)
+ if (pkg_type == TYPE_VALUES)
{
status = parse_part_values (&buffer, &buffer_len,
&vl.values, &vl.values_len);
if (status != 0)
- {
- DEBUG ("parse_part_values failed.");
break;
- }
if ((vl.time > 0)
&& (strlen (vl.host) > 0)
&& (strlen (type) > 0)
&& (cache_check (type, &vl) == 0))
{
- DEBUG ("network plugin: parse_packet:"
- " dispatching values");
plugin_dispatch_values (type, &vl);
}
else
DEBUG ("network plugin: parse_packet:"
" NOT dispatching values");
}
+
+ sfree (vl.values);
}
- else if (ntohs (header->type) == TYPE_TIME)
+ else if (pkg_type == TYPE_TIME)
{
uint64_t tmp = 0;
- status = parse_part_number (&buffer, &buffer_len, &tmp);
+ status = parse_part_number (&buffer, &buffer_len,
+ &tmp);
if (status == 0)
{
vl.time = (time_t) tmp;
n.time = (time_t) tmp;
}
}
- else if (ntohs (header->type) == TYPE_INTERVAL)
+ else if (pkg_type == TYPE_INTERVAL)
{
uint64_t tmp = 0;
- status = parse_part_number (&buffer, &buffer_len, &tmp);
+ status = parse_part_number (&buffer, &buffer_len,
+ &tmp);
if (status == 0)
vl.interval = (int) tmp;
}
- else if (ntohs (header->type) == TYPE_HOST)
+ else if (pkg_type == TYPE_HOST)
{
status = parse_part_string (&buffer, &buffer_len,
vl.host, sizeof (vl.host));
- strncpy (n.host, vl.host, sizeof (n.host));
- n.host[sizeof (n.host) - 1] = '\0';
- DEBUG ("network plugin: parse_packet: vl.host = %s",
- vl.host);
+ if (status == 0)
+ sstrncpy (n.host, vl.host, sizeof (n.host));
}
- else if (ntohs (header->type) == TYPE_PLUGIN)
+ else if (pkg_type == TYPE_PLUGIN)
{
status = parse_part_string (&buffer, &buffer_len,
vl.plugin, sizeof (vl.plugin));
- strncpy (n.plugin, vl.plugin, sizeof (n.plugin));
- n.plugin[sizeof (n.plugin) - 1] = '\0';
- DEBUG ("network plugin: parse_packet: vl.plugin = %s",
- vl.plugin);
+ if (status == 0)
+ sstrncpy (n.plugin, vl.plugin,
+ sizeof (n.plugin));
}
- else if (ntohs (header->type) == TYPE_PLUGIN_INSTANCE)
+ else if (pkg_type == TYPE_PLUGIN_INSTANCE)
{
status = parse_part_string (&buffer, &buffer_len,
vl.plugin_instance,
sizeof (vl.plugin_instance));
- strncpy (n.plugin_instance, vl.plugin_instance,
- sizeof (n.plugin_instance));
- n.plugin_instance[sizeof (n.plugin_instance) - 1] = '\0';
- DEBUG ("network plugin: parse_packet: "
- "vl.plugin_instance = %s",
- vl.plugin_instance);
+ if (status == 0)
+ sstrncpy (n.plugin_instance,
+ vl.plugin_instance,
+ sizeof (n.plugin_instance));
}
- else if (ntohs (header->type) == TYPE_TYPE)
+ else if (pkg_type == TYPE_TYPE)
{
status = parse_part_string (&buffer, &buffer_len,
type, sizeof (type));
- strncpy (n.type, type, sizeof (n.type));
- n.type[sizeof (n.type) - 1] = '\0';
- DEBUG ("network plugin: parse_packet: type = %s",
- type);
+ if (status == 0)
+ sstrncpy (n.type, type, sizeof (n.type));
}
- else if (ntohs (header->type) == TYPE_TYPE_INSTANCE)
+ else if (pkg_type == TYPE_TYPE_INSTANCE)
{
status = parse_part_string (&buffer, &buffer_len,
vl.type_instance,
sizeof (vl.type_instance));
- strncpy (n.type_instance, vl.type_instance,
- sizeof (n.type_instance));
- n.type_instance[sizeof (n.type_instance) - 1] = '\0';
- DEBUG ("network plugin: parse_packet: "
- "vl.type_instance = %s",
- vl.type_instance);
+ if (status == 0)
+ sstrncpy (n.type_instance, vl.type_instance,
+ sizeof (n.type_instance));
}
- else if (ntohs (header->type) == TYPE_MESSAGE)
+ else if (pkg_type == TYPE_MESSAGE)
{
status = parse_part_string (&buffer, &buffer_len,
n.message, sizeof (n.message));
- DEBUG ("network plugin: parse_packet: n.message = %s",
- n.message);
- if ((n.severity != NOTIF_FAILURE)
+ if (status != 0)
+ {
+ /* do nothing */
+ }
+ else if ((n.severity != NOTIF_FAILURE)
&& (n.severity != NOTIF_WARNING)
&& (n.severity != NOTIF_OKAY))
{
INFO ("network plugin: "
"Ignoring notification with "
- "unknown severity %s.",
+ "unknown severity %i.",
n.severity);
}
else if (n.time <= 0)
}
else
{
- /*
- * TODO: Let this do a separate thread so that
- * no packets are lost if this takes too long.
- */
plugin_dispatch_notification (&n);
}
}
- else if (ntohs (header->type) == TYPE_SEVERITY)
+ else if (pkg_type == TYPE_SEVERITY)
{
uint64_t tmp = 0;
- status = parse_part_number (&buffer, &buffer_len, &tmp);
+ status = parse_part_number (&buffer, &buffer_len,
+ &tmp);
if (status == 0)
n.severity = (int) tmp;
}
else
{
DEBUG ("network plugin: parse_packet: Unknown part"
- " type: 0x%0hx", ntohs (header->type));
- buffer = ((char *) buffer) + ntohs (header->length);
+ " type: 0x%04hx", pkg_type);
+ buffer = ((char *) buffer) + pkg_length;
}
} /* while (buffer_len > sizeof (part_header_t)) */
- return (0);
+ return (status);
} /* int parse_packet */
static void free_sockent (sockent_t *se)
return (0);
} /* int network_init */
+static int network_flush (int timeout)
+{
+ pthread_mutex_lock (&send_buffer_lock);
+
+ if (((time (NULL) - cache_flush_last) >= timeout)
+ && (send_buffer_fill > 0))
+ {
+ flush_buffer ();
+ }
+
+ pthread_mutex_unlock (&send_buffer_lock);
+
+ return (0);
+} /* int network_flush */
+
void module_register (void)
{
plugin_register_config ("network", network_config,
config_keys, config_keys_num);
plugin_register_init ("network", network_init);
+ plugin_register_flush ("network", network_flush);
} /* void module_register */
diff --combined src/plugin.c
index eaf1a41079193d2139d72dde17056f54c481513b,0570f0ee0dc37269ce66fb724fe7f6abd64ac7dc..cf0384918a7e8eec902094d6f88d46053ee5b116
--- 1/src/plugin.c
--- 2/src/plugin.c
+++ b/src/plugin.c
/**
* collectd - src/plugin.c
- * Copyright (C) 2005,2006 Florian octo Forster
+ * Copyright (C) 2005-2008 Florian octo Forster
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
*
* Authors:
* Florian octo Forster <octo at verplant.org>
+ * Sebastian Harl <sh at tokkee.org>
**/
#include "collectd.h"
static llist_t *list_init;
static llist_t *list_read;
static llist_t *list_write;
+static llist_t *list_flush;
static llist_t *list_shutdown;
static llist_t *list_log;
static llist_t *list_notification;
pthread_mutex_unlock (&read_lock);
pthread_exit (NULL);
+ return ((void *) 0);
} /* void *plugin_read_thread */
static void start_threads (int num)
return (register_callback (&list_write, name, (void *) callback));
} /* int plugin_register_write */
+int plugin_register_flush (const char *name, int (*callback) (const int))
+{
+ return (register_callback (&list_flush, name, (void *) callback));
+} /* int plugin_register_flush */
+
int plugin_register_shutdown (char *name,
int (*callback) (void))
{
return (plugin_unregister (list_write, name));
}
+int plugin_unregister_flush (const char *name)
+{
+ return (plugin_unregister (list_flush, name));
+}
+
int plugin_unregister_shutdown (const char *name)
{
return (plugin_unregister (list_shutdown, name));
pthread_mutex_unlock (&read_lock);
} /* void plugin_read_all */
+int plugin_flush_one (int timeout, const char *name)
+{
+ int (*callback) (int);
+ llentry_t *le;
+ int status;
+
+ if (list_flush == NULL)
+ return (-1);
+
+ le = llist_search (list_flush, name);
+ if (le == NULL)
+ return (-1);
+ callback = (int (*) (int)) le->value;
+
+ status = (*callback) (timeout);
+
+ return (status);
+} /* int plugin_flush_ont */
+
+void plugin_flush_all (int timeout)
+{
+ int (*callback) (int);
+ llentry_t *le;
+
+ if (list_flush == NULL)
+ return;
+
+ le = llist_head (list_flush);
+ while (le != NULL)
+ {
+ callback = (int (*) (int)) le->value;
+ le = le->next;
+
+ (*callback) (timeout);
+ }
+} /* void plugin_flush_all */
+
void plugin_shutdown_all (void)
{
int (*callback) (void);
diff --combined src/unixsock.c
index a7618c0f9a4a2d5366bb1548e931804a938a6147,837a902135b12e7429a767377263afb77405a04a..5845f9de1c8c8514989a836683dbbd5f568b273f
--- 1/src/unixsock.c
--- 2/src/unixsock.c
+++ b/src/unixsock.c
#include "plugin.h"
#include "configfile.h"
+#include "utils_cmd_flush.h"
+#include "utils_cmd_getval.h"
#include "utils_cmd_putval.h"
#include "utils_cmd_putnotif.h"
return (0);
} /* int us_open_socket */
-static int us_handle_getval (FILE *fh, char **fields, int fields_num)
-{
- char *hostname;
- char *plugin;
- char *plugin_instance;
- char *type;
- char *type_instance;
- char name[4*DATA_MAX_NAME_LEN];
- value_cache_t *vc;
- int status;
- int i;
-
- if (fields_num != 2)
- {
- DEBUG ("unixsock plugin: Wrong number of fields: %i", fields_num);
- fprintf (fh, "-1 Wrong number of fields: Got %i, expected 2.\n",
- fields_num);
- fflush (fh);
- return (-1);
- }
- DEBUG ("unixsock plugin: Got query for `%s'", fields[1]);
-
- status = parse_identifier (fields[1], &hostname,
- &plugin, &plugin_instance,
- &type, &type_instance);
- if (status != 0)
- {
- DEBUG ("unixsock plugin: Cannot parse `%s'", fields[1]);
- fprintf (fh, "-1 Cannot parse identifier.\n");
- fflush (fh);
- return (-1);
- }
-
- status = format_name (name, sizeof (name),
- hostname, plugin, plugin_instance, type, type_instance);
- if (status != 0)
- {
- fprintf (fh, "-1 format_name failed.\n");
- return (-1);
- }
-
- pthread_mutex_lock (&cache_lock);
-
- DEBUG ("vc = cache_search (%s)", name);
- vc = cache_search (name);
-
- if (vc == NULL)
- {
- DEBUG ("Did not find cache entry.");
- fprintf (fh, "-1 No such value");
- }
- else
- {
- DEBUG ("Found cache entry.");
- fprintf (fh, "%i", vc->values_num);
- for (i = 0; i < vc->values_num; i++)
- {
- fprintf (fh, " %s=", vc->ds->ds[i].name);
- if (isnan (vc->gauge[i]))
- fprintf (fh, "NaN");
- else
- fprintf (fh, "%12e", vc->gauge[i]);
- }
- }
-
- /* Free the mutex as soon as possible and definitely before flushing */
- pthread_mutex_unlock (&cache_lock);
-
- fprintf (fh, "\n");
- fflush (fh);
-
- return (0);
-} /* int us_handle_getval */
-
static int us_handle_listval (FILE *fh, char **fields, int fields_num)
{
char buffer[1024];
if (strcasecmp (fields[0], "getval") == 0)
{
- us_handle_getval (fh, fields, fields_num);
+ handle_getval (fh, fields, fields_num);
}
else if (strcasecmp (fields[0], "putval") == 0)
{
{
handle_putnotif (fh, fields, fields_num);
}
+ else if (strcasecmp (fields[0], "flush") == 0)
+ {
+ handle_flush (fh, fields, fields_num);
+ }
else
{
fprintf (fh, "-1 Unknown command: %s\n", fields[0]);
close (fd);
pthread_exit ((void *) 0);
+ return ((void *) 0);
} /* void *us_handle_client */
static void *us_server_thread (void *arg)
diff --combined src/utils_cache.c
index ad8fb199f175e02e39625df41ecf82abfb559abd,b9b896215be1cb631f5c1b0252bcf4c341af6846..c471ee28d29e1f87bf6a38a856debb932b7bf1c2
--- 1/src/utils_cache.c
--- 2/src/utils_cache.c
+++ b/src/utils_cache.c
return (0);
} /* int uc_send_notification */
+ static int uc_insert (const data_set_t *ds, const value_list_t *vl,
+ const char *key)
+ {
+ int i;
+ char *key_copy;
+ cache_entry_t *ce;
+
+ /* `cache_lock' has been locked by `uc_update' */
+
+ key_copy = strdup (key);
+ if (key_copy == NULL)
+ {
+ ERROR ("uc_insert: strdup failed.");
+ return (-1);
+ }
+
+ ce = cache_alloc (ds->ds_num);
+ if (ce == NULL)
+ {
+ ERROR ("uc_insert: cache_alloc (%i) failed.", ds->ds_num);
+ return (-1);
+ }
+
+ sstrncpy (ce->name, key, sizeof (ce->name));
+
+ for (i = 0; i < ds->ds_num; i++)
+ {
+ if (ds->ds[i].type == DS_TYPE_COUNTER)
+ {
+ ce->values_gauge[i] = NAN;
+ ce->values_counter[i] = vl->values[i].counter;
+ }
+ else /* if (ds->ds[i].type == DS_TYPE_GAUGE) */
+ {
+ ce->values_gauge[i] = vl->values[i].gauge;
+ }
+ } /* for (i) */
+
+ ce->last_time = vl->time;
+ ce->last_update = time (NULL);
+ ce->interval = vl->interval;
+ ce->state = STATE_OKAY;
+
+ if (c_avl_insert (cache_tree, key_copy, ce) != 0)
+ {
+ sfree (key_copy);
+ ERROR ("uc_insert: c_avl_insert failed.");
+ return (-1);
+ }
+
+ DEBUG ("uc_insert: Added %s to the cache.", key);
+ return (0);
+ } /* int uc_insert */
+
int uc_init (void)
{
if (cache_tree == NULL)
else if (status == 0) /* ``service'' is uninteresting */
{
ce = NULL;
- DEBUG ("uc_check_timeout: %s is missing but ``uninteresting''", keys[i]);
- status = c_avl_remove (cache_tree, keys[i], (void *) &key, (void *) &ce);
+ DEBUG ("uc_check_timeout: %s is missing but ``uninteresting''",
+ keys[i]);
+ status = c_avl_remove (cache_tree, keys[i],
+ (void *) &key, (void *) &ce);
if (status != 0)
{
ERROR ("uc_check_timeout: c_avl_remove (%s) failed.", keys[i]);
sfree (keys[i]);
cache_free (ce);
}
- else /* (status > 0); ``service'' is interesting */
+ else if (status == 1) /* persist */
+ {
+ DEBUG ("uc_check_timeout: %s is missing, sending notification.",
+ keys[i]);
+ ce->state = STATE_MISSING;
+ }
+ else if (status == 2) /* do not persist */
{
- /*
- * `keys[i]' is not freed and set to NULL, so that the for-loop below
- * will send out notifications. There's nothing else to do here.
- */
- DEBUG ("uc_check_timeout: %s is missing and ``interesting''", keys[i]);
- ce->state = STATE_ERROR;
+ if (ce->state == STATE_MISSING)
+ {
+ DEBUG ("uc_check_timeout: %s is missing but "
+ "notification has already been sent.",
+ keys[i]);
+ sfree (keys[i]);
+ }
+ else /* (ce->state != STATE_MISSING) */
+ {
+ DEBUG ("uc_check_timeout: %s is missing, sending one notification.",
+ keys[i]);
+ ce->state = STATE_MISSING;
+ }
+ }
+ else
+ {
+ WARNING ("uc_check_timeout: ut_check_interesting (%s) returned ",
+ "invalid status %i.",
+ keys[i], status);
}
} /* for (keys[i]) */
char name[6 * DATA_MAX_NAME_LEN];
cache_entry_t *ce = NULL;
int send_okay_notification = 0;
+ time_t update_delay = 0;
+ notification_t n;
+ int status;
+ int i;
if (FORMAT_VL (name, sizeof (name), vl, ds) != 0)
{
- ERROR ("uc_insert: FORMAT_VL failed.");
+ ERROR ("uc_update: FORMAT_VL failed.");
return (-1);
}
pthread_mutex_lock (&cache_lock);
- if (c_avl_get (cache_tree, name, (void *) &ce) == 0)
+ status = c_avl_get (cache_tree, name, (void *) &ce);
+ if (status != 0) /* entry does not yet exist */
{
- int i;
-
- assert (ce != NULL);
- assert (ce->values_num == ds->ds_num);
-
- if (ce->last_time >= vl->time)
- {
- pthread_mutex_unlock (&cache_lock);
- NOTICE ("uc_insert: Value too old: name = %s; value time = %u; "
- "last cache update = %u;",
- name, (unsigned int) vl->time, (unsigned int) ce->last_time);
- return (-1);
- }
-
- if ((ce->last_time + ce->interval) < vl->time)
- {
- send_okay_notification = vl->time - ce->last_time;
- ce->state = STATE_OKAY;
- }
+ status = uc_insert (ds, vl, name);
+ pthread_mutex_unlock (&cache_lock);
+ return (status);
+ }
- for (i = 0; i < ds->ds_num; i++)
- {
- if (ds->ds[i].type == DS_TYPE_COUNTER)
- {
- counter_t diff;
-
- /* check if the counter has wrapped around */
- if (vl->values[i].counter < ce->values_counter[i])
- {
- if (ce->values_counter[i] <= 4294967295U)
- diff = (4294967295U - ce->values_counter[i])
- + vl->values[i].counter;
- else
- diff = (18446744073709551615ULL - ce->values_counter[i])
- + vl->values[i].counter;
- }
- else /* counter has NOT wrapped around */
- {
- diff = vl->values[i].counter - ce->values_counter[i];
- }
-
- ce->values_gauge[i] = ((double) diff)
- / ((double) (vl->time - ce->last_time));
- ce->values_counter[i] = vl->values[i].counter;
- }
- else /* if (ds->ds[i].type == DS_TYPE_GAUGE) */
- {
- ce->values_gauge[i] = vl->values[i].gauge;
- }
- DEBUG ("uc_insert: %s: ds[%i] = %lf", name, i, ce->values_gauge[i]);
- } /* for (i) */
+ assert (ce != NULL);
+ assert (ce->values_num == ds->ds_num);
- ce->last_time = vl->time;
- ce->last_update = time (NULL);
- ce->interval = vl->interval;
+ if (ce->last_time >= vl->time)
+ {
+ pthread_mutex_unlock (&cache_lock);
+ NOTICE ("uc_update: Value too old: name = %s; value time = %u; "
+ "last cache update = %u;",
+ name, (unsigned int) vl->time, (unsigned int) ce->last_time);
+ return (-1);
}
- else /* key is not found */
+
+ /* Send a notification (after the lock has been released) if we switch the
+ * state from something else to `okay'. */
+ if (ce->state == STATE_MISSING)
{
- int i;
- char *key;
-
- key = strdup (name);
- if (key == NULL)
- {
- pthread_mutex_unlock (&cache_lock);
- ERROR ("uc_insert: strdup failed.");
- return (-1);
- }
+ send_okay_notification = 1;
+ ce->state = STATE_OKAY;
+ update_delay = time (NULL) - ce->last_update;
+ }
- ce = cache_alloc (ds->ds_num);
- if (ce == NULL)
+ for (i = 0; i < ds->ds_num; i++)
+ {
+ if (ds->ds[i].type == DS_TYPE_COUNTER)
{
- pthread_mutex_unlock (&cache_lock);
- ERROR ("uc_insert: cache_alloc (%i) failed.", ds->ds_num);
- return (-1);
- }
+ counter_t diff;
- sstrncpy (ce->name, name, sizeof (ce->name));
-
- for (i = 0; i < ds->ds_num; i++)
- {
- if (ds->ds[i].type == DS_TYPE_COUNTER)
+ /* check if the counter has wrapped around */
+ if (vl->values[i].counter < ce->values_counter[i])
{
- ce->values_gauge[i] = NAN;
- ce->values_counter[i] = vl->values[i].counter;
+ if (ce->values_counter[i] <= 4294967295U)
+ diff = (4294967295U - ce->values_counter[i])
+ + vl->values[i].counter;
+ else
+ diff = (18446744073709551615ULL - ce->values_counter[i])
+ + vl->values[i].counter;
}
- else /* if (ds->ds[i].type == DS_TYPE_GAUGE) */
+ else /* counter has NOT wrapped around */
{
- ce->values_gauge[i] = vl->values[i].gauge;
+ diff = vl->values[i].counter - ce->values_counter[i];
}
- } /* for (i) */
-
- ce->last_time = vl->time;
- ce->last_update = time (NULL);
- ce->interval = vl->interval;
- ce->state = STATE_OKAY;
- if (c_avl_insert (cache_tree, key, ce) != 0)
+ ce->values_gauge[i] = ((double) diff)
+ / ((double) (vl->time - ce->last_time));
+ ce->values_counter[i] = vl->values[i].counter;
+ }
+ else /* if (ds->ds[i].type == DS_TYPE_GAUGE) */
{
- pthread_mutex_unlock (&cache_lock);
- ERROR ("uc_insert: c_avl_insert failed.");
- return (-1);
+ ce->values_gauge[i] = vl->values[i].gauge;
}
+ DEBUG ("uc_update: %s: ds[%i] = %lf", name, i, ce->values_gauge[i]);
+ } /* for (i) */
- DEBUG ("uc_insert: Added %s to the cache.", name);
- } /* if (key is not found) */
+ ce->last_time = vl->time;
+ ce->last_update = time (NULL);
+ ce->interval = vl->interval;
pthread_mutex_unlock (&cache_lock);
+ if (send_okay_notification == 0)
+ return (0);
+
/* Do not send okay notifications for uninteresting values, i. e. values for
* which no threshold is configured. */
- if (send_okay_notification > 0)
- {
- int status;
-
- status = ut_check_interesting (name);
- if (status <= 0)
- send_okay_notification = 0;
- }
-
- if (send_okay_notification > 0)
- {
- notification_t n;
- memset (&n, '\0', sizeof (n));
+ status = ut_check_interesting (name);
+ if (status <= 0)
+ return (0);
- /* Copy the associative members */
- NOTIFICATION_INIT_VL (&n, vl, ds);
+ /* Initialize the notification */
+ memset (&n, '\0', sizeof (n));
+ NOTIFICATION_INIT_VL (&n, vl, ds);
- n.severity = NOTIF_OKAY;
- n.time = vl->time;
+ n.severity = NOTIF_OKAY;
+ n.time = vl->time;
- snprintf (n.message, sizeof (n.message),
- "Received a value for %s. It was missing for %i seconds.",
- name, send_okay_notification);
- n.message[sizeof (n.message) - 1] = '\0';
+ snprintf (n.message, sizeof (n.message),
+ "Received a value for %s. It was missing for %u seconds.",
+ name, (unsigned int) update_delay);
+ n.message[sizeof (n.message) - 1] = '\0';
- plugin_dispatch_notification (&n);
- }
+ plugin_dispatch_notification (&n);
return (0);
- } /* int uc_insert */
+ } /* int uc_update */
-gauge_t *uc_get_rate (const data_set_t *ds, const value_list_t *vl)
+int uc_get_rate_by_name (const char *name, gauge_t **ret_values, size_t *ret_values_num)
{
- char name[6 * DATA_MAX_NAME_LEN];
gauge_t *ret = NULL;
+ size_t ret_num = 0;
cache_entry_t *ce = NULL;
+ int status = 0;
+ if (FORMAT_VL (name, sizeof (name), vl, ds) != 0)
+ {
+ ERROR ("uc_get_rate: FORMAT_VL failed.");
+ return (NULL);
+ }
+
pthread_mutex_lock (&cache_lock);
if (c_avl_get (cache_tree, name, (void *) &ce) == 0)
{
assert (ce != NULL);
- assert (ce->values_num == ds->ds_num);
- ret = (gauge_t *) malloc (ce->values_num * sizeof (gauge_t));
+ ret_num = ce->values_num;
+ ret = (gauge_t *) malloc (ret_num * sizeof (gauge_t));
if (ret == NULL)
{
- ERROR ("uc_get_rate: malloc failed.");
+ ERROR ("utils_cache: uc_get_rate_by_name: malloc failed.");
+ status = -1;
}
else
{
- memcpy (ret, ce->values_gauge, ce->values_num * sizeof (gauge_t));
+ memcpy (ret, ce->values_gauge, ret_num * sizeof (gauge_t));
}
}
+ else
+ {
+ DEBUG ("utils_cache: uc_get_rate_by_name: No such value: %s", name);
+ status = -1;
+ }
pthread_mutex_unlock (&cache_lock);
+ if (status == 0)
+ {
+ *ret_values = ret;
+ *ret_values_num = ret_num;
+ }
+
+ return (status);
+} /* gauge_t *uc_get_rate_by_name */
+
+gauge_t *uc_get_rate (const data_set_t *ds, const value_list_t *vl)
+{
+ char name[6 * DATA_MAX_NAME_LEN];
+ gauge_t *ret = NULL;
+ size_t ret_num = 0;
+ int status;
+
+ if (FORMAT_VL (name, sizeof (name), vl, ds) != 0)
+ {
+ ERROR ("uc_insert: FORMAT_VL failed.");
+ return (NULL);
+ }
+
+ status = uc_get_rate_by_name (name, &ret, &ret_num);
+ if (status != 0)
+ return (NULL);
+
+ /* This is important - the caller has no other way of knowing how many
+ * values are returned. */
+ if (ret_num != ds->ds_num)
+ {
+ ERROR ("utils_cache: uc_get_rate: ds[%s] has %i values, "
+ "but uc_get_rate_by_name returned %i.",
+ ds->type, ds->ds_num, ret_num);
+ sfree (ret);
+ return (NULL);
+ }
+
return (ret);
} /* gauge_t *uc_get_rate */
cache_entry_t *ce = NULL;
int ret = -1;
- if (state < STATE_OKAY)
- state = STATE_OKAY;
- if (state > STATE_ERROR)
- state = STATE_ERROR;
-
if (FORMAT_VL (name, sizeof (name), vl, ds) != 0)
{
ERROR ("uc_get_state: FORMAT_VL failed.");
diff --combined src/utils_cache.h
index ed6830b646b57cf370ec3868831a20f6492c6e89,51d9c031385d026da27203bb8d12b7c4c40c928a..9b6972afdb6a17099391038e89ecf7b4985374eb
--- 1/src/utils_cache.h
--- 2/src/utils_cache.h
+++ b/src/utils_cache.h
#include "plugin.h"
- #define STATE_OKAY 0
- #define STATE_WARNING 1
- #define STATE_ERROR 2
+ #define STATE_OKAY 0
+ #define STATE_WARNING 1
+ #define STATE_ERROR 2
+ #define STATE_MISSING 15
int uc_init (void);
int uc_check_timeout (void);
int uc_update (const data_set_t *ds, const value_list_t *vl);
+int uc_get_rate_by_name (const char *name, gauge_t **ret_values, size_t *ret_values_num);
gauge_t *uc_get_rate (const data_set_t *ds, const value_list_t *vl);
int uc_get_state (const data_set_t *ds, const value_list_t *vl);