1 /* A (synchronized) message queue. */
2 /* Popping an empty queue is blocking, as well as pushing a full one */
4 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
5 * All rights reserved. */
7 /* This program is free software; you can redistribute it and/or modify it
8 * under the terms of the license (GNU LGPL) which comes with this package. */
11 #include "xbt/sysdep.h"
13 #include "xbt/dynar.h"
15 #include "xbt/synchro.h"
16 #include "xbt/queue.h" /* this module */
17 #include "gras/virtu.h"
18 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue, xbt, "Message exchanging queue");
20 typedef struct s_xbt_queue_ {
24 xbt_cond_t not_full, not_empty;
27 /** @brief Create a new message exchange queue.
29 * @param capacity the capacity of the queue. If non-nul, any attempt to push an item which would let the size of the queue over this number will be blocking until someone else pop some data
30 * @param elm_size size of each element stored in it (see #xbt_dynar_new)
32 xbt_queue_t xbt_queue_new(int capacity, unsigned long elm_size)
34 xbt_queue_t res = xbt_new0(s_xbt_queue_t, 1);
35 xbt_assert0(capacity >= 0, "Capacity cannot be negative");
37 res->capacity = capacity;
38 res->data = xbt_dynar_new(elm_size, NULL);
39 res->mutex = xbt_mutex_init();
40 res->not_full = xbt_cond_init();
41 res->not_empty = xbt_cond_init();
45 /** @brief Destroy a message exchange queue.
47 * Any remaining content is leaked.
49 void xbt_queue_free(xbt_queue_t * queue)
52 xbt_dynar_free(&((*queue)->data));
53 xbt_mutex_destroy((*queue)->mutex);
54 xbt_cond_destroy((*queue)->not_full);
55 xbt_cond_destroy((*queue)->not_empty);
60 /** @brief Get the queue size */
61 unsigned long xbt_queue_length(const xbt_queue_t queue)
64 xbt_mutex_acquire(queue->mutex);
65 res = xbt_dynar_length(queue->data);
66 xbt_mutex_release(queue->mutex);
70 /** @brief Push something to the message exchange queue.
72 * This is blocking if the declared capacity is non-nul, and if this amount is reached.
74 * @see #xbt_dynar_push
76 void xbt_queue_push(xbt_queue_t queue, const void *src)
78 xbt_mutex_acquire(queue->mutex);
79 while (queue->capacity != 0
80 && queue->capacity == xbt_dynar_length(queue->data)) {
81 DEBUG2("Capacity of %p exceded (=%d). Waiting", queue, queue->capacity);
82 xbt_cond_wait(queue->not_full, queue->mutex);
84 xbt_dynar_push(queue->data, src);
85 xbt_cond_signal(queue->not_empty);
86 xbt_mutex_release(queue->mutex);
90 /** @brief Pop something from the message exchange queue.
92 * This is blocking if the queue is empty.
97 void xbt_queue_pop(xbt_queue_t queue, void *const dst)
99 xbt_mutex_acquire(queue->mutex);
100 while (xbt_dynar_length(queue->data) == 0) {
101 DEBUG1("Queue %p empty. Waiting", queue);
102 xbt_cond_wait(queue->not_empty, queue->mutex);
104 xbt_dynar_pop(queue->data, dst);
105 xbt_cond_signal(queue->not_full);
106 xbt_mutex_release(queue->mutex);
109 /** @brief Unshift something to the message exchange queue.
111 * This is blocking if the declared capacity is non-nul, and if this amount is reached.
113 * @see #xbt_dynar_unshift
115 void xbt_queue_unshift(xbt_queue_t queue, const void *src)
117 xbt_mutex_acquire(queue->mutex);
118 while (queue->capacity != 0
119 && queue->capacity == xbt_dynar_length(queue->data)) {
120 DEBUG2("Capacity of %p exceded (=%d). Waiting", queue, queue->capacity);
121 xbt_cond_wait(queue->not_full, queue->mutex);
123 xbt_dynar_unshift(queue->data, src);
124 xbt_cond_signal(queue->not_empty);
125 xbt_mutex_release(queue->mutex);
129 /** @brief Shift something from the message exchange queue.
131 * This is blocking if the queue is empty.
133 * @see #xbt_dynar_shift
136 void xbt_queue_shift(xbt_queue_t queue, void *const dst)
138 xbt_mutex_acquire(queue->mutex);
139 while (xbt_dynar_length(queue->data) == 0) {
140 DEBUG1("Queue %p empty. Waiting", queue);
141 xbt_cond_wait(queue->not_empty, queue->mutex);
143 xbt_dynar_shift(queue->data, dst);
144 xbt_cond_signal(queue->not_full);
145 xbt_mutex_release(queue->mutex);
151 /** @brief Push something to the message exchange queue, with a timeout.
153 * @see #xbt_queue_push
155 void xbt_queue_push_timed(xbt_queue_t queue, const void *src, double delay)
157 double begin = xbt_time();
160 xbt_mutex_acquire(queue->mutex);
163 if (queue->capacity != 0 &&
164 queue->capacity == xbt_dynar_length(queue->data)) {
166 xbt_mutex_release(queue->mutex);
167 THROW2(timeout_error, 0, "Capacity of %p exceded (=%d), and delay = 0",
168 queue, queue->capacity);
171 while (queue->capacity != 0 &&
172 queue->capacity == xbt_dynar_length(queue->data) &&
173 (delay < 0 || (xbt_time() - begin) <= delay)) {
175 DEBUG2("Capacity of %p exceded (=%d). Waiting", queue, queue->capacity);
177 xbt_cond_timedwait(queue->not_full, queue->mutex,
178 delay < 0 ? -1 : delay - (xbt_time() - begin));
181 xbt_mutex_release(queue->mutex);
187 xbt_dynar_push(queue->data, src);
188 xbt_cond_signal(queue->not_empty);
189 xbt_mutex_release(queue->mutex);
193 /** @brief Pop something from the message exchange queue, with a timeout.
195 * @see #xbt_queue_pop
198 void xbt_queue_pop_timed(xbt_queue_t queue, void *const dst, double delay)
200 double begin = xbt_time();
203 xbt_mutex_acquire(queue->mutex);
206 if (xbt_dynar_length(queue->data) == 0) {
207 xbt_mutex_release(queue->mutex);
208 THROW0(timeout_error, 0, "Delay = 0, and queue is empty");
211 while ((xbt_dynar_length(queue->data) == 0) &&
212 (delay < 0 || (xbt_time() - begin) <= delay)) {
213 DEBUG1("Queue %p empty. Waiting", queue);
215 xbt_cond_timedwait(queue->not_empty, queue->mutex,
216 delay < 0 ? -1 : delay - (xbt_time() - begin));
219 xbt_mutex_release(queue->mutex);
225 xbt_dynar_pop(queue->data, dst);
226 xbt_cond_signal(queue->not_full);
227 xbt_mutex_release(queue->mutex);
230 /** @brief Unshift something to the message exchange queue, with a timeout.
232 * @see #xbt_queue_unshift
234 void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src, double delay)
236 double begin = xbt_time();
239 xbt_mutex_acquire(queue->mutex);
242 if (queue->capacity != 0 &&
243 queue->capacity == xbt_dynar_length(queue->data)) {
245 xbt_mutex_release(queue->mutex);
246 THROW2(timeout_error, 0, "Capacity of %p exceded (=%d), and delay = 0",
247 queue, queue->capacity);
250 while (queue->capacity != 0 &&
251 queue->capacity == xbt_dynar_length(queue->data) &&
252 (delay < 0 || (xbt_time() - begin) <= delay)) {
254 DEBUG2("Capacity of %p exceded (=%d). Waiting", queue, queue->capacity);
256 xbt_cond_timedwait(queue->not_full, queue->mutex,
257 delay < 0 ? -1 : delay - (xbt_time() - begin));
260 xbt_mutex_release(queue->mutex);
266 xbt_dynar_unshift(queue->data, src);
267 xbt_cond_signal(queue->not_empty);
268 xbt_mutex_release(queue->mutex);
272 /** @brief Shift something from the message exchange queue, with a timeout.
274 * @see #xbt_queue_shift
277 void xbt_queue_shift_timed(xbt_queue_t queue, void *const dst, double delay)
279 double begin = xbt_time();
282 xbt_mutex_acquire(queue->mutex);
285 if (xbt_dynar_length(queue->data) == 0) {
286 xbt_mutex_release(queue->mutex);
287 THROW0(timeout_error, 0, "Delay = 0, and queue is empty");
290 while ((xbt_dynar_length(queue->data) == 0) &&
291 (delay < 0 || (xbt_time() - begin) <= delay)) {
292 DEBUG1("Queue %p empty. Waiting", queue);
294 xbt_cond_timedwait(queue->not_empty, queue->mutex,
295 delay < 0 ? -1 : delay - (xbt_time() - begin));
298 xbt_mutex_release(queue->mutex);
304 if (xbt_dynar_length(queue->data) == 0) {
305 xbt_mutex_release(queue->mutex);
306 THROW1(timeout_error, 0, "Timeout (%f) elapsed, but queue still empty",
310 xbt_dynar_shift(queue->data, dst);
311 xbt_cond_signal(queue->not_full);
312 xbt_mutex_release(queue->mutex);