Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
missing include
[simgrid.git] / src / smpi / colls / reduce_scatter-ompi.c
1 /* Copyright (c) 2013-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 /*
8  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
9  *                         University Research and Technology
10  *                         Corporation.  All rights reserved.
11  * Copyright (c) 2004-2012 The University of Tennessee and The University
12  *                         of Tennessee Research Foundation.  All rights
13  *                         reserved.
14  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
15  *                         University of Stuttgart.  All rights reserved.
16  * Copyright (c) 2004-2005 The Regents of the University of California.
17  *                         All rights reserved.
18  * Copyright (c) 2008      Sun Microsystems, Inc.  All rights reserved.
19  * Copyright (c) 2009      University of Houston. All rights reserved.
20  *
21  * Additional copyrights may follow
22  */
23
24 #include "colls_private.h"
25 #include "coll_tuned_topo.h"
26 #include "smpi/smpi.h"
27 #include "xbt/replay.h"
28
29 /*
30  * Recursive-halving function is (*mostly*) copied from the BASIC coll module.
31  * I have removed the part which handles "large" message sizes 
32  * (non-overlapping version of reduce_Scatter).
33  */
34
35 /* copied function (with appropriate renaming) starts here */
36
37 /*
38  *  reduce_scatter_ompi_basic_recursivehalving
39  *
40  *  Function:   - reduce scatter implementation using recursive-halving 
41  *                algorithm
42  *  Accepts:    - same as MPI_Reduce_scatter()
43  *  Returns:    - MPI_SUCCESS or error code
44  *  Limitation: - Works only for commutative operations.
45  */
46 int
47 smpi_coll_tuned_reduce_scatter_ompi_basic_recursivehalving(void *sbuf, 
48                                                             void *rbuf, 
49                                                             int *rcounts,
50                                                             MPI_Datatype dtype,
51                                                             MPI_Op op,
52                                                             MPI_Comm comm
53                                                             )
54 {
55     int i, rank, size, count, err = MPI_SUCCESS;
56     int tmp_size=1, remain = 0, tmp_rank, *disps = NULL;
57     ptrdiff_t true_lb, true_extent, lb, extent, buf_size;
58     char *recv_buf = NULL, *recv_buf_free = NULL;
59     char *result_buf = NULL, *result_buf_free = NULL;
60    
61     /* Initialize */
62     rank = smpi_comm_rank(comm);
63     size = smpi_comm_size(comm);
64    
65     XBT_DEBUG("coll:tuned:reduce_scatter_ompi_basic_recursivehalving, rank %d", rank);
66     if(!smpi_op_is_commute(op))
67       THROWF(arg_error,0, " reduce_scatter ompi_basic_recursivehalving can only be used for commutative operations! ");
68
69     /* Find displacements and the like */
70     disps = (int*) xbt_malloc(sizeof(int) * size);
71     if (NULL == disps) return MPI_ERR_OTHER;
72
73     disps[0] = 0;
74     for (i = 0; i < (size - 1); ++i) {
75         disps[i + 1] = disps[i] + rcounts[i];
76     }
77     count = disps[size - 1] + rcounts[size - 1];
78
79     /* short cut the trivial case */
80     if (0 == count) {
81         xbt_free(disps);
82         return MPI_SUCCESS;
83     }
84
85     /* get datatype information */
86     smpi_datatype_extent(dtype, &lb, &extent);
87     smpi_datatype_extent(dtype, &true_lb, &true_extent);
88     buf_size = true_extent + (ptrdiff_t)(count - 1) * extent;
89
90     /* Handle MPI_IN_PLACE */
91     if (MPI_IN_PLACE == sbuf) {
92         sbuf = rbuf;
93     }
94
95     /* Allocate temporary receive buffer. */
96     if(_xbt_replay_is_active()){
97       recv_buf_free = (char*) SMPI_SHARED_MALLOC(buf_size);
98     }else{
99       recv_buf_free = (char*) xbt_malloc(buf_size);
100     }
101     recv_buf = recv_buf_free - lb;
102     if (NULL == recv_buf_free) {
103         err = MPI_ERR_OTHER;
104         goto cleanup;
105     }
106    
107     /* allocate temporary buffer for results */
108     if(_xbt_replay_is_active()){
109       result_buf_free = (char*) SMPI_SHARED_MALLOC(buf_size);
110     }else{
111       result_buf_free = (char*) xbt_malloc(buf_size);
112     }
113     result_buf = result_buf_free - lb;
114    
115     /* copy local buffer into the temporary results */
116     err =smpi_datatype_copy(sbuf, count, dtype, result_buf, count, dtype);
117     if (MPI_SUCCESS != err) goto cleanup;
118    
119     /* figure out power of two mapping: grow until larger than
120        comm size, then go back one, to get the largest power of
121        two less than comm size */
122     while (tmp_size <= size) tmp_size <<= 1;
123     tmp_size >>= 1;
124     remain = size - tmp_size;
125    
126     /* If comm size is not a power of two, have the first "remain"
127        procs with an even rank send to rank + 1, leaving a power of
128        two procs to do the rest of the algorithm */
129     if (rank < 2 * remain) {
130         if ((rank & 1) == 0) {
131             smpi_mpi_send(result_buf, count, dtype, rank + 1, 
132                                     COLL_TAG_REDUCE_SCATTER,
133                                     comm);
134             /* we don't participate from here on out */
135             tmp_rank = -1;
136         } else {
137             smpi_mpi_recv(recv_buf, count, dtype, rank - 1,
138                                     COLL_TAG_REDUCE_SCATTER,
139                                     comm, MPI_STATUS_IGNORE);
140          
141             /* integrate their results into our temp results */
142             smpi_op_apply(op, recv_buf, result_buf, &count, &dtype);
143          
144             /* adjust rank to be the bottom "remain" ranks */
145             tmp_rank = rank / 2;
146         }
147     } else {
148         /* just need to adjust rank to show that the bottom "even
149            remain" ranks dropped out */
150         tmp_rank = rank - remain;
151     }
152    
153     /* For ranks not kicked out by the above code, perform the
154        recursive halving */
155     if (tmp_rank >= 0) {
156         int *tmp_disps = NULL, *tmp_rcounts = NULL;
157         int mask, send_index, recv_index, last_index;
158       
159         /* recalculate disps and rcounts to account for the
160            special "remainder" processes that are no longer doing
161            anything */
162         tmp_rcounts = (int*) xbt_malloc(tmp_size * sizeof(int));
163         if (NULL == tmp_rcounts) {
164             err = MPI_ERR_OTHER;
165             goto cleanup;
166         }
167         tmp_disps = (int*) xbt_malloc(tmp_size * sizeof(int));
168         if (NULL == tmp_disps) {
169             xbt_free(tmp_rcounts);
170             err = MPI_ERR_OTHER;
171             goto cleanup;
172         }
173
174         for (i = 0 ; i < tmp_size ; ++i) {
175             if (i < remain) {
176                 /* need to include old neighbor as well */
177                 tmp_rcounts[i] = rcounts[i * 2 + 1] + rcounts[i * 2];
178             } else {
179                 tmp_rcounts[i] = rcounts[i + remain];
180             }
181         }
182
183         tmp_disps[0] = 0;
184         for (i = 0; i < tmp_size - 1; ++i) {
185             tmp_disps[i + 1] = tmp_disps[i] + tmp_rcounts[i];
186         }
187
188         /* do the recursive halving communication.  Don't use the
189            dimension information on the communicator because I
190            think the information is invalidated by our "shrinking"
191            of the communicator */
192         mask = tmp_size >> 1;
193         send_index = recv_index = 0;
194         last_index = tmp_size;
195         while (mask > 0) {
196             int tmp_peer, peer, send_count, recv_count;
197             MPI_Request request;
198
199             tmp_peer = tmp_rank ^ mask;
200             peer = (tmp_peer < remain) ? tmp_peer * 2 + 1 : tmp_peer + remain;
201
202             /* figure out if we're sending, receiving, or both */
203             send_count = recv_count = 0;
204             if (tmp_rank < tmp_peer) {
205                 send_index = recv_index + mask;
206                 for (i = send_index ; i < last_index ; ++i) {
207                     send_count += tmp_rcounts[i];
208                 }
209                 for (i = recv_index ; i < send_index ; ++i) {
210                     recv_count += tmp_rcounts[i];
211                 }
212             } else {
213                 recv_index = send_index + mask;
214                 for (i = send_index ; i < recv_index ; ++i) {
215                     send_count += tmp_rcounts[i];
216                 }
217                 for (i = recv_index ; i < last_index ; ++i) {
218                     recv_count += tmp_rcounts[i];
219                 }
220             }
221
222             /* actual data transfer.  Send from result_buf,
223                receive into recv_buf */
224             if (send_count > 0 && recv_count != 0) {
225                 request=smpi_mpi_irecv(recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
226                                          recv_count, dtype, peer,
227                                          COLL_TAG_REDUCE_SCATTER,
228                                          comm);
229                 if (MPI_SUCCESS != err) {
230                     xbt_free(tmp_rcounts);
231                     xbt_free(tmp_disps);
232                     goto cleanup;
233                 }                                             
234             }
235             if (recv_count > 0 && send_count != 0) {
236                 smpi_mpi_send(result_buf + (ptrdiff_t)tmp_disps[send_index] * extent,
237                                         send_count, dtype, peer, 
238                                         COLL_TAG_REDUCE_SCATTER,
239                                         comm);
240                 if (MPI_SUCCESS != err) {
241                     xbt_free(tmp_rcounts);
242                     xbt_free(tmp_disps);
243                     goto cleanup;
244                 }                                             
245             }
246             if (send_count > 0 && recv_count != 0) {
247                 smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
248             }
249
250             /* if we received something on this step, push it into
251                the results buffer */
252             if (recv_count > 0) {
253                 smpi_op_apply(op, 
254                                recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent, 
255                                result_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
256                                &recv_count, &dtype);
257             }
258
259             /* update for next iteration */
260             send_index = recv_index;
261             last_index = recv_index + mask;
262             mask >>= 1;
263         }
264
265         /* copy local results from results buffer into real receive buffer */
266         if (0 != rcounts[rank]) {
267             err = smpi_datatype_copy(result_buf + disps[rank] * extent,
268                                        rcounts[rank], dtype, 
269                                        rbuf, rcounts[rank], dtype);
270             if (MPI_SUCCESS != err) {
271                 xbt_free(tmp_rcounts);
272                 xbt_free(tmp_disps);
273                 goto cleanup;
274             }                                             
275         }
276
277         xbt_free(tmp_rcounts);
278         xbt_free(tmp_disps);
279     }
280
281     /* Now fix up the non-power of two case, by having the odd
282        procs send the even procs the proper results */
283     if (rank < (2 * remain)) {
284         if ((rank & 1) == 0) {
285             if (rcounts[rank]) {
286                 smpi_mpi_recv(rbuf, rcounts[rank], dtype, rank + 1,
287                                         COLL_TAG_REDUCE_SCATTER,
288                                         comm, MPI_STATUS_IGNORE);
289             }
290         } else {
291             if (rcounts[rank - 1]) {
292                 smpi_mpi_send(result_buf + disps[rank - 1] * extent,
293                                         rcounts[rank - 1], dtype, rank - 1,
294                                         COLL_TAG_REDUCE_SCATTER,
295                                         comm);
296             }
297         }            
298     }
299
300  cleanup:
301     if (NULL != disps) xbt_free(disps);
302     
303     if (!_xbt_replay_is_active()){
304       if (NULL != recv_buf_free) xbt_free(recv_buf_free);
305       if (NULL != result_buf_free) xbt_free(result_buf_free);
306     }else{
307       if (NULL != recv_buf_free) SMPI_SHARED_FREE(recv_buf_free);
308       if (NULL != result_buf_free) SMPI_SHARED_FREE(result_buf_free);
309     }
310     return err;
311 }
312
313 /* copied function (with appropriate renaming) ends here */
314
315
316 /*
317  *   smpi_coll_tuned_reduce_scatter_ompi_ring
318  *
319  *   Function:       Ring algorithm for reduce_scatter operation
320  *   Accepts:        Same as MPI_Reduce_scatter()
321  *   Returns:        MPI_SUCCESS or error code
322  *
323  *   Description:    Implements ring algorithm for reduce_scatter: 
324  *                   the block sizes defined in rcounts are exchanged and 
325  8                    updated until they reach proper destination.
326  *                   Algorithm requires 2 * max(rcounts) extra buffering
327  *
328  *   Limitations:    The algorithm DOES NOT preserve order of operations so it 
329  *                   can be used only for commutative operations.
330  *         Example on 5 nodes:
331  *         Initial state
332  *   #      0              1             2              3             4
333  *        [00]           [10]   ->     [20]           [30]           [40]
334  *        [01]           [11]          [21]  ->       [31]           [41]
335  *        [02]           [12]          [22]           [32]  ->       [42]
336  *    ->  [03]           [13]          [23]           [33]           [43] --> ..
337  *        [04]  ->       [14]          [24]           [34]           [44]
338  *
339  *        COMPUTATION PHASE
340  *         Step 0: rank r sends block (r-1) to rank (r+1) and 
341  *                 receives block (r+1) from rank (r-1) [with wraparound].
342  *   #      0              1             2              3             4
343  *        [00]           [10]        [10+20]   ->     [30]           [40]
344  *        [01]           [11]          [21]          [21+31]  ->     [41]
345  *    ->  [02]           [12]          [22]           [32]         [32+42] -->..
346  *      [43+03] ->       [13]          [23]           [33]           [43]
347  *        [04]         [04+14]  ->     [24]           [34]           [44]
348  *         
349  *         Step 1:
350  *   #      0              1             2              3             4
351  *        [00]           [10]        [10+20]       [10+20+30] ->     [40]
352  *    ->  [01]           [11]          [21]          [21+31]      [21+31+41] ->
353  *     [32+42+02] ->     [12]          [22]           [32]         [32+42] 
354  *        [03]        [43+03+13] ->    [23]           [33]           [43]
355  *        [04]         [04+14]      [04+14+24]  ->    [34]           [44]
356  *
357  *         Step 2:
358  *   #      0              1             2              3             4
359  *     -> [00]           [10]        [10+20]       [10+20+30]   [10+20+30+40] ->
360  *   [21+31+41+01]->     [11]          [21]          [21+31]      [21+31+41]
361  *     [32+42+02]   [32+42+02+12]->    [22]           [32]         [32+42] 
362  *        [03]        [43+03+13]   [43+03+13+23]->    [33]           [43]
363  *        [04]         [04+14]      [04+14+24]    [04+14+24+34] ->   [44]
364  *
365  *         Step 3:
366  *   #      0             1              2              3             4
367  * [10+20+30+40+00]     [10]         [10+20]       [10+20+30]   [10+20+30+40]
368  *  [21+31+41+01] [21+31+41+01+11]     [21]          [21+31]      [21+31+41]
369  *    [32+42+02]   [32+42+02+12] [32+42+02+12+22]     [32]         [32+42] 
370  *       [03]        [43+03+13]    [43+03+13+23] [43+03+13+23+33]    [43]
371  *       [04]         [04+14]       [04+14+24]    [04+14+24+34] [04+14+24+34+44]
372  *    DONE :)
373  *
374  */
375 int 
376 smpi_coll_tuned_reduce_scatter_ompi_ring(void *sbuf, void *rbuf, int *rcounts,
377                                           MPI_Datatype dtype,
378                                           MPI_Op op,
379                                           MPI_Comm comm
380                                           )
381 {
382     int ret, line, rank, size, i, k, recv_from, send_to, total_count, max_block_count;
383     int inbi, *displs = NULL;
384     char *tmpsend = NULL, *tmprecv = NULL, *accumbuf = NULL, *accumbuf_free = NULL;
385     char *inbuf_free[2] = {NULL, NULL}, *inbuf[2] = {NULL, NULL};
386     ptrdiff_t true_lb, true_extent, lb, extent, max_real_segsize;
387     MPI_Request reqs[2] = {NULL, NULL};
388
389     size = smpi_comm_size(comm);
390     rank = smpi_comm_rank(comm);
391
392     XBT_DEBUG(  "coll:tuned:reduce_scatter_ompi_ring rank %d, size %d", 
393                  rank, size);
394
395     /* Determine the maximum number of elements per node, 
396        corresponding block size, and displacements array.
397     */
398     displs = (int*) xbt_malloc(size * sizeof(int));
399     if (NULL == displs) { ret = -1; line = __LINE__; goto error_hndl; }
400     displs[0] = 0;
401     total_count = rcounts[0];
402     max_block_count = rcounts[0];
403     for (i = 1; i < size; i++) { 
404         displs[i] = total_count;
405         total_count += rcounts[i];
406         if (max_block_count < rcounts[i]) max_block_count = rcounts[i];
407     }
408       
409     /* Special case for size == 1 */
410     if (1 == size) {
411         if (MPI_IN_PLACE != sbuf) {
412             ret = smpi_datatype_copy((char*)sbuf, total_count, dtype, (char*)rbuf, total_count, dtype);
413             if (ret < 0) { line = __LINE__; goto error_hndl; }
414         }
415         xbt_free(displs);
416         return MPI_SUCCESS;
417     }
418
419     /* Allocate and initialize temporary buffers, we need:
420        - a temporary buffer to perform reduction (size total_count) since
421        rbuf can be of rcounts[rank] size.
422        - up to two temporary buffers used for communication/computation overlap.
423     */
424     smpi_datatype_extent(dtype, &lb, &extent);
425     smpi_datatype_extent(dtype, &true_lb, &true_extent);
426
427     max_real_segsize = true_extent + (ptrdiff_t)(max_block_count - 1) * extent;
428
429     accumbuf_free = (char*)xbt_malloc(true_extent + (ptrdiff_t)(total_count - 1) * extent);
430     if (NULL == accumbuf_free) { ret = -1; line = __LINE__; goto error_hndl; }
431     accumbuf = accumbuf_free - lb;
432
433     inbuf_free[0] = (char*)xbt_malloc(max_real_segsize);
434     if (NULL == inbuf_free[0]) { ret = -1; line = __LINE__; goto error_hndl; }
435     inbuf[0] = inbuf_free[0] - lb;
436     if (size > 2) {
437         inbuf_free[1] = (char*)xbt_malloc(max_real_segsize);
438         if (NULL == inbuf_free[1]) { ret = -1; line = __LINE__; goto error_hndl; }
439         inbuf[1] = inbuf_free[1] - lb;
440     }
441
442     /* Handle MPI_IN_PLACE for size > 1 */
443     if (MPI_IN_PLACE == sbuf) {
444         sbuf = rbuf;
445     }
446
447     ret = smpi_datatype_copy((char*)sbuf, total_count, dtype, accumbuf, total_count, dtype);
448     if (ret < 0) { line = __LINE__; goto error_hndl; }
449
450     /* Computation loop */
451
452     /* 
453        For each of the remote nodes:
454        - post irecv for block (r-2) from (r-1) with wrap around
455        - send block (r-1) to (r+1)
456        - in loop for every step k = 2 .. n
457        - post irecv for block (r - 1 + n - k) % n
458        - wait on block (r + n - k) % n to arrive
459        - compute on block (r + n - k ) % n
460        - send block (r + n - k) % n
461        - wait on block (r)
462        - compute on block (r)
463        - copy block (r) to rbuf
464        Note that we must be careful when computing the begining of buffers and
465        for send operations and computation we must compute the exact block size.
466     */
467     send_to = (rank + 1) % size;
468     recv_from = (rank + size - 1) % size;
469
470     inbi = 0;
471     /* Initialize first receive from the neighbor on the left */
472     reqs[inbi]=smpi_mpi_irecv(inbuf[inbi], max_block_count, dtype, recv_from,
473                              COLL_TAG_REDUCE_SCATTER, comm
474                              );
475     tmpsend = accumbuf + (ptrdiff_t)displs[recv_from] * extent;
476     smpi_mpi_send(tmpsend, rcounts[recv_from], dtype, send_to,
477                             COLL_TAG_REDUCE_SCATTER,
478                              comm);
479
480     for (k = 2; k < size; k++) {
481         const int prevblock = (rank + size - k) % size;
482       
483         inbi = inbi ^ 0x1;
484
485         /* Post irecv for the current block */
486         reqs[inbi]=smpi_mpi_irecv(inbuf[inbi], max_block_count, dtype, recv_from,
487                                  COLL_TAG_REDUCE_SCATTER, comm
488                                  );
489       
490         /* Wait on previous block to arrive */
491         smpi_mpi_wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);
492       
493         /* Apply operation on previous block: result goes to rbuf
494            rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock]
495         */
496         tmprecv = accumbuf + (ptrdiff_t)displs[prevblock] * extent;
497         smpi_op_apply(op, inbuf[inbi ^ 0x1], tmprecv, &(rcounts[prevblock]), &dtype);
498       
499         /* send previous block to send_to */
500         smpi_mpi_send(tmprecv, rcounts[prevblock], dtype, send_to,
501                                 COLL_TAG_REDUCE_SCATTER,
502                                  comm);
503     }
504
505     /* Wait on the last block to arrive */
506     smpi_mpi_wait(&reqs[inbi], MPI_STATUS_IGNORE);
507
508     /* Apply operation on the last block (my block)
509        rbuf[rank] = inbuf[inbi] (op) rbuf[rank] */
510     tmprecv = accumbuf + (ptrdiff_t)displs[rank] * extent;
511     smpi_op_apply(op, inbuf[inbi], tmprecv, &(rcounts[rank]), &dtype);
512    
513     /* Copy result from tmprecv to rbuf */
514     ret = smpi_datatype_copy(tmprecv, rcounts[rank], dtype, (char*)rbuf, rcounts[rank], dtype);
515     if (ret < 0) { line = __LINE__; goto error_hndl; }
516
517     if (NULL != displs) xbt_free(displs);
518     if (NULL != accumbuf_free) xbt_free(accumbuf_free);
519     if (NULL != inbuf_free[0]) xbt_free(inbuf_free[0]);
520     if (NULL != inbuf_free[1]) xbt_free(inbuf_free[1]);
521
522     return MPI_SUCCESS;
523
524  error_hndl:
525     XBT_DEBUG( "%s:%4d\tRank %d Error occurred %d\n",
526                  __FILE__, line, rank, ret);
527     if (NULL != displs) xbt_free(displs);
528     if (NULL != accumbuf_free) xbt_free(accumbuf_free);
529     if (NULL != inbuf_free[0]) xbt_free(inbuf_free[0]);
530     if (NULL != inbuf_free[1]) xbt_free(inbuf_free[1]);
531     return ret;
532 }
533