2 /* Copyright (c) 2007-2015. The SimGrid Team.
3 * All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
/* Dedicated logging channel for SMPI one-sided (RMA) operations. */
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");

/* NOTE(review): this global barrier is never referenced in the visible code
 * (each window carries its own `bar`); presumably a leftover — confirm before removing. */
14 xbt_bar_t creation_bar = NULL;
/* Internal representation of an MPI window (MPI_Win).
 * NOTE(review): most fields are elided in this excerpt; the functions below
 * also access base, disp_unit, info, comm, group, name, requests, bar,
 * opened and assert members of this struct. */
16 typedef struct s_smpi_mpi_win{
/* One slot per rank of the window's communicator: every process's MPI_Win
 * handle, exchanged via allgather at creation time so one-sided calls can
 * reach the target window's metadata and request queue directly. */
25 MPI_Win* connected_wins;
/* Create an MPI window exposing the local buffer [base, base+size) with the
 * given displacement unit. Collective over `comm`: all ranks exchange their
 * window handles and share rank 0's barrier before returning.
 * NOTE(review): several lines are elided in this excerpt (declaration of
 * `win`, assignment of base/size/comm/name fields, the return statement). */
32 MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){
36 int comm_size = smpi_comm_size(comm);
37 int rank=smpi_comm_rank(comm);
38 XBT_DEBUG("Creating window");
40 win = xbt_new(s_smpi_mpi_win_t, 1);
43 win->disp_unit = disp_unit;
/* NOTE(review): the branch body (presumably an info refcount bump) is elided here. */
46 if(info!=MPI_INFO_NULL)
51 win->group = MPI_GROUP_NULL;
/* Queue of pending RMA requests targeting this window; drained at each synchronization. */
52 win->requests = xbt_dynar_new(sizeof(MPI_Request), NULL);
/* Zero-initialized table of all ranks' window handles; we fill in our own slot
 * and let the allgather below populate the rest. */
53 win->connected_wins = xbt_malloc0(comm_size*sizeof(MPI_Win));
54 win->connected_wins[rank] = win;
/* NOTE(review): every rank creates a barrier here, but the bcast below
 * presumably overwrites it with rank 0's — the non-root barriers look leaked; confirm. */
57 win->bar=xbt_barrier_init(comm_size);
/* Exchange window handles so connected_wins[] holds every rank's MPI_Win
 * (remaining call arguments are elided in this excerpt). */
60 mpi_coll_allgather_fun(&(win->connected_wins[rank]),
/* Share a single barrier object across all ranks of the window
 * (remaining call arguments are elided in this excerpt). */
68 mpi_coll_bcast_fun( &(win->bar),
74 mpi_coll_barrier_fun(comm);
/* Free an MPI window: synchronize with all peers, then release the request
 * queue, the connected-windows table, the optional name and info object.
 * NOTE(review): the final free of *win itself and the return statement are
 * elided in this excerpt. */
79 int smpi_mpi_win_free( MPI_Win* win){
81 //As per the standard, perform a barrier to ensure every async comm is finished
82 xbt_barrier_wait((*win)->bar);
83 xbt_dynar_free(&(*win)->requests);
84 xbt_free((*win)->connected_wins);
85 if ((*win)->name != NULL){
86 xbt_free((*win)->name);
88 if((*win)->info!=MPI_INFO_NULL){
89 MPI_Info_free(&(*win)->info);
/* Copy the window's name into `name` and store its length in `*length`.
 * The visible code dereferences win->name unconditionally; a NULL-name guard
 * is presumably in the elided lines above these — confirm.
 * NOTE(review): strcpy assumes the caller provides a buffer of at least
 * MPI_MAX_OBJECT_NAME bytes, per the MPI convention for MPI_Win_get_name. */
96 void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){
102 *length = strlen(win->name);
103 strcpy(name, win->name);
/* Return (in *group) the group of the communicator the window was built on,
 * taking a reference on it for the caller.
 * NOTE(review): the else branch (win->comm == MPI_COMM_NULL) is elided in
 * this excerpt; *group is presumably set to a null/empty group there. */
106 void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){
107 if(win->comm != MPI_COMM_NULL){
108 *group = smpi_comm_group(win->comm);
/* Bump the group's refcount — the caller owns the returned reference. */
109 smpi_group_use(*group);
/* Set the window's name to a private copy of `name`.
 * NOTE(review): stray double semicolon below; also, a previously-set
 * win->name is overwritten without being freed — potential leak if
 * MPI_Win_set_name is called twice. */
113 void smpi_mpi_win_set_name(MPI_Win win, char* name){
114 win->name = xbt_strdup(name);;
/* Fence synchronization: complete all RMA operations queued on this window.
 * Unless the caller asserts MPI_MODE_NOPRECEDE, first barrier with peers,
 * then start any requests peers prepared on our queue and wait for all of
 * them; finally barrier again so no rank leaves before completion.
 * NOTE(review): closing braces of the branch/loop, the declarations of
 * cpt/req, and the return statement are elided in this excerpt. */
118 int smpi_mpi_win_fence( int assert, MPI_Win win){
120 XBT_DEBUG("Entering fence");
/* NOPRECEDE promises no outstanding local RMA to flush, so skip the drain. */
123 if(assert != MPI_MODE_NOPRECEDE){
124 xbt_barrier_wait(win->bar);
126 xbt_dynar_t reqs = win->requests;
127 int size = xbt_dynar_length(reqs);
130 // start all requests that have been prepared by another process
131 xbt_dynar_foreach(reqs, cpt, req){
132 if (req->flags & PREPARED) smpi_mpi_start(req);
/* NOTE(review): xbt_dynar_to_array frees the dynar but mallocs `treqs`,
 * which is never freed in the visible code — looks like a leak; confirm. */
135 MPI_Request* treqs = xbt_dynar_to_array(reqs);
136 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
/* Fresh queue for the next epoch (old dynar was consumed by to_array). */
138 win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
141 win->assert = assert;
143 xbt_barrier_wait(win->bar);
144 XBT_DEBUG("Leaving fence ");
/* One-sided put: transfer origin data into the target rank's window at
 * target_disp (scaled by the target's disp_unit). For a remote target the
 * transfer is simulated with a prepared send/recv pair: the send starts now,
 * the recv is queued on the target window and started at its next
 * synchronization. A local put degenerates to a datatype copy.
 * NOTE(review): the elided lines presumably include the error return when
 * the window is not opened, closing braces, and the success return. */
149 int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
150 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
153 if(!win->opened)//check that post/start has been done
155 //get receiver pointer
156 MPI_Win recv_win = win->connected_wins[target_rank];
/* Byte address inside the target window: base + disp * disp_unit (target's unit). */
158 void* recv_addr = (void*) ( ((char*)recv_win->base) + target_disp * recv_win->disp_unit);
/* Take references on both datatypes for the lifetime of the operation. */
159 smpi_datatype_use(origin_datatype);
160 smpi_datatype_use(target_datatype);
161 XBT_DEBUG("Entering MPI_Put to %d", target_rank);
163 if(target_rank != smpi_comm_rank(win->comm)){
164 //prepare send_request
/* Sender side uses our communicator; tag RMA_TAG+1 pairs put sends with put recvs. */
165 MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
166 smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, win->comm, MPI_OP_NULL);
168 //prepare receiver request
/* Receiver side is created on the target's communicator (recv_win->comm). */
169 MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
170 smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, recv_win->comm, MPI_OP_NULL);
172 //push request to receiver's win
/* The target will start this PREPARED recv at its next fence/complete/wait. */
173 xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq);
176 smpi_mpi_start(sreq);
178 //push request to sender's win
179 xbt_dynar_push_as(win->requests, MPI_Request, sreq);
/* Local target: no messaging needed, copy directly between datatypes. */
181 smpi_datatype_copy(origin_addr, origin_count, origin_datatype,
182 recv_addr, target_count, target_datatype);
/* One-sided get: fetch data from the target rank's window at target_disp
 * into origin_addr. Mirror of smpi_mpi_put: the send is created on the
 * target's communicator (data flows target -> origin), queued as PREPARED
 * on the target window, while our recv starts immediately. A local get is
 * a direct datatype copy.
 * NOTE(review): elided lines presumably include the not-opened error return,
 * closing braces, and the success return. */
188 int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
189 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
191 if(!win->opened)//check that post/start has been done
194 MPI_Win send_win = win->connected_wins[target_rank];
/* Byte address of the requested data inside the target window. */
196 void* send_addr = (void*)( ((char*)send_win->base) + target_disp * send_win->disp_unit);
197 smpi_datatype_use(origin_datatype);
198 smpi_datatype_use(target_datatype);
199 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
201 if(target_rank != smpi_comm_rank(win->comm)){
202 //prepare send_request
/* Source of the message is the target rank; tag RMA_TAG+2 pairs get traffic. */
203 MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype,
204 smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, send_win->comm, MPI_OP_NULL);
206 //prepare receiver request
207 MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype,
208 smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, win->comm, MPI_OP_NULL);
210 //start the send, with another process than us as sender.
211 smpi_mpi_start(sreq);
213 //push request to receiver's win
214 xbt_dynar_push_as(send_win->requests, MPI_Request, sreq);
/* Our receive can start right away; it completes at our next synchronization. */
217 smpi_mpi_start(rreq);
219 //push request to sender's win
220 xbt_dynar_push_as(win->requests, MPI_Request, rreq);
/* Local target: copy straight out of the window buffer. */
222 smpi_datatype_copy(send_addr, target_count, target_datatype,
223 origin_addr, origin_count, origin_datatype);
/* One-sided accumulate: like smpi_mpi_put, but the receive request carries
 * the reduction operation `op` so the incoming data is combined into the
 * target buffer rather than overwriting it (tag RMA_TAG+3).
 * NOTE(review): unlike put/get there is no visible local-rank shortcut (the
 * FIXME below confirms it); elided lines presumably include the not-opened
 * error return, closing braces, and the success return. */
230 int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
231 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)
233 if(!win->opened)//check that post/start has been done
235 //FIXME: local version
236 //get receiver pointer
237 MPI_Win recv_win = win->connected_wins[target_rank];
239 void* recv_addr = (void*)( ((char*)recv_win->base) + target_disp * recv_win->disp_unit);
240 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
242 smpi_datatype_use(origin_datatype);
243 smpi_datatype_use(target_datatype);
246 //prepare send_request
247 MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
248 smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, win->comm, op);
250 //prepare receiver request
/* The op travels with the recv request: the reduction is applied target-side. */
251 MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
252 smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, recv_win->comm, op);
253 //push request to receiver's win
254 xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq);
256 smpi_mpi_start(sreq);
258 //push request to sender's win
259 xbt_dynar_push_as(win->requests, MPI_Request, sreq);
/* Begin an RMA access epoch (general active target synchronization).
 * Blocking implementation: receive one zero-byte sync message (tag RMA_TAG+4)
 * from every member of `group`, i.e. wait until each target has called
 * MPI_Win_post, then mark the window opened.
 * NOTE(review): the loop headers, the declarations of i/j, closing braces and
 * the return statement are elided in this excerpt; `size` appears to cover
 * only non-self ranks by the time startall/waitall run — confirm against the
 * elided bookkeeping. */
266 int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){
267 /* From MPI forum advices
268 The call to MPI_WIN_COMPLETE does not return until the put call has completed at
269 the origin; and the target window will be accessed by the put operation only
270 after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by the target
271 process. This still leaves much choice to implementors. The call to
272 MPI_WIN_START can block until the matching call to MPI_WIN_POST occurs at all
273 target processes. One can also have implementations where the call to
274 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching
275 call to MPI_WIN_POST occurred; or implementations where the first two calls are
276 nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call to
277 MPI_WIN_POST occurred; or even implementations where all three calls can
278 complete before any target process called MPI_WIN_POST --- the data put must be
279 buffered, in this last case, so as to allow the put to complete at the origin
280 ahead of its completion at the target. However, once the call to MPI_WIN_POST is
281 issued, the sequence above must complete, without further dependencies.
284 //naive, blocking implementation.
286 int size = smpi_group_size(group);
287 MPI_Request* reqs = xbt_new0(MPI_Request, size);
289 // for(i=0;i<size;i++){
291 int src=smpi_group_index(group,j);
292 if(src!=smpi_process_index()){
/* Zero-byte recv used purely as a synchronization token from each poster. */
293 reqs[i]=smpi_irecv_init(NULL, 0, MPI_CHAR, src,RMA_TAG+4, MPI_COMM_WORLD);
299 smpi_mpi_startall(size, reqs);
300 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
302 smpi_mpi_request_free(&reqs[i]);
/* `opened` is a counter, not a flag: put/get/accumulate only check non-zero. */
305 win->opened++; //we're open for business !
/* Keep a reference on the access group for the matching complete/wait. */
307 smpi_group_use(group);
/* Begin an RMA exposure epoch: send one zero-byte sync message (tag
 * RMA_TAG+4) to every member of `group`, unblocking their MPI_Win_start,
 * then mark the window opened.
 * NOTE(review): loop headers, declarations of i/j, closing braces, window
 * group assignment and the return statement are elided in this excerpt. */
311 int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){
312 //let's make a synchronous send here
314 int size = smpi_group_size(group);
315 MPI_Request* reqs = xbt_new0(MPI_Request, size);
318 int dst=smpi_group_index(group,j);
319 if(dst!=smpi_process_index()){
/* Zero-byte send: pure synchronization token matching win_start's recv. */
320 reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst,
321 RMA_TAG+4, MPI_COMM_WORLD);
328 smpi_mpi_startall(size, reqs);
329 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
331 smpi_mpi_request_free(&reqs[i]);
334 win->opened++; //we're open for business !
/* Keep a reference on the exposure group for the matching win_wait. */
336 smpi_group_use(group);
/* End the access epoch opened by MPI_Win_start: notify every target in the
 * window's group with a zero-byte message (tag RMA_TAG+5), then start and
 * wait on all RMA requests queued on this window, and decrement `opened`.
 * NOTE(review): the guard condition for the xbt_die below is elided (it
 * presumably fires when the window was never opened, despite the message
 * wording "already opened" — confirm); loop headers, declarations of
 * i/j/cpt/req, closing braces and the return statement are also elided. */
340 int smpi_mpi_win_complete(MPI_Win win){
342 xbt_die("Complete called on already opened MPI_Win");
343 // xbt_barrier_wait(win->bar);
344 //MPI_Comm comm = smpi_comm_new(win->group, NULL);
345 //mpi_coll_barrier_fun(comm);
346 //smpi_comm_destroy(comm);
348 XBT_DEBUG("Entering MPI_Win_Complete");
350 int size = smpi_group_size(win->group);
351 MPI_Request* reqs = xbt_new0(MPI_Request, size);
354 int dst=smpi_group_index(win->group,j);
355 if(dst!=smpi_process_index()){
/* Completion token matching the recv posted by the target's MPI_Win_wait. */
356 reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst,
357 RMA_TAG+5, MPI_COMM_WORLD);
363 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
364 smpi_mpi_startall(size, reqs);
365 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
368 smpi_mpi_request_free(&reqs[i]);
372 //now we can finish RMA calls
374 xbt_dynar_t reqqs = win->requests;
375 size = xbt_dynar_length(reqqs);
377 XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
380 // start all requests that have been prepared by another process
381 xbt_dynar_foreach(reqqs, cpt, req){
382 if (req->flags & PREPARED) smpi_mpi_start(req);
/* NOTE(review): `treqs` from xbt_dynar_to_array is never freed in the
 * visible code — same apparent leak as in smpi_mpi_win_fence; confirm. */
385 MPI_Request* treqs = xbt_dynar_to_array(reqqs);
386 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
/* Fresh queue for the next epoch (old dynar consumed by to_array). */
388 win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
389 win->opened--; //we're closed for business !
395 int smpi_mpi_win_wait(MPI_Win win){
396 // xbt_barrier_wait(win->bar);
397 //MPI_Comm comm = smpi_comm_new(win->group, NULL);
398 //mpi_coll_barrier_fun(comm);
399 //smpi_comm_destroy(comm);
400 //naive, blocking implementation.
401 XBT_DEBUG("Entering MPI_Win_Wait");
403 int size = smpi_group_size(win->group);
404 MPI_Request* reqs = xbt_new0(MPI_Request, size);
406 // for(i=0;i<size;i++){
408 int src=smpi_group_index(win->group,j);
409 if(src!=smpi_process_index()){
410 reqs[i]=smpi_irecv_init(NULL, 0, MPI_CHAR, src,RMA_TAG+5, MPI_COMM_WORLD);
416 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
417 smpi_mpi_startall(size, reqs);
418 smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE);
420 smpi_mpi_request_free(&reqs[i]);
424 xbt_dynar_t reqqs = win->requests;
425 size = xbt_dynar_length(reqqs);
427 XBT_DEBUG("Win_complete - Finishing %d RMA calls", size);
431 // start all requests that have been prepared by another process
432 xbt_dynar_foreach(reqqs, cpt, req){
433 if (req->flags & PREPARED) smpi_mpi_start(req);
436 MPI_Request* treqs = xbt_dynar_to_array(reqqs);
437 smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
439 win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL);
440 win->opened--; //we're opened for business !