Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
SMPI: enforce MPI message ordering. Fix #100 (hopefully)
[simgrid.git] / src / smpi / mpi / smpi_request.cpp
index 2b54eac..07b98cb 100644 (file)
@@ -69,6 +69,7 @@ Request::Request(const void* buf, int count, MPI_Datatype datatype, aid_t src, a
     refcount_ = 1;
   else
     refcount_ = 0;
+  message_id_ = 0;
   init_buffer(count);
   this->add_f();
 }
@@ -146,8 +147,6 @@ bool Request::match_common(MPI_Request req, MPI_Request sender, MPI_Request rece
       if (receiver->real_size_ < sender->real_size_){
         XBT_DEBUG("Truncating message - should not happen: receiver size : %zu < sender size : %zu", receiver->real_size_, sender->real_size_);
         receiver->truncated_ = true;
-      } else if (receiver->real_size_ > sender->real_size_){
-        receiver->real_size_=sender->real_size_;
       }
     }
     //0-sized datatypes/counts should not interfere and match
@@ -186,7 +185,24 @@ bool Request::match_recv(void* a, void* b, simgrid::kernel::activity::CommImpl*)
 {
   auto ref = static_cast<MPI_Request>(a);
   auto req = static_cast<MPI_Request>(b);
-  return match_common(req, req, ref);
+  bool match = match_common(req, req, ref);
+  if (match && (ref->comm_ != MPI_COMM_UNINITIALIZED) && !ref->comm_->is_smp_comm()){
+    if (ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_), ref->comm_->group()->rank(req->dst_), req->tag_) == req->message_id_ ){
+      if (((ref->flags_ & MPI_REQ_PROBE) == 0 ) && ((req->flags_ & MPI_REQ_PROBE) == 0)){
+        XBT_DEBUG("increasing count in comm %p, which was %u from pid %ld, to pid %ld with tag %d", ref->comm_, ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_), ref->comm_->group()->rank(req->dst_), req->tag_), req->src_, req->dst_, req->tag_);
+        ref->comm_->increment_received_messages_count(ref->comm_->group()->rank(req->src_), ref->comm_->group()->rank(req->dst_), req->tag_);
+        if (ref->real_size_ > req->real_size_){
+          ref->real_size_=req->real_size_;
+        }
+      }
+    } else {
+      match = false;
+      req->flags_ &= ~MPI_REQ_MATCHED;
+      ref->detached_sender_=nullptr;
+      XBT_DEBUG("Refusing to match message, as its ID is not the one I expect. in comm %p, %u != %u, from pid %ld to pid %ld, with tag %d",ref->comm_, ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_), ref->comm_->group()->rank(req->dst_), req->tag_), req->message_id_ , req->src_, req->dst_, req->tag_);
+    }
+  }
+  return match;
 }
 
 bool Request::match_send(void* a, void* b, simgrid::kernel::activity::CommImpl*)
@@ -444,6 +460,9 @@ void Request::start()
     if (smpi_cfg_async_small_thresh() != 0 || (flags_ & MPI_REQ_RMA) != 0)
       mut->lock();
 
+    bool is_probe = ((flags_ & MPI_REQ_PROBE) != 0);
+    flags_ |= MPI_REQ_PROBE;
+
     if (smpi_cfg_async_small_thresh() == 0 && (flags_ & MPI_REQ_RMA) == 0) {
       mailbox = process->mailbox();
     } else if (((flags_ & MPI_REQ_RMA) != 0) || static_cast<int>(size_) < smpi_cfg_async_small_thresh()) {
@@ -463,7 +482,7 @@ void Request::start()
           mailbox = process->mailbox_small();
         }
       } else {
-        XBT_DEBUG("yes there was something for us in the large mailbox");
+        XBT_DEBUG("yes there was something for us in the small mailbox");
       }
     } else {
       mailbox = process->mailbox_small();
@@ -477,6 +496,8 @@ void Request::start()
         XBT_DEBUG("yes there was something for us in the small mailbox");
       }
     }
+    if(!is_probe)
+      flags_ &= ~MPI_REQ_PROBE;
 
     action_   = simcall_comm_irecv(
         process->get_actor()->get_impl(), mailbox->get_impl(), buf_, &real_size_, &match_recv,
@@ -492,6 +513,9 @@ void Request::start()
       TRACE_smpi_send(src_, src_, dst_, tag_, size_);
     this->print_request("New send");
 
+    message_id_=comm_->get_sent_messages_count(comm_->group()->rank(src_), comm_->group()->rank(dst_), tag_);
+    comm_->increment_sent_messages_count(comm_->group()->rank(src_), comm_->group()->rank(dst_), tag_);
+
     void* buf = buf_;
     if ((flags_ & MPI_REQ_SSEND) == 0 &&
         ((flags_ & MPI_REQ_RMA) != 0 || (flags_ & MPI_REQ_BSEND) != 0 ||
@@ -541,6 +565,9 @@ void Request::start()
     if (not(smpi_cfg_async_small_thresh() != 0 || (flags_ & MPI_REQ_RMA) != 0)) {
       mailbox = process->mailbox();
     } else if (((flags_ & MPI_REQ_RMA) != 0) || static_cast<int>(size_) < smpi_cfg_async_small_thresh()) { // eager mode
+      bool is_probe = ((flags_ & MPI_REQ_PROBE) != 0);
+      flags_ |= MPI_REQ_PROBE;
+
       mailbox = process->mailbox();
       XBT_DEBUG("Is there a corresponding recv already posted in the large mailbox %s?", mailbox->get_cname());
       simgrid::kernel::activity::ActivityImplPtr action = mailbox->iprobe(1, &match_send, static_cast<void*>(this));
@@ -562,6 +589,8 @@ void Request::start()
       } else {
         XBT_DEBUG("Yes there was something for us in the large mailbox");
       }
+      if(!is_probe)
+        flags_ &= ~MPI_REQ_PROBE;
     } else {
       mailbox = process->mailbox();
       XBT_DEBUG("Send request %p is in the large mailbox %s (buf: %p)", this, mailbox->get_cname(), buf_);