1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "xbt/replay.h"
12 #include "surf/surf.h"
15 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
// SIMIX matching callback: decide whether the posted receive `a` (ref)
// accepts the incoming send request `b` (req). Honors the MPI wildcards
// MPI_ANY_SOURCE and MPI_ANY_TAG on the receive side.
// NOTE(review): ref and req are dereferenced by XBT_DEBUG *before* the
// xbt_assert NULL checks below — the asserts should come first.
18 static int match_recv(void* a, void* b, smx_action_t ignored) {
19 MPI_Request ref = (MPI_Request)a;
20 MPI_Request req = (MPI_Request)b;
21 XBT_DEBUG("Trying to match a recv of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
23 xbt_assert(ref, "Cannot match recv against null reference");
24 xbt_assert(req, "Cannot match recv against null request");
25 return (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
26 && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
// SIMIX matching callback for the send side: symmetric to match_recv, but
// the wildcards are taken from `req` (the receive request being tested).
// NOTE(review): as in match_recv, the XBT_DEBUG dereferences ref/req
// before the NULL asserts — the asserts should come first.
29 static int match_send(void* a, void* b,smx_action_t ignored) {
30 MPI_Request ref = (MPI_Request)a;
31 MPI_Request req = (MPI_Request)b;
32 XBT_DEBUG("Trying to match a send of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
33 xbt_assert(ref, "Cannot match send against null reference");
34 xbt_assert(req, "Cannot match send against null request");
35 return (req->src == MPI_ANY_SOURCE || req->src == ref->src)
36 && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
// Allocate and initialize an s_smpi_mpi_request_t describing one
// point-to-point communication (src/dst/tag/comm plus payload metadata).
// For datatypes with a subtype (non-contiguous layouts) the payload is
// serialized into a freshly malloc'ed contiguous buffer.
// NOTE(review): this listing is elided — the declaration of `request`,
// `old_buf`, and several field assignments are not visible here.
39 static MPI_Request build_request(void *buf, int count,
40 MPI_Datatype datatype, int src, int dst,
41 int tag, MPI_Comm comm, unsigned flags)
47 request = xbt_new(s_smpi_mpi_request_t, 1);
49 s_smpi_subtype_t *subtype = datatype->substruct;
51 if(datatype->has_subtype == 1){
52 // This part handles the problem of non-contiguous memory
// NOTE(review): malloc result is not checked before serialize writes to it.
54 buf = malloc(count*smpi_datatype_size(datatype));
56 subtype->serialize(old_buf, buf, count, datatype->substruct);
61 // This part handles the problem of non-contiguous memory (for the
62 // unserialization at the reception)
63 request->old_buf = old_buf;
64 request->old_type = datatype;
// size is always the serialized (contiguous) byte count.
66 request->size = smpi_datatype_size(datatype) * count;
71 request->action = NULL;
72 request->flags = flags;
// Reset *status to the "empty" state (wildcard source/tag), as required
// for inactive or null requests. No-op when MPI_STATUS_IGNORE is passed.
81 void smpi_empty_status(MPI_Status * status) {
82 if(status != MPI_STATUS_IGNORE) {
83 status->MPI_SOURCE=MPI_ANY_SOURCE;
84 status->MPI_TAG=MPI_ANY_TAG;
// Replay a trace of MPI actions from the file at `path`. After the replay,
// warn about any per-process action queues that were not fully consumed
// (a sign of premature termination), then reset the queue dictionary.
// NOTE(review): elided listing — the replay call itself and the fclose of
// action_fp are not visible in this view.
89 void smpi_action_trace_run(char *path)
93 xbt_dict_cursor_t cursor;
97 action_fp = fopen(path, "r");
98 xbt_assert(action_fp != NULL, "Cannot open %s: %s", path,
102 if (!xbt_dict_is_empty(action_queues)) {
104 ("Not all actions got consumed. If the simulation ended successfully (without deadlock), you may want to add new processes to your deployment file.");
107 xbt_dict_foreach(action_queues, cursor, name, todo) {
108 XBT_WARN("Still %lu actions for %s", xbt_dynar_length(todo), name);
114 xbt_dict_free(&action_queues);
115 action_queues = xbt_dict_new_homogeneous(NULL);
// void*-typed adapter around smpi_mpi_request_free, used as the SIMIX
// clean-up callback for the userdata of detached sends.
118 static void smpi_mpi_request_free_voidp(void* request)
120 MPI_Request req = request;
121 smpi_mpi_request_free(&req);
124 /* MPI Low level calls */
// Create (but do not start) a persistent send request, as for
// MPI_Send_init. The request is started later via smpi_mpi_start(all).
125 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
126 int dst, int tag, MPI_Comm comm)
128 MPI_Request request =
129 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
130 comm, PERSISTENT | SEND);
// Create (but do not start) a persistent receive request, as for
// MPI_Recv_init. The local rank is recorded as the destination.
135 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
136 int src, int tag, MPI_Comm comm)
138 MPI_Request request =
139 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
140 comm, PERSISTENT | RECV);
// Actually start a previously built request by posting the matching SIMIX
// communication. Receives below the "smpi/async_small_thres" threshold use
// the small (eager) mailbox; sends below that threshold target the remote
// small mailbox, and small sends are additionally detached by copying the
// user buffer so the sender can return immediately.
145 void smpi_mpi_start(MPI_Request request)
150 xbt_assert(!request->action,
151 "Cannot (re)start a non-finished communication");
152 if(request->flags & RECV) {
153 print_request("New recv", request);
154 if (request->size < xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres"))
155 mailbox = smpi_process_mailbox_small();
157 mailbox = smpi_process_mailbox();
159 // FIXME: SIMIX does not yet support non-contiguous datatypes
160 request->action = simcall_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
162 print_request("New send", request);
163 if (request->size < xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres")) { // eager mode
164 mailbox = smpi_process_remote_mailbox_small(
165 smpi_group_index(smpi_comm_group(request->comm), request->dst));
167 XBT_DEBUG("Send request %p is not in the permanent receive mailbox (buf: %p)",request,request->buf);
168 mailbox = smpi_process_remote_mailbox(
169 smpi_group_index(smpi_comm_group(request->comm), request->dst));
171 if (request->size < 64*1024 ) { //(FIXME: this limit should be configurable)
172 void *oldbuf = request->buf;
// NOTE(review): malloc result is not checked before the memcpy below.
174 request->buf = malloc(request->size);
176 memcpy(request->buf,oldbuf,request->size);
177 XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
181 simcall_comm_isend(mailbox, request->size, -1.0,
182 request->buf, request->size,
184 &smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
186 // detach if msg size < eager/rdv switch limit
190 /* FIXME: detached sends are not traceable (request->action == NULL) */
192 simcall_set_category(request->action, TRACE_internal_smpi_get_category());
// Start `count` pre-built requests, as for MPI_Startall.
199 void smpi_mpi_startall(int count, MPI_Request * requests)
203 for(i = 0; i < count; i++) {
204 smpi_mpi_start(requests[i]);
// Free *request and reset the caller's handle to MPI_REQUEST_NULL.
// NOTE(review): elided listing — the actual deallocation is not visible here.
208 void smpi_mpi_request_free(MPI_Request * request)
211 *request = MPI_REQUEST_NULL;
// Build a non-persistent send request without starting it (internal
// helper for isend and for the collective implementations below).
214 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
215 int dst, int tag, MPI_Comm comm)
217 MPI_Request request =
218 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
219 comm, NON_PERSISTENT | SEND);
// MPI_Isend: build a non-persistent send request and start it immediately.
224 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
225 int dst, int tag, MPI_Comm comm)
227 MPI_Request request =
228 smpi_isend_init(buf, count, datatype, dst, tag, comm);
230 smpi_mpi_start(request);
// Build a non-persistent receive request without starting it (internal
// helper for irecv and for the collective implementations below).
234 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
235 int src, int tag, MPI_Comm comm)
237 MPI_Request request =
238 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
239 comm, NON_PERSISTENT | RECV);
// MPI_Irecv: build a non-persistent receive request and start it immediately.
243 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
244 int src, int tag, MPI_Comm comm)
246 MPI_Request request =
247 smpi_irecv_init(buf, count, datatype, src, tag, comm);
249 smpi_mpi_start(request);
// Blocking receive (MPI_Recv): post an irecv then wait for completion.
253 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
254 int tag, MPI_Comm comm, MPI_Status * status)
257 request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
258 smpi_mpi_wait(&request, status);
// Blocking send (MPI_Send): post an isend then wait for completion,
// discarding the status.
263 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
264 int tag, MPI_Comm comm)
268 request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
269 smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
// MPI_Sendrecv: run the send and the receive concurrently as two
// non-persistent requests, wait on both, and report the *receive*
// status to the caller (requests[1] / stats[1]).
// NOTE(review): elided listing — the declaration of `stats` and the
// assignments into requests[0]/requests[1] are not visible here.
272 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
273 int dst, int sendtag, void *recvbuf, int recvcount,
274 MPI_Datatype recvtype, int src, int recvtag,
275 MPI_Comm comm, MPI_Status * status)
277 MPI_Request requests[2];
281 smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
283 smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
284 smpi_mpi_startall(2, requests);
285 smpi_mpi_waitall(2, requests, stats);
286 if(status != MPI_STATUS_IGNORE) {
287 // Copy receive status
288 memcpy(status, &stats[1], sizeof(MPI_Status));
// MPI_Get_count: number of whole datatype elements received, derived from
// the byte count stored in the status.
// NOTE(review): divides without guarding against a zero-size datatype.
292 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
294 return status->count / smpi_datatype_size(datatype);
// Complete a finished request: fill in *status (resolving wildcards from
// the matched sender's request), unserialize non-contiguous data back into
// the user's buffer, free the temporary contiguous buffer on the receive
// side, and free the request itself when it is non-persistent.
297 static void finish_wait(MPI_Request * request, MPI_Status * status)
299 MPI_Request req = *request;
300 // if we have a sender, we should use its data, and not the data from the receive
302 (req->src==MPI_ANY_SOURCE || req->tag== MPI_ANY_TAG))
303 req = (MPI_Request)SIMIX_comm_get_src_data((*request)->action);
305 if(status != MPI_STATUS_IGNORE) {
306 status->MPI_SOURCE = req->src;
307 status->MPI_TAG = req->tag;
308 status->MPI_ERROR = MPI_SUCCESS;
309 // FIXME: really this should just contain the count of receive-type blocks,
311 status->count = req->size;
315 print_request("Finishing", req);
316 MPI_Datatype datatype = req->old_type;
317 if(datatype->has_subtype == 1){
318 // This part handles the problem of non-contiguous memory:
319 // the unserialization at the reception
320 s_smpi_subtype_t *subtype = datatype->substruct;
321 if(req->flags & RECV) {
322 subtype->unserialize(req->buf, req->old_buf, req->size/smpi_datatype_size(datatype) , datatype->substruct);
324 //FIXME: I am not sure that if the send is detached we have to free
325 //the sender buffer thus I do it only for the receiver
326 if(req->flags & RECV) free(req->buf);
329 if(req->flags & NON_PERSISTENT) {
330 smpi_mpi_request_free(request);
// MPI_Test: non-blocking completion check. A request with no action (e.g. a
// detached send) is handled via the elided branch; otherwise ask SIMIX, and
// either finish the wait or report an empty status.
336 int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
339 //assume that request is not MPI_REQUEST_NULL (filtered in PMPI_Test or smpi_mpi_testall before)
340 if ((*request)->action == NULL)
343 flag = simcall_comm_test((*request)->action);
345 finish_wait(request, status);
347 smpi_empty_status(status);
// MPI_Testany: collect the active SIMIX actions (with `map` recording which
// request each one came from), ask SIMIX to test any of them, and finish the
// matching request. *index stays MPI_UNDEFINED when nothing completed.
352 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
359 *index = MPI_UNDEFINED;
362 comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
363 map = xbt_new(int, count);
365 for(i = 0; i < count; i++) {
366 if((requests[i]!=MPI_REQUEST_NULL) && requests[i]->action) {
367 xbt_dynar_push(comms, &requests[i]->action);
373 i = simcall_comm_testany(comms);
374 // not MPI_UNDEFINED, as this is a simix return code
377 finish_wait(&requests[*index], status);
381 //all requests are null or inactive, return true
383 smpi_empty_status(status);
386 xbt_dynar_free(&comms);
// MPI_Testall: test every non-null request through a scratch status `stat`
// (skipped entirely when the caller passed MPI_STATUSES_IGNORE), copying
// each per-request status back into the caller's array.
393 int smpi_mpi_testall(int count, MPI_Request requests[],
397 MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
400 for(i=0; i<count; i++){
401 if(requests[i]!= MPI_REQUEST_NULL){
402 if (smpi_mpi_test(&requests[i], pstat)!=1){
406 smpi_empty_status(pstat);
408 if(status != MPI_STATUSES_IGNORE) {
409 memcpy(&status[i], pstat, sizeof(*pstat));
// Blocking MPI_Probe implemented by polling iprobe with a short simulated
// sleep between attempts (busy waiting — acknowledged by the FIXME).
415 void smpi_mpi_probe(int source, int tag, MPI_Comm comm, MPI_Status* status){
417 //FIXME find another way to avoid busy waiting ?
418 // the issue here is that we have to wait on a nonexistent comm
420 smpi_mpi_iprobe(source, tag, comm, &flag, status);
421 XBT_DEBUG("Busy Waiting on probing : %d", flag);
423 simcall_process_sleep(0.0001);
// MPI_Iprobe: build a throw-away receive request (never started) and use it
// as the matching key for simcall_comm_iprobe. Both the small (eager) and
// the regular mailbox are probed, since the message could be in either.
// On a match, *flag is set (elided) and *status is filled from the matched
// sender's request; the temporary request is freed before returning.
428 void smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status){
429 MPI_Request request =build_request(NULL, 0, MPI_CHAR, source, smpi_comm_rank(comm), tag,
430 comm, NON_PERSISTENT | RECV);
432 // behave like a receive, but don't do it
435 print_request("New iprobe", request);
436 // We have to test both mailboxes as we don't know if we will receive on one or the other
437 if (xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres")>0){
438 mailbox = smpi_process_mailbox_small();
439 XBT_DEBUG("trying to probe the perm recv mailbox");
440 request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
442 if (request->action==NULL){
443 mailbox = smpi_process_mailbox();
444 XBT_DEBUG("trying to probe the other mailbox");
445 request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
449 MPI_Request req = (MPI_Request)SIMIX_comm_get_src_data(request->action);
451 if(status != MPI_STATUS_IGNORE) {
452 status->MPI_SOURCE = req->src;
453 status->MPI_TAG = req->tag;
454 status->MPI_ERROR = MPI_SUCCESS;
455 status->count = req->size;
459 smpi_mpi_request_free(&request);
// MPI_Wait: block on the request's SIMIX action, then finalize it.
// Detached sends have no action and are skipped (see the FIXME: their
// finish_wait/cleanup does not happen here).
464 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
466 print_request("Waiting", *request);
467 if ((*request)->action != NULL) { // this is not a detached send
468 simcall_comm_wait((*request)->action, -1.0);
469 finish_wait(request, status);
471 // FIXME for a detached send, finish_wait is not called:
// MPI_Waitany: gather the active SIMIX actions (with `map` recording the
// originating request index), block until one completes, finish that
// request, and return its index — or MPI_UNDEFINED (with an empty status)
// when no request was active.
474 int smpi_mpi_waitany(int count, MPI_Request requests[],
481 index = MPI_UNDEFINED;
483 // Wait for a request to complete
484 comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
485 map = xbt_new(int, count);
487 XBT_DEBUG("Wait for one of");
488 for(i = 0; i < count; i++) {
489 if((requests[i] != MPI_REQUEST_NULL) && (requests[i]->action != NULL)) {
490 print_request("Waiting any ", requests[i]);
491 xbt_dynar_push(comms, &requests[i]->action);
497 i = simcall_comm_waitany(comms);
499 // not MPI_UNDEFINED, as this is a simix return code
502 finish_wait(&requests[index], status);
506 xbt_dynar_free(&comms);
509 if (index==MPI_UNDEFINED)
510 smpi_empty_status(status);
// MPI_Waitall: first mark null / MPI_PROC_NULL requests with empty statuses
// (a PROC_NULL source is reported explicitly), then drain the remaining
// requests via wait/waitany, copying each completed status back into the
// caller's array unless MPI_STATUSES_IGNORE was passed.
515 void smpi_mpi_waitall(int count, MPI_Request requests[],
520 MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
521 //tag invalid requests in the set
522 for(c = 0; c < count; c++) {
523 if(requests[c]==MPI_REQUEST_NULL || requests[c]->dst == MPI_PROC_NULL ){
524 if(status != MPI_STATUSES_IGNORE)
525 smpi_empty_status(&status[c]);
526 }else if(requests[c]->src == MPI_PROC_NULL ){
527 if(status != MPI_STATUSES_IGNORE) {
528 smpi_empty_status(&status[c]);
529 status[c].MPI_SOURCE=MPI_PROC_NULL;
534 for(c = 0; c < count; c++) {
536 smpi_mpi_wait(&requests[c], pstat);
539 index = smpi_mpi_waitany(count, requests, pstat);
540 if(index == MPI_UNDEFINED) {
543 if(status != MPI_STATUSES_IGNORE) {
544 memcpy(&status[index], pstat, sizeof(*pstat));
// MPI_Waitsome: repeatedly call waitany, recording each completed index in
// `indices` and its status in the caller's array, until waitany reports
// MPI_UNDEFINED (no active requests left).
551 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
556 MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
559 for(i = 0; i < incount; i++)
561 index=smpi_mpi_waitany(incount, requests, pstat);
562 if(index!=MPI_UNDEFINED){
563 indices[count] = index;
565 if(status != MPI_STATUSES_IGNORE) {
566 memcpy(&status[index], pstat, sizeof(*pstat));
569 return MPI_UNDEFINED;
// MPI_Testsome: test every non-null request once, collecting the indices
// and statuses of those that completed; returns MPI_UNDEFINED when every
// request in the set was already dead (null), per the MPI spec.
575 int smpi_mpi_testsome(int incount, MPI_Request requests[], int *indices,
578 int i, count, count_dead;
580 MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
583 for(i = 0; i < incount; i++) {
584 if((requests[i] != MPI_REQUEST_NULL)) {
585 if(smpi_mpi_test(&requests[i], pstat)) {
588 if(status != MPI_STATUSES_IGNORE) {
589 memcpy(&status[i], pstat, sizeof(*pstat));
596 if(count_dead==incount)return MPI_UNDEFINED;
// MPI_Bcast implemented as an n-ary tree broadcast (arity 4).
600 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
603 // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
604 nary_tree_bcast(buf, count, datatype, root, comm, 4);
// MPI_Barrier implemented as an n-ary tree barrier (arity 4).
607 void smpi_mpi_barrier(MPI_Comm comm)
609 // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
610 nary_tree_barrier(comm, 4);
// MPI_Gather: non-root ranks send their buffer to root; root copies its own
// contribution locally and posts one irecv per other rank at the proper
// extent-scaled offset in recvbuf, then waits for all of them.
// NOTE(review): the waitall below passes MPI_STATUS_IGNORE where an
// MPI_Status array (MPI_STATUSES_IGNORE) is expected — verify the macros
// are interchangeable in this codebase.
613 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
614 void *recvbuf, int recvcount, MPI_Datatype recvtype,
615 int root, MPI_Comm comm)
617 int system_tag = 666;
618 int rank, size, src, index;
619 MPI_Aint lb = 0, recvext = 0;
620 MPI_Request *requests;
622 rank = smpi_comm_rank(comm);
623 size = smpi_comm_size(comm);
625 // Send buffer to root
626 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
628 // FIXME: check for errors
629 smpi_datatype_extent(recvtype, &lb, &recvext);
630 // Local copy from root
631 smpi_datatype_copy(sendbuf, sendcount, sendtype,
632 (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
633 // Receive buffers from senders
634 requests = xbt_new(MPI_Request, size - 1);
636 for(src = 0; src < size; src++) {
638 requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext,
640 src, system_tag, comm);
644 // Wait for completion of irecv's.
645 smpi_mpi_startall(size - 1, requests);
646 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
// MPI_Gatherv: like gather, but each rank's block lands at displs[src]
// (in recvtype extents) with its own recvcounts[src] length.
651 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
652 void *recvbuf, int *recvcounts, int *displs,
653 MPI_Datatype recvtype, int root, MPI_Comm comm)
655 int system_tag = 666;
656 int rank, size, src, index;
657 MPI_Aint lb = 0, recvext = 0;
658 MPI_Request *requests;
660 rank = smpi_comm_rank(comm);
661 size = smpi_comm_size(comm);
663 // Send buffer to root
664 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
666 // FIXME: check for errors
667 smpi_datatype_extent(recvtype, &lb, &recvext);
668 // Local copy from root
669 smpi_datatype_copy(sendbuf, sendcount, sendtype,
670 (char *)recvbuf + displs[root] * recvext,
671 recvcounts[root], recvtype);
672 // Receive buffers from senders
673 requests = xbt_new(MPI_Request, size - 1);
675 for(src = 0; src < size; src++) {
678 smpi_irecv_init((char *)recvbuf + displs[src] * recvext,
679 recvcounts[src], recvtype, src, system_tag, comm);
683 // Wait for completion of irecv's.
684 smpi_mpi_startall(size - 1, requests);
685 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
// MPI_Allgather: every rank copies its own block locally, then exchanges
// with all others using one isend + one irecv per peer (2*(size-1)
// requests total), each landing at the peer's rank-scaled offset.
690 void smpi_mpi_allgather(void *sendbuf, int sendcount,
691 MPI_Datatype sendtype, void *recvbuf,
692 int recvcount, MPI_Datatype recvtype,
695 int system_tag = 666;
696 int rank, size, other, index;
697 MPI_Aint lb = 0, recvext = 0;
698 MPI_Request *requests;
700 rank = smpi_comm_rank(comm);
701 size = smpi_comm_size(comm);
702 // FIXME: check for errors
703 smpi_datatype_extent(recvtype, &lb, &recvext);
704 // Local copy from self
705 smpi_datatype_copy(sendbuf, sendcount, sendtype,
706 (char *)recvbuf + rank * recvcount * recvext, recvcount,
708 // Send/Recv buffers to/from others;
709 requests = xbt_new(MPI_Request, 2 * (size - 1));
711 for(other = 0; other < size; other++) {
714 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
717 requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext,
718 recvcount, recvtype, other,
723 // Wait for completion of all comms.
724 smpi_mpi_startall(2 * (size - 1), requests);
725 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
// MPI_Allgatherv: allgather with per-rank counts and displacements; each
// peer's block lands at displs[other] (in recvtype extents).
729 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
730 MPI_Datatype sendtype, void *recvbuf,
731 int *recvcounts, int *displs,
732 MPI_Datatype recvtype, MPI_Comm comm)
734 int system_tag = 666;
735 int rank, size, other, index;
736 MPI_Aint lb = 0, recvext = 0;
737 MPI_Request *requests;
739 rank = smpi_comm_rank(comm);
740 size = smpi_comm_size(comm);
741 // FIXME: check for errors
742 smpi_datatype_extent(recvtype, &lb, &recvext);
743 // Local copy from self
744 smpi_datatype_copy(sendbuf, sendcount, sendtype,
745 (char *)recvbuf + displs[rank] * recvext,
746 recvcounts[rank], recvtype);
747 // Send buffers to others;
748 requests = xbt_new(MPI_Request, 2 * (size - 1));
750 for(other = 0; other < size; other++) {
753 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
757 smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
758 recvtype, other, system_tag, comm);
762 // Wait for completion of all comms.
763 smpi_mpi_startall(2 * (size - 1), requests);
764 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
// MPI_Scatter: mirror of gather — non-root ranks block on a receive from
// root; root copies its own slice locally and posts one isend per other
// rank from the rank-scaled offset in sendbuf, then waits for all.
768 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
769 void *recvbuf, int recvcount, MPI_Datatype recvtype,
770 int root, MPI_Comm comm)
772 int system_tag = 666;
773 int rank, size, dst, index;
774 MPI_Aint lb = 0, sendext = 0;
775 MPI_Request *requests;
777 rank = smpi_comm_rank(comm);
778 size = smpi_comm_size(comm);
780 // Recv buffer from root
781 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
784 // FIXME: check for errors
785 smpi_datatype_extent(sendtype, &lb, &sendext);
786 // Local copy from root
787 smpi_datatype_copy((char *)sendbuf + root * sendcount * sendext,
788 sendcount, sendtype, recvbuf, recvcount, recvtype);
789 // Send buffers to receivers
790 requests = xbt_new(MPI_Request, size - 1);
792 for(dst = 0; dst < size; dst++) {
794 requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext,
795 sendcount, sendtype, dst,
800 // Wait for completion of isend's.
801 smpi_mpi_startall(size - 1, requests);
802 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
// MPI_Scatterv: scatter with per-destination counts and displacements;
// each destination's slice is taken from displs[dst] (in sendtype extents).
807 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
808 MPI_Datatype sendtype, void *recvbuf, int recvcount,
809 MPI_Datatype recvtype, int root, MPI_Comm comm)
811 int system_tag = 666;
812 int rank, size, dst, index;
813 MPI_Aint lb = 0, sendext = 0;
814 MPI_Request *requests;
816 rank = smpi_comm_rank(comm);
817 size = smpi_comm_size(comm);
819 // Recv buffer from root
820 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
823 // FIXME: check for errors
824 smpi_datatype_extent(sendtype, &lb, &sendext);
825 // Local copy from root
826 smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root],
827 sendtype, recvbuf, recvcount, recvtype);
828 // Send buffers to receivers
829 requests = xbt_new(MPI_Request, size - 1);
831 for(dst = 0; dst < size; dst++) {
834 smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst],
835 sendtype, dst, system_tag, comm);
839 // Wait for completion of isend's.
840 smpi_mpi_startall(size - 1, requests);
841 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
// MPI_Reduce: non-root ranks send their contribution to root; root seeds
// recvbuf with its own data, receives each other rank's contribution into
// a private temporary buffer, and folds them into recvbuf with op as each
// receive completes (waitany order), then frees the temporaries.
// NOTE(review): op application order follows completion order, so the
// reduction is only well-defined for commutative/associative ops.
846 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
847 MPI_Datatype datatype, MPI_Op op, int root,
850 int system_tag = 666;
851 int rank, size, src, index;
852 MPI_Aint lb = 0, dataext = 0;
853 MPI_Request *requests;
856 rank = smpi_comm_rank(comm);
857 size = smpi_comm_size(comm);
859 // Send buffer to root
860 smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
862 // FIXME: check for errors
863 smpi_datatype_extent(datatype, &lb, &dataext);
864 // Local copy from root
865 if (sendbuf && recvbuf)
866 smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
867 // Receive buffers from senders
868 //TODO: make a MPI_barrier here ?
869 requests = xbt_new(MPI_Request, size - 1);
870 tmpbufs = xbt_new(void *, size - 1);
872 for(src = 0; src < size; src++) {
874 // FIXME: possibly overkill when we have contiguous/noncontiguous data
876 tmpbufs[index] = xbt_malloc(count * dataext);
878 smpi_irecv_init(tmpbufs[index], count, datatype, src,
883 // Wait for completion of irecv's.
884 smpi_mpi_startall(size - 1, requests);
885 for(src = 0; src < size - 1; src++) {
886 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
887 XBT_DEBUG("finished waiting any request with index %d", index);
888 if(index == MPI_UNDEFINED) {
891 if(op) /* op can be MPI_OP_NULL that does nothing */
892 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
894 for(index = 0; index < size - 1; index++) {
895 xbt_free(tmpbufs[index]);
// MPI_Allreduce implemented naively as reduce-to-rank-0 followed by a
// broadcast of the result from rank 0.
902 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
903 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
905 smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
906 smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
909 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
910 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
912 int system_tag = 666;
913 int rank, size, other, index;
914 MPI_Aint lb = 0, dataext = 0;
915 MPI_Request *requests;
918 rank = smpi_comm_rank(comm);
919 size = smpi_comm_size(comm);
921 // FIXME: check for errors
922 smpi_datatype_extent(datatype, &lb, &dataext);
924 // Local copy from self
925 smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
927 // Send/Recv buffers to/from others;
928 requests = xbt_new(MPI_Request, size - 1);
929 tmpbufs = xbt_new(void *, rank);
931 for(other = 0; other < rank; other++) {
932 // FIXME: possibly overkill we we have contiguous/noncontiguous data
934 tmpbufs[index] = xbt_malloc(count * dataext);
936 smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
940 for(other = rank + 1; other < size; other++) {
942 smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
945 // Wait for completion of all comms.
946 smpi_mpi_startall(size - 1, requests);
947 for(other = 0; other < size - 1; other++) {
948 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
949 if(index == MPI_UNDEFINED) {
953 // #Request is below rank: it's a irecv
954 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
957 for(index = 0; index < rank; index++) {
958 xbt_free(tmpbufs[index]);