1 /* Copyright (c) 2013-2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "colls_private.h"
8 /* IMPLEMENTED BY PITCH PATARASUK
9 Non-topology-specific (however, the number of cores per node needs to be changed)
10 all-reduce operation designed for smp clusters
11 It uses 2-layer communication: binomial for intra-communication
12 and rdb for inter-communication*/
14 /* change the number of cores per SMP node;
15 we assume that the number of cores per process will be the same for all implementations */
21 Use -DMPICH2 if this code does not compile.
22 MPICH1 code also works on MPICH2 on our cluster, and the performance is similar.
23 This code assumes a commutative and associative reduce operator (MPI_SUM, MPI_MAX, etc.).
26 //#include <star-reduction.c>
29 This function performs the all-reduce operation as follows.
30 1) binomial_tree reduce inside each SMP node
31 2) Recursive doubling intra-communication between root of each SMP node
32 3) binomial_tree bcast inside each SMP node
34 int smpi_coll_tuned_allreduce_smp_rdb(void *send_buf, void *recv_buf, int count,
35 MPI_Datatype dtype, MPI_Op op,
40 int tag = COLL_TAG_ALLREDUCE;
43 int num_core = simcall_host_get_core(SIMIX_host_self());
44 // do we use the default one or the number of cores in the platform ?
45 // if the number of cores is one, the platform may be simulated with 1 node = 1 core
46 if (num_core == 1) num_core = NUM_CORE;
48 #ifdef MPICH2_REDUCTION
49 MPI_User_function * uop = MPIR_Op_table[op % 16 - 1];
51 MPI_User_function *uop;
52 struct MPIR_OP *op_ptr;
53 op_ptr = MPIR_ToPointer(op);
57 comm_size = smpi_comm_size(comm);
58 rank = smpi_comm_rank(comm);
60 extent = smpi_datatype_get_extent(dtype);
61 tmp_buf = (void *) xbt_malloc(count * extent);
63 /* compute intra and inter ranking */
64 int intra_rank, inter_rank;
65 intra_rank = rank % num_core;
66 inter_rank = rank / num_core;
68 /* size of processes participate in intra communications =>
69 should be equal to number of machines */
70 int inter_comm_size = (comm_size + num_core - 1) / num_core;
72 /* copy input buffer to output buffer */
73 smpi_mpi_sendrecv(send_buf, count, dtype, rank, tag,
74 recv_buf, count, dtype, rank, tag, comm, &status);
76 /* start binomial reduce intra communication inside each SMP node */
78 while (mask < num_core) {
79 if ((mask & intra_rank) == 0) {
80 src = (inter_rank * num_core) + (intra_rank | mask);
81 if (src < comm_size) {
82 smpi_mpi_recv(tmp_buf, count, dtype, src, tag, comm, &status);
83 smpi_op_apply(op, tmp_buf, recv_buf, &count, &dtype);
86 dst = (inter_rank * num_core) + (intra_rank & (~mask));
87 smpi_mpi_send(recv_buf, count, dtype, dst, tag, comm);
91 } /* end binomial reduce intra-communication */
94 /* start rdb (recursive doubling) all-reduce inter-communication
95 between each SMP nodes : each node only have one process that can communicate
97 if (intra_rank == 0) {
99 /* find nearest power-of-two less than or equal to inter_comm_size */
100 int pof2, rem, newrank, newdst;
102 while (pof2 <= inter_comm_size)
105 rem = inter_comm_size - pof2;
107 /* In the non-power-of-two case, all even-numbered
108 processes of rank < 2*rem send their data to
109 (rank+1). These even-numbered processes no longer
110 participate in the algorithm until the very end.
112 if (inter_rank < 2 * rem) {
113 if (inter_rank % 2 == 0) {
114 dst = rank + num_core;
115 smpi_mpi_send(recv_buf, count, dtype, dst, tag, comm);
118 src = rank - num_core;
119 smpi_mpi_recv(tmp_buf, count, dtype, src, tag, comm, &status);
120 smpi_op_apply(op, tmp_buf, recv_buf, &count, &dtype);
121 newrank = inter_rank / 2;
124 newrank = inter_rank - rem;
127 /* example inter-communication RDB rank change algorithm
128 0,4,8,12..36 <= true rank (assume 4 core per SMP)
129 0123 4567 89 <= inter_rank
130 1 3 4567 89 (1,3 got data from 0,2 : 0,2 will be idle until the end)
132 0 1 2345 67 => newrank
137 while (mask < pof2) {
138 newdst = newrank ^ mask;
139 /* find real rank of dest */
140 dst = (newdst < rem) ? newdst * 2 + 1 : newdst + rem;
143 /* exchange data in rdb manner */
144 smpi_mpi_sendrecv(recv_buf, count, dtype, dst, tag, tmp_buf, count, dtype,
145 dst, tag, comm, &status);
146 smpi_op_apply(op, tmp_buf, recv_buf, &count, &dtype);
152 left-over processes (all even ranks: < 2 * rem) get the result
154 if (inter_rank < 2 * rem) {
155 if (inter_rank % 2) {
156 smpi_mpi_send(recv_buf, count, dtype, rank - num_core, tag, comm);
158 smpi_mpi_recv(recv_buf, count, dtype, rank + num_core, tag, comm, &status);
163 /* start binomial broadcast intra-communication inside each SMP nodes */
164 int num_core_in_current_smp = num_core;
165 if (inter_rank == (inter_comm_size - 1)) {
166 num_core_in_current_smp = comm_size - (inter_rank * num_core);
169 while (mask < num_core_in_current_smp) {
170 if (intra_rank & mask) {
171 src = (inter_rank * num_core) + (intra_rank - mask);
172 smpi_mpi_recv(recv_buf, count, dtype, src, tag, comm, &status);
180 dst = (inter_rank * num_core) + (intra_rank + mask);
181 if (dst < comm_size) {
182 smpi_mpi_send(recv_buf, count, dtype, dst, tag, comm);