]> AND Public Git Repository - simgrid.git/blobdiff - src/kernel/activity/CommImpl.cpp
Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
add the capacity the update the priority of an I/O during its execution
[simgrid.git] / src / kernel / activity / CommImpl.cpp
index 67a94e87261e2092f1e0ae941cdba38c8ddfcdb5..b2f92e74424dde00ea1a656cffe8ffc4136c3f55 100644 (file)
@@ -3,18 +3,17 @@
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
+#include <simgrid/Exception.hpp>
+#include <simgrid/kernel/routing/NetPoint.hpp>
+#include <simgrid/modelchecker.h>
+#include <simgrid/s4u/Host.hpp>
+
 #include "src/kernel/activity/CommImpl.hpp"
-#include "simgrid/Exception.hpp"
-#include "simgrid/kernel/resource/Action.hpp"
-#include "simgrid/kernel/routing/NetPoint.hpp"
-#include "simgrid/modelchecker.h"
-#include "simgrid/s4u/Host.hpp"
 #include "src/kernel/activity/MailboxImpl.hpp"
 #include "src/kernel/context/Context.hpp"
 #include "src/mc/mc_replay.hpp"
 #include "src/surf/cpu_interface.hpp"
 #include "src/surf/network_interface.hpp"
-#include "src/surf/surf_interface.hpp"
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
 
@@ -212,28 +211,25 @@ void SIMIX_comm_set_copy_data_callback(void (*callback)(simgrid::kernel::activit
   simgrid::kernel::activity::CommImpl::set_copy_data_callback(callback);
 }
 
+// XBT_ATTRIB_DEPRECATED_v333
 void SIMIX_comm_copy_buffer_callback(simgrid::kernel::activity::CommImpl* comm, void* buff, size_t buff_size)
 {
-  XBT_DEBUG("Copy the data over");
-  memcpy(comm->dst_buff_, buff, buff_size);
-  if (comm->detached()) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the
-                          // original buffer available to the application ASAP
-    xbt_free(buff);
-    comm->src_buff_ = nullptr;
-  }
+  simgrid::s4u::Comm::copy_buffer_callback(comm, buff, buff_size);
 }
 
+// XBT_ATTRIB_DEPRECATED_v333
 void SIMIX_comm_copy_pointer_callback(simgrid::kernel::activity::CommImpl* comm, void* buff, size_t buff_size)
 {
-  xbt_assert((buff_size == sizeof(void*)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
-  *(void**)(comm->dst_buff_) = buff;
+  simgrid::s4u::Comm::copy_pointer_callback(comm, buff, buff_size);
 }
 
 namespace simgrid {
 namespace kernel {
 namespace activity {
+xbt::signal<void(CommImpl const&)> CommImpl::on_start;
+xbt::signal<void(CommImpl const&)> CommImpl::on_completion;
 
-void (*CommImpl::copy_data_callback_)(CommImpl*, void*, size_t) = &SIMIX_comm_copy_pointer_callback;
+void (*CommImpl::copy_data_callback_)(CommImpl*, void*, size_t) = &s4u::Comm::copy_pointer_callback;
 
 void CommImpl::set_copy_data_callback(void (*callback)(CommImpl*, void*, size_t))
 {
@@ -306,7 +302,6 @@ CommImpl* CommImpl::start()
   if (state_ == State::READY) {
     from_ = from_ != nullptr ? from_ : src_actor_->get_host();
     to_   = to_ != nullptr ? to_ : dst_actor_->get_host();
-
     /* Getting the network_model from the origin host
      * Valid while we have a single network model, otherwise we would need to change this function to first get the
      * routes and later create the respective surf actions */
@@ -316,6 +311,7 @@ CommImpl* CommImpl::start()
     surf_action_->set_activity(this);
     surf_action_->set_category(get_tracing_category());
     state_ = State::RUNNING;
+    on_start(*this);
 
     XBT_DEBUG("Starting communication %p from '%s' to '%s' (surf_action: %p; state: %s)", this, from_->get_cname(),
               to_->get_cname(), surf_action_, get_state_str());
@@ -347,6 +343,16 @@ CommImpl* CommImpl::start()
   return this;
 }
 
+std::vector<s4u::Link*> CommImpl::get_traversed_links() const
+{
+  xbt_assert(state_ != State::WAITING, "You cannot use %s() if your communication is not ready (%s)", __FUNCTION__,
+             get_state_str());
+  std::vector<s4u::Link*> vlinks;
+  XBT_ATTRIB_UNUSED double res = 0;
+  from_->route_to(to_, vlinks, &res);
+  return vlinks;
+}
+
 /** @brief Copy the communication data from the sender's buffer to the receiver's one  */
 void CommImpl::copy_data()
 {
@@ -525,6 +531,8 @@ void CommImpl::cleanup_surf()
 
 void CommImpl::post()
 {
+  on_completion(*this);
+
   /* Update synchro state */
   if (src_timeout_ && src_timeout_->get_state() == resource::Action::State::FINISHED)
     state_ = State::SRC_TIMEOUT;
@@ -552,7 +560,6 @@ void CommImpl::post()
 void CommImpl::finish()
 {
   XBT_DEBUG("CommImpl::finish() in state %s", to_c_str(state_));
-
   /* If the synchro is still in a rendez-vous point then remove from it */
   if (mbox_)
     mbox_->remove(this);
@@ -580,8 +587,8 @@ void CommImpl::finish()
         simcall->timeout_cb_ = nullptr;
       }
       if (not MC_is_active() && not MC_record_replay_is_active()) {
-        CommImpl** element = std::find(comms, comms + count, this);
-        ssize_t rank       = (element != comms + count) ? element - comms : -1;
+        auto element = std::find(comms, comms + count, this);
+        ssize_t rank = (element != comms + count) ? element - comms : -1;
         simcall_comm_waitany__set__result(simcall, rank);
       }
     }
@@ -673,8 +680,8 @@ void CommImpl::finish()
         comms = simcall_comm_testany__get__comms(simcall);
         count = simcall_comm_testany__get__count(simcall);
       }
-      CommImpl** element = std::find(comms, comms + count, this);
-      ssize_t rank       = (element != comms + count) ? element - comms : -1;
+      auto element = std::find(comms, comms + count, this);
+      ssize_t rank = (element != comms + count) ? element - comms : -1;
       // In order to modify the exception we have to rethrow it:
       try {
         std::rethrow_exception(simcall->issuer_->exception_);