From e98ff738e16a6e5a18426c8686407511f4c0c8a4 Mon Sep 17 00:00:00 2001 From: Adrien Gougeon Date: Mon, 2 Oct 2023 16:03:23 +0200 Subject: [PATCH 1/1] fix all Task examples to support the new Task format with dispatcher and collector --- .../s4u-task-parallelism.tesh | 22 +++++++-------- examples/cpp/task-storm/s4u-task-storm.cpp | 27 ++++++++++++------- examples/cpp/task-storm/s4u-task-storm.tesh | 4 +-- .../task-switch-host/s4u-task-switch-host.cpp | 11 ++++++-- examples/python/task-io/task-io.py | 2 +- examples/python/task-simple/task-simple.py | 2 +- examples/python/task-simple/task-simple.tesh | 4 +-- .../task-switch-host/task-switch-host.py | 27 ++++++++++++------- .../task-variable-load/task-variable-load.py | 2 +- include/simgrid/s4u/Task.hpp | 6 +++-- src/bindings/python/simgrid_python.cpp | 7 ++++- src/s4u/s4u_Task.cpp | 18 +++++++------ 12 files changed, 82 insertions(+), 50 deletions(-) diff --git a/examples/cpp/task-parallelism/s4u-task-parallelism.tesh b/examples/cpp/task-parallelism/s4u-task-parallelism.tesh index 938ad9677f..492d015adb 100644 --- a/examples/cpp/task-parallelism/s4u-task-parallelism.tesh +++ b/examples/cpp/task-parallelism/s4u-task-parallelism.tesh @@ -2,40 +2,40 @@ $ ${bindir:=.}/s4u-task-parallelism ${platfdir}/three_multicore_hosts.xml > [0.000000] [task_parallelism/INFO] Task exec_A start -> [100.000000] [task_parallelism/INFO] Task exec_A finished (1) > [100.000000] [task_parallelism/INFO] Task exec_A start +> [100.000000] [task_parallelism/INFO] Task exec_A finished (1) > [200.000000] [task_parallelism/INFO] Task exec_A finished (2) > [300.000000] [task_parallelism/INFO] Task exec_A start > [300.000000] [task_parallelism/INFO] Task exec_A start -> [400.000000] [task_parallelism/INFO] Task exec_A finished (3) > [400.000000] [task_parallelism/INFO] Task exec_A start -> [400.000000] [task_parallelism/INFO] Task exec_A finished (4) > [400.000000] [task_parallelism/INFO] Task exec_A start +> [400.000000] [task_parallelism/INFO] Task exec_A finished (3) +> [400.000000] [task_parallelism/INFO] Task exec_A finished (4) > [500.000000] [task_parallelism/INFO] Task exec_A finished (5) > [500.000000] [task_parallelism/INFO] Task exec_A finished (6) > [600.000000] [task_parallelism/INFO] Task exec_A start -> [700.000000] [task_parallelism/INFO] Task exec_A finished (7) > [700.000000] [task_parallelism/INFO] Task exec_A start +> [700.000000] [task_parallelism/INFO] Task exec_A finished (7) > [800.000000] [task_parallelism/INFO] Task exec_A finished (8) > [900.000000] [task_parallelism/INFO] Task exec_A start > [900.000000] [task_parallelism/INFO] Task exec_A start -> [1000.000000] [task_parallelism/INFO] Task exec_A finished (9) > [1000.000000] [task_parallelism/INFO] Task exec_A start -> [1000.000000] [task_parallelism/INFO] Task exec_A finished (10) > [1000.000000] [task_parallelism/INFO] Task exec_A start +> [1000.000000] [task_parallelism/INFO] Task exec_A finished (9) +> [1000.000000] [task_parallelism/INFO] Task exec_A finished (10) +> [1100.000000] [task_parallelism/INFO] Task exec_A start > [1100.000000] [task_parallelism/INFO] Task exec_A finished (11) > [1100.000000] [task_parallelism/INFO] Task exec_A finished (12) -> [1100.000000] [task_parallelism/INFO] Task exec_A start -> [1200.000000] [task_parallelism/INFO] Task exec_A finished (13) > [1200.000000] [task_parallelism/INFO] Task exec_A start +> [1200.000000] [task_parallelism/INFO] Task exec_A finished (13) > [1250.000000] [task_parallelism/INFO] Task exec_A start > [1250.000000] [task_parallelism/INFO] Task exec_A start -> [1300.000000] [task_parallelism/INFO] Task exec_A finished (14) > [1300.000000] [task_parallelism/INFO] Task exec_A start -> [1350.000000] [task_parallelism/INFO] Task exec_A finished (15) +> [1300.000000] [task_parallelism/INFO] Task exec_A finished (14) > [1350.000000] [task_parallelism/INFO] Task exec_A start -> [1350.000000] [task_parallelism/INFO] Task exec_A finished (16) > [1350.000000] [task_parallelism/INFO] Task exec_A start +> [1350.000000] [task_parallelism/INFO] Task exec_A finished (15) +> [1350.000000] [task_parallelism/INFO] Task exec_A finished (16) > [1400.000000] [task_parallelism/INFO] Task exec_A finished (17) > [1450.000000] [task_parallelism/INFO] Task exec_A finished (18) > [1450.000000] [task_parallelism/INFO] Task exec_A finished (19) \ No newline at end of file diff --git a/examples/cpp/task-storm/s4u-task-storm.cpp b/examples/cpp/task-storm/s4u-task-storm.cpp index 58d08c379c..0a0ab7143b 100644 --- a/examples/cpp/task-storm/s4u-task-storm.cpp +++ b/examples/cpp/task-storm/s4u-task-storm.cpp @@ -74,10 +74,10 @@ int main(int argc, char* argv[]) Alternatively we: remove/add the link between SA and SA_to_B2 add/remove the link between SA and SA_to_B1 */ - SA->on_this_start_cb([SA_to_B1, SA_to_B2](sg4::Task* t) { + SA->on_this_completion_cb([&SA_to_B1, &SA_to_B2](sg4::Task* t) { int count = t->get_count(); sg4::CommTaskPtr comm; - if (count % 2 == 0) { + if (count % 2 == 1) { t->remove_successor(SA_to_B2); t->add_successor(SA_to_B1); comm = SA_to_B1; @@ -86,7 +86,8 @@ int main(int argc, char* argv[]) t->add_successor(SA_to_B2); comm = SA_to_B2; } - std::vector amount = {1e3, 1e6, 1e9}; + std::vector amount = {1e9, 1e3, 1e6}; + // XBT_INFO("Comm %f", amount[count % 3]); comm->set_amount(amount[count % 3]); auto token = std::make_shared(); token->set_data(new double(amount[count % 3])); @@ -94,18 +95,26 @@ int main(int argc, char* argv[]) }); // The token sent by SA is forwarded by both communication tasks - SA_to_B1->on_this_start_cb([&SA](sg4::Task* t) { t->set_token(t->get_next_token_from(SA)); }); - SA_to_B2->on_this_start_cb([&SA](sg4::Task* t) { t->set_token(t->get_next_token_from(SA)); }); + SA_to_B1->on_this_completion_cb([&SA](sg4::Task* t) { + t->set_token(t->get_token_from(SA)); + t->deque_token_from(SA); + }); + SA_to_B2->on_this_completion_cb([&SA](sg4::Task* t) { + t->set_token(t->get_token_from(SA)); + t->deque_token_from(SA); + }); /* B1 and B2 read the value of the token received by their predecessors and use it to adapt their amount of work to do. */ - B1->on_this_start_cb([SA_to_B1](sg4::Task* t) { - auto data = t->get_next_token_from(SA_to_B1)->get_unique_data(); + B1->on_this_start_cb([&SA_to_B1](sg4::Task* t) { + auto data = t->get_token_from(SA_to_B1)->get_data(); + t->deque_token_from(SA_to_B1); t->set_amount(*data * 10); }); - B2->on_this_start_cb([SA_to_B2](sg4::Task* t) { - auto data = t->get_next_token_from(SA_to_B2)->get_unique_data(); + B2->on_this_start_cb([&SA_to_B2](sg4::Task* t) { + auto data = t->get_token_from(SA_to_B2)->get_data(); + t->deque_token_from(SA_to_B2); t->set_amount(*data * 10); }); diff --git a/examples/cpp/task-storm/s4u-task-storm.tesh b/examples/cpp/task-storm/s4u-task-storm.tesh index d7c364a837..376dc31a47 100644 --- a/examples/cpp/task-storm/s4u-task-storm.tesh +++ b/examples/cpp/task-storm/s4u-task-storm.tesh @@ -24,11 +24,11 @@ $ ${bindir:=.}/s4u-task-storm ${platfdir}/small_platform.xml > [1.798442] [task_storm/INFO] Task SB_to_B3 finished (5) > [2.619232] [task_storm/INFO] Task B3 finished (1) > [6.743624] [task_storm/INFO] Task B3 finished (2) -> [10.868015] [task_storm/INFO] Task B3 finished (3) > [10.868015] [task_storm/INFO] Task B4 finished (1) +> [10.868015] [task_storm/INFO] Task B3 finished (3) > [14.992407] [task_storm/INFO] Task B3 finished (4) -> [19.116799] [task_storm/INFO] Task B3 finished (5) > [19.116799] [task_storm/INFO] Task B4 finished (2) +> [19.116799] [task_storm/INFO] Task B3 finished (5) > [23.241190] [task_storm/INFO] Task B4 finished (3) > [27.365582] [task_storm/INFO] Task B4 finished (4) > [31.489974] [task_storm/INFO] Task B4 finished (5) diff --git a/examples/cpp/task-switch-host/s4u-task-switch-host.cpp b/examples/cpp/task-switch-host/s4u-task-switch-host.cpp index b007523f73..6f8c62b849 100644 --- a/examples/cpp/task-switch-host/s4u-task-switch-host.cpp +++ b/examples/cpp/task-switch-host/s4u-task-switch-host.cpp @@ -54,12 +54,19 @@ int main(int argc, char* argv[]) // successors to comm0 comm0->on_this_start_cb([&comm0, exec1, exec2, jupiter, fafard](const sg4::Task*) { static int count = 0; - if (count % 2 == 0) { + if (count % 2 == 0) comm0->set_destination(jupiter); + else + comm0->set_destination(fafard); + count++; + }); + + comm0->on_this_completion_cb([&comm0, exec1, exec2, jupiter, fafard](const sg4::Task*) { + static int count = 0; + if (count % 2 == 0) { comm0->add_successor(exec1); comm0->remove_successor(exec2); } else { - comm0->set_destination(fafard); comm0->add_successor(exec2); comm0->remove_successor(exec1); } diff --git a/examples/python/task-io/task-io.py b/examples/python/task-io/task-io.py index e75215a0f7..d3ab8c1b1d 100644 --- a/examples/python/task-io/task-io.py +++ b/examples/python/task-io/task-io.py @@ -18,7 +18,7 @@ def parse(): return parser.parse_args() def callback(t): - print(f'[{Engine.clock}] {t} finished ({t.count})') + print(f'[{Engine.clock}] {t} finished ({t.get_count()})') if __name__ == '__main__': args = parse() diff --git a/examples/python/task-simple/task-simple.py b/examples/python/task-simple/task-simple.py index 23e9fc0c8a..beca2b6a2c 100644 --- a/examples/python/task-simple/task-simple.py +++ b/examples/python/task-simple/task-simple.py @@ -28,7 +28,7 @@ def parse(): return parser.parse_args() def callback(t): - print(f'[{Engine.clock}] {t} finished ({t.count})') + print(f'[{Engine.clock}] {t} finished ({t.get_count()})') if __name__ == '__main__': args = parse() diff --git a/examples/python/task-simple/task-simple.tesh b/examples/python/task-simple/task-simple.tesh index 5a27a53ad4..f9a828fe28 100644 --- a/examples/python/task-simple/task-simple.tesh +++ b/examples/python/task-simple/task-simple.tesh @@ -5,5 +5,5 @@ $ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${srcdir:=.}/task-simple.py --p > [11.714617112501687] CommTask(comm) finished (1) > [20.388399000968448] ExecTask(exec1) finished (2) > [21.90881661298591] CommTask(comm) finished (2) -> [24.82146412938331] ExecTask(exec2) finished (1) -> [37.92831114626493] ExecTask(exec2) finished (2) +> [24.821464129383305] ExecTask(exec2) finished (1) +> [37.928311146264925] ExecTask(exec2) finished (2) diff --git a/examples/python/task-switch-host/task-switch-host.py b/examples/python/task-switch-host/task-switch-host.py index 5be8922534..03dce6a7fb 100644 --- a/examples/python/task-switch-host/task-switch-host.py +++ b/examples/python/task-switch-host/task-switch-host.py @@ -44,12 +44,16 @@ def parse(): return parser.parse_args() def callback(t): - print(f'[{Engine.clock}] {t} finished ({t.count})') + print(f'[{Engine.clock}] {t} finished ({t.get_count()})') -def switch(t, hosts, execs): - comm0.destination = hosts[t.count % 2] - comm0.remove_successor(execs[t.count % 2 - 1]) - comm0.add_successor(execs[t.count % 2]) +def switch_destination(t, hosts): + t.destination = hosts[switch_destination.count % 2] + switch_destination.count += 1 +switch_destination.count = 0 + +def switch_successor(t, execs): + t.remove_successor(execs[t.get_count() % 2]) + t.add_successor(execs[t.get_count() % 2 - 1]) if __name__ == '__main__': args = parse() @@ -74,13 +78,16 @@ if __name__ == '__main__': exec1.add_successor(comm1) exec2.add_successor(comm2) - # Add a function to be called when tasks end for log purpose + # Add a callback when tasks end for log purpose Task.on_completion_cb(callback) - # Add a function to be called before each firing of comm0 - # This function modifies the graph of tasks by adding or removing - # successors to comm0 - comm0.on_this_start_cb(lambda t: switch(t, [jupiter, fafard], [exec1,exec2])) + # Add a callback before each firing of comm0 + # It switches the destination of comm0 + comm0.on_this_start_cb(lambda t: switch_destination(t, [jupiter, fafard])) + + # Add a callback before comm0 send tokens to successors + # It switches the successor of comm0 + comm0.on_this_completion_cb(lambda t: switch_successor(t, [exec1,exec2])) # Enqueue two firings for task exec1 comm0.enqueue_firings(4) diff --git a/examples/python/task-variable-load/task-variable-load.py b/examples/python/task-variable-load/task-variable-load.py index 51dbc1a6c6..14fc2ec94e 100644 --- a/examples/python/task-variable-load/task-variable-load.py +++ b/examples/python/task-variable-load/task-variable-load.py @@ -28,7 +28,7 @@ def parse(): return parser.parse_args() def callback(t): - print(f'[{Engine.clock}] {t} finished ({t.count})') + print(f'[{Engine.clock}] {t} finished ({t.get_count()})') def variable_load(t): print('--- Small load ---') diff --git a/include/simgrid/s4u/Task.hpp b/include/simgrid/s4u/Task.hpp index ea144e239d..1fe18138bd 100644 --- a/include/simgrid/s4u/Task.hpp +++ b/include/simgrid/s4u/Task.hpp @@ -48,7 +48,7 @@ class Task { void receive(Task* source); std::shared_ptr token_ = nullptr; - std::deque>> tokens_received_; + std::map>> tokens_received_; std::map> current_activities_ = { {"instance_0", {}}, {"dispatcher", {}}, {"collector", {}}}; @@ -83,7 +83,9 @@ public: void set_load_balancing_function(std::function func); void set_token(std::shared_ptr token); - std::shared_ptr get_next_token_from(TaskPtr t) const { return tokens_received_.front().at(t); } + std::shared_ptr get_token_from(TaskPtr t) const { return tokens_received_.at(t).front(); } + std::deque> get_tokens_from(TaskPtr t) const { return tokens_received_.at(t); } + void deque_token_from(TaskPtr t); void add_successor(TaskPtr t); void remove_successor(TaskPtr t); diff --git a/src/bindings/python/simgrid_python.cpp b/src/bindings/python/simgrid_python.cpp index 792ae4b5e3..2f2ef70bec 100644 --- a/src/bindings/python/simgrid_python.cpp +++ b/src/bindings/python/simgrid_python.cpp @@ -861,9 +861,14 @@ PYBIND11_MODULE(simgrid, m) }, "Add a callback called when each task ends.") .def_property_readonly("name", &Task::get_name, "The name of this task (read-only).") - .def_property_readonly("count", &Task::get_count, "The execution count of this task (read-only).") .def_property_readonly("successors", &Task::get_successors, "The successors of this task (read-only).") .def_property("amount", &Task::get_amount, &Task::set_amount, "The amount of work to do for this task.") + .def( + "get_count", [](const TaskPtr t) { return t->get_count("instance_0"); }, + "The execution count of this task instance_0.") + .def( + "get_count", [](const TaskPtr t, const std::string& instance) { return t->get_count(instance); }, + "The execution count of this task instance.") .def("enqueue_firings", py::overload_cast(&Task::enqueue_firings), py::call_guard(), py::arg("n"), "Enqueue firings for this task.") .def("add_successor", py::overload_cast(&Task::add_successor), py::call_guard(), diff --git a/src/s4u/s4u_Task.cpp b/src/s4u/s4u_Task.cpp index e6dbe06914..ca72bfbcaa 100644 --- a/src/s4u/s4u_Task.cpp +++ b/src/s4u/s4u_Task.cpp @@ -51,10 +51,8 @@ void Task::receive(Task* source) XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str()); auto source_count = predecessors_[source]; predecessors_[source]++; - if (tokens_received_.size() <= queued_firings_["dispatcher"] + source_count) - tokens_received_.emplace_back(); - tokens_received_[queued_firings_["dispatcher"] + source_count][source] = source->token_; - bool enough_tokens = true; + tokens_received_[source].push_back(source->token_); + bool enough_tokens = true; for (auto const& [key, val] : predecessors_) if (val < 1) { enough_tokens = false; @@ -82,6 +80,7 @@ void Task::complete(std::string instance) running_instances_[instance] = running_instances_[instance] - 1; count_[instance] = count_[instance] + 1; if (instance == "collector") { + // XBT_INFO("Trigger on completion: %s - %s", get_cname(), instance.c_str()); on_this_completion(this); on_completion(this); for (auto const& t : successors_) @@ -167,26 +166,29 @@ void Task::set_amount(double amount, std::string instance) /** @param token The token to set. * @brief Set the token to send to successors. - * @note The token is passed to each successor after the task end, i.e., after the on_end callback. + * @note The token is passed to each successor after the task end, i.e., after the on_completion callback. */ void Task::set_token(std::shared_ptr token) { simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; }); } +void Task::deque_token_from(TaskPtr t) +{ + simgrid::kernel::actor::simcall_answered([this, &t] { tokens_received_.at(t).pop_front(); }); +} + void Task::fire(std::string instance) { if ((int)current_activities_[instance].size() > parallelism_degree_[instance]) { current_activities_[instance].pop_front(); } - if (instance == "dispatcher") { + if (instance != "dispatcher" and instance != "collector") { on_this_start(this); on_start(this); } running_instances_[instance]++; queued_firings_[instance] = std::max(queued_firings_[instance] - 1, 0); - if (not tokens_received_.empty()) - tokens_received_.pop_front(); } /** @param successor The Task to add. -- 2.20.1