XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
auto source_count = predecessors_[source];
predecessors_[source]++;
- if (tokens_received_.size() <= queued_firings_["dispatcher"] + source_count)
- tokens_received_.emplace_back();
- tokens_received_[queued_firings_["dispatcher"] + source_count][source] = source->token_;
- bool enough_tokens = true;
+ tokens_received_[source].push_back(source->token_);
+ bool enough_tokens = true;
for (auto const& [key, val] : predecessors_)
if (val < 1) {
enough_tokens = false;
running_instances_[instance] = running_instances_[instance] - 1;
count_[instance] = count_[instance] + 1;
if (instance == "collector") {
+ // XBT_INFO("Trigger on completion: %s - %s", get_cname(), instance.c_str());
on_this_completion(this);
on_completion(this);
for (auto const& t : successors_)
/** @param token The token to set.
* @brief Set the token to send to successors.
- * @note The token is passed to each successor after the task end, i.e., after the on_end callback.
+ * @note The token is passed to each successor after the task end, i.e., after the on_completion callback.
*/
void Task::set_token(std::shared_ptr<Token> token)
{
simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; });
}
+void Task::deque_token_from(TaskPtr t)
+{
+ simgrid::kernel::actor::simcall_answered([this, &t] { tokens_received_.at(t).pop_front(); });
+}
+
void Task::fire(std::string instance)
{
if ((int)current_activities_[instance].size() > parallelism_degree_[instance]) {
current_activities_[instance].pop_front();
}
- if (instance == "dispatcher") {
+ if (instance != "dispatcher" and instance != "collector") {
on_this_start(this);
on_start(this);
}
running_instances_[instance]++;
queued_firings_[instance] = std::max(queued_firings_[instance] - 1, 0);
- if (not tokens_received_.empty())
- tokens_received_.pop_front();
}
/** @param successor The Task to add.