Code

core/store*_test.c: Converted most tests to "loop" tests.
[sysdb.git] / src / utils / channel.c
1 /*
2  * SysDB - src/utils/channel.c
3  * Copyright (C) 2013 Sebastian 'tokkee' Harl <sh@tokkee.org>
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
16  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
17  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR
19  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
28 #if HAVE_CONFIG_H
29 #       include "config.h"
30 #endif /* HAVE_CONFIG_H */
32 #include "utils/channel.h"
34 #include <assert.h>
35 #include <errno.h>
37 #include <stdbool.h>
38 #include <stdlib.h>
39 #include <string.h>
41 #include <time.h>
43 #include <pthread.h>
45 /*
46  * private data types
47  */
49 struct sdb_channel {
50         pthread_mutex_t lock;
52         /* signaling for select() operation */
53         pthread_cond_t cond;
55         /* maybe TODO: add support for 'nil' values using a boolean area */
57         void  *data;
58         size_t data_len;
59         size_t elem_size;
61         size_t head;
62         size_t tail;
63         bool  full;
65         bool shutdown;
66 };
68 /*
69  * private helper functions
70  */
72 #define NEXT_WRITE(chan) (((chan)->tail + 1) % (chan)->data_len)
73 #define NEXT_READ(chan) (((chan)->head + 1) % (chan)->data_len)
75 #define ELEM(chan, i) \
76         (void *)((char *)(chan)->data + (i) * (chan)->elem_size)
77 #define TAIL(chan) ELEM(chan, (chan)->tail)
78 #define HEAD(chan) ELEM(chan, (chan)->head)
80 /* Insert a new element at the end; chan->lock must be held.
81  * Returns 0 if data has been written or if data may be written
82  * if 'data' is NULL. */
83 static int
84 channel_write(sdb_channel_t *chan, const void *data)
85 {
86         assert(chan);
88         if (chan->full || chan->shutdown)
89                 return -1;
90         else if (! data)
91                 return 0;
93         memcpy(TAIL(chan), data, chan->elem_size);
94         chan->tail = NEXT_WRITE(chan);
96         chan->full = chan->tail == chan->head;
97         pthread_cond_broadcast(&chan->cond);
98         return 0;
99 } /* channel_write */
101 /* Retrieve the first element; chan->lock must be held.
102  * Returns 0 if data has been read or if data is available
103  * if 'data' is NULL. */
104 static int
105 channel_read(sdb_channel_t *chan, void *data)
107         assert(chan);
109         if ((chan->head == chan->tail) && (! chan->full))
110                 return -1;
111         else if (! data)
112                 return 0;
114         memcpy(data, HEAD(chan), chan->elem_size);
115         chan->head = NEXT_READ(chan);
117         chan->full = 0;
118         pthread_cond_broadcast(&chan->cond);
119         return 0;
120 } /* channel_read */
122 /*
123  * public API
124  */
126 sdb_channel_t *
127 sdb_channel_create(size_t size, size_t elem_size)
129         sdb_channel_t *chan;
131         if (! elem_size)
132                 return NULL;
133         if (! size)
134                 size = 1;
136         chan = calloc(1, sizeof(*chan));
137         if (! chan)
138                 return NULL;
140         chan->data = calloc(size, elem_size);
141         if (! chan->data) {
142                 sdb_channel_destroy(chan);
143                 return NULL;
144         }
146         chan->data_len = size;
147         chan->elem_size = elem_size;
149         pthread_mutex_init(&chan->lock, /* attr = */ NULL);
150         pthread_cond_init(&chan->cond, /* attr = */ NULL);
152         chan->head = chan->tail = 0;
153         return chan;
154 } /* sdb_channel_create */
156 void
157 sdb_channel_destroy(sdb_channel_t *chan)
159         if (! chan)
160                 return;
162         pthread_mutex_lock(&chan->lock);
163         free(chan->data);
164         chan->data = NULL;
165         chan->data_len = 0;
167         pthread_cond_destroy(&chan->cond);
169         pthread_mutex_unlock(&chan->lock);
170         pthread_mutex_destroy(&chan->lock);
171         free(chan);
172 } /* sdb_channel_destroy */
174 int
175 sdb_channel_select(sdb_channel_t *chan, int *wantread, void *read_data,
176                 int *wantwrite, void *write_data, const struct timespec *timeout)
178         int status = 0;
180         if (! chan) {
181                 errno = EINVAL;
182                 return -1;
183         }
185         if ((! wantread) && (! read_data) && (! wantwrite) && (! write_data)) {
186                 errno = EINVAL;
187                 return -1;
188         }
190         pthread_mutex_lock(&chan->lock);
191         while (! status) {
192                 int read_status, write_status;
194                 read_status = channel_read(chan, read_data);
195                 write_status = channel_write(chan, write_data);
197                 if ((! read_status) || (! write_status)) {
198                         if (wantread)
199                                 *wantread = read_status == 0;
200                         if (wantwrite)
201                                 *wantwrite = write_status == 0;
203                         if (((wantread || read_data) && (! read_status))
204                                         || ((wantwrite || write_data) && (! write_status)))
205                                 break;
206                 }
208                 if (chan->shutdown) {
209                         if (read_status)
210                                 status = EBADF;
211                         break;
212                 }
214                 if (timeout) {
215                         struct timespec abstime;
217                         if (clock_gettime(CLOCK_REALTIME, &abstime)) {
218                                 pthread_mutex_unlock(&chan->lock);
219                                 return -1;
220                         }
222                         abstime.tv_sec += timeout->tv_sec;
223                         abstime.tv_nsec += timeout->tv_nsec;
225                         if (abstime.tv_nsec > 1000000000) {
226                                 abstime.tv_nsec -= 1000000000;
227                                 abstime.tv_sec += 1;
228                         }
230                         status = pthread_cond_timedwait(&chan->cond, &chan->lock,
231                                         &abstime);
232                 }
233                 else
234                         status = pthread_cond_wait(&chan->cond, &chan->lock);
235         }
236         pthread_mutex_unlock(&chan->lock);
238         if (status) {
239                 errno = status;
240                 return -1;
241         }
242         return 0;
243 } /* sdb_channel_select */
245 int
246 sdb_channel_write(sdb_channel_t *chan, const void *data)
248         int status;
250         if ((! chan) || (! data))
251                 return -1;
253         pthread_mutex_lock(&chan->lock);
254         status = channel_write(chan, data);
255         pthread_mutex_unlock(&chan->lock);
256         return status;
257 } /* sdb_channel_write */
259 int
260 sdb_channel_read(sdb_channel_t *chan, void *data)
262         int status;
264         if ((! chan) || (! data))
265                 return -1;
267         pthread_mutex_lock(&chan->lock);
268         status = channel_read(chan, data);
269         pthread_mutex_unlock(&chan->lock);
270         return status;
271 } /* sdb_channel_read */
273 int
274 sdb_channel_shutdown(sdb_channel_t *chan)
276         if (! chan)
277                 return -1;
278         chan->shutdown = 1;
279         return 0;
280 } /* sdb_channel_shutdown */
282 /* vim: set tw=78 sw=4 ts=4 noexpandtab : */