Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
fix all Task examples to support the new Task format with dispatcher and collector
[simgrid.git] / src / s4u / s4u_Task.cpp
index e6dbe06..ca72bfb 100644 (file)
@@ -51,10 +51,8 @@ void Task::receive(Task* source)
   XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
   auto source_count = predecessors_[source];
   predecessors_[source]++;
-  if (tokens_received_.size() <= queued_firings_["dispatcher"] + source_count)
-    tokens_received_.emplace_back();
-  tokens_received_[queued_firings_["dispatcher"] + source_count][source] = source->token_;
-  bool enough_tokens                                                     = true;
+  tokens_received_[source].push_back(source->token_);
+  bool enough_tokens = true;
   for (auto const& [key, val] : predecessors_)
     if (val < 1) {
       enough_tokens = false;
@@ -82,6 +80,7 @@ void Task::complete(std::string instance)
   running_instances_[instance] = running_instances_[instance] - 1;
   count_[instance]             = count_[instance] + 1;
   if (instance == "collector") {
+    // XBT_INFO("Trigger on completion: %s - %s", get_cname(), instance.c_str());
     on_this_completion(this);
     on_completion(this);
     for (auto const& t : successors_)
@@ -167,26 +166,29 @@ void Task::set_amount(double amount, std::string instance)
 
 /** @param token The token to set.
  *  @brief Set the token to send to successors.
- *  @note The token is passed to each successor after the task end, i.e., after the on_end callback.
+ *  @note The token is passed to each successor after the task end, i.e., after the on_completion callback.
  */
 void Task::set_token(std::shared_ptr<Token> token)
 {
   simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; });
 }
 
+void Task::deque_token_from(TaskPtr t)
+{
+  simgrid::kernel::actor::simcall_answered([this, &t] { tokens_received_.at(t).pop_front(); });
+}
+
 void Task::fire(std::string instance)
 {
   if ((int)current_activities_[instance].size() > parallelism_degree_[instance]) {
     current_activities_[instance].pop_front();
   }
-  if (instance == "dispatcher") {
+  if (instance != "dispatcher" and instance != "collector") {
     on_this_start(this);
     on_start(this);
   }
   running_instances_[instance]++;
   queued_firings_[instance] = std::max(queued_firings_[instance] - 1, 0);
-  if (not tokens_received_.empty())
-    tokens_received_.pop_front();
 }
 
 /** @param successor The Task to add.