X-Git-Url: https://bilbo.iut-bm.univ-fcomte.fr/and/gitweb/loba.git/blobdiff_plain/f29b38ef2a056daa14bbfda2fce78063faa773d4..f4125505064e3ff346b31ab9e48f894672e5a7a7:/communicator.cpp diff --git a/communicator.cpp b/communicator.cpp index b78b1c3..da5e2b5 100644 --- a/communicator.cpp +++ b/communicator.cpp @@ -3,12 +3,14 @@ #include #include #include "communicator.h" +#include "simgrid_features.h" // XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu); XBT_LOG_NEW_DEFAULT_SUBCATEGORY(comm, simu, "Messages from asynchronous pipes"); namespace { + bool comm_test_n_destroy(msg_comm_t& comm) { if (MSG_comm_test(comm)) { @@ -17,56 +19,72 @@ namespace { } else return false; } + } communicator::communicator() { const char* hostname = MSG_host_get_name(MSG_host_self()); - size_t len = std::strlen(hostname); - recv_mbox = new char[len + 1]; - strcpy(recv_mbox, hostname); - recv_task = NULL; - recv_comm = MSG_task_irecv(&recv_task, recv_mbox); + ctrl_mbox = hostname; + ctrl_mbox += "_ctrl"; + ctrl_task = NULL; + ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox()); + + data_mbox = hostname; + data_mbox += "_data"; + data_task = NULL; + data_comm = MSG_task_irecv(&data_task, get_data_mbox()); } communicator::~communicator() { - send_acknowledge(); + // fixme: don't know how to free pending communications + // (data_comm, ctrl_comm and sent_comm) + + flush_sent(); if (!sent_comm.empty()) WARN1("Lost %ld send communications!", (long )sent_comm.size()); - delete[] recv_mbox; } - -void communicator::send(m_task_t task, const char *dest) +void communicator::send(const char* dest, message* msg) { + double msg_size = sizeof *msg; + if (msg->get_type() == message::LOAD) + msg_size += msg->get_amount(); + m_task_t task = MSG_task_create("message", 0.0, msg_size, msg); sent_comm.push_back(MSG_task_isend(task, dest)); - send_acknowledge(); + flush_sent(); } -void communicator::send(m_task_t task, const std::string& dest) +bool communicator::recv(message*& msg, m_host_t& from) { - send(task, dest.c_str()); -} + msg = NULL; -m_task_t communicator::recv() -{ - m_task_t task = NULL; - if (comm_test_n_destroy(recv_comm)) { - task = recv_task; - recv_task = NULL; - recv_comm = MSG_task_irecv(&recv_task, recv_mbox); + if (comm_test_n_destroy(ctrl_comm)) { + msg = (message* )MSG_task_get_data(ctrl_task); + from = MSG_task_get_source(ctrl_task); + MSG_task_destroy(ctrl_task); + ctrl_task = NULL; + ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox()); + + } else if (comm_test_n_destroy(data_comm)) { + msg = (message* )MSG_task_get_data(data_task); + from = MSG_task_get_source(data_task); + MSG_task_destroy(data_task); + data_task = NULL; + data_comm = MSG_task_irecv(&data_task, get_data_mbox()); } - return task; + + return msg != NULL; } -int communicator::sent_count() +int communicator::send_backlog() { - send_acknowledge(); + flush_sent(); return sent_comm.size(); } -void communicator::send_acknowledge() +void communicator::flush_sent() { std::remove_if(sent_comm.begin(), sent_comm.end(), comm_test_n_destroy); }