add battery-chiller-solar example.
See merge request simgrid/simgrid!174
target=examples
ncores=$(grep -c processor /proc/cpuinfo)
+halfcores=$(expr $ncores / 2 + 1)
install_path=$(sed -n 's/^CMAKE_INSTALL_PREFIX:PATH=//p' CMakeCache.txt)
if [ -e ${install_path} ] && [ -d ${install_path} ] && [ -x ${install_path} ] && [ -w ${install_path} ] ; then
(
echo "install_path: ${install_path}"
echo "Target: ${target}"
- echo "Cores: ${ncores}"
- (nice ${builder} -j${ncores} ${target} tests || make ${target} tests) && nice ctest -j${ncores} --output-on-failure ; date
+ echo "Cores to build: ${ncores}"
+ echo "Cores to test: ${halfcores}"
+ (nice ${builder} -j${ncores} ${target} tests || ${builder} ${target} tests) && nice ctest -j${halfcores} --output-on-failure ; date
) 2>&1 | tee BuildSimGrid.sh.log
endif()
### Check for Eigen library
+# Setting EIGEN3_HINT to OFF skips the Eigen3 detection altogether. Any other value
+# (including an unset or empty EIGEN3_HINT) runs the search, using EIGEN3_HINT as a location hint.
+# Use an equality test here: a lexicographic STRLESS_EQUAL "OFF" would also match the empty
+# default and every path sorting before "OFF" (e.g. absolute paths starting with '/').
+# SIMGRID_HAVE_EIGEN3 is initialized outside the guard so it is defined in every case.
set(SIMGRID_HAVE_EIGEN3 OFF)
-find_package (Eigen3 3.3 CONFIG
- HINTS ${EIGEN3_HINT})
-if (Eigen3_FOUND)
- set(SIMGRID_HAVE_EIGEN3 ON)
- message(STATUS "Found Eigen3: ${EIGEN3_INCLUDE_DIR}")
- include_directories(${EIGEN3_INCLUDE_DIR})
- if ("3.3.4" VERSION_EQUAL EIGEN3_VERSION_STRING AND CMAKE_COMPILER_IS_GNUCC)
- message(STATUS "Avoid build error of Eigen3 v3.3.4 using -Wno-error=int-in-bool-context")
- set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=int-in-bool-context")
+if (NOT "${EIGEN3_HINT}" STREQUAL "OFF")
+ find_package (Eigen3 3.3 CONFIG
+ HINTS ${EIGEN3_HINT})
+ if (Eigen3_FOUND)
+ set(SIMGRID_HAVE_EIGEN3 ON)
+ message(STATUS "Found Eigen3: ${EIGEN3_INCLUDE_DIR}")
+ include_directories(${EIGEN3_INCLUDE_DIR})
+ if ("3.3.4" VERSION_EQUAL EIGEN3_VERSION_STRING AND CMAKE_COMPILER_IS_GNUCC)
+ message(STATUS "Avoid build error of Eigen3 v3.3.4 using -Wno-error=int-in-bool-context")
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=int-in-bool-context")
+ endif()
+ else()
+ message(STATUS "Disabling model BMF because Eigen3 was not found. If it's installed, use EIGEN3_HINT to hint cmake about the location of Eigen3Config.cmake")
endif()
+ mark_as_advanced(Eigen3_DIR)
else()
- message(STATUS "Disabling model BMF because Eigen3 was not found. If it's installed, use EIGEN3_HINT to hint cmake about the location of Eigen3Config.cmake")
+ message(STATUS "Disabling Eigen3 as requested by the user (EIGEN3_HINT is set to 'OFF')")
endif()
-mark_as_advanced(Eigen3_DIR)
# Check for our JSON dependency
set(SIMGRID_HAVE_JSON 0)
file(WRITE ${PROJECT_BINARY_DIR}/Testing/Notes/Build "GIT version : ${GIT_VERSION}\n")
file(APPEND ${PROJECT_BINARY_DIR}/Testing/Notes/Build "Release : simgrid-${release_version}\n")
-INCLUDE(Dart)
on symmetric routes.
- Introduce a Mailbox::get_async() with no payload parameter. You can use the new
Comm::get_payload() once the communication is over to retrieve the payload.
+ - Implement recursive mutexes. Simply pass true to the constructor to get one.
SMPI:
- New SMPI_app_instance_join(): wait for the completion of a started MPI instance
- MPI_UNIVERSE_SIZE now initialized to the total amount of hosts in the platform
+sthread:
+ - Implement recursive pthread mutexes.
+ - Many bug fixes.
+
Python:
- Make the host_load plugin available from Python. See examples/python/plugin-host-load
- Mailbox::get_async() does not return a pair anymore. Use comm.get_payload() instead.
include examples/cpp/network-ns3/s4u-network-ns3.cpp
include examples/cpp/network-wifi/s4u-network-wifi.cpp
include examples/cpp/network-wifi/s4u-network-wifi.tesh
-include examples/cpp/solar-panel-simple/s4u-solar-panel-simple.cpp
-include examples/cpp/solar-panel-simple/s4u-solar-panel-simple.tesh
include examples/cpp/platform-comm-serialize/s4u-platform-comm-serialize.cpp
include examples/cpp/platform-comm-serialize/s4u-platform-comm-serialize.tesh
include examples/cpp/platform-failures/s4u-platform-failures.cpp
include examples/cpp/replay-io/s4u-replay-io_d.xml
include examples/cpp/routing-get-clusters/s4u-routing-get-clusters.cpp
include examples/cpp/routing-get-clusters/s4u-routing-get-clusters.tesh
+include examples/cpp/solar-panel-simple/s4u-solar-panel-simple.cpp
+include examples/cpp/solar-panel-simple/s4u-solar-panel-simple.tesh
include examples/cpp/synchro-barrier/s4u-mc-synchro-barrier.tesh
include examples/cpp/synchro-barrier/s4u-synchro-barrier.cpp
include examples/cpp/synchro-barrier/s4u-synchro-barrier.tesh
include examples/cpp/synchro-semaphore/s4u-mc-synchro-semaphore.tesh
include examples/cpp/synchro-semaphore/s4u-synchro-semaphore.cpp
include examples/cpp/synchro-semaphore/s4u-synchro-semaphore.tesh
+include examples/cpp/task-dispatch/s4u-task-dispatch.cpp
+include examples/cpp/task-dispatch/s4u-task-dispatch.tesh
include examples/cpp/task-io/s4u-task-io.cpp
include examples/cpp/task-io/s4u-task-io.tesh
include examples/cpp/task-microservice/s4u-task-microservice.cpp
include examples/smpi/trace_call_location/trace_call_location.tesh
include examples/smpi/trace_simple/trace_simple.c
include examples/smpi/trace_simple/trace_simple.tesh
+include examples/sthread/pthread-mc-mutex-recursive.tesh
include examples/sthread/pthread-mc-mutex-simple.tesh
include examples/sthread/pthread-mc-mutex-simpledeadlock.tesh
include examples/sthread/pthread-mc-producer-consumer.tesh
+include examples/sthread/pthread-mutex-recursive.c
+include examples/sthread/pthread-mutex-recursive.tesh
include examples/sthread/pthread-mutex-simple.c
include examples/sthread/pthread-mutex-simple.tesh
include examples/sthread/pthread-mutex-simpledeadlock.c
- On Debian / Ubuntu: ``apt install libeigen3-dev``
- On CentOS / Fedora: ``dnf install eigen3-devel``
- On macOS with homebrew: ``brew install eigen``
- - Use EIGEN3_HINT to specify where it's installed if cmake doesn't find it automatically.
+ - Use EIGEN3_HINT to specify where it's installed if cmake doesn't find it automatically. Set EIGEN3_HINT=OFF to disable detection even if it could be found.
JSON (optional, for the DAG wfcommons loader)
- On Debian / Ubuntu: ``apt install nlohmann-json3-dev``
- Use nlohmann_json_HINT to specify where it's installed if cmake doesn't find it automatically.
EIGEN3_HINT (empty by default)
Alternative path into which Eigen3 should be searched for.
+ Providing the value OFF as a hint disables the detection altogether.
SIMGRID_PYTHON_LIBDIR (auto-detected)
Where to install the Python module library. By default, it is set to the cmake Python3_SITEARCH variable if installing to /usr,
In order to simulate the execution of Dataflow applications, we introduced the
concept of |API_s4u_Tasks|, that can be seen as repeatable activities. A Dataflow
-is defined as a graph of |API_s4u_Tasks| through which circulate Tokens. Tokens
-can carry any user-defined data, using the same internal mechanisms as for the
-other simulated objects. Each Task has to receive a token from each of its
-predecessor to fire a new instance of a :ref:`Communication <API_s4u_Comm>`,
-:ref:`Execution <API_s4u_Exec>`, or :ref:`I/O <API_s4u_Io>` activity.
-On completion of this activity, the Task propagates tokens
-to its successors, and waits for the next set of tokens to arrive.
-Multiple instances of the same Task can run in parallel by adjusting its
-horizontal scaling with
-:cpp:func:`s4u::Task::set_parallelism_degree() <simgrid::s4u::Task::set_parallelism_degree>`.
+is defined as a graph of |API_s4u_Tasks|, where each |API_s4u_Tasks| has a set of
+successors and predecessors. When a |API_s4u_Tasks| ends it sends a token to each
+of its successors. Each |API_s4u_Tasks| has to receive a token from each of its
+predecessors to start. Tokens can carry any user-defined data.
-:ref:`Communications <API_s4u_Comm>` (started on Mailboxes and consuming links),
-:ref:`Executions <API_s4u_Exec>` (started on Host and consuming CPU resources)
-:ref:`I/O <API_s4u_Io>` (started on and consuming disks).
+|API_s4u_Tasks| are composed of several instances: a dispatcher, a collector, and
+instance_0 to instance_n. The dispatcher relies on a load balancing function to select
+the next instance to fire. Once this instance finishes, it fires the collector.
+
+Each instance of an |API_s4u_ExecTask| can be placed on a different host.
+|API_s4u_Comm| activities are automatically created when an instance triggers
+another instance on a different host. Each instance has its own parallelism degree
+to scale horizontally on several cores.
To initiate the execution of a Dataflow, it is possible to make some
|API_s4u_Tasks| fire one or more activities without waiting for any token with the
:cpp:func:`s4u::Task::enqueue_firings() <simgrid::s4u::Task::enqueue_firings>`
function.
-The parameters and successors of a Task can be redefined at runtime by attaching
+The parameters of Tasks can be redefined at runtime by attaching
callbacks to the
:cpp:func:`s4u::Task::on_this_start <simgrid::s4u::Task::on_this_start>`
and
:cpp:func:`s4u::Task::on_this_completion <simgrid::s4u::Task::on_this_completion>`
-signals.
+signals. The former is triggered by instances other than the dispatcher and the collector,
+and the latter is triggered by the collector.
+
.. _s4u_mailbox:
mc-bugged1 mc-bugged1-liveness mc-bugged2 mc-bugged2-liveness mc-centralized-mutex mc-electric-fence mc-failing-assert
network-ns3 network-ns3-wifi network-wifi
io-async io-priority io-degradation io-file-system io-file-remote io-disk-raw io-dependent
- task-io task-microservice task-parallelism task-simple task-storm task-switch-host task-variable-load
+ task-dispatch task-io task-microservice task-parallelism task-simple task-storm task-switch-host task-variable-load
solar-panel-simple
platform-comm-serialize platform-failures platform-profile platform-properties
plugin-host-load plugin-jbod plugin-link-load plugin-prodcons
* @param id Internal identifier in the torus (for information)
* @return netpoint, gateway: the netpoint to the StarZone and CPU0 as gateway
*/
-static std::pair<simgrid::kernel::routing::NetPoint*, simgrid::kernel::routing::NetPoint*>
-create_hostzone(const sg4::NetZone* zone, const std::vector<unsigned long>& /*coord*/, unsigned long id)
+static sg4::NetZone* create_hostzone(const sg4::NetZone* zone, const std::vector<unsigned long>& /*coord*/, unsigned long id)
{
constexpr int num_cpus = 8; //!< Number of CPUs in the zone
constexpr double speed = 1e9; //!< Speed of each CPU
/* setting my Torus parent zone */
host_zone->set_parent(zone);
- simgrid::kernel::routing::NetPoint* gateway = nullptr;
/* create CPUs */
for (int i = 0; i < num_cpus; i++) {
std::string cpu_name = hostname + "-cpu" + std::to_string(i);
const sg4::Host* host = host_zone->create_host(cpu_name, speed);
/* the first CPU is the gateway */
if (i == 0)
- gateway = host->get_netpoint();
+ host_zone->set_gateway(host->get_netpoint());
/* create split-duplex link */
auto* link = host_zone->create_split_duplex_link("link-" + cpu_name, link_bw)->set_latency(link_lat);
/* connecting CPU to outer world */
}
/* seal newly created netzone */
host_zone->seal();
- return std::make_pair(host_zone->get_netpoint(), gateway);
+ return host_zone;
}
/*************************************************************************************************/
--- /dev/null
+/* Copyright (c) 2017-2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/s4u.hpp"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(task_dispatch, "Messages specific for this s4u example");
+namespace sg4 = simgrid::s4u;
+
+// Actor driving the task-dispatch example: it exercises, one after the other, the
+// setters of the ExecTask `t` it receives (task "A" created in main()).
+// Most tests follow the same pattern: enqueue firings, sleep 50 s, change one setting,
+// sleep 250 s, then restore the setting before the next test.
+static void manager(sg4::ExecTaskPtr t)
+{
+ // The platform file must define hosts named PM0 and PM1.
+ auto PM0 = sg4::Engine::get_instance()->host_by_name("PM0");
+ auto PM1 = sg4::Engine::get_instance()->host_by_name("PM1");
+
+ // set_flops() is called without an instance name; judging by the log message it
+ // targets instance_0 (NOTE(review): confirm the default instance).
+ XBT_INFO("Test set_flops");
+ t->enqueue_firings(2);
+ sg4::this_actor::sleep_for(50);
+ XBT_INFO("Set instance_0 flops to 50.");
+ t->set_flops(50 * PM0->get_speed());
+ sg4::this_actor::sleep_for(250);
+ // Restore the amount used when the task was created in main().
+ t->set_flops(100 * PM0->get_speed());
+
+ // Allow two concurrent executions while three firings are queued, then go back to one.
+ XBT_INFO("Test set_parallelism degree");
+ t->enqueue_firings(3);
+ sg4::this_actor::sleep_for(50);
+ XBT_INFO("Set Task parallelism degree to 2.");
+ t->set_parallelism_degree(2);
+ sg4::this_actor::sleep_for(250);
+ t->set_parallelism_degree(1);
+
+ // Move the dispatcher to PM1. set_internal_bytes() presumably sizes the communication
+ // created between the dispatcher and the instance it fires now that they sit on
+ // different hosts (TODO confirm). The internal bytes are not reset when moving back.
+ XBT_INFO("Test set_host dispatcher");
+ t->enqueue_firings(2);
+ sg4::this_actor::sleep_for(50);
+ XBT_INFO("Move dispatcher to PM1");
+ t->set_host(PM1, "dispatcher");
+ t->set_internal_bytes(1e6, "dispatcher");
+ sg4::this_actor::sleep_for(250);
+ t->set_host(PM0, "dispatcher");
+
+ // Move instance_0 to PM1, rescaling its flops to 100x the speed of its new host,
+ // then move it back to PM0 with the matching amount.
+ XBT_INFO("Test set_host instance_0");
+ t->enqueue_firings(2);
+ sg4::this_actor::sleep_for(50);
+ XBT_INFO("Move instance_0 to PM1");
+ t->set_host(PM1, "instance_0");
+ t->set_flops(100 * PM1->get_speed());
+ t->set_internal_bytes(1e6, "instance_0");
+ sg4::this_actor::sleep_for(250);
+ t->set_host(PM0, "instance_0");
+ t->set_flops(100 * PM0->get_speed());
+
+ // Move only the collector to PM1 (no internal bytes are set for it here), then back.
+ XBT_INFO("Test set_host collector");
+ t->enqueue_firings(2);
+ sg4::this_actor::sleep_for(50);
+ XBT_INFO("Move collector to PM1");
+ t->set_host(PM1, "collector");
+ sg4::this_actor::sleep_for(250);
+ t->set_host(PM0, "collector");
+
+ // Add one instance, then alternate firings between instance_0 and instance_1
+ // (presumably the instance just added). round_robin_counter is a function-local
+ // static, so it keeps its value across calls of the lambda.
+ XBT_INFO("Test add_instances");
+ t->enqueue_firings(1);
+ sg4::this_actor::sleep_for(50);
+ XBT_INFO("Add 1 instance and update load balancing function");
+ t->add_instances(1);
+ t->set_load_balancing_function([]() {
+ static int round_robin_counter = 0;
+ int ret = round_robin_counter;
+ round_robin_counter = round_robin_counter == 1 ? 0 : round_robin_counter + 1;
+ return "instance_" + std::to_string(ret);
+ });
+ t->enqueue_firings(2);
+ sg4::this_actor::sleep_for(250);
+
+ // Remove one instance again and route every firing to instance_0; this last test
+ // is not followed by a restore step and just waits 300 s before the actor ends.
+ XBT_INFO("Test remove_instances");
+ XBT_INFO("Remove 1 instance and update load balancing function");
+ t->remove_instances(1);
+ t->set_load_balancing_function([]() { return "instance_0"; });
+ t->enqueue_firings(2);
+ sg4::this_actor::sleep_for(300);
+}
+
+// Entry point: loads the platform file given as first argument, builds the dataflow
+// A -> B (both executed on PM0), logs every task start and completion, and starts the
+// "manager" actor on PM0 to drive task A.
+int main(int argc, char* argv[])
+{
+ sg4::Engine e(&argc, argv);
+ // Check argc only after the Engine constructor, which receives &argc so that it can
+ // strip the command-line options it handles itself.
+ xbt_assert(argc > 1, "Usage: %s platform_file\n", argv[0]);
+ e.load_platform(argv[1]);
+ auto* PM0 = e.host_by_name("PM0");
+
+ // Amounts of work are expressed as multiples of PM0's speed (in flops).
+ auto a = sg4::ExecTask::init("A", 100 * PM0->get_speed(), PM0);
+ auto b = sg4::ExecTask::init("B", 50 * PM0->get_speed(), PM0);
+
+ a->add_successor(b);
+
+ sg4::Task::on_completion_cb(
+ [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); });
+ sg4::Task::on_start_cb([](const sg4::Task* t) { XBT_INFO("Task %s start", t->get_name().c_str()); });
+
+ sg4::Actor::create("manager", PM0, manager, a);
+
+ e.run();
+ return 0;
+}
--- /dev/null
+#!/usr/bin/env tesh
+
+# Run the task-dispatch example itself: the expected output uses its task_dispatch log category.
+$ ${bindir:=.}/s4u-task-dispatch ${platfdir}/three_multicore_hosts.xml
+> [PM0:manager:(1) 0.000000] [task_dispatch/INFO] Test set_flops
+> [0.000000] [task_dispatch/INFO] Task A start
+> [0.000000] [task_dispatch/INFO] Task A start
+> [PM0:manager:(1) 50.000000] [task_dispatch/INFO] Set instance_0 flops to 50.
+> [100.000000] [task_dispatch/INFO] Task A finished (1)
+> [100.000000] [task_dispatch/INFO] Task B start
+> [150.000000] [task_dispatch/INFO] Task A finished (2)
+> [150.000000] [task_dispatch/INFO] Task B start
+> [150.000000] [task_dispatch/INFO] Task B finished (1)
+> [200.000000] [task_dispatch/INFO] Task B finished (2)
+> [PM0:manager:(1) 300.000000] [task_dispatch/INFO] Test set_parallelism degree
+> [300.000000] [task_dispatch/INFO] Task A start
+> [300.000000] [task_dispatch/INFO] Task A start
+> [300.000000] [task_dispatch/INFO] Task A start
+> [PM0:manager:(1) 350.000000] [task_dispatch/INFO] Set Task parallelism degree to 2.
+> [400.000000] [task_dispatch/INFO] Task A finished (3)
+> [400.000000] [task_dispatch/INFO] Task B start
+> [450.000000] [task_dispatch/INFO] Task A finished (4)
+> [450.000000] [task_dispatch/INFO] Task B start
+> [450.000000] [task_dispatch/INFO] Task B finished (3)
+> [500.000000] [task_dispatch/INFO] Task A finished (5)
+> [500.000000] [task_dispatch/INFO] Task B start
+> [500.000000] [task_dispatch/INFO] Task B finished (4)
+> [550.000000] [task_dispatch/INFO] Task B finished (5)
+> [PM0:manager:(1) 600.000000] [task_dispatch/INFO] Test set_host dispatcher
+> [600.000000] [task_dispatch/INFO] Task A start
+> [600.000000] [task_dispatch/INFO] Task A start
+> [PM0:manager:(1) 650.000000] [task_dispatch/INFO] Move dispatcher to PM1
+> [700.000000] [task_dispatch/INFO] Task A finished (6)
+> [700.000000] [task_dispatch/INFO] Task B start
+> [750.000000] [task_dispatch/INFO] Task B finished (6)
+> [800.009961] [task_dispatch/INFO] Task A finished (7)
+> [800.009961] [task_dispatch/INFO] Task B start
+> [850.009961] [task_dispatch/INFO] Task B finished (7)
+> [PM0:manager:(1) 900.000000] [task_dispatch/INFO] Test set_host instance_0
+> [900.000000] [task_dispatch/INFO] Task A start
+> [900.000000] [task_dispatch/INFO] Task A start
+> [PM0:manager:(1) 950.000000] [task_dispatch/INFO] Move instance_0 to PM1
+> [1000.000000] [task_dispatch/INFO] Task A finished (8)
+> [1000.000000] [task_dispatch/INFO] Task B start
+> [1050.000000] [task_dispatch/INFO] Task B finished (8)
+> [1100.019922] [task_dispatch/INFO] Task A finished (9)
+> [1100.019922] [task_dispatch/INFO] Task B start
+> [1150.019922] [task_dispatch/INFO] Task B finished (9)
+> [PM0:manager:(1) 1200.000000] [task_dispatch/INFO] Test set_host collector
+> [1200.000000] [task_dispatch/INFO] Task A start
+> [1200.000000] [task_dispatch/INFO] Task A start
+> [PM0:manager:(1) 1250.000000] [task_dispatch/INFO] Move collector to PM1
+> [1300.000000] [task_dispatch/INFO] Task A finished (10)
+> [1300.000000] [task_dispatch/INFO] Task B start
+> [1350.000000] [task_dispatch/INFO] Task B finished (10)
+> [1400.009961] [task_dispatch/INFO] Task A finished (11)
+> [1400.009961] [task_dispatch/INFO] Task B start
+> [1450.009961] [task_dispatch/INFO] Task B finished (11)
+> [PM0:manager:(1) 1500.000000] [task_dispatch/INFO] Test add_instances
+> [1500.000000] [task_dispatch/INFO] Task A start
+> [PM0:manager:(1) 1550.000000] [task_dispatch/INFO] Add 1 instance and update load balancing function
+> [1550.000000] [task_dispatch/INFO] Task A start
+> [1550.000000] [task_dispatch/INFO] Task A start
+> [1600.000000] [task_dispatch/INFO] Task A finished (12)
+> [1600.000000] [task_dispatch/INFO] Task B start
+> [1650.000000] [task_dispatch/INFO] Task A finished (13)
+> [1650.000000] [task_dispatch/INFO] Task B start
+> [1650.000000] [task_dispatch/INFO] Task B finished (12)
+> [1700.000000] [task_dispatch/INFO] Task A finished (14)
+> [1700.000000] [task_dispatch/INFO] Task B start
+> [1700.000000] [task_dispatch/INFO] Task B finished (13)
+> [1750.000000] [task_dispatch/INFO] Task B finished (14)
+> [PM0:manager:(1) 1800.000000] [task_dispatch/INFO] Test remove_instances
+> [PM0:manager:(1) 1800.000000] [task_dispatch/INFO] Remove 1 instance and update load balancing function
+> [1800.000000] [task_dispatch/INFO] Task A start
+> [1800.000000] [task_dispatch/INFO] Task A start
+> [1900.000000] [task_dispatch/INFO] Task A finished (15)
+> [1900.000000] [task_dispatch/INFO] Task B start
+> [1950.000000] [task_dispatch/INFO] Task B finished (15)
+> [2000.000000] [task_dispatch/INFO] Task A finished (16)
+> [2000.000000] [task_dispatch/INFO] Task B start
+> [2050.000000] [task_dispatch/INFO] Task B finished (16)
+>
\ No newline at end of file
$ ${bindir:=.}/s4u-task-parallelism ${platfdir}/three_multicore_hosts.xml
> [0.000000] [task_parallelism/INFO] Task exec_A start
-> [100.000000] [task_parallelism/INFO] Task exec_A finished (1)
> [100.000000] [task_parallelism/INFO] Task exec_A start
+> [100.000000] [task_parallelism/INFO] Task exec_A finished (1)
> [200.000000] [task_parallelism/INFO] Task exec_A finished (2)
> [300.000000] [task_parallelism/INFO] Task exec_A start
> [300.000000] [task_parallelism/INFO] Task exec_A start
-> [400.000000] [task_parallelism/INFO] Task exec_A finished (3)
> [400.000000] [task_parallelism/INFO] Task exec_A start
-> [400.000000] [task_parallelism/INFO] Task exec_A finished (4)
> [400.000000] [task_parallelism/INFO] Task exec_A start
+> [400.000000] [task_parallelism/INFO] Task exec_A finished (3)
+> [400.000000] [task_parallelism/INFO] Task exec_A finished (4)
> [500.000000] [task_parallelism/INFO] Task exec_A finished (5)
> [500.000000] [task_parallelism/INFO] Task exec_A finished (6)
> [600.000000] [task_parallelism/INFO] Task exec_A start
-> [700.000000] [task_parallelism/INFO] Task exec_A finished (7)
> [700.000000] [task_parallelism/INFO] Task exec_A start
+> [700.000000] [task_parallelism/INFO] Task exec_A finished (7)
> [800.000000] [task_parallelism/INFO] Task exec_A finished (8)
> [900.000000] [task_parallelism/INFO] Task exec_A start
> [900.000000] [task_parallelism/INFO] Task exec_A start
-> [1000.000000] [task_parallelism/INFO] Task exec_A finished (9)
> [1000.000000] [task_parallelism/INFO] Task exec_A start
-> [1000.000000] [task_parallelism/INFO] Task exec_A finished (10)
> [1000.000000] [task_parallelism/INFO] Task exec_A start
+> [1000.000000] [task_parallelism/INFO] Task exec_A finished (9)
+> [1000.000000] [task_parallelism/INFO] Task exec_A finished (10)
+> [1100.000000] [task_parallelism/INFO] Task exec_A start
> [1100.000000] [task_parallelism/INFO] Task exec_A finished (11)
> [1100.000000] [task_parallelism/INFO] Task exec_A finished (12)
-> [1100.000000] [task_parallelism/INFO] Task exec_A start
-> [1200.000000] [task_parallelism/INFO] Task exec_A finished (13)
> [1200.000000] [task_parallelism/INFO] Task exec_A start
+> [1200.000000] [task_parallelism/INFO] Task exec_A finished (13)
> [1250.000000] [task_parallelism/INFO] Task exec_A start
> [1250.000000] [task_parallelism/INFO] Task exec_A start
-> [1300.000000] [task_parallelism/INFO] Task exec_A finished (14)
> [1300.000000] [task_parallelism/INFO] Task exec_A start
-> [1350.000000] [task_parallelism/INFO] Task exec_A finished (15)
+> [1300.000000] [task_parallelism/INFO] Task exec_A finished (14)
> [1350.000000] [task_parallelism/INFO] Task exec_A start
-> [1350.000000] [task_parallelism/INFO] Task exec_A finished (16)
> [1350.000000] [task_parallelism/INFO] Task exec_A start
+> [1350.000000] [task_parallelism/INFO] Task exec_A finished (15)
+> [1350.000000] [task_parallelism/INFO] Task exec_A finished (16)
> [1400.000000] [task_parallelism/INFO] Task exec_A finished (17)
> [1450.000000] [task_parallelism/INFO] Task exec_A finished (18)
> [1450.000000] [task_parallelism/INFO] Task exec_A finished (19)
\ No newline at end of file
Alternatively we: remove/add the link between SA and SA_to_B2
add/remove the link between SA and SA_to_B1
*/
- SA->on_this_start_cb([SA_to_B1, SA_to_B2](sg4::Task* t) {
+ SA->on_this_completion_cb([&SA_to_B1, &SA_to_B2](sg4::Task* t) {
int count = t->get_count();
sg4::CommTaskPtr comm;
- if (count % 2 == 0) {
+ if (count % 2 == 1) {
t->remove_successor(SA_to_B2);
t->add_successor(SA_to_B1);
comm = SA_to_B1;
t->add_successor(SA_to_B2);
comm = SA_to_B2;
}
- std::vector<double> amount = {1e3, 1e6, 1e9};
+ std::vector<double> amount = {1e9, 1e3, 1e6};
+ // XBT_INFO("Comm %f", amount[count % 3]);
comm->set_amount(amount[count % 3]);
auto token = std::make_shared<sg4::Token>();
token->set_data(new double(amount[count % 3]));
});
// The token sent by SA is forwarded by both communication tasks
- SA_to_B1->on_this_start_cb([&SA](sg4::Task* t) { t->set_token(t->get_next_token_from(SA)); });
- SA_to_B2->on_this_start_cb([&SA](sg4::Task* t) { t->set_token(t->get_next_token_from(SA)); });
+ SA_to_B1->on_this_completion_cb([&SA](sg4::Task* t) {
+ t->set_token(t->get_token_from(SA));
+ t->deque_token_from(SA);
+ });
+ SA_to_B2->on_this_completion_cb([&SA](sg4::Task* t) {
+ t->set_token(t->get_token_from(SA));
+ t->deque_token_from(SA);
+ });
/* B1 and B2 read the value of the token received by their predecessors
and use it to adapt their amount of work to do.
*/
- B1->on_this_start_cb([SA_to_B1](sg4::Task* t) {
- auto data = t->get_next_token_from(SA_to_B1)->get_unique_data<double>();
+ B1->on_this_start_cb([&SA_to_B1](sg4::Task* t) {
+ auto data = t->get_token_from(SA_to_B1)->get_data<double>();
+ t->deque_token_from(SA_to_B1);
t->set_amount(*data * 10);
});
- B2->on_this_start_cb([SA_to_B2](sg4::Task* t) {
- auto data = t->get_next_token_from(SA_to_B2)->get_unique_data<double>();
+ B2->on_this_start_cb([&SA_to_B2](sg4::Task* t) {
+ auto data = t->get_token_from(SA_to_B2)->get_data<double>();
+ t->deque_token_from(SA_to_B2);
t->set_amount(*data * 10);
});
> [1.798442] [task_storm/INFO] Task SB_to_B3 finished (5)
> [2.619232] [task_storm/INFO] Task B3 finished (1)
> [6.743624] [task_storm/INFO] Task B3 finished (2)
-> [10.868015] [task_storm/INFO] Task B3 finished (3)
> [10.868015] [task_storm/INFO] Task B4 finished (1)
+> [10.868015] [task_storm/INFO] Task B3 finished (3)
> [14.992407] [task_storm/INFO] Task B3 finished (4)
-> [19.116799] [task_storm/INFO] Task B3 finished (5)
> [19.116799] [task_storm/INFO] Task B4 finished (2)
+> [19.116799] [task_storm/INFO] Task B3 finished (5)
> [23.241190] [task_storm/INFO] Task B4 finished (3)
> [27.365582] [task_storm/INFO] Task B4 finished (4)
> [31.489974] [task_storm/INFO] Task B4 finished (5)
// successors to comm0
comm0->on_this_start_cb([&comm0, exec1, exec2, jupiter, fafard](const sg4::Task*) {
static int count = 0;
- if (count % 2 == 0) {
+ if (count % 2 == 0)
comm0->set_destination(jupiter);
+ else
+ comm0->set_destination(fafard);
+ count++;
+ });
+
+ comm0->on_this_completion_cb([&comm0, exec1, exec2](const sg4::Task*) {
+ static int count = 0;
+ if (count % 2 == 0) {
comm0->add_successor(exec1);
comm0->remove_successor(exec2);
} else {
- comm0->set_destination(fafard);
comm0->add_successor(exec2);
comm0->remove_successor(exec1);
}
return parser.parse_args()
def callback(t):
- print(f'[{Engine.clock}] {t} finished ({t.count})')
+ print(f'[{Engine.clock}] {t} finished ({t.get_count()})')
if __name__ == '__main__':
args = parse()
return parser.parse_args()
def callback(t):
- print(f'[{Engine.clock}] {t} finished ({t.count})')
+ print(f'[{Engine.clock}] {t} finished ({t.get_count()})')
if __name__ == '__main__':
args = parse()
> [11.714617112501687] CommTask(comm) finished (1)
> [20.388399000968448] ExecTask(exec1) finished (2)
> [21.90881661298591] CommTask(comm) finished (2)
-> [24.82146412938331] ExecTask(exec2) finished (1)
-> [37.92831114626493] ExecTask(exec2) finished (2)
+> [24.821464129383305] ExecTask(exec2) finished (1)
+> [37.928311146264925] ExecTask(exec2) finished (2)
return parser.parse_args()
def callback(t):
- print(f'[{Engine.clock}] {t} finished ({t.count})')
+ print(f'[{Engine.clock}] {t} finished ({t.get_count()})')
-def switch(t, hosts, execs):
- comm0.destination = hosts[t.count % 2]
- comm0.remove_successor(execs[t.count % 2 - 1])
- comm0.add_successor(execs[t.count % 2])
+def switch_destination(t, hosts):
+ t.destination = hosts[switch_destination.count % 2]
+ switch_destination.count += 1
+switch_destination.count = 0
+
+def switch_successor(t, execs):
+ t.remove_successor(execs[t.get_count() % 2])
+ t.add_successor(execs[t.get_count() % 2 - 1])
if __name__ == '__main__':
args = parse()
exec1.add_successor(comm1)
exec2.add_successor(comm2)
- # Add a function to be called when tasks end for log purpose
+ # Add a callback when tasks end for log purpose
Task.on_completion_cb(callback)
- # Add a function to be called before each firing of comm0
- # This function modifies the graph of tasks by adding or removing
- # successors to comm0
- comm0.on_this_start_cb(lambda t: switch(t, [jupiter, fafard], [exec1,exec2]))
+ # Add a callback before each firing of comm0
+ # It switches the destination of comm0
+ comm0.on_this_start_cb(lambda t: switch_destination(t, [jupiter, fafard]))
+
+ # Add a callback before comm0 send tokens to successors
+ # It switches the successor of comm0
+ comm0.on_this_completion_cb(lambda t: switch_successor(t, [exec1,exec2]))
# Enqueue two firings for task exec1
comm0.enqueue_firings(4)
return parser.parse_args()
def callback(t):
- print(f'[{Engine.clock}] {t} finished ({t.count})')
+ print(f'[{Engine.clock}] {t} finished ({t.get_count()})')
def variable_load(t):
print('--- Small load ---')
> [0.000000] [ker_engine/INFO] 2 actors are still running, waiting for something.
> [0.000000] [ker_engine/INFO] Legend of the following listing: "Actor <pid> (<name>@<host>): <status>"
> [0.000000] [ker_engine/INFO] Actor 1 (0@node-0.simgrid.org) simcall CommWait(comm_id:1 src:1 dst:-1 mbox:SMPI-2(id:2))
-> [0.000000] [ker_engine/INFO] Actor 2 (1@node-1.simgrid.org) simcall CommWait(comm_id:2 src:2 dst:-1 mbox:SMPI-1(id:0))
+> [0.000000] [ker_engine/INFO] Actor 2 (1@node-1.simgrid.org) simcall CommWait(comm_id:2 src:2 dst:-1 mbox:SMPI-1(id:3))
> [0.000000] [mc_global/INFO] Counter-example execution trace:
> [0.000000] [mc_global/INFO] Actor 1 in :0:() ==> simcall: iSend(mbox=2)
-> [0.000000] [mc_global/INFO] Actor 2 in :0:() ==> simcall: iSend(mbox=0)
+> [0.000000] [mc_global/INFO] Actor 2 in :0:() ==> simcall: iSend(mbox=3)
> [0.000000] [mc_Session/INFO] You can debug the problem (and see the whole details) by rerunning out of simgrid-mc with --cfg=model-check/replay:'1;2'
> [0.000000] [mc_dfs/INFO] DFS exploration ended. 3 unique states visited; 0 backtracks (0 transition replays, 3 states visited overall)
> Execution failed with code 3.
#########################################################################
foreach(x
- mutex-simple
+ mutex-simple mutex-recursive
producer-consumer)
if("${CMAKE_SYSTEM}" MATCHES "Linux")
--- /dev/null
+# We ignore the LD_PRELOAD lines from the expected output because they contain the build path
+! ignore .*LD_PRELOAD.*
+
+$ $VALGRIND_NO_TRACE_CHILDREN ${bindir:=.}/../../bin/simgrid-mc --cfg=model-check/setenv:LD_PRELOAD=${libdir:=.}/libsgmalloc.so:${libdir:=.}/libsthread.so ${bindir:=.}/pthread-mutex-recursive
+> sthread is intercepting the execution of ./pthread-mutex-recursive
+> [0.000000] [mc_dfs/INFO] Start a DFS exploration. Reduction is: dpor.
+> Got the lock on the default mutex.
+> Failed to relock the default mutex.
+> Got the lock on the recursive mutex.
+> Got the lock again on the recursive mutex.
+> Got the lock on the default mutex.
+> Failed to relock the default mutex.
+> Got the lock on the recursive mutex.
+> Got the lock again on the recursive mutex.
+> [0.000000] [mc_dfs/INFO] DFS exploration ended. 17 unique states visited; 1 backtracks (3 transition replays, 21 states visited overall)
! ignore .*LD_PRELOAD.*
$ $VALGRIND_NO_TRACE_CHILDREN ${bindir:=.}/../../bin/simgrid-mc --cfg=model-check/setenv:LD_PRELOAD=${libdir:=.}/libsgmalloc.so:${libdir:=.}/libsthread.so ${bindir:=.}/pthread-mutex-simple
-> [0.000000] [sthread/INFO] Starting the simulation.
+> sthread is intercepting the execution of ./pthread-mutex-simple
> All threads are started.
> [0.000000] [mc_dfs/INFO] Start a DFS exploration. Reduction is: dpor.
> The thread 0 is terminating.
! ignore .*LD_PRELOAD.*
$ $VALGRIND_NO_TRACE_CHILDREN ${bindir:=.}/../../bin/simgrid-mc --cfg=model-check/setenv:LD_PRELOAD=${libdir:=.}/libsgmalloc.so:${libdir:=.}/libsthread.so ${bindir:=.}/pthread-mutex-simpledeadlock
-> [0.000000] [sthread/INFO] Starting the simulation.
+> sthread is intercepting the execution of ./pthread-mutex-simpledeadlock
> All threads are started.
> [0.000000] [mc_dfs/INFO] Start a DFS exploration. Reduction is: dpor.
> The thread 0 is terminating.
! ignore .*LD_PRELOAD.*
$ $VALGRIND_NO_TRACE_CHILDREN ${bindir:=.}/../../bin/simgrid-mc --cfg=model-check/setenv:LD_PRELOAD=${libdir:=.}/libsgmalloc.so:${libdir:=.}/libsthread.so ${bindir:=.}/pthread-producer-consumer -q -C 1 -P 1
-> [0.000000] [sthread/INFO] Starting the simulation.
+> sthread is intercepting the execution of ./pthread-producer-consumer
> [0.000000] [mc_dfs/INFO] Start a DFS exploration. Reduction is: dpor.
> [0.000000] [mc_dfs/INFO] DFS exploration ended. 786 unique states visited; 97 backtracks (2049 transition replays, 2932 states visited overall)
$ $VALGRIND_NO_TRACE_CHILDREN ${bindir:=.}/../../bin/simgrid-mc --cfg=model-check/reduction:sdpor --cfg=model-check/setenv:LD_PRELOAD=${libdir:=.}/libsgmalloc.so:${libdir:=.}/libsthread.so ${bindir:=.}/pthread-producer-consumer -q -C 1 -P 1
> [0.000000] [xbt_cfg/INFO] Configuration change: Set 'model-check/reduction' to 'sdpor'
-> [0.000000] [sthread/INFO] Starting the simulation.
+> sthread is intercepting the execution of ./pthread-producer-consumer
> [0.000000] [mc_dfs/INFO] Start a DFS exploration. Reduction is: sdpor.
> [0.000000] [mc_dfs/INFO] DFS exploration ended. 1186 unique states visited; 157 backtracks (3403 transition replays, 4746 states visited overall)
$ $VALGRIND_NO_TRACE_CHILDREN ${bindir:=.}/../../bin/simgrid-mc --cfg=model-check/reduction:odpor --cfg=model-check/setenv:LD_PRELOAD=${libdir:=.}/libsgmalloc.so:${libdir:=.}/libsthread.so ${bindir:=.}/pthread-producer-consumer -q -C 1 -P 1
> [0.000000] [xbt_cfg/INFO] Configuration change: Set 'model-check/reduction' to 'odpor'
-> [0.000000] [sthread/INFO] Starting the simulation.
+> sthread is intercepting the execution of ./pthread-producer-consumer
> [0.000000] [mc_dfs/INFO] Start a DFS exploration. Reduction is: odpor.
> [0.000000] [mc_dfs/INFO] DFS exploration ended. 39 unique states visited; 0 backtracks (0 transition replays, 39 states visited overall)
--- /dev/null
+/* Copyright (c) 2002-2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+/* Code with both recursive and non-recursive mutexes */
+
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+/* Argument handed to each thread: a printable label and the mutex that thread exercises */
+typedef struct thread_data {
+  const char* name;       /* label used in the log messages */
+  pthread_mutex_t* mutex; /* mutex to lock, then try to relock */
+} ThreadData;
+
+/* Lock the mutex described by arg (a ThreadData*), try to lock it a second time, report each step on stderr,
+ * then release every level that was acquired. Per POSIX, trylock on a mutex already held by the caller only
+ * succeeds when that mutex is recursive. */
+static void* thread_function(void* arg)
+{
+  const ThreadData* td = arg;
+  pthread_mutex_t* mtx = td->mutex;
+  const char* label    = td->name;
+
+  pthread_mutex_lock(mtx);
+  fprintf(stderr, "Got the lock on the %s mutex.\n", label);
+
+  const int relock_rc = pthread_mutex_trylock(mtx);
+  if (relock_rc != 0) {
+    fprintf(stderr, "Failed to relock the %s mutex.\n", label);
+  } else {
+    fprintf(stderr, "Got the lock again on the %s mutex.\n", label);
+    pthread_mutex_unlock(mtx); /* release the nested level */
+  }
+
+  pthread_mutex_unlock(mtx);
+
+  // pthread_exit(NULL); TODO: segfaulting
+  return NULL;
+}
+
+/* Start two threads running thread_function: one on a default (non-recursive) mutex, one on a mutex created
+ * with the PTHREAD_MUTEX_RECURSIVE attribute. Join both, then destroy the mutexes. */
+int main()
+{
+ pthread_t thread1, thread2;
+ pthread_mutex_t mutex_dflt = PTHREAD_MUTEX_INITIALIZER; // Non-recursive mutex
+
+ /* Build an attribute requesting a recursive mutex and initialize mutex_rec with it */
+ pthread_mutexattr_t attr;
+ pthread_mutexattr_init(&attr);
+ pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
+ pthread_mutex_t mutex_rec;
+ pthread_mutex_init(&mutex_rec, &attr);
+ /* NOTE(review): attr is never passed to pthread_mutexattr_destroy(). Before adding that call, confirm how sthread
+  * handles it: the model-checked expected output (state counts) may depend on the exact sequence of calls. */
+
+ ThreadData data1 = {"default", &mutex_dflt};
+ ThreadData data2 = {"recursive", &mutex_rec};
+
+ pthread_create(&thread1, NULL, thread_function, &data1);
+ pthread_create(&thread2, NULL, thread_function, &data2);
+
+ pthread_join(thread1, NULL);
+ pthread_join(thread2, NULL);
+
+ pthread_mutex_destroy(&mutex_dflt);
+ pthread_mutex_destroy(&mutex_rec);
+
+ return 0;
+}
--- /dev/null
+$ env ASAN_OPTIONS=verify_asan_link_order=0:$ASAN_OPTIONS LD_PRELOAD=${libdir:=.}/libsthread.so ./pthread-mutex-recursive
+> sthread is intercepting the execution of ./pthread-mutex-recursive
+> Got the lock on the default mutex.
+> Failed to relock the default mutex.
+> Got the lock on the recursive mutex.
+> Got the lock again on the recursive mutex.
+> [0.000000] [sthread/INFO] All threads exited. Terminating the simulation.
+/* Copyright (c) 2002-2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
/* Simple test code with no bug */
#include <pthread.h>
$ env ASAN_OPTIONS=verify_asan_link_order=0:$ASAN_OPTIONS LD_PRELOAD=${libdir:=.}/libsthread.so ./pthread-mutex-simple
-> [0.000000] [sthread/INFO] Starting the simulation.
+> sthread is intercepting the execution of ./pthread-mutex-simple
> All threads are started.
> The thread 0 is terminating.
> The thread 1 is terminating.
+/* Copyright (c) 2002-2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
/* Simple test code that may deadlock:
Thread 1 locks mutex1 then mutex2 while thread 2 locks in reverse order.
$ env ASAN_OPTIONS=verify_asan_link_order=0:$ASAN_OPTIONS LD_PRELOAD=${libdir:=.}/libsthread.so ./pthread-producer-consumer
-> [0.000000] [sthread/INFO] Starting the simulation.
+> sthread is intercepting the execution of ./pthread-producer-consumer
> Producer 1: Insert Item 0 at 0
> Producer 2: Insert Item 0 at 1
> Consumer 1: Remove Item 0 from 0
> [0.000000] [sthread/INFO] All threads exited. Terminating the simulation.
$ env ASAN_OPTIONS=verify_asan_link_order=0:$ASAN_OPTIONS LD_PRELOAD=${libdir:=.}/libsthread.so ./pthread-producer-consumer -c 2 -C 1 -p 2 -P 1
-> [0.000000] [sthread/INFO] Starting the simulation.
+> sthread is intercepting the execution of ./pthread-producer-consumer
> Producer 1: Insert Item 0 at 0
> Consumer 1: Remove Item 0 from 0
> Producer 1: Insert Item 1 at 1
std::vector<int> v = {1, 2, 3, 5, 8, 13};
extern "C" {
-extern int sthread_access_begin(void* addr, const char* objname, const char* file, int line) __attribute__((weak));
-extern void sthread_access_end(void* addr, const char* objname, const char* file, int line) __attribute__((weak));
+// Hooks reporting the begin/end of an access to an object: its address, spelled name, source file, line and function.
+// Declared weak; the definitions are presumably provided by the preloaded libsthread.so (TODO confirm).
+extern int sthread_access_begin(void* addr, const char* objname, const char* file, int line, const char* func)
+ __attribute__((weak));
+extern void sthread_access_end(void* addr, const char* objname, const char* file, int line, const char* func)
+ __attribute__((weak));
}
+// STHREAD_ACCESS(obj) <statement>: run <statement> once, preceded by sthread_access_begin() and followed by
+// sthread_access_end(). Both calls receive obj's address, its spelled name (#obj), __FILE__, __LINE__ and __FUNCTION__.
+// The "|| true" makes the statement run whatever sthread_access_begin() returns.
+// NOTE(review): a break or return inside <statement> skips the sthread_access_end() call.
#define STHREAD_ACCESS(obj) \
- for (bool first = sthread_access_begin(static_cast<void*>(obj), #obj, __FILE__, __LINE__) || true; first; \
- sthread_access_end(static_cast<void*>(obj), #obj, __FILE__, __LINE__), first = false)
+ for (bool first = sthread_access_begin(static_cast<void*>(obj), #obj, __FILE__, __LINE__, __FUNCTION__) || true; \
+ first; sthread_access_end(static_cast<void*>(obj), #obj, __FILE__, __LINE__, __FUNCTION__), first = false)
static void thread_code()
{
! ignore .*LD_PRELOAD.*
$ $VALGRIND_NO_TRACE_CHILDREN ${bindir:=.}/../../../bin/simgrid-mc --cfg=model-check/setenv:LD_PRELOAD=${libdir:=.}/libsthread.so ${bindir:=.}/stdobject "--log=root.fmt:[%11.6r]%e(%a@%h)%e%m%n" --log=no_loc
-> [ 0.000000] (maestro@) Starting the simulation.
+> sthread is intercepting the execution of ./stdobject
> starting two helpers...
> waiting for helpers to finish...
> [ 0.000000] (maestro@) Start a DFS exploration. Reduction is: dpor.
> v = { 1, 2, 3, 5, 8, 13, 21, 21, };
> [ 0.000000] (maestro@) thread 1 takes &v
> [ 0.000000] (maestro@) thread 2 takes &v
-> [ 0.000000] (maestro@) Unprotected concurent access to &v: thread 1 vs thread 2 (locations hidden because of --log=no_loc).
+> [ 0.000000] (maestro@) Unprotected concurent access to &v: thread 1 from 1 location vs thread 2 (locations hidden because of --log=no_loc).
> [ 0.000000] (maestro@) **************************
> [ 0.000000] (maestro@) *** PROPERTY NOT VALID ***
> [ 0.000000] (maestro@) **************************
$ ./sthread-mutex-simple
-> [0.000000] [sthread/INFO] Starting the simulation.
+> sthread is intercepting the execution of ./sthread-mutex-simple
> All threads are started.
> The thread 0 is terminating.
> The thread 1 is terminating.
class Simcall;
class SimcallObserver;
+class MutexObserver;
class ObjectAccessSimcallObserver;
class ObjectAccessSimcallItem;
} // namespace actor
class SplitDuplexLinkImpl;
class NetworkAction;
class DiskImpl;
+using DiskImplPtr = boost::intrusive_ptr<DiskImpl>;
class DiskModel;
class VirtualMachineImpl;
class VMModel;
namespace simgrid::s4u {
/** @brief A classical mutex, but blocking in the simulation world.
+ *
+ * By default, S4U mutexes are not recursive: if an actor tries to lock the same object twice, it deadlocks with itself.
+ * Pass true to create() to get a recursive mutex instead.
*
* @beginrst
* It is strictly impossible to use a real mutex, such as
public:
/** \static Constructs a new mutex */
- static MutexPtr create();
+ static MutexPtr create(bool recursive = false);
+
void lock();
void unlock();
bool try_lock();
* @param id: Internal identifier of the element
* @return pair<NetPoint*, NetPoint*>: returns a pair of netpoint and gateway.
*/
+ // XBT_ATTRIB_DEPRECATED_v339
using ClusterNetPointCb = std::pair<kernel::routing::NetPoint*, kernel::routing::NetPoint*>(
NetZone* zone, const std::vector<unsigned long>& coord, unsigned long id);
+
+ /**
+ * @brief Callback used to set the NetZone located at some leaf of clusters (Torus, FatTree, etc)
+ *
+ * @param zone: The parent zone, needed for creating new resources (hosts, links)
+ * @param coord: the coordinates of the element
+ * @param id: Internal identifier of the element
+ * @return NetZone*: returns newly created netzone
+ */
+ using ClusterNetZoneCb = NetZone*(NetZone* zone, const std::vector<unsigned long>& coord, unsigned long id);
+ /**
+ * @brief Callback used to set the Host located at some leaf of clusters (Torus, FatTree, etc)
+ *
+ * @param zone: The parent zone, needed for creating new resources (hosts, links)
+ * @param coord: the coordinates of the element
+ * @param id: Internal identifier of the element
+ * @return Host*: returns newly created host
+ */
+ using ClusterHostCb = Host*(NetZone* zone, const std::vector<unsigned long>& coord, unsigned long id);
+
/**
* @brief Callback used to set the links for some leaf of the cluster (Torus, FatTree, etc)
*
*/
using ClusterLinkCb = Link*(NetZone* zone, const std::vector<unsigned long>& coord, unsigned long id);
- std::function<ClusterNetPointCb> netpoint;
+ bool by_netzone_ = false;
+ bool is_by_netzone() const { return by_netzone_; }
+ bool by_netpoint_ = false; // XBT_ATTRIB_DEPRECATED_v339
+ bool is_by_netpoint() const { return by_netpoint_; } // XBT_ATTRIB_DEPRECATED_v339
+ std::function<ClusterNetPointCb> netpoint; // XBT_ATTRIB_DEPRECATED_v339
+ std::function<ClusterHostCb> host;
+ std::function<ClusterNetZoneCb> netzone;
std::function<ClusterLinkCb> loopback = {};
std::function<ClusterLinkCb> limiter = {};
+ /** Callbacks built from a NetZone-creation function only (loopback and limiter stay empty). */
+ explicit ClusterCallbacks(const std::function<ClusterNetZoneCb>& set_netzone)
+     : by_netzone_(true), netzone(set_netzone)
+ {
+ }
+
+ /** Callbacks built from a NetZone-creation function plus loopback and limiter link-creation functions. */
+ ClusterCallbacks(const std::function<ClusterNetZoneCb>& set_netzone, const std::function<ClusterLinkCb>& set_loopback,
+                  const std::function<ClusterLinkCb>& set_limiter)
+     : by_netzone_(true), netzone(set_netzone), loopback(set_loopback), limiter(set_limiter)
+ {
+ }
+
+ /** Callbacks built from a Host-creation function only (loopback and limiter stay empty). */
+ explicit ClusterCallbacks(const std::function<ClusterHostCb>& set_host) : host(set_host)
+ {
+ }
+
+ /** Callbacks built from a Host-creation function plus loopback and limiter link-creation functions. */
+ ClusterCallbacks(const std::function<ClusterHostCb>& set_host, const std::function<ClusterLinkCb>& set_loopback,
+                  const std::function<ClusterLinkCb>& set_limiter)
+     : host(set_host), loopback(set_loopback), limiter(set_limiter)
+ {
+ }
+
+ // Deprecated (XBT_ATTRIB_DEPRECATED_v339) NetPoint-based callbacks; sets by_netpoint_ so is_by_netpoint() is true.
+ XBT_ATTRIB_DEPRECATED_v339("Please use callback with either a Host/NetZone creation function as first parameter")
explicit ClusterCallbacks(const std::function<ClusterNetPointCb>& set_netpoint)
- : netpoint(set_netpoint){/*nothing to do */};
+ : by_netpoint_(true), netpoint(set_netpoint){/* nothing to do */};
+ // Deprecated (XBT_ATTRIB_DEPRECATED_v339) NetPoint-based variant that also sets the loopback and limiter callbacks.
+ XBT_ATTRIB_DEPRECATED_v339("Please use callback with either a Host/NetZone creation function as first parameter")
ClusterCallbacks(const std::function<ClusterNetPointCb>& set_netpoint,
const std::function<ClusterLinkCb>& set_loopback, const std::function<ClusterLinkCb>& set_limiter)
- : netpoint(set_netpoint), loopback(set_loopback), limiter(set_limiter){/*nothing to do */};
+ : by_netpoint_(true), netpoint(set_netpoint), loopback(set_loopback), limiter(set_limiter){/* nothing to do */};
};
/**
* @brief Create a torus zone
#include <map>
#include <memory>
#include <set>
+#include <xbt/asserts.h>
namespace simgrid::s4u {
class XBT_PUBLIC Token : public xbt::Extendable<Token> {};
class Task {
+
std::string name_;
- double amount_;
- int queued_firings_ = 0;
- int count_ = 0;
- int running_instances_ = 0;
- int parallelism_degree_ = 1;
+
+ std::map<std::string, double> amount_ = {{"instance_0", 0}, {"dispatcher", 0}, {"collector", 0}};
+ std::map<std::string, int> queued_firings_ = {{"instance_0", 0}, {"dispatcher", 0}, {"collector", 0}};
+ std::map<std::string, int> running_instances_ = {{"instance_0", 0}, {"dispatcher", 0}, {"collector", 0}};
+ std::map<std::string, int> count_ = {{"instance_0", 0}, {"dispatcher", 0}, {"collector", 0}};
+ std::map<std::string, int> parallelism_degree_ = {{"instance_0", 1}, {"dispatcher", 1}, {"collector", 1}};
+ std::map<std::string, int> internal_bytes_to_send_ = {{"instance_0", 0}, {"dispatcher", 0}};
+
+ std::function<std::string()> load_balancing_function_;
std::set<Task*> successors_ = {};
std::map<Task*, unsigned int> predecessors_ = {};
std::atomic_int_fast32_t refcount_{0};
- bool ready_to_run() const;
+ bool ready_to_run(std::string instance);
void receive(Task* source);
std::shared_ptr<Token> token_ = nullptr;
- std::deque<std::map<TaskPtr, std::shared_ptr<Token>>> tokens_received_;
- std::deque<ActivityPtr> current_activities_;
+ std::map<TaskPtr, std::deque<std::shared_ptr<Token>>> tokens_received_;
+ std::map<std::string, std::deque<ActivityPtr>> current_activities_ = {
+ {"instance_0", {}}, {"dispatcher", {}}, {"collector", {}}};
inline static xbt::signal<void(Task*)> on_start;
xbt::signal<void(Task*)> on_this_start;
explicit Task(const std::string& name);
virtual ~Task() = default;
- virtual void fire();
- void complete();
+ virtual void fire(std::string instance);
+ void complete(std::string instance);
- void store_activity(ActivityPtr a) { current_activities_.push_back(a); }
+ void store_activity(ActivityPtr a, std::string instance) { current_activities_[instance].push_back(a); }
+
+ virtual void add_instances(int n);
+ virtual void remove_instances(int n);
public:
void set_name(std::string name);
const std::string& get_name() const { return name_; }
const char* get_cname() const { return name_.c_str(); }
- void set_amount(double amount);
- double get_amount() const { return amount_; }
- int get_count() const { return count_; }
- void set_parallelism_degree(int n);
- int get_parallelism_degree() const { return parallelism_degree_; }
+ void set_amount(double amount, std::string instance = "instance_0");
+ double get_amount(std::string instance = "instance_0") const { return amount_.at(instance); }
+ int get_queued_firings(std::string instance = "instance_0") { return queued_firings_.at(instance); }
+ int get_running_count(std::string instance = "instance_0") { return running_instances_.at(instance); }
+ int get_count(std::string instance = "collector") const { return count_.at(instance); }
+ void set_parallelism_degree(int n, std::string instance = "all");
+ int get_parallelism_degree(std::string instance = "instance_0") const { return parallelism_degree_.at(instance); }
+ void set_internal_bytes(int bytes, std::string instance = "instance_0");
+ double get_internal_bytes(std::string instance = "instance_0") const { return internal_bytes_to_send_.at(instance); }
+ void set_load_balancing_function(std::function<std::string()> func);
void set_token(std::shared_ptr<Token> token);
- std::shared_ptr<Token> get_next_token_from(TaskPtr t);
+ std::shared_ptr<Token> get_token_from(TaskPtr t) const { return tokens_received_.at(t).front(); }
+ std::deque<std::shared_ptr<Token>> get_tokens_from(TaskPtr t) const { return tokens_received_.at(t); }
+ void deque_token_from(TaskPtr t);
void add_successor(TaskPtr t);
void remove_successor(TaskPtr t);
Host* destination_;
explicit CommTask(const std::string& name);
- void fire() override;
+ void fire(std::string instance) override;
public:
static CommTaskPtr init(const std::string& name);
CommTaskPtr set_destination(Host* destination);
Host* get_destination() const { return destination_; }
CommTaskPtr set_bytes(double bytes);
- double get_bytes() const { return get_amount(); }
+ double get_bytes() const { return get_amount("instance_0"); }
};
class ExecTask : public Task {
- Host* host_;
+ std::map<std::string, Host*> host_ = {{"instance_0", nullptr}, {"dispatcher", nullptr}, {"collector", nullptr}};
explicit ExecTask(const std::string& name);
- void fire() override;
+ void fire(std::string instance) override;
public:
static ExecTaskPtr init(const std::string& name);
static ExecTaskPtr init(const std::string& name, double flops, Host* host);
- ExecTaskPtr set_host(Host* host);
- Host* get_host() const { return host_; }
- ExecTaskPtr set_flops(double flops);
- double get_flops() const { return get_amount(); }
+ ExecTaskPtr set_host(Host* host, std::string instance = "all");
+ Host* get_host(std::string instance = "instance_0") const { return host_.at(instance); }
+ ExecTaskPtr set_flops(double flops, std::string instance = "instance_0");
+ double get_flops(std::string instance = "instance_0") const { return get_amount(instance); }
+
+ void add_instances(int n) override;
+ void remove_instances(int n) override;
};
class IoTask : public Task {
Disk* disk_;
Io::OpType type_;
explicit IoTask(const std::string& name);
- void fire() override;
+ void fire(std::string instance) override;
public:
static IoTaskPtr init(const std::string& name);
IoTaskPtr set_disk(Disk* disk);
Disk* get_disk() const { return disk_; }
IoTaskPtr set_bytes(double bytes);
- double get_bytes() const { return get_amount(); }
+ double get_bytes() const { return get_amount("instance_0"); }
IoTaskPtr set_op_type(Io::OpType type);
Io::OpType get_op_type() const { return type_; }
};
py::class_<Mutex, MutexPtr>(m, "Mutex",
"A classical mutex, but blocking in the simulation world."
"See the C++ documentation for details.")
- .def(py::init<>(&Mutex::create), py::call_guard<py::gil_scoped_release>(), "Mutex constructor.")
+ .def(py::init<>(&Mutex::create), py::call_guard<py::gil_scoped_release>(),
+ "Mutex constructor (pass True as a parameter to get a recursive Mutex).", py::arg("recursive") = false)
.def("lock", &Mutex::lock, py::call_guard<py::gil_scoped_release>(), "Block until the mutex is acquired.")
.def("try_lock", &Mutex::try_lock, py::call_guard<py::gil_scoped_release>(),
"Try to acquire the mutex. Return true if the mutex was acquired, false otherwise.")
},
"Add a callback called when each task ends.")
.def_property_readonly("name", &Task::get_name, "The name of this task (read-only).")
- .def_property_readonly("count", &Task::get_count, "The execution count of this task (read-only).")
.def_property_readonly("successors", &Task::get_successors, "The successors of this task (read-only).")
.def_property("amount", &Task::get_amount, &Task::set_amount, "The amount of work to do for this task.")
+ .def(
+ "get_count", [](const TaskPtr t) { return t->get_count("instance_0"); },
+ "The execution count of this task instance_0.")
+ .def(
+ "get_count", [](const TaskPtr t, const std::string& instance) { return t->get_count(instance); },
+ "The execution count of this task instance.")
.def("enqueue_firings", py::overload_cast<int>(&Task::enqueue_firings), py::call_guard<py::gil_scoped_release>(),
py::arg("n"), "Enqueue firings for this task.")
.def("add_successor", py::overload_cast<TaskPtr>(&Task::add_successor), py::call_guard<py::gil_scoped_release>(),
/** @brief Pushes a communication activity into a mailbox
* @param comm What to add
*/
-void MailboxImpl::push(CommImplPtr comm)
+void MailboxImpl::push(const CommImplPtr& comm)
{
comm->set_mailbox(this);
this->comm_queue_.push_back(std::move(comm));
(comm->get_mailbox() ? comm->get_mailbox()->get_cname() : "(null)"), this->get_cname());
comm->set_mailbox(nullptr);
- for (auto it = this->comm_queue_.begin(); it != this->comm_queue_.end(); it++)
- if (*it == comm) {
- this->comm_queue_.erase(it);
- return;
- }
- xbt_die("Comm %p not found in mailbox %s", comm.get(), this->get_cname());
+ auto it = std::find(this->comm_queue_.begin(), this->comm_queue_.end(), comm);
+ if (it != this->comm_queue_.end())
+ this->comm_queue_.erase(it);
+ else
+ xbt_die("Comm %p not found in mailbox %s", comm.get(), this->get_cname());
}
/** @brief Removes all communication activities from a mailbox
void MailboxImpl::clear(bool do_finish)
{
// CommImpl::cancel() will remove the comm from the mailbox..
- for (auto comm : done_comm_queue_) {
+ for (const auto& comm : done_comm_queue_) {
comm->cancel();
comm->set_state(State::FAILED);
if (do_finish)
#ifndef SIMGRID_KERNEL_ACTIVITY_MAILBOX_HPP
#define SIMGRID_KERNEL_ACTIVITY_MAILBOX_HPP
+#include "simgrid/config.h" /* FIXME: KILLME. This makes the ABI config-dependent, but mandatory for the hack below */
#include "simgrid/s4u/Engine.hpp"
#include "simgrid/s4u/Mailbox.hpp"
#include "src/kernel/activity/CommImpl.hpp"
#include "src/kernel/actor/ActorImpl.hpp"
-#include <boost/circular_buffer.hpp>
-
+#include <deque> // comm queues when SIMGRID_HAVE_STATEFUL_MC is off
+#include <list>  // comm queues when SIMGRID_HAVE_STATEFUL_MC is on
+
namespace simgrid::kernel::activity {
/** @brief Implementation of the s4u::Mailbox */
class MailboxImpl {
- static constexpr size_t MAX_MAILBOX_SIZE = 10000000;
-
s4u::Mailbox piface_;
std::string name_;
actor::ActorImplPtr permanent_receiver_; // actor to which the mailbox is attached
- boost::circular_buffer_space_optimized<CommImplPtr> comm_queue_{MAX_MAILBOX_SIZE};
+#if SIMGRID_HAVE_STATEFUL_MC
+ /* Using std::deque here is faster in benchmarks, but breaks the state equality heuristic of Liveness checking on
+ * Debian testing. This would deserve a proper investigation; in the meantime, simply use a std::list. HACK.
+ */
+ std::list<CommImplPtr> comm_queue_;
// messages already received in the permanent receive mode
- boost::circular_buffer_space_optimized<CommImplPtr> done_comm_queue_{MAX_MAILBOX_SIZE};
+ std::list<CommImplPtr> done_comm_queue_;
+#else
+ std::deque<CommImplPtr> comm_queue_;
+ // messages already received in the permanent receive mode
+ std::deque<CommImplPtr> done_comm_queue_;
+#endif
friend s4u::Engine;
friend s4u::Mailbox;
const std::string& get_name() const { return name_; }
const char* get_cname() const { return name_.c_str(); }
void set_receiver(s4u::ActorPtr actor);
- void push(CommImplPtr comm);
- void push_done(CommImplPtr done_comm) { done_comm_queue_.push_back(done_comm); }
+ void push(const CommImplPtr& comm);
+ void push_done(const CommImplPtr& done_comm) { done_comm_queue_.push_back(done_comm); }
void remove(const CommImplPtr& comm);
void clear(bool do_finish);
CommImplPtr iprobe(int type, const std::function<bool(void*, void*, CommImpl*)>& match_fun, void* data);
actor::ActorImplPtr get_permanent_receiver() const { return permanent_receiver_; }
bool empty() const { return comm_queue_.empty(); }
size_t size() const { return comm_queue_.size(); }
- CommImplPtr front() const { return comm_queue_.front(); }
+ const CommImplPtr& front() const { return comm_queue_.front(); }
bool has_some_done_comm() const { return not done_comm_queue_.empty(); }
- CommImplPtr done_front() const { return done_comm_queue_.front(); }
+ const CommImplPtr& done_front() const { return done_comm_queue_.front(); }
};
} // namespace simgrid::kernel::activity
+/* Request the mutex on behalf of issuer and return the corresponding acquisition.
+ * - Free mutex, or recursive mutex already owned by issuer: the acquisition is granted immediately and
+ *   recursive_depth is set to 1 or incremented.
+ * - Otherwise the acquisition is queued in ongoing_acquisitions_. For a recursive mutex, an issuer that is
+ *   already queued gets its pending acquisition back, with recursive_depth_ bumped by one. */
MutexAcquisitionImplPtr MutexImpl::lock_async(actor::ActorImpl* issuer)
{
- auto res = MutexAcquisitionImplPtr(new kernel::activity::MutexAcquisitionImpl(issuer, this), true);
-
- if (owner_ != nullptr) {
- /* Somebody is using the mutex; register the acquisition */
+ /* If the mutex is recursive */
+ if (is_recursive_) {
+ if (owner_ == issuer) {
+ recursive_depth++;
+ auto res = MutexAcquisitionImplPtr(new kernel::activity::MutexAcquisitionImpl(issuer, this), true);
+ res->grant();
+ return res;
+ } else if (owner_ == nullptr) { // Free
+ owner_ = issuer;
+ recursive_depth = 1;
+ auto res = MutexAcquisitionImplPtr(new kernel::activity::MutexAcquisitionImpl(issuer, this), true);
+ res->grant();
+ return res;
+ }
+
+ // Iterate by reference: copying each intrusive_ptr would needlessly bump its refcount
+ for (auto const& acq : ongoing_acquisitions_)
+ if (acq->get_issuer() == issuer) {
+ acq->recursive_depth_++;
+ return acq;
+ }
+
+ // Not yet in the ongoing acquisition list. Get in there
+ auto res = MutexAcquisitionImplPtr(new kernel::activity::MutexAcquisitionImpl(issuer, this), true);
ongoing_acquisitions_.push_back(res);
- } else {
+ return res;
+ }
+
+ // Non-recursive mutex
+ auto res = MutexAcquisitionImplPtr(new kernel::activity::MutexAcquisitionImpl(issuer, this), true);
+ if (owner_ == nullptr) { // Lock is free, take it
owner_ = issuer;
+ recursive_depth = 1;
+ res->grant();
+ } else { // Somebody is using the mutex; register the acquisition
+ ongoing_acquisitions_.push_back(res);
}
return res;
}
*/
bool MutexImpl::try_lock(actor::ActorImpl* issuer)
{
- XBT_IN("(%p, %p)", this, issuer);
- if (owner_ != nullptr) {
- XBT_OUT();
- return false;
+ // Recursive mutex already owned by issuer: add one nesting level
+ if (owner_ == issuer && is_recursive_) {
+ recursive_depth++;
+ return true;
}
+ if (owner_ != nullptr)
+ return false;
- owner_ = issuer;
- XBT_OUT();
+ owner_ = issuer;
+ // Start the nesting count at 1, exactly like lock_async() does on a free mutex. Otherwise the depth stays at its
+ // previous value (0 after a full release). A later relock followed by a single unlock would then bring it back to
+ // 0 and release a recursive mutex that the caller still holds.
+ recursive_depth = 1;
return true;
}
xbt_assert(issuer == owner_, "Cannot release that mutex: you're not the owner. %s is (pid:%ld).",
owner_ != nullptr ? owner_->get_cname() : "(nobody)", owner_ != nullptr ? owner_->get_pid() : -1);
+ if (is_recursive_) {
+ recursive_depth--;
+ if (recursive_depth > 0) // Still owning the lock
+ return;
+ }
+
if (not ongoing_acquisitions_.empty()) {
/* Give the ownership to the first waiting actor */
auto acq = ongoing_acquisitions_.front();
ongoing_acquisitions_.pop_front();
owner_ = acq->get_issuer();
+ acq->grant();
+ recursive_depth = acq->recursive_depth_;
if (acq == owner_->waiting_synchro_)
acq->finish();
// else, the issuer is not blocked on this acquisition so no need to release it
class XBT_PUBLIC MutexAcquisitionImpl : public ActivityImpl_T<MutexAcquisitionImpl> {
actor::ActorImpl* issuer_ = nullptr;
MutexImpl* mutex_ = nullptr;
+ int recursive_depth_ = 1;
+ // TODO: make test() check granted_ instead of owner_ == self.
+ // This is required for a double lock on a non-recursive mutex to properly deadlock.
+ bool granted_ = false;
+
+ friend MutexImpl;
public:
MutexAcquisitionImpl(actor::ActorImpl* issuer, MutexImpl* mutex) : issuer_(issuer), mutex_(mutex) {}
MutexImplPtr get_mutex() { return mutex_; }
actor::ActorImpl* get_issuer() { return issuer_; }
+ void grant() { granted_ = true; }
bool test(actor::ActorImpl* issuer = nullptr) override;
void wait_for(actor::ActorImpl* issuer, double timeout) override;
std::deque<MutexAcquisitionImplPtr> ongoing_acquisitions_;
static unsigned next_id_;
unsigned id_ = next_id_++;
+ bool is_recursive_ = false;
+ int recursive_depth = 0;
friend MutexAcquisitionImpl;
public:
- MutexImpl() : piface_(this) {}
+ MutexImpl(bool recursive = false) : piface_(this), is_recursive_(recursive) {}
MutexImpl(MutexImpl const&) = delete;
MutexImpl& operator=(MutexImpl const&) = delete;
Metric read_bw_ = {0.0, 0, nullptr};
Metric write_bw_ = {0.0, 0, nullptr};
double readwrite_bw_ = -1; /* readwrite constraint bound, usually max(read, write) */
+ std::atomic_int_fast32_t refcount_{0};
void apply_sharing_policy_cfg();
explicit DiskImpl(const std::string& name, double read_bandwidth, double write_bandwidth);
DiskImpl(const DiskImpl&) = delete;
DiskImpl& operator=(const DiskImpl&) = delete;
+ /** intrusive_ptr hook: take one more reference on disk */
+ friend void intrusive_ptr_add_ref(DiskImpl* disk) { disk->refcount_.fetch_add(1, std::memory_order_acq_rel); }
+ /** intrusive_ptr hook: drop one reference on disk and delete it once the last reference is gone */
+ friend void intrusive_ptr_release(DiskImpl* disk)
+ {
+   if (disk->refcount_.fetch_sub(1, std::memory_order_release) != 1)
+     return; // other references remain
+   // Last reference: this acquire fence pairs with the release decrements of the other owners before deletion
+   std::atomic_thread_fence(std::memory_order_acquire);
+   delete disk;
+ }
/** @brief Public interface */
const s4u::Disk* get_iface() const { return &piface_; }
#include <simgrid/s4u/Host.hpp>
#include "src/kernel/EngineImpl.hpp"
+#include "src/kernel/resource/NetworkModel.hpp"
#include "src/kernel/resource/VirtualMachineImpl.hpp"
#include "xbt/asserts.hpp"
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(res_host, ker_resource, "Host resources agregate CPU, networking and I/O features");
/*************
- * Callbacks *t
+ * Callbacks *
*************/
namespace simgrid::kernel::resource {
/*********
* Model *
*********/
+/** @brief Create an action streaming @p size from @p src_host to @p dst_host, optionally through disks.
+ *
+ * The action is created by the network model of the englobing zone of @p src_host
+ * (rate argument -1, presumably "no explicit rate bound", and a trailing `true` flag whose meaning is defined by
+ * NetworkModel::communicate — TODO confirm). When @p src_disk (resp. @p dst_disk) is not nullptr, the variable of
+ * that action is additionally attached to the disk's global constraint and to its read (resp. write) constraint,
+ * using the network bandwidth factor as coefficient.
+ *
+ * @param src_host host the stream starts from
+ * @param src_disk disk whose read constraints are used on the source side, or nullptr for none
+ * @param dst_host host the stream ends on
+ * @param dst_disk disk whose write constraints are used on the destination side, or nullptr for none
+ * @param size amount of data to stream (presumably bytes)
+ * @return the network action, also constrained by the given disks
+ */
+Action* HostModel::io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk,
+ double size)
+{
+ auto* net_model = src_host->get_englobing_zone()->get_network_model();
+ auto* system = net_model->get_maxmin_system();
+ auto* action = net_model->communicate(src_host, dst_host, size, -1, true);
+
+ // We don't want to apply the network model bandwidth factor to the I/O constraints
+ double bw_factor = net_model->get_bandwidth_factor();
+ if (src_disk != nullptr) {
+ // FIXME: if the stream starts from a disk, we might not want to pay the network latency
+ system->expand(src_disk->get_constraint(), action->get_variable(), bw_factor);
+ system->expand(src_disk->get_read_constraint(), action->get_variable(), bw_factor);
+ }
+ if (dst_disk != nullptr) {
+ system->expand(dst_disk->get_constraint(), action->get_variable(), bw_factor);
+ system->expand(dst_disk->get_write_constraint(), action->get_variable(), bw_factor);
+ }
+
+ return action;
+}
+
/************
* Resource *
************/
delete arg;
actors_at_boot_.clear();
- for (auto const& [_, d] : disks_)
- d->destroy();
-
for (auto const& [_, vm] : vms_)
vm->vm_destroy();
}
res.emplace_back(actor.get_ciface());
return res;
}
+
size_t HostImpl::get_actor_count() const
{
return actor_list_.size();
void HostImpl::add_disk(const s4u::Disk* disk)
{
- disks_[disk->get_name()] = disk->get_impl();
+ disks_.insert({disk->get_name(), kernel::resource::DiskImplPtr(disk->get_impl())});
}
void HostImpl::remove_disk(const std::string& name)
virtual Action* execute_parallel(const std::vector<s4u::Host*>& host_list, const double* flops_amount,
const double* bytes_amount, double rate) = 0;
- virtual Action* io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk,
- double size) = 0;
+ Action* io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk,
+ double size);
};
/************
ActorList actor_list_;
std::vector<actor::ProcessArg*> actors_at_boot_;
s4u::Host piface_;
- std::map<std::string, DiskImpl*, std::less<>> disks_;
+ std::map<std::string, DiskImplPtr, std::less<>> disks_;
std::map<std::string, VirtualMachineImpl*, std::less<>> vms_;
std::string name_{"noname"};
routing::NetZoneImpl* englobing_zone_ = nullptr;
{
return nullptr;
};
- Action* io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk, double size) override { return nullptr; }
};
} // namespace kernel::resource
} // namespace simgrid
return -1.0;
}
-Action* HostCLM03Model::io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk,
- double size)
-{
- auto* net_model = src_host->get_englobing_zone()->get_network_model();
- auto* system = net_model->get_maxmin_system();
- auto* action = net_model->communicate(src_host, dst_host, size, -1, true);
-
- // We don't want to apply the network model bandwidth factor to the I/O constraints
- double bw_factor = net_model->get_bandwidth_factor();
- if (src_disk != nullptr) {
- // FIXME: if the stream starts from a disk, we might not want to pay the network latency
- system->expand(src_disk->get_constraint(), action->get_variable(), bw_factor);
- system->expand(src_disk->get_read_constraint(), action->get_variable(), bw_factor);
- }
- if (dst_disk != nullptr) {
- system->expand(dst_disk->get_constraint(), action->get_variable(), bw_factor);
- system->expand(dst_disk->get_write_constraint(), action->get_variable(), bw_factor);
- }
-
- return action;
-}
-
Action* HostCLM03Model::execute_parallel(const std::vector<s4u::Host*>& host_list, const double* flops_amount,
const double* bytes_amount, double rate)
{
Action* execute_thread(const s4u::Host* host, double flops_amount, int thread_count) override;
Action* execute_parallel(const std::vector<s4u::Host*>& host_list, const double* flops_amount,
const double* bytes_amount, double rate) override;
- Action* io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk,
- double size) override;
};
} // namespace simgrid::kernel::resource
wifiPhy.Set("Antennas", ns3::UintegerValue(nss_value));
wifiPhy.Set("MaxSupportedTxSpatialStreams", ns3::UintegerValue(nss_value));
wifiPhy.Set("MaxSupportedRxSpatialStreams", ns3::UintegerValue(nss_value));
-#if NS3_MINOR_VERSION > 33
+#if NS3_MINOR_VERSION < 33
+ // This fails with "The channel width does not uniquely identify an operating channel" on v3.34,
+ // so we specified the ChannelWidth of wifiPhy to 40, above, when creating wifiPhy with v3.34 and higher
+ ns3::Config::Set("/NodeList/*/DeviceList/*/$ns3::WifiNetDevice/Phy/ChannelWidth", ns3::UintegerValue(40));
+#elif NS3_MINOR_VERSION < 36
wifiPhy.Set("ChannelWidth", ns3::UintegerValue(40));
+#else
+ wifiPhy.Set("ChannelSettings", ns3::StringValue("{0, 40, BAND_UNSPECIFIED, 0}"));
#endif
wifiMac.SetType("ns3::ApWifiMac", "Ssid", ns3::SsidValue(ssid));
ns3::Simulator::Schedule(ns3::Seconds(start_time_value), &resumeWifiDevice, device);
}
-#if NS3_MINOR_VERSION < 33
- // This fails with "The channel width does not uniquely identify an operating channel" on v3.34,
- // so we specified the ChannelWidth of wifiPhy to 40, above, when creating wifiPhy with v3.34 and higher
- ns3::Config::Set("/NodeList/*/DeviceList/*/$ns3::WifiNetDevice/Phy/ChannelWidth", ns3::UintegerValue(40));
-#endif
-
mobility.SetPositionAllocator(positionAllocS);
mobility.Install(nodes);
ns3::Ipv4AddressHelper address;
XBT_DEBUG("Creating a parallel task (%p) with %zu hosts and %zu unique links.", this, host_nb, link_nb);
latency_ = latency;
- set_variable(model->get_maxmin_system()->variable_new(this, 1.0, (rate > 0 ? rate : -1.0), host_nb + link_nb));
+ // Allocate more space for constraints (+4) in case users want to mix ptasks and io streams
+ set_variable(model->get_maxmin_system()->variable_new(this, 1.0, (rate > 0 ? rate : -1.0), host_nb + link_nb + 4));
if (latency_ > 0)
model->get_maxmin_system()->update_variable_penalty(get_variable(), 0.0);
Action* execute_thread(const s4u::Host* host, double flops_amount, int thread_count) override { return nullptr; }
CpuAction* execute_parallel(const std::vector<s4u::Host*>& host_list, const double* flops_amount,
const double* bytes_amount, double rate) override;
- Action* io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk,
- double size) override
- {
- return nullptr;
- }
};
class CpuL07Model : public CpuModel {
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
+#include "simgrid/s4u/Host.hpp"
#include "simgrid/kernel/routing/ClusterZone.hpp"
#include "simgrid/kernel/routing/NetPoint.hpp"
#include "src/kernel/resource/StandardLinkImpl.hpp"
kernel::routing::NetPoint* netpoint = nullptr;
kernel::routing::NetPoint* gw = nullptr;
auto dims = index_to_dims(position);
- std::tie(netpoint, gw) = set_callbacks.netpoint(get_iface(), dims, position);
+ if (set_callbacks.is_by_netpoint()) { // XBT_ATTRIB_DEPRECATED_v339
+ std::tie(netpoint, gw) = set_callbacks.netpoint(get_iface(), dims, position); // XBT_ATTRIB_DEPRECATED_v339
+ } else if (set_callbacks.is_by_netzone()) {
+ s4u::NetZone* netzone = set_callbacks.netzone(get_iface(), dims, position);
+ netpoint = netzone->get_netpoint();
+ gw = netzone->get_gateway();
+ } else {
+ s4u::Host* host = set_callbacks.host(get_iface(), dims, position);
+ netpoint = host->get_netpoint();
+ }
+
xbt_assert(netpoint, "set_netpoint(elem=%lu): Invalid netpoint (nullptr)", position);
if (netpoint->is_netzone()) {
xbt_assert(gw && not gw->is_netzone(),
#ifndef NETZONE_TEST_HPP
#define NETZONE_TEST_HPP
-#include "simgrid/kernel/routing/NetPoint.hpp"
-#include "simgrid/s4u/Host.hpp"
#include "simgrid/s4u/NetZone.hpp"
+#include "xbt/log.h"
+XBT_LOG_EXTERNAL_CATEGORY(ker_platform);
// Callback function common to several routing unit tests
struct CreateHost {
- std::pair<simgrid::kernel::routing::NetPoint*, simgrid::kernel::routing::NetPoint*>
- operator()(simgrid::s4u::NetZone* zone, const std::vector<unsigned long>& /*coord*/, unsigned long id) const
+ simgrid::s4u::Host* operator()(simgrid::s4u::NetZone* zone, const std::vector<unsigned long>& /*coord*/,
+ unsigned long id) const
{
- const simgrid::s4u::Host* host = zone->create_host(std::to_string(id), 1e9)->seal();
- return std::make_pair(host->get_netpoint(), nullptr);
+ return zone->create_host(std::to_string(id), "1Gf");
}
};
/*************************************************************************************************/
/** @brief Auxiliary function to create hosts */
-static std::pair<simgrid::kernel::routing::NetPoint*, simgrid::kernel::routing::NetPoint*>
+static simgrid::s4u::Host*
sg_platf_cluster_create_host(const simgrid::kernel::routing::ClusterCreationArgs* cluster, simgrid::s4u::NetZone* zone,
const std::vector<unsigned long>& /*coord*/, unsigned long id)
{
std::string host_id = cluster->prefix + std::to_string(cluster->radicals[id]) + cluster->suffix;
XBT_DEBUG("Cluster: creating host=%s speed=%f", host_id.c_str(), cluster->speeds.front());
- const simgrid::s4u::Host* host = zone->create_host(host_id, cluster->speeds)
- ->set_core_count(cluster->core_amount)
- ->set_properties(cluster->properties)
- ->seal();
- return std::make_pair(host->get_netpoint(), nullptr);
+ simgrid::s4u::Host* host = zone->create_host(host_id, cluster->speeds)
+ ->set_core_count(cluster->core_amount)
+ ->set_properties(cluster->properties);
+ return host;
}
/** @brief Auxiliary function to create loopback links */
using simgrid::kernel::routing::FatTreeZone;
using simgrid::kernel::routing::TorusZone;
- auto set_host = std::bind(sg_platf_cluster_create_host, cluster, _1, _2, _3);
+ std::function<simgrid::s4u::ClusterCallbacks::ClusterHostCb> set_host =
+ std::bind(sg_platf_cluster_create_host, cluster, _1, _2, _3);
std::function<simgrid::s4u::ClusterCallbacks::ClusterLinkCb> set_loopback{};
std::function<simgrid::s4u::ClusterCallbacks::ClusterLinkCb> set_limiter{};
if (o->type_ < type_)
return o->depends(this);
+ // Actions executed by the same actor are always dependent
+ if (o->aid_ == aid_)
+ return true;
+
if (const auto* other = dynamic_cast<const BarrierTransition*>(o)) {
if (bar_ != other->bar_)
return false;
*
* See @ref s4u_raii.
*/
-MutexPtr Mutex::create()
+MutexPtr Mutex::create(bool recursive)
{
- auto* mutex = new kernel::activity::MutexImpl();
+ auto* mutex = new kernel::activity::MutexImpl(recursive);
return MutexPtr(&mutex->mutex(), false);
}
pimpl_->add_route(src ? src->get_netpoint() : nullptr, dst ? dst->get_netpoint(): nullptr,
src ? src->get_gateway() : nullptr, dst ? dst->get_gateway() : nullptr,
links_direct, false);
- pimpl_->add_route(src ? src->get_netpoint() : nullptr, dst ? dst->get_netpoint(): nullptr,
- src ? src->get_gateway() : nullptr, dst ? dst->get_gateway() : nullptr,
+ pimpl_->add_route(dst ? dst->get_netpoint(): nullptr, src ? src->get_netpoint() : nullptr,
+ dst ? dst->get_gateway() : nullptr, src ? src->get_gateway() : nullptr,
links_reverse, false);
}
+#include <cstddef>
#include <memory>
#include <simgrid/Exception.hpp>
+#include <simgrid/s4u/Activity.hpp>
#include <simgrid/s4u/Comm.hpp>
+#include <simgrid/s4u/Disk.hpp>
#include <simgrid/s4u/Exec.hpp>
#include <simgrid/s4u/Io.hpp>
#include <simgrid/s4u/Task.hpp>
#include <simgrid/simix.hpp>
+#include <string>
+#include <xbt/asserts.h>
#include "src/simgrid/module.hpp"
SIMGRID_REGISTER_PLUGIN(task, "Battery management", nullptr)
-/**
- @beginrst
-
-
-Tasks are designed to represent dataflows, i.e, graphs of Tasks.
-Tasks can only be instancied using either
-:cpp:func:`simgrid::s4u::ExecTask::init` or :cpp:func:`simgrid::s4u::CommTask::init`
-An ExecTask is an Execution Task. Its underlying Activity is an :ref:`Exec <API_s4u_Exec>`.
-A CommTask is a Communication Task. Its underlying Activity is a :ref:`Comm <API_s4u_Comm>`.
-
-
-
- @endrst
- */
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plugin");
namespace simgrid::s4u {
Task::Task(const std::string& name) : name_(name) {}
-/**
- * @brief Return True if the Task can start a new Activity.
- * @note The Task is ready if not already doing something and there is at least one execution waiting in queue.
+/** @param instance The Task instance to check.
+ * @brief Return True if this Task instance can start.
*/
-bool Task::ready_to_run() const
+bool Task::ready_to_run(std::string instance)
{
- return running_instances_ < parallelism_degree_ && queued_firings_ > 0;
+ return running_instances_[instance] < parallelism_degree_[instance] && queued_firings_[instance] > 0;
}
-/**
- * @param source The sender.
+/** @param source The sender.
* @brief Receive a token from another Task.
* @note Check upon reception if the Task has received a token from each of its predecessors,
* and in this case consumes those tokens and enqueue an execution.
void Task::receive(Task* source)
{
XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
- auto source_count = predecessors_[source];
predecessors_[source]++;
- if (tokens_received_.size() <= queued_firings_ + source_count)
- tokens_received_.emplace_back();
- tokens_received_[queued_firings_ + source_count][source] = source->token_;
- bool enough_tokens = true;
+ if (source->token_ != nullptr)
+ tokens_received_[source].push_back(source->token_);
+ bool enough_tokens = true;
for (auto const& [key, val] : predecessors_)
if (val < 1) {
enough_tokens = false;
}
}
-/**
- * @brief Task routine when finishing an execution.
- * @note Set its working status as false.
- * Add 1 to its count of finished executions.
- * Call the on_this_end func.
- * Fire on_end callback.
- * Send a token to each of its successors.
- * Start a new execution if possible.
+/** @param instance The Task instance to complete.
+ * @brief Task instance routine when finishing an execution of an instance.
+ * @note The dispatcher instance enqueues a firing for the next instance.
+ * The collector instance triggers the on_completion signals and sends tokens to successors.
+ * Others instances enqueue a firing of the collector instance.
*/
-void Task::complete()
+void Task::complete(std::string instance)
{
xbt_assert(Actor::is_maestro());
- running_instances_--;
- count_++;
- on_this_completion(this);
- on_completion(this);
- for (auto const& t : successors_)
- t->receive(this);
- if (ready_to_run())
- fire();
+ running_instances_[instance]--;
+ count_[instance]++;
+ if (instance == "collector") {
+ on_this_completion(this);
+ on_completion(this);
+ for (auto const& t : successors_)
+ t->receive(this);
+ } else if (instance == "dispatcher") {
+ auto next_instance = load_balancing_function_();
+ xbt_assert(next_instance != "dispatcher" and next_instance != "collector", "Invalid instance selected: %s",
+ next_instance.c_str());
+ queued_firings_[next_instance] = queued_firings_.at(next_instance) + 1;
+ while (ready_to_run(next_instance))
+ fire(next_instance);
+ } else {
+ queued_firings_["collector"]++;
+ while (ready_to_run("collector"))
+ fire("collector");
+ }
+ if (ready_to_run(instance))
+ fire(instance);
}
-/** @param n The new parallelism degree of the Task.
- * @brief Set the parallelism degree of the Task to inscrease or decrease horizontal scaling.
- * @note When increasing the degree the function starts new instances if there is queued firings.
- * When decreasing the degree the function does NOT stop running instances.
-
+/** @param n The new parallelism degree of the Task instance.
+ * @param instance The Task instance to modify.
+ * @note You can use instance "all" to modify the parallelism degree of all instances of this Task.
+ * When increasing the degree, new executions are started if there are queued firings.
+ * When decreasing the degree instances already running are NOT stopped.
*/
-void Task::set_parallelism_degree(int n)
+void Task::set_parallelism_degree(int n, std::string instance)
{
- xbt_assert(n > 0, "Parallelism degree of Tasks must be above 0.");
- simgrid::kernel::actor::simcall_answered([this, n] {
- parallelism_degree_ = n;
- while (ready_to_run())
- fire();
+ xbt_assert(n > 0, "Parallelism degree must be above 0.");
+ simgrid::kernel::actor::simcall_answered([this, n, &instance] {
+ if (instance == "all") {
+ for (auto& [key, value] : parallelism_degree_) {
+ parallelism_degree_[key] = n;
+ while (ready_to_run(key))
+ fire(key);
+ }
+ } else {
+ parallelism_degree_[instance] = n;
+ while (ready_to_run(instance))
+ fire(instance);
+ }
});
}
+/** @param bytes The internal bytes of the Task instance.
+ * @param instance The Task instance to modify.
+ * @note Internal bytes are used for Comms between the dispatcher and instance_n,
+ * and between instance_n and the collector if they are not on the same host.
+ */
+void Task::set_internal_bytes(int bytes, std::string instance)
+{
+ simgrid::kernel::actor::simcall_answered([this, bytes, &instance] { internal_bytes_to_send_[instance] = bytes; });
+}
+
+/** @param func The load balancing function.
+ * @note The dispatcher uses this function to determine which instance to trigger next.
+ */
+void Task::set_load_balancing_function(std::function<std::string()> func)
+{
+ simgrid::kernel::actor::simcall_answered([this, func] { load_balancing_function_ = func; });
+}
+
/** @param n The number of firings to enqueue.
- * @brief Enqueue firing.
- * @note Immediatly fire an activity if possible.
*/
void Task::enqueue_firings(int n)
{
simgrid::kernel::actor::simcall_answered([this, n] {
- queued_firings_ += n;
- while (ready_to_run())
- fire();
+ queued_firings_["dispatcher"] += n;
+ while (ready_to_run("dispatcher"))
+ fire("dispatcher");
});
}
}
/** @param amount The amount to set.
- * @brief Set the amout of work to do.
+ * @param instance The Task instance to modify.
* @note Amount in flop for ExecTask and in bytes for CommTask.
*/
-void Task::set_amount(double amount)
+void Task::set_amount(double amount, std::string instance)
{
- simgrid::kernel::actor::simcall_answered([this, amount] { amount_ = amount; });
+ simgrid::kernel::actor::simcall_answered([this, amount, &instance] { amount_[instance] = amount; });
}
/** @param token The token to set.
* @brief Set the token to send to successors.
- * @note The token is passed to each successor after the task end, i.e., after the on_end callback.
+ * @note The token is passed to each successor after the Task instance collector end, i.e., after the on_completion
+ * callback.
*/
void Task::set_token(std::shared_ptr<Token> token)
{
simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; });
}
-/** @return Map of tokens received for the next execution.
- * @note If there is no queued execution for this task the map might not exist or be partially empty.
+/** @param t The Task to deque a token from.
*/
-std::shared_ptr<Token> Task::get_next_token_from(TaskPtr t)
+void Task::deque_token_from(TaskPtr t)
{
- return tokens_received_.front()[t];
+ simgrid::kernel::actor::simcall_answered([this, &t] { tokens_received_[t].pop_front(); });
}
-void Task::fire()
+void Task::fire(std::string instance)
{
- if ((int)current_activities_.size() > parallelism_degree_) {
- current_activities_.pop_front();
+ if ((int)current_activities_[instance].size() > parallelism_degree_[instance]) {
+ current_activities_[instance].pop_front();
+ }
+ if (instance != "dispatcher" and instance != "collector") {
+ on_this_start(this);
+ on_start(this);
}
- on_this_start(this);
- on_start(this);
- running_instances_++;
- queued_firings_ = std::max(queued_firings_ - 1, 0);
- if (not tokens_received_.empty())
- tokens_received_.pop_front();
+ running_instances_[instance]++;
+ queued_firings_[instance] = std::max(queued_firings_[instance] - 1, 0);
}
-/** @param successor The Task to add.
- * @brief Add a successor to this Task.
+/** @param successor The Task to add as a successor.
* @note It also adds this as a predecessor of successor.
*/
void Task::add_successor(TaskPtr successor)
});
}
-/** @param successor The Task to remove.
- * @brief Remove a successor from this Task.
+/** @param successor The Task to remove from the successors of this Task.
* @note It also remove this from the predecessors of successor.
*/
void Task::remove_successor(TaskPtr successor)
});
}
+/** @brief Remove all successors from this Task.
+ */
void Task::remove_all_successors()
{
simgrid::kernel::actor::simcall_answered([this] {
});
}
+/** @param n The number of instances to add to this Task (>=0).
+ * @note Instances are always numbered contiguously from instance_0 to instance_(x-1),
+ * where x is the current number of instances; the new ones are appended after the last one.
+ * Each new instance copies the amount, parallelism degree and internal bytes of instance_0,
+ * and starts with no queued firing, no running execution and a completion count of 0.
+ */
+void Task::add_instances(int n)
+{
+ xbt_assert(n >= 0, "Cannot add a negative number of instances (provided: %d)", n);
+ // amount_ also holds the "dispatcher" and "collector" entries, hence the -2
+ int instance_count = (int)amount_.size() - 2;
+ for (int i = instance_count; i < n + instance_count; i++) {
+ const std::string key = "instance_" + std::to_string(i);
+ amount_[key] = amount_.at("instance_0");
+ queued_firings_[key] = 0;
+ running_instances_[key] = 0;
+ count_[key] = 0;
+ parallelism_degree_[key] = parallelism_degree_.at("instance_0");
+ current_activities_[key] = {};
+ internal_bytes_to_send_[key] = internal_bytes_to_send_.at("instance_0");
+ }
+}
+
+/** @param n The number of instances to remove from this Task (>=0).
+ * @note Instances are always numbered contiguously from instance_0 to instance_(x-1),
+ * where x is the current number of instances; the last n instances are removed.
+ * At least one instance must remain, and running instances cannot be removed.
+ */
+void Task::remove_instances(int n)
+{
+ // amount_ also holds the "dispatcher" and "collector" entries, hence the -2
+ int instance_count = (int)amount_.size() - 2;
+ xbt_assert(n >= 0, "Cannot remove a negative number of instances (provided: %d)", n);
+ xbt_assert(instance_count - n > 0, "The number of instances must be above 0 (instances: %d, provided: %d)",
+ instance_count, n);
+ for (int i = instance_count - 1; i >= instance_count - n; i--) {
+ const std::string key = "instance_" + std::to_string(i);
+ xbt_assert(running_instances_.at(key) == 0, "Cannot remove a running instance (instances: %d)", i);
+ amount_.erase(key);
+ queued_firings_.erase(key);
+ running_instances_.erase(key);
+ count_.erase(key);
+ parallelism_degree_.erase(key);
+ current_activities_.erase(key);
+ // add_instances() creates this entry as well: drop it so no stale per-instance state survives the removal
+ internal_bytes_to_send_.erase(key);
+ }
+}
+
/**
* @brief Default constructor.
*/
-ExecTask::ExecTask(const std::string& name) : Task(name) {}
+ExecTask::ExecTask(const std::string& name) : Task(name)
+{
+ set_load_balancing_function([]() { return "instance_0"; });
+}
-/** @ingroup plugin_task
+/**
* @brief Smart Constructor.
*/
ExecTaskPtr ExecTask::init(const std::string& name)
return ExecTaskPtr(new ExecTask(name));
}
-/** @ingroup plugin_task
+/**
* @brief Smart Constructor.
*/
ExecTaskPtr ExecTask::init(const std::string& name, double flops, Host* host)
return init(name)->set_flops(flops)->set_host(host);
}
-/**
- * @brief Do one execution of the Task.
- * @note Call the on_this_start() func.
- * Init and start the underlying Activity.
+/** @param instance The Task instance to fire.
+ * @note Only the dispatcher instance triggers the on_start signal.
+ * Comms are created if hosts differ between dispatcher and the instance to fire,
+ * or between the instance and the collector.
*/
-void ExecTask::fire()
+void ExecTask::fire(std::string instance)
{
- Task::fire();
- auto exec = Exec::init()->set_name(get_name())->set_flops_amount(get_amount())->set_host(host_);
- exec->start();
- exec->on_this_completion_cb([this](Exec const&) { complete(); });
- store_activity(exec);
+ Task::fire(instance);
+ if (instance == "dispatcher" or instance == "collector") {
+ // Dispatcher and collector run a single Exec on their own host and complete when it ends
+ auto exec = Exec::init()
+ ->set_name(get_name() + "_" + instance)
+ ->set_flops_amount(get_amount(instance))
+ ->set_host(host_[instance]);
+ exec->start();
+ exec->on_this_completion_cb([this, instance](Exec const&) { complete(instance); });
+ store_activity(exec, instance);
+ } else {
+ auto exec = Exec::init()->set_name(get_name())->set_flops_amount(get_amount())->set_host(host_[instance]);
+ if (host_["dispatcher"] == host_[instance]) {
+ exec->start();
+ store_activity(exec, instance);
+ } else {
+ // Different host: send the dispatcher's internal bytes first; exec is registered as a successor of this
+ // Comm instead of being started directly
+ auto comm = Comm::sendto_init(host_["dispatcher"], host_[instance])
+ ->set_name(get_name() + "_dispatcher_to_" + instance)
+ ->set_payload_size(get_internal_bytes("dispatcher"));
+ comm->add_successor(exec);
+ comm->start();
+ store_activity(comm, instance);
+ }
+ if (host_[instance] == host_["collector"]) {
+ exec->on_this_completion_cb([this, instance](Exec const&) { complete(instance); });
+ if (host_["dispatcher"] != host_[instance])
+ store_activity(exec, instance);
+ } else {
+ // Different host: the instance completes when its internal bytes reach the collector host.
+ // Separate the task name from the instance name, consistently with the "_dispatcher_to_" Comm above.
+ auto comm = Comm::sendto_init(host_[instance], host_["collector"])
+ ->set_name(get_name() + "_" + instance + "_to_collector")
+ ->set_payload_size(get_internal_bytes(instance));
+ exec->add_successor(comm);
+ comm->on_this_completion_cb([this, instance](Comm const&) { complete(instance); });
+ // NOTE(review): `.` not `->`: this presumably calls the smart pointer's detach() (drop the handle without
+ // releasing its reference), not Activity::detach() — confirm this is intended
+ comm.detach();
+ }
+ }
}
-/** @ingroup plugin_task
- * @param host The host to set.
+/** @param host The host to set.
+ * @param instance The Task instance to modify.
* @brief Set a new host.
*/
-ExecTaskPtr ExecTask::set_host(Host* host)
+ExecTaskPtr ExecTask::set_host(Host* host, std::string instance)
{
- kernel::actor::simcall_answered([this, host] { host_ = host; });
+ kernel::actor::simcall_answered([this, host, &instance] {
+ if (instance == "all")
+ for (auto& [key, value] : host_)
+ host_[key] = host;
+ else
+ host_[instance] = host;
+ });
return this;
}
-/** @ingroup plugin_task
- * @param flops The amount of flops to set.
+/** @param flops The amount of flops to set.
+ * @param instance The Task instance to modify.
*/
-ExecTaskPtr ExecTask::set_flops(double flops)
+ExecTaskPtr ExecTask::set_flops(double flops, std::string instance)
{
- kernel::actor::simcall_answered([this, flops] { set_amount(flops); });
+ kernel::actor::simcall_answered([this, flops, &instance] { set_amount(flops, instance); });
return this;
}
+/** @param n The number of instances to add to this Task (>=0).
+ @note Instances are always numbered contiguously from instance_0 to instance_(x-1),
+ where x is the current number of instances.
+ */
+void ExecTask::add_instances(int n)
+{
+ Task::add_instances(n);
+ int instance_count = (int)host_.size() - 2;
+ for (int i = instance_count; i < n + instance_count; i++)
+ host_["instance_" + std::to_string(i)] = host_.at("instance_0");
+}
+
+/** @param n The number of instances to remove from this Task (>=0).
+ @note Instances are always numbered contiguously from instance_0 to instance_(x-1),
+ where x is the current number of instances.
+ Running instances cannot be removed.
+ */
+void ExecTask::remove_instances(int n)
+{
+ Task::remove_instances(n);
+ int instance_count = (int)host_.size() - 2;
+ for (int i = instance_count - 1; i >= instance_count - n; i--)
+ host_.erase("instance_" + std::to_string(i));
+}
+
/**
* @brief Default constructor.
*/
-CommTask::CommTask(const std::string& name) : Task(name) {}
+CommTask::CommTask(const std::string& name) : Task(name)
+{
+ set_load_balancing_function([]() { return "instance_0"; });
+}
-/** @ingroup plugin_task
+/**
* @brief Smart constructor.
*/
CommTaskPtr CommTask::init(const std::string& name)
return CommTaskPtr(new CommTask(name));
}
-/** @ingroup plugin_task
+/**
* @brief Smart constructor.
*/
CommTaskPtr CommTask::init(const std::string& name, double bytes, Host* source, Host* destination)
return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination);
}
-/**
- * @brief Do one execution of the Task.
- * @note Call the on_this_start() func.
- * Init and start the underlying Activity.
+/** @param instance The Task instance to fire.
+ * @note Only the dispatcher instance triggers the on_start signal.
*/
-void CommTask::fire()
+void CommTask::fire(std::string instance)
{
- Task::fire();
- auto comm = Comm::sendto_init(source_, destination_)->set_name(get_name())->set_payload_size(get_amount());
- comm->start();
- comm->on_this_completion_cb([this](Comm const&) { complete(); });
- store_activity(comm);
+ Task::fire(instance);
+ if (instance == "dispatcher" or instance == "collector") {
+ auto exec = Exec::init()
+ ->set_name(get_name() + "_" + instance)
+ ->set_flops_amount(get_amount(instance))
+ ->set_host(instance == "dispatcher" ? source_ : destination_);
+ exec->start();
+ exec->on_this_completion_cb([this, instance](Exec const&) { complete(instance); });
+ store_activity(exec, instance);
+ } else {
+ auto comm = Comm::sendto_init(source_, destination_)->set_name(get_name())->set_payload_size(get_amount());
+ comm->start();
+ comm->on_this_completion_cb([this, instance](Comm const&) { complete(instance); });
+ store_activity(comm, instance);
+ }
}
-/** @ingroup plugin_task
+/**
* @param source The host to set.
* @brief Set a new source host.
*/
return this;
}
-/** @ingroup plugin_task
+/**
* @param destination The host to set.
* @brief Set a new destination host.
*/
return this;
}
-/** @ingroup plugin_task
+/**
* @param bytes The amount of bytes to set.
*/
CommTaskPtr CommTask::set_bytes(double bytes)
/**
* @brief Default constructor.
*/
-IoTask::IoTask(const std::string& name) : Task(name) {}
+IoTask::IoTask(const std::string& name) : Task(name)
+{
+ set_load_balancing_function([]() { return "instance_0"; });
+}
-/** @ingroup plugin_task
+/**
* @brief Smart Constructor.
*/
IoTaskPtr IoTask::init(const std::string& name)
return IoTaskPtr(new IoTask(name));
}
-/** @ingroup plugin_task
+/**
* @brief Smart Constructor.
*/
IoTaskPtr IoTask::init(const std::string& name, double bytes, Disk* disk, Io::OpType type)
return init(name)->set_bytes(bytes)->set_disk(disk)->set_op_type(type);
}
-/** @ingroup plugin_task
+/**
* @param disk The disk to set.
- * @brief Set a new disk.
*/
IoTaskPtr IoTask::set_disk(Disk* disk)
{
return this;
}
-/** @ingroup plugin_task
+/**
* @param bytes The amount of bytes to set.
*/
IoTaskPtr IoTask::set_bytes(double bytes)
return this;
}
-/** @ingroup plugin_task */
+/**
+ * @param type The op type to set.
+ */
IoTaskPtr IoTask::set_op_type(Io::OpType type)
{
kernel::actor::simcall_answered([this, type] { type_ = type; });
return this;
}
-void IoTask::fire()
+/** @param instance The Task instance to fire.
+ * @note Only the dispatcher instance triggers the on_start signal.
+ */
+void IoTask::fire(std::string instance)
{
- Task::fire();
- auto io = Io::init()->set_name(get_name())->set_size(get_amount())->set_disk(disk_)->set_op_type(type_);
- io->start();
- io->on_this_completion_cb([this](Io const&) { complete(); });
- store_activity(io);
+ Task::fire(instance);
+ if (instance == "dispatcher" or instance == "collector") {
+ auto exec = Exec::init()
+ ->set_name(get_name() + "_" + instance)
+ ->set_flops_amount(get_amount(instance))
+ ->set_host(disk_->get_host());
+ exec->start();
+ exec->on_this_completion_cb([this, instance](Exec const&) { complete(instance); });
+ store_activity(exec, instance);
+ } else {
+ auto io = Io::init()->set_name(get_name())->set_size(get_amount())->set_disk(disk_)->set_op_type(type_);
+ io->start();
+ io->on_this_completion_cb([this, instance](Io const&) { complete(instance); });
+ store_activity(io, instance);
+ }
}
-
} // namespace simgrid::s4u
/** Config Globals */
-XBT_PUBLIC_DATA int _sg_cfg_init_status;
+XBT_PUBLIC_DATA int _sg_cfg_init_status; /* 0: not inited; 1: config module inited; 2: root zone of platform created */
XBT_PUBLIC void sg_config_init(int* argc, char** argv);
XBT_PUBLIC void sg_config_finalize();
CHECK_TYPE(8, recvtype)
CHECK_BUFFER(1, sendbuf, sendcount, sendtype)
CHECK_BUFFER(6, recvbuf, recvcount, recvtype)
+ CHECK_ARGS(sendbuf == recvbuf && sendcount > 0 && recvcount > 0, MPI_ERR_BUFFER,
+ "%s: Invalid parameters 1 and 6: sendbuf and recvbuf must be disjoint", __func__);
CHECK_TAG(10, recvtag)
CHECK_COMM(11)
const SmpiBenchGuard suspend_bench;
smpi_trace_call_location_t* call_location();
void set_privatized_region(smpi_privatization_region_t region);
smpi_privatization_region_t privatized_region() const;
- s4u::Mailbox* mailbox() const { return mailbox_; }
- s4u::Mailbox* mailbox_small() const { return mailbox_small_; }
+ s4u::Mailbox* mailbox();
+ s4u::Mailbox* mailbox_small();
s4u::MutexPtr mailboxes_mutex() const;
#if HAVE_PAPI
int papi_event_set() const;
if (not simgrid::smpi::ActorExt::EXTENSION_ID.valid())
simgrid::smpi::ActorExt::EXTENSION_ID = simgrid::s4u::Actor::extension_create<simgrid::smpi::ActorExt>();
- mailbox_ = s4u::Mailbox::by_name("SMPI-" + std::to_string(actor_->get_pid()));
- mailbox_small_ = s4u::Mailbox::by_name("small-" + std::to_string(actor_->get_pid()));
+ mailbox_ = nullptr;
+ mailbox_small_ = nullptr;
mailboxes_mutex_ = s4u::Mutex::create();
timer_ = xbt_os_timer_new();
state_ = SmpiProcessState::UNINITIALIZED;
return (state_ == SmpiProcessState::INITIALIZED);
}
+/** @brief Return the main mailbox of the process ("SMPI-<pid>"), looked up on first use and cached afterwards */
+s4u::Mailbox* ActorExt::mailbox()
+{
+ if (mailbox_ != nullptr)
+ return mailbox_;
+ mailbox_ = s4u::Mailbox::by_name("SMPI-" + std::to_string(actor_->get_pid()));
+ return mailbox_;
+}
+
+/** @brief Return the mailbox used for small messages ("small-<pid>"), looked up on first use and cached afterwards */
+s4u::Mailbox* ActorExt::mailbox_small()
+{
+ if (mailbox_small_ != nullptr)
+ return mailbox_small_;
+ mailbox_small_ = s4u::Mailbox::by_name("small-" + std::to_string(actor_->get_pid()));
+ return mailbox_small_;
+}
+
/** @brief Mark a process as initialized (=MPI_Init called) */
void ActorExt::mark_as_initialized()
{
ext->comm_world_ = smpi_deployment_comm_world(ext->instance_id_);
// set the process attached to the mailbox
- ext->mailbox_small_->set_receiver(ext->actor_);
+ ext->mailbox_small()->set_receiver(ext->actor_);
XBT_DEBUG("<%ld> SMPI process has been initialized: %p", ext->actor_->get_pid(), ext->actor_);
}
simgrid::kernel::actor::ActorImpl* owner = nullptr;
const char* file = nullptr;
int line = -1;
+ int recursive_depth = 0;
explicit ObjectOwner(simgrid::kernel::actor::ActorImpl* o) : owner(o) {}
};
return o;
}
+/** Mark the beginning of an access by the current actor to the object at objaddr (named objname in messages).
+ * A re-entrant access by the actor that already owns the object only increments its recursive_depth. When another
+ * actor owns it, a message describing the unprotected concurrent access is built. The message names both actors
+ * and, unless --log=no_loc is in effect, gives the owner's recorded location.
+ * @param file,line Source location of this access, stored in the ownership record when the current actor becomes
+ *        the owner.
+ * @param func Caller's function name. NOTE(review): the lambda below does not capture it, so it is currently unused.
+ */
-int sthread_access_begin(void* objaddr, const char* objname, const char* file, int line)
+int sthread_access_begin(void* objaddr, const char* objname, const char* file, int line, const char* func)
{
sthread_disable();
auto* self = simgrid::kernel::actor::ActorImpl::self();
[self, objaddr, objname, file, line]() -> bool {
XBT_INFO("%s takes %s", self->get_cname(), objname);
auto* ownership = get_owner(objaddr);
- if (ownership->owner != nullptr) {
+ // Re-entrant access by the current owner: only track the nesting depth
+ if (ownership->owner == self) {
+ ownership->recursive_depth++;
+ return true;
+ } else if (ownership->owner != nullptr) {
auto msg = std::string("Unprotected concurent access to ") + objname + ": " + ownership->owner->get_name();
- if (not xbt_log_no_loc)
+ if (not xbt_log_no_loc) {
msg += simgrid::xbt::string_printf(" at %s:%d", ownership->file, ownership->line);
+ if (ownership->recursive_depth > 1) {
+ // Close the parenthesis only after the optional plural "s", producing "(and 1 other location)" or
+ // "(and 2 other locations)" instead of "(and 1 other locations)" / "(and 2 other locations)s".
+ msg += simgrid::xbt::string_printf(" (and %d other location", ownership->recursive_depth - 1);
+ if (ownership->recursive_depth != 2)
+ msg += "s";
+ msg += ")";
+ }
+ } else {
+ msg += simgrid::xbt::string_printf(" from %d location", ownership->recursive_depth);
+ if (ownership->recursive_depth != 1)
+ msg += "s";
+ }
msg += " vs " + self->get_name();
if (xbt_log_no_loc)
msg += std::string(" (locations hidden because of --log=no_loc).");
ownership->owner = self;
ownership->file = file;
ownership->line = line;
+ ownership->recursive_depth = 1;
return true;
},
&observer);
sthread_enable();
return true;
}
-void sthread_access_end(void* objaddr, const char* objname, const char* file, int line)
+void sthread_access_end(void* objaddr, const char* objname, const char* file, int line, const char* func)
{
sthread_disable();
auto* self = simgrid::kernel::actor::ActorImpl::self();
[self, objaddr, objname]() -> void {
XBT_INFO("%s releases %s", self->get_cname(), objname);
auto* ownership = get_owner(objaddr);
- xbt_assert(ownership->owner == self, "safety check failed: %s is not owner of the object it's releasing.",
- self->get_cname());
- ownership->owner = nullptr;
+ xbt_assert(ownership->owner == self,
+ "safety check failed: %s is not owner of the object it's releasing. That object owned by %s.",
+ self->get_cname(), (ownership->owner == nullptr ? "nobody" : ownership->owner->get_cname()));
+ ownership->recursive_depth--;
+ if (ownership->recursive_depth == 0)
+ ownership->owner = nullptr;
},
&observer);
sthread_enable();
static int (*raw_mutex_unlock)(pthread_mutex_t*);
static int (*raw_mutex_destroy)(pthread_mutex_t*);
+static int (*raw_pthread_mutexattr_init)(pthread_mutexattr_t*);
+static int (*raw_pthread_mutexattr_settype)(pthread_mutexattr_t*, int);
+static int (*raw_pthread_mutexattr_gettype)(const pthread_mutexattr_t* restrict, int* restrict);
+static int (*raw_pthread_mutexattr_getrobust)(const pthread_mutexattr_t*, int*);
+static int (*raw_pthread_mutexattr_setrobust)(pthread_mutexattr_t*, int);
+
static unsigned int (*raw_sleep)(unsigned int);
static int (*raw_usleep)(useconds_t);
static int (*raw_gettimeofday)(struct timeval*, void*);
raw_mutex_unlock = dlsym(RTLD_NEXT, "pthread_mutex_unlock");
raw_mutex_destroy = dlsym(RTLD_NEXT, "pthread_mutex_destroy");
+ raw_pthread_mutexattr_init = dlsym(RTLD_NEXT, "pthread_mutexattr_init");
+ raw_pthread_mutexattr_settype = dlsym(RTLD_NEXT, "pthread_mutexattr_settype");
+ raw_pthread_mutexattr_gettype = dlsym(RTLD_NEXT, "pthread_mutexattr_gettype");
+ raw_pthread_mutexattr_getrobust = dlsym(RTLD_NEXT, "pthread_mutexattr_getrobust");
+ raw_pthread_mutexattr_setrobust = dlsym(RTLD_NEXT, "pthread_mutexattr_setrobust");
+
raw_sleep = dlsym(RTLD_NEXT, "sleep");
raw_usleep = dlsym(RTLD_NEXT, "usleep");
raw_gettimeofday = dlsym(RTLD_NEXT, "gettimeofday");
sthread_enable();
return res;
}
+
+/* intercepted_call(name, raw_params, call_params, sim_params) defines the function pthread_<name>raw_params.
+ * The generated function:
+ *  - calls intercepter_init() if raw_pthread_<name> is still NULL,
+ *  - forwards to raw_pthread_<name>call_params while sthread_inside_simgrid is set,
+ *  - otherwise calls sthread_<name>sim_params between sthread_disable() and sthread_enable(), returning its result.
+ * NOTE(review): identifiers starting with an underscore followed by an uppercase letter (_STHREAD_CONCAT) are
+ * reserved in C; consider renaming if it is not used elsewhere. */
+#define _STHREAD_CONCAT(a, b) a##b
+#define intercepted_call(name, raw_params, call_params, sim_params) \
+ int _STHREAD_CONCAT(pthread_, name) raw_params \
+ { \
+ if (_STHREAD_CONCAT(raw_pthread_, name) == NULL) \
+ intercepter_init(); \
+ if (sthread_inside_simgrid) \
+ return _STHREAD_CONCAT(raw_pthread_, name) call_params; \
+ \
+ sthread_disable(); \
+ int res = _STHREAD_CONCAT(sthread_, name) sim_params; \
+ sthread_enable(); \
+ return res; \
+ }
+
+/* Interposed pthread_mutexattr_* functions. The application's pthread_mutexattr_t storage is reinterpreted as an
+ * sthread_mutexattr_t. The getters keep the const qualifier of their argument instead of casting it away. */
+intercepted_call(mutexattr_init, (pthread_mutexattr_t * attr), (attr), ((sthread_mutexattr_t*)attr));
+intercepted_call(mutexattr_settype, (pthread_mutexattr_t * attr, int type), (attr, type),
+ ((sthread_mutexattr_t*)attr, type));
+intercepted_call(mutexattr_gettype, (const pthread_mutexattr_t* restrict attr, int* restrict type), (attr, type),
+ ((const sthread_mutexattr_t*)attr, type));
+intercepted_call(mutexattr_setrobust, (pthread_mutexattr_t* restrict attr, int robustness), (attr, robustness),
+ ((sthread_mutexattr_t*)attr, robustness));
+intercepted_call(mutexattr_getrobust, (const pthread_mutexattr_t* restrict attr, int* restrict robustness),
+ (attr, robustness), ((const sthread_mutexattr_t*)attr, robustness));
+
int pthread_join(pthread_t thread, void** retval)
{
if (raw_pthread_join == NULL)
return raw_mutex_init(mutex, attr);
sthread_disable();
- int res = sthread_mutex_init((sthread_mutex_t*)mutex, attr);
+ int res = sthread_mutex_init((sthread_mutex_t*)mutex, (sthread_mutexattr_t*)attr);
sthread_enable();
return res;
}
int sthread_create(sthread_t* thread, const /*pthread_attr_t*/ void* attr, void* (*start_routine)(void*), void* arg);
int sthread_join(sthread_t thread, void** retval);
+/* Mutex attributes handled by sthread. The interposed pthread_mutexattr_* functions cast the application's
+ * pthread_mutexattr_t* to this type, so these flags live inside the application's pthread_mutexattr_t.
+ * NOTE(review): this relies on sizeof(sthread_mutexattr_t) <= sizeof(pthread_mutexattr_t); consider asserting it. */
+typedef struct {
+ unsigned recursive : 1;  /* set for PTHREAD_MUTEX_RECURSIVE */
+ unsigned errorcheck : 1; /* set for PTHREAD_MUTEX_ERRORCHECK */
+ unsigned robust : 1;     /* robustness flag, as given to pthread_mutexattr_setrobust() */
+} sthread_mutexattr_t;
+
+int sthread_mutexattr_init(sthread_mutexattr_t* attr);
+int sthread_mutexattr_settype(sthread_mutexattr_t* attr, int type);
+int sthread_mutexattr_gettype(const sthread_mutexattr_t* attr, int* type);
+int sthread_mutexattr_getrobust(const sthread_mutexattr_t* attr, int* robustness);
+int sthread_mutexattr_setrobust(sthread_mutexattr_t* attr, int robustness);
+
typedef struct {
void* mutex;
} sthread_mutex_t;
-int sthread_mutex_init(sthread_mutex_t* mutex, const /*pthread_mutexattr_t*/ void* attr);
+int sthread_mutex_init(sthread_mutex_t* mutex, const sthread_mutexattr_t* attr);
int sthread_mutex_lock(sthread_mutex_t* mutex);
int sthread_mutex_trylock(sthread_mutex_t* mutex);
int sthread_mutex_unlock(sthread_mutex_t* mutex);
int sthread_gettimeofday(struct timeval* tv);
void sthread_sleep(double seconds);
-int sthread_access_begin(void* objaddr, const char* objname, const char* file, int line);
-void sthread_access_end(void* objaddr, const char* objname, const char* file, int line);
+int sthread_access_begin(void* objaddr, const char* objname, const char* file, int line, const char* function);
+void sthread_access_end(void* objaddr, const char* objname, const char* file, int line, const char* function);
#if defined(__cplusplus)
}
/* SimGrid's pthread interposer. Actual implementation of the symbols (see the comment in sthread.h) */
#include "smpi/smpi.h"
+#include "xbt/asserts.h"
+#include "xbt/ex.h"
+#include "xbt/log.h"
#include "xbt/string.hpp"
#include <simgrid/actor.h>
#include <simgrid/s4u/Actor.hpp>
{
/* Do not intercept the main when run from SMPI: it will initialize the simulation properly */
for (int i = 0; envp[i] != nullptr; i++)
- if (std::string_view(envp[i]).rfind("SMPI_GLOBAL_SIZE", 0) == 0)
+ if (std::string_view(envp[i]).rfind("SMPI_GLOBAL_SIZE", 0) == 0) {
+ printf("sthread refuses to intercept the SMPI application %s directly, as its interception is done otherwise.\n",
+ argv[0]);
return raw_main(argc, argv, envp);
+ }
- /* If not in SMPI, the old main becomes an actor in a newly created simulation */
- std::ostringstream id;
- id << std::this_thread::get_id();
+ /* Do not intercept valgrind step 1 */
+ if (not strcmp(argv[0], "/usr/bin/valgrind.bin") || not strcmp(argv[0], "/bin/sh")) {
+ printf("sthread refuses to intercept the execution of %s. Running the application unmodified.\n", argv[0]);
+ fflush(stdout);
+ return raw_main(argc, argv, envp);
+ }
- XBT_DEBUG("sthread main() is starting in thread %s", id.str().c_str());
+ /* If not in SMPI, the old main becomes an actor in a newly created simulation */
+ printf("sthread is intercepting the execution of %s\n", argv[0]);
+ fflush(stdout);
sg4::Engine e(&argc, argv);
auto* zone = sg4::create_full_zone("world");
sthread_enable();
sg4::ActorPtr main_actor = sg4::Actor::create("main thread", lilibeth, raw_main, argc, argv, envp);
- XBT_INFO("Starting the simulation.");
sg4::Engine::get_instance()->run();
sthread_disable();
XBT_INFO("All threads exited. Terminating the simulation.");
return 0;
}
-int sthread_mutex_init(sthread_mutex_t* mutex, const void* /*pthread_mutexattr_t* attr*/)
+int sthread_mutexattr_init(sthread_mutexattr_t* attr)
+{
+ /* Clear every flag: a non-recursive, non-errorcheck, non-robust (normal) mutex */
+ memset(attr, 0, sizeof(sthread_mutexattr_t));
+ return 0;
+}
+/** Set the kind of mutex described by attr, as pthread_mutexattr_settype() does.
+ * @return 0 on success, EINVAL if type is not a known mutex kind (as required by POSIX).
+ * @note PTHREAD_MUTEX_ERRORCHECK is not supported yet and raises THROW_UNIMPLEMENTED.
+ */
+int sthread_mutexattr_settype(sthread_mutexattr_t* attr, int type)
{
- auto m = sg4::Mutex::create();
+ switch (type) {
+ case PTHREAD_MUTEX_NORMAL:
+ // A recursive attribute cannot be turned back into a normal one
+ xbt_assert(not attr->recursive, "S4U does not allow to remove the recursiveness of a mutex.");
+ attr->recursive = 0;
+ attr->errorcheck = 0; // reset, as done for PTHREAD_MUTEX_RECURSIVE
+ break;
+ case PTHREAD_MUTEX_RECURSIVE:
+ attr->recursive = 1;
+ attr->errorcheck = 0; // reset
+ break;
+ case PTHREAD_MUTEX_ERRORCHECK:
+ attr->errorcheck = 1;
+ THROW_UNIMPLEMENTED;
+ break;
+ default:
+ return EINVAL; // invalid type value: report it instead of aborting the simulation
+ }
+ return 0;
+}
+int sthread_mutexattr_gettype(const sthread_mutexattr_t* attr, int* type)
+{
+ /* Recursive wins over errorcheck, which wins over the default normal kind */
+ *type = attr->recursive ? PTHREAD_MUTEX_RECURSIVE
+ : (attr->errorcheck ? PTHREAD_MUTEX_ERRORCHECK : PTHREAD_MUTEX_NORMAL);
+ return 0;
+}
+int sthread_mutexattr_getrobust(const sthread_mutexattr_t* attr, int* robustness)
+{
+ /* Report the stored 1-bit robustness flag */
+ const int robust_flag = attr->robust;
+ *robustness = robust_flag;
+ return 0;
+}
+int sthread_mutexattr_setrobust(sthread_mutexattr_t* attr, int robustness)
+{
+ /* The value is recorded first, even when it is then rejected below */
+ attr->robust = robustness;
+ if (robustness == 0)
+ return 0;
+ THROW_UNIMPLEMENTED; // robust mutexes are not implemented
+}
+
+int sthread_mutex_init(sthread_mutex_t* mutex, const sthread_mutexattr_t* attr)
+{
+ auto m = sg4::Mutex::create(attr != nullptr && attr->recursive);
intrusive_ptr_add_ref(m.get());
mutex->mutex = m.get();
if (mutex->mutex == nullptr)
sthread_mutex_init(mutex, nullptr);
+ XBT_DEBUG("%s(%p)", __FUNCTION__, mutex);
static_cast<sg4::Mutex*>(mutex->mutex)->lock();
return 0;
}
if (mutex->mutex == nullptr)
sthread_mutex_init(mutex, nullptr);
- return static_cast<sg4::Mutex*>(mutex->mutex)->try_lock();
+ XBT_DEBUG("%s(%p)", __FUNCTION__, mutex);
+ if (static_cast<sg4::Mutex*>(mutex->mutex)->try_lock())
+ return 0;
+ return EBUSY;
}
int sthread_mutex_unlock(sthread_mutex_t* mutex)
if (mutex->mutex == nullptr)
sthread_mutex_init(mutex, nullptr);
+ XBT_DEBUG("%s(%p)", __FUNCTION__, mutex);
static_cast<sg4::Mutex*>(mutex->mutex)->unlock();
return 0;
}
if (mutex->mutex == nullptr)
sthread_mutex_init(mutex, nullptr);
+ XBT_DEBUG("%s(%p)", __FUNCTION__, mutex);
intrusive_ptr_release(static_cast<sg4::Mutex*>(mutex->mutex));
return 0;
}
#if HAVE_GETTIMEOFDAY
struct timeval tv;
gettimeofday(&tv, NULL);
+
+ return (double)tv.tv_sec + (double)tv.tv_usec / 1e6;
#else /* no gettimeofday => poor resolution */
return (double) (time(NULL));
#endif /* HAVE_GETTIMEOFDAY? */
-
- return (double)tv.tv_sec + (double)tv.tv_usec / 1e6;
}
void xbt_os_sleep(double sec)
struct timeval timeout;
timeout.tv_sec = (long)sec;
- timeout.tv_usec = (long)(sec - floor(sec)) * 1e6);
+ /* Scale the fractional part to microseconds before truncating it: casting first always yields 0 */
+ timeout.tv_usec = (long)((sec - floor(sec)) * 1e6);
select(0, NULL, NULL, NULL, &timeout);
#endif
add_executable(mbi_${basefile} EXCLUDE_FROM_ALL ${CMAKE_BINARY_DIR}/MBI/${cfile})
target_link_libraries(mbi_${basefile} simgrid)
target_compile_options(mbi_${basefile} PRIVATE "-Wno-unused-variable")
+ target_compile_options(mbi_${basefile} PRIVATE "-Wno-unused-but-set-variable")
set_target_properties(mbi_${basefile} PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/MBI)
add_dependencies(tests-mbi mbi_${basefile})
BEGIN_MBI_TESTS
$ mpirun -np 2 ${EXE} 1
- | @{outcome}@
- | @{errormsg}@
+ | @{outcome1}@
+ | @{errormsg1}@
$ mpirun -np 2 ${EXE} 2
- | @{outcome}@
- | @{errormsg}@
+ | @{outcome2}@
+ | @{errormsg2}@
END_MBI_TESTS
////////////////////// End of MBI headers /////////////////// */
replace = patterns.copy()
replace['shortdesc'] = 'Correct call ordering.'
replace['longdesc'] = 'Correct call ordering.'
- replace['outcome'] = 'OK'
- replace['errormsg'] = 'OK'
+ replace['outcome1'] = 'OK'
+ replace['errormsg1'] = 'OK'
+ replace['outcome2'] = 'OK'
+ replace['errormsg2'] = 'OK'
gen.make_file(template, f'InputHazardCallOrdering_{r}_{s}_ok.c', replace)
# Generate the incorrect matching
replace = patterns.copy()
replace['shortdesc'] = 'Missing Send function.'
replace['longdesc'] = 'Missing Send function call for a path depending to input, a deadlock is created.'
- replace['outcome'] = 'ERROR: IHCallMatching'
- replace['errormsg'] = 'P2P mistmatch. Missing @{r}@ at @{filename}@:@{line:MBIERROR}@.'
+ replace['outcome1'] = 'OK'
+ replace['errormsg1'] = 'OK'
+ replace['outcome2'] = 'ERROR: IHCallMatching'
+ replace['errormsg2'] = 'P2P mistmatch. Missing @{r}@ at @{filename}@:@{line:MBIERROR}@.'
replace['errorcond'] = '/* MBIERROR */'
replace['operation1b'] = ''
replace['fini1b'] = ''
replace = patterns.copy()
replace['shortdesc'] = 'Correct call ordering.'
replace['longdesc'] = 'Correct call ordering.'
- replace['outcome'] = 'OK'
- replace['errormsg'] = 'OK'
+ replace['outcome1'] = 'OK'
+ replace['errormsg1'] = 'OK'
+ replace['outcome2'] = 'OK'
+ replace['errormsg2'] = 'OK'
gen.make_file(template, f'InputHazardCallOrdering_{c}_ok.c', replace)
# Generate the incorrect matching
replace = patterns.copy()
replace['shortdesc'] = 'Missing collective function call.'
replace['longdesc'] = 'Missing collective function call for a path depending to input, a deadlock is created.'
- replace['outcome'] = 'ERROR: IHCallMatching'
- replace['errormsg'] = 'P2P mistmatch. Missing @{c}@ at @{filename}@:@{line:MBIERROR}@.'
+ replace['outcome1'] = 'OK'
+ replace['errormsg1'] = 'OK'
+ replace['outcome2'] = 'ERROR: IHCallMatching'
+ replace['errormsg2'] = 'P2P mistmatch. Missing @{c}@ at @{filename}@:@{line:MBIERROR}@.'
replace['errorcond'] = '/* MBIERROR */'
replace['operation1b'] = ''
replace['fini1b'] = ''
'GlobalConcurrency':'DGlobalConcurrency',
# larger scope
'BufferingHazard':'EBufferingHazard',
+ # Input Hazard
+ 'IHCallMatching':'InputHazard',
+
'OK':'FOK'}
error_scope = {
BEGIN_MBI_TESTS
$ mpirun -np 4 $zero_buffer ${EXE}
- | @{outcome1}@
- | @{errormsg1}@
+ | @{outcome_zerob}@
+ | @{errormsg_zerob}@
$ mpirun -np 4 $infty_buffer ${EXE}
- | @{outcome1}@
- | @{errormsg1}@
+ | @{outcome_infty}@
+ | @{errormsg_infty}@
END_MBI_TESTS
////////////////////// End of MBI headers /////////////////// */
replace = patterns.copy()
replace['shortdesc'] = 'Point to point @{s}@ and @{r}@ may not be matched'
replace['longdesc'] = 'Processes 0 and 1 both call @{s}@ and @{r}@. This results in a deadlock depending on the buffering mode'
- replace['outcome1'] = 'ERROR: BufferingHazard'
- replace['errormsg1'] = f'Buffering Hazard. Possible deadlock depending the buffer size of MPI implementation and system environment cause by two processes call {s} before {r}.'
+ replace['outcome_zerob'] = 'ERROR: BufferingHazard'
+ replace['errormsg_zerob'] = f'Buffering Hazard. Possible deadlock depending the buffer size of MPI implementation and system environment cause by two processes call {s} before {r}.'
+ replace['outcome_infty'] = 'OK'
+ replace['errormsg_infty'] = 'OK'
gen.make_file(template, f'P2PBuffering_{s}_{r}_{s}_{r}_nok.c', replace)
# Generate the incorrect matching with send message to the same process depending on the buffering mode (send + recv)
replace['dest2'] = '1'
replace['shortdesc'] = 'Point to point @{s}@ and @{r}@ may not be matched'
replace['longdesc'] = 'Processes 0 and 1 both call @{s}@ and @{r}@. This results in a deadlock depending on the buffering mode'
- replace['outcome1'] = 'ERROR: BufferingHazard'
- replace['errormsg1'] = f'Buffering Hazard. Possible deadlock depending the buffer size of MPI implementation and system environment cause Send message to the same process.'
+ replace['outcome_zerob'] = 'ERROR: BufferingHazard'
+ replace['errormsg_zerob'] = f'Buffering Hazard. Possible deadlock depending the buffer size of MPI implementation and system environment cause Send message to the same process.'
+ replace['outcome_infty'] = 'OK'
+ replace['errormsg_infty'] = 'OK'
gen.make_file(template, f'P2PBuffering_SameProcess_{s}_{r}_nok.c', replace)
# Generate the incorrect matching with circular send message depending on the buffering mode (send + recv)
replace['operation2c'] = gen.operation[r]("2")
replace['shortdesc'] = 'Point to point @{s}@ and @{r}@ may not be matched'
replace['longdesc'] = 'Processes 0 and 1 both call @{s}@ and @{r}@. This results in a deadlock depending on the buffering mode'
- replace['outcome1'] = 'ERROR: BufferingHazard'
- replace['errormsg1'] = f'Buffering Hazard. Possible deadlock depending the buffer size of MPI implementation and system environment cause circular send message.'
+ replace['outcome_zerob'] = 'ERROR: BufferingHazard'
+ replace['errormsg_zerob'] = f'Buffering Hazard. Possible deadlock depending the buffer size of MPI implementation and system environment cause circular send message.'
+ replace['outcome_infty'] = 'OK'
+ replace['errormsg_infty'] = 'OK'
gen.make_file(template, f'P2PBuffering_Circular_{s}_{r}_nok.c', replace)
# Generate the incorrect matching depending on the buffering mode (recv + send)
replace = patterns.copy()
replace['shortdesc'] = 'Point to point @{s}@ and @{r}@ are not matched'
replace['longdesc'] = 'Processes 0 and 1 both call @{r}@ and @{s}@. This results in a deadlock'
- replace['outcome1'] = 'ERROR: CallMatching'
- replace['errormsg1'] = 'ERROR: CallMatching'
+ replace['outcome_zerob'] = 'ERROR: CallMatching'
+ replace['errormsg_zerob'] = 'ERROR: CallMatching'
+ replace['outcome_infty'] = 'ERROR: CallMatching'
+ replace['errormsg_infty'] = 'ERROR: CallMatching'
replace['operation1a'] = gen.operation[r]("2")
replace['fini1a'] = gen.fini[r]("2")
replace['operation2a'] = gen.operation[s]("1")
replace = patterns.copy()
replace['shortdesc'] = 'Point to point @{s}@ and @{r}@ are correctly matched'
replace['longdesc'] = 'Process 0 calls @{s}@ and process 1 calls @{r}@.'
- replace['outcome1'] = 'OK'
- replace['errormsg1'] = 'OK'
- replace['fini1a'] = gen.fini[s]("1")
- replace['fini2a'] = gen.fini[r]("2")
+ replace['outcome_zerob'] = 'OK'
+ replace['errormsg_zerob'] = 'OK'
+ replace['outcome_infty'] = 'OK'
+ replace['errormsg_infty'] = 'OK'
+ patterns['init1'] = gen.init[s]("1")
replace['operation1a'] = gen.operation[s]("1")
- replace['operation2a'] = gen.operation[r]("2")
+ replace['fini1a'] = gen.fini[s]("1")
+ replace['operation2a'] = ''
+ replace['fini2a'] = ''
+
+ patterns['init2'] = gen.init[r]("2")
+ replace['operation1b'] = gen.operation[r]("2")
+ replace['fini1b'] = gen.fini[r]("2")
+ replace['operation2b'] = ''
+ replace['fini2b'] = ''
gen.make_file(template, f'P2PCallMatching_{s}_{r}_{r}_{s}_ok.c', replace)
# Generate a code with non distinct buffer
replace = patterns.copy()
replace['shortdesc'] = 'Invalid buffer on Sendrecv function.'
- replace['longdesc'] = 'Invalid buffer on Sendrecv, the tow buffers must be distinct.'
+ replace['longdesc'] = 'Invalid buffer on Sendrecv, the two buffers must be distinct.'
replace['outcome'] = 'ERROR: InvalidBuffer'
replace['errormsg'] = '@{sr}@ at @{filename}@:@{line:MBIERROR}@ send buffer and recv buffer must be distinct.'
replace['change_arg'] = gen.write[sr]("2")
operation['MPI_Sendrecv'] = lambda n: f'MPI_Sendrecv(psbuf{n}, buff_size, type, dest, stag, prbuf{n}, buff_size, type, src, rtag, newcom, &sta{n});'
fini['MPI_Sendrecv'] = lambda n: ""
free['MPI_Sendrecv'] = lambda n: ""
-write['MPI_Sendrecv'] = lambda n: f"prbuf{n} = &sbuf{n}[2];"
+write['MPI_Sendrecv'] = lambda n: f"prbuf{n} = &sbuf{n}[0];"
### P2P:nonblocking
# libraries
install(TARGETS simgrid DESTINATION ${CMAKE_INSTALL_LIBDIR}/)
+if("${CMAKE_SYSTEM}" MATCHES "Linux")
+ install(TARGETS sthread DESTINATION ${CMAKE_INSTALL_LIBDIR}/)
+endif()
# pkg-config files
configure_file("${CMAKE_HOME_DIRECTORY}/tools/pkg-config/simgrid.pc.in"
if("${CMAKE_SYSTEM}" MATCHES "Linux")
add_library(sthread SHARED ${STHREAD_SRC})
+ set_target_properties(sthread PROPERTIES VERSION ${libsimgrid_version})
set_property(TARGET sthread
APPEND PROPERTY INCLUDE_DIRECTORIES "${INTERNAL_INCLUDES}")
target_link_libraries(sthread simgrid)
NUMBER_OF_PROCESSORS="$(nproc)" || NUMBER_OF_PROCESSORS=1
GENERATOR="Unix Makefiles"
+BUILDER=make
+VERBOSE_BUILD="VERBOSE=1"
+if which ninja 2>/dev/null >/dev/null ; then
+ GENERATOR=Ninja
+ BUILDER=ninja
+ VERBOSE_BUILD="-v"
+fi
ulimit -c 0 || true
echo "XX"
cmake -G"$GENERATOR" -Denable_documentation=OFF "$WORKSPACE"
-make dist -j $NUMBER_OF_PROCESSORS
+${BUILDER} dist -j $NUMBER_OF_PROCESSORS
SIMGRID_VERSION=$(cat VERSION)
echo "XX"
-DCMAKE_CXX_COMPILER_LAUNCHER=ccache \
"$SRCFOLDER"
-make -j $NUMBER_OF_PROCESSORS VERBOSE=1 tests
+${BUILDER} -j $NUMBER_OF_PROCESSORS ${VERBOSE_BUILD} tests
echo "XX"
echo "XX Run the tests"
rm -rf "$INSTALL"
- make install
+ ${BUILDER} install
fi
echo "XX"
export LD_LIBRARY_PATH=$PWD/simgrid-dev/lib/:$LD_LIBRARY_PATH
export JHBUILD_RUN_AS_ROOT=1
+#workaround issue with ntpoly 3.0.0
+sed -i 's|repository type="tarball" name="ntpoly" href="https://github.com/william-dawson/NTPoly/archive/"|repository type="git" name="ntpoly" href="https://github.com/william-dawson/"|' ../modulesets/hpc-upstream.modules
+sed -i 's|module="ntpoly-v3.0.0.tar.gz"|module="ntpoly"|' ../modulesets/hpc-upstream.modules
+
../Installer.py autogen -y
../Installer.py -f ../../tools/jenkins/gfortran-simgrid.rc -y build
$SUDO apt-get update
$SUDO apt-get -y install build-essential libboost-all-dev wget git xsltproc
-for i in master 1.3 ; do
+for i in master starpu-1.3 ; do
echo "XXXXXXXXXXXXXXXX Build and test StarPU $i"
rm -rf starpu*
- wget https://files.inria.fr/starpu/simgrid/starpu-simgrid-$i.tar.gz
- md5sum starpu-simgrid-$i.tar.gz
- tar xf starpu-simgrid-$i.tar.gz
+ wget https://files.inria.fr/starpu/testing/$i/starpu-nightly-latest.tar.gz
+ md5sum starpu-nightly-latest.tar.gz
+ tar xf starpu-nightly-latest.tar.gz
cd starpu-1*
# NOTE: Do *not* introduce parameters to "make it work" here.
}
get_ns3(){
+ # Print the version from the first "-- ns-3 found (v...)" line of ./consoleText, then stop reading
- sed -n 's/.*-- ns-3 found (v\(3[-.0-9a-z]\+\); minor:.*/\1/p;T;q' ./consoleText
+ sed -n 's/.*-- ns-3 found (v\(3[-.0-9a-z]\+\).*/\1/p;T;q' ./consoleText
}
get_python(){
p Check that we return the expected return value on SEGV
! expect return 11
< $ perl segfault.pl
-$ ${bindir:=.}/tesh
+$ ${bindir:=.}/tesh --no-auto-valgrind
> Test suite from stdin
> [(stdin):1] perl segfault.pl
> Test suite `(stdin)': NOK (<(stdin):1> got signal SIGSEGV)
> @@ -0,0 +1 @@
> +I crashed
> Test suite `(stdin)': NOK (<(stdin):2> output mismatch)
+> In addition, <(stdin):2> got signal SIGTERM.
$ ${bindir:=.}/tesh
self.args_suffix = ""
self.ignore_regexps_common = []
self.jenkins = False # not a Jenkins run by default
+ self.auto_valgrind = True
self.timeout = 10 # default value: 10 sec
self.wrapper = None
self.keep = False
self.output_display = False
self.sort = -1
+ self.rerun_with_valgrind = False
self.ignore_regexps = TeshState().ignore_regexps_common
_thread.start_new_thread(Cmd._run, (self, lock))
else:
self._run()
+ if self.rerun_with_valgrind and TeshState().auto_valgrind:
+ print('\n\n\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX')
+ print( 'XXXXXXXXX Rerunning this test with valgrind to help debugging it XXXXXXXXX')
+ print( 'XXXXXXXX (this will fail if valgrind is not installed, of course) XXXXXXXX')
+ print( 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\n\n\n')
+
+ self.args = "valgrind " + self.args
+ self._run()
return True
def _run(self, lock=None):
print('\n'.join(logs))
return
- if self.output_display:
- logs.append(str(stdout_data))
-
# remove text colors
ansi_escape = re.compile(r'\x1b[^m]*m')
stdout_data = ansi_escape.sub('', stdout_data)
+ if self.output_display:
+ logs.append(str(stdout_data))
+
+ if self.rerun_with_valgrind:
+ print(str(stdout_data), file=sys.stderr)
+ return
+
if self.ignore_output:
logs.append("(ignoring the output of <{cmd}> as requested)".format(cmd=cmd_name))
else:
logs.append("Test suite `{file}': NOK (<{cmd}> output mismatch)".format(
file=FileReader().filename, cmd=cmd_name))
+
+ # Also report any failed return code and/or signal we got in case of output mismatch
+ if not proc.returncode in self.expect_return:
+ if proc.returncode >= 0:
+ logs.append("In addition, <{cmd}> returned code {code}.".format(
+ cmd=cmd_name, code=proc.returncode))
+ else:
+ logs.append("In addition, <{cmd}> got signal {sig}.".format(cmd=cmd_name,
+ sig=SIGNALS_TO_NAMES_DICT[-proc.returncode]))
+ if proc.returncode == -signal.SIGSEGV:
+ self.rerun_with_valgrind = True
+
if lock is not None:
lock.release()
if TeshState().keep:
logs.append("Test suite `{file}': NOK (<{cmd}> got signal {sig})".format(
file=FileReader().filename, cmd=cmd_name,
sig=SIGNALS_TO_NAMES_DICT[-proc.returncode]))
+
+ if proc.returncode == -signal.SIGSEGV:
+ self.rerun_with_valgrind = True
+
if lock is not None:
lock.release()
TeshState().set_return_code(max(-proc.returncode, 1))
'--ignore-jenkins',
action='store_true',
help='ignore all cruft generated on SimGrid continuous integration servers')
+ group1.add_argument(
+ '--no-auto-valgrind',
+ action='store_true',
+ help='do not automaticall launch segfaulting commands in valgrind')
group1.add_argument('--wrapper', metavar='arg', help='Run each command in the provided wrapper (eg valgrind)')
group1.add_argument(
'--keep',
]
TeshState().jenkins = True # This is a Jenkins build
+ if options.no_auto_valgrind:
+ TeshState().auto_valgrind = False
+
if options.teshfile is None:
file = FileReader(None)
print("Test suite from stdin")