1 /**
2 * collectd - src/write_http.c
3 * Copyright (C) 2009 Paul Sadauskas
4 * Copyright (C) 2009 Doug MacEachern
5 * Copyright (C) 2007-2009 Florian octo Forster
6 *
7 * This program is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; only version 2 of the License is applicable.
10 *
11 * This program is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program; if not, write to the Free Software Foundation, Inc.,
18 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 *
20 * Authors:
21 * Florian octo Forster <octo at verplant.org>
22 * Doug MacEachern <dougm@hyperic.com>
23 * Paul Sadauskas <psadauskas@gmail.com>
24 **/
26 #include "collectd.h"
27 #include "plugin.h"
28 #include "common.h"
29 #include "utils_cache.h"
30 #include "utils_parse_option.h"
31 #include "utils_format_json.h"
33 #if HAVE_PTHREAD_H
34 # include <pthread.h>
35 #endif
37 #include <curl/curl.h>
39 /*
40 * Private variables
41 */
42 struct wh_callback_s
43 {
44 char *location;
46 char *user;
47 char *pass;
48 char *credentials;
49 _Bool verify_peer;
50 _Bool verify_host;
51 char *cacert;
52 char *capath;
53 char *clientkey;
54 char *clientcert;
55 char *clientkeypass;
56 long sslversion;
57 _Bool store_rates;
59 #define WH_FORMAT_COMMAND 0
60 #define WH_FORMAT_JSON 1
61 int format;
63 CURL *curl;
64 char curl_errbuf[CURL_ERROR_SIZE];
66 char send_buffer[4096];
67 size_t send_buffer_free;
68 size_t send_buffer_fill;
69 cdtime_t send_buffer_init_time;
71 pthread_mutex_t send_lock;
72 };
73 typedef struct wh_callback_s wh_callback_t;
75 static void wh_reset_buffer (wh_callback_t *cb) /* {{{ */
76 {
77 memset (cb->send_buffer, 0, sizeof (cb->send_buffer));
78 cb->send_buffer_free = sizeof (cb->send_buffer);
79 cb->send_buffer_fill = 0;
80 cb->send_buffer_init_time = cdtime ();
82 if (cb->format == WH_FORMAT_JSON)
83 {
84 format_json_initialize (cb->send_buffer,
85 &cb->send_buffer_fill,
86 &cb->send_buffer_free);
87 }
88 } /* }}} wh_reset_buffer */
90 static int wh_send_buffer (wh_callback_t *cb) /* {{{ */
91 {
92 int status = 0;
94 curl_easy_setopt (cb->curl, CURLOPT_POSTFIELDS, cb->send_buffer);
95 status = curl_easy_perform (cb->curl);
96 if (status != CURLE_OK)
97 {
98 ERROR ("write_http plugin: curl_easy_perform failed with "
99 "status %i: %s",
100 status, cb->curl_errbuf);
101 }
102 return (status);
103 } /* }}} wh_send_buffer */
105 static int wh_callback_init (wh_callback_t *cb) /* {{{ */
106 {
107 struct curl_slist *headers;
109 if (cb->curl != NULL)
110 return (0);
112 cb->curl = curl_easy_init ();
113 if (cb->curl == NULL)
114 {
115 ERROR ("curl plugin: curl_easy_init failed.");
116 return (-1);
117 }
119 curl_easy_setopt (cb->curl, CURLOPT_NOSIGNAL, 1L);
120 curl_easy_setopt (cb->curl, CURLOPT_USERAGENT, COLLECTD_USERAGENT);
122 headers = NULL;
123 headers = curl_slist_append (headers, "Accept: */*");
124 if (cb->format == WH_FORMAT_JSON)
125 headers = curl_slist_append (headers, "Content-Type: application/json");
126 else
127 headers = curl_slist_append (headers, "Content-Type: text/plain");
128 headers = curl_slist_append (headers, "Expect:");
129 curl_easy_setopt (cb->curl, CURLOPT_HTTPHEADER, headers);
131 curl_easy_setopt (cb->curl, CURLOPT_ERRORBUFFER, cb->curl_errbuf);
132 curl_easy_setopt (cb->curl, CURLOPT_URL, cb->location);
134 if (cb->user != NULL)
135 {
136 size_t credentials_size;
138 credentials_size = strlen (cb->user) + 2;
139 if (cb->pass != NULL)
140 credentials_size += strlen (cb->pass);
142 cb->credentials = (char *) malloc (credentials_size);
143 if (cb->credentials == NULL)
144 {
145 ERROR ("curl plugin: malloc failed.");
146 return (-1);
147 }
149 ssnprintf (cb->credentials, credentials_size, "%s:%s",
150 cb->user, (cb->pass == NULL) ? "" : cb->pass);
151 curl_easy_setopt (cb->curl, CURLOPT_USERPWD, cb->credentials);
152 curl_easy_setopt (cb->curl, CURLOPT_HTTPAUTH, CURLAUTH_ANY);
153 }
155 curl_easy_setopt (cb->curl, CURLOPT_SSL_VERIFYPEER, (long) cb->verify_peer);
156 curl_easy_setopt (cb->curl, CURLOPT_SSL_VERIFYHOST,
157 cb->verify_host ? 2L : 0L);
158 curl_easy_setopt (cb->curl, CURLOPT_SSLVERSION, cb->sslversion);
159 if (cb->cacert != NULL)
160 curl_easy_setopt (cb->curl, CURLOPT_CAINFO, cb->cacert);
161 if (cb->capath != NULL)
162 curl_easy_setopt (cb->curl, CURLOPT_CAPATH, cb->capath);
164 if (cb->clientkey != NULL && cb->clientcert != NULL)
165 {
166 curl_easy_setopt (cb->curl, CURLOPT_SSLKEY, cb->clientkey);
167 curl_easy_setopt (cb->curl, CURLOPT_SSLCERT, cb->clientcert);
169 if (cb->clientkeypass != NULL)
170 curl_easy_setopt (cb->curl, CURLOPT_SSLKEYPASSWD, cb->clientkeypass);
171 }
173 wh_reset_buffer (cb);
175 return (0);
176 } /* }}} int wh_callback_init */
178 static int wh_flush_nolock (cdtime_t timeout, wh_callback_t *cb) /* {{{ */
179 {
180 int status;
182 DEBUG ("write_http plugin: wh_flush_nolock: timeout = %.3f; "
183 "send_buffer_fill = %zu;",
184 CDTIME_T_TO_DOUBLE (timeout),
185 cb->send_buffer_fill);
187 /* timeout == 0 => flush unconditionally */
188 if (timeout > 0)
189 {
190 cdtime_t now;
192 now = cdtime ();
193 if ((cb->send_buffer_init_time + timeout) > now)
194 return (0);
195 }
197 if (cb->format == WH_FORMAT_COMMAND)
198 {
199 if (cb->send_buffer_fill <= 0)
200 {
201 cb->send_buffer_init_time = cdtime ();
202 return (0);
203 }
205 status = wh_send_buffer (cb);
206 wh_reset_buffer (cb);
207 }
208 else if (cb->format == WH_FORMAT_JSON)
209 {
210 if (cb->send_buffer_fill <= 2)
211 {
212 cb->send_buffer_init_time = cdtime ();
213 return (0);
214 }
216 status = format_json_finalize (cb->send_buffer,
217 &cb->send_buffer_fill,
218 &cb->send_buffer_free);
219 if (status != 0)
220 {
221 ERROR ("write_http: wh_flush_nolock: "
222 "format_json_finalize failed.");
223 wh_reset_buffer (cb);
224 return (status);
225 }
227 status = wh_send_buffer (cb);
228 wh_reset_buffer (cb);
229 }
230 else
231 {
232 ERROR ("write_http: wh_flush_nolock: "
233 "Unknown format: %i",
234 cb->format);
235 return (-1);
236 }
238 return (status);
239 } /* }}} wh_flush_nolock */
241 static int wh_flush (cdtime_t timeout, /* {{{ */
242 const char *identifier __attribute__((unused)),
243 user_data_t *user_data)
244 {
245 wh_callback_t *cb;
246 int status;
248 if (user_data == NULL)
249 return (-EINVAL);
251 cb = user_data->data;
253 pthread_mutex_lock (&cb->send_lock);
255 if (cb->curl == NULL)
256 {
257 status = wh_callback_init (cb);
258 if (status != 0)
259 {
260 ERROR ("write_http plugin: wh_callback_init failed.");
261 pthread_mutex_unlock (&cb->send_lock);
262 return (-1);
263 }
264 }
266 status = wh_flush_nolock (timeout, cb);
267 pthread_mutex_unlock (&cb->send_lock);
269 return (status);
270 } /* }}} int wh_flush */
272 static void wh_callback_free (void *data) /* {{{ */
273 {
274 wh_callback_t *cb;
276 if (data == NULL)
277 return;
279 cb = data;
281 wh_flush_nolock (/* timeout = */ 0, cb);
283 curl_easy_cleanup (cb->curl);
284 sfree (cb->location);
285 sfree (cb->user);
286 sfree (cb->pass);
287 sfree (cb->credentials);
288 sfree (cb->cacert);
289 sfree (cb->capath);
290 sfree (cb->clientkey);
291 sfree (cb->clientcert);
292 sfree (cb->clientkeypass);
294 sfree (cb);
295 } /* }}} void wh_callback_free */
297 static int wh_write_command (const data_set_t *ds, const value_list_t *vl, /* {{{ */
298 wh_callback_t *cb)
299 {
300 char key[10*DATA_MAX_NAME_LEN];
301 char values[512];
302 char command[1024];
303 size_t command_len;
305 int status;
307 if (0 != strcmp (ds->type, vl->type)) {
308 ERROR ("write_http plugin: DS type does not match "
309 "value list type");
310 return -1;
311 }
313 /* Copy the identifier to `key' and escape it. */
314 status = FORMAT_VL (key, sizeof (key), vl);
315 if (status != 0) {
316 ERROR ("write_http plugin: error with format_name");
317 return (status);
318 }
319 escape_string (key, sizeof (key));
321 /* Convert the values to an ASCII representation and put that into
322 * `values'. */
323 status = format_values (values, sizeof (values), ds, vl, cb->store_rates);
324 if (status != 0) {
325 ERROR ("write_http plugin: error with "
326 "wh_value_list_to_string");
327 return (status);
328 }
330 command_len = (size_t) ssnprintf (command, sizeof (command),
331 "PUTVAL %s interval=%.3f %s\r\n",
332 key,
333 CDTIME_T_TO_DOUBLE (vl->interval),
334 values);
335 if (command_len >= sizeof (command)) {
336 ERROR ("write_http plugin: Command buffer too small: "
337 "Need %zu bytes.", command_len + 1);
338 return (-1);
339 }
341 pthread_mutex_lock (&cb->send_lock);
343 if (cb->curl == NULL)
344 {
345 status = wh_callback_init (cb);
346 if (status != 0)
347 {
348 ERROR ("write_http plugin: wh_callback_init failed.");
349 pthread_mutex_unlock (&cb->send_lock);
350 return (-1);
351 }
352 }
354 if (command_len >= cb->send_buffer_free)
355 {
356 status = wh_flush_nolock (/* timeout = */ 0, cb);
357 if (status != 0)
358 {
359 pthread_mutex_unlock (&cb->send_lock);
360 return (status);
361 }
362 }
363 assert (command_len < cb->send_buffer_free);
365 /* `command_len + 1' because `command_len' does not include the
366 * trailing null byte. Neither does `send_buffer_fill'. */
367 memcpy (cb->send_buffer + cb->send_buffer_fill,
368 command, command_len + 1);
369 cb->send_buffer_fill += command_len;
370 cb->send_buffer_free -= command_len;
372 DEBUG ("write_http plugin: <%s> buffer %zu/%zu (%g%%) \"%s\"",
373 cb->location,
374 cb->send_buffer_fill, sizeof (cb->send_buffer),
375 100.0 * ((double) cb->send_buffer_fill) / ((double) sizeof (cb->send_buffer)),
376 command);
378 /* Check if we have enough space for this command. */
379 pthread_mutex_unlock (&cb->send_lock);
381 return (0);
382 } /* }}} int wh_write_command */
384 static int wh_write_json (const data_set_t *ds, const value_list_t *vl, /* {{{ */
385 wh_callback_t *cb)
386 {
387 int status;
389 pthread_mutex_lock (&cb->send_lock);
391 if (cb->curl == NULL)
392 {
393 status = wh_callback_init (cb);
394 if (status != 0)
395 {
396 ERROR ("write_http plugin: wh_callback_init failed.");
397 pthread_mutex_unlock (&cb->send_lock);
398 return (-1);
399 }
400 }
402 status = format_json_value_list (cb->send_buffer,
403 &cb->send_buffer_fill,
404 &cb->send_buffer_free,
405 ds, vl, cb->store_rates);
406 if (status == (-ENOMEM))
407 {
408 status = wh_flush_nolock (/* timeout = */ 0, cb);
409 if (status != 0)
410 {
411 wh_reset_buffer (cb);
412 pthread_mutex_unlock (&cb->send_lock);
413 return (status);
414 }
416 status = format_json_value_list (cb->send_buffer,
417 &cb->send_buffer_fill,
418 &cb->send_buffer_free,
419 ds, vl, cb->store_rates);
420 }
421 if (status != 0)
422 {
423 pthread_mutex_unlock (&cb->send_lock);
424 return (status);
425 }
427 DEBUG ("write_http plugin: <%s> buffer %zu/%zu (%g%%)",
428 cb->location,
429 cb->send_buffer_fill, sizeof (cb->send_buffer),
430 100.0 * ((double) cb->send_buffer_fill) / ((double) sizeof (cb->send_buffer)));
432 /* Check if we have enough space for this command. */
433 pthread_mutex_unlock (&cb->send_lock);
435 return (0);
436 } /* }}} int wh_write_json */
438 static int wh_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
439 user_data_t *user_data)
440 {
441 wh_callback_t *cb;
442 int status;
444 if (user_data == NULL)
445 return (-EINVAL);
447 cb = user_data->data;
449 if (cb->format == WH_FORMAT_JSON)
450 status = wh_write_json (ds, vl, cb);
451 else
452 status = wh_write_command (ds, vl, cb);
454 return (status);
455 } /* }}} int wh_write */
457 static int config_set_format (wh_callback_t *cb, /* {{{ */
458 oconfig_item_t *ci)
459 {
460 char *string;
462 if ((ci->values_num != 1)
463 || (ci->values[0].type != OCONFIG_TYPE_STRING))
464 {
465 WARNING ("write_http plugin: The `%s' config option "
466 "needs exactly one string argument.", ci->key);
467 return (-1);
468 }
470 string = ci->values[0].value.string;
471 if (strcasecmp ("Command", string) == 0)
472 cb->format = WH_FORMAT_COMMAND;
473 else if (strcasecmp ("JSON", string) == 0)
474 cb->format = WH_FORMAT_JSON;
475 else
476 {
477 ERROR ("write_http plugin: Invalid format string: %s",
478 string);
479 return (-1);
480 }
482 return (0);
483 } /* }}} int config_set_format */
485 static int wh_config_url (oconfig_item_t *ci) /* {{{ */
486 {
487 wh_callback_t *cb;
488 user_data_t user_data;
489 int i;
491 cb = malloc (sizeof (*cb));
492 if (cb == NULL)
493 {
494 ERROR ("write_http plugin: malloc failed.");
495 return (-1);
496 }
497 memset (cb, 0, sizeof (*cb));
498 cb->verify_peer = 1;
499 cb->verify_host = 1;
500 cb->format = WH_FORMAT_COMMAND;
501 cb->sslversion = CURL_SSLVERSION_DEFAULT;
503 pthread_mutex_init (&cb->send_lock, /* attr = */ NULL);
505 cf_util_get_string (ci, &cb->location);
506 if (cb->location == NULL)
507 return (-1);
509 for (i = 0; i < ci->children_num; i++)
510 {
511 oconfig_item_t *child = ci->children + i;
513 if (strcasecmp ("User", child->key) == 0)
514 cf_util_get_string (child, &cb->user);
515 else if (strcasecmp ("Password", child->key) == 0)
516 cf_util_get_string (child, &cb->pass);
517 else if (strcasecmp ("VerifyPeer", child->key) == 0)
518 cf_util_get_boolean (child, &cb->verify_peer);
519 else if (strcasecmp ("VerifyHost", child->key) == 0)
520 cf_util_get_boolean (child, &cb->verify_host);
521 else if (strcasecmp ("CACert", child->key) == 0)
522 cf_util_get_string (child, &cb->cacert);
523 else if (strcasecmp ("CAPath", child->key) == 0)
524 cf_util_get_string (child, &cb->capath);
525 else if (strcasecmp ("ClientKey", child->key) == 0)
526 cf_util_get_string (child, &cb->clientkey);
527 else if (strcasecmp ("ClientCert", child->key) == 0)
528 cf_util_get_string (child, &cb->clientcert);
529 else if (strcasecmp ("ClientKeyPass", child->key) == 0)
530 cf_util_get_string (child, &cb->clientkeypass);
531 else if (strcasecmp ("SSLVersion", child->key) == 0)
532 {
533 char *value = NULL;
535 cf_util_get_string (child, &value);
537 if (value == NULL || strcasecmp ("default", value) == 0)
538 cb->sslversion = CURL_SSLVERSION_DEFAULT;
539 else if (strcasecmp ("SSLv2", value) == 0)
540 cb->sslversion = CURL_SSLVERSION_SSLv2;
541 else if (strcasecmp ("SSLv3", value) == 0)
542 cb->sslversion = CURL_SSLVERSION_SSLv3;
543 else if (strcasecmp ("TLSv1", value) == 0)
544 cb->sslversion = CURL_SSLVERSION_TLSv1;
545 #if (LIBCURL_VERSION_MAJOR > 7) || (LIBCURL_VERSION_MAJOR == 7 && LIBCURL_VERSION_MINOR >= 34)
546 else if (strcasecmp ("TLSv1_0", value) == 0)
547 cb->sslversion = CURL_SSLVERSION_TLSv1_0;
548 else if (strcasecmp ("TLSv1_1", value) == 0)
549 cb->sslversion = CURL_SSLVERSION_TLSv1_1;
550 else if (strcasecmp ("TLSv1_2", value) == 0)
551 cb->sslversion = CURL_SSLVERSION_TLSv1_2;
552 #endif
553 else
554 ERROR ("write_http plugin: Invalid SSLVersion "
555 "option: %s.", value);
557 sfree(value);
558 }
559 else if (strcasecmp ("Format", child->key) == 0)
560 config_set_format (cb, child);
561 else if (strcasecmp ("StoreRates", child->key) == 0)
562 cf_util_get_boolean (child, &cb->store_rates);
563 else
564 {
565 ERROR ("write_http plugin: Invalid configuration "
566 "option: %s.", child->key);
567 }
568 }
570 DEBUG ("write_http: Registering write callback with URL %s",
571 cb->location);
573 memset (&user_data, 0, sizeof (user_data));
574 user_data.data = cb;
575 user_data.free_func = NULL;
576 plugin_register_flush ("write_http", wh_flush, &user_data);
578 user_data.free_func = wh_callback_free;
579 plugin_register_write ("write_http", wh_write, &user_data);
581 return (0);
582 } /* }}} int wh_config_url */
584 static int wh_config (oconfig_item_t *ci) /* {{{ */
585 {
586 int i;
588 for (i = 0; i < ci->children_num; i++)
589 {
590 oconfig_item_t *child = ci->children + i;
592 if (strcasecmp ("URL", child->key) == 0)
593 wh_config_url (child);
594 else
595 {
596 ERROR ("write_http plugin: Invalid configuration "
597 "option: %s.", child->key);
598 }
599 }
601 return (0);
602 } /* }}} int wh_config */
604 void module_register (void) /* {{{ */
605 {
606 plugin_register_complex_config ("write_http", wh_config);
607 } /* }}} void module_register */
609 /* vim: set fdm=marker sw=8 ts=8 tw=78 et : */