Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Try to fix a failure about mutex freed too early in RMA
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2023. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <simgrid/modelchecker.h>
7 #include "smpi_win.hpp"
8
9 #include "private.hpp"
10 #include "smpi_coll.hpp"
11 #include "smpi_comm.hpp"
12 #include "smpi_datatype.hpp"
13 #include "smpi_info.hpp"
14 #include "smpi_keyvals.hpp"
15 #include "smpi_request.hpp"
16 #include "src/smpi/include/smpi_actor.hpp"
17 #include "src/mc/mc_replay.hpp"
18
19 #include <algorithm>
20 #include <mutex> // std::scoped_lock
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
23
24 #define CHECK_RMA_REMOTE_WIN(fun, win)\
25   if(target_count*target_datatype->get_extent()>win->size_){\
26     XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
27     fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
28     simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
29     return MPI_ERR_RMA_RANGE;\
30   }
31
32 #define CHECK_WIN_LOCKED(win)                                                                                          \
33   if (opened_ == 0) { /*check that post/start has been done*/                                                          \
34     bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; }); \
35     if (not locked)                                                                                                    \
36       return MPI_ERR_WIN;                                                                                              \
37   }
38
39 namespace simgrid::smpi {
40 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
41 int Win::keyval_id_=0;
42
43 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
44     : base_(base)
45     , size_(size)
46     , disp_unit_(disp_unit)
47     , info_(info)
48     , comm_(comm)
49     , connected_wins_(comm->size())
50     , rank_(comm->rank())
51     , allocated_(allocated)
52     , dynamic_(dynamic)
53 {
54   XBT_DEBUG("Creating window");
55   if(info!=MPI_INFO_NULL)
56     info->ref();
57   connected_wins_[rank_] = this;
58   errhandler_->ref();
59   comm->add_rma_win(this);
60   comm->ref();
61
62   colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
63                    MPI_BYTE, comm);
64   if  (MC_is_active() || MC_record_replay_is_active()){
65     s4u::Barrier* bar_ptr;
66     if (rank_ == 0) {
67       bar_ = s4u::Barrier::create(comm->size());
68       bar_ptr = bar_.get();
69     }
70     colls::bcast(&bar_ptr, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
71     if (rank_ != 0)
72       bar_ = s4u::BarrierPtr(bar_ptr);
73   }
74   this->add_f();
75 }
76
77 int Win::del(Win* win){
78   //As per the standard, perform a barrier to ensure every async comm is finished
79   if  (MC_is_active() || MC_record_replay_is_active())
80     win->bar_->wait();
81   else
82     colls::barrier(win->comm_);
83   win->flush_local_all();
84
85   if (win->info_ != MPI_INFO_NULL)
86     simgrid::smpi::Info::unref(win->info_);
87   if (win->errhandler_ != MPI_ERRHANDLER_NULL)
88     simgrid::smpi::Errhandler::unref(win->errhandler_);
89
90   win->comm_->remove_rma_win(win);
91
92   colls::barrier(win->comm_);
93   Comm::unref(win->comm_);
94   if (not win->lockers_.empty() || win->opened_ < 0) {
95     XBT_WARN("Freeing a locked or opened window");
96     return MPI_ERR_WIN;
97   }
98   if (win->allocated_)
99     xbt_free(win->base_);
100   if (win->mut_->get_owner() != nullptr)
101     win->mut_->unlock();
102
103   F2C::free_f(win->f2c_id());
104   win->cleanup_attr<Win>();
105
106   delete win;
107   return MPI_SUCCESS;
108 }
109
110 int Win::attach(void* /*base*/, MPI_Aint size)
111 {
112   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
113     return MPI_ERR_ARG;
114   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
115   size_+=size;
116   return MPI_SUCCESS;
117 }
118
119 int Win::detach(const void* /*base*/)
120 {
121   base_=MPI_BOTTOM;
122   size_=-1;
123   return MPI_SUCCESS;
124 }
125
126 void Win::get_name(char* name, int* length) const
127 {
128   *length = static_cast<int>(name_.length());
129   if (not name_.empty()) {
130     name_.copy(name, *length);
131     name[*length] = '\0';
132   }
133 }
134
135 void Win::get_group(MPI_Group* group){
136   if(comm_ != MPI_COMM_NULL){
137     *group = comm_->group();
138   } else {
139     *group = MPI_GROUP_NULL;
140   }
141 }
142
143 MPI_Info Win::info()
144 {
145   return info_;
146 }
147
148 int Win::rank() const
149 {
150   return rank_;
151 }
152
153 MPI_Comm Win::comm() const
154 {
155   return comm_;
156 }
157
158 MPI_Aint Win::size() const
159 {
160   return size_;
161 }
162
163 void* Win::base() const
164 {
165   return base_;
166 }
167
168 int Win::disp_unit() const
169 {
170   return disp_unit_;
171 }
172
173 bool Win::dynamic() const
174 {
175   return dynamic_;
176 }
177
178 void Win::set_info(MPI_Info info)
179 {
180   if (info_ != MPI_INFO_NULL)
181     simgrid::smpi::Info::unref(info_);
182   info_ = info;
183   if (info_ != MPI_INFO_NULL)
184     info_->ref();
185 }
186
187 void Win::set_name(const char* name){
188   name_ = name;
189 }
190
191 int Win::fence(int assert)
192 {
193   XBT_DEBUG("Entering fence");
194   opened_++;
195   if (not (assert & MPI_MODE_NOPRECEDE)) {
196     // This is not the first fence => finalize what came before
197     if (MC_is_active() || MC_record_replay_is_active())
198       bar_->wait();
199     else
200       colls::barrier(comm_);
201     flush_local_all();
202     count_=0;
203   }
204
205   if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
206     opened_=0;
207   assert_ = assert;
208   if (MC_is_active() || MC_record_replay_is_active())
209     bar_->wait();
210   else
211     colls::barrier(comm_);
212   XBT_DEBUG("Leaving fence");
213
214   return MPI_SUCCESS;
215 }
216
217 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
218               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
219 {
220   //get receiver pointer
221   Win* recv_win = connected_wins_[target_rank];
222
223   CHECK_WIN_LOCKED(recv_win)
224   CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)
225
226   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
227
228   if (target_rank != rank_) { // This is not for myself, so we need to send messages
229     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
230     // prepare send_request
231     MPI_Request sreq =
232         Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
233                                MPI_OP_NULL);
234
235     //prepare receiver request
236     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
237                                               SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
238
239     //start send
240     sreq->start();
241
242     if(request!=nullptr){
243       *request=sreq;
244     }else{
245       const std::scoped_lock lock(*mut_);
246       requests_.push_back(sreq);
247     }
248
249     //push request to receiver's win
250     const std::scoped_lock recv_lock(*recv_win->mut_);
251     recv_win->requests_.push_back(rreq);
252     rreq->start();
253   } else {
254     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
255     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
256     if(request!=nullptr)
257       *request = MPI_REQUEST_NULL;
258   }
259
260   return MPI_SUCCESS;
261 }
262
263 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
264               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
265 {
266   //get sender pointer
267   Win* send_win = connected_wins_[target_rank];
268
269   CHECK_WIN_LOCKED(send_win)
270   CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)
271
272   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
273   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
274
275   if (target_rank != rank_) {
276     //prepare send_request
277     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
278                                               SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
279
280     //prepare receiver request
281     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
282                                               SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
283
284     //start the send, with another process than us as sender.
285     sreq->start();
286     // push request to sender's win
287     if (const std::scoped_lock send_lock(*send_win->mut_); true) {
288       send_win->requests_.push_back(sreq);
289     }
290
291     //start recv
292     rreq->start();
293
294     if(request!=nullptr){
295       *request=rreq;
296     }else{
297       const std::scoped_lock lock(*mut_);
298       requests_.push_back(rreq);
299     }
300   } else {
301     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
302     if(request!=nullptr)
303       *request=MPI_REQUEST_NULL;
304   }
305   return MPI_SUCCESS;
306 }
307
308 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
309               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
310 {
311   XBT_DEBUG("Entering MPI_Win_Accumulate");
312   //get receiver pointer
313   Win* recv_win = connected_wins_[target_rank];
314
315   //FIXME: local version
316   CHECK_WIN_LOCKED(recv_win)
317   CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)
318
319   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
320   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
321   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
322   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
323   // prepare send_request
324
325   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
326                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
327
328   // prepare receiver request
329   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
330                                             SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
331
332   count_++;
333
334   // start send
335   sreq->start();
336   // push request to receiver's win
337   if (const std::scoped_lock recv_lock(*recv_win->mut_); true) {
338     recv_win->requests_.push_back(rreq);
339     rreq->start();
340   }
341
342   if (request != nullptr) {
343     *request = sreq;
344   } else {
345     const std::scoped_lock lock(*mut_);
346     requests_.push_back(sreq);
347   }
348
349   // FIXME: The current implementation fails to ensure the correct ordering of the accumulate requests.  The following
350   // 'flush' is a workaround to fix that.
351   flush(target_rank);
352   XBT_DEBUG("Leaving MPI_Win_Accumulate");
353   return MPI_SUCCESS;
354 }
355
356 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
357                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
358                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
359 {
360   //get sender pointer
361   const Win* send_win = connected_wins_[target_rank];
362
363   CHECK_WIN_LOCKED(send_win)
364   CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)
365
366   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
367   //need to be sure ops are correctly ordered, so finish request here ? slow.
368   MPI_Request req = MPI_REQUEST_NULL;
369   const std::scoped_lock lock(*send_win->atomic_mut_);
370   get(result_addr, result_count, result_datatype, target_rank,
371               target_disp, target_count, target_datatype, &req);
372   if (req != MPI_REQUEST_NULL)
373     Request::wait(&req, MPI_STATUS_IGNORE);
374   if(op!=MPI_NO_OP)
375     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
376               target_disp, target_count, target_datatype, op, &req);
377   if (req != MPI_REQUEST_NULL)
378     Request::wait(&req, MPI_STATUS_IGNORE);
379   return MPI_SUCCESS;
380 }
381
382 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
383                           int target_rank, MPI_Aint target_disp)
384 {
385   //get sender pointer
386   const Win* send_win = connected_wins_[target_rank];
387
388   CHECK_WIN_LOCKED(send_win)
389
390   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
391   MPI_Request req = MPI_REQUEST_NULL;
392   const std::scoped_lock lock(*send_win->atomic_mut_);
393   get(result_addr, 1, datatype, target_rank,
394               target_disp, 1, datatype, &req);
395   if (req != MPI_REQUEST_NULL)
396     Request::wait(&req, MPI_STATUS_IGNORE);
397   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
398     put(origin_addr, 1, datatype, target_rank,
399               target_disp, 1, datatype);
400   }
401   return MPI_SUCCESS;
402 }
403
404 int Win::start(MPI_Group group, int /*assert*/)
405 {
406   /* From MPI forum advices
407   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
408   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
409   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
410   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
411   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
412   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
413   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
414   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
415   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
416   must complete, without further dependencies.  */
417
418   //naive, blocking implementation.
419   XBT_DEBUG("Entering MPI_Win_Start");
420   std::vector<MPI_Request> reqs;
421   for (int i = 0; i < group->size(); i++) {
422     int src = comm_->group()->rank(group->actor(i));
423     xbt_assert(src != MPI_UNDEFINED);
424     if (src != rank_)
425       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
426   }
427   int size = static_cast<int>(reqs.size());
428
429   Request::startall(size, reqs.data());
430   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
431   for (auto& req : reqs)
432     Request::unref(&req);
433
434   group->ref();
435   dst_group_ = group;
436   opened_--; // we're open for business !
437   XBT_DEBUG("Leaving MPI_Win_Start");
438   return MPI_SUCCESS;
439 }
440
441 int Win::post(MPI_Group group, int /*assert*/)
442 {
443   //let's make a synchronous send here
444   XBT_DEBUG("Entering MPI_Win_Post");
445   std::vector<MPI_Request> reqs;
446   for (int i = 0; i < group->size(); i++) {
447     int dst = comm_->group()->rank(group->actor(i));
448     xbt_assert(dst != MPI_UNDEFINED);
449     if (dst != rank_)
450       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
451   }
452   int size = static_cast<int>(reqs.size());
453
454   Request::startall(size, reqs.data());
455   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
456   for (auto& req : reqs)
457     Request::unref(&req);
458
459   group->ref();
460   src_group_ = group;
461   opened_--; // we're open for business !
462   XBT_DEBUG("Leaving MPI_Win_Post");
463   return MPI_SUCCESS;
464 }
465
466 int Win::complete(){
467   xbt_assert(opened_ != 0, "Complete called on already opened MPI_Win");
468
469   XBT_DEBUG("Entering MPI_Win_Complete");
470   std::vector<MPI_Request> reqs;
471   for (int i = 0; i < dst_group_->size(); i++) {
472     int dst = comm_->group()->rank(dst_group_->actor(i));
473     xbt_assert(dst != MPI_UNDEFINED);
474     if (dst != rank_)
475       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
476   }
477   int size = static_cast<int>(reqs.size());
478
479   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
480   Request::startall(size, reqs.data());
481   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
482   for (auto& req : reqs)
483     Request::unref(&req);
484
485   flush_local_all();
486
487   opened_++; //we're closed for business !
488   Group::unref(dst_group_);
489   dst_group_ = MPI_GROUP_NULL;
490   return MPI_SUCCESS;
491 }
492
493 int Win::wait(){
494   //naive, blocking implementation.
495   XBT_DEBUG("Entering MPI_Win_Wait");
496   std::vector<MPI_Request> reqs;
497   for (int i = 0; i < src_group_->size(); i++) {
498     int src = comm_->group()->rank(src_group_->actor(i));
499     xbt_assert(src != MPI_UNDEFINED);
500     if (src != rank_)
501       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
502   }
503   int size = static_cast<int>(reqs.size());
504
505   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
506   Request::startall(size, reqs.data());
507   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
508   for (auto& req : reqs)
509     Request::unref(&req);
510
511   flush_local_all();
512
513   opened_++; //we're closed for business !
514   Group::unref(src_group_);
515   src_group_ = MPI_GROUP_NULL;
516   return MPI_SUCCESS;
517 }
518
519 int Win::lock(int lock_type, int rank, int /*assert*/)
520 {
521   MPI_Win target_win = connected_wins_[rank];
522
523   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
524     target_win->lock_mut_->lock();
525     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
526     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
527       target_win->lock_mut_->unlock();
528    }
529   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
530     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
531
532   target_win->lockers_.push_back(rank_);
533
534   flush(rank);
535   return MPI_SUCCESS;
536 }
537
538 int Win::lock_all(int assert){
539   int retval = MPI_SUCCESS;
540   for (int i = 0; i < comm_->size(); i++) {
541     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
542     if (ret != MPI_SUCCESS)
543       retval = ret;
544   }
545   return retval;
546 }
547
548 int Win::unlock(int rank){
549   MPI_Win target_win = connected_wins_[rank];
550   int target_mode = target_win->mode_;
551   target_win->mode_= 0;
552   target_win->lockers_.remove(rank_);
553   if (target_mode==MPI_LOCK_EXCLUSIVE){
554     target_win->lock_mut_->unlock();
555   }
556
557   flush(rank);
558   return MPI_SUCCESS;
559 }
560
561 int Win::unlock_all(){
562   int retval = MPI_SUCCESS;
563   for (int i = 0; i < comm_->size(); i++) {
564     int ret = this->unlock(i);
565     if (ret != MPI_SUCCESS)
566       retval = ret;
567   }
568   return retval;
569 }
570
571 int Win::flush(int rank){
572   int finished = finish_comms(rank);
573   XBT_DEBUG("Win_flush on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
574   if (rank != rank_) {
575     finished = connected_wins_[rank]->finish_comms(rank_);
576     XBT_DEBUG("Win_flush on remote %d for local %d - Finished %d RMA calls", rank, rank_, finished);
577   }
578   return MPI_SUCCESS;
579 }
580
581 int Win::flush_local(int rank){
582   int finished = finish_comms(rank);
583   XBT_DEBUG("Win_flush_local on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
584   return MPI_SUCCESS;
585 }
586
587 int Win::flush_all(){
588   int finished = finish_comms();
589   XBT_DEBUG("Win_flush_all on local %d - Finished %d RMA calls", rank_, finished);
590   for (int i = 0; i < comm_->size(); i++) {
591     if (i != rank_) {
592       finished = connected_wins_[i]->finish_comms(rank_);
593       XBT_DEBUG("Win_flush_all on remote %d for local %d - Finished %d RMA calls", i, rank_, finished);
594     }
595   }
596   return MPI_SUCCESS;
597 }
598
599 int Win::flush_local_all(){
600   int finished = finish_comms();
601   XBT_DEBUG("Win_flush_local_all on local %d - Finished %d RMA calls", rank_, finished);
602   return MPI_SUCCESS;
603 }
604
605 Win* Win::f2c(int id){
606   return static_cast<Win*>(F2C::f2c(id));
607 }
608
609 int Win::finish_comms(){
610   // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
611   // Without this, the vector could get redimensioned when another process pushes.
612   // This would result in the array used by Request::waitall() to be invalidated.
613   // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
614   const std::scoped_lock lock(*mut_);
615   //Finish own requests
616   int size = static_cast<int>(requests_.size());
617   if (size > 0) {
618     MPI_Request* treqs = requests_.data();
619     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
620     requests_.clear();
621   }
622   return size;
623 }
624
625 int Win::finish_comms(int rank){
626   // See comment about the mutex in finish_comms() above
627   const std::scoped_lock lock(*mut_);
628   // Finish own requests
629   // Let's see if we're either the destination or the sender of this request
630   // because we only wait for requests that we are responsible for.
631   // Also use the process id here since the request itself returns from src()
632   // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
633   aid_t proc_id = comm_->group()->actor(rank);
634   auto it     = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
635     return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
636   });
637   std::vector<MPI_Request> myreqqs(it, end(requests_));
638   requests_.erase(it, end(requests_));
639   int size = static_cast<int>(myreqqs.size());
640   if (size > 0) {
641     MPI_Request* treqs = myreqqs.data();
642     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
643     myreqqs.clear();
644   }
645   return size;
646 }
647
648 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
649 {
650   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
651   for (int i = 0; not target_win && i < comm_->size(); i++) {
652     if (connected_wins_[i]->size_ > 0)
653       target_win = connected_wins_[i];
654   }
655   if (target_win) {
656     *size                         = target_win->size_;
657     *disp_unit                    = target_win->disp_unit_;
658     *static_cast<void**>(baseptr) = target_win->base_;
659   } else {
660     *size                         = 0;
661     *static_cast<void**>(baseptr) = nullptr;
662   }
663   return MPI_SUCCESS;
664 }
665
666 MPI_Errhandler Win::errhandler()
667 {
668   if (errhandler_ != MPI_ERRHANDLER_NULL)
669     errhandler_->ref();
670   return errhandler_;
671 }
672
673 void Win::set_errhandler(MPI_Errhandler errhandler)
674 {
675   if (errhandler_ != MPI_ERRHANDLER_NULL)
676     simgrid::smpi::Errhandler::unref(errhandler_);
677   errhandler_ = errhandler;
678   if (errhandler_ != MPI_ERRHANDLER_NULL)
679     errhandler_->ref();
680 }
681 } // namespace simgrid::smpi