Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Ensure correct ordering of the accumulate requests.
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 #include <algorithm>
18
19 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
20
21 #define CHECK_RMA_REMOTE_WIN(fun, win)\
22   if(target_count*target_datatype->get_extent()>win->size_){\
23     XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
24     fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
25     simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
26     return MPI_ERR_RMA_RANGE;\
27   }
28
29 #define CHECK_WIN_LOCKED(win)                                                                                          \
30   if (opened_ == 0) { /*check that post/start has been done*/                                                          \
31     bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; }); \
32     if (not locked)                                                                                                    \
33       return MPI_ERR_WIN;                                                                                              \
34   }
35
36 namespace simgrid{
37 namespace smpi{
38 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
39 int Win::keyval_id_=0;
40
41 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
42     : base_(base)
43     , size_(size)
44     , disp_unit_(disp_unit)
45     , info_(info)
46     , comm_(comm)
47     , connected_wins_(comm->size())
48     , rank_(comm->rank())
49     , allocated_(allocated)
50     , dynamic_(dynamic)
51 {
52   XBT_DEBUG("Creating window");
53   if(info!=MPI_INFO_NULL)
54     info->ref();
55   connected_wins_[rank_] = this;
56   if(rank_==0){
57     bar_ = new s4u::Barrier(comm->size());
58   }
59   errhandler_->ref();
60   comm->add_rma_win(this);
61   comm->ref();
62
63   colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
64                    MPI_BYTE, comm);
65
66   colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
67
68   colls::barrier(comm);
69   this->add_f();
70 }
71
72 Win::~Win(){
73   //As per the standard, perform a barrier to ensure every async comm is finished
74   bar_->wait();
75
76   flush_local_all();
77
78   if (info_ != MPI_INFO_NULL)
79     simgrid::smpi::Info::unref(info_);
80   if (errhandler_ != MPI_ERRHANDLER_NULL)
81     simgrid::smpi::Errhandler::unref(errhandler_);
82
83   comm_->remove_rma_win(this);
84
85   colls::barrier(comm_);
86   Comm::unref(comm_);
87   
88   if (rank_ == 0)
89     delete bar_;
90
91   if (allocated_)
92     xbt_free(base_);
93
94   F2C::free_f(this->f2c_id());
95   cleanup_attr<Win>();
96 }
97
98 int Win::attach(void* /*base*/, MPI_Aint size)
99 {
100   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
101     return MPI_ERR_ARG;
102   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
103   size_+=size;
104   return MPI_SUCCESS;
105 }
106
107 int Win::detach(const void* /*base*/)
108 {
109   base_=MPI_BOTTOM;
110   size_=-1;
111   return MPI_SUCCESS;
112 }
113
114 void Win::get_name(char* name, int* length) const
115 {
116   *length = static_cast<int>(name_.length());
117   if (not name_.empty()) {
118     name_.copy(name, *length);
119     name[*length] = '\0';
120   }
121 }
122
123 void Win::get_group(MPI_Group* group){
124   if(comm_ != MPI_COMM_NULL){
125     *group = comm_->group();
126   } else {
127     *group = MPI_GROUP_NULL;
128   }
129 }
130
131 MPI_Info Win::info()
132 {
133   if (info_ == MPI_INFO_NULL)
134     info_ = new Info();
135   info_->ref();
136   return info_;
137 }
138
139 int Win::rank() const
140 {
141   return rank_;
142 }
143
144 MPI_Comm Win::comm() const
145 {
146   return comm_;
147 }
148
149 MPI_Aint Win::size() const
150 {
151   return size_;
152 }
153
154 void* Win::base() const
155 {
156   return base_;
157 }
158
159 int Win::disp_unit() const
160 {
161   return disp_unit_;
162 }
163
164 bool Win::dynamic() const
165 {
166   return dynamic_;
167 }
168
169 void Win::set_info(MPI_Info info)
170 {
171   if (info_ != MPI_INFO_NULL)
172     simgrid::smpi::Info::unref(info_);
173   info_ = info;
174   if (info_ != MPI_INFO_NULL)
175     info_->ref();
176 }
177
178 void Win::set_name(const char* name){
179   name_ = name;
180 }
181
182 int Win::fence(int assert)
183 {
184   XBT_DEBUG("Entering fence");
185   if (opened_ == 0)
186     opened_=1;
187   if (not (assert & MPI_MODE_NOPRECEDE)) {
188     // This is not the first fence => finalize what came before
189     bar_->wait();
190     flush_local_all();
191     count_=0;
192   }
193
194   if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
195     opened_=0;
196   assert_ = assert;
197
198   bar_->wait();
199   XBT_DEBUG("Leaving fence");
200
201   return MPI_SUCCESS;
202 }
203
204 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
205               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
206 {
207   //get receiver pointer
208   Win* recv_win = connected_wins_[target_rank];
209
210   CHECK_WIN_LOCKED(recv_win)
211   CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)
212
213   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
214
215   if (target_rank != rank_) { // This is not for myself, so we need to send messages
216     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
217     // prepare send_request
218     MPI_Request sreq =
219         Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
220                                MPI_OP_NULL);
221
222     //prepare receiver request
223     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
224                                               SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
225
226     //start send
227     sreq->start();
228
229     if(request!=nullptr){
230       *request=sreq;
231     }else{
232       mut_->lock();
233       requests_.push_back(sreq);
234       mut_->unlock();
235     }
236
237     //push request to receiver's win
238     recv_win->mut_->lock();
239     recv_win->requests_.push_back(rreq);
240     rreq->start();
241     recv_win->mut_->unlock();
242   } else {
243     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
244     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
245     if(request!=nullptr)
246       *request = MPI_REQUEST_NULL;
247   }
248
249   return MPI_SUCCESS;
250 }
251
252 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
253               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
254 {
255   //get sender pointer
256   Win* send_win = connected_wins_[target_rank];
257
258   CHECK_WIN_LOCKED(send_win)
259   CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)
260
261   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
262   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
263
264   if (target_rank != rank_) {
265     //prepare send_request
266     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
267                                               SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
268
269     //prepare receiver request
270     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
271                                               SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
272
273     //start the send, with another process than us as sender.
274     sreq->start();
275     // push request to sender's win
276     send_win->mut_->lock();
277     send_win->requests_.push_back(sreq);
278     send_win->mut_->unlock();
279
280     //start recv
281     rreq->start();
282
283     if(request!=nullptr){
284       *request=rreq;
285     }else{
286       mut_->lock();
287       requests_.push_back(rreq);
288       mut_->unlock();
289     }
290   } else {
291     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
292     if(request!=nullptr)
293       *request=MPI_REQUEST_NULL;
294   }
295   return MPI_SUCCESS;
296 }
297
298 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
299               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
300 {
301   XBT_DEBUG("Entering MPI_Win_Accumulate");
302   //get receiver pointer
303   Win* recv_win = connected_wins_[target_rank];
304
305   //FIXME: local version
306   CHECK_WIN_LOCKED(recv_win)
307   CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)
308
309   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
310   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
311   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
312   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
313   // prepare send_request
314
315   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
316                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
317
318   // prepare receiver request
319   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
320                                             SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
321
322   count_++;
323
324   // start send
325   sreq->start();
326   // push request to receiver's win
327   recv_win->mut_->lock();
328   recv_win->requests_.push_back(rreq);
329   rreq->start();
330   recv_win->mut_->unlock();
331
332   if (request != nullptr) {
333     *request = sreq;
334   } else {
335     mut_->lock();
336     requests_.push_back(sreq);
337     mut_->unlock();
338   }
339
340   // FIXME: The current implementation fails to ensure the correct ordering of the accumulate requests.  The following
341   // 'flush' is a workaround to fix that.
342   flush(target_rank);
343   XBT_DEBUG("Leaving MPI_Win_Accumulate");
344   return MPI_SUCCESS;
345 }
346
347 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
348                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
349                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
350 {
351   //get sender pointer
352   const Win* send_win = connected_wins_[target_rank];
353
354   CHECK_WIN_LOCKED(send_win)
355   CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)
356
357   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
358   //need to be sure ops are correctly ordered, so finish request here ? slow.
359   MPI_Request req = MPI_REQUEST_NULL;
360   send_win->atomic_mut_->lock();
361   get(result_addr, result_count, result_datatype, target_rank,
362               target_disp, target_count, target_datatype, &req);
363   if (req != MPI_REQUEST_NULL)
364     Request::wait(&req, MPI_STATUS_IGNORE);
365   if(op!=MPI_NO_OP)
366     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
367               target_disp, target_count, target_datatype, op, &req);
368   if (req != MPI_REQUEST_NULL)
369     Request::wait(&req, MPI_STATUS_IGNORE);
370   send_win->atomic_mut_->unlock();
371   return MPI_SUCCESS;
372 }
373
374 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
375                           int target_rank, MPI_Aint target_disp)
376 {
377   //get sender pointer
378   const Win* send_win = connected_wins_[target_rank];
379
380   CHECK_WIN_LOCKED(send_win)
381
382   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
383   MPI_Request req = MPI_REQUEST_NULL;
384   send_win->atomic_mut_->lock();
385   get(result_addr, 1, datatype, target_rank,
386               target_disp, 1, datatype, &req);
387   if (req != MPI_REQUEST_NULL)
388     Request::wait(&req, MPI_STATUS_IGNORE);
389   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
390     put(origin_addr, 1, datatype, target_rank,
391               target_disp, 1, datatype);
392   }
393   send_win->atomic_mut_->unlock();
394   return MPI_SUCCESS;
395 }
396
397 int Win::start(MPI_Group group, int /*assert*/)
398 {
399   /* From MPI forum advices
400   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
401   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
402   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
403   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
404   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
405   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
406   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
407   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
408   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
409   must complete, without further dependencies.  */
410
411   //naive, blocking implementation.
412   XBT_DEBUG("Entering MPI_Win_Start");
413   std::vector<MPI_Request> reqs;
414   for (int i = 0; i < group->size(); i++) {
415     int src = comm_->group()->rank(group->actor(i));
416     xbt_assert(src != MPI_UNDEFINED);
417     if (src != rank_)
418       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
419   }
420   int size = static_cast<int>(reqs.size());
421
422   Request::startall(size, reqs.data());
423   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
424   for (auto& req : reqs)
425     Request::unref(&req);
426
427   group->ref();
428   dst_group_ = group;
429   opened_++; // we're open for business !
430   XBT_DEBUG("Leaving MPI_Win_Start");
431   return MPI_SUCCESS;
432 }
433
434 int Win::post(MPI_Group group, int /*assert*/)
435 {
436   //let's make a synchronous send here
437   XBT_DEBUG("Entering MPI_Win_Post");
438   std::vector<MPI_Request> reqs;
439   for (int i = 0; i < group->size(); i++) {
440     int dst = comm_->group()->rank(group->actor(i));
441     xbt_assert(dst != MPI_UNDEFINED);
442     if (dst != rank_)
443       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
444   }
445   int size = static_cast<int>(reqs.size());
446
447   Request::startall(size, reqs.data());
448   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
449   for (auto& req : reqs)
450     Request::unref(&req);
451
452   group->ref();
453   src_group_ = group;
454   opened_++; // we're open for business !
455   XBT_DEBUG("Leaving MPI_Win_Post");
456   return MPI_SUCCESS;
457 }
458
459 int Win::complete(){
460   xbt_assert(opened_ != 0, "Complete called on already opened MPI_Win");
461
462   XBT_DEBUG("Entering MPI_Win_Complete");
463   std::vector<MPI_Request> reqs;
464   for (int i = 0; i < dst_group_->size(); i++) {
465     int dst = comm_->group()->rank(dst_group_->actor(i));
466     xbt_assert(dst != MPI_UNDEFINED);
467     if (dst != rank_)
468       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
469   }
470   int size = static_cast<int>(reqs.size());
471
472   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
473   Request::startall(size, reqs.data());
474   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
475   for (auto& req : reqs)
476     Request::unref(&req);
477
478   flush_local_all();
479
480   opened_--; //we're closed for business !
481   Group::unref(dst_group_);
482   dst_group_ = MPI_GROUP_NULL;
483   return MPI_SUCCESS;
484 }
485
486 int Win::wait(){
487   //naive, blocking implementation.
488   XBT_DEBUG("Entering MPI_Win_Wait");
489   std::vector<MPI_Request> reqs;
490   for (int i = 0; i < src_group_->size(); i++) {
491     int src = comm_->group()->rank(src_group_->actor(i));
492     xbt_assert(src != MPI_UNDEFINED);
493     if (src != rank_)
494       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
495   }
496   int size = static_cast<int>(reqs.size());
497
498   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
499   Request::startall(size, reqs.data());
500   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
501   for (auto& req : reqs)
502     Request::unref(&req);
503
504   flush_local_all();
505
506   opened_--; //we're closed for business !
507   Group::unref(src_group_);
508   src_group_ = MPI_GROUP_NULL;
509   return MPI_SUCCESS;
510 }
511
512 int Win::lock(int lock_type, int rank, int /*assert*/)
513 {
514   MPI_Win target_win = connected_wins_[rank];
515
516   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
517     target_win->lock_mut_->lock();
518     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
519     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
520       target_win->lock_mut_->unlock();
521    }
522   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
523     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
524
525   target_win->lockers_.push_back(rank_);
526
527   flush(rank);
528   return MPI_SUCCESS;
529 }
530
531 int Win::lock_all(int assert){
532   int retval = MPI_SUCCESS;
533   for (int i = 0; i < comm_->size(); i++) {
534     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
535     if (ret != MPI_SUCCESS)
536       retval = ret;
537   }
538   return retval;
539 }
540
541 int Win::unlock(int rank){
542   MPI_Win target_win = connected_wins_[rank];
543   int target_mode = target_win->mode_;
544   target_win->mode_= 0;
545   target_win->lockers_.remove(rank_);
546   if (target_mode==MPI_LOCK_EXCLUSIVE){
547     target_win->lock_mut_->unlock();
548   }
549
550   flush(rank);
551   return MPI_SUCCESS;
552 }
553
554 int Win::unlock_all(){
555   int retval = MPI_SUCCESS;
556   for (int i = 0; i < comm_->size(); i++) {
557     int ret = this->unlock(i);
558     if (ret != MPI_SUCCESS)
559       retval = ret;
560   }
561   return retval;
562 }
563
564 int Win::flush(int rank){
565   int finished = finish_comms(rank);
566   XBT_DEBUG("Win_flush on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
567   if (rank != rank_) {
568     finished = connected_wins_[rank]->finish_comms(rank_);
569     XBT_DEBUG("Win_flush on remote %d for local %d - Finished %d RMA calls", rank, rank_, finished);
570   }
571   return MPI_SUCCESS;
572 }
573
574 int Win::flush_local(int rank){
575   int finished = finish_comms(rank);
576   XBT_DEBUG("Win_flush_local on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
577   return MPI_SUCCESS;
578 }
579
580 int Win::flush_all(){
581   int finished = finish_comms();
582   XBT_DEBUG("Win_flush_all on local %d - Finished %d RMA calls", rank_, finished);
583   for (int i = 0; i < comm_->size(); i++) {
584     if (i != rank_) {
585       finished = connected_wins_[i]->finish_comms(rank_);
586       XBT_DEBUG("Win_flush_all on remote %d for local %d - Finished %d RMA calls", i, rank_, finished);
587     }
588   }
589   return MPI_SUCCESS;
590 }
591
592 int Win::flush_local_all(){
593   int finished = finish_comms();
594   XBT_DEBUG("Win_flush_local_all on local %d - Finished %d RMA calls", rank_, finished);
595   return MPI_SUCCESS;
596 }
597
598 Win* Win::f2c(int id){
599   return static_cast<Win*>(F2C::f2c(id));
600 }
601
602 int Win::finish_comms(){
603   // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
604   // Without this, the vector could get redimensioned when another process pushes.
605   // This would result in the array used by Request::waitall() to be invalidated.
606   // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
607   mut_->lock();
608   //Finish own requests
609   int size = static_cast<int>(requests_.size());
610   if (size > 0) {
611     MPI_Request* treqs = requests_.data();
612     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
613     requests_.clear();
614   }
615   mut_->unlock();
616   return size;
617 }
618
619 int Win::finish_comms(int rank){
620   // See comment about the mutex in finish_comms() above
621   mut_->lock();
622   // Finish own requests
623   // Let's see if we're either the destination or the sender of this request
624   // because we only wait for requests that we are responsible for.
625   // Also use the process id here since the request itself returns from src()
626   // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
627   aid_t proc_id = comm_->group()->actor(rank);
628   auto it     = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
629     return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
630   });
631   std::vector<MPI_Request> myreqqs(it, end(requests_));
632   requests_.erase(it, end(requests_));
633   int size = static_cast<int>(myreqqs.size());
634   if (size > 0) {
635     MPI_Request* treqs = myreqqs.data();
636     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
637     myreqqs.clear();
638   }
639   mut_->unlock();
640   return size;
641 }
642
643 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
644 {
645   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
646   for (int i = 0; not target_win && i < comm_->size(); i++) {
647     if (connected_wins_[i]->size_ > 0)
648       target_win = connected_wins_[i];
649   }
650   if (target_win) {
651     *size                         = target_win->size_;
652     *disp_unit                    = target_win->disp_unit_;
653     *static_cast<void**>(baseptr) = target_win->base_;
654   } else {
655     *size                         = 0;
656     *static_cast<void**>(baseptr) = nullptr;
657   }
658   return MPI_SUCCESS;
659 }
660
661 MPI_Errhandler Win::errhandler()
662 {
663   if (errhandler_ != MPI_ERRHANDLER_NULL)
664     errhandler_->ref();
665   return errhandler_;
666 }
667
668 void Win::set_errhandler(MPI_Errhandler errhandler)
669 {
670   if (errhandler_ != MPI_ERRHANDLER_NULL)
671     simgrid::smpi::Errhandler::unref(errhandler_);
672   errhandler_ = errhandler;
673   if (errhandler_ != MPI_ERRHANDLER_NULL)
674     errhandler_->ref();
675 }
676 } // namespace smpi
677 } // namespace simgrid