From dca41ee04eef1ecacf75350a5fcc64c7db463618 Mon Sep 17 00:00:00 2001 From: oetiker Date: Sun, 14 Sep 2008 15:28:34 +0000 Subject: [PATCH] The previous code was broken: The response was read using `read(2)'. If the server wasn't sending fast enough, the client would stop reading before the entire message had been read. This patch changes the communication code to use the (line based) `fgets' function rather than the lower level `read' function. After reading the first line (which contains the total number of line to be expected), this precise number of lines is read - blocking if necessary. Also, the missing four new statistic values have been added to `rrdc_stats_get'. --Folorian Forester git-svn-id: svn://svn.oetiker.ch/rrdtool/trunk@1506 a5681a0c-68f1-0310-ab6d-d61299d08faa --- program/src/rrd_client.c | 443 ++++++++++++++++----------------------- 1 file changed, 177 insertions(+), 266 deletions(-) diff --git a/program/src/rrd_client.c b/program/src/rrd_client.c index d1ad5e06..11fb80dc 100644 --- a/program/src/rrd_client.c +++ b/program/src/rrd_client.c @@ -48,84 +48,28 @@ typedef struct rrdc_response_s rrdc_response_t; static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; static int sd = -1; +static FILE *sh = NULL; static char *sd_path = NULL; /* cache the path for sd */ -static void _disconnect(void); -static ssize_t sread (void *buffer_void, size_t buffer_size) /* {{{ */ +/* One must hold `lock' when calling `close_connection'. */ +static void close_connection (void) /* {{{ */ { - char *buffer; - size_t buffer_used; - size_t buffer_free; - ssize_t status; - - buffer = (char *) buffer_void; - buffer_used = 0; - buffer_free = buffer_size; - - while (buffer_free > 0) + if (sh != NULL) { - status = read (sd, buffer + buffer_used, buffer_free); - if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR))) - continue; - - if (status < 0) - return (-1); - - if (status == 0) - { - _disconnect(); - errno = EPROTO; - return (-1); - } - - assert ((0 > status) || (buffer_free >= (size_t) status)); - - buffer_free -= status; - buffer_used += status; - - if (buffer[buffer_used - 1] == '\n') - break; + fclose (sh); + sh = NULL; + sd = -1; } - - if (buffer[buffer_used - 1] != '\n') + else if (sd >= 0) { - errno = ENOBUFS; - return (-1); + close (sd); + sd = -1; } - buffer[buffer_used - 1] = '\0'; - return (buffer_used); -} /* }}} ssize_t sread */ - -static ssize_t swrite (const void *buf, size_t count) /* {{{ */ -{ - const char *ptr; - size_t nleft; - ssize_t status; - - ptr = (const char *) buf; - nleft = count; - - while (nleft > 0) - { - status = write (sd, (const void *) ptr, nleft); - - if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR))) - continue; - - if (status < 0) - { - _disconnect(); - rrd_set_error("lost connection to rrdcached"); - return (status); - } - - nleft -= status; - ptr += status; - } - - return (0); -} /* }}} ssize_t swrite */ + if (sd_path != NULL) + free (sd_path); + sd_path = NULL; +} /* }}} void close_connection */ static int buffer_add_string (const char *str, /* {{{ */ char **buffer_ret, size_t *buffer_size_ret) @@ -192,103 +136,151 @@ static int buffer_add_value (const char *value, /* {{{ */ return (buffer_add_string (temp, buffer_ret, buffer_size_ret)); } /* }}} int buffer_add_value */ -static int response_parse (char *buffer, size_t buffer_size, /* {{{ */ - rrdc_response_t **ret_response) +/* Remove trailing newline (NL) and carriage return (CR) characters. Similar to + * the Perl function `chomp'. Returns the number of characters that have been + * removed. */ +static int chomp (char *str) /* {{{ */ { - rrdc_response_t *ret; + size_t len; + int removed; + + if (str == NULL) + return (-1); + + len = strlen (str); + removed = 0; + while ((len > 0) && ((str[len - 1] == '\n') || (str[len - 1] == '\r'))) + { + str[len - 1] = 0; + len--; + removed++; + } + + return (removed); +} /* }}} int chomp */ + +static void response_free (rrdc_response_t *res) /* {{{ */ +{ + if (res == NULL) + return; + + if (res->lines != NULL) + { + size_t i; + + for (i = 0; i < res->lines_num; i++) + if (res->lines[i] != NULL) + free (res->lines[i]); + free (res->lines); + } + + free (res); +} /* }}} void response_free */ - char *dummy; - char *saveptr; +static int response_read (rrdc_response_t **ret_response) /* {{{ */ +{ + rrdc_response_t *ret; - char *line_ptr; - size_t line_counter; + char buffer[4096]; + char *buffer_ptr; - if (buffer == NULL) - return (EINVAL); - if (buffer_size <= 0) - return (EINVAL); + size_t i; - if (buffer[buffer_size - 1] != 0) + if (sh == NULL) return (-1); ret = (rrdc_response_t *) malloc (sizeof (rrdc_response_t)); if (ret == NULL) - return (ENOMEM); + return (-2); memset (ret, 0, sizeof (*ret)); + ret->lines = NULL; + ret->lines_num = 0; + + buffer_ptr = fgets (buffer, sizeof (buffer), sh); + if (buffer_ptr == NULL) + return (-3); + chomp (buffer); + + ret->status = strtol (buffer, &ret->message, 0); + if (buffer == ret->message) + { + response_free (ret); + return (-4); + } + /* Skip leading whitespace of the status message */ + ret->message += strspn (ret->message, " \t"); - line_counter = 0; + if (ret->status <= 0) + { + *ret_response = ret; + return (0); + } - dummy = buffer; - saveptr = NULL; - while ((line_ptr = strtok_r (dummy, "\r\n", &saveptr)) != NULL) + ret->lines = (char **) malloc (sizeof (char *) * ret->status); + if (ret->lines == NULL) { - dummy = NULL; + response_free (ret); + return (-5); + } + memset (ret->lines, 0, sizeof (char *) * ret->status); + ret->lines_num = (size_t) ret->status; - if (ret->message == NULL) + for (i = 0; i < ret->lines_num; i++) + { + buffer_ptr = fgets (buffer, sizeof (buffer), sh); + if (buffer_ptr == NULL) { - ret->status = strtol (buffer, &ret->message, 0); - if (buffer == ret->message) - { - free (ret); - return (EPROTO); - } - - /* Skip leading whitespace of the status message */ - ret->message += strspn (ret->message, " \t"); - - if (ret->status > 0) - { - ret->lines = (char **) malloc (sizeof (char *) * ret->status); - if (ret->lines == NULL) - { - free (ret); - return (ENOMEM); - } - memset (ret->lines, 0, sizeof (char *) * ret->status); - ret->lines_num = (size_t) ret->status; - } - else - { - ret->lines = NULL; - ret->lines_num = 0; - } + response_free (ret); + return (-6); } - else /* if (ret->message != NULL) */ + chomp (buffer); + + ret->lines[i] = strdup (buffer); + if (ret->lines[i] == NULL) { - if (line_counter < ret->lines_num) - ret->lines[line_counter] = line_ptr; - line_counter++; + response_free (ret); + return (-7); } - } /* while (strtok_r) */ - - if (ret->lines_num != line_counter) - { - errno = EPROTO; - if (ret->lines != NULL) - free (ret->lines); - free (ret); - return (-1); } *ret_response = ret; return (0); -} /* }}} int response_parse */ +} /* }}} rrdc_response_t *response_read */ -static void response_free (rrdc_response_t *res) /* {{{ */ +static int request (const char *buffer, size_t buffer_size, /* {{{ */ + rrdc_response_t **ret_response) { - if (res == NULL) - return; + int status; + rrdc_response_t *res; - if (res->lines != NULL) + pthread_mutex_lock (&lock); + + if (sh == NULL) { - res->lines_num = 0; - free (res->lines); - res->lines = NULL; + pthread_mutex_unlock (&lock); + return (ENOTCONN); } - free (res); -} /* }}} void response_free */ + status = (int) fwrite (buffer, buffer_size, /* nmemb = */ 1, sh); + if (status != 1) + { + close_connection (); + pthread_mutex_unlock (&lock); + return (-1); + } + fflush (sh); + res = NULL; + status = response_read (&res); + + pthread_mutex_unlock (&lock); + + if (status != 0) + return (status); + + *ret_response = res; + return (0); +} /* }}} int request */ /* determine whether we are connected to the specified daemon_addr if * NULL, return whether we are connected at all @@ -340,6 +332,15 @@ static int rrdc_connect_unix (const char *path) /* {{{ */ if (status != 0) { status = errno; + close_connection (); + return (status); + } + + sh = fdopen (sd, "r+"); + if (sh == NULL) + { + status = errno; + close_connection (); return (status); } @@ -383,7 +384,15 @@ static int rrdc_connect_network (const char *addr) /* {{{ */ if (status != 0) { status = errno; - _disconnect(); + close_connection(); + continue; + } + + sh = fdopen (sd, "r+"); + if (sh == NULL) + { + status = errno; + close_connection (); continue; } @@ -414,7 +423,7 @@ int rrdc_connect (const char *addr) /* {{{ */ } else { - _disconnect(); + close_connection(); } if (strncmp ("unix:", addr, strlen ("unix:")) == 0) @@ -436,23 +445,11 @@ int rrdc_connect (const char *addr) /* {{{ */ return (status); } /* }}} int rrdc_connect */ -static void _disconnect(void) /* {{{ */ -{ - if (sd >= 0) - close(sd); - - if (sd_path != NULL) - free(sd_path); - - sd = -1; - sd_path = NULL; -} /* }}} static void _disconnect(void) */ - int rrdc_disconnect (void) /* {{{ */ { pthread_mutex_lock (&lock); - _disconnect(); + close_connection(); pthread_mutex_unlock (&lock); @@ -466,6 +463,7 @@ int rrdc_update (const char *filename, int values_num, /* {{{ */ char *buffer_ptr; size_t buffer_free; size_t buffer_size; + rrdc_response_t *res; int status; int i; @@ -493,37 +491,14 @@ int rrdc_update (const char *filename, int values_num, /* {{{ */ assert (buffer[buffer_size - 1] == ' '); buffer[buffer_size - 1] = '\n'; - pthread_mutex_lock (&lock); - - if (sd < 0) - { - pthread_mutex_unlock (&lock); - return (ENOTCONN); - } - - status = swrite (buffer, buffer_size); + res = NULL; + status = request (buffer, buffer_size, &res); if (status != 0) - { - pthread_mutex_unlock (&lock); - return (status); - } - - status = sread (buffer, sizeof (buffer)); - if (status < 0) - { - status = errno; - pthread_mutex_unlock (&lock); return (status); - } - else if (status == 0) - { - pthread_mutex_unlock (&lock); - return (ENODATA); - } - pthread_mutex_unlock (&lock); + status = res->status; + response_free (res); - status = atoi (buffer); return (status); } /* }}} int rrdc_update */ @@ -533,6 +508,7 @@ int rrdc_flush (const char *filename) /* {{{ */ char *buffer_ptr; size_t buffer_free; size_t buffer_size; + rrdc_response_t *res; int status; if (filename == NULL) @@ -555,42 +531,18 @@ int rrdc_flush (const char *filename) /* {{{ */ assert (buffer[buffer_size - 1] == ' '); buffer[buffer_size - 1] = '\n'; - pthread_mutex_lock (&lock); - - if (sd < 0) - { - pthread_mutex_unlock (&lock); - return (ENOTCONN); - } - - status = swrite (buffer, buffer_size); + res = NULL; + status = request (buffer, buffer_size, &res); if (status != 0) - { - pthread_mutex_unlock (&lock); return (status); - } - status = sread (buffer, sizeof (buffer)); - if (status < 0) - { - status = errno; - pthread_mutex_unlock (&lock); - return (status); - } - else if (status == 0) - { - pthread_mutex_unlock (&lock); - return (ENODATA); - } - - pthread_mutex_unlock (&lock); + status = res->status; + response_free (res); - status = atoi (buffer); return (status); } /* }}} int rrdc_flush */ - /* convenience function; if there is a daemon specified, or if we can * detect one from the environment, then flush the file. Otherwise, no-op */ @@ -619,21 +571,11 @@ int rrdc_stats_get (rrdc_stats_t **ret_stats) /* {{{ */ rrdc_stats_t *head; rrdc_stats_t *tail; - rrdc_response_t *response; + rrdc_response_t *res; - char buffer[4096]; - size_t buffer_size; int status; size_t i; - pthread_mutex_lock (&lock); - - if (sd < 0) - { - pthread_mutex_unlock (&lock); - return (ENOTCONN); - } - /* Protocol example: {{{ * -> STATS * <- 5 Statistics follow @@ -643,63 +585,28 @@ int rrdc_stats_get (rrdc_stats_t **ret_stats) /* {{{ */ * <- TreeNodesNumber: 0 * <- TreeDepth: 0 * }}} */ - status = swrite ("STATS\n", strlen ("STATS\n")); - if (status != 0) - { - pthread_mutex_unlock (&lock); - return (status); - } - - status = sread (buffer, sizeof (buffer)); - if (status < 0) - { - status = errno; - pthread_mutex_unlock (&lock); - return (status); - } - else if (status == 0) - { - pthread_mutex_unlock (&lock); - return (ENODATA); - } - - pthread_mutex_unlock (&lock); - - /* Assert NULL termination */ - buffer_size = (size_t) status; - if (buffer[buffer_size - 1] != 0) - { - if (buffer_size < sizeof (buffer)) - { - buffer[buffer_size] = 0; - buffer_size++; - } - else - { - return (ENOBUFS); - } - } - status = response_parse (buffer, buffer_size, &response); + res = NULL; + status = request ("STATS\n", strlen ("STATS\n"), &res); if (status != 0) return (status); - if (response->status <= 0) + if (res->status <= 0) { - response_free (response); + response_free (res); return (EIO); } head = NULL; tail = NULL; - for (i = 0; i < response->lines_num; i++) + for (i = 0; i < res->lines_num; i++) { char *key; char *value; char *endptr; rrdc_stats_t *s; - key = response->lines[i]; + key = res->lines[i]; value = strchr (key, ':'); if (value == NULL) continue; @@ -718,14 +625,18 @@ int rrdc_stats_get (rrdc_stats_t **ret_stats) /* {{{ */ endptr = NULL; if ((strcmp ("QueueLength", key) == 0) - || (strcmp ("TreeNodesNumber", key) == 0) - || (strcmp ("TreeDepth", key) == 0)) + || (strcmp ("TreeDepth", key) == 0) + || (strcmp ("TreeNodesNumber", key) == 0)) { s->type = RRDC_STATS_TYPE_GAUGE; s->value.gauge = strtod (value, &endptr); } - else if ((strcmp ("UpdatesWritten", key) == 0) - || (strcmp ("DataSetsWritten", key) == 0)) + else if ((strcmp ("DataSetsWritten", key) == 0) + || (strcmp ("FlushesReceived", key) == 0) + || (strcmp ("JournalBytes", key) == 0) + || (strcmp ("JournalRotate", key) == 0) + || (strcmp ("UpdatesReceived", key) == 0) + || (strcmp ("UpdatesWritten", key) == 0)) { s->type = RRDC_STATS_TYPE_COUNTER; s->value.counter = (uint64_t) strtoll (value, &endptr, /* base = */ 0); @@ -754,9 +665,9 @@ int rrdc_stats_get (rrdc_stats_t **ret_stats) /* {{{ */ tail->next = s; tail = s; } - } /* for (i = 0; i < response->lines_num; i++) */ + } /* for (i = 0; i < res->lines_num; i++) */ - response_free (response); + response_free (res); if (head == NULL) return (EPROTO); -- 2.39.5