Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
rework how host-to-host comms are managed
authorSUTER Frederic <frederic.suter@cc.in2p3.fr>
Tue, 1 Mar 2022 03:53:48 +0000 (04:53 +0100)
committerSUTER Frederic <frederic.suter@cc.in2p3.fr>
Tue, 1 Mar 2022 03:53:48 +0000 (04:53 +0100)
MANIFEST.in
examples/cpp/comm-host2host/s4u-comm-host2host.cpp
examples/cpp/comm-host2host/s4u-comm-host2host.tesh
include/simgrid/s4u/Activity.hpp
include/simgrid/s4u/Comm.hpp
src/kernel/activity/CommImpl.cpp
src/kernel/activity/CommImpl.hpp
src/s4u/s4u_Comm.cpp

index a581a1b..530dbf1 100644 (file)
@@ -806,6 +806,7 @@ include teshsuite/s4u/issue71/platform_bad.xml
 include teshsuite/s4u/listen_async/listen_async.cpp
 include teshsuite/s4u/listen_async/listen_async.tesh
 include teshsuite/s4u/monkey-masterworkers/monkey-masterworkers.cpp
+include teshsuite/s4u/monkey-masterworkers/monkey-masterworkers.tesh
 include teshsuite/s4u/ns3-from-src-to-itself/ns3-from-src-to-itself.cpp
 include teshsuite/s4u/ns3-from-src-to-itself/ns3-from-src-to-itself.tesh
 include teshsuite/s4u/ns3-simultaneous-send-rcv/ns3-simultaneous-send-rcv.cpp
index c6efb45..ae5bd31 100644 (file)
@@ -28,12 +28,12 @@ static void sender(sg4::Host* h1, sg4::Host* h2, sg4::Host* h3, sg4::Host* h4)
   auto c12 = sg4::Comm::sendto_async(h1, h2, 1.5e7); // Creates and start a direct communication
 
   auto c34 = sg4::Comm::sendto_init(h3, h4); // Creates but do not start another direct communication
-  c34->set_remaining(1e7);                   // Specify the amount of bytes to exchange in this comm
+  c34->set_payload_size(1e7);                // Specify the amount of bytes to exchange in this comm
 
   // You can also detach() communications that you never plan to test() or wait().
   // Here we create a communication that only slows down the other ones
   auto noise = sg4::Comm::sendto_init(h1, h2);
-  noise->set_remaining(10000);
+  noise->set_payload_size(10000);
   noise->detach();
 
   XBT_INFO("After creation,  c12 is %s (remaining: %.2e bytes); c34 is %s (remaining: %.2e bytes)",
@@ -54,7 +54,7 @@ static void sender(sg4::Host* h1, sg4::Host* h2, sg4::Host* h3, sg4::Host* h4)
   /* As usual, you don't have to explicitly start communications that were just init()ed.
      The wait() will start it automatically. */
   auto c14 = sg4::Comm::sendto_init(h1, h4);
-  c14->set_remaining(100)->wait(); // Chaining 2 operations on this new communication
+  c14->set_payload_size(100)->wait(); // Chaining 2 operations on this new communication
 }
 
 int main(int argc, char* argv[])
index 1335cb8..a3cb33f 100644 (file)
@@ -2,8 +2,8 @@
 
 $ ${bindir:=.}/s4u-comm-host2host ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
 > [  0.000000] (1:sender@Boivin) Send c12 with sendto_async(Tremblay -> Jupiter), and c34 with sendto_init(Fafard -> Ginette)
-> [  0.000000] (1:sender@Boivin) After creation,  c12 is STARTED (remaining: 1.50e+07 bytes); c34 is INITED (remaining: 1.00e+07 bytes)
-> [  1.000000] (1:sender@Boivin) One sec later,   c12 is STARTED (remaining: 8.48e+06 bytes); c34 is INITED (remaining: 1.00e+07 bytes)
+> [  0.000000] (1:sender@Boivin) After creation,  c12 is STARTED (remaining: 1.50e+07 bytes); c34 is STARTING (remaining: 1.00e+07 bytes)
+> [  1.000000] (1:sender@Boivin) One sec later,   c12 is STARTED (remaining: 8.48e+06 bytes); c34 is STARTING (remaining: 1.00e+07 bytes)
 > [  1.000000] (1:sender@Boivin) After c34->start,c12 is STARTED (remaining: 8.48e+06 bytes); c34 is STARTED (remaining: 1.00e+07 bytes)
 > [  2.272621] (1:sender@Boivin) After c12->wait, c12 is FINISHED (remaining: 0.00e+00 bytes); c34 is STARTED (remaining: 5.33e+05 bytes)
 > [  2.343278] (1:sender@Boivin) After c34->wait, c12 is FINISHED (remaining: 0.00e+00 bytes); c34 is FINISHED (remaining: 0.00e+00 bytes)
index ff9fd78..d1d8ab4 100644 (file)
@@ -237,7 +237,6 @@ public:
   }
   AnyActivity* set_name(const std::string& name)
   {
-    xbt_assert(get_state() == State::INITED, "Cannot change the name of an activity after its start");
     name_ = name;
     return static_cast<AnyActivity*>(this);
   }
index 1ce99cb..d43131a 100644 (file)
@@ -30,10 +30,6 @@ class XBT_PUBLIC Comm : public Activity_T<Comm> {
   void* src_buff_                     = nullptr;
   size_t src_buff_size_               = sizeof(void*);
 
-  /* specified only for direct host-to-host communications */
-  Host* from_ = nullptr;
-  Host* to_   = nullptr;
-
   /* FIXME: expose these elements in the API */
   bool detached_                                                          = false;
   bool (*match_fun_)(void*, void*, kernel::activity::CommImpl*)           = nullptr;
@@ -74,9 +70,9 @@ public:
   static void sendto(Host* from, Host* to, uint64_t simulated_size_in_bytes);
 
   CommPtr set_source(Host* from);
-  Host* get_source() const { return from_; }
+  Host* get_source() const;
   CommPtr set_destination(Host* to);
-  Host* get_destination() const { return to_; }
+  Host* get_destination() const;
 
   /* Mailbox-based communications */
   CommPtr set_mailbox(Mailbox* mailbox);
@@ -140,7 +136,7 @@ public:
   /** Sets the maximal communication rate (in byte/sec). Must be done before start */
   CommPtr set_rate(double rate);
 
-  bool is_assigned() const override { return (to_ != nullptr && from_ != nullptr) || (mailbox_ != nullptr); }
+  bool is_assigned() const override;
   Actor* get_sender() const;
 
   /* Comm life cycle */
index bbc8910..9af07ce 100644 (file)
@@ -59,6 +59,18 @@ CommImpl& CommImpl::set_type(CommImplType type)
   return *this;
 }
 
+CommImpl& CommImpl::set_source(s4u::Host* from)
+{
+  from_ = from;
+  return *this;
+}
+
+CommImpl& CommImpl::set_destination(s4u::Host* to)
+{
+  to_ = to;
+  return *this;
+}
+
 CommImpl& CommImpl::set_size(double size)
 {
   size_ = size;
@@ -98,11 +110,6 @@ CommImpl& CommImpl::detach()
   return *this;
 }
 
-CommImpl::CommImpl(s4u::Host* from, s4u::Host* to, double bytes) : size_(bytes), detached_(true), from_(from), to_(to)
-{
-  set_state(State::READY);
-}
-
 CommImpl::~CommImpl()
 {
   XBT_DEBUG("Really free communication %p in state %s (detached = %d)", this, get_state_str(), detached_);
index 2d9a902..75b74c0 100644 (file)
@@ -34,13 +34,16 @@ class XBT_PUBLIC CommImpl : public ActivityImpl_T<CommImpl> {
   CommImplType type_ = CommImplType::SEND; /* Type of the communication (SEND or RECEIVE) */
 
 public:
-  static void set_copy_data_callback(void (*callback)(CommImpl*, void*, size_t));
-
   CommImpl() = default;
-  CommImpl(s4u::Host* from, s4u::Host* to, double bytes);
+
+  static void set_copy_data_callback(void (*callback)(CommImpl*, void*, size_t));
 
   CommImpl& set_type(CommImplType type);
   CommImplType get_type() const { return type_; }
+  CommImpl& set_source(s4u::Host* from);
+  s4u::Host* get_source() const { return from_; }
+  CommImpl& set_destination(s4u::Host* to);
+  s4u::Host* get_destination() const { return to_; }
   CommImpl& set_size(double size);
   CommImpl& set_src_buff(unsigned char* buff, size_t size);
   CommImpl& set_dst_buff(unsigned char* buff, size_t* size);
@@ -52,6 +55,7 @@ public:
   MailboxImpl* get_mailbox() const { return mbox_; }
   long get_mailbox_id() const { return mbox_id_; }
   bool detached() const { return detached_; }
+  bool is_assigned() { return (to_ != nullptr && from_ != nullptr); }
 
   std::vector<s4u::Link*> get_traversed_links() const;
   void copy_data();
index 3e1d6b4..cff5e62 100644 (file)
@@ -63,24 +63,22 @@ Comm::~Comm()
 CommPtr Comm::sendto_init()
 {
   CommPtr res(new Comm());
+  res->pimpl_ = kernel::activity::CommImplPtr(new kernel::activity::CommImpl());
+  boost::static_pointer_cast<kernel::activity::CommImpl>(res->pimpl_)->detach();
   res->sender_ = kernel::actor::ActorImpl::self();
   return res;
 }
 
 CommPtr Comm::sendto_init(Host* from, Host* to)
 {
-  auto res   = Comm::sendto_init();
-  res->from_ = from;
-  res->to_   = to;
-
+  auto res = Comm::sendto_init()->set_source(from)->set_destination(to);
+  res->set_state(State::STARTING);
   return res;
 }
 
 CommPtr Comm::sendto_async(Host* from, Host* to, uint64_t simulated_size_in_bytes)
 {
-  auto res = Comm::sendto_init(from, to)->set_payload_size(simulated_size_in_bytes);
-  res->vetoable_start();
-  return res;
+  return Comm::sendto_init()->set_payload_size(simulated_size_in_bytes)->set_source(from)->set_destination(to);
 }
 
 void Comm::sendto(Host* from, Host* to, uint64_t simulated_size_in_bytes)
@@ -92,24 +90,39 @@ CommPtr Comm::set_source(Host* from)
 {
   xbt_assert(state_ == State::INITED || state_ == State::STARTING,
              "Cannot change the source of a Comm once it's started (state: %s)", to_c_str(state_));
-  from_ = from;
-  // Setting 'from_' may allow to start the activity, let's try
-  vetoable_start();
+  boost::static_pointer_cast<kernel::activity::CommImpl>(pimpl_)->set_source(from);
+  // Setting 'source' may allow to start the activity, let's try
+  if (state_ == State::STARTING && remains_ <= 0)
+    XBT_DEBUG("This communication has a payload size of 0 byte. It cannot start yet");
+  else
+    vetoable_start();
 
   return this;
 }
+Host* Comm::get_source() const
+{
+  return pimpl_ ? boost::static_pointer_cast<kernel::activity::CommImpl>(pimpl_)->get_source() : nullptr;
+}
 
 CommPtr Comm::set_destination(Host* to)
 {
   xbt_assert(state_ == State::INITED || state_ == State::STARTING,
              "Cannot change the destination of a Comm once it's started (state: %s)", to_c_str(state_));
-  to_ = to;
-  // Setting 'to_' may allow to start the activity, let's try
-  vetoable_start();
+  boost::static_pointer_cast<kernel::activity::CommImpl>(pimpl_)->set_destination(to);
+  // Setting 'destination' may allow to start the activity, let's try
+  if (state_ == State::STARTING && remains_ <= 0)
+    XBT_DEBUG("This communication has a payload size of 0 byte. It cannot start yet");
+  else
+    vetoable_start();
 
   return this;
 }
 
+Host* Comm::get_destination() const
+{
+  return pimpl_ ? boost::static_pointer_cast<kernel::activity::CommImpl>(pimpl_)->get_destination() : nullptr;
+}
+
 CommPtr Comm::set_rate(double rate)
 {
   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
@@ -177,6 +190,9 @@ CommPtr Comm::set_dst_data(void** buff, size_t size)
 CommPtr Comm::set_payload_size(uint64_t bytes)
 {
   Activity::set_remaining(bytes);
+  if (pimpl_) {
+    boost::static_pointer_cast<kernel::activity::CommImpl>(pimpl_)->set_size(bytes);
+  }
   return this;
 }
 
@@ -188,20 +204,25 @@ Actor* Comm::get_sender() const
   return sender ? sender->get_ciface() : nullptr;
 }
 
+bool Comm::is_assigned() const
+{
+  return (pimpl_ && boost::static_pointer_cast<kernel::activity::CommImpl>(pimpl_)->is_assigned()) ||
+         mailbox_ != nullptr;
+}
+
 Comm* Comm::start()
 {
   xbt_assert(get_state() == State::INITED || get_state() == State::STARTING,
              "You cannot use %s() once your communication started (not implemented)", __FUNCTION__);
-  if (from_ != nullptr || to_ != nullptr) {
-    xbt_assert(from_ != nullptr && to_ != nullptr, "When either from_ or to_ is specified, both must be.");
+  if (get_source() != nullptr || get_destination() != nullptr) {
+    xbt_assert(is_assigned(), "When either from_ or to_ is specified, both must be.");
     xbt_assert(src_buff_ == nullptr && dst_buff_ == nullptr,
                "Direct host-to-host communications cannot carry any data.");
-    pimpl_ = kernel::actor::simcall_answered([this] {
-      kernel::activity::CommImplPtr res(new kernel::activity::CommImpl(this->from_, this->to_, this->get_remaining()));
-      res->start();
-      return res;
+    XBT_DEBUG("host-to-host Comm. Pimpl already created and set, just start it.");
+    kernel::actor::simcall_answered([this] {
+      pimpl_->set_state(kernel::activity::State::READY);
+      boost::static_pointer_cast<kernel::activity::CommImpl>(pimpl_)->start();
     });
-
   } else if (src_buff_ != nullptr) { // Sender side
     on_send(*this);
     kernel::actor::CommIsendSimcall observer{sender_,
@@ -248,8 +269,8 @@ Comm* Comm::start()
 
 Comm* Comm::detach()
 {
-  xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication is %s (not implemented)",
-             __FUNCTION__, get_state_str());
+  xbt_assert(state_ == State::INITED || state_ == State::STARTING,
+             "You cannot use %s() once your communication is %s (not implemented)", __FUNCTION__, get_state_str());
   xbt_assert(dst_buff_ == nullptr && dst_buff_size_ == 0, "You can only detach sends, not recvs");
   detached_ = true;
   vetoable_start();
@@ -281,7 +302,7 @@ Comm* Comm::wait_for(double timeout)
       throw NetworkFailureException(XBT_THROW_POINT, "Cannot wait for a failed communication");
     case State::INITED:
     case State::STARTING: // It's not started yet. Do it in one simcall if it's a regular communication
-      if (from_ != nullptr || to_ != nullptr) {
+      if (get_source() != nullptr || get_destination() != nullptr) {
         return vetoable_start()->wait_for(timeout); // In the case of host2host comm, do it in two simcalls
       } else if (src_buff_ != nullptr) {
         on_send(*this);
@@ -319,6 +340,7 @@ Comm* Comm::wait_for(double timeout)
   complete(State::FINISHED);
   return this;
 }
+
 ssize_t Comm::wait_any_for(const std::vector<CommPtr>& comms, double timeout)
 {
   std::vector<ActivityPtr> activities;