/* Copyright (c) 2007-2020. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "smpi_win.hpp"

#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"

XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");

std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_ = 0;
Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
    , disp_unit_(disp_unit)
    , allocated_(allocated)
  XBT_DEBUG("Creating window");
  if (info != MPI_INFO_NULL)
  int comm_size = comm->size();
  group_      = MPI_GROUP_NULL;
  requests_   = new std::vector<MPI_Request>();
  mut_        = s4u::Mutex::create();
  lock_mut_   = s4u::Mutex::create();
  atomic_mut_ = s4u::Mutex::create();
  connected_wins_        = new MPI_Win[comm_size];
  connected_wins_[rank_] = this;
  bar_ = new s4u::Barrier(comm_size);
  errhandler_ = MPI_ERRORS_ARE_FATAL;
  comm->add_rma_win(this);
  colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win), MPI_BYTE,
  colls::bcast(&(bar_), sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
  // As per the standard, perform a barrier to ensure every async comm is finished
  int finished = finish_comms();
  XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
  delete[] connected_wins_;
  if (name_ != nullptr){
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  comm_->remove_rma_win(this);
  colls::barrier(comm_);
int Win::attach(void* /*base*/, MPI_Aint size)
  if (not(base_ == MPI_BOTTOM || base_ == nullptr))
  base_ = nullptr; // the actual address will be given in the RMA calls, as the displacement

int Win::detach(const void* /*base*/)
void Win::get_name(char* name, int* length) const
  *length = strlen(name_);
  strncpy(name, name_, *length + 1);

void Win::get_group(MPI_Group* group){
  if (comm_ != MPI_COMM_NULL){
    *group = comm_->group();
    *group = MPI_GROUP_NULL;

  if (info_ == MPI_INFO_NULL)

int Win::rank() const

MPI_Aint Win::size() const

void* Win::base() const

int Win::disp_unit() const

int Win::dynamic() const

void Win::set_info(MPI_Info info)
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  if (info_ != MPI_INFO_NULL)

void Win::set_name(const char* name){
  name_ = xbt_strdup(name);
int Win::fence(int assert)
  XBT_DEBUG("Entering fence");
  if (assert != MPI_MODE_NOPRECEDE) {
    // This is not the first fence => finalize what came before
    // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
    // Without it, the vector could get resized when another process pushes to it,
    // which would invalidate the array used by Request::waitall().
    // Another solution would be to copy the data and clean up the vector *before* Request::waitall.
    std::vector<MPI_Request>* reqs = requests_;
    int size = static_cast<int>(reqs->size());
    // start all requests that have been prepared by another process
    MPI_Request* treqs = &(*reqs)[0];
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
  if (assert == MPI_MODE_NOSUCCEED) // no operations should follow this one, so mark the window as closed
  XBT_DEBUG("Leaving fence");
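/* A minimal, illustrative sketch (not part of this file; buf, res, target and win are placeholder names) of the
 * fence-based epoch that the method above implements, as seen from application code:
 *
 *   MPI_Win_fence(MPI_MODE_NOPRECEDE, win);  // first epoch: there is nothing to complete yet
 *   MPI_Put(buf, 1, MPI_INT, target, 0, 1, MPI_INT, win);
 *   MPI_Win_fence(0, win);                   // completes the MPI_Put on every rank
 *   MPI_Get(res, 1, MPI_INT, target, 0, 1, MPI_INT, win);
 *   MPI_Win_fence(MPI_MODE_NOSUCCEED, win);  // last epoch: no RMA operation may follow
 */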
int Win::put(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
  // get the receiver's window
  const Win* recv_win = connected_wins_[target_rank];

  if (opened_ == 0){ // check that a post/start has been done
    // no fence or start; check whether a lock is held
    for (auto const& it : recv_win->lockers_)
      if (it == comm_->rank())

  if (target_count * target_datatype->get_extent() > recv_win->size_)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;

  if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare the send request
    // TODO cheinrich Check for rank / pid conversion
    Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,

    // prepare the receive request
    // TODO cheinrich Check for rank / pid conversion
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                              target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);

    if (request != nullptr){
      requests_->push_back(sreq);

    // push the request to the receiver's window
    recv_win->mut_->lock();
    recv_win->requests_->push_back(rreq);
    recv_win->mut_->unlock();
    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    *request = MPI_REQUEST_NULL;
int Win::get(void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
  const Win* send_win = connected_wins_[target_rank];

  if (opened_ == 0){ // check that a post/start has been done
    // no fence or start; check whether a lock is held
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())

  if (target_count * target_datatype->get_extent() > send_win->size_)

  const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if (target_rank != comm_->rank()){
    // prepare the send request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
                                              send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);

    // prepare the receive request
    MPI_Request rreq = Request::rma_recv_init(
        origin_addr, origin_count, origin_datatype, target_rank,
        comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
        SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);

    // start the send, with a process other than us as the sender
    // push the request to the sender's window
    send_win->mut_->lock();
    send_win->requests_->push_back(sreq);
    send_win->mut_->unlock();

    if (request != nullptr){
      requests_->push_back(rreq);

    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    *request = MPI_REQUEST_NULL;
int Win::accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  // get the receiver's window
  const Win* recv_win = connected_wins_[target_rank];

  if (opened_ == 0){ // check that a post/start has been done
    // no fence or start; check whether a lock is held
    for (auto const& it : recv_win->lockers_)
      if (it == comm_->rank())

  // FIXME: local version

  if (target_count * target_datatype->get_extent() > recv_win->size_)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // As the tag is used to order the operations, subtract count_ from it. This cannot collide with the other
  // SMPI tags, since SMPI_RMA_TAG is defined below all the other tags we use.
  // prepare the send request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);

  // prepare the receive request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                            recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)),
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);

  // push the request to the receiver's window
  recv_win->mut_->lock();
  recv_win->requests_->push_back(rreq);
  recv_win->mut_->unlock();

  if (request != nullptr) {
    requests_->push_back(sreq);

  XBT_DEBUG("Leaving MPI_Win_Accumulate");
int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
  const Win* send_win = connected_wins_[target_rank];

  if (opened_ == 0){ // check that a post/start has been done
    // no fence or start; check whether a lock is held
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())

  if (target_count * target_datatype->get_extent() > send_win->size_)

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  // We need to be sure the operations are correctly ordered, so finish the requests here? This is slow.
  send_win->atomic_mut_->lock();
  get(result_addr, result_count, result_datatype, target_rank, target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  accumulate(origin_addr, origin_count, origin_datatype, target_rank, target_disp, target_count, target_datatype, op,
             &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  send_win->atomic_mut_->unlock();
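/* A minimal, illustrative sketch (not part of this file; target and win are placeholder names) of the user-level
 * call that maps onto the fetch-then-accumulate sequence above, made atomic with respect to the other
 * accumulate-style operations through atomic_mut_:
 *
 *   int one = 1, old;
 *   MPI_Get_accumulate(&one, 1, MPI_INT, &old, 1, MPI_INT, target, 0, 1, MPI_INT, MPI_SUM, win);
 *   // 'old' now holds the target value as it was before the addition, i.e. an atomic fetch-and-add
 */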
int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
                          int target_rank, MPI_Aint target_disp)
  const Win* send_win = connected_wins_[target_rank];

  if (opened_ == 0){ // check that a post/start has been done
    // no fence or start; check whether a lock is held
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, 1, datatype, target_rank, target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank, target_disp, 1, datatype);
  send_win->atomic_mut_->unlock();
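/* An illustrative sketch (not part of this file; target and win are placeholder names) of the semantics implemented
 * above: fetch the current target value and overwrite it with origin_addr only if it equals compare_addr, atomically
 * with respect to the other accumulate-style operations on this window:
 *
 *   int expected = 0, desired = 1, previous;
 *   MPI_Compare_and_swap(&desired, &expected, &previous, MPI_INT, target, 0, win);
 *   // previous == expected  <=>  the swap took place
 */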
int Win::start(MPI_Group group, int /*assert*/)
  /* From the MPI Forum's advice:
  The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
  will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
  the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
  matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
  MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
  implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
  to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
  called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
  origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
  must complete, without further dependencies. */
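/* An illustrative sketch (not part of this file; the groups, buf, target and win are placeholder names) of the
 * generalized active-target pattern that start()/complete() on the origin and post()/wait() on the target
 * (below) implement:
 *
 *   MPI_Win_post(origin_group, 0, win);                     // target: expose the window to origin_group
 *   MPI_Win_start(target_group, 0, win);                    // origin: open an access epoch towards target_group
 *   MPI_Put(buf, 1, MPI_INT, target, 0, 1, MPI_INT, win);   // origin: issue RMA operations
 *   MPI_Win_complete(win);                                  // origin: the operations are completed locally
 *   MPI_Win_wait(win);                                      // target: all matching accesses have completed
 */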
  // naive, blocking implementation.
  int size = group->size();
  std::vector<MPI_Request> reqs(size);

  XBT_DEBUG("Entering MPI_Win_Start");
    int src = comm_->group()->rank(group->actor(j));
    if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  opened_++; // we're open for business!
  XBT_DEBUG("Leaving MPI_Win_Start");
int Win::post(MPI_Group group, int /*assert*/)
  // let's make a synchronous send here
  int size = group->size();
  std::vector<MPI_Request> reqs(size);

  XBT_DEBUG("Entering MPI_Win_Post");
    int dst = comm_->group()->rank(group->actor(j));
    if (dst != rank_ && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
    Request::unref(&reqs[i]);
  opened_++; // we're open for business!
  XBT_DEBUG("Leaving MPI_Win_Post");
    xbt_die("Complete called on already opened MPI_Win");

  XBT_DEBUG("Entering MPI_Win_Complete");
  int size = group_->size();
  std::vector<MPI_Request> reqs(size);

    int dst = comm_->group()->rank(group_->actor(j));
    if (dst != rank_ && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);

  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
    Request::unref(&reqs[i]);

  int finished = finish_comms();
  XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
  Group::unref(group_);
  opened_--; // we're closed for business!
  // naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  int size = group_->size();
  std::vector<MPI_Request> reqs(size);

    int src = comm_->group()->rank(group_->actor(j));
    if (src != rank_ && src != MPI_UNDEFINED) {
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);

  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
    Request::unref(&reqs[i]);

  int finished = finish_comms();
  XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
  Group::unref(group_);
  opened_--; // we're closed for business!
int Win::lock(int lock_type, int rank, int /*assert*/)
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED) || target_win->mode_ == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->lock();
    target_win->mode_ += lock_type; // add the lock_type to differentiate the case where we switch from EXCLUSIVE to
                                    // SHARED (no release is needed in the unlock then)
    if (lock_type == MPI_LOCK_SHARED){ // the window used to be exclusive; it is now shared
      target_win->lock_mut_->unlock();
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(comm_->rank());

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
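/* An illustrative sketch (not part of this file; buf, target and win are placeholder names) of the passive-target
 * epoch that lock()/unlock() implement; only the origin process takes part in the synchronization:
 *
 *   MPI_Win_lock(MPI_LOCK_EXCLUSIVE, target, 0, win);   // acquire exclusive access to the target window
 *   MPI_Put(buf, 1, MPI_INT, target, 0, 1, MPI_INT, win);
 *   MPI_Win_unlock(target, win);                        // the MPI_Put is guaranteed complete after this returns
 */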
int Win::lock_all(int assert){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)

int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_ = 0;
  target_win->lockers_.remove(comm_->rank());
  if (target_mode == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->unlock();

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);

int Win::unlock_all(){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
int Win::flush(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int finished = finish_comms(rank_);
  XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
  finished = target_win->finish_comms(rank);
  XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);

int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);

int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
  for (int i = 0; i < comm_->size(); i++) {
    finished = connected_wins_[i]->finish_comms(rank_);
    XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);

int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));

int Win::finish_comms(){
  // Finish own requests
  std::vector<MPI_Request>* reqqs = requests_;
  int size = static_cast<int>(reqqs->size());
    MPI_Request* treqs = &(*reqqs)[0];
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
int Win::finish_comms(int rank){
  // Finish own requests
  std::vector<MPI_Request>* reqqs = requests_;
  int size = static_cast<int>(reqqs->size());
    std::vector<MPI_Request> myreqqs;
    auto iter = reqqs->begin();
    int proc_id = comm_->group()->actor(rank)->get_pid();
    while (iter != reqqs->end()){
      // Check whether we are either the destination or the sender of this request,
      // because we only wait for requests that we are responsible for.
      // Also, use the process id here: src() and dst() on a request return the process id,
      // NOT the rank (which only exists in the context of a communicator).
      if (((*iter) != MPI_REQUEST_NULL) && (((*iter)->src() == proc_id) || ((*iter)->dst() == proc_id))) {
        myreqqs.push_back(*iter);
        iter = reqqs->erase(iter);
    MPI_Request* treqs = &myreqqs[0];
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
  const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];
    *size                         = target_win->size_;
    *disp_unit                    = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;
    *static_cast<void**>(baseptr) = nullptr;
MPI_Errhandler Win::errhandler()
  if (errhandler_ != MPI_ERRHANDLER_NULL)

void Win::set_errhandler(MPI_Errhandler errhandler)
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)

} // namespace simgrid