Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
8e9efebb81642b030b74cf697f2427c23f5381b8
[simgrid.git] / src / smpi / colls / reduce_scatter / reduce_scatter-ompi.cpp
1 /* Copyright (c) 2013-2022. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 /*
8  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
9  *                         University Research and Technology
10  *                         Corporation.  All rights reserved.
11  * Copyright (c) 2004-2012 The University of Tennessee and The University
12  *                         of Tennessee Research Foundation.  All rights
13  *                         reserved.
14  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
15  *                         University of Stuttgart.  All rights reserved.
16  * Copyright (c) 2004-2005 The Regents of the University of California.
17  *                         All rights reserved.
18  * Copyright (c) 2008      Sun Microsystems, Inc.  All rights reserved.
19  * Copyright (c) 2009      University of Houston. All rights reserved.
20  *
21  * Additional copyrights may follow
22  */
23
24 #include "../coll_tuned_topo.hpp"
25 #include "../colls_private.hpp"
26
27 /*
28  * Recursive-halving function is (*mostly*) copied from the BASIC coll module.
29  * I have removed the part which handles "large" message sizes
30  * (non-overlapping version of reduce_Scatter).
31  */
32
33 /* copied function (with appropriate renaming) starts here */
34
35 /*
36  *  reduce_scatter_ompi_basic_recursivehalving
37  *
38  *  Function:   - reduce scatter implementation using recursive-halving
39  *                algorithm
40  *  Accepts:    - same as MPI_Reduce_scatter()
41  *  Returns:    - MPI_SUCCESS or error code
42  *  Limitation: - Works only for commutative operations.
43  */
44 namespace simgrid {
45 namespace smpi {
46 int reduce_scatter__ompi_basic_recursivehalving(const void *sbuf,
47                                                 void *rbuf,
48                                                 const int *rcounts,
49                                                 MPI_Datatype dtype,
50                                                 MPI_Op op,
51                                                 MPI_Comm comm
52                                                 )
53 {
54     int i, rank, size, count, err = MPI_SUCCESS;
55     int tmp_size = 1, remain = 0, tmp_rank;
56     ptrdiff_t true_lb, true_extent, lb, extent, buf_size;
57     unsigned char *result_buf = nullptr, *result_buf_free = nullptr;
58
59     /* Initialize */
60     rank = comm->rank();
61     size = comm->size();
62
63     XBT_DEBUG("coll:tuned:reduce_scatter_ompi_basic_recursivehalving, rank %d", rank);
64     if ((op != MPI_OP_NULL && not op->is_commutative()))
65       throw std::invalid_argument(
66           "reduce_scatter ompi_basic_recursivehalving can only be used for commutative operations!");
67
68     /* Find displacements and the like */
69     int* disps = new int[size];
70
71     disps[0] = 0;
72     for (i = 0; i < (size - 1); ++i) {
73         disps[i + 1] = disps[i] + rcounts[i];
74     }
75     count = disps[size - 1] + rcounts[size - 1];
76
77     /* short cut the trivial case */
78     if (0 == count) {
79       delete[] disps;
80       return MPI_SUCCESS;
81     }
82
83     /* get datatype information */
84     dtype->extent(&lb, &extent);
85     dtype->extent(&true_lb, &true_extent);
86     buf_size = true_extent + (ptrdiff_t)(count - 1) * extent;
87
88     /* Handle MPI_IN_PLACE */
89     if (MPI_IN_PLACE == sbuf) {
90         sbuf = rbuf;
91     }
92
93     /* Allocate temporary receive buffer. */
94     unsigned char* recv_buf_free = smpi_get_tmp_recvbuffer(buf_size);
95     unsigned char* recv_buf      = recv_buf_free - lb;
96     if (nullptr == recv_buf_free) {
97       err = MPI_ERR_OTHER;
98       goto cleanup;
99     }
100
101     /* allocate temporary buffer for results */
102     result_buf_free = smpi_get_tmp_sendbuffer(buf_size);
103     result_buf = result_buf_free - lb;
104
105     /* copy local buffer into the temporary results */
106     err =Datatype::copy(sbuf, count, dtype, result_buf, count, dtype);
107     if (MPI_SUCCESS != err) goto cleanup;
108
109     /* figure out power of two mapping: grow until larger than
110        comm size, then go back one, to get the largest power of
111        two less than comm size */
112     while (tmp_size <= size) tmp_size <<= 1;
113     tmp_size >>= 1;
114     remain = size - tmp_size;
115
116     /* If comm size is not a power of two, have the first "remain"
117        procs with an even rank send to rank + 1, leaving a power of
118        two procs to do the rest of the algorithm */
119     if (rank < 2 * remain) {
120         if ((rank & 1) == 0) {
121             Request::send(result_buf, count, dtype, rank + 1,
122                                     COLL_TAG_REDUCE_SCATTER,
123                                     comm);
124             /* we don't participate from here on out */
125             tmp_rank = -1;
126         } else {
127             Request::recv(recv_buf, count, dtype, rank - 1,
128                                     COLL_TAG_REDUCE_SCATTER,
129                                     comm, MPI_STATUS_IGNORE);
130
131             /* integrate their results into our temp results */
132             if(op!=MPI_OP_NULL) op->apply( recv_buf, result_buf, &count, dtype);
133
134             /* adjust rank to be the bottom "remain" ranks */
135             tmp_rank = rank / 2;
136         }
137     } else {
138         /* just need to adjust rank to show that the bottom "even
139            remain" ranks dropped out */
140         tmp_rank = rank - remain;
141     }
142
143     /* For ranks not kicked out by the above code, perform the
144        recursive halving */
145     if (tmp_rank >= 0) {
146         int mask, send_index, recv_index, last_index;
147
148         /* recalculate disps and rcounts to account for the
149            special "remainder" processes that are no longer doing
150            anything */
151         int* tmp_rcounts = new int[tmp_size];
152         int* tmp_disps   = new int[tmp_size];
153
154         for (i = 0 ; i < tmp_size ; ++i) {
155             if (i < remain) {
156                 /* need to include old neighbor as well */
157                 tmp_rcounts[i] = rcounts[i * 2 + 1] + rcounts[i * 2];
158             } else {
159                 tmp_rcounts[i] = rcounts[i + remain];
160             }
161         }
162
163         tmp_disps[0] = 0;
164         for (i = 0; i < tmp_size - 1; ++i) {
165             tmp_disps[i + 1] = tmp_disps[i] + tmp_rcounts[i];
166         }
167
168         /* do the recursive halving communication.  Don't use the
169            dimension information on the communicator because I
170            think the information is invalidated by our "shrinking"
171            of the communicator */
172         mask = tmp_size >> 1;
173         send_index = recv_index = 0;
174         last_index = tmp_size;
175         while (mask > 0) {
176             int tmp_peer, peer, send_count, recv_count;
177             MPI_Request request;
178
179             tmp_peer = tmp_rank ^ mask;
180             peer = (tmp_peer < remain) ? tmp_peer * 2 + 1 : tmp_peer + remain;
181
182             /* figure out if we're sending, receiving, or both */
183             send_count = recv_count = 0;
184             if (tmp_rank < tmp_peer) {
185                 send_index = recv_index + mask;
186                 for (i = send_index ; i < last_index ; ++i) {
187                     send_count += tmp_rcounts[i];
188                 }
189                 for (i = recv_index ; i < send_index ; ++i) {
190                     recv_count += tmp_rcounts[i];
191                 }
192             } else {
193                 recv_index = send_index + mask;
194                 for (i = send_index ; i < recv_index ; ++i) {
195                     send_count += tmp_rcounts[i];
196                 }
197                 for (i = recv_index ; i < last_index ; ++i) {
198                     recv_count += tmp_rcounts[i];
199                 }
200             }
201
202             /* actual data transfer.  Send from result_buf,
203                receive into recv_buf */
204             if (send_count > 0 && recv_count != 0) {
205                 request=Request::irecv(recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
206                                          recv_count, dtype, peer,
207                                          COLL_TAG_REDUCE_SCATTER,
208                                          comm);
209                 if (MPI_SUCCESS != err) {
210                   delete[] tmp_rcounts;
211                   delete[] tmp_disps;
212                   goto cleanup;
213                 }
214             }
215             if (recv_count > 0 && send_count != 0) {
216                 Request::send(result_buf + (ptrdiff_t)tmp_disps[send_index] * extent,
217                                         send_count, dtype, peer,
218                                         COLL_TAG_REDUCE_SCATTER,
219                                         comm);
220                 if (MPI_SUCCESS != err) {
221                   delete[] tmp_rcounts;
222                   delete[] tmp_disps;
223                   goto cleanup;
224                 }
225             }
226             if (send_count > 0 && recv_count != 0) {
227                 Request::wait(&request, MPI_STATUS_IGNORE);
228             }
229
230             /* if we received something on this step, push it into
231                the results buffer */
232             if (recv_count > 0) {
233                 if(op!=MPI_OP_NULL) op->apply(
234                                recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
235                                result_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
236                                &recv_count, dtype);
237             }
238
239             /* update for next iteration */
240             send_index = recv_index;
241             last_index = recv_index + mask;
242             mask >>= 1;
243         }
244
245         /* copy local results from results buffer into real receive buffer */
246         if (0 != rcounts[rank]) {
247             err = Datatype::copy(result_buf + disps[rank] * extent,
248                                        rcounts[rank], dtype,
249                                        rbuf, rcounts[rank], dtype);
250             if (MPI_SUCCESS != err) {
251               delete[] tmp_rcounts;
252               delete[] tmp_disps;
253               goto cleanup;
254             }
255         }
256
257         delete[] tmp_rcounts;
258         delete[] tmp_disps;
259     }
260
261     /* Now fix up the non-power of two case, by having the odd
262        procs send the even procs the proper results */
263     if (rank < (2 * remain)) {
264         if ((rank & 1) == 0) {
265             if (rcounts[rank]) {
266                 Request::recv(rbuf, rcounts[rank], dtype, rank + 1,
267                                         COLL_TAG_REDUCE_SCATTER,
268                                         comm, MPI_STATUS_IGNORE);
269             }
270         } else {
271             if (rcounts[rank - 1]) {
272                 Request::send(result_buf + disps[rank - 1] * extent,
273                                         rcounts[rank - 1], dtype, rank - 1,
274                                         COLL_TAG_REDUCE_SCATTER,
275                                         comm);
276             }
277         }
278     }
279
280  cleanup:
281     delete[] disps;
282     if (nullptr != recv_buf_free)
283       smpi_free_tmp_buffer(recv_buf_free);
284     if (nullptr != result_buf_free)
285       smpi_free_tmp_buffer(result_buf_free);
286
287     return err;
288 }
289
290 /* copied function (with appropriate renaming) ends here */
291
292
293 /*
294  *   Coll_reduce_scatter_ompi_ring::reduce_scatter
295  *
296  *   Function:       Ring algorithm for reduce_scatter operation
297  *   Accepts:        Same as MPI_Reduce_scatter()
298  *   Returns:        MPI_SUCCESS or error code
299  *
300  *   Description:    Implements ring algorithm for reduce_scatter:
301  *                   the block sizes defined in rcounts are exchanged and
302  8                    updated until they reach proper destination.
303  *                   Algorithm requires 2 * max(rcounts) extra buffering
304  *
305  *   Limitations:    The algorithm DOES NOT preserve order of operations so it
306  *                   can be used only for commutative operations.
307  *         Example on 5 nodes:
308  *         Initial state
309  *   #      0              1             2              3             4
310  *        [00]           [10]   ->     [20]           [30]           [40]
311  *        [01]           [11]          [21]  ->       [31]           [41]
312  *        [02]           [12]          [22]           [32]  ->       [42]
313  *    ->  [03]           [13]          [23]           [33]           [43] --> ..
314  *        [04]  ->       [14]          [24]           [34]           [44]
315  *
316  *        COMPUTATION PHASE
317  *         Step 0: rank r sends block (r-1) to rank (r+1) and
318  *                 receives block (r+1) from rank (r-1) [with wraparound].
319  *   #      0              1             2              3             4
320  *        [00]           [10]        [10+20]   ->     [30]           [40]
321  *        [01]           [11]          [21]          [21+31]  ->     [41]
322  *    ->  [02]           [12]          [22]           [32]         [32+42] -->..
323  *      [43+03] ->       [13]          [23]           [33]           [43]
324  *        [04]         [04+14]  ->     [24]           [34]           [44]
325  *
326  *         Step 1:
327  *   #      0              1             2              3             4
328  *        [00]           [10]        [10+20]       [10+20+30] ->     [40]
329  *    ->  [01]           [11]          [21]          [21+31]      [21+31+41] ->
330  *     [32+42+02] ->     [12]          [22]           [32]         [32+42]
331  *        [03]        [43+03+13] ->    [23]           [33]           [43]
332  *        [04]         [04+14]      [04+14+24]  ->    [34]           [44]
333  *
334  *         Step 2:
335  *   #      0              1             2              3             4
336  *     -> [00]           [10]        [10+20]       [10+20+30]   [10+20+30+40] ->
337  *   [21+31+41+01]->     [11]          [21]          [21+31]      [21+31+41]
338  *     [32+42+02]   [32+42+02+12]->    [22]           [32]         [32+42]
339  *        [03]        [43+03+13]   [43+03+13+23]->    [33]           [43]
340  *        [04]         [04+14]      [04+14+24]    [04+14+24+34] ->   [44]
341  *
342  *         Step 3:
343  *   #      0             1              2              3             4
344  * [10+20+30+40+00]     [10]         [10+20]       [10+20+30]   [10+20+30+40]
345  *  [21+31+41+01] [21+31+41+01+11]     [21]          [21+31]      [21+31+41]
346  *    [32+42+02]   [32+42+02+12] [32+42+02+12+22]     [32]         [32+42]
347  *       [03]        [43+03+13]    [43+03+13+23] [43+03+13+23+33]    [43]
348  *       [04]         [04+14]       [04+14+24]    [04+14+24+34] [04+14+24+34+44]
349  *    DONE :)
350  *
351  */
352 int reduce_scatter__ompi_ring(const void *sbuf, void *rbuf, const int *rcounts,
353                               MPI_Datatype dtype,
354                               MPI_Op op,
355                               MPI_Comm comm
356                               )
357 {
358     int ret, line, rank, size, i, k, recv_from, send_to, total_count, max_block_count;
359     int inbi;
360     unsigned char *tmpsend = nullptr, *tmprecv = nullptr, *accumbuf = nullptr, *accumbuf_free = nullptr;
361     unsigned char *inbuf_free[2] = {nullptr, nullptr}, *inbuf[2] = {nullptr, nullptr};
362     ptrdiff_t true_lb, true_extent, lb, extent, max_real_segsize;
363     MPI_Request reqs[2] = {nullptr, nullptr};
364
365     size = comm->size();
366     rank = comm->rank();
367
368     XBT_DEBUG(  "coll:tuned:reduce_scatter_ompi_ring rank %d, size %d",
369                  rank, size);
370
371     /* Determine the maximum number of elements per node,
372        corresponding block size, and displacements array.
373     */
374     int* displs = new int[size];
375
376     displs[0] = 0;
377     total_count = rcounts[0];
378     max_block_count = rcounts[0];
379     for (i = 1; i < size; i++) {
380         displs[i] = total_count;
381         total_count += rcounts[i];
382         if (max_block_count < rcounts[i]) max_block_count = rcounts[i];
383     }
384
385     /* Special case for size == 1 */
386     if (1 == size) {
387         if (MPI_IN_PLACE != sbuf) {
388             ret = Datatype::copy((char*)sbuf, total_count, dtype, (char*)rbuf, total_count, dtype);
389             if (ret < 0) { line = __LINE__; goto error_hndl; }
390         }
391         delete[] displs;
392         return MPI_SUCCESS;
393     }
394
395     /* Allocate and initialize temporary buffers, we need:
396        - a temporary buffer to perform reduction (size total_count) since
397        rbuf can be of rcounts[rank] size.
398        - up to two temporary buffers used for communication/computation overlap.
399     */
400     dtype->extent(&lb, &extent);
401     dtype->extent(&true_lb, &true_extent);
402
403     max_real_segsize = true_extent + (ptrdiff_t)(max_block_count - 1) * extent;
404
405     accumbuf_free = smpi_get_tmp_recvbuffer(true_extent + (ptrdiff_t)(total_count - 1) * extent);
406     if (nullptr == accumbuf_free) {
407       ret  = -1;
408       line = __LINE__;
409       goto error_hndl;
410     }
411     accumbuf = accumbuf_free - lb;
412
413     inbuf_free[0] = smpi_get_tmp_sendbuffer(max_real_segsize);
414     if (nullptr == inbuf_free[0]) {
415       ret  = -1;
416       line = __LINE__;
417       goto error_hndl;
418     }
419     inbuf[0] = inbuf_free[0] - lb;
420     if (size > 2) {
421       inbuf_free[1] = smpi_get_tmp_sendbuffer(max_real_segsize);
422       if (nullptr == inbuf_free[1]) {
423         ret  = -1;
424         line = __LINE__;
425         goto error_hndl;
426       }
427       inbuf[1] = inbuf_free[1] - lb;
428     }
429
430     /* Handle MPI_IN_PLACE for size > 1 */
431     if (MPI_IN_PLACE == sbuf) {
432         sbuf = rbuf;
433     }
434
435     ret = Datatype::copy((char*)sbuf, total_count, dtype, accumbuf, total_count, dtype);
436     if (ret < 0) { line = __LINE__; goto error_hndl; }
437
438     /* Computation loop */
439
440     /*
441        For each of the remote nodes:
442        - post irecv for block (r-2) from (r-1) with wrap around
443        - send block (r-1) to (r+1)
444        - in loop for every step k = 2 .. n
445        - post irecv for block (r - 1 + n - k) % n
446        - wait on block (r + n - k) % n to arrive
447        - compute on block (r + n - k ) % n
448        - send block (r + n - k) % n
449        - wait on block (r)
450        - compute on block (r)
451        - copy block (r) to rbuf
452        Note that we must be careful when computing the beginning of buffers and
453        for send operations and computation we must compute the exact block size.
454     */
455     send_to = (rank + 1) % size;
456     recv_from = (rank + size - 1) % size;
457
458     inbi = 0;
459     /* Initialize first receive from the neighbor on the left */
460     reqs[inbi]=Request::irecv(inbuf[inbi], max_block_count, dtype, recv_from,
461                              COLL_TAG_REDUCE_SCATTER, comm
462                              );
463     tmpsend = accumbuf + (ptrdiff_t)displs[recv_from] * extent;
464     Request::send(tmpsend, rcounts[recv_from], dtype, send_to,
465                             COLL_TAG_REDUCE_SCATTER,
466                              comm);
467
468     for (k = 2; k < size; k++) {
469         const int prevblock = (rank + size - k) % size;
470
471         inbi = inbi ^ 0x1;
472
473         /* Post irecv for the current block */
474         reqs[inbi]=Request::irecv(inbuf[inbi], max_block_count, dtype, recv_from,
475                                  COLL_TAG_REDUCE_SCATTER, comm
476                                  );
477
478         /* Wait on previous block to arrive */
479         Request::wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);
480
481         /* Apply operation on previous block: result goes to rbuf
482            rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock]
483         */
484         tmprecv = accumbuf + (ptrdiff_t)displs[prevblock] * extent;
485         if (op != MPI_OP_NULL)
486           op->apply(inbuf[inbi ^ 0x1], tmprecv, &rcounts[prevblock], dtype);
487
488         /* send previous block to send_to */
489         Request::send(tmprecv, rcounts[prevblock], dtype, send_to,
490                                 COLL_TAG_REDUCE_SCATTER,
491                                  comm);
492     }
493
494     /* Wait on the last block to arrive */
495     Request::wait(&reqs[inbi], MPI_STATUS_IGNORE);
496
497     /* Apply operation on the last block (my block)
498        rbuf[rank] = inbuf[inbi] (op) rbuf[rank] */
499     tmprecv = accumbuf + (ptrdiff_t)displs[rank] * extent;
500     if (op != MPI_OP_NULL)
501       op->apply(inbuf[inbi], tmprecv, &rcounts[rank], dtype);
502
503     /* Copy result from tmprecv to rbuf */
504     ret = Datatype::copy(tmprecv, rcounts[rank], dtype, (char*)rbuf, rcounts[rank], dtype);
505     if (ret < 0) { line = __LINE__; goto error_hndl; }
506
507     delete[] displs;
508     if (nullptr != accumbuf_free)
509       smpi_free_tmp_buffer(accumbuf_free);
510     if (nullptr != inbuf_free[0])
511       smpi_free_tmp_buffer(inbuf_free[0]);
512     if (nullptr != inbuf_free[1])
513       smpi_free_tmp_buffer(inbuf_free[1]);
514
515     return MPI_SUCCESS;
516
517  error_hndl:
518     XBT_DEBUG( "%s:%4d\tRank %d Error occurred %d\n",
519                  __FILE__, line, rank, ret);
520     delete[] displs;
521     if (nullptr != accumbuf_free)
522       smpi_free_tmp_buffer(accumbuf_free);
523     if (nullptr != inbuf_free[0])
524       smpi_free_tmp_buffer(inbuf_free[0]);
525     if (nullptr != inbuf_free[1])
526       smpi_free_tmp_buffer(inbuf_free[1]);
527     return ret;
528 }
529
530 static int ompi_sum_counts(const int *counts, int *displs, int nprocs_rem, int lo, int hi)
531 {
532     /* Adjust lo and hi for taking into account blocks of excluded processes */
533     lo = (lo < nprocs_rem) ? lo * 2 : lo + nprocs_rem;
534     hi = (hi < nprocs_rem) ? hi * 2 + 1 : hi + nprocs_rem;
535     return displs[hi] + counts[hi] - displs[lo];
536 }
537
538 /*
539  * ompi_mirror_perm: Returns mirror permutation of nbits low-order bits
540  *                   of x [*].
541  * [*] Warren Jr., Henry S. Hacker's Delight (2ed). 2013.
542  *     Chapter 7. Rearranging Bits and Bytes.
543  */
544 static unsigned int ompi_mirror_perm(unsigned int x, int nbits)
545 {
546     x = (((x & 0xaaaaaaaa) >> 1) | ((x & 0x55555555) << 1));
547     x = (((x & 0xcccccccc) >> 2) | ((x & 0x33333333) << 2));
548     x = (((x & 0xf0f0f0f0) >> 4) | ((x & 0x0f0f0f0f) << 4));
549     x = (((x & 0xff00ff00) >> 8) | ((x & 0x00ff00ff) << 8));
550     x = ((x >> 16) | (x << 16));
551     return x >> (sizeof(x) * 8 - nbits);
552 }
553 /*
554  * ompi_coll_base_reduce_scatter_intra_butterfly
555  *
556  * Function:  Butterfly algorithm for reduce_scatter
557  * Accepts:   Same as MPI_Reduce_scatter
558  * Returns:   MPI_SUCCESS or error code
559  *
560  * Description:  Implements butterfly algorithm for MPI_Reduce_scatter [*].
561  *               The algorithm can be used both by commutative and non-commutative
562  *               operations, for power-of-two and non-power-of-two number of processes.
563  *
564  * [*] J.L. Traff. An improved Algorithm for (non-commutative) Reduce-scatter
565  *     with an Application // Proc. of EuroPVM/MPI, 2005. -- pp. 129-137.
566  *
567  * Time complexity: O(m\lambda + log(p)\alpha + m\beta + m\gamma),
568  *   where m = sum of rcounts[], p = comm_size
569  * Memory requirements (per process): 2 * m * typesize + comm_size
570  *
571  * Example: comm_size=6, nprocs_pof2=4, nprocs_rem=2, rcounts[]=1, sbuf=[0,1,...,5]
572  * Step 1. Reduce the number of processes to 4
573  * rank 0: [0|1|2|3|4|5]: send to 1: vrank -1
574  * rank 1: [0|1|2|3|4|5]: recv from 0, op: vrank 0: [0|2|4|6|8|10]
575  * rank 2: [0|1|2|3|4|5]: send to 3: vrank -1
576  * rank 3: [0|1|2|3|4|5]: recv from 2, op: vrank 1: [0|2|4|6|8|10]
577  * rank 4: [0|1|2|3|4|5]: vrank 2: [0|1|2|3|4|5]
578  * rank 5: [0|1|2|3|4|5]: vrank 3: [0|1|2|3|4|5]
579  *
580  * Step 2. Butterfly. Buffer of 6 elements is divided into 4 blocks.
581  * Round 1 (mask=1, nblocks=2)
582  * 0: vrank -1
583  * 1: vrank  0 [0 2|4 6|8|10]: exch with 1: send [2,3], recv [0,1]: [0 4|8 12|*|*]
584  * 2: vrank -1
585  * 3: vrank  1 [0 2|4 6|8|10]: exch with 0: send [0,1], recv [2,3]: [**|**|16|20]
586  * 4: vrank  2 [0 1|2 3|4|5] : exch with 3: send [2,3], recv [0,1]: [0 2|4 6|*|*]
587  * 5: vrank  3 [0 1|2 3|4|5] : exch with 2: send [0,1], recv [2,3]: [**|**|8|10]
588  *
589  * Round 2 (mask=2, nblocks=1)
590  * 0: vrank -1
591  * 1: vrank  0 [0 4|8 12|*|*]: exch with 2: send [1], recv [0]: [0 6|**|*|*]
592  * 2: vrank -1
593  * 3: vrank  1 [**|**|16|20] : exch with 3: send [3], recv [2]: [**|**|24|*]
594  * 4: vrank  2 [0 2|4 6|*|*] : exch with 0: send [0], recv [1]: [**|12 18|*|*]
595  * 5: vrank  3 [**|**|8|10]  : exch with 1: send [2], recv [3]: [**|**|*|30]
596  *
597  * Step 3. Exchange with remote process according to a mirror permutation:
598  *         mperm(0)=0, mperm(1)=2, mperm(2)=1, mperm(3)=3
599  * 0: vrank -1: recv "0" from process 0
600  * 1: vrank  0 [0 6|**|*|*]: send "0" to 0, copy "6" to rbuf (mperm(0)=0)
601  * 2: vrank -1: recv result "12" from process 4
602  * 3: vrank  1 [**|**|24|*]
603  * 4: vrank  2 [**|12 18|*|*]: send "12" to 2, send "18" to 3, recv "24" from 3
604  * 5: vrank  3 [**|**|*|30]: copy "30" to rbuf (mperm(3)=3)
605  */
606 int reduce_scatter__ompi_butterfly(
607     const void *sbuf, void *rbuf, const int *rcounts, MPI_Datatype dtype,
608     MPI_Op op, MPI_Comm comm)
609 {
610     char *tmpbuf[2] = {NULL, NULL}, *psend, *precv;
611     int *displs = NULL, index;
612     ptrdiff_t span, gap, totalcount, extent;
613     int err = MPI_SUCCESS;
614     int comm_size = comm->size();
615     int rank = comm->rank();
616     int vrank = -1;
617     int nprocs_rem = 0;
618
619     XBT_DEBUG("coll:base:reduce_scatter_intra_butterfly: rank %d/%d",
620                  rank, comm_size);
621     if (comm_size < 2)
622         return MPI_SUCCESS;
623
624     displs = (int*)malloc(sizeof(*displs) * comm_size);
625     if (NULL == displs) {
626         err = MPI_ERR_OTHER;
627         goto cleanup_and_return;
628     }
629     displs[0] = 0;
630     for (int i = 1; i < comm_size; i++) {
631         displs[i] = displs[i - 1] + rcounts[i - 1];
632     }
633     totalcount = displs[comm_size - 1] + rcounts[comm_size - 1];
634     dtype->extent(&gap, &extent);
635     span = extent * totalcount;
636     tmpbuf[0] = (char*)malloc(span);
637     tmpbuf[1] = (char*)malloc(span);
638     if (NULL == tmpbuf[0] || NULL == tmpbuf[1]) {
639         err = MPI_ERR_OTHER;
640         goto cleanup_and_return;
641     }
642     psend = tmpbuf[0] - gap;
643     precv = tmpbuf[1] - gap;
644
645     if (sbuf != MPI_IN_PLACE) {
646         err = Datatype::copy(sbuf, totalcount, dtype, psend, totalcount, dtype);
647         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
648     } else {
649         err = Datatype::copy(rbuf, totalcount, dtype, psend, totalcount, dtype);
650         if (MPI_SUCCESS != err) { goto cleanup_and_return; }
651     }
652
653     /*
654      * Step 1. Reduce the number of processes to the nearest lower power of two
655      * p' = 2^{\floor{\log_2 p}} by removing r = p - p' processes.
656      * In the first 2r processes (ranks 0 to 2r - 1), all the even ranks send
657      * the input vector to their neighbor (rank + 1) and all the odd ranks recv
658      * the input vector and perform local reduction.
659      * The odd ranks (0 to 2r - 1) contain the reduction with the input
660      * vector on their neighbors (the even ranks). The first r odd
661      * processes and the p - 2r last processes are renumbered from
662      * 0 to 2^{\floor{\log_2 p}} - 1. Even ranks do not participate in the
663      * rest of the algorithm.
664      */
665
666     /* Find nearest power-of-two less than or equal to comm_size */
667     int nprocs_pof2, size;
668     for( nprocs_pof2 = 1, size = comm_size; size > 0; size >>= 1, nprocs_pof2 <<= 1 );
669     nprocs_pof2 >>= 1;
670
671     nprocs_rem = comm_size - nprocs_pof2;
672     int log2_size;
673     for (log2_size = 0, size = 1; size < nprocs_pof2; ++log2_size, size <<= 1);
674
675     if (rank < 2 * nprocs_rem) {
676         if ((rank % 2) == 0) {
677             /* Even process */
678             Request::send(psend, totalcount, dtype, rank + 1,
679                                     COLL_TAG_REDUCE_SCATTER, comm);
680             /* This process does not participate in the rest of the algorithm */
681             vrank = -1;
682         } else {
683             /* Odd process */
684             Request::recv(precv, totalcount, dtype, rank - 1,
685                                     COLL_TAG_REDUCE_SCATTER, comm, MPI_STATUS_IGNORE);
686             op->apply(precv, psend, (int*)&totalcount, dtype);
687             /* Adjust rank to be the bottom "remain" ranks */
688             vrank = rank / 2;
689         }
690     } else {
691         /* Adjust rank to show that the bottom "even remain" ranks dropped out */
692         vrank = rank - nprocs_rem;
693     }
694
695     if (vrank != -1) {
696         /*
697          * Now, psend vector of size totalcount is divided into nprocs_pof2 blocks:
698          * block 0:   rcounts[0] and rcounts[1] -- for process 0 and 1
699          * block 1:   rcounts[2] and rcounts[3] -- for process 2 and 3
700          * ...
701          * block r-1: rcounts[2*(r-1)] and rcounts[2*(r-1)+1]
702          * block r:   rcounts[r+r]
703          * block r+1: rcounts[r+r+1]
704          * ...
705          * block nprocs_pof2 - 1: rcounts[r+nprocs_pof2-1]
706          */
707         int nblocks = nprocs_pof2, send_index = 0, recv_index = 0;
708         for (int mask = 1; mask < nprocs_pof2; mask <<= 1) {
709             int vpeer = vrank ^ mask;
710             int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
711
712             nblocks /= 2;
713             if ((vrank & mask) == 0) {
714                 /* Send the upper half of reduction buffer, recv the lower half */
715                 send_index += nblocks;
716             } else {
717                 /* Send the upper half of reduction buffer, recv the lower half */
718                 recv_index += nblocks;
719             }
720
721             /* Send blocks: [send_index, send_index + nblocks - 1] */
722             int send_count = ompi_sum_counts(rcounts, displs, nprocs_rem,
723                                              send_index, send_index + nblocks - 1);
724             index = (send_index < nprocs_rem) ? 2 * send_index : nprocs_rem + send_index;
725             ptrdiff_t sdispl = displs[index];
726
727             /* Recv blocks: [recv_index, recv_index + nblocks - 1] */
728             int recv_count = ompi_sum_counts(rcounts, displs, nprocs_rem,
729                                              recv_index, recv_index + nblocks - 1);
730             index = (recv_index < nprocs_rem) ? 2 * recv_index : nprocs_rem + recv_index;
731             ptrdiff_t rdispl = displs[index];
732
733             Request::sendrecv(psend + (ptrdiff_t)sdispl * extent, send_count,
734                                           dtype, peer, COLL_TAG_REDUCE_SCATTER,
735                                           precv + (ptrdiff_t)rdispl * extent, recv_count,
736                                           dtype, peer, COLL_TAG_REDUCE_SCATTER,
737                                           comm, MPI_STATUS_IGNORE);
738
739             if (vrank < vpeer) {
740                 /* precv = psend <op> precv */
741                 op->apply(psend + (ptrdiff_t)rdispl * extent,
742                                precv + (ptrdiff_t)rdispl * extent, &recv_count, dtype);
743                 char *p = psend;
744                 psend = precv;
745                 precv = p;
746             } else {
747                 /* psend = precv <op> psend */
748                 op->apply(precv + (ptrdiff_t)rdispl * extent,
749                                psend + (ptrdiff_t)rdispl * extent, &recv_count, dtype);
750             }
751             send_index = recv_index;
752         }
753         /*
754          * psend points to the result block [send_index]
755          * Exchange results with remote process according to a mirror permutation.
756          */
757         int vpeer = ompi_mirror_perm(vrank, log2_size);
758         int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
759         index = (send_index < nprocs_rem) ? 2 * send_index : nprocs_rem + send_index;
760
761         if (vpeer < nprocs_rem) {
762             /*
763              * Process has two blocks: for excluded process and own.
764              * Send the first block to excluded process.
765              */
766             Request::send(psend + (ptrdiff_t)displs[index] * extent,
767                                     rcounts[index], dtype, peer - 1,
768                                     COLL_TAG_REDUCE_SCATTER,
769                                     comm);
770         }
771
772         /* If process has two blocks, then send the second block (own block) */
773         if (vpeer < nprocs_rem)
774             index++;
775         if (vpeer != vrank) {
776             Request::sendrecv(psend + (ptrdiff_t)displs[index] * extent,
777                                           rcounts[index], dtype, peer,
778                                           COLL_TAG_REDUCE_SCATTER,
779                                           rbuf, rcounts[rank], dtype, peer,
780                                           COLL_TAG_REDUCE_SCATTER,
781                                           comm, MPI_STATUS_IGNORE);
782         } else {
783             err = Datatype::copy(psend + (ptrdiff_t)displs[rank] * extent, rcounts[rank], dtype,
784                                  rbuf, rcounts[rank], dtype);
785             if (MPI_SUCCESS != err) { goto cleanup_and_return; }
786         }
787
788     } else {
789         /* Excluded process: receive result */
790         int vpeer = ompi_mirror_perm((rank + 1) / 2, log2_size);
791         int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
792         Request::recv(rbuf, rcounts[rank], dtype, peer,
793                                 COLL_TAG_REDUCE_SCATTER, comm,
794                                 MPI_STATUS_IGNORE);
795     }
796
797 cleanup_and_return:
798     if (displs)
799         free(displs);
800     if (tmpbuf[0])
801         free(tmpbuf[0]);
802     if (tmpbuf[1])
803         free(tmpbuf[1]);
804     return err;
805 }
806 }
807 }