1 /* selector for collective algorithms based on openmpi's default coll_tuned_decision_fixed selector */
3 /* Copyright (c) 2009-2019. The SimGrid Team.
4 * All rights reserved. */
6 /* This program is free software; you can redistribute it and/or modify it
7 * under the terms of the license (GNU LGPL) which comes with this package. */
9 #include "colls_private.hpp"
/* Pick an allreduce implementation from message size and operator properties:
 * messages under 10 KB use recursive doubling; commutative (or NULL) ops with
 * more elements than ranks use a ring variant (plain lr when each rank's share
 * stays within a 1 MB segment, segmented ring otherwise); everything else
 * falls through to the reduce+broadcast fallback.
 * NOTE(review): this chunk is missing interior lines (trailing call arguments
 * and closing braces), so comments describe only the visible decision logic. */
14 int Coll_allreduce_ompi::allreduce(const void *sbuf, void *rbuf, int count,
15 MPI_Datatype dtype, MPI_Op op, MPI_Comm comm)
17 size_t dsize, block_dsize;
18 int comm_size = comm->size();
/* size threshold (bytes) below which recursive doubling wins */
19 const size_t intermediate_message = 10000;
22 * Decision function based on MX results from the Grig cluster at UTK.
24 * Currently, linear, recursive doubling, and nonoverlapping algorithms
25 * can handle both commutative and non-commutative operations.
26 * Ring algorithm does not support non-commutative operations.
28 dsize = dtype->size();
29 block_dsize = dsize * count;
/* small payload: recursive doubling has the lowest latency */
31 if (block_dsize < intermediate_message) {
32 return (Coll_allreduce_rdb::allreduce (sbuf, rbuf,
/* ring algorithms require a commutative op and at least one element per rank */
37 if( ((op==MPI_OP_NULL) || op->is_commutative()) && (count > comm_size) ) {
38 const size_t segment_size = 1 << 20; /* 1 MB */
39 if ((comm_size * segment_size >= block_dsize)) {
40 //FIXME: ok, these are not the right algorithms, try to find closer ones
41 // lr is a good match for allreduce_ring (difference is mainly the use of sendrecv)
42 return Coll_allreduce_lr::allreduce(sbuf, rbuf, count, dtype,
45 return (Coll_allreduce_ompi_ring_segmented::allreduce (sbuf, rbuf,
/* fallback: reduce to root, then broadcast the result */
52 return (Coll_allreduce_redbcast::allreduce(sbuf, rbuf, count,
/* Pick an alltoall implementation from the per-rank send-block size:
 * tiny blocks (< 200 B) on communicators larger than 12 ranks use Bruck,
 * blocks under 3000 B use basic linear, and everything else uses ring.
 * NOTE(review): trailing call arguments are missing from this chunk. */
58 int Coll_alltoall_ompi::alltoall(const void *sbuf, int scount,
60 void* rbuf, int rcount,
64 int communicator_size;
65 size_t dsize, block_dsize;
66 communicator_size = comm->size();
68 /* Decision function based on measurement on Grig cluster at
69 the University of Tennessee (2GB MX) up to 64 nodes.
70 Has better performance for messages of intermediate sizes than the old one */
71 /* determine block size */
72 dsize = sdtype->size();
73 block_dsize = dsize * scount;
/* Bruck trades bandwidth for latency: good for tiny messages, many ranks */
75 if ((block_dsize < 200) && (communicator_size > 12)) {
76 return Coll_alltoall_bruck::alltoall(sbuf, scount, sdtype,
80 } else if (block_dsize < 3000) {
81 return Coll_alltoall_basic_linear::alltoall(sbuf, scount, sdtype,
/* large blocks: ring keeps per-step bandwidth balanced */
86 return Coll_alltoall_ring::alltoall (sbuf, scount, sdtype,
/* alltoallv has no tuned decision yet: unconditionally delegate to the
 * ring implementation with all arguments forwarded unchanged. */
91 int Coll_alltoallv_ompi::alltoallv(const void *sbuf, const int *scounts, const int *sdisps,
93 void *rbuf, const int *rcounts, const int *rdisps,
98 /* For starters, just keep the original algorithm. */
99 return Coll_alltoallv_ring::alltoallv(sbuf, scounts, sdisps, sdtype,
100 rbuf, rcounts, rdisps,rdtype,
/* Pick a barrier implementation from the communicator size:
 * exactly 2 ranks → dedicated two-process exchange; non-power-of-two
 * sizes → Bruck; power-of-two sizes → recursive doubling.  The loop
 * detects a non-power-of-two by finding a set bit before the size is
 * reduced to its top bit. */
105 int Coll_barrier_ompi::barrier(MPI_Comm comm)
106 { int communicator_size = comm->size();
108 if( 2 == communicator_size )
109 return Coll_barrier_ompi_two_procs::barrier(comm);
110 /* * Basic optimisation. If we have a power of 2 number of nodes*/
111 /* * then use the recursive doubling algorithm, otherwise*/
112 /* * bruck is the one we want.*/
/* shift right until empty; any low set bit with more bits above means non-power-of-two */
115 for( ; communicator_size > 0; communicator_size >>= 1 ) {
116 if( communicator_size & 0x1 ) {
118 return Coll_barrier_ompi_bruck::barrier(comm);
123 return Coll_barrier_ompi_recursivedoubling::barrier(comm);
/* Pick a broadcast implementation using fixed size thresholds plus linear
 * models (communicator_size < a*message_size + b) fitted at three segment
 * sizes (128 KB, 64 KB, 16 KB).  Small messages or single-element counts use
 * an unsegmented binomial tree; intermediate sizes use split-binary; large
 * messages choose between pipeline and split-binary depending on which model
 * the communicator size falls under, ending in a flat-tree pipeline fallback.
 * NOTE(review): this chunk has missing interior lines; the gige-measurement
 * section at the bottom appears to be after an unconditional return and is
 * therefore unreachable as shown — confirm against the full file. */
126 int Coll_bcast_ompi::bcast(void *buff, int count,
127 MPI_Datatype datatype, int root,
131 /* Decision function based on MX results for
132 messages up to 36MB and communicator sizes up to 64 nodes */
133 const size_t small_message_size = 2048;
134 const size_t intermediate_message_size = 370728;
135 const double a_p16 = 3.2118e-6; /* [1 / byte] */
136 const double b_p16 = 8.7936;
137 const double a_p64 = 2.3679e-6; /* [1 / byte] */
138 const double b_p64 = 1.1787;
139 const double a_p128 = 1.6134e-6; /* [1 / byte] */
140 const double b_p128 = 2.1102;
142 int communicator_size;
144 size_t message_size, dsize;
146 communicator_size = comm->size();
148 /* else we need data size for decision function */
149 dsize = datatype->size();
150 message_size = dsize * (unsigned long)count; /* needed for decision */
152 /* Handle messages of small and intermediate size, and
153 single-element broadcasts */
154 if ((message_size < small_message_size) || (count <= 1)) {
155 /* Binomial without segmentation */
156 return Coll_bcast_binomial_tree::bcast (buff, count, datatype,
159 } else if (message_size < intermediate_message_size) {
160 // SplittedBinary with 1KB segments
161 return Coll_bcast_ompi_split_bintree::bcast(buff, count, datatype,
165 //Handle large message sizes
166 else if (communicator_size < (a_p128 * message_size + b_p128)) {
167 //Pipeline with 128KB segments
168 //segsize = 1024 << 7;
169 return Coll_bcast_ompi_pipeline::bcast (buff, count, datatype,
173 } else if (communicator_size < 13) {
174 // Split Binary with 8KB segments
175 return Coll_bcast_ompi_split_bintree::bcast(buff, count, datatype,
178 } else if (communicator_size < (a_p64 * message_size + b_p64)) {
179 // Pipeline with 64KB segments
180 //segsize = 1024 << 6;
181 return Coll_bcast_ompi_pipeline::bcast (buff, count, datatype,
185 } else if (communicator_size < (a_p16 * message_size + b_p16)) {
186 //Pipeline with 16KB segments
187 //segsize = 1024 << 4;
188 return Coll_bcast_ompi_pipeline::bcast (buff, count, datatype,
193 /* Pipeline with 8KB segments */
194 //segsize = 1024 << 3;
195 return Coll_bcast_flattree_pipeline::bcast (buff, count, datatype,
/* ---- alternative decision kept for reference (gigabit-ethernet tuning) ---- */
199 /* this is based on gige measurements */
201 if (communicator_size < 4) {
202 return Coll_bcast_intra_basic_linear::bcast (buff, count, datatype, root, comm, module);
204 if (communicator_size == 4) {
205 if (message_size < 524288) segsize = 0;
206 else segsize = 16384;
207 return Coll_bcast_intra_bintree::bcast (buff, count, datatype, root, comm, module, segsize);
209 if (communicator_size <= 8 && message_size < 4096) {
210 return Coll_bcast_intra_basic_linear::bcast (buff, count, datatype, root, comm, module);
212 if (communicator_size > 8 && message_size >= 32768 && message_size < 524288) {
214 return Coll_bcast_intra_bintree::bcast (buff, count, datatype, root, comm, module, segsize);
216 if (message_size >= 524288) {
218 return Coll_bcast_intra_pipeline::bcast (buff, count, datatype, root, comm, module, segsize);
221 /* once tested can swap this back in */
222 /* return Coll_bcast_intra_bmtree::bcast (buff, count, datatype, root, comm, segsize); */
223 return Coll_bcast_intra_bintree::bcast (buff, count, datatype, root, comm, module, segsize);
/* Pick a reduce implementation.  Non-commutative operators are restricted to
 * basic linear (small comm + small message) or in-order binary tree.
 * Commutative operators choose between basic linear, binomial, pipeline and
 * binary using fixed thresholds plus linear models of the form
 * communicator_size > a*message_size + b (coefficients measured per segment
 * size).  A second, simpler size-based decision appears after the final
 * return of the first one.
 * NOTE(review): interior lines are missing in this chunk (e.g. segsize /
 * max_requests assignments), so only the visible branch structure is
 * documented. */
227 int Coll_reduce_ompi::reduce(const void *sendbuf, void *recvbuf,
228 int count, MPI_Datatype datatype,
233 int communicator_size=0;
235 size_t message_size, dsize;
236 const double a1 = 0.6016 / 1024.0; /* [1/B] */
237 const double b1 = 1.3496;
238 const double a2 = 0.0410 / 1024.0; /* [1/B] */
239 const double b2 = 9.7128;
240 const double a3 = 0.0422 / 1024.0; /* [1/B] */
241 const double b3 = 1.1614;
242 //const double a4 = 0.0033 / 1024.0; [1/B]
243 //const double b4 = 1.6761;
245 /* no limit on # of outstanding requests */
246 //const int max_requests = 0;
248 communicator_size = comm->size();
250 /* need data size for decision function */
251 dsize=datatype->size();
252 message_size = dsize * count; /* needed for decision */
255 * If the operation is non commutative we currently have choice of linear
256 * or in-order binary tree algorithm.
258 if ((op != MPI_OP_NULL) && not op->is_commutative()) {
259 if ((communicator_size < 12) && (message_size < 2048)) {
260 return Coll_reduce_ompi_basic_linear::reduce(sendbuf, recvbuf, count, datatype, op, root, comm /*, module*/);
262 return Coll_reduce_ompi_in_order_binary::reduce(sendbuf, recvbuf, count, datatype, op, root, comm /*, module,
/* --- commutative path: thresholds first, then the fitted linear models --- */
266 if ((communicator_size < 8) && (message_size < 512)){
268 return Coll_reduce_ompi_basic_linear::reduce (sendbuf, recvbuf, count, datatype, op, root, comm);
269 } else if (((communicator_size < 8) && (message_size < 20480)) ||
270 (message_size < 2048) || (count <= 1)) {
273 return Coll_reduce_ompi_binomial::reduce(sendbuf, recvbuf, count, datatype, op, root, comm/*, module,
274 segsize, max_requests*/);
275 } else if (communicator_size > (a1 * message_size + b1)) {
278 return Coll_reduce_ompi_binomial::reduce(sendbuf, recvbuf, count, datatype, op, root, comm/*, module,
279 segsize, max_requests*/);
280 } else if (communicator_size > (a2 * message_size + b2)) {
283 return Coll_reduce_ompi_pipeline::reduce (sendbuf, recvbuf, count, datatype, op, root, comm/*, module,
284 segsize, max_requests*/);
285 } else if (communicator_size > (a3 * message_size + b3)) {
288 return Coll_reduce_ompi_binary::reduce( sendbuf, recvbuf, count, datatype, op, root,
289 comm/*, module, segsize, max_requests*/);
291 // if (communicator_size > (a4 * message_size + b4)) {
293 // segsize = 32*1024;
296 // segsize = 64*1024;
298 return Coll_reduce_ompi_pipeline::reduce (sendbuf, recvbuf, count, datatype, op, root, comm/*, module,
299 segsize, max_requests*/);
/* ---- alternative size-only decision kept below (appears unreachable here) ---- */
302 /* for small messages use linear algorithm */
303 if (message_size <= 4096) {
305 fanout = communicator_size - 1;
306 /* when linear implemented or taken from basic put here, right now using chain as a linear system */
307 /* it is implemented and I shouldn't be calling a chain with a fanout bigger than MAXTREEFANOUT from topo.h! */
308 return Coll_reduce_intra_basic_linear::reduce (sendbuf, recvbuf, count, datatype, op, root, comm, module);
309 /* return Coll_reduce_intra_chain::reduce (sendbuf, recvbuf, count, datatype, op, root, comm, segsize, fanout); */
311 if (message_size < 524288) {
312 if (message_size <= 65536 ) {
317 fanout = communicator_size/2;
319 /* later swap this for a binary tree */
321 return Coll_reduce_intra_chain::reduce (sendbuf, recvbuf, count, datatype, op, root, comm, module,
322 segsize, fanout, max_requests);
325 return Coll_reduce_intra_pipeline::reduce (sendbuf, recvbuf, count, datatype, op, root, comm, module,
326 segsize, max_requests);
/* Pick a reduce_scatter implementation.  Non-commutative operators or any
 * zero receive count fall back to the default implementation.  Otherwise:
 * recursive halving for small totals (<= 12 KB), for medium totals
 * (<= 256 KB) on power-of-two communicators, or when the communicator is
 * large relative to the message (comm_size >= a*size + b); ring otherwise. */
330 int Coll_reduce_scatter_ompi::reduce_scatter(const void *sbuf, void *rbuf,
337 int comm_size, i, pow2;
338 size_t total_message_size, dsize;
339 const double a = 0.0012;
340 const double b = 8.0;
341 const size_t small_message_size = 12 * 1024;
342 const size_t large_message_size = 256 * 1024;
345 XBT_DEBUG("Coll_reduce_scatter_ompi::reduce_scatter");
347 comm_size = comm->size();
348 // We need data size for decision function
/* sum all receive counts; also scan for zero counts (ring cannot handle them) */
350 total_message_size = 0;
351 for (i = 0; i < comm_size; i++) {
352 total_message_size += rcounts[i];
353 if (0 == rcounts[i]) {
358 if (((op != MPI_OP_NULL) && not op->is_commutative()) || (zerocounts)) {
359 Coll_reduce_scatter_default::reduce_scatter(sbuf, rbuf, rcounts, dtype, op, comm);
/* convert element count to bytes for the decision below */
363 total_message_size *= dsize;
365 // compute the nearest power of 2
366 for (pow2 = 1; pow2 < comm_size; pow2 <<= 1);
368 if ((total_message_size <= small_message_size) ||
369 ((total_message_size <= large_message_size) && (pow2 == comm_size)) ||
370 (comm_size >= a * total_message_size + b)) {
372 Coll_reduce_scatter_ompi_basic_recursivehalving::reduce_scatter(sbuf, rbuf, rcounts,
376 return Coll_reduce_scatter_ompi_ring::reduce_scatter(sbuf, rbuf, rcounts,
/* Pick an allgather implementation.  Two ranks get a dedicated pair exchange.
 * Otherwise the default decision (Grig-cluster tuning) uses total data size:
 * under 50 KB choose recursive doubling (power-of-two comms) or Bruck;
 * larger sizes choose ring (odd comm size) or neighbor exchange (even).
 * A compile-time alternative (USE_MPICH2_DECISION) implements the MPICH-2
 * thresholds from Thakur et al. instead. */
384 int Coll_allgather_ompi::allgather(const void *sbuf, int scount,
386 void* rbuf, int rcount,
391 int communicator_size, pow2_size;
392 size_t dsize, total_dsize;
394 communicator_size = comm->size();
396 /* Special case for 2 processes */
397 if (communicator_size == 2) {
398 return Coll_allgather_pair::allgather (sbuf, scount, sdtype,
399 rbuf, rcount, rdtype,
403 /* Determine complete data size */
404 dsize=sdtype->size();
405 total_dsize = dsize * scount * communicator_size;
/* smallest power of two >= communicator_size */
407 for (pow2_size = 1; pow2_size < communicator_size; pow2_size <<=1);
409 /* Decision based on MX 2Gb results from Grig cluster at
410 The University of Tennessee, Knoxville
411 - if total message size is less than 50KB use either bruck or
412 recursive doubling for non-power of two and power of two nodes,
414 - else use ring and neighbor exchange algorithms for odd and even
415 number of nodes, respectively.
417 if (total_dsize < 50000) {
418 if (pow2_size == communicator_size) {
419 return Coll_allgather_rdb::allgather(sbuf, scount, sdtype,
420 rbuf, rcount, rdtype,
423 return Coll_allgather_bruck::allgather(sbuf, scount, sdtype,
424 rbuf, rcount, rdtype,
428 if (communicator_size % 2) {
429 return Coll_allgather_ring::allgather(sbuf, scount, sdtype,
430 rbuf, rcount, rdtype,
433 return Coll_allgather_ompi_neighborexchange::allgather(sbuf, scount, sdtype,
434 rbuf, rcount, rdtype,
439 #if defined(USE_MPICH2_DECISION)
440 /* Decision as in MPICH-2
441 presented in Thakur et.al. "Optimization of Collective Communication
442 Operations in MPICH", International Journal of High Performance Computing
443 Applications, Vol. 19, No. 1, 49-66 (2005)
444 - for power-of-two processes and small and medium size messages
445 (up to 512KB) use recursive doubling
446 - for non-power-of-two processes and small messages (80KB) use bruck,
447 - for everything else use ring.
449 if ((pow2_size == communicator_size) && (total_dsize < 524288)) {
450 return Coll_allgather_rdb::allgather(sbuf, scount, sdtype,
451 rbuf, rcount, rdtype,
453 } else if (total_dsize <= 81920) {
454 return Coll_allgather_bruck::allgather(sbuf, scount, sdtype,
455 rbuf, rcount, rdtype,
458 return Coll_allgather_ring::allgather(sbuf, scount, sdtype,
459 rbuf, rcount, rdtype,
461 #endif /* defined(USE_MPICH2_DECISION) */
/* Pick an allgatherv implementation, mirroring the allgather decision but
 * summing the per-rank receive counts for the total size: two ranks use the
 * pair exchange; totals under 50 KB use Bruck; larger totals use ring (odd
 * comm size) or neighbor exchange (even). */
464 int Coll_allgatherv_ompi::allgatherv(const void *sbuf, int scount,
466 void* rbuf, const int *rcounts,
473 int communicator_size;
474 size_t dsize, total_dsize;
476 communicator_size = comm->size();
478 /* Special case for 2 processes */
479 if (communicator_size == 2) {
480 return Coll_allgatherv_pair::allgatherv(sbuf, scount, sdtype,
481 rbuf, rcounts, rdispls, rdtype,
485 /* Determine complete data size */
486 dsize=sdtype->size();
/* counts vary per rank, so accumulate rather than multiply */
488 for (i = 0; i < communicator_size; i++) {
489 total_dsize += dsize * rcounts[i];
492 /* Decision based on allgather decision. */
493 if (total_dsize < 50000) {
494 return Coll_allgatherv_ompi_bruck::allgatherv(sbuf, scount, sdtype,
495 rbuf, rcounts, rdispls, rdtype,
499 if (communicator_size % 2) {
500 return Coll_allgatherv_ring::allgatherv(sbuf, scount, sdtype,
501 rbuf, rcounts, rdispls, rdtype,
504 return Coll_allgatherv_ompi_neighborexchange::allgatherv(sbuf, scount, sdtype,
505 rbuf, rcounts, rdispls, rdtype,
/* Pick a gather implementation from the per-rank block size and communicator
 * size: blocks above 6000 B use synchronized linear; large communicators
 * (> 60 ranks), or mid-size ones (> 10 ranks) with blocks under 1024 B, use
 * binomial; everything else uses basic linear.  block_size is computed from
 * the receive side and then from the send side — presumably root vs non-root
 * branches whose conditions are among the lines missing from this chunk. */
511 int Coll_gather_ompi::gather(const void *sbuf, int scount,
513 void* rbuf, int rcount,
519 //const int large_segment_size = 32768;
520 //const int small_segment_size = 1024;
522 //const size_t large_block_size = 92160;
523 const size_t intermediate_block_size = 6000;
524 const size_t small_block_size = 1024;
526 const int large_communicator_size = 60;
527 const int small_communicator_size = 10;
529 int communicator_size, rank;
530 size_t dsize, block_size;
532 XBT_DEBUG("smpi_coll_tuned_gather_ompi");
534 communicator_size = comm->size();
537 // Determine block size
539 dsize = rdtype->size();
540 block_size = dsize * rcount;
542 dsize = sdtype->size();
543 block_size = dsize * scount;
546 /* if (block_size > large_block_size) {*/
547 /* return smpi_coll_tuned_gather_ompi_linear_sync (sbuf, scount, sdtype, */
548 /* rbuf, rcount, rdtype, */
551 /* } else*/ if (block_size > intermediate_block_size) {
552 return Coll_gather_ompi_linear_sync::gather (sbuf, scount, sdtype,
553 rbuf, rcount, rdtype,
556 } else if ((communicator_size > large_communicator_size) ||
557 ((communicator_size > small_communicator_size) &&
558 (block_size < small_block_size))) {
559 return Coll_gather_ompi_binomial::gather (sbuf, scount, sdtype,
560 rbuf, rcount, rdtype,
564 // Otherwise, use basic linear
565 return Coll_gather_ompi_basic_linear::gather (sbuf, scount, sdtype,
566 rbuf, rcount, rdtype,
/* Pick a scatter implementation: communicators larger than 10 ranks with
 * blocks under 300 B use binomial (allocating a dummy send buffer, visibly
 * substituted for sbuf — presumably on non-root ranks where sbuf is
 * MPI_IN_PLACE/unused; the guarding condition is among the lines missing
 * from this chunk); everything else uses basic linear. */
571 int Coll_scatter_ompi::scatter(const void *sbuf, int scount,
573 void* rbuf, int rcount,
575 int root, MPI_Comm comm
578 const size_t small_block_size = 300;
579 const int small_comm_size = 10;
580 int communicator_size, rank;
581 size_t dsize, block_size;
583 XBT_DEBUG("Coll_scatter_ompi::scatter");
585 communicator_size = comm->size();
587 // Determine block size
589 dsize=sdtype->size();
590 block_size = dsize * scount;
592 dsize=rdtype->size();
593 block_size = dsize * rcount;
596 if ((communicator_size > small_comm_size) &&
597 (block_size < small_block_size)) {
/* RAII buffer: freed automatically when tmp_buf goes out of scope */
598 std::unique_ptr<unsigned char[]> tmp_buf;
600 tmp_buf.reset(new unsigned char[rcount * rdtype->get_extent()]);
601 sbuf = tmp_buf.get();
605 return Coll_scatter_ompi_binomial::scatter(sbuf, scount, sdtype, rbuf, rcount, rdtype, root, comm);
607 return Coll_scatter_ompi_basic_linear::scatter (sbuf, scount, sdtype,
608 rbuf, rcount, rdtype,