/* Copyright (c) 2007-2015. The SimGrid Team.
 * All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
14 xbt_bar_t creation_bar = NULL;
16 typedef struct s_smpi_mpi_win{
25 MPI_Win* connected_wins;
32 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
35 int comm_size = smpi_comm_size(comm);
36 int rank=smpi_comm_rank(comm);
37 XBT_DEBUG("Creating window");
39 win = xbt_new(s_smpi_mpi_win_t, 1);
42 win->disp_unit = disp_unit;
45 if(info!=MPI_INFO_NULL)
50 win->group = MPI_GROUP_NULL;
51 win->requests = xbt_dynar_new(sizeof(MPI_Request), NULL);
52 win->connected_wins = xbt_new0(MPI_Win, comm_size);
53 win->connected_wins[rank] = win;
56 win->bar=xbt_barrier_init(comm_size);
58 mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win),
61 mpi_coll_bcast_fun( &(win->bar), sizeof(xbt_bar_t), MPI_BYTE, 0, comm);
63 mpi_coll_barrier_fun(comm);
68 int smpi_mpi_win_free( MPI_Win* win){
69 //As per the standard, perform a barrier to ensure every async comm is finished
70 xbt_barrier_wait((*win)->bar);
71 xbt_dynar_free(&(*win)->requests);
72 xbt_free((*win)->connected_wins);
73 if ((*win)->name != NULL){
74 xbt_free((*win)->name);
76 if((*win)->info!=MPI_INFO_NULL){
77 MPI_Info_free(&(*win)->info);
80 mpi_coll_barrier_fun((*win)->comm);
81 int rank=smpi_comm_rank((*win)->comm);
83 xbt_barrier_destroy((*win)->bar);
89 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
95 *length = strlen(win->name);
96 strcpy(name, win->name);
99 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
100 if(win->comm != MPI_COMM_NULL){
101 *group = smpi_comm_group(win->comm);
105 void smpi_mpi_win_set_name(MPI_Win win, char* name){
106 win->name = xbt_strdup(name);;
109 int smpi_mpi_win_fence( int assert, MPI_Win win){
110 XBT_DEBUG("Entering fence");
113 if(assert != MPI_MODE_NOPRECEDE){
114 xbt_barrier_wait(win->bar);
116 xbt_dynar_t reqs = win->requests;
117 int size = xbt_dynar_length(reqs);
120 // start all requests that have been prepared by another process
121 xbt_dynar_foreach(reqs, cpt, req){
122 if (req->flags & PREPARED) smpi_mpi_start(req);
125 MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqs));
126 win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
127 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
131 win->assert = assert;
133 xbt_barrier_wait(win->bar);
134 XBT_DEBUG("Leaving fence ");
139 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
140 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
142 if(!win->opened)//check that post/start has been done
144 //get receiver pointer
145 MPI_Win recv_win = win->connected_wins[target_rank];
147 void* recv_addr = (void*) ( ((char*)recv_win->base) + target_disp * recv_win->disp_unit);
148 XBT_DEBUG("Entering MPI_Put to %d", target_rank);
150 if(target_rank != smpi_comm_rank(win->comm)){
151 //prepare send_request
152 MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(),
153 smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, win->comm, MPI_OP_NULL);
155 //prepare receiver request
156 MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(),
157 smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
159 //push request to receiver's win
160 xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq);
163 smpi_mpi_start(sreq);
165 //push request to sender's win
166 xbt_dynar_push_as(win->requests, MPI_Request, sreq);
168 smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
174 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
175 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
177 if(!win->opened)//check that post/start has been done
180 MPI_Win send_win = win->connected_wins[target_rank];
182 void* send_addr = (void*)( ((char*)send_win->base) + target_disp * send_win->disp_unit);
183 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
185 if(target_rank != smpi_comm_rank(win->comm)){
186 //prepare send_request
187 MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
188 smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, send_win->comm,
191 //prepare receiver request
192 MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
193 smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, win->comm,
196 //start the send, with another process than us as sender.
197 smpi_mpi_start(sreq);
199 //push request to receiver's win
200 xbt_dynar_push_as(send_win->requests, MPI_Request, sreq);
203 smpi_mpi_start(rreq);
205 //push request to sender's win
206 xbt_dynar_push_as(win->requests, MPI_Request, rreq);
208 smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
215 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
216 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
218 if(!win->opened)//check that post/start has been done
220 //FIXME: local version
221 //get receiver pointer
222 MPI_Win recv_win = win->connected_wins[target_rank];
224 void* recv_addr = (void*)( ((char*)recv_win->base) + target_disp * recv_win->disp_unit);
225 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
227 //prepare send_request
228 MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
229 smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, win->comm, op);
231 //prepare receiver request
232 MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
233 smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, recv_win->comm, op);
234 //push request to receiver's win
235 xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq);
237 smpi_mpi_start(sreq);
239 //push request to sender's win
240 xbt_dynar_push_as(win->requests, MPI_Request, sreq);
245 int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
246 /* From MPI forum advices
247 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
248 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
249 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
250 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
251 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
252 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
253 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
254 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
255 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
256 must complete, without further dependencies. */
258 //naive, blocking implementation.
260 int size = smpi_group_size(group);
261 MPI_Request* reqs = xbt_new0(MPI_Request, size);
263 // for(i=0;i<size;i++){
265 int src=smpi_group_index(group,j);
266 if(src!=smpi_process_index()){
267 reqs[i]=smpi_irecv_init(NULL, 0, MPI_CHAR, src,RMA_TAG+4, MPI_COMM_WORLD);
273 smpi_mpi_startall(size, reqs);
274 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
276 smpi_mpi_request_free(&reqs[i]);
279 win->opened++; //we're open for business !
281 smpi_group_use(group);
285 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
286 //let's make a synchronous send here
288 int size = smpi_group_size(group);
289 MPI_Request* reqs = xbt_new0(MPI_Request, size);
292 int dst=smpi_group_index(group,j);
293 if(dst!=smpi_process_index()){
294 reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst, RMA_TAG+4, MPI_COMM_WORLD);
301 smpi_mpi_startall(size, reqs);
302 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
304 smpi_mpi_request_free(&reqs[i]);
307 win->opened++; //we're open for business !
309 smpi_group_use(group);
313 int smpi_mpi_win_complete(MPI_Win win){
315 xbt_die("Complete called on already opened MPI_Win");
316 // xbt_barrier_wait(win->bar);
317 //MPI_Comm comm = smpi_comm_new(win->group, NULL);
318 //mpi_coll_barrier_fun(comm);
319 //smpi_comm_destroy(comm);
321 XBT_DEBUG("Entering MPI_Win_Complete");
323 int size = smpi_group_size(win->group);
324 MPI_Request* reqs = xbt_new0(MPI_Request, size);
327 int dst=smpi_group_index(win->group,j);
328 if(dst!=smpi_process_index()){
329 reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst, RMA_TAG+5, MPI_COMM_WORLD);
335 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
336 smpi_mpi_startall(size, reqs);
337 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
340 smpi_mpi_request_free(&reqs[i]);
344 //now we can finish RMA calls
346 xbt_dynar_t reqqs = win->requests;
347 size = xbt_dynar_length(reqqs);
349 XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
352 // start all requests that have been prepared by another process
353 xbt_dynar_foreach(reqqs, cpt, req){
354 if (req->flags & PREPARED) smpi_mpi_start(req);
357 MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqqs));
358 win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
359 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
361 smpi_group_unuse(win->group);
362 win->opened--; //we're closed for business !
366 int smpi_mpi_win_wait(MPI_Win win){
367 // xbt_barrier_wait(win->bar);
368 //MPI_Comm comm = smpi_comm_new(win->group, NULL);
369 //mpi_coll_barrier_fun(comm);
370 //smpi_comm_destroy(comm);
371 //naive, blocking implementation.
372 XBT_DEBUG("Entering MPI_Win_Wait");
374 int size = smpi_group_size(win->group);
375 MPI_Request* reqs = xbt_new0(MPI_Request, size);
377 // for(i=0;i<size;i++){
379 int src=smpi_group_index(win->group,j);
380 if(src!=smpi_process_index()){
381 reqs[i]=smpi_irecv_init(NULL, 0, MPI_CHAR, src,RMA_TAG+5, MPI_COMM_WORLD);
387 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
388 smpi_mpi_startall(size, reqs);
389 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
391 smpi_mpi_request_free(&reqs[i]);
395 xbt_dynar_t reqqs = win->requests;
396 size = xbt_dynar_length(reqqs);
398 XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
402 // start all requests that have been prepared by another process
403 xbt_dynar_foreach(reqqs, cpt, req){
404 if (req->flags & PREPARED) smpi_mpi_start(req);
407 MPI_Request* treqs = static_cast<MPI_Request*>(xbt_dynar_to_array(reqqs));
408 win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
409 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
411 smpi_group_unuse(win->group);
412 win->opened--; //we're opened for business !