]> AND Private Git Repository - loba.git/blob - communicator.cpp
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Wip++...
[loba.git] / communicator.cpp
1 #include <algorithm>
2 #include <cstring>
3 #include <msg/msg.h>
4 #include <xbt/log.h>
5 #include "communicator.h"
6 #include "simgrid_features.h"
7
8 // XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu);
9 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(comm, simu,
10                                 "Messages from asynchronous pipes");
11
12 namespace {
13
14     bool comm_test_n_destroy(msg_comm_t& comm)
15     {
16         if (MSG_comm_test(comm)) {
17             MSG_comm_destroy(comm);
18             return true;
19         } else
20             return false;
21     }
22
23 }
24
25 class communicator::message  {
26 public:
27     message(message_type t, double a): type(t), amount(a) { }
28
29     message_type type;
30     double amount;
31 };
32
33 communicator::communicator()
34 {
35     const char* hostname = MSG_host_get_name(MSG_host_self());
36     ctrl_mbox = bprintf("%s_ctrl", hostname);
37     ctrl_task = NULL;
38     ctrl_comm = MSG_task_irecv(&ctrl_task, ctrl_mbox);
39
40     data_mbox = bprintf("%s_data", hostname);
41     data_task = NULL;
42     data_comm = MSG_task_irecv(&data_task, data_mbox);
43 }
44
45 communicator::~communicator()
46 {
47     // fixme: don't know how to free pending communications
48     // (data_comm, ctrl_comm and sent_comm)
49
50     free(data_mbox);
51     free(ctrl_mbox);
52
53     flush_sent();
54     if (!sent_comm.empty())
55         WARN1("Lost %ld send communications!", (long )sent_comm.size());
56 }
57
58 void communicator::send_info(const neighbor& dest, double amount)
59 {
60     message* msg = new message(INFO_MSG, amount);
61     m_task_t task = MSG_task_create("load msg", 0.0, sizeof *msg, msg);
62     send(dest.get_ctrl_mbox(), task);
63 }
64
65 void communicator::send_credit(const neighbor& dest, double amount)
66 {
67     message* msg = new message(CREDIT_MSG, amount);
68     m_task_t task = MSG_task_create("credit msg", 0.0, sizeof *msg, msg);
69     send(dest.get_ctrl_mbox(), task);
70 }
71
72 void communicator::send_load(const neighbor& dest, double amount)
73 {
74     m_task_t task = MSG_task_create("data msg", 0.0, amount, NULL);
75     send(dest.get_data_mbox(), task);
76 }
77
78 void communicator::send(const char* dest, m_task_t task)
79 {
80     sent_comm.push_back(MSG_task_isend(task, dest));
81     flush_sent();
82 }
83
84 bool communicator::recv_info(double& amount, m_host_t& from)
85 {
86     return recv_ctrl(INFO_MSG, amount, from);
87 }
88
89 bool communicator::recv_credit(double& amount, m_host_t& from)
90 {
91     return recv_ctrl(CREDIT_MSG, amount, from);
92 }
93
94 bool communicator::recv_load(double& amount, m_host_t& from)
95 {
96     bool res = comm_test_n_destroy(data_comm);
97     if (res) {
98         amount = MSG_task_get_data_size(data_task);
99         from = MSG_task_get_source(data_task);
100         MSG_task_destroy(data_task);
101         data_task = NULL;
102         data_comm = MSG_task_irecv(&data_task, data_mbox);
103     }
104     return res;
105 }
106
107 bool communicator::recv_ctrl(message_type type, double& amount, m_host_t& from)
108 {
109     bool res = MSG_comm_test(ctrl_comm);
110     if (res) {
111         message* msg = (message* )MSG_task_get_data(ctrl_task);
112         if (msg->type == type) {
113             MSG_comm_destroy(ctrl_comm);
114             amount = msg->amount;
115             from = MSG_task_get_source(ctrl_task);
116             delete msg;
117             MSG_task_destroy(ctrl_task);
118             ctrl_task = NULL;
119             ctrl_comm = MSG_task_irecv(&ctrl_task, ctrl_mbox);
120         } else {
121             res = false;
122         }
123     }
124     return res;
125 }
126
127 int communicator::send_backlog()
128 {
129     flush_sent();
130     return sent_comm.size();
131 }
132
133 void communicator::flush_sent()
134 {
135     std::remove_if(sent_comm.begin(), sent_comm.end(), comm_test_n_destroy);
136 }
137
138 // Local variables:
139 // mode: c++
140 // End: