Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Crash and burn when a message is truncated.
[simgrid.git] / src / smpi / mpi / smpi_request.cpp
index df1f75b..93098dc 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (c) 2007-2020. The SimGrid Team. All rights reserved.          */
+/* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved.          */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
@@ -118,8 +118,9 @@ bool Request::match_common(MPI_Request req, MPI_Request sender, MPI_Request rece
       receiver->real_src_ = sender->src_;
     if (receiver->tag_ == MPI_ANY_TAG)
       receiver->real_tag_ = sender->tag_;
-    if (receiver->real_size_ < sender->real_size_)
+    if (receiver->real_size_ < sender->real_size_){
       receiver->truncated_ = true;
+    }
     if (sender->detached_)
       receiver->detached_sender_ = sender; // tie the sender to the receiver, as it is detached and has to be freed in
                                            // the receiver
@@ -375,6 +376,8 @@ void Request::start()
   flags_ &= ~MPI_REQ_FINISHED;
   this->ref();
 
+  // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
+  real_size_=size_;
   if ((flags_ & MPI_REQ_RECV) != 0) {
     this->print_request("New recv");
 
@@ -418,8 +421,6 @@ void Request::start()
       }
     }
 
-    // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
-    real_size_=size_;
     action_   = simcall_comm_irecv(
         process->get_actor()->get_impl(), mailbox->get_impl(), buf_, &real_size_, &match_recv,
         process->replaying() ? &smpi_comm_null_copy_buffer_callback : smpi_comm_copy_data_callback, this, -1.0);
@@ -510,8 +511,6 @@ void Request::start()
       XBT_DEBUG("Send request %p is in the large mailbox %s (buf: %p)", this, mailbox->get_cname(), buf_);
     }
 
-    // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
-    real_size_=size_;
     size_t payload_size_ = size_ + 16;//MPI enveloppe size (tag+dest+communicator)
     action_   = simcall_comm_isend(
         simgrid::s4u::Actor::by_pid(src_)->get_impl(), mailbox->get_impl(), payload_size_, -1.0, buf, real_size_, &match_send,
@@ -551,11 +550,13 @@ void Request::cancel()
 }
 
 int Request::test(MPI_Request * request, MPI_Status * status, int* flag) {
-  //assume that request is not MPI_REQUEST_NULL (filtered in PMPI_Test or testall before)
+  // assume that *request is not MPI_REQUEST_NULL (filtered in PMPI_Test or testall before)
   // to avoid deadlocks if used as a break condition, such as
   //     while (MPI_Test(request, flag, status) && flag) dostuff...
   // because the time will not normally advance when only calls to MPI_Test are made -> deadlock
   // multiplier to the sleeptime, to increase speed of execution, each failed test will increase it
+  xbt_assert(*request != MPI_REQUEST_NULL);
+
   static int nsleeps = 1;
   int ret = MPI_SUCCESS;
   
@@ -585,12 +586,10 @@ int Request::test(MPI_Request * request, MPI_Status * status, int* flag) {
         return ret;
       }
     }
-    if (*request != MPI_REQUEST_NULL && 
-        ((*request)->flags_ & MPI_REQ_GENERALIZED)
-        && !((*request)->flags_ & MPI_REQ_COMPLETE)) 
+    if (((*request)->flags_ & MPI_REQ_GENERALIZED) && !((*request)->flags_ & MPI_REQ_COMPLETE))
       *flag=0;
     if (*flag) {
-      finish_wait(request,status);
+      finish_wait(request, status); // may invalidate *request
       if (*request != MPI_REQUEST_NULL && ((*request)->flags_ & MPI_REQ_GENERALIZED)){
         MPI_Status tmp_status;
         MPI_Status* mystatus;
@@ -892,11 +891,31 @@ void Request::finish_wait(MPI_Request* request, MPI_Status * status)
   if (req->flags_ & MPI_REQ_PERSISTENT)
     req->action_ = nullptr;
   req->flags_ |= MPI_REQ_FINISHED;
+
+  if (req->truncated_) {
+    char error_string[MPI_MAX_ERROR_STRING];
+    int error_size;
+    PMPI_Error_string(MPI_ERR_TRUNCATE, error_string, &error_size);
+    MPI_Errhandler err = (req->comm_) ? (req->comm_)->errhandler() : MPI_ERRHANDLER_NULL;
+    if (err == MPI_ERRHANDLER_NULL || err == MPI_ERRORS_RETURN)
+      XBT_WARN("recv - returned %.*s instead of MPI_SUCCESS", error_size, error_string);
+    else if (err == MPI_ERRORS_ARE_FATAL)
+      xbt_die("recv - returned %.*s instead of MPI_SUCCESS", error_size, error_string);
+    else
+      err->call((req->comm_), MPI_ERR_TRUNCATE);
+    if (err != MPI_ERRHANDLER_NULL)
+      simgrid::smpi::Errhandler::unref(err);
+    MC_assert(not MC_is_active()); /* Only fail in MC mode */
+  }
   unref(request);
+
 }
 
 int Request::wait(MPI_Request * request, MPI_Status * status)
 {
+  // assume that *request is not MPI_REQUEST_NULL (filtered in PMPI_Wait before)
+  xbt_assert(*request != MPI_REQUEST_NULL);
+
   int ret=MPI_SUCCESS;
   // Are we waiting on a request meant for non blocking collectives ?
   // If so, wait for all the subrequests.
@@ -940,7 +959,7 @@ int Request::wait(MPI_Request * request, MPI_Status * status)
       }
   }
 
-  if (*request != MPI_REQUEST_NULL && ((*request)->flags_ & MPI_REQ_GENERALIZED)){
+  if ((*request)->flags_ & MPI_REQ_GENERALIZED) {
     if(!((*request)->flags_ & MPI_REQ_COMPLETE)){
       ((*request)->generalized_funcs)->mutex->lock();
       ((*request)->generalized_funcs)->cond->wait(((*request)->generalized_funcs)->mutex);
@@ -957,7 +976,7 @@ int Request::wait(MPI_Request * request, MPI_Status * status)
     ret = ((*request)->generalized_funcs)->query_fn(((*request)->generalized_funcs)->extra_state, mystatus);
   }
 
-  finish_wait(request,status);
+  finish_wait(request, status); // may invalidate *request
   if (*request != MPI_REQUEST_NULL && (((*request)->flags_ & MPI_REQ_NON_PERSISTENT) != 0))
     *request = MPI_REQUEST_NULL;
   return ret;
@@ -965,12 +984,11 @@ int Request::wait(MPI_Request * request, MPI_Status * status)
 
 int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
 {
-  std::vector<simgrid::kernel::activity::CommImpl*> comms;
-  comms.reserve(count);
   int index = MPI_UNDEFINED;
 
   if(count > 0) {
     // Wait for a request to complete
+    std::vector<simgrid::kernel::activity::CommImpl*> comms;
     std::vector<int> map;
     XBT_DEBUG("Wait for one of %d", count);
     for(int i = 0; i < count; i++) {
@@ -982,7 +1000,7 @@ int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
           map.push_back(i);
         } else {
           // This is a finished detached request, let's return this one
-          comms.clear(); // so we free don't do the waitany call
+          comms.clear(); // don't do the waitany call afterwards
           index = i;
           finish_wait(&requests[i], status); // cleanup if refcount = 0
           if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags_ & MPI_REQ_NON_PERSISTENT))
@@ -993,13 +1011,12 @@ int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
     }
     if (not comms.empty()) {
       XBT_DEBUG("Enter waitany for %zu comms", comms.size());
-      int i=MPI_UNDEFINED;
+      int i;
       try{
-        // this is not a detached send
         i = simcall_comm_waitany(comms.data(), comms.size(), -1);
       } catch (const Exception&) {
-        XBT_INFO("request %d cancelled ", i);
-        return i;
+        XBT_INFO("request cancelled");
+        i = -1;
       }
 
       // not MPI_UNDEFINED, as this is a simix return code