Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Some int -> bool conversions (+ use of existing macro).
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 #include <algorithm>
18
19 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
20
21 #define CHECK_RMA_REMOTE_WIN(fun, win)\
22   if(target_count*target_datatype->get_extent()>win->size_){\
23     XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
24     fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
25     simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
26     return MPI_ERR_RMA_RANGE;\
27   }
28
29 #define CHECK_WIN_LOCKED(win)                                                                                          \
30   if (opened_ == 0) { /*check that post/start has been done*/                                                          \
31     bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; }); \
32     if (not locked)                                                                                                    \
33       return MPI_ERR_WIN;                                                                                              \
34   }
35
36 namespace simgrid{
37 namespace smpi{
38 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
39 int Win::keyval_id_=0;
40
41 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
42     : base_(base)
43     , size_(size)
44     , disp_unit_(disp_unit)
45     , info_(info)
46     , comm_(comm)
47     , connected_wins_(comm->size())
48     , rank_(comm->rank())
49     , allocated_(allocated)
50     , dynamic_(dynamic)
51 {
52   XBT_DEBUG("Creating window");
53   if(info!=MPI_INFO_NULL)
54     info->ref();
55   connected_wins_[rank_] = this;
56   if(rank_==0){
57     bar_ = new s4u::Barrier(comm->size());
58   }
59   errhandler_->ref();
60   comm->add_rma_win(this);
61   comm->ref();
62
63   colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
64                    MPI_BYTE, comm);
65
66   colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
67
68   colls::barrier(comm);
69   this->add_f();
70 }
71
72 Win::~Win(){
73   //As per the standard, perform a barrier to ensure every async comm is finished
74   bar_->wait();
75
76   int finished = finish_comms();
77   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
78
79   if (info_ != MPI_INFO_NULL)
80     simgrid::smpi::Info::unref(info_);
81   if (errhandler_ != MPI_ERRHANDLER_NULL)
82     simgrid::smpi::Errhandler::unref(errhandler_);
83
84   comm_->remove_rma_win(this);
85
86   colls::barrier(comm_);
87   Comm::unref(comm_);
88   
89   if (rank_ == 0)
90     delete bar_;
91
92   if (allocated_)
93     xbt_free(base_);
94
95   F2C::free_f(this->f2c_id());
96   cleanup_attr<Win>();
97 }
98
99 int Win::attach(void* /*base*/, MPI_Aint size)
100 {
101   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
102     return MPI_ERR_ARG;
103   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
104   size_+=size;
105   return MPI_SUCCESS;
106 }
107
108 int Win::detach(const void* /*base*/)
109 {
110   base_=MPI_BOTTOM;
111   size_=-1;
112   return MPI_SUCCESS;
113 }
114
115 void Win::get_name(char* name, int* length) const
116 {
117   *length = static_cast<int>(name_.length());
118   if (not name_.empty()) {
119     name_.copy(name, *length);
120     name[*length] = '\0';
121   }
122 }
123
124 void Win::get_group(MPI_Group* group){
125   if(comm_ != MPI_COMM_NULL){
126     *group = comm_->group();
127   } else {
128     *group = MPI_GROUP_NULL;
129   }
130 }
131
132 MPI_Info Win::info()
133 {
134   if (info_ == MPI_INFO_NULL)
135     info_ = new Info();
136   info_->ref();
137   return info_;
138 }
139
140 int Win::rank() const
141 {
142   return rank_;
143 }
144
145 MPI_Comm Win::comm() const
146 {
147   return comm_;
148 }
149
150 MPI_Aint Win::size() const
151 {
152   return size_;
153 }
154
155 void* Win::base() const
156 {
157   return base_;
158 }
159
160 int Win::disp_unit() const
161 {
162   return disp_unit_;
163 }
164
165 bool Win::dynamic() const
166 {
167   return dynamic_;
168 }
169
170 void Win::set_info(MPI_Info info)
171 {
172   if (info_ != MPI_INFO_NULL)
173     simgrid::smpi::Info::unref(info_);
174   info_ = info;
175   if (info_ != MPI_INFO_NULL)
176     info_->ref();
177 }
178
179 void Win::set_name(const char* name){
180   name_ = name;
181 }
182
183 int Win::fence(int assert)
184 {
185   XBT_DEBUG("Entering fence");
186   if (opened_ == 0)
187     opened_=1;
188   if (not (assert & MPI_MODE_NOPRECEDE)) {
189     // This is not the first fence => finalize what came before
190     bar_->wait();
191     mut_->lock();
192     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
193     // Without this, the vector could get redimensioned when another process pushes.
194     // This would result in the array used by Request::waitall() to be invalidated.
195     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
196
197     // start all requests that have been prepared by another process
198     if (not requests_.empty()) {
199       int size           = static_cast<int>(requests_.size());
200       MPI_Request* treqs = requests_.data();
201       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
202     }
203     count_=0;
204     mut_->unlock();
205   }
206
207   if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
208     opened_=0;
209   assert_ = assert;
210
211   bar_->wait();
212   XBT_DEBUG("Leaving fence");
213
214   return MPI_SUCCESS;
215 }
216
217 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
218               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
219 {
220   //get receiver pointer
221   Win* recv_win = connected_wins_[target_rank];
222
223   CHECK_WIN_LOCKED(recv_win)
224   CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)
225
226   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
227
228   if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
229     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
230     // prepare send_request
231     MPI_Request sreq =
232         // TODO cheinrich Check for rank / pid conversion
233         Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
234                                comm_, MPI_OP_NULL);
235
236     //prepare receiver request
237     // TODO cheinrich Check for rank / pid conversion
238     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
239                                               target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
240
241     //start send
242     sreq->start();
243
244     if(request!=nullptr){
245       *request=sreq;
246     }else{
247       mut_->lock();
248       requests_.push_back(sreq);
249       mut_->unlock();
250     }
251
252     //push request to receiver's win
253     recv_win->mut_->lock();
254     recv_win->requests_.push_back(rreq);
255     rreq->start();
256     recv_win->mut_->unlock();
257   } else {
258     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
259     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
260     if(request!=nullptr)
261       *request = MPI_REQUEST_NULL;
262   }
263
264   return MPI_SUCCESS;
265 }
266
267 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
268               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
269 {
270   //get sender pointer
271   Win* send_win = connected_wins_[target_rank];
272
273   CHECK_WIN_LOCKED(send_win)
274   CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)
275
276   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
277   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
278
279   if(target_rank != comm_->rank()){
280     //prepare send_request
281     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
282                                               send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
283
284     //prepare receiver request
285     MPI_Request rreq = Request::rma_recv_init(
286         origin_addr, origin_count, origin_datatype, target_rank,
287         comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
288         SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
289
290     //start the send, with another process than us as sender.
291     sreq->start();
292     //push request to receiver's win
293     send_win->mut_->lock();
294     send_win->requests_.push_back(sreq);
295     send_win->mut_->unlock();
296
297     //start recv
298     rreq->start();
299
300     if(request!=nullptr){
301       *request=rreq;
302     }else{
303       mut_->lock();
304       requests_.push_back(rreq);
305       mut_->unlock();
306     }
307   } else {
308     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
309     if(request!=nullptr)
310       *request=MPI_REQUEST_NULL;
311   }
312   return MPI_SUCCESS;
313 }
314
315 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
316               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
317 {
318   XBT_DEBUG("Entering MPI_Win_Accumulate");
319   //get receiver pointer
320   Win* recv_win = connected_wins_[target_rank];
321
322   //FIXME: local version
323   CHECK_WIN_LOCKED(recv_win)
324   CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)
325
326   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
327   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
328   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
329   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
330   // prepare send_request
331
332   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
333                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
334
335   // prepare receiver request
336   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
337                                             recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)),
338                                             SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
339
340   count_++;
341
342   // start send
343   sreq->start();
344   // push request to receiver's win
345   recv_win->mut_->lock();
346   recv_win->requests_.push_back(rreq);
347   rreq->start();
348   recv_win->mut_->unlock();
349
350   if (request != nullptr) {
351     *request = sreq;
352   } else {
353     mut_->lock();
354     requests_.push_back(sreq);
355     mut_->unlock();
356   }
357
358   XBT_DEBUG("Leaving MPI_Win_Accumulate");
359   return MPI_SUCCESS;
360 }
361
362 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
363                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
364                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
365 {
366   //get sender pointer
367   const Win* send_win = connected_wins_[target_rank];
368
369   CHECK_WIN_LOCKED(send_win)
370   CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)
371
372   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
373   //need to be sure ops are correctly ordered, so finish request here ? slow.
374   MPI_Request req;
375   send_win->atomic_mut_->lock();
376   get(result_addr, result_count, result_datatype, target_rank,
377               target_disp, target_count, target_datatype, &req);
378   if (req != MPI_REQUEST_NULL)
379     Request::wait(&req, MPI_STATUS_IGNORE);
380   if(op!=MPI_NO_OP)
381     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
382               target_disp, target_count, target_datatype, op, &req);
383   if (req != MPI_REQUEST_NULL)
384     Request::wait(&req, MPI_STATUS_IGNORE);
385   send_win->atomic_mut_->unlock();
386   return MPI_SUCCESS;
387 }
388
389 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
390                           int target_rank, MPI_Aint target_disp)
391 {
392   //get sender pointer
393   const Win* send_win = connected_wins_[target_rank];
394
395   CHECK_WIN_LOCKED(send_win)
396
397   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
398   MPI_Request req = MPI_REQUEST_NULL;
399   send_win->atomic_mut_->lock();
400   get(result_addr, 1, datatype, target_rank,
401               target_disp, 1, datatype, &req);
402   if (req != MPI_REQUEST_NULL)
403     Request::wait(&req, MPI_STATUS_IGNORE);
404   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
405     put(origin_addr, 1, datatype, target_rank,
406               target_disp, 1, datatype);
407   }
408   send_win->atomic_mut_->unlock();
409   return MPI_SUCCESS;
410 }
411
412 int Win::start(MPI_Group group, int /*assert*/)
413 {
414   /* From MPI forum advices
415   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
416   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
417   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
418   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
419   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
420   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
421   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
422   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
423   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
424   must complete, without further dependencies.  */
425
426   //naive, blocking implementation.
427   int i             = 0;
428   int j             = 0;
429   int size          = group->size();
430   std::vector<MPI_Request> reqs(size);
431
432   XBT_DEBUG("Entering MPI_Win_Start");
433   while (j != size) {
434     int src = comm_->group()->rank(group->actor(j));
435     if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
436       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
437       i++;
438     }
439     j++;
440   }
441   size = i;
442   Request::startall(size, reqs.data());
443   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
444   for (i = 0; i < size; i++) {
445     Request::unref(&reqs[i]);
446   }
447   group->ref();
448   dst_group_ = group;
449   opened_++; // we're open for business !
450   XBT_DEBUG("Leaving MPI_Win_Start");
451   return MPI_SUCCESS;
452 }
453
454 int Win::post(MPI_Group group, int /*assert*/)
455 {
456   //let's make a synchronous send here
457   int i             = 0;
458   int j             = 0;
459   int size = group->size();
460   std::vector<MPI_Request> reqs(size);
461
462   XBT_DEBUG("Entering MPI_Win_Post");
463   while(j!=size){
464     int dst = comm_->group()->rank(group->actor(j));
465     if (dst != rank_ && dst != MPI_UNDEFINED) {
466       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
467       i++;
468     }
469     j++;
470   }
471   size=i;
472
473   Request::startall(size, reqs.data());
474   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
475   for(i=0;i<size;i++){
476     Request::unref(&reqs[i]);
477   }
478   group->ref();
479   src_group_ = group;
480   opened_++; // we're open for business !
481   XBT_DEBUG("Leaving MPI_Win_Post");
482   return MPI_SUCCESS;
483 }
484
485 int Win::complete(){
486   xbt_assert(opened_ != 0, "Complete called on already opened MPI_Win");
487
488   XBT_DEBUG("Entering MPI_Win_Complete");
489   int i             = 0;
490   int j             = 0;
491   int size          = dst_group_->size();
492   std::vector<MPI_Request> reqs(size);
493
494   while(j!=size){
495     int dst = comm_->group()->rank(dst_group_->actor(j));
496     if (dst != rank_ && dst != MPI_UNDEFINED) {
497       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
498       i++;
499     }
500     j++;
501   }
502   size=i;
503   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
504   Request::startall(size, reqs.data());
505   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
506
507   for(i=0;i<size;i++){
508     Request::unref(&reqs[i]);
509   }
510
511   int finished = finish_comms();
512   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
513
514   opened_--; //we're closed for business !
515   Group::unref(dst_group_);
516   dst_group_ = MPI_GROUP_NULL;
517   return MPI_SUCCESS;
518 }
519
520 int Win::wait(){
521   //naive, blocking implementation.
522   XBT_DEBUG("Entering MPI_Win_Wait");
523   int i             = 0;
524   int j             = 0;
525   int size          = src_group_->size();
526   std::vector<MPI_Request> reqs(size);
527
528   while(j!=size){
529     int src = comm_->group()->rank(src_group_->actor(j));
530     if (src != rank_ && src != MPI_UNDEFINED) {
531       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
532       i++;
533     }
534     j++;
535   }
536   size=i;
537   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
538   Request::startall(size, reqs.data());
539   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
540   for(i=0;i<size;i++){
541     Request::unref(&reqs[i]);
542   }
543   int finished = finish_comms();
544   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
545
546   opened_--; //we're closed for business !
547   Group::unref(src_group_);
548   src_group_ = MPI_GROUP_NULL;
549   return MPI_SUCCESS;
550 }
551
552 int Win::lock(int lock_type, int rank, int /*assert*/)
553 {
554   MPI_Win target_win = connected_wins_[rank];
555
556   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
557     target_win->lock_mut_->lock();
558     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
559     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
560       target_win->lock_mut_->unlock();
561    }
562   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
563     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
564
565   target_win->lockers_.push_back(comm_->rank());
566
567   int finished = finish_comms(rank);
568   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
569   finished = target_win->finish_comms(rank_);
570   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
571   return MPI_SUCCESS;
572 }
573
574 int Win::lock_all(int assert){
575   int retval = MPI_SUCCESS;
576   for (int i = 0; i < comm_->size(); i++) {
577     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
578     if (ret != MPI_SUCCESS)
579       retval = ret;
580   }
581   return retval;
582 }
583
584 int Win::unlock(int rank){
585   MPI_Win target_win = connected_wins_[rank];
586   int target_mode = target_win->mode_;
587   target_win->mode_= 0;
588   target_win->lockers_.remove(comm_->rank());
589   if (target_mode==MPI_LOCK_EXCLUSIVE){
590     target_win->lock_mut_->unlock();
591   }
592
593   int finished = finish_comms(rank);
594   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
595   finished = target_win->finish_comms(rank_);
596   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
597   return MPI_SUCCESS;
598 }
599
600 int Win::unlock_all(){
601   int retval = MPI_SUCCESS;
602   for (int i = 0; i < comm_->size(); i++) {
603     int ret = this->unlock(i);
604     if (ret != MPI_SUCCESS)
605       retval = ret;
606   }
607   return retval;
608 }
609
610 int Win::flush(int rank){
611   MPI_Win target_win = connected_wins_[rank];
612   int finished       = finish_comms(rank_);
613   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
614   finished = target_win->finish_comms(rank);
615   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
616   return MPI_SUCCESS;
617 }
618
619 int Win::flush_local(int rank){
620   int finished = finish_comms(rank);
621   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
622   return MPI_SUCCESS;
623 }
624
625 int Win::flush_all(){
626   int finished = finish_comms();
627   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
628   for (int i = 0; i < comm_->size(); i++) {
629     finished = connected_wins_[i]->finish_comms(rank_);
630     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
631   }
632   return MPI_SUCCESS;
633 }
634
635 int Win::flush_local_all(){
636   int finished = finish_comms();
637   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
638   return MPI_SUCCESS;
639 }
640
641 Win* Win::f2c(int id){
642   return static_cast<Win*>(F2C::f2c(id));
643 }
644
645 int Win::finish_comms(){
646   mut_->lock();
647   //Finish own requests
648   int size = static_cast<int>(requests_.size());
649   if (size > 0) {
650     MPI_Request* treqs = requests_.data();
651     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
652     requests_.clear();
653   }
654   mut_->unlock();
655   return size;
656 }
657
658 int Win::finish_comms(int rank){
659   mut_->lock();
660   // Finish own requests
661   // Let's see if we're either the destination or the sender of this request
662   // because we only wait for requests that we are responsible for.
663   // Also use the process id here since the request itself returns from src()
664   // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
665   aid_t proc_id = comm_->group()->actor(rank);
666   auto it     = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
667     return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
668   });
669   std::vector<MPI_Request> myreqqs(it, end(requests_));
670   requests_.erase(it, end(requests_));
671   int size = static_cast<int>(myreqqs.size());
672   if (size > 0) {
673     MPI_Request* treqs = myreqqs.data();
674     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
675     myreqqs.clear();
676   }
677   mut_->unlock();
678   return size;
679 }
680
681 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
682 {
683   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
684   for (int i = 0; not target_win && i < comm_->size(); i++) {
685     if (connected_wins_[i]->size_ > 0)
686       target_win = connected_wins_[i];
687   }
688   if (target_win) {
689     *size                         = target_win->size_;
690     *disp_unit                    = target_win->disp_unit_;
691     *static_cast<void**>(baseptr) = target_win->base_;
692   } else {
693     *size                         = 0;
694     *static_cast<void**>(baseptr) = nullptr;
695   }
696   return MPI_SUCCESS;
697 }
698
699 MPI_Errhandler Win::errhandler()
700 {
701   if (errhandler_ != MPI_ERRHANDLER_NULL)
702     errhandler_->ref();
703   return errhandler_;
704 }
705
706 void Win::set_errhandler(MPI_Errhandler errhandler)
707 {
708   if (errhandler_ != MPI_ERRHANDLER_NULL)
709     simgrid::smpi::Errhandler::unref(errhandler_);
710   errhandler_ = errhandler;
711   if (errhandler_ != MPI_ERRHANDLER_NULL)
712     errhandler_->ref();
713 }
714 } // namespace smpi
715 } // namespace simgrid