From 3090a8852788aaca0d8063fdf9ac4ba66f048cd8 Mon Sep 17 00:00:00 2001 From: Sebastian Harl Date: Sun, 5 Jun 2016 13:59:49 +0200 Subject: [PATCH] Add a generic interface for parsing the text protocol. Implement it for PUTVAL for now. The text protocol is used in multiple places and the parser will avoid code duplication in client programs which can, instead, use it to generate the respective requests. Use the 'cmd_' prefix for public functions related to command handling. --- src/Makefile.am | 6 +- src/amqp.c | 8 +- src/exec.c | 2 +- src/unixsock.c | 2 +- src/utils_cmd_putval.c | 180 ++++++++++++++++++++++++++--------------- src/utils_cmd_putval.h | 10 ++- src/utils_cmds.c | 129 +++++++++++++++++++++++++++++ src/utils_cmds.h | 143 ++++++++++++++++++++++++++++++++ src/write_kafka.c | 4 +- 9 files changed, 407 insertions(+), 77 deletions(-) create mode 100644 src/utils_cmds.c create mode 100644 src/utils_cmds.h diff --git a/src/Makefile.am b/src/Makefile.am index 99a7c024..c545227f 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -154,8 +154,9 @@ endif if BUILD_PLUGIN_AMQP pkglib_LTLIBRARIES += amqp.la amqp_la_SOURCES = amqp.c \ + utils_cmds.c utils_cmds.h \ utils_cmd_putval.c utils_cmd_putval.h \ - utils_parse_option.c utils_parse_option.h \ + utils_parse_option.c utils_parse_option.h \ utils_format_graphite.c utils_format_graphite.h amqp_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBRABBITMQ_LDFLAGS) amqp_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBRABBITMQ_CPPFLAGS) @@ -419,6 +420,7 @@ endif if BUILD_PLUGIN_EXEC pkglib_LTLIBRARIES += exec.la exec_la_SOURCES = exec.c \ + utils_cmds.c utils_cmds.h \ utils_cmd_putnotif.c utils_cmd_putnotif.h \ utils_cmd_putval.c utils_cmd_putval.h \ utils_parse_option.h utils_parse_option.c @@ -1181,6 +1183,7 @@ endif if BUILD_PLUGIN_UNIXSOCK pkglib_LTLIBRARIES += unixsock.la unixsock_la_SOURCES = unixsock.c \ + utils_cmds.c utils_cmds.h \ utils_cmd_flush.h utils_cmd_flush.c \ utils_cmd_getval.h utils_cmd_getval.c \ utils_cmd_getthreshold.h utils_cmd_getthreshold.c \ @@ -1281,6 +1284,7 @@ if BUILD_PLUGIN_WRITE_KAFKA pkglib_LTLIBRARIES += write_kafka.la write_kafka_la_SOURCES = write_kafka.c \ utils_format_graphite.c utils_format_graphite.h \ + utils_cmds.c utils_cmds.h \ utils_cmd_putval.c utils_cmd_putval.h \ utils_crc32.c utils_crc32.h write_kafka_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBRDKAFKA_CPPFLAGS) diff --git a/src/amqp.c b/src/amqp.c index 06fd1f1d..f9777a9b 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -599,9 +599,9 @@ static int camqp_read_body (camqp_config_t *conf, /* {{{ */ if (strcasecmp ("text/collectd", content_type) == 0) { - status = handle_putval (stderr, body); + status = cmd_handle_putval (stderr, body); if (status != 0) - ERROR ("amqp plugin: handle_putval failed with status %i.", + ERROR ("amqp plugin: cmd_handle_putval failed with status %i.", status); return (status); } @@ -838,10 +838,10 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ if (conf->format == CAMQP_FORMAT_COMMAND) { - status = create_putval (buffer, sizeof (buffer), ds, vl); + status = cmd_create_putval (buffer, sizeof (buffer), ds, vl); if (status != 0) { - ERROR ("amqp plugin: create_putval failed with status %i.", + ERROR ("amqp plugin: cmd_create_putval failed with status %i.", status); return (status); } diff --git a/src/exec.c b/src/exec.c index dfd4b05f..616d0794 100644 --- a/src/exec.c +++ b/src/exec.c @@ -547,7 +547,7 @@ failed: static int parse_line (char *buffer) /* {{{ */ { if (strncasecmp ("PUTVAL", buffer, strlen ("PUTVAL")) == 0) - return (handle_putval (stdout, buffer)); + return (cmd_handle_putval (stdout, buffer)); else if (strncasecmp ("PUTNOTIF", buffer, strlen ("PUTNOTIF")) == 0) return (handle_putnotif (stdout, buffer)); else diff --git a/src/unixsock.c b/src/unixsock.c index 808ba98b..c8d1950a 100644 --- a/src/unixsock.c +++ b/src/unixsock.c @@ -297,7 +297,7 @@ static void *us_handle_client (void *arg) } else if (strcasecmp (fields[0], "putval") == 0) { - handle_putval (fhout, buffer); + cmd_handle_putval (fhout, buffer); } else if (strcasecmp (fields[0], "listval") == 0) { diff --git a/src/utils_cmd_putval.c b/src/utils_cmd_putval.c index bf3e2b6f..36419c6a 100644 --- a/src/utils_cmd_putval.c +++ b/src/utils_cmd_putval.c @@ -1,6 +1,7 @@ /** * collectd - src/utils_cmd_putval.c * Copyright (C) 2007-2009 Florian octo Forster + * Copyright (C) 2016 Sebastian tokkee Harl * * Permission is hereby granted, free of charge, to any person obtaining a * copy of this software and associated documentation files (the "Software"), @@ -22,6 +23,7 @@ * * Authors: * Florian octo Forster + * Sebastian tokkee Harl **/ #include "collectd.h" @@ -29,20 +31,14 @@ #include "common.h" #include "plugin.h" +#include "utils_cmds.h" +#include "utils_cmd_putval.h" #include "utils_parse_option.h" #include "utils_cmd_putval.h" -#define print_to_socket(fh, ...) \ - do { \ - if (fprintf (fh, __VA_ARGS__) < 0) { \ - char errbuf[1024]; \ - WARNING ("handle_putval: failed to write to socket #%i: %s", \ - fileno (fh), sstrerror (errno, errbuf, sizeof (errbuf))); \ - sfree (vl.values); \ - return -1; \ - } \ - fflush(fh); \ - } while (0) +/* + * private helper functions + */ static int set_option (value_list_t *vl, const char *key, const char *value) { @@ -66,11 +62,15 @@ static int set_option (value_list_t *vl, const char *key, const char *value) return (1); return (0); -} /* int parse_option */ +} /* int set_option */ + +/* + * public API + */ -int handle_putval (FILE *fh, char *buffer) +cmd_status_t cmd_parse_putval (char *buffer, + cmd_putval_t *ret_putval, cmd_error_handler_t *err) { - char *command; char *identifier; char *hostname; char *plugin; @@ -78,7 +78,6 @@ int handle_putval (FILE *fh, char *buffer) char *type; char *type_instance; int status; - int values_submitted; char *identifier_copy; @@ -86,30 +85,12 @@ int handle_putval (FILE *fh, char *buffer) value_list_t vl = VALUE_LIST_INIT; vl.values = NULL; - DEBUG ("utils_cmd_putval: handle_putval (fh = %p, buffer = %s);", - (void *) fh, buffer); - - command = NULL; - status = parse_string (&buffer, &command); - if (status != 0) - { - print_to_socket (fh, "-1 Cannot parse command.\n"); - return (-1); - } - assert (command != NULL); - - if (strcasecmp ("PUTVAL", command) != 0) - { - print_to_socket (fh, "-1 Unexpected command: `%s'.\n", command); - return (-1); - } - identifier = NULL; status = parse_string (&buffer, &identifier); if (status != 0) { - print_to_socket (fh, "-1 Cannot parse identifier.\n"); - return (-1); + cmd_error (CMD_PARSE_ERROR, err, "Cannot parse identifier."); + return (CMD_PARSE_ERROR); } assert (identifier != NULL); @@ -122,12 +103,12 @@ int handle_putval (FILE *fh, char *buffer) &type, &type_instance); if (status != 0) { - DEBUG ("handle_putval: Cannot parse identifier `%s'.", + DEBUG ("cmd_handle_putval: Cannot parse identifier `%s'.", identifier); - print_to_socket (fh, "-1 Cannot parse identifier `%s'.\n", + cmd_error (CMD_PARSE_ERROR, err, "Cannot parse identifier `%s'.", identifier); sfree (identifier_copy); - return (-1); + return (CMD_PARSE_ERROR); } if ((strlen (hostname) >= sizeof (vl.host)) @@ -137,9 +118,9 @@ int handle_putval (FILE *fh, char *buffer) || ((type_instance != NULL) && (strlen (type_instance) >= sizeof (vl.type_instance)))) { - print_to_socket (fh, "-1 Identifier too long.\n"); + cmd_error (CMD_PARSE_ERROR, err, "Identifier too long."); sfree (identifier_copy); - return (-1); + return (CMD_PARSE_ERROR); } sstrncpy (vl.host, hostname, sizeof (vl.host)); @@ -151,10 +132,11 @@ int handle_putval (FILE *fh, char *buffer) sstrncpy (vl.type_instance, type_instance, sizeof (vl.type_instance)); ds = plugin_get_ds (type); - if (ds == NULL) { - print_to_socket (fh, "-1 Type `%s' isn't defined.\n", type); + if (ds == NULL) + { + cmd_error (CMD_PARSE_ERROR, err, "1 Type `%s' isn't defined.", type); sfree (identifier_copy); - return (-1); + return (CMD_PARSE_ERROR); } /* Free identifier_copy */ @@ -167,14 +149,23 @@ int handle_putval (FILE *fh, char *buffer) vl.values = malloc (vl.values_len * sizeof (*vl.values)); if (vl.values == NULL) { - print_to_socket (fh, "-1 malloc failed.\n"); - return (-1); + cmd_error (CMD_ERROR, err, "malloc failed."); + return (CMD_ERROR); + } + + ret_putval->identifier = strdup (identifier); + if (ret_putval->identifier == NULL) + { + cmd_error (CMD_ERROR, err, "malloc failed."); + cmd_destroy_putval (ret_putval); + return (CMD_ERROR); } /* All the remaining fields are part of the optionlist. */ - values_submitted = 0; while (*buffer != 0) { + value_list_t *tmp; + char *string = NULL; char *value = NULL; @@ -183,9 +174,9 @@ int handle_putval (FILE *fh, char *buffer) { /* parse_option failed, buffer has been modified. * => we need to abort */ - print_to_socket (fh, "-1 Misformatted option.\n"); - sfree (vl.values); - return (-1); + cmd_error (CMD_PARSE_ERROR, err, "Misformatted option."); + cmd_destroy_putval (ret_putval); + return (CMD_PARSE_ERROR); } else if (status == 0) { @@ -200,35 +191,92 @@ int handle_putval (FILE *fh, char *buffer) status = parse_string (&buffer, &string); if (status != 0) { - print_to_socket (fh, "-1 Misformatted value.\n"); - sfree (vl.values); - return (-1); + cmd_error (CMD_PARSE_ERROR, err, "Misformatted value."); + cmd_destroy_putval (ret_putval); + return (CMD_PARSE_ERROR); } assert (string != NULL); status = parse_values (string, &vl, ds); if (status != 0) { - print_to_socket (fh, "-1 Parsing the values string failed.\n"); - sfree (vl.values); - return (-1); + cmd_error (CMD_PARSE_ERROR, err, "Parsing the values string failed."); + cmd_destroy_putval (ret_putval); + return (CMD_PARSE_ERROR); } - plugin_dispatch_values (&vl); - values_submitted++; + tmp = (value_list_t *) realloc (ret_putval->vl, + (ret_putval->vl_num + 1) * sizeof(*ret_putval->vl)); + if (tmp == NULL) + { + cmd_error (CMD_ERROR, err, "realloc failed."); + cmd_destroy_putval (ret_putval); + return (CMD_ERROR); + } + + ret_putval->vl = tmp; + ret_putval->vl_num++; + memcpy (&ret_putval->vl[ret_putval->vl_num - 1], &vl, sizeof (vl)); } /* while (*buffer != 0) */ /* Done parsing the options. */ - if (fh!=stdout) - print_to_socket (fh, "0 Success: %i %s been dispatched.\n", - values_submitted, - (values_submitted == 1) ? "value has" : "values have"); + return (CMD_OK); +} /* cmd_status_t cmd_parse_putval */ - sfree (vl.values); - return (0); -} /* int handle_putval */ +void cmd_destroy_putval (cmd_putval_t *putval) +{ + size_t i; + + if (putval == NULL) + return; + + sfree (putval->identifier); + + for (i = 0; i < putval->vl_num; ++i) + { + sfree (putval->vl[i].values); + meta_data_destroy (putval->vl[i].meta); + putval->vl[i].meta = NULL; + } + sfree (putval->vl); + putval->vl = NULL; + putval->vl_num = 0; +} /* void cmd_destroy_putval */ + +cmd_status_t cmd_handle_putval (FILE *fh, char *buffer) +{ + cmd_error_handler_t err = { cmd_error_fh, fh }; + cmd_t cmd; + size_t i; + + int status; + + DEBUG ("utils_cmd_putval: cmd_handle_putval (fh = %p, buffer = %s);", + (void *) fh, buffer); + + if ((status = cmd_parse (buffer, &cmd, &err)) != CMD_OK) + return (status); + if (cmd.type != CMD_PUTVAL) + { + cmd_error (CMD_UNKNOWN_COMMAND, &err, "Unexpected command: `%s'.", + CMD_TO_STRING (cmd.type)); + cmd_destroy (&cmd); + return (CMD_UNKNOWN_COMMAND); + } + + for (i = 0; i < cmd.cmd.putval.vl_num; ++i) + plugin_dispatch_values (&cmd.cmd.putval.vl[i]); + + if (fh != stdout) + cmd_error (CMD_OK, &err, "Success: %i %s been dispatched.", + (int)cmd.cmd.putval.vl_num, + (cmd.cmd.putval.vl_num == 1) ? "value has" : "values have"); + + cmd_destroy (&cmd); + return (CMD_OK); +} /* int cmd_handle_putval */ -int create_putval (char *ret, size_t ret_len, /* {{{ */ +int cmd_create_putval (char *ret, size_t ret_len, /* {{{ */ const data_set_t *ds, const value_list_t *vl) { char buffer_ident[6 * DATA_MAX_NAME_LEN]; @@ -255,4 +303,4 @@ int create_putval (char *ret, size_t ret_len, /* {{{ */ buffer_values); return (0); -} /* }}} int create_putval */ +} /* }}} int cmd_create_putval */ diff --git a/src/utils_cmd_putval.h b/src/utils_cmd_putval.h index 795409eb..bc7763d5 100644 --- a/src/utils_cmd_putval.h +++ b/src/utils_cmd_putval.h @@ -30,10 +30,16 @@ #include #include "plugin.h" +#include "utils_cmds.h" -int handle_putval (FILE *fh, char *buffer); +cmd_status_t cmd_parse_putval (char *buffer, + cmd_putval_t *ret_cmd, cmd_error_handler_t *err); -int create_putval (char *ret, size_t ret_len, +cmd_status_t cmd_handle_putval (FILE *fh, char *buffer); + +void cmd_destroy_putval (cmd_putval_t *putval); + +int cmd_create_putval (char *ret, size_t ret_len, const data_set_t *ds, const value_list_t *vl); #endif /* UTILS_CMD_PUTVAL_H */ diff --git a/src/utils_cmds.c b/src/utils_cmds.c new file mode 100644 index 00000000..8468267e --- /dev/null +++ b/src/utils_cmds.c @@ -0,0 +1,129 @@ +/** + * collectd - src/utils_cmds.c + * Copyright (C) 2016 Sebastian 'tokkee' Harl + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * Authors: + * Sebastian 'tokkee' Harl + **/ + +#include "utils_cmds.h" +#include "utils_cmd_putval.h" +#include "utils_parse_option.h" +#include "daemon/common.h" + +#include +#include + +/* + * public API + */ + +void cmd_error (cmd_status_t status, cmd_error_handler_t *err, + const char *format, ...) +{ + va_list ap; + + if ((err == NULL) || (err->cb == NULL)) + return; + + va_start (ap, format); + err->cb (err->ud, status, format, ap); + va_end (ap); +} /* void cmd_error */ + +cmd_status_t cmd_parse (char *buffer, + cmd_t *ret_cmd, cmd_error_handler_t *err) +{ + char *command = NULL; + int status; + + if ((buffer == NULL) || (ret_cmd == NULL)) + { + errno = EINVAL; + cmd_error (CMD_ERROR, err, "Invalid arguments to cmd_parse."); + return CMD_ERROR; + } + + if ((status = parse_string (&buffer, &command)) != 0) + { + cmd_error (CMD_PARSE_ERROR, err, + "Failed to extract command from `%s'.", buffer); + return (CMD_PARSE_ERROR); + } + assert (command != NULL); + + memset (ret_cmd, 0, sizeof (*ret_cmd)); + if (strcasecmp ("PUTVAL", command) == 0) + { + ret_cmd->type = CMD_PUTVAL; + return cmd_parse_putval (buffer, &ret_cmd->cmd.putval, err); + } + else + { + ret_cmd->type = CMD_UNKNOWN; + cmd_error (CMD_UNKNOWN_COMMAND, err, + "Unknown command `%s'.", command); + return (CMD_UNKNOWN_COMMAND); + } + + return (CMD_OK); +} /* cmd_status_t cmd_parse */ + +void cmd_destroy (cmd_t *cmd) +{ + if (cmd == NULL) + return; + + switch (cmd->type) + { + case CMD_UNKNOWN: + /* nothing to do */ + break; + case CMD_PUTVAL: + cmd_destroy_putval (&cmd->cmd.putval); + break; + } +} /* void cmd_destroy */ + +void cmd_error_fh (void *ud, cmd_status_t status, + const char *format, va_list ap) +{ + FILE *fh = ud; + int code = -1; + char buf[1024]; + + if (status == CMD_OK) + code = 0; + + vsnprintf (buf, sizeof(buf), format, ap); + buf[sizeof (buf) - 1] = '\0'; + if (fprintf (fh, "%i %s\n", code, buf) < 0) + { + char errbuf[1024]; + WARNING ("utils_cmds: failed to write to file-handle #%i: %s", + fileno (fh), sstrerror (errno, errbuf, sizeof (errbuf))); + return; + } + + fflush (fh); +} /* void cmd_error_fh */ + +/* vim: set sw=4 ts=4 tw=78 noexpandtab : */ diff --git a/src/utils_cmds.h b/src/utils_cmds.h new file mode 100644 index 00000000..f3daa41f --- /dev/null +++ b/src/utils_cmds.h @@ -0,0 +1,143 @@ +/** + * collectd - src/utils_cmds.h + * Copyright (C) 2016 Sebastian 'tokkee' Harl + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * Authors: + * Sebastian 'tokkee' Harl + **/ + +#ifndef UTILS_CMDS_H +#define UTILS_CMDS_H 1 + +#include "plugin.h" + +#include + +typedef enum { + CMD_UNKNOWN = 0, + CMD_PUTVAL = 1, +} cmd_type_t; +#define CMD_TO_STRING(type) \ + ((type) == CMD_PUTVAL) ? "PUTVAL" \ + : "UNKNOWN" + +typedef struct { + /* The raw identifier as provided by the user. */ + char *identifier; + + /* An array of the fully parsed identifier and all value lists, and their + * options as provided by the user. */ + value_list_t *vl; + size_t vl_num; +} cmd_putval_t; + +/* + * NAME + * cmd_t + * + * DESCRIPTION + * The representation of a fully parsed command. + */ +typedef struct { + cmd_type_t type; + union { + cmd_putval_t putval; + } cmd; +} cmd_t; + +/* + * NAME + * cmd_status_t + * + * DESCRIPTION + * Status codes describing the parse result. + */ +typedef enum { + CMD_OK = 0, + CMD_ERROR = -1, + CMD_PARSE_ERROR = -2, + CMD_UNKNOWN_COMMAND = -3, +} cmd_status_t; + +/* + * NAME + * cmd_error_handler_t + * + * DESCRIPTION + * An error handler describes a callback to be invoked when the parser + * encounters an error. The user data pointer will be passed to the callback + * as the first argument. + */ +typedef struct { + void (*cb) (void *, cmd_status_t, const char *, va_list); + void *ud; +} cmd_error_handler_t; + +/* + * NAME: + * cmd_error + * + * DESCRIPTION + * Reports an error via the specified error handler (if set). + */ +void cmd_error (cmd_status_t status, cmd_error_handler_t *err, + const char *format, ...); + +/* + * NAME + * cmd_parse + * + * DESCRIPTION + * Parse a command string and populate a command object. + * + * PARAMETERS + * `buffer' The command string to be parsed. + * `ret_cmd' The parse result will be stored at this location. + * `err' An optional error handler to invoke on error. + * + * RETURN VALUE + * CMD_OK on success or the respective error code otherwise. + */ +cmd_status_t cmd_parse (char *buffer, + cmd_t *ret_cmd, cmd_error_handler_t *err); + +void cmd_destroy (cmd_t *cmd); + +/* + * NAME + * cmd_error_fh + * + * DESCRIPTION + * An error callback writing the message to an open file handle using the + * format expected by the unixsock or exec plugins. + * + * PARAMETERS + * `ud' Error handler user-data pointer. This must be an open + * file-handle (FILE *). + * `status' The error status code. + * `format' Printf-style format string. + * `ap' Variable argument list providing the arguments for the format + * string. + */ +void cmd_error_fh (void *ud, cmd_status_t status, + const char *format, va_list ap); + +#endif /* UTILS_CMDS_H */ diff --git a/src/write_kafka.c b/src/write_kafka.c index 75da6aaa..4b422310 100644 --- a/src/write_kafka.c +++ b/src/write_kafka.c @@ -172,9 +172,9 @@ static int kafka_write(const data_set_t *ds, /* {{{ */ switch (ctx->format) { case KAFKA_FORMAT_COMMAND: - status = create_putval(buffer, sizeof(buffer), ds, vl); + status = cmd_create_putval(buffer, sizeof(buffer), ds, vl); if (status != 0) { - ERROR("write_kafka plugin: create_putval failed with status %i.", + ERROR("write_kafka plugin: cmd_create_putval failed with status %i.", status); return status; } -- 2.30.2