void (*copy_data_fun)(simgrid::kernel::activity::CommImpl*, void*, size_t),
void* data, double timeout)
{
- simgrid::kernel::activity::ActivityImplPtr comm = simgrid::kernel::activity::CommImpl::isend(
- src, mbox, task_size, rate, src_buff, src_buff_size, match_fun, nullptr, copy_data_fun, data, false);
+ simgrid::kernel::actor::CommIsendSimcall observer(src, mbox, task_size, rate, src_buff, src_buff_size, match_fun,
+ nullptr, copy_data_fun, data, false);
+ simgrid::kernel::activity::ActivityImplPtr comm = simgrid::kernel::activity::CommImpl::isend(&observer);
simcall->mc_value_ = 0;
comm->wait_for(simcall->issuer_, timeout);
}
void (*copy_data_fun)(simgrid::kernel::activity::CommImpl*, void*, size_t), // used to copy data if not default one
void* data, bool detached)
{
- return simgrid::kernel::activity::CommImpl::isend(src_proc, mbox, task_size, rate, src_buff, src_buff_size, match_fun,
- clean_fun, copy_data_fun, data, detached);
+ simgrid::kernel::actor::CommIsendSimcall observer(src_proc, mbox, task_size, rate, src_buff, src_buff_size, match_fun,
+ nullptr, copy_data_fun, data, false);
+ return simgrid::kernel::activity::CommImpl::isend(&observer);
}
XBT_PRIVATE void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_actor_t receiver, smx_mailbox_t mbox,
void (*copy_data_fun)(simgrid::kernel::activity::CommImpl*, void*, size_t),
void* data, double timeout, double rate)
{
- simgrid::kernel::activity::ActivityImplPtr comm = simgrid::kernel::activity::CommImpl::irecv(
- receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
+ simgrid::kernel::actor::CommIrecvSimcall observer(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun,
+ data, rate);
+ simgrid::kernel::activity::ActivityImplPtr comm = simgrid::kernel::activity::CommImpl::irecv(&observer);
simcall->mc_value_ = 0;
comm->wait_for(simcall->issuer_, timeout);
}
void (*copy_data_fun)(simgrid::kernel::activity::CommImpl*, void*, size_t), void* data,
double rate)
{
- return simgrid::kernel::activity::CommImpl::irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun,
+ simgrid::kernel::actor::CommIrecvSimcall observer(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun,
data, rate);
+ return simgrid::kernel::activity::CommImpl::irecv(&observer);
}
void simcall_HANDLER_comm_wait(smx_simcall_t simcall, simgrid::kernel::activity::CommImpl* comm, double timeout)
copied_ = true;
}
-ActivityImplPtr
-CommImpl::isend(actor::ActorImpl* sender, MailboxImpl* mbox, double task_size, double rate, unsigned char* src_buff,
- size_t src_buff_size, bool (*match_fun)(void*, void*, CommImpl*),
- void (*clean_fun)(void*), // used to free the synchro in case of problem after a detached send
- void (*copy_data_fun)(CommImpl*, void*, size_t), // used to copy data if not default one
- void* data, bool detached)
+ActivityImplPtr CommImpl::isend(actor::CommIsendSimcall* observer)
{
+ auto* mbox = observer->get_mailbox();
XBT_DEBUG("send from mailbox %p", mbox);
/* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
* ourself so that the other side also gets a chance of choosing if it wants to match with us.
*
* If it is not found then push our communication into the rendez-vous point */
- CommImplPtr other_comm = mbox->find_matching_comm(CommImpl::Type::RECEIVE, match_fun, data, this_comm,
- /*done*/ false, /*remove_matching*/ true);
+ CommImplPtr other_comm =
+ mbox->find_matching_comm(CommImpl::Type::RECEIVE, observer->match_fun_, observer->get_payload(), this_comm,
+ /*done*/ false, /*remove_matching*/ true);
if (not other_comm) {
other_comm = std::move(this_comm);
other_comm->set_state(State::READY);
}
- if (detached) {
+ if (observer->is_detached()) {
other_comm->detach();
- other_comm->clean_fun = clean_fun;
+ other_comm->clean_fun = observer->clean_fun_;
} else {
other_comm->clean_fun = nullptr;
- sender->activities_.emplace_back(other_comm);
+ observer->get_issuer()->activities_.emplace_back(other_comm);
}
/* Setup the communication synchro */
- other_comm->src_actor_ = sender;
- other_comm->src_data_ = data;
- (*other_comm).set_src_buff(src_buff, src_buff_size).set_size(task_size).set_rate(rate);
+ other_comm->src_actor_ = observer->get_issuer();
+ other_comm->src_data_ = observer->get_payload();
+ (*other_comm)
+ .set_src_buff(observer->get_src_buff(), observer->get_src_buff_size())
+ .set_size(observer->get_payload_size())
+ .set_rate(observer->get_rate());
- other_comm->match_fun = match_fun;
- other_comm->copy_data_fun = copy_data_fun;
+ other_comm->match_fun = observer->match_fun_;
+ other_comm->copy_data_fun = observer->copy_data_fun_;
if (MC_is_active() || MC_record_replay_is_active())
other_comm->set_state(simgrid::kernel::activity::State::RUNNING);
else
other_comm->start();
- return (detached ? nullptr : other_comm);
+ return (observer->is_detached() ? nullptr : other_comm);
}
-ActivityImplPtr CommImpl::irecv(actor::ActorImpl* receiver, MailboxImpl* mbox, unsigned char* dst_buff,
- size_t* dst_buff_size, bool (*match_fun)(void*, void*, CommImpl*),
- void (*copy_data_fun)(CommImpl*, void*, size_t), void* data, double rate)
+ActivityImplPtr CommImpl::irecv(actor::CommIrecvSimcall* observer)
{
CommImplPtr this_synchro(new CommImpl(CommImpl::Type::RECEIVE));
+ auto* mbox = observer->get_mailbox();
XBT_DEBUG("recv from mbox %p. this_synchro=%p", mbox, this_synchro.get());
CommImplPtr other_comm;
if (mbox->is_permanent() && mbox->has_some_done_comm()) {
XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
// find a match in the list of already received comms
- other_comm = mbox->find_matching_comm(CommImpl::Type::SEND, match_fun, data, this_synchro, /*done*/ true,
- /*remove_matching*/ true);
+ other_comm = mbox->find_matching_comm(CommImpl::Type::SEND, observer->match_fun_, observer->get_payload(),
+ this_synchro, /*done*/ true, /*remove_matching*/ true);
// if not found, assume the receiver came first, register it to the mailbox in the classical way
if (not other_comm) {
XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request "
* ourself so that the other side also gets a chance of choosing if it wants to match with us.
*
* If it is not found then push our communication into the rendez-vous point */
- other_comm = mbox->find_matching_comm(CommImpl::Type::SEND, match_fun, data, this_synchro, /*done*/ false,
- /*remove_matching*/ true);
+ other_comm = mbox->find_matching_comm(CommImpl::Type::SEND, observer->match_fun_, observer->get_payload(),
+ this_synchro, /*done*/ false, /*remove_matching*/ true);
if (other_comm == nullptr) {
XBT_DEBUG("Receive pushed first (%zu comm enqueued so far)", mbox->size());
other_comm->set_state(simgrid::kernel::activity::State::READY);
}
- receiver->activities_.emplace_back(other_comm);
+ observer->get_issuer()->activities_.emplace_back(other_comm);
}
/* Setup communication synchro */
- other_comm->dst_actor_ = receiver;
- other_comm->dst_data_ = data;
- other_comm->set_dst_buff(dst_buff, dst_buff_size);
+ other_comm->dst_actor_ = observer->get_issuer();
+ other_comm->dst_data_ = observer->get_payload();
+ other_comm->set_dst_buff(observer->get_dst_buff(), observer->get_dst_buff_size());
- if (rate > -1.0 && (other_comm->get_rate() < 0.0 || rate < other_comm->get_rate()))
- other_comm->set_rate(rate);
+ if (observer->get_rate() > -1.0 && (other_comm->get_rate() < 0.0 || observer->get_rate() < other_comm->get_rate()))
+ other_comm->set_rate(observer->get_rate());
- other_comm->match_fun = match_fun;
- other_comm->copy_data_fun = copy_data_fun;
+ other_comm->match_fun = observer->match_fun_;
+ other_comm->copy_data_fun = observer->copy_data_fun_;
if (MC_is_active() || MC_record_replay_is_active()) {
other_comm->set_state(State::RUNNING);
sender, mbox, task_size, rate, static_cast<unsigned char*>(src_buff), src_buff_size, match_fun,
nullptr, copy_data_fun, data, false};
comm = simgrid::kernel::actor::simcall(
- [&send_observer] {
- return simgrid::kernel::activity::CommImpl::isend(
- send_observer.get_issuer(), send_observer.get_mailbox(), send_observer.get_payload_size(),
- send_observer.get_rate(), send_observer.get_src_buff(), send_observer.get_src_buff_size(),
- send_observer.match_fun_, send_observer.clean_fun_, send_observer.copy_data_fun_,
- send_observer.get_payload(), send_observer.is_detached());
- },
- &send_observer);
+ [&send_observer] { return simgrid::kernel::activity::CommImpl::isend(&send_observer); }, &send_observer);
simgrid::kernel::actor::ActivityWaitSimcall wait_observer{sender, comm.get(), timeout};
if (simgrid::kernel::actor::simcall_blocking(
simgrid::kernel::actor::CommIrecvSimcall observer{
receiver, mbox, static_cast<unsigned char*>(dst_buff), dst_buff_size, match_fun, copy_data_fun, data, rate};
comm = simgrid::kernel::actor::simcall(
- [&observer] {
- return simgrid::kernel::activity::CommImpl::irecv(
- observer.get_issuer(), observer.get_mailbox(), observer.get_dst_buff(), observer.get_dst_buff_size(),
- observer.match_fun_, observer.copy_data_fun_, observer.get_payload(), observer.get_rate());
- },
- &observer);
+ [&observer] { return simgrid::kernel::activity::CommImpl::irecv(&observer); }, &observer);
simgrid::kernel::actor::ActivityWaitSimcall wait_observer{receiver, comm.get(), timeout};
if (simgrid::kernel::actor::simcall_blocking(