/* Copyright (c) 2007-2017. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */
#include "smpi_win.hpp"

#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_process.hpp"
#include "smpi_request.hpp"

XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_ = 0;
Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
    : base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic)
{
  int comm_size = comm->size();
  rank_         = comm->rank();
  XBT_DEBUG("Creating window");
  if (info != MPI_INFO_NULL)
    info->ref();
  group_ = MPI_GROUP_NULL;
  requests_ = new std::vector<MPI_Request>();
  mut_ = xbt_mutex_init();
  lock_mut_ = xbt_mutex_init();
  atomic_mut_ = xbt_mutex_init();
  connected_wins_ = new MPI_Win[comm_size];
  connected_wins_[rank_] = this;
  bar_ = MSG_barrier_init(comm_size);
  comm->add_rma_win(this);

  Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
                   MPI_BYTE, comm);
  Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
}
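/* Illustrative sketch (not part of this file): the user-level calls that end up in the
 * constructor above and in the destructor below. The names `buf` and `win` are placeholders
 * for this example only.
 *
 *   double buf[100];
 *   MPI_Win win;
 *   MPI_Win_create(buf, 100 * sizeof(double), sizeof(double), MPI_INFO_NULL, MPI_COMM_WORLD, &win);
 *   // ... RMA epochs (fence, post/start/complete/wait, or lock/unlock) ...
 *   MPI_Win_free(&win);   // collective, matches the barrier performed in the destructor
 */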
Win::~Win(){
  // As per the standard, perform a barrier to ensure every asynchronous communication is finished
  MSG_barrier_wait(bar_);

  int finished = finish_comms();
  XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);

  delete[] connected_wins_;
  if (name_ != nullptr){
    xbt_free(name_);
  }
  if (info_ != MPI_INFO_NULL){
    MPI_Info_free(&info_);
  }

  comm_->remove_rma_win(this);

  Colls::barrier(comm_);
  MSG_barrier_destroy(bar_);
  xbt_mutex_destroy(mut_);
  xbt_mutex_destroy(lock_mut_);
  xbt_mutex_destroy(atomic_mut_);
int Win::attach(void* base, MPI_Aint size){
  if (not(base_ == MPI_BOTTOM || base_ == 0))
    return MPI_ERR_ARG;
  base_ = 0; // the actual address will be given in the RMA calls, as the displacement
  size_ += size;
  return MPI_SUCCESS;
}
int Win::detach(void* base){
void Win::get_name(char* name, int* length){
  if (name_ == nullptr){ *length = 0; name[0] = '\0'; return; } // no name was set
  *length = strlen(name_);
  strncpy(name, name_, *length + 1);
}
void Win::get_group(MPI_Group* group){
  if (comm_ != MPI_COMM_NULL){
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}
MPI_Info Win::info(){
  if (info_ == MPI_INFO_NULL)
    info_ = new Info();
  return info_;
}
MPI_Aint Win::size(){
  return size_;
}
int Win::disp_unit(){
  return disp_unit_;
}
void Win::set_info(MPI_Info info){
  if (info_ != MPI_INFO_NULL)
    MPI_Info_free(&info_);
  info_ = info;
}
void Win::set_name(char* name){
  name_ = xbt_strdup(name);
}
int Win::fence(int assert)
{
  XBT_DEBUG("Entering fence");
  if (assert != MPI_MODE_NOPRECEDE) {
    // This is not the first fence => finalize what came before
    MSG_barrier_wait(bar_);
    xbt_mutex_acquire(mut_);
    // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
    // Without it, the vector could be resized while another process pushes, which would invalidate the
    // array used by Request::waitall().
    // Another solution would be to copy the data and clean up the vector *before* Request::waitall.
    std::vector<MPI_Request>* reqs = requests_;
    int size = static_cast<int>(reqs->size());
    // start all requests that have been prepared by other processes
    if (size > 0) {
      MPI_Request* treqs = &(*reqs)[0];
      Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    }
    xbt_mutex_release(mut_);
  }
  if (assert == MPI_MODE_NOSUCCEED) // no operation may follow this fence: mark the window as closed
    opened_ = 0;
  MSG_barrier_wait(bar_);
  XBT_DEBUG("Leaving fence");
  return MPI_SUCCESS;
}
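/* Illustrative sketch (not part of this file): a typical fence-delimited epoch as written in
 * user code, which is what fence() above implements. `win`, `localbuf`, `count` and `peer`
 * are placeholders for this example only.
 *
 *   MPI_Win_fence(MPI_MODE_NOPRECEDE, win);   // open the epoch (first fence)
 *   MPI_Put(localbuf, count, MPI_DOUBLE, peer, 0, count, MPI_DOUBLE, win);
 *   MPI_Win_fence(MPI_MODE_NOSUCCEED, win);   // close the epoch: waits on all queued requests
 */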
int Win::put(void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  // get the pointer to the window on the receiver side
  MPI_Win recv_win = connected_wins_[target_rank];

  if (opened_ == 0){ // check that a post/start has been done
    // no fence or start yet; are we covered by a lock on the target?
    int locked = 0;
    for (auto const& it : recv_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if (locked == 0)
      return MPI_ERR_WIN;
  }

  if (target_count * target_datatype->get_extent() > recv_win->size_)
    return MPI_ERR_ARG;

  void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Put to %d", target_rank);
  if (target_rank != comm_->rank()){
    // prepare the send request
    MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process()->index(),
        comm_->group()->actor(target_rank)->getPid()-1, SMPI_RMA_TAG+1, comm_, MPI_OP_NULL);

    // prepare the matching receive request on the target
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, smpi_process()->index(),
        comm_->group()->actor(target_rank)->getPid()-1, SMPI_RMA_TAG+1, recv_win->comm_, MPI_OP_NULL);

    if (request != nullptr){
      *request = sreq;
    } else {
      xbt_mutex_acquire(mut_);
      requests_->push_back(sreq);
      xbt_mutex_release(mut_);
    }

    // push the request to the receiver's window
    xbt_mutex_acquire(recv_win->mut_);
    recv_win->requests_->push_back(rreq);
    xbt_mutex_release(recv_win->mut_);
  } else {
    // local put: a simple memory copy is enough
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    if (request != nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}
int Win::get(void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  // get the pointer to the window on the sender side
  MPI_Win send_win = connected_wins_[target_rank];

  if (opened_ == 0){ // check that a post/start has been done
    // no fence or start yet; are we covered by a lock on the target?
    int locked = 0;
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if (locked == 0)
      return MPI_ERR_WIN;
  }

  if (target_count * target_datatype->get_extent() > send_win->size_)
    return MPI_ERR_ARG;

  void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);
  if (target_rank != comm_->rank()){
    // prepare the send request, issued on behalf of the target process
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
        comm_->group()->actor(target_rank)->getPid()-1, smpi_process()->index(), SMPI_RMA_TAG+2, send_win->comm_,
        MPI_OP_NULL);

    // prepare the receive request on our side
    MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
        comm_->group()->actor(target_rank)->getPid()-1, smpi_process()->index(), SMPI_RMA_TAG+2, comm_,
        MPI_OP_NULL);

    // start the send, with a process other than us as the sender
    sreq->start();
    // push the request to the remote (sender's) window
    xbt_mutex_acquire(send_win->mut_);
    send_win->requests_->push_back(sreq);
    xbt_mutex_release(send_win->mut_);

    if (request != nullptr){
      *request = rreq;
    } else {
      xbt_mutex_acquire(mut_);
      requests_->push_back(rreq);
      xbt_mutex_release(mut_);
    }
  } else {
    // local get: a simple memory copy is enough
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    if (request != nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}
int Win::accumulate(void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
{
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  // get the pointer to the window on the receiver side
  MPI_Win recv_win = connected_wins_[target_rank];

  if (opened_ == 0){ // check that a post/start has been done
    // no fence or start yet; are we covered by a lock on the target?
    int locked = 0;
    for (auto const& it : recv_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if (locked == 0)
      return MPI_ERR_WIN;
  }
  //FIXME: local version

  if (target_count * target_datatype->get_extent() > recv_win->size_)
    return MPI_ERR_ARG;

  void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // The tag is used to order the operations, so subtract the operation count from it.
  // (SMPI_RMA_TAG is set below all the other tags we use, to avoid collisions.)
  // prepare the send request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype,
      smpi_process()->index(), comm_->group()->actor(target_rank)->getPid()-1, SMPI_RMA_TAG-3-count_, comm_, op);

  // prepare the matching receive request on the target
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype,
      smpi_process()->index(), comm_->group()->actor(target_rank)->getPid()-1, SMPI_RMA_TAG-3-count_, recv_win->comm_, op);

  // push the request to the receiver's window
  xbt_mutex_acquire(recv_win->mut_);
  recv_win->requests_->push_back(rreq);
  xbt_mutex_release(recv_win->mut_);
  if (request != nullptr){
    *request = sreq;
  } else {
    xbt_mutex_acquire(mut_);
    requests_->push_back(sreq);
    xbt_mutex_release(mut_);
  }
  XBT_DEBUG("Leaving MPI_Win_Accumulate");
  return MPI_SUCCESS;
}
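/* Illustrative sketch (not part of this file): how MPI_Accumulate reaches the code above.
 * Unlike MPI_Put, concurrent accumulates to the same target location are allowed, which is
 * why the requests carry ordered tags (SMPI_RMA_TAG-3-count_). `win`, `contrib`, `count`
 * and `target` are placeholders for this example only.
 *
 *   MPI_Win_fence(0, win);
 *   MPI_Accumulate(contrib, count, MPI_DOUBLE, target, 0, count, MPI_DOUBLE, MPI_SUM, win);
 *   MPI_Win_fence(0, win);
 */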
int Win::get_accumulate(void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request){
  // get the pointer to the window on the sender side
  MPI_Win send_win = connected_wins_[target_rank];

  if (opened_ == 0){ // check that a post/start has been done
    // no fence or start yet; are we covered by a lock on the target?
    int locked = 0;
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if (locked == 0)
      return MPI_ERR_WIN;
  }

  if (target_count * target_datatype->get_extent() > send_win->size_)
    return MPI_ERR_ARG;

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  // Operations need to be correctly ordered, so finish the pending requests here. This is slow.
  MPI_Request req = MPI_REQUEST_NULL;
  xbt_mutex_acquire(send_win->atomic_mut_);
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);

  accumulate(origin_addr, origin_count, origin_datatype, target_rank,
      target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  xbt_mutex_release(send_win->atomic_mut_);
  return MPI_SUCCESS;
}
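/* Illustrative sketch (not part of this file): user-level fetch-and-accumulate, which the
 * function above serializes with atomic_mut_ so that the get and the accumulate appear
 * atomic to other accesses of the same window. `win`, `target`, `one` and `old` are
 * placeholders for this example only.
 *
 *   int one = 1, old;
 *   MPI_Win_lock(MPI_LOCK_SHARED, target, 0, win);
 *   MPI_Get_accumulate(&one, 1, MPI_INT, &old, 1, MPI_INT,
 *                      target, 0, 1, MPI_INT, MPI_SUM, win);
 *   MPI_Win_unlock(target, win);   // old now holds the value before the addition
 */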
int Win::compare_and_swap(void* origin_addr, void* compare_addr,
                          void* result_addr, MPI_Datatype datatype, int target_rank,
                          MPI_Aint target_disp){
  // get the pointer to the window on the sender side
  MPI_Win send_win = connected_wins_[target_rank];

  if (opened_ == 0){ // check that a post/start has been done
    // no fence or start yet; are we covered by a lock on the target?
    int locked = 0;
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if (locked == 0)
      return MPI_ERR_WIN;
  }

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  xbt_mutex_acquire(send_win->atomic_mut_);
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);
  }
  xbt_mutex_release(send_win->atomic_mut_);
  return MPI_SUCCESS;
}
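/* Illustrative sketch (not part of this file): MPI_Compare_and_swap as used from application
 * code, e.g. to grab a lock word on a remote process. The routine above emulates it with a
 * get, a memcmp and a conditional put under atomic_mut_. `win` and `target` are placeholders
 * for this example only.
 *
 *   int expected = 0, desired = 1, previous;
 *   MPI_Win_lock(MPI_LOCK_SHARED, target, 0, win);
 *   MPI_Compare_and_swap(&desired, &expected, &previous, MPI_INT, target, 0, win);
 *   MPI_Win_unlock(target, win);
 *   // previous == expected means the swap took place
 */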
int Win::start(MPI_Group group, int assert){
  /* From the MPI Forum's advice to implementors:
  The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
  will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
  the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
  matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
  MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
  implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
  to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
  called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
  origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
  must complete, without further dependencies. */
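  /* Illustrative sketch (not part of this file): the general active-target pattern discussed
   * above, from the application's point of view. `grp` would typically come from
   * MPI_Win_get_group + MPI_Group_incl; all names are placeholders for this example only.
   *
   *   // origin side                              // target side
   *   MPI_Win_start(grp, 0, win);                 MPI_Win_post(grp, 0, win);
   *   MPI_Put(buf, n, MPI_DOUBLE, tgt,            ...
   *           0, n, MPI_DOUBLE, win);
   *   MPI_Win_complete(win);                      MPI_Win_wait(win);
   */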
  //naive, blocking implementation.
  int size = group->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);
  int i = 0;

  XBT_DEBUG("Entering MPI_Win_Start");
  for (int j = 0; j < size; j++) {
    int src = group->actor(j)->getPid()-1;
    if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
      i++;
    }
  }
  size = i; // only wait for the requests that were actually created
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++)
    Request::unref(&reqs[i]);
  xbt_free(reqs);

  opened_++; //we're open for business!
  group_ = group;
  group->ref();
  XBT_DEBUG("Leaving MPI_Win_Start");
  return MPI_SUCCESS;
}
int Win::post(MPI_Group group, int assert){
  //let's make a synchronous send here
  int size = group->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);
  int i = 0;

  XBT_DEBUG("Entering MPI_Win_Post");
  for (int j = 0; j < size; j++) {
    int dst = group->actor(j)->getPid()-1;
    if (dst != smpi_process()->index() && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD);
      i++;
    }
  }
  size = i; // only wait for the requests that were actually created
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++)
    Request::unref(&reqs[i]);
  xbt_free(reqs);

  opened_++; //we're open for business!
  group_ = group;
  group->ref();
  XBT_DEBUG("Leaving MPI_Win_Post");
  return MPI_SUCCESS;
}
int Win::complete(){
  if (opened_ == 0)
    xbt_die("Complete called on an MPI_Win that was not opened");

  XBT_DEBUG("Entering MPI_Win_Complete");
  int size = group_->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);
  int i = 0;
  for (int j = 0; j < size; j++) {
    int dst = group_->actor(j)->getPid()-1;
    if (dst != smpi_process()->index() && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
      i++;
    }
  }
  size = i;
  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++)
    Request::unref(&reqs[i]);
  xbt_free(reqs);

  int finished = finish_comms();
  XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);

  Group::unref(group_);
  opened_--; //we're closed for business!
  return MPI_SUCCESS;
}
int Win::wait(){
  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  int size = group_->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);
  int i = 0;
  for (int j = 0; j < size; j++) {
    int src = group_->actor(j)->getPid()-1;
    if (src != smpi_process()->index() && src != MPI_UNDEFINED) {
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG+5, MPI_COMM_WORLD);
      i++;
    }
  }
  size = i;
  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++)
    Request::unref(&reqs[i]);
  xbt_free(reqs);

  int finished = finish_comms();
  XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);

  Group::unref(group_);
  opened_--; //we're closed for business!
  return MPI_SUCCESS;
}
int Win::lock(int lock_type, int rank, int assert){
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED) || target_win->mode_ == MPI_LOCK_EXCLUSIVE){
    xbt_mutex_acquire(target_win->lock_mut_);
    target_win->mode_ += lock_type; // add the lock_type to differentiate the case where we switch from EXCLUSIVE to SHARED (no release needed in the unlock)
    if (lock_type == MPI_LOCK_SHARED){ // the window used to be exclusive, it's now shared
      xbt_mutex_release(target_win->lock_mut_);
    }
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(comm_->rank());

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
int Win::lock_all(int assert){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++){
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}
int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_ = 0;
  target_win->lockers_.remove(comm_->rank());
  if (target_mode == MPI_LOCK_EXCLUSIVE){
    xbt_mutex_release(target_win->lock_mut_);
  }

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
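/* Illustrative sketch (not part of this file): passive-target synchronization as implemented
 * by lock()/unlock() above. The origin locks the target's window, issues RMA calls, and the
 * unlock flushes them (finish_comms on both sides). `win`, `buf`, `count` and `target` are
 * placeholders for this example only.
 *
 *   MPI_Win_lock(MPI_LOCK_EXCLUSIVE, target, 0, win);
 *   MPI_Put(buf, count, MPI_DOUBLE, target, 0, count, MPI_DOUBLE, win);
 *   MPI_Win_unlock(target, win);   // the operation is guaranteed complete after this returns
 */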
int Win::unlock_all(){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++){
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}
int Win::flush(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
  for (int i = 0; i < comm_->size(); i++){
    finished = connected_wins_[i]->finish_comms(rank_);
    XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
  }
  return MPI_SUCCESS;
}
int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
  return MPI_SUCCESS;
}
Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
}
int Win::finish_comms(){
  xbt_mutex_acquire(mut_);
  // Finish our own pending requests
  std::vector<MPI_Request>* reqqs = requests_;
  int size = static_cast<int>(reqqs->size());
  if (size > 0) {
    MPI_Request* treqs = &(*reqqs)[0];
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    reqqs->clear();
  }
  xbt_mutex_release(mut_);
  return size;
}
int Win::finish_comms(int rank){
  xbt_mutex_acquire(mut_);
  // Finish only the pending requests that involve the given rank
  std::vector<MPI_Request>* reqqs = requests_;
  std::vector<MPI_Request> myreqqs;
  std::vector<MPI_Request>::iterator iter = reqqs->begin();
  while (iter != reqqs->end()){
    if (((*iter) != MPI_REQUEST_NULL) && (((*iter)->src() == rank) || ((*iter)->dst() == rank))){
      myreqqs.push_back(*iter);
      iter = reqqs->erase(iter);
    } else {
      ++iter;
    }
  }
  int size = static_cast<int>(myreqqs.size());
  if (size > 0) {
    MPI_Request* treqs = &myreqqs[0];
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    myreqqs.clear();
  }
  xbt_mutex_release(mut_);
  return size;
}