Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
make a single barrier per RMA window and share it through broadcast
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2022. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <simgrid/modelchecker.h>
7 #include "smpi_win.hpp"
8
9 #include "private.hpp"
10 #include "smpi_coll.hpp"
11 #include "smpi_comm.hpp"
12 #include "smpi_datatype.hpp"
13 #include "smpi_info.hpp"
14 #include "smpi_keyvals.hpp"
15 #include "smpi_request.hpp"
16 #include "src/smpi/include/smpi_actor.hpp"
17 #include "src/mc/mc_replay.hpp"
18
19 #include <algorithm>
20
21 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
22
23 #define CHECK_RMA_REMOTE_WIN(fun, win)\
24   if(target_count*target_datatype->get_extent()>win->size_){\
25     XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
26     fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
27     simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
28     return MPI_ERR_RMA_RANGE;\
29   }
30
31 #define CHECK_WIN_LOCKED(win)                                                                                          \
32   if (opened_ == 0) { /*check that post/start has been done*/                                                          \
33     bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; }); \
34     if (not locked)                                                                                                    \
35       return MPI_ERR_WIN;                                                                                              \
36   }
37
38 namespace simgrid{
39 namespace smpi{
40 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
41 int Win::keyval_id_=0;
42
43 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
44     : base_(base)
45     , size_(size)
46     , disp_unit_(disp_unit)
47     , info_(info)
48     , comm_(comm)
49     , connected_wins_(comm->size())
50     , rank_(comm->rank())
51     , allocated_(allocated)
52     , dynamic_(dynamic)
53 {
54   XBT_DEBUG("Creating window");
55   if(info!=MPI_INFO_NULL)
56     info->ref();
57   connected_wins_[rank_] = this;
58   errhandler_->ref();
59   comm->add_rma_win(this);
60   comm->ref();
61
62   colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
63                    MPI_BYTE, comm);
64   if  (MC_is_active() || MC_record_replay_is_active()){
65     s4u::Barrier* bar_ptr;
66     if (rank_ == 0) {
67       bar_ = s4u::Barrier::create(comm->size());
68       bar_ptr = bar_.get();
69     }
70     colls::bcast(&bar_ptr, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
71     if (rank_ != 0)
72       bar_ = s4u::BarrierPtr(bar_ptr);
73     bar_->wait();
74   }else{
75     colls::barrier(comm);
76   }
77   this->add_f();
78 }
79
80 Win::~Win(){
81   //As per the standard, perform a barrier to ensure every async comm is finished
82   if  (MC_is_active() || MC_record_replay_is_active())
83     bar_->wait();
84   else
85     colls::barrier(comm_);
86   flush_local_all();
87
88   if (info_ != MPI_INFO_NULL)
89     simgrid::smpi::Info::unref(info_);
90   if (errhandler_ != MPI_ERRHANDLER_NULL)
91     simgrid::smpi::Errhandler::unref(errhandler_);
92
93   comm_->remove_rma_win(this);
94
95   colls::barrier(comm_);
96   Comm::unref(comm_);
97
98   if (allocated_)
99     xbt_free(base_);
100
101   F2C::free_f(this->f2c_id());
102   cleanup_attr<Win>();
103 }
104
105 int Win::attach(void* /*base*/, MPI_Aint size)
106 {
107   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
108     return MPI_ERR_ARG;
109   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
110   size_+=size;
111   return MPI_SUCCESS;
112 }
113
114 int Win::detach(const void* /*base*/)
115 {
116   base_=MPI_BOTTOM;
117   size_=-1;
118   return MPI_SUCCESS;
119 }
120
121 void Win::get_name(char* name, int* length) const
122 {
123   *length = static_cast<int>(name_.length());
124   if (not name_.empty()) {
125     name_.copy(name, *length);
126     name[*length] = '\0';
127   }
128 }
129
130 void Win::get_group(MPI_Group* group){
131   if(comm_ != MPI_COMM_NULL){
132     *group = comm_->group();
133   } else {
134     *group = MPI_GROUP_NULL;
135   }
136 }
137
138 MPI_Info Win::info()
139 {
140   return info_;
141 }
142
143 int Win::rank() const
144 {
145   return rank_;
146 }
147
148 MPI_Comm Win::comm() const
149 {
150   return comm_;
151 }
152
153 MPI_Aint Win::size() const
154 {
155   return size_;
156 }
157
158 void* Win::base() const
159 {
160   return base_;
161 }
162
163 int Win::disp_unit() const
164 {
165   return disp_unit_;
166 }
167
168 bool Win::dynamic() const
169 {
170   return dynamic_;
171 }
172
173 void Win::set_info(MPI_Info info)
174 {
175   if (info_ != MPI_INFO_NULL)
176     simgrid::smpi::Info::unref(info_);
177   info_ = info;
178   if (info_ != MPI_INFO_NULL)
179     info_->ref();
180 }
181
182 void Win::set_name(const char* name){
183   name_ = name;
184 }
185
186 int Win::fence(int assert)
187 {
188   XBT_DEBUG("Entering fence");
189   opened_++;
190   if (not (assert & MPI_MODE_NOPRECEDE)) {
191     // This is not the first fence => finalize what came before
192     if (MC_is_active() || MC_record_replay_is_active())
193       bar_->wait();
194     else
195       colls::barrier(comm_);
196     flush_local_all();
197     count_=0;
198   }
199
200   if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
201     opened_=0;
202   assert_ = assert;
203   if (MC_is_active() || MC_record_replay_is_active())
204     bar_->wait();
205   else
206     colls::barrier(comm_);
207   XBT_DEBUG("Leaving fence");
208
209   return MPI_SUCCESS;
210 }
211
212 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
213               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
214 {
215   //get receiver pointer
216   Win* recv_win = connected_wins_[target_rank];
217
218   CHECK_WIN_LOCKED(recv_win)
219   CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)
220
221   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
222
223   if (target_rank != rank_) { // This is not for myself, so we need to send messages
224     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
225     // prepare send_request
226     MPI_Request sreq =
227         Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
228                                MPI_OP_NULL);
229
230     //prepare receiver request
231     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
232                                               SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
233
234     //start send
235     sreq->start();
236
237     if(request!=nullptr){
238       *request=sreq;
239     }else{
240       mut_->lock();
241       requests_.push_back(sreq);
242       mut_->unlock();
243     }
244
245     //push request to receiver's win
246     recv_win->mut_->lock();
247     recv_win->requests_.push_back(rreq);
248     rreq->start();
249     recv_win->mut_->unlock();
250   } else {
251     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
252     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
253     if(request!=nullptr)
254       *request = MPI_REQUEST_NULL;
255   }
256
257   return MPI_SUCCESS;
258 }
259
260 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
261               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
262 {
263   //get sender pointer
264   Win* send_win = connected_wins_[target_rank];
265
266   CHECK_WIN_LOCKED(send_win)
267   CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)
268
269   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
270   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
271
272   if (target_rank != rank_) {
273     //prepare send_request
274     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
275                                               SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
276
277     //prepare receiver request
278     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
279                                               SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
280
281     //start the send, with another process than us as sender.
282     sreq->start();
283     // push request to sender's win
284     send_win->mut_->lock();
285     send_win->requests_.push_back(sreq);
286     send_win->mut_->unlock();
287
288     //start recv
289     rreq->start();
290
291     if(request!=nullptr){
292       *request=rreq;
293     }else{
294       mut_->lock();
295       requests_.push_back(rreq);
296       mut_->unlock();
297     }
298   } else {
299     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
300     if(request!=nullptr)
301       *request=MPI_REQUEST_NULL;
302   }
303   return MPI_SUCCESS;
304 }
305
306 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
307               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
308 {
309   XBT_DEBUG("Entering MPI_Win_Accumulate");
310   //get receiver pointer
311   Win* recv_win = connected_wins_[target_rank];
312
313   //FIXME: local version
314   CHECK_WIN_LOCKED(recv_win)
315   CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)
316
317   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
318   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
319   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
320   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
321   // prepare send_request
322
323   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
324                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
325
326   // prepare receiver request
327   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
328                                             SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
329
330   count_++;
331
332   // start send
333   sreq->start();
334   // push request to receiver's win
335   recv_win->mut_->lock();
336   recv_win->requests_.push_back(rreq);
337   rreq->start();
338   recv_win->mut_->unlock();
339
340   if (request != nullptr) {
341     *request = sreq;
342   } else {
343     mut_->lock();
344     requests_.push_back(sreq);
345     mut_->unlock();
346   }
347
348   // FIXME: The current implementation fails to ensure the correct ordering of the accumulate requests.  The following
349   // 'flush' is a workaround to fix that.
350   flush(target_rank);
351   XBT_DEBUG("Leaving MPI_Win_Accumulate");
352   return MPI_SUCCESS;
353 }
354
355 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
356                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
357                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
358 {
359   //get sender pointer
360   const Win* send_win = connected_wins_[target_rank];
361
362   CHECK_WIN_LOCKED(send_win)
363   CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)
364
365   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
366   //need to be sure ops are correctly ordered, so finish request here ? slow.
367   MPI_Request req = MPI_REQUEST_NULL;
368   send_win->atomic_mut_->lock();
369   get(result_addr, result_count, result_datatype, target_rank,
370               target_disp, target_count, target_datatype, &req);
371   if (req != MPI_REQUEST_NULL)
372     Request::wait(&req, MPI_STATUS_IGNORE);
373   if(op!=MPI_NO_OP)
374     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
375               target_disp, target_count, target_datatype, op, &req);
376   if (req != MPI_REQUEST_NULL)
377     Request::wait(&req, MPI_STATUS_IGNORE);
378   send_win->atomic_mut_->unlock();
379   return MPI_SUCCESS;
380 }
381
382 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
383                           int target_rank, MPI_Aint target_disp)
384 {
385   //get sender pointer
386   const Win* send_win = connected_wins_[target_rank];
387
388   CHECK_WIN_LOCKED(send_win)
389
390   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
391   MPI_Request req = MPI_REQUEST_NULL;
392   send_win->atomic_mut_->lock();
393   get(result_addr, 1, datatype, target_rank,
394               target_disp, 1, datatype, &req);
395   if (req != MPI_REQUEST_NULL)
396     Request::wait(&req, MPI_STATUS_IGNORE);
397   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
398     put(origin_addr, 1, datatype, target_rank,
399               target_disp, 1, datatype);
400   }
401   send_win->atomic_mut_->unlock();
402   return MPI_SUCCESS;
403 }
404
405 int Win::start(MPI_Group group, int /*assert*/)
406 {
407   /* From MPI forum advices
408   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
409   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
410   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
411   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
412   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
413   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
414   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
415   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
416   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
417   must complete, without further dependencies.  */
418
419   //naive, blocking implementation.
420   XBT_DEBUG("Entering MPI_Win_Start");
421   std::vector<MPI_Request> reqs;
422   for (int i = 0; i < group->size(); i++) {
423     int src = comm_->group()->rank(group->actor(i));
424     xbt_assert(src != MPI_UNDEFINED);
425     if (src != rank_)
426       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
427   }
428   int size = static_cast<int>(reqs.size());
429
430   Request::startall(size, reqs.data());
431   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
432   for (auto& req : reqs)
433     Request::unref(&req);
434
435   group->ref();
436   dst_group_ = group;
437   opened_++; // we're open for business !
438   XBT_DEBUG("Leaving MPI_Win_Start");
439   return MPI_SUCCESS;
440 }
441
442 int Win::post(MPI_Group group, int /*assert*/)
443 {
444   //let's make a synchronous send here
445   XBT_DEBUG("Entering MPI_Win_Post");
446   std::vector<MPI_Request> reqs;
447   for (int i = 0; i < group->size(); i++) {
448     int dst = comm_->group()->rank(group->actor(i));
449     xbt_assert(dst != MPI_UNDEFINED);
450     if (dst != rank_)
451       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
452   }
453   int size = static_cast<int>(reqs.size());
454
455   Request::startall(size, reqs.data());
456   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
457   for (auto& req : reqs)
458     Request::unref(&req);
459
460   group->ref();
461   src_group_ = group;
462   opened_++; // we're open for business !
463   XBT_DEBUG("Leaving MPI_Win_Post");
464   return MPI_SUCCESS;
465 }
466
467 int Win::complete(){
468   xbt_assert(opened_ != 0, "Complete called on already opened MPI_Win");
469
470   XBT_DEBUG("Entering MPI_Win_Complete");
471   std::vector<MPI_Request> reqs;
472   for (int i = 0; i < dst_group_->size(); i++) {
473     int dst = comm_->group()->rank(dst_group_->actor(i));
474     xbt_assert(dst != MPI_UNDEFINED);
475     if (dst != rank_)
476       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
477   }
478   int size = static_cast<int>(reqs.size());
479
480   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
481   Request::startall(size, reqs.data());
482   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
483   for (auto& req : reqs)
484     Request::unref(&req);
485
486   flush_local_all();
487
488   opened_--; //we're closed for business !
489   Group::unref(dst_group_);
490   dst_group_ = MPI_GROUP_NULL;
491   return MPI_SUCCESS;
492 }
493
494 int Win::wait(){
495   //naive, blocking implementation.
496   XBT_DEBUG("Entering MPI_Win_Wait");
497   std::vector<MPI_Request> reqs;
498   for (int i = 0; i < src_group_->size(); i++) {
499     int src = comm_->group()->rank(src_group_->actor(i));
500     xbt_assert(src != MPI_UNDEFINED);
501     if (src != rank_)
502       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
503   }
504   int size = static_cast<int>(reqs.size());
505
506   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
507   Request::startall(size, reqs.data());
508   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
509   for (auto& req : reqs)
510     Request::unref(&req);
511
512   flush_local_all();
513
514   opened_--; //we're closed for business !
515   Group::unref(src_group_);
516   src_group_ = MPI_GROUP_NULL;
517   return MPI_SUCCESS;
518 }
519
520 int Win::lock(int lock_type, int rank, int /*assert*/)
521 {
522   MPI_Win target_win = connected_wins_[rank];
523
524   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
525     target_win->lock_mut_->lock();
526     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
527     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
528       target_win->lock_mut_->unlock();
529    }
530   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
531     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
532
533   target_win->lockers_.push_back(rank_);
534
535   flush(rank);
536   return MPI_SUCCESS;
537 }
538
539 int Win::lock_all(int assert){
540   int retval = MPI_SUCCESS;
541   for (int i = 0; i < comm_->size(); i++) {
542     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
543     if (ret != MPI_SUCCESS)
544       retval = ret;
545   }
546   return retval;
547 }
548
549 int Win::unlock(int rank){
550   MPI_Win target_win = connected_wins_[rank];
551   int target_mode = target_win->mode_;
552   target_win->mode_= 0;
553   target_win->lockers_.remove(rank_);
554   if (target_mode==MPI_LOCK_EXCLUSIVE){
555     target_win->lock_mut_->unlock();
556   }
557
558   flush(rank);
559   return MPI_SUCCESS;
560 }
561
562 int Win::unlock_all(){
563   int retval = MPI_SUCCESS;
564   for (int i = 0; i < comm_->size(); i++) {
565     int ret = this->unlock(i);
566     if (ret != MPI_SUCCESS)
567       retval = ret;
568   }
569   return retval;
570 }
571
572 int Win::flush(int rank){
573   int finished = finish_comms(rank);
574   XBT_DEBUG("Win_flush on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
575   if (rank != rank_) {
576     finished = connected_wins_[rank]->finish_comms(rank_);
577     XBT_DEBUG("Win_flush on remote %d for local %d - Finished %d RMA calls", rank, rank_, finished);
578   }
579   return MPI_SUCCESS;
580 }
581
582 int Win::flush_local(int rank){
583   int finished = finish_comms(rank);
584   XBT_DEBUG("Win_flush_local on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
585   return MPI_SUCCESS;
586 }
587
588 int Win::flush_all(){
589   int finished = finish_comms();
590   XBT_DEBUG("Win_flush_all on local %d - Finished %d RMA calls", rank_, finished);
591   for (int i = 0; i < comm_->size(); i++) {
592     if (i != rank_) {
593       finished = connected_wins_[i]->finish_comms(rank_);
594       XBT_DEBUG("Win_flush_all on remote %d for local %d - Finished %d RMA calls", i, rank_, finished);
595     }
596   }
597   return MPI_SUCCESS;
598 }
599
600 int Win::flush_local_all(){
601   int finished = finish_comms();
602   XBT_DEBUG("Win_flush_local_all on local %d - Finished %d RMA calls", rank_, finished);
603   return MPI_SUCCESS;
604 }
605
606 Win* Win::f2c(int id){
607   return static_cast<Win*>(F2C::f2c(id));
608 }
609
610 int Win::finish_comms(){
611   // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
612   // Without this, the vector could get redimensioned when another process pushes.
613   // This would result in the array used by Request::waitall() to be invalidated.
614   // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
615   mut_->lock();
616   //Finish own requests
617   int size = static_cast<int>(requests_.size());
618   if (size > 0) {
619     MPI_Request* treqs = requests_.data();
620     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
621     requests_.clear();
622   }
623   mut_->unlock();
624   return size;
625 }
626
627 int Win::finish_comms(int rank){
628   // See comment about the mutex in finish_comms() above
629   mut_->lock();
630   // Finish own requests
631   // Let's see if we're either the destination or the sender of this request
632   // because we only wait for requests that we are responsible for.
633   // Also use the process id here since the request itself returns from src()
634   // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
635   aid_t proc_id = comm_->group()->actor(rank);
636   auto it     = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
637     return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
638   });
639   std::vector<MPI_Request> myreqqs(it, end(requests_));
640   requests_.erase(it, end(requests_));
641   int size = static_cast<int>(myreqqs.size());
642   if (size > 0) {
643     MPI_Request* treqs = myreqqs.data();
644     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
645     myreqqs.clear();
646   }
647   mut_->unlock();
648   return size;
649 }
650
651 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
652 {
653   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
654   for (int i = 0; not target_win && i < comm_->size(); i++) {
655     if (connected_wins_[i]->size_ > 0)
656       target_win = connected_wins_[i];
657   }
658   if (target_win) {
659     *size                         = target_win->size_;
660     *disp_unit                    = target_win->disp_unit_;
661     *static_cast<void**>(baseptr) = target_win->base_;
662   } else {
663     *size                         = 0;
664     *static_cast<void**>(baseptr) = nullptr;
665   }
666   return MPI_SUCCESS;
667 }
668
669 MPI_Errhandler Win::errhandler()
670 {
671   if (errhandler_ != MPI_ERRHANDLER_NULL)
672     errhandler_->ref();
673   return errhandler_;
674 }
675
676 void Win::set_errhandler(MPI_Errhandler errhandler)
677 {
678   if (errhandler_ != MPI_ERRHANDLER_NULL)
679     simgrid::smpi::Errhandler::unref(errhandler_);
680   errhandler_ = errhandler;
681   if (errhandler_ != MPI_ERRHANDLER_NULL)
682     errhandler_->ref();
683 }
684 } // namespace smpi
685 } // namespace simgrid