X-Git-Url: https://git.tokkee.org/?a=blobdiff_plain;f=src%2Futils%2Fchannel.c;h=a45247a71060503b55589655e2cd6711dd1bcd7f;hb=a6753b33e70f0934d4b34b19366eddda56f05d9c;hp=1b687845a0f14a1f3c4b37d9d1b51f61df17fa80;hpb=19ab20f21c0fa168de54621e72121cc225d1d235;p=sysdb.git diff --git a/src/utils/channel.c b/src/utils/channel.c index 1b68784..a45247a 100644 --- a/src/utils/channel.c +++ b/src/utils/channel.c @@ -29,9 +29,13 @@ #include +#include + #include #include +#include + #include /* @@ -39,7 +43,10 @@ */ struct sdb_channel { - pthread_rwlock_t lock; + pthread_mutex_t lock; + + /* signaling for select() operation */ + pthread_cond_t cond; /* maybe TODO: add support for 'nil' values using a boolean area */ @@ -50,6 +57,8 @@ struct sdb_channel { size_t head; size_t tail; _Bool full; + + _Bool shutdown; }; /* @@ -64,35 +73,45 @@ struct sdb_channel { #define TAIL(chan) ELEM(chan, (chan)->tail) #define HEAD(chan) ELEM(chan, (chan)->head) -/* Insert a new element at the end. */ +/* Insert a new element at the end; chan->lock must be held. + * Returns 0 if data has been written or if data may be written + * if 'data' is NULL. */ static int channel_write(sdb_channel_t *chan, const void *data) { - assert(chan && data); + assert(chan); - if (chan->full) + if (chan->full || chan->shutdown) return -1; + else if (! data) + return 0; memcpy(TAIL(chan), data, chan->elem_size); chan->tail = NEXT_WRITE(chan); chan->full = chan->tail == chan->head; + pthread_cond_broadcast(&chan->cond); return 0; } /* channel_write */ -/* retrieve the first element */ +/* Retrieve the first element; chan->lock must be held. + * Returns 0 if data has been read or if data is available + * if 'data' is NULL. */ static int channel_read(sdb_channel_t *chan, void *data) { - assert(chan && data); + assert(chan); if ((chan->head == chan->tail) && (! chan->full)) return -1; + else if (! data) + return 0; memcpy(data, HEAD(chan), chan->elem_size); chan->head = NEXT_READ(chan); chan->full = 0; + pthread_cond_broadcast(&chan->cond); return 0; } /* channel_read */ @@ -123,7 +142,8 @@ sdb_channel_create(size_t size, size_t elem_size) chan->data_len = size; chan->elem_size = elem_size; - pthread_rwlock_init(&chan->lock, /* attr = */ NULL); + pthread_mutex_init(&chan->lock, /* attr = */ NULL); + pthread_cond_init(&chan->cond, /* attr = */ NULL); chan->head = chan->tail = 0; return chan; @@ -135,16 +155,89 @@ sdb_channel_destroy(sdb_channel_t *chan) if (! chan) return; - pthread_rwlock_wrlock(&chan->lock); + pthread_mutex_lock(&chan->lock); free(chan->data); chan->data = NULL; chan->data_len = 0; - pthread_rwlock_unlock(&chan->lock); - pthread_rwlock_destroy(&chan->lock); + pthread_cond_destroy(&chan->cond); + + pthread_mutex_unlock(&chan->lock); + pthread_mutex_destroy(&chan->lock); free(chan); } /* sdb_channel_destroy */ +int +sdb_channel_select(sdb_channel_t *chan, int *wantread, void *read_data, + int *wantwrite, void *write_data, const struct timespec *timeout) +{ + int status = 0; + + if (! chan) { + errno = EINVAL; + return -1; + } + + if ((! wantread) && (! read_data) && (! wantwrite) && (! write_data)) { + errno = EINVAL; + return -1; + } + + pthread_mutex_lock(&chan->lock); + while (! status) { + int read_status, write_status; + + read_status = channel_read(chan, read_data); + write_status = channel_write(chan, write_data); + + if ((! read_status) || (! write_status)) { + if (wantread) + *wantread = read_status == 0; + if (wantwrite) + *wantwrite = write_status == 0; + + if (((wantread || read_data) && (! read_status)) + || ((wantwrite || write_data) && (! write_status))) + break; + } + + if (chan->shutdown) { + if (read_status) + status = EBADF; + break; + } + + if (timeout) { + struct timespec abstime; + + if (clock_gettime(CLOCK_REALTIME, &abstime)) { + pthread_mutex_unlock(&chan->lock); + return -1; + } + + abstime.tv_sec += timeout->tv_sec; + abstime.tv_nsec += timeout->tv_nsec; + + if (abstime.tv_nsec > 1000000000) { + abstime.tv_nsec -= 1000000000; + abstime.tv_sec += 1; + } + + status = pthread_cond_timedwait(&chan->cond, &chan->lock, + &abstime); + } + else + status = pthread_cond_wait(&chan->cond, &chan->lock); + } + pthread_mutex_unlock(&chan->lock); + + if (status) { + errno = status; + return -1; + } + return 0; +} /* sdb_channel_select */ + int sdb_channel_write(sdb_channel_t *chan, const void *data) { @@ -153,9 +246,9 @@ sdb_channel_write(sdb_channel_t *chan, const void *data) if ((! chan) || (! data)) return -1; - pthread_rwlock_wrlock(&chan->lock); + pthread_mutex_lock(&chan->lock); status = channel_write(chan, data); - pthread_rwlock_unlock(&chan->lock); + pthread_mutex_unlock(&chan->lock); return status; } /* sdb_channel_write */ @@ -167,11 +260,20 @@ sdb_channel_read(sdb_channel_t *chan, void *data) if ((! chan) || (! data)) return -1; - pthread_rwlock_wrlock(&chan->lock); + pthread_mutex_lock(&chan->lock); status = channel_read(chan, data); - pthread_rwlock_unlock(&chan->lock); + pthread_mutex_unlock(&chan->lock); return status; } /* sdb_channel_read */ +int +sdb_channel_shutdown(sdb_channel_t *chan) +{ + if (! chan) + return -1; + chan->shutdown = 1; + return 0; +} /* sdb_channel_shutdown */ + /* vim: set tw=78 sw=4 ts=4 noexpandtab : */