]> AND Private Git Repository - loba.git/blob - messages.cpp
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Use a lock-free synchronized queue for the message queue.
[loba.git] / messages.cpp
1 #include <sstream>
2 #include <xbt/log.h>
3
4 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(comm);
5
6 #include "misc.h"
7 #include "options.h"
8
9 #include "messages.h"
10
11 std::string message::to_string()
12 {
13     static const char* str[] = { "INFO", "CREDIT", "LOAD",
14                                  "CTRL_CLOSE", "DATA_CLOSE" };
15     std::ostringstream oss;
16     oss << str[type] << ": " << amount;
17     return oss.str();
18 }
19
20 double message::get_size() const
21 {
22     // arbitrary: 8 for type, and 8 for amount
23     double size = 16;
24     if (type == LOAD)
25         size += opt::comm_cost(amount);
26     return size;
27 }
28
29 void message_queue::push(m_task_t task)
30 {
31     if (queue.push(task)) {
32          // list was empty, the push must be signaled
33         mutex.acquire();
34         cond.signal();
35         mutex.release();
36     }
37 }
38
39 bool message_queue::pop(message*& msg, m_host_t& from, double timeout)
40 {
41     m_task_t task;
42     if (!queue.try_pop(task)) {
43         if (timeout == 0.0)
44             return false;
45
46         mutex.acquire();
47         if (!queue.try_pop(task)) {
48             xbt_ex_t e;
49             XBT_DEBUG("waiting for a message to come");
50             TRY {
51                 if (timeout > 0)
52                     cond.timedwait(mutex, timeout);
53                 else
54                     cond.wait(mutex);
55             }
56             TRY_CLEANUP {
57                 mutex.release();
58             }
59             CATCH (e) {
60                 if (e.category != timeout_error)
61                     RETHROW;
62                 xbt_ex_free(e);
63                 return false;   // got a timeout
64             }
65             bool pop_was_successful = queue.try_pop(task);
66             xbt_assert(pop_was_successful);
67         } else {
68             mutex.release();
69         }
70     }
71     msg = static_cast<message*>(MSG_task_get_data(task));
72     from = MSG_task_get_source(task);
73     MSG_task_destroy(task);
74
75     XBT_DEBUG("received %s from %s",
76               msg->to_string().c_str(), MSG_host_get_name(from));
77
78     return true;
79 }