1 /* Copyright (c) 2007-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_win.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_datatype.hpp"
11 #include "smpi_info.hpp"
12 #include "smpi_keyvals.hpp"
13 #include "smpi_process.hpp"
14 #include "smpi_request.hpp"
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
18 using simgrid::s4u::Actor;
// Per-class storage backing MPI_Win_create_keyval/MPI_Win_set_attr:
// keyvals_ maps a keyval id to its element; keyval_id_ is presumably the
// next id to hand out (incremented elsewhere — not visible here).
22 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
23 int Win::keyval_id_=0;
// Window constructor: initializes per-window bookkeeping (request vector,
// simulated mutexes, table of peer windows), registers this window with the
// communicator, then exchanges window pointers and the barrier handle among
// all ranks. rank_ is read at line 39 but its initialization is not visible
// here — presumably set from comm->rank() on an elided line; TODO confirm.
25 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
26 int comm_size = comm->size();
28 XBT_DEBUG("Creating window");
// Keep the info object alive for the window's lifetime (branch body elided).
29 if(info!=MPI_INFO_NULL)
33 group_ = MPI_GROUP_NULL;
// Requests pushed here by peers performing RMA on us; protected by mut_.
34 requests_ = new std::vector<MPI_Request>();
35 mut_ = xbt_mutex_init();
36 lock_mut_ = xbt_mutex_init();
37 atomic_mut_ = xbt_mutex_init();
// One MPI_Win slot per rank; our own slot points to this object, the rest
// are filled in by the allgather below.
38 connected_wins_ = new MPI_Win[comm_size];
39 connected_wins_[rank_] = this;
42 bar_ = MSG_barrier_init(comm_size);
46 comm->add_rma_win(this);
// Every rank learns the window pointer of every other rank.
48 Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
// All ranks share rank 0's barrier handle so MSG_barrier_wait synchronizes
// the whole group.
51 Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
// Destructor body (opening line elided from this view): drains pending RMA
// requests, then tears down all resources allocated by the constructor in
// reverse order — peer table, name, info, communicator registration,
// barrier, and the three mutexes.
57 //As per the standard, perform a barrier to ensure every async comm is finished
58 MSG_barrier_wait(bar_);
60 int finished = finish_comms();
61 XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
64 delete[] connected_wins_;
// name_ cleanup (branch body elided — presumably xbt_free; TODO confirm).
65 if (name_ != nullptr){
68 if(info_!=MPI_INFO_NULL){
69 MPI_Info_free(&info_);
72 comm_->remove_rma_win(this);
// Collective: no rank frees the shared barrier while others may still wait.
74 Colls::barrier(comm_);
76 MSG_barrier_destroy(bar_);
77 xbt_mutex_destroy(mut_);
78 xbt_mutex_destroy(lock_mut_);
79 xbt_mutex_destroy(atomic_mut_);
// MPI_Win_attach (dynamic windows): rejects attaching unless the base is
// still unset/MPI_BOTTOM (error path elided), then records that addressing
// is relative — target addresses come in through the displacement argument.
87 int Win::attach (void *base, MPI_Aint size){
88 if (not(base_ == MPI_BOTTOM || base_ == 0))
90 base_=0;//actually the address will be given in the RMA calls, as being the disp.
// MPI_Win_detach (dynamic windows); body elided from this view.
95 int Win::detach (void *base){
// MPI_Win_get_name: copies the stored name (and its length, excluding the
// terminator) into caller buffers. The unnamed-window early-out is elided.
// NOTE(review): strncpy with *length+1 copies the terminator too, but only
// because length was just set to strlen(name_) — caller's buffer must hold
// at least MPI_MAX_OBJECT_NAME chars per the MPI standard; TODO confirm.
101 void Win::get_name(char* name, int* length){
107 *length = strlen(name_);
108 strncpy(name, name_, *length+1);
// MPI_Win_get_group: hands back the communicator's group, or MPI_GROUP_NULL
// if the window's communicator is gone.
111 void Win::get_group(MPI_Group* group){
112 if(comm_ != MPI_COMM_NULL){
113 *group = comm_->group();
115 *group = MPI_GROUP_NULL;
// MPI_Win_get_info accessor: lazily creates the info object on first use
// (creation branch elided from this view).
119 MPI_Info Win::info(){
120 if(info_== MPI_INFO_NULL)
// Window size in bytes, as given at creation (body elided).
130 MPI_Aint Win::size(){
// Displacement unit used to scale target displacements (body elided).
138 int Win::disp_unit(){
// MPI_Win_set_info: replaces the stored info object; the old one is
// presumably released first (branch body elided — TODO confirm).
146 void Win::set_info(MPI_Info info){
147 if(info_!= MPI_INFO_NULL)
// MPI_Win_set_name: stores a private copy of the name (freed in the dtor).
// NOTE(review): a previously-set name_ is not freed on this visible line —
// whether that leak is handled on an elided line should be confirmed.
152 void Win::set_name(char* name){
153 name_ = xbt_strdup(name);
// MPI_Win_fence: collective synchronization closing the previous RMA epoch
// and opening the next. Unless MPI_MODE_NOPRECEDE asserts there is nothing
// to finish, all ranks barrier, then each rank completes every request that
// peers queued on it. A closing barrier separates this epoch from the next.
156 int Win::fence(int assert)
158 XBT_DEBUG("Entering fence");
161 if (assert != MPI_MODE_NOPRECEDE) {
162 // This is not the first fence => finalize what came before
163 MSG_barrier_wait(bar_);
164 xbt_mutex_acquire(mut_);
165 // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
166 // Without this, the vector could get redimensionned when another process pushes.
167 // This would result in the array used by Request::waitall() to be invalidated.
168 // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
169 std::vector<MPI_Request> *reqs = requests_;
170 int size = static_cast<int>(reqs->size());
171 // start all requests that have been prepared by another process
173 MPI_Request* treqs = &(*reqs)[0];
174 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
177 xbt_mutex_release(mut_);
180 if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
184 MSG_barrier_wait(bar_);
185 XBT_DEBUG("Leaving fence");
// MPI_Put (and MPI_Rput when request != nullptr): copies origin data into
// target_rank's window. Outside an active epoch (opened_ == 0) the target
// must be locked by us (lock check; failure path elided). Remote targets get
// a matched rma_send/rma_recv request pair, each queued on the respective
// window's request vector under its mutex; the local case is a direct
// datatype copy.
190 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
191 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
193 //get receiver pointer
194 MPI_Win recv_win = connected_wins_[target_rank];
196 if(opened_==0){//check that post/start has been done
197 // no fence or start .. lock ok ?
199 for (auto const& it : recv_win->lockers_)
200 if (it == comm_->rank())
// Bounds check: transfer must fit inside the target window (error elided).
206 if(target_count*target_datatype->get_extent()>recv_win->size_)
209 void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
211 if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
212 XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
213 // prepare send_request
215 // TODO cheinrich Check for rank / pid conversion
// Assignment of the result (sreq, used below) is on an elided line.
216 Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
219 //prepare receiver request
220 // TODO cheinrich Check for rank / pid conversion
221 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
222 target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
// Rput: hand the send request back to the caller; otherwise track it
// locally so fence/flush can complete it.
227 if(request!=nullptr){
230 xbt_mutex_acquire(mut_);
231 requests_->push_back(sreq);
232 xbt_mutex_release(mut_);
235 //push request to receiver's win
236 xbt_mutex_acquire(recv_win->mut_);
237 recv_win->requests_->push_back(rreq);
239 xbt_mutex_release(recv_win->mut_);
// Local put: plain synchronous copy, no requests needed.
242 XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
243 Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
245 *request = MPI_REQUEST_NULL;
// MPI_Get (and MPI_Rget when request != nullptr): mirror image of put() —
// the send request is queued on the *target's* window (it owns the data),
// the receive request on ours. Same epoch/lock precondition and bounds
// check as put(); local case is a direct datatype copy.
251 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
252 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
255 MPI_Win send_win = connected_wins_[target_rank];
257 if(opened_==0){//check that post/start has been done
258 // no fence or start .. lock ok ?
260 for (auto const& it : send_win->lockers_)
261 if (it == comm_->rank())
// Bounds check against the source window (error path elided).
267 if(target_count*target_datatype->get_extent()>send_win->size_)
270 void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
271 XBT_DEBUG("Entering MPI_Get from %d", target_rank);
273 if(target_rank != comm_->rank()){
274 //prepare send_request
275 MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
276 send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
278 //prepare receiver request
279 MPI_Request rreq = Request::rma_recv_init(
280 origin_addr, origin_count, origin_datatype, target_rank,
281 comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
282 SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
284 //start the send, with another process than us as sender.
286 //push request to receiver's win
287 xbt_mutex_acquire(send_win->mut_);
288 send_win->requests_->push_back(sreq);
289 xbt_mutex_release(send_win->mut_);
// Rget: expose the receive request to the caller, else track it locally.
294 if(request!=nullptr){
297 xbt_mutex_acquire(mut_);
298 requests_->push_back(rreq);
299 xbt_mutex_release(mut_);
// Local get: synchronous copy from our own window.
303 Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
305 *request=MPI_REQUEST_NULL;
// MPI_Accumulate (and MPI_Raccumulate when request != nullptr): like put()
// but the receive side applies reduction op instead of overwriting. The tag
// embeds count_ so successive accumulates to the same target stay ordered
// (count_ is presumably incremented per operation on an elided line —
// TODO confirm); SMPI_RMA_TAG - 3 - count_ keeps these tags below every
// other tag SMPI uses.
312 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
313 MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
315 XBT_DEBUG("Entering MPI_Win_Accumulate");
316 //get receiver pointer
317 MPI_Win recv_win = connected_wins_[target_rank];
319 if(opened_==0){//check that post/start has been done
320 // no fence or start .. lock ok ?
322 for (auto const& it : recv_win->lockers_)
323 if (it == comm_->rank())
328 //FIXME: local version
330 if(target_count*target_datatype->get_extent()>recv_win->size_)
333 void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
334 XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
335 //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
336 //prepare send_request
338 MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
339 SMPI_RMA_TAG - 3 - count_, comm_, op);
341 // prepare receiver request
342 MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
343 recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)), SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
349 // push request to receiver's win
350 xbt_mutex_acquire(recv_win->mut_);
351 recv_win->requests_->push_back(rreq);
353 xbt_mutex_release(recv_win->mut_);
// Raccumulate: hand sreq to the caller, else track it locally for
// completion at the next synchronization call.
355 if (request != nullptr) {
358 xbt_mutex_acquire(mut_);
359 requests_->push_back(sreq);
360 xbt_mutex_release(mut_);
363 XBT_DEBUG("Leaving MPI_Win_Accumulate");
// MPI_Get_accumulate: implemented as get() followed by accumulate(), both
// completed immediately, under the target's atomic_mut_ so the read-then-
// update pair is atomic with respect to other accumulating ops on that
// window. Deliberately simple and slow, as the comment below notes.
367 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr,
368 int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count,
369 MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request){
372 MPI_Win send_win = connected_wins_[target_rank];
374 if(opened_==0){//check that post/start has been done
375 // no fence or start .. lock ok ?
377 for (auto const& it : send_win->lockers_)
378 if (it == comm_->rank())
384 if(target_count*target_datatype->get_extent()>send_win->size_)
387 XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
388 //need to be sure ops are correctly ordered, so finish request here ? slow.
// req is declared on an elided line (presumably MPI_REQUEST_NULL-initialized,
// as in compare_and_swap below).
390 xbt_mutex_acquire(send_win->atomic_mut_);
391 get(result_addr, result_count, result_datatype, target_rank,
392 target_disp, target_count, target_datatype, &req);
393 if (req != MPI_REQUEST_NULL)
394 Request::wait(&req, MPI_STATUS_IGNORE);
396 accumulate(origin_addr, origin_count, origin_datatype, target_rank,
397 target_disp, target_count, target_datatype, op, &req);
398 if (req != MPI_REQUEST_NULL)
399 Request::wait(&req, MPI_STATUS_IGNORE);
400 xbt_mutex_release(send_win->atomic_mut_);
// MPI_Compare_and_swap: fetch the target element into result_addr; if it
// byte-equals *compare_addr, write *origin_addr back. The whole read-
// compare-write runs under the target's atomic_mut_ to make it atomic
// against other atomic RMA ops on that window.
405 int Win::compare_and_swap(void *origin_addr, void *compare_addr,
406 void *result_addr, MPI_Datatype datatype, int target_rank,
407 MPI_Aint target_disp){
409 MPI_Win send_win = connected_wins_[target_rank];
411 if(opened_==0){//check that post/start has been done
412 // no fence or start .. lock ok ?
414 for (auto const& it : send_win->lockers_)
415 if (it == comm_->rank())
421 XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
422 MPI_Request req = MPI_REQUEST_NULL;
423 xbt_mutex_acquire(send_win->atomic_mut_);
424 get(result_addr, 1, datatype, target_rank,
425 target_disp, 1, datatype, &req);
426 if (req != MPI_REQUEST_NULL)
427 Request::wait(&req, MPI_STATUS_IGNORE);
// memcmp over the datatype extent: byte-wise equality, as CAS requires.
428 if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
429 put(origin_addr, 1, datatype, target_rank,
430 target_disp, 1, datatype);
432 xbt_mutex_release(send_win->atomic_mut_);
// MPI_Win_start: opens an access epoch toward `group`. Naive blocking
// scheme: post a zero-byte receive (tag SMPI_RMA_TAG+4) from every member
// of the group and wait for all of them — i.e. block until each target has
// called MPI_Win_post. Loop variables i/j are declared on elided lines.
436 int Win::start(MPI_Group group, int assert){
437 /* From MPI forum advices
438 The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
439 will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
440 the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
441 matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
442 MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
443 implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
444 to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
445 called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
446 origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
447 must complete, without further dependencies. */
449 //naive, blocking implementation.
452 int size = group->size();
453 MPI_Request* reqs = xbt_new0(MPI_Request, size);
455 XBT_DEBUG("Entering MPI_Win_Start");
// Translate each group member to its rank in our communicator; skip self.
457 int src = comm_->group()->rank(group->actor(j));
458 if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
459 reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
465 Request::startall(size, reqs);
466 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
467 for (i = 0; i < size; i++) {
468 Request::unref(&reqs[i]);
// opened_ counts nested/active epochs; RMA calls check it is non-zero.
471 opened_++; //we're open for business !
474 XBT_DEBUG("Leaving MPI_Win_Start");
// MPI_Win_post: opens an exposure epoch toward `group`. Counterpart of
// start(): sends a zero-byte message (tag SMPI_RMA_TAG+4) to every group
// member so their MPI_Win_start can unblock. Loop variables are declared
// on elided lines.
478 int Win::post(MPI_Group group, int assert){
479 //let's make a synchronous send here
482 int size = group->size();
483 MPI_Request* reqs = xbt_new0(MPI_Request, size);
485 XBT_DEBUG("Entering MPI_Win_Post");
487 int dst = comm_->group()->rank(group->actor(j));
488 if (dst != rank_ && dst != MPI_UNDEFINED) {
489 reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
496 Request::startall(size, reqs);
497 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
499 Request::unref(&reqs[i]);
// Mark the exposure epoch as open so incoming RMA passes the opened_ check.
502 opened_++; //we're open for business !
505 XBT_DEBUG("Leaving MPI_Win_Post");
// MPI_Win_complete body (signature line elided): closes the access epoch
// opened by start(). Notifies every member of group_ with a zero-byte send
// (tag SMPI_RMA_TAG+5, matched by wait() below), finishes all local RMA
// requests, then releases the group and decrements the epoch counter.
511 xbt_die("Complete called on already opened MPI_Win");
513 XBT_DEBUG("Entering MPI_Win_Complete");
516 int size = group_->size();
517 MPI_Request* reqs = xbt_new0(MPI_Request, size);
520 int dst = comm_->group()->rank(group_->actor(j));
521 if (dst != rank_ && dst != MPI_UNDEFINED) {
522 reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
528 XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
529 Request::startall(size, reqs);
530 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
533 Request::unref(&reqs[i]);
// All our outgoing RMA must be done before complete() may return.
537 int finished = finish_comms();
538 XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
540 Group::unref(group_);
541 opened_--; //we're closed for business !
// MPI_Win_wait body (signature line elided): closes the exposure epoch
// opened by post(). Blocks on a zero-byte receive (tag SMPI_RMA_TAG+5)
// from every member of group_ — i.e. until each origin called complete() —
// then finishes pending RMA targeted at us and closes the epoch.
546 //naive, blocking implementation.
547 XBT_DEBUG("Entering MPI_Win_Wait");
550 int size = group_->size();
551 MPI_Request* reqs = xbt_new0(MPI_Request, size);
554 int src = comm_->group()->rank(group_->actor(j));
555 if (src != rank_ && src != MPI_UNDEFINED) {
556 reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
562 XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
563 Request::startall(size, reqs);
564 Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
566 Request::unref(&reqs[i]);
569 int finished = finish_comms();
570 XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
572 Group::unref(group_);
573 opened_--; //we're opened for business !
// MPI_Win_lock: passive-target lock on rank's window. An exclusive request,
// or any request while the window is exclusively held, must acquire the
// target's lock_mut_; a shared lock releases it again immediately so other
// shared lockers can proceed. mode_ accumulates lock types so unlock() can
// tell whether lock_mut_ is still held. We then register as a locker and
// flush pending RMA in both directions.
577 int Win::lock(int lock_type, int rank, int assert){
578 MPI_Win target_win = connected_wins_[rank];
580 if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
581 xbt_mutex_acquire(target_win->lock_mut_);
582 target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
583 if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
584 xbt_mutex_release(target_win->lock_mut_);
586 } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
587 target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
// Record our rank so put/get/accumulate pass their lock check on the target.
589 target_win->lockers_.push_back(comm_->rank());
591 int finished = finish_comms(rank);
592 XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
593 finished = target_win->finish_comms(rank_);
594 XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
// MPI_Win_lock_all: shared-lock every rank's window in turn, returning the
// first failure (error propagation partially elided).
598 int Win::lock_all(int assert){
600 int retval = MPI_SUCCESS;
601 for (i=0; i<comm_->size();i++){
602 int ret = this->lock(MPI_LOCK_SHARED, i, assert);
603 if(ret != MPI_SUCCESS)
// MPI_Win_unlock: clears the target's lock state and removes us from its
// lockers list; only a pure-exclusive hold still owns lock_mut_ and must
// release it (see lock(): an exclusive→shared transition already released
// it). Finishes pending RMA on both sides, as MPI requires at unlock.
609 int Win::unlock(int rank){
610 MPI_Win target_win = connected_wins_[rank];
611 int target_mode = target_win->mode_;
612 target_win->mode_= 0;
613 target_win->lockers_.remove(comm_->rank());
614 if (target_mode==MPI_LOCK_EXCLUSIVE){
615 xbt_mutex_release(target_win->lock_mut_);
618 int finished = finish_comms(rank);
619 XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
620 finished = target_win->finish_comms(rank_);
621 XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
// MPI_Win_unlock_all: unlock every rank in turn, returning the first
// failure (error propagation partially elided).
625 int Win::unlock_all(){
627 int retval = MPI_SUCCESS;
628 for (i=0; i<comm_->size();i++){
629 int ret = this->unlock(i);
630 if(ret != MPI_SUCCESS)
// MPI_Win_flush: completes all outstanding RMA with `rank` — our requests
// involving ourselves, then the target window's requests involving us.
// NOTE(review): the first call passes rank_ (our own rank), not `rank`;
// this mirrors finish_comms' per-peer filtering but looks asymmetric with
// flush_local below — worth confirming against upstream.
636 int Win::flush(int rank){
637 MPI_Win target_win = connected_wins_[rank];
638 int finished = finish_comms(rank_);
639 XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
640 finished = target_win->finish_comms(rank);
641 XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
// MPI_Win_flush_local: completes our locally-queued RMA requests that
// involve `rank` (local completion only).
645 int Win::flush_local(int rank){
646 int finished = finish_comms(rank);
647 XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
// MPI_Win_flush_all: finish every local request, then ask each peer window
// to finish the requests that involve us. Declarations of i/finished are
// on elided lines.
651 int Win::flush_all(){
654 finished = finish_comms();
655 XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
656 for (i=0; i<comm_->size();i++){
657 finished = connected_wins_[i]->finish_comms(rank_);
658 XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
// MPI_Win_flush_local_all: local completion of every queued RMA request.
663 int Win::flush_local_all(){
664 int finished = finish_comms();
665 XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
// Fortran-to-C handle translation: resolve an integer handle to a Win*
// via the shared F2C registry.
669 Win* Win::f2c(int id){
670 return static_cast<Win*>(F2C::f2c(id));
// Waits on every request queued on this window, under mut_ to keep peers
// from growing the vector mid-waitall (see the comment in fence()).
// Returns the number of requests completed; the vector cleanup and return
// statement sit on elided lines.
674 int Win::finish_comms(){
675 xbt_mutex_acquire(mut_);
676 //Finish own requests
677 std::vector<MPI_Request> *reqqs = requests_;
678 int size = static_cast<int>(reqqs->size());
680 MPI_Request* treqs = &(*reqqs)[0];
681 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
684 xbt_mutex_release(mut_);
// Per-peer variant of finish_comms(): extracts from the queue only the
// requests whose source or destination is `rank` (matched by process id,
// since requests store pids, not communicator ranks), waits on just those,
// and leaves the rest queued. Returns the number completed (return and the
// size recomputation are on elided lines).
688 int Win::finish_comms(int rank){
689 xbt_mutex_acquire(mut_);
690 //Finish own requests
691 std::vector<MPI_Request> *reqqs = requests_;
692 int size = static_cast<int>(reqqs->size());
695 std::vector<MPI_Request> myreqqs;
696 std::vector<MPI_Request>::iterator iter = reqqs->begin();
697 int proc_id = comm_->group()->actor(rank)->getPid();
698 while (iter != reqqs->end()){
699 // Let's see if we're either the destination or the sender of this request
700 // because we only wait for requests that we are responsible for.
701 // Also use the process id here since the request itself returns from src()
702 // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
703 if (((*iter) != MPI_REQUEST_NULL) && (((*iter)->src() == proc_id) || ((*iter)->dst() == proc_id))) {
704 myreqqs.push_back(*iter);
// erase() returns the next iterator, so the matched element is removed
// without invalidating the traversal.
705 iter = reqqs->erase(iter);
712 MPI_Request* treqs = &myreqqs[0];
713 Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
717 xbt_mutex_release(mut_);