From: SUTER Frederic Date: Tue, 1 Mar 2022 03:53:48 +0000 (+0100) Subject: rework how host-to-host comms are managed X-Git-Tag: v3.31~266 X-Git-Url: http://bilbo.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/ec39b7be49afee33014ec95bb7a6673e059cb216 rework how host-to-host comms are managed --- diff --git a/MANIFEST.in b/MANIFEST.in index a581a1b992..530dbf16eb 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -806,6 +806,7 @@ include teshsuite/s4u/issue71/platform_bad.xml include teshsuite/s4u/listen_async/listen_async.cpp include teshsuite/s4u/listen_async/listen_async.tesh include teshsuite/s4u/monkey-masterworkers/monkey-masterworkers.cpp +include teshsuite/s4u/monkey-masterworkers/monkey-masterworkers.tesh include teshsuite/s4u/ns3-from-src-to-itself/ns3-from-src-to-itself.cpp include teshsuite/s4u/ns3-from-src-to-itself/ns3-from-src-to-itself.tesh include teshsuite/s4u/ns3-simultaneous-send-rcv/ns3-simultaneous-send-rcv.cpp diff --git a/examples/cpp/comm-host2host/s4u-comm-host2host.cpp b/examples/cpp/comm-host2host/s4u-comm-host2host.cpp index c6efb45b68..ae5bd31139 100644 --- a/examples/cpp/comm-host2host/s4u-comm-host2host.cpp +++ b/examples/cpp/comm-host2host/s4u-comm-host2host.cpp @@ -28,12 +28,12 @@ static void sender(sg4::Host* h1, sg4::Host* h2, sg4::Host* h3, sg4::Host* h4) auto c12 = sg4::Comm::sendto_async(h1, h2, 1.5e7); // Creates and start a direct communication auto c34 = sg4::Comm::sendto_init(h3, h4); // Creates but do not start another direct communication - c34->set_remaining(1e7); // Specify the amount of bytes to exchange in this comm + c34->set_payload_size(1e7); // Specify the amount of bytes to exchange in this comm // You can also detach() communications that you never plan to test() or wait(). // Here we create a communication that only slows down the other ones auto noise = sg4::Comm::sendto_init(h1, h2); - noise->set_remaining(10000); + noise->set_payload_size(10000); noise->detach(); XBT_INFO("After creation, c12 is %s (remaining: %.2e bytes); c34 is %s (remaining: %.2e bytes)", @@ -54,7 +54,7 @@ static void sender(sg4::Host* h1, sg4::Host* h2, sg4::Host* h3, sg4::Host* h4) /* As usual, you don't have to explicitly start communications that were just init()ed. The wait() will start it automatically. */ auto c14 = sg4::Comm::sendto_init(h1, h4); - c14->set_remaining(100)->wait(); // Chaining 2 operations on this new communication + c14->set_payload_size(100)->wait(); // Chaining 2 operations on this new communication } int main(int argc, char* argv[]) diff --git a/examples/cpp/comm-host2host/s4u-comm-host2host.tesh b/examples/cpp/comm-host2host/s4u-comm-host2host.tesh index 1335cb8889..a3cb33f43c 100644 --- a/examples/cpp/comm-host2host/s4u-comm-host2host.tesh +++ b/examples/cpp/comm-host2host/s4u-comm-host2host.tesh @@ -2,8 +2,8 @@ $ ${bindir:=.}/s4u-comm-host2host ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n" > [ 0.000000] (1:sender@Boivin) Send c12 with sendto_async(Tremblay -> Jupiter), and c34 with sendto_init(Fafard -> Ginette) -> [ 0.000000] (1:sender@Boivin) After creation, c12 is STARTED (remaining: 1.50e+07 bytes); c34 is INITED (remaining: 1.00e+07 bytes) -> [ 1.000000] (1:sender@Boivin) One sec later, c12 is STARTED (remaining: 8.48e+06 bytes); c34 is INITED (remaining: 1.00e+07 bytes) +> [ 0.000000] (1:sender@Boivin) After creation, c12 is STARTED (remaining: 1.50e+07 bytes); c34 is STARTING (remaining: 1.00e+07 bytes) +> [ 1.000000] (1:sender@Boivin) One sec later, c12 is STARTED (remaining: 8.48e+06 bytes); c34 is STARTING (remaining: 1.00e+07 bytes) > [ 1.000000] (1:sender@Boivin) After c34->start,c12 is STARTED (remaining: 8.48e+06 bytes); c34 is STARTED (remaining: 1.00e+07 bytes) > [ 2.272621] (1:sender@Boivin) After c12->wait, c12 is FINISHED (remaining: 0.00e+00 bytes); c34 is STARTED (remaining: 5.33e+05 bytes) > [ 2.343278] (1:sender@Boivin) After c34->wait, c12 is FINISHED (remaining: 0.00e+00 bytes); c34 is FINISHED (remaining: 0.00e+00 bytes) diff --git a/include/simgrid/s4u/Activity.hpp b/include/simgrid/s4u/Activity.hpp index ff9fd78b1f..d1d8ab4e1f 100644 --- a/include/simgrid/s4u/Activity.hpp +++ b/include/simgrid/s4u/Activity.hpp @@ -237,7 +237,6 @@ public: } AnyActivity* set_name(const std::string& name) { - xbt_assert(get_state() == State::INITED, "Cannot change the name of an activity after its start"); name_ = name; return static_cast(this); } diff --git a/include/simgrid/s4u/Comm.hpp b/include/simgrid/s4u/Comm.hpp index 1ce99cb247..d43131a9fa 100644 --- a/include/simgrid/s4u/Comm.hpp +++ b/include/simgrid/s4u/Comm.hpp @@ -30,10 +30,6 @@ class XBT_PUBLIC Comm : public Activity_T { void* src_buff_ = nullptr; size_t src_buff_size_ = sizeof(void*); - /* specified only for direct host-to-host communications */ - Host* from_ = nullptr; - Host* to_ = nullptr; - /* FIXME: expose these elements in the API */ bool detached_ = false; bool (*match_fun_)(void*, void*, kernel::activity::CommImpl*) = nullptr; @@ -74,9 +70,9 @@ public: static void sendto(Host* from, Host* to, uint64_t simulated_size_in_bytes); CommPtr set_source(Host* from); - Host* get_source() const { return from_; } + Host* get_source() const; CommPtr set_destination(Host* to); - Host* get_destination() const { return to_; } + Host* get_destination() const; /* Mailbox-based communications */ CommPtr set_mailbox(Mailbox* mailbox); @@ -140,7 +136,7 @@ public: /** Sets the maximal communication rate (in byte/sec). Must be done before start */ CommPtr set_rate(double rate); - bool is_assigned() const override { return (to_ != nullptr && from_ != nullptr) || (mailbox_ != nullptr); } + bool is_assigned() const override; Actor* get_sender() const; /* Comm life cycle */ diff --git a/src/kernel/activity/CommImpl.cpp b/src/kernel/activity/CommImpl.cpp index bbc8910dfa..9af07ce8a5 100644 --- a/src/kernel/activity/CommImpl.cpp +++ b/src/kernel/activity/CommImpl.cpp @@ -59,6 +59,18 @@ CommImpl& CommImpl::set_type(CommImplType type) return *this; } +CommImpl& CommImpl::set_source(s4u::Host* from) +{ + from_ = from; + return *this; +} + +CommImpl& CommImpl::set_destination(s4u::Host* to) +{ + to_ = to; + return *this; +} + CommImpl& CommImpl::set_size(double size) { size_ = size; @@ -98,11 +110,6 @@ CommImpl& CommImpl::detach() return *this; } -CommImpl::CommImpl(s4u::Host* from, s4u::Host* to, double bytes) : size_(bytes), detached_(true), from_(from), to_(to) -{ - set_state(State::READY); -} - CommImpl::~CommImpl() { XBT_DEBUG("Really free communication %p in state %s (detached = %d)", this, get_state_str(), detached_); diff --git a/src/kernel/activity/CommImpl.hpp b/src/kernel/activity/CommImpl.hpp index 2d9a9023fd..75b74c0eb9 100644 --- a/src/kernel/activity/CommImpl.hpp +++ b/src/kernel/activity/CommImpl.hpp @@ -34,13 +34,16 @@ class XBT_PUBLIC CommImpl : public ActivityImpl_T { CommImplType type_ = CommImplType::SEND; /* Type of the communication (SEND or RECEIVE) */ public: - static void set_copy_data_callback(void (*callback)(CommImpl*, void*, size_t)); - CommImpl() = default; - CommImpl(s4u::Host* from, s4u::Host* to, double bytes); + + static void set_copy_data_callback(void (*callback)(CommImpl*, void*, size_t)); CommImpl& set_type(CommImplType type); CommImplType get_type() const { return type_; } + CommImpl& set_source(s4u::Host* from); + s4u::Host* get_source() const { return from_; } + CommImpl& set_destination(s4u::Host* to); + s4u::Host* get_destination() const { return to_; } CommImpl& set_size(double size); CommImpl& set_src_buff(unsigned char* buff, size_t size); CommImpl& set_dst_buff(unsigned char* buff, size_t* size); @@ -52,6 +55,7 @@ public: MailboxImpl* get_mailbox() const { return mbox_; } long get_mailbox_id() const { return mbox_id_; } bool detached() const { return detached_; } + bool is_assigned() { return (to_ != nullptr && from_ != nullptr); } std::vector get_traversed_links() const; void copy_data(); diff --git a/src/s4u/s4u_Comm.cpp b/src/s4u/s4u_Comm.cpp index 3e1d6b43ad..cff5e622ad 100644 --- a/src/s4u/s4u_Comm.cpp +++ b/src/s4u/s4u_Comm.cpp @@ -63,24 +63,22 @@ Comm::~Comm() CommPtr Comm::sendto_init() { CommPtr res(new Comm()); + res->pimpl_ = kernel::activity::CommImplPtr(new kernel::activity::CommImpl()); + boost::static_pointer_cast(res->pimpl_)->detach(); res->sender_ = kernel::actor::ActorImpl::self(); return res; } CommPtr Comm::sendto_init(Host* from, Host* to) { - auto res = Comm::sendto_init(); - res->from_ = from; - res->to_ = to; - + auto res = Comm::sendto_init()->set_source(from)->set_destination(to); + res->set_state(State::STARTING); return res; } CommPtr Comm::sendto_async(Host* from, Host* to, uint64_t simulated_size_in_bytes) { - auto res = Comm::sendto_init(from, to)->set_payload_size(simulated_size_in_bytes); - res->vetoable_start(); - return res; + return Comm::sendto_init()->set_payload_size(simulated_size_in_bytes)->set_source(from)->set_destination(to); } void Comm::sendto(Host* from, Host* to, uint64_t simulated_size_in_bytes) @@ -92,24 +90,39 @@ CommPtr Comm::set_source(Host* from) { xbt_assert(state_ == State::INITED || state_ == State::STARTING, "Cannot change the source of a Comm once it's started (state: %s)", to_c_str(state_)); - from_ = from; - // Setting 'from_' may allow to start the activity, let's try - vetoable_start(); + boost::static_pointer_cast(pimpl_)->set_source(from); + // Setting 'source' may allow to start the activity, let's try + if (state_ == State::STARTING && remains_ <= 0) + XBT_DEBUG("This communication has a payload size of 0 byte. It cannot start yet"); + else + vetoable_start(); return this; } +Host* Comm::get_source() const +{ + return pimpl_ ? boost::static_pointer_cast(pimpl_)->get_source() : nullptr; +} CommPtr Comm::set_destination(Host* to) { xbt_assert(state_ == State::INITED || state_ == State::STARTING, "Cannot change the destination of a Comm once it's started (state: %s)", to_c_str(state_)); - to_ = to; - // Setting 'to_' may allow to start the activity, let's try - vetoable_start(); + boost::static_pointer_cast(pimpl_)->set_destination(to); + // Setting 'destination' may allow to start the activity, let's try + if (state_ == State::STARTING && remains_ <= 0) + XBT_DEBUG("This communication has a payload size of 0 byte. It cannot start yet"); + else + vetoable_start(); return this; } +Host* Comm::get_destination() const +{ + return pimpl_ ? boost::static_pointer_cast(pimpl_)->get_destination() : nullptr; +} + CommPtr Comm::set_rate(double rate) { xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)", @@ -177,6 +190,9 @@ CommPtr Comm::set_dst_data(void** buff, size_t size) CommPtr Comm::set_payload_size(uint64_t bytes) { Activity::set_remaining(bytes); + if (pimpl_) { + boost::static_pointer_cast(pimpl_)->set_size(bytes); + } return this; } @@ -188,20 +204,25 @@ Actor* Comm::get_sender() const return sender ? sender->get_ciface() : nullptr; } +bool Comm::is_assigned() const +{ + return (pimpl_ && boost::static_pointer_cast(pimpl_)->is_assigned()) || + mailbox_ != nullptr; +} + Comm* Comm::start() { xbt_assert(get_state() == State::INITED || get_state() == State::STARTING, "You cannot use %s() once your communication started (not implemented)", __FUNCTION__); - if (from_ != nullptr || to_ != nullptr) { - xbt_assert(from_ != nullptr && to_ != nullptr, "When either from_ or to_ is specified, both must be."); + if (get_source() != nullptr || get_destination() != nullptr) { + xbt_assert(is_assigned(), "When either from_ or to_ is specified, both must be."); xbt_assert(src_buff_ == nullptr && dst_buff_ == nullptr, "Direct host-to-host communications cannot carry any data."); - pimpl_ = kernel::actor::simcall_answered([this] { - kernel::activity::CommImplPtr res(new kernel::activity::CommImpl(this->from_, this->to_, this->get_remaining())); - res->start(); - return res; + XBT_DEBUG("host-to-host Comm. Pimpl already created and set, just start it."); + kernel::actor::simcall_answered([this] { + pimpl_->set_state(kernel::activity::State::READY); + boost::static_pointer_cast(pimpl_)->start(); }); - } else if (src_buff_ != nullptr) { // Sender side on_send(*this); kernel::actor::CommIsendSimcall observer{sender_, @@ -248,8 +269,8 @@ Comm* Comm::start() Comm* Comm::detach() { - xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication is %s (not implemented)", - __FUNCTION__, get_state_str()); + xbt_assert(state_ == State::INITED || state_ == State::STARTING, + "You cannot use %s() once your communication is %s (not implemented)", __FUNCTION__, get_state_str()); xbt_assert(dst_buff_ == nullptr && dst_buff_size_ == 0, "You can only detach sends, not recvs"); detached_ = true; vetoable_start(); @@ -281,7 +302,7 @@ Comm* Comm::wait_for(double timeout) throw NetworkFailureException(XBT_THROW_POINT, "Cannot wait for a failed communication"); case State::INITED: case State::STARTING: // It's not started yet. Do it in one simcall if it's a regular communication - if (from_ != nullptr || to_ != nullptr) { + if (get_source() != nullptr || get_destination() != nullptr) { return vetoable_start()->wait_for(timeout); // In the case of host2host comm, do it in two simcalls } else if (src_buff_ != nullptr) { on_send(*this); @@ -319,6 +340,7 @@ Comm* Comm::wait_for(double timeout) complete(State::FINISHED); return this; } + ssize_t Comm::wait_any_for(const std::vector& comms, double timeout) { std::vector activities;