- simgrid::kernel::activity::CommImplPtr this_synchro =
- simgrid::kernel::activity::CommImplPtr(new simgrid::kernel::activity::CommImpl());
- this_synchro->set_type(simgrid::kernel::activity::CommImpl::Type::RECEIVE);
- XBT_DEBUG("recv from mbox %p. this_synchro=%p", mbox, this_synchro.get());
-
- simgrid::kernel::activity::CommImplPtr other_comm;
- // communication already done, get it inside the list of completed comms
- if (mbox->permanent_receiver_ != nullptr && not mbox->done_comm_queue_.empty()) {
- XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
- // find a match in the list of already received comms
- other_comm = mbox->find_matching_comm(simgrid::kernel::activity::CommImpl::Type::SEND, match_fun, data,
- this_synchro, /*done*/ true,
- /*remove_matching*/ true);
- // if not found, assume the receiver came first, register it to the mailbox in the classical way
- if (not other_comm) {
- XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request "
- "into list");
- other_comm = std::move(this_synchro);
- mbox->push(other_comm);
- } else {
- if (other_comm->surf_action_ && other_comm->get_remaining() < 1e-12) {
- XBT_DEBUG("comm %p has been already sent, and is finished, destroy it", other_comm.get());
- other_comm->state_ = simgrid::kernel::activity::State::DONE;
- other_comm->set_type(simgrid::kernel::activity::CommImpl::Type::DONE).set_mailbox(nullptr);
- }
- }
- } else {
- /* Prepare a comm describing us, so that it gets passed to the user-provided filter of other side */
-
- /* Look for communication activity matching our needs. We also provide a description of
- * ourself so that the other side also gets a chance of choosing if it wants to match with us.
- *
- * If it is not found then push our communication into the rendez-vous point */
- other_comm = mbox->find_matching_comm(simgrid::kernel::activity::CommImpl::Type::SEND, match_fun, data,
- this_synchro, /*done*/ false,
- /*remove_matching*/ true);
-
- if (other_comm == nullptr) {
- XBT_DEBUG("Receive pushed first (%zu comm enqueued so far)", mbox->comm_queue_.size());
- other_comm = std::move(this_synchro);
- mbox->push(other_comm);
- } else {
- XBT_DEBUG("Match my %p with the existing %p", this_synchro.get(), other_comm.get());
-
- other_comm->state_ = simgrid::kernel::activity::State::READY;
- other_comm->set_type(simgrid::kernel::activity::CommImpl::Type::READY);
- }
- receiver->activities_.push_back(other_comm);
- }
-
- /* Setup communication synchro */
- other_comm->dst_actor_ = receiver;
- other_comm->dst_data_ = data;
- other_comm->set_dst_buff(dst_buff, dst_buff_size);
-
- if (rate > -1.0 && (other_comm->get_rate() < 0.0 || rate < other_comm->get_rate()))
- other_comm->set_rate(rate);
-
- other_comm->match_fun = match_fun;
- other_comm->copy_data_fun = copy_data_fun;
-
- if (MC_is_active() || MC_record_replay_is_active()) {
- other_comm->state_ = simgrid::kernel::activity::State::RUNNING;
- return other_comm;
- }
- other_comm->start();
- return other_comm;
-}
-
-void simcall_HANDLER_comm_wait(smx_simcall_t simcall, simgrid::kernel::activity::CommImpl* comm, double timeout)
-{
- /* Associate this simcall to the wait synchro */
- XBT_DEBUG("simcall_HANDLER_comm_wait, %p", comm);
-
- comm->register_simcall(simcall);
-
- if (MC_is_active() || MC_record_replay_is_active()) {
- int idx = SIMCALL_GET_MC_VALUE(*simcall);
- if (idx == 0) {
- comm->state_ = simgrid::kernel::activity::State::DONE;
- } else {
- /* If we reached this point, the wait simcall must have a timeout */
- /* Otherwise it shouldn't be enabled and executed by the MC */
- if (timeout < 0.0)
- THROW_IMPOSSIBLE;
-
- if (comm->src_actor_ == simcall->issuer_)
- comm->state_ = simgrid::kernel::activity::State::SRC_TIMEOUT;
- else
- comm->state_ = simgrid::kernel::activity::State::DST_TIMEOUT;
- }
-
- comm->finish();
- return;
- }
-
- /* If the synchro has already finish perform the error handling, */
- /* otherwise set up a waiting timeout on the right side */
- if (comm->state_ != simgrid::kernel::activity::State::WAITING &&
- comm->state_ != simgrid::kernel::activity::State::RUNNING) {
- comm->finish();
- } else { /* we need a sleep action (even when there is no timeout) to be notified of host failures */
- simgrid::kernel::resource::Action* sleep = simcall->issuer_->get_host()->pimpl_cpu->sleep(timeout);
- sleep->set_activity(comm);
-
- if (simcall->issuer_ == comm->src_actor_)
- comm->src_timeout_ = sleep;
- else
- comm->dst_timeout_ = sleep;
- }
-}
-
-void simcall_HANDLER_comm_test(smx_simcall_t simcall, simgrid::kernel::activity::CommImpl* comm)
-{
- bool res;
-
- if (MC_is_active() || MC_record_replay_is_active()) {
- res = comm->src_actor_ && comm->dst_actor_;
- if (res)
- comm->state_ = simgrid::kernel::activity::State::DONE;
- } else {
- res = comm->state_ != simgrid::kernel::activity::State::WAITING &&
- comm->state_ != simgrid::kernel::activity::State::RUNNING;
- }
-
- simcall_comm_test__set__result(simcall, res);
- if (res) {
- comm->simcalls_.push_back(simcall);
- comm->finish();
- } else {
- simcall->issuer_->simcall_answer();
- }
-}
-
-void simcall_HANDLER_comm_testany(smx_simcall_t simcall, simgrid::kernel::activity::CommImpl* comms[], size_t count)
-{
- // The default result is -1 -- this means, "nothing is ready".
- // It can be changed below, but only if something matches.
- simcall_comm_testany__set__result(simcall, -1);
-
- if (MC_is_active() || MC_record_replay_is_active()) {
- int idx = SIMCALL_GET_MC_VALUE(*simcall);
- if (idx == -1) {
- simcall->issuer_->simcall_answer();
- } else {
- simgrid::kernel::activity::CommImpl* comm = comms[idx];
- simcall_comm_testany__set__result(simcall, idx);
- comm->simcalls_.push_back(simcall);
- comm->state_ = simgrid::kernel::activity::State::DONE;
- comm->finish();
- }
- return;
- }
-
- for (std::size_t i = 0; i != count; ++i) {
- simgrid::kernel::activity::CommImpl* comm = comms[i];
- if (comm->state_ != simgrid::kernel::activity::State::WAITING &&
- comm->state_ != simgrid::kernel::activity::State::RUNNING) {
- simcall_comm_testany__set__result(simcall, i);
- comm->simcalls_.push_back(simcall);
- comm->finish();
- return;
- }
- }
- simcall->issuer_->simcall_answer();
-}
-
-static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
-{
- simgrid::kernel::activity::CommImpl** comms = simcall_comm_waitany__get__comms(simcall);
- size_t count = simcall_comm_waitany__get__count(simcall);
-
- for (size_t i = 0; i < count; i++) {
- // Remove the first occurrence of simcall:
- auto* comm = comms[i];
- auto j = boost::range::find(comm->simcalls_, simcall);
- if (j != comm->simcalls_.end())
- comm->simcalls_.erase(j);
- }
-}
-void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, simgrid::kernel::activity::CommImpl* comms[], size_t count,
- double timeout)
-{
- if (MC_is_active() || MC_record_replay_is_active()) {
- if (timeout > 0.0)
- xbt_die("Timeout not implemented for waitany in the model-checker");
- int idx = SIMCALL_GET_MC_VALUE(*simcall);
- auto* comm = comms[idx];
- comm->simcalls_.push_back(simcall);
- simcall_comm_waitany__set__result(simcall, idx);
- comm->state_ = simgrid::kernel::activity::State::DONE;
- comm->finish();
- return;
- }
-
- if (timeout < 0.0) {
- simcall->timeout_cb_ = NULL;
- } else {
- simcall->timeout_cb_ = simgrid::simix::Timer::set(SIMIX_get_clock() + timeout, [simcall]() {
- simcall->timeout_cb_ = nullptr;
- SIMIX_waitany_remove_simcall_from_actions(simcall);
- simcall_comm_waitany__set__result(simcall, -1);
- simcall->issuer_->simcall_answer();
- });
- }
-
- for (size_t i = 0; i < count; i++) {
- /* associate this simcall to the the synchro */
- auto* comm = comms[i];
- comm->simcalls_.push_back(simcall);
-
- /* see if the synchro is already finished */
- if (comm->state_ != simgrid::kernel::activity::State::WAITING &&
- comm->state_ != simgrid::kernel::activity::State::RUNNING) {
- comm->finish();
- break;
- }
- }
-}
-
-/******************************************************************************/
-/* SIMIX_comm_copy_data callbacks */
-/******************************************************************************/
-static void (*SIMIX_comm_copy_data_callback)(simgrid::kernel::activity::CommImpl*, void*,
- size_t) = &SIMIX_comm_copy_pointer_callback;
-
-void SIMIX_comm_copy_buffer_callback(simgrid::kernel::activity::CommImpl* comm, void* buff, size_t buff_size)
-{
- XBT_DEBUG("Copy the data over");
- memcpy(comm->dst_buff_, buff, buff_size);
- if (comm->detached()) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the
- // original buffer available to the application ASAP
- xbt_free(buff);
- comm->src_buff_ = nullptr;
- }
-}
-
-void SIMIX_comm_set_copy_data_callback(void (*callback)(simgrid::kernel::activity::CommImpl*, void*, size_t))
-{
- SIMIX_comm_copy_data_callback = callback;