1 /* Asynchronous parts of the basic collective algorithms, meant to be used both for the naive default implementation and for non-blocking collectives */
3 /* Copyright (c) 2009-2023. The SimGrid Team. All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
8 #include "colls_private.hpp"
9 #include "src/smpi/include/smpi_actor.hpp"
11 namespace simgrid::smpi {
// Non-blocking barrier.
// Stores in *request a new Request flagged MPI_REQ_PERSISTENT|MPI_REQ_NBC, builds a list of
// zero-byte (nullptr, 0, MPI_BYTE) send/receive sub-requests and hands it to start_nbc_requests().
//   comm     - communicator whose size and rank drive the exchange
//   request  - out-parameter receiving the newly allocated Request
//   external - value subtracted from COLL_TAG_BARRIER to form the tag used by every sub-request
13 int colls::ibarrier(MPI_Comm comm, MPI_Request* request, int external)
15 int size = comm->size();
16 int rank = comm->rank();
17 int system_tag=COLL_TAG_BARRIER-external;
18 (*request) = new Request( nullptr, 0, MPI_BYTE,
19 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
20 std::vector<MPI_Request> requests;
// Empty message sent to rank 0 and empty message received from rank 0.
// NOTE(review): presumably only executed on non-zero ranks - the guard is not visible here; confirm.
23 requests.push_back(Request::isend_init (nullptr, 0, MPI_BYTE, 0, system_tag, comm));
24 requests.push_back(Request::irecv_init(nullptr, 0, MPI_BYTE, 0, system_tag, comm));
// i takes the odd values 1, 3, ..., 2*size-3, so (i + 1) / 2 enumerates ranks 1 .. size-1:
// one receive from any source and one send to each of those ranks.
// NOTE(review): presumably the rank-0 side of the exchange - confirm.
27 for (int i = 1; i < 2 * size - 1; i += 2) {
28 requests.push_back(Request::irecv_init(nullptr, 0, MPI_BYTE, MPI_ANY_SOURCE, system_tag, comm));
29 requests.push_back(Request::isend_init(nullptr, 0, MPI_BYTE, (i + 1) / 2, system_tag, comm));
// Hand the collected sub-requests over to the request returned to the caller.
32 (*request)->start_nbc_requests(requests);
// Non-blocking broadcast of (buf, count, datatype) from rank `root`.
// Stores in *request a new Request flagged MPI_REQ_PERSISTENT|MPI_REQ_NBC and passes the
// sub-requests built below to start_nbc_requests(). The tag is COLL_TAG_BCAST - external.
36 int colls::ibcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm, MPI_Request* request,
39 int size = comm->size();
40 int rank = comm->rank();
41 int system_tag=COLL_TAG_BCAST-external;
42 std::vector<MPI_Request> requests;
43 (*request) = new Request( nullptr, 0, MPI_BYTE,
44 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
// Receive the payload from root directly into buf.
// NOTE(review): presumably only on non-root ranks - the guard is not visible here; confirm.
46 requests.push_back(Request::irecv_init(buf, count, datatype, root, system_tag, comm));
// One send of buf per rank i in [0, size).
// NOTE(review): presumably the root side, skipping i == root - guards not visible here; confirm.
49 for (int i = 0; i < size; i++) {
51 requests.push_back(Request::isend_init(buf, count, datatype, i, system_tag, comm));
55 (*request)->start_nbc_requests(requests);
// Non-blocking allgather.
// Copies this rank's contribution into its own slot of recvbuf, then creates one send of sendbuf
// and one receive per peer and passes them to start_nbc_requests().
// The slot of rank r starts at byte offset r * recvcount * recvext in recvbuf, where recvext is
// obtained from recvtype->extent(). The tag is COLL_TAG_ALLGATHER - external.
59 int colls::iallgather(const void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount,
60 MPI_Datatype recvtype, MPI_Comm comm, MPI_Request* request, int external)
63 const int system_tag = COLL_TAG_ALLGATHER-external;
66 std::vector<MPI_Request> requests;
68 int rank = comm->rank();
69 int size = comm->size();
70 (*request) = new Request( nullptr, 0, MPI_BYTE,
71 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
72 // FIXME: check for errors
73 recvtype->extent(&lb, &recvext);
74 // Local copy from self
75 Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char *>(recvbuf) + rank * recvcount * recvext, recvcount,
77 // Send/Recv buffers to/from others;
// NOTE(review): presumably `other == rank` is skipped - the guard is not visible here; confirm.
78 for (int other = 0; other < size; other++) {
80 requests.push_back(Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm));
81 requests.push_back(Request::irecv_init(static_cast<char*>(recvbuf) + other * recvcount * recvext, recvcount,
82 recvtype, other, system_tag, comm));
85 (*request)->start_nbc_requests(requests);
// Non-blocking scatter from rank `root`.
// Builds either a receive from root into recvbuf or, per destination, a send of the slice of
// sendbuf starting at byte offset dst * sendcount * sendext, then calls start_nbc_requests().
// The tag is COLL_TAG_SCATTER - external.
// NOTE(review): the root / non-root guards selecting between these paths are not visible here; confirm.
89 int colls::iscatter(const void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount,
90 MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request* request, int external)
92 const int system_tag = COLL_TAG_SCATTER-external;
95 std::vector<MPI_Request> requests;
97 int rank = comm->rank();
98 int size = comm->size();
99 (*request) = new Request( nullptr, 0, MPI_BYTE,
100 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
102 // Recv buffer from root
103 requests.push_back(Request::irecv_init(recvbuf, recvcount, recvtype, root, system_tag, comm));
105 sendtype->extent(&lb, &sendext);
106 // Local copy from root
// The copy of root's own slice is skipped when the caller passed MPI_IN_PLACE as recvbuf.
107 if(recvbuf!=MPI_IN_PLACE){
108 Datatype::copy(static_cast<const char *>(sendbuf) + root * sendcount * sendext,
109 sendcount, sendtype, recvbuf, recvcount, recvtype);
111 // Send buffers to receivers
// NOTE(review): presumably dst == root is skipped - the guard is not visible here; confirm.
112 for(int dst = 0; dst < size; dst++) {
114 requests.push_back(Request::isend_init(static_cast<const char *>(sendbuf) + dst * sendcount * sendext, sendcount, sendtype,
115 dst, system_tag, comm));
119 (*request)->start_nbc_requests(requests);
// Non-blocking allgatherv (per-rank receive counts and displacements).
// Copies this rank's contribution to recvbuf + displs[rank] * recvext, then creates one send of
// sendbuf and one receive per peer; the data from rank r is received at displs[r] * recvext with
// recvcounts[r] elements. The tag is COLL_TAG_ALLGATHERV - external.
123 int colls::iallgatherv(const void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, const int* recvcounts,
124 const int* displs, MPI_Datatype recvtype, MPI_Comm comm, MPI_Request* request, int external)
126 const int system_tag = COLL_TAG_ALLGATHERV-external;
128 MPI_Aint recvext = 0;
129 std::vector<MPI_Request> requests;
131 int rank = comm->rank();
132 int size = comm->size();
133 (*request) = new Request( nullptr, 0, MPI_BYTE,
134 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
135 recvtype->extent(&lb, &recvext);
136 // Local copy from self
137 Datatype::copy(sendbuf, sendcount, sendtype,
138 static_cast<char *>(recvbuf) + displs[rank] * recvext,recvcounts[rank], recvtype);
139 // Send buffers to others;
// NOTE(review): presumably `other == rank` is skipped - the guard is not visible here; confirm.
140 for (int other = 0; other < size; other++) {
142 requests.push_back(Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm));
143 requests.push_back(Request::irecv_init(static_cast<char *>(recvbuf) + displs[other] * recvext, recvcounts[other],
144 recvtype, other, system_tag, comm));
// NOTE(review): the comment below says "wait", but the call made is start_nbc_requests(); confirm
// whether any waiting actually happens at this point.
147 // Wait for completion of all comms.
148 (*request)->start_nbc_requests(requests);
// Non-blocking all-to-all with uniform counts.
// Copies the block addressed to this rank from sendbuf straight into recvbuf, then, if that copy
// returned MPI_SUCCESS and the communicator has more than one rank, creates one receive and one
// send per peer and passes them to start_nbc_requests().
// The block exchanged with rank i sits at byte offset i * sendcount * sendext in sendbuf and
// i * recvcount * recvext in recvbuf. The tag is COLL_TAG_ALLTOALL - external.
152 int colls::ialltoall(const void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount,
153 MPI_Datatype recvtype, MPI_Comm comm, MPI_Request* request, int external)
155 int system_tag = COLL_TAG_ALLTOALL-external;
157 MPI_Aint sendext = 0;
158 MPI_Aint recvext = 0;
159 std::vector<MPI_Request> requests;
162 int rank = comm->rank();
163 int size = comm->size();
164 (*request) = new Request( nullptr, 0, MPI_BYTE,
165 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
166 sendtype->extent(&lb, &sendext);
167 recvtype->extent(&lb, &recvext);
168 /* simple optimization */
169 int err = Datatype::copy(static_cast<const char *>(sendbuf) + rank * sendcount * sendext, sendcount, sendtype,
170 static_cast<char *>(recvbuf) + rank * recvcount * recvext, recvcount, recvtype);
171 if (err == MPI_SUCCESS && size > 1) {
172 /* Initiate all send/recv to/from others. */
173 /* Post all receives first -- a simple optimization */
// Peers are visited in increasing order starting at rank + 1 and wrapping around modulo size.
174 for (int i = (rank + 1) % size; i != rank; i = (i + 1) % size) {
175 requests.push_back(Request::irecv_init(static_cast<char *>(recvbuf) + i * recvcount * recvext, recvcount,
176 recvtype, i, system_tag, comm));
// NOTE(review): the block comment opened on the next line has no visible terminator before the
// send loop that follows it - confirm the closing marker is present in the real file.
178 /* Now post all sends in reverse order
179 * - We would like to minimize the search time through message queue
180 * when messages actually arrive in the order in which they were posted.
181 * TODO: check the previous assertion
183 for (int i = (rank + size - 1) % size; i != rank; i = (i + size - 1) % size) {
184 requests.push_back(Request::isend_init(static_cast<const char *>(sendbuf) + i * sendcount * sendext, sendcount,
185 sendtype, i, system_tag, comm));
187 /* Wait for them all. */
188 (*request)->start_nbc_requests(requests);
// Non-blocking all-to-all with per-peer counts and displacements (in units of the type extent).
// Copies the block addressed to this rank locally, then, if that copy returned MPI_SUCCESS and
// size > 1, creates the receives followed by the sends and passes them to start_nbc_requests().
// Displacements are multiplied by sendext / recvext to obtain byte offsets.
// The tag is COLL_TAG_ALLTOALLV - external.
193 int colls::ialltoallv(const void* sendbuf, const int* sendcounts, const int* senddisps, MPI_Datatype sendtype,
194 void* recvbuf, const int* recvcounts, const int* recvdisps, MPI_Datatype recvtype, MPI_Comm comm,
195 MPI_Request* request, int external)
197 const int system_tag = COLL_TAG_ALLTOALLV-external;
199 MPI_Aint sendext = 0;
200 MPI_Aint recvext = 0;
201 std::vector<MPI_Request> requests;
204 int rank = comm->rank();
205 int size = comm->size();
206 (*request) = new Request( nullptr, 0, MPI_BYTE,
207 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
208 sendtype->extent(&lb, &sendext);
209 recvtype->extent(&lb, &recvext);
210 /* Local copy from self */
211 int err = Datatype::copy(static_cast<const char *>(sendbuf) + senddisps[rank] * sendext, sendcounts[rank], sendtype,
212 static_cast<char *>(recvbuf) + recvdisps[rank] * recvext, recvcounts[rank], recvtype);
213 if (err == MPI_SUCCESS && size > 1) {
214 /* Initiate all send/recv to/from others. */
215 /* Create all receives that will be posted first */
// NOTE(review): the debug message below indicates some peers get no request; the condition that
// selects between creating and skipping is not visible here - confirm.
216 for (int i = 0; i < size; ++i) {
218 requests.push_back(Request::irecv_init(static_cast<char *>(recvbuf) + recvdisps[i] * recvext,
219 recvcounts[i], recvtype, i, system_tag, comm));
221 XBT_DEBUG("<%d> skip request creation [src = %d, recvcounts[src] = %d]", rank, i, recvcounts[i]);
224 /* Now create all sends */
// NOTE(review): same remark as for the receives - the skip condition is not visible here.
225 for (int i = 0; i < size; ++i) {
227 requests.push_back(Request::isend_init(static_cast<const char *>(sendbuf) + senddisps[i] * sendext,
228 sendcounts[i], sendtype, i, system_tag, comm));
230 XBT_DEBUG("<%d> skip request creation [dst = %d, sendcounts[dst] = %d]", rank, i, sendcounts[i]);
// NOTE(review): the comment below says "wait", but the call made is start_nbc_requests(); confirm.
233 /* Wait for them all. */
234 (*request)->start_nbc_requests(requests);
// Non-blocking all-to-all with per-peer counts, displacements and datatypes.
// Unlike ialltoallv, senddisps / recvdisps are added to the buffer pointers as-is (no multiplication
// by a type extent), and each peer i uses sendtypes[i] / recvtypes[i].
// The tag is COLL_TAG_ALLTOALLW - external.
239 int colls::ialltoallw(const void* sendbuf, const int* sendcounts, const int* senddisps, const MPI_Datatype* sendtypes,
240 void* recvbuf, const int* recvcounts, const int* recvdisps, const MPI_Datatype* recvtypes,
241 MPI_Comm comm, MPI_Request* request, int external)
243 const int system_tag = COLL_TAG_ALLTOALLW-external;
244 std::vector<MPI_Request> requests;
247 int rank = comm->rank();
248 int size = comm->size();
249 (*request) = new Request( nullptr, 0, MPI_BYTE,
250 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
251 /* Local copy from self */
// The self copy only runs when sendcounts[rank] > 0 and recvcounts[rank] is non-zero; otherwise
// err is MPI_SUCCESS so the exchange below still proceeds.
252 int err = (sendcounts[rank]>0 && recvcounts[rank]) ? Datatype::copy(static_cast<const char *>(sendbuf) + senddisps[rank], sendcounts[rank], sendtypes[rank],
253 static_cast<char *>(recvbuf) + recvdisps[rank], recvcounts[rank], recvtypes[rank]): MPI_SUCCESS;
254 if (err == MPI_SUCCESS && size > 1) {
255 /* Initiate all send/recv to/from others. */
256 /* Create all receives that will be posted first */
// NOTE(review): the debug message below indicates some peers get no request; the condition that
// selects between creating and skipping is not visible here - confirm.
257 for (int i = 0; i < size; ++i) {
259 requests.push_back(Request::irecv_init(static_cast<char *>(recvbuf) + recvdisps[i],
260 recvcounts[i], recvtypes[i], i, system_tag, comm));
262 XBT_DEBUG("<%d> skip request creation [src = %d, recvcounts[src] = %d]", rank, i, recvcounts[i]);
265 /* Now create all sends */
// NOTE(review): same remark as for the receives - the skip condition is not visible here.
266 for (int i = 0; i < size; ++i) {
268 requests.push_back(Request::isend_init(static_cast<const char *>(sendbuf) + senddisps[i] ,
269 sendcounts[i], sendtypes[i], i, system_tag, comm));
271 XBT_DEBUG("<%d> skip request creation [dst = %d, sendcounts[dst] = %d]", rank, i, sendcounts[i]);
// NOTE(review): the comment below says "wait", but the call made is start_nbc_requests(); confirm.
274 /* Wait for them all. */
275 (*request)->start_nbc_requests(requests);
// Non-blocking gather to rank `root`.
// Builds either a send of sendbuf to root or, per source, a receive into the slot of recvbuf at
// byte offset src * recvcount * recvext, then calls start_nbc_requests().
// The tag is COLL_TAG_GATHER - external.
// NOTE(review): the root / non-root guards selecting between these paths are not visible here; confirm.
280 int colls::igather(const void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount,
281 MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request* request, int external)
283 const int system_tag = COLL_TAG_GATHER-external;
285 MPI_Aint recvext = 0;
286 std::vector<MPI_Request> requests;
288 int rank = comm->rank();
289 int size = comm->size();
290 (*request) = new Request( nullptr, 0, MPI_BYTE,
291 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
293 // Send buffer to root
294 requests.push_back(Request::isend_init(sendbuf, sendcount, sendtype, root, system_tag, comm));
296 recvtype->extent(&lb, &recvext);
297 // Local copy from root
// Root's own contribution is copied straight into its slot of recvbuf.
298 Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + root * recvcount * recvext,
299 recvcount, recvtype);
300 // Receive buffers from senders
// NOTE(review): presumably src == root is skipped - the guard is not visible here; confirm.
301 for (int src = 0; src < size; src++) {
303 requests.push_back(Request::irecv_init(static_cast<char*>(recvbuf) + src * recvcount * recvext, recvcount, recvtype,
304 src, system_tag, comm));
308 (*request)->start_nbc_requests(requests);
// Non-blocking gatherv to rank `root` (per-rank receive counts and displacements).
// Builds either a send of sendbuf to root or, per source, a receive of recvcounts[src] elements
// at byte offset displs[src] * recvext in recvbuf, then calls start_nbc_requests().
// The tag is COLL_TAG_GATHERV - external.
// NOTE(review): the root / non-root guards selecting between these paths are not visible here; confirm.
312 int colls::igatherv(const void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, const int* recvcounts,
313 const int* displs, MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request* request,
316 int system_tag = COLL_TAG_GATHERV-external;
318 MPI_Aint recvext = 0;
319 std::vector<MPI_Request> requests;
321 int rank = comm->rank();
322 int size = comm->size();
323 (*request) = new Request( nullptr, 0, MPI_BYTE,
324 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
326 // Send buffer to root
327 requests.push_back(Request::isend_init(sendbuf, sendcount, sendtype, root, system_tag, comm));
329 recvtype->extent(&lb, &recvext);
330 // Local copy from root
// Root's own contribution is copied straight to displs[root] * recvext in recvbuf.
331 Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + displs[root] * recvext,
332 recvcounts[root], recvtype);
333 // Receive buffers from senders
// NOTE(review): presumably src == root is skipped - the guard is not visible here; confirm.
334 for (int src = 0; src < size; src++) {
336 requests.push_back(Request::irecv_init(static_cast<char*>(recvbuf) + displs[src] * recvext,
337 recvcounts[src], recvtype, src, system_tag, comm));
// NOTE(review): the comment below says "wait", but the call made is start_nbc_requests(); confirm.
341 // Wait for completion of irecv's.
342 (*request)->start_nbc_requests(requests);
// Non-blocking scatterv from rank `root` (per-rank send counts and displacements).
// Builds either a receive from root into recvbuf or, per destination, a send of sendcounts[dst]
// elements starting at byte offset displs[dst] * sendext in sendbuf, then calls
// start_nbc_requests(). The tag is COLL_TAG_SCATTERV - external.
// NOTE(review): the root / non-root guards selecting between these paths are not visible here; confirm.
345 int colls::iscatterv(const void* sendbuf, const int* sendcounts, const int* displs, MPI_Datatype sendtype,
346 void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request* request,
349 int system_tag = COLL_TAG_SCATTERV-external;
351 MPI_Aint sendext = 0;
352 std::vector<MPI_Request> requests;
354 int rank = comm->rank();
355 int size = comm->size();
356 (*request) = new Request( nullptr, 0, MPI_BYTE,
357 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
359 // Recv buffer from root
360 requests.push_back(Request::irecv_init(recvbuf, recvcount, recvtype, root, system_tag, comm));
362 sendtype->extent(&lb, &sendext);
363 // Local copy from root
// The copy of root's own slice is skipped when the caller passed MPI_IN_PLACE as recvbuf.
364 if(recvbuf!=MPI_IN_PLACE){
365 Datatype::copy(static_cast<const char *>(sendbuf) + displs[root] * sendext, sendcounts[root],
366 sendtype, recvbuf, recvcount, recvtype);
368 // Send buffers to receivers
// NOTE(review): presumably dst == root is skipped - the guard is not visible here; confirm.
369 for (int dst = 0; dst < size; dst++) {
371 requests.push_back(Request::isend_init(static_cast<const char *>(sendbuf) + displs[dst] * sendext, sendcounts[dst],
372 sendtype, dst, system_tag, comm));
376 (*request)->start_nbc_requests(requests);
// Non-blocking reduce towards rank `root`.
// Handles MPI_IN_PLACE by staging recvbuf into a temporary send buffer, creates the Request
// returned through *request, then builds either a send to root or one receive per source into a
// freshly obtained temporary buffer, and calls start_nbc_requests().
// The tag is COLL_TAG_REDUCE - external.
// NOTE(review): how and when `op` is applied to the received temporaries is not visible in this
// function - presumably done by the Request once the sub-requests complete; confirm.
380 int colls::ireduce(const void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root,
381 MPI_Comm comm, MPI_Request* request, int external)
383 const int system_tag = COLL_TAG_REDUCE-external;
385 MPI_Aint dataext = 0;
386 std::vector<MPI_Request> requests;
388 const void* real_sendbuf = sendbuf;
390 int rank = comm->rank();
391 int size = comm->size();
396 unsigned char* tmp_sendbuf = nullptr;
// In-place call: the input lives in recvbuf, so copy it aside and send from that copy instead.
397 if( sendbuf == MPI_IN_PLACE ) {
398 tmp_sendbuf = smpi_get_tmp_sendbuffer(count * datatype->get_extent());
399 Datatype::copy(recvbuf, count, datatype, tmp_sendbuf, count, datatype);
400 real_sendbuf = tmp_sendbuf;
// Two alternative constructions of the returned Request: one carrying recvbuf and op, one with a
// null buffer and no op.
// NOTE(review): presumably root vs. non-root - the selecting guard is not visible here; confirm.
404 (*request) = new Request( recvbuf, count, datatype,
405 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC, op);
408 (*request) = new Request( nullptr, count, datatype,
409 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
412 // Send buffer to root
413 requests.push_back(Request::isend_init(real_sendbuf, count, datatype, root, system_tag, comm));
415 datatype->extent(&lb, &dataext);
416 // Local copy from root
// Skipped when either buffer pointer is null.
417 if (real_sendbuf != nullptr && recvbuf != nullptr)
418 Datatype::copy(real_sendbuf, count, datatype, recvbuf, count, datatype);
419 // Receive buffers from senders
// Each receive targets its own temporary buffer of count * dataext bytes.
// NOTE(review): presumably src == root is skipped - the guard is not visible here; confirm.
420 for (int src = 0; src < size; src++) {
422 requests.push_back(Request::irecv_init(smpi_get_tmp_sendbuffer(count * dataext), count, datatype, src, system_tag, comm));
426 (*request)->start_nbc_requests(requests);
// Release the staging buffer allocated for the in-place case.
427 if( sendbuf == MPI_IN_PLACE ) {
428 smpi_free_tmp_buffer(tmp_sendbuf);
// Non-blocking allreduce.
// Creates the returned Request with recvbuf, count, datatype and op, copies sendbuf into recvbuf,
// then for each peer creates a send of sendbuf and a receive into a freshly obtained temporary
// buffer of count * dataext bytes, and calls start_nbc_requests().
// The tag is COLL_TAG_ALLREDUCE - external.
// NOTE(review): the application of `op` to the received temporaries is not visible here -
// presumably done by the Request once the sub-requests complete; confirm.
433 int colls::iallreduce(const void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm,
434 MPI_Request* request, int external)
437 const int system_tag = COLL_TAG_ALLREDUCE-external;
439 MPI_Aint dataext = 0;
440 std::vector<MPI_Request> requests;
442 int rank = comm->rank();
443 int size = comm->size();
444 (*request) = new Request( recvbuf, count, datatype,
445 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC, op);
446 // FIXME: check for errors
447 datatype->extent(&lb, &dataext);
448 // Local copy from self
449 Datatype::copy(sendbuf, count, datatype, recvbuf, count, datatype);
450 // Send/Recv buffers to/from others;
// NOTE(review): presumably `other == rank` is skipped - the guard is not visible here; confirm.
451 for (int other = 0; other < size; other++) {
453 requests.push_back(Request::isend_init(sendbuf, count, datatype, other, system_tag,comm));
454 requests.push_back(Request::irecv_init(smpi_get_tmp_sendbuffer(count * dataext), count, datatype,
455 other, system_tag, comm));
458 (*request)->start_nbc_requests(requests);
// Non-blocking inclusive scan.
// Creates the returned Request with recvbuf, count, datatype and op, copies sendbuf into recvbuf,
// then creates one receive (into a fresh temporary buffer) from every lower rank and one send of
// sendbuf to every higher rank, and calls start_nbc_requests().
// NOTE(review): the tag base is the literal -888 rather than a COLL_TAG_* constant as in the
// sibling functions, and iexscan uses the same literal - confirm this is intended.
462 int colls::iscan(const void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm,
463 MPI_Request* request, int external)
465 int system_tag = -888-external;
467 MPI_Aint dataext = 0;
468 std::vector<MPI_Request> requests;
470 int rank = comm->rank();
471 int size = comm->size();
472 (*request) = new Request( recvbuf, count, datatype,
473 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC, op);
474 datatype->extent(&lb, &dataext);
476 // Local copy from self
477 Datatype::copy(sendbuf, count, datatype, recvbuf, count, datatype);
479 // Send/Recv buffers to/from others
// Receive one contribution from each rank below this one.
480 for (int other = 0; other < rank; other++) {
481 requests.push_back(Request::irecv_init(smpi_get_tmp_sendbuffer(count * dataext), count, datatype, other, system_tag, comm));
// Send this rank's contribution to each rank above this one.
483 for (int other = rank + 1; other < size; other++) {
484 requests.push_back(Request::isend_init(sendbuf, count, datatype, other, system_tag, comm));
// NOTE(review): the comment below says "wait", but the call made is start_nbc_requests(); confirm.
486 // Wait for completion of all comms.
487 (*request)->start_nbc_requests(requests);
// Non-blocking exclusive scan.
// Same communication pattern as iscan (receive from every lower rank, send to every higher rank),
// but recvbuf is zero-filled with memset instead of being initialised from sendbuf.
// NOTE(review): the tag base is the literal -888, identical to the one used by iscan, rather than
// a dedicated COLL_TAG_* constant - confirm this is intended.
491 int colls::iexscan(const void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm,
492 MPI_Request* request, int external)
494 int system_tag = -888-external;
496 MPI_Aint dataext = 0;
497 std::vector<MPI_Request> requests;
499 int rank = comm->rank();
500 int size = comm->size();
501 (*request) = new Request( recvbuf, count, datatype,
502 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC, op);
503 datatype->extent(&lb, &dataext);
// Clear count * dataext bytes of the output buffer; this rank's own value is not copied in.
505 memset(recvbuf, 0, count*dataext);
507 // Send/Recv buffers to/from others
// Receive one contribution from each rank below this one.
508 for (int other = 0; other < rank; other++) {
509 requests.push_back(Request::irecv_init(smpi_get_tmp_sendbuffer(count * dataext), count, datatype, other, system_tag, comm));
// Send this rank's contribution to each rank above this one.
511 for (int other = rank + 1; other < size; other++) {
512 requests.push_back(Request::isend_init(sendbuf, count, datatype, other, system_tag, comm));
// NOTE(review): the comment below says "wait", but the call made is start_nbc_requests(); confirm.
514 // Wait for completion of all comms.
515 (*request)->start_nbc_requests(requests);
// Non-blocking reduce_scatter.
// This rank is responsible for recvcounts[rank] elements. While walking over all ranks, recvdisp
// accumulates recvcounts[other], so sendbuf + recvdisp * dataext is the start of the slice
// destined for `other`. Each slice is sent to its owner, and contributions for this rank's own
// slice are received into freshly obtained temporary buffers.
// The tag is COLL_TAG_REDUCE_SCATTER - external.
// NOTE(review): the declaration of recvdisp is not visible here - presumably initialised to 0; confirm.
519 int colls::ireduce_scatter(const void* sendbuf, void* recvbuf, const int* recvcounts, MPI_Datatype datatype, MPI_Op op,
520 MPI_Comm comm, MPI_Request* request, int external)
522 // Version where each process performs the reduce for its own part. Alltoall pattern for comms.
523 const int system_tag = COLL_TAG_REDUCE_SCATTER-external;
525 MPI_Aint dataext = 0;
526 std::vector<MPI_Request> requests;
528 int rank = comm->rank();
529 int size = comm->size();
530 int count=recvcounts[rank];
531 (*request) = new Request( recvbuf, count, datatype,
532 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC, op);
533 datatype->extent(&lb, &dataext);
535 // Send/Recv buffers to/from others;
537 for (int other = 0; other < size; other++) {
// NOTE(review): presumably the send/receive pair is for other != rank and the local copy further
// down is for other == rank - the guards are not visible here; confirm.
539 requests.push_back(Request::isend_init(static_cast<const char *>(sendbuf) + recvdisp * dataext, recvcounts[other], datatype, other, system_tag,comm));
540 XBT_VERB("sending with recvdisp %d", recvdisp);
541 requests.push_back(Request::irecv_init(smpi_get_tmp_sendbuffer(count * dataext), count, datatype,
542 other, system_tag, comm));
// Copy a slice of sendbuf directly into recvbuf.
544 Datatype::copy(static_cast<const char *>(sendbuf) + recvdisp * dataext, count, datatype, recvbuf, count, datatype);
// Advance to the start of the next rank's slice.
546 recvdisp+=recvcounts[other];
548 (*request)->start_nbc_requests(requests);
552 } // namespace simgrid::smpi