1 /* A (synchronized) message queue. */
2 /* Popping from an empty queue blocks, and so does pushing to a full one */
4 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
5 * All rights reserved. */
7 /* This program is free software; you can redistribute it and/or modify it
8 * under the terms of the license (GNU LGPL) which comes with this package. */
11 #include "xbt/sysdep.h"
13 #include "xbt/dynar.h"
15 #include "xbt/synchro.h"
16 #include "xbt/queue.h" /* this module */
17 #include "gras/virtu.h"
/* Declares the log subcategory "xbt_queue" under the "xbt" category. The
 * XBT_DEBUG calls in this file presumably report to it -- confirm against
 * the xbt logging macros. */
18 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue, xbt,
19 "Message exchanging queue");
/* Internal representation of a queue.
 * NOTE(review): only part of this struct is visible in this excerpt. The
 * functions of this file also access capacity, data and mutex members, and
 * the closing brace with the typedef name is not shown. */
21 typedef struct s_xbt_queue_ {
/* Condition variables: not_full is waited on by the push/unshift functions
 * and signalled after a pop/shift; not_empty is waited on by the pop/shift
 * functions and signalled after a push/unshift. */
25 xbt_cond_t not_full, not_empty;
28 /** @brief Create a new message exchange queue.
30 * @param capacity the capacity of the queue. If non-zero, any attempt to push an item that would take the size of the queue over this number blocks until someone else pops some data
31 * @param elm_size size of each element stored in it (see #xbt_dynar_new)
/* Builds a new queue: allocates the structure with xbt_new0, records the
 * capacity, creates the backing dynar for elements of size elm_size (the
 * second xbt_dynar_new argument is NULL), and creates the mutex and the two
 * condition variables.
 * NOTE(review): lines are missing from this excerpt (braces, whatever sits
 * between the allocation and the capacity assignment, and the return
 * statement) -- confirm against the complete file. */
33 xbt_queue_t xbt_queue_new(int capacity, unsigned long elm_size)
35 xbt_queue_t res = xbt_new0(s_xbt_queue_t, 1);
/* The push/unshift functions only block when this value is non-zero. */
39 res->capacity = capacity;
40 res->data = xbt_dynar_new(elm_size, NULL);
41 res->mutex = xbt_mutex_init();
/* One condition variable per direction: producers wait on not_full,
 * consumers wait on not_empty. */
42 res->not_full = xbt_cond_init();
43 res->not_empty = xbt_cond_init();
47 /** @brief Destroy a message exchange queue.
49 * Any remaining content is leaked.
/* Releases the resources held by *queue: the backing dynar, the mutex and
 * both condition variables. Takes a pointer to the queue handle.
 * NOTE(review): freeing the queue structure itself and resetting *queue are
 * not visible in this excerpt -- confirm against the complete file. */
51 void xbt_queue_free(xbt_queue_t * queue)
54 xbt_dynar_free(&((*queue)->data));
55 xbt_mutex_destroy((*queue)->mutex);
56 xbt_cond_destroy((*queue)->not_full);
57 xbt_cond_destroy((*queue)->not_empty);
62 /** @brief Get the queue size */
/* Reads the number of elements stored in the queue's dynar into res while
 * holding the queue mutex.
 * NOTE(review): the declaration of res and the return statement are not
 * visible in this excerpt. */
63 unsigned long xbt_queue_length(const xbt_queue_t queue)
66 xbt_mutex_acquire(queue->mutex);
67 res = xbt_dynar_length(queue->data);
68 xbt_mutex_release(queue->mutex);
72 /** @brief Push something to the message exchange queue.
74 * This blocks if the declared capacity is non-zero and that many items are already stored.
76 * @see #xbt_dynar_push
/* Stores the element at src in the queue with xbt_dynar_push, first waiting
 * for as long as the queue holds exactly its (non-zero) capacity.
 * NOTE(review): braces and the trailing XBT_DEBUG argument are not visible
 * in this excerpt. */
78 void xbt_queue_push(xbt_queue_t queue, const void *src)
80 xbt_mutex_acquire(queue->mutex);
/* The condition is re-tested after every wake-up; a capacity of 0 never
 * enters the loop. */
81 while (queue->capacity != 0
82 && queue->capacity == xbt_dynar_length(queue->data)) {
83 XBT_DEBUG("Capacity of %p exceded (=%d). Waiting", queue,
/* Wait on not_full, handing over the mutex held by this function. */
85 xbt_cond_wait(queue->not_full, queue->mutex);
/* Element stored: signal not_empty for the pop/shift side, then unlock. */
87 xbt_dynar_push(queue->data, src);
88 xbt_cond_signal(queue->not_empty);
89 xbt_mutex_release(queue->mutex);
93 /** @brief Pop something from the message exchange queue.
95 * This is blocking if the queue is empty.
/* Removes an element from the queue with xbt_dynar_pop, passing dst as the
 * destination argument, first waiting for as long as the queue is empty.
 * NOTE(review): braces are not visible in this excerpt. */
100 void xbt_queue_pop(xbt_queue_t queue, void *const dst)
102 xbt_mutex_acquire(queue->mutex);
/* The emptiness test is repeated after every wake-up. */
103 while (xbt_dynar_length(queue->data) == 0) {
104 XBT_DEBUG("Queue %p empty. Waiting", queue);
105 xbt_cond_wait(queue->not_empty, queue->mutex);
/* Element removed: signal not_full for the push/unshift side, then unlock. */
107 xbt_dynar_pop(queue->data, dst);
108 xbt_cond_signal(queue->not_full);
109 xbt_mutex_release(queue->mutex);
112 /** @brief Unshift something to the message exchange queue.
114 * This blocks if the declared capacity is non-zero and that many items are already stored.
116 * @see #xbt_dynar_unshift
/* Stores the element at src in the queue with xbt_dynar_unshift, first
 * waiting for as long as the queue holds exactly its (non-zero) capacity.
 * Same locking and waiting scheme as xbt_queue_push; only the dynar call
 * differs.
 * NOTE(review): braces and the trailing XBT_DEBUG argument are not visible
 * in this excerpt. */
118 void xbt_queue_unshift(xbt_queue_t queue, const void *src)
120 xbt_mutex_acquire(queue->mutex);
/* The condition is re-tested after every wake-up; a capacity of 0 never
 * enters the loop. */
121 while (queue->capacity != 0
122 && queue->capacity == xbt_dynar_length(queue->data)) {
123 XBT_DEBUG("Capacity of %p exceded (=%d). Waiting", queue,
/* Wait on not_full, handing over the mutex held by this function. */
125 xbt_cond_wait(queue->not_full, queue->mutex);
/* Element stored: signal not_empty for the pop/shift side, then unlock. */
127 xbt_dynar_unshift(queue->data, src);
128 xbt_cond_signal(queue->not_empty);
129 xbt_mutex_release(queue->mutex);
133 /** @brief Shift something from the message exchange queue.
135 * This is blocking if the queue is empty.
137 * @see #xbt_dynar_shift
/* Removes an element from the queue with xbt_dynar_shift, passing dst as the
 * destination argument, first waiting for as long as the queue is empty.
 * Same locking and waiting scheme as xbt_queue_pop; only the dynar call
 * differs.
 * NOTE(review): braces are not visible in this excerpt. */
140 void xbt_queue_shift(xbt_queue_t queue, void *const dst)
142 xbt_mutex_acquire(queue->mutex);
/* The emptiness test is repeated after every wake-up. */
143 while (xbt_dynar_length(queue->data) == 0) {
144 XBT_DEBUG("Queue %p empty. Waiting", queue);
145 xbt_cond_wait(queue->not_empty, queue->mutex);
/* Element removed: signal not_full for the push/unshift side, then unlock. */
147 xbt_dynar_shift(queue->data, dst);
148 xbt_cond_signal(queue->not_full);
149 xbt_mutex_release(queue->mutex);
155 /** @brief Push something to the message exchange queue, with a timeout.
157 * @see #xbt_queue_push
/* Timed variant of xbt_queue_push: stores the element at src with
 * xbt_dynar_push, with the wait for free room bounded by delay. A negative
 * delay makes the loop ignore elapsed time and passes -1 to
 * xbt_cond_timedwait. Throws timeout_error when the queue is full in the
 * "delay = 0" case.
 * NOTE(review): several lines of this function are not visible in this
 * excerpt (braces, the test selecting the delay = 0 branch, trailing macro
 * arguments, and the context of the first xbt_mutex_release after the
 * wait) -- confirm against the complete file. */
159 void xbt_queue_push_timed(xbt_queue_t queue, const void *src, double delay)
/* Start time, used below to compute how much of delay has elapsed. */
161 double begin = xbt_time();
164 xbt_mutex_acquire(queue->mutex);
/* Queue full: the mutex is released before throwing. The message implies
 * this check applies when delay is 0; the guarding condition is not visible
 * here -- confirm. */
167 if (queue->capacity != 0 &&
168 queue->capacity == xbt_dynar_length(queue->data)) {
170 xbt_mutex_release(queue->mutex);
171 THROWF(timeout_error, 0,
172 "Capacity of %p exceded (=%d), and delay = 0", queue,
/* Keep waiting while the queue is full and either delay is negative or the
 * elapsed time has not exceeded delay. */
176 while (queue->capacity != 0 &&
177 queue->capacity == xbt_dynar_length(queue->data) &&
178 (delay < 0 || (xbt_time() - begin) <= delay)) {
180 XBT_DEBUG("Capacity of %p exceded (=%d). Waiting", queue,
/* Wait for the time remaining out of delay, or pass -1 when delay is
 * negative. */
183 xbt_cond_timedwait(queue->not_full, queue->mutex,
184 delay < 0 ? -1 : delay - (xbt_time() - begin));
/* NOTE(review): the lines around this release are not visible; presumably
 * it belongs to an error path of the timed wait, since the mutex is
 * released again at the end of the function -- confirm. */
187 xbt_mutex_release(queue->mutex);
/* Element stored: signal not_empty for the pop/shift side, then unlock. */
193 xbt_dynar_push(queue->data, src);
194 xbt_cond_signal(queue->not_empty);
195 xbt_mutex_release(queue->mutex);
199 /** @brief Pop something from the message exchange queue, with a timeout.
201 * @see #xbt_queue_pop
/* Timed variant of xbt_queue_pop: removes an element with xbt_dynar_pop,
 * passing dst as the destination argument, with the wait for data bounded by
 * delay. A negative delay makes the loop ignore elapsed time and passes -1
 * to xbt_cond_timedwait. Throws timeout_error when the queue is empty in the
 * "Delay = 0" case.
 * NOTE(review): several lines of this function are not visible in this
 * excerpt (braces, the test selecting the delay = 0 branch, and the context
 * of the first xbt_mutex_release after the wait) -- confirm against the
 * complete file. */
204 void xbt_queue_pop_timed(xbt_queue_t queue, void *const dst, double delay)
/* Start time, used below to compute how much of delay has elapsed. */
206 double begin = xbt_time();
209 xbt_mutex_acquire(queue->mutex);
/* Queue empty: the mutex is released before throwing. The message implies
 * this check applies when delay is 0; the guarding condition is not visible
 * here -- confirm. */
212 if (xbt_dynar_length(queue->data) == 0) {
213 xbt_mutex_release(queue->mutex);
214 THROWF(timeout_error, 0, "Delay = 0, and queue is empty");
/* Keep waiting while the queue is empty and either delay is negative or the
 * elapsed time has not exceeded delay. */
217 while ((xbt_dynar_length(queue->data) == 0) &&
218 (delay < 0 || (xbt_time() - begin) <= delay)) {
219 XBT_DEBUG("Queue %p empty. Waiting", queue);
/* Wait for the time remaining out of delay, or pass -1 when delay is
 * negative. */
221 xbt_cond_timedwait(queue->not_empty, queue->mutex,
222 delay < 0 ? -1 : delay - (xbt_time() - begin));
/* NOTE(review): the lines around this release are not visible; presumably
 * it belongs to an error path of the timed wait, since the mutex is
 * released again at the end of the function -- confirm. */
225 xbt_mutex_release(queue->mutex);
/* NOTE(review): no emptiness re-check is visible between the wait loop and
 * the pop. If the loop can end on the elapsed-time test with the queue still
 * empty, xbt_dynar_pop would run on an empty dynar -- confirm against the
 * complete file. */
231 xbt_dynar_pop(queue->data, dst);
232 xbt_cond_signal(queue->not_full);
233 xbt_mutex_release(queue->mutex);
236 /** @brief Unshift something to the message exchange queue, with a timeout.
238 * @see #xbt_queue_unshift
/* Timed variant of xbt_queue_unshift: stores the element at src with
 * xbt_dynar_unshift, with the wait for free room bounded by delay. A
 * negative delay makes the loop ignore elapsed time and passes -1 to
 * xbt_cond_timedwait. Throws timeout_error when the queue is full in the
 * "delay = 0" case.
 * NOTE(review): several lines of this function are not visible in this
 * excerpt (the end of the parameter list, which presumably declares delay,
 * braces, the test selecting the delay = 0 branch, trailing macro arguments,
 * and the context of the first xbt_mutex_release after the wait) -- confirm
 * against the complete file. */
240 void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src,
/* Start time, used below to compute how much of delay has elapsed. */
243 double begin = xbt_time();
246 xbt_mutex_acquire(queue->mutex);
/* Queue full: the mutex is released before throwing. The message implies
 * this check applies when delay is 0; the guarding condition is not visible
 * here -- confirm. */
249 if (queue->capacity != 0 &&
250 queue->capacity == xbt_dynar_length(queue->data)) {
252 xbt_mutex_release(queue->mutex);
253 THROWF(timeout_error, 0,
254 "Capacity of %p exceded (=%d), and delay = 0", queue,
/* Keep waiting while the queue is full and either delay is negative or the
 * elapsed time has not exceeded delay. */
258 while (queue->capacity != 0 &&
259 queue->capacity == xbt_dynar_length(queue->data) &&
260 (delay < 0 || (xbt_time() - begin) <= delay)) {
262 XBT_DEBUG("Capacity of %p exceded (=%d). Waiting", queue,
/* Wait for the time remaining out of delay, or pass -1 when delay is
 * negative. */
265 xbt_cond_timedwait(queue->not_full, queue->mutex,
266 delay < 0 ? -1 : delay - (xbt_time() - begin));
/* NOTE(review): the lines around this release are not visible; presumably
 * it belongs to an error path of the timed wait, since the mutex is
 * released again at the end of the function -- confirm. */
269 xbt_mutex_release(queue->mutex);
/* Element stored: signal not_empty for the pop/shift side, then unlock. */
275 xbt_dynar_unshift(queue->data, src);
276 xbt_cond_signal(queue->not_empty);
277 xbt_mutex_release(queue->mutex);
281 /** @brief Shift something from the message exchange queue, with a timeout.
283 * @see #xbt_queue_shift
/* Timed variant of xbt_queue_shift: removes an element with xbt_dynar_shift,
 * passing dst as the destination argument, with the wait for data bounded by
 * delay. A negative delay makes the loop ignore elapsed time and passes -1
 * to xbt_cond_timedwait. Throws timeout_error when the queue is empty in the
 * "Delay = 0" case, and again if the queue is still empty after the wait.
 * NOTE(review): several lines of this function are not visible in this
 * excerpt (the end of the parameter list, which presumably declares delay,
 * braces, the test selecting the delay = 0 branch, the trailing THROWF
 * argument, and the context of the first xbt_mutex_release after the
 * wait) -- confirm against the complete file. */
286 void xbt_queue_shift_timed(xbt_queue_t queue, void *const dst,
/* Start time, used below to compute how much of delay has elapsed. */
289 double begin = xbt_time();
292 xbt_mutex_acquire(queue->mutex);
/* Queue empty: the mutex is released before throwing. The message implies
 * this check applies when delay is 0; the guarding condition is not visible
 * here -- confirm. */
295 if (xbt_dynar_length(queue->data) == 0) {
296 xbt_mutex_release(queue->mutex);
297 THROWF(timeout_error, 0, "Delay = 0, and queue is empty");
/* Keep waiting while the queue is empty and either delay is negative or the
 * elapsed time has not exceeded delay. */
300 while ((xbt_dynar_length(queue->data) == 0) &&
301 (delay < 0 || (xbt_time() - begin) <= delay)) {
302 XBT_DEBUG("Queue %p empty. Waiting", queue);
/* Wait for the time remaining out of delay, or pass -1 when delay is
 * negative. */
304 xbt_cond_timedwait(queue->not_empty, queue->mutex,
305 delay < 0 ? -1 : delay - (xbt_time() - begin));
/* NOTE(review): the lines around this release are not visible; presumably
 * it belongs to an error path of the timed wait, since the mutex is
 * released again further down -- confirm. */
308 xbt_mutex_release(queue->mutex);
/* The wait loop can end on the elapsed-time test with nothing queued, so
 * emptiness is checked again; in that case the mutex is released and a
 * timeout_error is thrown instead of shifting. */
314 if (xbt_dynar_length(queue->data) == 0) {
315 xbt_mutex_release(queue->mutex);
316 THROWF(timeout_error, 0, "Timeout (%f) elapsed, but queue still empty",
/* Element removed: signal not_full for the push/unshift side, then unlock. */
320 xbt_dynar_shift(queue->data, dst);
321 xbt_cond_signal(queue->not_full);
322 xbt_mutex_release(queue->mutex);