/* Copyright (c) 2007-2022. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */
#include "smpi_win.hpp"

#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
#define CHECK_RMA_REMOTE_WIN(fun, win)\
  if(target_count*target_datatype->get_extent()>win->size_){\
    XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
          fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
    simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
    return MPI_ERR_RMA_RANGE;\
  }
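/* Note: CHECK_RMA_REMOTE_WIN expects `target_count`, `target_datatype` and `target_rank` to be in scope at
 * its expansion site, and must be used inside a function that returns an MPI error code. */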
#define CHECK_WIN_LOCKED(win)                                                                                          \
  if (opened_ == 0) { /*check that post/start has been done*/                                                          \
    bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; }); \
    if (not locked)                                                                                                    \
      return MPI_ERR_WIN;                                                                                              \
  }
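/* Note: CHECK_WIN_LOCKED lets an RMA call proceed either when an epoch is open on this window
 * (opened_ != 0, i.e. fence/post/start was issued) or when the calling rank currently holds a lock
 * on `win`; otherwise the call is refused with MPI_ERR_WIN. */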
namespace simgrid{
namespace smpi{

std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_=0;
Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
    : base_(base)
    , size_(size)
    , disp_unit_(disp_unit)
    , info_(info)
    , comm_(comm)
    , connected_wins_(comm->size())
    , rank_(comm->rank())
    , allocated_(allocated)
    , dynamic_(dynamic)
{
  XBT_DEBUG("Creating window");
  if(info!=MPI_INFO_NULL)
    info->ref();
  connected_wins_[rank_] = this;

  comm->add_rma_win(this);

  colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
                   MPI_BYTE, comm);
}
Win::~Win()
{
  //As per the standard, perform a barrier to ensure every async comm is finished
  colls::barrier(comm_);

  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);

  comm_->remove_rma_win(this);

  colls::barrier(comm_);
  if (allocated_)
    xbt_free(base_);

  F2C::free_f(this->f2c_id());
}
int Win::attach(void* /*base*/, MPI_Aint size)
{
  if (not(base_ == MPI_BOTTOM || base_ == nullptr))
    return MPI_ERR_ARG;
  base_ = nullptr; // the actual address will be given in the RMA calls, as the displacement
  size_ += size;
  return MPI_SUCCESS;
}
int Win::detach(const void* /*base*/)
{
  base_ = MPI_BOTTOM;
  size_ = -1;
  return MPI_SUCCESS;
}
void Win::get_name(char* name, int* length) const
{
  *length = static_cast<int>(name_.length());
  if (not name_.empty()) {
    name_.copy(name, *length);
    name[*length] = '\0';
  }
}
void Win::get_group(MPI_Group* group){
  if(comm_ != MPI_COMM_NULL){
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}
int Win::rank() const
{
  return rank_;
}

MPI_Comm Win::comm() const
{
  return comm_;
}

MPI_Aint Win::size() const
{
  return size_;
}

void* Win::base() const
{
  return base_;
}

int Win::disp_unit() const
{
  return disp_unit_;
}

bool Win::dynamic() const
{
  return dynamic_;
}
void Win::set_info(MPI_Info info)
{
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  info_ = info;
  if (info_ != MPI_INFO_NULL)
    info_->ref();
}
void Win::set_name(const char* name){
  name_ = name;
}
int Win::fence(int assert)
{
  XBT_DEBUG("Entering fence");
  opened_++;
  if (not (assert & MPI_MODE_NOPRECEDE)) {
    // This is not the first fence => finalize what came before
    colls::barrier(comm_);
  }

  if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
    opened_ = 0;

  colls::barrier(comm_);
  XBT_DEBUG("Leaving fence");

  return MPI_SUCCESS;
}
int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;

  if (target_rank != rank_) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare send_request
    MPI_Request sreq =
        Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
                               MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                              SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);

    //start send
    sreq->start();

    if(request!=nullptr){
      *request=sreq;
    }else{
      mut_->lock();
      requests_.push_back(sreq);
      mut_->unlock();
    }

    //push request to receiver's win
    recv_win->mut_->lock();
    recv_win->requests_.push_back(rreq);
    rreq->start();
    recv_win->mut_->unlock();
  } else {
    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    if (request != nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}
int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
              MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get sender pointer
  Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)

  const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if (target_rank != rank_) {
    //prepare send_request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
                                              SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);

    //start the send, with another process than us as sender.
    sreq->start();
    // push request to sender's win
    send_win->mut_->lock();
    send_win->requests_.push_back(sreq);
    send_win->mut_->unlock();

    //start recv
    rreq->start();

    if(request!=nullptr){
      *request=rreq;
    }else{
      mut_->lock();
      requests_.push_back(rreq);
      mut_->unlock();
    }
  } else {
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    if (request != nullptr)
      *request = MPI_REQUEST_NULL;
  }
  return MPI_SUCCESS;
}
int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
{
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  //get receiver pointer
  Win* recv_win = connected_wins_[target_rank];

  //FIXME: local version
  CHECK_WIN_LOCKED(recv_win)
  CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)

  void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // As the tag will be used for ordering of the operations, subtract count_ from it (to avoid collisions with the
  // other SMPI tags: SMPI_RMA_TAG is set below all the other tags we use)
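  // For instance, the first accumulate of an epoch (count_ == 0) uses tag SMPI_RMA_TAG - 3, the next one
  // SMPI_RMA_TAG - 4, and so on, so each matching send/recv pair gets a tag of its own.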
  // prepare send_request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);

  // prepare receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);

  count_++;

  // start send
  sreq->start();
  // push request to receiver's win
  recv_win->mut_->lock();
  recv_win->requests_.push_back(rreq);
  rreq->start();
  recv_win->mut_->unlock();

  if (request != nullptr) {
    *request = sreq;
  } else {
    mut_->lock();
    requests_.push_back(sreq);
    mut_->unlock();
  }

  // FIXME: The current implementation fails to ensure the correct ordering of the accumulate requests. The following
  // 'flush' is a workaround to fix that.
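  // (For example, two back-to-back MPI_Accumulate calls with MPI_REPLACE on the same target location must
  // be applied in program order per the MPI standard; without this flush their receive requests could
  // complete in either order.)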
329 XBT_DEBUG("Leaving MPI_Win_Accumulate");
int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)
  CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  // need to be sure ops are correctly ordered, so finish the request here? Slow.
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (op != MPI_NO_OP)
    accumulate(origin_addr, origin_count, origin_datatype, target_rank,
               target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}
int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
                          int target_rank, MPI_Aint target_disp)
{
  //get sender pointer
  const Win* send_win = connected_wins_[target_rank];

  CHECK_WIN_LOCKED(send_win)

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);
  }
  send_win->atomic_mut_->unlock();
  return MPI_SUCCESS;
}
int Win::start(MPI_Group group, int /*assert*/)
{
  /* From MPI forum advice:
  The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
  will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
  the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
  matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
  MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
  implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
  to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
  called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
  origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
  must complete, without further dependencies. */
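  /* Illustrative user-level sequence for this generalized active target synchronization (a sketch, not
   * SMPI-specific code; `grp` designates the peer group on each side):
   *   origin:                                  target:
   *     MPI_Win_start(grp, 0, win);              MPI_Win_post(grp, 0, win);
   *     MPI_Put(..., win);                       ...
   *     MPI_Win_complete(win);                   MPI_Win_wait(win);
   * The implementation below takes the first option above: start() blocks until every process in `group`
   * has called post(). */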
  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Start");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int src = comm_->group()->rank(group->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    if (src != rank_)
      reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  group->ref();
  dst_group_ = group;
  opened_++; // we're open for business!
  XBT_DEBUG("Leaving MPI_Win_Start");
  return MPI_SUCCESS;
}
int Win::post(MPI_Group group, int /*assert*/)
{
  //let's make a synchronous send here
  XBT_DEBUG("Entering MPI_Win_Post");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < group->size(); i++) {
    int dst = comm_->group()->rank(group->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    if (dst != rank_)
      reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
  }
  int size = static_cast<int>(reqs.size());

  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  group->ref();
  src_group_ = group;
  opened_++; // we're open for business!
  XBT_DEBUG("Leaving MPI_Win_Post");
  return MPI_SUCCESS;
}
int Win::complete(){
  xbt_assert(opened_ != 0, "Complete called on an MPI_Win that was not opened");

  XBT_DEBUG("Entering MPI_Win_Complete");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < dst_group_->size(); i++) {
    int dst = comm_->group()->rank(dst_group_->actor(i));
    xbt_assert(dst != MPI_UNDEFINED);
    if (dst != rank_)
      reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  flush_local_all();

  opened_--; //we're closed for business!
  Group::unref(dst_group_);
  dst_group_ = MPI_GROUP_NULL;
  return MPI_SUCCESS;
}
int Win::wait(){
  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  std::vector<MPI_Request> reqs;
  for (int i = 0; i < src_group_->size(); i++) {
    int src = comm_->group()->rank(src_group_->actor(i));
    xbt_assert(src != MPI_UNDEFINED);
    if (src != rank_)
      reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
  }
  int size = static_cast<int>(reqs.size());

  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs.data());
  Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
  for (auto& req : reqs)
    Request::unref(&req);

  flush_local_all();

  opened_--; //we're closed for business!
  Group::unref(src_group_);
  src_group_ = MPI_GROUP_NULL;
  return MPI_SUCCESS;
}
int Win::lock(int lock_type, int rank, int /*assert*/)
{
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->lock();
    target_win->mode_ += lock_type; // add the lock_type to differentiate the case where we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
    if(lock_type == MPI_LOCK_SHARED){ // the window used to be exclusive, it's now shared.
      target_win->lock_mut_->unlock();
    }
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
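  // Sketch of the mode_ bookkeeping above, assuming MPI_LOCK_EXCLUSIVE and MPI_LOCK_SHARED are distinct
  // non-zero constants: mode_ == 0 means unlocked; after an exclusive lock, a later shared request raises
  // mode_ to MPI_LOCK_EXCLUSIVE + MPI_LOCK_SHARED and releases lock_mut_ right away, so unlock() can tell
  // this mixed state apart from a plain MPI_LOCK_EXCLUSIVE and will not unlock lock_mut_ a second time.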
  target_win->lockers_.push_back(rank_);

  return MPI_SUCCESS;
}
int Win::lock_all(int assert){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}
int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_ = 0;
  target_win->lockers_.remove(rank_);
  if (target_mode==MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->unlock();
  }

  return MPI_SUCCESS;
}
int Win::unlock_all(){
  int retval = MPI_SUCCESS;
  for (int i = 0; i < comm_->size(); i++) {
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}
int Win::flush(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);

  finished = connected_wins_[rank]->finish_comms(rank_);
  XBT_DEBUG("Win_flush on remote %d for local %d - Finished %d RMA calls", rank, rank_, finished);
  return MPI_SUCCESS;
}
int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
  return MPI_SUCCESS;
}
int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local %d - Finished %d RMA calls", rank_, finished);
  for (int i = 0; i < comm_->size(); i++) {
    if (i != rank_)
      finished = connected_wins_[i]->finish_comms(rank_);
    XBT_DEBUG("Win_flush_all on remote %d for local %d - Finished %d RMA calls", i, rank_, finished);
  }
  return MPI_SUCCESS;
}
int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all on local %d - Finished %d RMA calls", rank_, finished);
  return MPI_SUCCESS;
}
Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
}
int Win::finish_comms(){
  // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
  // Without this, the vector could get resized when another process pushes.
  // This would result in the array used by Request::waitall() being invalidated.
  // Another solution would be to copy the data and clean up the vector *before* Request::waitall.
  mut_->lock();
  //Finish own requests
  int size = static_cast<int>(requests_.size());
  if (size > 0) {
    MPI_Request* treqs = requests_.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    requests_.clear();
  }
  mut_->unlock();
  return size;
}
int Win::finish_comms(int rank){
  // See comment about the mutex in finish_comms() above
  mut_->lock();
  // Finish own requests
  // Let's see if we're either the destination or the sender of this request,
  // because we only wait for requests that we are responsible for.
  // Also use the process id here, since src() and dst() on the request return
  // the process id, NOT the rank (which only exists in the context of a communicator).
  aid_t proc_id = comm_->group()->actor(rank);
  auto it = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
    return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
  });
  std::vector<MPI_Request> myreqqs(it, end(requests_));
  requests_.erase(it, end(requests_));
  int size = static_cast<int>(myreqqs.size());
  if (size > 0) {
    MPI_Request* treqs = myreqqs.data();
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    myreqqs.clear();
  }
  mut_->unlock();
  return size;
}
int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
{
  const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];
  }
  if (target_win) {
    *size = target_win->size_;
    *disp_unit = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;
  } else {
    *size = 0;
    *static_cast<void**>(baseptr) = nullptr;
  }
  return MPI_SUCCESS;
}
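/* Minimal user-level usage sketch for the query above (assuming `win` was created over shared memory):
 *   MPI_Aint sz; int du; void* ptr;
 *   MPI_Win_shared_query(win, rank, &sz, &du, &ptr);   // rank may be MPI_PROC_NULL
 * With MPI_PROC_NULL, the first window with a non-zero size is reported, as implemented above. */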
MPI_Errhandler Win::errhandler()
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
  return errhandler_;
}
void Win::set_errhandler(MPI_Errhandler errhandler)
{
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    errhandler_->ref();
}

} // namespace smpi
} // namespace simgrid