Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of framagit.org:simgrid/simgrid
authorMartin Quinson <martin.quinson@ens-rennes.fr>
Tue, 1 Nov 2022 22:55:04 +0000 (23:55 +0100)
committerMartin Quinson <martin.quinson@ens-rennes.fr>
Tue, 1 Nov 2022 22:55:04 +0000 (23:55 +0100)
26 files changed:
MANIFEST.in
include/simgrid/s4u/Io.hpp
src/kernel/activity/CommImpl.cpp
src/kernel/activity/IoImpl.cpp
src/kernel/activity/IoImpl.hpp
src/kernel/resource/DiskImpl.cpp
src/kernel/resource/DiskImpl.hpp
src/kernel/resource/VirtualMachineImpl.hpp
src/kernel/routing/NetZoneImpl.cpp
src/s4u/s4u_Io.cpp
src/simgrid/sg_config.cpp
src/surf/HostImpl.hpp
src/surf/disk_s19.cpp
src/surf/disk_s19.hpp
src/surf/host_clm03.cpp
src/surf/host_clm03.hpp
src/surf/ptask_L07.cpp
src/surf/ptask_L07.hpp
src/surf/sio_S22.cpp [new file with mode: 0644]
src/surf/sio_S22.hpp [new file with mode: 0644]
src/surf/surf_interface.cpp
src/surf/surf_interface.hpp
teshsuite/s4u/CMakeLists.txt
teshsuite/s4u/io-stream/io-stream.cpp [new file with mode: 0644]
teshsuite/s4u/io-stream/io-stream.tesh [new file with mode: 0644]
tools/cmake/DefinePackages.cmake

index ccc49b5..80df1c2 100644 (file)
@@ -851,6 +851,8 @@ include teshsuite/s4u/host-on-off/host-on-off.cpp
 include teshsuite/s4u/host-on-off/host-on-off.tesh
 include teshsuite/s4u/io-set-bw/io-set-bw.cpp
 include teshsuite/s4u/io-set-bw/io-set-bw.tesh
+include teshsuite/s4u/io-stream/io-stream.cpp
+include teshsuite/s4u/io-stream/io-stream.tesh
 include teshsuite/s4u/is-router/is-router.cpp
 include teshsuite/s4u/is-router/is-router.tesh
 include teshsuite/s4u/issue71/issue71.cpp
@@ -2628,6 +2630,8 @@ include src/surf/ns3/ns3_simulator.hpp
 include src/surf/ptask_L07.cpp
 include src/surf/ptask_L07.hpp
 include src/surf/sg_platf.cpp
+include src/surf/sio_S22.cpp
+include src/surf/sio_S22.hpp
 include src/surf/surf_interface.cpp
 include src/surf/surf_interface.hpp
 include src/surf/xml/platf.hpp
index c3118c3..81dafc9 100644 (file)
@@ -50,6 +50,13 @@ public:
   IoPtr set_size(sg_size_t size);
   IoPtr set_op_type(OpType type);
 
+  static IoPtr streamto_init(Host* from, Disk* from_disk, Host* to, Disk* to_disk);
+  static IoPtr streamto_async(Host* from, Disk* from_disk, Host* to, Disk* to_disk, uint64_t simulated_size_in_bytes);
+  static void streamto(Host* from, Disk* from_disk, Host* to, Disk* to_disk, uint64_t simulated_size_in_bytes);
+
+  IoPtr set_source(Host* from, Disk* from_disk);
+  IoPtr set_destination(Host* to, Disk* to_disk);
+
   IoPtr update_priority(double priority);
 
   bool is_assigned() const override;
index 0d21716..ca40f6a 100644 (file)
@@ -144,7 +144,7 @@ CommImpl* CommImpl::start()
     } else if ((src_actor_ != nullptr && src_actor_->is_suspended()) ||
                (dst_actor_ != nullptr && dst_actor_->is_suspended())) {
       /* If any of the actor is suspended, create the synchro but stop its execution,
-         it will be restarted when the sender actor resume */
+         it will be restarted when the sender actor resumes */
       if (src_actor_->is_suspended())
         XBT_DEBUG("The communication is suspended on startup because src (%s@%s) was suspended since it initiated the "
                   "communication",
index c30b1a9..339b91a 100644 (file)
@@ -15,6 +15,7 @@
 #include "src/kernel/resource/CpuImpl.hpp"
 #include "src/kernel/resource/DiskImpl.hpp"
 #include "src/mc/mc_replay.hpp"
+#include "src/surf/HostImpl.hpp"
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(ker_io, kernel, "Kernel io-related synchronization");
 
@@ -69,14 +70,40 @@ IoImpl& IoImpl::set_disk(resource::DiskImpl* disk)
   return *this;
 }
 
+IoImpl& IoImpl::set_host(s4u::Host* host)
+{
+  host_ = host;
+  return *this;
+}
+
+IoImpl& IoImpl::set_dst_disk(resource::DiskImpl* disk)
+{
+  dst_disk_ = disk;
+  return *this;
+}
+
+IoImpl& IoImpl::set_dst_host(s4u::Host* host)
+{
+  dst_host_ = host;
+  return *this;
+}
+
 IoImpl* IoImpl::start()
 {
   set_state(State::RUNNING);
-  surf_action_ =
-      disk_->get_host()->get_netpoint()->get_englobing_zone()->get_disk_model()->io_start(disk_, size_, type_);
-  surf_action_->set_sharing_penalty(sharing_penalty_);
+  if (dst_host_ == nullptr) {
+    XBT_DEBUG("Starting regular I/O");
+    surf_action_ = disk_->io_start(size_, type_);
+    surf_action_->set_sharing_penalty(sharing_penalty_);
+  } else {
+    XBT_DEBUG("Starting streaming I/O");
+    auto host_model = dst_host_->get_netpoint()->get_englobing_zone()->get_host_model();
+    surf_action_    = host_model->io_stream(host_, disk_, dst_host_, dst_disk_, size_);
+  }
+
   surf_action_->set_activity(this);
   set_start_time(surf_action_->get_start_time());
+#include "src/surf/HostImpl.hpp"
 
   XBT_DEBUG("Create IO synchro %p %s", this, get_cname());
 
index 399775b..d22447c 100644 (file)
 namespace simgrid::kernel::activity {
 
 class XBT_PUBLIC IoImpl : public ActivityImpl_T<IoImpl> {
+  s4u::Host* host_                    = nullptr;
   resource::DiskImpl* disk_           = nullptr;
+  s4u::Host* dst_host_                = nullptr;
+  resource::DiskImpl* dst_disk_       = nullptr;
   double sharing_penalty_             = 1.0;
   sg_size_t size_                     = 0;
   s4u::Io::OpType type_               = s4u::Io::OpType::READ;
@@ -27,6 +30,11 @@ public:
   IoImpl& set_size(sg_size_t size);
   IoImpl& set_type(s4u::Io::OpType type);
   IoImpl& set_disk(resource::DiskImpl* disk);
+  IoImpl& set_host(s4u::Host* host);
+  s4u::Host* get_host() const { return host_; }
+  IoImpl& set_dst_disk(resource::DiskImpl* disk);
+  IoImpl& set_dst_host(s4u::Host* host);
+  s4u::Host* get_dst_host() const { return dst_host_; }
 
   IoImpl& update_sharing_penalty(double sharing_penalty);
 
index 07709ce..126d53f 100644 (file)
@@ -6,32 +6,15 @@
 #include "DiskImpl.hpp"
 
 #include "simgrid/s4u/Engine.hpp"
-#include "simgrid/sg_config.hpp"
 #include "src/kernel/EngineImpl.hpp"
 #include "src/kernel/lmm/maxmin.hpp"
 #include "src/kernel/resource/profile/Profile.hpp"
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(res_disk, ker_resource, "Disk resources, that fuel I/O activities");
-/***********
- * Options *
- ***********/
-static simgrid::config::Flag<std::string> cfg_disk_solver("disk/solver",
-                                                          "Set linear equations solver used by disk model", "maxmin",
-                                                          &simgrid::kernel::lmm::System::validate_solver);
-
 namespace simgrid::kernel::resource {
 
 xbt::signal<void(DiskAction const&, Action::State, Action::State)> DiskAction::on_state_change;
 
-/*********
- * Model *
- *********/
-
-DiskModel::DiskModel(const std::string& name) : Model(name)
-{
-  set_maxmin_system(lmm::System::build(cfg_disk_solver.get(), true /* selective update */));
-}
-
 /************
  * Resource *
  ************/
@@ -177,4 +160,9 @@ void DiskAction::set_state(Action::State new_state)
     on_state_change(*this, previous_state, new_state);
   }
 }
+
+void DiskAction::update_remains_lazy(double /*now*/)
+{
+  THROW_IMPOSSIBLE;
+}
 } // namespace simgrid::kernel::resource
index 611234d..7b46f08 100644 (file)
@@ -32,13 +32,9 @@ class DiskAction;
  *********/
 class DiskModel : public Model {
 public:
-  explicit DiskModel(const std::string& name);
-  DiskModel(const DiskModel&) = delete;
-  DiskModel& operator=(const DiskModel&) = delete;
+  using Model::Model;
 
   virtual DiskImpl* create_disk(const std::string& name, double read_bandwidth, double write_bandwidth) = 0;
-
-  virtual DiskAction* io_start(const DiskImpl* disk, sg_size_t size, s4u::Io::OpType type) = 0;
 };
 
 /************
@@ -111,6 +107,8 @@ public:
 
   void seal() override;
   void destroy(); // Must be called instead of the destructor
+
+  virtual DiskAction* io_start(sg_size_t size, s4u::Io::OpType type) = 0;
 };
 
 /**********
@@ -123,6 +121,7 @@ public:
 
   using Action::Action;
   void set_state(simgrid::kernel::resource::Action::State state) override;
+  void update_remains_lazy(double now) override;
 };
 
 } // namespace simgrid::kernel::resource
index 81cd3d6..09399be 100644 (file)
@@ -99,6 +99,7 @@ public:
   {
     return nullptr;
   };
+  Action* io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk, double size) override { return nullptr; }
 };
 } // namespace kernel::resource
 } // namespace simgrid
index 18d00d9..92b52b5 100644 (file)
@@ -33,8 +33,8 @@ static void surf_config_models_setup()
   std::string disk_model_name    = simgrid::config::get_value<std::string>("disk/model");
 
   /* The compound host model is needed when using non-default net/cpu models */
-  if ((not simgrid::config::is_default("network/model") || not simgrid::config::is_default("cpu/model")) &&
-      simgrid::config::is_default("host/model")) {
+  if ((not simgrid::config::is_default("network/model") || not simgrid::config::is_default("cpu/model") ||
+       not simgrid::config::is_default("disk/model")) && simgrid::config::is_default("host/model")) {
     host_model_name = "compound";
     simgrid::config::set_value("host/model", host_model_name);
   }
@@ -42,11 +42,15 @@ static void surf_config_models_setup()
   XBT_DEBUG("host model: %s", host_model_name.c_str());
   if (host_model_name == "compound") {
     xbt_enforce(not cpu_model_name.empty(), "Set a cpu model to use with the 'compound' host model");
+    xbt_enforce(not disk_model_name.empty(), "Set a disk model to use with the 'compound' host model");
     xbt_enforce(not network_model_name.empty(), "Set a network model to use with the 'compound' host model");
 
     const auto* cpu_model = find_model_description(surf_cpu_model_description, cpu_model_name);
     cpu_model->model_init_preparse();
 
+    const auto* disk_model = find_model_description(surf_disk_model_description, disk_model_name);
+    disk_model->model_init_preparse();
+
     const auto* network_model = find_model_description(surf_network_model_description, network_model_name);
     network_model->model_init_preparse();
   }
@@ -61,10 +65,6 @@ static void surf_config_models_setup()
    * To be reviewed in the future */
   surf_vm_model_init_HL13(
       simgrid::s4u::Engine::get_instance()->get_netzone_root()->get_impl()->get_cpu_pm_model().get());
-
-  XBT_DEBUG("Call disk_model_init");
-  const auto* disk_model = find_model_description(surf_disk_model_description, disk_model_name);
-  disk_model->model_init_preparse();
 }
 
 xbt::signal<void(bool symmetrical, kernel::routing::NetPoint* src, kernel::routing::NetPoint* dst,
index 906b0be..6d34be9 100644 (file)
@@ -11,6 +11,8 @@
 #include "src/kernel/actor/ActorImpl.hpp"
 #include "src/kernel/actor/SimcallObserver.hpp"
 
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_io, s4u_activity, "S4U asynchronous I/Os");
+
 namespace simgrid::s4u {
 xbt::signal<void(Io const&)> Io::on_start;
 
@@ -25,6 +27,61 @@ IoPtr Io::init()
   return IoPtr(static_cast<Io*>(pimpl->get_iface()));
 }
 
+IoPtr Io::streamto_init(Host* from, Disk* from_disk, Host* to, Disk* to_disk)
+{
+  auto res = Io::init()->set_source(from, from_disk)->set_destination(to, to_disk);
+  res->set_state(State::STARTING);
+  return res;
+}
+
+IoPtr Io::streamto_async(Host* from, Disk* from_disk, Host* to, Disk* to_disk, uint64_t simulated_size_in_bytes)
+{
+  return Io::init()->set_size(simulated_size_in_bytes)->set_source(from, from_disk)->set_destination(to, to_disk);
+}
+
+void Io::streamto(Host* from, Disk* from_disk, Host* to, Disk* to_disk, uint64_t simulated_size_in_bytes)
+{
+  streamto_async(from, from_disk, to, to_disk, simulated_size_in_bytes)->wait();
+}
+
+IoPtr Io::set_source(Host* from, Disk* from_disk)
+{
+  xbt_assert(state_ == State::INITED || state_ == State::STARTING,
+             "Cannot change the source of an IO stream once it's started (state: %s)", to_c_str(state_));
+  kernel::actor::simcall_answered(
+      [this, from, from_disk] {
+        boost::static_pointer_cast<kernel::activity::IoImpl>(pimpl_)->set_host(from);
+        if (from_disk != nullptr)
+          boost::static_pointer_cast<kernel::activity::IoImpl>(pimpl_)->set_disk(from_disk->get_impl());
+      });
+  // Setting 'source' may allow to start the activity, let's try
+  if (state_ == State::STARTING && remains_ <= 0)
+    XBT_DEBUG("This IO has a size of 0 byte. It cannot start yet");
+  else
+    vetoable_start();
+
+  return this;
+}
+
+IoPtr Io::set_destination(Host* to, Disk* to_disk)
+{
+  xbt_assert(state_ == State::INITED || state_ == State::STARTING,
+             "Cannot change the source of an IO stream once it's started (state: %s)", to_c_str(state_));
+  kernel::actor::simcall_answered(
+      [this, to, to_disk] {
+        boost::static_pointer_cast<kernel::activity::IoImpl>(pimpl_)->set_dst_host(to);
+        if (to_disk != nullptr)
+          boost::static_pointer_cast<kernel::activity::IoImpl>(pimpl_)->set_dst_disk(to_disk->get_impl());
+      });
+  // Setting 'destination' may allow to start the activity, let's try
+  if (state_ == State::STARTING && remains_ <= 0)
+    XBT_DEBUG("This IO has a size of 0 byte. It cannot start yet");
+  else
+    vetoable_start();
+
+  return this;
+}
+
 Io* Io::start()
 {
   kernel::actor::simcall_answered(
@@ -109,7 +166,11 @@ sg_size_t Io::get_performed_ioops() const
 
 bool Io::is_assigned() const
 {
-  return boost::static_pointer_cast<kernel::activity::IoImpl>(pimpl_)->get_disk() != nullptr;
+  if (boost::static_pointer_cast<kernel::activity::IoImpl>(pimpl_)->get_host() == nullptr) {
+    return boost::static_pointer_cast<kernel::activity::IoImpl>(pimpl_)->get_disk() != nullptr;
+  } else {
+    return boost::static_pointer_cast<kernel::activity::IoImpl>(pimpl_)->get_dst_host() != nullptr;
+  }
 }
 
 } // namespace simgrid::s4u
index 0fa1beb..7d00fcf 100644 (file)
@@ -248,7 +248,7 @@ void sg_config_init(int *argc, char **argv)
   declare_model_flag("cpu/model", "Cas01", &_sg_cfg_cb__cpu_model, surf_cpu_model_description, "model",
                      "The model to use for the CPU");
 
-  declare_model_flag("disk/model", "default", &_sg_cfg_cb__disk_model, surf_disk_model_description, "model",
+  declare_model_flag("disk/model", "S19", &_sg_cfg_cb__disk_model, surf_disk_model_description, "model",
                      "The model to use for the disk");
 
   declare_model_flag("network/model", "LV08", &_sg_cfg_cb__network_model, surf_network_model_description, "model",
index 610068b..e2533b2 100644 (file)
@@ -29,6 +29,7 @@ public:
 
   virtual Action* execute_parallel(const std::vector<s4u::Host*>& host_list, const double* flops_amount,
                                    const double* bytes_amount, double rate) = 0;
+  virtual Action* io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk, double size) = 0;
 };
 
 /************
index a6f894f..8299b4b 100644 (file)
@@ -3,6 +3,7 @@
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
+#include "simgrid/sg_config.hpp"
 #include <simgrid/kernel/routing/NetPoint.hpp>
 #include <simgrid/kernel/routing/NetZoneImpl.hpp>
 #include <simgrid/s4u/Engine.hpp>
 #include "src/surf/disk_s19.hpp"
 
 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(res_disk);
+/***********
+ * Options *
+ ***********/
+static simgrid::config::Flag<std::string> cfg_disk_solver("disk/solver",
+                                                          "Set linear equations solver used by disk model", "maxmin",
+                                                          &simgrid::kernel::lmm::System::validate_solver);
 
 /*********
  * Model *
  *********/
 
-void surf_disk_model_init_default()
+void surf_disk_model_init_S19()
 {
   auto disk_model = std::make_shared<simgrid::kernel::resource::DiskS19Model>("Disk");
   auto* engine    = simgrid::kernel::EngineImpl::get_instance();
@@ -28,6 +35,15 @@ void surf_disk_model_init_default()
 }
 
 namespace simgrid::kernel::resource {
+/*********
+ * Model *
+ *********/
+
+DiskS19Model::DiskS19Model(const std::string& name) : DiskModel(name)
+{
+  set_maxmin_system(lmm::System::build(cfg_disk_solver.get(), true /* selective update */));
+}
+
 
 DiskImpl* DiskS19Model::create_disk(const std::string& name, double read_bandwidth, double write_bandwidth)
 {
@@ -49,21 +65,21 @@ void DiskS19Model::update_actions_state(double /*now*/, double delta)
   }
 }
 
-DiskAction* DiskS19Model::io_start(const DiskImpl* disk, sg_size_t size, s4u::Io::OpType type)
+DiskAction* DiskS19::io_start(sg_size_t size, s4u::Io::OpType type)
 {
-  auto* action = new DiskS19Action(this, static_cast<double>(size), not disk->is_on());
-  get_maxmin_system()->expand(disk->get_constraint(), action->get_variable(), 1.0);
+  auto* action = new DiskS19Action(get_model(), static_cast<double>(size), not is_on());
+  get_model()->get_maxmin_system()->expand(get_constraint(), action->get_variable(), 1.0);
   switch (type) {
     case s4u::Io::OpType::READ:
-      get_maxmin_system()->expand(disk->get_read_constraint(), action->get_variable(), 1.0);
+      get_model()->get_maxmin_system()->expand(get_read_constraint(), action->get_variable(), 1.0);
       break;
     case s4u::Io::OpType::WRITE:
-      get_maxmin_system()->expand(disk->get_write_constraint(), action->get_variable(), 1.0);
+      get_model()->get_maxmin_system()->expand(get_write_constraint(), action->get_variable(), 1.0);
       break;
     default:
       THROW_UNIMPLEMENTED;
   }
-  if (const auto& factor_cb = disk->get_factor_cb()) { // handling disk variability
+  if (const auto& factor_cb = get_factor_cb()) { // handling disk variability
     action->set_rate_factor(factor_cb(size, type));
   }
   return action;
@@ -128,8 +144,4 @@ DiskS19Action::DiskS19Action(Model* model, double cost, bool failed)
 {
 }
 
-void DiskS19Action::update_remains_lazy(double /*now*/)
-{
-  THROW_IMPOSSIBLE;
-}
 } // namespace simgrid::kernel::resource
index a6413dd..b4c352b 100644 (file)
@@ -26,11 +26,11 @@ class XBT_PRIVATE DiskS19Action;
 
 class DiskS19Model : public DiskModel {
 public:
-  using DiskModel::DiskModel;
+  explicit DiskS19Model(const std::string& name);
+  DiskS19Model(const DiskS19Model&) = delete;
+  DiskS19Model& operator=(const DiskS19Model&) = delete;
   DiskImpl* create_disk(const std::string& name, double read_bandwidth, double write_bandwidth) override;
 
-  DiskAction* io_start(const DiskImpl* disk, sg_size_t size, s4u::Io::OpType type) override;
-
   void update_actions_state(double now, double delta) override;
 };
 
@@ -45,6 +45,7 @@ public:
   void set_write_bandwidth(double value) override;
   void set_readwrite_bandwidth(double value) override;
   void apply_event(kernel::profile::Event* triggered, double value) override;
+  DiskAction* io_start(sg_size_t size, s4u::Io::OpType type) override;
 };
 
 /**********
@@ -54,7 +55,6 @@ public:
 class DiskS19Action : public DiskAction {
 public:
   DiskS19Action(Model* model, double cost, bool failed);
-  void update_remains_lazy(double now) override;
 };
 
 } // namespace simgrid::kernel::resource
index 0005ed4..03dd2a9 100644 (file)
@@ -22,6 +22,7 @@ void surf_host_model_init_current_default()
   engine->add_model(host_model);
   engine->get_netzone_root()->set_host_model(host_model);
   surf_cpu_model_init_Cas01();
+  surf_disk_model_init_S19();
   surf_network_model_init_LegrandVelho();
 }
 
index e972cb7..9e073b5 100644 (file)
@@ -22,6 +22,7 @@ public:
   Action* execute_thread(const s4u::Host* host, double flops_amount, int thread_count) override;
   Action* execute_parallel(const std::vector<s4u::Host*>& host_list, const double* flops_amount,
                            const double* bytes_amount, double rate) override;
+  Action* io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk, double size) override { return nullptr; }
 };
 } // namespace simgrid::kernel::resource
 
index 60dddcd..bee5736 100644 (file)
@@ -57,6 +57,8 @@ HostL07Model::HostL07Model(const std::string& name, lmm::System* sys) : HostMode
   auto cpu_model = std::make_shared<CpuL07Model>("Cpu_Ptask", this, sys);
   engine->add_model(cpu_model);
   engine->get_netzone_root()->set_cpu_pm_model(cpu_model);
+
+  surf_disk_model_init_S19();
 }
 
 CpuL07Model::CpuL07Model(const std::string& name, HostL07Model* hmodel, lmm::System* sys)
index 68fc050..c48d999 100644 (file)
@@ -41,6 +41,7 @@ public:
   Action* execute_thread(const s4u::Host* host, double flops_amount, int thread_count) override { return nullptr; }
   CpuAction* execute_parallel(const std::vector<s4u::Host*>& host_list, const double* flops_amount,
                               const double* bytes_amount, double rate) override;
+  Action* io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk, double size) override { return nullptr; }
 };
 
 class CpuL07Model : public CpuModel {
diff --git a/src/surf/sio_S22.cpp b/src/surf/sio_S22.cpp
new file mode 100644 (file)
index 0000000..24fa3bf
--- /dev/null
@@ -0,0 +1,395 @@
+/* Copyright (c) 2022. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include <simgrid/kernel/routing/NetZoneImpl.hpp>
+#include <simgrid/s4u/Engine.hpp>
+#include <xbt/config.hpp>
+
+#include "simgrid/config.h"
+#include "src/kernel/EngineImpl.hpp"
+#if SIMGRID_HAVE_EIGEN3
+#include "src/kernel/lmm/bmf.hpp"
+#endif
+#include "src/kernel/resource/profile/Event.hpp"
+#include "src/surf/sio_S22.hpp"
+
+#include <unordered_set>
+
+XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(res_host);
+XBT_LOG_EXTERNAL_CATEGORY(xbt_cfg);
+
+/***********
+ * Options *
+ ***********/
+ static simgrid::config::Flag<std::string> cfg_sio_solver("host/sio_solver",
+                                                           "Set linear equations solver used by sio model",
+                                                           "fairbottleneck",
+                                                           &simgrid::kernel::lmm::System::validate_solver);
+
+/**************************************/
+/*** Resource Creation & Destruction **/
+/**************************************/
+void surf_host_model_init_sio_S22()
+{
+  XBT_CINFO(xbt_cfg, "Switching to the S22 model to handle streaming I/Os.");
+  xbt_assert(cfg_sio_solver != "maxmin", "Invalid configuration. Cannot use maxmin solver with streaming I/Os.");
+
+  auto* system    = simgrid::kernel::lmm::System::build(cfg_sio_solver.get(), true /* selective update */);
+  auto host_model = std::make_shared<simgrid::kernel::resource::HostS22Model>("Host_Sio", system);
+  auto* engine    = simgrid::kernel::EngineImpl::get_instance();
+  engine->add_model(host_model);
+  engine->get_netzone_root()->set_host_model(host_model);
+}
+
+namespace simgrid::kernel::resource {
+
+HostS22Model::HostS22Model(const std::string& name, lmm::System* sys) : HostModel(name)
+{
+  set_maxmin_system(sys);
+
+  auto net_model = std::make_shared<NetworkS22Model>("Network_Sio", this, sys);
+  auto engine    = EngineImpl::get_instance();
+  engine->add_model(net_model);
+  engine->get_netzone_root()->set_network_model(net_model);
+
+  auto disk_model = std::make_shared<DiskS22Model>("Disk_Sio", this, sys);
+  engine->add_model(disk_model);
+  engine->get_netzone_root()->set_disk_model(disk_model);
+
+  surf_cpu_model_init_Cas01();
+}
+
+DiskS22Model::DiskS22Model(const std::string& name, HostS22Model* hmodel, lmm::System* sys)
+    : DiskModel(name), hostModel_(hmodel)
+{
+  set_maxmin_system(sys);
+}
+
+DiskS22Model::~DiskS22Model()
+{
+  set_maxmin_system(nullptr);
+}
+
+NetworkS22Model::NetworkS22Model(const std::string& name, HostS22Model* hmodel, lmm::System* sys)
+    : NetworkModel(name), hostModel_(hmodel)
+{
+  set_maxmin_system(sys);
+  loopback_.reset(create_link("__loopback__", {simgrid::config::get_value<double>("network/loopback-bw")}));
+  loopback_->set_sharing_policy(s4u::Link::SharingPolicy::FATPIPE, {});
+  loopback_->set_latency(simgrid::config::get_value<double>("network/loopback-lat"));
+  loopback_->get_iface()->seal();
+}
+
+NetworkS22Model::~NetworkS22Model()
+{
+  set_maxmin_system(nullptr);
+}
+
+double HostS22Model::next_occurring_event(double now)
+{
+  double min = HostModel::next_occurring_event_full(now);
+  for (Action const& action : *get_started_action_set()) {
+    const auto& net_action = static_cast<const S22Action&>(action);
+    if (net_action.get_latency() > 0 && (min < 0 || net_action.get_latency() < min)) {
+      min = net_action.get_latency();
+      XBT_DEBUG("Updating min with %p (start %f): %f", &net_action, net_action.get_start_time(), min);
+    }
+  }
+  XBT_DEBUG("min value: %f", min);
+
+  return min;
+}
+
+void HostS22Model::update_actions_state(double /*now*/, double delta)
+{
+  for (auto it = std::begin(*get_started_action_set()); it != std::end(*get_started_action_set());) {
+    auto& action = static_cast<S22Action&>(*it);
+    ++it; // increment iterator here since the following calls to action.finish() may invalidate it
+    if (action.get_latency() > 0) {
+      if (action.get_latency() > delta) {
+        action.update_latency(delta, sg_surf_precision);
+      } else {
+        action.set_latency(0.0);
+      }
+      if ((action.get_latency() <= 0.0) && (action.is_suspended() == 0)) {
+        action.update_bound();
+        get_maxmin_system()->update_variable_penalty(action.get_variable(), 1.0);
+        action.set_last_update();
+      }
+    }
+    XBT_DEBUG("Action (%p) : remains (%g) updated by %g.", &action, action.get_remains(), action.get_rate() * delta);
+    action.update_remains(action.get_rate() * delta);
+    action.update_max_duration(delta);
+
+    XBT_DEBUG("Action (%p) : remains (%g).", &action, action.get_remains());
+
+    /* In the next if cascade, the action can be finished either because:
+     *  - The amount of remaining work reached 0
+     *  - The max duration was reached
+     * If it's not done, it may have failed.
+     */
+
+    if (((action.get_remains() <= 0) && (action.get_variable()->get_penalty() > 0)) ||
+        ((action.get_max_duration() != NO_MAX_DURATION) && (action.get_max_duration() <= 0))) {
+      action.finish(Action::State::FINISHED);
+      continue;
+    }
+
+    /* Need to check that none of the model has failed */
+    int i                               = 0;
+    const lmm::Constraint* cnst         = action.get_variable()->get_constraint(i);
+    while (cnst != nullptr) {
+      i++;
+      if (not cnst->get_id()->is_on()) {
+        XBT_DEBUG("Action (%p) Failed!!", &action);
+        action.finish(Action::State::FAILED);
+        break;
+      }
+      cnst = action.get_variable()->get_constraint(i);
+    }
+  }
+}
+
+S22Action::S22Action(Model* model, s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk, double size)
+    : DiskAction(model, 1, false)
+    , src_host_(src_host)
+    , src_disk_(src_disk)
+    , dst_host_(dst_host)
+    , dst_disk_(dst_disk)
+    , size_(size)
+{
+  size_t disk_nb       = 0;
+  size_t link_nb       = 0;
+  double latency       = 0.0;
+  this->set_last_update();
+
+  if (src_disk_ != nullptr)
+    disk_nb++;
+  if (dst_disk_ != nullptr)
+    disk_nb++;
+
+  if (src_host_ != dst_host_ && size_ > 0) {
+    std::unordered_set<const char*> affected_links;
+    double lat = 0.0;
+    std::vector<StandardLinkImpl*> route;
+    src_host_->route_to(dst_host_, route, &lat);
+    latency = std::max(latency, lat);
+
+    for (auto const& link : route)
+      affected_links.insert(link->get_cname());
+
+    link_nb = affected_links.size();
+  }
+
+  XBT_DEBUG("Creating a stream io (%p) with %zu disk(s) and %zu unique link(s).", this, disk_nb, link_nb);
+  latency_ = latency;
+
+  set_variable(model->get_maxmin_system()->variable_new(this, 1.0, -1.0, disk_nb + link_nb));
+
+  if (latency_ > 0)
+    model->get_maxmin_system()->update_variable_penalty(get_variable(), 0.0);
+
+  /* Expand it for the disks even if there is nothing to read/write, to make sure that it gets expended even if there is no
+   * communication either */
+  if (src_disk_ != nullptr)
+    model->get_maxmin_system()->expand(src_disk_->get_constraint(), get_variable(), size, true);
+  if (dst_disk_ != nullptr)
+    model->get_maxmin_system()->expand(dst_disk_->get_constraint(), get_variable(), size, true);
+
+  if (src_host_ != dst_host_) {
+    std::vector<StandardLinkImpl*> route;
+    src_host_->route_to(dst_host_, route, nullptr);
+    for (auto const& link : route)
+      model->get_maxmin_system()->expand(link->get_constraint(), this->get_variable(), size_);
+  }
+
+  if (link_nb + disk_nb == 0) {
+    this->set_cost(1.0);
+    this->set_remains(0.0);
+  }
+
+  /* finally calculate the initial bound value */
+  update_bound();
+}
+
+DiskAction* HostS22Model::io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk,
+                                   double size)
+{
+  return new S22Action(this, src_host, src_disk, dst_host, dst_disk, size);
+}
+
+Action* NetworkS22Model::communicate(s4u::Host* src, s4u::Host* dst, double size, double /*rate*/)
+{
+  return hostModel_->io_stream(src, nullptr, dst, nullptr, size);
+}
+
+DiskImpl* DiskS22Model::create_disk(const std::string& name, double read_bandwidth, double write_bandwidth)
+{
+  return (new DiskS22(name, read_bandwidth, write_bandwidth))->set_model(this);
+}
+
+StandardLinkImpl* NetworkS22Model::create_link(const std::string& name, const std::vector<double>& bandwidths)
+{
+  xbt_assert(bandwidths.size() == 1, "Non WIFI link must have only 1 bandwidth.");
+  auto link = new LinkS22(name, bandwidths[0], get_maxmin_system());
+  link->set_model(this);
+  return link;
+}
+
+StandardLinkImpl* NetworkS22Model::create_wifi_link(const std::string& name, const std::vector<double>& bandwidths)
+{
+  THROW_UNIMPLEMENTED;
+}
+
+/************
+ * Resource *
+ ************/
+DiskAction* DiskS22::io_start(sg_size_t size, s4u::Io::OpType type)
+{
+  DiskAction* res;
+  switch (type) {
+    case s4u::Io::OpType::READ:
+      res = static_cast<DiskS22Model*>(get_model())->hostModel_->io_stream(get_host(), this, get_host(), nullptr, size);
+    break;
+    case s4u::Io::OpType::WRITE:
+      res = static_cast<DiskS22Model*>(get_model())->hostModel_->io_stream(get_host(), nullptr, get_host(), this, size);
+    break;
+    default:
+      THROW_UNIMPLEMENTED;
+  }
+
+   return res;
+}
+
+void DiskS22::apply_event(kernel::profile::Event* triggered, double value)
+{
+  /* Find out which of my iterators was triggered, and react accordingly */
+  if (triggered == get_read_event()) {
+    set_read_bandwidth(value);
+    unref_read_event();
+  } else if (triggered == get_write_event()) {
+    set_write_bandwidth(value);
+    unref_write_event();
+  } else if (triggered == get_state_event()) {
+    if (value > 0)
+      turn_on();
+    else
+      turn_off();
+    unref_state_event();
+  } else {
+    xbt_die("Unknown event!\n");
+  }
+}
+
+LinkS22::LinkS22(const std::string& name, double bandwidth, lmm::System* system) : StandardLinkImpl(name)
+{
+  this->set_constraint(system->constraint_new(this, bandwidth));
+  bandwidth_.peak = bandwidth;
+}
+
+void LinkS22::apply_event(profile::Event* triggered, double value)
+{
+  XBT_DEBUG("Updating link %s (%p) with value=%f", get_cname(), this, value);
+  if (triggered == bandwidth_.event) {
+    set_bandwidth(value);
+    tmgr_trace_event_unref(&bandwidth_.event);
+
+  } else if (triggered == latency_.event) {
+    set_latency(value);
+    tmgr_trace_event_unref(&latency_.event);
+
+  } else if (triggered == get_state_event()) {
+    if (value > 0)
+      turn_on();
+    else
+      turn_off();
+    unref_state_event();
+  } else {
+    xbt_die("Unknown event ! \n");
+  }
+}
+
+void LinkS22::set_bandwidth(double value)
+{
+  bandwidth_.peak = value;
+  StandardLinkImpl::on_bandwidth_change();
+
+  get_model()->get_maxmin_system()->update_constraint_bound(get_constraint(), bandwidth_.peak * bandwidth_.scale);
+}
+
+void LinkS22::set_latency(double value)
+{
+  latency_check(value);
+  const lmm::Element* elem = nullptr;
+
+  latency_.peak = value;
+  while (const auto* var = get_constraint()->get_variable(&elem)) {
+    const auto* action = static_cast<S22Action*>(var->get_id());
+    action->update_bound();
+  }
+}
+
+/**********
+ * Action *
+ **********/
+
+double S22Action::calculate_io_read_bound() const
+{
+  double io_read_bound = std::numeric_limits<double>::max();
+
+  if (src_disk_ == nullptr)
+    return io_read_bound;
+
+  if (size_ > 0)
+    io_read_bound = std::min(io_read_bound, src_disk_->get_read_bandwidth() / size_);
+
+  return io_read_bound;
+}
+
+double S22Action::calculate_io_write_bound() const
+{
+  double io_write_bound = std::numeric_limits<double>::max();
+
+  if (dst_disk_ == nullptr)
+    return io_write_bound;
+
+  if (size_ > 0)
+    io_write_bound = std::min(io_write_bound, dst_disk_->get_write_bandwidth() / size_);
+
+  return io_write_bound;
+}
+
+double S22Action::calculate_network_bound() const
+{
+  double lat = 0.0;
+  double lat_bound   = std::numeric_limits<double>::max();
+
+  if (src_host_ == dst_host_)
+    return lat_bound;
+
+  if (size_ <= 0){
+    std::vector<StandardLinkImpl*> route;
+    src_host_->route_to(dst_host_, route, &lat);
+  }
+
+  if (lat > 0)
+    lat_bound = NetworkModel::cfg_tcp_gamma / (2.0 * lat);
+
+  return lat_bound;
+}
+
+void S22Action::update_bound() const
+{
+  double bound = std::min(std::min(calculate_io_read_bound(), calculate_io_write_bound()),
+                          calculate_network_bound());
+
+  XBT_DEBUG("action (%p) : bound = %g", this, bound);
+
+  /* latency has been paid (or no latency), we can set the appropriate bound for network limit */
+  if ((bound < std::numeric_limits<double>::max()) && (latency_ <= 0.0))
+    get_model()->get_maxmin_system()->update_variable_bound(get_variable(), bound);
+ }
+
+} // namespace simgrid::kernel::resource
diff --git a/src/surf/sio_S22.hpp b/src/surf/sio_S22.hpp
new file mode 100644 (file)
index 0000000..a8d7cb7
--- /dev/null
@@ -0,0 +1,151 @@
+/* Copyright (c) 2022. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+
+#include "src/kernel/resource/NetworkModel.hpp"
+#include "src/surf/HostImpl.hpp"
+#include <xbt/base.h>
+
+#ifndef SIO_S22_HPP_
+#define SIO_S22_HPP_
+
+namespace simgrid::kernel::resource {
+
+/***********
+ * Classes *
+ ***********/
+
+class XBT_PRIVATE HostS22Model;
+class XBT_PRIVATE DiskS22Model;
+class XBT_PRIVATE NetworkS22Model;
+
+class XBT_PRIVATE DiskS22;
+class XBT_PRIVATE LinkS22;
+
+class XBT_PRIVATE S22Action;
+
+/*********
+ * Model *
+ *********/
+class HostS22Model : public HostModel {
+public:
+  HostS22Model(const std::string& name, lmm::System* sys);
+  HostS22Model(const HostS22Model&) = delete;
+  HostS22Model& operator=(const HostS22Model&) = delete;
+
+  double next_occurring_event(double now) override;
+  void update_actions_state(double now, double delta) override;
+  Action* execute_thread(const s4u::Host* host, double flops_amount, int thread_count) override { return nullptr; }
+  CpuAction* execute_parallel(const std::vector<s4u::Host*>& host_list, const double* flops_amount,
+                              const double* bytes_amount, double rate) override { return nullptr; }
+  DiskAction* io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk,
+                        double size) override;
+};
+
+class DiskS22Model : public DiskModel {
+public:
+  DiskS22Model(const std::string& name, HostS22Model* hmodel, lmm::System* sys);
+  DiskS22Model(const DiskS22Model&) = delete;
+  DiskS22Model& operator=(const DiskS22Model&) = delete;
+  ~DiskS22Model() override;
+  void update_actions_state(double /*now*/, double /*delta*/) override{
+      /* this action is done by HostS22Model which shares the LMM system with the Disk model
+       * Overriding to an empty function here allows us to handle the DiskS22Model as a regular
+       * method in EngineImpl::presolve */
+  };
+
+  DiskImpl* create_disk(const std::string& name, double read_bandwidth, double write_bandwidth) override;
+  HostS22Model* hostModel_;
+};
+
+class NetworkS22Model : public NetworkModel {
+public:
+  NetworkS22Model(const std::string& name, HostS22Model* hmodel, lmm::System* sys);
+  NetworkS22Model(const NetworkS22Model&) = delete;
+  NetworkS22Model& operator=(const NetworkS22Model&) = delete;
+  ~NetworkS22Model() override;
+  StandardLinkImpl* create_link(const std::string& name, const std::vector<double>& bandwidths) final;
+  StandardLinkImpl* create_wifi_link(const std::string& name, const std::vector<double>& bandwidths) override;
+
+  Action* communicate(s4u::Host* src, s4u::Host* dst, double size, double rate) override;
+  void update_actions_state(double /*now*/, double /*delta*/) override{
+      /* this action is done by HostS22Model which shares the LMM system with the Network model
+       * Overriding to an empty function here allows us to handle the NetworkS22Model as a regular
+       * method in EngineImpl::presolve */
+  };
+
+  HostS22Model* hostModel_;
+};
+
+/************
+ * Resource *
+ ************/
+
+class DiskS22 : public DiskImpl {
+public:
+  using DiskImpl::DiskImpl;
+  DiskS22(const DiskS22&) = delete;
+  DiskS22& operator=(const DiskS22&) = delete;
+
+  void apply_event(profile::Event* event, double value) override;
+  DiskAction* io_start(sg_size_t size, s4u::Io::OpType type) override;
+};
+
+class LinkS22 : public StandardLinkImpl {
+public:
+  LinkS22(const std::string& name, double bandwidth, lmm::System* system);
+  LinkS22(const LinkS22&) = delete;
+  LinkS22& operator=(const LinkS22&) = delete;
+  ~LinkS22() = default;
+
+  void apply_event(profile::Event* event, double value) override;
+  void set_bandwidth(double value) override;
+  void set_latency(double value) override;
+};
+
+/**********
+ * Action *
+ **********/
+
+class S22Action : public DiskAction {
+  const s4u::Host* src_host_;
+  const DiskImpl* src_disk_;
+  const s4u::Host* dst_host_;
+  const DiskImpl* dst_disk_;
+
+  const double size_;
+
+  double latency_;
+
+  friend DiskAction* DiskS22::io_start(sg_size_t size, s4u::Io::OpType type);
+  friend Action* NetworkS22Model::communicate(s4u::Host* src, s4u::Host* dst, double size, double rate);
+
+  double calculate_io_read_bound() const;
+  double calculate_io_write_bound() const;
+
+  /**
+   * @brief Calculate the network bound for the parallel task
+   *
+   * The network bound depends on the largest latency between the communication in the ptask.
+   * Return MAX_DOUBLE if latency is 0 (or ptask doesn't have any communication)
+   */
+  double calculate_network_bound() const;
+
+public:
+  S22Action() = delete;
+  S22Action(Model* model, s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk, double size);
+  S22Action(const S22Action&) = delete;
+  S22Action& operator=(const S22Action&) = delete;
+  ~S22Action() = default;
+
+  void update_bound() const;
+  double get_latency() const { return latency_; }
+  void set_latency(double latency) { latency_ = latency; }
+  void update_latency(double delta, double precision) { double_update(&latency_, delta, precision); }
+};
+
+} // namespace simgrid::kernel::resource
+
+#endif /* SIO_S22_HPP_ */
index 26eecf3..8b46ada 100644 (file)
@@ -70,13 +70,19 @@ const std::vector<surf_model_description_t> surf_cpu_model_description = {
     {"Cas01", "Simplistic CPU model (time=size/speed).", &surf_cpu_model_init_Cas01},
 };
 
+const std::vector<surf_model_description_t> surf_disk_model_description = {
+    {"S19", "Simplistic disk model.", &surf_disk_model_init_S19},
+};
+
 const std::vector<surf_model_description_t> surf_host_model_description = {
-    {"default", "Default host model. Currently, CPU:Cas01 and network:LV08 (with cross traffic enabled)",
+    {"default", "Default host model. Currently, CPU:Cas01, network:LV08 (with cross traffic enabled), and disk:S19",
      &surf_host_model_init_current_default},
-    {"compound", "Host model that is automatically chosen if you change the network and CPU models",
+    {"compound", "Host model that is automatically chosen if you change the CPU, network, and disk models",
      &surf_host_model_init_compound},
-    {"ptask_L07", "Host model somehow similar to Cas01+CM02 but allowing parallel tasks",
+    {"ptask_L07", "Host model somehow similar to Cas01+CM02+S19 but allowing parallel tasks",
      &surf_host_model_init_ptask_L07},
+    {"sio_S22", "Host model somehow similar to Cas01+CM02+S19 but allowing streaming I/Os",
+     &surf_host_model_init_sio_S22},
 };
 
 const std::vector<surf_model_description_t> surf_optimization_mode_description = {
@@ -88,10 +94,6 @@ const std::vector<surf_model_description_t> surf_optimization_mode_description =
     {"Full", "Full update of remaining and variables. Slow but may be useful when debugging.", nullptr},
 };
 
-const std::vector<surf_model_description_t> surf_disk_model_description = {
-    {"default", "Simplistic disk model.", &surf_disk_model_init_default},
-};
-
 /* returns whether #file_path is an absolute file path. Surprising, isn't it ? */
 static bool is_absolute_file_path(const std::string& file_path)
 {
index 8a6d580..f40f731 100644 (file)
@@ -75,6 +75,9 @@ static inline int double_equals(double value1, double value2, double precision)
  */
 XBT_PUBLIC void surf_cpu_model_init_Cas01();
 
+XBT_ATTRIB_DEPRECATED_v337 ("Please use surf_disk_model_init_S19()") XBT_PUBLIC void surf_disk_model_init_default();
+XBT_PUBLIC void surf_disk_model_init_S19();
+
 /** @ingroup SURF_models
  *  @brief Same as network model 'LagrangeVelho', only with different correction factors.
  *
@@ -175,8 +178,7 @@ XBT_PUBLIC void surf_host_model_init_current_default();
  *  equal share of the model to each action.
  */
 XBT_PUBLIC void surf_host_model_init_ptask_L07();
-
-XBT_PUBLIC void surf_disk_model_init_default();
+XBT_PUBLIC void surf_host_model_init_sio_S22();
 
 /* --------------------
  *  Model Descriptions
index bc7a813..57b21fe 100644 (file)
@@ -11,7 +11,8 @@ foreach(x actor actor-autorestart actor-suspend
        monkey-masterworkers monkey-semaphore
         concurrent_rw
         dag-incomplete-simulation dependencies
-        host-on-off host-on-off-actors host-on-off-recv host-multicore-speed-file io-set-bw
+        host-on-off host-on-off-actors host-on-off-recv host-multicore-speed-file
+        io-set-bw io-stream
         basic-link-test basic-parsing-test evaluate-get-route-time evaluate-parse-time is-router
         storage_client_server listen_async pid
         trace-integration
@@ -39,7 +40,7 @@ set_property(TARGET activity-lifecycle APPEND PROPERTY INCLUDE_DIRECTORIES "${IN
 ## Add the tests.
 ## Some need to be run with all factories, some don't need tesh to run
 foreach(x actor actor-autorestart actor-suspend activity-lifecycle comm-get-sender wait-all-for wait-any-for
-        cloud-interrupt-migration cloud-two-execs concurrent_rw dag-incomplete-simulation dependencies io-set-bw
+        cloud-interrupt-migration cloud-two-execs concurrent_rw dag-incomplete-simulation dependencies io-set-bw io-stream
              vm-live-migration vm-suicide)
   set(tesh_files    ${tesh_files}    ${CMAKE_CURRENT_SOURCE_DIR}/${x}/${x}.tesh)
   ADD_TESH_FACTORIES(tesh-s4u-${x} "*" --setenv bindir=${CMAKE_BINARY_DIR}/teshsuite/s4u/${x} --setenv srcdir=${CMAKE_HOME_DIRECTORY}/teshsuite/s4u/${x} --setenv platfdir=${CMAKE_HOME_DIRECTORY}/examples/platforms --cd ${CMAKE_BINARY_DIR}/teshsuite/s4u/${x} ${CMAKE_HOME_DIRECTORY}/teshsuite/s4u/${x}/${x}.tesh)
diff --git a/teshsuite/s4u/io-stream/io-stream.cpp b/teshsuite/s4u/io-stream/io-stream.cpp
new file mode 100644 (file)
index 0000000..d6b69af
--- /dev/null
@@ -0,0 +1,111 @@
+/* Copyright (c) 2017-2022. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include <simgrid/s4u.hpp>
+
+namespace sg4 = simgrid::s4u;
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(io_stream, "Messages specific for this simulation");
+
+static void streamer(size_t size)
+{
+  auto* bob        = sg4::Host::by_name("bob");
+  auto* bob_disk   = bob->get_disks().front();
+  auto* alice      = sg4::Host::by_name("alice");
+  auto* alice_disk = alice->get_disks().front();
+  double clock = sg4::Engine::get_clock();
+
+  XBT_INFO("[Bob -> Alice] Store and Forward (1 block)");
+  bob_disk->read(size);
+  XBT_INFO("    Read  : %.6f seconds", sg4::Engine::get_clock() - clock);
+  clock = sg4::Engine::get_clock();
+  sg4::Comm::sendto(bob, alice, size);
+  XBT_INFO("    Send  : %.6f seconds", sg4::Engine::get_clock() - clock);
+  clock = sg4::Engine::get_clock();
+  alice_disk->write(size);
+  XBT_INFO("    Write : %.6f seconds", sg4::Engine::get_clock() - clock);
+  XBT_INFO("    Total : %.6f seconds", sg4::Engine::get_clock());
+
+  XBT_INFO("[Bob -> Alice] Store and Forward (100 blocks)");
+  sg4::IoPtr read = bob_disk->read_async(size/100.0);
+  sg4::CommPtr transfer = sg4::Comm::sendto_async(bob, alice, size/100.0);
+  sg4::IoPtr write = alice_disk->write_async(size/100.);
+
+  clock = sg4::Engine::get_clock();
+
+  for (int i = 0; i < 99; i++){
+    read->wait();
+    read = bob_disk->read_async(size/100.0);
+    transfer->wait();
+    transfer = sg4::Comm::sendto_async(bob, alice, size/100.0);
+    write->wait();
+    write = alice_disk->write_async(size/100.);
+  }
+
+  read->wait();
+  transfer->wait();
+  write->wait();
+  XBT_INFO("    Total : %.6f seconds", sg4::Engine::get_clock() - clock);
+
+  XBT_INFO("[Bob -> Alice] Streaming (Read bottleneck)");
+  clock = sg4::Engine::get_clock();
+  sg4::Io::streamto(bob, bob_disk, alice, alice_disk, size);
+  XBT_INFO("    Total : %.6f seconds", sg4::Engine::get_clock() - clock);
+
+  XBT_INFO("[Bob -> Alice] Streaming (Write bottleneck)");
+  clock = sg4::Engine::get_clock();
+  sg4::Io::streamto(alice, alice_disk, bob, bob_disk, size);
+  XBT_INFO("    Total : %.6f seconds", sg4::Engine::get_clock() - clock);
+
+  XBT_INFO("Start two 10-second background traffic between Bob and Alice");
+  sg4::CommPtr bt1 = sg4::Comm::sendto_async(bob, alice, 2e7);
+  sg4::CommPtr bt2 = sg4::Comm::sendto_async(bob, alice, 2e7);
+  XBT_INFO("[Bob -> Alice] Streaming (Transfer bottleneck)");
+  clock = sg4::Engine::get_clock();
+  sg4::Io::streamto(bob, bob_disk, alice, alice_disk, size);
+  XBT_INFO("    Total : %.6f seconds", sg4::Engine::get_clock() - clock);
+  bt1->wait();
+  bt2->wait();
+
+  XBT_INFO("[Bob -> Alice] Streaming \"from disk to memory\" (no write)");
+  clock = sg4::Engine::get_clock();
+  sg4::Io::streamto(bob, bob_disk, alice, nullptr, size);
+  XBT_INFO("    Total : %.6f seconds", sg4::Engine::get_clock() - clock);
+
+  XBT_INFO("[Bob -> Alice] Streaming \"from memory to disk\" (no read)");
+  clock = sg4::Engine::get_clock();
+  sg4::Io::streamto(bob, nullptr, alice, alice_disk, size);
+  XBT_INFO("    Total : %.6f seconds", sg4::Engine::get_clock() - clock);
+
+  XBT_INFO("[Bob -> Bob] Disk to disk (no transfer)");
+  clock = sg4::Engine::get_clock();
+  sg4::Io::streamto(bob, bob_disk, bob, bob_disk, size);
+  XBT_INFO("    Total : %.6f seconds", sg4::Engine::get_clock() - clock);
+
+}
+
+int main(int argc, char** argv)
+{
+  sg4::Engine e(&argc, argv);
+
+  /* simple platform containing 2 hosts and 2 disks */
+  auto* zone = sg4::create_full_zone("");
+  auto* bob  = zone->create_host("bob", 1e6);
+  auto* alice  = zone->create_host("alice", 1e6);
+
+  sg4::LinkInRoute link(zone->create_link("link", "2MBps")->set_latency("50us")->seal());
+  zone->add_route(bob->get_netpoint(), alice->get_netpoint(), nullptr, nullptr, {link}, true);
+
+  bob->create_disk("bob_disk", 1e6, 5e5);
+  alice->create_disk("alice_disk", 4e6, 4e6);
+
+  zone->seal();
+
+  sg4::Actor::create("streamer", bob, streamer, 4e6);
+
+  e.run();
+
+  return 0;
+}
diff --git a/teshsuite/s4u/io-stream/io-stream.tesh b/teshsuite/s4u/io-stream/io-stream.tesh
new file mode 100644 (file)
index 0000000..f2dbda3
--- /dev/null
@@ -0,0 +1,25 @@
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/io-stream  --cfg=host/model:sio_S22 "--log=root.fmt:[%10.6r]%e[%i:%a@%h]%e%m%n"
+> [  0.000000] [0:maestro@] Configuration change: Set 'host/model' to 'sio_S22'
+> [  0.000000] [0:maestro@] Switching to the S22 model to handle streaming I/Os.
+> [  0.000000] [1:streamer@bob] [Bob -> Alice] Store and Forward (1 block)
+> [  4.000000] [1:streamer@bob]     Read  : 4.000000 seconds
+> [  6.000050] [1:streamer@bob]     Send  : 2.000050 seconds
+> [  7.000050] [1:streamer@bob]     Write : 1.000000 seconds
+> [  7.000050] [1:streamer@bob]     Total : 7.000050 seconds
+> [  7.000050] [1:streamer@bob] [Bob -> Alice] Store and Forward (100 blocks)
+> [ 11.000050] [1:streamer@bob]     Total : 4.000000 seconds
+> [ 11.000050] [1:streamer@bob] [Bob -> Alice] Streaming (Read bottleneck)
+> [ 15.000100] [1:streamer@bob]     Total : 4.000050 seconds
+> [ 15.000100] [1:streamer@bob] [Bob -> Alice] Streaming (Write bottleneck)
+> [ 23.000150] [1:streamer@bob]     Total : 8.000050 seconds
+> [ 23.000150] [1:streamer@bob] Start two 10-second background traffic between Bob and Alice
+> [ 23.000150] [1:streamer@bob] [Bob -> Alice] Streaming (Transfer bottleneck)
+> [ 29.000200] [1:streamer@bob]     Total : 6.000050 seconds
+> [ 45.000200] [1:streamer@bob] [Bob -> Alice] Streaming "from disk to memory" (no write)
+> [ 49.000250] [1:streamer@bob]     Total : 4.000050 seconds
+> [ 49.000250] [1:streamer@bob] [Bob -> Alice] Streaming "from memory to disk" (no read)
+> [ 51.000300] [1:streamer@bob]     Total : 2.000050 seconds
+> [ 51.000300] [1:streamer@bob] [Bob -> Bob] Disk to disk (no transfer)
+> [ 59.000300] [1:streamer@bob]     Total : 8.000000 seconds
\ No newline at end of file
index e3970ee..74c3cdf 100644 (file)
@@ -43,6 +43,7 @@ set(EXTRA_DIST
   src/surf/network_ns3.hpp
   src/surf/ns3/ns3_simulator.hpp
   src/surf/ptask_L07.hpp
+  src/surf/sio_S22.hpp
   src/surf/surf_interface.hpp
   src/surf/xml/simgrid.dtd
   src/surf/xml/simgrid_dtd.c
@@ -360,6 +361,7 @@ set(SURF_SRC
   src/surf/network_cm02.cpp
   src/surf/network_constant.cpp
   src/surf/ptask_L07.cpp
+  src/surf/sio_S22.cpp
   src/surf/sg_platf.cpp
   src/surf/surf_interface.cpp
   src/surf/xml/platf.hpp