/* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved.          */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "smpi_win.hpp"

#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"

#include <algorithm> // std::any_of, std::stable_partition

XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");

#define CHECK_RMA_REMOTE_WIN(fun, win)\
  if(target_count*target_datatype->get_extent()>win->size_){\
    XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
    fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
    simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
    return MPI_ERR_RMA_RANGE;\
  }

#define CHECK_WIN_LOCKED(win)                                                                                          \
  if (opened_ == 0) { /*check that post/start has been done*/                                                          \
    bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; }); \
    if (not locked)                                                                                                    \
      return MPI_ERR_WIN;                                                                                              \
  }

namespace simgrid {
namespace smpi {
std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_=0;

Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
    : base_(base)
    , size_(size)
    , disp_unit_(disp_unit)
    , info_(info)
    , comm_(comm)
    , connected_wins_(comm->size())
    , rank_(comm->rank())
    , allocated_(allocated)
    , dynamic_(dynamic)
{
  XBT_DEBUG("Creating window");
  if(info!=MPI_INFO_NULL)
    info->ref();
  connected_wins_[rank_] = this;
  if(rank_==0){
    bar_ = new s4u::Barrier(comm->size());
  }
  comm->add_rma_win(this);
  comm->ref();

  colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
                   MPI_BYTE, comm);
  colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);

  colls::barrier(comm);
  this->add_f();
}

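// Design note: the allgather above publishes each process' Win* into every
// connected_wins_ vector, so a peer doing put/get/accumulate can push its
// simulated requests straight into the target's queue. This only works because
// all SMPI actors live in the simulator's single address space, where raw
// pointers stay valid across "processes".
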
Win::~Win(){
  //As per the standard, perform a barrier to ensure every async comm is finished
  bar_->wait();

  flush_local_all();

  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);

  comm_->remove_rma_win(this);

  colls::barrier(comm_);
  Comm::unref(comm_);

  if (rank_ == 0)
    delete bar_;

  if (allocated_)
    xbt_free(base_);

  F2C::free_f(this->f2c_id());
  cleanup_attr<Win>();
}

int Win::attach(void* /*base*/, MPI_Aint size)
{
  if (not(base_ == MPI_BOTTOM || base_ == nullptr))
    return MPI_ERR_ARG;
  base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
  size_ += size;
  return MPI_SUCCESS;
}

int Win::detach(const void* /*base*/)
{
  base_ = MPI_BOTTOM;
  size_ = -1;
  return MPI_SUCCESS;
}

void Win::get_name(char* name, int* length) const
{
  *length = static_cast<int>(name_.length());
  if (not name_.empty()) {
    name_.copy(name, *length);
    name[*length] = '\0';
  }
}

void Win::get_group(MPI_Group* group){
  if(comm_ != MPI_COMM_NULL){
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}

MPI_Info Win::info()
{
  if (info_ == MPI_INFO_NULL)
    info_ = new Info();
  info_->ref();
  return info_;
}

int Win::rank() const
{
  return rank_;
}

MPI_Comm Win::comm() const
{
  return comm_;
}

MPI_Aint Win::size() const
{
  return size_;
}

void* Win::base() const
{
  return base_;
}

int Win::disp_unit() const
{
  return disp_unit_;
}

bool Win::dynamic() const
{
  return dynamic_;
}

void Win::set_info(MPI_Info info)
{
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  info_ = info;
  if (info_ != MPI_INFO_NULL)
    info_->ref();
}

void Win::set_name(const char* name){
  name_ = name;
}

int Win::fence(int assert)
{
  XBT_DEBUG("Entering fence");
  opened_++;
  if (not (assert & MPI_MODE_NOPRECEDE)) {
    // This is not the first fence => finalize what came before
    bar_->wait();
    mut_->lock();
    // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
    // Without it, the vector could get resized while another process pushes,
    // which would invalidate the array handed to Request::waitall().
    // Another solution would be to copy the data and clean up the vector *before* Request::waitall.

    // start all requests that have been prepared by another process
    if (not requests_.empty()) {
      int size = static_cast<int>(requests_.size());
      MPI_Request* treqs = requests_.data();
      Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    }
    count_ = 0;
    mut_->unlock();
  }

  if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
    opened_ = 0;

  bar_->wait();
  XBT_DEBUG("Leaving fence");

  return MPI_SUCCESS;
}

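// Illustrative fence-based epoch as seen from user code (a hypothetical
// example, not part of this file):
//   MPI_Win_fence(MPI_MODE_NOPRECEDE, win); // first fence: nothing to flush
//   MPI_Put(sbuf, n, MPI_INT, target, 0, n, MPI_INT, win);
//   MPI_Win_fence(0, win);                  // waitall on queued requests, then barrier
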
int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;

  if (target_rank != rank_) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare send_request
    MPI_Request sreq =
        Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
                               MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                              SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);

    //start send
    sreq->start();

    if(request!=nullptr){
      *request=sreq;
    }else{
      mut_->lock();
      requests_.push_back(sreq);
      mut_->unlock();
    }

    //push request to receiver's win
    recv_win->mut_->lock();
    recv_win->requests_.push_back(rreq);
    rreq->start();
    recv_win->mut_->unlock();
  } else {
    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    if(request!=nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}

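// Illustrative passive-target use of the path above (hypothetical user code):
//   MPI_Win_lock(MPI_LOCK_SHARED, target, 0, win);
//   MPI_Put(&value, 1, MPI_INT, target, 0 /*disp*/, 1, MPI_INT, win);
//   MPI_Win_unlock(target, win); // drains the request queues on both sides
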
int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get sender pointer
  Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)

  const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if (target_rank != rank_) {
    //prepare send_request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);

    //start the send, with another process than us as sender.
    sreq->start();
    // push request to sender's win
    send_win->mut_->lock();
    send_win->requests_.push_back(sreq);
    send_win->mut_->unlock();

    //start recv
    rreq->start();

    if(request!=nullptr){
      *request=rreq;
    }else{
      mut_->lock();
      requests_.push_back(rreq);
      mut_->unlock();
    }
  } else {
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    if(request!=nullptr)
      *request=MPI_REQUEST_NULL;
  }
  return MPI_SUCCESS;
}

int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
{
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  //FIXME: local version
  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
  // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
  // prepare send_request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);

  // prepare receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);

  count_++;

  // start send
  sreq->start();
  // push request to receiver's win
  recv_win->mut_->lock();
  recv_win->requests_.push_back(rreq);
  rreq->start();
  recv_win->mut_->unlock();

  if (request != nullptr) {
    *request = sreq;
  } else {
    mut_->lock();
    requests_.push_back(sreq);
    mut_->unlock();
  }

  XBT_DEBUG("Leaving MPI_Win_Accumulate");
  return MPI_SUCCESS;
}

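// Worked example of the tag arithmetic above: with count_ == 0, 1, 2, successive
// accumulates use tags SMPI_RMA_TAG - 3, SMPI_RMA_TAG - 4, SMPI_RMA_TAG - 5, so
// they match in issue order on the target and cannot collide with the put/get
// tags (SMPI_RMA_TAG + 1, + 2) or the start/post/complete/wait tags (+ 4, + 5).
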
int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  // ops must be correctly ordered, so finish each request here before issuing the next one. Slow, but safe.
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (op != MPI_NO_OP)
    accumulate(origin_addr, origin_count, origin_datatype, target_rank,
               target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}

int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
                          int target_rank, MPI_Aint target_disp)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);
  }
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}

int Win::start(MPI_Group group, int /*assert*/)
{
  /* From the MPI forum's advice:
  The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
  will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
  the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
  matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
  MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
  implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
  to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
  called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
  origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
  must complete, without further dependencies. */

  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Start");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int src = comm_->group()->rank(group->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  group->ref();
  dst_group_ = group;
  opened_++; // we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Start");
  return MPI_SUCCESS;
}

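// Illustrative pairing for this generalized active-target mode (hypothetical
// user code, not part of this file):
//   origin:                            target:
//   MPI_Win_start(tgt_grp, 0, win);    MPI_Win_post(org_grp, 0, win);
//   MPI_Put(buf, n, MPI_INT, tgt,      /* exposure epoch open */
//           0, n, MPI_INT, win);
//   MPI_Win_complete(win);             MPI_Win_wait(win);
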
int Win::post(MPI_Group group, int /*assert*/)
{
  //let's make a synchronous send here
  XBT_DEBUG("Entering MPI_Win_Post");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int dst = comm_->group()->rank(group->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  group->ref();
  src_group_ = group;
  opened_++; // we're open for business !
  XBT_DEBUG("Leaving MPI_Win_Post");
  return MPI_SUCCESS;
}

int Win::complete(){
  xbt_assert(opened_ != 0, "Complete called on a window that is not opened");

  XBT_DEBUG("Entering MPI_Win_Complete");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < dst_group_->size(); i++) {
    int dst = comm_->group()->rank(dst_group_->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  flush_local_all();

  opened_--; //we're closed for business !
  Group::unref(dst_group_);
  dst_group_ = MPI_GROUP_NULL;
  return MPI_SUCCESS;
}

int Win::wait(){
  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < src_group_->size(); i++) {
    int src = comm_->group()->rank(src_group_->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  flush_local_all();

  opened_--; //we're closed for business !
  Group::unref(src_group_);
  src_group_ = MPI_GROUP_NULL;
  return MPI_SUCCESS;
}

int Win::lock(int lock_type, int rank, int /*assert*/)
{
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->lock();
    target_win->mode_+= lock_type; // add the lock_type to tell apart the case where we switch from EXCLUSIVE to SHARED (no release needed in the unlock)
    if(lock_type == MPI_LOCK_SHARED){ // the window used to be exclusive, it's now shared.
      target_win->lock_mut_->unlock();
    }
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(rank_);

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}

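// Sketch of the mode_ bookkeeping above, assuming the numeric values SMPI gives
// the lock constants (MPI_LOCK_EXCLUSIVE and MPI_LOCK_SHARED, see smpi.h): an
// EXCLUSIVE lock followed by a SHARED one leaves
// mode_ == MPI_LOCK_EXCLUSIVE + MPI_LOCK_SHARED and releases lock_mut_ at once,
// so unlock() sees mode_ != MPI_LOCK_EXCLUSIVE and does not release it again.
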
int Win::lock_all(int assert){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}

int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_= 0;
  target_win->lockers_.remove(rank_);
  if (target_mode==MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->unlock();
  }

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}

int Win::unlock_all(){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}

int Win::flush(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
  if (rank != rank_) {
    finished = connected_wins_[rank]->finish_comms(rank_);
    XBT_DEBUG("Win_flush on remote %d for local %d - Finished %d RMA calls", rank, rank_, finished);
  }
  return MPI_SUCCESS;
}

int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
  return MPI_SUCCESS;
}

int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local %d - Finished %d RMA calls", rank_, finished);
  for (int i = 0; i < comm_->size(); i++) {
    if (i != rank_) {
      finished = connected_wins_[i]->finish_comms(rank_);
      XBT_DEBUG("Win_flush_all on remote %d for local %d - Finished %d RMA calls", i, rank_, finished);
    }
  }
  return MPI_SUCCESS;
}

int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all on local %d - Finished %d RMA calls", rank_, finished);
  return MPI_SUCCESS;
}

Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
}

int Win::finish_comms(){
  mut_->lock();
  //Finish own requests
  int size = static_cast<int>(requests_.size());
  if (size > 0) {
    MPI_Request* treqs = requests_.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    requests_.clear();
  }
  mut_->unlock();
  return size;
}

int Win::finish_comms(int rank){
  mut_->lock();
  // Finish own requests
  // Only wait for the requests we are responsible for, i.e. those for which
  // we are either the sender or the destination.
  // Also use the process id here, since a request's src() and dst() return
  // process ids, NOT ranks (a rank only exists in the context of a communicator).
  aid_t proc_id = comm_->group()->actor(rank);
  auto it = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
    return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
  });
  std::vector<MPI_Request> myreqqs(it, end(requests_));
  requests_.erase(it, end(requests_));
  int size = static_cast<int>(myreqqs.size());
  if (size > 0) {
    MPI_Request* treqs = myreqqs.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    myreqqs.clear();
  }
  mut_->unlock();
  return size;
}

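// Sketch of the partition above: requests involving proc_id move to the tail,
// e.g. {rA, rP, rB, rP} becomes {rA, rB | rP, rP}; only the tail slice is
// copied into myreqqs, erased from requests_, and waited on.
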
int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
{
  const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];
  }
  if (target_win) {
    *size                         = target_win->size_;
    *disp_unit                    = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;
  } else {
    *size                         = 0;
    *static_cast<void**>(baseptr) = nullptr;
  }
  return MPI_SUCCESS;
}

MPI_Errhandler Win::errhandler()
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
  return errhandler_;
}

void Win::set_errhandler(MPI_Errhandler errhandler)
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
}
} // namespace smpi
} // namespace simgrid