X-Git-Url: http://bilbo.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/4c0c321f16afe6cd8f89bf17ef4cd501d7c2ec36..6cc7f1cf554d2780015c029522082e312ec21398:/src/smpi/mpi/smpi_request.cpp diff --git a/src/smpi/mpi/smpi_request.cpp b/src/smpi/mpi/smpi_request.cpp index 8d77e4b1c6..57c1acab56 100644 --- a/src/smpi/mpi/smpi_request.cpp +++ b/src/smpi/mpi/smpi_request.cpp @@ -15,6 +15,7 @@ #include "smpi_datatype.hpp" #include "smpi_host.hpp" #include "smpi_op.hpp" +#include "src/kernel/EngineImpl.hpp" #include "src/kernel/activity/CommImpl.hpp" #include "src/kernel/actor/ActorImpl.hpp" #include "src/kernel/actor/SimcallObserver.hpp" @@ -33,7 +34,7 @@ static simgrid::config::Flag smpi_test_sleep( std::vector smpi_ois_values; -extern void (*smpi_comm_copy_data_callback)(simgrid::kernel::activity::CommImpl*, void*, size_t); +extern std::function smpi_comm_copy_data_callback; namespace simgrid{ namespace smpi{ @@ -510,10 +511,20 @@ void Request::start() } if(!is_probe) flags_ &= ~MPI_REQ_PROBE; + kernel::actor::CommIrecvSimcall observer{process->get_actor()->get_impl(), + mailbox->get_impl(), + static_cast(buf_), + &real_size_, + &match_recv, + process->replaying() ? &smpi_comm_null_copy_buffer_callback + : smpi_comm_copy_data_callback, + this, + -1.0}; + observer.set_tag(tag_); + + action_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::irecv(&observer); }, + &observer); - action_ = simcall_comm_irecv( - process->get_actor()->get_impl(), mailbox->get_impl(), buf_, &real_size_, &match_recv, - process->replaying() ? &smpi_comm_null_copy_buffer_callback : smpi_comm_copy_data_callback, this, -1.0); XBT_DEBUG("recv simcall posted"); if (smpi_cfg_async_small_thresh() != 0 || (flags_ & MPI_REQ_RMA) != 0) @@ -609,13 +620,16 @@ void Request::start() } size_t payload_size_ = size_ + 16;//MPI enveloppe size (tag+dest+communicator) - action_ = simcall_comm_isend( - simgrid::kernel::actor::ActorImpl::by_pid(src_), mailbox->get_impl(), payload_size_, -1.0, buf, real_size_, - &match_send, + kernel::actor::CommIsendSimcall observer{ + simgrid::kernel::EngineImpl::get_instance()->get_actor_by_pid(src_), mailbox->get_impl(), + static_cast(payload_size_), -1, static_cast(buf), real_size_, &match_send, &xbt_free_f, // how to free the userdata if a detached send fails process->replaying() ? &smpi_comm_null_copy_buffer_callback : smpi_comm_copy_data_callback, this, // detach if msg size < eager/rdv switch limit - detached_); + detached_}; + observer.set_tag(tag_); + action_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::isend(&observer); }, + &observer); XBT_DEBUG("send simcall posted"); /* FIXME: detached sends are not traceable (action_ == nullptr) */ @@ -673,8 +687,8 @@ int Request::test(MPI_Request * request, MPI_Status * status, int* flag) { try{ kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self(); kernel::actor::ActivityTestSimcall observer{issuer, (*request)->action_.get()}; - *flag = kernel::actor::simcall_blocking([&observer] { observer.get_activity()->test(observer.get_issuer()); }, - &observer); + *flag = kernel::actor::simcall_answered( + [&observer] { return observer.get_activity()->test(observer.get_issuer()); }, &observer); } catch (const Exception&) { *flag = 0; return ret; @@ -763,8 +777,10 @@ int Request::testany(int count, MPI_Request requests[], int *index, int* flag, M try{ kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self(); kernel::actor::ActivityTestanySimcall observer{issuer, comms}; - i = kernel::actor::simcall_blocking( - [&observer] { kernel::activity::ActivityImpl::test_any(observer.get_issuer(), observer.get_activities()); }, + i = kernel::actor::simcall_answered( + [&observer] { + return kernel::activity::ActivityImpl::test_any(observer.get_issuer(), observer.get_activities()); + }, &observer); } catch (const Exception&) { XBT_DEBUG("Exception in testany"); @@ -1077,9 +1093,8 @@ int Request::wait(MPI_Request * request, MPI_Status * status) // this is not a detached send kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self(); kernel::actor::ActivityWaitSimcall observer{issuer, (*request)->action_.get(), -1}; - kernel::actor::simcall_blocking( - [&observer] { observer.get_activity()->wait_for(observer.get_issuer(), observer.get_timeout()); }, - &observer); + kernel::actor::simcall_blocking([issuer, &observer] { observer.get_activity()->wait_for(issuer, -1); }, + &observer); } catch (const CancelException&) { XBT_VERB("Request cancelled"); }