AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'python_ptask' into 'master'
author    Arnaud Giersch <arnaud.giersch@univ-fcomte.fr>
Thu, 30 Jun 2022 08:02:59 +0000 (08:02 +0000)
committer Arnaud Giersch <arnaud.giersch@univ-fcomte.fr>
Thu, 30 Jun 2022 08:02:59 +0000 (08:02 +0000)
Add ptasks to the Python bindings

See merge request simgrid/simgrid!113
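
A minimal usage sketch of the new bindings (condensed from the examples/python/exec-ptask/exec-ptask.py file added by this merge; the platform file and host name below are illustrative and simply borrowed from that example):

from simgrid import Actor, Engine, this_actor, TimeoutException

def runner():
    hosts = Engine.instance.all_hosts
    n = len(hosts)
    comp = [1e9] * n          # 1 Gflop to compute on each host
    comm = [0.0] * (n * n)    # bytes to exchange, row-major (src, dst) matrix
    for i in range(n):
        for j in range(i + 1, n):
            comm[i * n + j] = 1e7   # 10 MB between each pair of hosts
    # Blocking form: run the parallel task to completion
    this_actor.parallel_execute(hosts, comp, comm)
    # Asynchronous form: build the activity, then wait with a timeout
    activity = this_actor.exec_init(hosts, comp, comm)
    try:
        activity.wait_for(10.0)
    except TimeoutException:
        activity.cancel()

if __name__ == "__main__":
    e = Engine.instance
    Engine.set_config("host/model:ptask_L07")   # ptasks require the L07 host model
    e.load_platform("energy_platform.xml")      # illustrative platform file
    Actor.create("runner", e.host_by_name("MyHost1"), runner)
    e.run()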

ChangeLog
MANIFEST.in
examples/python/CMakeLists.txt
examples/python/exec-ptask/exec-ptask.py [new file with mode: 0644]
examples/python/exec-ptask/exec-ptask.tesh [new file with mode: 0644]
src/bindings/python/simgrid_python.cpp

index 6503665..3dc42c3 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -19,7 +19,13 @@ Python:
     - Engine:
       - Engine.host_by_name [example: examples/python/comm-host2host/]
       - Engine.mailbox_by_name_or_create [example: examples/python/comm-pingpong/]
+      - Engine.set_config
     - Mailbox: Mailbox.ready [example: examples/python/comm-ready/]
+    - Ptask [example: examples/python/exec-ptask/]:
+      - this_actor.exec_init
+      - this_actor.parallel_execute
+      - Exec.suspend
+      - Exec.wait_for
   - Added an AssertionError exception that may be thrown in case of error. For instance, creating two hosts with the
     same name will now throw this exception instead of killing the interpreter.
 
index 5bacdf0..09a068e 100644 (file)
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -553,6 +553,8 @@ include examples/python/exec-async/exec-async.py
 include examples/python/exec-async/exec-async.tesh
 include examples/python/exec-basic/exec-basic.py
 include examples/python/exec-basic/exec-basic.tesh
+include examples/python/exec-ptask/exec-ptask.py
+include examples/python/exec-ptask/exec-ptask.tesh
 include examples/python/exec-cpu-nonlinear/exec-cpu-nonlinear.py
 include examples/python/exec-cpu-nonlinear/exec-cpu-nonlinear.tesh
 include examples/python/exec-dvfs/exec-dvfs.py
index 48f69d4..1efe8e2 100644 (file)
--- a/examples/python/CMakeLists.txt
+++ b/examples/python/CMakeLists.txt
@@ -2,7 +2,7 @@ foreach(example actor-create actor-daemon actor-join actor-kill actor-migrate ac
         app-masterworkers
         comm-wait comm-waitall comm-waitallfor comm-waitany comm-waitfor comm-failure comm-host2host comm-pingpong
         comm-ready comm-serialize comm-suspend comm-testany comm-throttling comm-waitallfor comm-waituntil
-        exec-async exec-basic exec-dvfs exec-remote
+        exec-async exec-basic exec-dvfs exec-remote exec-ptask
         platform-profile platform-failures
         network-nonlinear clusters-multicpu io-degradation exec-cpu-nonlinear
         synchro-barrier synchro-mutex synchro-semaphore)
diff --git a/examples/python/exec-ptask/exec-ptask.py b/examples/python/exec-ptask/exec-ptask.py
new file mode 100644 (file)
index 0000000..d5ddb6a
--- /dev/null
@@ -0,0 +1,103 @@
+# Copyright (c) 2018-2022. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+
+# This script does exactly the same thing as the file s4u-exec-ptask.cpp
+
+import sys
+from simgrid import Actor, Engine, Host, this_actor, TimeoutException
+
+def runner():
+    hosts = Engine.instance.all_hosts
+    hosts_count = len(hosts)
+
+    # Test 1
+    this_actor.info("First, build a classical parallel activity, with 1 Gflop to execute on each node, "
+               "and 10MB to exchange between each pair")
+    computation_amounts = [1e9]*hosts_count
+    communication_amounts = [0]*hosts_count*hosts_count
+    for i in range(hosts_count):
+        for j in range(i+1, hosts_count):
+            communication_amounts[i * hosts_count + j] = 1e7
+    this_actor.parallel_execute(hosts, computation_amounts, communication_amounts)
+
+    # Test 2
+    this_actor.info("We can do the same with a timeout of 10 seconds enabled.")
+    activity = this_actor.exec_init(hosts, computation_amounts, communication_amounts)
+    try:
+        activity.wait_for(10.0)
+        sys.exit("Whoops, this did not time out as expected... Please report that bug.")
+    except TimeoutException:
+        this_actor.info("Caught the expected timeout exception.")
+        activity.cancel()
+
+    # Test 3
+    this_actor.info("Then, build a parallel activity involving only computations (of different amounts) and no communication")
+    computation_amounts = [3e8, 6e8, 1e9]
+    communication_amounts = []
+    this_actor.parallel_execute(hosts, computation_amounts, communication_amounts)
+
+    # Test 4
+    this_actor.info("Then, build a parallel activity with no computation nor communication (synchro only)")
+    computation_amounts = []
+    this_actor.parallel_execute(hosts, computation_amounts, communication_amounts)
+
+    # Test 5
+    this_actor.info("Then, monitor the execution of a parallel activity")
+    computation_amounts = [1e6]*hosts_count
+    communication_amounts = [0, 1e6, 0, 0, 0, 1e6, 1e6, 0, 0]
+    activity = this_actor.exec_init(hosts, computation_amounts, communication_amounts)
+    activity.start()
+    while not activity.test():
+        ratio = activity.remaining_ratio * 100
+        this_actor.info(f"Remaining flop ratio: {ratio:.0f}%")
+        this_actor.sleep_for(5)
+    activity.wait()
+
+    # Test 6
+    this_actor.info("Finally, simulate a malleable task (a parallel execution that gets reconfigured after its start).")
+    this_actor.info("  - Start a regular parallel execution, with both comm and computation")
+    computation_amounts = [1e6]*hosts_count
+    communication_amounts = [0, 1e6, 0, 0, 1e6, 0, 1e6, 0, 0]
+    activity = this_actor.exec_init(hosts, computation_amounts, communication_amounts)
+    activity.start()
+    this_actor.sleep_for(10)
+    remaining_ratio = activity.remaining_ratio
+    this_actor.info(f"  - After 10 seconds, {remaining_ratio*100:.2f}% remains to be done. Change it from 3 hosts to 2 hosts only.")
+    this_actor.info("    Let's first suspend the task.")
+    activity.suspend()
+    this_actor.info("  - Now, simulate the reconfiguration (modeled as a comm from the removed host to the remaining ones).")
+    rescheduling_comp = [0, 0, 0]
+    rescheduling_comm = [0, 0, 0, 0, 0, 0, 25000, 25000, 0]
+    this_actor.parallel_execute(hosts, rescheduling_comp, rescheduling_comm)
+    this_actor.info("  - Now, let's cancel the old task and create a new task with modified comm and computation vectors:")
+    this_actor.info("    What was already done is removed, and the load of the removed host is shared between remaining ones.")
+    for i in range(2):
+        # remove what we've done so far, for both comm and compute load
+        computation_amounts[i]   *= remaining_ratio
+        communication_amounts[i] *= remaining_ratio
+        # The work from 1 must be shared between 2 remaining ones. 1/2=50% of extra work for each
+        computation_amounts[i]   *= 1.5
+        communication_amounts[i] *= 1.5
+    hosts = hosts[:2]
+    computation_amounts = computation_amounts[:2]
+    remaining_comm = communication_amounts[1]
+    communication_amounts = [0, remaining_comm, remaining_comm, 0]
+    activity.cancel()
+    activity = this_actor.exec_init(hosts, computation_amounts, communication_amounts)
+    this_actor.info("  - Done, let's wait for the task completion")
+    activity.wait()
+    this_actor.info("Goodbye now!")
+
+
+if __name__ == "__main__":
+    if len(sys.argv) != 2:
+        sys.exit(f"Syntax: {sys.argv[0]} <platform_file>")
+    platform = sys.argv[1]
+    engine = Engine.instance
+    Engine.set_config("host/model:ptask_L07")  # /!\ this is required for running ptasks
+    engine.load_platform(platform)
+    Actor.create("foo", engine.host_by_name("MyHost1"), runner)
+    engine.run()
diff --git a/examples/python/exec-ptask/exec-ptask.tesh b/examples/python/exec-ptask/exec-ptask.tesh
new file mode 100644 (file)
index 0000000..90ff802
--- /dev/null
@@ -0,0 +1,26 @@
+#!/usr/bin/env tesh
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${srcdir}/exec-ptask.py ${platfdir}/energy_platform.xml
+> [0.000000] [xbt_cfg/INFO] Configuration change: Set 'host/model' to 'ptask_L07'
+> [0.000000] [xbt_cfg/INFO] Switching to the L07 model to handle parallel tasks.
+> [MyHost1:foo:(1) 0.000000] [python/INFO] First, build a classical parallel activity, with 1 Gflop to execute on each node, and 10MB to exchange between each pair
+> [MyHost1:foo:(1) 300.000000] [python/INFO] We can do the same with a timeout of 10 seconds enabled.
+> [MyHost1:foo:(1) 310.000000] [python/INFO] Caught the expected timeout exception.
+> [MyHost1:foo:(1) 310.000000] [python/INFO] Then, build a parallel activity involving only computations (of different amounts) and no communication
+> [MyHost1:foo:(1) 320.000000] [python/INFO] Then, build a parallel activity with no computation nor communication (synchro only)
+> [MyHost1:foo:(1) 320.000000] [python/INFO] Then, monitor the execution of a parallel activity
+> [MyHost1:foo:(1) 320.000000] [python/INFO] Remaining flop ratio: 100%
+> [MyHost1:foo:(1) 325.000000] [python/INFO] Remaining flop ratio: 83%
+> [MyHost1:foo:(1) 330.000000] [python/INFO] Remaining flop ratio: 67%
+> [MyHost1:foo:(1) 335.000000] [python/INFO] Remaining flop ratio: 50%
+> [MyHost1:foo:(1) 340.000000] [python/INFO] Remaining flop ratio: 33%
+> [MyHost1:foo:(1) 345.000000] [python/INFO] Remaining flop ratio: 17%
+> [MyHost1:foo:(1) 350.000000] [python/INFO] Finally, simulate a malleable task (a parallel execution that gets reconfigured after its start).
+> [MyHost1:foo:(1) 350.000000] [python/INFO]   - Start a regular parallel execution, with both comm and computation
+> [MyHost1:foo:(1) 360.000000] [python/INFO]   - After 10 seconds, 50.00% remains to be done. Change it from 3 hosts to 2 hosts only.
+> [MyHost1:foo:(1) 360.000000] [python/INFO]     Let's first suspend the task.
+> [MyHost1:foo:(1) 360.000000] [python/INFO]   - Now, simulate the reconfiguration (modeled as a comm from the removed host to the remaining ones).
+> [MyHost1:foo:(1) 360.500000] [python/INFO]   - Now, let's cancel the old task and create a new task with modified comm and computation vectors:
+> [MyHost1:foo:(1) 360.500000] [python/INFO]     What was already done is removed, and the load of the removed host is shared between remaining ones.
+> [MyHost1:foo:(1) 360.500000] [python/INFO]   - Done, let's wait for the task completion
+> [MyHost1:foo:(1) 375.500000] [python/INFO] Goodbye now!
index 256129e..e1c42ed 100644 (file)
--- a/src/bindings/python/simgrid_python.cpp
+++ b/src/bindings/python/simgrid_python.cpp
@@ -123,6 +123,14 @@ PYBIND11_MODULE(simgrid, m)
            py::call_guard<py::gil_scoped_release>())
       .def("exec_async", py::overload_cast<double>(&simgrid::s4u::this_actor::exec_async),
            py::call_guard<py::gil_scoped_release>())
+      .def("parallel_execute", &simgrid::s4u::this_actor::parallel_execute,
+           py::call_guard<py::gil_scoped_release>(),
+           "Run a parallel task (requires the 'ptask_L07' model)")
+      .def("exec_init",
+           py::overload_cast<const std::vector<simgrid::s4u::Host*>&, const std::vector<double>&,
+           const std::vector<double>&>(&simgrid::s4u::this_actor::exec_init),
+           py::call_guard<py::gil_scoped_release>(),
+           "Initiate a parallel task (requires the 'ptask_L07' model)")
       .def("get_host", &simgrid::s4u::this_actor::get_host, "Retrieves host on which the current actor is located")
       .def("set_host", &simgrid::s4u::this_actor::set_host, py::call_guard<py::gil_scoped_release>(),
            "Moves the current actor to another host.", py::arg("dest"))
@@ -219,6 +227,8 @@ PYBIND11_MODULE(simgrid, m)
                              "Retrieve the root netzone, containing all others.")
       .def("netpoint_by_name", &Engine::netpoint_by_name_or_null)
       .def("netzone_by_name", &Engine::netzone_by_name_or_null)
+      .def("set_config", py::overload_cast<const std::string&>(&Engine::set_config),
+           "Change one of SimGrid's configurations")
       .def("load_platform", &Engine::load_platform, "Load a platform file describing the environment")
       .def("load_deployment", &Engine::load_deployment, "Load a deployment file and launch the actors that it contains")
       .def("mailbox_by_name_or_create", &Engine::mailbox_by_name_or_create,
@@ -840,8 +850,12 @@ PYBIND11_MODULE(simgrid, m)
            "Test whether the execution is terminated.")
       .def("cancel", &simgrid::s4u::Exec::cancel, py::call_guard<py::gil_scoped_release>(), "Cancel that execution.")
       .def("start", &simgrid::s4u::Exec::start, py::call_guard<py::gil_scoped_release>(), "Start that execution.")
+      .def("suspend", &simgrid::s4u::Exec::suspend, py::call_guard<py::gil_scoped_release>(), "Suspend that execution.")
       .def("wait", &simgrid::s4u::Exec::wait, py::call_guard<py::gil_scoped_release>(),
-           "Block until the completion of that execution.");
+           "Block until the completion of that execution.")
+      .def("wait_for", &simgrid::s4u::Exec::wait_for, py::call_guard<py::gil_scoped_release>(),
+           py::arg("timeout"),
+           "Block until the completion of that activity, or raise a TimeoutException after the specified timeout.");
 
   /* Class Semaphore */
   py::class_<Semaphore, SemaphorePtr>(m, "Semaphore",