Code

utils channel: Added sdb_channel_select().
authorSebastian Harl <sh@tokkee.org>
Sun, 20 Oct 2013 13:35:35 +0000 (15:35 +0200)
committerSebastian Harl <sh@tokkee.org>
Sun, 20 Oct 2013 13:35:35 +0000 (15:35 +0200)
This function behaves similar to select(2) providing to passively block on a
channel operation until it becomes ready for reading and / or writing.
Optionally, a deadline may be specified.

src/include/utils/channel.h
src/utils/channel.c

index 3eb7dc1897feab7639ec289ceb408089216f932b..ca450ce22282dc2af98b7889f7fc15189a544b86 100644 (file)
@@ -30,6 +30,8 @@
 
 #include "core/object.h"
 
 
 #include "core/object.h"
 
+#include <sys/time.h>
+
 #ifdef __cplusplus
 extern "C" {
 #endif
 #ifdef __cplusplus
 extern "C" {
 #endif
@@ -89,6 +91,24 @@ sdb_channel_write(sdb_channel_t *chan, const void *data);
 int
 sdb_channel_read(sdb_channel_t *chan, void *data);
 
 int
 sdb_channel_read(sdb_channel_t *chan, void *data);
 
+/*
+ * sdb_channel_select:
+ * Wait for a channel to become "ready" for I/O. A channel is considered ready
+ * if it is possible to perform the corresponding I/O operation successfully
+ * *in some thread*. In case 'wantread' or 'read_data' is non-NULL, wait for
+ * data to be available in the channel for reading. In case 'wantwrite' or
+ * 'write_data' is non-NULL, wait for buffer space to be available for writing
+ * to the channel. If non-NULL, the value pointed to by the 'want...'
+ * arguments will be "true" iff the respective operation is ready. If the
+ * '..._data' arguments are non-NULL, the respective operation is executed
+ * atomically once the channel is ready for it. If 'abstime' is specified, the
+ * operation will time out with an error if the specified absolute time has
+ * passed.
+ */
+int
+sdb_channel_select(sdb_channel_t *chan, int *wantread, void *read_data,
+               int *wantwrite, void *write_data, const struct timespec *timeout);
+
 #ifdef __cplusplus
 } /* extern "C" */
 #endif
 #ifdef __cplusplus
 } /* extern "C" */
 #endif
index c57b09d66123ba338925103f34d634cb861a4589..22e31a8b4eb0303e063ca6b3bfae2e0e8dcb19ad 100644 (file)
@@ -41,6 +41,9 @@
 struct sdb_channel {
        pthread_mutex_t lock;
 
 struct sdb_channel {
        pthread_mutex_t lock;
 
+       /* signaling for select() operation */
+       pthread_cond_t cond;
+
        /* maybe TODO: add support for 'nil' values using a boolean area */
 
        void  *data;
        /* maybe TODO: add support for 'nil' values using a boolean area */
 
        void  *data;
@@ -64,35 +67,45 @@ struct sdb_channel {
 #define TAIL(chan) ELEM(chan, (chan)->tail)
 #define HEAD(chan) ELEM(chan, (chan)->head)
 
 #define TAIL(chan) ELEM(chan, (chan)->tail)
 #define HEAD(chan) ELEM(chan, (chan)->head)
 
-/* Insert a new element at the end. */
+/* Insert a new element at the end; chan->lock must be held.
+ * Returns 0 if data has been written or if data may be written
+ * if 'data' is NULL. */
 static int
 channel_write(sdb_channel_t *chan, const void *data)
 {
 static int
 channel_write(sdb_channel_t *chan, const void *data)
 {
-       assert(chan && data);
+       assert(chan);
 
        if (chan->full)
                return -1;
 
        if (chan->full)
                return -1;
+       else if (! data)
+               return 0;
 
        memcpy(TAIL(chan), data, chan->elem_size);
        chan->tail = NEXT_WRITE(chan);
 
        chan->full = chan->tail == chan->head;
 
        memcpy(TAIL(chan), data, chan->elem_size);
        chan->tail = NEXT_WRITE(chan);
 
        chan->full = chan->tail == chan->head;
+       pthread_cond_broadcast(&chan->cond);
        return 0;
 } /* channel_write */
 
        return 0;
 } /* channel_write */
 
-/* retrieve the first element */
+/* Retrieve the first element; chan->lock must be held.
+ * Returns 0 if data has been read or if data is available
+ * if 'data' is NULL. */
 static int
 channel_read(sdb_channel_t *chan, void *data)
 {
 static int
 channel_read(sdb_channel_t *chan, void *data)
 {
-       assert(chan && data);
+       assert(chan);
 
        if ((chan->head == chan->tail) && (! chan->full))
                return -1;
 
        if ((chan->head == chan->tail) && (! chan->full))
                return -1;
+       else if (! data)
+               return 0;
 
        memcpy(data, HEAD(chan), chan->elem_size);
        chan->head = NEXT_READ(chan);
 
        chan->full = 0;
 
        memcpy(data, HEAD(chan), chan->elem_size);
        chan->head = NEXT_READ(chan);
 
        chan->full = 0;
+       pthread_cond_broadcast(&chan->cond);
        return 0;
 } /* channel_read */
 
        return 0;
 } /* channel_read */
 
@@ -124,6 +137,7 @@ sdb_channel_create(size_t size, size_t elem_size)
        chan->elem_size = elem_size;
 
        pthread_mutex_init(&chan->lock, /* attr = */ NULL);
        chan->elem_size = elem_size;
 
        pthread_mutex_init(&chan->lock, /* attr = */ NULL);
+       pthread_cond_init(&chan->cond, /* attr = */ NULL);
 
        chan->head = chan->tail = 0;
        return chan;
 
        chan->head = chan->tail = 0;
        return chan;
@@ -140,11 +154,54 @@ sdb_channel_destroy(sdb_channel_t *chan)
        chan->data = NULL;
        chan->data_len = 0;
 
        chan->data = NULL;
        chan->data_len = 0;
 
+       pthread_cond_destroy(&chan->cond);
+
        pthread_mutex_unlock(&chan->lock);
        pthread_mutex_destroy(&chan->lock);
        free(chan);
 } /* sdb_channel_destroy */
 
        pthread_mutex_unlock(&chan->lock);
        pthread_mutex_destroy(&chan->lock);
        free(chan);
 } /* sdb_channel_destroy */
 
+int
+sdb_channel_select(sdb_channel_t *chan, int *wantread, void *read_data,
+               int *wantwrite, void *write_data, const struct timespec *timeout)
+{
+       int status = 0;
+
+       if (! chan)
+               return -1;
+
+       if ((! wantread) && (! read_data) && (! wantwrite) && (! write_data))
+               return -1;
+
+       pthread_mutex_lock(&chan->lock);
+       while (! status) {
+               int read_status, write_status;
+
+               read_status = channel_read(chan, read_data);
+               write_status = channel_write(chan, write_data);
+
+               if ((! read_status) || (! write_status)) {
+                       if (wantread)
+                               *wantread = read_status == 0;
+                       if (wantwrite)
+                               *wantwrite = write_status == 0;
+
+                       if (((wantread || read_data) && (! read_status))
+                                       || ((wantwrite || write_data) && (! write_status)))
+                               break;
+               }
+
+               if (timeout)
+                       status = pthread_cond_timedwait(&chan->cond, &chan->lock,
+                                       timeout);
+               else
+                       status = pthread_cond_wait(&chan->cond, &chan->lock);
+       }
+
+       pthread_mutex_unlock(&chan->lock);
+       return status;
+} /* sdb_channel_select */
+
 int
 sdb_channel_write(sdb_channel_t *chan, const void *data)
 {
 int
 sdb_channel_write(sdb_channel_t *chan, const void *data)
 {