Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Sanitize the handling of timeouts in CommImpl, ExecImpl and IoImpl
authorMartin Quinson <martin.quinson@ens-rennes.fr>
Thu, 23 Feb 2023 21:21:47 +0000 (22:21 +0100)
committerMartin Quinson <martin.quinson@ens-rennes.fr>
Thu, 23 Feb 2023 21:37:44 +0000 (22:37 +0100)
There is no need for a timeout detector in ExecImpl and IoImpl, as
wait_for injects a Synchro to that extend on need.

Use a uniq_ptr on the CommImpl timeout detectors, so that the memory
is managed automatically. As it used to be for ExecImpl.

This kills the last occurrence of surf in the code. For real this time.

src/kernel/activity/ActivityImpl.cpp
src/kernel/activity/CommImpl.cpp
src/kernel/activity/CommImpl.hpp
src/kernel/activity/ExecImpl.cpp
src/kernel/activity/ExecImpl.hpp
src/kernel/activity/IoImpl.cpp
src/kernel/activity/IoImpl.hpp

index 5d7c922..f425dc8 100644 (file)
@@ -115,9 +115,9 @@ void ActivityImpl::wait_for(actor::ActorImpl* issuer, double timeout)
       sleep_action->set_activity(comm);
 
       if (issuer == comm->src_actor_)
-        comm->src_timeout_ = sleep_action;
+        comm->src_timeout_.reset(sleep_action);
       else
-        comm->dst_timeout_ = sleep_action;
+        comm->dst_timeout_.reset(sleep_action);
     } else {
       SynchroImplPtr synchro(new SynchroImpl([this, issuer]() {
         this->unregister_simcall(&issuer->simcall_);
@@ -236,4 +236,5 @@ void intrusive_ptr_release(ActivityImpl* activity)
     delete activity;
   }
 }
+
 } // namespace simgrid::kernel::activity
index ee24f50..0d81366 100644 (file)
@@ -96,7 +96,7 @@ CommImpl::~CommImpl()
 {
   XBT_DEBUG("Really free communication %p in state %s (detached = %d)", this, get_state_str(), detached_);
 
-  cleanup_surf();
+  clean_action();
 
   if (detached_ && get_state() != State::DONE) {
     /* the communication has failed and was detached:
@@ -397,22 +397,6 @@ void CommImpl::cancel()
   }
 }
 
-/** @brief This is part of the cleanup process, probably an internal command */
-void CommImpl::cleanup_surf()
-{
-  clean_action();
-
-  if (src_timeout_) {
-    src_timeout_->unref();
-    src_timeout_ = nullptr;
-  }
-
-  if (dst_timeout_) {
-    dst_timeout_->unref();
-    dst_timeout_ = nullptr;
-  }
-}
-
 void CommImpl::post()
 {
   on_completion(*this);
@@ -438,7 +422,7 @@ void CommImpl::post()
             src_actor_.get(), dst_actor_.get(), detached_);
 
   /* destroy the model actions associated with the communication activity */
-  cleanup_surf();
+  clean_action();
 
   /* Answer all simcalls associated with the synchro */
   finish();
index c2758ab..7540c4a 100644 (file)
@@ -14,9 +14,10 @@ namespace simgrid::kernel::activity {
 
 enum class CommImplType { SEND, RECEIVE };
 
+using timeout_action_type = std::unique_ptr<resource::Action, std::function<void(resource::Action*)>>;
+
 class XBT_PUBLIC CommImpl : public ActivityImpl_T<CommImpl> {
   ~CommImpl() override;
-  void cleanup_surf();
 
   static std::function<void(CommImpl*, void*, size_t)> copy_data_callback_;
 
@@ -80,8 +81,9 @@ expectations of the other side, too. See  */
   std::function<void(CommImpl*, void*, size_t)> copy_data_fun;
 
   /* Model actions */
-  resource::Action* src_timeout_ = nullptr; /* represents the timeout set by the sender */
-  resource::Action* dst_timeout_ = nullptr; /* represents the timeout set by the receiver */
+  timeout_action_type src_timeout_{nullptr, [](resource::Action* a) { a->unref(); }}; /* timeout set by the sender */
+  timeout_action_type dst_timeout_{nullptr, [](resource::Action* a) { a->unref(); }}; /* timeout set by the receiver */
+
   actor::ActorImplPtr src_actor_ = nullptr;
   actor::ActorImplPtr dst_actor_ = nullptr;
 
index 88432f6..d7368f6 100644 (file)
@@ -41,15 +41,6 @@ ExecImpl& ExecImpl::set_hosts(const std::vector<s4u::Host*>& hosts)
   return *this;
 }
 
-ExecImpl& ExecImpl::set_timeout(double timeout)
-{
-  if (timeout >= 0 && not MC_is_active() && not MC_record_replay_is_active()) {
-    timeout_detector_.reset(get_host()->get_cpu()->sleep(timeout));
-    timeout_detector_->set_activity(this);
-  }
-  return *this;
-}
-
 ExecImpl& ExecImpl::set_flops_amount(double flops_amount)
 {
   flops_amounts_.assign(1, flops_amount);
@@ -153,16 +144,11 @@ void ExecImpl::post()
   } else if (model_action_->get_state() == resource::Action::State::FAILED) {
     /* If all the hosts are running the synchro didn't fail, then the synchro was canceled */
     set_state(State::CANCELED);
-  } else if (timeout_detector_ && timeout_detector_->get_state() == resource::Action::State::FINISHED &&
-             model_action_->get_remains() > 0.0) {
-    model_action_->set_state(resource::Action::State::FAILED);
-    set_state(State::TIMEOUT);
   } else {
     set_state(State::DONE);
   }
 
   clean_action();
-  timeout_detector_.reset();
   if (get_actor() != nullptr) {
     get_actor()->activities_.erase(this);
   }
index d75bf41..d15280c 100644 (file)
@@ -14,8 +14,6 @@
 namespace simgrid::kernel::activity {
 
 class XBT_PUBLIC ExecImpl : public ActivityImpl_T<ExecImpl> {
-  std::unique_ptr<resource::Action, std::function<void(resource::Action*)>> timeout_detector_{
-      nullptr, [](resource::Action* a) { a->unref(); }};
   double sharing_penalty_             = 1.0;
   double bound_                       = 0.0;
   std::vector<double> flops_amounts_;
@@ -26,7 +24,6 @@ class XBT_PUBLIC ExecImpl : public ActivityImpl_T<ExecImpl> {
 public:
   ExecImpl();
 
-  ExecImpl& set_timeout(double timeout) override;
   ExecImpl& set_bound(double bound);
   ExecImpl& set_sharing_penalty(double sharing_penalty);
   ExecImpl& update_sharing_penalty(double sharing_penalty);
index d270c4f..cd57ece 100644 (file)
@@ -38,14 +38,6 @@ IoImpl& IoImpl::update_sharing_penalty(double sharing_penalty)
   return *this;
 }
 
-IoImpl& IoImpl::set_timeout(double timeout)
-{
-  const s4u::Host* host = get_disk()->get_host();
-  timeout_detector_     = host->get_cpu()->sleep(timeout);
-  timeout_detector_->set_activity(this);
-  return *this;
-}
-
 IoImpl& IoImpl::set_type(s4u::Io::OpType type)
 {
   type_ = type;
@@ -116,19 +108,11 @@ void IoImpl::post()
       set_state(State::FAILED);
     else
       set_state(State::CANCELED);
-  } else if (timeout_detector_ && timeout_detector_->get_state() == resource::Action::State::FINISHED &&
-             model_action_->get_remains() > 0.0) {
-    model_action_->set_state(resource::Action::State::FAILED);
-    set_state(State::TIMEOUT);
   } else {
     set_state(State::DONE);
   }
 
   clean_action();
-  if (timeout_detector_) {
-    timeout_detector_->unref();
-    timeout_detector_ = nullptr;
-  }
 
   /* Answer all simcalls associated with the synchro */
   finish();
index 0e3dd9f..22e46a3 100644 (file)
@@ -20,13 +20,11 @@ class XBT_PUBLIC IoImpl : public ActivityImpl_T<IoImpl> {
   sg_size_t size_                     = 0;
   s4u::Io::OpType type_               = s4u::Io::OpType::READ;
   sg_size_t performed_ioops_          = 0;
-  resource::Action* timeout_detector_ = nullptr;
 
 public:
   IoImpl();
 
   IoImpl& set_sharing_penalty(double sharing_penalty);
-  IoImpl& set_timeout(double timeout) override;
   IoImpl& set_size(sg_size_t size);
   IoImpl& set_type(s4u::Io::OpType type);
   IoImpl& set_disk(resource::DiskImpl* disk);