1 /* Copyright (c) 2007-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "src/smpi/private.h"
7 #include "src/smpi/smpi_coll.hpp"
8 #include "src/smpi/smpi_comm.hpp"
9 #include "src/smpi/smpi_datatype.hpp"
10 #include "src/smpi/smpi_info.hpp"
11 #include "src/smpi/smpi_keyvals.hpp"
12 #include "src/smpi/smpi_process.hpp"
13 #include "src/smpi/smpi_request.hpp"
14 #include "src/smpi/smpi_win.hpp"
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
// Storage for MPI window attribute keyvals (MPI_Win_create_keyval et al.),
// shared by all Win instances, plus the counter used to hand out keyval ids.
20 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
21 int Win::keyval_id_=0;
// Constructor: builds an RMA window over `comm`. Each process stores its own
// Win* in connected_wins_[rank_] and then allgathers the array so every peer
// can reach every other peer's window object directly (simulation shortcut).
// NOTE(review): this listing has gaps (embedded line numbers jump) — several
// statements of the constructor are not visible here; verify against the full file.
23 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
24 int comm_size = comm->size();
26 XBT_DEBUG("Creating window");
27 if(info!=MPI_INFO_NULL)
31 group_ = MPI_GROUP_NULL;
32 requests_ = new std::vector<MPI_Request>();
// Three simulated mutexes: mut_ guards requests_, lock_mut_ implements
// MPI_Win_lock exclusion, atomic_mut_ serializes atomic RMA ops.
33 mut_=xbt_mutex_init();
34 lock_mut_=xbt_mutex_init();
35 atomic_mut_=xbt_mutex_init();
36 connected_wins_ = new MPI_Win[comm_size];
37 connected_wins_[rank_] = this;
// One barrier per window, used by fence(); rank 0's barrier is broadcast below
// so all processes share the same barrier object.
40 bar_ = MSG_barrier_init(comm_size);
44 comm->add_rma_win(this);
46 Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
49 Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
// Destructor body (the signature line is not visible in this listing).
// Waits for all outstanding async RMA traffic, then tears down the window's
// resources: connected-wins array, name, info, barrier, and the three mutexes.
55 //As per the standard, perform a barrier to ensure every async comm is finished
56 MSG_barrier_wait(bar_);
58 int finished = finish_comms();
59 XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
62 delete[] connected_wins_;
63 if (name_ != nullptr){
66 if(info_!=MPI_INFO_NULL){
67 MPI_Info_free(&info_);
// Deregister from the communicator and synchronize so no peer still holds
// a pointer to this window while we destroy the shared barrier below.
70 comm_->remove_rma_win(this);
72 Colls::barrier(comm_);
73 int rank=comm_->rank();
// NOTE(review): bar_ is shared by all ranks (bcast from rank 0 in the ctor);
// presumably only one rank actually destroys it — the guard is not visible here.
75 MSG_barrier_destroy(bar_);
76 xbt_mutex_destroy(mut_);
77 xbt_mutex_destroy(lock_mut_);
78 xbt_mutex_destroy(atomic_mut_);
// Attach memory to a dynamic window (MPI_Win_attach). Only legal when the
// window base is still MPI_BOTTOM/0; the real address travels with each RMA
// call as the displacement.
86 int Win::attach (void *base, MPI_Aint size){
87 if (not(base_ == MPI_BOTTOM || base_ == 0))
89 base_=0;//actually the address will be given in the RMA calls, as being the disp.
// Detach memory from a dynamic window (MPI_Win_detach).
// NOTE(review): the body is not visible in this listing.
94 int Win::detach (void *base){
// MPI_Win_get_name: copies name_ into the caller's buffer and reports its
// length (excluding the terminator; the +1 copies the trailing '\0').
100 void Win::get_name(char* name, int* length){
106 *length = strlen(name_);
107 strncpy(name, name_, *length+1);
// MPI_Win_get_group: returns the group of the window's communicator, or
// MPI_GROUP_NULL if the communicator is gone.
110 void Win::get_group(MPI_Group* group){
111 if(comm_ != MPI_COMM_NULL){
112 *group = comm_->group();
114 *group = MPI_GROUP_NULL;
// Accessor for the window's MPI_Info object; the branch presumably
// lazily creates one when none was attached (body not visible here).
118 MPI_Info Win::info(){
119 if(info_== MPI_INFO_NULL)
// Accessor: window size in bytes (return statement not visible in this listing).
129 MPI_Aint Win::size(){
// Accessor: displacement unit used to scale target_disp in RMA calls
// (return statement not visible in this listing).
137 int Win::disp_unit(){
// MPI_Win_set_info: replaces the window's info object, releasing any
// previous one first (release call not visible in this listing).
145 void Win::set_info(MPI_Info info){
146 if(info_!= MPI_INFO_NULL)
// MPI_Win_set_name: stores a private copy of the given name.
151 void Win::set_name(char* name){
152 name_ = xbt_strdup(name);
// MPI_Win_fence: collective synchronization. Unless MPI_MODE_NOPRECEDE is
// asserted, waits at the barrier and completes every request other processes
// pushed into our requests_ vector; a trailing barrier opens the next epoch.
155 int Win::fence(int assert)
157 XBT_DEBUG("Entering fence");
160 if (assert != MPI_MODE_NOPRECEDE) {
161 // This is not the first fence => finalize what came before
162 MSG_barrier_wait(bar_);
163 xbt_mutex_acquire(mut_);
164 // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
165 // Without this, the vector could get redimensionned when another process pushes.
166 // This would result in the array used by Request::waitall() to be invalidated.
167 // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
168 std::vector<MPI_Request> *reqs = requests_;
169 int size = static_cast<int>(reqs->size());
170 // start all requests that have been prepared by another process
172 MPI_Request* treqs = &(*reqs)[0];
173 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
176 xbt_mutex_release(mut_);
179 if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
// Final barrier: all processes leave the fence together.
183 MSG_barrier_wait(bar_);
184 XBT_DEBUG("Leaving fence");
// MPI_Put implementation. Looks up the target's Win through connected_wins_,
// checks the access epoch (fence/start or an active lock by us), bounds-checks
// the target region, then either performs a local copy (self target) or
// prepares a send request on our side and a matching recv request pushed into
// the target window's requests_ vector (completed at the next sync call).
189 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
190 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
192 //get receiver pointer
193 MPI_Win recv_win = connected_wins_[target_rank];
195 if(opened_==0){//check that post/start has been done
196 // no fence or start .. lock ok ?
198 for(auto it : recv_win->lockers_)
199 if (it == comm_->rank())
// Reject writes that run past the end of the target window.
205 if(target_count*target_datatype->get_extent()>recv_win->size_)
// Target address = base + displacement scaled by the target's disp_unit.
208 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
209 XBT_DEBUG("Entering MPI_Put to %d", target_rank);
211 if(target_rank != comm_->rank()){
212 //prepare send_request
213 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
214 comm_->group()->index(target_rank), SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);
216 //prepare receiver request
217 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
218 comm_->group()->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);
// If the caller asked for a request handle (MPI_Rput), track sreq locally.
223 if(request!=nullptr){
226 xbt_mutex_acquire(mut_);
227 requests_->push_back(sreq);
228 xbt_mutex_release(mut_);
231 //push request to receiver's win
232 xbt_mutex_acquire(recv_win->mut_);
233 recv_win->requests_->push_back(rreq);
235 xbt_mutex_release(recv_win->mut_);
// Self-target: just a local datatype-aware memcpy, no requests needed.
238 Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
240 *request = MPI_REQUEST_NULL;
// MPI_Get implementation — mirror image of put(): the send request reads from
// the target window and is pushed into the target's requests_ vector, while
// the recv request into origin_addr stays on our side. Self-target is a
// direct local copy.
246 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
247 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
250 MPI_Win send_win = connected_wins_[target_rank];
252 if(opened_==0){//check that post/start has been done
253 // no fence or start .. lock ok ?
255 for(auto it : send_win->lockers_)
256 if (it == comm_->rank())
// Reject reads that run past the end of the target window.
262 if(target_count*target_datatype->get_extent()>send_win->size_)
265 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
266 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
268 if(target_rank != comm_->rank()){
269 //prepare send_request
270 MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
271 comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
274 //prepare receiver request
275 MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
276 comm_->group()->index(target_rank), smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
279 //start the send, with another process than us as sender.
281 //push request to receiver's win
282 xbt_mutex_acquire(send_win->mut_);
283 send_win->requests_->push_back(sreq);
284 xbt_mutex_release(send_win->mut_);
// If the caller asked for a request handle (MPI_Rget), track rreq locally.
289 if(request!=nullptr){
292 xbt_mutex_acquire(mut_);
293 requests_->push_back(rreq);
294 xbt_mutex_release(mut_);
// Self-target: local datatype-aware copy from the window into origin_addr.
298 Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
300 *request=MPI_REQUEST_NULL;
// MPI_Accumulate implementation: like put(), but the requests carry the
// reduction op, and the tag encodes an operation counter (SMPI_RMA_TAG-3-count_)
// so accumulates on the same target apply in issue order.
307 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
308 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
311 //get receiver pointer
312 MPI_Win recv_win = connected_wins_[target_rank];
314 if(opened_==0){//check that post/start has been done
315 // no fence or start .. lock ok ?
317 for(auto it : recv_win->lockers_)
318 if (it == comm_->rank())
323 //FIXME: local version
// Reject updates that run past the end of the target window.
325 if(target_count*target_datatype->get_extent()>recv_win->size_)
328 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
329 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
330 //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
331 //prepare send_request
333 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
334 smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, comm_, op);
336 //prepare receiver request
337 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
338 smpi_process()->index(), comm_->group()->index(target_rank), SMPI_RMA_TAG-3-count_, recv_win->comm_, op);
344 //push request to receiver's win
345 xbt_mutex_acquire(recv_win->mut_);
346 recv_win->requests_->push_back(rreq);
348 xbt_mutex_release(recv_win->mut_);
// If the caller asked for a request handle (MPI_Raccumulate), track sreq locally.
350 if(request!=nullptr){
353 xbt_mutex_acquire(mut_);
354 requests_->push_back(sreq);
355 xbt_mutex_release(mut_);
// MPI_Get_accumulate implementation: fetch-then-accumulate, serialized by the
// target window's atomic_mut_ so the read and the update form one atomic step.
// Each sub-operation is waited on immediately, which the comment below notes
// is a (slow) way to guarantee ordering.
361 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr,
362 int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count,
363 MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request){
366 MPI_Win send_win = connected_wins_[target_rank];
368 if(opened_==0){//check that post/start has been done
369 // no fence or start .. lock ok ?
371 for(auto it : send_win->lockers_)
372 if (it == comm_->rank())
// Reject accesses that run past the end of the target window.
378 if(target_count*target_datatype->get_extent()>send_win->size_)
381 XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
382 //need to be sure ops are correctly ordered, so finish request here ? slow.
384 xbt_mutex_acquire(send_win->atomic_mut_);
385 get(result_addr, result_count, result_datatype, target_rank,
386 target_disp, target_count, target_datatype, &req);
387 if (req != MPI_REQUEST_NULL)
388 Request::wait(&req, MPI_STATUS_IGNORE);
// NOTE(review): with op == MPI_NO_OP the standard says only the fetch happens;
// the guard for that case is not visible in this listing.
390 accumulate(origin_addr, origin_count, origin_datatype, target_rank,
391 target_disp, target_count, target_datatype, op, &req);
392 if (req != MPI_REQUEST_NULL)
393 Request::wait(&req, MPI_STATUS_IGNORE);
394 xbt_mutex_release(send_win->atomic_mut_);
// MPI_Compare_and_swap implementation: under the target's atomic_mut_, fetch
// the current target value into result_addr; if it equals *compare_addr,
// write *origin_addr back with put(). memcmp over the datatype extent does
// the comparison.
399 int Win::compare_and_swap(void *origin_addr, void *compare_addr,
400 void *result_addr, MPI_Datatype datatype, int target_rank,
401 MPI_Aint target_disp){
403 MPI_Win send_win = connected_wins_[target_rank];
405 if(opened_==0){//check that post/start has been done
406 // no fence or start .. lock ok ?
408 for(auto it : send_win->lockers_)
409 if (it == comm_->rank())
415 XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
417 xbt_mutex_acquire(send_win->atomic_mut_);
418 get(result_addr, 1, datatype, target_rank,
419 target_disp, 1, datatype, &req);
420 if (req != MPI_REQUEST_NULL)
421 Request::wait(&req, MPI_STATUS_IGNORE);
422 if(! memcmp (result_addr, compare_addr, datatype->get_extent() )){
423 put(origin_addr, 1, datatype, target_rank,
424 target_disp, 1, datatype);
426 xbt_mutex_release(send_win->atomic_mut_);
// MPI_Win_start: opens an access epoch toward `group`. Naive blocking scheme:
// post a zero-byte irecv to every other member of the group and wait for the
// matching sends issued by their MPI_Win_post (tag SMPI_RMA_TAG+4), then mark
// the window opened.
430 int Win::start(MPI_Group group, int assert){
431 /* From MPI forum advices
432 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
433 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
434 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
435 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
436 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
437 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
438 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
439 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
440 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
441 must complete, without further dependencies. */
443 //naive, blocking implementation.
446 int size = group->size();
447 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// One handshake recv per remote group member; self and undefined ranks skipped.
450 int src = group->index(j);
451 if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
452 reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
458 Request::startall(size, reqs);
459 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
461 Request::unref(&reqs[i]);
464 opened_++; //we're open for business !
// MPI_Win_post: opens an exposure epoch toward `group`. Counterpart of
// start(): sends the zero-byte handshake (tag SMPI_RMA_TAG+4) to every other
// group member, waits for all sends, then marks the window opened.
470 int Win::post(MPI_Group group, int assert){
471 //let's make a synchronous send here
474 int size = group->size();
475 MPI_Request* reqs = xbt_new0(MPI_Request, size);
478 int dst=group->index(j);
479 if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
480 reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
487 Request::startall(size, reqs);
488 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
490 Request::unref(&reqs[i]);
493 opened_++; //we're open for business !
// Body of MPI_Win_complete (the signature line is not visible in this listing).
// Closes the access epoch opened by start(): notifies every member of group_
// with a zero-byte send (tag SMPI_RMA_TAG+5), completes all local RMA
// requests, drops the group reference and decrements the epoch counter.
501 xbt_die("Complete called on already opened MPI_Win");
503 XBT_DEBUG("Entering MPI_Win_Complete");
506 int size = group_->size();
507 MPI_Request* reqs = xbt_new0(MPI_Request, size);
510 int dst=group_->index(j);
511 if(dst!=smpi_process()->index() && dst!=MPI_UNDEFINED){
512 reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
518 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
519 Request::startall(size, reqs);
520 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
523 Request::unref(&reqs[i]);
527 int finished = finish_comms();
528 XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
530 Group::unref(group_);
531 opened_--; //we're closed for business !
// Body of MPI_Win_wait (the signature line is not visible in this listing).
// Closes the exposure epoch opened by post(): waits for the SMPI_RMA_TAG+5
// notification from every member of group_, finishes pending RMA requests,
// then releases the group and decrements the epoch counter.
536 //naive, blocking implementation.
537 XBT_DEBUG("Entering MPI_Win_Wait");
540 int size = group_->size();
541 MPI_Request* reqs = xbt_new0(MPI_Request, size);
544 int src=group_->index(j);
545 if(src!=smpi_process()->index() && src!=MPI_UNDEFINED){
546 reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD);
552 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
553 Request::startall(size, reqs);
554 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
556 Request::unref(&reqs[i]);
559 int finished = finish_comms();
560 XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
562 Group::unref(group_);
563 opened_--; //we're opened for business !
// MPI_Win_lock on the target rank's window. Exclusive locks (and any lock
// taken while the target is exclusively held) acquire the target's lock_mut_;
// mode_ accumulates lock types so unlock() can tell whether a mutex release
// is owed. The caller is recorded in the target's lockers_ list, and pending
// RMA traffic between the two ranks is flushed on both sides.
567 int Win::lock(int lock_type, int rank, int assert){
568 MPI_Win target_win = connected_wins_[rank];
570 if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
571 xbt_mutex_acquire(target_win->lock_mut_);
572 target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
573 if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
574 xbt_mutex_release(target_win->lock_mut_);
576 } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
577 target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
579 target_win->lockers_.push_back(comm_->rank());
// Flush both directions so the lock starts from a clean request queue.
581 int finished = finish_comms(rank);
582 XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
583 finished = target_win->finish_comms(rank_);
584 XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
// MPI_Win_lock_all: takes a shared lock on every rank's window in turn,
// remembering the first failure (if any) in retval.
588 int Win::lock_all(int assert){
590 int retval = MPI_SUCCESS;
591 for (i=0; i<comm_->size();i++){
592 int ret = this->lock(MPI_LOCK_SHARED, i, assert);
593 if(ret != MPI_SUCCESS)
// MPI_Win_unlock: clears the target window's lock state, removes us from its
// lockers_ list, releases lock_mut_ only if the mode was purely exclusive
// (a shared component means lock() already released it), and flushes pending
// RMA traffic in both directions.
599 int Win::unlock(int rank){
600 MPI_Win target_win = connected_wins_[rank];
601 int target_mode = target_win->mode_;
602 target_win->mode_= 0;
603 target_win->lockers_.remove(comm_->rank());
604 if (target_mode==MPI_LOCK_EXCLUSIVE){
605 xbt_mutex_release(target_win->lock_mut_);
608 int finished = finish_comms(rank);
609 XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
610 finished = target_win->finish_comms(rank_);
611 XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
// MPI_Win_unlock_all: unlocks every rank's window in turn, remembering the
// first failure (if any) in retval.
615 int Win::unlock_all(){
617 int retval = MPI_SUCCESS;
618 for (i=0; i<comm_->size();i++){
619 int ret = this->unlock(i);
620 if(ret != MPI_SUCCESS)
// MPI_Win_flush: completes all outstanding RMA operations between us and
// `rank` — our requests targeting that rank, then the target window's
// requests involving us.
626 int Win::flush(int rank){
627 MPI_Win target_win = connected_wins_[rank];
628 int finished = finish_comms(rank);
629 XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
630 finished = target_win->finish_comms(rank_);
631 XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
// MPI_Win_flush_local: completes only our locally-queued RMA requests that
// involve `rank` (no flushing of the remote window).
635 int Win::flush_local(int rank){
636 int finished = finish_comms(rank);
637 XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
// MPI_Win_flush_all: completes all of our pending RMA requests, then asks
// every connected window to complete its requests involving us.
641 int Win::flush_all(){
644 finished = finish_comms();
645 XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
646 for (i=0; i<comm_->size();i++){
647 finished = connected_wins_[i]->finish_comms(rank_);
648 XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
// MPI_Win_flush_local_all: completes every locally-queued RMA request,
// regardless of target rank.
653 int Win::flush_local_all(){
654 int finished = finish_comms();
655 XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
// Fortran-to-C handle translation: maps an integer window handle back to the
// Win* registered in the shared F2C table.
659 Win* Win::f2c(int id){
660 return static_cast<Win*>(F2C::f2c(id));
// Completes every request queued in requests_ (waitall) and returns how many
// were finished. mut_ keeps other processes from pushing into the vector
// while its backing array is handed to waitall (same rationale as in fence()).
664 int Win::finish_comms(){
665 xbt_mutex_acquire(mut_);
666 //Finish own requests
667 std::vector<MPI_Request> *reqqs = requests_;
668 int size = static_cast<int>(reqqs->size());
670 MPI_Request* treqs = &(*reqqs)[0];
671 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
674 xbt_mutex_release(mut_);
// Rank-filtered variant of finish_comms(): moves the requests whose source or
// destination is `rank` into a temporary vector (erasing them from requests_),
// waits on only those, and returns the count. Used by lock/unlock/flush to
// complete traffic with a single peer without touching other pending ops.
678 int Win::finish_comms(int rank){
679 xbt_mutex_acquire(mut_);
680 //Finish own requests
681 std::vector<MPI_Request> *reqqs = requests_;
682 int size = static_cast<int>(reqqs->size());
// Partition pass: matching requests are stolen into myreqqs; erase() returns
// the next valid iterator, so iteration stays safe while removing.
685 std::vector<MPI_Request>* myreqqs = new std::vector<MPI_Request>();
686 std::vector<MPI_Request>::iterator iter = reqqs->begin();
687 while (iter != reqqs->end()){
688 if(((*iter)!=MPI_REQUEST_NULL) && (((*iter)->src() == rank) || ((*iter)->dst() == rank))){
689 myreqqs->push_back(*iter);
690 iter = reqqs->erase(iter);
697 MPI_Request* treqs = &(*myreqqs)[0];
698 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
703 xbt_mutex_release(mut_);