Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Improve debug messages and avoid calling finish_comms twice when for myself.
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 #include <algorithm>
18
19 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
20
21 #define CHECK_RMA_REMOTE_WIN(fun, win)\
22   if(target_count*target_datatype->get_extent()>win->size_){\
23     XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
24     fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
25     simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
26     return MPI_ERR_RMA_RANGE;\
27   }
28
29 #define CHECK_WIN_LOCKED(win)                                                                                          \
30   if (opened_ == 0) { /*check that post/start has been done*/                                                          \
31     bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; }); \
32     if (not locked)                                                                                                    \
33       return MPI_ERR_WIN;                                                                                              \
34   }
35
36 namespace simgrid{
37 namespace smpi{
38 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
39 int Win::keyval_id_=0;
40
41 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
42     : base_(base)
43     , size_(size)
44     , disp_unit_(disp_unit)
45     , info_(info)
46     , comm_(comm)
47     , connected_wins_(comm->size())
48     , rank_(comm->rank())
49     , allocated_(allocated)
50     , dynamic_(dynamic)
51 {
52   XBT_DEBUG("Creating window");
53   if(info!=MPI_INFO_NULL)
54     info->ref();
55   connected_wins_[rank_] = this;
56   if(rank_==0){
57     bar_ = new s4u::Barrier(comm->size());
58   }
59   errhandler_->ref();
60   comm->add_rma_win(this);
61   comm->ref();
62
63   colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
64                    MPI_BYTE, comm);
65
66   colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
67
68   colls::barrier(comm);
69   this->add_f();
70 }
71
72 Win::~Win(){
73   //As per the standard, perform a barrier to ensure every async comm is finished
74   bar_->wait();
75
76   flush_local_all();
77
78   if (info_ != MPI_INFO_NULL)
79     simgrid::smpi::Info::unref(info_);
80   if (errhandler_ != MPI_ERRHANDLER_NULL)
81     simgrid::smpi::Errhandler::unref(errhandler_);
82
83   comm_->remove_rma_win(this);
84
85   colls::barrier(comm_);
86   Comm::unref(comm_);
87   
88   if (rank_ == 0)
89     delete bar_;
90
91   if (allocated_)
92     xbt_free(base_);
93
94   F2C::free_f(this->f2c_id());
95   cleanup_attr<Win>();
96 }
97
98 int Win::attach(void* /*base*/, MPI_Aint size)
99 {
100   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
101     return MPI_ERR_ARG;
102   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
103   size_+=size;
104   return MPI_SUCCESS;
105 }
106
107 int Win::detach(const void* /*base*/)
108 {
109   base_=MPI_BOTTOM;
110   size_=-1;
111   return MPI_SUCCESS;
112 }
113
114 void Win::get_name(char* name, int* length) const
115 {
116   *length = static_cast<int>(name_.length());
117   if (not name_.empty()) {
118     name_.copy(name, *length);
119     name[*length] = '\0';
120   }
121 }
122
123 void Win::get_group(MPI_Group* group){
124   if(comm_ != MPI_COMM_NULL){
125     *group = comm_->group();
126   } else {
127     *group = MPI_GROUP_NULL;
128   }
129 }
130
131 MPI_Info Win::info()
132 {
133   if (info_ == MPI_INFO_NULL)
134     info_ = new Info();
135   info_->ref();
136   return info_;
137 }
138
139 int Win::rank() const
140 {
141   return rank_;
142 }
143
144 MPI_Comm Win::comm() const
145 {
146   return comm_;
147 }
148
149 MPI_Aint Win::size() const
150 {
151   return size_;
152 }
153
154 void* Win::base() const
155 {
156   return base_;
157 }
158
159 int Win::disp_unit() const
160 {
161   return disp_unit_;
162 }
163
164 bool Win::dynamic() const
165 {
166   return dynamic_;
167 }
168
169 void Win::set_info(MPI_Info info)
170 {
171   if (info_ != MPI_INFO_NULL)
172     simgrid::smpi::Info::unref(info_);
173   info_ = info;
174   if (info_ != MPI_INFO_NULL)
175     info_->ref();
176 }
177
178 void Win::set_name(const char* name){
179   name_ = name;
180 }
181
182 int Win::fence(int assert)
183 {
184   XBT_DEBUG("Entering fence");
185   if (opened_ == 0)
186     opened_=1;
187   if (not (assert & MPI_MODE_NOPRECEDE)) {
188     // This is not the first fence => finalize what came before
189     bar_->wait();
190     mut_->lock();
191     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
192     // Without this, the vector could get redimensioned when another process pushes.
193     // This would result in the array used by Request::waitall() to be invalidated.
194     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
195
196     // start all requests that have been prepared by another process
197     if (not requests_.empty()) {
198       int size           = static_cast<int>(requests_.size());
199       MPI_Request* treqs = requests_.data();
200       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
201     }
202     count_=0;
203     mut_->unlock();
204   }
205
206   if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
207     opened_=0;
208   assert_ = assert;
209
210   bar_->wait();
211   XBT_DEBUG("Leaving fence");
212
213   return MPI_SUCCESS;
214 }
215
216 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
217               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
218 {
219   //get receiver pointer
220   Win* recv_win = connected_wins_[target_rank];
221
222   CHECK_WIN_LOCKED(recv_win)
223   CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)
224
225   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
226
227   if (target_rank != rank_) { // This is not for myself, so we need to send messages
228     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
229     // prepare send_request
230     MPI_Request sreq =
231         Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
232                                MPI_OP_NULL);
233
234     //prepare receiver request
235     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
236                                               SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
237
238     //start send
239     sreq->start();
240
241     if(request!=nullptr){
242       *request=sreq;
243     }else{
244       mut_->lock();
245       requests_.push_back(sreq);
246       mut_->unlock();
247     }
248
249     //push request to receiver's win
250     recv_win->mut_->lock();
251     recv_win->requests_.push_back(rreq);
252     rreq->start();
253     recv_win->mut_->unlock();
254   } else {
255     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
256     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
257     if(request!=nullptr)
258       *request = MPI_REQUEST_NULL;
259   }
260
261   return MPI_SUCCESS;
262 }
263
264 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
265               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
266 {
267   //get sender pointer
268   Win* send_win = connected_wins_[target_rank];
269
270   CHECK_WIN_LOCKED(send_win)
271   CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)
272
273   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
274   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
275
276   if (target_rank != rank_) {
277     //prepare send_request
278     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
279                                               SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
280
281     //prepare receiver request
282     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
283                                               SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
284
285     //start the send, with another process than us as sender.
286     sreq->start();
287     // push request to sender's win
288     send_win->mut_->lock();
289     send_win->requests_.push_back(sreq);
290     send_win->mut_->unlock();
291
292     //start recv
293     rreq->start();
294
295     if(request!=nullptr){
296       *request=rreq;
297     }else{
298       mut_->lock();
299       requests_.push_back(rreq);
300       mut_->unlock();
301     }
302   } else {
303     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
304     if(request!=nullptr)
305       *request=MPI_REQUEST_NULL;
306   }
307   return MPI_SUCCESS;
308 }
309
310 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
311               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
312 {
313   XBT_DEBUG("Entering MPI_Win_Accumulate");
314   //get receiver pointer
315   Win* recv_win = connected_wins_[target_rank];
316
317   //FIXME: local version
318   CHECK_WIN_LOCKED(recv_win)
319   CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)
320
321   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
322   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
323   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
324   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
325   // prepare send_request
326
327   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
328                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
329
330   // prepare receiver request
331   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
332                                             SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
333
334   count_++;
335
336   // start send
337   sreq->start();
338   // push request to receiver's win
339   recv_win->mut_->lock();
340   recv_win->requests_.push_back(rreq);
341   rreq->start();
342   recv_win->mut_->unlock();
343
344   if (request != nullptr) {
345     *request = sreq;
346   } else {
347     mut_->lock();
348     requests_.push_back(sreq);
349     mut_->unlock();
350   }
351
352   XBT_DEBUG("Leaving MPI_Win_Accumulate");
353   return MPI_SUCCESS;
354 }
355
356 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
357                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
358                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
359 {
360   //get sender pointer
361   const Win* send_win = connected_wins_[target_rank];
362
363   CHECK_WIN_LOCKED(send_win)
364   CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)
365
366   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
367   //need to be sure ops are correctly ordered, so finish request here ? slow.
368   MPI_Request req = MPI_REQUEST_NULL;
369   send_win->atomic_mut_->lock();
370   get(result_addr, result_count, result_datatype, target_rank,
371               target_disp, target_count, target_datatype, &req);
372   if (req != MPI_REQUEST_NULL)
373     Request::wait(&req, MPI_STATUS_IGNORE);
374   if(op!=MPI_NO_OP)
375     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
376               target_disp, target_count, target_datatype, op, &req);
377   if (req != MPI_REQUEST_NULL)
378     Request::wait(&req, MPI_STATUS_IGNORE);
379   send_win->atomic_mut_->unlock();
380   return MPI_SUCCESS;
381 }
382
383 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
384                           int target_rank, MPI_Aint target_disp)
385 {
386   //get sender pointer
387   const Win* send_win = connected_wins_[target_rank];
388
389   CHECK_WIN_LOCKED(send_win)
390
391   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
392   MPI_Request req = MPI_REQUEST_NULL;
393   send_win->atomic_mut_->lock();
394   get(result_addr, 1, datatype, target_rank,
395               target_disp, 1, datatype, &req);
396   if (req != MPI_REQUEST_NULL)
397     Request::wait(&req, MPI_STATUS_IGNORE);
398   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
399     put(origin_addr, 1, datatype, target_rank,
400               target_disp, 1, datatype);
401   }
402   send_win->atomic_mut_->unlock();
403   return MPI_SUCCESS;
404 }
405
406 int Win::start(MPI_Group group, int /*assert*/)
407 {
408   /* From MPI forum advices
409   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
410   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
411   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
412   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
413   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
414   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
415   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
416   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
417   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
418   must complete, without further dependencies.  */
419
420   //naive, blocking implementation.
421   XBT_DEBUG("Entering MPI_Win_Start");
422   std::vector<MPI_Request> reqs;
423   for (int i = 0; i < group->size(); i++) {
424     int src = comm_->group()->rank(group->actor(i));
425     xbt_assert(src != MPI_UNDEFINED);
426     if (src != rank_)
427       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
428   }
429   int size = static_cast<int>(reqs.size());
430
431   Request::startall(size, reqs.data());
432   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
433   for (auto& req : reqs)
434     Request::unref(&req);
435
436   group->ref();
437   dst_group_ = group;
438   opened_++; // we're open for business !
439   XBT_DEBUG("Leaving MPI_Win_Start");
440   return MPI_SUCCESS;
441 }
442
443 int Win::post(MPI_Group group, int /*assert*/)
444 {
445   //let's make a synchronous send here
446   XBT_DEBUG("Entering MPI_Win_Post");
447   std::vector<MPI_Request> reqs;
448   for (int i = 0; i < group->size(); i++) {
449     int dst = comm_->group()->rank(group->actor(i));
450     xbt_assert(dst != MPI_UNDEFINED);
451     if (dst != rank_)
452       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
453   }
454   int size = static_cast<int>(reqs.size());
455
456   Request::startall(size, reqs.data());
457   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
458   for (auto& req : reqs)
459     Request::unref(&req);
460
461   group->ref();
462   src_group_ = group;
463   opened_++; // we're open for business !
464   XBT_DEBUG("Leaving MPI_Win_Post");
465   return MPI_SUCCESS;
466 }
467
468 int Win::complete(){
469   xbt_assert(opened_ != 0, "Complete called on already opened MPI_Win");
470
471   XBT_DEBUG("Entering MPI_Win_Complete");
472   std::vector<MPI_Request> reqs;
473   for (int i = 0; i < dst_group_->size(); i++) {
474     int dst = comm_->group()->rank(dst_group_->actor(i));
475     xbt_assert(dst != MPI_UNDEFINED);
476     if (dst != rank_)
477       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
478   }
479   int size = static_cast<int>(reqs.size());
480
481   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
482   Request::startall(size, reqs.data());
483   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
484   for (auto& req : reqs)
485     Request::unref(&req);
486
487   flush_local_all();
488
489   opened_--; //we're closed for business !
490   Group::unref(dst_group_);
491   dst_group_ = MPI_GROUP_NULL;
492   return MPI_SUCCESS;
493 }
494
495 int Win::wait(){
496   //naive, blocking implementation.
497   XBT_DEBUG("Entering MPI_Win_Wait");
498   std::vector<MPI_Request> reqs;
499   for (int i = 0; i < src_group_->size(); i++) {
500     int src = comm_->group()->rank(src_group_->actor(i));
501     xbt_assert(src != MPI_UNDEFINED);
502     if (src != rank_)
503       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
504   }
505   int size = static_cast<int>(reqs.size());
506
507   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
508   Request::startall(size, reqs.data());
509   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
510   for (auto& req : reqs)
511     Request::unref(&req);
512
513   flush_local_all();
514
515   opened_--; //we're closed for business !
516   Group::unref(src_group_);
517   src_group_ = MPI_GROUP_NULL;
518   return MPI_SUCCESS;
519 }
520
521 int Win::lock(int lock_type, int rank, int /*assert*/)
522 {
523   MPI_Win target_win = connected_wins_[rank];
524
525   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
526     target_win->lock_mut_->lock();
527     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
528     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
529       target_win->lock_mut_->unlock();
530    }
531   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
532     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
533
534   target_win->lockers_.push_back(rank_);
535
536   flush(rank);
537   return MPI_SUCCESS;
538 }
539
540 int Win::lock_all(int assert){
541   int retval = MPI_SUCCESS;
542   for (int i = 0; i < comm_->size(); i++) {
543     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
544     if (ret != MPI_SUCCESS)
545       retval = ret;
546   }
547   return retval;
548 }
549
550 int Win::unlock(int rank){
551   MPI_Win target_win = connected_wins_[rank];
552   int target_mode = target_win->mode_;
553   target_win->mode_= 0;
554   target_win->lockers_.remove(rank_);
555   if (target_mode==MPI_LOCK_EXCLUSIVE){
556     target_win->lock_mut_->unlock();
557   }
558
559   flush(rank);
560   return MPI_SUCCESS;
561 }
562
563 int Win::unlock_all(){
564   int retval = MPI_SUCCESS;
565   for (int i = 0; i < comm_->size(); i++) {
566     int ret = this->unlock(i);
567     if (ret != MPI_SUCCESS)
568       retval = ret;
569   }
570   return retval;
571 }
572
573 int Win::flush(int rank){
574   int finished = finish_comms(rank);
575   XBT_DEBUG("Win_flush on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
576   if (rank != rank_) {
577     finished = connected_wins_[rank]->finish_comms(rank_);
578     XBT_DEBUG("Win_flush on remote %d for local %d - Finished %d RMA calls", rank, rank_, finished);
579   }
580   return MPI_SUCCESS;
581 }
582
583 int Win::flush_local(int rank){
584   int finished = finish_comms(rank);
585   XBT_DEBUG("Win_flush_local on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
586   return MPI_SUCCESS;
587 }
588
589 int Win::flush_all(){
590   int finished = finish_comms();
591   XBT_DEBUG("Win_flush_all on local %d - Finished %d RMA calls", rank_, finished);
592   for (int i = 0; i < comm_->size(); i++) {
593     if (i != rank_) {
594       finished = connected_wins_[i]->finish_comms(rank_);
595       XBT_DEBUG("Win_flush_all on remote %d for local %d - Finished %d RMA calls", i, rank_, finished);
596     }
597   }
598   return MPI_SUCCESS;
599 }
600
601 int Win::flush_local_all(){
602   int finished = finish_comms();
603   XBT_DEBUG("Win_flush_local_all on local %d - Finished %d RMA calls", rank_, finished);
604   return MPI_SUCCESS;
605 }
606
607 Win* Win::f2c(int id){
608   return static_cast<Win*>(F2C::f2c(id));
609 }
610
611 int Win::finish_comms(){
612   mut_->lock();
613   //Finish own requests
614   int size = static_cast<int>(requests_.size());
615   if (size > 0) {
616     MPI_Request* treqs = requests_.data();
617     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
618     requests_.clear();
619   }
620   mut_->unlock();
621   return size;
622 }
623
624 int Win::finish_comms(int rank){
625   mut_->lock();
626   // Finish own requests
627   // Let's see if we're either the destination or the sender of this request
628   // because we only wait for requests that we are responsible for.
629   // Also use the process id here since the request itself returns from src()
630   // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
631   aid_t proc_id = comm_->group()->actor(rank);
632   auto it     = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
633     return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
634   });
635   std::vector<MPI_Request> myreqqs(it, end(requests_));
636   requests_.erase(it, end(requests_));
637   int size = static_cast<int>(myreqqs.size());
638   if (size > 0) {
639     MPI_Request* treqs = myreqqs.data();
640     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
641     myreqqs.clear();
642   }
643   mut_->unlock();
644   return size;
645 }
646
647 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
648 {
649   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
650   for (int i = 0; not target_win && i < comm_->size(); i++) {
651     if (connected_wins_[i]->size_ > 0)
652       target_win = connected_wins_[i];
653   }
654   if (target_win) {
655     *size                         = target_win->size_;
656     *disp_unit                    = target_win->disp_unit_;
657     *static_cast<void**>(baseptr) = target_win->base_;
658   } else {
659     *size                         = 0;
660     *static_cast<void**>(baseptr) = nullptr;
661   }
662   return MPI_SUCCESS;
663 }
664
665 MPI_Errhandler Win::errhandler()
666 {
667   if (errhandler_ != MPI_ERRHANDLER_NULL)
668     errhandler_->ref();
669   return errhandler_;
670 }
671
672 void Win::set_errhandler(MPI_Errhandler errhandler)
673 {
674   if (errhandler_ != MPI_ERRHANDLER_NULL)
675     simgrid::smpi::Errhandler::unref(errhandler_);
676   errhandler_ = errhandler;
677   if (errhandler_ != MPI_ERRHANDLER_NULL)
678     errhandler_->ref();
679 }
680 } // namespace smpi
681 } // namespace simgrid