/* Copyright (c) 2007-2022. The SimGrid Team. All rights reserved.          */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include <simgrid/modelchecker.h>
#include "smpi_win.hpp"

#include "private.hpp"
#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"
#include "src/mc/mc_replay.hpp"

#include <algorithm>

XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
#define CHECK_RMA_REMOTE_WIN(fun, win)\
  if(target_count*target_datatype->get_extent()>win->size_){\
    XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
    fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
    simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
    return MPI_ERR_RMA_RANGE;\
  }
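
/* Worked example of the check above (hypothetical numbers): a put of target_count = 10 elements of an
 * 8-byte datatype tries to move 80 bytes; if the remote window was created with size_ = 64, the macro
 * logs the warning and makes the calling RMA operation return MPI_ERR_RMA_RANGE. */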

#define CHECK_WIN_LOCKED(win)\
  if (opened_ == 0) { /*check that post/start has been done*/\
    bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; });\
    if (not locked)\
      return MPI_ERR_WIN;\
  }

namespace simgrid {
namespace smpi {

std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_=0;

Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
    : base_(base), size_(size), disp_unit_(disp_unit), info_(info), comm_(comm)
    , connected_wins_(comm->size()), rank_(comm->rank()), allocated_(allocated), dynamic_(dynamic)
{
  XBT_DEBUG("Creating window");
  if(info!=MPI_INFO_NULL)
    info->ref();
  connected_wins_[rank_] = this;
  errhandler_->ref();
  comm->add_rma_win(this);
  comm->ref();
  colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
                   MPI_BYTE, comm);
  if (MC_is_active() || MC_record_replay_is_active()){
    s4u::Barrier* bar_ptr;
    if (rank_ == 0) {
      bar_ = s4u::Barrier::create(comm->size());
      bar_ptr = bar_.get();
    }
    colls::bcast(&bar_ptr, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
    if (rank_ != 0)
      bar_ = s4u::BarrierPtr(bar_ptr);
  }
  this->add_f();
}

int Win::del(Win* win){
  //As per the standard, perform a barrier to ensure every async comm is finished
  if (MC_is_active() || MC_record_replay_is_active())
    win->bar_->wait();
  else
    colls::barrier(win->comm_);
  win->flush_local_all();

  if (win->info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(win->info_);
  if (win->errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(win->errhandler_);

  win->comm_->remove_rma_win(win);

  colls::barrier(win->comm_);
  Comm::unref(win->comm_);
  if (!win->lockers_.empty() || win->opened_ < 0){
    XBT_WARN("Freeing a locked or opened window");
    return MPI_ERR_WIN;
  }
  if (win->allocated_)
    xbt_free(win->base_);

  F2C::free_f(win->f2c_id());
  win->cleanup_attr<Win>();

  delete win;
  return MPI_SUCCESS;
}

int Win::attach(void* /*base*/, MPI_Aint size)
{
  if (not(base_ == MPI_BOTTOM || base_ == nullptr))
    return MPI_ERR_ARG;
  base_ = nullptr; // the address will actually be given in the RMA calls, as the displacement
  size_ += size;
  return MPI_SUCCESS;
}

int Win::detach(const void* /*base*/)
{
  base_ = MPI_BOTTOM;
  size_ = -1;
  return MPI_SUCCESS;
}

void Win::get_name(char* name, int* length) const
{
  *length = static_cast<int>(name_.length());
  if (not name_.empty()) {
    name_.copy(name, *length);
    name[*length] = '\0';
  }
}

void Win::get_group(MPI_Group* group){
  if(comm_ != MPI_COMM_NULL){
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}

int Win::rank() const { return rank_; }

MPI_Comm Win::comm() const { return comm_; }

MPI_Aint Win::size() const { return size_; }

void* Win::base() const { return base_; }

int Win::disp_unit() const { return disp_unit_; }

bool Win::dynamic() const { return dynamic_; }

void Win::set_info(MPI_Info info)
{
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  info_ = info;
  if (info_ != MPI_INFO_NULL)
    info_->ref();
}

void Win::set_name(const char* name){
  name_ = std::string(name);
}

int Win::fence(int assert)
{
  XBT_DEBUG("Entering fence");
  opened_++;
  if (not (assert & MPI_MODE_NOPRECEDE)) {
    // This is not the first fence => finalize what came before
    if (MC_is_active() || MC_record_replay_is_active())
      bar_->wait();
    else
      colls::barrier(comm_);
    flush_local_all();
    count_=0;
  }
  if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
    opened_=0;
  assert_ = assert;
  if (MC_is_active() || MC_record_replay_is_active())
    bar_->wait();
  else
    colls::barrier(comm_);
  XBT_DEBUG("Leaving fence");
  return MPI_SUCCESS;
}
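
/* Illustration (not part of SMPI itself): how user code typically drives the fence logic above.
 * 'win' and 'buf' are hypothetical names.
 *
 *   MPI_Win_fence(MPI_MODE_NOPRECEDE, win);           // first epoch: nothing to finalize, flush skipped
 *   MPI_Put(buf, 1, MPI_INT, 1, 0, 1, MPI_INT, win);  // queued asynchronously on both windows
 *   MPI_Win_fence(0, win);                            // barrier, then flush_local_all() completes the put
 *   MPI_Win_fence(MPI_MODE_NOSUCCEED, win);           // closes the window for RMA (opened_ reset to 0)
 */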

int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;

  if (target_rank != rank_) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare send_request
    MPI_Request sreq =
        Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
                               MPI_OP_NULL);
    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                              SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
    //start send
    sreq->start();
    if(request!=nullptr){
      *request=sreq;
    }else{
      mut_->lock();
      requests_.push_back(sreq);
      mut_->unlock();
    }
    //push request to receiver's win
    recv_win->mut_->lock();
    recv_win->requests_.push_back(rreq);
    rreq->start();
    recv_win->mut_->unlock();
  } else {
    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    if(request!=nullptr)
      *request = MPI_REQUEST_NULL;
  }
  return MPI_SUCCESS;
}
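
/* Trace of what one remote put queues (hypothetical ranks 0 -> 1): rank 0 stores sreq in its own
 * requests_ vector (or hands it back through 'request'), while rreq is pushed into the *target*
 * window's requests_ vector. Neither side blocks here: both requests are only waited on at the
 * next synchronization point (fence, flush, unlock...), via finish_comms(). */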

int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get sender pointer
  Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)

  const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if (target_rank != rank_) {
    //prepare send_request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
    //start the send, with another process than us as sender.
    sreq->start();
    // push request to sender's win
    send_win->mut_->lock();
    send_win->requests_.push_back(sreq);
    send_win->mut_->unlock();
    //start recv
    rreq->start();
    if(request!=nullptr){
      *request=rreq;
    }else{
      mut_->lock();
      requests_.push_back(rreq);
      mut_->unlock();
    }
  } else {
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    if(request!=nullptr)
      *request=MPI_REQUEST_NULL;
  }
  return MPI_SUCCESS;
}

int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
{
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  //FIXME: local version
  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
  // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
  // prepare send_request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);
  // prepare receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
  count_++;

  // start send
  sreq->start();
  // push request to receiver's win
  recv_win->mut_->lock();
  recv_win->requests_.push_back(rreq);
  rreq->start();
  recv_win->mut_->unlock();

  if (request != nullptr) {
    *request = sreq;
  } else {
    mut_->lock();
    requests_.push_back(sreq);
    mut_->unlock();
  }

  // FIXME: The current implementation fails to ensure the correct ordering of the accumulate requests. The following
  // 'flush' is a workaround to fix that.
  flush(target_rank);
  XBT_DEBUG("Leaving MPI_Win_Accumulate");
  return MPI_SUCCESS;
}
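
/* Worked example of the tag arithmetic above: the first accumulate on this window uses tag
 * SMPI_RMA_TAG - 3 (count_ == 0), the second SMPI_RMA_TAG - 4, and so on. Tags thus decrease
 * monotonically, which pairs successive sends and receives in order and keeps them below
 * (hence distinct from) every other tag SMPI uses internally. */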

int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  //need to be sure ops are correctly ordered, so finish request here ? slow.
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (op != MPI_NO_OP)
    accumulate(origin_addr, origin_count, origin_datatype, target_rank,
               target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}

int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
                          int target_rank, MPI_Aint target_disp)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);
  }
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}
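
/* The semantics implemented above, spelled out (atomic_mut_ makes the two steps atomic):
 *   1. *result_addr = window[target_disp]              // the get
 *   2. if (*result_addr == *compare_addr)
 *        window[target_disp] = *origin_addr            // the put
 * i.e. a classic compare-and-swap that always hands the old value back to the caller. */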

int Win::start(MPI_Group group, int /*assert*/)
{
  /* From the MPI forum's advice:
  The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
  will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
  the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
  matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
  MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
  implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
  to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
  called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
  origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
  must complete, without further dependencies. */

  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Start");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int src = comm_->group()->rank(group->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    if (src != rank_)
      reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  group->ref();
  dst_group_ = group;
  opened_--; // we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Start");
  return MPI_SUCCESS;
}
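
/* Illustration (not part of SMPI itself): the general-active-target pattern that start/post/
 * complete/wait implement, as user code would drive it. Group and buffer names are hypothetical.
 *
 *   // origin process                          // target process
 *   MPI_Win_start(target_group, 0, win);       MPI_Win_post(origin_group, 0, win);
 *   MPI_Put(buf, 1, MPI_INT, t,                ... target computes elsewhere ...
 *           0, 1, MPI_INT, win);
 *   MPI_Win_complete(win);                     MPI_Win_wait(win);
 *   // put completed at origin                 // data now visible in the window
 */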

int Win::post(MPI_Group group, int /*assert*/)
{
  //let's make a synchronous send here
  XBT_DEBUG("Entering MPI_Win_Post");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int dst = comm_->group()->rank(group->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    if (dst != rank_)
      reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  group->ref();
  src_group_ = group;
  opened_--; // we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Post");
  return MPI_SUCCESS;
}

int Win::complete(){
  xbt_assert(opened_ != 0, "Complete called on an MPI_Win that was not opened with MPI_Win_start");

  XBT_DEBUG("Entering MPI_Win_Complete");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < dst_group_->size(); i++) {
    int dst = comm_->group()->rank(dst_group_->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    if (dst != rank_)
      reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  flush_local_all();

  opened_++; //we're closed for business !
  Group::unref(dst_group_);
  dst_group_ = MPI_GROUP_NULL;
  return MPI_SUCCESS;
}

int Win::wait(){
  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < src_group_->size(); i++) {
    int src = comm_->group()->rank(src_group_->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    if (src != rank_)
      reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  flush_local_all();

  opened_++; //we're closed for business !
  Group::unref(src_group_);
  src_group_ = MPI_GROUP_NULL;
  return MPI_SUCCESS;
}

int Win::lock(int lock_type, int rank, int /*assert*/)
{
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED) || target_win->mode_ == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->lock();
    target_win->mode_ += lock_type; // add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
    if(lock_type == MPI_LOCK_SHARED){ // the window used to be exclusive, it's now shared.
      target_win->lock_mut_->unlock();
    }
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(rank_);

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
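
/* Illustration (not part of SMPI itself): the passive-target pattern served by lock/unlock.
 * Names are hypothetical.
 *
 *   MPI_Win_lock(MPI_LOCK_EXCLUSIVE, target_rank, 0, win); // may block on the target's lock_mut_
 *   MPI_Put(buf, 1, MPI_INT, target_rank, 0, 1, MPI_INT, win);
 *   MPI_Win_unlock(target_rank, win);                      // completes pending RMA on both sides
 */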

int Win::lock_all(int assert){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}

int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_= 0;
  target_win->lockers_.remove(rank_);
  if (target_mode==MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->unlock();
  }
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}

int Win::unlock_all(){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}

int Win::flush(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
  if (rank != rank_) {
    finished = connected_wins_[rank]->finish_comms(rank_);
    XBT_DEBUG("Win_flush on remote %d for local %d - Finished %d RMA calls", rank, rank_, finished);
  }
  return MPI_SUCCESS;
}

int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
  return MPI_SUCCESS;
}

int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local %d - Finished %d RMA calls", rank_, finished);
  for (int i = 0; i < comm_->size(); i++) {
    if (i != rank_) {
      finished = connected_wins_[i]->finish_comms(rank_);
      XBT_DEBUG("Win_flush_all on remote %d for local %d - Finished %d RMA calls", i, rank_, finished);
    }
  }
  return MPI_SUCCESS;
}

int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all on local %d - Finished %d RMA calls", rank_, finished);
  return MPI_SUCCESS;
}

Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
}

int Win::finish_comms(){
  // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
  // Without this, the vector could get redimensioned when another process pushes.
  // This would result in the array used by Request::waitall() being invalidated.
  // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
  mut_->lock();
  //Finish own requests
  int size = static_cast<int>(requests_.size());
  if (size > 0) {
    MPI_Request* treqs = requests_.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    requests_.clear();
  }
  mut_->unlock();
  return size;
}
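
/* Concrete scenario the mutex above rules out (hypothetical ranks): while rank 0 is inside
 * Request::waitall(size, requests_.data(), ...), rank 1 executes recv_win->requests_.push_back(rreq)
 * on the same window; the push_back may reallocate the vector's storage, leaving waitall iterating
 * over a dangling array. The (simulated) mutex serializes the two operations. */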

int Win::finish_comms(int rank){
  // See comment about the mutex in finish_comms() above
  mut_->lock();
  // Finish own requests
  // Let's see if we're either the destination or the sender of this request
  // because we only wait for requests that we are responsible for.
  // Also use the process id here since the request itself returns from src()
  // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
  aid_t proc_id = comm_->group()->actor(rank);
  auto it = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
    return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
  });
  std::vector<MPI_Request> myreqqs(it, end(requests_));
  requests_.erase(it, end(requests_));
  int size = static_cast<int>(myreqqs.size());
  if (size > 0) {
    MPI_Request* treqs = myreqqs.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    myreqqs.clear();
  }
  mut_->unlock();
  return size;
}

int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
{
  const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];
  }
  if (target_win) {
    *size                         = target_win->size_;
    *disp_unit                    = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;
  } else {
    *size                         = 0;
    *static_cast<void**>(baseptr) = nullptr;
  }
  return MPI_SUCCESS;
}

MPI_Errhandler Win::errhandler()
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
  return errhandler_;
}

void Win::set_errhandler(MPI_Errhandler errhandler)
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
}

} // namespace smpi
} // namespace simgrid