Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
another try at cleanly unlocking the mutexes before destroying them in SMPI::RMA
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2023. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <simgrid/modelchecker.h>
7 #include "smpi_win.hpp"
8
9 #include "private.hpp"
10 #include "smpi_coll.hpp"
11 #include "smpi_comm.hpp"
12 #include "smpi_datatype.hpp"
13 #include "smpi_info.hpp"
14 #include "smpi_keyvals.hpp"
15 #include "smpi_request.hpp"
16 #include "src/smpi/include/smpi_actor.hpp"
17 #include "src/mc/mc_replay.hpp"
18
19 #include <algorithm>
20 #include <mutex> // std::scoped_lock
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
23
24 #define CHECK_RMA_REMOTE_WIN(fun, win)\
25   if(target_count*target_datatype->get_extent()>win->size_){\
26     XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
27     fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
28     simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
29     return MPI_ERR_RMA_RANGE;\
30   }
31
32 #define CHECK_WIN_LOCKED(win)                                                                                          \
33   if (opened_ == 0) { /*check that post/start has been done*/                                                          \
34     bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; }); \
35     if (not locked)                                                                                                    \
36       return MPI_ERR_WIN;                                                                                              \
37   }
38
39 namespace simgrid::smpi {
40 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
41 int Win::keyval_id_=0;
42
43 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
44     : base_(base)
45     , size_(size)
46     , disp_unit_(disp_unit)
47     , info_(info)
48     , comm_(comm)
49     , connected_wins_(comm->size())
50     , rank_(comm->rank())
51     , allocated_(allocated)
52     , dynamic_(dynamic)
53 {
54   XBT_DEBUG("Creating window");
55   if(info!=MPI_INFO_NULL)
56     info->ref();
57   connected_wins_[rank_] = this;
58   errhandler_->ref();
59   comm->add_rma_win(this);
60   comm->ref();
61
62   colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
63                    MPI_BYTE, comm);
64   if  (MC_is_active() || MC_record_replay_is_active()){
65     s4u::Barrier* bar_ptr;
66     if (rank_ == 0) {
67       bar_ = s4u::Barrier::create(comm->size());
68       bar_ptr = bar_.get();
69     }
70     colls::bcast(&bar_ptr, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
71     if (rank_ != 0)
72       bar_ = s4u::BarrierPtr(bar_ptr);
73   }
74   this->add_f();
75 }
76
77 int Win::del(Win* win){
78   //As per the standard, perform a barrier to ensure every async comm is finished
79   if  (MC_is_active() || MC_record_replay_is_active())
80     win->bar_->wait();
81   else
82     colls::barrier(win->comm_);
83   win->flush_local_all();
84
85   if (win->info_ != MPI_INFO_NULL)
86     simgrid::smpi::Info::unref(win->info_);
87   if (win->errhandler_ != MPI_ERRHANDLER_NULL)
88     simgrid::smpi::Errhandler::unref(win->errhandler_);
89
90   win->comm_->remove_rma_win(win);
91
92   colls::barrier(win->comm_);
93   Comm::unref(win->comm_);
94   if (not win->lockers_.empty() || win->opened_ < 0) {
95     XBT_WARN("Freeing a locked or opened window");
96     return MPI_ERR_WIN;
97   }
98   if (win->allocated_)
99     xbt_free(win->base_);
100   for (auto m : {win->mut_, win->lock_mut_, win->atomic_mut_})
101     if (m->get_owner() != nullptr)
102       m->unlock();
103
104   F2C::free_f(win->f2c_id());
105   win->cleanup_attr<Win>();
106
107   delete win;
108   return MPI_SUCCESS;
109 }
110
111 int Win::attach(void* /*base*/, MPI_Aint size)
112 {
113   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
114     return MPI_ERR_ARG;
115   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
116   size_+=size;
117   return MPI_SUCCESS;
118 }
119
120 int Win::detach(const void* /*base*/)
121 {
122   base_=MPI_BOTTOM;
123   size_=-1;
124   return MPI_SUCCESS;
125 }
126
127 void Win::get_name(char* name, int* length) const
128 {
129   *length = static_cast<int>(name_.length());
130   if (not name_.empty()) {
131     name_.copy(name, *length);
132     name[*length] = '\0';
133   }
134 }
135
136 void Win::get_group(MPI_Group* group){
137   if(comm_ != MPI_COMM_NULL){
138     *group = comm_->group();
139   } else {
140     *group = MPI_GROUP_NULL;
141   }
142 }
143
144 MPI_Info Win::info()
145 {
146   return info_;
147 }
148
149 int Win::rank() const
150 {
151   return rank_;
152 }
153
154 MPI_Comm Win::comm() const
155 {
156   return comm_;
157 }
158
159 MPI_Aint Win::size() const
160 {
161   return size_;
162 }
163
164 void* Win::base() const
165 {
166   return base_;
167 }
168
169 int Win::disp_unit() const
170 {
171   return disp_unit_;
172 }
173
174 bool Win::dynamic() const
175 {
176   return dynamic_;
177 }
178
179 void Win::set_info(MPI_Info info)
180 {
181   if (info_ != MPI_INFO_NULL)
182     simgrid::smpi::Info::unref(info_);
183   info_ = info;
184   if (info_ != MPI_INFO_NULL)
185     info_->ref();
186 }
187
188 void Win::set_name(const char* name){
189   name_ = name;
190 }
191
192 int Win::fence(int assert)
193 {
194   XBT_DEBUG("Entering fence");
195   opened_++;
196   if (not (assert & MPI_MODE_NOPRECEDE)) {
197     // This is not the first fence => finalize what came before
198     if (MC_is_active() || MC_record_replay_is_active())
199       bar_->wait();
200     else
201       colls::barrier(comm_);
202     flush_local_all();
203     count_=0;
204   }
205
206   if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
207     opened_=0;
208   assert_ = assert;
209   if (MC_is_active() || MC_record_replay_is_active())
210     bar_->wait();
211   else
212     colls::barrier(comm_);
213   XBT_DEBUG("Leaving fence");
214
215   return MPI_SUCCESS;
216 }
217
218 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
219               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
220 {
221   //get receiver pointer
222   Win* recv_win = connected_wins_[target_rank];
223
224   CHECK_WIN_LOCKED(recv_win)
225   CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)
226
227   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
228
229   if (target_rank != rank_) { // This is not for myself, so we need to send messages
230     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
231     // prepare send_request
232     MPI_Request sreq =
233         Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
234                                MPI_OP_NULL);
235
236     //prepare receiver request
237     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
238                                               SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
239
240     //start send
241     sreq->start();
242
243     if(request!=nullptr){
244       *request=sreq;
245     }else{
246       const std::scoped_lock lock(*mut_);
247       requests_.push_back(sreq);
248     }
249
250     //push request to receiver's win
251     const std::scoped_lock recv_lock(*recv_win->mut_);
252     recv_win->requests_.push_back(rreq);
253     rreq->start();
254   } else {
255     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
256     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
257     if(request!=nullptr)
258       *request = MPI_REQUEST_NULL;
259   }
260
261   return MPI_SUCCESS;
262 }
263
264 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
265               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
266 {
267   //get sender pointer
268   Win* send_win = connected_wins_[target_rank];
269
270   CHECK_WIN_LOCKED(send_win)
271   CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)
272
273   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
274   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
275
276   if (target_rank != rank_) {
277     //prepare send_request
278     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
279                                               SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
280
281     //prepare receiver request
282     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
283                                               SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
284
285     //start the send, with another process than us as sender.
286     sreq->start();
287     // push request to sender's win
288     if (const std::scoped_lock send_lock(*send_win->mut_); true) {
289       send_win->requests_.push_back(sreq);
290     }
291
292     //start recv
293     rreq->start();
294
295     if(request!=nullptr){
296       *request=rreq;
297     }else{
298       const std::scoped_lock lock(*mut_);
299       requests_.push_back(rreq);
300     }
301   } else {
302     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
303     if(request!=nullptr)
304       *request=MPI_REQUEST_NULL;
305   }
306   return MPI_SUCCESS;
307 }
308
309 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
310               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
311 {
312   XBT_DEBUG("Entering MPI_Win_Accumulate");
313   //get receiver pointer
314   Win* recv_win = connected_wins_[target_rank];
315
316   //FIXME: local version
317   CHECK_WIN_LOCKED(recv_win)
318   CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)
319
320   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
321   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
322   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
323   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
324   // prepare send_request
325
326   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
327                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
328
329   // prepare receiver request
330   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
331                                             SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
332
333   count_++;
334
335   // start send
336   sreq->start();
337   // push request to receiver's win
338   if (const std::scoped_lock recv_lock(*recv_win->mut_); true) {
339     recv_win->requests_.push_back(rreq);
340     rreq->start();
341   }
342
343   if (request != nullptr) {
344     *request = sreq;
345   } else {
346     const std::scoped_lock lock(*mut_);
347     requests_.push_back(sreq);
348   }
349
350   // FIXME: The current implementation fails to ensure the correct ordering of the accumulate requests.  The following
351   // 'flush' is a workaround to fix that.
352   flush(target_rank);
353   XBT_DEBUG("Leaving MPI_Win_Accumulate");
354   return MPI_SUCCESS;
355 }
356
357 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
358                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
359                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
360 {
361   //get sender pointer
362   const Win* send_win = connected_wins_[target_rank];
363
364   CHECK_WIN_LOCKED(send_win)
365   CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)
366
367   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
368   //need to be sure ops are correctly ordered, so finish request here ? slow.
369   MPI_Request req = MPI_REQUEST_NULL;
370   const std::scoped_lock lock(*send_win->atomic_mut_);
371   get(result_addr, result_count, result_datatype, target_rank,
372               target_disp, target_count, target_datatype, &req);
373   if (req != MPI_REQUEST_NULL)
374     Request::wait(&req, MPI_STATUS_IGNORE);
375   if(op!=MPI_NO_OP)
376     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
377               target_disp, target_count, target_datatype, op, &req);
378   if (req != MPI_REQUEST_NULL)
379     Request::wait(&req, MPI_STATUS_IGNORE);
380   return MPI_SUCCESS;
381 }
382
383 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
384                           int target_rank, MPI_Aint target_disp)
385 {
386   //get sender pointer
387   const Win* send_win = connected_wins_[target_rank];
388
389   CHECK_WIN_LOCKED(send_win)
390
391   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
392   MPI_Request req = MPI_REQUEST_NULL;
393   const std::scoped_lock lock(*send_win->atomic_mut_);
394   get(result_addr, 1, datatype, target_rank,
395               target_disp, 1, datatype, &req);
396   if (req != MPI_REQUEST_NULL)
397     Request::wait(&req, MPI_STATUS_IGNORE);
398   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
399     put(origin_addr, 1, datatype, target_rank,
400               target_disp, 1, datatype);
401   }
402   return MPI_SUCCESS;
403 }
404
405 int Win::start(MPI_Group group, int /*assert*/)
406 {
407   /* From MPI forum advices
408   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
409   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
410   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
411   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
412   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
413   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
414   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
415   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
416   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
417   must complete, without further dependencies.  */
418
419   //naive, blocking implementation.
420   XBT_DEBUG("Entering MPI_Win_Start");
421   std::vector<MPI_Request> reqs;
422   for (int i = 0; i < group->size(); i++) {
423     int src = comm_->group()->rank(group->actor(i));
424     xbt_assert(src != MPI_UNDEFINED);
425     if (src != rank_)
426       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
427   }
428   int size = static_cast<int>(reqs.size());
429
430   Request::startall(size, reqs.data());
431   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
432   for (auto& req : reqs)
433     Request::unref(&req);
434
435   group->ref();
436   dst_group_ = group;
437   opened_--; // we're open for business !
438   XBT_DEBUG("Leaving MPI_Win_Start");
439   return MPI_SUCCESS;
440 }
441
442 int Win::post(MPI_Group group, int /*assert*/)
443 {
444   //let's make a synchronous send here
445   XBT_DEBUG("Entering MPI_Win_Post");
446   std::vector<MPI_Request> reqs;
447   for (int i = 0; i < group->size(); i++) {
448     int dst = comm_->group()->rank(group->actor(i));
449     xbt_assert(dst != MPI_UNDEFINED);
450     if (dst != rank_)
451       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
452   }
453   int size = static_cast<int>(reqs.size());
454
455   Request::startall(size, reqs.data());
456   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
457   for (auto& req : reqs)
458     Request::unref(&req);
459
460   group->ref();
461   src_group_ = group;
462   opened_--; // we're open for business !
463   XBT_DEBUG("Leaving MPI_Win_Post");
464   return MPI_SUCCESS;
465 }
466
467 int Win::complete(){
468   xbt_assert(opened_ != 0, "Complete called on already opened MPI_Win");
469
470   XBT_DEBUG("Entering MPI_Win_Complete");
471   std::vector<MPI_Request> reqs;
472   for (int i = 0; i < dst_group_->size(); i++) {
473     int dst = comm_->group()->rank(dst_group_->actor(i));
474     xbt_assert(dst != MPI_UNDEFINED);
475     if (dst != rank_)
476       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
477   }
478   int size = static_cast<int>(reqs.size());
479
480   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
481   Request::startall(size, reqs.data());
482   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
483   for (auto& req : reqs)
484     Request::unref(&req);
485
486   flush_local_all();
487
488   opened_++; //we're closed for business !
489   Group::unref(dst_group_);
490   dst_group_ = MPI_GROUP_NULL;
491   return MPI_SUCCESS;
492 }
493
494 int Win::wait(){
495   //naive, blocking implementation.
496   XBT_DEBUG("Entering MPI_Win_Wait");
497   std::vector<MPI_Request> reqs;
498   for (int i = 0; i < src_group_->size(); i++) {
499     int src = comm_->group()->rank(src_group_->actor(i));
500     xbt_assert(src != MPI_UNDEFINED);
501     if (src != rank_)
502       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
503   }
504   int size = static_cast<int>(reqs.size());
505
506   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
507   Request::startall(size, reqs.data());
508   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
509   for (auto& req : reqs)
510     Request::unref(&req);
511
512   flush_local_all();
513
514   opened_++; //we're closed for business !
515   Group::unref(src_group_);
516   src_group_ = MPI_GROUP_NULL;
517   return MPI_SUCCESS;
518 }
519
520 int Win::lock(int lock_type, int rank, int /*assert*/)
521 {
522   MPI_Win target_win = connected_wins_[rank];
523
524   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
525     target_win->lock_mut_->lock();
526     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
527     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
528       target_win->lock_mut_->unlock();
529    }
530   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
531     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
532
533   target_win->lockers_.push_back(rank_);
534
535   flush(rank);
536   return MPI_SUCCESS;
537 }
538
539 int Win::lock_all(int assert){
540   int retval = MPI_SUCCESS;
541   for (int i = 0; i < comm_->size(); i++) {
542     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
543     if (ret != MPI_SUCCESS)
544       retval = ret;
545   }
546   return retval;
547 }
548
549 int Win::unlock(int rank){
550   MPI_Win target_win = connected_wins_[rank];
551   int target_mode = target_win->mode_;
552   target_win->mode_= 0;
553   target_win->lockers_.remove(rank_);
554   if (target_mode==MPI_LOCK_EXCLUSIVE){
555     target_win->lock_mut_->unlock();
556   }
557
558   flush(rank);
559   return MPI_SUCCESS;
560 }
561
562 int Win::unlock_all(){
563   int retval = MPI_SUCCESS;
564   for (int i = 0; i < comm_->size(); i++) {
565     int ret = this->unlock(i);
566     if (ret != MPI_SUCCESS)
567       retval = ret;
568   }
569   return retval;
570 }
571
572 int Win::flush(int rank){
573   int finished = finish_comms(rank);
574   XBT_DEBUG("Win_flush on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
575   if (rank != rank_) {
576     finished = connected_wins_[rank]->finish_comms(rank_);
577     XBT_DEBUG("Win_flush on remote %d for local %d - Finished %d RMA calls", rank, rank_, finished);
578   }
579   return MPI_SUCCESS;
580 }
581
582 int Win::flush_local(int rank){
583   int finished = finish_comms(rank);
584   XBT_DEBUG("Win_flush_local on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
585   return MPI_SUCCESS;
586 }
587
588 int Win::flush_all(){
589   int finished = finish_comms();
590   XBT_DEBUG("Win_flush_all on local %d - Finished %d RMA calls", rank_, finished);
591   for (int i = 0; i < comm_->size(); i++) {
592     if (i != rank_) {
593       finished = connected_wins_[i]->finish_comms(rank_);
594       XBT_DEBUG("Win_flush_all on remote %d for local %d - Finished %d RMA calls", i, rank_, finished);
595     }
596   }
597   return MPI_SUCCESS;
598 }
599
600 int Win::flush_local_all(){
601   int finished = finish_comms();
602   XBT_DEBUG("Win_flush_local_all on local %d - Finished %d RMA calls", rank_, finished);
603   return MPI_SUCCESS;
604 }
605
606 Win* Win::f2c(int id){
607   return static_cast<Win*>(F2C::f2c(id));
608 }
609
610 int Win::finish_comms(){
611   // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
612   // Without this, the vector could get redimensioned when another process pushes.
613   // This would result in the array used by Request::waitall() to be invalidated.
614   // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
615   const std::scoped_lock lock(*mut_);
616   //Finish own requests
617   int size = static_cast<int>(requests_.size());
618   if (size > 0) {
619     MPI_Request* treqs = requests_.data();
620     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
621     requests_.clear();
622   }
623   return size;
624 }
625
626 int Win::finish_comms(int rank){
627   // See comment about the mutex in finish_comms() above
628   const std::scoped_lock lock(*mut_);
629   // Finish own requests
630   // Let's see if we're either the destination or the sender of this request
631   // because we only wait for requests that we are responsible for.
632   // Also use the process id here since the request itself returns from src()
633   // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
634   aid_t proc_id = comm_->group()->actor(rank);
635   auto it     = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
636     return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
637   });
638   std::vector<MPI_Request> myreqqs(it, end(requests_));
639   requests_.erase(it, end(requests_));
640   int size = static_cast<int>(myreqqs.size());
641   if (size > 0) {
642     MPI_Request* treqs = myreqqs.data();
643     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
644     myreqqs.clear();
645   }
646   return size;
647 }
648
649 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
650 {
651   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
652   for (int i = 0; not target_win && i < comm_->size(); i++) {
653     if (connected_wins_[i]->size_ > 0)
654       target_win = connected_wins_[i];
655   }
656   if (target_win) {
657     *size                         = target_win->size_;
658     *disp_unit                    = target_win->disp_unit_;
659     *static_cast<void**>(baseptr) = target_win->base_;
660   } else {
661     *size                         = 0;
662     *static_cast<void**>(baseptr) = nullptr;
663   }
664   return MPI_SUCCESS;
665 }
666
667 MPI_Errhandler Win::errhandler()
668 {
669   if (errhandler_ != MPI_ERRHANDLER_NULL)
670     errhandler_->ref();
671   return errhandler_;
672 }
673
674 void Win::set_errhandler(MPI_Errhandler errhandler)
675 {
676   if (errhandler_ != MPI_ERRHANDLER_NULL)
677     simgrid::smpi::Errhandler::unref(errhandler_);
678   errhandler_ = errhandler;
679   if (errhandler_ != MPI_ERRHANDLER_NULL)
680     errhandler_->ref();
681 }
682 } // namespace simgrid::smpi