AND — Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'dev' into 'master'
author: Martin Quinson <martin.quinson@ens-rennes.fr>
Tue, 1 Mar 2022 18:35:53 +0000 (18:35 +0000)
committer: Martin Quinson <martin.quinson@ens-rennes.fr>
Tue, 1 Mar 2022 18:35:53 +0000 (18:35 +0000)
Minimal changes to support hosts on/off

See merge request simgrid/simgrid!77

src/bindings/python/simgrid_python.cpp
src/kernel/activity/CommImpl.cpp
src/kernel/activity/MailboxImpl.cpp
src/kernel/activity/MailboxImpl.hpp
src/kernel/actor/ActorImpl.cpp
src/kernel/actor/ActorImpl.hpp
teshsuite/s4u/host-on-off-actors/host-on-off-actors.tesh

diff --git a/src/bindings/python/simgrid_python.cpp b/src/bindings/python/simgrid_python.cpp
index 1930f15..96e067b 100644 (file)
@@ -89,6 +89,10 @@ PYBIND11_MODULE(simgrid, m)
 
   py::register_exception<simgrid::NetworkFailureException>(m, "NetworkFailureException");
   py::register_exception<simgrid::TimeoutException>(m, "TimeoutException");
+  py::register_exception<simgrid::HostFailureException>(m, "HostFailureException");
+  py::register_exception<simgrid::StorageFailureException>(m, "StorageFailureException");
+  py::register_exception<simgrid::VmFailureException>(m, "VmFailureException");
+  py::register_exception<simgrid::CancelException>(m, "CancelException");
 
   /* this_actor namespace */
   m.def_submodule("this_actor", "Bindings of the s4u::this_actor namespace. See the C++ documentation for details.")
@@ -123,8 +127,8 @@ PYBIND11_MODULE(simgrid, m)
             py::function fun = py::reinterpret_borrow<py::function>(cb);
             fun.inc_ref(); // FIXME: why is this needed for tests like actor-kill and actor-lifetime?
             simgrid::s4u::this_actor::on_exit([fun](bool /*failed*/) {
+              py::gil_scoped_acquire py_context; // need a new context for callback
               try {
-                py::gil_scoped_acquire py_context; // need a new context for callback
                 fun();
               } catch (const py::error_already_set& e) {
                 xbt_die("Error while executing the on_exit lambda: %s", e.what());
@@ -205,8 +209,8 @@ PYBIND11_MODULE(simgrid, m)
           "register_actor",
           [](Engine* e, const std::string& name, py::object fun_or_class) {
             e->register_actor(name, [fun_or_class](std::vector<std::string> args) {
+              py::gil_scoped_acquire py_context;
               try {
-                py::gil_scoped_acquire py_context;
                 /* Convert the std::vector into a py::tuple */
                 py::tuple params(args.size() - 1);
                 for (size_t i = 1; i < args.size(); i++)
@@ -439,8 +443,8 @@ PYBIND11_MODULE(simgrid, m)
           [](py::object cb) {
             Host::on_creation_cb([cb](Host& h) {
               py::function fun = py::reinterpret_borrow<py::function>(cb);
+              py::gil_scoped_acquire py_context; // need a new context for callback
               try {
-                py::gil_scoped_acquire py_context; // need a new context for callback
                 fun(&h);
               } catch (const py::error_already_set& e) {
                 xbt_die("Error while executing the on_creation lambda : %s", e.what());
@@ -739,8 +743,8 @@ PYBIND11_MODULE(simgrid, m)
             fun.inc_ref();  // FIXME: why is this needed for tests like exec-async, exec-dvfs and exec-remote?
             args.inc_ref(); // FIXME: why is this needed for tests like actor-migrate?
             return simgrid::s4u::Actor::create(name, h, [fun, args]() {
+              py::gil_scoped_acquire py_context;
               try {
-                py::gil_scoped_acquire py_context;
                 fun(*args);
               } catch (const py::error_already_set& ex) {
                 if (ex.matches(pyForcefulKillEx)) {
diff --git a/src/kernel/activity/CommImpl.cpp b/src/kernel/activity/CommImpl.cpp
index 9af07ce..95d8f7b 100644 (file)
@@ -133,7 +133,10 @@ CommImpl* CommImpl::start()
   /* If both the sender and the receiver are already there, start the communication */
   if (get_state() == State::READY) {
     from_ = from_ != nullptr ? from_ : src_actor_->get_host();
+    xbt_assert(from_->is_on());
     to_   = to_ != nullptr ? to_ : dst_actor_->get_host();
+    xbt_assert(to_->is_on());
+
     /* Getting the network_model from the origin host
      * Valid while we have a single network model, otherwise we would need to change this function to first get the
      * routes and later create the respective surf actions */
@@ -436,14 +439,17 @@ void CommImpl::post()
     set_state(State::SRC_TIMEOUT);
   else if (dst_timeout_ && dst_timeout_->get_state() == resource::Action::State::FINISHED)
     set_state(State::DST_TIMEOUT);
-  else if (src_timeout_ && src_timeout_->get_state() == resource::Action::State::FAILED)
+  else if ((from_ && not from_->is_on()) || (src_timeout_ && src_timeout_->get_state() == resource::Action::State::FAILED))
     set_state(State::SRC_HOST_FAILURE);
-  else if (dst_timeout_ && dst_timeout_->get_state() == resource::Action::State::FAILED)
+  else if ((to_ && not to_->is_on()) || (dst_timeout_ && dst_timeout_->get_state() == resource::Action::State::FAILED))
     set_state(State::DST_HOST_FAILURE);
   else if (surf_action_ && surf_action_->get_state() == resource::Action::State::FAILED) {
     set_state(State::LINK_FAILURE);
-  } else
+  } else if (get_state() == State::RUNNING) {
+    xbt_assert(from_ && from_->is_on());
+    xbt_assert(to_ && to_->is_on());
     set_state(State::DONE);
+  }
 
   XBT_DEBUG("CommImpl::post(): comm %p, state %s, src_proc %p, dst_proc %p, detached: %d", this, get_state_str(),
             src_actor_.get(), dst_actor_.get(), detached_);
diff --git a/src/kernel/activity/MailboxImpl.cpp b/src/kernel/activity/MailboxImpl.cpp
index a046c69..3cb2e1c 100644 (file)
@@ -20,11 +20,22 @@ namespace activity {
 
 unsigned MailboxImpl::next_id_ = 0;
 
+MailboxImpl::~MailboxImpl()
+{
+  clear();               // cancel or drop every communication still held by this mailbox
+  set_receiver(nullptr); // unregister this mailbox from its permanent receiver's list, if any
+}
+
 /** @brief set the receiver of the mailbox to allow eager sends
  *  @param actor The receiving dude
  */
 void MailboxImpl::set_receiver(s4u::ActorPtr actor)
 {
+  if (this->permanent_receiver_) {
+    std::vector<MailboxImpl*>& mboxes = this->permanent_receiver_->mailboxes;
+    mboxes.erase(std::remove(mboxes.begin(), mboxes.end(), this), mboxes.end());
+  }
+
   if (actor != nullptr)
     this->permanent_receiver_ = actor->get_impl();
   else
@@ -56,6 +67,27 @@ void MailboxImpl::remove(const CommImplPtr& comm)
   xbt_die("Comm %p not found in mailbox %s", comm.get(), this->get_cname());
 }
 
+/** @brief Empties this mailbox, cancelling its done comms and its WAITING non-detached comms
+ *  (all of which end up in State::DST_HOST_FAILURE); other queued comms are just dropped. */
+void MailboxImpl::clear()
+{
+  for (auto comm : done_comm_queue_) {
+    comm->cancel();
+    comm->set_state(State::DST_HOST_FAILURE);
+  }
+  done_comm_queue_.clear();
+
+  // CommImpl::cancel() should remove the comm from the mailbox; otherwise it is popped on the next pass (no longer WAITING)
+  while (not comm_queue_.empty()) {
+    auto comm = comm_queue_.back();
+    if (comm->get_state() == State::WAITING && not comm->detached()) {
+      comm->cancel();
+      comm->set_state(State::DST_HOST_FAILURE);
+    } else
+      comm_queue_.pop_back();
+  }
+}
+
 CommImplPtr MailboxImpl::iprobe(int type, bool (*match_fun)(void*, void*, CommImpl*), void* data)
 {
   XBT_DEBUG("iprobe from %p %p", this, &comm_queue_);
diff --git a/src/kernel/activity/MailboxImpl.hpp b/src/kernel/activity/MailboxImpl.hpp
index b9c1214..541283e 100644 (file)
@@ -43,6 +43,8 @@ public:
   /** @brief Public interface */
   unsigned get_id() const { return id_; }
 
+  ~MailboxImpl();
+
   const s4u::Mailbox* get_iface() const { return &piface_; }
   s4u::Mailbox* get_iface() { return &piface_; }
 
@@ -52,6 +54,7 @@ public:
   void push(CommImplPtr comm);
   void push_done(CommImplPtr done_comm) { done_comm_queue_.push_back(done_comm); }
   void remove(const CommImplPtr& comm);
+  void clear();
   CommImplPtr iprobe(int type, bool (*match_fun)(void*, void*, CommImpl*), void* data);
   CommImplPtr find_matching_comm(CommImplType type, bool (*match_fun)(void*, void*, CommImpl*), void* this_user_data,
                                  const CommImplPtr& my_synchro, bool done, bool remove_matching);
diff --git a/src/kernel/actor/ActorImpl.cpp b/src/kernel/actor/ActorImpl.cpp
index efe1fa4..49c758a 100644 (file)
@@ -169,6 +169,9 @@ void ActorImpl::cleanup()
     activity->cancel();
   activities_.clear();
 
+  while (not mailboxes.empty())
+    mailboxes.back()->set_receiver(nullptr);
+
   XBT_DEBUG("%s@%s(%ld) should not run anymore", get_cname(), get_host()->get_cname(), get_pid());
 
   if (EngineImpl::get_instance()->is_maestro(this)) /* Do not cleanup maestro */
@@ -212,6 +215,9 @@ void ActorImpl::exit()
     activity->cancel();
   activities_.clear();
 
+  while (not mailboxes.empty())
+    mailboxes.back()->set_receiver(nullptr);
+
   // Forcefully kill the actor if its host is turned off. Not a HostFailureException because you should not survive that
   this->throw_exception(std::make_exception_ptr(ForcefulKillException(host_->is_on() ? "exited" : "host failed")));
 }
diff --git a/src/kernel/actor/ActorImpl.hpp b/src/kernel/actor/ActorImpl.hpp
index a1672be..7385792 100644 (file)
@@ -28,6 +28,9 @@ class XBT_PUBLIC ActorImpl : public xbt::PropertyHolder {
   bool auto_restart_ = false;
   unsigned stacksize_; // set to default value in constructor
 
+  std::vector<activity::MailboxImpl*> mailboxes;
+  friend activity::MailboxImpl;
+
 public:
   xbt::string name_;
   ActorImpl(xbt::string name, s4u::Host* host);
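diff --git a/teshsuite/s4u/host-on-off-actors/host-on-off-actors.tesh b/teshsuite/s4u/host-on-off-actors/host-on-off-actors.tesh
index 8dee4ed..3eb77ab 100644 (file)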
@@ -27,7 +27,7 @@ $ ./host-on-off-actors ${platfdir}/small_platform.xml 4 --log=no_loc
 > [Tremblay:test_launcher:(1) 20.000000] [s4u_test/INFO]   Turn Jupiter off
 > [Tremblay:test_launcher:(1) 20.000000] [s4u_test/INFO] Test 4 is ok.  (number of actors : 2, it should be 1 or 2 if RX has not been satisfied). An exception is raised when we turn off a node that has an actor sleeping
 > [Tremblay:test_launcher:(1) 20.000000] [s4u_test/INFO]   Test done. See you!
-> [Tremblay:commRX:(2) 25.033047] [s4u_test/INFO]   Receive message: COMM
+> [Tremblay:commRX:(2) 25.033047] [s4u_test/INFO]   Receive message: TRANSFER_FAILURE
 > [Tremblay:commRX:(2) 25.033047] [s4u_test/INFO]   RX Done
 > [25.033047] [s4u_test/INFO] Simulation time 25.033