1 /* Copyright (c) 2007-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_win.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_datatype.hpp"
11 #include "smpi_info.hpp"
12 #include "smpi_keyvals.hpp"
13 #include "smpi_process.hpp"
14 #include "smpi_request.hpp"
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
18 using simgrid::s4u::Actor;
// Static storage shared by all windows for MPI attribute keyvals
// (MPI_Win_create_keyval and friends); keyval_id_ hands out fresh keyval ids.
22 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
23 int Win::keyval_id_=0;
/* Collective constructor: each rank of `comm` builds its Win object, then the
 * peers exchange their Win pointers so one-sided operations can address the
 * target window directly. `base`/`size`/`disp_unit` describe the memory this
 * rank exposes; `allocated`/`dynamic` record how that memory was obtained
 * (MPI_Win_allocate / MPI_Win_create_dynamic), for use at destruction time. */
25 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
26 int comm_size = comm->size();
28 XBT_DEBUG("Creating window");
// NOTE(review): the then-branch for a non-null info is not visible in this
// chunk -- presumably it takes a reference on the info object; confirm.
29 if(info!=MPI_INFO_NULL)
33 group_ = MPI_GROUP_NULL;
// Pending RMA requests targeting/originating at this window; protected by mut_
// (see fence() and finish_comms()).
34 requests_ = new std::vector<MPI_Request>();
35 mut_=xbt_mutex_init();
36 lock_mut_=xbt_mutex_init();
37 atomic_mut_=xbt_mutex_init();
// One slot per rank; filled with every peer's Win* by the allgather below.
38 connected_wins_ = new MPI_Win[comm_size];
39 connected_wins_[rank_] = this;
// Barrier used to delimit fence epochs and to synchronize destruction.
42 bar_ = MSG_barrier_init(comm_size);
46 comm->add_rma_win(this);
// Publish this rank's Win pointer to all peers (and collect theirs).
48 Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
// All ranks agree on rank 0's barrier object.
51 Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
/* Destructor (its signature line is not visible in this chunk): collective
 * teardown. Drains pending RMA requests, frees per-window resources, and
 * synchronizes with all peers so nobody still references this window. */
57 //As per the standard, perform a barrier to ensure every async comm is finished
58 MSG_barrier_wait(bar_);
60 int finished = finish_comms();
61 XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
64 delete[] connected_wins_;
65 if (name_ != nullptr){
68 if(info_!=MPI_INFO_NULL){
69 MPI_Info_free(&info_);
// Deregister from the communicator before the final collective barrier.
72 comm_->remove_rma_win(this);
// Last collective point: after this barrier no peer can touch bar_/mutexes.
74 Colls::barrier(comm_);
76 MSG_barrier_destroy(bar_);
77 xbt_mutex_destroy(mut_);
78 xbt_mutex_destroy(lock_mut_);
79 xbt_mutex_destroy(atomic_mut_);
/* MPI_Win_attach (dynamic windows): only legal while no base is attached;
 * the effective address is supplied later as the displacement of RMA calls. */
87 int Win::attach (void *base, MPI_Aint size){
88 if (not(base_ == MPI_BOTTOM || base_ == 0))
90 base_=0;//actually the address will be given in the RMA calls, as being the disp.
/* MPI_Win_detach (dynamic windows). Body not visible in this chunk. */
95 int Win::detach (void *base){
/* MPI_Win_get_name: copies the stored name (plus its NUL terminator, hence
 * the `*length+1`) into the caller's buffer and reports its length. */
101 void Win::get_name(char* name, int* length){
107 *length = strlen(name_);
108 strncpy(name, name_, *length+1);
/* MPI_Win_get_group: the window's group is that of its communicator, or
 * MPI_GROUP_NULL when the communicator is gone. */
111 void Win::get_group(MPI_Group* group){
112 if(comm_ != MPI_COMM_NULL){
113 *group = comm_->group();
115 *group = MPI_GROUP_NULL;
/* MPI_Win_get_info accessor. NOTE(review): the branch creating a fresh info
 * object when none exists yet is elided in this chunk -- confirm. */
119 MPI_Info Win::info(){
120 if(info_== MPI_INFO_NULL)
/* Accessor: size in bytes of the locally exposed memory. */
130 MPI_Aint Win::size(){
/* Accessor: local displacement unit (bytes per unit of target_disp). */
138 int Win::disp_unit(){
/* MPI_Win_set_info. NOTE(review): only the non-null check is visible here --
 * presumably the previous info object is released before storing the new one. */
146 void Win::set_info(MPI_Info info){
147 if(info_!= MPI_INFO_NULL)
/* MPI_Win_set_name: stores a private copy of the name.
 * NOTE(review): an earlier name_ is not freed on this visible line -- check
 * whether repeated calls leak the previous strdup'ed buffer. */
152 void Win::set_name(char* name){
153 name_ = xbt_strdup(name);
/* MPI_Win_fence: collective epoch delimiter. Unless MPI_MODE_NOPRECEDE was
 * asserted, first synchronize all peers and drain every request that was
 * queued on this window during the closing epoch; then synchronize again to
 * open the next epoch. */
156 int Win::fence(int assert)
158 XBT_DEBUG("Entering fence");
161 if (assert != MPI_MODE_NOPRECEDE) {
162 // This is not the first fence => finalize what came before
163 MSG_barrier_wait(bar_);
164 xbt_mutex_acquire(mut_);
165 // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
166 // Without this, the vector could get redimensionned when another process pushes.
167 // This would result in the array used by Request::waitall() to be invalidated.
168 // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
169 std::vector<MPI_Request> *reqs = requests_;
170 int size = static_cast<int>(reqs->size());
171 // start all requests that have been prepared by another process
173 MPI_Request* treqs = &(*reqs)[0];
174 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
177 xbt_mutex_release(mut_);
180 if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
// Second barrier: no peer may start the next epoch before everyone is done.
184 MSG_barrier_wait(bar_);
185 XBT_DEBUG("Leaving fence");
/* MPI_Put / MPI_Rput: writes origin data into the target window.
 * Remote case: pairs a send request (origin side) with a matching receive
 * request pushed onto the target's window; both complete at the next
 * synchronization. Local case: a direct datatype copy.
 * `request` is non-null for the request-based (Rput) variant. */
190 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
191 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
193 //get receiver pointer
194 MPI_Win recv_win = connected_wins_[target_rank];
196 if(opened_==0){//check that post/start has been done
197 // no fence or start .. lock ok ?
// Outside an active epoch the access is only legal if we hold a lock on the target.
199 for (auto const& it : recv_win->lockers_)
200 if (it == comm_->rank())
// Bounds check: the access must fit inside the target's exposed memory.
206 if(target_count*target_datatype->get_extent()>recv_win->size_)
209 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
210 XBT_DEBUG("Entering MPI_Put to %d", target_rank);
212 if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
213 //prepare send_request
215 // TODO cheinrich Check for rank / pid conversion
216 Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(),
217 target_rank, SMPI_RMA_TAG + 1, comm_, MPI_OP_NULL);
219 //prepare receiver request
220 // TODO cheinrich Check for rank / pid conversion
221 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
222 target_rank, SMPI_RMA_TAG + 1,
223 recv_win->comm_, MPI_OP_NULL);
// Request-based variant: hand the send request back to the caller...
228 if(request!=nullptr){
// ...otherwise queue it on our own window for completion at the next sync.
231 xbt_mutex_acquire(mut_);
232 requests_->push_back(sreq);
233 xbt_mutex_release(mut_);
236 //push request to receiver's win
237 xbt_mutex_acquire(recv_win->mut_);
238 recv_win->requests_->push_back(rreq);
240 xbt_mutex_release(recv_win->mut_);
// Local put: no messages needed, copy directly between datatypes.
243 Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
245 *request = MPI_REQUEST_NULL;
/* MPI_Get / MPI_Rget: reads from the target window into origin memory.
 * Mirror image of put(): the *target* side gets a send request pushed onto
 * its window, while we post the matching receive locally.
 * `request` is non-null for the request-based (Rget) variant. */
251 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
252 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
255 MPI_Win send_win = connected_wins_[target_rank];
257 if(opened_==0){//check that post/start has been done
258 // no fence or start .. lock ok ?
// Outside an active epoch the access is only legal if we hold a lock on the target.
260 for (auto const& it : send_win->lockers_)
261 if (it == comm_->rank())
// Bounds check against the target's exposed memory.
267 if(target_count*target_datatype->get_extent()>send_win->size_)
270 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
271 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
273 if(target_rank != comm_->rank()){
274 //prepare send_request
275 MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
276 target_rank, send_win->comm_->rank(),
277 SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
279 //prepare receiver request
280 MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
281 target_rank, comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
282 SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
284 //start the send, with another process than us as sender.
286 //push request to receiver's win
287 xbt_mutex_acquire(send_win->mut_);
288 send_win->requests_->push_back(sreq);
289 xbt_mutex_release(send_win->mut_);
// Request-based variant: hand the receive request back to the caller...
294 if(request!=nullptr){
// ...otherwise queue it on our own window for completion at the next sync.
297 xbt_mutex_acquire(mut_);
298 requests_->push_back(rreq);
299 xbt_mutex_release(mut_);
// Local get: direct datatype copy, no messaging.
303 Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
305 *request=MPI_REQUEST_NULL;
/* MPI_Accumulate / MPI_Raccumulate: like put(), but the receive request
 * carries the reduction operation `op`, applied at the target. A per-window
 * operation counter (count_) is folded into the tag so successive accumulates
 * to the same target stay ordered and do not match each other. */
312 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
313 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
315 XBT_DEBUG("Entering MPI_Win_Accumulate");
316 //get receiver pointer
317 MPI_Win recv_win = connected_wins_[target_rank];
319 if(opened_==0){//check that post/start has been done
320 // no fence or start .. lock ok ?
// Outside an active epoch the access is only legal if we hold a lock on the target.
322 for (auto const& it : recv_win->lockers_)
323 if (it == comm_->rank())
328 //FIXME: local version
// Bounds check against the target's exposed memory.
330 if(target_count*target_datatype->get_extent()>recv_win->size_)
333 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
334 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
335 //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
336 //prepare send_request
339 Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(),
340 target_rank, SMPI_RMA_TAG - 3 - count_, comm_, op);
342 // prepare receiver request
343 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
344 target_rank, SMPI_RMA_TAG - 3 - count_,
345 recv_win->comm_, op);
351 // push request to receiver's win
352 xbt_mutex_acquire(recv_win->mut_);
353 recv_win->requests_->push_back(rreq);
355 xbt_mutex_release(recv_win->mut_);
// Request-based variant (Raccumulate) hands sreq to the caller instead.
357 if (request != nullptr) {
360 xbt_mutex_acquire(mut_);
361 requests_->push_back(sreq);
362 xbt_mutex_release(mut_);
365 XBT_DEBUG("Leaving MPI_Win_Accumulate");
/* MPI_Get_accumulate: fetch-and-op built from get() then accumulate(), made
 * atomic with respect to other atomic RMA ops on the same target by holding
 * the target window's atomic_mut_ across both steps (slow but ordered). */
369 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr,
370 int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count,
371 MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request){
374 MPI_Win send_win = connected_wins_[target_rank];
376 if(opened_==0){//check that post/start has been done
377 // no fence or start .. lock ok ?
// Outside an active epoch the access is only legal if we hold a lock on the target.
379 for (auto const& it : send_win->lockers_)
380 if (it == comm_->rank())
386 if(target_count*target_datatype->get_extent()>send_win->size_)
389 XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
390 //need to be sure ops are correctly ordered, so finish request here ? slow.
392 xbt_mutex_acquire(send_win->atomic_mut_);
// Step 1: fetch the current target value into result_addr, and wait for it.
393 get(result_addr, result_count, result_datatype, target_rank,
394 target_disp, target_count, target_datatype, &req);
395 if (req != MPI_REQUEST_NULL)
396 Request::wait(&req, MPI_STATUS_IGNORE);
// Step 2: apply `op` with the origin data, and wait for completion too.
398 accumulate(origin_addr, origin_count, origin_datatype, target_rank,
399 target_disp, target_count, target_datatype, op, &req);
400 if (req != MPI_REQUEST_NULL)
401 Request::wait(&req, MPI_STATUS_IGNORE);
402 xbt_mutex_release(send_win->atomic_mut_);
/* MPI_Compare_and_swap: atomically (w.r.t. the target's atomic_mut_) fetches
 * the target element into result_addr, and overwrites it with origin_addr
 * only when it byte-compares equal to compare_addr. */
407 int Win::compare_and_swap(void *origin_addr, void *compare_addr,
408 void *result_addr, MPI_Datatype datatype, int target_rank,
409 MPI_Aint target_disp){
411 MPI_Win send_win = connected_wins_[target_rank];
413 if(opened_==0){//check that post/start has been done
414 // no fence or start .. lock ok ?
// Outside an active epoch the access is only legal if we hold a lock on the target.
416 for (auto const& it : send_win->lockers_)
417 if (it == comm_->rank())
423 XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
424 MPI_Request req = MPI_REQUEST_NULL;
425 xbt_mutex_acquire(send_win->atomic_mut_);
// Fetch the current value and wait so the comparison below sees fresh data.
426 get(result_addr, 1, datatype, target_rank,
427 target_disp, 1, datatype, &req);
428 if (req != MPI_REQUEST_NULL)
429 Request::wait(&req, MPI_STATUS_IGNORE);
// Swap only on an exact byte match over the datatype's extent.
430 if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
431 put(origin_addr, 1, datatype, target_rank,
432 target_disp, 1, datatype);
434 xbt_mutex_release(send_win->atomic_mut_);
/* MPI_Win_start: opens an access epoch towards `group`. Naive blocking
 * implementation: wait for a "post" token (tag SMPI_RMA_TAG+4) from every
 * member of the group before returning. */
438 int Win::start(MPI_Group group, int assert){
439 /* From MPI forum advices
440 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
441 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
442 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
443 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
444 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
445 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
446 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
447 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
448 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
449 must complete, without further dependencies. */
451 //naive, blocking implementation.
454 int size = group->size();
455 MPI_Request* reqs = xbt_new0(MPI_Request, size);
457 XBT_DEBUG("Entering MPI_Win_Start");
// Translate each group member to its rank in our communicator.
459 int src = comm_->group()->rank(group->actor(j));
460 if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
461 reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
// Block until every peer in the group has posted its exposure epoch.
467 Request::startall(size, reqs);
468 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
470 Request::unref(&reqs[i]);
473 opened_++; //we're open for business !
476 XBT_DEBUG("Leaving MPI_Win_Start");
/* MPI_Win_post: opens an exposure epoch for `group`. Sends a "ready" token
 * (tag SMPI_RMA_TAG+4) to every member, matching the irecvs in start(). */
480 int Win::post(MPI_Group group, int assert){
481 //let's make a synchronous send here
484 int size = group->size();
485 MPI_Request* reqs = xbt_new0(MPI_Request, size);
487 XBT_DEBUG("Entering MPI_Win_Post");
// Translate each group member to its rank in our communicator.
489 int dst = comm_->group()->rank(group->actor(j));
490 if (dst != rank_ && dst != MPI_UNDEFINED) {
491 reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, comm_);
498 Request::startall(size, reqs);
499 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
501 Request::unref(&reqs[i]);
504 opened_++; //we're open for business !
507 XBT_DEBUG("Leaving MPI_Win_Post");
/* MPI_Win_complete (signature line not visible in this chunk): closes the
 * access epoch opened by start(). Notifies every member of group_ with a
 * completion token (tag SMPI_RMA_TAG+5, matched by wait()), then drains the
 * requests pending on this window. */
513 xbt_die("Complete called on already opened MPI_Win");
515 XBT_DEBUG("Entering MPI_Win_Complete");
518 int size = group_->size();
519 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// Translate each group member to its rank in our communicator.
522 int dst = comm_->group()->rank(group_->actor(j));
523 if (dst != rank_ && dst != MPI_UNDEFINED) {
524 reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, comm_);
530 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
531 Request::startall(size, reqs);
532 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
535 Request::unref(&reqs[i]);
// Finish all RMA requests queued on this window during the epoch.
539 int finished = finish_comms();
540 XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
542 Group::unref(group_);
543 opened_--; //we're closed for business !
/* MPI_Win_wait (signature line not visible in this chunk): closes the
 * exposure epoch opened by post(). Blocks until a completion token
 * (tag SMPI_RMA_TAG+5, sent by complete()) arrives from every member of
 * group_, then drains the requests pending on this window. */
548 //naive, blocking implementation.
549 XBT_DEBUG("Entering MPI_Win_Wait");
552 int size = group_->size();
553 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// Translate each group member to its rank in our communicator.
556 int src = comm_->group()->rank(group_->actor(j));
557 if (src != rank_ && src != MPI_UNDEFINED) {
558 reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, comm_);
564 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
565 Request::startall(size, reqs);
566 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
568 Request::unref(&reqs[i]);
// Finish all RMA requests queued on this window during the epoch.
571 int finished = finish_comms();
572 XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
574 Group::unref(group_);
575 opened_--; //we're opened for business !
/* MPI_Win_lock: acquires a passive-target lock on `rank`'s window. An
 * exclusive request (or any request while the target is exclusively held)
 * grabs the target's lock_mut_; mode_ accumulates the lock types so unlock()
 * can tell whether a mutex release is due. After registering in the target's
 * lockers_ list, pending communications with that peer are drained on both
 * sides. NOTE(review): mode_ appears to be incremented on both branches
 * (visible lines 584 and 589) -- double-check the intended semantics. */
579 int Win::lock(int lock_type, int rank, int assert){
580 MPI_Win target_win = connected_wins_[rank];
582 if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
583 xbt_mutex_acquire(target_win->lock_mut_);
584 target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
585 if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
586 xbt_mutex_release(target_win->lock_mut_);
588 } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
589 target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
// Record ourselves as a locker so put/get/accumulate pass the epoch check.
591 target_win->lockers_.push_back(comm_->rank());
593 int finished = finish_comms(rank);
594 XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
595 finished = target_win->finish_comms(rank_);
596 XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
/* MPI_Win_lock_all: shared-locks every rank's window in turn; reports the
 * first failing lock's error code (if any). */
600 int Win::lock_all(int assert){
602 int retval = MPI_SUCCESS;
603 for (i=0; i<comm_->size();i++){
604 int ret = this->lock(MPI_LOCK_SHARED, i, assert);
605 if(ret != MPI_SUCCESS)
/* MPI_Win_unlock: releases our passive-target lock on `rank`'s window.
 * Resets mode_, removes us from the lockers_ list, releases the target's
 * lock_mut_ only when the lock was exclusive, and drains pending
 * communications with that peer on both sides. */
611 int Win::unlock(int rank){
612 MPI_Win target_win = connected_wins_[rank];
613 int target_mode = target_win->mode_;
614 target_win->mode_= 0;
615 target_win->lockers_.remove(comm_->rank());
616 if (target_mode==MPI_LOCK_EXCLUSIVE){
617 xbt_mutex_release(target_win->lock_mut_);
620 int finished = finish_comms(rank);
621 XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
622 finished = target_win->finish_comms(rank_);
623 XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
/* MPI_Win_unlock_all: unlocks every rank's window in turn; reports the first
 * failing unlock's error code (if any). */
627 int Win::unlock_all(){
629 int retval = MPI_SUCCESS;
630 for (i=0; i<comm_->size();i++){
631 int ret = this->unlock(i);
632 if(ret != MPI_SUCCESS)
/* MPI_Win_flush: completes all outstanding RMA operations between us and
 * `rank`, on both the local and the remote window. */
638 int Win::flush(int rank){
639 MPI_Win target_win = connected_wins_[rank];
640 int finished = finish_comms(rank);
641 XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
642 finished = target_win->finish_comms(rank_);
643 XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
/* MPI_Win_flush_local: completes only the locally queued operations that
 * involve `rank`; the remote side is not drained. */
647 int Win::flush_local(int rank){
648 int finished = finish_comms(rank);
649 XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
/* MPI_Win_flush_all: drains our own window, then every peer's window of the
 * operations we initiated towards it. */
653 int Win::flush_all(){
656 finished = finish_comms();
657 XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
658 for (i=0; i<comm_->size();i++){
659 finished = connected_wins_[i]->finish_comms(rank_);
660 XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
/* MPI_Win_flush_local_all: locally completes every operation queued on this
 * window, regardless of the peer. */
665 int Win::flush_local_all(){
666 int finished = finish_comms();
667 XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
/* Converts a Fortran window handle (integer id) back to its C++ Win object,
 * via the generic F2C translation table. */
671 Win* Win::f2c(int id){
672 return static_cast<Win*>(F2C::f2c(id));
/* Waits for every request currently queued on this window and returns how
 * many were completed. mut_ is held across the waitall so no peer can grow
 * (and thus reallocate) the vector underneath Request::waitall. */
676 int Win::finish_comms(){
677 xbt_mutex_acquire(mut_);
678 //Finish own requests
679 std::vector<MPI_Request> *reqqs = requests_;
680 int size = static_cast<int>(reqqs->size());
682 MPI_Request* treqs = &(*reqqs)[0];
683 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
686 xbt_mutex_release(mut_);
/* Rank-filtered variant: extracts from the window's queue only the requests
 * whose source or destination is `rank`, waits on that subset, and returns
 * how many were completed. mut_ is held so the queue cannot be mutated (and
 * reallocated) by a peer while we erase and wait. */
690 int Win::finish_comms(int rank){
691 xbt_mutex_acquire(mut_);
692 //Finish own requests
693 std::vector<MPI_Request> *reqqs = requests_;
694 int size = static_cast<int>(reqqs->size());
// Move the matching requests into a private vector, erasing them from the
// shared queue as we go (erase returns the next valid iterator).
697 std::vector<MPI_Request> myreqqs;
698 std::vector<MPI_Request>::iterator iter = reqqs->begin();
699 while (iter != reqqs->end()){
700 if(((*iter)!=MPI_REQUEST_NULL) && (((*iter)->src() == rank) || ((*iter)->dst() == rank))){
701 myreqqs.push_back(*iter);
702 iter = reqqs->erase(iter);
709 MPI_Request* treqs = &myreqqs[0];
710 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
714 xbt_mutex_release(mut_);