1 /* Copyright (c) 2007-2018. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_win.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
19 using simgrid::s4u::Actor;
// Class-wide storage for window attribute keyvals (MPI_Win_create_keyval machinery);
// keyval_id_ is the next key id to hand out. Shared by all Win instances.
23 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
24 int Win::keyval_id_=0;
// Constructor: set up a simulated MPI RMA window over `comm`.
// Initializes per-window bookkeeping (request list, mutexes), then exchanges
// the window pointers of all ranks via allgather so any rank can reach any
// other rank's window directly through connected_wins_.
// NOTE(review): this excerpt elides some original lines (see numbering gaps),
// e.g. the branch body after the MPI_INFO_NULL check and the tail of the
// allgather argument list.
26 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
27 int comm_size = comm->size();
29 XBT_DEBUG("Creating window");
30 if(info!=MPI_INFO_NULL)
34 group_ = MPI_GROUP_NULL;
// Requests posted on this window; guarded by mut_ (see fence/finish_comms).
35 requests_ = new std::vector<MPI_Request>();
36 mut_ = xbt_mutex_init();
37 lock_mut_ = xbt_mutex_init();
38 atomic_mut_ = xbt_mutex_init();
// One slot per rank; filled below by the allgather so every process knows
// every other process' Win*.
39 connected_wins_ = new MPI_Win[comm_size];
40 connected_wins_[rank_] = this;
43 bar_ = new simgrid::s4u::Barrier(comm_size);
47 comm->add_rma_win(this);
// Exchange raw Win pointers (valid here because all actors share one simulation).
50 Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
// All ranks share rank 0's barrier object.
53 Colls::bcast(&(bar_), sizeof(simgrid::s4u::Barrier*), MPI_BYTE, 0, comm);
// Destructor body (the `Win::~Win()` signature is elided from this excerpt).
// Drains pending RMA requests, releases per-window resources, and performs a
// final barrier so no process destroys state another is still using.
59 //As per the standard, perform a barrier to ensure every async comm is finished
62 int finished = finish_comms();
63 XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
66 delete[] connected_wins_;
67 if (name_ != nullptr){
70 if(info_!=MPI_INFO_NULL){
71 MPI_Info_free(&info_);
74 comm_->remove_rma_win(this);
// Barrier before tearing down mutexes: peers may still be flushing to us.
76 Colls::barrier(comm_);
81 xbt_mutex_destroy(mut_);
82 xbt_mutex_destroy(lock_mut_);
83 xbt_mutex_destroy(atomic_mut_);
// Attach memory to a dynamic window (MPI_Win_attach). Only legal if the window
// base is still unset (MPI_BOTTOM or 0); otherwise the elided branch presumably
// returns an error — TODO confirm against the full source.
91 int Win::attach (void *base, MPI_Aint size){
92 if (not(base_ == MPI_BOTTOM || base_ == 0))
94 base_=0;//actually the address will be given in the RMA calls, as being the disp.
// Detach memory from a dynamic window (MPI_Win_detach); body elided in this excerpt.
99 int Win::detach (void *base){
// Copy the window's name into the caller-supplied buffer and report its length.
// The elided lines presumably handle the unnamed (name_ == nullptr) case —
// TODO confirm; here name_ is dereferenced unconditionally.
105 void Win::get_name(char* name, int* length){
111 *length = strlen(name_);
// +1 so the terminating NUL is copied along with the characters.
112 strncpy(name, name_, *length+1);
// Return the group of the communicator the window was created over
// (MPI_Win_get_group), or MPI_GROUP_NULL if the communicator is gone.
115 void Win::get_group(MPI_Group* group){
116 if(comm_ != MPI_COMM_NULL){
117 *group = comm_->group();
119 *group = MPI_GROUP_NULL;
// Lazily expose the window's info object; the elided lines presumably create
// one when info_ is MPI_INFO_NULL — TODO confirm against the full source.
123 MPI_Info Win::info(){
124 if(info_== MPI_INFO_NULL)
// Accessor for the window size in bytes (body elided in this excerpt).
134 MPI_Aint Win::size(){
// Accessor for the window displacement unit (body elided in this excerpt).
142 int Win::disp_unit(){
// Replace the window's info object; the guard suggests the old one is released
// first when present (release elided in this excerpt).
150 void Win::set_info(MPI_Info info){
151 if(info_!= MPI_INFO_NULL)
// Set the window's name (MPI_Win_set_name). Takes a private copy via xbt_strdup.
// NOTE(review): a previously set name_ is overwritten without being freed here —
// confirm whether the elided lines (or the destructor) release it.
156 void Win::set_name(char* name){
157 name_ = xbt_strdup(name);
// MPI_Win_fence: collective synchronization separating RMA epochs.
// Unless the caller asserts MPI_MODE_NOPRECEDE, drain every request that peers
// queued on this window during the previous epoch before proceeding.
// (An epoch-wide barrier via bar_ is elided from this excerpt.)
160 int Win::fence(int assert)
162 XBT_DEBUG("Entering fence");
165 if (assert != MPI_MODE_NOPRECEDE) {
166 // This is not the first fence => finalize what came before
168 xbt_mutex_acquire(mut_);
169 // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
170 // Without this, the vector could get resized when another process pushes,
171 // which would invalidate the array used by Request::waitall().
172 // Another solution would be to copy the data and clean up the vector *before* Request::waitall.
173 std::vector<MPI_Request> *reqs = requests_;
174 int size = static_cast<int>(reqs->size());
175 // start all requests that have been prepared by another process
177 MPI_Request* treqs = &(*reqs)[0];
178 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
181 xbt_mutex_release(mut_);
184 if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
189 XBT_DEBUG("Leaving fence");
// MPI_Put: write origin data into the target rank's window.
// Remote case: pair an rma_send on our side with an rma_recv queued on the
// target's window, so the target's next synchronization (fence/complete/unlock)
// completes it. Local case: plain datatype copy, no messaging.
194 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
195 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
197 //get receiver pointer
198 MPI_Win recv_win = connected_wins_[target_rank];
200 if(opened_==0){//check that post/start has been done
201 // no fence or start .. lock ok ?
// Accepted only if we currently hold a lock on the target window
// (the error path is elided from this excerpt).
203 for (auto const& it : recv_win->lockers_)
204 if (it == comm_->rank())
// Bounds check: the access must fit inside the target window.
210 if(target_count*target_datatype->get_extent()>recv_win->size_)
213 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
215 if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
216 XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
217 // prepare send_request
219 // TODO cheinrich Check for rank / pid conversion
220 Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
223 //prepare receiver request
224 // TODO cheinrich Check for rank / pid conversion
225 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
226 target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
// If the caller wants a request handle (MPI_Rput), hand back the send side;
// otherwise track it in our own request list (guarded by mut_).
231 if(request!=nullptr){
234 xbt_mutex_acquire(mut_);
235 requests_->push_back(sreq);
236 xbt_mutex_release(mut_);
239 //push request to receiver's win
240 xbt_mutex_acquire(recv_win->mut_);
241 recv_win->requests_->push_back(rreq);
243 xbt_mutex_release(recv_win->mut_);
// Local put: direct copy into our own window memory.
246 XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
247 Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
249 *request = MPI_REQUEST_NULL;
// MPI_Get: read from the target rank's window into origin_addr.
// Mirror image of put(): the *send* request is queued on the target's window
// (the target is the data source) and the receive is tracked locally.
255 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
256 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
259 MPI_Win send_win = connected_wins_[target_rank];
261 if(opened_==0){//check that post/start has been done
262 // no fence or start .. lock ok ?
// Outside an epoch we must hold a lock on the source window
// (error path elided from this excerpt).
264 for (auto const& it : send_win->lockers_)
265 if (it == comm_->rank())
// Bounds check against the source window size.
271 if(target_count*target_datatype->get_extent()>send_win->size_)
274 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
275 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
277 if(target_rank != comm_->rank()){
278 //prepare send_request
279 MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
280 send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
282 //prepare receiver request
283 MPI_Request rreq = Request::rma_recv_init(
284 origin_addr, origin_count, origin_datatype, target_rank,
285 comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
286 SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
288 //start the send, with another process than us as sender.
290 //push request to receiver's win
291 xbt_mutex_acquire(send_win->mut_);
292 send_win->requests_->push_back(sreq);
293 xbt_mutex_release(send_win->mut_);
// Hand back the receive side as the user-visible request (MPI_Rget),
// or track it locally under mut_.
298 if(request!=nullptr){
301 xbt_mutex_acquire(mut_);
302 requests_->push_back(rreq);
303 xbt_mutex_release(mut_);
// Local get: direct copy out of our own window memory.
307 Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
309 *request=MPI_REQUEST_NULL;
// MPI_Accumulate: like put(), but the receive side applies reduction `op`
// instead of overwriting. Unlike put(), there is no fast local-copy path
// (see the FIXME below) — even a self-accumulate goes through requests.
316 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
317 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
319 XBT_DEBUG("Entering MPI_Win_Accumulate");
320 //get receiver pointer
321 MPI_Win recv_win = connected_wins_[target_rank];
323 if(opened_==0){//check that post/start has been done
324 // no fence or start .. lock ok ?
326 for (auto const& it : recv_win->lockers_)
327 if (it == comm_->rank())
332 //FIXME: local version
334 if(target_count*target_datatype->get_extent()>recv_win->size_)
337 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
338 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
// The tag encodes an operation counter so accumulates are matched in order;
// subtract count_ from SMPI_RMA_TAG (which sits below all other SMPI tags)
// to avoid collisions with other SMPI tags.
339 //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
340 //prepare send_request
342 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
343 SMPI_RMA_TAG - 3 - count_, comm_, op);
345 // prepare receiver request
346 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
// Translate our rank for target_rank into the receiver communicator's numbering.
347 recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)), SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
353 // push request to receiver's win
354 xbt_mutex_acquire(recv_win->mut_);
355 recv_win->requests_->push_back(rreq);
357 xbt_mutex_release(recv_win->mut_);
// Request-returning variant (MPI_Raccumulate) vs. internal tracking.
359 if (request != nullptr) {
362 xbt_mutex_acquire(mut_);
363 requests_->push_back(sreq);
364 xbt_mutex_release(mut_);
367 XBT_DEBUG("Leaving MPI_Win_Accumulate");
// MPI_Get_accumulate: atomically fetch the target data into result_addr, then
// accumulate origin data into the target. Atomicity is emulated by serializing
// the get+accumulate pair under the target window's atomic_mut_ (slow but safe,
// as the comment below acknowledges).
371 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr,
372 int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count,
373 MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request){
376 MPI_Win send_win = connected_wins_[target_rank];
378 if(opened_==0){//check that post/start has been done
379 // no fence or start .. lock ok ?
381 for (auto const& it : send_win->lockers_)
382 if (it == comm_->rank())
388 if(target_count*target_datatype->get_extent()>send_win->size_)
391 XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
392 //need to be sure ops are correctly ordered, so finish request here ? slow.
394 xbt_mutex_acquire(send_win->atomic_mut_);
// Fetch phase: read the current target value before modifying it.
395 get(result_addr, result_count, result_datatype, target_rank,
396 target_disp, target_count, target_datatype, &req);
397 if (req != MPI_REQUEST_NULL)
398 Request::wait(&req, MPI_STATUS_IGNORE);
// Update phase: apply op to the target window.
400 accumulate(origin_addr, origin_count, origin_datatype, target_rank,
401 target_disp, target_count, target_datatype, op, &req);
402 if (req != MPI_REQUEST_NULL)
403 Request::wait(&req, MPI_STATUS_IGNORE);
404 xbt_mutex_release(send_win->atomic_mut_);
// MPI_Compare_and_swap: fetch one element from the target into result_addr;
// if it equals *compare_addr (bytewise memcmp over the datatype extent),
// write *origin_addr back. The whole read-compare-write sequence is made
// atomic by holding the target window's atomic_mut_.
409 int Win::compare_and_swap(void *origin_addr, void *compare_addr,
410 void *result_addr, MPI_Datatype datatype, int target_rank,
411 MPI_Aint target_disp){
413 MPI_Win send_win = connected_wins_[target_rank];
415 if(opened_==0){//check that post/start has been done
416 // no fence or start .. lock ok ?
418 for (auto const& it : send_win->lockers_)
419 if (it == comm_->rank())
425 XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
426 MPI_Request req = MPI_REQUEST_NULL;
427 xbt_mutex_acquire(send_win->atomic_mut_);
428 get(result_addr, 1, datatype, target_rank,
429 target_disp, 1, datatype, &req);
430 if (req != MPI_REQUEST_NULL)
431 Request::wait(&req, MPI_STATUS_IGNORE);
// memcmp returns 0 on equality, hence the `not`.
432 if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
433 put(origin_addr, 1, datatype, target_rank,
434 target_disp, 1, datatype);
436 xbt_mutex_release(send_win->atomic_mut_);
// MPI_Win_start: open an access epoch towards the processes in `group`.
440 int Win::start(MPI_Group group, int assert){
441 /* From MPI forum advices
442 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
443 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
444 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
445 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
446 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
447 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
448 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
449 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
450 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
451 must complete, without further dependencies. */
// Chosen strategy: block here until every target has posted (see zero-byte
// recv with SMPI_RMA_TAG + 4 matching the sends issued in Win::post).
453 //naive, blocking implementation.
456 int size = group->size();
457 MPI_Request* reqs = xbt_new0(MPI_Request, size);
459 XBT_DEBUG("Entering MPI_Win_Start");
// Translate each group member into a rank of comm_ and wait for its "post" token.
461 int src = comm_->group()->rank(group->actor(j));
462 if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
463 reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
469 Request::startall(size, reqs);
470 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
471 for (i = 0; i < size; i++) {
472 Request::unref(&reqs[i]);
475 opened_++; //we're open for business !
478 XBT_DEBUG("Leaving MPI_Win_Start");
// MPI_Win_post: open an exposure epoch for the processes in `group` by sending
// each of them a zero-byte "post" token (tag SMPI_RMA_TAG + 4) that unblocks
// their matching Win::start.
482 int Win::post(MPI_Group group, int assert){
483 //let's make a synchronous send here
486 int size = group->size();
487 MPI_Request* reqs = xbt_new0(MPI_Request, size);
489 XBT_DEBUG("Entering MPI_Win_Post");
491 int dst = comm_->group()->rank(group->actor(j));
492 if (dst != rank_ && dst != MPI_UNDEFINED) {
493 reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
500 Request::startall(size, reqs);
501 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
503 Request::unref(&reqs[i]);
506 opened_++; //we're open for business !
509 XBT_DEBUG("Leaving MPI_Win_Post");
// Body of Win::complete (MPI_Win_complete; the signature is elided from this
// excerpt). Closes the access epoch opened by start(): notify each target with
// a zero-byte token (tag SMPI_RMA_TAG + 5, matched by Win::wait), then drain
// our pending RMA requests and drop the epoch group.
515 xbt_die("Complete called on already opened MPI_Win");
517 XBT_DEBUG("Entering MPI_Win_Complete");
520 int size = group_->size();
521 MPI_Request* reqs = xbt_new0(MPI_Request, size);
524 int dst = comm_->group()->rank(group_->actor(j));
525 if (dst != rank_ && dst != MPI_UNDEFINED) {
526 reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
532 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
533 Request::startall(size, reqs);
534 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
537 Request::unref(&reqs[i]);
// Local completion: finish every request queued on this window.
541 int finished = finish_comms();
542 XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
544 Group::unref(group_);
545 opened_--; //we're closed for business !
// Body of Win::wait (MPI_Win_wait; the signature is elided from this excerpt).
// Closes the exposure epoch opened by post(): block until every accessor has
// sent its "complete" token (tag SMPI_RMA_TAG + 5), then drain pending
// requests and drop the epoch group.
550 //naive, blocking implementation.
551 XBT_DEBUG("Entering MPI_Win_Wait");
554 int size = group_->size();
555 MPI_Request* reqs = xbt_new0(MPI_Request, size);
558 int src = comm_->group()->rank(group_->actor(j));
559 if (src != rank_ && src != MPI_UNDEFINED) {
560 reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
566 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
567 Request::startall(size, reqs);
568 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
570 Request::unref(&reqs[i]);
573 int finished = finish_comms();
574 XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
576 Group::unref(group_);
577 opened_--; //we're opened for business !
// MPI_Win_lock: acquire a passive-target lock on rank's window.
// mode_ tracks the aggregate lock state on the target window; lock_mut_ is the
// actual simulated mutex held while the window is exclusively locked.
581 int Win::lock(int lock_type, int rank, int assert){
582 MPI_Win target_win = connected_wins_[rank];
// Block if we want EXCLUSIVE (unless the window is merely shared),
// or if the window is currently held exclusively by someone else.
584 if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
585 xbt_mutex_acquire(target_win->lock_mut_);
586 target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
587 if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
588 xbt_mutex_release(target_win->lock_mut_);
590 } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
591 target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
// Record ourselves as a lock holder; put/get/accumulate check this list
// when no epoch is open.
593 target_win->lockers_.push_back(comm_->rank());
// Per MPI passive-target semantics, flush both directions on lock acquisition.
595 int finished = finish_comms(rank);
596 XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
597 finished = target_win->finish_comms(rank_);
598 XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
// MPI_Win_lock_all: take a SHARED lock on every rank's window in turn,
// propagating the first failure (error aggregation elided in this excerpt).
602 int Win::lock_all(int assert){
604 int retval = MPI_SUCCESS;
605 for (i=0; i<comm_->size();i++){
606 int ret = this->lock(MPI_LOCK_SHARED, i, assert);
607 if(ret != MPI_SUCCESS)
// MPI_Win_unlock: release our lock on rank's window and flush outstanding
// RMA operations in both directions (ours targeting rank, and rank's
// targeting us).
613 int Win::unlock(int rank){
614 MPI_Win target_win = connected_wins_[rank];
615 int target_mode = target_win->mode_;
616 target_win->mode_= 0;
617 target_win->lockers_.remove(comm_->rank());
// Only a pure EXCLUSIVE lock still holds lock_mut_ (a shared lock released it
// already in Win::lock), so only then do we release the mutex here.
618 if (target_mode==MPI_LOCK_EXCLUSIVE){
619 xbt_mutex_release(target_win->lock_mut_);
622 int finished = finish_comms(rank);
623 XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
624 finished = target_win->finish_comms(rank_);
625 XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
// MPI_Win_unlock_all: release the per-rank locks taken by lock_all
// (error aggregation elided in this excerpt).
629 int Win::unlock_all(){
631 int retval = MPI_SUCCESS;
632 for (i=0; i<comm_->size();i++){
633 int ret = this->unlock(i);
634 if (ret != MPI_SUCCESS)
// MPI_Win_flush: complete all outstanding RMA operations between us and
// `rank` — both the requests we queued locally and those queued on the
// target's window on our behalf.
640 int Win::flush(int rank){
641 MPI_Win target_win = connected_wins_[rank];
642 int finished = finish_comms(rank_);
643 XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
644 finished = target_win->finish_comms(rank);
645 XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
// MPI_Win_flush_local: complete only the locally-queued requests involving `rank`.
649 int Win::flush_local(int rank){
650 int finished = finish_comms(rank);
651 XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
// MPI_Win_flush_all: flush our own queue, then the requests that every other
// rank's window holds on our behalf.
655 int Win::flush_all(){
656 int finished = finish_comms();
657 XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
658 for (int i = 0; i < comm_->size(); i++) {
659 finished = connected_wins_[i]->finish_comms(rank_);
660 XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
// MPI_Win_flush_local_all: complete everything queued on this window locally.
665 int Win::flush_local_all(){
666 int finished = finish_comms();
667 XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
// Fortran-to-C handle conversion: map an integer handle back to its Win*.
671 Win* Win::f2c(int id){
672 return static_cast<Win*>(F2C::f2c(id));
// Wait for ALL requests queued on this window and return how many were
// finished. Holds mut_ for the whole operation so no peer can grow the
// vector (and invalidate the waitall array) meanwhile.
675 int Win::finish_comms(){
676 xbt_mutex_acquire(mut_);
677 //Finish own requests
678 std::vector<MPI_Request> *reqqs = requests_;
679 int size = static_cast<int>(reqqs->size());
681 MPI_Request* treqs = &(*reqqs)[0];
682 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
685 xbt_mutex_release(mut_);
// Wait only for the queued requests whose source or destination is `rank`
// (used by flush/lock/unlock to synchronize with a single peer). Matching
// requests are moved out of the shared vector into a private list before the
// waitall; the rest stay queued. Returns the number of requests finished.
689 int Win::finish_comms(int rank){
690 xbt_mutex_acquire(mut_);
691 //Finish own requests
692 std::vector<MPI_Request> *reqqs = requests_;
693 int size = static_cast<int>(reqqs->size());
696 std::vector<MPI_Request> myreqqs;
697 std::vector<MPI_Request>::iterator iter = reqqs->begin();
698 int proc_id = comm_->group()->actor(rank)->get_pid();
699 while (iter != reqqs->end()){
700 // Let's see if we're either the destination or the sender of this request
701 // because we only wait for requests that we are responsible for.
702 // Also use the process id here since the request itself returns from src()
703 // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
704 if (((*iter) != MPI_REQUEST_NULL) && (((*iter)->src() == proc_id) || ((*iter)->dst() == proc_id))) {
705 myreqqs.push_back(*iter);
// erase() returns the next valid iterator, so no increment on this path.
706 iter = reqqs->erase(iter);
713 MPI_Request* treqs = &myreqqs[0];
714 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
718 xbt_mutex_release(mut_);
// MPI_Win_shared_query: report size, displacement unit, and base address of
// the window segment belonging to `rank`. With MPI_PROC_NULL, fall back to
// the lowest rank that exposed a non-empty segment, per the MPI standard.
722 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr)
724 MPI_Win target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
725 for (int i = 0; not target_win && i < comm_->size(); i++) {
726 if (connected_wins_[i]->size_ > 0)
727 target_win = connected_wins_[i];
730 *size = target_win->size_;
731 *disp_unit = target_win->disp_unit_;
732 *static_cast<void**>(baseptr) = target_win->base_;
// Fallback path (no segment found): return a unique zero-size allocation —
// the surrounding branch is elided from this excerpt.
735 *static_cast<void**>(baseptr) = xbt_malloc(0);