/* Copyright (c) 2007-2022. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */
#include <simgrid/modelchecker.h>
#include "smpi_win.hpp"

#include "private.hpp" // SMPI_RMA_TAG and other SMPI internals
#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"
#include "src/mc/mc_replay.hpp"

#include <algorithm> // std::any_of, std::stable_partition
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
#define CHECK_RMA_REMOTE_WIN(fun, win)\
  if(target_count*target_datatype->get_extent()>win->size_){\
    XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
    fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
    simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
    return MPI_ERR_RMA_RANGE;\
  }
#define CHECK_WIN_LOCKED(win)\
  if (opened_ == 0) { /*check that post/start has been done*/ \
    bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; }); \
    if (not locked)\
      return MPI_ERR_WIN;\
  }

namespace simgrid {
namespace smpi {

std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_ = 0;
Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
    : base_(base)
    , size_(size)
    , disp_unit_(disp_unit)
    , info_(info)
    , comm_(comm)
    , connected_wins_(comm->size())
    , rank_(comm->rank())
    , allocated_(allocated)
    , dynamic_(dynamic)
{
  XBT_DEBUG("Creating window");
  if(info!=MPI_INFO_NULL)
    info->ref();
  connected_wins_[rank_] = this;
  errhandler_->ref();
  comm->add_rma_win(this);
  comm->ref();

  colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
                   MPI_BYTE, comm);
  if (MC_is_active() || MC_record_replay_is_active()){
    s4u::Barrier* bar_ptr;
    if (rank_ == 0) {
      bar_ = s4u::Barrier::create(comm->size());
      bar_ptr = bar_.get();
    }
    colls::bcast(&bar_ptr, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
    if (rank_ != 0)
      bar_ = s4u::BarrierPtr(bar_ptr);
  }
  this->add_f();
}
int Win::del(Win* win){
  //As per the standard, perform a barrier to ensure every async comm is finished
  if (MC_is_active() || MC_record_replay_is_active())
    win->bar_->wait();
  else
    colls::barrier(win->comm_);
  win->flush_local_all();

  if (win->info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(win->info_);
  if (win->errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(win->errhandler_);

  win->comm_->remove_rma_win(win);

  colls::barrier(win->comm_);
  Comm::unref(win->comm_);
  if (!win->lockers_.empty() || win->opened_ < 0){
    XBT_WARN("Freeing a locked or opened window");
    return MPI_ERR_WIN;
  }
  if (win->allocated_)
    xbt_free(win->base_);

  F2C::free_f(win->f2c_id());
  win->cleanup_attr<Win>();

  delete win;
  return MPI_SUCCESS;
}
int Win::attach(void* /*base*/, MPI_Aint size)
{
  if (not(base_ == MPI_BOTTOM || base_ == nullptr))
    return MPI_ERR_ARG;
  base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
  size_ += size;
  return MPI_SUCCESS;
}
int Win::detach(const void* /*base*/)
{
  base_ = MPI_BOTTOM;
  size_ = -1;
  return MPI_SUCCESS;
}
void Win::get_name(char* name, int* length) const
{
  *length = static_cast<int>(name_.length());
  if (not name_.empty()) {
    name_.copy(name, *length);
    name[*length] = '\0';
  }
}
void Win::get_group(MPI_Group* group){
  if(comm_ != MPI_COMM_NULL){
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}
MPI_Info Win::info() { return info_; }

int Win::rank() const { return rank_; }

MPI_Comm Win::comm() const { return comm_; }

MPI_Aint Win::size() const { return size_; }

void* Win::base() const { return base_; }

int Win::disp_unit() const { return disp_unit_; }

bool Win::dynamic() const { return dynamic_; }
void Win::set_info(MPI_Info info)
{
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  info_ = info;
  if (info_ != MPI_INFO_NULL)
    info_->ref();
}

void Win::set_name(const char* name){
  name_ = name;
}
int Win::fence(int assert)
{
  XBT_DEBUG("Entering fence");
  opened_++;
  if (not (assert & MPI_MODE_NOPRECEDE)) {
    // This is not the first fence => finalize what came before
    if (MC_is_active() || MC_record_replay_is_active())
      bar_->wait();
    else
      colls::barrier(comm_);
    flush_local_all();
    count_ = 0;
  }

  if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
    opened_ = 0;

  if (MC_is_active() || MC_record_replay_is_active())
    bar_->wait();
  else
    colls::barrier(comm_);
  XBT_DEBUG("Leaving fence");

  return MPI_SUCCESS;
}
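/* Illustrative sketch (not part of the implementation): how the fence-based
 * active-target epoch implemented above looks from user code. All calls below
 * are standard MPI; the buffer, count and neighbor choice are made up.
 *
 *   int buf[16];
 *   MPI_Win win;
 *   MPI_Win_create(buf, sizeof(buf), sizeof(int), MPI_INFO_NULL, MPI_COMM_WORLD, &win);
 *   MPI_Win_fence(MPI_MODE_NOPRECEDE, win);  // open the first epoch: nothing to flush
 *   MPI_Put(buf, 16, MPI_INT, (rank + 1) % size, 0, 16, MPI_INT, win);
 *   MPI_Win_fence(MPI_MODE_NOSUCCEED, win);  // close the epoch: waits on pending RMA requests
 *   MPI_Win_free(&win);
 */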
int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;

  if (target_rank != rank_) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare send_request
    MPI_Request sreq =
        Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
                               MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                              SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);

    //start send
    sreq->start();

    if(request!=nullptr){
      *request=sreq;
    }else{
      mut_->lock();
      requests_.push_back(sreq);
      mut_->unlock();
    }

    //push request to receiver's win
    recv_win->mut_->lock();
    recv_win->requests_.push_back(rreq);
    rreq->start();
    recv_win->mut_->unlock();
  } else {
    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    if(request!=nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}
int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get sender pointer
  Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)

  const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if (target_rank != rank_) {
    //prepare send_request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);

    //start the send, with another process than us as sender.
    sreq->start();
    // push request to sender's win
    send_win->mut_->lock();
    send_win->requests_.push_back(sreq);
    send_win->mut_->unlock();

    //start recv
    rreq->start();

    if(request!=nullptr){
      *request=rreq;
    }else{
      mut_->lock();
      requests_.push_back(rreq);
      mut_->unlock();
    }
  } else {
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    if(request!=nullptr)
      *request=MPI_REQUEST_NULL;
  }
  return MPI_SUCCESS;
}
int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
{
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  //FIXME: local version
  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
  // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
  // prepare send_request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);

  // prepare receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);

  count_++;

  // start send
  sreq->start();
  // push request to receiver's win
  recv_win->mut_->lock();
  recv_win->requests_.push_back(rreq);
  rreq->start();
  recv_win->mut_->unlock();

  if (request != nullptr) {
    *request = sreq;
  } else {
    mut_->lock();
    requests_.push_back(sreq);
    mut_->unlock();
  }

  // FIXME: The current implementation fails to ensure the correct ordering of the accumulate requests. The following
  // 'flush' is a workaround to fix that.
  flush(target_rank);
  XBT_DEBUG("Leaving MPI_Win_Accumulate");
  return MPI_SUCCESS;
}
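/* Worked example of the tag scheme above (illustrative): since count_ starts
 * at 0 and is reset on each fence, the first accumulate of an epoch is matched
 * on tag SMPI_RMA_TAG - 3 - 0, the second on SMPI_RMA_TAG - 3 - 1, and so on.
 * Each operation thus gets a distinct tag, so two accumulates headed for the
 * same target cannot be matched out of order; the flush above additionally
 * serializes them, at the cost of one flush per call. */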
int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  //need to be sure ops are correctly ordered, so finish request here ? slow.
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (op != MPI_NO_OP)
    accumulate(origin_addr, origin_count, origin_datatype, target_rank,
               target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}
int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
                          int target_rank, MPI_Aint target_disp)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);
  }
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}
int Win::start(MPI_Group group, int /*assert*/)
{
  /* From the MPI forum's advice to implementors:
  The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
  will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
  the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
  matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
  MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
  implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
  to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
  called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
  origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
  must complete, without further dependencies. */

  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Start");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int src = comm_->group()->rank(group->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    if (src != rank_)
      reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  group->ref();
  dst_group_ = group;
  opened_--; // we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Start");
  return MPI_SUCCESS;
}
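/* Illustrative sketch (not part of the implementation): the generalized
 * active-target synchronization that start/post/complete/wait implement, as
 * seen from user code. 'peer_group' would be built with
 * MPI_Comm_group/MPI_Group_incl; the buffer and count are made up.
 *
 *   // origin side:
 *   MPI_Win_start(peer_group, 0, win);   // matched by the target's post
 *   MPI_Put(buf, n, MPI_INT, target, 0, n, MPI_INT, win);
 *   MPI_Win_complete(win);               // put is done at the origin after this
 *
 *   // target side:
 *   MPI_Win_post(peer_group, 0, win);    // expose the window to the origins
 *   MPI_Win_wait(win);                   // all matched epochs are finished
 */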
int Win::post(MPI_Group group, int /*assert*/)
{
  //let's make a synchronous send here
  XBT_DEBUG("Entering MPI_Win_Post");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int dst = comm_->group()->rank(group->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    if (dst != rank_)
      reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  group->ref();
  src_group_ = group;
  opened_--; // we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Post");
  return MPI_SUCCESS;
}
int Win::complete(){
  xbt_assert(opened_ != 0, "Complete called on already opened MPI_Win");

  XBT_DEBUG("Entering MPI_Win_Complete");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < dst_group_->size(); i++) {
    int dst = comm_->group()->rank(dst_group_->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    if (dst != rank_)
      reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  flush_local_all();

  opened_++; //we're closed for business !
  Group::unref(dst_group_);
  dst_group_ = MPI_GROUP_NULL;
  return MPI_SUCCESS;
}
int Win::wait(){
  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < src_group_->size(); i++) {
    int src = comm_->group()->rank(src_group_->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    if (src != rank_)
      reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  flush_local_all();

  opened_++; //we're closed for business !
  Group::unref(src_group_);
  src_group_ = MPI_GROUP_NULL;
  return MPI_SUCCESS;
}
int Win::lock(int lock_type, int rank, int /*assert*/)
{
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->lock();
    target_win->mode_ += lock_type; // add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
    if(lock_type == MPI_LOCK_SHARED){ // the window used to be exclusive, it's now shared.
      target_win->lock_mut_->unlock();
    }
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(rank_);

  flush(rank);
  return MPI_SUCCESS;
}
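/* Illustrative sketch (not part of the implementation): the passive-target
 * mode implemented by lock/unlock, as seen from user code. All calls are
 * standard MPI; 'value' and 'target' are made up for the example.
 *
 *   int value = 42;
 *   MPI_Win_lock(MPI_LOCK_EXCLUSIVE, target, 0, win);
 *   MPI_Put(&value, 1, MPI_INT, target, 0, 1, MPI_INT, win);
 *   MPI_Win_unlock(target, win);   // flushes, so the put is visible afterwards
 */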
int Win::lock_all(int assert){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}
int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_ = 0;
  target_win->lockers_.remove(rank_);
  if (target_mode == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->unlock();
  }

  flush(rank);
  return MPI_SUCCESS;
}
int Win::unlock_all(){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}
int Win::flush(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
  if (rank != rank_) {
    finished = connected_wins_[rank]->finish_comms(rank_);
    XBT_DEBUG("Win_flush on remote %d for local %d - Finished %d RMA calls", rank, rank_, finished);
  }
  return MPI_SUCCESS;
}
int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
  return MPI_SUCCESS;
}
int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local %d - Finished %d RMA calls", rank_, finished);
  for (int i = 0; i < comm_->size(); i++) {
    if (i != rank_) {
      finished = connected_wins_[i]->finish_comms(rank_);
      XBT_DEBUG("Win_flush_all on remote %d for local %d - Finished %d RMA calls", i, rank_, finished);
    }
  }
  return MPI_SUCCESS;
}
int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all on local %d - Finished %d RMA calls", rank_, finished);
  return MPI_SUCCESS;
}
Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
}
int Win::finish_comms(){
  // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
  // Without this, the vector could get redimensioned when another process pushes.
  // This would result in the array used by Request::waitall() to be invalidated.
  // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
  mut_->lock();
  //Finish own requests
  int size = static_cast<int>(requests_.size());
  if (size > 0) {
    MPI_Request* treqs = requests_.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    requests_.clear();
  }
  mut_->unlock();
  return size;
}
int Win::finish_comms(int rank){
  // See comment about the mutex in finish_comms() above
  mut_->lock();
  // Finish own requests
  // Let's see if we're either the destination or the sender of this request
  // because we only wait for requests that we are responsible for.
  // Also use the process id here since the request itself returns from src()
  // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
  aid_t proc_id = comm_->group()->actor(rank);
  auto it = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
    return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
  });
  std::vector<MPI_Request> myreqqs(it, end(requests_));
  requests_.erase(it, end(requests_));
  int size = static_cast<int>(myreqqs.size());
  if (size > 0) {
    MPI_Request* treqs = myreqqs.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    myreqqs.clear();
  }
  mut_->unlock();
  return size;
}
int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
{
  const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];
  }
  if (target_win) {
    *size                         = target_win->size_;
    *disp_unit                    = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;
  } else {
    *size                         = 0;
    *static_cast<void**>(baseptr) = nullptr;
  }
  return MPI_SUCCESS;
}
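/* Illustrative sketch (not part of the implementation): typical use of the
 * query above from user code, after a shared window allocation. The void*
 * baseptr parameter idiomatically receives the address of a typed pointer;
 * 'win' and 'target' are made up for the example.
 *
 *   MPI_Aint sz;
 *   int disp;
 *   int* base;
 *   MPI_Win_shared_query(win, target, &sz, &disp, &base);
 */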
MPI_Errhandler Win::errhandler()
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
  return errhandler_;
}

void Win::set_errhandler(MPI_Errhandler errhandler)
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
}

} // namespace smpi
} // namespace simgrid