- Drop xbt_dynar_shrink().
Python:
- - Added the following bindings: Comm.wait_for() and Comm.wait_any_for()
- Example: examples/python/comm-waitfor/
+ - Fixed the following bindings:
+ - Actor.kill_all() [previously declared as an instance member of the Actor class although it is a static method]
+ - Added the following bindings:
+ - this_actor.warning()
+ - Mailbox.put_init() [example: examples/python/comm-waitallfor/]
+ - Comm.detach() [example: examples/python/comm-waitallfor/]
+ - Comm.wait_for() [example: examples/python/comm-waitfor/]
+ - Comm.wait_any_for()
+ - Comm.wait_all_for() [example: examples/python/comm-waitallfor/]
Fixed bugs (FG#.. -> FramaGit bugs; FG!.. -> FG merge requests)
(FG: issues on Framagit; GH: issues on GitHub)
.. autofunction:: simgrid.this_actor.debug
.. autofunction:: simgrid.this_actor.info
+ .. autofunction:: simgrid.this_actor.warning
.. autofunction:: simgrid.this_actor.error
Sleeping
.. automethod:: simgrid.Mailbox.put
.. automethod:: simgrid.Mailbox.put_async
+ .. automethod:: simgrid.Mailbox.put_init
.. group-tab:: C
.. doxygenfunction:: simgrid::s4u::Comm::set_src_data(void *buff, size_t size)
.. doxygenfunction:: simgrid::s4u::Comm::set_src_data_size(size_t size)
+ .. group-tab:: Python
+
+ .. automethod:: simgrid.Comm.detach
+
Life cycle
----------
.. automethod:: simgrid.Comm.wait
.. automethod:: simgrid.Comm.wait_for
.. automethod:: simgrid.Comm.wait_all
+ .. automethod:: simgrid.Comm.wait_all_for
.. automethod:: simgrid.Comm.wait_any
.. automethod:: simgrid.Comm.wait_any_for
--- /dev/null
+# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+"""
+This example implements the following scenario:
+- Multiple workers consume jobs (Job) from a shared mailbox (worker)
+- A client first dispatches several jobs (with a simulated 'cost' - i.e. time the worker will 'process' the job)
+- The client then waits for all job results for a maximum time set by the 'wait timeout' (Comm.wait_all_for)
+- The client then displays the status of individual jobs.
+"""
+
+
+from argparse import ArgumentParser
+from dataclasses import dataclass
+from uuid import uuid4
+import sys
+
+from simgrid import Actor, Comm, Engine, Host, Mailbox, PyGetAsync, this_actor
+
+
+SIMULATED_JOB_SIZE_BYTES = 1024
+SIMULATED_RESULT_SIZE_BYTES = 1024 * 1024
+
+
def parse_requests(requests_str: str) -> list[float]:
    """Parse a comma-separated list of job durations into floats.

    Blank segments (e.g. from a trailing comma or an empty string) are
    ignored instead of raising ValueError.

    :param requests_str: e.g. "1, 2.5,3"
    :return: the durations, in seconds, in their original order
    """
    return [float(item) for item in (part.strip() for part in requests_str.split(",")) if item]
+
+
def create_parser() -> ArgumentParser:
    """Build the command-line parser for this example.

    ``--platform`` is mandatory: without a platform file the engine cannot
    be initialized, so let argparse report a clear error instead of
    passing ``None`` to ``Engine.load_platform``.
    """
    parser = ArgumentParser()
    parser.add_argument(
        '--platform',
        type=str,
        required=True,
        help='path to the platform description'
    )
    parser.add_argument(
        "--workers",
        type=int,
        default=1,
        help="number of worker actors to start"
    )
    parser.add_argument(
        "--jobs",
        type=parse_requests,
        # argparse applies `type` to string defaults, so this becomes [1.0, ..., 5.0]
        default="1,2,3,4,5",
        help="duration of individual jobs sent to the workers by the client"
    )
    parser.add_argument(
        "--wait-timeout",
        type=float,
        default=5.0,
        help="number of seconds before the client gives up waiting for results and aborts the simulation"
    )
    return parser
+
+
@dataclass
class Job:
    """A unit of work sent to a worker through the shared 'worker' mailbox."""
    job_id: str  # human-readable identifier, e.g. "job-3"
    duration: float  # simulated processing time, in seconds
    result_mailbox: Mailbox  # per-job private mailbox where the worker posts its result
+
+
def worker(worker_id: str):
    """Serve jobs from the shared 'worker' mailbox forever.

    Each job is "processed" by sleeping for its duration; the worker then
    posts its own id to the job's private result mailbox. Workers are
    daemonized by main(), so the endless loop is intentional.
    """
    this_actor.info(f"{worker_id} started")
    job_queue: Mailbox = Mailbox.by_name("worker")
    while True:
        current: Job = job_queue.get()
        this_actor.info(f"{worker_id} working on {current.job_id} (will take {current.duration}s to complete)")
        this_actor.sleep_for(current.duration)
        current.result_mailbox.put(f"{worker_id}", SIMULATED_RESULT_SIZE_BYTES)
+
+
@dataclass
class AsyncJobResult:
    """Tracks one in-flight job: the job itself plus the pending result comm."""
    job: Job  # the job that was dispatched
    result_comm: Comm  # async receive on the job's private result mailbox
    async_data: PyGetAsync  # handle yielding the received payload once complete

    @property
    def complete(self) -> bool:
        # Comm.test() is non-blocking: True iff the result already arrived.
        return self.result_comm.test()

    @property
    def status(self) -> str:
        return "complete" if self.complete else "pending"
+
+
def client(client_id: str, jobs: list[float], wait_timeout: float):
    """Dispatch one Job per requested duration, then gather the results.

    Each job gets a private (uuid-named) result mailbox. The submission comm
    is detached (fire-and-forget) while the result is received asynchronously.
    After Comm.wait_all_for() returns — possibly on timeout — the status of
    every job is reported.

    :param client_id: name used in log messages
    :param jobs: simulated duration, in seconds, of each job to dispatch
    :param wait_timeout: max seconds to wait for all results (negative: no limit)
    """
    worker_mailbox: Mailbox = Mailbox.by_name("worker")
    this_actor.info(f"{client_id} started")
    async_job_results: list[AsyncJobResult] = []
    for job_idx, job_duration in enumerate(jobs):
        result_mailbox: Mailbox = Mailbox.by_name(str(uuid4()))
        job = Job(job_id=f"job-{job_idx}", duration=job_duration, result_mailbox=result_mailbox)
        # Reuse the Job built above instead of constructing a second identical one.
        out_comm = worker_mailbox.put_init(job, SIMULATED_JOB_SIZE_BYTES)
        out_comm.detach()
        result_comm, async_data = result_mailbox.get_async()
        async_job_results.append(AsyncJobResult(
            job=job,
            result_comm=result_comm,
            async_data=async_data
        ))
    this_actor.info(f"awaiting results for all jobs (timeout={wait_timeout}s)")
    completed_comms = Comm.wait_all_for([entry.result_comm for entry in async_job_results], wait_timeout)
    # Warn (instead of info) when at least one job did not finish in time.
    logger = this_actor.warning if completed_comms < len(async_job_results) else this_actor.info
    logger(f"received {completed_comms}/{len(async_job_results)} results")
    for result in async_job_results:
        this_actor.info(f"{result.job.job_id}"
                        f" status={result.status}"
                        f" result_payload={result.async_data.get() if result.complete else ''}")
+
+
def main():
    """Entry point: parse options, load the platform, spawn actors, simulate."""
    settings = create_parser().parse_known_args()[0]
    engine = Engine(sys.argv)
    engine.load_platform(settings.platform)
    Actor.create("client", Host.by_name("Tremblay"), client, "client", settings.jobs, settings.wait_timeout)
    # Workers never return; daemonize them so the simulation ends with the client.
    for worker_idx in range(settings.workers):
        Actor.create("worker", Host.by_name("Ruby"), worker, f"worker-{worker_idx}").daemonize()
    engine.run()


if __name__ == "__main__":
    main()
--- /dev/null
+#!/usr/bin/env tesh
+
+p Testing Comm.wait_all_for()
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitallfor.py --platform ${platfdir}/small_platform_fatpipe.xml --workers 1 --wait-timeout 1 --jobs 1,2,3,4,5 "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[ 0.000000] (2:worker@Ruby) worker-0 started
+>[ 0.000000] (1:client@Tremblay) client started
+>[ 0.000000] (1:client@Tremblay) awaiting results for all jobs (timeout=1.0s)
+>[ 0.001954] (2:worker@Ruby) worker-0 working on job-0 (will take 1.0s to complete)
+>[ 1.000000] (1:client@Tremblay) received 0/5 results
+>[ 1.000000] (1:client@Tremblay) job-0 status=pending result_payload=
+>[ 1.000000] (1:client@Tremblay) job-1 status=pending result_payload=
+>[ 1.000000] (1:client@Tremblay) job-2 status=pending result_payload=
+>[ 1.000000] (1:client@Tremblay) job-3 status=pending result_payload=
+>[ 1.000000] (1:client@Tremblay) job-4 status=pending result_payload=
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitallfor.py --platform ${platfdir}/small_platform_fatpipe.xml --workers 1 --wait-timeout 5 --jobs 1,2,3,4,5 "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[ 0.000000] (2:worker@Ruby) worker-0 started
+>[ 0.000000] (1:client@Tremblay) client started
+>[ 0.000000] (1:client@Tremblay) awaiting results for all jobs (timeout=5.0s)
+>[ 0.001954] (2:worker@Ruby) worker-0 working on job-0 (will take 1.0s to complete)
+>[ 1.008029] (2:worker@Ruby) worker-0 working on job-1 (will take 2.0s to complete)
+>[ 3.014105] (2:worker@Ruby) worker-0 working on job-2 (will take 3.0s to complete)
+>[ 5.000000] (1:client@Tremblay) received 2/5 results
+>[ 5.000000] (1:client@Tremblay) job-0 status=complete result_payload=worker-0
+>[ 5.000000] (1:client@Tremblay) job-1 status=complete result_payload=worker-0
+>[ 5.000000] (1:client@Tremblay) job-2 status=pending result_payload=
+>[ 5.000000] (1:client@Tremblay) job-3 status=pending result_payload=
+>[ 5.000000] (1:client@Tremblay) job-4 status=pending result_payload=
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitallfor.py --platform ${platfdir}/small_platform_fatpipe.xml --workers 1 --wait-timeout -1 --jobs 1,2,3,4,5 "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[ 0.000000] (2:worker@Ruby) worker-0 started
+>[ 0.000000] (1:client@Tremblay) client started
+>[ 0.000000] (1:client@Tremblay) awaiting results for all jobs (timeout=-1.0s)
+>[ 0.001954] (2:worker@Ruby) worker-0 working on job-0 (will take 1.0s to complete)
+>[ 1.008029] (2:worker@Ruby) worker-0 working on job-1 (will take 2.0s to complete)
+>[ 3.014105] (2:worker@Ruby) worker-0 working on job-2 (will take 3.0s to complete)
+>[ 6.020181] (2:worker@Ruby) worker-0 working on job-3 (will take 4.0s to complete)
+>[ 10.026257] (2:worker@Ruby) worker-0 working on job-4 (will take 5.0s to complete)
+>[ 15.030379] (1:client@Tremblay) received 5/5 results
+>[ 15.030379] (1:client@Tremblay) job-0 status=complete result_payload=worker-0
+>[ 15.030379] (1:client@Tremblay) job-1 status=complete result_payload=worker-0
+>[ 15.030379] (1:client@Tremblay) job-2 status=complete result_payload=worker-0
+>[ 15.030379] (1:client@Tremblay) job-3 status=complete result_payload=worker-0
+>[ 15.030379] (1:client@Tremblay) job-4 status=complete result_payload=worker-0
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitallfor.py --platform ${platfdir}/small_platform_fatpipe.xml --workers 5 --wait-timeout 3 --jobs 1,2,3,4,5 "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[ 0.000000] (2:worker@Ruby) worker-0 started
+>[ 0.000000] (3:worker@Ruby) worker-1 started
+>[ 0.000000] (4:worker@Ruby) worker-2 started
+>[ 0.000000] (5:worker@Ruby) worker-3 started
+>[ 0.000000] (6:worker@Ruby) worker-4 started
+>[ 0.000000] (1:client@Tremblay) client started
+>[ 0.000000] (1:client@Tremblay) awaiting results for all jobs (timeout=3.0s)
+>[ 0.001954] (6:worker@Ruby) worker-4 working on job-4 (will take 5.0s to complete)
+>[ 0.001954] (5:worker@Ruby) worker-3 working on job-3 (will take 4.0s to complete)
+>[ 0.001954] (4:worker@Ruby) worker-2 working on job-2 (will take 3.0s to complete)
+>[ 0.001954] (3:worker@Ruby) worker-1 working on job-1 (will take 2.0s to complete)
+>[ 0.001954] (2:worker@Ruby) worker-0 working on job-0 (will take 1.0s to complete)
+>[ 3.000000] (1:client@Tremblay) received 2/5 results
+>[ 3.000000] (1:client@Tremblay) job-0 status=complete result_payload=worker-0
+>[ 3.000000] (1:client@Tremblay) job-1 status=complete result_payload=worker-1
+>[ 3.000000] (1:client@Tremblay) job-2 status=pending result_payload=
+>[ 3.000000] (1:client@Tremblay) job-3 status=pending result_payload=
+>[ 3.000000] (1:client@Tremblay) job-4 status=pending result_payload=
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitallfor.py --platform ${platfdir}/small_platform_fatpipe.xml --workers 5 --wait-timeout -1 --jobs 5,10,5,20,5,40,5,80,5,160 "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[ 0.000000] (2:worker@Ruby) worker-0 started
+>[ 0.000000] (3:worker@Ruby) worker-1 started
+>[ 0.000000] (4:worker@Ruby) worker-2 started
+>[ 0.000000] (5:worker@Ruby) worker-3 started
+>[ 0.000000] (6:worker@Ruby) worker-4 started
+>[ 0.000000] (1:client@Tremblay) client started
+>[ 0.000000] (1:client@Tremblay) awaiting results for all jobs (timeout=-1.0s)
+>[ 0.001954] (6:worker@Ruby) worker-4 working on job-4 (will take 5.0s to complete)
+>[ 0.001954] (5:worker@Ruby) worker-3 working on job-3 (will take 20.0s to complete)
+>[ 0.001954] (4:worker@Ruby) worker-2 working on job-2 (will take 5.0s to complete)
+>[ 0.001954] (3:worker@Ruby) worker-1 working on job-1 (will take 10.0s to complete)
+>[ 0.001954] (2:worker@Ruby) worker-0 working on job-0 (will take 5.0s to complete)
+>[ 5.008029] (2:worker@Ruby) worker-0 working on job-7 (will take 80.0s to complete)
+>[ 5.008029] (4:worker@Ruby) worker-2 working on job-6 (will take 5.0s to complete)
+>[ 5.008029] (6:worker@Ruby) worker-4 working on job-5 (will take 40.0s to complete)
+>[ 10.008029] (3:worker@Ruby) worker-1 working on job-8 (will take 5.0s to complete)
+>[ 10.014105] (4:worker@Ruby) worker-2 working on job-9 (will take 160.0s to complete)
+>[170.018227] (1:client@Tremblay) received 10/10 results
+>[170.018227] (1:client@Tremblay) job-0 status=complete result_payload=worker-0
+>[170.018227] (1:client@Tremblay) job-1 status=complete result_payload=worker-1
+>[170.018227] (1:client@Tremblay) job-2 status=complete result_payload=worker-2
+>[170.018227] (1:client@Tremblay) job-3 status=complete result_payload=worker-3
+>[170.018227] (1:client@Tremblay) job-4 status=complete result_payload=worker-4
+>[170.018227] (1:client@Tremblay) job-5 status=complete result_payload=worker-4
+>[170.018227] (1:client@Tremblay) job-6 status=complete result_payload=worker-2
+>[170.018227] (1:client@Tremblay) job-7 status=complete result_payload=worker-0
+>[170.018227] (1:client@Tremblay) job-8 status=complete result_payload=worker-1
+>[170.018227] (1:client@Tremblay) job-9 status=complete result_payload=worker-2
"debug", [](const char* s) { XBT_DEBUG("%s", s); }, "Display a logging message of 'debug' priority.")
.def(
"info", [](const char* s) { XBT_INFO("%s", s); }, "Display a logging message of 'info' priority.")
+ .def(
+ "warning", [](const char* s) { XBT_WARN("%s", s); }, "Display a logging message of 'warning' priority.")
.def(
"error", [](const char* s) { XBT_ERROR("%s", s); }, "Display a logging message of 'error' priority.")
.def("execute", py::overload_cast<double, double>(&simgrid::s4u::this_actor::execute),
return self->put_async(data.ptr(), size);
},
py::call_guard<py::gil_scoped_release>(), "Non-blocking data transmission")
+ .def(
+ "put_init",
+ [](Mailbox* self, py::object data, int size) {
+ data.inc_ref();
+ return self->put_init(data.ptr(), size);
+ },
+ py::call_guard<py::gil_scoped_release>(),
+ "Creates (but don’t start) a data transmission to that mailbox.")
.def(
"get",
[](Mailbox* self) {
.def("wait", &simgrid::s4u::Comm::wait, py::call_guard<py::gil_scoped_release>(),
"Block until the completion of that communication.")
.def("wait_for", &simgrid::s4u::Comm::wait_for,
+ py::arg("timeout"),
py::call_guard<py::gil_scoped_release>(),
"Block until the completion of that communication, or raises TimeoutException after the specified timeout.")
+ .def("detach", [](simgrid::s4u::Comm* self) {
+ return self->detach();
+ },
+ py::return_value_policy::reference_internal,
+ py::call_guard<py::gil_scoped_release>(),
+ "Start the comm, and ignore its result. It can be completely forgotten after that.")
// use py::overload_cast for wait_all/wait_any, until the overload marked XBT_ATTRIB_DEPRECATED_v332 is removed
.def_static(
"wait_all", py::overload_cast<const std::vector<simgrid::s4u::CommPtr>&>(&simgrid::s4u::Comm::wait_all),
- py::call_guard<py::gil_scoped_release>(), "Block until the completion of all communications in the list.")
+ py::arg("comms"),
+ py::call_guard<py::gil_scoped_release>(),
+ "Block until the completion of all communications in the list.")
+ .def_static("wait_all_for", &simgrid::s4u::Comm::wait_all_for,
+ py::arg("comms"), py::arg("timeout"),
+ py::call_guard<py::gil_scoped_release>(),
+ "Block until the completion of all communications in the list, or raises TimeoutException after "
+ "the specified timeout.")
.def_static(
"wait_any", py::overload_cast<const std::vector<simgrid::s4u::CommPtr>&>(&simgrid::s4u::Comm::wait_any),
+ py::arg("comms"),
py::call_guard<py::gil_scoped_release>(),
"Block until the completion of any communication in the list and return the index of the terminated one.")
.def_static(
"wait_any_for",
py::overload_cast<const std::vector<simgrid::s4u::CommPtr>&, double>(&simgrid::s4u::Comm::wait_any_for),
+ py::arg("comms"), py::arg("timeout"),
py::call_guard<py::gil_scoped_release>(),
"Block until the completion of any communication in the list and return the index of the terminated "
"one, or -1 if a timeout occurred."
.def("join", py::overload_cast<double>(&Actor::join, py::const_), py::call_guard<py::gil_scoped_release>(),
"Wait for the actor to finish (more info in the C++ documentation).", py::arg("timeout"))
.def("kill", &Actor::kill, py::call_guard<py::gil_scoped_release>(), "Kill that actor")
- .def("kill_all", &Actor::kill_all, py::call_guard<py::gil_scoped_release>(), "Kill all actors but the caller.")
.def("self", &Actor::self, "Retrieves the current actor.")
.def("is_suspended", &Actor::is_suspended, "Returns True if that actor is currently suspended.")
.def("suspend", &Actor::suspend, py::call_guard<py::gil_scoped_release>(),
"Suspend that actor, that is blocked until resume()ed by another actor.")
.def("resume", &Actor::resume, py::call_guard<py::gil_scoped_release>(),
- "Resume that actor, that was previously suspend()ed.");
+ "Resume that actor, that was previously suspend()ed.")
+ .def_static("kill_all", &Actor::kill_all, py::call_guard<py::gil_scoped_release>(), "Kill all actors but the caller.");
}