From 1f8f016bd1ab08f33c16b4589b0371c81d66b122 Mon Sep 17 00:00:00 2001 From: Jim Radford Date: Wed, 7 Aug 2013 13:05:02 -0700 Subject: [PATCH] curl_json plugin: support getting json from a Unix socket as well as a URL --- src/curl_json.c | 85 ++++++++++++++++++++++++++++++++++++------------- 1 file changed, 63 insertions(+), 22 deletions(-) diff --git a/src/curl_json.c b/src/curl_json.c index a07846d1..de66862c 100644 --- a/src/curl_json.c +++ b/src/curl_json.c @@ -28,7 +28,12 @@ #include "utils_avltree.h" #include "utils_complain.h" +#include +#include +#include + #include + #include #if HAVE_YAJL_YAJL_VERSION_H # include @@ -60,6 +65,8 @@ struct cj_s /* {{{ */ char *instance; char *host; + char *sock; + char *url; char *user; char *pass; @@ -259,7 +266,7 @@ static int cj_cb_start (void *ctx) cj_t *db = (cj_t *)ctx; if (++db->depth >= YAJL_MAX_DEPTH) { - ERROR ("curl_json plugin: %s depth exceeds max, aborting.", db->url); + ERROR ("curl_json plugin: %s depth exceeds max, aborting.", db->url ? db->url : db->sock); return (CJ_CB_ABORT); } return (CJ_CB_CONTINUE); @@ -597,20 +604,20 @@ static int cj_config_add_url (oconfig_item_t *ci) /* {{{ */ memset (db, 0, sizeof (*db)); if (strcasecmp ("URL", ci->key) == 0) - { status = cf_util_get_string (ci, &db->url); - if (status != 0) - { - sfree (db); - return (status); - } - } + else if (strcasecmp ("Sock", ci->key) == 0) + status = cf_util_get_string (ci, &db->sock); else { ERROR ("curl_json plugin: cj_config: " "Invalid key: %s", ci->key); return (-1); } + if (status != 0) + { + sfree (db); + return (status); + } /* Fill the `cj_t' structure.. */ for (i = 0; i < ci->children_num; i++) @@ -621,19 +628,19 @@ static int cj_config_add_url (oconfig_item_t *ci) /* {{{ */ status = cf_util_get_string (child, &db->instance); else if (strcasecmp ("Host", child->key) == 0) status = cf_util_get_string (child, &db->host); - else if (strcasecmp ("User", child->key) == 0) + else if (db->url && strcasecmp ("User", child->key) == 0) status = cf_util_get_string (child, &db->user); - else if (strcasecmp ("Password", child->key) == 0) + else if (db->url && strcasecmp ("Password", child->key) == 0) status = cf_util_get_string (child, &db->pass); - else if (strcasecmp ("VerifyPeer", child->key) == 0) + else if (db->url && strcasecmp ("VerifyPeer", child->key) == 0) status = cf_util_get_boolean (child, &db->verify_peer); - else if (strcasecmp ("VerifyHost", child->key) == 0) + else if (db->url && strcasecmp ("VerifyHost", child->key) == 0) status = cf_util_get_boolean (child, &db->verify_host); - else if (strcasecmp ("CACert", child->key) == 0) + else if (db->url && strcasecmp ("CACert", child->key) == 0) status = cf_util_get_string (child, &db->cacert); - else if (strcasecmp ("Header", child->key) == 0) + else if (db->url && strcasecmp ("Header", child->key) == 0) status = cj_config_append_string ("Header", &db->headers, child); - else if (strcasecmp ("Post", child->key) == 0) + else if (db->url && strcasecmp ("Post", child->key) == 0) status = cf_util_get_string (child, &db->post_body); else if (strcasecmp ("Key", child->key) == 0) status = cj_config_add_key (db, child); @@ -652,10 +659,10 @@ static int cj_config_add_url (oconfig_item_t *ci) /* {{{ */ if (db->tree == NULL) { WARNING ("curl_json plugin: No (valid) `Key' block " - "within `URL' block `%s'.", db->url); + "within `%s' block `%s'.", db->url ? "URL" : "Sock", db->url ? db->url : db->sock); status = -1; } - if (status == 0) + if (status == 0 && db->url) status = cj_init_curl (db); } @@ -676,7 +683,7 @@ static int cj_config_add_url (oconfig_item_t *ci) /* {{{ */ ud.free_func = cj_free; ssnprintf (cb_name, sizeof (cb_name), "curl_json-%s-%s", - db->instance, db->url); + db->instance, db->url ? db->url : db->sock); plugin_register_complex_read (/* group = */ NULL, cb_name, cj_read, /* interval = */ NULL, &ud); @@ -705,7 +712,7 @@ static int cj_config (oconfig_item_t *ci) /* {{{ */ { oconfig_item_t *child = ci->children + i; - if (strcasecmp ("URL", child->key) == 0) + if (strcasecmp ("Sock", child->key) == 0 || strcasecmp ("URL", child->key) == 0) { status = cj_config_add_url (child); if (status == 0) @@ -765,7 +772,42 @@ static void cj_submit (cj_t *db, cj_key_t *key, value_t *value) /* {{{ */ plugin_dispatch_values (&vl); } /* }}} int cj_submit */ -static int cj_curl_perform (cj_t *db) /* {{{ */ +static int cj_sock_perform (cj_t *db) /* {{{ */ +{ + struct sockaddr_un sa_unix = {}; + sa_unix.sun_family = AF_UNIX; + sstrncpy (sa_unix.sun_path, db->sock, sizeof (sa_unix.sun_path)); + + int fd = socket (AF_UNIX, SOCK_STREAM, 0); + if (fd < 0) + return (-1); + if (connect (fd, (struct sockaddr *)&sa_unix, sizeof(sa_unix)) < 0) + { + ERROR ("curl_json plugin: connect(%s) failed: %s", + (db->sock != NULL) ? db->sock : "", strerror(errno)); + close (fd); + return (-1); + } + + ssize_t red; + do { + unsigned char buffer[4096]; + red = read (fd, buffer, sizeof(buffer)); + if (red < 0) { + ERROR ("curl_json plugin: read(%s) failed: %s", + (db->sock != NULL) ? db->sock : "", strerror(errno)); + close (fd); + return (-1); + } + if (!cj_curl_callback (buffer, red, 1, db)) + break; + } while (red > 0); + close (fd); + return (0); +} /* }}} int cj_sock_perform */ + + +static int cj_curl_perform(cj_t *db) /* {{{ */ { int status; long rc; @@ -790,7 +832,6 @@ static int cj_curl_perform (cj_t *db) /* {{{ */ "response code %ld (%s)", rc, url); return (-1); } - return (0); } /* }}} int cj_curl_perform */ @@ -813,7 +854,7 @@ static int cj_perform (cj_t *db) /* {{{ */ return (-1); } - if (cj_curl_perform(db) < 0) + if (db->url ? cj_curl_perform (db) < 0 : cj_sock_perform (db) < 0) { yajl_free (db->yajl); db->yajl = yprev; -- 2.30.2