X-Git-Url: http://bilbo.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/e0f1a9dae032024c4f4043e29fa878fc2a6f7699..8dc1e5f8e4b854dc237e4fb6a90a205ec78dce2a:/src/plugins/operation.cpp diff --git a/src/plugins/operation.cpp b/src/plugins/operation.cpp index 27bb193f82..32ed8661b4 100644 --- a/src/plugins/operation.cpp +++ b/src/plugins/operation.cpp @@ -16,7 +16,7 @@ To activate this plugin, first call :cpp:func:`Operation::init`. Operations are designed to represent workflows, i.e, graphs of Operations. Operations can only be instancied using either -:cpp:func:`simgrid::plugins::ExecOp::create` or :cpp:func:`simgrid::plugins::CommOp::create` +:cpp:func:`simgrid::plugins::ExecOp::init` or :cpp:func:`simgrid::plugins::CommOp::init` An ExecOp is an Execution Operation. Its underlying Activity is an :ref:`Exec `. A CommOp is a Communication Operation. Its underlying Activity is a :ref:`Comm `. @@ -25,12 +25,11 @@ A CommOp is a Communication Operation. Its underlying Activity is a :ref:`Comm < XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Operation, kernel, "Logging specific to the operation plugin"); namespace simgrid::plugins { -Operation::Operation(const std::string& name, double amount) : name_(name), amount_(amount) {} -std::string Operation::get_name() -{ - return name_; -} +xbt::signal Operation::on_start; +xbt::signal Operation::on_end; + +Operation::Operation(const std::string& name) : name_(name) {} /** * @param predecessor The Operation to add. @@ -57,10 +56,7 @@ void Operation::remove_predecessor(Operation* predecessor) */ bool Operation::ready_to_run() const { - if (working_ or queued_execs_ <= 0) - return false; - else - return true; + return not working_ && queued_execs_ > 0; } /** @@ -82,7 +78,7 @@ void Operation::receive(Operation* source) break; } if (enough_tokens) { - for (auto [key, val] : predecessors_) + for (auto& [key, val] : predecessors_) val--; enqueue_execs(1); } @@ -91,8 +87,11 @@ void Operation::receive(Operation* source) /** * @brief Operation routine when finishing an execution. - * @note Set its working status as false. Add 1 to its count of finished executions. - * Call the on_end() func. Send a token to each of its successors. + * @note Set its working status as false. + * Add 1 to its count of finished executions. + * Call the on_this_end func. + * Fire on_end callback. + * Send a token to each of its successors. * Start a new execution if possible. */ void Operation::complete() @@ -101,7 +100,9 @@ void Operation::complete() working_ = false; count_++; }); - end_func_(this); + if (end_func_) + end_func_(this); + Operation::on_end(this); for (auto const& op : successors_) op->receive(this); if (ready_to_run()) @@ -118,7 +119,7 @@ void Operation::init() return; Operation::inited_ = true; ExtendedAttributeActivity::EXTENSION_ID = simgrid::s4u::Activity::extension_create(); - simgrid::s4u::Activity::on_completion_cb([&](simgrid::s4u::Activity const& activity) { + simgrid::s4u::Activity::on_completion_cb([](simgrid::s4u::Activity const& activity) { activity.extension()->operation_->complete(); }); } @@ -169,14 +170,25 @@ void Operation::remove_successor(OperationPtr successor) successor->remove_predecessor(this); } +void Operation::remove_all_successors() +{ + simgrid::kernel::actor::simcall_answered([this] { + while (not successors_.empty()) { + auto* successor = *(successors_.begin()); + successor->predecessors_.erase(this); + successors_.erase(successor); + } + }); +} + /** @ingroup plugin_operation * @param func The function to set. * @brief Set a function to be called before each execution. * @note The function is called before the underlying Activity starts. */ -void Operation::on_start(std::function func) +void Operation::on_this_start(const std::function& func) { - simgrid::kernel::actor::simcall_answered([this, func] { start_func_ = func; }); + simgrid::kernel::actor::simcall_answered([this, &func] { start_func_ = func; }); } /** @ingroup plugin_operation @@ -184,15 +196,15 @@ void Operation::on_start(std::function func) * @brief Set a function to be called after each execution. * @note The function is called after the underlying Activity ends, but before sending tokens to successors. */ -void Operation::on_end(std::function func) +void Operation::on_this_end(const std::function& func) { - simgrid::kernel::actor::simcall_answered([this, func] { end_func_ = func; }); + simgrid::kernel::actor::simcall_answered([this, &func] { end_func_ = func; }); } /** @ingroup plugin_operation * @brief Return the number of completed executions. */ -int Operation::get_count() +int Operation::get_count() const { return count_; } @@ -200,103 +212,139 @@ int Operation::get_count() /** * @brief Default constructor. */ -ExecOp::ExecOp(const std::string& name, double flops, simgrid::s4u::Host* host) : Operation(name, flops), host_(host) {} +ExecOp::ExecOp(const std::string& name) : Operation(name) {} + +/** @ingroup plugin_operation + * @brief Smart Constructor. + */ +ExecOpPtr ExecOp::init(const std::string& name) +{ + return ExecOpPtr(new ExecOp(name)); +} /** @ingroup plugin_operation * @brief Smart Constructor. */ -ExecOpPtr ExecOp::create(const std::string& name, double flops, simgrid::s4u::Host* host) +ExecOpPtr ExecOp::init(const std::string& name, double flops, s4u::Host* host) { - auto op = ExecOpPtr(new ExecOp(name, flops, host)); - return op; + return init(name)->set_flops(flops)->set_host(host); } /** * @brief Do one execution of the Operation. - * @note Call the on_start() func. Set its working status as true. - * Create and start the underlying Activity. + * @note Call the on_this_start() func. Set its working status as true. + * Init and start the underlying Activity. */ void ExecOp::execute() { - start_func_(this); - simgrid::kernel::actor::simcall_answered([this] { + if (start_func_) + start_func_(this); + Operation::on_start(this); + kernel::actor::simcall_answered([this] { working_ = true; queued_execs_ = std::max(queued_execs_ - 1, 0); }); - simgrid::s4u::ExecPtr exec = simgrid::s4u::Exec::init(); + s4u::ExecPtr exec = s4u::Exec::init(); exec->set_name(name_); exec->set_flops_amount(amount_); exec->set_host(host_); exec->start(); exec->extension_set(new ExtendedAttributeActivity()); exec->extension()->operation_ = this; - simgrid::kernel::actor::simcall_answered([this, exec] { current_activity_ = exec; }); + kernel::actor::simcall_answered([this, exec] { current_activity_ = exec; }); } /** @ingroup plugin_operation * @param host The host to set. * @brief Set a new host. */ -void ExecOp::set_host(simgrid::s4u::Host* host) +ExecOpPtr ExecOp::set_host(s4u::Host* host) { - simgrid::kernel::actor::simcall_answered([this, host] { host_ = host; }); + kernel::actor::simcall_answered([this, host] { host_ = host; }); + return this; +} + +/** @ingroup plugin_operation + * @param flops The amount of flops to set. + */ +ExecOpPtr ExecOp::set_flops(double flops) +{ + kernel::actor::simcall_answered([this, flops] { amount_ = flops; }); + return this; } /** * @brief Default constructor. */ -CommOp::CommOp(const std::string& name, double bytes, simgrid::s4u::Host* source, simgrid::s4u::Host* destination) - : Operation(name, bytes), source_(source), destination_(destination) +CommOp::CommOp(const std::string& name) : Operation(name) {} + +/** @ingroup plugin_operation + * @brief Smart constructor. + */ +CommOpPtr CommOp::init(const std::string& name) { + return CommOpPtr(new CommOp(name)); } /** @ingroup plugin_operation * @brief Smart constructor. */ -CommOpPtr CommOp::create(const std::string& name, double bytes, simgrid::s4u::Host* source, - simgrid::s4u::Host* destination) +CommOpPtr CommOp::init(const std::string& name, double bytes, s4u::Host* source, + s4u::Host* destination) { - auto op = CommOpPtr(new CommOp(name, bytes, source, destination)); - return op; + return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination); } /** * @brief Do one execution of the Operation. - * @note Call the on_start() func. Set its working status as true. - * Create and start the underlying Activity. + * @note Call the on_this_start() func. Set its working status as true. + * Init and start the underlying Activity. */ void CommOp::execute() { - start_func_(this); - simgrid::kernel::actor::simcall_answered([this] { + if (start_func_) + start_func_(this); + Operation::on_start(this); + kernel::actor::simcall_answered([this] { working_ = true; queued_execs_ = std::max(queued_execs_ - 1, 0); }); - simgrid::s4u::CommPtr comm = simgrid::s4u::Comm::sendto_init(source_, destination_); + s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_); comm->set_name(name_); comm->set_payload_size(amount_); comm->start(); comm->extension_set(new ExtendedAttributeActivity()); comm->extension()->operation_ = this; - simgrid::kernel::actor::simcall_answered([this, comm] { current_activity_ = comm; }); + kernel::actor::simcall_answered([this, comm] { current_activity_ = comm; }); } /** @ingroup plugin_operation * @param source The host to set. * @brief Set a new source host. */ -void CommOp::set_source(simgrid::s4u::Host* source) +CommOpPtr CommOp::set_source(s4u::Host* source) { - simgrid::kernel::actor::simcall_answered([this, source] { source_ = source; }); + kernel::actor::simcall_answered([this, source] { source_ = source; }); + return this; } /** @ingroup plugin_operation * @param destination The host to set. * @brief Set a new destination host. */ -void CommOp::set_destination(simgrid::s4u::Host* destination) +CommOpPtr CommOp::set_destination(s4u::Host* destination) +{ + kernel::actor::simcall_answered([this, destination] { destination_ = destination; }); + return this; +} + +/** @ingroup plugin_operation + * @param bytes The amount of bytes to set. + */ +CommOpPtr CommOp::set_bytes(double bytes) { - simgrid::kernel::actor::simcall_answered([this, destination] { destination_ = destination; }); + kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; }); + return this; } } // namespace simgrid::plugins