1 /**
2 * collectd - src/statsd.c
3 * Copyright (C) 2013 Florian octo Forster
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a
6 * copy of this software and associated documentation files (the "Software"),
7 * to deal in the Software without restriction, including without limitation
8 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9 * and/or sell copies of the Software, and to permit persons to whom the
10 * Software is furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21 * DEALINGS IN THE SOFTWARE.
22 *
23 * Authors:
24 * Florian octo Forster <octo at collectd.org>
25 */
27 #include "collectd.h"
29 #include "common.h"
30 #include "plugin.h"
31 #include "utils_avltree.h"
32 #include "utils_latency.h"
34 #include <netdb.h>
35 #include <poll.h>
36 #include <sys/types.h>
/* Platforms without MSG_DONTWAIT (e.g. AIX) get MSG_NONBLOCK instead. */
#if !defined(MSG_DONTWAIT)
#define MSG_DONTWAIT MSG_NONBLOCK
#endif

/* Node used when no "Host" option is configured. NULL is handed to
 * getaddrinfo() unchanged. May be overridden at compile time. */
#if !defined(STATSD_DEFAULT_NODE)
#define STATSD_DEFAULT_NODE NULL
#endif

/* Service (port) used when no "Port" option is configured. May be overridden
 * at compile time. */
#if !defined(STATSD_DEFAULT_SERVICE)
#define STATSD_DEFAULT_SERVICE "8125"
#endif
/* Kinds of statsd metrics handled by this plugin. The parser maps the wire
 * type tokens "c", "ms", "g" and "s" onto these, in that order. */
typedef enum metric_type_e {
  STATSD_COUNTER,
  STATSD_TIMER,
  STATSD_GAUGE,
  STATSD_SET
} metric_type_t;
54 struct statsd_metric_s {
55 metric_type_t type;
56 double value;
57 derive_t counter;
58 latency_counter_t *latency;
59 c_avl_tree_t *set;
60 unsigned long updates_num;
61 };
62 typedef struct statsd_metric_s statsd_metric_t;
64 static c_avl_tree_t *metrics_tree = NULL;
65 static pthread_mutex_t metrics_lock = PTHREAD_MUTEX_INITIALIZER;
67 static pthread_t network_thread;
68 static _Bool network_thread_running = 0;
69 static _Bool network_thread_shutdown = 0;
71 static char *conf_node = NULL;
72 static char *conf_service = NULL;
74 static _Bool conf_delete_counters = 0;
75 static _Bool conf_delete_timers = 0;
76 static _Bool conf_delete_gauges = 0;
77 static _Bool conf_delete_sets = 0;
79 static double *conf_timer_percentile = NULL;
80 static size_t conf_timer_percentile_num = 0;
82 static _Bool conf_counter_sum = 0;
83 static _Bool conf_timer_lower = 0;
84 static _Bool conf_timer_upper = 0;
85 static _Bool conf_timer_sum = 0;
86 static _Bool conf_timer_count = 0;
88 /* Must hold metrics_lock when calling this function. */
89 static statsd_metric_t *statsd_metric_lookup_unsafe(char const *name, /* {{{ */
90 metric_type_t type) {
91 char key[DATA_MAX_NAME_LEN + 2];
92 char *key_copy;
93 statsd_metric_t *metric;
94 int status;
96 switch (type) {
97 case STATSD_COUNTER:
98 key[0] = 'c';
99 break;
100 case STATSD_TIMER:
101 key[0] = 't';
102 break;
103 case STATSD_GAUGE:
104 key[0] = 'g';
105 break;
106 case STATSD_SET:
107 key[0] = 's';
108 break;
109 default:
110 return (NULL);
111 }
113 key[1] = ':';
114 sstrncpy(&key[2], name, sizeof(key) - 2);
116 status = c_avl_get(metrics_tree, key, (void *)&metric);
117 if (status == 0)
118 return (metric);
120 key_copy = strdup(key);
121 if (key_copy == NULL) {
122 ERROR("statsd plugin: strdup failed.");
123 return (NULL);
124 }
126 metric = calloc(1, sizeof(*metric));
127 if (metric == NULL) {
128 ERROR("statsd plugin: calloc failed.");
129 sfree(key_copy);
130 return (NULL);
131 }
133 metric->type = type;
134 metric->latency = NULL;
135 metric->set = NULL;
137 status = c_avl_insert(metrics_tree, key_copy, metric);
138 if (status != 0) {
139 ERROR("statsd plugin: c_avl_insert failed.");
140 sfree(key_copy);
141 sfree(metric);
142 return (NULL);
143 }
145 return (metric);
146 } /* }}} statsd_metric_lookup_unsafe */
148 static int statsd_metric_set(char const *name, double value, /* {{{ */
149 metric_type_t type) {
150 statsd_metric_t *metric;
152 pthread_mutex_lock(&metrics_lock);
154 metric = statsd_metric_lookup_unsafe(name, type);
155 if (metric == NULL) {
156 pthread_mutex_unlock(&metrics_lock);
157 return (-1);
158 }
160 metric->value = value;
161 metric->updates_num++;
163 pthread_mutex_unlock(&metrics_lock);
165 return (0);
166 } /* }}} int statsd_metric_set */
168 static int statsd_metric_add(char const *name, double delta, /* {{{ */
169 metric_type_t type) {
170 statsd_metric_t *metric;
172 pthread_mutex_lock(&metrics_lock);
174 metric = statsd_metric_lookup_unsafe(name, type);
175 if (metric == NULL) {
176 pthread_mutex_unlock(&metrics_lock);
177 return (-1);
178 }
180 metric->value += delta;
181 metric->updates_num++;
183 pthread_mutex_unlock(&metrics_lock);
185 return (0);
186 } /* }}} int statsd_metric_add */
188 static void statsd_metric_free(statsd_metric_t *metric) /* {{{ */
189 {
190 if (metric == NULL)
191 return;
193 if (metric->latency != NULL) {
194 latency_counter_destroy(metric->latency);
195 metric->latency = NULL;
196 }
198 if (metric->set != NULL) {
199 void *key;
200 void *value;
202 while (c_avl_pick(metric->set, &key, &value) == 0) {
203 sfree(key);
204 assert(value == NULL);
205 }
207 c_avl_destroy(metric->set);
208 metric->set = NULL;
209 }
211 sfree(metric);
212 } /* }}} void statsd_metric_free */
214 static int statsd_parse_value(char const *str, value_t *ret_value) /* {{{ */
215 {
216 char *endptr = NULL;
218 ret_value->gauge = (gauge_t)strtod(str, &endptr);
219 if ((str == endptr) || ((endptr != NULL) && (*endptr != 0)))
220 return (-1);
222 return (0);
223 } /* }}} int statsd_parse_value */
225 static int statsd_handle_counter(char const *name, /* {{{ */
226 char const *value_str, char const *extra) {
227 value_t value;
228 value_t scale;
229 int status;
231 if ((extra != NULL) && (extra[0] != '@'))
232 return (-1);
234 scale.gauge = 1.0;
235 if (extra != NULL) {
236 status = statsd_parse_value(extra + 1, &scale);
237 if (status != 0)
238 return (status);
240 if (!isfinite(scale.gauge) || (scale.gauge <= 0.0) || (scale.gauge > 1.0))
241 return (-1);
242 }
244 value.gauge = 1.0;
245 status = statsd_parse_value(value_str, &value);
246 if (status != 0)
247 return (status);
249 /* Changes to the counter are added to (statsd_metric_t*)->value. ->counter is
250 * only updated in statsd_metric_submit_unsafe(). */
251 return (statsd_metric_add(name, (double)(value.gauge / scale.gauge),
252 STATSD_COUNTER));
253 } /* }}} int statsd_handle_counter */
255 static int statsd_handle_gauge(char const *name, /* {{{ */
256 char const *value_str) {
257 value_t value;
258 int status;
260 value.gauge = 0;
261 status = statsd_parse_value(value_str, &value);
262 if (status != 0)
263 return (status);
265 if ((value_str[0] == '+') || (value_str[0] == '-'))
266 return (statsd_metric_add(name, (double)value.gauge, STATSD_GAUGE));
267 else
268 return (statsd_metric_set(name, (double)value.gauge, STATSD_GAUGE));
269 } /* }}} int statsd_handle_gauge */
271 static int statsd_handle_timer(char const *name, /* {{{ */
272 char const *value_str, char const *extra) {
273 statsd_metric_t *metric;
274 value_t value_ms;
275 value_t scale;
276 cdtime_t value;
277 int status;
279 if ((extra != NULL) && (extra[0] != '@'))
280 return (-1);
282 scale.gauge = 1.0;
283 if (extra != NULL) {
284 status = statsd_parse_value(extra + 1, &scale);
285 if (status != 0)
286 return (status);
288 if (!isfinite(scale.gauge) || (scale.gauge <= 0.0) || (scale.gauge > 1.0))
289 return (-1);
290 }
292 value_ms.derive = 0;
293 status = statsd_parse_value(value_str, &value_ms);
294 if (status != 0)
295 return (status);
297 value = MS_TO_CDTIME_T(value_ms.gauge / scale.gauge);
299 pthread_mutex_lock(&metrics_lock);
301 metric = statsd_metric_lookup_unsafe(name, STATSD_TIMER);
302 if (metric == NULL) {
303 pthread_mutex_unlock(&metrics_lock);
304 return (-1);
305 }
307 if (metric->latency == NULL)
308 metric->latency = latency_counter_create();
309 if (metric->latency == NULL) {
310 pthread_mutex_unlock(&metrics_lock);
311 return (-1);
312 }
314 latency_counter_add(metric->latency, value);
315 metric->updates_num++;
317 pthread_mutex_unlock(&metrics_lock);
318 return (0);
319 } /* }}} int statsd_handle_timer */
321 static int statsd_handle_set(char const *name, /* {{{ */
322 char const *set_key_orig) {
323 statsd_metric_t *metric = NULL;
324 char *set_key;
325 int status;
327 pthread_mutex_lock(&metrics_lock);
329 metric = statsd_metric_lookup_unsafe(name, STATSD_SET);
330 if (metric == NULL) {
331 pthread_mutex_unlock(&metrics_lock);
332 return (-1);
333 }
335 /* Make sure metric->set exists. */
336 if (metric->set == NULL)
337 metric->set = c_avl_create((int (*)(const void *, const void *))strcmp);
339 if (metric->set == NULL) {
340 pthread_mutex_unlock(&metrics_lock);
341 ERROR("statsd plugin: c_avl_create failed.");
342 return (-1);
343 }
345 set_key = strdup(set_key_orig);
346 if (set_key == NULL) {
347 pthread_mutex_unlock(&metrics_lock);
348 ERROR("statsd plugin: strdup failed.");
349 return (-1);
350 }
352 status = c_avl_insert(metric->set, set_key, /* value = */ NULL);
353 if (status < 0) {
354 pthread_mutex_unlock(&metrics_lock);
355 if (status < 0)
356 ERROR("statsd plugin: c_avl_insert (\"%s\") failed with status %i.",
357 set_key, status);
358 sfree(set_key);
359 return (-1);
360 } else if (status > 0) /* key already exists */
361 {
362 sfree(set_key);
363 }
365 metric->updates_num++;
367 pthread_mutex_unlock(&metrics_lock);
368 return (0);
369 } /* }}} int statsd_handle_set */
/* Parses one statsd line of the form "<name>:<value>|<type>[|<extra>]" and
 * forwards it to the handler for its type. The buffer is modified in place:
 * separators are overwritten with NUL bytes. Returns 0 on success, non-zero
 * if the line is malformed or the handler fails. */
static int statsd_parse_line(char *buffer) /* {{{ */
{
  char *metric_name = buffer;

  /* The type starts after the first '|'. */
  char *type_tok = strchr(metric_name, '|');
  if (type_tok == NULL)
    return (-1);
  *type_tok++ = 0;

  /* The value starts after the last ':' of what precedes the type, so the
   * name itself may contain colons. */
  char *value_tok = strrchr(metric_name, ':');
  if (value_tok == NULL)
    return (-1);
  *value_tok++ = 0;

  /* Optional trailing field after a second '|'. */
  char *extra_tok = strchr(type_tok, '|');
  if (extra_tok != NULL)
    *extra_tok++ = 0;

  if (strcmp("c", type_tok) == 0)
    return (statsd_handle_counter(metric_name, value_tok, extra_tok));
  if (strcmp("ms", type_tok) == 0)
    return (statsd_handle_timer(metric_name, value_tok, extra_tok));

  /* extra is only valid for counters and timers */
  if (extra_tok != NULL)
    return (-1);

  if (strcmp("g", type_tok) == 0)
    return (statsd_handle_gauge(metric_name, value_tok));
  if (strcmp("s", type_tok) == 0)
    return (statsd_handle_set(metric_name, value_tok));

  return (-1);
} /* }}} int statsd_parse_line */
/* Splits a NUL-terminated buffer at newlines and parses every non-empty
 * line. The buffer is modified in place. Lines that fail to parse are logged
 * (truncated to 63 characters) and skipped. */
static void statsd_parse_buffer(char *buffer) /* {{{ */
{
  char *rest;

  for (char *line = buffer; line != NULL; line = rest) {
    rest = strchr(line, '\n');
    if (rest != NULL) {
      *rest = 0;
      rest++;
    }

    if (*line == 0)
      continue;

    /* Parsing overwrites separators, so keep a copy for the error message. */
    char pristine[64];
    sstrncpy(pristine, line, sizeof(pristine));

    if (statsd_parse_line(line) != 0)
      ERROR("statsd plugin: Unable to parse line: \"%s\"", pristine);
  }
} /* }}} void statsd_parse_buffer */
/* Receives one datagram from `fd` without blocking and parses its contents.
 * Receive errors other than EAGAIN / EWOULDBLOCK are logged. */
static void statsd_network_read(int fd) /* {{{ */
{
  char payload[4096];
  ssize_t received;
  size_t payload_len;

  received = recv(fd, payload, sizeof(payload), /* flags = */ MSG_DONTWAIT);
  if (received < 0) {
    /* Nothing to read right now is not an error. */
    if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
      char errbuf[1024];
      ERROR("statsd plugin: recv(2) failed: %s",
            sstrerror(errno, errbuf, sizeof(errbuf)));
    }
    return;
  }

  /* Leave room for the terminating NUL byte. */
  payload_len = (size_t)received;
  if (payload_len >= sizeof(payload))
    payload_len = sizeof(payload) - 1;
  payload[payload_len] = 0;

  statsd_parse_buffer(payload);
} /* }}} void statsd_network_read */
/* Resolves the configured (or default) node / service and binds one datagram
 * socket per resolved address.
 *
 * ret_fds:     receives a heap-allocated pollfd array, one entry per bound
 *              socket; ownership passes to the caller.
 * ret_fds_num: receives the number of entries.
 *
 * Addresses for which socket(2), bind(2) or the array growth fails are
 * logged and skipped. Returns 0 if at least one socket was bound, the
 * getaddrinfo() status if resolving failed, or ENOENT if no socket could be
 * bound. The output arguments are only written on success. */
static int statsd_network_init(struct pollfd **ret_fds, /* {{{ */
                               size_t *ret_fds_num) {
  struct pollfd *fds = NULL;
  size_t fds_num = 0;

  struct addrinfo *ai_list;
  int status;

  char const *node = (conf_node != NULL) ? conf_node : STATSD_DEFAULT_NODE;
  char const *service =
      (conf_service != NULL) ? conf_service : STATSD_DEFAULT_SERVICE;
  /* `node` may be NULL, which must not be passed to a "%s" conversion. */
  char const *node_str = (node != NULL) ? node : "::";

  struct addrinfo ai_hints = {.ai_family = AF_UNSPEC,
                              .ai_flags = AI_PASSIVE | AI_ADDRCONFIG,
                              .ai_socktype = SOCK_DGRAM};

  status = getaddrinfo(node, service, &ai_hints, &ai_list);
  if (status != 0) {
    ERROR("statsd plugin: getaddrinfo (\"%s\", \"%s\") failed: %s", node_str,
          service, gai_strerror(status));
    return (status);
  }

  for (struct addrinfo *ai_ptr = ai_list; ai_ptr != NULL;
       ai_ptr = ai_ptr->ai_next) {
    int fd;
    struct pollfd *tmp;

    char dbg_node[NI_MAXHOST];
    char dbg_service[NI_MAXSERV];

    fd = socket(ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0) {
      char errbuf[1024];
      ERROR("statsd plugin: socket(2) failed: %s",
            sstrerror(errno, errbuf, sizeof(errbuf)));
      continue;
    }

    /* Only used for the debug message. Make sure the buffers hold valid
     * strings even if the address cannot be formatted. */
    dbg_node[0] = 0;
    dbg_service[0] = 0;
    if (getnameinfo(ai_ptr->ai_addr, ai_ptr->ai_addrlen, dbg_node,
                    sizeof(dbg_node), dbg_service, sizeof(dbg_service),
                    NI_DGRAM | NI_NUMERICHOST | NI_NUMERICSERV) != 0) {
      dbg_node[0] = 0;
      dbg_service[0] = 0;
    }
    DEBUG("statsd plugin: Trying to bind to [%s]:%s ...", dbg_node,
          dbg_service);

    status = bind(fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0) {
      char errbuf[1024];
      ERROR("statsd plugin: bind(2) failed: %s",
            sstrerror(errno, errbuf, sizeof(errbuf)));
      close(fd);
      continue;
    }

    /* Grow the array by one; on failure the existing entries stay valid. */
    tmp = realloc(fds, sizeof(*fds) * (fds_num + 1));
    if (tmp == NULL) {
      ERROR("statsd plugin: realloc failed.");
      close(fd);
      continue;
    }
    fds = tmp;
    tmp = fds + fds_num;
    fds_num++;

    memset(tmp, 0, sizeof(*tmp));
    tmp->fd = fd;
    tmp->events = POLLIN | POLLPRI;
  }

  freeaddrinfo(ai_list);

  if (fds_num == 0) {
    ERROR("statsd plugin: Unable to create listening socket for [%s]:%s.",
          node_str, service);
    return (ENOENT);
  }

  *ret_fds = fds;
  *ret_fds_num = fds_num;
  return (0);
} /* }}} int statsd_network_init */
549 static void *statsd_network_thread(void *args) /* {{{ */
550 {
551 struct pollfd *fds = NULL;
552 size_t fds_num = 0;
553 int status;
555 status = statsd_network_init(&fds, &fds_num);
556 if (status != 0) {
557 ERROR("statsd plugin: Unable to open listening sockets.");
558 pthread_exit((void *)0);
559 }
561 while (!network_thread_shutdown) {
562 status = poll(fds, (nfds_t)fds_num, /* timeout = */ -1);
563 if (status < 0) {
564 char errbuf[1024];
566 if ((errno == EINTR) || (errno == EAGAIN))
567 continue;
569 ERROR("statsd plugin: poll(2) failed: %s",
570 sstrerror(errno, errbuf, sizeof(errbuf)));
571 break;
572 }
574 for (size_t i = 0; i < fds_num; i++) {
575 if ((fds[i].revents & (POLLIN | POLLPRI)) == 0)
576 continue;
578 statsd_network_read(fds[i].fd);
579 fds[i].revents = 0;
580 }
581 } /* while (!network_thread_shutdown) */
583 /* Clean up */
584 for (size_t i = 0; i < fds_num; i++)
585 close(fds[i].fd);
586 sfree(fds);
588 return ((void *)0);
589 } /* }}} void *statsd_network_thread */
591 static int statsd_config_timer_percentile(oconfig_item_t *ci) /* {{{ */
592 {
593 double percent = NAN;
594 double *tmp;
595 int status;
597 status = cf_util_get_double(ci, &percent);
598 if (status != 0)
599 return (status);
601 if ((percent <= 0.0) || (percent >= 100)) {
602 ERROR("statsd plugin: The value for \"%s\" must be between 0 and 100, "
603 "exclusively.",
604 ci->key);
605 return (ERANGE);
606 }
608 tmp = realloc(conf_timer_percentile, sizeof(*conf_timer_percentile) *
609 (conf_timer_percentile_num + 1));
610 if (tmp == NULL) {
611 ERROR("statsd plugin: realloc failed.");
612 return (ENOMEM);
613 }
614 conf_timer_percentile = tmp;
615 conf_timer_percentile[conf_timer_percentile_num] = percent;
616 conf_timer_percentile_num++;
618 return (0);
619 } /* }}} int statsd_config_timer_percentile */
621 static int statsd_config(oconfig_item_t *ci) /* {{{ */
622 {
623 for (int i = 0; i < ci->children_num; i++) {
624 oconfig_item_t *child = ci->children + i;
626 if (strcasecmp("Host", child->key) == 0)
627 cf_util_get_string(child, &conf_node);
628 else if (strcasecmp("Port", child->key) == 0)
629 cf_util_get_service(child, &conf_service);
630 else if (strcasecmp("DeleteCounters", child->key) == 0)
631 cf_util_get_boolean(child, &conf_delete_counters);
632 else if (strcasecmp("DeleteTimers", child->key) == 0)
633 cf_util_get_boolean(child, &conf_delete_timers);
634 else if (strcasecmp("DeleteGauges", child->key) == 0)
635 cf_util_get_boolean(child, &conf_delete_gauges);
636 else if (strcasecmp("DeleteSets", child->key) == 0)
637 cf_util_get_boolean(child, &conf_delete_sets);
638 else if (strcasecmp("CounterSum", child->key) == 0)
639 cf_util_get_boolean(child, &conf_counter_sum);
640 else if (strcasecmp("TimerLower", child->key) == 0)
641 cf_util_get_boolean(child, &conf_timer_lower);
642 else if (strcasecmp("TimerUpper", child->key) == 0)
643 cf_util_get_boolean(child, &conf_timer_upper);
644 else if (strcasecmp("TimerSum", child->key) == 0)
645 cf_util_get_boolean(child, &conf_timer_sum);
646 else if (strcasecmp("TimerCount", child->key) == 0)
647 cf_util_get_boolean(child, &conf_timer_count);
648 else if (strcasecmp("TimerPercentile", child->key) == 0)
649 statsd_config_timer_percentile(child);
650 else
651 ERROR("statsd plugin: The \"%s\" config option is not valid.",
652 child->key);
653 }
655 return (0);
656 } /* }}} int statsd_config */
658 static int statsd_init(void) /* {{{ */
659 {
660 pthread_mutex_lock(&metrics_lock);
661 if (metrics_tree == NULL)
662 metrics_tree = c_avl_create((int (*)(const void *, const void *))strcmp);
664 if (!network_thread_running) {
665 int status;
667 status = pthread_create(&network_thread,
668 /* attr = */ NULL, statsd_network_thread,
669 /* args = */ NULL);
670 if (status != 0) {
671 char errbuf[1024];
672 pthread_mutex_unlock(&metrics_lock);
673 ERROR("statsd plugin: pthread_create failed: %s",
674 sstrerror(errno, errbuf, sizeof(errbuf)));
675 return (status);
676 }
677 }
678 network_thread_running = 1;
680 pthread_mutex_unlock(&metrics_lock);
682 return (0);
683 } /* }}} int statsd_init */
685 /* Must hold metrics_lock when calling this function. */
686 static int statsd_metric_clear_set_unsafe(statsd_metric_t *metric) /* {{{ */
687 {
688 void *key;
689 void *value;
691 if ((metric == NULL) || (metric->type != STATSD_SET))
692 return (EINVAL);
694 if (metric->set == NULL)
695 return (0);
697 while (c_avl_pick(metric->set, &key, &value) == 0) {
698 sfree(key);
699 sfree(value);
700 }
702 return (0);
703 } /* }}} int statsd_metric_clear_set_unsafe */
705 /* Must hold metrics_lock when calling this function. */
706 static int statsd_metric_submit_unsafe(char const *name,
707 statsd_metric_t *metric) /* {{{ */
708 {
709 value_t values[1];
710 value_list_t vl = VALUE_LIST_INIT;
712 vl.values = values;
713 vl.values_len = 1;
714 sstrncpy(vl.host, hostname_g, sizeof(vl.host));
715 sstrncpy(vl.plugin, "statsd", sizeof(vl.plugin));
717 if (metric->type == STATSD_GAUGE)
718 sstrncpy(vl.type, "gauge", sizeof(vl.type));
719 else if (metric->type == STATSD_TIMER)
720 sstrncpy(vl.type, "latency", sizeof(vl.type));
721 else if (metric->type == STATSD_SET)
722 sstrncpy(vl.type, "objects", sizeof(vl.type));
723 else /* if (metric->type == STATSD_COUNTER) */
724 sstrncpy(vl.type, "derive", sizeof(vl.type));
726 sstrncpy(vl.type_instance, name, sizeof(vl.type_instance));
728 if (metric->type == STATSD_GAUGE)
729 values[0].gauge = (gauge_t)metric->value;
730 else if (metric->type == STATSD_TIMER) {
731 _Bool have_events = (metric->updates_num > 0);
733 /* Make sure all timer metrics share the *same* timestamp. */
734 vl.time = cdtime();
736 ssnprintf(vl.type_instance, sizeof(vl.type_instance), "%s-average", name);
737 values[0].gauge =
738 have_events
739 ? CDTIME_T_TO_DOUBLE(latency_counter_get_average(metric->latency))
740 : NAN;
741 plugin_dispatch_values(&vl);
743 if (conf_timer_lower) {
744 ssnprintf(vl.type_instance, sizeof(vl.type_instance), "%s-lower", name);
745 values[0].gauge =
746 have_events
747 ? CDTIME_T_TO_DOUBLE(latency_counter_get_min(metric->latency))
748 : NAN;
749 plugin_dispatch_values(&vl);
750 }
752 if (conf_timer_upper) {
753 ssnprintf(vl.type_instance, sizeof(vl.type_instance), "%s-upper", name);
754 values[0].gauge =
755 have_events
756 ? CDTIME_T_TO_DOUBLE(latency_counter_get_max(metric->latency))
757 : NAN;
758 plugin_dispatch_values(&vl);
759 }
761 if (conf_timer_sum) {
762 ssnprintf(vl.type_instance, sizeof(vl.type_instance), "%s-sum", name);
763 values[0].gauge =
764 have_events
765 ? CDTIME_T_TO_DOUBLE(latency_counter_get_sum(metric->latency))
766 : NAN;
767 plugin_dispatch_values(&vl);
768 }
770 for (size_t i = 0; i < conf_timer_percentile_num; i++) {
771 ssnprintf(vl.type_instance, sizeof(vl.type_instance),
772 "%s-percentile-%.0f", name, conf_timer_percentile[i]);
773 values[0].gauge = have_events
774 ? CDTIME_T_TO_DOUBLE(latency_counter_get_percentile(
775 metric->latency, conf_timer_percentile[i]))
776 : NAN;
777 plugin_dispatch_values(&vl);
778 }
780 /* Keep this at the end, since vl.type is set to "gauge" here. The
781 * vl.type's above are implicitly set to "latency". */
782 if (conf_timer_count) {
783 sstrncpy(vl.type, "gauge", sizeof(vl.type));
784 ssnprintf(vl.type_instance, sizeof(vl.type_instance), "%s-count", name);
785 values[0].gauge = latency_counter_get_num(metric->latency);
786 plugin_dispatch_values(&vl);
787 }
789 latency_counter_reset(metric->latency);
790 return (0);
791 } else if (metric->type == STATSD_SET) {
792 if (metric->set == NULL)
793 values[0].gauge = 0.0;
794 else
795 values[0].gauge = (gauge_t)c_avl_size(metric->set);
796 } else { /* STATSD_COUNTER */
797 gauge_t delta = nearbyint(metric->value);
799 /* Etsy's statsd writes counters as two metrics: a rate and the change since
800 * the last write. Since collectd does not reset its DERIVE metrics to zero,
801 * this makes little sense, but we're dispatching a "count" metric here
802 * anyway - if requested by the user - for compatibility reasons. */
803 if (conf_counter_sum) {
804 sstrncpy(vl.type, "count", sizeof(vl.type));
805 values[0].gauge = delta;
806 plugin_dispatch_values(&vl);
808 /* restore vl.type */
809 sstrncpy(vl.type, "derive", sizeof(vl.type));
810 }
812 /* Rather than resetting value to zero, subtract delta so we correctly keep
813 * track of residuals. */
814 metric->value -= delta;
815 metric->counter += (derive_t)delta;
817 values[0].derive = metric->counter;
818 }
820 return (plugin_dispatch_values(&vl));
821 } /* }}} int statsd_metric_submit_unsafe */
823 static int statsd_read(void) /* {{{ */
824 {
825 c_avl_iterator_t *iter;
826 char *name;
827 statsd_metric_t *metric;
829 char **to_be_deleted = NULL;
830 size_t to_be_deleted_num = 0;
832 pthread_mutex_lock(&metrics_lock);
834 if (metrics_tree == NULL) {
835 pthread_mutex_unlock(&metrics_lock);
836 return (0);
837 }
839 iter = c_avl_get_iterator(metrics_tree);
840 while (c_avl_iterator_next(iter, (void *)&name, (void *)&metric) == 0) {
841 if ((metric->updates_num == 0) &&
842 ((conf_delete_counters && (metric->type == STATSD_COUNTER)) ||
843 (conf_delete_timers && (metric->type == STATSD_TIMER)) ||
844 (conf_delete_gauges && (metric->type == STATSD_GAUGE)) ||
845 (conf_delete_sets && (metric->type == STATSD_SET)))) {
846 DEBUG("statsd plugin: Deleting metric \"%s\".", name);
847 strarray_add(&to_be_deleted, &to_be_deleted_num, name);
848 continue;
849 }
851 /* Names have a prefix, e.g. "c:", which determines the (statsd) type.
852 * Remove this here. */
853 statsd_metric_submit_unsafe(name + 2, metric);
855 /* Reset the metric. */
856 metric->updates_num = 0;
857 if (metric->type == STATSD_SET)
858 statsd_metric_clear_set_unsafe(metric);
859 }
860 c_avl_iterator_destroy(iter);
862 for (size_t i = 0; i < to_be_deleted_num; i++) {
863 int status;
865 status = c_avl_remove(metrics_tree, to_be_deleted[i], (void *)&name,
866 (void *)&metric);
867 if (status != 0) {
868 ERROR("stats plugin: c_avl_remove (\"%s\") failed with status %i.",
869 to_be_deleted[i], status);
870 continue;
871 }
873 sfree(name);
874 statsd_metric_free(metric);
875 }
877 pthread_mutex_unlock(&metrics_lock);
879 strarray_free(to_be_deleted, to_be_deleted_num);
881 return (0);
882 } /* }}} int statsd_read */
884 static int statsd_shutdown(void) /* {{{ */
885 {
886 void *key;
887 void *value;
889 if (network_thread_running) {
890 network_thread_shutdown = 1;
891 pthread_kill(network_thread, SIGTERM);
892 pthread_join(network_thread, /* retval = */ NULL);
893 }
894 network_thread_running = 0;
896 pthread_mutex_lock(&metrics_lock);
898 while (c_avl_pick(metrics_tree, &key, &value) == 0) {
899 sfree(key);
900 statsd_metric_free(value);
901 }
902 c_avl_destroy(metrics_tree);
903 metrics_tree = NULL;
905 sfree(conf_node);
906 sfree(conf_service);
908 pthread_mutex_unlock(&metrics_lock);
910 return (0);
911 } /* }}} int statsd_shutdown */
913 void module_register(void) {
914 plugin_register_complex_config("statsd", statsd_config);
915 plugin_register_init("statsd", statsd_init);
916 plugin_register_read("statsd", statsd_read);
917 plugin_register_shutdown("statsd", statsd_shutdown);
918 }
920 /* vim: set sw=2 sts=2 et fdm=marker : */