X-Git-Url: http://bilbo.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/74c678b8644aa28d345490294709922d4c489bf5..1363ce9624f4327f3ad5c934b15736a776637dfd:/src/s4u/s4u_Task.cpp diff --git a/src/s4u/s4u_Task.cpp b/src/s4u/s4u_Task.cpp index c0a6e38e90..b6d8ccd7ff 100644 --- a/src/s4u/s4u_Task.cpp +++ b/src/s4u/s4u_Task.cpp @@ -1,9 +1,9 @@ #include #include -#include #include #include #include +#include #include #include "src/simgrid/module.hpp" @@ -25,9 +25,6 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plug namespace simgrid::s4u { -xbt::signal Task::on_start; -xbt::signal Task::on_end; - Task::Task(const std::string& name) : name_(name) {} /** @@ -36,7 +33,7 @@ Task::Task(const std::string& name) : name_(name) {} */ bool Task::ready_to_run() const { - return not working_ && queued_execs_ > 0; + return not working_ && queued_firings_ > 0; } /** @@ -49,10 +46,10 @@ void Task::receive(Task* source) { XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str()); auto source_count = predecessors_[source]++; - if (tokens_received_.size() <= queued_execs_ + source_count) + if (tokens_received_.size() <= queued_firings_ + source_count) tokens_received_.push_back({}); - tokens_received_[queued_execs_ + source_count][source] = source->token_; - bool enough_tokens = true; + tokens_received_[queued_firings_ + source_count][source] = source->token_; + bool enough_tokens = true; for (auto const& [key, val] : predecessors_) if (val < 1) { enough_tokens = false; @@ -61,7 +58,7 @@ void Task::receive(Task* source) if (enough_tokens) { for (auto& [key, val] : predecessors_) val--; - enqueue_execs(1); + enqueue_firings(1); } } @@ -76,11 +73,11 @@ void Task::receive(Task* source) */ void Task::complete() { - xbt_assert(s4u::Actor::is_maestro()); + xbt_assert(Actor::is_maestro()); working_ = false; count_++; - on_this_end_(this); - Task::on_end(this); + on_this_completion(this); + on_completion(this); if 
(current_activity_) previous_activity_ = std::move(current_activity_); for (auto const& t : successors_) @@ -89,14 +86,14 @@ void Task::complete() fire(); } -/** @param n The number of executions to enqueue. - * @brief Enqueue executions. - * @note Immediatly starts an execution if possible. +/** @param n The number of firings to enqueue. + * @brief Enqueue firings. + * @note Immediately fires an activity if possible. */ -void Task::enqueue_execs(int n) +void Task::enqueue_firings(int n) { simgrid::kernel::actor::simcall_answered([this, n] { - queued_execs_ += n; + queued_firings_ += n; if (ready_to_run()) fire(); }); @@ -128,6 +125,16 @@ std::shared_ptr Task::get_next_token_from(TaskPtr t) return tokens_received_.front()[t]; } +void Task::fire() +{ + on_this_start(this); + on_start(this); + working_ = true; + queued_firings_ = std::max(queued_firings_ - 1, 0); + if (tokens_received_.size() > 0) + tokens_received_.pop_front(); +} + /** @param successor The Task to add. * @brief Add a successor to this Task. * @note It also adds this as a predecessor of successor. @@ -163,34 +170,6 @@ void Task::remove_all_successors() }); } -/** @ingroup plugin_task - * @param func The function to set. - * @brief Set a function to be called before each execution. - * @note The function is called before the underlying Activity starts. - */ -void Task::on_this_start_cb(const std::function& func) -{ - simgrid::kernel::actor::simcall_answered([this, &func] { on_this_start_.connect(func); }); -} - -/** @ingroup plugin_task - * @param func The function to set. - * @brief Set a function to be called after each execution. - * @note The function is called after the underlying Activity ends, but before sending tokens to successors. - */ -void Task::on_this_end_cb(const std::function& func) -{ - simgrid::kernel::actor::simcall_answered([this, &func] { on_this_end_.connect(func); }); -} - -/** @ingroup plugin_task - * @brief Return the number of completed executions. 
- */ -int Task::get_count() const -{ - return count_; -} - /** * @brief Default constructor. */ @@ -207,7 +186,7 @@ ExecTaskPtr ExecTask::init(const std::string& name) /** @ingroup plugin_task * @brief Smart Constructor. */ -ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* host) +ExecTaskPtr ExecTask::init(const std::string& name, double flops, Host* host) { return init(name)->set_flops(flops)->set_host(host); } @@ -219,26 +198,18 @@ ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* hos */ void ExecTask::fire() { - on_this_start_(this); - Task::on_start(this); - working_ = true; - queued_execs_ = std::max(queued_execs_ - 1, 0); - if (tokens_received_.size() > 0) - tokens_received_.pop_front(); - s4u::ExecPtr exec = s4u::Exec::init(); - exec->set_name(name_); - exec->set_flops_amount(amount_); - exec->set_host(host_); + Task::fire(); + auto exec = Exec::init()->set_name(get_name())->set_flops_amount(get_amount())->set_host(host_); exec->start(); - exec->on_this_completion_cb([this](Exec const& exec) { this->complete(); }); - current_activity_ = exec; + exec->on_this_completion_cb([this](Exec const&) { this->complete(); }); + set_current_activity(exec); } /** @ingroup plugin_task * @param host The host to set. * @brief Set a new host. */ -ExecTaskPtr ExecTask::set_host(s4u::Host* host) +ExecTaskPtr ExecTask::set_host(Host* host) { kernel::actor::simcall_answered([this, host] { host_ = host; }); return this; @@ -249,7 +220,7 @@ ExecTaskPtr ExecTask::set_host(s4u::Host* host) */ ExecTaskPtr ExecTask::set_flops(double flops) { - kernel::actor::simcall_answered([this, flops] { amount_ = flops; }); + kernel::actor::simcall_answered([this, flops] { set_amount(flops); }); return this; } @@ -269,7 +240,7 @@ CommTaskPtr CommTask::init(const std::string& name) /** @ingroup plugin_task * @brief Smart constructor. 
*/ -CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* source, s4u::Host* destination) +CommTaskPtr CommTask::init(const std::string& name, double bytes, Host* source, Host* destination) { return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination); } @@ -281,25 +252,18 @@ CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* sou */ void CommTask::fire() { - on_this_start_(this); - Task::on_start(this); - working_ = true; - queued_execs_ = std::max(queued_execs_ - 1, 0); - if (tokens_received_.size() > 0) - tokens_received_.pop_front(); - s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_); - comm->set_name(name_); - comm->set_payload_size(amount_); + Task::fire(); + auto comm = Comm::sendto_init(source_, destination_)->set_name(get_name())->set_payload_size(get_amount()); comm->start(); - comm->on_this_completion_cb([this](Comm const& comm) { this->complete(); }); - current_activity_ = comm; + comm->on_this_completion_cb([this](Comm const&) { this->complete(); }); + set_current_activity(comm); } /** @ingroup plugin_task * @param source The host to set. * @brief Set a new source host. */ -CommTaskPtr CommTask::set_source(s4u::Host* source) +CommTaskPtr CommTask::set_source(Host* source) { kernel::actor::simcall_answered([this, source] { source_ = source; }); return this; @@ -309,7 +273,7 @@ CommTaskPtr CommTask::set_source(s4u::Host* source) * @param destination The host to set. * @brief Set a new destination host. 
*/ -CommTaskPtr CommTask::set_destination(s4u::Host* destination) +CommTaskPtr CommTask::set_destination(Host* destination) { kernel::actor::simcall_answered([this, destination] { destination_ = destination; }); return this; @@ -320,7 +284,7 @@ CommTaskPtr CommTask::set_destination(s4u::Host* destination) */ CommTaskPtr CommTask::set_bytes(double bytes) { - kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; }); + kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); }); return this; } @@ -340,7 +304,7 @@ IoTaskPtr IoTask::init(const std::string& name) /** @ingroup plugin_task * @brief Smart Constructor. */ -IoTaskPtr IoTask::init(const std::string& name, double bytes, s4u::Disk* disk, s4u::Io::OpType type) +IoTaskPtr IoTask::init(const std::string& name, double bytes, Disk* disk, Io::OpType type) { return init(name)->set_bytes(bytes)->set_disk(disk)->set_op_type(type); } @@ -349,7 +313,7 @@ IoTaskPtr IoTask::init(const std::string& name, double bytes, s4u::Disk* disk, s * @param disk The disk to set. * @brief Set a new disk. 
*/ -IoTaskPtr IoTask::set_disk(s4u::Disk* disk) +IoTaskPtr IoTask::set_disk(Disk* disk) { kernel::actor::simcall_answered([this, disk] { disk_ = disk; }); return this; @@ -360,12 +324,12 @@ IoTaskPtr IoTask::set_disk(s4u::Disk* disk) */ IoTaskPtr IoTask::set_bytes(double bytes) { - kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; }); + kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); }); return this; } /** @ingroup plugin_task */ -IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type) +IoTaskPtr IoTask::set_op_type(Io::OpType type) { kernel::actor::simcall_answered([this, type] { type_ = type; }); return this; @@ -373,20 +337,11 @@ IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type) void IoTask::fire() { - on_this_start_(this); - Task::on_start(this); - working_ = true; - queued_execs_ = std::max(queued_execs_ - 1, 0); - if (tokens_received_.size() > 0) - tokens_received_.pop_front(); - s4u::IoPtr io = s4u::Io::init(); - io->set_name(name_); - io->set_size(amount_); - io->set_disk(disk_); - io->set_op_type(type_); + Task::fire(); + auto io = Io::init()->set_name(get_name())->set_size(get_amount())->set_disk(disk_)->set_op_type(type_); io->start(); - io->on_this_completion_cb([this](Io const& io) { this->complete(); }); - current_activity_ = io; + io->on_this_completion_cb([this](Io const&) { this->complete(); }); + set_current_activity(io); } } // namespace simgrid::s4u