X-Git-Url: https://bilbo.iut-bm.univ-fcomte.fr/and/gitweb/loba.git/blobdiff_plain/802ce8090d999864f8b55f6c2e19ac4fe0805ff3..4d5ec9803b74d0f3e9c002a92a748bd08b6640ac:/messages.cpp diff --git a/messages.cpp b/messages.cpp index f5df396..e20c4ca 100644 --- a/messages.cpp +++ b/messages.cpp @@ -8,61 +8,84 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(comm); #include "messages.h" +message::message(message_type t, double a, double c) + : type(t), amount(a) , credit(c) +{ + // compute message size + // arbitrary: 8 for type, and 8 for each double + switch (type) { + case message_type::CTRL: + size = opt::bookkeeping ? 24 : 16; // type + amount + (credit)? + break; + case message_type::DATA: + size = 16 + opt::comm_cost(amount); // type + amount + data size + break; + default: + size = 8; // type + break; + } +} + std::string message::to_string() { - static const char* str[] = { "INFO", "CREDIT", "LOAD", - "CTRL_CLOSE", "DATA_CLOSE" }; std::ostringstream oss; - oss << str[type] << ": " << amount; + switch (type) { + case message_type::CTRL: + oss << "CTRL: " << amount << " (info)"; + if (opt::bookkeeping) + oss << "; " << credit << " (credit)"; + break; + case message_type::DATA: + oss << "DATA: " << amount << " (load)"; + break; + case message_type::CTRL_CLOSE: + oss << "CTRL_CLOSE"; + break; + case message_type::DATA_CLOSE: + oss << "DATA_CLOSE"; + break; + default: + oss << "UNKNOWN MESSAGE TYPE: " << static_cast(type); + break; + } return oss.str(); } -double message::get_size() const +void message_queue::push(msg_task_t task) { - // arbitrary: 8 for type, and 8 for amount - double size = 16; - if (type == LOAD) - size += opt::comm_cost(amount); - return size; + if (queue.push(task)) { + // list was empty, the push must be signaled + mutex.acquire(); + cond.signal(); + mutex.release(); + } } -void message_queue::push(m_task_t task) +bool message_queue::pop(message*& msg, msg_host_t& from, double timeout) { - mutex.acquire(); - queue.push(task); - cond.signal(); - mutex.release(); -} + msg_task_t task = nullptr; + if (!queue.try_pop(task)) { + if (timeout == 0.0) + return false; -bool message_queue::pop(message*& msg, m_host_t& from, double timeout) -{ - if (timeout != 0) { - volatile double deadline = - timeout > 0 ? MSG_get_clock() + timeout : 0.0; mutex.acquire(); - while (queue.empty() && (!deadline || deadline > MSG_get_clock())) { - xbt_ex_t e; + if (!queue.try_pop(task)) { XBT_DEBUG("waiting for a message to come"); - TRY { - if (deadline) - cond.timedwait(mutex, deadline - MSG_get_clock()); - else - cond.wait(mutex); - } - CATCH (e) { - if (e.category != timeout_error) - RETHROW; - xbt_ex_free(e); + bool hit_timeout; + if (timeout > 0) { + hit_timeout = !cond.timedwait(mutex, timeout); + } else { + cond.wait(mutex); + hit_timeout = false; } + bool pop_was_successful = queue.try_pop(task); + xbt_assert(hit_timeout || pop_was_successful); } mutex.release(); } - - if (queue.empty()) + if (task == nullptr) return false; - m_task_t task = queue.front(); - queue.pop(); msg = static_cast(MSG_task_get_data(task)); from = MSG_task_get_source(task); MSG_task_destroy(task);