Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
introduce thread_execute
authorSUTER Frederic <frederic.suter@cc.in2p3.fr>
Tue, 8 Mar 2022 01:31:29 +0000 (02:31 +0100)
committerSUTER Frederic <frederic.suter@cc.in2p3.fr>
Tue, 8 Mar 2022 01:31:29 +0000 (02:31 +0100)
17 files changed:
ChangeLog
MANIFEST.in
examples/cpp/CMakeLists.txt
examples/cpp/exec-thread/s4u-exec-thread.cpp [new file with mode: 0644]
examples/cpp/exec-thread/s4u-exec-thread.tesh [new file with mode: 0644]
include/simgrid/s4u/Actor.hpp
include/simgrid/s4u/Exec.hpp
src/kernel/activity/ExecImpl.cpp
src/kernel/activity/ExecImpl.hpp
src/kernel/resource/VirtualMachineImpl.cpp
src/kernel/resource/VirtualMachineImpl.hpp
src/s4u/s4u_Actor.cpp
src/s4u/s4u_Exec.cpp
src/surf/HostImpl.hpp
src/surf/host_clm03.cpp
src/surf/host_clm03.hpp
src/surf/ptask_L07.hpp

index baaf8e4..8656b62 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -21,6 +21,10 @@ SMPI:
 
 S4U:
  - New signal: Engine::on_simulation_start_cb()
+ - Introduce a new execution mode with this_actor::thread_execute(). This simulates
+   the execution of a certain amount of flops by multiple threads run by a host. Each
+   thread executes the same number of flops, given as argument. An example of this new
+   function can be found in examples/cpp/exec-thread.
  - Reimplementation of barriers natively. 
    Previously, they were implemented on top of s4u::Mutex and s4u::ConditionVariable. 
    The new version should be faster (and can be used in the model-checker).
index 7428bde..cec3181 100644 (file)
@@ -275,6 +275,8 @@ include examples/cpp/exec-ptask/s4u-exec-ptask.cpp
 include examples/cpp/exec-ptask/s4u-exec-ptask.tesh
 include examples/cpp/exec-remote/s4u-exec-remote.cpp
 include examples/cpp/exec-remote/s4u-exec-remote.tesh
+include examples/cpp/exec-thread/s4u-exec-thread.cpp
+include examples/cpp/exec-thread/s4u-exec-thread.tesh
 include examples/cpp/exec-unassigned/s4u-exec-unassigned.cpp
 include examples/cpp/exec-unassigned/s4u-exec-unassigned.tesh
 include examples/cpp/exec-waitany/s4u-exec-waitany.cpp
@@ -525,6 +527,8 @@ include examples/python/comm-waitall/comm-waitall.py
 include examples/python/comm-waitall/comm-waitall.tesh
 include examples/python/comm-waitany/comm-waitany.py
 include examples/python/comm-waitany/comm-waitany.tesh
+include examples/python/comm-waitfor/comm-waitfor.py
+include examples/python/comm-waitfor/comm-waitfor.tesh
 include examples/python/exec-async/exec-async.py
 include examples/python/exec-async/exec-async.tesh
 include examples/python/exec-basic/exec-basic.py
@@ -851,6 +855,7 @@ include teshsuite/s4u/wait-any-for/wait-any-for.tesh
 include teshsuite/smpi/MBI/CollMatchingGenerator.py
 include teshsuite/smpi/MBI/MBI.py
 include teshsuite/smpi/MBI/MBIutils.py
+include teshsuite/smpi/MBI/ResleakGenerator.py
 include teshsuite/smpi/MBI/generator_utils.py
 include teshsuite/smpi/MBI/simgrid.py
 include teshsuite/smpi/auto-shared/auto-shared.c
index 89d9024..8af7978 100644 (file)
@@ -119,7 +119,7 @@ foreach (example activity-testany activity-waitany
                  energy-exec energy-boot energy-link energy-vm energy-exec-ptask energy-wifi
                  engine-filtering engine-run-partial
                  exec-async exec-basic exec-dvfs exec-remote exec-waitany exec-waitfor exec-dependent exec-unassigned
-                 exec-ptask-multicore exec-ptask-multicore-latency exec-cpu-nonlinear exec-cpu-factors exec-failure
+                 exec-ptask-multicore exec-ptask-multicore-latency exec-cpu-nonlinear exec-cpu-factors exec-failure exec-thread
                  maestro-set
                  mc-bugged1 mc-bugged1-liveness mc-bugged2 mc-bugged2-liveness mc-centralized-mutex mc-electric-fence mc-failing-assert
                  network-ns3 network-ns3-wifi network-wifi
diff --git a/examples/cpp/exec-thread/s4u-exec-thread.cpp b/examples/cpp/exec-thread/s4u-exec-thread.cpp
new file mode 100644 (file)
index 0000000..8d4675f
--- /dev/null
@@ -0,0 +1,57 @@
+/* Copyright (c) 2017-2022. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include <simgrid/s4u.hpp>
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_exec_thread, "Messages specific for this s4u example");
+
+namespace sg4 = simgrid::s4u;
+
+static void runner()
+{
+  auto e                    = sg4::Engine::get_instance();
+  sg4::Host* multicore_host = e->host_by_name("MyHost1");
+  // First test with less than, same number as, and more threads than cores
+  double start_time = sg4::Engine::get_clock();
+  sg4::this_actor::thread_execute(multicore_host, 1e9, 2);
+  XBT_INFO("Computed 2-thread activity on a 4-core host. Took %g s", e->get_clock() - start_time);
+
+  start_time = sg4::Engine::get_clock();
+  sg4::this_actor::thread_execute(multicore_host, 1e9, 4);
+  XBT_INFO("Computed 4-thread activity on a 4-core host. Took %g s", e->get_clock() - start_time);
+
+  start_time = sg4::Engine::get_clock();
+  sg4::this_actor::thread_execute(multicore_host, 1e9, 6);
+  XBT_INFO("Computed 6-thread activity on a 4-core host. Took %g s", e->get_clock() - start_time);
+
+  auto MyHost1                          = e->host_by_name("MyHost1");
+  simgrid::s4u::ExecPtr background_task = MyHost1->exec_async(2.5e9);
+  XBT_INFO("Start a 1-core background task on the 4-core host.");
+
+  start_time = sg4::Engine::get_clock();
+  sg4::this_actor::thread_execute(multicore_host, 1e9, 2);
+  XBT_INFO("Computed 2-thread activity on a 4-core host. Took %g s", e->get_clock() - start_time);
+
+  start_time = sg4::Engine::get_clock();
+  sg4::this_actor::thread_execute(multicore_host, 1e9, 4);
+  XBT_INFO("Computed 4-thread activity on a 4-core host. Took %g s", e->get_clock() - start_time);
+
+  background_task->wait();
+  XBT_INFO("The background task has ended.");
+}
+
+/* Expects the platform file as only argument; spawns the "test" actor on MyHost1 and runs the simulation. */
+int main(int argc, char* argv[])
+{
+  sg4::Engine engine(&argc, argv);
+  xbt_assert(argc == 2, "Usage: %s <platform file>", argv[0]);
+  const char* platform_file = argv[1];
+  engine.load_platform(platform_file);
+  sg4::Actor::create("test", engine.host_by_name("MyHost1"), runner);
+
+  engine.run();
+  XBT_INFO("Simulation done.");
+  return 0;
+}
diff --git a/examples/cpp/exec-thread/s4u-exec-thread.tesh b/examples/cpp/exec-thread/s4u-exec-thread.tesh
new file mode 100644 (file)
index 0000000..fb2eba3
--- /dev/null
@@ -0,0 +1,11 @@
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/s4u-exec-thread ${platfdir}/energy_platform.xml "--log=root.fmt:[%10.6r]%e%m%n"
+> [ 10.000000] Computed 2-thread activity on a 4-core host. Took 10 s
+> [ 20.000000] Computed 4-thread activity on a 4-core host. Took 10 s
+> [ 35.000000] Computed 6-thread activity on a 4-core host. Took 15 s
+> [ 35.000000] Start a 1-core background task on the 4-core host.
+> [ 45.000000] Computed 2-thread activity on a 4-core host. Took 10 s
+> [ 57.500000] Computed 4-thread activity on a 4-core host. Took 12.5 s
+> [ 62.500000] The background task has ended.
+> [ 62.500000] Simulation done.
\ No newline at end of file
index 4ed1da9..7b8b429 100644 (file)
@@ -111,6 +111,9 @@ XBT_PUBLIC void execute(double flop, double priority);
 XBT_PUBLIC void parallel_execute(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
                                  const std::vector<double>& bytes_amounts);
 
+/** Block the current actor until a multi-thread execution completes on @p host: @p thread_count threads each
+ *  compute @p flops_amount flops. */
+XBT_PUBLIC void thread_execute(s4u::Host* host, double flops_amount, int thread_count);
+
 /** Initialize a sequential execution that must then be started manually */
 XBT_PUBLIC ExecPtr exec_init(double flops_amounts);
 /** Initialize a parallel execution that must then be started manually */
index 754b4bd..3458593 100644 (file)
@@ -81,6 +81,8 @@ public:
   ExecPtr set_flops_amounts(const std::vector<double>& flops_amounts);
   ExecPtr set_bytes_amounts(const std::vector<double>& bytes_amounts);
 
+  ExecPtr set_thread_count(int thread_count);
+
   ExecPtr set_bound(double bound);
   ExecPtr set_priority(double priority);
   ExecPtr update_priority(double priority);
index 2eaaa8f..d7e0986 100644 (file)
@@ -70,14 +70,25 @@ ExecImpl& ExecImpl::set_bytes_amounts(const std::vector<double>& bytes_amounts)
 
   return *this;
 }
+
+/** Set the number of threads of this execution. When it differs from 1 and the execution runs on a single host,
+ *  start() delegates the creation of the action to the host model's execute_thread(). */
+ExecImpl& ExecImpl::set_thread_count(int thread_count)
+{
+  this->thread_count_ = thread_count;
+  return *this;
+}
 
 ExecImpl* ExecImpl::start()
 {
   set_state(State::RUNNING);
   if (not MC_is_active() && not MC_record_replay_is_active()) {
     if (hosts_.size() == 1) {
-      surf_action_ = hosts_.front()->get_cpu()->execution_start(flops_amounts_.front(), bound_);
-      surf_action_->set_sharing_penalty(sharing_penalty_);
+      if (thread_count_ == 1) {
+        surf_action_ = hosts_.front()->get_cpu()->execution_start(flops_amounts_.front(), bound_);
+        surf_action_->set_sharing_penalty(sharing_penalty_);
+      } else {
+        auto host_model = hosts_.front()->get_netpoint()->get_englobing_zone()->get_host_model();
+        surf_action_    = host_model->execute_thread(hosts_.front(), flops_amounts_.front(), thread_count_);
+      }
       surf_action_->set_category(get_tracing_category());
     } else {
       // get the model from first host since we have only 1 by now
index cbe373f..f69b6ee 100644 (file)
@@ -23,6 +23,7 @@ class XBT_PUBLIC ExecImpl : public ActivityImpl_T<ExecImpl> {
   std::vector<s4u::Host*> hosts_;
   std::vector<double> flops_amounts_;
   std::vector<double> bytes_amounts_;
+  int thread_count_ = 1;
   int cb_id_ = -1; // callback id from Host::on_state_change.connect()
 
 public:
@@ -42,6 +43,7 @@ public:
 
   ExecImpl& set_flops_amounts(const std::vector<double>& flops_amounts);
   ExecImpl& set_bytes_amounts(const std::vector<double>& bytes_amounts);
+  ExecImpl& set_thread_count(int thread_count);
   ExecImpl& set_hosts(const std::vector<s4u::Host*>& hosts);
 
   unsigned int get_host_number() const { return hosts_.size(); }
index fdb27a1..c1ae6a1 100644 (file)
@@ -14,6 +14,8 @@
 #include "src/surf/cpu_cas01.hpp"
 #include "src/surf/cpu_ti.hpp"
 
+#include <algorithm>
+#include <numeric>
+
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(res_vm, ker_resource, "Virtual Machines, containing actors and mobile across hosts");
 
 void surf_vm_model_init_HL13(simgrid::kernel::resource::CpuModel* cpu_pm_model)
@@ -169,6 +171,14 @@ double VMModel::next_occurring_event(double now)
   return -1.0;
 }
 
+Action* VMModel::execute_thread(const s4u::Host* host, double flops_amount, int thread_count)
+{
+  auto cpu = host->get_cpu();
+  return cpu->execution_start(thread_count * flops_amount, thread_count,
+                              cpu->get_speed(1.0) / cpu->get_speed_ratio() *
+                                  std::min(thread_count, cpu->get_core_count()));
+}
+
 /************
  * Resource *
  ************/
index fc8672a..30355aa 100644 (file)
@@ -90,6 +90,7 @@ public:
 
   double next_occurring_event(double now) override;
   void update_actions_state(double /*now*/, double /*delta*/) override{};
+  Action* execute_thread(const s4u::Host* host, double flops_amount, int thread_count) override;
   Action* execute_parallel(const std::vector<s4u::Host*>& host_list, const double* flops_amount,
                            const double* bytes_amount, double rate) override
   {
index 605a4c6..ce00aaf 100644 (file)
@@ -358,6 +358,10 @@ void parallel_execute(const std::vector<s4u::Host*>& hosts, const std::vector<do
   exec_init(hosts, flops_amounts, bytes_amounts)->wait();
 }
 
+/* Build a multi-thread execution of thread_count threads, each computing flops_amount flops on host, and wait
+ * for its completion. */
+void thread_execute(s4u::Host* host, double flops_amount, int thread_count)
+{
+  ExecPtr activity = Exec::init();
+  activity->set_flops_amount(flops_amount);
+  activity->set_host(host);
+  activity->set_thread_count(thread_count);
+  activity->wait();
+}
+
 ExecPtr exec_init(double flops_amount)
 {
   return Exec::init()->set_flops_amount(flops_amount)->set_host(get_host());
index 8f5ec42..58b5abf 100644 (file)
@@ -137,6 +137,16 @@ ExecPtr Exec::set_bytes_amounts(const std::vector<double>& bytes_amounts)
   return this;
 }
 
+/** @brief Set the number of threads over which this execution is spread (the kernel default is 1).
+ *  Each thread computes the flops amount of the exec. Must be set before the exec starts. */
+ExecPtr Exec::set_thread_count(int thread_count)
+{
+  xbt_assert(state_ == State::INITED || state_ == State::STARTING,
+             "Cannot change the thread_count of an exec after its start");
+  // A null or negative count would yield a null or negative flops amount and core request in execute_thread()
+  xbt_assert(thread_count > 0, "The thread_count of an exec must be positive (got %d)", thread_count);
+  kernel::actor::simcall_answered([this, thread_count] {
+    boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_)->set_thread_count(thread_count);
+  });
+  return this;
+}
+
 /** @brief Retrieve the host on which this activity takes place.
  *  If it runs on more than one host, only the first host is returned.
  */
index 8d8179a..36f71ba 100644 (file)
@@ -28,6 +28,8 @@ namespace resource {
 class XBT_PRIVATE HostModel : public Model {
 public:
   using Model::Model;
+  /** Create the action of a multi-thread execution on @p host: @p thread_count threads, each computing
+   *  @p flops_amount flops. Used by ExecImpl::start() for single-host executions whose thread count is not 1. */
+  virtual Action* execute_thread(const s4u::Host* host, double flops_amount, int thread_count) = 0;
+
+  /** Create the action of an execution spanning the hosts of @p host_list */
   virtual Action* execute_parallel(const std::vector<s4u::Host*>& host_list, const double* flops_amount,
                                    const double* bytes_amount, double rate) = 0;
 };
index 185b5d2..55b8cf4 100644 (file)
@@ -94,6 +94,17 @@ Action* HostCLM03Model::execute_parallel(const std::vector<s4u::Host*>& host_lis
   return action;
 }
 
+Action* HostCLM03Model::execute_thread(const s4u::Host* host, double flops_amount, int thread_count)
+{
+  auto cpu = host->get_cpu();
+  /* Create a single action whose cost is thread_count * flops_amount,, that requests thread_count cores and
+   * is bounded to the currently available speed (i.e., under current load) time the minimum  between the
+   * number of thread and the number of cores of the host */
+  return cpu->execution_start(thread_count * flops_amount, thread_count,
+                              cpu->get_speed(1.0) / cpu->get_speed_ratio() *
+                                  std::min(thread_count, cpu->get_core_count()));
+}
+
 } // namespace resource
 } // namespace kernel
 } // namespace simgrid
index 43aa5e9..7e1a81f 100644 (file)
@@ -21,6 +21,7 @@ public:
   using HostModel::HostModel;
   double next_occurring_event(double now) override;
   void update_actions_state(double now, double delta) override;
+  Action* execute_thread(const s4u::Host* host, double flops_amount, int thread_count) override;
   Action* execute_parallel(const std::vector<s4u::Host*>& host_list, const double* flops_amount,
                            const double* bytes_amount, double rate) override;
 };
index 439c8b4..fa57e85 100644 (file)
@@ -39,6 +39,7 @@ public:
 
   double next_occurring_event(double now) override;
   void update_actions_state(double now, double delta) override;
+  /* Multi-thread executions are not supported by this model. Die with an explicit message rather than returning
+   * nullptr, which ExecImpl::start() would dereference right away. */
+  Action* execute_thread(const s4u::Host* /*host*/, double /*flops_amount*/, int /*thread_count*/) override
+  {
+    xbt_die("Multi-thread executions (thread_execute) are not supported by the ptask_L07 host model");
+  }
   CpuAction* execute_parallel(const std::vector<s4u::Host*>& host_list, const double* flops_amount,
                               const double* bytes_amount, double rate) override;
 };