1 /* Copyright (c) 2013-2022. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
8 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
9 * University Research and Technology
10 * Corporation. All rights reserved.
11 * Copyright (c) 2004-2012 The University of Tennessee and The University
12 * of Tennessee Research Foundation. All rights
14 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
15 * University of Stuttgart. All rights reserved.
16 * Copyright (c) 2004-2005 The Regents of the University of California.
17 * All rights reserved.
18 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved.
19 * Copyright (c) 2009 University of Houston. All rights reserved.
21 * Additional copyrights may follow
24 #include "../coll_tuned_topo.hpp"
25 #include "../colls_private.hpp"
28 * Recursive-halving function is (*mostly*) copied from the BASIC coll module.
29 * I have removed the part which handles "large" message sizes
30 * (non-overlapping version of reduce_Scatter).
33 /* copied function (with appropriate renaming) starts here */
36 * reduce_scatter_ompi_basic_recursivehalving
38 * Function: - reduce scatter implementation using recursive-halving
40 * Accepts: - same as MPI_Reduce_scatter()
41 * Returns: - MPI_SUCCESS or error code
42 * Limitation: - Works only for commutative operations.
/* reduce_scatter__ompi_basic_recursivehalving -- recursive-halving
 * reduce_scatter, ported (mostly verbatim) from Open MPI's BASIC coll module.
 * Only valid for commutative operations, which is checked explicitly below.
 *
 * NOTE(review): the line numbers embedded in this listing are discontinuous,
 * so the text is elided: the rest of the parameter list, several closing
 * braces, the "trivial case" shortcut and the "cleanup" error label are not
 * visible here.  The comments added below describe only what the visible
 * lines demonstrably do. */
46 int reduce_scatter__ompi_basic_recursivehalving(const void *sbuf,
54 int i, rank, size, count, err = MPI_SUCCESS;
55 int tmp_size = 1, remain = 0, tmp_rank;
56 ptrdiff_t true_lb, true_extent, lb, extent, buf_size;
57 unsigned char *result_buf = nullptr, *result_buf_free = nullptr;
63 XBT_DEBUG("coll:tuned:reduce_scatter_ompi_basic_recursivehalving, rank %d", rank);
/* The halving pattern cannot preserve operation order, so refuse
 * non-commutative operations outright. */
64 if ((op != MPI_OP_NULL && not op->is_commutative()))
65 throw std::invalid_argument(
66 "reduce_scatter ompi_basic_recursivehalving can only be used for commutative operations!");
68 /* Find displacements and the like */
/* disps[i] = element offset of rank i's output block in the full vector;
 * count = total element count of the reduction vector (disps[0] = 0 is
 * presumably set on an elided line -- confirm against the full file). */
69 int* disps = new int[size];
72 for (i = 0; i < (size - 1); ++i) {
73 disps[i + 1] = disps[i] + rcounts[i];
75 count = disps[size - 1] + rcounts[size - 1];
77 /* short cut the trivial case */
83 /* get datatype information */
/* lb/extent drive address arithmetic; true_extent sizes the scratch buffers.
 * NOTE(review): both visible calls are dtype->extent(); upstream uses
 * true_extent() for the second one -- confirm in the un-elided file. */
84 dtype->extent(&lb, &extent);
85 dtype->extent(&true_lb, &true_extent);
86 buf_size = true_extent + (ptrdiff_t)(count - 1) * extent;
88 /* Handle MPI_IN_PLACE */
89 if (MPI_IN_PLACE == sbuf) {
93 /* Allocate temporary receive buffer. */
/* Buffers are shifted by -lb so that "buf + disp * extent" addresses the
 * datatype's element 0 the way MPI extent arithmetic expects. */
94 unsigned char* recv_buf_free = smpi_get_tmp_recvbuffer(buf_size);
95 unsigned char* recv_buf = recv_buf_free - lb;
96 if (nullptr == recv_buf_free) {
101 /* allocate temporary buffer for results */
102 result_buf_free = smpi_get_tmp_sendbuffer(buf_size);
103 result_buf = result_buf_free - lb;
105 /* copy local buffer into the temporary results */
106 err =Datatype::copy(sbuf, count, dtype, result_buf, count, dtype);
107 if (MPI_SUCCESS != err) goto cleanup;
109 /* figure out power of two mapping: grow until larger than
110 comm size, then go back one, to get the largest power of
111 two less than comm size */
/* NOTE(review): the "go back one" step (tmp_size >>= 1) sits on an elided
 * line; without it "remain" below would be negative. */
112 while (tmp_size <= size) tmp_size <<= 1;
114 remain = size - tmp_size;
116 /* If comm size is not a power of two, have the first "remain"
117 procs with an even rank send to rank + 1, leaving a power of
118 two procs to do the rest of the algorithm */
119 if (rank < 2 * remain) {
120 if ((rank & 1) == 0) {
/* Even rank of a leading pair: hand the whole input to rank + 1 and drop
 * out of the halving phase (the final block comes back near the end). */
121 Request::send(result_buf, count, dtype, rank + 1,
122 COLL_TAG_REDUCE_SCATTER,
124 /* we don't participate from here on out */
/* Odd rank of a leading pair: absorb the even neighbor's vector. */
127 Request::recv(recv_buf, count, dtype, rank - 1,
128 COLL_TAG_REDUCE_SCATTER,
129 comm, MPI_STATUS_IGNORE);
131 /* integrate their results into our temp results */
132 if(op!=MPI_OP_NULL) op->apply( recv_buf, result_buf, &count, dtype);
134 /* adjust rank to be the bottom "remain" ranks */
138 /* just need to adjust rank to show that the bottom "even
139 remain" ranks dropped out */
140 tmp_rank = rank - remain;
143 /* For ranks not kicked out by the above code, perform the
146 int mask, send_index, recv_index, last_index;
148 /* recalculate disps and rcounts to account for the
149 special "remainder" processes that are no longer doing
/* tmp_rcounts[i]/tmp_disps[i] describe block i of the shrunken power-of-two
 * communicator: the first "remain" virtual ranks own two original blocks
 * each (their own plus their dropped even neighbor's). */
151 int* tmp_rcounts = new int[tmp_size];
152 int* tmp_disps = new int[tmp_size];
154 for (i = 0 ; i < tmp_size ; ++i) {
156 /* need to include old neighbor as well */
157 tmp_rcounts[i] = rcounts[i * 2 + 1] + rcounts[i * 2];
159 tmp_rcounts[i] = rcounts[i + remain];
164 for (i = 0; i < tmp_size - 1; ++i) {
165 tmp_disps[i + 1] = tmp_disps[i] + tmp_rcounts[i];
168 /* do the recursive halving communication. Don't use the
169 dimension information on the communicator because I
170 think the information is invalidated by our "shrinking"
171 of the communicator */
/* Each halving step splits the owned window [send_index, last_index):
 * one half is sent to the partner, the other half is received and reduced,
 * so the window shrinks until only this rank's block(s) remain.  The
 * enclosing loop header over "mask" sits on elided lines. */
172 mask = tmp_size >> 1;
173 send_index = recv_index = 0;
174 last_index = tmp_size;
176 int tmp_peer, peer, send_count, recv_count;
179 tmp_peer = tmp_rank ^ mask;
/* Map the virtual peer back to a real rank: the first "remain" virtual
 * ranks are the odd real ranks of the leading pairs. */
180 peer = (tmp_peer < remain) ? tmp_peer * 2 + 1 : tmp_peer + remain;
182 /* figure out if we're sending, receiving, or both */
183 send_count = recv_count = 0;
184 if (tmp_rank < tmp_peer) {
/* Lower rank keeps the lower half and ships the upper one. */
185 send_index = recv_index + mask;
186 for (i = send_index ; i < last_index ; ++i) {
187 send_count += tmp_rcounts[i];
189 for (i = recv_index ; i < send_index ; ++i) {
190 recv_count += tmp_rcounts[i];
/* Higher rank keeps the upper half. */
193 recv_index = send_index + mask;
194 for (i = send_index ; i < recv_index ; ++i) {
195 send_count += tmp_rcounts[i];
197 for (i = recv_index ; i < last_index ; ++i) {
198 recv_count += tmp_rcounts[i];
202 /* actual data transfer. Send from result_buf,
203 receive into recv_buf */
/* NOTE(review): these guards test BOTH counts (send AND recv) before the
 * irecv/send pair; with zero-sized blocks the two peers could disagree on
 * whether a message is exchanged.  Verify against the elided upstream
 * logic before changing. */
204 if (send_count > 0 && recv_count != 0) {
205 request=Request::irecv(recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
206 recv_count, dtype, peer,
207 COLL_TAG_REDUCE_SCATTER,
209 if (MPI_SUCCESS != err) {
210 delete[] tmp_rcounts;
215 if (recv_count > 0 && send_count != 0) {
216 Request::send(result_buf + (ptrdiff_t)tmp_disps[send_index] * extent,
217 send_count, dtype, peer,
218 COLL_TAG_REDUCE_SCATTER,
220 if (MPI_SUCCESS != err) {
221 delete[] tmp_rcounts;
226 if (send_count > 0 && recv_count != 0) {
227 Request::wait(&request, MPI_STATUS_IGNORE);
230 /* if we received something on this step, push it into
231 the results buffer */
232 if (recv_count > 0) {
233 if(op!=MPI_OP_NULL) op->apply(
234 recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
235 result_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
239 /* update for next iteration */
240 send_index = recv_index;
241 last_index = recv_index + mask;
245 /* copy local results from results buffer into real receive buffer */
246 if (0 != rcounts[rank]) {
247 err = Datatype::copy(result_buf + disps[rank] * extent,
248 rcounts[rank], dtype,
249 rbuf, rcounts[rank], dtype);
250 if (MPI_SUCCESS != err) {
251 delete[] tmp_rcounts;
257 delete[] tmp_rcounts;
261 /* Now fix up the non-power of two case, by having the odd
262 procs send the even procs the proper results */
263 if (rank < (2 * remain)) {
264 if ((rank & 1) == 0) {
/* Even (dropped) rank: its final block arrives from the odd partner. */
266 Request::recv(rbuf, rcounts[rank], dtype, rank + 1,
267 COLL_TAG_REDUCE_SCATTER,
268 comm, MPI_STATUS_IGNORE);
271 if (rcounts[rank - 1]) {
272 Request::send(result_buf + disps[rank - 1] * extent,
273 rcounts[rank - 1], dtype, rank - 1,
274 COLL_TAG_REDUCE_SCATTER,
/* Release scratch buffers; reached on success and (via the elided
 * "cleanup" label) on error. */
282 if (nullptr != recv_buf_free)
283 smpi_free_tmp_buffer(recv_buf_free);
284 if (nullptr != result_buf_free)
285 smpi_free_tmp_buffer(result_buf_free);
290 /* copied function (with appropriate renaming) ends here */
294 * Coll_reduce_scatter_ompi_ring::reduce_scatter
296 * Function: Ring algorithm for reduce_scatter operation
297 * Accepts: Same as MPI_Reduce_scatter()
298 * Returns: MPI_SUCCESS or error code
300 * Description: Implements ring algorithm for reduce_scatter:
301 * the block sizes defined in rcounts are exchanged and
302 * updated until they reach proper destination.
303 * Algorithm requires 2 * max(rcounts) extra buffering
305 * Limitations: The algorithm DOES NOT preserve order of operations so it
306 * can be used only for commutative operations.
307 * Example on 5 nodes:
310 * [00] [10] -> [20] [30] [40]
311 * [01] [11] [21] -> [31] [41]
312 * [02] [12] [22] [32] -> [42]
313 * -> [03] [13] [23] [33] [43] --> ..
314 * [04] -> [14] [24] [34] [44]
317 * Step 0: rank r sends block (r-1) to rank (r+1) and
318 * receives block (r+1) from rank (r-1) [with wraparound].
320 * [00] [10] [10+20] -> [30] [40]
321 * [01] [11] [21] [21+31] -> [41]
322 * -> [02] [12] [22] [32] [32+42] -->..
323 * [43+03] -> [13] [23] [33] [43]
324 * [04] [04+14] -> [24] [34] [44]
328 * [00] [10] [10+20] [10+20+30] -> [40]
329 * -> [01] [11] [21] [21+31] [21+31+41] ->
330 * [32+42+02] -> [12] [22] [32] [32+42]
331 * [03] [43+03+13] -> [23] [33] [43]
332 * [04] [04+14] [04+14+24] -> [34] [44]
336 * -> [00] [10] [10+20] [10+20+30] [10+20+30+40] ->
337 * [21+31+41+01]-> [11] [21] [21+31] [21+31+41]
338 * [32+42+02] [32+42+02+12]-> [22] [32] [32+42]
339 * [03] [43+03+13] [43+03+13+23]-> [33] [43]
340 * [04] [04+14] [04+14+24] [04+14+24+34] -> [44]
344 * [10+20+30+40+00] [10] [10+20] [10+20+30] [10+20+30+40]
345 * [21+31+41+01] [21+31+41+01+11] [21] [21+31] [21+31+41]
346 * [32+42+02] [32+42+02+12] [32+42+02+12+22] [32] [32+42]
347 * [03] [43+03+13] [43+03+13+23] [43+03+13+23+33] [43]
348 * [04] [04+14] [04+14+24] [04+14+24+34] [04+14+24+34+44]
/* reduce_scatter__ompi_ring -- ring reduce_scatter (see the block comment
 * above): every block circulates around the ring accumulating partial
 * reductions; two alternating scratch buffers (inbuf[0]/inbuf[1], selected by
 * "inbi") overlap the receive of step k with the compute+send of step k-1.
 * Commutative operations only (order of application is not preserved).
 *
 * NOTE(review): the embedded line numbers are discontinuous -- the rest of
 * the parameter list, the "inbi" declaration, several braces and the
 * "error_hndl" label are elided.  Comments describe only what is visible. */
352 int reduce_scatter__ompi_ring(const void *sbuf, void *rbuf, const int *rcounts,
358 int ret, line, rank, size, i, k, recv_from, send_to, total_count, max_block_count;
360 unsigned char *tmpsend = nullptr, *tmprecv = nullptr, *accumbuf = nullptr, *accumbuf_free = nullptr;
361 unsigned char *inbuf_free[2] = {nullptr, nullptr}, *inbuf[2] = {nullptr, nullptr};
362 ptrdiff_t true_lb, true_extent, lb, extent, max_real_segsize;
363 MPI_Request reqs[2] = {nullptr, nullptr};
368 XBT_DEBUG( "coll:tuned:reduce_scatter_ompi_ring rank %d, size %d",
371 /* Determine the maximum number of elements per node,
372 corresponding block size, and displacements array.
/* displs[i] = element offset of rank i's block; total_count = whole vector;
 * max_block_count sizes the per-step scratch buffers (displs[0] = 0 is
 * presumably set on an elided line). */
374 int* displs = new int[size];
377 total_count = rcounts[0];
378 max_block_count = rcounts[0];
379 for (i = 1; i < size; i++) {
380 displs[i] = total_count;
381 total_count += rcounts[i];
382 if (max_block_count < rcounts[i]) max_block_count = rcounts[i];
385 /* Special case for size == 1 */
/* Single process: reduce_scatter degenerates to a local copy. */
387 if (MPI_IN_PLACE != sbuf) {
388 ret = Datatype::copy((char*)sbuf, total_count, dtype, (char*)rbuf, total_count, dtype);
389 if (ret < 0) { line = __LINE__; goto error_hndl; }
395 /* Allocate and initialize temporary buffers, we need:
396 - a temporary buffer to perform reduction (size total_count) since
397 rbuf can be of rcounts[rank] size.
398 - up to two temporary buffers used for communication/computation overlap.
/* As in the other algorithms, buffers are shifted by -lb for MPI extent
 * arithmetic.  NOTE(review): both visible calls are dtype->extent();
 * upstream presumably uses true_extent() for the second -- confirm. */
400 dtype->extent(&lb, &extent);
401 dtype->extent(&true_lb, &true_extent);
403 max_real_segsize = true_extent + (ptrdiff_t)(max_block_count - 1) * extent;
405 accumbuf_free = smpi_get_tmp_recvbuffer(true_extent + (ptrdiff_t)(total_count - 1) * extent);
406 if (nullptr == accumbuf_free) {
411 accumbuf = accumbuf_free - lb;
413 inbuf_free[0] = smpi_get_tmp_sendbuffer(max_real_segsize);
414 if (nullptr == inbuf_free[0]) {
419 inbuf[0] = inbuf_free[0] - lb;
421 inbuf_free[1] = smpi_get_tmp_sendbuffer(max_real_segsize);
422 if (nullptr == inbuf_free[1]) {
427 inbuf[1] = inbuf_free[1] - lb;
430 /* Handle MPI_IN_PLACE for size > 1 */
431 if (MPI_IN_PLACE == sbuf) {
/* Seed the accumulation buffer with the local contribution. */
435 ret = Datatype::copy((char*)sbuf, total_count, dtype, accumbuf, total_count, dtype);
436 if (ret < 0) { line = __LINE__; goto error_hndl; }
438 /* Computation loop */
441 For each of the remote nodes:
442 - post irecv for block (r-2) from (r-1) with wrap around
443 - send block (r-1) to (r+1)
444 - in loop for every step k = 2 .. n
445 - post irecv for block (r - 1 + n - k) % n
446 - wait on block (r + n - k) % n to arrive
447 - compute on block (r + n - k ) % n
448 - send block (r + n - k) % n
450 - compute on block (r)
451 - copy block (r) to rbuf
452 Note that we must be careful when computing the beginning of buffers and
453 for send operations and computation we must compute the exact block size.
455 send_to = (rank + 1) % size;
456 recv_from = (rank + size - 1) % size;
459 /* Initialize first receive from the neighbor on the left */
/* "inbi" (declared on an elided line) indexes the double buffer and is
 * flipped each iteration below. */
460 reqs[inbi]=Request::irecv(inbuf[inbi], max_block_count, dtype, recv_from,
461 COLL_TAG_REDUCE_SCATTER, comm
463 tmpsend = accumbuf + (ptrdiff_t)displs[recv_from] * extent;
464 Request::send(tmpsend, rcounts[recv_from], dtype, send_to,
465 COLL_TAG_REDUCE_SCATTER,
468 for (k = 2; k < size; k++) {
/* Block that finished its trip into inbuf[inbi ^ 1] on the previous step. */
469 const int prevblock = (rank + size - k) % size;
473 /* Post irecv for the current block */
474 reqs[inbi]=Request::irecv(inbuf[inbi], max_block_count, dtype, recv_from,
475 COLL_TAG_REDUCE_SCATTER, comm
478 /* Wait on previous block to arrive */
479 Request::wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);
481 /* Apply operation on previous block: result goes to rbuf
482 rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock]
484 tmprecv = accumbuf + (ptrdiff_t)displs[prevblock] * extent;
485 if (op != MPI_OP_NULL)
486 op->apply(inbuf[inbi ^ 0x1], tmprecv, &rcounts[prevblock], dtype);
488 /* send previous block to send_to */
489 Request::send(tmprecv, rcounts[prevblock], dtype, send_to,
490 COLL_TAG_REDUCE_SCATTER,
494 /* Wait on the last block to arrive */
495 Request::wait(&reqs[inbi], MPI_STATUS_IGNORE);
497 /* Apply operation on the last block (my block)
498 rbuf[rank] = inbuf[inbi] (op) rbuf[rank] */
499 tmprecv = accumbuf + (ptrdiff_t)displs[rank] * extent;
500 if (op != MPI_OP_NULL)
501 op->apply(inbuf[inbi], tmprecv, &rcounts[rank], dtype);
503 /* Copy result from tmprecv to rbuf */
504 ret = Datatype::copy(tmprecv, rcounts[rank], dtype, (char*)rbuf, rcounts[rank], dtype);
505 if (ret < 0) { line = __LINE__; goto error_hndl; }
/* Success path: release scratch buffers. */
508 if (nullptr != accumbuf_free)
509 smpi_free_tmp_buffer(accumbuf_free);
510 if (nullptr != inbuf_free[0])
511 smpi_free_tmp_buffer(inbuf_free[0]);
512 if (nullptr != inbuf_free[1])
513 smpi_free_tmp_buffer(inbuf_free[1]);
/* error_hndl (label on an elided line): log and release the same buffers. */
518 XBT_DEBUG( "%s:%4d\tRank %d Error occurred %d\n",
519 __FILE__, line, rank, ret);
521 if (nullptr != accumbuf_free)
522 smpi_free_tmp_buffer(accumbuf_free);
523 if (nullptr != inbuf_free[0])
524 smpi_free_tmp_buffer(inbuf_free[0]);
525 if (nullptr != inbuf_free[1])
526 smpi_free_tmp_buffer(inbuf_free[1]);
/*
 * ompi_sum_counts: total number of elements covered by the contiguous run of
 * virtual blocks [lo, hi] of the shrunken (power-of-two) communicator used by
 * the butterfly algorithm.  The first nprocs_rem virtual blocks each stand
 * for TWO real blocks (an excluded even rank plus its odd neighbor), so the
 * virtual indices are first mapped back to real block indices, then the sum
 * is read off the prefix-sum array as displs[last] + counts[last] - displs[first].
 */
static int ompi_sum_counts(const int *counts, int *displs, int nprocs_rem, int lo, int hi)
{
  /* Map virtual block bounds to real block indices, accounting for the
     doubled-up blocks of the excluded processes. */
  const int first = (lo < nprocs_rem) ? 2 * lo : lo + nprocs_rem;
  const int last  = (hi < nprocs_rem) ? 2 * hi + 1 : hi + nprocs_rem;
  return displs[last] + counts[last] - displs[first];
}
539 * ompi_mirror_perm: Returns mirror permutation of nbits low-order bits
541 * [*] Warren Jr., Henry S. Hacker's Delight (2ed). 2013.
542 * Chapter 7. Rearranging Bits and Bytes.
/*
 * ompi_mirror_perm: Returns the mirror permutation (bit reversal) of the
 * nbits low-order bits of x, using the classic logarithmic swap network from
 * Warren's "Hacker's Delight" (2ed, Chapter 7): the full 32-bit word is
 * reversed, then shifted right so only the requested nbits remain.
 *
 * Fix: guard nbits <= 0.  The final shift is by (32 - nbits); a shift count
 * equal to the operand width is undefined behavior in C++, and the butterfly
 * caller computes log2_size == 0 when the power-of-two group size is 1.
 */
static unsigned int ompi_mirror_perm(unsigned int x, int nbits)
{
    if (nbits <= 0)
        return 0; /* avoid UB: shifting an unsigned int by >= its width */
    x = (((x & 0xaaaaaaaa) >> 1) | ((x & 0x55555555) << 1)); /* swap adjacent bits */
    x = (((x & 0xcccccccc) >> 2) | ((x & 0x33333333) << 2)); /* swap bit pairs */
    x = (((x & 0xf0f0f0f0) >> 4) | ((x & 0x0f0f0f0f) << 4)); /* swap nibbles */
    x = (((x & 0xff00ff00) >> 8) | ((x & 0x00ff00ff) << 8)); /* swap bytes */
    x = ((x >> 16) | (x << 16));                             /* swap half-words */
    return x >> (sizeof(x) * 8 - nbits);
}
554 * ompi_coll_base_reduce_scatter_intra_butterfly
556 * Function: Butterfly algorithm for reduce_scatter
557 * Accepts: Same as MPI_Reduce_scatter
558 * Returns: MPI_SUCCESS or error code
560 * Description: Implements butterfly algorithm for MPI_Reduce_scatter [*].
561 * The algorithm can be used both by commutative and non-commutative
562 * operations, for power-of-two and non-power-of-two number of processes.
564 * [*] J.L. Traff. An improved Algorithm for (non-commutative) Reduce-scatter
565 * with an Application // Proc. of EuroPVM/MPI, 2005. -- pp. 129-137.
567 * Time complexity: O(m\lambda + log(p)\alpha + m\beta + m\gamma),
568 * where m = sum of rcounts[], p = comm_size
569 * Memory requirements (per process): 2 * m * typesize + comm_size
571 * Example: comm_size=6, nprocs_pof2=4, nprocs_rem=2, rcounts[]=1, sbuf=[0,1,...,5]
572 * Step 1. Reduce the number of processes to 4
573 * rank 0: [0|1|2|3|4|5]: send to 1: vrank -1
574 * rank 1: [0|1|2|3|4|5]: recv from 0, op: vrank 0: [0|2|4|6|8|10]
575 * rank 2: [0|1|2|3|4|5]: send to 3: vrank -1
576 * rank 3: [0|1|2|3|4|5]: recv from 2, op: vrank 1: [0|2|4|6|8|10]
577 * rank 4: [0|1|2|3|4|5]: vrank 2: [0|1|2|3|4|5]
578 * rank 5: [0|1|2|3|4|5]: vrank 3: [0|1|2|3|4|5]
580 * Step 2. Butterfly. Buffer of 6 elements is divided into 4 blocks.
581 * Round 1 (mask=1, nblocks=2)
583 * 1: vrank 0 [0 2|4 6|8|10]: exch with 1: send [2,3], recv [0,1]: [0 4|8 12|*|*]
585 * 3: vrank 1 [0 2|4 6|8|10]: exch with 0: send [0,1], recv [2,3]: [**|**|16|20]
586 * 4: vrank 2 [0 1|2 3|4|5] : exch with 3: send [2,3], recv [0,1]: [0 2|4 6|*|*]
587 * 5: vrank 3 [0 1|2 3|4|5] : exch with 2: send [0,1], recv [2,3]: [**|**|8|10]
589 * Round 2 (mask=2, nblocks=1)
591 * 1: vrank 0 [0 4|8 12|*|*]: exch with 2: send [1], recv [0]: [0 6|**|*|*]
593 * 3: vrank 1 [**|**|16|20] : exch with 3: send [3], recv [2]: [**|**|24|*]
594 * 4: vrank 2 [0 2|4 6|*|*] : exch with 0: send [0], recv [1]: [**|12 18|*|*]
595 * 5: vrank 3 [**|**|8|10] : exch with 1: send [2], recv [3]: [**|**|*|30]
597 * Step 3. Exchange with remote process according to a mirror permutation:
598 * mperm(0)=0, mperm(1)=2, mperm(2)=1, mperm(3)=3
599 * 0: vrank -1: recv "0" from process 0
600 * 1: vrank 0 [0 6|**|*|*]: send "0" to 0, copy "6" to rbuf (mperm(0)=0)
601 * 2: vrank -1: recv result "12" from process 4
602 * 3: vrank 1 [**|**|24|*]
603 * 4: vrank 2 [**|12 18|*|*]: send "12" to 2, send "18" to 3, recv "24" from 3
604 * 5: vrank 3 [**|**|*|30]: copy "30" to rbuf (mperm(3)=3)
/* reduce_scatter__ompi_butterfly -- butterfly reduce_scatter (Traff,
 * EuroPVM/MPI 2005; see the block comment above).  Valid for commutative and
 * non-commutative operations and any process count: shrink to a power of two,
 * run log2(p') halving/exchange rounds, then deliver results via a mirror
 * permutation.
 *
 * NOTE(review): the embedded line numbers are discontinuous (elided listing)
 * and the function is cut off after the final Request::recv at the bottom of
 * this chunk.  Comments describe only what is visible. */
606 int reduce_scatter__ompi_butterfly(
607 const void *sbuf, void *rbuf, const int *rcounts, MPI_Datatype dtype,
608 MPI_Op op, MPI_Comm comm)
610 char *tmpbuf[2] = {NULL, NULL}, *psend, *precv;
611 int *displs = NULL, index;
612 ptrdiff_t span, gap, totalcount, extent;
613 int err = MPI_SUCCESS;
614 int comm_size = comm->size();
615 int rank = comm->rank();
619 XBT_DEBUG("coll:base:reduce_scatter_intra_butterfly: rank %d/%d",
/* displs[] = per-rank element offsets (prefix sums of rcounts); displs[0]=0
 * is presumably set on an elided line. */
624 displs = (int*)malloc(sizeof(*displs) * comm_size);
625 if (NULL == displs) {
627 goto cleanup_and_return;
630 for (int i = 1; i < comm_size; i++) {
631 displs[i] = displs[i - 1] + rcounts[i - 1];
633 totalcount = displs[comm_size - 1] + rcounts[comm_size - 1];
/* "gap" receives the datatype lower bound here; psend/precv below are
 * shifted by -gap for MPI extent arithmetic. */
634 dtype->extent(&gap, &extent);
635 span = extent * totalcount;
/* Two full-vector scratch buffers: psend holds the running reduction,
 * precv receives the peer's half in each butterfly round. */
636 tmpbuf[0] = (char*)malloc(span);
637 tmpbuf[1] = (char*)malloc(span);
638 if (NULL == tmpbuf[0] || NULL == tmpbuf[1]) {
640 goto cleanup_and_return;
642 psend = tmpbuf[0] - gap;
643 precv = tmpbuf[1] - gap;
/* Seed psend with the local contribution (from rbuf when MPI_IN_PLACE). */
645 if (sbuf != MPI_IN_PLACE) {
646 err = Datatype::copy(sbuf, totalcount, dtype, psend, totalcount, dtype);
647 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
649 err = Datatype::copy(rbuf, totalcount, dtype, psend, totalcount, dtype);
650 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
654 * Step 1. Reduce the number of processes to the nearest lower power of two
655 * p' = 2^{\floor{\log_2 p}} by removing r = p - p' processes.
656 * In the first 2r processes (ranks 0 to 2r - 1), all the even ranks send
657 * the input vector to their neighbor (rank + 1) and all the odd ranks recv
658 * the input vector and perform local reduction.
659 * The odd ranks (0 to 2r - 1) contain the reduction with the input
660 * vector on their neighbors (the even ranks). The first r odd
661 * processes and the p - 2r last processes are renumbered from
662 * 0 to 2^{\floor{\log_2 p}} - 1. Even ranks do not participate in the
663 * rest of the algorithm.
666 /* Find nearest power-of-two less than or equal to comm_size */
/* NOTE(review): this loop overshoots; the compensating nprocs_pof2 >>= 1
 * presumably sits on an elided line, as does the declaration of nprocs_rem
 * and log2_size -- confirm against the full file. */
667 int nprocs_pof2, size;
668 for( nprocs_pof2 = 1, size = comm_size; size > 0; size >>= 1, nprocs_pof2 <<= 1 );
671 nprocs_rem = comm_size - nprocs_pof2;
673 for (log2_size = 0, size = 1; size < nprocs_pof2; ++log2_size, size <<= 1);
675 if (rank < 2 * nprocs_rem) {
676 if ((rank % 2) == 0) {
/* Even rank of a leading pair: give the whole vector to rank + 1 and wait
 * for the final result at the bottom of the function. */
678 Request::send(psend, totalcount, dtype, rank + 1,
679 COLL_TAG_REDUCE_SCATTER, comm);
680 /* This process does not participate in the rest of the algorithm */
684 Request::recv(precv, totalcount, dtype, rank - 1,
685 COLL_TAG_REDUCE_SCATTER, comm, MPI_STATUS_IGNORE);
/* NOTE(review): unlike the other algorithms in this file, op is applied
 * without an MPI_OP_NULL guard, and (int*)&totalcount aliases a ptrdiff_t
 * as int (endianness/width hazard on LP64) -- worth confirming upstream. */
686 op->apply(precv, psend, (int*)&totalcount, dtype);
687 /* Adjust rank to be the bottom "remain" ranks */
691 /* Adjust rank to show that the bottom "even remain" ranks dropped out */
692 vrank = rank - nprocs_rem;
697 * Now, psend vector of size totalcount is divided into nprocs_pof2 blocks:
698 * block 0: rcounts[0] and rcounts[1] -- for process 0 and 1
699 * block 1: rcounts[2] and rcounts[3] -- for process 2 and 3
701 * block r-1: rcounts[2*(r-1)] and rcounts[2*(r-1)+1]
702 * block r: rcounts[r+r]
703 * block r+1: rcounts[r+r+1]
705 * block nprocs_pof2 - 1: rcounts[r+nprocs_pof2-1]
/* Step 2: butterfly.  Each round halves the owned window of virtual blocks
 * [send_index/recv_index, +nblocks); the halving of nblocks per round sits
 * on an elided line. */
707 int nblocks = nprocs_pof2, send_index = 0, recv_index = 0;
708 for (int mask = 1; mask < nprocs_pof2; mask <<= 1) {
709 int vpeer = vrank ^ mask;
/* Map virtual peer back to real rank (first nprocs_rem virtual ranks are
 * the odd real ranks of the leading pairs). */
710 int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
713 if ((vrank & mask) == 0) {
714 /* Send the upper half of reduction buffer, recv the lower half */
715 send_index += nblocks;
717 /* Send the upper half of reduction buffer, recv the lower half */
718 recv_index += nblocks;
721 /* Send blocks: [send_index, send_index + nblocks - 1] */
722 int send_count = ompi_sum_counts(rcounts, displs, nprocs_rem,
723 send_index, send_index + nblocks - 1);
724 index = (send_index < nprocs_rem) ? 2 * send_index : nprocs_rem + send_index;
725 ptrdiff_t sdispl = displs[index];
727 /* Recv blocks: [recv_index, recv_index + nblocks - 1] */
728 int recv_count = ompi_sum_counts(rcounts, displs, nprocs_rem,
729 recv_index, recv_index + nblocks - 1);
730 index = (recv_index < nprocs_rem) ? 2 * recv_index : nprocs_rem + recv_index;
731 ptrdiff_t rdispl = displs[index];
733 Request::sendrecv(psend + (ptrdiff_t)sdispl * extent, send_count,
734 dtype, peer, COLL_TAG_REDUCE_SCATTER,
735 precv + (ptrdiff_t)rdispl * extent, recv_count,
736 dtype, peer, COLL_TAG_REDUCE_SCATTER,
737 comm, MPI_STATUS_IGNORE);
/* The branch selecting which buffer accumulates (and the psend/precv swap)
 * sits on elided lines; both orientations appear below so that operand
 * order is preserved for non-commutative operations. */
740 /* precv = psend <op> precv */
741 op->apply(psend + (ptrdiff_t)rdispl * extent,
742 precv + (ptrdiff_t)rdispl * extent, &recv_count, dtype);
747 /* psend = precv <op> psend */
748 op->apply(precv + (ptrdiff_t)rdispl * extent,
749 psend + (ptrdiff_t)rdispl * extent, &recv_count, dtype);
751 send_index = recv_index;
754 * psend points to the result block [send_index]
755 * Exchange results with remote process according to a mirror permutation.
/* Step 3: route each finished block to its owner via mperm(vrank). */
757 int vpeer = ompi_mirror_perm(vrank, log2_size);
758 int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
759 index = (send_index < nprocs_rem) ? 2 * send_index : nprocs_rem + send_index;
761 if (vpeer < nprocs_rem) {
763 * Process has two blocks: for excluded process and own.
764 * Send the first block to excluded process.
766 Request::send(psend + (ptrdiff_t)displs[index] * extent,
767 rcounts[index], dtype, peer - 1,
768 COLL_TAG_REDUCE_SCATTER,
772 /* If process has two blocks, then send the second block (own block) */
773 if (vpeer < nprocs_rem)
775 if (vpeer != vrank) {
776 Request::sendrecv(psend + (ptrdiff_t)displs[index] * extent,
777 rcounts[index], dtype, peer,
778 COLL_TAG_REDUCE_SCATTER,
779 rbuf, rcounts[rank], dtype, peer,
780 COLL_TAG_REDUCE_SCATTER,
781 comm, MPI_STATUS_IGNORE);
/* vpeer == vrank: the block is already local, just copy it out. */
783 err = Datatype::copy(psend + (ptrdiff_t)displs[rank] * extent, rcounts[rank], dtype,
784 rbuf, rcounts[rank], dtype);
785 if (MPI_SUCCESS != err) { goto cleanup_and_return; }
789 /* Excluded process: receive result */
790 int vpeer = ompi_mirror_perm((rank + 1) / 2, log2_size);
791 int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
792 Request::recv(rbuf, rcounts[rank], dtype, peer,
793 COLL_TAG_REDUCE_SCATTER, comm,