5 #include "communicator.h"
6 #include "simgrid_features.h"
8 // XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu);
// Logging setup for this file: declares the "comm" category as a
// sub-category of "simu" (with the description string below) and makes it
// the default category for the logging macros used in this file.
9 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(comm, simu,
10 "Messages from asynchronous pipes");
// Helper predicate: tests the communication `comm` with MSG_comm_test() and,
// when that test succeeds, destroys it with MSG_comm_destroy().
// It is used by communicator::recv_load() and as the predicate handed to
// std::remove_if in communicator::flush_sent().
// NOTE(review): the return statements are not visible in this excerpt;
// presumably the function returns true exactly when the communication was
// destroyed -- confirm against the complete file.
14 bool comm_test_n_destroy(msg_comm_t& comm)
16 if (MSG_comm_test(comm)) {
// The test succeeded: the handle is destroyed here.
17 MSG_comm_destroy(comm);
// Payload attached to control tasks (see send_info()/send_credit(), which
// allocate it, and recv_ctrl(), which reads its `type` and `amount`).
// NOTE(review): the member declarations are not visible in this excerpt.
25 class communicator::message {
// Stores the message type `t` and the associated amount `a`.
27 message(message_type t, double a): type(t), amount(a) { }
// Constructor: derives the two mailbox names "<hostname>_ctrl" and
// "<hostname>_data" from the name of the host running the current process,
// and posts one asynchronous receive on each of them.
// NOTE(review): some lines of this body are not visible in this excerpt
// (e.g. any initialisation of ctrl_task/data_task) -- confirm.
33 communicator::communicator()
35 const char* hostname = MSG_host_get_name(MSG_host_self());
// Control channel: mailbox name, then a pending receive into ctrl_task.
// NOTE(review): bprintf presumably returns a freshly allocated string that
// has to be released later -- confirm where ctrl_mbox/data_mbox are freed.
36 ctrl_mbox = bprintf("%s_ctrl", hostname);
38 ctrl_comm = MSG_task_irecv(&ctrl_task, ctrl_mbox);
// Data channel: same scheme with the "_data" suffix and data_task.
40 data_mbox = bprintf("%s_data", hostname);
42 data_comm = MSG_task_irecv(&data_task, data_mbox);
// Destructor. As the fixme below states, pending communications are not
// released by the visible code; a warning is logged when sent_comm still
// holds send communications.
// NOTE(review): several lines of this body are not visible in this excerpt
// (presumably the release of the mailbox names) -- confirm.
45 communicator::~communicator()
47 // fixme: don't know how to free pending communications
48 // (data_comm, ctrl_comm and sent_comm)
// Report the number of send communications that are being dropped.
54 if (!sent_comm.empty())
55 WARN1("Lost %ld send communications!", (long )sent_comm.size());
58 void communicator::send_info(const neighbor& dest, double amount)
60 message* msg = new message(INFO_MSG, amount);
61 m_task_t task = MSG_task_create("load msg", 0.0, sizeof *msg, msg);
62 send(dest.get_ctrl_mbox(), task);
65 void communicator::send_credit(const neighbor& dest, double amount)
67 message* msg = new message(CREDIT_MSG, amount);
68 m_task_t task = MSG_task_create("credit msg", 0.0, sizeof *msg, msg);
69 send(dest.get_ctrl_mbox(), task);
72 void communicator::send_load(const neighbor& dest, double amount)
74 m_task_t task = MSG_task_create("data msg", 0.0, amount, NULL);
75 send(dest.get_data_mbox(), task);
// Starts an asynchronous send of `task` to the mailbox named `dest` with
// MSG_task_isend() and appends the resulting communication handle to
// sent_comm.
// NOTE(review): this excerpt may be missing a trailing statement of the
// body -- confirm against the complete file.
78 void communicator::send(const char* dest, m_task_t task)
80 sent_comm.push_back(MSG_task_isend(task, dest));
84 bool communicator::recv_info(double& amount, m_host_t& from)
86 return recv_ctrl(INFO_MSG, amount, from);
89 bool communicator::recv_credit(double& amount, m_host_t& from)
91 return recv_ctrl(CREDIT_MSG, amount, from);
// Polls the pending reception on the data mailbox.
// Visible steps: data_comm is tested (and destroyed on success) through
// comm_test_n_destroy(); `amount` receives the data size of the received
// task and `from` its source host; the task is destroyed and a new
// asynchronous receive is posted on data_mbox.
// NOTE(review): the condition guarding the extraction steps and the return
// statement are not visible in this excerpt; presumably they run only when
// `res` is true and `res` is returned -- confirm.
94 bool communicator::recv_load(double& amount, m_host_t& from)
96 bool res = comm_test_n_destroy(data_comm);
// The load amount was encoded as the task's data size by send_load().
98 amount = MSG_task_get_data_size(data_task);
99 from = MSG_task_get_source(data_task);
100 MSG_task_destroy(data_task);
// Re-arm the reception so the next data message can be received.
102 data_comm = MSG_task_irecv(&data_task, data_mbox);
// Polls the pending reception on the control mailbox for a message of the
// requested `type`.
// Visible steps: ctrl_comm is tested with MSG_comm_test(); the `message`
// attached to ctrl_task is inspected and, when its type equals `type`, the
// communication is destroyed, `amount` and `from` are filled in from the
// message and the task, the task is destroyed and a new asynchronous receive
// is posted on ctrl_mbox.
// NOTE(review): several lines are not visible in this excerpt (the test on
// `res`, the handling of a non-matching type, the release of `msg`, and the
// return statement) -- confirm against the complete file.
107 bool communicator::recv_ctrl(message_type type, double& amount, m_host_t& from)
// Only tests the communication; it is destroyed further down, and only for
// a message of the requested type.
109 bool res = MSG_comm_test(ctrl_comm);
// The task data is the `message` allocated by send_info()/send_credit().
111 message* msg = (message* )MSG_task_get_data(ctrl_task);
112 if (msg->type == type) {
113 MSG_comm_destroy(ctrl_comm);
114 amount = msg->amount;
115 from = MSG_task_get_source(ctrl_task);
117 MSG_task_destroy(ctrl_task);
// Re-arm the reception so the next control message can be received.
119 ctrl_comm = MSG_task_irecv(&ctrl_task, ctrl_mbox);
// Returns the number of send communications currently held in sent_comm
// (the container's size, converted to int).
// NOTE(review): one line of this body is not visible in this excerpt
// (possibly a flush of completed sends before counting) -- confirm.
127 int communicator::send_backlog()
130 return sent_comm.size();
133 void communicator::flush_sent()
135 std::remove_if(sent_comm.begin(), sent_comm.end(), comm_test_n_destroy);