+void xbt_queue_pop_timed(xbt_queue_t queue, void *const dst, double delay)
+{
+  /* Remove one element from the queue with xbt_dynar_pop() and store it in
+   * dst, waiting for an element to become available if necessary:
+   *   - delay == 0: never wait; throw timeout_error if the queue is empty;
+   *   - delay  < 0: wait with no time limit;
+   *   - delay  > 0: wait until `delay` has elapsed on the xbt_time() clock,
+   *                 then throw timeout_error if the queue is still empty.
+   * The queue mutex is released on every exit path, including throws. */
+  double begin = xbt_time();
+
+  xbt_mutex_acquire(queue->mutex);
+
+  if (delay == 0) {
+    if (xbt_dynar_is_empty(queue->data)) {
+      xbt_mutex_release(queue->mutex);
+      THROWF(timeout_error, 0, "Delay = 0, and queue is empty");
+    }
+  } else {
+    while (xbt_dynar_is_empty(queue->data)) {
+      /* -1 is the value used here to request a wait with no time limit */
+      double remaining = -1;
+
+      if (!(delay < 0)) {
+        /* Sample the clock only once per iteration: computing the remaining
+         * time from a second xbt_time() call could yield a negative value,
+         * which presumably would be taken as "no time limit" by
+         * xbt_cond_timedwait() -- TODO confirm. */
+        double elapsed = xbt_time() - begin;
+
+        if (!(elapsed <= delay)) {
+          break;                /* deadline passed, stop waiting */
+        }
+        remaining = delay - elapsed;
+      }
+
+      XBT_DEBUG("Queue %p empty. Waiting", queue);
+      TRY {
+        xbt_cond_timedwait(queue->not_empty, queue->mutex, remaining);
+      }
+      CATCH_ANONYMOUS {
+        /* Do not leave the queue locked when the wait throws */
+        xbt_mutex_release(queue->mutex);
+        RETHROW;
+      }
+    }
+  }
+
+  /* The wait loop also ends when the deadline passes, so the queue may still
+   * be empty here: report a timeout instead of popping from an empty dynar
+   * (same guard as in xbt_queue_shift_timed). */
+  if (xbt_dynar_is_empty(queue->data)) {
+    xbt_mutex_release(queue->mutex);
+    THROWF(timeout_error, 0, "Timeout (%f) elapsed, but queue still empty",
+           delay);
+  }
+
+  xbt_dynar_pop(queue->data, dst);
+  /* A slot was freed: signal the condition the *_unshift_timed side waits on */
+  xbt_cond_signal(queue->not_full);
+  xbt_mutex_release(queue->mutex);
+}
+
+/** @brief Unshift something to the message exchange queue, with a timeout.
+ *
+ * The element is inserted with xbt_dynar_unshift(). A capacity of 0 means
+ * that the queue is never considered full.
+ *
+ * @param queue the queue to insert into
+ * @param src   the data handed to xbt_dynar_unshift()
+ * @param delay timeout, on the xbt_time() clock:
+ *              - 0: never wait; throw timeout_error if the queue is full;
+ *              - negative: wait with no time limit;
+ *              - positive: wait until that delay has elapsed, then throw
+ *                timeout_error if the queue is still full.
+ *
+ * The queue mutex is released on every exit path, including throws.
+ *
+ * @see #xbt_queue_unshift
+ */
+void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src,
+                             double delay)
+{
+  double begin = xbt_time();
+
+  xbt_mutex_acquire(queue->mutex);
+
+  if (delay == 0) {
+    if (queue->capacity != 0 &&
+        queue->capacity == xbt_dynar_length(queue->data)) {
+
+      xbt_mutex_release(queue->mutex);
+      THROWF(timeout_error, 0,
+             "Capacity of %p exceeded (=%d), and delay = 0", queue,
+             queue->capacity);
+    }
+  } else {
+    while (queue->capacity != 0 &&
+           queue->capacity == xbt_dynar_length(queue->data)) {
+      /* -1 is the value used here to request a wait with no time limit */
+      double remaining = -1;
+
+      if (!(delay < 0)) {
+        /* Sample the clock only once per iteration: computing the remaining
+         * time from a second xbt_time() call could yield a negative value,
+         * which presumably would be taken as "no time limit" by
+         * xbt_cond_timedwait() -- TODO confirm. */
+        double elapsed = xbt_time() - begin;
+
+        if (!(elapsed <= delay)) {
+          break;                /* deadline passed, stop waiting */
+        }
+        remaining = delay - elapsed;
+      }
+
+      XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue,
+                queue->capacity);
+      TRY {
+        xbt_cond_timedwait(queue->not_full, queue->mutex, remaining);
+      }
+      CATCH_ANONYMOUS {
+        /* Do not leave the queue locked when the wait throws */
+        xbt_mutex_release(queue->mutex);
+        RETHROW;
+      }
+    }
+  }
+
+  /* The wait loop also ends when the deadline passes, so the queue may still
+   * be full here: report a timeout instead of silently growing the queue
+   * beyond its capacity. */
+  if (queue->capacity != 0 &&
+      queue->capacity == xbt_dynar_length(queue->data)) {
+    xbt_mutex_release(queue->mutex);
+    THROWF(timeout_error, 0,
+           "Timeout (%f) elapsed, but capacity of %p still exceeded (=%d)",
+           delay, queue, queue->capacity);
+  }
+
+  xbt_dynar_unshift(queue->data, src);
+  /* An element is available: signal the condition the pop/shift side waits on */
+  xbt_cond_signal(queue->not_empty);
+  xbt_mutex_release(queue->mutex);
+}
+
+
+/** @brief Shift something from the message exchange queue, with a timeout.
+ *
+ * @see #xbt_queue_shift
+ *
+ */
+void xbt_queue_shift_timed(xbt_queue_t queue, void *const dst,
+ double delay)
+{
+ double begin = xbt_time();
+
+ xbt_mutex_acquire(queue->mutex);
+
+ if (delay == 0) {
+ if (xbt_dynar_is_empty(queue->data)) {
+ xbt_mutex_release(queue->mutex);
+ THROWF(timeout_error, 0, "Delay = 0, and queue is empty");
+ }
+ } else {
+ while ((xbt_dynar_is_empty(queue->data)) &&
+ (delay < 0 || (xbt_time() - begin) <= delay)) {
+ XBT_DEBUG("Queue %p empty. Waiting", queue);
+ TRY {
+ xbt_cond_timedwait(queue->not_empty, queue->mutex,
+ delay < 0 ? -1 : delay - (xbt_time() - begin));
+ }
+ CATCH_ANONYMOUS {
+ xbt_mutex_release(queue->mutex);
+ RETHROW;
+ }
+ }
+ }
+
+ if (xbt_dynar_is_empty(queue->data)) {
+ xbt_mutex_release(queue->mutex);
+ THROWF(timeout_error, 0, "Timeout (%f) elapsed, but queue still empty",
+ delay);
+ }
+
+ xbt_dynar_shift(queue->data, dst);
+ xbt_cond_signal(queue->not_full);
+ xbt_mutex_release(queue->mutex);