Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Handle case where different groups are given to MPI_Win_start and MPI_Win_post on...
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 #include <algorithm>
18
19 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
20
21 #define CHECK_RMA_REMOTE_WIN(fun, win)\
22   if(target_count*target_datatype->get_extent()>win->size_){\
23     XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
24     fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
25     simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
26     return MPI_ERR_RMA_RANGE;\
27   }
28
29 #define CHECK_WIN_LOCKED(win)\
30   if(opened_==0){ /*check that post/start has been done*/\
31     int locked=0;\
32     for (auto const& it : win->lockers_)\
33       if (it == comm_->rank())\
34         locked = 1;\
35     if(locked != 1)\
36       return MPI_ERR_WIN;\
37   }
38
39 namespace simgrid{
40 namespace smpi{
41 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
42 int Win::keyval_id_=0;
43
44 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
45     : base_(base)
46     , size_(size)
47     , disp_unit_(disp_unit)
48     , info_(info)
49     , comm_(comm)
50     , connected_wins_(comm->size())
51     , rank_(comm->rank())
52     , allocated_(allocated)
53     , dynamic_(dynamic)
54 {
55   XBT_DEBUG("Creating window");
56   if(info!=MPI_INFO_NULL)
57     info->ref();
58   connected_wins_[rank_] = this;
59   if(rank_==0){
60     bar_ = new s4u::Barrier(comm->size());
61   }
62   errhandler_->ref();
63   comm->add_rma_win(this);
64   comm->ref();
65
66   colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
67                    MPI_BYTE, comm);
68
69   colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
70
71   colls::barrier(comm);
72   this->add_f();
73 }
74
75 Win::~Win(){
76   //As per the standard, perform a barrier to ensure every async comm is finished
77   bar_->wait();
78
79   int finished = finish_comms();
80   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
81
82   if (info_ != MPI_INFO_NULL)
83     simgrid::smpi::Info::unref(info_);
84   if (errhandler_ != MPI_ERRHANDLER_NULL)
85     simgrid::smpi::Errhandler::unref(errhandler_);
86
87   comm_->remove_rma_win(this);
88
89   colls::barrier(comm_);
90   Comm::unref(comm_);
91   
92   if (rank_ == 0)
93     delete bar_;
94
95   if(allocated_ !=0)
96     xbt_free(base_);
97
98   F2C::free_f(this->f2c_id());
99   cleanup_attr<Win>();
100 }
101
102 int Win::attach(void* /*base*/, MPI_Aint size)
103 {
104   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
105     return MPI_ERR_ARG;
106   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
107   size_+=size;
108   return MPI_SUCCESS;
109 }
110
111 int Win::detach(const void* /*base*/)
112 {
113   base_=MPI_BOTTOM;
114   size_=-1;
115   return MPI_SUCCESS;
116 }
117
118 void Win::get_name(char* name, int* length) const
119 {
120   *length = static_cast<int>(name_.length());
121   if (not name_.empty()) {
122     name_.copy(name, *length);
123     name[*length] = '\0';
124   }
125 }
126
127 void Win::get_group(MPI_Group* group){
128   if(comm_ != MPI_COMM_NULL){
129     *group = comm_->group();
130   } else {
131     *group = MPI_GROUP_NULL;
132   }
133 }
134
135 MPI_Info Win::info()
136 {
137   if (info_ == MPI_INFO_NULL)
138     info_ = new Info();
139   info_->ref();
140   return info_;
141 }
142
143 int Win::rank() const
144 {
145   return rank_;
146 }
147
148 MPI_Comm Win::comm() const
149 {
150   return comm_;
151 }
152
153 MPI_Aint Win::size() const
154 {
155   return size_;
156 }
157
158 void* Win::base() const
159 {
160   return base_;
161 }
162
163 int Win::disp_unit() const
164 {
165   return disp_unit_;
166 }
167
168 int Win::dynamic() const
169 {
170   return dynamic_;
171 }
172
173 void Win::set_info(MPI_Info info)
174 {
175   if (info_ != MPI_INFO_NULL)
176     simgrid::smpi::Info::unref(info_);
177   info_ = info;
178   if (info_ != MPI_INFO_NULL)
179     info_->ref();
180 }
181
182 void Win::set_name(const char* name){
183   name_ = name;
184 }
185
186 int Win::fence(int assert)
187 {
188   XBT_DEBUG("Entering fence");
189   if (opened_ == 0)
190     opened_=1;
191   if (assert != MPI_MODE_NOPRECEDE) {
192     // This is not the first fence => finalize what came before
193     bar_->wait();
194     mut_->lock();
195     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
196     // Without this, the vector could get redimensioned when another process pushes.
197     // This would result in the array used by Request::waitall() to be invalidated.
198     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
199
200     // start all requests that have been prepared by another process
201     if (not requests_.empty()) {
202       int size           = static_cast<int>(requests_.size());
203       MPI_Request* treqs = requests_.data();
204       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
205     }
206     count_=0;
207     mut_->unlock();
208   }
209
210   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
211     opened_=0;
212   assert_ = assert;
213
214   bar_->wait();
215   XBT_DEBUG("Leaving fence");
216
217   return MPI_SUCCESS;
218 }
219
220 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
221               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
222 {
223   //get receiver pointer
224   Win* recv_win = connected_wins_[target_rank];
225
226   if(opened_==0){//check that post/start has been done
227     // no fence or start .. lock ok ?
228     int locked=0;
229     for (auto const& it : recv_win->lockers_)
230       if (it == comm_->rank())
231         locked = 1;
232     if(locked != 1)
233       return MPI_ERR_WIN;
234   }
235
236   CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)
237
238   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
239
240   if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
241     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
242     // prepare send_request
243     MPI_Request sreq =
244         // TODO cheinrich Check for rank / pid conversion
245         Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
246                                comm_, MPI_OP_NULL);
247
248     //prepare receiver request
249     // TODO cheinrich Check for rank / pid conversion
250     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
251                                               target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
252
253     //start send
254     sreq->start();
255
256     if(request!=nullptr){
257       *request=sreq;
258     }else{
259       mut_->lock();
260       requests_.push_back(sreq);
261       mut_->unlock();
262     }
263
264     //push request to receiver's win
265     recv_win->mut_->lock();
266     recv_win->requests_.push_back(rreq);
267     rreq->start();
268     recv_win->mut_->unlock();
269   } else {
270     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
271     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
272     if(request!=nullptr)
273       *request = MPI_REQUEST_NULL;
274   }
275
276   return MPI_SUCCESS;
277 }
278
279 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
280               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
281 {
282   //get sender pointer
283   Win* send_win = connected_wins_[target_rank];
284
285   CHECK_WIN_LOCKED(send_win)
286   CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)
287
288   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
289   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
290
291   if(target_rank != comm_->rank()){
292     //prepare send_request
293     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
294                                               send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
295
296     //prepare receiver request
297     MPI_Request rreq = Request::rma_recv_init(
298         origin_addr, origin_count, origin_datatype, target_rank,
299         comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
300         SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
301
302     //start the send, with another process than us as sender.
303     sreq->start();
304     //push request to receiver's win
305     send_win->mut_->lock();
306     send_win->requests_.push_back(sreq);
307     send_win->mut_->unlock();
308
309     //start recv
310     rreq->start();
311
312     if(request!=nullptr){
313       *request=rreq;
314     }else{
315       mut_->lock();
316       requests_.push_back(rreq);
317       mut_->unlock();
318     }
319   } else {
320     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
321     if(request!=nullptr)
322       *request=MPI_REQUEST_NULL;
323   }
324   return MPI_SUCCESS;
325 }
326
327 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
328               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
329 {
330   XBT_DEBUG("Entering MPI_Win_Accumulate");
331   //get receiver pointer
332   Win* recv_win = connected_wins_[target_rank];
333
334   //FIXME: local version
335   CHECK_WIN_LOCKED(recv_win)
336   CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)
337
338   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
339   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
340   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
341   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
342   // prepare send_request
343
344   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
345                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
346
347   // prepare receiver request
348   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
349                                             recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)),
350                                             SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
351
352   count_++;
353
354   // start send
355   sreq->start();
356   // push request to receiver's win
357   recv_win->mut_->lock();
358   recv_win->requests_.push_back(rreq);
359   rreq->start();
360   recv_win->mut_->unlock();
361
362   if (request != nullptr) {
363     *request = sreq;
364   } else {
365     mut_->lock();
366     requests_.push_back(sreq);
367     mut_->unlock();
368   }
369
370   XBT_DEBUG("Leaving MPI_Win_Accumulate");
371   return MPI_SUCCESS;
372 }
373
374 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
375                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
376                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
377 {
378   //get sender pointer
379   const Win* send_win = connected_wins_[target_rank];
380
381   CHECK_WIN_LOCKED(send_win)
382   CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)
383
384   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
385   //need to be sure ops are correctly ordered, so finish request here ? slow.
386   MPI_Request req;
387   send_win->atomic_mut_->lock();
388   get(result_addr, result_count, result_datatype, target_rank,
389               target_disp, target_count, target_datatype, &req);
390   if (req != MPI_REQUEST_NULL)
391     Request::wait(&req, MPI_STATUS_IGNORE);
392   if(op!=MPI_NO_OP)
393     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
394               target_disp, target_count, target_datatype, op, &req);
395   if (req != MPI_REQUEST_NULL)
396     Request::wait(&req, MPI_STATUS_IGNORE);
397   send_win->atomic_mut_->unlock();
398   return MPI_SUCCESS;
399 }
400
401 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
402                           int target_rank, MPI_Aint target_disp)
403 {
404   //get sender pointer
405   const Win* send_win = connected_wins_[target_rank];
406
407   CHECK_WIN_LOCKED(send_win)
408
409   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
410   MPI_Request req = MPI_REQUEST_NULL;
411   send_win->atomic_mut_->lock();
412   get(result_addr, 1, datatype, target_rank,
413               target_disp, 1, datatype, &req);
414   if (req != MPI_REQUEST_NULL)
415     Request::wait(&req, MPI_STATUS_IGNORE);
416   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
417     put(origin_addr, 1, datatype, target_rank,
418               target_disp, 1, datatype);
419   }
420   send_win->atomic_mut_->unlock();
421   return MPI_SUCCESS;
422 }
423
424 int Win::start(MPI_Group group, int /*assert*/)
425 {
426   /* From MPI forum advices
427   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
428   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
429   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
430   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
431   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
432   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
433   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
434   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
435   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
436   must complete, without further dependencies.  */
437
438   //naive, blocking implementation.
439   int i             = 0;
440   int j             = 0;
441   int size          = group->size();
442   std::vector<MPI_Request> reqs(size);
443
444   XBT_DEBUG("Entering MPI_Win_Start");
445   while (j != size) {
446     int src = comm_->group()->rank(group->actor(j));
447     if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
448       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
449       i++;
450     }
451     j++;
452   }
453   size = i;
454   Request::startall(size, reqs.data());
455   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
456   for (i = 0; i < size; i++) {
457     Request::unref(&reqs[i]);
458   }
459   group->ref();
460   dst_group_ = group;
461   opened_++; // we're open for business !
462   XBT_DEBUG("Leaving MPI_Win_Start");
463   return MPI_SUCCESS;
464 }
465
466 int Win::post(MPI_Group group, int /*assert*/)
467 {
468   //let's make a synchronous send here
469   int i             = 0;
470   int j             = 0;
471   int size = group->size();
472   std::vector<MPI_Request> reqs(size);
473
474   XBT_DEBUG("Entering MPI_Win_Post");
475   while(j!=size){
476     int dst = comm_->group()->rank(group->actor(j));
477     if (dst != rank_ && dst != MPI_UNDEFINED) {
478       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
479       i++;
480     }
481     j++;
482   }
483   size=i;
484
485   Request::startall(size, reqs.data());
486   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
487   for(i=0;i<size;i++){
488     Request::unref(&reqs[i]);
489   }
490   group->ref();
491   src_group_ = group;
492   opened_++; // we're open for business !
493   XBT_DEBUG("Leaving MPI_Win_Post");
494   return MPI_SUCCESS;
495 }
496
497 int Win::complete(){
498   xbt_assert(opened_ != 0, "Complete called on already opened MPI_Win");
499
500   XBT_DEBUG("Entering MPI_Win_Complete");
501   int i             = 0;
502   int j             = 0;
503   int size          = dst_group_->size();
504   std::vector<MPI_Request> reqs(size);
505
506   while(j!=size){
507     int dst = comm_->group()->rank(dst_group_->actor(j));
508     if (dst != rank_ && dst != MPI_UNDEFINED) {
509       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
510       i++;
511     }
512     j++;
513   }
514   size=i;
515   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
516   Request::startall(size, reqs.data());
517   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
518
519   for(i=0;i<size;i++){
520     Request::unref(&reqs[i]);
521   }
522
523   int finished = finish_comms();
524   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
525
526   opened_--; //we're closed for business !
527   Group::unref(dst_group_);
528   dst_group_ = MPI_GROUP_NULL;
529   return MPI_SUCCESS;
530 }
531
532 int Win::wait(){
533   //naive, blocking implementation.
534   XBT_DEBUG("Entering MPI_Win_Wait");
535   int i             = 0;
536   int j             = 0;
537   int size          = src_group_->size();
538   std::vector<MPI_Request> reqs(size);
539
540   while(j!=size){
541     int src = comm_->group()->rank(src_group_->actor(j));
542     if (src != rank_ && src != MPI_UNDEFINED) {
543       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
544       i++;
545     }
546     j++;
547   }
548   size=i;
549   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
550   Request::startall(size, reqs.data());
551   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
552   for(i=0;i<size;i++){
553     Request::unref(&reqs[i]);
554   }
555   int finished = finish_comms();
556   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
557
558   opened_--; //we're closed for business !
559   Group::unref(src_group_);
560   src_group_ = MPI_GROUP_NULL;
561   return MPI_SUCCESS;
562 }
563
564 int Win::lock(int lock_type, int rank, int /*assert*/)
565 {
566   MPI_Win target_win = connected_wins_[rank];
567
568   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
569     target_win->lock_mut_->lock();
570     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
571     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
572       target_win->lock_mut_->unlock();
573    }
574   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
575     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
576
577   target_win->lockers_.push_back(comm_->rank());
578
579   int finished = finish_comms(rank);
580   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
581   finished = target_win->finish_comms(rank_);
582   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
583   return MPI_SUCCESS;
584 }
585
586 int Win::lock_all(int assert){
587   int retval = MPI_SUCCESS;
588   for (int i = 0; i < comm_->size(); i++) {
589     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
590     if (ret != MPI_SUCCESS)
591       retval = ret;
592   }
593   return retval;
594 }
595
596 int Win::unlock(int rank){
597   MPI_Win target_win = connected_wins_[rank];
598   int target_mode = target_win->mode_;
599   target_win->mode_= 0;
600   target_win->lockers_.remove(comm_->rank());
601   if (target_mode==MPI_LOCK_EXCLUSIVE){
602     target_win->lock_mut_->unlock();
603   }
604
605   int finished = finish_comms(rank);
606   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
607   finished = target_win->finish_comms(rank_);
608   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
609   return MPI_SUCCESS;
610 }
611
612 int Win::unlock_all(){
613   int retval = MPI_SUCCESS;
614   for (int i = 0; i < comm_->size(); i++) {
615     int ret = this->unlock(i);
616     if (ret != MPI_SUCCESS)
617       retval = ret;
618   }
619   return retval;
620 }
621
622 int Win::flush(int rank){
623   MPI_Win target_win = connected_wins_[rank];
624   int finished       = finish_comms(rank_);
625   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
626   finished = target_win->finish_comms(rank);
627   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
628   return MPI_SUCCESS;
629 }
630
631 int Win::flush_local(int rank){
632   int finished = finish_comms(rank);
633   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
634   return MPI_SUCCESS;
635 }
636
637 int Win::flush_all(){
638   int finished = finish_comms();
639   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
640   for (int i = 0; i < comm_->size(); i++) {
641     finished = connected_wins_[i]->finish_comms(rank_);
642     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
643   }
644   return MPI_SUCCESS;
645 }
646
647 int Win::flush_local_all(){
648   int finished = finish_comms();
649   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
650   return MPI_SUCCESS;
651 }
652
653 Win* Win::f2c(int id){
654   return static_cast<Win*>(F2C::f2c(id));
655 }
656
657 int Win::finish_comms(){
658   mut_->lock();
659   //Finish own requests
660   int size = static_cast<int>(requests_.size());
661   if (size > 0) {
662     MPI_Request* treqs = requests_.data();
663     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
664     requests_.clear();
665   }
666   mut_->unlock();
667   return size;
668 }
669
670 int Win::finish_comms(int rank){
671   mut_->lock();
672   // Finish own requests
673   // Let's see if we're either the destination or the sender of this request
674   // because we only wait for requests that we are responsible for.
675   // Also use the process id here since the request itself returns from src()
676   // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
677   aid_t proc_id = comm_->group()->actor(rank);
678   auto it     = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
679     return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
680   });
681   std::vector<MPI_Request> myreqqs(it, end(requests_));
682   requests_.erase(it, end(requests_));
683   int size = static_cast<int>(myreqqs.size());
684   if (size > 0) {
685     MPI_Request* treqs = myreqqs.data();
686     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
687     myreqqs.clear();
688   }
689   mut_->unlock();
690   return size;
691 }
692
693 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
694 {
695   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
696   for (int i = 0; not target_win && i < comm_->size(); i++) {
697     if (connected_wins_[i]->size_ > 0)
698       target_win = connected_wins_[i];
699   }
700   if (target_win) {
701     *size                         = target_win->size_;
702     *disp_unit                    = target_win->disp_unit_;
703     *static_cast<void**>(baseptr) = target_win->base_;
704   } else {
705     *size                         = 0;
706     *static_cast<void**>(baseptr) = nullptr;
707   }
708   return MPI_SUCCESS;
709 }
710
711 MPI_Errhandler Win::errhandler()
712 {
713   if (errhandler_ != MPI_ERRHANDLER_NULL)
714     errhandler_->ref();
715   return errhandler_;
716 }
717
718 void Win::set_errhandler(MPI_Errhandler errhandler)
719 {
720   if (errhandler_ != MPI_ERRHANDLER_NULL)
721     simgrid::smpi::Errhandler::unref(errhandler_);
722   errhandler_ = errhandler;
723   if (errhandler_ != MPI_ERRHANDLER_NULL)
724     errhandler_->ref();
725 }
726 } // namespace smpi
727 } // namespace simgrid