X-Git-Url: http://bilbo.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/77ea2ef0536d5aa97f77ab76f94d31285dffef0c..6f8347f58430e00fabef8e6cbbf3fb94e6b8a49d:/src/plugins/task.cpp diff --git a/src/plugins/task.cpp b/src/plugins/task.cpp index c464f04276..835b307144 100644 --- a/src/plugins/task.cpp +++ b/src/plugins/task.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -27,30 +28,13 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plug namespace simgrid::plugins { +xbt::Extension ExtendedAttributeActivity::EXTENSION_ID; + xbt::signal Task::on_start; xbt::signal Task::on_end; Task::Task(const std::string& name) : name_(name) {} -/** - * @param predecessor The Task to add. - * @brief Add a predecessor to this Task. - */ -void Task::add_predecessor(Task* predecessor) -{ - if (predecessors_.find(predecessor) == predecessors_.end()) - simgrid::kernel::actor::simcall_answered([this, predecessor] { predecessors_[predecessor] = 0; }); -} - -/** - * @param predecessor The Task to remove. - * @brief Remove a predecessor from this Task. - */ -void Task::remove_predecessor(Task* predecessor) -{ - simgrid::kernel::actor::simcall_answered([this, predecessor] { predecessors_.erase(predecessor); }); -} - /** * @brief Return True if the Task can start a new Activity. * @note The Task is ready if not already doing something and there is at least one execution waiting in queue. @@ -69,21 +53,21 @@ bool Task::ready_to_run() const void Task::receive(Task* source) { XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str()); - auto it = predecessors_.find(source); - simgrid::kernel::actor::simcall_answered([this, it] { - it->second++; - bool enough_tokens = true; - for (auto const& [key, val] : predecessors_) - if (val < 1) { - enough_tokens = false; - break; - } - if (enough_tokens) { - for (auto& [key, val] : predecessors_) - val--; - enqueue_execs(1); + auto source_count = predecessors_[source]++; + if (tokens_received_.size() <= queued_execs_ + source_count) + tokens_received_.push_back({}); + tokens_received_[queued_execs_ + source_count][source] = source->token_; + bool enough_tokens = true; + for (auto const& [key, val] : predecessors_) + if (val < 1) { + enough_tokens = false; + break; } - }); + if (enough_tokens) { + for (auto& [key, val] : predecessors_) + val--; + enqueue_execs(1); + } } /** @@ -97,13 +81,13 @@ void Task::receive(Task* source) */ void Task::complete() { - simgrid::kernel::actor::simcall_answered([this] { - working_ = false; - count_++; - }); - for (auto const& end_func : end_func_handlers_) - end_func(this); + xbt_assert(s4u::Actor::is_maestro()); + working_ = false; + count_++; + on_this_end_(this); Task::on_end(this); + if (current_activity_) + previous_activity_ = std::move(current_activity_); for (auto const& t : successors_) t->receive(this); if (ready_to_run()) @@ -116,9 +100,11 @@ void Task::complete() */ void Task::init() { - if (Task::inited_) + static bool inited = false; + if (inited) return; - Task::inited_ = true; + + inited = true; ExtendedAttributeActivity::EXTENSION_ID = simgrid::s4u::Activity::extension_create(); simgrid::s4u::Exec::on_completion_cb( [](simgrid::s4u::Exec const& exec) { exec.extension()->task_->complete(); }); @@ -152,6 +138,25 @@ void Task::set_amount(double amount) simgrid::kernel::actor::simcall_answered([this, amount] { amount_ = amount; }); } +/** @ingroup plugin_task + * @param token The token to set. + * @brief Set the token to send to successors. + * @note The token is passed to each successor after the task end, i.e., after the on_end callback. + */ +void Task::set_token(std::shared_ptr token) +{ + simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; }); +} + +/** @ingroup plugin_task + * @return Map of tokens received for the next execution. + * @note If there is no queued execution for this task the map might not exist or be partially empty. + */ +std::shared_ptr Task::get_next_token_from(TaskPtr t) +{ + return tokens_received_.front()[t]; +} + /** @ingroup plugin_task * @param successor The Task to add. * @brief Add a successor to this Task. @@ -159,8 +164,10 @@ void Task::set_amount(double amount) */ void Task::add_successor(TaskPtr successor) { - simgrid::kernel::actor::simcall_answered([this, successor] { successors_.insert(successor.get()); }); - successor->add_predecessor(this); + simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] { + successors_.insert(successor_p); + successor_p->predecessors_.try_emplace(this, 0); + }); } /** @ingroup plugin_task @@ -170,8 +177,10 @@ void Task::add_successor(TaskPtr successor) */ void Task::remove_successor(TaskPtr successor) { - simgrid::kernel::actor::simcall_answered([this, successor] { successors_.erase(successor.get()); }); - successor->remove_predecessor(this); + simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] { + successor_p->predecessors_.erase(this); + successors_.erase(successor_p); + }); } void Task::remove_all_successors() @@ -190,9 +199,9 @@ void Task::remove_all_successors() * @brief Set a function to be called before each execution. * @note The function is called before the underlying Activity starts. */ -void Task::on_this_start(const std::function& func) +void Task::on_this_start_cb(const std::function& func) { - simgrid::kernel::actor::simcall_answered([this, &func] { start_func_handlers_.push_back(func); }); + simgrid::kernel::actor::simcall_answered([this, &func] { on_this_start_.connect(func); }); } /** @ingroup plugin_task @@ -200,9 +209,9 @@ void Task::on_this_start(const std::function& func) * @brief Set a function to be called after each execution. * @note The function is called after the underlying Activity ends, but before sending tokens to successors. */ -void Task::on_this_end(const std::function& func) +void Task::on_this_end_cb(const std::function& func) { - simgrid::kernel::actor::simcall_answered([this, &func] { end_func_handlers_.push_back(func); }); + simgrid::kernel::actor::simcall_answered([this, &func] { on_this_end_.connect(func); }); } /** @ingroup plugin_task @@ -241,13 +250,12 @@ ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* hos */ void ExecTask::fire() { - for (auto const& start_func : start_func_handlers_) - start_func(this); + on_this_start_(this); Task::on_start(this); - kernel::actor::simcall_answered([this] { - working_ = true; - queued_execs_ = std::max(queued_execs_ - 1, 0); - }); + working_ = true; + queued_execs_ = std::max(queued_execs_ - 1, 0); + if (tokens_received_.size() > 0) + tokens_received_.pop_front(); s4u::ExecPtr exec = s4u::Exec::init(); exec->set_name(name_); exec->set_flops_amount(amount_); @@ -255,7 +263,7 @@ void ExecTask::fire() exec->start(); exec->extension_set(new ExtendedAttributeActivity()); exec->extension()->task_ = this; - kernel::actor::simcall_answered([this, exec] { current_activity_ = exec; }); + current_activity_ = exec; } /** @ingroup plugin_task @@ -305,20 +313,19 @@ CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* sou */ void CommTask::fire() { - for (auto const& start_func : start_func_handlers_) - start_func(this); + on_this_start_(this); Task::on_start(this); - kernel::actor::simcall_answered([this] { - working_ = true; - queued_execs_ = std::max(queued_execs_ - 1, 0); - }); + working_ = true; + queued_execs_ = std::max(queued_execs_ - 1, 0); + if (tokens_received_.size() > 0) + tokens_received_.pop_front(); s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_); comm->set_name(name_); comm->set_payload_size(amount_); comm->start(); comm->extension_set(new ExtendedAttributeActivity()); comm->extension()->task_ = this; - kernel::actor::simcall_answered([this, comm] { current_activity_ = comm; }); + current_activity_ = comm; } /** @ingroup plugin_task @@ -399,13 +406,12 @@ IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type) void IoTask::fire() { - for (auto const& start_func : start_func_handlers_) - start_func(this); + on_this_start_(this); Task::on_start(this); - kernel::actor::simcall_answered([this] { - working_ = true; - queued_execs_ = std::max(queued_execs_ - 1, 0); - }); + working_ = true; + queued_execs_ = std::max(queued_execs_ - 1, 0); + if (tokens_received_.size() > 0) + tokens_received_.pop_front(); s4u::IoPtr io = s4u::Io::init(); io->set_name(name_); io->set_size(amount_); @@ -414,11 +420,7 @@ void IoTask::fire() io->start(); io->extension_set(new ExtendedAttributeActivity()); io->extension()->task_ = this; - kernel::actor::simcall_answered([this, io] { current_activity_ = io; }); + current_activity_ = io; } } // namespace simgrid::plugins - -simgrid::xbt::Extension - simgrid::plugins::ExtendedAttributeActivity::EXTENSION_ID; -bool simgrid::plugins::Task::inited_ = false;