}
-class communicator::message {
-public:
- message(message_type t, double a): type(t), amount(a) { }
-
- message_type type;
- double amount;
-};
-
communicator::communicator()
{
const char* hostname = MSG_host_get_name(MSG_host_self());
- ctrl_mbox = bprintf("%s_ctrl", hostname);
+ ctrl_mbox = hostname;
+ ctrl_mbox += "_ctrl";
ctrl_task = NULL;
- ctrl_comm = MSG_task_irecv(&ctrl_task, ctrl_mbox);
+ ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox());
- data_mbox = bprintf("%s_data", hostname);
+ data_mbox = hostname;
+ data_mbox += "_data";
data_task = NULL;
- data_comm = MSG_task_irecv(&data_task, data_mbox);
+ data_comm = MSG_task_irecv(&data_task, get_data_mbox());
}
communicator::~communicator()
// fixme: don't know how to free pending communications
// (data_comm, ctrl_comm and sent_comm)
- free(data_mbox);
- free(ctrl_mbox);
-
flush_sent();
if (!sent_comm.empty())
WARN1("Lost %ld send communications!", (long )sent_comm.size());
}
-void communicator::send_info(const neighbor& dest, double amount)
-{
- message* msg = new message(INFO_MSG, amount);
- m_task_t task = MSG_task_create("load msg", 0.0, sizeof *msg, msg);
- send(dest.get_ctrl_mbox(), task);
-}
-
-void communicator::send_credit(const neighbor& dest, double amount)
-{
- message* msg = new message(CREDIT_MSG, amount);
- m_task_t task = MSG_task_create("credit msg", 0.0, sizeof *msg, msg);
- send(dest.get_ctrl_mbox(), task);
-}
-
-void communicator::send_load(const neighbor& dest, double amount)
-{
- m_task_t task = MSG_task_create("data msg", 0.0, amount, NULL);
- send(dest.get_data_mbox(), task);
-}
-
-void communicator::send(const char* dest, m_task_t task)
+void communicator::send(const char* dest, message* msg)
{
+ double msg_size = sizeof *msg;
+ if (msg->get_type() == message::LOAD)
+ msg_size += msg->get_amount();
+ m_task_t task = MSG_task_create("message", 0.0, msg_size, msg);
sent_comm.push_back(MSG_task_isend(task, dest));
flush_sent();
}
-bool communicator::recv_info(double& amount, m_host_t& from)
+bool communicator::recv(message*& msg, m_host_t& from)
{
- return recv_ctrl(INFO_MSG, amount, from);
-}
+ msg = NULL;
-bool communicator::recv_credit(double& amount, m_host_t& from)
-{
- return recv_ctrl(CREDIT_MSG, amount, from);
-}
+ if (comm_test_n_destroy(ctrl_comm)) {
+ msg = (message* )MSG_task_get_data(ctrl_task);
+ from = MSG_task_get_source(ctrl_task);
+ MSG_task_destroy(ctrl_task);
+ ctrl_task = NULL;
+ ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox());
-bool communicator::recv_load(double& amount, m_host_t& from)
-{
- bool res = comm_test_n_destroy(data_comm);
- if (res) {
- amount = MSG_task_get_data_size(data_task);
+ } else if (comm_test_n_destroy(data_comm)) {
+ msg = (message* )MSG_task_get_data(data_task);
from = MSG_task_get_source(data_task);
MSG_task_destroy(data_task);
data_task = NULL;
- data_comm = MSG_task_irecv(&data_task, data_mbox);
+ data_comm = MSG_task_irecv(&data_task, get_data_mbox());
}
- return res;
-}
-bool communicator::recv_ctrl(message_type type, double& amount, m_host_t& from)
-{
- bool res = MSG_comm_test(ctrl_comm);
- if (res) {
- message* msg = (message* )MSG_task_get_data(ctrl_task);
- if (msg->type == type) {
- MSG_comm_destroy(ctrl_comm);
- amount = msg->amount;
- from = MSG_task_get_source(ctrl_task);
- delete msg;
- MSG_task_destroy(ctrl_task);
- ctrl_task = NULL;
- ctrl_comm = MSG_task_irecv(&ctrl_task, ctrl_mbox);
- } else {
- res = false;
- }
- }
- return res;
+ return msg != NULL;
}
int communicator::send_backlog()
#define COMMUNICATOR_H
#include <list>
-
+#include <string>
#include <msg/msg.h>
-#include <xbt/sysdep.h>
-#include "neighbor.h"
+
+class message {
+public:
+ enum message_type { INFO, CREDIT, LOAD, CLOSE };
+
+ message(message_type t, double a): type(t), amount(a) { }
+
+ message_type get_type() const { return type; }
+ double get_amount() const { return amount; }
+
+private:
+ message_type type;
+ double amount;
+};
class communicator {
public:
communicator();
~communicator();
- void send_info(const neighbor& dest, double amount);
- void send_credit(const neighbor& dest, double amount);
- void send_load(const neighbor& dest, double amount);
-
- bool recv_info(double& amount, m_host_t& from);
- bool recv_credit(double& amount, m_host_t& from);
- bool recv_load(double& amount, m_host_t& from);
+ void send(const char* dest, message* msg);
+ bool recv(message*& msg, m_host_t& from);
int send_backlog();
private:
- enum message_type { INFO_MSG, CREDIT_MSG };
- class message;
-
// List of pending send communications
std::list<msg_comm_t> sent_comm;
// Control channel for receiving
- char* ctrl_mbox;
+ std::string ctrl_mbox;
msg_comm_t ctrl_comm;
m_task_t ctrl_task;
// Data channel for receiving
- char* data_mbox;
+ std::string data_mbox;
msg_comm_t data_comm;
m_task_t data_task;
- bool recv_ctrl(message_type type, double& amount, m_host_t& from);
- void send(const char* dest, m_task_t task);
+ const char* get_ctrl_mbox() const { return ctrl_mbox.c_str(); }
+ const char* get_data_mbox() const { return data_mbox.c_str(); }
void flush_sent();
};
* compute load balancing;
* send tasks to neighbors;
* }
+ * finalize;
+ * wait for pending messages;
*/
/* Open Questions :
void process::receive()
{
- bool received;
- do {
- double amount;
- m_host_t from;
- received = false;
- if (comm.recv_info(amount, from)) {
+ message* msg;
+ m_host_t from;
+ while (comm.recv(msg, from)) {
+ switch (msg->get_type()) {
+ case message::INFO:
// fixme: update neighbor
- received = true;
+ break;
+ case message::CREDIT:
+ expected_load += msg->get_amount();
+ break;
+ case message::LOAD:
+ load += msg->get_amount();
+ break;
}
- if (comm.recv_credit(amount, from)) {
- expected_load += amount;
- received = true;
- }
- if (comm.recv_load(amount, from)) {
- load += amount;
- received = true;
- }
- } while (received);
+ delete msg;
+ }
}
void process::compute()
MSG_task_destroy(task);
}
+void process::finalize()
+{
+ // fixme
+}
+
void process::print_loads(e_xbt_log_priority_t logp)
{
if (!LOG_ISENABLED(logp))