/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2009 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2009      University of Houston. All rights reserved.
 *
 * Additional copyrights may follow
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * - Redistributions in binary form must reproduce the above copyright
 *   notice, this list of conditions and the following disclaimer listed
 *   in this license in the documentation and/or other materials
 *   provided with the distribution.
 *
 * - Neither the name of the copyright holders nor the names of its
 *   contributors may be used to endorse or promote products derived from
 *   this software without specific prior written permission.
 *
 * The copyright holders provide no reassurances that the source code
 * provided does not infringe any patent, copyright, or any other
 * intellectual property rights of third parties.  The copyright holders
 * disclaim any liability to any recipient for claims brought against
 * recipient by any third party for infringement of that party's
 * intellectual property rights.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
/*
 *   ompi_coll_tuned_allreduce_intra_ring_segmented
 *
 *   Function:       Pipelined ring algorithm for allreduce operation
 *   Accepts:        Same as MPI_Allreduce(), segment size
 *   Returns:        MPI_SUCCESS or error code
 *
 *   Description:    Implements pipelined ring algorithm for allreduce:
 *                   user supplies suggested segment size for the pipelining of
 *                   reduce operation.
 *                   The segment size determines the number of phases, np, for
 *                   the algorithm execution.
 *                   The message is automatically divided into blocks of
 *                   approximately (count / (np * segcount)) elements.
 *                   At the end of the reduction phase, an allgather-like step
 *                   is executed.
 *                   Algorithm requires (np + 1)*(N - 1) steps.
 *
 *   Limitations:    The algorithm DOES NOT preserve order of operations, so it
 *                   can be used only for commutative operations.
 *                   In addition, the algorithm cannot work if the total count
 *                   is less than size * segment count.
 *         Example on 3 nodes with 2 phases
 *         Initial state
 *   #      0              1             2
 *        [00a]          [10a]         [20a]
 *        [00b]          [10b]         [20b]
 *        [01a]          [11a]         [21a]
 *        [01b]          [11b]         [21b]
 *        [02a]          [12a]         [22a]
 *        [02b]          [12b]         [22b]
 *
 *        COMPUTATION PHASE 0 (a)
 *         Step 0: rank r sends block ra to rank (r+1) and receives block (r-1)a
 *                 from rank (r-1) [with wraparound].
 *   #      0              1             2
 *        [00a]        [00a+10a]       [20a]
 *        [00b]          [10b]         [20b]
 *        [01a]          [11a]       [11a+21a]
 *        [01b]          [11b]         [21b]
 *      [22a+02a]        [12a]         [22a]
 *        [02b]          [12b]         [22b]
 *
 *         Step 1: rank r sends block (r-1)a to rank (r+1) and receives block
 *                 (r-2)a from rank (r-1) [with wraparound].
 *   #      0              1             2
 *        [00a]        [00a+10a]   [00a+10a+20a]
 *        [00b]          [10b]         [20b]
 *    [11a+21a+01a]      [11a]       [11a+21a]
 *        [01b]          [11b]         [21b]
 *      [22a+02a]    [22a+02a+12a]     [22a]
 *        [02b]          [12b]         [22b]
 *
 *        COMPUTATION PHASE 1 (b)
 *         Step 0: rank r sends block rb to rank (r+1) and receives block (r-1)b
 *                 from rank (r-1) [with wraparound].
 *   #      0              1             2
 *        [00a]        [00a+10a]   [00a+10a+20a]
 *        [00b]        [00b+10b]       [20b]
 *    [11a+21a+01a]      [11a]       [11a+21a]
 *        [01b]          [11b]       [11b+21b]
 *      [22a+02a]    [22a+02a+12a]     [22a]
 *      [22b+02b]        [12b]         [22b]
 *
 *         Step 1: rank r sends block (r-1)b to rank (r+1) and receives block
 *                 (r-2)b from rank (r-1) [with wraparound].
 *   #      0              1             2
 *        [00a]        [00a+10a]   [00a+10a+20a]
 *        [00b]        [00b+10b]   [00b+10b+20b]
 *    [11a+21a+01a]      [11a]       [11a+21a]
 *    [11b+21b+01b]      [11b]       [11b+21b]
 *      [22a+02a]    [22a+02a+12a]     [22a]
 *      [22b+02b]    [22b+02b+12b]     [22b]
 *
 *        DISTRIBUTION PHASE: ring ALLGATHER with ranks shifted by 1 (same as
 *        in regular ring algorithm).
 *
 */
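/*
 * Sanity check of the step-count formula on the example above: with N = 3
 * nodes and np = 2 phases, the reduction part takes np * (N - 1) = 2 * 2 = 4
 * steps (two steps per computation phase) and the distribution (allgather)
 * part takes N - 1 = 2 steps, for a total of (np + 1) * (N - 1) = 6 steps.
 */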
#define COLL_TUNED_COMPUTED_SEGCOUNT(SEGSIZE, TYPELNG, SEGCOUNT)        \
    if( ((SEGSIZE) >= (TYPELNG)) &&                                     \
        ((SEGSIZE) < ((TYPELNG) * (SEGCOUNT))) ) {                      \
        size_t residual;                                                \
        (SEGCOUNT) = (int)((SEGSIZE) / (TYPELNG));                      \
        residual = (SEGSIZE) - (SEGCOUNT) * (TYPELNG);                  \
        if( residual > ((TYPELNG) >> 1) )                               \
            (SEGCOUNT)++;                                               \
    }
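
/*
 * Usage sketch for COLL_TUNED_COMPUTED_SEGCOUNT (illustrative values, not
 * from the original code).  SEGCOUNT must hold the total element count on
 * entry; the macro shrinks it to the number of elements that fit into
 * SEGSIZE bytes, rounding to the nearest whole element:
 *
 *   int segcount = 1000000;                        // total element count
 *   COLL_TUNED_COMPUTED_SEGCOUNT((1 << 20), sizeof(double), segcount)
 *   // (1 << 20) / 8 = 131072 with residual == 0, so segcount == 131072.
 */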
#define COLL_TUNED_COMPUTE_BLOCKCOUNT( COUNT, NUM_BLOCKS, SPLIT_INDEX,       \
                                       EARLY_BLOCK_COUNT, LATE_BLOCK_COUNT ) \
    EARLY_BLOCK_COUNT = LATE_BLOCK_COUNT = COUNT / NUM_BLOCKS;               \
    SPLIT_INDEX = COUNT % NUM_BLOCKS;                                        \
    if (0 != SPLIT_INDEX) {                                                  \
        EARLY_BLOCK_COUNT = EARLY_BLOCK_COUNT + 1;                           \
    }
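
/*
 * Worked example for COLL_TUNED_COMPUTE_BLOCKCOUNT (illustrative values,
 * not from the original code): COUNT = 10 elements over NUM_BLOCKS = 3
 * gives SPLIT_INDEX = 1, EARLY_BLOCK_COUNT = 4 and LATE_BLOCK_COUNT = 3,
 * i.e. block 0 is "early" with 4 elements and blocks 1 and 2 are "late"
 * with 3 elements each:
 *
 *   int early, late, split;
 *   COLL_TUNED_COMPUTE_BLOCKCOUNT(10, 3, split, early, late)
 *   // early == 4, late == 3, split == 1
 */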
#include "colls_private.h"

int
smpi_coll_tuned_allreduce_ompi_ring_segmented(void *sbuf, void *rbuf, int count,
                                              MPI_Datatype dtype,
                                              MPI_Op op,
                                              MPI_Comm comm)
{
   int ret = MPI_SUCCESS;
   int line;
   int rank, size, k, recv_from, send_to;
   int early_blockcount, late_blockcount, split_rank;
   int segcount, max_segcount;
   int num_phases, phase;
   int block_count, inbi;
   size_t typelng;
   char *tmpsend = NULL, *tmprecv = NULL;
   char *inbuf[2] = {NULL, NULL};
   ptrdiff_t true_extent, extent;
   ptrdiff_t block_offset, max_real_segsize;
   MPI_Request reqs[2] = {NULL, NULL};
   const size_t segsize = 1 << 20; /* 1 MB */

   size = smpi_comm_size(comm);
   rank = smpi_comm_rank(comm);

   XBT_DEBUG("coll:tuned:allreduce_intra_ring_segmented rank %d, count %d", rank, count);
   /* Special case for size == 1 */
   if (1 == size) {
      if (MPI_IN_PLACE != sbuf) {
         ret = smpi_datatype_copy(sbuf, count, dtype, rbuf, count, dtype);
         if (ret < 0) { line = __LINE__; goto error_hndl; }
      }
      return MPI_SUCCESS;
   }
   /* Determine segment count based on the suggested segment size */
   extent = smpi_datatype_get_extent(dtype);
   if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
   true_extent = smpi_datatype_get_extent(dtype);
   if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
   typelng = smpi_datatype_size(dtype);
   if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }
   segcount = count;
   COLL_TUNED_COMPUTED_SEGCOUNT(segsize, typelng, segcount)
   /* Special case for count less than size * segcount - use regular ring */
   if (count < (size * segcount)) {
      XBT_DEBUG("coll:tuned:allreduce_ring_segmented rank %d/%d, count %d, switching to regular ring", rank, size, count);
      return (smpi_coll_tuned_allreduce_lr(sbuf, rbuf, count, dtype, op,
                                           comm));
   }
   /* Determine the number of phases of the algorithm */
   num_phases = count / (size * segcount);
   if ((count % (size * segcount) >= size) &&
       (count % (size * segcount) > ((size * segcount) / 2))) {
      num_phases++;
   }
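   /* A remainder smaller than half a full phase (size * segcount elements)
      is absorbed by slightly larger segments in the existing phases; only a
      larger remainder justifies an extra phase. */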
   /* Determine the number of elements per block and the corresponding
      block sizes.
      The blocks are divided into "early" and "late" ones:
      blocks 0 .. (split_rank - 1) are "early" and
      blocks (split_rank) .. (size - 1) are "late".
      Early blocks are at most 1 element larger than the late ones.
      Note, these blocks will be split into num_phases segments,
      out of which the largest one will have max_segcount elements.
    */
   COLL_TUNED_COMPUTE_BLOCKCOUNT( count, size, split_rank,
                                  early_blockcount, late_blockcount )
   COLL_TUNED_COMPUTE_BLOCKCOUNT( early_blockcount, num_phases, inbi,
                                  max_segcount, k)
   max_real_segsize = true_extent + (max_segcount - 1) * extent;
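   /* max_real_segsize is an upper bound on the memory footprint of one
      segment: the true extent of a single element plus (max_segcount - 1)
      strides of size extent. */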
   /* Allocate and initialize temporary buffers */
   inbuf[0] = (char*)malloc(max_real_segsize);
   if (NULL == inbuf[0]) { ret = -1; line = __LINE__; goto error_hndl; }
   inbuf[1] = (char*)malloc(max_real_segsize);
   if (NULL == inbuf[1]) { ret = -1; line = __LINE__; goto error_hndl; }
   /* Handle MPI_IN_PLACE */
   if (MPI_IN_PLACE != sbuf) {
      ret = smpi_datatype_copy(sbuf, count, dtype, rbuf, count, dtype);
      if (ret < 0) { line = __LINE__; goto error_hndl; }
   }
   /* Computation loop: for each phase, repeat ring allreduce computation loop */
   for (phase = 0; phase < num_phases; phase ++) {
      ptrdiff_t phase_offset;
      int early_phase_segcount, late_phase_segcount, split_phase, phase_count;
      /* For each of the remote nodes:
         - post irecv for block (r-1)
         - send block (r)
           To do this, first compute block offset and count, and use block
           offset to compute phase offset.
         - in loop for every step k = 2 .. n
           - post irecv for block (r + n - k) % n
           - wait on block (r + n - k + 1) % n to arrive
           - compute on block (r + n - k + 1) % n
           - send block (r + n - k + 1) % n
         - wait on block (r + 1)
         - compute on block (r + 1)
         - send block (r + 1) to rank (r + 1)
         Note that we must be careful when computing the beginning of buffers,
         and that for send operations and computation we must use the exact
         block size.
       */
      send_to = (rank + 1) % size;
      recv_from = (rank + size - 1) % size;
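
      /* Two receive buffers and two requests implement double buffering:
         while the segment in one buffer is being reduced and forwarded, the
         next segment is already being received into the other buffer; inbi
         selects the active buffer. */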
      inbi = 0;
      /* Initialize first receive from the neighbor on the left */
      reqs[inbi] = smpi_mpi_irecv(inbuf[inbi], max_segcount, dtype, recv_from,
                                  COLL_TAG_ALLREDUCE /* SMPI-internal collective tag */, comm);
      /* Send first block (my block) to the neighbor on the right:
         - compute my block and phase offset
         - send data */
      block_offset = ((rank < split_rank)?
                      (rank * early_blockcount) :
                      (rank * late_blockcount + split_rank));
      block_count = ((rank < split_rank)? early_blockcount : late_blockcount);
      COLL_TUNED_COMPUTE_BLOCKCOUNT(block_count, num_phases, split_phase,
                                    early_phase_segcount, late_phase_segcount)
      phase_count = ((phase < split_phase)?
                     (early_phase_segcount) : (late_phase_segcount));
      phase_offset = ((phase < split_phase)?
                      (phase * early_phase_segcount) :
                      (phase * late_phase_segcount + split_phase));
      tmpsend = ((char*)rbuf) + (block_offset + phase_offset) * extent;
      smpi_mpi_send(tmpsend, phase_count, dtype, send_to,
                    COLL_TAG_ALLREDUCE, comm);
      for (k = 2; k < size; k++) {
         const int prevblock = (rank + size - k + 1) % size;

         inbi = inbi ^ 0x1;

         /* Post irecv for the current block */
         reqs[inbi] = smpi_mpi_irecv(inbuf[inbi], max_segcount, dtype, recv_from,
                                     COLL_TAG_ALLREDUCE, comm);
         if (MPI_SUCCESS != ret) { line = __LINE__; goto error_hndl; }

         /* Wait on previous block to arrive */
         smpi_mpi_wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);
         /* Apply operation on previous block: result goes to rbuf
            rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock]
          */
         block_offset = ((prevblock < split_rank)?
                         (prevblock * early_blockcount) :
                         (prevblock * late_blockcount + split_rank));
         block_count = ((prevblock < split_rank)?
                        early_blockcount : late_blockcount);
         COLL_TUNED_COMPUTE_BLOCKCOUNT(block_count, num_phases, split_phase,
                                       early_phase_segcount, late_phase_segcount)
         phase_count = ((phase < split_phase)?
                        (early_phase_segcount) : (late_phase_segcount));
         phase_offset = ((phase < split_phase)?
                         (phase * early_phase_segcount) :
                         (phase * late_phase_segcount + split_phase));
         tmprecv = ((char*)rbuf) + (block_offset + phase_offset) * extent;
         smpi_op_apply(op, inbuf[inbi ^ 0x1], tmprecv, &phase_count, &dtype);

         /* Send previous block to send_to */
         smpi_mpi_send(tmprecv, phase_count, dtype, send_to,
                       COLL_TAG_ALLREDUCE, comm);
      }
      /* Wait on the last block to arrive */
      smpi_mpi_wait(&reqs[inbi], MPI_STATUS_IGNORE);

      /* Apply operation on the last block (from neighbor (rank + 1)):
         rbuf[rank+1] = inbuf[inbi] (op) rbuf[rank + 1] */
      recv_from = (rank + 1) % size;
      block_offset = ((recv_from < split_rank)?
                      (recv_from * early_blockcount) :
                      (recv_from * late_blockcount + split_rank));
      block_count = ((recv_from < split_rank)?
                     early_blockcount : late_blockcount);
      COLL_TUNED_COMPUTE_BLOCKCOUNT(block_count, num_phases, split_phase,
                                    early_phase_segcount, late_phase_segcount)
      phase_count = ((phase < split_phase)?
                     (early_phase_segcount) : (late_phase_segcount));
      phase_offset = ((phase < split_phase)?
                      (phase * early_phase_segcount) :
                      (phase * late_phase_segcount + split_phase));
      tmprecv = ((char*)rbuf) + (block_offset + phase_offset) * extent;
      smpi_op_apply(op, inbuf[inbi], tmprecv, &phase_count, &dtype);
   }
   /* Distribution loop - variation of ring allgather */
   send_to = (rank + 1) % size;
   recv_from = (rank + size - 1) % size;
   for (k = 0; k < size - 1; k++) {
      const int recv_data_from = (rank + size - k) % size;
      const int send_data_from = (rank + 1 + size - k) % size;
      const int send_block_offset =
         ((send_data_from < split_rank)?
          (send_data_from * early_blockcount) :
          (send_data_from * late_blockcount + split_rank));
      const int recv_block_offset =
         ((recv_data_from < split_rank)?
          (recv_data_from * early_blockcount) :
          (recv_data_from * late_blockcount + split_rank));
      block_count = ((send_data_from < split_rank)?
                     early_blockcount : late_blockcount);

      tmprecv = (char*)rbuf + recv_block_offset * extent;
      tmpsend = (char*)rbuf + send_block_offset * extent;

      smpi_mpi_sendrecv(tmpsend, block_count, dtype, send_to,
                        COLL_TAG_ALLREDUCE,
                        tmprecv, early_blockcount, dtype, recv_from,
                        COLL_TAG_ALLREDUCE,
                        comm, MPI_STATUS_IGNORE);
   }

   if (NULL != inbuf[0]) free(inbuf[0]);
   if (NULL != inbuf[1]) free(inbuf[1]);

   return MPI_SUCCESS;

 error_hndl:
   XBT_DEBUG("%s:%4d\tRank %d Error occurred %d\n",
             __FILE__, line, rank, ret);
   if (NULL != inbuf[0]) free(inbuf[0]);
   if (NULL != inbuf[1]) free(inbuf[1]);
   return ret;
}
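
/*
 * Invocation sketch (hypothetical buffers, for illustration only): within
 * SMPI this routine is normally reached through the collective selector,
 * but a direct call is equivalent to an MPI_Allreduce with a commutative
 * operation, e.g.:
 *
 *   double in[1024], out[1024];
 *   smpi_coll_tuned_allreduce_ompi_ring_segmented(in, out, 1024,
 *                                                 MPI_DOUBLE, MPI_SUM,
 *                                                 MPI_COMM_WORLD);
 */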