]> AND Private Git Repository - loba.git/blob - messages.cpp
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Added Least Load Neighbors and another bulk algorithm
[loba.git] / messages.cpp
1 #include <sstream>
2 #include <xbt/log.h>
3
4 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(comm);
5
6 #include "misc.h"
7 #include "options.h"
8
9 #include "messages.h"
10
11 std::string message::to_string()
12 {
13     static const char* str[] = { "INFO", "CREDIT", "LOAD",
14                                  "CTRL_CLOSE", "DATA_CLOSE" };
15     std::ostringstream oss;
16     oss << str[type] << ": " << amount;
17     return oss.str();
18 }
19
20 double message::get_size() const
21 {
22     // arbitrary: 8 for type, and 8 for amount
23     double size = 16;
24     if (type == LOAD)
25         size += opt::comm_cost(amount);
26     return size;
27 }
28
29 void message_queue::push(m_task_t task)
30 {
31     mutex.acquire();
32     queue.push(task);
33     cond.signal();
34     mutex.release();
35 }
36
37 bool message_queue::pop(message*& msg, m_host_t& from, double timeout)
38 {
39     if (timeout != 0) {
40         volatile double deadline =
41             timeout > 0 ? MSG_get_clock() + timeout : 0.0;
42         mutex.acquire();
43         while (queue.empty() && (!deadline || deadline > MSG_get_clock())) {
44             xbt_ex_t e;
45             XBT_DEBUG("waiting for a message to come");
46             TRY {
47                 if (deadline)
48                     cond.timedwait(mutex, deadline - MSG_get_clock());
49                 else
50                     cond.wait(mutex);
51             }
52             CATCH (e) {
53                 if (e.category != timeout_error)
54                     RETHROW;
55                 xbt_ex_free(e);
56             }
57         }
58         mutex.release();
59     }
60
61     if (queue.empty())
62         return false;
63
64     m_task_t task = queue.front();
65     queue.pop();
66     msg = static_cast<message*>(MSG_task_get_data(task));
67     from = MSG_task_get_source(task);
68     MSG_task_destroy(task);
69
70     XBT_DEBUG("received %s from %s",
71               msg->to_string().c_str(), MSG_host_get_name(from));
72
73     return true;
74 }