1 /* A (synchronized) message queue. */
2 /* Popping an empty queue is blocking, as well as pushing a full one */
4 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
5 * All rights reserved. */
7 /* This program is free software; you can redistribute it and/or modify it
8 * under the terms of the license (GNU LGPL) which comes with this package. */
11 #include "xbt/sysdep.h"
13 #include "xbt/dynar.h"
15 #include "xbt/queue.h" /* this module */
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue, xbt,
17 "Message exchanging queue");
19 typedef struct s_xbt_queue_ {
23 xbt_cond_t not_full, not_empty;
26 /** @brief Create a new message exchange queue.
28 * @param capacity the capacity of the queue. If non-nul, any attempt to push an item which would let the size of the queue over this number will be blocking until someone else pop some data
29 * @param elm_size size of each element stored in it (see #xbt_dynar_new)
31 xbt_queue_t xbt_queue_new(int capacity, unsigned long elm_size)
33 xbt_queue_t res = xbt_new0(s_xbt_queue_t, 1);
37 res->capacity = capacity;
38 res->data = xbt_dynar_new(elm_size, NULL);
39 res->mutex = xbt_mutex_init();
40 res->not_full = xbt_cond_init();
41 res->not_empty = xbt_cond_init();
45 /** @brief Destroy a message exchange queue.
47 * Any remaining content is leaked.
49 void xbt_queue_free(xbt_queue_t * queue)
52 xbt_dynar_free(&((*queue)->data));
53 xbt_mutex_destroy((*queue)->mutex);
54 xbt_cond_destroy((*queue)->not_full);
55 xbt_cond_destroy((*queue)->not_empty);
60 /** @brief Get the queue size */
61 unsigned long xbt_queue_length(const xbt_queue_t queue)
64 xbt_mutex_acquire(queue->mutex);
65 res = xbt_dynar_length(queue->data);
66 xbt_mutex_release(queue->mutex);
70 /** @brief Push something to the message exchange queue.
72 * This is blocking if the declared capacity is non-nul, and if this amount is reached.
74 * @see #xbt_dynar_push
76 void xbt_queue_push(xbt_queue_t queue, const void *src)
78 xbt_mutex_acquire(queue->mutex);
79 while (queue->capacity != 0
80 && queue->capacity == xbt_dynar_length(queue->data)) {
81 XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue,
83 xbt_cond_wait(queue->not_full, queue->mutex);
85 xbt_dynar_push(queue->data, src);
86 xbt_cond_signal(queue->not_empty);
87 xbt_mutex_release(queue->mutex);
91 /** @brief Pop something from the message exchange queue.
93 * This is blocking if the queue is empty.
98 void xbt_queue_pop(xbt_queue_t queue, void *const dst)
100 xbt_mutex_acquire(queue->mutex);
101 while (xbt_dynar_is_empty(queue->data)) {
102 XBT_DEBUG("Queue %p empty. Waiting", queue);
103 xbt_cond_wait(queue->not_empty, queue->mutex);
105 xbt_dynar_pop(queue->data, dst);
106 xbt_cond_signal(queue->not_full);
107 xbt_mutex_release(queue->mutex);
110 /** @brief Unshift something to the message exchange queue.
112 * This is blocking if the declared capacity is non-nul, and if this amount is reached.
114 * @see #xbt_dynar_unshift
116 void xbt_queue_unshift(xbt_queue_t queue, const void *src)
118 xbt_mutex_acquire(queue->mutex);
119 while (queue->capacity != 0
120 && queue->capacity == xbt_dynar_length(queue->data)) {
121 XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue,
123 xbt_cond_wait(queue->not_full, queue->mutex);
125 xbt_dynar_unshift(queue->data, src);
126 xbt_cond_signal(queue->not_empty);
127 xbt_mutex_release(queue->mutex);
131 /** @brief Shift something from the message exchange queue.
133 * This is blocking if the queue is empty.
135 * @see #xbt_dynar_shift
138 void xbt_queue_shift(xbt_queue_t queue, void *const dst)
140 xbt_mutex_acquire(queue->mutex);
141 while (xbt_dynar_is_empty(queue->data)) {
142 XBT_DEBUG("Queue %p empty. Waiting", queue);
143 xbt_cond_wait(queue->not_empty, queue->mutex);
145 xbt_dynar_shift(queue->data, dst);
146 xbt_cond_signal(queue->not_full);
147 xbt_mutex_release(queue->mutex);
153 /** @brief Push something to the message exchange queue, with a timeout.
155 * @see #xbt_queue_push
157 void xbt_queue_push_timed(xbt_queue_t queue, const void *src, double delay)
159 double begin = xbt_time();
161 xbt_mutex_acquire(queue->mutex);
164 if (queue->capacity != 0 &&
165 queue->capacity == xbt_dynar_length(queue->data)) {
167 xbt_mutex_release(queue->mutex);
168 THROWF(timeout_error, 0,
169 "Capacity of %p exceeded (=%d), and delay = 0", queue,
173 while (queue->capacity != 0 &&
174 queue->capacity == xbt_dynar_length(queue->data) &&
175 (delay < 0 || (xbt_time() - begin) <= delay)) {
177 XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue,
180 xbt_cond_timedwait(queue->not_full, queue->mutex,
181 delay < 0 ? -1 : delay - (xbt_time() - begin));
184 xbt_mutex_release(queue->mutex);
190 xbt_dynar_push(queue->data, src);
191 xbt_cond_signal(queue->not_empty);
192 xbt_mutex_release(queue->mutex);
196 /** @brief Pop something from the message exchange queue, with a timeout.
198 * @see #xbt_queue_pop
201 void xbt_queue_pop_timed(xbt_queue_t queue, void *const dst, double delay)
203 double begin = xbt_time();
205 xbt_mutex_acquire(queue->mutex);
208 if (xbt_dynar_is_empty(queue->data)) {
209 xbt_mutex_release(queue->mutex);
210 THROWF(timeout_error, 0, "Delay = 0, and queue is empty");
213 while ((xbt_dynar_is_empty(queue->data)) &&
214 (delay < 0 || (xbt_time() - begin) <= delay)) {
215 XBT_DEBUG("Queue %p empty. Waiting", queue);
217 xbt_cond_timedwait(queue->not_empty, queue->mutex,
218 delay < 0 ? -1 : delay - (xbt_time() - begin));
221 xbt_mutex_release(queue->mutex);
227 xbt_dynar_pop(queue->data, dst);
228 xbt_cond_signal(queue->not_full);
229 xbt_mutex_release(queue->mutex);
232 /** @brief Unshift something to the message exchange queue, with a timeout.
234 * @see #xbt_queue_unshift
236 void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src,
239 double begin = xbt_time();
241 xbt_mutex_acquire(queue->mutex);
244 if (queue->capacity != 0 &&
245 queue->capacity == xbt_dynar_length(queue->data)) {
247 xbt_mutex_release(queue->mutex);
248 THROWF(timeout_error, 0,
249 "Capacity of %p exceeded (=%d), and delay = 0", queue,
253 while (queue->capacity != 0 &&
254 queue->capacity == xbt_dynar_length(queue->data) &&
255 (delay < 0 || (xbt_time() - begin) <= delay)) {
257 XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue,
260 xbt_cond_timedwait(queue->not_full, queue->mutex,
261 delay < 0 ? -1 : delay - (xbt_time() - begin));
264 xbt_mutex_release(queue->mutex);
270 xbt_dynar_unshift(queue->data, src);
271 xbt_cond_signal(queue->not_empty);
272 xbt_mutex_release(queue->mutex);
276 /** @brief Shift something from the message exchange queue, with a timeout.
278 * @see #xbt_queue_shift
281 void xbt_queue_shift_timed(xbt_queue_t queue, void *const dst,
284 double begin = xbt_time();
286 xbt_mutex_acquire(queue->mutex);
289 if (xbt_dynar_is_empty(queue->data)) {
290 xbt_mutex_release(queue->mutex);
291 THROWF(timeout_error, 0, "Delay = 0, and queue is empty");
294 while ((xbt_dynar_is_empty(queue->data)) &&
295 (delay < 0 || (xbt_time() - begin) <= delay)) {
296 XBT_DEBUG("Queue %p empty. Waiting", queue);
298 xbt_cond_timedwait(queue->not_empty, queue->mutex,
299 delay < 0 ? -1 : delay - (xbt_time() - begin));
302 xbt_mutex_release(queue->mutex);
308 if (xbt_dynar_is_empty(queue->data)) {
309 xbt_mutex_release(queue->mutex);
310 THROWF(timeout_error, 0, "Timeout (%f) elapsed, but queue still empty",
314 xbt_dynar_shift(queue->data, dst);
315 xbt_cond_signal(queue->not_full);
316 xbt_mutex_release(queue->mutex);