}
-class communicator::message {
-public:
- message(message_type t, double a): type(t), amount(a) { }
-
- message_type type;
- double amount;
-};
-
communicator::communicator()
{
const char* hostname = MSG_host_get_name(MSG_host_self());
- ctrl_mbox = bprintf("%s_ctrl", hostname);
+ ctrl_mbox = hostname;
+ ctrl_mbox += "_ctrl";
ctrl_task = NULL;
- ctrl_comm = MSG_task_irecv(&ctrl_task, ctrl_mbox);
+ ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox());
- data_mbox = bprintf("%s_data", hostname);
+ data_mbox = hostname;
+ data_mbox += "_data";
data_task = NULL;
- data_comm = MSG_task_irecv(&data_task, data_mbox);
+ data_comm = MSG_task_irecv(&data_task, get_data_mbox());
}
communicator::~communicator()
// fixme: don't know how to free pending communications
// (data_comm, ctrl_comm and sent_comm)
- free(data_mbox);
- free(ctrl_mbox);
-
flush_sent();
if (!sent_comm.empty())
WARN1("Lost %ld send communications!", (long )sent_comm.size());
}
-void communicator::send_info(const neighbor& dest, double amount)
-{
- message* msg = new message(INFO_MSG, amount);
- m_task_t task = MSG_task_create("load msg", 0.0, sizeof *msg, msg);
- send(dest.get_ctrl_mbox(), task);
-}
-
-void communicator::send_credit(const neighbor& dest, double amount)
-{
- message* msg = new message(CREDIT_MSG, amount);
- m_task_t task = MSG_task_create("credit msg", 0.0, sizeof *msg, msg);
- send(dest.get_ctrl_mbox(), task);
-}
-
-void communicator::send_load(const neighbor& dest, double amount)
-{
- m_task_t task = MSG_task_create("data msg", 0.0, amount, NULL);
- send(dest.get_data_mbox(), task);
-}
-
-void communicator::send(const char* dest, m_task_t task)
+void communicator::send(const char* dest, message* msg)
{
+ double msg_size = sizeof *msg;
+ if (msg->get_type() == message::LOAD)
+ msg_size += msg->get_amount();
+ m_task_t task = MSG_task_create("message", 0.0, msg_size, msg);
sent_comm.push_back(MSG_task_isend(task, dest));
flush_sent();
}
-bool communicator::recv_info(double& amount, m_host_t& from)
+bool communicator::recv(message*& msg, m_host_t& from)
{
- return recv_ctrl(INFO_MSG, amount, from);
-}
+ msg = NULL;
-bool communicator::recv_credit(double& amount, m_host_t& from)
-{
- return recv_ctrl(CREDIT_MSG, amount, from);
-}
+ if (comm_test_n_destroy(ctrl_comm)) {
+ msg = (message* )MSG_task_get_data(ctrl_task);
+ from = MSG_task_get_source(ctrl_task);
+ MSG_task_destroy(ctrl_task);
+ ctrl_task = NULL;
+ ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox());
-bool communicator::recv_load(double& amount, m_host_t& from)
-{
- bool res = comm_test_n_destroy(data_comm);
- if (res) {
- amount = MSG_task_get_data_size(data_task);
+ } else if (comm_test_n_destroy(data_comm)) {
+ msg = (message* )MSG_task_get_data(data_task);
from = MSG_task_get_source(data_task);
MSG_task_destroy(data_task);
data_task = NULL;
- data_comm = MSG_task_irecv(&data_task, data_mbox);
+ data_comm = MSG_task_irecv(&data_task, get_data_mbox());
}
- return res;
-}
-bool communicator::recv_ctrl(message_type type, double& amount, m_host_t& from)
-{
- bool res = MSG_comm_test(ctrl_comm);
- if (res) {
- message* msg = (message* )MSG_task_get_data(ctrl_task);
- if (msg->type == type) {
- MSG_comm_destroy(ctrl_comm);
- amount = msg->amount;
- from = MSG_task_get_source(ctrl_task);
- delete msg;
- MSG_task_destroy(ctrl_task);
- ctrl_task = NULL;
- ctrl_comm = MSG_task_irecv(&ctrl_task, ctrl_mbox);
- } else {
- res = false;
- }
- }
- return res;
+ return msg != NULL;
}
int communicator::send_backlog()