diff --git a/src/utils/channel.c b/src/utils/channel.c
index c57b09d66123ba338925103f34d634cb861a4589..a45247a71060503b55589655e2cd6711dd1bcd7f 100644 (file)
--- a/src/utils/channel.c
+++ b/src/utils/channel.c
#include <assert.h>
+#include <errno.h>
+
#include <stdlib.h>
#include <string.h>
+#include <time.h>
+
#include <pthread.h>
/*
struct sdb_channel {
pthread_mutex_t lock;
+ /* signaling for select() operation */
+ pthread_cond_t cond;
+
/* maybe TODO: add support for 'nil' values using a boolean area */
void *data;
size_t head;
size_t tail;
_Bool full;
+
+ _Bool shutdown;
};
/*
#define TAIL(chan) ELEM(chan, (chan)->tail)
#define HEAD(chan) ELEM(chan, (chan)->head)
-/* Insert a new element at the end. */
+/* Insert a new element at the end; chan->lock must be held.
+ * Returns 0 if data has been written or if data may be written
+ * if 'data' is NULL. */
static int
channel_write(sdb_channel_t *chan, const void *data)
{
- assert(chan && data);
+ assert(chan);
- if (chan->full)
+ if (chan->full || chan->shutdown)
return -1;
+ else if (! data)
+ return 0;
memcpy(TAIL(chan), data, chan->elem_size);
chan->tail = NEXT_WRITE(chan);
chan->full = chan->tail == chan->head;
+ pthread_cond_broadcast(&chan->cond);
return 0;
} /* channel_write */
-/* retrieve the first element */
+/* Retrieve the first element; chan->lock must be held.
+ * Returns 0 if data has been read or if data is available
+ * if 'data' is NULL. */
static int
channel_read(sdb_channel_t *chan, void *data)
{
- assert(chan && data);
+ assert(chan);
if ((chan->head == chan->tail) && (! chan->full))
return -1;
+ else if (! data)
+ return 0;
memcpy(data, HEAD(chan), chan->elem_size);
chan->head = NEXT_READ(chan);
chan->full = 0;
+ pthread_cond_broadcast(&chan->cond);
return 0;
} /* channel_read */
chan->elem_size = elem_size;
pthread_mutex_init(&chan->lock, /* attr = */ NULL);
+ pthread_cond_init(&chan->cond, /* attr = */ NULL);
chan->head = chan->tail = 0;
return chan;
chan->data = NULL;
chan->data_len = 0;
+ pthread_cond_destroy(&chan->cond);
+
pthread_mutex_unlock(&chan->lock);
pthread_mutex_destroy(&chan->lock);
free(chan);
} /* sdb_channel_destroy */
+int
+sdb_channel_select(sdb_channel_t *chan, int *wantread, void *read_data,
+ int *wantwrite, void *write_data, const struct timespec *timeout)
+{
+ int status = 0;
+
+ if (! chan) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if ((! wantread) && (! read_data) && (! wantwrite) && (! write_data)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ pthread_mutex_lock(&chan->lock);
+ while (! status) {
+ int read_status, write_status;
+
+ read_status = channel_read(chan, read_data);
+ write_status = channel_write(chan, write_data);
+
+ if ((! read_status) || (! write_status)) {
+ if (wantread)
+ *wantread = read_status == 0;
+ if (wantwrite)
+ *wantwrite = write_status == 0;
+
+ if (((wantread || read_data) && (! read_status))
+ || ((wantwrite || write_data) && (! write_status)))
+ break;
+ }
+
+ if (chan->shutdown) {
+ if (read_status)
+ status = EBADF;
+ break;
+ }
+
+ if (timeout) {
+ struct timespec abstime;
+
+ if (clock_gettime(CLOCK_REALTIME, &abstime)) {
+ pthread_mutex_unlock(&chan->lock);
+ return -1;
+ }
+
+ abstime.tv_sec += timeout->tv_sec;
+ abstime.tv_nsec += timeout->tv_nsec;
+
+ if (abstime.tv_nsec > 1000000000) {
+ abstime.tv_nsec -= 1000000000;
+ abstime.tv_sec += 1;
+ }
+
+ status = pthread_cond_timedwait(&chan->cond, &chan->lock,
+ &abstime);
+ }
+ else
+ status = pthread_cond_wait(&chan->cond, &chan->lock);
+ }
+ pthread_mutex_unlock(&chan->lock);
+
+ if (status) {
+ errno = status;
+ return -1;
+ }
+ return 0;
+} /* sdb_channel_select */
+
int
sdb_channel_write(sdb_channel_t *chan, const void *data)
{
return status;
} /* sdb_channel_read */
+int
+sdb_channel_shutdown(sdb_channel_t *chan)
+{
+ if (! chan)
+ return -1;
+ chan->shutdown = 1;
+ return 0;
+} /* sdb_channel_shutdown */
+
/* vim: set tw=78 sw=4 ts=4 noexpandtab : */