Code

utils channel: Added sdb_channel_select().
[sysdb.git] / src / utils / channel.c
index c57b09d66123ba338925103f34d634cb861a4589..22e31a8b4eb0303e063ca6b3bfae2e0e8dcb19ad 100644 (file)
@@ -41,6 +41,9 @@
 struct sdb_channel {
        pthread_mutex_t lock;
 
+       /* signaling for select() operation */
+       pthread_cond_t cond;
+
        /* maybe TODO: add support for 'nil' values using a boolean area */
 
        void  *data;
@@ -64,35 +67,45 @@ struct sdb_channel {
 #define TAIL(chan) ELEM(chan, (chan)->tail)
 #define HEAD(chan) ELEM(chan, (chan)->head)
 
-/* Insert a new element at the end. */
+/* Insert a new element at the end; chan->lock must be held.
+ * Returns 0 if data has been written or if data may be written
+ * if 'data' is NULL. */
 static int
 channel_write(sdb_channel_t *chan, const void *data)
 {
-       assert(chan && data);
+       assert(chan);
 
        if (chan->full)
                return -1;
+       else if (! data)
+               return 0;
 
        memcpy(TAIL(chan), data, chan->elem_size);
        chan->tail = NEXT_WRITE(chan);
 
        chan->full = chan->tail == chan->head;
+       pthread_cond_broadcast(&chan->cond);
        return 0;
 } /* channel_write */
 
-/* retrieve the first element */
+/* Retrieve the first element; chan->lock must be held.
+ * Returns 0 if data has been read or if data is available
+ * if 'data' is NULL. */
 static int
 channel_read(sdb_channel_t *chan, void *data)
 {
-       assert(chan && data);
+       assert(chan);
 
        if ((chan->head == chan->tail) && (! chan->full))
                return -1;
+       else if (! data)
+               return 0;
 
        memcpy(data, HEAD(chan), chan->elem_size);
        chan->head = NEXT_READ(chan);
 
        chan->full = 0;
+       pthread_cond_broadcast(&chan->cond);
        return 0;
 } /* channel_read */
 
@@ -124,6 +137,7 @@ sdb_channel_create(size_t size, size_t elem_size)
        chan->elem_size = elem_size;
 
        pthread_mutex_init(&chan->lock, /* attr = */ NULL);
+       pthread_cond_init(&chan->cond, /* attr = */ NULL);
 
        chan->head = chan->tail = 0;
        return chan;
@@ -140,11 +154,54 @@ sdb_channel_destroy(sdb_channel_t *chan)
        chan->data = NULL;
        chan->data_len = 0;
 
+       pthread_cond_destroy(&chan->cond);
+
        pthread_mutex_unlock(&chan->lock);
        pthread_mutex_destroy(&chan->lock);
        free(chan);
 } /* sdb_channel_destroy */
 
+int
+sdb_channel_select(sdb_channel_t *chan, int *wantread, void *read_data,
+               int *wantwrite, void *write_data, const struct timespec *timeout)
+{
+       int status = 0;
+
+       if (! chan)
+               return -1;
+
+       if ((! wantread) && (! read_data) && (! wantwrite) && (! write_data))
+               return -1;
+
+       pthread_mutex_lock(&chan->lock);
+       while (! status) {
+               int read_status, write_status;
+
+               read_status = channel_read(chan, read_data);
+               write_status = channel_write(chan, write_data);
+
+               if ((! read_status) || (! write_status)) {
+                       if (wantread)
+                               *wantread = read_status == 0;
+                       if (wantwrite)
+                               *wantwrite = write_status == 0;
+
+                       if (((wantread || read_data) && (! read_status))
+                                       || ((wantwrite || write_data) && (! write_status)))
+                               break;
+               }
+
+               if (timeout)
+                       status = pthread_cond_timedwait(&chan->cond, &chan->lock,
+                                       timeout);
+               else
+                       status = pthread_cond_wait(&chan->cond, &chan->lock);
+       }
+
+       pthread_mutex_unlock(&chan->lock);
+       return status;
+} /* sdb_channel_select */
+
 int
 sdb_channel_write(sdb_channel_t *chan, const void *data)
 {