From: Sebastian Harl Date: Sun, 20 Oct 2013 13:35:35 +0000 (+0200) Subject: utils channel: Added sdb_channel_select(). X-Git-Tag: sysdb-0.1.0~336^2~55 X-Git-Url: https://git.tokkee.org/?p=sysdb.git;a=commitdiff_plain;h=b2ea91bbb3fd18b562fd3e740599af17a113e6d2 utils channel: Added sdb_channel_select(). This function behaves similar to select(2) providing to passively block on a channel operation until it becomes ready for reading and / or writing. Optionally, a deadline may be specified. --- diff --git a/src/include/utils/channel.h b/src/include/utils/channel.h index 3eb7dc1..ca450ce 100644 --- a/src/include/utils/channel.h +++ b/src/include/utils/channel.h @@ -30,6 +30,8 @@ #include "core/object.h" +#include + #ifdef __cplusplus extern "C" { #endif @@ -89,6 +91,24 @@ sdb_channel_write(sdb_channel_t *chan, const void *data); int sdb_channel_read(sdb_channel_t *chan, void *data); +/* + * sdb_channel_select: + * Wait for a channel to become "ready" for I/O. A channel is considered ready + * if it is possible to perform the corresponding I/O operation successfully + * *in some thread*. In case 'wantread' or 'read_data' is non-NULL, wait for + * data to be available in the channel for reading. In case 'wantwrite' or + * 'write_data' is non-NULL, wait for buffer space to be available for writing + * to the channel. If non-NULL, the value pointed to by the 'want...' + * arguments will be "true" iff the respective operation is ready. If the + * '..._data' arguments are non-NULL, the respective operation is executed + * atomically once the channel is ready for it. If 'abstime' is specified, the + * operation will time out with an error if the specified absolute time has + * passed. + */ +int +sdb_channel_select(sdb_channel_t *chan, int *wantread, void *read_data, + int *wantwrite, void *write_data, const struct timespec *timeout); + #ifdef __cplusplus } /* extern "C" */ #endif diff --git a/src/utils/channel.c b/src/utils/channel.c index c57b09d..22e31a8 100644 --- a/src/utils/channel.c +++ b/src/utils/channel.c @@ -41,6 +41,9 @@ struct sdb_channel { pthread_mutex_t lock; + /* signaling for select() operation */ + pthread_cond_t cond; + /* maybe TODO: add support for 'nil' values using a boolean area */ void *data; @@ -64,35 +67,45 @@ struct sdb_channel { #define TAIL(chan) ELEM(chan, (chan)->tail) #define HEAD(chan) ELEM(chan, (chan)->head) -/* Insert a new element at the end. */ +/* Insert a new element at the end; chan->lock must be held. + * Returns 0 if data has been written or if data may be written + * if 'data' is NULL. */ static int channel_write(sdb_channel_t *chan, const void *data) { - assert(chan && data); + assert(chan); if (chan->full) return -1; + else if (! data) + return 0; memcpy(TAIL(chan), data, chan->elem_size); chan->tail = NEXT_WRITE(chan); chan->full = chan->tail == chan->head; + pthread_cond_broadcast(&chan->cond); return 0; } /* channel_write */ -/* retrieve the first element */ +/* Retrieve the first element; chan->lock must be held. + * Returns 0 if data has been read or if data is available + * if 'data' is NULL. */ static int channel_read(sdb_channel_t *chan, void *data) { - assert(chan && data); + assert(chan); if ((chan->head == chan->tail) && (! chan->full)) return -1; + else if (! data) + return 0; memcpy(data, HEAD(chan), chan->elem_size); chan->head = NEXT_READ(chan); chan->full = 0; + pthread_cond_broadcast(&chan->cond); return 0; } /* channel_read */ @@ -124,6 +137,7 @@ sdb_channel_create(size_t size, size_t elem_size) chan->elem_size = elem_size; pthread_mutex_init(&chan->lock, /* attr = */ NULL); + pthread_cond_init(&chan->cond, /* attr = */ NULL); chan->head = chan->tail = 0; return chan; @@ -140,11 +154,54 @@ sdb_channel_destroy(sdb_channel_t *chan) chan->data = NULL; chan->data_len = 0; + pthread_cond_destroy(&chan->cond); + pthread_mutex_unlock(&chan->lock); pthread_mutex_destroy(&chan->lock); free(chan); } /* sdb_channel_destroy */ +int +sdb_channel_select(sdb_channel_t *chan, int *wantread, void *read_data, + int *wantwrite, void *write_data, const struct timespec *timeout) +{ + int status = 0; + + if (! chan) + return -1; + + if ((! wantread) && (! read_data) && (! wantwrite) && (! write_data)) + return -1; + + pthread_mutex_lock(&chan->lock); + while (! status) { + int read_status, write_status; + + read_status = channel_read(chan, read_data); + write_status = channel_write(chan, write_data); + + if ((! read_status) || (! write_status)) { + if (wantread) + *wantread = read_status == 0; + if (wantwrite) + *wantwrite = write_status == 0; + + if (((wantread || read_data) && (! read_status)) + || ((wantwrite || write_data) && (! write_status))) + break; + } + + if (timeout) + status = pthread_cond_timedwait(&chan->cond, &chan->lock, + timeout); + else + status = pthread_cond_wait(&chan->cond, &chan->lock); + } + + pthread_mutex_unlock(&chan->lock); + return status; +} /* sdb_channel_select */ + int sdb_channel_write(sdb_channel_t *chan, const void *data) {