1 /* Copyright (c) 2009-2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smx_private.h"
/* Declares this file's default logging category: "simix_network", a
 * sub-category of "simix". */
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13 "Logging specific to SIMIX (network)");
/* Dictionary of the named rendez-vous points, keyed by name; created in
 * SIMIX_network_init() and filled by SIMIX_rdv_create(). */
15 static xbt_dict_t rdv_points = NULL;
/* Global communication counter. The send and receive paths below decrement it
 * whenever a freshly created comm turns out to be unneeded. */
16 XBT_EXPORT_NO_IMPORT(unsigned long int) smx_total_comms = 0;
/* Forward declarations of helpers defined further down in this file. */
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
23 int (*match_fun)(void *, void *,smx_action_t),
24 void *user_data, smx_action_t my_action);
25 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
26 int (*match_fun)(void *, void *,smx_action_t),
27 void *user_data, smx_action_t my_action);
28 static void SIMIX_rdv_free(void *data);
29 static void SIMIX_comm_start(smx_action_t action);
/**
 * \brief Sets up the network module.
 *
 * Creates the rdv_points dictionary, handing SIMIX_rdv_free to
 * xbt_dict_new_homogeneous() (presumably as the per-entry destructor -- confirm
 * against the xbt dict API), and calls MC_ignore_global_variable() for
 * smx_total_comms.
 */
31 void SIMIX_network_init(void)
33 rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
35 MC_ignore_global_variable("smx_total_comms");
/**
 * \brief Tears down the network module by freeing the rdv_points dictionary.
 */
38 void SIMIX_network_exit(void)
40 xbt_dict_free(&rdv_points);
43 /******************************************************************************/
44 /* Rendez-Vous Points */
45 /******************************************************************************/
/**
 * \brief Simcall-side wrapper: forwards \a name to SIMIX_rdv_create() and
 * returns its result.
 * \param simcall The simcall being handled (not used in the visible body)
 * \param name Name of the rendez-vous point
 */
47 smx_rdv_t SIMIX_pre_rdv_create(smx_simcall_t simcall, const char *name){
48 return SIMIX_rdv_create(name);
/**
 * \brief Creates a rendez-vous point (mailbox), looking up an existing one by
 * name first.
 * \param name Name of the rendez-vous point; may be NULL, in which case no
 *             dictionary lookup is performed
 *
 * NOTE(review): the lines between the lookup and the allocation, and the end
 * of the function, are not visible here. Presumably the allocation only runs
 * when the lookup found nothing and the rdv is returned -- confirm.
 */
50 smx_rdv_t SIMIX_rdv_create(const char *name)
52 /* two processes may have pushed the same rdv_create simcall at the same time */
53 smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
/* Build the rdv: private copy of the name (or NULL), two fresh fifos, and no
 * permanent receiver. */
56 rdv = xbt_new0(s_smx_rvpoint_t, 1);
57 rdv->name = name ? xbt_strdup(name) : NULL;
58 rdv->comm_fifo = xbt_fifo_new();
59 rdv->done_comm_fifo = xbt_fifo_new();
60 rdv->permanent_receiver=NULL;
/* NOTE(review): name can be NULL here (see the ternaries above); handing NULL
 * to a %s conversion is undefined behavior for printf-style formatting. */
62 XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
/* Register the rdv under its own name in the global dictionary. */
65 xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
70 void SIMIX_pre_rdv_destroy(smx_simcall_t simcall, smx_rdv_t rdv){
71 return SIMIX_rdv_destroy(rdv);
/**
 * \brief Destroys a rendez-vous point by removing its entry from rdv_points.
 * \param rdv The rendez-vous point
 *
 * NOTE(review): rdv->name may be NULL for anonymous mailboxes (see
 * SIMIX_rdv_create()); a guard may sit on a line not visible here -- confirm.
 */
73 void SIMIX_rdv_destroy(smx_rdv_t rdv)
76 xbt_dict_remove(rdv_points, rdv->name);
/**
 * \brief Releases the resources of a rendez-vous point; registered with the
 * rdv_points dictionary in SIMIX_network_init().
 * \param data The rendez-vous point, passed as an untyped pointer
 *
 * The visible code frees both fifos; any further releases are on lines not
 * shown here.
 */
79 void SIMIX_rdv_free(void *data)
81 XBT_DEBUG("rdv free %p", data);
82 smx_rdv_t rdv = (smx_rdv_t) data;
84 xbt_fifo_free(rdv->comm_fifo);
85 xbt_fifo_free(rdv->done_comm_fifo);
/**
 * \brief Accessor for the rendez-vous point dictionary.
 *
 * NOTE(review): the body is not visible here; presumably it returns
 * rdv_points -- confirm.
 */
90 xbt_dict_t SIMIX_get_rdv_points()
/**
 * \brief Simcall-side wrapper: returns the result of SIMIX_rdv_get_by_name().
 * \param simcall The simcall being handled (not used in the visible body)
 * \param name Name to look up
 */
95 smx_rdv_t SIMIX_pre_rdv_get_by_name(smx_simcall_t simcall, const char *name){
96 return SIMIX_rdv_get_by_name(name);
/**
 * \brief Looks up a rendez-vous point in rdv_points.
 * \param name Name to look up
 * \return Whatever xbt_dict_get_or_null() yields for \a name
 */
98 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
100 return xbt_dict_get_or_null(rdv_points, name);
/**
 * \brief Simcall-side wrapper: returns the result of
 * SIMIX_rdv_comm_count_by_host().
 * \param simcall The simcall being handled (not used in the visible body)
 * \param rdv The rendez-vous point
 * \param host The host to match
 */
103 int SIMIX_pre_rdv_comm_count_by_host(smx_simcall_t simcall, smx_rdv_t rdv, smx_host_t host){
104 return SIMIX_rdv_comm_count_by_host(rdv, host);
/**
 * \brief Walks the comms queued in a rendez-vous point and tests, for each,
 * whether its source process runs on \a host.
 * \param rdv The rendez-vous point
 * \param host The host to match
 *
 * NOTE(review): the counter and the return statement are on lines not visible
 * here; presumably the number of matching comms is returned -- confirm.
 * NOTE(review): comm.src_proc is dereferenced without a NULL check; verify it
 * is always set for comms queued in comm_fifo (e.g. receives pushed first).
 */
106 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
108 smx_action_t comm = NULL;
109 xbt_fifo_item_t item = NULL;
112 xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
113 if (comm->comm.src_proc->smx_host == host)
/**
 * \brief Simcall-side wrapper: returns the result of SIMIX_rdv_get_head().
 * \param simcall The simcall being handled (not used in the visible body)
 * \param rdv The rendez-vous point
 */
120 smx_action_t SIMIX_pre_rdv_get_head(smx_simcall_t simcall, smx_rdv_t rdv){
121 return SIMIX_rdv_get_head(rdv);
/**
 * \brief Returns the content of the first item of the rendez-vous point's
 * comm_fifo, without removing it.
 * \param rdv The rendez-vous point
 */
123 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
125 return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
/**
 * \brief Simcall-side wrapper: returns the result of SIMIX_rdv_get_receiver().
 * \param simcall The simcall being handled (not used in the visible body)
 * \param rdv The rendez-vous point
 */
128 smx_process_t SIMIX_pre_rdv_get_receiver(smx_simcall_t simcall, smx_rdv_t rdv){
129 return SIMIX_rdv_get_receiver(rdv);
132 * \brief get the receiver (process associated to the mailbox)
133 * \param rdv The rendez-vous point
134 * \return process The receiving process (NULL if not set)
/* Plain accessor: returns the permanent_receiver field of the rdv, which is
 * NULL until SIMIX_rdv_set_receiver() stores a process in it. */
136 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
138 return rdv->permanent_receiver;
/**
 * \brief Simcall-side wrapper: forwards to SIMIX_rdv_set_receiver().
 * \param simcall The simcall being handled (not used in the visible body)
 * \param rdv The rendez-vous point
 * \param process The process to record as receiver
 */
141 void SIMIX_pre_rdv_set_receiver(smx_simcall_t simcall, smx_rdv_t rdv,
142 smx_process_t process){
143 SIMIX_rdv_set_receiver(rdv, process);
146 * \brief set the receiver of the rendez vous point to allow eager sends
147 * \param rdv The rendez-vous point
148 * \param process The receiving process
/* Plain setter: stores process in the rdv's permanent_receiver field. A
 * non-NULL value makes SIMIX_comm_isend() take its permanent-receiver path. */
150 void SIMIX_rdv_set_receiver(smx_rdv_t rdv, smx_process_t process)
152 rdv->permanent_receiver=process;
156 * \brief Pushes a communication action into a rendez-vous point
157 * \param rdv The rendez-vous point
158 * \param comm The communication action
/* Queues comm in the rdv's comm_fifo and records the rdv in the comm as a
 * back-pointer (comm->comm.rdv). */
160 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
162 xbt_fifo_push(rdv->comm_fifo, comm);
163 comm->comm.rdv = rdv;
167 * \brief Removes a communication action from a rendez-vous point
168 * \param rdv The rendez-vous point
169 * \param comm The communication action
/* Inverse of SIMIX_rdv_push(): takes comm out of the rdv's comm_fifo and
 * clears the comm's back-pointer. rdv is dereferenced unconditionally, so
 * callers must pass a non-NULL rdv. */
171 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
173 xbt_fifo_remove(rdv->comm_fifo, comm);
174 comm->comm.rdv = NULL;
178 * \brief Checks if there is a communication action queued in a fifo matching our needs
179 * \param type The type of communication we are looking for (comm_send, comm_recv)
180 * \return The communication action if found, NULL otherwise
/**
 * \brief Searches \a fifo for a queued comm of direction \a type that both
 * sides accept, and takes it out of the fifo.
 * \param fifo The fifo to search
 * \param type The direction wanted for the queued comm
 * \param match_fun Caller's filter, or NULL to accept anything
 * \param this_user_data Caller's user data, handed to both filters
 * \param my_action Action describing the caller, handed to the queued comm's
 *                  own filter
 *
 * NOTE(review): the declaration of the loop variable and the return
 * statements are on lines not visible here.
 */
182 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
183 int (*match_fun)(void *, void *,smx_action_t),
184 void *this_user_data, smx_action_t my_action)
187 xbt_fifo_item_t item;
188 void* other_user_data = NULL;
190 xbt_fifo_foreach(fifo, item, action, smx_action_t) {
/* The queued comm's user data is on the source or destination side depending
 * on its own direction. */
191 if (action->comm.type == SIMIX_COMM_SEND) {
192 other_user_data = action->comm.src_data;
193 } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
194 other_user_data = action->comm.dst_data;
/* A match needs the right direction, acceptance by the caller's filter (if
 * any) and acceptance by the queued comm's filter (if any). */
196 if (action->comm.type == type &&
197 (!match_fun || match_fun(this_user_data, other_user_data, action)) &&
198 (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data, my_action))) {
199 XBT_DEBUG("Found a matching communication action %p", action);
/* Unlink the comm from the fifo and take a reference on it for the caller. */
200 xbt_fifo_remove_item(fifo, item);
201 xbt_fifo_free_item(item);
202 action->comm.refcount++;
/* Remember the rdv in rdv_cpy before clearing the back-pointer. */
204 action->comm.rdv_cpy = action->comm.rdv;
206 action->comm.rdv = NULL;
209 XBT_DEBUG("Sorry, communication action %p does not match our needs:"
210 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
211 action, (int)action->comm.type, (int)type);
213 XBT_DEBUG("No matching communication action found");
219 * \brief Checks if there is a communication action queued in a fifo matching our needs, but leave it there
220 * \param type The type of communication we are looking for (comm_send, comm_recv)
221 * \return The communication action if found, NULL otherwise
/**
 * \brief Same search as SIMIX_fifo_get_comm(), but the matching comm stays in
 * \a fifo; only its refcount is incremented.
 * \param fifo The fifo to search
 * \param type The direction wanted for the queued comm
 * \param match_fun Caller's filter, or NULL to accept anything
 * \param this_user_data Caller's user data, handed to both filters
 * \param my_action Action describing the caller, handed to the queued comm's
 *                  own filter
 *
 * NOTE(review): the declaration of the loop variable and the return
 * statements are on lines not visible here.
 */
223 smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
224 int (*match_fun)(void *, void *,smx_action_t),
225 void *this_user_data, smx_action_t my_action)
228 xbt_fifo_item_t item;
229 void* other_user_data = NULL;
231 xbt_fifo_foreach(fifo, item, action, smx_action_t) {
/* Pick the queued comm's user data according to its own direction. */
232 if (action->comm.type == SIMIX_COMM_SEND) {
233 other_user_data = action->comm.src_data;
234 } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
235 other_user_data = action->comm.dst_data;
/* Same three-part match test as in SIMIX_fifo_get_comm(). */
237 if (action->comm.type == type &&
238 (!match_fun || match_fun(this_user_data, other_user_data, action)) &&
239 (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data, my_action))) {
240 XBT_DEBUG("Found a matching communication action %p", action);
/* Reference taken for the caller; SIMIX_comm_iprobe() drops it again. */
241 action->comm.refcount++;
245 XBT_DEBUG("Sorry, communication action %p does not match our needs:"
246 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
247 action, (int)action->comm.type, (int)type);
249 XBT_DEBUG("No matching communication action found");
252 /******************************************************************************/
253 /* Communication Actions */
254 /******************************************************************************/
257 * \brief Creates a new communicate action
258 * \param type The direction of communication (comm_send, comm_recv)
259 * \return The new communicate action
/* Obtains an action from simix_global->action_mallocator and initializes it
 * as a communication in state SIMIX_WAITING with a refcount of 1.
 * NOTE(review): the declaration of act, some initializations and the return
 * statement are on lines not visible here. */
261 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
265 /* alloc structures */
266 act = xbt_mallocator_get(simix_global->action_mallocator);
268 act->type = SIMIX_ACTION_COMMUNICATE;
269 act->state = SIMIX_WAITING;
271 /* set communication */
272 act->comm.type = type;
/* The creator holds the first reference. */
273 act->comm.refcount = 1;
274 act->comm.src_data=NULL;
275 act->comm.dst_data=NULL;
278 #ifdef HAVE_LATENCY_BOUND_TRACKING
279 //initialize with unknown value
280 act->latency_limited = -1;
284 act->category = NULL;
287 XBT_DEBUG("Create communicate action %p", act);
294 * \brief Destroy a communicate action
295 * \param action The communicate action to be destroyed
/* Drops one reference on the comm; the release code below follows a
 * `refcount > 0` test whose body is not visible (presumably an early return).
 * Calling this on a comm whose refcount is already <= 0 aborts via xbt_die(). */
297 void SIMIX_comm_destroy(smx_action_t action)
299 XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
300 action, action->comm.refcount, (int)action->state);
/* Over-release guard: print a backtrace and abort. */
302 if (action->comm.refcount <= 0) {
303 xbt_backtrace_display_current();
304 xbt_die("The refcount of comm %p is already 0 before decreasing it. "
305 "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
307 action->comm.refcount--;
308 if (action->comm.refcount > 0)
310 XBT_DEBUG("Really free communication %p; refcount is now %d", action,
311 action->comm.refcount);
313 #ifdef HAVE_LATENCY_BOUND_TRACKING
314 action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
317 xbt_free(action->name);
318 SIMIX_comm_destroy_internal_actions(action);
320 if (action->comm.detached && action->state != SIMIX_DONE) {
321 /* the communication has failed and was detached:
322 * we have to free the buffer */
323 if (action->comm.clean_fun) {
324 action->comm.clean_fun(action->comm.src_buff);
326 action->comm.src_buff = NULL;
/* NOTE(review): SIMIX_rdv_remove() dereferences its rdv argument; a guard on
 * action->comm.rdv may sit on the line not visible just above -- confirm. */
330 SIMIX_rdv_remove(action->comm.rdv, action);
/* Hand the action back to the mallocator it came from. */
332 xbt_mallocator_release(simix_global->action_mallocator, action);
/**
 * \brief Releases the surf actions attached to a comm (the transfer itself and
 * the two timeout actions) and resets the corresponding fields to NULL.
 * \param action The communication
 *
 * Each field is only touched when it is non-NULL.
 */
335 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
337 if (action->comm.surf_comm){
338 #ifdef HAVE_LATENCY_BOUND_TRACKING
/* Record the latency-bound status before the surf action goes away. */
339 action->latency_limited = SIMIX_comm_is_latency_bounded(action);
341 surf_action_unref(action->comm.surf_comm);
342 action->comm.surf_comm = NULL;
345 if (action->comm.src_timeout){
346 surf_action_unref(action->comm.src_timeout);
347 action->comm.src_timeout = NULL;
350 if (action->comm.dst_timeout){
351 surf_action_unref(action->comm.dst_timeout);
352 action->comm.dst_timeout = NULL;
/**
 * \brief Simcall-side blocking send: starts the send with SIMIX_comm_isend()
 * on behalf of the simcall issuer, then waits on it with SIMIX_pre_comm_wait().
 * \param simcall The simcall being handled
 * \param rdv The rendez-vous point to send to
 * \param task_size, rate, src_buff, src_buff_size, match_fun Forwarded as is
 * \param data Sender-side user data
 * \param timeout Forwarded to SIMIX_pre_comm_wait()
 *
 * No clean_fun is supplied (NULL). The last arguments of the isend call are on
 * a line not visible here.
 */
356 void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_rdv_t rdv,
357 double task_size, double rate,
358 void *src_buff, size_t src_buff_size,
359 int (*match_fun)(void *, void *,smx_action_t),
360 void *data, double timeout){
361 smx_action_t comm = SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate,
362 src_buff, src_buff_size, match_fun, NULL,
/* Set the simcall's MC value to 0 before delegating to the wait code, which
 * reads it back when the model checker is active. */
364 SIMCALL_SET_MC_VALUE(simcall, 0);
365 SIMIX_pre_comm_wait(simcall, comm, timeout);
/**
 * \brief Simcall-side wrapper: calls SIMIX_comm_isend() with the simcall
 * issuer as the sending process and returns its result.
 * \param simcall The simcall being handled (its issuer is the sender)
 * \param rdv, task_size, rate, src_buff, src_buff_size, match_fun, clean_fun,
 *        data, detached Forwarded unchanged
 */
367 smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_rdv_t rdv,
368 double task_size, double rate,
369 void *src_buff, size_t src_buff_size,
370 int (*match_fun)(void *, void *,smx_action_t),
371 void (*clean_fun)(void *),
372 void *data, int detached){
373 return SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate, src_buff,
374 src_buff_size, match_fun, clean_fun, data, detached);
/**
 * \brief Posts an asynchronous send on a rendez-vous point.
 *
 * A provisional send action is created and matched against the receives
 * queued in rdv->comm_fifo. The resulting comm is then filled in with the
 * sender-side fields and started with SIMIX_comm_start(), or merely marked
 * SIMIX_RUNNING when the model checker is active.
 *
 * \param src_proc The sending process
 * \param rdv The rendez-vous point
 * \param task_size, rate, src_buff, src_buff_size Stored in the comm
 * \param match_fun Sender-side filter, or NULL
 * \param clean_fun Stored only for detached sends
 * \return NULL for a detached send, the comm otherwise
 *
 * NOTE(review): the end of the parameter list (data, detached) and the
 * if/else lines that structure the body are not visible here; the comments
 * below describe only the visible statements.
 */
377 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
378 double task_size, double rate,
379 void *src_buff, size_t src_buff_size,
380 int (*match_fun)(void *, void *,smx_action_t),
381 void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
385 XBT_DEBUG("send from %p\n", rdv);
387 /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
388 smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
390 /* Look for communication action matching our needs. We also provide a description of
391 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
393 * If it is not found then push our communication into the rendez-vous point */
394 smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
/* Our own action becomes the comm to work on. */
397 other_action = this_action;
399 if (rdv->permanent_receiver!=NULL){
400 //this mailbox is for small messages, which have to be sent right now
/* Mark the comm ready with the permanent receiver as destination, take an
 * extra reference and park it in done_comm_fifo. */
401 other_action->state = SIMIX_READY;
402 other_action->comm.dst_proc=rdv->permanent_receiver;
403 other_action->comm.refcount++;
404 xbt_fifo_push(rdv->done_comm_fifo,other_action);
405 other_action->comm.rdv=rdv;
406 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
/* Queue our send in comm_fifo. */
409 SIMIX_rdv_push(rdv, this_action);
412 XBT_DEBUG("Receive already pushed\n");
/* The provisional send action is dropped and the global counter adjusted. */
414 SIMIX_comm_destroy(this_action);
415 --smx_total_comms; // this creation was a pure waste
417 other_action->state = SIMIX_READY;
418 other_action->comm.type = SIMIX_COMM_READY;
/* Track the comm in the sender's list of pending communications. */
421 xbt_fifo_push(src_proc->comms, other_action);
423 /* if the communication action is detached then decrease the refcount
424 * by one, so it will be eliminated by the receiver's destroy call */
426 other_action->comm.detached = 1;
427 other_action->comm.refcount--;
428 other_action->comm.clean_fun = clean_fun;
430 other_action->comm.clean_fun = NULL;
433 /* Setup the communication action */
434 other_action->comm.src_proc = src_proc;
435 other_action->comm.task_size = task_size;
436 other_action->comm.rate = rate;
437 other_action->comm.src_buff = src_buff;
438 other_action->comm.src_buff_size = src_buff_size;
439 other_action->comm.src_data = data;
441 other_action->comm.match_fun = match_fun;
/* Under the model checker the comm is only flagged as running;
 * SIMIX_comm_start() is not called on this path. */
443 if (MC_is_active()) {
444 other_action->state = SIMIX_RUNNING;
445 return (detached ? NULL : other_action);
448 SIMIX_comm_start(other_action);
449 return (detached ? NULL : other_action);
/**
 * \brief Simcall-side blocking receive: starts the receive with
 * SIMIX_comm_irecv() on behalf of the simcall issuer, then waits on it with
 * SIMIX_pre_comm_wait().
 * \param simcall The simcall being handled
 * \param rdv The rendez-vous point to receive from
 * \param dst_buff, dst_buff_size, match_fun, data, rate Forwarded as is
 * \param timeout Forwarded to SIMIX_pre_comm_wait()
 */
452 void SIMIX_pre_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
453 void *dst_buff, size_t *dst_buff_size,
454 int (*match_fun)(void *, void *, smx_action_t),
455 void *data, double timeout, double rate)
457 smx_action_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
458 dst_buff_size, match_fun, data, rate);
/* Set the simcall's MC value to 0 before delegating to the wait code, which
 * reads it back when the model checker is active. */
459 SIMCALL_SET_MC_VALUE(simcall, 0);
460 SIMIX_pre_comm_wait(simcall, comm, timeout);
/**
 * \brief Simcall-side wrapper: calls SIMIX_comm_irecv() with the simcall
 * issuer as the receiving process and returns its result.
 * \param simcall The simcall being handled (its issuer is the receiver)
 * \param rdv, dst_buff, dst_buff_size, match_fun, data, rate Forwarded unchanged
 */
463 smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
464 void *dst_buff, size_t *dst_buff_size,
465 int (*match_fun)(void *, void *, smx_action_t),
466 void *data, double rate)
468 return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
469 match_fun, data, rate);
/**
 * \brief Posts an asynchronous receive on a rendez-vous point.
 *
 * A provisional receive action is created. If the rdv has a permanent receiver
 * and done_comm_fifo is not empty, a matching send is searched there;
 * otherwise the search is done in comm_fifo. The resulting comm is then filled
 * in with the receiver-side fields and started with SIMIX_comm_start(), or
 * merely marked SIMIX_RUNNING when the model checker is active.
 *
 * \param dst_proc The receiving process
 * \param rdv The rendez-vous point
 * \param dst_buff, dst_buff_size, data Stored in the comm
 * \param match_fun Receiver-side filter, or NULL
 * \param rate Receiver-side rate
 *
 * NOTE(review): the if/else lines structuring the body and the return
 * statements are not visible here; the comments below describe only the
 * visible statements.
 */
472 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
473 void *dst_buff, size_t *dst_buff_size,
474 int (*match_fun)(void *, void *, smx_action_t),
475 void *data, double rate)
477 XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
478 smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
480 smx_action_t other_action;
481 //communication already done, get it inside the fifo of completed comms
482 //permanent receive v1
483 //int already_received=0;
484 if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
486 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
487 //find a match in the already received fifo
488 other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
489 //if not found, assume the receiver came first, register it to the mailbox in the classical way
491 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
492 other_action = this_action;
493 SIMIX_rdv_push(rdv, this_action);
/* NOTE(review): exact floating-point comparison against 0.0. */
495 if(other_action->comm.surf_comm && SIMIX_comm_get_remains(other_action)==0.0)
497 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
498 other_action->state = SIMIX_DONE;
499 other_action->comm.type = SIMIX_COMM_DONE;
500 other_action->comm.rdv = NULL;
501 //SIMIX_comm_destroy(this_action);
502 //--smx_total_comms; // this creation was a pure waste
503 //already_received=1;
504 //other_action->comm.refcount--;
506 XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
/* Drop one reference on the matched comm, then discard the provisional
 * receive action. */
508 other_action->comm.refcount--;
509 SIMIX_comm_destroy(this_action);
510 --smx_total_comms; // this creation was a pure waste
513 /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
515 /* Look for communication action matching our needs. We also provide a description of
516 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
518 * If it is not found then push our communication into the rendez-vous point */
519 other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
522 XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
523 other_action = this_action;
524 SIMIX_rdv_push(rdv, this_action);
/* The provisional receive action is dropped and the matched comm becomes
 * ready. */
526 SIMIX_comm_destroy(this_action);
527 --smx_total_comms; // this creation was a pure waste
528 other_action->state = SIMIX_READY;
529 other_action->comm.type = SIMIX_COMM_READY;
530 //other_action->comm.refcount--;
/* Track the comm in the receiver's list of pending communications. */
532 xbt_fifo_push(dst_proc->comms, other_action);
535 /* Setup communication action */
536 other_action->comm.dst_proc = dst_proc;
537 other_action->comm.dst_buff = dst_buff;
538 other_action->comm.dst_buff_size = dst_buff_size;
539 other_action->comm.dst_data = data;
/* Tail of a condition whose first line is not visible: it holds when the comm
 * has no rate yet (-1.0) or the receiver's rate is lower; the comm then takes
 * the receiver's rate. */
542 (other_action->comm.rate == -1.0 || rate < other_action->comm.rate))
543 other_action->comm.rate = rate;
545 other_action->comm.match_fun = match_fun;
548 /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
549 SIMIX_comm_copy_data(other_action);*/
/* Under the model checker the comm is only flagged as running. */
552 if (MC_is_active()) {
553 other_action->state = SIMIX_RUNNING;
557 SIMIX_comm_start(other_action);
/**
 * \brief Simcall-side wrapper: calls SIMIX_comm_iprobe() with the simcall
 * issuer as the probing process and returns its result.
 *
 * Part of the parameter list (src, tag, data as used in the call below) is on
 * lines not visible here.
 */
562 smx_action_t SIMIX_pre_comm_iprobe(smx_simcall_t simcall, smx_rdv_t rdv,
564 int (*match_fun)(void *, void *, smx_action_t),
566 return SIMIX_comm_iprobe(simcall->issuer, rdv, src, tag, match_fun, data);
/**
 * \brief Checks whether a send matching the given filter is queued on a
 * rendez-vous point, without consuming it.
 * \param dst_proc The probing process (not used in the visible code)
 * \param rdv The rendez-vous point
 * \param src, tag Not used in the visible code
 * \param match_fun Filter, or NULL to accept anything
 * \param data User data handed to the filters
 *
 * NOTE(review): the condition guarding the second search and the return
 * statement are on lines not visible here.
 */
569 smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
570 int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
572 XBT_DEBUG("iprobe from %p %p\n", rdv, rdv->comm_fifo);
/* Provisional receive action, only used to describe us to the filters. */
573 smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
575 smx_action_t other_action=NULL;
576 if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
577 //find a match in the already received fifo
578 XBT_DEBUG("first try in the perm recv mailbox \n");
580 other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
584 XBT_DEBUG("second try in the other mailbox");
585 other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
/* Undo the reference taken by SIMIX_fifo_probe_comm(). */
588 if(other_action)other_action->comm.refcount--;
/* The provisional action is no longer needed. */
590 SIMIX_comm_destroy(this_action);
/**
 * \brief Makes a simcall wait for a communication.
 *
 * The simcall is attached to the comm and the comm is recorded as the issuer's
 * waiting_action. Under the model checker the comm's outcome is decided here
 * and the comm is finished. Otherwise an already terminated comm is finished
 * immediately, and for a pending one a surf sleep action is created on the
 * issuer's host and stored as the source-side or destination-side timeout.
 *
 * \param simcall The waiting simcall (a wait, a send or a recv)
 * \param action The communication
 * \param timeout Duration handed to surf_workstation_sleep()
 *
 * NOTE(review): several structural lines (conditions on idx, else branches,
 * returns, the declaration of sleep) are not visible here.
 */
595 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout)
597 /* the simcall may be a wait, a send or a recv */
600 /* Associate this simcall to the wait action */
601 XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
603 xbt_fifo_push(action->simcalls, simcall);
604 simcall->issuer->waiting_action = action;
606 if (MC_is_active()) {
/* The MC value read here selects the outcome; the test on idx is not visible. */
607 int idx = SIMCALL_GET_MC_VALUE(simcall);
609 action->state = SIMIX_DONE;
611 /* If we reached this point, the wait simcall must have a timeout */
612 /* Otherwise it shouldn't be enabled and executed by the MC */
/* The timeout is attributed to the side the issuer is on. */
616 if (action->comm.src_proc == simcall->issuer)
617 action->state = SIMIX_SRC_TIMEOUT;
619 action->state = SIMIX_DST_TIMEOUT;
622 SIMIX_comm_finish(action);
626 /* If the action has already finished, perform the error handling, */
627 /* otherwise set up a waiting timeout on the right side */
628 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
629 SIMIX_comm_finish(action);
630 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
631 sleep = surf_workstation_sleep(simcall->issuer->smx_host, timeout);
/* Link the sleep action back to the comm. */
632 surf_action_set_data(sleep, action);
634 if (simcall->issuer == action->comm.src_proc)
635 action->comm.src_timeout = sleep;
637 action->comm.dst_timeout = sleep;
/**
 * \brief Handles a "test" simcall on a communication.
 *
 * Two variants are visible. In the first, the result is whether both
 * src_proc and dst_proc are set, and a positive result forces the comm to
 * SIMIX_DONE. In the second, the result is whether the comm has left the
 * WAITING/RUNNING states. In both, a positive result attaches the simcall to
 * the comm and finishes it, and there is also a direct answer to the simcall.
 *
 * \param simcall The test simcall
 * \param action The communication
 *
 * NOTE(review): the lines selecting between the two variants (presumably an
 * MC_is_active() test) and the else/return lines are not visible -- confirm.
 */
641 void SIMIX_pre_comm_test(smx_simcall_t simcall, smx_action_t action)
644 simcall_comm_test__set__result(simcall, action->comm.src_proc && action->comm.dst_proc);
645 if(simcall_comm_test__get__result(simcall)){
646 action->state = SIMIX_DONE;
647 xbt_fifo_push(action->simcalls, simcall);
648 SIMIX_comm_finish(action);
650 SIMIX_simcall_answer(simcall);
655 simcall_comm_test__set__result(simcall, (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING));
656 if (simcall_comm_test__get__result(simcall)) {
657 xbt_fifo_push(action->simcalls, simcall);
658 SIMIX_comm_finish(action);
660 SIMIX_simcall_answer(simcall);
/**
 * \brief Handles a "testany" simcall on a set of communications.
 *
 * The result starts at -1. The first visible part is driven by the simcall's
 * MC value (idx): either the simcall is answered directly, or the comm at that
 * index is forced to SIMIX_DONE and finished, with idx as the result. The
 * second visible part scans the comms for one that has left the
 * WAITING/RUNNING states, records its position as the result and finishes it.
 *
 * \param simcall The testany simcall
 * \param actions Dynar of communications
 *
 * NOTE(review): the conditions, returns and declarations structuring these
 * parts are on lines not visible here.
 */
664 void SIMIX_pre_comm_testany(smx_simcall_t simcall, xbt_dynar_t actions)
668 simcall_comm_testany__set__result(simcall, -1);
671 int idx = SIMCALL_GET_MC_VALUE(simcall);
673 SIMIX_simcall_answer(simcall);
675 action = xbt_dynar_get_as(actions, idx, smx_action_t);
676 simcall_comm_testany__set__result(simcall, idx);
677 xbt_fifo_push(action->simcalls, simcall);
678 action->state = SIMIX_DONE;
679 SIMIX_comm_finish(action);
/* The scan iterates over the comms stored in the simcall itself. */
684 xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,action) {
685 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
686 simcall_comm_testany__set__result(simcall, cursor);
687 xbt_fifo_push(action->simcalls, simcall);
688 SIMIX_comm_finish(action);
692 SIMIX_simcall_answer(simcall);
/**
 * \brief Handles a "waitany" simcall on a set of communications.
 *
 * In the first visible part, the comm at the index given by the simcall's MC
 * value is forced to SIMIX_DONE and finished, with that index as the result.
 * In the second, the simcall is attached to every comm of the set and any comm
 * that has already left the WAITING/RUNNING states is finished.
 *
 * \param simcall The waitany simcall
 * \param actions Dynar of communications
 *
 * NOTE(review): the condition selecting the first part (presumably
 * MC_is_active()), the returns and the loop exit are not visible -- confirm.
 */
695 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, xbt_dynar_t actions)
698 unsigned int cursor = 0;
701 int idx = SIMCALL_GET_MC_VALUE(simcall);
702 action = xbt_dynar_get_as(actions, idx, smx_action_t);
703 xbt_fifo_push(action->simcalls, simcall);
704 simcall_comm_waitany__set__result(simcall, idx);
705 action->state = SIMIX_DONE;
706 SIMIX_comm_finish(action);
710 xbt_dynar_foreach(actions, cursor, action){
711 /* associate this simcall to the action */
712 xbt_fifo_push(action->simcalls, simcall);
714 /* see if the action is already finished */
715 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
716 SIMIX_comm_finish(action);
/**
 * \brief Detaches a waitany simcall from every communication of its set.
 * \param simcall The waitany simcall; the set is read from the simcall itself
 *
 * Undoes the per-comm registration done in SIMIX_pre_comm_waitany().
 */
722 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
725 unsigned int cursor = 0;
726 xbt_dynar_t actions = simcall_comm_waitany__get__comms(simcall);
728 xbt_dynar_foreach(actions, cursor, action) {
729 xbt_fifo_remove(action->simcalls, simcall);
734 * \brief Starts the simulation of a communication action.
735 * \param action the communication action
/* Does nothing visible unless the comm is in state SIMIX_READY. Otherwise it
 * creates the surf communication between the hosts of the source and
 * destination processes and moves the comm to SIMIX_RUNNING. A surf action
 * that is already failed turns the comm into SIMIX_LINK_FAILURE, and the
 * transfer is suspended at once if either process is suspended.
 * NOTE(review): one argument line of the surf call and the else lines are not
 * visible here. */
737 static XBT_INLINE void SIMIX_comm_start(smx_action_t action)
739 /* If both the sender and the receiver are already there, start the communication */
740 if (action->state == SIMIX_READY) {
742 smx_host_t sender = action->comm.src_proc->smx_host;
743 smx_host_t receiver = action->comm.dst_proc->smx_host;
745 XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
746 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
748 action->comm.surf_comm = surf_workstation_model_communicate(surf_workstation_model,
750 action->comm.task_size, action->comm.rate);
/* Link the surf action back to the comm. */
752 surf_action_set_data(action->comm.surf_comm, action);
754 action->state = SIMIX_RUNNING;
756 /* If a link is failed, detect it immediately */
757 if (surf_action_get_state(action->comm.surf_comm) == SURF_ACTION_FAILED) {
758 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
759 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
760 action->state = SIMIX_LINK_FAILURE;
761 SIMIX_comm_destroy_internal_actions(action);
764 /* If any of the processes is suspended, create the action but stop its execution;
765 it will be restarted when the sender process resumes */
766 if (SIMIX_process_is_suspended(action->comm.src_proc) ||
767 SIMIX_process_is_suspended(action->comm.dst_proc)) {
768 /* FIXME: check what should happen with the action state */
770 if (SIMIX_process_is_suspended(action->comm.src_proc))
771 XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
772 SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
774 XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
775 SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
/* NOTE(review): surf_comm was reset by SIMIX_comm_destroy_internal_actions()
 * on the link-failure path above; check whether this call can be reached in
 * that case (the else lines are not visible). */
777 surf_action_suspend(action->comm.surf_comm);
784 * \brief Answers the SIMIX simcalls associated to a communication action.
785 * \param action a finished communication action
/* Pops every simcall attached to the comm. For each one it sets the waitany
 * result if needed, takes the comm out of its rdv, turns the comm's final
 * state into either a data copy or an exception on the issuer, cleans up the
 * processes' pending-comm lists, and answers the simcall. After the loop the
 * comm is destroyed destroy_count times.
 * NOTE(review): case labels, breaks, else lines, the updates of destroy_count
 * and still_alive, and the declaration of proc are not visible here. */
787 void SIMIX_comm_finish(smx_action_t action)
789 unsigned int destroy_count = 0;
790 smx_simcall_t simcall;
793 while ((simcall = xbt_fifo_shift(action->simcalls))) {
795 /* If a waitany simcall is waiting for this action to finish, then remove
796 it from the other actions in the waitany list. Afterwards, get the
797 position of the actual action in the waitany dynar and
798 return it as the result of the simcall */
799 if (simcall->call == SIMCALL_COMM_WAITANY) {
800 SIMIX_waitany_remove_simcall_from_actions(simcall);
802 simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action));
805 /* If the action is still in a rendez-vous point then remove from it */
806 if (action->comm.rdv)
807 SIMIX_rdv_remove(action->comm.rdv, action);
809 XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
811 /* Check out for errors */
812 switch (action->state) {
/* Success path: copy the payload. */
815 XBT_DEBUG("Communication %p complete!", action);
816 SIMIX_comm_copy_data(action);
819 case SIMIX_SRC_TIMEOUT:
820 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
821 "Communication timeouted because of sender");
824 case SIMIX_DST_TIMEOUT:
825 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
826 "Communication timeouted because of receiver");
/* Host failures: the issuer on the failed side is flagged to die; the
 * "Remote peer failed" exception follows a line that is not visible
 * (presumably an else). */
829 case SIMIX_SRC_HOST_FAILURE:
830 if (simcall->issuer == action->comm.src_proc)
831 simcall->issuer->context->iwannadie = 1;
832 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
834 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
837 case SIMIX_DST_HOST_FAILURE:
838 if (simcall->issuer == action->comm.dst_proc)
839 simcall->issuer->context->iwannadie = 1;
840 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
842 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
845 case SIMIX_LINK_FAILURE:
846 XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
848 action->comm.src_proc ? sg_host_name(action->comm.src_proc->smx_host) : NULL,
849 action->comm.dst_proc ? sg_host_name(action->comm.dst_proc->smx_host) : NULL,
850 simcall->issuer->name, simcall->issuer, action->comm.detached);
851 if (action->comm.src_proc == simcall->issuer) {
852 XBT_DEBUG("I'm source");
853 } else if (action->comm.dst_proc == simcall->issuer) {
854 XBT_DEBUG("I'm dest");
856 XBT_DEBUG("I'm neither source nor dest");
858 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
/* Cancellation (case label not visible): the message names the other side. */
862 if (simcall->issuer == action->comm.dst_proc)
863 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
864 "Communication canceled by the sender");
866 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
867 "Communication canceled by the receiver");
871 xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
874 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
875 if (simcall->issuer->doexception) {
876 if (simcall->call == SIMCALL_COMM_WAITANY) {
877 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action);
879 else if (simcall->call == SIMCALL_COMM_TESTANY) {
880 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &action);
/* An issuer whose host is no longer ON is flagged to die. */
884 if (surf_resource_get_state(surf_workstation_resource_priv(simcall->issuer->smx_host)) != SURF_RESOURCE_ON) {
885 simcall->issuer->context->iwannadie = 1;
888 simcall->issuer->waiting_action = NULL;
889 xbt_fifo_remove(simcall->issuer->comms, action);
/* Detached comms: also drop the comm from the peer's pending list, but only
 * if the peer is still found in simix_global->process_list. */
890 if(action->comm.detached){
894 if(simcall->issuer == action->comm.src_proc){
895 if(action->comm.dst_proc){
896 xbt_swag_foreach(proc, simix_global->process_list)
898 if(proc==action->comm.dst_proc){
904 if(still_alive) xbt_fifo_remove(action->comm.dst_proc->comms, action);
906 if(simcall->issuer == action->comm.dst_proc){
/* NOTE(review): two consecutive tests here (src_proc, then dst_proc); the
 * dst_proc test looks redundant since the issuer equals dst_proc. Also check
 * that still_alive is reset between the two searches (not visible). */
907 if(action->comm.src_proc)
908 if(action->comm.dst_proc){
909 xbt_swag_foreach(proc, simix_global->process_list)
911 if(proc==action->comm.src_proc){
917 if(still_alive) xbt_fifo_remove(action->comm.src_proc->comms, action);
920 SIMIX_simcall_answer(simcall);
/* Release the references counted in destroy_count. */
924 while (destroy_count-- > 0)
925 SIMIX_comm_destroy(action);
929 * \brief This function is called when a Surf communication action is finished.
930 * \param action the corresponding Simix communication
/* Derives the comm's final state from its surf actions, checked in this
 * order: source timeout done, destination timeout done, source timeout failed,
 * destination timeout failed, transfer failed. If none applies the comm is
 * SIMIX_DONE (the else line before that assignment is not visible). The surf
 * actions are then released, the comm leaves both processes' pending lists,
 * and SIMIX_comm_finish() runs if any simcall is attached. */
932 void SIMIX_post_comm(smx_action_t action)
934 /* Update action state */
935 if (action->comm.src_timeout &&
936 surf_action_get_state(action->comm.src_timeout) == SURF_ACTION_DONE)
937 action->state = SIMIX_SRC_TIMEOUT;
938 else if (action->comm.dst_timeout &&
939 surf_action_get_state(action->comm.dst_timeout) == SURF_ACTION_DONE)
940 action->state = SIMIX_DST_TIMEOUT;
/* A failed sleep action is reported as a failure of the host on that side. */
941 else if (action->comm.src_timeout &&
942 surf_action_get_state(action->comm.src_timeout) == SURF_ACTION_FAILED)
943 action->state = SIMIX_SRC_HOST_FAILURE;
944 else if (action->comm.dst_timeout &&
945 surf_action_get_state(action->comm.dst_timeout) == SURF_ACTION_FAILED)
946 action->state = SIMIX_DST_HOST_FAILURE;
947 else if (action->comm.surf_comm &&
948 surf_action_get_state(action->comm.surf_comm) == SURF_ACTION_FAILED) {
949 XBT_DEBUG("Puta madre. Surf says that the link broke");
950 action->state = SIMIX_LINK_FAILURE;
952 action->state = SIMIX_DONE;
954 XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
955 action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
957 /* destroy the surf actions associated with the Simix communication */
958 SIMIX_comm_destroy_internal_actions(action);
960 /* remove the communication action from the list of pending communications
961 * of both processes (if they still exist) */
962 if (action->comm.src_proc) {
963 xbt_fifo_remove(action->comm.src_proc->comms, action);
965 if (action->comm.dst_proc) {
966 xbt_fifo_remove(action->comm.dst_proc->comms, action);
969 /* if there are simcalls associated with the action, then answer them */
970 if (xbt_fifo_size(action->simcalls)) {
971 SIMIX_comm_finish(action);
/**
 * \brief Simcall-side wrapper: forwards to SIMIX_comm_cancel().
 * \param simcall The simcall being handled (not used in the visible body)
 * \param action The communication to cancel
 */
975 void SIMIX_pre_comm_cancel(smx_simcall_t simcall, smx_action_t action){
976 SIMIX_comm_cancel(action);
/**
 * \brief Cancels a communication.
 * \param action The communication
 *
 * A comm still in SIMIX_WAITING is removed from its rendez-vous point and
 * marked SIMIX_CANCELED. A READY or RUNNING comm has its surf transfer
 * canceled, except under the model checker. Other states are left untouched by
 * the visible code.
 */
978 void SIMIX_comm_cancel(smx_action_t action)
980 /* if the action is in a waiting state, it is still in a rdv, */
981 /* so remove it from there and mark it canceled */
982 if (action->state == SIMIX_WAITING) {
983 SIMIX_rdv_remove(action->comm.rdv, action);
984 action->state = SIMIX_CANCELED;
986 else if (!MC_is_active() /* when running the MC there are no surf actions */
987 && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
989 surf_action_cancel(action->comm.surf_comm);
/**
 * \brief Suspends the surf transfer of a communication, if it has one.
 * \param action The communication
 */
993 void SIMIX_comm_suspend(smx_action_t action)
995 /*FIXME: shall we suspend also the timeout actions? */
996 if (action->comm.surf_comm)
997 surf_action_suspend(action->comm.surf_comm);
998 /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
/**
 * \brief Resumes the surf transfer of a communication, if it has one.
 * \param action The communication
 */
1001 void SIMIX_comm_resume(smx_action_t action)
1003 /*FIXME: check what happens with the timeouts */
1004 if (action->comm.surf_comm)
1005 surf_action_resume(action->comm.surf_comm);
1006 /* in the other case, the action was not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
1010 /************* Action Getters **************/
1012 double SIMIX_pre_comm_get_remains(smx_simcall_t simcall, smx_action_t action){
1013 return SIMIX_comm_get_remains(action);
/**
 * \brief Get the amount remaining from the communication
 * \param action The communication
 */
/* Computes a `remains` value from a switch on action->state: one branch asks
 * surf_action_get_remains() about the surf action of the communication, the
 * two other visible branches set it to 0.
 * NOTE(review): the embedded numbering jumps 1019 -> 1027 -> 1030 -> 1035 ->
 * 1039, so the declaration of `remains`, the case labels, the breaks and the
 * final return are not visible in this excerpt. Which state maps to which
 * branch cannot be told from here; confirm against the pristine file. */
1019 double SIMIX_comm_get_remains(smx_action_t action)
1027 switch (action->state) {
/* only branch that queries surf for the remaining amount */
1030 remains = surf_action_get_remains(action->comm.surf_comm);
1035 remains = 0; /*FIXME: check what should be returned */
1039 remains = 0; /*FIXME: is this correct? */
1045 e_smx_state_t SIMIX_pre_comm_get_state(smx_simcall_t simcall, smx_action_t action){
1046 return SIMIX_comm_get_state(action);
1048 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
1050 return action->state;
1053 void* SIMIX_pre_comm_get_src_data(smx_simcall_t simcall, smx_action_t action){
1054 return SIMIX_comm_get_src_data(action);
1057 * \brief Return the user data associated to the sender of the communication
1058 * \param action The communication
1059 * \return the user data
1061 void* SIMIX_comm_get_src_data(smx_action_t action)
1063 return action->comm.src_data;
1066 void* SIMIX_pre_comm_get_dst_data(smx_simcall_t simcall, smx_action_t action){
1067 return SIMIX_comm_get_dst_data(action);
1070 * \brief Return the user data associated to the receiver of the communication
1071 * \param action The communication
1072 * \return the user data
1074 void* SIMIX_comm_get_dst_data(smx_action_t action)
1076 return action->comm.dst_data;
1079 smx_process_t SIMIX_pre_comm_get_src_proc(smx_simcall_t simcall, smx_action_t action){
1080 return SIMIX_comm_get_src_proc(action);
1082 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
1084 return action->comm.src_proc;
1087 smx_process_t SIMIX_pre_comm_get_dst_proc(smx_simcall_t simcall, smx_action_t action){
1088 return SIMIX_comm_get_dst_proc(action);
1090 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
1092 return action->comm.dst_proc;
1095 #ifdef HAVE_LATENCY_BOUND_TRACKING
1096 int SIMIX_pre_comm_is_latency_bounded(smx_simcall_t simcall, smx_action_t action)
1098 return SIMIX_comm_is_latency_bounded(action);
/**
 * \brief Verify whether the communication is latency bounded
 * \param action The communication
 */
/* When the communication has a surf action, refreshes action->latency_limited
 * from surf_network_action_get_latency_limited() (with a debug log before and
 * after), then returns action->latency_limited.
 * NOTE(review): embedded lines 1106-1109 are not visible in this excerpt; they
 * may hold more than the opening brace (e.g. a guard on `action`) — TODO
 * confirm against the pristine file. */
1105 int SIMIX_comm_is_latency_bounded(smx_action_t action)
1110 if (action->comm.surf_comm){
1111 XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
/* cache the answer of the network model in the action itself */
1112 action->latency_limited = surf_network_action_get_latency_limited(action->comm.surf_comm);
1113 XBT_DEBUG("Action limited is %d", action->latency_limited);
/* the stored value is returned even when there is no surf action */
1115 return action->latency_limited;
1119 /******************************************************************************/
1120 /* SIMIX_comm_copy_data callbacks */
1121 /******************************************************************************/
1122 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
1123 &SIMIX_comm_copy_pointer_callback;
/* Stores `callback` in SIMIX_comm_copy_data_callback, replacing whichever
 * function was installed before.
 * \param callback function taking (communication, buffer, size)
 * NOTE(review): the line holding the return type (embedded line 1125) and the
 * braces are not visible in this excerpt; the return type is presumably
 * `void` — confirm against the pristine file. */
1126 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
1128 SIMIX_comm_copy_data_callback = callback;
1131 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
1133 xbt_assert((buff_size == sizeof(void *)),
1134 "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1135 *(void **) (comm->comm.dst_buff) = buff;
/* Copy callback that duplicates the bytes: memcpy()s buff_size bytes from
 * `buff` into the destination buffer of the communication. For a detached
 * communication, the src_buff field is then reset to NULL.
 * NOTE(review): embedded line 1143 (inside the `if`) is not visible in this
 * excerpt; it presumably releases the duplicated source buffer before the
 * field is cleared — confirm against the pristine file. */
1138 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
1140 XBT_DEBUG("Copy the data over");
1141 memcpy(comm->comm.dst_buff, buff, buff_size);
1142 if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
/* forget the source buffer so that it is not used again through this comm */
1144 comm->comm.src_buff = NULL;
/**
 * \brief Copy the communication data from the sender's buffer to the receiver's one
 * \param comm The communication
 */
/* Hands the sender's buffer to SIMIX_comm_copy_data_callback and then sets
 * comm->comm.copied. The size passed to the callback starts at src_buff_size;
 * when a dst_buff_size pointer is set, it is capped to the value pointed to,
 * and that value is then overwritten with the resulting size.
 * NOTE(review): the embedded numbering skips 1158-1159, 1161, 1166, 1170,
 * 1174-1175 and 1177, so some short lines are not visible here. These are
 * presumably the `return` of the guard below, the first XBT_DEBUG argument
 * (the format has six conversions but only five arguments are visible), and
 * possibly a condition in front of the callback invocation. Confirm against
 * the pristine file. */
1153 void SIMIX_comm_copy_data(smx_action_t comm)
1155 size_t buff_size = comm->comm.src_buff_size;
1156 /* If there is no data to be copy then return */
/* also covers the case where the data was already copied (copied flag set) */
1157 if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1160 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
/* a NULL src_proc/dst_proc is reported as "a finished process" */
1162 comm->comm.src_proc ? sg_host_name(comm->comm.src_proc->smx_host) : "a finished process",
1163 comm->comm.src_buff,
1164 comm->comm.dst_proc ? sg_host_name(comm->comm.dst_proc->smx_host) : "a finished process",
1165 comm->comm.dst_buff, buff_size);
1167 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1168 if (comm->comm.dst_buff_size)
1169 buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1171 /* Update the receiver's buffer size to the copied amount */
1172 if (comm->comm.dst_buff_size)
1173 *comm->comm.dst_buff_size = buff_size;
/* the actual transfer is delegated to the currently installed callback */
1176 SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1178 /* Set the copied flag so we copy data only once */
1179 /* (this function might be called from both communication ends) */
1180 comm->comm.copied = 1;