1 /**
2 * collectd - src/utils_ovs.c
3 *
4 * Copyright(c) 2016 Intel Corporation. All rights reserved.
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy of
7 * this software and associated documentation files (the "Software"), to deal in
8 * the Software without restriction, including without limitation the rights to
9 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
10 * of the Software, and to permit persons to whom the Software is furnished to do
11 * so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in all
14 * copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22 * SOFTWARE.
23 *
24 * Authors:
25 * Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
26 *
27 * OVS DB API internal architecture diagram
28 * +------------------------------------------------------------------------------+
29 * |OVS plugin |OVS utils |
30 * | | +------------------------+ |
31 * | | | echo handler | JSON request/ |
32 * | | +--+ (ovs_db_table_echo_cb) +<---+---------+ update event/ |
33 * | | | | | | | result |
34 * | | | +------------------------+ | | |
35 * | | | | +----+---+--------+ |
36 * | +----------+ | | +------------------------+ | | | | |
37 * | | update | | | | update handler | | | YAJL | JSON | |
38 * | | callback +<-------+(ovs_db_table_update_cb)+<---+ | parser | reader | |
39 * | +----------+ | | | | | | | | |
40 * | | | +------------------------+ | +--------+---+----+ |
41 * | | | | ^ |
42 * | +----------+ | | +------------------------+ | | |
43 * | | result | | | | result handler | | | |
44 * | | callback +<-------+ (ovs_db_result_cb) +<---+ JSON raw | |
45 * | +----------+ | | | | data | |
46 * | | | +------------------------+ | |
47 * | | | | |
48 * | | | +------------------+ +------------+----+ |
49 * | +----------+ | | |thread| | |thread| | |
50 * | | init | | | | | reconnect | | |
51 * | | callback +<---------+ EVENT WORKER +<------------+ POLL WORKER | |
52 * | +----------+ | | +------------------+ +--------+--------+ |
53 * | | | ^ |
54 * +----------------+-------------------------------------------------------------+
55 * | |
56 * JSON|echo reply raw|data
57 * v v
58 * +-------------------+----------------------------------------------+-----------+
59 * | TCP/UNIX socket |
60 * +-------------------------------------------------------------------------------
61 *
62 **/
64 /* collectd headers */
65 #include "common.h"
67 /* private headers */
68 #include "utils_ovs.h"
70 /* system libraries */
71 #include <semaphore.h>
72 #include <arpa/inet.h>
73 #include <poll.h>
74 #include <sys/un.h>
/* Logging helpers: OVS_ERROR prefixes the message with the module name,
 * OVS_DEBUG prefixes it with the file, line and function of the call site. */
#define OVS_ERROR(fmt, ...)                                                    \
  do {                                                                         \
    ERROR("ovs_utils: " fmt, ##__VA_ARGS__);                                   \
  } while (0)
#define OVS_DEBUG(fmt, ...)                                                    \
  do {                                                                         \
    DEBUG("%s:%d:%s(): " fmt, __FILE__, __LINE__, __FUNCTION__,                \
          ##__VA_ARGS__);                                                      \
  } while (0)

/* POLL thread settings */
#define OVS_DB_POLL_TIMEOUT 1         /* poll receive timeout (sec) */
#define OVS_DB_POLL_READ_BLOCK_SIZE 5 /* read block size (bytes) */
#define OVS_DB_DEFAULT_DB_NAME "Open_vSwitch"
#define OVS_DB_RECONNECT_TIMEOUT 1 /* reconnect timeout (sec) */

/* EVENT thread settings and event codes */
#define OVS_DB_EVENT_TIMEOUT 5 /* event thread timeout (sec) */
#define OVS_DB_EVENT_TERMINATE 1
#define OVS_DB_EVENT_CONNECTED 2

/* POLL thread states */
#define OVS_DB_POLL_STATE_RUNNING 1
#define OVS_DB_POLL_STATE_EXITING 2

#define OVS_DB_SEND_REQ_TIMEOUT 5 /* send request timeout (sec) */

/* Call a YAJL generator function and jump to the local 'yajl_gen_failure'
 * label if it does not return yajl_gen_status_ok. The enclosing function
 * must declare 'yajl_gen_status yajl_gen_ret' and define that label. */
#define OVS_YAJL_CALL(func, ...)                                               \
  do {                                                                         \
    yajl_gen_ret = func(__VA_ARGS__);                                          \
    if (yajl_gen_ret != yajl_gen_status_ok)                                    \
      goto yajl_gen_failure;                                                   \
  } while (0)

/* Buffer sizes */
#define OVS_YAJL_ERROR_BUFFER_SIZE 1024
#define OVS_ERROR_BUFF_SIZE 512
#define OVS_UID_STR_SIZE 17 /* 64-bit HEX string len + '\0' */
/* Internal state of the JSON reader (raw byte stream -> JSON messages) */
typedef struct ovs_json_reader_s {
  char *buff_ptr;     /* heap buffer holding the raw received bytes */
  size_t buff_size;   /* allocated size of 'buff_ptr' */
  size_t buff_offset; /* number of bytes currently stored in the buffer */
  size_t json_offset; /* offset at which the search for the next JSON starts */
} ovs_json_reader_t;
115 /* Result callback declaration */
116 struct ovs_result_cb_s {
117 sem_t sync;
118 ovs_db_result_cb_t call;
119 };
120 typedef struct ovs_result_cb_s ovs_result_cb_t;
122 /* Table callback declaration */
123 struct ovs_table_cb_s {
124 ovs_db_table_cb_t call;
125 };
126 typedef struct ovs_table_cb_s ovs_table_cb_t;
128 /* Callback declaration */
129 struct ovs_callback_s {
130 uint64_t uid;
131 union {
132 ovs_result_cb_t result;
133 ovs_table_cb_t table;
134 };
135 struct ovs_callback_s *next;
136 struct ovs_callback_s *prev;
137 };
138 typedef struct ovs_callback_s ovs_callback_t;
/* Connection description: the socket and the parameters used to open
 * and connect it */
typedef struct ovs_conn_s {
  int sock;      /* socket descriptor */
  int domain;    /* socket domain passed to socket() (AF_INET or AF_UNIX) */
  int type;      /* socket type passed to socket() */
  int addr_size; /* size of the address passed to connect() */
  union {
    struct sockaddr_in s_inet; /* TCP (INET domain) address */
    struct sockaddr_un s_unix; /* UNIX domain socket address */
  } addr;
} ovs_conn_t;
/* EVENT thread data */
typedef struct ovs_event_thread_s {
  pthread_t tid;         /* thread identifier */
  pthread_mutex_t mutex; /* protects 'value', used together with 'cond' */
  pthread_cond_t cond;   /* signalled when a new event has been posted */
  int value;             /* last posted event (OVS_DB_EVENT_*) */
} ovs_event_thread_t;
/* POLL thread data */
typedef struct ovs_poll_thread_s {
  pthread_t tid;         /* thread identifier */
  pthread_mutex_t mutex; /* protects 'state' */
  int state;             /* thread state (OVS_DB_POLL_STATE_*) */
} ovs_poll_thread_t;
170 /* OVS DB internal data declaration */
171 struct ovs_db_s {
172 ovs_poll_thread_t poll_thread;
173 ovs_event_thread_t event_thread;
174 pthread_mutex_t mutex;
175 ovs_callback_t *cb;
176 ovs_conn_t conn;
177 ovs_db_init_cb_t init_cb;
178 };
179 typedef struct ovs_db_s ovs_db_t;
181 /* Post an event to event thread.
182 * Possible events are:
183 * OVS_DB_EVENT_TERMINATE
184 * OVS_DB_EVENT_CONNECTED
185 */
186 static void
187 ovs_db_event_post(ovs_db_t *pdb, int event)
188 {
189 pthread_mutex_lock(&pdb->event_thread.mutex);
190 pdb->event_thread.value = event;
191 pthread_mutex_unlock(&pdb->event_thread.mutex);
192 pthread_cond_signal(&pdb->event_thread.cond);
193 }
195 /* Check if POLL thread is still running. Returns
196 * 1 if running otherwise 0 is returned */
197 static inline int
198 ovs_db_poll_is_running(ovs_db_t *pdb)
199 {
200 int state = 0;
201 pthread_mutex_lock(&pdb->poll_thread.mutex);
202 state = pdb->poll_thread.state;
203 pthread_mutex_unlock(&pdb->poll_thread.mutex);
204 return (state == OVS_DB_POLL_STATE_RUNNING);
205 }
207 /* Terminate POLL thread */
208 static inline void
209 ovs_db_poll_terminate(ovs_db_t *pdb)
210 {
211 pthread_mutex_lock(&pdb->poll_thread.mutex);
212 pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING;
213 pthread_mutex_unlock(&pdb->poll_thread.mutex);
214 }
/* Generate unique identifier (UID). It is used by OVS DB API
 * to set "id" field for any OVS DB JSON request.
 *
 * The UID is built from the monotonic clock: the seconds go to the upper
 * 32 bits and the nanoseconds to the lower 32 bits. Both parts are widened
 * to uint64_t before shifting, so the result is well defined even when
 * time_t is a 32-bit or signed type. */
static uint64_t ovs_uid_generate(void) {
  /* zero-initialized so the result is defined even if the clock call fails */
  struct timespec ts = {0};

  clock_gettime(CLOCK_MONOTONIC, &ts);

  const uint64_t sec = (uint64_t)ts.tv_sec;
  const uint64_t nsec = (uint64_t)ts.tv_nsec & UINT32_MAX;
  return (sec << 32) | nsec;
}
226 /*
227 * Callback API. These functions are used to store
228 * registered callbacks in OVS DB API.
229 */
231 /* Add new callback into OVS DB object */
232 static void
233 ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb)
234 {
235 pthread_mutex_lock(&pdb->mutex);
236 if (pdb->cb)
237 pdb->cb->prev = new_cb;
238 new_cb->next = pdb->cb;
239 new_cb->prev = NULL;
240 pdb->cb = new_cb;
241 pthread_mutex_unlock(&pdb->mutex);
242 }
244 /* Remove callback from OVS DB object */
245 static void
246 ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb)
247 {
248 ovs_callback_t *pre_cb = del_cb->prev;
249 ovs_callback_t *next_cb = del_cb->next;
251 pthread_mutex_lock(&pdb->mutex);
252 if (next_cb)
253 next_cb->prev = del_cb->prev;
255 if (pre_cb)
256 pre_cb->next = del_cb->next;
257 else
258 pdb->cb = del_cb->next;
260 free(del_cb);
261 pthread_mutex_unlock(&pdb->mutex);
262 }
264 /* Remove all callbacks form OVS DB object */
265 static void
266 ovs_db_callback_remove_all(ovs_db_t *pdb)
267 {
268 pthread_mutex_lock(&pdb->mutex);
269 for (ovs_callback_t *del_cb = pdb->cb; pdb->cb; del_cb = pdb->cb) {
270 pdb->cb = pdb->cb->next;
271 free(del_cb);
272 }
273 pdb->cb = NULL;
274 pthread_mutex_unlock(&pdb->mutex);
275 }
277 /* Get/find callback in OVS DB object by UID. Returns pointer
278 * to requested callback otherwise NULL is returned */
279 static ovs_callback_t *
280 ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid)
281 {
282 pthread_mutex_lock(&pdb->mutex);
283 for (ovs_callback_t *cb = pdb->cb; cb != NULL; cb = cb->next)
284 if (cb->uid == uid) {
285 pthread_mutex_unlock(&pdb->mutex);
286 return cb;
287 }
288 pthread_mutex_unlock(&pdb->mutex);
289 return NULL;
290 }
292 /* Send all requested data to the socket. Returns 0 if
293 * ALL request data has been sent otherwise negative value
294 * is returned */
295 static int
296 ovs_db_data_send(const ovs_db_t *pdb, const char *data, size_t len)
297 {
298 ssize_t nbytes = 0;
299 size_t rem = len;
300 size_t off = 0;
302 while (rem > 0) {
303 if ((nbytes = send(pdb->conn.sock, data + off, rem, 0)) <= 0)
304 return (-1);
305 rem -= (size_t)nbytes;
306 off += (size_t)nbytes;
307 }
308 return (0);
309 }
311 /* Parse OVS server URL.
312 * Format of the URL:
313 * "tcp:a.b.c.d:port" - define TCP connection (INET domain)
314 * "unix:file" - define UNIX socket file (UNIX domain)
315 */
316 static int
317 ovs_db_url_parse(const char *surl, ovs_conn_t *conn)
318 {
319 ovs_conn_t tmp_conn;
320 char *nexttok = NULL;
321 char *in_str = NULL;
322 char *saveptr;
323 int ret = 0;
325 /* sanity check */
326 if ((surl == NULL) || (strlen(surl) < 1))
327 return (-1);
329 /* parse domain */
330 tmp_conn = *conn;
331 in_str = sstrdup(surl);
332 if ((nexttok = strtok_r(in_str, ":", &saveptr)) != NULL) {
333 if (strcmp("tcp", nexttok) == 0) {
334 tmp_conn.domain = AF_INET;
335 tmp_conn.type = SOCK_STREAM;
336 tmp_conn.addr_size = sizeof(tmp_conn.addr.s_inet);
337 } else if (strcmp("unix", nexttok) == 0) {
338 tmp_conn.domain = AF_UNIX;
339 tmp_conn.type = SOCK_STREAM;
340 tmp_conn.addr_size = sizeof(tmp_conn.addr.s_unix);
341 } else
342 goto failure;
343 } else
344 goto failure;
346 /* parse url depending on domain */
347 if ((nexttok = strtok_r(NULL, ":", &saveptr)) != NULL) {
348 if (tmp_conn.domain == AF_UNIX) {
349 /* <UNIX-NAME> */
350 tmp_conn.addr.s_inet.sin_family = AF_UNIX;
351 sstrncpy(tmp_conn.addr.s_unix.sun_path, nexttok, strlen(nexttok) + 1);
352 } else {
353 /* <IP:PORT> */
354 tmp_conn.addr.s_inet.sin_family = AF_INET;
355 ret =
356 inet_pton(AF_INET, nexttok, (void *)&tmp_conn.addr.s_inet.sin_addr);
357 if (ret == 1) {
358 if ((nexttok = strtok_r(NULL, ":", &saveptr)) != NULL)
359 tmp_conn.addr.s_inet.sin_port = htons(atoi(nexttok));
360 else
361 goto failure;
362 } else
363 goto failure;
364 }
365 }
367 /* save result and return success */
368 *conn = tmp_conn;
369 sfree(in_str);
370 return (0);
372 failure:
373 OVS_ERROR("%s() : invalid OVS DB URL provided");
374 sfree(in_str);
375 return (-1);
376 }
378 /*
379 * YAJL (Yet Another JSON Library) helper functions
380 * Documentation (https://lloyd.github.io/yajl/)
381 */
383 /* Add null-terminated string into YAJL generator handle (JSON object).
384 * Similar function to yajl_gen_string() but takes null-terminated string
385 * instead of string and its length.
386 *
387 * jgen - YAJL generator handle allocated by yajl_gen_alloc()
388 * string - Null-terminated string
389 */
390 static inline yajl_gen_status
391 ovs_yajl_gen_tstring(yajl_gen hander, const char *string)
392 {
393 return yajl_gen_string(hander, string, strlen(string));
394 }
396 /* Add YAJL value into YAJL generator handle (JSON object)
397 *
398 * jgen - YAJL generator handle allocated by yajl_gen_alloc()
399 * jval - YAJL value usually returned by yajl_tree_get()
400 */
401 static yajl_gen_status
402 ovs_yajl_gen_val(yajl_gen jgen, yajl_val jval)
403 {
404 size_t array_len = 0;
405 yajl_val *jvalues = NULL;
406 yajl_val jobj_value = NULL;
407 const char *obj_key = NULL;
408 size_t obj_len = 0;
409 yajl_gen_status yajl_gen_ret;
411 if (YAJL_IS_STRING(jval))
412 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, YAJL_GET_STRING(jval));
413 else if (YAJL_IS_DOUBLE(jval))
414 OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_DOUBLE(jval));
415 else if (YAJL_IS_INTEGER(jval))
416 OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_INTEGER(jval));
417 else if (YAJL_IS_TRUE(jval))
418 OVS_YAJL_CALL(yajl_gen_bool, jgen, 1);
419 else if (YAJL_IS_FALSE(jval))
420 OVS_YAJL_CALL(yajl_gen_bool, jgen, 0);
421 else if (YAJL_IS_NULL(jval))
422 OVS_YAJL_CALL(yajl_gen_null, jgen);
423 else if (YAJL_IS_ARRAY(jval)) {
424 /* create new array and add all elements into the array */
425 array_len = YAJL_GET_ARRAY(jval)->len;
426 jvalues = YAJL_GET_ARRAY(jval)->values;
427 OVS_YAJL_CALL(yajl_gen_array_open, jgen);
428 for (int i = 0; i < array_len; i++)
429 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jvalues[i]);
430 OVS_YAJL_CALL(yajl_gen_array_close, jgen);
431 } else if (YAJL_IS_OBJECT(jval)) {
432 /* create new object and add all elements into the object */
433 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
434 obj_len = YAJL_GET_OBJECT(jval)->len;
435 for (int i = 0; i < obj_len; i++) {
436 obj_key = YAJL_GET_OBJECT(jval)->keys[i];
437 jobj_value = YAJL_GET_OBJECT(jval)->values[i];
438 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, obj_key);
439 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jobj_value);
440 }
441 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
442 } else {
443 OVS_ERROR("%s() unsupported value type %d (skip)", __FUNCTION__,
444 (int)(jval)->type);
445 goto yajl_gen_failure;
446 }
447 return yajl_gen_status_ok;
449 yajl_gen_failure:
450 OVS_ERROR("%s() error to generate value", __FUNCTION__);
451 return yajl_gen_ret;
452 }
454 /* OVS DB echo request handler. When OVS DB sends
455 * "echo" request to the client, client should generate
456 * "echo" replay with the same content received in the
457 * request */
458 static int
459 ovs_db_table_echo_cb(const ovs_db_t *pdb, yajl_val jnode)
460 {
461 yajl_val jparams;
462 yajl_val jid;
463 yajl_gen jgen;
464 size_t resp_len = 0;
465 const char *resp = NULL;
466 const char *params_path[] = {"params", NULL};
467 const char *id_path[] = {"id", NULL};
468 yajl_gen_status yajl_gen_ret;
470 if ((jgen = yajl_gen_alloc(NULL)) == NULL)
471 return (-1);
473 /* check & get request attributes */
474 if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
475 ((jid = yajl_tree_get(jnode, id_path, yajl_t_any)) == NULL)) {
476 OVS_ERROR("parse echo request failed");
477 goto yajl_gen_failure;
478 }
480 /* generate JSON echo response */
481 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
483 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "result");
484 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
486 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "error");
487 OVS_YAJL_CALL(yajl_gen_null, jgen);
489 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
490 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jid);
492 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
493 OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&resp,
494 &resp_len);
496 /* send the response */
497 OVS_DEBUG("response: %s", resp);
498 if (ovs_db_data_send(pdb, resp, resp_len) < 0) {
499 OVS_ERROR("send echo reply failed");
500 goto yajl_gen_failure;
501 }
502 /* clean up and return success */
503 yajl_gen_clear(jgen);
504 return (0);
506 yajl_gen_failure:
507 /* release memory */
508 yajl_gen_clear(jgen);
509 return (-1);
510 }
512 /* Get OVS DB registered callback by YAJL val. The YAJL
513 * value should be YAJL string (UID). Returns NULL if
514 * callback hasn't been found.
515 */
516 static ovs_callback_t *
517 ovs_db_table_callback_get(ovs_db_t *pdb, yajl_val jid)
518 {
519 char *endptr = NULL;
520 const char *suid = NULL;
521 uint64_t uid;
523 if (jid && YAJL_IS_STRING(jid)) {
524 suid = YAJL_GET_STRING(jid);
525 uid = (uint64_t) strtoul(suid, &endptr, 16);
526 if (*endptr == '\0' && uid)
527 return ovs_db_callback_get(pdb, uid);
528 }
530 return NULL;
531 }
533 /* OVS DB table update event handler.
534 * This callback is called by POLL thread if OVS DB
535 * table update callback is received from the DB
536 * server. Once registered callback found, it's called
537 * by this handler. */
538 static int
539 ovs_db_table_update_cb(ovs_db_t *pdb, yajl_val jnode)
540 {
541 ovs_callback_t *cb = NULL;
542 yajl_val jvalue;
543 yajl_val jparams;
544 yajl_val jtable_updates;
545 yajl_val jtable_update;
546 size_t obj_len = 0;
547 const char *table_name = NULL;
548 const char *params_path[] = {"params", NULL};
549 const char *id_path[] = {"id", NULL};
551 /* check & get request attributes */
552 if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
553 (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL))
554 goto ovs_failure;
556 /* check array length: [<json-value>, <table-updates>] */
557 if (YAJL_GET_ARRAY(jparams)->len != 2)
558 goto ovs_failure;
560 jvalue = YAJL_GET_ARRAY(jparams)->values[0];
561 jtable_updates = YAJL_GET_ARRAY(jparams)->values[1];
562 if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue)))
563 goto ovs_failure;
565 /* find registered callback based on <json-value> */
566 cb = ovs_db_table_callback_get(pdb, jvalue);
567 if (cb == NULL || cb->table.call == NULL)
568 goto ovs_failure;
570 /* call registered callback */
571 cb->table.call(jtable_updates);
572 return 0;
574 ovs_failure:
575 OVS_ERROR("invalid OVS DB table update event");
576 return (-1);
577 }
579 /* OVS DB result request handler.
580 * This callback is called by POLL thread if OVS DB
581 * result reply is received from the DB server.
582 * Once registered callback found, it's called
583 * by this handler. */
584 static int
585 ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode)
586 {
587 ovs_callback_t *cb = NULL;
588 yajl_val jresult;
589 yajl_val jerror;
590 yajl_val jid;
591 const char *result_path[] = {"result", NULL};
592 const char *error_path[] = {"error", NULL};
593 const char *id_path[] = {"id", NULL};
595 jresult = yajl_tree_get(jnode, result_path, yajl_t_any);
596 jerror = yajl_tree_get(jnode, error_path, yajl_t_any);
597 jid = yajl_tree_get(jnode, id_path, yajl_t_string);
599 /* check & get result attributes */
600 if (!jresult || !jerror || !jid)
601 return (-1);
603 /* try to find registered callback */
604 cb = ovs_db_table_callback_get(pdb, jid);
605 if (cb != NULL && cb->result.call != NULL) {
606 /* call registered callback */
607 cb->result.call(jresult, jerror);
608 /* unlock owner of the reply */
609 sem_post(&cb->result.sync);
610 }
612 return (0);
613 }
615 /* Handle JSON data (one request) and call
616 * appropriate event OVS DB handler. Currently,
617 * update callback 'ovs_db_table_update_cb' and
618 * result callback 'ovs_db_result_cb' is supported.
619 */
620 static int
621 ovs_db_json_data_process(ovs_db_t *pdb, const char *data, size_t len)
622 {
623 const char *method = NULL;
624 char yajl_errbuf[OVS_YAJL_ERROR_BUFFER_SIZE];
625 const char *method_path[] = {"method", NULL};
626 const char *result_path[] = {"result", NULL};
627 char *sjson = NULL;
628 yajl_val jnode, jval;
630 /* duplicate the data to make null-terminated string
631 * required for yajl_tree_parse() */
632 if ((sjson = strndup(data, len)) == NULL)
633 return (-1);
635 OVS_DEBUG("%s", sjson);
637 /* parse json data */
638 jnode = yajl_tree_parse(sjson, yajl_errbuf, sizeof(yajl_errbuf));
639 if (jnode == NULL) {
640 OVS_ERROR("yajl_tree_parse() %s", yajl_errbuf);
641 return (-1);
642 }
644 /* get method name */
645 if (jval = yajl_tree_get(jnode, method_path, yajl_t_string)) {
646 method = YAJL_GET_STRING(jval);
647 if (strcmp("echo", method) == 0) {
648 /* echo request from the server */
649 if (ovs_db_table_echo_cb(pdb, jnode) < 0)
650 OVS_ERROR("handle echo request failed");
651 } else if (strcmp("update", method) == 0) {
652 /* update notification */
653 if (ovs_db_table_update_cb(pdb, jnode) < 0)
654 OVS_ERROR("handle update notification failed");
655 }
656 } else if (jval = yajl_tree_get(jnode, result_path, yajl_t_object)) {
657 /* result notification */
658 if (ovs_db_result_cb(pdb, jnode) < 0)
659 OVS_ERROR("handle result reply failed");
660 }
662 /* release memory */
663 yajl_tree_free(jnode);
664 sfree(sjson);
665 return (0);
666 }
668 /*
669 * JSON reader implementation.
670 *
671 * This module processes raw JSON data (byte stream) and
672 * returns fully-fledged JSON data which can be processed
673 * (parsed) by YAJL later.
674 */
676 /* Allocate JSON reader instance */
677 static inline ovs_json_reader_t *
678 ovs_json_reader_alloc()
679 {
680 ovs_json_reader_t *jreader = NULL;
682 if ((jreader = calloc(sizeof(ovs_json_reader_t), 1)) == NULL)
683 return NULL;
685 return jreader;
686 }
688 /* Push raw data into into the JSON reader for processing */
689 static inline int
690 ovs_json_reader_push_data(ovs_json_reader_t *jreader,
691 const char *data, size_t data_len)
692 {
693 char *new_buff = NULL;
694 size_t available = jreader->buff_size - jreader->buff_offset;
696 /* check/update required memory space */
697 if (available < data_len) {
698 OVS_DEBUG("Reallocate buffer [size=%d, available=%d required=%d]",
699 (int)jreader->buff_size, (int)available, (int)data_len);
701 /* allocate new chunk of memory */
702 new_buff = realloc(jreader->buff_ptr, (jreader->buff_size + data_len));
703 if (new_buff == NULL)
704 return (-1);
706 /* point to new allocated memory */
707 jreader->buff_ptr = new_buff;
708 jreader->buff_size += data_len;
709 }
711 /* store input data */
712 memcpy(jreader->buff_ptr + jreader->buff_offset, data, data_len);
713 jreader->buff_offset += data_len;
714 return (0);
715 }
717 /* Pop one fully-fledged JSON if already exists. Returns 0 if
718 * completed JSON already exists otherwise negative value is
719 * returned */
720 static inline int
721 ovs_json_reader_pop(ovs_json_reader_t *jreader,
722 const char **json_ptr, size_t *json_len_ptr)
723 {
724 size_t nbraces = 0;
725 size_t json_len = 0;
726 char *json = NULL;
728 /* search open/close brace */
729 for (int i = jreader->json_offset; i < jreader->buff_offset; i++) {
730 if (jreader->buff_ptr[i] == '{') {
731 nbraces++;
732 } else if (jreader->buff_ptr[i] == '}')
733 if (nbraces)
734 if (!(--nbraces)) {
735 /* JSON data */
736 *json_ptr = jreader->buff_ptr + jreader->json_offset;
737 *json_len_ptr = json_len + 1;
738 jreader->json_offset = i + 1;
739 return (0);
740 }
742 /* increase JSON data length */
743 if (nbraces)
744 json_len++;
745 }
747 if (jreader->json_offset) {
748 if (jreader->json_offset < jreader->buff_offset) {
749 /* shift data to the beginning of the buffer
750 * and zero rest of the buffer data */
751 json = &jreader->buff_ptr[jreader->json_offset];
752 json_len = jreader->buff_offset - jreader->json_offset;
753 for (int i = 0; i < jreader->buff_size; i++)
754 jreader->buff_ptr[i] = ((i < json_len) ? (json[i]) : (0));
755 jreader->buff_offset = json_len;
756 } else
757 /* reset the buffer */
758 jreader->buff_offset = 0;
760 /* data is at the beginning of the buffer */
761 jreader->json_offset = 0;
762 }
764 return (-1);
765 }
767 /* Reset JSON reader. It is useful when start processing
768 * new raw data. E.g.: in case of lost stream connection.
769 */
770 static inline void
771 ovs_json_reader_reset(ovs_json_reader_t *jreader)
772 {
773 if (jreader) {
774 jreader->buff_offset = 0;
775 jreader->json_offset = 0;
776 }
777 }
779 /* Release internal data allocated for JSON reader */
780 static inline void
781 ovs_json_reader_free(ovs_json_reader_t *jreader)
782 {
783 if (jreader) {
784 free(jreader->buff_ptr);
785 free(jreader);
786 }
787 }
789 /* Reconnect to OVD DB and call init OVS DB callback
790 * 'init_cb' if connection has been established.
791 */
792 static int
793 ovs_db_reconnect(ovs_db_t *pdb)
794 {
795 char errbuff[OVS_ERROR_BUFF_SIZE];
797 /* remove all registered OVS DB table/result callbacks */
798 ovs_db_callback_remove_all(pdb);
800 /* open new socket */
801 if ((pdb->conn.sock = socket(pdb->conn.domain, pdb->conn.type, 0)) < 0) {
802 sstrerror(errno, errbuff, sizeof(errbuff));
803 OVS_ERROR("socket(): %s", errbuff);
804 return (-1);
805 }
807 /* try to connect to server */
808 if (connect(pdb->conn.sock, (struct sockaddr *)&pdb->conn.addr,
809 pdb->conn.addr_size) < 0) {
810 sstrerror(errno, errbuff, sizeof(errbuff));
811 OVS_ERROR("connect(): %s", errbuff);
812 close(pdb->conn.sock);
813 return (-1);
814 }
816 /* send notification to event thread */
817 ovs_db_event_post(pdb, OVS_DB_EVENT_CONNECTED);
818 return (0);
819 }
821 /* POLL worker thread.
822 * It listens on OVS DB connection for incoming
823 * requests/reply/events etc. Also, it reconnects to OVS DB
824 * if connection has been lost.
825 */
826 static void *
827 ovs_poll_worker(void *arg)
828 {
829 ovs_db_t *pdb = (ovs_db_t *)arg; /* pointer to OVS DB */
830 ovs_json_reader_t *jreader = NULL;
831 const char *json;
832 size_t json_len;
833 ssize_t nbytes = 0;
834 char buff[OVS_DB_POLL_READ_BLOCK_SIZE];
835 struct pollfd poll_fd;
836 int poll_ret = 0;
838 if ((jreader = ovs_json_reader_alloc()) == NULL) {
839 OVS_ERROR("initialize json reader failed");
840 goto thread_exit;
841 }
843 /* start polling data */
844 poll_fd.fd = pdb->conn.sock;
845 poll_fd.events = POLLIN | POLLPRI;
846 poll_fd.revents = 0;
848 /* poll data */
849 while (ovs_db_poll_is_running(pdb)) {
850 poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000);
851 if (poll_ret > 0) {
852 if (poll_fd.revents & POLLNVAL) {
853 /* invalid file descriptor, reconnect */
854 if (ovs_db_reconnect(pdb) != 0) {
855 /* sleep awhile until next reconnect */
856 usleep(OVS_DB_RECONNECT_TIMEOUT * 1000000);
857 }
858 ovs_json_reader_reset(jreader);
859 poll_fd.fd = pdb->conn.sock;
860 } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) {
861 /* connection is broken */
862 OVS_ERROR("poll() peer closed its end of the channel");
863 close(poll_fd.fd);
864 } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
865 /* read incoming data */
866 nbytes = recv(poll_fd.fd, buff, OVS_DB_POLL_READ_BLOCK_SIZE, 0);
867 if (nbytes > 0) {
868 OVS_DEBUG("recv(): received %d bytes of data", (int)nbytes);
869 ovs_json_reader_push_data(jreader, buff, nbytes);
870 while (!ovs_json_reader_pop(jreader, &json, &json_len))
871 /* process JSON data */
872 ovs_db_json_data_process(pdb, json, json_len);
873 } else if (nbytes == 0) {
874 OVS_ERROR("recv() peer has performed an orderly shutdown");
875 close(poll_fd.fd);
876 } else {
877 OVS_ERROR("recv() receive data error");
878 break;
879 }
880 } /* poll() POLLIN & POLLPRI */
881 } else if (poll_ret == 0)
882 OVS_DEBUG("poll() timeout");
883 else {
884 OVS_ERROR("poll() error");
885 break;
886 }
887 }
889 thread_exit:
890 OVS_DEBUG("poll thread has been completed");
891 ovs_json_reader_free(jreader);
892 pthread_exit((void *)0);
893 return ((void *)0);
894 }
896 /* EVENT worker thread.
897 * Perform task based on incoming events. This
898 * task can be done asynchronously which allows to
899 * handle OVD DB callback like 'init_cb'.
900 */
901 static void *
902 ovs_event_worker(void *arg)
903 {
904 int ret = 0;
905 ovs_db_t *pdb = (ovs_db_t *)arg;
906 struct timespec ts;
908 while (pdb->event_thread.value != OVS_DB_EVENT_TERMINATE) {
909 /* wait for an event */
910 clock_gettime(CLOCK_REALTIME, &ts);
911 ts.tv_sec += (OVS_DB_EVENT_TIMEOUT);
912 ret = pthread_cond_timedwait(&pdb->event_thread.cond,
913 &pdb->event_thread.mutex, &ts);
914 if (!ret) {
915 /* handle the event */
916 OVS_DEBUG("handle event %d", pdb->event_thread.value);
917 if (pdb->event_thread.value == OVS_DB_EVENT_CONNECTED)
918 if (pdb->init_cb)
919 pdb->init_cb(pdb);
920 } else if (ret == ETIMEDOUT) {
921 /* wait timeout */
922 OVS_DEBUG("no event received (timeout)");
923 continue;
924 } else {
925 /* unexpected error */
926 OVS_ERROR("pthread_cond_timedwait() failed");
927 break;
928 }
929 }
931 thread_exit:
932 OVS_DEBUG("event thread has been completed");
933 pthread_exit((void *)0);
934 return ((void *)0);
935 }
937 /* Stop EVENT thread */
938 static int
939 ovs_db_event_thread_stop(ovs_db_t *pdb)
940 {
941 ovs_db_event_post(pdb, OVS_DB_EVENT_TERMINATE);
942 if (pthread_join(pdb->event_thread.tid, NULL) != 0)
943 return (-1);
944 pthread_mutex_unlock(&pdb->event_thread.mutex);
945 pthread_mutex_destroy(&pdb->event_thread.mutex);
946 return (0);
947 }
949 /* Stop POLL thread */
950 static int
951 ovs_db_poll_thread_stop(ovs_db_t *pdb)
952 {
953 ovs_db_poll_terminate(pdb);
954 if (pthread_join(pdb->poll_thread.tid, NULL) != 0)
955 return (-1);
956 pthread_mutex_destroy(&pdb->poll_thread.mutex);
957 return (0);
958 }
960 /*
961 * Public OVS DB API implementation
962 */
964 ovs_db_t *
965 ovs_db_init(const char *surl, ovs_db_callback_t *cb)
966 {
967 pthread_mutexattr_t mutex_attr;
968 ovs_db_t *pdb = NULL;
970 /* allocate db data & fill it */
971 if ((pdb = calloc(1, sizeof(*pdb))) == NULL)
972 return (NULL);
974 /* convert string url to socket addr */
975 if (ovs_db_url_parse(surl, &pdb->conn) < 0)
976 goto failure;
978 /* setup OVS DB callbacks */
979 if (cb)
980 pdb->init_cb = cb->init_cb;
982 /* prepare event thread */
983 pthread_cond_init(&pdb->event_thread.cond, NULL);
984 pthread_mutex_init(&pdb->event_thread.mutex, NULL);
985 pthread_mutex_lock(&pdb->event_thread.mutex);
986 if (plugin_thread_create(&pdb->event_thread.tid, NULL,
987 ovs_event_worker, pdb) != 0) {
988 OVS_ERROR("event worker start failed");
989 goto failure;
990 }
992 /* prepare polling thread */
993 ovs_db_reconnect(pdb);
994 pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING;
995 pthread_mutex_init(&pdb->poll_thread.mutex, NULL);
996 if (plugin_thread_create(&pdb->poll_thread.tid, NULL,
997 ovs_poll_worker, pdb) != 0) {
998 OVS_ERROR("pull worker start failed");
999 goto failure;
1000 }
1002 /* init OVS DB mutex */
1003 if (pthread_mutexattr_init(&mutex_attr) ||
1004 pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE) ||
1005 pthread_mutex_init(&pdb->mutex, &mutex_attr)) {
1006 OVS_ERROR("OVS DB mutex init failed");
1007 goto failure;
1008 }
1010 /* return db to the caller */
1011 return pdb;
1013 failure:
1014 if (pdb->conn.sock)
1015 /* close connection */
1016 close(pdb->conn.sock);
1017 if (pdb->event_thread.tid != 0)
1018 /* stop event thread */
1019 if (ovs_db_event_thread_stop(pdb) < 0)
1020 OVS_ERROR("stop event thread failed");
1021 if (pdb->poll_thread.tid != 0)
1022 /* stop poll thread */
1023 if (ovs_db_poll_thread_stop(pdb) < 0)
1024 OVS_ERROR("stop poll thread failed");
1025 sfree(pdb);
1026 return NULL;
1027 }
1029 int
1030 ovs_db_send_request(ovs_db_t *pdb, const char *method,
1031 const char *params, ovs_db_result_cb_t cb)
1032 {
1033 int ret = 0;
1034 yajl_gen_status yajl_gen_ret;
1035 yajl_val jparams;
1036 yajl_gen jgen;
1037 ovs_callback_t *new_cb = NULL;
1038 uint64_t uid;
1039 char uid_buff[OVS_UID_STR_SIZE];
1040 const char *req = NULL;
1041 size_t req_len = 0;
1042 struct timespec ts;
1044 /* sanity check */
1045 if (!pdb || !method || !params)
1046 return (-1);
1048 if ((jgen = yajl_gen_alloc(NULL)) == NULL)
1049 return (-1);
1051 /* try to parse params */
1052 if ((jparams = yajl_tree_parse(params, NULL, 0)) == NULL) {
1053 OVS_ERROR("params is not a JSON string");
1054 yajl_gen_clear(jgen);
1055 return (-1);
1056 }
1058 /* generate method field */
1059 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1061 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "method");
1062 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, method);
1064 /* generate params field */
1065 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "params");
1066 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
1067 yajl_tree_free(jparams);
1069 /* generate id field */
1070 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
1071 uid = ovs_uid_generate();
1072 ssnprintf(uid_buff, sizeof(uid_buff), "%" PRIX64, uid);
1073 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_buff);
1075 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1077 if (cb) {
1078 /* register result callback */
1079 if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL)
1080 goto yajl_gen_failure;
1082 /* add new callback to front */
1083 sem_init(&new_cb->result.sync, 0, 0);
1084 new_cb->result.call = cb;
1085 new_cb->uid = uid;
1086 ovs_db_callback_add(pdb, new_cb);
1087 }
1089 /* send the request */
1090 OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&req,
1091 &req_len);
1092 OVS_DEBUG("%s", req);
1093 if (!ovs_db_data_send(pdb, req, req_len)) {
1094 if (cb) {
1095 /* wait for result */
1096 clock_gettime(CLOCK_REALTIME, &ts);
1097 ts.tv_sec += OVS_DB_SEND_REQ_TIMEOUT;
1098 if (sem_timedwait(&new_cb->result.sync, &ts) < 0) {
1099 OVS_ERROR("%s() no replay received within %d sec", __FUNCTION__,
1100 OVS_DB_SEND_REQ_TIMEOUT);
1101 ret = (-1);
1102 }
1103 }
1104 } else {
1105 OVS_ERROR("ovs_db_data_send() failed");
1106 ret = (-1);
1107 }
1109 yajl_gen_failure:
1110 if (new_cb) {
1111 /* destroy callback */
1112 sem_destroy(&new_cb->result.sync);
1113 ovs_db_callback_remove(pdb, new_cb);
1114 }
1116 /* release memory */
1117 yajl_gen_clear(jgen);
1118 return (yajl_gen_ret != yajl_gen_status_ok) ? (-1) : ret;
1119 }
1121 int
1122 ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name,
1123 const char **tb_column, ovs_db_table_cb_t update_cb,
1124 ovs_db_result_cb_t result_cb, unsigned int flags)
1125 {
1126 yajl_gen jgen;
1127 yajl_gen_status yajl_gen_ret;
1128 ovs_callback_t *new_cb = NULL;
1129 char uid_str[OVS_UID_STR_SIZE];
1130 char *params;
1131 size_t params_len;
1132 int ovs_db_ret = 0;
1134 /* sanity check */
1135 if (pdb == NULL || tb_name == NULL || update_cb == NULL)
1136 return (-1);
1138 if ((jgen = yajl_gen_alloc(NULL)) == NULL)
1139 return (-1);
1141 /* register table update callback */
1142 if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL)
1143 return (-1);
1145 /* add new callback to front */
1146 new_cb->table.call = update_cb;
1147 new_cb->uid = ovs_uid_generate();
1148 ovs_db_callback_add(pdb, new_cb);
1150 /* make update notification request
1151 * [<db-name>, <json-value>, <monitor-requests>] */
1152 OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1153 {
1154 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, OVS_DB_DEFAULT_DB_NAME);
1156 /* uid string <json-value> */
1157 ssnprintf(uid_str, sizeof(uid_str), "%" PRIX64, new_cb->uid);
1158 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_str);
1160 /* <monitor-requests> */
1161 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1162 {
1163 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, tb_name);
1164 OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1165 {
1166 /* <monitor-request> */
1167 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1168 {
1169 if (tb_column) {
1170 /* columns within the table to be monitored */
1171 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "columns");
1172 OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1173 for (; *tb_column; tb_column++)
1174 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, *tb_column);
1175 OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1176 }
1177 /* specify select option */
1178 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "select");
1179 {
1180 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1181 {
1182 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "initial");
1183 OVS_YAJL_CALL(yajl_gen_bool, jgen,
1184 flags & OVS_DB_TABLE_CB_FLAG_INITIAL);
1185 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "insert");
1186 OVS_YAJL_CALL(yajl_gen_bool, jgen,
1187 flags & OVS_DB_TABLE_CB_FLAG_INSERT);
1188 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "delete");
1189 OVS_YAJL_CALL(yajl_gen_bool, jgen,
1190 flags & OVS_DB_TABLE_CB_FLAG_DELETE);
1191 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "modify");
1192 OVS_YAJL_CALL(yajl_gen_bool, jgen,
1193 flags & OVS_DB_TABLE_CB_FLAG_MODIFY);
1194 }
1195 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1196 }
1197 }
1198 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1199 }
1200 OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1201 }
1202 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1203 }
1204 OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1206 /* make a request to subscribe to given table */
1207 OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)¶ms,
1208 ¶ms_len);
1209 if (ovs_db_send_request(pdb, "monitor", params, result_cb) < 0) {
1210 OVS_ERROR("Failed to subscribe to \"%s\" table", tb_name);
1211 ovs_db_ret = (-1);
1212 }
1214 yajl_gen_failure:
1215 /* release memory */
1216 yajl_gen_clear(jgen);
1217 return ovs_db_ret;
1218 }
1220 int
1221 ovs_db_destroy(ovs_db_t *pdb)
1222 {
1223 int ovs_db_ret = 0;
1224 int ret = 0;
1226 /* sanity check */
1227 if (pdb == NULL)
1228 return (-1);
1230 /* try to lock the structure before releasing */
1231 if (ret = pthread_mutex_lock(&pdb->mutex)) {
1232 OVS_ERROR("pthread_mutex_lock() DB mutext lock failed (%d)", ret);
1233 return (-1);
1234 }
1236 /* stop poll thread */
1237 if (ovs_db_event_thread_stop(pdb) < 0) {
1238 OVS_ERROR("stop poll thread failed");
1239 ovs_db_ret = (-1);
1240 }
1242 /* stop event thread */
1243 if (ovs_db_poll_thread_stop(pdb) < 0) {
1244 OVS_ERROR("stop event thread failed");
1245 ovs_db_ret = (-1);
1246 }
1248 /* unsubscribe callbacks */
1249 ovs_db_callback_remove_all(pdb);
1251 /* close connection */
1252 if (pdb->conn.sock)
1253 close(pdb->conn.sock);
1255 /* release DB handler */
1256 pthread_mutex_unlock(&pdb->mutex);
1257 pthread_mutex_destroy(&pdb->mutex);
1258 sfree(pdb);
1259 return ovs_db_ret;
1260 }
1262 /*
1263 * Public OVS utils API implementation
1264 */
1266 /* Get YAJL value by key from YAJL dictionary */
1267 yajl_val
1268 ovs_utils_get_value_by_key(yajl_val jval, const char *key)
1269 {
1270 const char *obj_key = NULL;
1272 /* check params */
1273 if (!YAJL_IS_OBJECT(jval) || !key)
1274 return NULL;
1276 /* find a value by key */
1277 for (int i = 0; i < YAJL_GET_OBJECT(jval)->len; i++) {
1278 obj_key = YAJL_GET_OBJECT(jval)->keys[i];
1279 if (strcmp(obj_key, key) == 0)
1280 return YAJL_GET_OBJECT(jval)->values[i];
1281 }
1283 return NULL;
1284 }