/* Copyright (c) 2007-2019. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "smpi_win.hpp"

#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_request.hpp"
#include "src/smpi/include/smpi_actor.hpp"

XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");

std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_ = 0;
Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
    : base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic)
{
  int comm_size = comm->size();

  XBT_DEBUG("Creating window");
  if (info != MPI_INFO_NULL)

  group_ = MPI_GROUP_NULL;
  requests_ = new std::vector<MPI_Request>();
  mut_ = s4u::Mutex::create();
  lock_mut_ = s4u::Mutex::create();
  atomic_mut_ = s4u::Mutex::create();
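  // Table of the peers' window objects: every process can directly reach the target window's
  // metadata (base address, size, request queue, mutexes) during RMA calls.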
  connected_wins_ = new MPI_Win[comm_size];
  connected_wins_[rank_] = this;

  bar_ = new s4u::Barrier(comm_size);

  errhandler_ = MPI_ERRORS_ARE_FATAL;

  comm->add_rma_win(this);

  colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win), MPI_BYTE,
                   comm);
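  // Rank 0's barrier pointer is broadcast so that every process synchronizes on the same barrier object.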
  colls::bcast(&(bar_), sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
  // As per the standard, perform a barrier to ensure every async comm is finished
  int finished = finish_comms();
  XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);

  delete[] connected_wins_;
  if (name_ != nullptr){

  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);

  comm_->remove_rma_win(this);

  colls::barrier(comm_);
int Win::attach(void* /*base*/, MPI_Aint size)
  if (not(base_ == MPI_BOTTOM || base_ == 0))

  base_ = 0; // the actual address will be given in the RMA calls, as the displacement

int Win::detach(const void* /*base*/)
void Win::get_name(char* name, int* length){
  *length = strlen(name_);
  strncpy(name, name_, *length + 1);
void Win::get_group(MPI_Group* group){
  if (comm_ != MPI_COMM_NULL){
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  if (info_ == MPI_INFO_NULL)

MPI_Aint Win::size(){

int Win::disp_unit(){

void Win::set_info(MPI_Info info)
  if (info_ != MPI_INFO_NULL)
    simgrid::smpi::Info::unref(info_);

  if (info_ != MPI_INFO_NULL)

void Win::set_name(const char* name){
  name_ = xbt_strdup(name);
int Win::fence(int assert)
  XBT_DEBUG("Entering fence");

  if (assert != MPI_MODE_NOPRECEDE) {
    // This is not the first fence => finalize what came before

    // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
    // Without it, the vector could be resized while another process is pushing to it,
    // which would invalidate the array that Request::waitall() is working on.
    // Another solution would be to copy the data and clean up the vector *before* Request::waitall.
    std::vector<MPI_Request>* reqs = requests_;
    int size = static_cast<int>(reqs->size());
    // start all requests that have been prepared by another process
      MPI_Request* treqs = &(*reqs)[0];
      Request::waitall(size, treqs, MPI_STATUSES_IGNORE);

  if (assert == MPI_MODE_NOSUCCEED) // no RMA operation may follow this fence, so mark the window as closed

  XBT_DEBUG("Leaving fence");
int Win::put(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
  // get the pointer to the target window
  MPI_Win recv_win = connected_wins_[target_rank];

  if (opened_ == 0) { // check that post/start has been done
    // no fence or start has been called: do we at least hold a lock on the target?
    for (auto const& it : recv_win->lockers_)
      if (it == comm_->rank())

  if (target_count * target_datatype->get_extent() > recv_win->size_)

  void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
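  // Remote target: the one-sided put is simulated with a matched pair of point-to-point requests.
  // The send is ours; the receive is queued on the target window and completed at the next synchronization.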
  if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
    XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
    // prepare the send request
    // TODO cheinrich Check for rank / pid conversion
    Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,

    // prepare the receive request
    // TODO cheinrich Check for rank / pid conversion
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                              target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);

    if (request != nullptr) {

      requests_->push_back(sreq);

    // push the request to the receiver's window
    recv_win->mut_->lock();
    recv_win->requests_->push_back(rreq);
    recv_win->mut_->unlock();

    XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
      *request = MPI_REQUEST_NULL;
int Win::get(void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
  MPI_Win send_win = connected_wins_[target_rank];

  if (opened_ == 0) { // check that post/start has been done
    // no fence or start has been called: do we at least hold a lock on the target?
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())

  if (target_count * target_datatype->get_extent() > send_win->size_)

  void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);
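  // Remote target: the data flows from the target window to us, so the prepared send is registered
  // on the target's side and the matching receive is ours.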
  if (target_rank != comm_->rank()) {
    // prepare the send request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
                                              send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);

    // prepare the receive request
    MPI_Request rreq = Request::rma_recv_init(
        origin_addr, origin_count, origin_datatype, target_rank,
        comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
        SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);

    // start the send, with a process other than us as the sender

    // push the request to the target's window (the data sender)
    send_win->mut_->lock();
    send_win->requests_->push_back(sreq);
    send_win->mut_->unlock();

    if (request != nullptr) {

      requests_->push_back(rreq);

    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
      *request = MPI_REQUEST_NULL;
int Win::accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  // get the pointer to the target window
  MPI_Win recv_win = connected_wins_[target_rank];

  if (opened_ == 0) { // check that post/start has been done
    // no fence or start has been called: do we at least hold a lock on the target?
    for (auto const& it : recv_win->lockers_)
      if (it == comm_->rank())

  // FIXME: local version

  if (target_count * target_datatype->get_extent() > recv_win->size_)

  void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // The tag is used to order the operations, so subtract the operation count from it.
  // SMPI_RMA_TAG is set below all the other tags we use, which avoids collisions.
  // prepare the send request
  MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
                                            SMPI_RMA_TAG - 3 - count_, comm_, op);

  // prepare the receive request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
                                            recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)),
                                            SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);

  // push the request to the receiver's window
  recv_win->mut_->lock();
  recv_win->requests_->push_back(rreq);
  recv_win->mut_->unlock();

  if (request != nullptr) {

    requests_->push_back(sreq);

  XBT_DEBUG("Leaving MPI_Win_Accumulate");
int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
  MPI_Win send_win = connected_wins_[target_rank];

  if (opened_ == 0) { // check that post/start has been done
    // no fence or start has been called: do we at least hold a lock on the target?
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())

  if (target_count * target_datatype->get_extent() > send_win->size_)

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  // operations need to be ordered correctly, so finish every pending request here? This is slow.
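  // The get-accumulate is emulated as a get followed by an accumulate, made atomic by
  // serializing on the target window's atomic_mut_.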
  send_win->atomic_mut_->lock();
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  accumulate(origin_addr, origin_count, origin_datatype, target_rank,
             target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  send_win->atomic_mut_->unlock();
int Win::compare_and_swap(const void* origin_addr, void* compare_addr, void* result_addr,
                          MPI_Datatype datatype, int target_rank, MPI_Aint target_disp){
  MPI_Win send_win = connected_wins_[target_rank];

  if (opened_ == 0) { // check that post/start has been done
    // no fence or start has been called: do we at least hold a lock on the target?
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  send_win->atomic_mut_->lock();
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
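  // memcmp() returns 0 when the fetched value equals *compare_addr; only then is origin_addr written to the target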
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);

  send_win->atomic_mut_->unlock();
int Win::start(MPI_Group group, int /*assert*/)
  /* From the MPI Forum's advice to implementors:
  The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
  will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
  the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
  matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
  MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
  implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
  to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
  called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
  origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
  must complete, without further dependencies. */
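  // Here we take the simplest of these options: MPI_Win_start blocks until every process in the group
  // has sent its MPI_Win_post sync message (a zero-byte message tagged SMPI_RMA_TAG + 4).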
  // naive, blocking implementation.
  int size = group->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

  XBT_DEBUG("Entering MPI_Win_Start");
    int src = comm_->group()->rank(group->actor(j));
    if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);

  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);

  opened_++; // we're open for business!

  XBT_DEBUG("Leaving MPI_Win_Start");
int Win::post(MPI_Group group, int /*assert*/)
  // let's make a synchronous send here
  int size = group->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

  XBT_DEBUG("Entering MPI_Win_Post");
    int dst = comm_->group()->rank(group->actor(j));
    if (dst != rank_ && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);

  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);

    Request::unref(&reqs[i]);

  opened_++; // we're open for business!

  XBT_DEBUG("Leaving MPI_Win_Post");
    xbt_die("Complete called on already opened MPI_Win");

  XBT_DEBUG("Entering MPI_Win_Complete");

  int size = group_->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

    int dst = comm_->group()->rank(group_->actor(j));
    if (dst != rank_ && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);

  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);

    Request::unref(&reqs[i]);

  int finished = finish_comms();
  XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);

  Group::unref(group_);
  opened_--; // we're closed for business!
  // naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");

  int size = group_->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

    int src = comm_->group()->rank(group_->actor(j));
    if (src != rank_ && src != MPI_UNDEFINED) {
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);

  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);

    Request::unref(&reqs[i]);

  int finished = finish_comms();
  XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);

  Group::unref(group_);
  opened_--; // we're closed for business!
int Win::lock(int lock_type, int rank, int /*assert*/)
  MPI_Win target_win = connected_wins_[rank];
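  // Passive-target locking is emulated with the target window's lock_mut_: an exclusive lock keeps the
  // mutex until the matching unlock, while a shared lock only waits for a current exclusive holder.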
  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED) || target_win->mode_ == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->lock();
    target_win->mode_ += lock_type; // add the lock_type to tell apart the case where we switch from EXCLUSIVE to SHARED (no release needed in the unlock)
    if (lock_type == MPI_LOCK_SHARED){ // the window used to be exclusive; it is now shared
      target_win->lock_mut_->unlock();

  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(comm_->rank());

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
int Win::lock_all(int assert){

  int retval = MPI_SUCCESS;
  for (i = 0; i < comm_->size(); i++){
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if (ret != MPI_SUCCESS)
int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_ = 0;
  target_win->lockers_.remove(comm_->rank());
  if (target_mode == MPI_LOCK_EXCLUSIVE){
    target_win->lock_mut_->unlock();

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
int Win::unlock_all(){

  int retval = MPI_SUCCESS;
  for (i = 0; i < comm_->size(); i++){
    int ret = this->unlock(i);
    if (ret != MPI_SUCCESS)
int Win::flush(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int finished = finish_comms(rank_);
  XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
  finished = target_win->finish_comms(rank);
  XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
int Win::flush_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
  for (int i = 0; i < comm_->size(); i++) {
    finished = connected_wins_[i]->finish_comms(rank_);
    XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
int Win::finish_comms(){

  // Finish own requests
  std::vector<MPI_Request>* reqqs = requests_;
  int size = static_cast<int>(reqqs->size());

    MPI_Request* treqs = &(*reqqs)[0];
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
int Win::finish_comms(int rank){

  // Finish own requests
  std::vector<MPI_Request>* reqqs = requests_;
  int size = static_cast<int>(reqqs->size());

    std::vector<MPI_Request> myreqqs;
    std::vector<MPI_Request>::iterator iter = reqqs->begin();
    int proc_id = comm_->group()->actor(rank)->get_pid();
    while (iter != reqqs->end()){
      // Let's see if we're either the destination or the sender of this request,
      // because we only wait for requests that we are responsible for.
      // Use the process id here, since src() and dst() on the request return the process id,
      // NOT the rank (which only exists in the context of a communicator).
      if (((*iter) != MPI_REQUEST_NULL) && (((*iter)->src() == proc_id) || ((*iter)->dst() == proc_id))) {
        myreqqs.push_back(*iter);
        iter = reqqs->erase(iter);

    MPI_Request* treqs = &myreqqs[0];
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr)
  MPI_Win target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
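  // With MPI_PROC_NULL, MPI_Win_shared_query returns the segment of the lowest rank that specified a
  // non-zero size, so fall back to the first window with size_ > 0.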
  for (int i = 0; not target_win && i < comm_->size(); i++) {
    if (connected_wins_[i]->size_ > 0)
      target_win = connected_wins_[i];

    *size = target_win->size_;
    *disp_unit = target_win->disp_unit_;
    *static_cast<void**>(baseptr) = target_win->base_;

    *static_cast<void**>(baseptr) = nullptr;
MPI_Errhandler Win::errhandler()
  if (errhandler_ != MPI_ERRHANDLER_NULL)

void Win::set_errhandler(MPI_Errhandler errhandler)
  if (errhandler_ != MPI_ERRHANDLER_NULL)
    simgrid::smpi::Errhandler::unref(errhandler_);
  errhandler_ = errhandler;
  if (errhandler_ != MPI_ERRHANDLER_NULL)

} // namespace simgrid