X-Git-Url: http://bilbo.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/b1020c2311048f26a586af241101434faa59445b..e71a2a302d28430dc1bfee906f842f5f3d0fa3ce:/src/s4u/s4u_Comm.cpp diff --git a/src/s4u/s4u_Comm.cpp b/src/s4u/s4u_Comm.cpp index f9c3c37dea..23eaf0d2dc 100644 --- a/src/s4u/s4u_Comm.cpp +++ b/src/s4u/s4u_Comm.cpp @@ -23,9 +23,9 @@ Comm::~Comm() { if (state_ == State::STARTED && not detached_ && (pimpl_ == nullptr || pimpl_->state_ == kernel::activity::State::RUNNING)) { - XBT_INFO("Comm %p freed before its completion. Detached: %d, State: %d", this, detached_, (int)state_); + XBT_INFO("Comm %p freed before its completion. Did you forget to detach it? (state: %s)", this, get_state_str()); if (pimpl_ != nullptr) - XBT_INFO("pimpl_->state: %d", static_cast(pimpl_->state_)); + XBT_INFO("pimpl_->state: %s", pimpl_->get_state_str()); else XBT_INFO("pimpl_ is null"); xbt_backtrace_display_current(); @@ -38,8 +38,10 @@ int Comm::wait_any_for(const std::vector* comms, double timeout) std::transform(begin(*comms), end(*comms), begin(rcomms), [](const CommPtr& comm) { return static_cast(comm->pimpl_.get()); }); int changed_pos = simcall_comm_waitany(rcomms.data(), rcomms.size(), timeout); - if (changed_pos != -1) + if (changed_pos != -1) { + on_completion(*(comms->at(changed_pos))); comms->at(changed_pos)->release_dependencies(); + } return changed_pos; } @@ -114,6 +116,11 @@ CommPtr Comm::set_dst_data(void** buff, size_t size) dst_buff_size_ = size; return this; } +CommPtr Comm::set_payload_size(double bytes) +{ + Activity::set_remaining(bytes); + return this; +} CommPtr Comm::sendto_init(Host* from, Host* to) { @@ -123,13 +130,19 @@ CommPtr Comm::sendto_init(Host* from, Host* to) return res; } + CommPtr Comm::sendto_async(Host* from, Host* to, double simulated_size_in_bytes) { - auto res = Comm::sendto_init(from, to); - res->set_remaining(simulated_size_in_bytes)->start(); + auto res = Comm::sendto_init(from, to)->set_payload_size(simulated_size_in_bytes); + res->vetoable_start(); return res; } +void Comm::sendto(Host* from, Host* to, double simulated_size_in_bytes) +{ + sendto_async(from, to, simulated_size_in_bytes)->wait(); +} + Comm* Comm::start() { xbt_assert(get_state() == State::INITED || get_state() == State::STARTING, @@ -139,7 +152,7 @@ Comm* Comm::start() xbt_assert(src_buff_ == nullptr && dst_buff_ == nullptr, "Direct host-to-host communications cannot carry any data."); pimpl_ = kernel::actor::simcall([this] { - auto res = new kernel::activity::CommImpl(this->from_, this->to_, this->get_remaining()); + kernel::activity::CommImplPtr res(new kernel::activity::CommImpl(this->from_, this->to_, this->get_remaining())); res->start(); return res; }); @@ -186,7 +199,7 @@ Comm* Comm::wait_for(double timeout) case State::INITED: case State::STARTING: // It's not started yet. Do it in one simcall if it's a regular communication if (from_ != nullptr || to_ != nullptr) { - return start()->wait_for(timeout); // In the case of host2host comm, do it in two simcalls + return vetoable_start()->wait_for(timeout); // In the case of host2host comm, do it in two simcalls } else if (src_buff_ != nullptr) { on_start(*this, true /*is_sender*/); simcall_comm_send(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_, @@ -230,9 +243,9 @@ int Comm::test_any(const std::vector* comms) Comm* Comm::detach() { - xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)", - __FUNCTION__); - xbt_assert(src_buff_ != nullptr && src_buff_size_ != 0, "You can only detach sends, not recvs"); + xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication is %s (not implemented)", + __FUNCTION__, get_state_str()); + xbt_assert(dst_buff_ == nullptr && dst_buff_size_ == 0, "You can only detach sends, not recvs"); detached_ = true; vetoable_start(); return this;