Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Use existing function (also empties requests_ after waitall).
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 #include <algorithm>
18
19 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
20
21 #define CHECK_RMA_REMOTE_WIN(fun, win)\
22   if(target_count*target_datatype->get_extent()>win->size_){\
23     XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
24     fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
25     simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
26     return MPI_ERR_RMA_RANGE;\
27   }
28
29 #define CHECK_WIN_LOCKED(win)                                                                                          \
30   if (opened_ == 0) { /*check that post/start has been done*/                                                          \
31     bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; }); \
32     if (not locked)                                                                                                    \
33       return MPI_ERR_WIN;                                                                                              \
34   }
35
36 namespace simgrid{
37 namespace smpi{
38 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
39 int Win::keyval_id_=0;
40
41 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
42     : base_(base)
43     , size_(size)
44     , disp_unit_(disp_unit)
45     , info_(info)
46     , comm_(comm)
47     , connected_wins_(comm->size())
48     , rank_(comm->rank())
49     , allocated_(allocated)
50     , dynamic_(dynamic)
51 {
52   XBT_DEBUG("Creating window");
53   if(info!=MPI_INFO_NULL)
54     info->ref();
55   connected_wins_[rank_] = this;
56   if(rank_==0){
57     bar_ = new s4u::Barrier(comm->size());
58   }
59   errhandler_->ref();
60   comm->add_rma_win(this);
61   comm->ref();
62
63   colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
64                    MPI_BYTE, comm);
65
66   colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
67
68   colls::barrier(comm);
69   this->add_f();
70 }
71
72 Win::~Win(){
73   //As per the standard, perform a barrier to ensure every async comm is finished
74   bar_->wait();
75
76   flush_local_all();
77
78   if (info_ != MPI_INFO_NULL)
79     simgrid::smpi::Info::unref(info_);
80   if (errhandler_ != MPI_ERRHANDLER_NULL)
81     simgrid::smpi::Errhandler::unref(errhandler_);
82
83   comm_->remove_rma_win(this);
84
85   colls::barrier(comm_);
86   Comm::unref(comm_);
87   
88   if (rank_ == 0)
89     delete bar_;
90
91   if (allocated_)
92     xbt_free(base_);
93
94   F2C::free_f(this->f2c_id());
95   cleanup_attr<Win>();
96 }
97
98 int Win::attach(void* /*base*/, MPI_Aint size)
99 {
100   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
101     return MPI_ERR_ARG;
102   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
103   size_+=size;
104   return MPI_SUCCESS;
105 }
106
107 int Win::detach(const void* /*base*/)
108 {
109   base_=MPI_BOTTOM;
110   size_=-1;
111   return MPI_SUCCESS;
112 }
113
114 void Win::get_name(char* name, int* length) const
115 {
116   *length = static_cast<int>(name_.length());
117   if (not name_.empty()) {
118     name_.copy(name, *length);
119     name[*length] = '\0';
120   }
121 }
122
123 void Win::get_group(MPI_Group* group){
124   if(comm_ != MPI_COMM_NULL){
125     *group = comm_->group();
126   } else {
127     *group = MPI_GROUP_NULL;
128   }
129 }
130
131 MPI_Info Win::info()
132 {
133   if (info_ == MPI_INFO_NULL)
134     info_ = new Info();
135   info_->ref();
136   return info_;
137 }
138
139 int Win::rank() const
140 {
141   return rank_;
142 }
143
144 MPI_Comm Win::comm() const
145 {
146   return comm_;
147 }
148
149 MPI_Aint Win::size() const
150 {
151   return size_;
152 }
153
154 void* Win::base() const
155 {
156   return base_;
157 }
158
159 int Win::disp_unit() const
160 {
161   return disp_unit_;
162 }
163
164 bool Win::dynamic() const
165 {
166   return dynamic_;
167 }
168
169 void Win::set_info(MPI_Info info)
170 {
171   if (info_ != MPI_INFO_NULL)
172     simgrid::smpi::Info::unref(info_);
173   info_ = info;
174   if (info_ != MPI_INFO_NULL)
175     info_->ref();
176 }
177
178 void Win::set_name(const char* name){
179   name_ = name;
180 }
181
182 int Win::fence(int assert)
183 {
184   XBT_DEBUG("Entering fence");
185   if (opened_ == 0)
186     opened_=1;
187   if (not (assert & MPI_MODE_NOPRECEDE)) {
188     // This is not the first fence => finalize what came before
189     bar_->wait();
190     flush_local_all();
191     count_=0;
192   }
193
194   if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
195     opened_=0;
196   assert_ = assert;
197
198   bar_->wait();
199   XBT_DEBUG("Leaving fence");
200
201   return MPI_SUCCESS;
202 }
203
204 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
205               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
206 {
207   //get receiver pointer
208   Win* recv_win = connected_wins_[target_rank];
209
210   CHECK_WIN_LOCKED(recv_win)
211   CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)
212
213   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
214
215   if (target_rank != rank_) { // This is not for myself, so we need to send messages
216     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
217     // prepare send_request
218     MPI_Request sreq =
219         Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
220                                MPI_OP_NULL);
221
222     //prepare receiver request
223     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
224                                               SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
225
226     //start send
227     sreq->start();
228
229     if(request!=nullptr){
230       *request=sreq;
231     }else{
232       mut_->lock();
233       requests_.push_back(sreq);
234       mut_->unlock();
235     }
236
237     //push request to receiver's win
238     recv_win->mut_->lock();
239     recv_win->requests_.push_back(rreq);
240     rreq->start();
241     recv_win->mut_->unlock();
242   } else {
243     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
244     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
245     if(request!=nullptr)
246       *request = MPI_REQUEST_NULL;
247   }
248
249   return MPI_SUCCESS;
250 }
251
252 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
253               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
254 {
255   //get sender pointer
256   Win* send_win = connected_wins_[target_rank];
257
258   CHECK_WIN_LOCKED(send_win)
259   CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)
260
261   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
262   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
263
264   if (target_rank != rank_) {
265     //prepare send_request
266     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
267                                               SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
268
269     //prepare receiver request
270     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
271                                               SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
272
273     //start the send, with another process than us as sender.
274     sreq->start();
275     // push request to sender's win
276     send_win->mut_->lock();
277     send_win->requests_.push_back(sreq);
278     send_win->mut_->unlock();
279
280     //start recv
281     rreq->start();
282
283     if(request!=nullptr){
284       *request=rreq;
285     }else{
286       mut_->lock();
287       requests_.push_back(rreq);
288       mut_->unlock();
289     }
290   } else {
291     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
292     if(request!=nullptr)
293       *request=MPI_REQUEST_NULL;
294   }
295   return MPI_SUCCESS;
296 }
297
298 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
299               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
300 {
301   XBT_DEBUG("Entering MPI_Win_Accumulate");
302   //get receiver pointer
303   Win* recv_win = connected_wins_[target_rank];
304
305   //FIXME: local version
306   CHECK_WIN_LOCKED(recv_win)
307   CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)
308
309   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
310   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
311   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
312   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
313   // prepare send_request
314
315   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
316                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
317
318   // prepare receiver request
319   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
320                                             SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
321
322   count_++;
323
324   // start send
325   sreq->start();
326   // push request to receiver's win
327   recv_win->mut_->lock();
328   recv_win->requests_.push_back(rreq);
329   rreq->start();
330   recv_win->mut_->unlock();
331
332   if (request != nullptr) {
333     *request = sreq;
334   } else {
335     mut_->lock();
336     requests_.push_back(sreq);
337     mut_->unlock();
338   }
339
340   XBT_DEBUG("Leaving MPI_Win_Accumulate");
341   return MPI_SUCCESS;
342 }
343
344 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
345                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
346                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
347 {
348   //get sender pointer
349   const Win* send_win = connected_wins_[target_rank];
350
351   CHECK_WIN_LOCKED(send_win)
352   CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)
353
354   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
355   //need to be sure ops are correctly ordered, so finish request here ? slow.
356   MPI_Request req = MPI_REQUEST_NULL;
357   send_win->atomic_mut_->lock();
358   get(result_addr, result_count, result_datatype, target_rank,
359               target_disp, target_count, target_datatype, &req);
360   if (req != MPI_REQUEST_NULL)
361     Request::wait(&req, MPI_STATUS_IGNORE);
362   if(op!=MPI_NO_OP)
363     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
364               target_disp, target_count, target_datatype, op, &req);
365   if (req != MPI_REQUEST_NULL)
366     Request::wait(&req, MPI_STATUS_IGNORE);
367   send_win->atomic_mut_->unlock();
368   return MPI_SUCCESS;
369 }
370
371 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
372                           int target_rank, MPI_Aint target_disp)
373 {
374   //get sender pointer
375   const Win* send_win = connected_wins_[target_rank];
376
377   CHECK_WIN_LOCKED(send_win)
378
379   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
380   MPI_Request req = MPI_REQUEST_NULL;
381   send_win->atomic_mut_->lock();
382   get(result_addr, 1, datatype, target_rank,
383               target_disp, 1, datatype, &req);
384   if (req != MPI_REQUEST_NULL)
385     Request::wait(&req, MPI_STATUS_IGNORE);
386   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
387     put(origin_addr, 1, datatype, target_rank,
388               target_disp, 1, datatype);
389   }
390   send_win->atomic_mut_->unlock();
391   return MPI_SUCCESS;
392 }
393
394 int Win::start(MPI_Group group, int /*assert*/)
395 {
396   /* From MPI forum advices
397   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
398   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
399   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
400   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
401   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
402   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
403   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
404   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
405   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
406   must complete, without further dependencies.  */
407
408   //naive, blocking implementation.
409   XBT_DEBUG("Entering MPI_Win_Start");
410   std::vector<MPI_Request> reqs;
411   for (int i = 0; i < group->size(); i++) {
412     int src = comm_->group()->rank(group->actor(i));
413     xbt_assert(src != MPI_UNDEFINED);
414     if (src != rank_)
415       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
416   }
417   int size = static_cast<int>(reqs.size());
418
419   Request::startall(size, reqs.data());
420   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
421   for (auto& req : reqs)
422     Request::unref(&req);
423
424   group->ref();
425   dst_group_ = group;
426   opened_++; // we're open for business !
427   XBT_DEBUG("Leaving MPI_Win_Start");
428   return MPI_SUCCESS;
429 }
430
431 int Win::post(MPI_Group group, int /*assert*/)
432 {
433   //let's make a synchronous send here
434   XBT_DEBUG("Entering MPI_Win_Post");
435   std::vector<MPI_Request> reqs;
436   for (int i = 0; i < group->size(); i++) {
437     int dst = comm_->group()->rank(group->actor(i));
438     xbt_assert(dst != MPI_UNDEFINED);
439     if (dst != rank_)
440       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
441   }
442   int size = static_cast<int>(reqs.size());
443
444   Request::startall(size, reqs.data());
445   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
446   for (auto& req : reqs)
447     Request::unref(&req);
448
449   group->ref();
450   src_group_ = group;
451   opened_++; // we're open for business !
452   XBT_DEBUG("Leaving MPI_Win_Post");
453   return MPI_SUCCESS;
454 }
455
456 int Win::complete(){
457   xbt_assert(opened_ != 0, "Complete called on already opened MPI_Win");
458
459   XBT_DEBUG("Entering MPI_Win_Complete");
460   std::vector<MPI_Request> reqs;
461   for (int i = 0; i < dst_group_->size(); i++) {
462     int dst = comm_->group()->rank(dst_group_->actor(i));
463     xbt_assert(dst != MPI_UNDEFINED);
464     if (dst != rank_)
465       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
466   }
467   int size = static_cast<int>(reqs.size());
468
469   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
470   Request::startall(size, reqs.data());
471   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
472   for (auto& req : reqs)
473     Request::unref(&req);
474
475   flush_local_all();
476
477   opened_--; //we're closed for business !
478   Group::unref(dst_group_);
479   dst_group_ = MPI_GROUP_NULL;
480   return MPI_SUCCESS;
481 }
482
483 int Win::wait(){
484   //naive, blocking implementation.
485   XBT_DEBUG("Entering MPI_Win_Wait");
486   std::vector<MPI_Request> reqs;
487   for (int i = 0; i < src_group_->size(); i++) {
488     int src = comm_->group()->rank(src_group_->actor(i));
489     xbt_assert(src != MPI_UNDEFINED);
490     if (src != rank_)
491       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
492   }
493   int size = static_cast<int>(reqs.size());
494
495   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
496   Request::startall(size, reqs.data());
497   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
498   for (auto& req : reqs)
499     Request::unref(&req);
500
501   flush_local_all();
502
503   opened_--; //we're closed for business !
504   Group::unref(src_group_);
505   src_group_ = MPI_GROUP_NULL;
506   return MPI_SUCCESS;
507 }
508
509 int Win::lock(int lock_type, int rank, int /*assert*/)
510 {
511   MPI_Win target_win = connected_wins_[rank];
512
513   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
514     target_win->lock_mut_->lock();
515     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
516     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
517       target_win->lock_mut_->unlock();
518    }
519   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
520     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
521
522   target_win->lockers_.push_back(rank_);
523
524   flush(rank);
525   return MPI_SUCCESS;
526 }
527
528 int Win::lock_all(int assert){
529   int retval = MPI_SUCCESS;
530   for (int i = 0; i < comm_->size(); i++) {
531     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
532     if (ret != MPI_SUCCESS)
533       retval = ret;
534   }
535   return retval;
536 }
537
538 int Win::unlock(int rank){
539   MPI_Win target_win = connected_wins_[rank];
540   int target_mode = target_win->mode_;
541   target_win->mode_= 0;
542   target_win->lockers_.remove(rank_);
543   if (target_mode==MPI_LOCK_EXCLUSIVE){
544     target_win->lock_mut_->unlock();
545   }
546
547   flush(rank);
548   return MPI_SUCCESS;
549 }
550
551 int Win::unlock_all(){
552   int retval = MPI_SUCCESS;
553   for (int i = 0; i < comm_->size(); i++) {
554     int ret = this->unlock(i);
555     if (ret != MPI_SUCCESS)
556       retval = ret;
557   }
558   return retval;
559 }
560
561 int Win::flush(int rank){
562   int finished = finish_comms(rank);
563   XBT_DEBUG("Win_flush on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
564   if (rank != rank_) {
565     finished = connected_wins_[rank]->finish_comms(rank_);
566     XBT_DEBUG("Win_flush on remote %d for local %d - Finished %d RMA calls", rank, rank_, finished);
567   }
568   return MPI_SUCCESS;
569 }
570
571 int Win::flush_local(int rank){
572   int finished = finish_comms(rank);
573   XBT_DEBUG("Win_flush_local on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
574   return MPI_SUCCESS;
575 }
576
577 int Win::flush_all(){
578   int finished = finish_comms();
579   XBT_DEBUG("Win_flush_all on local %d - Finished %d RMA calls", rank_, finished);
580   for (int i = 0; i < comm_->size(); i++) {
581     if (i != rank_) {
582       finished = connected_wins_[i]->finish_comms(rank_);
583       XBT_DEBUG("Win_flush_all on remote %d for local %d - Finished %d RMA calls", i, rank_, finished);
584     }
585   }
586   return MPI_SUCCESS;
587 }
588
589 int Win::flush_local_all(){
590   int finished = finish_comms();
591   XBT_DEBUG("Win_flush_local_all on local %d - Finished %d RMA calls", rank_, finished);
592   return MPI_SUCCESS;
593 }
594
595 Win* Win::f2c(int id){
596   return static_cast<Win*>(F2C::f2c(id));
597 }
598
599 int Win::finish_comms(){
600   // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
601   // Without this, the vector could get redimensioned when another process pushes.
602   // This would result in the array used by Request::waitall() to be invalidated.
603   // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
604   mut_->lock();
605   //Finish own requests
606   int size = static_cast<int>(requests_.size());
607   if (size > 0) {
608     MPI_Request* treqs = requests_.data();
609     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
610     requests_.clear();
611   }
612   mut_->unlock();
613   return size;
614 }
615
616 int Win::finish_comms(int rank){
617   // See comment about the mutex in finish_comms() above
618   mut_->lock();
619   // Finish own requests
620   // Let's see if we're either the destination or the sender of this request
621   // because we only wait for requests that we are responsible for.
622   // Also use the process id here since the request itself returns from src()
623   // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
624   aid_t proc_id = comm_->group()->actor(rank);
625   auto it     = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
626     return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
627   });
628   std::vector<MPI_Request> myreqqs(it, end(requests_));
629   requests_.erase(it, end(requests_));
630   int size = static_cast<int>(myreqqs.size());
631   if (size > 0) {
632     MPI_Request* treqs = myreqqs.data();
633     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
634     myreqqs.clear();
635   }
636   mut_->unlock();
637   return size;
638 }
639
640 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
641 {
642   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
643   for (int i = 0; not target_win && i < comm_->size(); i++) {
644     if (connected_wins_[i]->size_ > 0)
645       target_win = connected_wins_[i];
646   }
647   if (target_win) {
648     *size                         = target_win->size_;
649     *disp_unit                    = target_win->disp_unit_;
650     *static_cast<void**>(baseptr) = target_win->base_;
651   } else {
652     *size                         = 0;
653     *static_cast<void**>(baseptr) = nullptr;
654   }
655   return MPI_SUCCESS;
656 }
657
658 MPI_Errhandler Win::errhandler()
659 {
660   if (errhandler_ != MPI_ERRHANDLER_NULL)
661     errhandler_->ref();
662   return errhandler_;
663 }
664
665 void Win::set_errhandler(MPI_Errhandler errhandler)
666 {
667   if (errhandler_ != MPI_ERRHANDLER_NULL)
668     simgrid::smpi::Errhandler::unref(errhandler_);
669   errhandler_ = errhandler;
670   if (errhandler_ != MPI_ERRHANDLER_NULL)
671     errhandler_->ref();
672 }
673 } // namespace smpi
674 } // namespace simgrid