1 /* Asynchronous parts of the basic collective algorithms, meant to be used both for the naive default implementation and for non-blocking collectives */
3 /* Copyright (c) 2009-2022. The SimGrid Team. All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
8 #include "colls_private.hpp"
9 #include "src/smpi/include/smpi_actor.hpp"
/* Non-blocking barrier, naive centralized version.
 * Allocates a persistent NBC request (returned through *request), builds zero-byte
 * point-to-point requests on tag COLL_TAG_BARRIER - external, and starts them
 * through that request.
 * @param comm     communicator to synchronize
 * @param request  out: receives the newly allocated NBC request
 * @param external offset subtracted from COLL_TAG_BARRIER to form the message tag
 */
14 int colls::ibarrier(MPI_Comm comm, MPI_Request* request, int external)
16 int size = comm->size();
17 int rank = comm->rank();
18 int system_tag=COLL_TAG_BARRIER-external;
// Parent request handed back to the caller; the point-to-point requests built below are started through it.
19 (*request) = new Request( nullptr, 0, MPI_BYTE,
20 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
21 std::vector<MPI_Request> requests;
// Zero-byte send to rank 0, then zero-byte receive from rank 0.
// NOTE(review): the rank test choosing between this path and the loop below is not
// visible in this view; presumably ranks > 0 take this path and rank 0 the loop — confirm.
24 requests.push_back(Request::isend_init (nullptr, 0, MPI_BYTE, 0, system_tag, comm));
25 requests.push_back(Request::irecv_init(nullptr, 0, MPI_BYTE, 0, system_tag, comm));
// i takes the odd values 1, 3, ..., 2*size-3, so the loop runs size-1 times: each
// iteration posts a receive from any source and a send to rank (i + 1) / 2,
// i.e. ranks 1 .. size-1 in turn.
28 for (int i = 1; i < 2 * size - 1; i += 2) {
29 requests.push_back(Request::irecv_init(nullptr, 0, MPI_BYTE, MPI_ANY_SOURCE, system_tag, comm));
30 requests.push_back(Request::isend_init(nullptr, 0, MPI_BYTE, (i + 1) / 2, system_tag, comm));
33 (*request)->start_nbc_requests(requests);
/* Non-blocking broadcast, naive flat version.
 * Every message carries `count` elements of `datatype` from/into `buf` on tag
 * COLL_TAG_BCAST - external. The point-to-point requests are collected and started
 * through the NBC request allocated into *request.
 */
37 int colls::ibcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm, MPI_Request* request,
40 int size = comm->size();
41 int rank = comm->rank();
42 int system_tag=COLL_TAG_BCAST-external;
43 std::vector<MPI_Request> requests;
44 (*request) = new Request( nullptr, 0, MPI_BYTE,
45 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
// Receive the payload from root directly into buf.
// NOTE(review): the guards selecting this receive vs. the send loop below (and any
// skip of i == root) are not visible in this view; presumably non-root ranks receive
// and root sends to every other rank — confirm.
47 requests.push_back(Request::irecv_init(buf, count, datatype, root, system_tag, comm));
// Send buf to rank i.
50 for (int i = 0; i < size; i++) {
52 requests.push_back(Request::isend_init(buf, count, datatype, i, system_tag, comm));
56 (*request)->start_nbc_requests(requests);
/* Non-blocking allgather, naive all-to-all exchange.
 * Rank r's block is recvcount elements of recvtype stored at byte offset
 * r * recvcount * extent(recvtype) in recvbuf. The local block is copied directly;
 * the exchanges with the other ranks are posted as persistent requests on tag
 * COLL_TAG_ALLGATHER - external and started through the NBC request in *request.
 */
60 int colls::iallgather(const void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount,
61 MPI_Datatype recvtype, MPI_Comm comm, MPI_Request* request, int external)
64 const int system_tag = COLL_TAG_ALLGATHER-external;
67 std::vector<MPI_Request> requests;
69 int rank = comm->rank();
70 int size = comm->size();
71 (*request) = new Request( nullptr, 0, MPI_BYTE,
72 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
73 // FIXME: check for errors
74 recvtype->extent(&lb, &recvext);
// NOTE(review): `rank * recvcount` (and `other * recvcount` below) is evaluated in int
// before being scaled by recvext, so very large offsets could overflow int — confirm acceptable.
75 // Local copy from self
76 Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char *>(recvbuf) + rank * recvcount * recvext, recvcount,
78 // Send/Recv buffers to/from others;
// NOTE(review): the guard excluding other == rank is not visible in this view — presumably present; confirm.
79 for (int other = 0; other < size; other++) {
81 requests.push_back(Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm));
82 requests.push_back(Request::irecv_init(static_cast<char*>(recvbuf) + other * recvcount * recvext, recvcount,
83 recvtype, other, system_tag, comm));
86 (*request)->start_nbc_requests(requests);
/* Non-blocking scatter, naive flat version.
 * Block dst of sendbuf starts at byte offset dst * sendcount * extent(sendtype).
 * Root's own block is copied locally unless recvbuf is MPI_IN_PLACE; the other
 * transfers are posted on tag COLL_TAG_SCATTER - external and started through the
 * NBC request in *request.
 */
90 int colls::iscatter(const void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount,
91 MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request* request, int external)
93 const int system_tag = COLL_TAG_SCATTER-external;
96 std::vector<MPI_Request> requests;
98 int rank = comm->rank();
99 int size = comm->size();
100 (*request) = new Request( nullptr, 0, MPI_BYTE,
101 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
// NOTE(review): the rank test separating the receive path from the root path
// (extent / local copy / send loop) is not visible in this view — presumably
// `rank != root` vs. root; confirm.
103 // Recv buffer from root
104 requests.push_back(Request::irecv_init(recvbuf, recvcount, recvtype, root, system_tag, comm));
106 sendtype->extent(&lb, &sendext);
107 // Local copy from root
108 if(recvbuf!=MPI_IN_PLACE){
109 Datatype::copy(static_cast<const char *>(sendbuf) + root * sendcount * sendext,
110 sendcount, sendtype, recvbuf, recvcount, recvtype);
112 // Send buffers to receivers
113 for(int dst = 0; dst < size; dst++) {
115 requests.push_back(Request::isend_init(static_cast<const char *>(sendbuf) + dst * sendcount * sendext, sendcount, sendtype,
116 dst, system_tag, comm));
120 (*request)->start_nbc_requests(requests);
/* Non-blocking allgatherv, naive all-to-all exchange.
 * Block `other` is recvcounts[other] elements of recvtype placed at element offset
 * displs[other] (scaled by extent(recvtype)) in recvbuf. The local block is copied
 * directly; the exchanges use tag COLL_TAG_ALLGATHERV - external and are started
 * through the NBC request in *request.
 */
124 int colls::iallgatherv(const void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, const int* recvcounts,
125 const int* displs, MPI_Datatype recvtype, MPI_Comm comm, MPI_Request* request, int external)
127 const int system_tag = COLL_TAG_ALLGATHERV-external;
129 MPI_Aint recvext = 0;
130 std::vector<MPI_Request> requests;
132 int rank = comm->rank();
133 int size = comm->size();
134 (*request) = new Request( nullptr, 0, MPI_BYTE,
135 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
136 recvtype->extent(&lb, &recvext);
137 // Local copy from self
138 Datatype::copy(sendbuf, sendcount, sendtype,
139 static_cast<char *>(recvbuf) + displs[rank] * recvext,recvcounts[rank], recvtype);
140 // Send/Recv buffers to/from others;
// NOTE(review): the guard excluding other == rank is not visible in this view — presumably present; confirm.
141 for (int other = 0; other < size; other++) {
143 requests.push_back(Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm));
144 requests.push_back(Request::irecv_init(static_cast<char *>(recvbuf) + displs[other] * recvext, recvcounts[other],
145 recvtype, other, system_tag, comm));
148 // Start all comms (non-blocking: presumably completed later via a wait/test on *request).
149 (*request)->start_nbc_requests(requests);
/* Non-blocking alltoall, naive version.
 * Block i of sendbuf / recvbuf starts at byte offset i * count * extent(type).
 * The self block is copied locally. If that copy returns MPI_SUCCESS and size > 1,
 * receives (from rank+1 upward, wrapping) and then sends (from rank-1 downward,
 * wrapping) are posted on tag COLL_TAG_ALLTOALL - external and started through the
 * NBC request in *request.
 */
153 int colls::ialltoall(const void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount,
154 MPI_Datatype recvtype, MPI_Comm comm, MPI_Request* request, int external)
156 int system_tag = COLL_TAG_ALLTOALL-external;
158 MPI_Aint sendext = 0;
159 MPI_Aint recvext = 0;
160 std::vector<MPI_Request> requests;
163 int rank = comm->rank();
164 int size = comm->size();
165 (*request) = new Request( nullptr, 0, MPI_BYTE,
166 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
167 sendtype->extent(&lb, &sendext);
168 recvtype->extent(&lb, &recvext);
169 /* simple optimization: copy our own block locally instead of messaging ourselves */
170 int err = Datatype::copy(static_cast<const char *>(sendbuf) + rank * sendcount * sendext, sendcount, sendtype,
171 static_cast<char *>(recvbuf) + rank * recvcount * recvext, recvcount, recvtype);
172 if (err == MPI_SUCCESS && size > 1) {
173 /* Initiate all send/recv to/from others. */
174 /* Post all receives first -- a simple optimization */
// Visits every rank except `rank`, starting at rank+1 and wrapping modulo size.
175 for (int i = (rank + 1) % size; i != rank; i = (i + 1) % size) {
176 requests.push_back(Request::irecv_init(static_cast<char *>(recvbuf) + i * recvcount * recvext, recvcount,
177 recvtype, i, system_tag, comm));
// The send loop below visits every rank except `rank`, starting at rank-1 and wrapping downward.
179 /* Now post all sends in reverse order
180 * - We would like to minimize the search time through message queue
181 * when messages actually arrive in the order in which they were posted.
182 * TODO: check the previous assertion
184 for (int i = (rank + size - 1) % size; i != rank; i = (i + size - 1) % size) {
185 requests.push_back(Request::isend_init(static_cast<const char *>(sendbuf) + i * sendcount * sendext, sendcount,
186 sendtype, i, system_tag, comm));
188 /* Start them all (presumably completed later via a wait/test on *request). */
189 (*request)->start_nbc_requests(requests);
/* Non-blocking alltoallv, naive version.
 * Block i lives at element offset senddisps[i] / recvdisps[i] (scaled by the
 * datatype extent) and holds sendcounts[i] / recvcounts[i] elements. The self block
 * is copied locally. If that copy returns MPI_SUCCESS and size > 1, persistent
 * receives and then sends are created on tag COLL_TAG_ALLTOALLV - external and
 * started through the NBC request in *request.
 */
194 int colls::ialltoallv(const void* sendbuf, const int* sendcounts, const int* senddisps, MPI_Datatype sendtype,
195 void* recvbuf, const int* recvcounts, const int* recvdisps, MPI_Datatype recvtype, MPI_Comm comm,
196 MPI_Request* request, int external)
198 const int system_tag = COLL_TAG_ALLTOALLV-external;
200 MPI_Aint sendext = 0;
201 MPI_Aint recvext = 0;
202 std::vector<MPI_Request> requests;
205 int rank = comm->rank();
206 int size = comm->size();
207 (*request) = new Request( nullptr, 0, MPI_BYTE,
208 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
209 sendtype->extent(&lb, &sendext);
210 recvtype->extent(&lb, &recvext);
211 /* Local copy from self */
212 int err = Datatype::copy(static_cast<const char *>(sendbuf) + senddisps[rank] * sendext, sendcounts[rank], sendtype,
213 static_cast<char *>(recvbuf) + recvdisps[rank] * recvext, recvcounts[rank], recvtype);
214 if (err == MPI_SUCCESS && size > 1) {
215 /* Initiate all send/recv to/from others. */
// NOTE(review): in both loops the condition choosing between creating a request and
// logging "skip request creation" is not visible in this view; presumably it skips
// i == rank and zero counts — confirm.
216 /* Create all receives that will be posted first */
217 for (int i = 0; i < size; ++i) {
219 requests.push_back(Request::irecv_init(static_cast<char *>(recvbuf) + recvdisps[i] * recvext,
220 recvcounts[i], recvtype, i, system_tag, comm));
222 XBT_DEBUG("<%d> skip request creation [src = %d, recvcounts[src] = %d]", rank, i, recvcounts[i]);
225 /* Now create all sends */
226 for (int i = 0; i < size; ++i) {
228 requests.push_back(Request::isend_init(static_cast<const char *>(sendbuf) + senddisps[i] * sendext,
229 sendcounts[i], sendtype, i, system_tag, comm));
231 XBT_DEBUG("<%d> skip request creation [dst = %d, sendcounts[dst] = %d]", rank, i, sendcounts[i]);
234 /* Start them all (presumably completed later via a wait/test on *request). */
235 (*request)->start_nbc_requests(requests);
/* Non-blocking alltoallw, naive version.
 * Unlike ialltoallv, senddisps / recvdisps are applied here as raw byte offsets
 * (no extent scaling), and each peer i has its own datatype (sendtypes[i] /
 * recvtypes[i]). The self block is copied locally when both self counts are set.
 * If the result is MPI_SUCCESS and size > 1, persistent receives and then sends are
 * created on tag COLL_TAG_ALLTOALLW - external and started through *request.
 */
240 int colls::ialltoallw(const void* sendbuf, const int* sendcounts, const int* senddisps, const MPI_Datatype* sendtypes,
241 void* recvbuf, const int* recvcounts, const int* recvdisps, const MPI_Datatype* recvtypes,
242 MPI_Comm comm, MPI_Request* request, int external)
244 const int system_tag = COLL_TAG_ALLTOALLW-external;
245 std::vector<MPI_Request> requests;
248 int rank = comm->rank();
249 int size = comm->size();
250 (*request) = new Request( nullptr, 0, MPI_BYTE,
251 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
252 /* Local copy from self */
// Copy only when sendcounts[rank] > 0 and recvcounts[rank] != 0; otherwise err = MPI_SUCCESS.
// NOTE(review): the two counts are tested differently (`> 0` vs non-zero); this is equivalent
// only for non-negative counts — consider making the tests consistent.
253 int err = (sendcounts[rank]>0 && recvcounts[rank]) ? Datatype::copy(static_cast<const char *>(sendbuf) + senddisps[rank], sendcounts[rank], sendtypes[rank],
254 static_cast<char *>(recvbuf) + recvdisps[rank], recvcounts[rank], recvtypes[rank]): MPI_SUCCESS;
255 if (err == MPI_SUCCESS && size > 1) {
256 /* Initiate all send/recv to/from others. */
// NOTE(review): in both loops the condition choosing between creating a request and
// logging "skip request creation" is not visible in this view — confirm what it tests.
257 /* Create all receives that will be posted first */
258 for (int i = 0; i < size; ++i) {
260 requests.push_back(Request::irecv_init(static_cast<char *>(recvbuf) + recvdisps[i],
261 recvcounts[i], recvtypes[i], i, system_tag, comm));
263 XBT_DEBUG("<%d> skip request creation [src = %d, recvcounts[src] = %d]", rank, i, recvcounts[i]);
266 /* Now create all sends */
267 for (int i = 0; i < size; ++i) {
269 requests.push_back(Request::isend_init(static_cast<const char *>(sendbuf) + senddisps[i] ,
270 sendcounts[i], sendtypes[i], i, system_tag, comm));
272 XBT_DEBUG("<%d> skip request creation [dst = %d, sendcounts[dst] = %d]", rank, i, sendcounts[i]);
275 /* Start them all (presumably completed later via a wait/test on *request). */
276 (*request)->start_nbc_requests(requests);
/* Non-blocking gather, naive flat version.
 * Block src of recvbuf starts at byte offset src * recvcount * extent(recvtype).
 * The root path copies its own contribution locally and posts one receive per
 * sender; the other path sends sendbuf to root. Messages use tag
 * COLL_TAG_GATHER - external and are started through the NBC request in *request.
 */
281 int colls::igather(const void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount,
282 MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request* request, int external)
284 const int system_tag = COLL_TAG_GATHER-external;
286 MPI_Aint recvext = 0;
287 std::vector<MPI_Request> requests;
289 int rank = comm->rank();
290 int size = comm->size();
291 (*request) = new Request( nullptr, 0, MPI_BYTE,
292 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
// NOTE(review): the rank == root test separating the send path from the root path,
// and the skip of src == root in the loop, are not visible in this view — confirm.
294 // Send buffer to root
295 requests.push_back(Request::isend_init(sendbuf, sendcount, sendtype, root, system_tag, comm));
297 recvtype->extent(&lb, &recvext);
298 // Local copy from root
299 Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + root * recvcount * recvext,
300 recvcount, recvtype);
301 // Receive buffers from senders
302 for (int src = 0; src < size; src++) {
304 requests.push_back(Request::irecv_init(static_cast<char*>(recvbuf) + src * recvcount * recvext, recvcount, recvtype,
305 src, system_tag, comm));
309 (*request)->start_nbc_requests(requests);
/* Non-blocking gatherv, naive flat version.
 * Block src of recvbuf is recvcounts[src] elements at element offset displs[src]
 * (scaled by extent(recvtype)). The root path copies its own contribution locally
 * and posts one receive per sender; the other path sends sendbuf to root. Tag is
 * COLL_TAG_GATHERV - external; requests are started through *request.
 */
313 int colls::igatherv(const void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, const int* recvcounts,
314 const int* displs, MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request* request,
317 int system_tag = COLL_TAG_GATHERV-external;
319 MPI_Aint recvext = 0;
320 std::vector<MPI_Request> requests;
322 int rank = comm->rank();
323 int size = comm->size();
324 (*request) = new Request( nullptr, 0, MPI_BYTE,
325 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
// NOTE(review): the rank == root test separating the send path from the root path,
// and the skip of src == root in the loop, are not visible in this view — confirm.
327 // Send buffer to root
328 requests.push_back(Request::isend_init(sendbuf, sendcount, sendtype, root, system_tag, comm));
330 recvtype->extent(&lb, &recvext);
331 // Local copy from root
332 Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + displs[root] * recvext,
333 recvcounts[root], recvtype);
334 // Receive buffers from senders
335 for (int src = 0; src < size; src++) {
337 requests.push_back(Request::irecv_init(static_cast<char*>(recvbuf) + displs[src] * recvext,
338 recvcounts[src], recvtype, src, system_tag, comm));
342 // Start the comms (presumably completed later via a wait/test on *request).
343 (*request)->start_nbc_requests(requests);
/* Non-blocking scatterv, naive flat version.
 * Block dst of sendbuf is sendcounts[dst] elements at element offset displs[dst]
 * (scaled by extent(sendtype)). Root's own block is copied locally unless recvbuf
 * is MPI_IN_PLACE; the other transfers use tag COLL_TAG_SCATTERV - external and are
 * started through the NBC request in *request.
 */
346 int colls::iscatterv(const void* sendbuf, const int* sendcounts, const int* displs, MPI_Datatype sendtype,
347 void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request* request,
350 int system_tag = COLL_TAG_SCATTERV-external;
352 MPI_Aint sendext = 0;
353 std::vector<MPI_Request> requests;
355 int rank = comm->rank();
356 int size = comm->size();
357 (*request) = new Request( nullptr, 0, MPI_BYTE,
358 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
// NOTE(review): the rank test separating the receive path from the root path, and
// the skip of dst == root in the send loop, are not visible in this view — confirm.
360 // Recv buffer from root
361 requests.push_back(Request::irecv_init(recvbuf, recvcount, recvtype, root, system_tag, comm));
363 sendtype->extent(&lb, &sendext);
364 // Local copy from root
365 if(recvbuf!=MPI_IN_PLACE){
366 Datatype::copy(static_cast<const char *>(sendbuf) + displs[root] * sendext, sendcounts[root],
367 sendtype, recvbuf, recvcount, recvtype);
369 // Send buffers to receivers
370 for (int dst = 0; dst < size; dst++) {
372 requests.push_back(Request::isend_init(static_cast<const char *>(sendbuf) + displs[dst] * sendext, sendcounts[dst],
373 sendtype, dst, system_tag, comm));
377 (*request)->start_nbc_requests(requests);
/* Non-blocking reduce, naive flat version.
 * If sendbuf is MPI_IN_PLACE, recvbuf's contents are first copied into a temporary
 * buffer, which is then used as the send source. Two forms of the NBC request are
 * built here: one carrying recvbuf/count/datatype/op, and one with a null buffer
 * and no op. Contributions are received into freshly obtained temporary buffers on
 * tag COLL_TAG_REDUCE - external. Presumably the NBC request applies `op` to them
 * as they complete — confirm.
 */
381 int colls::ireduce(const void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root,
382 MPI_Comm comm, MPI_Request* request, int external)
384 const int system_tag = COLL_TAG_REDUCE-external;
386 MPI_Aint dataext = 0;
387 std::vector<MPI_Request> requests;
389 const void* real_sendbuf = sendbuf;
391 int rank = comm->rank();
392 int size = comm->size();
397 unsigned char* tmp_sendbuf = nullptr;
398 if( sendbuf == MPI_IN_PLACE ) {
399 tmp_sendbuf = smpi_get_tmp_sendbuffer(count * datatype->get_extent());
400 Datatype::copy(recvbuf, count, datatype, tmp_sendbuf, count, datatype);
401 real_sendbuf = tmp_sendbuf;
// NOTE(review): the test choosing between the two Request forms below is not visible
// in this view; presumably the op-carrying form is used on root — confirm.
405 (*request) = new Request( recvbuf, count, datatype,
406 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC, op);
409 (*request) = new Request( nullptr, count, datatype,
410 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
413 // Send buffer to root
414 requests.push_back(Request::isend_init(real_sendbuf, count, datatype, root, system_tag, comm));
416 datatype->extent(&lb, &dataext);
417 // Local copy from root
418 if (real_sendbuf != nullptr && recvbuf != nullptr)
419 Datatype::copy(real_sendbuf, count, datatype, recvbuf, count, datatype);
420 // Receive buffers from senders
// Each receive gets its own temporary buffer; it is not released in this function.
421 for (int src = 0; src < size; src++) {
423 requests.push_back(Request::irecv_init(smpi_get_tmp_sendbuffer(count * dataext), count, datatype, src, system_tag, comm));
427 (*request)->start_nbc_requests(requests);
// NOTE(review): tmp_sendbuf is released right after the requests are started, while the
// send posted above may still reference it (real_sendbuf == tmp_sendbuf in the
// MPI_IN_PLACE case). Verify that the send has copied its data by this point;
// otherwise this is a use-after-free.
428 if( sendbuf == MPI_IN_PLACE ) {
429 smpi_free_tmp_buffer(tmp_sendbuf);
/* Non-blocking allreduce, naive all-to-all exchange.
 * The NBC request carries recvbuf/count/datatype/op. sendbuf is copied into recvbuf,
 * sent to the other ranks, and each incoming contribution is received into a freshly
 * obtained temporary buffer on tag COLL_TAG_ALLREDUCE - external. Presumably the NBC
 * request combines those into recvbuf with `op` on completion — confirm.
 */
434 int colls::iallreduce(const void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm,
435 MPI_Request* request, int external)
438 const int system_tag = COLL_TAG_ALLREDUCE-external;
440 MPI_Aint dataext = 0;
441 std::vector<MPI_Request> requests;
443 int rank = comm->rank();
444 int size = comm->size();
445 (*request) = new Request( recvbuf, count, datatype,
446 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC, op);
447 // FIXME: check for errors
448 datatype->extent(&lb, &dataext);
449 // Local copy from self
450 Datatype::copy(sendbuf, count, datatype, recvbuf, count, datatype);
451 // Send/Recv buffers to/from others;
// NOTE(review): the guard excluding other == rank is not visible in this view — presumably present; confirm.
452 for (int other = 0; other < size; other++) {
454 requests.push_back(Request::isend_init(sendbuf, count, datatype, other, system_tag,comm));
455 requests.push_back(Request::irecv_init(smpi_get_tmp_sendbuffer(count * dataext), count, datatype,
456 other, system_tag, comm));
459 (*request)->start_nbc_requests(requests);
/* Non-blocking inclusive scan, naive version.
 * sendbuf is copied into recvbuf. The rank receives one contribution from each
 * lower rank (into a fresh temporary buffer) and sends sendbuf to each higher rank.
 * The NBC request carries recvbuf/count/datatype/op; presumably it folds the
 * received contributions into recvbuf with `op` on completion — confirm.
 */
463 int colls::iscan(const void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm,
464 MPI_Request* request, int external)
// NOTE(review): uses a literal tag (-888) instead of a COLL_TAG_* constant like the other
// collectives, and iexscan uses the same literal, so concurrent scan/exscan on one
// communicator could cross-match — consider a dedicated tag constant.
466 int system_tag = -888-external;
468 MPI_Aint dataext = 0;
469 std::vector<MPI_Request> requests;
471 int rank = comm->rank();
472 int size = comm->size();
473 (*request) = new Request( recvbuf, count, datatype,
474 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC, op);
475 datatype->extent(&lb, &dataext);
477 // Local copy from self
478 Datatype::copy(sendbuf, count, datatype, recvbuf, count, datatype);
480 // Send/Recv buffers to/from others
// Receive from every lower rank ...
481 for (int other = 0; other < rank; other++) {
482 requests.push_back(Request::irecv_init(smpi_get_tmp_sendbuffer(count * dataext), count, datatype, other, system_tag, comm));
// ... and send to every higher rank.
484 for (int other = rank + 1; other < size; other++) {
485 requests.push_back(Request::isend_init(sendbuf, count, datatype, other, system_tag, comm));
487 // Start all comms (presumably completed later via a wait/test on *request).
488 (*request)->start_nbc_requests(requests);
/* Non-blocking exclusive scan, naive version.
 * recvbuf is zero-filled instead of receiving a local copy. The rank receives one
 * contribution from each lower rank (into a fresh temporary buffer) and sends
 * sendbuf to each higher rank. The NBC request carries recvbuf/count/datatype/op;
 * presumably it folds the received contributions into recvbuf with `op` — confirm.
 */
492 int colls::iexscan(const void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm,
493 MPI_Request* request, int external)
// NOTE(review): same literal tag (-888) as iscan rather than a COLL_TAG_* constant.
495 int system_tag = -888-external;
497 MPI_Aint dataext = 0;
498 std::vector<MPI_Request> requests;
500 int rank = comm->rank();
501 int size = comm->size();
502 (*request) = new Request( recvbuf, count, datatype,
503 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC, op);
504 datatype->extent(&lb, &dataext);
// Zero-fill recvbuf (any guard on this statement is not visible in this view).
// NOTE(review): an all-zero initial value is an identity only for ops such as MPI_SUM;
// how the reduction treats this initial content is not visible here — confirm.
506 memset(recvbuf, 0, count*dataext);
508 // Send/Recv buffers to/from others
// Receive from every lower rank ...
509 for (int other = 0; other < rank; other++) {
510 requests.push_back(Request::irecv_init(smpi_get_tmp_sendbuffer(count * dataext), count, datatype, other, system_tag, comm));
// ... and send to every higher rank.
512 for (int other = rank + 1; other < size; other++) {
513 requests.push_back(Request::isend_init(sendbuf, count, datatype, other, system_tag, comm));
515 // Start all comms (presumably completed later via a wait/test on *request).
516 (*request)->start_nbc_requests(requests);
/* Non-blocking reduce_scatter, naive alltoall-style version.
 * Rank r owns recvcounts[r] elements. Walking `other` over all ranks, recvdisp
 * accumulates recvcounts[0..other-1], so the slice for `other` starts at element
 * recvdisp of sendbuf (scaled by extent(datatype)). Each slice is sent to its owner,
 * one `count`-element contribution is received from each peer into a fresh
 * temporary buffer, and the rank's own slice is copied into recvbuf. The NBC request
 * carries recvbuf/count/datatype/op on tag COLL_TAG_REDUCE_SCATTER - external.
 */
520 int colls::ireduce_scatter(const void* sendbuf, void* recvbuf, const int* recvcounts, MPI_Datatype datatype, MPI_Op op,
521 MPI_Comm comm, MPI_Request* request, int external)
523 // Version where each process performs the reduce for its own part. Alltoall pattern for comms.
524 const int system_tag = COLL_TAG_REDUCE_SCATTER-external;
526 MPI_Aint dataext = 0;
527 std::vector<MPI_Request> requests;
529 int rank = comm->rank();
530 int size = comm->size();
531 int count=recvcounts[rank];
532 (*request) = new Request( recvbuf, count, datatype,
533 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC, op);
534 datatype->extent(&lb, &dataext);
536 // Send/Recv buffers to/from others;
// NOTE(review): the declaration of recvdisp and the guard separating the exchange
// (presumably other != rank) from the local copy below are not visible in this view — confirm.
538 for (int other = 0; other < size; other++) {
540 requests.push_back(Request::isend_init(static_cast<const char *>(sendbuf) + recvdisp * dataext, recvcounts[other], datatype, other, system_tag,comm));
541 XBT_VERB("sending with recvdisp %d", recvdisp);
542 requests.push_back(Request::irecv_init(smpi_get_tmp_sendbuffer(count * dataext), count, datatype,
543 other, system_tag, comm));
// Local copy of this rank's own slice of sendbuf into recvbuf.
545 Datatype::copy(static_cast<const char *>(sendbuf) + recvdisp * dataext, count, datatype, recvbuf, count, datatype);
547 recvdisp+=recvcounts[other];
549 (*request)->start_nbc_requests(requests);