#include "colls_private.h"
/* IMPLEMENTED BY PITCH PATARASUK
   Non-topology-specific all-reduce operation designed to be bandwidth optimal */
/* NOTE:
   Use -DMPICH2_REDUCTION if this code does not compile.
   MPICH1 code also works on MPICH2 on our cluster and the performance is similar.
   This code assumes a commutative and associative reduction operator (MPI_SUM, MPI_MAX, etc.).
*/
//#include <star-reduction.c>
int
smpi_coll_tuned_allreduce_NTS(void *sbuf, void *rbuf, int rcount,
                              MPI_Datatype dtype, MPI_Op op, MPI_Comm comm)
{
  int tag = COLL_TAG_ALLREDUCE;
  MPI_Status status;
  int rank, i, size, count;
  int send_offset, recv_offset;
  int remainder, remainder_flag, remainder_offset;
  rank = smpi_comm_rank(comm);
  size = smpi_comm_size(comm);
  /* make it compatible with all data types */
  MPI_Aint extent = smpi_datatype_get_extent(dtype);
  /* when the message is smaller than the number of processes (not supported):
     fall back to the default implementation */
  if (rcount < size) {
    return mpi_coll_allreduce_fun(sbuf, rbuf, rcount, dtype, op, comm);
  }
  /* when the message size is not divisible by the number of processes:
     call the native implementation for the remaining chunk at the end of the operation */
  if (rcount % size != 0) {
    remainder = rcount % size;
    remainder_flag = 1;
    remainder_offset = (rcount / size) * size * extent;
  } else {
    remainder = remainder_flag = remainder_offset = 0;
  }
  /* the size of each point-to-point communication equals the size of the whole message
     divided by the number of processes */
  count = rcount / size;
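  /* Worked example (illustrative numbers): with rcount = 10 and size = 4,
     count = 2 elements per block, remainder = 2, and remainder_offset =
     8 * extent, so elements 8..9 are handled by the fallback
     smpi_mpi_allreduce call at the end of this function. */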
  /* our ALL-REDUCE implementation
     1. copy (a part of) send_buf to recv_buf
     2. use a logical ring reduce-scatter
     3. use a logical ring all-gather */
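  /* Ring pattern used below (derived from the offset arithmetic): every step
     sends a block to (rank + 1) % size and receives one from
     (rank + size - 1) % size.  After the size - 1 reduce-scatter steps,
     rank r holds the fully reduced block r; the size - 1 all-gather steps
     then circulate the reduced blocks so every rank ends up with the
     complete result. */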
  // copy this rank's starting block from sbuf to rbuf (a self-sendrecv acts as a typed copy)
  send_offset = ((rank - 1 + size) % size) * count * extent;
  recv_offset = ((rank - 1 + size) % size) * count * extent;
  smpi_mpi_sendrecv((char *) sbuf + send_offset, count, dtype, rank, tag - 1,
                    (char *) rbuf + recv_offset, count, dtype, rank, tag - 1, comm,
                    &status);
  // logical ring reduce-scatter
  for (i = 0; i < (size - 1); i++) {
    send_offset = ((rank - 1 - i + size) % size) * count * extent;
    recv_offset = ((rank - 2 - i + size) % size) * count * extent;
    smpi_mpi_sendrecv((char *) rbuf + send_offset, count, dtype, ((rank + 1) % size),
                      tag + i, (char *) rbuf + recv_offset, count, dtype,
                      ((rank + size - 1) % size), tag + i, comm, &status);
    // reduce the matching block of sbuf into rbuf + recv_offset
    smpi_op_apply(op, (char *) sbuf + recv_offset, (char *) rbuf + recv_offset,
                  &count, &dtype);
  }
  // logical ring all-gather of the reduced blocks
  for (i = 0; i < (size - 1); i++) {
    send_offset = ((rank - i + size) % size) * count * extent;
    recv_offset = ((rank - 1 - i + size) % size) * count * extent;
    smpi_mpi_sendrecv((char *) rbuf + send_offset, count, dtype, ((rank + 1) % size),
                      tag + i, (char *) rbuf + recv_offset, count, dtype,
                      ((rank + size - 1) % size), tag + i, comm, &status);
  }
  /* when the message size is not divisible by the number of processes:
     call the native implementation for the remaining chunk at the end of the operation */
  if (remainder_flag) {
    XBT_WARN("MPI_allreduce_NTS: using the default MPI_allreduce for the remainder.");
    smpi_mpi_allreduce((char *) sbuf + remainder_offset,
                       (char *) rbuf + remainder_offset, remainder, dtype, op,
                       comm);
  }

  return MPI_SUCCESS;
}
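/* Usage sketch (illustrative; the buffer names are placeholders): inside an
   initialized SMPI/MPI program where send_buf and recv_buf each hold n ints
   and n is at least the communicator size:

     smpi_coll_tuned_allreduce_NTS(send_buf, recv_buf, n, MPI_INT, MPI_SUM,
                                   MPI_COMM_WORLD);
*/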