Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Fix wrong src/dst calculation
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
1 /* Copyright (c) 2007-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_win.hpp"
7 #include "private.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_datatype.hpp"
11 #include "smpi_info.hpp"
12 #include "smpi_keyvals.hpp"
13 #include "smpi_process.hpp"
14 #include "smpi_request.hpp"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
17
18 using simgrid::s4u::Actor;
19
20 namespace simgrid{
21 namespace smpi{
22 std::unordered_map<int, smpi_key_elem> Win::keyvals_;
23 int Win::keyval_id_=0;
24
25 Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, int allocated, int dynamic): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm), allocated_(allocated), dynamic_(dynamic){
26   int comm_size = comm->size();
27   rank_      = comm->rank();
28   XBT_DEBUG("Creating window");
29   if(info!=MPI_INFO_NULL)
30     info->ref();
31   name_ = nullptr;
32   opened_ = 0;
33   group_ = MPI_GROUP_NULL;
34   requests_ = new std::vector<MPI_Request>();
35   mut_=xbt_mutex_init();
36   lock_mut_=xbt_mutex_init();
37   atomic_mut_=xbt_mutex_init();
38   connected_wins_ = new MPI_Win[comm_size];
39   connected_wins_[rank_] = this;
40   count_ = 0;
41   if(rank_==0){
42     bar_ = MSG_barrier_init(comm_size);
43   }
44   mode_=0;
45
46   comm->add_rma_win(this);
47
48   Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
49                          MPI_BYTE, comm);
50
51   Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm);
52
53   Colls::barrier(comm);
54 }
55
56 Win::~Win(){
57   //As per the standard, perform a barrier to ensure every async comm is finished
58   MSG_barrier_wait(bar_);
59
60   int finished = finish_comms();
61   XBT_DEBUG("Win destructor - Finished %d RMA calls", finished);
62
63   delete requests_;
64   delete[] connected_wins_;
65   if (name_ != nullptr){
66     xbt_free(name_);
67   }
68   if(info_!=MPI_INFO_NULL){
69     MPI_Info_free(&info_);
70   }
71
72   comm_->remove_rma_win(this);
73
74   Colls::barrier(comm_);
75   if (rank_ == 0)
76     MSG_barrier_destroy(bar_);
77   xbt_mutex_destroy(mut_);
78   xbt_mutex_destroy(lock_mut_);
79   xbt_mutex_destroy(atomic_mut_);
80
81   if(allocated_ !=0)
82     xbt_free(base_);
83
84   cleanup_attr<Win>();
85 }
86
87 int Win::attach (void *base, MPI_Aint size){
88   if (not(base_ == MPI_BOTTOM || base_ == 0))
89     return MPI_ERR_ARG;
90   base_=0;//actually the address will be given in the RMA calls, as being the disp.
91   size_+=size;
92   return MPI_SUCCESS;
93 }
94
95 int Win::detach (void *base){
96   base_=MPI_BOTTOM;
97   size_=-1;
98   return MPI_SUCCESS;
99 }
100
101 void Win::get_name(char* name, int* length){
102   if(name_==nullptr){
103     *length=0;
104     name=nullptr;
105     return;
106   }
107   *length = strlen(name_);
108   strncpy(name, name_, *length+1);
109 }
110
111 void Win::get_group(MPI_Group* group){
112   if(comm_ != MPI_COMM_NULL){
113     *group = comm_->group();
114   } else {
115     *group = MPI_GROUP_NULL;
116   }
117 }
118
119 MPI_Info Win::info(){
120   if(info_== MPI_INFO_NULL)
121     info_ = new Info();
122   info_->ref();
123   return info_;
124 }
125
126 int Win::rank(){
127   return rank_;
128 }
129
130 MPI_Aint Win::size(){
131   return size_;
132 }
133
134 void* Win::base(){
135   return base_;
136 }
137
138 int Win::disp_unit(){
139   return disp_unit_;
140 }
141
142 int Win::dynamic(){
143   return dynamic_;
144 }
145
146 void Win::set_info(MPI_Info info){
147   if(info_!= MPI_INFO_NULL)
148     info->ref();
149   info_=info;
150 }
151
152 void Win::set_name(char* name){
153   name_ = xbt_strdup(name);
154 }
155
156 int Win::fence(int assert)
157 {
158   XBT_DEBUG("Entering fence");
159   if (opened_ == 0)
160     opened_=1;
161   if (assert != MPI_MODE_NOPRECEDE) {
162     // This is not the first fence => finalize what came before
163     MSG_barrier_wait(bar_);
164     xbt_mutex_acquire(mut_);
165     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
166     // Without this, the vector could get redimensionned when another process pushes.
167     // This would result in the array used by Request::waitall() to be invalidated.
168     // Another solution would be to copy the data and cleanup the vector *before* Request::waitall
169     std::vector<MPI_Request> *reqs = requests_;
170     int size = static_cast<int>(reqs->size());
171     // start all requests that have been prepared by another process
172     if (size > 0) {
173       MPI_Request* treqs = &(*reqs)[0];
174       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
175     }
176     count_=0;
177     xbt_mutex_release(mut_);
178   }
179
180   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
181     opened_=0;
182   assert_ = assert;
183
184   MSG_barrier_wait(bar_);
185   XBT_DEBUG("Leaving fence");
186
187   return MPI_SUCCESS;
188 }
189
190 int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
191               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
192 {
193   //get receiver pointer
194   MPI_Win recv_win = connected_wins_[target_rank];
195
196   if(opened_==0){//check that post/start has been done
197     // no fence or start .. lock ok ?
198     int locked=0;
199     for (auto const& it : recv_win->lockers_)
200       if (it == comm_->rank())
201         locked = 1;
202     if(locked != 1)
203       return MPI_ERR_WIN;
204   }
205
206   if(target_count*target_datatype->get_extent()>recv_win->size_)
207     return MPI_ERR_ARG;
208
209   void* recv_addr = static_cast<void*> ( static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
210   XBT_DEBUG("Entering MPI_Put to %d", target_rank);
211
212   if (target_rank != comm_->rank()) { // This is not for myself, so we need to send messages
213     //prepare send_request
214     MPI_Request sreq =
215         // TODO cheinrich Check for rank / pid conversion
216         Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(),
217                                target_rank, SMPI_RMA_TAG + 1, comm_, MPI_OP_NULL);
218
219     //prepare receiver request
220         // TODO cheinrich Check for rank / pid conversion
221     MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
222                                               target_rank, SMPI_RMA_TAG + 1,
223                                               recv_win->comm_, MPI_OP_NULL);
224
225     //start send
226     sreq->start();
227
228     if(request!=nullptr){
229       *request=sreq;
230     }else{
231       xbt_mutex_acquire(mut_);
232       requests_->push_back(sreq);
233       xbt_mutex_release(mut_);
234     }
235
236     //push request to receiver's win
237     xbt_mutex_acquire(recv_win->mut_);
238     recv_win->requests_->push_back(rreq);
239     rreq->start();
240     xbt_mutex_release(recv_win->mut_);
241
242   }else{
243     Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype);
244     if(request!=nullptr)
245       *request = MPI_REQUEST_NULL;
246   }
247
248   return MPI_SUCCESS;
249 }
250
251 int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
252               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
253 {
254   //get sender pointer
255   MPI_Win send_win = connected_wins_[target_rank];
256
257   if(opened_==0){//check that post/start has been done
258     // no fence or start .. lock ok ?
259     int locked=0;
260     for (auto const& it : send_win->lockers_)
261       if (it == comm_->rank())
262         locked = 1;
263     if(locked != 1)
264       return MPI_ERR_WIN;
265   }
266
267   if(target_count*target_datatype->get_extent()>send_win->size_)
268     return MPI_ERR_ARG;
269
270   void* send_addr = static_cast<void*>(static_cast<char*>(send_win->base_) + target_disp * send_win->disp_unit_);
271   XBT_DEBUG("Entering MPI_Get from %d", target_rank);
272
273   if(target_rank != comm_->rank()){
274     //prepare send_request
275     MPI_Request sreq = Request::rma_send_init(send_addr, target_count, target_datatype,
276                                               target_rank, send_win->comm_->rank(),
277                                               SMPI_RMA_TAG + 2, send_win->comm_, MPI_OP_NULL);
278
279     //prepare receiver request
280     MPI_Request rreq = Request::rma_recv_init(origin_addr, origin_count, origin_datatype,
281                                               target_rank, comm_->rank(), // TODO cheinrich Check here if comm_->rank() and above send_win->comm_->rank() are correct
282                                               SMPI_RMA_TAG + 2, comm_, MPI_OP_NULL);
283
284     //start the send, with another process than us as sender.
285     sreq->start();
286     //push request to receiver's win
287     xbt_mutex_acquire(send_win->mut_);
288     send_win->requests_->push_back(sreq);
289     xbt_mutex_release(send_win->mut_);
290
291     //start recv
292     rreq->start();
293
294     if(request!=nullptr){
295       *request=rreq;
296     }else{
297       xbt_mutex_acquire(mut_);
298       requests_->push_back(rreq);
299       xbt_mutex_release(mut_);
300     }
301
302   }else{
303     Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype);
304     if(request!=nullptr)
305       *request=MPI_REQUEST_NULL;
306   }
307
308   return MPI_SUCCESS;
309 }
310
311
312 int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
313               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
314 {
315   XBT_DEBUG("Entering MPI_Win_Accumulate");
316   //get receiver pointer
317   MPI_Win recv_win = connected_wins_[target_rank];
318
319   if(opened_==0){//check that post/start has been done
320     // no fence or start .. lock ok ?
321     int locked=0;
322     for (auto const& it : recv_win->lockers_)
323       if (it == comm_->rank())
324         locked = 1;
325     if(locked != 1)
326       return MPI_ERR_WIN;
327   }
328   //FIXME: local version
329
330   if(target_count*target_datatype->get_extent()>recv_win->size_)
331     return MPI_ERR_ARG;
332
333   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base_) + target_disp * recv_win->disp_unit_);
334   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
335     //As the tag will be used for ordering of the operations, substract count from it (to avoid collisions with other SMPI tags, SMPI_RMA_TAG is set below all the other ones we use )
336     //prepare send_request
337
338   MPI_Request sreq =
339       Request::rma_send_init(origin_addr, origin_count, origin_datatype, comm_->rank(),
340                              target_rank, SMPI_RMA_TAG - 3 - count_, comm_, op);
341
342   // prepare receiver request
343   MPI_Request rreq = Request::rma_recv_init(recv_addr, target_count, target_datatype, recv_win->comm_->rank(),
344                                             target_rank, SMPI_RMA_TAG - 3 - count_,
345                                             recv_win->comm_, op);
346
347   count_++;
348
349   // start send
350   sreq->start();
351   // push request to receiver's win
352   xbt_mutex_acquire(recv_win->mut_);
353   recv_win->requests_->push_back(rreq);
354   rreq->start();
355   xbt_mutex_release(recv_win->mut_);
356
357   if (request != nullptr) {
358     *request = sreq;
359     }else{
360       xbt_mutex_acquire(mut_);
361       requests_->push_back(sreq);
362       xbt_mutex_release(mut_);
363     }
364
365   XBT_DEBUG("Leaving MPI_Win_Accumulate");
366   return MPI_SUCCESS;
367 }
368
369 int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr,
370               int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count,
371               MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request){
372
373   //get sender pointer
374   MPI_Win send_win = connected_wins_[target_rank];
375
376   if(opened_==0){//check that post/start has been done
377     // no fence or start .. lock ok ?
378     int locked=0;
379     for (auto const& it : send_win->lockers_)
380       if (it == comm_->rank())
381         locked = 1;
382     if(locked != 1)
383       return MPI_ERR_WIN;
384   }
385
386   if(target_count*target_datatype->get_extent()>send_win->size_)
387     return MPI_ERR_ARG;
388
389   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
390   //need to be sure ops are correctly ordered, so finish request here ? slow.
391   MPI_Request req;
392   xbt_mutex_acquire(send_win->atomic_mut_);
393   get(result_addr, result_count, result_datatype, target_rank,
394               target_disp, target_count, target_datatype, &req);
395   if (req != MPI_REQUEST_NULL)
396     Request::wait(&req, MPI_STATUS_IGNORE);
397   if(op!=MPI_NO_OP)
398     accumulate(origin_addr, origin_count, origin_datatype, target_rank,
399               target_disp, target_count, target_datatype, op, &req);
400   if (req != MPI_REQUEST_NULL)
401     Request::wait(&req, MPI_STATUS_IGNORE);
402   xbt_mutex_release(send_win->atomic_mut_);
403   return MPI_SUCCESS;
404
405 }
406
407 int Win::compare_and_swap(void *origin_addr, void *compare_addr,
408         void *result_addr, MPI_Datatype datatype, int target_rank,
409         MPI_Aint target_disp){
410   //get sender pointer
411   MPI_Win send_win = connected_wins_[target_rank];
412
413   if(opened_==0){//check that post/start has been done
414     // no fence or start .. lock ok ?
415     int locked=0;
416     for (auto const& it : send_win->lockers_)
417       if (it == comm_->rank())
418         locked = 1;
419     if(locked != 1)
420       return MPI_ERR_WIN;
421   }
422
423   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
424   MPI_Request req = MPI_REQUEST_NULL;
425   xbt_mutex_acquire(send_win->atomic_mut_);
426   get(result_addr, 1, datatype, target_rank,
427               target_disp, 1, datatype, &req);
428   if (req != MPI_REQUEST_NULL)
429     Request::wait(&req, MPI_STATUS_IGNORE);
430   if (not memcmp(result_addr, compare_addr, datatype->get_extent())) {
431     put(origin_addr, 1, datatype, target_rank,
432               target_disp, 1, datatype);
433   }
434   xbt_mutex_release(send_win->atomic_mut_);
435   return MPI_SUCCESS;
436 }
437
438 int Win::start(MPI_Group group, int assert){
439     /* From MPI forum advices
440     The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
441     will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
442     the target process. This still leaves much choice to implementors. The call to MPI_WIN_START can block until the
443     matching call to MPI_WIN_POST occurs at all target processes. One can also have implementations where the call to
444     MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching call to MPI_WIN_POST occurred; or
445     implementations where the first two calls are nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call
446     to MPI_WIN_POST occurred; or even implementations where all three calls can complete before any target process
447     called MPI_WIN_POST --- the data put must be buffered, in this last case, so as to allow the put to complete at the
448     origin ahead of its completion at the target. However, once the call to MPI_WIN_POST is issued, the sequence above
449     must complete, without further dependencies.  */
450
451   //naive, blocking implementation.
452     int i             = 0;
453     int j             = 0;
454     int size          = group->size();
455     MPI_Request* reqs = xbt_new0(MPI_Request, size);
456
457   XBT_DEBUG("Entering MPI_Win_Start");
458     while (j != size) {
459       int src = comm_->group()->rank(group->actor(j));
460       if (src != rank_ && src != MPI_UNDEFINED) { // TODO cheinrich: The check of MPI_UNDEFINED should be useless here
461         reqs[i] = Request::irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, comm_);
462         i++;
463       }
464       j++;
465     }
466   size=i;
467   Request::startall(size, reqs);
468   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
469   for(i=0;i<size;i++){
470     Request::unref(&reqs[i]);
471   }
472   xbt_free(reqs);
473   opened_++; //we're open for business !
474   group_=group;
475   group->ref();
476   XBT_DEBUG("Leaving MPI_Win_Start");
477   return MPI_SUCCESS;
478 }
479
480 int Win::post(MPI_Group group, int assert){
481   //let's make a synchronous send here
482   int i             = 0;
483   int j             = 0;
484   int size = group->size();
485   MPI_Request* reqs = xbt_new0(MPI_Request, size);
486
487   XBT_DEBUG("Entering MPI_Win_Post");
488   while(j!=size){
489     int dst = comm_->group()->rank(group->actor(j));
490     if (dst != rank_ && dst != MPI_UNDEFINED) {
491       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, comm_);
492       i++;
493     }
494     j++;
495   }
496   size=i;
497
498   Request::startall(size, reqs);
499   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
500   for(i=0;i<size;i++){
501     Request::unref(&reqs[i]);
502   }
503   xbt_free(reqs);
504   opened_++; //we're open for business !
505   group_=group;
506   group->ref();
507   XBT_DEBUG("Leaving MPI_Win_Post");
508   return MPI_SUCCESS;
509 }
510
511 int Win::complete(){
512   if(opened_==0)
513     xbt_die("Complete called on already opened MPI_Win");
514
515   XBT_DEBUG("Entering MPI_Win_Complete");
516   int i             = 0;
517   int j             = 0;
518   int size = group_->size();
519   MPI_Request* reqs = xbt_new0(MPI_Request, size);
520
521   while(j!=size){
522     int dst = comm_->group()->rank(group_->actor(j));
523     if (dst != rank_ && dst != MPI_UNDEFINED) {
524       reqs[i]=Request::send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, comm_);
525       i++;
526     }
527     j++;
528   }
529   size=i;
530   XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size);
531   Request::startall(size, reqs);
532   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
533
534   for(i=0;i<size;i++){
535     Request::unref(&reqs[i]);
536   }
537   xbt_free(reqs);
538
539   int finished = finish_comms();
540   XBT_DEBUG("Win_complete - Finished %d RMA calls", finished);
541
542   Group::unref(group_);
543   opened_--; //we're closed for business !
544   return MPI_SUCCESS;
545 }
546
547 int Win::wait(){
548   //naive, blocking implementation.
549   XBT_DEBUG("Entering MPI_Win_Wait");
550   int i             = 0;
551   int j             = 0;
552   int size          = group_->size();
553   MPI_Request* reqs = xbt_new0(MPI_Request, size);
554
555   while(j!=size){
556     int src = comm_->group()->rank(group_->actor(j));
557     if (src != rank_ && src != MPI_UNDEFINED) {
558       reqs[i]=Request::irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, comm_);
559       i++;
560     }
561     j++;
562   }
563   size=i;
564   XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size);
565   Request::startall(size, reqs);
566   Request::waitall(size, reqs, MPI_STATUSES_IGNORE);
567   for(i=0;i<size;i++){
568     Request::unref(&reqs[i]);
569   }
570   xbt_free(reqs);
571   int finished = finish_comms();
572   XBT_DEBUG("Win_wait - Finished %d RMA calls", finished);
573
574   Group::unref(group_);
575   opened_--; //we're opened for business !
576   return MPI_SUCCESS;
577 }
578
579 int Win::lock(int lock_type, int rank, int assert){
580   MPI_Win target_win = connected_wins_[rank];
581
582   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
583     xbt_mutex_acquire(target_win->lock_mut_);
584     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
585     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
586       xbt_mutex_release(target_win->lock_mut_);
587    }
588   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
589     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
590
591   target_win->lockers_.push_back(comm_->rank());
592
593   int finished = finish_comms(rank);
594   XBT_DEBUG("Win_lock %d - Finished %d RMA calls", rank, finished);
595   finished = target_win->finish_comms(rank_);
596   XBT_DEBUG("Win_lock target %d - Finished %d RMA calls", rank, finished);
597   return MPI_SUCCESS;
598 }
599
600 int Win::lock_all(int assert){
601   int i=0;
602   int retval = MPI_SUCCESS;
603   for (i=0; i<comm_->size();i++){
604       int ret = this->lock(MPI_LOCK_SHARED, i, assert);
605       if(ret != MPI_SUCCESS)
606         retval = ret;
607   }
608   return retval;
609 }
610
611 int Win::unlock(int rank){
612   MPI_Win target_win = connected_wins_[rank];
613   int target_mode = target_win->mode_;
614   target_win->mode_= 0;
615   target_win->lockers_.remove(comm_->rank());
616   if (target_mode==MPI_LOCK_EXCLUSIVE){
617     xbt_mutex_release(target_win->lock_mut_);
618   }
619
620   int finished = finish_comms(rank);
621   XBT_DEBUG("Win_unlock %d - Finished %d RMA calls", rank, finished);
622   finished = target_win->finish_comms(rank_);
623   XBT_DEBUG("Win_unlock target %d - Finished %d RMA calls", rank, finished);
624   return MPI_SUCCESS;
625 }
626
627 int Win::unlock_all(){
628   int i=0;
629   int retval = MPI_SUCCESS;
630   for (i=0; i<comm_->size();i++){
631       int ret = this->unlock(i);
632       if(ret != MPI_SUCCESS)
633         retval = ret;
634   }
635   return retval;
636 }
637
638 int Win::flush(int rank){
639   MPI_Win target_win = connected_wins_[rank];
640   int finished = finish_comms(rank);
641   XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished);
642   finished = target_win->finish_comms(rank_);
643   XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished);
644   return MPI_SUCCESS;
645 }
646
647 int Win::flush_local(int rank){
648   int finished = finish_comms(rank);
649   XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished);
650   return MPI_SUCCESS;
651 }
652
653 int Win::flush_all(){
654   int i=0;
655   int finished = 0;
656   finished = finish_comms();
657   XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished);
658   for (i=0; i<comm_->size();i++){
659     finished = connected_wins_[i]->finish_comms(rank_);
660     XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished);
661   }
662   return MPI_SUCCESS;
663 }
664
665 int Win::flush_local_all(){
666   int finished = finish_comms();
667   XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished);
668   return MPI_SUCCESS;
669 }
670
671 Win* Win::f2c(int id){
672   return static_cast<Win*>(F2C::f2c(id));
673 }
674
675
676 int Win::finish_comms(){
677   xbt_mutex_acquire(mut_);
678   //Finish own requests
679   std::vector<MPI_Request> *reqqs = requests_;
680   int size = static_cast<int>(reqqs->size());
681   if (size > 0) {
682     MPI_Request* treqs = &(*reqqs)[0];
683     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
684     reqqs->clear();
685   }
686   xbt_mutex_release(mut_);
687   return size;
688 }
689
690 int Win::finish_comms(int rank){
691   xbt_mutex_acquire(mut_);
692   //Finish own requests
693   std::vector<MPI_Request> *reqqs = requests_;
694   int size = static_cast<int>(reqqs->size());
695   if (size > 0) {
696     size = 0;
697     std::vector<MPI_Request> myreqqs;
698     std::vector<MPI_Request>::iterator iter = reqqs->begin();
699     while (iter != reqqs->end()){
700       if(((*iter)!=MPI_REQUEST_NULL) && (((*iter)->src() == rank) || ((*iter)->dst() == rank))){
701         myreqqs.push_back(*iter);
702         iter = reqqs->erase(iter);
703         size++;
704       } else {
705         ++iter;
706       }
707     }
708     if(size >0){
709       MPI_Request* treqs = &myreqqs[0];
710       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
711       myreqqs.clear();
712     }
713   }
714   xbt_mutex_release(mut_);
715   return size;
716 }
717
718
719 }
720 }