From ce07073eff21e1ee0470d9bdad1ae4a38fe4b502 Mon Sep 17 00:00:00 2001 From: Jean-Edouard BOULANGER Date: Fri, 25 Mar 2022 08:26:51 +0100 Subject: [PATCH] Add remaining Comm bindings and examples --- ChangeLog | 17 +++ MANIFEST.in | 18 +++ docs/source/app_s4u.rst | 18 +++ examples/README.rst | 10 ++ examples/python/CMakeLists.txt | 3 +- examples/python/comm-failure/comm-failure.py | 88 +++++++++++++ .../python/comm-failure/comm-failure.tesh | 19 +++ .../python/comm-host2host/comm-host2host.py | 86 +++++++++++++ .../python/comm-host2host/comm-host2host.tesh | 10 ++ .../python/comm-pingpong/comm-pingpong.py | 69 ++++++++++ .../python/comm-pingpong/comm-pingpong.tesh | 26 ++++ examples/python/comm-ready/comm-ready.py | 87 +++++++++++++ examples/python/comm-ready/comm-ready.tesh | 91 ++++++++++++++ .../python/comm-serialize/comm-serialize.py | 92 ++++++++++++++ .../python/comm-serialize/comm-serialize.tesh | 26 ++++ examples/python/comm-suspend/comm-suspend.py | 67 ++++++++++ .../python/comm-suspend/comm-suspend.tesh | 11 ++ examples/python/comm-testany/comm-testany.py | 72 +++++++++++ .../python/comm-testany/comm-testany.tesh | 16 +++ .../python/comm-throttling/comm-throttling.py | 67 ++++++++++ .../comm-throttling/comm-throttling.tesh | 7 ++ .../python/comm-waitallfor/comm-waitallfor.py | 1 + .../python/comm-waituntil/comm-waituntil.py | 77 ++++++++++++ .../python/comm-waituntil/comm-waituntil.tesh | 14 +++ src/bindings/python/simgrid_python.cpp | 118 +++++++++++++----- src/s4u/s4u_Engine.cpp | 2 +- 26 files changed, 1080 insertions(+), 32 deletions(-) create mode 100644 examples/python/comm-failure/comm-failure.py create mode 100644 examples/python/comm-failure/comm-failure.tesh create mode 100644 examples/python/comm-host2host/comm-host2host.py create mode 100644 examples/python/comm-host2host/comm-host2host.tesh create mode 100644 examples/python/comm-pingpong/comm-pingpong.py create mode 100644 examples/python/comm-pingpong/comm-pingpong.tesh create mode 100644 examples/python/comm-ready/comm-ready.py create mode 100644 examples/python/comm-ready/comm-ready.tesh create mode 100644 examples/python/comm-serialize/comm-serialize.py create mode 100644 examples/python/comm-serialize/comm-serialize.tesh create mode 100644 examples/python/comm-suspend/comm-suspend.py create mode 100644 examples/python/comm-suspend/comm-suspend.tesh create mode 100644 examples/python/comm-testany/comm-testany.py create mode 100644 examples/python/comm-testany/comm-testany.tesh create mode 100644 examples/python/comm-throttling/comm-throttling.py create mode 100644 examples/python/comm-throttling/comm-throttling.tesh create mode 100644 examples/python/comm-waituntil/comm-waituntil.py create mode 100644 examples/python/comm-waituntil/comm-waituntil.tesh diff --git a/ChangeLog b/ChangeLog index 74e0477e33..6987256d20 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,5 +1,22 @@ SimGrid (3.31.1) NOT RELEASED YET (v3.32 expected June 21. 2022, 09:13 UTC) +Python: + - Added the following bindings / examples: + - Comm (now 100% covers the C++ interface): + - Comm.dst_data_size, Comm.mailbox, Comm.sender, Comm.start_time, Comm.finish_time + - Comm.state_str [examples: examples/python/comm-failure/, examples/python/comm-host2host/] + - Comm.remaining [examples: examples/python/comm-host2host/, examples/python/comm-suspend/] + - Comm.set_payload_size [example: examples/python/comm-host2host/] + - Comm.set_rate [example: examples/python/comm-throttling/] + - Comm.sendto, Comm.sendto_init, Comm.sendto_async [example: examples/python/comm-host2host/] + - Comm.start, Comm.suspend, Comm.resume [example: examples/python/comm-host2host/] + - Comm.test_any [example: examples/python/comm-testany/] + - Comm.wait_until [example: examples/python/comm-waituntil/] + - Engine: + - Engine.host_by_name [example: examples/python/comm-host2host/] + - Engine.mailbox_by_name_or_create [example: examples/python/comm-pingpong/] + - Mailbox: Mailbox.ready [example: examples/python/comm-ready/] + ---------------------------------------------------------------------------- SimGrid (3.31) March 22. 2022. diff --git a/MANIFEST.in b/MANIFEST.in index 686060565c..6c34780114 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -521,6 +521,22 @@ include examples/python/app-masterworkers/app-masterworkers.py include examples/python/app-masterworkers/app-masterworkers.tesh include examples/python/clusters-multicpu/clusters-multicpu.py include examples/python/clusters-multicpu/clusters-multicpu.tesh +include examples/python/comm-failure/comm-failure.py +include examples/python/comm-failure/comm-failure.tesh +include examples/python/comm-host2host/comm-host2host.py +include examples/python/comm-host2host/comm-host2host.tesh +include examples/python/comm-pingpong/comm-pingpong.py +include examples/python/comm-pingpong/comm-pingpong.tesh +include examples/python/comm-ready/comm-ready.py +include examples/python/comm-ready/comm-ready.tesh +include examples/python/comm-serialize/comm-serialize.py +include examples/python/comm-serialize/comm-serialize.tesh +include examples/python/comm-suspend/comm-suspend.py +include examples/python/comm-suspend/comm-suspend.tesh +include examples/python/comm-testany/comm-testany.py +include examples/python/comm-testany/comm-testany.tesh +include examples/python/comm-throttling/comm-throttling.py +include examples/python/comm-throttling/comm-throttling.tesh include examples/python/comm-wait/comm-wait.py include examples/python/comm-wait/comm-wait.tesh include examples/python/comm-waitall/comm-waitall.py @@ -531,6 +547,8 @@ include examples/python/comm-waitany/comm-waitany.py include examples/python/comm-waitany/comm-waitany.tesh include examples/python/comm-waitfor/comm-waitfor.py include examples/python/comm-waitfor/comm-waitfor.tesh +include examples/python/comm-waituntil/comm-waituntil.py +include examples/python/comm-waituntil/comm-waituntil.tesh include examples/python/exec-async/exec-async.py include examples/python/exec-async/exec-async.tesh include examples/python/exec-basic/exec-basic.py diff --git a/docs/source/app_s4u.rst b/docs/source/app_s4u.rst index 2047854710..6a46c29c21 100644 --- a/docs/source/app_s4u.rst +++ b/docs/source/app_s4u.rst @@ -908,6 +908,7 @@ Retrieving hosts .. group-tab:: Python .. autoattribute:: simgrid.Engine.all_hosts + .. automethod:: simgrid::s4u::Engine::host_by_name .. group-tab:: C @@ -1080,6 +1081,7 @@ Receiving data .. automethod:: simgrid.Mailbox.get .. automethod:: simgrid.Mailbox.get_async + .. autoattribute:: simgrid.Mailbox.ready .. group-tab:: C @@ -2188,6 +2190,13 @@ Querying info .. group-tab:: Python + .. autoattribute:: simgrid.Comm.dst_data_size + .. autoattribute:: simgrid.Comm.mailbox + .. autoattribute:: simgrid.Comm.sender + .. autoattribute:: simgrid.Comm.state_str + .. automethod:: simgrid.Comm.detach + .. automethod:: simgrid.Comm.set_payload_size + .. automethod:: simgrid.Comm.set_rate .. automethod:: simgrid.Comm.detach Life cycle @@ -2215,16 +2224,25 @@ also start direct communications as shown below. .. doxygenfunction:: simgrid::s4u::Comm::wait_any(const std::vector< CommPtr >& comms) .. doxygenfunction:: simgrid::s4u::Comm::wait_any_for(const std::vector< CommPtr >& comms, double timeout) .. doxygenfunction:: simgrid::s4u::Comm::wait_for + .. doxygenfunction:: simgrid::s4u::Comm::wait_until .. group-tab:: Python + .. automethod:: simgrid.Comm.sendto + .. automethod:: simgrid.Comm.sendto_init + .. automethod:: simgrid.Comm.sendto_async + + .. automethod:: simgrid.Comm.cancel + .. automethod:: simgrid.Comm.start .. automethod:: simgrid.Comm.test + .. automethod:: simgrid.Comm.test_any .. automethod:: simgrid.Comm.wait .. automethod:: simgrid.Comm.wait_for .. automethod:: simgrid.Comm.wait_all .. automethod:: simgrid.Comm.wait_all_for .. automethod:: simgrid.Comm.wait_any .. automethod:: simgrid.Comm.wait_any_for + .. automethod:: simgrid.Comm.wait_until .. group-tab:: C diff --git a/examples/README.rst b/examples/README.rst index af59d293a2..45b314f322 100644 --- a/examples/README.rst +++ b/examples/README.rst @@ -319,6 +319,8 @@ the simulators (as detailed in Section :ref:`options`). .. example-tab:: examples/cpp/comm-pingpong/s4u-comm-pingpong.cpp + .. example-tab:: examples/python/comm-pingpong/comm-pingpong.py + .. example-tab:: examples/c/comm-pingpong/comm-pingpong.c @@ -371,6 +373,11 @@ The ``suspend()`` and ``resume()`` functions block the progression of a given co :cpp:func:`simgrid::s4u::Activity::resume()` and :cpp:func:`simgrid::s4u::Activity::is_suspended()`. + .. example-tab:: examples/python/comm-suspend/comm-suspend.py + + See also :py:func:`simgrid.Comm::suspend()` and + :py:func:`simgrid.Comm.resume()`. + Waiting for all communications in a set ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -420,6 +427,9 @@ The ``test_any()`` returns whether at least one activity of the set has complete See also :cpp:func:`simgrid::s4u::Comm::test_any()`. + .. example-tab:: examples/python/comm-testany/comm-testany.py + + See also :py:func:`simgrid.Comm.test_any()`. .. _s4u_ex_execution: diff --git a/examples/python/CMakeLists.txt b/examples/python/CMakeLists.txt index 9f04d7abf7..48f69d4e37 100644 --- a/examples/python/CMakeLists.txt +++ b/examples/python/CMakeLists.txt @@ -1,6 +1,7 @@ foreach(example actor-create actor-daemon actor-join actor-kill actor-migrate actor-suspend actor-yield actor-lifetime app-masterworkers - comm-wait comm-waitall comm-waitallfor comm-waitany comm-waitfor + comm-wait comm-waitall comm-waitallfor comm-waitany comm-waitfor comm-failure comm-host2host comm-pingpong + comm-ready comm-serialize comm-suspend comm-testany comm-throttling comm-waitallfor comm-waituntil exec-async exec-basic exec-dvfs exec-remote platform-profile platform-failures network-nonlinear clusters-multicpu io-degradation exec-cpu-nonlinear diff --git a/examples/python/comm-failure/comm-failure.py b/examples/python/comm-failure/comm-failure.py new file mode 100644 index 0000000000..b251f781a1 --- /dev/null +++ b/examples/python/comm-failure/comm-failure.py @@ -0,0 +1,88 @@ +# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved. +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the license (GNU LGPL) which comes with this package. + +import sys + +from simgrid import Engine, Actor, Comm, NetZone, Link, LinkInRoute, Mailbox, this_actor, NetworkFailureException + + +def sender(mailbox1_name: str, mailbox2_name: str) -> None: + mailbox1: Mailbox = Mailbox.by_name(mailbox1_name) + mailbox2: Mailbox = Mailbox.by_name(mailbox2_name) + + this_actor.info(f"Initiating asynchronous send to {mailbox1.name}") + comm1: Comm = mailbox1.put_async(666, 5) + + this_actor.info(f"Initiating asynchronous send to {mailbox2.name}") + comm2: Comm = mailbox2.put_async(666, 2) + + this_actor.info(f"Calling wait_any..") + pending_comms = [comm1, comm2] + try: + index = Comm.wait_any([comm1, comm2]) + this_actor.info(f"Wait any returned index {index} (comm to {pending_comms[index].mailbox.name})") + except NetworkFailureException: + this_actor.info(f"Sender has experienced a network failure exception, so it knows that something went wrong") + this_actor.info(f"Now it needs to figure out which of the two comms failed by looking at their state") + + this_actor.info(f"Comm to {comm1.mailbox.name} has state: {comm1.state_str}") + this_actor.info(f"Comm to {comm2.mailbox.name} has state: {comm2.state_str}") + + try: + comm1.wait() + except NetworkFailureException: + this_actor.info(f"Waiting on a FAILED comm raises an exception") + + this_actor.info("Wait for remaining comm, just to be nice") + pending_comms.pop(0) + try: + Comm.wait_any(pending_comms) + except Exception as e: + this_actor.warning(str(e)) + + +def receiver(mailbox_name: str) -> None: + mailbox: Mailbox = Mailbox.by_name(mailbox_name) + this_actor.info(f"Receiver posting a receive ({mailbox_name})...") + try: + mailbox.get() + this_actor.info(f"Receiver has received successfully ({mailbox_name})!") + except NetworkFailureException: + this_actor.info(f"Receiver has experience a network failure exception ({mailbox_name})") + + +def link_killer(link_name: str) -> None: + link_to_kill = Link.by_name(link_name) + this_actor.info("sleeping 10 seconds...") + this_actor.sleep_for(10.0) + this_actor.info(f"turning off link {link_to_kill.name}") + link_to_kill.turn_off() + this_actor.info("link killed. exiting") + + +def main(): + e = Engine(sys.argv) + zone: NetZone = NetZone.create_full_zone("AS0") + host1 = zone.create_host("Host1", "1f") + host2 = zone.create_host("Host2", "1f") + host3 = zone.create_host("Host3", "1f") + + link_to_2 = LinkInRoute(zone.create_link("link_to_2", "1bps").seal()) + link_to_3 = LinkInRoute(zone.create_link("link_to_3", "1bps").seal()) + + zone.add_route(host1.netpoint, host2.netpoint, None, None, [link_to_2], False) + zone.add_route(host1.netpoint, host3.netpoint, None, None, [link_to_3], False) + zone.seal() + + Actor.create("Sender", host1, sender, "mailbox2", "mailbox3") + Actor.create("Receiver-1", host2, receiver, "mailbox2").daemonize() + Actor.create("Receiver-2", host3, receiver, "mailbox3").daemonize() + Actor.create("LinkKiller", host2, link_killer, "link_to_2").daemonize() + + e.run() + + +if __name__ == "__main__": + main() diff --git a/examples/python/comm-failure/comm-failure.tesh b/examples/python/comm-failure/comm-failure.tesh new file mode 100644 index 0000000000..cc78b708b8 --- /dev/null +++ b/examples/python/comm-failure/comm-failure.tesh @@ -0,0 +1,19 @@ +#!/usr/bin/env tesh + +$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-failure.py "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n" +>[ 0.000000] (4:LinkKiller@Host2) sleeping 10 seconds... +>[ 0.000000] (2:Receiver-1@Host2) Receiver posting a receive (mailbox2)... +>[ 0.000000] (3:Receiver-2@Host3) Receiver posting a receive (mailbox3)... +>[ 0.000000] (1:Sender@Host1) Initiating asynchronous send to mailbox2 +>[ 0.000000] (1:Sender@Host1) Initiating asynchronous send to mailbox3 +>[ 0.000000] (1:Sender@Host1) Calling wait_any.. +>[ 10.000000] (4:LinkKiller@Host2) turning off link link_to_2 +>[ 10.000000] (4:LinkKiller@Host2) link killed. exiting +>[ 10.000000] (2:Receiver-1@Host2) Receiver has experience a network failure exception (mailbox2) +>[ 10.000000] (1:Sender@Host1) Sender has experienced a network failure exception, so it knows that something went wrong +>[ 10.000000] (1:Sender@Host1) Now it needs to figure out which of the two comms failed by looking at their state +>[ 10.000000] (1:Sender@Host1) Comm to mailbox2 has state: FAILED +>[ 10.000000] (1:Sender@Host1) Comm to mailbox3 has state: STARTED +>[ 10.000000] (1:Sender@Host1) Waiting on a FAILED comm raises an exception +>[ 10.000000] (1:Sender@Host1) Wait for remaining comm, just to be nice +>[ 16.494845] (3:Receiver-2@Host3) Receiver has received successfully (mailbox3)! diff --git a/examples/python/comm-host2host/comm-host2host.py b/examples/python/comm-host2host/comm-host2host.py new file mode 100644 index 0000000000..e352a96582 --- /dev/null +++ b/examples/python/comm-host2host/comm-host2host.py @@ -0,0 +1,86 @@ +# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved. +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the license (GNU LGPL) which comes with this package. + +""" +This simple example demonstrates the Comm.sento_init() Comm.sento_async() functions, +that can be used to create a direct communication from one host to another without +relying on the mailbox mechanism. + +There is not much to say, actually: The _init variant creates the communication and +leaves it unstarted (in case you want to modify this communication before it starts), +while the _async variant creates and start it. In both cases, you need to wait() it. + +It is mostly useful when you want to have a centralized simulation of your settings, +with a central actor declaring all communications occurring on your distributed system. +""" + +from argparse import ArgumentParser +import sys + +from simgrid import Actor, Comm, Engine, Host, this_actor + + +def create_parser() -> ArgumentParser: + parser = ArgumentParser() + parser.add_argument( + '--platform', + type=str, + required=True, + help='path to the platform description' + ) + return parser + + +def sender(h1: Host, h2: Host, h3: Host, h4: Host): + this_actor.info(f"Send c12 with sendto_async({h1.name} -> {h2.name})," + f" and c34 with sendto_init({h3.name} -> {h4.name})") + c12: Comm = Comm.sendto_async(h1, h2, int(1.5e7)) + c34: Comm = Comm.sendto_init(h3, h4) + c34.set_payload_size(int(1e7)) + + # You can also detach() communications that you never plan to test() or wait(). + # Here we create a communication that only slows down the other ones + noise: Comm = Comm.sendto_init(h1, h2) + noise.set_payload_size(10000) + noise.detach() + + this_actor.info(f"After creation, c12 is {c12.state_str} (remaining: {c12.remaining:.2e} bytes);" + f" c34 is {c34.state_str} (remaining: {c34.remaining:.2e} bytes)") + this_actor.sleep_for(1) + this_actor.info(f"One sec later, c12 is {c12.state_str} (remaining: {c12.remaining:.2e} bytes);" + f" c34 is {c34.state_str} (remaining: {c34.remaining:.2e} bytes)") + c34.start() + this_actor.info(f"After c34.start(), c12 is {c12.state_str} (remaining: {c12.remaining:.2e} bytes);" + f" c34 is {c34.state_str} (remaining: {c34.remaining:.2e} bytes)") + c12.wait() + this_actor.info(f"After c12.wait(), c12 is {c12.state_str} (remaining: {c12.remaining:.2e} bytes);" + f" c34 is {c34.state_str} (remaining: {c34.remaining:.2e} bytes)") + c34.wait() + this_actor.info(f"After c34.wait(), c12 is {c12.state_str} (remaining: {c12.remaining:.2e} bytes);" + f" c34 is {c34.state_str} (remaining: {c34.remaining:.2e} bytes)") + + # As usual, you don't have to explicitly start communications that were just init()ed. + # The wait() will start it automatically. + c14: Comm = Comm.sendto_init(h1, h4) + c14.set_payload_size(100).wait() + + +def main(): + settings = create_parser().parse_known_args()[0] + e = Engine(sys.argv) + e.load_platform(settings.platform) + Actor.create( + "sender", e.host_by_name("Boivin"), sender, + e.host_by_name("Tremblay"), # h1 + e.host_by_name("Jupiter"), # h2 + e.host_by_name("Fafard"), # h3 + e.host_by_name("Ginette") # h4 + ) + e.run() + this_actor.info(f"Total simulation time: {e.clock:.3f}") + + +if __name__ == "__main__": + main() diff --git a/examples/python/comm-host2host/comm-host2host.tesh b/examples/python/comm-host2host/comm-host2host.tesh new file mode 100644 index 0000000000..9b6466de66 --- /dev/null +++ b/examples/python/comm-host2host/comm-host2host.tesh @@ -0,0 +1,10 @@ +#!/usr/bin/env tesh + +$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-host2host.py --platform ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n" +>[ 0.000000] (1:sender@Boivin) Send c12 with sendto_async(Tremblay -> Jupiter), and c34 with sendto_init(Fafard -> Ginette) +>[ 0.000000] (1:sender@Boivin) After creation, c12 is STARTED (remaining: 1.50e+07 bytes); c34 is STARTING (remaining: 1.00e+07 bytes) +>[ 1.000000] (1:sender@Boivin) One sec later, c12 is STARTED (remaining: 8.48e+06 bytes); c34 is STARTING (remaining: 1.00e+07 bytes) +>[ 1.000000] (1:sender@Boivin) After c34.start(), c12 is STARTED (remaining: 8.48e+06 bytes); c34 is STARTED (remaining: 1.00e+07 bytes) +>[ 2.272621] (1:sender@Boivin) After c12.wait(), c12 is FINISHED (remaining: 0.00e+00 bytes); c34 is STARTED (remaining: 5.33e+05 bytes) +>[ 2.343278] (1:sender@Boivin) After c34.wait(), c12 is FINISHED (remaining: 0.00e+00 bytes); c34 is FINISHED (remaining: 0.00e+00 bytes) +>[ 2.359841] (0:maestro@) Total simulation time: 2.360 diff --git a/examples/python/comm-pingpong/comm-pingpong.py b/examples/python/comm-pingpong/comm-pingpong.py new file mode 100644 index 0000000000..d5c76f9dd3 --- /dev/null +++ b/examples/python/comm-pingpong/comm-pingpong.py @@ -0,0 +1,69 @@ +# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved. +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the license (GNU LGPL) which comes with this package. + +from argparse import ArgumentParser +import sys + +from simgrid import Engine, Actor, Mailbox, this_actor + + +def create_parser() -> ArgumentParser: + parser = ArgumentParser() + parser.add_argument( + '--platform', + type=str, + required=True, + help='path to the platform description' + ) + return parser + + +def pinger(mailbox_in: Mailbox, mailbox_out: Mailbox): + this_actor.info(f"Ping from mailbox {mailbox_in.name} to mailbox {mailbox_out.name}") + + # Do the ping with a 1-Byte payload (latency bound) ... + payload = Engine.clock + mailbox_out.put(payload, 1) + + # ... then wait for the (large) pong + sender_time: float = mailbox_in.get() + communication_time = Engine.clock - sender_time + this_actor.info("Payload received : large communication (bandwidth bound)") + this_actor.info(f"Pong time (bandwidth bound): {communication_time:.3f}") + + +def ponger(mailbox_in: Mailbox, mailbox_out: Mailbox): + this_actor.info(f"Pong from mailbox {mailbox_in.name} to mailbox {mailbox_out.name}") + + # Receive the (small) ping first ... + sender_time: float = mailbox_in.get() + communication_time = Engine.clock - sender_time + this_actor.info("Payload received : small communication (latency bound)") + this_actor.info(f"Ping time (latency bound) {communication_time:.3f}") + + # ... Then send a 1GB pong back (bandwidth bound) + payload = Engine.clock + this_actor.info(f"payload = {payload:.3f}") + mailbox_out.put(payload, int(1e9)) + + +def main(): + settings = create_parser().parse_known_args()[0] + e = Engine(sys.argv) + e.load_platform(settings.platform) + + mb1: Mailbox = e.mailbox_by_name_or_create("Mailbox 1") + mb2: Mailbox = e.mailbox_by_name_or_create("Mailbox 2") + + Actor.create("pinger", e.host_by_name("Tremblay"), pinger, mb1, mb2) + Actor.create("ponger", e.host_by_name("Jupiter"), ponger, mb2, mb1) + + e.run() + + this_actor.info(f"Total simulation time: {e.clock:.3f}") + + +if __name__ == "__main__": + main() diff --git a/examples/python/comm-pingpong/comm-pingpong.tesh b/examples/python/comm-pingpong/comm-pingpong.tesh new file mode 100644 index 0000000000..94d22ce504 --- /dev/null +++ b/examples/python/comm-pingpong/comm-pingpong.tesh @@ -0,0 +1,26 @@ +#!/usr/bin/env tesh + +p Testing with default compound + +$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-pingpong.py --platform ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n" +>[ 0.000000] (1:pinger@Tremblay) Ping from mailbox Mailbox 1 to mailbox Mailbox 2 +>[ 0.000000] (2:ponger@Jupiter) Pong from mailbox Mailbox 2 to mailbox Mailbox 1 +>[ 0.019014] (2:ponger@Jupiter) Payload received : small communication (latency bound) +>[ 0.019014] (2:ponger@Jupiter) Ping time (latency bound) 0.019 +>[ 0.019014] (2:ponger@Jupiter) payload = 0.019 +>[150.178356] (1:pinger@Tremblay) Payload received : large communication (bandwidth bound) +>[150.178356] (1:pinger@Tremblay) Pong time (bandwidth bound): 150.159 +>[150.178356] (0:maestro@) Total simulation time: 150.178 + +p Testing with default compound Full network optimization + +$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-pingpong.py --platform ${platfdir}/small_platform.xml "--cfg=network/optim:Full" "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n" +>[ 0.000000] (0:maestro@) Configuration change: Set 'network/optim' to 'Full' +>[ 0.000000] (1:pinger@Tremblay) Ping from mailbox Mailbox 1 to mailbox Mailbox 2 +>[ 0.000000] (2:ponger@Jupiter) Pong from mailbox Mailbox 2 to mailbox Mailbox 1 +>[ 0.019014] (2:ponger@Jupiter) Payload received : small communication (latency bound) +>[ 0.019014] (2:ponger@Jupiter) Ping time (latency bound) 0.019 +>[ 0.019014] (2:ponger@Jupiter) payload = 0.019 +>[150.178356] (1:pinger@Tremblay) Payload received : large communication (bandwidth bound) +>[150.178356] (1:pinger@Tremblay) Pong time (bandwidth bound): 150.159 +>[150.178356] (0:maestro@) Total simulation time: 150.178 diff --git a/examples/python/comm-ready/comm-ready.py b/examples/python/comm-ready/comm-ready.py new file mode 100644 index 0000000000..646d4d1af1 --- /dev/null +++ b/examples/python/comm-ready/comm-ready.py @@ -0,0 +1,87 @@ +# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved. +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the license (GNU LGPL) which comes with this package. + +from argparse import ArgumentParser +from typing import List +import sys + +from simgrid import Actor, Comm, Engine, Mailbox, this_actor + + +FINALIZE_MESSAGE = "finalize" + + +def create_parser() -> ArgumentParser: + parser = ArgumentParser() + parser.add_argument( + '--platform', + type=str, + required=True, + help='path to the platform description' + ) + return parser + + +def get_peer_mailbox(peer_id: int) -> Mailbox: + return Mailbox.by_name(f"peer-{peer_id}") + + +def peer(my_id: int, message_count: int, payload_size: int, peers_count: int): + my_mailbox: Mailbox = get_peer_mailbox(my_id) + my_mailbox.set_receiver(Actor.self()) + pending_comms: List[Comm] = [] + # Start dispatching all messages to peers others that myself + for i in range(message_count): + for peer_id in range(peers_count): + if peer_id != my_id: + peer_mailbox = get_peer_mailbox(peer_id) + message = f"Message {i} from peer {my_id}" + this_actor.info(f"Send '{message}' to '{peer_mailbox.name}'") + pending_comms.append(peer_mailbox.put_async(message, payload_size)) + + # Start sending messages to let peers know that they should stop + for peer_id in range(peers_count): + if peer_id != my_id: + peer_mailbox = get_peer_mailbox(peer_id) + payload = str(FINALIZE_MESSAGE) + pending_comms.append(peer_mailbox.put_async(payload, payload_size)) + this_actor.info(f"Send '{payload}' to '{peer_mailbox.name}'") + this_actor.info("Done dispatching all messages") + + # Retrieve all the messages other peers have been sending to me until I receive all the corresponding "Finalize" + # messages + pending_finalize_messages = peers_count - 1 + while pending_finalize_messages > 0: + if my_mailbox.ready: + start = Engine.clock + received: str = my_mailbox.get() + waiting_time = Engine.clock - start + if waiting_time != 0.0: + raise AssertionError(f"Expecting the waiting time to be 0.0 because the communication was supposedly " + f"ready, but got {waiting_time} instead") + this_actor.info(f"I got a '{received}'.") + if received == FINALIZE_MESSAGE: + pending_finalize_messages -= 1 + else: + this_actor.info("Nothing ready to consume yet, I better sleep for a while") + this_actor.sleep_for(0.01) + + this_actor.info("I'm done, just waiting for my peers to receive the messages before exiting") + Comm.wait_all(pending_comms) + this_actor.info("Goodbye now!") + + +def main(): + settings = create_parser().parse_known_args()[0] + e = Engine(sys.argv) + e.load_platform(settings.platform) + Actor.create("peer", e.host_by_name("Tremblay"), peer, 0, 2, int(5e7), 3) + Actor.create("peer", e.host_by_name("Ruby"), peer, 1, 6, int(2.5e5), 3) + Actor.create("peer", e.host_by_name("Perl"), peer, 2, 0, int(5e7), 3) + e.run() + + +if __name__ == "__main__": + main() diff --git a/examples/python/comm-ready/comm-ready.tesh b/examples/python/comm-ready/comm-ready.tesh new file mode 100644 index 0000000000..638bbab6be --- /dev/null +++ b/examples/python/comm-ready/comm-ready.tesh @@ -0,0 +1,91 @@ +#!/usr/bin/env tesh + +p Peer sending and receiving + +$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-ready.py --platform ${platfdir}/small_platform_fatpipe.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n" +>[ 0.000000] (1:peer@Tremblay) Send 'Message 0 from peer 0' to 'peer-1' +>[ 0.000000] (2:peer@Ruby) Send 'Message 0 from peer 1' to 'peer-0' +>[ 0.000000] (3:peer@Perl) Send 'finalize' to 'peer-0' +>[ 0.000000] (1:peer@Tremblay) Send 'Message 0 from peer 0' to 'peer-2' +>[ 0.000000] (2:peer@Ruby) Send 'Message 0 from peer 1' to 'peer-2' +>[ 0.000000] (3:peer@Perl) Send 'finalize' to 'peer-1' +>[ 0.000000] (3:peer@Perl) Done dispatching all messages +>[ 0.000000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while +>[ 0.000000] (1:peer@Tremblay) Send 'Message 1 from peer 0' to 'peer-1' +>[ 0.000000] (2:peer@Ruby) Send 'Message 1 from peer 1' to 'peer-0' +>[ 0.000000] (1:peer@Tremblay) Send 'Message 1 from peer 0' to 'peer-2' +>[ 0.000000] (2:peer@Ruby) Send 'Message 1 from peer 1' to 'peer-2' +>[ 0.000000] (2:peer@Ruby) Send 'Message 2 from peer 1' to 'peer-0' +>[ 0.000000] (1:peer@Tremblay) Send 'finalize' to 'peer-1' +>[ 0.000000] (2:peer@Ruby) Send 'Message 2 from peer 1' to 'peer-2' +>[ 0.000000] (1:peer@Tremblay) Send 'finalize' to 'peer-2' +>[ 0.000000] (1:peer@Tremblay) Done dispatching all messages +>[ 0.000000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while +>[ 0.000000] (2:peer@Ruby) Send 'Message 3 from peer 1' to 'peer-0' +>[ 0.000000] (2:peer@Ruby) Send 'Message 3 from peer 1' to 'peer-2' +>[ 0.000000] (2:peer@Ruby) Send 'Message 4 from peer 1' to 'peer-0' +>[ 0.000000] (2:peer@Ruby) Send 'Message 4 from peer 1' to 'peer-2' +>[ 0.000000] (2:peer@Ruby) Send 'Message 5 from peer 1' to 'peer-0' +>[ 0.000000] (2:peer@Ruby) Send 'Message 5 from peer 1' to 'peer-2' +>[ 0.000000] (2:peer@Ruby) Send 'finalize' to 'peer-0' +>[ 0.000000] (2:peer@Ruby) Send 'finalize' to 'peer-2' +>[ 0.000000] (2:peer@Ruby) Done dispatching all messages +>[ 0.000000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while +>[ 0.010000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while +>[ 0.010000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while +>[ 0.010000] (1:peer@Tremblay) I got a 'Message 0 from peer 1'. +>[ 0.010000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while +>[ 0.020000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while +>[ 0.020000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while +>[ 0.020000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while +>[ 0.030000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while +>[ 0.030000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while +>[ 0.030000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while +>[ 0.040000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while +>[ 0.040000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while +>[ 0.040000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while +>[ 0.050000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while +>[ 0.050000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while +>[ 0.050000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while +>[ 0.060000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while +>[ 0.060000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while +>[ 0.060000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while +>[ 0.070000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while +>[ 0.070000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while +>[ 0.070000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while +>[ 0.080000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while +>[ 0.080000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while +>[ 0.080000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while +>[ 0.090000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while +>[ 0.090000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while +>[ 0.090000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while +>[ 0.100000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while +>[ 0.100000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while +>[ 0.100000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while +>[ 0.110000] (2:peer@Ruby) I got a 'Message 0 from peer 0'. +>[ 0.110000] (3:peer@Perl) I got a 'Message 0 from peer 0'. +>[ 0.110000] (1:peer@Tremblay) I got a 'finalize'. +>[ 0.110000] (2:peer@Ruby) I got a 'finalize'. +>[ 0.110000] (3:peer@Perl) I got a 'Message 0 from peer 1'. +>[ 0.110000] (1:peer@Tremblay) I got a 'Message 1 from peer 1'. +>[ 0.110000] (2:peer@Ruby) I got a 'Message 1 from peer 0'. +>[ 0.110000] (3:peer@Perl) I got a 'Message 1 from peer 0'. +>[ 0.110000] (1:peer@Tremblay) I got a 'Message 2 from peer 1'. +>[ 0.110000] (2:peer@Ruby) I got a 'finalize'. +>[ 0.110000] (2:peer@Ruby) I'm done, just waiting for my peers to receive the messages before exiting +>[ 0.110000] (3:peer@Perl) I got a 'Message 1 from peer 1'. +>[ 0.110000] (1:peer@Tremblay) I got a 'Message 3 from peer 1'. +>[ 0.110000] (3:peer@Perl) I got a 'finalize'. +>[ 0.110000] (1:peer@Tremblay) I got a 'Message 4 from peer 1'. +>[ 0.110000] (3:peer@Perl) I got a 'Message 2 from peer 1'. +>[ 0.110000] (1:peer@Tremblay) I got a 'Message 5 from peer 1'. +>[ 0.110000] (3:peer@Perl) I got a 'Message 3 from peer 1'. +>[ 0.110000] (1:peer@Tremblay) I got a 'finalize'. +>[ 0.110000] (1:peer@Tremblay) I'm done, just waiting for my peers to receive the messages before exiting +>[ 0.110000] (3:peer@Perl) I got a 'Message 4 from peer 1'. +>[ 0.110000] (3:peer@Perl) I got a 'Message 5 from peer 1'. +>[ 0.110000] (3:peer@Perl) I got a 'finalize'. +>[ 0.110000] (3:peer@Perl) I'm done, just waiting for my peers to receive the messages before exiting +>[ 0.110000] (1:peer@Tremblay) Goodbye now! +>[ 0.110000] (2:peer@Ruby) Goodbye now! +>[ 0.110000] (3:peer@Perl) Goodbye now! diff --git a/examples/python/comm-serialize/comm-serialize.py b/examples/python/comm-serialize/comm-serialize.py new file mode 100644 index 0000000000..e02708f2fa --- /dev/null +++ b/examples/python/comm-serialize/comm-serialize.py @@ -0,0 +1,92 @@ +# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved. +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the license (GNU LGPL) which comes with this package. + +from typing import List, Tuple +import sys + +from simgrid import Engine, Actor, Comm, Host, LinkInRoute, Mailbox, NetZone, this_actor, PyGetAsync + + +RECEIVER_MAILBOX_NAME = "receiver" + + +class Sender(object): + def __init__(self, message_size: int, messages_count: int): + self.message_size = message_size + self.messages_count = messages_count + + def __call__(self) -> None: + # List in which we store all ongoing communications + pending_comms: List[Comm] = [] + + # Make a vector of the mailboxes to use + receiver_mailbox: Mailbox = Mailbox.by_name(RECEIVER_MAILBOX_NAME) + for i in range(self.messages_count): + message_content = f"Message {i}" + this_actor.info(f"Send '{message_content}' to '{receiver_mailbox.name}'") + # Create a communication representing the ongoing communication, and store it in pending_comms + pending_comms.append(receiver_mailbox.put_async(message_content, self.message_size)) + + this_actor.info("Done dispatching all messages") + + # Now that all message exchanges were initiated, wait for their completion in one single call + Comm.wait_all(pending_comms) + + this_actor.info("Goodbye now!") + + +class Receiver(object): + def __init__(self, messages_count: int): + self.mailbox: Mailbox = Mailbox.by_name(RECEIVER_MAILBOX_NAME) + self.messages_count = messages_count + + def __call__(self): + # List in which we store all incoming msgs + pending_comms: List[Tuple[Comm, PyGetAsync]] = [] + this_actor.info(f"Wait for {self.messages_count} messages asynchronously") + for i in range(self.messages_count): + pending_comms.append(self.mailbox.get_async()) + while pending_comms: + index = Comm.wait_any([comm for (comm, _) in pending_comms]) + _, async_data = pending_comms[index] + this_actor.info(f"I got '{async_data.get()}'.") + pending_comms.pop(index) + + +def main(): + e = Engine(sys.argv) + # Creates the platform + # ________ __________ + # | Sender |===============| Receiver | + # |________| Link1 |__________| + # + zone: NetZone = NetZone.create_full_zone("Zone1") + sender_host: Host = zone.create_host("sender", 1).seal() + receiver_host: Host = zone.create_host("receiver", 1).seal() + + # create split-duplex link1 (UP/DOWN), limiting the number of concurrent flows in it for 2 + link = zone.create_split_duplex_link("link1", 10e9).set_latency(10e-6).set_concurrency_limit(2).seal() + + # create routes between nodes + zone.add_route( + sender_host.netpoint, + receiver_host.netpoint, + None, + None, + [LinkInRoute(link, LinkInRoute.UP)], + True + ) + zone.seal() + + # create actors Sender/Receiver + messages_count = 10 + Actor.create("receiver", receiver_host, Receiver(messages_count=messages_count)) + Actor.create("sender", sender_host, Sender(messages_count=messages_count, message_size=int(1e6))) + + e.run() + + +if __name__ == "__main__": + main() diff --git a/examples/python/comm-serialize/comm-serialize.tesh b/examples/python/comm-serialize/comm-serialize.tesh new file mode 100644 index 0000000000..0a0174850e --- /dev/null +++ b/examples/python/comm-serialize/comm-serialize.tesh @@ -0,0 +1,26 @@ +#!/usr/bin/env tesh + +$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-serialize.py "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n" +>[ 0.000000] (1:receiver@receiver) Wait for 10 messages asynchronously +>[ 0.000000] (2:sender@sender) Send 'Message 0' to 'receiver' +>[ 0.000000] (2:sender@sender) Send 'Message 1' to 'receiver' +>[ 0.000000] (2:sender@sender) Send 'Message 2' to 'receiver' +>[ 0.000000] (2:sender@sender) Send 'Message 3' to 'receiver' +>[ 0.000000] (2:sender@sender) Send 'Message 4' to 'receiver' +>[ 0.000000] (2:sender@sender) Send 'Message 5' to 'receiver' +>[ 0.000000] (2:sender@sender) Send 'Message 6' to 'receiver' +>[ 0.000000] (2:sender@sender) Send 'Message 7' to 'receiver' +>[ 0.000000] (2:sender@sender) Send 'Message 8' to 'receiver' +>[ 0.000000] (2:sender@sender) Send 'Message 9' to 'receiver' +>[ 0.000000] (2:sender@sender) Done dispatching all messages +>[ 0.000336] (1:receiver@receiver) I got 'Message 1'. +>[ 0.000336] (1:receiver@receiver) I got 'Message 0'. +>[ 0.000542] (1:receiver@receiver) I got 'Message 3'. +>[ 0.000542] (1:receiver@receiver) I got 'Message 2'. +>[ 0.000749] (1:receiver@receiver) I got 'Message 5'. +>[ 0.000749] (1:receiver@receiver) I got 'Message 4'. +>[ 0.000955] (1:receiver@receiver) I got 'Message 7'. +>[ 0.000955] (1:receiver@receiver) I got 'Message 6'. +>[ 0.001161] (1:receiver@receiver) I got 'Message 9'. +>[ 0.001161] (1:receiver@receiver) I got 'Message 8'. +>[ 0.001161] (2:sender@sender) Goodbye now! diff --git a/examples/python/comm-suspend/comm-suspend.py b/examples/python/comm-suspend/comm-suspend.py new file mode 100644 index 0000000000..126b7caeb6 --- /dev/null +++ b/examples/python/comm-suspend/comm-suspend.py @@ -0,0 +1,67 @@ +# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved. +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the license (GNU LGPL) which comes with this package. + +""" This example shows how to suspend and resume an asynchronous communication. +""" + +from argparse import ArgumentParser +import sys + +from simgrid import Actor, Comm, Engine, Mailbox, this_actor + + +def create_parser() -> ArgumentParser: + parser = ArgumentParser() + parser.add_argument( + '--platform', + type=str, + required=True, + help='path to the platform description' + ) + return parser + + +def sender(): + mailbox: Mailbox = Mailbox.by_name("receiver") + payload = "Sent message" + + # Create a communication representing the ongoing communication and then + simulated_size_in_bytes = 13194230 + comm: Comm = mailbox.put_init(payload, simulated_size_in_bytes) + this_actor.info(f"Suspend the communication before it starts (remaining: {comm.remaining:.0f} bytes)" + f" and wait a second.") + this_actor.sleep_for(1) + this_actor.info(f"Now, start the communication (remaining: {comm.remaining:.0f} bytes) and wait another second.") + comm.start() + this_actor.sleep_for(1) + this_actor.info(f"There is still {comm.remaining:.0f} bytes to transfer in this communication." + " Suspend it for one second.") + comm.suspend() + this_actor.info(f"Now there is {comm.remaining:.0f} bytes to transfer. Resume it and wait for its completion.") + comm.resume() + comm.wait() + this_actor.info(f"There is {comm.remaining:.0f} bytes to transfer after the communication completion.") + this_actor.info(f"Suspending a completed activity is a no-op.") + comm.suspend() + + +def receiver(): + mailbox: Mailbox = Mailbox.by_name("receiver") + this_actor.info("Wait for the message.") + received: str = mailbox.get() + this_actor.info(f"I got '{received}'.") + + +def main(): + settings = create_parser().parse_known_args()[0] + e = Engine(sys.argv) + e.load_platform(settings.platform) + Actor.create("sender", e.host_by_name("Tremblay"), sender) + Actor.create("receiver", e.host_by_name("Jupiter"), receiver) + e.run() + + +if __name__ == "__main__": + main() diff --git a/examples/python/comm-suspend/comm-suspend.tesh b/examples/python/comm-suspend/comm-suspend.tesh new file mode 100644 index 0000000000..8715ece386 --- /dev/null +++ b/examples/python/comm-suspend/comm-suspend.tesh @@ -0,0 +1,11 @@ +#!/usr/bin/env tesh + +$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-suspend.py --platform ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n" +> [ 0.000000] (1:sender@Tremblay) Suspend the communication before it starts (remaining: 13194230 bytes) and wait a second. +> [ 0.000000] (2:receiver@Jupiter) Wait for the message. +> [ 1.000000] (1:sender@Tremblay) Now, start the communication (remaining: 13194230 bytes) and wait another second. +> [ 2.000000] (1:sender@Tremblay) There is still 6660438 bytes to transfer in this communication. Suspend it for one second. +> [ 2.000000] (1:sender@Tremblay) Now there is 6660438 bytes to transfer. Resume it and wait for its completion. +> [ 3.000000] (2:receiver@Jupiter) I got 'Sent message'. +> [ 3.000000] (1:sender@Tremblay) There is 0 bytes to transfer after the communication completion. +> [ 3.000000] (1:sender@Tremblay) Suspending a completed activity is a no-op. diff --git a/examples/python/comm-testany/comm-testany.py b/examples/python/comm-testany/comm-testany.py new file mode 100644 index 0000000000..1b32047802 --- /dev/null +++ b/examples/python/comm-testany/comm-testany.py @@ -0,0 +1,72 @@ +# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved. +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the license (GNU LGPL) which comes with this package. + +from argparse import ArgumentParser +from typing import List +import sys + +from simgrid import Engine, Actor, Comm, Mailbox, this_actor + + +def create_parser() -> ArgumentParser: + parser = ArgumentParser() + parser.add_argument( + '--platform', + type=str, + required=True, + help='path to the platform description' + ) + return parser + + +def rank0(): + rank0_mailbox: Mailbox = Mailbox.by_name("rank0") + this_actor.info("Post my asynchronous receives") + comm1, a1 = rank0_mailbox.get_async() + comm2, a2 = rank0_mailbox.get_async() + comm3, a3 = rank0_mailbox.get_async() + pending_comms: List[Comm] = [comm1, comm2, comm3] + + this_actor.info("Send some data to rank-1") + rank1_mailbox: Mailbox = Mailbox.by_name("rank1") + for i in range(3): + rank1_mailbox.put(i, 1) + + this_actor.info("Test for completed comms") + while pending_comms: + flag = Comm.test_any(pending_comms) + if flag != -1: + pending_comms.pop(flag) + this_actor.info("Remove a pending comm.") + else: + # Nothing matches, wait for a little bit + this_actor.sleep_for(0.1) + this_actor.info("Last comm is complete") + + +def rank1(): + rank0_mailbox: Mailbox = Mailbox.by_name("rank0") + rank1_mailbox: Mailbox = Mailbox.by_name("rank1") + for i in range(3): + data: int = rank1_mailbox.get() + this_actor.info(f"Received {data}") + msg_content = f"Message {i}" + this_actor.info(f"Send '{msg_content}'") + rank0_mailbox.put(msg_content, int(1e6)) + + +def main(): + settings = create_parser().parse_known_args()[0] + e = Engine(sys.argv) + e.load_platform(settings.platform) + + Actor.create("rank0", e.host_by_name("Tremblay"), rank0) + Actor.create("rank1", e.host_by_name("Fafard"), rank1) + + e.run() + + +if __name__ == "__main__": + main() diff --git a/examples/python/comm-testany/comm-testany.tesh b/examples/python/comm-testany/comm-testany.tesh new file mode 100644 index 0000000000..83d80f118b --- /dev/null +++ b/examples/python/comm-testany/comm-testany.tesh @@ -0,0 +1,16 @@ +#!/usr/bin/env tesh + +$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-testany.py --platform ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n" +>[ 0.000000] (1:rank0@Tremblay) Post my asynchronous receives +>[ 0.000000] (1:rank0@Tremblay) Send some data to rank-1 +>[ 0.025708] (2:rank1@Fafard) Received 0 +>[ 0.025708] (2:rank1@Fafard) Send 'Message 0' +>[ 0.209813] (2:rank1@Fafard) Received 1 +>[ 0.209813] (2:rank1@Fafard) Send 'Message 1' +>[ 0.393918] (1:rank0@Tremblay) Test for completed comms +>[ 0.393918] (2:rank1@Fafard) Received 2 +>[ 0.393918] (2:rank1@Fafard) Send 'Message 2' +>[ 0.393918] (1:rank0@Tremblay) Remove a pending comm. +>[ 0.393918] (1:rank0@Tremblay) Remove a pending comm. +>[ 0.593918] (1:rank0@Tremblay) Remove a pending comm. +>[ 0.593918] (1:rank0@Tremblay) Last comm is complete diff --git a/examples/python/comm-throttling/comm-throttling.py b/examples/python/comm-throttling/comm-throttling.py new file mode 100644 index 0000000000..3e7e140d98 --- /dev/null +++ b/examples/python/comm-throttling/comm-throttling.py @@ -0,0 +1,67 @@ +# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved. +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the license (GNU LGPL) which comes with this package. + +from argparse import ArgumentParser +import sys + +from simgrid import Engine, Actor, Comm, Mailbox, this_actor + + +def create_parser() -> ArgumentParser: + parser = ArgumentParser() + parser.add_argument( + '--platform', + type=str, + required=True, + help='path to the platform description' + ) + return parser + + +def sender(mailbox: Mailbox): + this_actor.info("Send at full bandwidth") + + # First send a 2.5e8 Bytes payload at full bandwidth (1.25e8 Bps) + payload = Engine.clock + mailbox.put(payload, int(2.5e8)) + + this_actor.info("Throttle the bandwidth at the Comm level") + # ... then send it again but throttle the Comm + payload = Engine.clock + # get a handler on the comm first + comm: Comm = mailbox.put_init(payload, int(2.5e8)) + + # let throttle the communication. It amounts to set the rate of the comm to half the nominal bandwidth of the link, + # i.e., 1.25e8 / 2. This second communication will thus take approximately twice as long as the first one + comm.set_rate(int(1.25e8 / 2)).wait() + + +def receiver(mailbox: Mailbox): + # Receive the first payload sent at full bandwidth + sender_time = mailbox.get() + communication_time = Engine.clock - sender_time + this_actor.info(f"Payload received (full bandwidth) in {communication_time} seconds") + + # ... Then receive the second payload sent with a throttled Comm + sender_time = mailbox.get() + communication_time = Engine.clock - sender_time + this_actor.info(f"Payload received (throttled) in {communication_time} seconds") + + +def main(): + settings = create_parser().parse_known_args()[0] + e = Engine(sys.argv) + e.load_platform(settings.platform) + + mailbox = e.mailbox_by_name_or_create("Mailbox") + + Actor.create("sender", e.host_by_name("node-0.simgrid.org"), sender, mailbox) + Actor.create("receiver", e.host_by_name("node-1.simgrid.org"), receiver, mailbox) + + e.run() + + +if __name__ == "__main__": + main() diff --git a/examples/python/comm-throttling/comm-throttling.tesh b/examples/python/comm-throttling/comm-throttling.tesh new file mode 100644 index 0000000000..366b83d215 --- /dev/null +++ b/examples/python/comm-throttling/comm-throttling.tesh @@ -0,0 +1,7 @@ +#!/usr/bin/env tesh + +$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-throttling.py --platform ${platfdir}/cluster_backbone.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n" +>[ 0.000000] (1:sender@node-0.simgrid.org) Send at full bandwidth +>[ 2.069662] (1:sender@node-0.simgrid.org) Throttle the bandwidth at the Comm level +>[ 2.069662] (2:receiver@node-1.simgrid.org) Payload received (full bandwidth) in 2.0696616701030925 seconds +>[ 6.077468] (2:receiver@node-1.simgrid.org) Payload received (throttled) in 4.007806 seconds diff --git a/examples/python/comm-waitallfor/comm-waitallfor.py b/examples/python/comm-waitallfor/comm-waitallfor.py index ecd697abe1..fd94bcdf75 100644 --- a/examples/python/comm-waitallfor/comm-waitallfor.py +++ b/examples/python/comm-waitallfor/comm-waitallfor.py @@ -34,6 +34,7 @@ def create_parser() -> ArgumentParser: parser.add_argument( '--platform', type=str, + required=True, help='path to the platform description' ) parser.add_argument( diff --git a/examples/python/comm-waituntil/comm-waituntil.py b/examples/python/comm-waituntil/comm-waituntil.py new file mode 100644 index 0000000000..0209dc86a1 --- /dev/null +++ b/examples/python/comm-waituntil/comm-waituntil.py @@ -0,0 +1,77 @@ +# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved. +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the license (GNU LGPL) which comes with this package. + +""" This example shows how to suspend and resume an asynchronous communication. +""" + +from argparse import ArgumentParser +from typing import List +import sys + +from simgrid import Actor, Comm, Engine, Mailbox, this_actor + + +FINALIZE_MESSAGE = "finalize" + + +def create_parser() -> ArgumentParser: + parser = ArgumentParser() + parser.add_argument( + '--platform', + type=str, + required=True, + help='path to the platform description' + ) + return parser + + +def sender(receiver_mailbox: Mailbox, messages_count: int, payload_size: int): + pending_comms: List[Comm] = [] + # Start dispatching all messages to the receiver + for i in range(messages_count): + payload = f"Message {i}" + this_actor.info(f"Send '{payload}' to '{receiver_mailbox.name}'") + # Create a communication representing the ongoing communication + comm = receiver_mailbox.put_async(payload, payload_size) + # Add this comm to the vector of all known comms + pending_comms.append(comm) + + # Start the finalize signal to the receiver + final_comm = receiver_mailbox.put_async(FINALIZE_MESSAGE, 0) + pending_comms.append(final_comm) + this_actor.info(f"Send '{FINALIZE_MESSAGE}' to '{receiver_mailbox.name}'") + this_actor.info("Done dispatching all messages") + + # Now that all message exchanges were initiated, wait for their completion, in order of creation + while pending_comms: + comm = pending_comms[-1] + comm.wait_until(Engine.clock + 1) + pending_comms.pop() # remove it from the list + this_actor.info("Goodbye now!") + + +def receiver(mailbox: Mailbox): + this_actor.info("Wait for my first message") + finalized = False + while not finalized: + received: str = mailbox.get() + this_actor.info(f"I got a '{received}'.") + # If it's a finalize message, we're done. + if received == FINALIZE_MESSAGE: + finalized = True + + +def main(): + settings = create_parser().parse_known_args()[0] + e = Engine(sys.argv) + e.load_platform(settings.platform) + receiver_mailbox: Mailbox = Mailbox.by_name("receiver") + Actor.create("sender", e.host_by_name("Tremblay"), sender, receiver_mailbox, 3, int(5e7)) + Actor.create("receiver", e.host_by_name("Ruby"), receiver, receiver_mailbox) + e.run() + + +if __name__ == "__main__": + main() diff --git a/examples/python/comm-waituntil/comm-waituntil.tesh b/examples/python/comm-waituntil/comm-waituntil.tesh new file mode 100644 index 0000000000..1837f0d3cf --- /dev/null +++ b/examples/python/comm-waituntil/comm-waituntil.tesh @@ -0,0 +1,14 @@ +#!/usr/bin/env tesh + +$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waituntil.py --platform ${platfdir}/small_platform_fatpipe.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n" +>[ 0.000000] (1:sender@Tremblay) Send 'Message 0' to 'receiver' +>[ 0.000000] (2:receiver@Ruby) Wait for my first message +>[ 0.000000] (1:sender@Tremblay) Send 'Message 1' to 'receiver' +>[ 0.000000] (1:sender@Tremblay) Send 'Message 2' to 'receiver' +>[ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver' +>[ 0.000000] (1:sender@Tremblay) Done dispatching all messages +>[ 0.105458] (2:receiver@Ruby) I got a 'Message 0'. +>[ 0.210917] (2:receiver@Ruby) I got a 'Message 1'. +>[ 0.316375] (2:receiver@Ruby) I got a 'Message 2'. +>[ 0.318326] (2:receiver@Ruby) I got a 'finalize'. +>[ 0.318326] (1:sender@Tremblay) Goodbye now! diff --git a/src/bindings/python/simgrid_python.cpp b/src/bindings/python/simgrid_python.cpp index c7762da72c..78f34a2ede 100644 --- a/src/bindings/python/simgrid_python.cpp +++ b/src/bindings/python/simgrid_python.cpp @@ -49,6 +49,8 @@ using simgrid::s4u::Actor; using simgrid::s4u::ActorPtr; using simgrid::s4u::Barrier; using simgrid::s4u::BarrierPtr; +using simgrid::s4u::Comm; +using simgrid::s4u::CommPtr; using simgrid::s4u::Engine; using simgrid::s4u::Host; using simgrid::s4u::Link; @@ -182,6 +184,8 @@ PYBIND11_MODULE(simgrid, m) "get_all_hosts() is deprecated and will be dropped after v3.33, use all_hosts instead.", 1); return self.attr("all_hosts"); }) + .def("host_by_name", &Engine::host_by_name_or_null, py::call_guard(), + "Retrieve a host by its name, or None if it does not exist in the platform.") .def_property_readonly("all_hosts", &Engine::get_all_hosts, "Returns the list of all hosts found in the platform") .def("get_all_links", [](py::object self) // XBT_ATTRIB_DEPRECATED_v334 @@ -214,6 +218,10 @@ PYBIND11_MODULE(simgrid, m) .def("netzone_by_name", &Engine::netzone_by_name_or_null) .def("load_platform", &Engine::load_platform, "Load a platform file describing the environment") .def("load_deployment", &Engine::load_deployment, "Load a deployment file and launch the actors that it contains") + .def("mailbox_by_name_or_create", &Engine::mailbox_by_name_or_create, + py::call_guard(), + py::arg("name"), + "Find a mailbox from its name or create one if it does not exist") .def("run", &Engine::run, py::call_guard(), "Run the simulation until its end") .def("run_until", py::overload_cast(&Engine::run_until, py::const_), py::call_guard(), "Run the simulation until the given date", @@ -645,6 +653,8 @@ PYBIND11_MODULE(simgrid, m) return std::string(self->get_name().c_str()); // Convert from xbt::string because of MC }, "The name of that mailbox (read-only property).") + .def_property_readonly("ready", &Mailbox::ready, py::call_guard(), + "Check if there is a communication ready to be consumed from a mailbox.") .def( "put", [](Mailbox* self, py::object data, int size, double timeout) { @@ -684,7 +694,7 @@ PYBIND11_MODULE(simgrid, m) py::call_guard(), "Blocking data reception") .def( "get_async", - [](Mailbox* self) -> std::tuple { + [](Mailbox* self) -> std::tuple { PyGetAsync wrap; auto comm = self->get_async(wrap.get()); return std::make_tuple(std::move(comm), std::move(wrap)); @@ -703,45 +713,93 @@ PYBIND11_MODULE(simgrid, m) "Get python object after async communication in receiver side"); /* Class Comm */ - py::class_(m, "Comm", - "Communication. See the C++ documentation for details.") - .def("test", &simgrid::s4u::Comm::test, py::call_guard(), + py::class_(m, "Comm", "Communication. See the C++ documentation for details.") + .def_property_readonly("dst_data_size", &Comm::get_dst_data_size, + py::call_guard(), + "Retrieve the size of the received data.") + .def_property_readonly("mailbox", &Comm::get_mailbox, + py::call_guard(), + "Retrieve the mailbox on which this comm acts.") + .def_property_readonly("sender", &Comm::get_sender, + py::call_guard()) + .def_property_readonly("state_str", [](Comm* self){ return std::string(self->get_state_str()); }, + py::call_guard(), + "Retrieve the Comm state as string") + .def_property_readonly("remaining", &Comm::get_remaining, + py::call_guard(), + "Remaining amount of work that this Comm entails") + .def_property_readonly("start_time", &Comm::get_start_time, + py::call_guard(), + "Time at which this Comm started") + .def_property_readonly("finish_time", &Comm::get_finish_time, + py::call_guard(), + "Time at which this Comm finished") + .def("set_payload_size", &Comm::set_payload_size, py::call_guard(), + py::arg("bytes"), + "Specify the amount of bytes which exchange should be simulated.") + .def("set_rate", &Comm::set_rate, py::call_guard(), + py::arg("rate"), + "Sets the maximal communication rate (in byte/sec). Must be done before start") + .def("cancel", [](Comm* self){ return self->cancel(); }, + py::call_guard(), py::return_value_policy::reference_internal, + "Cancel the activity.") + .def("start", [](Comm* self){ return self->start(); }, + py::call_guard(), py::return_value_policy::reference_internal, + "Starts a previously created activity. This function is optional: you can call wait() even if you didn't " + "call start()") + .def("suspend", [](Comm* self){ return self->suspend(); }, + py::call_guard(), py::return_value_policy::reference_internal, + "Suspend the activity.") + .def("resume", [](Comm* self){ return self->resume(); }, + py::call_guard(), py::return_value_policy::reference_internal, + "Resume the activity.") + .def("test", &Comm::test, py::call_guard(), "Test whether the communication is terminated.") - .def("wait", &simgrid::s4u::Comm::wait, py::call_guard(), + .def("wait", &Comm::wait, py::call_guard(), "Block until the completion of that communication.") - .def("wait_for", &simgrid::s4u::Comm::wait_for, + .def("wait_for", &Comm::wait_for, py::call_guard(), py::arg("timeout"), - py::call_guard(), "Block until the completion of that communication, or raises TimeoutException after the specified timeout.") - .def("detach", [](simgrid::s4u::Comm* self) { - return self->detach(); - }, + .def("wait_until", &Comm::wait_until, py::call_guard(), + py::arg("time_limit"), + "Block until the completion of that communication, or raises TimeoutException after the specified time.") + .def("detach", [](Comm* self) { return self->detach(); }, py::return_value_policy::reference_internal, py::call_guard(), "Start the comm, and ignore its result. It can be completely forgotten after that.") - .def_static( - "wait_all", &simgrid::s4u::Comm::wait_all, - py::arg("comms"), - py::call_guard(), - "Block until the completion of all communications in the list.") - .def_static("wait_all_for", &simgrid::s4u::Comm::wait_all_for, - py::arg("comms"), py::arg("timeout"), + .def_static("sendto", &Comm::sendto, py::call_guard(), + py::arg("from"), py::arg("to"), py::arg("simulated_size_in_bytes"), + "Do a blocking communication between two arbitrary hosts.") + .def_static("sendto_init", py::overload_cast(&Comm::sendto_init), + py::call_guard(), + py::arg("from"), py::arg("to"), + "Creates a communication between the two given hosts, bypassing the mailbox mechanism.") + .def_static("sendto_async", &Comm::sendto_async, py::call_guard(), + py::arg("from"), py::arg("to"), py::arg("simulated_size_in_bytes"), + "Do a blocking communication between two arbitrary hosts.\n\nThis initializes a communication that " + "completely bypass the mailbox and actors mechanism. There is really no limit on the hosts involved. " + "In particular, the actor does not have to be on one of the involved hosts.") + .def_static("test_any", &Comm::test_any, py::call_guard(), + py::arg("comms"), + "take a vector s4u::CommPtr and return the rank of the first finished one (or -1 if none is done)") + .def_static("wait_all", &Comm::wait_all, py::call_guard(), + py::arg("comms"), + "Block until the completion of all communications in the list.") + .def_static("wait_all_for", &Comm::wait_all_for, py::call_guard(), + py::arg("comms"), py::arg("timeout"), "Block until the completion of all communications in the list, or raises TimeoutException after " "the specified timeout.") - .def_static( - "wait_any", &simgrid::s4u::Comm::wait_any, - py::arg("comms"), - py::call_guard(), - "Block until the completion of any communication in the list and return the index of the terminated one.") - .def_static( - "wait_any_for", - &simgrid::s4u::Comm::wait_any_for, - py::arg("comms"), py::arg("timeout"), - py::call_guard(), - "Block until the completion of any communication in the list and return the index of the terminated " - "one, or -1 if a timeout occurred." - ); + .def_static("wait_any", &Comm::wait_any, + py::call_guard(), + py::arg("comms"), + "Block until the completion of any communication in the list and return the index of the " + "terminated one.") + .def_static("wait_any_for", &Comm::wait_any_for, + py::call_guard(), + py::arg("comms"), py::arg("timeout"), + "Block until the completion of any communication in the list and return the index of the terminated " + "one, or -1 if a timeout occurred."); /* Class Io */ py::class_(m, "Io", "I/O activities. See the C++ documentation for details.") diff --git a/src/s4u/s4u_Engine.cpp b/src/s4u/s4u_Engine.cpp index 9f4dd40973..996ce73ad9 100644 --- a/src/s4u/s4u_Engine.cpp +++ b/src/s4u/s4u_Engine.cpp @@ -256,7 +256,7 @@ Link* Engine::link_by_name_or_null(const std::string& name) const return link == pimpl->links_.end() ? nullptr : link->second->get_iface(); } -/** @brief Find a mailox from its name or create one if it does not exist) */ +/** @brief Find a mailbox from its name or create one if it does not exist) */ Mailbox* Engine::mailbox_by_name_or_create(const std::string& name) const { /* two actors may have pushed the same mbox_create simcall at the same time */ -- 2.20.1