X-Git-Url: http://bilbo.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/1b89437671ba572a199dd4ff5766ba82720cdf03..c19a107a096f503e67217fb178fa98eb742ceb4d:/src/kernel/activity/CommImpl.cpp diff --git a/src/kernel/activity/CommImpl.cpp b/src/kernel/activity/CommImpl.cpp index 91e3f17297..97495307e0 100644 --- a/src/kernel/activity/CommImpl.cpp +++ b/src/kernel/activity/CommImpl.cpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2007-2020. The SimGrid Team. All rights reserved. */ +/* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ @@ -77,7 +77,7 @@ XBT_PRIVATE simgrid::kernel::activity::ActivityImplPtr simcall_HANDLER_comm_isen other_comm->clean_fun = clean_fun; } else { other_comm->clean_fun = nullptr; - src_proc->activities_.push_back(other_comm); + src_proc->activities_.emplace_back(other_comm); } /* Setup the communication synchro */ @@ -160,7 +160,7 @@ simcall_HANDLER_comm_irecv(smx_simcall_t /*simcall*/, smx_actor_t receiver, smx_ other_comm->state_ = simgrid::kernel::activity::State::READY; other_comm->set_type(simgrid::kernel::activity::CommImpl::Type::READY); } - receiver->activities_.push_back(other_comm); + receiver->activities_.emplace_back(other_comm); } /* Setup communication synchro */ @@ -309,7 +309,7 @@ void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, simgrid::kernel::activi } if (timeout < 0.0) { - simcall->timeout_cb_ = NULL; + simcall->timeout_cb_ = nullptr; } else { simcall->timeout_cb_ = simgrid::simix::Timer::set(SIMIX_get_clock() + timeout, [simcall]() { simcall->timeout_cb_ = nullptr; @@ -408,11 +408,16 @@ CommImpl& CommImpl::detach() return *this; } +CommImpl::CommImpl(s4u::Host* from, s4u::Host* to, double bytes) : size_(bytes), detached_(true), from_(from), to_(to) +{ + state_ = State::READY; +} + CommImpl::~CommImpl() { XBT_DEBUG("Really free communication %p in state %d (detached = %d)", this, static_cast(state_), detached_); - cleanupSurf(); + cleanup_surf(); if (detached_ && state_ != State::DONE) { /* the communication has failed and was detached: @@ -430,25 +435,26 @@ CommImpl* CommImpl::start() { /* If both the sender and the receiver are already there, start the communication */ if (state_ == State::READY) { - s4u::Host* sender = src_actor_->get_host(); - s4u::Host* receiver = dst_actor_->get_host(); + from_ = from_ != nullptr ? from_ : src_actor_->get_host(); + to_ = to_ != nullptr ? to_ : dst_actor_->get_host(); - surf_action_ = surf_network_model->communicate(sender, receiver, size_, rate_); + surf_action_ = surf_network_model->communicate(from_, to_, size_, rate_); surf_action_->set_activity(this); surf_action_->set_category(get_tracing_category()); state_ = State::RUNNING; - XBT_DEBUG("Starting communication %p from '%s' to '%s' (surf_action: %p)", this, sender->get_cname(), - receiver->get_cname(), surf_action_); + XBT_DEBUG("Starting communication %p from '%s' to '%s' (surf_action: %p; state: %s)", this, from_->get_cname(), + to_->get_cname(), surf_action_, get_state_str()); /* If a link is failed, detect it immediately */ if (surf_action_->get_state() == resource::Action::State::FAILED) { - XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", sender->get_cname(), - receiver->get_cname()); + XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", from_->get_cname(), + to_->get_cname()); state_ = State::LINK_FAILURE; post(); - } else if (src_actor_->is_suspended() || dst_actor_->is_suspended()) { + } else if ((src_actor_ != nullptr && src_actor_->is_suspended()) || + (dst_actor_ != nullptr && dst_actor_->is_suspended())) { /* If any of the process is suspended, create the synchro but stop its execution, it will be restarted when the sender process resume */ if (src_actor_->is_suspended()) @@ -480,12 +486,12 @@ void CommImpl::copy_data() dst_actor_ ? dst_actor_->get_host()->get_cname() : "a finished process", dst_buff_, buff_size); /* Copy at most dst_buff_size bytes of the message to receiver's buffer */ - if (dst_buff_size_) + if (dst_buff_size_) { buff_size = std::min(buff_size, *(dst_buff_size_)); - /* Update the receiver's buffer size to the copied amount */ - if (dst_buff_size_) + /* Update the receiver's buffer size to the copied amount */ *dst_buff_size_ = buff_size; + } if (buff_size > 0) { if (copy_data_fun) @@ -530,7 +536,7 @@ void CommImpl::cancel() } /** @brief This is part of the cleanup process, probably an internal command */ -void CommImpl::cleanupSurf() +void CommImpl::cleanup_surf() { clean_action(); @@ -561,11 +567,11 @@ void CommImpl::post() } else state_ = State::DONE; - XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d", this, (int)state_, + XBT_DEBUG("CommImpl::post(): comm %p, state %s, src_proc %p, dst_proc %p, detached: %d", this, get_state_str(), src_actor_.get(), dst_actor_.get(), detached_); /* destroy the surf actions associated with the Simix communication */ - cleanupSurf(); + cleanup_surf(); /* Answer all simcalls associated with the synchro */ finish(); @@ -581,9 +587,9 @@ void CommImpl::finish() * list. Afterwards, get the position of the actual synchro in the waitany list and return it as the result of the * simcall */ - if (simcall->call_ == SIMCALL_NONE) // FIXME: maybe a better way to handle this case - continue; // if actor handling comm is killed - if (simcall->call_ == SIMCALL_COMM_WAITANY) { + if (simcall->call_ == simix::Simcall::NONE) // FIXME: maybe a better way to handle this case + continue; // if actor handling comm is killed + if (simcall->call_ == simix::Simcall::COMM_WAITANY) { SIMIX_waitany_remove_simcall_from_actions(simcall); if (simcall->timeout_cb_) { simcall->timeout_cb_->remove(); @@ -674,15 +680,15 @@ void CommImpl::finish() } /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */ if (simcall->issuer_->exception_ && - (simcall->call_ == SIMCALL_COMM_WAITANY || simcall->call_ == SIMCALL_COMM_TESTANY)) { + (simcall->call_ == simix::Simcall::COMM_WAITANY || simcall->call_ == simix::Simcall::COMM_TESTANY)) { // First retrieve the rank of our failing synchro CommImpl** comms; size_t count; - if (simcall->call_ == SIMCALL_COMM_WAITANY) { + if (simcall->call_ == simix::Simcall::COMM_WAITANY) { comms = simcall_comm_waitany__get__comms(simcall); count = simcall_comm_waitany__get__count(simcall); } else { - /* simcall->call_ == SIMCALL_COMM_TESTANY */ + /* simcall->call_ == simix::Simcall::COMM_TESTANY */ comms = simcall_comm_testany__get__comms(simcall); count = simcall_comm_testany__get__count(simcall); } @@ -707,8 +713,10 @@ void CommImpl::finish() if (src_actor_) src_actor_->activities_.remove(this); } else { - dst_actor_->activities_.remove(this); - src_actor_->activities_.remove(this); + if (dst_actor_ != nullptr) + dst_actor_->activities_.remove(this); + if (src_actor_ != nullptr) + src_actor_->activities_.remove(this); } } }