summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: 38dd976)
raw | patch | inline | side by side (parent: 38dd976)
author | Mytnyk, VolodymyrX <volodymyrx.mytnyk@intel.com> | |
Wed, 31 Aug 2016 16:55:10 +0000 (17:55 +0100) | ||
committer | Mytnyk, VolodymyrX <volodymyrx.mytnyk@intel.com> | |
Mon, 26 Dec 2016 13:26:05 +0000 (13:26 +0000) |
This plugin consists of two parts:
- OVS link
The implementation of the plugin itself, which uses
OVS utils API to be able to monitor a link status of
OVS connected interfaces and dispatch the values
through collectd notification mechanism whenever
the link state change occurs.
- OVS utils
This module implements the OVS DB communication routine
specified by RFC7047. It includes:
- Connecting/disconnecting to/from OVS DB (via TCP/UNIX);
- Mechanism to subscribe to OVS DB table events like
init/insert/modify/delete table rows;
- API to send custom request and receive result;
- Recovery connection mechanism with OVS DB;
- Handling of ECHO request to verify the liveness
of a database connection;
- Helpers functions.
Change-Id: Icac392bd1bd40f7dd156bfd2fc4ff08d9725a22f
Signed-off-by: Mytnyk, VolodymyrX <volodymyrx.mytnyk@intel.com>
- OVS link
The implementation of the plugin itself, which uses
OVS utils API to be able to monitor a link status of
OVS connected interfaces and dispatch the values
through collectd notification mechanism whenever
the link state change occurs.
- OVS utils
This module implements the OVS DB communication routine
specified by RFC7047. It includes:
- Connecting/disconnecting to/from OVS DB (via TCP/UNIX);
- Mechanism to subscribe to OVS DB table events like
init/insert/modify/delete table rows;
- API to send custom request and receive result;
- Recovery connection mechanism with OVS DB;
- Handling of ECHO request to verify the liveness
of a database connection;
- Helpers functions.
Change-Id: Icac392bd1bd40f7dd156bfd2fc4ff08d9725a22f
Signed-off-by: Mytnyk, VolodymyrX <volodymyrx.mytnyk@intel.com>
Makefile.am | patch | blob | history | |
README | patch | blob | history | |
configure.ac | patch | blob | history | |
src/collectd.conf.in | patch | blob | history | |
src/collectd.conf.pod | patch | blob | history | |
src/ovs_link.c | [new file with mode: 0644] | patch | blob |
src/utils_ovs.c | [new file with mode: 0644] | patch | blob |
src/utils_ovs.h | [new file with mode: 0644] | patch | blob |
diff --git a/Makefile.am b/Makefile.am
index f6554a1aab269c3396cec0babf76fd3e777c7469..d4d7cc02a35194d50e10789a0978f958fe186d27 100644 (file)
--- a/Makefile.am
+++ b/Makefile.am
oracle_la_LDFLAGS = $(PLUGIN_LDFLAGS)
endif
+if BUILD_PLUGIN_OVS_LINK
+pkglib_LTLIBRARIES += ovs_link.la
+ovs_link_la_SOURCES = ovs_link.c utils_ovs.c
+ovs_link_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBYAJL_LDFLAGS)
+ovs_link_la_CFLAGS = $(BUILD_WITH_LIBYAJL_CPPFLAGS)
+ovs_link_la_LIBADD = $(BUILD_WITH_LIBYAJL_LIBS)
+endif
+
if BUILD_PLUGIN_PERL
pkglib_LTLIBRARIES += perl.la
perl_la_SOURCES = src/perl.c
index 925f3645c120d25ffa304053cc399519e3880db3..16640ab14b7537f94207674c3a679348e6b8b3ed 100644 (file)
--- a/README
+++ b/README
- oracle
Query data from an Oracle database.
+ - ovs_link
+ The plugin monitors the link status of OVS connected interfaces and
+ dispatches the values through collectd notification mechanism. It
+ requires YAJL library to be installed.
+ Detailed instructions for installing and setting up Open vSwitch, see
+ OVS documentation.
+ <http://openvswitch.org/support/dist-docs/INSTALL.md.html>
+
- perl
The perl plugin implements a Perl-interpreter into collectd. You can
write your own plugins in Perl and return arbitrary values using this
<http://www.xmms.org/>
* libyajl (optional)
- Parse JSON data. This is needed for the `ceph', `curl_json' and
+ Parse JSON data. This is needed for the `ceph', `curl_json', 'ovs_link' and
`log_logstash' plugins.
<http://github.com/lloyd/yajl>
diff --git a/configure.ac b/configure.ac
index 2d7f3e2a51841061762c03df6e1e47471da4c9c1..9243141550130d9b85bd3abe04527297e230f906 100644 (file)
--- a/configure.ac
+++ b/configure.ac
AC_PLUGIN([openldap], [$with_libldap], [OpenLDAP statistics])
AC_PLUGIN([openvpn], [yes], [OpenVPN client statistics])
AC_PLUGIN([oracle], [$with_oracle], [Oracle plugin])
+AC_PLUGIN([ovs_link], [$with_libyajl], [OVS link status plugin])
AC_PLUGIN([perl], [$plugin_perl], [Embed a Perl interpreter])
AC_PLUGIN([pf], [$have_net_pfvar_h], [BSD packet filter (PF) statistics])
# FIXME: Check for libevent, too.
AC_MSG_RESULT([ openldap . . . . . . $enable_openldap])
AC_MSG_RESULT([ openvpn . . . . . . . $enable_openvpn])
AC_MSG_RESULT([ oracle . . . . . . . $enable_oracle])
+AC_MSG_RESULT([ ovs_link . . . . . . $enable_ovs_link])
AC_MSG_RESULT([ perl . . . . . . . . $enable_perl])
AC_MSG_RESULT([ pf . . . . . . . . . $enable_pf])
AC_MSG_RESULT([ pinba . . . . . . . . $enable_pinba])
diff --git a/src/collectd.conf.in b/src/collectd.conf.in
index 62b245606e199fc3eb20050f7600cdda78979f57..4b9382522c0803bb25336d6c3d9b36e446c159a5 100644 (file)
--- a/src/collectd.conf.in
+++ b/src/collectd.conf.in
#@BUILD_PLUGIN_OPENLDAP_TRUE@LoadPlugin openldap
#@BUILD_PLUGIN_OPENVPN_TRUE@LoadPlugin openvpn
#@BUILD_PLUGIN_ORACLE_TRUE@LoadPlugin oracle
+#@BUILD_PLUGIN_OVS_LINK_TRUE@LoadPlugin ovs_link
#@BUILD_PLUGIN_PERL_TRUE@LoadPlugin perl
#@BUILD_PLUGIN_PINBA_TRUE@LoadPlugin pinba
#@BUILD_PLUGIN_PING_TRUE@LoadPlugin ping
# </Database>
#</Plugin>
+#<Plugin ovs_link>
+# OvsDbServerUrl "tcp:127.0.0.1:6640"
+# Interfaces "br0" "veth0"
+#</Plugin>
+
#<Plugin perl>
# IncludeDir "/my/include/path"
# BaseName "Collectd::Plugins"
diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod
index 2f29713a70d97e9a24ad72e4806f307e22fba0cb..e814af5bf180017113b7d2f55906544534d1ac96 100644 (file)
--- a/src/collectd.conf.pod
+++ b/src/collectd.conf.pod
=back
+=head2 Plugin C<ovs_link>
+
+
+The I<ovs_link> plugin monitors the link status of OVS connected interfaces and
+dispatches the values through collectd notification mechanism whenever the link
+state change occurs. This plugin uses OVSDB to get a link state change
+notification.
+
+B<Synopsis:>
+
+ <Plugin "ovs_link">
+ OvsDbServerUrl "tcp:127.0.0.1:6640"
+ Interfaces "br0" "veth0"
+ </Plugin>
+
+The plugin provides the following configuration options:
+
+=over 4
+
+=item B<OvsDbServerUrl> I<server>
+
+The URL is an address of OVS DB server JSON-RPC interface used by the plugin.
+To enable the interface, OVS DB daemon should be running with '--remote=ptcp:'
+or '--remote=punix:' option. See L<ovsdb-server(1)> for more details. The URL
+must take one of the following forms:
+
+=over 4
+
+=item B<tcp:>I<ip>:I<port>
+
+Connect to the given tcp I<port> on I<ip>, where I<ip> is IPv4 address
+of OVS DB server which is listening on TCP I<port> for incoming
+JSON-RPC client connection.
+
+=item B<unix:>I<file>
+
+Connect to the unix domain server socket named I<file> which is
+used by OVS DB for incoming JSON-RPC client connection.
+
+=back
+
+Default: C<tcp:127.0.0.1:6640>
+
+=item B<Interfaces> [I<ifname> ...]
+
+List of interface names to be monitored by this plugin. If this option is missed
+or it's empty then all OVS connected interfaces on all bridges are monitored.
+
+Default: empty (all interfaces on all bridges are monitored)
+
+=back
+
=head2 Plugin C<perl>
This plugin embeds a Perl-interpreter into collectd and provides an interface
diff --git a/src/ovs_link.c b/src/ovs_link.c
--- /dev/null
+++ b/src/ovs_link.c
@@ -0,0 +1,358 @@
+/**
+ * collectd - src/ovs_link.c
+ *
+ * Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * this software and associated documentation files (the "Software"), to deal in
+ * the Software without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
+ * of the Software, and to permit persons to whom the Software is furnished to do
+ * so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ * Authors:
+ * Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
+ **/
+
+#include "common.h" /* auxiliary functions */
+#include "utils_ovs.h" /* OVS helpers */
+
+#define OVS_LINK_PLUGIN "ovs_link"
+#define OVS_LINK_DEFAULT_OVS_DB_SERVER_URL "tcp:127.0.0.1:6640"
+#define CONFIG_LOCK for (int __i = config_lock(); __i != 0 ; \
+ __i = config_unlock())
+
+struct interface_s {
+ char *name; /* interface name */
+ struct interface_s *next; /* next interface name */
+};
+typedef struct interface_s interface_t;
+
+struct ovs_link_config_s {
+ pthread_mutex_t mutex; /* mutex to lock the config structure */
+ char *ovs_db_server_url; /* OVS DB server URL */
+ ovs_db_t *ovs_db; /* pointer to OVS DB instance */
+ interface_t *ifaces; /* interface names */
+};
+typedef struct ovs_link_config_s ovs_link_config_t;
+
+/*
+ * Private variables
+ */
+ovs_link_config_t config = {PTHREAD_MUTEX_INITIALIZER, NULL, NULL, NULL};
+
+/* This function is used only by "CONFIG_LOCK" defined above.
+ * It always returns 1 when the config is locked.
+ */
+static inline int
+config_lock()
+{
+ pthread_mutex_lock(&config.mutex);
+ return (1);
+}
+
+/* This function is used only by "CONFIG_LOCK" defined above.
+ * It always returns 0 when config is unlocked.
+ */
+static inline int
+config_unlock()
+{
+ pthread_mutex_unlock(&config.mutex);
+ return (0);
+}
+
+/* Check if given interface name exists in configuration file. It
+ * returns 1 if exists otherwise 0. If no interfaces are configured,
+ * 1 is returned
+ */
+static int
+ovs_link_config_iface_exists(const char *ifname)
+{
+ int rc = 0;
+ CONFIG_LOCK {
+ if (!(rc = (config.ifaces == NULL))) {
+ for (interface_t *iface = config.ifaces; iface; iface = iface->next)
+ if (rc = (strcmp(ifname, iface->name) == 0))
+ break;
+ }
+ }
+ return rc;
+}
+
+/* Release memory allocated for configuration data */
+static void
+ovs_link_config_free()
+{
+ interface_t *del_iface = NULL;
+ CONFIG_LOCK {
+ sfree(config.ovs_db_server_url);
+ while (config.ifaces) {
+ del_iface = config.ifaces;
+ config.ifaces = config.ifaces->next;
+ free(del_iface->name);
+ free(del_iface);
+ }
+ }
+}
+
+/* Parse plugin configuration file and store the config
+ * in allocated memory. Returns negative value in case of error.
+ */
+static int
+ovs_link_plugin_config(oconfig_item_t *ci)
+{
+ interface_t *new_iface;
+ char *if_name;
+ char *ovs_db_url;
+
+ for (int i = 0; i < ci->children_num; i++) {
+ oconfig_item_t *child = ci->children + i;
+ if (strcasecmp("OvsDbServerUrl", child->key) == 0) {
+ if (cf_util_get_string(child, &ovs_db_url) < 0) {
+ ERROR(OVS_LINK_PLUGIN ": parse '%s' option failed", child->key);
+ goto failure;
+ } else
+ config.ovs_db_server_url = ovs_db_url;
+ } else if (strcasecmp("Interfaces", child->key) == 0) {
+ for (int j = 0; j < child->values_num; j++) {
+ /* check value type */
+ if (child->values[j].type != OCONFIG_TYPE_STRING) {
+ ERROR(OVS_LINK_PLUGIN
+ ": given interface name is not a string [idx=%d]", j);
+ goto failure;
+ }
+
+ /* get value */
+ if ((if_name = strdup(child->values[j].value.string)) == NULL) {
+ ERROR(OVS_LINK_PLUGIN " strdup() copy interface name fail");
+ goto failure;
+ }
+
+ if ((new_iface = malloc(sizeof(*new_iface))) == NULL) {
+ ERROR(OVS_LINK_PLUGIN ": malloc () copy interface name fail");
+ goto failure;
+ } else {
+ /* store interface name */
+ new_iface->name = if_name;
+ new_iface->next = config.ifaces;
+ CONFIG_LOCK {
+ config.ifaces = new_iface;
+ }
+ DEBUG(OVS_LINK_PLUGIN ": found monitored interface \"%s\"",
+ if_name);
+ }
+ }
+ } else {
+ ERROR(OVS_LINK_PLUGIN ": option '%s' is not allowed here", child->key);
+ goto failure;
+ }
+ }
+ return (0);
+
+failure:
+ ovs_link_config_free();
+ return (-1);
+}
+
+/* Dispatch OVS interface link status event to collectd */
+static int
+ovs_link_dispatch_notification(const char *link_name, const char *link_state)
+{
+ notification_t n = {NOTIF_FAILURE, time(NULL), "", "", OVS_LINK_PLUGIN,
+ "", "", "", NULL};
+
+ /* fill the notification data */
+ if (link_state != NULL)
+ n.severity = ((strcmp(link_state, "up") == 0) ?
+ NOTIF_OKAY : NOTIF_WARNING);
+ else
+ link_state = "UNKNOWN";
+
+ sstrncpy(n.host, hostname_g, sizeof(n.host));
+ ssnprintf(n.message, sizeof(n.message),
+ "link state of \"%s\" interface has been changed to \"%s\"",
+ link_name, link_state);
+
+ /* send the notification */
+ return plugin_dispatch_notification(&n);
+}
+
+/* Process OVS DB update table event. It handles link status update event(s)
+ * and dispatches the value(s) to collectd if interface name matches one of
+ * interfaces specified in configuration file.
+ */
+static void
+ovs_link_table_update_cb(yajl_val jupdates)
+{
+ yajl_val jnew_val = NULL;
+ yajl_val jupdate = NULL;
+ yajl_val jrow_update = NULL;
+ yajl_val jlink_name = NULL;
+ yajl_val jlink_state = NULL;
+ const char *link_name = NULL;
+
+ /* JSON "Interface" table update example:
+ * ---------------------------------
+ * {"Interface":
+ * {
+ * "9adf1db2-29ca-4140-ab22-ae347a4484de":
+ * {
+ * "new":
+ * {
+ * "name":"br0",
+ * "link_state":"up"
+ * },
+ * "old":
+ * {
+ * "link_state":"down"
+ * }
+ * }
+ * }
+ * }
+ */
+ if (!YAJL_IS_OBJECT(jupdates) || !(YAJL_GET_OBJECT(jupdates)->len > 0)) {
+ ERROR(OVS_LINK_PLUGIN ": unexpected OVS DB update event received");
+ return;
+ }
+ /* verify if this is a table event */
+ jupdate = YAJL_GET_OBJECT(jupdates)->values[0];
+ if (!YAJL_IS_OBJECT(jupdate)) {
+ ERROR(OVS_LINK_PLUGIN ": unexpected table update event received");
+ return;
+ }
+ /* go through all row updates */
+ for (int row_index = 0; row_index < YAJL_GET_OBJECT(jupdate)->len;
+ ++row_index) {
+ jrow_update = YAJL_GET_OBJECT(jupdate)->values[row_index];
+
+ /* check row update */
+ jnew_val = ovs_utils_get_value_by_key(jrow_update, "new");
+ if (jnew_val == NULL) {
+ ERROR(OVS_LINK_PLUGIN ": unexpected row update received");
+ return;
+ }
+ /* get link status update */
+ jlink_name = ovs_utils_get_value_by_key(jnew_val, "name");
+ jlink_state = ovs_utils_get_value_by_key(jnew_val, "link_state");
+ if (jlink_name && jlink_state) {
+ link_name = YAJL_GET_STRING(jlink_name);
+ if (link_name && ovs_link_config_iface_exists(link_name)) {
+ /* dispatch notification */
+ ovs_link_dispatch_notification(link_name,
+ YAJL_GET_STRING(jlink_state));
+ }
+ }
+ }
+}
+
+/* Process OVS DB result table callback. It handles init link status value
+ * and dispatches the value(s) to collectd. The logic to handle init status
+ * is same as 'ovs_link_table_update_cb'.
+ */
+static void
+ovs_link_table_result_cb(yajl_val jresult, yajl_val jerror)
+{
+ (void)jerror;
+ /* jerror is not used as it is the same all the time
+ (rfc7047, "Monitor" section, return value) */
+ ovs_link_table_update_cb(jresult);
+}
+
+/* Setup OVS DB table callback. It subscribes to 'Interface' tables
+ * to receive link status events.
+ */
+static void
+ovs_link_initialize(ovs_db_t *pdb)
+{
+ int ret = 0;
+ const char tb_name[] = "Interface";
+ const char *columns[] = {"name", "link_state", NULL};
+
+ /* register the update callback */
+ ret = ovs_db_table_cb_register(pdb, tb_name, columns,
+ ovs_link_table_update_cb,
+ ovs_link_table_result_cb,
+ OVS_DB_TABLE_CB_FLAG_MODIFY |
+ OVS_DB_TABLE_CB_FLAG_INITIAL);
+ if (ret < 0) {
+ ERROR(OVS_LINK_PLUGIN ": register OVS DB update callback failed");
+ return;
+ }
+
+ DEBUG(OVS_LINK_PLUGIN ": OVS DB has been initialized");
+}
+
+/* Set default config values (update config) if some of them aren't
+ * specified in configuration file
+ */
+static inline int
+ovs_link_config_set_default()
+{
+ if (!config.ovs_db_server_url)
+ config.ovs_db_server_url = strdup(OVS_LINK_DEFAULT_OVS_DB_SERVER_URL);
+ return (config.ovs_db_server_url == NULL);
+}
+
+/* Initialize OVS plugin */
+static int
+ovs_link_plugin_init(void)
+{
+ ovs_db_t *ovs_db = NULL;
+ ovs_db_callback_t cb = {.init_cb = ovs_link_initialize};
+
+ if (ovs_link_config_set_default()) {
+ ERROR(OVS_LINK_PLUGIN ": fail to make configuration");
+ ovs_link_config_free();
+ return (-1);
+ }
+
+ /* initialize OVS DB */
+ if ((ovs_db = ovs_db_init(config.ovs_db_server_url, &cb)) == NULL) {
+ ERROR(OVS_LINK_PLUGIN ": fail to connect to OVS DB server");
+ ovs_link_config_free();
+ return (-1);
+ }
+
+ /* store OVSDB handler */
+ CONFIG_LOCK {
+ config.ovs_db = ovs_db;
+ }
+
+ DEBUG(OVS_LINK_PLUGIN ": plugin has been initialized");
+ return (0);
+}
+
+/* Shutdown OVS plugin */
+static int
+ovs_link_plugin_shutdown(void)
+{
+ /* release memory allocated for config */
+ ovs_link_config_free();
+
+ /* destroy OVS DB */
+ if (ovs_db_destroy(config.ovs_db))
+ ERROR(OVS_LINK_PLUGIN ": OVSDB object destroy failed");
+
+ DEBUG(OVS_LINK_PLUGIN ": plugin has been destroyed");
+ return (0);
+}
+
+/* Register OVS plugin callbacks */
+void
+module_register(void)
+{
+ plugin_register_complex_config(OVS_LINK_PLUGIN, ovs_link_plugin_config);
+ plugin_register_init(OVS_LINK_PLUGIN, ovs_link_plugin_init);
+ plugin_register_shutdown(OVS_LINK_PLUGIN, ovs_link_plugin_shutdown);
+}
diff --git a/src/utils_ovs.c b/src/utils_ovs.c
--- /dev/null
+++ b/src/utils_ovs.c
@@ -0,0 +1,1284 @@
+/**
+ * collectd - src/utils_ovs.c
+ *
+ * Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * this software and associated documentation files (the "Software"), to deal in
+ * the Software without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
+ * of the Software, and to permit persons to whom the Software is furnished to do
+ * so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ * Authors:
+ * Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
+ *
+ * OVS DB API internal architecture diagram
+ * +------------------------------------------------------------------------------+
+ * |OVS plugin |OVS utils |
+ * | | +------------------------+ |
+ * | | | echo handler | JSON request/ |
+ * | | +--+ (ovs_db_table_echo_cb) +<---+---------+ update event/ |
+ * | | | | | | | result |
+ * | | | +------------------------+ | | |
+ * | | | | +----+---+--------+ |
+ * | +----------+ | | +------------------------+ | | | | |
+ * | | update | | | | update handler | | | YAJL | JSON | |
+ * | | callback +<-------+(ovs_db_table_update_cp)+<---+ | parser | reader | |
+ * | +----------+ | | | | | | | | |
+ * | | | +------------------------+ | +--------+---+----+ |
+ * | | | | ^ |
+ * | +----------+ | | +------------------------+ | | |
+ * | | result | | | | result handler | | | |
+ * | | callback +<-------+ (ovs_db_result_cb) +<---+ JSON raw | |
+ * | +----------+ | | | | data | |
+ * | | | +------------------------+ | |
+ * | | | | |
+ * | | | +------------------+ +------------+----+ |
+ * | +----------+ | | |thread| | |thread| | |
+ * | | init | | | | | reconnect | | |
+ * | | callback +<---------+ EVENT WORKER +<------------+ POLL WORKER | |
+ * | +----------+ | | +------------------+ +--------+--------+ |
+ * | | | ^ |
+ * +----------------+-------------------------------------------------------------+
+ * | |
+ * JSON|echo reply raw|data
+ * v v
+ * +-------------------+----------------------------------------------+-----------+
+ * | TCP/UNIX socket |
+ * +-------------------------------------------------------------------------------
+ *
+ **/
+
+/* collectd headers */
+#include "common.h"
+
+/* private headers */
+#include "utils_ovs.h"
+
+/* system libraries */
+#include <semaphore.h>
+#include <arpa/inet.h>
+#include <poll.h>
+#include <sys/un.h>
+
+#define OVS_ERROR(fmt, ...) do { \
+ ERROR("ovs_utils: "fmt, ## __VA_ARGS__); } while (0)
+#define OVS_DEBUG(fmt, ...) do { \
+ DEBUG("%s:%d:%s(): "fmt, __FILE__, __LINE__, __FUNCTION__, \
+ ## __VA_ARGS__); } while (0)
+
+#define OVS_DB_POLL_TIMEOUT 1 /* poll receive timeout (sec) */
+#define OVS_DB_POLL_READ_BLOCK_SIZE 5 /* read block size (bytes) */
+#define OVS_DB_DEFAULT_DB_NAME "Open_vSwitch"
+#define OVS_DB_RECONNECT_TIMEOUT 1 /* reconnect timeout (sec) */
+
+#define OVS_DB_EVENT_TIMEOUT 5 /* event thread timeout (sec) */
+#define OVS_DB_EVENT_TERMINATE 1
+#define OVS_DB_EVENT_CONNECTED 2
+
+#define OVS_DB_POLL_STATE_RUNNING 1
+#define OVS_DB_POLL_STATE_EXITING 2
+
+#define OVS_DB_SEND_REQ_TIMEOUT 5 /* send request timeout (sec) */
+
+#define OVS_YAJL_CALL(func, ...) \
+ do { \
+ yajl_gen_ret = yajl_gen_status_ok; \
+ if ((yajl_gen_ret = func(__VA_ARGS__)) != yajl_gen_status_ok) \
+ goto yajl_gen_failure; \
+ } while (0)
+#define OVS_YAJL_ERROR_BUFFER_SIZE 1024
+#define OVS_ERROR_BUFF_SIZE 512
+#define OVS_UID_STR_SIZE 17 /* 64-bit HEX string len + '\0' */
+
+/* JSON reader internal data */
+struct ovs_json_reader_s {
+ char *buff_ptr;
+ size_t buff_size;
+ size_t buff_offset;
+ size_t json_offset;
+};
+typedef struct ovs_json_reader_s ovs_json_reader_t;
+
+/* Result callback declaration */
+struct ovs_result_cb_s {
+ sem_t sync;
+ ovs_db_result_cb_t call;
+};
+typedef struct ovs_result_cb_s ovs_result_cb_t;
+
+/* Table callback declaration */
+struct ovs_table_cb_s {
+ ovs_db_table_cb_t call;
+};
+typedef struct ovs_table_cb_s ovs_table_cb_t;
+
+/* Callback declaration */
+struct ovs_callback_s {
+ uint64_t uid;
+ union {
+ ovs_result_cb_t result;
+ ovs_table_cb_t table;
+ };
+ struct ovs_callback_s *next;
+ struct ovs_callback_s *prev;
+};
+typedef struct ovs_callback_s ovs_callback_t;
+
+/* Connection declaration */
+struct ovs_conn_s {
+ int sock;
+ int domain;
+ int type;
+ int addr_size;
+ union {
+ struct sockaddr_in s_inet;
+ struct sockaddr_un s_unix;
+ } addr;
+};
+typedef struct ovs_conn_s ovs_conn_t;
+
+/* Event thread data declaration */
+struct ovs_event_thread_s {
+ pthread_t tid;
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+ int value;
+};
+typedef struct ovs_event_thread_s ovs_event_thread_t;
+
+/* Poll thread data declaration */
+struct ovs_poll_thread_s {
+ pthread_t tid;
+ pthread_mutex_t mutex;
+ int state;
+};
+typedef struct ovs_poll_thread_s ovs_poll_thread_t;
+
+/* OVS DB internal data declaration */
+struct ovs_db_s {
+ ovs_poll_thread_t poll_thread;
+ ovs_event_thread_t event_thread;
+ pthread_mutex_t mutex;
+ ovs_callback_t *cb;
+ ovs_conn_t conn;
+ ovs_db_init_cb_t init_cb;
+};
+typedef struct ovs_db_s ovs_db_t;
+
+/* Post an event to event thread.
+ * Possible events are:
+ * OVS_DB_EVENT_TERMINATE
+ * OVS_DB_EVENT_CONNECTED
+ */
+static void
+ovs_db_event_post(ovs_db_t *pdb, int event)
+{
+ pthread_mutex_lock(&pdb->event_thread.mutex);
+ pdb->event_thread.value = event;
+ pthread_mutex_unlock(&pdb->event_thread.mutex);
+ pthread_cond_signal(&pdb->event_thread.cond);
+}
+
+/* Check if POLL thread is still running. Returns
+ * 1 if running otherwise 0 is returned */
+static inline int
+ovs_db_poll_is_running(ovs_db_t *pdb)
+{
+ int state = 0;
+ pthread_mutex_lock(&pdb->poll_thread.mutex);
+ state = pdb->poll_thread.state;
+ pthread_mutex_unlock(&pdb->poll_thread.mutex);
+ return (state == OVS_DB_POLL_STATE_RUNNING);
+}
+
+/* Terminate POLL thread */
+static inline void
+ovs_db_poll_terminate(ovs_db_t *pdb)
+{
+ pthread_mutex_lock(&pdb->poll_thread.mutex);
+ pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING;
+ pthread_mutex_unlock(&pdb->poll_thread.mutex);
+}
+
+/* Generate unique identifier (UID). It is used by OVS DB API
+ * to set "id" field for any OVS DB JSON request. */
+static uint64_t
+ovs_uid_generate()
+{
+ struct timespec ts;
+ clock_gettime(CLOCK_MONOTONIC, &ts);
+ return ((ts.tv_sec << 32) | (ts.tv_nsec & UINT32_MAX));
+}
+
+/*
+ * Callback API. These function are used to store
+ * registered callbacks in OVS DB API.
+ */
+
+/* Add new callback into OVS DB object */
+static void
+ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb)
+{
+ pthread_mutex_lock(&pdb->mutex);
+ if (pdb->cb)
+ pdb->cb->prev = new_cb;
+ new_cb->next = pdb->cb;
+ new_cb->prev = NULL;
+ pdb->cb = new_cb;
+ pthread_mutex_unlock(&pdb->mutex);
+}
+
+/* Remove callback from OVS DB object */
+static void
+ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb)
+{
+ ovs_callback_t *pre_cb = del_cb->prev;
+ ovs_callback_t *next_cb = del_cb->next;
+
+ pthread_mutex_lock(&pdb->mutex);
+ if (next_cb)
+ next_cb->prev = del_cb->prev;
+
+ if (pre_cb)
+ pre_cb->next = del_cb->next;
+ else
+ pdb->cb = del_cb->next;
+
+ free(del_cb);
+ pthread_mutex_unlock(&pdb->mutex);
+}
+
+/* Remove all callbacks form OVS DB object */
+static void
+ovs_db_callback_remove_all(ovs_db_t *pdb)
+{
+ pthread_mutex_lock(&pdb->mutex);
+ for (ovs_callback_t *del_cb = pdb->cb; pdb->cb; del_cb = pdb->cb) {
+ pdb->cb = pdb->cb->next;
+ free(del_cb);
+ }
+ pdb->cb = NULL;
+ pthread_mutex_unlock(&pdb->mutex);
+}
+
+/* Get/find callback in OVS DB object by UID. Returns pointer
+ * to requested callback otherwise NULL is returned */
+static ovs_callback_t *
+ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid)
+{
+ pthread_mutex_lock(&pdb->mutex);
+ for (ovs_callback_t *cb = pdb->cb; cb != NULL; cb = cb->next)
+ if (cb->uid == uid) {
+ pthread_mutex_unlock(&pdb->mutex);
+ return cb;
+ }
+ pthread_mutex_unlock(&pdb->mutex);
+ return NULL;
+}
+
+/* Send all requested data to the socket. Returns 0 if
+ * ALL request data has been sent otherwise negative value
+ * is returned */
+static int
+ovs_db_data_send(const ovs_db_t *pdb, const char *data, size_t len)
+{
+ ssize_t nbytes = 0;
+ size_t rem = len;
+ size_t off = 0;
+
+ while (rem > 0) {
+ if ((nbytes = send(pdb->conn.sock, data + off, rem, 0)) <= 0)
+ return (-1);
+ rem -= (size_t)nbytes;
+ off += (size_t)nbytes;
+ }
+ return (0);
+}
+
+/* Parse OVS server URL.
+ * Format of the URL:
+ * "tcp:a.b.c.d:port" - define TCP connection (INET domain)
+ * "unix:file" - define UNIX socket file (UNIX domain)
+ */
+static int
+ovs_db_url_parse(const char *surl, ovs_conn_t *conn)
+{
+ ovs_conn_t tmp_conn;
+ char *nexttok = NULL;
+ char *in_str = NULL;
+ char *saveptr;
+ int ret = 0;
+
+ /* sanity check */
+ if ((surl == NULL) || (strlen(surl) < 1))
+ return (-1);
+
+ /* parse domain */
+ tmp_conn = *conn;
+ in_str = sstrdup(surl);
+ if ((nexttok = strtok_r(in_str, ":", &saveptr)) != NULL) {
+ if (strcmp("tcp", nexttok) == 0) {
+ tmp_conn.domain = AF_INET;
+ tmp_conn.type = SOCK_STREAM;
+ tmp_conn.addr_size = sizeof(tmp_conn.addr.s_inet);
+ } else if (strcmp("unix", nexttok) == 0) {
+ tmp_conn.domain = AF_UNIX;
+ tmp_conn.type = SOCK_STREAM;
+ tmp_conn.addr_size = sizeof(tmp_conn.addr.s_unix);
+ } else
+ goto failure;
+ } else
+ goto failure;
+
+ /* parse url depending on domain */
+ if ((nexttok = strtok_r(NULL, ":", &saveptr)) != NULL) {
+ if (tmp_conn.domain == AF_UNIX) {
+ /* <UNIX-NAME> */
+ tmp_conn.addr.s_inet.sin_family = AF_UNIX;
+ sstrncpy(tmp_conn.addr.s_unix.sun_path, nexttok, strlen(nexttok) + 1);
+ } else {
+ /* <IP:PORT> */
+ tmp_conn.addr.s_inet.sin_family = AF_INET;
+ ret =
+ inet_pton(AF_INET, nexttok, (void *)&tmp_conn.addr.s_inet.sin_addr);
+ if (ret == 1) {
+ if ((nexttok = strtok_r(NULL, ":", &saveptr)) != NULL)
+ tmp_conn.addr.s_inet.sin_port = htons(atoi(nexttok));
+ else
+ goto failure;
+ } else
+ goto failure;
+ }
+ }
+
+ /* save result and return success */
+ *conn = tmp_conn;
+ sfree(in_str);
+ return (0);
+
+failure:
+ OVS_ERROR("%s() : invalid OVS DB URL provided");
+ sfree(in_str);
+ return (-1);
+}
+
+/*
+ * YAJL (Yet Another JSON Library) helper functions
+ * Documentation (https://lloyd.github.io/yajl/)
+ */
+
+/* Add null-terminated string into YAJL generator handle (JSON object).
+ * Similar function to yajl_gen_string() but takes null-terminated string
+ * instead of string and its length.
+ *
+ * jgen - YAJL generator handle allocated by yajl_gen_alloc()
+ * string - Null-terminated string
+ */
+static inline yajl_gen_status
+ovs_yajl_gen_tstring(yajl_gen hander, const char *string)
+{
+ return yajl_gen_string(hander, string, strlen(string));
+}
+
+/* Add YAJL value into YAJL generator handle (JSON object)
+ *
+ * jgen - YAJL generator handle allocated by yajl_gen_alloc()
+ * jval - YAJL value usually returned by yajl_tree_get()
+ */
+static yajl_gen_status
+ovs_yajl_gen_val(yajl_gen jgen, yajl_val jval)
+{
+ size_t array_len = 0;
+ yajl_val *jvalues = NULL;
+ yajl_val jobj_value = NULL;
+ const char *obj_key = NULL;
+ size_t obj_len = 0;
+ yajl_gen_status yajl_gen_ret;
+
+ if (YAJL_IS_STRING(jval))
+ OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, YAJL_GET_STRING(jval));
+ else if (YAJL_IS_DOUBLE(jval))
+ OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_DOUBLE(jval));
+ else if (YAJL_IS_INTEGER(jval))
+ OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_INTEGER(jval));
+ else if (YAJL_IS_TRUE(jval))
+ OVS_YAJL_CALL(yajl_gen_bool, jgen, 1);
+ else if (YAJL_IS_FALSE(jval))
+ OVS_YAJL_CALL(yajl_gen_bool, jgen, 0);
+ else if (YAJL_IS_NULL(jval))
+ OVS_YAJL_CALL(yajl_gen_null, jgen);
+ else if (YAJL_IS_ARRAY(jval)) {
+ /* create new array and add all elements into the array */
+ array_len = YAJL_GET_ARRAY(jval)->len;
+ jvalues = YAJL_GET_ARRAY(jval)->values;
+ OVS_YAJL_CALL(yajl_gen_array_open, jgen);
+ for (int i = 0; i < array_len; i++)
+ OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jvalues[i]);
+ OVS_YAJL_CALL(yajl_gen_array_close, jgen);
+ } else if (YAJL_IS_OBJECT(jval)) {
+ /* create new object and add all elements into the object */
+ OVS_YAJL_CALL(yajl_gen_map_open, jgen);
+ obj_len = YAJL_GET_OBJECT(jval)->len;
+ for (int i = 0; i < obj_len; i++) {
+ obj_key = YAJL_GET_OBJECT(jval)->keys[i];
+ jobj_value = YAJL_GET_OBJECT(jval)->values[i];
+ OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, obj_key);
+ OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jobj_value);
+ }
+ OVS_YAJL_CALL(yajl_gen_map_close, jgen);
+ } else {
+ OVS_ERROR("%s() unsupported value type %d (skip)", __FUNCTION__,
+ (int)(jval)->type);
+ goto yajl_gen_failure;
+ }
+ return yajl_gen_status_ok;
+
+yajl_gen_failure:
+ OVS_ERROR("%s() error to generate value", __FUNCTION__);
+ return yajl_gen_ret;
+}
+
+/* OVS DB echo request handler. When OVS DB sends
+ * "echo" request to the client, client should generate
+ * "echo" replay with the same content received in the
+ * request */
+static int
+ovs_db_table_echo_cb(const ovs_db_t *pdb, yajl_val jnode)
+{
+ yajl_val jparams;
+ yajl_val jid;
+ yajl_gen jgen;
+ size_t resp_len = 0;
+ const char *resp = NULL;
+ const char *params_path[] = {"params", NULL};
+ const char *id_path[] = {"id", NULL};
+ yajl_gen_status yajl_gen_ret;
+
+ if ((jgen = yajl_gen_alloc(NULL)) == NULL)
+ return (-1);
+
+ /* check & get request attributes */
+ if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
+ ((jid = yajl_tree_get(jnode, id_path, yajl_t_any)) == NULL)) {
+ OVS_ERROR("parse echo request failed");
+ goto yajl_gen_failure;
+ }
+
+ /* generate JSON echo response */
+ OVS_YAJL_CALL(yajl_gen_map_open, jgen);
+
+ OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "result");
+ OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
+
+ OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "error");
+ OVS_YAJL_CALL(yajl_gen_null, jgen);
+
+ OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
+ OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jid);
+
+ OVS_YAJL_CALL(yajl_gen_map_close, jgen);
+ OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&resp,
+ &resp_len);
+
+ /* send the response */
+ OVS_DEBUG("response: %s", resp);
+ if (ovs_db_data_send(pdb, resp, resp_len) < 0) {
+ OVS_ERROR("send echo reply failed");
+ goto yajl_gen_failure;
+ }
+ /* clean up and return success */
+ yajl_gen_clear(jgen);
+ return (0);
+
+yajl_gen_failure:
+ /* release memory */
+ yajl_gen_clear(jgen);
+ return (-1);
+}
+
+/* Get OVS DB registered callback by YAJL val. The YAJL
+ * value should be YAJL string (UID). Returns NULL if
+ * callback hasn't been found.
+ */
+static ovs_callback_t *
+ovs_db_table_callback_get(ovs_db_t *pdb, yajl_val jid)
+{
+ char *endptr = NULL;
+ const char *suid = NULL;
+ uint64_t uid;
+
+ if (jid && YAJL_IS_STRING(jid)) {
+ suid = YAJL_GET_STRING(jid);
+ uid = (uint64_t) strtoul(suid, &endptr, 16);
+ if (*endptr == '\0' && uid)
+ return ovs_db_callback_get(pdb, uid);
+ }
+
+ return NULL;
+}
+
+/* OVS DB table update event handler.
+ * This callback is called by POLL thread if OVS DB
+ * table update callback is received from the DB
+ * server. Once registered callback found, it's called
+ * by this handler. */
+static int
+ovs_db_table_update_cb(ovs_db_t *pdb, yajl_val jnode)
+{
+ ovs_callback_t *cb = NULL;
+ yajl_val jvalue;
+ yajl_val jparams;
+ yajl_val jtable_updates;
+ yajl_val jtable_update;
+ size_t obj_len = 0;
+ const char *table_name = NULL;
+ const char *params_path[] = {"params", NULL};
+ const char *id_path[] = {"id", NULL};
+
+ /* check & get request attributes */
+ if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
+ (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL))
+ goto ovs_failure;
+
+ /* check array length: [<json-value>, <table-updates>] */
+ if (YAJL_GET_ARRAY(jparams)->len != 2)
+ goto ovs_failure;
+
+ jvalue = YAJL_GET_ARRAY(jparams)->values[0];
+ jtable_updates = YAJL_GET_ARRAY(jparams)->values[1];
+ if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue)))
+ goto ovs_failure;
+
+ /* find registered callback based on <json-value> */
+ cb = ovs_db_table_callback_get(pdb, jvalue);
+ if (cb == NULL || cb->table.call == NULL)
+ goto ovs_failure;
+
+ /* call registered callback */
+ cb->table.call(jtable_updates);
+ return 0;
+
+ovs_failure:
+ OVS_ERROR("invalid OVS DB table update event");
+ return (-1);
+}
+
+/* OVS DB result request handler.
+ * This callback is called by POLL thread if OVS DB
+ * result reply is received from the DB server.
+ * Once registered callback found, it's called
+ * by this handler. */
+static int
+ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode)
+{
+ ovs_callback_t *cb = NULL;
+ yajl_val jresult;
+ yajl_val jerror;
+ yajl_val jid;
+ const char *result_path[] = {"result", NULL};
+ const char *error_path[] = {"error", NULL};
+ const char *id_path[] = {"id", NULL};
+
+ jresult = yajl_tree_get(jnode, result_path, yajl_t_any);
+ jerror = yajl_tree_get(jnode, error_path, yajl_t_any);
+ jid = yajl_tree_get(jnode, id_path, yajl_t_string);
+
+ /* check & get result attributes */
+ if (!jresult || !jerror || !jid)
+ return (-1);
+
+ /* try to find registered callback */
+ cb = ovs_db_table_callback_get(pdb, jid);
+ if (cb != NULL && cb->result.call != NULL) {
+ /* call registered callback */
+ cb->result.call(jresult, jerror);
+ /* unlock owner of the reply */
+ sem_post(&cb->result.sync);
+ }
+
+ return (0);
+}
+
+/* Handle JSON data (one request) and call
+ * appropriate event OVS DB handler. Currently,
+ * update callback 'ovs_db_table_update_cb' and
+ * result callback 'ovs_db_result_cb' is supported.
+ */
+static int
+ovs_db_json_data_process(ovs_db_t *pdb, const char *data, size_t len)
+{
+ const char *method = NULL;
+ char yajl_errbuf[OVS_YAJL_ERROR_BUFFER_SIZE];
+ const char *method_path[] = {"method", NULL};
+ const char *result_path[] = {"result", NULL};
+ char *sjson = NULL;
+ yajl_val jnode, jval;
+
+ /* duplicate the data to make null-terminated string
+ * required for yajl_tree_parse() */
+ if ((sjson = strndup(data, len)) == NULL)
+ return (-1);
+
+ OVS_DEBUG("%s", sjson);
+
+ /* parse json data */
+ jnode = yajl_tree_parse(sjson, yajl_errbuf, sizeof(yajl_errbuf));
+ if (jnode == NULL) {
+ OVS_ERROR("yajl_tree_parse() %s", yajl_errbuf);
+ return (-1);
+ }
+
+ /* get method name */
+ if (jval = yajl_tree_get(jnode, method_path, yajl_t_string)) {
+ method = YAJL_GET_STRING(jval);
+ if (strcmp("echo", method) == 0) {
+ /* echo request from the server */
+ if (ovs_db_table_echo_cb(pdb, jnode) < 0)
+ OVS_ERROR("handle echo request failed");
+ } else if (strcmp("update", method) == 0) {
+ /* update notification */
+ if (ovs_db_table_update_cb(pdb, jnode) < 0)
+ OVS_ERROR("handle update notification failed");
+ }
+ } else if (jval = yajl_tree_get(jnode, result_path, yajl_t_object)) {
+ /* result notification */
+ if (ovs_db_result_cb(pdb, jnode) < 0)
+ OVS_ERROR("handle result reply failed");
+ }
+
+ /* release memory */
+ yajl_tree_free(jnode);
+ sfree(sjson);
+ return (0);
+}
+
+/*
+ * JSON reader implementation.
+ *
+ * This module process raw JSON data (byte stream) and
+ * returns fully-fledged JSON data which can be processed
+ * (parsed) by YAJL later.
+ */
+
+/* Allocate JSON reader instance */
+static inline ovs_json_reader_t *
+ovs_json_reader_alloc()
+{
+ ovs_json_reader_t *jreader = NULL;
+
+ if ((jreader = calloc(sizeof(ovs_json_reader_t), 1)) == NULL)
+ return NULL;
+
+ return jreader;
+}
+
+/* Push raw data into into the JSON reader for processing */
+static inline int
+ovs_json_reader_push_data(ovs_json_reader_t *jreader,
+ const char *data, size_t data_len)
+{
+ char *new_buff = NULL;
+ size_t available = jreader->buff_size - jreader->buff_offset;
+
+ /* check/update required memory space */
+ if (available < data_len) {
+ OVS_DEBUG("Reallocate buffer [size=%d, available=%d required=%d]",
+ (int)jreader->buff_size, (int)available, (int)data_len);
+
+ /* allocate new chunk of memory */
+ new_buff = realloc(jreader->buff_ptr, (jreader->buff_size + data_len));
+ if (new_buff == NULL)
+ return (-1);
+
+ /* point to new allocated memory */
+ jreader->buff_ptr = new_buff;
+ jreader->buff_size += data_len;
+ }
+
+ /* store input data */
+ memcpy(jreader->buff_ptr + jreader->buff_offset, data, data_len);
+ jreader->buff_offset += data_len;
+ return (0);
+}
+
+/* Pop one fully-fledged JSON if already exists. Returns 0 if
+ * completed JSON already exists otherwise negative value is
+ * returned */
+static inline int
+ovs_json_reader_pop(ovs_json_reader_t *jreader,
+ const char **json_ptr, size_t *json_len_ptr)
+{
+ size_t nbraces = 0;
+ size_t json_len = 0;
+ char *json = NULL;
+
+ /* search open/close brace */
+ for (int i = jreader->json_offset; i < jreader->buff_offset; i++) {
+ if (jreader->buff_ptr[i] == '{') {
+ nbraces++;
+ } else if (jreader->buff_ptr[i] == '}')
+ if (nbraces)
+ if (!(--nbraces)) {
+ /* JSON data */
+ *json_ptr = jreader->buff_ptr + jreader->json_offset;
+ *json_len_ptr = json_len + 1;
+ jreader->json_offset = i + 1;
+ return (0);
+ }
+
+ /* increase JSON data length */
+ if (nbraces)
+ json_len++;
+ }
+
+ if (jreader->json_offset) {
+ if (jreader->json_offset < jreader->buff_offset) {
+ /* shift data to the beginning of the buffer
+ * and zero rest of the buffer data */
+ json = &jreader->buff_ptr[jreader->json_offset];
+ json_len = jreader->buff_offset - jreader->json_offset;
+ for (int i = 0; i < jreader->buff_size; i++)
+ jreader->buff_ptr[i] = ((i < json_len) ? (json[i]) : (0));
+ jreader->buff_offset = json_len;
+ } else
+ /* reset the buffer */
+ jreader->buff_offset = 0;
+
+ /* data is at the beginning of the buffer */
+ jreader->json_offset = 0;
+ }
+
+ return (-1);
+}
+
+/* Reset JSON reader. It is useful when start processing
+ * new raw data. E.g.: in case of lost stream connection.
+ */
+static inline void
+ovs_json_reader_reset(ovs_json_reader_t *jreader)
+{
+ if (jreader) {
+ jreader->buff_offset = 0;
+ jreader->json_offset = 0;
+ }
+}
+
+/* Release internal data allocated for JSON reader */
+static inline void
+ovs_json_reader_free(ovs_json_reader_t *jreader)
+{
+ if (jreader) {
+ free(jreader->buff_ptr);
+ free(jreader);
+ }
+}
+
+/* Reconnect to OVD DB and call init OVS DB callback
+ * 'init_cb' if connection has been established.
+ */
+static int
+ovs_db_reconnect(ovs_db_t *pdb)
+{
+ char errbuff[OVS_ERROR_BUFF_SIZE];
+
+ /* remove all registered OVS DB table/result callbacks */
+ ovs_db_callback_remove_all(pdb);
+
+ /* open new socket */
+ if ((pdb->conn.sock = socket(pdb->conn.domain, pdb->conn.type, 0)) < 0) {
+ sstrerror(errno, errbuff, sizeof(errbuff));
+ OVS_ERROR("socket(): %s", errbuff);
+ return (-1);
+ }
+
+ /* try to connect to server */
+ if (connect(pdb->conn.sock, (struct sockaddr *)&pdb->conn.addr,
+ pdb->conn.addr_size) < 0) {
+ sstrerror(errno, errbuff, sizeof(errbuff));
+ OVS_ERROR("connect(): %s", errbuff);
+ close(pdb->conn.sock);
+ return (-1);
+ }
+
+ /* send notification to event thread */
+ ovs_db_event_post(pdb, OVS_DB_EVENT_CONNECTED);
+ return (0);
+}
+
+/* POLL worker thread.
+ * It listens on OVS DB connection for incoming
+ * requests/reply/events etc. Also, it reconnects to OVS DB
+ * if connection has been lost.
+ */
+static void *
+ovs_poll_worker(void *arg)
+{
+ ovs_db_t *pdb = (ovs_db_t *)arg; /* pointer to OVS DB */
+ ovs_json_reader_t *jreader = NULL;
+ const char *json;
+ size_t json_len;
+ ssize_t nbytes = 0;
+ char buff[OVS_DB_POLL_READ_BLOCK_SIZE];
+ struct pollfd poll_fd;
+ int poll_ret = 0;
+
+ if ((jreader = ovs_json_reader_alloc()) == NULL) {
+ OVS_ERROR("initialize json reader failed");
+ goto thread_exit;
+ }
+
+ /* start polling data */
+ poll_fd.fd = pdb->conn.sock;
+ poll_fd.events = POLLIN | POLLPRI;
+ poll_fd.revents = 0;
+
+ /* poll data */
+ while (ovs_db_poll_is_running(pdb)) {
+ poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000);
+ if (poll_ret > 0) {
+ if (poll_fd.revents & POLLNVAL) {
+ /* invalid file descriptor, reconnect */
+ if (ovs_db_reconnect(pdb) != 0) {
+ /* sleep awhile until next reconnect */
+ usleep(OVS_DB_RECONNECT_TIMEOUT * 1000000);
+ }
+ ovs_json_reader_reset(jreader);
+ poll_fd.fd = pdb->conn.sock;
+ } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) {
+ /* connection is broken */
+ OVS_ERROR("poll() peer closed its end of the channel");
+ close(poll_fd.fd);
+ } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
+ /* read incoming data */
+ nbytes = recv(poll_fd.fd, buff, OVS_DB_POLL_READ_BLOCK_SIZE, 0);
+ if (nbytes > 0) {
+ OVS_DEBUG("recv(): received %d bytes of data", (int)nbytes);
+ ovs_json_reader_push_data(jreader, buff, nbytes);
+ while (!ovs_json_reader_pop(jreader, &json, &json_len))
+ /* process JSON data */
+ ovs_db_json_data_process(pdb, json, json_len);
+ } else if (nbytes == 0) {
+ OVS_ERROR("recv() peer has performed an orderly shutdown");
+ close(poll_fd.fd);
+ } else {
+ OVS_ERROR("recv() receive data error");
+ break;
+ }
+ } /* poll() POLLIN & POLLPRI */
+ } else if (poll_ret == 0)
+ OVS_DEBUG("poll() timeout");
+ else {
+ OVS_ERROR("poll() error");
+ break;
+ }
+ }
+
+thread_exit:
+ OVS_DEBUG("poll thread has been completed");
+ ovs_json_reader_free(jreader);
+ pthread_exit((void *)0);
+ return ((void *)0);
+}
+
+/* EVENT worker thread.
+ * Perform task based on incoming events. This
+ * task can be done asynchronously which allows to
+ * handle OVD DB callback like 'init_cb'.
+ */
+static void *
+ovs_event_worker(void *arg)
+{
+ int ret = 0;
+ ovs_db_t *pdb = (ovs_db_t *)arg;
+ struct timespec ts;
+
+ while (pdb->event_thread.value != OVS_DB_EVENT_TERMINATE) {
+ /* wait for an event */
+ clock_gettime(CLOCK_REALTIME, &ts);
+ ts.tv_sec += (OVS_DB_EVENT_TIMEOUT);
+ ret = pthread_cond_timedwait(&pdb->event_thread.cond,
+ &pdb->event_thread.mutex, &ts);
+ if (!ret) {
+ /* handle the event */
+ OVS_DEBUG("handle event %d", pdb->event_thread.value);
+ if (pdb->event_thread.value == OVS_DB_EVENT_CONNECTED)
+ if (pdb->init_cb)
+ pdb->init_cb(pdb);
+ } else if (ret == ETIMEDOUT) {
+ /* wait timeout */
+ OVS_DEBUG("no event received (timeout)");
+ continue;
+ } else {
+ /* unexpected error */
+ OVS_ERROR("pthread_cond_timedwait() failed");
+ break;
+ }
+ }
+
+thread_exit:
+ OVS_DEBUG("event thread has been completed");
+ pthread_exit((void *)0);
+ return ((void *)0);
+}
+
+/* Stop EVENT thread */
+static int
+ovs_db_event_thread_stop(ovs_db_t *pdb)
+{
+ ovs_db_event_post(pdb, OVS_DB_EVENT_TERMINATE);
+ if (pthread_join(pdb->event_thread.tid, NULL) != 0)
+ return (-1);
+ pthread_mutex_unlock(&pdb->event_thread.mutex);
+ pthread_mutex_destroy(&pdb->event_thread.mutex);
+ return (0);
+}
+
+/* Stop POLL thread */
+static int
+ovs_db_poll_thread_stop(ovs_db_t *pdb)
+{
+ ovs_db_poll_terminate(pdb);
+ if (pthread_join(pdb->poll_thread.tid, NULL) != 0)
+ return (-1);
+ pthread_mutex_destroy(&pdb->poll_thread.mutex);
+ return (0);
+}
+
+/*
+ * Public OVS DB API implementation
+ */
+
+ovs_db_t *
+ovs_db_init(const char *surl, ovs_db_callback_t *cb)
+{
+ pthread_mutexattr_t mutex_attr;
+ ovs_db_t *pdb = NULL;
+
+ /* allocate db data & fill it */
+ if ((pdb = calloc(1, sizeof(*pdb))) == NULL)
+ return (NULL);
+
+ /* convert string url to socket addr */
+ if (ovs_db_url_parse(surl, &pdb->conn) < 0)
+ goto failure;
+
+ /* setup OVS DB callbacks */
+ if (cb)
+ pdb->init_cb = cb->init_cb;
+
+ /* prepare event thread */
+ pthread_cond_init(&pdb->event_thread.cond, NULL);
+ pthread_mutex_init(&pdb->event_thread.mutex, NULL);
+ pthread_mutex_lock(&pdb->event_thread.mutex);
+ if (plugin_thread_create(&pdb->event_thread.tid, NULL,
+ ovs_event_worker, pdb) != 0) {
+ OVS_ERROR("event worker start failed");
+ goto failure;
+ }
+
+ /* prepare polling thread */
+ ovs_db_reconnect(pdb);
+ pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING;
+ pthread_mutex_init(&pdb->poll_thread.mutex, NULL);
+ if (plugin_thread_create(&pdb->poll_thread.tid, NULL,
+ ovs_poll_worker, pdb) != 0) {
+ OVS_ERROR("pull worker start failed");
+ goto failure;
+ }
+
+ /* init OVS DB mutex */
+ if (pthread_mutexattr_init(&mutex_attr) ||
+ pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE) ||
+ pthread_mutex_init(&pdb->mutex, &mutex_attr)) {
+ OVS_ERROR("OVS DB mutex init failed");
+ goto failure;
+ }
+
+ /* return db to the caller */
+ return pdb;
+
+failure:
+ if (pdb->conn.sock)
+ /* close connection */
+ close(pdb->conn.sock);
+ if (pdb->event_thread.tid != 0)
+ /* stop event thread */
+ if (ovs_db_event_thread_stop(pdb) < 0)
+ OVS_ERROR("stop event thread failed");
+ if (pdb->poll_thread.tid != 0)
+ /* stop poll thread */
+ if (ovs_db_poll_thread_stop(pdb) < 0)
+ OVS_ERROR("stop poll thread failed");
+ sfree(pdb);
+ return NULL;
+}
+
+int
+ovs_db_send_request(ovs_db_t *pdb, const char *method,
+ const char *params, ovs_db_result_cb_t cb)
+{
+ int ret = 0;
+ yajl_gen_status yajl_gen_ret;
+ yajl_val jparams;
+ yajl_gen jgen;
+ ovs_callback_t *new_cb = NULL;
+ uint64_t uid;
+ char uid_buff[OVS_UID_STR_SIZE];
+ const char *req = NULL;
+ size_t req_len = 0;
+ struct timespec ts;
+
+ /* sanity check */
+ if (!pdb || !method || !params)
+ return (-1);
+
+ if ((jgen = yajl_gen_alloc(NULL)) == NULL)
+ return (-1);
+
+ /* try to parse params */
+ if ((jparams = yajl_tree_parse(params, NULL, 0)) == NULL) {
+ OVS_ERROR("params is not a JSON string");
+ yajl_gen_clear(jgen);
+ return (-1);
+ }
+
+ /* generate method field */
+ OVS_YAJL_CALL(yajl_gen_map_open, jgen);
+
+ OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "method");
+ OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, method);
+
+ /* generate params field */
+ OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "params");
+ OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
+ yajl_tree_free(jparams);
+
+ /* generate id field */
+ OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
+ uid = ovs_uid_generate();
+ ssnprintf(uid_buff, sizeof(uid_buff), "%" PRIX64, uid);
+ OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_buff);
+
+ OVS_YAJL_CALL(yajl_gen_map_close, jgen);
+
+ if (cb) {
+ /* register result callback */
+ if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL)
+ goto yajl_gen_failure;
+
+ /* add new callback to front */
+ sem_init(&new_cb->result.sync, 0, 0);
+ new_cb->result.call = cb;
+ new_cb->uid = uid;
+ ovs_db_callback_add(pdb, new_cb);
+ }
+
+ /* send the request */
+ OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&req,
+ &req_len);
+ OVS_DEBUG("%s", req);
+ if (!ovs_db_data_send(pdb, req, req_len)) {
+ if (cb) {
+ /* wait for result */
+ clock_gettime(CLOCK_REALTIME, &ts);
+ ts.tv_sec += OVS_DB_SEND_REQ_TIMEOUT;
+ if (sem_timedwait(&new_cb->result.sync, &ts) < 0) {
+ OVS_ERROR("%s() no replay received within %d sec", __FUNCTION__,
+ OVS_DB_SEND_REQ_TIMEOUT);
+ ret = (-1);
+ }
+ }
+ } else {
+ OVS_ERROR("ovs_db_data_send() failed");
+ ret = (-1);
+ }
+
+yajl_gen_failure:
+ if (new_cb) {
+ /* destroy callback */
+ sem_destroy(&new_cb->result.sync);
+ ovs_db_callback_remove(pdb, new_cb);
+ }
+
+ /* release memory */
+ yajl_gen_clear(jgen);
+ return (yajl_gen_ret != yajl_gen_status_ok) ? (-1) : ret;
+}
+
+int
+ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name,
+ const char **tb_column, ovs_db_table_cb_t update_cb,
+ ovs_db_result_cb_t result_cb, unsigned int flags)
+{
+ yajl_gen jgen;
+ yajl_gen_status yajl_gen_ret;
+ ovs_callback_t *new_cb = NULL;
+ char uid_str[OVS_UID_STR_SIZE];
+ char *params;
+ size_t params_len;
+ int ovs_db_ret = 0;
+
+ /* sanity check */
+ if (pdb == NULL || tb_name == NULL || update_cb == NULL)
+ return (-1);
+
+ if ((jgen = yajl_gen_alloc(NULL)) == NULL)
+ return (-1);
+
+ /* register table update callback */
+ if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL)
+ return (-1);
+
+ /* add new callback to front */
+ new_cb->table.call = update_cb;
+ new_cb->uid = ovs_uid_generate();
+ ovs_db_callback_add(pdb, new_cb);
+
+ /* make update notification request
+ * [<db-name>, <json-value>, <monitor-requests>] */
+ OVS_YAJL_CALL(yajl_gen_array_open, jgen);
+ {
+ OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, OVS_DB_DEFAULT_DB_NAME);
+
+ /* uid string <json-value> */
+ ssnprintf(uid_str, sizeof(uid_str), "%" PRIX64, new_cb->uid);
+ OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_str);
+
+ /* <monitor-requests> */
+ OVS_YAJL_CALL(yajl_gen_map_open, jgen);
+ {
+ OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, tb_name);
+ OVS_YAJL_CALL(yajl_gen_array_open, jgen);
+ {
+ /* <monitor-request> */
+ OVS_YAJL_CALL(yajl_gen_map_open, jgen);
+ {
+ if (tb_column) {
+ /* columns within the table to be monitored */
+ OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "columns");
+ OVS_YAJL_CALL(yajl_gen_array_open, jgen);
+ for (; *tb_column; tb_column++)
+ OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, *tb_column);
+ OVS_YAJL_CALL(yajl_gen_array_close, jgen);
+ }
+ /* specify select option */
+ OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "select");
+ {
+ OVS_YAJL_CALL(yajl_gen_map_open, jgen);
+ {
+ OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "initial");
+ OVS_YAJL_CALL(yajl_gen_bool, jgen,
+ flags & OVS_DB_TABLE_CB_FLAG_INITIAL);
+ OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "insert");
+ OVS_YAJL_CALL(yajl_gen_bool, jgen,
+ flags & OVS_DB_TABLE_CB_FLAG_INSERT);
+ OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "delete");
+ OVS_YAJL_CALL(yajl_gen_bool, jgen,
+ flags & OVS_DB_TABLE_CB_FLAG_DELETE);
+ OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "modify");
+ OVS_YAJL_CALL(yajl_gen_bool, jgen,
+ flags & OVS_DB_TABLE_CB_FLAG_MODIFY);
+ }
+ OVS_YAJL_CALL(yajl_gen_map_close, jgen);
+ }
+ }
+ OVS_YAJL_CALL(yajl_gen_map_close, jgen);
+ }
+ OVS_YAJL_CALL(yajl_gen_array_close, jgen);
+ }
+ OVS_YAJL_CALL(yajl_gen_map_close, jgen);
+ }
+ OVS_YAJL_CALL(yajl_gen_array_close, jgen);
+
+ /* make a request to subscribe to given table */
+ OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)¶ms,
+ ¶ms_len);
+ if (ovs_db_send_request(pdb, "monitor", params, result_cb) < 0) {
+ OVS_ERROR("Failed to subscribe to \"%s\" table", tb_name);
+ ovs_db_ret = (-1);
+ }
+
+yajl_gen_failure:
+ /* release memory */
+ yajl_gen_clear(jgen);
+ return ovs_db_ret;
+}
+
+int
+ovs_db_destroy(ovs_db_t *pdb)
+{
+ int ovs_db_ret = 0;
+ int ret = 0;
+
+ /* sanity check */
+ if (pdb == NULL)
+ return (-1);
+
+ /* try to lock the structure before releasing */
+ if (ret = pthread_mutex_lock(&pdb->mutex)) {
+ OVS_ERROR("pthread_mutex_lock() DB mutext lock failed (%d)", ret);
+ return (-1);
+ }
+
+ /* stop poll thread */
+ if (ovs_db_event_thread_stop(pdb) < 0) {
+ OVS_ERROR("stop poll thread failed");
+ ovs_db_ret = (-1);
+ }
+
+ /* stop event thread */
+ if (ovs_db_poll_thread_stop(pdb) < 0) {
+ OVS_ERROR("stop event thread failed");
+ ovs_db_ret = (-1);
+ }
+
+ /* unsubscribe callbacks */
+ ovs_db_callback_remove_all(pdb);
+
+ /* close connection */
+ if (pdb->conn.sock)
+ close(pdb->conn.sock);
+
+ /* release DB handler */
+ pthread_mutex_unlock(&pdb->mutex);
+ pthread_mutex_destroy(&pdb->mutex);
+ sfree(pdb);
+ return ovs_db_ret;
+}
+
+/*
+ * Public OVS utils API implementation
+ */
+
+/* Get YAJL value by key from YAJL dictionary */
+yajl_val
+ovs_utils_get_value_by_key(yajl_val jval, const char *key)
+{
+ const char *obj_key = NULL;
+
+ /* check params */
+ if (!YAJL_IS_OBJECT(jval) || !key)
+ return NULL;
+
+ /* find a value by key */
+ for (int i = 0; i < YAJL_GET_OBJECT(jval)->len; i++) {
+ obj_key = YAJL_GET_OBJECT(jval)->keys[i];
+ if (strcmp(obj_key, key) == 0)
+ return YAJL_GET_OBJECT(jval)->values[i];
+ }
+
+ return NULL;
+}
diff --git a/src/utils_ovs.h b/src/utils_ovs.h
--- /dev/null
+++ b/src/utils_ovs.h
@@ -0,0 +1,202 @@
+/**
+ * collectd - src/utils_ovs.h
+ *
+ * Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * this software and associated documentation files (the "Software"), to deal in
+ * the Software without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
+ * of the Software, and to permit persons to whom the Software is furnished to do
+ * so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ * Authors:
+ * Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
+ *
+ * Description:
+ * The OVS util module provides the following features:
+ * - Implements the OVS DB communication transport specified
+ * by RFC7047:
+ * * Connect/disconnect to OVS DB;
+ * * Recovery mechanism in case of OVS DB connection lost;
+ * * Subscription mechanism to OVS DB table update events
+ * (insert/modify/delete);
+ * * Send custom JSON request to OVS DB (poll table data, etc.)
+ * * Handling of echo request from OVS DB server to verify the
+ * liveness of the connection.
+ * - Provides YAJL helpers functions.
+ *
+ * OVS DB API User Guide:
+ * All OVS DB function/structure names begins from 'ovs_db_*' prefix. To
+ * start using OVS DB API, client (plugin) should initialize the OVS DB
+ * object (`ovs_db_t') by calling `ovs_db_init' function. It initializes
+ * internal data and creates two main workers (threads). The result of the
+ * function is a pointer to new OVS DB object which can be used by other
+ * OVS DB API later and must be released by `ovs_db_destroy' function if
+ * the object isn't needed anymore.
+ * Once OVS DB API is initialized, the `init_cb' callback is called if
+ * the connection to OVS DB has been established. This callback is called
+ * every time the OVS DB is reconnected. So, if the client registers table
+ * update event callbacks or does any other OVS DB setup that can be lost
+ * after OVS DB reconnecting, it should be done in `init_cb' callback.
+ * The `ovs_db_table_cb_register` function is used to register OVS DB
+ * table update event callback and receive the table update notification
+ * when requested event occurs (registered callback is called). See
+ * function API for more info.
+ * To send custom JSON-RPC request to OVS DB, the `ovs_db_send_request'
+ * function is used. Please note, that connection to OVS DB should be
+ * established otherwise the function will return error.
+ * To verify the liveness of established connection, the OVS DB server
+ * sends echo request to the client with a given interval. The OVS utils
+ * takes care about this request and handles it properly.
+ **/
+
+#ifndef UTILS_OVS_H
+#define UTILS_OVS_H
+
+#include <yajl/yajl_tree.h>
+#include <yajl/yajl_gen.h>
+
+/* Forward declaration */
+typedef struct ovs_db_s ovs_db_t;
+
+/* OVS DB callback type declaration */
+typedef void (*ovs_db_init_cb_t) (ovs_db_t *pdb);
+typedef void (*ovs_db_table_cb_t) (yajl_val jupdates);
+typedef void (*ovs_db_result_cb_t) (yajl_val jresult, yajl_val jerror);
+
+/* OVS DB structures */
+struct ovs_db_callback_s {
+ ovs_db_init_cb_t init_cb;
+};
+typedef struct ovs_db_callback_s ovs_db_callback_t;
+
+/* OVS DB prototypes */
+
+/*
+ * NAME
+ * ovs_db_init
+ *
+ * DESCRIPTION
+ * Initialize OVS DB internal data. The `ovs_db_destroy' function
+ * shall destroy the returned object.
+ *
+ * PARAMETERS
+ * `surl' OVS DB communication URL.
+ * `cb' OVS DB callbacks.
+ *
+ * RETURN VALUE
+ * New ovs_db_t object upon success or NULL if an error occurred.
+ */
+ovs_db_t *ovs_db_init(const char *surl, ovs_db_callback_t *cb);
+
+/*
+ * NAME
+ * ovs_db_destroy
+ *
+ * DESCRIPTION
+ * Destroy OVS DB object referenced by `pdb'.
+ *
+ * PARAMETERS
+ * `pdb' Pointer to OVS DB object.
+ *
+ * RETURN VALUE
+ * Zero upon success or non-zero if an error occurred.
+ */
+int ovs_db_destroy(ovs_db_t *pdb);
+
+/*
+ * NAME
+ * ovs_db_send_request
+ *
+ * DESCRIPTION
+ * Send JSON request to OVS DB server.
+ *
+ * PARAMETERS
+ * `pdb' Pointer to OVS DB object.
+ * `method' Request method name.
+ * `params' Method params to be sent (JSON value as a string).
+ * `cb' Result callback of the request. If NULL, the request
+ * is sent asynchronously.
+ *
+ * RETURN VALUE
+ * Zero upon success or non-zero if an error occurred.
+ */
+int ovs_db_send_request(ovs_db_t *pdb, const char *method,
+ const char *params, ovs_db_result_cb_t cb);
+
+/* callback types */
+#define OVS_DB_TABLE_CB_FLAG_INITIAL 0x01U
+#define OVS_DB_TABLE_CB_FLAG_INSERT 0x02U
+#define OVS_DB_TABLE_CB_FLAG_DELETE 0x04U
+#define OVS_DB_TABLE_CB_FLAG_MODIFY 0x08U
+#define OVS_DB_TABLE_CB_FLAG_ALL 0x0FU
+
+/*
+ * NAME
+ * ovs_db_table_cb_register
+ *
+ * DESCRIPTION
+ * Subscribe a callback on OVS DB table event. It allows to
+ * receive notifications (`update_cb' callback is called) of
+ * changes to requested table.
+ *
+ * PARAMETERS
+ * `pdb' Pointer to OVS DB object.
+ * `tb_name' OVS DB Table name to be monitored.
+ * `tb_column' OVS DB Table columns to be monitored. Last
+ * element in the array should be NULL.
+ * `update_cb' Callback function that is called when
+ * requested table columns are changed.
+ * `cb' Result callback of the request. If NULL, the call
+ * becomes asynchronous.
+ * Useful, if OVS_DB_TABLE_CB_FLAG_INITIAL is set.
+ * `flags' Bit mask of:
+ * OVS_DB_TABLE_CB_FLAG_INITIAL Receive initial values in
+ * result callback.
+ * OVS_DB_TABLE_CB_FLAG_INSERT Receive table insert events.
+ * OVS_DB_TABLE_CB_FLAG_DELETE Receive table remove events.
+ * OVS_DB_TABLE_CB_FLAG_MODIFY Receive table update events.
+ * OVS_DB_TABLE_CB_FLAG_ALL Receive all events.
+ *
+ * RETURN VALUE
+ * Zero upon success or non-zero if an error occurred.
+ */
+int ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name,
+ const char **tb_column,
+ ovs_db_table_cb_t update_cb,
+ ovs_db_result_cb_t result_cb,
+ unsigned int flags);
+
+/*
+ * OVS utils API
+ */
+
+/*
+ * NAME
+ * ovs_utils_get_value_by_key
+ *
+ * DESCRIPTION
+ * Get YAJL value by object name.
+ *
+ * PARAMETERS
+ * `jval' YAJL object value.
+ * `key' Object key name.
+ *
+ * RETURN VALUE
+ * YAJL value upon success or NULL if key not found.
+ */
+yajl_val ovs_utils_get_value_by_key(yajl_val jval, const char *key);
+
+#endif