Code

9e96198d6951c4eb64f469b4a28e083fe3950a07
[collectd.git] / src / write_graphite.c
1 /**
2  * collectd - src/write_graphite.c
3  * Copyright (C) 2011  Scott Sanders
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Author:
19  *   Scott Sanders <scott@jssjr.com>
20  *
21  *   based on the excellent write_http plugin
22  **/
24  /* write_graphite plugin configuation example
25   *
26   * <Plugin write_graphite>
27   *   <Carbon>
28   *     Host "localhost"
29   *     Port "2003"
30   *     Prefix "collectd"
31   *   </Carbon>
32   * </Plugin>
33   */
35 #include "collectd.h"
36 #include "common.h"
37 #include "plugin.h"
38 #include "configfile.h"
40 #include "utils_cache.h"
41 #include "utils_parse_option.h"
43 /* Folks without pthread will need to disable this plugin. */
44 #include <pthread.h>
46 #include <sys/socket.h>
47 #include <sys/stat.h>
48 #include <sys/types.h>
50 #include <netinet/in.h>
51 #include <netdb.h>
53 #ifndef WG_FORMAT_NAME
54 #define WG_FORMAT_NAME(ret, ret_len, vl, cb, name) \
55         wg_format_name (ret, ret_len, (vl)->host, (vl)->plugin, \
56                          (vl)->plugin_instance, (vl)->type, \
57                          (vl)->type_instance, (cb)->prefix, (cb)->postfix, \
58                          name, (cb)->dotchar)
59 #endif
61 #ifndef WG_DEFAULT_NODE
62 # define WG_DEFAULT_NODE "localhost"
63 #endif
65 #ifndef WG_DEFAULT_SERVICE
66 # define WG_DEFAULT_SERVICE "2003"
67 #endif
69 #ifndef WG_SEND_BUF_SIZE
70 # define WG_SEND_BUF_SIZE 4096
71 #endif
73 /*
74  * Private variables
75  */
76 struct wg_callback
77 {
78     int      sock_fd;
79     struct hostent *server;
81     char    *node;
82     char    *service;
83     char    *prefix;
84     char    *postfix;
85     char     dotchar;
87     char     send_buf[WG_SEND_BUF_SIZE];
88     size_t   send_buf_free;
89     size_t   send_buf_fill;
90     cdtime_t send_buf_init_time;
92     pthread_mutex_t send_lock;
93 };
96 /*
97  * Functions
98  */
99 static void wg_reset_buffer (struct wg_callback *cb)
101     memset (cb->send_buf, 0, sizeof (cb->send_buf));
102     cb->send_buf_free = sizeof (cb->send_buf);
103     cb->send_buf_fill = 0;
104     cb->send_buf_init_time = cdtime ();
107 static int wg_send_buffer (struct wg_callback *cb)
109     int status = 0;
111     status = write (cb->sock_fd, cb->send_buf, strlen (cb->send_buf));
112     if (status < 0)
113     {
114         ERROR ("write_graphite plugin: send failed with "
115                 "status %i (%s)",
116                 status,
117                 strerror (errno));
119         pthread_mutex_trylock (&cb->send_lock);
121         DEBUG ("write_graphite plugin: closing socket and restting fd "
122                 "so reinit will occur");
123         close (cb->sock_fd);
124         cb->sock_fd = -1;
126         pthread_mutex_unlock (&cb->send_lock);
128         return (-1);
129     }
130     return (0);
133 static int wg_flush_nolock (cdtime_t timeout, struct wg_callback *cb)
135     int status;
137     DEBUG ("write_graphite plugin: wg_flush_nolock: timeout = %.3f; "
138             "send_buf_fill = %zu;",
139             (double)timeout,
140             cb->send_buf_fill);
142     /* timeout == 0  => flush unconditionally */
143     if (timeout > 0)
144     {
145         cdtime_t now;
147         now = cdtime ();
148         if ((cb->send_buf_init_time + timeout) > now)
149             return (0);
150     }
152     if (cb->send_buf_fill <= 0)
153     {
154         cb->send_buf_init_time = cdtime ();
155         return (0);
156     }
158     status = wg_send_buffer (cb);
159     wg_reset_buffer (cb);
161     return (status);
164 static int wg_callback_init (struct wg_callback *cb)
166     struct addrinfo ai_hints;
167     struct addrinfo *ai_list;
168     struct addrinfo *ai_ptr;
169     int status;
171     const char *node = cb->node ? cb->node : WG_DEFAULT_NODE;
172     const char *service = cb->service ? cb->service : WG_DEFAULT_SERVICE;
174     if (cb->sock_fd > 0)
175         return (0);
177     memset (&ai_hints, 0, sizeof (ai_hints));
178 #ifdef AI_ADDRCONFIG
179     ai_hints.ai_flags |= AI_ADDRCONFIG;
180 #endif
181     ai_hints.ai_family = AF_UNSPEC;
182     ai_hints.ai_socktype = SOCK_STREAM;
184     ai_list = NULL;
186     status = getaddrinfo (node, service, &ai_hints, &ai_list);
187     if (status != 0)
188     {
189         ERROR ("write_graphite plugin: getaddrinfo (%s, %s) failed: %s",
190                 node, service, gai_strerror (status));
191         return (-1);
192     }
194     assert (ai_list != NULL);
195     for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
196     {
197         cb->sock_fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype,
198                 ai_ptr->ai_protocol);
199         if (cb->sock_fd < 0)
200             continue;
202         status = connect (cb->sock_fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
203         if (status != 0)
204         {
205             close (cb->sock_fd);
206             cb->sock_fd = -1;
207             continue;
208         }
210         break;
211     }
213     freeaddrinfo (ai_list);
215     if (cb->sock_fd < 0)
216     {
217         char errbuf[1024];
218         ERROR ("write_graphite plugin: Connecting to %s:%s failed. "
219                 "The last error was: %s", node, service,
220                 sstrerror (errno, errbuf, sizeof (errbuf)));
221         close (cb->sock_fd);
222         return (-1);
223     }
225     wg_reset_buffer (cb);
227     return (0);
230 static void wg_callback_free (void *data)
232     struct wg_callback *cb;
234     if (data == NULL)
235         return;
237     cb = data;
239     wg_flush_nolock (/* timeout = */ 0, cb);
241     close(cb->sock_fd);
242     sfree(cb->node);
243     sfree(cb->service);
244     sfree(cb->prefix);
245     sfree(cb->postfix);
247     sfree(cb);
250 static int wg_flush (cdtime_t timeout,
251         const char *identifier __attribute__((unused)),
252         user_data_t *user_data)
254     struct wg_callback *cb;
255     int status;
257     if (user_data == NULL)
258         return (-EINVAL);
260     cb = user_data->data;
262     pthread_mutex_lock (&cb->send_lock);
264     if (cb->sock_fd < 0)
265     {
266         status = wg_callback_init (cb);
267         if (status != 0)
268         {
269             ERROR ("write_graphite plugin: wg_callback_init failed.");
270             pthread_mutex_unlock (&cb->send_lock);
271             return (-1);
272         }
273     }
275     status = wg_flush_nolock (timeout, cb);
276     pthread_mutex_unlock (&cb->send_lock);
278     return (status);
281 static int wg_format_values (char *ret, size_t ret_len,
282         int ds_num, const data_set_t *ds, const value_list_t *vl,
283         _Bool store_rates)
285     size_t offset = 0;
286     int status;
287     gauge_t *rates = NULL;
289     assert (0 == strcmp (ds->type, vl->type));
291     memset (ret, 0, ret_len);
293 #define BUFFER_ADD(...) do { \
294     status = ssnprintf (ret + offset, ret_len - offset, \
295             __VA_ARGS__); \
296     if (status < 1) \
297     { \
298         sfree (rates); \
299         return (-1); \
300     } \
301     else if (((size_t) status) >= (ret_len - offset)) \
302     { \
303         sfree (rates); \
304         return (-1); \
305     } \
306     else \
307     offset += ((size_t) status); \
308 } while (0)
310     if (ds->ds[ds_num].type == DS_TYPE_GAUGE)
311         BUFFER_ADD ("%f", vl->values[ds_num].gauge);
312     else if (store_rates)
313     {
314         if (rates == NULL)
315             rates = uc_get_rate (ds, vl);
316         if (rates == NULL)
317         {
318             WARNING ("format_values: "
319                     "uc_get_rate failed.");
320             return (-1);
321         }
322         BUFFER_ADD ("%g", rates[ds_num]);
323     }
324     else if (ds->ds[ds_num].type == DS_TYPE_COUNTER)
325         BUFFER_ADD ("%llu", vl->values[ds_num].counter);
326     else if (ds->ds[ds_num].type == DS_TYPE_DERIVE)
327         BUFFER_ADD ("%"PRIi64, vl->values[ds_num].derive);
328     else if (ds->ds[ds_num].type == DS_TYPE_ABSOLUTE)
329         BUFFER_ADD ("%"PRIu64, vl->values[ds_num].absolute);
330     else
331     {
332         ERROR ("format_values plugin: Unknown data source type: %i",
333                 ds->ds[ds_num].type);
334         sfree (rates);
335         return (-1);
336     }
338 #undef BUFFER_ADD
340     sfree (rates);
341     return (0);
344 static int swap_chars (char *dst, const char *src,
345         const char from, const char to)
347     size_t i;
349     int reps = 0;
351     for (i = 0; i < strlen(src) ; i++)
352     {
353         if (src[i] == from)
354         {
355             dst[i] = to;
356             ++reps;
357         }
358         else
359             dst[i] = src[i];
360     }
361     dst[i] = '\0';
363     return reps;
366 static int wg_format_name (char *ret, int ret_len,
367         const char *hostname,
368         const char *plugin, const char *plugin_instance,
369         const char *type, const char *type_instance,
370         const char *prefix, const char *postfix,
371         const char *ds_name, const char dotchar)
373     int  status;
374     char *n_hostname = NULL;
375     char *n_plugin = NULL;
376     char *n_plugin_instance = NULL;
377     char *n_type = NULL;
378     char *n_type_instance = NULL;
380     assert (plugin != NULL);
381     assert (type != NULL);
383     if ((n_hostname = malloc(strlen(hostname)+1)) == NULL)
384     {
385         ERROR ("Unable to allocate memory for normalized hostname buffer");
386         return (-1);
387     }
389     if ((n_plugin = malloc(strlen(plugin)+1)) == NULL)
390     {
391         ERROR ("Unable to allocate memory for normalized plugin buffer");
392         return (-1);
393     }
395     if ((n_type = malloc(strlen(type)+1)) == NULL)
396     {
397         ERROR ("Unable to allocate memory for normalized type buffer");
398         return (-1);
399     }
401     if (swap_chars(n_hostname, hostname, '.', dotchar) == -1)
402     {
403         ERROR ("Unable to normalize hostname");
404         return (-1);
405     }
407     if (swap_chars(n_plugin, plugin, ' ', '_') == -1)
408     {
409         ERROR ("Unable to normalize plugin");
410         return (-1);
411     }
413     if (swap_chars(n_type, type, ' ', '_') == -1)
414     {
415         ERROR ("Unable to normalize type");
416         return (-1);
417     }
419     if (type_instance != NULL && type_instance[0] != '\0')
420     {
421         if ((n_type_instance = malloc(strlen(type_instance)+1)) == NULL)
422         {
423             ERROR ("Unable to allocate memory for normalized datasource name buffer");
424             return (-1);
425         }
426         if (swap_chars(n_type_instance, type_instance, '.', dotchar) == -1)
427         {
428             ERROR ("Unable to normalize datasource name");
429             return (-1);
430         }
431         if (swap_chars(n_type_instance, type_instance, ' ', '_') == -1)
432         {
433             ERROR ("Unable to normalize datasource name");
434             return (-1);
435         }
436     }
438     if (plugin_instance != NULL && plugin_instance[0] != '\0')
439     {
440         if ((n_plugin_instance = malloc(strlen(plugin_instance)+1)) == NULL)
441         {
442             ERROR ("Unable to allocate memory for normalized plugin instance buffer");
443             return (-1);
444         }
445         if (swap_chars(n_plugin_instance, plugin_instance, ' ', '_') == -1)
446         {
447             ERROR ("Unable to normalize datasource name");
448             return (-1);
449         }
450     }
452     if ((n_plugin_instance == NULL) || (n_plugin_instance[0] == '\0'))
453     {
454         if ((n_type_instance == NULL) || (n_type_instance[0] == '\0'))
455         {
456             if ((ds_name == NULL) || (ds_name[0] == '\0'))
457                 status = ssnprintf (ret, ret_len, "%s%s%s.%s.%s",
458                         prefix, n_hostname, postfix, n_plugin, n_type);
459             else
460                 status = ssnprintf (ret, ret_len, "%s%s%s.%s.%s.%s",
461                         prefix, n_hostname, postfix, n_plugin, n_type, ds_name);
462         }
463         else
464         {
465             if ((ds_name == NULL) || (ds_name[0] == '\0'))
466                 status = ssnprintf (ret, ret_len, "%s%s%s.%s.%s-%s",
467                         prefix, n_hostname, postfix, n_plugin, n_type,
468                         n_type_instance);
469             else
470                 status = ssnprintf (ret, ret_len, "%s%s%s.%s.%s-%s.%s",
471                         prefix, n_hostname, postfix, n_plugin, n_type,
472                         n_type_instance, ds_name);
473         }
474     }
475     else
476     {
477         if ((n_type_instance == NULL) || (n_type_instance[0] == '\0'))
478         {
479             if ((ds_name == NULL) || (ds_name[0] == '\0'))
480                 status = ssnprintf (ret, ret_len, "%s%s%s.%s.%s.%s",
481                         prefix, n_hostname, postfix, n_plugin,
482                         n_plugin_instance, n_type);
483             else
484                 status = ssnprintf (ret, ret_len, "%s%s%s.%s.%s.%s.%s",
485                         prefix, n_hostname, postfix, n_plugin,
486                         n_plugin_instance, n_type, ds_name);
487         }
488         else
489         {
490             if ((ds_name == NULL) || (ds_name[0] == '\0'))
491                 status = ssnprintf (ret, ret_len, "%s%s%s.%s.%s.%s-%s",
492                         prefix, n_hostname, postfix, n_plugin,
493                         n_plugin_instance, n_type, n_type_instance);
494             else
495                 status = ssnprintf (ret, ret_len, "%s%s%s.%s.%s.%s-%s.%s",
496                         prefix, n_hostname, postfix, n_plugin,
497                         n_plugin_instance, n_type, n_type_instance, ds_name);
498         }
499     }
501     sfree(n_hostname);
502     sfree(n_type_instance);
503     sfree(n_type);
504     sfree(n_plugin);
505     sfree(n_plugin_instance);
507     if ((status < 1) || (status >= ret_len))
508         return (-1);
509     return (0);
512 static int wg_send_message (const char* key, const char* value,
513         cdtime_t time, struct wg_callback *cb)
515     int status;
516     size_t message_len;
517     char message[1024];
519     message_len = (size_t) ssnprintf (message, sizeof (message),
520             "%s %s %.0f\n",
521             key,
522             value,
523             CDTIME_T_TO_DOUBLE(time));
524     if (message_len >= sizeof (message)) {
525         ERROR ("write_graphite plugin: message buffer too small: "
526                 "Need %zu bytes.", message_len + 1);
527         return (-1);
528     }
531     pthread_mutex_lock (&cb->send_lock);
533     if (cb->sock_fd < 0)
534     {
535         status = wg_callback_init (cb);
536         if (status != 0)
537         {
538             ERROR ("write_graphite plugin: wg_callback_init failed.");
539             pthread_mutex_unlock (&cb->send_lock);
540             return (-1);
541         }
542     }
544     if (message_len >= cb->send_buf_free)
545     {
546         status = wg_flush_nolock (/* timeout = */ 0, cb);
547         if (status != 0)
548         {
549             pthread_mutex_unlock (&cb->send_lock);
550             return (status);
551         }
552     }
553     assert (message_len < cb->send_buf_free);
555     /* `message_len + 1' because `message_len' does not include the
556      * trailing null byte. Neither does `send_buffer_fill'. */
557     memcpy (cb->send_buf + cb->send_buf_fill,
558             message, message_len + 1);
559     cb->send_buf_fill += message_len;
560     cb->send_buf_free -= message_len;
562     DEBUG ("write_graphite plugin: <%s:%s> buf %zu/%zu (%g%%) \"%s\"",
563             cb->node,
564             cb->service,
565             cb->send_buf_fill, sizeof (cb->send_buf),
566             100.0 * ((double) cb->send_buf_fill) / ((double) sizeof (cb->send_buf)),
567             message);
569     /* Check if we have enough space for this message. */
570     pthread_mutex_unlock (&cb->send_lock);
572     return (0);
575 static int wg_write_messages (const data_set_t *ds, const value_list_t *vl,
576         struct wg_callback *cb)
578     char key[10*DATA_MAX_NAME_LEN];
579     char values[512];
581     int status, i;
583     if (0 != strcmp (ds->type, vl->type))
584     {
585         ERROR ("write_graphite plugin: DS type does not match "
586                 "value list type");
587         return -1;
588     }
590     if (ds->ds_num > 1)
591     {
592         for (i = 0; i < ds->ds_num; i++)
593         {
594             /* Copy the identifier to `key' and escape it. */
595             status = WG_FORMAT_NAME (key, sizeof (key), vl, cb, ds->ds[i].name);
596             if (status != 0)
597             {
598                 ERROR ("write_graphite plugin: error with format_name");
599                 return (status);
600             }
602             escape_string (key, sizeof (key));
603             /* Convert the values to an ASCII representation and put that
604              * into `values'. */
605             status = wg_format_values (values, sizeof (values), i, ds, vl, 0);
606             if (status != 0)
607             {
608                 ERROR ("write_graphite plugin: error with "
609                         "wg_format_values");
610                 return (status);
611             }
613             /* Send the message to graphite */
614             status = wg_send_message (key, values, vl->time, cb);
615             if (status != 0)
616             {
617                 ERROR ("write_graphite plugin: error with "
618                         "wg_send_message");
619                 return (status);
620             }
621         }
622     }
623     else
624     {
625         /* Copy the identifier to `key' and escape it. */
626         status = WG_FORMAT_NAME (key, sizeof (key), vl, cb, NULL);
627         if (status != 0)
628         {
629             ERROR ("write_graphite plugin: error with format_name");
630             return (status);
631         }
633         escape_string (key, sizeof (key));
634         /* Convert the values to an ASCII representation and put that into
635          * `values'. */
636         status = wg_format_values (values, sizeof (values), 0, ds, vl, 0);
637         if (status != 0)
638         {
639             ERROR ("write_graphite plugin: error with "
640                     "wg_format_values");
641             return (status);
642         }
644         /* Send the message to graphite */
645         status = wg_send_message (key, values, vl->time, cb);
646         if (status != 0)
647         {
648             ERROR ("write_graphite plugin: error with "
649                     "wg_send_message");
650             return (status);
651         }
652     }
654     return (0);
657 static int wg_write (const data_set_t *ds, const value_list_t *vl,
658         user_data_t *user_data)
660     struct wg_callback *cb;
661     int status;
663     if (user_data == NULL)
664         return (-EINVAL);
666     cb = user_data->data;
668     status = wg_write_messages (ds, vl, cb);
670     return (status);
673 static int config_set_char (char *dest,
674         oconfig_item_t *ci)
676     if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING))
677     {
678         WARNING ("write_graphite plugin: The `%s' config option "
679                 "needs exactly one string argument.", ci->key);
680         return (-1);
681     }
683     *dest = ci->values[0].value.string[0];
685     return (0);
688 static int wg_config_carbon (oconfig_item_t *ci)
690     struct wg_callback *cb;
691     user_data_t user_data;
692     int i;
694     cb = malloc (sizeof (*cb));
695     if (cb == NULL)
696     {
697         ERROR ("write_graphite plugin: malloc failed.");
698         return (-1);
699     }
700     memset (cb, 0, sizeof (*cb));
701     cb->sock_fd = -1;
702     cb->node = NULL;
703     cb->service = NULL;
704     cb->prefix = NULL;
705     cb->postfix = NULL;
706     cb->server = NULL;
707     cb->dotchar = '_';
709     pthread_mutex_init (&cb->send_lock, /* attr = */ NULL);
711     for (i = 0; i < ci->children_num; i++)
712     {
713         oconfig_item_t *child = ci->children + i;
715         if (strcasecmp ("Host", child->key) == 0)
716             cf_util_get_string (child, &cb->node);
717         else if (strcasecmp ("Port", child->key) == 0)
718             cf_util_get_string (child, &cb->service);
719         else if (strcasecmp ("Prefix", child->key) == 0)
720             cf_util_get_string (child, &cb->prefix);
721         else if (strcasecmp ("Postfix", child->key) == 0)
722             cf_util_get_string (child, &cb->postfix);
723         else if (strcasecmp ("DotCharacter", child->key) == 0)
724             config_set_char (&cb->dotchar, child);
725         else
726         {
727             ERROR ("write_graphite plugin: Invalid configuration "
728                         "option: %s.", child->key);
729         }
730     }
732     DEBUG ("write_graphite: Registering write callback to carbon agent %s:%s",
733             cb->node ? cb->node : WG_DEFAULT_NODE,
734             cb->service ? cb->service : WG_DEFAULT_SERVICE);
736     memset (&user_data, 0, sizeof (user_data));
737     user_data.data = cb;
738     user_data.free_func = NULL;
739     plugin_register_flush ("write_graphite", wg_flush, &user_data);
741     user_data.free_func = wg_callback_free;
742     plugin_register_write ("write_graphite", wg_write, &user_data);
744     return (0);
747 static int wg_config (oconfig_item_t *ci)
749     int i;
751     for (i = 0; i < ci->children_num; i++)
752     {
753         oconfig_item_t *child = ci->children + i;
755         if (strcasecmp ("Carbon", child->key) == 0)
756             wg_config_carbon (child);
757         else
758         {
759             ERROR ("write_graphite plugin: Invalid configuration "
760                     "option: %s.", child->key);
761         }
762     }
764     return (0);
767 void module_register (void)
769     plugin_register_complex_config ("write_graphite", wg_config);
772 /* vim: set sw=4 ts=4 sts=4 tw=78 et : */