/* selector for collective algorithms based on mpich decision logic */

/* Copyright (c) 2009-2023. The SimGrid Team.
 * All rights reserved.                                                     */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "colls_private.hpp"

/* This is the default implementation of allreduce. The algorithm is:

   Algorithm: MPI_Allreduce

   For the heterogeneous case, we call MPI_Reduce followed by MPI_Bcast
   in order to meet the requirement that all processes must have the
   same result. For the homogeneous case, we use the following algorithms.

   For long messages, for builtin ops, and if count >= pof2 (where
   pof2 is the nearest power-of-two less than or equal to the number
   of processes), we use Rabenseifner's algorithm (see
   http://www.hlrs.de/mpi/myreduce.html).
   This algorithm implements the allreduce in two steps: first a
   reduce-scatter, followed by an allgather. A recursive-halving
   algorithm (beginning with processes that are distance 1 apart) is
   used for the reduce-scatter, and a recursive doubling
   algorithm is used for the allgather. The non-power-of-two case is
   handled by dropping to the nearest lower power-of-two: the first
   few even-numbered processes send their data to their right neighbors
   (rank+1), and the reduce-scatter and allgather happen among the remaining
   power-of-two processes. At the end, the first few even-numbered
   processes get the result from their right neighbors.

   For the power-of-two case, the cost for the reduce-scatter is
   lgp.alpha + n.((p-1)/p).beta + n.((p-1)/p).gamma. The cost for the
   allgather is lgp.alpha + n.((p-1)/p).beta. Therefore, the total cost is:
   Cost = 2.lgp.alpha + 2.n.((p-1)/p).beta + n.((p-1)/p).gamma

   For the non-power-of-two case,
   Cost = (2.floor(lgp)+2).alpha + (2.((p-1)/p) + 2).n.beta + n.(1+(p-1)/p).gamma

   For short messages, for user-defined ops, and for count < pof2,
   we use a recursive doubling algorithm (similar to the one in
   MPI_Allgather). We use this algorithm in the case of user-defined ops
   because in this case derived datatypes are allowed, and the user
   could pass basic datatypes on one process and derived on another as
   long as the type maps are the same. Breaking up derived datatypes
   to do the reduce-scatter is tricky.

   Cost = lgp.alpha + n.lgp.beta + n.lgp.gamma

   Possible improvements:

   End Algorithm: MPI_Allreduce
*/

namespace simgrid::smpi {

int allreduce__mpich(const void *sbuf, void *rbuf, int count,
                     MPI_Datatype dtype, MPI_Op op, MPI_Comm comm)
{
    size_t dsize, block_dsize;
    int comm_size = comm->size();
    const size_t large_message = 2048; // MPIR_PARAM_ALLREDUCE_SHORT_MSG_SIZE

    dsize = dtype->size();
    block_dsize = dsize * count;
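    // block_dsize is the total reduction payload in bytes; the large_message
    // cutoff below is applied to this value.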
    /* MPICH uses SMP algorithms for all commutative ops now */
    if (not comm->is_smp_comm()) {
      if (comm->get_leaders_comm() == MPI_COMM_NULL) {
        comm->init_smp();
      }
      if (op->is_commutative())
        return allreduce__mvapich2_two_level(sbuf, rbuf, count, dtype, op, comm);
    }

    /* find nearest power-of-two less than or equal to comm_size */
    int pof2 = 1;
    while (pof2 <= comm_size)
      pof2 <<= 1;
    pof2 >>= 1;
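    // pof2 is now the largest power of two <= comm_size (e.g. comm_size = 6 -> pof2 = 4).
    // Rabenseifner's variant below additionally requires count >= pof2 and a
    // builtin/commutative op; everything else uses recursive doubling.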
    if (block_dsize > large_message && count >= pof2 && (op == MPI_OP_NULL || op->is_commutative())) {
      // for long messages
      return allreduce__rab_rdb(sbuf, rbuf, count, dtype, op, comm);
    }
    // for short ones and count < pof2
    return allreduce__rdb(sbuf, rbuf, count, dtype, op, comm);
}

/* This is the default implementation of alltoall. The algorithm is:

   Algorithm: MPI_Alltoall

   We use four algorithms for alltoall. For short messages and
   (comm_size >= 8), we use the algorithm by Jehoshua Bruck et al,
   IEEE TPDS, Nov. 1997. It is a store-and-forward algorithm that
   takes lgp steps. Because of the extra communication, the bandwidth
   requirement is (n/2).lgp.beta.

   Cost = lgp.alpha + (n/2).lgp.beta

   where n is the total amount of data a process needs to send to all
   other processes.

   For medium-size messages (and short messages for comm_size < 8), we
   use an algorithm that posts all irecvs and isends and then does a
   waitall. We scatter the order of sources and destinations among the
   processes, so that all processes don't try to send/recv to/from the
   same process at the same time.

   *** Modification: We post only a small number of isends and irecvs
   at a time and wait on them as suggested by Tony Ladd. ***
   *** See comments below about an additional modification that
   we may want to consider ***

   For long messages and a power-of-two number of processes, we use a
   pairwise exchange algorithm, which takes p-1 steps. We
   calculate the pairs by using an exclusive-or algorithm:
           for (i=1; i<comm_size; i++)
               dest = rank ^ i;
   This algorithm doesn't work if the number of processes is not a power of
   two. For a non-power-of-two number of processes, we use an
   algorithm in which, in step i, each process receives from (rank-i)
   and sends to (rank+i).

   Cost = (p-1).alpha + n.beta

   where n is the total amount of data a process needs to send to all
   other processes.

   Possible improvements:

   End Algorithm: MPI_Alltoall
*/

int alltoall__mpich(const void *sbuf, int scount,
                    MPI_Datatype sdtype,
                    void* rbuf, int rcount,
                    MPI_Datatype rdtype,
                    MPI_Comm comm)
{
    int communicator_size;
    size_t dsize, block_dsize;
    communicator_size = comm->size();

    unsigned int short_size  = 256;
    unsigned int medium_size = 32768;
    // short messages and comm_size >= 8: bruck
    // medium-size messages (and short messages for comm_size < 8): an algorithm
    //   that posts all irecvs and isends and then does a waitall
    // long messages and a power-of-two number of processes: pairwise exchange
    // non-power-of-two number of processes: an algorithm in which, in step i,
    //   each process receives from (rank-i) and sends to (rank+i)

    dsize = sdtype->size();
    block_dsize = dsize * scount;
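    // block_dsize is the per-destination block size in bytes; the short/medium
    // thresholds above are applied to this per-peer size, not to the aggregate
    // alltoall volume.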
    if ((block_dsize < short_size) && (communicator_size >= 8)) {
        return alltoall__bruck(sbuf, scount, sdtype,
                               rbuf, rcount, rdtype,
                               comm);

    } else if (block_dsize < medium_size) {
        return alltoall__mvapich2_scatter_dest(sbuf, scount, sdtype,
                                               rbuf, rcount, rdtype,
                                               comm);
    } else if (communicator_size % 2) {
        return alltoall__pair(sbuf, scount, sdtype,
                              rbuf, rcount, rdtype,
                              comm);
    }

    return alltoall__ring(sbuf, scount, sdtype,
                          rbuf, rcount, rdtype,
                          comm);
}

int alltoallv__mpich(const void *sbuf, const int *scounts, const int *sdisps,
                     MPI_Datatype sdtype,
                     void *rbuf, const int *rcounts, const int *rdisps,
                     MPI_Datatype rdtype,
                     MPI_Comm comm)
{
    /* For starters, just keep the original algorithm. */
    return alltoallv__bruck(sbuf, scounts, sdisps, sdtype,
                            rbuf, rcounts, rdisps, rdtype,
                            comm);
}

int barrier__mpich(MPI_Comm comm)
{
  return barrier__ompi_bruck(comm);
}

/* This is the default implementation of broadcast. The algorithm is:

   Algorithm: MPI_Bcast

   For short messages, we use a binomial tree algorithm.
   Cost = lgp.alpha + n.lgp.beta

   For long messages, we do a scatter followed by an allgather.
   We first scatter the buffer using a binomial tree algorithm. This costs
   lgp.alpha + n.((p-1)/p).beta
   If the datatype is contiguous and the communicator is homogeneous,
   we treat the data as bytes and divide (scatter) it among processes
   by using ceiling division. For the noncontiguous or heterogeneous
   cases, we first pack the data into a temporary buffer by using
   MPI_Pack, scatter it as bytes, and unpack it after the allgather.

   For the allgather, we use a recursive doubling algorithm for
   medium-size messages and power-of-two number of processes. This
   takes lgp steps. In each step pairs of processes exchange all the
   data they have (we take care of non-power-of-two situations). This
   costs approximately lgp.alpha + n.((p-1)/p).beta. (Approximately
   because it may be slightly more in the non-power-of-two case, but
   it's still a logarithmic algorithm.) Therefore, for long messages
   Total Cost = 2.lgp.alpha + 2.n.((p-1)/p).beta

   Note that this algorithm has twice the latency of the tree algorithm
   we use for short messages, but requires lower bandwidth: 2.n.beta
   versus n.lgp.beta. Therefore, for long messages and when lgp > 2,
   this algorithm will perform better.

   For long messages, and for medium-size messages with a non-power-of-two
   number of processes, we use a ring algorithm for the allgather, which
   takes p-1 steps, because it performs better than recursive doubling.
   Total Cost = (lgp+p-1).alpha + 2.n.((p-1)/p).beta

   Possible improvements:
   For clusters of SMPs, we may want to do something differently to
   take advantage of shared memory on each node.

   End Algorithm: MPI_Bcast
*/

int bcast__mpich(void *buff, int count,
                 MPI_Datatype datatype, int root,
                 MPI_Comm comm)
{
    /* Decision function based on MX results for
       messages up to 36MB and communicator sizes up to 64 nodes */
    const size_t small_message_size = 12288;
    const size_t intermediate_message_size = 524288;

    int communicator_size;
    size_t message_size, dsize;

    if (not comm->is_smp_comm()) {
      if (comm->get_leaders_comm() == MPI_COMM_NULL) {
        comm->init_smp();
      }
      if (comm->is_uniform())
        return bcast__SMP_binomial(buff, count, datatype, root, comm);
    }

    communicator_size = comm->size();

    /* we need the data size for the decision function */
    dsize = datatype->size();
    message_size = dsize * (unsigned long)count; /* needed for decision */
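    /* Decision summary: binomial tree for small messages or small communicators,
       scatter + recursive-doubling allgather for intermediate sizes on an even
       number of processes, scatter + ring allgather for everything else. */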
    /* Handle messages of small and intermediate size, and
       single-element broadcasts */
    if ((message_size < small_message_size) || (communicator_size <= 8)) {
        /* Binomial without segmentation */
        return bcast__binomial_tree(buff, count, datatype, root, comm);

    } else if (message_size < intermediate_message_size && !(communicator_size % 2)) {
        // SplittedBinary with 1KB segments
        return bcast__scatter_rdb_allgather(buff, count, datatype, root, comm);
    }
    // Handle large message sizes
    return bcast__scatter_LR_allgather(buff, count, datatype, root, comm);
}

/* This is the default implementation of reduce. The algorithm is:

   Algorithm: MPI_Reduce

   For long messages, for builtin ops, and if count >= pof2 (where
   pof2 is the nearest power-of-two less than or equal to the number
   of processes), we use Rabenseifner's algorithm (see
   http://www.hlrs.de/organization/par/services/models/mpi/myreduce.html).
   This algorithm implements the reduce in two steps: first a
   reduce-scatter, followed by a gather to the root. A
   recursive-halving algorithm (beginning with processes that are
   distance 1 apart) is used for the reduce-scatter, and a binomial tree
   algorithm is used for the gather. The non-power-of-two case is
   handled by dropping to the nearest lower power-of-two: the first
   few odd-numbered processes send their data to their left neighbors
   (rank-1), and the reduce-scatter happens among the remaining
   power-of-two processes. If the root is one of the excluded
   processes, then after the reduce-scatter, rank 0 sends its result to
   the root and exits; the root now acts as rank 0 in the binomial tree
   algorithm for gather.

   For the power-of-two case, the cost for the reduce-scatter is
   lgp.alpha + n.((p-1)/p).beta + n.((p-1)/p).gamma. The cost for the
   gather to root is lgp.alpha + n.((p-1)/p).beta. Therefore, the total cost is:
   Cost = 2.lgp.alpha + 2.n.((p-1)/p).beta + n.((p-1)/p).gamma

   For the non-power-of-two case, assuming the root is not one of the
   odd-numbered processes that get excluded in the reduce-scatter,
   Cost = (2.floor(lgp)+1).alpha + (2.((p-1)/p) + 1).n.beta +
          n.(1+(p-1)/p).gamma

   For short messages, for user-defined ops, and for count < pof2,
   we use a binomial tree algorithm for both short and long messages.

   Cost = lgp.alpha + n.lgp.beta + n.lgp.gamma

   We use the binomial tree algorithm in the case of user-defined ops
   because in this case derived datatypes are allowed, and the user
   could pass basic datatypes on one process and derived on another as
   long as the type maps are the same. Breaking up derived datatypes
   to do the reduce-scatter is tricky.

   FIXME: Per the MPI-2.1 standard this case is not possible. We
   should be able to use the reduce-scatter/gather approach as long as
   count >= pof2. [goodell@ 2009-01-21]

   Possible improvements:

   End Algorithm: MPI_Reduce
*/

int reduce__mpich(const void *sendbuf, void *recvbuf,
                  int count, MPI_Datatype datatype,
                  MPI_Op op, int root,
                  MPI_Comm comm)
{
    int communicator_size = 0;
    size_t message_size, dsize;

    if (not comm->is_smp_comm()) {
      if (comm->get_leaders_comm() == MPI_COMM_NULL) {
        comm->init_smp();
      }
      if (op->is_commutative() == 1)
        return reduce__mvapich2_two_level(sendbuf, recvbuf, count, datatype, op, root, comm);
    }

    communicator_size = comm->size();

    /* need data size for decision function */
    dsize = datatype->size();
    message_size = dsize * count; /* needed for decision */

    int pof2 = 1;
    while (pof2 <= communicator_size)
      pof2 <<= 1;
    pof2 >>= 1;
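    /* pof2 is the largest power of two <= communicator_size. The reduce-scatter +
       gather algorithm is only used for large, commutative reductions with
       count >= pof2; every other case falls back to the binomial tree. */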
    if ((count < pof2) || (message_size < 2048) || (op != MPI_OP_NULL && not op->is_commutative())) {
      return reduce__binomial(sendbuf, recvbuf, count, datatype, op, root, comm);
    }
    return reduce__scatter_gather(sendbuf, recvbuf, count, datatype, op, root, comm);
}

/* This is the default implementation of reduce_scatter. The algorithm is:

   Algorithm: MPI_Reduce_scatter

   If the operation is commutative, for short and medium-size
   messages, we use a recursive-halving
   algorithm in which the first p/2 processes send the second n/2 data
   to their counterparts in the other half and receive the first n/2
   data from them. This procedure continues recursively, halving the
   data communicated at each step, for a total of lgp steps. If the
   number of processes is not a power-of-two, we convert it to the
   nearest lower power-of-two by having the first few even-numbered
   processes send their data to the neighboring odd-numbered process
   at (rank+1). Those odd-numbered processes compute the result for
   their left neighbor as well in the recursive halving algorithm, and
   then at the end send the result back to the processes that didn't
   participate.
   Therefore, if p is a power-of-two,
   Cost = lgp.alpha + n.((p-1)/p).beta + n.((p-1)/p).gamma
   If p is not a power-of-two,
   Cost = (floor(lgp)+2).alpha + n.(1+(p-1+n)/p).beta + n.(1+(p-1)/p).gamma
   The above cost in the non-power-of-two case is approximate because
   there is some imbalance in the amount of work each process does
   because some processes do the work of their neighbors as well.

   For commutative operations and very long messages we use
   a pairwise exchange algorithm similar to
   the one used in MPI_Alltoall. At step i, each process sends n/p
   amount of data to (rank+i) and receives n/p amount of data from
   (rank-i).
   Cost = (p-1).alpha + n.((p-1)/p).beta + n.((p-1)/p).gamma

   If the operation is not commutative, we do the following:

   We use a recursive doubling algorithm, which
   takes lgp steps. At step 1, processes exchange (n-n/p) amount of
   data; at step 2, (n-2n/p) amount of data; at step 3, (n-4n/p)
   amount of data, and so forth.

   Cost = lgp.alpha + n.(lgp-(p-1)/p).beta + n.(lgp-(p-1)/p).gamma

   Possible improvements:

   End Algorithm: MPI_Reduce_scatter
*/

int reduce_scatter__mpich(const void *sbuf, void *rbuf,
                          const int *rcounts,
                          MPI_Datatype dtype,
                          MPI_Op op,
                          MPI_Comm comm)
{
    int comm_size, i;
    size_t total_message_size;

    if (sbuf == rbuf)
      sbuf = MPI_IN_PLACE; // restore MPI_IN_PLACE as these algorithms handle it

    XBT_DEBUG("Coll_reduce_scatter_mpich::reduce");

    comm_size = comm->size();
    // We need data size for decision function
    total_message_size = 0;
    for (i = 0; i < comm_size; i++) {
      total_message_size += rcounts[i];
    }
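    /* Decision: pairwise exchange for large commutative reductions (summed rcounts
       above the 524288 threshold); for noncommutative ops, the dedicated
       noncommutative algorithm when the blocks are regular and comm_size is a
       power of two, recursive doubling otherwise; recursive doubling for the
       remaining (small commutative) cases. */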
    if ((op == MPI_OP_NULL || op->is_commutative()) && total_message_size > 524288) {
        return reduce_scatter__mpich_pair(sbuf, rbuf, rcounts, dtype, op, comm);
    } else if (op != MPI_OP_NULL && not op->is_commutative()) {
        bool is_block_regular = true;
        for (i = 0; i < (comm_size - 1); ++i) {
          if (rcounts[i] != rcounts[i + 1]) {
            is_block_regular = false;
            break;
          }
        }

        /* slightly retask pof2 to mean pof2 equal or greater, not always greater as it is above */
        int pof2 = 1;
        while (pof2 < comm_size)
          pof2 <<= 1;

        if (pof2 == comm_size && is_block_regular) {
          /* noncommutative, pof2 size, and block regular */
          return reduce_scatter__mpich_noncomm(sbuf, rbuf, rcounts, dtype, op, comm);
        }

        return reduce_scatter__mpich_rdb(sbuf, rbuf, rcounts, dtype, op, comm);
    }
    return reduce_scatter__mpich_rdb(sbuf, rbuf, rcounts, dtype, op, comm);
}

/* This is the default implementation of allgather. The algorithm is:

   Algorithm: MPI_Allgather

   For short messages and non-power-of-two no. of processes, we use
   the algorithm from the Jehoshua Bruck et al IEEE TPDS Nov 97
   paper. It is a variant of the dissemination algorithm for
   barrier. It takes ceiling(lg p) steps.

   Cost = lgp.alpha + n.((p-1)/p).beta
   where n is total size of data gathered on each process.

   For short or medium-size messages and power-of-two no. of
   processes, we use the recursive doubling algorithm.

   Cost = lgp.alpha + n.((p-1)/p).beta

   TODO: On TCP, we may want to use recursive doubling instead of the Bruck
   algorithm in all cases because of the pairwise-exchange property of
   recursive doubling (see Benson et al paper in Euro PVM/MPI 2003).

   It is interesting to note that either of the above algorithms for
   MPI_Allgather has the same cost as the tree algorithm for MPI_Gather!

   For long messages or medium-size messages and non-power-of-two
   no. of processes, we use a ring algorithm. In the first step, each
   process i sends its contribution to process i+1 and receives
   the contribution from process i-1 (with wrap-around). From the
   second step onwards, each process i forwards to process i+1 the
   data it received from process i-1 in the previous step. This takes
   a total of p-1 steps.

   Cost = (p-1).alpha + n.((p-1)/p).beta

   We use this algorithm instead of recursive doubling for long
   messages because we find that this communication pattern (nearest
   neighbor) performs twice as fast as recursive doubling for long
   messages (on Myrinet and IBM SP).

   Possible improvements:

   End Algorithm: MPI_Allgather
*/

int allgather__mpich(const void *sbuf, int scount,
                     MPI_Datatype sdtype,
                     void* rbuf, int rcount,
                     MPI_Datatype rdtype,
                     MPI_Comm comm)
{
    int communicator_size, pow2_size;
    size_t dsize, total_dsize;

    communicator_size = comm->size();

    /* Determine complete data size */
    dsize = sdtype->size();
    total_dsize = dsize * scount * communicator_size;

    for (pow2_size = 1; pow2_size < communicator_size; pow2_size <<= 1);
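    /* pow2_size is the smallest power of two >= communicator_size, so equality
       below means the communicator size is exactly a power of two. */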
    /* Decision as in MPICH-2,
       presented in Thakur et al., "Optimization of Collective Communication
       Operations in MPICH", International Journal of High Performance Computing
       Applications, Vol. 19, No. 1, 49-66 (2005):
       - for power-of-two processes and small and medium size messages
         (up to 512KB) use recursive doubling,
       - for non-power-of-two processes and small messages (80KB) use bruck,
       - for everything else use ring.
    */
    if ((pow2_size == communicator_size) && (total_dsize < 524288)) {
        return allgather__rdb(sbuf, scount, sdtype, rbuf, rcount, rdtype, comm);
    } else if (total_dsize <= 81920) {
        return allgather__bruck(sbuf, scount, sdtype, rbuf, rcount, rdtype, comm);
    }
    return allgather__ring(sbuf, scount, sdtype, rbuf, rcount, rdtype, comm);
}

/* This is the default implementation of allgatherv. The algorithm is:

   Algorithm: MPI_Allgatherv

   For short messages and non-power-of-two no. of processes, we use
   the algorithm from the Jehoshua Bruck et al IEEE TPDS Nov 97
   paper. It is a variant of the dissemination algorithm for
   barrier. It takes ceiling(lg p) steps.

   Cost = lgp.alpha + n.((p-1)/p).beta
   where n is total size of data gathered on each process.

   For short or medium-size messages and power-of-two no. of
   processes, we use the recursive doubling algorithm.

   Cost = lgp.alpha + n.((p-1)/p).beta

   TODO: On TCP, we may want to use recursive doubling instead of the Bruck
   algorithm in all cases because of the pairwise-exchange property of
   recursive doubling (see Benson et al paper in Euro PVM/MPI 2003).

   For long messages or medium-size messages and non-power-of-two
   no. of processes, we use a ring algorithm. In the first step, each
   process i sends its contribution to process i+1 and receives
   the contribution from process i-1 (with wrap-around). From the
   second step onwards, each process i forwards to process i+1 the
   data it received from process i-1 in the previous step. This takes
   a total of p-1 steps.

   Cost = (p-1).alpha + n.((p-1)/p).beta

   Possible improvements:

   End Algorithm: MPI_Allgatherv
*/

int allgatherv__mpich(const void *sbuf, int scount,
                      MPI_Datatype sdtype,
                      void* rbuf, const int *rcounts,
                      const int *rdispls,
                      MPI_Datatype rdtype,
                      MPI_Comm comm)
{
    int communicator_size, pow2_size, i;
    size_t total_dsize;

    communicator_size = comm->size();

    /* Determine complete data size */
    total_dsize = 0;
    for (i = 0; i < communicator_size; i++)
      total_dsize += rcounts[i];
    if (total_dsize == 0)
      return MPI_SUCCESS;

    for (pow2_size = 1; pow2_size < communicator_size; pow2_size <<= 1);
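    /* Same decision thresholds as allgather, but applied to the summed receive
       counts computed above (no datatype size is factored in here). */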
    if ((pow2_size == communicator_size) && (total_dsize < 524288)) {
        return allgatherv__mpich_rdb(sbuf, scount, sdtype, rbuf, rcounts, rdispls, rdtype, comm);
    } else if (total_dsize <= 81920) {
        return allgatherv__ompi_bruck(sbuf, scount, sdtype, rbuf, rcounts, rdispls, rdtype, comm);
    }
    return allgatherv__mpich_ring(sbuf, scount, sdtype, rbuf, rcounts, rdispls, rdtype, comm);
}

/* This is the default implementation of gather. The algorithm is:

   Algorithm: MPI_Gather

   We use a binomial tree algorithm for both short and long
   messages. At nodes other than leaf nodes we need to allocate a
   temporary buffer to store the incoming message. If the root is not
   rank 0, for very small messages, we pack it into a temporary
   contiguous buffer and reorder it to be placed in the right
   order. For small (but not very small) messages, we use a derived
   datatype to unpack the incoming data into non-contiguous buffers in
   the right order. In the heterogeneous case we first pack the
   buffers by using MPI_Pack and then do the gather.

   Cost = lgp.alpha + n.((p-1)/p).beta
   where n is the total size of the data gathered at the root.

   Possible improvements:

   End Algorithm: MPI_Gather
*/

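// Both short and long messages go through the binomial-tree gather; the
// Open MPI binomial implementation is reused for it here.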
int gather__mpich(const void *sbuf, int scount,
                  MPI_Datatype sdtype,
                  void* rbuf, int rcount,
                  MPI_Datatype rdtype,
                  int root, MPI_Comm comm)
{
  return gather__ompi_binomial(sbuf, scount, sdtype,
                               rbuf, rcount, rdtype,
                               root, comm);
}

/* This is the default implementation of scatter. The algorithm is:

   Algorithm: MPI_Scatter

   We use a binomial tree algorithm for both short and
   long messages. At nodes other than leaf nodes we need to allocate
   a temporary buffer to store the incoming message. If the root is
   not rank 0, we reorder the sendbuf in order of relative ranks by
   copying it into a temporary buffer, so that all the sends from the
   root are contiguous and in the right order. In the heterogeneous
   case, we first pack the buffer by using MPI_Pack and then do the
   scatter.

   Cost = lgp.alpha + n.((p-1)/p).beta
   where n is the total size of the data to be scattered from the root.

   Possible improvements:

   End Algorithm: MPI_Scatter
*/

int scatter__mpich(const void *sbuf, int scount,
                   MPI_Datatype sdtype,
                   void* rbuf, int rcount,
                   MPI_Datatype rdtype,
                   int root, MPI_Comm comm)
{
  std::unique_ptr<unsigned char[]> tmp_buf;
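  // On non-root ranks the send arguments of MPI_Scatter are not significant,
  // so substitute a valid local buffer/count/type before calling the binomial
  // implementation, which may read them on every rank.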
  if (comm->rank() != root) {
    tmp_buf = std::make_unique<unsigned char[]>(rcount * rdtype->get_extent());
    sbuf   = tmp_buf.get();
    scount = rcount;
    sdtype = rdtype;
  }
  return scatter__ompi_binomial(sbuf, scount, sdtype, rbuf, rcount, rdtype, root, comm);
}

} // namespace simgrid::smpi