From 19ab20f21c0fa168de54621e72121cc225d1d235 Mon Sep 17 00:00:00 2001 From: Sebastian Harl Date: Sat, 19 Oct 2013 10:50:11 +0200 Subject: [PATCH] utils/channel: Added an asynchronous I/O multiplexer. A channel is a buffered pipe supporting multiple parallel readers and writers. --- src/Makefile.am | 1 + src/include/utils/channel.h | 99 ++++++++++++++++++++ src/utils/channel.c | 177 ++++++++++++++++++++++++++++++++++++ 3 files changed, 277 insertions(+) create mode 100644 src/include/utils/channel.h create mode 100644 src/utils/channel.c diff --git a/src/Makefile.am b/src/Makefile.am index 3cdb58e..48e77eb 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -25,6 +25,7 @@ libsysdb_la_SOURCES = \ core/store.c include/core/store.h \ include/core/data.h \ core/error.c include/core/error.h \ + utils/channel.c include/utils/channel.h \ utils/llist.c include/utils/llist.h \ utils/strbuf.c include/utils/strbuf.h \ core/time.c include/core/time.h \ diff --git a/src/include/utils/channel.h b/src/include/utils/channel.h new file mode 100644 index 0000000..3eb7dc1 --- /dev/null +++ b/src/include/utils/channel.h @@ -0,0 +1,99 @@ +/* + * SysDB - src/include/utils/channel.h + * Copyright (C) 2013 Sebastian 'tokkee' Harl + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef SDB_UTILS_CHANNEL_H +#define SDB_UTILS_CHANNEL_H 1 + +#include "core/object.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/* + * A channel is an asynchronous I/O multiplexer supporting multiple parallel + * readers and writers. A channel may be buffered (depending on its 'size' + * attribute). Writing fails unless buffer space is available and reading + * fails if no data is available. + */ + +struct sdb_channel; +typedef struct sdb_channel sdb_channel_t; + +/* + * sdb_channel_create: + * Create a new channel for elements of size 'elem_size'. At most, 'size' + * elements can be buffered in the channel (default: 1). + * + * Returns: + * - a channel object on success + * - a negative value else + */ +sdb_channel_t * +sdb_channel_create(size_t size, size_t elem_size); + +/* + * sdb_channel_destroy: + * Removing all pending data and destroy the specified channel freeing its + * memory. + */ +void +sdb_channel_destroy(sdb_channel_t *chan); + +/* + * sdb_channel_write: + * Write an element to a channel. The memory pointed to by 'data' is copied to + * the buffer based on the channel's element size. + * + * Returns: + * - 0 on success + * - a negative value else + */ +int +sdb_channel_write(sdb_channel_t *chan, const void *data); + +/* + * sdb_channel_read: + * Read an element from a channel. The element is copied to the location + * pointed to by 'data' which needs to be large enough to store a whole + * element based on the channel's element size. + * + * Returns: + * - 0 on success + * - a negative value else + */ +int +sdb_channel_read(sdb_channel_t *chan, void *data); + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* ! SDB_UTILS_CHANNEL_H */ + +/* vim: set tw=78 sw=4 ts=4 noexpandtab : */ + diff --git a/src/utils/channel.c b/src/utils/channel.c new file mode 100644 index 0000000..1b68784 --- /dev/null +++ b/src/utils/channel.c @@ -0,0 +1,177 @@ +/* + * SysDB - src/utils/channel.c + * Copyright (C) 2013 Sebastian 'tokkee' Harl + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "utils/channel.h" + +#include + +#include +#include + +#include + +/* + * private data types + */ + +struct sdb_channel { + pthread_rwlock_t lock; + + /* maybe TODO: add support for 'nil' values using a boolean area */ + + void *data; + size_t data_len; + size_t elem_size; + + size_t head; + size_t tail; + _Bool full; +}; + +/* + * private helper functions + */ + +#define NEXT_WRITE(chan) (((chan)->tail + 1) % (chan)->data_len) +#define NEXT_READ(chan) (((chan)->head + 1) % (chan)->data_len) + +#define ELEM(chan, i) \ + (void *)((char *)(chan)->data + (i) * (chan)->elem_size) +#define TAIL(chan) ELEM(chan, (chan)->tail) +#define HEAD(chan) ELEM(chan, (chan)->head) + +/* Insert a new element at the end. */ +static int +channel_write(sdb_channel_t *chan, const void *data) +{ + assert(chan && data); + + if (chan->full) + return -1; + + memcpy(TAIL(chan), data, chan->elem_size); + chan->tail = NEXT_WRITE(chan); + + chan->full = chan->tail == chan->head; + return 0; +} /* channel_write */ + +/* retrieve the first element */ +static int +channel_read(sdb_channel_t *chan, void *data) +{ + assert(chan && data); + + if ((chan->head == chan->tail) && (! chan->full)) + return -1; + + memcpy(data, HEAD(chan), chan->elem_size); + chan->head = NEXT_READ(chan); + + chan->full = 0; + return 0; +} /* channel_read */ + +/* + * public API + */ + +sdb_channel_t * +sdb_channel_create(size_t size, size_t elem_size) +{ + sdb_channel_t *chan; + + if (! elem_size) + return NULL; + if (! size) + size = 1; + + chan = calloc(1, sizeof(*chan)); + if (! chan) + return NULL; + + chan->data = calloc(size, elem_size); + if (! chan->data) { + sdb_channel_destroy(chan); + return NULL; + } + + chan->data_len = size; + chan->elem_size = elem_size; + + pthread_rwlock_init(&chan->lock, /* attr = */ NULL); + + chan->head = chan->tail = 0; + return chan; +} /* sdb_channel_create */ + +void +sdb_channel_destroy(sdb_channel_t *chan) +{ + if (! chan) + return; + + pthread_rwlock_wrlock(&chan->lock); + free(chan->data); + chan->data = NULL; + chan->data_len = 0; + + pthread_rwlock_unlock(&chan->lock); + pthread_rwlock_destroy(&chan->lock); + free(chan); +} /* sdb_channel_destroy */ + +int +sdb_channel_write(sdb_channel_t *chan, const void *data) +{ + int status; + + if ((! chan) || (! data)) + return -1; + + pthread_rwlock_wrlock(&chan->lock); + status = channel_write(chan, data); + pthread_rwlock_unlock(&chan->lock); + return status; +} /* sdb_channel_write */ + +int +sdb_channel_read(sdb_channel_t *chan, void *data) +{ + int status; + + if ((! chan) || (! data)) + return -1; + + pthread_rwlock_wrlock(&chan->lock); + status = channel_read(chan, data); + pthread_rwlock_unlock(&chan->lock); + return status; +} /* sdb_channel_read */ + +/* vim: set tw=78 sw=4 ts=4 noexpandtab : */ + -- 2.30.2