summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: 3d018d3)
raw | patch | inline | side by side (parent: 3d018d3)
author | Sebastian Harl <sh@tokkee.org> | |
Sun, 20 Oct 2013 13:35:35 +0000 (15:35 +0200) | ||
committer | Sebastian Harl <sh@tokkee.org> | |
Sun, 20 Oct 2013 13:35:35 +0000 (15:35 +0200) |
This function behaves similar to select(2) providing to passively block on a
channel operation until it becomes ready for reading and / or writing.
Optionally, a deadline may be specified.
channel operation until it becomes ready for reading and / or writing.
Optionally, a deadline may be specified.
src/include/utils/channel.h | patch | blob | history | |
src/utils/channel.c | patch | blob | history |
index 3eb7dc1897feab7639ec289ceb408089216f932b..ca450ce22282dc2af98b7889f7fc15189a544b86 100644 (file)
#include "core/object.h"
+#include <sys/time.h>
+
#ifdef __cplusplus
extern "C" {
#endif
int
sdb_channel_read(sdb_channel_t *chan, void *data);
+/*
+ * sdb_channel_select:
+ * Wait for a channel to become "ready" for I/O. A channel is considered ready
+ * if it is possible to perform the corresponding I/O operation successfully
+ * *in some thread*. In case 'wantread' or 'read_data' is non-NULL, wait for
+ * data to be available in the channel for reading. In case 'wantwrite' or
+ * 'write_data' is non-NULL, wait for buffer space to be available for writing
+ * to the channel. If non-NULL, the value pointed to by the 'want...'
+ * arguments will be "true" iff the respective operation is ready. If the
+ * '..._data' arguments are non-NULL, the respective operation is executed
+ * atomically once the channel is ready for it. If 'abstime' is specified, the
+ * operation will time out with an error if the specified absolute time has
+ * passed.
+ */
+int
+sdb_channel_select(sdb_channel_t *chan, int *wantread, void *read_data,
+ int *wantwrite, void *write_data, const struct timespec *timeout);
+
#ifdef __cplusplus
} /* extern "C" */
#endif
diff --git a/src/utils/channel.c b/src/utils/channel.c
index c57b09d66123ba338925103f34d634cb861a4589..22e31a8b4eb0303e063ca6b3bfae2e0e8dcb19ad 100644 (file)
--- a/src/utils/channel.c
+++ b/src/utils/channel.c
struct sdb_channel {
pthread_mutex_t lock;
+ /* signaling for select() operation */
+ pthread_cond_t cond;
+
/* maybe TODO: add support for 'nil' values using a boolean area */
void *data;
#define TAIL(chan) ELEM(chan, (chan)->tail)
#define HEAD(chan) ELEM(chan, (chan)->head)
-/* Insert a new element at the end. */
+/* Insert a new element at the end; chan->lock must be held.
+ * Returns 0 if data has been written or if data may be written
+ * if 'data' is NULL. */
static int
channel_write(sdb_channel_t *chan, const void *data)
{
- assert(chan && data);
+ assert(chan);
if (chan->full)
return -1;
+ else if (! data)
+ return 0;
memcpy(TAIL(chan), data, chan->elem_size);
chan->tail = NEXT_WRITE(chan);
chan->full = chan->tail == chan->head;
+ pthread_cond_broadcast(&chan->cond);
return 0;
} /* channel_write */
-/* retrieve the first element */
+/* Retrieve the first element; chan->lock must be held.
+ * Returns 0 if data has been read or if data is available
+ * if 'data' is NULL. */
static int
channel_read(sdb_channel_t *chan, void *data)
{
- assert(chan && data);
+ assert(chan);
if ((chan->head == chan->tail) && (! chan->full))
return -1;
+ else if (! data)
+ return 0;
memcpy(data, HEAD(chan), chan->elem_size);
chan->head = NEXT_READ(chan);
chan->full = 0;
+ pthread_cond_broadcast(&chan->cond);
return 0;
} /* channel_read */
chan->elem_size = elem_size;
pthread_mutex_init(&chan->lock, /* attr = */ NULL);
+ pthread_cond_init(&chan->cond, /* attr = */ NULL);
chan->head = chan->tail = 0;
return chan;
chan->data = NULL;
chan->data_len = 0;
+ pthread_cond_destroy(&chan->cond);
+
pthread_mutex_unlock(&chan->lock);
pthread_mutex_destroy(&chan->lock);
free(chan);
} /* sdb_channel_destroy */
+int
+sdb_channel_select(sdb_channel_t *chan, int *wantread, void *read_data,
+ int *wantwrite, void *write_data, const struct timespec *timeout)
+{
+ int status = 0;
+
+ if (! chan)
+ return -1;
+
+ if ((! wantread) && (! read_data) && (! wantwrite) && (! write_data))
+ return -1;
+
+ pthread_mutex_lock(&chan->lock);
+ while (! status) {
+ int read_status, write_status;
+
+ read_status = channel_read(chan, read_data);
+ write_status = channel_write(chan, write_data);
+
+ if ((! read_status) || (! write_status)) {
+ if (wantread)
+ *wantread = read_status == 0;
+ if (wantwrite)
+ *wantwrite = write_status == 0;
+
+ if (((wantread || read_data) && (! read_status))
+ || ((wantwrite || write_data) && (! write_status)))
+ break;
+ }
+
+ if (timeout)
+ status = pthread_cond_timedwait(&chan->cond, &chan->lock,
+ timeout);
+ else
+ status = pthread_cond_wait(&chan->cond, &chan->lock);
+ }
+
+ pthread_mutex_unlock(&chan->lock);
+ return status;
+} /* sdb_channel_select */
+
int
sdb_channel_write(sdb_channel_t *chan, const void *data)
{