Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Change "if(...) xbt_die(...)" to "xbt_assert(...)".
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 #include <algorithm>
18
19 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
20
21
22 namespace simgrid{
23 namespace smpi{
24 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
25 int Win::keyval_id_=0;
26
27 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
28     : base_(base)
29     , size_(size)
30     , disp_unit_(disp_unit)
31     , info_(info)
32     , comm_(comm)
33     , connected_wins_(comm->size())
34     , rank_(comm->rank())
35     , allocated_(allocated)
36     , dynamic_(dynamic)
37 {
38   XBT_DEBUG("Creating window");
39   if(info!=MPI_INFO_NULL)
40     info->ref();
41   connected_wins_[rank_] = this;
42   if(rank_==0){
43     bar_ = new s4u::Barrier(comm->size());
44   }
45   errhandler_->ref();
46   comm->add_rma_win(this);
47   comm->ref();
48
49   colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
50                    MPI_BYTE, comm);
51
52   colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
53
54   colls::barrier(comm);
55   this->add_f();
56 }
57
58 Win::~Win(){
59   //As per the standard, perform a barrier to ensure every async comm is finished
60   bar_->wait();
61
62   int finished = finish_comms();
63   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
64
65   if (info_ != MPI_INFO_NULL)
66     simgrid::smpi::Info::unref(info_);
67   if (errhandler_ != MPI_ERRHANDLER_NULL)
68     simgrid::smpi::Errhandler::unref(errhandler_);
69
70   comm_->remove_rma_win(this);
71
72   colls::barrier(comm_);
73   Comm::unref(comm_);
74   
75   if (rank_ == 0)
76     delete bar_;
77
78   if(allocated_ !=0)
79     xbt_free(base_);
80
81   F2C::free_f(this->c2f());
82   cleanup_attr<Win>();
83 }
84
85 int Win::attach(void* /*base*/, MPI_Aint size)
86 {
87   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
88     return MPI_ERR_ARG;
89   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
90   size_+=size;
91   return MPI_SUCCESS;
92 }
93
94 int Win::detach(const void* /*base*/)
95 {
96   base_=MPI_BOTTOM;
97   size_=-1;
98   return MPI_SUCCESS;
99 }
100
101 void Win::get_name(char* name, int* length) const
102 {
103   *length = static_cast<int>(name_.length());
104   if (not name_.empty()) {
105     name_.copy(name, *length);
106     name[*length] = '\0';
107   }
108 }
109
110 void Win::get_group(MPI_Group* group){
111   if(comm_ != MPI_COMM_NULL){
112     *group = comm_->group();
113   } else {
114     *group = MPI_GROUP_NULL;
115   }
116 }
117
118 MPI_Info Win::info()
119 {
120   if (info_ == MPI_INFO_NULL)
121     info_ = new Info();
122   info_->ref();
123   return info_;
124 }
125
126 int Win::rank() const
127 {
128   return rank_;
129 }
130
131 MPI_Aint Win::size() const
132 {
133   return size_;
134 }
135
136 void* Win::base() const
137 {
138   return base_;
139 }
140
141 int Win::disp_unit() const
142 {
143   return disp_unit_;
144 }
145
146 int Win::dynamic() const
147 {
148   return dynamic_;
149 }
150
151 void Win::set_info(MPI_Info info)
152 {
153   if (info_ != MPI_INFO_NULL)
154     simgrid::smpi::Info::unref(info_);
155   info_ = info;
156   if (info_ != MPI_INFO_NULL)
157     info_->ref();
158 }
159
160 void Win::set_name(const char* name){
161   name_ = name;
162 }
163
164 int Win::fence(int assert)
165 {
166   XBT_DEBUG("Entering fence");
167   if (opened_ == 0)
168     opened_=1;
169   if (assert != MPI_MODE_NOPRECEDE) {
170     // This is not the first fence => finalize what came before
171     bar_->wait();
172     mut_->lock();
173     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
174     // Without this, the vector could get redimensioned when another process pushes.
175     // This would result in the array used by Request::waitall() to be invalidated.
176     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
177
178     // start all requests that have been prepared by another process
179     if (not requests_.empty()) {
180       int size           = static_cast<int>(requests_.size());
181       MPI_Request* treqs = requests_.data();
182       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
183     }
184     count_=0;
185     mut_->unlock();
186   }
187
188   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
189     opened_=0;
190   assert_ = assert;
191
192   bar_->wait();
193   XBT_DEBUG("Leaving fence");
194
195   return MPI_SUCCESS;
196 }
197
198 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
199               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
200 {
201   //get receiver pointer
202   Win* recv_win = connected_wins_[target_rank];
203
204   if(opened_==0){//check that post/start has been done
205     // no fence or start .. lock ok ?
206     int locked=0;
207     for (auto const& it : recv_win->lockers_)
208       if (it == comm_->rank())
209         locked = 1;
210     if(locked != 1)
211       return MPI_ERR_WIN;
212   }
213
214   if(target_count*target_datatype->get_extent()>recv_win->size_){
215     XBT_WARN("Trying to put more than the window size - Bailing out.");
216     return MPI_ERR_RMA_RANGE;
217   }
218
219   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
220
221   if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
222     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
223     // prepare send_request
224     MPI_Request sreq =
225         // TODO cheinrich Check for rank / pid conversion
226         Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
227                                comm_, MPI_OP_NULL);
228
229     //prepare receiver request
230     // TODO cheinrich Check for rank / pid conversion
231     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
232                                               target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
233
234     //start send
235     sreq->start();
236
237     if(request!=nullptr){
238       *request=sreq;
239     }else{
240       mut_->lock();
241       requests_.push_back(sreq);
242       mut_->unlock();
243     }
244
245     //push request to receiver's win
246     recv_win->mut_->lock();
247     recv_win->requests_.push_back(rreq);
248     rreq->start();
249     recv_win->mut_->unlock();
250   } else {
251     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
252     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
253     if(request!=nullptr)
254       *request = MPI_REQUEST_NULL;
255   }
256
257   return MPI_SUCCESS;
258 }
259
260 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
261               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
262 {
263   //get sender pointer
264   Win* send_win = connected_wins_[target_rank];
265
266   if(opened_==0){//check that post/start has been done
267     // no fence or start .. lock ok ?
268     int locked=0;
269     for (auto const& it : send_win->lockers_)
270       if (it == comm_->rank())
271         locked = 1;
272     if(locked != 1)
273       return MPI_ERR_WIN;
274   }
275
276   if(target_count*target_datatype->get_extent()>send_win->size_){
277     XBT_WARN("Trying to get more than the window size - Bailing out.");
278     return MPI_ERR_RMA_RANGE;
279   }
280
281   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
282   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
283
284   if(target_rank != comm_->rank()){
285     //prepare send_request
286     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
287                                               send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
288
289     //prepare receiver request
290     MPI_Request rreq = Request::rma_recv_init(
291         origin_addr, origin_count, origin_datatype, target_rank,
292         comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
293         SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
294
295     //start the send, with another process than us as sender.
296     sreq->start();
297     //push request to receiver's win
298     send_win->mut_->lock();
299     send_win->requests_.push_back(sreq);
300     send_win->mut_->unlock();
301
302     //start recv
303     rreq->start();
304
305     if(request!=nullptr){
306       *request=rreq;
307     }else{
308       mut_->lock();
309       requests_.push_back(rreq);
310       mut_->unlock();
311     }
312   } else {
313     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
314     if(request!=nullptr)
315       *request=MPI_REQUEST_NULL;
316   }
317   return MPI_SUCCESS;
318 }
319
320 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
321               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
322 {
323   XBT_DEBUG("Entering MPI_Win_Accumulate");
324   //get receiver pointer
325   Win* recv_win = connected_wins_[target_rank];
326
327   if(opened_==0){//check that post/start has been done
328     // no fence or start .. lock ok ?
329     int locked=0;
330     for (auto const& it : recv_win->lockers_)
331       if (it == comm_->rank())
332         locked = 1;
333     if(locked != 1)
334       return MPI_ERR_WIN;
335   }
336   //FIXME: local version
337
338   if(target_count*target_datatype->get_extent()>recv_win->size_){
339     XBT_WARN("Trying to accumulate more than the window size - Bailing out.");
340     return MPI_ERR_RMA_RANGE;
341   }
342
343   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
344   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
345   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
346   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
347   // prepare send_request
348
349   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
350                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
351
352   // prepare receiver request
353   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
354                                             recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)), SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
355
356   count_++;
357
358   // start send
359   sreq->start();
360   // push request to receiver's win
361   recv_win->mut_->lock();
362   recv_win->requests_.push_back(rreq);
363   rreq->start();
364   recv_win->mut_->unlock();
365
366   if (request != nullptr) {
367     *request = sreq;
368   } else {
369     mut_->lock();
370     requests_.push_back(sreq);
371     mut_->unlock();
372   }
373
374   XBT_DEBUG("Leaving MPI_Win_Accumulate");
375   return MPI_SUCCESS;
376 }
377
378 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
379                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
380                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
381 {
382   //get sender pointer
383   const Win* send_win = connected_wins_[target_rank];
384
385   if(opened_==0){//check that post/start has been done
386     // no fence or start .. lock ok ?
387     int locked=0;
388     for (auto const& it : send_win->lockers_)
389       if (it == comm_->rank())
390         locked = 1;
391     if(locked != 1)
392       return MPI_ERR_WIN;
393   }
394
395   if(target_count*target_datatype->get_extent()>send_win->size_){
396     XBT_WARN("Trying to get_accumulate more than the window size - Bailing out.");
397     return MPI_ERR_RMA_RANGE;
398   }
399
400
401   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
402   //need to be sure ops are correctly ordered, so finish request here ? slow.
403   MPI_Request req;
404   send_win->atomic_mut_->lock();
405   get(result_addr, result_count, result_datatype, target_rank,
406               target_disp, target_count, target_datatype, &req);
407   if (req != MPI_REQUEST_NULL)
408     Request::wait(&req, MPI_STATUS_IGNORE);
409   if(op!=MPI_NO_OP)
410     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
411               target_disp, target_count, target_datatype, op, &req);
412   if (req != MPI_REQUEST_NULL)
413     Request::wait(&req, MPI_STATUS_IGNORE);
414   send_win->atomic_mut_->unlock();
415   return MPI_SUCCESS;
416 }
417
418 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
419                           int target_rank, MPI_Aint target_disp)
420 {
421   //get sender pointer
422   const Win* send_win = connected_wins_[target_rank];
423
424   if(opened_==0){//check that post/start has been done
425     // no fence or start .. lock ok ?
426     int locked=0;
427     for (auto const& it : send_win->lockers_)
428       if (it == comm_->rank())
429         locked = 1;
430     if(locked != 1)
431       return MPI_ERR_WIN;
432   }
433
434   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
435   MPI_Request req = MPI_REQUEST_NULL;
436   send_win->atomic_mut_->lock();
437   get(result_addr, 1, datatype, target_rank,
438               target_disp, 1, datatype, &req);
439   if (req != MPI_REQUEST_NULL)
440     Request::wait(&req, MPI_STATUS_IGNORE);
441   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
442     put(origin_addr, 1, datatype, target_rank,
443               target_disp, 1, datatype);
444   }
445   send_win->atomic_mut_->unlock();
446   return MPI_SUCCESS;
447 }
448
449 int Win::start(MPI_Group group, int /*assert*/)
450 {
451   /* From MPI forum advices
452   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
453   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
454   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
455   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
456   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
457   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
458   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
459   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
460   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
461   must complete, without further dependencies.  */
462
463   //naive, blocking implementation.
464   int i             = 0;
465   int j             = 0;
466   int size          = group->size();
467   std::vector<MPI_Request> reqs(size);
468
469   XBT_DEBUG("Entering MPI_Win_Start");
470   while (j != size) {
471     int src = comm_->group()->rank(group->actor(j));
472     if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
473       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
474       i++;
475     }
476     j++;
477   }
478   size = i;
479   Request::startall(size, reqs.data());
480   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
481   for (i = 0; i < size; i++) {
482     Request::unref(&reqs[i]);
483   }
484   opened_++; //we're open for business !
485   group_=group;
486   group->ref();
487   XBT_DEBUG("Leaving MPI_Win_Start");
488   return MPI_SUCCESS;
489 }
490
491 int Win::post(MPI_Group group, int /*assert*/)
492 {
493   //let's make a synchronous send here
494   int i             = 0;
495   int j             = 0;
496   int size = group->size();
497   std::vector<MPI_Request> reqs(size);
498
499   XBT_DEBUG("Entering MPI_Win_Post");
500   while(j!=size){
501     int dst = comm_->group()->rank(group->actor(j));
502     if (dst != rank_ && dst != MPI_UNDEFINED) {
503       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
504       i++;
505     }
506     j++;
507   }
508   size=i;
509
510   Request::startall(size, reqs.data());
511   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
512   for(i=0;i<size;i++){
513     Request::unref(&reqs[i]);
514   }
515   opened_++; //we're open for business !
516   group_=group;
517   group->ref();
518   XBT_DEBUG("Leaving MPI_Win_Post");
519   return MPI_SUCCESS;
520 }
521
522 int Win::complete(){
523   xbt_assert(opened_ != 0, "Complete called on already opened MPI_Win");
524
525   XBT_DEBUG("Entering MPI_Win_Complete");
526   int i             = 0;
527   int j             = 0;
528   int size          = group_->size();
529   std::vector<MPI_Request> reqs(size);
530
531   while(j!=size){
532     int dst = comm_->group()->rank(group_->actor(j));
533     if (dst != rank_ && dst != MPI_UNDEFINED) {
534       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
535       i++;
536     }
537     j++;
538   }
539   size=i;
540   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
541   Request::startall(size, reqs.data());
542   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
543
544   for(i=0;i<size;i++){
545     Request::unref(&reqs[i]);
546   }
547
548   int finished = finish_comms();
549   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
550
551   Group::unref(group_);
552   opened_--; //we're closed for business !
553   return MPI_SUCCESS;
554 }
555
556 int Win::wait(){
557   //naive, blocking implementation.
558   XBT_DEBUG("Entering MPI_Win_Wait");
559   int i             = 0;
560   int j             = 0;
561   int size          = group_->size();
562   std::vector<MPI_Request> reqs(size);
563
564   while(j!=size){
565     int src = comm_->group()->rank(group_->actor(j));
566     if (src != rank_ && src != MPI_UNDEFINED) {
567       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
568       i++;
569     }
570     j++;
571   }
572   size=i;
573   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
574   Request::startall(size, reqs.data());
575   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
576   for(i=0;i<size;i++){
577     Request::unref(&reqs[i]);
578   }
579   int finished = finish_comms();
580   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
581
582   Group::unref(group_);
583   opened_--; //we're opened for business !
584   return MPI_SUCCESS;
585 }
586
587 int Win::lock(int lock_type, int rank, int /*assert*/)
588 {
589   MPI_Win target_win = connected_wins_[rank];
590
591   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
592     target_win->lock_mut_->lock();
593     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
594     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
595       target_win->lock_mut_->unlock();
596    }
597   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
598     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
599
600   target_win->lockers_.push_back(comm_->rank());
601
602   int finished = finish_comms(rank);
603   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
604   finished = target_win->finish_comms(rank_);
605   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
606   return MPI_SUCCESS;
607 }
608
609 int Win::lock_all(int assert){
610   int retval = MPI_SUCCESS;
611   for (int i = 0; i < comm_->size(); i++) {
612     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
613     if (ret != MPI_SUCCESS)
614       retval = ret;
615   }
616   return retval;
617 }
618
619 int Win::unlock(int rank){
620   MPI_Win target_win = connected_wins_[rank];
621   int target_mode = target_win->mode_;
622   target_win->mode_= 0;
623   target_win->lockers_.remove(comm_->rank());
624   if (target_mode==MPI_LOCK_EXCLUSIVE){
625     target_win->lock_mut_->unlock();
626   }
627
628   int finished = finish_comms(rank);
629   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
630   finished = target_win->finish_comms(rank_);
631   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
632   return MPI_SUCCESS;
633 }
634
635 int Win::unlock_all(){
636   int retval = MPI_SUCCESS;
637   for (int i = 0; i < comm_->size(); i++) {
638     int ret = this->unlock(i);
639     if (ret != MPI_SUCCESS)
640       retval = ret;
641   }
642   return retval;
643 }
644
645 int Win::flush(int rank){
646   MPI_Win target_win = connected_wins_[rank];
647   int finished       = finish_comms(rank_);
648   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
649   finished = target_win->finish_comms(rank);
650   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
651   return MPI_SUCCESS;
652 }
653
654 int Win::flush_local(int rank){
655   int finished = finish_comms(rank);
656   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
657   return MPI_SUCCESS;
658 }
659
660 int Win::flush_all(){
661   int finished = finish_comms();
662   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
663   for (int i = 0; i < comm_->size(); i++) {
664     finished = connected_wins_[i]->finish_comms(rank_);
665     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
666   }
667   return MPI_SUCCESS;
668 }
669
670 int Win::flush_local_all(){
671   int finished = finish_comms();
672   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
673   return MPI_SUCCESS;
674 }
675
676 Win* Win::f2c(int id){
677   return static_cast<Win*>(F2C::f2c(id));
678 }
679
680 int Win::finish_comms(){
681   mut_->lock();
682   //Finish own requests
683   int size = static_cast<int>(requests_.size());
684   if (size > 0) {
685     MPI_Request* treqs = requests_.data();
686     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
687     requests_.clear();
688   }
689   mut_->unlock();
690   return size;
691 }
692
693 int Win::finish_comms(int rank){
694   mut_->lock();
695   // Finish own requests
696   // Let's see if we're either the destination or the sender of this request
697   // because we only wait for requests that we are responsible for.
698   // Also use the process id here since the request itself returns from src()
699   // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
700   int proc_id = comm_->group()->actor(rank)->get_pid();
701   auto it     = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
702     return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
703   });
704   std::vector<MPI_Request> myreqqs(it, end(requests_));
705   requests_.erase(it, end(requests_));
706   int size = static_cast<int>(myreqqs.size());
707   if (size > 0) {
708     MPI_Request* treqs = myreqqs.data();
709     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
710     myreqqs.clear();
711   }
712   mut_->unlock();
713   return size;
714 }
715
716 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
717 {
718   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
719   for (int i = 0; not target_win && i < comm_->size(); i++) {
720     if (connected_wins_[i]->size_ > 0)
721       target_win = connected_wins_[i];
722   }
723   if (target_win) {
724     *size                         = target_win->size_;
725     *disp_unit                    = target_win->disp_unit_;
726     *static_cast<void**>(baseptr) = target_win->base_;
727   } else {
728     *size                         = 0;
729     *static_cast<void**>(baseptr) = nullptr;
730   }
731   return MPI_SUCCESS;
732 }
733
734 MPI_Errhandler Win::errhandler()
735 {
736   if (errhandler_ != MPI_ERRHANDLER_NULL)
737     errhandler_->ref();
738   return errhandler_;
739 }
740
741 void Win::set_errhandler(MPI_Errhandler errhandler)
742 {
743   if (errhandler_ != MPI_ERRHANDLER_NULL)
744     simgrid::smpi::Errhandler::unref(errhandler_);
745   errhandler_ = errhandler;
746   if (errhandler_ != MPI_ERRHANDLER_NULL)
747     errhandler_->ref();
748 }
749 } // namespace smpi
750 } // namespace simgrid