From 1c3da8681b394ce3b14fb44e27cb682c045d5f92 Mon Sep 17 00:00:00 2001 From: Arnaud Giersch Date: Mon, 23 May 2011 14:57:56 +0200 Subject: [PATCH 1/1] Use a lock-free synchronized queue for the message queue. --- messages.cpp | 39 ++++++++++++++++++++++----------------- messages.h | 3 ++- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/messages.cpp b/messages.cpp index f5df396..025871a 100644 --- a/messages.cpp +++ b/messages.cpp @@ -28,41 +28,46 @@ double message::get_size() const void message_queue::push(m_task_t task) { - mutex.acquire(); - queue.push(task); - cond.signal(); - mutex.release(); + if (queue.push(task)) { + // list was empty, the push must be signaled + mutex.acquire(); + cond.signal(); + mutex.release(); + } } bool message_queue::pop(message*& msg, m_host_t& from, double timeout) { - if (timeout != 0) { - volatile double deadline = - timeout > 0 ? MSG_get_clock() + timeout : 0.0; + m_task_t task; + if (!queue.try_pop(task)) { + if (timeout == 0.0) + return false; + mutex.acquire(); - while (queue.empty() && (!deadline || deadline > MSG_get_clock())) { + if (!queue.try_pop(task)) { xbt_ex_t e; XBT_DEBUG("waiting for a message to come"); TRY { - if (deadline) - cond.timedwait(mutex, deadline - MSG_get_clock()); + if (timeout > 0) + cond.timedwait(mutex, timeout); else cond.wait(mutex); } + TRY_CLEANUP { + mutex.release(); + } CATCH (e) { if (e.category != timeout_error) RETHROW; xbt_ex_free(e); + return false; // got a timeout } + bool pop_was_successful = queue.try_pop(task); + xbt_assert(pop_was_successful); + } else { + mutex.release(); } - mutex.release(); } - - if (queue.empty()) - return false; - - m_task_t task = queue.front(); - queue.pop(); msg = static_cast(MSG_task_get_data(task)); from = MSG_task_get_source(task); MSG_task_destroy(task); diff --git a/messages.h b/messages.h index 605b3be..8de464a 100644 --- a/messages.h +++ b/messages.h @@ -5,6 +5,7 @@ #include #include #include "synchro.h" +#include "sync_queue.h" class message { public: @@ -39,7 +40,7 @@ public: private: mutex_t mutex; condition_t cond; - std::queue queue; + sync_queue queue; }; #endif // !MESSAGES_H -- 2.39.5