From: mlaurent Date: Tue, 27 Jun 2023 09:20:07 +0000 (+0200) Subject: Merge branch 'master' of https://framagit.org/simgrid/simgrid X-Git-Tag: v3.35~164^2~1 X-Git-Url: http://bilbo.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/1363ce9624f4327f3ad5c934b15736a776637dfd?hp=-c Merge branch 'master' of https://framagit.org/simgrid/simgrid --- 1363ce9624f4327f3ad5c934b15736a776637dfd diff --combined examples/cpp/task-io/s4u-task-io.cpp index 6301657dad,f5d3c1560c..c0f6eb0a3b --- a/examples/cpp/task-io/s4u-task-io.cpp +++ b/examples/cpp/task-io/s4u-task-io.cpp @@@ -3,7 -3,7 +3,7 @@@ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ - /* This example demonstrate basic use of the task plugin. + /* This example demonstrate basic use of tasks. * * We model the following graph: * @@@ -13,27 -13,27 +13,27 @@@ * comm is a communication task. */ - #include "simgrid/plugins/task.hpp" -#include "simgrid/s4u/Task.hpp" #include "simgrid/s4u.hpp" ++#include "simgrid/s4u/Task.hpp" #include XBT_LOG_NEW_DEFAULT_CATEGORY(task_simple, "Messages specific for this task example"); + namespace sg4 = simgrid::s4u; int main(int argc, char* argv[]) { - simgrid::s4u::Engine e(&argc, argv); + sg4::Engine e(&argc, argv); e.load_platform(argv[1]); - simgrid::plugins::Task::init(); // Retrieve hosts auto* bob = e.host_by_name("bob"); auto* carl = e.host_by_name("carl"); // Create tasks - auto exec1 = simgrid::plugins::ExecTask::init("exec1", 1e9, bob); - auto exec2 = simgrid::plugins::ExecTask::init("exec2", 1e9, carl); - auto write = simgrid::plugins::IoTask::init("write", 1e7, bob->get_disks().front(), simgrid::s4u::Io::OpType::WRITE); - auto read = simgrid::plugins::IoTask::init("read", 1e7, carl->get_disks().front(), simgrid::s4u::Io::OpType::READ); + auto exec1 = sg4::ExecTask::init("exec1", 1e9, bob); + auto exec2 = sg4::ExecTask::init("exec2", 1e9, carl); + auto write = sg4::IoTask::init("write", 1e7, bob->get_disks().front(), sg4::Io::OpType::WRITE); + auto read = sg4::IoTask::init("read", 1e7, carl->get_disks().front(), sg4::Io::OpType::READ); // Create the graph by defining dependencies between tasks exec1->add_successor(write); @@@ -41,12 -41,12 +41,11 @@@ read->add_successor(exec2); // Add a function to be called when tasks end for log purpose - simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) { - sg4::Task::on_completion_cb([](const sg4::Task* t) { -- XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); -- }); ++ sg4::Task::on_completion_cb( ++ [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); }); - // Enqueue two executions for task exec1 - exec1->enqueue_execs(2); + // Enqueue two firings for task exec1 + exec1->enqueue_firings(2); // Start the simulation e.run(); diff --combined examples/cpp/task-simple/s4u-task-simple.cpp index 3e9d14fa13,72ddf4c0d9..2d7224fc85 --- a/examples/cpp/task-simple/s4u-task-simple.cpp +++ b/examples/cpp/task-simple/s4u-task-simple.cpp @@@ -3,7 -3,7 +3,7 @@@ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ - /* This example demonstrate basic use of the task plugin. + /* This example demonstrate basic use of tasks. * * We model the following graph: * @@@ -13,37 -13,37 +13,36 @@@ * comm is a communication task. */ - #include "simgrid/plugins/task.hpp" #include "simgrid/s4u.hpp" XBT_LOG_NEW_DEFAULT_CATEGORY(task_simple, "Messages specific for this task example"); + namespace sg4 = simgrid::s4u; + int main(int argc, char* argv[]) { - simgrid::s4u::Engine e(&argc, argv); + sg4::Engine e(&argc, argv); e.load_platform(argv[1]); - simgrid::plugins::Task::init(); // Retrieve hosts auto* tremblay = e.host_by_name("Tremblay"); auto* jupiter = e.host_by_name("Jupiter"); // Create tasks - auto exec1 = simgrid::plugins::ExecTask::init("exec1", 1e9, tremblay); - auto exec2 = simgrid::plugins::ExecTask::init("exec2", 1e9, jupiter); - auto comm = simgrid::plugins::CommTask::init("comm", 1e7, tremblay, jupiter); + auto exec1 = sg4::ExecTask::init("exec1", 1e9, tremblay); + auto exec2 = sg4::ExecTask::init("exec2", 1e9, jupiter); + auto comm = sg4::CommTask::init("comm", 1e7, tremblay, jupiter); // Create the graph by defining dependencies between tasks exec1->add_successor(comm); comm->add_successor(exec2); // Add a function to be called when tasks end for log purpose - simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) { - sg4::Task::on_completion_cb([](const sg4::Task* t) { -- XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); -- }); ++ sg4::Task::on_completion_cb( ++ [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); }); - // Enqueue two executions for task exec1 - exec1->enqueue_execs(2); + // Enqueue two firings for task exec1 + exec1->enqueue_firings(2); // Start the simulation e.run(); diff --combined examples/cpp/task-storm/s4u-task-storm.cpp index 0000000000,2c4edb1cb4..d290ca01e9 mode 000000,100644..100644 --- a/examples/cpp/task-storm/s4u-task-storm.cpp +++ b/examples/cpp/task-storm/s4u-task-storm.cpp @@@ -1,0 -1,130 +1,123 @@@ + /* Copyright (c) 2017-2023. The SimGrid Team. All rights reserved. */ + + /* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + -/* This example takes the main concepts of Apache Storm presented here https://storm.apache.org/releases/2.4.0/Concepts.html - and use them to build a simulation of a stream processing application ++/* This example takes the main concepts of Apache Storm presented here ++ https://storm.apache.org/releases/2.4.0/Concepts.html and use them to build a simulation of a stream processing ++ application + + Spout SA produces data every 100ms. The volume produced is alternatively 1e3, 1e6 and 1e9 bytes. + Spout SB produces 1e6 bytes every 200ms. + - Bolt B1 and B2 processes data from Spout SA alternatively. The quantity of work to process this data is 10 flops per bytes - Bolt B3 processes data from Spout SB. - Bolt B4 processes data from Bolt B3. ++ Bolt B1 and B2 processes data from Spout SA alternatively. The quantity of work to process this data is 10 flops per ++ bytes Bolt B3 processes data from Spout SB. Bolt B4 processes data from Bolt B3. + + Fafard + ┌────┐ + ┌──►│ B1 │ + Tremblay │ └────┘ + ┌────┐ │ + │ SA ├────┤ Ginette + └────┘ │ ┌────┐ + └──►│ B2 │ + └────┘ + + + Bourassa + Jupiter ┌──────────┐ + ┌────┐ │ │ + │ SB ├─────┤ B3 ──► B4│ + └────┘ │ │ + └──────────┘ + */ + + #include "simgrid/s4u.hpp" + + XBT_LOG_NEW_DEFAULT_CATEGORY(task_storm, "Messages specific for this s4u example"); + namespace sg4 = simgrid::s4u; + + int main(int argc, char* argv[]) + { + sg4::Engine e(&argc, argv); + e.load_platform(argv[1]); + + // Retrieve hosts + auto tremblay = e.host_by_name("Tremblay"); + auto jupiter = e.host_by_name("Jupiter"); - auto fafard = e.host_by_name("Fafard"); ++ auto fafard = e.host_by_name("Fafard"); + auto ginette = e.host_by_name("Ginette"); + auto bourassa = e.host_by_name("Bourassa"); + + // Create execution tasks + auto SA = sg4::ExecTask::init("SA", tremblay->get_speed() * 0.1, tremblay); + auto SB = sg4::ExecTask::init("SB", jupiter->get_speed() * 0.2, jupiter); + auto B1 = sg4::ExecTask::init("B1", 1e8, fafard); + auto B2 = sg4::ExecTask::init("B2", 1e8, ginette); + auto B3 = sg4::ExecTask::init("B3", 1e8, bourassa); + auto B4 = sg4::ExecTask::init("B4", 2e8, bourassa); + + // Create communication tasks + auto SA_to_B1 = sg4::CommTask::init("SA_to_B1", 0, tremblay, fafard); + auto SA_to_B2 = sg4::CommTask::init("SA_to_B2", 0, tremblay, ginette); + auto SB_to_B3 = sg4::CommTask::init("SB_to_B3", 1e6, jupiter, bourassa); + + // Create the graph by defining dependencies between tasks + // Some dependencies are defined dynamically + SA_to_B1->add_successor(B1); + SA_to_B2->add_successor(B2); + SB->add_successor(SB_to_B3); + SB_to_B3->add_successor(B3); + B3->add_successor(B4); + + /* Dynamic modification of the graph and bytes sent + Alternatively we: remove/add the link between SA and SA_to_B2 + add/remove the link between SA and SA_to_B1 + */ - SA->on_this_start_cb([SA_to_B1,SA_to_B2](sg4::Task* t) { ++ SA->on_this_start_cb([SA_to_B1, SA_to_B2](sg4::Task* t) { + int count = t->get_count(); + sg4::CommTaskPtr comm; + if (count % 2 == 0) { + t->remove_successor(SA_to_B2); + t->add_successor(SA_to_B1); + comm = SA_to_B1; - } - else { ++ } else { + t->remove_successor(SA_to_B1); + t->add_successor(SA_to_B2); + comm = SA_to_B2; + } - std::vector amount = {1e3,1e6,1e9}; ++ std::vector amount = {1e3, 1e6, 1e9}; + comm->set_amount(amount[count % 3]); + auto token = std::make_shared(); + token->set_data(new double(amount[count % 3])); + t->set_token(token); + }); + + // The token sent by SA is forwarded by both communication tasks - SA_to_B1->on_this_start_cb([SA](sg4::Task* t) { - t->set_token(t->get_next_token_from(SA)); - }); - SA_to_B2->on_this_start_cb([SA](sg4::Task* t) { - t->set_token(t->get_next_token_from(SA)); - }); ++ SA_to_B1->on_this_start_cb([SA](sg4::Task* t) { t->set_token(t->get_next_token_from(SA)); }); ++ SA_to_B2->on_this_start_cb([SA](sg4::Task* t) { t->set_token(t->get_next_token_from(SA)); }); + + /* B1 and B2 read the value of the token received by their predecessors + and use it to adapt their amount of work to do. + */ + B1->on_this_start_cb([SA_to_B1](sg4::Task* t) { + auto data = t->get_next_token_from(SA_to_B1)->get_unique_data(); + t->set_amount(*data * 10); + }); + B2->on_this_start_cb([SA_to_B2](sg4::Task* t) { + auto data = t->get_next_token_from(SA_to_B2)->get_unique_data(); + t->set_amount(*data * 10); + }); + + // Enqueue firings for tasks without predecessors + SA->enqueue_firings(5); + SB->enqueue_firings(5); + + // Add a function to be called when tasks end for log purpose - sg4::Task::on_completion_cb([] - (const sg4::Task* t) { - XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); - }); ++ sg4::Task::on_completion_cb( ++ [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); }); + + // Start the simulation + e.run(); + return 0; + } diff --combined examples/cpp/task-switch-host/s4u-task-switch-host.cpp index 6ebca129da,7023f68b48..252f3382e3 --- a/examples/cpp/task-switch-host/s4u-task-switch-host.cpp +++ b/examples/cpp/task-switch-host/s4u-task-switch-host.cpp @@@ -16,16 -16,15 +16,15 @@@ * With exec1 and exec2 on different hosts. */ - #include "simgrid/plugins/task.hpp" #include "simgrid/s4u.hpp" XBT_LOG_NEW_DEFAULT_CATEGORY(task_switch_host, "Messages specific for this task example"); + namespace sg4 = simgrid::s4u; int main(int argc, char* argv[]) { - simgrid::s4u::Engine e(&argc, argv); + sg4::Engine e(&argc, argv); e.load_platform(argv[1]); - simgrid::plugins::Task::init(); // Retrieve hosts auto* tremblay = e.host_by_name("Tremblay"); @@@ -33,13 -32,13 +32,13 @@@ auto* fafard = e.host_by_name("Fafard"); // Create tasks - auto comm0 = simgrid::plugins::CommTask::init("comm0"); + auto comm0 = sg4::CommTask::init("comm0"); comm0->set_bytes(1e7); comm0->set_source(tremblay); - auto exec1 = simgrid::plugins::ExecTask::init("exec1", 1e9, jupiter); - auto exec2 = simgrid::plugins::ExecTask::init("exec2", 1e9, fafard); - auto comm1 = simgrid::plugins::CommTask::init("comm1", 1e7, jupiter, tremblay); - auto comm2 = simgrid::plugins::CommTask::init("comm2", 1e7, fafard, tremblay); + auto exec1 = sg4::ExecTask::init("exec1", 1e9, jupiter); + auto exec2 = sg4::ExecTask::init("exec2", 1e9, fafard); + auto comm1 = sg4::CommTask::init("comm1", 1e7, jupiter, tremblay); + auto comm2 = sg4::CommTask::init("comm2", 1e7, fafard, tremblay); // Create the initial graph by defining dependencies between tasks comm0->add_successor(exec2); @@@ -47,15 -46,14 +46,13 @@@ exec2->add_successor(comm2); // Add a function to be called when tasks end for log purpose - simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) { - sg4::Task::on_completion_cb([](const sg4::Task* t) { -- XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); -- }); ++ sg4::Task::on_completion_cb( ++ [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); }); - // Add a function to be called before each executions of comm0 + // Add a function to be called before each firing of comm0 // This function modifies the graph of tasks by adding or removing // successors to comm0 - comm0->on_this_start_cb([exec1, exec2, jupiter, fafard](simgrid::plugins::Task* t) { - auto* comm0 = dynamic_cast(t); + comm0->on_this_start_cb([comm0, exec1, exec2, jupiter, fafard](sg4::Task*) { static int count = 0; if (count % 2 == 0) { comm0->set_destination(jupiter); @@@ -69,8 -67,8 +66,8 @@@ count++; }); - // Enqueue four executions for task comm0 - comm0->enqueue_execs(4); + // Enqueue four firings for task comm0 + comm0->enqueue_firings(4); // Start the simulation e.run(); diff --combined examples/cpp/task-variable-load/s4u-task-variable-load.cpp index 960fb660a6,0955c2e2a4..fff790dc68 --- a/examples/cpp/task-variable-load/s4u-task-variable-load.cpp +++ b/examples/cpp/task-variable-load/s4u-task-variable-load.cpp @@@ -13,50 -13,49 +13,48 @@@ * With a heavy load there is a burst of comm before the exec task can even finish once. */ - #include "simgrid/plugins/task.hpp" #include "simgrid/s4u.hpp" XBT_LOG_NEW_DEFAULT_CATEGORY(task_variable_load, "Messages specific for this s4u example"); + namespace sg4 = simgrid::s4u; - static void variable_load(simgrid::plugins::TaskPtr t) + static void variable_load(sg4::TaskPtr t) { XBT_INFO("--- Small load ---"); for (int i = 0; i < 3; i++) { - t->enqueue_execs(1); - simgrid::s4u::this_actor::sleep_for(100); + t->enqueue_firings(1); + sg4::this_actor::sleep_for(100); } - simgrid::s4u::this_actor::sleep_until(1000); + sg4::this_actor::sleep_until(1000); XBT_INFO("--- Heavy load ---"); for (int i = 0; i < 3; i++) { - t->enqueue_execs(1); - simgrid::s4u::this_actor::sleep_for(1); + t->enqueue_firings(1); + sg4::this_actor::sleep_for(1); } } int main(int argc, char* argv[]) { - simgrid::s4u::Engine e(&argc, argv); + sg4::Engine e(&argc, argv); e.load_platform(argv[1]); - simgrid::plugins::Task::init(); // Retreive hosts auto* tremblay = e.host_by_name("Tremblay"); auto* jupiter = e.host_by_name("Jupiter"); // Create tasks - auto comm = simgrid::plugins::CommTask::init("comm", 1e7, tremblay, jupiter); - auto exec = simgrid::plugins::ExecTask::init("exec", 1e9, jupiter); + auto comm = sg4::CommTask::init("comm", 1e7, tremblay, jupiter); + auto exec = sg4::ExecTask::init("exec", 1e9, jupiter); // Create the graph by defining dependencies between tasks comm->add_successor(exec); // Add a function to be called when tasks end for log purpose - simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) { - sg4::Task::on_completion_cb([](const sg4::Task* t) { -- XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); -- }); ++ sg4::Task::on_completion_cb( ++ [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); }); // Create the actor that will inject load during the simulation - simgrid::s4u::Actor::create("input", tremblay, variable_load, comm); + sg4::Actor::create("input", tremblay, variable_load, comm); // Start the simulation e.run(); diff --combined include/simgrid/s4u/Activity.hpp index 3c6a4eb725,4f73b17506..46ab38b77e --- a/include/simgrid/s4u/Activity.hpp +++ b/include/simgrid/s4u/Activity.hpp @@@ -114,10 -114,6 +114,6 @@@ protected virtual void fire_on_this_veto() const = 0; public: - XBT_ATTRIB_DEPRECATED_v334("All start() are vetoable now. Please use start() ") void vetoable_start() - { - start(); - } void start() { state_ = State::STARTING; @@@ -267,10 -263,10 +263,16 @@@ public * dependency or no resource assigned) */ void on_this_veto_cb(const std::function& cb) { on_this_veto.connect(cb); } - XBT_ATTRIB_DEPRECATED_v337("Please use on_suspend_cb() instead") static void on_suspended_cb( - const std::function& cb) { on_suspend.connect(cb); } - XBT_ATTRIB_DEPRECATED_v337("Please use on_resume_cb() instead") static void on_resumed_cb( - const std::function& cb) { on_resume.connect(cb); } + XBT_ATTRIB_DEPRECATED_v338("Please use on_suspend_cb() instead") static void on_suspended_cb( - const std::function& cb) { on_suspend.connect(cb); } ++ const std::function& cb) ++ { ++ on_suspend.connect(cb); ++ } + XBT_ATTRIB_DEPRECATED_v338("Please use on_resume_cb() instead") static void on_resumed_cb( - const std::function& cb) { on_resume.connect(cb); } ++ const std::function& cb) ++ { ++ on_resume.connect(cb); ++ } AnyActivity* add_successor(ActivityPtr a) { @@@ -299,20 -295,6 +301,6 @@@ } const std::string& get_tracing_category() const { return tracing_category_; } - XBT_ATTRIB_DEPRECATED_v334("Please use Activity::set_data()") AnyActivity* set_user_data(void* data) - { - set_data(data); - return static_cast(this); - } - - XBT_ATTRIB_DEPRECATED_v334("Please use Activity::get_data<>()") void* get_user_data() const - { - return get_data(); - } - XBT_ATTRIB_DEPRECATED_v334("All start() are vetoable now. Please use start() ") AnyActivity* vetoable_start() - { - return start(); - } AnyActivity* start() { Activity::start(); diff --combined include/simgrid/s4u/Link.hpp index 3e0832bba5,13a92bc7d5..c79ca462ac --- a/include/simgrid/s4u/Link.hpp +++ b/include/simgrid/s4u/Link.hpp @@@ -133,10 -133,10 +133,7 @@@ public double get_load() const; #ifndef DOXYGEN - XBT_ATTRIB_DEPRECATED_v337("Please use get_load() instead") double get_usage() const - XBT_ATTRIB_DEPRECATED_v338("Please use get_load() instead") double get_usage() const -- { -- return get_load(); -- } ++ XBT_ATTRIB_DEPRECATED_v338("Please use get_load() instead") double get_usage() const { return get_load(); } #endif /** @brief Check if the Link is used (at least one flow uses the link) */ @@@ -202,7 -202,7 +199,7 @@@ public on_this_destruction.connect(cb); } - XBT_ATTRIB_DEPRECATED_v337("Please use on_onoff_cb() instead") static void on_state_change_cb( + XBT_ATTRIB_DEPRECATED_v338("Please use on_onoff_cb() instead") static void on_state_change_cb( const std::function& cb) { on_onoff.connect(cb); diff --combined include/simgrid/s4u/Task.hpp index 8143333d86,c0677da108..5c8050b86f --- a/include/simgrid/s4u/Task.hpp +++ b/include/simgrid/s4u/Task.hpp @@@ -1,16 -1,17 +1,17 @@@ - #ifndef SIMGRID_PLUGINS_TASK_H_ - #define SIMGRID_PLUGINS_TASK_H_ + #ifndef SIMGRID_S4U_TASK_H_ + #define SIMGRID_S4U_TASK_H_ #include #include #include #include + #include #include #include #include - namespace simgrid::plugins { + namespace simgrid::s4u { class Task; using TaskPtr = boost::intrusive_ptr; @@@ -23,59 -24,68 +24,68 @@@ using CommTaskPtr = boost::intrusive_pt class IoTask; using IoTaskPtr = boost::intrusive_ptr; - struct ExtendedAttributeActivity { - static simgrid::xbt::Extension EXTENSION_ID; - Task* task_; - }; + class XBT_PUBLIC Token : public xbt::Extendable {}; class Task { + std::string name_; + double amount_; + int queued_firings_ = 0; - int count_ = 0; - bool working_ = false; ++ int count_ = 0; ++ bool working_ = false; + std::set successors_ = {}; std::map predecessors_ = {}; + std::atomic_int_fast32_t refcount_{0}; bool ready_to_run() const; void receive(Task* source); - void complete(); + + std::shared_ptr token_ = nullptr; + std::deque>> tokens_received_; + ActivityPtr previous_activity_; + ActivityPtr current_activity_; protected: - std::string name_; - double amount_; - int queued_execs_ = 0; - int count_ = 0; - bool working_ = false; - s4u::ActivityPtr previous_activity_; - s4u::ActivityPtr current_activity_; - xbt::signal on_this_start_; - xbt::signal on_this_end_; explicit Task(const std::string& name); -- virtual ~Task() = default; - virtual void fire() = 0; ++ virtual ~Task() = default; - static xbt::signal on_start; - static xbt::signal on_end; - std::atomic_int_fast32_t refcount_{0}; + virtual void fire(); + void complete(); + - void set_current_activity (ActivityPtr a) { current_activity_ = a; } ++ void set_current_activity(ActivityPtr a) { current_activity_ = a; } + + inline static xbt::signal on_start; + xbt::signal on_this_start; + inline static xbt::signal on_completion; + xbt::signal on_this_completion; public: - static void init(); const std::string& get_name() const { return name_; } const char* get_cname() const { return name_.c_str(); } - void enqueue_execs(int n); void set_amount(double amount); double get_amount() const { return amount_; } + int get_count() const { return count_; } + + void set_token(std::shared_ptr token); + std::shared_ptr get_next_token_from(TaskPtr t); + void add_successor(TaskPtr t); void remove_successor(TaskPtr t); void remove_all_successors(); const std::set& get_successors() const { return successors_; } - void on_this_start_cb(const std::function& func); - void on_this_end_cb(const std::function& func); - int get_count() const; - /** Add a callback fired before a task activity start. + void enqueue_firings(int n); + + /** Add a callback fired before this task activity starts */ - void on_this_start_cb(const std::function& func){ on_this_start.connect(func); } ++ void on_this_start_cb(const std::function& func) { on_this_start.connect(func); } + /** Add a callback fired before a task activity starts. * Triggered after the on_this_start function**/ static void on_start_cb(const std::function& cb) { on_start.connect(cb); } - /** Add a callback fired after a task activity end. - * Triggered after the on_this_end function, but before - * sending tokens to successors.**/ - static void on_end_cb(const std::function& cb) { on_end.connect(cb); } + /** Add a callback fired before this task activity ends */ + void on_this_completion_cb(const std::function& func) { on_this_completion.connect(func); }; + /** Add a callback fired after a task activity ends. + * Triggered after the on_this_end function, but before sending tokens to successors.**/ + static void on_completion_cb(const std::function& cb) { on_completion.connect(cb); } #ifndef DOXYGEN friend void intrusive_ptr_release(Task* o) @@@ -89,54 -99,57 +99,57 @@@ #endif }; - class ExecTask : public Task { - s4u::Host* host_; + class CommTask : public Task { + Host* source_; + Host* destination_; - explicit ExecTask(const std::string& name); + explicit CommTask(const std::string& name); void fire() override; public: - static ExecTaskPtr init(const std::string& name); - static ExecTaskPtr init(const std::string& name, double flops, s4u::Host* host); - ExecTaskPtr set_host(s4u::Host* host); - s4u::Host* get_host() const { return host_; } - ExecTaskPtr set_flops(double flops); - double get_flops() const { return get_amount(); } + static CommTaskPtr init(const std::string& name); + static CommTaskPtr init(const std::string& name, double bytes, Host* source, Host* destination); + + CommTaskPtr set_source(Host* source); + Host* get_source() const { return source_; } + CommTaskPtr set_destination(Host* destination); + Host* get_destination() const { return destination_; } + CommTaskPtr set_bytes(double bytes); + double get_bytes() const { return get_amount(); } }; - class CommTask : public Task { - s4u::Host* source_; - s4u::Host* destination_; + class ExecTask : public Task { + Host* host_; - explicit CommTask(const std::string& name); + explicit ExecTask(const std::string& name); void fire() override; public: - static CommTaskPtr init(const std::string& name); - static CommTaskPtr init(const std::string& name, double bytes, s4u::Host* source, s4u::Host* destination); - CommTaskPtr set_source(s4u::Host* source); - s4u::Host* get_source() const { return source_; } - CommTaskPtr set_destination(s4u::Host* destination); - s4u::Host* get_destination() const { return destination_; } - CommTaskPtr set_bytes(double bytes); - double get_bytes() const { return get_amount(); } + static ExecTaskPtr init(const std::string& name); + static ExecTaskPtr init(const std::string& name, double flops, Host* host); + + ExecTaskPtr set_host(Host* host); + Host* get_host() const { return host_; } + ExecTaskPtr set_flops(double flops); + double get_flops() const { return get_amount(); } }; class IoTask : public Task { - s4u::Disk* disk_; - s4u::Io::OpType type_; + Disk* disk_; + Io::OpType type_; explicit IoTask(const std::string& name); void fire() override; public: static IoTaskPtr init(const std::string& name); - static IoTaskPtr init(const std::string& name, double bytes, s4u::Disk* disk, s4u::Io::OpType type); - IoTaskPtr set_disk(s4u::Disk* disk); - s4u::Disk* get_disk() const { return disk_; } + static IoTaskPtr init(const std::string& name, double bytes, Disk* disk, Io::OpType type); + + IoTaskPtr set_disk(Disk* disk); + Disk* get_disk() const { return disk_; } IoTaskPtr set_bytes(double bytes); double get_bytes() const { return get_amount(); } - IoTaskPtr set_op_type(s4u::Io::OpType type); - s4u::Io::OpType get_op_type() const { return type_; } + IoTaskPtr set_op_type(Io::OpType type); + Io::OpType get_op_type() const { return type_; } }; - } // namespace simgrid::plugins + } // namespace simgrid::s4u #endif diff --combined src/bindings/python/simgrid_python.cpp index 5f0f33599c,9f0c715caf..268750fb19 --- a/src/bindings/python/simgrid_python.cpp +++ b/src/bindings/python/simgrid_python.cpp @@@ -11,7 -11,6 +11,6 @@@ #include "simgrid/kernel/ProfileBuilder.hpp" #include "simgrid/kernel/routing/NetPoint.hpp" #include - #include #include #include #include @@@ -25,6 -24,7 +24,7 @@@ #include #include #include + #include #include #include @@@ -33,30 -33,30 +33,30 @@@ #include namespace py = pybind11; - using simgrid::plugins::CommTask; - using simgrid::plugins::CommTaskPtr; - using simgrid::plugins::ExecTask; - using simgrid::plugins::ExecTaskPtr; - using simgrid::plugins::IoTask; - using simgrid::plugins::IoTaskPtr; - using simgrid::plugins::Task; - using simgrid::plugins::TaskPtr; -using simgrid::s4u::CommTask; -using simgrid::s4u::CommTaskPtr; -using simgrid::s4u::ExecTask; -using simgrid::s4u::ExecTaskPtr; -using simgrid::s4u::IoTask; -using simgrid::s4u::IoTaskPtr; -using simgrid::s4u::Task; -using simgrid::s4u::TaskPtr; using simgrid::s4u::Actor; using simgrid::s4u::ActorPtr; using simgrid::s4u::Barrier; using simgrid::s4u::BarrierPtr; using simgrid::s4u::Comm; using simgrid::s4u::CommPtr; ++using simgrid::s4u::CommTask; ++using simgrid::s4u::CommTaskPtr; using simgrid::s4u::Disk; using simgrid::s4u::Engine; ++using simgrid::s4u::ExecTask; ++using simgrid::s4u::ExecTaskPtr; using simgrid::s4u::Host; using simgrid::s4u::Io; ++using simgrid::s4u::IoTask; ++using simgrid::s4u::IoTaskPtr; using simgrid::s4u::Link; using simgrid::s4u::Mailbox; using simgrid::s4u::Mutex; using simgrid::s4u::MutexPtr; using simgrid::s4u::Semaphore; using simgrid::s4u::SemaphorePtr; ++using simgrid::s4u::Task; ++using simgrid::s4u::TaskPtr; XBT_LOG_NEW_DEFAULT_CATEGORY(python, "python"); @@@ -171,54 -171,16 +171,16 @@@ PYBIND11_MODULE(simgrid, m return new simgrid::s4u::Engine(&argc, argv.data()); }), "The constructor should take the parameters from the command line, as is ") - .def_static("get_clock", - []() // XBT_ATTRIB_DEPRECATED_v334 - { - PyErr_WarnEx( - PyExc_DeprecationWarning, - "get_clock() is deprecated and will be dropped after v3.33, use `Engine.clock` instead.", 1); - return Engine::get_clock(); - }) .def_property_readonly_static( "clock", [](py::object /* self */) { return Engine::get_clock(); }, "The simulation time, ie the amount of simulated seconds since the simulation start.") .def_property_readonly_static( "instance", [](py::object /* self */) { return Engine::get_instance(); }, "Retrieve the simulation engine") - .def("get_all_hosts", - [](py::object self) // XBT_ATTRIB_DEPRECATED_v334 - { - PyErr_WarnEx(PyExc_DeprecationWarning, - "get_all_hosts() is deprecated and will be dropped after v3.33, use all_hosts instead.", 1); - return self.attr("all_hosts"); - }) .def("host_by_name", &Engine::host_by_name_or_null, "Retrieve a host by its name, or None if it does not exist in the platform.") .def_property_readonly("all_hosts", &Engine::get_all_hosts, "Returns the list of all hosts found in the platform") - .def("get_all_links", - [](py::object self) // XBT_ATTRIB_DEPRECATED_v334 - { - PyErr_WarnEx(PyExc_DeprecationWarning, - "get_all_links() is deprecated and will be dropped after v3.33, use all_links instead.", 1); - return self.attr("all_links"); - }) .def_property_readonly("all_links", &Engine::get_all_links, "Returns the list of all links found in the platform") - .def("get_all_netpoints", - [](py::object self) // XBT_ATTRIB_DEPRECATED_v334 - { - PyErr_WarnEx( - PyExc_DeprecationWarning, - "get_all_netpoints() is deprecated and will be dropped after v3.33, use all_netpoints instead.", 1); - return self.attr("all_netpoints"); - }) .def_property_readonly("all_netpoints", &Engine::get_all_netpoints) - .def("get_netzone_root", - [](py::object self) // XBT_ATTRIB_DEPRECATED_v334 - { - PyErr_WarnEx( - PyExc_DeprecationWarning, - "get_netzone_root() is deprecated and will be dropped after v3.33, use netzone_root instead.", 1); - return self.attr("netzone_root"); - }) .def_property_readonly("netzone_root", &Engine::get_netzone_root, "Retrieve the root netzone, containing all others.") .def("netpoint_by_name", &Engine::netpoint_by_name_or_null) @@@ -311,13 -273,6 +273,6 @@@ .def("create_router", &simgrid::s4u::NetZone::create_router, "Create a router") .def("set_parent", &simgrid::s4u::NetZone::set_parent, "Set the parent of this zone") .def("set_property", &simgrid::s4u::NetZone::set_property, "Add a property to this zone") - .def("get_netpoint", - [](py::object self) // XBT_ATTRIB_DEPRECATED_v334 - { - PyErr_WarnEx(PyExc_DeprecationWarning, - "get_netpoint() is deprecated and will be dropped after v3.33, use netpoint instead.", 1); - return self.attr("netpoint"); - }) .def_property_readonly("netpoint", &simgrid::s4u::NetZone::get_netpoint, "Retrieve the netpoint associated to this zone") .def("seal", &simgrid::s4u::NetZone::seal, "Seal this NetZone") @@@ -392,42 -347,11 +347,11 @@@ " \"\"\"\n\n" "The second function parameter is the periodicity: the time to wait after the last event to start again over " "the list. Set it to -1 to not loop over.") - .def("get_pstate_count", - [](py::object self) // XBT_ATTRIB_DEPRECATED_v334 - { - PyErr_WarnEx( - PyExc_DeprecationWarning, - "get_pstate_count() is deprecated and will be dropped after v3.33, use pstate_count instead.", 1); - return self.attr("pstate_count"); - }) .def_property_readonly("pstate_count", &Host::get_pstate_count, "Retrieve the count of defined pstate levels") - .def("get_pstate_speed", - [](py::object self, int state) // XBT_ATTRIB_DEPRECATED_v334 - { - PyErr_WarnEx( - PyExc_DeprecationWarning, - "get_pstate_speed() is deprecated and will be dropped after v3.33, use pstate_speed instead.", 1); - return self.attr("pstate_speed")(state); - }) .def("pstate_speed", &Host::get_pstate_speed, "Retrieve the maximal speed at the given pstate") - .def("get_netpoint", - [](py::object self) // XBT_ATTRIB_DEPRECATED_v334 - { - PyErr_WarnEx(PyExc_DeprecationWarning, - "get_netpoint() is deprecated and will be dropped after v3.33, use netpoint instead.", 1); - return self.attr("netpoint"); - }) .def_property_readonly("netpoint", &Host::get_netpoint, "Retrieve the netpoint associated to this zone") .def_property_readonly("disks", &Host::get_disks, "The list of disks on this host (read-only).") .def("get_disks", &Host::get_disks, "Retrieve the list of disks in this host") - .def("set_core_count", - [](py::object self, double count) // XBT_ATTRIB_DEPRECATED_v334 - { - PyErr_WarnEx(PyExc_DeprecationWarning, - "set_core_count() is deprecated and will be dropped after v3.33, use core_count instead.", - 1); - self.attr("core_count")(count); - }) .def_property("core_count", &Host::get_core_count, py::cpp_function(&Host::set_core_count, py::call_guard()), "Manage the number of cores in the CPU") @@@ -622,21 -546,7 +546,7 @@@ /* Class Split-Duplex Link */ py::class_>( m, "SplitDuplexLink", "Network split-duplex link") - .def("get_link_up", - [](py::object self) // XBT_ATTRIB_DEPRECATED_v334 - { - PyErr_WarnEx(PyExc_DeprecationWarning, - "get_link_up() is deprecated and will be dropped after v3.33, use link_up instead.", 1); - return self.attr("link_up"); - }) .def_property_readonly("link_up", &simgrid::s4u::SplitDuplexLink::get_link_up, "Get link direction up") - .def("get_link_down", - [](py::object self) // XBT_ATTRIB_DEPRECATED_v334 - { - PyErr_WarnEx(PyExc_DeprecationWarning, - "get_link_down() is deprecated and will be dropped after v3.33, use link_down instead.", 1); - return self.attr("link_down"); - }) .def_property_readonly("link_down", &simgrid::s4u::SplitDuplexLink::get_link_down, "Get link direction down"); /* Class Mailbox */ @@@ -921,7 -831,6 +831,6 @@@ /* Class Task */ py::class_(m, "Task", "Task. See the C++ documentation for details.") - .def_static("init", &Task::init) .def_static( "on_start_cb", [](py::object cb) { @@@ -934,11 -843,11 +843,11 @@@ }, "Add a callback called when each task starts.") .def_static( - "on_end_cb", + "on_completion_cb", [](py::object cb) { cb.inc_ref(); // keep alive after return const py::gil_scoped_release gil_release; - Task::on_end_cb([cb_p = cb.ptr()](Task* op) { + Task::on_completion_cb([cb_p = cb.ptr()](Task* op) { const py::gil_scoped_acquire py_context; // need a new context for callback py::reinterpret_borrow(cb_p)(op); }); @@@ -948,8 -857,8 +857,8 @@@ .def_property_readonly("count", &Task::get_count, "The execution count of this task (read-only).") .def_property_readonly("successors", &Task::get_successors, "The successors of this task (read-only).") .def_property("amount", &Task::get_amount, &Task::set_amount, "The amount of work to do for this task.") - .def("enqueue_execs", py::overload_cast(&Task::enqueue_execs), py::call_guard(), - py::arg("n"), "Enqueue executions for this task.") + .def("enqueue_firings", py::overload_cast(&Task::enqueue_firings), py::call_guard(), + py::arg("n"), "Enqueue firings for this task.") .def("add_successor", py::overload_cast(&Task::add_successor), py::call_guard(), py::arg("op"), "Add a successor to this task.") .def("remove_successor", py::overload_cast(&Task::remove_successor), @@@ -958,7 -867,7 +867,7 @@@ "Remove all successors of this task.") .def("on_this_start_cb", py::overload_cast&>(&Task::on_this_start_cb), py::arg("func"), "Add a callback called when this task starts.") - .def("on_this_end_cb", py::overload_cast&>(&Task::on_this_end_cb), + .def("on_this_completion_cb", py::overload_cast&>(&Task::on_this_completion_cb), py::arg("func"), "Add a callback called when this task ends.") .def( "__repr__", [](const TaskPtr op) { return "Task(" + op->get_name() + ")"; }, diff --combined src/mc/api/strategy/MaxMatchComm.hpp index a2f1d601a5,1f9a826746..0bc3dade6a --- a/src/mc/api/strategy/MaxMatchComm.hpp +++ b/src/mc/api/strategy/MaxMatchComm.hpp @@@ -13,7 -13,6 +13,6 @@@ namespace simgrid::mc /** Wait MC guiding class that aims at minimizing the number of in-fly communication. * When possible, it will try to match corresponding in-fly communications. */ class MaxMatchComm : public Strategy { - /** Stores for each mailbox what kind of transition is waiting on it. * Negative number means that much recv are waiting on that mailbox, while * a positiv number means that much send are waiting there. */ @@@ -56,10 -55,9 +55,9 @@@ public if (mailbox_.count(cast_recv->get_mailbox()) > 0 and mailbox_.at(cast_recv->get_mailbox()) > 0) { aid_value--; // This means we have waiting recv corresponding to this recv -- } else { -- aid_value++; - -- } ++ } else { ++ aid_value++; ++ } } const CommSendTransition* cast_send = dynamic_cast(transition); @@@ -92,7 -90,6 +90,6 @@@ if (cast_send != nullptr) last_mailbox_ = cast_send->get_mailbox(); } - }; } // namespace simgrid::mc diff --combined src/s4u/s4u_Comm.cpp index d9a610138a,315a2f82f4..a36df4e27e --- a/src/s4u/s4u_Comm.cpp +++ b/src/s4u/s4u_Comm.cpp @@@ -29,7 -29,7 +29,7 @@@ CommPtr Comm::set_copy_data_callback(co } void Comm::copy_buffer_callback(kernel::activity::CommImpl* comm, void* buff, - size_t buff_size) // XBT_ATTRIB_DEPRECATED_v337 + size_t buff_size) // XBT_ATTRIB_DEPRECATED_v338 { XBT_DEBUG("Copy the data over"); memcpy(comm->dst_buff_, buff, buff_size); @@@ -41,7 -41,7 +41,7 @@@ } void Comm::copy_pointer_callback(kernel::activity::CommImpl* comm, void* buff, - size_t buff_size) // XBT_ATTRIB_DEPRECATED_v337 + size_t buff_size) // XBT_ATTRIB_DEPRECATED_v338 { xbt_assert((buff_size == sizeof(void*)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size); *(void**)(comm->dst_buff_) = buff; @@@ -78,13 -78,12 +78,13 @@@ void Comm::send(kernel::actor::ActorImp simgrid::kernel::activity::ActivityImplPtr comm = nullptr; simgrid::kernel::actor::CommIsendSimcall send_observer{ - sender, mbox->get_impl(), task_size, rate, static_cast(src_buff), src_buff_size, match_fun, - nullptr, copy_data_fun, data, false}; + sender, mbox->get_impl(), task_size, rate, static_cast(src_buff), + src_buff_size, match_fun, nullptr, copy_data_fun, data, + false, "Isend"}; comm = simgrid::kernel::actor::simcall_answered( [&send_observer] { return simgrid::kernel::activity::CommImpl::isend(&send_observer); }, &send_observer); - if (simgrid::kernel::actor::ActivityWaitSimcall wait_observer{sender, comm.get(), timeout}; + if (simgrid::kernel::actor::ActivityWaitSimcall wait_observer{sender, comm.get(), timeout, "Wait"}; simgrid::kernel::actor::simcall_blocking( [&wait_observer] { wait_observer.get_activity()->wait_for(wait_observer.get_issuer(), wait_observer.get_timeout()); @@@ -96,7 -95,7 +96,7 @@@ } else { simgrid::kernel::actor::CommIsendSimcall observer(sender, mbox->get_impl(), task_size, rate, static_cast(src_buff), src_buff_size, match_fun, - nullptr, copy_data_fun, data, false); + nullptr, copy_data_fun, data, false, "Isend"); simgrid::kernel::actor::simcall_blocking([&observer, timeout] { simgrid::kernel::activity::ActivityImplPtr comm = simgrid::kernel::activity::CommImpl::isend(&observer); comm->wait_for(observer.get_issuer(), timeout); @@@ -123,12 -122,11 +123,12 @@@ void Comm::recv(kernel::actor::ActorImp match_fun, copy_data_fun, data, - rate}; + rate, + "Irecv"}; comm = simgrid::kernel::actor::simcall_answered( [&observer] { return simgrid::kernel::activity::CommImpl::irecv(&observer); }, &observer); - if (simgrid::kernel::actor::ActivityWaitSimcall wait_observer{receiver, comm.get(), timeout}; + if (simgrid::kernel::actor::ActivityWaitSimcall wait_observer{receiver, comm.get(), timeout, "wait"}; simgrid::kernel::actor::simcall_blocking( [&wait_observer] { wait_observer.get_activity()->wait_for(wait_observer.get_issuer(), wait_observer.get_timeout()); @@@ -139,7 -137,7 +139,7 @@@ comm = nullptr; } else { simgrid::kernel::actor::CommIrecvSimcall observer(receiver, mbox->get_impl(), static_cast(dst_buff), - dst_buff_size, match_fun, copy_data_fun, data, rate); + dst_buff_size, match_fun, copy_data_fun, data, rate, "Irecv"); simgrid::kernel::actor::simcall_blocking([&observer, timeout] { simgrid::kernel::activity::ActivityImplPtr comm = simgrid::kernel::activity::CommImpl::irecv(&observer); comm->wait_for(observer.get_issuer(), timeout); @@@ -333,8 -331,7 +333,8 @@@ Comm* Comm::do_start( clean_fun_, copy_data_function_, get_data(), - detached_}; + detached_, + "Isend"}; pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::isend(&observer); }, &observer); } else if (dst_buff_ != nullptr) { // Receiver side @@@ -348,8 -345,7 +348,8 @@@ match_fun_, copy_data_function_, get_data(), - rate_}; + rate_, + "Irecv"}; pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::irecv(&observer); }, &observer); } else { @@@ -426,7 -422,7 +426,7 @@@ Comm* Comm::wait_for(double timeout case State::STARTED: try { issuer = kernel::actor::ActorImpl::self(); - kernel::actor::ActivityWaitSimcall observer{issuer, pimpl_.get(), timeout}; + kernel::actor::ActivityWaitSimcall observer{issuer, pimpl_.get(), timeout, "Wait"}; if (kernel::actor::simcall_blocking( [&observer] { observer.get_activity()->wait_for(observer.get_issuer(), observer.get_timeout()); }, &observer)) { diff --combined src/s4u/s4u_Task.cpp index dca0c9013d,bda7ec170e..b6d8ccd7ff --- a/src/s4u/s4u_Task.cpp +++ b/src/s4u/s4u_Task.cpp @@@ -1,23 -1,21 +1,21 @@@ + #include #include - #include -#include #include #include #include ++#include #include #include "src/simgrid/module.hpp" SIMGRID_REGISTER_PLUGIN(task, "Battery management", nullptr) - /** @defgroup plugin_task plugin_task Plugin Task - + /** @beginrst - This is the task plugin, enabling management of Tasks. - To activate this plugin, first call :cpp:func:`Task::init`. Tasks are designed to represent dataflows, i.e, graphs of Tasks. Tasks can only be instancied using either - :cpp:func:`simgrid::plugins::ExecTask::init` or :cpp:func:`simgrid::plugins::CommTask::init` + :cpp:func:`simgrid::s4u::ExecTask::init` or :cpp:func:`simgrid::s4u::CommTask::init` An ExecTask is an Execution Task. Its underlying Activity is an :ref:`Exec `. A CommTask is a Communication Task. Its underlying Activity is a :ref:`Comm `. @@@ -25,12 -23,7 +23,7 @@@ */ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plugin"); - namespace simgrid::plugins { - - xbt::Extension ExtendedAttributeActivity::EXTENSION_ID; - - xbt::signal Task::on_start; - xbt::signal Task::on_end; + namespace simgrid::s4u { Task::Task(const std::string& name) : name_(name) {} @@@ -40,7 -33,7 +33,7 @@@ */ bool Task::ready_to_run() const { - return not working_ && queued_execs_ > 0; + return not working_ && queued_firings_ > 0; } /** @@@ -52,8 -45,11 +45,11 @@@ void Task::receive(Task* source) { XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str()); - predecessors_[source]++; - bool enough_tokens = true; + auto source_count = predecessors_[source]++; + if (tokens_received_.size() <= queued_firings_ + source_count) + tokens_received_.push_back({}); + tokens_received_[queued_firings_ + source_count][source] = source->token_; - bool enough_tokens = true; ++ bool enough_tokens = true; for (auto const& [key, val] : predecessors_) if (val < 1) { enough_tokens = false; @@@ -62,7 -58,7 +58,7 @@@ if (enough_tokens) { for (auto& [key, val] : predecessors_) val--; - enqueue_execs(1); + enqueue_firings(1); } } @@@ -77,11 -73,11 +73,11 @@@ */ void Task::complete() { - xbt_assert(s4u::Actor::is_maestro()); + xbt_assert(Actor::is_maestro()); working_ = false; count_++; - on_this_end_(this); - Task::on_end(this); + on_this_completion(this); + on_completion(this); if (current_activity_) previous_activity_ = std::move(current_activity_); for (auto const& t : successors_) @@@ -90,42 -86,20 +86,20 @@@ fire(); } - /** @ingroup plugin_task - * @brief Init the Task plugin. - * @note Add a completion callback to all Activities to call Task::complete(). - */ - void Task::init() - { - static bool inited = false; - if (inited) - return; - - inited = true; - ExtendedAttributeActivity::EXTENSION_ID = simgrid::s4u::Activity::extension_create(); - simgrid::s4u::Exec::on_completion_cb( - [](simgrid::s4u::Exec const& exec) { exec.extension()->task_->complete(); }); - simgrid::s4u::Comm::on_completion_cb( - [](simgrid::s4u::Comm const& comm) { comm.extension()->task_->complete(); }); - simgrid::s4u::Io::on_completion_cb( - [](simgrid::s4u::Io const& io) { io.extension()->task_->complete(); }); - } - - /** @ingroup plugin_task - * @param n The number of executions to enqueue. - * @brief Enqueue executions. - * @note Immediatly starts an execution if possible. + /** @param n The number of firings to enqueue. + * @brief Enqueue firing. + * @note Immediatly fire an activity if possible. */ - void Task::enqueue_execs(int n) + void Task::enqueue_firings(int n) { simgrid::kernel::actor::simcall_answered([this, n] { - queued_execs_ += n; + queued_firings_ += n; if (ready_to_run()) fire(); }); } - /** @ingroup plugin_task - * @param amount The amount to set. + /** @param amount The amount to set. * @brief Set the amout of work to do. * @note Amount in flop for ExecTask and in bytes for CommTask. */ @@@ -134,8 -108,33 +108,34 @@@ void Task::set_amount(double amount simgrid::kernel::actor::simcall_answered([this, amount] { amount_ = amount; }); } - /** @ingroup plugin_task - * @param successor The Task to add. + /** @param token The token to set. + * @brief Set the token to send to successors. + * @note The token is passed to each successor after the task end, i.e., after the on_end callback. + */ + void Task::set_token(std::shared_ptr token) + { + simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; }); + } + + /** @return Map of tokens received for the next execution. + * @note If there is no queued execution for this task the map might not exist or be partially empty. + */ + std::shared_ptr Task::get_next_token_from(TaskPtr t) + { + return tokens_received_.front()[t]; + } + -void Task::fire() { ++void Task::fire() ++{ + on_this_start(this); + on_start(this); - working_ = true; ++ working_ = true; + queued_firings_ = std::max(queued_firings_ - 1, 0); + if (tokens_received_.size() > 0) + tokens_received_.pop_front(); + } + + /** @param successor The Task to add. * @brief Add a successor to this Task. * @note It also adds this as a predecessor of successor. */ @@@ -147,8 -146,7 +147,7 @@@ void Task::add_successor(TaskPtr succes }); } - /** @ingroup plugin_task - * @param successor The Task to remove. + /** @param successor The Task to remove. * @brief Remove a successor from this Task. * @note It also remove this from the predecessors of successor. */ @@@ -171,34 -169,6 +170,6 @@@ void Task::remove_all_successors( }); } - /** @ingroup plugin_task - * @param func The function to set. - * @brief Set a function to be called before each execution. - * @note The function is called before the underlying Activity starts. - */ - void Task::on_this_start_cb(const std::function& func) - { - simgrid::kernel::actor::simcall_answered([this, &func] { on_this_start_.connect(func); }); - } - - /** @ingroup plugin_task - * @param func The function to set. - * @brief Set a function to be called after each execution. - * @note The function is called after the underlying Activity ends, but before sending tokens to successors. - */ - void Task::on_this_end_cb(const std::function& func) - { - simgrid::kernel::actor::simcall_answered([this, &func] { on_this_end_.connect(func); }); - } - - /** @ingroup plugin_task - * @brief Return the number of completed executions. - */ - int Task::get_count() const - { - return count_; - } - /** * @brief Default constructor. */ @@@ -215,7 -185,7 +186,7 @@@ ExecTaskPtr ExecTask::init(const std::s /** @ingroup plugin_task * @brief Smart Constructor. */ - ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* host) + ExecTaskPtr ExecTask::init(const std::string& name, double flops, Host* host) { return init(name)->set_flops(flops)->set_host(host); } @@@ -227,25 -197,18 +198,18 @@@ */ void ExecTask::fire() { - on_this_start_(this); - Task::on_start(this); - working_ = true; - queued_execs_ = std::max(queued_execs_ - 1, 0); - s4u::ExecPtr exec = s4u::Exec::init(); - exec->set_name(name_); - exec->set_flops_amount(amount_); - exec->set_host(host_); + Task::fire(); + auto exec = Exec::init()->set_name(get_name())->set_flops_amount(get_amount())->set_host(host_); exec->start(); - exec->extension_set(new ExtendedAttributeActivity()); - exec->extension()->task_ = this; - current_activity_ = exec; + exec->on_this_completion_cb([this](Exec const&) { this->complete(); }); + set_current_activity(exec); } /** @ingroup plugin_task * @param host The host to set. * @brief Set a new host. */ - ExecTaskPtr ExecTask::set_host(s4u::Host* host) + ExecTaskPtr ExecTask::set_host(Host* host) { kernel::actor::simcall_answered([this, host] { host_ = host; }); return this; @@@ -256,7 -219,7 +220,7 @@@ */ ExecTaskPtr ExecTask::set_flops(double flops) { - kernel::actor::simcall_answered([this, flops] { amount_ = flops; }); + kernel::actor::simcall_answered([this, flops] { set_amount(flops); }); return this; } @@@ -276,7 -239,7 +240,7 @@@ CommTaskPtr CommTask::init(const std::s /** @ingroup plugin_task * @brief Smart constructor. */ - CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* source, s4u::Host* destination) + CommTaskPtr CommTask::init(const std::string& name, double bytes, Host* source, Host* destination) { return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination); } @@@ -288,24 -251,18 +252,18 @@@ */ void CommTask::fire() { - on_this_start_(this); - Task::on_start(this); - working_ = true; - queued_execs_ = std::max(queued_execs_ - 1, 0); - s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_); - comm->set_name(name_); - comm->set_payload_size(amount_); + Task::fire(); + auto comm = Comm::sendto_init(source_, destination_)->set_name(get_name())->set_payload_size(get_amount()); comm->start(); - comm->extension_set(new ExtendedAttributeActivity()); - comm->extension()->task_ = this; - current_activity_ = comm; + comm->on_this_completion_cb([this](Comm const&) { this->complete(); }); + set_current_activity(comm); } /** @ingroup plugin_task * @param source The host to set. * @brief Set a new source host. */ - CommTaskPtr CommTask::set_source(s4u::Host* source) + CommTaskPtr CommTask::set_source(Host* source) { kernel::actor::simcall_answered([this, source] { source_ = source; }); return this; @@@ -315,7 -272,7 +273,7 @@@ * @param destination The host to set. * @brief Set a new destination host. */ - CommTaskPtr CommTask::set_destination(s4u::Host* destination) + CommTaskPtr CommTask::set_destination(Host* destination) { kernel::actor::simcall_answered([this, destination] { destination_ = destination; }); return this; @@@ -326,7 -283,7 +284,7 @@@ */ CommTaskPtr CommTask::set_bytes(double bytes) { - kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; }); + kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); }); return this; } @@@ -346,7 -303,7 +304,7 @@@ IoTaskPtr IoTask::init(const std::strin /** @ingroup plugin_task * @brief Smart Constructor. */ - IoTaskPtr IoTask::init(const std::string& name, double bytes, s4u::Disk* disk, s4u::Io::OpType type) + IoTaskPtr IoTask::init(const std::string& name, double bytes, Disk* disk, Io::OpType type) { return init(name)->set_bytes(bytes)->set_disk(disk)->set_op_type(type); } @@@ -355,7 -312,7 +313,7 @@@ * @param disk The disk to set. * @brief Set a new disk. */ - IoTaskPtr IoTask::set_disk(s4u::Disk* disk) + IoTaskPtr IoTask::set_disk(Disk* disk) { kernel::actor::simcall_answered([this, disk] { disk_ = disk; }); return this; @@@ -366,12 -323,12 +324,12 @@@ */ IoTaskPtr IoTask::set_bytes(double bytes) { - kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; }); + kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); }); return this; } /** @ingroup plugin_task */ - IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type) + IoTaskPtr IoTask::set_op_type(Io::OpType type) { kernel::actor::simcall_answered([this, type] { type_ = type; }); return this; @@@ -379,19 -336,11 +337,11 @@@ void IoTask::fire() { - on_this_start_(this); - Task::on_start(this); - working_ = true; - queued_execs_ = std::max(queued_execs_ - 1, 0); - s4u::IoPtr io = s4u::Io::init(); - io->set_name(name_); - io->set_size(amount_); - io->set_disk(disk_); - io->set_op_type(type_); + Task::fire(); + auto io = Io::init()->set_name(get_name())->set_size(get_amount())->set_disk(disk_)->set_op_type(type_); io->start(); - io->extension_set(new ExtendedAttributeActivity()); - io->extension()->task_ = this; - current_activity_ = io; + io->on_this_completion_cb([this](Io const&) { this->complete(); }); + set_current_activity(io); } - } // namespace simgrid::plugins + } // namespace simgrid::s4u