1 /* Copyright (c) 2013-2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
8 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
9 * University Research and Technology
10 * Corporation. All rights reserved.
11 * Copyright (c) 2004-2012 The University of Tennessee and The University
12 * of Tennessee Research Foundation. All rights
14 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
15 * University of Stuttgart. All rights reserved.
16 * Copyright (c) 2004-2005 The Regents of the University of California.
17 * All rights reserved.
18 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved.
19 * Copyright (c) 2009 University of Houston. All rights reserved.
21 * Additional copyrights may follow
24 #include "colls_private.h"
25 #include "coll_tuned_topo.h"
26 #include "smpi/smpi.h"
27 #include "xbt/replay.h"
30 * Recursive-halving function is (*mostly*) copied from the BASIC coll module.
31 * I have removed the part which handles "large" message sizes
32 * (non-overlapping version of reduce_Scatter).
35 /* copied function (with appropriate renaming) starts here */
38 * reduce_scatter_ompi_basic_recursivehalving
40 * Function: - reduce scatter implementation using recursive-halving
42 * Accepts: - same as MPI_Reduce_scatter()
43 * Returns: - MPI_SUCCESS or error code
44 * Limitation: - Works only for commutative operations.
/* NOTE(review): this chunk is elided — the embedded original line numbers are
 * non-contiguous, so some braces, else-branches and statements are missing
 * from this view.  Comments below describe only what is visible; code is
 * untouched. */
/* Reduce_scatter via recursive halving (adapted from Open MPI's basic coll
 * module).  Same arguments as MPI_Reduce_scatter(); commutative ops only
 * (enforced below with THROWF).  Non-power-of-two sizes are handled by
 * folding the first "remain" even ranks into their odd neighbors. */
47 smpi_coll_tuned_reduce_scatter_ompi_basic_recursivehalving(void *sbuf,
55     int i, rank, size, count, err = MPI_SUCCESS;
56     int tmp_size=1, remain = 0, tmp_rank, *disps = NULL;
57     ptrdiff_t true_lb, true_extent, lb, extent, buf_size;
58     char *recv_buf = NULL, *recv_buf_free = NULL;
59     char *result_buf = NULL, *result_buf_free = NULL;
62     rank = smpi_comm_rank(comm);
63     size = smpi_comm_size(comm);
65     XBT_DEBUG("coll:tuned:reduce_scatter_ompi_basic_recursivehalving, rank %d", rank);
/* Recursive halving reorders reductions, so a non-commutative op would give
 * wrong results — refuse it outright. */
66     if(!smpi_op_is_commute(op))
67       THROWF(arg_error,0, " reduce_scatter ompi_basic_recursivehalving can only be used for commutative operations! ");
69     /* Find displacements and the like */
70     disps = (int*) xbt_malloc(sizeof(int) * size);
71     if (NULL == disps) return MPI_ERR_OTHER;
/* disps[i] = prefix sum of rcounts; count = total element count over all
 * ranks.  NOTE(review): disps[0] initialization is in an elided line. */
74     for (i = 0; i < (size - 1); ++i) {
75         disps[i + 1] = disps[i] + rcounts[i];
77     count = disps[size - 1] + rcounts[size - 1];
79     /* short cut the trivial case */
85     /* get datatype information */
86     smpi_datatype_extent(dtype, &lb, &extent);
87     smpi_datatype_extent(dtype, &true_lb, &true_extent);
/* Size of a buffer holding `count` elements of this datatype. */
88     buf_size = true_extent + (ptrdiff_t)(count - 1) * extent;
90     /* Handle MPI_IN_PLACE */
91     if (MPI_IN_PLACE == sbuf) {
95     /* Allocate temporary receive buffer. */
/* Under trace replay, use shared (simulated, not really allocated) memory to
 * keep the simulator's footprint down; otherwise allocate for real.
 * NOTE(review): the else-branch line is elided from this view. */
96     if(_xbt_replay_is_active()){
97       recv_buf_free = (char*) SMPI_SHARED_MALLOC(buf_size);
99       recv_buf_free = (char*) xbt_malloc(buf_size);
/* Offset by -lb so that indexing with datatype displacements lands on the
 * true start of the data (standard Open MPI lb-adjustment idiom). */
101     recv_buf = recv_buf_free - lb;
102     if (NULL == recv_buf_free) {
107     /* allocate temporary buffer for results */
108     if(_xbt_replay_is_active()){
109       result_buf_free = (char*) SMPI_SHARED_MALLOC(buf_size);
111       result_buf_free = (char*) xbt_malloc(buf_size);
113     result_buf = result_buf_free - lb;
115     /* copy local buffer into the temporary results */
116     err =smpi_datatype_copy(sbuf, count, dtype, result_buf, count, dtype);
117     if (MPI_SUCCESS != err) goto cleanup;
119     /* figure out power of two mapping: grow until larger than
120        comm size, then go back one, to get the largest power of
121        two less than comm size */
/* NOTE(review): the ">> 1" shrink-back step mentioned above sits in an
 * elided line; after it, remain = size - tmp_size >= 0. */
122     while (tmp_size <= size) tmp_size <<= 1;
124     remain = size - tmp_size;
126     /* If comm size is not a power of two, have the first "remain"
127        procs with an even rank send to rank + 1, leaving a power of
128        two procs to do the rest of the algorithm */
129     if (rank < 2 * remain) {
130         if ((rank & 1) == 0) {
/* Even rank in the folded prefix: ship the whole local contribution to the
 * odd right-hand neighbor, then sit out the halving phase. */
131             smpi_mpi_send(result_buf, count, dtype, rank + 1,
132                           COLL_TAG_REDUCE_SCATTER,
134             /* we don't participate from here on out */
/* Odd rank in the folded prefix: absorb the even neighbor's data. */
137             smpi_mpi_recv(recv_buf, count, dtype, rank - 1,
138                           COLL_TAG_REDUCE_SCATTER,
139                           comm, MPI_STATUS_IGNORE);
141             /* integrate their results into our temp results */
142             smpi_op_apply(op, recv_buf, result_buf, &count, &dtype);
144             /* adjust rank to be the bottom "remain" ranks */
148         /* just need to adjust rank to show that the bottom "even
149            remain" ranks dropped out */
150         tmp_rank = rank - remain;
153     /* For ranks not kicked out by the above code, perform the
156         int *tmp_disps = NULL, *tmp_rcounts = NULL;
157         int mask, send_index, recv_index, last_index;
159         /* recalculate disps and rcounts to account for the
160            special "remainder" processes that are no longer doing
/* tmp_rcounts/tmp_disps: counts and displacements re-expressed over the
 * surviving power-of-two group of tmp_size ranks. */
162         tmp_rcounts = (int*) xbt_malloc(tmp_size * sizeof(int));
163         if (NULL == tmp_rcounts) {
167         tmp_disps = (int*) xbt_malloc(tmp_size * sizeof(int));
168         if (NULL == tmp_disps) {
169             xbt_free(tmp_rcounts);
174         for (i = 0 ; i < tmp_size ; ++i) {
176                 /* need to include old neighbor as well */
/* Merged ranks own their own block plus their dropped even neighbor's. */
177                 tmp_rcounts[i] = rcounts[i * 2 + 1] + rcounts[i * 2];
179                 tmp_rcounts[i] = rcounts[i + remain];
/* Prefix-sum the remapped counts (tmp_disps[0] init is elided). */
184         for (i = 0; i < tmp_size - 1; ++i) {
185             tmp_disps[i + 1] = tmp_disps[i] + tmp_rcounts[i];
188         /* do the recursive halving communication. Don't use the
189            dimension information on the communicator because I
190            think the information is invalidated by our "shrinking"
191            of the communicator */
192         mask = tmp_size >> 1;
193         send_index = recv_index = 0;
194         last_index = tmp_size;
/* NOTE(review): the `while (mask > 0)` loop header is in an elided line;
 * each iteration halves the active window [recv_index, last_index). */
196             int tmp_peer, peer, send_count, recv_count;
/* Partner in the shrunken group, translated back to a real rank: merged
 * survivors live at odd real ranks (tmp*2+1), the rest are shifted by
 * `remain`. */
199             tmp_peer = tmp_rank ^ mask;
200             peer = (tmp_peer < remain) ? tmp_peer * 2 + 1 : tmp_peer + remain;
202             /* figure out if we're sending, receiving, or both */
203             send_count = recv_count = 0;
204             if (tmp_rank < tmp_peer) {
/* Lower half keeps [recv_index, send_index), sends the upper half. */
205                 send_index = recv_index + mask;
206                 for (i = send_index ; i < last_index ; ++i) {
207                     send_count += tmp_rcounts[i];
209                 for (i = recv_index ; i < send_index ; ++i) {
210                     recv_count += tmp_rcounts[i];
/* (Elided else:) upper half keeps the top window, sends the lower one. */
213                 recv_index = send_index + mask;
214                 for (i = send_index ; i < recv_index ; ++i) {
215                     send_count += tmp_rcounts[i];
217                 for (i = recv_index ; i < last_index ; ++i) {
218                     recv_count += tmp_rcounts[i];
222             /* actual data transfer.  Send from result_buf,
223                receive into recv_buf */
224             if (send_count > 0 && recv_count != 0) {
225                 request=smpi_mpi_irecv(recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
226                                          recv_count, dtype, peer,
227                                          COLL_TAG_REDUCE_SCATTER,
229                 if (MPI_SUCCESS != err) {
230                     xbt_free(tmp_rcounts);
235             if (recv_count > 0 && send_count != 0) {
236                 smpi_mpi_send(result_buf + (ptrdiff_t)tmp_disps[send_index] * extent,
237                                         send_count, dtype, peer,
238                                         COLL_TAG_REDUCE_SCATTER,
240                 if (MPI_SUCCESS != err) {
241                     xbt_free(tmp_rcounts);
/* Complete the irecv posted above before touching recv_buf. */
246             if (send_count > 0 && recv_count != 0) {
247                 smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
250             /* if we received something on this step, push it into
251                the results buffer */
252             if (recv_count > 0) {
254                               recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
255                               result_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
256                               &recv_count, &dtype);
259             /* update for next iteration */
260             send_index = recv_index;
261             last_index = recv_index + mask;
265         /* copy local results from results buffer into real receive buffer */
266         if (0 != rcounts[rank]) {
267             err = smpi_datatype_copy(result_buf + disps[rank] * extent,
268                                    rcounts[rank], dtype,
269                                    rbuf, rcounts[rank], dtype);
270             if (MPI_SUCCESS != err) {
271                 xbt_free(tmp_rcounts);
277         xbt_free(tmp_rcounts);
281     /* Now fix up the non-power of two case, by having the odd
282        procs send the even procs the proper results */
283     if (rank < (2 * remain)) {
284         if ((rank & 1) == 0) {
/* Dropped even ranks get their final scattered block back from the odd
 * neighbor that computed on their behalf. */
286             smpi_mpi_recv(rbuf, rcounts[rank], dtype, rank + 1,
287                          COLL_TAG_REDUCE_SCATTER,
288                          comm, MPI_STATUS_IGNORE);
291             if (rcounts[rank - 1]) {
292                 smpi_mpi_send(result_buf + disps[rank - 1] * extent,
293                              rcounts[rank - 1], dtype, rank - 1,
294                              COLL_TAG_REDUCE_SCATTER,
/* cleanup: free buffers through the matching allocator (shared vs. plain). */
301     if (NULL != disps) xbt_free(disps);
303     if (!_xbt_replay_is_active()){
304       if (NULL != recv_buf_free) xbt_free(recv_buf_free);
305       if (NULL != result_buf_free) xbt_free(result_buf_free);
307       if (NULL != recv_buf_free) SMPI_SHARED_FREE(recv_buf_free);
308       if (NULL != result_buf_free) SMPI_SHARED_FREE(result_buf_free);
313 /* copied function (with appropriate renaming) ends here */
317 * smpi_coll_tuned_reduce_scatter_ompi_ring
319 * Function: Ring algorithm for reduce_scatter operation
320 * Accepts: Same as MPI_Reduce_scatter()
321 * Returns: MPI_SUCCESS or error code
323 * Description: Implements ring algorithm for reduce_scatter:
324 * the block sizes defined in rcounts are exchanged and
 325  *               updated until they reach proper destination.
326 * Algorithm requires 2 * max(rcounts) extra buffering
328 * Limitations: The algorithm DOES NOT preserve order of operations so it
329 * can be used only for commutative operations.
330 * Example on 5 nodes:
333 * [00] [10] -> [20] [30] [40]
334 * [01] [11] [21] -> [31] [41]
335 * [02] [12] [22] [32] -> [42]
336 * -> [03] [13] [23] [33] [43] --> ..
337 * [04] -> [14] [24] [34] [44]
340 * Step 0: rank r sends block (r-1) to rank (r+1) and
341 * receives block (r+1) from rank (r-1) [with wraparound].
343 * [00] [10] [10+20] -> [30] [40]
344 * [01] [11] [21] [21+31] -> [41]
345 * -> [02] [12] [22] [32] [32+42] -->..
346 * [43+03] -> [13] [23] [33] [43]
347 * [04] [04+14] -> [24] [34] [44]
351 * [00] [10] [10+20] [10+20+30] -> [40]
352 * -> [01] [11] [21] [21+31] [21+31+41] ->
353 * [32+42+02] -> [12] [22] [32] [32+42]
354 * [03] [43+03+13] -> [23] [33] [43]
355 * [04] [04+14] [04+14+24] -> [34] [44]
359 * -> [00] [10] [10+20] [10+20+30] [10+20+30+40] ->
360 * [21+31+41+01]-> [11] [21] [21+31] [21+31+41]
361 * [32+42+02] [32+42+02+12]-> [22] [32] [32+42]
362 * [03] [43+03+13] [43+03+13+23]-> [33] [43]
363 * [04] [04+14] [04+14+24] [04+14+24+34] -> [44]
367 * [10+20+30+40+00] [10] [10+20] [10+20+30] [10+20+30+40]
368 * [21+31+41+01] [21+31+41+01+11] [21] [21+31] [21+31+41]
369 * [32+42+02] [32+42+02+12] [32+42+02+12+22] [32] [32+42]
370 * [03] [43+03+13] [43+03+13+23] [43+03+13+23+33] [43]
371 * [04] [04+14] [04+14+24] [04+14+24+34] [04+14+24+34+44]
/* NOTE(review): this chunk is elided — embedded line numbers jump, so some
 * braces, else-branches and the tails of several calls are missing from this
 * view.  Comments below describe only what is visible; code is untouched. */
/* Ring reduce_scatter: blocks circulate around the ring, each hop folding in
 * the local contribution, until every block lands (fully reduced) on its
 * owner.  Commutative ops only (see the header comment above).  Uses two
 * alternating receive buffers (inbuf[0]/inbuf[1]) to overlap the irecv for
 * step k+1 with the reduction of step k. */
376 smpi_coll_tuned_reduce_scatter_ompi_ring(void *sbuf, void *rbuf, int *rcounts,
382     int ret, line, rank, size, i, k, recv_from, send_to, total_count, max_block_count;
383     int inbi, *displs = NULL;
384     char *tmpsend = NULL, *tmprecv = NULL, *accumbuf = NULL, *accumbuf_free = NULL;
385     char *inbuf_free[2] = {NULL, NULL}, *inbuf[2] = {NULL, NULL};
386     ptrdiff_t true_lb, true_extent, lb, extent, max_real_segsize;
387     MPI_Request reqs[2] = {NULL, NULL};
389     size = smpi_comm_size(comm);
390     rank = smpi_comm_rank(comm);
392     XBT_DEBUG( "coll:tuned:reduce_scatter_ompi_ring rank %d, size %d",
395     /* Determine the maximum number of elements per node,
396        corresponding block size, and displacements array.
398     displs = (int*) xbt_malloc(size * sizeof(int));
399     if (NULL == displs) { ret = -1; line = __LINE__; goto error_hndl; }
/* total_count = sum of all rcounts; max_block_count sizes the staging
 * buffers so any single block fits.  displs[0] init is in an elided line. */
401     total_count = rcounts[0];
402     max_block_count = rcounts[0];
403     for (i = 1; i < size; i++) {
404         displs[i] = total_count;
405         total_count += rcounts[i];
406         if (max_block_count < rcounts[i]) max_block_count = rcounts[i];
409     /* Special case for size == 1 */
/* Single-rank communicator degenerates to a local copy (unless in-place). */
411         if (MPI_IN_PLACE != sbuf) {
412             ret = smpi_datatype_copy((char*)sbuf, total_count, dtype, (char*)rbuf, total_count, dtype);
413             if (ret < 0) { line = __LINE__; goto error_hndl; }
419     /* Allocate and initialize temporary buffers, we need:
420        - a temporary buffer to perform reduction (size total_count) since
421        rbuf can be of rcounts[rank] size.
422        - up to two temporary buffers used for communication/computation overlap.
424     smpi_datatype_extent(dtype, &lb, &extent);
425     smpi_datatype_extent(dtype, &true_lb, &true_extent);
/* Largest single-block footprint for this datatype. */
427     max_real_segsize = true_extent + (ptrdiff_t)(max_block_count - 1) * extent;
429     accumbuf_free = (char*)xbt_malloc(true_extent + (ptrdiff_t)(total_count - 1) * extent);
430     if (NULL == accumbuf_free) { ret = -1; line = __LINE__; goto error_hndl; }
/* -lb adjustment so datatype displacements index from the true data start. */
431     accumbuf = accumbuf_free - lb;
433     inbuf_free[0] = (char*)xbt_malloc(max_real_segsize);
434     if (NULL == inbuf_free[0]) { ret = -1; line = __LINE__; goto error_hndl; }
435     inbuf[0] = inbuf_free[0] - lb;
/* NOTE(review): the size>2 guard around the second staging buffer appears
 * elided; only the allocation itself is visible here. */
437         inbuf_free[1] = (char*)xbt_malloc(max_real_segsize);
438         if (NULL == inbuf_free[1]) { ret = -1; line = __LINE__; goto error_hndl; }
439         inbuf[1] = inbuf_free[1] - lb;
442     /* Handle MPI_IN_PLACE for size > 1 */
/* Seed the accumulation buffer with the local contribution. */
443     if (MPI_IN_PLACE == sbuf) {
447         ret = smpi_datatype_copy((char*)sbuf, total_count, dtype, accumbuf, total_count, dtype);
448         if (ret < 0) { line = __LINE__; goto error_hndl; }
450     /* Computation loop */
453        For each of the remote nodes:
454        - post irecv for block (r-2) from (r-1) with wrap around
455        - send block (r-1) to (r+1)
456        - in loop for every step k = 2 .. n
457        - post irecv for block (r - 1 + n - k) % n
458        - wait on block (r + n - k) % n to arrive
459        - compute on block (r + n - k ) % n
460        - send block (r + n - k) % n
462        - compute on block (r)
463        - copy block (r) to rbuf
464        Note that we must be careful when computing the begining of buffers and
465        for send operations and computation we must compute the exact block size.
467     send_to = (rank + 1) % size;
468     recv_from = (rank + size - 1) % size;
/* NOTE(review): inbi's initialization (to 0) is in an elided line; it
 * alternates between the two staging buffers via `inbi ^ 1` below. */
471     /* Initialize first receive from the neighbor on the left */
472     reqs[inbi]=smpi_mpi_irecv(inbuf[inbi], max_block_count, dtype, recv_from,
473                              COLL_TAG_REDUCE_SCATTER, comm
/* Kick the ring off: forward the left neighbor's block from our accumbuf. */
475     tmpsend = accumbuf + (ptrdiff_t)displs[recv_from] * extent;
476     smpi_mpi_send(tmpsend, rcounts[recv_from], dtype, send_to,
477                             COLL_TAG_REDUCE_SCATTER,
480     for (k = 2; k < size; k++) {
/* Block that completed its hop on the previous step and is now ready to be
 * reduced and forwarded. */
481         const int prevblock = (rank + size - k) % size;
485         /* Post irecv for the current block */
486         reqs[inbi]=smpi_mpi_irecv(inbuf[inbi], max_block_count, dtype, recv_from,
487                                  COLL_TAG_REDUCE_SCATTER, comm
490         /* Wait on previous block to arrive */
491         smpi_mpi_wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);
493         /* Apply operation on previous block: result goes to rbuf
494            rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock]
496         tmprecv = accumbuf + (ptrdiff_t)displs[prevblock] * extent;
497         smpi_op_apply(op, inbuf[inbi ^ 0x1], tmprecv, &(rcounts[prevblock]), &dtype);
499         /* send previous block to send_to */
500         smpi_mpi_send(tmprecv, rcounts[prevblock], dtype, send_to,
501                                 COLL_TAG_REDUCE_SCATTER,
505     /* Wait on the last block to arrive */
506     smpi_mpi_wait(&reqs[inbi], MPI_STATUS_IGNORE);
508     /* Apply operation on the last block (my block)
509        rbuf[rank] = inbuf[inbi] (op) rbuf[rank] */
510     tmprecv = accumbuf + (ptrdiff_t)displs[rank] * extent;
511     smpi_op_apply(op, inbuf[inbi], tmprecv, &(rcounts[rank]), &dtype);
513     /* Copy result from tmprecv to rbuf */
514     ret = smpi_datatype_copy(tmprecv, rcounts[rank], dtype, (char*)rbuf, rcounts[rank], dtype);
515     if (ret < 0) { line = __LINE__; goto error_hndl; }
/* Success path cleanup. */
517     if (NULL != displs) xbt_free(displs);
518     if (NULL != accumbuf_free) xbt_free(accumbuf_free);
519     if (NULL != inbuf_free[0]) xbt_free(inbuf_free[0]);
520     if (NULL != inbuf_free[1]) xbt_free(inbuf_free[1]);
/* error_hndl: report where the failure happened (line captured above via
 * __LINE__), then release whatever was allocated.  The label itself sits in
 * an elided line. */
525     XBT_DEBUG( "%s:%4d\tRank %d Error occurred %d\n",
526          __FILE__, line, rank, ret);
527     if (NULL != displs) xbt_free(displs);
528     if (NULL != accumbuf_free) xbt_free(accumbuf_free);
529     if (NULL != inbuf_free[0]) xbt_free(inbuf_free[0]);
530     if (NULL != inbuf_free[1]) xbt_free(inbuf_free[1]);