Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Properly terminate non blocking collective requests in all cases (mpi_test/testall...
authorAugustin Degomme <adegomme@users.noreply.github.com>
Fri, 7 May 2021 10:17:54 +0000 (12:17 +0200)
committerAugustin Degomme <adegomme@users.noreply.github.com>
Fri, 7 May 2021 10:23:06 +0000 (12:23 +0200)
Also use flags to identify non blocking collectivre requests instead of the size of an array (which should be turned in a vector soon)
Fix one issue reported in #67

src/smpi/bindings/smpi_pmpi_request.cpp
src/smpi/colls/smpi_nbc_impl.cpp
src/smpi/include/private.hpp
src/smpi/include/smpi_request.hpp
src/smpi/mpi/smpi_request.cpp

index e0abad6..19de25e 100644 (file)
@@ -617,8 +617,7 @@ int PMPI_Wait(MPI_Request * request, MPI_Status * status)
   } else {
     // for tracing, save the handle which might get overridden before we can use the helper on it
     MPI_Request savedreq = *request;
-    if (savedreq != MPI_REQUEST_NULL && not(savedreq->flags() & MPI_REQ_FINISHED)
-    && not(savedreq->flags() & MPI_REQ_GENERALIZED))
+    if (savedreq != MPI_REQUEST_NULL && not(savedreq->flags() & (MPI_REQ_FINISHED | MPI_REQ_GENERALIZED | MPI_REQ_NBC)))
       savedreq->ref();//don't erase the handle in Request::wait, we'll need it later
     else
       savedreq = MPI_REQUEST_NULL;
@@ -652,7 +651,7 @@ int PMPI_Waitany(int count, MPI_Request requests[], int *index, MPI_Status * sta
   // for tracing, save the handles which might get overridden before we can use the helper on it
   std::vector<MPI_Request> savedreqs(requests, requests + count);
   for (MPI_Request& req : savedreqs) {
-    if (req != MPI_REQUEST_NULL && not(req->flags() & MPI_REQ_FINISHED))
+    if (req != MPI_REQUEST_NULL && not(req->flags() & (MPI_REQ_FINISHED | MPI_REQ_NBC)))
       req->ref();
     else
       req = MPI_REQUEST_NULL;
@@ -683,7 +682,7 @@ int PMPI_Waitall(int count, MPI_Request requests[], MPI_Status status[])
   // for tracing, save the handles which might get overridden before we can use the helper on it
   std::vector<MPI_Request> savedreqs(requests, requests + count);
   for (MPI_Request& req : savedreqs) {
-    if (req != MPI_REQUEST_NULL && not(req->flags() & MPI_REQ_FINISHED))
+    if (req != MPI_REQUEST_NULL && not(req->flags() & (MPI_REQ_FINISHED | MPI_REQ_NBC)))
       req->ref();
     else
       req = MPI_REQUEST_NULL;
index 6c0c050..f3b4a46 100644 (file)
@@ -17,7 +17,7 @@ int colls::ibarrier(MPI_Comm comm, MPI_Request* request, int external)
   int rank = comm->rank();
   int system_tag=COLL_TAG_BARRIER-external;
   (*request) = new Request( nullptr, 0, MPI_BYTE,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
   if (rank > 0) {
     auto* requests = new MPI_Request[2];
     requests[0] = Request::isend (nullptr, 0, MPI_BYTE, 0,
@@ -46,7 +46,7 @@ int colls::ibcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Com
   int rank = comm->rank();
   int system_tag=COLL_TAG_BCAST-external;
   (*request) = new Request( nullptr, 0, MPI_BYTE,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
   if (rank != root) {
     auto* requests = new MPI_Request[1];
     requests[0] = Request::irecv (buf, count, datatype, root,
@@ -82,7 +82,7 @@ int colls::iallgather(const void* sendbuf, int sendcount, MPI_Datatype sendtype,
   int rank = comm->rank();
   int size = comm->size();
   (*request) = new Request( nullptr, 0, MPI_BYTE,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
   // FIXME: check for errors
   recvtype->extent(&lb, &recvext);
   // Local copy from self
@@ -115,7 +115,7 @@ int colls::iscatter(const void* sendbuf, int sendcount, MPI_Datatype sendtype, v
   int rank = comm->rank();
   int size = comm->size();
   (*request) = new Request( nullptr, 0, MPI_BYTE,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
   if(rank != root) {
     auto* requests = new MPI_Request[1];
     // Recv buffer from root
@@ -155,7 +155,7 @@ int colls::iallgatherv(const void* sendbuf, int sendcount, MPI_Datatype sendtype
   int rank = comm->rank();
   int size = comm->size();
   (*request) = new Request( nullptr, 0, MPI_BYTE,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
   recvtype->extent(&lb, &recvext);
   // Local copy from self
   Datatype::copy(sendbuf, sendcount, sendtype,
@@ -191,7 +191,7 @@ int colls::ialltoall(const void* sendbuf, int sendcount, MPI_Datatype sendtype,
   int rank = comm->rank();
   int size = comm->size();
   (*request) = new Request( nullptr, 0, MPI_BYTE,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
   sendtype->extent(&lb, &sendext);
   recvtype->extent(&lb, &recvext);
   /* simple optimization */
@@ -237,7 +237,7 @@ int colls::ialltoallv(const void* sendbuf, const int* sendcounts, const int* sen
   int rank = comm->rank();
   int size = comm->size();
   (*request) = new Request( nullptr, 0, MPI_BYTE,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
   sendtype->extent(&lb, &sendext);
   recvtype->extent(&lb, &recvext);
   /* Local copy from self */
@@ -284,7 +284,7 @@ int colls::ialltoallw(const void* sendbuf, const int* sendcounts, const int* sen
   int rank = comm->rank();
   int size = comm->size();
   (*request) = new Request( nullptr, 0, MPI_BYTE,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
   /* Local copy from self */
   int err = (sendcounts[rank]>0 && recvcounts[rank]) ? Datatype::copy(static_cast<const char *>(sendbuf) + senddisps[rank], sendcounts[rank], sendtypes[rank],
                                static_cast<char *>(recvbuf) + recvdisps[rank], recvcounts[rank], recvtypes[rank]): MPI_SUCCESS;
@@ -329,7 +329,7 @@ int colls::igather(const void* sendbuf, int sendcount, MPI_Datatype sendtype, vo
   int rank = comm->rank();
   int size = comm->size();
   (*request) = new Request( nullptr, 0, MPI_BYTE,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
   if(rank != root) {
     // Send buffer to root
     auto* requests = new MPI_Request[1];
@@ -368,7 +368,7 @@ int colls::igatherv(const void* sendbuf, int sendcount, MPI_Datatype sendtype, v
   int rank = comm->rank();
   int size = comm->size();
   (*request) = new Request( nullptr, 0, MPI_BYTE,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
   if (rank != root) {
     // Send buffer to root
     auto* requests = new MPI_Request[1];
@@ -406,7 +406,7 @@ int colls::iscatterv(const void* sendbuf, const int* sendcounts, const int* disp
   int rank = comm->rank();
   int size = comm->size();
   (*request) = new Request( nullptr, 0, MPI_BYTE,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
   if(rank != root) {
     // Recv buffer from root
     auto* requests = new MPI_Request[1];
@@ -460,11 +460,11 @@ int colls::ireduce(const void* sendbuf, void* recvbuf, int count, MPI_Datatype d
 
   if(rank == root){
     (*request) =  new Request( recvbuf, count, datatype,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT, op);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC, op);
   }
   else
     (*request) = new Request( nullptr, count, datatype,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
 
   if(rank != root) {
     // Send buffer to root
@@ -507,7 +507,7 @@ int colls::iallreduce(const void* sendbuf, void* recvbuf, int count, MPI_Datatyp
   int rank = comm->rank();
   int size = comm->size();
   (*request) = new Request( recvbuf, count, datatype,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT, op);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC, op);
   // FIXME: check for errors
   datatype->extent(&lb, &dataext);
   // Local copy from self
@@ -539,7 +539,7 @@ int colls::iscan(const void* sendbuf, void* recvbuf, int count, MPI_Datatype dat
   int rank = comm->rank();
   int size = comm->size();
   (*request) = new Request( recvbuf, count, datatype,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT, op);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC, op);
   datatype->extent(&lb, &dataext);
 
   // Local copy from self
@@ -571,7 +571,7 @@ int colls::iexscan(const void* sendbuf, void* recvbuf, int count, MPI_Datatype d
   int rank = comm->rank();
   int size = comm->size();
   (*request) = new Request( recvbuf, count, datatype,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT, op);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC, op);
   datatype->extent(&lb, &dataext);
   if(rank != 0)
     memset(recvbuf, 0, count*dataext);
@@ -605,7 +605,7 @@ int colls::ireduce_scatter(const void* sendbuf, void* recvbuf, const int* recvco
   int size = comm->size();
   int count=recvcounts[rank];
   (*request) = new Request( recvbuf, count, datatype,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT, op);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC, op);
   datatype->extent(&lb, &dataext);
 
   // Send/Recv buffers to/from others;
index dbc9d31..870a397 100644 (file)
@@ -29,6 +29,7 @@ constexpr unsigned MPI_REQ_COMPLETE       = 0x1000;
 constexpr unsigned MPI_REQ_BSEND          = 0x2000;
 constexpr unsigned MPI_REQ_MATCHED        = 0x4000;
 constexpr unsigned MPI_REQ_CANCELLED      = 0x8000;
+constexpr unsigned MPI_REQ_NBC            = 0x10000;
 
 enum class SmpiProcessState { UNINITIALIZED, INITIALIZING, INITIALIZED /*(=MPI_Init called)*/, FINALIZED };
 
index a4c08f9..8027533 100644 (file)
@@ -73,6 +73,7 @@ public:
   void init_buffer(int count);
   void ref();
   void set_nbc_requests(MPI_Request* reqs, int size);
+  static int finish_nbc_requests(MPI_Request* req);
   int get_nbc_requests_size() const;
   MPI_Request* get_nbc_requests() const;
   static void finish_wait(MPI_Request* request, MPI_Status* status);
index 7736157..b782759 100644 (file)
@@ -598,19 +598,7 @@ int Request::test(MPI_Request * request, MPI_Status * status, int* flag) {
 
   static int nsleeps = 1;
   int ret = MPI_SUCCESS;
-  
-  // Are we testing a request meant for non blocking collectives ?
-  // If so, test all the subrequests.
-  if ((*request)->nbc_requests_size_>0){
-    ret = testall((*request)->nbc_requests_size_, (*request)->nbc_requests_, flag, MPI_STATUSES_IGNORE);
-    if(*flag){
-      delete[] (*request)->nbc_requests_;
-      (*request)->nbc_requests_size_=0;
-      unref(request);
-    }
-    return ret;
-  }
-  
+
   if(smpi_test_sleep > 0)
     simgrid::s4u::this_actor::sleep_for(nsleeps * smpi_test_sleep);
 
@@ -856,6 +844,30 @@ void Request::iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status*
   xbt_assert(request == MPI_REQUEST_NULL);
 }
 
+int Request::finish_nbc_requests(MPI_Request* request){
+  int ret = waitall((*request)->nbc_requests_size_, (*request)->nbc_requests_, MPI_STATUSES_IGNORE);
+  XBT_DEBUG("finish non blocking collective request with %d sub-requests", (*request)->nbc_requests_size_);
+  for (int i = 0; i < (*request)->nbc_requests_size_; i++) {
+    if((*request)->buf_!=nullptr && (*request)->nbc_requests_[i]!=MPI_REQUEST_NULL){//reduce case
+      void * buf=(*request)->nbc_requests_[i]->buf_;
+      if((*request)->old_type_->flags() & DT_FLAG_DERIVED)
+        buf=(*request)->nbc_requests_[i]->old_buf_;
+      if((*request)->nbc_requests_[i]->flags_ & MPI_REQ_RECV ){
+        if((*request)->op_!=MPI_OP_NULL){
+          int count=(*request)->size_/ (*request)->old_type_->size();
+          (*request)->op_->apply(buf, (*request)->buf_, &count, (*request)->old_type_);
+        }
+        smpi_free_tmp_buffer(static_cast<unsigned char*>(buf));
+      }
+    }
+    if((*request)->nbc_requests_[i]!=MPI_REQUEST_NULL)
+      Request::unref(&((*request)->nbc_requests_[i]));
+  }
+  delete[] (*request)->nbc_requests_;
+  (*request)->nbc_requests_size_=0;
+  return ret;
+}
+
 void Request::finish_wait(MPI_Request* request, MPI_Status * status)
 {
   MPI_Request req = *request;
@@ -869,6 +881,12 @@ void Request::finish_wait(MPI_Request* request, MPI_Status * status)
     return;
   }
 
+  if ((*request)->flags() & MPI_REQ_NBC){
+    int ret = finish_nbc_requests(request);
+    if (ret != MPI_SUCCESS)
+      xbt_die("error when finishing non blocking collective requests");
+  }
+
   if ((req->flags_ & (MPI_REQ_PREPARED | MPI_REQ_GENERALIZED | MPI_REQ_FINISHED)) == 0) {
     if (status != MPI_STATUS_IGNORE) {
       if (req->src_== MPI_PROC_NULL || req->dst_== MPI_PROC_NULL){
@@ -969,32 +987,6 @@ int Request::wait(MPI_Request * request, MPI_Status * status)
     (*request)=MPI_REQUEST_NULL;
     return ret;
   }
-  // Are we waiting on a request meant for non blocking collectives ?
-  // If so, wait for all the subrequests.
-  if ((*request)->nbc_requests_size_>0){
-    ret = waitall((*request)->nbc_requests_size_, (*request)->nbc_requests_, MPI_STATUSES_IGNORE);
-    for (int i = 0; i < (*request)->nbc_requests_size_; i++) {
-      if((*request)->buf_!=nullptr && (*request)->nbc_requests_[i]!=MPI_REQUEST_NULL){//reduce case
-        void * buf=(*request)->nbc_requests_[i]->buf_;
-        if((*request)->old_type_->flags() & DT_FLAG_DERIVED)
-          buf=(*request)->nbc_requests_[i]->old_buf_;
-        if((*request)->nbc_requests_[i]->flags_ & MPI_REQ_RECV ){
-          if((*request)->op_!=MPI_OP_NULL){
-            int count=(*request)->size_/ (*request)->old_type_->size();
-            (*request)->op_->apply(buf, (*request)->buf_, &count, (*request)->old_type_);
-          }
-          smpi_free_tmp_buffer(static_cast<unsigned char*>(buf));
-        }
-      }
-      if((*request)->nbc_requests_[i]!=MPI_REQUEST_NULL)
-        Request::unref(&((*request)->nbc_requests_[i]));
-    }
-    delete[] (*request)->nbc_requests_;
-    (*request)->nbc_requests_size_=0;
-    unref(request);
-    (*request)=MPI_REQUEST_NULL;
-    return ret;
-  }
 
   (*request)->print_request("Waiting");
   if ((*request)->flags_ & (MPI_REQ_PREPARED | MPI_REQ_FINISHED)) {