From: degomme Date: Tue, 7 Mar 2017 14:21:47 +0000 (+0100) Subject: MPI_Win (RMA) -> C++ X-Git-Tag: v3_15~195 X-Git-Url: http://bilbo.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/bb1ec3ebdc8aad0b788cfded0ca9bf7b071a6721 MPI_Win (RMA) -> C++ --- diff --git a/include/smpi/forward.hpp b/include/smpi/forward.hpp index b4f89bfac2..9097819bce 100644 --- a/include/smpi/forward.hpp +++ b/include/smpi/forward.hpp @@ -16,6 +16,7 @@ namespace smpi { class Group; class Comm; class Topo; +class Win; class Cart; class Graph; class Dist_Graph; @@ -26,6 +27,7 @@ class Dist_Graph; typedef simgrid::smpi::Group SMPI_Group; typedef simgrid::smpi::Comm SMPI_Comm; typedef simgrid::smpi::Topo SMPI_Topology; +typedef simgrid::smpi::Win SMPI_Win; typedef simgrid::smpi::Graph SMPI_Graph_topology; typedef simgrid::smpi::Cart SMPI_Cart_topology; typedef simgrid::smpi::Dist_Graph SMPI_Dist_Graph_topology; @@ -35,6 +37,7 @@ typedef simgrid::smpi::Dist_Graph SMPI_Dist_Graph_topology; typedef struct SMPI_Group SMPI_Group; typedef struct SMPI_Comm SMPI_Comm; typedef struct SMPI_Topology SMPI_Topology; +typedef struct SMPI_Win SMPI_Win; typedef struct SMPI_Graph_topology SMPI_Graph_topology; typedef struct SMPI_Cart_topology SMPI_Cart_topology; typedef struct SMPI_Dist_Graph_topology SMPI_Dist_Graph_topology; diff --git a/include/smpi/smpi.h b/include/smpi/smpi.h index c058e3a8d2..e7f73fcfea 100644 --- a/include/smpi/smpi.h +++ b/include/smpi/smpi.h @@ -248,8 +248,7 @@ typedef struct { int count; } MPI_Status; -struct s_smpi_mpi_win; -typedef struct s_smpi_mpi_win* MPI_Win; +typedef SMPI_Win* MPI_Win; struct s_smpi_mpi_info; typedef struct s_smpi_mpi_info *MPI_Info; diff --git a/src/smpi/colls/alltoall-pair.cpp b/src/smpi/colls/alltoall-pair.cpp index a7731c1dd6..632ef0af7c 100644 --- a/src/smpi/colls/alltoall-pair.cpp +++ b/src/smpi/colls/alltoall-pair.cpp @@ -45,19 +45,19 @@ int smpi_coll_tuned_alltoall_pair_rma(void *send_buff, int send_count, MPI_Datat send_chunk = smpi_datatype_get_extent(send_type); recv_chunk = smpi_datatype_get_extent(recv_type); - win=smpi_mpi_win_create(recv_buff, num_procs * recv_chunk * send_count, recv_chunk, 0, + win=new simgrid::smpi::Win(recv_buff, num_procs * recv_chunk * send_count, recv_chunk, 0, comm); send_chunk *= send_count; recv_chunk *= recv_count; - smpi_mpi_win_fence(assert, win); + win->fence(assert); for (i = 0; i < num_procs; i++) { dst = rank ^ i; - smpi_mpi_put(send_ptr + dst * send_chunk, send_count, send_type, dst, - rank /* send_chunk*/, send_count, send_type, win); + win->put(send_ptr + dst * send_chunk, send_count, send_type, dst, + rank /* send_chunk*/, send_count, send_type); } - smpi_mpi_win_fence(assert, win); - smpi_mpi_win_free(&win); + win->fence(assert); + delete win; return 0; } diff --git a/src/smpi/private.h b/src/smpi/private.h index ffaa5b6710..3f65868a7e 100644 --- a/src/smpi/private.h +++ b/src/smpi/private.h @@ -9,9 +9,6 @@ #include "simgrid/simix.h" #include "smpi/smpi.h" -#include "src/smpi/smpi_group.hpp" -#include "src/smpi/smpi_comm.hpp" -#include "src/smpi/smpi_topo.hpp" #include "src/include/smpi/smpi_interface.h" #include "src/instr/instr_private.h" #include "src/internal_config.h" @@ -19,7 +16,10 @@ #include "xbt/base.h" #include "xbt/synchro.h" #include "xbt/xbt_os_time.h" - +#include "src/smpi/smpi_group.hpp" +#include "src/smpi/smpi_comm.hpp" +#include "src/smpi/smpi_topo.hpp" +#include "src/smpi/smpi_win.hpp" SG_BEGIN_DECL() struct s_smpi_process_data; @@ -311,31 +311,6 @@ XBT_PRIVATE void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,MPI_Datat XBT_PRIVATE void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm); -XBT_PRIVATE int smpi_mpi_win_free( MPI_Win* win); - -XBT_PRIVATE MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm); - -XBT_PRIVATE void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length); -XBT_PRIVATE void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group); -XBT_PRIVATE void smpi_mpi_win_set_name(MPI_Win win, char* name); - -XBT_PRIVATE int smpi_mpi_win_fence( int assert, MPI_Win win); - -XBT_PRIVATE int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win); -XBT_PRIVATE int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win); -XBT_PRIVATE int smpi_mpi_win_complete(MPI_Win win); -XBT_PRIVATE int smpi_mpi_win_wait(MPI_Win win); - -XBT_PRIVATE int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, - MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win); -XBT_PRIVATE int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, - MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win); -XBT_PRIVATE int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, - MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win); - -XBT_PRIVATE void nary_tree_bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm, int arity); -XBT_PRIVATE void nary_tree_barrier(MPI_Comm comm, int arity); - XBT_PRIVATE int smpi_coll_tuned_alltoall_ompi2(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm); XBT_PRIVATE int smpi_coll_tuned_alltoall_bruck(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, diff --git a/src/smpi/smpi_pmpi.cpp b/src/smpi/smpi_pmpi.cpp index 81d520da7a..cffccf4e81 100644 --- a/src/smpi/smpi_pmpi.cpp +++ b/src/smpi/smpi_pmpi.cpp @@ -2437,7 +2437,7 @@ int PMPI_Win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MP }else if ((base == nullptr && size != 0) || disp_unit <= 0 || size < 0 ){ retval= MPI_ERR_OTHER; }else{ - *win = smpi_mpi_win_create( base, size, disp_unit, info, comm); + *win = new simgrid::smpi::Win( base, size, disp_unit, info, comm); retval = MPI_SUCCESS; } smpi_bench_begin(); @@ -2450,7 +2450,8 @@ int PMPI_Win_free( MPI_Win* win){ if (win == nullptr || *win == MPI_WIN_NULL) { retval = MPI_ERR_WIN; }else{ - retval=smpi_mpi_win_free(win); + delete(*win); + retval=MPI_SUCCESS; } smpi_bench_begin(); return retval; @@ -2463,7 +2464,7 @@ int PMPI_Win_set_name(MPI_Win win, char * name) } else if (name == nullptr) { return MPI_ERR_ARG; } else { - smpi_mpi_win_set_name(win, name); + win->set_name(name); return MPI_SUCCESS; } } @@ -2475,7 +2476,7 @@ int PMPI_Win_get_name(MPI_Win win, char * name, int* len) } else if (name == nullptr) { return MPI_ERR_ARG; } else { - smpi_mpi_win_get_name(win, name, len); + win->get_name(name, len); return MPI_SUCCESS; } } @@ -2484,7 +2485,7 @@ int PMPI_Win_get_group(MPI_Win win, MPI_Group * group){ if (win == MPI_WIN_NULL) { return MPI_ERR_WIN; }else { - smpi_mpi_win_get_group(win, group); + win->get_group(group); (*group)->use(); return MPI_SUCCESS; } @@ -2498,7 +2499,7 @@ int PMPI_Win_fence( int assert, MPI_Win win){ } else { int rank = smpi_process_index(); TRACE_smpi_collective_in(rank, -1, __FUNCTION__, nullptr); - retval = smpi_mpi_win_fence(assert, win); + retval = win->fence(assert); TRACE_smpi_collective_out(rank, -1, __FUNCTION__); } smpi_bench_begin(); @@ -2525,12 +2526,12 @@ int PMPI_Get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, } else { int rank = smpi_process_index(); MPI_Group group; - smpi_mpi_win_get_group(win, &group); + win->get_group(&group); int src_traced = group->index(target_rank); TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, nullptr); - retval = smpi_mpi_get( origin_addr, origin_count, origin_datatype, target_rank, target_disp, target_count, - target_datatype, win); + retval = win->get( origin_addr, origin_count, origin_datatype, target_rank, target_disp, target_count, + target_datatype); TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__); } @@ -2558,13 +2559,13 @@ int PMPI_Put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, } else { int rank = smpi_process_index(); MPI_Group group; - smpi_mpi_win_get_group(win, &group); + win->get_group(&group); int dst_traced = group->index(target_rank); TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, nullptr); TRACE_smpi_send(rank, rank, dst_traced, SMPI_RMA_TAG, origin_count*smpi_datatype_size(origin_datatype)); - retval = smpi_mpi_put( origin_addr, origin_count, origin_datatype, target_rank, target_disp, target_count, - target_datatype, win); + retval = win->put( origin_addr, origin_count, origin_datatype, target_rank, target_disp, target_count, + target_datatype); TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__); } @@ -2595,12 +2596,12 @@ int PMPI_Accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_da } else { int rank = smpi_process_index(); MPI_Group group; - smpi_mpi_win_get_group(win, &group); + win->get_group(&group); int src_traced = group->index(target_rank); TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, nullptr); - retval = smpi_mpi_accumulate( origin_addr, origin_count, origin_datatype, target_rank, target_disp, target_count, - target_datatype, op, win); + retval = win->accumulate( origin_addr, origin_count, origin_datatype, target_rank, target_disp, target_count, + target_datatype, op); TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__); } @@ -2618,7 +2619,7 @@ int PMPI_Win_post(MPI_Group group, int assert, MPI_Win win){ } else { int rank = smpi_process_index(); TRACE_smpi_collective_in(rank, -1, __FUNCTION__, nullptr); - retval = smpi_mpi_win_post(group,assert,win); + retval = win->post(group,assert); TRACE_smpi_collective_out(rank, -1, __FUNCTION__); } smpi_bench_begin(); @@ -2635,7 +2636,7 @@ int PMPI_Win_start(MPI_Group group, int assert, MPI_Win win){ } else { int rank = smpi_process_index(); TRACE_smpi_collective_in(rank, -1, __FUNCTION__, nullptr); - retval = smpi_mpi_win_start(group,assert,win); + retval = win->start(group,assert); TRACE_smpi_collective_out(rank, -1, __FUNCTION__); } smpi_bench_begin(); @@ -2651,7 +2652,7 @@ int PMPI_Win_complete(MPI_Win win){ int rank = smpi_process_index(); TRACE_smpi_collective_in(rank, -1, __FUNCTION__, nullptr); - retval = smpi_mpi_win_complete(win); + retval = win->complete(); TRACE_smpi_collective_out(rank, -1, __FUNCTION__); } @@ -2668,7 +2669,7 @@ int PMPI_Win_wait(MPI_Win win){ int rank = smpi_process_index(); TRACE_smpi_collective_in(rank, -1, __FUNCTION__, nullptr); - retval = smpi_mpi_win_wait(win); + retval = win->wait(); TRACE_smpi_collective_out(rank, -1, __FUNCTION__); } diff --git a/src/smpi/smpi_rma.cpp b/src/smpi/smpi_win.cpp similarity index 61% rename from src/smpi/smpi_rma.cpp rename to src/smpi/smpi_win.cpp index 28dc6f96e1..803776fe93 100644 --- a/src/smpi/smpi_rma.cpp +++ b/src/smpi/smpi_win.cpp @@ -9,119 +9,91 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)"); -typedef struct s_smpi_mpi_win{ - void* base; - MPI_Aint size; - int disp_unit; - MPI_Comm comm; - MPI_Info info; - int assert; - std::vector *requests; - xbt_mutex_t mut; - msg_bar_t bar; - MPI_Win* connected_wins; - char* name; - int opened; - MPI_Group group; - int count; //for ordering the accs -} s_smpi_mpi_win_t; - - -MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){ +namespace simgrid{ +namespace smpi{ + +Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm){ int comm_size = comm->size(); int rank = comm->rank(); XBT_DEBUG("Creating window"); - - MPI_Win win = xbt_new(s_smpi_mpi_win_t, 1); - win->base = base; - win->size = size; - win->disp_unit = disp_unit; - win->assert = 0; - win->info = info; if(info!=MPI_INFO_NULL) info->refcount++; - win->comm = comm; - win->name = nullptr; - win->opened = 0; - win->group = MPI_GROUP_NULL; - win->requests = new std::vector(); - win->mut=xbt_mutex_init(); - win->connected_wins = xbt_new0(MPI_Win, comm_size); - win->connected_wins[rank] = win; - win->count = 0; + name_ = nullptr; + opened_ = 0; + group_ = MPI_GROUP_NULL; + requests_ = new std::vector(); + mut_=xbt_mutex_init(); + connected_wins_ = new MPI_Win[comm_size]; + connected_wins_[rank] = this; + count_ = 0; if(rank==0){ - win->bar = MSG_barrier_init(comm_size); + bar_ = MSG_barrier_init(comm_size); } - mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win), + mpi_coll_allgather_fun(&(connected_wins_[rank]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win), MPI_BYTE, comm); - mpi_coll_bcast_fun(&(win->bar), sizeof(msg_bar_t), MPI_BYTE, 0, comm); + mpi_coll_bcast_fun(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm); mpi_coll_barrier_fun(comm); - - return win; } -int smpi_mpi_win_free( MPI_Win* win){ +Win::~Win(){ //As per the standard, perform a barrier to ensure every async comm is finished - MSG_barrier_wait((*win)->bar); - xbt_mutex_acquire((*win)->mut); - delete (*win)->requests; - xbt_mutex_release((*win)->mut); - xbt_free((*win)->connected_wins); - if ((*win)->name != nullptr){ - xbt_free((*win)->name); + MSG_barrier_wait(bar_); + xbt_mutex_acquire(mut_); + delete requests_; + xbt_mutex_release(mut_); + delete[] connected_wins_; + if (name_ != nullptr){ + xbt_free(name_); } - if((*win)->info!=MPI_INFO_NULL){ - MPI_Info_free(&(*win)->info); + if(info_!=MPI_INFO_NULL){ + MPI_Info_free(&info_); } - mpi_coll_barrier_fun((*win)->comm); - int rank=(*win)->comm->rank(); + mpi_coll_barrier_fun(comm_); + int rank=comm_->rank(); if(rank == 0) - MSG_barrier_destroy((*win)->bar); - xbt_mutex_destroy((*win)->mut); - xbt_free(*win); - *win = MPI_WIN_NULL; - return MPI_SUCCESS; + MSG_barrier_destroy(bar_); + xbt_mutex_destroy(mut_); } -void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){ - if(win->name==nullptr){ +void Win::get_name(char* name, int* length){ + if(name_==nullptr){ *length=0; name=nullptr; return; } - *length = strlen(win->name); - strncpy(name, win->name, *length+1); + *length = strlen(name_); + strncpy(name, name_, *length+1); } -void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){ - if(win->comm != MPI_COMM_NULL){ - *group = win->comm->group(); +void Win::get_group(MPI_Group* group){ + if(comm_ != MPI_COMM_NULL){ + *group = comm_->group(); } else { *group = MPI_GROUP_NULL; } } -void smpi_mpi_win_set_name(MPI_Win win, char* name){ - win->name = xbt_strdup(name); +void Win::set_name(char* name){ + name_ = xbt_strdup(name); } -int smpi_mpi_win_fence(int assert, MPI_Win win) +int Win::fence(int assert) { XBT_DEBUG("Entering fence"); - if (win->opened == 0) - win->opened=1; + if (opened_ == 0) + opened_=1; if (assert != MPI_MODE_NOPRECEDE) { // This is not the first fence => finalize what came before - MSG_barrier_wait(win->bar); - xbt_mutex_acquire(win->mut); + MSG_barrier_wait(bar_); + xbt_mutex_acquire(mut_); // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall. // Without this, the vector could get redimensionned when another process pushes. // This would result in the array used by smpi_mpi_waitall() to be invalidated. // Another solution would be to copy the data and cleanup the vector *before* smpi_mpi_waitall - std::vector *reqs = win->requests; + std::vector *reqs = requests_; int size = static_cast(reqs->size()); // start all requests that have been prepared by another process if (size > 0) { @@ -134,48 +106,48 @@ int smpi_mpi_win_fence(int assert, MPI_Win win) smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE); } - win->count=0; - xbt_mutex_release(win->mut); + count_=0; + xbt_mutex_release(mut_); } - win->assert = assert; + assert_ = assert; - MSG_barrier_wait(win->bar); + MSG_barrier_wait(bar_); XBT_DEBUG("Leaving fence"); return MPI_SUCCESS; } -int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, - MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win) +int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, + MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype) { - if(win->opened==0)//check that post/start has been done + if(opened_==0)//check that post/start has been done return MPI_ERR_WIN; //get receiver pointer - MPI_Win recv_win = win->connected_wins[target_rank]; + MPI_Win recv_win = connected_wins_[target_rank]; - void* recv_addr = static_cast ( static_cast(recv_win->base) + target_disp * recv_win->disp_unit); + void* recv_addr = static_cast ( static_cast(recv_win->base_) + target_disp * recv_win->disp_unit_); XBT_DEBUG("Entering MPI_Put to %d", target_rank); - if(target_rank != win->comm->rank()){ + if(target_rank != comm_->rank()){ //prepare send_request MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(), - win->comm->group()->index(target_rank), SMPI_RMA_TAG+1, win->comm, MPI_OP_NULL); + comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL); //prepare receiver request MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(), - win->comm->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm, MPI_OP_NULL); + comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL); //push request to receiver's win - xbt_mutex_acquire(recv_win->mut); - recv_win->requests->push_back(rreq); - xbt_mutex_release(recv_win->mut); + xbt_mutex_acquire(recv_win->mut_); + recv_win->requests_->push_back(rreq); + xbt_mutex_release(recv_win->mut_); //start send smpi_mpi_start(sreq); //push request to sender's win - xbt_mutex_acquire(win->mut); - win->requests->push_back(sreq); - xbt_mutex_release(win->mut); + xbt_mutex_acquire(mut_); + requests_->push_back(sreq); + xbt_mutex_release(mut_); }else{ smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype); } @@ -183,41 +155,41 @@ int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datat return MPI_SUCCESS; } -int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, - MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win) +int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, + MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype) { - if(win->opened==0)//check that post/start has been done + if(opened_==0)//check that post/start has been done return MPI_ERR_WIN; //get sender pointer - MPI_Win send_win = win->connected_wins[target_rank]; + MPI_Win send_win = connected_wins_[target_rank]; - void* send_addr = static_cast(static_cast(send_win->base) + target_disp * send_win->disp_unit); + void* send_addr = static_cast(static_cast(send_win->base_) + target_disp * send_win->disp_unit_); XBT_DEBUG("Entering MPI_Get from %d", target_rank); - if(target_rank != win->comm->rank()){ + if(target_rank != comm_->rank()){ //prepare send_request MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype, - win->comm->group()->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm, + comm_->group()->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm_, MPI_OP_NULL); //prepare receiver request MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype, - win->comm->group()->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, win->comm, + comm_->group()->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, comm_, MPI_OP_NULL); //start the send, with another process than us as sender. smpi_mpi_start(sreq); //push request to receiver's win - xbt_mutex_acquire(send_win->mut); - send_win->requests->push_back(sreq); - xbt_mutex_release(send_win->mut); + xbt_mutex_acquire(send_win->mut_); + send_win->requests_->push_back(sreq); + xbt_mutex_release(send_win->mut_); //start recv smpi_mpi_start(rreq); //push request to sender's win - xbt_mutex_acquire(win->mut); - win->requests->push_back(rreq); - xbt_mutex_release(win->mut); + xbt_mutex_acquire(mut_); + requests_->push_back(rreq); + xbt_mutex_release(mut_); }else{ smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype); } @@ -226,43 +198,43 @@ int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datat } -int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, - MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win) +int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, + MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op) { - if(win->opened==0)//check that post/start has been done + if(opened_==0)//check that post/start has been done return MPI_ERR_WIN; //FIXME: local version //get receiver pointer - MPI_Win recv_win = win->connected_wins[target_rank]; + MPI_Win recv_win = connected_wins_[target_rank]; - void* recv_addr = static_cast(static_cast(recv_win->base) + target_disp * recv_win->disp_unit); + void* recv_addr = static_cast(static_cast(recv_win->base_) + target_disp * recv_win->disp_unit_); XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank); //As the tag will be used for ordering of the operations, add count to it //prepare send_request MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, - smpi_process_index(), win->comm->group()->index(target_rank), SMPI_RMA_TAG+3+win->count, win->comm, op); + smpi_process_index(), comm_->group()->index(target_rank), SMPI_RMA_TAG+3+count_, comm_, op); //prepare receiver request MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, - smpi_process_index(), win->comm->group()->index(target_rank), SMPI_RMA_TAG+3+win->count, recv_win->comm, op); + smpi_process_index(), comm_->group()->index(target_rank), SMPI_RMA_TAG+3+count_, recv_win->comm_, op); - win->count++; + count_++; //push request to receiver's win - xbt_mutex_acquire(recv_win->mut); - recv_win->requests->push_back(rreq); - xbt_mutex_release(recv_win->mut); + xbt_mutex_acquire(recv_win->mut_); + recv_win->requests_->push_back(rreq); + xbt_mutex_release(recv_win->mut_); //start send smpi_mpi_start(sreq); //push request to sender's win - xbt_mutex_acquire(win->mut); - win->requests->push_back(sreq); - xbt_mutex_release(win->mut); + xbt_mutex_acquire(mut_); + requests_->push_back(sreq); + xbt_mutex_release(mut_); return MPI_SUCCESS; } -int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){ +int Win::start(MPI_Group group, int assert){ /* From MPI forum advices The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by @@ -296,13 +268,13 @@ int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){ smpi_mpi_request_free(&reqs[i]); } xbt_free(reqs); - win->opened++; //we're open for business ! - win->group=group; + opened_++; //we're open for business ! + group_=group; group->use(); return MPI_SUCCESS; } -int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){ +int Win::post(MPI_Group group, int assert){ //let's make a synchronous send here int i = 0; int j = 0; @@ -325,24 +297,24 @@ int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){ smpi_mpi_request_free(&reqs[i]); } xbt_free(reqs); - win->opened++; //we're open for business ! - win->group=group; + opened_++; //we're open for business ! + group_=group; group->use(); return MPI_SUCCESS; } -int smpi_mpi_win_complete(MPI_Win win){ - if(win->opened==0) +int Win::complete(){ + if(opened_==0) xbt_die("Complete called on already opened MPI_Win"); XBT_DEBUG("Entering MPI_Win_Complete"); int i = 0; int j = 0; - int size = win->group->size(); + int size = group_->size(); MPI_Request* reqs = xbt_new0(MPI_Request, size); while(j!=size){ - int dst=win->group->index(j); + int dst=group_->index(j); if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){ reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD); i++; @@ -360,8 +332,8 @@ int smpi_mpi_win_complete(MPI_Win win){ xbt_free(reqs); //now we can finish RMA calls - xbt_mutex_acquire(win->mut); - std::vector *reqqs = win->requests; + xbt_mutex_acquire(mut_); + std::vector *reqqs = requests_; size = static_cast(reqqs->size()); XBT_DEBUG("Win_complete - Finishing %d RMA calls", size); @@ -376,22 +348,22 @@ int smpi_mpi_win_complete(MPI_Win win){ smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE); reqqs->clear(); } - xbt_mutex_release(win->mut); + xbt_mutex_release(mut_); - win->group->unuse(); - win->opened--; //we're closed for business ! + group_->unuse(); + opened_--; //we're closed for business ! return MPI_SUCCESS; } -int smpi_mpi_win_wait(MPI_Win win){ +int Win::wait(){ //naive, blocking implementation. XBT_DEBUG("Entering MPI_Win_Wait"); int i=0,j=0; - int size = win->group->size(); + int size = group_->size(); MPI_Request* reqs = xbt_new0(MPI_Request, size); while(j!=size){ - int src=win->group->index(j); + int src=group_->index(j); if(src!=smpi_process_index() && src!=MPI_UNDEFINED){ reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD); i++; @@ -406,8 +378,8 @@ int smpi_mpi_win_wait(MPI_Win win){ smpi_mpi_request_free(&reqs[i]); } xbt_free(reqs); - xbt_mutex_acquire(win->mut); - std::vector *reqqs = win->requests; + xbt_mutex_acquire(mut_); + std::vector *reqqs = requests_; size = static_cast(reqqs->size()); XBT_DEBUG("Win_wait - Finishing %d RMA calls", size); @@ -422,9 +394,12 @@ int smpi_mpi_win_wait(MPI_Win win){ smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE); reqqs->clear(); } - xbt_mutex_release(win->mut); + xbt_mutex_release(mut_); - win->group->unuse(); - win->opened--; //we're opened for business ! + group_->unuse(); + opened_--; //we're opened for business ! return MPI_SUCCESS; } + +} +} diff --git a/src/smpi/smpi_win.hpp b/src/smpi/smpi_win.hpp new file mode 100644 index 0000000000..85ed320355 --- /dev/null +++ b/src/smpi/smpi_win.hpp @@ -0,0 +1,56 @@ +/* Copyright (c) 2010, 2013-2015. The SimGrid Team. + * All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#ifndef SMPI_WIN_HPP_INCLUDED +#define SMPI_WIN_HPP_INCLUDED + +#include "private.h" +#include + +namespace simgrid{ +namespace smpi{ + +class Win { + private : + void* base_; + MPI_Aint size_; + int disp_unit_; + int assert_; + MPI_Info info_; + MPI_Comm comm_; + std::vector *requests_; + xbt_mutex_t mut_; + msg_bar_t bar_; + MPI_Win* connected_wins_; + char* name_; + int opened_; + MPI_Group group_; + int count_; //for ordering the accs + +public: + Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm); + ~Win(); + void get_name( char* name, int* length); + void get_group( MPI_Group* group); + void set_name( char* name); + int start(MPI_Group group, int assert); + int post(MPI_Group group, int assert); + int complete(); + int wait(); + int fence(int assert); + int put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, + MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype); + int get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, + MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype); + int accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, + MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op); +}; + + +} +} + +#endif diff --git a/tools/cmake/DefinePackages.cmake b/tools/cmake/DefinePackages.cmake index 02e408d59c..9552e15f20 100644 --- a/tools/cmake/DefinePackages.cmake +++ b/tools/cmake/DefinePackages.cmake @@ -222,7 +222,8 @@ set(SMPI_SRC src/smpi/smpi_mpi_dt.cpp src/smpi/smpi_pmpi.cpp src/smpi/smpi_replay.cpp - src/smpi/smpi_rma.cpp + src/smpi/smpi_win.cpp + src/smpi/smpi_win.hpp src/smpi/smpi_topo.cpp src/smpi/smpi_topo.hpp src/smpi/smpi_utils.cpp