/* selector for collective algorithms based on mpich decision logic */

/* Copyright (c) 2009-2021. The SimGrid Team.
 * All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "colls_private.hpp"
/* This is the default implementation of allreduce. The algorithm is:

   Algorithm: MPI_Allreduce

   For the heterogeneous case, we call MPI_Reduce followed by MPI_Bcast
   in order to meet the requirement that all processes must have the
   same result. For the homogeneous case, we use the following algorithms.

   For long messages and for builtin ops and if count >= pof2 (where
   pof2 is the nearest power-of-two less than or equal to the number
   of processes), we use Rabenseifner's algorithm (see
   http://www.hlrs.de/mpi/myreduce.html).
   This algorithm implements the allreduce in two steps: first a
   reduce-scatter, followed by an allgather. A recursive-halving
   algorithm (beginning with processes that are distance 1 apart) is
   used for the reduce-scatter, and a recursive doubling
   algorithm is used for the allgather. The non-power-of-two case is
   handled by dropping to the nearest lower power-of-two: the first
   few even-numbered processes send their data to their right neighbors
   (rank+1), and the reduce-scatter and allgather happen among the remaining
   power-of-two processes. At the end, the first few even-numbered
   processes get the result from their right neighbors.

   For the power-of-two case, the cost for the reduce-scatter is
   lgp.alpha + n.((p-1)/p).beta + n.((p-1)/p).gamma. The cost for the
   allgather is lgp.alpha + n.((p-1)/p).beta. Therefore, the
   total cost is:
   Cost = 2.lgp.alpha + 2.n.((p-1)/p).beta + n.((p-1)/p).gamma

   For the non-power-of-two case,
   Cost = (2.floor(lgp)+2).alpha + (2.((p-1)/p) + 2).n.beta + n.(1+(p-1)/p).gamma

   For short messages, for user-defined ops, and for count < pof2,
   we use a recursive doubling algorithm (similar to the one in
   MPI_Allgather). We use this algorithm in the case of user-defined ops
   because in this case derived datatypes are allowed, and the user
   could pass basic datatypes on one process and derived on another as
   long as the type maps are the same. Breaking up derived datatypes
   to do the reduce-scatter is tricky.

   Cost = lgp.alpha + n.lgp.beta + n.lgp.gamma

   Possible improvements:

   End Algorithm: MPI_Allreduce
*/
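/* Illustration only (not used by the selector): a minimal sketch of the
   recursive-doubling allreduce mentioned above, written against the standard
   MPI C API rather than the SMPI internals used in this file. It assumes
   MPI_SUM on doubles and a power-of-two number of processes; the real
   implementations also handle non-power-of-two sizes and arbitrary ops.
   The name sketch_allreduce_rdb is made up for this example. */
#if 0
static void sketch_allreduce_rdb(const double* sendbuf, double* recvbuf, int count, MPI_Comm comm)
{
  int rank, size;
  MPI_Comm_rank(comm, &rank);
  MPI_Comm_size(comm, &size);
  std::vector<double> tmp(count);
  for (int i = 0; i < count; i++)
    recvbuf[i] = sendbuf[i];
  /* lg(p) steps: at step k, exchange the current partial result with the
     process whose rank differs only in bit k, then combine */
  for (int mask = 1; mask < size; mask <<= 1) {
    int partner = rank ^ mask;
    MPI_Sendrecv(recvbuf, count, MPI_DOUBLE, partner, 0,
                 tmp.data(), count, MPI_DOUBLE, partner, 0,
                 comm, MPI_STATUS_IGNORE);
    for (int i = 0; i < count; i++)
      recvbuf[i] += tmp[i]; /* fold in the partner's contribution */
  }
}
#endif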
int allreduce__mpich(const void *sbuf, void *rbuf, int count,
                     MPI_Datatype dtype, MPI_Op op, MPI_Comm comm)
{
    size_t dsize, block_dsize;
    int comm_size = comm->size();
    const size_t large_message = 2048; //MPIR_PARAM_ALLREDUCE_SHORT_MSG_SIZE

    dsize = dtype->size();
    block_dsize = dsize * count;

    /* MPICH now uses SMP-aware algorithms for all commutative ops */
    if (not comm->is_smp_comm()) {
      if (comm->get_leaders_comm() == MPI_COMM_NULL) {
        comm->init_smp();
      }
      if (op->is_commutative())
        return allreduce__mvapich2_two_level(sbuf, rbuf, count, dtype, op, comm);
    }

    /* find nearest power-of-two less than or equal to comm_size */
    int pof2 = 1;
    while (pof2 <= comm_size)
      pof2 <<= 1;
    pof2 >>= 1;

    if (block_dsize > large_message && count >= pof2 && (op == MPI_OP_NULL || op->is_commutative())) {
      // long messages, builtin (commutative) op, and count >= pof2: Rabenseifner
      return allreduce__rab_rdb(sbuf, rbuf, count, dtype, op, comm);
    } else {
      // for short ones and count < pof2: recursive doubling
      return allreduce__rdb(sbuf, rbuf, count, dtype, op, comm);
    }
}
/* This is the default implementation of alltoall. The algorithm is:

   Algorithm: MPI_Alltoall

   We use four algorithms for alltoall. For short messages and
   (comm_size >= 8), we use the algorithm by Jehoshua Bruck et al,
   IEEE TPDS, Nov. 1997. It is a store-and-forward algorithm that
   takes lgp steps. Because of the extra communication, the bandwidth
   requirement is (n/2).lgp.beta.

   Cost = lgp.alpha + (n/2).lgp.beta

   where n is the total amount of data a process needs to send to all
   other processes.

   For medium-size messages (and short messages for comm_size < 8), we
   use an algorithm that posts all irecvs and isends and then does a
   waitall. We scatter the order of sources and destinations among the
   processes, so that all processes don't try to send/recv to/from the
   same process at the same time.

   *** Modification: We post only a small number of isends and irecvs
   at a time and wait on them as suggested by Tony Ladd. ***
   *** See comments below about an additional modification that
   we may want to consider ***

   For long messages and a power-of-two number of processes, we use a
   pairwise exchange algorithm, which takes p-1 steps. We
   calculate the pairs by using an exclusive-or algorithm:
       for (i=1; i<comm_size; i++)
           dest = rank ^ i;
   This algorithm doesn't work if the number of processes is not a power of
   two. For a non-power-of-two number of processes, we use an
   algorithm in which, in step i, each process receives from (rank-i)
   and sends to (rank+i).

   Cost = (p-1).alpha + n.beta

   where n is the total amount of data a process needs to send to all
   other processes.

   Possible improvements:

   End Algorithm: MPI_Alltoall
*/
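/* Illustration only (not used by the selector): a minimal sketch of the
   pairwise exchange alltoall described above, written against the standard
   MPI C API rather than SMPI internals. It assumes a power-of-two number of
   processes and one block of 'blocksize' bytes per destination; the name
   sketch_alltoall_pairwise is made up for this example. */
#if 0
static void sketch_alltoall_pairwise(const char* sendbuf, char* recvbuf, int blocksize, MPI_Comm comm)
{
  int rank, size;
  MPI_Comm_rank(comm, &rank);
  MPI_Comm_size(comm, &size);
  /* local copy of our own block */
  memcpy(recvbuf + rank * blocksize, sendbuf + rank * blocksize, blocksize);
  /* p-1 steps; at step i the partner is obtained by XOR-ing the rank with i */
  for (int i = 1; i < size; i++) {
    int partner = rank ^ i;
    MPI_Sendrecv(sendbuf + partner * blocksize, blocksize, MPI_BYTE, partner, 0,
                 recvbuf + partner * blocksize, blocksize, MPI_BYTE, partner, 0,
                 comm, MPI_STATUS_IGNORE);
  }
}
#endif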
int alltoall__mpich(const void *sbuf, int scount,
                    MPI_Datatype sdtype,
                    void* rbuf, int rcount,
                    MPI_Datatype rdtype,
                    MPI_Comm comm)
{
    int communicator_size;
    size_t dsize, block_dsize;
    communicator_size = comm->size();

    unsigned int short_size  = 256;
    unsigned int medium_size = 32768;
    // short messages and comm_size >= 8 -> bruck

    // for medium-size messages (and short messages for comm_size < 8), we
    // use an algorithm that posts all irecvs and isends and then does a
    // waitall

    // for long messages and a power-of-two number of processes, we use a
    // pairwise exchange algorithm

    // for a non-power-of-two number of processes, we use an
    // algorithm in which, in step i, each process receives from (rank-i)
    // and sends to (rank+i)

    dsize = sdtype->size();
    block_dsize = dsize * scount;

    if ((block_dsize < short_size) && (communicator_size >= 8)) {
        return alltoall__bruck(sbuf, scount, sdtype,
                               rbuf, rcount, rdtype,
                               comm);

    } else if (block_dsize < medium_size) {
        return alltoall__mvapich2_scatter_dest(sbuf, scount, sdtype,
                                               rbuf, rcount, rdtype,
                                               comm);
    } else if (communicator_size % 2) {
        return alltoall__pair(sbuf, scount, sdtype,
                              rbuf, rcount, rdtype,
                              comm);
    }

    return alltoall__ring(sbuf, scount, sdtype,
                          rbuf, rcount, rdtype,
                          comm);
}
int alltoallv__mpich(const void *sbuf, const int *scounts, const int *sdisps,
                     MPI_Datatype sdtype,
                     void *rbuf, const int *rcounts, const int *rdisps,
                     MPI_Datatype rdtype,
                     MPI_Comm comm)
{
    /* For starters, just keep the original algorithm. */
    return alltoallv__bruck(sbuf, scounts, sdisps, sdtype,
                            rbuf, rcounts, rdisps, rdtype,
                            comm);
}

int barrier__mpich(MPI_Comm comm)
{
    return barrier__ompi_bruck(comm);
}
/* This is the default implementation of broadcast. The algorithm is:

   Algorithm: MPI_Bcast

   For short messages, we use a binomial tree algorithm.
   Cost = lgp.alpha + n.lgp.beta

   For long messages, we do a scatter followed by an allgather.
   We first scatter the buffer using a binomial tree algorithm. This costs
   lgp.alpha + n.((p-1)/p).beta
   If the datatype is contiguous and the communicator is homogeneous,
   we treat the data as bytes and divide (scatter) it among processes
   by using ceiling division. For the noncontiguous or heterogeneous
   cases, we first pack the data into a temporary buffer by using
   MPI_Pack, scatter it as bytes, and unpack it after the allgather.

   For the allgather, we use a recursive doubling algorithm for
   medium-size messages and a power-of-two number of processes. This
   takes lgp steps. In each step, pairs of processes exchange all the
   data they have (we take care of non-power-of-two situations). This
   costs approximately lgp.alpha + n.((p-1)/p).beta. (Approximately
   because it may be slightly more in the non-power-of-two case, but
   it's still a logarithmic algorithm.) Therefore, for long messages
   Total Cost = 2.lgp.alpha + 2.n.((p-1)/p).beta

   Note that this algorithm has twice the latency of the tree algorithm
   we use for short messages, but requires lower bandwidth: 2.n.beta
   versus n.lgp.beta. Therefore, for long messages and when lgp > 2,
   this algorithm will perform better.

   For long messages, and for medium-size messages with a non-power-of-two
   number of processes, we use a ring algorithm for the allgather, which
   takes p-1 steps, because it performs better than recursive doubling.
   Total Cost = (lgp+p-1).alpha + 2.n.((p-1)/p).beta

   Possible improvements:
   For clusters of SMPs, we may want to do something differently to
   take advantage of shared memory on each node.

   End Algorithm: MPI_Bcast
*/
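/* Illustration only (not used by the selector): a minimal sketch of the
   binomial tree broadcast used for short messages, written against the
   standard MPI C API rather than SMPI internals and assuming root 0 for
   brevity. The name sketch_bcast_binomial is made up for this example. */
#if 0
static void sketch_bcast_binomial(void* buf, int count, MPI_Datatype dtype, MPI_Comm comm)
{
  int rank, size;
  MPI_Comm_rank(comm, &rank);
  MPI_Comm_size(comm, &size);
  /* smallest power of two >= size */
  int mask = 1;
  while (mask < size)
    mask <<= 1;
  /* walk the binomial tree from the widest stride down to stride 1 */
  for (mask >>= 1; mask > 0; mask >>= 1) {
    if (rank & (mask - 1))
      continue;                  /* this rank is not active at this level yet */
    if (rank & mask) {
      /* first active level for this rank: receive the data from the parent */
      MPI_Recv(buf, count, dtype, rank - mask, 0, comm, MPI_STATUS_IGNORE);
    } else if (rank + mask < size) {
      /* already holds the data: forward it to the child at rank+mask */
      MPI_Send(buf, count, dtype, rank + mask, 0, comm);
    }
  }
}
#endif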
int bcast__mpich(void *buff, int count,
                 MPI_Datatype datatype, int root,
                 MPI_Comm comm)
{
    /* Decision function based on MX results for
       messages up to 36MB and communicator sizes up to 64 nodes */
    const size_t small_message_size = 12288;
    const size_t intermediate_message_size = 524288;

    int communicator_size;
    size_t message_size, dsize;

    if (not comm->is_smp_comm()) {
      if (comm->get_leaders_comm() == MPI_COMM_NULL) {
        comm->init_smp();
      }
      if (comm->is_uniform())
        return bcast__SMP_binomial(buff, count, datatype, root, comm);
    }

    communicator_size = comm->size();

    /* we need data size for the decision function */
    dsize = datatype->size();
    message_size = dsize * (unsigned long)count; /* needed for decision */

    /* Handle messages of small and intermediate size, and
       single-element broadcasts */
    if ((message_size < small_message_size) || (communicator_size <= 8)) {
      /* Binomial without segmentation */
      return bcast__binomial_tree(buff, count, datatype, root, comm);

    } else if (message_size < intermediate_message_size && !(communicator_size % 2)) {
      // Binomial scatter followed by recursive-doubling allgather
      return bcast__scatter_rdb_allgather(buff, count, datatype, root, comm);
    }
    // Handle large message sizes: scatter followed by ring allgather
    return bcast__scatter_LR_allgather(buff, count, datatype, root, comm);
}
/* This is the default implementation of reduce. The algorithm is:

   Algorithm: MPI_Reduce

   For long messages and for builtin ops and if count >= pof2 (where
   pof2 is the nearest power-of-two less than or equal to the number
   of processes), we use Rabenseifner's algorithm (see
   http://www.hlrs.de/organization/par/services/models/mpi/myreduce.html).
   This algorithm implements the reduce in two steps: first a
   reduce-scatter, followed by a gather to the root. A
   recursive-halving algorithm (beginning with processes that are
   distance 1 apart) is used for the reduce-scatter, and a binomial tree
   algorithm is used for the gather. The non-power-of-two case is
   handled by dropping to the nearest lower power-of-two: the first
   few odd-numbered processes send their data to their left neighbors
   (rank-1), and the reduce-scatter happens among the remaining
   power-of-two processes. If the root is one of the excluded
   processes, then after the reduce-scatter, rank 0 sends its result to
   the root and exits; the root now acts as rank 0 in the binomial tree
   algorithm for gather.

   For the power-of-two case, the cost for the reduce-scatter is
   lgp.alpha + n.((p-1)/p).beta + n.((p-1)/p).gamma. The cost for the
   gather to root is lgp.alpha + n.((p-1)/p).beta. Therefore, the
   total cost is:
   Cost = 2.lgp.alpha + 2.n.((p-1)/p).beta + n.((p-1)/p).gamma

   For the non-power-of-two case, assuming the root is not one of the
   odd-numbered processes that get excluded in the reduce-scatter,
   Cost = (2.floor(lgp)+1).alpha + (2.((p-1)/p) + 1).n.beta +
          n.(1+(p-1)/p).gamma

   For short messages, user-defined ops, and count < pof2, we use a
   binomial tree algorithm for both short and long messages.

   Cost = lgp.alpha + n.lgp.beta + n.lgp.gamma

   We use the binomial tree algorithm in the case of user-defined ops
   because in this case derived datatypes are allowed, and the user
   could pass basic datatypes on one process and derived on another as
   long as the type maps are the same. Breaking up derived datatypes
   to do the reduce-scatter is tricky.

   FIXME: Per the MPI-2.1 standard this case is not possible. We
   should be able to use the reduce-scatter/gather approach as long as
   count >= pof2. [goodell@ 2009-01-21]

   Possible improvements:

   End Algorithm: MPI_Reduce
*/
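/* Illustration only (not used by the selector): a minimal sketch of the
   binomial tree reduce used for short messages, written against the standard
   MPI C API rather than SMPI internals. It assumes MPI_SUM on doubles and
   root 0 (so commutativity is not an issue); the name sketch_reduce_binomial
   is made up for this example. */
#if 0
static void sketch_reduce_binomial(const double* sendbuf, double* recvbuf, int count, MPI_Comm comm)
{
  int rank, size;
  MPI_Comm_rank(comm, &rank);
  MPI_Comm_size(comm, &size);
  std::vector<double> partial(sendbuf, sendbuf + count); /* running partial sum */
  std::vector<double> incoming(count);
  /* climb the binomial tree: at level 'mask', ranks that are odd multiples of
     'mask' send their partial result to rank-mask and drop out */
  for (int mask = 1; mask < size; mask <<= 1) {
    if (rank & mask) {
      MPI_Send(partial.data(), count, MPI_DOUBLE, rank - mask, 0, comm);
      break; /* this process has contributed and is done */
    } else if (rank + mask < size) {
      MPI_Recv(incoming.data(), count, MPI_DOUBLE, rank + mask, 0, comm, MPI_STATUS_IGNORE);
      for (int i = 0; i < count; i++)
        partial[i] += incoming[i];
    }
  }
  if (rank == 0) /* root 0 now holds the fully reduced vector */
    std::copy(partial.begin(), partial.end(), recvbuf);
}
#endif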
int reduce__mpich(const void *sendbuf, void *recvbuf,
                  int count, MPI_Datatype datatype,
                  MPI_Op op, int root, MPI_Comm comm)
{
    int communicator_size = 0;
    size_t message_size, dsize;

    if (not comm->is_smp_comm()) {
      if (comm->get_leaders_comm() == MPI_COMM_NULL) {
        comm->init_smp();
      }
      if (op->is_commutative() == 1)
        return reduce__mvapich2_two_level(sendbuf, recvbuf, count, datatype, op, root, comm);
    }

    communicator_size = comm->size();

    /* need data size for decision function */
    dsize = datatype->size();
    message_size = dsize * count; /* needed for decision */

    /* find nearest power-of-two less than or equal to comm_size */
    int pof2 = 1;
    while (pof2 <= communicator_size)
      pof2 <<= 1;
    pof2 >>= 1;

    if ((count < pof2) || (message_size < 2048) || (op != MPI_OP_NULL && not op->is_commutative())) {
      return reduce__binomial(sendbuf, recvbuf, count, datatype, op, root, comm);
    }
    return reduce__scatter_gather(sendbuf, recvbuf, count, datatype, op, root, comm);
}
/* This is the default implementation of reduce_scatter. The algorithm is:

   Algorithm: MPI_Reduce_scatter

   If the operation is commutative, for short and medium-size
   messages, we use a recursive-halving
   algorithm in which the first p/2 processes send the second n/2 data
   to their counterparts in the other half and receive the first n/2
   data from them. This procedure continues recursively, halving the
   data communicated at each step, for a total of lgp steps. If the
   number of processes is not a power-of-two, we convert it to the
   nearest lower power-of-two by having the first few even-numbered
   processes send their data to the neighboring odd-numbered process
   at (rank+1). Those odd-numbered processes compute the result for
   their left neighbor as well in the recursive halving algorithm, and
   then at the end send the result back to the processes that didn't
   participate.
   Therefore, if p is a power-of-two,
   Cost = lgp.alpha + n.((p-1)/p).beta + n.((p-1)/p).gamma
   If p is not a power-of-two,
   Cost = (floor(lgp)+2).alpha + n.(1+(p-1+n)/p).beta + n.(1+(p-1)/p).gamma
   The above cost in the non power-of-two case is approximate because
   there is some imbalance in the amount of work each process does
   because some processes do the work of their neighbors as well.

   For commutative operations and very long messages, we use
   a pairwise exchange algorithm similar to
   the one used in MPI_Alltoall. At step i, each process sends n/p
   amount of data to (rank+i) and receives n/p amount of data from
   (rank-i).
   Cost = (p-1).alpha + n.((p-1)/p).beta + n.((p-1)/p).gamma

   If the operation is not commutative, we do the following:

   We use a recursive doubling algorithm, which
   takes lgp steps. At step 1, processes exchange (n-n/p) amount of
   data; at step 2, (n-2n/p) amount of data; at step 3, (n-4n/p)
   amount of data, and so forth.

   Cost = lgp.alpha + n.(lgp-(p-1)/p).beta + n.(lgp-(p-1)/p).gamma

   Possible improvements:

   End Algorithm: MPI_Reduce_scatter
*/
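/* Illustration only (not used by the selector): a minimal sketch of the
   recursive-halving reduce_scatter described above for commutative
   operations, written against the standard MPI C API rather than SMPI
   internals. It assumes MPI_SUM on doubles, a power-of-two number of
   processes and a count divisible by that number, and it reduces in place
   inside one full-size buffer instead of filling separate recv buffers.
   The name sketch_reduce_scatter_halving is made up for this example. */
#if 0
static void sketch_reduce_scatter_halving(double* data /* count elements */, int count, MPI_Comm comm)
{
  int rank, size;
  MPI_Comm_rank(comm, &rank);
  MPI_Comm_size(comm, &size);
  std::vector<double> tmp(count);
  int lo = 0, hi = count; /* slice of 'data' this process is still responsible for */
  /* lg(p) steps: first exchange halves with the counterpart p/2 away, then
     quarters at distance p/4, and so on */
  for (int mask = size / 2; mask > 0; mask /= 2) {
    int partner = rank ^ mask;
    int mid = (lo + hi) / 2;
    /* the lower-ranked process keeps [lo,mid) and sends [mid,hi); the higher-ranked one does the opposite */
    int keep_lo = (rank < partner) ? lo : mid;
    int keep_hi = (rank < partner) ? mid : hi;
    int send_lo = (rank < partner) ? mid : lo;
    int send_hi = (rank < partner) ? hi : mid;
    MPI_Sendrecv(data + send_lo, send_hi - send_lo, MPI_DOUBLE, partner, 0,
                 tmp.data() + keep_lo, keep_hi - keep_lo, MPI_DOUBLE, partner, 0,
                 comm, MPI_STATUS_IGNORE);
    for (int i = keep_lo; i < keep_hi; i++)
      data[i] += tmp[i]; /* accumulate the partner's contribution for the kept half */
    lo = keep_lo;
    hi = keep_hi;
  }
  /* on exit, data[lo, hi) holds this process's fully reduced block, which
     under these assumptions is block number 'rank' of size count/size */
}
#endif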
int reduce_scatter__mpich(const void *sbuf, void *rbuf,
                          const int *rcounts,
                          MPI_Datatype dtype,
                          MPI_Op op,
                          MPI_Comm comm)
{
    int comm_size, i;
    size_t total_message_size;

    if (sbuf == rbuf)
      sbuf = MPI_IN_PLACE; // restore MPI_IN_PLACE as these algorithms handle it

    XBT_DEBUG("Coll_reduce_scatter_mpich::reduce");

    comm_size = comm->size();
    // We need data size for decision function
    total_message_size = 0;
    for (i = 0; i < comm_size; i++) {
      total_message_size += rcounts[i];
    }

    if ((op == MPI_OP_NULL || op->is_commutative()) && total_message_size > 524288) {
      return reduce_scatter__mpich_pair(sbuf, rbuf, rcounts, dtype, op, comm);
    } else if (op != MPI_OP_NULL && not op->is_commutative()) {
      bool is_block_regular = true;
      for (i = 0; i < (comm_size - 1); ++i) {
        if (rcounts[i] != rcounts[i + 1]) {
          is_block_regular = false;
          break;
        }
      }

      /* slightly retask pof2 to mean pof2 equal or greater, not always greater as it is above */
      int pof2 = 1;
      while (pof2 < comm_size)
        pof2 <<= 1;

      if (pof2 == comm_size && is_block_regular) {
        /* noncommutative, pof2 size, and block regular */
        return reduce_scatter__mpich_noncomm(sbuf, rbuf, rcounts, dtype, op, comm);
      }

      return reduce_scatter__mpich_rdb(sbuf, rbuf, rcounts, dtype, op, comm);
    } else {
      return reduce_scatter__mpich_rdb(sbuf, rbuf, rcounts, dtype, op, comm);
    }
}
/* This is the default implementation of allgather. The algorithm is:

   Algorithm: MPI_Allgather

   For short messages and a non-power-of-two number of processes, we use
   the algorithm from the Jehoshua Bruck et al IEEE TPDS Nov 97
   paper. It is a variant of the dissemination algorithm for
   barrier. It takes ceiling(lg p) steps.

   Cost = lgp.alpha + n.((p-1)/p).beta
   where n is the total size of data gathered on each process.

   For short or medium-size messages and a power-of-two number of
   processes, we use the recursive doubling algorithm.

   Cost = lgp.alpha + n.((p-1)/p).beta

   TODO: On TCP, we may want to use recursive doubling instead of the Bruck
   algorithm in all cases because of the pairwise-exchange property of
   recursive doubling (see the Benson et al. paper in Euro PVM/MPI).

   It is interesting to note that either of the above algorithms for
   MPI_Allgather has the same cost as the tree algorithm for MPI_Gather!

   For long messages, or medium-size messages and a non-power-of-two
   number of processes, we use a ring algorithm. In the first step, each
   process i sends its contribution to process i+1 and receives
   the contribution from process i-1 (with wrap-around). From the
   second step onwards, each process i forwards to process i+1 the
   data it received from process i-1 in the previous step. This takes
   a total of p-1 steps.

   Cost = (p-1).alpha + n.((p-1)/p).beta

   We use this algorithm instead of recursive doubling for long
   messages because we find that this communication pattern (nearest
   neighbor) performs twice as fast as recursive doubling for long
   messages (on Myrinet and IBM SP).

   Possible improvements:

   End Algorithm: MPI_Allgather
*/
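/* Illustration only (not used by the selector): a minimal sketch of the ring
   allgather described above, written against the standard MPI C API rather
   than SMPI internals. Each process contributes one block of 'blocksize'
   bytes; the name sketch_allgather_ring is made up for this example. */
#if 0
static void sketch_allgather_ring(const char* sendbuf, char* recvbuf, int blocksize, MPI_Comm comm)
{
  int rank, size;
  MPI_Comm_rank(comm, &rank);
  MPI_Comm_size(comm, &size);
  int right = (rank + 1) % size;
  int left  = (rank - 1 + size) % size;
  /* place our own contribution, then forward one block per step for p-1 steps */
  memcpy(recvbuf + rank * blocksize, sendbuf, blocksize);
  for (int step = 0; step < size - 1; step++) {
    int send_block = (rank - step + size) % size;     /* block received in the previous step (our own at step 0) */
    int recv_block = (rank - step - 1 + size) % size; /* block originating at rank-step-1 */
    MPI_Sendrecv(recvbuf + send_block * blocksize, blocksize, MPI_BYTE, right, 0,
                 recvbuf + recv_block * blocksize, blocksize, MPI_BYTE, left, 0,
                 comm, MPI_STATUS_IGNORE);
  }
}
#endif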
int allgather__mpich(const void *sbuf, int scount,
                     MPI_Datatype sdtype,
                     void* rbuf, int rcount,
                     MPI_Datatype rdtype,
                     MPI_Comm comm)
{
    int communicator_size, pow2_size;
    size_t dsize, total_dsize;

    communicator_size = comm->size();

    /* Determine complete data size */
    dsize = sdtype->size();
    total_dsize = dsize * scount * communicator_size;

    /* smallest power of two >= communicator_size */
    for (pow2_size = 1; pow2_size < communicator_size; pow2_size <<= 1);

    /* Decision as in MPICH-2,
       presented in Thakur et al., "Optimization of Collective Communication
       Operations in MPICH", International Journal of High Performance Computing
       Applications, Vol. 19, No. 1, 49-66 (2005):
       - for a power-of-two number of processes and small and medium size messages
         (up to 512KB), use recursive doubling;
       - for a non-power-of-two number of processes and small messages (up to 80KB), use Bruck;
       - for everything else, use ring. */
    if ((pow2_size == communicator_size) && (total_dsize < 524288)) {
      return allgather__rdb(sbuf, scount, sdtype, rbuf, rcount, rdtype, comm);
    } else if (total_dsize <= 81920) {
      return allgather__bruck(sbuf, scount, sdtype, rbuf, rcount, rdtype, comm);
    }
    return allgather__ring(sbuf, scount, sdtype, rbuf, rcount, rdtype, comm);
}
/* This is the default implementation of allgatherv. The algorithm is:

   Algorithm: MPI_Allgatherv

   For short messages and a non-power-of-two number of processes, we use
   the algorithm from the Jehoshua Bruck et al IEEE TPDS Nov 97
   paper. It is a variant of the dissemination algorithm for
   barrier. It takes ceiling(lg p) steps.

   Cost = lgp.alpha + n.((p-1)/p).beta
   where n is the total size of data gathered on each process.

   For short or medium-size messages and a power-of-two number of
   processes, we use the recursive doubling algorithm.

   Cost = lgp.alpha + n.((p-1)/p).beta

   TODO: On TCP, we may want to use recursive doubling instead of the Bruck
   algorithm in all cases because of the pairwise-exchange property of
   recursive doubling (see the Benson et al. paper in Euro PVM/MPI).

   For long messages, or medium-size messages and a non-power-of-two
   number of processes, we use a ring algorithm. In the first step, each
   process i sends its contribution to process i+1 and receives
   the contribution from process i-1 (with wrap-around). From the
   second step onwards, each process i forwards to process i+1 the
   data it received from process i-1 in the previous step. This takes
   a total of p-1 steps.

   Cost = (p-1).alpha + n.((p-1)/p).beta

   Possible improvements:

   End Algorithm: MPI_Allgatherv
*/
int allgatherv__mpich(const void *sbuf, int scount,
                      MPI_Datatype sdtype,
                      void* rbuf, const int *rcounts,
                      const int *rdispls,
                      MPI_Datatype rdtype,
                      MPI_Comm comm)
{
    int communicator_size, pow2_size, i;
    size_t total_dsize = 0;

    communicator_size = comm->size();

    /* Determine complete data size */
    for (i = 0; i < communicator_size; i++)
      total_dsize += rcounts[i];
    if (total_dsize == 0)
      return MPI_SUCCESS;

    /* smallest power of two >= communicator_size */
    for (pow2_size = 1; pow2_size < communicator_size; pow2_size <<= 1);

    if ((pow2_size == communicator_size) && (total_dsize < 524288)) {
      return allgatherv__mpich_rdb(sbuf, scount, sdtype, rbuf, rcounts, rdispls, rdtype, comm);
    } else if (total_dsize <= 81920) {
      return allgatherv__ompi_bruck(sbuf, scount, sdtype, rbuf, rcounts, rdispls, rdtype, comm);
    }
    return allgatherv__mpich_ring(sbuf, scount, sdtype, rbuf, rcounts, rdispls, rdtype, comm);
}
/* This is the default implementation of gather. The algorithm is:

   Algorithm: MPI_Gather

   We use a binomial tree algorithm for both short and long
   messages. At nodes other than leaf nodes we need to allocate a
   temporary buffer to store the incoming message. If the root is not
   rank 0, for very small messages, we pack the data into a temporary
   contiguous buffer and reorder it to be placed in the right
   order. For small (but not very small) messages, we use a derived
   datatype to unpack the incoming data into non-contiguous buffers in
   the right order. In the heterogeneous case we first pack the
   buffers by using MPI_Pack and then do the gather.

   Cost = lgp.alpha + n.((p-1)/p).beta
   where n is the total size of the data gathered at the root.

   Possible improvements:

   End Algorithm: MPI_Gather
*/
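/* Illustration only (not used by the selector): a small helper that computes
   the parent and children of a rank in the binomial tree rooted at rank 0,
   which is the communication structure the gather above relies on (children
   send their subtree's data to the parent). The function name and the use of
   std::vector are choices made for this sketch only. */
#if 0
#include <vector>
static std::vector<int> sketch_binomial_children(int rank, int size, int* parent)
{
  std::vector<int> children;
  int mask = 1;
  /* a rank acquires one child per level until its lowest set bit is reached */
  while (mask < size && (rank & mask) == 0) {
    if (rank + mask < size)
      children.push_back(rank + mask);
    mask <<= 1;
  }
  /* the parent differs from 'rank' only in its lowest set bit; rank 0 has none */
  *parent = (rank == 0) ? -1 : rank - mask;
  return children;
}
#endif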
int gather__mpich(const void *sbuf, int scount,
                  MPI_Datatype sdtype,
                  void* rbuf, int rcount,
                  MPI_Datatype rdtype,
                  int root, MPI_Comm comm)
{
    return gather__ompi_binomial(sbuf, scount, sdtype,
                                 rbuf, rcount, rdtype,
                                 root, comm);
}
/* This is the default implementation of scatter. The algorithm is:

   Algorithm: MPI_Scatter

   We use a binomial tree algorithm for both short and
   long messages. At nodes other than leaf nodes we need to allocate
   a temporary buffer to store the incoming message. If the root is
   not rank 0, we reorder the sendbuf in order of relative ranks by
   copying it into a temporary buffer, so that all the sends from the
   root are contiguous and in the right order. In the heterogeneous
   case, we first pack the buffer by using MPI_Pack and then do the
   scatter.

   Cost = lgp.alpha + n.((p-1)/p).beta
   where n is the total size of the data to be scattered from the root.

   Possible improvements:

   End Algorithm: MPI_Scatter
*/
int scatter__mpich(const void *sbuf, int scount,
                   MPI_Datatype sdtype,
                   void* rbuf, int rcount,
                   MPI_Datatype rdtype,
                   int root, MPI_Comm comm)
{
    std::unique_ptr<unsigned char[]> tmp_buf;
    if (comm->rank() != root) {
      /* non-root ranks may pass unusable send arguments, so substitute a valid
         dummy buffer and matching count/type before calling the binomial implementation */
      tmp_buf = std::make_unique<unsigned char[]>(rcount * rdtype->get_extent());
      sbuf   = tmp_buf.get();
      scount = rcount;
      sdtype = rdtype;
    }
    return scatter__ompi_binomial(sbuf, scount, sdtype, rbuf, rcount, rdtype, root, comm);
}