Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Make Datatype::name_ and Win::name_ a std::string.
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2020. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7
8 #include "private.hpp"
9 #include "smpi_coll.hpp"
10 #include "smpi_comm.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_info.hpp"
13 #include "smpi_keyvals.hpp"
14 #include "smpi_request.hpp"
15 #include "src/smpi/include/smpi_actor.hpp"
16
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
18
19
20 namespace simgrid{
21 namespace smpi{
22 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
23 int Win::keyval_id_=0;
24
25 Win::Win(void* base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic)
26     : base_(base)
27     , size_(size)
28     , disp_unit_(disp_unit)
29     , info_(info)
30     , comm_(comm)
31     , rank_(comm->rank())
32     , allocated_(allocated)
33     , dynamic_(dynamic)
34 {
35   XBT_DEBUG("Creating window");
36   if(info!=MPI_INFO_NULL)
37     info->ref();
38   int comm_size          = comm->size();
39   opened_                = 0;
40   group_                 = MPI_GROUP_NULL;
41   requests_              = new std::vector<MPI_Request>();
42   mut_                   = s4u::Mutex::create();
43   lock_mut_              = s4u::Mutex::create();
44   atomic_mut_            = s4u::Mutex::create();
45   connected_wins_        = new MPI_Win[comm_size];
46   connected_wins_[rank_] = this;
47   count_                 = 0;
48   if(rank_==0){
49     bar_ = new s4u::Barrier(comm_size);
50   }
51   mode_=0;
52   errhandler_=MPI_ERRORS_ARE_FATAL;
53   errhandler_->ref();
54   comm->add_rma_win(this);
55   comm->ref();
56
57   colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win), MPI_BYTE,
58                    comm);
59
60   colls::bcast(&(bar_), sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
61
62   colls::barrier(comm);
63 }
64
65 Win::~Win(){
66   //As per the standard, perform a barrier to ensure every async comm is finished
67   bar_->wait();
68
69   int finished = finish_comms();
70   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
71
72   delete requests_;
73   delete[] connected_wins_;
74   if (info_ != MPI_INFO_NULL)
75     simgrid::smpi::Info::unref(info_);
76   if (errhandler_ != MPI_ERRHANDLER_NULL)
77     simgrid::smpi::Errhandler::unref(errhandler_);
78
79   comm_->remove_rma_win(this);
80
81   colls::barrier(comm_);
82   Comm::unref(comm_);
83   
84   if (rank_ == 0)
85     delete bar_;
86
87   if(allocated_ !=0)
88     xbt_free(base_);
89
90   cleanup_attr<Win>();
91 }
92
93 int Win::attach(void* /*base*/, MPI_Aint size)
94 {
95   if (not(base_ == MPI_BOTTOM || base_ == nullptr))
96     return MPI_ERR_ARG;
97   base_ = nullptr; // actually the address will be given in the RMA calls, as being the disp.
98   size_+=size;
99   return MPI_SUCCESS;
100 }
101
102 int Win::detach(const void* /*base*/)
103 {
104   base_=MPI_BOTTOM;
105   size_=-1;
106   return MPI_SUCCESS;
107 }
108
109 void Win::get_name(char* name, int* length) const
110 {
111   *length = name_.length();
112   if (not name_.empty()) {
113     name_.copy(name, *length);
114     name[*length] = '\0';
115   }
116 }
117
118 void Win::get_group(MPI_Group* group){
119   if(comm_ != MPI_COMM_NULL){
120     *group = comm_->group();
121   } else {
122     *group = MPI_GROUP_NULL;
123   }
124 }
125
126 MPI_Info Win::info()
127 {
128   if (info_ == MPI_INFO_NULL)
129     info_ = new Info();
130   info_->ref();
131   return info_;
132 }
133
134 int Win::rank() const
135 {
136   return rank_;
137 }
138
139 MPI_Aint Win::size() const
140 {
141   return size_;
142 }
143
144 void* Win::base() const
145 {
146   return base_;
147 }
148
149 int Win::disp_unit() const
150 {
151   return disp_unit_;
152 }
153
154 int Win::dynamic() const
155 {
156   return dynamic_;
157 }
158
159 void Win::set_info(MPI_Info info)
160 {
161   if (info_ != MPI_INFO_NULL)
162     simgrid::smpi::Info::unref(info_);
163   info_ = info;
164   if (info_ != MPI_INFO_NULL)
165     info_->ref();
166 }
167
168 void Win::set_name(const char* name){
169   name_ = name;
170 }
171
172 int Win::fence(int assert)
173 {
174   XBT_DEBUG("Entering fence");
175   if (opened_ == 0)
176     opened_=1;
177   if (assert != MPI_MODE_NOPRECEDE) {
178     // This is not the first fence => finalize what came before
179     bar_->wait();
180     mut_->lock();
181     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
182     // Without this, the vector could get redimensioned when another process pushes.
183     // This would result in the array used by Request::waitall() to be invalidated.
184     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
185     std::vector<MPI_Request> *reqs = requests_;
186     int size = static_cast<int>(reqs->size());
187     // start all requests that have been prepared by another process
188     if (size > 0) {
189       MPI_Request* treqs = &(*reqs)[0];
190       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
191     }
192     count_=0;
193     mut_->unlock();
194   }
195
196   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
197     opened_=0;
198   assert_ = assert;
199
200   bar_->wait();
201   XBT_DEBUG("Leaving fence");
202
203   return MPI_SUCCESS;
204 }
205
206 int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
207               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
208 {
209   //get receiver pointer
210   const Win* recv_win = connected_wins_[target_rank];
211
212   if(opened_==0){//check that post/start has been done
213     // no fence or start .. lock ok ?
214     int locked=0;
215     for (auto const& it : recv_win->lockers_)
216       if (it == comm_->rank())
217         locked = 1;
218     if(locked != 1)
219       return MPI_ERR_WIN;
220   }
221
222   if(target_count*target_datatype->get_extent()>recv_win->size_)
223     return MPI_ERR_ARG;
224
225   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
226
227   if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
228     XBT_DEBUG("Entering MPI_Put to remote rank %d", target_rank);
229     // prepare send_request
230     MPI_Request sreq =
231         // TODO cheinrich Check for rank / pid conversion
232         Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank, SMPI_RMA_TAG + 1,
233                                comm_, MPI_OP_NULL);
234
235     //prepare receiver request
236     // TODO cheinrich Check for rank / pid conversion
237     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
238                                               target_rank, SMPI_RMA_TAG + 1, recv_win->comm_, MPI_OP_NULL);
239
240     //start send
241     sreq->start();
242
243     if(request!=nullptr){
244       *request=sreq;
245     }else{
246       mut_->lock();
247       requests_->push_back(sreq);
248       mut_->unlock();
249     }
250
251     //push request to receiver's win
252     recv_win->mut_->lock();
253     recv_win->requests_->push_back(rreq);
254     rreq->start();
255     recv_win->mut_->unlock();
256   } else {
257     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
258     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
259     if(request!=nullptr)
260       *request = MPI_REQUEST_NULL;
261   }
262
263   return MPI_SUCCESS;
264 }
265
266 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
267               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
268 {
269   //get sender pointer
270   const Win* send_win = connected_wins_[target_rank];
271
272   if(opened_==0){//check that post/start has been done
273     // no fence or start .. lock ok ?
274     int locked=0;
275     for (auto const& it : send_win->lockers_)
276       if (it == comm_->rank())
277         locked = 1;
278     if(locked != 1)
279       return MPI_ERR_WIN;
280   }
281
282   if(target_count*target_datatype->get_extent()>send_win->size_)
283     return MPI_ERR_ARG;
284
285   const void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
286   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
287
288   if(target_rank != comm_->rank()){
289     //prepare send_request
290     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype, target_rank,
291                                               send_win->comm_->rank(), SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
292
293     //prepare receiver request
294     MPI_Request rreq = Request::rma_recv_init(
295         origin_addr, origin_count, origin_datatype, target_rank,
296         comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
297         SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
298
299     //start the send, with another process than us as sender.
300     sreq->start();
301     //push request to receiver's win
302     send_win->mut_->lock();
303     send_win->requests_->push_back(sreq);
304     send_win->mut_->unlock();
305
306     //start recv
307     rreq->start();
308
309     if(request!=nullptr){
310       *request=rreq;
311     }else{
312       mut_->lock();
313       requests_->push_back(rreq);
314       mut_->unlock();
315     }
316   } else {
317     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
318     if(request!=nullptr)
319       *request=MPI_REQUEST_NULL;
320   }
321   return MPI_SUCCESS;
322 }
323
324 int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
325               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
326 {
327   XBT_DEBUG("Entering MPI_Win_Accumulate");
328   //get receiver pointer
329   const Win* recv_win = connected_wins_[target_rank];
330
331   if(opened_==0){//check that post/start has been done
332     // no fence or start .. lock ok ?
333     int locked=0;
334     for (auto const& it : recv_win->lockers_)
335       if (it == comm_->rank())
336         locked = 1;
337     if(locked != 1)
338       return MPI_ERR_WIN;
339   }
340   //FIXME: local version
341
342   if(target_count*target_datatype->get_extent()>recv_win->size_)
343     return MPI_ERR_ARG;
344
345   void* recv_addr = static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_;
346   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
347   // As the tag will be used for ordering of the operations, subtract count from it (to avoid collisions with other
348   // SMPI tags, SMPI_RMA_TAG is set below all the other ones we use)
349   // prepare send_request
350
351   MPI_Request sreq = Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(), target_rank,
352                                             SMPI_RMA_TAG - 3 - count_, comm_, op);
353
354   // prepare receiver request
355   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
356                                             recv_win->comm_->group()->rank(comm_->group()->actor(target_rank)), SMPI_RMA_TAG - 3 - count_, recv_win->comm_, op);
357
358   count_++;
359
360   // start send
361   sreq->start();
362   // push request to receiver's win
363   recv_win->mut_->lock();
364   recv_win->requests_->push_back(rreq);
365   rreq->start();
366   recv_win->mut_->unlock();
367
368   if (request != nullptr) {
369     *request = sreq;
370   } else {
371     mut_->lock();
372     requests_->push_back(sreq);
373     mut_->unlock();
374   }
375
376   XBT_DEBUG("Leaving MPI_Win_Accumulate");
377   return MPI_SUCCESS;
378 }
379
380 int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
381                         int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
382                         int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
383 {
384   //get sender pointer
385   const Win* send_win = connected_wins_[target_rank];
386
387   if(opened_==0){//check that post/start has been done
388     // no fence or start .. lock ok ?
389     int locked=0;
390     for (auto const& it : send_win->lockers_)
391       if (it == comm_->rank())
392         locked = 1;
393     if(locked != 1)
394       return MPI_ERR_WIN;
395   }
396
397   if(target_count*target_datatype->get_extent()>send_win->size_)
398     return MPI_ERR_ARG;
399
400   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
401   //need to be sure ops are correctly ordered, so finish request here ? slow.
402   MPI_Request req;
403   send_win->atomic_mut_->lock();
404   get(result_addr, result_count, result_datatype, target_rank,
405               target_disp, target_count, target_datatype, &req);
406   if (req != MPI_REQUEST_NULL)
407     Request::wait(&req, MPI_STATUS_IGNORE);
408   if(op!=MPI_NO_OP)
409     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
410               target_disp, target_count, target_datatype, op, &req);
411   if (req != MPI_REQUEST_NULL)
412     Request::wait(&req, MPI_STATUS_IGNORE);
413   send_win->atomic_mut_->unlock();
414   return MPI_SUCCESS;
415 }
416
417 int Win::compare_and_swap(const void* origin_addr, const void* compare_addr, void* result_addr, MPI_Datatype datatype,
418                           int target_rank, MPI_Aint target_disp)
419 {
420   //get sender pointer
421   const Win* send_win = connected_wins_[target_rank];
422
423   if(opened_==0){//check that post/start has been done
424     // no fence or start .. lock ok ?
425     int locked=0;
426     for (auto const& it : send_win->lockers_)
427       if (it == comm_->rank())
428         locked = 1;
429     if(locked != 1)
430       return MPI_ERR_WIN;
431   }
432
433   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
434   MPI_Request req = MPI_REQUEST_NULL;
435   send_win->atomic_mut_->lock();
436   get(result_addr, 1, datatype, target_rank,
437               target_disp, 1, datatype, &req);
438   if (req != MPI_REQUEST_NULL)
439     Request::wait(&req, MPI_STATUS_IGNORE);
440   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
441     put(origin_addr, 1, datatype, target_rank,
442               target_disp, 1, datatype);
443   }
444   send_win->atomic_mut_->unlock();
445   return MPI_SUCCESS;
446 }
447
448 int Win::start(MPI_Group group, int /*assert*/)
449 {
450   /* From MPI forum advices
451   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
452   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
453   the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
454   matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
455   MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
456   implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
457   to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
458   called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
459   origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
460   must complete, without further dependencies.  */
461
462   //naive, blocking implementation.
463   int i             = 0;
464   int j             = 0;
465   int size          = group->size();
466   std::vector<MPI_Request> reqs(size);
467
468   XBT_DEBUG("Entering MPI_Win_Start");
469   while (j != size) {
470     int src = comm_->group()->rank(group->actor(j));
471     if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
472       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
473       i++;
474     }
475     j++;
476   }
477   size = i;
478   Request::startall(size, reqs.data());
479   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
480   for (i = 0; i < size; i++) {
481     Request::unref(&reqs[i]);
482   }
483   opened_++; //we're open for business !
484   group_=group;
485   group->ref();
486   XBT_DEBUG("Leaving MPI_Win_Start");
487   return MPI_SUCCESS;
488 }
489
490 int Win::post(MPI_Group group, int /*assert*/)
491 {
492   //let's make a synchronous send here
493   int i             = 0;
494   int j             = 0;
495   int size = group->size();
496   std::vector<MPI_Request> reqs(size);
497
498   XBT_DEBUG("Entering MPI_Win_Post");
499   while(j!=size){
500     int dst = comm_->group()->rank(group->actor(j));
501     if (dst != rank_ && dst != MPI_UNDEFINED) {
502       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 4, comm_);
503       i++;
504     }
505     j++;
506   }
507   size=i;
508
509   Request::startall(size, reqs.data());
510   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
511   for(i=0;i<size;i++){
512     Request::unref(&reqs[i]);
513   }
514   opened_++; //we're open for business !
515   group_=group;
516   group->ref();
517   XBT_DEBUG("Leaving MPI_Win_Post");
518   return MPI_SUCCESS;
519 }
520
521 int Win::complete(){
522   if(opened_==0)
523     xbt_die("Complete called on already opened MPI_Win");
524
525   XBT_DEBUG("Entering MPI_Win_Complete");
526   int i             = 0;
527   int j             = 0;
528   int size          = group_->size();
529   std::vector<MPI_Request> reqs(size);
530
531   while(j!=size){
532     int dst = comm_->group()->rank(group_->actor(j));
533     if (dst != rank_ && dst != MPI_UNDEFINED) {
534       reqs[i] = Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG + 5, comm_);
535       i++;
536     }
537     j++;
538   }
539   size=i;
540   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
541   Request::startall(size, reqs.data());
542   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
543
544   for(i=0;i<size;i++){
545     Request::unref(&reqs[i]);
546   }
547
548   int finished = finish_comms();
549   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
550
551   Group::unref(group_);
552   opened_--; //we're closed for business !
553   return MPI_SUCCESS;
554 }
555
556 int Win::wait(){
557   //naive, blocking implementation.
558   XBT_DEBUG("Entering MPI_Win_Wait");
559   int i             = 0;
560   int j             = 0;
561   int size          = group_->size();
562   std::vector<MPI_Request> reqs(size);
563
564   while(j!=size){
565     int src = comm_->group()->rank(group_->actor(j));
566     if (src != rank_ && src != MPI_UNDEFINED) {
567       reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 5, comm_);
568       i++;
569     }
570     j++;
571   }
572   size=i;
573   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
574   Request::startall(size, reqs.data());
575   Request::waitall(size, reqs.data(), MPI_STATUSES_IGNORE);
576   for(i=0;i<size;i++){
577     Request::unref(&reqs[i]);
578   }
579   int finished = finish_comms();
580   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
581
582   Group::unref(group_);
583   opened_--; //we're opened for business !
584   return MPI_SUCCESS;
585 }
586
587 int Win::lock(int lock_type, int rank, int /*assert*/)
588 {
589   MPI_Win target_win = connected_wins_[rank];
590
591   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
592     target_win->lock_mut_->lock();
593     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
594     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
595       target_win->lock_mut_->unlock();
596    }
597   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
598     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
599
600   target_win->lockers_.push_back(comm_->rank());
601
602   int finished = finish_comms(rank);
603   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
604   finished = target_win->finish_comms(rank_);
605   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
606   return MPI_SUCCESS;
607 }
608
609 int Win::lock_all(int assert){
610   int retval = MPI_SUCCESS;
611   for (int i = 0; i < comm_->size(); i++) {
612     int ret = this->lock(MPI_LOCK_SHARED, i, assert);
613     if (ret != MPI_SUCCESS)
614       retval = ret;
615   }
616   return retval;
617 }
618
619 int Win::unlock(int rank){
620   MPI_Win target_win = connected_wins_[rank];
621   int target_mode = target_win->mode_;
622   target_win->mode_= 0;
623   target_win->lockers_.remove(comm_->rank());
624   if (target_mode==MPI_LOCK_EXCLUSIVE){
625     target_win->lock_mut_->unlock();
626   }
627
628   int finished = finish_comms(rank);
629   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
630   finished = target_win->finish_comms(rank_);
631   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
632   return MPI_SUCCESS;
633 }
634
635 int Win::unlock_all(){
636   int retval = MPI_SUCCESS;
637   for (int i = 0; i < comm_->size(); i++) {
638     int ret = this->unlock(i);
639     if (ret != MPI_SUCCESS)
640       retval = ret;
641   }
642   return retval;
643 }
644
645 int Win::flush(int rank){
646   MPI_Win target_win = connected_wins_[rank];
647   int finished       = finish_comms(rank_);
648   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
649   finished = target_win->finish_comms(rank);
650   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
651   return MPI_SUCCESS;
652 }
653
654 int Win::flush_local(int rank){
655   int finished = finish_comms(rank);
656   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
657   return MPI_SUCCESS;
658 }
659
660 int Win::flush_all(){
661   int finished = finish_comms();
662   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
663   for (int i = 0; i < comm_->size(); i++) {
664     finished = connected_wins_[i]->finish_comms(rank_);
665     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
666   }
667   return MPI_SUCCESS;
668 }
669
670 int Win::flush_local_all(){
671   int finished = finish_comms();
672   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
673   return MPI_SUCCESS;
674 }
675
676 Win* Win::f2c(int id){
677   return static_cast<Win*>(F2C::f2c(id));
678 }
679
680 int Win::finish_comms(){
681   mut_->lock();
682   //Finish own requests
683   std::vector<MPI_Request> *reqqs = requests_;
684   int size = static_cast<int>(reqqs->size());
685   if (size > 0) {
686     MPI_Request* treqs = &(*reqqs)[0];
687     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
688     reqqs->clear();
689   }
690   mut_->unlock();
691   return size;
692 }
693
694 int Win::finish_comms(int rank){
695   mut_->lock();
696   //Finish own requests
697   std::vector<MPI_Request> *reqqs = requests_;
698   int size = static_cast<int>(reqqs->size());
699   if (size > 0) {
700     size = 0;
701     std::vector<MPI_Request> myreqqs;
702     auto iter                               = reqqs->begin();
703     int proc_id                             = comm_->group()->actor(rank)->get_pid();
704     while (iter != reqqs->end()){
705       // Let's see if we're either the destination or the sender of this request
706       // because we only wait for requests that we are responsible for.
707       // Also use the process id here since the request itself returns from src()
708       // and dst() the process id, NOT the rank (which only exists in the context of a communicator).
709       if (((*iter) != MPI_REQUEST_NULL) && (((*iter)->src() == proc_id) || ((*iter)->dst() == proc_id))) {
710         myreqqs.push_back(*iter);
711         iter = reqqs->erase(iter);
712         size++;
713       } else {
714         ++iter;
715       }
716     }
717     if(size >0){
718       MPI_Request* treqs = &myreqqs[0];
719       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
720       myreqqs.clear();
721     }
722   }
723   mut_->unlock();
724   return size;
725 }
726
727 int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr) const
728 {
729   const Win* target_win = rank != MPI_PROC_NULL ? connected_wins_[rank] : nullptr;
730   for (int i = 0; not target_win && i < comm_->size(); i++) {
731     if (connected_wins_[i]->size_ > 0)
732       target_win = connected_wins_[i];
733   }
734   if (target_win) {
735     *size                         = target_win->size_;
736     *disp_unit                    = target_win->disp_unit_;
737     *static_cast<void**>(baseptr) = target_win->base_;
738   } else {
739     *size                         = 0;
740     *static_cast<void**>(baseptr) = nullptr;
741   }
742   return MPI_SUCCESS;
743 }
744
745 MPI_Errhandler Win::errhandler()
746 {
747   if (errhandler_ != MPI_ERRHANDLER_NULL)
748     errhandler_->ref();
749   return errhandler_;
750 }
751
752 void Win::set_errhandler(MPI_Errhandler errhandler)
753 {
754   if (errhandler_ != MPI_ERRHANDLER_NULL)
755     simgrid::smpi::Errhandler::unref(errhandler_);
756   errhandler_ = errhandler;
757   if (errhandler_ != MPI_ERRHANDLER_NULL)
758     errhandler_->ref();
759 }
760 } // namespace smpi
761 } // namespace simgrid