From ff8752c6bbda5e81d5713300ac4ad60d162dedc3 Mon Sep 17 00:00:00 2001 From: Jeremy Katz Date: Wed, 26 Nov 2014 18:48:44 -0500 Subject: [PATCH] Add a plugin for monitoring zookeeper This adds a simple plugin to read data from Zookeeper's MNTR command to allow for easy monitoring of Zookeeper from collectd without requiring jmx --- configure.ac | 3 + src/Makefile.am | 6 + src/collectd.conf.in | 6 + src/types.db | 3 + src/zookeeper.c | 322 +++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 340 insertions(+) create mode 100644 src/zookeeper.c diff --git a/configure.ac b/configure.ac index 332ed521..bf11ae99 100644 --- a/configure.ac +++ b/configure.ac @@ -5140,6 +5140,7 @@ plugin_vmem="no" plugin_vserver="no" plugin_wireless="no" plugin_zfs_arc="no" +plugin_zookeeper="no" # Linux if test "x$ac_system" = "xLinux" @@ -5583,6 +5584,7 @@ AC_PLUGIN([write_riemann], [$have_protoc_c], [Riemann output plugin]) AC_PLUGIN([write_tsdb], [yes], [TSDB output plugin]) AC_PLUGIN([xmms], [$with_libxmms], [XMMS statistics]) AC_PLUGIN([zfs_arc], [$plugin_zfs_arc], [ZFS ARC statistics]) +AC_PLUGIN([zookeeper], [yes], [Zookeeper statistics]) dnl Default configuration file # Load either syslog or logfile @@ -5951,6 +5953,7 @@ Configuration: write_tsdb . . . . . $enable_write_tsdb xmms . . . . . . . . $enable_xmms zfs_arc . . . . . . . $enable_zfs_arc + zookeeper . . . . . . $enable_zookeeper EOF diff --git a/src/Makefile.am b/src/Makefile.am index e9428690..942e6a06 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1230,6 +1230,12 @@ endif endif endif +if BUILD_PLUGIN_ZOOKEEPER +pkglib_LTLIBRARIES += zookeeper.la +zookeeper_la_SOURCES = zookeeper.c +zookeeper_la_LDFLAGS = $(PLUGIN_LDFLAGS) +endif + BUILT_SOURCES += $(dist_man_MANS) dist_man_MANS = collectd.1 \ diff --git a/src/collectd.conf.in b/src/collectd.conf.in index a25cce55..2454045d 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -204,6 +204,7 @@ #@BUILD_PLUGIN_WRITE_TSDB_TRUE@LoadPlugin write_tsdb #@BUILD_PLUGIN_XMMS_TRUE@LoadPlugin xmms #@BUILD_PLUGIN_ZFS_ARC_TRUE@LoadPlugin zfs_arc +#@BUILD_PLUGIN_ZOOKEEPER_TRUE@LoadPlugin zookeeper ############################################################################## # Plugin configuration # @@ -1342,6 +1343,11 @@ # # +# +# Host "localhost" +# Port "2181" +# + ############################################################################## # Filter configuration # #----------------------------------------------------------------------------# diff --git a/src/types.db b/src/types.db index df54472b..2bf7a172 100644 --- a/src/types.db +++ b/src/types.db @@ -22,6 +22,7 @@ compression uncompressed:DERIVE:0:U, compressed:DERIVE:0:U connections value:DERIVE:0:U conntrack value:GAUGE:0:4294967295 contextswitch value:DERIVE:0:U +count value:GAUGE:0:U counter value:COUNTER:U:U cpufreq value:GAUGE:0:U cpu value:DERIVE:0:U @@ -131,6 +132,7 @@ node_stat value:DERIVE:0:U node_tx_rate value:GAUGE:0:127 objects value:GAUGE:0:U operations value:DERIVE:0:U +packets value:DERIVE:0:U pending_operations value:GAUGE:0:U percent value:GAUGE:0:100.1 percent_bytes value:GAUGE:0:100.1 @@ -220,6 +222,7 @@ voltage value:GAUGE:U:U vs_memory value:GAUGE:0:9223372036854775807 vs_processes value:GAUGE:0:65535 vs_threads value:GAUGE:0:65535 +zk_approximate_data_size value:GAUGE:0:U # # Legacy types diff --git a/src/zookeeper.c b/src/zookeeper.c new file mode 100644 index 00000000..e80ed4d2 --- /dev/null +++ b/src/zookeeper.c @@ -0,0 +1,322 @@ +/** + * collectd - src/zookeeper.c + * Copyright (C) 2014 Google, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * Authors: + * Jeremy Katz + **/ + +#include "collectd.h" +#include "common.h" +#include "plugin.h" + +#include +#include +#include +#include +#include + +#define ZOOKEEPER_DEF_HOST "127.0.0.1" +#define ZOOKEEPER_DEF_PORT "2181" + +static char *zk_host = NULL; +static char *zk_port = NULL; + +static const char *config_keys[] = +{ + "Host", + "Port" +}; +static int config_keys_num = STATIC_ARRAY_SIZE (config_keys); + +static int zookeeper_config(const char *key, const char *value) +{ + if (strncmp(key, "Host", strlen("Host")) == 0) + { + sfree (zk_host); + zk_host = strdup (value); + } + else if (strncmp(key, "Port", strlen("Port")) == 0) + { + sfree (zk_port); + zk_port = strdup (value); + } + else + { + return -1; + } + return 0; +} + +static void zookeeper_submit_gauge (const char * type, const char * type_inst, gauge_t val) +{ + value_t values[1]; + value_list_t vl = VALUE_LIST_INIT; + + values[0].gauge = val; + + vl.values = values; + vl.values_len = 1; + sstrncpy (vl.host, hostname_g, sizeof (vl.host)); + sstrncpy (vl.plugin, "zookeeper", sizeof (vl.plugin)); + sstrncpy (vl.type, type, sizeof (vl.type)); + if (type_inst != NULL) + sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance)); + + plugin_dispatch_values (&vl); +} /* zookeeper_submit_gauge */ + +static void zookeeper_submit_derive (const char * type, const char * type_inst, derive_t val) +{ + value_t values[1]; + value_list_t vl = VALUE_LIST_INIT; + + values[0].derive = val; + + vl.values = values; + vl.values_len = 1; + sstrncpy (vl.host, hostname_g, sizeof (vl.host)); + sstrncpy (vl.plugin, "zookeeper", sizeof (vl.plugin)); + sstrncpy (vl.type, type, sizeof (vl.type)); + if (type_inst != NULL) + sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance)); + + plugin_dispatch_values (&vl); +} /* zookeeper_submit_derive */ + +static int zookeeper_connect (void) +{ + int sk; + int status; + struct addrinfo ai_hints; + struct addrinfo *ai; + struct addrinfo *ai_list; + char *host; + char *port; + + memset ((void *) &ai_hints, '\0', sizeof (ai_hints)); + ai_hints.ai_family = AF_UNSPEC; + ai_hints.ai_socktype = SOCK_STREAM; + + host = (zk_host != NULL) ? zk_host : ZOOKEEPER_DEF_HOST; + port = (zk_port != NULL) ? zk_port : ZOOKEEPER_DEF_PORT; + status = getaddrinfo (host, port, &ai_hints, &ai_list); + if (status != 0) + { + char errbuf[1024]; + INFO ("getaddrinfo failed: %s", + (status == EAI_SYSTEM) + ? sstrerror (errno, errbuf, sizeof (errbuf)) + : gai_strerror (status)); + return (-1); + } + + for (ai = ai_list; ai != NULL; ai = ai->ai_next) + { + sk = socket (ai->ai_family, SOCK_STREAM, 0); + if (sk < 0) + { + char errbuf[1024]; + WARNING ("zookeeper: socket(2) failed: %s", + sstrerror (errno, errbuf, sizeof(errbuf))); + continue; + } + status = (int) connect (sk, ai->ai_addr, ai->ai_addrlen); + if (status != 0) + { + char errbuf[1024]; + close (sk); + sk = -1; + WARNING ("zookeeper: connect(2) failed: %s", + sstrerror (errno, errbuf, sizeof(errbuf))); + continue; + } + + /* connected */ + break; + } + + freeaddrinfo(ai_list); + return (sk); +} /* int zookeeper_connect */ + +static int zookeeper_query (char *buffer, size_t buffer_size) +{ + int sk = -1; + int status; + size_t buffer_fill; + + sk = zookeeper_connect(); + if (sk < 0) + { + ERROR ("zookeeper: Could not connect to daemon"); + return (-1); + } + + status = (int) swrite (sk, "mntr\r\n", strlen("mntr\r\n")); + if (status != 0) + { + char errbuf[1024]; + ERROR ("zookeeper: write(2) failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + close (sk); + return (-1); + } + + memset (buffer, 0, buffer_size); + buffer_fill = 0; + + while ((status = (int) recv (sk, buffer + buffer_fill, + buffer_size - buffer_fill, /* flags = */ 0)) != 0) + { + if (status < 0) + { + char errbuf[1024]; + if ((errno == EAGAIN) || (errno == EINTR)) + continue; + ERROR ("zookeeper: Error reading from socket: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + close (sk); + return (-1); + } + + buffer_fill += (size_t) status; + if (status == 0) + { + /* done reading from the socket */ + break; + } + } /* while (recv) */ + + status = 0; + if (buffer_fill == 0) + { + WARNING ("zookeeper: No data returned by MNTR command."); + status = -1; + } + + close(sk); + return (status); +} /* int zookeeper_query */ + + +static int zookeeper_read (void) { + char buf[4096]; + char *ptr; + char *save_ptr; + char *line; + char *fields[2]; + + if (zookeeper_query (buf, sizeof (buf)) < 0) + { + return (-1); + } + + ptr = buf; + save_ptr = NULL; + while ((line = strtok_r (ptr, "\n\r", &save_ptr)) != NULL) + { + ptr = NULL; + if (strsplit(line, fields, 2) != 2) + { + continue; + } +#define FIELD_CHECK(check, expected) \ + (strncmp (check, expected, strlen(expected)) == 0) + + if (FIELD_CHECK (fields[0], "zk_avg_latency")) + { + zookeeper_submit_gauge ("latency", "avg", atol(fields[1])); + } + else if (FIELD_CHECK(fields[0], "zk_min_latency")) + { + zookeeper_submit_gauge ("latency", "min", atol(fields[1])); + } + else if (FIELD_CHECK (fields[0], "zk_max_latency")) + { + zookeeper_submit_gauge ("latency", "max", atol(fields[1])); + } + else if (FIELD_CHECK (fields[0], "zk_packets_received")) + { + zookeeper_submit_derive ("packets", "received", atol(fields[1])); + } + else if (FIELD_CHECK (fields[0], "zk_packets_sent")) + { + zookeeper_submit_derive ("packets", "sent", atol(fields[1])); + } + else if (FIELD_CHECK (fields[0], "zk_num_alive_connections")) + { + zookeeper_submit_gauge ("current_connections", NULL, atol(fields[1])); + } + else if (FIELD_CHECK (fields[0], "zk_outstanding_requests")) + { + zookeeper_submit_gauge ("requests", "outstanding", atol(fields[1])); + } + else if (FIELD_CHECK (fields[0], "zk_znode_count")) + { + zookeeper_submit_gauge ("count", "znode", atol(fields[1])); + } + else if (FIELD_CHECK (fields[0], "zk_watch_count")) + { + zookeeper_submit_gauge ("count", "watch", atol(fields[1])); + } + else if (FIELD_CHECK (fields[0], "zk_ephemerals_count")) + { + zookeeper_submit_gauge ("count", "ephemerals", atol(fields[1])); + } + else if (FIELD_CHECK (fields[0], "zk_ephemerals_count")) + { + zookeeper_submit_gauge ("count", "ephemerals", atol(fields[1])); + } + else if (FIELD_CHECK (fields[0], "zk_ephemerals_count")) + { + zookeeper_submit_gauge ("count", "ephemerals", atol(fields[1])); + } + else if (FIELD_CHECK (fields[0], "zk_approximate_data_size")) + { + zookeeper_submit_gauge ("zk_approximate_data_size", NULL, atol(fields[1])); + } + else if (FIELD_CHECK (fields[0], "zk_followers")) + { + zookeeper_submit_gauge ("count", "followers", atol(fields[1])); + } + else if (FIELD_CHECK (fields[0], "zk_synced_followers")) + { + zookeeper_submit_gauge ("count", "synced_followers", atol(fields[1])); + } + else if (FIELD_CHECK (fields[0], "zk_pending_syncs")) + { + zookeeper_submit_gauge ("count", "pending_syncs", atol(fields[1])); + } + else + { + DEBUG("Uncollected zookeeper MNTR field %s", fields[0]); + } + } + + return (0); +} /* zookeeper_read */ + +void module_register (void) +{ + plugin_register_config ("zookeeper", zookeeper_config, config_keys, config_keys_num); + plugin_register_read ("zookeeper", zookeeper_read); +} /* void module_register */ -- 2.30.2