From e43541cbedc435e38e6ed4c546960296b36e4e1e Mon Sep 17 00:00:00 2001 From: oetiker Date: Mon, 6 Oct 2008 19:05:47 +0000 Subject: [PATCH] This patch introduces "BATCH" mode. In this mode, a client can feed multiple commands to rrdcached without waiting for acknowledgement. This permits multiple commands to be sent for each read()/write(). This can dramatically increase the command throughput by increasing the amount of work done per system call. It enables over 100k updates/second with no CPU utilization due to the reduced system calls. -- kevin brintnall git-svn-id: svn://svn.oetiker.ch/rrdtool/trunk@1581 a5681a0c-68f1-0310-ab6d-d61299d08faa --- program/doc/rrdcached.pod | 26 ++++++++++++++ program/src/rrd_daemon.c | 73 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 97 insertions(+), 2 deletions(-) diff --git a/program/doc/rrdcached.pod b/program/doc/rrdcached.pod index e0571a4e..4bd1bb81 100644 --- a/program/doc/rrdcached.pod +++ b/program/doc/rrdcached.pod @@ -407,6 +407,32 @@ written out to disk. It is used during journal replay to determine which updates have already been applied. It is I valid in the journal; it is not accepted from the other command channels. +=item B + +This command initiates the bulk load of multiple commands. This is +designed for installations with extremely high update rates, since it +permits more than one command to be issued per read() and write(). + +All commands are executed just as they would be if given individually, +except for output to the user. Messages indicating success are +suppressed, and error messages are delayed until the client is finished. + +Command processing is finished when the client sends a dot (".") on its +own line. After the client has finished, the server responds with an +error count and the list of error messages (if any). Each error messages +indicates the number of the command to which it corresponds, and the error +message itself. The first user command after B is command number one. + + client: BATCH + server: 0 Go ahead. End with dot '.' on its own line. + client: UPDATE x.rrd N:1:2:3 <--- command #1 + client: UPDATE y.rrd N:3:4:5 <--- command #2 + client: and so on... + client: . + server: 2 Errors + server: 1 message for command 1 + server: 12 message for command 12 + =back =head2 Performance Values diff --git a/program/src/rrd_daemon.c b/program/src/rrd_daemon.c index 03dd181f..e2726e39 100644 --- a/program/src/rrd_daemon.c +++ b/program/src/rrd_daemon.c @@ -116,6 +116,10 @@ struct listen_socket_s int family; socket_privilege privilege; + /* state for BATCH processing */ + int batch_mode; + int batch_cmd; + /* buffered IO */ char *rbuf; off_t next_cmd; @@ -399,6 +403,7 @@ static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */ int len; if (sock == NULL) return 0; /* journal replay mode */ + if (sock->batch_mode) return 0; /* no extra info returned when in BATCH */ va_start(argp, fmt); #ifdef HAVE_VSNPRINTF @@ -446,10 +451,14 @@ static int send_response (listen_socket_t *sock, response_code rc, if (sock == NULL) return rc; /* journal replay mode */ - if (rc == RESP_OK) + if (sock->batch_mode) { - lines = count_lines(sock->wbuf); + if (rc == RESP_OK) + return rc; /* no response on success during BATCH */ + lines = sock->batch_cmd; } + else if (rc == RESP_OK) + lines = count_lines(sock->wbuf); else lines = -1; @@ -466,6 +475,10 @@ static int send_response (listen_socket_t *sock, response_code rc, len += rclen; + /* append the result to the wbuf, don't write to the user */ + if (sock->batch_mode) + return add_to_wbuf(sock, buffer, len); + /* first write must be complete */ if (len != write(sock->fd, buffer, len)) { @@ -969,6 +982,7 @@ static int handle_request_help (listen_socket_t *sock, /* {{{ */ "FLUSHALL\n" "HELP []\n" "UPDATE [ ...]\n" + "BATCH\n" "STATS\n" }; @@ -1016,6 +1030,28 @@ static int handle_request_help (listen_socket_t *sock, /* {{{ */ "a description of the values.\n" }; + char *help_batch[2] = + { + "Help for BATCH\n" + , + "The 'BATCH' command permits the client to initiate a bulk load\n" + " of commands to rrdcached.\n" + "\n" + "Usage:\n" + "\n" + " client: BATCH\n" + " server: 0 Go ahead. End with dot '.' on its own line.\n" + " client: command #1\n" + " client: command #2\n" + " client: ... and so on\n" + " client: .\n" + " server: 2 errors\n" + " server: 7 message for command #7\n" + " server: 9 message for command #9\n" + "\n" + "For more information, consult the rrdcached(1) documentation.\n" + }; + status = buffer_get_field (&buffer, &buffer_size, &command); if (status != 0) help_text = help_help; @@ -1029,6 +1065,8 @@ static int handle_request_help (listen_socket_t *sock, /* {{{ */ help_text = help_flushall; else if (strcasecmp (command, "stats") == 0) help_text = help_stats; + else if (strcasecmp (command, "batch") == 0) + help_text = help_batch; else help_text = help_help; } @@ -1303,6 +1341,30 @@ static int handle_request_wrote (const char *buffer) /* {{{ */ return (0); } /* }}} int handle_request_wrote */ +/* start "BATCH" processing */ +static int batch_start (listen_socket_t *sock) /* {{{ */ +{ + int status; + if (sock->batch_mode) + return send_response(sock, RESP_ERR, "Already in BATCH\n"); + + status = send_response(sock, RESP_OK, + "Go ahead. End with dot '.' on its own line.\n"); + sock->batch_mode = 1; + sock->batch_cmd = 0; + + return status; +} /* }}} static int batch_start */ + +/* finish "BATCH" processing and return results to the client */ +static int batch_done (listen_socket_t *sock) /* {{{ */ +{ + assert(sock->batch_mode); + sock->batch_mode = 0; + sock->batch_cmd = 0; + return send_response(sock, RESP_OK, "errors\n"); +} /* }}} static int batch_done */ + /* returns 1 if we have the required privilege level */ static int has_privilege (listen_socket_t *sock, /* {{{ */ socket_privilege priv) @@ -1335,6 +1397,9 @@ static int handle_request (listen_socket_t *sock, /* {{{ */ return (-1); } + if (sock != NULL && sock->batch_mode) + sock->batch_cmd++; + if (strcasecmp (command, "update") == 0) { status = has_privilege(sock, PRIV_HIGH); @@ -1366,6 +1431,10 @@ static int handle_request (listen_socket_t *sock, /* {{{ */ return (handle_request_stats (sock)); else if (strcasecmp (command, "help") == 0) return (handle_request_help (sock, buffer_ptr, buffer_size)); + else if (strcasecmp (command, "batch") == 0 && sock != NULL) + return batch_start(sock); + else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_mode) + return batch_done(sock); else return send_response(sock, RESP_ERR, "Unknown command: %s\n", command); -- 2.30.2