]> AND Private Git Repository - loba.git/blob - messages.cpp
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Make communicator::recv() inline.
[loba.git] / messages.cpp
1 #include <sstream>
2 #include <xbt/log.h>
3
4 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(comm);
5
6 #include "misc.h"
7
8 #include "messages.h"
9
10 std::string message::to_string()
11 {
12     static const char* str[] = { "INFO", "CREDIT", "LOAD",
13                                  "CTRL_CLOSE", "DATA_CLOSE" };
14     std::ostringstream oss;
15     oss << str[type] << ": " << amount;
16     return oss.str();
17 }
18
19 void message_queue::push(m_task_t task)
20 {
21     mutex.acquire();
22     queue.push(task);
23     cond.signal();
24     mutex.release();
25 }
26
27 bool message_queue::pop(message*& msg, m_host_t& from, double timeout)
28 {
29     if (timeout != 0) {
30         volatile double deadline =
31             timeout > 0 ? MSG_get_clock() + timeout : 0.0;
32         mutex.acquire();
33         while (queue.empty() && (!deadline || deadline > MSG_get_clock())) {
34             xbt_ex_t e;
35             XBT_DEBUG("waiting for a message to come");
36             TRY {
37                 if (deadline)
38                     cond.timedwait(mutex, deadline - MSG_get_clock());
39                 else
40                     cond.wait(mutex);
41             }
42             CATCH (e) {
43                 if (e.category != timeout_error)
44                     RETHROW;
45                 xbt_ex_free(e);
46             }
47         }
48         mutex.release();
49     }
50
51     if (queue.empty())
52         return false;
53
54     m_task_t task = queue.front();
55     queue.pop();
56     msg = static_cast<message*>(MSG_task_get_data(task));
57     from = MSG_task_get_source(task);
58     MSG_task_destroy(task);
59
60     XBT_DEBUG("received %s from %s",
61               msg->to_string().c_str(), MSG_host_get_name(from));
62
63     return true;
64 }