summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: f6201a7)
raw | patch | inline | side by side (parent: f6201a7)
author | Florian Forster <octo@leeloo.lan.home.verplant.org> | |
Wed, 23 Jan 2008 19:30:06 +0000 (20:30 +0100) | ||
committer | Florian Forster <octo@leeloo.lan.home.verplant.org> | |
Wed, 23 Jan 2008 19:31:03 +0000 (20:31 +0100) |
One that only receives and enqueues packets and one which parses the packets
and dispatches them to the daemon. This should solve problems with (too) short
socket buffers and (very) heavy load.
and dispatches them to the daemon. This should solve problems with (too) short
socket buffers and (very) heavy load.
src/network.c | patch | blob | history |
diff --git a/src/network.c b/src/network.c
index 19344312e1edf22b1d81b42adee3aa3a5dc3a4f2..17038a45a0a9e09416b0ce5d28f43bafd005469e 100644 (file)
--- a/src/network.c
+++ b/src/network.c
};
typedef struct part_values_s part_values_t;
+struct receive_list_entry_s
+{
+ char data[BUFF_SIZE];
+ int data_len;
+ struct receive_list_entry_s *next;
+};
+typedef struct receive_list_entry_s receive_list_entry_t;
+
/*
* Private variables
*/
static sockent_t *sending_sockets = NULL;
+static receive_list_entry_t *receive_list_head = NULL;
+static receive_list_entry_t *receive_list_tail = NULL;
+static pthread_mutex_t receive_list_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t receive_list_cond = PTHREAD_COND_INITIALIZER;
+
static struct pollfd *listen_sockets = NULL;
static int listen_sockets_num = 0;
-static pthread_t listen_thread = 0;
+
static int listen_loop = 0;
+static pthread_t receive_thread_id = 0;
+static pthread_t dispatch_thread_id = 0;
static char send_buffer[BUFF_SIZE];
static char *send_buffer_ptr;
return (0);
} /* int network_get_listen_socket */
+static void *dispatch_thread (void *arg)
+{
+ while (42)
+ {
+ receive_list_entry_t *ent;
+
+ /* Lock and wait for more data to come in */
+ pthread_mutex_lock (&receive_list_lock);
+ while ((listen_loop == 0)
+ && (receive_list_head == NULL))
+ pthread_cond_wait (&receive_list_cond, &receive_list_lock);
+
+ /* Remove the head entry and unlock */
+ ent = receive_list_head;
+ if (ent != NULL)
+ receive_list_head = ent->next;
+ pthread_mutex_unlock (&receive_list_lock);
+
+ /* Check whether we are supposed to exit. We do NOT check `listen_loop'
+ * because we dispatch all missing packets before shutting down. */
+ if (ent == NULL)
+ break;
+
+ parse_packet (ent->data, ent->data_len);
+
+ sfree (ent);
+ } /* while (42) */
+
+ return (NULL);
+} /* void *receive_thread */
+
static int network_receive (void)
{
char buffer[BUFF_SIZE];
for (i = 0; (i < listen_sockets_num) && (status > 0); i++)
{
+ receive_list_entry_t *ent;
+
if ((listen_sockets[i].revents & (POLLIN | POLLPRI)) == 0)
continue;
status--;
return (-1);
}
- parse_packet (buffer, buffer_len);
+ ent = malloc (sizeof (receive_list_entry_t));
+ if (ent == NULL)
+ {
+ ERROR ("network plugin: malloc failed.");
+ return (-1);
+ }
+ memset (ent, '\0', sizeof (receive_list_entry_t));
+
+ /* Hopefully this be optimized out by the compiler. It
+ * might help prevent stupid bugs in the future though.
+ */
+ assert (sizeof (ent->data) == sizeof (buffer));
+
+ memcpy (ent->data, buffer, buffer_len);
+ ent->data_len = buffer_len;
+
+ pthread_mutex_lock (&receive_list_lock);
+ if (receive_list_head == NULL)
+ {
+ receive_list_head = ent;
+ receive_list_tail = ent;
+ }
+ else
+ {
+ receive_list_tail->next = ent;
+ receive_list_tail = ent;
+ }
+ pthread_cond_signal (&receive_list_cond);
+ pthread_mutex_unlock (&receive_list_lock);
} /* for (listen_sockets) */
} /* while (listen_loop == 0) */
{
listen_loop++;
- if (listen_thread != (pthread_t) 0)
+ /* Kill the listening thread */
+ if (receive_thread_id != (pthread_t) 0)
{
- pthread_kill (listen_thread, SIGTERM);
- pthread_join (listen_thread, NULL /* no return value */);
- listen_thread = (pthread_t) 0;
+ pthread_kill (receive_thread_id, SIGTERM);
+ pthread_join (receive_thread_id, NULL /* no return value */);
+ receive_thread_id = (pthread_t) 0;
}
+ /* Shutdown the dispatching thread */
+ if (dispatch_thread_id != (pthread_t) 0)
+ pthread_cond_broadcast (&receive_list_cond);
+
if (send_buffer_fill > 0)
flush_buffer ();
if (sending_sockets != NULL)
plugin_register_write ("network", network_write);
- if ((listen_sockets_num != 0) && (listen_thread == 0))
+ if ((listen_sockets_num != 0) && (receive_thread_id == 0))
{
int status;
- status = pthread_create (&listen_thread, NULL /* no attributes */,
- receive_thread, NULL /* no argument */);
+ status = pthread_create (&dispatch_thread_id,
+ NULL /* no attributes */,
+ dispatch_thread,
+ NULL /* no argument */);
+ if (status != 0)
+ {
+ char errbuf[1024];
+ ERROR ("network: pthread_create failed: %s",
+ sstrerror (errno, errbuf,
+ sizeof (errbuf)));
+ }
+ status = pthread_create (&receive_thread_id,
+ NULL /* no attributes */,
+ receive_thread,
+ NULL /* no argument */);
if (status != 0)
{
char errbuf[1024];