Code

plugin: Make sdb_plugin_info_t public.
[sysdb.git] / src / utils / channel.c
index c57b09d66123ba338925103f34d634cb861a4589..7c5c002ae545f43901e5b9dab352329936218b67 100644 (file)
  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  */
 
+#if HAVE_CONFIG_H
+#      include "config.h"
+#endif /* HAVE_CONFIG_H */
+
 #include "utils/channel.h"
 
 #include <assert.h>
 
+#include <errno.h>
+
 #include <stdlib.h>
 #include <string.h>
 
+#include <time.h>
+
 #include <pthread.h>
 
 /*
@@ -41,6 +49,9 @@
 struct sdb_channel {
        pthread_mutex_t lock;
 
+       /* signaling for select() operation */
+       pthread_cond_t cond;
+
        /* maybe TODO: add support for 'nil' values using a boolean area */
 
        void  *data;
@@ -50,6 +61,8 @@ struct sdb_channel {
        size_t head;
        size_t tail;
        _Bool full;
+
+       _Bool shutdown;
 };
 
 /*
@@ -64,35 +77,45 @@ struct sdb_channel {
 #define TAIL(chan) ELEM(chan, (chan)->tail)
 #define HEAD(chan) ELEM(chan, (chan)->head)
 
-/* Insert a new element at the end. */
+/* Insert a new element at the end; chan->lock must be held.
+ * Returns 0 if data has been written or if data may be written
+ * if 'data' is NULL. */
 static int
 channel_write(sdb_channel_t *chan, const void *data)
 {
-       assert(chan && data);
+       assert(chan);
 
-       if (chan->full)
+       if (chan->full || chan->shutdown)
                return -1;
+       else if (! data)
+               return 0;
 
        memcpy(TAIL(chan), data, chan->elem_size);
        chan->tail = NEXT_WRITE(chan);
 
        chan->full = chan->tail == chan->head;
+       pthread_cond_broadcast(&chan->cond);
        return 0;
 } /* channel_write */
 
-/* retrieve the first element */
+/* Retrieve the first element; chan->lock must be held.
+ * Returns 0 if data has been read or if data is available
+ * if 'data' is NULL. */
 static int
 channel_read(sdb_channel_t *chan, void *data)
 {
-       assert(chan && data);
+       assert(chan);
 
        if ((chan->head == chan->tail) && (! chan->full))
                return -1;
+       else if (! data)
+               return 0;
 
        memcpy(data, HEAD(chan), chan->elem_size);
        chan->head = NEXT_READ(chan);
 
        chan->full = 0;
+       pthread_cond_broadcast(&chan->cond);
        return 0;
 } /* channel_read */
 
@@ -124,6 +147,7 @@ sdb_channel_create(size_t size, size_t elem_size)
        chan->elem_size = elem_size;
 
        pthread_mutex_init(&chan->lock, /* attr = */ NULL);
+       pthread_cond_init(&chan->cond, /* attr = */ NULL);
 
        chan->head = chan->tail = 0;
        return chan;
@@ -140,11 +164,84 @@ sdb_channel_destroy(sdb_channel_t *chan)
        chan->data = NULL;
        chan->data_len = 0;
 
+       pthread_cond_destroy(&chan->cond);
+
        pthread_mutex_unlock(&chan->lock);
        pthread_mutex_destroy(&chan->lock);
        free(chan);
 } /* sdb_channel_destroy */
 
+int
+sdb_channel_select(sdb_channel_t *chan, int *wantread, void *read_data,
+               int *wantwrite, void *write_data, const struct timespec *timeout)
+{
+       int status = 0;
+
+       if (! chan) {
+               errno = EINVAL;
+               return -1;
+       }
+
+       if ((! wantread) && (! read_data) && (! wantwrite) && (! write_data)) {
+               errno = EINVAL;
+               return -1;
+       }
+
+       pthread_mutex_lock(&chan->lock);
+       while (! status) {
+               int read_status, write_status;
+
+               read_status = channel_read(chan, read_data);
+               write_status = channel_write(chan, write_data);
+
+               if ((! read_status) || (! write_status)) {
+                       if (wantread)
+                               *wantread = read_status == 0;
+                       if (wantwrite)
+                               *wantwrite = write_status == 0;
+
+                       if (((wantread || read_data) && (! read_status))
+                                       || ((wantwrite || write_data) && (! write_status)))
+                               break;
+               }
+
+               if (chan->shutdown) {
+                       if (read_status)
+                               status = EBADF;
+                       break;
+               }
+
+               if (timeout) {
+                       struct timespec abstime;
+
+                       if (clock_gettime(CLOCK_REALTIME, &abstime)) {
+                               pthread_mutex_unlock(&chan->lock);
+                               return -1;
+                       }
+
+                       abstime.tv_sec += timeout->tv_sec;
+                       abstime.tv_nsec += timeout->tv_nsec;
+
+                       if (abstime.tv_nsec > 1000000000) {
+                               abstime.tv_nsec -= 1000000000;
+                               abstime.tv_sec += 1;
+                       }
+
+                       status = pthread_cond_timedwait(&chan->cond, &chan->lock,
+                                       &abstime);
+               }
+               else
+                       status = pthread_cond_wait(&chan->cond, &chan->lock);
+       }
+       pthread_mutex_unlock(&chan->lock);
+
+       if (status) {
+               errno = status;
+               return -1;
+       }
+       return 0;
+} /* sdb_channel_select */
+
 int
 sdb_channel_write(sdb_channel_t *chan, const void *data)
 {
@@ -173,5 +270,14 @@ sdb_channel_read(sdb_channel_t *chan, void *data)
        return status;
 } /* sdb_channel_read */
 
+int
+sdb_channel_shutdown(sdb_channel_t *chan)
+{
+       if (! chan)
+               return -1;
+       chan->shutdown = 1;
+       return 0;
+} /* sdb_channel_shutdown */
+
 /* vim: set tw=78 sw=4 ts=4 noexpandtab : */