1 /* Copyright (c) 2009-2016. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
8 #include <boost/range/algorithm.hpp>
12 #include <simgrid/s4u/host.hpp>
14 #include "src/surf/surf_interface.hpp"
15 #include "src/simix/smx_private.h"
18 #include "src/mc/mc_replay.h"
20 #include "simgrid/s4u/mailbox.hpp"
22 #include "src/kernel/activity/SynchroComm.hpp"
/* Log category used by the XBT_DEBUG calls of this file. */
24 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
26 static void SIMIX_mbox_free(void *data);
/* Global registry of mailboxes, keyed by mailbox name: filled by SIMIX_mbox_create,
 * queried by SIMIX_mbox_get_by_name and released by SIMIX_mailbox_exit.
 * SIMIX_mbox_free is handed to the dictionary as its element-free callback. */
27 static xbt_dict_t mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
/* Forward declarations of file-local helpers that are defined further down. */
29 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
30 static void SIMIX_comm_copy_data(smx_synchro_t comm);
31 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm);
32 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
33 int (*match_fun)(void *, void *,smx_synchro_t), void *user_data, smx_synchro_t my_synchro, bool remove_matching);
34 static void SIMIX_comm_start(smx_synchro_t synchro);
/** \brief Releases the global `mailboxes` dictionary. */
36 void SIMIX_mailbox_exit(void)
38 xbt_dict_free(&mailboxes);
41 /******************************************************************************/
42 /* Rendez-Vous Points */
43 /******************************************************************************/
/**
 * \brief Returns the mailbox registered under \a name, creating and registering one if needed.
 * \param name The mailbox name; must not be null (checked with xbt_assert).
 *
 * The registry is consulted first because two processes may issue the same
 * mbox_create simcall at the same time. A newly created mailbox is stored in the
 * registry under its own `name` field.
 */
45 smx_mailbox_t SIMIX_mbox_create(const char *name)
47 xbt_assert(name, "Mailboxes must have a name");
48 /* two processes may have pushed the same mbox_create simcall at the same time */
49 smx_mailbox_t mbox = (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
51 mbox = new simgrid::simix::Mailbox(name);
52 XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
53 xbt_dict_set(mailboxes, mbox->name, mbox, nullptr);
/**
 * \brief Element-free callback registered with the `mailboxes` dictionary.
 * \param data The mailbox to release, received as an untyped pointer.
 */
58 void SIMIX_mbox_free(void *data)
60 XBT_DEBUG("mbox free %p", data);
61 smx_mailbox_t mbox = (smx_mailbox_t) data;
/**
 * \brief Looks up a mailbox by name in the global registry.
 * \param name The mailbox name.
 * \return The mailbox, or nullptr if no mailbox is registered under that name.
 */
65 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
67 return (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
71 * \brief set the receiver of the rendez vous point to allow eager sends
72 * \param mbox The rendez-vous point
73 * \param process The receiving process
75 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
// Once a permanent receiver is set, simcall_HANDLER_comm_isend completes unmatched
// sends eagerly: it marks them SIMIX_READY with this process as dst_proc and queues
// them in done_comm_queue instead of the regular comm_queue.
77 mbox->permanent_receiver = process;
81 * \brief Pushes a communication synchro into a rendez-vous point
82 * \param mbox The mailbox
83 * \param synchro The communication synchro
85 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t synchro)
// Appends at the back of comm_queue. _find_matching_comm scans from the front,
// so candidates are considered in arrival order.
87 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
88 mbox->comm_queue.push_back(comm);
93 * \brief Removes a communication synchro from a rendez-vous point
94 * \param mbox The rendez-vous point
95 * \param synchro The communication synchro
97 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_synchro_t synchro)
// Clears the comm's back-pointer to its mailbox, then erases the comm from
// comm_queue. A comm that is not in the queue is a fatal error (xbt_die).
99 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
101 comm->mbox = nullptr;
102 for (auto it = mbox->comm_queue.begin(); it != mbox->comm_queue.end(); it++)
104 mbox->comm_queue. erase(it);
107 xbt_die("Cannot remove the comm %p that is not part of the mailbox %s",comm, mbox->name);
111 * \brief Checks if there is a communication synchro queued in a deque matching our needs
112 * \param type The type of communication we are looking for (comm_send, comm_recv)
113 * \return The communication synchro if found, nullptr otherwise
115 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
116 int (*match_fun)(void *, void *,smx_synchro_t), void *this_user_data, smx_synchro_t my_synchro, bool remove_matching)
// Scans `deque` from front to back. A candidate matches when its type equals `type`
// and both filters accept it:
//  - our `match_fun` (if non-null) is called as match_fun(this_user_data, other_user_data, candidate);
//  - the candidate's own comm->match_fun (if non-null) is called as
//    comm->match_fun(other_user_data, this_user_data, my_synchro).
// other_user_data is the candidate's src_data for a SEND and its dst_data for a RECEIVE.
// On a match, the code records comm->mbox into mbox_cpy and resets comm->mbox to nullptr.
// NOTE(review): `remove_matching` presumably decides whether the match is erased from
// `deque`, and some of the steps above may be conditional; confirm against the full body.
118 void* other_user_data = nullptr;
120 for(auto it = deque->begin(); it != deque->end(); it++){
121 smx_synchro_t synchro = *it;
122 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
124 if (comm->type == SIMIX_COMM_SEND) {
125 other_user_data = comm->src_data;
126 } else if (comm->type == SIMIX_COMM_RECEIVE) {
127 other_user_data = comm->dst_data;
129 if (comm->type == type &&
130 (! match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
131 (!comm->match_fun || comm->match_fun(other_user_data, this_user_data, my_synchro))) {
132 XBT_DEBUG("Found a matching communication synchro %p", comm);
137 comm->mbox_cpy = comm->mbox;
139 comm->mbox = nullptr;
142 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
143 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
144 comm, (int)comm->type, (int)type);
146 XBT_DEBUG("No matching communication synchro found");
150 /******************************************************************************/
151 /* Communication synchros */
152 /******************************************************************************/
/**
 * \brief Simcall handler for a blocking send.
 *
 * Starts the send through simcall_HANDLER_comm_isend, sets the simcall's
 * model-checker value to 0 (it is read back by simcall_HANDLER_comm_wait), then
 * waits on the resulting comm with the given \a timeout.
 */
153 XBT_PRIVATE void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_mailbox_t mbox,
154 double task_size, double rate,
155 void *src_buff, size_t src_buff_size,
156 int (*match_fun)(void *, void *,smx_synchro_t),
157 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
158 void *data, double timeout){
159 smx_synchro_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
160 src_buff, src_buff_size, match_fun, nullptr, copy_data_fun,
162 SIMCALL_SET_MC_VALUE(simcall, 0);
163 simcall_HANDLER_comm_wait(simcall, comm, timeout);
/**
 * \brief Simcall handler for a non-blocking send.
 *
 * Looks in \a mbox's comm_queue for a RECEIVE comm that both sides' match functions
 * accept. If none is found, the freshly created SEND comm is used instead:
 *  - with a permanent receiver, it is marked SIMIX_READY and queued in done_comm_queue;
 *  - otherwise, it is pushed into the mailbox.
 * If a receive was found, the fresh comm is unref'd and the matched comm becomes
 * SIMIX_READY / SIMIX_COMM_READY.
 *
 * The sender-side fields are then filled in. The comm is either started or, when the
 * model checker or record/replay is active, only marked SIMIX_RUNNING.
 *
 * \return The comm, or nullptr when \a detached is set.
 */
165 XBT_PRIVATE smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_mailbox_t mbox,
166 double task_size, double rate,
167 void *src_buff, size_t src_buff_size,
168 int (*match_fun)(void *, void *,smx_synchro_t),
169 void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
170 void (*copy_data_fun)(smx_synchro_t, void*, size_t),// used to copy data if not default one
171 void *data, int detached)
173 XBT_DEBUG("send from %p", mbox);
175 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
176 simgrid::kernel::activity::Comm* this_synchro = new simgrid::kernel::activity::Comm(SIMIX_COMM_SEND);
178 /* Look for communication synchro matching our needs. We also provide a description of
179 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
181 * If it is not found then push our communication into the rendez-vous point */
182 smx_synchro_t other_synchro =
183 _find_matching_comm(&mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro, /*remove_matching*/true);
184 simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
187 if (!other_synchro) {
188 other_synchro = this_synchro;
189 other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
191 if (mbox->permanent_receiver!=nullptr){
192 //this mailbox is for small messages, which have to be sent right now
193 other_synchro->state = SIMIX_READY;
194 other_comm->dst_proc=mbox->permanent_receiver.get();
196 mbox->done_comm_queue.push_back(other_synchro);
// Log the comm itself, not the address of the local pointer variable.
197 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, other_comm);
200 SIMIX_mbox_push(mbox, this_synchro);
203 XBT_DEBUG("Receive already pushed");
204 this_synchro->unref();
206 other_comm->state = SIMIX_READY;
207 other_comm->type = SIMIX_COMM_READY;
210 xbt_fifo_push(src_proc->comms, other_synchro);
214 other_comm->detached = true;
215 other_comm->clean_fun = clean_fun;
217 other_comm->clean_fun = nullptr;
220 /* Setup the communication synchro */
221 other_comm->src_proc = src_proc;
222 other_comm->task_size = task_size;
223 other_comm->rate = rate;
224 other_comm->src_buff = src_buff;
225 other_comm->src_buff_size = src_buff_size;
226 other_comm->src_data = data;
228 other_comm->match_fun = match_fun;
229 other_comm->copy_data_fun = copy_data_fun;
232 if (MC_is_active() || MC_record_replay_is_active()) {
233 other_comm->state = SIMIX_RUNNING;
234 return (detached ? nullptr : other_comm);
237 SIMIX_comm_start(other_comm);
238 return (detached ? nullptr : other_comm);
/**
 * \brief Simcall handler for a blocking receive.
 *
 * Posts the receive for \a receiver through SIMIX_comm_irecv, sets the simcall's
 * model-checker value to 0 (read back by simcall_HANDLER_comm_wait), then waits on
 * the comm with the given \a timeout.
 */
241 XBT_PRIVATE void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
242 void *dst_buff, size_t *dst_buff_size,
243 int (*match_fun)(void *, void *, smx_synchro_t),
244 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
245 void *data, double timeout, double rate)
247 smx_synchro_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
248 SIMCALL_SET_MC_VALUE(simcall, 0);
249 simcall_HANDLER_comm_wait(simcall, comm, timeout);
/**
 * \brief Simcall handler for a non-blocking receive; forwards to SIMIX_comm_irecv.
 *
 * \a simcall itself is not used here.
 * \return The comm returned by SIMIX_comm_irecv.
 */
252 XBT_PRIVATE smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
253 void *dst_buff, size_t *dst_buff_size,
254 int (*match_fun)(void *, void *, smx_synchro_t),
255 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
256 void *data, double rate)
258 return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
/**
 * \brief Posts a receive on \a mbox on behalf of \a dst_proc.
 *
 * When the mailbox has a permanent receiver and a non-empty done_comm_queue, a
 * matching SEND comm is first looked up there; senders put eagerly-completed comms
 * in that queue. If none matches, the new RECEIVE comm is pushed into the mailbox.
 * A matched comm whose network action has no remaining work is marked done and
 * detached from the mailbox.
 *
 * Otherwise a matching SEND comm is looked up in comm_queue. If none is found, the
 * new RECEIVE comm is pushed; if one is found, it becomes SIMIX_READY / SIMIX_COMM_READY.
 *
 * The receiver-side fields are then filled in. The comm is either started or, under
 * the model checker / record-replay, only marked SIMIX_RUNNING.
 *
 * \return The comm the receive is attached to.
 */
261 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
262 int (*match_fun)(void *, void *, smx_synchro_t),
263 void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
264 void *data, double rate)
266 XBT_DEBUG("recv from %p %p", mbox, &mbox->comm_queue);
267 simgrid::kernel::activity::Comm* this_synchro = new simgrid::kernel::activity::Comm(SIMIX_COMM_RECEIVE);
269 smx_synchro_t other_synchro;
270 //communication already done, get it inside the fifo of completed comms
271 if (mbox->permanent_receiver != nullptr && ! mbox->done_comm_queue.empty()) {
273 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
274 //find a match in the already received fifo
275 other_synchro = _find_matching_comm(&mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
276 //if not found, assume the receiver came first, register it to the mailbox in the classical way
277 if (!other_synchro) {
278 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
279 other_synchro = this_synchro;
280 SIMIX_mbox_push(mbox, this_synchro);
282 simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
284 if(other_comm->surf_comm && other_comm->remains()==0.0) {
285 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",other_comm);
286 other_comm->state = SIMIX_DONE;
287 other_comm->type = SIMIX_COMM_DONE;
288 other_comm->mbox = nullptr;
291 static_cast<simgrid::kernel::activity::Comm*>(this_synchro)->unref();
294 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
296 /* Look for communication synchro matching our needs. We also provide a description of
297 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
299 * If it is not found then push our communication into the rendez-vous point */
300 other_synchro = _find_matching_comm(&mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
302 if (!other_synchro) {
303 XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue.size());
304 other_synchro = this_synchro;
305 SIMIX_mbox_push(mbox, this_synchro);
307 this_synchro->unref();
308 simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
310 other_comm->state = SIMIX_READY;
311 other_comm->type = SIMIX_COMM_READY;
313 xbt_fifo_push(dst_proc->comms, other_synchro);
316 /* Setup communication synchro */
317 simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
318 other_comm->dst_proc = dst_proc;
319 other_comm->dst_buff = dst_buff;
320 other_comm->dst_buff_size = dst_buff_size;
321 other_comm->dst_data = data;
// Keep the smaller of the sender's and receiver's rate limits; -1.0 acts as "unset".
323 if (rate != -1.0 && (other_comm->rate == -1.0 || rate < other_comm->rate))
324 other_comm->rate = rate;
326 other_comm->match_fun = match_fun;
327 other_comm->copy_data_fun = copy_data_fun;
329 if (MC_is_active() || MC_record_replay_is_active()) {
330 other_synchro->state = SIMIX_RUNNING;
331 return other_synchro;
334 SIMIX_comm_start(other_synchro);
335 return other_synchro;
/**
 * \brief Simcall handler for iprobe.
 *
 * Forwards to SIMIX_comm_iprobe, using the simcall's issuer as the probing process.
 */
338 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
339 int type, int src, int tag,
340 int (*match_fun)(void *, void *, smx_synchro_t),
342 return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
/**
 * \brief Looks for a comm in \a mbox that matches a probe, without removing it.
 *
 * Depending on \a type, a SEND or RECEIVE comm is created to describe the prober,
 * and comms of the opposite direction are searched; both searches pass
 * remove_matching = false. When the mailbox has a permanent receiver and a
 * non-empty done_comm_queue, that queue is searched first. The regular comm_queue
 * is searched next (NOTE(review): presumably only when nothing matched yet).
 *
 * \return The matching comm, or nullptr.
 */
345 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int type, int src,
346 int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
348 XBT_DEBUG("iprobe from %p %p", mbox, &mbox->comm_queue);
349 simgrid::kernel::activity::Comm* this_comm;
352 this_comm = new simgrid::kernel::activity::Comm(SIMIX_COMM_SEND);
353 smx_type = SIMIX_COMM_RECEIVE;
355 this_comm = new simgrid::kernel::activity::Comm(SIMIX_COMM_RECEIVE);
356 smx_type = SIMIX_COMM_SEND;
358 smx_synchro_t other_synchro=nullptr;
359 if (mbox->permanent_receiver != nullptr && !mbox->done_comm_queue.empty()) {
360 XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
361 other_synchro = _find_matching_comm(&mbox->done_comm_queue,
362 (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
365 XBT_DEBUG("check if we have more luck in the normal mailbox");
366 other_synchro = _find_matching_comm(&mbox->comm_queue,
367 (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
371 other_synchro->unref();
374 return other_synchro;
/**
 * \brief Simcall handler that makes the issuer wait on a comm.
 *
 * Registers \a simcall on the comm and records the comm as the issuer's waiting synchro.
 *
 * Under the model checker / record-replay, the simcall's MC value chooses between
 * two outcomes, and SIMIX_comm_finish is then called:
 *  - completing the comm (state SIMIX_DONE);
 *  - timing out (SIMIX_SRC_TIMEOUT if the issuer is the sender, SIMIX_DST_TIMEOUT otherwise).
 *
 * Otherwise, a comm that is no longer WAITING/RUNNING is finished immediately. For a
 * pending comm, a surf sleep action of \a timeout is created on the issuer's host and
 * attached to the comm. It is stored as src_timeout or dst_timeout depending on which
 * side the issuer is.
 */
377 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
379 /* Associate this simcall to the wait synchro */
380 XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
382 synchro->simcalls.push_back(simcall);
383 simcall->issuer->waiting_synchro = synchro;
385 if (MC_is_active() || MC_record_replay_is_active()) {
386 int idx = SIMCALL_GET_MC_VALUE(simcall);
388 synchro->state = SIMIX_DONE;
390 /* If we reached this point, the wait simcall must have a timeout */
391 /* Otherwise it shouldn't be enabled and executed by the MC */
395 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
396 if (comm->src_proc == simcall->issuer)
397 comm->state = SIMIX_SRC_TIMEOUT;
399 comm->state = SIMIX_DST_TIMEOUT;
402 SIMIX_comm_finish(synchro);
406 /* If the synchro has already finished, perform the error handling; */
407 /* otherwise set up a waiting timeout on the right side */
408 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
409 SIMIX_comm_finish(synchro);
410 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
411 surf_action_t sleep = surf_host_sleep(simcall->issuer->host, timeout);
412 sleep->setData(synchro);
414 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
415 if (simcall->issuer == comm->src_proc)
416 comm->src_timeout = sleep;
418 comm->dst_timeout = sleep;
/**
 * \brief Simcall handler for a non-blocking completion test of a comm.
 *
 * The result is set as follows:
 *  - under the model checker / record-replay: true iff both src_proc and dst_proc are
 *    set (the comm is then forced to SIMIX_DONE);
 *  - otherwise: true iff the comm is neither WAITING nor RUNNING.
 *
 * When the result is true, the simcall is attached to the comm and SIMIX_comm_finish
 * answers it. SIMIX_simcall_answer is used to answer it otherwise.
 */
422 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
424 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
426 if (MC_is_active() || MC_record_replay_is_active()){
427 simcall_comm_test__set__result(simcall, comm->src_proc && comm->dst_proc);
428 if (simcall_comm_test__get__result(simcall)){
429 synchro->state = SIMIX_DONE;
430 synchro->simcalls.push_back(simcall);
431 SIMIX_comm_finish(synchro);
433 SIMIX_simcall_answer(simcall);
438 simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
439 if (simcall_comm_test__get__result(simcall)) {
440 synchro->simcalls.push_back(simcall);
441 SIMIX_comm_finish(synchro);
443 SIMIX_simcall_answer(simcall);
/**
 * \brief Simcall handler testing whether any of \a count comms has completed.
 *
 * Under the model checker / record-replay, the simcall's MC value selects the comm
 * index. That comm is forced to SIMIX_DONE and finished.
 *
 * Otherwise the comms are scanned in order. A comm that is neither WAITING nor
 * RUNNING has its index stored as the result, gets the simcall attached, and is
 * finished.
 *
 * If nothing matches, the result stays -1 and the simcall is answered directly.
 */
447 void simcall_HANDLER_comm_testany(
448 smx_simcall_t simcall, simgrid::kernel::activity::Synchro* comms[], size_t count)
450 // The default result is -1 -- this means, "nothing is ready".
451 // It can be changed below, but only if something matches.
452 simcall_comm_testany__set__result(simcall, -1);
454 if (MC_is_active() || MC_record_replay_is_active()){
455 int idx = SIMCALL_GET_MC_VALUE(simcall);
457 SIMIX_simcall_answer(simcall);
459 simgrid::kernel::activity::Synchro* synchro = comms[idx];
460 simcall_comm_testany__set__result(simcall, idx);
461 synchro->simcalls.push_back(simcall);
462 synchro->state = SIMIX_DONE;
463 SIMIX_comm_finish(synchro);
468 for (std::size_t i = 0; i != count; ++i) {
469 simgrid::kernel::activity::Synchro* synchro = comms[i];
470 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
471 simcall_comm_testany__set__result(simcall, i);
472 synchro->simcalls.push_back(simcall);
473 SIMIX_comm_finish(synchro);
477 SIMIX_simcall_answer(simcall);
/**
 * \brief Simcall handler waiting for any of the comms in \a synchros.
 *
 * Under the model checker / record-replay, timeouts are not supported (xbt_die).
 * There, the comm at the simcall's MC index is completed directly and its index
 * becomes the result.
 *
 * Otherwise the simcall's timer is either cleared or, when a timeout is requested,
 * set to a SIMIX timer. That timer detaches the simcall from all comms and answers
 * it with result -1. The simcall is then attached to every comm, and a comm that is
 * already finished is handed to SIMIX_comm_finish.
 */
480 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros, double timeout)
482 smx_synchro_t synchro;
483 unsigned int cursor = 0;
485 if (MC_is_active() || MC_record_replay_is_active()){
487 xbt_die("Timeout not implemented for waitany in the model-checker");
488 int idx = SIMCALL_GET_MC_VALUE(simcall);
489 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
490 synchro->simcalls.push_back(simcall);
491 simcall_comm_waitany__set__result(simcall, idx);
492 synchro->state = SIMIX_DONE;
493 SIMIX_comm_finish(synchro);
498 simcall->timer = nullptr;
500 simcall->timer = SIMIX_timer_set(timeout, [simcall]() {
501 SIMIX_waitany_remove_simcall_from_actions(simcall);
502 simcall_comm_waitany__set__result(simcall, -1);
503 SIMIX_simcall_answer(simcall);
507 xbt_dynar_foreach(synchros, cursor, synchro){
508 /* associate this simcall to the synchro */
509 synchro->simcalls.push_back(simcall);
511 /* see if the synchro is already finished */
512 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
513 SIMIX_comm_finish(synchro);
/**
 * \brief Detaches a waitany \a simcall from every comm it is waiting on.
 *
 * For each comm of the waitany, removes one occurrence of the simcall from the
 * comm's simcall list. Comms that do not hold it are left untouched.
 */
519 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
521 smx_synchro_t synchro;
522 unsigned int cursor = 0;
523 xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
525 xbt_dynar_foreach(synchros, cursor, synchro) {
526 // Remove the first occurrence of simcall:
527 auto i = boost::range::find(synchro->simcalls, simcall);
528 if (i != synchro->simcalls.end())
529 synchro->simcalls.erase(i);
534 * \brief Starts the simulation of a communication synchro.
535 * \param synchro the communication synchro
537 static inline void SIMIX_comm_start(smx_synchro_t synchro)
// Only a SIMIX_READY comm is started. For such a comm:
//  - a network action is created between the two processes' hosts, using the
//    comm's task_size and rate;
//  - the comm becomes SIMIX_RUNNING, or SIMIX_LINK_FAILURE if the action fails at once;
//  - the action is suspended if either endpoint process is currently suspended.
539 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
541 /* If both the sender and the receiver are already there, start the communication */
542 if (synchro->state == SIMIX_READY) {
544 sg_host_t sender = comm->src_proc->host;
545 sg_host_t receiver = comm->dst_proc->host;
547 XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro, sg_host_get_name(sender), sg_host_get_name(receiver));
549 comm->surf_comm = surf_network_model_communicate(surf_network_model, sender, receiver, comm->task_size, comm->rate);
550 comm->surf_comm->setData(synchro);
551 comm->state = SIMIX_RUNNING;
553 /* If a link has failed, detect it immediately */
554 if (comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
555 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
556 sg_host_get_name(sender), sg_host_get_name(receiver));
557 comm->state = SIMIX_LINK_FAILURE;
561 /* If either process is suspended, create the synchro but stop its execution,
562 it will be restarted when the sender process resumes */
563 if (SIMIX_process_is_suspended(comm->src_proc) || SIMIX_process_is_suspended(comm->dst_proc)) {
564 if (SIMIX_process_is_suspended(comm->src_proc))
565 XBT_DEBUG("The communication is suspended on startup because src (%s@%s) was suspended since it initiated the communication",
566 comm->src_proc->name.c_str(), sg_host_get_name(comm->src_proc->host));
568 XBT_DEBUG("The communication is suspended on startup because dst (%s@%s) was suspended since it initiated the communication",
569 comm->dst_proc->name.c_str(), sg_host_get_name(comm->dst_proc->host));
571 comm->surf_comm->suspend();
577 * \brief Answers the SIMIX simcalls associated to a communication synchro.
578 * \param synchro a finished communication synchro
580 void SIMIX_comm_finish(smx_synchro_t synchro)
// Pops and answers every simcall registered on the comm. For each simcall:
//  - a waitany simcall is first detached from its other comms and its timer cancelled;
//  - the comm is removed from its mailbox if it is still in one;
//  - unless the issuer's host is off, the comm's final state becomes either a data
//    copy (success) or an exception raised on the issuer;
//  - for waitany/testany, the index of the failed comm is stored in the exception value;
//  - the issuer stops waiting, the comm leaves the issuer's comm list (and, in some
//    cases, the peer's), and the simcall is answered.
// Finally the comm is unref'd destroy_count times.
582 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
583 unsigned int destroy_count = 0;
585 while (!synchro->simcalls.empty()) {
586 smx_simcall_t simcall = synchro->simcalls.front();
587 synchro->simcalls.pop_front();
589 /* If a waitany simcall is waiting for this synchro to finish, then remove
590 it from the other synchros in the waitany list. Afterwards, get the
591 position of the actual synchro in the waitany dynar and
592 return it as the result of the simcall */
594 if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
595 continue; // if process handling comm is killed
596 if (simcall->call == SIMCALL_COMM_WAITANY) {
597 SIMIX_waitany_remove_simcall_from_actions(simcall);
598 if (simcall->timer) {
599 SIMIX_timer_remove(simcall->timer);
600 simcall->timer = nullptr;
602 if (!MC_is_active() && !MC_record_replay_is_active())
603 simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
606 /* If the synchro is still in a rendez-vous point then remove from it */
608 SIMIX_mbox_remove(comm->mbox, synchro);
610 XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
612 /* Check out for errors */
614 if (simcall->issuer->host->isOff()) {
615 simcall->issuer->context->iwannadie = 1;
616 SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
618 switch (synchro->state) {
621 XBT_DEBUG("Communication %p complete!", synchro);
622 SIMIX_comm_copy_data(synchro);
625 case SIMIX_SRC_TIMEOUT:
626 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender");
629 case SIMIX_DST_TIMEOUT:
630 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver");
633 case SIMIX_SRC_HOST_FAILURE:
634 if (simcall->issuer == comm->src_proc)
635 simcall->issuer->context->iwannadie = 1;
636 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
638 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
641 case SIMIX_DST_HOST_FAILURE:
642 if (simcall->issuer == comm->dst_proc)
643 simcall->issuer->context->iwannadie = 1;
644 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
646 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
649 case SIMIX_LINK_FAILURE:
// A null pointer must never reach a %s conversion (undefined behaviour), so a
// placeholder is logged when an endpoint process is gone.
651 XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
653 comm->src_proc ? sg_host_get_name(comm->src_proc->host) : "a finished process",
654 comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : "a finished process",
655 simcall->issuer->name.c_str(), simcall->issuer, comm->detached);
656 if (comm->src_proc == simcall->issuer) {
657 XBT_DEBUG("I'm source");
658 } else if (comm->dst_proc == simcall->issuer) {
659 XBT_DEBUG("I'm dest");
661 XBT_DEBUG("I'm neither source nor dest");
663 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
667 if (simcall->issuer == comm->dst_proc)
668 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender");
670 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver");
674 xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
678 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
679 if (simcall->issuer->exception) {
680 // In order to modify the exception we have to rethrow it:
682 std::rethrow_exception(simcall->issuer->exception);
685 if (simcall->call == SIMCALL_COMM_WAITANY) {
686 e.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
688 else if (simcall->call == SIMCALL_COMM_TESTANY) {
690 auto comms = simcall_comm_testany__get__comms(simcall);
691 auto count = simcall_comm_testany__get__count(simcall);
692 auto element = std::find(comms, comms + count, synchro);
693 if (element == comms + count)
696 e.value = element - comms;
698 simcall->issuer->exception = std::make_exception_ptr(e);
705 if (simcall->issuer->host->isOff()) {
706 simcall->issuer->context->iwannadie = 1;
709 simcall->issuer->waiting_synchro = nullptr;
710 xbt_fifo_remove(simcall->issuer->comms, synchro);
712 if(simcall->issuer == comm->src_proc){
714 xbt_fifo_remove(comm->dst_proc->comms, synchro);
716 if(simcall->issuer == comm->dst_proc){
718 xbt_fifo_remove(comm->src_proc->comms, synchro);
719 //in case of a detached comm we have an extra ref to remove, as the sender won't do it
723 SIMIX_simcall_answer(simcall);
727 while (destroy_count-- > 0)
728 static_cast<simgrid::kernel::activity::Comm*>(synchro)->unref();
731 /******************************************************************************/
732 /* SIMIX_comm_copy_data callbacks */
733 /******************************************************************************/
/* Default data-copy callback, used by SIMIX_comm_copy_data when a comm has no
 * copy_data_fun of its own. It starts out as SIMIX_comm_copy_pointer_callback and can
 * be replaced with SIMIX_comm_set_copy_data_callback. */
734 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback;
/**
 * \brief Replaces the default data-copy callback used by SIMIX_comm_copy_data.
 * \param callback The new callback, invoked as callback(comm, src_buff, size).
 */
736 void SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
738 SIMIX_comm_copy_data_callback = callback;
/**
 * \brief Data-copy callback that transfers the pointer \a buff itself.
 *
 * Stores \a buff into the receiver's buffer, which must therefore hold a void*.
 * \a buff_size must equal sizeof(void *); this is asserted.
 */
741 void SIMIX_comm_copy_pointer_callback(smx_synchro_t synchro, void* buff, size_t buff_size)
743 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
745 xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
746 *(void **) (comm->dst_buff) = buff;
/**
 * \brief Data-copy callback that copies \a buff_size bytes from \a buff into the receiver's buffer.
 *
 * For a detached comm, comm->src_buff is reset to nullptr afterwards.
 */
749 void SIMIX_comm_copy_buffer_callback(smx_synchro_t synchro, void* buff, size_t buff_size)
751 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
753 XBT_DEBUG("Copy the data over");
754 memcpy(comm->dst_buff, buff, buff_size);
755 if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
757 comm->src_buff = nullptr;
763 * \brief Copy the communication data from the sender's buffer to the receiver's one
764 * \param comm The communication
766 void SIMIX_comm_copy_data(smx_synchro_t synchro)
768 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
770 size_t buff_size = comm->src_buff_size;
771 /* If there is no data to copy then return */
772 if (!comm->src_buff || !comm->dst_buff || comm->copied)
775 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
777 comm->src_proc ? sg_host_get_name(comm->src_proc->host) : "a finished process",
779 comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : "a finished process",
780 comm->dst_buff, buff_size);
782 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
783 if (comm->dst_buff_size)
784 buff_size = MIN(buff_size, *(comm->dst_buff_size));
786 /* Update the receiver's buffer size to the copied amount */
787 if (comm->dst_buff_size)
788 *comm->dst_buff_size = buff_size;
791 if(comm->copy_data_fun)
792 comm->copy_data_fun (comm, comm->src_buff, buff_size);
794 SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size);
798 /* Set the copied flag so we copy data only once */
799 /* (this function might be called from both communication ends) */