summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: c330990)
raw | patch | inline | side by side (parent: c330990)
author | Florian Forster <octo@collectd.org> | |
Mon, 3 Sep 2012 06:22:16 +0000 (08:22 +0200) | ||
committer | Florian Forster <octo@collectd.org> | |
Mon, 3 Sep 2012 06:22:16 +0000 (08:22 +0200) |
The connecting code has been broken out in separate functions and the
writing and reading from the socket no longer uses poll(2),
non-blocking I/O and a custom built retry logic. Instead block on I/O and
let the read-thread-pool do its thing.
writing and reading from the socket no longer uses poll(2),
non-blocking I/O and a custom built retry logic. Instead block on I/O and
let the read-thread-pool do its thing.
src/memcached.c | patch | blob | history |
diff --git a/src/memcached.c b/src/memcached.c
index b96845eaf6beb42ce7a4329a2e0f91988b0b70aa..2df28877252a2d3d0ec95a4dbd7675ad38c159a8 100644 (file)
--- a/src/memcached.c
+++ b/src/memcached.c
/**
* collectd - src/memcached.c, based on src/hddtemp.c
* Copyright (C) 2007 Antony Dovgal
- * Copyright (C) 2007-2010 Florian Forster
+ * Copyright (C) 2007-2012 Florian Forster
* Copyright (C) 2009 Doug MacEachern
* Copyright (C) 2009 Franck Lombardi
* Copyright (C) 2012 Nicolas Szalay
#include "plugin.h"
#include "configfile.h"
-# include <poll.h>
-# include <netdb.h>
-# include <sys/socket.h>
-# include <sys/un.h>
-# include <netinet/in.h>
-# include <netinet/tcp.h>
-
-/* Hack to work around the missing define in AIX */
-#ifndef MSG_DONTWAIT
-# define MSG_DONTWAIT MSG_NONBLOCK
-#endif
+#include <netdb.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
#define MEMCACHED_DEF_HOST "127.0.0.1"
#define MEMCACHED_DEF_PORT "11211"
-#define MEMCACHED_RETRY_COUNT 100
-
struct memcached_s
{
char *name;
sfree (st->port);
}
-static int memcached_query_daemon (char *buffer, int buffer_size, user_data_t *user_data)
+static int memcached_connect_unix (memcached_t *st)
{
- int fd = -1;
- ssize_t status;
- int buffer_fill;
- int i = 0;
+ struct sockaddr_un serv_addr;
+ int fd;
- memcached_t *st;
- st = user_data->data;
- if (st->socket != NULL)
+ memset (&serv_addr, 0, sizeof (serv_addr));
+ serv_addr.sun_family = AF_UNIX;
+ sstrncpy (serv_addr.sun_path, st->socket,
+ sizeof (serv_addr.sun_path));
+
+ /* create our socket descriptor */
+ fd = socket (AF_UNIX, SOCK_STREAM, 0);
+ if (fd < 0)
{
- struct sockaddr_un serv_addr;
-
- memset (&serv_addr, 0, sizeof (serv_addr));
- serv_addr.sun_family = AF_UNIX;
- sstrncpy (serv_addr.sun_path, st->socket,
- sizeof (serv_addr.sun_path));
-
- /* create our socket descriptor */
- fd = socket (AF_UNIX, SOCK_STREAM, 0);
- if (fd < 0) {
- char errbuf[1024];
- ERROR ("memcached: unix socket: %s", sstrerror (errno, errbuf,
- sizeof (errbuf)));
- return -1;
- }
+ char errbuf[1024];
+ ERROR ("memcached: memcached_connect_unix: socket(2) failed: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ return (-1);
}
- else
- {
- const char *host;
- const char *port;
- struct addrinfo ai_hints;
- struct addrinfo *ai_list, *ai_ptr;
- int ai_return = 0;
+ return (fd);
+} /* int memcached_connect_unix */
+
+static int memcached_connect_inet (memcached_t *st)
+{
+ char *host;
+ char *port;
- memset (&ai_hints, '\0', sizeof (ai_hints));
- ai_hints.ai_flags = 0;
+ struct addrinfo ai_hints;
+ struct addrinfo *ai_list, *ai_ptr;
+ int status;
+ int fd = -1;
+
+ memset (&ai_hints, 0, sizeof (ai_hints));
+ ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
- ai_hints.ai_flags |= AI_ADDRCONFIG;
+ ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
- ai_hints.ai_family = AF_UNSPEC;
- ai_hints.ai_socktype = SOCK_STREAM;
- ai_hints.ai_protocol = 0;
+ ai_hints.ai_family = AF_UNSPEC;
+ ai_hints.ai_socktype = SOCK_STREAM;
+ ai_hints.ai_protocol = 0;
- host = (st->host != NULL) ? st->host : MEMCACHED_DEF_HOST;
- port = (st->port != NULL) ? st->port : MEMCACHED_DEF_PORT;
+ host = (st->host != NULL) ? st->host : MEMCACHED_DEF_HOST;
+ port = (st->port != NULL) ? st->port : MEMCACHED_DEF_PORT;
- if ((ai_return = getaddrinfo (host, port, &ai_hints, &ai_list)) != 0) {
+ ai_list = NULL;
+ status = getaddrinfo (host, port, &ai_hints, &ai_list);
+ if (status != 0)
+ {
+ char errbuf[1024];
+ ERROR ("memcached: memcached_connect_inet: getaddrinfo(%s,%s) failed: %s",
+ host, port,
+ (status == EAI_SYSTEM)
+ ? sstrerror (errno, errbuf, sizeof (errbuf))
+ : gai_strerror (status));
+ return (-1);
+ }
+
+ for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
+ {
+ /* create our socket descriptor */
+ fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
+ if (fd < 0)
+ {
char errbuf[1024];
- ERROR ("memcached: getaddrinfo (%s, %s): %s",
- host, port,
- (ai_return == EAI_SYSTEM)
- ? sstrerror (errno, errbuf, sizeof (errbuf))
- : gai_strerror (ai_return));
- return -1;
+ WARNING ("memcached: memcached_connect_inet: socket(2) failed: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ continue;
}
- for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) {
- /* create our socket descriptor */
- fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
- if (fd < 0) {
- char errbuf[1024];
- ERROR ("memcached: socket: %s", sstrerror (errno, errbuf, sizeof (errbuf)));
- continue;
- }
-
- /* connect to the memcached daemon */
- status = (ssize_t) connect (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
- if (status != 0) {
- shutdown (fd, SHUT_RDWR);
- close (fd);
- fd = -1;
- continue;
- }
-
- /* A socket could be opened and connecting succeeded. We're
- * done. */
- break;
+ /* connect to the memcached daemon */
+ status = (int) connect (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
+ if (status != 0)
+ {
+ shutdown (fd, SHUT_RDWR);
+ close (fd);
+ fd = -1;
+ continue;
}
- freeaddrinfo (ai_list);
+ /* A socket could be opened and connecting succeeded. We're done. */
+ break;
}
+ freeaddrinfo (ai_list);
+ return (fd);
+} /* int memcached_connect_inet */
+
+static int memcached_connect (memcached_t *st)
+{
+ if (st->socket != NULL)
+ return (memcached_connect_unix (st));
+ else
+ return (memcached_connect_inet (st));
+}
+
+static int memcached_query_daemon (char *buffer, size_t buffer_size, memcached_t *st)
+{
+ int fd = -1;
+ int status;
+ size_t buffer_fill;
+
+ fd = memcached_connect (st);
if (fd < 0) {
ERROR ("memcached: Could not connect to daemon.");
return -1;
}
- if (send(fd, "stats\r\n", sizeof("stats\r\n") - 1, MSG_DONTWAIT) != (sizeof("stats\r\n") - 1)) {
- ERROR ("memcached: Could not send command to the memcached daemon.");
- return -1;
- }
-
+ status = (int) swrite (fd, "stats\r\n", strlen ("stats\r\n"));
+ if (status != 0)
{
- struct pollfd p;
- int status;
-
- memset (&p, 0, sizeof (p));
- p.fd = fd;
- p.events = POLLIN | POLLERR | POLLHUP;
- p.revents = 0;
-
- status = poll (&p, /* nfds = */ 1,
- /* timeout = */ CDTIME_T_TO_MS (interval_g));
- if (status <= 0)
- {
- if (status == 0)
- {
- ERROR ("memcached: poll(2) timed out after %.3f seconds.",
- CDTIME_T_TO_DOUBLE (interval_g));
- }
- else
- {
- char errbuf[1024];
- ERROR ("memcached: poll(2) failed: %s",
- sstrerror (errno, errbuf, sizeof (errbuf)));
- }
- shutdown (fd, SHUT_RDWR);
- close (fd);
- return (-1);
- }
+ char errbuf[1024];
+ ERROR ("memcached plugin: write(2) failed: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ shutdown(fd, SHUT_RDWR);
+ close (fd);
+ return (-1);
}
/* receive data from the memcached daemon */
- memset (buffer, '\0', buffer_size);
+ memset (buffer, 0, buffer_size);
buffer_fill = 0;
- while ((status = recv (fd, buffer + buffer_fill, buffer_size - buffer_fill, MSG_DONTWAIT)) != 0) {
- if (i > MEMCACHED_RETRY_COUNT) {
- ERROR("recv() timed out");
- break;
- }
- i++;
-
- if (status == -1) {
+ while ((status = (int) recv (fd, buffer + buffer_fill,
+ buffer_size - buffer_fill, /* flags = */ 0)) != 0)
+ {
+ char const end_token[5] = {'E', 'N', 'D', '\r', '\n'};
+ if (status < 0)
+ {
char errbuf[1024];
- if (errno == EAGAIN) {
- continue;
- }
+ if ((errno == EAGAIN) || (errno == EINTR))
+ continue;
ERROR ("memcached: Error reading from socket: %s",
sstrerror (errno, errbuf, sizeof (errbuf)));
shutdown(fd, SHUT_RDWR);
close (fd);
- return -1;
+ return (-1);
}
- buffer_fill += status;
- if (buffer_fill > 3 && buffer[buffer_fill-5] == 'E' && buffer[buffer_fill-4] == 'N' && buffer[buffer_fill-3] == 'D') {
- /* we got all the data */
+ buffer_fill += (size_t) status;
+ if (buffer_fill > buffer_size)
+ {
+ buffer_fill = buffer_size;
+ WARNING ("memcached plugin: Message was truncated.");
break;
}
- }
- if (buffer_fill >= buffer_size) {
- buffer[buffer_size - 1] = '\0';
- WARNING ("memcached: Message from memcached has been truncated.");
- } else if (buffer_fill == 0) {
- WARNING ("memcached: Peer has unexpectedly shut down the socket. "
- "Buffer: `%s'", buffer);
- shutdown(fd, SHUT_RDWR);
- close(fd);
- return -1;
+ /* If buffer ends in end_token, we have all the data. */
+ if (memcmp (buffer + buffer_fill - sizeof (end_token),
+ end_token, sizeof (end_token)) == 0)
+ break;
+ } /* while (recv) */
+
+ status = 0;
+ if (buffer_fill == 0)
+ {
+ WARNING ("memcached plugin: No data returned by memcached.");
+ status = -1;
}
shutdown(fd, SHUT_RDWR);
close(fd);
- return 0;
-}
+ return (status);
+} /* int memcached_query_daemon */
static int memcached_add_read_callback (memcached_t *st)
{
st = user_data->data;
/* get data from daemon */
- if (memcached_query_daemon (buf, sizeof (buf), user_data) < 0) {
+ if (memcached_query_daemon (buf, sizeof (buf), st) < 0) {
return -1;
}
}
return 0;
-}
+} /* int memcached_read */
void module_register (void)
{