/* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "smpi_win.hpp"

#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"

#include <algorithm> // std::any_of, std::stable_partition

XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");

#define CHECK_RMA_REMOTE_WIN(fun, win)\
  if(target_count*target_datatype->get_extent()>win->size_){\
    XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
    fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
    simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
    return MPI_ERR_RMA_RANGE;\
  }

#define CHECK_WIN_LOCKED(win)                                                                                          \
  if (opened_ == 0) { /*check that post/start has been done*/                                                          \
    bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; }); \
    if (not locked)                                                                                                    \
      return MPI_ERR_WIN;                                                                                              \
  }
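
/* Both guards above protect the RMA calls below: CHECK_RMA_REMOTE_WIN bails out with MPI_ERR_RMA_RANGE when the
 * accessed region (target_count * extent) does not fit in the remote window, and CHECK_WIN_LOCKED returns
 * MPI_ERR_WIN when no access epoch is open (no fence or post/start in progress) and the calling rank does not
 * hold a lock on the target window. */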

namespace simgrid {
namespace smpi {

std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_ = 0;

Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
    : base_(base)
    , size_(size)
    , disp_unit_(disp_unit)
    , info_(info)
    , comm_(comm)
    , connected_wins_(comm->size())
    , rank_(comm->rank())
    , allocated_(allocated)
    , dynamic_(dynamic)
{
  XBT_DEBUG("Creating window");
  if(info!=MPI_INFO_NULL)
    info->ref();
  connected_wins_[rank_] = this;
  if(rank_==0){ // rank 0 creates the barrier; the other ranks get its address through the bcast below
    bar_ = new s4u::Barrier(comm->size());
  }
  comm->add_rma_win(this);

  colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
                   MPI_BYTE, comm);

  colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);

  this->add_f();
}

Win::~Win(){
  //As per the standard, perform a barrier to ensure every async comm is finished
  bar_->wait();

  flush_local_all();

  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);

  comm_->remove_rma_win(this);

  colls::barrier(comm_);

  F2C::free_f(this->f2c_id());
}

int Win::attach(void* /*base*/, MPI_Aint size)
{
  if (not(base_ == MPI_BOTTOM || base_ == nullptr))
    return MPI_ERR_ARG;
  base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
  size_ += size;
  return MPI_SUCCESS;
}

int Win::detach(const void* /*base*/)
{
  base_ = MPI_BOTTOM;
  size_ = -1;
  return MPI_SUCCESS;
}

void Win::get_name(char* name, int* length) const
{
  *length = static_cast<int>(name_.length());
  if (not name_.empty()) {
    name_.copy(name, *length);
    name[*length] = '\0';
  }
}

void Win::get_group(MPI_Group* group){
  if(comm_ != MPI_COMM_NULL){
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}

MPI_Info Win::info()
{
  if (info_ == MPI_INFO_NULL)
    info_ = new Info();
  info_->ref();
  return info_;
}

int Win::rank() const { return rank_; }

MPI_Comm Win::comm() const { return comm_; }

MPI_Aint Win::size() const { return size_; }

void* Win::base() const { return base_; }

int Win::disp_unit() const { return disp_unit_; }

bool Win::dynamic() const { return dynamic_; }

void Win::set_info(MPI_Info info)
{
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  info_ = info;
  if (info_ != MPI_INFO_NULL)
    info_->ref();
}

void Win::set_name(const char* name){
  name_ = name;
}

int Win::fence(int assert)
{
  XBT_DEBUG("Entering fence");
  opened_++;
  if (not (assert & MPI_MODE_NOPRECEDE)) {
    // This is not the first fence => finalize what came before
    bar_->wait();
    flush_local_all();
    count_ = 0;
  }

  if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
    opened_ = 0;

  bar_->wait();
  XBT_DEBUG("Leaving fence");

  return MPI_SUCCESS;
}
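
/* For reference, a minimal user-level sketch of the fence synchronization implemented above (illustration only,
 * plain MPI code rather than SMPI internals): each rank brackets its RMA calls with two collective fences, and the
 * closing fence completes the outstanding puts/gets on every rank.
 *
 *   MPI_Win_fence(MPI_MODE_NOPRECEDE, win);
 *   MPI_Put(&local, 1, MPI_INT, (rank + 1) % size, 0, 1, MPI_INT, win);
 *   MPI_Win_fence(MPI_MODE_NOSUCCEED, win);   // the put is now visible at the target
 */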
int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;

  if (target_rank != rank_) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare send_request
    MPI_Request sreq =
        Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
                               MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                              SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);

    //start send
    sreq->start();

    if(request!=nullptr){
      *request=sreq;
    }else{
      mut_->lock();
      requests_.push_back(sreq);
      mut_->unlock();
    }

    //push request to receiver's win
    recv_win->mut_->lock();
    recv_win->requests_.push_back(rreq);
    rreq->start();
    recv_win->mut_->unlock();
  } else {
    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    if(request!=nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}

int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get sender pointer
  Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)

  const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if (target_rank != rank_) {
    //prepare send_request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);

    //start the send, with another process than us as sender.
    sreq->start();
    // push request to sender's win
    send_win->mut_->lock();
    send_win->requests_.push_back(sreq);
    send_win->mut_->unlock();

    //start recv
    rreq->start();

    if(request!=nullptr){
      *request=rreq;
    }else{
      mut_->lock();
      requests_.push_back(rreq);
      mut_->unlock();
    }
  } else {
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    if(request!=nullptr)
      *request=MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}

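/* Note on the two functions above: each put()/get() maps one RMA call onto a matched pair of prepared requests
 * (rma_send_init / rma_recv_init on SMPI_RMA_TAG + 1 or + 2). The request kept on the origin side is either handed
 * back to the caller (request != nullptr, i.e. the request-based MPI_Rput/MPI_Rget variants) or queued in
 * requests_, while its counterpart is queued in the target window; finish_comms() later waits on whatever was
 * queued. */
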
int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
{
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  //FIXME: local version
  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
  // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
  // prepare send_request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);

  // prepare receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);

  count_++;

  // start send
  sreq->start();
  // push request to receiver's win
  recv_win->mut_->lock();
  recv_win->requests_.push_back(rreq);
  rreq->start();
  recv_win->mut_->unlock();

  if (request != nullptr) {
    *request = sreq;
  } else {
    mut_->lock();
    requests_.push_back(sreq);
    mut_->unlock();
  }

  // FIXME: The current implementation fails to ensure the correct ordering of the accumulate requests. The following
  // 'flush' is a workaround to fix that.
  flush(target_rank);
  XBT_DEBUG("Leaving MPI_Win_Accumulate");
  return MPI_SUCCESS;
}

int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  //need to be sure ops are correctly ordered, so finish request here ? slow.
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (op != MPI_NO_OP)
    accumulate(origin_addr, origin_count, origin_datatype, target_rank,
               target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}

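/* The atomic_mut_ of the target window serializes the get + accumulate pair above, so concurrent
 * MPI_Get_accumulate / MPI_Compare_and_swap calls targeting the same window behave as a single read-modify-write,
 * at the cost of serializing them (as the comment above already hints). */
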
int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
                          int target_rank, MPI_Aint target_disp)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);
  }
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}

int Win::start(MPI_Group group, int /*assert*/)
{
  /* From MPI Forum advice:
  The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
  will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
  the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
  matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
  MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
  implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
  to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
  called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
  origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
  must complete, without further dependencies. */

  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Start");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int src = comm_->group()->rank(group->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    if (src != rank_)
      reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  group->ref();
  dst_group_ = group;
  opened_++; // we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Start");
  return MPI_SUCCESS;
}

int Win::post(MPI_Group group, int /*assert*/)
{
  //let's make a synchronous send here
  XBT_DEBUG("Entering MPI_Win_Post");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int dst = comm_->group()->rank(group->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    if (dst != rank_)
      reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  group->ref();
  src_group_ = group;
  opened_++; // we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Post");
  return MPI_SUCCESS;
}

int Win::complete()
{
  xbt_assert(opened_ != 0, "Complete called on a MPI_Win that was not opened with Win_start");

  XBT_DEBUG("Entering MPI_Win_Complete");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < dst_group_->size(); i++) {
    int dst = comm_->group()->rank(dst_group_->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    if (dst != rank_)
      reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  flush_local_all();

  opened_--; //we're closed for business !
  Group::unref(dst_group_);
  dst_group_ = MPI_GROUP_NULL;
  return MPI_SUCCESS;
}

int Win::wait(){
  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < src_group_->size(); i++) {
    int src = comm_->group()->rank(src_group_->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    if (src != rank_)
      reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  flush_local_all();

  opened_--; //we're closed for business !
  Group::unref(src_group_);
  src_group_ = MPI_GROUP_NULL;
  return MPI_SUCCESS;
}
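
/* For reference, the general active target pattern served by start/post/complete/wait above (illustration only,
 * plain MPI code rather than SMPI internals): the target exposes its window with post/wait while the origin
 * brackets its RMA calls with start/complete.
 *
 *   // on the target                             // on the origin
 *   MPI_Win_post(origin_group, 0, win);          MPI_Win_start(target_group, 0, win);
 *   MPI_Win_wait(win);                           MPI_Put(buf, n, MPI_DOUBLE, target, 0, n, MPI_DOUBLE, win);
 *                                                MPI_Win_complete(win);
 */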
int Win::lock(int lock_type, int rank, int /*assert*/)
{
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->lock();
    target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
    if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
      target_win->lock_mut_->unlock();
    }
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(rank_);
  return MPI_SUCCESS;
}

int Win::lock_all(int assert){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}

int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_= 0;
  target_win->lockers_.remove(rank_);
  if (target_mode==MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->unlock();
  }

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_unlock - Finished %d RMA calls", finished);
  return MPI_SUCCESS;
}
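
/* For reference, the passive target pattern served by lock/unlock above (illustration only, plain MPI code rather
 * than SMPI internals): the origin locks the target window, issues its RMA calls, and unlock flushes them.
 *
 *   MPI_Win_lock(MPI_LOCK_SHARED, target, 0, win);
 *   MPI_Get(buf, n, MPI_INT, target, 0, n, MPI_INT, win);
 *   MPI_Win_unlock(target, win);   // buf is usable once unlock returns
 */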

int Win::unlock_all(){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}

int Win::flush(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
  if (rank != rank_) {
    finished = connected_wins_[rank]->finish_comms(rank_);
    XBT_DEBUG("Win_flush on remote %d for local %d - Finished %d RMA calls", rank, rank_, finished);
  }
  return MPI_SUCCESS;
}

int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
  return MPI_SUCCESS;
}

int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local %d - Finished %d RMA calls", rank_, finished);
  for (int i = 0; i < comm_->size(); i++) {
    if (i != rank_) {
      finished = connected_wins_[i]->finish_comms(rank_);
      XBT_DEBUG("Win_flush_all on remote %d for local %d - Finished %d RMA calls", i, rank_, finished);
    }
  }
  return MPI_SUCCESS;
}

int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all on local %d - Finished %d RMA calls", rank_, finished);
  return MPI_SUCCESS;
}

Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
}

int Win::finish_comms(){
  // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
  // Without this, the vector could get redimensioned when another process pushes.
  // This would result in the array used by Request::waitall() to be invalidated.
  // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
  mut_->lock();
  //Finish own requests
  int size = static_cast<int>(requests_.size());
  if (size > 0) {
    MPI_Request* treqs = requests_.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    requests_.clear();
  }
  mut_->unlock();
  return size;
}

int Win::finish_comms(int rank){
  // See comment about the mutex in finish_comms() above
  mut_->lock();
  // Finish own requests
  // Let's see if we're either the destination or the sender of this request
  // because we only wait for requests that we are responsible for.
  // Also use the process id here since the request itself returns from src()
  // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
  aid_t proc_id = comm_->group()->actor(rank);
  auto it = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
    return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
  });
  std::vector<MPI_Request> myreqqs(it, end(requests_));
  requests_.erase(it, end(requests_));
  int size = static_cast<int>(myreqqs.size());
  if (size > 0) {
    MPI_Request* treqs = myreqqs.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    myreqqs.clear();
  }
  mut_->unlock();
  return size;
}

int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
{
  const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];
  }
  if (target_win) {
    *size = target_win->size_;
    *disp_unit = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;
  } else {
    *size = 0;
    *static_cast<void**>(baseptr) = nullptr;
  }
  return MPI_SUCCESS;
}

MPI_Errhandler Win::errhandler()
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
  return errhandler_;
}

void Win::set_errhandler(MPI_Errhandler errhandler)
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
}

} // namespace smpi
} // namespace simgrid