1 /* Copyright (c) 2009-2013. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smx_private.h"
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13 "Logging specific to SIMIX (network)");
15 static xbt_dict_t rdv_points = NULL;
16 XBT_EXPORT_NO_IMPORT(unsigned long int) smx_total_comms = 0;
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
23 int (*match_fun)(void *, void *,smx_action_t),
24 void *user_data, smx_action_t my_action);
25 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
26 int (*match_fun)(void *, void *,smx_action_t),
27 void *user_data, smx_action_t my_action);
28 static void SIMIX_rdv_free(void *data);
29 static void SIMIX_comm_start(smx_action_t action);
31 void SIMIX_network_init(void)
33 rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
35 MC_ignore_global_variable("smx_total_comms");
38 void SIMIX_network_exit(void)
40 xbt_dict_free(&rdv_points);
43 /******************************************************************************/
44 /* Rendez-Vous Points */
45 /******************************************************************************/
47 smx_rdv_t SIMIX_pre_rdv_create(smx_simcall_t simcall, const char *name){
48 return SIMIX_rdv_create(name);
50 smx_rdv_t SIMIX_rdv_create(const char *name)
52 /* two processes may have pushed the same rdv_create simcall at the same time */
53 smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
56 rdv = xbt_new0(s_smx_rvpoint_t, 1);
57 rdv->name = name ? xbt_strdup(name) : NULL;
58 rdv->comm_fifo = xbt_fifo_new();
59 rdv->done_comm_fifo = xbt_fifo_new();
60 rdv->permanent_receiver=NULL;
62 XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
65 xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
70 void SIMIX_pre_rdv_destroy(smx_simcall_t simcall, smx_rdv_t rdv){
71 return SIMIX_rdv_destroy(rdv);
73 void SIMIX_rdv_destroy(smx_rdv_t rdv)
76 xbt_dict_remove(rdv_points, rdv->name);
79 void SIMIX_rdv_free(void *data)
81 XBT_DEBUG("rdv free %p", data);
82 smx_rdv_t rdv = (smx_rdv_t) data;
84 xbt_fifo_free(rdv->comm_fifo);
85 xbt_fifo_free(rdv->done_comm_fifo);
90 xbt_dict_t SIMIX_get_rdv_points()
95 smx_rdv_t SIMIX_pre_rdv_get_by_name(smx_simcall_t simcall, const char *name){
96 return SIMIX_rdv_get_by_name(name);
98 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
100 return xbt_dict_get_or_null(rdv_points, name);
103 int SIMIX_pre_rdv_comm_count_by_host(smx_simcall_t simcall, smx_rdv_t rdv, smx_host_t host){
104 return SIMIX_rdv_comm_count_by_host(rdv, host);
106 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
108 smx_action_t comm = NULL;
109 xbt_fifo_item_t item = NULL;
112 xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
113 if (comm->comm.src_proc->smx_host == host)
120 smx_action_t SIMIX_pre_rdv_get_head(smx_simcall_t simcall, smx_rdv_t rdv){
121 return SIMIX_rdv_get_head(rdv);
123 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
125 return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
128 smx_process_t SIMIX_pre_rdv_get_receiver(smx_simcall_t simcall, smx_rdv_t rdv){
129 return SIMIX_rdv_get_receiver(rdv);
132 * \brief get the receiver (process associated to the mailbox)
133 * \param rdv The rendez-vous point
134 * \return process The receiving process (NULL if not set)
136 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
138 return rdv->permanent_receiver;
141 void SIMIX_pre_rdv_set_receiver(smx_simcall_t simcall, smx_rdv_t rdv,
142 smx_process_t process){
143 SIMIX_rdv_set_receiver(rdv, process);
146 * \brief set the receiver of the rendez vous point to allow eager sends
147 * \param rdv The rendez-vous point
148 * \param process The receiving process
150 void SIMIX_rdv_set_receiver(smx_rdv_t rdv, smx_process_t process)
152 rdv->permanent_receiver=process;
156 * \brief Pushes a communication action into a rendez-vous point
157 * \param rdv The rendez-vous point
158 * \param comm The communication action
160 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
162 xbt_fifo_push(rdv->comm_fifo, comm);
163 comm->comm.rdv = rdv;
167 * \brief Removes a communication action from a rendez-vous point
168 * \param rdv The rendez-vous point
169 * \param comm The communication action
171 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
173 xbt_fifo_remove(rdv->comm_fifo, comm);
174 comm->comm.rdv = NULL;
178 * \brief Checks if there is a communication action queued in a fifo matching our needs
179 * \param type The type of communication we are looking for (comm_send, comm_recv)
180 * \return The communication action if found, NULL otherwise
182 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
183 int (*match_fun)(void *, void *,smx_action_t),
184 void *this_user_data, smx_action_t my_action)
187 xbt_fifo_item_t item;
188 void* other_user_data = NULL;
190 xbt_fifo_foreach(fifo, item, action, smx_action_t) {
191 if (action->comm.type == SIMIX_COMM_SEND) {
192 other_user_data = action->comm.src_data;
193 } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
194 other_user_data = action->comm.dst_data;
196 if (action->comm.type == type &&
197 (!match_fun || match_fun(this_user_data, other_user_data, action)) &&
198 (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data, my_action))) {
199 XBT_DEBUG("Found a matching communication action %p", action);
200 xbt_fifo_remove_item(fifo, item);
201 xbt_fifo_free_item(item);
202 action->comm.refcount++;
204 action->comm.rdv_cpy = action->comm.rdv;
206 action->comm.rdv = NULL;
209 XBT_DEBUG("Sorry, communication action %p does not match our needs:"
210 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
211 action, (int)action->comm.type, (int)type);
213 XBT_DEBUG("No matching communication action found");
219 * \brief Checks if there is a communication action queued in a fifo matching our needs, but leave it there
220 * \param type The type of communication we are looking for (comm_send, comm_recv)
221 * \return The communication action if found, NULL otherwise
223 smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
224 int (*match_fun)(void *, void *,smx_action_t),
225 void *this_user_data, smx_action_t my_action)
228 xbt_fifo_item_t item;
229 void* other_user_data = NULL;
231 xbt_fifo_foreach(fifo, item, action, smx_action_t) {
232 if (action->comm.type == SIMIX_COMM_SEND) {
233 other_user_data = action->comm.src_data;
234 } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
235 other_user_data = action->comm.dst_data;
237 if (action->comm.type == type &&
238 (!match_fun || match_fun(this_user_data, other_user_data, action)) &&
239 (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data, my_action))) {
240 XBT_DEBUG("Found a matching communication action %p", action);
241 action->comm.refcount++;
245 XBT_DEBUG("Sorry, communication action %p does not match our needs:"
246 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
247 action, (int)action->comm.type, (int)type);
249 XBT_DEBUG("No matching communication action found");
252 /******************************************************************************/
253 /* Communication Actions */
254 /******************************************************************************/
257 * \brief Creates a new communicate action
258 * \param type The direction of communication (comm_send, comm_recv)
259 * \return The new communicate action
261 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
265 /* alloc structures */
266 act = xbt_mallocator_get(simix_global->action_mallocator);
268 act->type = SIMIX_ACTION_COMMUNICATE;
269 act->state = SIMIX_WAITING;
271 /* set communication */
272 act->comm.type = type;
273 act->comm.refcount = 1;
274 act->comm.src_data=NULL;
275 act->comm.dst_data=NULL;
278 #ifdef HAVE_LATENCY_BOUND_TRACKING
279 //initialize with unknown value
280 act->latency_limited = -1;
284 act->category = NULL;
287 XBT_DEBUG("Create communicate action %p", act);
293 void SIMIX_pre_comm_destroy(smx_simcall_t simcall, smx_action_t action){
294 SIMIX_comm_destroy(action);
297 * \brief Destroy a communicate action
298 * \param action The communicate action to be destroyed
300 void SIMIX_comm_destroy(smx_action_t action)
302 XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
303 action, action->comm.refcount, (int)action->state);
305 if (action->comm.refcount <= 0) {
306 xbt_backtrace_display_current();
307 xbt_die("The refcount of comm %p is already 0 before decreasing it. "
308 "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
310 action->comm.refcount--;
311 if (action->comm.refcount > 0)
313 XBT_DEBUG("Really free communication %p; refcount is now %d", action,
314 action->comm.refcount);
316 #ifdef HAVE_LATENCY_BOUND_TRACKING
317 action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
320 xbt_free(action->name);
321 SIMIX_comm_destroy_internal_actions(action);
323 if (action->comm.detached && action->state != SIMIX_DONE) {
324 /* the communication has failed and was detached:
325 * we have to free the buffer */
326 if (action->comm.clean_fun) {
327 action->comm.clean_fun(action->comm.src_buff);
329 action->comm.src_buff = NULL;
333 SIMIX_rdv_remove(action->comm.rdv, action);
335 xbt_mallocator_release(simix_global->action_mallocator, action);
338 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
340 if (action->comm.surf_comm){
341 #ifdef HAVE_LATENCY_BOUND_TRACKING
342 action->latency_limited = SIMIX_comm_is_latency_bounded(action);
344 action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
345 action->comm.surf_comm = NULL;
348 if (action->comm.src_timeout){
349 action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
350 action->comm.src_timeout = NULL;
353 if (action->comm.dst_timeout){
354 action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
355 action->comm.dst_timeout = NULL;
359 void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_rdv_t rdv,
360 double task_size, double rate,
361 void *src_buff, size_t src_buff_size,
362 int (*match_fun)(void *, void *,smx_action_t),
363 void *data, double timeout){
364 smx_action_t comm = SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate,
365 src_buff, src_buff_size, match_fun, NULL,
367 simcall->mc_value = 0;
368 SIMIX_pre_comm_wait(simcall, comm, timeout);
370 smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_rdv_t rdv,
371 double task_size, double rate,
372 void *src_buff, size_t src_buff_size,
373 int (*match_fun)(void *, void *,smx_action_t),
374 void (*clean_fun)(void *),
375 void *data, int detached){
376 return SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate, src_buff,
377 src_buff_size, match_fun, clean_fun, data, detached);
380 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
381 double task_size, double rate,
382 void *src_buff, size_t src_buff_size,
383 int (*match_fun)(void *, void *,smx_action_t),
384 void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
388 XBT_DEBUG("send from %p\n", rdv);
390 /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
391 smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
393 /* Look for communication action matching our needs. We also provide a description of
394 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
396 * If it is not found then push our communication into the rendez-vous point */
397 smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
400 other_action = this_action;
402 if (rdv->permanent_receiver!=NULL){
403 //this mailbox is for small messages, which have to be sent right now
404 other_action->state = SIMIX_READY;
405 other_action->comm.dst_proc=rdv->permanent_receiver;
406 other_action->comm.refcount++;
407 xbt_fifo_push(rdv->done_comm_fifo,other_action);
408 other_action->comm.rdv=rdv;
409 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
412 SIMIX_rdv_push(rdv, this_action);
415 XBT_DEBUG("Receive already pushed\n");
417 SIMIX_comm_destroy(this_action);
418 --smx_total_comms; // this creation was a pure waste
420 other_action->state = SIMIX_READY;
421 other_action->comm.type = SIMIX_COMM_READY;
424 xbt_fifo_push(src_proc->comms, other_action);
426 /* if the communication action is detached then decrease the refcount
427 * by one, so it will be eliminated by the receiver's destroy call */
429 other_action->comm.detached = 1;
430 other_action->comm.refcount--;
431 other_action->comm.clean_fun = clean_fun;
433 other_action->comm.clean_fun = NULL;
436 /* Setup the communication action */
437 other_action->comm.src_proc = src_proc;
438 other_action->comm.task_size = task_size;
439 other_action->comm.rate = rate;
440 other_action->comm.src_buff = src_buff;
441 other_action->comm.src_buff_size = src_buff_size;
442 other_action->comm.src_data = data;
444 other_action->comm.match_fun = match_fun;
446 if (MC_is_active()) {
447 other_action->state = SIMIX_RUNNING;
451 SIMIX_comm_start(other_action);
452 return (detached ? NULL : other_action);
455 void SIMIX_pre_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
456 void *dst_buff, size_t *dst_buff_size,
457 int (*match_fun)(void *, void *, smx_action_t),
458 void *data, double timeout){
459 smx_action_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
460 dst_buff_size, match_fun, data);
461 simcall->mc_value = 0;
462 SIMIX_pre_comm_wait(simcall, comm, timeout);
465 void SIMIX_pre_comm_recv_bounded(smx_simcall_t simcall, smx_rdv_t rdv,
466 void *dst_buff, size_t *dst_buff_size,
467 int (*match_fun)(void *, void *, smx_action_t),
468 void *data, double timeout, double rate){
469 smx_action_t comm = SIMIX_comm_irecv_bounded(simcall->issuer, rdv, dst_buff,
470 dst_buff_size, match_fun, data, rate);
471 simcall->mc_value = 0;
472 SIMIX_pre_comm_wait(simcall, comm, timeout);
475 smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
476 void *dst_buff, size_t *dst_buff_size,
477 int (*match_fun)(void *, void *, smx_action_t),
479 return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
483 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
484 void *dst_buff, size_t *dst_buff_size,
485 int (*match_fun)(void *, void *, smx_action_t), void *data)
487 XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
488 smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
490 smx_action_t other_action;
491 //communication already done, get it inside the fifo of completed comms
492 //permanent receive v1
493 //int already_received=0;
494 if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
496 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
497 //find a match in the already received fifo
498 other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
499 //if not found, assume the receiver came first, register it to the mailbox in the classical way
501 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
502 other_action = this_action;
503 SIMIX_rdv_push(rdv, this_action);
505 if(other_action->comm.surf_comm && SIMIX_comm_get_remains(other_action)==0.0)
507 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
508 other_action->state = SIMIX_DONE;
509 other_action->comm.type = SIMIX_COMM_DONE;
510 other_action->comm.rdv = NULL;
511 //SIMIX_comm_destroy(this_action);
512 //--smx_total_comms; // this creation was a pure waste
513 //already_received=1;
514 //other_action->comm.refcount--;
516 XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
518 other_action->comm.refcount--;
519 SIMIX_comm_destroy(this_action);
520 --smx_total_comms; // this creation was a pure waste
523 /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
525 /* Look for communication action matching our needs. We also provide a description of
526 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
528 * If it is not found then push our communication into the rendez-vous point */
529 other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
532 XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
533 other_action = this_action;
534 SIMIX_rdv_push(rdv, this_action);
536 SIMIX_comm_destroy(this_action);
537 --smx_total_comms; // this creation was a pure waste
538 other_action->state = SIMIX_READY;
539 other_action->comm.type = SIMIX_COMM_READY;
540 //other_action->comm.refcount--;
542 xbt_fifo_push(dst_proc->comms, other_action);
545 /* Setup communication action */
546 other_action->comm.dst_proc = dst_proc;
547 other_action->comm.dst_buff = dst_buff;
548 other_action->comm.dst_buff_size = dst_buff_size;
549 other_action->comm.dst_data = data;
551 other_action->comm.match_fun = match_fun;
554 /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
555 SIMIX_comm_copy_data(other_action);*/
558 if (MC_is_active()) {
559 other_action->state = SIMIX_RUNNING;
563 SIMIX_comm_start(other_action);
568 smx_action_t SIMIX_pre_comm_irecv_bounded(smx_simcall_t simcall, smx_rdv_t rdv,
569 void *dst_buff, size_t *dst_buff_size,
570 int (*match_fun)(void *, void *, smx_action_t),
571 void *data, double rate){
572 return SIMIX_comm_irecv_bounded(simcall->issuer, rdv, dst_buff, dst_buff_size,
573 match_fun, data, rate);
576 smx_action_t SIMIX_comm_irecv_bounded(smx_process_t dst_proc, smx_rdv_t rdv,
577 void *dst_buff, size_t *dst_buff_size,
578 int (*match_fun)(void *, void *, smx_action_t), void *data, double rate)
580 XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
581 smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
583 smx_action_t other_action;
584 //communication already done, get it inside the fifo of completed comms
585 //permanent receive v1
586 //int already_received=0;
587 if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
589 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
590 //find a match in the already received fifo
591 other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
592 //if not found, assume the receiver came first, register it to the mailbox in the classical way
594 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
595 other_action = this_action;
596 SIMIX_rdv_push(rdv, this_action);
598 if(other_action->comm.surf_comm && SIMIX_comm_get_remains(other_action)==0.0)
600 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
601 other_action->state = SIMIX_DONE;
602 other_action->comm.type = SIMIX_COMM_DONE;
603 other_action->comm.rdv = NULL;
604 //SIMIX_comm_destroy(this_action);
605 //--smx_total_comms; // this creation was a pure waste
606 //already_received=1;
607 //other_action->comm.refcount--;
609 XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
611 other_action->comm.refcount--;
612 SIMIX_comm_destroy(this_action);
613 --smx_total_comms; // this creation was a pure waste
616 /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
618 /* Look for communication action matching our needs. We also provide a description of
619 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
621 * If it is not found then push our communication into the rendez-vous point */
622 other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
625 XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
626 other_action = this_action;
627 SIMIX_rdv_push(rdv, this_action);
629 SIMIX_comm_destroy(this_action);
630 --smx_total_comms; // this creation was a pure waste
631 other_action->state = SIMIX_READY;
632 other_action->comm.type = SIMIX_COMM_READY;
633 //other_action->comm.refcount--;
635 xbt_fifo_push(dst_proc->comms, other_action);
638 /* Setup communication action */
639 other_action->comm.dst_proc = dst_proc;
640 other_action->comm.dst_buff = dst_buff;
641 other_action->comm.dst_buff_size = dst_buff_size;
642 other_action->comm.dst_data = data;
644 if (rate < other_action->comm.rate || other_action->comm.rate == -1.0)
645 other_action->comm.rate = rate;
647 other_action->comm.match_fun = match_fun;
650 /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
651 SIMIX_comm_copy_data(other_action);*/
654 if (MC_is_active()) {
655 other_action->state = SIMIX_RUNNING;
659 SIMIX_comm_start(other_action);
664 smx_action_t SIMIX_pre_comm_iprobe(smx_simcall_t simcall, smx_rdv_t rdv,
666 int (*match_fun)(void *, void *, smx_action_t),
668 return SIMIX_comm_iprobe(simcall->issuer, rdv, src, tag, match_fun, data);
671 smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
672 int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
674 XBT_DEBUG("iprobe from %p %p\n", rdv, rdv->comm_fifo);
675 smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
677 smx_action_t other_action=NULL;
678 if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
679 //find a match in the already received fifo
680 XBT_DEBUG("first try in the perm recv mailbox \n");
682 other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
686 XBT_DEBUG("second try in the other mailbox");
687 other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
690 if(other_action)other_action->comm.refcount--;
692 SIMIX_comm_destroy(this_action);
697 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout)
699 /* the simcall may be a wait, a send or a recv */
702 /* Associate this simcall to the wait action */
703 XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
705 xbt_fifo_push(action->simcalls, simcall);
706 simcall->issuer->waiting_action = action;
708 if (MC_is_active()) {
709 int idx = simcall->mc_value;
711 action->state = SIMIX_DONE;
713 /* If we reached this point, the wait simcall must have a timeout */
714 /* Otherwise it shouldn't be enabled and executed by the MC */
718 if (action->comm.src_proc == simcall->issuer)
719 action->state = SIMIX_SRC_TIMEOUT;
721 action->state = SIMIX_DST_TIMEOUT;
724 SIMIX_comm_finish(action);
728 /* If the action has already finish perform the error handling, */
729 /* otherwise set up a waiting timeout on the right side */
730 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
731 SIMIX_comm_finish(action);
732 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
733 sleep = surf_workstation_model->extension.workstation.sleep(simcall->issuer->smx_host, timeout);
734 surf_workstation_model->action_data_set(sleep, action);
736 if (simcall->issuer == action->comm.src_proc)
737 action->comm.src_timeout = sleep;
739 action->comm.dst_timeout = sleep;
743 void SIMIX_pre_comm_test(smx_simcall_t simcall, smx_action_t action)
746 simcall_comm_test__set__result(simcall, action->comm.src_proc && action->comm.dst_proc);
747 if(simcall_comm_test__get__result(simcall)){
748 action->state = SIMIX_DONE;
749 xbt_fifo_push(action->simcalls, simcall);
750 SIMIX_comm_finish(action);
752 SIMIX_simcall_answer(simcall);
757 simcall_comm_test__set__result(simcall, (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING));
758 if (simcall_comm_test__get__result(simcall)) {
759 xbt_fifo_push(action->simcalls, simcall);
760 SIMIX_comm_finish(action);
762 SIMIX_simcall_answer(simcall);
766 void SIMIX_pre_comm_testany(smx_simcall_t simcall, xbt_dynar_t actions)
770 simcall_comm_testany__set__result(simcall, -1);
773 int idx = simcall->mc_value;
775 SIMIX_simcall_answer(simcall);
777 action = xbt_dynar_get_as(actions, idx, smx_action_t);
778 simcall_comm_testany__set__result(simcall, idx);
779 xbt_fifo_push(action->simcalls, simcall);
780 action->state = SIMIX_DONE;
781 SIMIX_comm_finish(action);
786 xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,action) {
787 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
788 simcall_comm_testany__set__result(simcall, cursor);
789 xbt_fifo_push(action->simcalls, simcall);
790 SIMIX_comm_finish(action);
794 SIMIX_simcall_answer(simcall);
797 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, xbt_dynar_t actions)
800 unsigned int cursor = 0;
803 int idx = simcall->mc_value;
804 action = xbt_dynar_get_as(actions, idx, smx_action_t);
805 xbt_fifo_push(action->simcalls, simcall);
806 simcall_comm_waitany__set__result(simcall, idx);
807 action->state = SIMIX_DONE;
808 SIMIX_comm_finish(action);
812 xbt_dynar_foreach(actions, cursor, action){
813 /* associate this simcall to the the action */
814 xbt_fifo_push(action->simcalls, simcall);
816 /* see if the action is already finished */
817 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
818 SIMIX_comm_finish(action);
824 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
827 unsigned int cursor = 0;
828 xbt_dynar_t actions = simcall_comm_waitany__get__comms(simcall);
830 xbt_dynar_foreach(actions, cursor, action) {
831 xbt_fifo_remove(action->simcalls, simcall);
836 * \brief Starts the simulation of a communication action.
837 * \param action the communication action
839 static XBT_INLINE void SIMIX_comm_start(smx_action_t action)
841 /* If both the sender and the receiver are already there, start the communication */
842 if (action->state == SIMIX_READY) {
844 smx_host_t sender = action->comm.src_proc->smx_host;
845 smx_host_t receiver = action->comm.dst_proc->smx_host;
847 XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
848 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
850 action->comm.surf_comm = surf_workstation_model->extension.workstation.
851 communicate(sender, receiver, action->comm.task_size, action->comm.rate);
853 surf_workstation_model->action_data_set(action->comm.surf_comm, action);
855 action->state = SIMIX_RUNNING;
857 /* If a link is failed, detect it immediately */
858 if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
859 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
860 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
861 action->state = SIMIX_LINK_FAILURE;
862 SIMIX_comm_destroy_internal_actions(action);
865 /* If any of the process is suspend, create the action but stop its execution,
866 it will be restarted when the sender process resume */
867 if (SIMIX_process_is_suspended(action->comm.src_proc) ||
868 SIMIX_process_is_suspended(action->comm.dst_proc)) {
869 /* FIXME: check what should happen with the action state */
871 if (SIMIX_process_is_suspended(action->comm.src_proc))
872 XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
873 SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
875 XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
876 SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
878 surf_workstation_model->suspend(action->comm.surf_comm);
885 * \brief Answers the SIMIX simcalls associated to a communication action.
886 * \param action a finished communication action
888 void SIMIX_comm_finish(smx_action_t action)
890 unsigned int destroy_count = 0;
891 smx_simcall_t simcall;
893 while ((simcall = xbt_fifo_shift(action->simcalls))) {
895 /* If a waitany simcall is waiting for this action to finish, then remove
896 it from the other actions in the waitany list. Afterwards, get the
897 position of the actual action in the waitany dynar and
898 return it as the result of the simcall */
899 if (simcall->call == SIMCALL_COMM_WAITANY) {
900 SIMIX_waitany_remove_simcall_from_actions(simcall);
902 simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action));
905 /* If the action is still in a rendez-vous point then remove from it */
906 if (action->comm.rdv)
907 SIMIX_rdv_remove(action->comm.rdv, action);
909 XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
911 /* Check out for errors */
912 switch (action->state) {
915 XBT_DEBUG("Communication %p complete!", action);
916 SIMIX_comm_copy_data(action);
919 case SIMIX_SRC_TIMEOUT:
920 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
921 "Communication timeouted because of sender");
924 case SIMIX_DST_TIMEOUT:
925 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
926 "Communication timeouted because of receiver");
929 case SIMIX_SRC_HOST_FAILURE:
930 if (simcall->issuer == action->comm.src_proc)
931 simcall->issuer->context->iwannadie = 1;
932 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
934 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
937 case SIMIX_DST_HOST_FAILURE:
938 if (simcall->issuer == action->comm.dst_proc)
939 simcall->issuer->context->iwannadie = 1;
940 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
942 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
945 case SIMIX_LINK_FAILURE:
946 XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
948 action->comm.src_proc ? sg_host_name(action->comm.src_proc->smx_host) : NULL,
949 action->comm.dst_proc ? sg_host_name(action->comm.dst_proc->smx_host) : NULL,
950 simcall->issuer->name, simcall->issuer, action->comm.detached);
951 if (action->comm.src_proc == simcall->issuer) {
952 XBT_DEBUG("I'm source");
953 } else if (action->comm.dst_proc == simcall->issuer) {
954 XBT_DEBUG("I'm dest");
956 XBT_DEBUG("I'm neither source nor dest");
958 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
962 if (simcall->issuer == action->comm.dst_proc)
963 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
964 "Communication canceled by the sender");
966 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
967 "Communication canceled by the receiver");
971 xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
974 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
975 if (simcall->issuer->doexception) {
976 if (simcall->call == SIMCALL_COMM_WAITANY) {
977 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action);
979 else if (simcall->call == SIMCALL_COMM_TESTANY) {
980 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &action);
984 if (surf_workstation_model->extension.
985 workstation.get_state(simcall->issuer->smx_host) != SURF_RESOURCE_ON) {
986 simcall->issuer->context->iwannadie = 1;
989 simcall->issuer->waiting_action = NULL;
990 xbt_fifo_remove(simcall->issuer->comms, action);
991 if(action->comm.detached){
992 if(simcall->issuer == action->comm.src_proc){
993 if(action->comm.dst_proc)
994 xbt_fifo_remove(action->comm.dst_proc->comms, action);
996 if(simcall->issuer == action->comm.dst_proc){
997 if(action->comm.src_proc)
998 xbt_fifo_remove(action->comm.src_proc->comms, action);
1001 SIMIX_simcall_answer(simcall);
1005 while (destroy_count-- > 0)
1006 SIMIX_comm_destroy(action);
1010 * \brief This function is called when a Surf communication action is finished.
1011 * \param action the corresponding Simix communication
1013 void SIMIX_post_comm(smx_action_t action)
1015 /* Update action state */
1016 if (action->comm.src_timeout &&
1017 surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
1018 action->state = SIMIX_SRC_TIMEOUT;
1019 else if (action->comm.dst_timeout &&
1020 surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
1021 action->state = SIMIX_DST_TIMEOUT;
1022 else if (action->comm.src_timeout &&
1023 surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
1024 action->state = SIMIX_SRC_HOST_FAILURE;
1025 else if (action->comm.dst_timeout &&
1026 surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
1027 action->state = SIMIX_DST_HOST_FAILURE;
1028 else if (action->comm.surf_comm &&
1029 surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
1030 XBT_DEBUG("Puta madre. Surf says that the link broke");
1031 action->state = SIMIX_LINK_FAILURE;
1033 action->state = SIMIX_DONE;
1035 XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
1036 action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
1038 /* destroy the surf actions associated with the Simix communication */
1039 SIMIX_comm_destroy_internal_actions(action);
1041 /* remove the communication action from the list of pending communications
1042 * of both processes (if they still exist) */
1043 if (action->comm.src_proc) {
1044 xbt_fifo_remove(action->comm.src_proc->comms, action);
1046 if (action->comm.dst_proc) {
1047 xbt_fifo_remove(action->comm.dst_proc->comms, action);
1050 /* if there are simcalls associated with the action, then answer them */
1051 if (xbt_fifo_size(action->simcalls)) {
1052 SIMIX_comm_finish(action);
1056 void SIMIX_pre_comm_cancel(smx_simcall_t simcall, smx_action_t action){
1057 SIMIX_comm_cancel(action);
1059 void SIMIX_comm_cancel(smx_action_t action)
1061 /* if the action is a waiting state means that it is still in a rdv */
1062 /* so remove from it and delete it */
1063 if (action->state == SIMIX_WAITING) {
1064 SIMIX_rdv_remove(action->comm.rdv, action);
1065 action->state = SIMIX_CANCELED;
1067 else if (!MC_is_active() /* when running the MC there are no surf actions */
1068 && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
1070 surf_workstation_model->action_cancel(action->comm.surf_comm);
1074 void SIMIX_comm_suspend(smx_action_t action)
1076 /*FIXME: shall we suspend also the timeout actions? */
1077 if (action->comm.surf_comm)
1078 surf_workstation_model->suspend(action->comm.surf_comm);
1079 /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
1082 void SIMIX_comm_resume(smx_action_t action)
1084 /*FIXME: check what happen with the timeouts */
1085 if (action->comm.surf_comm)
1086 surf_workstation_model->resume(action->comm.surf_comm);
1087 /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
1091 /************* Action Getters **************/
1093 double SIMIX_pre_comm_get_remains(smx_simcall_t simcall, smx_action_t action){
1094 return SIMIX_comm_get_remains(action);
1097 * \brief get the amount remaining from the communication
1098 * \param action The communication
1100 double SIMIX_comm_get_remains(smx_action_t action)
1108 switch (action->state) {
1111 remains = surf_workstation_model->get_remains(action->comm.surf_comm);
1116 remains = 0; /*FIXME: check what should be returned */
1120 remains = 0; /*FIXME: is this correct? */
1126 e_smx_state_t SIMIX_pre_comm_get_state(smx_simcall_t simcall, smx_action_t action){
1127 return SIMIX_comm_get_state(action);
1129 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
1131 return action->state;
1134 void* SIMIX_pre_comm_get_src_data(smx_simcall_t simcall, smx_action_t action){
1135 return SIMIX_comm_get_src_data(action);
1138 * \brief Return the user data associated to the sender of the communication
1139 * \param action The communication
1140 * \return the user data
1142 void* SIMIX_comm_get_src_data(smx_action_t action)
1144 return action->comm.src_data;
1147 void* SIMIX_pre_comm_get_dst_data(smx_simcall_t simcall, smx_action_t action){
1148 return SIMIX_comm_get_dst_data(action);
1151 * \brief Return the user data associated to the receiver of the communication
1152 * \param action The communication
1153 * \return the user data
1155 void* SIMIX_comm_get_dst_data(smx_action_t action)
1157 return action->comm.dst_data;
1160 smx_process_t SIMIX_pre_comm_get_src_proc(smx_simcall_t simcall, smx_action_t action){
1161 return SIMIX_comm_get_src_proc(action);
1163 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
1165 return action->comm.src_proc;
1168 smx_process_t SIMIX_pre_comm_get_dst_proc(smx_simcall_t simcall, smx_action_t action){
1169 return SIMIX_comm_get_dst_proc(action);
1171 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
1173 return action->comm.dst_proc;
#ifdef HAVE_LATENCY_BOUND_TRACKING
/**
 * \brief Simcall handler for SIMIX_comm_is_latency_bounded().
 * \param simcall the simcall being handled (not used)
 * \param action the communication
 */
int SIMIX_pre_comm_is_latency_bounded(smx_simcall_t simcall, smx_action_t action)
{
  (void) simcall;
  return SIMIX_comm_is_latency_bounded(action);
}

/**
 * \brief verify if communication is latency bounded
 * \param action The communication (NULL yields 0)
 * \return action->latency_limited. The field is first refreshed from surf when
 *         the surf communication action exists; otherwise the stored value is
 *         returned unchanged.
 */
XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
{
  if (!action)
    return 0;

  /* only a live surf action can be queried; keep the cached flag otherwise */
  if (action->comm.surf_comm) {
    XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
    action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
    XBT_DEBUG("Action limited is %d", action->latency_limited);
  }
  return action->latency_limited;
}
#endif /* HAVE_LATENCY_BOUND_TRACKING */
1200 /******************************************************************************/
1201 /* SIMIX_comm_copy_data callbacks */
1202 /******************************************************************************/
1203 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
1204 &SIMIX_comm_copy_pointer_callback;
1207 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
1209 SIMIX_comm_copy_data_callback = callback;
1212 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
1214 xbt_assert((buff_size == sizeof(void *)),
1215 "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1216 *(void **) (comm->comm.dst_buff) = buff;
1219 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
1221 XBT_DEBUG("Copy the data over");
1222 memcpy(comm->comm.dst_buff, buff, buff_size);
1223 if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1225 comm->comm.src_buff = NULL;
1231 * \brief Copy the communication data from the sender's buffer to the receiver's one
1232 * \param comm The communication
1234 void SIMIX_comm_copy_data(smx_action_t comm)
1236 size_t buff_size = comm->comm.src_buff_size;
1237 /* If there is no data to be copy then return */
1238 if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1241 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1243 comm->comm.src_proc ? sg_host_name(comm->comm.src_proc->smx_host) : "a finished process",
1244 comm->comm.src_buff,
1245 comm->comm.dst_proc ? sg_host_name(comm->comm.dst_proc->smx_host) : "a finished process",
1246 comm->comm.dst_buff, buff_size);
1248 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1249 if (comm->comm.dst_buff_size)
1250 buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1252 /* Update the receiver's buffer size to the copied amount */
1253 if (comm->comm.dst_buff_size)
1254 *comm->comm.dst_buff_size = buff_size;
1257 SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1259 /* Set the copied flag so we copy data only once */
1260 /* (this function might be called from both communication ends) */
1261 comm->comm.copied = 1;