ff2b151eee5a28c67c05bf284237e57c3c7f27f6
1 /**
2 * collectd - src/utils_ovs.c
3 *
4 * Copyright(c) 2016 Intel Corporation. All rights reserved.
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy of
7 * this software and associated documentation files (the "Software"), to deal in
8 * the Software without restriction, including without limitation the rights to
9 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
10 * of the Software, and to permit persons to whom the Software is furnished to do
11 * so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in all
14 * copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22 * SOFTWARE.
23 *
24 * Authors:
25 * Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
26 **/
28 /* clang-format off */
29 /*
30 * OVS DB API internal architecture diagram
31 * +------------------------------------------------------------------------------+
32 * |OVS plugin |OVS utils |
33 * | | +------------------------+ |
34 * | | | echo handler | JSON request/ |
35 * | | +--+ (ovs_db_table_echo_cb) +<---+---------+ update event/ |
36 * | | | | | | | result |
37 * | | | +------------------------+ | | |
38 * | | | | +----+---+--------+ |
39 * | +----------+ | | +------------------------+ | | | | |
40 * | | update | | | | update handler | | | YAJL | JSON | |
41 * | | callback +<-------+(ovs_db_table_update_cp)+<---+ | parser | reader | |
42 * | +----------+ | | | | | | | | |
43 * | | | +------------------------+ | +--------+---+----+ |
44 * | | | | ^ |
45 * | +----------+ | | +------------------------+ | | |
46 * | | result | | | | result handler | | | |
47 * | | callback +<-------+ (ovs_db_result_cb) +<---+ JSON raw | |
48 * | +----------+ | | | | data | |
49 * | | | +------------------------+ | |
50 * | | | | |
51 * | | | +------------------+ +------------+----+ |
52 * | +----------+ | | |thread| | |thread| | |
53 * | | init | | | | | reconnect | | |
54 * | | callback +<---------+ EVENT WORKER +<------------+ POLL WORKER | |
55 * | +----------+ | | +------------------+ +--------+--------+ |
56 * | | | ^ |
57 * +----------------+-------------------------------------------------------------+
58 * | |
59 * JSON|echo reply raw|data
60 * v v
61 * +-------------------+----------------------------------------------+-----------+
62 * | TCP/UNIX socket |
63 * +-------------------------------------------------------------------------------
64 */
65 /* clang-format on */
67 /* collectd headers */
68 #include "collectd.h"
70 #include "common.h"
72 /* private headers */
73 #include "utils_ovs.h"
75 /* system libraries */
76 #if HAVE_NETDB_H
77 #include <netdb.h>
78 #endif
79 #if HAVE_ARPA_INET_H
80 #include <arpa/inet.h>
81 #endif
82 #if HAVE_POLL_H
83 #include <poll.h>
84 #endif
85 #if HAVE_SYS_UN_H
86 #include <sys/un.h>
87 #endif
89 #include <semaphore.h>
91 #define OVS_ERROR(fmt, ...) \
92 do { \
93 ERROR("ovs_utils: " fmt, ##__VA_ARGS__); \
94 } while (0)
95 #define OVS_DEBUG(fmt, ...) \
96 do { \
97 DEBUG("%s:%d:%s(): " fmt, __FILE__, __LINE__, __FUNCTION__, \
98 ##__VA_ARGS__); \
99 } while (0)
101 #define OVS_DB_POLL_TIMEOUT 1 /* poll receive timeout (sec) */
102 #define OVS_DB_POLL_READ_BLOCK_SIZE 512 /* read block size (bytes) */
103 #define OVS_DB_DEFAULT_DB_NAME "Open_vSwitch"
105 #define OVS_DB_EVENT_TIMEOUT 5 /* event thread timeout (sec) */
106 #define OVS_DB_EVENT_TERMINATE 1
107 #define OVS_DB_EVENT_CONN_ESTABLISHED 2
108 #define OVS_DB_EVENT_CONN_TERMINATED 3
110 #define OVS_DB_POLL_STATE_RUNNING 1
111 #define OVS_DB_POLL_STATE_EXITING 2
113 #define OVS_DB_SEND_REQ_TIMEOUT 5 /* send request timeout (sec) */
115 #define OVS_YAJL_CALL(func, ...) \
116 do { \
117 yajl_gen_ret = yajl_gen_status_ok; \
118 if ((yajl_gen_ret = func(__VA_ARGS__)) != yajl_gen_status_ok) \
119 goto yajl_gen_failure; \
120 } while (0)
121 #define OVS_YAJL_ERROR_BUFFER_SIZE 1024
122 #define OVS_ERROR_BUFF_SIZE 512
123 #define OVS_UID_STR_SIZE 17 /* 64-bit HEX string len + '\0' */
125 /* JSON reader internal data */
126 struct ovs_json_reader_s {
127 char *buff_ptr;
128 size_t buff_size;
129 size_t buff_offset;
130 size_t json_offset;
131 };
132 typedef struct ovs_json_reader_s ovs_json_reader_t;
134 /* Result callback declaration */
135 struct ovs_result_cb_s {
136 sem_t sync;
137 ovs_db_result_cb_t call;
138 };
139 typedef struct ovs_result_cb_s ovs_result_cb_t;
141 /* Table callback declaration */
142 struct ovs_table_cb_s {
143 ovs_db_table_cb_t call;
144 };
145 typedef struct ovs_table_cb_s ovs_table_cb_t;
147 /* Callback declaration */
148 struct ovs_callback_s {
149 uint64_t uid;
150 union {
151 ovs_result_cb_t result;
152 ovs_table_cb_t table;
153 };
154 struct ovs_callback_s *next;
155 struct ovs_callback_s *prev;
156 };
157 typedef struct ovs_callback_s ovs_callback_t;
159 /* Event thread data declaration */
160 struct ovs_event_thread_s {
161 pthread_t tid;
162 pthread_mutex_t mutex;
163 pthread_cond_t cond;
164 int value;
165 };
166 typedef struct ovs_event_thread_s ovs_event_thread_t;
168 /* Poll thread data declaration */
169 struct ovs_poll_thread_s {
170 pthread_t tid;
171 pthread_mutex_t mutex;
172 int state;
173 };
174 typedef struct ovs_poll_thread_s ovs_poll_thread_t;
176 /* OVS DB internal data declaration */
177 struct ovs_db_s {
178 ovs_poll_thread_t poll_thread;
179 ovs_event_thread_t event_thread;
180 pthread_mutex_t mutex;
181 ovs_callback_t *remote_cb;
182 ovs_db_callback_t cb;
183 char service[OVS_DB_ADDR_SERVICE_SIZE];
184 char node[OVS_DB_ADDR_NODE_SIZE];
185 char unix_path[OVS_DB_ADDR_NODE_SIZE];
186 int sock;
187 };
189 /* Global variables */
190 static uint64_t ovs_uid = 0;
191 static pthread_mutex_t ovs_uid_mutex = PTHREAD_MUTEX_INITIALIZER;
193 /* Post an event to event thread.
194 * Possible events are:
195 * OVS_DB_EVENT_TERMINATE
196 * OVS_DB_EVENT_CONN_ESTABLISHED
197 * OVS_DB_EVENT_CONN_TERMINATED
198 */
199 static void ovs_db_event_post(ovs_db_t *pdb, int event) {
200 pthread_mutex_lock(&pdb->event_thread.mutex);
201 pdb->event_thread.value = event;
202 pthread_mutex_unlock(&pdb->event_thread.mutex);
203 pthread_cond_signal(&pdb->event_thread.cond);
204 }
206 /* Check if POLL thread is still running. Returns
207 * 1 if running otherwise 0 is returned */
208 static _Bool ovs_db_poll_is_running(ovs_db_t *pdb) {
209 int state = 0;
210 pthread_mutex_lock(&pdb->poll_thread.mutex);
211 state = pdb->poll_thread.state;
212 pthread_mutex_unlock(&pdb->poll_thread.mutex);
213 return (state == OVS_DB_POLL_STATE_RUNNING);
214 }
216 /* Generate unique identifier (UID). It is used by OVS DB API
217 * to set "id" field for any OVS DB JSON request. */
218 static uint64_t ovs_uid_generate() {
219 uint64_t new_uid;
220 pthread_mutex_lock(&ovs_uid_mutex);
221 new_uid = ++ovs_uid;
222 pthread_mutex_unlock(&ovs_uid_mutex);
223 return new_uid;
224 }
226 /*
227 * Callback API. These function are used to store
228 * registered callbacks in OVS DB API.
229 */
231 /* Add new callback into OVS DB object */
232 static void ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb) {
233 pthread_mutex_lock(&pdb->mutex);
234 if (pdb->remote_cb)
235 pdb->remote_cb->prev = new_cb;
236 new_cb->next = pdb->remote_cb;
237 new_cb->prev = NULL;
238 pdb->remote_cb = new_cb;
239 pthread_mutex_unlock(&pdb->mutex);
240 }
242 /* Remove callback from OVS DB object */
243 static void ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb) {
244 pthread_mutex_lock(&pdb->mutex);
245 ovs_callback_t *pre_cb = del_cb->prev;
246 ovs_callback_t *next_cb = del_cb->next;
248 if (next_cb)
249 next_cb->prev = del_cb->prev;
251 if (pre_cb)
252 pre_cb->next = del_cb->next;
253 else
254 pdb->remote_cb = del_cb->next;
256 free(del_cb);
257 pthread_mutex_unlock(&pdb->mutex);
258 }
260 /* Remove all callbacks form OVS DB object */
261 static void ovs_db_callback_remove_all(ovs_db_t *pdb) {
262 pthread_mutex_lock(&pdb->mutex);
263 for (ovs_callback_t *del_cb = pdb->remote_cb; pdb->remote_cb;
264 del_cb = pdb->remote_cb) {
265 pdb->remote_cb = del_cb->next;
266 free(del_cb);
267 }
268 pdb->remote_cb = NULL;
269 pthread_mutex_unlock(&pdb->mutex);
270 }
272 /* Get/find callback in OVS DB object by UID. Returns pointer
273 * to requested callback otherwise NULL is returned.
274 *
275 * IMPORTANT NOTE:
276 * The OVS DB mutex MUST be locked by the caller
277 * to make sure that returned callback is still valid.
278 */
279 static ovs_callback_t *ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid) {
280 for (ovs_callback_t *cb = pdb->remote_cb; cb != NULL; cb = cb->next)
281 if (cb->uid == uid)
282 return cb;
283 return NULL;
284 }
286 /* Send all requested data to the socket. Returns 0 if
287 * ALL request data has been sent otherwise negative value
288 * is returned */
289 static int ovs_db_data_send(const ovs_db_t *pdb, const char *data, size_t len) {
290 ssize_t nbytes = 0;
291 size_t rem = len;
292 size_t off = 0;
294 while (rem > 0) {
295 if ((nbytes = send(pdb->sock, data + off, rem, 0)) <= 0)
296 return (-1);
297 rem -= (size_t)nbytes;
298 off += (size_t)nbytes;
299 }
300 return (0);
301 }
303 /*
304 * YAJL (Yet Another JSON Library) helper functions
305 * Documentation (https://lloyd.github.io/yajl/)
306 */
308 /* Add null-terminated string into YAJL generator handle (JSON object).
309 * Similar function to yajl_gen_string() but takes null-terminated string
310 * instead of string and its length.
311 *
312 * jgen - YAJL generator handle allocated by yajl_gen_alloc()
313 * string - Null-terminated string
314 */
315 static yajl_gen_status ovs_yajl_gen_tstring(yajl_gen hander,
316 const char *string) {
317 return yajl_gen_string(hander, (const unsigned char *)string, strlen(string));
318 }
320 /* Add YAJL value into YAJL generator handle (JSON object)
321 *
322 * jgen - YAJL generator handle allocated by yajl_gen_alloc()
323 * jval - YAJL value usually returned by yajl_tree_get()
324 */
325 static yajl_gen_status ovs_yajl_gen_val(yajl_gen jgen, yajl_val jval) {
326 size_t array_len = 0;
327 yajl_val *jvalues = NULL;
328 yajl_val jobj_value = NULL;
329 const char *obj_key = NULL;
330 size_t obj_len = 0;
331 yajl_gen_status yajl_gen_ret;
333 if (YAJL_IS_STRING(jval))
334 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, YAJL_GET_STRING(jval));
335 else if (YAJL_IS_DOUBLE(jval))
336 OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_DOUBLE(jval));
337 else if (YAJL_IS_INTEGER(jval))
338 OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_INTEGER(jval));
339 else if (YAJL_IS_TRUE(jval))
340 OVS_YAJL_CALL(yajl_gen_bool, jgen, 1);
341 else if (YAJL_IS_FALSE(jval))
342 OVS_YAJL_CALL(yajl_gen_bool, jgen, 0);
343 else if (YAJL_IS_NULL(jval))
344 OVS_YAJL_CALL(yajl_gen_null, jgen);
345 else if (YAJL_IS_ARRAY(jval)) {
346 /* create new array and add all elements into the array */
347 array_len = YAJL_GET_ARRAY(jval)->len;
348 jvalues = YAJL_GET_ARRAY(jval)->values;
349 OVS_YAJL_CALL(yajl_gen_array_open, jgen);
350 for (int i = 0; i < array_len; i++)
351 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jvalues[i]);
352 OVS_YAJL_CALL(yajl_gen_array_close, jgen);
353 } else if (YAJL_IS_OBJECT(jval)) {
354 /* create new object and add all elements into the object */
355 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
356 obj_len = YAJL_GET_OBJECT(jval)->len;
357 for (int i = 0; i < obj_len; i++) {
358 obj_key = YAJL_GET_OBJECT(jval)->keys[i];
359 jobj_value = YAJL_GET_OBJECT(jval)->values[i];
360 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, obj_key);
361 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jobj_value);
362 }
363 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
364 } else {
365 OVS_ERROR("%s() unsupported value type %d (skip)", __FUNCTION__,
366 (int)(jval)->type);
367 goto yajl_gen_failure;
368 }
369 return yajl_gen_status_ok;
371 yajl_gen_failure:
372 OVS_ERROR("%s() error to generate value", __FUNCTION__);
373 return yajl_gen_ret;
374 }
376 /* OVS DB echo request handler. When OVS DB sends
377 * "echo" request to the client, client should generate
378 * "echo" replay with the same content received in the
379 * request */
380 static int ovs_db_table_echo_cb(const ovs_db_t *pdb, yajl_val jnode) {
381 yajl_val jparams;
382 yajl_val jid;
383 yajl_gen jgen;
384 size_t resp_len = 0;
385 const char *resp = NULL;
386 const char *params_path[] = {"params", NULL};
387 const char *id_path[] = {"id", NULL};
388 yajl_gen_status yajl_gen_ret;
390 if ((jgen = yajl_gen_alloc(NULL)) == NULL)
391 return (-1);
393 /* check & get request attributes */
394 if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
395 ((jid = yajl_tree_get(jnode, id_path, yajl_t_any)) == NULL)) {
396 OVS_ERROR("parse echo request failed");
397 goto yajl_gen_failure;
398 }
400 /* generate JSON echo response */
401 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
403 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "result");
404 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
406 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "error");
407 OVS_YAJL_CALL(yajl_gen_null, jgen);
409 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
410 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jid);
412 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
413 OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&resp,
414 &resp_len);
416 /* send the response */
417 OVS_DEBUG("response: %s", resp);
418 if (ovs_db_data_send(pdb, resp, resp_len) < 0) {
419 OVS_ERROR("send echo reply failed");
420 goto yajl_gen_failure;
421 }
422 /* clean up and return success */
423 yajl_gen_clear(jgen);
424 return (0);
426 yajl_gen_failure:
427 /* release memory */
428 yajl_gen_clear(jgen);
429 return (-1);
430 }
432 /* Get OVS DB registered callback by YAJL val. The YAJL
433 * value should be YAJL string (UID). Returns NULL if
434 * callback hasn't been found. See also ovs_db_callback_get()
435 * description for addition info.
436 */
437 static ovs_callback_t *ovs_db_table_callback_get(ovs_db_t *pdb, yajl_val jid) {
438 char *endptr = NULL;
439 const char *suid = NULL;
440 uint64_t uid;
442 if (jid && YAJL_IS_STRING(jid)) {
443 suid = YAJL_GET_STRING(jid);
444 uid = (uint64_t)strtoul(suid, &endptr, 16);
445 if (*endptr == '\0' && uid)
446 return ovs_db_callback_get(pdb, uid);
447 }
449 return NULL;
450 }
452 /* OVS DB table update event handler.
453 * This callback is called by POLL thread if OVS DB
454 * table update callback is received from the DB
455 * server. Once registered callback found, it's called
456 * by this handler. */
457 static int ovs_db_table_update_cb(ovs_db_t *pdb, yajl_val jnode) {
458 ovs_callback_t *cb = NULL;
459 yajl_val jvalue;
460 yajl_val jparams;
461 yajl_val jtable_updates;
462 yajl_val jtable_update;
463 size_t obj_len = 0;
464 const char *table_name = NULL;
465 const char *params_path[] = {"params", NULL};
466 const char *id_path[] = {"id", NULL};
468 /* check & get request attributes */
469 if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
470 (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL)) {
471 OVS_ERROR("invalid OVS DB request received");
472 return (-1);
473 }
475 /* check array length: [<json-value>, <table-updates>] */
476 if ((YAJL_GET_ARRAY(jparams) == NULL) ||
477 (YAJL_GET_ARRAY(jparams)->len != 2)) {
478 OVS_ERROR("invalid OVS DB request received");
479 return (-1);
480 }
482 jvalue = YAJL_GET_ARRAY(jparams)->values[0];
483 jtable_updates = YAJL_GET_ARRAY(jparams)->values[1];
484 if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue))) {
485 OVS_ERROR("invalid OVS DB request id or table update received");
486 return (-1);
487 }
489 /* find registered callback based on <json-value> */
490 pthread_mutex_lock(&pdb->mutex);
491 cb = ovs_db_table_callback_get(pdb, jvalue);
492 if (cb == NULL || cb->table.call == NULL) {
493 OVS_ERROR("No OVS DB table update callback found");
494 pthread_mutex_unlock(&pdb->mutex);
495 return (-1);
496 }
498 /* call registered callback */
499 cb->table.call(jtable_updates);
500 pthread_mutex_unlock(&pdb->mutex);
501 return 0;
502 }
504 /* OVS DB result request handler.
505 * This callback is called by POLL thread if OVS DB
506 * result reply is received from the DB server.
507 * Once registered callback found, it's called
508 * by this handler. */
509 static int ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode) {
510 ovs_callback_t *cb = NULL;
511 yajl_val jresult;
512 yajl_val jerror;
513 yajl_val jid;
514 const char *result_path[] = {"result", NULL};
515 const char *error_path[] = {"error", NULL};
516 const char *id_path[] = {"id", NULL};
518 jresult = yajl_tree_get(jnode, result_path, yajl_t_any);
519 jerror = yajl_tree_get(jnode, error_path, yajl_t_any);
520 jid = yajl_tree_get(jnode, id_path, yajl_t_string);
522 /* check & get result attributes */
523 if (!jresult || !jerror || !jid)
524 return (-1);
526 /* try to find registered callback */
527 pthread_mutex_lock(&pdb->mutex);
528 cb = ovs_db_table_callback_get(pdb, jid);
529 if (cb != NULL && cb->result.call != NULL) {
530 /* call registered callback */
531 cb->result.call(jresult, jerror);
532 /* unlock owner of the reply */
533 sem_post(&cb->result.sync);
534 }
536 pthread_mutex_unlock(&pdb->mutex);
537 return (0);
538 }
540 /* Handle JSON data (one request) and call
541 * appropriate event OVS DB handler. Currently,
542 * update callback 'ovs_db_table_update_cb' and
543 * result callback 'ovs_db_result_cb' is supported.
544 */
545 static int ovs_db_json_data_process(ovs_db_t *pdb, const char *data,
546 size_t len) {
547 const char *method = NULL;
548 char yajl_errbuf[OVS_YAJL_ERROR_BUFFER_SIZE];
549 const char *method_path[] = {"method", NULL};
550 const char *result_path[] = {"result", NULL};
551 char *sjson = NULL;
552 yajl_val jnode, jval;
554 /* duplicate the data to make null-terminated string
555 * required for yajl_tree_parse() */
556 if ((sjson = calloc(1, len + 1)) == NULL)
557 return (-1);
559 sstrncpy(sjson, data, len + 1);
560 OVS_DEBUG("[len=%zu] %s", len, sjson);
562 /* parse json data */
563 jnode = yajl_tree_parse(sjson, yajl_errbuf, sizeof(yajl_errbuf));
564 if (jnode == NULL) {
565 OVS_ERROR("yajl_tree_parse() %s", yajl_errbuf);
566 sfree(sjson);
567 return (-1);
568 }
570 /* get method name */
571 if ((jval = yajl_tree_get(jnode, method_path, yajl_t_string)) != NULL) {
572 method = YAJL_GET_STRING(jval);
573 if (strcmp("echo", method) == 0) {
574 /* echo request from the server */
575 if (ovs_db_table_echo_cb(pdb, jnode) < 0)
576 OVS_ERROR("handle echo request failed");
577 } else if (strcmp("update", method) == 0) {
578 /* update notification */
579 if (ovs_db_table_update_cb(pdb, jnode) < 0)
580 OVS_ERROR("handle update notification failed");
581 }
582 } else if ((jval = yajl_tree_get(jnode, result_path, yajl_t_any)) != NULL) {
583 /* result notification */
584 if (ovs_db_result_cb(pdb, jnode) < 0)
585 OVS_ERROR("handle result reply failed");
586 } else
587 OVS_ERROR("connot find method or result failed");
589 /* release memory */
590 yajl_tree_free(jnode);
591 sfree(sjson);
592 return (0);
593 }
595 /*
596 * JSON reader implementation.
597 *
598 * This module process raw JSON data (byte stream) and
599 * returns fully-fledged JSON data which can be processed
600 * (parsed) by YAJL later.
601 */
603 /* Allocate JSON reader instance */
604 static ovs_json_reader_t *ovs_json_reader_alloc() {
605 ovs_json_reader_t *jreader = NULL;
607 if ((jreader = calloc(sizeof(ovs_json_reader_t), 1)) == NULL)
608 return NULL;
610 return jreader;
611 }
613 /* Push raw data into into the JSON reader for processing */
614 static int ovs_json_reader_push_data(ovs_json_reader_t *jreader,
615 const char *data, size_t data_len) {
616 char *new_buff = NULL;
617 size_t available = jreader->buff_size - jreader->buff_offset;
619 /* check/update required memory space */
620 if (available < data_len) {
621 OVS_DEBUG("Reallocate buffer [size=%d, available=%d required=%d]",
622 (int)jreader->buff_size, (int)available, (int)data_len);
624 /* allocate new chunk of memory */
625 new_buff = realloc(jreader->buff_ptr, (jreader->buff_size + data_len));
626 if (new_buff == NULL)
627 return (-1);
629 /* point to new allocated memory */
630 jreader->buff_ptr = new_buff;
631 jreader->buff_size += data_len;
632 }
634 /* store input data */
635 memcpy(jreader->buff_ptr + jreader->buff_offset, data, data_len);
636 jreader->buff_offset += data_len;
637 return (0);
638 }
640 /* Pop one fully-fledged JSON if already exists. Returns 0 if
641 * completed JSON already exists otherwise negative value is
642 * returned */
643 static int ovs_json_reader_pop(ovs_json_reader_t *jreader,
644 const char **json_ptr, size_t *json_len_ptr) {
645 size_t nbraces = 0;
646 size_t json_len = 0;
647 char *json = NULL;
649 /* search open/close brace */
650 for (int i = jreader->json_offset; i < jreader->buff_offset; i++) {
651 if (jreader->buff_ptr[i] == '{') {
652 nbraces++;
653 } else if (jreader->buff_ptr[i] == '}')
654 if (nbraces)
655 if (!(--nbraces)) {
656 /* JSON data */
657 *json_ptr = jreader->buff_ptr + jreader->json_offset;
658 *json_len_ptr = json_len + 1;
659 jreader->json_offset = i + 1;
660 return (0);
661 }
663 /* increase JSON data length */
664 if (nbraces)
665 json_len++;
666 }
668 if (jreader->json_offset) {
669 if (jreader->json_offset < jreader->buff_offset) {
670 /* shift data to the beginning of the buffer
671 * and zero rest of the buffer data */
672 json = &jreader->buff_ptr[jreader->json_offset];
673 json_len = jreader->buff_offset - jreader->json_offset;
674 for (int i = 0; i < jreader->buff_size; i++)
675 jreader->buff_ptr[i] = ((i < json_len) ? (json[i]) : (0));
676 jreader->buff_offset = json_len;
677 } else
678 /* reset the buffer */
679 jreader->buff_offset = 0;
681 /* data is at the beginning of the buffer */
682 jreader->json_offset = 0;
683 }
685 return (-1);
686 }
688 /* Reset JSON reader. It is useful when start processing
689 * new raw data. E.g.: in case of lost stream connection.
690 */
691 static void ovs_json_reader_reset(ovs_json_reader_t *jreader) {
692 if (jreader) {
693 jreader->buff_offset = 0;
694 jreader->json_offset = 0;
695 }
696 }
698 /* Release internal data allocated for JSON reader */
699 static void ovs_json_reader_free(ovs_json_reader_t *jreader) {
700 if (jreader) {
701 free(jreader->buff_ptr);
702 free(jreader);
703 }
704 }
706 /* Reconnect to OVS DB and call the OVS DB post connection init callback
707 * if connection has been established.
708 */
709 static void ovs_db_reconnect(ovs_db_t *pdb) {
710 const char unix_prefix[] = "unix:";
711 const char *node_info = pdb->node;
712 struct addrinfo *result;
713 struct sockaddr_un saunix;
715 if (pdb->unix_path[0] != '\0') {
716 /* use UNIX socket instead of INET address */
717 node_info = pdb->unix_path;
718 result = calloc(1, sizeof(struct addrinfo));
719 struct sockaddr_un *sa_unix = calloc(1, sizeof(struct sockaddr_un));
720 if (result == NULL || sa_unix == NULL) {
721 sfree(result);
722 sfree(sa_unix);
723 return;
724 }
725 result->ai_family = AF_UNIX;
726 result->ai_socktype = SOCK_STREAM;
727 result->ai_addrlen = sizeof(*sa_unix);
728 result->ai_addr = (struct sockaddr *)sa_unix;
729 sa_unix->sun_family = result->ai_family;
730 sstrncpy(sa_unix->sun_path, pdb->unix_path, sizeof(sa_unix->sun_path));
731 } else {
732 /* inet socket address */
733 struct addrinfo hints;
735 /* setup criteria for selecting the socket address */
736 memset(&hints, 0, sizeof(hints));
737 hints.ai_family = AF_UNSPEC;
738 hints.ai_socktype = SOCK_STREAM;
740 /* get socket addresses */
741 int ret = getaddrinfo(pdb->node, pdb->service, &hints, &result);
742 if (ret != 0) {
743 OVS_ERROR("getaddrinfo(): %s", gai_strerror(ret));
744 return;
745 }
746 }
747 /* try to connect to the server */
748 for (struct addrinfo *rp = result; rp != NULL; rp = rp->ai_next) {
749 char errbuff[OVS_ERROR_BUFF_SIZE];
750 int sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
751 if (sock < 0) {
752 sstrerror(errno, errbuff, sizeof(errbuff));
753 OVS_DEBUG("socket(): %s", errbuff);
754 continue;
755 }
756 if (connect(sock, rp->ai_addr, rp->ai_addrlen) < 0) {
757 close(sock);
758 sstrerror(errno, errbuff, sizeof(errbuff));
759 OVS_DEBUG("connect(): %s [family=%d]", errbuff, rp->ai_family);
760 } else {
761 /* send notification to event thread */
762 ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_ESTABLISHED);
763 pdb->sock = sock;
764 break;
765 }
766 }
768 if (pdb->sock < 0)
769 OVS_ERROR("connect to \"%s\" failed", node_info);
771 freeaddrinfo(result);
772 }
774 /* POLL worker thread.
775 * It listens on OVS DB connection for incoming
776 * requests/reply/events etc. Also, it reconnects to OVS DB
777 * if connection has been lost.
778 */
779 static void *ovs_poll_worker(void *arg) {
780 ovs_db_t *pdb = (ovs_db_t *)arg; /* pointer to OVS DB */
781 ovs_json_reader_t *jreader = NULL;
782 struct pollfd poll_fd = {
783 .fd = pdb->sock, .events = POLLIN | POLLPRI, .revents = 0,
784 };
786 /* create JSON reader instance */
787 if ((jreader = ovs_json_reader_alloc()) == NULL) {
788 OVS_ERROR("initialize json reader failed");
789 return (NULL);
790 }
792 /* poll data */
793 while (ovs_db_poll_is_running(pdb)) {
794 char errbuff[OVS_ERROR_BUFF_SIZE];
795 poll_fd.fd = pdb->sock;
796 int poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000);
797 if (poll_ret < 0) {
798 sstrerror(errno, errbuff, sizeof(errbuff));
799 OVS_ERROR("poll(): %s", errbuff);
800 break;
801 } else if (poll_ret == 0) {
802 OVS_DEBUG("poll(): timeout");
803 if (pdb->sock < 0)
804 /* invalid fd, so try to reconnect */
805 ovs_db_reconnect(pdb);
806 continue;
807 }
808 if (poll_fd.revents & POLLNVAL) {
809 /* invalid file descriptor, clean-up */
810 ovs_db_callback_remove_all(pdb);
811 ovs_json_reader_reset(jreader);
812 /* setting poll FD to -1 tells poll() call to ignore this FD.
813 * In that case poll() call will return timeout all the time */
814 pdb->sock = (-1);
815 } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) {
816 /* connection is broken */
817 close(poll_fd.fd);
818 ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
819 OVS_ERROR("poll() peer closed its end of the channel");
820 } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
821 /* read incoming data */
822 char buff[OVS_DB_POLL_READ_BLOCK_SIZE];
823 ssize_t nbytes = recv(poll_fd.fd, buff, sizeof(buff), 0);
824 if (nbytes < 0) {
825 sstrerror(errno, errbuff, sizeof(errbuff));
826 OVS_ERROR("recv(): %s", errbuff);
827 /* read error? Try to reconnect */
828 close(poll_fd.fd);
829 continue;
830 } else if (nbytes == 0) {
831 close(poll_fd.fd);
832 ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
833 OVS_ERROR("recv() peer has performed an orderly shutdown");
834 continue;
835 }
836 /* read incoming data */
837 size_t json_len = 0;
838 const char *json = NULL;
839 OVS_DEBUG("recv(): received %zd bytes of data", nbytes);
840 ovs_json_reader_push_data(jreader, buff, nbytes);
841 while (!ovs_json_reader_pop(jreader, &json, &json_len))
842 /* process JSON data */
843 ovs_db_json_data_process(pdb, json, json_len);
844 }
845 }
847 OVS_DEBUG("poll thread has been completed");
848 ovs_json_reader_free(jreader);
849 return (NULL);
850 }
852 /* EVENT worker thread.
853 * Perform task based on incoming events. This
854 * task can be done asynchronously which allows to
855 * handle OVS DB callback like 'init_cb'.
856 */
857 static void *ovs_event_worker(void *arg) {
858 ovs_db_t *pdb = (ovs_db_t *)arg;
860 while (pdb->event_thread.value != OVS_DB_EVENT_TERMINATE) {
861 /* wait for an event */
862 struct timespec ts;
863 clock_gettime(CLOCK_REALTIME, &ts);
864 ts.tv_sec += (OVS_DB_EVENT_TIMEOUT);
865 int ret = pthread_cond_timedwait(&pdb->event_thread.cond,
866 &pdb->event_thread.mutex, &ts);
867 if (!ret) {
868 /* handle the event */
869 OVS_DEBUG("handle event %d", pdb->event_thread.value);
870 switch (pdb->event_thread.value) {
871 case OVS_DB_EVENT_CONN_ESTABLISHED:
872 if (pdb->cb.post_conn_init)
873 pdb->cb.post_conn_init(pdb);
874 break;
875 case OVS_DB_EVENT_CONN_TERMINATED:
876 if (pdb->cb.post_conn_terminate)
877 pdb->cb.post_conn_terminate();
878 break;
879 default:
880 OVS_DEBUG("unknown event received");
881 break;
882 }
883 } else if (ret == ETIMEDOUT) {
884 /* wait timeout */
885 OVS_DEBUG("no event received (timeout)");
886 continue;
887 } else {
888 /* unexpected error */
889 OVS_ERROR("pthread_cond_timedwait() failed");
890 break;
891 }
892 }
894 OVS_DEBUG("event thread has been completed");
895 return (NULL);
896 }
898 /* Initialize EVENT thread */
899 static int ovs_db_event_thread_init(ovs_db_t *pdb) {
900 pdb->event_thread.tid = -1;
901 /* init event thread condition variable */
902 if (pthread_cond_init(&pdb->event_thread.cond, NULL)) {
903 return (-1);
904 }
905 /* init event thread mutex */
906 if (pthread_mutex_init(&pdb->event_thread.mutex, NULL)) {
907 pthread_cond_destroy(&pdb->event_thread.cond);
908 return (-1);
909 }
910 /* Hold the event thread mutex. It ensures that no events
911 * will be lost while thread is still starting. Once event
912 * thread is started and ready to accept events, it will release
913 * the mutex */
914 if (pthread_mutex_lock(&pdb->event_thread.mutex)) {
915 pthread_mutex_destroy(&pdb->event_thread.mutex);
916 pthread_cond_destroy(&pdb->event_thread.cond);
917 return (-1);
918 }
919 /* start event thread */
920 pthread_t tid;
921 if (plugin_thread_create(&tid, NULL, ovs_event_worker, pdb,
922 "utils_ovs:event") != 0) {
923 pthread_mutex_unlock(&pdb->event_thread.mutex);
924 pthread_mutex_destroy(&pdb->event_thread.mutex);
925 pthread_cond_destroy(&pdb->event_thread.cond);
926 return (-1);
927 }
928 pdb->event_thread.tid = tid;
929 return (0);
930 }
932 /* Destroy EVENT thread */
933 static int ovs_db_event_thread_destroy(ovs_db_t *pdb) {
934 if (pdb->event_thread.tid < 0)
935 /* already destroyed */
936 return (0);
937 ovs_db_event_post(pdb, OVS_DB_EVENT_TERMINATE);
938 if (pthread_join(pdb->event_thread.tid, NULL) != 0)
939 return (-1);
940 /* Event thread always holds the thread mutex when
941 * performs some task (handles event) and releases it when
942 * while sleeping. Thus, if event thread exits, the mutex
943 * remains locked */
944 pthread_mutex_unlock(&pdb->event_thread.mutex);
945 pthread_mutex_destroy(&pdb->event_thread.mutex);
946 pthread_cond_destroy(&pdb->event_thread.cond);
947 pdb->event_thread.tid = -1;
948 return (0);
949 }
951 /* Initialize POLL thread */
952 static int ovs_db_poll_thread_init(ovs_db_t *pdb) {
953 pdb->poll_thread.tid = -1;
954 /* init event thread mutex */
955 if (pthread_mutex_init(&pdb->poll_thread.mutex, NULL)) {
956 return (-1);
957 }
958 /* start poll thread */
959 pthread_t tid;
960 pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING;
961 if (plugin_thread_create(&tid, NULL, ovs_poll_worker, pdb,
962 "utils_ovs:poll") != 0) {
963 pthread_mutex_destroy(&pdb->poll_thread.mutex);
964 return (-1);
965 }
966 pdb->poll_thread.tid = tid;
967 return (0);
968 }
970 /* Destroy POLL thread */
971 static int ovs_db_poll_thread_destroy(ovs_db_t *pdb) {
972 if (pdb->poll_thread.tid < 0)
973 /* already destroyed */
974 return (0);
975 /* change thread state */
976 pthread_mutex_lock(&pdb->poll_thread.mutex);
977 pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING;
978 pthread_mutex_unlock(&pdb->poll_thread.mutex);
979 /* join the thread */
980 if (pthread_join(pdb->poll_thread.tid, NULL) != 0)
981 return (-1);
982 pthread_mutex_destroy(&pdb->poll_thread.mutex);
983 pdb->poll_thread.tid = -1;
984 return (0);
985 }
987 /*
988 * Public OVS DB API implementation
989 */
991 ovs_db_t *ovs_db_init(const char *node, const char *service,
992 const char *unix_path, ovs_db_callback_t *cb) {
993 /* sanity check */
994 if (node == NULL || service == NULL || unix_path == NULL)
995 return (NULL);
997 /* allocate db data & fill it */
998 ovs_db_t *pdb = pdb = calloc(1, sizeof(*pdb));
999 if (pdb == NULL)
1000 return (NULL);
1002 /* store the OVS DB address */
1003 sstrncpy(pdb->node, node, sizeof(pdb->node));
1004 sstrncpy(pdb->service, service, sizeof(pdb->service));
1005 sstrncpy(pdb->unix_path, unix_path, sizeof(pdb->unix_path));
1007 /* setup OVS DB callbacks */
1008 if (cb)
1009 pdb->cb = *cb;
1011 /* init OVS DB mutex attributes */
1012 pthread_mutexattr_t mutex_attr;
1013 if (pthread_mutexattr_init(&mutex_attr)) {
1014 OVS_ERROR("OVS DB mutex attribute init failed");
1015 sfree(pdb);
1016 return (NULL);
1017 }
1018 /* set OVS DB mutex as recursive */
1019 if (pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE)) {
1020 OVS_ERROR("Failed to set OVS DB mutex as recursive");
1021 pthread_mutexattr_destroy(&mutex_attr);
1022 sfree(pdb);
1023 return (NULL);
1024 }
1025 /* init OVS DB mutex */
1026 if (pthread_mutex_init(&pdb->mutex, &mutex_attr)) {
1027 OVS_ERROR("OVS DB mutex init failed");
1028 pthread_mutexattr_destroy(&mutex_attr);
1029 sfree(pdb);
1030 return (NULL);
1031 }
1032 /* destroy mutex attributes */
1033 pthread_mutexattr_destroy(&mutex_attr);
1035 /* init event thread */
1036 if (ovs_db_event_thread_init(pdb) < 0) {
1037 ovs_db_destroy(pdb);
1038 return (NULL);
1039 }
1041 /* init polling thread */
1042 pdb->sock = -1;
1043 if (ovs_db_poll_thread_init(pdb) < 0) {
1044 ovs_db_destroy(pdb);
1045 return (NULL);
1046 }
1047 return pdb;
1048 }
1050 int ovs_db_send_request(ovs_db_t *pdb, const char *method, const char *params,
1051 ovs_db_result_cb_t cb) {
1052 int ret = 0;
1053 yajl_gen_status yajl_gen_ret;
1054 yajl_val jparams;
1055 yajl_gen jgen;
1056 ovs_callback_t *new_cb = NULL;
1057 uint64_t uid;
1058 char uid_buff[OVS_UID_STR_SIZE];
1059 const char *req = NULL;
1060 size_t req_len = 0;
1061 struct timespec ts;
1063 /* sanity check */
1064 if (!pdb || !method || !params)
1065 return (-1);
1067 if ((jgen = yajl_gen_alloc(NULL)) == NULL)
1068 return (-1);
1070 /* try to parse params */
1071 if ((jparams = yajl_tree_parse(params, NULL, 0)) == NULL) {
1072 OVS_ERROR("params is not a JSON string");
1073 yajl_gen_clear(jgen);
1074 return (-1);
1075 }
1077 /* generate method field */
1078 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1080 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "method");
1081 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, method);
1083 /* generate params field */
1084 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "params");
1085 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
1086 yajl_tree_free(jparams);
1088 /* generate id field */
1089 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
1090 uid = ovs_uid_generate();
1091 ssnprintf(uid_buff, sizeof(uid_buff), "%" PRIX64, uid);
1092 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_buff);
1094 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1096 if (cb) {
1097 /* register result callback */
1098 if ((new_cb = calloc(1, sizeof(ovs_callback_t))) == NULL)
1099 goto yajl_gen_failure;
1101 /* add new callback to front */
1102 sem_init(&new_cb->result.sync, 0, 0);
1103 new_cb->result.call = cb;
1104 new_cb->uid = uid;
1105 ovs_db_callback_add(pdb, new_cb);
1106 }
1108 /* send the request */
1109 OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&req, &req_len);
1110 OVS_DEBUG("%s", req);
1111 if (!ovs_db_data_send(pdb, req, req_len)) {
1112 if (cb) {
1113 /* wait for result */
1114 clock_gettime(CLOCK_REALTIME, &ts);
1115 ts.tv_sec += OVS_DB_SEND_REQ_TIMEOUT;
1116 if (sem_timedwait(&new_cb->result.sync, &ts) < 0) {
1117 OVS_ERROR("%s() no replay received within %d sec", __FUNCTION__,
1118 OVS_DB_SEND_REQ_TIMEOUT);
1119 ret = (-1);
1120 }
1121 }
1122 } else {
1123 OVS_ERROR("ovs_db_data_send() failed");
1124 ret = (-1);
1125 }
1127 yajl_gen_failure:
1128 if (new_cb) {
1129 /* destroy callback */
1130 sem_destroy(&new_cb->result.sync);
1131 ovs_db_callback_remove(pdb, new_cb);
1132 }
1134 /* release memory */
1135 yajl_gen_clear(jgen);
1136 return (yajl_gen_ret != yajl_gen_status_ok) ? (-1) : ret;
1137 }
1139 int ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name,
1140 const char **tb_column,
1141 ovs_db_table_cb_t update_cb,
1142 ovs_db_result_cb_t result_cb, unsigned int flags) {
1143 yajl_gen jgen;
1144 yajl_gen_status yajl_gen_ret;
1145 ovs_callback_t *new_cb = NULL;
1146 char uid_str[OVS_UID_STR_SIZE];
1147 char *params;
1148 size_t params_len;
1149 int ovs_db_ret = 0;
1151 /* sanity check */
1152 if (pdb == NULL || tb_name == NULL || update_cb == NULL)
1153 return (-1);
1155 /* allocate new update callback */
1156 if ((new_cb = calloc(1, sizeof(ovs_callback_t))) == NULL)
1157 return (-1);
1159 /* init YAJL generator */
1160 if ((jgen = yajl_gen_alloc(NULL)) == NULL) {
1161 sfree(new_cb);
1162 return (-1);
1163 }
1165 /* add new callback to front */
1166 new_cb->table.call = update_cb;
1167 new_cb->uid = ovs_uid_generate();
1168 ovs_db_callback_add(pdb, new_cb);
1170 /* make update notification request
1171 * [<db-name>, <json-value>, <monitor-requests>] */
1172 OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1173 {
1174 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, OVS_DB_DEFAULT_DB_NAME);
1176 /* uid string <json-value> */
1177 ssnprintf(uid_str, sizeof(uid_str), "%" PRIX64, new_cb->uid);
1178 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_str);
1180 /* <monitor-requests> */
1181 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1182 {
1183 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, tb_name);
1184 OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1185 {
1186 /* <monitor-request> */
1187 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1188 {
1189 if (tb_column) {
1190 /* columns within the table to be monitored */
1191 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "columns");
1192 OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1193 for (; *tb_column; tb_column++)
1194 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, *tb_column);
1195 OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1196 }
1197 /* specify select option */
1198 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "select");
1199 {
1200 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1201 {
1202 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "initial");
1203 OVS_YAJL_CALL(yajl_gen_bool, jgen,
1204 flags & OVS_DB_TABLE_CB_FLAG_INITIAL);
1205 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "insert");
1206 OVS_YAJL_CALL(yajl_gen_bool, jgen,
1207 flags & OVS_DB_TABLE_CB_FLAG_INSERT);
1208 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "delete");
1209 OVS_YAJL_CALL(yajl_gen_bool, jgen,
1210 flags & OVS_DB_TABLE_CB_FLAG_DELETE);
1211 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "modify");
1212 OVS_YAJL_CALL(yajl_gen_bool, jgen,
1213 flags & OVS_DB_TABLE_CB_FLAG_MODIFY);
1214 }
1215 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1216 }
1217 }
1218 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1219 }
1220 OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1221 }
1222 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1223 }
1224 OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1226 /* make a request to subscribe to given table */
1227 OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)¶ms,
1228 ¶ms_len);
1229 if (ovs_db_send_request(pdb, "monitor", params, result_cb) < 0) {
1230 OVS_ERROR("Failed to subscribe to \"%s\" table", tb_name);
1231 ovs_db_ret = (-1);
1232 }
1234 yajl_gen_failure:
1235 /* release memory */
1236 yajl_gen_clear(jgen);
1237 return ovs_db_ret;
1238 }
1240 int ovs_db_destroy(ovs_db_t *pdb) {
1241 int ovs_db_ret = 0;
1242 int ret = 0;
1244 /* sanity check */
1245 if (pdb == NULL)
1246 return (-1);
1248 /* try to lock the structure before releasing */
1249 if ((ret = pthread_mutex_lock(&pdb->mutex))) {
1250 OVS_ERROR("pthread_mutex_lock() DB mutex lock failed (%d)", ret);
1251 return (-1);
1252 }
1254 /* stop poll thread */
1255 if (ovs_db_event_thread_destroy(pdb) < 0) {
1256 OVS_ERROR("destroy poll thread failed");
1257 ovs_db_ret = (-1);
1258 }
1260 /* stop event thread */
1261 if (ovs_db_poll_thread_destroy(pdb) < 0) {
1262 OVS_ERROR("stop event thread failed");
1263 ovs_db_ret = (-1);
1264 }
1266 /* unsubscribe callbacks */
1267 ovs_db_callback_remove_all(pdb);
1269 /* close connection */
1270 if (pdb->sock >= 0)
1271 close(pdb->sock);
1273 /* release DB handler */
1274 pthread_mutex_unlock(&pdb->mutex);
1275 pthread_mutex_destroy(&pdb->mutex);
1276 sfree(pdb);
1277 return ovs_db_ret;
1278 }
1280 /*
1281 * Public OVS utils API implementation
1282 */
1284 /* Get YAJL value by key from YAJL dictionary
1285 *
1286 * EXAMPLE:
1287 * {
1288 * "key_a" : <YAJL return value>
1289 * "key_b" : <YAJL return value>
1290 * }
1291 */
1292 yajl_val ovs_utils_get_value_by_key(yajl_val jval, const char *key) {
1293 const char *obj_key = NULL;
1295 /* check params */
1296 if (!YAJL_IS_OBJECT(jval) || (key == NULL))
1297 return NULL;
1299 /* find a value by key */
1300 for (int i = 0; i < YAJL_GET_OBJECT(jval)->len; i++) {
1301 obj_key = YAJL_GET_OBJECT(jval)->keys[i];
1302 if (strcmp(obj_key, key) == 0)
1303 return YAJL_GET_OBJECT(jval)->values[i];
1304 }
1306 return NULL;
1307 }
1309 /* Get OVS DB map value by given map key
1310 *
1311 * FROM RFC7047:
1312 *
1313 * <pair>
1314 * A 2-element JSON array that represents a pair within a database
1315 * map. The first element is an <atom> that represents the key, and
1316 * the second element is an <atom> that represents the value.
1317 *
1318 * <map>
1319 * A 2-element JSON array that represents a database map value. The
1320 * first element of the array must be the string "map", and the
1321 * second element must be an array of zero or more <pair>s giving the
1322 * values in the map. All of the <pair>s must have the same key and
1323 * value types.
1324 *
1325 * EXAMPLE:
1326 * [
1327 * "map", [
1328 * [ "key_a", <YAJL value>], [ "key_b", <YAJL value>], ...
1329 * ]
1330 * ]
1331 */
1332 yajl_val ovs_utils_get_map_value(yajl_val jval, const char *key) {
1333 size_t map_len = 0;
1334 size_t array_len = 0;
1335 yajl_val *map_values = NULL;
1336 yajl_val *array_values = NULL;
1337 const char *str_val = NULL;
1339 /* check YAJL array */
1340 if (!YAJL_IS_ARRAY(jval) || (key == NULL))
1341 return NULL;
1343 /* check a database map value (2-element, first one should be a string */
1344 array_len = YAJL_GET_ARRAY(jval)->len;
1345 array_values = YAJL_GET_ARRAY(jval)->values;
1346 if ((array_len != 2) || (!YAJL_IS_STRING(array_values[0])) ||
1347 (!YAJL_IS_ARRAY(array_values[1])))
1348 return NULL;
1350 /* check first element of the array */
1351 str_val = YAJL_GET_STRING(array_values[0]);
1352 if (strcmp("map", str_val) != 0)
1353 return NULL;
1355 /* try to find map value by map key */
1356 map_len = YAJL_GET_ARRAY(array_values[1])->len;
1357 map_values = YAJL_GET_ARRAY(array_values[1])->values;
1358 for (int i = 0; i < map_len; i++) {
1359 /* check YAJL array */
1360 if (!YAJL_IS_ARRAY(map_values[i]))
1361 break;
1363 /* check a database pair value (2-element, first one represents a key
1364 * and it should be a string in our case */
1365 array_len = YAJL_GET_ARRAY(map_values[i])->len;
1366 array_values = YAJL_GET_ARRAY(map_values[i])->values;
1367 if ((array_len != 2) || (!YAJL_IS_STRING(array_values[0])))
1368 break;
1370 /* return map value if given key equals map key */
1371 str_val = YAJL_GET_STRING(array_values[0]);
1372 if (strcmp(key, str_val) == 0)
1373 return array_values[1];
1374 }
1375 return NULL;
1376 }