From f8924f5ff50601d08b528903e5f5fabce9df9f46 Mon Sep 17 00:00:00 2001 From: "Mytnyk, VolodymyrX" Date: Tue, 20 Sep 2016 13:36:08 +0100 Subject: [PATCH] OVS link plugin: Add connection terminate callback Extend OVS utils to notify OVS plugin about OVS DB connection lost. If the connection is lost, OVS link plugin will dispatch notification and print error message. OVS plugin treats status of all interfaces as UNKNOWN. Change-Id: I22fe3cb0740e0f4779a5c4f6b92e78f1ad9777a3 Signed-off-by: Mytnyk, VolodymyrX --- src/collectd.conf.pod | 4 +-- src/ovs_link.c | 68 ++++++++++++++++++++++++++++++++++++++----- src/utils_ovs.c | 51 ++++++++++++++++++++------------ src/utils_ovs.h | 15 ++++++++-- 4 files changed, 108 insertions(+), 30 deletions(-) diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 3c8af40a..3d69dcf6 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -5505,8 +5505,8 @@ Default: empty (all interfaces on all bridges are monitored) =item B I -If set to true, OVS interface link status notification is sent to collectd. -Default value is false. +If set to true, OVS link notifications (interface status and OVS DB connection +terminate) are sent to collectd. Default value is false. =back diff --git a/src/ovs_link.c b/src/ovs_link.c index e298ae9a..1faa8ea0 100644 --- a/src/ovs_link.c +++ b/src/ovs_link.c @@ -58,6 +58,7 @@ typedef struct ovs_link_config_s ovs_link_config_t; /* OVS link context type */ struct ovs_link_ctx_s { pthread_mutex_t mutex; /* mutex to lock the context */ + pthread_mutexattr_t mutex_attr; /* context mutex attribute */ ovs_db_t *ovs_db; /* pointer to OVS DB instance */ ovs_link_config_t config; /* plugin config */ ovs_link_interface_info_t *ifaces; /* interface info */ @@ -245,7 +246,7 @@ ovs_link_link_status_submit(const char *link_name, values[0].gauge = (gauge_t) link_status; vl.time = cdtime(); vl.values = values; - vl.values_len = sizeof(values) / sizeof(values[0]); + vl.values_len = STATIC_ARRAY_SIZE(values); sstrncpy(vl.host, hostname_g, sizeof(vl.host)); sstrncpy(vl.plugin, OVS_LINK_PLUGIN, sizeof(vl.plugin)); sstrncpy(vl.plugin_instance, link_name, sizeof(vl.plugin_instance)); @@ -254,6 +255,17 @@ ovs_link_link_status_submit(const char *link_name, plugin_dispatch_values(&vl); } +/* Dispatch OVS DB terminate connection event to collectd */ +static void +ovs_link_dispatch_terminate_notification(const char *msg) +{ + notification_t n = {NOTIF_FAILURE, cdtime(), "", "", OVS_LINK_PLUGIN, + "", "", "", NULL}; + ssnprintf(n.message, sizeof(n.message), msg); + sstrncpy(n.host, hostname_g, sizeof(n.host)); + plugin_dispatch_notification(&n); +} + /* Process OVS DB update table event. It handles link status update event(s) * and dispatches the value(s) to collectd if interface name matches one of * interfaces specified in configuration file. @@ -348,10 +360,10 @@ ovs_link_table_result_cb(yajl_val jresult, yajl_val jerror) } /* Setup OVS DB table callback. It subscribes to OVS DB 'Interface' table - * to receive link status events. + * to receive link status event(s). */ static void -ovs_link_initialize(ovs_db_t *pdb) +ovs_link_conn_initialize(ovs_db_t *pdb) { int ret = 0; const char tb_name[] = "Interface"; @@ -371,6 +383,22 @@ ovs_link_initialize(ovs_db_t *pdb) DEBUG(OVS_LINK_PLUGIN ": OVS DB has been initialized"); } +/* OVS DB terminate connection notification callback */ +static void +ovs_link_conn_terminate() +{ + const char msg[] = "OVS DB connection has been lost"; + if (ovs_link_ctx.config.send_notification) + ovs_link_dispatch_terminate_notification(msg); + WARNIG(OVS_LINK_PLUGIN ": %s", msg); + OVS_LINK_CTX_LOCK { + /* update link status to UNKNOWN */ + for (ovs_link_interface_info_t *iface = ovs_link_ctx.ifaces; iface; + iface = iface->next) + ovs_link_link_status_update(iface->name, UNKNOWN); + } +} + /* Read OVS link status plugin callback */ static int ovs_link_plugin_read(user_data_t *ud) @@ -390,15 +418,27 @@ static int ovs_link_plugin_init(void) { ovs_db_t *ovs_db = NULL; - ovs_db_callback_t cb = {.init_cb = ovs_link_initialize}; + ovs_db_callback_t cb = {.post_conn_init = ovs_link_conn_initialize, + .post_conn_terminate = ovs_link_conn_terminate}; + + /* Initialize the context mutex */ + if (pthread_mutexattr_init(&ovs_link_ctx.mutex_attr) != 0) { + ERROR(OVS_LINK_PLUGIN ": init context mutex attribute failed"); + return (-1); + } + pthread_mutexattr_settype(&ovs_link_ctx.mutex_attr, + PTHREAD_MUTEX_RECURSIVE); + if (pthread_mutex_init(&ovs_link_ctx.mutex, &ovs_link_ctx.mutex_attr) != 0) { + ERROR(OVS_LINK_PLUGIN ": init context mutex failed"); + goto ovs_link_failure; + } /* set default OVS DB url */ if (ovs_link_ctx.config.ovs_db_server_url == NULL) if ((ovs_link_ctx.config.ovs_db_server_url = strdup(OVS_LINK_DEFAULT_OVS_DB_SERVER_URL)) == NULL) { ERROR(OVS_LINK_PLUGIN ": fail to set default OVS DB URL"); - ovs_link_config_free(); - return (-1); + goto ovs_link_failure; } DEBUG(OVS_LINK_PLUGIN ": OVS DB url = %s", ovs_link_ctx.config.ovs_db_server_url); @@ -407,8 +447,7 @@ ovs_link_plugin_init(void) ovs_db = ovs_db_init(ovs_link_ctx.config.ovs_db_server_url, &cb); if (ovs_db == NULL) { ERROR(OVS_LINK_PLUGIN ": fail to connect to OVS DB server"); - ovs_link_config_free(); - return (-1); + goto ovs_link_failure; } /* store OVS DB handler */ @@ -418,6 +457,15 @@ ovs_link_plugin_init(void) DEBUG(OVS_LINK_PLUGIN ": plugin has been initialized"); return (0); + +ovs_link_failure: + ERROR(OVS_LINK_PLUGIN ": plugin initialize failed"); + /* release allocated memory */ + ovs_link_config_free(); + /* destroy context mutex */ + pthread_mutexattr_destroy(&ovs_link_ctx.mutex_attr); + pthread_mutex_destroy(&ovs_link_ctx.mutex); + return (-1); } /* Shutdown OVS plugin */ @@ -431,6 +479,10 @@ ovs_link_plugin_shutdown(void) if (ovs_db_destroy(ovs_link_ctx.ovs_db)) ERROR(OVS_LINK_PLUGIN ": OVSDB object destroy failed"); + /* destroy context mutex */ + pthread_mutexattr_destroy(&ovs_link_ctx.mutex_attr); + pthread_mutex_destroy(&ovs_link_ctx.mutex); + DEBUG(OVS_LINK_PLUGIN ": plugin has been destroyed"); return (0); } diff --git a/src/utils_ovs.c b/src/utils_ovs.c index 4c99eef2..87d2ca1f 100644 --- a/src/utils_ovs.c +++ b/src/utils_ovs.c @@ -86,7 +86,8 @@ #define OVS_DB_EVENT_TIMEOUT 5 /* event thread timeout (sec) */ #define OVS_DB_EVENT_TERMINATE 1 -#define OVS_DB_EVENT_CONNECTED 2 +#define OVS_DB_EVENT_CONN_ESTABLISHED 2 +#define OVS_DB_EVENT_CONN_TERMINATED 3 #define OVS_DB_POLL_STATE_RUNNING 1 #define OVS_DB_POLL_STATE_EXITING 2 @@ -172,16 +173,17 @@ struct ovs_db_s { ovs_poll_thread_t poll_thread; ovs_event_thread_t event_thread; pthread_mutex_t mutex; - ovs_callback_t *cb; + ovs_callback_t *remote_cb; + ovs_db_callback_t cb; ovs_conn_t conn; - ovs_db_init_cb_t init_cb; }; typedef struct ovs_db_s ovs_db_t; /* Post an event to event thread. * Possible events are: * OVS_DB_EVENT_TERMINATE - * OVS_DB_EVENT_CONNECTED + * OVS_DB_EVENT_CONN_ESTABLISHED + * OVS_DB_EVENT_CONN_TERMINATED */ static void ovs_db_event_post(ovs_db_t *pdb, int event) @@ -233,11 +235,11 @@ static void ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb) { pthread_mutex_lock(&pdb->mutex); - if (pdb->cb) - pdb->cb->prev = new_cb; - new_cb->next = pdb->cb; + if (pdb->remote_cb) + pdb->remote_cb->prev = new_cb; + new_cb->next = pdb->remote_cb; new_cb->prev = NULL; - pdb->cb = new_cb; + pdb->remote_cb = new_cb; pthread_mutex_unlock(&pdb->mutex); } @@ -255,7 +257,7 @@ ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb) if (pre_cb) pre_cb->next = del_cb->next; else - pdb->cb = del_cb->next; + pdb->remote_cb = del_cb->next; free(del_cb); pthread_mutex_unlock(&pdb->mutex); @@ -266,11 +268,12 @@ static void ovs_db_callback_remove_all(ovs_db_t *pdb) { pthread_mutex_lock(&pdb->mutex); - for (ovs_callback_t *del_cb = pdb->cb; pdb->cb; del_cb = pdb->cb) { - pdb->cb = pdb->cb->next; + for (ovs_callback_t *del_cb = pdb->remote_cb; pdb->remote_cb; + del_cb = pdb->remote_cb) { + pdb->remote_cb = pdb->remote_cb->next; free(del_cb); } - pdb->cb = NULL; + pdb->remote_cb = NULL; pthread_mutex_unlock(&pdb->mutex); } @@ -280,7 +283,7 @@ static ovs_callback_t * ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid) { pthread_mutex_lock(&pdb->mutex); - for (ovs_callback_t *cb = pdb->cb; cb != NULL; cb = cb->next) + for (ovs_callback_t *cb = pdb->remote_cb; cb != NULL; cb = cb->next) if (cb->uid == uid) { pthread_mutex_unlock(&pdb->mutex); return cb; @@ -814,7 +817,7 @@ ovs_db_reconnect(ovs_db_t *pdb) } /* send notification to event thread */ - ovs_db_event_post(pdb, OVS_DB_EVENT_CONNECTED); + ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_ESTABLISHED); return (0); } @@ -860,6 +863,7 @@ ovs_poll_worker(void *arg) } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) { /* connection is broken */ OVS_ERROR("poll() peer closed its end of the channel"); + ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED); close(poll_fd.fd); } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) { /* read incoming data */ @@ -872,6 +876,7 @@ ovs_poll_worker(void *arg) ovs_db_json_data_process(pdb, json, json_len); } else if (nbytes == 0) { OVS_ERROR("recv() peer has performed an orderly shutdown"); + ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED); close(poll_fd.fd); } else { OVS_ERROR("recv() receive data error"); @@ -914,9 +919,19 @@ ovs_event_worker(void *arg) if (!ret) { /* handle the event */ OVS_DEBUG("handle event %d", pdb->event_thread.value); - if (pdb->event_thread.value == OVS_DB_EVENT_CONNECTED) - if (pdb->init_cb) - pdb->init_cb(pdb); + switch (pdb->event_thread.value) { + case OVS_DB_EVENT_CONN_ESTABLISHED: + if (pdb->cb.post_conn_init) + pdb->cb.post_conn_init(pdb); + break; + case OVS_DB_EVENT_CONN_TERMINATED: + if (pdb->cb.post_conn_terminate) + pdb->cb.post_conn_terminate(); + break; + default: + OVS_DEBUG("unknown event received"); + break; + } } else if (ret == ETIMEDOUT) { /* wait timeout */ OVS_DEBUG("no event received (timeout)"); @@ -977,7 +992,7 @@ ovs_db_init(const char *surl, ovs_db_callback_t *cb) /* setup OVS DB callbacks */ if (cb) - pdb->init_cb = cb->init_cb; + pdb->cb = *cb; /* prepare event thread */ pthread_cond_init(&pdb->event_thread.cond, NULL); diff --git a/src/utils_ovs.h b/src/utils_ovs.h index 484260f6..66868cda 100644 --- a/src/utils_ovs.h +++ b/src/utils_ovs.h @@ -72,13 +72,24 @@ typedef struct ovs_db_s ovs_db_t; /* OVS DB callback type declaration */ -typedef void (*ovs_db_init_cb_t) (ovs_db_t *pdb); typedef void (*ovs_db_table_cb_t) (yajl_val jupdates); typedef void (*ovs_db_result_cb_t) (yajl_val jresult, yajl_val jerror); /* OVS DB structures */ struct ovs_db_callback_s { - ovs_db_init_cb_t init_cb; + /* + * This callback is called when OVS DB connection + * has been established and ready to use. Client + * can use this callback to configure OVS DB, e.g. + * to subscribe to table update notification or poll + * some OVS DB data. This field can be NULL. + */ + void (*post_conn_init) (ovs_db_t *pdb); + /* + * This callback is called when OVD DB connection + * has been lost. This field can be NULL. + */ + void (*post_conn_terminate) (void); }; typedef struct ovs_db_callback_s ovs_db_callback_t; -- 2.30.2