Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'add_missing_comm_python_bindings' into 'master'
author Martin Quinson <martin.quinson@ens-rennes.fr>
Wed, 30 Mar 2022 21:30:16 +0000 (21:30 +0000)
committer Martin Quinson <martin.quinson@ens-rennes.fr>
Wed, 30 Mar 2022 21:30:16 +0000 (21:30 +0000)
Add remaining Comm bindings and examples

See merge request simgrid/simgrid!93

26 files changed:
ChangeLog
MANIFEST.in
docs/source/app_s4u.rst
examples/README.rst
examples/python/CMakeLists.txt
examples/python/comm-failure/comm-failure.py [new file with mode: 0644]
examples/python/comm-failure/comm-failure.tesh [new file with mode: 0644]
examples/python/comm-host2host/comm-host2host.py [new file with mode: 0644]
examples/python/comm-host2host/comm-host2host.tesh [new file with mode: 0644]
examples/python/comm-pingpong/comm-pingpong.py [new file with mode: 0644]
examples/python/comm-pingpong/comm-pingpong.tesh [new file with mode: 0644]
examples/python/comm-ready/comm-ready.py [new file with mode: 0644]
examples/python/comm-ready/comm-ready.tesh [new file with mode: 0644]
examples/python/comm-serialize/comm-serialize.py [new file with mode: 0644]
examples/python/comm-serialize/comm-serialize.tesh [new file with mode: 0644]
examples/python/comm-suspend/comm-suspend.py [new file with mode: 0644]
examples/python/comm-suspend/comm-suspend.tesh [new file with mode: 0644]
examples/python/comm-testany/comm-testany.py [new file with mode: 0644]
examples/python/comm-testany/comm-testany.tesh [new file with mode: 0644]
examples/python/comm-throttling/comm-throttling.py [new file with mode: 0644]
examples/python/comm-throttling/comm-throttling.tesh [new file with mode: 0644]
examples/python/comm-waitallfor/comm-waitallfor.py
examples/python/comm-waituntil/comm-waituntil.py [new file with mode: 0644]
examples/python/comm-waituntil/comm-waituntil.tesh [new file with mode: 0644]
src/bindings/python/simgrid_python.cpp
src/s4u/s4u_Engine.cpp

index 1c5f22d..fcd0729 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,22 @@
 SimGrid (3.31.1) NOT RELEASED YET (v3.32 expected June 21. 2022, 09:13 UTC)
 
+Python:
+  - Added the following bindings / examples:
+    - Comm (now 100% covers the C++ interface):
+      - Comm.dst_data_size, Comm.mailbox, Comm.sender, Comm.start_time, Comm.finish_time
+      - Comm.state_str [examples: examples/python/comm-failure/, examples/python/comm-host2host/]
+      - Comm.remaining [examples: examples/python/comm-host2host/, examples/python/comm-suspend/]
+      - Comm.set_payload_size [example: examples/python/comm-host2host/]
+      - Comm.set_rate [example: examples/python/comm-throttling/]
+      - Comm.sendto, Comm.sendto_init, Comm.sendto_async [example: examples/python/comm-host2host/]
+      - Comm.start, Comm.suspend, Comm.resume [example: examples/python/comm-host2host/]
+      - Comm.test_any [example: examples/python/comm-testany/]
+      - Comm.wait_until [example: examples/python/comm-waituntil/]
+    - Engine:
+      - Engine.host_by_name [example: examples/python/comm-host2host/]
+      - Engine.mailbox_by_name_or_create [example: examples/python/comm-pingpong/]
+    - Mailbox: Mailbox.ready [example: examples/python/comm-ready/]
+
 ----------------------------------------------------------------------------
 
 S4U:
index ffbad75..7f28604 100644 (file)
@@ -521,6 +521,22 @@ include examples/python/app-masterworkers/app-masterworkers.py
 include examples/python/app-masterworkers/app-masterworkers.tesh
 include examples/python/clusters-multicpu/clusters-multicpu.py
 include examples/python/clusters-multicpu/clusters-multicpu.tesh
+include examples/python/comm-failure/comm-failure.py
+include examples/python/comm-failure/comm-failure.tesh
+include examples/python/comm-host2host/comm-host2host.py
+include examples/python/comm-host2host/comm-host2host.tesh
+include examples/python/comm-pingpong/comm-pingpong.py
+include examples/python/comm-pingpong/comm-pingpong.tesh
+include examples/python/comm-ready/comm-ready.py
+include examples/python/comm-ready/comm-ready.tesh
+include examples/python/comm-serialize/comm-serialize.py
+include examples/python/comm-serialize/comm-serialize.tesh
+include examples/python/comm-suspend/comm-suspend.py
+include examples/python/comm-suspend/comm-suspend.tesh
+include examples/python/comm-testany/comm-testany.py
+include examples/python/comm-testany/comm-testany.tesh
+include examples/python/comm-throttling/comm-throttling.py
+include examples/python/comm-throttling/comm-throttling.tesh
 include examples/python/comm-wait/comm-wait.py
 include examples/python/comm-wait/comm-wait.tesh
 include examples/python/comm-waitall/comm-waitall.py
@@ -531,6 +547,8 @@ include examples/python/comm-waitany/comm-waitany.py
 include examples/python/comm-waitany/comm-waitany.tesh
 include examples/python/comm-waitfor/comm-waitfor.py
 include examples/python/comm-waitfor/comm-waitfor.tesh
+include examples/python/comm-waituntil/comm-waituntil.py
+include examples/python/comm-waituntil/comm-waituntil.tesh
 include examples/python/exec-async/exec-async.py
 include examples/python/exec-async/exec-async.tesh
 include examples/python/exec-basic/exec-basic.py
index ededc62..a463115 100644 (file)
@@ -908,6 +908,7 @@ Retrieving hosts
    .. group-tab:: Python
 
       .. autoattribute:: simgrid.Engine.all_hosts
+      .. automethod:: simgrid.Engine.host_by_name
 
    .. group-tab:: C
 
@@ -1080,6 +1081,7 @@ Receiving data
 
        .. automethod:: simgrid.Mailbox.get
        .. automethod:: simgrid.Mailbox.get_async
+       .. autoattribute:: simgrid.Mailbox.ready
 
    .. group-tab:: C
 
@@ -2199,6 +2201,13 @@ Querying info
 
    .. group-tab:: Python
 
+      .. autoattribute:: simgrid.Comm.dst_data_size
+      .. autoattribute:: simgrid.Comm.mailbox
+      .. autoattribute:: simgrid.Comm.sender
+      .. autoattribute:: simgrid.Comm.state_str
+      .. autoattribute:: simgrid.Comm.remaining
+      .. automethod:: simgrid.Comm.set_payload_size
+      .. automethod:: simgrid.Comm.set_rate
       .. automethod:: simgrid.Comm.detach
 
 Life cycle
@@ -2226,16 +2235,25 @@ also start direct communications as shown below.
       .. doxygenfunction:: simgrid::s4u::Comm::wait_any(const std::vector< CommPtr >& comms)
       .. doxygenfunction:: simgrid::s4u::Comm::wait_any_for(const std::vector< CommPtr >& comms, double timeout)
       .. doxygenfunction:: simgrid::s4u::Comm::wait_for
+      .. doxygenfunction:: simgrid::s4u::Comm::wait_until
 
    .. group-tab:: Python
 
+      .. automethod:: simgrid.Comm.sendto
+      .. automethod:: simgrid.Comm.sendto_init
+      .. automethod:: simgrid.Comm.sendto_async
+
+      .. automethod:: simgrid.Comm.cancel
+      .. automethod:: simgrid.Comm.start
       .. automethod:: simgrid.Comm.test
+      .. automethod:: simgrid.Comm.test_any
       .. automethod:: simgrid.Comm.wait
       .. automethod:: simgrid.Comm.wait_for
       .. automethod:: simgrid.Comm.wait_all
       .. automethod:: simgrid.Comm.wait_all_for
       .. automethod:: simgrid.Comm.wait_any
       .. automethod:: simgrid.Comm.wait_any_for
+      .. automethod:: simgrid.Comm.wait_until
 
    .. group-tab:: C
 
index af59d29..45b314f 100644 (file)
@@ -319,6 +319,8 @@ the simulators (as detailed in Section :ref:`options`).
 
    .. example-tab:: examples/cpp/comm-pingpong/s4u-comm-pingpong.cpp
 
+   .. example-tab:: examples/python/comm-pingpong/comm-pingpong.py
+
    .. example-tab:: examples/c/comm-pingpong/comm-pingpong.c
 
 
@@ -371,6 +373,11 @@ The ``suspend()`` and ``resume()`` functions block the progression of a given co
       :cpp:func:`simgrid::s4u::Activity::resume()` and
       :cpp:func:`simgrid::s4u::Activity::is_suspended()`.
 
+   .. example-tab:: examples/python/comm-suspend/comm-suspend.py
+
+      See also :py:func:`simgrid.Comm.suspend()` and
+      :py:func:`simgrid.Comm.resume()`.
+
 Waiting for all communications in a set
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
@@ -420,6 +427,9 @@ The ``test_any()`` returns whether at least one activity of the set has complete
 
       See also :cpp:func:`simgrid::s4u::Comm::test_any()`.
 
+   .. example-tab:: examples/python/comm-testany/comm-testany.py
+
+      See also :py:func:`simgrid.Comm.test_any()`.
 
 .. _s4u_ex_execution:
 
index 9f04d7a..48f69d4 100644 (file)
@@ -1,6 +1,7 @@
 foreach(example actor-create actor-daemon actor-join actor-kill actor-migrate actor-suspend actor-yield actor-lifetime
         app-masterworkers
-        comm-wait comm-waitall comm-waitallfor comm-waitany comm-waitfor
+        comm-wait comm-waitall comm-waitallfor comm-waitany comm-waitfor comm-failure comm-host2host comm-pingpong
+        comm-ready comm-serialize comm-suspend comm-testany comm-throttling comm-waitallfor comm-waituntil
         exec-async exec-basic exec-dvfs exec-remote
         platform-profile platform-failures
         network-nonlinear clusters-multicpu io-degradation exec-cpu-nonlinear
diff --git a/examples/python/comm-failure/comm-failure.py b/examples/python/comm-failure/comm-failure.py
new file mode 100644 (file)
index 0000000..b251f78
--- /dev/null
@@ -0,0 +1,88 @@
+# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+import sys
+
+from simgrid import Engine, Actor, Comm, NetZone, Link, LinkInRoute, Mailbox, this_actor, NetworkFailureException
+
+
+def sender(mailbox1_name: str, mailbox2_name: str) -> None:
+    mailbox1: Mailbox = Mailbox.by_name(mailbox1_name)
+    mailbox2: Mailbox = Mailbox.by_name(mailbox2_name)
+
+    this_actor.info(f"Initiating asynchronous send to {mailbox1.name}")
+    comm1: Comm = mailbox1.put_async(666, 5)
+
+    this_actor.info(f"Initiating asynchronous send to {mailbox2.name}")
+    comm2: Comm = mailbox2.put_async(666, 2)
+
+    this_actor.info(f"Calling wait_any..")
+    pending_comms = [comm1, comm2]
+    try:
+        index = Comm.wait_any([comm1, comm2])
+        this_actor.info(f"Wait any returned index {index} (comm to {pending_comms[index].mailbox.name})")
+    except NetworkFailureException:
+        this_actor.info(f"Sender has experienced a network failure exception, so it knows that something went wrong")
+        this_actor.info(f"Now it needs to figure out which of the two comms failed by looking at their state")
+
+    this_actor.info(f"Comm to {comm1.mailbox.name} has state: {comm1.state_str}")
+    this_actor.info(f"Comm to {comm2.mailbox.name} has state: {comm2.state_str}")
+
+    try:
+        comm1.wait()
+    except NetworkFailureException:
+        this_actor.info(f"Waiting on a FAILED comm raises an exception")
+
+    this_actor.info("Wait for remaining comm, just to be nice")
+    pending_comms.pop(0)
+    try:
+        Comm.wait_any(pending_comms)
+    except Exception as e:
+        this_actor.warning(str(e))
+
+
+def receiver(mailbox_name: str) -> None:
+    mailbox: Mailbox = Mailbox.by_name(mailbox_name)
+    this_actor.info(f"Receiver posting a receive ({mailbox_name})...")
+    try:
+        mailbox.get()
+        this_actor.info(f"Receiver has received successfully ({mailbox_name})!")
+    except NetworkFailureException:
+        this_actor.info(f"Receiver has experience a network failure exception ({mailbox_name})")
+
+
+def link_killer(link_name: str) -> None:
+    link_to_kill = Link.by_name(link_name)
+    this_actor.info("sleeping 10 seconds...")
+    this_actor.sleep_for(10.0)
+    this_actor.info(f"turning off link {link_to_kill.name}")
+    link_to_kill.turn_off()
+    this_actor.info("link killed. exiting")
+
+
+def main():
+    e = Engine(sys.argv)
+    zone: NetZone = NetZone.create_full_zone("AS0")
+    host1 = zone.create_host("Host1", "1f")
+    host2 = zone.create_host("Host2", "1f")
+    host3 = zone.create_host("Host3", "1f")
+
+    link_to_2 = LinkInRoute(zone.create_link("link_to_2", "1bps").seal())
+    link_to_3 = LinkInRoute(zone.create_link("link_to_3", "1bps").seal())
+
+    zone.add_route(host1.netpoint, host2.netpoint, None, None, [link_to_2], False)
+    zone.add_route(host1.netpoint, host3.netpoint, None, None, [link_to_3], False)
+    zone.seal()
+
+    Actor.create("Sender", host1, sender, "mailbox2", "mailbox3")
+    Actor.create("Receiver-1", host2, receiver, "mailbox2").daemonize()
+    Actor.create("Receiver-2", host3, receiver, "mailbox3").daemonize()
+    Actor.create("LinkKiller", host2, link_killer, "link_to_2").daemonize()
+
+    e.run()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/examples/python/comm-failure/comm-failure.tesh b/examples/python/comm-failure/comm-failure.tesh
new file mode 100644 (file)
index 0000000..cc78b70
--- /dev/null
@@ -0,0 +1,19 @@
+#!/usr/bin/env tesh
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-failure.py "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[  0.000000] (4:LinkKiller@Host2) sleeping 10 seconds...
+>[  0.000000] (2:Receiver-1@Host2) Receiver posting a receive (mailbox2)...
+>[  0.000000] (3:Receiver-2@Host3) Receiver posting a receive (mailbox3)...
+>[  0.000000] (1:Sender@Host1) Initiating asynchronous send to mailbox2
+>[  0.000000] (1:Sender@Host1) Initiating asynchronous send to mailbox3
+>[  0.000000] (1:Sender@Host1) Calling wait_any..
+>[ 10.000000] (4:LinkKiller@Host2) turning off link link_to_2
+>[ 10.000000] (4:LinkKiller@Host2) link killed. exiting
+>[ 10.000000] (2:Receiver-1@Host2) Receiver has experience a network failure exception (mailbox2)
+>[ 10.000000] (1:Sender@Host1) Sender has experienced a network failure exception, so it knows that something went wrong
+>[ 10.000000] (1:Sender@Host1) Now it needs to figure out which of the two comms failed by looking at their state
+>[ 10.000000] (1:Sender@Host1) Comm to mailbox2 has state: FAILED
+>[ 10.000000] (1:Sender@Host1) Comm to mailbox3 has state: STARTED
+>[ 10.000000] (1:Sender@Host1) Waiting on a FAILED comm raises an exception
+>[ 10.000000] (1:Sender@Host1) Wait for remaining comm, just to be nice
+>[ 16.494845] (3:Receiver-2@Host3) Receiver has received successfully (mailbox3)!
diff --git a/examples/python/comm-host2host/comm-host2host.py b/examples/python/comm-host2host/comm-host2host.py
new file mode 100644 (file)
index 0000000..e352a96
--- /dev/null
@@ -0,0 +1,86 @@
+# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+"""
+This simple example demonstrates the Comm.sendto_init() and Comm.sendto_async() functions,
+that can be used to create a direct communication from one host to another without
+relying on the mailbox mechanism.
+
+There is not much to say, actually: The _init variant creates the communication and
+leaves it unstarted (in case you want to modify this communication before it starts),
+while the _async variant creates and starts it. In both cases, you need to wait() on it.
+
+It is mostly useful when you want to have a centralized simulation of your settings,
+with a central actor declaring all communications occurring on your distributed system.
+"""
+
+from argparse import ArgumentParser
+import sys
+
+from simgrid import Actor, Comm, Engine, Host, this_actor
+
+
+def create_parser() -> ArgumentParser:
+    parser = ArgumentParser()
+    parser.add_argument(
+        '--platform',
+        type=str,
+        required=True,
+        help='path to the platform description'
+    )
+    return parser
+
+
+def sender(h1: Host, h2: Host, h3: Host, h4: Host):
+    this_actor.info(f"Send c12 with sendto_async({h1.name} -> {h2.name}),"
+                    f" and c34 with sendto_init({h3.name} -> {h4.name})")
+    c12: Comm = Comm.sendto_async(h1, h2, int(1.5e7))
+    c34: Comm = Comm.sendto_init(h3, h4)
+    c34.set_payload_size(int(1e7))
+
+    # You can also detach() communications that you never plan to test() or wait().
+    # Here we create a communication that only slows down the other ones
+    noise: Comm = Comm.sendto_init(h1, h2)
+    noise.set_payload_size(10000)
+    noise.detach()
+
+    this_actor.info(f"After creation, c12 is {c12.state_str} (remaining: {c12.remaining:.2e} bytes);"
+                    f" c34 is {c34.state_str} (remaining: {c34.remaining:.2e} bytes)")
+    this_actor.sleep_for(1)
+    this_actor.info(f"One sec later, c12 is {c12.state_str} (remaining: {c12.remaining:.2e} bytes);"
+                    f" c34 is {c34.state_str} (remaining: {c34.remaining:.2e} bytes)")
+    c34.start()
+    this_actor.info(f"After c34.start(), c12 is {c12.state_str} (remaining: {c12.remaining:.2e} bytes);"
+                    f" c34 is {c34.state_str} (remaining: {c34.remaining:.2e} bytes)")
+    c12.wait()
+    this_actor.info(f"After c12.wait(), c12 is {c12.state_str} (remaining: {c12.remaining:.2e} bytes);"
+                    f" c34 is {c34.state_str} (remaining: {c34.remaining:.2e} bytes)")
+    c34.wait()
+    this_actor.info(f"After c34.wait(), c12 is {c12.state_str} (remaining: {c12.remaining:.2e} bytes);"
+                    f" c34 is {c34.state_str} (remaining: {c34.remaining:.2e} bytes)")
+
+    # As usual, you don't have to explicitly start communications that were just init()ed.
+    # The wait() will start it automatically.
+    c14: Comm = Comm.sendto_init(h1, h4)
+    c14.set_payload_size(100).wait()
+
+
+def main():
+    settings = create_parser().parse_known_args()[0]
+    e = Engine(sys.argv)
+    e.load_platform(settings.platform)
+    Actor.create(
+        "sender", e.host_by_name("Boivin"), sender,
+        e.host_by_name("Tremblay"),  # h1
+        e.host_by_name("Jupiter"),  # h2
+        e.host_by_name("Fafard"),  # h3
+        e.host_by_name("Ginette")  # h4
+    )
+    e.run()
+    this_actor.info(f"Total simulation time: {e.clock:.3f}")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/examples/python/comm-host2host/comm-host2host.tesh b/examples/python/comm-host2host/comm-host2host.tesh
new file mode 100644 (file)
index 0000000..9b6466d
--- /dev/null
@@ -0,0 +1,10 @@
+#!/usr/bin/env tesh
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-host2host.py --platform ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[  0.000000] (1:sender@Boivin) Send c12 with sendto_async(Tremblay -> Jupiter), and c34 with sendto_init(Fafard -> Ginette)
+>[  0.000000] (1:sender@Boivin) After creation, c12 is STARTED (remaining: 1.50e+07 bytes); c34 is STARTING (remaining: 1.00e+07 bytes)
+>[  1.000000] (1:sender@Boivin) One sec later, c12 is STARTED (remaining: 8.48e+06 bytes); c34 is STARTING (remaining: 1.00e+07 bytes)
+>[  1.000000] (1:sender@Boivin) After c34.start(), c12 is STARTED (remaining: 8.48e+06 bytes); c34 is STARTED (remaining: 1.00e+07 bytes)
+>[  2.272621] (1:sender@Boivin) After c12.wait(), c12 is FINISHED (remaining: 0.00e+00 bytes); c34 is STARTED (remaining: 5.33e+05 bytes)
+>[  2.343278] (1:sender@Boivin) After c34.wait(), c12 is FINISHED (remaining: 0.00e+00 bytes); c34 is FINISHED (remaining: 0.00e+00 bytes)
+>[  2.359841] (0:maestro@) Total simulation time: 2.360
diff --git a/examples/python/comm-pingpong/comm-pingpong.py b/examples/python/comm-pingpong/comm-pingpong.py
new file mode 100644 (file)
index 0000000..d5c76f9
--- /dev/null
@@ -0,0 +1,69 @@
+# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+from argparse import ArgumentParser
+import sys
+
+from simgrid import Engine, Actor, Mailbox, this_actor
+
+
+def create_parser() -> ArgumentParser:
+    parser = ArgumentParser()
+    parser.add_argument(
+        '--platform',
+        type=str,
+        required=True,
+        help='path to the platform description'
+    )
+    return parser
+
+
+def pinger(mailbox_in: Mailbox, mailbox_out: Mailbox):
+    this_actor.info(f"Ping from mailbox {mailbox_in.name} to mailbox {mailbox_out.name}")
+
+    # Do the ping with a 1-Byte payload (latency bound) ...
+    payload = Engine.clock
+    mailbox_out.put(payload, 1)
+
+    # ... then wait for the (large) pong
+    sender_time: float = mailbox_in.get()
+    communication_time = Engine.clock - sender_time
+    this_actor.info("Payload received : large communication (bandwidth bound)")
+    this_actor.info(f"Pong time (bandwidth bound): {communication_time:.3f}")
+
+
+def ponger(mailbox_in: Mailbox, mailbox_out: Mailbox):
+    this_actor.info(f"Pong from mailbox {mailbox_in.name} to mailbox {mailbox_out.name}")
+
+    # Receive the (small) ping first ...
+    sender_time: float = mailbox_in.get()
+    communication_time = Engine.clock - sender_time
+    this_actor.info("Payload received : small communication (latency bound)")
+    this_actor.info(f"Ping time (latency bound) {communication_time:.3f}")
+
+    # ... Then send a 1GB pong back (bandwidth bound)
+    payload = Engine.clock
+    this_actor.info(f"payload = {payload:.3f}")
+    mailbox_out.put(payload, int(1e9))
+
+
+def main():
+    settings = create_parser().parse_known_args()[0]
+    e = Engine(sys.argv)
+    e.load_platform(settings.platform)
+
+    mb1: Mailbox = e.mailbox_by_name_or_create("Mailbox 1")
+    mb2: Mailbox = e.mailbox_by_name_or_create("Mailbox 2")
+
+    Actor.create("pinger", e.host_by_name("Tremblay"), pinger, mb1, mb2)
+    Actor.create("ponger", e.host_by_name("Jupiter"), ponger, mb2, mb1)
+
+    e.run()
+
+    this_actor.info(f"Total simulation time: {e.clock:.3f}")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/examples/python/comm-pingpong/comm-pingpong.tesh b/examples/python/comm-pingpong/comm-pingpong.tesh
new file mode 100644 (file)
index 0000000..94d22ce
--- /dev/null
@@ -0,0 +1,26 @@
+#!/usr/bin/env tesh
+
+p Testing with default compound
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-pingpong.py --platform ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[  0.000000] (1:pinger@Tremblay) Ping from mailbox Mailbox 1 to mailbox Mailbox 2
+>[  0.000000] (2:ponger@Jupiter) Pong from mailbox Mailbox 2 to mailbox Mailbox 1
+>[  0.019014] (2:ponger@Jupiter) Payload received : small communication (latency bound)
+>[  0.019014] (2:ponger@Jupiter) Ping time (latency bound) 0.019
+>[  0.019014] (2:ponger@Jupiter) payload = 0.019
+>[150.178356] (1:pinger@Tremblay) Payload received : large communication (bandwidth bound)
+>[150.178356] (1:pinger@Tremblay) Pong time (bandwidth bound): 150.159
+>[150.178356] (0:maestro@) Total simulation time: 150.178
+
+p Testing with default compound Full network optimization
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-pingpong.py --platform ${platfdir}/small_platform.xml "--cfg=network/optim:Full" "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[  0.000000] (0:maestro@) Configuration change: Set 'network/optim' to 'Full'
+>[  0.000000] (1:pinger@Tremblay) Ping from mailbox Mailbox 1 to mailbox Mailbox 2
+>[  0.000000] (2:ponger@Jupiter) Pong from mailbox Mailbox 2 to mailbox Mailbox 1
+>[  0.019014] (2:ponger@Jupiter) Payload received : small communication (latency bound)
+>[  0.019014] (2:ponger@Jupiter) Ping time (latency bound) 0.019
+>[  0.019014] (2:ponger@Jupiter) payload = 0.019
+>[150.178356] (1:pinger@Tremblay) Payload received : large communication (bandwidth bound)
+>[150.178356] (1:pinger@Tremblay) Pong time (bandwidth bound): 150.159
+>[150.178356] (0:maestro@) Total simulation time: 150.178
diff --git a/examples/python/comm-ready/comm-ready.py b/examples/python/comm-ready/comm-ready.py
new file mode 100644 (file)
index 0000000..646d4d1
--- /dev/null
@@ -0,0 +1,87 @@
+# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+from argparse import ArgumentParser
+from typing import List
+import sys
+
+from simgrid import Actor, Comm, Engine, Mailbox, this_actor
+
+
+FINALIZE_MESSAGE = "finalize"
+
+
+def create_parser() -> ArgumentParser:
+    parser = ArgumentParser()
+    parser.add_argument(
+        '--platform',
+        type=str,
+        required=True,
+        help='path to the platform description'
+    )
+    return parser
+
+
+def get_peer_mailbox(peer_id: int) -> Mailbox:
+    return Mailbox.by_name(f"peer-{peer_id}")
+
+
+def peer(my_id: int, message_count: int, payload_size: int, peers_count: int):
+    my_mailbox: Mailbox = get_peer_mailbox(my_id)
+    my_mailbox.set_receiver(Actor.self())
+    pending_comms: List[Comm] = []
+    # Start dispatching all messages to peers other than myself
+    for i in range(message_count):
+        for peer_id in range(peers_count):
+            if peer_id != my_id:
+                peer_mailbox = get_peer_mailbox(peer_id)
+                message = f"Message {i} from peer {my_id}"
+                this_actor.info(f"Send '{message}' to '{peer_mailbox.name}'")
+                pending_comms.append(peer_mailbox.put_async(message, payload_size))
+
+    # Start sending messages to let peers know that they should stop
+    for peer_id in range(peers_count):
+        if peer_id != my_id:
+            peer_mailbox = get_peer_mailbox(peer_id)
+            payload = str(FINALIZE_MESSAGE)
+            pending_comms.append(peer_mailbox.put_async(payload, payload_size))
+            this_actor.info(f"Send '{payload}' to '{peer_mailbox.name}'")
+    this_actor.info("Done dispatching all messages")
+
+    # Retrieve all the messages other peers have been sending to me until I receive all the corresponding "Finalize"
+    # messages
+    pending_finalize_messages = peers_count - 1
+    while pending_finalize_messages > 0:
+        if my_mailbox.ready:
+            start = Engine.clock
+            received: str = my_mailbox.get()
+            waiting_time = Engine.clock - start
+            if waiting_time != 0.0:
+                raise AssertionError(f"Expecting the waiting time to be 0.0 because the communication was supposedly "
+                                     f"ready, but got {waiting_time} instead")
+            this_actor.info(f"I got a '{received}'.")
+            if received == FINALIZE_MESSAGE:
+                pending_finalize_messages -= 1
+        else:
+            this_actor.info("Nothing ready to consume yet, I better sleep for a while")
+            this_actor.sleep_for(0.01)
+
+    this_actor.info("I'm done, just waiting for my peers to receive the messages before exiting")
+    Comm.wait_all(pending_comms)
+    this_actor.info("Goodbye now!")
+
+
+def main():
+    settings = create_parser().parse_known_args()[0]
+    e = Engine(sys.argv)
+    e.load_platform(settings.platform)
+    Actor.create("peer", e.host_by_name("Tremblay"), peer, 0, 2, int(5e7), 3)
+    Actor.create("peer", e.host_by_name("Ruby"), peer, 1, 6, int(2.5e5), 3)
+    Actor.create("peer", e.host_by_name("Perl"), peer, 2, 0, int(5e7), 3)
+    e.run()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/examples/python/comm-ready/comm-ready.tesh b/examples/python/comm-ready/comm-ready.tesh
new file mode 100644 (file)
index 0000000..638bbab
--- /dev/null
@@ -0,0 +1,91 @@
+#!/usr/bin/env tesh
+
+p Peer sending and receiving
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-ready.py --platform ${platfdir}/small_platform_fatpipe.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[  0.000000] (1:peer@Tremblay) Send 'Message 0 from peer 0' to 'peer-1'
+>[  0.000000] (2:peer@Ruby) Send 'Message 0 from peer 1' to 'peer-0'
+>[  0.000000] (3:peer@Perl) Send 'finalize' to 'peer-0'
+>[  0.000000] (1:peer@Tremblay) Send 'Message 0 from peer 0' to 'peer-2'
+>[  0.000000] (2:peer@Ruby) Send 'Message 0 from peer 1' to 'peer-2'
+>[  0.000000] (3:peer@Perl) Send 'finalize' to 'peer-1'
+>[  0.000000] (3:peer@Perl) Done dispatching all messages
+>[  0.000000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while
+>[  0.000000] (1:peer@Tremblay) Send 'Message 1 from peer 0' to 'peer-1'
+>[  0.000000] (2:peer@Ruby) Send 'Message 1 from peer 1' to 'peer-0'
+>[  0.000000] (1:peer@Tremblay) Send 'Message 1 from peer 0' to 'peer-2'
+>[  0.000000] (2:peer@Ruby) Send 'Message 1 from peer 1' to 'peer-2'
+>[  0.000000] (2:peer@Ruby) Send 'Message 2 from peer 1' to 'peer-0'
+>[  0.000000] (1:peer@Tremblay) Send 'finalize' to 'peer-1'
+>[  0.000000] (2:peer@Ruby) Send 'Message 2 from peer 1' to 'peer-2'
+>[  0.000000] (1:peer@Tremblay) Send 'finalize' to 'peer-2'
+>[  0.000000] (1:peer@Tremblay) Done dispatching all messages
+>[  0.000000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while
+>[  0.000000] (2:peer@Ruby) Send 'Message 3 from peer 1' to 'peer-0'
+>[  0.000000] (2:peer@Ruby) Send 'Message 3 from peer 1' to 'peer-2'
+>[  0.000000] (2:peer@Ruby) Send 'Message 4 from peer 1' to 'peer-0'
+>[  0.000000] (2:peer@Ruby) Send 'Message 4 from peer 1' to 'peer-2'
+>[  0.000000] (2:peer@Ruby) Send 'Message 5 from peer 1' to 'peer-0'
+>[  0.000000] (2:peer@Ruby) Send 'Message 5 from peer 1' to 'peer-2'
+>[  0.000000] (2:peer@Ruby) Send 'finalize' to 'peer-0'
+>[  0.000000] (2:peer@Ruby) Send 'finalize' to 'peer-2'
+>[  0.000000] (2:peer@Ruby) Done dispatching all messages
+>[  0.000000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while
+>[  0.010000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while
+>[  0.010000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while
+>[  0.010000] (1:peer@Tremblay) I got a 'Message 0 from peer 1'.
+>[  0.010000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while
+>[  0.020000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while
+>[  0.020000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while
+>[  0.020000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while
+>[  0.030000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while
+>[  0.030000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while
+>[  0.030000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while
+>[  0.040000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while
+>[  0.040000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while
+>[  0.040000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while
+>[  0.050000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while
+>[  0.050000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while
+>[  0.050000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while
+>[  0.060000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while
+>[  0.060000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while
+>[  0.060000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while
+>[  0.070000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while
+>[  0.070000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while
+>[  0.070000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while
+>[  0.080000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while
+>[  0.080000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while
+>[  0.080000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while
+>[  0.090000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while
+>[  0.090000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while
+>[  0.090000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while
+>[  0.100000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while
+>[  0.100000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while
+>[  0.100000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while
+>[  0.110000] (2:peer@Ruby) I got a 'Message 0 from peer 0'.
+>[  0.110000] (3:peer@Perl) I got a 'Message 0 from peer 0'.
+>[  0.110000] (1:peer@Tremblay) I got a 'finalize'.
+>[  0.110000] (2:peer@Ruby) I got a 'finalize'.
+>[  0.110000] (3:peer@Perl) I got a 'Message 0 from peer 1'.
+>[  0.110000] (1:peer@Tremblay) I got a 'Message 1 from peer 1'.
+>[  0.110000] (2:peer@Ruby) I got a 'Message 1 from peer 0'.
+>[  0.110000] (3:peer@Perl) I got a 'Message 1 from peer 0'.
+>[  0.110000] (1:peer@Tremblay) I got a 'Message 2 from peer 1'.
+>[  0.110000] (2:peer@Ruby) I got a 'finalize'.
+>[  0.110000] (2:peer@Ruby) I'm done, just waiting for my peers to receive the messages before exiting
+>[  0.110000] (3:peer@Perl) I got a 'Message 1 from peer 1'.
+>[  0.110000] (1:peer@Tremblay) I got a 'Message 3 from peer 1'.
+>[  0.110000] (3:peer@Perl) I got a 'finalize'.
+>[  0.110000] (1:peer@Tremblay) I got a 'Message 4 from peer 1'.
+>[  0.110000] (3:peer@Perl) I got a 'Message 2 from peer 1'.
+>[  0.110000] (1:peer@Tremblay) I got a 'Message 5 from peer 1'.
+>[  0.110000] (3:peer@Perl) I got a 'Message 3 from peer 1'.
+>[  0.110000] (1:peer@Tremblay) I got a 'finalize'.
+>[  0.110000] (1:peer@Tremblay) I'm done, just waiting for my peers to receive the messages before exiting
+>[  0.110000] (3:peer@Perl) I got a 'Message 4 from peer 1'.
+>[  0.110000] (3:peer@Perl) I got a 'Message 5 from peer 1'.
+>[  0.110000] (3:peer@Perl) I got a 'finalize'.
+>[  0.110000] (3:peer@Perl) I'm done, just waiting for my peers to receive the messages before exiting
+>[  0.110000] (1:peer@Tremblay) Goodbye now!
+>[  0.110000] (2:peer@Ruby) Goodbye now!
+>[  0.110000] (3:peer@Perl) Goodbye now!
diff --git a/examples/python/comm-serialize/comm-serialize.py b/examples/python/comm-serialize/comm-serialize.py
new file mode 100644 (file)
index 0000000..e02708f
--- /dev/null
@@ -0,0 +1,92 @@
+# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+from typing import List, Tuple
+import sys
+
+from simgrid import Engine, Actor, Comm, Host, LinkInRoute, Mailbox, NetZone, this_actor, PyGetAsync
+
+
+RECEIVER_MAILBOX_NAME = "receiver"
+
+
+class Sender(object):
+    def __init__(self, message_size: int, messages_count: int):
+        self.message_size = message_size
+        self.messages_count = messages_count
+
+    def __call__(self) -> None:
+        # List in which we store all ongoing communications
+        pending_comms: List[Comm] = []
+
+        # Retrieve the mailbox on which all messages will be sent
+        receiver_mailbox: Mailbox = Mailbox.by_name(RECEIVER_MAILBOX_NAME)
+        for i in range(self.messages_count):
+            message_content = f"Message {i}"
+            this_actor.info(f"Send '{message_content}' to '{receiver_mailbox.name}'")
+            # Create a communication representing the ongoing communication, and store it in pending_comms
+            pending_comms.append(receiver_mailbox.put_async(message_content, self.message_size))
+
+        this_actor.info("Done dispatching all messages")
+
+        # Now that all message exchanges were initiated, wait for their completion in one single call
+        Comm.wait_all(pending_comms)
+
+        this_actor.info("Goodbye now!")
+
+
+class Receiver(object):
+    def __init__(self, messages_count: int):
+        self.mailbox: Mailbox = Mailbox.by_name(RECEIVER_MAILBOX_NAME)
+        self.messages_count = messages_count
+
+    def __call__(self):
+        # List in which we store all incoming msgs
+        pending_comms: List[Tuple[Comm, PyGetAsync]] = []
+        this_actor.info(f"Wait for {self.messages_count} messages asynchronously")
+        for i in range(self.messages_count):
+            pending_comms.append(self.mailbox.get_async())
+        while pending_comms:
+            index = Comm.wait_any([comm for (comm, _) in pending_comms])
+            _, async_data = pending_comms[index]
+            this_actor.info(f"I got '{async_data.get()}'.")
+            pending_comms.pop(index)
+
+
+def main():
+    e = Engine(sys.argv)
+    # Creates the platform
+    #  ________                 __________
+    # | Sender |===============| Receiver |
+    # |________|    Link1      |__________|
+    #
+    zone: NetZone = NetZone.create_full_zone("Zone1")
+    sender_host: Host = zone.create_host("sender", 1).seal()
+    receiver_host: Host = zone.create_host("receiver", 1).seal()
+
+    # create split-duplex link1 (UP/DOWN), limiting the number of concurrent flows in it to 2
+    link = zone.create_split_duplex_link("link1", 10e9).set_latency(10e-6).set_concurrency_limit(2).seal()
+
+    # create routes between nodes
+    zone.add_route(
+        sender_host.netpoint,
+        receiver_host.netpoint,
+        None,
+        None,
+        [LinkInRoute(link, LinkInRoute.UP)],
+        True
+    )
+    zone.seal()
+
+    # create actors Sender/Receiver
+    messages_count = 10
+    Actor.create("receiver", receiver_host, Receiver(messages_count=messages_count))
+    Actor.create("sender", sender_host, Sender(messages_count=messages_count, message_size=int(1e6)))
+
+    e.run()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/examples/python/comm-serialize/comm-serialize.tesh b/examples/python/comm-serialize/comm-serialize.tesh
new file mode 100644 (file)
index 0000000..0a01748
--- /dev/null
@@ -0,0 +1,26 @@
+#!/usr/bin/env tesh
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-serialize.py "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[  0.000000] (1:receiver@receiver) Wait for 10 messages asynchronously
+>[  0.000000] (2:sender@sender) Send 'Message 0' to 'receiver'
+>[  0.000000] (2:sender@sender) Send 'Message 1' to 'receiver'
+>[  0.000000] (2:sender@sender) Send 'Message 2' to 'receiver'
+>[  0.000000] (2:sender@sender) Send 'Message 3' to 'receiver'
+>[  0.000000] (2:sender@sender) Send 'Message 4' to 'receiver'
+>[  0.000000] (2:sender@sender) Send 'Message 5' to 'receiver'
+>[  0.000000] (2:sender@sender) Send 'Message 6' to 'receiver'
+>[  0.000000] (2:sender@sender) Send 'Message 7' to 'receiver'
+>[  0.000000] (2:sender@sender) Send 'Message 8' to 'receiver'
+>[  0.000000] (2:sender@sender) Send 'Message 9' to 'receiver'
+>[  0.000000] (2:sender@sender) Done dispatching all messages
+>[  0.000336] (1:receiver@receiver) I got 'Message 1'.
+>[  0.000336] (1:receiver@receiver) I got 'Message 0'.
+>[  0.000542] (1:receiver@receiver) I got 'Message 3'.
+>[  0.000542] (1:receiver@receiver) I got 'Message 2'.
+>[  0.000749] (1:receiver@receiver) I got 'Message 5'.
+>[  0.000749] (1:receiver@receiver) I got 'Message 4'.
+>[  0.000955] (1:receiver@receiver) I got 'Message 7'.
+>[  0.000955] (1:receiver@receiver) I got 'Message 6'.
+>[  0.001161] (1:receiver@receiver) I got 'Message 9'.
+>[  0.001161] (1:receiver@receiver) I got 'Message 8'.
+>[  0.001161] (2:sender@sender) Goodbye now!
diff --git a/examples/python/comm-suspend/comm-suspend.py b/examples/python/comm-suspend/comm-suspend.py
new file mode 100644 (file)
index 0000000..126b7ca
--- /dev/null
@@ -0,0 +1,67 @@
+# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+""" This example shows how to suspend and resume an asynchronous communication.
+"""
+
+from argparse import ArgumentParser
+import sys
+
+from simgrid import Actor, Comm, Engine, Mailbox, this_actor
+
+
+def create_parser() -> ArgumentParser:
+    parser = ArgumentParser()
+    parser.add_argument(
+        '--platform',
+        type=str,
+        required=True,
+        help='path to the platform description'
+    )
+    return parser
+
+
+def sender():
+    mailbox: Mailbox = Mailbox.by_name("receiver")
+    payload = "Sent message"
+
+    # Create a communication representing the future transfer, without starting it yet
+    simulated_size_in_bytes = 13194230
+    comm: Comm = mailbox.put_init(payload, simulated_size_in_bytes)
+    this_actor.info(f"Suspend the communication before it starts (remaining: {comm.remaining:.0f} bytes)"
+                    f" and wait a second.")
+    this_actor.sleep_for(1)
+    this_actor.info(f"Now, start the communication (remaining: {comm.remaining:.0f} bytes) and wait another second.")
+    comm.start()
+    this_actor.sleep_for(1)
+    this_actor.info(f"There is still {comm.remaining:.0f} bytes to transfer in this communication."
+                    " Suspend it for one second.")
+    comm.suspend()
+    this_actor.info(f"Now there is {comm.remaining:.0f} bytes to transfer. Resume it and wait for its completion.")
+    comm.resume()
+    comm.wait()
+    this_actor.info(f"There is {comm.remaining:.0f} bytes to transfer after the communication completion.")
+    this_actor.info(f"Suspending a completed activity is a no-op.")
+    comm.suspend()
+
+
+def receiver():
+    mailbox: Mailbox = Mailbox.by_name("receiver")
+    this_actor.info("Wait for the message.")
+    received: str = mailbox.get()
+    this_actor.info(f"I got '{received}'.")
+
+
+def main():
+    settings = create_parser().parse_known_args()[0]
+    e = Engine(sys.argv)
+    e.load_platform(settings.platform)
+    Actor.create("sender", e.host_by_name("Tremblay"), sender)
+    Actor.create("receiver", e.host_by_name("Jupiter"), receiver)
+    e.run()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/examples/python/comm-suspend/comm-suspend.tesh b/examples/python/comm-suspend/comm-suspend.tesh
new file mode 100644 (file)
index 0000000..8715ece
--- /dev/null
@@ -0,0 +1,11 @@
+#!/usr/bin/env tesh
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-suspend.py --platform ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+> [  0.000000] (1:sender@Tremblay) Suspend the communication before it starts (remaining: 13194230 bytes) and wait a second.
+> [  0.000000] (2:receiver@Jupiter) Wait for the message.
+> [  1.000000] (1:sender@Tremblay) Now, start the communication (remaining: 13194230 bytes) and wait another second.
+> [  2.000000] (1:sender@Tremblay) There is still 6660438 bytes to transfer in this communication. Suspend it for one second.
+> [  2.000000] (1:sender@Tremblay) Now there is 6660438 bytes to transfer. Resume it and wait for its completion.
+> [  3.000000] (2:receiver@Jupiter) I got 'Sent message'.
+> [  3.000000] (1:sender@Tremblay) There is 0 bytes to transfer after the communication completion.
+> [  3.000000] (1:sender@Tremblay) Suspending a completed activity is a no-op.
diff --git a/examples/python/comm-testany/comm-testany.py b/examples/python/comm-testany/comm-testany.py
new file mode 100644 (file)
index 0000000..1b32047
--- /dev/null
@@ -0,0 +1,72 @@
+# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+from argparse import ArgumentParser
+from typing import List
+import sys
+
+from simgrid import Engine, Actor, Comm, Mailbox, this_actor
+
+
+def create_parser() -> ArgumentParser:
+    parser = ArgumentParser()
+    parser.add_argument(
+        '--platform',
+        type=str,
+        required=True,
+        help='path to the platform description'
+    )
+    return parser
+
+
+def rank0():
+    rank0_mailbox: Mailbox = Mailbox.by_name("rank0")
+    this_actor.info("Post my asynchronous receives")
+    comm1, a1 = rank0_mailbox.get_async()
+    comm2, a2 = rank0_mailbox.get_async()
+    comm3, a3 = rank0_mailbox.get_async()
+    pending_comms: List[Comm] = [comm1, comm2, comm3]
+
+    this_actor.info("Send some data to rank-1")
+    rank1_mailbox: Mailbox = Mailbox.by_name("rank1")
+    for i in range(3):
+        rank1_mailbox.put(i, 1)
+
+    this_actor.info("Test for completed comms")
+    while pending_comms:
+        flag = Comm.test_any(pending_comms)
+        if flag != -1:
+            pending_comms.pop(flag)
+            this_actor.info("Remove a pending comm.")
+        else:
+            # Nothing matches, wait for a little bit
+            this_actor.sleep_for(0.1)
+    this_actor.info("Last comm is complete")
+
+
+def rank1():
+    rank0_mailbox: Mailbox = Mailbox.by_name("rank0")
+    rank1_mailbox: Mailbox = Mailbox.by_name("rank1")
+    for i in range(3):
+        data: int = rank1_mailbox.get()
+        this_actor.info(f"Received {data}")
+        msg_content = f"Message {i}"
+        this_actor.info(f"Send '{msg_content}'")
+        rank0_mailbox.put(msg_content, int(1e6))
+
+
+def main():
+    settings = create_parser().parse_known_args()[0]
+    e = Engine(sys.argv)
+    e.load_platform(settings.platform)
+
+    Actor.create("rank0", e.host_by_name("Tremblay"), rank0)
+    Actor.create("rank1", e.host_by_name("Fafard"), rank1)
+
+    e.run()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/examples/python/comm-testany/comm-testany.tesh b/examples/python/comm-testany/comm-testany.tesh
new file mode 100644 (file)
index 0000000..83d80f1
--- /dev/null
@@ -0,0 +1,16 @@
+#!/usr/bin/env tesh
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-testany.py --platform ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[  0.000000] (1:rank0@Tremblay) Post my asynchronous receives
+>[  0.000000] (1:rank0@Tremblay) Send some data to rank-1
+>[  0.025708] (2:rank1@Fafard) Received 0
+>[  0.025708] (2:rank1@Fafard) Send 'Message 0'
+>[  0.209813] (2:rank1@Fafard) Received 1
+>[  0.209813] (2:rank1@Fafard) Send 'Message 1'
+>[  0.393918] (1:rank0@Tremblay) Test for completed comms
+>[  0.393918] (2:rank1@Fafard) Received 2
+>[  0.393918] (2:rank1@Fafard) Send 'Message 2'
+>[  0.393918] (1:rank0@Tremblay) Remove a pending comm.
+>[  0.393918] (1:rank0@Tremblay) Remove a pending comm.
+>[  0.593918] (1:rank0@Tremblay) Remove a pending comm.
+>[  0.593918] (1:rank0@Tremblay) Last comm is complete
diff --git a/examples/python/comm-throttling/comm-throttling.py b/examples/python/comm-throttling/comm-throttling.py
new file mode 100644 (file)
index 0000000..3e7e140
--- /dev/null
@@ -0,0 +1,67 @@
+# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+from argparse import ArgumentParser
+import sys
+
+from simgrid import Engine, Actor, Comm, Mailbox, this_actor
+
+
+def create_parser() -> ArgumentParser:
+    parser = ArgumentParser()
+    parser.add_argument(
+        '--platform',
+        type=str,
+        required=True,
+        help='path to the platform description'
+    )
+    return parser
+
+
+def sender(mailbox: Mailbox):
+    this_actor.info("Send at full bandwidth")
+
+    # First send a 2.5e8 Bytes payload at full bandwidth (1.25e8 Bps)
+    payload = Engine.clock
+    mailbox.put(payload, int(2.5e8))
+
+    this_actor.info("Throttle the bandwidth at the Comm level")
+    # ... then send it again but throttle the Comm
+    payload = Engine.clock
+    # get a handler on the comm first
+    comm: Comm = mailbox.put_init(payload, int(2.5e8))
+
+    # Let's throttle the communication. It amounts to setting the rate of the comm to half the nominal bandwidth of the link,
+    # i.e., 1.25e8 / 2. This second communication will thus take approximately twice as long as the first one
+    comm.set_rate(int(1.25e8 / 2)).wait()
+
+
+def receiver(mailbox: Mailbox):
+    # Receive the first payload sent at full bandwidth
+    sender_time = mailbox.get()
+    communication_time = Engine.clock - sender_time
+    this_actor.info(f"Payload received (full bandwidth) in {communication_time} seconds")
+
+    # ... Then receive the second payload sent with a throttled Comm
+    sender_time = mailbox.get()
+    communication_time = Engine.clock - sender_time
+    this_actor.info(f"Payload received (throttled) in {communication_time} seconds")
+
+
+def main():
+    settings = create_parser().parse_known_args()[0]
+    e = Engine(sys.argv)
+    e.load_platform(settings.platform)
+
+    mailbox = e.mailbox_by_name_or_create("Mailbox")
+
+    Actor.create("sender", e.host_by_name("node-0.simgrid.org"), sender, mailbox)
+    Actor.create("receiver", e.host_by_name("node-1.simgrid.org"), receiver, mailbox)
+
+    e.run()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/examples/python/comm-throttling/comm-throttling.tesh b/examples/python/comm-throttling/comm-throttling.tesh
new file mode 100644 (file)
index 0000000..366b83d
--- /dev/null
@@ -0,0 +1,7 @@
+#!/usr/bin/env tesh
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-throttling.py --platform ${platfdir}/cluster_backbone.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[  0.000000] (1:sender@node-0.simgrid.org) Send at full bandwidth
+>[  2.069662] (1:sender@node-0.simgrid.org) Throttle the bandwidth at the Comm level
+>[  2.069662] (2:receiver@node-1.simgrid.org) Payload received (full bandwidth) in 2.0696616701030925 seconds
+>[  6.077468] (2:receiver@node-1.simgrid.org) Payload received (throttled) in 4.007806 seconds
index ecd697a..fd94bcd 100644 (file)
@@ -34,6 +34,7 @@ def create_parser() -> ArgumentParser:
     parser.add_argument(
         '--platform',
         type=str,
+        required=True,
         help='path to the platform description'
     )
     parser.add_argument(
diff --git a/examples/python/comm-waituntil/comm-waituntil.py b/examples/python/comm-waituntil/comm-waituntil.py
new file mode 100644 (file)
index 0000000..0209dc8
--- /dev/null
@@ -0,0 +1,77 @@
+# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+""" This example shows how to wait for asynchronous communications until a given deadline.
+"""
+
+from argparse import ArgumentParser
+from typing import List
+import sys
+
+from simgrid import Actor, Comm, Engine, Mailbox, this_actor
+
+
+FINALIZE_MESSAGE = "finalize"
+
+
+def create_parser() -> ArgumentParser:
+    parser = ArgumentParser()
+    parser.add_argument(
+        '--platform',
+        type=str,
+        required=True,
+        help='path to the platform description'
+    )
+    return parser
+
+
+def sender(receiver_mailbox: Mailbox, messages_count: int, payload_size: int):
+    pending_comms: List[Comm] = []
+    # Start dispatching all messages to the receiver
+    for i in range(messages_count):
+        payload = f"Message {i}"
+        this_actor.info(f"Send '{payload}' to '{receiver_mailbox.name}'")
+        # Create a communication representing the ongoing communication
+        comm = receiver_mailbox.put_async(payload, payload_size)
+        # Add this comm to the vector of all known comms
+        pending_comms.append(comm)
+
+    # Start the finalize signal to the receiver
+    final_comm = receiver_mailbox.put_async(FINALIZE_MESSAGE, 0)
+    pending_comms.append(final_comm)
+    this_actor.info(f"Send '{FINALIZE_MESSAGE}' to '{receiver_mailbox.name}'")
+    this_actor.info("Done dispatching all messages")
+
+    # Now that all message exchanges were initiated, wait for their completion, in reverse order of creation
+    while pending_comms:
+        comm = pending_comms[-1]
+        comm.wait_until(Engine.clock + 1)
+        pending_comms.pop()  # remove it from the list
+    this_actor.info("Goodbye now!")
+
+
+def receiver(mailbox: Mailbox):
+    this_actor.info("Wait for my first message")
+    finalized = False
+    while not finalized:
+        received: str = mailbox.get()
+        this_actor.info(f"I got a '{received}'.")
+        # If it's a finalize message, we're done.
+        if received == FINALIZE_MESSAGE:
+            finalized = True
+
+
+def main():
+    settings = create_parser().parse_known_args()[0]
+    e = Engine(sys.argv)
+    e.load_platform(settings.platform)
+    receiver_mailbox: Mailbox = Mailbox.by_name("receiver")
+    Actor.create("sender", e.host_by_name("Tremblay"), sender, receiver_mailbox, 3, int(5e7))
+    Actor.create("receiver", e.host_by_name("Ruby"), receiver, receiver_mailbox)
+    e.run()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/examples/python/comm-waituntil/comm-waituntil.tesh b/examples/python/comm-waituntil/comm-waituntil.tesh
new file mode 100644 (file)
index 0000000..1837f0d
--- /dev/null
@@ -0,0 +1,14 @@
+#!/usr/bin/env tesh
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waituntil.py --platform ${platfdir}/small_platform_fatpipe.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[  0.000000] (1:sender@Tremblay) Send 'Message 0' to 'receiver'
+>[  0.000000] (2:receiver@Ruby) Wait for my first message
+>[  0.000000] (1:sender@Tremblay) Send 'Message 1' to 'receiver'
+>[  0.000000] (1:sender@Tremblay) Send 'Message 2' to 'receiver'
+>[  0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver'
+>[  0.000000] (1:sender@Tremblay) Done dispatching all messages
+>[  0.105458] (2:receiver@Ruby) I got a 'Message 0'.
+>[  0.210917] (2:receiver@Ruby) I got a 'Message 1'.
+>[  0.316375] (2:receiver@Ruby) I got a 'Message 2'.
+>[  0.318326] (2:receiver@Ruby) I got a 'finalize'.
+>[  0.318326] (1:sender@Tremblay) Goodbye now!
index c7762da..78f34a2 100644 (file)
@@ -49,6 +49,8 @@ using simgrid::s4u::Actor;
 using simgrid::s4u::ActorPtr;
 using simgrid::s4u::Barrier;
 using simgrid::s4u::BarrierPtr;
+using simgrid::s4u::Comm;
+using simgrid::s4u::CommPtr;
 using simgrid::s4u::Engine;
 using simgrid::s4u::Host;
 using simgrid::s4u::Link;
@@ -182,6 +184,8 @@ PYBIND11_MODULE(simgrid, m)
                           "get_all_hosts() is deprecated and  will be dropped after v3.33, use all_hosts instead.", 1);
              return self.attr("all_hosts");
            })
+      .def("host_by_name", &Engine::host_by_name_or_null, py::call_guard<py::gil_scoped_release>(),
+           "Retrieve a host by its name, or None if it does not exist in the platform.")
       .def_property_readonly("all_hosts", &Engine::get_all_hosts, "Returns the list of all hosts found in the platform")
       .def("get_all_links",
            [](py::object self) // XBT_ATTRIB_DEPRECATED_v334
@@ -214,6 +218,10 @@ PYBIND11_MODULE(simgrid, m)
       .def("netzone_by_name", &Engine::netzone_by_name_or_null)
       .def("load_platform", &Engine::load_platform, "Load a platform file describing the environment")
       .def("load_deployment", &Engine::load_deployment, "Load a deployment file and launch the actors that it contains")
+      .def("mailbox_by_name_or_create", &Engine::mailbox_by_name_or_create,
+           py::call_guard<py::gil_scoped_release>(),
+           py::arg("name"),
+           "Find a mailbox from its name or create one if it does not exist")
       .def("run", &Engine::run, py::call_guard<py::gil_scoped_release>(), "Run the simulation until its end")
       .def("run_until", py::overload_cast<double>(&Engine::run_until, py::const_),
            py::call_guard<py::gil_scoped_release>(), "Run the simulation until the given date",
@@ -645,6 +653,8 @@ PYBIND11_MODULE(simgrid, m)
             return std::string(self->get_name().c_str()); // Convert from xbt::string because of MC
           },
           "The name of that mailbox (read-only property).")
+      .def_property_readonly("ready", &Mailbox::ready, py::call_guard<py::gil_scoped_release>(),
+                             "Check if there is a communication ready to be consumed from a mailbox.")
       .def(
           "put",
           [](Mailbox* self, py::object data, int size, double timeout) {
@@ -684,7 +694,7 @@ PYBIND11_MODULE(simgrid, m)
           py::call_guard<py::gil_scoped_release>(), "Blocking data reception")
       .def(
           "get_async",
-          [](Mailbox* self) -> std::tuple<simgrid::s4u::CommPtr, PyGetAsync> {
+          [](Mailbox* self) -> std::tuple<CommPtr, PyGetAsync> {
             PyGetAsync wrap;
             auto comm = self->get_async(wrap.get());
             return std::make_tuple(std::move(comm), std::move(wrap));
@@ -703,45 +713,93 @@ PYBIND11_MODULE(simgrid, m)
           "Get python object after async communication in receiver side");
 
   /* Class Comm */
-  py::class_<simgrid::s4u::Comm, simgrid::s4u::CommPtr>(m, "Comm",
-                                                        "Communication. See the C++ documentation for details.")
-      .def("test", &simgrid::s4u::Comm::test, py::call_guard<py::gil_scoped_release>(),
+  py::class_<Comm, CommPtr>(m, "Comm", "Communication. See the C++ documentation for details.")
+      .def_property_readonly("dst_data_size", &Comm::get_dst_data_size,
+                             py::call_guard<py::gil_scoped_release>(),
+                             "Retrieve the size of the received data.")
+      .def_property_readonly("mailbox", &Comm::get_mailbox,
+                             py::call_guard<py::gil_scoped_release>(),
+                             "Retrieve the mailbox on which this comm acts.")
+      .def_property_readonly("sender", &Comm::get_sender,
+                             py::call_guard<py::gil_scoped_release>())
+      .def_property_readonly("state_str", [](Comm* self){ return std::string(self->get_state_str()); },
+                             py::call_guard<py::gil_scoped_release>(),
+                             "Retrieve the Comm state as string")
+      .def_property_readonly("remaining",  &Comm::get_remaining,
+                             py::call_guard<py::gil_scoped_release>(),
+                             "Remaining amount of work that this Comm entails")
+      .def_property_readonly("start_time",  &Comm::get_start_time,
+                             py::call_guard<py::gil_scoped_release>(),
+                             "Time at which this Comm started")
+      .def_property_readonly("finish_time",  &Comm::get_finish_time,
+                             py::call_guard<py::gil_scoped_release>(),
+                             "Time at which this Comm finished")
+      .def("set_payload_size", &Comm::set_payload_size, py::call_guard<py::gil_scoped_release>(),
+           py::arg("bytes"),
+           "Specify the amount of bytes which exchange should be simulated.")
+      .def("set_rate", &Comm::set_rate, py::call_guard<py::gil_scoped_release>(),
+           py::arg("rate"),
+           "Sets the maximal communication rate (in byte/sec). Must be done before start")
+      .def("cancel", [](Comm* self){ return self->cancel(); },
+           py::call_guard<py::gil_scoped_release>(), py::return_value_policy::reference_internal,
+           "Cancel the activity.")
+      .def("start", [](Comm* self){ return self->start(); },
+           py::call_guard<py::gil_scoped_release>(), py::return_value_policy::reference_internal,
+           "Starts a previously created activity. This function is optional: you can call wait() even if you didn't "
+           "call start()")
+      .def("suspend", [](Comm* self){ return self->suspend(); },
+          py::call_guard<py::gil_scoped_release>(), py::return_value_policy::reference_internal,
+          "Suspend the activity.")
+      .def("resume", [](Comm* self){ return self->resume(); },
+          py::call_guard<py::gil_scoped_release>(), py::return_value_policy::reference_internal,
+          "Resume the activity.")
+      .def("test", &Comm::test, py::call_guard<py::gil_scoped_release>(),
            "Test whether the communication is terminated.")
-      .def("wait", &simgrid::s4u::Comm::wait, py::call_guard<py::gil_scoped_release>(),
+      .def("wait", &Comm::wait, py::call_guard<py::gil_scoped_release>(),
            "Block until the completion of that communication.")
-      .def("wait_for", &simgrid::s4u::Comm::wait_for,
+      .def("wait_for", &Comm::wait_for, py::call_guard<py::gil_scoped_release>(),
            py::arg("timeout"),
-           py::call_guard<py::gil_scoped_release>(),
            "Block until the completion of that communication, or raises TimeoutException after the specified timeout.")
-      .def("detach", [](simgrid::s4u::Comm* self) {
-              return self->detach();
-           },
+      .def("wait_until", &Comm::wait_until, py::call_guard<py::gil_scoped_release>(),
+           py::arg("time_limit"),
+           "Block until the completion of that communication, or raises TimeoutException after the specified time.")
+      .def("detach", [](Comm* self) { return self->detach(); },
            py::return_value_policy::reference_internal,
            py::call_guard<py::gil_scoped_release>(),
            "Start the comm, and ignore its result. It can be completely forgotten after that.")
-      .def_static(
-          "wait_all", &simgrid::s4u::Comm::wait_all,
-          py::arg("comms"),
-          py::call_guard<py::gil_scoped_release>(),
-          "Block until the completion of all communications in the list.")
-      .def_static("wait_all_for", &simgrid::s4u::Comm::wait_all_for,
-                  py::arg("comms"), py::arg("timeout"),
+      .def_static("sendto", &Comm::sendto, py::call_guard<py::gil_scoped_release>(),
+                  py::arg("from"), py::arg("to"), py::arg("simulated_size_in_bytes"),
+                  "Do a blocking communication between two arbitrary hosts.")
+      .def_static("sendto_init", py::overload_cast<Host*, Host*>(&Comm::sendto_init),
+                  py::call_guard<py::gil_scoped_release>(),
+                  py::arg("from"), py::arg("to"),
+                  "Creates a communication between the two given hosts, bypassing the mailbox mechanism.")
+      .def_static("sendto_async", &Comm::sendto_async, py::call_guard<py::gil_scoped_release>(),
+                  py::arg("from"), py::arg("to"), py::arg("simulated_size_in_bytes"),
+                  "Do an asynchronous communication between two arbitrary hosts.\n\nThis initializes a communication that "
+                  "completely bypass the mailbox and actors mechanism. There is really no limit on the hosts involved. "
+                  "In particular, the actor does not have to be on one of the involved hosts.")
+      .def_static("test_any", &Comm::test_any,
                   py::call_guard<py::gil_scoped_release>(),
+                  py::arg("comms"),
+                  "Take a list of Comm and return the index of the first finished one (or -1 if none is done)")
+      .def_static("wait_all", &Comm::wait_all, py::call_guard<py::gil_scoped_release>(),
+                  py::arg("comms"),
+                  "Block until the completion of all communications in the list.")
+      .def_static("wait_all_for", &Comm::wait_all_for, py::call_guard<py::gil_scoped_release>(),
+                  py::arg("comms"), py::arg("timeout"),
                   "Block until the completion of all communications in the list, or raises TimeoutException after "
                   "the specified timeout.")
-      .def_static(
-          "wait_any", &simgrid::s4u::Comm::wait_any,
-          py::arg("comms"),
-          py::call_guard<py::gil_scoped_release>(),
-          "Block until the completion of any communication in the list and return the index of the terminated one.")
-      .def_static(
-          "wait_any_for",
-          &simgrid::s4u::Comm::wait_any_for,
-          py::arg("comms"), py::arg("timeout"),
-          py::call_guard<py::gil_scoped_release>(),
-          "Block until the completion of any communication in the list and return the index of the terminated "
-          "one, or -1 if a timeout occurred."
-      );
+      .def_static("wait_any", &Comm::wait_any,
+                  py::call_guard<py::gil_scoped_release>(),
+                  py::arg("comms"),
+                  "Block until the completion of any communication in the list and return the index of the "
+                  "terminated one.")
+      .def_static("wait_any_for", &Comm::wait_any_for,
+                  py::call_guard<py::gil_scoped_release>(),
+                  py::arg("comms"), py::arg("timeout"),
+                  "Block until the completion of any communication in the list and return the index of the terminated "
+                  "one, or -1 if a timeout occurred.");
 
   /* Class Io */
   py::class_<simgrid::s4u::Io, simgrid::s4u::IoPtr>(m, "Io", "I/O activities. See the C++ documentation for details.")
index 8098954..d9da9e1 100644 (file)
@@ -257,7 +257,7 @@ Link* Engine::link_by_name_or_null(const std::string& name) const
   return link == pimpl->links_.end() ? nullptr : link->second->get_iface();
 }
 
-/** @brief Find a mailox from its name or create one if it does not exist) */
+/** @brief Find a mailbox from its name or create one if it does not exist */
 Mailbox* Engine::mailbox_by_name_or_create(const std::string& name) const
 {
   /* two actors may have pushed the same mbox_create simcall at the same time */