Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add remaining Comm synchronisation Python bindings
author Jean-Edouard BOULANGER <jean.edouard.boulanger@gmail.com>
Wed, 9 Mar 2022 17:57:02 +0000 (18:57 +0100)
committer Jean-Edouard BOULANGER <jean.edouard.boulanger@gmail.com>
Wed, 9 Mar 2022 17:57:02 +0000 (18:57 +0100)
ChangeLog
docs/source/app_s4u.rst
examples/python/comm-waitallfor/comm-waitallfor.py [new file with mode: 0644]
examples/python/comm-waitallfor/comm-waitallfor.tesh [new file with mode: 0644]
src/bindings/python/simgrid_python.cpp

index 0e36dac..48b717b 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -38,8 +38,15 @@ XBT:
  - Drop xbt_dynar_shrink().
 
 Python:
- - Added the following bindings: Comm.wait_for() and Comm.wait_any_for()
-   Example: examples/python/comm-waitfor/
+ - Fixed the following bindings:
+   - Actor.kill_all() [previously bound as an instance method of the Actor class although it is a static method]
+ - Added the following bindings:
+     - this_actor.warning()
+     - Mailbox.put_init() [example: examples/python/comm-waitallfor]
+     - Comm.detach() [example: examples/python/comm-waitallfor]
+     - Comm.wait_for() [example: examples/python/comm-waitfor/]
+     - Comm.wait_any_for()
+     - Comm.wait_all_for() [example: examples/python/comm-waitallfor]
 
 Fixed bugs (FG#.. -> FramaGit bugs; FG!.. -> FG merge requests)
  (FG: issues on Framagit; GH: issues on GitHub)
index 35c7f24..eefd497 100644 (file)
--- a/docs/source/app_s4u.rst
+++ b/docs/source/app_s4u.rst
@@ -701,6 +701,7 @@ Logging messages
 
        .. autofunction:: simgrid.this_actor.debug
        .. autofunction:: simgrid.this_actor.info
+       .. autofunction:: simgrid.this_actor.warning
        .. autofunction:: simgrid.this_actor.error
 
 Sleeping
@@ -1047,6 +1048,7 @@ Sending data
 
       .. automethod:: simgrid.Mailbox.put
       .. automethod:: simgrid.Mailbox.put_async
+      .. automethod:: simgrid.Mailbox.put_init
 
    .. group-tab:: C
 
@@ -2183,6 +2185,10 @@ Querying info
       .. doxygenfunction:: simgrid::s4u::Comm::set_src_data(void *buff, size_t size)
       .. doxygenfunction:: simgrid::s4u::Comm::set_src_data_size(size_t size)
 
+   .. group-tab:: Python
+
+      .. automethod:: simgrid.Comm.detach
+
 Life cycle
 ----------
 
@@ -2215,6 +2221,7 @@ also start direct communications as shown below.
       .. automethod:: simgrid.Comm.wait
       .. automethod:: simgrid.Comm.wait_for
       .. automethod:: simgrid.Comm.wait_all
+      .. automethod:: simgrid.Comm.wait_all_for
       .. automethod:: simgrid.Comm.wait_any
       .. automethod:: simgrid.Comm.wait_any_for
 
diff --git a/examples/python/comm-waitallfor/comm-waitallfor.py b/examples/python/comm-waitallfor/comm-waitallfor.py
new file mode 100644 (file)
index 0000000..d7af786
--- /dev/null
@@ -0,0 +1,131 @@
+# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+"""
+This example implements the following scenario:
+- Multiple workers consume jobs (Job) from a shared mailbox (worker)
+- A client first dispatches several jobs (with a simulated 'cost' - i.e. time the worker will 'process' the job)
+- The client then waits for all job results for a maximum time set by the 'wait timeout' (Comm.wait_all_for)
+- The client then displays the status of individual jobs.
+"""
+
+
+from argparse import ArgumentParser
+from dataclasses import dataclass
+from uuid import uuid4
+import sys
+
+from simgrid import Actor, Comm, Engine, Host, Mailbox, PyGetAsync, this_actor
+
+
+SIMULATED_JOB_SIZE_BYTES = 1024
+SIMULATED_RESULT_SIZE_BYTES = 1024 * 1024
+
+
+def parse_requests(requests_str: str) -> list[float]:
+    return [float(item.strip()) for item in requests_str.split(",")]
+
+
+def create_parser() -> ArgumentParser:
+    parser = ArgumentParser()
+    parser.add_argument(
+        '--platform',
+        type=str,
+        help='path to the platform description'
+    )
+    parser.add_argument(
+        "--workers",
+        type=int,
+        default=1,
+        help="number of worker actors to start"
+    )
+    parser.add_argument(
+        "--jobs",
+        type=parse_requests,
+        default="1,2,3,4,5",
+        help="duration of individual jobs sent to the workers by the client"
+    )
+    parser.add_argument(
+        "--wait-timeout",
+        type=float,
+        default=5.0,
+        help="number of seconds before the client gives up waiting for results and aborts the simulation"
+    )
+    return parser
+
+
+@dataclass
+class Job:
+    job_id: str
+    duration: float
+    result_mailbox: Mailbox
+
+
+def worker(worker_id: str):
+    this_actor.info(f"{worker_id} started")
+    mailbox: Mailbox = Mailbox.by_name("worker")
+    while True:
+        job: Job = mailbox.get()
+        this_actor.info(f"{worker_id} working on {job.job_id} (will take {job.duration}s to complete)")
+        this_actor.sleep_for(job.duration)
+        job.result_mailbox.put(f"{worker_id}", SIMULATED_RESULT_SIZE_BYTES)
+
+
+@dataclass
+class AsyncJobResult:
+    job: Job
+    result_comm: Comm
+    async_data: PyGetAsync
+
+    @property
+    def complete(self) -> bool:
+        return self.result_comm.test()
+
+    @property
+    def status(self) -> str:
+        return "complete" if self.complete else "pending"
+
+
+def client(client_id: str, jobs: list[float], wait_timeout: float):
+    worker_mailbox: Mailbox = Mailbox.by_name("worker")
+    this_actor.info(f"{client_id} started")
+    async_job_results: list[AsyncJobResult] = []
+    for job_idx, job_duration in enumerate(jobs):
+        result_mailbox: Mailbox = Mailbox.by_name(str(uuid4()))
+        job = Job(job_id=f"job-{job_idx}", duration=job_duration, result_mailbox=result_mailbox)
+        out_comm = worker_mailbox.put_init(Job(
+            job_id=f"job-{job_idx}",
+            duration=job_duration,
+            result_mailbox=result_mailbox
+        ), SIMULATED_JOB_SIZE_BYTES)
+        out_comm.detach()
+        result_comm, async_data = result_mailbox.get_async()
+        async_job_results.append(AsyncJobResult(
+            job=job,
+            result_comm=result_comm,
+            async_data=async_data
+        ))
+    this_actor.info(f"awaiting results for all jobs (timeout={wait_timeout}s)")
+    completed_comms = Comm.wait_all_for([entry.result_comm for entry in async_job_results], wait_timeout)
+    logger = this_actor.warning if completed_comms < len(async_job_results) else this_actor.info
+    logger(f"received {completed_comms}/{len(async_job_results)} results")
+    for result in async_job_results:
+        this_actor.info(f"{result.job.job_id}"
+                        f" status={result.status}"
+                        f" result_payload={result.async_data.get() if result.complete else ''}")
+
+
+def main():
+    settings = create_parser().parse_known_args()[0]
+    e = Engine(sys.argv)
+    e.load_platform(settings.platform)
+    Actor.create("client", Host.by_name("Tremblay"), client, "client", settings.jobs, settings.wait_timeout)
+    for worker_idx in range(settings.workers):
+        Actor.create("worker", Host.by_name("Ruby"), worker, f"worker-{worker_idx}").daemonize()
+    e.run()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/examples/python/comm-waitallfor/comm-waitallfor.tesh b/examples/python/comm-waitallfor/comm-waitallfor.tesh
new file mode 100644 (file)
index 0000000..50cc88c
--- /dev/null
@@ -0,0 +1,95 @@
+#!/usr/bin/env tesh
+
+p Testing Comm.wait_all_for()
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitallfor.py --platform ${platfdir}/small_platform_fatpipe.xml --workers 1 --wait-timeout 1 --jobs 1,2,3,4,5 "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[  0.000000] (2:worker@Ruby) worker-0 started
+>[  0.000000] (1:client@Tremblay) client started
+>[  0.000000] (1:client@Tremblay) awaiting results for all jobs (timeout=1.0s)
+>[  0.001954] (2:worker@Ruby) worker-0 working on job-0 (will take 1.0s to complete)
+>[  1.000000] (1:client@Tremblay) received 0/5 results
+>[  1.000000] (1:client@Tremblay) job-0 status=pending result_payload=
+>[  1.000000] (1:client@Tremblay) job-1 status=pending result_payload=
+>[  1.000000] (1:client@Tremblay) job-2 status=pending result_payload=
+>[  1.000000] (1:client@Tremblay) job-3 status=pending result_payload=
+>[  1.000000] (1:client@Tremblay) job-4 status=pending result_payload=
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitallfor.py --platform ${platfdir}/small_platform_fatpipe.xml --workers 1 --wait-timeout 5 --jobs 1,2,3,4,5 "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[  0.000000] (2:worker@Ruby) worker-0 started
+>[  0.000000] (1:client@Tremblay) client started
+>[  0.000000] (1:client@Tremblay) awaiting results for all jobs (timeout=5.0s)
+>[  0.001954] (2:worker@Ruby) worker-0 working on job-0 (will take 1.0s to complete)
+>[  1.008029] (2:worker@Ruby) worker-0 working on job-1 (will take 2.0s to complete)
+>[  3.014105] (2:worker@Ruby) worker-0 working on job-2 (will take 3.0s to complete)
+>[  5.000000] (1:client@Tremblay) received 2/5 results
+>[  5.000000] (1:client@Tremblay) job-0 status=complete result_payload=worker-0
+>[  5.000000] (1:client@Tremblay) job-1 status=complete result_payload=worker-0
+>[  5.000000] (1:client@Tremblay) job-2 status=pending result_payload=
+>[  5.000000] (1:client@Tremblay) job-3 status=pending result_payload=
+>[  5.000000] (1:client@Tremblay) job-4 status=pending result_payload=
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitallfor.py --platform ${platfdir}/small_platform_fatpipe.xml --workers 1 --wait-timeout -1 --jobs 1,2,3,4,5 "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[  0.000000] (2:worker@Ruby) worker-0 started
+>[  0.000000] (1:client@Tremblay) client started
+>[  0.000000] (1:client@Tremblay) awaiting results for all jobs (timeout=-1.0s)
+>[  0.001954] (2:worker@Ruby) worker-0 working on job-0 (will take 1.0s to complete)
+>[  1.008029] (2:worker@Ruby) worker-0 working on job-1 (will take 2.0s to complete)
+>[  3.014105] (2:worker@Ruby) worker-0 working on job-2 (will take 3.0s to complete)
+>[  6.020181] (2:worker@Ruby) worker-0 working on job-3 (will take 4.0s to complete)
+>[ 10.026257] (2:worker@Ruby) worker-0 working on job-4 (will take 5.0s to complete)
+>[ 15.030379] (1:client@Tremblay) received 5/5 results
+>[ 15.030379] (1:client@Tremblay) job-0 status=complete result_payload=worker-0
+>[ 15.030379] (1:client@Tremblay) job-1 status=complete result_payload=worker-0
+>[ 15.030379] (1:client@Tremblay) job-2 status=complete result_payload=worker-0
+>[ 15.030379] (1:client@Tremblay) job-3 status=complete result_payload=worker-0
+>[ 15.030379] (1:client@Tremblay) job-4 status=complete result_payload=worker-0
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitallfor.py --platform ${platfdir}/small_platform_fatpipe.xml --workers 5 --wait-timeout 3 --jobs 1,2,3,4,5 "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[  0.000000] (2:worker@Ruby) worker-0 started
+>[  0.000000] (3:worker@Ruby) worker-1 started
+>[  0.000000] (4:worker@Ruby) worker-2 started
+>[  0.000000] (5:worker@Ruby) worker-3 started
+>[  0.000000] (6:worker@Ruby) worker-4 started
+>[  0.000000] (1:client@Tremblay) client started
+>[  0.000000] (1:client@Tremblay) awaiting results for all jobs (timeout=3.0s)
+>[  0.001954] (6:worker@Ruby) worker-4 working on job-4 (will take 5.0s to complete)
+>[  0.001954] (5:worker@Ruby) worker-3 working on job-3 (will take 4.0s to complete)
+>[  0.001954] (4:worker@Ruby) worker-2 working on job-2 (will take 3.0s to complete)
+>[  0.001954] (3:worker@Ruby) worker-1 working on job-1 (will take 2.0s to complete)
+>[  0.001954] (2:worker@Ruby) worker-0 working on job-0 (will take 1.0s to complete)
+>[  3.000000] (1:client@Tremblay) received 2/5 results
+>[  3.000000] (1:client@Tremblay) job-0 status=complete result_payload=worker-0
+>[  3.000000] (1:client@Tremblay) job-1 status=complete result_payload=worker-1
+>[  3.000000] (1:client@Tremblay) job-2 status=pending result_payload=
+>[  3.000000] (1:client@Tremblay) job-3 status=pending result_payload=
+>[  3.000000] (1:client@Tremblay) job-4 status=pending result_payload=
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitallfor.py --platform ${platfdir}/small_platform_fatpipe.xml --workers 5 --wait-timeout -1 --jobs 5,10,5,20,5,40,5,80,5,160 "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[  0.000000] (2:worker@Ruby) worker-0 started
+>[  0.000000] (3:worker@Ruby) worker-1 started
+>[  0.000000] (4:worker@Ruby) worker-2 started
+>[  0.000000] (5:worker@Ruby) worker-3 started
+>[  0.000000] (6:worker@Ruby) worker-4 started
+>[  0.000000] (1:client@Tremblay) client started
+>[  0.000000] (1:client@Tremblay) awaiting results for all jobs (timeout=-1.0s)
+>[  0.001954] (6:worker@Ruby) worker-4 working on job-4 (will take 5.0s to complete)
+>[  0.001954] (5:worker@Ruby) worker-3 working on job-3 (will take 20.0s to complete)
+>[  0.001954] (4:worker@Ruby) worker-2 working on job-2 (will take 5.0s to complete)
+>[  0.001954] (3:worker@Ruby) worker-1 working on job-1 (will take 10.0s to complete)
+>[  0.001954] (2:worker@Ruby) worker-0 working on job-0 (will take 5.0s to complete)
+>[  5.008029] (2:worker@Ruby) worker-0 working on job-7 (will take 80.0s to complete)
+>[  5.008029] (4:worker@Ruby) worker-2 working on job-6 (will take 5.0s to complete)
+>[  5.008029] (6:worker@Ruby) worker-4 working on job-5 (will take 40.0s to complete)
+>[ 10.008029] (3:worker@Ruby) worker-1 working on job-8 (will take 5.0s to complete)
+>[ 10.014105] (4:worker@Ruby) worker-2 working on job-9 (will take 160.0s to complete)
+>[170.018227] (1:client@Tremblay) received 10/10 results
+>[170.018227] (1:client@Tremblay) job-0 status=complete result_payload=worker-0
+>[170.018227] (1:client@Tremblay) job-1 status=complete result_payload=worker-1
+>[170.018227] (1:client@Tremblay) job-2 status=complete result_payload=worker-2
+>[170.018227] (1:client@Tremblay) job-3 status=complete result_payload=worker-3
+>[170.018227] (1:client@Tremblay) job-4 status=complete result_payload=worker-4
+>[170.018227] (1:client@Tremblay) job-5 status=complete result_payload=worker-4
+>[170.018227] (1:client@Tremblay) job-6 status=complete result_payload=worker-2
+>[170.018227] (1:client@Tremblay) job-7 status=complete result_payload=worker-0
+>[170.018227] (1:client@Tremblay) job-8 status=complete result_payload=worker-1
+>[170.018227] (1:client@Tremblay) job-9 status=complete result_payload=worker-2
index 3d2fa25..60ce5f6 100644 (file)
--- a/src/bindings/python/simgrid_python.cpp
+++ b/src/bindings/python/simgrid_python.cpp
@@ -100,6 +100,8 @@ PYBIND11_MODULE(simgrid, m)
           "debug", [](const char* s) { XBT_DEBUG("%s", s); }, "Display a logging message of 'debug' priority.")
       .def(
           "info", [](const char* s) { XBT_INFO("%s", s); }, "Display a logging message of 'info' priority.")
+      .def(
+          "warning", [](const char* s) { XBT_WARN("%s", s); }, "Display a logging message of 'warning' priority.")
       .def(
           "error", [](const char* s) { XBT_ERROR("%s", s); }, "Display a logging message of 'error' priority.")
       .def("execute", py::overload_cast<double, double>(&simgrid::s4u::this_actor::execute),
@@ -653,6 +655,14 @@ PYBIND11_MODULE(simgrid, m)
             return self->put_async(data.ptr(), size);
           },
           py::call_guard<py::gil_scoped_release>(), "Non-blocking data transmission")
+      .def(
+          "put_init",
+          [](Mailbox* self, py::object data, int size) {
+            data.inc_ref();
+            return self->put_init(data.ptr(), size);
+          },
+          py::call_guard<py::gil_scoped_release>(),
+          "Creates (but don’t start) a data transmission to that mailbox.")
       .def(
           "get",
           [](Mailbox* self) {
@@ -689,19 +699,35 @@ PYBIND11_MODULE(simgrid, m)
       .def("wait", &simgrid::s4u::Comm::wait, py::call_guard<py::gil_scoped_release>(),
            "Block until the completion of that communication.")
       .def("wait_for", &simgrid::s4u::Comm::wait_for,
+           py::arg("timeout"),
            py::call_guard<py::gil_scoped_release>(),
            "Block until the completion of that communication, or raises TimeoutException after the specified timeout.")
+      .def("detach", [](simgrid::s4u::Comm* self) {
+              return self->detach();
+           },
+           py::return_value_policy::reference_internal,
+           py::call_guard<py::gil_scoped_release>(),
+           "Start the comm, and ignore its result. It can be completely forgotten after that.")
       // use py::overload_cast for wait_all/wait_any, until the overload marked XBT_ATTRIB_DEPRECATED_v332 is removed
       .def_static(
           "wait_all", py::overload_cast<const std::vector<simgrid::s4u::CommPtr>&>(&simgrid::s4u::Comm::wait_all),
-          py::call_guard<py::gil_scoped_release>(), "Block until the completion of all communications in the list.")
+          py::arg("comms"),
+          py::call_guard<py::gil_scoped_release>(),
+          "Block until the completion of all communications in the list.")
+      .def_static("wait_all_for", &simgrid::s4u::Comm::wait_all_for,
+                  py::arg("comms"), py::arg("timeout"),
+                  py::call_guard<py::gil_scoped_release>(),
+                  "Block until the completion of all communications in the list, or raises TimeoutException after "
+                  "the specified timeout.")
       .def_static(
           "wait_any", py::overload_cast<const std::vector<simgrid::s4u::CommPtr>&>(&simgrid::s4u::Comm::wait_any),
+          py::arg("comms"),
           py::call_guard<py::gil_scoped_release>(),
           "Block until the completion of any communication in the list and return the index of the terminated one.")
       .def_static(
           "wait_any_for",
           py::overload_cast<const std::vector<simgrid::s4u::CommPtr>&, double>(&simgrid::s4u::Comm::wait_any_for),
+          py::arg("comms"), py::arg("timeout"),
           py::call_guard<py::gil_scoped_release>(),
           "Block until the completion of any communication in the list and return the index of the terminated "
           "one, or -1 if a timeout occurred."
@@ -798,11 +824,11 @@ PYBIND11_MODULE(simgrid, m)
       .def("join", py::overload_cast<double>(&Actor::join, py::const_), py::call_guard<py::gil_scoped_release>(),
            "Wait for the actor to finish (more info in the C++ documentation).", py::arg("timeout"))
       .def("kill", &Actor::kill, py::call_guard<py::gil_scoped_release>(), "Kill that actor")
-      .def("kill_all", &Actor::kill_all, py::call_guard<py::gil_scoped_release>(), "Kill all actors but the caller.")
       .def("self", &Actor::self, "Retrieves the current actor.")
       .def("is_suspended", &Actor::is_suspended, "Returns True if that actor is currently suspended.")
       .def("suspend", &Actor::suspend, py::call_guard<py::gil_scoped_release>(),
            "Suspend that actor, that is blocked until resume()ed by another actor.")
       .def("resume", &Actor::resume, py::call_guard<py::gil_scoped_release>(),
-           "Resume that actor, that was previously suspend()ed.");
+           "Resume that actor, that was previously suspend()ed.")
+      .def_static("kill_all", &Actor::kill_all, py::call_guard<py::gil_scoped_release>(), "Kill all actors but the caller.");
 }