Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'example-battery-chiller-solar' into 'master'
authorMartin Quinson <martin.quinson@ens-rennes.fr>
Mon, 30 Oct 2023 02:36:31 +0000 (02:36 +0000)
committerMartin Quinson <martin.quinson@ens-rennes.fr>
Mon, 30 Oct 2023 02:36:31 +0000 (02:36 +0000)
add battery-chiller-solar example.

See merge request simgrid/simgrid!174

83 files changed:
BuildSimGrid.sh
CMakeLists.txt
ChangeLog
MANIFEST.in
docs/source/Installing_SimGrid.rst
docs/source/app_s4u.rst
examples/cpp/CMakeLists.txt
examples/cpp/clusters-multicpu/s4u-clusters-multicpu.cpp
examples/cpp/task-dispatch/s4u-task-dispatch.cpp [new file with mode: 0644]
examples/cpp/task-dispatch/s4u-task-dispatch.tesh [new file with mode: 0644]
examples/cpp/task-parallelism/s4u-task-parallelism.tesh
examples/cpp/task-storm/s4u-task-storm.cpp
examples/cpp/task-storm/s4u-task-storm.tesh
examples/cpp/task-switch-host/s4u-task-switch-host.cpp
examples/python/task-io/task-io.py
examples/python/task-simple/task-simple.py
examples/python/task-simple/task-simple.tesh
examples/python/task-switch-host/task-switch-host.py
examples/python/task-variable-load/task-variable-load.py
examples/smpi/mc/sendsend.tesh
examples/sthread/CMakeLists.txt
examples/sthread/pthread-mc-mutex-recursive.tesh [new file with mode: 0644]
examples/sthread/pthread-mc-mutex-simple.tesh
examples/sthread/pthread-mc-mutex-simpledeadlock.tesh
examples/sthread/pthread-mc-producer-consumer.tesh
examples/sthread/pthread-mutex-recursive.c [new file with mode: 0644]
examples/sthread/pthread-mutex-recursive.tesh [new file with mode: 0644]
examples/sthread/pthread-mutex-simple.c
examples/sthread/pthread-mutex-simple.tesh
examples/sthread/pthread-mutex-simpledeadlock.c
examples/sthread/pthread-producer-consumer.tesh
examples/sthread/stdobject/stdobject.cpp
examples/sthread/stdobject/stdobject.tesh
examples/sthread/sthread-mutex-simple.tesh
include/simgrid/forward.h
include/simgrid/s4u/Mutex.hpp
include/simgrid/s4u/NetZone.hpp
include/simgrid/s4u/Task.hpp
src/bindings/python/simgrid_python.cpp
src/kernel/activity/MailboxImpl.cpp
src/kernel/activity/MailboxImpl.hpp
src/kernel/activity/MutexImpl.cpp
src/kernel/activity/MutexImpl.hpp
src/kernel/resource/DiskImpl.hpp
src/kernel/resource/HostImpl.cpp
src/kernel/resource/HostImpl.hpp
src/kernel/resource/VirtualMachineImpl.hpp
src/kernel/resource/models/host_clm03.cpp
src/kernel/resource/models/host_clm03.hpp
src/kernel/resource/models/network_ns3.cpp
src/kernel/resource/models/ptask_L07.cpp
src/kernel/resource/models/ptask_L07.hpp
src/kernel/routing/ClusterZone.cpp
src/kernel/routing/NetZone_test.hpp
src/kernel/xml/sg_platf.cpp
src/mc/transition/TransitionSynchro.cpp
src/s4u/s4u_Mutex.cpp
src/s4u/s4u_Netzone.cpp
src/s4u/s4u_Task.cpp
src/simgrid/sg_config.hpp
src/smpi/bindings/smpi_pmpi_request.cpp
src/smpi/include/smpi_actor.hpp
src/smpi/internals/smpi_actor.cpp
src/sthread/ObjectAccess.cpp
src/sthread/sthread.c
src/sthread/sthread.h
src/sthread/sthread_impl.cpp
src/xbt/xbt_os_time.c
teshsuite/smpi/MBI/CMakeLists.txt
teshsuite/smpi/MBI/InputHazardGenerator.py
teshsuite/smpi/MBI/MBIutils.py
teshsuite/smpi/MBI/P2PBufferingGenerator.py
teshsuite/smpi/MBI/P2PSendrecvArgGenerator.py
teshsuite/smpi/MBI/generator_utils.py
tools/cmake/Distrib.cmake
tools/cmake/MakeLib.cmake
tools/jenkins/build.sh
tools/jenkins/ci-bigdft.sh
tools/jenkins/ci-starpu.sh
tools/jenkins/project_description.sh
tools/tesh/catch-signal.tesh
tools/tesh/catch-timeout-output.tesh
tools/tesh/tesh.py

index 66b309e..410d1dd 100755 (executable)
@@ -17,6 +17,7 @@ fi
 
 target=examples
 ncores=$(grep -c processor /proc/cpuinfo)
+halfcores=$(expr $ncores / 2 + 1)
 
 install_path=$(sed -n 's/^CMAKE_INSTALL_PREFIX:PATH=//p' CMakeCache.txt)
 if [ -e ${install_path} ] && [ -d ${install_path} ] && [ -x ${install_path} ] && [ -w ${install_path} ] ; then
@@ -32,7 +33,8 @@ fi
 (
   echo "install_path: ${install_path}"
   echo "Target: ${target}"
-  echo "Cores: ${ncores}"
-  (nice ${builder} -j${ncores} ${target} tests || make ${target} tests) && nice ctest -j${ncores} --output-on-failure ; date
+  echo "Cores to build: ${ncores}"
+  echo "Cores to test: ${halfcores}"
+  (nice ${builder} -j${ncores} ${target} tests || ${builder} ${target} tests) && nice ctest -j${halfcores} --output-on-failure ; date
 ) 2>&1 | tee BuildSimGrid.sh.log
 
index 1a4a245..ec4f39b 100644 (file)
@@ -217,21 +217,25 @@ if(enable_ns3)
 endif()
 
 ### Check for Eigen library
-set(SIMGRID_HAVE_EIGEN3 OFF)
-find_package (Eigen3 3.3 CONFIG
-              HINTS ${EIGEN3_HINT})
-if (Eigen3_FOUND)
-  set(SIMGRID_HAVE_EIGEN3 ON) 
-  message(STATUS "Found Eigen3: ${EIGEN3_INCLUDE_DIR}")
-  include_directories(${EIGEN3_INCLUDE_DIR})
-  if ("3.3.4" VERSION_EQUAL EIGEN3_VERSION_STRING AND CMAKE_COMPILER_IS_GNUCC)
-    message(STATUS "Avoid build error of Eigen3 v3.3.4 using -Wno-error=int-in-bool-context")
-    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=int-in-bool-context")
+# Eigen3 is optional. It is searched for unless the user passed exactly EIGEN3_HINT=OFF.
+# The test must be an equality test: a lexicographic STRLESS_EQUAL would also match every
+# hint that sorts before "OFF" (e.g. any absolute path starting with '/'), and silently
+# disable Eigen3 for users who only wanted to point cmake at their installation.
+set(SIMGRID_HAVE_EIGEN3 OFF) # defined in both branches, detection may turn it ON below
+if ((NOT DEFINED EIGEN3_HINT) OR (NOT "${EIGEN3_HINT}" STREQUAL "OFF"))
+  find_package (Eigen3 3.3 CONFIG
+                HINTS ${EIGEN3_HINT})
+  if (Eigen3_FOUND)
+    set(SIMGRID_HAVE_EIGEN3 ON)
+    message(STATUS "Found Eigen3: ${EIGEN3_INCLUDE_DIR}")
+    include_directories(${EIGEN3_INCLUDE_DIR})
+    if ("3.3.4" VERSION_EQUAL EIGEN3_VERSION_STRING AND CMAKE_COMPILER_IS_GNUCC)
+      message(STATUS "Avoid build error of Eigen3 v3.3.4 using -Wno-error=int-in-bool-context")
+      set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=int-in-bool-context")
+    endif()
+  else()
+    message(STATUS "Disabling model BMF because Eigen3 was not found. If it's installed, use EIGEN3_HINT to hint cmake about the location of Eigen3Config.cmake")
   endif()
+  mark_as_advanced(Eigen3_DIR)
 else()
-  message(STATUS "Disabling model BMF because Eigen3 was not found. If it's installed, use EIGEN3_HINT to hint cmake about the location of Eigen3Config.cmake")
+  message(STATUS "Disabling Eigen3 as requested by the user (EIGEN3_HINT is set to 'OFF')")
 endif()
-mark_as_advanced(Eigen3_DIR)
 
 # Check for our JSON dependency
 set(SIMGRID_HAVE_JSON 0)
@@ -964,4 +968,3 @@ execute_process(COMMAND ${CMAKE_COMMAND} -E make_directory ${PROJECT_BINARY_DIR}
 file(WRITE ${PROJECT_BINARY_DIR}/Testing/Notes/Build  "GIT version : ${GIT_VERSION}\n")
 file(APPEND ${PROJECT_BINARY_DIR}/Testing/Notes/Build "Release     : simgrid-${release_version}\n")
 
-INCLUDE(Dart)
index 893664d..605242b 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -8,11 +8,16 @@ S4U:
    on symmetric routes.
  - Introduce a Mailbox::get_async() with no payload parameter. You can use the new 
    Comm::get_payload() once the communication is over to retrieve the payload.
+ - Implement recursive mutexes. Simply pass true to the constructor to get one.
 
 SMPI:
  - New SMPI_app_instance_join(): wait for the completion of a started MPI instance
  - MPI_UNIVERSE_SIZE now initialized to the total amount of hosts in the platform
 
+sthread:
+ - Implement recursive pthread mutexes.
+ - Many bug fixes.
+
 Python:
  - Make the host_load plugin available from Python. See examples/python/plugin-host-load
  - Mailbox::get_async() does not return a pair anymore. Use comm.get_payload() instead.
index 422f0e4..2c3927c 100644 (file)
@@ -352,8 +352,6 @@ include examples/cpp/network-ns3/s4u-network-ns3-timed.tesh
 include examples/cpp/network-ns3/s4u-network-ns3.cpp
 include examples/cpp/network-wifi/s4u-network-wifi.cpp
 include examples/cpp/network-wifi/s4u-network-wifi.tesh
-include examples/cpp/solar-panel-simple/s4u-solar-panel-simple.cpp
-include examples/cpp/solar-panel-simple/s4u-solar-panel-simple.tesh
 include examples/cpp/platform-comm-serialize/s4u-platform-comm-serialize.cpp
 include examples/cpp/platform-comm-serialize/s4u-platform-comm-serialize.tesh
 include examples/cpp/platform-failures/s4u-platform-failures.cpp
@@ -384,6 +382,8 @@ include examples/cpp/replay-io/s4u-replay-io.txt
 include examples/cpp/replay-io/s4u-replay-io_d.xml
 include examples/cpp/routing-get-clusters/s4u-routing-get-clusters.cpp
 include examples/cpp/routing-get-clusters/s4u-routing-get-clusters.tesh
+include examples/cpp/solar-panel-simple/s4u-solar-panel-simple.cpp
+include examples/cpp/solar-panel-simple/s4u-solar-panel-simple.tesh
 include examples/cpp/synchro-barrier/s4u-mc-synchro-barrier.tesh
 include examples/cpp/synchro-barrier/s4u-synchro-barrier.cpp
 include examples/cpp/synchro-barrier/s4u-synchro-barrier.tesh
@@ -398,6 +398,8 @@ include examples/cpp/synchro-mutex/s4u-synchro-mutex.tesh
 include examples/cpp/synchro-semaphore/s4u-mc-synchro-semaphore.tesh
 include examples/cpp/synchro-semaphore/s4u-synchro-semaphore.cpp
 include examples/cpp/synchro-semaphore/s4u-synchro-semaphore.tesh
+include examples/cpp/task-dispatch/s4u-task-dispatch.cpp
+include examples/cpp/task-dispatch/s4u-task-dispatch.tesh
 include examples/cpp/task-io/s4u-task-io.cpp
 include examples/cpp/task-io/s4u-task-io.tesh
 include examples/cpp/task-microservice/s4u-task-microservice.cpp
@@ -645,9 +647,12 @@ include examples/smpi/trace_call_location/trace_call_location.c
 include examples/smpi/trace_call_location/trace_call_location.tesh
 include examples/smpi/trace_simple/trace_simple.c
 include examples/smpi/trace_simple/trace_simple.tesh
+include examples/sthread/pthread-mc-mutex-recursive.tesh
 include examples/sthread/pthread-mc-mutex-simple.tesh
 include examples/sthread/pthread-mc-mutex-simpledeadlock.tesh
 include examples/sthread/pthread-mc-producer-consumer.tesh
+include examples/sthread/pthread-mutex-recursive.c
+include examples/sthread/pthread-mutex-recursive.tesh
 include examples/sthread/pthread-mutex-simple.c
 include examples/sthread/pthread-mutex-simple.tesh
 include examples/sthread/pthread-mutex-simpledeadlock.c
index 477b53e..7d26867 100644 (file)
@@ -108,7 +108,7 @@ Eigen3 (optional)
   - On Debian / Ubuntu: ``apt install libeigen3-dev``
   - On CentOS / Fedora: ``dnf install eigen3-devel``
   - On macOS with homebrew: ``brew install eigen``
-  - Use EIGEN3_HINT to specify where it's installed if cmake doesn't find it automatically.
+  - Use EIGEN3_HINT to specify where it's installed if cmake doesn't find it automatically. Set EIGEN3_HINT=OFF to disable detection even if it could be found.
 JSON (optional, for the DAG wfcommons loader)
   - On Debian / Ubuntu: ``apt install nlohmann-json3-dev``
   - Use nlohmann_json_HINT to specify where it's installed if cmake doesn't find it automatically.
@@ -269,6 +269,7 @@ NS3_HINT (empty by default)
 
 EIGEN3_HINT (empty by default)
   Alternative path into which Eigen3 should be searched for.
+  Providing the value OFF as a hint will disable the detection altogether.
 
 SIMGRID_PYTHON_LIBDIR (auto-detected)
   Where to install the Python module library. By default, it is set to the cmake Python3_SITEARCH variable if installing to /usr,
index 23e0e04..8695720 100644 (file)
@@ -203,32 +203,33 @@ Repeatable Activities
 
 In order to simulate the execution of Dataflow applications, we introduced the
 concept of |API_s4u_Tasks|, that can be seen as repeatable activities. A Dataflow
-is defined as a graph of |API_s4u_Tasks| through which circulate Tokens. Tokens
-can carry any user-defined data, using the same internal mechanisms as for the
-other simulated objects. Each Task has to receive a token from each of its
-predecessor to fire a new instance of a :ref:`Communication <API_s4u_Comm>`,
-:ref:`Execution <API_s4u_Exec>`, or :ref:`I/O <API_s4u_Io>` activity.
-On completion of this activity, the Task propagates tokens
-to its successors, and waits for the next set of tokens to arrive.
-Multiple instances of the same Task can run in parallel by adjusting its
-horizontal scaling with
-:cpp:func:`s4u::Task::set_parallelism_degree() <simgrid::s4u::Task::set_parallelism_degree>`.
+is defined as a graph of |API_s4u_Tasks|, where each |API_s4u_Tasks| has a set of
+successors and predecessors. When a |API_s4u_Tasks| ends, it sends a token to each
+of its successors. Each |API_s4u_Tasks| has to receive a token from each of its
+predecessors to start. Tokens can carry any user-defined data.
 
-:ref:`Communications <API_s4u_Comm>` (started on Mailboxes and consuming links),
-:ref:`Executions <API_s4u_Exec>` (started on Host and consuming CPU resources)
-:ref:`I/O <API_s4u_Io>` (started on and consuming disks).
+|API_s4u_Tasks| are composed of several instances: a dispatcher, a collector, and
+instance_0 to instance_n. The dispatcher relies on a load balancing function to select
+the next instance to fire. Once this instance finishes, it fires the collector.
+
+Each instance of an |API_s4u_ExecTask| can be placed on a different host.
+|API_s4u_Comm| activities are automatically created when an instance triggers
+another instance on a different host. Each instance has its own parallelism degree
+to scale horizontally on several cores.
 
 To initiate the execution of a Dataflow, it is possible to some make
 |API_s4u_Tasks| fire one or more activities without waiting for any token with the
 :cpp:func:`s4u::Task::enqueue_firings() <simgrid::s4u::Task::enqueue_firings>`
 function.
 
-The parameters and successors of a Task can be redefined at runtime by attaching
+The parameters of Tasks can be redefined at runtime by attaching
 callbacks to the
 :cpp:func:`s4u::Task::on_this_start <simgrid::s4u::Task::on_this_start>`
 and
 :cpp:func:`s4u::Task::on_this_completion <simgrid::s4u::Task::on_this_completion>`
-signals.
+signals. The former is triggered by instances other than the dispatcher and the collector,
+and the latter is triggered by the collector.
+
 
 
 .. _s4u_mailbox:
index a0fc73f..6f1af7f 100644 (file)
@@ -172,7 +172,7 @@ foreach (example activityset-testany activityset-waitany activityset-waitall act
                  mc-bugged1 mc-bugged1-liveness mc-bugged2 mc-bugged2-liveness mc-centralized-mutex mc-electric-fence mc-failing-assert
                  network-ns3 network-ns3-wifi network-wifi
                  io-async io-priority io-degradation io-file-system io-file-remote io-disk-raw io-dependent
-                 task-io task-microservice task-parallelism task-simple task-storm task-switch-host task-variable-load 
+                 task-dispatch task-io task-microservice task-parallelism task-simple task-storm task-switch-host task-variable-load
                  solar-panel-simple
                  platform-comm-serialize platform-failures platform-profile platform-properties
                  plugin-host-load plugin-jbod plugin-link-load plugin-prodcons
index 8ef6337..b2ca2e7 100644 (file)
@@ -87,8 +87,7 @@ public:
  * @param id Internal identifier in the torus (for information)
  * @return netpoint, gateway: the netpoint to the StarZone and CPU0 as gateway
  */
-static std::pair<simgrid::kernel::routing::NetPoint*, simgrid::kernel::routing::NetPoint*>
-create_hostzone(const sg4::NetZone* zone, const std::vector<unsigned long>& /*coord*/, unsigned long id)
+static sg4::NetZone* create_hostzone(const sg4::NetZone* zone, const std::vector<unsigned long>& /*coord*/, unsigned long id)
 {
   constexpr int num_cpus    = 8;     //!< Number of CPUs in the zone
   constexpr double speed    = 1e9;   //!< Speed of each CPU
@@ -101,14 +100,13 @@ create_hostzone(const sg4::NetZone* zone, const std::vector<unsigned long>& /*co
   /* setting my Torus parent zone */
   host_zone->set_parent(zone);
 
-  simgrid::kernel::routing::NetPoint* gateway = nullptr;
   /* create CPUs */
   for (int i = 0; i < num_cpus; i++) {
     std::string cpu_name  = hostname + "-cpu" + std::to_string(i);
     const sg4::Host* host = host_zone->create_host(cpu_name, speed);
     /* the first CPU is the gateway */
     if (i == 0)
-      gateway = host->get_netpoint();
+      host_zone->set_gateway(host->get_netpoint());
     /* create split-duplex link */
     auto* link = host_zone->create_split_duplex_link("link-" + cpu_name, link_bw)->set_latency(link_lat);
     /* connecting CPU to outer world */
@@ -116,7 +114,7 @@ create_hostzone(const sg4::NetZone* zone, const std::vector<unsigned long>& /*co
   }
   /* seal newly created netzone */
   host_zone->seal();
-  return std::make_pair(host_zone->get_netpoint(), gateway);
+  return host_zone;
 }
 
 /*************************************************************************************************/
diff --git a/examples/cpp/task-dispatch/s4u-task-dispatch.cpp b/examples/cpp/task-dispatch/s4u-task-dispatch.cpp
new file mode 100644 (file)
index 0000000..10bd0d2
--- /dev/null
@@ -0,0 +1,103 @@
+/* Copyright (c) 2017-2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/s4u.hpp"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(task_dispatch, "Messages specific for this s4u example");
+namespace sg4 = simgrid::s4u;
+
+/* Actor body driving the ExecTask `t` through a fixed scenario. Each step enqueues some firings,
+ * sleeps, changes one property of the task (flops, parallelism degree, host of one of its instances,
+ * number of instances), sleeps again, and in most steps restores the previous setting.
+ * The hosts "PM0" and "PM1" must exist in the loaded platform. */
+static void manager(sg4::ExecTaskPtr t)
+{
+  auto PM0 = sg4::Engine::get_instance()->host_by_name("PM0");
+  auto PM1 = sg4::Engine::get_instance()->host_by_name("PM1");
+
+  XBT_INFO("Test set_flops");
+  t->enqueue_firings(2);
+  sg4::this_actor::sleep_for(50);
+  XBT_INFO("Set instance_0 flops to 50.");
+  t->set_flops(50 * PM0->get_speed());
+  sg4::this_actor::sleep_for(250);
+  t->set_flops(100 * PM0->get_speed()); // back to 100 * PM0 speed for the next steps
+
+  XBT_INFO("Test set_parallelism degree");
+  t->enqueue_firings(3);
+  sg4::this_actor::sleep_for(50);
+  XBT_INFO("Set Task parallelism degree to 2.");
+  t->set_parallelism_degree(2);
+  sg4::this_actor::sleep_for(250);
+  t->set_parallelism_degree(1); // back to a degree of 1 for the next steps
+
+  XBT_INFO("Test set_host dispatcher");
+  t->enqueue_firings(2);
+  sg4::this_actor::sleep_for(50);
+  XBT_INFO("Move dispatcher to PM1");
+  t->set_host(PM1, "dispatcher");
+  t->set_internal_bytes(1e6, "dispatcher"); // NOTE(review): presumably the size of the transfer issued by this instance towards instances on another host -- confirm in Task.hpp
+  sg4::this_actor::sleep_for(250);
+  t->set_host(PM0, "dispatcher"); // dispatcher back on PM0
+
+  XBT_INFO("Test set_host instance_0");
+  t->enqueue_firings(2);
+  sg4::this_actor::sleep_for(50);
+  XBT_INFO("Move instance_0 to PM1");
+  t->set_host(PM1, "instance_0");
+  t->set_flops(100 * PM1->get_speed()); // amount re-expressed with the speed of the new host
+  t->set_internal_bytes(1e6, "instance_0");
+  sg4::this_actor::sleep_for(250);
+  t->set_host(PM0, "instance_0"); // instance_0 back on PM0 ...
+  t->set_flops(100 * PM0->get_speed()); // ... with the amount expressed with PM0's speed again
+
+  XBT_INFO("Test set_host collector");
+  t->enqueue_firings(2);
+  sg4::this_actor::sleep_for(50);
+  XBT_INFO("Move collector to PM1");
+  t->set_host(PM1, "collector");
+  sg4::this_actor::sleep_for(250);
+  t->set_host(PM0, "collector"); // collector back on PM0
+
+  XBT_INFO("Test add_instances");
+  t->enqueue_firings(1);
+  sg4::this_actor::sleep_for(50);
+  XBT_INFO("Add 1 instance and update load balancing function");
+  t->add_instances(1);
+  t->set_load_balancing_function([]() { // returns "instance_0" and "instance_1" alternately
+    static int round_robin_counter = 0; // static: the value survives from one call to the next
+    int ret                        = round_robin_counter;
+    round_robin_counter            = round_robin_counter == 1 ? 0 : round_robin_counter + 1;
+    return "instance_" + std::to_string(ret);
+  });
+  t->enqueue_firings(2);
+  sg4::this_actor::sleep_for(250);
+
+  XBT_INFO("Test remove_instances");
+  XBT_INFO("Remove 1 instance and update load balancing function");
+  t->remove_instances(1);
+  t->set_load_balancing_function([]() { return "instance_0"; }); // always select instance_0 from now on
+  t->enqueue_firings(2);
+  sg4::this_actor::sleep_for(300);
+}
+
+/* Entry point: loads the platform given as first command-line argument, creates tasks A, B and C,
+ * makes B a successor of A, registers logging callbacks on task start and completion, starts the
+ * manager actor on PM0 with task A, and runs the simulation. */
+int main(int argc, char* argv[])
+{
+  sg4::Engine e(&argc, argv);
+  e.load_platform(argv[1]); // argv[1] is used as the platform file, without checking argc
+  auto PM0 = e.host_by_name("PM0");
+  auto PM1 = sg4::Engine::get_instance()->host_by_name("PM1");
+
+  auto a = sg4::ExecTask::init("A", 100 * PM0->get_speed(), PM0);
+  auto b = sg4::ExecTask::init("B", 50 * PM0->get_speed(), PM0);
+  auto c = sg4::CommTask::init("C", 1e6, PM1, PM0); // NOTE(review): c is not referenced again in this function
+
+  a->add_successor(b);
+
+  sg4::Task::on_completion_cb( // log every task completion together with the value of get_count()
+      [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); });
+  sg4::Task::on_start_cb([](const sg4::Task* t) { XBT_INFO("Task %s start", t->get_name().c_str()); });
+
+  sg4::Actor::create("manager", PM0, manager, a); // the manager actor is created on PM0 and receives task A
+
+  e.run();
+  return 0;
+}
diff --git a/examples/cpp/task-dispatch/s4u-task-dispatch.tesh b/examples/cpp/task-dispatch/s4u-task-dispatch.tesh
new file mode 100644 (file)
index 0000000..a4ace54
--- /dev/null
@@ -0,0 +1,82 @@
+#!/usr/bin/env tesh
+# Run the task-dispatch example; the '>' lines below are its expected output.
+$ ${bindir:=.}/s4u-task-dispatch ${platfdir}/three_multicore_hosts.xml
+> [PM0:manager:(1) 0.000000] [task_dispatch/INFO] Test set_flops
+> [0.000000] [task_dispatch/INFO] Task A start
+> [0.000000] [task_dispatch/INFO] Task A start
+> [PM0:manager:(1) 50.000000] [task_dispatch/INFO] Set instance_0 flops to 50.
+> [100.000000] [task_dispatch/INFO] Task A finished (1)
+> [100.000000] [task_dispatch/INFO] Task B start
+> [150.000000] [task_dispatch/INFO] Task A finished (2)
+> [150.000000] [task_dispatch/INFO] Task B start
+> [150.000000] [task_dispatch/INFO] Task B finished (1)
+> [200.000000] [task_dispatch/INFO] Task B finished (2)
+> [PM0:manager:(1) 300.000000] [task_dispatch/INFO] Test set_parallelism degree
+> [300.000000] [task_dispatch/INFO] Task A start
+> [300.000000] [task_dispatch/INFO] Task A start
+> [300.000000] [task_dispatch/INFO] Task A start
+> [PM0:manager:(1) 350.000000] [task_dispatch/INFO] Set Task parallelism degree to 2.
+> [400.000000] [task_dispatch/INFO] Task A finished (3)
+> [400.000000] [task_dispatch/INFO] Task B start
+> [450.000000] [task_dispatch/INFO] Task A finished (4)
+> [450.000000] [task_dispatch/INFO] Task B start
+> [450.000000] [task_dispatch/INFO] Task B finished (3)
+> [500.000000] [task_dispatch/INFO] Task A finished (5)
+> [500.000000] [task_dispatch/INFO] Task B start
+> [500.000000] [task_dispatch/INFO] Task B finished (4)
+> [550.000000] [task_dispatch/INFO] Task B finished (5)
+> [PM0:manager:(1) 600.000000] [task_dispatch/INFO] Test set_host dispatcher
+> [600.000000] [task_dispatch/INFO] Task A start
+> [600.000000] [task_dispatch/INFO] Task A start
+> [PM0:manager:(1) 650.000000] [task_dispatch/INFO] Move dispatcher to PM1
+> [700.000000] [task_dispatch/INFO] Task A finished (6)
+> [700.000000] [task_dispatch/INFO] Task B start
+> [750.000000] [task_dispatch/INFO] Task B finished (6)
+> [800.009961] [task_dispatch/INFO] Task A finished (7)
+> [800.009961] [task_dispatch/INFO] Task B start
+> [850.009961] [task_dispatch/INFO] Task B finished (7)
+> [PM0:manager:(1) 900.000000] [task_dispatch/INFO] Test set_host instance_0
+> [900.000000] [task_dispatch/INFO] Task A start
+> [900.000000] [task_dispatch/INFO] Task A start
+> [PM0:manager:(1) 950.000000] [task_dispatch/INFO] Move instance_0 to PM1
+> [1000.000000] [task_dispatch/INFO] Task A finished (8)
+> [1000.000000] [task_dispatch/INFO] Task B start
+> [1050.000000] [task_dispatch/INFO] Task B finished (8)
+> [1100.019922] [task_dispatch/INFO] Task A finished (9)
+> [1100.019922] [task_dispatch/INFO] Task B start
+> [1150.019922] [task_dispatch/INFO] Task B finished (9)
+> [PM0:manager:(1) 1200.000000] [task_dispatch/INFO] Test set_host collector
+> [1200.000000] [task_dispatch/INFO] Task A start
+> [1200.000000] [task_dispatch/INFO] Task A start
+> [PM0:manager:(1) 1250.000000] [task_dispatch/INFO] Move collector to PM1
+> [1300.000000] [task_dispatch/INFO] Task A finished (10)
+> [1300.000000] [task_dispatch/INFO] Task B start
+> [1350.000000] [task_dispatch/INFO] Task B finished (10)
+> [1400.009961] [task_dispatch/INFO] Task A finished (11)
+> [1400.009961] [task_dispatch/INFO] Task B start
+> [1450.009961] [task_dispatch/INFO] Task B finished (11)
+> [PM0:manager:(1) 1500.000000] [task_dispatch/INFO] Test add_instances
+> [1500.000000] [task_dispatch/INFO] Task A start
+> [PM0:manager:(1) 1550.000000] [task_dispatch/INFO] Add 1 instance and update load balancing function
+> [1550.000000] [task_dispatch/INFO] Task A start
+> [1550.000000] [task_dispatch/INFO] Task A start
+> [1600.000000] [task_dispatch/INFO] Task A finished (12)
+> [1600.000000] [task_dispatch/INFO] Task B start
+> [1650.000000] [task_dispatch/INFO] Task A finished (13)
+> [1650.000000] [task_dispatch/INFO] Task B start
+> [1650.000000] [task_dispatch/INFO] Task B finished (12)
+> [1700.000000] [task_dispatch/INFO] Task A finished (14)
+> [1700.000000] [task_dispatch/INFO] Task B start
+> [1700.000000] [task_dispatch/INFO] Task B finished (13)
+> [1750.000000] [task_dispatch/INFO] Task B finished (14)
+> [PM0:manager:(1) 1800.000000] [task_dispatch/INFO] Test remove_instances
+> [PM0:manager:(1) 1800.000000] [task_dispatch/INFO] Remove 1 instance and update load balancing function
+> [1800.000000] [task_dispatch/INFO] Task A start
+> [1800.000000] [task_dispatch/INFO] Task A start
+> [1900.000000] [task_dispatch/INFO] Task A finished (15)
+> [1900.000000] [task_dispatch/INFO] Task B start
+> [1950.000000] [task_dispatch/INFO] Task B finished (15)
+> [2000.000000] [task_dispatch/INFO] Task A finished (16)
+> [2000.000000] [task_dispatch/INFO] Task B start
+> [2050.000000] [task_dispatch/INFO] Task B finished (16)
+> 
\ No newline at end of file
index 938ad96..492d015 100644 (file)
@@ -2,40 +2,40 @@
 
 $ ${bindir:=.}/s4u-task-parallelism ${platfdir}/three_multicore_hosts.xml
 > [0.000000] [task_parallelism/INFO] Task exec_A start
-> [100.000000] [task_parallelism/INFO] Task exec_A finished (1)
 > [100.000000] [task_parallelism/INFO] Task exec_A start
+> [100.000000] [task_parallelism/INFO] Task exec_A finished (1)
 > [200.000000] [task_parallelism/INFO] Task exec_A finished (2)
 > [300.000000] [task_parallelism/INFO] Task exec_A start
 > [300.000000] [task_parallelism/INFO] Task exec_A start
-> [400.000000] [task_parallelism/INFO] Task exec_A finished (3)
 > [400.000000] [task_parallelism/INFO] Task exec_A start
-> [400.000000] [task_parallelism/INFO] Task exec_A finished (4)
 > [400.000000] [task_parallelism/INFO] Task exec_A start
+> [400.000000] [task_parallelism/INFO] Task exec_A finished (3)
+> [400.000000] [task_parallelism/INFO] Task exec_A finished (4)
 > [500.000000] [task_parallelism/INFO] Task exec_A finished (5)
 > [500.000000] [task_parallelism/INFO] Task exec_A finished (6)
 > [600.000000] [task_parallelism/INFO] Task exec_A start
-> [700.000000] [task_parallelism/INFO] Task exec_A finished (7)
 > [700.000000] [task_parallelism/INFO] Task exec_A start
+> [700.000000] [task_parallelism/INFO] Task exec_A finished (7)
 > [800.000000] [task_parallelism/INFO] Task exec_A finished (8)
 > [900.000000] [task_parallelism/INFO] Task exec_A start
 > [900.000000] [task_parallelism/INFO] Task exec_A start
-> [1000.000000] [task_parallelism/INFO] Task exec_A finished (9)
 > [1000.000000] [task_parallelism/INFO] Task exec_A start
-> [1000.000000] [task_parallelism/INFO] Task exec_A finished (10)
 > [1000.000000] [task_parallelism/INFO] Task exec_A start
+> [1000.000000] [task_parallelism/INFO] Task exec_A finished (9)
+> [1000.000000] [task_parallelism/INFO] Task exec_A finished (10)
+> [1100.000000] [task_parallelism/INFO] Task exec_A start
 > [1100.000000] [task_parallelism/INFO] Task exec_A finished (11)
 > [1100.000000] [task_parallelism/INFO] Task exec_A finished (12)
-> [1100.000000] [task_parallelism/INFO] Task exec_A start
-> [1200.000000] [task_parallelism/INFO] Task exec_A finished (13)
 > [1200.000000] [task_parallelism/INFO] Task exec_A start
+> [1200.000000] [task_parallelism/INFO] Task exec_A finished (13)
 > [1250.000000] [task_parallelism/INFO] Task exec_A start
 > [1250.000000] [task_parallelism/INFO] Task exec_A start
-> [1300.000000] [task_parallelism/INFO] Task exec_A finished (14)
 > [1300.000000] [task_parallelism/INFO] Task exec_A start
-> [1350.000000] [task_parallelism/INFO] Task exec_A finished (15)
+> [1300.000000] [task_parallelism/INFO] Task exec_A finished (14)
 > [1350.000000] [task_parallelism/INFO] Task exec_A start
-> [1350.000000] [task_parallelism/INFO] Task exec_A finished (16)
 > [1350.000000] [task_parallelism/INFO] Task exec_A start
+> [1350.000000] [task_parallelism/INFO] Task exec_A finished (15)
+> [1350.000000] [task_parallelism/INFO] Task exec_A finished (16)
 > [1400.000000] [task_parallelism/INFO] Task exec_A finished (17)
 > [1450.000000] [task_parallelism/INFO] Task exec_A finished (18)
 > [1450.000000] [task_parallelism/INFO] Task exec_A finished (19)
\ No newline at end of file
index 58d08c3..0a0ab71 100644 (file)
@@ -74,10 +74,10 @@ int main(int argc, char* argv[])
      Alternatively we: remove/add the link between SA and SA_to_B2
                        add/remove the link between SA and SA_to_B1
   */
-  SA->on_this_start_cb([SA_to_B1, SA_to_B2](sg4::Task* t) {
+  SA->on_this_completion_cb([&SA_to_B1, &SA_to_B2](sg4::Task* t) {
     int count = t->get_count();
     sg4::CommTaskPtr comm;
-    if (count % 2 == 0) {
+    if (count % 2 == 1) {
       t->remove_successor(SA_to_B2);
       t->add_successor(SA_to_B1);
       comm = SA_to_B1;
@@ -86,7 +86,8 @@ int main(int argc, char* argv[])
       t->add_successor(SA_to_B2);
       comm = SA_to_B2;
     }
-    std::vector<double> amount = {1e3, 1e6, 1e9};
+    std::vector<double> amount = {1e9, 1e3, 1e6};
+    // XBT_INFO("Comm %f", amount[count % 3]);
     comm->set_amount(amount[count % 3]);
     auto token = std::make_shared<sg4::Token>();
     token->set_data(new double(amount[count % 3]));
@@ -94,18 +95,26 @@ int main(int argc, char* argv[])
   });
 
   // The token sent by SA is forwarded by both communication tasks
-  SA_to_B1->on_this_start_cb([&SA](sg4::Task* t) { t->set_token(t->get_next_token_from(SA)); });
-  SA_to_B2->on_this_start_cb([&SA](sg4::Task* t) { t->set_token(t->get_next_token_from(SA)); });
+  SA_to_B1->on_this_completion_cb([&SA](sg4::Task* t) {
+    t->set_token(t->get_token_from(SA));
+    t->deque_token_from(SA);
+  });
+  SA_to_B2->on_this_completion_cb([&SA](sg4::Task* t) {
+    t->set_token(t->get_token_from(SA));
+    t->deque_token_from(SA);
+  });
 
   /* B1 and B2 read the value of the token received by their predecessors
      and use it to adapt their amount of work to do.
   */
-  B1->on_this_start_cb([SA_to_B1](sg4::Task* t) {
-    auto data = t->get_next_token_from(SA_to_B1)->get_unique_data<double>();
+  B1->on_this_start_cb([&SA_to_B1](sg4::Task* t) {
+    auto data = t->get_token_from(SA_to_B1)->get_data<double>();
+    t->deque_token_from(SA_to_B1);
     t->set_amount(*data * 10);
   });
-  B2->on_this_start_cb([SA_to_B2](sg4::Task* t) {
-    auto data = t->get_next_token_from(SA_to_B2)->get_unique_data<double>();
+  B2->on_this_start_cb([&SA_to_B2](sg4::Task* t) {
+    auto data = t->get_token_from(SA_to_B2)->get_data<double>();
+    t->deque_token_from(SA_to_B2);
     t->set_amount(*data * 10);
   });
 
index d7c364a..376dc31 100644 (file)
@@ -24,11 +24,11 @@ $ ${bindir:=.}/s4u-task-storm ${platfdir}/small_platform.xml
 > [1.798442] [task_storm/INFO] Task SB_to_B3 finished (5)
 > [2.619232] [task_storm/INFO] Task B3 finished (1)
 > [6.743624] [task_storm/INFO] Task B3 finished (2)
-> [10.868015] [task_storm/INFO] Task B3 finished (3)
 > [10.868015] [task_storm/INFO] Task B4 finished (1)
+> [10.868015] [task_storm/INFO] Task B3 finished (3)
 > [14.992407] [task_storm/INFO] Task B3 finished (4)
-> [19.116799] [task_storm/INFO] Task B3 finished (5)
 > [19.116799] [task_storm/INFO] Task B4 finished (2)
+> [19.116799] [task_storm/INFO] Task B3 finished (5)
 > [23.241190] [task_storm/INFO] Task B4 finished (3)
 > [27.365582] [task_storm/INFO] Task B4 finished (4)
 > [31.489974] [task_storm/INFO] Task B4 finished (5)
index b007523..c236bdb 100644 (file)
@@ -54,12 +54,19 @@ int main(int argc, char* argv[])
   // successors to comm0
   comm0->on_this_start_cb([&comm0, exec1, exec2, jupiter, fafard](const sg4::Task*) {
     static int count = 0;
-    if (count % 2 == 0) {
+    if (count % 2 == 0)
       comm0->set_destination(jupiter);
+    else
+      comm0->set_destination(fafard);
+    count++;
+  });
+
+  comm0->on_this_completion_cb([&comm0, exec1, exec2](const sg4::Task*) {
+    static int count = 0;
+    if (count % 2 == 0) {
       comm0->add_successor(exec1);
       comm0->remove_successor(exec2);
     } else {
-      comm0->set_destination(fafard);
       comm0->add_successor(exec2);
       comm0->remove_successor(exec1);
     }
index e75215a..d3ab8c1 100644 (file)
@@ -18,7 +18,7 @@ def parse():
     return parser.parse_args()
 
 def callback(t):
-    print(f'[{Engine.clock}] {t} finished ({t.count})')
+    print(f'[{Engine.clock}] {t} finished ({t.get_count()})')
 
 if __name__ == '__main__':
     args = parse()
index 23e9fc0..beca2b6 100644 (file)
@@ -28,7 +28,7 @@ def parse():
     return parser.parse_args()
 
 def callback(t):
-    print(f'[{Engine.clock}] {t} finished ({t.count})')
+    print(f'[{Engine.clock}] {t} finished ({t.get_count()})')
 
 if __name__ == '__main__':
     args = parse()
index 5a27a53..f9a828f 100644 (file)
@@ -5,5 +5,5 @@ $ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${srcdir:=.}/task-simple.py --p
 > [11.714617112501687] CommTask(comm) finished (1)
 > [20.388399000968448] ExecTask(exec1) finished (2)
 > [21.90881661298591] CommTask(comm) finished (2)
-> [24.82146412938331] ExecTask(exec2) finished (1)
-> [37.92831114626493] ExecTask(exec2) finished (2)
+> [24.821464129383305] ExecTask(exec2) finished (1)
+> [37.928311146264925] ExecTask(exec2) finished (2)
index 5be8922..03dce6a 100644 (file)
@@ -44,12 +44,16 @@ def parse():
     return parser.parse_args()
 
 def callback(t):
-    print(f'[{Engine.clock}] {t} finished ({t.count})')
+    print(f'[{Engine.clock}] {t} finished ({t.get_count()})')
 
-def switch(t, hosts, execs):
-    comm0.destination = hosts[t.count % 2]
-    comm0.remove_successor(execs[t.count % 2 - 1])
-    comm0.add_successor(execs[t.count % 2])
+def switch_destination(t, hosts):
+    t.destination = hosts[switch_destination.count % 2]
+    switch_destination.count += 1
+switch_destination.count = 0
+
+def switch_successor(t, execs):
+    t.remove_successor(execs[t.get_count() % 2])
+    t.add_successor(execs[t.get_count() % 2 - 1])
 
 if __name__ == '__main__':
     args = parse()
@@ -74,13 +78,16 @@ if __name__ == '__main__':
     exec1.add_successor(comm1)
     exec2.add_successor(comm2)
 
-    # Add a function to be called when tasks end for log purpose
+    # Add a callback when tasks end for log purpose
     Task.on_completion_cb(callback)
 
-    # Add a function to be called before each firing of comm0
-    # This function modifies the graph of tasks by adding or removing
-    # successors to comm0
-    comm0.on_this_start_cb(lambda t: switch(t, [jupiter, fafard], [exec1,exec2]))
+    # Add a callback before each firing of comm0
+    # It switches the destination of comm0
+    comm0.on_this_start_cb(lambda t: switch_destination(t, [jupiter, fafard]))
+
+    # Add a callback before comm0 sends tokens to successors
+    # It switches the successor of comm0
+    comm0.on_this_completion_cb(lambda t: switch_successor(t, [exec1,exec2]))
 
     # Enqueue two firings for task exec1
     comm0.enqueue_firings(4)
index 51dbc1a..14fc2ec 100644 (file)
@@ -28,7 +28,7 @@ def parse():
     return parser.parse_args()
 
 def callback(t):
-    print(f'[{Engine.clock}] {t} finished ({t.count})')
+    print(f'[{Engine.clock}] {t} finished ({t.get_count()})')
 
 def variable_load(t):
     print('--- Small load ---')
index ece38f5..3a62857 100644 (file)
@@ -22,10 +22,10 @@ $ $VALGRIND_NO_LEAK_CHECK ../../../smpi_script/bin/smpirun -quiet -wrapper "${bi
 > [0.000000] [ker_engine/INFO] 2 actors are still running, waiting for something.
 > [0.000000] [ker_engine/INFO] Legend of the following listing: "Actor <pid> (<name>@<host>): <status>"
 > [0.000000] [ker_engine/INFO] Actor 1 (0@node-0.simgrid.org) simcall CommWait(comm_id:1 src:1 dst:-1 mbox:SMPI-2(id:2))
-> [0.000000] [ker_engine/INFO] Actor 2 (1@node-1.simgrid.org) simcall CommWait(comm_id:2 src:2 dst:-1 mbox:SMPI-1(id:0))
+> [0.000000] [ker_engine/INFO] Actor 2 (1@node-1.simgrid.org) simcall CommWait(comm_id:2 src:2 dst:-1 mbox:SMPI-1(id:3))
 > [0.000000] [mc_global/INFO] Counter-example execution trace:
 > [0.000000] [mc_global/INFO]   Actor 1 in :0:() ==> simcall: iSend(mbox=2)
-> [0.000000] [mc_global/INFO]   Actor 2 in :0:() ==> simcall: iSend(mbox=0)
+> [0.000000] [mc_global/INFO]   Actor 2 in :0:() ==> simcall: iSend(mbox=3)
 > [0.000000] [mc_Session/INFO] You can debug the problem (and see the whole details) by rerunning out of simgrid-mc with --cfg=model-check/replay:'1;2'
 > [0.000000] [mc_dfs/INFO] DFS exploration ended. 3 unique states visited; 0 backtracks (0 transition replays, 3 states visited overall)
 > Execution failed with code 3.
index 98811c2..7d9d7b5 100644 (file)
@@ -5,7 +5,7 @@ find_package(Threads REQUIRED)
 #########################################################################
 
 foreach(x
-        mutex-simple
+        mutex-simple mutex-recursive
        producer-consumer)
 
   if("${CMAKE_SYSTEM}" MATCHES "Linux")
diff --git a/examples/sthread/pthread-mc-mutex-recursive.tesh b/examples/sthread/pthread-mc-mutex-recursive.tesh
new file mode 100644 (file)
index 0000000..984a4b5
--- /dev/null
@@ -0,0 +1,15 @@
+# We ignore the LD_PRELOAD lines from the expected output because they contain the build path
+! ignore .*LD_PRELOAD.*
+
+$ $VALGRIND_NO_TRACE_CHILDREN ${bindir:=.}/../../bin/simgrid-mc --cfg=model-check/setenv:LD_PRELOAD=${libdir:=.}/libsgmalloc.so:${libdir:=.}/libsthread.so ${bindir:=.}/pthread-mutex-recursive
+> sthread is intercepting the execution of ./pthread-mutex-recursive
+> [0.000000] [mc_dfs/INFO] Start a DFS exploration. Reduction is: dpor.
+> Got the lock on the default mutex.
+> Failed to relock the default mutex.
+> Got the lock on the recursive mutex.
+> Got the lock again on the recursive mutex.
+> Got the lock on the default mutex.
+> Failed to relock the default mutex.
+> Got the lock on the recursive mutex.
+> Got the lock again on the recursive mutex.
+> [0.000000] [mc_dfs/INFO] DFS exploration ended. 17 unique states visited; 1 backtracks (3 transition replays, 21 states visited overall)
index 55ecc95..9e38cea 100644 (file)
@@ -3,7 +3,7 @@
 ! ignore .*LD_PRELOAD.*
 
 $ $VALGRIND_NO_TRACE_CHILDREN ${bindir:=.}/../../bin/simgrid-mc --cfg=model-check/setenv:LD_PRELOAD=${libdir:=.}/libsgmalloc.so:${libdir:=.}/libsthread.so ${bindir:=.}/pthread-mutex-simple
-> [0.000000] [sthread/INFO] Starting the simulation.
+> sthread is intercepting the execution of ./pthread-mutex-simple
 > All threads are started.
 > [0.000000] [mc_dfs/INFO] Start a DFS exploration. Reduction is: dpor.
 > The thread 0 is terminating.
index 2de4e42..e843a40 100644 (file)
@@ -6,7 +6,7 @@
 ! ignore .*LD_PRELOAD.*
 
 $ $VALGRIND_NO_TRACE_CHILDREN ${bindir:=.}/../../bin/simgrid-mc --cfg=model-check/setenv:LD_PRELOAD=${libdir:=.}/libsgmalloc.so:${libdir:=.}/libsthread.so ${bindir:=.}/pthread-mutex-simpledeadlock
-> [0.000000] [sthread/INFO] Starting the simulation.
+> sthread is intercepting the execution of ./pthread-mutex-simpledeadlock
 > All threads are started.
 > [0.000000] [mc_dfs/INFO] Start a DFS exploration. Reduction is: dpor.
 > The thread 0 is terminating.
index a9f5a72..584b7a5 100644 (file)
@@ -2,18 +2,18 @@
 ! ignore .*LD_PRELOAD.*
 
 $ $VALGRIND_NO_TRACE_CHILDREN ${bindir:=.}/../../bin/simgrid-mc --cfg=model-check/setenv:LD_PRELOAD=${libdir:=.}/libsgmalloc.so:${libdir:=.}/libsthread.so ${bindir:=.}/pthread-producer-consumer -q  -C 1 -P 1
-> [0.000000] [sthread/INFO] Starting the simulation.
+> sthread is intercepting the execution of ./pthread-producer-consumer
 > [0.000000] [mc_dfs/INFO] Start a DFS exploration. Reduction is: dpor.
 > [0.000000] [mc_dfs/INFO] DFS exploration ended. 786 unique states visited; 97 backtracks (2049 transition replays, 2932 states visited overall)
 
 $ $VALGRIND_NO_TRACE_CHILDREN ${bindir:=.}/../../bin/simgrid-mc --cfg=model-check/reduction:sdpor --cfg=model-check/setenv:LD_PRELOAD=${libdir:=.}/libsgmalloc.so:${libdir:=.}/libsthread.so ${bindir:=.}/pthread-producer-consumer -q  -C 1 -P 1
 > [0.000000] [xbt_cfg/INFO] Configuration change: Set 'model-check/reduction' to 'sdpor'
-> [0.000000] [sthread/INFO] Starting the simulation.
+> sthread is intercepting the execution of ./pthread-producer-consumer
 > [0.000000] [mc_dfs/INFO] Start a DFS exploration. Reduction is: sdpor.
 > [0.000000] [mc_dfs/INFO] DFS exploration ended. 1186 unique states visited; 157 backtracks (3403 transition replays, 4746 states visited overall)
 
 $ $VALGRIND_NO_TRACE_CHILDREN ${bindir:=.}/../../bin/simgrid-mc --cfg=model-check/reduction:odpor --cfg=model-check/setenv:LD_PRELOAD=${libdir:=.}/libsgmalloc.so:${libdir:=.}/libsthread.so ${bindir:=.}/pthread-producer-consumer -q  -C 1 -P 1
 > [0.000000] [xbt_cfg/INFO] Configuration change: Set 'model-check/reduction' to 'odpor'
-> [0.000000] [sthread/INFO] Starting the simulation.
+> sthread is intercepting the execution of ./pthread-producer-consumer
 > [0.000000] [mc_dfs/INFO] Start a DFS exploration. Reduction is: odpor.
 > [0.000000] [mc_dfs/INFO] DFS exploration ended. 39 unique states visited; 0 backtracks (0 transition replays, 39 states visited overall)
diff --git a/examples/sthread/pthread-mutex-recursive.c b/examples/sthread/pthread-mutex-recursive.c
new file mode 100644 (file)
index 0000000..482cf3e
--- /dev/null
@@ -0,0 +1,65 @@
+/* Copyright (c) 2002-2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+/* Code with both recursive and non-recursive mutexes */
+
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+// Structure to hold the mutex's name and pointer to the actual mutex
+typedef struct {
+  const char* name;
+  pthread_mutex_t* mutex;
+} ThreadData;
+
+static void* thread_function(void* arg)
+{
+  ThreadData* data       = (ThreadData*)arg;
+  pthread_mutex_t* mutex = data->mutex;
+  const char* name       = data->name;
+
+  pthread_mutex_lock(mutex);
+  fprintf(stderr, "Got the lock on the %s mutex.\n", name);
+
+  // Attempt to relock the mutex - This behavior depends on the mutex type
+  if (pthread_mutex_trylock(mutex) == 0) {
+    fprintf(stderr, "Got the lock again on the %s mutex.\n", name);
+    pthread_mutex_unlock(mutex);
+  } else {
+    fprintf(stderr, "Failed to relock the %s mutex.\n", name);
+  }
+
+  pthread_mutex_unlock(mutex);
+
+  // pthread_exit(NULL); TODO: segfaulting
+  return NULL;
+}
+
+int main()
+{
+  pthread_t thread1, thread2;
+  pthread_mutex_t mutex_dflt = PTHREAD_MUTEX_INITIALIZER; // Non-recursive mutex
+
+  pthread_mutexattr_t attr;
+  pthread_mutexattr_init(&attr);
+  pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
+  pthread_mutex_t mutex_rec;
+  pthread_mutex_init(&mutex_rec, &attr);
+
+  ThreadData data1 = {"default", &mutex_dflt};
+  ThreadData data2 = {"recursive", &mutex_rec};
+
+  pthread_create(&thread1, NULL, thread_function, &data1);
+  pthread_create(&thread2, NULL, thread_function, &data2);
+
+  pthread_join(thread1, NULL);
+  pthread_join(thread2, NULL);
+
+  pthread_mutex_destroy(&mutex_dflt);
+  pthread_mutex_destroy(&mutex_rec);
+
+  return 0;
+}
diff --git a/examples/sthread/pthread-mutex-recursive.tesh b/examples/sthread/pthread-mutex-recursive.tesh
new file mode 100644 (file)
index 0000000..ca535da
--- /dev/null
@@ -0,0 +1,7 @@
+$ env ASAN_OPTIONS=verify_asan_link_order=0:$ASAN_OPTIONS LD_PRELOAD=${libdir:=.}/libsthread.so ./pthread-mutex-recursive
+> sthread is intercepting the execution of ./pthread-mutex-recursive
+> Got the lock on the default mutex.
+> Failed to relock the default mutex.
+> Got the lock on the recursive mutex.
+> Got the lock again on the recursive mutex.
+> [0.000000] [sthread/INFO] All threads exited. Terminating the simulation.
index 1d39141..a7dea6b 100644 (file)
@@ -1,3 +1,8 @@
+/* Copyright (c) 2002-2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
 /* Simple test code with no bug  */
 
 #include <pthread.h>
index 29d66a9..2e12ec1 100644 (file)
@@ -1,5 +1,5 @@
 $ env ASAN_OPTIONS=verify_asan_link_order=0:$ASAN_OPTIONS LD_PRELOAD=${libdir:=.}/libsthread.so ./pthread-mutex-simple
-> [0.000000] [sthread/INFO] Starting the simulation.
+> sthread is intercepting the execution of ./pthread-mutex-simple
 > All threads are started.
 > The thread 0 is terminating.
 > The thread 1 is terminating.
index 09be6c1..92a04a1 100644 (file)
@@ -1,3 +1,8 @@
+/* Copyright (c) 2002-2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
 /* Simple test code that may deadlock:
 
    Thread 1 locks mutex1 then mutex2 while thread 2 locks in reverse order.
index a54bed0..d8bf22c 100644 (file)
@@ -1,5 +1,5 @@
 $ env ASAN_OPTIONS=verify_asan_link_order=0:$ASAN_OPTIONS LD_PRELOAD=${libdir:=.}/libsthread.so ./pthread-producer-consumer
-> [0.000000] [sthread/INFO] Starting the simulation.
+> sthread is intercepting the execution of ./pthread-producer-consumer
 > Producer 1: Insert Item 0 at 0
 > Producer 2: Insert Item 0 at 1
 > Consumer 1: Remove Item 0 from 0
@@ -15,7 +15,7 @@ $ env ASAN_OPTIONS=verify_asan_link_order=0:$ASAN_OPTIONS LD_PRELOAD=${libdir:=.
 > [0.000000] [sthread/INFO] All threads exited. Terminating the simulation.
 
 $ env ASAN_OPTIONS=verify_asan_link_order=0:$ASAN_OPTIONS LD_PRELOAD=${libdir:=.}/libsthread.so ./pthread-producer-consumer -c 2 -C 1 -p 2 -P 1
-> [0.000000] [sthread/INFO] Starting the simulation.
+> sthread is intercepting the execution of ./pthread-producer-consumer
 > Producer 1: Insert Item 0 at 0
 > Consumer 1: Remove Item 0 from 0
 > Producer 1: Insert Item 1 at 1
index 06e49c8..246095e 100644 (file)
@@ -6,13 +6,15 @@
 std::vector<int> v = {1, 2, 3, 5, 8, 13};
 
 extern "C" {
-extern int sthread_access_begin(void* addr, const char* objname, const char* file, int line) __attribute__((weak));
-extern void sthread_access_end(void* addr, const char* objname, const char* file, int line) __attribute__((weak));
+extern int sthread_access_begin(void* addr, const char* objname, const char* file, int line, const char* func)
+    __attribute__((weak));
+extern void sthread_access_end(void* addr, const char* objname, const char* file, int line, const char* func)
+    __attribute__((weak));
 }
 
 #define STHREAD_ACCESS(obj)                                                                                            \
-  for (bool first = sthread_access_begin(static_cast<void*>(obj), #obj, __FILE__, __LINE__) || true; first;            \
-       sthread_access_end(static_cast<void*>(obj), #obj, __FILE__, __LINE__), first = false)
+  for (bool first = sthread_access_begin(static_cast<void*>(obj), #obj, __FILE__, __LINE__, __FUNCTION__) || true;     \
+       first; sthread_access_end(static_cast<void*>(obj), #obj, __FILE__, __LINE__, __FUNCTION__), first = false)
 
 static void thread_code()
 {
index 457238c..bb8e1b7 100644 (file)
@@ -5,7 +5,7 @@
 ! ignore .*LD_PRELOAD.*
 
 $ $VALGRIND_NO_TRACE_CHILDREN ${bindir:=.}/../../../bin/simgrid-mc --cfg=model-check/setenv:LD_PRELOAD=${libdir:=.}/libsthread.so ${bindir:=.}/stdobject "--log=root.fmt:[%11.6r]%e(%a@%h)%e%m%n" --log=no_loc
-> [   0.000000] (maestro@) Starting the simulation.
+> sthread is intercepting the execution of ./stdobject
 > starting two helpers...
 > waiting for helpers to finish...
 > [   0.000000] (maestro@) Start a DFS exploration. Reduction is: dpor.
@@ -16,7 +16,7 @@ $ $VALGRIND_NO_TRACE_CHILDREN ${bindir:=.}/../../../bin/simgrid-mc --cfg=model-c
 > v = { 1, 2, 3, 5, 8, 13, 21, 21, }; 
 > [   0.000000] (maestro@) thread 1 takes &v
 > [   0.000000] (maestro@) thread 2 takes &v
-> [   0.000000] (maestro@) Unprotected concurent access to &v: thread 1 vs thread 2 (locations hidden because of --log=no_loc).
+> [   0.000000] (maestro@) Unprotected concurent access to &v: thread 1 from 1 location vs thread 2 (locations hidden because of --log=no_loc).
 > [   0.000000] (maestro@) **************************
 > [   0.000000] (maestro@) *** PROPERTY NOT VALID ***
 > [   0.000000] (maestro@) **************************
index ffa04c1..c44bd97 100644 (file)
@@ -1,5 +1,5 @@
 $ ./sthread-mutex-simple
-> [0.000000] [sthread/INFO] Starting the simulation.
+> sthread is intercepting the execution of ./sthread-mutex-simple
 > All threads are started.
 > The thread 0 is terminating.
 > The thread 1 is terminating.
index 85fe6a7..d4486a2 100644 (file)
@@ -123,6 +123,7 @@ using ActorCodeFactory = std::function<ActorCode(std::vector<std::string> args)>
 
 class Simcall;
 class SimcallObserver;
+class MutexObserver;
 class ObjectAccessSimcallObserver;
 class ObjectAccessSimcallItem;
 } // namespace actor
@@ -197,6 +198,7 @@ class StandardLinkImpl;
 class SplitDuplexLinkImpl;
 class NetworkAction;
 class DiskImpl;
+using DiskImplPtr = boost::intrusive_ptr<DiskImpl>;
 class DiskModel;
 class VirtualMachineImpl;
 class VMModel;
index 7791cd7..06f04c2 100644 (file)
@@ -12,6 +12,8 @@
 namespace simgrid::s4u {
 
 /** @brief A classical mutex, but blocking in the simulation world.
+ *
+ * S4U mutexes are not recursive by default: unless created with create(true), an actor locking the same mutex twice deadlocks with itself.
  *
  * @beginrst
  * It is strictly impossible to use a real mutex, such as
@@ -48,7 +50,8 @@ class XBT_PUBLIC Mutex {
 
 public:
   /** \static Constructs a new mutex */
-  static MutexPtr create();
+  static MutexPtr create(bool recursive = false);
+
   void lock();
   void unlock();
   bool try_lock();
index b272312..770b70a 100644 (file)
@@ -248,8 +248,29 @@ struct ClusterCallbacks {
    * @param id: Internal identifier of the element
    * @return pair<NetPoint*, NetPoint*>: returns a pair of netpoint and gateway.
    */
+  // XBT_ATTRIB_DEPRECATED_v339
   using ClusterNetPointCb = std::pair<kernel::routing::NetPoint*, kernel::routing::NetPoint*>(
       NetZone* zone, const std::vector<unsigned long>& coord, unsigned long id);
+
+   /**
+   * @brief Callback used to set the NetZone located at some leaf of clusters (Torus, FatTree, etc)
+   *
+   * @param zone: The parent zone, needed for creating new resources (hosts, links)
+   * @param coord: the coordinates of the element
+   * @param id: Internal identifier of the element
+   * @return NetZone*: returns newly created netzone
+   */
+  using ClusterNetZoneCb = NetZone*(NetZone* zone, const std::vector<unsigned long>& coord, unsigned long id);
+  /**
+   * @brief Callback used to set the Host located at some leaf of clusters (Torus, FatTree, etc)
+   *
+   * @param zone: The parent zone, needed for creating new resources (hosts, links)
+   * @param coord: the coordinates of the element
+   * @param id: Internal identifier of the element
+   * @return Host*: returns newly created host
+   */
+  using ClusterHostCb = Host*(NetZone* zone, const std::vector<unsigned long>& coord, unsigned long id);
+
   /**
    * @brief Callback used to set the links for some leaf of the cluster (Torus, FatTree, etc)
    *
@@ -267,14 +288,36 @@ struct ClusterCallbacks {
    */
   using ClusterLinkCb = Link*(NetZone* zone, const std::vector<unsigned long>& coord, unsigned long id);
 
-  std::function<ClusterNetPointCb> netpoint;
+  bool by_netzone_ = false;
+  bool is_by_netzone() const { return by_netzone_; }
+  bool by_netpoint_ = false; // XBT_ATTRIB_DEPRECATED_v339
+  bool is_by_netpoint() const { return by_netpoint_; } // XBT_ATTRIB_DEPRECATED_v339
+  std::function<ClusterNetPointCb> netpoint; // XBT_ATTRIB_DEPRECATED_v339
+  std::function<ClusterHostCb> host;
+  std::function<ClusterNetZoneCb> netzone;
   std::function<ClusterLinkCb> loopback = {};
   std::function<ClusterLinkCb> limiter  = {};
+  explicit ClusterCallbacks(const std::function<ClusterNetZoneCb>& set_netzone)
+      : by_netzone_(true), netzone(set_netzone){/* nothing to do */};
+
+  ClusterCallbacks(const std::function<ClusterNetZoneCb>& set_netzone,
+                   const std::function<ClusterLinkCb>& set_loopback, const std::function<ClusterLinkCb>& set_limiter)
+      :  by_netzone_(true), netzone(set_netzone), loopback(set_loopback), limiter(set_limiter){/* nothing to do */};
+
+  explicit ClusterCallbacks(const std::function<ClusterHostCb>& set_host)
+      : host(set_host) {/* nothing to do */};
+
+  ClusterCallbacks(const std::function<ClusterHostCb>& set_host,
+                   const std::function<ClusterLinkCb>& set_loopback, const std::function<ClusterLinkCb>& set_limiter)
+      :  host(set_host), loopback(set_loopback), limiter(set_limiter){/* nothing to do */};
+
+  XBT_ATTRIB_DEPRECATED_v339("Please use callback with either a Host/NetZone creation function as first parameter")
   explicit ClusterCallbacks(const std::function<ClusterNetPointCb>& set_netpoint)
-      : netpoint(set_netpoint){/*nothing to do */};
+      : by_netpoint_(true), netpoint(set_netpoint){/* nothing to do */};
+  XBT_ATTRIB_DEPRECATED_v339("Please use callback with either a Host/NetZone creation function as first parameter")
   ClusterCallbacks(const std::function<ClusterNetPointCb>& set_netpoint,
                    const std::function<ClusterLinkCb>& set_loopback, const std::function<ClusterLinkCb>& set_limiter)
-      : netpoint(set_netpoint), loopback(set_loopback), limiter(set_limiter){/*nothing to do */};
+      : by_netpoint_(true), netpoint(set_netpoint), loopback(set_loopback), limiter(set_limiter){/* nothing to do */};
 };
 /**
  * @brief Create a torus zone
index f2b6eb6..03ee265 100644 (file)
@@ -10,6 +10,7 @@
 #include <map>
 #include <memory>
 #include <set>
+#include <xbt/asserts.h>
 
 namespace simgrid::s4u {
 
@@ -27,23 +28,29 @@ using IoTaskPtr = boost::intrusive_ptr<IoTask>;
 class XBT_PUBLIC Token : public xbt::Extendable<Token> {};
 
 class Task {
+
   std::string name_;
-  double amount_;
-  int queued_firings_     = 0;
-  int count_              = 0;
-  int running_instances_  = 0;
-  int parallelism_degree_ = 1;
+
+  std::map<std::string, double> amount_              = {{"instance_0", 0}, {"dispatcher", 0}, {"collector", 0}};
+  std::map<std::string, int> queued_firings_         = {{"instance_0", 0}, {"dispatcher", 0}, {"collector", 0}};
+  std::map<std::string, int> running_instances_      = {{"instance_0", 0}, {"dispatcher", 0}, {"collector", 0}};
+  std::map<std::string, int> count_                  = {{"instance_0", 0}, {"dispatcher", 0}, {"collector", 0}};
+  std::map<std::string, int> parallelism_degree_     = {{"instance_0", 1}, {"dispatcher", 1}, {"collector", 1}};
+  std::map<std::string, int> internal_bytes_to_send_ = {{"instance_0", 0}, {"dispatcher", 0}};
+
+  std::function<std::string()> load_balancing_function_;
 
   std::set<Task*> successors_                 = {};
   std::map<Task*, unsigned int> predecessors_ = {};
   std::atomic_int_fast32_t refcount_{0};
 
-  bool ready_to_run() const;
+  bool ready_to_run(std::string instance);
   void receive(Task* source);
 
   std::shared_ptr<Token> token_ = nullptr;
-  std::deque<std::map<TaskPtr, std::shared_ptr<Token>>> tokens_received_;
-  std::deque<ActivityPtr> current_activities_;
+  std::map<TaskPtr, std::deque<std::shared_ptr<Token>>> tokens_received_;
+  std::map<std::string, std::deque<ActivityPtr>> current_activities_ = {
+      {"instance_0", {}}, {"dispatcher", {}}, {"collector", {}}};
 
   inline static xbt::signal<void(Task*)> on_start;
   xbt::signal<void(Task*)> on_this_start;
@@ -54,23 +61,33 @@ protected:
   explicit Task(const std::string& name);
   virtual ~Task() = default;
 
-  virtual void fire();
-  void complete();
+  virtual void fire(std::string instance);
+  void complete(std::string instance);
 
-  void store_activity(ActivityPtr a) { current_activities_.push_back(a); }
+  void store_activity(ActivityPtr a, std::string instance) { current_activities_[instance].push_back(a); }
+
+  virtual void add_instances(int n);
+  virtual void remove_instances(int n);
 
 public:
   void set_name(std::string name);
   const std::string& get_name() const { return name_; }
   const char* get_cname() const { return name_.c_str(); }
-  void set_amount(double amount);
-  double get_amount() const { return amount_; }
-  int get_count() const { return count_; }
-  void set_parallelism_degree(int n);
-  int get_parallelism_degree() const { return parallelism_degree_; }
+  void set_amount(double amount, std::string instance = "instance_0");
+  double get_amount(std::string instance = "instance_0") const { return amount_.at(instance); }
+  int get_queued_firings(std::string instance = "instance_0") { return queued_firings_.at(instance); }
+  int get_running_count(std::string instance = "instance_0") { return running_instances_.at(instance); }
+  int get_count(std::string instance = "collector") const { return count_.at(instance); }
+  void set_parallelism_degree(int n, std::string instance = "all");
+  int get_parallelism_degree(std::string instance = "instance_0") const { return parallelism_degree_.at(instance); }
+  void set_internal_bytes(int bytes, std::string instance = "instance_0");
+  double get_internal_bytes(std::string instance = "instance_0") const { return internal_bytes_to_send_.at(instance); }
+  void set_load_balancing_function(std::function<std::string()> func);
 
   void set_token(std::shared_ptr<Token> token);
-  std::shared_ptr<Token> get_next_token_from(TaskPtr t);
+  std::shared_ptr<Token> get_token_from(TaskPtr t) const { return tokens_received_.at(t).front(); }
+  std::deque<std::shared_ptr<Token>> get_tokens_from(TaskPtr t) const { return tokens_received_.at(t); }
+  void deque_token_from(TaskPtr t);
 
   void add_successor(TaskPtr t);
   void remove_successor(TaskPtr t);
@@ -107,7 +124,7 @@ class CommTask : public Task {
   Host* destination_;
 
   explicit CommTask(const std::string& name);
-  void fire() override;
+  void fire(std::string instance) override;
 
 public:
   static CommTaskPtr init(const std::string& name);
@@ -118,30 +135,33 @@ public:
   CommTaskPtr set_destination(Host* destination);
   Host* get_destination() const { return destination_; }
   CommTaskPtr set_bytes(double bytes);
-  double get_bytes() const { return get_amount(); }
+  double get_bytes() const { return get_amount("instance_0"); }
 };
 
 class ExecTask : public Task {
-  Host* host_;
+  std::map<std::string, Host*> host_ = {{"instance_0", nullptr}, {"dispatcher", nullptr}, {"collector", nullptr}};
 
   explicit ExecTask(const std::string& name);
-  void fire() override;
+  void fire(std::string instance) override;
 
 public:
   static ExecTaskPtr init(const std::string& name);
   static ExecTaskPtr init(const std::string& name, double flops, Host* host);
 
-  ExecTaskPtr set_host(Host* host);
-  Host* get_host() const { return host_; }
-  ExecTaskPtr set_flops(double flops);
-  double get_flops() const { return get_amount(); }
+  ExecTaskPtr set_host(Host* host, std::string instance = "all");
+  Host* get_host(std::string instance = "instance_0") const { return host_.at(instance); }
+  ExecTaskPtr set_flops(double flops, std::string instance = "instance_0");
+  double get_flops(std::string instance = "instance_0") const { return get_amount(instance); }
+
+  void add_instances(int n) override;
+  void remove_instances(int n) override;
 };
 
 class IoTask : public Task {
   Disk* disk_;
   Io::OpType type_;
   explicit IoTask(const std::string& name);
-  void fire() override;
+  void fire(std::string instance) override;
 
 public:
   static IoTaskPtr init(const std::string& name);
@@ -150,7 +170,7 @@ public:
   IoTaskPtr set_disk(Disk* disk);
   Disk* get_disk() const { return disk_; }
   IoTaskPtr set_bytes(double bytes);
-  double get_bytes() const { return get_amount(); }
+  double get_bytes() const { return get_amount("instance_0"); }
   IoTaskPtr set_op_type(Io::OpType type);
   Io::OpType get_op_type() const { return type_; }
 };
index 792ae4b..53065f0 100644 (file)
@@ -750,7 +750,8 @@ PYBIND11_MODULE(simgrid, m)
   py::class_<Mutex, MutexPtr>(m, "Mutex",
                               "A classical mutex, but blocking in the simulation world."
                               "See the C++ documentation for details.")
-      .def(py::init<>(&Mutex::create), py::call_guard<py::gil_scoped_release>(), "Mutex constructor.")
+      .def(py::init<>(&Mutex::create), py::call_guard<py::gil_scoped_release>(),
+           "Mutex constructor (pass True as a parameter to get a recursive Mutex).", py::arg("recursive") = false)
       .def("lock", &Mutex::lock, py::call_guard<py::gil_scoped_release>(), "Block until the mutex is acquired.")
       .def("try_lock", &Mutex::try_lock, py::call_guard<py::gil_scoped_release>(),
            "Try to acquire the mutex. Return true if the mutex was acquired, false otherwise.")
@@ -861,9 +862,14 @@ PYBIND11_MODULE(simgrid, m)
           },
           "Add a callback called when each task ends.")
       .def_property_readonly("name", &Task::get_name, "The name of this task (read-only).")
-      .def_property_readonly("count", &Task::get_count, "The execution count of this task (read-only).")
       .def_property_readonly("successors", &Task::get_successors, "The successors of this task (read-only).")
       .def_property("amount", &Task::get_amount, &Task::set_amount, "The amount of work to do for this task.")
+      .def(
+          "get_count", [](const TaskPtr t) { return t->get_count("instance_0"); },
+          "The execution count of this task instance_0.")
+      .def(
+          "get_count", [](const TaskPtr t, const std::string& instance) { return t->get_count(instance); },
+          "The execution count of this task instance.")
       .def("enqueue_firings", py::overload_cast<int>(&Task::enqueue_firings), py::call_guard<py::gil_scoped_release>(),
            py::arg("n"), "Enqueue firings for this task.")
       .def("add_successor", py::overload_cast<TaskPtr>(&Task::add_successor), py::call_guard<py::gil_scoped_release>(),
index cb49f3b..aaeeb83 100644 (file)
@@ -46,7 +46,7 @@ void MailboxImpl::set_receiver(s4u::ActorPtr actor)
 /** @brief Pushes a communication activity into a mailbox
  *  @param comm What to add
  */
-void MailboxImpl::push(CommImplPtr comm)
+void MailboxImpl::push(const CommImplPtr& comm)
 {
   comm->set_mailbox(this);
   this->comm_queue_.push_back(std::move(comm));
@@ -61,12 +61,11 @@ void MailboxImpl::remove(const CommImplPtr& comm)
              (comm->get_mailbox() ? comm->get_mailbox()->get_cname() : "(null)"), this->get_cname());
 
   comm->set_mailbox(nullptr);
-  for (auto it = this->comm_queue_.begin(); it != this->comm_queue_.end(); it++)
-    if (*it == comm) {
-      this->comm_queue_.erase(it);
-      return;
-    }
-  xbt_die("Comm %p not found in mailbox %s", comm.get(), this->get_cname());
+  auto it = std::find(this->comm_queue_.begin(), this->comm_queue_.end(), comm);
+  if (it != this->comm_queue_.end())
+    this->comm_queue_.erase(it);
+  else
+    xbt_die("Comm %p not found in mailbox %s", comm.get(), this->get_cname());
 }
 
 /** @brief Removes all communication activities from a mailbox
@@ -74,7 +73,7 @@ void MailboxImpl::remove(const CommImplPtr& comm)
 void MailboxImpl::clear(bool do_finish)
 {
   // CommImpl::cancel() will remove the comm from the mailbox..
-  for (auto comm : done_comm_queue_) {
+  for (const auto& comm : done_comm_queue_) {
     comm->cancel();
     comm->set_state(State::FAILED);
     if (do_finish)
index 199fcd1..9d30a4e 100644 (file)
@@ -6,26 +6,32 @@
 #ifndef SIMGRID_KERNEL_ACTIVITY_MAILBOX_HPP
 #define SIMGRID_KERNEL_ACTIVITY_MAILBOX_HPP
 
+#include "simgrid/config.h" /* FIXME: KILLME. This makes the ABI config-dependent, but mandatory for the hack below */
 #include "simgrid/s4u/Engine.hpp"
 #include "simgrid/s4u/Mailbox.hpp"
 #include "src/kernel/activity/CommImpl.hpp"
 #include "src/kernel/actor/ActorImpl.hpp"
 
-#include <boost/circular_buffer.hpp>
-
 namespace simgrid::kernel::activity {
 
 /** @brief Implementation of the s4u::Mailbox */
 
 class MailboxImpl {
-  static constexpr size_t MAX_MAILBOX_SIZE = 10000000;
-
   s4u::Mailbox piface_;
   std::string name_;
   actor::ActorImplPtr permanent_receiver_; // actor to which the mailbox is attached
-  boost::circular_buffer_space_optimized<CommImplPtr> comm_queue_{MAX_MAILBOX_SIZE};
+#if SIMGRID_HAVE_STATEFUL_MC
+  /* Using deque here is faster in benchmarks, but breaks the state equality heuristic of Liveness checking on Debian
+   * testing. This would deserve a proper investigation, but simply use a single-sided list for the time being. HACK.
+   */
+  std::list<CommImplPtr> comm_queue_;
   // messages already received in the permanent receive mode
-  boost::circular_buffer_space_optimized<CommImplPtr> done_comm_queue_{MAX_MAILBOX_SIZE};
+  std::list<CommImplPtr> done_comm_queue_;
+#else
+  std::deque<CommImplPtr> comm_queue_;
+  // messages already received in the permanent receive mode
+  std::deque<CommImplPtr> done_comm_queue_;
+#endif
 
   friend s4u::Engine;
   friend s4u::Mailbox;
@@ -50,8 +56,8 @@ public:
   const std::string& get_name() const { return name_; }
   const char* get_cname() const { return name_.c_str(); }
   void set_receiver(s4u::ActorPtr actor);
-  void push(CommImplPtr comm);
-  void push_done(CommImplPtr done_comm) { done_comm_queue_.push_back(done_comm); }
+  void push(const CommImplPtr& comm);
+  void push_done(const CommImplPtr& done_comm) { done_comm_queue_.push_back(done_comm); }
   void remove(const CommImplPtr& comm);
   void clear(bool do_finish);
   CommImplPtr iprobe(int type, const std::function<bool(void*, void*, CommImpl*)>& match_fun, void* data);
@@ -61,9 +67,9 @@ public:
   actor::ActorImplPtr get_permanent_receiver() const { return permanent_receiver_; }
   bool empty() const { return comm_queue_.empty(); }
   size_t size() const { return comm_queue_.size(); }
-  CommImplPtr front() const { return comm_queue_.front(); }
+  const CommImplPtr& front() const { return comm_queue_.front(); }
   bool has_some_done_comm() const { return not done_comm_queue_.empty(); }
-  CommImplPtr done_front() const { return done_comm_queue_.front(); }
+  const CommImplPtr& done_front() const { return done_comm_queue_.front(); }
 };
 } // namespace simgrid::kernel::activity
 
index 43a511b..c981c3e 100644 (file)
@@ -47,13 +47,41 @@ unsigned MutexImpl::next_id_ = 0;
 
 MutexAcquisitionImplPtr MutexImpl::lock_async(actor::ActorImpl* issuer)
 {
-  auto res = MutexAcquisitionImplPtr(new kernel::activity::MutexAcquisitionImpl(issuer, this), true);
-
-  if (owner_ != nullptr) {
-    /* Somebody is using the mutex; register the acquisition */
+  /* Recursive mutex: the owner may lock again, and a waiting actor keeps a single pending acquisition */
+  if (is_recursive_) {
+    if (owner_ == issuer) { // Re-entrant lock by the current owner: only the depth changes
+      recursive_depth++;
+      auto res = MutexAcquisitionImplPtr(new kernel::activity::MutexAcquisitionImpl(issuer, this), true);
+      res->grant();
+      return res;
+    } else if (owner_ == nullptr) { // Free: the issuer becomes the owner, with a depth of 1
+      owner_          = issuer;
+      recursive_depth = 1;
+      auto res        = MutexAcquisitionImplPtr(new kernel::activity::MutexAcquisitionImpl(issuer, this), true);
+      res->grant();
+      return res;
+    }
+
+    for (auto acq : ongoing_acquisitions_) // Owned by somebody else: reuse the pending acquisition of the issuer, if any
+      if (acq->get_issuer() == issuer) {
+        acq->recursive_depth_++; // unlock() copies this value into recursive_depth when granting the mutex
+        return acq;
+      }
+
+    // The issuer is not yet in the list of ongoing acquisitions: register a new (not granted) acquisition
+    auto res = MutexAcquisitionImplPtr(new kernel::activity::MutexAcquisitionImpl(issuer, this), true);
     ongoing_acquisitions_.push_back(res);
-  } else {
+    return res;
+  }
+
+  // Non-recursive mutex
+  auto res = MutexAcquisitionImplPtr(new kernel::activity::MutexAcquisitionImpl(issuer, this), true);
+  if (owner_ == nullptr) { // Lock is free, take it
     owner_  = issuer;
+    recursive_depth = 1;
+    res->grant();
+  } else { // Somebody is using the mutex; register the acquisition
+    ongoing_acquisitions_.push_back(res);
   }
   return res;
 }
@@ -65,14 +93,14 @@ MutexAcquisitionImplPtr MutexImpl::lock_async(actor::ActorImpl* issuer)
  */
 bool MutexImpl::try_lock(actor::ActorImpl* issuer)
 {
-  XBT_IN("(%p, %p)", this, issuer);
-  if (owner_ != nullptr) {
-    XBT_OUT();
-    return false;
+  if (owner_ == issuer && is_recursive_) {
+    recursive_depth++;
+    return true;
   }
+  if (owner_ != nullptr)
+    return false;
 
-  owner_  = issuer;
-  XBT_OUT();
+  owner_ = issuer;
   return true;
 }
 
@@ -88,12 +116,20 @@ void MutexImpl::unlock(actor::ActorImpl* issuer)
   xbt_assert(issuer == owner_, "Cannot release that mutex: you're not the owner. %s is (pid:%ld).",
              owner_ != nullptr ? owner_->get_cname() : "(nobody)", owner_ != nullptr ? owner_->get_pid() : -1);
 
+  if (is_recursive_) {
+    recursive_depth--;
+    if (recursive_depth > 0) // Still owning the lock
+      return;
+  }
+
   if (not ongoing_acquisitions_.empty()) {
     /* Give the ownership to the first waiting actor */
     auto acq = ongoing_acquisitions_.front();
     ongoing_acquisitions_.pop_front();
 
     owner_ = acq->get_issuer();
+    acq->grant();
+    recursive_depth = acq->recursive_depth_;
     if (acq == owner_->waiting_synchro_)
       acq->finish();
     // else, the issuer is not blocked on this acquisition so no need to release it
index 3eebec2..9d9242b 100644 (file)
@@ -42,11 +42,18 @@ namespace simgrid::kernel::activity {
 class XBT_PUBLIC MutexAcquisitionImpl : public ActivityImpl_T<MutexAcquisitionImpl> {
   actor::ActorImpl* issuer_ = nullptr;
   MutexImpl* mutex_         = nullptr;
+  int recursive_depth_      = 1;
+  // TODO: use granted_ this instead of owner_ == self to test().
+  // This is mandatory to get double-lock on non-recursive locks to properly deadlock
+  bool granted_ = false;
+
+  friend MutexImpl;
 
 public:
   MutexAcquisitionImpl(actor::ActorImpl* issuer, MutexImpl* mutex) : issuer_(issuer), mutex_(mutex) {}
   MutexImplPtr get_mutex() { return mutex_; }
   actor::ActorImpl* get_issuer() { return issuer_; }
+  void grant() { granted_ = true; }
 
   bool test(actor::ActorImpl* issuer = nullptr) override;
   void wait_for(actor::ActorImpl* issuer, double timeout) override;
@@ -63,11 +70,13 @@ class XBT_PUBLIC MutexImpl {
   std::deque<MutexAcquisitionImplPtr> ongoing_acquisitions_;
   static unsigned next_id_;
   unsigned id_ = next_id_++;
+  bool is_recursive_  = false;
+  int recursive_depth = 0;
 
   friend MutexAcquisitionImpl;
 
 public:
-  MutexImpl() : piface_(this) {}
+  MutexImpl(bool recursive = false) : piface_(this), is_recursive_(recursive) {}
   MutexImpl(MutexImpl const&) = delete;
   MutexImpl& operator=(MutexImpl const&) = delete;
 
index 55f6857..a1bdb8a 100644 (file)
@@ -50,6 +50,7 @@ class DiskImpl : public Resource_T<DiskImpl>, public xbt::PropertyHolder {
   Metric read_bw_      = {0.0, 0, nullptr};
   Metric write_bw_     = {0.0, 0, nullptr};
   double readwrite_bw_ = -1; /* readwrite constraint bound, usually max(read, write) */
+  std::atomic_int_fast32_t refcount_{0};
 
   void apply_sharing_policy_cfg();
 
@@ -60,6 +61,17 @@ public:
   explicit DiskImpl(const std::string& name, double read_bandwidth, double write_bandwidth);
   DiskImpl(const DiskImpl&) = delete;
   DiskImpl& operator=(const DiskImpl&) = delete;
+  friend void intrusive_ptr_add_ref(DiskImpl* disk) // hook adding one reference to the disk
+  {
+    disk->refcount_.fetch_add(1, std::memory_order_acq_rel); // atomic increment of the reference counter
+  }
+  friend void intrusive_ptr_release(DiskImpl* disk) // hook dropping one reference; deletes the disk with the last one
+  {
+    if (disk->refcount_.fetch_sub(1, std::memory_order_release) == 1) { // fetch_sub() returns the previous value: 1 means no reference remains
+      std::atomic_thread_fence(std::memory_order_acquire); // acquire fence between the release-ordered decrement and the deletion
+      delete disk;
+    }
+  }
 
   /** @brief Public interface */
   const s4u::Disk* get_iface() const { return &piface_; }
index 614bf9e..da11f9a 100644 (file)
@@ -8,6 +8,7 @@
 #include <simgrid/s4u/Host.hpp>
 
 #include "src/kernel/EngineImpl.hpp"
+#include "src/kernel/resource/NetworkModel.hpp"
 #include "src/kernel/resource/VirtualMachineImpl.hpp"
 #include "xbt/asserts.hpp"
 
@@ -16,7 +17,7 @@
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(res_host, ker_resource, "Host resources agregate CPU, networking and I/O features");
 
 /*************
- * Callbacks *t
+ * Callbacks *
  *************/
 
 namespace simgrid::kernel::resource {
@@ -24,6 +25,28 @@ namespace simgrid::kernel::resource {
 /*********
  * Model *
  *********/
+Action* HostModel::io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk,
+                                double size)
+{
+  auto* net_model = src_host->get_englobing_zone()->get_network_model(); // network model of the zone containing src_host
+  auto* system    = net_model->get_maxmin_system();
+  auto* action   = net_model->communicate(src_host, dst_host, size, -1, true); // the stream is a communication, further constrained by the disks below
+
+  // We don't want to apply the network model bandwidth factor to the I/O constraints.
+  // NOTE(review): presumably this is why bw_factor is passed as third argument of expand() below -- to be confirmed
+  double bw_factor = net_model->get_bandwidth_factor();
+  if (src_disk != nullptr) { // reading from a disk: add the constraint and the read constraint of src_disk to the action
+    // FIXME: if the stream starts from a disk, we might not want to pay the network latency
+    system->expand(src_disk->get_constraint(), action->get_variable(), bw_factor);
+    system->expand(src_disk->get_read_constraint(), action->get_variable(), bw_factor);
+  }
+  if (dst_disk != nullptr) { // writing to a disk: add the constraint and the write constraint of dst_disk to the action
+    system->expand(dst_disk->get_constraint(), action->get_variable(), bw_factor);
+    system->expand(dst_disk->get_write_constraint(), action->get_variable(), bw_factor);
+  }
+
+  return action;
+}
+
 /************
  * Resource *
  ************/
@@ -53,9 +76,6 @@ HostImpl::~HostImpl()
     delete arg;
   actors_at_boot_.clear();
 
-  for (auto const& [_, d] : disks_)
-    d->destroy();
-
   for (auto const& [_, vm] : vms_)
     vm->vm_destroy();
 }
@@ -128,6 +148,7 @@ std::vector<s4u::ActorPtr> HostImpl::get_all_actors()
     res.emplace_back(actor.get_ciface());
   return res;
 }
+
 size_t HostImpl::get_actor_count() const
 {
   return actor_list_.size();
@@ -216,7 +237,7 @@ s4u::Disk* HostImpl::create_disk(const std::string& name, double read_bandwidth,
 
 void HostImpl::add_disk(const s4u::Disk* disk)
 {
-  disks_[disk->get_name()] = disk->get_impl();
+  disks_.insert({disk->get_name(), kernel::resource::DiskImplPtr(disk->get_impl())}); // NOTE(review): insert() keeps the existing entry if the name is already registered, whereas the former operator[] overwrote it -- confirm this is intended
 }
 
 void HostImpl::remove_disk(const std::string& name)
index 0de2d0b..506a3d6 100644 (file)
@@ -29,8 +29,8 @@ public:
 
   virtual Action* execute_parallel(const std::vector<s4u::Host*>& host_list, const double* flops_amount,
                                    const double* bytes_amount, double rate) = 0;
-  virtual Action* io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk,
-                            double size)                                    = 0;
+  Action* io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk,
+                            double size);
 };
 
 /************
@@ -49,7 +49,7 @@ class XBT_PRIVATE HostImpl : public xbt::PropertyHolder, public actor::ObjectAcc
   ActorList actor_list_;
   std::vector<actor::ProcessArg*> actors_at_boot_;
   s4u::Host piface_;
-  std::map<std::string, DiskImpl*, std::less<>> disks_;
+  std::map<std::string, DiskImplPtr, std::less<>> disks_;
   std::map<std::string, VirtualMachineImpl*, std::less<>> vms_;
   std::string name_{"noname"};
   routing::NetZoneImpl* englobing_zone_ = nullptr;
index 8d51c7d..781a33b 100644 (file)
@@ -99,7 +99,6 @@ public:
   {
     return nullptr;
   };
-  Action* io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk, double size) override { return nullptr; }
 };
 } // namespace kernel::resource
 } // namespace simgrid
index c5b47a1..5050b1f 100644 (file)
@@ -50,28 +50,6 @@ static inline double has_cost(const double* array, size_t pos)
   return -1.0;
 }
 
-Action* HostCLM03Model::io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk,
-                                  double size)
-{
-  auto* net_model = src_host->get_englobing_zone()->get_network_model();
-  auto* system    = net_model->get_maxmin_system();
-  auto* action   = net_model->communicate(src_host, dst_host, size, -1, true);
-
-  // We don't want to apply the network model bandwidth factor to the I/O constraints
-  double bw_factor = net_model->get_bandwidth_factor();
-  if (src_disk != nullptr) {
-    // FIXME: if the stream starts from a disk, we might not want to pay the network latency
-    system->expand(src_disk->get_constraint(), action->get_variable(), bw_factor);
-    system->expand(src_disk->get_read_constraint(), action->get_variable(), bw_factor);
-  }
-  if (dst_disk != nullptr) {
-    system->expand(dst_disk->get_constraint(), action->get_variable(), bw_factor);
-    system->expand(dst_disk->get_write_constraint(), action->get_variable(), bw_factor);
-  }
-
-  return action;
-}
-
 Action* HostCLM03Model::execute_parallel(const std::vector<s4u::Host*>& host_list, const double* flops_amount,
                                          const double* bytes_amount, double rate)
 {
index 65715c4..cc5fc9f 100644 (file)
@@ -22,8 +22,6 @@ public:
   Action* execute_thread(const s4u::Host* host, double flops_amount, int thread_count) override;
   Action* execute_parallel(const std::vector<s4u::Host*>& host_list, const double* flops_amount,
                            const double* bytes_amount, double rate) override;
-  Action* io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk,
-                    double size) override;
 };
 } // namespace simgrid::kernel::resource
 
index b206442..3d4c065 100644 (file)
@@ -119,8 +119,14 @@ static void zoneCreation_cb(simgrid::s4u::NetZone const& zone)
   wifiPhy.Set("Antennas", ns3::UintegerValue(nss_value));
   wifiPhy.Set("MaxSupportedTxSpatialStreams", ns3::UintegerValue(nss_value));
   wifiPhy.Set("MaxSupportedRxSpatialStreams", ns3::UintegerValue(nss_value));
-#if NS3_MINOR_VERSION > 33
+#if NS3_MINOR_VERSION < 33
+  // This fails with "The channel width does not uniquely identify an operating channel" on v3.34,
+  // so we specified the ChannelWidth of wifiPhy to 40, above, when creating wifiPhy with v3.34 and higher
+  ns3::Config::Set("/NodeList/*/DeviceList/*/$ns3::WifiNetDevice/Phy/ChannelWidth", ns3::UintegerValue(40));
+#elif NS3_MINOR_VERSION < 36
   wifiPhy.Set("ChannelWidth", ns3::UintegerValue(40));
+#else
+  wifiPhy.Set("ChannelSettings", ns3::StringValue("{0, 40, BAND_UNSPECIFIED, 0}"));
 #endif
   wifiMac.SetType("ns3::ApWifiMac", "Ssid", ns3::SsidValue(ssid));
 
@@ -166,12 +172,6 @@ static void zoneCreation_cb(simgrid::s4u::NetZone const& zone)
     ns3::Simulator::Schedule(ns3::Seconds(start_time_value), &resumeWifiDevice, device);
   }
 
-#if NS3_MINOR_VERSION < 33
-  // This fails with "The channel width does not uniquely identify an operating channel" on v3.34,
-  // so we specified the ChannelWidth of wifiPhy to 40, above, when creating wifiPhy with v3.34 and higher
-  ns3::Config::Set("/NodeList/*/DeviceList/*/$ns3::WifiNetDevice/Phy/ChannelWidth", ns3::UintegerValue(40));
-#endif
-
   mobility.SetPositionAllocator(positionAllocS);
   mobility.Install(nodes);
   ns3::Ipv4AddressHelper address;
index d0344f9..13f4986 100644 (file)
@@ -204,7 +204,8 @@ L07Action::L07Action(Model* model, const std::vector<s4u::Host*>& host_list, con
   XBT_DEBUG("Creating a parallel task (%p) with %zu hosts and %zu unique links.", this, host_nb, link_nb);
   latency_ = latency;
 
-  set_variable(model->get_maxmin_system()->variable_new(this, 1.0, (rate > 0 ? rate : -1.0), host_nb + link_nb));
+  // Allocate more space for constraints (+4) in case users want to mix ptasks and io streams
+  set_variable(model->get_maxmin_system()->variable_new(this, 1.0, (rate > 0 ? rate : -1.0), host_nb + link_nb + 4));
 
   if (latency_ > 0)
     model->get_maxmin_system()->update_variable_penalty(get_variable(), 0.0);
index 7bac5d6..0683aff 100644 (file)
@@ -42,11 +42,6 @@ public:
   Action* execute_thread(const s4u::Host* host, double flops_amount, int thread_count) override { return nullptr; }
   CpuAction* execute_parallel(const std::vector<s4u::Host*>& host_list, const double* flops_amount,
                               const double* bytes_amount, double rate) override;
-  Action* io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk,
-                    double size) override
-  {
-    return nullptr;
-  }
 };
 
 class CpuL07Model : public CpuModel {
index 72f6dca..fa82dd4 100644 (file)
@@ -3,6 +3,7 @@
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
+#include "simgrid/s4u/Host.hpp"
 #include "simgrid/kernel/routing/ClusterZone.hpp"
 #include "simgrid/kernel/routing/NetPoint.hpp"
 #include "src/kernel/resource/StandardLinkImpl.hpp"
@@ -82,7 +83,17 @@ void ClusterBase::fill_leaf_from_cb(unsigned long position, const std::vector<un
   kernel::routing::NetPoint* netpoint = nullptr;
   kernel::routing::NetPoint* gw       = nullptr;
   auto dims                           = index_to_dims(position);
-  std::tie(netpoint, gw)              = set_callbacks.netpoint(get_iface(), dims, position);
+  if (set_callbacks.is_by_netpoint()) { // XBT_ATTRIB_DEPRECATED_v339
+    std::tie(netpoint, gw) = set_callbacks.netpoint(get_iface(), dims, position); // XBT_ATTRIB_DEPRECATED_v339
+  } else if (set_callbacks.is_by_netzone()) {
+    s4u::NetZone* netzone = set_callbacks.netzone(get_iface(), dims, position);
+    netpoint              = netzone->get_netpoint();
+    gw                    = netzone->get_gateway();
+  } else {
+    s4u::Host* host = set_callbacks.host(get_iface(), dims, position);
+    netpoint        = host->get_netpoint();
+  }
+
   xbt_assert(netpoint, "set_netpoint(elem=%lu): Invalid netpoint (nullptr)", position);
   if (netpoint->is_netzone()) {
     xbt_assert(gw && not gw->is_netzone(),
index 3cc1c7a..76a955b 100644 (file)
@@ -6,17 +6,16 @@
 #ifndef NETZONE_TEST_HPP
 #define NETZONE_TEST_HPP
 
-#include "simgrid/kernel/routing/NetPoint.hpp"
-#include "simgrid/s4u/Host.hpp"
 #include "simgrid/s4u/NetZone.hpp"
+#include "xbt/log.h"
+XBT_LOG_EXTERNAL_CATEGORY(ker_platform);
 
 // Callback function common to several routing unit tests: creates in the given zone a host named after the element id
 struct CreateHost {
-  std::pair<simgrid::kernel::routing::NetPoint*, simgrid::kernel::routing::NetPoint*>
-  operator()(simgrid::s4u::NetZone* zone, const std::vector<unsigned long>& /*coord*/, unsigned long id) const
+  simgrid::s4u::Host* operator()(simgrid::s4u::NetZone* zone, const std::vector<unsigned long>& /*coord*/,
+                                    unsigned long id) const
   {
-    const simgrid::s4u::Host* host = zone->create_host(std::to_string(id), 1e9)->seal();
-    return std::make_pair(host->get_netpoint(), nullptr);
+    return zone->create_host(std::to_string(id), "1Gf"); // the host itself is returned, and seal() is no longer called here
   }
 };
 
index 3dd5269..d953e00 100644 (file)
@@ -149,7 +149,7 @@ void sg_platf_new_disk(const simgrid::kernel::routing::DiskCreationArgs* disk)
 
 /*************************************************************************************************/
 /** @brief Auxiliary function to create hosts */
-static std::pair<simgrid::kernel::routing::NetPoint*, simgrid::kernel::routing::NetPoint*>
+static simgrid::s4u::Host*
 sg_platf_cluster_create_host(const simgrid::kernel::routing::ClusterCreationArgs* cluster, simgrid::s4u::NetZone* zone,
                              const std::vector<unsigned long>& /*coord*/, unsigned long id)
 {
@@ -160,11 +160,10 @@ sg_platf_cluster_create_host(const simgrid::kernel::routing::ClusterCreationArgs
 
   std::string host_id = cluster->prefix + std::to_string(cluster->radicals[id]) + cluster->suffix;
   XBT_DEBUG("Cluster: creating host=%s speed=%f", host_id.c_str(), cluster->speeds.front());
-  const simgrid::s4u::Host* host = zone->create_host(host_id, cluster->speeds)
-                                       ->set_core_count(cluster->core_amount)
-                                       ->set_properties(cluster->properties)
-                                       ->seal();
-  return std::make_pair(host->get_netpoint(), nullptr);
+  simgrid::s4u::Host* host = zone->create_host(host_id, cluster->speeds)
+                                 ->set_core_count(cluster->core_amount)
+                                 ->set_properties(cluster->properties);
+  return host;
 }
 
 /** @brief Auxiliary function to create loopback links */
@@ -210,7 +209,8 @@ static void sg_platf_new_cluster_hierarchical(const simgrid::kernel::routing::Cl
   using simgrid::kernel::routing::FatTreeZone;
   using simgrid::kernel::routing::TorusZone;
 
-  auto set_host = std::bind(sg_platf_cluster_create_host, cluster, _1, _2, _3);
+  std::function<simgrid::s4u::ClusterCallbacks::ClusterHostCb> set_host =
+    std::bind(sg_platf_cluster_create_host, cluster, _1, _2, _3);
   std::function<simgrid::s4u::ClusterCallbacks::ClusterLinkCb> set_loopback{};
   std::function<simgrid::s4u::ClusterCallbacks::ClusterLinkCb> set_limiter{};
 
index a0c16de..df630ee 100644 (file)
@@ -29,6 +29,10 @@ bool BarrierTransition::depends(const Transition* o) const
   if (o->type_ < type_)
     return o->depends(this);
 
+  // Actions executed by the same actor are always dependent
+  if (o->aid_ == aid_)
+    return true;
+
   if (const auto* other = dynamic_cast<const BarrierTransition*>(o)) {
     if (bar_ != other->bar_)
       return false;
index 051f11c..12fa915 100644 (file)
@@ -55,9 +55,9 @@ bool Mutex::try_lock()
  *
  * See @ref s4u_raii.
  */
-MutexPtr Mutex::create()
+MutexPtr Mutex::create(bool recursive)
 {
-  auto* mutex = new kernel::activity::MutexImpl();
+  auto* mutex = new kernel::activity::MutexImpl(recursive); // the kernel-side implementation stores the recursive flag
   return MutexPtr(&mutex->mutex(), false); // NOTE(review): 'false' presumably means that no reference is added here -- confirm
 }
 
index 48377bb..1917f0f 100644 (file)
@@ -96,8 +96,8 @@ void NetZone::add_route(const NetZone* src, const NetZone* dst, const std::vecto
   pimpl_->add_route(src ? src->get_netpoint() : nullptr, dst ? dst->get_netpoint(): nullptr,
                     src ? src->get_gateway() : nullptr, dst ? dst->get_gateway() : nullptr,
                     links_direct, false);
-  pimpl_->add_route(src ? src->get_netpoint() : nullptr, dst ? dst->get_netpoint(): nullptr,
-                    src ? src->get_gateway() : nullptr, dst ? dst->get_gateway() : nullptr,
+  pimpl_->add_route(dst ? dst->get_netpoint(): nullptr, src ? src->get_netpoint() : nullptr,
+                    dst ? dst->get_gateway() : nullptr, src ? src->get_gateway() : nullptr,
                     links_reverse, false);
 }
 
index 7b04687..bcc2f4e 100644 (file)
@@ -1,45 +1,34 @@
+#include <cstddef>
 #include <memory>
 #include <simgrid/Exception.hpp>
+#include <simgrid/s4u/Activity.hpp>
 #include <simgrid/s4u/Comm.hpp>
+#include <simgrid/s4u/Disk.hpp>
 #include <simgrid/s4u/Exec.hpp>
 #include <simgrid/s4u/Io.hpp>
 #include <simgrid/s4u/Task.hpp>
 #include <simgrid/simix.hpp>
+#include <string>
+#include <xbt/asserts.h>
 
 #include "src/simgrid/module.hpp"
 
-SIMGRID_REGISTER_PLUGIN(task, "Battery management", nullptr)
+// The description must present the task plugin: "Battery management" was a leftover from another plugin
+SIMGRID_REGISTER_PLUGIN(task, "Task management", nullptr)
-/**
-  @beginrst
-
-
-Tasks are designed to represent dataflows, i.e, graphs of Tasks.
-Tasks can only be instancied using either
-:cpp:func:`simgrid::s4u::ExecTask::init` or :cpp:func:`simgrid::s4u::CommTask::init`
-An ExecTask is an Execution Task. Its underlying Activity is an :ref:`Exec <API_s4u_Exec>`.
-A CommTask is a Communication Task. Its underlying Activity is a :ref:`Comm <API_s4u_Comm>`.
-
-
-
-  @endrst
- */
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plugin");
 
 namespace simgrid::s4u {
 
 Task::Task(const std::string& name) : name_(name) {}
 
-/**
- *  @brief Return True if the Task can start a new Activity.
- *  @note The Task is ready if not already doing something and there is at least one execution waiting in queue.
+/** @param instance The Task instance to check.
+ *  @brief Return true if this Task instance can start a new execution.
+ *  @note An instance is ready when it runs fewer executions than its parallelism degree and has at least one
+ *        queued firing.
+ *        NOTE(review): the counters are read with operator[], so an unknown instance name is presumably inserted
+ *        with default values if these members are map-like -- confirm.
  */
-bool Task::ready_to_run() const
+bool Task::ready_to_run(std::string instance)
 {
-  return running_instances_ < parallelism_degree_ && queued_firings_ > 0;
+  return running_instances_[instance] < parallelism_degree_[instance] && queued_firings_[instance] > 0;
 }
 
-/**
- *  @param source The sender.
+/** @param source The sender.
  *  @brief Receive a token from another Task.
  *  @note Check upon reception if the Task has received a token from each of its predecessors,
  * and in this case consumes those tokens and enqueue an execution.
@@ -47,12 +36,10 @@ bool Task::ready_to_run() const
 void Task::receive(Task* source)
 {
   XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
-  auto source_count = predecessors_[source];
   predecessors_[source]++;
-  if (tokens_received_.size() <= queued_firings_ + source_count)
-    tokens_received_.emplace_back();
-  tokens_received_[queued_firings_ + source_count][source] = source->token_;
-  bool enough_tokens                                       = true;
+  if (source->token_ != nullptr)
+    tokens_received_[source].push_back(source->token_);
+  bool enough_tokens = true;
   for (auto const& [key, val] : predecessors_)
     if (val < 1) {
       enough_tokens = false;
@@ -65,54 +52,88 @@ void Task::receive(Task* source)
   }
 }
 
-/**
- *  @brief Task routine when finishing an execution.
- *  @note Set its working status as false.
- * Add 1 to its count of finished executions.
- * Call the on_this_end func.
- * Fire on_end callback.
- * Send a token to each of its successors.
- * Start a new execution if possible.
+/** @param instance The Task instance to complete.
+ *  @brief Task routine when an execution of the given instance finishes.
+ *  @note The dispatcher instance enqueues a firing for the next instance, chosen by the load balancing function.
+ *        The collector instance triggers the on_completion signals and sends tokens to successors.
+ *        Other instances enqueue a firing of the collector instance.
+ *        In all cases, the completed instance is fired again if it is ready to run.
  */
-void Task::complete()
+void Task::complete(std::string instance)
 {
   xbt_assert(Actor::is_maestro());
-  running_instances_--;
-  count_++;
-  on_this_completion(this);
-  on_completion(this);
-  for (auto const& t : successors_)
-    t->receive(this);
-  if (ready_to_run())
-    fire();
+  running_instances_[instance]--;
+  count_[instance]++;
+  if (instance == "collector") {
+    on_this_completion(this);
+    on_completion(this);
+    for (auto const& t : successors_)
+      t->receive(this);
+  } else if (instance == "dispatcher") {
+    auto next_instance = load_balancing_function_();
+    xbt_assert(next_instance != "dispatcher" and next_instance != "collector", "Invalid instance selected: %s",
+               next_instance.c_str());
+    queued_firings_[next_instance] = queued_firings_.at(next_instance) + 1;
+    while (ready_to_run(next_instance))
+      fire(next_instance);
+  } else {
+    queued_firings_["collector"]++;
+    while (ready_to_run("collector"))
+      fire("collector");
+  }
+  if (ready_to_run(instance))
+    fire(instance);
 }
 
-/** @param n The new parallelism degree of the Task.
- *  @brief Set the parallelism degree of the Task to inscrease or decrease horizontal scaling.
- *  @note When increasing the degree the function starts new instances if there is queued firings.
- *        When decreasing the degree the function does NOT stop running instances.
-
+/** @param n The new parallelism degree of the Task instance (must be above 0).
+ *  @param instance The Task instance to modify.
+ *  @brief Set the parallelism degree of a Task instance.
+ *  @note You can use instance "all" to modify the parallelism degree of all instances of this Task.
+ *        When increasing the degree, new executions are started if there are queued firings.
+ *        When decreasing the degree, instances already running are NOT stopped.
  */
-void Task::set_parallelism_degree(int n)
+void Task::set_parallelism_degree(int n, std::string instance)
 {
-  xbt_assert(n > 0, "Parallelism degree of Tasks must be above 0.");
-  simgrid::kernel::actor::simcall_answered([this, n] {
-    parallelism_degree_ = n;
-    while (ready_to_run())
-      fire();
+  xbt_assert(n > 0, "Parallelism degree must be above 0.");
+  simgrid::kernel::actor::simcall_answered([this, n, &instance] {
+    if (instance == "all") {
+      for (auto& [key, value] : parallelism_degree_) {
+        parallelism_degree_[key] = n;
+        while (ready_to_run(key))
+          fire(key);
+      }
+    } else {
+      parallelism_degree_[instance] = n;
+      while (ready_to_run(instance))
+        fire(instance);
+    }
   });
 }
 
+/** @param bytes The internal bytes of the Task instance.
+ *  @param instance The Task instance to modify.
+ *  @note Internal bytes are used for Comms between the dispatcher and instance_n,
+ *        and between instance_n and the collector if they are not on the same host.
+ */
+void Task::set_internal_bytes(int bytes, std::string instance)
+{
+  simgrid::kernel::actor::simcall_answered([this, bytes, &instance] { internal_bytes_to_send_[instance] = bytes; });
+}
+
+/** @param func The load balancing function.
+ *  @note The dispatcher uses this function to determine which instance to trigger next.
+ */
+void Task::set_load_balancing_function(std::function<std::string()> func)
+{
+  simgrid::kernel::actor::simcall_answered([this, func] { load_balancing_function_ = func; });
+}
+
 /** @param n The number of firings to enqueue.
- *  @brief Enqueue firing.
- *  @note Immediatly fire an activity if possible.
+ *  @brief Enqueue firings on the "dispatcher" instance.
+ *  @note The dispatcher is fired immediately, for as long as it is ready to run.
  */
 void Task::enqueue_firings(int n)
 {
   simgrid::kernel::actor::simcall_answered([this, n] {
-    queued_firings_ += n;
-    while (ready_to_run())
-      fire();
+    queued_firings_["dispatcher"] += n;
+    while (ready_to_run("dispatcher"))
+      fire("dispatcher");
   });
 }
 
@@ -125,46 +146,45 @@ void Task::set_name(std::string name)
 }
 
 /** @param amount The amount to set.
- *  @brief Set the amout of work to do.
+ *  @param instance The Task instance to modify.
  *  @note Amount in flop for ExecTask and in bytes for CommTask.
  */
-void Task::set_amount(double amount)
+void Task::set_amount(double amount, std::string instance)
 {
-  simgrid::kernel::actor::simcall_answered([this, amount] { amount_ = amount; });
+  simgrid::kernel::actor::simcall_answered([this, amount, &instance] { amount_[instance] = amount; });
 }
 
 /** @param token The token to set.
  *  @brief Set the token to send to successors.
- *  @note The token is passed to each successor after the task end, i.e., after the on_end callback.
+ *  @note The token is passed to each successor after the Task instance collector end, i.e., after the on_completion
+ * callback.
  */
 void Task::set_token(std::shared_ptr<Token> token)
 {
   simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; });
 }
 
-/** @return Map of tokens received for the next execution.
- *  @note If there is no queued execution for this task the map might not exist or be partially empty.
+/** @param t The Task to deque a token from.
+ *  @brief Discard the first token in the queue of tokens received from the given Task.
+ *  @note NOTE(review): pop_front() is called unconditionally, so at least one token presumably must have been
+ *        received from t beforehand -- confirm against the callers.
  */
-std::shared_ptr<Token> Task::get_next_token_from(TaskPtr t)
+void Task::deque_token_from(TaskPtr t)
 {
-  return tokens_received_.front()[t];
+  simgrid::kernel::actor::simcall_answered([this, &t] { tokens_received_[t].pop_front(); });
 }
 
-void Task::fire()
+void Task::fire(std::string instance)
 {
-  if ((int)current_activities_.size() > parallelism_degree_) {
-    current_activities_.pop_front();
+  if ((int)current_activities_[instance].size() > parallelism_degree_[instance]) { // more activities stored than the parallelism degree
+    current_activities_[instance].pop_front(); // drop the first stored activity of this instance
+  }
+  if (instance != "dispatcher" and instance != "collector") { // the start signals are only emitted for regular instances
+    on_this_start(this);
+    on_start(this);
   }
-  on_this_start(this);
-  on_start(this);
-  running_instances_++;
-  queued_firings_ = std::max(queued_firings_ - 1, 0);
-  if (not tokens_received_.empty())
-    tokens_received_.pop_front();
+  running_instances_[instance]++;
+  queued_firings_[instance] = std::max(queued_firings_[instance] - 1, 0); // consume one queued firing, never going below 0
 }
 
-/** @param successor The Task to add.
- *  @brief Add a successor to this Task.
+/** @param successor The Task to add as a successor.
  *  @note It also adds this as a predecessor of successor.
  */
 void Task::add_successor(TaskPtr successor)
@@ -175,8 +195,7 @@ void Task::add_successor(TaskPtr successor)
   });
 }
 
-/** @param successor The Task to remove.
- *  @brief Remove a successor from this Task.
+/** @param successor The Task to remove from the successors of this Task.
  *  @note It also remove this from the predecessors of successor.
  */
 void Task::remove_successor(TaskPtr successor)
@@ -187,6 +206,8 @@ void Task::remove_successor(TaskPtr successor)
   });
 }
 
+/** @brief Remove all successors from this Task.
+ */
 void Task::remove_all_successors()
 {
   simgrid::kernel::actor::simcall_answered([this] {
@@ -198,12 +219,58 @@ void Task::remove_all_successors()
   });
 }
 
+/** @param n The number of instances to add to this Task (>=0).
+ *  @note Instances are always named instance_0 to instance_(k-1),
+ *        where k is the current number of instances.
+ */
+void Task::add_instances(int n)
+{
+  xbt_assert(n >= 0, "Cannot add a negative number of instances (provided: %d)", n);
+  int instance_count = (int)amount_.size() - 2; // -2: presumably skips the "dispatcher" and "collector" entries
+  for (int i = instance_count; i < n + instance_count; i++) {
+    amount_["instance_" + std::to_string(i)]                 = amount_.at("instance_0");
+    queued_firings_["instance_" + std::to_string(i)]         = 0;
+    running_instances_["instance_" + std::to_string(i)]      = 0;
+    count_["instance_" + std::to_string(i)]                  = 0;
+    parallelism_degree_["instance_" + std::to_string(i)]     = parallelism_degree_.at("instance_0");
+    current_activities_["instance_" + std::to_string(i)]     = {};
+    internal_bytes_to_send_["instance_" + std::to_string(i)] = internal_bytes_to_send_.at("instance_0");
+  }
+}
+
+/** @param n The number of instances to remove from this Task (>=0).
+ *  @note Instances are always named instance_0 to instance_(k-1),
+ *        where k is the current number of instances.
+ *        Running instances cannot be removed.
+ */
+void Task::remove_instances(int n)
+{
+  int instance_count = (int)amount_.size() - 2; // -2: presumably skips the "dispatcher" and "collector" entries
+  xbt_assert(n >= 0, "Cannot remove a negative number of instances (provided: %d)", n);
+  xbt_assert(instance_count - n > 0, "The number of instances must be above 0 (instances: %d, provided: %d)",
+             instance_count, n);
+  for (int i = instance_count - 1; i >= instance_count - n; i--) {
+    xbt_assert(running_instances_.at("instance_" + std::to_string(i)) == 0,
+               "Cannot remove a running instance (instances: %d)", i);
+    amount_.erase("instance_" + std::to_string(i));
+    queued_firings_.erase("instance_" + std::to_string(i));
+    running_instances_.erase("instance_" + std::to_string(i));
+    count_.erase("instance_" + std::to_string(i));
+    parallelism_degree_.erase("instance_" + std::to_string(i));
+    current_activities_.erase("instance_" + std::to_string(i));
+    internal_bytes_to_send_.erase("instance_" + std::to_string(i)); // mirror add_instances(): leave no stale entry
+  }
+}
+
 /**
  *  @brief Default constructor.
  */
-ExecTask::ExecTask(const std::string& name) : Task(name) {}
+ExecTask::ExecTask(const std::string& name) : Task(name)
+{
+  set_load_balancing_function([]() { return "instance_0"; });
+}
 
-/** @ingroup plugin_task
+/**
  *  @brief Smart Constructor.
  */
 ExecTaskPtr ExecTask::init(const std::string& name)
@@ -211,7 +278,7 @@ ExecTaskPtr ExecTask::init(const std::string& name)
   return ExecTaskPtr(new ExecTask(name));
 }
 
-/** @ingroup plugin_task
+/**
  *  @brief Smart Constructor.
  */
 ExecTaskPtr ExecTask::init(const std::string& name, double flops, Host* host)
@@ -219,45 +286,109 @@ ExecTaskPtr ExecTask::init(const std::string& name, double flops, Host* host)
   return init(name)->set_flops(flops)->set_host(host);
 }
 
-/**
- *  @brief Do one execution of the Task.
- *  @note Call the on_this_start() func.
- *  Init and start the underlying Activity.
+/** @param instance The Task instance to fire.
+ *  @note The on_start signals are only triggered for regular instances, not for the dispatcher or the collector.
+ *        Comms are created if hosts differ between dispatcher and the instance to fire,
+ *        or between the instance and the collector.
  */
-void ExecTask::fire()
+void ExecTask::fire(std::string instance)
 {
-  Task::fire();
-  auto exec = Exec::init()->set_name(get_name())->set_flops_amount(get_amount())->set_host(host_);
-  exec->start();
-  exec->on_this_completion_cb([this](Exec const&) { complete(); });
-  store_activity(exec);
+  Task::fire(instance);
+  if (instance == "dispatcher" or instance == "collector") {
+    auto exec = Exec::init()
+                    ->set_name(get_name() + "_" + instance)
+                    ->set_flops_amount(get_amount(instance))
+                    ->set_host(host_[instance]);
+    exec->start();
+    exec->on_this_completion_cb([this, instance](Exec const&) { complete(instance); });
+    store_activity(exec, instance);
+  } else {
+    auto exec = Exec::init()->set_name(get_name())->set_flops_amount(get_amount())->set_host(host_[instance]);
+    if (host_["dispatcher"] == host_[instance]) {
+      exec->start();
+      store_activity(exec, instance);
+    } else {
+      auto comm = Comm::sendto_init(host_["dispatcher"], host_[instance])
+                      ->set_name(get_name() + "_dispatcher_to_" + instance)
+                      ->set_payload_size(get_internal_bytes("dispatcher"));
+      comm->add_successor(exec);
+      comm->start();
+      store_activity(comm, instance);
+    }
+    if (host_[instance] == host_["collector"]) {
+      exec->on_this_completion_cb([this, instance](Exec const&) { complete(instance); });
+      if (host_["dispatcher"] != host_[instance])
+        store_activity(exec, instance);
+    } else {
+      auto comm = Comm::sendto_init(host_[instance], host_["collector"])
+                      ->set_name(get_name() + instance + "_to_collector")
+                      ->set_payload_size(get_internal_bytes(instance));
+      exec->add_successor(comm);
+      comm->on_this_completion_cb([this, instance](Comm const&) { complete(instance); });
+      comm.detach();
+    }
+  }
 }
 
-/** @ingroup plugin_task
- *  @param host The host to set.
+/** @param host The host to set.
+ *  @param instance The Task instance to modify.
  *  @brief Set a new host.
  */
-ExecTaskPtr ExecTask::set_host(Host* host)
+ExecTaskPtr ExecTask::set_host(Host* host, std::string instance)
 {
-  kernel::actor::simcall_answered([this, host] { host_ = host; });
+  kernel::actor::simcall_answered([this, host, &instance] {
+    if (instance == "all")
+      for (auto& [key, value] : host_)
+        host_[key] = host;
+    else
+      host_[instance] = host;
+  });
   return this;
 }
 
-/** @ingroup plugin_task
- *  @param flops The amount of flops to set.
+/** @param flops The amount of flops to set.
+ *  @param instance The Task instance to modify.
  */
-ExecTaskPtr ExecTask::set_flops(double flops)
+ExecTaskPtr ExecTask::set_flops(double flops, std::string instance)
 {
-  kernel::actor::simcall_answered([this, flops] { set_amount(flops); });
+  kernel::actor::simcall_answered([this, flops, &instance] { set_amount(flops, instance); });
   return this;
 }
 
+/** @param n The number of instances to add to this Task (>=0).
+    @note Instances are always named instance_0 to instance_(k-1),
+          where k is the current number of instances.
+ */
+void ExecTask::add_instances(int n)
+{
+  Task::add_instances(n);
+  int instance_count = (int)host_.size() - 2;
+  for (int i = instance_count; i < n + instance_count; i++)
+    host_["instance_" + std::to_string(i)] = host_.at("instance_0");
+}
+
+/** @param n The number of instances to remove from this Task (>=0).
+    @note Instances are always named instance_0 to instance_(k-1),
+          where k is the current number of instances.
+          Running instance cannot be removed.
+ */
+void ExecTask::remove_instances(int n)
+{
+  Task::remove_instances(n);
+  int instance_count = (int)host_.size() - 2;
+  for (int i = instance_count - 1; i >= instance_count - n; i--)
+    host_.erase("instance_" + std::to_string(i));
+}
+
 /**
  *  @brief Default constructor.
  */
-CommTask::CommTask(const std::string& name) : Task(name) {}
+CommTask::CommTask(const std::string& name) : Task(name)
+{
+  set_load_balancing_function([]() { return "instance_0"; });
+}
 
-/** @ingroup plugin_task
+/**
  *  @brief Smart constructor.
  */
 CommTaskPtr CommTask::init(const std::string& name)
@@ -265,7 +396,7 @@ CommTaskPtr CommTask::init(const std::string& name)
   return CommTaskPtr(new CommTask(name));
 }
 
-/** @ingroup plugin_task
+/**
  *  @brief Smart constructor.
  */
 CommTaskPtr CommTask::init(const std::string& name, double bytes, Host* source, Host* destination)
@@ -273,21 +404,29 @@ CommTaskPtr CommTask::init(const std::string& name, double bytes, Host* source,
   return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination);
 }
 
-/**
- *  @brief Do one execution of the Task.
- *  @note Call the on_this_start() func.
- *  Init and start the underlying Activity.
+/** @param instance The Task instance to fire.
+ *  @note The on_start signals are only triggered for regular instances, not for the dispatcher or the collector.
  */
-void CommTask::fire()
+void CommTask::fire(std::string instance)
 {
-  Task::fire();
-  auto comm = Comm::sendto_init(source_, destination_)->set_name(get_name())->set_payload_size(get_amount());
-  comm->start();
-  comm->on_this_completion_cb([this](Comm const&) { complete(); });
-  store_activity(comm);
+  Task::fire(instance);
+  if (instance == "dispatcher" or instance == "collector") {
+    auto exec = Exec::init()
+                    ->set_name(get_name() + "_" + instance)
+                    ->set_flops_amount(get_amount(instance))
+                    ->set_host(instance == "dispatcher" ? source_ : destination_);
+    exec->start();
+    exec->on_this_completion_cb([this, instance](Exec const&) { complete(instance); });
+    store_activity(exec, instance);
+  } else {
+    auto comm = Comm::sendto_init(source_, destination_)->set_name(get_name())->set_payload_size(get_amount());
+    comm->start();
+    comm->on_this_completion_cb([this, instance](Comm const&) { complete(instance); });
+    store_activity(comm, instance);
+  }
 }
 
-/** @ingroup plugin_task
+/**
  *  @param source The host to set.
  *  @brief Set a new source host.
  */
@@ -297,7 +436,7 @@ CommTaskPtr CommTask::set_source(Host* source)
   return this;
 }
 
-/** @ingroup plugin_task
+/**
  *  @param destination The host to set.
  *  @brief Set a new destination host.
  */
@@ -307,7 +446,7 @@ CommTaskPtr CommTask::set_destination(Host* destination)
   return this;
 }
 
-/** @ingroup plugin_task
+/**
  *  @param bytes The amount of bytes to set.
  */
 CommTaskPtr CommTask::set_bytes(double bytes)
@@ -319,9 +458,12 @@ CommTaskPtr CommTask::set_bytes(double bytes)
 /**
  *  @brief Default constructor.
  */
-IoTask::IoTask(const std::string& name) : Task(name) {}
+IoTask::IoTask(const std::string& name) : Task(name)
+{
+  set_load_balancing_function([]() { return "instance_0"; });
+}
 
-/** @ingroup plugin_task
+/**
  *  @brief Smart Constructor.
  */
 IoTaskPtr IoTask::init(const std::string& name)
@@ -329,7 +471,7 @@ IoTaskPtr IoTask::init(const std::string& name)
   return IoTaskPtr(new IoTask(name));
 }
 
-/** @ingroup plugin_task
+/**
  *  @brief Smart Constructor.
  */
 IoTaskPtr IoTask::init(const std::string& name, double bytes, Disk* disk, Io::OpType type)
@@ -337,9 +479,8 @@ IoTaskPtr IoTask::init(const std::string& name, double bytes, Disk* disk, Io::Op
   return init(name)->set_bytes(bytes)->set_disk(disk)->set_op_type(type);
 }
 
-/** @ingroup plugin_task
+/**
  *  @param disk The disk to set.
- *  @brief Set a new disk.
  */
 IoTaskPtr IoTask::set_disk(Disk* disk)
 {
@@ -347,7 +488,7 @@ IoTaskPtr IoTask::set_disk(Disk* disk)
   return this;
 }
 
-/** @ingroup plugin_task
+/**
  *  @param bytes The amount of bytes to set.
  */
 IoTaskPtr IoTask::set_bytes(double bytes)
@@ -356,20 +497,34 @@ IoTaskPtr IoTask::set_bytes(double bytes)
   return this;
 }
 
-/** @ingroup plugin_task */
+/**
+ *  @param type The op type to set.
+ */
 IoTaskPtr IoTask::set_op_type(Io::OpType type)
 {
   kernel::actor::simcall_answered([this, type] { type_ = type; });
   return this;
 }
 
-void IoTask::fire()
+/** @param instance The Task instance to fire.
+ *  @note The on_start signals are only triggered for regular instances, not for the dispatcher or the collector.
+ */
+void IoTask::fire(std::string instance)
 {
-  Task::fire();
-  auto io = Io::init()->set_name(get_name())->set_size(get_amount())->set_disk(disk_)->set_op_type(type_);
-  io->start();
-  io->on_this_completion_cb([this](Io const&) { complete(); });
-  store_activity(io);
+  Task::fire(instance);
+  if (instance == "dispatcher" or instance == "collector") {
+    auto exec = Exec::init()
+                    ->set_name(get_name() + "_" + instance)
+                    ->set_flops_amount(get_amount(instance))
+                    ->set_host(disk_->get_host());
+    exec->start();
+    exec->on_this_completion_cb([this, instance](Exec const&) { complete(instance); });
+    store_activity(exec, instance);
+  } else {
+    auto io = Io::init()->set_name(get_name())->set_size(get_amount())->set_disk(disk_)->set_op_type(type_);
+    io->start();
+    io->on_this_completion_cb([this, instance](Io const&) { complete(instance); });
+    store_activity(io, instance);
+  }
 }
-
 } // namespace simgrid::s4u
index 10dbe7b..25e4a16 100644 (file)
@@ -10,7 +10,7 @@
 
 /** Config Globals */
 
-XBT_PUBLIC_DATA int _sg_cfg_init_status;
+XBT_PUBLIC_DATA int _sg_cfg_init_status; /* 0: not inited; 1: config module inited; 2: root zone of platform created */
 
 XBT_PUBLIC void sg_config_init(int* argc, char** argv);
 XBT_PUBLIC void sg_config_finalize();
index 94bb62b..8cf9b41 100644 (file)
@@ -381,6 +381,8 @@ int PMPI_Sendrecv(const void* sendbuf, int sendcount, MPI_Datatype sendtype, int
   CHECK_TYPE(8, recvtype)
   CHECK_BUFFER(1, sendbuf, sendcount, sendtype)
   CHECK_BUFFER(6, recvbuf, recvcount, recvtype)
+  CHECK_ARGS(sendbuf == recvbuf && sendcount > 0 && recvcount > 0, MPI_ERR_BUFFER,
+             "%s: Invalid parameters 1 and 6: sendbuf and recvbuf must be disjoint", __func__);
   CHECK_TAG(10, recvtag)
   CHECK_COMM(11)
   const SmpiBenchGuard suspend_bench;
index f6194db..cb097d2 100644 (file)
@@ -68,8 +68,8 @@ public:
   smpi_trace_call_location_t* call_location();
   void set_privatized_region(smpi_privatization_region_t region);
   smpi_privatization_region_t privatized_region() const;
-  s4u::Mailbox* mailbox() const { return mailbox_; }
-  s4u::Mailbox* mailbox_small() const { return mailbox_small_; }
+  s4u::Mailbox* mailbox();
+  s4u::Mailbox* mailbox_small();
   s4u::MutexPtr mailboxes_mutex() const;
 #if HAVE_PAPI
   int papi_event_set() const;
index 83bb798..2cd94c9 100644 (file)
@@ -26,8 +26,8 @@ ActorExt::ActorExt(s4u::Actor* actor) : actor_(actor)
   if (not simgrid::smpi::ActorExt::EXTENSION_ID.valid())
     simgrid::smpi::ActorExt::EXTENSION_ID = simgrid::s4u::Actor::extension_create<simgrid::smpi::ActorExt>();
 
-  mailbox_         = s4u::Mailbox::by_name("SMPI-" + std::to_string(actor_->get_pid()));
-  mailbox_small_   = s4u::Mailbox::by_name("small-" + std::to_string(actor_->get_pid()));
+  mailbox_         = nullptr;
+  mailbox_small_   = nullptr;
   mailboxes_mutex_ = s4u::Mutex::create();
   timer_           = xbt_os_timer_new();
   state_           = SmpiProcessState::UNINITIALIZED;
@@ -91,6 +91,22 @@ int ActorExt::initialized() const
   return (state_ == SmpiProcessState::INITIALIZED);
 }
 
+/** @brief Return main mailbox of the process */
+s4u::Mailbox* ActorExt::mailbox()
+{
+  if(mailbox_==nullptr)
+    mailbox_=s4u::Mailbox::by_name("SMPI-" + std::to_string(actor_->get_pid()));
+  return mailbox_;
+}
+
+/** @brief Return mailbox for small messages */
+s4u::Mailbox* ActorExt::mailbox_small()
+{
+  if(mailbox_small_==nullptr)
+    mailbox_small_=s4u::Mailbox::by_name("small-" + std::to_string(actor_->get_pid()));
+  return mailbox_small_;
+}
+
 /** @brief Mark a process as initialized (=MPI_Init called) */
 void ActorExt::mark_as_initialized()
 {
@@ -242,7 +258,7 @@ void ActorExt::init()
   ext->comm_world_ = smpi_deployment_comm_world(ext->instance_id_);
 
   // set the process attached to the mailbox
-  ext->mailbox_small_->set_receiver(ext->actor_);
+  ext->mailbox_small()->set_receiver(ext->actor_);
   XBT_DEBUG("<%ld> SMPI process has been initialized: %p", ext->actor_->get_pid(), ext->actor_);
 }
 
index 6e38668..25e9b52 100644 (file)
@@ -51,6 +51,7 @@ struct ObjectOwner {
   simgrid::kernel::actor::ActorImpl* owner = nullptr;
   const char* file                         = nullptr;
   int line                                 = -1;
+  int recursive_depth                      = 0;
   explicit ObjectOwner(simgrid::kernel::actor::ActorImpl* o) : owner(o) {}
 };
 
@@ -73,7 +74,7 @@ static ObjectOwner* get_owner(void* object)
   return o;
 }
 
-int sthread_access_begin(void* objaddr, const char* objname, const char* file, int line)
+int sthread_access_begin(void* objaddr, const char* objname, const char* file, int line, const char* func)
 {
   sthread_disable();
   auto* self = simgrid::kernel::actor::ActorImpl::self();
@@ -83,10 +84,23 @@ int sthread_access_begin(void* objaddr, const char* objname, const char* file, i
       [self, objaddr, objname, file, line]() -> bool {
         XBT_INFO("%s takes %s", self->get_cname(), objname);
         auto* ownership = get_owner(objaddr);
-        if (ownership->owner != nullptr) {
+        if (ownership->owner == self) {
+          ownership->recursive_depth++;
+          return true;
+        } else if (ownership->owner != nullptr) {
           auto msg = std::string("Unprotected concurent access to ") + objname + ": " + ownership->owner->get_name();
-          if (not xbt_log_no_loc)
+          if (not xbt_log_no_loc) {
             msg += simgrid::xbt::string_printf(" at %s:%d", ownership->file, ownership->line);
+            if (ownership->recursive_depth > 1) {
+              // Pluralize inside the parenthesis: "(and 1 other location)" vs "(and 2 other locations)"
+              const int others = ownership->recursive_depth - 1;
+              msg += simgrid::xbt::string_printf(" (and %d other location%s)", others, others == 1 ? "" : "s");
+            }
+          } else {
+            msg += simgrid::xbt::string_printf(" from %d location", ownership->recursive_depth);
+            if (ownership->recursive_depth != 1)
+              msg += "s";
+          }
           msg += " vs " + self->get_name();
           if (xbt_log_no_loc)
             msg += std::string(" (locations hidden because of --log=no_loc).");
@@ -98,6 +112,7 @@ int sthread_access_begin(void* objaddr, const char* objname, const char* file, i
         ownership->owner = self;
         ownership->file  = file;
         ownership->line  = line;
+        ownership->recursive_depth = 1;
         return true;
       },
       &observer);
@@ -106,7 +121,7 @@ int sthread_access_begin(void* objaddr, const char* objname, const char* file, i
   sthread_enable();
   return true;
 }
-void sthread_access_end(void* objaddr, const char* objname, const char* file, int line)
+void sthread_access_end(void* objaddr, const char* objname, const char* file, int line, const char* func)
 {
   sthread_disable();
   auto* self = simgrid::kernel::actor::ActorImpl::self();
@@ -116,9 +131,12 @@ void sthread_access_end(void* objaddr, const char* objname, const char* file, in
       [self, objaddr, objname]() -> void {
         XBT_INFO("%s releases %s", self->get_cname(), objname);
         auto* ownership = get_owner(objaddr);
-        xbt_assert(ownership->owner == self, "safety check failed: %s is not owner of the object it's releasing.",
-                   self->get_cname());
-        ownership->owner = nullptr;
+        xbt_assert(ownership->owner == self,
+                   "safety check failed: %s is not owner of the object it's releasing. That object owned by %s.",
+                   self->get_cname(), (ownership->owner == nullptr ? "nobody" : ownership->owner->get_cname()));
+        ownership->recursive_depth--;
+        if (ownership->recursive_depth == 0)
+          ownership->owner = nullptr;
       },
       &observer);
   sthread_enable();
index ee9e084..2005f29 100644 (file)
@@ -28,6 +28,12 @@ static int (*raw_mutex_trylock)(pthread_mutex_t*);
 static int (*raw_mutex_unlock)(pthread_mutex_t*);
 static int (*raw_mutex_destroy)(pthread_mutex_t*);
 
+static int (*raw_pthread_mutexattr_init)(pthread_mutexattr_t*);
+static int (*raw_pthread_mutexattr_settype)(pthread_mutexattr_t*, int);
+static int (*raw_pthread_mutexattr_gettype)(const pthread_mutexattr_t* restrict, int* restrict);
+static int (*raw_pthread_mutexattr_getrobust)(const pthread_mutexattr_t*, int*);
+static int (*raw_pthread_mutexattr_setrobust)(pthread_mutexattr_t*, int);
+
 static unsigned int (*raw_sleep)(unsigned int);
 static int (*raw_usleep)(useconds_t);
 static int (*raw_gettimeofday)(struct timeval*, void*);
@@ -50,6 +56,12 @@ static void intercepter_init()
   raw_mutex_unlock   = dlsym(RTLD_NEXT, "pthread_mutex_unlock");
   raw_mutex_destroy  = dlsym(RTLD_NEXT, "pthread_mutex_destroy");
 
+  raw_pthread_mutexattr_init      = dlsym(RTLD_NEXT, "pthread_mutexattr_init");
+  raw_pthread_mutexattr_settype   = dlsym(RTLD_NEXT, "pthread_mutexattr_settype");
+  raw_pthread_mutexattr_gettype   = dlsym(RTLD_NEXT, "pthread_mutexattr_gettype");
+  raw_pthread_mutexattr_getrobust = dlsym(RTLD_NEXT, "pthread_mutexattr_getrobust");
+  raw_pthread_mutexattr_setrobust = dlsym(RTLD_NEXT, "pthread_mutexattr_setrobust");
+
   raw_sleep        = dlsym(RTLD_NEXT, "sleep");
   raw_usleep       = dlsym(RTLD_NEXT, "usleep");
   raw_gettimeofday = dlsym(RTLD_NEXT, "gettimeofday");
@@ -85,6 +97,32 @@ int pthread_create(pthread_t* thread, const pthread_attr_t* attr, void* (*start_
   sthread_enable();
   return res;
 }
+
+#define _STHREAD_CONCAT(a, b) a##b
+#define intercepted_call(name, raw_params, call_params, sim_params)                                                    \
+  int _STHREAD_CONCAT(pthread_, name) raw_params                                                                       \
+  {                                                                                                                    \
+    if (_STHREAD_CONCAT(raw_pthread_, name) == NULL)                                                                   \
+      intercepter_init();                                                                                              \
+    if (sthread_inside_simgrid)                                                                                        \
+      return _STHREAD_CONCAT(raw_pthread_, name) call_params;                                                          \
+                                                                                                                       \
+    sthread_disable();                                                                                                 \
+    int res = _STHREAD_CONCAT(sthread_, name) sim_params;                                                              \
+    sthread_enable();                                                                                                  \
+    return res;                                                                                                        \
+  }
+
+intercepted_call(mutexattr_init, (pthread_mutexattr_t * attr), (attr), ((sthread_mutexattr_t*)attr));
+intercepted_call(mutexattr_settype, (pthread_mutexattr_t * attr, int type), (attr, type),
+                 ((sthread_mutexattr_t*)attr, type));
+intercepted_call(mutexattr_gettype, (const pthread_mutexattr_t* restrict attr, int* type), (attr, type),
+                 ((sthread_mutexattr_t*)attr, type));
+intercepted_call(mutexattr_setrobust, (pthread_mutexattr_t* restrict attr, int robustness), (attr, robustness),
+                 ((sthread_mutexattr_t*)attr, robustness));
+intercepted_call(mutexattr_getrobust, (const pthread_mutexattr_t* restrict attr, int* restrict robustness),
+                 (attr, robustness), ((sthread_mutexattr_t*)attr, robustness));
+
 int pthread_join(pthread_t thread, void** retval)
 {
   if (raw_pthread_join == NULL)
@@ -107,7 +145,7 @@ int pthread_mutex_init(pthread_mutex_t* mutex, const pthread_mutexattr_t* attr)
     return raw_mutex_init(mutex, attr);
 
   sthread_disable();
-  int res = sthread_mutex_init((sthread_mutex_t*)mutex, attr);
+  int res = sthread_mutex_init((sthread_mutex_t*)mutex, (sthread_mutexattr_t*)attr);
   sthread_enable();
   return res;
 }
index 6527968..f0a9117 100644 (file)
@@ -31,10 +31,22 @@ typedef unsigned long int sthread_t;
 int sthread_create(sthread_t* thread, const /*pthread_attr_t*/ void* attr, void* (*start_routine)(void*), void* arg);
 int sthread_join(sthread_t thread, void** retval);
 
+typedef struct {
+  unsigned recursive : 1;
+  unsigned errorcheck : 1;
+  unsigned robust : 1;
+} sthread_mutexattr_t;
+
+int sthread_mutexattr_init(sthread_mutexattr_t* attr);
+int sthread_mutexattr_settype(sthread_mutexattr_t* attr, int type);
+int sthread_mutexattr_gettype(const sthread_mutexattr_t* attr, int* type);
+int sthread_mutexattr_getrobust(const sthread_mutexattr_t* attr, int* robustness);
+int sthread_mutexattr_setrobust(sthread_mutexattr_t* attr, int robustness);
+
 typedef struct {
   void* mutex;
 } sthread_mutex_t;
-int sthread_mutex_init(sthread_mutex_t* mutex, const /*pthread_mutexattr_t*/ void* attr);
+int sthread_mutex_init(sthread_mutex_t* mutex, const sthread_mutexattr_t* attr);
 int sthread_mutex_lock(sthread_mutex_t* mutex);
 int sthread_mutex_trylock(sthread_mutex_t* mutex);
 int sthread_mutex_unlock(sthread_mutex_t* mutex);
@@ -53,8 +65,8 @@ int sthread_sem_timedwait(sthread_sem_t* sem, const struct timespec* abs_timeout
 int sthread_gettimeofday(struct timeval* tv);
 void sthread_sleep(double seconds);
 
-int sthread_access_begin(void* objaddr, const char* objname, const char* file, int line);
-void sthread_access_end(void* objaddr, const char* objname, const char* file, int line);
+int sthread_access_begin(void* objaddr, const char* objname, const char* file, int line, const char* function);
+void sthread_access_end(void* objaddr, const char* objname, const char* file, int line, const char* function);
 
 #if defined(__cplusplus)
 }
index 45ba730..de2667b 100644 (file)
@@ -6,6 +6,9 @@
 /* SimGrid's pthread interposer. Actual implementation of the symbols (see the comment in sthread.h) */
 
 #include "smpi/smpi.h"
+#include "xbt/asserts.h"
+#include "xbt/ex.h"
+#include "xbt/log.h"
 #include "xbt/string.hpp"
 #include <simgrid/actor.h>
 #include <simgrid/s4u/Actor.hpp>
@@ -38,14 +41,22 @@ int sthread_main(int argc, char** argv, char** envp, int (*raw_main)(int, char**
 {
   /* Do not intercept the main when run from SMPI: it will initialize the simulation properly */
   for (int i = 0; envp[i] != nullptr; i++)
-    if (std::string_view(envp[i]).rfind("SMPI_GLOBAL_SIZE", 0) == 0)
+    if (std::string_view(envp[i]).rfind("SMPI_GLOBAL_SIZE", 0) == 0) {
+      printf("sthread refuses to intercept the SMPI application %s directly, as its interception is done otherwise.\n",
+             argv[0]);
       return raw_main(argc, argv, envp);
+    }
 
-  /* If not in SMPI, the old main becomes an actor in a newly created simulation */
-  std::ostringstream id;
-  id << std::this_thread::get_id();
+  /* Do not intercept valgrind step 1 */
+  if (not strcmp(argv[0], "/usr/bin/valgrind.bin") || not strcmp(argv[0], "/bin/sh")) {
+    printf("sthread refuses to intercept the execution of %s. Running the application unmodified.\n", argv[0]);
+    fflush(stdout);
+    return raw_main(argc, argv, envp);
+  }
 
-  XBT_DEBUG("sthread main() is starting in thread %s", id.str().c_str());
+  /* If not in SMPI, the old main becomes an actor in a newly created simulation */
+  printf("sthread is intercepting the execution of %s\n", argv[0]);
+  fflush(stdout);
 
   sg4::Engine e(&argc, argv);
   auto* zone = sg4::create_full_zone("world");
@@ -56,7 +67,6 @@ int sthread_main(int argc, char** argv, char** envp, int (*raw_main)(int, char**
   sthread_enable();
   sg4::ActorPtr main_actor = sg4::Actor::create("main thread", lilibeth, raw_main, argc, argv, envp);
 
-  XBT_INFO("Starting the simulation.");
   sg4::Engine::get_instance()->run();
   sthread_disable();
   XBT_INFO("All threads exited. Terminating the simulation.");
@@ -108,9 +118,57 @@ int sthread_join(sthread_t thread, void** /*retval*/)
   return 0;
 }
 
-int sthread_mutex_init(sthread_mutex_t* mutex, const void* /*pthread_mutexattr_t* attr*/)
+int sthread_mutexattr_init(sthread_mutexattr_t* attr)
+{
+  memset(attr, 0, sizeof(*attr));
+  return 0;
+}
+int sthread_mutexattr_settype(sthread_mutexattr_t* attr, int type)
 {
-  auto m = sg4::Mutex::create();
+  switch (type) {
+    case PTHREAD_MUTEX_NORMAL:
+      xbt_assert(not attr->recursive, "S4U does not allow to remove the recursivness of a mutex.");
+      attr->recursive = 0;
+      break;
+    case PTHREAD_MUTEX_RECURSIVE:
+      attr->recursive = 1;
+      attr->errorcheck = 0; // reset
+      break;
+    case PTHREAD_MUTEX_ERRORCHECK:
+      attr->errorcheck = 1;
+      THROW_UNIMPLEMENTED;
+      break;
+    default:
+      THROW_IMPOSSIBLE;
+  }
+  return 0;
+}
+int sthread_mutexattr_gettype(const sthread_mutexattr_t* attr, int* type)
+{
+  if (attr->recursive)
+    *type = PTHREAD_MUTEX_RECURSIVE;
+  else if (attr->errorcheck)
+    *type = PTHREAD_MUTEX_ERRORCHECK;
+  else
+    *type = PTHREAD_MUTEX_NORMAL;
+  return 0;
+}
+int sthread_mutexattr_getrobust(const sthread_mutexattr_t* attr, int* robustness)
+{
+  *robustness = attr->robust;
+  return 0;
+}
+int sthread_mutexattr_setrobust(sthread_mutexattr_t* attr, int robustness)
+{
+  attr->robust = robustness;
+  if (robustness)
+    THROW_UNIMPLEMENTED;
+  return 0;
+}
+
+int sthread_mutex_init(sthread_mutex_t* mutex, const sthread_mutexattr_t* attr)
+{
+  auto m = sg4::Mutex::create(attr != nullptr && attr->recursive);
   intrusive_ptr_add_ref(m.get());
 
   mutex->mutex = m.get();
@@ -123,6 +181,7 @@ int sthread_mutex_lock(sthread_mutex_t* mutex)
   if (mutex->mutex == nullptr)
     sthread_mutex_init(mutex, nullptr);
 
+  XBT_DEBUG("%s(%p)", __FUNCTION__, mutex);
   static_cast<sg4::Mutex*>(mutex->mutex)->lock();
   return 0;
 }
@@ -133,7 +192,10 @@ int sthread_mutex_trylock(sthread_mutex_t* mutex)
   if (mutex->mutex == nullptr)
     sthread_mutex_init(mutex, nullptr);
 
-  return static_cast<sg4::Mutex*>(mutex->mutex)->try_lock();
+  XBT_DEBUG("%s(%p)", __FUNCTION__, mutex);
+  if (static_cast<sg4::Mutex*>(mutex->mutex)->try_lock())
+    return 0;
+  return EBUSY;
 }
 
 int sthread_mutex_unlock(sthread_mutex_t* mutex)
@@ -142,6 +204,7 @@ int sthread_mutex_unlock(sthread_mutex_t* mutex)
   if (mutex->mutex == nullptr)
     sthread_mutex_init(mutex, nullptr);
 
+  XBT_DEBUG("%s(%p)", __FUNCTION__, mutex);
   static_cast<sg4::Mutex*>(mutex->mutex)->unlock();
   return 0;
 }
@@ -151,6 +214,7 @@ int sthread_mutex_destroy(sthread_mutex_t* mutex)
   if (mutex->mutex == nullptr)
     sthread_mutex_init(mutex, nullptr);
 
+  XBT_DEBUG("%s(%p)", __FUNCTION__, mutex);
   intrusive_ptr_release(static_cast<sg4::Mutex*>(mutex->mutex));
   return 0;
 }
index b6a16ad..f0d5401 100644 (file)
@@ -40,11 +40,11 @@ double xbt_os_time(void)
 #if HAVE_GETTIMEOFDAY
   struct timeval tv;
   gettimeofday(&tv, NULL);
+
+  return (double)tv.tv_sec + (double)tv.tv_usec / 1e6;
 #else                           /* no gettimeofday => poor resolution */
   return (double) (time(NULL));
 #endif                          /* HAVE_GETTIMEOFDAY? */
-
-  return (double)tv.tv_sec + (double)tv.tv_usec / 1e6;
 }
 
 void xbt_os_sleep(double sec)
@@ -59,7 +59,7 @@ void xbt_os_sleep(double sec)
   struct timeval timeout;
 
   timeout.tv_sec  = (long)sec;
-  timeout.tv_usec = (long)(sec - floor(sec)) * 1e6);
+  timeout.tv_usec = (long)((sec - floor(sec)) * 1e6); /* scale the fractional part before truncating to long */
 
   select(0, NULL, NULL, NULL, &timeout);
 #endif
index d4f9eb8..d0c587e 100644 (file)
@@ -64,6 +64,7 @@ if (enable_smpi_MBI_testsuite)
     add_executable(mbi_${basefile} EXCLUDE_FROM_ALL ${CMAKE_BINARY_DIR}/MBI/${cfile})
     target_link_libraries(mbi_${basefile} simgrid)
     target_compile_options(mbi_${basefile} PRIVATE "-Wno-unused-variable")
+    target_compile_options(mbi_${basefile} PRIVATE "-Wno-unused-but-set-variable")
     set_target_properties(mbi_${basefile} PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/MBI)
     add_dependencies(tests-mbi mbi_${basefile})
 
index 899e810..d9f8a51 100755 (executable)
@@ -26,11 +26,11 @@ END_MPI_FEATURES
 
 BEGIN_MBI_TESTS
   $ mpirun -np 2 ${EXE} 1
-  | @{outcome}@
-  | @{errormsg}@
+  | @{outcome1}@
+  | @{errormsg1}@
   $ mpirun -np 2 ${EXE} 2
-  | @{outcome}@
-  | @{errormsg}@
+  | @{outcome2}@
+  | @{errormsg2}@
 END_MBI_TESTS
 //////////////////////       End of MBI headers        /////////////////// */
 
@@ -127,16 +127,20 @@ for s in gen.send + gen.isend:
         replace = patterns.copy()
         replace['shortdesc'] = 'Correct call ordering.'
         replace['longdesc'] = 'Correct call ordering.'
-        replace['outcome'] = 'OK'
-        replace['errormsg'] = 'OK'
+        replace['outcome1'] = 'OK'
+        replace['errormsg1'] = 'OK'
+        replace['outcome2'] = 'OK'
+        replace['errormsg2'] = 'OK'
         gen.make_file(template, f'InputHazardCallOrdering_{r}_{s}_ok.c', replace)
 
         # Generate the incorrect matching
         replace = patterns.copy()
         replace['shortdesc'] = 'Missing Send function.'
         replace['longdesc'] = 'Missing Send function call for a path depending to input, a deadlock is created.'
-        replace['outcome'] = 'ERROR: IHCallMatching'
-        replace['errormsg'] = 'P2P mistmatch. Missing @{r}@ at @{filename}@:@{line:MBIERROR}@.'
+        replace['outcome1'] = 'OK'
+        replace['errormsg1'] = 'OK'
+        replace['outcome2'] = 'ERROR: IHCallMatching'
+        replace['errormsg2'] = 'P2P mistmatch. Missing @{r}@ at @{filename}@:@{line:MBIERROR}@.'
         replace['errorcond'] = '/* MBIERROR */'
         replace['operation1b'] = ''
         replace['fini1b'] = ''
@@ -172,16 +176,20 @@ for c in gen.coll:
     replace = patterns.copy()
     replace['shortdesc'] = 'Correct call ordering.'
     replace['longdesc'] = 'Correct call ordering.'
-    replace['outcome'] = 'OK'
-    replace['errormsg'] = 'OK'
+    replace['outcome1'] = 'OK'
+    replace['errormsg1'] = 'OK'
+    replace['outcome2'] = 'OK'
+    replace['errormsg2'] = 'OK'
     gen.make_file(template, f'InputHazardCallOrdering_{c}_ok.c', replace)
 
     # Generate the incorrect matching
     replace = patterns.copy()
     replace['shortdesc'] = 'Missing collective function call.'
     replace['longdesc'] = 'Missing collective function call for a path depending to input, a deadlock is created.'
-    replace['outcome'] = 'ERROR: IHCallMatching'
-    replace['errormsg'] = 'P2P mistmatch. Missing @{c}@ at @{filename}@:@{line:MBIERROR}@.'
+    replace['outcome1'] = 'OK'
+    replace['errormsg1'] = 'OK'
+    replace['outcome2'] = 'ERROR: IHCallMatching'
+    replace['errormsg2'] = 'P2P mistmatch. Missing @{c}@ at @{filename}@:@{line:MBIERROR}@.'
     replace['errorcond'] = '/* MBIERROR */'
     replace['operation1b'] = ''
     replace['fini1b'] = ''
index c422cca..9a7d837 100644 (file)
@@ -67,6 +67,9 @@ possible_details = {
     'GlobalConcurrency':'DGlobalConcurrency',
     # larger scope
     'BufferingHazard':'EBufferingHazard',
+    # Input Hazard
+    'IHCallMatching':'InputHazard',
+    
     'OK':'FOK'}
 
 error_scope = {
index 05253b8..663a548 100755 (executable)
@@ -25,11 +25,11 @@ END_MPI_FEATURES
 
 BEGIN_MBI_TESTS
   $ mpirun -np 4 $zero_buffer ${EXE}
-  | @{outcome1}@
-  | @{errormsg1}@
+  | @{outcome_zerob}@
+  | @{errormsg_zerob}@
   $ mpirun -np 4 $infty_buffer ${EXE}
-  | @{outcome1}@
-  | @{errormsg1}@
+  | @{outcome_infty}@
+  | @{errormsg_infty}@
 END_MBI_TESTS
 //////////////////////       End of MBI headers        /////////////////// */
 
@@ -123,8 +123,10 @@ for s in gen.send + gen.isend:
         replace = patterns.copy()
         replace['shortdesc'] = 'Point to point @{s}@ and @{r}@ may not be matched'
         replace['longdesc'] = 'Processes 0 and 1 both call @{s}@ and @{r}@. This results in a deadlock depending on the buffering mode'
-        replace['outcome1'] = 'ERROR: BufferingHazard'
-        replace['errormsg1'] = f'Buffering Hazard. Possible deadlock depending the buffer size of MPI implementation and system environment cause by two processes call {s} before {r}.'
+        replace['outcome_zerob'] = 'ERROR: BufferingHazard'
+        replace['errormsg_zerob'] = f'Buffering Hazard. Possible deadlock depending the buffer size of MPI implementation and system environment cause by two processes call {s} before {r}.'
+        replace['outcome_infty'] = 'OK'
+        replace['errormsg_infty'] = 'OK'
         gen.make_file(template, f'P2PBuffering_{s}_{r}_{s}_{r}_nok.c', replace)
 
         # Generate the incorrect matching with send message to the same process depending on the buffering mode (send + recv)
@@ -136,8 +138,10 @@ for s in gen.send + gen.isend:
         replace['dest2'] = '1'
         replace['shortdesc'] = 'Point to point @{s}@ and @{r}@ may not be matched'
         replace['longdesc'] = 'Processes 0 and 1 both call @{s}@ and @{r}@. This results in a deadlock depending on the buffering mode'
-        replace['outcome1'] = 'ERROR: BufferingHazard'
-        replace['errormsg1'] = f'Buffering Hazard. Possible deadlock depending the buffer size of MPI implementation and system environment cause Send message to the same process.'
+        replace['outcome_zerob'] = 'ERROR: BufferingHazard'
+        replace['errormsg_zerob'] = f'Buffering Hazard. Possible deadlock depending the buffer size of MPI implementation and system environment cause Send message to the same process.'
+        replace['outcome_infty'] = 'OK'
+        replace['errormsg_infty'] = 'OK'
         gen.make_file(template, f'P2PBuffering_SameProcess_{s}_{r}_nok.c', replace)
 
         # Generate the incorrect matching with circular send message depending on the buffering mode (send + recv)
@@ -155,16 +159,20 @@ for s in gen.send + gen.isend:
         replace['operation2c'] = gen.operation[r]("2")
         replace['shortdesc'] = 'Point to point @{s}@ and @{r}@ may not be matched'
         replace['longdesc'] = 'Processes 0 and 1 both call @{s}@ and @{r}@. This results in a deadlock depending on the buffering mode'
-        replace['outcome1'] = 'ERROR: BufferingHazard'
-        replace['errormsg1'] = f'Buffering Hazard. Possible deadlock depending the buffer size of MPI implementation and system environment cause circular send message.'
+        replace['outcome_zerob'] = 'ERROR: BufferingHazard'
+        replace['errormsg_zerob'] = f'Buffering Hazard. Possible deadlock depending the buffer size of MPI implementation and system environment cause circular send message.'
+        replace['outcome_infty'] = 'OK'
+        replace['errormsg_infty'] = 'OK'
         gen.make_file(template, f'P2PBuffering_Circular_{s}_{r}_nok.c', replace)
 
         # Generate the incorrect matching depending on the buffering mode (recv + send)
         replace = patterns.copy()
         replace['shortdesc'] = 'Point to point @{s}@ and @{r}@ are not matched'
         replace['longdesc'] = 'Processes 0 and 1 both call @{r}@ and @{s}@. This results in a deadlock'
-        replace['outcome1'] = 'ERROR: CallMatching'
-        replace['errormsg1'] = 'ERROR: CallMatching'
+        replace['outcome_zerob'] = 'ERROR: CallMatching'
+        replace['errormsg_zerob'] = 'ERROR: CallMatching'
+        replace['outcome_infty'] = 'ERROR: CallMatching'
+        replace['errormsg_infty'] = 'ERROR: CallMatching'
         replace['operation1a'] = gen.operation[r]("2")
         replace['fini1a'] = gen.fini[r]("2")
         replace['operation2a'] = gen.operation[s]("1")
@@ -179,10 +187,19 @@ for s in gen.send + gen.isend:
         replace = patterns.copy()
         replace['shortdesc'] = 'Point to point @{s}@ and @{r}@ are correctly  matched'
         replace['longdesc'] = 'Process 0 calls @{s}@ and process 1 calls @{r}@.'
-        replace['outcome1'] = 'OK'
-        replace['errormsg1'] = 'OK'
-        replace['fini1a'] = gen.fini[s]("1")
-        replace['fini2a'] = gen.fini[r]("2")
+        replace['outcome_zerob'] = 'OK'
+        replace['errormsg_zerob'] = 'OK'
+        replace['outcome_infty'] = 'OK'
+        replace['errormsg_infty'] = 'OK'
+        patterns['init1'] = gen.init[s]("1")
         replace['operation1a'] = gen.operation[s]("1")
-        replace['operation2a'] = gen.operation[r]("2")
+        replace['fini1a'] = gen.fini[s]("1")
+        replace['operation2a'] = ''
+        replace['fini2a'] = ''
+        
+        patterns['init2'] = gen.init[r]("2")
+        replace['operation1b'] = gen.operation[r]("2")
+        replace['fini1b'] = gen.fini[r]("2")
+        replace['operation2b'] = ''
+        replace['fini2b'] = ''
         gen.make_file(template, f'P2PCallMatching_{s}_{r}_{r}_{s}_ok.c', replace)
index 8ed991f..ef6fb16 100644 (file)
@@ -126,7 +126,7 @@ for s in gen.send:
             # Generate a code with non distinct buffer
             replace = patterns.copy()
             replace['shortdesc'] = 'Invalid buffer on Sendrecv function.'
-            replace['longdesc'] = 'Invalid buffer on Sendrecv, the tow buffers must be distinct.'
+            replace['longdesc'] = 'Invalid buffer on Sendrecv, the two buffers must be distinct.'
             replace['outcome'] = 'ERROR: InvalidBuffer'
             replace['errormsg'] = '@{sr}@ at @{filename}@:@{line:MBIERROR}@ send buffer and recv buffer must be distinct.'
             replace['change_arg'] = gen.write[sr]("2")
index 3801ae2..af5750b 100644 (file)
@@ -358,7 +358,7 @@ start['MPI_Sendrecv'] = lambda n: ""
 operation['MPI_Sendrecv'] = lambda n: f'MPI_Sendrecv(psbuf{n}, buff_size, type, dest, stag, prbuf{n}, buff_size, type, src, rtag, newcom, &sta{n});'
 fini['MPI_Sendrecv'] = lambda n: ""
 free['MPI_Sendrecv'] = lambda n: ""
-write['MPI_Sendrecv'] = lambda n: f"prbuf{n} = &sbuf{n}[2];"
+write['MPI_Sendrecv'] = lambda n: f"prbuf{n} = &sbuf{n}[0];"
 
 
 ### P2P:nonblocking
index 4f0c8c7..5095296 100644 (file)
@@ -43,6 +43,9 @@ add_custom_target(simgrid_convert_TI_traces ALL
 
 # libraries
 install(TARGETS simgrid DESTINATION ${CMAKE_INSTALL_LIBDIR}/)
+if("${CMAKE_SYSTEM}" MATCHES "Linux")
+  install(TARGETS sthread DESTINATION ${CMAKE_INSTALL_LIBDIR}/)
+endif()
 
 # pkg-config files
 configure_file("${CMAKE_HOME_DIRECTORY}/tools/pkg-config/simgrid.pc.in"
index f41f3b7..e0357bf 100644 (file)
@@ -29,6 +29,7 @@ add_dependencies(simgrid maintainer_files)
 
 if("${CMAKE_SYSTEM}" MATCHES "Linux")
   add_library(sthread SHARED ${STHREAD_SRC})
+  set_target_properties(sthread PROPERTIES VERSION ${libsimgrid_version})
   set_property(TARGET sthread
                 APPEND PROPERTY INCLUDE_DIRECTORIES "${INTERNAL_INCLUDES}")
   target_link_libraries(sthread simgrid)
index a9d9d64..050c850 100755 (executable)
@@ -129,6 +129,13 @@ echo "Branch built is $branch_name"
 
 NUMBER_OF_PROCESSORS="$(nproc)" || NUMBER_OF_PROCESSORS=1
 GENERATOR="Unix Makefiles"
+BUILDER=make
+VERBOSE_BUILD="VERBOSE=1"
+if which ninja 2>/dev/null >/dev/null ; then
+  GENERATOR=Ninja
+  BUILDER=ninja
+  VERBOSE_BUILD="-v"
+fi
 
 ulimit -c 0 || true
 
@@ -169,7 +176,7 @@ echo "XX   pwd: $(pwd)"
 echo "XX"
 
 cmake -G"$GENERATOR" -Denable_documentation=OFF "$WORKSPACE"
-make dist -j $NUMBER_OF_PROCESSORS
+${BUILDER} dist -j $NUMBER_OF_PROCESSORS
 SIMGRID_VERSION=$(cat VERSION)
 
 echo "XX"
@@ -217,7 +224,7 @@ cmake -G"$GENERATOR" ${INSTALL:+-DCMAKE_INSTALL_PREFIX=$INSTALL} \
   -DCMAKE_CXX_COMPILER_LAUNCHER=ccache \
   "$SRCFOLDER"
 
-make -j $NUMBER_OF_PROCESSORS VERBOSE=1 tests
+${BUILDER} -j $NUMBER_OF_PROCESSORS ${VERBOSE_BUILD} tests
 
 echo "XX"
 echo "XX Run the tests"
@@ -233,7 +240,7 @@ if test -n "$INSTALL" && [ "${branch_name}" = "origin/master" ] ; then
 
   rm -rf "$INSTALL"
 
-  make install
+  ${BUILDER} install
 fi
 
 echo "XX"
index 29673a7..6833428 100755 (executable)
@@ -22,6 +22,10 @@ export PATH=$PWD/simgrid-dev/smpi_script/bin/:$PATH
 export LD_LIBRARY_PATH=$PWD/simgrid-dev/lib/:$LD_LIBRARY_PATH
 export JHBUILD_RUN_AS_ROOT=1
 
+#workaround issue with ntpoly 3.0.0
+sed -i 's|repository type="tarball" name="ntpoly" href="https://github.com/william-dawson/NTPoly/archive/"|repository type="git" name="ntpoly" href="https://github.com/william-dawson/"|' ../modulesets/hpc-upstream.modules
+sed -i 's|module="ntpoly-v3.0.0.tar.gz"|module="ntpoly"|' ../modulesets/hpc-upstream.modules
+
 ../Installer.py autogen -y
 
 ../Installer.py -f ../../tools/jenkins/gfortran-simgrid.rc -y build
index f435e01..73864b4 100755 (executable)
@@ -12,12 +12,12 @@ echo "XXXXXXXXXXXXXXXX Install APT dependencies"
 $SUDO apt-get update
 $SUDO apt-get -y install build-essential libboost-all-dev wget git xsltproc
 
-for i in master 1.3 ; do
+for i in master starpu-1.3 ; do
   echo "XXXXXXXXXXXXXXXX Build and test StarPU $i"
   rm -rf starpu*
-  wget https://files.inria.fr/starpu/simgrid/starpu-simgrid-$i.tar.gz
-  md5sum starpu-simgrid-$i.tar.gz
-  tar xf starpu-simgrid-$i.tar.gz
+  wget https://files.inria.fr/starpu/testing/$i/starpu-nightly-latest.tar.gz
+  md5sum starpu-nightly-latest.tar.gz
+  tar xf starpu-nightly-latest.tar.gz
   cd starpu-1*
 
   # NOTE: Do *not* introduce parameters to "make it work" here.
index 4a52a6f..8af5e8d 100755 (executable)
@@ -26,7 +26,7 @@ get_json(){
 }
 
 get_ns3(){
-  sed -n 's/.*-- ns-3 found (v\(3[-.0-9a-z]\+\); minor:.*/\1/p;T;q' ./consoleText
+  sed -n 's/.*-- ns-3 found (v\(3[-.0-9a-z]\+\).*/\1/p;T;q' ./consoleText
 }
 
 get_python(){
index 2799b4f..42fede2 100644 (file)
@@ -19,7 +19,7 @@ $ perl segfault.pl
 p Check that we return the expected return value on SEGV
 ! expect return 11
 < $ perl segfault.pl
-$ ${bindir:=.}/tesh
+$ ${bindir:=.}/tesh --no-auto-valgrind
 > Test suite from stdin
 > [(stdin):1] perl segfault.pl
 > Test suite `(stdin)': NOK (<(stdin):1> got signal SIGSEGV)
index 227d93c..e308580 100644 (file)
@@ -14,6 +14,7 @@
 > @@ -0,0 +1 @@
 > +I crashed
 > Test suite `(stdin)': NOK (<(stdin):2> output mismatch)
+> In addition, <(stdin):2> got signal SIGTERM.
 $ ${bindir:=.}/tesh
 
 
index 2baa1b3..a0e96d5 100755 (executable)
@@ -196,6 +196,7 @@ class TeshState(Singleton):
         self.args_suffix = ""
         self.ignore_regexps_common = []
         self.jenkins = False  # not a Jenkins run by default
+        self.auto_valgrind = True
         self.timeout = 10  # default value: 10 sec
         self.wrapper = None
         self.keep = False
@@ -237,6 +238,7 @@ class Cmd:
         self.output_display = False
 
         self.sort = -1
+        self.rerun_with_valgrind = False
 
         self.ignore_regexps = TeshState().ignore_regexps_common
 
@@ -302,6 +304,14 @@ class Cmd:
             _thread.start_new_thread(Cmd._run, (self, lock))
         else:
             self._run()
+            if self.rerun_with_valgrind and TeshState().auto_valgrind:
+                print('\n\n\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX')
+                print(      'XXXXXXXXX Rerunning this test with valgrind to help debugging it XXXXXXXXX')
+                print(      'XXXXXXXX (this will fail if valgrind is not installed, of course) XXXXXXXX')
+                print(      'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\n\n\n')
+
+                self.args = "valgrind " + self.args
+                self._run()
         return True
 
     def _run(self, lock=None):
@@ -411,13 +421,17 @@ class Cmd:
                 print('\n'.join(logs))
                 return
 
-        if self.output_display:
-            logs.append(str(stdout_data))
-
         # remove text colors
         ansi_escape = re.compile(r'\x1b[^m]*m')
         stdout_data = ansi_escape.sub('', stdout_data)
 
+        if self.output_display:
+            logs.append(str(stdout_data))
+
+        if self.rerun_with_valgrind:
+            print(str(stdout_data), file=sys.stderr)
+            return
+
         if self.ignore_output:
             logs.append("(ignoring the output of <{cmd}> as requested)".format(cmd=cmd_name))
         else:
@@ -461,6 +475,18 @@ class Cmd:
 
                 logs.append("Test suite `{file}': NOK (<{cmd}> output mismatch)".format(
                     file=FileReader().filename, cmd=cmd_name))
+
+                # Also report any failed return code and/or signal we got in case of output mismatch
+                if not proc.returncode in self.expect_return:
+                    if proc.returncode >= 0:
+                        logs.append("In addition, <{cmd}> returned code {code}.".format(
+                            cmd=cmd_name, code=proc.returncode))
+                    else:
+                        logs.append("In addition, <{cmd}> got signal {sig}.".format(cmd=cmd_name,
+                            sig=SIGNALS_TO_NAMES_DICT[-proc.returncode]))
+                    if proc.returncode == -signal.SIGSEGV:
+                        self.rerun_with_valgrind = True
+
                 if lock is not None:
                     lock.release()
                 if TeshState().keep:
@@ -495,6 +521,10 @@ class Cmd:
             logs.append("Test suite `{file}': NOK (<{cmd}> got signal {sig})".format(
                 file=FileReader().filename, cmd=cmd_name,
                 sig=SIGNALS_TO_NAMES_DICT[-proc.returncode]))
+
+            if proc.returncode == -signal.SIGSEGV:
+                self.rerun_with_valgrind = True
+
             if lock is not None:
                 lock.release()
             TeshState().set_return_code(max(-proc.returncode, 1))
@@ -535,6 +565,10 @@ def main():
         '--ignore-jenkins',
         action='store_true',
         help='ignore all cruft generated on SimGrid continuous integration servers')
+    group1.add_argument(
+        '--no-auto-valgrind',
+        action='store_true',
+        help='do not automatically launch segfaulting commands in valgrind')
     group1.add_argument('--wrapper', metavar='arg', help='Run each command in the provided wrapper (eg valgrind)')
     group1.add_argument(
         '--keep',
@@ -568,6 +602,9 @@ def main():
         ]
         TeshState().jenkins = True  # This is a Jenkins build
 
+    if options.no_auto_valgrind:
+        TeshState().auto_valgrind = False
+
     if options.teshfile is None:
         file = FileReader(None)
         print("Test suite from stdin")