Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
0a34e118714c6177509a9e1b2d0386cd5d4d054b
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2022. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <simgrid/modelchecker.h>
7 #include "smpi_win.hpp"
8
9 #include "private.hpp"
10 #include "smpi_coll.hpp"
11 #include "smpi_comm.hpp"
12 #include "smpi_datatype.hpp"
13 #include "smpi_info.hpp"
14 #include "smpi_keyvals.hpp"
15 #include "smpi_request.hpp"
16 #include "src/smpi/include/smpi_actor.hpp"
17 #include "src/mc/mc_replay.hpp"
18
19 #include <algorithm>
20
21 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
22
23 #define CHECK_RMA_REMOTE_WIN(fun, win)\
24   if(target_count*target_datatype->get_extent()>win->size_){\
25     XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
26     fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
27     simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
28     return MPI_ERR_RMA_RANGE;\
29   }
30
31 #define CHECK_WIN_LOCKED(win)                                                                                          \
32   if (opened_ == 0) { /*check that post/start has been done*/                                                          \
33     bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; }); \
34     if (not locked)                                                                                                    \
35       return MPI_ERR_WIN;                                                                                              \
36   }
37
38 namespace simgrid{
39 namespace smpi{
40 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
41 int Win::keyval_id_=0;
42
43 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
44     : base_(base)
45     , size_(size)
46     , disp_unit_(disp_unit)
47     , info_(info)
48     , comm_(comm)
49     , connected_wins_(comm->size())
50     , rank_(comm->rank())
51     , allocated_(allocated)
52     , dynamic_(dynamic)
53 {
54   XBT_DEBUG("Creating window");
55   if(info!=MPI_INFO_NULL)
56     info->ref();
57   connected_wins_[rank_] = this;
58   errhandler_->ref();
59   comm->add_rma_win(this);
60   comm->ref();
61
62   colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
63                    MPI_BYTE, comm);
64   if  (MC_is_active() || MC_record_replay_is_active()){
65     s4u::Barrier* bar_ptr;
66     if (rank_ == 0) {
67       bar_ = s4u::Barrier::create(comm->size());
68       bar_ptr = bar_.get();
69     }
70     colls::bcast(&bar_ptr, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
71     if (rank_ != 0)
72       bar_ = s4u::BarrierPtr(bar_ptr);
73     bar_->wait();
74   }else{
75     colls::barrier(comm);
76   }
77   this->add_f();
78 }
79
80 int Win::del(Win* win){
81   //As per the standard, perform a barrier to ensure every async comm is finished
82   if  (MC_is_active() || MC_record_replay_is_active())
83     win->bar_->wait();
84   else
85     colls::barrier(win->comm_);
86   win->flush_local_all();
87
88   if (win->info_ != MPI_INFO_NULL)
89     simgrid::smpi::Info::unref(win->info_);
90   if (win->errhandler_ != MPI_ERRHANDLER_NULL)
91     simgrid::smpi::Errhandler::unref(win->errhandler_);
92
93   win->comm_->remove_rma_win(win);
94
95   colls::barrier(win->comm_);
96   Comm::unref(win->comm_);
97   if (!win->lockers_.empty() || win->opened_ < 0){
98     XBT_WARN("Freeing a locked or opened window");
99     return MPI_ERR_WIN;
100   }
101   if (win->allocated_)
102     xbt_free(win->base_);
103
104   F2C::free_f(win->f2c_id());
105   win->cleanup_attr<Win>();
106
107   delete win;
108   return MPI_SUCCESS;
109 }
110
111 int Win::attach(void* /*base*/, MPI_Aint size)
112 {
113   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
114     return MPI_ERR_ARG;
115   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
116   size_+=size;
117   return MPI_SUCCESS;
118 }
119
120 int Win::detach(const void* /*base*/)
121 {
122   base_=MPI_BOTTOM;
123   size_=-1;
124   return MPI_SUCCESS;
125 }
126
127 void Win::get_name(char* name, int* length) const
128 {
129   *length = static_cast<int>(name_.length());
130   if (not name_.empty()) {
131     name_.copy(name, *length);
132     name[*length] = '\0';
133   }
134 }
135
136 void Win::get_group(MPI_Group* group){
137   if(comm_ != MPI_COMM_NULL){
138     *group = comm_->group();
139   } else {
140     *group = MPI_GROUP_NULL;
141   }
142 }
143
144 MPI_Info Win::info()
145 {
146   return info_;
147 }
148
149 int Win::rank() const
150 {
151   return rank_;
152 }
153
154 MPI_Comm Win::comm() const
155 {
156   return comm_;
157 }
158
159 MPI_Aint Win::size() const
160 {
161   return size_;
162 }
163
164 void* Win::base() const
165 {
166   return base_;
167 }
168
169 int Win::disp_unit() const
170 {
171   return disp_unit_;
172 }
173
174 bool Win::dynamic() const
175 {
176   return dynamic_;
177 }
178
179 void Win::set_info(MPI_Info info)
180 {
181   if (info_ != MPI_INFO_NULL)
182     simgrid::smpi::Info::unref(info_);
183   info_ = info;
184   if (info_ != MPI_INFO_NULL)
185     info_->ref();
186 }
187
188 void Win::set_name(const char* name){
189   name_ = name;
190 }
191
192 int Win::fence(int assert)
193 {
194   XBT_DEBUG("Entering fence");
195   opened_++;
196   if (not (assert & MPI_MODE_NOPRECEDE)) {
197     // This is not the first fence => finalize what came before
198     if (MC_is_active() || MC_record_replay_is_active())
199       bar_->wait();
200     else
201       colls::barrier(comm_);
202     flush_local_all();
203     count_=0;
204   }
205
206   if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
207     opened_=0;
208   assert_ = assert;
209   if (MC_is_active() || MC_record_replay_is_active())
210     bar_->wait();
211   else
212     colls::barrier(comm_);
213   XBT_DEBUG("Leaving fence");
214
215   return MPI_SUCCESS;
216 }
217
218 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
219               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
220 {
221   //get receiver pointer
222   Win* recv_win = connected_wins_[target_rank];
223
224   CHECK_WIN_LOCKED(recv_win)
225   CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)
226
227   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
228
229   if (target_rank != rank_) { // This is not for myself, so we need to send messages
230     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
231     // prepare send_request
232     MPI_Request sreq =
233         Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
234                                MPI_OP_NULL);
235
236     //prepare receiver request
237     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
238                                               SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
239
240     //start send
241     sreq->start();
242
243     if(request!=nullptr){
244       *request=sreq;
245     }else{
246       mut_->lock();
247       requests_.push_back(sreq);
248       mut_->unlock();
249     }
250
251     //push request to receiver's win
252     recv_win->mut_->lock();
253     recv_win->requests_.push_back(rreq);
254     rreq->start();
255     recv_win->mut_->unlock();
256   } else {
257     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
258     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
259     if(request!=nullptr)
260       *request = MPI_REQUEST_NULL;
261   }
262
263   return MPI_SUCCESS;
264 }
265
266 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
267               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
268 {
269   //get sender pointer
270   Win* send_win = connected_wins_[target_rank];
271
272   CHECK_WIN_LOCKED(send_win)
273   CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)
274
275   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
276   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
277
278   if (target_rank != rank_) {
279     //prepare send_request
280     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
281                                               SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
282
283     //prepare receiver request
284     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
285                                               SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
286
287     //start the send, with another process than us as sender.
288     sreq->start();
289     // push request to sender's win
290     send_win->mut_->lock();
291     send_win->requests_.push_back(sreq);
292     send_win->mut_->unlock();
293
294     //start recv
295     rreq->start();
296
297     if(request!=nullptr){
298       *request=rreq;
299     }else{
300       mut_->lock();
301       requests_.push_back(rreq);
302       mut_->unlock();
303     }
304   } else {
305     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
306     if(request!=nullptr)
307       *request=MPI_REQUEST_NULL;
308   }
309   return MPI_SUCCESS;
310 }
311
312 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
313               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
314 {
315   XBT_DEBUG("Entering MPI_Win_Accumulate");
316   //get receiver pointer
317   Win* recv_win = connected_wins_[target_rank];
318
319   //FIXME: local version
320   CHECK_WIN_LOCKED(recv_win)
321   CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)
322
323   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
324   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
325   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
326   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
327   // prepare send_request
328
329   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
330                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
331
332   // prepare receiver request
333   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
334                                             SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
335
336   count_++;
337
338   // start send
339   sreq->start();
340   // push request to receiver's win
341   recv_win->mut_->lock();
342   recv_win->requests_.push_back(rreq);
343   rreq->start();
344   recv_win->mut_->unlock();
345
346   if (request != nullptr) {
347     *request = sreq;
348   } else {
349     mut_->lock();
350     requests_.push_back(sreq);
351     mut_->unlock();
352   }
353
354   // FIXME: The current implementation fails to ensure the correct ordering of the accumulate requests.  The following
355   // 'flush' is a workaround to fix that.
356   flush(target_rank);
357   XBT_DEBUG("Leaving MPI_Win_Accumulate");
358   return MPI_SUCCESS;
359 }
360
361 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
362                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
363                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
364 {
365   //get sender pointer
366   const Win* send_win = connected_wins_[target_rank];
367
368   CHECK_WIN_LOCKED(send_win)
369   CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)
370
371   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
372   //need to be sure ops are correctly ordered, so finish request here ? slow.
373   MPI_Request req = MPI_REQUEST_NULL;
374   send_win->atomic_mut_->lock();
375   get(result_addr, result_count, result_datatype, target_rank,
376               target_disp, target_count, target_datatype, &req);
377   if (req != MPI_REQUEST_NULL)
378     Request::wait(&req, MPI_STATUS_IGNORE);
379   if(op!=MPI_NO_OP)
380     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
381               target_disp, target_count, target_datatype, op, &req);
382   if (req != MPI_REQUEST_NULL)
383     Request::wait(&req, MPI_STATUS_IGNORE);
384   send_win->atomic_mut_->unlock();
385   return MPI_SUCCESS;
386 }
387
388 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
389                           int target_rank, MPI_Aint target_disp)
390 {
391   //get sender pointer
392   const Win* send_win = connected_wins_[target_rank];
393
394   CHECK_WIN_LOCKED(send_win)
395
396   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
397   MPI_Request req = MPI_REQUEST_NULL;
398   send_win->atomic_mut_->lock();
399   get(result_addr, 1, datatype, target_rank,
400               target_disp, 1, datatype, &req);
401   if (req != MPI_REQUEST_NULL)
402     Request::wait(&req, MPI_STATUS_IGNORE);
403   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
404     put(origin_addr, 1, datatype, target_rank,
405               target_disp, 1, datatype);
406   }
407   send_win->atomic_mut_->unlock();
408   return MPI_SUCCESS;
409 }
410
411 int Win::start(MPI_Group group, int /*assert*/)
412 {
413   /* From MPI forum advices
414   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
415   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
416   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
417   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
418   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
419   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
420   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
421   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
422   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
423   must complete, without further dependencies.  */
424
425   //naive, blocking implementation.
426   XBT_DEBUG("Entering MPI_Win_Start");
427   std::vector<MPI_Request> reqs;
428   for (int i = 0; i < group->size(); i++) {
429     int src = comm_->group()->rank(group->actor(i));
430     xbt_assert(src != MPI_UNDEFINED);
431     if (src != rank_)
432       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
433   }
434   int size = static_cast<int>(reqs.size());
435
436   Request::startall(size, reqs.data());
437   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
438   for (auto& req : reqs)
439     Request::unref(&req);
440
441   group->ref();
442   dst_group_ = group;
443   opened_--; // we're open for business !
444   XBT_DEBUG("Leaving MPI_Win_Start");
445   return MPI_SUCCESS;
446 }
447
448 int Win::post(MPI_Group group, int /*assert*/)
449 {
450   //let's make a synchronous send here
451   XBT_DEBUG("Entering MPI_Win_Post");
452   std::vector<MPI_Request> reqs;
453   for (int i = 0; i < group->size(); i++) {
454     int dst = comm_->group()->rank(group->actor(i));
455     xbt_assert(dst != MPI_UNDEFINED);
456     if (dst != rank_)
457       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
458   }
459   int size = static_cast<int>(reqs.size());
460
461   Request::startall(size, reqs.data());
462   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
463   for (auto& req : reqs)
464     Request::unref(&req);
465
466   group->ref();
467   src_group_ = group;
468   opened_--; // we're open for business !
469   XBT_DEBUG("Leaving MPI_Win_Post");
470   return MPI_SUCCESS;
471 }
472
473 int Win::complete(){
474   xbt_assert(opened_ != 0, "Complete called on already opened MPI_Win");
475
476   XBT_DEBUG("Entering MPI_Win_Complete");
477   std::vector<MPI_Request> reqs;
478   for (int i = 0; i < dst_group_->size(); i++) {
479     int dst = comm_->group()->rank(dst_group_->actor(i));
480     xbt_assert(dst != MPI_UNDEFINED);
481     if (dst != rank_)
482       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
483   }
484   int size = static_cast<int>(reqs.size());
485
486   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
487   Request::startall(size, reqs.data());
488   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
489   for (auto& req : reqs)
490     Request::unref(&req);
491
492   flush_local_all();
493
494   opened_++; //we're closed for business !
495   Group::unref(dst_group_);
496   dst_group_ = MPI_GROUP_NULL;
497   return MPI_SUCCESS;
498 }
499
500 int Win::wait(){
501   //naive, blocking implementation.
502   XBT_DEBUG("Entering MPI_Win_Wait");
503   std::vector<MPI_Request> reqs;
504   for (int i = 0; i < src_group_->size(); i++) {
505     int src = comm_->group()->rank(src_group_->actor(i));
506     xbt_assert(src != MPI_UNDEFINED);
507     if (src != rank_)
508       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
509   }
510   int size = static_cast<int>(reqs.size());
511
512   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
513   Request::startall(size, reqs.data());
514   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
515   for (auto& req : reqs)
516     Request::unref(&req);
517
518   flush_local_all();
519
520   opened_++; //we're closed for business !
521   Group::unref(src_group_);
522   src_group_ = MPI_GROUP_NULL;
523   return MPI_SUCCESS;
524 }
525
526 int Win::lock(int lock_type, int rank, int /*assert*/)
527 {
528   MPI_Win target_win = connected_wins_[rank];
529
530   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
531     target_win->lock_mut_->lock();
532     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
533     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
534       target_win->lock_mut_->unlock();
535    }
536   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
537     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
538
539   target_win->lockers_.push_back(rank_);
540
541   flush(rank);
542   return MPI_SUCCESS;
543 }
544
545 int Win::lock_all(int assert){
546   int retval = MPI_SUCCESS;
547   for (int i = 0; i < comm_->size(); i++) {
548     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
549     if (ret != MPI_SUCCESS)
550       retval = ret;
551   }
552   return retval;
553 }
554
555 int Win::unlock(int rank){
556   MPI_Win target_win = connected_wins_[rank];
557   int target_mode = target_win->mode_;
558   target_win->mode_= 0;
559   target_win->lockers_.remove(rank_);
560   if (target_mode==MPI_LOCK_EXCLUSIVE){
561     target_win->lock_mut_->unlock();
562   }
563
564   flush(rank);
565   return MPI_SUCCESS;
566 }
567
568 int Win::unlock_all(){
569   int retval = MPI_SUCCESS;
570   for (int i = 0; i < comm_->size(); i++) {
571     int ret = this->unlock(i);
572     if (ret != MPI_SUCCESS)
573       retval = ret;
574   }
575   return retval;
576 }
577
578 int Win::flush(int rank){
579   int finished = finish_comms(rank);
580   XBT_DEBUG("Win_flush on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
581   if (rank != rank_) {
582     finished = connected_wins_[rank]->finish_comms(rank_);
583     XBT_DEBUG("Win_flush on remote %d for local %d - Finished %d RMA calls", rank, rank_, finished);
584   }
585   return MPI_SUCCESS;
586 }
587
588 int Win::flush_local(int rank){
589   int finished = finish_comms(rank);
590   XBT_DEBUG("Win_flush_local on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
591   return MPI_SUCCESS;
592 }
593
594 int Win::flush_all(){
595   int finished = finish_comms();
596   XBT_DEBUG("Win_flush_all on local %d - Finished %d RMA calls", rank_, finished);
597   for (int i = 0; i < comm_->size(); i++) {
598     if (i != rank_) {
599       finished = connected_wins_[i]->finish_comms(rank_);
600       XBT_DEBUG("Win_flush_all on remote %d for local %d - Finished %d RMA calls", i, rank_, finished);
601     }
602   }
603   return MPI_SUCCESS;
604 }
605
606 int Win::flush_local_all(){
607   int finished = finish_comms();
608   XBT_DEBUG("Win_flush_local_all on local %d - Finished %d RMA calls", rank_, finished);
609   return MPI_SUCCESS;
610 }
611
612 Win* Win::f2c(int id){
613   return static_cast<Win*>(F2C::f2c(id));
614 }
615
616 int Win::finish_comms(){
617   // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
618   // Without this, the vector could get redimensioned when another process pushes.
619   // This would result in the array used by Request::waitall() to be invalidated.
620   // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
621   mut_->lock();
622   //Finish own requests
623   int size = static_cast<int>(requests_.size());
624   if (size > 0) {
625     MPI_Request* treqs = requests_.data();
626     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
627     requests_.clear();
628   }
629   mut_->unlock();
630   return size;
631 }
632
633 int Win::finish_comms(int rank){
634   // See comment about the mutex in finish_comms() above
635   mut_->lock();
636   // Finish own requests
637   // Let's see if we're either the destination or the sender of this request
638   // because we only wait for requests that we are responsible for.
639   // Also use the process id here since the request itself returns from src()
640   // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
641   aid_t proc_id = comm_->group()->actor(rank);
642   auto it     = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
643     return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
644   });
645   std::vector<MPI_Request> myreqqs(it, end(requests_));
646   requests_.erase(it, end(requests_));
647   int size = static_cast<int>(myreqqs.size());
648   if (size > 0) {
649     MPI_Request* treqs = myreqqs.data();
650     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
651     myreqqs.clear();
652   }
653   mut_->unlock();
654   return size;
655 }
656
657 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
658 {
659   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
660   for (int i = 0; not target_win && i < comm_->size(); i++) {
661     if (connected_wins_[i]->size_ > 0)
662       target_win = connected_wins_[i];
663   }
664   if (target_win) {
665     *size                         = target_win->size_;
666     *disp_unit                    = target_win->disp_unit_;
667     *static_cast<void**>(baseptr) = target_win->base_;
668   } else {
669     *size                         = 0;
670     *static_cast<void**>(baseptr) = nullptr;
671   }
672   return MPI_SUCCESS;
673 }
674
675 MPI_Errhandler Win::errhandler()
676 {
677   if (errhandler_ != MPI_ERRHANDLER_NULL)
678     errhandler_->ref();
679   return errhandler_;
680 }
681
682 void Win::set_errhandler(MPI_Errhandler errhandler)
683 {
684   if (errhandler_ != MPI_ERRHANDLER_NULL)
685     simgrid::smpi::Errhandler::unref(errhandler_);
686   errhandler_ = errhandler;
687   if (errhandler_ != MPI_ERRHANDLER_NULL)
688     errhandler_->ref();
689 }
690 } // namespace smpi
691 } // namespace simgrid