From: Florian Forster Date: Tue, 16 Jan 2007 14:51:50 +0000 (+0100) Subject: network plugin: Use a global buffer and fill it as much as possible. X-Git-Tag: collectd-4.0.0-rc4~139 X-Git-Url: https://git.tokkee.org/?a=commitdiff_plain;h=5661326c8102fa9035b870316e9f152f43aca31c;p=collectd.git network plugin: Use a global buffer and fill it as much as possible. This should lessen network load, hopefully. --- diff --git a/src/network.c b/src/network.c index 11c97181..cf2a0179 100644 --- a/src/network.c +++ b/src/network.c @@ -1,6 +1,6 @@ /** * collectd - src/network.c - * Copyright (C) 2005,2006 Florian octo Forster + * Copyright (C) 2005-2007 Florian octo Forster * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -57,7 +57,7 @@ # endif #endif /* !IP_ADD_MEMBERSHIP */ -#define BUFF_SIZE 4096 +#define BUFF_SIZE 1024 /* * Private data types @@ -157,6 +157,12 @@ static int listen_sockets_num = 0; static pthread_t listen_thread = 0; static int listen_loop = 0; +static char send_buffer[BUFF_SIZE]; +static char *send_buffer_ptr; +static int send_buffer_fill; +static value_list_t send_buffer_vl = VALUE_LIST_INIT; +static char send_buffer_type[DATA_MAX_NAME_LEN]; + /* * Private functions */ @@ -225,9 +231,6 @@ static int write_part_string (char **ret_buffer, int *ret_buffer_len, part_string_t ps; int len; - if (str_len < 1) - return (-1); - len = 4 + str_len + 1; if (*ret_buffer_len < len) return (-1); @@ -238,7 +241,8 @@ static int write_part_string (char **ret_buffer, int *ret_buffer_len, ps.head->type = htons ((uint16_t) type); ps.head->length = htons ((uint16_t) str_len + 5); - memcpy (ps.value, str, str_len); + if (str_len > 0) + memcpy (ps.value, str, str_len); ps.value[str_len] = '\0'; *ret_buffer = (void *) (ps.value + (str_len + 1)); @@ -868,135 +872,150 @@ static void *receive_thread (void *arg) return (network_receive () ? (void *) 1 : (void *) 0); } /* void *receive_thread */ -#if 0 -int network_send (char *type, char *inst, char *value) +static void network_send_buffer (const char *buffer, int buffer_len) { - char buf[BUFF_SIZE]; - int buflen; - sockent_t *se; - - int ret; int status; - DBG ("type = %s, inst = %s, value = %s", type, inst, value); - - assert (operating_mode == MODE_CLIENT); - - buflen = snprintf (buf, BUFF_SIZE, "%s %s %s", type, inst, value); - if ((buflen >= BUFF_SIZE) || (buflen < 1)) - { - syslog (LOG_WARNING, "network_send: snprintf failed.."); - return (-1); - } - buf[buflen] = '\0'; - buflen++; - - if (socklist_head == NULL) - network_create_default_socket (0 /* listen == false */); + DBG ("buffer_len = %i", buffer_len); - ret = 0; - for (se = socklist_head; se != NULL; se = se->next) + for (se = sending_sockets; se != NULL; se = se->next) { - while (1) + while (42) { - status = sendto (se->fd, buf, buflen, 0, + status = sendto (se->fd, buffer, buffer_len, 0 /* no flags */, (struct sockaddr *) se->addr, se->addrlen); - - if (status == -1) + if (status < 0) { if (errno == EINTR) - { - DBG ("sendto was interrupted"); continue; - } - else - { - syslog (LOG_ERR, "sendto: %s", strerror (errno)); - ret = -1; - break; - } + syslog (LOG_ERR, "network plugin: sendto failed: %s", + strerror (errno)); + break; } - else if (ret >= 0) - ret++; - break; - } - } - - if (ret == 0) - syslog (LOG_WARNING, "Message wasn't sent to anybody.."); - return (ret); -} /* int network_send */ -#endif + break; + } /* while (42) */ + } /* for (sending_sockets) */ +} /* void network_send_buffer */ -static int network_write (const data_set_t *ds, const value_list_t *vl) +static int add_to_buffer (char *buffer, int buffer_size, + value_list_t *vl_def, char *type_def, + const data_set_t *ds, const value_list_t *vl) { - char buf[BUFF_SIZE]; - char *buf_ptr; - int buf_len; + if (strcmp (vl_def->host, vl->host) != 0) + { + if (write_part_string (&buffer, &buffer_size, TYPE_HOST, + vl->host, strlen (vl->host)) != 0) + return (-1); + strcpy (vl_def->host, vl->host); + DBG ("host = %s", vl->host); + } - sockent_t *se; + if (vl_def->time != vl->time) + { + if (write_part_number (&buffer, &buffer_size, TYPE_TIME, + (uint64_t) vl->time)) + return (-1); + vl_def->time = vl->time; + DBG ("time = %u", (unsigned int) vl->time); + } - DBG ("time = %u; host = %s; " - "plugin = %s; plugin_instance = %s; " - "type = %s; type_instance = %s;", - (unsigned int) vl->time, vl->host, - vl->plugin, vl->plugin_instance, - ds->type, vl->type_instance); - - buf_len = sizeof (buf); - buf_ptr = buf; - if (write_part_string (&buf_ptr, &buf_len, TYPE_HOST, - vl->host, strlen (vl->host)) != 0) - return (-1); - if (write_part_number (&buf_ptr, &buf_len, TYPE_TIME, - (uint64_t) vl->time)) - return (-1); - if (write_part_string (&buf_ptr, &buf_len, TYPE_PLUGIN, - vl->plugin, strlen (vl->plugin)) != 0) - return (-1); - if (strlen (vl->plugin_instance) > 0) - if (write_part_string (&buf_ptr, &buf_len, TYPE_PLUGIN_INSTANCE, + if (strcmp (vl_def->plugin, vl->plugin) != 0) + { + if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN, + vl->plugin, strlen (vl->plugin)) != 0) + return (-1); + strcpy (vl_def->plugin, vl->plugin); + DBG ("plugin = %s", vl->plugin); + } + + if (strcmp (vl_def->plugin_instance, vl->plugin_instance) != 0) + { + if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN_INSTANCE, vl->plugin_instance, strlen (vl->plugin_instance)) != 0) return (-1); - if (write_part_string (&buf_ptr, &buf_len, TYPE_TYPE, - ds->type, strlen (ds->type)) != 0) - return (-1); - if (strlen (vl->type_instance) > 0) - if (write_part_string (&buf_ptr, &buf_len, TYPE_PLUGIN_INSTANCE, + strcpy (vl_def->plugin_instance, vl->plugin_instance); + DBG ("plugin_instance = %s", vl->plugin_instance); + } + + if (strcmp (type_def, ds->type) != 0) + { + if (write_part_string (&buffer, &buffer_size, TYPE_TYPE, + ds->type, strlen (ds->type)) != 0) + return (-1); + strcpy (type_def, ds->type); + DBG ("type = %s", ds->type); + } + + if (strcmp (vl_def->type_instance, vl->type_instance) != 0) + { + if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN_INSTANCE, vl->type_instance, strlen (vl->type_instance)) != 0) return (-1); + strcpy (vl_def->type_instance, vl->type_instance); + DBG ("type_instance = %s", vl->type_instance); + } - write_part_values (&buf_ptr, &buf_len, ds, vl); + if (write_part_values (&buffer, &buffer_size, ds, vl) != 0) + return (-1); - buf_len = sizeof (buf) - buf_len; + return (buffer_size); +} /* int add_to_buffer */ - for (se = sending_sockets; se != NULL; se = se->next) +static void flush_buffer (void) +{ + network_send_buffer (send_buffer, send_buffer_fill); + send_buffer_ptr = send_buffer; + send_buffer_fill = 0; + memset (&send_buffer_vl, '\0', sizeof (send_buffer_vl)); + memset (send_buffer_type, '\0', sizeof (send_buffer_type)); +} + +static int network_write (const data_set_t *ds, const value_list_t *vl) +{ + int status; + /* TODO: lock buffer */ + status = add_to_buffer (send_buffer_ptr, + sizeof (send_buffer) - send_buffer_fill, + &send_buffer_vl, send_buffer_type, + ds, vl); + if (status >= 0) { - int status; + send_buffer_fill += status; + send_buffer_ptr += status; + } + else + { + flush_buffer (); - while (42) + status = add_to_buffer (send_buffer_ptr, + sizeof (send_buffer) - send_buffer_fill, + &send_buffer_vl, send_buffer_type, + ds, vl); + + if (status >= 0) { - status = sendto (se->fd, buf, buf_len, 0 /* no flags */, - (struct sockaddr *) se->addr, se->addrlen); - if (status < 0) - { - if (errno == EINTR) - continue; - syslog (LOG_ERR, "network: sendto failed: %s", - strerror (errno)); - break; - } + send_buffer_fill += status; + send_buffer_ptr += status; + } + } - break; - } /* while (42) */ - } /* for (sending_sockets) */ + if (status < 0) + { + syslog (LOG_ERR, "network plugin: Unable to append to the " + "buffer for some weird reason"); + } + else if ((sizeof (send_buffer) - send_buffer_fill) < 15) + { + flush_buffer (); + } + /* TODO: unlock buffer */ - return 0; -} + return ((status < 0) ? -1 : 0); +} /* int network_write */ static int network_config (const char *key, const char *val) { @@ -1060,6 +1079,11 @@ static int network_init (void) { plugin_register_shutdown ("network", network_shutdown); + send_buffer_ptr = send_buffer; + send_buffer_fill = 0; + memset (&send_buffer_vl, '\0', sizeof (send_buffer_vl)); + memset (send_buffer_type, '\0', sizeof (send_buffer_type)); + /* setup socket(s) and so on */ if (sending_sockets != NULL) plugin_register_write ("network", network_write);