Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of https://framagit.org/simgrid/simgrid
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2020. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
18
19
20 namespace simgrid{
21 namespace smpi{
22 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
23 int Win::keyval_id_=0;
24
25 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
26     : base_(base)
27     , size_(size)
28     , disp_unit_(disp_unit)
29     , info_(info)
30     , comm_(comm)
31     , rank_(comm->rank())
32     , allocated_(allocated)
33     , dynamic_(dynamic)
34 {
35   XBT_DEBUG("Creating window");
36   if(info!=MPI_INFO_NULL)
37     info->ref();
38   int comm_size          = comm->size();
39   name_                  = nullptr;
40   opened_                = 0;
41   group_                 = MPI_GROUP_NULL;
42   requests_              = new std::vector<MPI_Request>();
43   mut_                   = s4u::Mutex::create();
44   lock_mut_              = s4u::Mutex::create();
45   atomic_mut_            = s4u::Mutex::create();
46   connected_wins_        = new MPI_Win[comm_size];
47   connected_wins_[rank_] = this;
48   count_                 = 0;
49   if(rank_==0){
50     bar_ = new s4u::Barrier(comm_size);
51   }
52   mode_=0;
53   errhandler_=MPI_ERRORS_ARE_FATAL;
54   errhandler_->ref();
55   comm->add_rma_win(this);
56   comm->ref();
57
58   colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win), MPI_BYTE,
59                    comm);
60
61   colls::bcast(&(bar_), sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
62
63   colls::barrier(comm);
64 }
65
66 Win::~Win(){
67   //As per the standard, perform a barrier to ensure every async comm is finished
68   bar_->wait();
69
70   int finished = finish_comms();
71   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
72
73   delete requests_;
74   delete[] connected_wins_;
75   if (name_ != nullptr){
76     xbt_free(name_);
77   }
78   if (info_ != MPI_INFO_NULL)
79     simgrid::smpi::Info::unref(info_);
80   if (errhandler_ != MPI_ERRHANDLER_NULL)
81     simgrid::smpi::Errhandler::unref(errhandler_);
82
83   comm_->remove_rma_win(this);
84
85   colls::barrier(comm_);
86   Comm::unref(comm_);
87   
88   if (rank_ == 0)
89     delete bar_;
90
91   if(allocated_ !=0)
92     xbt_free(base_);
93
94   cleanup_attr<Win>();
95 }
96
97 int Win::attach(void* /*base*/, MPI_Aint size)
98 {
99   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
100     return MPI_ERR_ARG;
101   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
102   size_+=size;
103   return MPI_SUCCESS;
104 }
105
106 int Win::detach(const void* /*base*/)
107 {
108   base_=MPI_BOTTOM;
109   size_=-1;
110   return MPI_SUCCESS;
111 }
112
113 void Win::get_name(char* name, int* length) const
114 {
115   if(name_==nullptr){
116     *length=0;
117     name=nullptr;
118     return;
119   }
120   *length = strlen(name_);
121   strncpy(name, name_, *length+1);
122 }
123
124 void Win::get_group(MPI_Group* group){
125   if(comm_ != MPI_COMM_NULL){
126     *group = comm_->group();
127   } else {
128     *group = MPI_GROUP_NULL;
129   }
130 }
131
132 MPI_Info Win::info()
133 {
134   if (info_ == MPI_INFO_NULL)
135     info_ = new Info();
136   info_->ref();
137   return info_;
138 }
139
140 int Win::rank() const
141 {
142   return rank_;
143 }
144
145 MPI_Aint Win::size() const
146 {
147   return size_;
148 }
149
150 void* Win::base() const
151 {
152   return base_;
153 }
154
155 int Win::disp_unit() const
156 {
157   return disp_unit_;
158 }
159
160 int Win::dynamic() const
161 {
162   return dynamic_;
163 }
164
165 void Win::set_info(MPI_Info info)
166 {
167   if (info_ != MPI_INFO_NULL)
168     simgrid::smpi::Info::unref(info_);
169   info_ = info;
170   if (info_ != MPI_INFO_NULL)
171     info_->ref();
172 }
173
174 void Win::set_name(const char* name){
175   name_ = xbt_strdup(name);
176 }
177
178 int Win::fence(int assert)
179 {
180   XBT_DEBUG("Entering fence");
181   if (opened_ == 0)
182     opened_=1;
183   if (assert != MPI_MODE_NOPRECEDE) {
184     // This is not the first fence => finalize what came before
185     bar_->wait();
186     mut_->lock();
187     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
188     // Without this, the vector could get redimensioned when another process pushes.
189     // This would result in the array used by Request::waitall() to be invalidated.
190     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
191     std::vector<MPI_Request> *reqs = requests_;
192     int size = static_cast<int>(reqs->size());
193     // start all requests that have been prepared by another process
194     if (size > 0) {
195       MPI_Request* treqs = &(*reqs)[0];
196       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
197     }
198     count_=0;
199     mut_->unlock();
200   }
201
202   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
203     opened_=0;
204   assert_ = assert;
205
206   bar_->wait();
207   XBT_DEBUG("Leaving fence");
208
209   return MPI_SUCCESS;
210 }
211
212 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
213               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
214 {
215   //get receiver pointer
216   const Win* recv_win = connected_wins_[target_rank];
217
218   if(opened_==0){//check that post/start has been done
219     // no fence or start .. lock ok ?
220     int locked=0;
221     for (auto const& it : recv_win->lockers_)
222       if (it == comm_->rank())
223         locked = 1;
224     if(locked != 1)
225       return MPI_ERR_WIN;
226   }
227
228   if(target_count*target_datatype->get_extent()>recv_win->size_)
229     return MPI_ERR_ARG;
230
231   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
232
233   if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
234     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
235     // prepare send_request
236     MPI_Request sreq =
237         // TODO cheinrich Check for rank / pid conversion
238         Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
239                                comm_, MPI_OP_NULL);
240
241     //prepare receiver request
242     // TODO cheinrich Check for rank / pid conversion
243     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
244                                               target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
245
246     //start send
247     sreq->start();
248
249     if(request!=nullptr){
250       *request=sreq;
251     }else{
252       mut_->lock();
253       requests_->push_back(sreq);
254       mut_->unlock();
255     }
256
257     //push request to receiver's win
258     recv_win->mut_->lock();
259     recv_win->requests_->push_back(rreq);
260     rreq->start();
261     recv_win->mut_->unlock();
262   } else {
263     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
264     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
265     if(request!=nullptr)
266       *request = MPI_REQUEST_NULL;
267   }
268
269   return MPI_SUCCESS;
270 }
271
272 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
273               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
274 {
275   //get sender pointer
276   const Win* send_win = connected_wins_[target_rank];
277
278   if(opened_==0){//check that post/start has been done
279     // no fence or start .. lock ok ?
280     int locked=0;
281     for (auto const& it : send_win->lockers_)
282       if (it == comm_->rank())
283         locked = 1;
284     if(locked != 1)
285       return MPI_ERR_WIN;
286   }
287
288   if(target_count*target_datatype->get_extent()>send_win->size_)
289     return MPI_ERR_ARG;
290
291   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
292   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
293
294   if(target_rank != comm_->rank()){
295     //prepare send_request
296     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
297                                               send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
298
299     //prepare receiver request
300     MPI_Request rreq = Request::rma_recv_init(
301         origin_addr, origin_count, origin_datatype, target_rank,
302         comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
303         SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
304
305     //start the send, with another process than us as sender.
306     sreq->start();
307     //push request to receiver's win
308     send_win->mut_->lock();
309     send_win->requests_->push_back(sreq);
310     send_win->mut_->unlock();
311
312     //start recv
313     rreq->start();
314
315     if(request!=nullptr){
316       *request=rreq;
317     }else{
318       mut_->lock();
319       requests_->push_back(rreq);
320       mut_->unlock();
321     }
322   } else {
323     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
324     if(request!=nullptr)
325       *request=MPI_REQUEST_NULL;
326   }
327   return MPI_SUCCESS;
328 }
329
330 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
331               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
332 {
333   XBT_DEBUG("Entering MPI_Win_Accumulate");
334   //get receiver pointer
335   const Win* recv_win = connected_wins_[target_rank];
336
337   if(opened_==0){//check that post/start has been done
338     // no fence or start .. lock ok ?
339     int locked=0;
340     for (auto const& it : recv_win->lockers_)
341       if (it == comm_->rank())
342         locked = 1;
343     if(locked != 1)
344       return MPI_ERR_WIN;
345   }
346   //FIXME: local version
347
348   if(target_count*target_datatype->get_extent()>recv_win->size_)
349     return MPI_ERR_ARG;
350
351   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
352   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
353   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
354   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
355   // prepare send_request
356
357   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
358                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
359
360   // prepare receiver request
361   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
362                                             recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)), SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
363
364   count_++;
365
366   // start send
367   sreq->start();
368   // push request to receiver's win
369   recv_win->mut_->lock();
370   recv_win->requests_->push_back(rreq);
371   rreq->start();
372   recv_win->mut_->unlock();
373
374   if (request != nullptr) {
375     *request = sreq;
376   } else {
377     mut_->lock();
378     requests_->push_back(sreq);
379     mut_->unlock();
380   }
381
382   XBT_DEBUG("Leaving MPI_Win_Accumulate");
383   return MPI_SUCCESS;
384 }
385
386 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
387                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
388                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
389 {
390   //get sender pointer
391   const Win* send_win = connected_wins_[target_rank];
392
393   if(opened_==0){//check that post/start has been done
394     // no fence or start .. lock ok ?
395     int locked=0;
396     for (auto const& it : send_win->lockers_)
397       if (it == comm_->rank())
398         locked = 1;
399     if(locked != 1)
400       return MPI_ERR_WIN;
401   }
402
403   if(target_count*target_datatype->get_extent()>send_win->size_)
404     return MPI_ERR_ARG;
405
406   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
407   //need to be sure ops are correctly ordered, so finish request here ? slow.
408   MPI_Request req;
409   send_win->atomic_mut_->lock();
410   get(result_addr, result_count, result_datatype, target_rank,
411               target_disp, target_count, target_datatype, &req);
412   if (req != MPI_REQUEST_NULL)
413     Request::wait(&req, MPI_STATUS_IGNORE);
414   if(op!=MPI_NO_OP)
415     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
416               target_disp, target_count, target_datatype, op, &req);
417   if (req != MPI_REQUEST_NULL)
418     Request::wait(&req, MPI_STATUS_IGNORE);
419   send_win->atomic_mut_->unlock();
420   return MPI_SUCCESS;
421 }
422
423 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
424                           int target_rank, MPI_Aint target_disp)
425 {
426   //get sender pointer
427   const Win* send_win = connected_wins_[target_rank];
428
429   if(opened_==0){//check that post/start has been done
430     // no fence or start .. lock ok ?
431     int locked=0;
432     for (auto const& it : send_win->lockers_)
433       if (it == comm_->rank())
434         locked = 1;
435     if(locked != 1)
436       return MPI_ERR_WIN;
437   }
438
439   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
440   MPI_Request req = MPI_REQUEST_NULL;
441   send_win->atomic_mut_->lock();
442   get(result_addr, 1, datatype, target_rank,
443               target_disp, 1, datatype, &req);
444   if (req != MPI_REQUEST_NULL)
445     Request::wait(&req, MPI_STATUS_IGNORE);
446   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
447     put(origin_addr, 1, datatype, target_rank,
448               target_disp, 1, datatype);
449   }
450   send_win->atomic_mut_->unlock();
451   return MPI_SUCCESS;
452 }
453
454 int Win::start(MPI_Group group, int /*assert*/)
455 {
456   /* From MPI forum advices
457   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
458   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
459   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
460   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
461   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
462   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
463   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
464   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
465   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
466   must complete, without further dependencies.  */
467
468   //naive, blocking implementation.
469   int i             = 0;
470   int j             = 0;
471   int size          = group->size();
472   std::vector<MPI_Request> reqs(size);
473
474   XBT_DEBUG("Entering MPI_Win_Start");
475   while (j != size) {
476     int src = comm_->group()->rank(group->actor(j));
477     if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
478       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
479       i++;
480     }
481     j++;
482   }
483   size = i;
484   Request::startall(size, reqs.data());
485   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
486   for (i = 0; i < size; i++) {
487     Request::unref(&reqs[i]);
488   }
489   opened_++; //we're open for business !
490   group_=group;
491   group->ref();
492   XBT_DEBUG("Leaving MPI_Win_Start");
493   return MPI_SUCCESS;
494 }
495
496 int Win::post(MPI_Group group, int /*assert*/)
497 {
498   //let's make a synchronous send here
499   int i             = 0;
500   int j             = 0;
501   int size = group->size();
502   std::vector<MPI_Request> reqs(size);
503
504   XBT_DEBUG("Entering MPI_Win_Post");
505   while(j!=size){
506     int dst = comm_->group()->rank(group->actor(j));
507     if (dst != rank_ && dst != MPI_UNDEFINED) {
508       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
509       i++;
510     }
511     j++;
512   }
513   size=i;
514
515   Request::startall(size, reqs.data());
516   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
517   for(i=0;i<size;i++){
518     Request::unref(&reqs[i]);
519   }
520   opened_++; //we're open for business !
521   group_=group;
522   group->ref();
523   XBT_DEBUG("Leaving MPI_Win_Post");
524   return MPI_SUCCESS;
525 }
526
527 int Win::complete(){
528   if(opened_==0)
529     xbt_die("Complete called on already opened MPI_Win");
530
531   XBT_DEBUG("Entering MPI_Win_Complete");
532   int i             = 0;
533   int j             = 0;
534   int size          = group_->size();
535   std::vector<MPI_Request> reqs(size);
536
537   while(j!=size){
538     int dst = comm_->group()->rank(group_->actor(j));
539     if (dst != rank_ && dst != MPI_UNDEFINED) {
540       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
541       i++;
542     }
543     j++;
544   }
545   size=i;
546   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
547   Request::startall(size, reqs.data());
548   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
549
550   for(i=0;i<size;i++){
551     Request::unref(&reqs[i]);
552   }
553
554   int finished = finish_comms();
555   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
556
557   Group::unref(group_);
558   opened_--; //we're closed for business !
559   return MPI_SUCCESS;
560 }
561
562 int Win::wait(){
563   //naive, blocking implementation.
564   XBT_DEBUG("Entering MPI_Win_Wait");
565   int i             = 0;
566   int j             = 0;
567   int size          = group_->size();
568   std::vector<MPI_Request> reqs(size);
569
570   while(j!=size){
571     int src = comm_->group()->rank(group_->actor(j));
572     if (src != rank_ && src != MPI_UNDEFINED) {
573       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
574       i++;
575     }
576     j++;
577   }
578   size=i;
579   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
580   Request::startall(size, reqs.data());
581   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
582   for(i=0;i<size;i++){
583     Request::unref(&reqs[i]);
584   }
585   int finished = finish_comms();
586   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
587
588   Group::unref(group_);
589   opened_--; //we're opened for business !
590   return MPI_SUCCESS;
591 }
592
593 int Win::lock(int lock_type, int rank, int /*assert*/)
594 {
595   MPI_Win target_win = connected_wins_[rank];
596
597   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
598     target_win->lock_mut_->lock();
599     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
600     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
601       target_win->lock_mut_->unlock();
602    }
603   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
604     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
605
606   target_win->lockers_.push_back(comm_->rank());
607
608   int finished = finish_comms(rank);
609   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
610   finished = target_win->finish_comms(rank_);
611   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
612   return MPI_SUCCESS;
613 }
614
615 int Win::lock_all(int assert){
616   int retval = MPI_SUCCESS;
617   for (int i = 0; i < comm_->size(); i++) {
618     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
619     if (ret != MPI_SUCCESS)
620       retval = ret;
621   }
622   return retval;
623 }
624
625 int Win::unlock(int rank){
626   MPI_Win target_win = connected_wins_[rank];
627   int target_mode = target_win->mode_;
628   target_win->mode_= 0;
629   target_win->lockers_.remove(comm_->rank());
630   if (target_mode==MPI_LOCK_EXCLUSIVE){
631     target_win->lock_mut_->unlock();
632   }
633
634   int finished = finish_comms(rank);
635   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
636   finished = target_win->finish_comms(rank_);
637   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
638   return MPI_SUCCESS;
639 }
640
641 int Win::unlock_all(){
642   int retval = MPI_SUCCESS;
643   for (int i = 0; i < comm_->size(); i++) {
644     int ret = this->unlock(i);
645     if (ret != MPI_SUCCESS)
646       retval = ret;
647   }
648   return retval;
649 }
650
651 int Win::flush(int rank){
652   MPI_Win target_win = connected_wins_[rank];
653   int finished       = finish_comms(rank_);
654   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
655   finished = target_win->finish_comms(rank);
656   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
657   return MPI_SUCCESS;
658 }
659
660 int Win::flush_local(int rank){
661   int finished = finish_comms(rank);
662   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
663   return MPI_SUCCESS;
664 }
665
666 int Win::flush_all(){
667   int finished = finish_comms();
668   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
669   for (int i = 0; i < comm_->size(); i++) {
670     finished = connected_wins_[i]->finish_comms(rank_);
671     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
672   }
673   return MPI_SUCCESS;
674 }
675
676 int Win::flush_local_all(){
677   int finished = finish_comms();
678   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
679   return MPI_SUCCESS;
680 }
681
682 Win* Win::f2c(int id){
683   return static_cast<Win*>(F2C::f2c(id));
684 }
685
686 int Win::finish_comms(){
687   mut_->lock();
688   //Finish own requests
689   std::vector<MPI_Request> *reqqs = requests_;
690   int size = static_cast<int>(reqqs->size());
691   if (size > 0) {
692     MPI_Request* treqs = &(*reqqs)[0];
693     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
694     reqqs->clear();
695   }
696   mut_->unlock();
697   return size;
698 }
699
700 int Win::finish_comms(int rank){
701   mut_->lock();
702   //Finish own requests
703   std::vector<MPI_Request> *reqqs = requests_;
704   int size = static_cast<int>(reqqs->size());
705   if (size > 0) {
706     size = 0;
707     std::vector<MPI_Request> myreqqs;
708     auto iter                               = reqqs->begin();
709     int proc_id                             = comm_->group()->actor(rank)->get_pid();
710     while (iter != reqqs->end()){
711       // Let's see if we're either the destination or the sender of this request
712       // because we only wait for requests that we are responsible for.
713       // Also use the process id here since the request itself returns from src()
714       // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
715       if (((*iter) != MPI_REQUEST_NULL) && (((*iter)->src() == proc_id) || ((*iter)->dst() == proc_id))) {
716         myreqqs.push_back(*iter);
717         iter = reqqs->erase(iter);
718         size++;
719       } else {
720         ++iter;
721       }
722     }
723     if(size >0){
724       MPI_Request* treqs = &myreqqs[0];
725       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
726       myreqqs.clear();
727     }
728   }
729   mut_->unlock();
730   return size;
731 }
732
733 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
734 {
735   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
736   for (int i = 0; not target_win && i < comm_->size(); i++) {
737     if (connected_wins_[i]->size_ > 0)
738       target_win = connected_wins_[i];
739   }
740   if (target_win) {
741     *size                         = target_win->size_;
742     *disp_unit                    = target_win->disp_unit_;
743     *static_cast<void**>(baseptr) = target_win->base_;
744   } else {
745     *size                         = 0;
746     *static_cast<void**>(baseptr) = nullptr;
747   }
748   return MPI_SUCCESS;
749 }
750
751 MPI_Errhandler Win::errhandler()
752 {
753   if (errhandler_ != MPI_ERRHANDLER_NULL)
754     errhandler_->ref();
755   return errhandler_;
756 }
757
758 void Win::set_errhandler(MPI_Errhandler errhandler)
759 {
760   if (errhandler_ != MPI_ERRHANDLER_NULL)
761     simgrid::smpi::Errhandler::unref(errhandler_);
762   errhandler_ = errhandler;
763   if (errhandler_ != MPI_ERRHANDLER_NULL)
764     errhandler_->ref();
765 }
766 } // namespace smpi
767 } // namespace simgrid