Serialize pending transitions when responding to `ACTORS_STATUS` messages on the application side
See merge request simgrid/simgrid!127
constexpr std::array<const char*, _XBT_COUNT_ARGS(__VA_ARGS__)> names{{_XBT_STRINGIFY_ARGS(__VA_ARGS__)}}; \
return names.at(static_cast<int>(value)); \
} \
+ static constexpr bool is_valid_##EnumType(int raw_value) \
+ { \
+ return raw_value >= 0 && raw_value < _XBT_COUNT_ARGS(__VA_ARGS__); \
+ } \
enum class EnumType { __VA_ARGS__ } /* defined here to handle trailing semicolon */
namespace simgrid {
list.erase(list.iterator_to(elem));
}
-}
-}
+} // namespace xbt
+} // namespace simgrid
#endif
#include "src/kernel/activity/CommImpl.hpp"
#include "src/mc/remote/RemotePtr.hpp"
+#include <exception>
+#include <vector>
+
namespace simgrid::mc {
/* On every state, each actor has an entry of the following type.
- * This represents both the actor and its transition because
- * an actor cannot have more than one enabled transition at a given time.
+ * This usually represents both the actor and its transition because
+ * most of the time an actor cannot have more than one enabled transition
+ * at a given time. However, certain transitions have multiple "paths"
+ * that can be followed, which means that a given actor may be able
+ * to do more than one thing at a time.
+ *
+ * Formally, at this state multiple transitions would exist, all of
+ * which happen to be executed by the same actor. This distinction
+ * is important in cases where an actor has more than one enabled
+ * transition in a given state.
*/
class ActorState {
+
+ /**
+ * @brief The transitions that the actor is allowed to execute from this
+ * state, viz. those that are enabled for this actor
+ *
+ * Most actors can take only a single action from any given state.
+ * However, when an actor executes a transition with multiple
+ * possible variations (e.g. an MC_Random() [see class RandomTransition
+ * for more details]), multiple enabled actions are defined.
+ *
+ * @invariant For an actor with multiple possible paths of execution,
+ * all such transitions are stored, and they are ordered such that
+ * `pending_transitions_[i]` represents the variation of the
+ * transition with `times_considered = i`.
+ *
+ * TODO: If only a subset of transitions of an actor that can
+ * take multiple transitions in some state are truly enabled,
+ * we would instead need to map `times_considered` to a transition,
+ * as the map is currently implicit in the ordering of the transitions
+ * in the vector
+ *
+ * TODO: If a single transition is taken at a time in a concurrent system,
+ * then nearly all of the transitions in a state `s'` reached after taking
+ * an action `t` from state `s` (i.e. s -- t --> s') are the same,
+ * except for the new transition of the actor which just executed t.
+ * This means there may be a way to store the list once and apply differences
+ * rather than repeating elements frequently.
+ */
+ std::vector<std::unique_ptr<Transition>> pending_transitions_;
+
/* Possible exploration status of an actor transition in a state.
- * Either the checker did not consider the transition, or it was considered and still to do, or considered and done.
+ * Either the checker did not consider the transition, or it was considered and still to do, or considered and
+ * done.
*/
enum class InterleavingType {
/** This actor transition is not considered by the checker (yet?) */
bool enabled_;
public:
- ActorState(aid_t aid, bool enabled, unsigned int max_consider)
- : aid_(aid), max_consider_(max_consider), enabled_(enabled)
+ ActorState(aid_t aid, bool enabled, unsigned int max_consider) : ActorState(aid, enabled, max_consider, {}) {}
+
+ ActorState(aid_t aid, bool enabled, unsigned int max_consider, std::vector<std::unique_ptr<Transition>> transitions)
+ : aid_(aid), max_consider_(max_consider), enabled_(enabled), pending_transitions_(std::move(transitions))
{
}
this->times_considered_ = 0;
}
void set_done() { this->state_ = InterleavingType::done; }
+
+ inline Transition* get_transition(unsigned times_considered)
+ {
+ xbt_assert(times_considered < this->pending_transitions_.size(),
+ "Actor %lu does not have a state available transition with `times_considered = %d`,\n"
+ "yet one was asked for",
+ aid_, times_considered);
+ return this->pending_transitions_[times_considered].get();
+ }
+
+ inline void set_transition(std::unique_ptr<Transition> t, unsigned times_considered)
+ {
+ xbt_assert(times_considered < this->pending_transitions_.size(),
+ "Actor %lu does not have a state available transition with `times_considered = %d`, "
+ "yet one was attempted to be set",
+ aid_, times_considered);
+ this->pending_transitions_[times_considered] = std::move(t);
+ }
};
} // namespace simgrid::mc
#include "xbt/log.h"
#include "xbt/system_error.hpp"
+#include <algorithm>
#include <array>
#include <boost/tokenizer.hpp>
#include <memory>
+#include <numeric>
#include <string>
#include <fcntl.h>
void RemoteApp::get_actors_status(std::map<aid_t, ActorState>& whereto) const
{
+ // The messaging happens as follows:
+ //
+ // CheckerSide AppSide
+ // send ACTORS_STATUS ---->
+ // <----- send ACTORS_STATUS_REPLY
+ // <----- send `N` `s_mc_message_actors_status_one_t` structs
+ // <----- send `M` `s_mc_message_simcall_probe_one_t` structs
s_mc_message_t msg;
memset(&msg, 0, sizeof msg);
msg.type = simgrid::mc::MessageType::ACTORS_STATUS;
xbt_assert(static_cast<size_t>(received) == size);
}
+ std::vector<s_mc_message_simcall_probe_one_t> action_pool(answer.transition_count);
+ if (answer.transition_count > 0) {
+ size_t size = action_pool.size() * sizeof(s_mc_message_simcall_probe_one_t);
+ received = model_checker_->channel().receive(action_pool.data(), size);
+ xbt_assert(static_cast<size_t>(received) == size);
+ }
+
+ // Ensures that the number of transitions received matches the sum of every actor's `n_transitions`.
+ // While technically this doesn't catch the edge case where actor A sends 3 instead of 2 and actor B
+ // sends 2 instead of 3 transitions, that is ignored here since that invariant needs to be enforced
+ // on the AppSide
+ const auto expected_transitions = std::accumulate(
+ status.begin(), status.end(), 0, [](int total, const auto& actor) { return total + actor.n_transitions; });
+ xbt_assert(expected_transitions == action_pool.size(),
+ "Expected to receive %d transition(s) but was only notified of %lu by the app side", expected_transitions,
+ action_pool.size());
+
whereto.clear();
- for (auto const& actor : status)
- whereto.try_emplace(actor.aid, actor.aid, actor.enabled, actor.max_considered);
+ auto action_pool_iter = std::move_iterator(action_pool.begin());
+
+ for (const auto& actor : status) {
+ xbt_assert(actor.n_transitions == 0 || actor.n_transitions == actor.max_considered,
+ "If any transitions are serialized for an actor, it must match the "
+ "total number of transitions that can be considered for the actor "
+ "(currently %d), but only %d transition(s) was/were said to be encoded",
+ actor.max_considered, actor.n_transitions);
+
+ auto actor_transitions = std::vector<std::unique_ptr<Transition>>(actor.n_transitions);
+ for (int times_considered = 0; times_considered < actor.n_transitions; times_considered++, action_pool_iter++) {
+ std::stringstream stream((*action_pool_iter).buffer.data());
+ auto transition = std::unique_ptr<Transition>(deserialize_transition(actor.aid, times_considered, stream));
+ actor_transitions[times_considered] = std::move(transition);
+ }
+
+ XBT_DEBUG("Received %d transitions for actor %ld", actor.n_transitions, actor.aid);
+ whereto.try_emplace(actor.aid, actor.aid, actor.enabled, actor.max_considered, std::move(actor_transitions));
+ }
}
void RemoteApp::check_deadlock() const
{
remote_app.get_actors_status(actors_to_run_);
- transition_.reset(new Transition());
/* Stateful model checking */
if ((_sg_mc_checkpoint > 0 && (num_ % _sg_mc_checkpoint == 0)) || _sg_mc_termination) {
system_state_ = std::make_shared<simgrid::mc::Snapshot>(num_);
Transition* State::get_transition() const
{
- return transition_.get();
+ return transition_;
}
aid_t State::next_transition() const
}
return -1;
}
+
void State::execute_next(aid_t next)
{
- /* This actor is ready to be executed. Prepare its execution when simcall_handle will be called on it */
- const unsigned times_considered = actors_to_run_.at(next).do_consider();
+ // This actor is ready to be executed. Execution involves three phases:
+
+ // 1. Identify the appropriate ActorState to prepare for execution
+ // when simcall_handle will be called on it
+ auto& actor_state = actors_to_run_.at(next);
+ const unsigned times_considered = actor_state.do_consider();
+ const auto* expected_executed_transition = actor_state.get_transition(times_considered);
+ xbt_assert(expected_executed_transition != nullptr,
+ "Expected a transition with %d times considered to be noted in actor %lu", times_considered, next);
XBT_DEBUG("Let's run actor %ld (times_considered = %u)", next, times_considered);
+ // 2. Execute the actor according to the preparation above
Transition::executed_transitions_++;
+ auto* just_executed = mc_model_checker->handle_simcall(next, times_considered, true);
+ xbt_assert(just_executed->type_ == expected_executed_transition->type_,
+ "The transition that was just executed by actor %lu, viz:\n"
+ "%s\n"
+ "is not what was purportedly scheduled to execute, which was:\n"
+ "%s\n",
+ next, just_executed->to_string().c_str(), expected_executed_transition->to_string().c_str());
+
+ // 3. Update the state with the newest information. This means recording
+ // both
+ // 1. what action was last taken from this state (viz. `executed_transition`)
+ // 2. what action actor `next` was able to take given `times_considered`
+ // The latter update is important as *more* information is potentially available
+ // about a transition AFTER it has executed.
+ transition_ = just_executed;
+
+ auto executed_transition = std::unique_ptr<Transition>(just_executed);
+ actor_state.set_transition(std::move(executed_transition), times_considered);
- transition_.reset(mc_model_checker->handle_simcall(next, times_considered, true));
mc_model_checker->wait_for_requests();
}
} // namespace simgrid::mc
class XBT_PRIVATE State : public xbt::Extendable<State> {
static long expended_states_; /* Count total amount of states, for stats */
- /* Outgoing transition: what was the last transition that we took to leave this state? */
- std::unique_ptr<Transition> transition_;
+ /**
+ * @brief The outgoing transition: what was the last transition that
+ * we took to leave this state?
+ *
+ * The transition is owned by the `ActorState` instance which exists in this state.
+ * This pointer is nullptr if the state represents the root.
+ */
+ Transition* transition_ = nullptr;
/** Sequential state ID (used for debugging) */
long num_ = 0;
- /** State's exploration status by actor. Not all the actors are there, only the ones that are ready-to-run in this state */
+ /** State's exploration status by actor. Not all the actors are there, only the ones that are ready-to-run in this
+ * state */
std::map<aid_t, ActorState> actors_to_run_;
/** Snapshot of system state (if needed) */
void mark_todo(aid_t actor) { actors_to_run_.at(actor).mark_todo(); }
bool is_done(aid_t actor) const { return actors_to_run_.at(actor).is_done(); }
Transition* get_transition() const;
- void set_transition(Transition* t) { transition_.reset(t); }
+ void set_transition(Transition* t) { transition_ = t; }
std::map<aid_t, ActorState> const& get_actors_list() const { return actors_to_run_; }
unsigned long get_actor_count() const { return actors_to_run_.size(); }
#include <cstdlib>
#include <cstring>
#include <memory>
+#include <numeric>
#include <sys/ptrace.h>
#include <sys/socket.h>
#include <sys/types.h>
void AppSide::handle_actors_status() const
{
auto const& actor_list = kernel::EngineImpl::get_instance()->get_actor_list();
- int count = actor_list.size();
- XBT_DEBUG("Serialize the actors to answer ACTORS_STATUS from the checker. %d actors to go.", count);
+ const int num_actors = actor_list.size();
+ XBT_DEBUG("Serialize the actors to answer ACTORS_STATUS from the checker. %d actors to go.", num_actors);
+
+ std::vector<s_mc_message_actors_status_one_t> status(num_actors);
+ int i = 0;
+ int total_transitions = 0;
- struct s_mc_message_actors_status_answer_t answer {
- MessageType::ACTORS_STATUS_REPLY, count
- };
- std::vector<s_mc_message_actors_status_one_t> status(count);
- int i = 0;
for (auto const& [aid, actor] : actor_list) {
status[i].aid = aid;
status[i].enabled = mc::actor_is_enabled(actor);
status[i].max_considered = actor->simcall_.observer_->get_max_consider();
+ status[i].n_transitions = mc::actor_is_enabled(actor) ? status[i].max_considered : 0;
+ total_transitions += status[i].n_transitions;
i++;
}
+
+ struct s_mc_message_actors_status_answer_t answer {
+ MessageType::ACTORS_STATUS_REPLY, num_actors, total_transitions
+ };
+
xbt_assert(channel_.send(answer) == 0, "Could not send ACTORS_STATUS_REPLY msg");
if (answer.count > 0) {
size_t size = status.size() * sizeof(s_mc_message_actors_status_one_t);
xbt_assert(channel_.send(status.data(), size) == 0, "Could not send ACTORS_STATUS_REPLY data");
}
+
+ // Serialize each transition to describe what each actor is doing
+ if (total_transitions > 0) {
+ std::vector<s_mc_message_simcall_probe_one_t> probes(total_transitions);
+ auto probes_iter = probes.begin();
+
+ for (const auto& actor_status : status) {
+ if (not actor_status.enabled)
+ continue;
+
+ const auto& actor = actor_list.at(actor_status.aid);
+ const int max_considered = actor_status.max_considered;
+
+ for (int times_considered = 0; times_considered < max_considered; times_considered++, probes_iter++) {
+ std::stringstream stream;
+ s_mc_message_simcall_probe_one_t& probe = *probes_iter;
+
+ if (actor->simcall_.observer_ != nullptr) {
+ actor->simcall_.observer_->prepare(times_considered);
+ actor->simcall_.observer_->serialize(stream);
+ } else {
+ stream << (short)mc::Transition::Type::UNKNOWN;
+ }
+
+ std::string str = stream.str();
+ xbt_assert(str.size() + 1 <= probe.buffer.size(),
+ "The serialized transition is too large for the buffer. Please fix the code.");
+ strncpy(probe.buffer.data(), str.c_str(), probe.buffer.size() - 1);
+ probe.buffer.back() = '\0';
+ }
+ // NOTE: We do NOT need to reset `times_considered` for each actor's
+ // simcall observer here to the "original" value (i.e. the value BEFORE
+ // multiple prepare() calls were made for serialization purposes) since
+ // each SIMCALL_EXECUTE provides a `times_considered` to be used to prepare
+ // the transition before execution.
+ }
+
+ size_t size = probes.size() * sizeof(s_mc_message_simcall_probe_one_t);
+ XBT_DEBUG("Deliver ACTOR_TRANSITION_PROBE payload");
+ xbt_assert(channel_.send(probes.data(), size) == 0, "Could not send ACTOR_TRANSITION_PROBE payload");
+ }
}
#define assert_msg_size(_name_, _type_) \
/** @brief Send a message; returns 0 on success or errno on failure */
int Channel::send(const void* message, size_t size) const
{
- XBT_DEBUG("Send %s", to_c_str(*(MessageType*)message));
while (::send(this->socket_, message, size, 0) == -1) {
if (errno != EINTR) {
XBT_ERROR("Channel::send failure: %s", strerror(errno));
return errno;
}
}
+
+ if (is_valid_MessageType(*(int*)message)) {
+ XBT_DEBUG("Sending %s (%lu bytes sent)", to_c_str(*(MessageType*)message), size);
+ } else {
+ XBT_DEBUG("Sending bytes directly (from address %p) (%lu bytes sent)", message, size);
+ }
+
return 0;
}
ssize_t Channel::receive(void* message, size_t size, bool block) const
{
ssize_t res = recv(this->socket_, message, size, block ? 0 : MSG_DONTWAIT);
- if (res != -1)
- XBT_DEBUG("Receive %s", to_c_str(*(MessageType*)message));
- else
+ if (res != -1) {
+ if (is_valid_MessageType(*(int*)message)) {
+ XBT_DEBUG("Receive %s (requested %lu; received %lu)", to_c_str(*(MessageType*)message), size, res);
+ } else {
+ XBT_DEBUG("Receive %lu bytes", res);
+ }
+ } else {
XBT_ERROR("Channel::receive failure: %s", strerror(errno));
+ }
return res;
}
} // namespace simgrid::mc
FINALIZE_REPLY);
} // namespace simgrid::mc
-constexpr unsigned MC_MESSAGE_LENGTH = 512;
+constexpr unsigned MC_MESSAGE_LENGTH = 512;
constexpr unsigned SIMCALL_SERIALIZATION_BUFFER_SIZE = 2048;
/** Basic structure for a MC message
struct s_mc_message_actors_status_answer_t {
simgrid::mc::MessageType type;
int count;
+ int transition_count; // The total number of transitions sent as a payload to the checker
};
struct s_mc_message_actors_status_one_t { // an array of `s_mc_message_actors_status_one_t[count]` is sent right after
- // after a s_mc_message_actors_status_answer_t
+ // a `s_mc_message_actors_status_answer_t`
aid_t aid;
bool enabled;
int max_considered;
+
+ // The total number of transitions that are serialized and associated with this actor.
+ // Enforced to be either `0` or the same as `max_considered`
+ int n_transitions;
+};
+
+// Answer from an actor to the question "what are you about to run?"
+struct s_mc_message_simcall_probe_one_t { // an array of `s_mc_message_simcall_probe_one_t[n_transitions]`
+ // is sent right after a `s_mc_message_actors_status_one_t`
+ std::array<char, SIMCALL_SERIALIZATION_BUFFER_SIZE> buffer;
};
#endif // __cplusplus