Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Catch more specific exceptions.
[simgrid.git] / src / s4u / s4u_Comm.cpp
index c58896f68d39dd11c1d90f3c2faf12f805b55a01..d3a766b644f5a1410ecdeecb125d2cf080c69d2f 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (c) 2006-2021. The SimGrid Team. All rights reserved.          */
+/* Copyright (c) 2006-2022. The SimGrid Team. All rights reserved.          */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
@@ -23,16 +23,10 @@ xbt::signal<void(Comm const&)> Comm::on_send;
 xbt::signal<void(Comm const&)> Comm::on_recv;
 xbt::signal<void(Comm const&)> Comm::on_completion;
 
-void Comm::complete(Activity::State state)
-{
-  Activity::complete(state);
-  on_completion(*this);
-}
-
 Comm::~Comm()
 {
   if (state_ == State::STARTED && not detached_ &&
-      (pimpl_ == nullptr || pimpl_->state_ == kernel::activity::State::RUNNING)) {
+      (pimpl_ == nullptr || pimpl_->get_state() == kernel::activity::State::RUNNING)) {
     XBT_INFO("Comm %p freed before its completion. Did you forget to detach it? (state: %s)", this, get_state_str());
     if (pimpl_ != nullptr)
       XBT_INFO("pimpl_->state: %s", pimpl_->get_state_str());
@@ -52,7 +46,7 @@ ssize_t Comm::wait_any_for(const std::vector<CommPtr>& comms, double timeout)
     changed_pos = simcall_comm_waitany(rcomms.data(), rcomms.size(), timeout);
   } catch (const NetworkFailureException& e) {
     for (auto c : comms) {
-      if (c->pimpl_->state_ == kernel::activity::State::FAILED) {
+      if (c->pimpl_->get_state() == kernel::activity::State::FAILED) {
         c->complete(State::FAILED);
       }
     }
@@ -91,6 +85,28 @@ size_t Comm::wait_all_for(const std::vector<CommPtr>& comms, double timeout)
   return comms.size();
 }
 
+CommPtr Comm::set_source(Host* from)
+{
+  xbt_assert(state_ == State::INITED || state_ == State::STARTING,
+             "Cannot change the source of a Comm once it's started (state: %s)", to_c_str(state_));
+  from_ = from;
+  // Setting 'from_' may allow to start the activity, let's try
+  vetoable_start();
+
+  return this;
+}
+
+CommPtr Comm::set_destination(Host* to)
+{
+  xbt_assert(state_ == State::INITED || state_ == State::STARTING,
+             "Cannot change the destination of a Comm once it's started (state: %s)", to_c_str(state_));
+  to_ = to;
+  // Setting 'to_' may allow to start the activity, let's try
+  vetoable_start();
+
+  return this;
+}
+
 CommPtr Comm::set_rate(double rate)
 {
   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
@@ -160,9 +176,16 @@ CommPtr Comm::set_payload_size(uint64_t bytes)
   return this;
 }
 
-CommPtr Comm::sendto_init(Host* from, Host* to)
+CommPtr Comm::sendto_init()
 {
   CommPtr res(new Comm());
+  res->sender_ = kernel::actor::ActorImpl::self();
+  return res;
+}
+
+CommPtr Comm::sendto_init(Host* from, Host* to)
+{
+  auto res   = Comm::sendto_init();
   res->from_ = from;
   res->to_   = to;
 
@@ -212,8 +235,11 @@ Comm* Comm::start()
   if (suspended_)
     pimpl_->suspend();
 
-  if (not detached_)
-    static_cast<kernel::activity::CommImpl*>(pimpl_.get())->set_iface(this);
+  if (not detached_) {
+    pimpl_->set_iface(this);
+    pimpl_->set_actor(sender_);
+  }
+
   state_ = State::STARTED;
   return this;
 }