1 /* Copyright (c) 2007-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_win.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_datatype.hpp"
11 #include "smpi_info.hpp"
12 #include "smpi_keyvals.hpp"
13 #include "smpi_process.hpp"
14 #include "smpi_request.hpp"
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
18 using simgrid::s4u::Actor;
// Per-class storage for window attribute keyvals (MPI_Win_create_keyval)
// and the next keyval id to hand out.
22 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
23 int Win::keyval_id_=0;
// Constructor: expose `size` bytes at `base` as an RMA window over `comm`.
// `disp_unit` scales target displacements; `allocated`/`dynamic` record how
// the memory was obtained so later calls can behave accordingly.
25 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
26 int comm_size = comm->size();
28 XBT_DEBUG("Creating window");
29 if(info!=MPI_INFO_NULL)
33 group_ = MPI_GROUP_NULL;
34 requests_ = new std::vector<MPI_Request>();
// Simulated mutexes: mut_ guards requests_; lock_mut_ backs MPI_Win_lock;
// atomic_mut_ serializes atomic ops (get_accumulate, compare_and_swap).
35 mut_ = xbt_mutex_init();
36 lock_mut_ = xbt_mutex_init();
37 atomic_mut_ = xbt_mutex_init();
// Each rank keeps a pointer to every peer's window object so RMA calls can
// reach the target's state directly.
38 connected_wins_ = new MPI_Win[comm_size];
39 connected_wins_[rank_] = this;
42 bar_ = MSG_barrier_init(comm_size);
46 comm->add_rma_win(this);
// Exchange window pointers across all ranks, then make everyone use rank 0's
// barrier object.
48 Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
51 Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
// Destructor body: drain outstanding RMA requests, then tear down the
// window's shared state (peer table, info, mutexes, barrier).
57 //As per the standard, perform a barrier to ensure every async comm is finished
58 MSG_barrier_wait(bar_);
60 int finished = finish_comms();
61 XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
64 delete[] connected_wins_;
65 if (name_ != nullptr){
68 if(info_!=MPI_INFO_NULL){
69 MPI_Info_free(&info_);
72 comm_->remove_rma_win(this);
// Second synchronization so no peer still references this window's state
// while it is being destroyed.
74 Colls::barrier(comm_);
76 MSG_barrier_destroy(bar_);
77 xbt_mutex_destroy(mut_);
78 xbt_mutex_destroy(lock_mut_);
79 xbt_mutex_destroy(atomic_mut_);
// Attach memory to a (dynamic) window — MPI_Win_attach.
87 int Win::attach (void *base, MPI_Aint size){
88 if (not(base_ == MPI_BOTTOM || base_ == 0))
// Dynamic windows keep base_ at 0: the actual address is supplied per RMA
// call as the target displacement.
90 base_=0;//actually the address will be given in the RMA calls, as being the disp.
// Detach previously attached memory from a dynamic window — MPI_Win_detach.
95 int Win::detach (void *base){
// Copy the window's name into `name` and its length into `length`
// (MPI_Win_get_name).
101 void Win::get_name(char* name, int* length){
107 *length = strlen(name_);
// +1 so the terminating NUL byte is copied as well.
108 strncpy(name, name_, *length+1);
// Return the group of the communicator the window was created over
// (MPI_Win_get_group); MPI_GROUP_NULL if there is no communicator.
111 void Win::get_group(MPI_Group* group){
112 if(comm_ != MPI_COMM_NULL){
113 *group = comm_->group();
115 *group = MPI_GROUP_NULL;
// Return the window's info object, lazily handling the MPI_INFO_NULL case.
119 MPI_Info Win::info(){
120 if(info_== MPI_INFO_NULL)
// Size in bytes of the exposed memory region.
130 MPI_Aint Win::size(){
// Displacement unit used to scale target displacements in RMA calls.
138 int Win::disp_unit(){
// Replace the window's info object (MPI_Win_set_info), releasing any
// previously stored one first.
146 void Win::set_info(MPI_Info info){
147 if(info_!= MPI_INFO_NULL)
// Store a copy of `name` as the window's name (MPI_Win_set_name).
// NOTE(review): any previously set name_ is not freed here — confirm callers.
152 void Win::set_name(char* name){
153 name_ = xbt_strdup(name);
// MPI_Win_fence: collective synchronization that closes the previous RMA
// epoch (completing all pending requests) and opens a new one, unless the
// assert flags say otherwise.
156 int Win::fence(int assert)
158 XBT_DEBUG("Entering fence");
161 if (assert != MPI_MODE_NOPRECEDE) {
162 // This is not the first fence => finalize what came before
163 MSG_barrier_wait(bar_);
164 xbt_mutex_acquire(mut_);
165 // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
166 // Without this, the vector could get resized when another process pushes,
167 // which would invalidate the array used by Request::waitall().
168 // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
169 std::vector<MPI_Request> *reqs = requests_;
170 int size = static_cast<int>(reqs->size());
171 // start all requests that have been prepared by another process
173 MPI_Request* treqs = &(*reqs)[0];
174 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
177 xbt_mutex_release(mut_);
180 if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
// Final barrier: every rank has finished its epoch before anyone proceeds.
184 MSG_barrier_wait(bar_);
185 XBT_DEBUG("Leaving fence");
// MPI_Put: write `origin_count` elements of `origin_datatype` from
// `origin_addr` into the target window at `target_disp`. For a remote target
// a send/recv request pair is queued (completed at the next synchronization);
// a local put degenerates to a direct datatype copy.
190 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
191 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
193 //get receiver pointer
194 MPI_Win recv_win = connected_wins_[target_rank];
196 if(opened_==0){//check that post/start has been done
197 // no fence or start .. lock ok ?
199 for (auto const& it : recv_win->lockers_)
200 if (it == comm_->rank())
// Bounds check: the access must fit inside the target's exposed region.
206 if(target_count*target_datatype->get_extent()>recv_win->size_)
209 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
210 XBT_DEBUG("Entering MPI_Put to %d", target_rank);
212 if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
213 // prepare send_request
215 // TODO cheinrich Check for rank / pid conversion
216 Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
219 //prepare receiver request
220 // TODO cheinrich Check for rank / pid conversion
221 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
222 target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
227 if(request!=nullptr){
// Track the send locally so fence/flush can complete it later.
230 xbt_mutex_acquire(mut_);
231 requests_->push_back(sreq);
232 xbt_mutex_release(mut_);
235 //push request to receiver's win
236 xbt_mutex_acquire(recv_win->mut_);
237 recv_win->requests_->push_back(rreq);
239 xbt_mutex_release(recv_win->mut_);
// Local put: just copy the data, no communication needed.
242 Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
244 *request = MPI_REQUEST_NULL;
// MPI_Get: read `target_count` elements from the target window at
// `target_disp` into `origin_addr`. Mirrors put() with the data flowing the
// other way; a local get is a direct datatype copy.
250 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
251 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
254 MPI_Win send_win = connected_wins_[target_rank];
256 if(opened_==0){//check that post/start has been done
257 // no fence or start .. lock ok ?
259 for (auto const& it : send_win->lockers_)
260 if (it == comm_->rank())
// Bounds check against the target's exposed region.
266 if(target_count*target_datatype->get_extent()>send_win->size_)
269 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
270 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
272 if(target_rank != comm_->rank()){
273 //prepare send_request
274 MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
275 send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
277 //prepare receiver request
278 MPI_Request rreq = Request::rma_recv_init(
279 origin_addr, origin_count, origin_datatype, target_rank,
280 comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
281 SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
283 //start the send, with another process than us as sender.
285 //push request to receiver's win
286 xbt_mutex_acquire(send_win->mut_);
287 send_win->requests_->push_back(sreq);
288 xbt_mutex_release(send_win->mut_);
293 if(request!=nullptr){
// Track the receive locally so fence/flush can complete it later.
296 xbt_mutex_acquire(mut_);
297 requests_->push_back(rreq);
298 xbt_mutex_release(mut_);
// Local get: plain copy from the window into the origin buffer.
302 Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
304 *request=MPI_REQUEST_NULL;
// MPI_Accumulate: combine origin data into the target window with reduction
// op `op`. Requests carry decreasing tags (SMPI_RMA_TAG - 3 - count_) so that
// successive accumulates to the same target are applied in order.
311 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
312 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
314 XBT_DEBUG("Entering MPI_Win_Accumulate");
315 //get receiver pointer
316 MPI_Win recv_win = connected_wins_[target_rank];
318 if(opened_==0){//check that post/start has been done
319 // no fence or start .. lock ok ?
321 for (auto const& it : recv_win->lockers_)
322 if (it == comm_->rank())
327 //FIXME: local version
// Bounds check against the target's exposed region.
329 if(target_count*target_datatype->get_extent()>recv_win->size_)
332 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
333 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
334 //As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
335 //prepare send_request
337 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
338 SMPI_RMA_TAG - 3 - count_, comm_, op);
340 // prepare receiver request
341 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
342 target_rank, SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
348 // push request to receiver's win
349 xbt_mutex_acquire(recv_win->mut_);
350 recv_win->requests_->push_back(rreq);
352 xbt_mutex_release(recv_win->mut_);
354 if (request != nullptr) {
// Track the send locally so fence/flush can complete it later.
357 xbt_mutex_acquire(mut_);
358 requests_->push_back(sreq);
359 xbt_mutex_release(mut_);
362 XBT_DEBUG("Leaving MPI_Win_Accumulate");
// MPI_Get_accumulate: atomically fetch the target data into `result_addr`
// and then accumulate the origin data into the target. Atomicity is obtained
// by serializing the get+accumulate pair under the target's atomic_mut_ and
// waiting for each half to complete — simple but slow (see note below).
366 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr,
367 int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count,
368 MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request){
371 MPI_Win send_win = connected_wins_[target_rank];
373 if(opened_==0){//check that post/start has been done
374 // no fence or start .. lock ok ?
376 for (auto const& it : send_win->lockers_)
377 if (it == comm_->rank())
// Bounds check against the target's exposed region.
383 if(target_count*target_datatype->get_extent()>send_win->size_)
386 XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
387 //need to be sure ops are correctly ordered, so finish request here ? slow.
389 xbt_mutex_acquire(send_win->atomic_mut_);
390 get(result_addr, result_count, result_datatype, target_rank,
391 target_disp, target_count, target_datatype, &req);
392 if (req != MPI_REQUEST_NULL)
393 Request::wait(&req, MPI_STATUS_IGNORE);
395 accumulate(origin_addr, origin_count, origin_datatype, target_rank,
396 target_disp, target_count, target_datatype, op, &req);
397 if (req != MPI_REQUEST_NULL)
398 Request::wait(&req, MPI_STATUS_IGNORE);
399 xbt_mutex_release(send_win->atomic_mut_);
// MPI_Compare_and_swap: fetch one element from the target into `result_addr`;
// if it equals `compare_addr` (bytewise, over the datatype extent), replace
// it with `origin_addr`. Serialized under the target's atomic_mut_.
404 int Win::compare_and_swap(void *origin_addr, void *compare_addr,
405 void *result_addr, MPI_Datatype datatype, int target_rank,
406 MPI_Aint target_disp){
408 MPI_Win send_win = connected_wins_[target_rank];
410 if(opened_==0){//check that post/start has been done
411 // no fence or start .. lock ok ?
413 for (auto const& it : send_win->lockers_)
414 if (it == comm_->rank())
420 XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
421 MPI_Request req = MPI_REQUEST_NULL;
422 xbt_mutex_acquire(send_win->atomic_mut_);
423 get(result_addr, 1, datatype, target_rank,
424 target_disp, 1, datatype, &req);
425 if (req != MPI_REQUEST_NULL)
426 Request::wait(&req, MPI_STATUS_IGNORE);
// memcmp returns 0 on equality, so the swap happens only on a match.
427 if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
428 put(origin_addr, 1, datatype, target_rank,
429 target_disp, 1, datatype);
431 xbt_mutex_release(send_win->atomic_mut_);
// MPI_Win_start: open an access epoch towards the processes in `group`.
// Naive blocking implementation: wait for a sync message (tag +4) from every
// target's matching MPI_Win_post before returning.
435 int Win::start(MPI_Group group, int assert){
436 /* From MPI forum advices
437 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
438 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
439 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
440 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
441 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
442 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
443 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
444 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
445 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
446 must complete, without further dependencies. */
448 //naive, blocking implementation.
451 int size = group->size();
452 MPI_Request* reqs = xbt_new0(MPI_Request, size);
454 XBT_DEBUG("Entering MPI_Win_Start");
// Translate each group member into a rank of our communicator; skip
// ourselves and members not present in comm_.
456 int src = comm_->group()->rank(group->actor(j));
457 if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
458 reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
464 Request::startall(size, reqs);
465 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
466 for (i = 0; i < size; i++) {
467 Request::unref(&reqs[i]);
470 opened_++; //we're open for business !
473 XBT_DEBUG("Leaving MPI_Win_Start");
// MPI_Win_post: open an exposure epoch for the processes in `group` by
// sending each of them a sync message (tag +4) matched by their
// MPI_Win_start.
477 int Win::post(MPI_Group group, int assert){
478 //let's make a synchronous send here
481 int size = group->size();
482 MPI_Request* reqs = xbt_new0(MPI_Request, size);
484 XBT_DEBUG("Entering MPI_Win_Post");
// Translate group members into ranks of comm_, skipping ourselves.
486 int dst = comm_->group()->rank(group->actor(j));
487 if (dst != rank_ && dst != MPI_UNDEFINED) {
488 reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
495 Request::startall(size, reqs);
496 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
498 Request::unref(&reqs[i]);
501 opened_++; //we're open for business !
504 XBT_DEBUG("Leaving MPI_Win_Post");
// Body of MPI_Win_complete: close the access epoch opened by start() by
// sending a sync message (tag +5) to each member of group_, then completing
// all RMA requests issued during the epoch.
510 xbt_die("Complete called on already opened MPI_Win");
512 XBT_DEBUG("Entering MPI_Win_Complete");
515 int size = group_->size();
516 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// Translate group members into ranks of comm_, skipping ourselves.
519 int dst = comm_->group()->rank(group_->actor(j));
520 if (dst != rank_ && dst != MPI_UNDEFINED) {
521 reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
527 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
528 Request::startall(size, reqs);
529 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
532 Request::unref(&reqs[i]);
// Complete every request this epoch pushed onto our queue.
536 int finished = finish_comms();
537 XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
539 Group::unref(group_);
540 opened_--; //we're closed for business !
// Body of MPI_Win_wait: close the exposure epoch opened by post() by waiting
// for a sync message (tag +5) from each member of group_ (sent by their
// MPI_Win_complete), then completing all pending RMA requests.
545 //naive, blocking implementation.
546 XBT_DEBUG("Entering MPI_Win_Wait");
549 int size = group_->size();
550 MPI_Request* reqs = xbt_new0(MPI_Request, size);
// Translate group members into ranks of comm_, skipping ourselves.
553 int src = comm_->group()->rank(group_->actor(j));
554 if (src != rank_ && src != MPI_UNDEFINED) {
555 reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
561 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
562 Request::startall(size, reqs);
563 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
565 Request::unref(&reqs[i]);
568 int finished = finish_comms();
569 XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
571 Group::unref(group_);
572 opened_--; //we're closed for business !
// MPI_Win_lock: acquire a shared or exclusive lock on rank's window.
// The target's lock_mut_ is held only while the window is exclusively
// locked; mode_ accumulates lock types so unlock() can tell whether a
// release is needed.
576 int Win::lock(int lock_type, int rank, int assert){
577 MPI_Win target_win = connected_wins_[rank];
579 if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
580 xbt_mutex_acquire(target_win->lock_mut_);
581 target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
582 if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
583 xbt_mutex_release(target_win->lock_mut_);
585 } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
586 target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
// Record ourselves as a locker so RMA calls without an epoch are accepted.
588 target_win->lockers_.push_back(comm_->rank());
// Flush pending operations in both directions on lock acquisition.
590 int finished = finish_comms(rank);
591 XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
592 finished = target_win->finish_comms(rank_);
593 XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
// MPI_Win_lock_all: take a shared lock on every rank's window, returning the
// first non-success code encountered.
597 int Win::lock_all(int assert){
599 int retval = MPI_SUCCESS;
600 for (i=0; i<comm_->size();i++){
601 int ret = this->lock(MPI_LOCK_SHARED, i, assert);
602 if(ret != MPI_SUCCESS)
// MPI_Win_unlock: drop our lock on rank's window. The target's lock_mut_ is
// released only if the window was exclusively locked (see lock()).
608 int Win::unlock(int rank){
609 MPI_Win target_win = connected_wins_[rank];
610 int target_mode = target_win->mode_;
611 target_win->mode_= 0;
612 target_win->lockers_.remove(comm_->rank());
613 if (target_mode==MPI_LOCK_EXCLUSIVE){
614 xbt_mutex_release(target_win->lock_mut_);
// Flush pending operations in both directions before returning.
617 int finished = finish_comms(rank);
618 XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
619 finished = target_win->finish_comms(rank_);
620 XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
// MPI_Win_unlock_all: release the lock on every rank's window, returning the
// first non-success code encountered.
624 int Win::unlock_all(){
626 int retval = MPI_SUCCESS;
627 for (i=0; i<comm_->size();i++){
628 int ret = this->unlock(i);
629 if(ret != MPI_SUCCESS)
// MPI_Win_flush: complete all outstanding RMA operations between us and
// `rank`, on both the local and the remote request queues.
635 int Win::flush(int rank){
636 MPI_Win target_win = connected_wins_[rank];
637 int finished = finish_comms(rank);
638 XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
639 finished = target_win->finish_comms(rank_);
640 XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
// MPI_Win_flush_local: complete only our locally queued operations that
// involve `rank`.
644 int Win::flush_local(int rank){
645 int finished = finish_comms(rank);
646 XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
// MPI_Win_flush_all: complete all our local operations, then the operations
// targeting us that sit on every peer's queue.
650 int Win::flush_all(){
653 finished = finish_comms();
654 XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
655 for (i=0; i<comm_->size();i++){
656 finished = connected_wins_[i]->finish_comms(rank_);
657 XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
// MPI_Win_flush_local_all: complete every operation on our local queue.
662 int Win::flush_local_all(){
663 int finished = finish_comms();
664 XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
// Translate a Fortran window handle (integer id) back to its C++ object.
668 Win* Win::f2c(int id){
669 return static_cast<Win*>(F2C::f2c(id));
// Wait for every request queued on this window, under mut_ so no peer can
// push (and possibly resize the vector) during the waitall. Returns the
// number of completed requests.
673 int Win::finish_comms(){
674 xbt_mutex_acquire(mut_);
675 //Finish own requests
676 std::vector<MPI_Request> *reqqs = requests_;
677 int size = static_cast<int>(reqqs->size());
679 MPI_Request* treqs = &(*reqqs)[0];
680 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
683 xbt_mutex_release(mut_);
// Variant of finish_comms restricted to one peer: extract from the queue
// every request whose source or destination is `rank`, wait on just those,
// and leave the rest pending. Returns the number of completed requests.
687 int Win::finish_comms(int rank){
688 xbt_mutex_acquire(mut_);
689 //Finish own requests
690 std::vector<MPI_Request> *reqqs = requests_;
691 int size = static_cast<int>(reqqs->size());
// Move matching requests into a private vector so waitall operates on a
// stable array while the shared queue keeps only unrelated requests.
694 std::vector<MPI_Request> myreqqs;
695 std::vector<MPI_Request>::iterator iter = reqqs->begin();
696 while (iter != reqqs->end()){
697 if(((*iter)!=MPI_REQUEST_NULL) && (((*iter)->src() == rank) || ((*iter)->dst() == rank))){
698 myreqqs.push_back(*iter);
699 iter = reqqs->erase(iter);
706 MPI_Request* treqs = &myreqqs[0];
707 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
711 xbt_mutex_release(mut_);