/* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "smpi_win.hpp"

#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"

#include <algorithm> // std::any_of, std::stable_partition

XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");

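/* Guard macros shared by the RMA entry points below: CHECK_RMA_REMOTE_WIN rejects transfers that would overrun the
 * remote window, and CHECK_WIN_LOCKED verifies that an access epoch is open (a post/start was done, or this rank
 * holds a lock on the target window). */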
#define CHECK_RMA_REMOTE_WIN(fun, win)\
  if(target_count*target_datatype->get_extent()>win->size_){\
    XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d: %zd - Bailing out.",\
             fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
    simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
    return MPI_ERR_RMA_RANGE;\
  }

#define CHECK_WIN_LOCKED(win)\
  if (opened_ == 0) { /*check that post/start has been done*/\
    bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; });\
    if (not locked)\
      return MPI_ERR_WIN;\
  }

namespace simgrid{
namespace smpi{
std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_=0;

Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
    : base_(base), size_(size), disp_unit_(disp_unit), info_(info), comm_(comm)
    , connected_wins_(comm->size()), rank_(comm->rank()), allocated_(allocated), dynamic_(dynamic)
{
  XBT_DEBUG("Creating window");
  if(info!=MPI_INFO_NULL)
    info->ref();
  connected_wins_[rank_] = this;
  if (rank_ == 0)
    bar_ = new s4u::Barrier(comm->size());
  errhandler_->ref();
  comm->add_rma_win(this);
  comm->ref();

  colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
                   MPI_BYTE, comm);
  colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
  colls::barrier(comm);
  this->add_f();
}

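/* All processes exchange their Win* through the allgather above, so that any rank can later reach the data
 * structures of a peer's window directly. The s4u::Barrier created by rank 0 is shared by the whole window
 * through the bcast. */
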
Win::~Win()
{
  //As per the standard, perform a barrier to ensure every async comm is finished
  bar_->wait();
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  comm_->remove_rma_win(this);
  colls::barrier(comm_);
  Comm::unref(comm_);
  if (rank_ == 0)
    delete bar_;
  if (allocated_)
    xbt_free(base_);
  F2C::free_f(this->f2c_id());
}

int Win::attach(void* /*base*/, MPI_Aint size)
{
  if (not(base_ == MPI_BOTTOM || base_ == nullptr))
    return MPI_ERR_ARG;
  base_ = nullptr; // the actual address will be given in the RMA calls, as the displacement
  size_ += size;
  return MPI_SUCCESS;
}

int Win::detach(const void* /*base*/)
{
  base_ = MPI_BOTTOM;
  size_ = -1;
  return MPI_SUCCESS;
}

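/* Note: on dynamic windows (MPI_Win_create_dynamic), no base address is stored in the window itself; as the comment
 * in attach() says, the "displacement" passed to the RMA calls is in fact the absolute address of the target buffer,
 * i.e. a displacement relative to MPI_BOTTOM. */
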
void Win::get_name(char* name, int* length) const
{
  *length = static_cast<int>(name_.length());
  if (not name_.empty()) {
    name_.copy(name, *length);
    name[*length] = '\0';
  }
}

void Win::get_group(MPI_Group* group){
  if(comm_ != MPI_COMM_NULL){
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}

MPI_Info Win::info()
{
  if (info_ == MPI_INFO_NULL)
    info_ = new Info();
  info_->ref();
  return info_;
}

int Win::rank() const { return rank_; }

MPI_Comm Win::comm() const { return comm_; }

MPI_Aint Win::size() const { return size_; }

void* Win::base() const { return base_; }

int Win::disp_unit() const { return disp_unit_; }

bool Win::dynamic() const { return dynamic_; }

void Win::set_info(MPI_Info info)
{
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  info_ = info;
  if (info_ != MPI_INFO_NULL)
    info_->ref();
}

void Win::set_name(const char* name){
  name_ = name;
}

int Win::fence(int assert)
{
  XBT_DEBUG("Entering fence");
  opened_++;
  if (not (assert & MPI_MODE_NOPRECEDE)) {
    // This is not the first fence => finalize what came before
    bar_->wait();
    flush_local_all();
    count_ = 0;
  }

  if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
    opened_ = 0;
  assert_ = assert;

  bar_->wait();
  XBT_DEBUG("Leaving fence");
  return MPI_SUCCESS;
}

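/* For reference, a minimal sketch of the user-level fence pattern this implements (plain MPI, hypothetical buffers):
 *   MPI_Win_fence(MPI_MODE_NOPRECEDE, win);               // open the epoch; nothing to finalize yet
 *   MPI_Put(src, 1, MPI_INT, target, 0, 1, MPI_INT, win);
 *   MPI_Win_fence(MPI_MODE_NOSUCCEED, win);               // flush the pending RMA calls and close the epoch
 */
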
int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;

  if (target_rank != rank_) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare send_request
    MPI_Request sreq =
        Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
                               MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                              SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);

    //start send
    sreq->start();

    if(request!=nullptr){
      *request=sreq;
    }else{
      mut_->lock();
      requests_.push_back(sreq);
      mut_->unlock();
    }

    //push request to receiver's win
    recv_win->mut_->lock();
    recv_win->requests_.push_back(rreq);
    rreq->start();
    recv_win->mut_->unlock();
  } else {
    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    if (request != nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}

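/* Note on the emulation scheme used by put/get/accumulate: each one-sided call is translated into a matched pair of
 * point-to-point requests (rma_send_init/rma_recv_init). The origin-side request is started immediately, while the
 * target-side request is queued in the remote window's requests_ vector and completed at the next synchronization
 * point (fence, complete/wait, unlock, or flush). */
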
int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get sender pointer
  Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)

  const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if (target_rank != rank_) {
    //prepare send_request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);

    //start the send, with another process than us as sender.
    sreq->start();
    // push request to sender's win
    send_win->mut_->lock();
    send_win->requests_.push_back(sreq);
    send_win->mut_->unlock();

    //start recv
    rreq->start();

    if(request!=nullptr){
      *request=rreq;
    }else{
      mut_->lock();
      requests_.push_back(rreq);
      mut_->unlock();
    }
  } else {
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    if (request != nullptr)
      *request = MPI_REQUEST_NULL;
  }
  return MPI_SUCCESS;
}

int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
{
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  //FIXME: local version
  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // The tag is used to order the operations, so subtract count_ from it. SMPI_RMA_TAG is set below all the other
  // tags we use, to avoid collisions.
  // prepare send_request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);

  // prepare receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);

  count_++;

  // start send
  sreq->start();
  // push request to receiver's win
  recv_win->mut_->lock();
  recv_win->requests_.push_back(rreq);
  rreq->start();
  recv_win->mut_->unlock();

  if (request != nullptr) {
    *request = sreq;
  } else {
    mut_->lock();
    requests_.push_back(sreq);
    mut_->unlock();
  }

  XBT_DEBUG("Leaving MPI_Win_Accumulate");
  return MPI_SUCCESS;
}

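/* Note on ordering: each accumulate gets a distinct tag (SMPI_RMA_TAG - 3 - count_, with count_ incremented per
 * call), so successive accumulates from the same origin to the same target are matched in program order, as the MPI
 * standard requires for accumulate operations. */
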
int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  // Ops need to be correctly ordered, so wait for the requests right here? This is slow.
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (op != MPI_NO_OP)
    accumulate(origin_addr, origin_count, origin_datatype, target_rank,
               target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}

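/* The atomic_mut_ held around the get+accumulate pair is what makes MPI_Get_accumulate (and compare_and_swap below)
 * atomic with respect to other atomic RMA calls on the same window: the read and the update cannot interleave with
 * another such operation. */
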
int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
                          int target_rank, MPI_Aint target_disp)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);
  }
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}

int Win::start(MPI_Group group, int /*assert*/)
{
  /* From the MPI forum's advice to implementors:
   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
   must complete, without further dependencies. */

  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Start");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int src = comm_->group()->rank(group->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    if (src != rank_)
      reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  group->ref();
  dst_group_ = group;
  opened_++; // we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Start");
  return MPI_SUCCESS;
}

int Win::post(MPI_Group group, int /*assert*/)
{
  //let's make a synchronous send here
  XBT_DEBUG("Entering MPI_Win_Post");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int dst = comm_->group()->rank(group->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    if (dst != rank_)
      reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  group->ref();
  src_group_ = group;
  opened_++; // we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Post");
  return MPI_SUCCESS;
}

int Win::complete(){
  xbt_assert(opened_ != 0, "Complete called on an MPI_Win that was not opened");

  XBT_DEBUG("Entering MPI_Win_Complete");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < dst_group_->size(); i++) {
    int dst = comm_->group()->rank(dst_group_->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    if (dst != rank_)
      reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  int finished = finish_comms();
  XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);

  opened_--; //we're closed for business !
  Group::unref(dst_group_);
  dst_group_ = MPI_GROUP_NULL;
  return MPI_SUCCESS;
}

int Win::wait(){
  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < src_group_->size(); i++) {
    int src = comm_->group()->rank(src_group_->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    if (src != rank_)
      reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  int finished = finish_comms();
  XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);

  opened_--; //we're closed for business !
  Group::unref(src_group_);
  src_group_ = MPI_GROUP_NULL;
  return MPI_SUCCESS;
}

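/* For reference, a minimal sketch of the post/start/complete/wait (PSCW) pattern implemented above, in plain MPI
 * (hypothetical buffers and groups):
 *   // target:                          // origin:
 *   MPI_Win_post(origin_grp, 0, win);   MPI_Win_start(target_grp, 0, win);
 *                                       MPI_Put(buf, 1, MPI_INT, target, 0, 1, MPI_INT, win);
 *   MPI_Win_wait(win);                  MPI_Win_complete(win);
 */
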
int Win::lock(int lock_type, int rank, int /*assert*/)
{
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->lock();
    target_win->mode_ += lock_type; // add the lock_type to differentiate the case where we switch from EXCLUSIVE to
                                    // SHARED (no release needed in the unlock)
    if (lock_type == MPI_LOCK_SHARED) { // the window used to be exclusive, it's now shared.
      target_win->lock_mut_->unlock();
    }
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(rank_);

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}

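/* For reference, a minimal sketch of the passive-target pattern implemented by lock/unlock, in plain MPI
 * (hypothetical buffer):
 *   MPI_Win_lock(MPI_LOCK_EXCLUSIVE, target, 0, win);
 *   MPI_Put(buf, 1, MPI_INT, target, 0, 1, MPI_INT, win);
 *   MPI_Win_unlock(target, win); // forces completion of the put on both sides
 */
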
int Win::lock_all(int assert){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}

int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_ = 0;
  target_win->lockers_.remove(rank_);
  if (target_mode == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->unlock();
  }

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}

int Win::unlock_all(){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}

int Win::flush(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
  finished = connected_wins_[rank]->finish_comms(rank_);
  XBT_DEBUG("Win_flush on remote %d for local %d - Finished %d RMA calls", rank, rank_, finished);
  return MPI_SUCCESS;
}

int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
  return MPI_SUCCESS;
}

int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local %d - Finished %d RMA calls", rank_, finished);
  for (int i = 0; i < comm_->size(); i++) {
    if (i != rank_)
      finished = connected_wins_[i]->finish_comms(rank_);
    XBT_DEBUG("Win_flush_all on remote %d for local %d - Finished %d RMA calls", i, rank_, finished);
  }
  return MPI_SUCCESS;
}

int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all on local %d - Finished %d RMA calls", rank_, finished);
  return MPI_SUCCESS;
}

Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
}

int Win::finish_comms(){
  // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
  // Without this, the vector could get resized when another process pushes, which would invalidate the array
  // used by Request::waitall(). Another solution would be to copy the data and clean up the vector *before*
  // Request::waitall.
  mut_->lock();
  //Finish own requests
  int size = static_cast<int>(requests_.size());
  if (size > 0) {
    MPI_Request* treqs = requests_.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    requests_.clear();
  }
  mut_->unlock();
  return size;
}

int Win::finish_comms(int rank){
  // See comment about the mutex in finish_comms() above
  mut_->lock();
  // Finish own requests
  // Let's see if we're either the destination or the sender of this request,
  // because we only wait for requests that we are responsible for.
  // Also use the process id here, since the request itself returns from src()
  // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
  aid_t proc_id = comm_->group()->actor(rank);
  auto it = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
    return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
  });
  std::vector<MPI_Request> myreqqs(it, end(requests_));
  requests_.erase(it, end(requests_));
  int size = static_cast<int>(myreqqs.size());
  if (size > 0) {
    MPI_Request* treqs = myreqqs.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    myreqqs.clear();
  }
  mut_->unlock();
  return size;
}

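/* Note on the partitioning above: std::stable_partition moves every request that involves proc_id to the tail of
 * requests_; that tail is then spliced into a local vector and waited on, so only the communications with the given
 * peer are completed while the rest of the queue is left untouched. */
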
int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
{
  const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];
  }
  if (target_win) {
    *size                         = target_win->size_;
    *disp_unit                    = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;
  } else {
    *size                         = 0;
    *static_cast<void**>(baseptr) = nullptr;
  }
  return MPI_SUCCESS;
}

MPI_Errhandler Win::errhandler()
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
  return errhandler_;
}

void Win::set_errhandler(MPI_Errhandler errhandler)
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
}
} // namespace smpi
} // namespace simgrid