61c43fed7ae921265e049c663804a92e0273a060
1 /**
2 * collectd - src/write_prometheus.c
3 * Copyright (C) 2016 Florian octo Forster
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 *
23 * Authors:
24 * Florian octo Forster <octo at collectd.org>
25 */
27 #include "collectd.h"
29 #include "common.h"
30 #include "plugin.h"
31 #include "utils_avltree.h"
32 #include "utils_complain.h"
33 #include "utils_time.h"
35 #include "prometheus.pb-c.h"
37 #include <microhttpd.h>
39 #ifndef PROMETHEUS_DEFAULT_STALENESS_DELTA
40 #define PROMETHEUS_DEFAULT_STALENESS_DELTA TIME_T_TO_CDTIME_T_STATIC(300)
41 #endif
43 #define VARINT_UINT32_BYTES 5
45 #define CONTENT_TYPE_PROTO \
46 "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; " \
47 "encoding=delimited"
48 #define CONTENT_TYPE_TEXT "text/plain; version=0.0.4"
50 static c_avl_tree_t *metrics;
51 static pthread_mutex_t metrics_lock = PTHREAD_MUTEX_INITIALIZER;
53 static unsigned short httpd_port = 9103;
54 static struct MHD_Daemon *httpd;
56 static cdtime_t staleness_delta = PROMETHEUS_DEFAULT_STALENESS_DELTA;
58 /* Unfortunately, protoc-c doesn't export it's implementation of varint, so we
59 * need to implement our own. */
60 static size_t varint(uint8_t buffer[static VARINT_UINT32_BYTES],
61 uint32_t value) {
62 for (size_t i = 0; i < VARINT_UINT32_BYTES; i++) {
63 buffer[i] = (uint8_t)(value & 0x7f);
64 value >>= 7;
66 if (value == 0)
67 return i + 1;
69 buffer[i] |= 0x80;
70 }
72 return 0;
73 }
75 /* format_protobuf iterates over all metric families in "metrics" and adds them
76 * to a buffer in ProtoBuf format. It prefixes each protobuf with its encoded
77 * size, the so called "delimited" format. */
78 static void format_protobuf(ProtobufCBuffer *buffer) {
79 pthread_mutex_lock(&metrics_lock);
81 char *unused_name;
82 Io__Prometheus__Client__MetricFamily *fam;
83 c_avl_iterator_t *iter = c_avl_get_iterator(metrics);
84 while (c_avl_iterator_next(iter, (void *)&unused_name, (void *)&fam) == 0) {
85 /* Prometheus uses a message length prefix to determine where one
86 * MetricFamily ends and the next begins. This delimiter is encoded as a
87 * "varint", which is common in Protobufs. */
88 uint8_t delim[VARINT_UINT32_BYTES] = {0};
89 size_t delim_len = varint(
90 delim,
91 (uint32_t)io__prometheus__client__metric_family__get_packed_size(fam));
92 buffer->append(buffer, delim_len, delim);
94 io__prometheus__client__metric_family__pack_to_buffer(fam, buffer);
95 }
96 c_avl_iterator_destroy(iter);
98 pthread_mutex_unlock(&metrics_lock);
99 }
/* escape_label_value returns "value" in a form that can be placed between
 * double quotes in a label: a newline becomes the two characters '\' and 'n',
 * and '"' and '\' are each preceded by a backslash.
 *
 * If "value" contains none of these characters, it is returned unchanged and
 * "buffer" is not touched. Otherwise the escaped copy is written to "buffer"
 * (at most "buffer_size" bytes, including the terminating null byte) and
 * "buffer" is returned.
 *
 * If the escaped string does not fit, the output stops at the first character
 * that no longer fits. The result is therefore always the escaped form of a
 * prefix of "value"; characters are never dropped from the middle and an
 * escape sequence is never split. With a "buffer_size" of zero nothing can be
 * stored and an empty string is returned. */
static char const *escape_label_value(char *buffer, size_t buffer_size,
                                      char const *value) {
  /* shortcut for values that don't need escaping. */
  if (strpbrk(value, "\n\"\\") == NULL)
    return value;

  /* Not even the terminating null byte fits. */
  if (buffer_size == 0)
    return "";

  size_t buffer_len = 0;
  for (char const *p = value; *p != 0; p++) {
    _Bool needs_escape = (*p == '\n') || (*p == '"') || (*p == '\\');
    size_t need = needs_escape ? 2 : 1;

    /* Always keep one byte in reserve for the null terminator. Stop at the
     * first character that doesn't fit so the output stays a clean prefix. */
    if ((buffer_size - buffer_len) <= need)
      break;

    if (needs_escape) {
      buffer[buffer_len++] = '\\';
      buffer[buffer_len++] = (*p == '\n') ? 'n' : *p;
    } else {
      buffer[buffer_len++] = *p;
    }
  }

  assert(buffer_len < buffer_size);
  buffer[buffer_len] = 0;
  return buffer;
}
138 /* format_labels formats a metric's labels in Prometheus-compatible format. This
139 * format looks like this:
140 *
141 * key0="value0",key1="value1"
142 */
143 static char *format_labels(char *buffer, size_t buffer_size,
144 Io__Prometheus__Client__Metric const *m) {
145 /* our metrics always have at least one and at most three labels. */
146 assert(m->n_label >= 1);
147 assert(m->n_label <= 3);
149 #define LABEL_KEY_SIZE DATA_MAX_NAME_LEN
150 #define LABEL_VALUE_SIZE (2 * DATA_MAX_NAME_LEN - 1)
151 #define LABEL_BUFFER_SIZE (LABEL_KEY_SIZE + LABEL_VALUE_SIZE + 4)
153 char *labels[3] = {
154 (char[LABEL_BUFFER_SIZE]){0}, (char[LABEL_BUFFER_SIZE]){0},
155 (char[LABEL_BUFFER_SIZE]){0},
156 };
158 /* N.B.: the label *names* are hard-coded by this plugin and therefore we
159 * know that they are sane. */
160 for (size_t i = 0; i < m->n_label; i++) {
161 char value[LABEL_VALUE_SIZE];
162 ssnprintf(labels[i], LABEL_BUFFER_SIZE, "%s=\"%s\"", m->label[i]->name,
163 escape_label_value(value, sizeof(value), m->label[i]->value));
164 }
166 strjoin(buffer, buffer_size, labels, m->n_label, ",");
167 return buffer;
168 }
170 /* format_protobuf iterates over all metric families in "metrics" and adds them
171 * to a buffer in plain text format. */
172 static void format_text(ProtobufCBuffer *buffer) {
173 pthread_mutex_lock(&metrics_lock);
175 char *unused_name;
176 Io__Prometheus__Client__MetricFamily *fam;
177 c_avl_iterator_t *iter = c_avl_get_iterator(metrics);
178 while (c_avl_iterator_next(iter, (void *)&unused_name, (void *)&fam) == 0) {
179 char line[1024]; /* 4x DATA_MAX_NAME_LEN? */
181 ssnprintf(line, sizeof(line), "# HELP %s %s\n", fam->name, fam->help);
182 buffer->append(buffer, strlen(line), (uint8_t *)line);
184 ssnprintf(line, sizeof(line), "# TYPE %s %s\n", fam->name,
185 (fam->type == IO__PROMETHEUS__CLIENT__METRIC_TYPE__GAUGE)
186 ? "gauge"
187 : "counter");
188 buffer->append(buffer, strlen(line), (uint8_t *)line);
190 for (size_t i = 0; i < fam->n_metric; i++) {
191 Io__Prometheus__Client__Metric *m = fam->metric[i];
193 char labels[1024];
195 char timestamp_ms[24] = "";
196 if (m->has_timestamp_ms)
197 ssnprintf(timestamp_ms, sizeof(timestamp_ms), " %" PRIi64,
198 m->timestamp_ms);
200 if (fam->type == IO__PROMETHEUS__CLIENT__METRIC_TYPE__GAUGE)
201 ssnprintf(line, sizeof(line), "%s{%s} " GAUGE_FORMAT "%s\n", fam->name,
202 format_labels(labels, sizeof(labels), m), m->gauge->value,
203 timestamp_ms);
204 else /* if (fam->type == IO__PROMETHEUS__CLIENT__METRIC_TYPE__COUNTER) */
205 ssnprintf(line, sizeof(line), "%s{%s} %.0f%s\n", fam->name,
206 format_labels(labels, sizeof(labels), m), m->counter->value,
207 timestamp_ms);
209 buffer->append(buffer, strlen(line), (uint8_t *)line);
210 }
211 }
212 c_avl_iterator_destroy(iter);
214 char server[1024];
215 ssnprintf(server, sizeof(server), "\n# collectd/write_prometheus %s at %s\n",
216 PACKAGE_VERSION, hostname_g);
217 buffer->append(buffer, strlen(server), (uint8_t *)server);
219 pthread_mutex_unlock(&metrics_lock);
220 }
222 /* http_handler is the callback called by the microhttpd library. It essentially
223 * handles all HTTP request aspects and creates an HTTP response. */
224 static int http_handler(void *cls, struct MHD_Connection *connection,
225 const char *url, const char *method,
226 const char *version, const char *upload_data,
227 size_t *upload_data_size, void **connection_state) {
228 if (strcmp(method, MHD_HTTP_METHOD_GET) != 0) {
229 return MHD_NO;
230 }
232 /* On the first call for each connection, return without anything further.
233 * Apparently not everything has been initialized yet or so; the docs are not
234 * very specific on the issue. */
235 if (*connection_state == NULL) {
236 /* set to a random non-NULL pointer. */
237 *connection_state = &(int){42};
238 return MHD_YES;
239 }
241 char const *accept = MHD_lookup_connection_value(connection, MHD_HEADER_KIND,
242 MHD_HTTP_HEADER_ACCEPT);
243 _Bool want_proto =
244 (accept != NULL) &&
245 (strstr(accept, "application/vnd.google.protobuf") != NULL);
247 uint8_t scratch[4096] = {0};
248 ProtobufCBufferSimple simple = PROTOBUF_C_BUFFER_SIMPLE_INIT(scratch);
249 ProtobufCBuffer *buffer = (ProtobufCBuffer *)&simple;
251 if (want_proto)
252 format_protobuf(buffer);
253 else
254 format_text(buffer);
256 #if defined(MHD_VERSION) && MHD_VERSION >= 0x00090500
257 struct MHD_Response *res = MHD_create_response_from_buffer(
258 simple.len, simple.data, MHD_RESPMEM_MUST_COPY);
259 #else
260 struct MHD_Response *res = MHD_create_response_from_data(
261 simple.len, simple.data, /* must_free = */ 0, /* must_copy = */ 1);
262 #endif
263 MHD_add_response_header(res, MHD_HTTP_HEADER_CONTENT_TYPE,
264 want_proto ? CONTENT_TYPE_PROTO : CONTENT_TYPE_TEXT);
266 int status = MHD_queue_response(connection, MHD_HTTP_OK, res);
268 MHD_destroy_response(res);
269 PROTOBUF_C_BUFFER_SIMPLE_CLEAR(&simple);
270 return status;
271 }
273 /*
274 * Functions for manipulating the global state in "metrics". This is organized
275 * in two tiers: the global "metrics" tree holds "metric families", which are
276 * identified by a name (a string). Each metric family has one or more
277 * "metrics", which are identified by a unique set of key-value-pairs. For
278 * example:
279 *
280 * collectd_cpu_total
281 * {cpu="0",type="idle"}
282 * {cpu="0",type="user"}
283 * ...
284 * collectd_memory
285 * {memory="used"}
286 * {memory="free"}
287 * ...
288 * {{{ */
289 /* label_pair_destroy frees the memory used by a label pair. */
290 static void label_pair_destroy(Io__Prometheus__Client__LabelPair *msg) {
291 if (msg == NULL)
292 return;
294 sfree(msg->name);
295 sfree(msg->value);
297 sfree(msg);
298 }
300 /* label_pair_clone allocates and initializes a new label pair. */
301 static Io__Prometheus__Client__LabelPair *
302 label_pair_clone(Io__Prometheus__Client__LabelPair const *orig) {
303 Io__Prometheus__Client__LabelPair *copy = calloc(1, sizeof(*copy));
304 if (copy == NULL)
305 return NULL;
306 io__prometheus__client__label_pair__init(copy);
308 copy->name = strdup(orig->name);
309 copy->value = strdup(orig->value);
310 if ((copy->name == NULL) || (copy->value == NULL)) {
311 label_pair_destroy(copy);
312 return NULL;
313 }
315 return copy;
316 }
318 /* metric_destroy frees the memory used by a metric. */
319 static void metric_destroy(Io__Prometheus__Client__Metric *msg) {
320 if (msg == NULL)
321 return;
323 for (size_t i = 0; i < msg->n_label; i++) {
324 label_pair_destroy(msg->label[i]);
325 }
326 sfree(msg->label);
328 sfree(msg->gauge);
329 sfree(msg->counter);
331 sfree(msg);
332 }
334 /* metric_cmp compares two metrics. It's prototype makes it easy to use with
335 * qsort(3) and bsearch(3). */
336 static int metric_cmp(void const *a, void const *b) {
337 Io__Prometheus__Client__Metric const *m_a =
338 *((Io__Prometheus__Client__Metric **)a);
339 Io__Prometheus__Client__Metric const *m_b =
340 *((Io__Prometheus__Client__Metric **)b);
342 if (m_a->n_label < m_b->n_label)
343 return -1;
344 else if (m_a->n_label > m_b->n_label)
345 return 1;
347 /* Prometheus does not care about the order of labels. All labels in this
348 * plugin are created by METRIC_ADD_LABELS(), though, and therefore always
349 * appear in the same order. We take advantage of this and simplify the check
350 * by making sure all labels are the same in each position.
351 *
352 * We also only need to check the label values, because the label names are
353 * the same for all metrics in a metric family.
354 *
355 * 3 labels:
356 * [0] $plugin="$plugin_instance" => $plugin is the same within a family
357 * [1] type="$type_instance" => "type" is a static string
358 * [2] instance="$host" => "instance" is a static string
359 *
360 * 2 labels, variant 1:
361 * [0] $plugin="$plugin_instance" => $plugin is the same within a family
362 * [1] instance="$host" => "instance" is a static string
363 *
364 * 2 labels, variant 2:
365 * [0] $plugin="$type_instance" => $plugin is the same within a family
366 * [1] instance="$host" => "instance" is a static string
367 *
368 * 1 label:
369 * [1] instance="$host" => "instance" is a static string
370 */
371 for (size_t i = 0; i < m_a->n_label; i++) {
372 int status = strcmp(m_a->label[i]->value, m_b->label[i]->value);
373 if (status != 0)
374 return status;
376 #if COLLECT_DEBUG
377 assert(strcmp(m_a->label[i]->name, m_b->label[i]->name) == 0);
378 #endif
379 }
381 return 0;
382 }
/* METRIC_INIT expands to the address of a compound literal of type
 * Io__Prometheus__Client__Metric. Its "label" member points to an array of
 * three label pairs whose names are NULL, and "n_label" starts out as zero, so
 * that METRIC_ADD_LABELS() can fill in up to three labels without allocating.
 *
 * The result is meant to be used as a temporary lookup key. Being a compound
 * literal, it only lives as long as the block in which the macro is expanded,
 * and the strings stored in it are not owned by it; use metric_clone() to get
 * a heap-allocated copy. */
#define METRIC_INIT \
  &(Io__Prometheus__Client__Metric) { \
    .label = \
        (Io__Prometheus__Client__LabelPair *[]){ \
            &(Io__Prometheus__Client__LabelPair){ \
                .name = NULL, \
            }, \
            &(Io__Prometheus__Client__LabelPair){ \
                .name = NULL, \
            }, \
            &(Io__Prometheus__Client__LabelPair){ \
                .name = NULL, \
            }, \
        }, \
    .n_label = 0, \
  }
/* METRIC_ADD_LABELS fills the labels of metric "m" from value list "vl" and
 * increments m->n_label for each label added:
 *
 *  - If the plugin instance is not empty, a label named after the plugin is
 *    added with the plugin instance as its value.
 *  - If the type instance is not empty, a label with the type instance as its
 *    value is added. It is named "type", unless the plugin instance is empty,
 *    in which case it is named after the plugin.
 *  - A label named "instance" holding the host name is always added last.
 *
 * The label pairs must already exist in m->label (see METRIC_INIT). Names and
 * values are not copied: the pointers refer to string literals and to the
 * fields of "vl" (with "const" cast away), so they must not be modified or
 * freed through "m". Both macro arguments are evaluated more than once. */
#define METRIC_ADD_LABELS(m, vl) \
  do { \
    if (strlen((vl)->plugin_instance) != 0) { \
      (m)->label[(m)->n_label]->name = (char *)(vl)->plugin; \
      (m)->label[(m)->n_label]->value = (char *)(vl)->plugin_instance; \
      (m)->n_label++; \
    } \
    \
    if (strlen((vl)->type_instance) != 0) { \
      (m)->label[(m)->n_label]->name = "type"; \
      if (strlen((vl)->plugin_instance) == 0) \
        (m)->label[(m)->n_label]->name = (char *)(vl)->plugin; \
      (m)->label[(m)->n_label]->value = (char *)(vl)->type_instance; \
      (m)->n_label++; \
    } \
    \
    (m)->label[(m)->n_label]->name = "instance"; \
    (m)->label[(m)->n_label]->value = (char *)(vl)->host; \
    (m)->n_label++; \
  } while (0)
422 /* metric_clone allocates and initializes a new metric based on orig. */
423 static Io__Prometheus__Client__Metric *
424 metric_clone(Io__Prometheus__Client__Metric const *orig) {
425 Io__Prometheus__Client__Metric *copy = calloc(1, sizeof(*copy));
426 if (copy == NULL)
427 return NULL;
428 io__prometheus__client__metric__init(copy);
430 copy->n_label = orig->n_label;
431 copy->label = calloc(copy->n_label, sizeof(*copy->label));
432 if (copy->label == NULL) {
433 sfree(copy);
434 return NULL;
435 }
437 for (size_t i = 0; i < copy->n_label; i++) {
438 copy->label[i] = label_pair_clone(orig->label[i]);
439 if (copy->label[i] == NULL) {
440 metric_destroy(copy);
441 return NULL;
442 }
443 }
445 return copy;
446 }
448 /* metric_update stores the new value and timestamp in m. */
449 static int metric_update(Io__Prometheus__Client__Metric *m, value_t value,
450 int ds_type, cdtime_t t, cdtime_t interval) {
451 if (ds_type == DS_TYPE_GAUGE) {
452 sfree(m->counter);
453 if (m->gauge == NULL) {
454 m->gauge = calloc(1, sizeof(*m->gauge));
455 if (m->gauge == NULL)
456 return ENOMEM;
457 io__prometheus__client__gauge__init(m->gauge);
458 }
460 m->gauge->value = (double)value.gauge;
461 m->gauge->has_value = 1;
462 } else { /* not gauge */
463 sfree(m->gauge);
464 if (m->counter == NULL) {
465 m->counter = calloc(1, sizeof(*m->counter));
466 if (m->counter == NULL)
467 return ENOMEM;
468 io__prometheus__client__counter__init(m->counter);
469 }
471 switch (ds_type) {
472 case DS_TYPE_ABSOLUTE:
473 m->counter->value = (double)value.absolute;
474 break;
475 case DS_TYPE_COUNTER:
476 m->counter->value = (double)value.counter;
477 break;
478 default:
479 m->counter->value = (double)value.derive;
480 break;
481 }
482 m->counter->has_value = 1;
483 }
485 /* Prometheus has a globally configured timeout after which metrics are
486 * considered stale. This causes problems when metrics have an interval
487 * exceeding that limit. We emulate the behavior of "pushgateway" and *not*
488 * send a timestamp value – Prometheus will fill in the current time. */
489 if (interval <= staleness_delta) {
490 m->timestamp_ms = CDTIME_T_TO_MS(t);
491 m->has_timestamp_ms = 1;
492 } else {
493 static c_complain_t long_metric = C_COMPLAIN_INIT_STATIC;
494 c_complain(
495 LOG_NOTICE, &long_metric,
496 "write_prometheus plugin: You have metrics with an interval exceeding "
497 "\"StalenessDelta\" setting (%.3fs). This is suboptimal, please check "
498 "the collectd.conf(5) manual page to understand what's going on.",
499 CDTIME_T_TO_DOUBLE(staleness_delta));
501 m->timestamp_ms = 0;
502 m->has_timestamp_ms = 0;
503 }
505 return 0;
506 }
508 /* metric_family_add_metric adds m to the metric list of fam. */
509 static int metric_family_add_metric(Io__Prometheus__Client__MetricFamily *fam,
510 Io__Prometheus__Client__Metric *m) {
511 Io__Prometheus__Client__Metric **tmp =
512 realloc(fam->metric, (fam->n_metric + 1) * sizeof(*fam->metric));
513 if (tmp == NULL)
514 return ENOMEM;
515 fam->metric = tmp;
517 fam->metric[fam->n_metric] = m;
518 fam->n_metric++;
520 /* Sort the metrics so that lookup is fast. */
521 qsort(fam->metric, fam->n_metric, sizeof(*fam->metric), metric_cmp);
523 return 0;
524 }
526 /* metric_family_delete_metric looks up and deletes the metric corresponding to
527 * vl. */
528 static int
529 metric_family_delete_metric(Io__Prometheus__Client__MetricFamily *fam,
530 value_list_t const *vl) {
531 Io__Prometheus__Client__Metric *key = METRIC_INIT;
532 METRIC_ADD_LABELS(key, vl);
534 size_t i;
535 for (i = 0; i < fam->n_metric; i++) {
536 if (metric_cmp(&key, &fam->metric[i]) == 0)
537 break;
538 }
540 if (i >= fam->n_metric)
541 return ENOENT;
543 metric_destroy(fam->metric[i]);
544 if ((fam->n_metric - 1) > i)
545 memmove(&fam->metric[i], &fam->metric[i + 1],
546 ((fam->n_metric - 1) - i) * sizeof(fam->metric[i]));
547 fam->n_metric--;
549 Io__Prometheus__Client__Metric **tmp =
550 realloc(fam->metric, fam->n_metric * sizeof(*fam->metric));
551 if ((tmp != NULL) || (fam->n_metric == 0))
552 fam->metric = tmp;
554 return 0;
555 }
557 /* metric_family_get_metric looks up the matching metric in a metric family,
558 * allocating it if necessary. */
559 static Io__Prometheus__Client__Metric *
560 metric_family_get_metric(Io__Prometheus__Client__MetricFamily *fam,
561 value_list_t const *vl) {
562 Io__Prometheus__Client__Metric *key = METRIC_INIT;
563 METRIC_ADD_LABELS(key, vl);
565 /* Metrics are sorted in metric_family_add_metric() so that we can do a binary
566 * search here. */
567 Io__Prometheus__Client__Metric **m = bsearch(
568 &key, fam->metric, fam->n_metric, sizeof(*fam->metric), metric_cmp);
570 if (m != NULL) {
571 return *m;
572 }
574 Io__Prometheus__Client__Metric *new_metric = metric_clone(key);
575 if (new_metric == NULL)
576 return NULL;
578 DEBUG("write_prometheus plugin: created new metric in family");
579 int status = metric_family_add_metric(fam, new_metric);
580 if (status != 0) {
581 metric_destroy(new_metric);
582 return NULL;
583 }
585 return new_metric;
586 }
588 /* metric_family_update looks up the matching metric in a metric family,
589 * allocating it if necessary, and updates the metric to the latest value. */
590 static int metric_family_update(Io__Prometheus__Client__MetricFamily *fam,
591 data_set_t const *ds, value_list_t const *vl,
592 size_t ds_index) {
593 Io__Prometheus__Client__Metric *m = metric_family_get_metric(fam, vl);
594 if (m == NULL)
595 return -1;
597 return metric_update(m, vl->values[ds_index], ds->ds[ds_index].type, vl->time,
598 vl->interval);
599 }
601 /* metric_family_destroy frees the memory used by a metric family. */
602 static void metric_family_destroy(Io__Prometheus__Client__MetricFamily *msg) {
603 if (msg == NULL)
604 return;
606 sfree(msg->name);
607 sfree(msg->help);
609 for (size_t i = 0; i < msg->n_metric; i++) {
610 metric_destroy(msg->metric[i]);
611 }
612 sfree(msg->metric);
614 sfree(msg);
615 }
617 /* metric_family_create allocates and initializes a new metric family. */
618 static Io__Prometheus__Client__MetricFamily *
619 metric_family_create(char *name, data_set_t const *ds, value_list_t const *vl,
620 size_t ds_index) {
621 Io__Prometheus__Client__MetricFamily *msg = calloc(1, sizeof(*msg));
622 if (msg == NULL)
623 return NULL;
624 io__prometheus__client__metric_family__init(msg);
626 msg->name = name;
628 char help[1024];
629 ssnprintf(
630 help, sizeof(help),
631 "write_prometheus plugin: '%s' Type: '%s', Dstype: '%s', Dsname: '%s'",
632 vl->plugin, vl->type, DS_TYPE_TO_STRING(ds->ds[ds_index].type),
633 ds->ds[ds_index].name);
634 msg->help = strdup(help);
636 msg->type = (ds->ds[ds_index].type == DS_TYPE_GAUGE)
637 ? IO__PROMETHEUS__CLIENT__METRIC_TYPE__GAUGE
638 : IO__PROMETHEUS__CLIENT__METRIC_TYPE__COUNTER;
639 msg->has_type = 1;
641 return msg;
642 }
644 /* metric_family_name creates a metric family's name from a data source. This is
645 * done in the same way as done by the "collectd_exporter" for best possible
646 * compatibility. In essence, the plugin, type and data source name go in the
647 * metric family name, while hostname, plugin instance and type instance go into
648 * the labels of a metric. */
649 static char *metric_family_name(data_set_t const *ds, value_list_t const *vl,
650 size_t ds_index) {
651 char const *fields[5] = {"collectd"};
652 size_t fields_num = 1;
654 if (strcmp(vl->plugin, vl->type) != 0) {
655 fields[fields_num] = vl->plugin;
656 fields_num++;
657 }
658 fields[fields_num] = vl->type;
659 fields_num++;
661 if (strcmp("value", ds->ds[ds_index].name) != 0) {
662 fields[fields_num] = ds->ds[ds_index].name;
663 fields_num++;
664 }
666 /* Prometheus best practices:
667 * cumulative metrics should have a "total" suffix. */
668 if ((ds->ds[ds_index].type == DS_TYPE_COUNTER) ||
669 (ds->ds[ds_index].type == DS_TYPE_DERIVE)) {
670 fields[fields_num] = "total";
671 fields_num++;
672 }
674 char name[5 * DATA_MAX_NAME_LEN];
675 strjoin(name, sizeof(name), (char **)fields, fields_num, "_");
676 return strdup(name);
677 }
679 /* metric_family_get looks up the matching metric family, allocating it if
680 * necessary. */
681 static Io__Prometheus__Client__MetricFamily *
682 metric_family_get(data_set_t const *ds, value_list_t const *vl, size_t ds_index,
683 _Bool allocate) {
684 char *name = metric_family_name(ds, vl, ds_index);
685 if (name == NULL) {
686 ERROR("write_prometheus plugin: Allocating metric family name failed.");
687 return NULL;
688 }
690 Io__Prometheus__Client__MetricFamily *fam = NULL;
691 if (c_avl_get(metrics, name, (void *)&fam) == 0) {
692 sfree(name);
693 assert(fam != NULL);
694 return fam;
695 }
697 if (!allocate) {
698 sfree(name);
699 return NULL;
700 }
702 fam = metric_family_create(name, ds, vl, ds_index);
703 if (fam == NULL) {
704 ERROR("write_prometheus plugin: Allocating metric family failed.");
705 sfree(name);
706 return NULL;
707 }
709 /* If successful, "name" is owned by "fam", i.e. don't free it here. */
710 DEBUG("write_prometheus plugin: metric family \"%s\" has been created.",
711 name);
712 name = NULL;
714 int status = c_avl_insert(metrics, fam->name, fam);
715 if (status != 0) {
716 ERROR("write_prometheus plugin: Adding \"%s\" failed.", name);
717 metric_family_destroy(fam);
718 return NULL;
719 }
721 return fam;
722 }
723 /* }}} */
725 /*
726 * collectd callbacks
727 */
728 static int prom_config(oconfig_item_t *ci) {
729 for (int i = 0; i < ci->children_num; i++) {
730 oconfig_item_t *child = ci->children + i;
732 if (strcasecmp("Port", child->key) == 0) {
733 int status = cf_util_get_port_number(child);
734 if (status > 0)
735 httpd_port = (unsigned short)status;
736 } else if (strcasecmp("StalenessDelta", child->key) == 0) {
737 cf_util_get_cdtime(child, &staleness_delta);
738 } else {
739 WARNING("write_prometheus plugin: Ignoring unknown configuration option "
740 "\"%s\".",
741 child->key);
742 }
743 }
745 return 0;
746 }
748 static int prom_init() {
749 if (metrics == NULL) {
750 metrics = c_avl_create((void *)strcmp);
751 if (metrics == NULL) {
752 ERROR("write_prometheus plugin: c_avl_create() failed.");
753 return -1;
754 }
755 }
757 if (httpd == NULL) {
758 unsigned int flags = MHD_USE_THREAD_PER_CONNECTION;
759 #if MHD_VERSION >= 0x00093300
760 flags |= MHD_USE_DUAL_STACK;
761 #endif
763 httpd = MHD_start_daemon(flags, httpd_port,
764 /* MHD_AcceptPolicyCallback = */ NULL,
765 /* MHD_AcceptPolicyCallback arg = */ NULL,
766 http_handler, NULL, MHD_OPTION_END);
767 if (httpd == NULL) {
768 ERROR("write_prometheus plugin: MHD_start_daemon() failed.");
769 return -1;
770 }
771 DEBUG("write_prometheus plugin: Successfully started microhttpd %s",
772 MHD_get_version());
773 }
775 return 0;
776 }
778 static int prom_write(data_set_t const *ds, value_list_t const *vl,
779 __attribute__((unused)) user_data_t *ud) {
780 pthread_mutex_lock(&metrics_lock);
782 for (size_t i = 0; i < ds->ds_num; i++) {
783 Io__Prometheus__Client__MetricFamily *fam =
784 metric_family_get(ds, vl, i, /* allocate = */ 1);
785 if (fam == NULL)
786 continue;
788 int status = metric_family_update(fam, ds, vl, i);
789 if (status != 0) {
790 ERROR("write_prometheus plugin: Updating metric \"%s\" failed with "
791 "status %d",
792 fam->name, status);
793 continue;
794 }
795 }
797 pthread_mutex_unlock(&metrics_lock);
798 return 0;
799 }
801 static int prom_missing(value_list_t const *vl,
802 __attribute__((unused)) user_data_t *ud) {
803 data_set_t const *ds = plugin_get_ds(vl->type);
804 if (ds == NULL)
805 return ENOENT;
807 pthread_mutex_lock(&metrics_lock);
809 for (size_t i = 0; i < ds->ds_num; i++) {
810 Io__Prometheus__Client__MetricFamily *fam =
811 metric_family_get(ds, vl, i, /* allocate = */ 0);
812 if (fam == NULL)
813 continue;
815 int status = metric_family_delete_metric(fam, vl);
816 if (status != 0) {
817 ERROR("write_prometheus plugin: Deleting a metric in family \"%s\" "
818 "failed with status %d",
819 fam->name, status);
821 continue;
822 }
824 if (fam->n_metric == 0) {
825 int status = c_avl_remove(metrics, fam->name, NULL, NULL);
826 if (status != 0) {
827 ERROR("write_prometheus plugin: Deleting metric family \"%s\" failed "
828 "with status %d",
829 fam->name, status);
830 continue;
831 }
832 metric_family_destroy(fam);
833 }
834 }
836 pthread_mutex_unlock(&metrics_lock);
837 return 0;
838 }
840 static int prom_shutdown() {
841 if (httpd != NULL) {
842 MHD_stop_daemon(httpd);
843 httpd = NULL;
844 }
846 pthread_mutex_lock(&metrics_lock);
847 if (metrics != NULL) {
848 char *name;
849 Io__Prometheus__Client__MetricFamily *fam;
850 while (c_avl_pick(metrics, (void *)&name, (void *)&fam) == 0) {
851 assert(name == fam->name);
852 name = NULL;
854 metric_family_destroy(fam);
855 }
856 c_avl_destroy(metrics);
857 metrics = NULL;
858 }
859 pthread_mutex_unlock(&metrics_lock);
861 return 0;
862 }
864 void module_register() {
865 plugin_register_complex_config("write_prometheus", prom_config);
866 plugin_register_init("write_prometheus", prom_init);
867 plugin_register_write("write_prometheus", prom_write,
868 /* user data = */ NULL);
869 plugin_register_missing("write_prometheus", prom_missing,
870 /* user data = */ NULL);
871 plugin_register_shutdown("write_prometheus", prom_shutdown);
872 }