From e2ce4181c0c71b95daaf02cb52cb80d9936dff24 Mon Sep 17 00:00:00 2001 From: Jean-Edouard BOULANGER Date: Wed, 9 Mar 2022 18:57:02 +0100 Subject: [PATCH] Add remaining Comm synchronisation Python bindings --- ChangeLog | 11 +- docs/source/app_s4u.rst | 7 + .../python/comm-waitallfor/comm-waitallfor.py | 131 ++++++++++++++++++ .../comm-waitallfor/comm-waitallfor.tesh | 95 +++++++++++++ src/bindings/python/simgrid_python.cpp | 32 ++++- 5 files changed, 271 insertions(+), 5 deletions(-) create mode 100644 examples/python/comm-waitallfor/comm-waitallfor.py create mode 100644 examples/python/comm-waitallfor/comm-waitallfor.tesh diff --git a/ChangeLog b/ChangeLog index 0e36dac86b..48b717b1cb 100644 --- a/ChangeLog +++ b/ChangeLog @@ -38,8 +38,15 @@ XBT: - Drop xbt_dynar_shrink(). Python: - - Added the following bindings: Comm.wait_for() and Comm.wait_any_for() - Example: examples/python/comm-waitfor/ + - Fixed the following bindings: + - Actor.kill_all() [previously declared a member of the Actor class although it is a static member] + - Added the following bindings: + - this_actor.warning() + - Mailbox.put_init() [example: examples/python/comm-waitallfor] + - Comm.detach() [example: examples/python/comm-waitallfor] + - Comm.wait_for() [example: examples/python/comm-waitfor/] + - Comm.wait_any_for() + - Comm.wait_all_for() [example: examples/python/comm-waitallfor] Fixed bugs (FG#.. -> FramaGit bugs; FG!.. -> FG merge requests) (FG: issues on Framagit; GH: issues on GitHub) diff --git a/docs/source/app_s4u.rst b/docs/source/app_s4u.rst index 35c7f24c5c..eefd497ba9 100644 --- a/docs/source/app_s4u.rst +++ b/docs/source/app_s4u.rst @@ -701,6 +701,7 @@ Logging messages .. autofunction:: simgrid.this_actor.debug .. autofunction:: simgrid.this_actor.info + .. autofunction:: simgrid.this_actor.warning .. autofunction:: simgrid.this_actor.error Sleeping @@ -1047,6 +1048,7 @@ Sending data .. automethod:: simgrid.Mailbox.put .. automethod:: simgrid.Mailbox.put_async + .. automethod:: simgrid.Mailbox.put_init .. group-tab:: C @@ -2183,6 +2185,10 @@ Querying info .. doxygenfunction:: simgrid::s4u::Comm::set_src_data(void *buff, size_t size) .. doxygenfunction:: simgrid::s4u::Comm::set_src_data_size(size_t size) + .. group-tab:: Python + + .. automethod:: simgrid.Comm.detach + Life cycle ---------- @@ -2215,6 +2221,7 @@ also start direct communications as shown below. .. automethod:: simgrid.Comm.wait .. automethod:: simgrid.Comm.wait_for .. automethod:: simgrid.Comm.wait_all + .. automethod:: simgrid.Comm.wait_all_for .. automethod:: simgrid.Comm.wait_any .. automethod:: simgrid.Comm.wait_any_for diff --git a/examples/python/comm-waitallfor/comm-waitallfor.py b/examples/python/comm-waitallfor/comm-waitallfor.py new file mode 100644 index 0000000000..d7af7862ed --- /dev/null +++ b/examples/python/comm-waitallfor/comm-waitallfor.py @@ -0,0 +1,131 @@ +# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved. +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the license (GNU LGPL) which comes with this package. + +""" +This example implements the following scenario: +- Multiple workers consume jobs (Job) from a shared mailbox (worker) +- A client first dispatches several jobs (with a simulated 'cost' - i.e. time the worker will 'process' the job) +- The client then waits for all job results for a maximum time set by the 'wait timeout' (Comm.wait_all_for) +- The client then displays the status of individual jobs. +""" + + +from argparse import ArgumentParser +from dataclasses import dataclass +from uuid import uuid4 +import sys + +from simgrid import Actor, Comm, Engine, Host, Mailbox, PyGetAsync, this_actor + + +SIMULATED_JOB_SIZE_BYTES = 1024 +SIMULATED_RESULT_SIZE_BYTES = 1024 * 1024 + + +def parse_requests(requests_str: str) -> list[float]: + return [float(item.strip()) for item in requests_str.split(",")] + + +def create_parser() -> ArgumentParser: + parser = ArgumentParser() + parser.add_argument( + '--platform', + type=str, + help='path to the platform description' + ) + parser.add_argument( + "--workers", + type=int, + default=1, + help="number of worker actors to start" + ) + parser.add_argument( + "--jobs", + type=parse_requests, + default="1,2,3,4,5", + help="duration of individual jobs sent to the workers by the client" + ) + parser.add_argument( + "--wait-timeout", + type=float, + default=5.0, + help="number of seconds before the client gives up waiting for results and aborts the simulation" + ) + return parser + + +@dataclass +class Job: + job_id: str + duration: float + result_mailbox: Mailbox + + +def worker(worker_id: str): + this_actor.info(f"{worker_id} started") + mailbox: Mailbox = Mailbox.by_name("worker") + while True: + job: Job = mailbox.get() + this_actor.info(f"{worker_id} working on {job.job_id} (will take {job.duration}s to complete)") + this_actor.sleep_for(job.duration) + job.result_mailbox.put(f"{worker_id}", SIMULATED_RESULT_SIZE_BYTES) + + +@dataclass +class AsyncJobResult: + job: Job + result_comm: Comm + async_data: PyGetAsync + + @property + def complete(self) -> bool: + return self.result_comm.test() + + @property + def status(self) -> str: + return "complete" if self.complete else "pending" + + +def client(client_id: str, jobs: list[float], wait_timeout: float): + worker_mailbox: Mailbox = Mailbox.by_name("worker") + this_actor.info(f"{client_id} started") + async_job_results: list[AsyncJobResult] = [] + for job_idx, job_duration in enumerate(jobs): + result_mailbox: Mailbox = Mailbox.by_name(str(uuid4())) + job = Job(job_id=f"job-{job_idx}", duration=job_duration, result_mailbox=result_mailbox) + out_comm = worker_mailbox.put_init(Job( + job_id=f"job-{job_idx}", + duration=job_duration, + result_mailbox=result_mailbox + ), SIMULATED_JOB_SIZE_BYTES) + out_comm.detach() + result_comm, async_data = result_mailbox.get_async() + async_job_results.append(AsyncJobResult( + job=job, + result_comm=result_comm, + async_data=async_data + )) + this_actor.info(f"awaiting results for all jobs (timeout={wait_timeout}s)") + completed_comms = Comm.wait_all_for([entry.result_comm for entry in async_job_results], wait_timeout) + logger = this_actor.warning if completed_comms < len(async_job_results) else this_actor.info + logger(f"received {completed_comms}/{len(async_job_results)} results") + for result in async_job_results: + this_actor.info(f"{result.job.job_id}" + f" status={result.status}" + f" result_payload={result.async_data.get() if result.complete else ''}") + + +def main(): + settings = create_parser().parse_known_args()[0] + e = Engine(sys.argv) + e.load_platform(settings.platform) + Actor.create("client", Host.by_name("Tremblay"), client, "client", settings.jobs, settings.wait_timeout) + for worker_idx in range(settings.workers): + Actor.create("worker", Host.by_name("Ruby"), worker, f"worker-{worker_idx}").daemonize() + e.run() + + +if __name__ == "__main__": + main() diff --git a/examples/python/comm-waitallfor/comm-waitallfor.tesh b/examples/python/comm-waitallfor/comm-waitallfor.tesh new file mode 100644 index 0000000000..50cc88c625 --- /dev/null +++ b/examples/python/comm-waitallfor/comm-waitallfor.tesh @@ -0,0 +1,95 @@ +#!/usr/bin/env tesh + +p Testing Comm.wait_all_for() + +$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitallfor.py --platform ${platfdir}/small_platform_fatpipe.xml --workers 1 --wait-timeout 1 --jobs 1,2,3,4,5 "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n" +>[ 0.000000] (2:worker@Ruby) worker-0 started +>[ 0.000000] (1:client@Tremblay) client started +>[ 0.000000] (1:client@Tremblay) awaiting results for all jobs (timeout=1.0s) +>[ 0.001954] (2:worker@Ruby) worker-0 working on job-0 (will take 1.0s to complete) +>[ 1.000000] (1:client@Tremblay) received 0/5 results +>[ 1.000000] (1:client@Tremblay) job-0 status=pending result_payload= +>[ 1.000000] (1:client@Tremblay) job-1 status=pending result_payload= +>[ 1.000000] (1:client@Tremblay) job-2 status=pending result_payload= +>[ 1.000000] (1:client@Tremblay) job-3 status=pending result_payload= +>[ 1.000000] (1:client@Tremblay) job-4 status=pending result_payload= + +$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitallfor.py --platform ${platfdir}/small_platform_fatpipe.xml --workers 1 --wait-timeout 5 --jobs 1,2,3,4,5 "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n" +>[ 0.000000] (2:worker@Ruby) worker-0 started +>[ 0.000000] (1:client@Tremblay) client started +>[ 0.000000] (1:client@Tremblay) awaiting results for all jobs (timeout=5.0s) +>[ 0.001954] (2:worker@Ruby) worker-0 working on job-0 (will take 1.0s to complete) +>[ 1.008029] (2:worker@Ruby) worker-0 working on job-1 (will take 2.0s to complete) +>[ 3.014105] (2:worker@Ruby) worker-0 working on job-2 (will take 3.0s to complete) +>[ 5.000000] (1:client@Tremblay) received 2/5 results +>[ 5.000000] (1:client@Tremblay) job-0 status=complete result_payload=worker-0 +>[ 5.000000] (1:client@Tremblay) job-1 status=complete result_payload=worker-0 +>[ 5.000000] (1:client@Tremblay) job-2 status=pending result_payload= +>[ 5.000000] (1:client@Tremblay) job-3 status=pending result_payload= +>[ 5.000000] (1:client@Tremblay) job-4 status=pending result_payload= + +$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitallfor.py --platform ${platfdir}/small_platform_fatpipe.xml --workers 1 --wait-timeout -1 --jobs 1,2,3,4,5 "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n" +>[ 0.000000] (2:worker@Ruby) worker-0 started +>[ 0.000000] (1:client@Tremblay) client started +>[ 0.000000] (1:client@Tremblay) awaiting results for all jobs (timeout=-1.0s) +>[ 0.001954] (2:worker@Ruby) worker-0 working on job-0 (will take 1.0s to complete) +>[ 1.008029] (2:worker@Ruby) worker-0 working on job-1 (will take 2.0s to complete) +>[ 3.014105] (2:worker@Ruby) worker-0 working on job-2 (will take 3.0s to complete) +>[ 6.020181] (2:worker@Ruby) worker-0 working on job-3 (will take 4.0s to complete) +>[ 10.026257] (2:worker@Ruby) worker-0 working on job-4 (will take 5.0s to complete) +>[ 15.030379] (1:client@Tremblay) received 5/5 results +>[ 15.030379] (1:client@Tremblay) job-0 status=complete result_payload=worker-0 +>[ 15.030379] (1:client@Tremblay) job-1 status=complete result_payload=worker-0 +>[ 15.030379] (1:client@Tremblay) job-2 status=complete result_payload=worker-0 +>[ 15.030379] (1:client@Tremblay) job-3 status=complete result_payload=worker-0 +>[ 15.030379] (1:client@Tremblay) job-4 status=complete result_payload=worker-0 + +$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitallfor.py --platform ${platfdir}/small_platform_fatpipe.xml --workers 5 --wait-timeout 3 --jobs 1,2,3,4,5 "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n" +>[ 0.000000] (2:worker@Ruby) worker-0 started +>[ 0.000000] (3:worker@Ruby) worker-1 started +>[ 0.000000] (4:worker@Ruby) worker-2 started +>[ 0.000000] (5:worker@Ruby) worker-3 started +>[ 0.000000] (6:worker@Ruby) worker-4 started +>[ 0.000000] (1:client@Tremblay) client started +>[ 0.000000] (1:client@Tremblay) awaiting results for all jobs (timeout=3.0s) +>[ 0.001954] (6:worker@Ruby) worker-4 working on job-4 (will take 5.0s to complete) +>[ 0.001954] (5:worker@Ruby) worker-3 working on job-3 (will take 4.0s to complete) +>[ 0.001954] (4:worker@Ruby) worker-2 working on job-2 (will take 3.0s to complete) +>[ 0.001954] (3:worker@Ruby) worker-1 working on job-1 (will take 2.0s to complete) +>[ 0.001954] (2:worker@Ruby) worker-0 working on job-0 (will take 1.0s to complete) +>[ 3.000000] (1:client@Tremblay) received 2/5 results +>[ 3.000000] (1:client@Tremblay) job-0 status=complete result_payload=worker-0 +>[ 3.000000] (1:client@Tremblay) job-1 status=complete result_payload=worker-1 +>[ 3.000000] (1:client@Tremblay) job-2 status=pending result_payload= +>[ 3.000000] (1:client@Tremblay) job-3 status=pending result_payload= +>[ 3.000000] (1:client@Tremblay) job-4 status=pending result_payload= + +$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitallfor.py --platform ${platfdir}/small_platform_fatpipe.xml --workers 5 --wait-timeout -1 --jobs 5,10,5,20,5,40,5,80,5,160 "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n" +>[ 0.000000] (2:worker@Ruby) worker-0 started +>[ 0.000000] (3:worker@Ruby) worker-1 started +>[ 0.000000] (4:worker@Ruby) worker-2 started +>[ 0.000000] (5:worker@Ruby) worker-3 started +>[ 0.000000] (6:worker@Ruby) worker-4 started +>[ 0.000000] (1:client@Tremblay) client started +>[ 0.000000] (1:client@Tremblay) awaiting results for all jobs (timeout=-1.0s) +>[ 0.001954] (6:worker@Ruby) worker-4 working on job-4 (will take 5.0s to complete) +>[ 0.001954] (5:worker@Ruby) worker-3 working on job-3 (will take 20.0s to complete) +>[ 0.001954] (4:worker@Ruby) worker-2 working on job-2 (will take 5.0s to complete) +>[ 0.001954] (3:worker@Ruby) worker-1 working on job-1 (will take 10.0s to complete) +>[ 0.001954] (2:worker@Ruby) worker-0 working on job-0 (will take 5.0s to complete) +>[ 5.008029] (2:worker@Ruby) worker-0 working on job-7 (will take 80.0s to complete) +>[ 5.008029] (4:worker@Ruby) worker-2 working on job-6 (will take 5.0s to complete) +>[ 5.008029] (6:worker@Ruby) worker-4 working on job-5 (will take 40.0s to complete) +>[ 10.008029] (3:worker@Ruby) worker-1 working on job-8 (will take 5.0s to complete) +>[ 10.014105] (4:worker@Ruby) worker-2 working on job-9 (will take 160.0s to complete) +>[170.018227] (1:client@Tremblay) received 10/10 results +>[170.018227] (1:client@Tremblay) job-0 status=complete result_payload=worker-0 +>[170.018227] (1:client@Tremblay) job-1 status=complete result_payload=worker-1 +>[170.018227] (1:client@Tremblay) job-2 status=complete result_payload=worker-2 +>[170.018227] (1:client@Tremblay) job-3 status=complete result_payload=worker-3 +>[170.018227] (1:client@Tremblay) job-4 status=complete result_payload=worker-4 +>[170.018227] (1:client@Tremblay) job-5 status=complete result_payload=worker-4 +>[170.018227] (1:client@Tremblay) job-6 status=complete result_payload=worker-2 +>[170.018227] (1:client@Tremblay) job-7 status=complete result_payload=worker-0 +>[170.018227] (1:client@Tremblay) job-8 status=complete result_payload=worker-1 +>[170.018227] (1:client@Tremblay) job-9 status=complete result_payload=worker-2 diff --git a/src/bindings/python/simgrid_python.cpp b/src/bindings/python/simgrid_python.cpp index 3d2fa256bb..60ce5f652f 100644 --- a/src/bindings/python/simgrid_python.cpp +++ b/src/bindings/python/simgrid_python.cpp @@ -100,6 +100,8 @@ PYBIND11_MODULE(simgrid, m) "debug", [](const char* s) { XBT_DEBUG("%s", s); }, "Display a logging message of 'debug' priority.") .def( "info", [](const char* s) { XBT_INFO("%s", s); }, "Display a logging message of 'info' priority.") + .def( + "warning", [](const char* s) { XBT_WARN("%s", s); }, "Display a logging message of 'warning' priority.") .def( "error", [](const char* s) { XBT_ERROR("%s", s); }, "Display a logging message of 'error' priority.") .def("execute", py::overload_cast(&simgrid::s4u::this_actor::execute), @@ -653,6 +655,14 @@ PYBIND11_MODULE(simgrid, m) return self->put_async(data.ptr(), size); }, py::call_guard(), "Non-blocking data transmission") + .def( + "put_init", + [](Mailbox* self, py::object data, int size) { + data.inc_ref(); + return self->put_init(data.ptr(), size); + }, + py::call_guard(), + "Creates (but don’t start) a data transmission to that mailbox.") .def( "get", [](Mailbox* self) { @@ -689,19 +699,35 @@ PYBIND11_MODULE(simgrid, m) .def("wait", &simgrid::s4u::Comm::wait, py::call_guard(), "Block until the completion of that communication.") .def("wait_for", &simgrid::s4u::Comm::wait_for, + py::arg("timeout"), py::call_guard(), "Block until the completion of that communication, or raises TimeoutException after the specified timeout.") + .def("detach", [](simgrid::s4u::Comm* self) { + return self->detach(); + }, + py::return_value_policy::reference_internal, + py::call_guard(), + "Start the comm, and ignore its result. It can be completely forgotten after that.") // use py::overload_cast for wait_all/wait_any, until the overload marked XBT_ATTRIB_DEPRECATED_v332 is removed .def_static( "wait_all", py::overload_cast&>(&simgrid::s4u::Comm::wait_all), - py::call_guard(), "Block until the completion of all communications in the list.") + py::arg("comms"), + py::call_guard(), + "Block until the completion of all communications in the list.") + .def_static("wait_all_for", &simgrid::s4u::Comm::wait_all_for, + py::arg("comms"), py::arg("timeout"), + py::call_guard(), + "Block until the completion of all communications in the list, or raises TimeoutException after " + "the specified timeout.") .def_static( "wait_any", py::overload_cast&>(&simgrid::s4u::Comm::wait_any), + py::arg("comms"), py::call_guard(), "Block until the completion of any communication in the list and return the index of the terminated one.") .def_static( "wait_any_for", py::overload_cast&, double>(&simgrid::s4u::Comm::wait_any_for), + py::arg("comms"), py::arg("timeout"), py::call_guard(), "Block until the completion of any communication in the list and return the index of the terminated " "one, or -1 if a timeout occurred." @@ -798,11 +824,11 @@ PYBIND11_MODULE(simgrid, m) .def("join", py::overload_cast(&Actor::join, py::const_), py::call_guard(), "Wait for the actor to finish (more info in the C++ documentation).", py::arg("timeout")) .def("kill", &Actor::kill, py::call_guard(), "Kill that actor") - .def("kill_all", &Actor::kill_all, py::call_guard(), "Kill all actors but the caller.") .def("self", &Actor::self, "Retrieves the current actor.") .def("is_suspended", &Actor::is_suspended, "Returns True if that actor is currently suspended.") .def("suspend", &Actor::suspend, py::call_guard(), "Suspend that actor, that is blocked until resume()ed by another actor.") .def("resume", &Actor::resume, py::call_guard(), - "Resume that actor, that was previously suspend()ed."); + "Resume that actor, that was previously suspend()ed.") + .def_static("kill_all", &Actor::kill_all, py::call_guard(), "Kill all actors but the caller."); } -- 2.20.1