/* Copyright (c) 2007-2017. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */
#include "smpi_win.hpp"

#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_info.hpp"
#include "smpi_keyvals.hpp"
#include "smpi_process.hpp"
#include "smpi_request.hpp"

XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
namespace simgrid{
namespace smpi{
std::unordered_map<int, smpi_key_elem> Win::keyvals_;
int Win::keyval_id_=0;
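
/* Collective window creation, backing MPI_Win_create: every process in comm builds its
 * local Win object, then the addresses of all peers' Win objects are exchanged
 * (allgather below) so that any process can directly reach the data structures of its
 * targets. */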
Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
    : base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic)
{
  int comm_size = comm->size();
  rank_         = comm->rank();
  XBT_DEBUG("Creating window");
  if(info!=MPI_INFO_NULL)
    info->ref();
  name_   = nullptr;
  opened_ = 0;
  group_  = MPI_GROUP_NULL;
  requests_ = new std::vector<MPI_Request>();
  mut_=xbt_mutex_init();
  lock_mut_=xbt_mutex_init();
  atomic_mut_=xbt_mutex_init();
  connected_wins_ = new MPI_Win[comm_size];
  connected_wins_[rank_] = this;
  count_ = 0;
  if(rank_==0){
    bar_ = MSG_barrier_init(comm_size);
  }
  mode_=0;

  comm->add_rma_win(this);

  Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
                   MPI_BYTE, comm);

  Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);

  Colls::barrier(comm);
}
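
/* A minimal user-level sketch of what this class implements (illustrative only):
 *   MPI_Win win;
 *   MPI_Win_create(buf, size, 1, MPI_INFO_NULL, MPI_COMM_WORLD, &win); // -> Win::Win
 *   MPI_Win_fence(0, win);                                             // -> Win::fence
 *   MPI_Put(src, n, MPI_INT, peer, 0, n, MPI_INT, win);                // -> Win::put
 *   MPI_Win_fence(0, win);
 *   MPI_Win_free(&win);                                                // -> Win::~Win
 */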
Win::~Win(){
  //As per the standard, perform a barrier to ensure every async comm is finished
  MSG_barrier_wait(bar_);

  int finished = finish_comms();
  XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);

  delete requests_;
  delete[] connected_wins_;
  if (name_ != nullptr){
    xbt_free(name_);
  }
  if(info_!=MPI_INFO_NULL){
    MPI_Info_free(&info_);
  }

  comm_->remove_rma_win(this);

  Colls::barrier(comm_);
  if (rank_ == 0) // the barrier was created on rank 0 and broadcast; destroy it only once
    MSG_barrier_destroy(bar_);
  xbt_mutex_destroy(mut_);
  xbt_mutex_destroy(lock_mut_);
  xbt_mutex_destroy(atomic_mut_);

  if (allocated_ != 0)
    xbt_free(base_);

  cleanup_attr<Win>();
}
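
/* Dynamic-window support (MPI_Win_attach / MPI_Win_detach). Presumably only meaningful
 * for windows created with MPI_Win_create_dynamic: the base stays unset and the target
 * displacement passed to RMA calls is treated as the actual address. */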
int Win::attach(void* base, MPI_Aint size){
  if (not(base_ == MPI_BOTTOM || base_ == 0))
    return MPI_ERR_ARG;
  base_ = 0; // actually the address will be given in the RMA calls, as being the disp.
  size_ += size;
  return MPI_SUCCESS;
}
int Win::detach(void* base){
  base_ = MPI_BOTTOM;
  size_ = -1;
  return MPI_SUCCESS;
}
void Win::get_name(char* name, int* length){
  if(name_==nullptr){
    *length = 0;
    name    = nullptr;
    return;
  }
  *length = strlen(name_);
  strncpy(name, name_, *length+1);
}
void Win::get_group(MPI_Group* group){
  if(comm_ != MPI_COMM_NULL){
    *group = comm_->group();
  } else {
    *group = MPI_GROUP_NULL;
  }
}
MPI_Info Win::info(){
  if(info_== MPI_INFO_NULL)
    info_ = new Info();
  info_->ref();
  return info_;
}

int Win::rank(){
  return rank_;
}

MPI_Aint Win::size(){
  return size_;
}

void* Win::base(){
  return base_;
}

int Win::disp_unit(){
  return disp_unit_;
}

int Win::dynamic(){
  return dynamic_;
}
void Win::set_info(MPI_Info info){
  if(info_!= MPI_INFO_NULL)
    info->ref();
  info_ = info;
}

void Win::set_name(char* name){
  name_ = xbt_strdup(name);
}
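
/* Active-target synchronization with MPI_Win_fence: a (simulated) barrier separates the
 * epochs, and all requests queued on this window since the previous fence are completed
 * before the barrier is crossed. */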
int Win::fence(int assert)
{
  XBT_DEBUG("Entering fence");
  if (opened_ == 0)
    opened_ = 1;
  if (assert != MPI_MODE_NOPRECEDE) {
    // This is not the first fence => finalize what came before
    MSG_barrier_wait(bar_);
    xbt_mutex_acquire(mut_);
    // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
    // Without it, the vector could get resized when another process pushes, which would invalidate
    // the array used by Request::waitall().
    // Another solution would be to copy the data and clean the vector *before* Request::waitall
    std::vector<MPI_Request>* reqs = requests_;
    int size = static_cast<int>(reqs->size());
    // start all requests that have been prepared by another process
    if (size > 0) {
      MPI_Request* treqs = &(*reqs)[0];
      Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    }
    count_ = 0;
    xbt_mutex_release(mut_);
  }

  if (assert == MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
    opened_ = 0;
  assert_ = assert;

  MSG_barrier_wait(bar_);
  XBT_DEBUG("Leaving fence");

  return MPI_SUCCESS;
}
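
/* One-sided put, backing MPI_Put. The origin prepares both the send request and the
 * matching receive request, pushes the receive into the target window's queue, and
 * starts both; completion is deferred to the closing synchronization call. */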
int Win::put(void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get receiver pointer
  MPI_Win recv_win = connected_wins_[target_rank];

  if(opened_==0){//check that post/start has been done
    // no fence or start .. lock ok ?
    int locked = 0;
    for (auto const& it : recv_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if(locked != 1)
      return MPI_ERR_WIN;
  }

  if(target_count*target_datatype->get_extent()>recv_win->size_)
    return MPI_ERR_ARG;

  void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Put to %d", target_rank);

  if (target_rank != comm_->rank()) {
    //prepare send_request
    MPI_Request sreq =
        Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(),
                               comm_->group()->actor(target_rank)->getPid() - 1, SMPI_RMA_TAG + 1, comm_, MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, comm_->rank(),
                                              comm_->group()->actor(target_rank)->getPid() - 1, SMPI_RMA_TAG + 1,
                                              recv_win->comm_, MPI_OP_NULL);

    //start send
    sreq->start();

    if(request!=nullptr){
      *request = sreq;
    }else{
      xbt_mutex_acquire(mut_);
      requests_->push_back(sreq);
      xbt_mutex_release(mut_);
    }

    //push request to receiver's win
    xbt_mutex_acquire(recv_win->mut_);
    recv_win->requests_->push_back(rreq);
    rreq->start();
    xbt_mutex_release(recv_win->mut_);

  }else{
    Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
    if(request!=nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}
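
/* One-sided get, backing MPI_Get: the mirror image of put(). Here the *target* side is
 * the sender, so the send request is queued on the target window and the receive is
 * kept (or returned) locally. */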
int Win::get(void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
             MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
{
  //get sender pointer
  MPI_Win send_win = connected_wins_[target_rank];

  if(opened_==0){//check that post/start has been done
    // no fence or start .. lock ok ?
    int locked = 0;
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if(locked != 1)
      return MPI_ERR_WIN;
  }

  if(target_count*target_datatype->get_extent()>send_win->size_)
    return MPI_ERR_ARG;

  void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Get from %d", target_rank);

  if(target_rank != comm_->rank()){
    //prepare send_request
    MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
                                              comm_->group()->actor(target_rank)->getPid() - 1, comm_->rank(),
                                              SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);

    //prepare receiver request
    MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
                                              comm_->group()->actor(target_rank)->getPid() - 1, comm_->rank(),
                                              SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);

    //start the send, with another process than us as sender.
    sreq->start();

    //push request to receiver's win
    xbt_mutex_acquire(send_win->mut_);
    send_win->requests_->push_back(sreq);
    xbt_mutex_release(send_win->mut_);

    //start recv
    rreq->start();

    if(request!=nullptr){
      *request = rreq;
    }else{
      xbt_mutex_acquire(mut_);
      requests_->push_back(rreq);
      xbt_mutex_release(mut_);
    }

  }else{
    Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
    if(request!=nullptr)
      *request = MPI_REQUEST_NULL;
  }

  return MPI_SUCCESS;
}
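
/* Accumulate, backing MPI_Accumulate. Unlike put, accumulates must be applied in order
 * at the target, so each one gets a distinct, decreasing tag derived from count_. */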
int Win::accumulate(void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
                    MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
{
  XBT_DEBUG("Entering MPI_Win_Accumulate");
  //get receiver pointer
  MPI_Win recv_win = connected_wins_[target_rank];

  if(opened_==0){//check that post/start has been done
    // no fence or start .. lock ok ?
    int locked = 0;
    for (auto const& it : recv_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if(locked != 1)
      return MPI_ERR_WIN;
  }
  //FIXME: local version

  if(target_count*target_datatype->get_extent()>recv_win->size_)
    return MPI_ERR_ARG;

  void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
  XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
  // As the tag will be used for ordering of the operations, subtract count_ from it
  // (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use).
  //prepare send_request
  MPI_Request sreq =
      Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(),
                             comm_->group()->actor(target_rank)->getPid() - 1, SMPI_RMA_TAG - 3 - count_, comm_, op);

  // prepare receiver request
  MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, comm_->rank(),
                                            comm_->group()->actor(target_rank)->getPid() - 1, SMPI_RMA_TAG - 3 - count_,
                                            recv_win->comm_, op);

  count_++;

  // start send
  sreq->start();

  // push request to receiver's win
  xbt_mutex_acquire(recv_win->mut_);
  recv_win->requests_->push_back(rreq);
  rreq->start();
  xbt_mutex_release(recv_win->mut_);

  if (request != nullptr) {
    *request = sreq;
  } else {
    xbt_mutex_acquire(mut_);
    requests_->push_back(sreq);
    xbt_mutex_release(mut_);
  }

  XBT_DEBUG("Leaving MPI_Win_Accumulate");
  return MPI_SUCCESS;
}
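
/* Get-accumulate, backing MPI_Get_accumulate. Implemented as a get followed by an
 * accumulate, both completed eagerly under the target's atomic mutex; correct but
 * slow, as the comment below notes. */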
int Win::get_accumulate(void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
{
  //get sender pointer
  MPI_Win send_win = connected_wins_[target_rank];

  if(opened_==0){//check that post/start has been done
    // no fence or start .. lock ok ?
    int locked = 0;
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if(locked != 1)
      return MPI_ERR_WIN;
  }

  if(target_count*target_datatype->get_extent()>send_win->size_)
    return MPI_ERR_ARG;

  XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
  //need to be sure ops are correctly ordered, so finish request here ? slow.
  MPI_Request req = MPI_REQUEST_NULL;
  xbt_mutex_acquire(send_win->atomic_mut_);
  get(result_addr, result_count, result_datatype, target_rank,
      target_disp, target_count, target_datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (op != MPI_NO_OP)
    accumulate(origin_addr, origin_count, origin_datatype, target_rank,
               target_disp, target_count, target_datatype, op, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  xbt_mutex_release(send_win->atomic_mut_);
  return MPI_SUCCESS;
}
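
/* Compare-and-swap, backing MPI_Compare_and_swap: fetch the target value, and if it
 * equals *compare_addr byte-wise, overwrite it with *origin_addr. Serialized through
 * the target's atomic mutex. */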
int Win::compare_and_swap(void* origin_addr, void* compare_addr,
                          void* result_addr, MPI_Datatype datatype, int target_rank,
                          MPI_Aint target_disp)
{
  //get sender pointer
  MPI_Win send_win = connected_wins_[target_rank];

  if(opened_==0){//check that post/start has been done
    // no fence or start .. lock ok ?
    int locked = 0;
    for (auto const& it : send_win->lockers_)
      if (it == comm_->rank())
        locked = 1;
    if(locked != 1)
      return MPI_ERR_WIN;
  }

  XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
  MPI_Request req = MPI_REQUEST_NULL;
  xbt_mutex_acquire(send_win->atomic_mut_);
  get(result_addr, 1, datatype, target_rank,
      target_disp, 1, datatype, &req);
  if (req != MPI_REQUEST_NULL)
    Request::wait(&req, MPI_STATUS_IGNORE);
  if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
    put(origin_addr, 1, datatype, target_rank,
        target_disp, 1, datatype);
  }
  xbt_mutex_release(send_win->atomic_mut_);
  return MPI_SUCCESS;
}
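
/* Post-Start-Complete-Wait (PSCW) active-target synchronization. start() opens an
 * access epoch towards the processes in `group`; the implementation below is the
 * naive blocking one: wait for a sync message from every target's post(). */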
int Win::start(MPI_Group group, int assert){
  /* From MPI forum advice:
  The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
  will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
  the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
  matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
  MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
  implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
  to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
  called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
  origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
  must complete, without further dependencies. */

  //naive, blocking implementation.
  int i             = 0;
  int j             = 0;
  int size          = group->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

  XBT_DEBUG("Entering MPI_Win_Start");
  while (j != size) {
    int src = group->actor(j)->getPid() - 1;
    if (src != comm_->rank() && src != MPI_UNDEFINED) {
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
      i++;
    }
    j++;
  }
  size = i;
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  xbt_free(reqs);
  opened_++; //we're open for business !
  group_ = group;
  group->ref();
  XBT_DEBUG("Leaving MPI_Win_Start");
  return MPI_SUCCESS;
}
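
/* post() is the target-side opening of a PSCW exposure epoch: notify every process in
 * `group` with a sync message so that their start() can return. */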
int Win::post(MPI_Group group, int assert){
  //let's make a synchronous send here
  int i             = 0;
  int j             = 0;
  int size          = group->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

  XBT_DEBUG("Entering MPI_Win_Post");
  while(j != size){
    int dst = group->actor(j)->getPid() - 1;
    if (dst != comm_->rank() && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, MPI_COMM_WORLD);
      i++;
    }
    j++;
  }
  size = i;

  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  xbt_free(reqs);
  opened_++; //we're open for business !
  group_ = group;
  group->ref();
  XBT_DEBUG("Leaving MPI_Win_Post");
  return MPI_SUCCESS;
}
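
/* complete() closes the access epoch opened by start(): flush every pending RMA call,
 * then tell each target that this origin is done. */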
int Win::complete(){
  if(opened_==0)
    xbt_die("Complete called on a non-opened MPI_Win");

  XBT_DEBUG("Entering MPI_Win_Complete");
  int i             = 0;
  int j             = 0;
  int size          = group_->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

  while(j != size){
    int dst = group_->actor(j)->getPid() - 1;
    if (dst != comm_->rank() && dst != MPI_UNDEFINED) {
      reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, MPI_COMM_WORLD);
      i++;
    }
    j++;
  }
  size = i;
  XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  xbt_free(reqs);

  int finished = finish_comms();
  XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);

  Group::unref(group_);
  opened_--; //we're closed for business !
  return MPI_SUCCESS;
}
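
/* wait() closes the exposure epoch opened by post(): block until every origin in the
 * group has sent its completion message, then finish the corresponding RMA calls. */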
int Win::wait(){
  //naive, blocking implementation.
  XBT_DEBUG("Entering MPI_Win_Wait");
  int i             = 0;
  int j             = 0;
  int size          = group_->size();
  MPI_Request* reqs = xbt_new0(MPI_Request, size);

  while(j != size){
    int src = group_->actor(j)->getPid() - 1;
    if (src != comm_->rank() && src != MPI_UNDEFINED) {
      reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, MPI_COMM_WORLD);
      i++;
    }
    j++;
  }
  size = i;
  XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
  Request::startall(size, reqs);
  Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
  for (i = 0; i < size; i++) {
    Request::unref(&reqs[i]);
  }
  xbt_free(reqs);
  int finished = finish_comms();
  XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);

  Group::unref(group_);
  opened_--; //we're closed for business !
  return MPI_SUCCESS;
}
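
/* Passive-target synchronization (MPI_Win_lock). mode_ accumulates the lock types so
 * that a SHARED lock taken while an EXCLUSIVE one is being released can be told apart
 * in unlock(); only EXCLUSIVE holders keep lock_mut_ acquired. */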
int Win::lock(int lock_type, int rank, int assert){
  MPI_Win target_win = connected_wins_[rank];

  if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED) || target_win->mode_ == MPI_LOCK_EXCLUSIVE){
    xbt_mutex_acquire(target_win->lock_mut_);
    target_win->mode_ += lock_type; // add the lock_type to differentiate the case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
    if(lock_type == MPI_LOCK_SHARED){// the window used to be exclusive, it's now shared.
      xbt_mutex_release(target_win->lock_mut_);
    }
  } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
    target_win->mode_ += lock_type; // don't set to exclusive if it's already shared

  target_win->lockers_.push_back(comm_->rank());

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
int Win::lock_all(int assert){
  int i      = 0;
  int retval = MPI_SUCCESS;
  for (i = 0; i < comm_->size(); i++){
    int ret = this->lock(MPI_LOCK_SHARED, i, assert);
    if(ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}
int Win::unlock(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int target_mode = target_win->mode_;
  target_win->mode_ = 0;
  target_win->lockers_.remove(comm_->rank());
  if (target_mode == MPI_LOCK_EXCLUSIVE){
    xbt_mutex_release(target_win->lock_mut_);
  }

  int finished = finish_comms(rank);
  XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
int Win::unlock_all(){
  int i      = 0;
  int retval = MPI_SUCCESS;
  for (i = 0; i < comm_->size(); i++){
    int ret = this->unlock(i);
    if(ret != MPI_SUCCESS)
      retval = ret;
  }
  return retval;
}
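
/* The flush family completes pending RMA calls without closing the epoch: locally
 * queued requests, remotely queued ones, or both, for one target or for all. */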
int Win::flush(int rank){
  MPI_Win target_win = connected_wins_[rank];
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
  finished = target_win->finish_comms(rank_);
  XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
int Win::flush_local(int rank){
  int finished = finish_comms(rank);
  XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
  return MPI_SUCCESS;
}
int Win::flush_all(){
  int i        = 0;
  int finished = 0;
  finished = finish_comms();
  XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
  for (i = 0; i < comm_->size(); i++){
    finished = connected_wins_[i]->finish_comms(rank_);
    XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
  }
  return MPI_SUCCESS;
}
int Win::flush_local_all(){
  int finished = finish_comms();
  XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
  return MPI_SUCCESS;
}
Win* Win::f2c(int id){
  return static_cast<Win*>(F2C::f2c(id));
}
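
/* finish_comms waits on the requests queued on this window: all of them, or (second
 * overload) only the ones whose source or destination is the given rank. Returns the
 * number of completed RMA calls. */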
int Win::finish_comms(){
  xbt_mutex_acquire(mut_);
  //Finish own requests
  std::vector<MPI_Request>* reqqs = requests_;
  int size = static_cast<int>(reqqs->size());
  if (size > 0) {
    MPI_Request* treqs = &(*reqqs)[0];
    Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
    reqqs->clear();
  }
  xbt_mutex_release(mut_);
  return size;
}
int Win::finish_comms(int rank){
  xbt_mutex_acquire(mut_);
  //Finish own requests
  std::vector<MPI_Request>* reqqs = requests_;
  int size = static_cast<int>(reqqs->size());
  if (size > 0) {
    size = 0;
    // Extract the requests that involve the given rank, then wait on those only.
    std::vector<MPI_Request> myreqqs;
    std::vector<MPI_Request>::iterator iter = reqqs->begin();
    while (iter != reqqs->end()){
      if(((*iter) != MPI_REQUEST_NULL) && (((*iter)->src() == rank) || ((*iter)->dst() == rank))){
        myreqqs.push_back(*iter);
        iter = reqqs->erase(iter);
        size++;
      } else {
        ++iter;
      }
    }
    if(size > 0){
      MPI_Request* treqs = &myreqqs[0];
      Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
      myreqqs.clear();
    }
  }
  xbt_mutex_release(mut_);
  return size;
}

}
}