Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
add finalizing state for smpi actor:
authorAugustin Degomme <adegomme@users.noreply.github.com>
Sun, 20 Mar 2022 13:01:45 +0000 (14:01 +0100)
committerAugustin Degomme <adegomme@users.noreply.github.com>
Sun, 20 Mar 2022 13:01:45 +0000 (14:01 +0100)
if finalization barrier is used, tag is now different, in order not to match against other MPI_Barrier that may be in the MPI code

src/smpi/bindings/smpi_pmpi.cpp
src/smpi/colls/barrier/barrier-mvapich2-pair.cpp
src/smpi/colls/barrier/barrier-ompi.cpp
src/smpi/include/private.hpp
src/smpi/include/smpi_actor.hpp
src/smpi/internals/smpi_actor.cpp

index 8764f14..7de62bd 100644 (file)
@@ -65,6 +65,7 @@ int PMPI_Finalize()
 {
   smpi_bench_end();
   aid_t rank_traced = simgrid::s4u::this_actor::get_pid();
 {
   smpi_bench_end();
   aid_t rank_traced = simgrid::s4u::this_actor::get_pid();
+  smpi_process()->mark_as_finalizing();
   TRACE_smpi_comm_in(rank_traced, __func__, new simgrid::instr::NoOpTIData("finalize"));
 
   if(simgrid::config::get_value<bool>("smpi/finalization-barrier"))
   TRACE_smpi_comm_in(rank_traced, __func__, new simgrid::instr::NoOpTIData("finalize"));
 
   if(simgrid::config::get_value<bool>("smpi/finalization-barrier"))
index 57b87e2..8def355 100644 (file)
@@ -41,6 +41,7 @@
 
 #include "../coll_tuned_topo.hpp"
 #include "../colls_private.hpp"
 
 #include "../coll_tuned_topo.hpp"
 #include "../colls_private.hpp"
+#include "smpi_actor.hpp"
 namespace simgrid{
 namespace smpi{
 int barrier__mvapich2_pair(MPI_Comm comm)
 namespace simgrid{
 namespace smpi{
 int barrier__mvapich2_pair(MPI_Comm comm)
@@ -49,6 +50,7 @@ int barrier__mvapich2_pair(MPI_Comm comm)
     int size, rank;
     int d, dst, src;
     int mpi_errno = MPI_SUCCESS;
     int size, rank;
     int d, dst, src;
     int mpi_errno = MPI_SUCCESS;
+    int tag = smpi_process()->finalizing() ? COLL_TAG_BARRIER-1: COLL_TAG_BARRIER;
 
     size = comm->size();
     /* Trivial barriers return immediately */
 
     size = comm->size();
     /* Trivial barriers return immediately */
@@ -68,25 +70,25 @@ int barrier__mvapich2_pair(MPI_Comm comm)
         if (rank < surfeit) {
             /* get the fanin letter from the upper "half" process: */
             dst = N2_prev + rank;
         if (rank < surfeit) {
             /* get the fanin letter from the upper "half" process: */
             dst = N2_prev + rank;
-            Request::recv(nullptr, 0, MPI_BYTE, dst, COLL_TAG_BARRIER, comm, MPI_STATUS_IGNORE);
+            Request::recv(nullptr, 0, MPI_BYTE, dst, tag, comm, MPI_STATUS_IGNORE);
         }
 
         /* combine on embedded N2_prev power-of-two processes */
         for (d = 1; d < N2_prev; d <<= 1) {
             dst = (rank ^ d);
         }
 
         /* combine on embedded N2_prev power-of-two processes */
         for (d = 1; d < N2_prev; d <<= 1) {
             dst = (rank ^ d);
-            Request::sendrecv(nullptr, 0, MPI_BYTE, dst, COLL_TAG_BARRIER, nullptr, 0, MPI_BYTE, dst, COLL_TAG_BARRIER,
+            Request::sendrecv(nullptr, 0, MPI_BYTE, dst, tag, nullptr, 0, MPI_BYTE, dst, tag,
                               comm, MPI_STATUS_IGNORE);
         }
 
         /* fanout data to nodes above N2_prev... */
         if (rank < surfeit) {
             dst = N2_prev + rank;
                               comm, MPI_STATUS_IGNORE);
         }
 
         /* fanout data to nodes above N2_prev... */
         if (rank < surfeit) {
             dst = N2_prev + rank;
-            Request::send(nullptr, 0, MPI_BYTE, dst, COLL_TAG_BARRIER, comm);
+            Request::send(nullptr, 0, MPI_BYTE, dst, tag, comm);
         }
     } else {
         /* fanin data to power of 2 subset */
         src = rank - N2_prev;
         }
     } else {
         /* fanin data to power of 2 subset */
         src = rank - N2_prev;
-        Request::sendrecv(nullptr, 0, MPI_BYTE, src, COLL_TAG_BARRIER, nullptr, 0, MPI_BYTE, src, COLL_TAG_BARRIER,
+        Request::sendrecv(nullptr, 0, MPI_BYTE, src, tag, nullptr, 0, MPI_BYTE, src, tag,
                           comm, MPI_STATUS_IGNORE);
     }
 
                           comm, MPI_STATUS_IGNORE);
     }
 
index 3d540de..5c921c8 100644 (file)
@@ -22,6 +22,7 @@
 
 #include "../coll_tuned_topo.hpp"
 #include "../colls_private.hpp"
 
 #include "../coll_tuned_topo.hpp"
 #include "../colls_private.hpp"
+#include "smpi_actor.hpp"
 
 /*
  * Barrier is meant to be a synchronous operation, as some BTLs can mark
 
 /*
  * Barrier is meant to be a synchronous operation, as some BTLs can mark
@@ -52,35 +53,35 @@ int barrier__ompi_doublering(MPI_Comm comm)
 
     rank = comm->rank();
     size = comm->size();
 
     rank = comm->rank();
     size = comm->size();
-
+    int tag = smpi_process()->finalizing() ? COLL_TAG_BARRIER-1: COLL_TAG_BARRIER;
     XBT_DEBUG("ompi_coll_tuned_barrier_ompi_doublering rank %d", rank);
 
     left = ((rank-1+size)%size);
     right = ((rank+1)%size);
 
     if (rank > 0) { /* receive message from the left */
     XBT_DEBUG("ompi_coll_tuned_barrier_ompi_doublering rank %d", rank);
 
     left = ((rank-1+size)%size);
     right = ((rank+1)%size);
 
     if (rank > 0) { /* receive message from the left */
-      Request::recv(nullptr, 0, MPI_BYTE, left, COLL_TAG_BARRIER, comm, MPI_STATUS_IGNORE);
+      Request::recv(nullptr, 0, MPI_BYTE, left, tag, comm, MPI_STATUS_IGNORE);
     }
 
     /* Send message to the right */
     }
 
     /* Send message to the right */
-    Request::send(nullptr, 0, MPI_BYTE, right, COLL_TAG_BARRIER, comm);
+    Request::send(nullptr, 0, MPI_BYTE, right, tag, comm);
 
     /* root needs to receive from the last node */
     if (rank == 0) {
 
     /* root needs to receive from the last node */
     if (rank == 0) {
-      Request::recv(nullptr, 0, MPI_BYTE, left, COLL_TAG_BARRIER, comm, MPI_STATUS_IGNORE);
+      Request::recv(nullptr, 0, MPI_BYTE, left, tag, comm, MPI_STATUS_IGNORE);
     }
 
     /* Allow nodes to exit */
     if (rank > 0) { /* post Receive from left */
     }
 
     /* Allow nodes to exit */
     if (rank > 0) { /* post Receive from left */
-      Request::recv(nullptr, 0, MPI_BYTE, left, COLL_TAG_BARRIER, comm, MPI_STATUS_IGNORE);
+      Request::recv(nullptr, 0, MPI_BYTE, left, tag, comm, MPI_STATUS_IGNORE);
     }
 
     /* send message to the right one */
     }
 
     /* send message to the right one */
-    Request::send(nullptr, 0, MPI_BYTE, right, COLL_TAG_BARRIER, comm);
+    Request::send(nullptr, 0, MPI_BYTE, right, tag, comm);
 
     /* rank 0 post receive from the last node */
     if (rank == 0) {
 
     /* rank 0 post receive from the last node */
     if (rank == 0) {
-      Request::recv(nullptr, 0, MPI_BYTE, left, COLL_TAG_BARRIER, comm, MPI_STATUS_IGNORE);
+      Request::recv(nullptr, 0, MPI_BYTE, left, tag, comm, MPI_STATUS_IGNORE);
     }
 
     return MPI_SUCCESS;
     }
 
     return MPI_SUCCESS;
@@ -99,6 +100,7 @@ int barrier__ompi_recursivedoubling(MPI_Comm comm)
 
     rank = comm->rank();
     size = comm->size();
 
     rank = comm->rank();
     size = comm->size();
+    int tag = smpi_process()->finalizing() ? COLL_TAG_BARRIER-1: COLL_TAG_BARRIER;
     XBT_DEBUG(
                  "ompi_coll_tuned_barrier_ompi_recursivedoubling rank %d",
                  rank);
     XBT_DEBUG(
                  "ompi_coll_tuned_barrier_ompi_recursivedoubling rank %d",
                  rank);
@@ -112,13 +114,13 @@ int barrier__ompi_recursivedoubling(MPI_Comm comm)
         if (rank >= adjsize) {
             /* send message to lower ranked node */
             remote = rank - adjsize;
         if (rank >= adjsize) {
             /* send message to lower ranked node */
             remote = rank - adjsize;
-            Request::sendrecv(nullptr, 0, MPI_BYTE, remote, COLL_TAG_BARRIER, nullptr, 0, MPI_BYTE, remote,
-                              COLL_TAG_BARRIER, comm, MPI_STATUS_IGNORE);
+            Request::sendrecv(nullptr, 0, MPI_BYTE, remote, tag, nullptr, 0, MPI_BYTE, remote,
+                              tag, comm, MPI_STATUS_IGNORE);
 
         } else if (rank < (size - adjsize)) {
 
             /* receive message from high level rank */
 
         } else if (rank < (size - adjsize)) {
 
             /* receive message from high level rank */
-            Request::recv(nullptr, 0, MPI_BYTE, rank + adjsize, COLL_TAG_BARRIER, comm, MPI_STATUS_IGNORE);
+            Request::recv(nullptr, 0, MPI_BYTE, rank + adjsize, tag, comm, MPI_STATUS_IGNORE);
         }
     }
 
         }
     }
 
@@ -131,8 +133,8 @@ int barrier__ompi_recursivedoubling(MPI_Comm comm)
             if (remote >= adjsize) continue;
 
             /* post receive from the remote node */
             if (remote >= adjsize) continue;
 
             /* post receive from the remote node */
-            Request::sendrecv(nullptr, 0, MPI_BYTE, remote, COLL_TAG_BARRIER, nullptr, 0, MPI_BYTE, remote,
-                              COLL_TAG_BARRIER, comm, MPI_STATUS_IGNORE);
+            Request::sendrecv(nullptr, 0, MPI_BYTE, remote, tag, nullptr, 0, MPI_BYTE, remote,
+                              tag, comm, MPI_STATUS_IGNORE);
         }
     }
 
         }
     }
 
@@ -141,7 +143,7 @@ int barrier__ompi_recursivedoubling(MPI_Comm comm)
         if (rank < (size - adjsize)) {
             /* send enter message to higher ranked node */
             remote = rank + adjsize;
         if (rank < (size - adjsize)) {
             /* send enter message to higher ranked node */
             remote = rank + adjsize;
-            Request::send(nullptr, 0, MPI_BYTE, remote, COLL_TAG_BARRIER, comm);
+            Request::send(nullptr, 0, MPI_BYTE, remote, tag, comm);
         }
     }
 
         }
     }
 
@@ -161,6 +163,7 @@ int barrier__ompi_bruck(MPI_Comm comm)
 
     rank = comm->rank();
     size = comm->size();
 
     rank = comm->rank();
     size = comm->size();
+    int tag = smpi_process()->finalizing() ? COLL_TAG_BARRIER-1: COLL_TAG_BARRIER;
     XBT_DEBUG(
                  "ompi_coll_tuned_barrier_ompi_bruck rank %d", rank);
 
     XBT_DEBUG(
                  "ompi_coll_tuned_barrier_ompi_bruck rank %d", rank);
 
@@ -170,7 +173,7 @@ int barrier__ompi_bruck(MPI_Comm comm)
         to   = (rank + distance) % size;
 
         /* send message to lower ranked node */
         to   = (rank + distance) % size;
 
         /* send message to lower ranked node */
-        Request::sendrecv(nullptr, 0, MPI_BYTE, to, COLL_TAG_BARRIER, nullptr, 0, MPI_BYTE, from, COLL_TAG_BARRIER,
+        Request::sendrecv(nullptr, 0, MPI_BYTE, to, tag, nullptr, 0, MPI_BYTE, from, tag,
                           comm, MPI_STATUS_IGNORE);
     }
 
                           comm, MPI_STATUS_IGNORE);
     }
 
@@ -188,11 +191,12 @@ int barrier__ompi_two_procs(MPI_Comm comm)
     int remote;
 
     remote = comm->rank();
     int remote;
 
     remote = comm->rank();
+    int tag = smpi_process()->finalizing() ? COLL_TAG_BARRIER-1: COLL_TAG_BARRIER;
     XBT_DEBUG(
                  "ompi_coll_tuned_barrier_ompi_two_procs rank %d", remote);
     remote = (remote + 1) & 0x1;
 
     XBT_DEBUG(
                  "ompi_coll_tuned_barrier_ompi_two_procs rank %d", remote);
     remote = (remote + 1) & 0x1;
 
-    Request::sendrecv(nullptr, 0, MPI_BYTE, remote, COLL_TAG_BARRIER, nullptr, 0, MPI_BYTE, remote, COLL_TAG_BARRIER,
+    Request::sendrecv(nullptr, 0, MPI_BYTE, remote, tag, nullptr, 0, MPI_BYTE, remote, tag,
                       comm, MPI_STATUS_IGNORE);
     return (MPI_SUCCESS);
 }
                       comm, MPI_STATUS_IGNORE);
     return (MPI_SUCCESS);
 }
@@ -218,12 +222,13 @@ int barrier__ompi_basic_linear(MPI_Comm comm)
     int size = comm->size();
     int rank = comm->rank();
 
     int size = comm->size();
     int rank = comm->rank();
 
+    int tag = smpi_process()->finalizing() ? COLL_TAG_BARRIER-1: COLL_TAG_BARRIER;
     /* All non-root send & receive zero-length message. */
 
     if (rank > 0) {
     /* All non-root send & receive zero-length message. */
 
     if (rank > 0) {
-      Request::send(nullptr, 0, MPI_BYTE, 0, COLL_TAG_BARRIER, comm);
+      Request::send(nullptr, 0, MPI_BYTE, 0, tag, comm);
 
 
-      Request::recv(nullptr, 0, MPI_BYTE, 0, COLL_TAG_BARRIER, comm, MPI_STATUS_IGNORE);
+      Request::recv(nullptr, 0, MPI_BYTE, 0, tag, comm, MPI_STATUS_IGNORE);
     }
 
     /* The root collects and broadcasts the messages. */
     }
 
     /* The root collects and broadcasts the messages. */
@@ -233,12 +238,12 @@ int barrier__ompi_basic_linear(MPI_Comm comm)
 
         requests = new MPI_Request[size];
         for (i = 1; i < size; ++i) {
 
         requests = new MPI_Request[size];
         for (i = 1; i < size; ++i) {
-          requests[i] = Request::irecv(nullptr, 0, MPI_BYTE, i, COLL_TAG_BARRIER, comm);
+          requests[i] = Request::irecv(nullptr, 0, MPI_BYTE, i, tag, comm);
         }
         Request::waitall( size-1, requests+1, MPI_STATUSES_IGNORE );
 
         for (i = 1; i < size; ++i) {
         }
         Request::waitall( size-1, requests+1, MPI_STATUSES_IGNORE );
 
         for (i = 1; i < size; ++i) {
-          requests[i] = Request::isend(nullptr, 0, MPI_BYTE, i, COLL_TAG_BARRIER, comm);
+          requests[i] = Request::isend(nullptr, 0, MPI_BYTE, i, tag, comm);
         }
         Request::waitall( size-1, requests+1, MPI_STATUSES_IGNORE );
         delete[] requests;
         }
         Request::waitall( size-1, requests+1, MPI_STATUSES_IGNORE );
         delete[] requests;
@@ -262,6 +267,7 @@ int barrier__ompi_tree(MPI_Comm comm)
 
     rank = comm->rank();
     size = comm->size();
 
     rank = comm->rank();
     size = comm->size();
+    int tag = smpi_process()->finalizing() ? COLL_TAG_BARRIER-1: COLL_TAG_BARRIER;
     XBT_DEBUG(
                  "ompi_coll_tuned_barrier_ompi_tree %d",
                  rank);
     XBT_DEBUG(
                  "ompi_coll_tuned_barrier_ompi_tree %d",
                  rank);
@@ -273,9 +279,9 @@ int barrier__ompi_tree(MPI_Comm comm)
         partner = rank ^ jump;
         if (!(partner & (jump-1)) && partner < size) {
             if (partner > rank) {
         partner = rank ^ jump;
         if (!(partner & (jump-1)) && partner < size) {
             if (partner > rank) {
-              Request::recv(nullptr, 0, MPI_BYTE, partner, COLL_TAG_BARRIER, comm, MPI_STATUS_IGNORE);
+              Request::recv(nullptr, 0, MPI_BYTE, partner, tag, comm, MPI_STATUS_IGNORE);
             } else if (partner < rank) {
             } else if (partner < rank) {
-              Request::send(nullptr, 0, MPI_BYTE, partner, COLL_TAG_BARRIER, comm);
+              Request::send(nullptr, 0, MPI_BYTE, partner, tag, comm);
             }
         }
     }
             }
         }
     }
@@ -285,9 +291,9 @@ int barrier__ompi_tree(MPI_Comm comm)
         partner = rank ^ jump;
         if (!(partner & (jump-1)) && partner < size) {
             if (partner > rank) {
         partner = rank ^ jump;
         if (!(partner & (jump-1)) && partner < size) {
             if (partner > rank) {
-              Request::send(nullptr, 0, MPI_BYTE, partner, COLL_TAG_BARRIER, comm);
+              Request::send(nullptr, 0, MPI_BYTE, partner, tag, comm);
             } else if (partner < rank) {
             } else if (partner < rank) {
-              Request::recv(nullptr, 0, MPI_BYTE, partner, COLL_TAG_BARRIER, comm, MPI_STATUS_IGNORE);
+              Request::recv(nullptr, 0, MPI_BYTE, partner, tag, comm, MPI_STATUS_IGNORE);
             }
         }
     }
             }
         }
     }
index 3351208..7d1fa6c 100644 (file)
@@ -31,7 +31,7 @@ constexpr unsigned MPI_REQ_MATCHED        = 0x4000;
 constexpr unsigned MPI_REQ_CANCELLED      = 0x8000;
 constexpr unsigned MPI_REQ_NBC            = 0x10000;
 
 constexpr unsigned MPI_REQ_CANCELLED      = 0x8000;
 constexpr unsigned MPI_REQ_NBC            = 0x10000;
 
-enum class SmpiProcessState { UNINITIALIZED, INITIALIZING, INITIALIZED /*(=MPI_Init called)*/, FINALIZED };
+enum class SmpiProcessState { UNINITIALIZED, INITIALIZING, INITIALIZED /*(=MPI_Init called)*/, FINALIZING, FINALIZED };
 
 constexpr int COLL_TAG_REDUCE         = -112;
 constexpr int COLL_TAG_SCATTER        = -223;
 
 constexpr int COLL_TAG_REDUCE         = -112;
 constexpr int COLL_TAG_SCATTER        = -223;
index 3b1e6f8..a44f60f 100644 (file)
@@ -56,7 +56,9 @@ public:
   int finalized() const;
   int initializing() const;
   int initialized() const;
   int finalized() const;
   int initializing() const;
   int initialized() const;
+  int finalizing() const;
   void mark_as_initialized();
   void mark_as_initialized();
+  void mark_as_finalizing();
   void set_replaying(bool value);
   bool replaying() const;
   std::string get_instance_id() const { return instance_id_;}
   void set_replaying(bool value);
   bool replaying() const;
   std::string get_instance_id() const { return instance_id_;}
index 4e2fa04..ad8af7d 100644 (file)
@@ -99,6 +99,19 @@ void ActorExt::mark_as_initialized()
     state_ = SmpiProcessState::INITIALIZED;
 }
 
     state_ = SmpiProcessState::INITIALIZED;
 }
 
+/** @brief Mark a process as finalizing (=MPI_Finalize called) */
+void ActorExt::mark_as_finalizing()
+{
+  if (state_ != SmpiProcessState::FINALIZED)
+    state_ = SmpiProcessState::FINALIZING;
+}
+
+/** @brief Check if a process is finalizing */
+int ActorExt::finalizing() const
+{
+  return (state_ == SmpiProcessState::FINALIZING);
+}
+
 void ActorExt::set_replaying(bool value)
 {
   if (state_ != SmpiProcessState::FINALIZED)
 void ActorExt::set_replaying(bool value)
 {
   if (state_ != SmpiProcessState::FINALIZED)