From: Florian Forster Date: Sat, 6 Sep 2014 08:58:45 +0000 (+0200) Subject: network plugin: Improve client connecting behavior. X-Git-Tag: collectd-5.3.2~41 X-Git-Url: https://git.tokkee.org/?p=collectd.git;a=commitdiff_plain;h=61adba6f19b8f6ec08b996f4b2ddc9d6fa5d7fc7 network plugin: Improve client connecting behavior. This moves the socket creation logic so it's called from networt_send_buffer_plain(). This allows us to recover after network failures or when collectd was started before the network was available. Fixes: #627 --- diff --git a/src/network.c b/src/network.c index 1b6cf1ea..ce9b0cc7 100644 --- a/src/network.c +++ b/src/network.c @@ -395,7 +395,7 @@ static _Bool check_send_notify_okay (const notification_t *n) /* {{{ */ { c_complain_once (LOG_ERR, &complain_forwarding, "network plugin: A notification has been received via the network " - "forwarding if enabled. Forwarding of notifications is currently " + "and forwarding is enabled. Forwarding of notifications is currently " "not supported, because there is not loop-deteciton available. " "Please contact the collectd mailing list if you need this " "feature."); @@ -1992,14 +1992,19 @@ static int network_bind_socket (int fd, const struct addrinfo *ai, const int int /* Initialize a sockent structure. `type' must be either `SOCKENT_TYPE_CLIENT' * or `SOCKENT_TYPE_SERVER' */ -static int sockent_init (sockent_t *se, int type) /* {{{ */ +static sockent_t *sockent_create (int type) /* {{{ */ { - if (se == NULL) - return (-1); + sockent_t *se; + + if ((type != SOCKENT_TYPE_CLIENT) || (type != SOCKENT_TYPE_SERVER)) + return (NULL); + se = malloc (sizeof (*se)); + if (se == NULL) + return (NULL); memset (se, 0, sizeof (*se)); - se->type = SOCKENT_TYPE_CLIENT; + se->type = type; se->node = NULL; se->service = NULL; se->interface = 0; @@ -2007,7 +2012,6 @@ static int sockent_init (sockent_t *se, int type) /* {{{ */ if (type == SOCKENT_TYPE_SERVER) { - se->type = SOCKENT_TYPE_SERVER; se->data.server.fd = NULL; #if HAVE_LIBGCRYPT se->data.server.security_level = SECURITY_LEVEL_NONE; @@ -2028,23 +2032,11 @@ static int sockent_init (sockent_t *se, int type) /* {{{ */ #endif } - return (0); -} /* }}} int sockent_init */ + return (se); +} /* }}} sockent_t *sockent_create */ -/* Open the file descriptors for a initialized sockent structure. */ -static int sockent_open (sockent_t *se) /* {{{ */ +static int sockent_init_crypto (sockent_t *se) /* {{{ */ { - struct addrinfo ai_hints; - struct addrinfo *ai_list, *ai_ptr; - int ai_return; - - const char *node; - const char *service; - - if (se == NULL) - return (-1); - - /* Set up the security structures. */ #if HAVE_LIBGCRYPT /* {{{ */ if (se->type == SOCKENT_TYPE_CLIENT) { @@ -2095,13 +2087,134 @@ static int sockent_open (sockent_t *se) /* {{{ */ } #endif /* }}} HAVE_LIBGCRYPT */ + return (0); +} /* }}} int sockent_init_crypto */ + +static int sockent_client_connect (sockent_t *se) /* {{{ */ +{ + static c_complain_t complaint = C_COMPLAIN_INIT_STATIC; + + struct sockent_client *client; + struct addrinfo ai_hints; + struct addrinfo *ai_list = NULL, *ai_ptr; + int status; + + if ((se == NULL) || (se->type != SOCKENT_TYPE_CLIENT)) + return (EINVAL); + + client = &se->data.client; + if (client->fd >= 0) /* already connected */ + return (0); + + memset (&ai_hints, 0, sizeof (ai_hints)); +#ifdef AI_ADDRCONFIG + ai_hints.ai_flags |= AI_ADDRCONFIG; +#endif + ai_hints.ai_family = AF_UNSPEC; + ai_hints.ai_socktype = SOCK_DGRAM; + ai_hints.ai_protocol = IPPROTO_UDP; + + status = getaddrinfo (se->node, + (se->service != NULL) ? se->service : NET_DEFAULT_PORT, + &ai_hints, &ai_list); + if (status != 0) + { + c_complain (LOG_ERR, &complaint, + "network plugin: getaddrinfo (%s, %s) failed: %s", + (se->node == NULL) ? "(null)" : se->node, + (se->service == NULL) ? "(null)" : se->service, + gai_strerror (status)); + return (-1); + } + else + { + c_release (LOG_NOTICE, &complaint, + "network plugin: Successfully resolved \"%s\".", + se->node); + } + + for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) + { + client->fd = socket (ai_ptr->ai_family, + ai_ptr->ai_socktype, + ai_ptr->ai_protocol); + if (client->fd < 0) + { + char errbuf[1024]; + ERROR ("network plugin: socket(2) failed: %s", + sstrerror (errno, errbuf, + sizeof (errbuf))); + continue; + } + + client->addr = malloc (sizeof (*client->addr)); + if (client->addr == NULL) + { + ERROR ("network plugin: malloc failed."); + close (client->fd); + client->fd = -1; + continue; + } + + memset (client->addr, 0, sizeof (*client->addr)); + assert (sizeof (*client->addr) >= ai_ptr->ai_addrlen); + memcpy (client->addr, ai_ptr->ai_addr, ai_ptr->ai_addrlen); + client->addrlen = ai_ptr->ai_addrlen; + + network_set_ttl (se, ai_ptr); + network_set_interface (se, ai_ptr); + + /* We don't open more than one write-socket per + * node/service pair.. */ + break; + } + + freeaddrinfo (ai_list); + if (client->fd < 0) + return (-1); + return (0); +} /* }}} int sockent_client_connect */ + +static int sockent_client_disconnect (sockent_t *se) /* {{{ */ +{ + struct sockent_client *client; + + if ((se == NULL) || (se->type != SOCKENT_TYPE_CLIENT)) + return (EINVAL); + + client = &se->data.client; + if (client->fd >= 0) /* connected */ + { + close (client->fd); + client->fd = -1; + } + + sfree (client->addr); + client->addrlen = 0; + + return (0); +} /* }}} int sockent_client_disconnect */ + +/* Open the file descriptors for a initialized sockent structure. */ +static int sockent_server_listen (sockent_t *se) /* {{{ */ +{ + struct addrinfo ai_hints; + struct addrinfo *ai_list, *ai_ptr; + int status; + + const char *node; + const char *service; + + if (se == NULL) + return (-1); + node = se->node; service = se->service; if (service == NULL) service = NET_DEFAULT_PORT; - DEBUG ("network plugin: sockent_open: node = %s; service = %s;", + DEBUG ("network plugin: sockent_server_listen: node = %s; service = %s;", node, service); memset (&ai_hints, 0, sizeof (ai_hints)); @@ -2116,109 +2229,59 @@ static int sockent_open (sockent_t *se) /* {{{ */ ai_hints.ai_socktype = SOCK_DGRAM; ai_hints.ai_protocol = IPPROTO_UDP; - ai_return = getaddrinfo (node, service, &ai_hints, &ai_list); - if (ai_return != 0) + status = getaddrinfo (node, service, &ai_hints, &ai_list); + if (status != 0) { ERROR ("network plugin: getaddrinfo (%s, %s) failed: %s", (se->node == NULL) ? "(null)" : se->node, (se->service == NULL) ? "(null)" : se->service, - gai_strerror (ai_return)); + gai_strerror (status)); return (-1); } for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) { - int status; + int *tmp; - if (se->type == SOCKENT_TYPE_SERVER) /* {{{ */ + tmp = realloc (se->data.server.fd, + sizeof (*tmp) * (se->data.server.fd_num + 1)); + if (tmp == NULL) { - int *tmp; - - tmp = realloc (se->data.server.fd, - sizeof (*tmp) * (se->data.server.fd_num + 1)); - if (tmp == NULL) - { - ERROR ("network plugin: realloc failed."); - continue; - } - se->data.server.fd = tmp; - tmp = se->data.server.fd + se->data.server.fd_num; - - *tmp = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, - ai_ptr->ai_protocol); - if (*tmp < 0) - { - char errbuf[1024]; - ERROR ("network plugin: socket(2) failed: %s", - sstrerror (errno, errbuf, - sizeof (errbuf))); - continue; - } - - status = network_bind_socket (*tmp, ai_ptr, se->interface); - if (status != 0) - { - close (*tmp); - *tmp = -1; - continue; - } - - se->data.server.fd_num++; + ERROR ("network plugin: realloc failed."); continue; - } /* }}} if (se->type == SOCKENT_TYPE_SERVER) */ - else /* if (se->type == SOCKENT_TYPE_CLIENT) {{{ */ - { - se->data.client.fd = socket (ai_ptr->ai_family, - ai_ptr->ai_socktype, - ai_ptr->ai_protocol); - if (se->data.client.fd < 0) - { - char errbuf[1024]; - ERROR ("network plugin: socket(2) failed: %s", - sstrerror (errno, errbuf, - sizeof (errbuf))); - continue; - } - - se->data.client.addr = malloc (sizeof (*se->data.client.addr)); - if (se->data.client.addr == NULL) - { - ERROR ("network plugin: malloc failed."); - close (se->data.client.fd); - se->data.client.fd = -1; - continue; - } + } + se->data.server.fd = tmp; + tmp = se->data.server.fd + se->data.server.fd_num; - memset (se->data.client.addr, 0, sizeof (*se->data.client.addr)); - assert (sizeof (*se->data.client.addr) >= ai_ptr->ai_addrlen); - memcpy (se->data.client.addr, ai_ptr->ai_addr, ai_ptr->ai_addrlen); - se->data.client.addrlen = ai_ptr->ai_addrlen; + *tmp = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, + ai_ptr->ai_protocol); + if (*tmp < 0) + { + char errbuf[1024]; + ERROR ("network plugin: socket(2) failed: %s", + sstrerror (errno, errbuf, + sizeof (errbuf))); + continue; + } - network_set_ttl (se, ai_ptr); - network_set_interface (se, ai_ptr); + status = network_bind_socket (*tmp, ai_ptr, se->interface); + if (status != 0) + { + close (*tmp); + *tmp = -1; + continue; + } - /* We don't open more than one write-socket per - * node/service pair.. */ - break; - } /* }}} if (se->type == SOCKENT_TYPE_CLIENT) */ + se->data.server.fd_num++; + continue; } /* for (ai_list) */ freeaddrinfo (ai_list); - /* Check if all went well. */ - if (se->type == SOCKENT_TYPE_SERVER) - { - if (se->data.server.fd_num <= 0) - return (-1); - } - else /* if (se->type == SOCKENT_TYPE_CLIENT) */ - { - if (se->data.client.fd < 0) - return (-1); - } - + if (se->data.server.fd_num <= 0) + return (-1); return (0); -} /* }}} int sockent_open */ +} /* }}} int sockent_server_listen */ /* Add a sockent to the global list of sockets */ static int sockent_add (sockent_t *se) /* {{{ */ @@ -2486,26 +2549,32 @@ static void network_init_buffer (void) memset (&send_buffer_vl, 0, sizeof (send_buffer_vl)); } /* int network_init_buffer */ -static void networt_send_buffer_plain (const sockent_t *se, /* {{{ */ +static void networt_send_buffer_plain (sockent_t *se, /* {{{ */ const char *buffer, size_t buffer_size) { int status; while (42) { + status = sockent_client_connect (se); + if (status != 0) + return; + status = sendto (se->data.client.fd, buffer, buffer_size, - /* flags = */ 0, - (struct sockaddr *) se->data.client.addr, - se->data.client.addrlen); - if (status < 0) + /* flags = */ 0, + (struct sockaddr *) se->data.client.addr, + se->data.client.addrlen); + if (status < 0) { char errbuf[1024]; - if (errno == EINTR) + + if ((errno == EINTR) || (errno == EAGAIN)) continue; - ERROR ("network plugin: sendto failed: %s", - sstrerror (errno, errbuf, - sizeof (errbuf))); - break; + + ERROR ("network plugin: sendto failed: %s. Closing sending socket.", + sstrerror (errno, errbuf, sizeof (errbuf))); + sockent_client_disconnect (se); + return; } break; @@ -2518,7 +2587,7 @@ static void networt_send_buffer_plain (const sockent_t *se, /* {{{ */ buffer_offset += (s); \ } while (0) -static void networt_send_buffer_signed (const sockent_t *se, /* {{{ */ +static void networt_send_buffer_signed (sockent_t *se, /* {{{ */ const char *in_buffer, size_t in_buffer_size) { part_signature_sha256_t ps; @@ -3014,13 +3083,12 @@ static int network_config_add_listen (const oconfig_item_t *ci) /* {{{ */ return (-1); } - se = malloc (sizeof (*se)); + se = sockent_create (SOCKENT_TYPE_SERVER); if (se == NULL) { - ERROR ("network plugin: malloc failed."); + ERROR ("network plugin: sockent_create failed."); return (-1); } - sockent_init (se, SOCKENT_TYPE_SERVER); se->node = strdup (ci->values[0].value.string); if (ci->values_num >= 2) @@ -3060,10 +3128,18 @@ static int network_config_add_listen (const oconfig_item_t *ci) /* {{{ */ } #endif /* HAVE_LIBGCRYPT */ - status = sockent_open (se); + status = sockent_init_crypto (se); if (status != 0) { - ERROR ("network plugin: network_config_add_listen: sockent_open failed."); + ERROR ("network plugin: network_config_add_listen: sockent_init_crypto() failed."); + sockent_destroy (se); + return (-1); + } + + status = sockent_server_listen (se); + if (status != 0) + { + ERROR ("network plugin: network_config_add_server: sockent_server_listen failed."); sockent_destroy (se); return (-1); } @@ -3094,13 +3170,12 @@ static int network_config_add_server (const oconfig_item_t *ci) /* {{{ */ return (-1); } - se = malloc (sizeof (*se)); + se = sockent_create (SOCKENT_TYPE_CLIENT); if (se == NULL) { - ERROR ("network plugin: malloc failed."); + ERROR ("network plugin: sockent_create failed."); return (-1); } - sockent_init (se, SOCKENT_TYPE_CLIENT); se->node = strdup (ci->values[0].value.string); if (ci->values_num >= 2) @@ -3143,14 +3218,17 @@ static int network_config_add_server (const oconfig_item_t *ci) /* {{{ */ } #endif /* HAVE_LIBGCRYPT */ - status = sockent_open (se); + status = sockent_init_crypto (se); if (status != 0) { - ERROR ("network plugin: network_config_add_server: sockent_open failed."); + ERROR ("network plugin: network_config_add_server: sockent_init_crypto() failed."); sockent_destroy (se); return (-1); } + /* No call to sockent_client_connect() here -- it is called from + * networt_send_buffer_plain(). */ + status = sockent_add (se); if (status != 0) { @@ -3268,6 +3346,8 @@ static int network_notification (const notification_t *n, static int network_shutdown (void) { + sockent_t *se; + listen_loop++; /* Kill the listening thread */ @@ -3298,7 +3378,9 @@ static int network_shutdown (void) sfree (send_buffer); - /* TODO: Close `sending_sockets' */ + for (se = sending_sockets; se != NULL; se = se->next) + sockent_client_disconnect (se); + sockent_destroy (sending_sockets); plugin_unregister_config ("network"); plugin_unregister_init ("network");