/* Copyright (c) 2007-2022. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */
#include <simgrid/modelchecker.h>
#include "smpi_win.hpp"

#include "private.hpp"
#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"
#include "src/mc/mc_replay.hpp"

#include <algorithm> // std::any_of, std::stable_partition
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
#define CHECK_RMA_REMOTE_WIN(fun, win)\
  if(target_count*target_datatype->get_extent()>win->size_){\
    XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
    fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
    simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
    return MPI_ERR_RMA_RANGE;\
  }
#define CHECK_WIN_LOCKED(win)                                                                                           \
  if (opened_ == 0) { /*check that post/start has been done*/                                                          \
    bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; }); \
    if (not locked)                                                                                                     \
      return MPI_ERR_WIN;                                                                                               \
  }
namespace simgrid{
namespace smpi{

std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_=0;
Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
    : base_(base)
    , size_(size)
    , disp_unit_(disp_unit)
    , info_(info)
    , comm_(comm)
    , connected_wins_(comm->size())
    , rank_(comm->rank())
    , allocated_(allocated)
    , dynamic_(dynamic)
{
  XBT_DEBUG("Creating window");
  if(info!=MPI_INFO_NULL)
    info->ref();
  connected_wins_[rank_] = this;
  errhandler_->ref();
  comm->add_rma_win(this);
  comm->ref();

  colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
                   MPI_BYTE, comm);
  if (MC_is_active() || MC_record_replay_is_active()){
    s4u::Barrier* bar_ptr;
    if (rank_ == 0) {
      bar_ = s4u::Barrier::create(comm->size());
      bar_ptr = bar_.get();
    }
    colls::bcast(&bar_ptr, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
    if (rank_ != 0)
      bar_ = s4u::BarrierPtr(bar_ptr);
    bar_->wait();
  } else {
    colls::barrier(comm);
  }
  this->add_f();
}
Win::~Win(){
  //As per the standard, perform a barrier to ensure every async comm is finished
  if (MC_is_active() || MC_record_replay_is_active())
    bar_->wait();
  else
    colls::barrier(comm_);
  flush_local_all();

  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);

  comm_->remove_rma_win(this);

  colls::barrier(comm_);
  Comm::unref(comm_);

  if (allocated_)
    xbt_free(base_);

  F2C::free_f(this->f2c_id());
  cleanup_attr<Win>();
}
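
/* Dynamic windows (MPI_Win_create_dynamic): attach()/detach() only bookkeep the window size here. No address is
 * stored: as noted below, the actual target address is passed as the displacement in the subsequent RMA calls. */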
int Win::attach(void* /*base*/, MPI_Aint size)
{
  if (not(base_ == MPI_BOTTOM || base_ == nullptr))
    return MPI_ERR_ARG;
  base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
  size_ += size;
  return MPI_SUCCESS;
}
int Win::detach(const void* /*base*/)
{
  base_ = MPI_BOTTOM;
  size_ = -1;
  return MPI_SUCCESS;
}
void Win::get_name(char* name, int* length) const
{
  *length = static_cast<int>(name_.length());
  if (not name_.empty()) {
    name_.copy(name, *length);
    name[*length] = '\0';
  }
}
void Win::get_group(MPI_Group* group){
  if(comm_ != MPI_COMM_NULL){
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}
int Win::rank() const { return rank_; }

MPI_Comm Win::comm() const { return comm_; }

MPI_Aint Win::size() const { return size_; }

void* Win::base() const { return base_; }

int Win::disp_unit() const { return disp_unit_; }

bool Win::dynamic() const { return dynamic_; }
void Win::set_info(MPI_Info info)
{
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  info_ = info;
  if (info_ != MPI_INFO_NULL)
    info_->ref();
}
void Win::set_name(const char* name){
  name_ = name;
}
int Win::fence(int assert)
{
  XBT_DEBUG("Entering fence");
  opened_++;
  if (not (assert & MPI_MODE_NOPRECEDE)) {
    // This is not the first fence => finalize what came before
    if (MC_is_active() || MC_record_replay_is_active())
      bar_->wait();
    else
      colls::barrier(comm_);
    flush_local_all();
    count_ = 0;
  }

  if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
    opened_ = 0;
  assert_ = assert;
  if (MC_is_active() || MC_record_replay_is_active())
    bar_->wait();
  else
    colls::barrier(comm_);
  XBT_DEBUG("Leaving fence");

  return MPI_SUCCESS;
}
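
/* The one-sided operations below are emulated with point-to-point SMPI requests: the origin prepares a matching
 * send/receive pair (rma_send_init / rma_recv_init), starts it, and stores the requests in the requests_ vector of
 * each window involved. The pending requests are completed later by the flush and synchronization calls. */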
int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;

  if (target_rank != rank_) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare send_request
    MPI_Request sreq =
        Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
                               MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                              SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);

    //start send
    sreq->start();

    if(request!=nullptr){
      *request=sreq;
    }else{
      mut_->lock();
      requests_.push_back(sreq);
      mut_->unlock();
    }

    //push request to receiver's win
    recv_win->mut_->lock();
    recv_win->requests_.push_back(rreq);
    rreq->start();
    recv_win->mut_->unlock();
  } else {
    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    if(request!=nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}
int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get sender pointer
  Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)

  const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if (target_rank != rank_) {
    //prepare send_request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);

    //start the send, with another process than us as sender.
    sreq->start();
    // push request to sender's win
    send_win->mut_->lock();
    send_win->requests_.push_back(sreq);
    send_win->mut_->unlock();

    //start recv
    rreq->start();

    if(request!=nullptr){
      *request=rreq;
    }else{
      mut_->lock();
      requests_.push_back(rreq);
      mut_->unlock();
    }
  } else {
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    if(request!=nullptr)
      *request=MPI_REQUEST_NULL;
  }
  return MPI_SUCCESS;
}
int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
{
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  //FIXME: local version
  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
  // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
  // prepare send_request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);

  // prepare receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);

  count_++;

  // start send
  sreq->start();
  // push request to receiver's win
  recv_win->mut_->lock();
  recv_win->requests_.push_back(rreq);
  rreq->start();
  recv_win->mut_->unlock();

  if (request != nullptr) {
    *request = sreq;
  } else {
    mut_->lock();
    requests_.push_back(sreq);
    mut_->unlock();
  }

  // FIXME: The current implementation fails to ensure the correct ordering of the accumulate requests. The following
  // 'flush' is a workaround to fix that.
  flush(target_rank);
  XBT_DEBUG("Leaving MPI_Win_Accumulate");
  return MPI_SUCCESS;
}
int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  //need to be sure ops are correctly ordered, so finish request here ? slow.
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (op != MPI_NO_OP)
    accumulate(origin_addr, origin_count, origin_datatype, target_rank,
               target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}
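
/* Compare-and-swap is emulated with a get followed by a conditional put, both performed while holding the target
 * window's atomic_mut_, so that concurrent get_accumulate / compare_and_swap calls on that window do not interleave. */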
int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
                          int target_rank, MPI_Aint target_disp)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);
  }
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}
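
/* General active target synchronization (start/post/complete/wait) is implemented with plain token messages:
 * post() sends a token (tag SMPI_RMA_TAG+4) to every process of the access group, which start() waits for;
 * complete() then sends a closing token (tag SMPI_RMA_TAG+5) to every target, which wait() receives. */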
int Win::start(MPI_Group group, int /*assert*/)
{
  /* From MPI forum advice
  The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
  will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
  the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
  matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
  MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
  implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
  to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
  called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
  origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
  must complete, without further dependencies. */

  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Start");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int src = comm_->group()->rank(group->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    if (src != rank_)
      reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  group->ref();
  dst_group_ = group;
  opened_++; // we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Start");
  return MPI_SUCCESS;
}
int Win::post(MPI_Group group, int /*assert*/)
{
  //let's make a synchronous send here
  XBT_DEBUG("Entering MPI_Win_Post");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int dst = comm_->group()->rank(group->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    if (dst != rank_)
      reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  group->ref();
  src_group_ = group;
  opened_++; // we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Post");
  return MPI_SUCCESS;
}
int Win::complete(){
  xbt_assert(opened_ != 0, "Complete called on a MPI_Win that was not opened");

  XBT_DEBUG("Entering MPI_Win_Complete");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < dst_group_->size(); i++) {
    int dst = comm_->group()->rank(dst_group_->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    if (dst != rank_)
      reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  flush_local_all();

  opened_--; //we're closed for business !
  Group::unref(dst_group_);
  dst_group_ = MPI_GROUP_NULL;
  return MPI_SUCCESS;
}
int Win::wait(){
  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < src_group_->size(); i++) {
    int src = comm_->group()->rank(src_group_->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    if (src != rank_)
      reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  flush_local_all();

  opened_--; //we're closed for business !
  Group::unref(src_group_);
  src_group_ = MPI_GROUP_NULL;
  return MPI_SUCCESS;
}
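
/* Passive target synchronization. mode_ tracks the kind of lock currently granted on a window and lockers_ the
 * ranks holding it; taking an exclusive lock additionally acquires lock_mut_, which is held until the matching
 * unlock (or released immediately when the window switches from exclusive to shared). */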
int Win::lock(int lock_type, int rank, int /*assert*/)
{
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED) || target_win->mode_ == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->lock();
    target_win->mode_ += lock_type; // add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
    if(lock_type == MPI_LOCK_SHARED){ // the window used to be exclusive, it's now shared.
      target_win->lock_mut_->unlock();
    }
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(rank_);

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
  finished = connected_wins_[rank]->finish_comms(rank_);
  XBT_DEBUG("Win_lock remote %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
int Win::lock_all(int assert){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}
int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_ = 0;
  target_win->lockers_.remove(rank_);
  if (target_mode == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->unlock();
  }

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
  finished = connected_wins_[rank]->finish_comms(rank_);
  XBT_DEBUG("Win_unlock remote %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
int Win::unlock_all(){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}
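
/* The flush calls below complete outstanding RMA requests: the local ones involving the given rank, and (except for
 * the flush_local variants) the ones queued on the remote window that involve us. */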
int Win::flush(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
  finished = connected_wins_[rank]->finish_comms(rank_);
  XBT_DEBUG("Win_flush on remote %d for local %d - Finished %d RMA calls", rank, rank_, finished);
  return MPI_SUCCESS;
}
int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
  return MPI_SUCCESS;
}
int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local %d - Finished %d RMA calls", rank_, finished);
  for (int i = 0; i < comm_->size(); i++) {
    if (i != rank_) {
      finished = connected_wins_[i]->finish_comms(rank_);
      XBT_DEBUG("Win_flush_all on remote %d for local %d - Finished %d RMA calls", i, rank_, finished);
    }
  }
  return MPI_SUCCESS;
}
int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all on local %d - Finished %d RMA calls", rank_, finished);
  return MPI_SUCCESS;
}
Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
}
int Win::finish_comms(){
  // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
  // Without this, the vector could get redimensioned when another process pushes.
  // This would result in the array used by Request::waitall() to be invalidated.
  // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
  mut_->lock();
  //Finish own requests
  int size = static_cast<int>(requests_.size());
  if (size > 0) {
    MPI_Request* treqs = requests_.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    requests_.clear();
  }
  mut_->unlock();
  return size;
}
int Win::finish_comms(int rank){
  // See comment about the mutex in finish_comms() above
  mut_->lock();
  // Finish own requests
  // Let's see if we're either the destination or the sender of this request
  // because we only wait for requests that we are responsible for.
  // Also use the process id here since the request itself returns from src()
  // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
  aid_t proc_id = comm_->group()->actor(rank);
  auto it = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
    return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
  });
  std::vector<MPI_Request> myreqqs(it, end(requests_));
  requests_.erase(it, end(requests_));
  int size = static_cast<int>(myreqqs.size());
  if (size > 0) {
    MPI_Request* treqs = myreqqs.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    myreqqs.clear();
  }
  mut_->unlock();
  return size;
}
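
/* MPI_Win_shared_query: when 'rank' is MPI_PROC_NULL, fall back to the first window of the communicator that has a
 * non-zero size, i.e. the lowest rank that actually allocated a segment. */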
int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
{
  const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];
  }
  if (target_win) {
    *size                         = target_win->size_;
    *disp_unit                    = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;
  } else {
    *size                         = 0;
    *static_cast<void**>(baseptr) = nullptr;
  }
  return MPI_SUCCESS;
}
MPI_Errhandler Win::errhandler()
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
  return errhandler_;
}
void Win::set_errhandler(MPI_Errhandler errhandler)
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
}
} // namespace smpi
} // namespace simgrid