From cfb3987654b857620ba388fab9883a095debeb72 Mon Sep 17 00:00:00 2001 From: Martin Quinson Date: Mon, 24 Jul 2023 19:33:41 +0200 Subject: [PATCH] Introduce a Mailbox::get_async() with no payload parameter You can use the new Comm::get_payload() once the communication is over to retrieve the payload. --- ChangeLog | 3 ++ .../activityset-testany.py | 2 +- .../activityset-waitall.py | 2 +- .../activityset-waitallfor.py | 2 +- .../activityset-waitany.py | 2 +- examples/python/comm-testany/comm-testany.py | 6 ++-- .../python/comm-waitallfor/comm-waitallfor.py | 16 +++++----- .../network-nonlinear/network-nonlinear.py | 7 ++--- .../platform-comm-serialize.py | 9 +++--- include/simgrid/s4u/Comm.hpp | 3 ++ include/simgrid/s4u/Mailbox.hpp | 3 ++ src/bindings/python/simgrid_python.cpp | 30 +++++-------------- src/kernel/activity/CommImpl.cpp | 6 ++-- src/kernel/activity/CommImpl.hpp | 1 + src/s4u/s4u_Comm.cpp | 16 ++++++++-- src/s4u/s4u_Mailbox.cpp | 7 +++++ teshsuite/s4u/CMakeLists.txt | 2 +- 17 files changed, 63 insertions(+), 54 deletions(-) diff --git a/ChangeLog b/ChangeLog index fd83efffeb..158d75bcf4 100644 --- a/ChangeLog +++ b/ChangeLog @@ -5,6 +5,8 @@ S4U: - New function NetZone::add_route(host1, host2, links) when you don't need gateways Also add a variant with s4u::Link, when you don't want to specify the directions on symmetric routes. + - Introduce a Mailbox::get_async() with no payload parameter. You can use the new + Comm::get_payload() once the communication is over to retrieve the payload. SMPI: - New SMPI_app_instance_join(): wait for the completion of a started MPI instance @@ -12,6 +14,7 @@ SMPI: Python: - Make the host_load plugin available from Python. See examples/python/plugin-host-load + - Mailbox::get_async() does not return a pair anymore. Use comm.get_payload() instead. ---------------------------------------------------------------------------- diff --git a/examples/python/activityset-testany/activityset-testany.py b/examples/python/activityset-testany/activityset-testany.py index 34f1e9a7f9..6a8901ff70 100644 --- a/examples/python/activityset-testany/activityset-testany.py +++ b/examples/python/activityset-testany/activityset-testany.py @@ -16,7 +16,7 @@ def bob(): this_actor.info("Create my asynchronous activities") exec = this_actor.exec_async(5e9) - comm, payload = mbox.get_async() + comm = mbox.get_async() io = disk.read_async(300000000) pending_activities = ActivitySet([exec, comm]) diff --git a/examples/python/activityset-waitall/activityset-waitall.py b/examples/python/activityset-waitall/activityset-waitall.py index 36b70b8569..4e4d1eede5 100644 --- a/examples/python/activityset-waitall/activityset-waitall.py +++ b/examples/python/activityset-waitall/activityset-waitall.py @@ -16,7 +16,7 @@ def bob(): this_actor.info("Create my asynchronous activities") exec = this_actor.exec_async(5e9) - comm, payload = mbox.get_async() + comm = mbox.get_async() io = disk.read_async(300000000) pending_activities = ActivitySet([exec, comm]) diff --git a/examples/python/activityset-waitallfor/activityset-waitallfor.py b/examples/python/activityset-waitallfor/activityset-waitallfor.py index 4f2880935f..44b3c6f5b2 100644 --- a/examples/python/activityset-waitallfor/activityset-waitallfor.py +++ b/examples/python/activityset-waitallfor/activityset-waitallfor.py @@ -16,7 +16,7 @@ def bob(): this_actor.info("Create my asynchronous activities") exec = this_actor.exec_async(5e9) - comm, payload = mbox.get_async() + comm = mbox.get_async() io = disk.read_async(300000000) pending_activities = ActivitySet([exec, comm]) diff --git a/examples/python/activityset-waitany/activityset-waitany.py b/examples/python/activityset-waitany/activityset-waitany.py index 4f16d7da20..88ac531128 100644 --- a/examples/python/activityset-waitany/activityset-waitany.py +++ b/examples/python/activityset-waitany/activityset-waitany.py @@ -16,7 +16,7 @@ def bob(): this_actor.info("Create my asynchronous activities") exec = this_actor.exec_async(5e9) - comm, payload = mbox.get_async() + comm = mbox.get_async() io = disk.read_async(300000000) pending_activities = ActivitySet([exec, comm]) diff --git a/examples/python/comm-testany/comm-testany.py b/examples/python/comm-testany/comm-testany.py index 52220cf726..84469b078b 100644 --- a/examples/python/comm-testany/comm-testany.py +++ b/examples/python/comm-testany/comm-testany.py @@ -24,9 +24,9 @@ def create_parser() -> ArgumentParser: def rank0(): rank0_mailbox: Mailbox = Mailbox.by_name("rank0") this_actor.info("Post my asynchronous receives") - comm1, a1 = rank0_mailbox.get_async() - comm2, a2 = rank0_mailbox.get_async() - comm3, a3 = rank0_mailbox.get_async() + comm1 = rank0_mailbox.get_async() + comm2 = rank0_mailbox.get_async() + comm3 = rank0_mailbox.get_async() pending_comms: List[Comm] = [comm1, comm2, comm3] this_actor.info("Send some data to rank-1") diff --git a/examples/python/comm-waitallfor/comm-waitallfor.py b/examples/python/comm-waitallfor/comm-waitallfor.py index 0f387951d2..d72490ba4b 100644 --- a/examples/python/comm-waitallfor/comm-waitallfor.py +++ b/examples/python/comm-waitallfor/comm-waitallfor.py @@ -18,7 +18,7 @@ from typing import List from uuid import uuid4 import sys -from simgrid import Actor, Comm, Engine, Host, Mailbox, PyGetAsync, this_actor +from simgrid import Actor, Comm, Engine, Host, Mailbox, this_actor SIMULATED_JOB_SIZE_BYTES = 1024 @@ -78,12 +78,11 @@ def worker(worker_id: str): @dataclass class AsyncJobResult: job: Job - result_comm: Comm - async_data: PyGetAsync + comm: Comm @property def complete(self) -> bool: - return self.result_comm.test() + return self.comm.test() @property def status(self) -> str: @@ -103,20 +102,19 @@ def client(client_id: str, jobs: List[float], wait_timeout: float): result_mailbox=result_mailbox ), SIMULATED_JOB_SIZE_BYTES) out_comm.detach() - result_comm, async_data = result_mailbox.get_async() + result_comm = result_mailbox.get_async() async_job_results.append(AsyncJobResult( job=job, - result_comm=result_comm, - async_data=async_data + comm=result_comm )) this_actor.info(f"awaiting results for all jobs (timeout={wait_timeout}s)") - completed_comms = Comm.wait_all_for([entry.result_comm for entry in async_job_results], wait_timeout) + completed_comms = Comm.wait_all_for([entry.comm for entry in async_job_results], wait_timeout) logger = this_actor.warning if completed_comms < len(async_job_results) else this_actor.info logger(f"received {completed_comms}/{len(async_job_results)} results") for result in async_job_results: this_actor.info(f"{result.job.job_id}" f" status={result.status}" - f" result_payload={result.async_data.get() if result.complete else ''}") + f" result_payload={result.comm.get_payload() if result.complete else ''}") def main(): diff --git a/examples/python/network-nonlinear/network-nonlinear.py b/examples/python/network-nonlinear/network-nonlinear.py index 7bc12183b6..96b098122f 100644 --- a/examples/python/network-nonlinear/network-nonlinear.py +++ b/examples/python/network-nonlinear/network-nonlinear.py @@ -50,21 +50,18 @@ class Receiver: def __call__(self): mbox = Mailbox.by_name("receiver") - pending_msgs = [] pending_comms = [] this_actor.info("Wait for %d messages asynchronously" % self.msg_count) for _ in range(self.msg_count): - comm, data = mbox.get_async() + comm = mbox.get_async() pending_comms.append(comm) - pending_msgs.append(data) while pending_comms: index = Comm.wait_any(pending_comms) - msg = pending_msgs[index].get() + msg = pending_comms[index].get_payload() this_actor.info("I got '%s'." % msg) del pending_comms[index] - del pending_msgs[index] #################################################################################################### def link_nonlinear(link: Link, capacity: float, n: int) -> float: diff --git a/examples/python/platform-comm-serialize/platform-comm-serialize.py b/examples/python/platform-comm-serialize/platform-comm-serialize.py index bbd871d869..eaa880c1cb 100644 --- a/examples/python/platform-comm-serialize/platform-comm-serialize.py +++ b/examples/python/platform-comm-serialize/platform-comm-serialize.py @@ -6,7 +6,7 @@ from typing import List, Tuple import sys -from simgrid import Engine, Actor, Comm, Host, LinkInRoute, Mailbox, NetZone, this_actor, PyGetAsync +from simgrid import Engine, Actor, Comm, Host, LinkInRoute, Mailbox, NetZone, this_actor RECEIVER_MAILBOX_NAME = "receiver" @@ -44,14 +44,13 @@ class Receiver(object): def __call__(self): # List in which we store all incoming msgs - pending_comms: List[Tuple[Comm, PyGetAsync]] = [] + pending_comms: List[Comm] = [] this_actor.info(f"Wait for {self.messages_count} messages asynchronously") for _ in range(self.messages_count): pending_comms.append(self.mailbox.get_async()) while pending_comms: - index = Comm.wait_any([comm for (comm, _) in pending_comms]) - _, async_data = pending_comms[index] - this_actor.info(f"I got '{async_data.get()}'.") + index = Comm.wait_any(pending_comms) + this_actor.info(f"I got '{pending_comms[index].get_payload()}'.") pending_comms.pop(index) diff --git a/include/simgrid/s4u/Comm.hpp b/include/simgrid/s4u/Comm.hpp index 3b1f417627..c8a0156c36 100644 --- a/include/simgrid/s4u/Comm.hpp +++ b/include/simgrid/s4u/Comm.hpp @@ -151,6 +151,9 @@ public: void* get_dst_data() const { return dst_buff_; } /** Retrieve the size of the received data. Not to be mixed with @ref Activity::get_remaining() */ size_t get_dst_data_size() const { return dst_buff_size_; } + /** Retrieve the payload associated to the communication. You can only do that once the comm is (gracefully) + * terminated, and it is only setup by the default copy_data callback (not the SMPI one) */ + void* get_payload() const; /* Common functions */ diff --git a/include/simgrid/s4u/Mailbox.hpp b/include/simgrid/s4u/Mailbox.hpp index b5723dfe20..b8eddff976 100644 --- a/include/simgrid/s4u/Mailbox.hpp +++ b/include/simgrid/s4u/Mailbox.hpp @@ -118,6 +118,9 @@ public: CommPtr get_init(); /** Creates and start an async data reception to that mailbox */ template CommPtr get_async(T** data); + /** Creates and start an async data reception to that mailbox. Since the data location is not provided, you'll have to + * use Comm::get_payload once the comm terminates */ + CommPtr get_async(); /** Blocking data reception */ template T* get(); diff --git a/src/bindings/python/simgrid_python.cpp b/src/bindings/python/simgrid_python.cpp index a8e9ce79da..04f0a167a2 100644 --- a/src/bindings/python/simgrid_python.cpp +++ b/src/bindings/python/simgrid_python.cpp @@ -76,15 +76,6 @@ std::string get_simgrid_version() sg_version_get(&major, &minor, &patch); return simgrid::xbt::string_printf("%i.%i.%i", major, minor, patch); } - -/** @brief Wrap for mailbox::get_async */ -class PyGetAsync { - std::unique_ptr data = std::make_unique(); - -public: - PyObject** get() const { return data.get(); } -}; - } // namespace PYBIND11_DECLARE_HOLDER_TYPE(T, boost::intrusive_ptr) @@ -634,26 +625,14 @@ PYBIND11_MODULE(simgrid, m) "get", [](Mailbox* self) { return py::reinterpret_steal(self->get()); }, py::call_guard(), "Blocking data reception") .def( - "get_async", - [](Mailbox* self) -> std::tuple { - PyGetAsync wrap; - auto comm = self->get_async(wrap.get()); - return std::make_tuple(std::move(comm), std::move(wrap)); - }, + "get_async", [](Mailbox* self) -> CommPtr { return self->get_async(); }, py::call_guard(), "Non-blocking data reception. Use data.get() to get the python object after the communication has finished") .def("set_receiver", &Mailbox::set_receiver, py::call_guard(), "Sets the actor as permanent receiver"); - /* Class PyGetAsync */ - py::class_(m, "PyGetAsync", "Wrapper for async get communications") - .def(py::init<>()) - .def( - "get", [](const PyGetAsync* self) { return py::reinterpret_steal(*(self->get())); }, - "Get python object after async communication in receiver side"); - /* class Activity */ - py::class_(m, "Activityy", "Activity. See the C++ documentation for details."); + py::class_(m, "Activity", "Activity. See the C++ documentation for details."); /* Class Comm */ py::class_(m, "Comm", "Communication. See the C++ documentation for details.") @@ -693,6 +672,11 @@ PYBIND11_MODULE(simgrid, m) "Block until the completion of that communication, or raises TimeoutException after the specified timeout.") .def("wait_until", &Comm::wait_until, py::call_guard(), py::arg("time_limit"), "Block until the completion of that communication, or raises TimeoutException after the specified time.") + .def( + "get_payload", + [](const Comm* self) { return py::reinterpret_steal((PyObject*)self->get_payload()); }, + py::call_guard(), + "Retrieve the message's payload of a get_async. You cannot call this until after the comm termination.") .def("detach", py::overload_cast<>(&Comm::detach), py::return_value_policy::reference_internal, py::call_guard(), "Start the comm, and ignore its result. It can be completely forgotten after that.") diff --git a/src/kernel/activity/CommImpl.cpp b/src/kernel/activity/CommImpl.cpp index e5c1ddf071..25a76617e4 100644 --- a/src/kernel/activity/CommImpl.cpp +++ b/src/kernel/activity/CommImpl.cpp @@ -37,7 +37,9 @@ CommImpl::CommImpl() std::function CommImpl::copy_data_callback_ = [](kernel::activity::CommImpl* comm, void* buff, size_t buff_size) { xbt_assert((buff_size == sizeof(void*)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size); - *(void**)(comm->dst_buff_) = buff; + if (comm->dst_buff_ != nullptr) // get_async provided a buffer + *(void**)(comm->dst_buff_) = buff; + comm->payload_ = buff; // Setup what will be retrieved by s4u::Comm::get_payload() }; void CommImpl::set_copy_data_callback(const std::function& callback) @@ -192,7 +194,7 @@ void CommImpl::copy_data() { size_t buff_size = src_buff_size_; /* If there is no data to copy then return */ - if (not src_buff_ || not dst_buff_ || copied_) + if (not src_buff_ || not dst_buff_size_ || copied_) return; XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)", this, diff --git a/src/kernel/activity/CommImpl.hpp b/src/kernel/activity/CommImpl.hpp index 4fcf67c680..f3d98399ca 100644 --- a/src/kernel/activity/CommImpl.hpp +++ b/src/kernel/activity/CommImpl.hpp @@ -98,6 +98,7 @@ expectations of the other side, too. See */ unsigned char* dst_buff_ = nullptr; size_t src_buff_size_ = 0; size_t* dst_buff_size_ = nullptr; + void* payload_ = nullptr; // If dst_buff_ is NULL, the default copy callback puts the data here void* src_data_ = nullptr; /* User data associated to the communication */ void* dst_data_ = nullptr; diff --git a/src/s4u/s4u_Comm.cpp b/src/s4u/s4u_Comm.cpp index 4b7030e994..404321acd1 100644 --- a/src/s4u/s4u_Comm.cpp +++ b/src/s4u/s4u_Comm.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -283,6 +284,14 @@ CommPtr Comm::set_payload_size(uint64_t bytes) return this; } +void* Comm::get_payload() const +{ + xbt_assert(get_state() == State::FINISHED, + "You can only retrieve the payload of a communication that gracefully terminated, but its state is %s.", + get_state_str()); + return static_cast(pimpl_.get())->payload_; +} + Actor* Comm::get_sender() const { kernel::actor::ActorImplPtr sender = nullptr; @@ -309,6 +318,9 @@ Comm* Comm::do_start() { xbt_assert(get_state() == State::INITED || get_state() == State::STARTING, "You cannot use %s() once your communication started (not implemented)", __FUNCTION__); + + auto myself = kernel::actor::ActorImpl::self(); + if (get_source() != nullptr || get_destination() != nullptr) { xbt_assert(is_assigned(), "When either from_ or to_ is specified, both must be."); xbt_assert(src_buff_ == nullptr && dst_buff_ == nullptr, @@ -320,7 +332,7 @@ Comm* Comm::do_start() }); fire_on_start(); fire_on_this_start(); - } else if (src_buff_ != nullptr) { // Sender side + } else if (myself == sender_) { on_send(*this); on_this_send(*this); kernel::actor::CommIsendSimcall observer{sender_, @@ -337,7 +349,7 @@ Comm* Comm::do_start() "Isend"}; pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::isend(&observer); }, &observer); - } else if (dst_buff_ != nullptr) { // Receiver side + } else if (myself == receiver_) { xbt_assert(not detached_, "Receive cannot be detached"); on_recv(*this); on_this_recv(*this); diff --git a/src/s4u/s4u_Mailbox.cpp b/src/s4u/s4u_Mailbox.cpp index 6ce4a0339b..76613fbfa8 100644 --- a/src/s4u/s4u_Mailbox.cpp +++ b/src/s4u/s4u_Mailbox.cpp @@ -127,6 +127,13 @@ CommPtr Mailbox::get_init() return res; } +CommPtr Mailbox::get_async() +{ + CommPtr res = get_init()->set_dst_data(nullptr, sizeof(void*)); + res->start(); + return res; +} + kernel::activity::ActivityImplPtr Mailbox::iprobe(int type, const std::function& match_fun, void* data) { diff --git a/teshsuite/s4u/CMakeLists.txt b/teshsuite/s4u/CMakeLists.txt index 56d9e61ea0..0ea50b2a97 100644 --- a/teshsuite/s4u/CMakeLists.txt +++ b/teshsuite/s4u/CMakeLists.txt @@ -6,7 +6,7 @@ endforeach() foreach(x actor actor-autorestart actor-suspend activity-lifecycle - comm-get-sender comm-pt2pt comm-fault-scenarios wait-all-for wait-any-for + comm-get-sender comm-pt2pt comm-fault-scenarios wait-all-for cloud-interrupt-migration cloud-two-execs monkey-masterworkers monkey-semaphore concurrent_rw -- 2.20.1