Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
9ecb998b2b19632d079bc3c09ba7a6041a1a48d0
[simgrid.git] / src / smpi / colls / smpi_mvapich2_selector.cpp
1 /* selector for collective algorithms based on mvapich decision logic */
2
3 /* Copyright (c) 2009-2019. The SimGrid Team.
4  * All rights reserved.                                                     */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 #include "colls_private.hpp"
10
11 #include "smpi_mvapich2_selector_stampede.hpp"
12
13 namespace simgrid{
14 namespace smpi{
15
16
17 int Coll_alltoall_mvapich2::alltoall( void *sendbuf, int sendcount,
18     MPI_Datatype sendtype,
19     void* recvbuf, int recvcount,
20     MPI_Datatype recvtype,
21     MPI_Comm comm)
22 {
23
24   if(mv2_alltoall_table_ppn_conf==NULL)
25     init_mv2_alltoall_tables_stampede();
26
27   int sendtype_size, recvtype_size, comm_size;
28   char * tmp_buf = NULL;
29   int mpi_errno=MPI_SUCCESS;
30   int range = 0;
31   int range_threshold = 0;
32   int conf_index = 0;
33   comm_size =  comm->size();
34
35   sendtype_size=sendtype->size();
36   recvtype_size=recvtype->size();
37   long nbytes = sendtype_size * sendcount;
38
39   /* check if safe to use partial subscription mode */
40
41   /* Search for the corresponding system size inside the tuning table */
42   while ((range < (mv2_size_alltoall_tuning_table[conf_index] - 1)) &&
43       (comm_size > mv2_alltoall_thresholds_table[conf_index][range].numproc)) {
44       range++;
45   }
46   /* Search for corresponding inter-leader function */
47   while ((range_threshold < (mv2_alltoall_thresholds_table[conf_index][range].size_table - 1))
48       && (nbytes >
49   mv2_alltoall_thresholds_table[conf_index][range].algo_table[range_threshold].max)
50   && (mv2_alltoall_thresholds_table[conf_index][range].algo_table[range_threshold].max != -1)) {
51       range_threshold++;
52   }
53   MV2_Alltoall_function = mv2_alltoall_thresholds_table[conf_index][range].algo_table[range_threshold]
54                                                                                       .MV2_pt_Alltoall_function;
55
56   if(sendbuf != MPI_IN_PLACE) {
57       mpi_errno = MV2_Alltoall_function(sendbuf, sendcount, sendtype,
58           recvbuf, recvcount, recvtype,
59           comm);
60   } else {
61       range_threshold = 0;
62       if(nbytes <
63           mv2_alltoall_thresholds_table[conf_index][range].in_place_algo_table[range_threshold].min
64           ||nbytes > mv2_alltoall_thresholds_table[conf_index][range].in_place_algo_table[range_threshold].max
65       ) {
66           tmp_buf = (char *)smpi_get_tmp_sendbuffer( comm_size * recvcount * recvtype_size );
67           Datatype::copy((char *)recvbuf,
68               comm_size*recvcount, recvtype,
69               (char *)tmp_buf,
70               comm_size*recvcount, recvtype);
71
72           mpi_errno = MV2_Alltoall_function(tmp_buf, recvcount, recvtype,
73               recvbuf, recvcount, recvtype,
74               comm );
75           smpi_free_tmp_buffer(tmp_buf);
76       } else {
77           mpi_errno = MPIR_Alltoall_inplace_MV2(sendbuf, sendcount, sendtype,
78               recvbuf, recvcount, recvtype,
79               comm );
80       }
81   }
82
83
84   return (mpi_errno);
85 }
86
87 int Coll_allgather_mvapich2::allgather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
88     void *recvbuf, int recvcount, MPI_Datatype recvtype,
89     MPI_Comm comm)
90 {
91
92   int mpi_errno = MPI_SUCCESS;
93   long nbytes = 0, comm_size, recvtype_size;
94   int range = 0;
95   int partial_sub_ok = 0;
96   int conf_index = 0;
97   int range_threshold = 0;
98   int is_two_level = 0;
99   MPI_Comm shmem_comm;
100   //MPI_Comm *shmem_commptr=NULL;
101   /* Get the size of the communicator */
102   comm_size = comm->size();
103   recvtype_size=recvtype->size();
104   nbytes = recvtype_size * recvcount;
105
106   if(mv2_allgather_table_ppn_conf==NULL)
107     init_mv2_allgather_tables_stampede();
108
109   if(comm->get_leaders_comm()==MPI_COMM_NULL){
110     comm->init_smp();
111   }
112
113   if (comm->is_uniform()){
114     shmem_comm = comm->get_intra_comm();
115     int local_size = shmem_comm->size();
116     int i          = 0;
117     if (mv2_allgather_table_ppn_conf[0] == -1) {
118       // Indicating user defined tuning
119       conf_index = 0;
120       goto conf_check_end;
121     }
122     do {
123       if (local_size == mv2_allgather_table_ppn_conf[i]) {
124         conf_index = i;
125         partial_sub_ok = 1;
126         break;
127       }
128       i++;
129     } while(i < mv2_allgather_num_ppn_conf);
130   }
131   conf_check_end:
132   if (partial_sub_ok != 1) {
133     conf_index = 0;
134   }
135
136   /* Search for the corresponding system size inside the tuning table */
137   while ((range < (mv2_size_allgather_tuning_table[conf_index] - 1)) &&
138       (comm_size >
139   mv2_allgather_thresholds_table[conf_index][range].numproc)) {
140       range++;
141   }
142   /* Search for corresponding inter-leader function */
143   while ((range_threshold <
144       (mv2_allgather_thresholds_table[conf_index][range].size_inter_table - 1))
145       && (nbytes > mv2_allgather_thresholds_table[conf_index][range].inter_leader[range_threshold].max)
146       && (mv2_allgather_thresholds_table[conf_index][range].inter_leader[range_threshold].max !=
147           -1)) {
148       range_threshold++;
149   }
150
151   /* Set inter-leader pt */
152   MV2_Allgatherction =
153       mv2_allgather_thresholds_table[conf_index][range].inter_leader[range_threshold].
154       MV2_pt_Allgatherction;
155
156   is_two_level =  mv2_allgather_thresholds_table[conf_index][range].two_level[range_threshold];
157
158   /* intracommunicator */
159   if(is_two_level ==1){
160     if(partial_sub_ok ==1){
161       if (comm->is_blocked()){
162       mpi_errno = MPIR_2lvl_Allgather_MV2(sendbuf, sendcount, sendtype,
163                             recvbuf, recvcount, recvtype,
164                             comm);
165       }else{
166       mpi_errno = Coll_allgather_mpich::allgather(sendbuf, sendcount, sendtype,
167                             recvbuf, recvcount, recvtype,
168                             comm);
169       }
170     } else {
171       mpi_errno = MPIR_Allgather_RD_MV2(sendbuf, sendcount, sendtype,
172           recvbuf, recvcount, recvtype,
173           comm);
174     }
175   } else if(MV2_Allgatherction == &MPIR_Allgather_Bruck_MV2
176       || MV2_Allgatherction == &MPIR_Allgather_RD_MV2
177       || MV2_Allgatherction == &MPIR_Allgather_Ring_MV2) {
178       mpi_errno = MV2_Allgatherction(sendbuf, sendcount, sendtype,
179           recvbuf, recvcount, recvtype,
180           comm);
181   }else{
182       return MPI_ERR_OTHER;
183   }
184
185   return mpi_errno;
186 }
187
188 int Coll_gather_mvapich2::gather(void *sendbuf,
189     int sendcnt,
190     MPI_Datatype sendtype,
191     void *recvbuf,
192     int recvcnt,
193     MPI_Datatype recvtype,
194     int root, MPI_Comm  comm)
195 {
196   if(mv2_gather_thresholds_table==NULL)
197     init_mv2_gather_tables_stampede();
198
199   int mpi_errno = MPI_SUCCESS;
200   int range = 0;
201   int range_threshold = 0;
202   int range_intra_threshold = 0;
203   long nbytes = 0;
204   int comm_size = 0;
205   int recvtype_size, sendtype_size;
206   int rank = -1;
207   comm_size = comm->size();
208   rank = comm->rank();
209
210   if (rank == root) {
211       recvtype_size=recvtype->size();
212       nbytes = recvcnt * recvtype_size;
213   } else {
214       sendtype_size=sendtype->size();
215       nbytes = sendcnt * sendtype_size;
216   }
217
218   /* Search for the corresponding system size inside the tuning table */
219   while ((range < (mv2_size_gather_tuning_table - 1)) &&
220       (comm_size > mv2_gather_thresholds_table[range].numproc)) {
221       range++;
222   }
223   /* Search for corresponding inter-leader function */
224   while ((range_threshold < (mv2_gather_thresholds_table[range].size_inter_table - 1))
225       && (nbytes >
226   mv2_gather_thresholds_table[range].inter_leader[range_threshold].max)
227   && (mv2_gather_thresholds_table[range].inter_leader[range_threshold].max !=
228       -1)) {
229       range_threshold++;
230   }
231
232   /* Search for corresponding intra node function */
233   while ((range_intra_threshold < (mv2_gather_thresholds_table[range].size_intra_table - 1))
234       && (nbytes >
235   mv2_gather_thresholds_table[range].intra_node[range_intra_threshold].max)
236   && (mv2_gather_thresholds_table[range].intra_node[range_intra_threshold].max !=
237       -1)) {
238       range_intra_threshold++;
239   }
240
241     if (comm->is_blocked() ) {
242         // Set intra-node function pt for gather_two_level
243         MV2_Gather_intra_node_function =
244                               mv2_gather_thresholds_table[range].intra_node[range_intra_threshold].
245                               MV2_pt_Gather_function;
246         //Set inter-leader pt
247         MV2_Gather_inter_leader_function =
248                               mv2_gather_thresholds_table[range].inter_leader[range_threshold].
249                               MV2_pt_Gather_function;
250         // We call Gather function
251         mpi_errno =
252             MV2_Gather_inter_leader_function(sendbuf, sendcnt, sendtype, recvbuf, recvcnt,
253                                              recvtype, root, comm);
254
255     } else {
256   // Indeed, direct (non SMP-aware)gather is MPICH one
257   mpi_errno = Coll_gather_mpich::gather(sendbuf, sendcnt, sendtype,
258       recvbuf, recvcnt, recvtype,
259       root, comm);
260   }
261
262   return mpi_errno;
263 }
264
265 int Coll_allgatherv_mvapich2::allgatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
266     void *recvbuf, int *recvcounts, int *displs,
267     MPI_Datatype recvtype, MPI_Comm  comm )
268 {
269   int mpi_errno = MPI_SUCCESS;
270   int range = 0, comm_size, total_count, recvtype_size, i;
271   int range_threshold = 0;
272   long nbytes = 0;
273
274   if(mv2_allgatherv_thresholds_table==NULL)
275     init_mv2_allgatherv_tables_stampede();
276
277   comm_size = comm->size();
278   total_count = 0;
279   for (i = 0; i < comm_size; i++)
280     total_count += recvcounts[i];
281
282   recvtype_size=recvtype->size();
283   nbytes = total_count * recvtype_size;
284
285   /* Search for the corresponding system size inside the tuning table */
286   while ((range < (mv2_size_allgatherv_tuning_table - 1)) &&
287       (comm_size > mv2_allgatherv_thresholds_table[range].numproc)) {
288       range++;
289   }
290   /* Search for corresponding inter-leader function */
291   while ((range_threshold < (mv2_allgatherv_thresholds_table[range].size_inter_table - 1))
292       && (nbytes >
293   comm_size * mv2_allgatherv_thresholds_table[range].inter_leader[range_threshold].max)
294   && (mv2_allgatherv_thresholds_table[range].inter_leader[range_threshold].max !=
295       -1)) {
296       range_threshold++;
297   }
298   /* Set inter-leader pt */
299   MV2_Allgatherv_function =
300       mv2_allgatherv_thresholds_table[range].inter_leader[range_threshold].
301       MV2_pt_Allgatherv_function;
302
303   if (MV2_Allgatherv_function == &MPIR_Allgatherv_Rec_Doubling_MV2)
304     {
305     if (not(comm_size & (comm_size - 1))) {
306       mpi_errno =
307           MPIR_Allgatherv_Rec_Doubling_MV2(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, comm);
308         } else {
309             mpi_errno =
310                 MPIR_Allgatherv_Bruck_MV2(sendbuf, sendcount,
311                     sendtype, recvbuf,
312                     recvcounts, displs,
313                     recvtype, comm);
314         }
315     } else {
316         mpi_errno =
317             MV2_Allgatherv_function(sendbuf, sendcount, sendtype,
318                 recvbuf, recvcounts, displs,
319                 recvtype, comm);
320     }
321
322   return mpi_errno;
323 }
324
325
326
327 int Coll_allreduce_mvapich2::allreduce(void *sendbuf,
328     void *recvbuf,
329     int count,
330     MPI_Datatype datatype,
331     MPI_Op op, MPI_Comm comm)
332 {
333
334   int mpi_errno = MPI_SUCCESS;
335   //int rank = 0,
336   int comm_size = 0;
337
338   comm_size = comm->size();
339   //rank = comm->rank();
340
341   if (count == 0) {
342       return MPI_SUCCESS;
343   }
344
345   if (mv2_allreduce_thresholds_table == NULL)
346     init_mv2_allreduce_tables_stampede();
347
348   /* check if multiple threads are calling this collective function */
349
350   MPI_Aint sendtype_size = 0;
351   long nbytes = 0;
352   int is_commutative = 0;
353   MPI_Aint true_lb, true_extent;
354
355   sendtype_size=datatype->size();
356   nbytes = count * sendtype_size;
357
358   datatype->extent(&true_lb, &true_extent);
359   //MPI_Op *op_ptr;
360   //is_commutative = op->is_commutative();
361
362   {
363     int range = 0, range_threshold = 0, range_threshold_intra = 0;
364     int is_two_level = 0;
365
366     /* Search for the corresponding system size inside the tuning table */
367     while ((range < (mv2_size_allreduce_tuning_table - 1)) &&
368         (comm_size > mv2_allreduce_thresholds_table[range].numproc)) {
369         range++;
370     }
371     /* Search for corresponding inter-leader function */
372     /* skip mcast poiters if mcast is not available */
373     if(mv2_allreduce_thresholds_table[range].mcast_enabled != 1){
374         while ((range_threshold < (mv2_allreduce_thresholds_table[range].size_inter_table - 1))
375             && ((mv2_allreduce_thresholds_table[range].
376                 inter_leader[range_threshold].MV2_pt_Allreducection
377                 == &MPIR_Allreduce_mcst_reduce_redscat_gather_MV2) ||
378                 (mv2_allreduce_thresholds_table[range].
379                     inter_leader[range_threshold].MV2_pt_Allreducection
380                     == &MPIR_Allreduce_mcst_reduce_two_level_helper_MV2)
381             )) {
382             range_threshold++;
383         }
384     }
385     while ((range_threshold < (mv2_allreduce_thresholds_table[range].size_inter_table - 1))
386         && (nbytes >
387     mv2_allreduce_thresholds_table[range].inter_leader[range_threshold].max)
388     && (mv2_allreduce_thresholds_table[range].inter_leader[range_threshold].max != -1)) {
389         range_threshold++;
390     }
391     if(mv2_allreduce_thresholds_table[range].is_two_level_allreduce[range_threshold] == 1){
392         is_two_level = 1;
393     }
394     /* Search for corresponding intra-node function */
395     while ((range_threshold_intra <
396         (mv2_allreduce_thresholds_table[range].size_intra_table - 1))
397         && (nbytes >
398     mv2_allreduce_thresholds_table[range].intra_node[range_threshold_intra].max)
399     && (mv2_allreduce_thresholds_table[range].intra_node[range_threshold_intra].max !=
400         -1)) {
401         range_threshold_intra++;
402     }
403
404     MV2_Allreducection = mv2_allreduce_thresholds_table[range].inter_leader[range_threshold]
405                                                                                 .MV2_pt_Allreducection;
406
407     MV2_Allreduce_intra_function = mv2_allreduce_thresholds_table[range].intra_node[range_threshold_intra]
408                                                                                     .MV2_pt_Allreducection;
409
410     /* check if mcast is ready, otherwise replace mcast with other algorithm */
411     if((MV2_Allreducection == &MPIR_Allreduce_mcst_reduce_redscat_gather_MV2)||
412         (MV2_Allreducection == &MPIR_Allreduce_mcst_reduce_two_level_helper_MV2)){
413         {
414           MV2_Allreducection = &MPIR_Allreduce_pt2pt_rd_MV2;
415         }
416         if(is_two_level != 1) {
417             MV2_Allreducection = &MPIR_Allreduce_pt2pt_rd_MV2;
418         }
419     }
420
421     if(is_two_level == 1){
422         // check if shm is ready, if not use other algorithm first
423         if (is_commutative) {
424           if(comm->get_leaders_comm()==MPI_COMM_NULL){
425             comm->init_smp();
426           }
427           mpi_errno = MPIR_Allreduce_two_level_MV2(sendbuf, recvbuf, count,
428                                                      datatype, op, comm);
429                 } else {
430         mpi_errno = MPIR_Allreduce_pt2pt_rd_MV2(sendbuf, recvbuf, count,
431             datatype, op, comm);
432         }
433     } else {
434         mpi_errno = MV2_Allreducection(sendbuf, recvbuf, count,
435             datatype, op, comm);
436     }
437   }
438
439   //comm->ch.intra_node_done=0;
440
441   return (mpi_errno);
442
443
444 }
445
446
447 int Coll_alltoallv_mvapich2::alltoallv(void *sbuf, int *scounts, int *sdisps,
448     MPI_Datatype sdtype,
449     void *rbuf, int *rcounts, int *rdisps,
450     MPI_Datatype rdtype,
451     MPI_Comm  comm
452 )
453 {
454
455   if (sbuf == MPI_IN_PLACE) {
456       return Coll_alltoallv_ompi_basic_linear::alltoallv(sbuf, scounts, sdisps, sdtype,
457           rbuf, rcounts, rdisps,rdtype,
458           comm);
459   } else     /* For starters, just keep the original algorithm. */
460   return Coll_alltoallv_ring::alltoallv(sbuf, scounts, sdisps, sdtype,
461       rbuf, rcounts, rdisps,rdtype,
462       comm);
463 }
464
465
466 int Coll_barrier_mvapich2::barrier(MPI_Comm  comm)
467 {
468   return Coll_barrier_mvapich2_pair::barrier(comm);
469 }
470
471
472
473
474 int Coll_bcast_mvapich2::bcast(void *buffer,
475     int count,
476     MPI_Datatype datatype,
477     int root, MPI_Comm comm)
478 {
479     int mpi_errno = MPI_SUCCESS;
480     int comm_size/*, rank*/;
481     int two_level_bcast = 1;
482     long nbytes = 0;
483     int range = 0;
484     int range_threshold = 0;
485     int range_threshold_intra = 0;
486     int is_homogeneous, is_contig;
487     MPI_Aint type_size;
488     //, position;
489     void *tmp_buf = NULL;
490     MPI_Comm shmem_comm;
491     //MPID_Datatype *dtp;
492
493     if (count == 0)
494         return MPI_SUCCESS;
495     if(comm->get_leaders_comm()==MPI_COMM_NULL){
496       comm->init_smp();
497     }
498     if (not mv2_bcast_thresholds_table)
499       init_mv2_bcast_tables_stampede();
500     comm_size = comm->size();
501     //rank = comm->rank();
502
503     is_contig=1;
504 /*    if (HANDLE_GET_KIND(datatype) == HANDLE_KIND_BUILTIN)*/
505 /*        is_contig = 1;*/
506 /*    else {*/
507 /*        MPID_Datatype_get_ptr(datatype, dtp);*/
508 /*        is_contig = dtp->is_contig;*/
509 /*    }*/
510
511     is_homogeneous = 1;
512
513     /* MPI_Type_size() might not give the accurate size of the packed
514      * datatype for heterogeneous systems (because of padding, encoding,
515      * etc). On the other hand, MPI_Pack_size() can become very
516      * expensive, depending on the implementation, especially for
517      * heterogeneous systems. We want to use MPI_Type_size() wherever
518      * possible, and MPI_Pack_size() in other places.
519      */
520     //if (is_homogeneous) {
521         type_size=datatype->size();
522
523    /* } else {
524         MPIR_Pack_size_impl(1, datatype, &type_size);
525     }*/
526     nbytes =  (count) * (type_size);
527
528     /* Search for the corresponding system size inside the tuning table */
529     while ((range < (mv2_size_bcast_tuning_table - 1)) &&
530            (comm_size > mv2_bcast_thresholds_table[range].numproc)) {
531         range++;
532     }
533     /* Search for corresponding inter-leader function */
534     while ((range_threshold < (mv2_bcast_thresholds_table[range].size_inter_table - 1))
535            && (nbytes >
536                mv2_bcast_thresholds_table[range].inter_leader[range_threshold].max)
537            && (mv2_bcast_thresholds_table[range].inter_leader[range_threshold].max != -1)) {
538         range_threshold++;
539     }
540
541     /* Search for corresponding intra-node function */
542     while ((range_threshold_intra <
543             (mv2_bcast_thresholds_table[range].size_intra_table - 1))
544            && (nbytes >
545                mv2_bcast_thresholds_table[range].intra_node[range_threshold_intra].max)
546            && (mv2_bcast_thresholds_table[range].intra_node[range_threshold_intra].max !=
547                -1)) {
548         range_threshold_intra++;
549     }
550
551     MV2_Bcast_function =
552         mv2_bcast_thresholds_table[range].inter_leader[range_threshold].
553         MV2_pt_Bcast_function;
554
555     MV2_Bcast_intra_node_function =
556         mv2_bcast_thresholds_table[range].
557         intra_node[range_threshold_intra].MV2_pt_Bcast_function;
558
559 /*    if (mv2_user_bcast_intra == NULL && */
560 /*            MV2_Bcast_intra_node_function == &MPIR_Knomial_Bcast_intra_node_MV2) {*/
561 /*            MV2_Bcast_intra_node_function = &MPIR_Shmem_Bcast_MV2;*/
562 /*    }*/
563
564     if (mv2_bcast_thresholds_table[range].inter_leader[range_threshold].
565         zcpy_pipelined_knomial_factor != -1) {
566         zcpy_knomial_factor =
567             mv2_bcast_thresholds_table[range].inter_leader[range_threshold].
568             zcpy_pipelined_knomial_factor;
569     }
570
571     if (mv2_pipelined_zcpy_knomial_factor != -1) {
572         zcpy_knomial_factor = mv2_pipelined_zcpy_knomial_factor;
573     }
574
575     if(MV2_Bcast_intra_node_function == NULL) {
576         /* if tuning table do not have any intra selection, set func pointer to
577         ** default one for mcast intra node */
578         MV2_Bcast_intra_node_function = &MPIR_Shmem_Bcast_MV2;
579     }
580
581     /* Set value of pipeline segment size */
582     bcast_segment_size = mv2_bcast_thresholds_table[range].bcast_segment_size;
583
584     /* Set value of inter node knomial factor */
585     mv2_inter_node_knomial_factor = mv2_bcast_thresholds_table[range].inter_node_knomial_factor;
586
587     /* Set value of intra node knomial factor */
588     mv2_intra_node_knomial_factor = mv2_bcast_thresholds_table[range].intra_node_knomial_factor;
589
590     /* Check if we will use a two level algorithm or not */
591     two_level_bcast =
592 #if defined(_MCST_SUPPORT_)
593         mv2_bcast_thresholds_table[range].is_two_level_bcast[range_threshold]
594         || comm->ch.is_mcast_ok;
595 #else
596         mv2_bcast_thresholds_table[range].is_two_level_bcast[range_threshold];
597 #endif
598      if (two_level_bcast == 1) {
599        if (not is_contig || not is_homogeneous) {
600          tmp_buf = (void*)smpi_get_tmp_sendbuffer(nbytes);
601
602          /*            position = 0;*/
603          /*            if (rank == root) {*/
604          /*                mpi_errno =*/
605          /*                    MPIR_Pack_impl(buffer, count, datatype, tmp_buf, nbytes, &position);*/
606          /*                if (mpi_errno)*/
607          /*                    MPIU_ERR_POP(mpi_errno);*/
608          /*            }*/
609         }
610 #ifdef CHANNEL_MRAIL_GEN2
611         if ((mv2_enable_zcpy_bcast == 1) &&
612               (&MPIR_Pipelined_Bcast_Zcpy_MV2 == MV2_Bcast_function)) {
613           if (not is_contig || not is_homogeneous) {
614             mpi_errno = MPIR_Pipelined_Bcast_Zcpy_MV2(tmp_buf, nbytes, MPI_BYTE, root, comm);
615             } else {
616                 mpi_errno = MPIR_Pipelined_Bcast_Zcpy_MV2(buffer, count, datatype,
617                                                  root, comm);
618             }
619         } else
620 #endif /* defined(CHANNEL_MRAIL_GEN2) */
621         {
622             shmem_comm = comm->get_intra_comm();
623             if (not is_contig || not is_homogeneous) {
624               MPIR_Bcast_tune_inter_node_helper_MV2(tmp_buf, nbytes, MPI_BYTE, root, comm);
625             } else {
626               MPIR_Bcast_tune_inter_node_helper_MV2(buffer, count, datatype, root, comm);
627             }
628
629             /* We are now done with the inter-node phase */
630
631
632                     root = INTRA_NODE_ROOT;
633
634                     if (not is_contig || not is_homogeneous) {
635                       mpi_errno = MV2_Bcast_intra_node_function(tmp_buf, nbytes, MPI_BYTE, root, shmem_comm);
636                 } else {
637                     mpi_errno = MV2_Bcast_intra_node_function(buffer, count,
638                                                               datatype, root, shmem_comm);
639
640                 }
641         }
642         /*        if (not is_contig || not is_homogeneous) {*/
643         /*            if (rank != root) {*/
644         /*                position = 0;*/
645         /*                mpi_errno = MPIR_Unpack_impl(tmp_buf, nbytes, &position, buffer,*/
646         /*                                             count, datatype);*/
647         /*            }*/
648         /*        }*/
649     } else {
650         /* We use Knomial for intra node */
651         MV2_Bcast_intra_node_function = &MPIR_Knomial_Bcast_intra_node_MV2;
652 /*        if (mv2_enable_shmem_bcast == 0) {*/
653             /* Fall back to non-tuned version */
654 /*            MPIR_Bcast_intra_MV2(buffer, count, datatype, root, comm);*/
655 /*        } else {*/
656             mpi_errno = MV2_Bcast_function(buffer, count, datatype, root,
657                                            comm);
658
659 /*        }*/
660     }
661
662
663     return mpi_errno;
664
665 }
666
667
668
669 int Coll_reduce_mvapich2::reduce( void *sendbuf,
670     void *recvbuf,
671     int count,
672     MPI_Datatype datatype,
673     MPI_Op op, int root, MPI_Comm comm)
674 {
675   if(mv2_reduce_thresholds_table == NULL)
676     init_mv2_reduce_tables_stampede();
677
678   int mpi_errno = MPI_SUCCESS;
679   int range = 0;
680   int range_threshold = 0;
681   int range_intra_threshold = 0;
682   int is_commutative, pof2;
683   int comm_size = 0;
684   long nbytes = 0;
685   int sendtype_size;
686   int is_two_level = 0;
687
688   comm_size = comm->size();
689   sendtype_size=datatype->size();
690   nbytes = count * sendtype_size;
691
692   if (count == 0)
693     return MPI_SUCCESS;
694
695   is_commutative = (op==MPI_OP_NULL || op->is_commutative());
696
697   /* find nearest power-of-two less than or equal to comm_size */
698   for( pof2 = 1; pof2 <= comm_size; pof2 <<= 1 );
699   pof2 >>=1;
700
701
702   /* Search for the corresponding system size inside the tuning table */
703   while ((range < (mv2_size_reduce_tuning_table - 1)) &&
704       (comm_size > mv2_reduce_thresholds_table[range].numproc)) {
705       range++;
706   }
707   /* Search for corresponding inter-leader function */
708   while ((range_threshold < (mv2_reduce_thresholds_table[range].size_inter_table - 1))
709       && (nbytes >
710   mv2_reduce_thresholds_table[range].inter_leader[range_threshold].max)
711   && (mv2_reduce_thresholds_table[range].inter_leader[range_threshold].max !=
712       -1)) {
713       range_threshold++;
714   }
715
716   /* Search for corresponding intra node function */
717   while ((range_intra_threshold < (mv2_reduce_thresholds_table[range].size_intra_table - 1))
718       && (nbytes >
719   mv2_reduce_thresholds_table[range].intra_node[range_intra_threshold].max)
720   && (mv2_reduce_thresholds_table[range].intra_node[range_intra_threshold].max !=
721       -1)) {
722       range_intra_threshold++;
723   }
724
725   /* Set intra-node function pt for reduce_two_level */
726   MV2_Reduce_intra_function =
727       mv2_reduce_thresholds_table[range].intra_node[range_intra_threshold].
728       MV2_pt_Reduce_function;
729   /* Set inter-leader pt */
730   MV2_Reduce_function =
731       mv2_reduce_thresholds_table[range].inter_leader[range_threshold].
732       MV2_pt_Reduce_function;
733
734   if(mv2_reduce_intra_knomial_factor<0)
735     {
736       mv2_reduce_intra_knomial_factor = mv2_reduce_thresholds_table[range].intra_k_degree;
737     }
738   if(mv2_reduce_inter_knomial_factor<0)
739     {
740       mv2_reduce_inter_knomial_factor = mv2_reduce_thresholds_table[range].inter_k_degree;
741     }
742   if(mv2_reduce_thresholds_table[range].is_two_level_reduce[range_threshold] == 1){
743       is_two_level = 1;
744   }
745   /* We call Reduce function */
746   if(is_two_level == 1)
747     {
748        if (is_commutative == 1) {
749          if(comm->get_leaders_comm()==MPI_COMM_NULL){
750            comm->init_smp();
751          }
752          mpi_errno = MPIR_Reduce_two_level_helper_MV2(sendbuf, recvbuf, count,
753                                            datatype, op, root, comm);
754         } else {
755       mpi_errno = MPIR_Reduce_binomial_MV2(sendbuf, recvbuf, count,
756           datatype, op, root, comm);
757       }
758     } else if(MV2_Reduce_function == &MPIR_Reduce_inter_knomial_wrapper_MV2 ){
759         if(is_commutative ==1)
760           {
761             mpi_errno = MV2_Reduce_function(sendbuf, recvbuf, count,
762                 datatype, op, root, comm);
763           } else {
764               mpi_errno = MPIR_Reduce_binomial_MV2(sendbuf, recvbuf, count,
765                   datatype, op, root, comm);
766           }
767     } else if(MV2_Reduce_function == &MPIR_Reduce_redscat_gather_MV2){
768         if (/*(HANDLE_GET_KIND(op) == HANDLE_KIND_BUILTIN) &&*/ (count >= pof2))
769           {
770             mpi_errno = MV2_Reduce_function(sendbuf, recvbuf, count,
771                 datatype, op, root, comm);
772           } else {
773               mpi_errno = MPIR_Reduce_binomial_MV2(sendbuf, recvbuf, count,
774                   datatype, op, root, comm);
775           }
776     } else {
777         mpi_errno = MV2_Reduce_function(sendbuf, recvbuf, count,
778             datatype, op, root, comm);
779     }
780
781
782   return mpi_errno;
783
784 }
785
786
787 int Coll_reduce_scatter_mvapich2::reduce_scatter(void *sendbuf, void *recvbuf, int *recvcnts,
788     MPI_Datatype datatype, MPI_Op op,
789     MPI_Comm comm)
790 {
791   int mpi_errno = MPI_SUCCESS;
792   int i = 0, comm_size = comm->size(), total_count = 0, type_size =
793       0, nbytes = 0;
794   int is_commutative = 0;
795   int* disps          = new int[comm_size];
796
797   if(mv2_red_scat_thresholds_table==NULL)
798     init_mv2_reduce_scatter_tables_stampede();
799
800   is_commutative=(op==MPI_OP_NULL || op->is_commutative());
801   for (i = 0; i < comm_size; i++) {
802       disps[i] = total_count;
803       total_count += recvcnts[i];
804   }
805
806   type_size=datatype->size();
807   nbytes = total_count * type_size;
808
809   if (is_commutative) {
810     int range           = 0;
811     int range_threshold = 0;
812
813       /* Search for the corresponding system size inside the tuning table */
814       while ((range < (mv2_size_red_scat_tuning_table - 1)) &&
815           (comm_size > mv2_red_scat_thresholds_table[range].numproc)) {
816           range++;
817       }
818       /* Search for corresponding inter-leader function */
819       while ((range_threshold < (mv2_red_scat_thresholds_table[range].size_inter_table - 1))
820           && (nbytes >
821       mv2_red_scat_thresholds_table[range].inter_leader[range_threshold].max)
822       && (mv2_red_scat_thresholds_table[range].inter_leader[range_threshold].max !=
823           -1)) {
824           range_threshold++;
825       }
826
827       /* Set inter-leader pt */
828       MV2_Red_scat_function =
829           mv2_red_scat_thresholds_table[range].inter_leader[range_threshold].
830           MV2_pt_Red_scat_function;
831
832       mpi_errno = MV2_Red_scat_function(sendbuf, recvbuf,
833           recvcnts, datatype,
834           op, comm);
835   } else {
836       int is_block_regular = 1;
837       for (i = 0; i < (comm_size - 1); ++i) {
838           if (recvcnts[i] != recvcnts[i+1]) {
839               is_block_regular = 0;
840               break;
841           }
842       }
843       int pof2 = 1;
844       while (pof2 < comm_size) pof2 <<= 1;
845       if (pof2 == comm_size && is_block_regular) {
846           /* noncommutative, pof2 size, and block regular */
847           MPIR_Reduce_scatter_non_comm_MV2(sendbuf, recvbuf,
848               recvcnts, datatype,
849               op, comm);
850       }
851       mpi_errno =  Coll_reduce_scatter_mpich_rdb::reduce_scatter(sendbuf, recvbuf,
852           recvcnts, datatype,
853           op, comm);
854   }
855   delete[] disps;
856   return mpi_errno;
857
858 }
859
860
861
862 int Coll_scatter_mvapich2::scatter(void *sendbuf,
863     int sendcnt,
864     MPI_Datatype sendtype,
865     void *recvbuf,
866     int recvcnt,
867     MPI_Datatype recvtype,
868     int root, MPI_Comm comm)
869 {
870   int range = 0, range_threshold = 0, range_threshold_intra = 0;
871   int mpi_errno = MPI_SUCCESS;
872   //   int mpi_errno_ret = MPI_SUCCESS;
873   int rank, nbytes, comm_size;
874   int partial_sub_ok = 0;
875   int conf_index = 0;
876      MPI_Comm shmem_comm;
877   //    MPID_Comm *shmem_commptr=NULL;
878   if(mv2_scatter_thresholds_table==NULL)
879     init_mv2_scatter_tables_stampede();
880
881   if(comm->get_leaders_comm()==MPI_COMM_NULL){
882     comm->init_smp();
883   }
884
885   comm_size = comm->size();
886
887   rank = comm->rank();
888
889   if (rank == root) {
890     int sendtype_size = sendtype->size();
891     nbytes            = sendcnt * sendtype_size;
892   } else {
893     int recvtype_size = recvtype->size();
894     nbytes            = recvcnt * recvtype_size;
895   }
896
897     // check if safe to use partial subscription mode
898     if (comm->is_uniform()) {
899
900         shmem_comm = comm->get_intra_comm();
901         int local_size = shmem_comm->size();
902         int i          = 0;
903         if (mv2_scatter_table_ppn_conf[0] == -1) {
904             // Indicating user defined tuning
905             conf_index = 0;
906         }else{
907             do {
908                 if (local_size == mv2_scatter_table_ppn_conf[i]) {
909                     conf_index = i;
910                     partial_sub_ok = 1;
911                     break;
912                 }
913                 i++;
914             } while(i < mv2_scatter_num_ppn_conf);
915         }
916     }
917
918   if (partial_sub_ok != 1) {
919       conf_index = 0;
920   }
921
922   /* Search for the corresponding system size inside the tuning table */
923   while ((range < (mv2_size_scatter_tuning_table[conf_index] - 1)) &&
924       (comm_size > mv2_scatter_thresholds_table[conf_index][range].numproc)) {
925       range++;
926   }
927   /* Search for corresponding inter-leader function */
928   while ((range_threshold < (mv2_scatter_thresholds_table[conf_index][range].size_inter_table - 1))
929       && (nbytes >
930   mv2_scatter_thresholds_table[conf_index][range].inter_leader[range_threshold].max)
931   && (mv2_scatter_thresholds_table[conf_index][range].inter_leader[range_threshold].max != -1)) {
932       range_threshold++;
933   }
934
935   /* Search for corresponding intra-node function */
936   while ((range_threshold_intra <
937       (mv2_scatter_thresholds_table[conf_index][range].size_intra_table - 1))
938       && (nbytes >
939   mv2_scatter_thresholds_table[conf_index][range].intra_node[range_threshold_intra].max)
940   && (mv2_scatter_thresholds_table[conf_index][range].intra_node[range_threshold_intra].max !=
941       -1)) {
942       range_threshold_intra++;
943   }
944
945   MV2_Scatter_function = mv2_scatter_thresholds_table[conf_index][range].inter_leader[range_threshold]
946                                                                                       .MV2_pt_Scatter_function;
947
948   if(MV2_Scatter_function == &MPIR_Scatter_mcst_wrap_MV2) {
949 #if defined(_MCST_SUPPORT_)
950       if(comm->ch.is_mcast_ok == 1
951           && mv2_use_mcast_scatter == 1
952           && comm->ch.shmem_coll_ok == 1) {
953           MV2_Scatter_function = &MPIR_Scatter_mcst_MV2;
954       } else
955 #endif /*#if defined(_MCST_SUPPORT_) */
956         {
957           if(mv2_scatter_thresholds_table[conf_index][range].inter_leader[range_threshold + 1].
958               MV2_pt_Scatter_function != NULL) {
959               MV2_Scatter_function = mv2_scatter_thresholds_table[conf_index][range].inter_leader[range_threshold + 1]
960                                                                                                   .MV2_pt_Scatter_function;
961           } else {
962               /* Fallback! */
963               MV2_Scatter_function = &MPIR_Scatter_MV2_Binomial;
964           }
965         }
966   }
967
968   if( (MV2_Scatter_function == &MPIR_Scatter_MV2_two_level_Direct) ||
969       (MV2_Scatter_function == &MPIR_Scatter_MV2_two_level_Binomial)) {
970        if( comm->is_blocked()) {
971              MV2_Scatter_intra_function = mv2_scatter_thresholds_table[conf_index][range].intra_node[range_threshold_intra]
972                                 .MV2_pt_Scatter_function;
973
974              mpi_errno =
975                    MV2_Scatter_function(sendbuf, sendcnt, sendtype,
976                                         recvbuf, recvcnt, recvtype, root,
977                                         comm);
978          } else {
979       mpi_errno = MPIR_Scatter_MV2_Binomial(sendbuf, sendcnt, sendtype,
980           recvbuf, recvcnt, recvtype, root,
981           comm);
982
983       }
984   } else {
985       mpi_errno = MV2_Scatter_function(sendbuf, sendcnt, sendtype,
986           recvbuf, recvcnt, recvtype, root,
987           comm);
988   }
989   return (mpi_errno);
990 }
991
992 }
993 }
994
995 void smpi_coll_cleanup_mvapich2()
996 {
997   if (mv2_alltoall_thresholds_table)
998     delete[] mv2_alltoall_thresholds_table[0];
999   delete[] mv2_alltoall_thresholds_table;
1000   delete[] mv2_size_alltoall_tuning_table;
1001   delete[] mv2_alltoall_table_ppn_conf;
1002
1003   delete[] mv2_gather_thresholds_table;
1004   if (mv2_allgather_thresholds_table)
1005     delete[] mv2_allgather_thresholds_table[0];
1006   delete[] mv2_size_allgather_tuning_table;
1007   delete[] mv2_allgather_table_ppn_conf;
1008   delete[] mv2_allgather_thresholds_table;
1009
1010   delete[] mv2_allgatherv_thresholds_table;
1011   delete[] mv2_reduce_thresholds_table;
1012   delete[] mv2_red_scat_thresholds_table;
1013   delete[] mv2_allreduce_thresholds_table;
1014   delete[] mv2_bcast_thresholds_table;
1015   if (mv2_scatter_thresholds_table)
1016     delete[] mv2_scatter_thresholds_table[0];
1017   delete[] mv2_scatter_thresholds_table;
1018   delete[] mv2_size_scatter_tuning_table;
1019   delete[] mv2_scatter_table_ppn_conf;
1020 }