/* Copyright (c) 2007-2023. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include <simgrid/modelchecker.h>
#include "smpi_win.hpp"

#include "private.hpp"
#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"
#include "src/mc/mc_replay.hpp"

#include <algorithm> // std::any_of, std::stable_partition
#include <mutex>     // std::scoped_lock

XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");

#define CHECK_RMA_REMOTE_WIN(fun, win)\
  if(target_count*target_datatype->get_extent()>win->size_){\
    XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
    fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
    simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
    return MPI_ERR_RMA_RANGE;\
  }

#define CHECK_WIN_LOCKED(win)                                                                                          \
  if (opened_ == 0) { /*check that post/start has been done*/                                                          \
    bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; }); \
    if (not locked)                                                                                                    \
      return MPI_ERR_WIN;                                                                                              \
  }
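
// Both guard macros are expanded at the top of the RMA entry points below: they bail out with MPI_ERR_WIN when no
// epoch is open and the caller does not hold a passive-target lock, and with MPI_ERR_RMA_RANGE when the transfer
// would overflow the remote window.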

namespace simgrid::smpi {
std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_=0;

Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
    : base_(base)
    , size_(size)
    , disp_unit_(disp_unit)
    , info_(info)
    , comm_(comm)
    , connected_wins_(comm->size())
    , rank_(comm->rank())
    , allocated_(allocated)
    , dynamic_(dynamic)
{
  XBT_DEBUG("Creating window");
  if(info!=MPI_INFO_NULL)
    info->ref();
  connected_wins_[rank_] = this;
  errhandler_->ref();
  comm->add_rma_win(this);
  comm->ref();

  colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
                   MPI_BYTE, comm);
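
  // Under model checking / replay, all ranks must synchronize on the very same barrier object: rank 0 creates it and
  // broadcasts the raw pointer, which is safe in SMPI since all simulated ranks live in the same OS process.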
  if (MC_is_active() || MC_record_replay_is_active()) {
    s4u::Barrier* bar_ptr;
    if (rank_ == 0) {
      bar_ = s4u::Barrier::create(comm->size());
      bar_ptr = bar_.get();
    }
    colls::bcast(&bar_ptr, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
    if (rank_ != 0)
      bar_ = s4u::BarrierPtr(bar_ptr);
  }
  this->add_f();
}

int Win::del(Win* win){
  //As per the standard, perform a barrier to ensure every async comm is finished
  if (MC_is_active() || MC_record_replay_is_active())
    win->bar_->wait();
  else
    colls::barrier(win->comm_);
  win->flush_local_all();

  if (win->info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(win->info_);
  if (win->errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(win->errhandler_);

  win->comm_->remove_rma_win(win);

  colls::barrier(win->comm_);
  Comm::unref(win->comm_);
  if (not win->lockers_.empty() || win->opened_ < 0) {
    XBT_WARN("Freeing a locked or opened window");
    return MPI_ERR_WIN;
  }
  if (win->allocated_)
    xbt_free(win->base_);

  for (auto m : {win->mut_, win->lock_mut_, win->atomic_mut_})
    if (m->get_owner() != nullptr)
      m->unlock();

  F2C::free_f(win->f2c_id());
  win->cleanup_attr<Win>();

  delete win;
  return MPI_SUCCESS;
}

int Win::attach(void* /*base*/, MPI_Aint size)
{
  if (not(base_ == MPI_BOTTOM || base_ == nullptr))
    return MPI_ERR_ARG;
  base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
  size_ += size;
  return MPI_SUCCESS;
}

int Win::detach(const void* /*base*/)
{
  base_ = MPI_BOTTOM;
  size_ = -1;
  return MPI_SUCCESS;
}

void Win::get_name(char* name, int* length) const
{
  *length = static_cast<int>(name_.length());
  if (not name_.empty()) {
    name_.copy(name, *length);
    name[*length] = '\0';
  }
}

void Win::get_group(MPI_Group* group){
  if(comm_ != MPI_COMM_NULL){
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}

MPI_Info Win::info()
{
  return info_;
}

int Win::rank() const
{
  return rank_;
}

MPI_Comm Win::comm() const
{
  return comm_;
}

MPI_Aint Win::size() const
{
  return size_;
}

void* Win::base() const
{
  return base_;
}

int Win::disp_unit() const
{
  return disp_unit_;
}

bool Win::dynamic() const
{
  return dynamic_;
}

void Win::set_info(MPI_Info info)
{
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  info_ = info;
  if (info_ != MPI_INFO_NULL)
    info_->ref();
}

void Win::set_name(const char* name){
  name_ = name;
}
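
// Epoch bookkeeping: opened_ tracks whether a synchronization epoch is currently open on this window. It is zero
// when the window is closed; start() and post() decrement it ("open for business") while complete() and wait()
// increment it back. CHECK_WIN_LOCKED relies on this: when opened_ == 0, an RMA call is only legal if the caller
// holds a passive-target lock.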
int Win::fence(int assert)
{
  XBT_DEBUG("Entering fence");
  opened_++;
  if (not (assert & MPI_MODE_NOPRECEDE)) {
    // This is not the first fence => finalize what came before
    if (MC_is_active() || MC_record_replay_is_active())
      bar_->wait();
    else
      colls::barrier(comm_);
    flush_local_all();
    count_ = 0;
  }

  if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, so mark the window as closed.
    opened_ = 0;
  assert_ = assert;
  if (MC_is_active() || MC_record_replay_is_active())
    bar_->wait();
  else
    colls::barrier(comm_);
  XBT_DEBUG("Leaving fence");

  return MPI_SUCCESS;
}
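
// Tag layout of the RMA messages exchanged below: put uses SMPI_RMA_TAG + 1, get SMPI_RMA_TAG + 2, the post/start
// handshake SMPI_RMA_TAG + 4, the complete/wait handshake SMPI_RMA_TAG + 5, and accumulate counts downward from
// SMPI_RMA_TAG - 3 so that its operations keep their issue order.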
int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;

  if (target_rank != rank_) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare send_request
    MPI_Request sreq =
        Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
                               MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                              SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);

    //start send
    sreq->start();

    if(request!=nullptr){
      *request=sreq;
    }else{
      const std::scoped_lock lock(*mut_);
      requests_.push_back(sreq);
    }

    //push request to receiver's win
    const std::scoped_lock recv_lock(*recv_win->mut_);
    recv_win->requests_.push_back(rreq);
    rreq->start();
  } else {
    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    if(request!=nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}

int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get sender pointer
  Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)

  const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if (target_rank != rank_) {
    //prepare send_request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);

    //start the send, with another process than us as sender.
    sreq->start();
    // push request to sender's win
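    // C++17 if-with-initializer: the scoped_lock only protects the push_back and is released as soon as the block
    // ends, before we start our own receive below.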
    if (const std::scoped_lock send_lock(*send_win->mut_); true) {
      send_win->requests_.push_back(sreq);
    }

    //start recv
    rreq->start();

    if(request!=nullptr){
      *request=rreq;
    }else{
      const std::scoped_lock lock(*mut_);
      requests_.push_back(rreq);
    }
  } else {
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    if(request!=nullptr)
      *request=MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}

int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
{
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  //FIXME: local version
  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
  // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
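  // For instance, the first accumulate issued on this window gets tag SMPI_RMA_TAG - 3, the next one
  // SMPI_RMA_TAG - 4, and so on: count_ grows by one per accumulate, so matching send/recv pairs stay ordered.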

  // prepare send_request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);

  // prepare receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);

  count_++;

  // start send
  sreq->start();
  // push request to receiver's win
  if (const std::scoped_lock recv_lock(*recv_win->mut_); true) {
    recv_win->requests_.push_back(rreq);
    rreq->start();
  }

  if (request != nullptr) {
    *request = sreq;
  } else {
    const std::scoped_lock lock(*mut_);
    requests_.push_back(sreq);
  }

  // FIXME: The current implementation fails to ensure the correct ordering of the accumulate requests. The following
  // 'flush' is a workaround to fix that.
  flush(target_rank);
  XBT_DEBUG("Leaving MPI_Win_Accumulate");
  return MPI_SUCCESS;
}

int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  //need to be sure ops are correctly ordered, so finish request here ? slow.
  MPI_Request req = MPI_REQUEST_NULL;
  const std::scoped_lock lock(*send_win->atomic_mut_);
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (op != MPI_NO_OP)
    accumulate(origin_addr, origin_count, origin_datatype, target_rank,
               target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  return MPI_SUCCESS;
}
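
// As in get_accumulate() above, atomicity is obtained by serializing all atomic calls targeting the same window on
// its atomic_mut_: the get/put pair below cannot interleave with another get_accumulate or compare_and_swap.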
int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
                          int target_rank, MPI_Aint target_disp)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  const std::scoped_lock lock(*send_win->atomic_mut_);
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);
  }
  return MPI_SUCCESS;
}

int Win::start(MPI_Group group, int /*assert*/)
{
  /* From the MPI forum's advice to implementors:
  The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
  will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
  the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
  matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
  MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
  implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
  to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
  called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
  origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
  must complete, without further dependencies. */
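
  // This implementation takes the first option: MPI_WIN_START blocks until every process of the target group has
  // sent its MPI_WIN_POST message (received below on SMPI_RMA_TAG + 4).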
  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Start");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int src = comm_->group()->rank(group->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    if (src != rank_)
      reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  group->ref();
  dst_group_ = group;
  opened_--; // we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Start");
  return MPI_SUCCESS;
}

int Win::post(MPI_Group group, int /*assert*/)
{
  //let's make a synchronous send here
  XBT_DEBUG("Entering MPI_Win_Post");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int dst = comm_->group()->rank(group->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    if (dst != rank_)
      reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  group->ref();
  src_group_ = group;
  opened_--; // we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Post");
  return MPI_SUCCESS;
}

int Win::complete()
{
  xbt_assert(opened_ != 0, "Complete called on a window with no open epoch");

  XBT_DEBUG("Entering MPI_Win_Complete");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < dst_group_->size(); i++) {
    int dst = comm_->group()->rank(dst_group_->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    if (dst != rank_)
      reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  flush_local_all();

  opened_++; //we're closed for business !
  Group::unref(dst_group_);
  dst_group_ = MPI_GROUP_NULL;
  return MPI_SUCCESS;
}

int Win::wait(){
  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < src_group_->size(); i++) {
    int src = comm_->group()->rank(src_group_->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    if (src != rank_)
      reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  flush_local_all();

  opened_++; //we're closed for business !
  Group::unref(src_group_);
  src_group_ = MPI_GROUP_NULL;
  return MPI_SUCCESS;
}
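
// Passive target synchronization. mode_ accumulates the lock_type values currently granted on the target window
// (MPI_LOCK_SHARED and MPI_LOCK_EXCLUSIVE are plain integers), which lets unlock() tell whether the (simulated)
// lock_mut_ must be released: only an exclusive holder keeps lock_mut_ locked.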
int Win::lock(int lock_type, int rank, int /*assert*/)
{
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->lock();
    target_win->mode_ += lock_type; // add the lock_type to differentiate case when we are switching from EXCLUSIVE to
                                    // SHARED (no release needed in the unlock)
    if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
      target_win->lock_mut_->unlock();
    }
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(rank_);
  return MPI_SUCCESS;
}

int Win::lock_all(int assert){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}

int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_ = 0;
  target_win->lockers_.remove(rank_);
  if (target_mode==MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->unlock();
  }
  return MPI_SUCCESS;
}

int Win::unlock_all(){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}
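
// A flush completes both directions: the requests this process queued toward `rank`, and the requests that `rank`
// queued on our window (each remote RMA call registers a request on both the origin and the target window).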
int Win::flush(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
  if (rank != rank_) {
    finished = connected_wins_[rank]->finish_comms(rank_);
    XBT_DEBUG("Win_flush on remote %d for local %d - Finished %d RMA calls", rank, rank_, finished);
  }
  return MPI_SUCCESS;
}

int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
  return MPI_SUCCESS;
}

int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local %d - Finished %d RMA calls", rank_, finished);
  for (int i = 0; i < comm_->size(); i++) {
    if (i != rank_) {
      finished = connected_wins_[i]->finish_comms(rank_);
      XBT_DEBUG("Win_flush_all on remote %d for local %d - Finished %d RMA calls", i, rank_, finished);
    }
  }
  return MPI_SUCCESS;
}

int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all on local %d - Finished %d RMA calls", rank_, finished);
  return MPI_SUCCESS;
}

Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
}

int Win::finish_comms(){
  // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
  // Without this, the vector could get redimensioned when another process pushes.
  // This would result in the array used by Request::waitall() to be invalidated.
  // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
  const std::scoped_lock lock(*mut_);
  //Finish own requests
  int size = static_cast<int>(requests_.size());
  if (size > 0) {
    MPI_Request* treqs = requests_.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    requests_.clear();
  }
  return size;
}

int Win::finish_comms(int rank){
  // See comment about the mutex in finish_comms() above
  const std::scoped_lock lock(*mut_);
  // Finish own requests
  // Let's see if we're either the destination or the sender of this request
  // because we only wait for requests that we are responsible for.
  // Also use the process id here since the request itself returns from src()
  // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
  aid_t proc_id = comm_->group()->actor(rank);
  auto it = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
    return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
  });
  std::vector<MPI_Request> myreqqs(it, end(requests_));
  requests_.erase(it, end(requests_));
  int size = static_cast<int>(myreqqs.size());
  if (size > 0) {
    MPI_Request* treqs = myreqqs.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    myreqqs.clear();
  }
  return size;
}
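
// Per MPI_Win_shared_query semantics, querying rank MPI_PROC_NULL asks for "any" window: we then report the first
// window of the communicator that has a non-zero size.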
int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
{
  const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];
  }
  if (target_win) {
    *size = target_win->size_;
    *disp_unit = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;
  } else {
    *size = 0;
    *static_cast<void**>(baseptr) = nullptr;
  }
  return MPI_SUCCESS;
}

MPI_Errhandler Win::errhandler()
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
  return errhandler_;
}

void Win::set_errhandler(MPI_Errhandler errhandler)
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
}

} // namespace simgrid::smpi