X-Git-Url: https://bilbo.iut-bm.univ-fcomte.fr/and/gitweb/loba.git/blobdiff_plain/f29b38ef2a056daa14bbfda2fce78063faa773d4..6f5ec5fdc42f96a8fe95f4b846b163d4dc92e0c8:/communicator.cpp?ds=inline diff --git a/communicator.cpp b/communicator.cpp index b78b1c3..93389c7 100644 --- a/communicator.cpp +++ b/communicator.cpp @@ -3,12 +3,14 @@ #include #include #include "communicator.h" +#include "simgrid_features.h" // XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu); XBT_LOG_NEW_DEFAULT_SUBCATEGORY(comm, simu, "Messages from asynchronous pipes"); namespace { + bool comm_test_n_destroy(msg_comm_t& comm) { if (MSG_comm_test(comm)) { @@ -17,56 +19,118 @@ namespace { } else return false; } + } +class communicator::message { +public: + message(message_type t, double a): type(t), amount(a) { } + + message_type type; + double amount; +}; + communicator::communicator() { const char* hostname = MSG_host_get_name(MSG_host_self()); - size_t len = std::strlen(hostname); - recv_mbox = new char[len + 1]; - strcpy(recv_mbox, hostname); - recv_task = NULL; - recv_comm = MSG_task_irecv(&recv_task, recv_mbox); + ctrl_mbox = bprintf("%s_ctrl", hostname); + ctrl_task = NULL; + ctrl_comm = MSG_task_irecv(&ctrl_task, ctrl_mbox); + + data_mbox = bprintf("%s_data", hostname); + data_task = NULL; + data_comm = MSG_task_irecv(&data_task, data_mbox); } communicator::~communicator() { - send_acknowledge(); + // fixme: don't know how to free pending communications + // (data_comm, ctrl_comm and sent_comm) + + free(data_mbox); + free(ctrl_mbox); + + flush_sent(); if (!sent_comm.empty()) WARN1("Lost %ld send communications!", (long )sent_comm.size()); - delete[] recv_mbox; } +void communicator::send_info(const neighbor& dest, double amount) +{ + message* msg = new message(INFO_MSG, amount); + m_task_t task = MSG_task_create("load msg", 0.0, sizeof *msg, msg); + send(dest.get_ctrl_mbox(), task); +} + +void communicator::send_credit(const neighbor& dest, double amount) +{ + message* msg = new message(CREDIT_MSG, amount); + m_task_t task = MSG_task_create("credit msg", 0.0, sizeof *msg, msg); + send(dest.get_ctrl_mbox(), task); +} -void communicator::send(m_task_t task, const char *dest) +void communicator::send_load(const neighbor& dest, double amount) +{ + m_task_t task = MSG_task_create("data msg", 0.0, amount, NULL); + send(dest.get_data_mbox(), task); +} + +void communicator::send(const char* dest, m_task_t task) { sent_comm.push_back(MSG_task_isend(task, dest)); - send_acknowledge(); + flush_sent(); +} + +bool communicator::recv_info(double& amount, m_host_t& from) +{ + return recv_ctrl(INFO_MSG, amount, from); } -void communicator::send(m_task_t task, const std::string& dest) +bool communicator::recv_credit(double& amount, m_host_t& from) { - send(task, dest.c_str()); + return recv_ctrl(CREDIT_MSG, amount, from); } -m_task_t communicator::recv() -{ - m_task_t task = NULL; - if (comm_test_n_destroy(recv_comm)) { - task = recv_task; - recv_task = NULL; - recv_comm = MSG_task_irecv(&recv_task, recv_mbox); +bool communicator::recv_load(double& amount, m_host_t& from) +{ + bool res = comm_test_n_destroy(data_comm); + if (res) { + amount = MSG_task_get_data_size(data_task); + from = MSG_task_get_source(data_task); + MSG_task_destroy(data_task); + data_task = NULL; + data_comm = MSG_task_irecv(&data_task, data_mbox); + } + return res; +} + +bool communicator::recv_ctrl(message_type type, double& amount, m_host_t& from) +{ + bool res = MSG_comm_test(ctrl_comm); + if (res) { + message* msg = (message* )MSG_task_get_data(ctrl_task); + if (msg->type == type) { + MSG_comm_destroy(ctrl_comm); + amount = msg->amount; + from = MSG_task_get_source(ctrl_task); + delete msg; + MSG_task_destroy(ctrl_task); + ctrl_task = NULL; + ctrl_comm = MSG_task_irecv(&ctrl_task, ctrl_mbox); + } else { + res = false; + } } - return task; + return res; } -int communicator::sent_count() +int communicator::send_backlog() { - send_acknowledge(); + flush_sent(); return sent_comm.size(); } -void communicator::send_acknowledge() +void communicator::flush_sent() { std::remove_if(sent_comm.begin(), sent_comm.end(), comm_test_n_destroy); }