Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'actor-comms' into 'master'
authorMartin Quinson <martin.quinson@ens-rennes.fr>
Wed, 15 Feb 2023 23:59:31 +0000 (23:59 +0000)
committerMartin Quinson <martin.quinson@ens-rennes.fr>
Wed, 15 Feb 2023 23:59:31 +0000 (23:59 +0000)
Serialize pending transitions when responding to `ACTOR_STATUS` messages on the application side

See merge request simgrid/simgrid!127

include/xbt/utility.hpp
src/mc/api/ActorState.hpp
src/mc/api/RemoteApp.cpp
src/mc/api/State.cpp
src/mc/api/State.hpp
src/mc/remote/AppSide.cpp
src/mc/remote/Channel.cpp
src/mc/remote/mc_protocol.h

index 34dc34e..bb599d6 100644 (file)
     constexpr std::array<const char*, _XBT_COUNT_ARGS(__VA_ARGS__)> names{{_XBT_STRINGIFY_ARGS(__VA_ARGS__)}};         \
     return names.at(static_cast<int>(value));                                                                          \
   }                                                                                                                    \
+  static constexpr bool is_valid_##EnumType(int raw_value)                                                             \
+  {                                                                                                                    \
+    return raw_value >= 0 && raw_value < _XBT_COUNT_ARGS(__VA_ARGS__);                                                 \
+  }                                                                                                                    \
   enum class EnumType { __VA_ARGS__ } /* defined here to handle trailing semicolon */
 
 namespace simgrid {
@@ -76,6 +80,6 @@ template <class List, class Elem> inline void intrusive_erase(List& list, Elem&
   list.erase(list.iterator_to(elem));
 }
 
-}
-}
+} // namespace xbt
+} // namespace simgrid
 #endif
index cee8581..16232ff 100644 (file)
@@ -9,15 +9,56 @@
 #include "src/kernel/activity/CommImpl.hpp"
 #include "src/mc/remote/RemotePtr.hpp"
 
+#include <exception>
+#include <vector>
+
 namespace simgrid::mc {
 
 /* On every state, each actor has an entry of the following type.
- * This represents both the actor and its transition because
- *   an actor cannot have more than one enabled transition at a given time.
+ * This usually represents both the actor and its transition because
+ * most of the time an actor cannot have more than one enabled transition
+ * at a given time. However, certain transitions have multiple "paths"
+ * that can be followed, which means that a given actor may be able
+ * to do more than one thing at a time.
+ *
+ * Formally, at this state multiple transitions would exist all of
+ * which happened to be executed by the same actor. This distinction
+ * is important in cases
  */
 class ActorState {
+
+  /**
+   * @brief The transitions that the actor is allowed to execute from this
+   * state, viz. those that are enabled for this actor
+   *
+   * Most actors can take only a single action from any given state.
+   * However, when an actor executes a transition with multiple
+   * possible variations (e.g. an MC_Random() [see class: RandomTransition]
+   * for more details]), multiple enabled actions are defined
+   *
+   * @invariant The transitions are arranged such that an actor
+   * with multiple possible paths of execution will contain all
+   * such transitions such that `pending_transitions_[i]` represents
+   * the variation of the transition with `times_considered = i`.
+   *
+   * TODO: If only a subset of transitions of an actor that can
+   * take multiple transitions in some state are truly enabled,
+   * we would instead need to map `times_considered` to a transition,
+   * as the map is currently implicit in the ordering of the transitions
+   * in the vector
+   *
+   * TODO: If a single transition is taken at a time in a concurrent system,
+   * then nearly all of the transitions from in a state `s'` after taking
+   * an action `t` from state `s`  (i.e. s -- t --> s') are the same
+   * sans for the new transition of the actor which just executed t.
+   * This means there may be a way to store the list once and apply differences
+   * rather than repeating elements frequently.
+   */
+  std::vector<std::unique_ptr<Transition>> pending_transitions_;
+
   /* Possible exploration status of an actor transition in a state.
-   * Either the checker did not consider the transition, or it was considered and still to do, or considered and done.
+   * Either the checker did not consider the transition, or it was considered and still to do, or considered and
+   * done.
    */
   enum class InterleavingType {
     /** This actor transition is not considered by the checker (yet?) */
@@ -44,8 +85,10 @@ class ActorState {
   bool enabled_;
 
 public:
-  ActorState(aid_t aid, bool enabled, unsigned int max_consider)
-      : aid_(aid), max_consider_(max_consider), enabled_(enabled)
+  ActorState(aid_t aid, bool enabled, unsigned int max_consider) : ActorState(aid, enabled, max_consider, {}) {}
+
+  ActorState(aid_t aid, bool enabled, unsigned int max_consider, std::vector<std::unique_ptr<Transition>> transitions)
+      : aid_(aid), max_consider_(max_consider), enabled_(enabled), pending_transitions_(std::move(transitions))
   {
   }
 
@@ -71,6 +114,24 @@ public:
     this->times_considered_ = 0;
   }
   void set_done() { this->state_ = InterleavingType::done; }
+
+  inline Transition* get_transition(unsigned times_considered)
+  {
+    xbt_assert(times_considered < this->pending_transitions_.size(),
+               "Actor %lu does not have a state available transition with `times_considered = %d`,\n"
+               "yet one was asked for",
+               aid_, times_considered);
+    return this->pending_transitions_[times_considered].get();
+  }
+
+  inline void set_transition(std::unique_ptr<Transition> t, unsigned times_considered)
+  {
+    xbt_assert(times_considered < this->pending_transitions_.size(),
+               "Actor %lu does not have a state available transition with `times_considered = %d`, "
+               "yet one was attempted to be set",
+               aid_, times_considered);
+    this->pending_transitions_[times_considered] = std::move(t);
+  }
 };
 
 } // namespace simgrid::mc
index d9f5782..d2a2b61 100644 (file)
 #include "xbt/log.h"
 #include "xbt/system_error.hpp"
 
+#include <algorithm>
 #include <array>
 #include <boost/tokenizer.hpp>
 #include <memory>
+#include <numeric>
 #include <string>
 
 #include <fcntl.h>
@@ -159,6 +161,13 @@ unsigned long RemoteApp::get_maxpid() const
 
 void RemoteApp::get_actors_status(std::map<aid_t, ActorState>& whereto) const
 {
+  // The messaging happens as follows:
+  //
+  // CheckerSide                  AppSide
+  // send ACTORS_STATUS ---->
+  //                    <----- send ACTORS_STATUS_REPLY
+  //                    <----- send `N` `s_mc_message_actors_status_one_t` structs
+  //                    <----- send `M` `s_mc_message_simcall_probe_one_t` structs
   s_mc_message_t msg;
   memset(&msg, 0, sizeof msg);
   msg.type = simgrid::mc::MessageType::ACTORS_STATUS;
@@ -180,9 +189,42 @@ void RemoteApp::get_actors_status(std::map<aid_t, ActorState>& whereto) const
     xbt_assert(static_cast<size_t>(received) == size);
   }
 
+  std::vector<s_mc_message_simcall_probe_one_t> action_pool(answer.transition_count);
+  if (answer.transition_count > 0) {
+    size_t size = action_pool.size() * sizeof(s_mc_message_simcall_probe_one_t);
+    received    = model_checker_->channel().receive(action_pool.data(), size);
+    xbt_assert(static_cast<size_t>(received) == size);
+  }
+
+  // Ensures that each actor sends precisely `actor.max_considered` transitions. While technically
+  // this doesn't catch the edge case where actor A sends 3 instead of 2 and actor B sends 2 instead
+  // of 3 transitions, that is ignored here since that invariant needs to be enforced on the AppSide
+  const auto expected_transitions = std::accumulate(
+      status.begin(), status.end(), 0, [](int total, const auto& actor) { return total + actor.n_transitions; });
+  xbt_assert(expected_transitions == action_pool.size(),
+             "Expected to receive %d transition(s) but was only notified of %lu by the app side", expected_transitions,
+             action_pool.size());
+
   whereto.clear();
-  for (auto const& actor : status)
-    whereto.try_emplace(actor.aid, actor.aid, actor.enabled, actor.max_considered);
+  auto action_pool_iter = std::move_iterator(action_pool.begin());
+
+  for (const auto& actor : status) {
+    xbt_assert(actor.n_transitions == 0 || actor.n_transitions == actor.max_considered,
+               "If any transitions are serialized for an actor, it must match the "
+               "total number of transitions that can be considered for the actor "
+               "(currently %d), but only %d transition(s) was/were said to be encoded",
+               actor.max_considered, actor.n_transitions);
+
+    auto actor_transitions = std::vector<std::unique_ptr<Transition>>(actor.n_transitions);
+    for (int times_considered = 0; times_considered < actor.n_transitions; times_considered++, action_pool_iter++) {
+      std::stringstream stream((*action_pool_iter).buffer.data());
+      auto transition = std::unique_ptr<Transition>(deserialize_transition(actor.aid, times_considered, stream));
+      actor_transitions[times_considered] = std::move(transition);
+    }
+
+    XBT_DEBUG("Received %d transitions for actor %ld", actor.n_transitions, actor.aid);
+    whereto.try_emplace(actor.aid, actor.aid, actor.enabled, actor.max_considered, std::move(actor_transitions));
+  }
 }
 
 void RemoteApp::check_deadlock() const
index 66f7868..f91a369 100644 (file)
@@ -18,7 +18,6 @@ State::State(const RemoteApp& remote_app) : num_(++expended_states_)
 {
   remote_app.get_actors_status(actors_to_run_);
 
-  transition_.reset(new Transition());
   /* Stateful model checking */
   if ((_sg_mc_checkpoint > 0 && (num_ % _sg_mc_checkpoint == 0)) || _sg_mc_termination) {
     system_state_ = std::make_shared<simgrid::mc::Snapshot>(num_);
@@ -32,7 +31,7 @@ std::size_t State::count_todo() const
 
 Transition* State::get_transition() const
 {
-  return transition_.get();
+  return transition_;
 }
 
 aid_t State::next_transition() const
@@ -47,16 +46,42 @@ aid_t State::next_transition() const
   }
   return -1;
 }
+
 void State::execute_next(aid_t next)
 {
-  /* This actor is ready to be executed. Prepare its execution when simcall_handle will be called on it */
-  const unsigned times_considered = actors_to_run_.at(next).do_consider();
+  // This actor is ready to be executed. Execution involves three phases:
+
+  // 1. Identify the appropriate ActorState to prepare for execution
+  // when simcall_handle will be called on it
+  auto& actor_state                        = actors_to_run_.at(next);
+  const unsigned times_considered          = actor_state.do_consider();
+  const auto* expected_executed_transition = actor_state.get_transition(times_considered);
+  xbt_assert(expected_executed_transition != nullptr,
+             "Expected a transition with %d times considered to be noted in actor %lu", times_considered, next);
 
   XBT_DEBUG("Let's run actor %ld (times_considered = %u)", next, times_considered);
 
+  // 2. Execute the actor according to the preparation above
   Transition::executed_transitions_++;
+  auto* just_executed = mc_model_checker->handle_simcall(next, times_considered, true);
+  xbt_assert(just_executed->type_ == expected_executed_transition->type_,
+             "The transition that was just executed by actor %lu, viz:\n"
+             "%s\n"
+             "is not what was purportedly scheduled to execute, which was:\n"
+             "%s\n",
+             next, just_executed->to_string().c_str(), expected_executed_transition->to_string().c_str());
+
+  // 3. Update the state with the newest information. This means recording
+  // both
+  //  1. what action was last taken from this state (viz. `executed_transition`)
+  //  2. what action actor `next` was able to take given `times_considered`
+  // The latter update is important as *more* information is potentially available
+  // about a transition AFTER it has executed.
+  transition_ = just_executed;
+
+  auto executed_transition = std::unique_ptr<Transition>(just_executed);
+  actor_state.set_transition(std::move(executed_transition), times_considered);
 
-  transition_.reset(mc_model_checker->handle_simcall(next, times_considered, true));
   mc_model_checker->wait_for_requests();
 }
 } // namespace simgrid::mc
index b6da4d5..08a4ecf 100644 (file)
@@ -17,13 +17,20 @@ namespace simgrid::mc {
 class XBT_PRIVATE State : public xbt::Extendable<State> {
   static long expended_states_; /* Count total amount of states, for stats */
 
-  /* Outgoing transition: what was the last transition that we took to leave this state? */
-  std::unique_ptr<Transition> transition_;
+  /**
+   * @brief The outgoing transition: what was the last transition that
+   * we took to leave this state?
+   *
+   * The owner of the transition is the `ActorState` instance which exists in this state,
+   * or nullptr if the state represents the root
+   */
+  Transition* transition_ = nullptr;
 
   /** Sequential state ID (used for debugging) */
   long num_ = 0;
 
-  /** State's exploration status by actor. Not all the actors are there, only the ones that are ready-to-run in this state */
+  /** State's exploration status by actor. Not all the actors are there, only the ones that are ready-to-run in this
+   * state */
   std::map<aid_t, ActorState> actors_to_run_;
 
   /** Snapshot of system state (if needed) */
@@ -43,7 +50,7 @@ public:
   void mark_todo(aid_t actor) { actors_to_run_.at(actor).mark_todo(); }
   bool is_done(aid_t actor) const { return actors_to_run_.at(actor).is_done(); }
   Transition* get_transition() const;
-  void set_transition(Transition* t) { transition_.reset(t); }
+  void set_transition(Transition* t) { transition_ = t; }
   std::map<aid_t, ActorState> const& get_actors_list() const { return actors_to_run_; }
 
   unsigned long get_actor_count() const { return actors_to_run_.size(); }
index bad0c1c..80e4c52 100644 (file)
@@ -26,6 +26,7 @@
 #include <cstdlib>
 #include <cstring>
 #include <memory>
+#include <numeric>
 #include <sys/ptrace.h>
 #include <sys/socket.h>
 #include <sys/types.h>
@@ -154,25 +155,72 @@ void AppSide::handle_finalize(const s_mc_message_int_t* msg) const
 void AppSide::handle_actors_status() const
 {
   auto const& actor_list = kernel::EngineImpl::get_instance()->get_actor_list();
-  int count              = actor_list.size();
-  XBT_DEBUG("Serialize the actors to answer ACTORS_STATUS from the checker. %d actors to go.", count);
+  const int num_actors   = actor_list.size();
+  XBT_DEBUG("Serialize the actors to answer ACTORS_STATUS from the checker. %d actors to go.", num_actors);
+
+  std::vector<s_mc_message_actors_status_one_t> status(num_actors);
+  int i                 = 0;
+  int total_transitions = 0;
 
-  struct s_mc_message_actors_status_answer_t answer {
-    MessageType::ACTORS_STATUS_REPLY, count
-  };
-  std::vector<s_mc_message_actors_status_one_t> status(count);
-  int i = 0;
   for (auto const& [aid, actor] : actor_list) {
     status[i].aid            = aid;
     status[i].enabled        = mc::actor_is_enabled(actor);
     status[i].max_considered = actor->simcall_.observer_->get_max_consider();
+    status[i].n_transitions  = mc::actor_is_enabled(actor) ? status[i].max_considered : 0;
+    total_transitions += status[i].n_transitions;
     i++;
   }
+
+  struct s_mc_message_actors_status_answer_t answer {
+    MessageType::ACTORS_STATUS_REPLY, num_actors, total_transitions
+  };
+
   xbt_assert(channel_.send(answer) == 0, "Could not send ACTORS_STATUS_REPLY msg");
   if (answer.count > 0) {
     size_t size = status.size() * sizeof(s_mc_message_actors_status_one_t);
     xbt_assert(channel_.send(status.data(), size) == 0, "Could not send ACTORS_STATUS_REPLY data");
   }
+
+  // Serialize each transition to describe what each actor is doing
+  if (total_transitions > 0) {
+    std::vector<s_mc_message_simcall_probe_one_t> probes(total_transitions);
+    auto probes_iter = probes.begin();
+
+    for (const auto& actor_status : status) {
+      if (not actor_status.enabled)
+        continue;
+
+      const auto& actor        = actor_list.at(actor_status.aid);
+      const int max_considered = actor_status.max_considered;
+
+      for (int times_considered = 0; times_considered < max_considered; times_considered++, probes_iter++) {
+        std::stringstream stream;
+        s_mc_message_simcall_probe_one_t& probe = *probes_iter;
+
+        if (actor->simcall_.observer_ != nullptr) {
+          actor->simcall_.observer_->prepare(times_considered);
+          actor->simcall_.observer_->serialize(stream);
+        } else {
+          stream << (short)mc::Transition::Type::UNKNOWN;
+        }
+
+        std::string str = stream.str();
+        xbt_assert(str.size() + 1 <= probe.buffer.size(),
+                   "The serialized transition is too large for the buffer. Please fix the code.");
+        strncpy(probe.buffer.data(), str.c_str(), probe.buffer.size() - 1);
+        probe.buffer.back() = '\0';
+      }
+      // NOTE: We do NOT need to reset `times_considered` for each actor's
+      // simcall observer here to the "original" value (i.e. the value BEFORE
+      // multiple prepare() calls were made for serialization purposes) since
+      // each SIMCALL_EXECUTE provides a `times_considered` to be used to prepare
+      // the transition before execution.
+    }
+
+    size_t size = probes.size() * sizeof(s_mc_message_simcall_probe_one_t);
+    XBT_DEBUG("Deliver ACTOR_TRANSITION_PROBE payload");
+    xbt_assert(channel_.send(probes.data(), size) == 0, "Could not send ACTOR_TRANSITION_PROBE payload");
+  }
 }
 
 #define assert_msg_size(_name_, _type_)                                                                                \
index ef234a8..60d5989 100644 (file)
@@ -26,23 +26,34 @@ Channel::~Channel()
 /** @brief Send a message; returns 0 on success or errno on failure */
 int Channel::send(const void* message, size_t size) const
 {
-  XBT_DEBUG("Send %s", to_c_str(*(MessageType*)message));
   while (::send(this->socket_, message, size, 0) == -1) {
     if (errno != EINTR) {
       XBT_ERROR("Channel::send failure: %s", strerror(errno));
       return errno;
     }
   }
+
+  if (is_valid_MessageType(*(int*)message)) {
+    XBT_DEBUG("Sending %s (%lu bytes sent)", to_c_str(*(MessageType*)message), size);
+  } else {
+    XBT_DEBUG("Sending bytes directly (from address %p) (%lu bytes sent)", message, size);
+  }
+
   return 0;
 }
 
 ssize_t Channel::receive(void* message, size_t size, bool block) const
 {
   ssize_t res = recv(this->socket_, message, size, block ? 0 : MSG_DONTWAIT);
-  if (res != -1)
-    XBT_DEBUG("Receive %s", to_c_str(*(MessageType*)message));
-  else
+  if (res != -1) {
+    if (is_valid_MessageType(*(int*)message)) {
+      XBT_DEBUG("Receive %s (requested %lu; received %lu)", to_c_str(*(MessageType*)message), size, res);
+    } else {
+      XBT_DEBUG("Receive %lu bytes", res);
+    }
+  } else {
     XBT_ERROR("Channel::receive failure: %s", strerror(errno));
+  }
   return res;
 }
 } // namespace simgrid::mc
index 2f8e34b..156e3c7 100644 (file)
@@ -29,7 +29,7 @@ XBT_DECLARE_ENUM_CLASS(MessageType, NONE, INITIAL_ADDRESSES, CONTINUE, IGNORE_HE
                        FINALIZE_REPLY);
 } // namespace simgrid::mc
 
-constexpr unsigned MC_MESSAGE_LENGTH = 512;
+constexpr unsigned MC_MESSAGE_LENGTH                 = 512;
 constexpr unsigned SIMCALL_SERIALIZATION_BUFFER_SIZE = 2048;
 
 /** Basic structure for a MC message
@@ -104,12 +104,23 @@ struct s_mc_message_restore_t {
 struct s_mc_message_actors_status_answer_t {
   simgrid::mc::MessageType type;
   int count;
+  int transition_count; // The total number of transitions sent as a payload to the checker
 };
 struct s_mc_message_actors_status_one_t { // an array of `s_mc_message_actors_status_one_t[count]` is sent right after
-                                          // after a s_mc_message_actors_status_answer_t
+                                          // after a `s_mc_message_actors_status_answer_t`
   aid_t aid;
   bool enabled;
   int max_considered;
+
+  // The total number of transitions that are serialized and associated with this actor.
+  // Enforced to be either `0` or the same as `max_considered`
+  int n_transitions;
+};
+
+// Answer from an actor to the question "what are you about to run?"
+struct s_mc_message_simcall_probe_one_t { // an array of `s_mc_message_simcall_probe_one_t[n_transitions]
+                                          // is sent right after a `s_mc_message_actors_status_one_t`
+  std::array<char, SIMCALL_SERIALIZATION_BUFFER_SIZE> buffer;
 };
 
 #endif // __cplusplus