From 5e505d5063b668b17ac84f1ad474b0e3fc338818 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Mon, 3 Sep 2012 08:22:16 +0200 Subject: [PATCH] memcached plugin: Refactor the memcached_query_daemon() function. The connecting code has been broken out in separate functions and the writing and reading from the socket no longer uses poll(2), non-blocking I/O and a custom built retry logic. Instead block on I/O and let the read-thread-pool do its thing. --- src/memcached.c | 273 +++++++++++++++++++++++------------------------- 1 file changed, 130 insertions(+), 143 deletions(-) diff --git a/src/memcached.c b/src/memcached.c index b96845ea..2df28877 100644 --- a/src/memcached.c +++ b/src/memcached.c @@ -1,7 +1,7 @@ /** * collectd - src/memcached.c, based on src/hddtemp.c * Copyright (C) 2007 Antony Dovgal - * Copyright (C) 2007-2010 Florian Forster + * Copyright (C) 2007-2012 Florian Forster * Copyright (C) 2009 Doug MacEachern * Copyright (C) 2009 Franck Lombardi * Copyright (C) 2012 Nicolas Szalay @@ -33,23 +33,15 @@ #include "plugin.h" #include "configfile.h" -# include -# include -# include -# include -# include -# include - -/* Hack to work around the missing define in AIX */ -#ifndef MSG_DONTWAIT -# define MSG_DONTWAIT MSG_NONBLOCK -#endif +#include +#include +#include +#include +#include #define MEMCACHED_DEF_HOST "127.0.0.1" #define MEMCACHED_DEF_PORT "11211" -#define MEMCACHED_RETRY_COUNT 100 - struct memcached_s { char *name; @@ -74,177 +66,172 @@ static void memcached_free (memcached_t *st) sfree (st->port); } -static int memcached_query_daemon (char *buffer, int buffer_size, user_data_t *user_data) +static int memcached_connect_unix (memcached_t *st) { - int fd = -1; - ssize_t status; - int buffer_fill; - int i = 0; + struct sockaddr_un serv_addr; + int fd; - memcached_t *st; - st = user_data->data; - if (st->socket != NULL) + memset (&serv_addr, 0, sizeof (serv_addr)); + serv_addr.sun_family = AF_UNIX; + sstrncpy (serv_addr.sun_path, st->socket, + sizeof (serv_addr.sun_path)); + + /* create our socket descriptor */ + fd = socket (AF_UNIX, SOCK_STREAM, 0); + if (fd < 0) { - struct sockaddr_un serv_addr; - - memset (&serv_addr, 0, sizeof (serv_addr)); - serv_addr.sun_family = AF_UNIX; - sstrncpy (serv_addr.sun_path, st->socket, - sizeof (serv_addr.sun_path)); - - /* create our socket descriptor */ - fd = socket (AF_UNIX, SOCK_STREAM, 0); - if (fd < 0) { - char errbuf[1024]; - ERROR ("memcached: unix socket: %s", sstrerror (errno, errbuf, - sizeof (errbuf))); - return -1; - } + char errbuf[1024]; + ERROR ("memcached: memcached_connect_unix: socket(2) failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + return (-1); } - else - { - const char *host; - const char *port; - struct addrinfo ai_hints; - struct addrinfo *ai_list, *ai_ptr; - int ai_return = 0; + return (fd); +} /* int memcached_connect_unix */ + +static int memcached_connect_inet (memcached_t *st) +{ + char *host; + char *port; - memset (&ai_hints, '\0', sizeof (ai_hints)); - ai_hints.ai_flags = 0; + struct addrinfo ai_hints; + struct addrinfo *ai_list, *ai_ptr; + int status; + int fd = -1; + + memset (&ai_hints, 0, sizeof (ai_hints)); + ai_hints.ai_flags = 0; #ifdef AI_ADDRCONFIG - ai_hints.ai_flags |= AI_ADDRCONFIG; + ai_hints.ai_flags |= AI_ADDRCONFIG; #endif - ai_hints.ai_family = AF_UNSPEC; - ai_hints.ai_socktype = SOCK_STREAM; - ai_hints.ai_protocol = 0; + ai_hints.ai_family = AF_UNSPEC; + ai_hints.ai_socktype = SOCK_STREAM; + ai_hints.ai_protocol = 0; - host = (st->host != NULL) ? st->host : MEMCACHED_DEF_HOST; - port = (st->port != NULL) ? st->port : MEMCACHED_DEF_PORT; + host = (st->host != NULL) ? st->host : MEMCACHED_DEF_HOST; + port = (st->port != NULL) ? st->port : MEMCACHED_DEF_PORT; - if ((ai_return = getaddrinfo (host, port, &ai_hints, &ai_list)) != 0) { + ai_list = NULL; + status = getaddrinfo (host, port, &ai_hints, &ai_list); + if (status != 0) + { + char errbuf[1024]; + ERROR ("memcached: memcached_connect_inet: getaddrinfo(%s,%s) failed: %s", + host, port, + (status == EAI_SYSTEM) + ? sstrerror (errno, errbuf, sizeof (errbuf)) + : gai_strerror (status)); + return (-1); + } + + for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) + { + /* create our socket descriptor */ + fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol); + if (fd < 0) + { char errbuf[1024]; - ERROR ("memcached: getaddrinfo (%s, %s): %s", - host, port, - (ai_return == EAI_SYSTEM) - ? sstrerror (errno, errbuf, sizeof (errbuf)) - : gai_strerror (ai_return)); - return -1; + WARNING ("memcached: memcached_connect_inet: socket(2) failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + continue; } - for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) { - /* create our socket descriptor */ - fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol); - if (fd < 0) { - char errbuf[1024]; - ERROR ("memcached: socket: %s", sstrerror (errno, errbuf, sizeof (errbuf))); - continue; - } - - /* connect to the memcached daemon */ - status = (ssize_t) connect (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen); - if (status != 0) { - shutdown (fd, SHUT_RDWR); - close (fd); - fd = -1; - continue; - } - - /* A socket could be opened and connecting succeeded. We're - * done. */ - break; + /* connect to the memcached daemon */ + status = (int) connect (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen); + if (status != 0) + { + shutdown (fd, SHUT_RDWR); + close (fd); + fd = -1; + continue; } - freeaddrinfo (ai_list); + /* A socket could be opened and connecting succeeded. We're done. */ + break; } + freeaddrinfo (ai_list); + return (fd); +} /* int memcached_connect_inet */ + +static int memcached_connect (memcached_t *st) +{ + if (st->socket != NULL) + return (memcached_connect_unix (st)); + else + return (memcached_connect_inet (st)); +} + +static int memcached_query_daemon (char *buffer, size_t buffer_size, memcached_t *st) +{ + int fd = -1; + int status; + size_t buffer_fill; + + fd = memcached_connect (st); if (fd < 0) { ERROR ("memcached: Could not connect to daemon."); return -1; } - if (send(fd, "stats\r\n", sizeof("stats\r\n") - 1, MSG_DONTWAIT) != (sizeof("stats\r\n") - 1)) { - ERROR ("memcached: Could not send command to the memcached daemon."); - return -1; - } - + status = (int) swrite (fd, "stats\r\n", strlen ("stats\r\n")); + if (status != 0) { - struct pollfd p; - int status; - - memset (&p, 0, sizeof (p)); - p.fd = fd; - p.events = POLLIN | POLLERR | POLLHUP; - p.revents = 0; - - status = poll (&p, /* nfds = */ 1, - /* timeout = */ CDTIME_T_TO_MS (interval_g)); - if (status <= 0) - { - if (status == 0) - { - ERROR ("memcached: poll(2) timed out after %.3f seconds.", - CDTIME_T_TO_DOUBLE (interval_g)); - } - else - { - char errbuf[1024]; - ERROR ("memcached: poll(2) failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - } - shutdown (fd, SHUT_RDWR); - close (fd); - return (-1); - } + char errbuf[1024]; + ERROR ("memcached plugin: write(2) failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + shutdown(fd, SHUT_RDWR); + close (fd); + return (-1); } /* receive data from the memcached daemon */ - memset (buffer, '\0', buffer_size); + memset (buffer, 0, buffer_size); buffer_fill = 0; - while ((status = recv (fd, buffer + buffer_fill, buffer_size - buffer_fill, MSG_DONTWAIT)) != 0) { - if (i > MEMCACHED_RETRY_COUNT) { - ERROR("recv() timed out"); - break; - } - i++; - - if (status == -1) { + while ((status = (int) recv (fd, buffer + buffer_fill, + buffer_size - buffer_fill, /* flags = */ 0)) != 0) + { + char const end_token[5] = {'E', 'N', 'D', '\r', '\n'}; + if (status < 0) + { char errbuf[1024]; - if (errno == EAGAIN) { - continue; - } + if ((errno == EAGAIN) || (errno == EINTR)) + continue; ERROR ("memcached: Error reading from socket: %s", sstrerror (errno, errbuf, sizeof (errbuf))); shutdown(fd, SHUT_RDWR); close (fd); - return -1; + return (-1); } - buffer_fill += status; - if (buffer_fill > 3 && buffer[buffer_fill-5] == 'E' && buffer[buffer_fill-4] == 'N' && buffer[buffer_fill-3] == 'D') { - /* we got all the data */ + buffer_fill += (size_t) status; + if (buffer_fill > buffer_size) + { + buffer_fill = buffer_size; + WARNING ("memcached plugin: Message was truncated."); break; } - } - if (buffer_fill >= buffer_size) { - buffer[buffer_size - 1] = '\0'; - WARNING ("memcached: Message from memcached has been truncated."); - } else if (buffer_fill == 0) { - WARNING ("memcached: Peer has unexpectedly shut down the socket. " - "Buffer: `%s'", buffer); - shutdown(fd, SHUT_RDWR); - close(fd); - return -1; + /* If buffer ends in end_token, we have all the data. */ + if (memcmp (buffer + buffer_fill - sizeof (end_token), + end_token, sizeof (end_token)) == 0) + break; + } /* while (recv) */ + + status = 0; + if (buffer_fill == 0) + { + WARNING ("memcached plugin: No data returned by memcached."); + status = -1; } shutdown(fd, SHUT_RDWR); close(fd); - return 0; -} + return (status); +} /* int memcached_query_daemon */ static int memcached_add_read_callback (memcached_t *st) { @@ -508,7 +495,7 @@ static int memcached_read (user_data_t *user_data) st = user_data->data; /* get data from daemon */ - if (memcached_query_daemon (buf, sizeof (buf), user_data) < 0) { + if (memcached_query_daemon (buf, sizeof (buf), st) < 0) { return -1; } @@ -645,7 +632,7 @@ static int memcached_read (user_data_t *user_data) } return 0; -} +} /* int memcached_read */ void module_register (void) { -- 2.30.2