Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
cleanups and refactoring in s4u::Task
authorFred Suter <suterf@ornl.gov>
Wed, 21 Jun 2023 16:57:36 +0000 (12:57 -0400)
committerFred Suter <suterf@ornl.gov>
Wed, 21 Jun 2023 17:44:32 +0000 (13:44 -0400)
12 files changed:
examples/cpp/task-io/s4u-task-io.cpp
examples/cpp/task-simple/s4u-task-simple.cpp
examples/cpp/task-storm/s4u-task-storm.cpp
examples/cpp/task-switch-host/s4u-task-switch-host.cpp
examples/cpp/task-variable-load/s4u-task-variable-load.cpp
examples/python/task-io/task-io.py
examples/python/task-simple/task-simple.py
examples/python/task-switch-host/task-switch-host.py
examples/python/task-variable-load/task-variable-load.py
include/simgrid/s4u/Task.hpp
src/bindings/python/simgrid_python.cpp
src/s4u/s4u_Task.cpp

index 19102f2..52dcca1 100644 (file)
@@ -41,7 +41,7 @@ int main(int argc, char* argv[])
   read->add_successor(exec2);
 
   // Add a function to be called when tasks end for log purpose
-  sg4::Task::on_end_cb([](const sg4::Task* t) {
+  sg4::Task::on_completion_cb([](const sg4::Task* t) {
     XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
   });
 
index dc3df1d..0ec6aaa 100644 (file)
@@ -38,7 +38,7 @@ int main(int argc, char* argv[])
   comm->add_successor(exec2);
 
   // Add a function to be called when tasks end for log purpose
-  sg4::Task::on_end_cb([](const sg4::Task* t) {
+  sg4::Task::on_completion_cb([](const sg4::Task* t) {
     XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
   });
 
index bfd52b7..ca75e9a 100644 (file)
@@ -119,7 +119,7 @@ int main(int argc, char* argv[])
   SB->enqueue_execs(5);
 
   // Add a function to be called when tasks end for log purpose
-  sg4::Task::on_end_cb([]
+  sg4::Task::on_completion_cb([]
   (const sg4::Task* t) {
     XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
   });
index ae19ebc..3694a7d 100644 (file)
@@ -46,7 +46,7 @@ int main(int argc, char* argv[])
   exec2->add_successor(comm2);
 
   // Add a function to be called when tasks end for log purpose
-  sg4::Task::on_end_cb([](const sg4::Task* t) {
+  sg4::Task::on_completion_cb([](const sg4::Task* t) {
     XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
   });
 
index b0d3efd..df41c95 100644 (file)
@@ -50,7 +50,7 @@ int main(int argc, char* argv[])
   comm->add_successor(exec);
 
   // Add a function to be called when tasks end for log purpose
-  sg4::Task::on_end_cb([](const sg4::Task* t) {
+  sg4::Task::on_completion_cb([](const sg4::Task* t) {
     XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
   });
 
index 431a789..4db1456 100644 (file)
@@ -41,7 +41,7 @@ if __name__ == '__main__':
     read.add_successor(exec2)
 
     # Add a function to be called when tasks end for log purpose
-    Task.on_end_cb(callback)
+    Task.on_completion_cb(callback)
 
     # Enqueue two executions for task exec1
     exec1.enqueue_execs(2)
index 4ac876e..ce57577 100644 (file)
@@ -49,7 +49,7 @@ if __name__ == '__main__':
     comm.add_successor(exec2)
 
     # Add a function to be called when tasks end for log purpose
-    Task.on_end_cb(callback)
+    Task.on_completion_cb(callback)
 
     # Enqueue two executions for task exec1
     exec1.enqueue_execs(2)
index f56bc40..53ba3f2 100644 (file)
@@ -75,7 +75,7 @@ if __name__ == '__main__':
     exec2.add_successor(comm2)
 
     # Add a function to be called when tasks end for log purpose
-    Task.on_end_cb(callback)
+    Task.on_completion_cb(callback)
 
     # Add a function to be called before each executions of comm0
     # This function modifies the graph of tasks by adding or removing
index 63925d9..719bf56 100644 (file)
@@ -58,7 +58,7 @@ if __name__ == '__main__':
     comm.add_successor(exec)
 
     # Add a function to be called when tasks end for log purpose
-    Task.on_end_cb(callback)
+    Task.on_completion_cb(callback)
 
     # Create the actor that will inject load during the simulation
     Actor.create("input", tremblay, variable_load, comm)
index ef78734..d6f41f5 100644 (file)
@@ -27,56 +27,65 @@ using IoTaskPtr = boost::intrusive_ptr<IoTask>;
 class XBT_PUBLIC Token : public xbt::Extendable<Token> {};
 
 class Task {
+  std::string name_;
+  double amount_;
+  int queued_execs_ = 0;
+  int count_        = 0;
+  bool working_     = false;
+
   std::set<Task*> successors_                 = {};
   std::map<Task*, unsigned int> predecessors_ = {};
+  std::atomic_int_fast32_t refcount_{0};
 
   bool ready_to_run() const;
   void receive(Task* source);
 
-protected:
-  std::string name_;
-  double amount_;
   std::shared_ptr<Token> token_ = nullptr;
   std::deque<std::map<TaskPtr, std::shared_ptr<Token>>> tokens_received_;
-  int queued_execs_ = 0;
-  int count_        = 0;
-  bool working_     = false;
-  s4u::ActivityPtr previous_activity_;
-  s4u::ActivityPtr current_activity_;
-  xbt::signal<void(Task*)> on_this_start_;
-  xbt::signal<void(Task*)> on_this_end_;
+  ActivityPtr previous_activity_;
+  ActivityPtr current_activity_;
+
+protected:
   explicit Task(const std::string& name);
   virtual ~Task()     = default;
-  virtual void fire() = 0;
+
+  virtual void fire();
   void complete();
 
-  static xbt::signal<void(Task*)> on_start;
-  static xbt::signal<void(Task*)> on_end;
-  std::atomic_int_fast32_t refcount_{0};
+  void set_current_activity (ActivityPtr a) { current_activity_ = a; }
+
+  inline static xbt::signal<void(Task*)> on_start;
+  xbt::signal<void(Task*)> on_this_start;
+  inline static xbt::signal<void(Task*)> on_completion;
+  xbt::signal<void(Task*)> on_this_completion;
 
 public:
   const std::string& get_name() const { return name_; }
   const char* get_cname() const { return name_.c_str(); }
-  void enqueue_execs(int n);
   void set_amount(double amount);
   double get_amount() const { return amount_; }
+  int get_count() const { return count_; }
+
   void set_token(std::shared_ptr<Token> token);
   std::shared_ptr<Token> get_next_token_from(TaskPtr t);
+
   void add_successor(TaskPtr t);
   void remove_successor(TaskPtr t);
   void remove_all_successors();
   const std::set<Task*>& get_successors() const { return successors_; }
-  void on_this_start_cb(const std::function<void(Task*)>& func);
-  void on_this_end_cb(const std::function<void(Task*)>& func);
-  int get_count() const;
 
-  /** Add a callback fired before a task activity start.
+  void enqueue_execs(int n);
+
+  /** Add a callback fired before this task activity starts */
+  void on_this_start_cb(const std::function<void(Task*)>& func){ on_this_start.connect(func); }
+  /** Add a callback fired before a task activity starts.
    * Triggered after the on_this_start function**/
   static void on_start_cb(const std::function<void(Task*)>& cb) { on_start.connect(cb); }
-  /** Add a callback fired after a task activity end.
-   * Triggered after the on_this_end function, but before
-   * sending tokens to successors.**/
-  static void on_end_cb(const std::function<void(Task*)>& cb) { on_end.connect(cb); }
+  /** Add a callback fired before this task activity ends */
+  void on_this_completion_cb(const std::function<void(Task*)>& func) { on_this_completion.connect(func); };
+  /** Add a callback fired after a task activity ends.
+   * Triggered after the on_this_end function, but before sending tokens to successors.**/
+  static void on_completion_cb(const std::function<void(Task*)>& cb) { on_completion.connect(cb); }
 
 #ifndef DOXYGEN
   friend void intrusive_ptr_release(Task* o)
@@ -90,54 +99,57 @@ public:
 #endif
 };
 
-class ExecTask : public Task {
-  s4u::Host* host_;
+class CommTask : public Task {
+  Host* source_;
+  Host* destination_;
 
-  explicit ExecTask(const std::string& name);
+  explicit CommTask(const std::string& name);
   void fire() override;
 
 public:
-  static ExecTaskPtr init(const std::string& name);
-  static ExecTaskPtr init(const std::string& name, double flops, s4u::Host* host);
-  ExecTaskPtr set_host(s4u::Host* host);
-  s4u::Host* get_host() const { return host_; }
-  ExecTaskPtr set_flops(double flops);
-  double get_flops() const { return get_amount(); }
+  static CommTaskPtr init(const std::string& name);
+  static CommTaskPtr init(const std::string& name, double bytes, Host* source, Host* destination);
+
+  CommTaskPtr set_source(Host* source);
+  Host* get_source() const { return source_; }
+  CommTaskPtr set_destination(Host* destination);
+  Host* get_destination() const { return destination_; }
+  CommTaskPtr set_bytes(double bytes);
+  double get_bytes() const { return get_amount(); }
 };
 
-class CommTask : public Task {
-  s4u::Host* source_;
-  s4u::Host* destination_;
+class ExecTask : public Task {
+  Host* host_;
 
-  explicit CommTask(const std::string& name);
+  explicit ExecTask(const std::string& name);
   void fire() override;
 
 public:
-  static CommTaskPtr init(const std::string& name);
-  static CommTaskPtr init(const std::string& name, double bytes, s4u::Host* source, s4u::Host* destination);
-  CommTaskPtr set_source(s4u::Host* source);
-  s4u::Host* get_source() const { return source_; }
-  CommTaskPtr set_destination(s4u::Host* destination);
-  s4u::Host* get_destination() const { return destination_; }
-  CommTaskPtr set_bytes(double bytes);
-  double get_bytes() const { return get_amount(); }
+  static ExecTaskPtr init(const std::string& name);
+  static ExecTaskPtr init(const std::string& name, double flops, Host* host);
+
+  ExecTaskPtr set_host(Host* host);
+  Host* get_host() const { return host_; }
+  ExecTaskPtr set_flops(double flops);
+  double get_flops() const { return get_amount(); }
 };
 
 class IoTask : public Task {
-  s4u::Disk* disk_;
-  s4u::Io::OpType type_;
+  Disk* disk_;
+  Io::OpType type_;
   explicit IoTask(const std::string& name);
   void fire() override;
 
 public:
   static IoTaskPtr init(const std::string& name);
-  static IoTaskPtr init(const std::string& name, double bytes, s4u::Disk* disk, s4u::Io::OpType type);
-  IoTaskPtr set_disk(s4u::Disk* disk);
-  s4u::Disk* get_disk() const { return disk_; }
+  static IoTaskPtr init(const std::string& name, double bytes, Disk* disk, Io::OpType type);
+
+  IoTaskPtr set_disk(Disk* disk);
+  Disk* get_disk() const { return disk_; }
   IoTaskPtr set_bytes(double bytes);
   double get_bytes() const { return get_amount(); }
-  IoTaskPtr set_op_type(s4u::Io::OpType type);
-  s4u::Io::OpType get_op_type() const { return type_; }
+  IoTaskPtr set_op_type(Io::OpType type);
+  Io::OpType get_op_type() const { return type_; }
 };
 } // namespace simgrid::s4u
 #endif
index 52f5ca6..f7f06b7 100644 (file)
@@ -933,11 +933,11 @@ PYBIND11_MODULE(simgrid, m)
           },
           "Add a callback called when each task starts.")
       .def_static(
-          "on_end_cb",
+          "on_completion_cb",
           [](py::object cb) {
             cb.inc_ref(); // keep alive after return
             const py::gil_scoped_release gil_release;
-            Task::on_end_cb([cb_p = cb.ptr()](Task* op) {
+            Task::on_completion_cb([cb_p = cb.ptr()](Task* op) {
               const py::gil_scoped_acquire py_context; // need a new context for callback
               py::reinterpret_borrow<py::function>(cb_p)(op);
             });
@@ -957,7 +957,7 @@ PYBIND11_MODULE(simgrid, m)
            "Remove all successors of this task.")
       .def("on_this_start_cb", py::overload_cast<const std::function<void(Task*)>&>(&Task::on_this_start_cb),
            py::arg("func"), "Add a callback called when this task starts.")
-      .def("on_this_end_cb", py::overload_cast<const std::function<void(Task*)>&>(&Task::on_this_end_cb),
+      .def("on_this_completion_cb", py::overload_cast<const std::function<void(Task*)>&>(&Task::on_this_completion_cb),
            py::arg("func"), "Add a callback called when this task ends.")
       .def(
           "__repr__", [](const TaskPtr op) { return "Task(" + op->get_name() + ")"; },
index c0a6e38..9c9fbe9 100644 (file)
@@ -25,9 +25,6 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plug
 
 namespace simgrid::s4u {
 
-xbt::signal<void(Task*)> Task::on_start;
-xbt::signal<void(Task*)> Task::on_end;
-
 Task::Task(const std::string& name) : name_(name) {}
 
 /**
@@ -76,11 +73,11 @@ void Task::receive(Task* source)
  */
 void Task::complete()
 {
-  xbt_assert(s4u::Actor::is_maestro());
+  xbt_assert(Actor::is_maestro());
   working_ = false;
   count_++;
-  on_this_end_(this);
-  Task::on_end(this);
+  on_this_completion(this);
+  on_completion(this);
   if (current_activity_)
     previous_activity_ = std::move(current_activity_);
   for (auto const& t : successors_)
@@ -128,6 +125,15 @@ std::shared_ptr<Token> Task::get_next_token_from(TaskPtr t)
   return tokens_received_.front()[t];
 }
 
+void Task::fire() {
+  on_this_start(this);
+  on_start(this);
+  working_ = true;
+  queued_execs_ = std::max(queued_execs_ - 1, 0);
+  if (tokens_received_.size() > 0)
+    tokens_received_.pop_front();
+}
+
 /** @param successor The Task to add.
  *  @brief Add a successor to this Task.
  *  @note It also adds this as a predecessor of successor.
@@ -163,34 +169,6 @@ void Task::remove_all_successors()
   });
 }
 
-/** @ingroup plugin_task
- *  @param func The function to set.
- *  @brief Set a function to be called before each execution.
- *  @note The function is called before the underlying Activity starts.
- */
-void Task::on_this_start_cb(const std::function<void(Task*)>& func)
-{
-  simgrid::kernel::actor::simcall_answered([this, &func] { on_this_start_.connect(func); });
-}
-
-/** @ingroup plugin_task
- *  @param func The function to set.
- *  @brief Set a function to be called after each execution.
- *  @note The function is called after the underlying Activity ends, but before sending tokens to successors.
- */
-void Task::on_this_end_cb(const std::function<void(Task*)>& func)
-{
-  simgrid::kernel::actor::simcall_answered([this, &func] { on_this_end_.connect(func); });
-}
-
-/** @ingroup plugin_task
- *  @brief Return the number of completed executions.
- */
-int Task::get_count() const
-{
-  return count_;
-}
-
 /**
  *  @brief Default constructor.
  */
@@ -207,7 +185,7 @@ ExecTaskPtr ExecTask::init(const std::string& name)
 /** @ingroup plugin_task
  *  @brief Smart Constructor.
  */
-ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* host)
+ExecTaskPtr ExecTask::init(const std::string& name, double flops, Host* host)
 {
   return init(name)->set_flops(flops)->set_host(host);
 }
@@ -219,26 +197,18 @@ ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* hos
  */
 void ExecTask::fire()
 {
-  on_this_start_(this);
-  Task::on_start(this);
-  working_          = true;
-  queued_execs_     = std::max(queued_execs_ - 1, 0);
-  if (tokens_received_.size() > 0)
-      tokens_received_.pop_front();
-  s4u::ExecPtr exec = s4u::Exec::init();
-  exec->set_name(name_);
-  exec->set_flops_amount(amount_);
-  exec->set_host(host_);
+  Task::fire();
+  auto exec = Exec::init()->set_name(get_name())->set_flops_amount(get_amount())->set_host(host_);
   exec->start();
-  exec->on_this_completion_cb([this](Exec const& exec) { this->complete(); });
-  current_activity_                                   = exec;
+  exec->on_this_completion_cb([this](Exec const&) { this->complete(); });
+  set_current_activity(exec);
 }
 
 /** @ingroup plugin_task
  *  @param host The host to set.
  *  @brief Set a new host.
  */
-ExecTaskPtr ExecTask::set_host(s4u::Host* host)
+ExecTaskPtr ExecTask::set_host(Host* host)
 {
   kernel::actor::simcall_answered([this, host] { host_ = host; });
   return this;
@@ -249,7 +219,7 @@ ExecTaskPtr ExecTask::set_host(s4u::Host* host)
  */
 ExecTaskPtr ExecTask::set_flops(double flops)
 {
-  kernel::actor::simcall_answered([this, flops] { amount_ = flops; });
+  kernel::actor::simcall_answered([this, flops] { set_amount(flops); });
   return this;
 }
 
@@ -269,7 +239,7 @@ CommTaskPtr CommTask::init(const std::string& name)
 /** @ingroup plugin_task
  *  @brief Smart constructor.
  */
-CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* source, s4u::Host* destination)
+CommTaskPtr CommTask::init(const std::string& name, double bytes, Host* source, Host* destination)
 {
   return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination);
 }
@@ -281,25 +251,18 @@ CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* sou
  */
 void CommTask::fire()
 {
-  on_this_start_(this);
-  Task::on_start(this);
-  working_          = true;
-  queued_execs_     = std::max(queued_execs_ - 1, 0);
-  if (tokens_received_.size() > 0)
-      tokens_received_.pop_front();
-  s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_);
-  comm->set_name(name_);
-  comm->set_payload_size(amount_);
+  Task::fire();
+  auto comm = Comm::sendto_init(source_, destination_)->set_name(get_name())->set_payload_size(get_amount());
   comm->start();
-  comm->on_this_completion_cb([this](Comm const& comm) { this->complete(); });
-  current_activity_                                   = comm;
+  comm->on_this_completion_cb([this](Comm const&) { this->complete(); });
+  set_current_activity(comm);
 }
 
 /** @ingroup plugin_task
  *  @param source The host to set.
  *  @brief Set a new source host.
  */
-CommTaskPtr CommTask::set_source(s4u::Host* source)
+CommTaskPtr CommTask::set_source(Host* source)
 {
   kernel::actor::simcall_answered([this, source] { source_ = source; });
   return this;
@@ -309,7 +272,7 @@ CommTaskPtr CommTask::set_source(s4u::Host* source)
  *  @param destination The host to set.
  *  @brief Set a new destination host.
  */
-CommTaskPtr CommTask::set_destination(s4u::Host* destination)
+CommTaskPtr CommTask::set_destination(Host* destination)
 {
   kernel::actor::simcall_answered([this, destination] { destination_ = destination; });
   return this;
@@ -320,7 +283,7 @@ CommTaskPtr CommTask::set_destination(s4u::Host* destination)
  */
 CommTaskPtr CommTask::set_bytes(double bytes)
 {
-  kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; });
+  kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); });
   return this;
 }
 
@@ -340,7 +303,7 @@ IoTaskPtr IoTask::init(const std::string& name)
 /** @ingroup plugin_task
  *  @brief Smart Constructor.
  */
-IoTaskPtr IoTask::init(const std::string& name, double bytes, s4u::Disk* disk, s4u::Io::OpType type)
+IoTaskPtr IoTask::init(const std::string& name, double bytes, Disk* disk, Io::OpType type)
 {
   return init(name)->set_bytes(bytes)->set_disk(disk)->set_op_type(type);
 }
@@ -349,7 +312,7 @@ IoTaskPtr IoTask::init(const std::string& name, double bytes, s4u::Disk* disk, s
  *  @param disk The disk to set.
  *  @brief Set a new disk.
  */
-IoTaskPtr IoTask::set_disk(s4u::Disk* disk)
+IoTaskPtr IoTask::set_disk(Disk* disk)
 {
   kernel::actor::simcall_answered([this, disk] { disk_ = disk; });
   return this;
@@ -360,12 +323,12 @@ IoTaskPtr IoTask::set_disk(s4u::Disk* disk)
  */
 IoTaskPtr IoTask::set_bytes(double bytes)
 {
-  kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; });
+  kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); });
   return this;
 }
 
 /** @ingroup plugin_task */
-IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type)
+IoTaskPtr IoTask::set_op_type(Io::OpType type)
 {
   kernel::actor::simcall_answered([this, type] { type_ = type; });
   return this;
@@ -373,20 +336,11 @@ IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type)
 
 void IoTask::fire()
 {
-  on_this_start_(this);
-  Task::on_start(this);
-  working_      = true;
-  queued_execs_ = std::max(queued_execs_ - 1, 0);
-  if (tokens_received_.size() > 0)
-      tokens_received_.pop_front();
-  s4u::IoPtr io = s4u::Io::init();
-  io->set_name(name_);
-  io->set_size(amount_);
-  io->set_disk(disk_);
-  io->set_op_type(type_);
+  Task::fire();
+  auto io = Io::init()->set_name(get_name())->set_size(get_amount())->set_disk(disk_)->set_op_type(type_);
   io->start();
-  io->on_this_completion_cb([this](Io const& io) { this->complete(); });
-  current_activity_ = io;
+  io->on_this_completion_cb([this](Io const&) { this->complete(); });
+  set_current_activity(io);
 }
 
 } // namespace simgrid::s4u