1 /* Copyright (c) 2007-2018. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_win.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_datatype.hpp"
11 #include "smpi_info.hpp"
12 #include "smpi_keyvals.hpp"
13 #include "smpi_process.hpp"
14 #include "smpi_request.hpp"
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
18 using simgrid::s4u::Actor;
// Per-class storage for window attribute keyvals (MPI_Win_create_keyval) and the
// counter used to hand out fresh keyval ids.
22 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
23 int Win::keyval_id_=0;
// Window constructor: sets up the per-window bookkeeping (request vector, mutexes),
// then exchanges the window handles of every rank in the communicator so each process
// can reach its peers' windows directly. NOTE(review): several lines are elided in this
// excerpt (e.g. the info-refcount branch and the allgather's trailing arguments).
25 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
26 int comm_size = comm->size();
28 XBT_DEBUG("Creating window");
29 if(info!=MPI_INFO_NULL)
33 group_ = MPI_GROUP_NULL;
// One request vector shared by all RMA calls on this window; guarded by mut_.
34 requests_ = new std::vector<MPI_Request>();
35 mut_ = xbt_mutex_init();
36 lock_mut_ = xbt_mutex_init();
37 atomic_mut_ = xbt_mutex_init();
// Slot rank_ holds our own handle; the allgather below fills in the other ranks'.
38 connected_wins_ = new MPI_Win[comm_size];
39 connected_wins_[rank_] = this;
42 bar_ = new simgrid::s4u::Barrier(comm_size);
46 comm->add_rma_win(this);
// Share every rank's window pointer so RMA calls can dereference the target window.
49 Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
// All ranks reuse rank 0's barrier object for fence synchronization.
52 Colls::bcast(&(bar_), sizeof(simgrid::s4u::Barrier*), MPI_BYTE, 0, comm);
// Destructor body (the ~Win() header is elided in this excerpt): drain pending RMA
// requests, release per-window resources, then synchronize with the other ranks
// before tearing down the mutexes so no peer still touches them.
58 //As per the standard, perform a barrier to ensure every async comm is finished
61 int finished = finish_comms();
62 XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
65 delete[] connected_wins_;
66 if (name_ != nullptr){
69 if(info_!=MPI_INFO_NULL){
70 MPI_Info_free(&info_);
73 comm_->remove_rma_win(this);
// Barrier before destroying the mutexes: peers may still be inside finish_comms on us.
75 Colls::barrier(comm_);
80 xbt_mutex_destroy(mut_);
81 xbt_mutex_destroy(lock_mut_);
82 xbt_mutex_destroy(atomic_mut_);
// Attach memory to a dynamic window (MPI_Win_attach). Only legal when the window
// base is still unset (MPI_BOTTOM or 0); the error branch is elided in this excerpt.
90 int Win::attach (void *base, MPI_Aint size){
91 if (not(base_ == MPI_BOTTOM || base_ == 0))
93 base_=0;//actually the address will be given in the RMA calls, as being the disp.
// Detach memory from a dynamic window (MPI_Win_detach); body elided in this excerpt.
98 int Win::detach (void *base){
// Copy the window's name into the caller's buffer (MPI_Win_get_name).
// The unnamed-window branch is elided in this excerpt.
104 void Win::get_name(char* name, int* length){
110 *length = strlen(name_);
// +1 so the terminating NUL is copied along with the name.
111 strncpy(name, name_, *length+1);
// Return the group of the communicator the window was created on (MPI_Win_get_group),
// or MPI_GROUP_NULL when the window has no communicator.
114 void Win::get_group(MPI_Group* group){
115 if(comm_ != MPI_COMM_NULL){
116 *group = comm_->group();
118 *group = MPI_GROUP_NULL;
// Accessor for the window's info object; lazily creates one if none was set
// (creation line elided in this excerpt).
122 MPI_Info Win::info(){
123 if(info_== MPI_INFO_NULL)
// Accessor: size in bytes of the window's local memory region.
133 MPI_Aint Win::size(){
// Accessor: displacement unit used to scale target_disp in RMA calls.
141 int Win::disp_unit(){
// Replace the window's info object (MPI_Win_set_info); the old one is released
// first when present (release line elided in this excerpt).
149 void Win::set_info(MPI_Info info){
150 if(info_!= MPI_INFO_NULL)
// Set the window's name (MPI_Win_set_name). NOTE(review): a previously set name_
// appears to be overwritten without being freed — confirm against the elided lines.
155 void Win::set_name(char* name){
156 name_ = xbt_strdup(name);
// MPI_Win_fence: closes the previous access epoch (waiting on all pending RMA
// requests unless MPI_MODE_NOPRECEDE says there were none) and opens a new one.
// The barrier/epoch-toggle lines are elided in this excerpt.
159 int Win::fence(int assert)
161 XBT_DEBUG("Entering fence");
164 if (assert != MPI_MODE_NOPRECEDE) {
165 // This is not the first fence => finalize what came before
167 xbt_mutex_acquire(mut_);
168 // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
169 // Without this, the vector could get redimensioned when another process pushes.
170 // This would result in the array used by Request::waitall() to be invalidated.
171 // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
172 std::vector<MPI_Request> *reqs = requests_;
173 int size = static_cast<int>(reqs->size());
174 // start all requests that have been prepared by another process
176 MPI_Request* treqs = &(*reqs)[0];
177 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
180 xbt_mutex_release(mut_);
183 if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
188 XBT_DEBUG("Leaving fence");
// MPI_Put: one-sided write, implemented as a matched rma_send (origin side) /
// rma_recv (target side) pair queued on both windows' request vectors.
// The local (self-targeted) case degenerates to a plain Datatype::copy.
193 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
194 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
196 //get receiver pointer
197 MPI_Win recv_win = connected_wins_[target_rank];
// Access-epoch check: outside fence/post-start, the op is only legal if we hold a
// lock on the target window (error path elided in this excerpt).
199 if(opened_==0){//check that post/start has been done
200 // no fence or start .. lock ok ?
202 for (auto const& it : recv_win->lockers_)
203 if (it == comm_->rank())
// Bounds check against the target window's size (error return elided).
209 if(target_count*target_datatype->get_extent()>recv_win->size_)
212 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
214 if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
215 XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
216 // prepare send_request
218 // TODO cheinrich Check for rank / pid conversion
219 Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
222 //prepare receiver request
223 // TODO cheinrich Check for rank / pid conversion
224 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
225 target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
// When the caller asked for a request handle (rput), the send request is handed back;
// otherwise it is tracked in our own request vector (under mut_).
230 if(request!=nullptr){
233 xbt_mutex_acquire(mut_);
234 requests_->push_back(sreq);
235 xbt_mutex_release(mut_);
238 //push request to receiver's win
239 xbt_mutex_acquire(recv_win->mut_);
240 recv_win->requests_->push_back(rreq);
242 xbt_mutex_release(recv_win->mut_);
245 XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
246 Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
248 *request = MPI_REQUEST_NULL;
// MPI_Get: one-sided read. Mirror of put(): the TARGET window posts the send of its
// data, and we post the receive into origin_addr. Self-targeted gets use Datatype::copy.
254 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
255 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
258 MPI_Win send_win = connected_wins_[target_rank];
// Access-epoch check, same policy as put() (error path elided in this excerpt).
260 if(opened_==0){//check that post/start has been done
261 // no fence or start .. lock ok ?
263 for (auto const& it : send_win->lockers_)
264 if (it == comm_->rank())
270 if(target_count*target_datatype->get_extent()>send_win->size_)
273 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
274 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
276 if(target_rank != comm_->rank()){
277 //prepare send_request
278 MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
279 send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
281 //prepare receiver request
282 MPI_Request rreq = Request::rma_recv_init(
283 origin_addr, origin_count, origin_datatype, target_rank,
284 comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
285 SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
287 //start the send, with another process than us as sender.
289 //push request to receiver's win
290 xbt_mutex_acquire(send_win->mut_);
291 send_win->requests_->push_back(sreq);
292 xbt_mutex_release(send_win->mut_);
// Hand the receive request back for rget, or track it locally under mut_.
297 if(request!=nullptr){
300 xbt_mutex_acquire(mut_);
301 requests_->push_back(rreq);
302 xbt_mutex_release(mut_);
306 Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
308 *request=MPI_REQUEST_NULL;
// MPI_Accumulate: like put(), but the receive side applies reduction op `op`
// instead of overwriting. Operation ordering is enforced through strictly
// decreasing tags derived from count_.
315 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
316 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
318 XBT_DEBUG("Entering MPI_Win_Accumulate");
319 //get receiver pointer
320 MPI_Win recv_win = connected_wins_[target_rank];
// Access-epoch check (error path elided in this excerpt).
322 if(opened_==0){//check that post/start has been done
323 // no fence or start .. lock ok ?
325 for (auto const& it : recv_win->lockers_)
326 if (it == comm_->rank())
331 //FIXME: local version
333 if(target_count*target_datatype->get_extent()>recv_win->size_)
336 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
337 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
338 //As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
339 //prepare send_request
341 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
342 SMPI_RMA_TAG - 3 - count_, comm_, op);
344 // prepare receiver request
345 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
346 recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)), SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
352 // push request to receiver's win
353 xbt_mutex_acquire(recv_win->mut_);
354 recv_win->requests_->push_back(rreq);
356 xbt_mutex_release(recv_win->mut_);
// Hand back the send request for raccumulate, or track it locally under mut_.
358 if (request != nullptr) {
361 xbt_mutex_acquire(mut_);
362 requests_->push_back(sreq);
363 xbt_mutex_release(mut_);
366 XBT_DEBUG("Leaving MPI_Win_Accumulate");
// MPI_Get_accumulate: fetch the target data into result_addr, then accumulate
// origin data into the target. Both halves are completed synchronously under the
// target's atomic_mut_ so concurrent get_accumulate/compare_and_swap calls on the
// same target cannot interleave (slow but correct, as the comment below notes).
370 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr,
371 int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count,
372 MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request){
375 MPI_Win send_win = connected_wins_[target_rank];
// Access-epoch check (error path elided in this excerpt).
377 if(opened_==0){//check that post/start has been done
378 // no fence or start .. lock ok ?
380 for (auto const& it : send_win->lockers_)
381 if (it == comm_->rank())
387 if(target_count*target_datatype->get_extent()>send_win->size_)
390 XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
391 //need to be sure ops are correctly ordered, so finish request here ? slow.
393 xbt_mutex_acquire(send_win->atomic_mut_);
394 get(result_addr, result_count, result_datatype, target_rank,
395 target_disp, target_count, target_datatype, &req);
396 if (req != MPI_REQUEST_NULL)
397 Request::wait(&req, MPI_STATUS_IGNORE);
// MPI_NO_OP handling for the accumulate half is elided in this excerpt.
399 accumulate(origin_addr, origin_count, origin_datatype, target_rank,
400 target_disp, target_count, target_datatype, op, &req);
401 if (req != MPI_REQUEST_NULL)
402 Request::wait(&req, MPI_STATUS_IGNORE);
403 xbt_mutex_release(send_win->atomic_mut_);
// MPI_Compare_and_swap: atomically (w.r.t. the target's atomic_mut_) fetch the
// target element into result_addr and, if it byte-compares equal to compare_addr,
// overwrite it with origin_addr.
408 int Win::compare_and_swap(void *origin_addr, void *compare_addr,
409 void *result_addr, MPI_Datatype datatype, int target_rank,
410 MPI_Aint target_disp){
412 MPI_Win send_win = connected_wins_[target_rank];
// Access-epoch check (error path elided in this excerpt).
414 if(opened_==0){//check that post/start has been done
415 // no fence or start .. lock ok ?
417 for (auto const& it : send_win->lockers_)
418 if (it == comm_->rank())
424 XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
425 MPI_Request req = MPI_REQUEST_NULL;
426 xbt_mutex_acquire(send_win->atomic_mut_);
427 get(result_addr, 1, datatype, target_rank,
428 target_disp, 1, datatype, &req);
429 if (req != MPI_REQUEST_NULL)
430 Request::wait(&req, MPI_STATUS_IGNORE);
// Raw memcmp over the datatype extent: equality means "swap in the new value".
431 if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
432 put(origin_addr, 1, datatype, target_rank,
433 target_disp, 1, datatype);
435 xbt_mutex_release(send_win->atomic_mut_);
// MPI_Win_start: open an access epoch toward `group`. This blocking implementation
// waits for a zero-byte sync message (tag SMPI_RMA_TAG+4) from each member's
// matching MPI_Win_post before returning.
439 int Win::start(MPI_Group group, int assert){
440 /* From MPI forum advice
441 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
442 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
443 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
444 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
445 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
446 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
447 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
448 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
449 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
450 must complete, without further dependencies. */
452 //naive, blocking implementation.
455 int size = group->size();
456 MPI_Request* reqs = xbt_new0(MPI_Request, size);
458 XBT_DEBUG("Entering MPI_Win_Start");
// Loop setup over group members is elided in this excerpt; src is each member's
// rank translated into our communicator.
460 int src = comm_->group()->rank(group->actor(j));
461 if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
462 reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
468 Request::startall(size, reqs);
469 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
470 for (i = 0; i < size; i++) {
471 Request::unref(&reqs[i]);
474 opened_++; //we're open for business !
477 XBT_DEBUG("Leaving MPI_Win_Start");
// MPI_Win_post: open an exposure epoch toward `group` by sending a zero-byte sync
// message (tag SMPI_RMA_TAG+4) to each member, matching their MPI_Win_start.
481 int Win::post(MPI_Group group, int assert){
482 //let's make a synchronous send here
485 int size = group->size();
486 MPI_Request* reqs = xbt_new0(MPI_Request, size);
488 XBT_DEBUG("Entering MPI_Win_Post");
// Loop setup over group members is elided in this excerpt.
490 int dst = comm_->group()->rank(group->actor(j));
491 if (dst != rank_ && dst != MPI_UNDEFINED) {
492 reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
499 Request::startall(size, reqs);
500 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
502 Request::unref(&reqs[i]);
505 opened_++; //we're open for business !
508 XBT_DEBUG("Leaving MPI_Win_Post");
// Body of MPI_Win_complete (the function header is elided in this excerpt):
// close the access epoch opened by start() — notify every target (tag
// SMPI_RMA_TAG+5), finish our pending RMA requests, and drop the group reference.
514 xbt_die("Complete called on already opened MPI_Win");
516 XBT_DEBUG("Entering MPI_Win_Complete");
519 int size = group_->size();
520 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// Loop setup over group_ members is elided in this excerpt.
523 int dst = comm_->group()->rank(group_->actor(j));
524 if (dst != rank_ && dst != MPI_UNDEFINED) {
525 reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
531 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
532 Request::startall(size, reqs);
533 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
536 Request::unref(&reqs[i]);
// Local RMA calls must be finished before we report completion to the caller.
540 int finished = finish_comms();
541 XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
543 Group::unref(group_);
544 opened_--; //we're closed for business !
// Body of MPI_Win_wait (the function header is elided in this excerpt): close the
// exposure epoch opened by post() — wait for every origin's complete() message
// (tag SMPI_RMA_TAG+5), then finish pending RMA requests on this window.
549 //naive, blocking implementation.
550 XBT_DEBUG("Entering MPI_Win_Wait");
553 int size = group_->size();
554 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// Loop setup over group_ members is elided in this excerpt.
557 int src = comm_->group()->rank(group_->actor(j));
558 if (src != rank_ && src != MPI_UNDEFINED) {
559 reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
565 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
566 Request::startall(size, reqs);
567 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
569 Request::unref(&reqs[i]);
572 int finished = finish_comms();
573 XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
575 Group::unref(group_);
576 opened_--; //we're closed for business !
// MPI_Win_lock: passive-target lock emulated with the target window's lock_mut_.
// mode_ tracks the currently granted lock type(s); shared lockers are also recorded
// in lockers_ so RMA calls can validate their access epoch.
580 int Win::lock(int lock_type, int rank, int assert){
581 MPI_Win target_win = connected_wins_[rank];
// Block if we want exclusivity, or if someone already holds the window exclusively.
583 if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
584 xbt_mutex_acquire(target_win->lock_mut_);
585 target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
586 if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
587 xbt_mutex_release(target_win->lock_mut_);
589 } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
590 target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
592 target_win->lockers_.push_back(comm_->rank());
// Flush both directions so prior epochs' operations cannot leak into this one.
594 int finished = finish_comms(rank);
595 XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
596 finished = target_win->finish_comms(rank_);
597 XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
// MPI_Win_lock_all: take a shared lock on every rank's window, reporting the
// first failing return code (error propagation line elided in this excerpt).
601 int Win::lock_all(int assert){
603 int retval = MPI_SUCCESS;
604 for (i=0; i<comm_->size();i++){
605 int ret = this->lock(MPI_LOCK_SHARED, i, assert);
606 if(ret != MPI_SUCCESS)
// MPI_Win_unlock: drop our lock on the target window and flush outstanding RMA
// operations in both directions. Only an exclusive mode still holds lock_mut_
// (shared lockers released it in lock()), hence the conditional release.
612 int Win::unlock(int rank){
613 MPI_Win target_win = connected_wins_[rank];
614 int target_mode = target_win->mode_;
615 target_win->mode_= 0;
616 target_win->lockers_.remove(comm_->rank());
617 if (target_mode==MPI_LOCK_EXCLUSIVE){
618 xbt_mutex_release(target_win->lock_mut_);
621 int finished = finish_comms(rank);
622 XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
623 finished = target_win->finish_comms(rank_);
624 XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
// MPI_Win_unlock_all: release the lock taken by lock_all() on every rank's window
// (error propagation line elided in this excerpt).
628 int Win::unlock_all(){
630 int retval = MPI_SUCCESS;
631 for (i=0; i<comm_->size();i++){
632 int ret = this->unlock(i);
633 if (ret != MPI_SUCCESS)
// MPI_Win_flush: complete all outstanding RMA operations between us and `rank`,
// on both the local and the remote window.
639 int Win::flush(int rank){
640 MPI_Win target_win = connected_wins_[rank];
641 int finished = finish_comms(rank_);
642 XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
643 finished = target_win->finish_comms(rank);
644 XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
// MPI_Win_flush_local: complete, at the origin only, our outstanding RMA
// operations targeting `rank`.
648 int Win::flush_local(int rank){
649 int finished = finish_comms(rank);
650 XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
// MPI_Win_flush_all: finish our own pending requests, then our pending requests
// recorded on every peer's window.
654 int Win::flush_all(){
655 int finished = finish_comms();
656 XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
657 for (int i = 0; i < comm_->size(); i++) {
658 finished = connected_wins_[i]->finish_comms(rank_);
659 XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
// MPI_Win_flush_local_all: locally complete every outstanding RMA operation
// issued from this window.
664 int Win::flush_local_all(){
665 int finished = finish_comms();
666 XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
// Fortran-to-C handle conversion: look up the Win object registered under `id`.
670 Win* Win::f2c(int id){
671 return static_cast<Win*>(F2C::f2c(id));
// Wait for every request queued on this window and return how many were finished.
// Holds mut_ for the whole drain so peers cannot push (and reallocate the vector)
// while waitall uses its raw backing array; vector cleanup lines are elided here.
674 int Win::finish_comms(){
675 xbt_mutex_acquire(mut_);
676 //Finish own requests
677 std::vector<MPI_Request> *reqqs = requests_;
678 int size = static_cast<int>(reqqs->size());
680 MPI_Request* treqs = &(*reqqs)[0];
681 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
684 xbt_mutex_release(mut_);
// Rank-filtered variant of finish_comms(): only wait for the queued requests whose
// source or destination is the process identified by `rank` (translated to a pid),
// leaving requests involving other peers untouched in the queue.
688 int Win::finish_comms(int rank){
689 xbt_mutex_acquire(mut_);
690 //Finish own requests
691 std::vector<MPI_Request> *reqqs = requests_;
692 int size = static_cast<int>(reqqs->size());
695 std::vector<MPI_Request> myreqqs;
696 std::vector<MPI_Request>::iterator iter = reqqs->begin();
697 int proc_id = comm_->group()->actor(rank)->get_pid();
698 while (iter != reqqs->end()){
699 // Let's see if we're either the destination or the sender of this request
700 // because we only wait for requests that we are responsible for.
701 // Also use the process id here since the request itself returns from src()
702 // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
703 if (((*iter) != MPI_REQUEST_NULL) && (((*iter)->src() == proc_id) || ((*iter)->dst() == proc_id))) {
704 myreqqs.push_back(*iter);
// erase() returns the next valid iterator; the non-matching advance branch is
// elided in this excerpt.
705 iter = reqqs->erase(iter);
712 MPI_Request* treqs = &myreqqs[0];
713 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
717 xbt_mutex_release(mut_);
// MPI_Win_shared_query: report size, displacement unit and base address of the
// memory attached at `rank`. With MPI_PROC_NULL, fall back to the first window
// that actually has memory attached (size_ > 0). The function continues past the
// end of this excerpt (no-target fallback returning a zero-size allocation).
721 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr)
723 MPI_Win target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
724 for (int i = 0; not target_win && i < comm_->size(); i++) {
725 if (connected_wins_[i]->size_ > 0)
726 target_win = connected_wins_[i];
729 *size = target_win->size_;
730 *disp_unit = target_win->disp_unit_;
731 *static_cast<void**>(baseptr) = target_win->base_;
// When no window has attached memory, hand back a valid-but-empty allocation.
734 *static_cast<void**>(baseptr) = xbt_malloc(0);