#include <msg/msg.h>
#include <xbt/log.h>
#include "communicator.h"
+#include "simgrid_features.h"
// XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu);
// Declare "comm" as a new log subcategory of "simu" and use it as the
// default logging category for this file.
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(comm, simu,
"Messages from asynchronous pipes");
namespace {
+
// Test whether the asynchronous communication `comm` has completed.
// If it has, the communication handle is destroyed and true is returned;
// otherwise `comm` is left untouched and false is returned.
bool comm_test_n_destroy(msg_comm_t& comm)
{
if (MSG_comm_test(comm)) {
// Completed: release the handle and report success.  Without these two
// statements the function flowed off its end with no return value.
MSG_comm_destroy(comm);
return true;
} else
return false;
}
+
}
// Payload attached to control tasks (see send_info/send_credit and
// recv_ctrl): a message type tag plus a numeric amount.
+class communicator::message {
+public:
+ message(message_type t, double a): type(t), amount(a) { }
+
// Kind of control message (INFO_MSG and CREDIT_MSG are the values used in
// this file).
+ message_type type;
// Value carried by the message.
+ double amount;
+};
+
// Build the two mailbox names "<hostname>_ctrl" and "<hostname>_data" from
// the name of the current host, and post one asynchronous receive on each.
communicator::communicator()
{
const char* hostname = MSG_host_get_name(MSG_host_self());
- size_t len = std::strlen(hostname);
- recv_mbox = new char[len + 1];
- strcpy(recv_mbox, hostname);
- recv_task = NULL;
- recv_comm = MSG_task_irecv(&recv_task, recv_mbox);
// Control channel; the name string is released with free() in the destructor.
+ ctrl_mbox = bprintf("%s_ctrl", hostname);
+ ctrl_task = NULL;
+ ctrl_comm = MSG_task_irecv(&ctrl_task, ctrl_mbox);
+
// Data channel; same pattern as the control channel.
+ data_mbox = bprintf("%s_data", hostname);
+ data_task = NULL;
+ data_comm = MSG_task_irecv(&data_task, data_mbox);
}
// Release the mailbox name strings, flush the list of pending sends, and
// log a warning if any entries remain in sent_comm.
communicator::~communicator()
{
- send_acknowledge();
+ // fixme: don't know how to free pending communications
+ // (data_comm, ctrl_comm and sent_comm)
+
+ free(data_mbox);
+ free(ctrl_mbox);
+
+ flush_sent();
// Whatever is still listed after the flush is reported as lost.
if (!sent_comm.empty())
WARN1("Lost %ld send communications!", (long )sent_comm.size());
- delete[] recv_mbox;
}
// Send an INFO_MSG control message carrying `amount` to the control mailbox
// of `dest`.  The message object is allocated with new here and attached to
// the task as its data; recv_ctrl deletes it when the message is consumed.
+void communicator::send_info(const neighbor& dest, double amount)
+{
+ message* msg = new message(INFO_MSG, amount);
+ m_task_t task = MSG_task_create("load msg", 0.0, sizeof *msg, msg);
+ send(dest.get_ctrl_mbox(), task);
+}
+
// Send a CREDIT_MSG control message carrying `amount` to the control mailbox
// of `dest`.  The message object is allocated with new here and attached to
// the task as its data; recv_ctrl deletes it when the message is consumed.
+void communicator::send_credit(const neighbor& dest, double amount)
+{
+ message* msg = new message(CREDIT_MSG, amount);
+ m_task_t task = MSG_task_create("credit msg", 0.0, sizeof *msg, msg);
+ send(dest.get_ctrl_mbox(), task);
+}
-void communicator::send(m_task_t task, const char *dest)
// Send a data task to the data mailbox of `dest`.  `amount` is passed as the
// third argument of MSG_task_create and no data pointer is attached;
// recv_load reads the value back with MSG_task_get_data_size.
+void communicator::send_load(const neighbor& dest, double amount)
+{
+ m_task_t task = MSG_task_create("data msg", 0.0, amount, NULL);
+ send(dest.get_data_mbox(), task);
+}
+
// Start an asynchronous send of `task` to mailbox `dest`, record the
// resulting communication in sent_comm, then call flush_sent().
+void communicator::send(const char* dest, m_task_t task)
{
sent_comm.push_back(MSG_task_isend(task, dest));
- send_acknowledge();
+ flush_sent();
+}
+
// Try to receive an INFO_MSG control message; thin wrapper around recv_ctrl.
// Returns recv_ctrl's result; `amount` and `from` are only written by
// recv_ctrl when it returns true.
+bool communicator::recv_info(double& amount, m_host_t& from)
+{
+ return recv_ctrl(INFO_MSG, amount, from);
}
-void communicator::send(m_task_t task, const std::string& dest)
// Try to receive a CREDIT_MSG control message; thin wrapper around recv_ctrl.
// Returns recv_ctrl's result; `amount` and `from` are only written by
// recv_ctrl when it returns true.
+bool communicator::recv_credit(double& amount, m_host_t& from)
{
- send(task, dest.c_str());
+ return recv_ctrl(CREDIT_MSG, amount, from);
}
-m_task_t communicator::recv()
-{
- m_task_t task = NULL;
- if (comm_test_n_destroy(recv_comm)) {
- task = recv_task;
- recv_task = NULL;
- recv_comm = MSG_task_irecv(&recv_task, recv_mbox);
// Non-blocking check for a task on the data mailbox.
// When comm_test_n_destroy(data_comm) reports completion: `amount` is set
// from the task's data size, `from` from the task's source, the task is
// destroyed, and a new asynchronous receive is posted on the data mailbox.
// Returns the result of the completion test; the out-parameters are left
// untouched when it is false.
+bool communicator::recv_load(double& amount, m_host_t& from)
+{
+ bool res = comm_test_n_destroy(data_comm);
+ if (res) {
+ amount = MSG_task_get_data_size(data_task);
+ from = MSG_task_get_source(data_task);
+ MSG_task_destroy(data_task);
+ data_task = NULL;
+ data_comm = MSG_task_irecv(&data_task, data_mbox);
+ }
+ return res;
+}
+
// Non-blocking check for a control message of the given `type`.
// Returns true only when the pending control receive has completed AND the
// received message has the requested type; in that case `amount` and `from`
// are filled in, the message and task are released, and a new asynchronous
// receive is posted on the control mailbox.
+bool communicator::recv_ctrl(message_type type, double& amount, m_host_t& from)
+{
+ bool res = MSG_comm_test(ctrl_comm);
+ if (res) {
+ message* msg = (message* )MSG_task_get_data(ctrl_task);
+ if (msg->type == type) {
+ MSG_comm_destroy(ctrl_comm);
+ amount = msg->amount;
+ from = MSG_task_get_source(ctrl_task);
+ delete msg;
+ MSG_task_destroy(ctrl_task);
+ ctrl_task = NULL;
+ ctrl_comm = MSG_task_irecv(&ctrl_task, ctrl_mbox);
+ } else {
// Type mismatch: nothing is destroyed or re-posted here, so ctrl_comm and
// ctrl_task keep their current values (presumably so that a later call
// asking for the other type can consume this message -- confirm with callers).
+ res = false;
+ }
}
- return task;
+ return res;
}
-int communicator::sent_count()
// Call flush_sent(), then return the number of entries in sent_comm.
+int communicator::send_backlog()
{
- send_acknowledge();
+ flush_sent();
return sent_comm.size();
}
-void communicator::send_acknowledge()
// Remove from sent_comm every entry for which comm_test_n_destroy() returns
// true, i.e. every send whose communication has completed.
+void communicator::flush_sent()
{
// std::remove_if only shifts the kept elements to the front and returns the
// new logical end; the tail has to be erased explicitly, otherwise
// sent_comm.size() never shrinks and stale entries stay in the container.
sent_comm.erase(std::remove_if(sent_comm.begin(), sent_comm.end(),
comm_test_n_destroy),
sent_comm.end());
}