1 /**
2 * collectd - src/utils_ovs.c
3 *
4 * Copyright(c) 2016 Intel Corporation. All rights reserved.
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy of
7 * this software and associated documentation files (the "Software"), to deal in
8 * the Software without restriction, including without limitation the rights to
9 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
10 * of the Software, and to permit persons to whom the Software is furnished to do
11 * so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in all
14 * copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22 * SOFTWARE.
23 *
24 * Authors:
25 * Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
26 *
27 * OVS DB API internal architecture diagram
28 * +------------------------------------------------------------------------------+
29 * |OVS plugin |OVS utils |
30 * | | +------------------------+ |
31 * | | | echo handler | JSON request/ |
32 * | | +--+ (ovs_db_table_echo_cb) +<---+---------+ update event/ |
33 * | | | | | | | result |
34 * | | | +------------------------+ | | |
35 * | | | | +----+---+--------+ |
36 * | +----------+ | | +------------------------+ | | | | |
37 * | | update | | | | update handler | | | YAJL | JSON | |
38 * | | callback +<-------+(ovs_db_table_update_cp)+<---+ | parser | reader | |
39 * | +----------+ | | | | | | | | |
40 * | | | +------------------------+ | +--------+---+----+ |
41 * | | | | ^ |
42 * | +----------+ | | +------------------------+ | | |
43 * | | result | | | | result handler | | | |
44 * | | callback +<-------+ (ovs_db_result_cb) +<---+ JSON raw | |
45 * | +----------+ | | | | data | |
46 * | | | +------------------------+ | |
47 * | | | | |
48 * | | | +------------------+ +------------+----+ |
49 * | +----------+ | | |thread| | |thread| | |
50 * | | init | | | | | reconnect | | |
51 * | | callback +<---------+ EVENT WORKER +<------------+ POLL WORKER | |
52 * | +----------+ | | +------------------+ +--------+--------+ |
53 * | | | ^ |
54 * +----------------+-------------------------------------------------------------+
55 * | |
56 * JSON|echo reply raw|data
57 * v v
58 * +-------------------+----------------------------------------------+-----------+
59 * | TCP/UNIX socket |
60 * +-------------------------------------------------------------------------------
61 *
62 **/
64 /* collectd headers */
65 #include "common.h"
67 /* private headers */
68 #include "utils_ovs.h"
70 /* system libraries */
71 #include <semaphore.h>
72 #include <arpa/inet.h>
73 #include <poll.h>
74 #include <sys/un.h>
/* Logging helpers. OVS_ERROR prefixes the message with "ovs_utils: ";
 * OVS_DEBUG prefixes it with the source file, line and function name. */
#define OVS_ERROR(fmt, ...) do { \
  ERROR("ovs_utils: "fmt, ## __VA_ARGS__); } while (0)
#define OVS_DEBUG(fmt, ...) do { \
  DEBUG("%s:%d:%s(): "fmt, __FILE__, __LINE__, __FUNCTION__, \
        ## __VA_ARGS__); } while (0)

#define OVS_DB_POLL_TIMEOUT 1 /* poll receive timeout (sec) */
#define OVS_DB_POLL_READ_BLOCK_SIZE 512 /* read block size (bytes) */
#define OVS_DB_DEFAULT_DB_NAME "Open_vSwitch"
#define OVS_DB_RECONNECT_TIMEOUT 1 /* reconnect timeout (sec) */

/* Events posted to the EVENT thread via ovs_db_event_post() */
#define OVS_DB_EVENT_TIMEOUT 5 /* event thread timeout (sec) */
#define OVS_DB_EVENT_TERMINATE 1
#define OVS_DB_EVENT_CONN_ESTABLISHED 2
#define OVS_DB_EVENT_CONN_TERMINATED 3

/* POLL thread states stored in ovs_poll_thread_s.state */
#define OVS_DB_POLL_STATE_RUNNING 1
#define OVS_DB_POLL_STATE_EXITING 2

#define OVS_DB_SEND_REQ_TIMEOUT 5 /* send request timeout (sec) */

/* Call a YAJL generator function and jump to the `yajl_gen_failure`
 * label when it does not return yajl_gen_status_ok. The enclosing
 * function must declare `yajl_gen_status yajl_gen_ret` and provide
 * the `yajl_gen_failure` label. */
#define OVS_YAJL_CALL(func, ...) \
  do { \
    yajl_gen_ret = yajl_gen_status_ok; \
    if ((yajl_gen_ret = func(__VA_ARGS__)) != yajl_gen_status_ok) \
      goto yajl_gen_failure; \
  } while (0)
#define OVS_YAJL_ERROR_BUFFER_SIZE 1024
#define OVS_ERROR_BUFF_SIZE 512
#define OVS_UID_STR_SIZE 17 /* 64-bit HEX string len + '\0' */
/* JSON reader state: a growable byte buffer and two cursors into it */
typedef struct ovs_json_reader_s {
  char *buff_ptr;     /* heap buffer holding the raw pushed bytes */
  size_t buff_size;   /* number of bytes allocated for buff_ptr */
  size_t buff_offset; /* number of bytes currently stored in the buffer */
  size_t json_offset; /* offset at which the next JSON search starts */
} ovs_json_reader_t;
116 /* Result callback declaration */
117 struct ovs_result_cb_s {
118 sem_t sync;
119 ovs_db_result_cb_t call;
120 };
121 typedef struct ovs_result_cb_s ovs_result_cb_t;
123 /* Table callback declaration */
124 struct ovs_table_cb_s {
125 ovs_db_table_cb_t call;
126 };
127 typedef struct ovs_table_cb_s ovs_table_cb_t;
129 /* Callback declaration */
130 struct ovs_callback_s {
131 uint64_t uid;
132 union {
133 ovs_result_cb_t result;
134 ovs_table_cb_t table;
135 };
136 struct ovs_callback_s *next;
137 struct ovs_callback_s *prev;
138 };
139 typedef struct ovs_callback_s ovs_callback_t;
/* Connection description: socket descriptor plus the address
 * parameters filled in by ovs_db_url_parse() */
typedef struct ovs_conn_s {
  int sock;      /* socket descriptor returned by socket() */
  int domain;    /* socket domain: AF_INET or AF_UNIX */
  int type;      /* socket type (SOCK_STREAM) */
  int addr_size; /* size of the address variant in use */
  union {
    struct sockaddr_in s_inet; /* address for "tcp:" URLs */
    struct sockaddr_un s_unix; /* address for "unix:" URLs */
  } addr;
} ovs_conn_t;
/* EVENT thread data: thread id and the mutex/condition pair
 * used to hand an event value over to the thread */
typedef struct ovs_event_thread_s {
  pthread_t tid;         /* EVENT thread identifier */
  pthread_mutex_t mutex; /* locked while `value` is written */
  pthread_cond_t cond;   /* signalled when a new event is posted */
  int value;             /* last posted event (OVS_DB_EVENT_*) */
} ovs_event_thread_t;
/* POLL thread data: thread id and the mutex-protected run state */
typedef struct ovs_poll_thread_s {
  pthread_t tid;         /* POLL thread identifier */
  pthread_mutex_t mutex; /* locked while `state` is read or written */
  int state;             /* OVS_DB_POLL_STATE_RUNNING or _EXITING */
} ovs_poll_thread_t;
171 /* OVS DB internal data declaration */
172 struct ovs_db_s {
173 ovs_poll_thread_t poll_thread;
174 ovs_event_thread_t event_thread;
175 pthread_mutex_t mutex;
176 ovs_callback_t *remote_cb;
177 ovs_db_callback_t cb;
178 ovs_conn_t conn;
179 };
180 typedef struct ovs_db_s ovs_db_t;
182 /* Post an event to event thread.
183 * Possible events are:
184 * OVS_DB_EVENT_TERMINATE
185 * OVS_DB_EVENT_CONN_ESTABLISHED
186 * OVS_DB_EVENT_CONN_TERMINATED
187 */
188 static void
189 ovs_db_event_post(ovs_db_t *pdb, int event)
190 {
191 pthread_mutex_lock(&pdb->event_thread.mutex);
192 pdb->event_thread.value = event;
193 pthread_mutex_unlock(&pdb->event_thread.mutex);
194 pthread_cond_signal(&pdb->event_thread.cond);
195 }
197 /* Check if POLL thread is still running. Returns
198 * 1 if running otherwise 0 is returned */
199 static inline int
200 ovs_db_poll_is_running(ovs_db_t *pdb)
201 {
202 int state = 0;
203 pthread_mutex_lock(&pdb->poll_thread.mutex);
204 state = pdb->poll_thread.state;
205 pthread_mutex_unlock(&pdb->poll_thread.mutex);
206 return (state == OVS_DB_POLL_STATE_RUNNING);
207 }
209 /* Terminate POLL thread */
210 static inline void
211 ovs_db_poll_terminate(ovs_db_t *pdb)
212 {
213 pthread_mutex_lock(&pdb->poll_thread.mutex);
214 pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING;
215 pthread_mutex_unlock(&pdb->poll_thread.mutex);
216 }
/* Generate unique identifier (UID). It is used by OVS DB API
 * to set "id" field for any OVS DB JSON request.
 *
 * The UID is built from the monotonic clock: seconds are stored in
 * the upper 32 bits and the low 32 bits of the nanoseconds in the
 * lower 32 bits. */
static uint64_t
ovs_uid_generate(void)
{
  /* zero-initialised so that the result is defined even if
   * clock_gettime() fails */
  struct timespec ts = {0, 0};

  clock_gettime(CLOCK_MONOTONIC, &ts);

  /* widen before shifting: time_t may be a 32-bit and/or signed
   * type, for which a shift by 32 bits is undefined */
  return (((uint64_t)ts.tv_sec << 32) | ((uint64_t)ts.tv_nsec & UINT32_MAX));
}
/*
 * Callback API. These functions are used to store
 * registered callbacks in the OVS DB API.
 */
233 /* Add new callback into OVS DB object */
234 static void
235 ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb)
236 {
237 pthread_mutex_lock(&pdb->mutex);
238 if (pdb->remote_cb)
239 pdb->remote_cb->prev = new_cb;
240 new_cb->next = pdb->remote_cb;
241 new_cb->prev = NULL;
242 pdb->remote_cb = new_cb;
243 pthread_mutex_unlock(&pdb->mutex);
244 }
246 /* Remove callback from OVS DB object */
247 static void
248 ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb)
249 {
250 ovs_callback_t *pre_cb = del_cb->prev;
251 ovs_callback_t *next_cb = del_cb->next;
253 pthread_mutex_lock(&pdb->mutex);
254 if (next_cb)
255 next_cb->prev = del_cb->prev;
257 if (pre_cb)
258 pre_cb->next = del_cb->next;
259 else
260 pdb->remote_cb = del_cb->next;
262 free(del_cb);
263 pthread_mutex_unlock(&pdb->mutex);
264 }
266 /* Remove all callbacks form OVS DB object */
267 static void
268 ovs_db_callback_remove_all(ovs_db_t *pdb)
269 {
270 pthread_mutex_lock(&pdb->mutex);
271 for (ovs_callback_t *del_cb = pdb->remote_cb; pdb->remote_cb;
272 del_cb = pdb->remote_cb) {
273 pdb->remote_cb = pdb->remote_cb->next;
274 free(del_cb);
275 }
276 pdb->remote_cb = NULL;
277 pthread_mutex_unlock(&pdb->mutex);
278 }
280 /* Get/find callback in OVS DB object by UID. Returns pointer
281 * to requested callback otherwise NULL is returned */
282 static ovs_callback_t *
283 ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid)
284 {
285 pthread_mutex_lock(&pdb->mutex);
286 for (ovs_callback_t *cb = pdb->remote_cb; cb != NULL; cb = cb->next)
287 if (cb->uid == uid) {
288 pthread_mutex_unlock(&pdb->mutex);
289 return cb;
290 }
291 pthread_mutex_unlock(&pdb->mutex);
292 return NULL;
293 }
295 /* Send all requested data to the socket. Returns 0 if
296 * ALL request data has been sent otherwise negative value
297 * is returned */
298 static int
299 ovs_db_data_send(const ovs_db_t *pdb, const char *data, size_t len)
300 {
301 ssize_t nbytes = 0;
302 size_t rem = len;
303 size_t off = 0;
305 while (rem > 0) {
306 if ((nbytes = send(pdb->conn.sock, data + off, rem, 0)) <= 0)
307 return (-1);
308 rem -= (size_t)nbytes;
309 off += (size_t)nbytes;
310 }
311 return (0);
312 }
314 /* Parse OVS server URL.
315 * Format of the URL:
316 * "tcp:a.b.c.d:port" - define TCP connection (INET domain)
317 * "unix:file" - define UNIX socket file (UNIX domain)
318 */
319 static int
320 ovs_db_url_parse(const char *surl, ovs_conn_t *conn)
321 {
322 ovs_conn_t tmp_conn;
323 char *nexttok = NULL;
324 char *in_str = NULL;
325 char *saveptr;
326 int ret = 0;
328 /* sanity check */
329 if ((surl == NULL) || (strlen(surl) < 1))
330 return (-1);
332 /* parse domain */
333 tmp_conn = *conn;
334 in_str = sstrdup(surl);
335 if ((nexttok = strtok_r(in_str, ":", &saveptr)) != NULL) {
336 if (strcmp("tcp", nexttok) == 0) {
337 tmp_conn.domain = AF_INET;
338 tmp_conn.type = SOCK_STREAM;
339 tmp_conn.addr_size = sizeof(tmp_conn.addr.s_inet);
340 } else if (strcmp("unix", nexttok) == 0) {
341 tmp_conn.domain = AF_UNIX;
342 tmp_conn.type = SOCK_STREAM;
343 tmp_conn.addr_size = sizeof(tmp_conn.addr.s_unix);
344 } else
345 goto failure;
346 } else
347 goto failure;
349 /* parse url depending on domain */
350 if ((nexttok = strtok_r(NULL, ":", &saveptr)) != NULL) {
351 if (tmp_conn.domain == AF_UNIX) {
352 /* <UNIX-NAME> */
353 tmp_conn.addr.s_inet.sin_family = AF_UNIX;
354 sstrncpy(tmp_conn.addr.s_unix.sun_path, nexttok, strlen(nexttok) + 1);
355 } else {
356 /* <IP:PORT> */
357 tmp_conn.addr.s_inet.sin_family = AF_INET;
358 ret =
359 inet_pton(AF_INET, nexttok, (void *)&tmp_conn.addr.s_inet.sin_addr);
360 if (ret == 1) {
361 if ((nexttok = strtok_r(NULL, ":", &saveptr)) != NULL)
362 tmp_conn.addr.s_inet.sin_port = htons(atoi(nexttok));
363 else
364 goto failure;
365 } else
366 goto failure;
367 }
368 }
370 /* save result and return success */
371 *conn = tmp_conn;
372 sfree(in_str);
373 return (0);
375 failure:
376 OVS_ERROR("%s() : invalid OVS DB URL provided");
377 sfree(in_str);
378 return (-1);
379 }
381 /*
382 * YAJL (Yet Another JSON Library) helper functions
383 * Documentation (https://lloyd.github.io/yajl/)
384 */
386 /* Add null-terminated string into YAJL generator handle (JSON object).
387 * Similar function to yajl_gen_string() but takes null-terminated string
388 * instead of string and its length.
389 *
390 * jgen - YAJL generator handle allocated by yajl_gen_alloc()
391 * string - Null-terminated string
392 */
393 static inline yajl_gen_status
394 ovs_yajl_gen_tstring(yajl_gen hander, const char *string)
395 {
396 return yajl_gen_string(hander, string, strlen(string));
397 }
399 /* Add YAJL value into YAJL generator handle (JSON object)
400 *
401 * jgen - YAJL generator handle allocated by yajl_gen_alloc()
402 * jval - YAJL value usually returned by yajl_tree_get()
403 */
404 static yajl_gen_status
405 ovs_yajl_gen_val(yajl_gen jgen, yajl_val jval)
406 {
407 size_t array_len = 0;
408 yajl_val *jvalues = NULL;
409 yajl_val jobj_value = NULL;
410 const char *obj_key = NULL;
411 size_t obj_len = 0;
412 yajl_gen_status yajl_gen_ret;
414 if (YAJL_IS_STRING(jval))
415 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, YAJL_GET_STRING(jval));
416 else if (YAJL_IS_DOUBLE(jval))
417 OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_DOUBLE(jval));
418 else if (YAJL_IS_INTEGER(jval))
419 OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_INTEGER(jval));
420 else if (YAJL_IS_TRUE(jval))
421 OVS_YAJL_CALL(yajl_gen_bool, jgen, 1);
422 else if (YAJL_IS_FALSE(jval))
423 OVS_YAJL_CALL(yajl_gen_bool, jgen, 0);
424 else if (YAJL_IS_NULL(jval))
425 OVS_YAJL_CALL(yajl_gen_null, jgen);
426 else if (YAJL_IS_ARRAY(jval)) {
427 /* create new array and add all elements into the array */
428 array_len = YAJL_GET_ARRAY(jval)->len;
429 jvalues = YAJL_GET_ARRAY(jval)->values;
430 OVS_YAJL_CALL(yajl_gen_array_open, jgen);
431 for (int i = 0; i < array_len; i++)
432 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jvalues[i]);
433 OVS_YAJL_CALL(yajl_gen_array_close, jgen);
434 } else if (YAJL_IS_OBJECT(jval)) {
435 /* create new object and add all elements into the object */
436 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
437 obj_len = YAJL_GET_OBJECT(jval)->len;
438 for (int i = 0; i < obj_len; i++) {
439 obj_key = YAJL_GET_OBJECT(jval)->keys[i];
440 jobj_value = YAJL_GET_OBJECT(jval)->values[i];
441 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, obj_key);
442 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jobj_value);
443 }
444 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
445 } else {
446 OVS_ERROR("%s() unsupported value type %d (skip)", __FUNCTION__,
447 (int)(jval)->type);
448 goto yajl_gen_failure;
449 }
450 return yajl_gen_status_ok;
452 yajl_gen_failure:
453 OVS_ERROR("%s() error to generate value", __FUNCTION__);
454 return yajl_gen_ret;
455 }
457 /* OVS DB echo request handler. When OVS DB sends
458 * "echo" request to the client, client should generate
459 * "echo" replay with the same content received in the
460 * request */
461 static int
462 ovs_db_table_echo_cb(const ovs_db_t *pdb, yajl_val jnode)
463 {
464 yajl_val jparams;
465 yajl_val jid;
466 yajl_gen jgen;
467 size_t resp_len = 0;
468 const char *resp = NULL;
469 const char *params_path[] = {"params", NULL};
470 const char *id_path[] = {"id", NULL};
471 yajl_gen_status yajl_gen_ret;
473 if ((jgen = yajl_gen_alloc(NULL)) == NULL)
474 return (-1);
476 /* check & get request attributes */
477 if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
478 ((jid = yajl_tree_get(jnode, id_path, yajl_t_any)) == NULL)) {
479 OVS_ERROR("parse echo request failed");
480 goto yajl_gen_failure;
481 }
483 /* generate JSON echo response */
484 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
486 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "result");
487 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
489 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "error");
490 OVS_YAJL_CALL(yajl_gen_null, jgen);
492 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
493 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jid);
495 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
496 OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&resp,
497 &resp_len);
499 /* send the response */
500 OVS_DEBUG("response: %s", resp);
501 if (ovs_db_data_send(pdb, resp, resp_len) < 0) {
502 OVS_ERROR("send echo reply failed");
503 goto yajl_gen_failure;
504 }
505 /* clean up and return success */
506 yajl_gen_clear(jgen);
507 return (0);
509 yajl_gen_failure:
510 /* release memory */
511 yajl_gen_clear(jgen);
512 return (-1);
513 }
515 /* Get OVS DB registered callback by YAJL val. The YAJL
516 * value should be YAJL string (UID). Returns NULL if
517 * callback hasn't been found.
518 */
519 static ovs_callback_t *
520 ovs_db_table_callback_get(ovs_db_t *pdb, yajl_val jid)
521 {
522 char *endptr = NULL;
523 const char *suid = NULL;
524 uint64_t uid;
526 if (jid && YAJL_IS_STRING(jid)) {
527 suid = YAJL_GET_STRING(jid);
528 uid = (uint64_t) strtoul(suid, &endptr, 16);
529 if (*endptr == '\0' && uid)
530 return ovs_db_callback_get(pdb, uid);
531 }
533 return NULL;
534 }
536 /* OVS DB table update event handler.
537 * This callback is called by POLL thread if OVS DB
538 * table update callback is received from the DB
539 * server. Once registered callback found, it's called
540 * by this handler. */
541 static int
542 ovs_db_table_update_cb(ovs_db_t *pdb, yajl_val jnode)
543 {
544 ovs_callback_t *cb = NULL;
545 yajl_val jvalue;
546 yajl_val jparams;
547 yajl_val jtable_updates;
548 yajl_val jtable_update;
549 size_t obj_len = 0;
550 const char *table_name = NULL;
551 const char *params_path[] = {"params", NULL};
552 const char *id_path[] = {"id", NULL};
554 /* check & get request attributes */
555 if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
556 (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL))
557 goto ovs_failure;
559 /* check array length: [<json-value>, <table-updates>] */
560 if (YAJL_GET_ARRAY(jparams)->len != 2)
561 goto ovs_failure;
563 jvalue = YAJL_GET_ARRAY(jparams)->values[0];
564 jtable_updates = YAJL_GET_ARRAY(jparams)->values[1];
565 if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue)))
566 goto ovs_failure;
568 /* find registered callback based on <json-value> */
569 cb = ovs_db_table_callback_get(pdb, jvalue);
570 if (cb == NULL || cb->table.call == NULL)
571 goto ovs_failure;
573 /* call registered callback */
574 cb->table.call(jtable_updates);
575 return 0;
577 ovs_failure:
578 OVS_ERROR("invalid OVS DB table update event");
579 return (-1);
580 }
582 /* OVS DB result request handler.
583 * This callback is called by POLL thread if OVS DB
584 * result reply is received from the DB server.
585 * Once registered callback found, it's called
586 * by this handler. */
587 static int
588 ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode)
589 {
590 ovs_callback_t *cb = NULL;
591 yajl_val jresult;
592 yajl_val jerror;
593 yajl_val jid;
594 const char *result_path[] = {"result", NULL};
595 const char *error_path[] = {"error", NULL};
596 const char *id_path[] = {"id", NULL};
598 jresult = yajl_tree_get(jnode, result_path, yajl_t_any);
599 jerror = yajl_tree_get(jnode, error_path, yajl_t_any);
600 jid = yajl_tree_get(jnode, id_path, yajl_t_string);
602 /* check & get result attributes */
603 if (!jresult || !jerror || !jid)
604 return (-1);
606 /* try to find registered callback */
607 cb = ovs_db_table_callback_get(pdb, jid);
608 if (cb != NULL && cb->result.call != NULL) {
609 /* call registered callback */
610 cb->result.call(jresult, jerror);
611 /* unlock owner of the reply */
612 sem_post(&cb->result.sync);
613 }
615 return (0);
616 }
618 /* Handle JSON data (one request) and call
619 * appropriate event OVS DB handler. Currently,
620 * update callback 'ovs_db_table_update_cb' and
621 * result callback 'ovs_db_result_cb' is supported.
622 */
623 static int
624 ovs_db_json_data_process(ovs_db_t *pdb, const char *data, size_t len)
625 {
626 const char *method = NULL;
627 char yajl_errbuf[OVS_YAJL_ERROR_BUFFER_SIZE];
628 const char *method_path[] = {"method", NULL};
629 const char *result_path[] = {"result", NULL};
630 char *sjson = NULL;
631 yajl_val jnode, jval;
633 /* duplicate the data to make null-terminated string
634 * required for yajl_tree_parse() */
635 if ((sjson = malloc(len + 1)) == NULL)
636 return (-1);
638 sstrncpy(sjson, data, len + 1);
639 OVS_DEBUG("[len=%d] %s", len, sjson);
641 /* parse json data */
642 jnode = yajl_tree_parse(sjson, yajl_errbuf, sizeof(yajl_errbuf));
643 if (jnode == NULL) {
644 OVS_ERROR("yajl_tree_parse() %s", yajl_errbuf);
645 sfree(sjson);
646 return (-1);
647 }
649 /* get method name */
650 if (jval = yajl_tree_get(jnode, method_path, yajl_t_string)) {
651 method = YAJL_GET_STRING(jval);
652 if (strcmp("echo", method) == 0) {
653 /* echo request from the server */
654 if (ovs_db_table_echo_cb(pdb, jnode) < 0)
655 OVS_ERROR("handle echo request failed");
656 } else if (strcmp("update", method) == 0) {
657 /* update notification */
658 if (ovs_db_table_update_cb(pdb, jnode) < 0)
659 OVS_ERROR("handle update notification failed");
660 }
661 } else if (jval = yajl_tree_get(jnode, result_path, yajl_t_any)) {
662 /* result notification */
663 if (ovs_db_result_cb(pdb, jnode) < 0)
664 OVS_ERROR("handle result reply failed");
665 } else
666 OVS_ERROR("connot find method or result failed");
668 /* release memory */
669 yajl_tree_free(jnode);
670 sfree(sjson);
671 return (0);
672 }
/*
 * JSON reader implementation.
 *
 * This module processes raw JSON data (a byte stream) and
 * returns complete JSON documents which can be parsed
 * by YAJL later.
 */
682 /* Allocate JSON reader instance */
683 static inline ovs_json_reader_t *
684 ovs_json_reader_alloc()
685 {
686 ovs_json_reader_t *jreader = NULL;
688 if ((jreader = calloc(sizeof(ovs_json_reader_t), 1)) == NULL)
689 return NULL;
691 return jreader;
692 }
694 /* Push raw data into into the JSON reader for processing */
695 static inline int
696 ovs_json_reader_push_data(ovs_json_reader_t *jreader,
697 const char *data, size_t data_len)
698 {
699 char *new_buff = NULL;
700 size_t available = jreader->buff_size - jreader->buff_offset;
702 /* check/update required memory space */
703 if (available < data_len) {
704 OVS_DEBUG("Reallocate buffer [size=%d, available=%d required=%d]",
705 (int)jreader->buff_size, (int)available, (int)data_len);
707 /* allocate new chunk of memory */
708 new_buff = realloc(jreader->buff_ptr, (jreader->buff_size + data_len));
709 if (new_buff == NULL)
710 return (-1);
712 /* point to new allocated memory */
713 jreader->buff_ptr = new_buff;
714 jreader->buff_size += data_len;
715 }
717 /* store input data */
718 memcpy(jreader->buff_ptr + jreader->buff_offset, data, data_len);
719 jreader->buff_offset += data_len;
720 return (0);
721 }
723 /* Pop one fully-fledged JSON if already exists. Returns 0 if
724 * completed JSON already exists otherwise negative value is
725 * returned */
726 static inline int
727 ovs_json_reader_pop(ovs_json_reader_t *jreader,
728 const char **json_ptr, size_t *json_len_ptr)
729 {
730 size_t nbraces = 0;
731 size_t json_len = 0;
732 char *json = NULL;
734 /* search open/close brace */
735 for (int i = jreader->json_offset; i < jreader->buff_offset; i++) {
736 if (jreader->buff_ptr[i] == '{') {
737 nbraces++;
738 } else if (jreader->buff_ptr[i] == '}')
739 if (nbraces)
740 if (!(--nbraces)) {
741 /* JSON data */
742 *json_ptr = jreader->buff_ptr + jreader->json_offset;
743 *json_len_ptr = json_len + 1;
744 jreader->json_offset = i + 1;
745 return (0);
746 }
748 /* increase JSON data length */
749 if (nbraces)
750 json_len++;
751 }
753 if (jreader->json_offset) {
754 if (jreader->json_offset < jreader->buff_offset) {
755 /* shift data to the beginning of the buffer
756 * and zero rest of the buffer data */
757 json = &jreader->buff_ptr[jreader->json_offset];
758 json_len = jreader->buff_offset - jreader->json_offset;
759 for (int i = 0; i < jreader->buff_size; i++)
760 jreader->buff_ptr[i] = ((i < json_len) ? (json[i]) : (0));
761 jreader->buff_offset = json_len;
762 } else
763 /* reset the buffer */
764 jreader->buff_offset = 0;
766 /* data is at the beginning of the buffer */
767 jreader->json_offset = 0;
768 }
770 return (-1);
771 }
773 /* Reset JSON reader. It is useful when start processing
774 * new raw data. E.g.: in case of lost stream connection.
775 */
776 static inline void
777 ovs_json_reader_reset(ovs_json_reader_t *jreader)
778 {
779 if (jreader) {
780 jreader->buff_offset = 0;
781 jreader->json_offset = 0;
782 }
783 }
785 /* Release internal data allocated for JSON reader */
786 static inline void
787 ovs_json_reader_free(ovs_json_reader_t *jreader)
788 {
789 if (jreader) {
790 free(jreader->buff_ptr);
791 free(jreader);
792 }
793 }
795 /* Reconnect to OVD DB and call init OVS DB callback
796 * 'init_cb' if connection has been established.
797 */
798 static int
799 ovs_db_reconnect(ovs_db_t *pdb)
800 {
801 char errbuff[OVS_ERROR_BUFF_SIZE];
803 /* remove all registered OVS DB table/result callbacks */
804 ovs_db_callback_remove_all(pdb);
806 /* open new socket */
807 if ((pdb->conn.sock = socket(pdb->conn.domain, pdb->conn.type, 0)) < 0) {
808 sstrerror(errno, errbuff, sizeof(errbuff));
809 OVS_ERROR("socket(): %s", errbuff);
810 return (-1);
811 }
813 /* try to connect to server */
814 if (connect(pdb->conn.sock, (struct sockaddr *)&pdb->conn.addr,
815 pdb->conn.addr_size) < 0) {
816 sstrerror(errno, errbuff, sizeof(errbuff));
817 OVS_ERROR("connect(): %s", errbuff);
818 close(pdb->conn.sock);
819 return (-1);
820 }
822 /* send notification to event thread */
823 ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_ESTABLISHED);
824 return (0);
825 }
/* POLL worker thread.
 * It listens on OVS DB connection for incoming
 * requests/reply/events etc. Also, it reconnects to OVS DB
 * if connection has been lost.
 *
 * arg - pointer to the OVS DB object (ovs_db_t *)
 *
 * The loop runs while ovs_db_poll_is_running() reports the
 * running state. It is also left if recv() or poll() reports
 * an error. The thread always exits with a NULL value.
 */
static void *
ovs_poll_worker(void *arg)
{
  ovs_db_t *pdb = (ovs_db_t *)arg; /* pointer to OVS DB */
  ovs_json_reader_t *jreader = NULL;
  const char *json;
  size_t json_len;
  ssize_t nbytes = 0;
  char buff[OVS_DB_POLL_READ_BLOCK_SIZE];
  struct pollfd poll_fd;
  int poll_ret = 0;

  if ((jreader = ovs_json_reader_alloc()) == NULL) {
    OVS_ERROR("initialize json reader failed");
    goto thread_exit;
  }

  /* start polling data */
  poll_fd.fd = pdb->conn.sock;
  poll_fd.events = POLLIN | POLLPRI;
  poll_fd.revents = 0;

  /* poll data */
  while (ovs_db_poll_is_running(pdb)) {
    poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000);
    if (poll_ret > 0) {
      if (poll_fd.revents & POLLNVAL) {
        /* invalid file descriptor, reconnect */
        if (ovs_db_reconnect(pdb) != 0) {
          /* sleep awhile until next reconnect */
          usleep(OVS_DB_RECONNECT_TIMEOUT * 1000000);
        }
        /* drop partially received data and poll the socket
         * stored by the reconnect attempt */
        ovs_json_reader_reset(jreader);
        poll_fd.fd = pdb->conn.sock;
      } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) {
        /* connection is broken */
        OVS_ERROR("poll() peer closed its end of the channel");
        ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
        /* NOTE(review): presumably the closed descriptor makes the
         * next poll() report POLLNVAL, which triggers the reconnect
         * above - confirm */
        close(poll_fd.fd);
      } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
        /* read incoming data */
        nbytes = recv(poll_fd.fd, buff, OVS_DB_POLL_READ_BLOCK_SIZE, 0);
        if (nbytes > 0) {
          OVS_DEBUG("recv(): received %d bytes of data", (int)nbytes);
          /* NOTE(review): the return value of the push is not checked */
          ovs_json_reader_push_data(jreader, buff, nbytes);
          while (!ovs_json_reader_pop(jreader, &json, &json_len))
            /* process JSON data */
            ovs_db_json_data_process(pdb, json, json_len);
        } else if (nbytes == 0) {
          OVS_ERROR("recv() peer has performed an orderly shutdown");
          ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
          close(poll_fd.fd);
        } else {
          OVS_ERROR("recv() receive data error");
          break;
        }
      } /* poll() POLLIN & POLLPRI */
    } else if (poll_ret == 0)
      OVS_DEBUG("poll() timeout");
    else {
      OVS_ERROR("poll() error");
      break;
    }
  }

thread_exit:
  OVS_DEBUG("poll thread has been completed");
  /* ovs_json_reader_free() ignores a NULL reader */
  ovs_json_reader_free(jreader);
  pthread_exit((void *)0);
  return ((void *)0);
}
904 /* EVENT worker thread.
905 * Perform task based on incoming events. This
906 * task can be done asynchronously which allows to
907 * handle OVD DB callback like 'init_cb'.
908 */
909 static void *
910 ovs_event_worker(void *arg)
911 {
912 int ret = 0;
913 ovs_db_t *pdb = (ovs_db_t *)arg;
914 struct timespec ts;
916 while (pdb->event_thread.value != OVS_DB_EVENT_TERMINATE) {
917 /* wait for an event */
918 clock_gettime(CLOCK_REALTIME, &ts);
919 ts.tv_sec += (OVS_DB_EVENT_TIMEOUT);
920 ret = pthread_cond_timedwait(&pdb->event_thread.cond,
921 &pdb->event_thread.mutex, &ts);
922 if (!ret) {
923 /* handle the event */
924 OVS_DEBUG("handle event %d", pdb->event_thread.value);
925 switch (pdb->event_thread.value) {
926 case OVS_DB_EVENT_CONN_ESTABLISHED:
927 if (pdb->cb.post_conn_init)
928 pdb->cb.post_conn_init(pdb);
929 break;
930 case OVS_DB_EVENT_CONN_TERMINATED:
931 if (pdb->cb.post_conn_terminate)
932 pdb->cb.post_conn_terminate();
933 break;
934 default:
935 OVS_DEBUG("unknown event received");
936 break;
937 }
938 } else if (ret == ETIMEDOUT) {
939 /* wait timeout */
940 OVS_DEBUG("no event received (timeout)");
941 continue;
942 } else {
943 /* unexpected error */
944 OVS_ERROR("pthread_cond_timedwait() failed");
945 break;
946 }
947 }
949 thread_exit:
950 OVS_DEBUG("event thread has been completed");
951 pthread_exit((void *)0);
952 return ((void *)0);
953 }
955 /* Stop EVENT thread */
956 static int
957 ovs_db_event_thread_stop(ovs_db_t *pdb)
958 {
959 ovs_db_event_post(pdb, OVS_DB_EVENT_TERMINATE);
960 if (pthread_join(pdb->event_thread.tid, NULL) != 0)
961 return (-1);
962 pthread_mutex_unlock(&pdb->event_thread.mutex);
963 pthread_mutex_destroy(&pdb->event_thread.mutex);
964 return (0);
965 }
967 /* Stop POLL thread */
968 static int
969 ovs_db_poll_thread_stop(ovs_db_t *pdb)
970 {
971 ovs_db_poll_terminate(pdb);
972 if (pthread_join(pdb->poll_thread.tid, NULL) != 0)
973 return (-1);
974 pthread_mutex_destroy(&pdb->poll_thread.mutex);
975 return (0);
976 }
978 /*
979 * Public OVS DB API implementation
980 */
982 ovs_db_t *
983 ovs_db_init(const char *surl, ovs_db_callback_t *cb)
984 {
985 pthread_mutexattr_t mutex_attr;
986 ovs_db_t *pdb = NULL;
988 /* allocate db data & fill it */
989 if ((pdb = calloc(1, sizeof(*pdb))) == NULL)
990 return (NULL);
992 /* convert string url to socket addr */
993 if (ovs_db_url_parse(surl, &pdb->conn) < 0)
994 goto failure;
996 /* setup OVS DB callbacks */
997 if (cb)
998 pdb->cb = *cb;
1000 /* prepare event thread */
1001 pthread_cond_init(&pdb->event_thread.cond, NULL);
1002 pthread_mutex_init(&pdb->event_thread.mutex, NULL);
1003 pthread_mutex_lock(&pdb->event_thread.mutex);
1004 if (plugin_thread_create(&pdb->event_thread.tid, NULL,
1005 ovs_event_worker, pdb) != 0) {
1006 OVS_ERROR("event worker start failed");
1007 goto failure;
1008 }
1010 /* prepare polling thread */
1011 ovs_db_reconnect(pdb);
1012 pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING;
1013 pthread_mutex_init(&pdb->poll_thread.mutex, NULL);
1014 if (plugin_thread_create(&pdb->poll_thread.tid, NULL,
1015 ovs_poll_worker, pdb) != 0) {
1016 OVS_ERROR("pull worker start failed");
1017 goto failure;
1018 }
1020 /* init OVS DB mutex */
1021 if (pthread_mutexattr_init(&mutex_attr) ||
1022 pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE) ||
1023 pthread_mutex_init(&pdb->mutex, &mutex_attr)) {
1024 OVS_ERROR("OVS DB mutex init failed");
1025 goto failure;
1026 }
1028 /* return db to the caller */
1029 return pdb;
1031 failure:
1032 if (pdb->conn.sock)
1033 /* close connection */
1034 close(pdb->conn.sock);
1035 if (pdb->event_thread.tid != 0)
1036 /* stop event thread */
1037 if (ovs_db_event_thread_stop(pdb) < 0)
1038 OVS_ERROR("stop event thread failed");
1039 if (pdb->poll_thread.tid != 0)
1040 /* stop poll thread */
1041 if (ovs_db_poll_thread_stop(pdb) < 0)
1042 OVS_ERROR("stop poll thread failed");
1043 sfree(pdb);
1044 return NULL;
1045 }
1047 int
1048 ovs_db_send_request(ovs_db_t *pdb, const char *method,
1049 const char *params, ovs_db_result_cb_t cb)
1050 {
1051 int ret = 0;
1052 yajl_gen_status yajl_gen_ret;
1053 yajl_val jparams;
1054 yajl_gen jgen;
1055 ovs_callback_t *new_cb = NULL;
1056 uint64_t uid;
1057 char uid_buff[OVS_UID_STR_SIZE];
1058 const char *req = NULL;
1059 size_t req_len = 0;
1060 struct timespec ts;
1062 /* sanity check */
1063 if (!pdb || !method || !params)
1064 return (-1);
1066 if ((jgen = yajl_gen_alloc(NULL)) == NULL)
1067 return (-1);
1069 /* try to parse params */
1070 if ((jparams = yajl_tree_parse(params, NULL, 0)) == NULL) {
1071 OVS_ERROR("params is not a JSON string");
1072 yajl_gen_clear(jgen);
1073 return (-1);
1074 }
1076 /* generate method field */
1077 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1079 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "method");
1080 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, method);
1082 /* generate params field */
1083 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "params");
1084 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
1085 yajl_tree_free(jparams);
1087 /* generate id field */
1088 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
1089 uid = ovs_uid_generate();
1090 ssnprintf(uid_buff, sizeof(uid_buff), "%" PRIX64, uid);
1091 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_buff);
1093 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1095 if (cb) {
1096 /* register result callback */
1097 if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL)
1098 goto yajl_gen_failure;
1100 /* add new callback to front */
1101 sem_init(&new_cb->result.sync, 0, 0);
1102 new_cb->result.call = cb;
1103 new_cb->uid = uid;
1104 ovs_db_callback_add(pdb, new_cb);
1105 }
1107 /* send the request */
1108 OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&req,
1109 &req_len);
1110 OVS_DEBUG("%s", req);
1111 if (!ovs_db_data_send(pdb, req, req_len)) {
1112 if (cb) {
1113 /* wait for result */
1114 clock_gettime(CLOCK_REALTIME, &ts);
1115 ts.tv_sec += OVS_DB_SEND_REQ_TIMEOUT;
1116 if (sem_timedwait(&new_cb->result.sync, &ts) < 0) {
1117 OVS_ERROR("%s() no replay received within %d sec", __FUNCTION__,
1118 OVS_DB_SEND_REQ_TIMEOUT);
1119 ret = (-1);
1120 }
1121 }
1122 } else {
1123 OVS_ERROR("ovs_db_data_send() failed");
1124 ret = (-1);
1125 }
1127 yajl_gen_failure:
1128 if (new_cb) {
1129 /* destroy callback */
1130 sem_destroy(&new_cb->result.sync);
1131 ovs_db_callback_remove(pdb, new_cb);
1132 }
1134 /* release memory */
1135 yajl_gen_clear(jgen);
1136 return (yajl_gen_ret != yajl_gen_status_ok) ? (-1) : ret;
1137 }
1139 int
1140 ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name,
1141 const char **tb_column, ovs_db_table_cb_t update_cb,
1142 ovs_db_result_cb_t result_cb, unsigned int flags)
1143 {
1144 yajl_gen jgen;
1145 yajl_gen_status yajl_gen_ret;
1146 ovs_callback_t *new_cb = NULL;
1147 char uid_str[OVS_UID_STR_SIZE];
1148 char *params;
1149 size_t params_len;
1150 int ovs_db_ret = 0;
1152 /* sanity check */
1153 if (pdb == NULL || tb_name == NULL || update_cb == NULL)
1154 return (-1);
1156 if ((jgen = yajl_gen_alloc(NULL)) == NULL)
1157 return (-1);
1159 /* register table update callback */
1160 if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL)
1161 return (-1);
1163 /* add new callback to front */
1164 new_cb->table.call = update_cb;
1165 new_cb->uid = ovs_uid_generate();
1166 ovs_db_callback_add(pdb, new_cb);
1168 /* make update notification request
1169 * [<db-name>, <json-value>, <monitor-requests>] */
1170 OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1171 {
1172 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, OVS_DB_DEFAULT_DB_NAME);
1174 /* uid string <json-value> */
1175 ssnprintf(uid_str, sizeof(uid_str), "%" PRIX64, new_cb->uid);
1176 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_str);
1178 /* <monitor-requests> */
1179 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1180 {
1181 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, tb_name);
1182 OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1183 {
1184 /* <monitor-request> */
1185 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1186 {
1187 if (tb_column) {
1188 /* columns within the table to be monitored */
1189 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "columns");
1190 OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1191 for (; *tb_column; tb_column++)
1192 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, *tb_column);
1193 OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1194 }
1195 /* specify select option */
1196 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "select");
1197 {
1198 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1199 {
1200 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "initial");
1201 OVS_YAJL_CALL(yajl_gen_bool, jgen,
1202 flags & OVS_DB_TABLE_CB_FLAG_INITIAL);
1203 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "insert");
1204 OVS_YAJL_CALL(yajl_gen_bool, jgen,
1205 flags & OVS_DB_TABLE_CB_FLAG_INSERT);
1206 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "delete");
1207 OVS_YAJL_CALL(yajl_gen_bool, jgen,
1208 flags & OVS_DB_TABLE_CB_FLAG_DELETE);
1209 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "modify");
1210 OVS_YAJL_CALL(yajl_gen_bool, jgen,
1211 flags & OVS_DB_TABLE_CB_FLAG_MODIFY);
1212 }
1213 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1214 }
1215 }
1216 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1217 }
1218 OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1219 }
1220 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1221 }
1222 OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1224 /* make a request to subscribe to given table */
1225 OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)¶ms,
1226 ¶ms_len);
1227 if (ovs_db_send_request(pdb, "monitor", params, result_cb) < 0) {
1228 OVS_ERROR("Failed to subscribe to \"%s\" table", tb_name);
1229 ovs_db_ret = (-1);
1230 }
1232 yajl_gen_failure:
1233 /* release memory */
1234 yajl_gen_clear(jgen);
1235 return ovs_db_ret;
1236 }
1238 int
1239 ovs_db_destroy(ovs_db_t *pdb)
1240 {
1241 int ovs_db_ret = 0;
1242 int ret = 0;
1244 /* sanity check */
1245 if (pdb == NULL)
1246 return (-1);
1248 /* try to lock the structure before releasing */
1249 if (ret = pthread_mutex_lock(&pdb->mutex)) {
1250 OVS_ERROR("pthread_mutex_lock() DB mutext lock failed (%d)", ret);
1251 return (-1);
1252 }
1254 /* stop poll thread */
1255 if (ovs_db_event_thread_stop(pdb) < 0) {
1256 OVS_ERROR("stop poll thread failed");
1257 ovs_db_ret = (-1);
1258 }
1260 /* stop event thread */
1261 if (ovs_db_poll_thread_stop(pdb) < 0) {
1262 OVS_ERROR("stop event thread failed");
1263 ovs_db_ret = (-1);
1264 }
1266 /* unsubscribe callbacks */
1267 ovs_db_callback_remove_all(pdb);
1269 /* close connection */
1270 if (pdb->conn.sock)
1271 close(pdb->conn.sock);
1273 /* release DB handler */
1274 pthread_mutex_unlock(&pdb->mutex);
1275 pthread_mutex_destroy(&pdb->mutex);
1276 sfree(pdb);
1277 return ovs_db_ret;
1278 }
1280 /*
1281 * Public OVS utils API implementation
1282 */
1284 /* Get YAJL value by key from YAJL dictionary */
1285 yajl_val
1286 ovs_utils_get_value_by_key(yajl_val jval, const char *key)
1287 {
1288 const char *obj_key = NULL;
1290 /* check params */
1291 if (!YAJL_IS_OBJECT(jval) || (key == NULL))
1292 return NULL;
1294 /* find a value by key */
1295 for (int i = 0; i < YAJL_GET_OBJECT(jval)->len; i++) {
1296 obj_key = YAJL_GET_OBJECT(jval)->keys[i];
1297 if (strcmp(obj_key, key) == 0)
1298 return YAJL_GET_OBJECT(jval)->values[i];
1299 }
1301 return NULL;
1302 }
1304 /* Get OVS DB map value by given map key */
1305 yajl_val
1306 ovs_utils_get_map_value(yajl_val jval, const char *key)
1307 {
1308 size_t map_len = 0;
1309 size_t array_len = 0;
1310 yajl_val *map_values = NULL;
1311 yajl_val *array_values = NULL;
1313 /* check YAJL array */
1314 if (!YAJL_IS_ARRAY(jval) || (key == NULL))
1315 return NULL;
1317 /* check a database map value (2-element, first one should be a string */
1318 array_len = YAJL_GET_ARRAY(jval)->len;
1319 array_values = YAJL_GET_ARRAY(jval)->values;
1320 if ((array_len != 2) || (!YAJL_IS_STRING(array_values[0])) ||
1321 (!YAJL_IS_ARRAY(array_values[1])))
1322 return NULL;
1324 /* check first element of the array */
1325 if (strcmp("map", YAJL_GET_STRING(array_values[0])) != 0)
1326 return NULL;
1328 /* try to find map value by map key */
1329 map_len = YAJL_GET_ARRAY(array_values[1])->len;
1330 map_values = YAJL_GET_ARRAY(array_values[1])->values;
1331 for (int i = 0; i < map_len; i++) {
1332 /* check YAJL array */
1333 if (!YAJL_IS_ARRAY(map_values[i]))
1334 break;
1336 /* check a database pair value (2-element, first one represents a key
1337 * and it should be a string in our case */
1338 array_len = YAJL_GET_ARRAY(map_values[i])->len;
1339 array_values = YAJL_GET_ARRAY(map_values[i])->values;
1340 if ((array_len != 2) || (!YAJL_IS_STRING(array_values[0])))
1341 break;
1343 /* return map value if given key equals map key */
1344 if (strcmp(key, YAJL_GET_STRING(array_values[0])) == 0)
1345 return array_values[1];
1346 }
1347 return NULL;
1348 }