Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Kill trailing whitespaces in source code files.
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 #include <algorithm>
18
19 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
20
21 #define CHECK_RMA_REMOTE_WIN(fun, win)\
22   if(target_count*target_datatype->get_extent()>win->size_){\
23     XBT_WARN("%s: Trying to move %zd, which exceeds the window size on target process %d : %zd - Bailing out.",\
24     fun, target_count*target_datatype->get_extent(), target_rank, win->size_);\
25     simgrid::smpi::utils::set_current_buffer(1,"win_base",win->base_);\
26     return MPI_ERR_RMA_RANGE;\
27   }
28
29 #define CHECK_WIN_LOCKED(win)                                                                                          \
30   if (opened_ == 0) { /*check that post/start has been done*/                                                          \
31     bool locked = std::any_of(begin(win->lockers_), end(win->lockers_), [this](int it) { return it == this->rank_; }); \
32     if (not locked)                                                                                                    \
33       return MPI_ERR_WIN;                                                                                              \
34   }
35
36 namespace simgrid{
37 namespace smpi{
38 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
39 int Win::keyval_id_=0;
40
41 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, bool allocated, bool dynamic)
42     : base_(base)
43     , size_(size)
44     , disp_unit_(disp_unit)
45     , info_(info)
46     , comm_(comm)
47     , connected_wins_(comm->size())
48     , rank_(comm->rank())
49     , allocated_(allocated)
50     , dynamic_(dynamic)
51 {
52   XBT_DEBUG("Creating window");
53   if(info!=MPI_INFO_NULL)
54     info->ref();
55   connected_wins_[rank_] = this;
56   if(rank_==0){
57     bar_ = new s4u::Barrier(comm->size());
58   }
59   errhandler_->ref();
60   comm->add_rma_win(this);
61   comm->ref();
62
63   colls::allgather(&connected_wins_[rank_], sizeof(MPI_Win), MPI_BYTE, connected_wins_.data(), sizeof(MPI_Win),
64                    MPI_BYTE, comm);
65
66   colls::bcast(&bar_, sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
67
68   colls::barrier(comm);
69   this->add_f();
70 }
71
72 Win::~Win(){
73   //As per the standard, perform a barrier to ensure every async comm is finished
74   bar_->wait();
75
76   flush_local_all();
77
78   if (info_ != MPI_INFO_NULL)
79     simgrid::smpi::Info::unref(info_);
80   if (errhandler_ != MPI_ERRHANDLER_NULL)
81     simgrid::smpi::Errhandler::unref(errhandler_);
82
83   comm_->remove_rma_win(this);
84
85   colls::barrier(comm_);
86   Comm::unref(comm_);
87
88   if (rank_ == 0)
89     delete bar_;
90
91   if (allocated_)
92     xbt_free(base_);
93
94   F2C::free_f(this->f2c_id());
95   cleanup_attr<Win>();
96 }
97
98 int Win::attach(void* /*base*/, MPI_Aint size)
99 {
100   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
101     return MPI_ERR_ARG;
102   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
103   size_+=size;
104   return MPI_SUCCESS;
105 }
106
107 int Win::detach(const void* /*base*/)
108 {
109   base_=MPI_BOTTOM;
110   size_=-1;
111   return MPI_SUCCESS;
112 }
113
114 void Win::get_name(char* name, int* length) const
115 {
116   *length = static_cast<int>(name_.length());
117   if (not name_.empty()) {
118     name_.copy(name, *length);
119     name[*length] = '\0';
120   }
121 }
122
123 void Win::get_group(MPI_Group* group){
124   if(comm_ != MPI_COMM_NULL){
125     *group = comm_->group();
126   } else {
127     *group = MPI_GROUP_NULL;
128   }
129 }
130
131 MPI_Info Win::info()
132 {
133   return info_;
134 }
135
136 int Win::rank() const
137 {
138   return rank_;
139 }
140
141 MPI_Comm Win::comm() const
142 {
143   return comm_;
144 }
145
146 MPI_Aint Win::size() const
147 {
148   return size_;
149 }
150
151 void* Win::base() const
152 {
153   return base_;
154 }
155
156 int Win::disp_unit() const
157 {
158   return disp_unit_;
159 }
160
161 bool Win::dynamic() const
162 {
163   return dynamic_;
164 }
165
166 void Win::set_info(MPI_Info info)
167 {
168   if (info_ != MPI_INFO_NULL)
169     simgrid::smpi::Info::unref(info_);
170   info_ = info;
171   if (info_ != MPI_INFO_NULL)
172     info_->ref();
173 }
174
175 void Win::set_name(const char* name){
176   name_ = name;
177 }
178
179 int Win::fence(int assert)
180 {
181   XBT_DEBUG("Entering fence");
182   opened_++;
183   if (not (assert & MPI_MODE_NOPRECEDE)) {
184     // This is not the first fence => finalize what came before
185     bar_->wait();
186     flush_local_all();
187     count_=0;
188   }
189
190   if (assert & MPI_MODE_NOSUCCEED) // there should be no ops after this one, tell we are closed.
191     opened_=0;
192   assert_ = assert;
193
194   bar_->wait();
195   XBT_DEBUG("Leaving fence");
196
197   return MPI_SUCCESS;
198 }
199
200 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
201               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
202 {
203   //get receiver pointer
204   Win* recv_win = connected_wins_[target_rank];
205
206   CHECK_WIN_LOCKED(recv_win)
207   CHECK_RMA_REMOTE_WIN("MPI_Put", recv_win)
208
209   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
210
211   if (target_rank != rank_) { // This is not for myself, so we need to send messages
212     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
213     // prepare send_request
214     MPI_Request sreq =
215         Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank, SMPI_RMA_TAG + 1, comm_,
216                                MPI_OP_NULL);
217
218     //prepare receiver request
219     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
220                                               SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
221
222     //start send
223     sreq->start();
224
225     if(request!=nullptr){
226       *request=sreq;
227     }else{
228       mut_->lock();
229       requests_.push_back(sreq);
230       mut_->unlock();
231     }
232
233     //push request to receiver's win
234     recv_win->mut_->lock();
235     recv_win->requests_.push_back(rreq);
236     rreq->start();
237     recv_win->mut_->unlock();
238   } else {
239     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
240     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
241     if(request!=nullptr)
242       *request = MPI_REQUEST_NULL;
243   }
244
245   return MPI_SUCCESS;
246 }
247
248 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
249               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
250 {
251   //get sender pointer
252   Win* send_win = connected_wins_[target_rank];
253
254   CHECK_WIN_LOCKED(send_win)
255   CHECK_RMA_REMOTE_WIN("MPI_Get", send_win)
256
257   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
258   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
259
260   if (target_rank != rank_) {
261     //prepare send_request
262     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank, rank_,
263                                               SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
264
265     //prepare receiver request
266     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype, target_rank, rank_,
267                                               SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
268
269     //start the send, with another process than us as sender.
270     sreq->start();
271     // push request to sender's win
272     send_win->mut_->lock();
273     send_win->requests_.push_back(sreq);
274     send_win->mut_->unlock();
275
276     //start recv
277     rreq->start();
278
279     if(request!=nullptr){
280       *request=rreq;
281     }else{
282       mut_->lock();
283       requests_.push_back(rreq);
284       mut_->unlock();
285     }
286   } else {
287     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
288     if(request!=nullptr)
289       *request=MPI_REQUEST_NULL;
290   }
291   return MPI_SUCCESS;
292 }
293
294 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
295               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
296 {
297   XBT_DEBUG("Entering MPI_Win_Accumulate");
298   //get receiver pointer
299   Win* recv_win = connected_wins_[target_rank];
300
301   //FIXME: local version
302   CHECK_WIN_LOCKED(recv_win)
303   CHECK_RMA_REMOTE_WIN("MPI_Accumulate", recv_win)
304
305   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
306   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
307   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
308   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
309   // prepare send_request
310
311   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, rank_, target_rank,
312                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
313
314   // prepare receiver request
315   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, rank_, target_rank,
316                                             SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
317
318   count_++;
319
320   // start send
321   sreq->start();
322   // push request to receiver's win
323   recv_win->mut_->lock();
324   recv_win->requests_.push_back(rreq);
325   rreq->start();
326   recv_win->mut_->unlock();
327
328   if (request != nullptr) {
329     *request = sreq;
330   } else {
331     mut_->lock();
332     requests_.push_back(sreq);
333     mut_->unlock();
334   }
335
336   // FIXME: The current implementation fails to ensure the correct ordering of the accumulate requests.  The following
337   // 'flush' is a workaround to fix that.
338   flush(target_rank);
339   XBT_DEBUG("Leaving MPI_Win_Accumulate");
340   return MPI_SUCCESS;
341 }
342
343 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
344                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
345                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
346 {
347   //get sender pointer
348   const Win* send_win = connected_wins_[target_rank];
349
350   CHECK_WIN_LOCKED(send_win)
351   CHECK_RMA_REMOTE_WIN("MPI_Get_Accumulate", send_win)
352
353   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
354   //need to be sure ops are correctly ordered, so finish request here ? slow.
355   MPI_Request req = MPI_REQUEST_NULL;
356   send_win->atomic_mut_->lock();
357   get(result_addr, result_count, result_datatype, target_rank,
358               target_disp, target_count, target_datatype, &req);
359   if (req != MPI_REQUEST_NULL)
360     Request::wait(&req, MPI_STATUS_IGNORE);
361   if(op!=MPI_NO_OP)
362     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
363               target_disp, target_count, target_datatype, op, &req);
364   if (req != MPI_REQUEST_NULL)
365     Request::wait(&req, MPI_STATUS_IGNORE);
366   send_win->atomic_mut_->unlock();
367   return MPI_SUCCESS;
368 }
369
370 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
371                           int target_rank, MPI_Aint target_disp)
372 {
373   //get sender pointer
374   const Win* send_win = connected_wins_[target_rank];
375
376   CHECK_WIN_LOCKED(send_win)
377
378   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
379   MPI_Request req = MPI_REQUEST_NULL;
380   send_win->atomic_mut_->lock();
381   get(result_addr, 1, datatype, target_rank,
382               target_disp, 1, datatype, &req);
383   if (req != MPI_REQUEST_NULL)
384     Request::wait(&req, MPI_STATUS_IGNORE);
385   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
386     put(origin_addr, 1, datatype, target_rank,
387               target_disp, 1, datatype);
388   }
389   send_win->atomic_mut_->unlock();
390   return MPI_SUCCESS;
391 }
392
393 int Win::start(MPI_Group group, int /*assert*/)
394 {
395   /* From MPI forum advices
396   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
397   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
398   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
399   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
400   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
401   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
402   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
403   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
404   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
405   must complete, without further dependencies.  */
406
407   //naive, blocking implementation.
408   XBT_DEBUG("Entering MPI_Win_Start");
409   std::vector<MPI_Request> reqs;
410   for (int i = 0; i < group->size(); i++) {
411     int src = comm_->group()->rank(group->actor(i));
412     xbt_assert(src != MPI_UNDEFINED);
413     if (src != rank_)
414       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_));
415   }
416   int size = static_cast<int>(reqs.size());
417
418   Request::startall(size, reqs.data());
419   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
420   for (auto& req : reqs)
421     Request::unref(&req);
422
423   group->ref();
424   dst_group_ = group;
425   opened_++; // we're open for business !
426   XBT_DEBUG("Leaving MPI_Win_Start");
427   return MPI_SUCCESS;
428 }
429
430 int Win::post(MPI_Group group, int /*assert*/)
431 {
432   //let's make a synchronous send here
433   XBT_DEBUG("Entering MPI_Win_Post");
434   std::vector<MPI_Request> reqs;
435   for (int i = 0; i < group->size(); i++) {
436     int dst = comm_->group()->rank(group->actor(i));
437     xbt_assert(dst != MPI_UNDEFINED);
438     if (dst != rank_)
439       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_));
440   }
441   int size = static_cast<int>(reqs.size());
442
443   Request::startall(size, reqs.data());
444   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
445   for (auto& req : reqs)
446     Request::unref(&req);
447
448   group->ref();
449   src_group_ = group;
450   opened_++; // we're open for business !
451   XBT_DEBUG("Leaving MPI_Win_Post");
452   return MPI_SUCCESS;
453 }
454
455 int Win::complete(){
456   xbt_assert(opened_ != 0, "Complete called on already opened MPI_Win");
457
458   XBT_DEBUG("Entering MPI_Win_Complete");
459   std::vector<MPI_Request> reqs;
460   for (int i = 0; i < dst_group_->size(); i++) {
461     int dst = comm_->group()->rank(dst_group_->actor(i));
462     xbt_assert(dst != MPI_UNDEFINED);
463     if (dst != rank_)
464       reqs.emplace_back(Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_));
465   }
466   int size = static_cast<int>(reqs.size());
467
468   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
469   Request::startall(size, reqs.data());
470   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
471   for (auto& req : reqs)
472     Request::unref(&req);
473
474   flush_local_all();
475
476   opened_--; //we're closed for business !
477   Group::unref(dst_group_);
478   dst_group_ = MPI_GROUP_NULL;
479   return MPI_SUCCESS;
480 }
481
482 int Win::wait(){
483   //naive, blocking implementation.
484   XBT_DEBUG("Entering MPI_Win_Wait");
485   std::vector<MPI_Request> reqs;
486   for (int i = 0; i < src_group_->size(); i++) {
487     int src = comm_->group()->rank(src_group_->actor(i));
488     xbt_assert(src != MPI_UNDEFINED);
489     if (src != rank_)
490       reqs.emplace_back(Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_));
491   }
492   int size = static_cast<int>(reqs.size());
493
494   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
495   Request::startall(size, reqs.data());
496   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
497   for (auto& req : reqs)
498     Request::unref(&req);
499
500   flush_local_all();
501
502   opened_--; //we're closed for business !
503   Group::unref(src_group_);
504   src_group_ = MPI_GROUP_NULL;
505   return MPI_SUCCESS;
506 }
507
508 int Win::lock(int lock_type, int rank, int /*assert*/)
509 {
510   MPI_Win target_win = connected_wins_[rank];
511
512   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
513     target_win->lock_mut_->lock();
514     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
515     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
516       target_win->lock_mut_->unlock();
517    }
518   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
519     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
520
521   target_win->lockers_.push_back(rank_);
522
523   flush(rank);
524   return MPI_SUCCESS;
525 }
526
527 int Win::lock_all(int assert){
528   int retval = MPI_SUCCESS;
529   for (int i = 0; i < comm_->size(); i++) {
530     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
531     if (ret != MPI_SUCCESS)
532       retval = ret;
533   }
534   return retval;
535 }
536
537 int Win::unlock(int rank){
538   MPI_Win target_win = connected_wins_[rank];
539   int target_mode = target_win->mode_;
540   target_win->mode_= 0;
541   target_win->lockers_.remove(rank_);
542   if (target_mode==MPI_LOCK_EXCLUSIVE){
543     target_win->lock_mut_->unlock();
544   }
545
546   flush(rank);
547   return MPI_SUCCESS;
548 }
549
550 int Win::unlock_all(){
551   int retval = MPI_SUCCESS;
552   for (int i = 0; i < comm_->size(); i++) {
553     int ret = this->unlock(i);
554     if (ret != MPI_SUCCESS)
555       retval = ret;
556   }
557   return retval;
558 }
559
560 int Win::flush(int rank){
561   int finished = finish_comms(rank);
562   XBT_DEBUG("Win_flush on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
563   if (rank != rank_) {
564     finished = connected_wins_[rank]->finish_comms(rank_);
565     XBT_DEBUG("Win_flush on remote %d for local %d - Finished %d RMA calls", rank, rank_, finished);
566   }
567   return MPI_SUCCESS;
568 }
569
570 int Win::flush_local(int rank){
571   int finished = finish_comms(rank);
572   XBT_DEBUG("Win_flush_local on local %d for remote %d - Finished %d RMA calls", rank_, rank, finished);
573   return MPI_SUCCESS;
574 }
575
576 int Win::flush_all(){
577   int finished = finish_comms();
578   XBT_DEBUG("Win_flush_all on local %d - Finished %d RMA calls", rank_, finished);
579   for (int i = 0; i < comm_->size(); i++) {
580     if (i != rank_) {
581       finished = connected_wins_[i]->finish_comms(rank_);
582       XBT_DEBUG("Win_flush_all on remote %d for local %d - Finished %d RMA calls", i, rank_, finished);
583     }
584   }
585   return MPI_SUCCESS;
586 }
587
588 int Win::flush_local_all(){
589   int finished = finish_comms();
590   XBT_DEBUG("Win_flush_local_all on local %d - Finished %d RMA calls", rank_, finished);
591   return MPI_SUCCESS;
592 }
593
594 Win* Win::f2c(int id){
595   return static_cast<Win*>(F2C::f2c(id));
596 }
597
598 int Win::finish_comms(){
599   // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
600   // Without this, the vector could get redimensioned when another process pushes.
601   // This would result in the array used by Request::waitall() to be invalidated.
602   // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
603   mut_->lock();
604   //Finish own requests
605   int size = static_cast<int>(requests_.size());
606   if (size > 0) {
607     MPI_Request* treqs = requests_.data();
608     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
609     requests_.clear();
610   }
611   mut_->unlock();
612   return size;
613 }
614
615 int Win::finish_comms(int rank){
616   // See comment about the mutex in finish_comms() above
617   mut_->lock();
618   // Finish own requests
619   // Let's see if we're either the destination or the sender of this request
620   // because we only wait for requests that we are responsible for.
621   // Also use the process id here since the request itself returns from src()
622   // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
623   aid_t proc_id = comm_->group()->actor(rank);
624   auto it     = std::stable_partition(begin(requests_), end(requests_), [proc_id](const MPI_Request& req) {
625     return (req == MPI_REQUEST_NULL || (req->src() != proc_id && req->dst() != proc_id));
626   });
627   std::vector<MPI_Request> myreqqs(it, end(requests_));
628   requests_.erase(it, end(requests_));
629   int size = static_cast<int>(myreqqs.size());
630   if (size > 0) {
631     MPI_Request* treqs = myreqqs.data();
632     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
633     myreqqs.clear();
634   }
635   mut_->unlock();
636   return size;
637 }
638
639 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
640 {
641   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
642   for (int i = 0; not target_win && i < comm_->size(); i++) {
643     if (connected_wins_[i]->size_ > 0)
644       target_win = connected_wins_[i];
645   }
646   if (target_win) {
647     *size                         = target_win->size_;
648     *disp_unit                    = target_win->disp_unit_;
649     *static_cast<void**>(baseptr) = target_win->base_;
650   } else {
651     *size                         = 0;
652     *static_cast<void**>(baseptr) = nullptr;
653   }
654   return MPI_SUCCESS;
655 }
656
657 MPI_Errhandler Win::errhandler()
658 {
659   if (errhandler_ != MPI_ERRHANDLER_NULL)
660     errhandler_->ref();
661   return errhandler_;
662 }
663
664 void Win::set_errhandler(MPI_Errhandler errhandler)
665 {
666   if (errhandler_ != MPI_ERRHANDLER_NULL)
667     simgrid::smpi::Errhandler::unref(errhandler_);
668   errhandler_ = errhandler;
669   if (errhandler_ != MPI_ERRHANDLER_NULL)
670     errhandler_->ref();
671 }
672 } // namespace smpi
673 } // namespace simgrid