1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
// Default log category for this file ("smpi_base"), registered as a
// subcategory of "smpi".
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
11 "Logging specific to SMPI (base)");
// Declarations of the SMPI log categories (presumably defined in other SMPI
// source files -- confirm) so they can be referenced from this file.
// NOTE(review): smpi_base is declared external right after being defined
// above; this looks redundant -- confirm it is intentional.
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
15 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
16 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
17 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
18 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
19 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
20 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
// Allocate and initialize a new (not yet started) request object.
//
// buf/count/datatype describe the message buffer, src/dst are ranks in comm,
// and flags combines PERSISTENT or NON_PERSISTENT with SEND or RECV.
// The message size is smpi_datatype_size(datatype) * count. A new request is
// not complete and has no matching request yet.
// The request is heap-allocated with xbt_new; presumably released through
// smpi_mpi_request_free -- confirm.
// NOTE(review): if the size product is computed in int, it can overflow for
// very large counts -- confirm whether callers bound count.
22 static MPI_Request build_request(void *buf, int count,
23 MPI_Datatype datatype, int src, int dst,
24 int tag, MPI_Comm comm, unsigned flags)
28 request = xbt_new(s_smpi_mpi_request_t, 1);
30 request->size = smpi_datatype_size(datatype) * count;
37 request->complete = 0;
// No partner yet; presumably filled in when the send/recv pair is matched.
38 request->match = MPI_REQUEST_NULL;
39 request->flags = flags;
47 /* MPI Low level calls */
// Create a persistent send request from the calling rank
// (smpi_comm_rank(comm)) to rank dst. The request is only built, not
// started; use smpi_mpi_start or smpi_mpi_startall. finish_wait only frees
// NON_PERSISTENT requests, so this one survives completion and can be
// restarted.
48 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
49 int dst, int tag, MPI_Comm comm)
52 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
53 comm, PERSISTENT | SEND);
// Create a persistent receive request from rank src to the calling rank
// (smpi_comm_rank(comm)). The request is only built, not started; use
// smpi_mpi_start or smpi_mpi_startall. finish_wait only frees NON_PERSISTENT
// requests, so this one survives completion and can be restarted.
58 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
59 int src, int tag, MPI_Comm comm)
62 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
63 comm, PERSISTENT | RECV);
// Start (post) a request built by one of the *_init functions.
// Receive requests are registered with smpi_process_post_recv and an
// asynchronous SIMIX receive into request->buf is posted on request->rdv.
// Send requests are registered with smpi_process_post_send and an
// asynchronous SIMIX send of request->size bytes from request->buf is posted
// on request->rdv. The SIMIX handles are presumably stored in request->pair,
// which smpi_mpi_wait later waits on -- confirm.
// NOTE(review): the assertion rejects requests whose complete flag is set,
// but its message says "non-finished"; the wording looks inverted.
68 void smpi_mpi_start(MPI_Request request)
70 xbt_assert0(request->complete == 0,
71 "Cannot start a non-finished communication");
72 if ((request->flags & RECV) == RECV) {
73 smpi_process_post_recv(request);
74 print_request("New recv", request);
76 SIMIX_network_irecv(request->rdv, request->buf, &request->size);
// Send side (the else branch of the RECV test).
78 smpi_process_post_send(request->comm, request); // FIXME
79 print_request("New send", request);
// request->size is passed both as the simulated amount and as the source
// buffer size; -1.0 presumably means "no rate limit" -- confirm in SIMIX.
81 SIMIX_network_isend(request->rdv, request->size, -1.0,
82 request->buf, request->size, NULL);
// Start requests[0..count-1] in array order via smpi_mpi_start.
86 void smpi_mpi_startall(int count, MPI_Request * requests)
90 for (i = 0; i < count; i++) {
91 smpi_mpi_start(requests[i]);
// Release a request and reset the caller's handle to MPI_REQUEST_NULL so
// the stale pointer cannot be reused. The request object itself is presumably
// freed with xbt_free just before the reset -- confirm.
95 void smpi_mpi_request_free(MPI_Request * request)
98 *request = MPI_REQUEST_NULL;
// Create (but do not start) a NON_PERSISTENT send request from the calling
// rank to dst. finish_wait frees non-persistent requests once they complete,
// resetting the caller's handle to MPI_REQUEST_NULL.
101 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
102 int dst, int tag, MPI_Comm comm)
104 MPI_Request request =
105 build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
106 comm, NON_PERSISTENT | SEND);
// Non-blocking send: build a non-persistent send request with
// smpi_isend_init and start it immediately. Complete it with
// smpi_mpi_wait, smpi_mpi_test or their any/all/some variants.
111 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
112 int dst, int tag, MPI_Comm comm)
114 MPI_Request request =
115 smpi_isend_init(buf, count, datatype, dst, tag, comm);
117 smpi_mpi_start(request);
// Create (but do not start) a NON_PERSISTENT receive request from rank src
// to the calling rank. finish_wait frees non-persistent requests once they
// complete, resetting the caller's handle to MPI_REQUEST_NULL.
121 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
122 int src, int tag, MPI_Comm comm)
124 MPI_Request request =
125 build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
126 comm, NON_PERSISTENT | RECV);
// Non-blocking receive: build a non-persistent receive request with
// smpi_irecv_init and start it immediately. Complete it with
// smpi_mpi_wait, smpi_mpi_test or their any/all/some variants.
131 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
132 int src, int tag, MPI_Comm comm)
134 MPI_Request request =
135 smpi_irecv_init(buf, count, datatype, src, tag, comm);
137 smpi_mpi_start(request);
// Blocking receive: post a non-blocking receive and wait for it. *status is
// filled by smpi_mpi_wait unless it is MPI_STATUS_IGNORE.
141 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
142 int tag, MPI_Comm comm, MPI_Status * status)
146 request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
147 smpi_mpi_wait(&request, status);
// Blocking send: post a non-blocking send and wait for it to complete; the
// resulting status is discarded.
150 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
151 int tag, MPI_Comm comm)
155 request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
156 smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
// Combined blocking send to dst and receive from src on comm.
// Builds a non-persistent send and a non-persistent receive (presumably
// requests[0] and requests[1]), starts both, and waits for both with
// smpi_mpi_waitall. The receive's status, expected in stats[1], is copied to
// *status unless it is MPI_STATUS_IGNORE.
// NOTE(review): smpi_mpi_waitall moves the last request into the slot of each
// completed one. If the send (slot 0) completes first, the receive is then
// reported in slot 0 and stats[1] is never written, so an uninitialized
// status may be copied out. Consider initializing stats and keeping waitall's
// statuses aligned with their original requests.
159 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
160 int dst, int sendtag, void *recvbuf, int recvcount,
161 MPI_Datatype recvtype, int src, int recvtag,
162 MPI_Comm comm, MPI_Status * status)
164 MPI_Request requests[2];
168 smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
170 smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
171 smpi_mpi_startall(2, requests);
172 smpi_mpi_waitall(2, requests, stats);
173 if (status != MPI_STATUS_IGNORE) {
174 // Copy receive status
175 memcpy(status, &stats[1], sizeof(MPI_Status));
// Number of datatype elements described by status: status->count (set by
// finish_wait from SIMIX_communication_get_dst_buf_size, presumably a byte
// count) divided by smpi_datatype_size(datatype). This is integer division,
// so a trailing partial element is dropped.
// NOTE(review): MPI_Get_count returns MPI_UNDEFINED when the byte count is
// not a multiple of the type size, and 0 for zero-size types. This code
// returns the truncated quotient and divides by zero for a zero-size type.
179 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
181 return status->count / smpi_datatype_size(datatype);
// Complete a request whose SIMIX communication ((*request)->pair) has
// finished. Shared by smpi_mpi_wait and smpi_mpi_waitany.
//  - Unless status is MPI_STATUS_IGNORE, fill it with the request's src and
//    tag, MPI_SUCCESS, and the destination buffer size reported by SIMIX.
//  - Destroy the SIMIX communication.
//  - If the request is already marked complete, destroy its rendezvous
//    point. Otherwise (presumably the else branch), mark the matching
//    request complete and clear its back-link to this request.
//  - NON_PERSISTENT requests are freed, leaving *request == MPI_REQUEST_NULL.
//    Otherwise (presumably the else branch), rdv and pair are cleared so the
//    persistent request can be started again.
// NOTE(review): MPI_SOURCE/MPI_TAG are copied from the request as posted, so
// a receive posted with a wildcard source or tag would not report the actual
// sender or tag -- confirm whether wildcards can reach this point.
184 static void finish_wait(MPI_Request * request, MPI_Status * status)
186 if (status != MPI_STATUS_IGNORE) {
187 status->MPI_SOURCE = (*request)->src;
188 status->MPI_TAG = (*request)->tag;
189 status->MPI_ERROR = MPI_SUCCESS;
190 status->count = SIMIX_communication_get_dst_buf_size((*request)->pair);
192 SIMIX_communication_destroy((*request)->pair);
193 print_request("finishing wait", *request);
// This appears to let whichever side of a matched send/recv finishes last
// destroy the rendezvous point, while the first side only flags its
// partner -- confirm.
194 if ((*request)->complete == 1) {
195 SIMIX_rdv_destroy((*request)->rdv);
197 (*request)->match->complete = 1;
198 (*request)->match->match = MPI_REQUEST_NULL;
200 if (((*request)->flags & NON_PERSISTENT) == NON_PERSISTENT) {
201 smpi_mpi_request_free(request);
// Persistent requests only: *request is still valid here.
203 (*request)->rdv = NULL;
204 (*request)->pair = NULL;
// Non-blocking test of a single request. flag is read from
// (*request)->complete. Presumably only when flag is set, the request is
// finished through smpi_mpi_wait (filling *status), and flag is returned --
// confirm.
208 int smpi_mpi_test(MPI_Request * request, MPI_Status * status)
210 int flag = (*request)->complete;
213 smpi_mpi_wait(request, status);
// Non-blocking check of requests[0..count-1]. *index starts as MPI_UNDEFINED.
// A non-null request already marked complete is finished with smpi_mpi_wait,
// filling *status. Presumably *index is then set to i and a true flag is
// returned -- confirm.
218 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
223 *index = MPI_UNDEFINED;
225 for (i = 0; i < count; i++) {
226 if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
227 smpi_mpi_wait(&requests[i], status);
// Block until the request's SIMIX communication completes (timeout -1.0,
// presumably "no timeout"), then complete it with finish_wait. finish_wait
// fills *status unless it is MPI_STATUS_IGNORE and frees non-persistent
// requests, leaving *request == MPI_REQUEST_NULL.
236 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
238 print_request("wait", *request);
239 SIMIX_network_wait((*request)->pair, -1.0);
240 finish_wait(request, status);
// Wait for any one of requests[0..count-1] to complete and return its index,
// or MPI_UNDEFINED when there is nothing to wait for. Callers such as
// smpi_mpi_waitall and smpi_mpi_reduce test the result against MPI_UNDEFINED.
// Fast path: a non-null request already marked complete is finished with
// smpi_mpi_wait. Otherwise, the SIMIX communications of all non-null,
// not-yet-complete requests are pushed into a dynar, and
// SIMIX_network_waitany blocks until one of them finishes.
// NOTE(review): make sure map is freed on every path, and that an empty
// comms dynar is never passed to SIMIX_network_waitany.
243 int smpi_mpi_waitany(int count, MPI_Request requests[],
250 index = MPI_UNDEFINED;
252 // First check for already completed requests
253 for (i = 0; i < count; i++) {
254 if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
// index is presumably set to i just before this call.
256 smpi_mpi_wait(&requests[index], status);
260 if (index == MPI_UNDEFINED) {
261 // Otherwise, wait for a request to complete
262 comms = xbt_dynar_new(sizeof(smx_comm_t), NULL);
263 map = xbt_new(int, count);
265 DEBUG0("Wait for one of");
266 for (i = 0; i < count; i++) {
267 if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) {
268 print_request(" ", requests[i]);
269 xbt_dynar_push(comms, &requests[i]->pair);
// SIMIX_network_waitany returns a position in comms. It is presumably
// translated back to a requests[] index through map before use -- confirm.
275 index = SIMIX_network_waitany(comms);
277 finish_wait(&requests[index], status);
280 xbt_dynar_free(&comms);
// Wait until every request in requests[0..count-1] has completed.
// Presumably loops over smpi_mpi_waitany until it reports MPI_UNDEFINED.
// Each completed request's status is copied to status[index] unless status is
// MPI_STATUS_IGNORE. The last slot's request is then moved into the completed
// slot and the last slot is cleared.
// NOTE(review): after that move, a later completion of the moved request is
// reported at its new slot. status[i] therefore no longer necessarily
// describes the request originally at requests[i], as MPI_Waitall requires,
// and some status entries may never be written. For example, in
// smpi_mpi_sendrecv, if the send in slot 0 finishes first, both statuses land
// in slot 0. finish_wait already resets completed non-persistent requests to
// MPI_REQUEST_NULL, which smpi_mpi_waitany skips, so the move looks
// unnecessary -- confirm for persistent requests before removing it.
286 void smpi_mpi_waitall(int count, MPI_Request requests[],
293 index = smpi_mpi_waitany(count, requests, &stat);
// No active request left: presumably leave the loop.
294 if (index == MPI_UNDEFINED) {
297 if (status != MPI_STATUS_IGNORE) {
298 memcpy(&status[index], &stat, sizeof(stat));
300 // FIXME: check this -v (breaks status/request index correspondence)
301 // Move the last request to the found position
302 requests[index] = requests[count - 1];
303 requests[count - 1] = MPI_REQUEST_NULL;
// Finish every request in requests[0..incount-1] that is non-null and already
// marked complete, via smpi_mpi_wait. status[i] receives that request's
// status; the condition choosing between &status[i] and MPI_STATUS_IGNORE is
// split across lines and is presumably "status != MPI_STATUS_IGNORE".
// Completed indices are presumably recorded in indices[] and their count
// returned -- confirm.
// NOTE(review): this loop only handles requests that are already complete.
// MPI_Waitsome must block until at least one active request completes --
// confirm that a blocking path exists.
308 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
314 for (i = 0; i < incount; i++) {
315 if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
316 smpi_mpi_wait(&requests[i],
318 MPI_STATUS_IGNORE ? &status[i] : MPI_STATUS_IGNORE);
// Broadcast count elements of datatype from root to every rank of comm,
// delegating to nary_tree_bcast with a tree arity of 4.
326 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
329 // arity=2 gives a binary tree; arity=4 seems to be a good setting (see P2P-MPI)
330 nary_tree_bcast(buf, count, datatype, root, comm, 4);
// Barrier across all ranks of comm, delegating to nary_tree_barrier with a
// tree arity of 4.
333 void smpi_mpi_barrier(MPI_Comm comm)
335 // arity=2 gives a binary tree; arity=4 seems to be a good setting (see P2P-MPI)
336 nary_tree_barrier(comm, 4);
// Gather: every rank contributes sendcount elements of sendtype. Root stores
// rank r's contribution in recvbuf at byte offset r * recvcount * size(recvtype).
// Non-root ranks do a blocking send to root. Root copies its own contribution
// locally, then posts one receive per other rank (size - 1 in total), starts
// them all and waits for all of them.
// NOTE(review): the internal tag system_tag (666) is used on the user's comm,
// so a user message on comm with tag 666 could be matched by these
// sends/receives. Consider a reserved tag or a separate context.
339 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
340 void *recvbuf, int recvcount, MPI_Datatype recvtype,
341 int root, MPI_Comm comm)
343 int system_tag = 666;
344 int rank, size, src, index, sendsize, recvsize;
345 MPI_Request *requests;
347 rank = smpi_comm_rank(comm);
348 size = smpi_comm_size(comm);
// Non-root ranks (presumably guarded by rank != root):
350 // Send buffer to root
351 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
// Root only from here on (presumably the else branch).
353 sendsize = smpi_datatype_size(sendtype);
354 recvsize = smpi_datatype_size(recvtype);
// Copies sendcount * size(sendtype) bytes into a slot sized
// recvcount * size(recvtype); assumes the two agree, as MPI requires of
// callers.
355 // Local copy from root
356 memcpy(&((char *) recvbuf)[root * recvcount * recvsize], sendbuf,
357 sendcount * sendsize * sizeof(char));
358 // Receive buffers from senders
359 requests = xbt_new(MPI_Request, size - 1);
// One receive per rank except root, each into that rank's slot.
361 for (src = 0; src < size; src++) {
363 requests[index] = smpi_irecv_init(&((char *) recvbuf)
364 [src * recvcount * recvsize],
365 recvcount, recvtype, src,
370 // Wait for completion of irecv's.
371 smpi_mpi_startall(size - 1, requests);
372 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
// Gather with per-rank counts: root receives recvcounts[r] elements of
// recvtype from rank r into recvbuf at displacement displs[r]. Non-root ranks
// do a blocking send to root. Root copies its own contribution locally, then
// posts one receive per other rank, starts them all and waits for all of them.
// The internal tag system_tag (666) is used on the user's comm, so user
// messages with that tag could interfere.
// NOTE(review): displs[] is used as a raw byte offset into recvbuf, but
// MPI_Gatherv defines displacements in units of recvtype's extent. This is
// only correct for 1-byte types; it likely needs
// displs[r] * smpi_datatype_size(recvtype).
377 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
378 void *recvbuf, int *recvcounts, int *displs,
379 MPI_Datatype recvtype, int root, MPI_Comm comm)
381 int system_tag = 666;
382 int rank, size, src, index, sendsize;
383 MPI_Request *requests;
385 rank = smpi_comm_rank(comm);
386 size = smpi_comm_size(comm);
// Non-root ranks (presumably guarded by rank != root):
388 // Send buffer to root
389 smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
// Root only from here on (presumably the else branch).
391 sendsize = smpi_datatype_size(sendtype);
392 // Local copy from root
393 memcpy(&((char *) recvbuf)[displs[root]], sendbuf,
394 sendcount * sendsize * sizeof(char));
395 // Receive buffers from senders
396 requests = xbt_new(MPI_Request, size - 1);
// One receive per rank except root.
398 for (src = 0; src < size; src++) {
401 smpi_irecv_init(&((char *) recvbuf)[displs[src]],
402 recvcounts[src], recvtype, src, system_tag,
407 // Wait for completion of irecv's.
408 smpi_mpi_startall(size - 1, requests);
409 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
// Allgather: every rank ends up with every rank's sendcount elements in
// recvbuf, with rank r's data at byte offset r * recvcount * size(recvtype).
// Each rank copies its own contribution locally, then posts a send to and a
// receive from every other rank (2 * (size - 1) requests), starts them all
// and waits for all of them.
// NOTE(review): the internal tag system_tag (666) is used on the user's comm,
// so a user message on comm with tag 666 could be matched by these requests.
414 void smpi_mpi_allgather(void *sendbuf, int sendcount,
415 MPI_Datatype sendtype, void *recvbuf,
416 int recvcount, MPI_Datatype recvtype,
419 int system_tag = 666;
420 int rank, size, other, index, sendsize, recvsize;
421 MPI_Request *requests;
423 rank = smpi_comm_rank(comm);
424 size = smpi_comm_size(comm);
425 sendsize = smpi_datatype_size(sendtype);
426 recvsize = smpi_datatype_size(recvtype);
427 // Local copy from self
428 memcpy(&((char *) recvbuf)[rank * recvcount * recvsize], sendbuf,
429 sendcount * sendsize * sizeof(char));
430 // Send/Recv buffers to/from others;
431 requests = xbt_new(MPI_Request, 2 * (size - 1));
// For every rank except this one: one send of sendbuf and one receive into
// that rank's slot.
433 for (other = 0; other < size; other++) {
436 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
439 requests[index] = smpi_irecv_init(&((char *) recvbuf)
440 [other * recvcount * recvsize],
441 recvcount, recvtype, other,
446 // Wait for completion of all comms.
447 smpi_mpi_startall(2 * (size - 1), requests);
448 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
// Allgather with per-rank counts: every rank receives recvcounts[r] elements
// of recvtype from rank r into recvbuf at displacement displs[r]. Each rank
// copies its own contribution locally, then posts a send to and a receive
// from every other rank (2 * (size - 1) requests), starts them all and waits
// for all of them. The internal tag system_tag (666) is used on the user's
// comm, so user messages with that tag could interfere.
// NOTE(review): displs[] is used as a raw byte offset into recvbuf, but
// MPI_Allgatherv defines displacements in units of recvtype's extent. This is
// only correct for 1-byte types; it likely needs displs[r] * recvsize.
452 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
453 MPI_Datatype sendtype, void *recvbuf,
454 int *recvcounts, int *displs,
455 MPI_Datatype recvtype, MPI_Comm comm)
457 int system_tag = 666;
458 int rank, size, other, index, sendsize, recvsize;
459 MPI_Request *requests;
461 rank = smpi_comm_rank(comm);
462 size = smpi_comm_size(comm);
463 sendsize = smpi_datatype_size(sendtype);
464 recvsize = smpi_datatype_size(recvtype);
465 // Local copy from self
466 memcpy(&((char *) recvbuf)[displs[rank]], sendbuf,
467 sendcount * sendsize * sizeof(char));
468 // Send buffers to others and receive theirs;
469 requests = xbt_new(MPI_Request, 2 * (size - 1));
// For every rank except this one: one send of sendbuf and one receive at
// that rank's displacement.
471 for (other = 0; other < size; other++) {
474 smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
478 smpi_irecv_init(&((char *) recvbuf)[displs[other]],
479 recvcounts[other], recvtype, other, system_tag,
484 // Wait for completion of all comms.
485 smpi_mpi_startall(2 * (size - 1), requests);
486 smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
// Scatter: root sends sendcount elements to each rank. Rank r's chunk is
// taken from sendbuf at byte offset r * sendcount * size(sendtype).
// Non-root ranks receive their chunk from root with a blocking receive. Root
// copies its own chunk into recvbuf locally, then posts one send per other
// rank (size - 1 in total), starts them all and waits for all of them.
// NOTE(review): the internal tag system_tag (666) is used on the user's comm,
// so a user message on comm with tag 666 could be matched by these requests.
490 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
491 void *recvbuf, int recvcount, MPI_Datatype recvtype,
492 int root, MPI_Comm comm)
494 int system_tag = 666;
495 int rank, size, dst, index, sendsize, recvsize;
496 MPI_Request *requests;
498 rank = smpi_comm_rank(comm);
499 size = smpi_comm_size(comm);
// Non-root ranks (presumably guarded by rank != root):
501 // Recv buffer from root
502 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
505 sendsize = smpi_datatype_size(sendtype);
506 recvsize = smpi_datatype_size(recvtype);
// Copies recvcount * size(recvtype) bytes out of root's own chunk; assumes it
// matches sendcount * size(sendtype), as MPI requires of callers.
507 // Local copy from root
508 memcpy(recvbuf, &((char *) sendbuf)[root * sendcount * sendsize],
509 recvcount * recvsize * sizeof(char));
510 // Send buffers to receivers
511 requests = xbt_new(MPI_Request, size - 1);
// One send per rank except root.
513 for (dst = 0; dst < size; dst++) {
515 requests[index] = smpi_isend_init(&((char *) sendbuf)
516 [dst * sendcount * sendsize],
517 sendcount, sendtype, dst,
522 // Wait for completion of isend's.
523 smpi_mpi_startall(size - 1, requests);
524 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
// Scatter with per-rank counts: root sends sendcounts[r] elements of sendtype,
// taken from sendbuf at displacement displs[r], to rank r. Non-root ranks
// receive with a blocking receive from root. Root copies its own chunk
// locally, then posts one send per other rank, starts them all and waits for
// all of them. The internal tag system_tag (666) is used on the user's comm,
// so user messages with that tag could interfere.
// NOTE(review): displs[] is used as a raw byte offset into sendbuf, but
// MPI_Scatterv defines displacements in units of sendtype's extent. This is
// only correct for 1-byte types; it likely needs displs[r] * sendsize.
529 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
530 MPI_Datatype sendtype, void *recvbuf, int recvcount,
531 MPI_Datatype recvtype, int root, MPI_Comm comm)
533 int system_tag = 666;
534 int rank, size, dst, index, sendsize, recvsize;
535 MPI_Request *requests;
537 rank = smpi_comm_rank(comm);
538 size = smpi_comm_size(comm);
// Non-root ranks (presumably guarded by rank != root):
540 // Recv buffer from root
541 smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
544 sendsize = smpi_datatype_size(sendtype);
545 recvsize = smpi_datatype_size(recvtype);
546 // Local copy from root
547 memcpy(recvbuf, &((char *) sendbuf)[displs[root]],
548 recvcount * recvsize * sizeof(char));
549 // Send buffers to receivers
550 requests = xbt_new(MPI_Request, size - 1);
// One send per rank except root.
552 for (dst = 0; dst < size; dst++) {
555 smpi_isend_init(&((char *) sendbuf)[displs[dst]],
556 sendcounts[dst], sendtype, dst, system_tag,
561 // Wait for completion of isend's.
562 smpi_mpi_startall(size - 1, requests);
563 smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
// Reduce: combine every rank's count elements of datatype with op into
// recvbuf on root. Non-root ranks do a blocking send of sendbuf to root.
// Root copies its own sendbuf into recvbuf, receives each other rank's data
// into its own temporary buffer, and calls smpi_op_apply(op, tmpbuf,
// recvbuf, ...) as each receive completes. This presumably accumulates into
// recvbuf -- confirm. The temporary buffers are freed at the end.
// NOTE(review): contributions are combined in completion (arrival) order, not
// rank order. MPI only allows reordering for commutative operations, so
// results for non-commutative ops can differ from MPI semantics. The internal
// tag 666 on the user's comm can also collide with user messages.
568 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
569 MPI_Datatype datatype, MPI_Op op, int root,
572 int system_tag = 666;
573 int rank, size, src, index, datasize;
574 MPI_Request *requests;
577 rank = smpi_comm_rank(comm);
578 size = smpi_comm_size(comm);
// Non-root ranks (presumably guarded by rank != root):
580 // Send buffer to root
581 smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
// Root only from here on (presumably the else branch).
583 datasize = smpi_datatype_size(datatype);
584 // Local copy from root
585 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
586 // Receive buffers from senders
587 //TODO: make a MPI_barrier here ?
588 requests = xbt_new(MPI_Request, size - 1);
589 tmpbufs = xbt_new(void *, size - 1);
// One temporary buffer and one receive per rank except root.
591 for (src = 0; src < size; src++) {
593 tmpbufs[index] = xbt_malloc(count * datasize);
595 smpi_irecv_init(tmpbufs[index], count, datatype, src,
600 // Wait for completion of irecv's.
601 smpi_mpi_startall(size - 1, requests);
// One iteration per expected contribution; waitany's index also selects the
// matching temporary buffer.
602 for (src = 0; src < size - 1; src++) {
603 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
604 if (index == MPI_UNDEFINED) {
// Combine in completion order (see the NOTE at the top of this function).
607 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
609 for (index = 0; index < size - 1; index++) {
610 xbt_free(tmpbufs[index]);
// Allreduce, implemented as smpi_mpi_reduce to rank 0 followed by
// smpi_mpi_bcast of the result from rank 0 to every rank of comm.
// NOTE(review): the lines after the bcast call, starting with
// "FIXME: buggy implementation", appear to be a disabled alternative
// implementation. It calls smpi_mpi_waitany with a count of size - 1 over
// 2 * (size - 1) requests, which may explain why it was disabled. Consider
// deleting it.
617 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
618 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
620 smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
621 smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
624 FIXME: buggy implementation
626 int system_tag = 666;
627 int rank, size, other, index, datasize;
628 MPI_Request* requests;
631 rank = smpi_comm_rank(comm);
632 size = smpi_comm_size(comm);
633 datasize = smpi_datatype_size(datatype);
634 // Local copy from self
635 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
636 // Send/Recv buffers to/from others;
637 //TODO: make a MPI_barrier here ?
638 requests = xbt_new(MPI_Request, 2 * (size - 1));
639 tmpbufs = xbt_new(void*, size - 1);
641 for(other = 0; other < size; other++) {
643 tmpbufs[index / 2] = xbt_malloc(count * datasize);
644 requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
645 requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
649 // Wait for completion of all comms.
650 for(other = 0; other < 2 * (size - 1); other++) {
651 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
652 if(index == MPI_UNDEFINED) {
655 if((index & 1) == 1) {
656 // Request is odd: it's a irecv
657 smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
660 for(index = 0; index < size - 1; index++) {
661 xbt_free(tmpbufs[index]);
668 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
669 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
671 int system_tag = 666;
672 int rank, size, other, index, datasize;
674 MPI_Request *requests;
677 rank = smpi_comm_rank(comm);
678 size = smpi_comm_size(comm);
679 datasize = smpi_datatype_size(datatype);
680 // Local copy from self
681 memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
682 // Send/Recv buffers to/from others;
683 total = rank + (size - (rank + 1));
684 requests = xbt_new(MPI_Request, total);
685 tmpbufs = xbt_new(void *, rank);
687 for (other = 0; other < rank; other++) {
688 tmpbufs[index] = xbt_malloc(count * datasize);
690 smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
694 for (other = rank + 1; other < size; other++) {
696 smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
699 // Wait for completion of all comms.
700 smpi_mpi_startall(size - 1, requests);
701 for (other = 0; other < total; other++) {
702 index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
703 if (index == MPI_UNDEFINED) {
707 // #Request is below rank: it's a irecv
708 smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
711 for (index = 0; index < size - 1; index++) {
712 xbt_free(tmpbufs[index]);