1 /* Copyright (c) 2007-2015. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
// Logging sub-category smpi_rma, declared under the smpi category; presumably the one the XBT_DEBUG calls of this
// file report to.
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
// File-level barrier handle, initially null.
// NOTE(review): none of the functions visible here reads or writes it -- confirm it is still needed.
12 msg_bar_t creation_bar = nullptr;
// Record behind an MPI_Win handle. The functions of this file also access other members through the handle
// (base, disp_unit, comm, info, name, bar, opened, group, assert).
14 typedef struct s_smpi_mpi_win{
// Heap-allocated list of the requests attached to this window: put/get/accumulate push into it (also into the
// list of the remote window) and the synchronisation calls wait on its content.
21 std::vector<MPI_Request> *requests;
// One window handle per rank of the window communicator; the entry at the local rank is this window itself.
23 MPI_Win* connected_wins;
// Creates the window record for the calling rank and links it with the windows of the other ranks of comm.
//   base      start address of the exposed memory (target displacements are resolved against it, see smpi_mpi_put)
//   size      presumably the length of the exposed memory -- its use is not shown here
//   disp_unit scaling factor applied to target displacements
//   info      optional info object, MPI_INFO_NULL is accepted
//   comm      communicator of the window
// Collective over comm: an allgather, a broadcast and a barrier are run on it.
30 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
33 int comm_size = smpi_comm_size(comm);
34 int rank=smpi_comm_rank(comm);
35 XBT_DEBUG("Creating window");
// Allocate the window record; its members are then filled in one by one.
37 win = xbt_new(s_smpi_mpi_win_t, 1);
40 win->disp_unit = disp_unit;
43 if(info!=MPI_INFO_NULL)
// No group is attached to a freshly created window.
48 win->group = MPI_GROUP_NULL;
// The request list starts empty; it is released with delete in smpi_mpi_win_free.
49 win->requests = new std::vector<MPI_Request>();
// One zero-initialised slot per rank; the local slot refers to this very window.
50 win->connected_wins = xbt_new0(MPI_Win, comm_size);
51 win->connected_wins[rank] = win;
// Barrier sized for all the ranks of comm; smpi_mpi_win_fence and smpi_mpi_win_free wait on it.
54 win->bar = MSG_barrier_init(comm_size);
// Exchange the window handles: every rank contributes its own slot and receives the slots of all ranks.
// NOTE(review): handles travel as raw bytes, which assumes that all ranks live in one address space -- confirm.
56 mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win),
// Share one barrier handle (presumably the one of root 0) between all ranks, again as raw bytes.
59 mpi_coll_bcast_fun(&(win->bar), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
// Collective barrier on comm once the exchanges above are done.
61 mpi_coll_barrier_fun(comm);
// Releases the resources attached to the window designated by win.
// Synchronises twice: first on the window barrier, then through a collective barrier on the window communicator.
66 int smpi_mpi_win_free( MPI_Win* win){
67 //As per the standard, perform a barrier to ensure every async comm is finished
68 MSG_barrier_wait((*win)->bar);
// The request list was created with new in smpi_mpi_win_create, hence delete and not xbt_free.
69 delete (*win)->requests;
// connected_wins was obtained from xbt_new0.
70 xbt_free((*win)->connected_wins);
// The name is a copy made with xbt_strdup in smpi_mpi_win_set_name; it is only released when one was set.
71 if ((*win)->name != nullptr){
72 xbt_free((*win)->name);
// Release the info object unless the window carries none.
74 if((*win)->info!=MPI_INFO_NULL){
75 MPI_Info_free(&(*win)->info);
// Collective barrier on the communicator before the barrier object is destroyed.
78 mpi_coll_barrier_fun((*win)->comm);
79 int rank=smpi_comm_rank((*win)->comm);
81 MSG_barrier_destroy((*win)->bar);
// Copies the name of the window into name and stores its length in *length.
//   name   destination buffer; it must have room for strlen(win->name)+1 bytes
//   length receives the length of the name, terminating NUL excluded
87 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
// A window without a name is special-cased.
88 if(win->name==nullptr){
93 *length = strlen(win->name);
// length+1 bytes so that the terminating NUL is copied too.
94 strncpy(name, win->name, *length+1);
// Stores in *group the group of the communicator attached to the window.
// The lookup is only done when the window communicator is not MPI_COMM_NULL.
97 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
98 if(win->comm != MPI_COMM_NULL){
99 *group = smpi_comm_group(win->comm);
// Attaches a name to the window. The string is duplicated, so the caller keeps ownership of name;
// the copy is released in smpi_mpi_win_free.
// NOTE(review): the previous win->name is overwritten without being freed, so naming a window twice looks like
// a leak of the first copy -- confirm and release it before the assignment if so.
103 void smpi_mpi_win_set_name(MPI_Win win, char* name){
104 win->name = xbt_strdup(name);
// Fence synchronisation on win: waits for the requests attached to the window, between waits on the window barrier.
//   assert assertion value given by the caller; it is also recorded in win->assert
//   win    window to synchronise
107 int smpi_mpi_win_fence( int assert, MPI_Win win){
108 XBT_DEBUG("Entering fence");
// The opening barrier is skipped when the caller passes MPI_MODE_NOPRECEDE.
// NOTE(review): assert is compared for equality; MPI allows assertion values to be OR-ed together, so a
// combined value containing MPI_MODE_NOPRECEDE would still take the barrier -- confirm this is intended.
111 if(assert != MPI_MODE_NOPRECEDE){
112 MSG_barrier_wait(win->bar);
114 std::vector<MPI_Request> *reqs = win->requests;
115 int size = static_cast<int>(reqs->size());
116 // start all requests that have been prepared by another process
117 for(auto req: *reqs){
118 if (req && (req->flags & PREPARED))
// The vector storage is handed to waitall as a plain array of requests.
122 MPI_Request* treqs = &(*reqs)[0];
123 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
// Remember the assertion value of this fence on the window.
125 win->assert = assert;
// Closing barrier shared by all the ranks of the window.
127 MSG_barrier_wait(win->bar);
128 XBT_DEBUG("Leaving fence ");
// One-sided write: transfers origin_count elements of origin_datatype from origin_addr into the window of
// target_rank, at displacement target_disp, where they are received as target_count elements of target_datatype.
// For a remote target the transfer is expressed as a send/receive pair of requests using tag SMPI_RMA_TAG+1.
133 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
134 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
136 if(win->opened==0)//check that post/start has been done
138 //get receiver pointer
139 MPI_Win recv_win = win->connected_wins[target_rank];
// Destination address = base of the target window + displacement scaled by the disp_unit of that window.
141 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
142 XBT_DEBUG("Entering MPI_Put to %d", target_rank);
// The request pair is only built when the target is another rank.
144 if(target_rank != smpi_comm_rank(win->comm)){
145 //prepare send_request
146 MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
147 smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, win->comm, MPI_OP_NULL);
149 //prepare receiver request
150 MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
151 smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
153 //push request to receiver's win
154 recv_win->requests->push_back(rreq);
// Only the send side is started here; the receive request was queued on the target window above.
157 smpi_mpi_start(sreq);
159 //push request to sender's win
160 win->requests->push_back(sreq);
// Direct datatype-aware copy from the origin buffer to the destination address, without any request
// (presumably the path taken when the target is the calling rank).
162 smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
// One-sided read: fetches target_count elements of target_datatype from the window of target_rank, at
// displacement target_disp, into origin_addr as origin_count elements of origin_datatype.
// For a remote target the transfer is a send request created on behalf of the target plus a local receive
// request, both using tag SMPI_RMA_TAG+2.
168 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
169 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
171 if(win->opened==0)//check that post/start has been done
// Window of the rank that owns the data.
174 MPI_Win send_win = win->connected_wins[target_rank];
// Source address = base of the target window + displacement scaled by the disp_unit of that window.
176 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base) + target_disp * send_win->disp_unit);
177 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
// The request pair is only built when the target is another rank.
179 if(target_rank != smpi_comm_rank(win->comm)){
180 //prepare send_request
181 MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
182 smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm,
185 //prepare receiver request
186 MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
187 smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), SMPI_RMA_TAG+2, win->comm,
190 //start the send, with another process than us as sender.
191 smpi_mpi_start(sreq);
193 //queue the send request on the window of the target, i.e. the rank that owns the data
194 send_win->requests->push_back(sreq);
// The receive side runs on the calling rank and is started right away.
197 smpi_mpi_start(rreq);
199 //queue the receive request on our own window, i.e. the origin of the get
200 win->requests->push_back(rreq);
// Direct datatype-aware copy from the window memory to the origin buffer, without any request
// (presumably the path taken when the target is the calling rank).
202 smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
// One-sided accumulate: sends origin_count elements of origin_datatype to the window of target_rank, at
// displacement target_disp; op is attached to both the send and the receive request.
// Requests use tag SMPI_RMA_TAG+3. Unlike smpi_mpi_put, the visible code has no shortcut for a target that is
// the calling rank (see the FIXME below).
209 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
210 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
212 if(win->opened==0)//check that post/start has been done
214 //FIXME: local version
215 //get receiver pointer
216 MPI_Win recv_win = win->connected_wins[target_rank];
// Destination address = base of the target window + displacement scaled by the disp_unit of that window.
218 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
219 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
221 //prepare send_request
222 MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
223 smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3, win->comm, op);
225 //prepare receiver request
226 MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
227 smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3, recv_win->comm, op);
228 //push request to receiver's win
229 recv_win->requests->push_back(rreq);
// Only the send side is started here; the receive request stays queued on the target window.
231 smpi_mpi_start(sreq);
233 //push request to sender's win
234 win->requests->push_back(sreq);
// Origin side of the start/post handshake: blocks until a zero-byte message tagged SMPI_RMA_TAG+4 has been
// received from the other processes of group, then marks the window as opened.
//   group  processes to synchronise with
//   assert assertion value given by the caller
//   win    window to open
239 int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
240 /* From MPI forum advices
241 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
242 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
243 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
244 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
245 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
246 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
247 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
248 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
249 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
250 must complete, without further dependencies. */
252 //naive, blocking implementation.
// One zero-initialised request slot per member of the group.
254 int size = smpi_group_size(group);
255 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// Translate the group member into a process index; the calling process itself is left out.
258 int src=smpi_group_index(group,j);
259 if(src!=smpi_process_index()){
// Zero-byte receive used as a pure synchronisation token; it pairs with the send of smpi_mpi_win_post (same tag).
// src is compared with smpi_process_index(), which is presumably why MPI_COMM_WORLD is used here.
260 reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+4, MPI_COMM_WORLD);
// Blocking part: start every receive and wait for all of them.
266 smpi_mpi_startall(size, reqs);
267 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
269 smpi_mpi_request_free(&reqs[i]);
272 win->opened++; //we're open for business !
// Presumably takes a reference on the group; smpi_mpi_win_complete calls smpi_group_unuse on win->group.
274 smpi_group_use(group);
// Target side of the start/post handshake: sends a zero-byte message tagged SMPI_RMA_TAG+4 to the other
// processes of group, waits for these sends, then marks the window as opened.
//   group  processes allowed to access the window
//   assert assertion value given by the caller
//   win    window to expose
278 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
279 //let's make a synchronous send here
// One zero-initialised request slot per member of the group.
281 int size = smpi_group_size(group);
282 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// Translate the group member into a process index; the calling process itself is left out.
285 int dst=smpi_group_index(group,j);
286 if(dst!=smpi_process_index()){
// Zero-byte send used as a pure synchronisation token; it pairs with the receive of smpi_mpi_win_start (same tag).
287 reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
// Start every send and wait for all of them.
294 smpi_mpi_startall(size, reqs);
295 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
297 smpi_mpi_request_free(&reqs[i]);
300 win->opened++; //we're open for business !
// Presumably takes a reference on the group; smpi_mpi_win_wait calls smpi_group_unuse on win->group.
302 smpi_group_use(group);
// Origin side of the complete/wait handshake: sends a zero-byte message tagged SMPI_RMA_TAG+5 to the other
// processes of win->group, then waits for the requests attached to the window and closes it.
306 int smpi_mpi_win_complete(MPI_Win win){
308 xbt_die("Complete called on already opened MPI_Win");
310 XBT_DEBUG("Entering MPI_Win_Complete");
// One zero-initialised request slot per member of the group attached to the window.
312 int size = smpi_group_size(win->group);
313 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// Translate the group member into a process index; the calling process itself is left out.
316 int dst=smpi_group_index(win->group,j);
317 if(dst!=smpi_process_index()){
// Zero-byte send used as a pure synchronisation token; it pairs with the receive of smpi_mpi_win_wait (same tag).
318 reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
324 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
325 smpi_mpi_startall(size, reqs);
326 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
329 smpi_mpi_request_free(&reqs[i]);
333 //now we can finish RMA calls
// size is reused: from here on it is the number of requests attached to the window.
335 std::vector<MPI_Request> *reqqs = win->requests;
336 size = static_cast<int>(reqqs->size());
338 XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
339 // start all requests that have been prepared by another process
340 for (auto req: *reqqs){
341 if (req && (req->flags & PREPARED))
// The vector storage is handed to waitall as a plain array of requests.
345 MPI_Request* treqs = &(*reqqs)[0];
346 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
// Counterpart of the smpi_group_use call made in smpi_mpi_win_start.
348 smpi_group_unuse(win->group);
349 win->opened--; //we're closed for business !
// Target side of the complete/wait handshake: waits for a zero-byte message tagged SMPI_RMA_TAG+5 from the
// other processes of win->group, then waits for the requests attached to the window and closes it.
353 int smpi_mpi_win_wait(MPI_Win win){
354 //naive, blocking implementation.
355 XBT_DEBUG("Entering MPI_Win_Wait");
// One zero-initialised request slot per member of the group attached to the window.
357 int size = smpi_group_size(win->group);
358 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// Translate the group member into a process index; the calling process itself is left out.
361 int src=smpi_group_index(win->group,j);
362 if(src!=smpi_process_index()){
// Zero-byte receive used as a pure synchronisation token; it pairs with the send of smpi_mpi_win_complete.
363 reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
369 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
370 smpi_mpi_startall(size, reqs);
371 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
373 smpi_mpi_request_free(&reqs[i]);
// size is reused: from here on it is the number of requests attached to the window.
377 std::vector<MPI_Request> *reqqs = win->requests;
378 size = static_cast<int>(reqqs->size());
// NOTE(review): the message below is prefixed Win_complete although this is smpi_mpi_win_wait; it looks like
// a copy/paste leftover -- confirm before changing the log text.
380 XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
382 // start all requests that have been prepared by another process
383 for(auto req: *reqqs){
384 if (req && (req->flags & PREPARED))
// The vector storage is handed to waitall as a plain array of requests.
388 MPI_Request* treqs = &(*reqqs)[0];
389 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
// Counterpart of the smpi_group_use call made in smpi_mpi_win_post.
391 smpi_group_unuse(win->group);
392 win->opened--; //we're closed for business !