Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
add simgrid::plugins::Token. switch from get_tokens to get_next_token_from
[simgrid.git] / src / plugins / task.cpp
index dca0c90..835b307 100644 (file)
@@ -1,3 +1,4 @@
+#include <memory>
 #include <simgrid/Exception.hpp>
 #include <simgrid/plugins/task.hpp>
 #include <simgrid/s4u/Comm.hpp>
@@ -52,7 +53,10 @@ bool Task::ready_to_run() const
 void Task::receive(Task* source)
 {
   XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
-  predecessors_[source]++;
+  auto source_count = predecessors_[source]++;
+  if (tokens_received_.size() <= queued_execs_ + source_count)
+    tokens_received_.push_back({});
+  tokens_received_[queued_execs_ + source_count][source] = source->token_;
   bool enough_tokens = true;
   for (auto const& [key, val] : predecessors_)
     if (val < 1) {
@@ -134,6 +138,25 @@ void Task::set_amount(double amount)
   simgrid::kernel::actor::simcall_answered([this, amount] { amount_ = amount; });
 }
 
+/** @ingroup plugin_task
+ *  @param token The token to set.
+ *  @brief Set the token to send to successors.
+ *  @note The token is passed to each successor after the task end, i.e., after the on_end callback.
+ */
+void Task::set_token(std::shared_ptr<Token> token)
+{
+  simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; });
+}
+
+/** @ingroup plugin_task
+ *  @return Map of tokens received for the next execution.
+ *  @note If there is no queued execution for this task the map might not exist or be partially empty.
+ */
+std::shared_ptr<Token> Task::get_next_token_from(TaskPtr t)
+{
+  return tokens_received_.front()[t];
+}
+
 /** @ingroup plugin_task
  *  @param successor The Task to add.
  *  @brief Add a successor to this Task.
@@ -231,6 +254,8 @@ void ExecTask::fire()
   Task::on_start(this);
   working_          = true;
   queued_execs_     = std::max(queued_execs_ - 1, 0);
+  if (tokens_received_.size() > 0)
+      tokens_received_.pop_front();
   s4u::ExecPtr exec = s4u::Exec::init();
   exec->set_name(name_);
   exec->set_flops_amount(amount_);
@@ -292,6 +317,8 @@ void CommTask::fire()
   Task::on_start(this);
   working_          = true;
   queued_execs_     = std::max(queued_execs_ - 1, 0);
+  if (tokens_received_.size() > 0)
+      tokens_received_.pop_front();
   s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_);
   comm->set_name(name_);
   comm->set_payload_size(amount_);
@@ -383,6 +410,8 @@ void IoTask::fire()
   Task::on_start(this);
   working_      = true;
   queued_execs_ = std::max(queued_execs_ - 1, 0);
+  if (tokens_received_.size() > 0)
+      tokens_received_.pop_front();
   s4u::IoPtr io = s4u::Io::init();
   io->set_name(name_);
   io->set_size(amount_);