X-Git-Url: https://git.tokkee.org/?a=blobdiff_plain;f=src%2Fnetwork.c;h=cf2a01799cbb1723c710357be176810aac053fb6;hb=f8c6d793143453b064c68f92cb17e88302b8bbb9;hp=4516331e9096507ae12eeb89c670d2ee242c58a6;hpb=634504760b46b852ec2b812a7b68277e9c005f1b;p=collectd.git diff --git a/src/network.c b/src/network.c index 4516331e..cf2a0179 100644 --- a/src/network.c +++ b/src/network.c @@ -1,6 +1,6 @@ /** * collectd - src/network.c - * Copyright (C) 2005,2006 Florian octo Forster + * Copyright (C) 2005-2007 Florian octo Forster * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -57,7 +57,7 @@ # endif #endif /* !IP_ADD_MEMBERSHIP */ -#define BUFF_SIZE 4096 +#define BUFF_SIZE 1024 /* * Private data types @@ -98,6 +98,21 @@ struct part_string_s }; typedef struct part_string_s part_string_t; +/* 1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3 + * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + * +-------------------------------+-------------------------------+ + * ! Type ! Length ! + * +-------------------------------+-------------------------------+ + * : (Length - 4 == 2 || 4 || 8) Bytes : + * +---------------------------------------------------------------+ + */ +struct part_number_s +{ + part_header_t *head; + uint64_t *value; +}; +typedef struct part_number_s part_number_t; + /* 1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3 * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 * +-------------------------------+-------------------------------+ @@ -142,6 +157,12 @@ static int listen_sockets_num = 0; static pthread_t listen_thread = 0; static int listen_loop = 0; +static char send_buffer[BUFF_SIZE]; +static char *send_buffer_ptr; +static int send_buffer_fill; +static value_list_t send_buffer_vl = VALUE_LIST_INIT; +static char send_buffer_type[DATA_MAX_NAME_LEN]; + /* * Private functions */ @@ -183,15 +204,33 @@ static int write_part_values (char **ret_buffer, int *ret_buffer_len, return (0); } /* int write_part_values */ +static int write_part_number (char **ret_buffer, int *ret_buffer_len, + int type, uint64_t value) +{ + part_number_t pn; + + if (*ret_buffer_len < 12) + return (-1); + + pn.head = (part_header_t *) *ret_buffer; + pn.value = (uint64_t *) (pn.head + 1); + + pn.head->type = htons (type); + pn.head->length = htons (12); + *pn.value = htonll (value); + + *ret_buffer = (char *) (pn.value + 1); + *ret_buffer_len -= 12; + + return (0); +} /* int write_part_number */ + static int write_part_string (char **ret_buffer, int *ret_buffer_len, int type, const char *str, int str_len) { part_string_t ps; int len; - if (str_len < 1) - return (-1); - len = 4 + str_len + 1; if (*ret_buffer_len < len) return (-1); @@ -201,10 +240,11 @@ static int write_part_string (char **ret_buffer, int *ret_buffer_len, ps.value = (char *) (ps.head + 1); ps.head->type = htons ((uint16_t) type); - ps.head->length = htons ((uint16_t) str_len + 4); - memcpy (ps.value, str, str_len); + ps.head->length = htons ((uint16_t) str_len + 5); + if (str_len > 0) + memcpy (ps.value, str, str_len); ps.value[str_len] = '\0'; - *ret_buffer = (void *) (ps.value + str_len); + *ret_buffer = (void *) (ps.value + (str_len + 1)); return (0); } /* int write_part_string */ @@ -214,83 +254,121 @@ static int parse_part_values (void **ret_buffer, int *ret_buffer_len, { char *buffer = *ret_buffer; int buffer_len = *ret_buffer_len; - part_values_t *pvalues; + part_values_t pv; int i; + uint16_t h_length; + uint16_t h_type; + uint16_t h_num; + if (buffer_len < (15)) { - DBG ("packet is too short"); + DBG ("packet is too short: buffer_len = %i", buffer_len); return (-1); } - pvalues = (part_values_t *) malloc (sizeof (part_values_t)); - if (pvalues == NULL) - return (-1); + pv.head = (part_header_t *) buffer; + h_length = ntohs (pv.head->length); + h_type = ntohs (pv.head->type); - pvalues->head = (part_header_t *) buffer; - assert (pvalues->head->type == htons (TYPE_VALUES)); + assert (h_type == TYPE_VALUES); - pvalues->num_values = (uint16_t *) (buffer + 4); - if (ntohs (*pvalues->num_values) - != ((ntohs (pvalues->head->length) - 6) / 9)) + pv.num_values = (uint16_t *) (pv.head + 1); + h_num = ntohs (*pv.num_values); + + if (h_num != ((h_length - 6) / 9)) { DBG ("`length' and `num of values' don't match"); - free (pvalues); return (-1); } - pvalues->values_types = (uint8_t *) (buffer + 6); - pvalues->values = (value_t *) (buffer + 6 + *pvalues->num_values); - - for (i = 0; i < *pvalues->num_values; i++) - if (pvalues->values_types[i] == DS_TYPE_COUNTER) - pvalues->values[i].counter = ntohll (pvalues->values[i].counter); + pv.values_types = (uint8_t *) (pv.num_values + 1); + pv.values = (value_t *) (pv.values_types + h_num); - *ret_buffer = (void *) buffer; - *ret_buffer_len = buffer_len - pvalues->head->length; - *ret_num_values = *pvalues->num_values; - *ret_values = pvalues->values; + for (i = 0; i < h_num; i++) + if (pv.values_types[i] == DS_TYPE_COUNTER) + pv.values[i].counter = ntohll (pv.values[i].counter); - free (pvalues); + *ret_buffer = (void *) (pv.values + h_num); + *ret_buffer_len = buffer_len - h_length; + *ret_num_values = h_num; + *ret_values = pv.values; return (0); } /* int parse_part_values */ +static int parse_part_number (void **ret_buffer, int *ret_buffer_len, + uint64_t *value) +{ + part_number_t pn; + uint16_t len; + + pn.head = (part_header_t *) *ret_buffer; + pn.value = (uint64_t *) (pn.head + 1); + + len = ntohs (pn.head->length); + if (len != 12) + return (-1); + if (len > *ret_buffer_len) + return (-1); + *value = ntohll (*pn.value); + + *ret_buffer = (void *) (pn.value + 1); + *ret_buffer_len -= len; + + return (0); +} /* int parse_part_number */ + static int parse_part_string (void **ret_buffer, int *ret_buffer_len, char *output, int output_len) { char *buffer = *ret_buffer; int buffer_len = *ret_buffer_len; - part_string_t part_string; + part_string_t ps; + + uint16_t h_length; + uint16_t h_type; + + DBG ("ret_buffer = %p; ret_buffer_len = %i; output = %p; output_len = %i;", + *ret_buffer, *ret_buffer_len, + (void *) output, output_len); - part_string.head = (part_header_t *) buffer; - if (buffer_len < part_string.head->length) + ps.head = (part_header_t *) buffer; + + h_length = ntohs (ps.head->length); + h_type = ntohs (ps.head->type); + + DBG ("length = %hu; type = %hu;", h_length, h_type); + + if (buffer_len < h_length) { DBG ("packet is too short"); return (-1); } - assert ((part_string.head->type == htons (TYPE_HOST)) - || (part_string.head->type == htons (TYPE_PLUGIN)) - || (part_string.head->type == htons (TYPE_PLUGIN_INSTANCE)) - || (part_string.head->type == htons (TYPE_TYPE)) - || (part_string.head->type == htons (TYPE_TYPE_INSTANCE))); - - part_string.value = buffer + 4; - if (part_string.value[part_string.head->length - 5] != '\0') + assert ((h_type == TYPE_HOST) + || (h_type == TYPE_PLUGIN) + || (h_type == TYPE_PLUGIN_INSTANCE) + || (h_type == TYPE_TYPE) + || (h_type == TYPE_TYPE_INSTANCE)); + + ps.value = buffer + 4; + if (ps.value[h_length - 5] != '\0') { DBG ("String does not end with a nullbyte"); return (-1); } - if (output_len < (part_string.head->length - 4)) + if (output_len < (h_length - 4)) { DBG ("output buffer is too small"); return (-1); } - strcpy (output, part_string.value); + strcpy (output, ps.value); + + DBG ("output = %s", output); - *ret_buffer = (void *) (buffer + part_string.head->length); - *ret_buffer_len = buffer_len - part_string.head->length; + *ret_buffer = (void *) (buffer + h_length); + *ret_buffer_len = buffer_len - h_length; return (0); } /* int parse_part_string */ @@ -300,51 +378,74 @@ static int parse_packet (void *buffer, int buffer_len) part_header_t *header; int status; - value_list_t vl; + value_list_t vl = VALUE_LIST_INIT; char type[DATA_MAX_NAME_LEN]; + DBG ("buffer = %p; buffer_len = %i;", buffer, buffer_len); + memset (&vl, '\0', sizeof (vl)); memset (&type, '\0', sizeof (type)); + status = 0; - while (buffer_len > sizeof (part_header_t)) + while ((status == 0) && (buffer_len > sizeof (part_header_t))) { header = (part_header_t *) buffer; - if (header->length > buffer_len) + if (ntohs (header->length) > buffer_len) break; - if (header->type == TYPE_VALUES) + if (header->type == htons (TYPE_VALUES)) { status = parse_part_values (&buffer, &buffer_len, &vl.values, &vl.values_len); - if ((status == 0) + if (status != 0) + { + DBG ("parse_part_values failed."); + break; + } + + if ((vl.time > 0) && (strlen (vl.host) > 0) && (strlen (vl.plugin) > 0) && (strlen (type) > 0)) + { + DBG ("dispatching values"); plugin_dispatch_values (type, &vl); + } + else + { + DBG ("NOT dispatching values"); + } } - else if (header->type == TYPE_HOST) + else if (header->type == ntohs (TYPE_TIME)) + { + uint64_t tmp = 0; + status = parse_part_number (&buffer, &buffer_len, &tmp); + if (status == 0) + vl.time = (time_t) tmp; + } + else if (header->type == ntohs (TYPE_HOST)) { status = parse_part_string (&buffer, &buffer_len, vl.host, sizeof (vl.host)); } - else if (header->type == TYPE_PLUGIN) + else if (header->type == ntohs (TYPE_PLUGIN)) { status = parse_part_string (&buffer, &buffer_len, vl.plugin, sizeof (vl.plugin)); } - else if (header->type == TYPE_PLUGIN_INSTANCE) + else if (header->type == ntohs (TYPE_PLUGIN_INSTANCE)) { status = parse_part_string (&buffer, &buffer_len, vl.plugin_instance, sizeof (vl.plugin_instance)); } - else if (header->type == TYPE_TYPE) + else if (header->type == ntohs (TYPE_TYPE)) { status = parse_part_string (&buffer, &buffer_len, type, sizeof (type)); } - else if (header->type == TYPE_TYPE_INSTANCE) + else if (header->type == ntohs (TYPE_TYPE_INSTANCE)) { status = parse_part_string (&buffer, &buffer_len, vl.type_instance, sizeof (vl.type_instance)); @@ -587,6 +688,7 @@ static sockent_t *network_create_socket (const char *node, { if (network_bind_socket (se, ai_ptr) != 0) { + close (se->fd); free (se->addr); free (se); continue; @@ -767,131 +869,153 @@ int network_receive (void) static void *receive_thread (void *arg) { - return ((void *) network_receive ()); + return (network_receive () ? (void *) 1 : (void *) 0); } /* void *receive_thread */ -#if 0 -int network_send (char *type, char *inst, char *value) +static void network_send_buffer (const char *buffer, int buffer_len) { - char buf[BUFF_SIZE]; - int buflen; - sockent_t *se; - - int ret; int status; - DBG ("type = %s, inst = %s, value = %s", type, inst, value); + DBG ("buffer_len = %i", buffer_len); - assert (operating_mode == MODE_CLIENT); - - buflen = snprintf (buf, BUFF_SIZE, "%s %s %s", type, inst, value); - if ((buflen >= BUFF_SIZE) || (buflen < 1)) - { - syslog (LOG_WARNING, "network_send: snprintf failed.."); - return (-1); - } - buf[buflen] = '\0'; - buflen++; - - if (socklist_head == NULL) - network_create_default_socket (0 /* listen == false */); - - ret = 0; - for (se = socklist_head; se != NULL; se = se->next) + for (se = sending_sockets; se != NULL; se = se->next) { - while (1) + while (42) { - status = sendto (se->fd, buf, buflen, 0, + status = sendto (se->fd, buffer, buffer_len, 0 /* no flags */, (struct sockaddr *) se->addr, se->addrlen); - - if (status == -1) + if (status < 0) { if (errno == EINTR) - { - DBG ("sendto was interrupted"); continue; - } - else - { - syslog (LOG_ERR, "sendto: %s", strerror (errno)); - ret = -1; - break; - } + syslog (LOG_ERR, "network plugin: sendto failed: %s", + strerror (errno)); + break; } - else if (ret >= 0) - ret++; - break; - } - } - if (ret == 0) - syslog (LOG_WARNING, "Message wasn't sent to anybody.."); - - return (ret); -} /* int network_send */ -#endif + break; + } /* while (42) */ + } /* for (sending_sockets) */ +} /* void network_send_buffer */ -static int network_write (const data_set_t *ds, const value_list_t *vl) +static int add_to_buffer (char *buffer, int buffer_size, + value_list_t *vl_def, char *type_def, + const data_set_t *ds, const value_list_t *vl) { - char buf[BUFF_SIZE]; - char *buf_ptr; - int buf_len; + if (strcmp (vl_def->host, vl->host) != 0) + { + if (write_part_string (&buffer, &buffer_size, TYPE_HOST, + vl->host, strlen (vl->host)) != 0) + return (-1); + strcpy (vl_def->host, vl->host); + DBG ("host = %s", vl->host); + } - sockent_t *se; + if (vl_def->time != vl->time) + { + if (write_part_number (&buffer, &buffer_size, TYPE_TIME, + (uint64_t) vl->time)) + return (-1); + vl_def->time = vl->time; + DBG ("time = %u", (unsigned int) vl->time); + } - DBG ("host = %s; plugin = %s; plugin_instance = %s; type = %s; type_instance = %s;", - vl->host, vl->plugin, vl->plugin_instance, ds->type, vl->type_instance); + if (strcmp (vl_def->plugin, vl->plugin) != 0) + { + if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN, + vl->plugin, strlen (vl->plugin)) != 0) + return (-1); + strcpy (vl_def->plugin, vl->plugin); + DBG ("plugin = %s", vl->plugin); + } - buf_len = sizeof (buf); - buf_ptr = buf; - if (write_part_string (&buf_ptr, &buf_len, TYPE_HOST, - vl->host, strlen (vl->host)) != 0) - return (-1); - if (write_part_string (&buf_ptr, &buf_len, TYPE_PLUGIN, - vl->plugin, strlen (vl->plugin)) != 0) - return (-1); - if (strlen (vl->plugin_instance) > 0) - if (write_part_string (&buf_ptr, &buf_len, TYPE_PLUGIN_INSTANCE, + if (strcmp (vl_def->plugin_instance, vl->plugin_instance) != 0) + { + if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN_INSTANCE, vl->plugin_instance, strlen (vl->plugin_instance)) != 0) return (-1); - if (write_part_string (&buf_ptr, &buf_len, TYPE_TYPE, - ds->type, strlen (ds->type)) != 0) - return (-1); - if (strlen (vl->type_instance) > 0) - if (write_part_string (&buf_ptr, &buf_len, TYPE_PLUGIN_INSTANCE, + strcpy (vl_def->plugin_instance, vl->plugin_instance); + DBG ("plugin_instance = %s", vl->plugin_instance); + } + + if (strcmp (type_def, ds->type) != 0) + { + if (write_part_string (&buffer, &buffer_size, TYPE_TYPE, + ds->type, strlen (ds->type)) != 0) + return (-1); + strcpy (type_def, ds->type); + DBG ("type = %s", ds->type); + } + + if (strcmp (vl_def->type_instance, vl->type_instance) != 0) + { + if (write_part_string (&buffer, &buffer_size, TYPE_PLUGIN_INSTANCE, vl->type_instance, strlen (vl->type_instance)) != 0) return (-1); + strcpy (vl_def->type_instance, vl->type_instance); + DBG ("type_instance = %s", vl->type_instance); + } - write_part_values (&buf_ptr, &buf_len, ds, vl); + if (write_part_values (&buffer, &buffer_size, ds, vl) != 0) + return (-1); - buf_len = sizeof (buf) - buf_len; + return (buffer_size); +} /* int add_to_buffer */ - for (se = sending_sockets; se != NULL; se = se->next) +static void flush_buffer (void) +{ + network_send_buffer (send_buffer, send_buffer_fill); + send_buffer_ptr = send_buffer; + send_buffer_fill = 0; + memset (&send_buffer_vl, '\0', sizeof (send_buffer_vl)); + memset (send_buffer_type, '\0', sizeof (send_buffer_type)); +} + +static int network_write (const data_set_t *ds, const value_list_t *vl) +{ + int status; + /* TODO: lock buffer */ + status = add_to_buffer (send_buffer_ptr, + sizeof (send_buffer) - send_buffer_fill, + &send_buffer_vl, send_buffer_type, + ds, vl); + if (status >= 0) { - int status; + send_buffer_fill += status; + send_buffer_ptr += status; + } + else + { + flush_buffer (); - while (42) + status = add_to_buffer (send_buffer_ptr, + sizeof (send_buffer) - send_buffer_fill, + &send_buffer_vl, send_buffer_type, + ds, vl); + + if (status >= 0) { - status = sendto (se->fd, buf, buf_len, 0 /* no flags */, - (struct sockaddr *) se->addr, se->addrlen); - if (status < 0) - { - if (errno == EINTR) - continue; - syslog (LOG_ERR, "network: sendto failed: %s", - strerror (errno)); - break; - } + send_buffer_fill += status; + send_buffer_ptr += status; + } + } - break; - } /* while (42) */ - } /* for (sending_sockets) */ + if (status < 0) + { + syslog (LOG_ERR, "network plugin: Unable to append to the " + "buffer for some weird reason"); + } + else if ((sizeof (send_buffer) - send_buffer_fill) < 15) + { + flush_buffer (); + } + /* TODO: unlock buffer */ - return 0; -} + return ((status < 0) ? -1 : 0); +} /* int network_write */ static int network_config (const char *key, const char *val) { @@ -955,6 +1079,11 @@ static int network_init (void) { plugin_register_shutdown ("network", network_shutdown); + send_buffer_ptr = send_buffer; + send_buffer_fill = 0; + memset (&send_buffer_vl, '\0', sizeof (send_buffer_vl)); + memset (send_buffer_type, '\0', sizeof (send_buffer_type)); + /* setup socket(s) and so on */ if (sending_sockets != NULL) plugin_register_write ("network", network_write);