]> AND Private Git Repository - loba.git/blob - messages.cpp
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Define class for message queue.
[loba.git] / messages.cpp
1 #include <sstream>
2
3 #include "messages.h"
4
5 std::string message::to_string()
6 {
7     static const char* str[] = { "INFO", "CREDIT", "LOAD",
8                                  "CTRL_CLOSE", "DATA_CLOSE" };
9     std::ostringstream oss;
10     oss << str[type] << ": " << amount;
11     return oss.str();
12 }
13
14 void message_queue::push(m_task_t task)
15 {
16     mutex.acquire();
17     queue.push(task);
18     cond.signal();
19     mutex.release();
20 }
21
22 bool message_queue::pop(message*& msg, m_host_t& from, double timeout)
23 {
24     if (timeout != 0) {
25         volatile double deadline =
26             timeout > 0 ? MSG_get_clock() + timeout : 0.0;
27         mutex.acquire();
28         while (queue.empty() && (!deadline || deadline > MSG_get_clock())) {
29             xbt_ex_t e;
30             TRY {
31                 if (deadline)
32                     cond.timedwait(mutex, deadline - MSG_get_clock());
33                 else
34                     cond.wait(mutex);
35             }
36             CATCH (e) {
37                 if (e.category != timeout_error)
38                     RETHROW;
39                 xbt_ex_free(e);
40             }
41         }
42         mutex.release();
43     }
44
45     if (queue.empty())
46         return false;
47
48     m_task_t task = queue.front();
49     queue.pop();
50     msg = static_cast<message*>(MSG_task_get_data(task));
51     from = MSG_task_get_source(task);
52     MSG_task_destroy(task);
53
54     return true;
55 }