/* Copyright (c) 2007-2020. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "smpi_win.hpp"

#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"

XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");

std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_ = 0;
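// A window is created collectively over `comm`: each process stores its own Win* in
// connected_wins_[rank_] and the allgather below exchanges these pointers, so any process can later
// reach a peer's window structure (base address, request queue, mutexes) through a plain pointer.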
Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
    , disp_unit_(disp_unit)
    , allocated_(allocated)
  XBT_DEBUG("Creating window");
  if(info!=MPI_INFO_NULL)
    info->ref();
  int comm_size = comm->size();
  group_    = MPI_GROUP_NULL;
  requests_ = new std::vector<MPI_Request>();
  mut_        = s4u::Mutex::create();
  lock_mut_   = s4u::Mutex::create();
  atomic_mut_ = s4u::Mutex::create();
  connected_wins_        = new MPI_Win[comm_size];
  connected_wins_[rank_] = this;
  bar_ = new s4u::Barrier(comm_size);
  errhandler_ = MPI_ERRORS_ARE_FATAL;
  comm->add_rma_win(this);

  colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win), MPI_BYTE,
                   comm);

  colls::bcast(&(bar_), sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
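// The s4u::Barrier pointer is broadcast from rank 0, so every process of the communicator ends up
// waiting on the same barrier object when the window is synchronized or destroyed.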
Win::~Win(){
  //As per the standard, perform a barrier to ensure every async comm is finished
  bar_->wait();

  int finished = finish_comms();
  XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);

  delete[] connected_wins_;
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);

  comm_->remove_rma_win(this);

  colls::barrier(comm_);
int Win::attach(void* /*base*/, MPI_Aint size)
  if (not(base_ == MPI_BOTTOM || base_ == nullptr))
  base_ = nullptr; // the actual address will be given in the RMA calls, as the displacement.

int Win::detach(const void* /*base*/)

void Win::get_name(char* name, int* length) const
  *length = name_.length();
  if (not name_.empty()) {
    name_.copy(name, *length);
    name[*length] = '\0';

void Win::get_group(MPI_Group* group){
  if(comm_ != MPI_COMM_NULL){
    *group = comm_->group();
    *group = MPI_GROUP_NULL;

  if (info_ == MPI_INFO_NULL)

int Win::rank() const

MPI_Aint Win::size() const

void* Win::base() const

int Win::disp_unit() const

int Win::dynamic() const

void Win::set_info(MPI_Info info)
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  if (info_ != MPI_INFO_NULL)

void Win::set_name(const char* name){
int Win::fence(int assert)
  XBT_DEBUG("Entering fence");
  if (assert != MPI_MODE_NOPRECEDE) {
    // This is not the first fence => finalize what came before
    // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
    // Without it, the vector could get resized when another process pushes, which would invalidate
    // the array used by Request::waitall(). Another solution would be to copy the data and clean up
    // the vector *before* calling Request::waitall.
    std::vector<MPI_Request> *reqs = requests_;
    int size = static_cast<int>(reqs->size());
    // start all requests that have been prepared by another process
    MPI_Request* treqs = &(*reqs)[0];
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);

  if(assert==MPI_MODE_NOSUCCEED) // no operation may follow this fence: mark the window as closed.

  XBT_DEBUG("Leaving fence");
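// put() is modeled with a pair of prepared point-to-point requests: an rma_send queued in our own
// requests_ vector and a matching rma_recv pushed into the target window's vector (under its mut_),
// both completed at the next synchronization (fence, complete/wait, unlock or flush).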
int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
  //get receiver pointer
  const Win* recv_win = connected_wins_[target_rank];

  if(opened_==0){ //check that post/start has been done
    // no fence or start: check whether a lock on the target covers this access instead
    for (auto const& it : recv_win->lockers_)
      if (it == comm_->rank())

  if(target_count*target_datatype->get_extent()>recv_win->size_)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;

  if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare send_request
    MPI_Request sreq =
        // TODO cheinrich Check for rank / pid conversion
        Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
                               comm_, MPI_OP_NULL);

    //prepare receiver request
    // TODO cheinrich Check for rank / pid conversion
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                              target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);

    if(request!=nullptr){
      requests_->push_back(sreq);

    //push request to receiver's win
    recv_win->mut_->lock();
    recv_win->requests_->push_back(rreq);
    recv_win->mut_->unlock();

    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    *request = MPI_REQUEST_NULL;
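// get() mirrors put(): the send request reads from the target window's memory and is pushed to that
// window, while the matching receive into origin_addr stays in our own request vector; a get from
// ourselves degenerates into a plain Datatype::copy.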
int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
  const Win* send_win = connected_wins_[target_rank];

  if(opened_==0){ //check that post/start has been done
    // no fence or start: check whether a lock on the target covers this access instead
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())

  if(target_count*target_datatype->get_extent()>send_win->size_)

  const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if(target_rank != comm_->rank()){
    //prepare send_request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
                                              send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(
        origin_addr, origin_count, origin_datatype, target_rank,
        comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
        SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);

    //start the send, with a process other than us as the sender.
    //push the send request to the target's win
    send_win->mut_->lock();
    send_win->requests_->push_back(sreq);
    send_win->mut_->unlock();

    if(request!=nullptr){
      requests_->push_back(rreq);

    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    *request=MPI_REQUEST_NULL;
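// accumulate() reuses the put machinery, but attaches the reduction op to both requests and derives
// the tag from a per-window operation counter (SMPI_RMA_TAG - 3 - count_) so that successive
// accumulates on the same window are matched, and thus applied, in a deterministic order.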
int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  //get receiver pointer
  const Win* recv_win = connected_wins_[target_rank];

  if(opened_==0){ //check that post/start has been done
    // no fence or start: check whether a lock on the target covers this access instead
    for (auto const& it : recv_win->lockers_)
      if (it == comm_->rank())

  //FIXME: local version

  if(target_count*target_datatype->get_extent()>recv_win->size_)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // The tag is used to order the operations, so subtract the operation count from it
  // (SMPI_RMA_TAG sits below all the other tags we use, so this cannot collide with them).
  // prepare send_request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);

  // prepare receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                            recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)),
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);

  // push request to receiver's win
  recv_win->mut_->lock();
  recv_win->requests_->push_back(rreq);
  recv_win->mut_->unlock();

  if (request != nullptr) {
    requests_->push_back(sreq);

  XBT_DEBUG("Leaving MPI_Win_Accumulate");
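// get_accumulate() is not a single atomic exchange here: it serializes a get() followed by an
// accumulate() under the target window's atomic_mut_, waiting for each request, which preserves
// atomicity at the cost of blocking until both operations complete.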
int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
  const Win* send_win = connected_wins_[target_rank];

  if(opened_==0){ //check that post/start has been done
    // no fence or start: check whether a lock on the target covers this access instead
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())

  if(target_count*target_datatype->get_extent()>send_win->size_)

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  // operations must be correctly ordered, so wait for each request to finish here (slow).
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  accumulate(origin_addr, origin_count, origin_datatype, target_rank,
             target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  send_win->atomic_mut_->unlock();
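// compare_and_swap() follows the same pattern: under the target's atomic_mut_, fetch the current
// value with get(), compare it locally against compare_addr, and only put() the new value on a match.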
int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
                          int target_rank, MPI_Aint target_disp)
  const Win* send_win = connected_wins_[target_rank];

  if(opened_==0){ //check that post/start has been done
    // no fence or start: check whether a lock on the target covers this access instead
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);
  send_win->atomic_mut_->unlock();

int Win::start(MPI_Group group, int /*assert*/)
  /* From the MPI Forum's advice to implementors:
  The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
  will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
  the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
  matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
  MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
  implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
  to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
  called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
  origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
  must complete, without further dependencies. */

  //naive, blocking implementation.
  int size = group->size();
  std::vector<MPI_Request> reqs(size);

  XBT_DEBUG("Entering MPI_Win_Start");
    int src = comm_->group()->rank(group->actor(j));
    if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);

  opened_++; //we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Start");
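// Generalized active-target synchronization (post/start/complete/wait) is implemented with zero-byte
// control messages: the post/start handshake uses tag SMPI_RMA_TAG + 4, the complete/wait handshake
// uses SMPI_RMA_TAG + 5, and the queued RMA requests are drained by finish_comms() on each side.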
int Win::post(MPI_Group group, int /*assert*/)
  //let's make a synchronous send here
  int size = group->size();
  std::vector<MPI_Request> reqs(size);

  XBT_DEBUG("Entering MPI_Win_Post");
    int dst = comm_->group()->rank(group->actor(j));
    if (dst != rank_ && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
    Request::unref(&reqs[i]);

  opened_++; //we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Post");

int Win::complete(){
  if (opened_ == 0)
    xbt_die("Complete called on an MPI_Win that was not opened (no matching start)");
  XBT_DEBUG("Entering MPI_Win_Complete");

  int size = group_->size();
  std::vector<MPI_Request> reqs(size);

    int dst = comm_->group()->rank(group_->actor(j));
    if (dst != rank_ && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);

  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
    Request::unref(&reqs[i]);

  int finished = finish_comms();
  XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);

  Group::unref(group_);
  opened_--; //we're closed for business !

int Win::wait(){
  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");

  int size = group_->size();
  std::vector<MPI_Request> reqs(size);

    int src = comm_->group()->rank(group_->actor(j));
    if (src != rank_ && src != MPI_UNDEFINED) {
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);

  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
    Request::unref(&reqs[i]);

  int finished = finish_comms();
  XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);

  Group::unref(group_);
  opened_--; //we're closed for business !
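// Passive-target locking: an exclusive request on a window that is not shared, or any request while
// the target is already locked exclusively, blocks on the target's lock_mut_; mode_ accumulates the
// granted lock types so that unlock() knows whether the mutex actually has to be released.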
int Win::lock(int lock_type, int rank, int /*assert*/)
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED) || target_win->mode_ == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->lock();
    target_win->mode_ += lock_type; // add the lock_type, so we can tell apart the case where we switch from EXCLUSIVE to SHARED (no release needed in the unlock)
    if(lock_type == MPI_LOCK_SHARED){ // the window used to be exclusive, it's now shared.
      target_win->lock_mut_->unlock();
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(comm_->rank());

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);

int Win::lock_all(int assert){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)

int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_ = 0;
  target_win->lockers_.remove(comm_->rank());
  if (target_mode==MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->unlock();

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);

int Win::unlock_all(){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)

int Win::flush(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int finished = finish_comms(rank_);
  XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
  finished = target_win->finish_comms(rank);
  XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);

int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);

int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
  for (int i = 0; i < comm_->size(); i++) {
    finished = connected_wins_[i]->finish_comms(rank_);
    XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);

int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);

Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
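// finish_comms() drains the requests accumulated on this window, whether they were queued locally or
// pushed by peers: the no-argument version waits on everything, while finish_comms(rank) filters by
// the peer's process id before waiting.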
int Win::finish_comms(){
  //Finish own requests
  std::vector<MPI_Request> *reqqs = requests_;
  int size = static_cast<int>(reqqs->size());
    MPI_Request* treqs = &(*reqqs)[0];
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);

int Win::finish_comms(int rank){
  //Finish own requests
  std::vector<MPI_Request> *reqqs = requests_;
  int size = static_cast<int>(reqqs->size());
    std::vector<MPI_Request> myreqqs;
    auto iter = reqqs->begin();
    int proc_id = comm_->group()->actor(rank)->get_pid();
    while (iter != reqqs->end()){
      // Check whether the given peer is either the source or the destination of this request,
      // as we only wait here for the requests involving that peer.
      // Use the process id for the comparison: src() and dst() on a request return process ids,
      // NOT ranks (a rank only exists in the context of a communicator).
      if (((*iter) != MPI_REQUEST_NULL) && (((*iter)->src() == proc_id) || ((*iter)->dst() == proc_id))) {
        myreqqs.push_back(*iter);
        iter = reqqs->erase(iter);

    MPI_Request* treqs = &myreqqs[0];
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
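// shared_query() reports the geometry of a peer's window; with MPI_PROC_NULL it falls back to the
// first connected window with a non-zero size, following MPI's rule of returning the segment of the
// lowest rank that specified size > 0.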
int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
  const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];

    *size = target_win->size_;
    *disp_unit = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;
    *static_cast<void**>(baseptr) = nullptr;

MPI_Errhandler Win::errhandler()
  if (errhandler_ != MPI_ERRHANDLER_NULL)

void Win::set_errhandler(MPI_Errhandler errhandler)
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)

} // namespace simgrid