Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of https://framagit.org/simgrid/simgrid
[simgrid.git] / src / s4u / s4u_Task.cpp
similarity index 57%
rename from src/plugins/task.cpp
rename to src/s4u/s4u_Task.cpp
index dca0c90..b6d8ccd 100644 (file)
@@ -1,23 +1,21 @@
+#include <memory>
 #include <simgrid/Exception.hpp>
 #include <simgrid/Exception.hpp>
-#include <simgrid/plugins/task.hpp>
 #include <simgrid/s4u/Comm.hpp>
 #include <simgrid/s4u/Exec.hpp>
 #include <simgrid/s4u/Io.hpp>
 #include <simgrid/s4u/Comm.hpp>
 #include <simgrid/s4u/Exec.hpp>
 #include <simgrid/s4u/Io.hpp>
+#include <simgrid/s4u/Task.hpp>
 #include <simgrid/simix.hpp>
 
 #include "src/simgrid/module.hpp"
 
 SIMGRID_REGISTER_PLUGIN(task, "Battery management", nullptr)
 #include <simgrid/simix.hpp>
 
 #include "src/simgrid/module.hpp"
 
 SIMGRID_REGISTER_PLUGIN(task, "Battery management", nullptr)
-/** @defgroup plugin_task plugin_task Plugin Task
-
+/**
   @beginrst
 
   @beginrst
 
-This is the task plugin, enabling management of Tasks.
-To activate this plugin, first call :cpp:func:`Task::init`.
 
 Tasks are designed to represent dataflows, i.e, graphs of Tasks.
 Tasks can only be instancied using either
 
 Tasks are designed to represent dataflows, i.e, graphs of Tasks.
 Tasks can only be instancied using either
-:cpp:func:`simgrid::plugins::ExecTask::init` or :cpp:func:`simgrid::plugins::CommTask::init`
+:cpp:func:`simgrid::s4u::ExecTask::init` or :cpp:func:`simgrid::s4u::CommTask::init`
 An ExecTask is an Execution Task. Its underlying Activity is an :ref:`Exec <API_s4u_Exec>`.
 A CommTask is a Communication Task. Its underlying Activity is a :ref:`Comm <API_s4u_Comm>`.
 
 An ExecTask is an Execution Task. Its underlying Activity is an :ref:`Exec <API_s4u_Exec>`.
 A CommTask is a Communication Task. Its underlying Activity is a :ref:`Comm <API_s4u_Comm>`.
 
@@ -25,12 +23,7 @@ A CommTask is a Communication Task. Its underlying Activity is a :ref:`Comm <API
  */
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plugin");
 
  */
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plugin");
 
-namespace simgrid::plugins {
-
-xbt::Extension<s4u::Activity, ExtendedAttributeActivity> ExtendedAttributeActivity::EXTENSION_ID;
-
-xbt::signal<void(Task*)> Task::on_start;
-xbt::signal<void(Task*)> Task::on_end;
+namespace simgrid::s4u {
 
 Task::Task(const std::string& name) : name_(name) {}
 
 
 Task::Task(const std::string& name) : name_(name) {}
 
@@ -40,7 +33,7 @@ Task::Task(const std::string& name) : name_(name) {}
  */
 bool Task::ready_to_run() const
 {
  */
 bool Task::ready_to_run() const
 {
-  return not working_ && queued_execs_ > 0;
+  return not working_ && queued_firings_ > 0;
 }
 
 /**
 }
 
 /**
@@ -52,8 +45,11 @@ bool Task::ready_to_run() const
 void Task::receive(Task* source)
 {
   XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
 void Task::receive(Task* source)
 {
   XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
-  predecessors_[source]++;
-  bool enough_tokens = true;
+  auto source_count = predecessors_[source]++;
+  if (tokens_received_.size() <= queued_firings_ + source_count)
+    tokens_received_.push_back({});
+  tokens_received_[queued_firings_ + source_count][source] = source->token_;
+  bool enough_tokens                                       = true;
   for (auto const& [key, val] : predecessors_)
     if (val < 1) {
       enough_tokens = false;
   for (auto const& [key, val] : predecessors_)
     if (val < 1) {
       enough_tokens = false;
@@ -62,7 +58,7 @@ void Task::receive(Task* source)
   if (enough_tokens) {
     for (auto& [key, val] : predecessors_)
       val--;
   if (enough_tokens) {
     for (auto& [key, val] : predecessors_)
       val--;
-    enqueue_execs(1);
+    enqueue_firings(1);
   }
 }
 
   }
 }
 
@@ -77,11 +73,11 @@ void Task::receive(Task* source)
  */
 void Task::complete()
 {
  */
 void Task::complete()
 {
-  xbt_assert(s4u::Actor::is_maestro());
+  xbt_assert(Actor::is_maestro());
   working_ = false;
   count_++;
   working_ = false;
   count_++;
-  on_this_end_(this);
-  Task::on_end(this);
+  on_this_completion(this);
+  on_completion(this);
   if (current_activity_)
     previous_activity_ = std::move(current_activity_);
   for (auto const& t : successors_)
   if (current_activity_)
     previous_activity_ = std::move(current_activity_);
   for (auto const& t : successors_)
@@ -90,42 +86,20 @@ void Task::complete()
     fire();
 }
 
     fire();
 }
 
-/** @ingroup plugin_task
- *  @brief Init the Task plugin.
- *  @note Add a completion callback to all Activities to call Task::complete().
- */
-void Task::init()
-{
-  static bool inited = false;
-  if (inited)
-    return;
-
-  inited                                  = true;
-  ExtendedAttributeActivity::EXTENSION_ID = simgrid::s4u::Activity::extension_create<ExtendedAttributeActivity>();
-  simgrid::s4u::Exec::on_completion_cb(
-      [](simgrid::s4u::Exec const& exec) { exec.extension<ExtendedAttributeActivity>()->task_->complete(); });
-  simgrid::s4u::Comm::on_completion_cb(
-      [](simgrid::s4u::Comm const& comm) { comm.extension<ExtendedAttributeActivity>()->task_->complete(); });
-  simgrid::s4u::Io::on_completion_cb(
-      [](simgrid::s4u::Io const& io) { io.extension<ExtendedAttributeActivity>()->task_->complete(); });
-}
-
-/** @ingroup plugin_task
- *  @param n The number of executions to enqueue.
- *  @brief Enqueue executions.
- *  @note Immediatly starts an execution if possible.
+/** @param n The number of firings to enqueue.
+ *  @brief Enqueue firing.
+ *  @note Immediatly fire an activity if possible.
  */
  */
-void Task::enqueue_execs(int n)
+void Task::enqueue_firings(int n)
 {
   simgrid::kernel::actor::simcall_answered([this, n] {
 {
   simgrid::kernel::actor::simcall_answered([this, n] {
-    queued_execs_ += n;
+    queued_firings_ += n;
     if (ready_to_run())
       fire();
   });
 }
 
     if (ready_to_run())
       fire();
   });
 }
 
-/** @ingroup plugin_task
- *  @param amount The amount to set.
+/** @param amount The amount to set.
  *  @brief Set the amout of work to do.
  *  @note Amount in flop for ExecTask and in bytes for CommTask.
  */
  *  @brief Set the amout of work to do.
  *  @note Amount in flop for ExecTask and in bytes for CommTask.
  */
@@ -134,8 +108,34 @@ void Task::set_amount(double amount)
   simgrid::kernel::actor::simcall_answered([this, amount] { amount_ = amount; });
 }
 
   simgrid::kernel::actor::simcall_answered([this, amount] { amount_ = amount; });
 }
 
-/** @ingroup plugin_task
- *  @param successor The Task to add.
+/** @param token The token to set.
+ *  @brief Set the token to send to successors.
+ *  @note The token is passed to each successor after the task end, i.e., after the on_end callback.
+ */
+void Task::set_token(std::shared_ptr<Token> token)
+{
+  simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; });
+}
+
+/** @return Map of tokens received for the next execution.
+ *  @note If there is no queued execution for this task the map might not exist or be partially empty.
+ */
+std::shared_ptr<Token> Task::get_next_token_from(TaskPtr t)
+{
+  return tokens_received_.front()[t];
+}
+
+void Task::fire()
+{
+  on_this_start(this);
+  on_start(this);
+  working_        = true;
+  queued_firings_ = std::max(queued_firings_ - 1, 0);
+  if (tokens_received_.size() > 0)
+    tokens_received_.pop_front();
+}
+
+/** @param successor The Task to add.
  *  @brief Add a successor to this Task.
  *  @note It also adds this as a predecessor of successor.
  */
  *  @brief Add a successor to this Task.
  *  @note It also adds this as a predecessor of successor.
  */
@@ -147,8 +147,7 @@ void Task::add_successor(TaskPtr successor)
   });
 }
 
   });
 }
 
-/** @ingroup plugin_task
- *  @param successor The Task to remove.
+/** @param successor The Task to remove.
  *  @brief Remove a successor from this Task.
  *  @note It also remove this from the predecessors of successor.
  */
  *  @brief Remove a successor from this Task.
  *  @note It also remove this from the predecessors of successor.
  */
@@ -171,34 +170,6 @@ void Task::remove_all_successors()
   });
 }
 
   });
 }
 
-/** @ingroup plugin_task
- *  @param func The function to set.
- *  @brief Set a function to be called before each execution.
- *  @note The function is called before the underlying Activity starts.
- */
-void Task::on_this_start_cb(const std::function<void(Task*)>& func)
-{
-  simgrid::kernel::actor::simcall_answered([this, &func] { on_this_start_.connect(func); });
-}
-
-/** @ingroup plugin_task
- *  @param func The function to set.
- *  @brief Set a function to be called after each execution.
- *  @note The function is called after the underlying Activity ends, but before sending tokens to successors.
- */
-void Task::on_this_end_cb(const std::function<void(Task*)>& func)
-{
-  simgrid::kernel::actor::simcall_answered([this, &func] { on_this_end_.connect(func); });
-}
-
-/** @ingroup plugin_task
- *  @brief Return the number of completed executions.
- */
-int Task::get_count() const
-{
-  return count_;
-}
-
 /**
  *  @brief Default constructor.
  */
 /**
  *  @brief Default constructor.
  */
@@ -215,7 +186,7 @@ ExecTaskPtr ExecTask::init(const std::string& name)
 /** @ingroup plugin_task
  *  @brief Smart Constructor.
  */
 /** @ingroup plugin_task
  *  @brief Smart Constructor.
  */
-ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* host)
+ExecTaskPtr ExecTask::init(const std::string& name, double flops, Host* host)
 {
   return init(name)->set_flops(flops)->set_host(host);
 }
 {
   return init(name)->set_flops(flops)->set_host(host);
 }
@@ -227,25 +198,18 @@ ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* hos
  */
 void ExecTask::fire()
 {
  */
 void ExecTask::fire()
 {
-  on_this_start_(this);
-  Task::on_start(this);
-  working_          = true;
-  queued_execs_     = std::max(queued_execs_ - 1, 0);
-  s4u::ExecPtr exec = s4u::Exec::init();
-  exec->set_name(name_);
-  exec->set_flops_amount(amount_);
-  exec->set_host(host_);
+  Task::fire();
+  auto exec = Exec::init()->set_name(get_name())->set_flops_amount(get_amount())->set_host(host_);
   exec->start();
   exec->start();
-  exec->extension_set(new ExtendedAttributeActivity());
-  exec->extension<ExtendedAttributeActivity>()->task_ = this;
-  current_activity_                                   = exec;
+  exec->on_this_completion_cb([this](Exec const&) { this->complete(); });
+  set_current_activity(exec);
 }
 
 /** @ingroup plugin_task
  *  @param host The host to set.
  *  @brief Set a new host.
  */
 }
 
 /** @ingroup plugin_task
  *  @param host The host to set.
  *  @brief Set a new host.
  */
-ExecTaskPtr ExecTask::set_host(s4u::Host* host)
+ExecTaskPtr ExecTask::set_host(Host* host)
 {
   kernel::actor::simcall_answered([this, host] { host_ = host; });
   return this;
 {
   kernel::actor::simcall_answered([this, host] { host_ = host; });
   return this;
@@ -256,7 +220,7 @@ ExecTaskPtr ExecTask::set_host(s4u::Host* host)
  */
 ExecTaskPtr ExecTask::set_flops(double flops)
 {
  */
 ExecTaskPtr ExecTask::set_flops(double flops)
 {
-  kernel::actor::simcall_answered([this, flops] { amount_ = flops; });
+  kernel::actor::simcall_answered([this, flops] { set_amount(flops); });
   return this;
 }
 
   return this;
 }
 
@@ -276,7 +240,7 @@ CommTaskPtr CommTask::init(const std::string& name)
 /** @ingroup plugin_task
  *  @brief Smart constructor.
  */
 /** @ingroup plugin_task
  *  @brief Smart constructor.
  */
-CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* source, s4u::Host* destination)
+CommTaskPtr CommTask::init(const std::string& name, double bytes, Host* source, Host* destination)
 {
   return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination);
 }
 {
   return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination);
 }
@@ -288,24 +252,18 @@ CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* sou
  */
 void CommTask::fire()
 {
  */
 void CommTask::fire()
 {
-  on_this_start_(this);
-  Task::on_start(this);
-  working_          = true;
-  queued_execs_     = std::max(queued_execs_ - 1, 0);
-  s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_);
-  comm->set_name(name_);
-  comm->set_payload_size(amount_);
+  Task::fire();
+  auto comm = Comm::sendto_init(source_, destination_)->set_name(get_name())->set_payload_size(get_amount());
   comm->start();
   comm->start();
-  comm->extension_set(new ExtendedAttributeActivity());
-  comm->extension<ExtendedAttributeActivity>()->task_ = this;
-  current_activity_                                   = comm;
+  comm->on_this_completion_cb([this](Comm const&) { this->complete(); });
+  set_current_activity(comm);
 }
 
 /** @ingroup plugin_task
  *  @param source The host to set.
  *  @brief Set a new source host.
  */
 }
 
 /** @ingroup plugin_task
  *  @param source The host to set.
  *  @brief Set a new source host.
  */
-CommTaskPtr CommTask::set_source(s4u::Host* source)
+CommTaskPtr CommTask::set_source(Host* source)
 {
   kernel::actor::simcall_answered([this, source] { source_ = source; });
   return this;
 {
   kernel::actor::simcall_answered([this, source] { source_ = source; });
   return this;
@@ -315,7 +273,7 @@ CommTaskPtr CommTask::set_source(s4u::Host* source)
  *  @param destination The host to set.
  *  @brief Set a new destination host.
  */
  *  @param destination The host to set.
  *  @brief Set a new destination host.
  */
-CommTaskPtr CommTask::set_destination(s4u::Host* destination)
+CommTaskPtr CommTask::set_destination(Host* destination)
 {
   kernel::actor::simcall_answered([this, destination] { destination_ = destination; });
   return this;
 {
   kernel::actor::simcall_answered([this, destination] { destination_ = destination; });
   return this;
@@ -326,7 +284,7 @@ CommTaskPtr CommTask::set_destination(s4u::Host* destination)
  */
 CommTaskPtr CommTask::set_bytes(double bytes)
 {
  */
 CommTaskPtr CommTask::set_bytes(double bytes)
 {
-  kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; });
+  kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); });
   return this;
 }
 
   return this;
 }
 
@@ -346,7 +304,7 @@ IoTaskPtr IoTask::init(const std::string& name)
 /** @ingroup plugin_task
  *  @brief Smart Constructor.
  */
 /** @ingroup plugin_task
  *  @brief Smart Constructor.
  */
-IoTaskPtr IoTask::init(const std::string& name, double bytes, s4u::Disk* disk, s4u::Io::OpType type)
+IoTaskPtr IoTask::init(const std::string& name, double bytes, Disk* disk, Io::OpType type)
 {
   return init(name)->set_bytes(bytes)->set_disk(disk)->set_op_type(type);
 }
 {
   return init(name)->set_bytes(bytes)->set_disk(disk)->set_op_type(type);
 }
@@ -355,7 +313,7 @@ IoTaskPtr IoTask::init(const std::string& name, double bytes, s4u::Disk* disk, s
  *  @param disk The disk to set.
  *  @brief Set a new disk.
  */
  *  @param disk The disk to set.
  *  @brief Set a new disk.
  */
-IoTaskPtr IoTask::set_disk(s4u::Disk* disk)
+IoTaskPtr IoTask::set_disk(Disk* disk)
 {
   kernel::actor::simcall_answered([this, disk] { disk_ = disk; });
   return this;
 {
   kernel::actor::simcall_answered([this, disk] { disk_ = disk; });
   return this;
@@ -366,12 +324,12 @@ IoTaskPtr IoTask::set_disk(s4u::Disk* disk)
  */
 IoTaskPtr IoTask::set_bytes(double bytes)
 {
  */
 IoTaskPtr IoTask::set_bytes(double bytes)
 {
-  kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; });
+  kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); });
   return this;
 }
 
 /** @ingroup plugin_task */
   return this;
 }
 
 /** @ingroup plugin_task */
-IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type)
+IoTaskPtr IoTask::set_op_type(Io::OpType type)
 {
   kernel::actor::simcall_answered([this, type] { type_ = type; });
   return this;
 {
   kernel::actor::simcall_answered([this, type] { type_ = type; });
   return this;
@@ -379,19 +337,11 @@ IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type)
 
 void IoTask::fire()
 {
 
 void IoTask::fire()
 {
-  on_this_start_(this);
-  Task::on_start(this);
-  working_      = true;
-  queued_execs_ = std::max(queued_execs_ - 1, 0);
-  s4u::IoPtr io = s4u::Io::init();
-  io->set_name(name_);
-  io->set_size(amount_);
-  io->set_disk(disk_);
-  io->set_op_type(type_);
+  Task::fire();
+  auto io = Io::init()->set_name(get_name())->set_size(get_amount())->set_disk(disk_)->set_op_type(type_);
   io->start();
   io->start();
-  io->extension_set(new ExtendedAttributeActivity());
-  io->extension<ExtendedAttributeActivity>()->task_ = this;
-  current_activity_                                 = io;
+  io->on_this_completion_cb([this](Io const&) { this->complete(); });
+  set_current_activity(io);
 }
 
 }
 
-} // namespace simgrid::plugins
+} // namespace simgrid::s4u