From: SUTER Frederic Date: Tue, 8 Feb 2022 10:57:07 +0000 (+0100) Subject: simplify calls to CommImpl::isend and CommImpl::irecv X-Git-Tag: v3.31~467 X-Git-Url: http://bilbo.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/18627870cf7119153d3f07f122ecef0b314623bb simplify calls to CommImpl::isend and CommImpl::irecv --- diff --git a/src/kernel/activity/CommImpl.cpp b/src/kernel/activity/CommImpl.cpp index f8f4db78fb..eb45116210 100644 --- a/src/kernel/activity/CommImpl.cpp +++ b/src/kernel/activity/CommImpl.cpp @@ -25,8 +25,9 @@ XBT_PRIVATE void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_actor_t sr void (*copy_data_fun)(simgrid::kernel::activity::CommImpl*, void*, size_t), void* data, double timeout) { - simgrid::kernel::activity::ActivityImplPtr comm = simgrid::kernel::activity::CommImpl::isend( - src, mbox, task_size, rate, src_buff, src_buff_size, match_fun, nullptr, copy_data_fun, data, false); + simgrid::kernel::actor::CommIsendSimcall observer(src, mbox, task_size, rate, src_buff, src_buff_size, match_fun, + nullptr, copy_data_fun, data, false); + simgrid::kernel::activity::ActivityImplPtr comm = simgrid::kernel::activity::CommImpl::isend(&observer); simcall->mc_value_ = 0; comm->wait_for(simcall->issuer_, timeout); } @@ -39,8 +40,9 @@ XBT_PRIVATE simgrid::kernel::activity::ActivityImplPtr simcall_HANDLER_comm_isen void (*copy_data_fun)(simgrid::kernel::activity::CommImpl*, void*, size_t), // used to copy data if not default one void* data, bool detached) { - return simgrid::kernel::activity::CommImpl::isend(src_proc, mbox, task_size, rate, src_buff, src_buff_size, match_fun, - clean_fun, copy_data_fun, data, detached); + simgrid::kernel::actor::CommIsendSimcall observer(src_proc, mbox, task_size, rate, src_buff, src_buff_size, match_fun, + clean_fun, copy_data_fun, data, detached); + return simgrid::kernel::activity::CommImpl::isend(&observer); } XBT_PRIVATE void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_actor_t receiver,
smx_mailbox_t mbox, @@ -49,8 +51,9 @@ XBT_PRIVATE void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_actor_t re void (*copy_data_fun)(simgrid::kernel::activity::CommImpl*, void*, size_t), void* data, double timeout, double rate) { - simgrid::kernel::activity::ActivityImplPtr comm = simgrid::kernel::activity::CommImpl::irecv( - receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate); + simgrid::kernel::actor::CommIrecvSimcall observer(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, + data, rate); + simgrid::kernel::activity::ActivityImplPtr comm = simgrid::kernel::activity::CommImpl::irecv(&observer); simcall->mc_value_ = 0; comm->wait_for(simcall->issuer_, timeout); } @@ -61,8 +64,9 @@ simcall_HANDLER_comm_irecv(smx_simcall_t /*simcall*/, smx_actor_t receiver, smx_ void (*copy_data_fun)(simgrid::kernel::activity::CommImpl*, void*, size_t), void* data, double rate) { - return simgrid::kernel::activity::CommImpl::irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, + simgrid::kernel::actor::CommIrecvSimcall observer(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate); + return simgrid::kernel::activity::CommImpl::irecv(&observer); } void simcall_HANDLER_comm_wait(smx_simcall_t simcall, simgrid::kernel::activity::CommImpl* comm, double timeout) @@ -261,13 +265,9 @@ void CommImpl::copy_data() copied_ = true; } -ActivityImplPtr -CommImpl::isend(actor::ActorImpl* sender, MailboxImpl* mbox, double task_size, double rate, unsigned char* src_buff, - size_t src_buff_size, bool (*match_fun)(void*, void*, CommImpl*), - void (*clean_fun)(void*), // used to free the synchro in case of problem after a detached send - void (*copy_data_fun)(CommImpl*, void*, size_t), // used to copy data if not default one - void* data, bool detached) +ActivityImplPtr CommImpl::isend(actor::CommIsendSimcall* observer) { + auto* mbox = observer->get_mailbox(); XBT_DEBUG("send from mailbox %p", mbox); /* 
Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */ @@ -277,8 +277,9 @@ CommImpl::isend(actor::ActorImpl* sender, MailboxImpl* mbox, double task_size, d * ourself so that the other side also gets a chance of choosing if it wants to match with us. * * If it is not found then push our communication into the rendez-vous point */ - CommImplPtr other_comm = mbox->find_matching_comm(CommImpl::Type::RECEIVE, match_fun, data, this_comm, - /*done*/ false, /*remove_matching*/ true); + CommImplPtr other_comm = + mbox->find_matching_comm(CommImpl::Type::RECEIVE, observer->match_fun_, observer->get_payload(), this_comm, + /*done*/ false, /*remove_matching*/ true); if (not other_comm) { other_comm = std::move(this_comm); @@ -299,35 +300,37 @@ CommImpl::isend(actor::ActorImpl* sender, MailboxImpl* mbox, double task_size, d other_comm->set_state(State::READY); } - if (detached) { + if (observer->is_detached()) { other_comm->detach(); - other_comm->clean_fun = clean_fun; + other_comm->clean_fun = observer->clean_fun_; } else { other_comm->clean_fun = nullptr; - sender->activities_.emplace_back(other_comm); + observer->get_issuer()->activities_.emplace_back(other_comm); } /* Setup the communication synchro */ - other_comm->src_actor_ = sender; - other_comm->src_data_ = data; - (*other_comm).set_src_buff(src_buff, src_buff_size).set_size(task_size).set_rate(rate); + other_comm->src_actor_ = observer->get_issuer(); + other_comm->src_data_ = observer->get_payload(); + (*other_comm) + .set_src_buff(observer->get_src_buff(), observer->get_src_buff_size()) + .set_size(observer->get_payload_size()) + .set_rate(observer->get_rate()); - other_comm->match_fun = match_fun; - other_comm->copy_data_fun = copy_data_fun; + other_comm->match_fun = observer->match_fun_; + other_comm->copy_data_fun = observer->copy_data_fun_; if (MC_is_active() || MC_record_replay_is_active()) other_comm->set_state(simgrid::kernel::activity::State::RUNNING); else 
other_comm->start(); - return (detached ? nullptr : other_comm); + return (observer->is_detached() ? nullptr : other_comm); } -ActivityImplPtr CommImpl::irecv(actor::ActorImpl* receiver, MailboxImpl* mbox, unsigned char* dst_buff, - size_t* dst_buff_size, bool (*match_fun)(void*, void*, CommImpl*), - void (*copy_data_fun)(CommImpl*, void*, size_t), void* data, double rate) +ActivityImplPtr CommImpl::irecv(actor::CommIrecvSimcall* observer) { CommImplPtr this_synchro(new CommImpl(CommImpl::Type::RECEIVE)); + auto* mbox = observer->get_mailbox(); XBT_DEBUG("recv from mbox %p. this_synchro=%p", mbox, this_synchro.get()); CommImplPtr other_comm; @@ -335,8 +338,8 @@ ActivityImplPtr CommImpl::irecv(actor::ActorImpl* receiver, MailboxImpl* mbox, u if (mbox->is_permanent() && mbox->has_some_done_comm()) { XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication"); // find a match in the list of already received comms - other_comm = mbox->find_matching_comm(CommImpl::Type::SEND, match_fun, data, this_synchro, /*done*/ true, - /*remove_matching*/ true); + other_comm = mbox->find_matching_comm(CommImpl::Type::SEND, observer->match_fun_, observer->get_payload(), + this_synchro, /*done*/ true, /*remove_matching*/ true); // if not found, assume the receiver came first, register it to the mailbox in the classical way if (not other_comm) { XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request " @@ -357,8 +360,8 @@ ActivityImplPtr CommImpl::irecv(actor::ActorImpl* receiver, MailboxImpl* mbox, u * ourself so that the other side also gets a chance of choosing if it wants to match with us. 
* * If it is not found then push our communication into the rendez-vous point */ - other_comm = mbox->find_matching_comm(CommImpl::Type::SEND, match_fun, data, this_synchro, /*done*/ false, - /*remove_matching*/ true); + other_comm = mbox->find_matching_comm(CommImpl::Type::SEND, observer->match_fun_, observer->get_payload(), + this_synchro, /*done*/ false, /*remove_matching*/ true); if (other_comm == nullptr) { XBT_DEBUG("Receive pushed first (%zu comm enqueued so far)", mbox->size()); @@ -369,19 +372,19 @@ ActivityImplPtr CommImpl::irecv(actor::ActorImpl* receiver, MailboxImpl* mbox, u other_comm->set_state(simgrid::kernel::activity::State::READY); } - receiver->activities_.emplace_back(other_comm); + observer->get_issuer()->activities_.emplace_back(other_comm); } /* Setup communication synchro */ - other_comm->dst_actor_ = receiver; - other_comm->dst_data_ = data; - other_comm->set_dst_buff(dst_buff, dst_buff_size); + other_comm->dst_actor_ = observer->get_issuer(); + other_comm->dst_data_ = observer->get_payload(); + other_comm->set_dst_buff(observer->get_dst_buff(), observer->get_dst_buff_size()); - if (rate > -1.0 && (other_comm->get_rate() < 0.0 || rate < other_comm->get_rate())) - other_comm->set_rate(rate); + if (observer->get_rate() > -1.0 && (other_comm->get_rate() < 0.0 || observer->get_rate() < other_comm->get_rate())) + other_comm->set_rate(observer->get_rate()); - other_comm->match_fun = match_fun; - other_comm->copy_data_fun = copy_data_fun; + other_comm->match_fun = observer->match_fun_; + other_comm->copy_data_fun = observer->copy_data_fun_; if (MC_is_active() || MC_record_replay_is_active()) { other_comm->set_state(State::RUNNING); diff --git a/src/kernel/activity/CommImpl.hpp b/src/kernel/activity/CommImpl.hpp index 5b131f079f..8e5ea1337a 100644 --- a/src/kernel/activity/CommImpl.hpp +++ b/src/kernel/activity/CommImpl.hpp @@ -8,6 +8,7 @@ #include "src/kernel/activity/ActivityImpl.hpp" #include "src/kernel/actor/ActorImpl.hpp" +#include 
"src/kernel/actor/SimcallObserver.hpp" namespace simgrid { namespace kernel { @@ -49,15 +50,8 @@ public: std::vector get_traversed_links() const; void copy_data(); - static ActivityImplPtr - isend(actor::ActorImpl* src, MailboxImpl* mbox, double task_size, double rate, unsigned char* src_buff, - size_t src_buff_size, bool (*match_fun)(void*, void*, CommImpl*), - void (*clean_fun)(void*), // used to free the synchro in case of problem after a detached send - void (*copy_data_fun)(CommImpl*, void*, size_t), // used to copy data if not default one - void* data, bool detached); - static ActivityImplPtr irecv(actor::ActorImpl* receiver, MailboxImpl* mbox, unsigned char* dst_buff, - size_t* dst_buff_size, bool (*match_fun)(void*, void*, CommImpl*), - void (*copy_data_fun)(CommImpl*, void*, size_t), void* data, double rate); + static ActivityImplPtr isend(actor::CommIsendSimcall* observer); + static ActivityImplPtr irecv(actor::CommIrecvSimcall* observer); bool test(actor::ActorImpl* issuer) override; void wait_for(actor::ActorImpl* issuer, double timeout) override; diff --git a/src/s4u/s4u_Comm.cpp b/src/s4u/s4u_Comm.cpp index c034a25ab4..cd8ea25a73 100644 --- a/src/s4u/s4u_Comm.cpp +++ b/src/s4u/s4u_Comm.cpp @@ -230,14 +230,7 @@ Comm* Comm::start() copy_data_function_, get_data(), detached_}; - pimpl_ = kernel::actor::simcall( - [&observer] { - return kernel::activity::CommImpl::isend( - observer.get_issuer(), observer.get_mailbox(), observer.get_payload_size(), observer.get_rate(), - observer.get_src_buff(), observer.get_src_buff_size(), observer.match_fun_, observer.clean_fun_, - observer.copy_data_fun_, observer.get_payload(), observer.is_detached()); - }, - &observer); + pimpl_ = kernel::actor::simcall([&observer] { return kernel::activity::CommImpl::isend(&observer); }, &observer); } else if (dst_buff_ != nullptr) { // Receiver side xbt_assert(not detached_, "Receive cannot be detached"); on_recv(*this); @@ -249,13 +242,7 @@ Comm* Comm::start() copy_data_function_, 
get_data(), rate_}; - pimpl_ = kernel::actor::simcall( - [&observer] { - return kernel::activity::CommImpl::irecv( - observer.get_issuer(), observer.get_mailbox(), observer.get_dst_buff(), observer.get_dst_buff_size(), - observer.match_fun_, observer.copy_data_fun_, observer.get_payload(), observer.get_rate()); - }, - &observer); + pimpl_ = kernel::actor::simcall([&observer] { return kernel::activity::CommImpl::irecv(&observer); }, &observer); } else { xbt_die("Cannot start a communication before specifying whether we are the sender or the receiver"); } diff --git a/src/simix/libsmx.cpp b/src/simix/libsmx.cpp index bd350aeab4..0dd298f2b9 100644 --- a/src/simix/libsmx.cpp +++ b/src/simix/libsmx.cpp @@ -49,14 +49,7 @@ void simcall_comm_send(smx_actor_t sender, smx_mailbox_t mbox, double task_size, sender, mbox, task_size, rate, static_cast<unsigned char*>(src_buff), src_buff_size, match_fun, nullptr, copy_data_fun, data, false}; comm = simgrid::kernel::actor::simcall( - [&send_observer] { - return simgrid::kernel::activity::CommImpl::isend( - send_observer.get_issuer(), send_observer.get_mailbox(), send_observer.get_payload_size(), - send_observer.get_rate(), send_observer.get_src_buff(), send_observer.get_src_buff_size(), - send_observer.match_fun_, send_observer.clean_fun_, send_observer.copy_data_fun_, - send_observer.get_payload(), send_observer.is_detached()); - }, - &send_observer); + [&send_observer] { return simgrid::kernel::activity::CommImpl::isend(&send_observer); }, &send_observer); simgrid::kernel::actor::ActivityWaitSimcall wait_observer{sender, comm.get(), timeout}; if (simgrid::kernel::actor::simcall_blocking( @@ -111,12 +104,7 @@ void simcall_comm_recv(smx_actor_t receiver, smx_mailbox_t mbox, void* dst_buff, simgrid::kernel::actor::CommIrecvSimcall observer{ receiver, mbox, static_cast<unsigned char*>(dst_buff), dst_buff_size, match_fun, copy_data_fun, data, rate}; comm = simgrid::kernel::actor::simcall( - [&observer] { - return simgrid::kernel::activity::CommImpl::irecv( - 
observer.get_issuer(), observer.get_mailbox(), observer.get_dst_buff(), observer.get_dst_buff_size(), - observer.match_fun_, observer.copy_data_fun_, observer.get_payload(), observer.get_rate()); - }, - &observer); + [&observer] { return simgrid::kernel::activity::CommImpl::irecv(&observer); }, &observer); simgrid::kernel::actor::ActivityWaitSimcall wait_observer{receiver, comm.get(), timeout}; if (simgrid::kernel::actor::simcall_blocking( diff --git a/src/smpi/mpi/smpi_request.cpp b/src/smpi/mpi/smpi_request.cpp index 5d8551d15d..b690a145a2 100644 --- a/src/smpi/mpi/smpi_request.cpp +++ b/src/smpi/mpi/smpi_request.cpp @@ -520,13 +520,7 @@ void Request::start() this, -1.0}; - action_ = kernel::actor::simcall( - [&observer] { - return kernel::activity::CommImpl::irecv( - observer.get_issuer(), observer.get_mailbox(), observer.get_dst_buff(), observer.get_dst_buff_size(), - observer.match_fun_, observer.copy_data_fun_, observer.get_payload(), observer.get_rate()); - }, - &observer); + action_ = kernel::actor::simcall([&observer] { return kernel::activity::CommImpl::irecv(&observer); }, &observer); XBT_DEBUG("recv simcall posted"); @@ -630,14 +624,7 @@ void Request::start() process->replaying() ? &smpi_comm_null_copy_buffer_callback : smpi_comm_copy_data_callback, this, // detach if msg size < eager/rdv switch limit detached_}; - action_ = kernel::actor::simcall( - [&observer] { - return kernel::activity::CommImpl::isend( - observer.get_issuer(), observer.get_mailbox(), observer.get_payload_size(), observer.get_rate(), - observer.get_src_buff(), observer.get_src_buff_size(), observer.match_fun_, observer.clean_fun_, - observer.copy_data_fun_, observer.get_payload(), observer.is_detached()); - }, - &observer); + action_ = kernel::actor::simcall([&observer] { return kernel::activity::CommImpl::isend(&observer); }, &observer); XBT_DEBUG("send simcall posted"); /* FIXME: detached sends are not traceable (action_ == nullptr) */