summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: 02baaed)
raw | patch | inline | side by side (parent: 02baaed)
author | Mytnyk, VolodymyrX <volodymyrx.mytnyk@intel.com> | |
Fri, 28 Oct 2016 10:18:17 +0000 (11:18 +0100) | ||
committer | Mytnyk, VolodymyrX <volodymyrx.mytnyk@intel.com> | |
Mon, 26 Dec 2016 13:26:05 +0000 (13:26 +0000) |
- Change configuration format to suggested one;
- Fix init/destroy API;
- Fix memory leaks;
- Code-clean-up.
Change-Id: I1ff94271b777c69f3d07a66f43dc10d034e71101
Signed-off-by: Mytnyk, VolodymyrX <volodymyrx.mytnyk@intel.com>
- Fix init/destroy API;
- Fix memory leaks;
- Code-clean-up.
Change-Id: I1ff94271b777c69f3d07a66f43dc10d034e71101
Signed-off-by: Mytnyk, VolodymyrX <volodymyrx.mytnyk@intel.com>
diff --git a/src/collectd.conf.in b/src/collectd.conf.in
index 883a079f3afd21b60a55f2456d1d6fc7c6c325a3..e754fa8e6f95a8c5ffc8d38d8e381a8f21946d0f 100644 (file)
--- a/src/collectd.conf.in
+++ b/src/collectd.conf.in
#</Plugin>
#<Plugin ovs_events>
-# OvsDbAddress "127.0.0.1" "6640"
+# Port "6640"
+# Address "127.0.0.1"
+# Socket "/var/run/openvswitch/db.sock"
# Interfaces "br0" "veth0"
# SendNotification false
#</Plugin>
diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod
index 061c4baff945fdec3c2ca359a2b99c2854bcf075..210e1073e08413c3f687d57a435989c876f48f4e 100644 (file)
--- a/src/collectd.conf.pod
+++ b/src/collectd.conf.pod
B<Synopsis:>
<Plugin "ovs_events">
- OvsDbAddress "127.0.0.1" "6640"
+ Port "6640"
+ Address "127.0.0.1"
+ Socket "/var/run/openvswitch/db.sock"
Interfaces "br0" "veth0"
SendNotification false
</Plugin>
=over 4
-=item B<OvsDbAddress> I<node> I<service>
+=item B<Address> I<node>
-The address of OVS DB server JSON-RPC interface used by the plugin.
-To enable the interface, OVS DB daemon should be running with '--remote=ptcp:'
-or '--remote=punix:' option. See L<ovsdb-server(1)> for more details. The
-address arguments must take one of the following forms:
+The address of OVS DB server JSON-RPC interface used by the plugin. To enable
+the interface, OVS DB daemon should be running with '--remote=ptcp:' option.
+See L<ovsdb-server(1)> for more details. The option may be either network
+hostname, IPv4 numbers-and-dots notation or IPv6 hexadecimal string format.
+Defaults to 'localhost'.
-=over 4
-
-=item I<node>
-
-The I<node> argument of the address can be either network hostname, IPv4
-numbers-and-dots notation or IPv6 hexadecimal string format. In case of Unix
-domain socket, the "I<unix:>file" format should be used, where I<file> is
-the full name of OVS DB Unix domain socket.
+=item B<Port> I<service>
-=item I<service>
+TCP-port to connect to. Either a service name or a port number may be given.
+Please note that numerical port numbers must be given as a string. Defaults
+to "6640".
-The I<service> argument of the address specifies the service name used to
-connect to OVS DB. See L<services(5)> for more details. This argument is
-skipped if Unix domain address is used.
-
-=back
+=item B<Socket> I<path>
-Default: C<"localhost" "6640">
+The UNIX domain socket path of OVS DB server JSON-RPC interface used by the
+plugin. To enable the interface, the OVS DB daemon should be running with
+'--remote=punix:' option. See L<ovsdb-server(1)> for more details. If this
+option is set, B<Address> and B<Port> options are ignored.
=item B<Interfaces> [I<ifname> ...]
diff --git a/src/ovs_events.c b/src/ovs_events.c
index ce5254daf70b0f824195875d06577b30974c9e83..1f63a065a554988e9e2f784cfba6723bfe8f3ae2 100644 (file)
--- a/src/ovs_events.c
+++ b/src/ovs_events.c
_Bool send_notification; /* sent notification to collectd? */
char ovs_db_node[OVS_DB_ADDR_NODE_SIZE]; /* OVS DB node */
char ovs_db_serv[OVS_DB_ADDR_SERVICE_SIZE]; /* OVS DB service */
+ char ovs_db_unix[OVS_DB_ADDR_UNIX_SIZE]; /* OVS DB unix socket path */
ovs_events_iface_list_t *ifaces; /* interface info */
};
typedef struct ovs_events_config_s ovs_events_config_t;
.config = {.send_notification = 0, /* do not send notification */
.ovs_db_node = "localhost", /* use default OVS DB node */
.ovs_db_serv = "6640", /* use default OVS DB service */
+ .ovs_db_unix = "", /* UNIX path empty by default */
.ifaces = NULL},
.ovs_db_select_params = NULL,
.is_db_available = 0,
static char *ovs_events_get_select_params() {
int ret = 0;
size_t buff_size = 0;
- size_t offset = 0;
- char *buff = NULL;
- char *new_buff = NULL;
+ size_t buff_off = 0;
+ char *opt_buff = NULL;
const char params_fmt[] = "[\"Open_vSwitch\"%s]";
const char option_fmt[] = ",{\"op\":\"select\",\"table\":\"Interface\","
"\"where\":[[\"name\",\"==\",\"%s\"]],"
"\"external_ids\",\"name\",\"_uuid\"]}";
/* setup OVS DB interface condition */
for (ovs_events_iface_list_t *iface = ovs_events_ctx.config.ifaces; iface;
- iface = iface->next, offset += ret) {
+ iface = iface->next, buff_off += ret) {
/* allocate new buffer (format size + ifname len is good enough) */
buff_size += (sizeof(option_fmt) + strlen(iface->name));
- new_buff = realloc(buff, buff_size);
- if (new_buff == NULL)
- goto failure;
- buff = new_buff;
- ret = ssnprintf(buff + offset, buff_size, option_fmt, iface->name);
- if (ret < 0)
- goto failure;
+ char *new_buff = realloc(opt_buff, buff_size);
+ if (new_buff == NULL) {
+ sfree(opt_buff);
+ return NULL;
+ }
+ opt_buff = new_buff;
+ ret = ssnprintf(opt_buff + buff_off, buff_size - buff_off, option_fmt,
+ iface->name);
+ if (ret < 0) {
+ sfree(opt_buff);
+ return NULL;
+ }
}
/* if no interfaces are configured, use default params */
- if (buff == NULL) {
- buff = strdup(default_opt);
- offset = strlen(default_opt);
- }
+ if (opt_buff == NULL)
+ opt_buff = strdup(default_opt);
/* allocate memory for OVS DB select params */
- buff_size = offset + sizeof(params_fmt);
- new_buff = malloc(buff_size);
- if (new_buff == NULL)
- goto failure;
+ size_t params_size = sizeof(params_fmt) + strlen(opt_buff);
+ char *params_buff = malloc(params_size);
+ if (params_buff == NULL) {
+ sfree(opt_buff);
+ return NULL;
+ }
/* create OVS DB select params */
- if (ssnprintf(new_buff, buff_size, params_fmt, buff) < 0)
- goto failure;
+ if (ssnprintf(params_buff, params_size, params_fmt, opt_buff) < 0)
+ sfree(params_buff);
- sfree(buff);
- return new_buff;
-
-failure:
- sfree(new_buff);
- sfree(buff);
- return NULL;
+ sfree(opt_buff);
+ return params_buff;
}
/* Release memory allocated for configuration data */
for (int i = 0; i < ci->children_num; i++) {
oconfig_item_t *child = ci->children + i;
if (strcasecmp("SendNotification", child->key) == 0) {
- if (cf_util_get_boolean(child, &ovs_events_ctx.config.send_notification) <
- 0)
+ if (cf_util_get_boolean(child,
+ &ovs_events_ctx.config.send_notification) != 0)
+ OVS_EVENTS_CONFIG_ERROR(child->key);
+ } else if (strcasecmp("Address", child->key) == 0) {
+ if (cf_util_get_string_buffer(
+ child, ovs_events_ctx.config.ovs_db_node,
+ sizeof(ovs_events_ctx.config.ovs_db_node)) != 0)
+ OVS_EVENTS_CONFIG_ERROR(child->key);
+ } else if (strcasecmp("Port", child->key) == 0) {
+ if (cf_util_get_string_buffer(
+ child, ovs_events_ctx.config.ovs_db_serv,
+ sizeof(ovs_events_ctx.config.ovs_db_serv)) != 0)
+ OVS_EVENTS_CONFIG_ERROR(child->key);
+ } else if (strcasecmp("Socket", child->key) == 0) {
+ if (cf_util_get_string_buffer(
+ child, ovs_events_ctx.config.ovs_db_unix,
+ sizeof(ovs_events_ctx.config.ovs_db_unix)) != 0)
OVS_EVENTS_CONFIG_ERROR(child->key);
- } else if (strcasecmp("OvsDbAddress", child->key) == 0) {
- if (child->values_num < 1) {
- ERROR(OVS_EVENTS_PLUGIN ": invalid OVS DB address specified");
- goto failure;
- }
- /* check node type and get the value */
- if (child->values[0].type != OCONFIG_TYPE_STRING) {
- ERROR(OVS_EVENTS_PLUGIN ": OVS DB node is not a string");
- goto failure;
- }
- sstrncpy(ovs_events_ctx.config.ovs_db_node, child->values[0].value.string,
- sizeof(ovs_events_ctx.config.ovs_db_node));
- /* get OVS DB address service name (optional) */
- if (child->values_num > 1) {
- if (child->values[1].type != OCONFIG_TYPE_STRING) {
- ERROR(OVS_EVENTS_PLUGIN ": OVS DB service is not a string");
- goto failure;
- }
- sstrncpy(ovs_events_ctx.config.ovs_db_serv,
- child->values[1].value.string,
- sizeof(ovs_events_ctx.config.ovs_db_serv));
- }
} else if (strcasecmp("Interfaces", child->key) == 0) {
for (int j = 0; j < child->values_num; j++) {
/* check value type */
}
}
-/* OVD DB reply callback. It parses reply, receives
+/* OVS DB reply callback. It parses reply, receives
* interface information and dispatches the info to
- * collecd
+ * collectd
*/
static void ovs_events_poll_result_cb(yajl_val jresult, yajl_val jerror) {
yajl_val *jvalues = NULL;
}
}
OVS_EVENTS_CTX_LOCK { ovs_events_ctx.is_db_available = 1; }
- DEBUG(OVS_EVENTS_PLUGIN ": OVS DB has been initialized");
+ DEBUG(OVS_EVENTS_PLUGIN ": OVS DB connection has been initialized");
}
/* OVS DB terminate connection notification callback */
ovs_db_callback_t cb = {.post_conn_init = ovs_events_conn_initialize,
.post_conn_terminate = ovs_events_conn_terminate};
- DEBUG(OVS_EVENTS_PLUGIN ": OVS DB node = %s, service=%s",
- ovs_events_ctx.config.ovs_db_node, ovs_events_ctx.config.ovs_db_serv);
+ DEBUG(OVS_EVENTS_PLUGIN ": OVS DB address=%s, service=%s, unix=%s",
+ ovs_events_ctx.config.ovs_db_node, ovs_events_ctx.config.ovs_db_serv,
+ ovs_events_ctx.config.ovs_db_unix);
/* generate OVS DB select condition based on list on configured interfaces */
ovs_events_ctx.ovs_db_select_params = ovs_events_get_select_params();
/* initialize OVS DB */
ovs_db = ovs_db_init(ovs_events_ctx.config.ovs_db_node,
- ovs_events_ctx.config.ovs_db_serv, &cb);
+ ovs_events_ctx.config.ovs_db_serv,
+ ovs_events_ctx.config.ovs_db_unix, &cb);
if (ovs_db == NULL) {
ERROR(OVS_EVENTS_PLUGIN ": fail to connect to OVS DB server");
goto ovs_events_failure;
diff --git a/src/utils_ovs.c b/src/utils_ovs.c
index 2a4bdf8339ed49c2cec0a65f9755a836aeecd550..2b14849007c9c8a998db9d6836b979d59556e9b2 100644 (file)
--- a/src/utils_ovs.c
+++ b/src/utils_ovs.c
#define OVS_DB_POLL_TIMEOUT 1 /* poll receive timeout (sec) */
#define OVS_DB_POLL_READ_BLOCK_SIZE 512 /* read block size (bytes) */
#define OVS_DB_DEFAULT_DB_NAME "Open_vSwitch"
-#define OVS_DB_RECONNECT_TIMEOUT 1 /* reconnect timeout (sec) */
#define OVS_DB_EVENT_TIMEOUT 5 /* event thread timeout (sec) */
#define OVS_DB_EVENT_TERMINATE 1
ovs_db_callback_t cb;
char service[OVS_DB_ADDR_SERVICE_SIZE];
char node[OVS_DB_ADDR_NODE_SIZE];
+ char unix_path[OVS_DB_ADDR_NODE_SIZE];
int sock;
};
+/* Global variables */
+static uint64_t ovs_uid = 0;
+static pthread_mutex_t ovs_uid_mutex = PTHREAD_MUTEX_INITIALIZER;
+
/* Post an event to event thread.
* Possible events are:
* OVS_DB_EVENT_TERMINATE
return (state == OVS_DB_POLL_STATE_RUNNING);
}
-/* Terminate POLL thread */
-static void ovs_db_poll_terminate(ovs_db_t *pdb) {
- pthread_mutex_lock(&pdb->poll_thread.mutex);
- pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING;
- pthread_mutex_unlock(&pdb->poll_thread.mutex);
-}
-
/* Generate unique identifier (UID). It is used by OVS DB API
* to set "id" field for any OVS DB JSON request. */
static uint64_t ovs_uid_generate() {
- struct timespec ts;
- clock_gettime(CLOCK_MONOTONIC, &ts);
- return ((ts.tv_sec << 32) | (ts.tv_nsec & UINT32_MAX));
+ uint64_t new_uid;
+ pthread_mutex_lock(&ovs_uid_mutex);
+ new_uid = ++ovs_uid;
+ pthread_mutex_unlock(&ovs_uid_mutex);
+ return new_uid;
}
/*
/* Remove callback from OVS DB object */
static void ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb) {
+ pthread_mutex_lock(&pdb->mutex);
ovs_callback_t *pre_cb = del_cb->prev;
ovs_callback_t *next_cb = del_cb->next;
- pthread_mutex_lock(&pdb->mutex);
if (next_cb)
next_cb->prev = del_cb->prev;
* to requested callback otherwise NULL is returned.
*
* IMPORTANT NOTE:
- * The OVS DB mutex should be locked by the caller
+ * The OVS DB mutex MUST be locked by the caller
* to make sure that returned callback is still valid.
*/
static ovs_callback_t *ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid) {
/* check & get request attributes */
if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
- (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL))
- goto ovs_failure;
+ (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL)) {
+ OVS_ERROR("invalid OVS DB request received");
+ return (-1);
+ }
/* check array length: [<json-value>, <table-updates>] */
- if (YAJL_GET_ARRAY(jparams)->len != 2)
- goto ovs_failure;
+ if ((YAJL_GET_ARRAY(jparams) == NULL) ||
+ (YAJL_GET_ARRAY(jparams)->len != 2)) {
+ OVS_ERROR("invalid OVS DB request received");
+ return (-1);
+ }
jvalue = YAJL_GET_ARRAY(jparams)->values[0];
jtable_updates = YAJL_GET_ARRAY(jparams)->values[1];
- if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue)))
- goto ovs_failure;
+ if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue))) {
+ OVS_ERROR("invalid OVS DB request id or table update received");
+ return (-1);
+ }
/* find registered callback based on <json-value> */
pthread_mutex_lock(&pdb->mutex);
cb = ovs_db_table_callback_get(pdb, jvalue);
if (cb == NULL || cb->table.call == NULL) {
+ OVS_ERROR("No OVS DB table update callback found");
pthread_mutex_unlock(&pdb->mutex);
- goto ovs_failure;
+ return (-1);
}
/* call registered callback */
cb->table.call(jtable_updates);
pthread_mutex_unlock(&pdb->mutex);
return 0;
-
-ovs_failure:
- OVS_ERROR("invalid OVS DB table update event");
- return (-1);
}
/* OVS DB result request handler.
/* duplicate the data to make null-terminated string
* required for yajl_tree_parse() */
- if ((sjson = malloc(len + 1)) == NULL)
+ if ((sjson = calloc(1, len + 1)) == NULL)
return (-1);
sstrncpy(sjson, data, len + 1);
}
}
-/* Reconnect to OVD DB and call the OVS DB post connection init callback
+/* Reconnect to OVS DB and call the OVS DB post connection init callback
* if connection has been established.
*/
-static int ovs_db_reconnect(ovs_db_t *pdb) {
- char errbuff[OVS_ERROR_BUFF_SIZE];
+static void ovs_db_reconnect(ovs_db_t *pdb) {
const char unix_prefix[] = "unix:";
- struct addrinfo *result, *rp;
- _Bool is_connected = 0;
+ const char *node_info = pdb->node;
+ struct addrinfo *result;
struct sockaddr_un saunix;
- /* remove all registered OVS DB table/result callbacks */
- ovs_db_callback_remove_all(pdb);
-
- if (strncmp(pdb->node, unix_prefix, strlen(unix_prefix)) == 0) {
- /* create unix socket address */
- rp = calloc(1, sizeof(struct addrinfo));
+ if (pdb->unix_path[0] != '\0') {
+ /* use UNIX socket instead of INET address */
+ node_info = pdb->unix_path;
+ result = calloc(1, sizeof(struct addrinfo));
struct sockaddr_un *sa_unix = calloc(1, sizeof(struct sockaddr_un));
- if (rp == NULL || sa_unix == NULL) {
- sfree(rp);
+ if (result == NULL || sa_unix == NULL) {
+ sfree(result);
sfree(sa_unix);
- return (1);
+ return;
}
- rp->ai_family = AF_UNIX;
- rp->ai_socktype = SOCK_STREAM;
- rp->ai_addrlen = sizeof(*sa_unix);
- rp->ai_addr = (struct sockaddr *)sa_unix;
- sa_unix->sun_family = rp->ai_family;
- sstrncpy(sa_unix->sun_path, (pdb->node + strlen(unix_prefix)),
- sizeof(sa_unix->sun_path));
- result = rp;
+ result->ai_family = AF_UNIX;
+ result->ai_socktype = SOCK_STREAM;
+ result->ai_addrlen = sizeof(*sa_unix);
+ result->ai_addr = (struct sockaddr *)sa_unix;
+ sa_unix->sun_family = result->ai_family;
+ sstrncpy(sa_unix->sun_path, pdb->unix_path, sizeof(sa_unix->sun_path));
} else {
- /* intet socket address */
- int ret = 0;
+ /* inet socket address */
struct addrinfo hints;
/* setup criteria for selecting the socket address */
hints.ai_socktype = SOCK_STREAM;
/* get socket addresses */
- if ((ret = getaddrinfo(pdb->node, pdb->service, &hints, &result)) != 0) {
+ int ret = getaddrinfo(pdb->node, pdb->service, &hints, &result);
+ if (ret != 0) {
OVS_ERROR("getaddrinfo(): %s", gai_strerror(ret));
- return (1);
+ return;
}
}
/* try to connect to the server */
- for (rp = result; rp != NULL; rp = rp->ai_next) {
- if ((pdb->sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol)) <
- 0) {
+ for (struct addrinfo *rp = result; rp != NULL; rp = rp->ai_next) {
+ char errbuff[OVS_ERROR_BUFF_SIZE];
+ int sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
+ if (sock < 0) {
sstrerror(errno, errbuff, sizeof(errbuff));
OVS_DEBUG("socket(): %s", errbuff);
continue;
}
- if (connect(pdb->sock, rp->ai_addr, rp->ai_addrlen) < 0) {
+ if (connect(sock, rp->ai_addr, rp->ai_addrlen) < 0) {
+ close(sock);
sstrerror(errno, errbuff, sizeof(errbuff));
OVS_DEBUG("connect(): %s [family=%d]", errbuff, rp->ai_family);
- close(pdb->sock);
} else {
- is_connected = 1;
+ /* send notification to event thread */
+ ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_ESTABLISHED);
+ pdb->sock = sock;
break;
}
}
- /* send notification to event thread */
- if (is_connected)
- ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_ESTABLISHED);
- else
- OVS_ERROR("connect to \"%s\" failed", pdb->node);
+ if (pdb->sock < 0)
+ OVS_ERROR("connect to \"%s\" failed", node_info);
freeaddrinfo(result);
- return !is_connected;
}
/* POLL worker thread.
static void *ovs_poll_worker(void *arg) {
ovs_db_t *pdb = (ovs_db_t *)arg; /* pointer to OVS DB */
ovs_json_reader_t *jreader = NULL;
- const char *json;
- size_t json_len;
- ssize_t nbytes = 0;
- char buff[OVS_DB_POLL_READ_BLOCK_SIZE];
- struct pollfd poll_fd;
- int poll_ret = 0;
+ struct pollfd poll_fd = {
+ .fd = pdb->sock, .events = POLLIN | POLLPRI, .revents = 0,
+ };
+ /* create JSON reader instance */
if ((jreader = ovs_json_reader_alloc()) == NULL) {
OVS_ERROR("initialize json reader failed");
- goto thread_exit;
+ return (NULL);
}
- /* start polling data */
- poll_fd.fd = pdb->sock;
- poll_fd.events = POLLIN | POLLPRI;
- poll_fd.revents = 0;
-
/* poll data */
while (ovs_db_poll_is_running(pdb)) {
- poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000);
- if (poll_ret > 0) {
- if (poll_fd.revents & POLLNVAL) {
- /* invalid file descriptor, reconnect */
- if (ovs_db_reconnect(pdb) != 0) {
- /* sleep awhile until next reconnect */
- usleep(OVS_DB_RECONNECT_TIMEOUT * 1000000);
- }
- ovs_json_reader_reset(jreader);
- poll_fd.fd = pdb->sock;
- } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) {
- /* connection is broken */
- OVS_ERROR("poll() peer closed its end of the channel");
- ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
- close(poll_fd.fd);
- } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
- /* read incoming data */
- nbytes = recv(poll_fd.fd, buff, OVS_DB_POLL_READ_BLOCK_SIZE, 0);
- if (nbytes > 0) {
- OVS_DEBUG("recv(): received %d bytes of data", (int)nbytes);
- ovs_json_reader_push_data(jreader, buff, nbytes);
- while (!ovs_json_reader_pop(jreader, &json, &json_len))
- /* process JSON data */
- ovs_db_json_data_process(pdb, json, json_len);
- } else if (nbytes == 0) {
- OVS_ERROR("recv() peer has performed an orderly shutdown");
- ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
- close(poll_fd.fd);
- } else {
- OVS_ERROR("recv() receive data error");
- break;
- }
- } /* poll() POLLIN & POLLPRI */
- } else if (poll_ret == 0)
- OVS_DEBUG("poll() timeout");
- else {
- OVS_ERROR("poll() error");
+ char errbuff[OVS_ERROR_BUFF_SIZE];
+ poll_fd.fd = pdb->sock;
+ int poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000);
+ if (poll_ret < 0) {
+ sstrerror(errno, errbuff, sizeof(errbuff));
+ OVS_ERROR("poll(): %s", errbuff);
break;
+ } else if (poll_ret == 0) {
+ OVS_DEBUG("poll(): timeout");
+ if (pdb->sock < 0)
+ /* invalid fd, so try to reconnect */
+ ovs_db_reconnect(pdb);
+ continue;
+ }
+ if (poll_fd.revents & POLLNVAL) {
+ /* invalid file descriptor, clean-up */
+ ovs_db_callback_remove_all(pdb);
+ ovs_json_reader_reset(jreader);
+ /* setting poll FD to -1 tells poll() call to ignore this FD.
+ * In that case poll() call will return timeout all the time */
+ pdb->sock = (-1);
+ } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) {
+ /* connection is broken */
+ close(poll_fd.fd);
+ ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
+ OVS_ERROR("poll() peer closed its end of the channel");
+ } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
+ /* read incoming data */
+ char buff[OVS_DB_POLL_READ_BLOCK_SIZE];
+ ssize_t nbytes = recv(poll_fd.fd, buff, sizeof(buff), 0);
+ if (nbytes < 0) {
+ sstrerror(errno, errbuff, sizeof(errbuff));
+ OVS_ERROR("recv(): %s", errbuff);
+ /* read error? Try to reconnect */
+ close(poll_fd.fd);
+ continue;
+ } else if (nbytes == 0) {
+ close(poll_fd.fd);
+ ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
+ OVS_ERROR("recv() peer has performed an orderly shutdown");
+ continue;
+ }
+ /* read incoming data */
+ size_t json_len = 0;
+ const char *json = NULL;
+ OVS_DEBUG("recv(): received %zd bytes of data", nbytes);
+ ovs_json_reader_push_data(jreader, buff, nbytes);
+ while (!ovs_json_reader_pop(jreader, &json, &json_len))
+ /* process JSON data */
+ ovs_db_json_data_process(pdb, json, json_len);
}
}
-thread_exit:
OVS_DEBUG("poll thread has been completed");
ovs_json_reader_free(jreader);
- pthread_exit((void *)0);
- return ((void *)0);
+ return (NULL);
}
/* EVENT worker thread.
* Perform task based on incoming events. This
* task can be done asynchronously which allows to
- * handle OVD DB callback like 'init_cb'.
+ * handle OVS DB callback like 'init_cb'.
*/
static void *ovs_event_worker(void *arg) {
- int ret = 0;
ovs_db_t *pdb = (ovs_db_t *)arg;
- struct timespec ts;
while (pdb->event_thread.value != OVS_DB_EVENT_TERMINATE) {
/* wait for an event */
+ struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += (OVS_DB_EVENT_TIMEOUT);
- ret = pthread_cond_timedwait(&pdb->event_thread.cond,
- &pdb->event_thread.mutex, &ts);
+ int ret = pthread_cond_timedwait(&pdb->event_thread.cond,
+ &pdb->event_thread.mutex, &ts);
if (!ret) {
/* handle the event */
OVS_DEBUG("handle event %d", pdb->event_thread.value);
}
}
-thread_exit:
OVS_DEBUG("event thread has been completed");
- pthread_exit((void *)0);
- return ((void *)0);
+ return (NULL);
+}
+
+/* Initialize EVENT thread */
+static int ovs_db_event_thread_init(ovs_db_t *pdb) {
+ pdb->event_thread.tid = -1;
+ /* init event thread condition variable */
+ if (pthread_cond_init(&pdb->event_thread.cond, NULL)) {
+ return (-1);
+ }
+ /* init event thread mutex */
+ if (pthread_mutex_init(&pdb->event_thread.mutex, NULL)) {
+ pthread_cond_destroy(&pdb->event_thread.cond);
+ return (-1);
+ }
+ /* Hold the event thread mutex. It ensures that no events
+ * will be lost while thread is still starting. Once event
+ * thread is started and ready to accept events, it will release
+ * the mutex */
+ if (pthread_mutex_lock(&pdb->event_thread.mutex)) {
+ pthread_mutex_destroy(&pdb->event_thread.mutex);
+ pthread_cond_destroy(&pdb->event_thread.cond);
+ return (-1);
+ }
+ /* start event thread */
+ pthread_t tid;
+ if (plugin_thread_create(&tid, NULL, ovs_event_worker, pdb) != 0) {
+ pthread_mutex_unlock(&pdb->event_thread.mutex);
+ pthread_mutex_destroy(&pdb->event_thread.mutex);
+ pthread_cond_destroy(&pdb->event_thread.cond);
+ return (-1);
+ }
+ pdb->event_thread.tid = tid;
+ return (0);
}
-/* Stop EVENT thread */
-static int ovs_db_event_thread_stop(ovs_db_t *pdb) {
+/* Destroy EVENT thread */
+static int ovs_db_event_thread_destroy(ovs_db_t *pdb) {
+ if (pdb->event_thread.tid < 0)
+ /* already destroyed */
+ return (0);
ovs_db_event_post(pdb, OVS_DB_EVENT_TERMINATE);
if (pthread_join(pdb->event_thread.tid, NULL) != 0)
return (-1);
+ /* Event thread always holds the thread mutex when
+ * performs some task (handles event) and releases it when
+ * while sleeping. Thus, if event thread exits, the mutex
+ * remains locked */
pthread_mutex_unlock(&pdb->event_thread.mutex);
pthread_mutex_destroy(&pdb->event_thread.mutex);
+ pthread_cond_destroy(&pdb->event_thread.cond);
+ pdb->event_thread.tid = -1;
+ return (0);
+}
+
+/* Initialize POLL thread */
+static int ovs_db_poll_thread_init(ovs_db_t *pdb) {
+ pdb->poll_thread.tid = -1;
+ /* init event thread mutex */
+ if (pthread_mutex_init(&pdb->poll_thread.mutex, NULL)) {
+ return (-1);
+ }
+ /* start poll thread */
+ pthread_t tid;
+ pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING;
+ if (plugin_thread_create(&tid, NULL, ovs_poll_worker, pdb) != 0) {
+ pthread_mutex_destroy(&pdb->poll_thread.mutex);
+ return (-1);
+ }
+ pdb->poll_thread.tid = tid;
return (0);
}
-/* Stop POLL thread */
-static int ovs_db_poll_thread_stop(ovs_db_t *pdb) {
- ovs_db_poll_terminate(pdb);
+/* Destroy POLL thread */
+static int ovs_db_poll_thread_destroy(ovs_db_t *pdb) {
+ if (pdb->poll_thread.tid < 0)
+ /* already destroyed */
+ return (0);
+ /* change thread state */
+ pthread_mutex_lock(&pdb->poll_thread.mutex);
+ pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING;
+ pthread_mutex_unlock(&pdb->poll_thread.mutex);
+ /* join the thread */
if (pthread_join(pdb->poll_thread.tid, NULL) != 0)
return (-1);
pthread_mutex_destroy(&pdb->poll_thread.mutex);
+ pdb->poll_thread.tid = -1;
return (0);
}
*/
ovs_db_t *ovs_db_init(const char *node, const char *service,
- ovs_db_callback_t *cb) {
- pthread_mutexattr_t mutex_attr;
- ovs_db_t *pdb = NULL;
-
- /* allocate db data & fill it */
- if ((pdb = calloc(1, sizeof(*pdb))) == NULL)
+ const char *unix_path, ovs_db_callback_t *cb) {
+ /* sanity check */
+ if (node == NULL || service == NULL || unix_path == NULL)
return (NULL);
- /* node cannot be unset */
- if (node == NULL || strlen(node) == 0)
+ /* allocate db data & fill it */
+ ovs_db_t *pdb = pdb = calloc(1, sizeof(*pdb));
+ if (pdb == NULL)
return (NULL);
/* store the OVS DB address */
sstrncpy(pdb->node, node, sizeof(pdb->node));
- if (service != NULL)
- sstrncpy(pdb->service, service, sizeof(pdb->service));
+ sstrncpy(pdb->service, service, sizeof(pdb->service));
+ sstrncpy(pdb->unix_path, unix_path, sizeof(pdb->unix_path));
/* setup OVS DB callbacks */
if (cb)
pdb->cb = *cb;
- /* prepare event thread */
- pthread_cond_init(&pdb->event_thread.cond, NULL);
- pthread_mutex_init(&pdb->event_thread.mutex, NULL);
- pthread_mutex_lock(&pdb->event_thread.mutex);
- if (plugin_thread_create(&pdb->event_thread.tid, NULL, ovs_event_worker,
- pdb) != 0) {
- OVS_ERROR("event worker start failed");
- goto failure;
+ /* init OVS DB mutex attributes */
+ pthread_mutexattr_t mutex_attr;
+ if (pthread_mutexattr_init(&mutex_attr)) {
+ OVS_ERROR("OVS DB mutex attribute init failed");
+ sfree(pdb);
+ return (NULL);
}
-
- /* prepare polling thread */
- ovs_db_reconnect(pdb);
- pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING;
- pthread_mutex_init(&pdb->poll_thread.mutex, NULL);
- if (plugin_thread_create(&pdb->poll_thread.tid, NULL, ovs_poll_worker, pdb) !=
- 0) {
- OVS_ERROR("pull worker start failed");
- goto failure;
+ /* set OVS DB mutex as recursive */
+ if (pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE)) {
+ OVS_ERROR("Failed to set OVS DB mutex as recursive");
+ pthread_mutexattr_destroy(&mutex_attr);
+ sfree(pdb);
+ return (NULL);
}
-
/* init OVS DB mutex */
- if (pthread_mutexattr_init(&mutex_attr) ||
- pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE) ||
- pthread_mutex_init(&pdb->mutex, &mutex_attr)) {
+ if (pthread_mutex_init(&pdb->mutex, &mutex_attr)) {
OVS_ERROR("OVS DB mutex init failed");
- goto failure;
+ pthread_mutexattr_destroy(&mutex_attr);
+ sfree(pdb);
+ return (NULL);
}
+ /* destroy mutex attributes */
+ pthread_mutexattr_destroy(&mutex_attr);
- /* return db to the caller */
- return pdb;
+ /* init event thread */
+ if (ovs_db_event_thread_init(pdb) < 0) {
+ ovs_db_destroy(pdb);
+ return (NULL);
+ }
-failure:
- if (pdb->sock)
- /* close connection */
- close(pdb->sock);
- if (pdb->event_thread.tid != 0)
- /* stop event thread */
- if (ovs_db_event_thread_stop(pdb) < 0)
- OVS_ERROR("stop event thread failed");
- if (pdb->poll_thread.tid != 0)
- /* stop poll thread */
- if (ovs_db_poll_thread_stop(pdb) < 0)
- OVS_ERROR("stop poll thread failed");
- sfree(pdb);
- return NULL;
+ /* init polling thread */
+ pdb->sock = -1;
+ if (ovs_db_poll_thread_init(pdb) < 0) {
+ ovs_db_destroy(pdb);
+ return (NULL);
+ }
+ return pdb;
}
int ovs_db_send_request(ovs_db_t *pdb, const char *method, const char *params,
@@ -1038,7 +1093,7 @@ int ovs_db_send_request(ovs_db_t *pdb, const char *method, const char *params,
if (cb) {
/* register result callback */
- if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL)
+ if ((new_cb = calloc(1, sizeof(ovs_callback_t))) == NULL)
goto yajl_gen_failure;
/* add new callback to front */
if (pdb == NULL || tb_name == NULL || update_cb == NULL)
return (-1);
- if ((jgen = yajl_gen_alloc(NULL)) == NULL)
+ /* allocate new update callback */
+ if ((new_cb = calloc(1, sizeof(ovs_callback_t))) == NULL)
return (-1);
- /* register table update callback */
- if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL)
+ /* init YAJL generator */
+ if ((jgen = yajl_gen_alloc(NULL)) == NULL) {
+ sfree(new_cb);
return (-1);
+ }
/* add new callback to front */
new_cb->table.call = update_cb;
/* try to lock the structure before releasing */
if ((ret = pthread_mutex_lock(&pdb->mutex))) {
- OVS_ERROR("pthread_mutex_lock() DB mutext lock failed (%d)", ret);
+ OVS_ERROR("pthread_mutex_lock() DB mutex lock failed (%d)", ret);
return (-1);
}
/* stop poll thread */
- if (ovs_db_event_thread_stop(pdb) < 0) {
- OVS_ERROR("stop poll thread failed");
+ if (ovs_db_event_thread_destroy(pdb) < 0) {
+ OVS_ERROR("destroy poll thread failed");
ovs_db_ret = (-1);
}
/* stop event thread */
- if (ovs_db_poll_thread_stop(pdb) < 0) {
+ if (ovs_db_poll_thread_destroy(pdb) < 0) {
OVS_ERROR("stop event thread failed");
ovs_db_ret = (-1);
}
ovs_db_callback_remove_all(pdb);
/* close connection */
- if (pdb->sock)
+ if (pdb->sock >= 0)
close(pdb->sock);
/* release DB handler */
* Public OVS utils API implementation
*/
-/* Get YAJL value by key from YAJL dictionary */
+/* Get YAJL value by key from YAJL dictionary
+ *
+ * EXAMPLE:
+ * {
+ * "key_a" : <YAJL return value>
+ * "key_b" : <YAJL return value>
+ * }
+ */
yajl_val ovs_utils_get_value_by_key(yajl_val jval, const char *key) {
const char *obj_key = NULL;
return NULL;
}
-/* Get OVS DB map value by given map key */
+/* Get OVS DB map value by given map key
+ *
+ * FROM RFC7047:
+ *
+ * <pair>
+ * A 2-element JSON array that represents a pair within a database
+ * map. The first element is an <atom> that represents the key, and
+ * the second element is an <atom> that represents the value.
+ *
+ * <map>
+ * A 2-element JSON array that represents a database map value. The
+ * first element of the array must be the string "map", and the
+ * second element must be an array of zero or more <pair>s giving the
+ * values in the map. All of the <pair>s must have the same key and
+ * value types.
+ *
+ * EXAMPLE:
+ * [
+ * "map", [
+ * [ "key_a", <YAJL value>], [ "key_b", <YAJL value>], ...
+ * ]
+ * ]
+ */
yajl_val ovs_utils_get_map_value(yajl_val jval, const char *key) {
size_t map_len = 0;
size_t array_len = 0;
diff --git a/src/utils_ovs.h b/src/utils_ovs.h
index e90bda31090dd63d3167bf93be1a1092cedbe5ad..52c2f915ce62f55e65d6e001638cee2b5740048d 100644 (file)
--- a/src/utils_ovs.h
+++ b/src/utils_ovs.h
*/
void (*post_conn_init)(ovs_db_t *pdb);
/*
- * This callback is called when OVD DB connection
+ * This callback is called when OVS DB connection
* has been lost. This field can be NULL.
*/
void (*post_conn_terminate)(void);
/* OVS DB defines */
#define OVS_DB_ADDR_NODE_SIZE 256
#define OVS_DB_ADDR_SERVICE_SIZE 128
+#define OVS_DB_ADDR_UNIX_SIZE 108
/* OVS DB prototypes */
* PARAMETERS
* `node' OVS DB Address.
* `service' OVS DB service name.
+ * `unix' OVS DB unix socket path.
* `cb' OVS DB callbacks.
*
* RETURN VALUE
* New ovs_db_t object upon success or NULL if an error occurred.
*/
ovs_db_t *ovs_db_init(const char *node, const char *service,
- ovs_db_callback_t *cb);
+ const char *unix_path, ovs_db_callback_t *cb);
/*
* NAME