/* Copyright (c) 2007-2022. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */
#include <simgrid/modelchecker.h>
#include "smpi_win.hpp"

#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"
#include "src/mc/mc_replay.hpp"

#include <algorithm> // std::any_of, std::stable_partition

XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
#define CHECK_RMA_REMOTE_WIN(fun, win)\
  if(target_count*target_datatype->get_extent()>win->size_){\
    XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
             fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
    simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
    return MPI_ERR_RMA_RANGE;\
  }
#define CHECK_WIN_LOCKED(win)\
  if (opened_ == 0) { /* check that post/start has been done */\
    bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; });\
    if (not locked)\
      return MPI_ERR_WIN;\
  }
namespace simgrid {
namespace smpi {

std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_ = 0;
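
/* A Win instance is created collectively on every rank of the communicator: each process builds its own Win
 * object and then allgathers the peers' pointers into connected_wins_, so any rank can later reach the target's
 * window metadata (base address, size, request queue, mutexes) directly when simulating one-sided operations. */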
Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
    : base_(base)
    , size_(size)
    , disp_unit_(disp_unit)
    , info_(info)
    , comm_(comm)
    , connected_wins_(comm->size())
    , rank_(comm->rank())
    , allocated_(allocated)
    , dynamic_(dynamic)
{
  XBT_DEBUG("Creating window");
  if (info != MPI_INFO_NULL)
    info->ref();
  connected_wins_[rank_] = this;
  errhandler_->ref();
  comm->add_rma_win(this);
  comm->ref();

  colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
                   MPI_BYTE, comm);
  if (MC_is_active() || MC_record_replay_is_active()) {
    if (bar_.get() == nullptr) // First to arrive on the barrier
      bar_ = s4u::Barrier::create(comm->size());
    bar_->wait();
  } else {
    colls::barrier(comm);
  }
  this->add_f();
}
Win::~Win(){
  //As per the standard, perform a barrier to ensure every async comm is finished
  if (MC_is_active() || MC_record_replay_is_active())
    bar_->wait();
  else
    colls::barrier(comm_);
  flush_local_all();

  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);

  comm_->remove_rma_win(this);

  colls::barrier(comm_);
  Comm::unref(comm_);
  if (allocated_)
    xbt_free(base_);

  F2C::free_f(this->f2c_id());
  cleanup_attr<Win>();
}
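
/* Support for dynamic windows (MPI_Win_create_dynamic + MPI_Win_attach/MPI_Win_detach): the window keeps no base
 * pointer, so RMA calls are expected to pass the absolute address of the attached memory as the displacement.
 * Illustrative user-side pattern (names hypothetical, not part of this file):
 *   MPI_Win_attach(win, buf, len);
 *   MPI_Put(src, n, MPI_INT, target, (MPI_Aint)buf_on_target, n, MPI_INT, win);
 */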
int Win::attach(void* /*base*/, MPI_Aint size)
{
  if (not(base_ == MPI_BOTTOM || base_ == nullptr))
    return MPI_ERR_ARG;
  base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
  size_ += size;
  return MPI_SUCCESS;
}

int Win::detach(const void* /*base*/)
{
  base_ = MPI_BOTTOM;
  size_ = -1;
  return MPI_SUCCESS;
}
void Win::get_name(char* name, int* length) const
{
  *length = static_cast<int>(name_.length());
  if (not name_.empty()) {
    name_.copy(name, *length);
    name[*length] = '\0';
  }
}
void Win::get_group(MPI_Group* group){
  if (comm_ != MPI_COMM_NULL) {
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}
int Win::rank() const { return rank_; }

MPI_Comm Win::comm() const { return comm_; }

MPI_Aint Win::size() const { return size_; }

void* Win::base() const { return base_; }

int Win::disp_unit() const { return disp_unit_; }

bool Win::dynamic() const { return dynamic_; }
void Win::set_info(MPI_Info info)
{
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  info_ = info;
  if (info_ != MPI_INFO_NULL)
    info_->ref();
}
void Win::set_name(const char* name){
  name_ = name;
}
int Win::fence(int assert)
{
  XBT_DEBUG("Entering fence");
  opened_++;
  if (not (assert & MPI_MODE_NOPRECEDE)) {
    // This is not the first fence => finalize what came before
    if (MC_is_active() || MC_record_replay_is_active())
      bar_->wait();
    else
      colls::barrier(comm_);
    flush_local_all();
    count_ = 0;
  }

  if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
    opened_ = 0;
  if (MC_is_active() || MC_record_replay_is_active())
    bar_->wait();
  else
    colls::barrier(comm_);
  XBT_DEBUG("Leaving fence");

  return MPI_SUCCESS;
}
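
/* The RMA communication calls below (put/get/accumulate) are layered on top of SMPI point-to-point requests:
 * the origin creates a matching rma_send/rma_recv pair, starts its own side, and pushes the peer's request into
 * the remote window's queue; synchronization calls later drain those queues with finish_comms(). A typical
 * fence-synchronized epoch on the user side looks roughly like (illustrative, not part of this file):
 *   MPI_Win_fence(0, win);  MPI_Put(buf, n, MPI_INT, target, disp, n, MPI_INT, win);  MPI_Win_fence(0, win);
 */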
int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;

  if (target_rank != rank_) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare send_request
    MPI_Request sreq =
        Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
                               MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                              SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);

    //start send
    sreq->start();

    if (request != nullptr) {
      *request = sreq;
    } else {
      mut_->lock();
      requests_.push_back(sreq);
      mut_->unlock();
    }

    //push request to receiver's win
    recv_win->mut_->lock();
    recv_win->requests_.push_back(rreq);
    rreq->start();
    recv_win->mut_->unlock();
  } else {
    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    if (request != nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}
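
/* get() mirrors put() with the roles swapped: the data source is the remote window, so the send request is queued
 * on the target's window while the origin keeps (or returns) the receive request. */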
int Win::get(void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get sender pointer
  Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)

  const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if (target_rank != rank_) {
    //prepare send_request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);

    //start the send, with another process than us as sender.
    sreq->start();
    // push request to sender's win
    send_win->mut_->lock();
    send_win->requests_.push_back(sreq);
    send_win->mut_->unlock();

    //start the receive on our side
    rreq->start();

    if (request != nullptr) {
      *request = rreq;
    } else {
      mut_->lock();
      requests_.push_back(rreq);
      mut_->unlock();
    }
  } else {
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    if (request != nullptr)
      *request = MPI_REQUEST_NULL;
  }
  return MPI_SUCCESS;
}
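
/* Accumulate operations must be applied in order at the target. The implementation encodes the per-window
 * operation count into the tag (SMPI_RMA_TAG - 3 - count_) and, as noted in the FIXME below, additionally
 * flushes after each call as a workaround to enforce that ordering. */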
int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
{
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  //FIXME: local version
  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
  // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
  // prepare send_request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);

  // prepare receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);

  count_++;

  // start send
  sreq->start();
  // push request to receiver's win
  recv_win->mut_->lock();
  recv_win->requests_.push_back(rreq);
  rreq->start();
  recv_win->mut_->unlock();

  if (request != nullptr) {
    *request = sreq;
  } else {
    mut_->lock();
    requests_.push_back(sreq);
    mut_->unlock();
  }

  // FIXME: The current implementation fails to ensure the correct ordering of the accumulate requests. The following
  // 'flush' is a workaround to fix that.
  flush(target_rank);
  XBT_DEBUG("Leaving MPI_Win_Accumulate");
  return MPI_SUCCESS;
}
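
/* get_accumulate() and compare_and_swap() are built from the primitives above. They hold the target window's
 * atomic_mut_ for the whole read-modify-write sequence so that concurrent atomic calls on the same window do not
 * interleave; this is simpler, but slower than a genuinely one-sided implementation. */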
int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  // Ops need to be correctly ordered, so finish the requests here (slow).
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  accumulate(origin_addr, origin_count, origin_datatype, target_rank,
             target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}
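
/* Illustrative user-side call (not part of this file), assuming integer slots:
 *   MPI_Compare_and_swap(&newval, &expected, &previous, MPI_INT, target, disp, win);
 * The swap is performed only if the value read from the target equals *compare_addr. */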
int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
                          int target_rank, MPI_Aint target_disp)
{
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);
  }
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}
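
/* General active target synchronization (start/post/complete/wait) is implemented with zero-byte control
 * messages: post() sends SMPI_RMA_TAG+4 to its access group, start() waits for those messages, complete()
 * sends SMPI_RMA_TAG+5 to the targets and wait() collects them. Typical user-side pattern (illustrative):
 *   origin:  MPI_Win_start(grp, 0, win);  MPI_Put(...);  MPI_Win_complete(win);
 *   target:  MPI_Win_post(grp, 0, win);   ...            MPI_Win_wait(win);
 */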
int Win::start(MPI_Group group, int /*assert*/)
{
  /* From the MPI forum's advice to implementors:
  The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
  will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
  the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
  matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
  MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
  implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
  to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
  called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
  origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
  must complete, without further dependencies. */

  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Start");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int src = comm_->group()->rank(group->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    if (src != rank_)
      reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  group->ref();
  dst_group_ = group;
  opened_++; // we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Start");
  return MPI_SUCCESS;
}
int Win::post(MPI_Group group, int /*assert*/)
{
  //let's make a synchronous send here
  XBT_DEBUG("Entering MPI_Win_Post");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int dst = comm_->group()->rank(group->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    if (dst != rank_)
      reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  group->ref();
  src_group_ = group;
  opened_++; // we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Post");
  return MPI_SUCCESS;
}
int Win::complete()
{
  xbt_assert(opened_ != 0, "Complete called on a MPI_Win that was not opened");

  XBT_DEBUG("Entering MPI_Win_Complete");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < dst_group_->size(); i++) {
    int dst = comm_->group()->rank(dst_group_->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    if (dst != rank_)
      reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  flush_local_all();

  opened_--; //we're closed for business !
  Group::unref(dst_group_);
  dst_group_ = MPI_GROUP_NULL;
  return MPI_SUCCESS;
}
int Win::wait(){
  xbt_assert(opened_ != 0, "Wait called on a MPI_Win that was not opened");

  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < src_group_->size(); i++) {
    int src = comm_->group()->rank(src_group_->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    if (src != rank_)
      reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  flush_local_all();

  opened_--; //we're closed for business !
  Group::unref(src_group_);
  src_group_ = MPI_GROUP_NULL;
  return MPI_SUCCESS;
}
int Win::lock(int lock_type, int rank, int /*assert*/)
{
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED) || target_win->mode_ == MPI_LOCK_EXCLUSIVE) {
    target_win->lock_mut_->lock();
    target_win->mode_ += lock_type; // add the lock_type to differentiate the case where we switch from EXCLUSIVE to SHARED (no release needed in the unlock)
    if (lock_type == MPI_LOCK_SHARED) { // the window used to be exclusive, it's now shared.
      target_win->lock_mut_->unlock();
    }
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(rank_);

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
  finished = connected_wins_[rank]->finish_comms(rank_);
  XBT_DEBUG("Win_lock remote %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
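
/* Passive target synchronization. mode_ accumulates the granted lock types so that unlock() can tell whether a
 * real release of the (simulated) lock_mut_ is needed; only an exclusive owner keeps that mutex held for the
 * duration of the epoch. Typical user-side pattern (illustrative):
 *   MPI_Win_lock(MPI_LOCK_EXCLUSIVE, target, 0, win);  MPI_Put(...);  MPI_Win_unlock(target, win);
 */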
int Win::lock_all(int assert){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}
int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_ = 0;
  target_win->lockers_.remove(rank_);
  if (target_mode == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->unlock();
  }

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
  finished = connected_wins_[rank]->finish_comms(rank_);
  XBT_DEBUG("Win_unlock remote %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
int Win::unlock_all(){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}
int Win::flush(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
  finished = connected_wins_[rank]->finish_comms(rank_);
  XBT_DEBUG("Win_flush on remote %d for local %d - Finished %d RMA calls", rank, rank_, finished);
  return MPI_SUCCESS;
}
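
/* flush() completes operations on both sides: it first drains the requests this process queued locally for the
 * given rank, then asks the remote window to drain the requests that were pushed there on our behalf.
 * flush_local() below only waits for the local half. */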
int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
  return MPI_SUCCESS;
}
int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local %d - Finished %d RMA calls", rank_, finished);
  for (int i = 0; i < comm_->size(); i++) {
    if (i != rank_)
      finished = connected_wins_[i]->finish_comms(rank_);
    XBT_DEBUG("Win_flush_all on remote %d for local %d - Finished %d RMA calls", i, rank_, finished);
  }
  return MPI_SUCCESS;
}
int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all on local %d - Finished %d RMA calls", rank_, finished);
  return MPI_SUCCESS;
}
Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
}
int Win::finish_comms(){
  // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
  // Without it, the vector could get resized while another process pushes, which would invalidate the
  // array handed to Request::waitall(). Another solution would be to copy the data and clean up the
  // vector *before* calling Request::waitall.
  mut_->lock();
  //Finish own requests
  int size = static_cast<int>(requests_.size());
  if (size > 0) {
    MPI_Request* treqs = requests_.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    requests_.clear();
  }
  mut_->unlock();
  return size;
}
int Win::finish_comms(int rank){
  // See the comment about the mutex in finish_comms() above.
  mut_->lock();
  // Finish own requests
  // Only wait for requests where we are either the sender or the destination,
  // i.e. the ones we are responsible for. Use the process id here, since src()
  // and dst() on a request return process ids, NOT ranks (ranks only exist in the
  // context of a communicator).
  aid_t proc_id = comm_->group()->actor(rank);
  auto it = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
    return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
  });
  std::vector<MPI_Request> myreqqs(it, end(requests_));
  requests_.erase(it, end(requests_));
  int size = static_cast<int>(myreqqs.size());
  if (size > 0) {
    MPI_Request* treqs = myreqqs.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    myreqqs.clear();
  }
  mut_->unlock();
  return size;
}
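
/* shared_query() reports the base/size/disp_unit of a peer's window. When called with MPI_PROC_NULL it falls back
 * to the first connected window with a non-zero size, matching the behavior expected of MPI_Win_shared_query on
 * windows created with MPI_Win_allocate_shared. */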
int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
{
  const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];
  }
  if (target_win != nullptr) {
    *size                         = target_win->size_;
    *disp_unit                    = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;
  } else {
    *size                         = 0;
    *static_cast<void**>(baseptr) = nullptr;
  }
  return MPI_SUCCESS;
}
MPI_Errhandler Win::errhandler()
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
  return errhandler_;
}
void Win::set_errhandler(MPI_Errhandler errhandler)
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
}
} // namespace smpi
} // namespace simgrid