X-Git-Url: http://bilbo.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/3cb644bc04ff9dd4da159790850b6bcfda970cf9:/src/plugins/task.cpp..1363ce9624f4327f3ad5c934b15736a776637dfd:/src/s4u/s4u_Task.cpp diff --git a/src/plugins/task.cpp b/src/s4u/s4u_Task.cpp similarity index 57% rename from src/plugins/task.cpp rename to src/s4u/s4u_Task.cpp index dca0c9013d..b6d8ccd7ff 100644 --- a/src/plugins/task.cpp +++ b/src/s4u/s4u_Task.cpp @@ -1,23 +1,21 @@ +#include #include -#include #include #include #include +#include #include #include "src/simgrid/module.hpp" SIMGRID_REGISTER_PLUGIN(task, "Battery management", nullptr) -/** @defgroup plugin_task plugin_task Plugin Task - +/** @beginrst -This is the task plugin, enabling management of Tasks. -To activate this plugin, first call :cpp:func:`Task::init`. Tasks are designed to represent dataflows, i.e, graphs of Tasks. Tasks can only be instancied using either -:cpp:func:`simgrid::plugins::ExecTask::init` or :cpp:func:`simgrid::plugins::CommTask::init` +:cpp:func:`simgrid::s4u::ExecTask::init` or :cpp:func:`simgrid::s4u::CommTask::init` An ExecTask is an Execution Task. Its underlying Activity is an :ref:`Exec `. A CommTask is a Communication Task. Its underlying Activity is a :ref:`Comm `. @@ -25,12 +23,7 @@ A CommTask is a Communication Task. Its underlying Activity is a :ref:`Comm ExtendedAttributeActivity::EXTENSION_ID; - -xbt::signal Task::on_start; -xbt::signal Task::on_end; +namespace simgrid::s4u { Task::Task(const std::string& name) : name_(name) {} @@ -40,7 +33,7 @@ Task::Task(const std::string& name) : name_(name) {} */ bool Task::ready_to_run() const { - return not working_ && queued_execs_ > 0; + return not working_ && queued_firings_ > 0; } /** @@ -52,8 +45,11 @@ bool Task::ready_to_run() const void Task::receive(Task* source) { XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str()); - predecessors_[source]++; - bool enough_tokens = true; + auto source_count = predecessors_[source]++; + if (tokens_received_.size() <= queued_firings_ + source_count) + tokens_received_.push_back({}); + tokens_received_[queued_firings_ + source_count][source] = source->token_; + bool enough_tokens = true; for (auto const& [key, val] : predecessors_) if (val < 1) { enough_tokens = false; @@ -62,7 +58,7 @@ void Task::receive(Task* source) if (enough_tokens) { for (auto& [key, val] : predecessors_) val--; - enqueue_execs(1); + enqueue_firings(1); } } @@ -77,11 +73,11 @@ void Task::receive(Task* source) */ void Task::complete() { - xbt_assert(s4u::Actor::is_maestro()); + xbt_assert(Actor::is_maestro()); working_ = false; count_++; - on_this_end_(this); - Task::on_end(this); + on_this_completion(this); + on_completion(this); if (current_activity_) previous_activity_ = std::move(current_activity_); for (auto const& t : successors_) @@ -90,42 +86,20 @@ void Task::complete() fire(); } -/** @ingroup plugin_task - * @brief Init the Task plugin. - * @note Add a completion callback to all Activities to call Task::complete(). - */ -void Task::init() -{ - static bool inited = false; - if (inited) - return; - - inited = true; - ExtendedAttributeActivity::EXTENSION_ID = simgrid::s4u::Activity::extension_create(); - simgrid::s4u::Exec::on_completion_cb( - [](simgrid::s4u::Exec const& exec) { exec.extension()->task_->complete(); }); - simgrid::s4u::Comm::on_completion_cb( - [](simgrid::s4u::Comm const& comm) { comm.extension()->task_->complete(); }); - simgrid::s4u::Io::on_completion_cb( - [](simgrid::s4u::Io const& io) { io.extension()->task_->complete(); }); -} - -/** @ingroup plugin_task - * @param n The number of executions to enqueue. - * @brief Enqueue executions. - * @note Immediatly starts an execution if possible. +/** @param n The number of firings to enqueue. + * @brief Enqueue firing. + * @note Immediatly fire an activity if possible. */ -void Task::enqueue_execs(int n) +void Task::enqueue_firings(int n) { simgrid::kernel::actor::simcall_answered([this, n] { - queued_execs_ += n; + queued_firings_ += n; if (ready_to_run()) fire(); }); } -/** @ingroup plugin_task - * @param amount The amount to set. +/** @param amount The amount to set. * @brief Set the amout of work to do. * @note Amount in flop for ExecTask and in bytes for CommTask. */ @@ -134,8 +108,34 @@ void Task::set_amount(double amount) simgrid::kernel::actor::simcall_answered([this, amount] { amount_ = amount; }); } -/** @ingroup plugin_task - * @param successor The Task to add. +/** @param token The token to set. + * @brief Set the token to send to successors. + * @note The token is passed to each successor after the task end, i.e., after the on_end callback. + */ +void Task::set_token(std::shared_ptr token) +{ + simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; }); +} + +/** @return Map of tokens received for the next execution. + * @note If there is no queued execution for this task the map might not exist or be partially empty. + */ +std::shared_ptr Task::get_next_token_from(TaskPtr t) +{ + return tokens_received_.front()[t]; +} + +void Task::fire() +{ + on_this_start(this); + on_start(this); + working_ = true; + queued_firings_ = std::max(queued_firings_ - 1, 0); + if (tokens_received_.size() > 0) + tokens_received_.pop_front(); +} + +/** @param successor The Task to add. * @brief Add a successor to this Task. * @note It also adds this as a predecessor of successor. */ @@ -147,8 +147,7 @@ void Task::add_successor(TaskPtr successor) }); } -/** @ingroup plugin_task - * @param successor The Task to remove. +/** @param successor The Task to remove. * @brief Remove a successor from this Task. * @note It also remove this from the predecessors of successor. */ @@ -171,34 +170,6 @@ void Task::remove_all_successors() }); } -/** @ingroup plugin_task - * @param func The function to set. - * @brief Set a function to be called before each execution. - * @note The function is called before the underlying Activity starts. - */ -void Task::on_this_start_cb(const std::function& func) -{ - simgrid::kernel::actor::simcall_answered([this, &func] { on_this_start_.connect(func); }); -} - -/** @ingroup plugin_task - * @param func The function to set. - * @brief Set a function to be called after each execution. - * @note The function is called after the underlying Activity ends, but before sending tokens to successors. - */ -void Task::on_this_end_cb(const std::function& func) -{ - simgrid::kernel::actor::simcall_answered([this, &func] { on_this_end_.connect(func); }); -} - -/** @ingroup plugin_task - * @brief Return the number of completed executions. - */ -int Task::get_count() const -{ - return count_; -} - /** * @brief Default constructor. */ @@ -215,7 +186,7 @@ ExecTaskPtr ExecTask::init(const std::string& name) /** @ingroup plugin_task * @brief Smart Constructor. */ -ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* host) +ExecTaskPtr ExecTask::init(const std::string& name, double flops, Host* host) { return init(name)->set_flops(flops)->set_host(host); } @@ -227,25 +198,18 @@ ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* hos */ void ExecTask::fire() { - on_this_start_(this); - Task::on_start(this); - working_ = true; - queued_execs_ = std::max(queued_execs_ - 1, 0); - s4u::ExecPtr exec = s4u::Exec::init(); - exec->set_name(name_); - exec->set_flops_amount(amount_); - exec->set_host(host_); + Task::fire(); + auto exec = Exec::init()->set_name(get_name())->set_flops_amount(get_amount())->set_host(host_); exec->start(); - exec->extension_set(new ExtendedAttributeActivity()); - exec->extension()->task_ = this; - current_activity_ = exec; + exec->on_this_completion_cb([this](Exec const&) { this->complete(); }); + set_current_activity(exec); } /** @ingroup plugin_task * @param host The host to set. * @brief Set a new host. */ -ExecTaskPtr ExecTask::set_host(s4u::Host* host) +ExecTaskPtr ExecTask::set_host(Host* host) { kernel::actor::simcall_answered([this, host] { host_ = host; }); return this; @@ -256,7 +220,7 @@ ExecTaskPtr ExecTask::set_host(s4u::Host* host) */ ExecTaskPtr ExecTask::set_flops(double flops) { - kernel::actor::simcall_answered([this, flops] { amount_ = flops; }); + kernel::actor::simcall_answered([this, flops] { set_amount(flops); }); return this; } @@ -276,7 +240,7 @@ CommTaskPtr CommTask::init(const std::string& name) /** @ingroup plugin_task * @brief Smart constructor. */ -CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* source, s4u::Host* destination) +CommTaskPtr CommTask::init(const std::string& name, double bytes, Host* source, Host* destination) { return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination); } @@ -288,24 +252,18 @@ CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* sou */ void CommTask::fire() { - on_this_start_(this); - Task::on_start(this); - working_ = true; - queued_execs_ = std::max(queued_execs_ - 1, 0); - s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_); - comm->set_name(name_); - comm->set_payload_size(amount_); + Task::fire(); + auto comm = Comm::sendto_init(source_, destination_)->set_name(get_name())->set_payload_size(get_amount()); comm->start(); - comm->extension_set(new ExtendedAttributeActivity()); - comm->extension()->task_ = this; - current_activity_ = comm; + comm->on_this_completion_cb([this](Comm const&) { this->complete(); }); + set_current_activity(comm); } /** @ingroup plugin_task * @param source The host to set. * @brief Set a new source host. */ -CommTaskPtr CommTask::set_source(s4u::Host* source) +CommTaskPtr CommTask::set_source(Host* source) { kernel::actor::simcall_answered([this, source] { source_ = source; }); return this; @@ -315,7 +273,7 @@ CommTaskPtr CommTask::set_source(s4u::Host* source) * @param destination The host to set. * @brief Set a new destination host. */ -CommTaskPtr CommTask::set_destination(s4u::Host* destination) +CommTaskPtr CommTask::set_destination(Host* destination) { kernel::actor::simcall_answered([this, destination] { destination_ = destination; }); return this; @@ -326,7 +284,7 @@ CommTaskPtr CommTask::set_destination(s4u::Host* destination) */ CommTaskPtr CommTask::set_bytes(double bytes) { - kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; }); + kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); }); return this; } @@ -346,7 +304,7 @@ IoTaskPtr IoTask::init(const std::string& name) /** @ingroup plugin_task * @brief Smart Constructor. */ -IoTaskPtr IoTask::init(const std::string& name, double bytes, s4u::Disk* disk, s4u::Io::OpType type) +IoTaskPtr IoTask::init(const std::string& name, double bytes, Disk* disk, Io::OpType type) { return init(name)->set_bytes(bytes)->set_disk(disk)->set_op_type(type); } @@ -355,7 +313,7 @@ IoTaskPtr IoTask::init(const std::string& name, double bytes, s4u::Disk* disk, s * @param disk The disk to set. * @brief Set a new disk. */ -IoTaskPtr IoTask::set_disk(s4u::Disk* disk) +IoTaskPtr IoTask::set_disk(Disk* disk) { kernel::actor::simcall_answered([this, disk] { disk_ = disk; }); return this; @@ -366,12 +324,12 @@ IoTaskPtr IoTask::set_disk(s4u::Disk* disk) */ IoTaskPtr IoTask::set_bytes(double bytes) { - kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; }); + kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); }); return this; } /** @ingroup plugin_task */ -IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type) +IoTaskPtr IoTask::set_op_type(Io::OpType type) { kernel::actor::simcall_answered([this, type] { type_ = type; }); return this; @@ -379,19 +337,11 @@ IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type) void IoTask::fire() { - on_this_start_(this); - Task::on_start(this); - working_ = true; - queued_execs_ = std::max(queued_execs_ - 1, 0); - s4u::IoPtr io = s4u::Io::init(); - io->set_name(name_); - io->set_size(amount_); - io->set_disk(disk_); - io->set_op_type(type_); + Task::fire(); + auto io = Io::init()->set_name(get_name())->set_size(get_amount())->set_disk(disk_)->set_op_type(type_); io->start(); - io->extension_set(new ExtendedAttributeActivity()); - io->extension()->task_ = this; - current_activity_ = io; + io->on_this_completion_cb([this](Io const&) { this->complete(); }); + set_current_activity(io); } -} // namespace simgrid::plugins +} // namespace simgrid::s4u