1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
8 #include <boost/range/algorithm.hpp>
12 #include <simgrid/s4u/host.hpp>
15 #include "simgrid/s4u/Mailbox.hpp"
16 #include "src/mc/mc_replay.h"
17 #include "src/simix/smx_private.h"
18 #include "src/surf/cpu_interface.hpp"
19 #include "src/surf/surf_interface.hpp"
23 #include "src/kernel/activity/SynchroComm.hpp"
24 #include "src/surf/network_interface.hpp"
26 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
28 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
29 static void SIMIX_comm_copy_data(smx_activity_t comm);
30 static void SIMIX_comm_start(smx_activity_t synchro);
31 static smx_activity_t _find_matching_comm(boost::circular_buffer_space_optimized<smx_activity_t> *deque, e_smx_comm_type_t type,
32 int (*match_fun)(void *, void *,smx_activity_t), void *user_data, smx_activity_t my_synchro, bool remove_matching);
35 * \brief Checks if there is a communication synchro queued in a deque matching our needs
36 * \param type The type of communication we are looking for (comm_send, comm_recv)
37 * \return The communication synchro if found, nullptr otherwise
39 static smx_activity_t _find_matching_comm(boost::circular_buffer_space_optimized<smx_activity_t> *deque, e_smx_comm_type_t type,
40 int (*match_fun)(void *, void *,smx_activity_t), void *this_user_data, smx_activity_t my_synchro, bool remove_matching)
42 void* other_user_data = nullptr;
44 for(auto it = deque->begin(); it != deque->end(); it++){
45 smx_activity_t synchro = *it;
46 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
48 if (comm->type == SIMIX_COMM_SEND) {
49 other_user_data = comm->src_data;
50 } else if (comm->type == SIMIX_COMM_RECEIVE) {
51 other_user_data = comm->dst_data;
53 if (comm->type == type &&
54 (! match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
55 (!comm->match_fun || comm->match_fun(other_user_data, this_user_data, my_synchro))) {
56 XBT_DEBUG("Found a matching communication synchro %p", comm);
61 comm->mbox_cpy = comm->mbox;
66 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
67 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
68 comm, (int)comm->type, (int)type);
70 XBT_DEBUG("No matching communication synchro found");
74 /******************************************************************************/
75 /* Communication synchros */
76 /******************************************************************************/
77 XBT_PRIVATE void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_actor_t src, smx_mailbox_t mbox,
78 double task_size, double rate,
79 void *src_buff, size_t src_buff_size,
80 int (*match_fun)(void *, void *,smx_activity_t),
81 void (*copy_data_fun)(smx_activity_t, void*, size_t),
82 void *data, double timeout){
83 smx_activity_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
84 src_buff, src_buff_size, match_fun, nullptr, copy_data_fun,
86 SIMCALL_SET_MC_VALUE(simcall, 0);
87 simcall_HANDLER_comm_wait(simcall, comm, timeout);
89 XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_actor_t src_proc, smx_mailbox_t mbox,
90 double task_size, double rate,
91 void *src_buff, size_t src_buff_size,
92 int (*match_fun)(void *, void *,smx_activity_t),
93 void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
94 void (*copy_data_fun)(smx_activity_t, void*, size_t),// used to copy data if not default one
95 void *data, int detached)
97 XBT_DEBUG("send from %p", mbox);
99 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
100 simgrid::kernel::activity::Comm* this_synchro = new simgrid::kernel::activity::Comm(SIMIX_COMM_SEND);
102 /* Look for communication synchro matching our needs. We also provide a description of
103 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
105 * If it is not found then push our communication into the rendez-vous point */
106 smx_activity_t other_synchro =
107 _find_matching_comm(&mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro, /*remove_matching*/true);
108 simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
111 if (!other_synchro) {
112 other_synchro = this_synchro;
113 other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
115 if (mbox->permanent_receiver!=nullptr){
116 //this mailbox is for small messages, which have to be sent right now
117 other_synchro->state = SIMIX_READY;
118 other_comm->dst_proc=mbox->permanent_receiver.get();
120 mbox->done_comm_queue.push_back(other_synchro);
121 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_comm));
124 mbox->push(this_synchro);
127 XBT_DEBUG("Receive already pushed");
128 this_synchro->unref();
130 other_comm->state = SIMIX_READY;
131 other_comm->type = SIMIX_COMM_READY;
134 xbt_fifo_push(src_proc->comms, other_synchro);
138 other_comm->detached = true;
139 other_comm->clean_fun = clean_fun;
141 other_comm->clean_fun = nullptr;
144 /* Setup the communication synchro */
145 other_comm->src_proc = src_proc;
146 other_comm->task_size = task_size;
147 other_comm->rate = rate;
148 other_comm->src_buff = src_buff;
149 other_comm->src_buff_size = src_buff_size;
150 other_comm->src_data = data;
152 other_comm->match_fun = match_fun;
153 other_comm->copy_data_fun = copy_data_fun;
156 if (MC_is_active() || MC_record_replay_is_active()) {
157 other_comm->state = SIMIX_RUNNING;
158 return (detached ? nullptr : other_comm);
161 SIMIX_comm_start(other_comm);
162 return (detached ? nullptr : other_comm);
165 XBT_PRIVATE void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_actor_t receiver, smx_mailbox_t mbox,
166 void *dst_buff, size_t *dst_buff_size,
167 int (*match_fun)(void *, void *, smx_activity_t),
168 void (*copy_data_fun)(smx_activity_t, void*, size_t),
169 void *data, double timeout, double rate)
171 smx_activity_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
172 SIMCALL_SET_MC_VALUE(simcall, 0);
173 simcall_HANDLER_comm_wait(simcall, comm, timeout);
176 XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_actor_t receiver, smx_mailbox_t mbox,
177 void *dst_buff, size_t *dst_buff_size,
178 int (*match_fun)(void *, void *, smx_activity_t),
179 void (*copy_data_fun)(smx_activity_t, void*, size_t),
180 void *data, double rate)
182 return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
185 smx_activity_t SIMIX_comm_irecv(smx_actor_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
186 int (*match_fun)(void *, void *, smx_activity_t),
187 void (*copy_data_fun)(smx_activity_t, void*, size_t), // used to copy data if not default one
188 void *data, double rate)
190 XBT_DEBUG("recv from %p %p", mbox, &mbox->comm_queue);
191 simgrid::kernel::activity::Comm* this_synchro = new simgrid::kernel::activity::Comm(SIMIX_COMM_RECEIVE);
193 smx_activity_t other_synchro;
194 //communication already done, get it inside the fifo of completed comms
195 if (mbox->permanent_receiver != nullptr && ! mbox->done_comm_queue.empty()) {
197 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
198 //find a match in the already received fifo
199 other_synchro = _find_matching_comm(&mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
200 //if not found, assume the receiver came first, register it to the mailbox in the classical way
201 if (!other_synchro) {
202 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
203 other_synchro = this_synchro;
204 mbox->push(this_synchro);
206 simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
208 if(other_comm->surf_comm && other_comm->remains() < 1e-12) {
209 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",other_comm);
210 other_comm->state = SIMIX_DONE;
211 other_comm->type = SIMIX_COMM_DONE;
212 other_comm->mbox = nullptr;
215 static_cast<simgrid::kernel::activity::Comm*>(this_synchro)->unref();
218 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
220 /* Look for communication synchro matching our needs. We also provide a description of
221 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
223 * If it is not found then push our communication into the rendez-vous point */
224 other_synchro = _find_matching_comm(&mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
226 if (!other_synchro) {
227 XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue.size());
228 other_synchro = this_synchro;
229 mbox->push(this_synchro);
231 this_synchro->unref();
232 simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
234 other_comm->state = SIMIX_READY;
235 other_comm->type = SIMIX_COMM_READY;
237 xbt_fifo_push(dst_proc->comms, other_synchro);
240 /* Setup communication synchro */
241 simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
242 other_comm->dst_proc = dst_proc;
243 other_comm->dst_buff = dst_buff;
244 other_comm->dst_buff_size = dst_buff_size;
245 other_comm->dst_data = data;
247 if (rate > -1.0 && (other_comm->rate < 0.0 || rate < other_comm->rate))
248 other_comm->rate = rate;
250 other_comm->match_fun = match_fun;
251 other_comm->copy_data_fun = copy_data_fun;
253 if (MC_is_active() || MC_record_replay_is_active()) {
254 other_synchro->state = SIMIX_RUNNING;
255 return other_synchro;
258 SIMIX_comm_start(other_synchro);
259 return other_synchro;
262 smx_activity_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
263 int type, int src, int tag,
264 int (*match_fun)(void *, void *, smx_activity_t),
266 return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
269 smx_activity_t SIMIX_comm_iprobe(smx_actor_t dst_proc, smx_mailbox_t mbox, int type, int src,
270 int tag, int (*match_fun)(void *, void *, smx_activity_t), void *data)
272 XBT_DEBUG("iprobe from %p %p", mbox, &mbox->comm_queue);
273 simgrid::kernel::activity::Comm* this_comm;
276 this_comm = new simgrid::kernel::activity::Comm(SIMIX_COMM_SEND);
277 smx_type = SIMIX_COMM_RECEIVE;
279 this_comm = new simgrid::kernel::activity::Comm(SIMIX_COMM_RECEIVE);
280 smx_type = SIMIX_COMM_SEND;
282 smx_activity_t other_synchro=nullptr;
283 if (mbox->permanent_receiver != nullptr && !mbox->done_comm_queue.empty()) {
284 XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
285 other_synchro = _find_matching_comm(&mbox->done_comm_queue,
286 (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
289 XBT_DEBUG("check if we have more luck in the normal mailbox");
290 other_synchro = _find_matching_comm(&mbox->comm_queue,
291 (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
295 other_synchro->unref();
298 return other_synchro;
301 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_activity_t synchro, double timeout)
303 /* Associate this simcall to the wait synchro */
304 XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
306 synchro->simcalls.push_back(simcall);
307 simcall->issuer->waiting_synchro = synchro;
309 if (MC_is_active() || MC_record_replay_is_active()) {
310 int idx = SIMCALL_GET_MC_VALUE(simcall);
312 synchro->state = SIMIX_DONE;
314 /* If we reached this point, the wait simcall must have a timeout */
315 /* Otherwise it shouldn't be enabled and executed by the MC */
319 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
320 if (comm->src_proc == simcall->issuer)
321 comm->state = SIMIX_SRC_TIMEOUT;
323 comm->state = SIMIX_DST_TIMEOUT;
326 SIMIX_comm_finish(synchro);
330 /* If the synchro has already finish perform the error handling, */
331 /* otherwise set up a waiting timeout on the right side */
332 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
333 SIMIX_comm_finish(synchro);
334 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
335 surf_action_t sleep = simcall->issuer->host->pimpl_cpu->sleep(timeout);
336 sleep->setData(synchro);
338 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
339 if (simcall->issuer == comm->src_proc)
340 comm->src_timeout = sleep;
342 comm->dst_timeout = sleep;
346 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_activity_t synchro)
348 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
350 if (MC_is_active() || MC_record_replay_is_active()){
351 simcall_comm_test__set__result(simcall, comm->src_proc && comm->dst_proc);
352 if (simcall_comm_test__get__result(simcall)){
353 synchro->state = SIMIX_DONE;
354 synchro->simcalls.push_back(simcall);
355 SIMIX_comm_finish(synchro);
357 SIMIX_simcall_answer(simcall);
362 simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
363 if (simcall_comm_test__get__result(simcall)) {
364 synchro->simcalls.push_back(simcall);
365 SIMIX_comm_finish(synchro);
367 SIMIX_simcall_answer(simcall);
371 void simcall_HANDLER_comm_testany(
372 smx_simcall_t simcall, simgrid::kernel::activity::ActivityImpl* comms[], size_t count)
374 // The default result is -1 -- this means, "nothing is ready".
375 // It can be changed below, but only if something matches.
376 simcall_comm_testany__set__result(simcall, -1);
378 if (MC_is_active() || MC_record_replay_is_active()){
379 int idx = SIMCALL_GET_MC_VALUE(simcall);
381 SIMIX_simcall_answer(simcall);
383 simgrid::kernel::activity::ActivityImpl* synchro = comms[idx];
384 simcall_comm_testany__set__result(simcall, idx);
385 synchro->simcalls.push_back(simcall);
386 synchro->state = SIMIX_DONE;
387 SIMIX_comm_finish(synchro);
392 for (std::size_t i = 0; i != count; ++i) {
393 simgrid::kernel::activity::ActivityImpl* synchro = comms[i];
394 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
395 simcall_comm_testany__set__result(simcall, i);
396 synchro->simcalls.push_back(simcall);
397 SIMIX_comm_finish(synchro);
401 SIMIX_simcall_answer(simcall);
404 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros, double timeout)
406 smx_activity_t synchro;
407 unsigned int cursor = 0;
409 if (MC_is_active() || MC_record_replay_is_active()){
411 xbt_die("Timeout not implemented for waitany in the model-checker");
412 int idx = SIMCALL_GET_MC_VALUE(simcall);
413 synchro = xbt_dynar_get_as(synchros, idx, smx_activity_t);
414 synchro->simcalls.push_back(simcall);
415 simcall_comm_waitany__set__result(simcall, idx);
416 synchro->state = SIMIX_DONE;
417 SIMIX_comm_finish(synchro);
422 simcall->timer = NULL;
424 simcall->timer = SIMIX_timer_set(SIMIX_get_clock() + timeout, [simcall]() {
425 SIMIX_waitany_remove_simcall_from_actions(simcall);
426 simcall_comm_waitany__set__result(simcall, -1);
427 SIMIX_simcall_answer(simcall);
431 xbt_dynar_foreach(synchros, cursor, synchro){
432 /* associate this simcall to the the synchro */
433 synchro->simcalls.push_back(simcall);
435 /* see if the synchro is already finished */
436 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
437 SIMIX_comm_finish(synchro);
443 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
445 smx_activity_t synchro;
446 unsigned int cursor = 0;
447 xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
449 xbt_dynar_foreach(synchros, cursor, synchro) {
450 // Remove the first occurence of simcall:
451 auto i = boost::range::find(synchro->simcalls, simcall);
452 if (i != synchro->simcalls.end())
453 synchro->simcalls.erase(i);
458 * \brief Starts the simulation of a communication synchro.
459 * \param synchro the communication synchro
461 static inline void SIMIX_comm_start(smx_activity_t synchro)
463 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
465 /* If both the sender and the receiver are already there, start the communication */
466 if (synchro->state == SIMIX_READY) {
468 simgrid::s4u::Host* sender = comm->src_proc->host;
469 simgrid::s4u::Host* receiver = comm->dst_proc->host;
471 XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro, sender->cname(), receiver->cname());
473 comm->surf_comm = surf_network_model->communicate(sender, receiver, comm->task_size, comm->rate);
474 comm->surf_comm->setData(synchro);
475 comm->state = SIMIX_RUNNING;
477 /* If a link is failed, detect it immediately */
478 if (comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
479 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", sender->cname(),
481 comm->state = SIMIX_LINK_FAILURE;
485 /* If any of the process is suspend, create the synchro but stop its execution,
486 it will be restarted when the sender process resume */
487 if (SIMIX_process_is_suspended(comm->src_proc) || SIMIX_process_is_suspended(comm->dst_proc)) {
488 if (SIMIX_process_is_suspended(comm->src_proc))
489 XBT_DEBUG("The communication is suspended on startup because src (%s@%s) was suspended since it initiated the "
491 comm->src_proc->cname(), comm->src_proc->host->cname());
493 XBT_DEBUG("The communication is suspended on startup because dst (%s@%s) was suspended since it initiated the "
495 comm->dst_proc->cname(), comm->dst_proc->host->cname());
497 comm->surf_comm->suspend();
503 * \brief Answers the SIMIX simcalls associated to a communication synchro.
504 * \param synchro a finished communication synchro
506 void SIMIX_comm_finish(smx_activity_t synchro)
508 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
509 unsigned int destroy_count = 0;
511 while (!synchro->simcalls.empty()) {
512 smx_simcall_t simcall = synchro->simcalls.front();
513 synchro->simcalls.pop_front();
515 /* If a waitany simcall is waiting for this synchro to finish, then remove
516 it from the other synchros in the waitany list. Afterwards, get the
517 position of the actual synchro in the waitany dynar and
518 return it as the result of the simcall */
520 if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
521 continue; // if process handling comm is killed
522 if (simcall->call == SIMCALL_COMM_WAITANY) {
523 SIMIX_waitany_remove_simcall_from_actions(simcall);
524 if (simcall->timer) {
525 SIMIX_timer_remove(simcall->timer);
526 simcall->timer = nullptr;
528 if (!MC_is_active() && !MC_record_replay_is_active())
529 simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
532 /* If the synchro is still in a rendez-vous point then remove from it */
534 comm->mbox->remove(synchro);
536 XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
538 /* Check out for errors */
540 if (simcall->issuer->host->isOff()) {
541 simcall->issuer->context->iwannadie = 1;
542 SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
544 switch (synchro->state) {
547 XBT_DEBUG("Communication %p complete!", synchro);
548 SIMIX_comm_copy_data(synchro);
551 case SIMIX_SRC_TIMEOUT:
552 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender");
555 case SIMIX_DST_TIMEOUT:
556 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver");
559 case SIMIX_SRC_HOST_FAILURE:
560 if (simcall->issuer == comm->src_proc)
561 simcall->issuer->context->iwannadie = 1;
562 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
564 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
567 case SIMIX_DST_HOST_FAILURE:
568 if (simcall->issuer == comm->dst_proc)
569 simcall->issuer->context->iwannadie = 1;
570 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
572 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
575 case SIMIX_LINK_FAILURE:
578 "Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
579 synchro, comm->src_proc ? comm->src_proc->host->cname() : nullptr,
580 comm->dst_proc ? comm->dst_proc->host->cname() : nullptr, simcall->issuer->cname(), simcall->issuer,
582 if (comm->src_proc == simcall->issuer) {
583 XBT_DEBUG("I'm source");
584 } else if (comm->dst_proc == simcall->issuer) {
585 XBT_DEBUG("I'm dest");
587 XBT_DEBUG("I'm neither source nor dest");
589 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
593 if (simcall->issuer == comm->dst_proc)
594 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender");
596 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver");
600 xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
604 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
605 if (simcall->issuer->exception) {
606 // In order to modify the exception we have to rethrow it:
608 std::rethrow_exception(simcall->issuer->exception);
611 if (simcall->call == SIMCALL_COMM_WAITANY) {
612 e.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
614 else if (simcall->call == SIMCALL_COMM_TESTANY) {
616 auto comms = simcall_comm_testany__get__comms(simcall);
617 auto count = simcall_comm_testany__get__count(simcall);
618 auto element = std::find(comms, comms + count, synchro);
619 if (element == comms + count)
622 e.value = element - comms;
624 simcall->issuer->exception = std::make_exception_ptr(e);
631 if (simcall->issuer->host->isOff()) {
632 simcall->issuer->context->iwannadie = 1;
635 simcall->issuer->waiting_synchro = nullptr;
636 xbt_fifo_remove(simcall->issuer->comms, synchro);
638 if(simcall->issuer == comm->src_proc){
640 xbt_fifo_remove(comm->dst_proc->comms, synchro);
642 if(simcall->issuer == comm->dst_proc){
644 xbt_fifo_remove(comm->src_proc->comms, synchro);
645 //in case of a detached comm we have an extra ref to remove, as the sender won't do it
649 SIMIX_simcall_answer(simcall);
653 while (destroy_count-- > 0)
654 static_cast<simgrid::kernel::activity::Comm*>(synchro)->unref();
657 /******************************************************************************/
658 /* SIMIX_comm_copy_data callbacks */
659 /******************************************************************************/
660 static void (*SIMIX_comm_copy_data_callback) (smx_activity_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback;
662 void SIMIX_comm_set_copy_data_callback(void (*callback) (smx_activity_t, void*, size_t))
664 SIMIX_comm_copy_data_callback = callback;
667 void SIMIX_comm_copy_pointer_callback(smx_activity_t synchro, void* buff, size_t buff_size)
669 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
671 xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
672 *(void **) (comm->dst_buff) = buff;
675 void SIMIX_comm_copy_buffer_callback(smx_activity_t synchro, void* buff, size_t buff_size)
677 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
679 XBT_DEBUG("Copy the data over");
680 memcpy(comm->dst_buff, buff, buff_size);
681 if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
683 comm->src_buff = nullptr;
689 * @brief Copy the communication data from the sender's buffer to the receiver's one
690 * @param synchro The communication
692 void SIMIX_comm_copy_data(smx_activity_t synchro)
694 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
696 size_t buff_size = comm->src_buff_size;
697 /* If there is no data to copy then return */
698 if (!comm->src_buff || !comm->dst_buff || comm->copied)
701 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)", comm,
702 comm->src_proc ? comm->src_proc->host->cname() : "a finished process", comm->src_buff,
703 comm->dst_proc ? comm->dst_proc->host->cname() : "a finished process", comm->dst_buff, buff_size);
705 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
706 if (comm->dst_buff_size)
707 buff_size = MIN(buff_size, *(comm->dst_buff_size));
709 /* Update the receiver's buffer size to the copied amount */
710 if (comm->dst_buff_size)
711 *comm->dst_buff_size = buff_size;
714 if(comm->copy_data_fun)
715 comm->copy_data_fun (comm, comm->src_buff, buff_size);
717 SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size);
721 /* Set the copied flag so we copy data only once */
722 /* (this function might be called from both communication ends) */