3 /* A (synchronized) message queue. */
4 /* Popping an empty queue is blocking, as well as pushing a full one */
6 /* Copyright (c) 2007 Martin Quinson. All rights reserved. */
8 /* This program is free software; you can redistribute it and/or modify it
9 * under the terms of the license (GNU LGPL) which comes with this package. */
12 #include "xbt/sysdep.h"
15 #include "xbt/dynar.h"
17 #include "xbt/synchro.h"
18 #include "xbt/queue.h" /* this module */
19 #include "gras/virtu.h"
/* Declares the "xbt_queue" logging category under the "xbt" category.
 * Presumably this becomes the default category used by the DEBUG1/DEBUG2
 * calls of this file -- confirm against the xbt logging API. */
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue,xbt,"Message exchanging queue");
/* Internal representation of a message queue.
 * NOTE(review): only part of this definition is visible in this listing.
 * The capacity, data and mutex members used by the functions of this file,
 * and the end of the struct, are not shown here. */
22 typedef struct s_xbt_queue_ {
/* Condition variables of the queue. not_full is waited on by the
 * push/unshift functions and signalled after a pop/shift; not_empty is
 * waited on by the pop/shift functions and signalled after a push/unshift. */
26 xbt_cond_t not_full, not_empty;
29 /** @brief Create a new message exchange queue.
31 * @param capacity the capacity of the queue. If non-zero, any attempt to push an item that would make the queue grow beyond this number blocks until someone else pops some data
32 * @param elm_size size of each element stored in it (see #xbt_dynar_new) */
/* Builds a queue descriptor: allocates it, validates the capacity, then
 * creates the storage dynar, the mutex and the two condition variables.
 * NOTE(review): the end of the function (presumably returning res) is not
 * visible in this listing. */
34 xbt_queue_t xbt_queue_new(int capacity,unsigned long elm_size) {
35 xbt_queue_t res = xbt_new0(s_xbt_queue_t,1);
/* Zero is accepted: the push/unshift functions only block when
 * capacity != 0, so 0 means "no bound". */
36 xbt_assert0(capacity>=0,"Capacity cannot be negative");
38 res->capacity = capacity;
/* The second argument is NULL -- presumably "no per-element free function";
 * see xbt_dynar_new to confirm. */
39 res->data = xbt_dynar_new(elm_size,NULL);
/* One mutex protects the dynar; each condition variable is used together
 * with that mutex by the functions below. */
40 res->mutex = xbt_mutex_init();
41 res->not_full = xbt_cond_init();
42 res->not_empty = xbt_cond_init();
45 /** @brief Destroy a message exchange queue.
47 * Any remaining content is leaked. */
/* Releases what the queue owns: the storage dynar, the mutex and both
 * condition variables. queue is a pointer to the queue handle.
 * NOTE(review): whether the descriptor itself is freed and *queue reset is
 * not visible in this listing (lines appear to be missing after the last
 * destroy). The visible code takes no lock, so the caller presumably has to
 * guarantee that no other thread still uses the queue -- TODO confirm. */
49 void xbt_queue_free(xbt_queue_t *queue) {
51 xbt_dynar_free(&( (*queue)->data ));
52 xbt_mutex_destroy( (*queue)->mutex );
53 xbt_cond_destroy( (*queue)->not_full );
54 xbt_cond_destroy( (*queue)->not_empty );
59 /** @brief Get the queue size */
/* Reads the number of elements currently stored in the queue.
 * NOTE(review): the declaration of res and the return statement are not
 * visible in this listing; presumably res is what gets returned. */
60 unsigned long xbt_queue_length(const xbt_queue_t queue) {
/* The length is read with the queue mutex held, like every other access to
 * queue->data in this file. */
62 xbt_mutex_acquire(queue->mutex);
63 res=xbt_dynar_length(queue->data);
64 xbt_mutex_release(queue->mutex);
68 /** @brief Push something to the message exchange queue.
70 * This blocks if the declared capacity is non-zero and that many items are already queued.
72 * @see #xbt_dynar_push */
/* Adds the element pointed to by src to the queue with xbt_dynar_push,
 * waiting first for room when the queue is bounded and full.
 * NOTE(review): the closing braces of the loop and of the function are not
 * visible in this listing. */
74 void xbt_queue_push(xbt_queue_t queue, const void *src) {
75 xbt_mutex_acquire(queue->mutex);
/* capacity == 0 disables the bound. The test is a loop, so the "full"
 * condition is evaluated again every time the wait returns. */
76 while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
77 DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
/* Waits on not_full with the held mutex (presumably released during the
 * wait, as with usual condition variables -- confirm in xbt/synchro.h). */
78 xbt_cond_wait(queue->not_full,queue->mutex);
80 xbt_dynar_push(queue->data,src);
/* Data is now available: wake a consumer possibly waiting on not_empty. */
81 xbt_cond_signal(queue->not_empty);
82 xbt_mutex_release(queue->mutex);
86 /** @brief Pop something from the message exchange queue.
88 * This is blocking if the queue is empty. */
/* Removes one element from the queue with xbt_dynar_pop and stores it at
 * dst, waiting first while the queue is empty.
 * NOTE(review): the closing braces of the loop and of the function are not
 * visible in this listing. */
93 void xbt_queue_pop(xbt_queue_t queue, void* const dst) {
94 xbt_mutex_acquire(queue->mutex);
/* Loop rather than a single test: emptiness is checked again every time the
 * wait on not_empty returns. */
95 while (xbt_dynar_length(queue->data) == 0) {
96 DEBUG1("Queue %p empty. Waiting",queue);
97 xbt_cond_wait(queue->not_empty,queue->mutex);
99 xbt_dynar_pop(queue->data,dst);
/* One slot was freed: wake a producer possibly waiting on not_full. */
100 xbt_cond_signal(queue->not_full);
101 xbt_mutex_release(queue->mutex);
104 /** @brief Unshift something to the message exchange queue.
106 * This blocks if the declared capacity is non-zero and that many items are already queued.
108 * @see #xbt_dynar_unshift */
/* Adds the element pointed to by src to the queue with xbt_dynar_unshift,
 * waiting first for room when the queue is bounded and full. Same protocol
 * as xbt_queue_push; only the dynar insertion call differs.
 * NOTE(review): the closing braces of the loop and of the function are not
 * visible in this listing. */
110 void xbt_queue_unshift(xbt_queue_t queue, const void *src) {
111 xbt_mutex_acquire(queue->mutex);
/* capacity == 0 disables the bound. The "full" condition is evaluated again
 * every time the wait returns. */
112 while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
113 DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
114 xbt_cond_wait(queue->not_full,queue->mutex);
116 xbt_dynar_unshift(queue->data,src);
/* Data is now available: wake a consumer possibly waiting on not_empty. */
117 xbt_cond_signal(queue->not_empty);
118 xbt_mutex_release(queue->mutex);
122 /** @brief Shift something from the message exchange queue.
124 * This is blocking if the queue is empty.
126 * @see #xbt_dynar_shift */
/* Removes one element from the queue with xbt_dynar_shift and stores it at
 * dst, waiting first while the queue is empty. Same protocol as
 * xbt_queue_pop; only the dynar removal call differs.
 * NOTE(review): the closing braces of the loop and of the function are not
 * visible in this listing. */
129 void xbt_queue_shift(xbt_queue_t queue, void* const dst) {
130 xbt_mutex_acquire(queue->mutex);
/* Emptiness is checked again every time the wait on not_empty returns. */
131 while (xbt_dynar_length(queue->data) == 0) {
132 DEBUG1("Queue %p empty. Waiting",queue);
133 xbt_cond_wait(queue->not_empty,queue->mutex);
135 xbt_dynar_shift(queue->data,dst);
/* One slot was freed: wake a producer possibly waiting on not_full. */
136 xbt_cond_signal(queue->not_full);
137 xbt_mutex_release(queue->mutex);
143 /** @brief Push something to the message exchange queue, with a timeout.
145 * @see #xbt_queue_push */
/* Timed variant of xbt_queue_push: adds src with xbt_dynar_push, but can
 * raise timeout_error instead of waiting without limit on a full queue.
 * delay is expressed in the unit of xbt_time() (presumably seconds --
 * confirm). From the visible code, delay < 0 means "wait without time
 * limit", and the "delay = 0" message suggests that a zero delay fails
 * immediately when the queue is full.
 * NOTE(review): this listing is missing lines (the guard of the first test,
 * whatever encloses the release that follows the timed wait, the closing
 * braces), so the exact control flow must be checked in the complete file. */
147 void xbt_queue_push_timed(xbt_queue_t queue, const void *src,double delay) {
/* Absolute deadline, computed before the lock is taken. */
148 double timeout = xbt_time() + delay;
151 xbt_mutex_acquire(queue->mutex);
/* Bounded queue already full: the mutex is released before raising, so the
 * exception does not leave it held. */
154 if (queue->capacity != 0 &&
155 queue->capacity == xbt_dynar_length(queue->data)) {
157 xbt_mutex_release(queue->mutex);
158 THROW2(timeout_error,0,"Capacity of %p exceded (=%d), and delay = 0",
159 queue,queue->capacity);
/* Keep waiting while the queue is full and the deadline is not reached;
 * with delay < 0 the deadline test is bypassed. */
162 while (queue->capacity != 0 &&
163 queue->capacity == xbt_dynar_length(queue->data) &&
164 (delay<0 || xbt_time() < timeout) ) {
166 DEBUG2("Capacity of %p exceded (=%d). Waiting",
167 queue,queue->capacity);
/* Waits for the time left until the deadline; -1 is passed when delay < 0
 * (presumably "no timeout" for xbt_cond_timedwait -- confirm). */
169 xbt_cond_timedwait(queue->not_full,queue->mutex,
170 delay < 0 ? -1 : timeout - xbt_time());
/* NOTE(review): the construct enclosing this release is not visible;
 * presumably it is the failure path of the timed wait. */
172 xbt_mutex_release(queue->mutex);
178 xbt_dynar_push(queue->data,src);
/* Data is now available: wake a consumer possibly waiting on not_empty. */
179 xbt_cond_signal(queue->not_empty);
180 xbt_mutex_release(queue->mutex);
184 /** @brief Pop something from the message exchange queue, with a timeout.
186 * @see #xbt_queue_pop */
/* Timed variant of xbt_queue_pop: removes one element with xbt_dynar_pop
 * into dst, but can raise timeout_error instead of waiting without limit on
 * an empty queue.
 * delay is expressed in the unit of xbt_time() (presumably seconds --
 * confirm). From the visible code, delay < 0 means "wait without time
 * limit", and the "Delay = 0" message suggests that a zero delay fails
 * immediately when the queue is empty.
 * NOTE(review): this listing is missing lines (the guard of the first test,
 * whatever encloses the release that follows the timed wait, the closing
 * braces), so the exact control flow must be checked in the complete file. */
189 void xbt_queue_pop_timed(xbt_queue_t queue, void* const dst,double delay) {
/* Absolute deadline, computed before the lock is taken. */
190 double timeout = xbt_time() + delay;
193 xbt_mutex_acquire(queue->mutex);
/* Nothing to pop: the mutex is released before raising, so the exception
 * does not leave it held. */
196 if (xbt_dynar_length(queue->data) == 0) {
197 xbt_mutex_release(queue->mutex);
198 THROW0(timeout_error,0,"Delay = 0, and queue is empty");
/* Keep waiting while the queue is empty and the deadline is not reached;
 * with delay < 0 the deadline test is bypassed. */
201 while ( (xbt_dynar_length(queue->data) == 0) &&
202 (delay<0 || xbt_time() < timeout) ) {
203 DEBUG1("Queue %p empty. Waiting",queue);
/* Waits for the time left until the deadline; -1 is passed when delay < 0
 * (presumably "no timeout" for xbt_cond_timedwait -- confirm). */
205 xbt_cond_timedwait(queue->not_empty,queue->mutex,
206 delay<0 ? -1 : timeout - xbt_time());
/* NOTE(review): the construct enclosing this release is not visible;
 * presumably it is the failure path of the timed wait. */
208 xbt_mutex_release(queue->mutex);
214 xbt_dynar_pop(queue->data,dst);
/* One slot was freed: wake a producer possibly waiting on not_full. */
215 xbt_cond_signal(queue->not_full);
216 xbt_mutex_release(queue->mutex);
219 /** @brief Unshift something to the message exchange queue, with a timeout.
221 * @see #xbt_queue_unshift */
/* Timed variant of xbt_queue_unshift: adds src with xbt_dynar_unshift, but
 * can raise timeout_error instead of waiting without limit on a full queue.
 * Same protocol as xbt_queue_push_timed; only the dynar insertion call
 * differs.
 * delay is expressed in the unit of xbt_time() (presumably seconds --
 * confirm). From the visible code, delay < 0 means "wait without time
 * limit", and the "delay = 0" message suggests that a zero delay fails
 * immediately when the queue is full.
 * NOTE(review): this listing is missing lines (the guard of the first test,
 * whatever encloses the release that follows the timed wait, the closing
 * braces), so the exact control flow must be checked in the complete file. */
223 void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src,double delay) {
/* Absolute deadline, computed before the lock is taken. */
224 double timeout = xbt_time() + delay;
227 xbt_mutex_acquire(queue->mutex);
/* Bounded queue already full: the mutex is released before raising, so the
 * exception does not leave it held. */
230 if (queue->capacity != 0 &&
231 queue->capacity == xbt_dynar_length(queue->data)) {
233 xbt_mutex_release(queue->mutex);
234 THROW2(timeout_error,0,"Capacity of %p exceded (=%d), and delay = 0",
235 queue,queue->capacity);
/* Keep waiting while the queue is full and the deadline is not reached;
 * with delay < 0 the deadline test is bypassed. */
238 while (queue->capacity != 0 &&
239 queue->capacity == xbt_dynar_length(queue->data) &&
240 (delay<0 || xbt_time() < timeout) ) {
242 DEBUG2("Capacity of %p exceded (=%d). Waiting",
243 queue,queue->capacity);
/* Waits for the time left until the deadline; -1 is passed when delay < 0
 * (presumably "no timeout" for xbt_cond_timedwait -- confirm). */
245 xbt_cond_timedwait(queue->not_full,queue->mutex,
246 delay < 0 ? -1 : timeout - xbt_time());
/* NOTE(review): the construct enclosing this release is not visible;
 * presumably it is the failure path of the timed wait. */
248 xbt_mutex_release(queue->mutex);
254 xbt_dynar_unshift(queue->data,src);
/* Data is now available: wake a consumer possibly waiting on not_empty. */
255 xbt_cond_signal(queue->not_empty);
256 xbt_mutex_release(queue->mutex);
260 /** @brief Shift something from the message exchange queue, with a timeout.
262 * @see #xbt_queue_shift */
/* Timed variant of xbt_queue_shift: removes one element with
 * xbt_dynar_shift into dst, but can raise timeout_error instead of waiting
 * without limit on an empty queue. Same protocol as xbt_queue_pop_timed;
 * only the dynar removal call differs.
 * delay is expressed in the unit of xbt_time() (presumably seconds --
 * confirm). From the visible code, delay < 0 means "wait without time
 * limit", and the "Delay = 0" message suggests that a zero delay fails
 * immediately when the queue is empty.
 * NOTE(review): this listing is missing lines (the guard of the first test,
 * whatever encloses the release that follows the timed wait, the closing
 * braces), so the exact control flow must be checked in the complete file. */
265 void xbt_queue_shift_timed(xbt_queue_t queue, void* const dst,double delay) {
/* Absolute deadline, computed before the lock is taken. */
266 double timeout = xbt_time() + delay;
269 xbt_mutex_acquire(queue->mutex);
/* Nothing to shift: the mutex is released before raising, so the exception
 * does not leave it held. */
272 if (xbt_dynar_length(queue->data) == 0) {
273 xbt_mutex_release(queue->mutex);
274 THROW0(timeout_error,0,"Delay = 0, and queue is empty");
/* Keep waiting while the queue is empty and the deadline is not reached;
 * with delay < 0 the deadline test is bypassed. */
277 while ( (xbt_dynar_length(queue->data) == 0) &&
278 (delay<0 || xbt_time() < timeout) ) {
279 DEBUG1("Queue %p empty. Waiting",queue);
/* Waits for the time left until the deadline; -1 is passed when delay < 0
 * (presumably "no timeout" for xbt_cond_timedwait -- confirm). */
281 xbt_cond_timedwait(queue->not_empty,queue->mutex,
282 delay<0 ? -1 : timeout - xbt_time());
/* NOTE(review): the construct enclosing this release is not visible;
 * presumably it is the failure path of the timed wait. */
284 xbt_mutex_release(queue->mutex);
290 xbt_dynar_shift(queue->data,dst);
/* One slot was freed: wake a producer possibly waiting on not_full. */
291 xbt_cond_signal(queue->not_full);
292 xbt_mutex_release(queue->mutex);