1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "xbt/replay.h"
12 #include "surf/surf.h"
/* Registers "smpi_base" as a logging subcategory of "smpi" and makes it the
 * default category of this file (presumably the one used by the XBT_DEBUG /
 * XBT_VERB / XBT_WARN calls below). */
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
15 "Logging specific to SMPI (base)");
/* Matching predicate given to simcall_comm_irecv() and simcall_comm_iprobe()
 * (see smpi_mpi_start and smpi_mpi_iprobe).
 *   a       - the local receive request ("ref"); its src/tag may be the
 *             wildcards MPI_ANY_SOURCE / MPI_ANY_TAG
 *   b       - the candidate request ("req"); presumably the sender's request
 *             handed over by SIMIX - confirm
 *   ignored - unused
 * Returns nonzero iff req's src and tag equal ref's, a wildcard in ref
 * accepting any value. Asserts that neither pointer is NULL. */
17 static int match_recv(void* a, void* b, smx_action_t ignored) {
18 MPI_Request ref = (MPI_Request)a;
19 MPI_Request req = (MPI_Request)b;
21 xbt_assert(ref, "Cannot match recv against null reference");
22 xbt_assert(req, "Cannot match recv against null request");
23 return (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
24 && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
/* Send-side counterpart of match_recv (presumably passed to
 * simcall_comm_isend() - confirm).
 *   a       - the local send request ("ref")
 *   b       - the candidate request ("req"); presumably a posted receive,
 *             which is why the MPI_ANY_SOURCE / MPI_ANY_TAG wildcards are
 *             tested on req rather than on ref
 *   ignored - unused
 * Returns nonzero iff req's src and tag equal ref's, or req uses the
 * corresponding wildcard. Asserts that neither pointer is NULL. */
27 static int match_send(void* a, void* b,smx_action_t ignored) {
28 MPI_Request ref = (MPI_Request)a;
29 MPI_Request req = (MPI_Request)b;
31 xbt_assert(ref, "Cannot match send against null reference");
32 xbt_assert(req, "Cannot match send against null request");
33 return (req->src == MPI_ANY_SOURCE || req->src == ref->src)
34 && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
/* Allocates (xbt_new) and initializes a request describing a point-to-point
 * communication that has not been started yet.
 *   size   - smpi_datatype_size(datatype) * count (presumably bytes); this
 *            assumes a contiguous datatype, see the FIXME
 *   action - NULL until smpi_mpi_start() posts the SIMIX communication
 *   flags  - e.g. PERSISTENT or NON_PERSISTENT combined with SEND or RECV
 * NOTE(review): buf/src/dst/tag/comm are presumably stored too (not all
 * assignments are shown here) - confirm. Requests are released with
 * smpi_mpi_request_free(). */
37 static MPI_Request build_request(void *buf, int count,
38 MPI_Datatype datatype, int src, int dst,
39 int tag, MPI_Comm comm, unsigned flags)
43 request = xbt_new(s_smpi_mpi_request_t, 1);
45 // FIXME: this will have to be changed to support non-contiguous datatypes
46 request->size = smpi_datatype_size(datatype) * count;
51 request->action = NULL;
52 request->flags = flags;
/* Replays the SMPI action trace stored in the file `path` (presumably through
 * the xbt replay API from xbt/replay.h).
 * Opens the file for reading and asserts, with the reason, if it cannot be
 * opened. Afterwards, if action_queues is not empty, warns that not all
 * actions were consumed and logs how many remain for each name. Finally
 * frees action_queues and recreates it as an empty dictionary.
 * NOTE(review): the replay call itself is not visible here; confirm that
 * action_fp is fclose()d. */
60 void smpi_action_trace_run(char *path)
64 xbt_dict_cursor_t cursor;
68 action_fp = fopen(path, "r");
69 xbt_assert(action_fp != NULL, "Cannot open %s: %s", path,
73 if (!xbt_dict_is_empty(action_queues)) {
75 ("Not all actions got consumed. If the simulation ended successfully (without deadlock), you may want to add new processes to your deployment file.");
78 xbt_dict_foreach(action_queues, cursor, name, todo) {
79 XBT_WARN("Still %lu actions for %s", xbt_dynar_length(todo), name);
/* Start over with an empty action_queues dictionary. */
85 xbt_dict_free(&action_queues);
86 action_queues = xbt_dict_new_homogeneous(NULL);
/* void* adapter around smpi_mpi_request_free(). smpi_mpi_start passes it to
 * simcall_comm_isend() as the function that frees the user data (the request)
 * if a detached send fails. Only the local copy of the handle is reset to
 * MPI_REQUEST_NULL. */
89 static void smpi_mpi_request_free_voidp(void* request)
91 MPI_Request req = request;
92 smpi_mpi_request_free(&req);
95 /* MPI Low level calls */
/* Creates a persistent send request (PERSISTENT | SEND) from this process's
 * rank in comm to rank dst, without starting it (its action stays NULL).
 * Post it with smpi_mpi_start(). Presumably returns the new request. */
96 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
97 int dst, int tag, MPI_Comm comm)
100 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
101 comm, PERSISTENT | SEND);
/* Creates a persistent receive request (PERSISTENT | RECV) from rank src
 * (MPI_ANY_SOURCE allowed, see match_recv) to this process's rank in comm,
 * without starting it. Post it with smpi_mpi_start(). Presumably returns the
 * new request. */
106 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
107 int src, int tag, MPI_Comm comm)
109 MPI_Request request =
110 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
111 comm, PERSISTENT | RECV);
/* Posts the SIMIX communication for `request` (a send or a receive).
 * Asserts that the request has no communication in flight.
 * Receive: if request->size is below the "smpi/async_small_thres" config
 *   value, the message is awaited in this process's small mailbox, otherwise
 *   in its regular mailbox. simcall_comm_irecv() matches senders with
 *   match_recv and is given &request->size (presumably updated with the
 *   received size).
 * Send: the same threshold selects the destination's small or regular
 *   mailbox; dst is translated through the communicator's group. Messages
 *   under 64 KiB are copied into a freshly malloc()ed buffer so the send can
 *   be detached from the user buffer.
 * The action is then tagged with the SMPI tracing category. Detached sends
 *   have no action (see the FIXME below). */
116 void smpi_mpi_start(MPI_Request request)
121 xbt_assert(!request->action,
122 "Cannot (re)start a non-finished communication");
123 if(request->flags & RECV) {
124 print_request("New recv", request);
125 if (request->size < xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres"))
126 mailbox = smpi_process_mailbox_small();
128 mailbox = smpi_process_mailbox();
130 // FIXME: SIMIX does not yet support non-contiguous datatypes
131 request->action = simcall_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
133 print_request("New send", request);
/* NOTE(review): the trailing FIXME below looks stale. This threshold already
 * comes from "smpi/async_small_thres", and the decision to detach is made by
 * the separate 64 KiB test further down. */
135 if (request->size < xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres")) { // eager mode => detached send (FIXME: this limit should be configurable)
136 mailbox = smpi_process_remote_mailbox_small(
137 smpi_group_index(smpi_comm_group(request->comm), request->dst));
139 XBT_DEBUG("Send request %p is not in the permanent receive mailbox (buf: %p)",request,request->buf);
140 mailbox = smpi_process_remote_mailbox(
141 smpi_group_index(smpi_comm_group(request->comm), request->dst));
/* NOTE(review): 64*1024 is a hard-coded eager/rendezvous limit, independent
 * of smpi/async_small_thres. The malloc() result below is not checked. */
143 if (request->size < 64*1024 ) { //(FIXME: this limit should be configurable)
144 void *oldbuf = request->buf;
146 request->buf = malloc(request->size);
148 memcpy(request->buf,oldbuf,request->size);
149 XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
153 simcall_comm_isend(mailbox, request->size, -1.0,
154 request->buf, request->size,
156 &smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
158 // detach if msg size < eager/rdv switch limit
162 /* FIXME: detached sends are not traceable (request->action == NULL) */
164 simcall_set_category(request->action, TRACE_internal_smpi_get_category());
/* Starts requests[0] .. requests[count-1], in order, via smpi_mpi_start(). */
171 void smpi_mpi_startall(int count, MPI_Request * requests)
175 for(i = 0; i < count; i++) {
176 smpi_mpi_start(requests[i]);
/* Releases *request and resets the caller's handle to MPI_REQUEST_NULL.
 * NOTE(review): the deallocation statement is not shown here; presumably
 * xbt_free(*request) - confirm. */
180 void smpi_mpi_request_free(MPI_Request * request)
183 *request = MPI_REQUEST_NULL;
/* Creates a non-persistent send request (NON_PERSISTENT | SEND) from this
 * process's rank in comm to rank dst, without starting it. Non-persistent
 * requests are freed by finish_wait() once they complete. Presumably returns
 * the new request. */
186 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
187 int dst, int tag, MPI_Comm comm)
189 MPI_Request request =
190 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
191 comm, NON_PERSISTENT | SEND);
/* Nonblocking send: builds a non-persistent send request with
 * smpi_isend_init() and posts it immediately with smpi_mpi_start(). The
 * caller completes it with smpi_mpi_wait()/smpi_mpi_test() or their
 * variants. */
196 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
197 int dst, int tag, MPI_Comm comm)
199 MPI_Request request =
200 smpi_isend_init(buf, count, datatype, dst, tag, comm);
202 smpi_mpi_start(request);
/* Creates a non-persistent receive request (NON_PERSISTENT | RECV) from rank
 * src (MPI_ANY_SOURCE allowed) to this process's rank in comm, without
 * starting it. Non-persistent requests are freed by finish_wait() once they
 * complete. Presumably returns the new request. */
206 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
207 int src, int tag, MPI_Comm comm)
209 MPI_Request request =
210 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
211 comm, NON_PERSISTENT | RECV);
/* Nonblocking receive: builds a non-persistent receive request with
 * smpi_irecv_init() and posts it immediately with smpi_mpi_start(). The
 * caller completes it with smpi_mpi_wait()/smpi_mpi_test() or their
 * variants. */
216 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
217 int src, int tag, MPI_Comm comm)
219 MPI_Request request =
220 smpi_irecv_init(buf, count, datatype, src, tag, comm);
222 smpi_mpi_start(request);
/* Blocking receive: posts a nonblocking receive (smpi_mpi_irecv) and waits
 * for it. status, which may be MPI_STATUS_IGNORE, is filled by
 * smpi_mpi_wait(). */
226 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
227 int tag, MPI_Comm comm, MPI_Status * status)
231 request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
232 smpi_mpi_wait(&request, status);
/* Blocking send: posts a nonblocking send (smpi_mpi_isend) and waits for it,
 * discarding the completion status. A detached (small) send has no SIMIX
 * action, so smpi_mpi_wait() does not block on it. */
237 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
238 int tag, MPI_Comm comm)
242 request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
243 smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
/* Combined send to dst and receive from src.
 * Both operations are created as non-persistent requests: presumably
 * requests[0] is the send and requests[1] the receive, matching the stats[1]
 * copy below. Both are started before either is waited on, so two peers that
 * sendrecv to each other do not block on ordering.
 * If status is not MPI_STATUS_IGNORE, it receives the receive's status. */
246 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
247 int dst, int sendtag, void *recvbuf, int recvcount,
248 MPI_Datatype recvtype, int src, int recvtag,
249 MPI_Comm comm, MPI_Status * status)
251 MPI_Request requests[2];
255 smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
257 smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
258 smpi_mpi_startall(2, requests);
259 smpi_mpi_waitall(2, requests, stats);
260 if(status != MPI_STATUS_IGNORE) {
261 // Copy receive status
262 memcpy(status, &stats[1], sizeof(MPI_Status));
/* Returns the number of `datatype` elements described by status: its count
 * field (set from the request size by finish_wait / smpi_mpi_iprobe) divided
 * by smpi_datatype_size(datatype).
 * NOTE(review): the integer division silently truncates a partial element,
 * and a zero-size datatype divides by zero. The MPI standard specifies
 * MPI_UNDEFINED for a non-integral count and 0 for a zero-size datatype. */
266 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
268 return status->count / smpi_datatype_size(datatype);
/* Completes a request whose SIMIX communication has finished.
 * When the request used MPI_ANY_SOURCE or MPI_ANY_TAG (part of the condition
 * is on a line not visible here), the sender's request, fetched with
 * SIMIX_comm_get_src_data(), supplies the actual source, tag and size.
 * If status is not MPI_STATUS_IGNORE, fills MPI_SOURCE, MPI_TAG,
 * MPI_ERROR = MPI_SUCCESS and count = req->size.
 * Non-persistent requests are then freed, which resets *request to
 * MPI_REQUEST_NULL.
 * NOTE(review): confirm that req is reset to *request before the
 * NON_PERSISTENT test. Otherwise the flags examined may be the sender's
 * rather than this request's. */
271 static void finish_wait(MPI_Request * request, MPI_Status * status)
273 MPI_Request req = *request;
274 // if we have a sender, we should use its data, and not the data from the receive
276 (req->src==MPI_ANY_SOURCE || req->tag== MPI_ANY_TAG))
277 req = (MPI_Request)SIMIX_comm_get_src_data((*request)->action);
279 if(status != MPI_STATUS_IGNORE) {
280 status->MPI_SOURCE = req->src;
281 status->MPI_TAG = req->tag;
282 status->MPI_ERROR = MPI_SUCCESS;
283 // FIXME: really this should just contain the count of receive-type blocks,
285 status->count = req->size;
289 print_request("Finishing", req);
290 if(req->flags & NON_PERSISTENT) {
291 smpi_mpi_request_free(request);
/* Nonblocking completion check for *request.
 * A request without a SIMIX action (e.g. a detached send) is handled on a
 * separate branch. Otherwise simcall_comm_test() reports whether the
 * communication finished; if it did, smpi_mpi_wait() finalizes it (filling
 * status and freeing non-persistent requests).
 * Presumably returns the flag, nonzero when complete. */
297 int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
300 if ((*request)->action == NULL)
303 flag = simcall_comm_test((*request)->action);
305 smpi_mpi_wait(request, status);
/* Tests, without blocking, whether any of the count requests has completed.
 * *index starts as MPI_UNDEFINED. The SIMIX actions of the requests that have
 * one are gathered into a dynar; map presumably records their positions in
 * requests[]. simcall_comm_testany() picks a finished one, which is then
 * finalized with smpi_mpi_wait() into status.
 * NOTE(review): unlike smpi_mpi_waitany, requests[i] is dereferenced without
 * an MPI_REQUEST_NULL check. Also confirm that map is freed. */
310 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
317 *index = MPI_UNDEFINED;
320 comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
321 map = xbt_new(int, count);
323 for(i = 0; i < count; i++) {
324 if(requests[i]->action) {
325 xbt_dynar_push(comms, &requests[i]->action);
331 i = simcall_comm_testany(comms);
332 // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
333 if(i != MPI_UNDEFINED) {
335 smpi_mpi_wait(&requests[*index], status);
340 xbt_dynar_free(&comms);
/* Tests the count requests for completion, finalizing each finished one with
 * smpi_mpi_wait() into status[i].
 * NOTE(review): i is the position returned by simcall_comm_testany() in the
 * compacted comms dynar, which holds only requests that have an action. It
 * is nevertheless used directly to index requests[] and status[]; the map[]
 * built above is presumably meant to translate it - confirm. &status[i] is
 * also formed even if callers may pass MPI_STATUS_IGNORE. As in testany,
 * requests[i] is dereferenced without an MPI_REQUEST_NULL check. */
346 int smpi_mpi_testall(int count, MPI_Request requests[],
355 comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
356 map = xbt_new(int, count);
358 for(i = 0; i < count; i++) {
359 if(requests[i]->action) {
360 xbt_dynar_push(comms, &requests[i]->action);
367 while(n_finished<size) {
368 i = simcall_comm_testany(comms);
369 if(i != MPI_UNDEFINED) {
370 smpi_mpi_wait(&requests[i], &status[i]);
373 XBT_DEBUG("TestAll, n finished %d on %d to test", n_finished, size);
377 xbt_dynar_free(&comms);
/* Blocking probe implemented by polling. It repeatedly calls
 * smpi_mpi_iprobe(), frees the probe request that call returns, and sleeps
 * 0.0001 simulated seconds (simcall_process_sleep) between attempts,
 * presumably until flag is set. status is filled by the successful
 * iprobe. */
382 void smpi_mpi_probe(int source, int tag, MPI_Comm comm, MPI_Status* status){
384 //FIXME find another way to avoid busy waiting ?
385 // the issue here is that we have to wait on a nonexistent comm
388 request = smpi_mpi_iprobe(source, tag, comm, &flag, status);
389 XBT_DEBUG("Busy Waiting on probing : %d", flag);
391 smpi_mpi_request_free(&request);
392 simcall_process_sleep(0.0001);
/* Nonblocking probe: checks whether a message matching (source, tag) is
 * pending for this rank on comm, without receiving it.
 * A zero-byte NON_PERSISTENT | RECV request on MPI_CHAR serves as the
 * matching reference for match_recv. If "smpi/async_small_thres" > 0, the
 * small mailbox is probed first; the regular mailbox is probed whenever no
 * match was found there (request->action is still NULL).
 * On a match, status (unless MPI_STATUS_IGNORE) is filled from the sender's
 * request: source, tag, MPI_SUCCESS and count = the sender's size.
 * The probe request is returned and must be freed by the caller, as
 * smpi_mpi_probe does. *flag is presumably set according to whether a match
 * was found - confirm. */
397 MPI_Request smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status){
398 MPI_Request request =build_request(NULL, 0, MPI_CHAR, source, smpi_comm_rank(comm), tag,
399 comm, NON_PERSISTENT | RECV);
400 // behave like a receive, but don't do it
403 print_request("New iprobe", request);
404 // We have to test both mailboxes as we don't know if we will receive on one or the other
405 if (xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres")>0){
406 mailbox = smpi_process_mailbox_small();
407 request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
410 if (request->action==NULL){
411 mailbox = smpi_process_mailbox();
412 request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
416 MPI_Request req = (MPI_Request)SIMIX_comm_get_src_data(request->action);
418 if(status != MPI_STATUS_IGNORE) {
419 status->MPI_SOURCE = req->src;
420 status->MPI_TAG = req->tag;
421 status->MPI_ERROR = MPI_SUCCESS;
422 status->count = req->size;
/* Blocks, with no timeout (-1.0), until *request completes, then finalizes
 * it with finish_wait(): status is filled and non-persistent requests are
 * freed.
 * A request whose action is NULL (a detached send) is not waited on. Per the
 * FIXME below, finish_wait() is not called for it, so status is not filled
 * and the request is not freed by finish_wait. */
430 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
432 print_request("Waiting", *request);
433 if ((*request)->action != NULL) { // this is not a detached send
434 simcall_comm_wait((*request)->action, -1.0);
435 finish_wait(request, status);
437 // FIXME for a detached send, finish_wait is not called:
/* Blocks until one of the count requests completes and presumably returns
 * its index in requests[]. The result is MPI_UNDEFINED if nothing could be
 * waited on, presumably when no request is active - confirm.
 * Requests that are MPI_REQUEST_NULL or have no SIMIX action (e.g. detached
 * sends) are skipped. The others' actions are gathered into a dynar, and map
 * presumably records their positions in requests[]. After
 * simcall_comm_waitany(), the chosen request is finalized with finish_wait()
 * into status. */
440 int smpi_mpi_waitany(int count, MPI_Request requests[],
447 index = MPI_UNDEFINED;
449 // Wait for a request to complete
450 comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
451 map = xbt_new(int, count);
453 XBT_DEBUG("Wait for one of");
454 for(i = 0; i < count; i++) {
455 if((requests[i] != MPI_REQUEST_NULL) && (requests[i]->action != NULL)) {
456 print_request("Waiting any ", requests[i]);
457 xbt_dynar_push(comms, &requests[i]->action);
463 i = simcall_comm_waitany(comms);
465 // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
466 if (i != MPI_UNDEFINED) {
468 finish_wait(&requests[index], status);
473 xbt_dynar_free(&comms);
/* Waits for all count requests. status is either MPI_STATUS_IGNORE or an
 * array whose entry status[index] receives the status of request index.
 * A first pass calls smpi_mpi_wait() on some of the requests; the selection
 * condition is on a line not visible here, presumably requests without an
 * action such as detached sends. The rest are then collected one at a time
 * with smpi_mpi_waitany() until it returns MPI_UNDEFINED. */
478 void smpi_mpi_waitall(int count, MPI_Request requests[],
483 MPI_Status *pstat = status == MPI_STATUS_IGNORE ? MPI_STATUS_IGNORE : &stat;
485 for(c = 0; c < count; c++) {
487 smpi_mpi_wait(&requests[c], pstat);
490 index = smpi_mpi_waitany(count, requests, pstat);
491 if(index == MPI_UNDEFINED) {
495 if(status != MPI_STATUS_IGNORE) {
496 memcpy(&status[index], pstat, sizeof(*pstat));
/* Collects the indices of completed requests into indices[] by calling
 * smpi_mpi_testany() incount times. Presumably returns the number of indices
 * stored (count).
 * NOTE(review): testany never blocks, so this can return with no completed
 * request, whereas MPI_Waitsome blocks until at least one completes. The same
 * status pointer is also passed on every iteration, so later completions
 * overwrite earlier statuses instead of filling one entry per completion. */
501 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
507 for(i = 0; i < incount; i++) {
508 if(smpi_mpi_testany(incount, requests, &index, status)) {
509 indices[count] = index;
/* Broadcasts count elements of datatype from root to every rank of comm,
 * using an n-ary tree of arity 4 (nary_tree_bcast). */
516 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
519 // arity=2: a binary tree, arity=4 seems to be a good setting (see P2P-MPI)
520 nary_tree_bcast(buf, count, datatype, root, comm, 4);
/* Synchronizes all ranks of comm using an n-ary tree of arity 4
 * (nary_tree_barrier). */
523 void smpi_mpi_barrier(MPI_Comm comm)
525 // arity=2: a binary tree, arity=4 seems to be a good setting (see P2P-MPI)
526 nary_tree_barrier(comm, 4);
/* Gathers sendcount elements from every rank into recvbuf on root. Rank i's
 * block lands at byte offset i * recvcount * extent(recvtype).
 * Non-root ranks send their buffer to root with the internal tag 666. Root
 * copies its own block locally, prepares one irecv per other rank (size - 1
 * requests), then starts them all and waits for completion.
 * NOTE(review): i * recvcount is evaluated in int before being scaled by the
 * extent. */
529 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
530 void *recvbuf, int recvcount, MPI_Datatype recvtype,
531 int root, MPI_Comm comm)
533 int system_tag = 666;
534 int rank, size, src, index;
535 MPI_Aint lb = 0, recvext = 0;
536 MPI_Request *requests;
538 rank = smpi_comm_rank(comm);
539 size = smpi_comm_size(comm);
541 // Send buffer to root
542 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
544 // FIXME: check for errors
545 smpi_datatype_extent(recvtype, &lb, &recvext);
546 // Local copy from root
547 smpi_datatype_copy(sendbuf, sendcount, sendtype,
548 (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
549 // Receive buffers from senders
550 requests = xbt_new(MPI_Request, size - 1);
552 for(src = 0; src < size; src++) {
554 requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext,
556 src, system_tag, comm);
560 // Wait for completion of irecv's.
561 smpi_mpi_startall(size - 1, requests);
562 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* Variable-size gather: rank i's contribution (recvcounts[i] elements of
 * recvtype) lands in recvbuf on root at byte offset
 * displs[i] * extent(recvtype).
 * Non-root ranks send their buffer to root with the internal tag 666. Root
 * copies its own block locally, prepares one irecv per other rank (size - 1
 * requests), then starts them all and waits for completion. */
567 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
568 void *recvbuf, int *recvcounts, int *displs,
569 MPI_Datatype recvtype, int root, MPI_Comm comm)
571 int system_tag = 666;
572 int rank, size, src, index;
573 MPI_Aint lb = 0, recvext = 0;
574 MPI_Request *requests;
576 rank = smpi_comm_rank(comm);
577 size = smpi_comm_size(comm);
579 // Send buffer to root
580 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
582 // FIXME: check for errors
583 smpi_datatype_extent(recvtype, &lb, &recvext);
584 // Local copy from root
585 smpi_datatype_copy(sendbuf, sendcount, sendtype,
586 (char *)recvbuf + displs[root] * recvext,
587 recvcounts[root], recvtype);
588 // Receive buffers from senders
589 requests = xbt_new(MPI_Request, size - 1);
591 for(src = 0; src < size; src++) {
594 smpi_irecv_init((char *)recvbuf + displs[src] * recvext,
595 recvcounts[src], recvtype, src, system_tag, comm);
599 // Wait for completion of irecv's.
600 smpi_mpi_startall(size - 1, requests);
601 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* All-to-all gather: every rank ends up with every rank's sendbuf in recvbuf.
 * Rank i's block is at byte offset i * recvcount * extent(recvtype).
 * Each rank copies its own block locally. For every other rank it prepares
 * an isend of sendbuf and an irecv into that rank's slot, using the internal
 * tag 666 (2 * (size - 1) requests). It then starts them all and waits for
 * completion. */
606 void smpi_mpi_allgather(void *sendbuf, int sendcount,
607 MPI_Datatype sendtype, void *recvbuf,
608 int recvcount, MPI_Datatype recvtype,
611 int system_tag = 666;
612 int rank, size, other, index;
613 MPI_Aint lb = 0, recvext = 0;
614 MPI_Request *requests;
616 rank = smpi_comm_rank(comm);
617 size = smpi_comm_size(comm);
618 // FIXME: check for errors
619 smpi_datatype_extent(recvtype, &lb, &recvext);
620 // Local copy from self
621 smpi_datatype_copy(sendbuf, sendcount, sendtype,
622 (char *)recvbuf + rank * recvcount * recvext, recvcount,
624 // Send/Recv buffers to/from others;
625 requests = xbt_new(MPI_Request, 2 * (size - 1));
627 for(other = 0; other < size; other++) {
630 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
633 requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext,
634 recvcount, recvtype, other,
639 // Wait for completion of all comms.
640 smpi_mpi_startall(2 * (size - 1), requests);
641 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
/* Variable-size all-to-all gather: rank i's recvcounts[i] elements land in
 * every rank's recvbuf at byte offset displs[i] * extent(recvtype).
 * Each rank copies its own block locally. For every other rank it prepares
 * an isend of sendbuf and an irecv into that rank's slot, using the internal
 * tag 666 (2 * (size - 1) requests). It then starts them all and waits for
 * completion. */
645 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
646 MPI_Datatype sendtype, void *recvbuf,
647 int *recvcounts, int *displs,
648 MPI_Datatype recvtype, MPI_Comm comm)
650 int system_tag = 666;
651 int rank, size, other, index;
652 MPI_Aint lb = 0, recvext = 0;
653 MPI_Request *requests;
655 rank = smpi_comm_rank(comm);
656 size = smpi_comm_size(comm);
657 // FIXME: check for errors
658 smpi_datatype_extent(recvtype, &lb, &recvext);
659 // Local copy from self
660 smpi_datatype_copy(sendbuf, sendcount, sendtype,
661 (char *)recvbuf + displs[rank] * recvext,
662 recvcounts[rank], recvtype);
663 // Send/Recv buffers to/from others;
664 requests = xbt_new(MPI_Request, 2 * (size - 1));
666 for(other = 0; other < size; other++) {
669 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
673 smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
674 recvtype, other, system_tag, comm);
678 // Wait for completion of all comms.
679 smpi_mpi_startall(2 * (size - 1), requests);
680 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
/* Scatters root's sendbuf: rank i receives the block at byte offset
 * i * sendcount * extent(sendtype).
 * Non-root ranks receive their block from root with the internal tag 666.
 * Root copies its own block locally, prepares one isend per other rank
 * (size - 1 requests), then starts them all and waits for completion. */
684 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
685 void *recvbuf, int recvcount, MPI_Datatype recvtype,
686 int root, MPI_Comm comm)
688 int system_tag = 666;
689 int rank, size, dst, index;
690 MPI_Aint lb = 0, sendext = 0;
691 MPI_Request *requests;
693 rank = smpi_comm_rank(comm);
694 size = smpi_comm_size(comm);
696 // Recv buffer from root
697 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
700 // FIXME: check for errors
701 smpi_datatype_extent(sendtype, &lb, &sendext);
702 // Local copy from root
703 smpi_datatype_copy((char *)sendbuf + root * sendcount * sendext,
704 sendcount, sendtype, recvbuf, recvcount, recvtype);
705 // Send buffers to receivers
706 requests = xbt_new(MPI_Request, size - 1);
708 for(dst = 0; dst < size; dst++) {
710 requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext,
711 sendcount, sendtype, dst,
716 // Wait for completion of isend's.
717 smpi_mpi_startall(size - 1, requests);
718 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* Variable-size scatter: rank i receives sendcounts[i] elements taken from
 * root's sendbuf at byte offset displs[i] * extent(sendtype).
 * Non-root ranks receive their block from root with the internal tag 666.
 * Root copies its own block locally, prepares one isend per other rank
 * (size - 1 requests), then starts them all and waits for completion. */
723 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
724 MPI_Datatype sendtype, void *recvbuf, int recvcount,
725 MPI_Datatype recvtype, int root, MPI_Comm comm)
727 int system_tag = 666;
728 int rank, size, dst, index;
729 MPI_Aint lb = 0, sendext = 0;
730 MPI_Request *requests;
732 rank = smpi_comm_rank(comm);
733 size = smpi_comm_size(comm);
735 // Recv buffer from root
736 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
739 // FIXME: check for errors
740 smpi_datatype_extent(sendtype, &lb, &sendext);
741 // Local copy from root
742 smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root],
743 sendtype, recvbuf, recvcount, recvtype);
744 // Send buffers to receivers
745 requests = xbt_new(MPI_Request, size - 1);
747 for(dst = 0; dst < size; dst++) {
750 smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst],
751 sendtype, dst, system_tag, comm);
755 // Wait for completion of isend's.
756 smpi_mpi_startall(size - 1, requests);
757 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
/* Reduces count elements from every rank into recvbuf on root using op.
 * Non-root ranks send sendbuf to root with the internal tag 666. Root copies
 * its own sendbuf into recvbuf when both are non-NULL. It receives every
 * other contribution into a temporary buffer of count * extent bytes and
 * folds each one into recvbuf with smpi_op_apply() as soon as
 * smpi_mpi_waitany() reports it. The application is skipped if op is null.
 * The temporary buffers are freed at the end.
 * NOTE(review): contributions are combined in completion order, not rank
 * order, so a non-commutative op gives order-dependent results. MPI requires
 * rank order for non-commutative reductions. */
762 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
763 MPI_Datatype datatype, MPI_Op op, int root,
766 int system_tag = 666;
767 int rank, size, src, index;
768 MPI_Aint lb = 0, dataext = 0;
769 MPI_Request *requests;
772 rank = smpi_comm_rank(comm);
773 size = smpi_comm_size(comm);
775 // Send buffer to root
776 smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
778 // FIXME: check for errors
779 smpi_datatype_extent(datatype, &lb, &dataext);
780 // Local copy from root
781 if (sendbuf && recvbuf)
782 smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
783 // Receive buffers from senders
784 //TODO: make a MPI_barrier here ?
785 requests = xbt_new(MPI_Request, size - 1);
786 tmpbufs = xbt_new(void *, size - 1);
788 for(src = 0; src < size; src++) {
790 // FIXME: possibly overkill when we have contiguous/noncontiguous data
792 tmpbufs[index] = xbt_malloc(count * dataext);
794 smpi_irecv_init(tmpbufs[index], count, datatype, src,
799 // Wait for completion of irecv's.
800 smpi_mpi_startall(size - 1, requests);
801 for(src = 0; src < size - 1; src++) {
802 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
803 XBT_VERB("finished waiting any request with index %d", index);
804 if(index == MPI_UNDEFINED) {
807 if(op) /* op can be MPI_OP_NULL that does nothing */
808 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
810 for(index = 0; index < size - 1; index++) {
811 xbt_free(tmpbufs[index]);
/* Allreduce implemented as smpi_mpi_reduce() to rank 0 followed by
 * smpi_mpi_bcast() of the result from rank 0. It therefore inherits the
 * reduce's combination order. */
818 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
819 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
821 smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
822 smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
825 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
826 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
828 int system_tag = 666;
829 int rank, size, other, index;
830 MPI_Aint lb = 0, dataext = 0;
831 MPI_Request *requests;
834 rank = smpi_comm_rank(comm);
835 size = smpi_comm_size(comm);
837 // FIXME: check for errors
838 smpi_datatype_extent(datatype, &lb, &dataext);
840 // Local copy from self
841 smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
843 // Send/Recv buffers to/from others;
844 requests = xbt_new(MPI_Request, size - 1);
845 tmpbufs = xbt_new(void *, rank);
847 for(other = 0; other < rank; other++) {
848 // FIXME: possibly overkill we we have contiguous/noncontiguous data
850 tmpbufs[index] = xbt_malloc(count * dataext);
852 smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
856 for(other = rank + 1; other < size; other++) {
858 smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
861 // Wait for completion of all comms.
862 smpi_mpi_startall(size - 1, requests);
863 for(other = 0; other < size - 1; other++) {
864 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
865 if(index == MPI_UNDEFINED) {
869 // #Request is below rank: it's a irecv
870 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
873 for(index = 0; index < rank; index++) {
874 xbt_free(tmpbufs[index]);