/* Copyright (c) 2007-2023. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include <simgrid/modelchecker.h>
#include "smpi_win.hpp"

#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"
#include "src/mc/mc_replay.hpp"

#include <algorithm> // std::any_of, std::stable_partition
#include <mutex>     // std::scoped_lock

XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
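
/* Argument checks shared by the RMA entry points below: bail out with the matching MPI error code when the
 * requested range does not fit in the remote window, or when no epoch (post/start or lock) is open on it. */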
#define CHECK_RMA_REMOTE_WIN(fun, win)                                                          \
  if (target_count * target_datatype->get_extent() > win->size_) {                             \
    XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.", \
             fun, target_count * target_datatype->get_extent(), target_rank, win->size_);      \
    simgrid::smpi::utils::set_current_buffer(1, "win_base", win->base_);                       \
    return MPI_ERR_RMA_RANGE;                                                                  \
  }

#define CHECK_WIN_LOCKED(win)                                                                   \
  if (opened_ == 0) { /* check that post/start has been done */                                \
    bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; }); \
    if (not locked)                                                                             \
      return MPI_ERR_WIN;                                                                       \
  }

namespace simgrid::smpi {
std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_ = 0;

Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
    : base_(base)
    , size_(size)
    , disp_unit_(disp_unit)
    , info_(info)
    , comm_(comm)
    , connected_wins_(comm->size())
    , rank_(comm->rank())
    , allocated_(allocated)
    , dynamic_(dynamic)
{
  XBT_DEBUG("Creating window");
  if (info != MPI_INFO_NULL)
    info->ref();
  connected_wins_[rank_] = this;
  comm->add_rma_win(this);
  comm->ref();
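
  // Every process stores its own Win* at connected_wins_[rank_]; the allgather below exchanges these
  // pointers so that any rank can later reach the (simulated) window of any peer directly.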
  colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
                   MPI_BYTE, comm);
  if (MC_is_active() || MC_record_replay_is_active()) {
    s4u::Barrier* bar_ptr;
    if (rank_ == 0) {
      bar_     = s4u::Barrier::create(comm->size());
      bar_ptr  = bar_.get();
    }
    colls::bcast(&bar_ptr, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
    if (rank_ != 0)
      bar_ = s4u::BarrierPtr(bar_ptr);
  }
  this->add_f();
}

int Win::del(Win* win){
  // As per the standard, perform a barrier to ensure every async comm is finished
  if (MC_is_active() || MC_record_replay_is_active())
    win->bar_->wait();
  else
    colls::barrier(win->comm_);
  win->flush_local_all();

  if (win->info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(win->info_);
  if (win->errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(win->errhandler_);

  win->comm_->remove_rma_win(win);

  colls::barrier(win->comm_);
  Comm::unref(win->comm_);
  if (not win->lockers_.empty() || win->opened_ < 0) {
    XBT_WARN("Freeing a locked or opened window");
    return MPI_ERR_WIN;
  }
  if (win->allocated_)
    xbt_free(win->base_);
  if (win->mut_->get_owner() != nullptr)
    win->mut_->unlock();

  F2C::free_f(win->f2c_id());
  win->cleanup_attr<Win>();

  delete win;
  return MPI_SUCCESS;
}

int Win::attach(void* /*base*/, MPI_Aint size)
{
  if (not(base_ == MPI_BOTTOM || base_ == nullptr))
    return MPI_ERR_ARG;
  base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
  size_ += size;
  return MPI_SUCCESS;
}

int Win::detach(const void* /*base*/)
{
  base_ = MPI_BOTTOM;
  size_ = -1;
  return MPI_SUCCESS;
}

void Win::get_name(char* name, int* length) const
{
  *length = static_cast<int>(name_.length());
  if (not name_.empty()) {
    name_.copy(name, *length);
    name[*length] = '\0';
  }
}

void Win::get_group(MPI_Group* group){
  if (comm_ != MPI_COMM_NULL) {
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}

int Win::rank() const
{
  return rank_;
}

MPI_Comm Win::comm() const
{
  return comm_;
}

MPI_Aint Win::size() const
{
  return size_;
}

void* Win::base() const
{
  return base_;
}

int Win::disp_unit() const
{
  return disp_unit_;
}

bool Win::dynamic() const
{
  return dynamic_;
}

void Win::set_info(MPI_Info info)
{
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  info_ = info;
  if (info_ != MPI_INFO_NULL)
    info_->ref();
}

void Win::set_name(const char* name){
  name_ = name;
}
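
// Fence: implemented as a barrier on the window's communicator; unless MPI_MODE_NOPRECEDE is given,
// the RMA requests issued before the fence are also completed locally before the barrier.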
int Win::fence(int assert)
{
  XBT_DEBUG("Entering fence");
  opened_++;
  if (not (assert & MPI_MODE_NOPRECEDE)) {
    // This is not the first fence => finalize what came before
    if (MC_is_active() || MC_record_replay_is_active())
      bar_->wait();
    else
      colls::barrier(comm_);
    flush_local_all();
    count_ = 0;
  }

  if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
    opened_ = 0;

  if (MC_is_active() || MC_record_replay_is_active())
    bar_->wait();
  else
    colls::barrier(comm_);
  XBT_DEBUG("Leaving fence");

  return MPI_SUCCESS;
}

int Win::put(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;

  if (target_rank != rank_) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare send_request
    MPI_Request sreq =
        Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
                               MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                              SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);

    //start send
    sreq->start();

    if (request != nullptr) {
      *request = sreq;
    } else {
      const std::scoped_lock lock(*mut_);
      requests_.push_back(sreq);
    }

    //push request to receiver's win
    const std::scoped_lock recv_lock(*recv_win->mut_);
    recv_win->requests_.push_back(rreq);
    rreq->start();
  } else {
    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    if (request != nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}

int Win::get(void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get sender pointer
  Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)

  const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if (target_rank != rank_) {
    //prepare send_request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);

    //start the send, with another process than us as sender.
    sreq->start();
    // push request to sender's win
    if (const std::scoped_lock send_lock(*send_win->mut_); true) {
      send_win->requests_.push_back(sreq);
    }

    //start recv
    rreq->start();

    if (request != nullptr) {
      *request = rreq;
    } else {
      const std::scoped_lock lock(*mut_);
      requests_.push_back(rreq);
    }
  } else {
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    if (request != nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}

int Win::accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op,
                    MPI_Request* request)
{
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  //FIXME: local version
  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
  // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
  // prepare send_request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);

  // prepare receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
  count_++;

  // start send
  sreq->start();
  // push request to receiver's win
  if (const std::scoped_lock recv_lock(*recv_win->mut_); true) {
    recv_win->requests_.push_back(rreq);
    rreq->start();
  }

  if (request != nullptr) {
    *request = sreq;
  } else {
    const std::scoped_lock lock(*mut_);
    requests_.push_back(sreq);
  }

  // FIXME: The current implementation fails to ensure the correct ordering of the accumulate requests. The following
  // 'flush' is a workaround to fix that.
  flush(target_rank);

  XBT_DEBUG("Leaving MPI_Win_Accumulate");
  return MPI_SUCCESS;
}

int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
{
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  //need to be sure ops are correctly ordered, so finish request here ? slow.
  MPI_Request req = MPI_REQUEST_NULL;
  const std::scoped_lock lock(*send_win->atomic_mut_);
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (op != MPI_NO_OP)
    accumulate(origin_addr, origin_count, origin_datatype, target_rank,
               target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  return MPI_SUCCESS;
}

int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
                          int target_rank, MPI_Aint target_disp)
{
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  const std::scoped_lock lock(*send_win->atomic_mut_);
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);
  }
  return MPI_SUCCESS;
}

int Win::start(MPI_Group group, int /*assert*/)
{
  /* From the MPI Forum's advice to implementors:
     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
     must complete, without further dependencies. */

  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Start");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int src = comm_->group()->rank(group->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    if (src != rank_)
      reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  group->ref();
  dst_group_ = group;
  opened_--; // we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Start");
  return MPI_SUCCESS;
}

int Win::post(MPI_Group group, int /*assert*/)
{
  //let's make a synchronous send here
  XBT_DEBUG("Entering MPI_Win_Post");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int dst = comm_->group()->rank(group->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    if (dst != rank_)
      reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  group->ref();
  src_group_ = group;
  opened_--; // we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Post");
  return MPI_SUCCESS;
}

int Win::complete()
{
  xbt_assert(opened_ != 0, "Complete called on already opened MPI_Win");

  XBT_DEBUG("Entering MPI_Win_Complete");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < dst_group_->size(); i++) {
    int dst = comm_->group()->rank(dst_group_->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    if (dst != rank_)
      reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  flush_local_all();

  opened_++; //we're closed for business !
  Group::unref(dst_group_);
  dst_group_ = MPI_GROUP_NULL;
  return MPI_SUCCESS;
}

int Win::wait()
{
  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < src_group_->size(); i++) {
    int src = comm_->group()->rank(src_group_->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    if (src != rank_)
      reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  flush_local_all();

  opened_++; //we're closed for business !
  Group::unref(src_group_);
  src_group_ = MPI_GROUP_NULL;
  return MPI_SUCCESS;
}

int Win::lock(int lock_type, int rank, int /*assert*/)
{
  MPI_Win target_win = connected_wins_[rank];
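  // mode_ accumulates the lock types currently granted on the target window; lock_mut_ is only held while
  // an exclusive lock is outstanding, so an exclusive locker blocks later lockers while shared lockers
  // never block each other.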
  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED) || target_win->mode_ == MPI_LOCK_EXCLUSIVE) {
    target_win->lock_mut_->lock();
    target_win->mode_ += lock_type; // add the lock_type to differentiate the case where we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
    if (lock_type == MPI_LOCK_SHARED) { // the window used to be exclusive, it's now shared.
      target_win->lock_mut_->unlock();
    }
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(rank_);

  return MPI_SUCCESS;
}

int Win::lock_all(int assert){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}

int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_ = 0;
  target_win->lockers_.remove(rank_);
  if (target_mode == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->unlock();
  }

  return MPI_SUCCESS;
}

int Win::unlock_all(){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}

int Win::flush(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);

  finished = connected_wins_[rank]->finish_comms(rank_);
  XBT_DEBUG("Win_flush on remote %d for local %d - Finished %d RMA calls", rank, rank_, finished);

  return MPI_SUCCESS;
}

int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
  return MPI_SUCCESS;
}

int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local %d - Finished %d RMA calls", rank_, finished);
  for (int i = 0; i < comm_->size(); i++) {
    finished = connected_wins_[i]->finish_comms(rank_);
    XBT_DEBUG("Win_flush_all on remote %d for local %d - Finished %d RMA calls", i, rank_, finished);
  }
  return MPI_SUCCESS;
}

int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all on local %d - Finished %d RMA calls", rank_, finished);
  return MPI_SUCCESS;
}

Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
}

int Win::finish_comms(){
  // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
  // Without it, the vector could get reallocated when another process pushes,
  // which would invalidate the array that Request::waitall() is working on.
  // Another solution would be to copy the data and clean up the vector *before* Request::waitall.
  const std::scoped_lock lock(*mut_);
  //Finish own requests
  int size = static_cast<int>(requests_.size());
  if (size > 0) {
    MPI_Request* treqs = requests_.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    requests_.clear();
  }
  return size;
}

int Win::finish_comms(int rank){
  // See comment about the mutex in finish_comms() above
  const std::scoped_lock lock(*mut_);
  // Finish own requests
  // Let's see if we're either the destination or the sender of this request,
  // because we only wait for requests that we are responsible for.
  // Also use the process id here, since the request itself returns from src()
  // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
  aid_t proc_id = comm_->group()->actor(rank);
  auto it = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
    return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
  });
  std::vector<MPI_Request> myreqqs(it, end(requests_));
  requests_.erase(it, end(requests_));
  int size = static_cast<int>(myreqqs.size());
  if (size > 0) {
    MPI_Request* treqs = myreqqs.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    myreqqs.clear();
  }
  return size;
}

int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
{
  const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
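  // With MPI_PROC_NULL, MPI_Win_shared_query reports the first window in the communicator that actually has
  // memory attached, i.e. the lowest rank with a non-zero size; the loop below implements that fallback.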
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];
  }
  if (target_win != nullptr) {
    *size                         = target_win->size_;
    *disp_unit                    = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;
  } else {
    *size                         = 0;
    *static_cast<void**>(baseptr) = nullptr;
  }
  return MPI_SUCCESS;
}

MPI_Errhandler Win::errhandler()
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
  return errhandler_;
}

void Win::set_errhandler(MPI_Errhandler errhandler)
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
}

} // namespace simgrid::smpi