From: Martin Quinson Date: Mon, 30 Oct 2023 02:36:31 +0000 (+0000) Subject: Merge branch 'example-battery-chiller-solar' into 'master' X-Git-Tag: v3.35~89^2~29 X-Git-Url: http://bilbo.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/5d565c9f388ee34b957b69a595e98bbfaf06fb10?hp=05ea8b91f5cb2baace69b4373f4fab3c7465b116 Merge branch 'example-battery-chiller-solar' into 'master' add battery-chiller-solar example. See merge request simgrid/simgrid!174 --- diff --git a/BuildSimGrid.sh b/BuildSimGrid.sh index 66b309eb8b..410d1ddb70 100755 --- a/BuildSimGrid.sh +++ b/BuildSimGrid.sh @@ -17,6 +17,7 @@ fi target=examples ncores=$(grep -c processor /proc/cpuinfo) +halfcores=$(expr $ncores / 2 + 1) install_path=$(sed -n 's/^CMAKE_INSTALL_PREFIX:PATH=//p' CMakeCache.txt) if [ -e ${install_path} ] && [ -d ${install_path} ] && [ -x ${install_path} ] && [ -w ${install_path} ] ; then @@ -32,7 +33,8 @@ fi ( echo "install_path: ${install_path}" echo "Target: ${target}" - echo "Cores: ${ncores}" - (nice ${builder} -j${ncores} ${target} tests || make ${target} tests) && nice ctest -j${ncores} --output-on-failure ; date + echo "Cores to build: ${ncores}" + echo "Cores to test: ${halfcores}" + (nice ${builder} -j${ncores} ${target} tests || ${builder} ${target} tests) && nice ctest -j${halfcores} --output-on-failure ; date ) 2>&1 | tee BuildSimGrid.sh.log diff --git a/CMakeLists.txt b/CMakeLists.txt index 1a4a2459c5..ec4f39bbd1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -217,21 +217,25 @@ if(enable_ns3) endif() ### Check for Eigen library -set(SIMGRID_HAVE_EIGEN3 OFF) -find_package (Eigen3 3.3 CONFIG - HINTS ${EIGEN3_HINT}) -if (Eigen3_FOUND) - set(SIMGRID_HAVE_EIGEN3 ON) - message(STATUS "Found Eigen3: ${EIGEN3_INCLUDE_DIR}") - include_directories(${EIGEN3_INCLUDE_DIR}) - if ("3.3.4" VERSION_EQUAL EIGEN3_VERSION_STRING AND CMAKE_COMPILER_IS_GNUCC) - message(STATUS "Avoid build error of Eigen3 v3.3.4 using -Wno-error=int-in-bool-context") - set(CMAKE_CXX_FLAGS 
"${CMAKE_CXX_FLAGS} -Wno-error=int-in-bool-context") +set(SIMGRID_HAVE_EIGEN3 OFF) +if ((NOT DEFINED EIGEN3_HINT) OR (NOT "${EIGEN3_HINT}" STREQUAL "OFF")) + find_package (Eigen3 3.3 CONFIG + HINTS ${EIGEN3_HINT}) + if (Eigen3_FOUND) + set(SIMGRID_HAVE_EIGEN3 ON) + message(STATUS "Found Eigen3: ${EIGEN3_INCLUDE_DIR}") + include_directories(${EIGEN3_INCLUDE_DIR}) + if ("3.3.4" VERSION_EQUAL EIGEN3_VERSION_STRING AND CMAKE_COMPILER_IS_GNUCC) + message(STATUS "Avoid build error of Eigen3 v3.3.4 using -Wno-error=int-in-bool-context") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=int-in-bool-context") + endif() + else() + message(STATUS "Disabling model BMF because Eigen3 was not found. If it's installed, use EIGEN3_HINT to hint cmake about the location of Eigen3Config.cmake") endif() + mark_as_advanced(Eigen3_DIR) else() - message(STATUS "Disabling model BMF because Eigen3 was not found. If it's installed, use EIGEN3_HINT to hint cmake about the location of Eigen3Config.cmake") + message(STATUS "Disabling Eigen3 as requested by the user (EIGEN3_HINT is set to 'OFF')") endif() -mark_as_advanced(Eigen3_DIR) # Check for our JSON dependency set(SIMGRID_HAVE_JSON 0) @@ -964,4 +968,3 @@ execute_process(COMMAND ${CMAKE_COMMAND} -E make_directory ${PROJECT_BINARY_DIR} file(WRITE ${PROJECT_BINARY_DIR}/Testing/Notes/Build "GIT version : ${GIT_VERSION}\n") file(APPEND ${PROJECT_BINARY_DIR}/Testing/Notes/Build "Release : simgrid-${release_version}\n") -INCLUDE(Dart) diff --git a/ChangeLog b/ChangeLog index 893664d9bc..605242b95a 100644 --- a/ChangeLog +++ b/ChangeLog @@ -8,11 +8,16 @@ S4U: on symmetric routes. - Introduce a Mailbox::get_async() with no payload parameter. You can use the new Comm::get_payload() once the communication is over to retrieve the payload. + - Implement recursive mutexes. Simply pass true to the constructor to get one.
SMPI: - New SMPI_app_instance_join(): wait for the completion of a started MPI instance - MPI_UNIVERSE_SIZE now initialized to the total amount of hosts in the platform +sthread: + - Implement recursive pthread mutexes. + - Many bug fixes. + Python: - Make the host_load plugin available from Python. See examples/python/plugin-host-load - Mailbox::get_async() does not return a pair anymore. Use comm.get_payload() instead. diff --git a/MANIFEST.in b/MANIFEST.in index 422f0e42c9..2c3927c307 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -352,8 +352,6 @@ include examples/cpp/network-ns3/s4u-network-ns3-timed.tesh include examples/cpp/network-ns3/s4u-network-ns3.cpp include examples/cpp/network-wifi/s4u-network-wifi.cpp include examples/cpp/network-wifi/s4u-network-wifi.tesh -include examples/cpp/solar-panel-simple/s4u-solar-panel-simple.cpp -include examples/cpp/solar-panel-simple/s4u-solar-panel-simple.tesh include examples/cpp/platform-comm-serialize/s4u-platform-comm-serialize.cpp include examples/cpp/platform-comm-serialize/s4u-platform-comm-serialize.tesh include examples/cpp/platform-failures/s4u-platform-failures.cpp @@ -384,6 +382,8 @@ include examples/cpp/replay-io/s4u-replay-io.txt include examples/cpp/replay-io/s4u-replay-io_d.xml include examples/cpp/routing-get-clusters/s4u-routing-get-clusters.cpp include examples/cpp/routing-get-clusters/s4u-routing-get-clusters.tesh +include examples/cpp/solar-panel-simple/s4u-solar-panel-simple.cpp +include examples/cpp/solar-panel-simple/s4u-solar-panel-simple.tesh include examples/cpp/synchro-barrier/s4u-mc-synchro-barrier.tesh include examples/cpp/synchro-barrier/s4u-synchro-barrier.cpp include examples/cpp/synchro-barrier/s4u-synchro-barrier.tesh @@ -398,6 +398,8 @@ include examples/cpp/synchro-mutex/s4u-synchro-mutex.tesh include examples/cpp/synchro-semaphore/s4u-mc-synchro-semaphore.tesh include examples/cpp/synchro-semaphore/s4u-synchro-semaphore.cpp include examples/cpp/synchro-semaphore/s4u-synchro-semaphore.tesh
+include examples/cpp/task-dispatch/s4u-task-dispatch.cpp +include examples/cpp/task-dispatch/s4u-task-dispatch.tesh include examples/cpp/task-io/s4u-task-io.cpp include examples/cpp/task-io/s4u-task-io.tesh include examples/cpp/task-microservice/s4u-task-microservice.cpp @@ -645,9 +647,12 @@ include examples/smpi/trace_call_location/trace_call_location.c include examples/smpi/trace_call_location/trace_call_location.tesh include examples/smpi/trace_simple/trace_simple.c include examples/smpi/trace_simple/trace_simple.tesh +include examples/sthread/pthread-mc-mutex-recursive.tesh include examples/sthread/pthread-mc-mutex-simple.tesh include examples/sthread/pthread-mc-mutex-simpledeadlock.tesh include examples/sthread/pthread-mc-producer-consumer.tesh +include examples/sthread/pthread-mutex-recursive.c +include examples/sthread/pthread-mutex-recursive.tesh include examples/sthread/pthread-mutex-simple.c include examples/sthread/pthread-mutex-simple.tesh include examples/sthread/pthread-mutex-simpledeadlock.c diff --git a/docs/source/Installing_SimGrid.rst b/docs/source/Installing_SimGrid.rst index 477b53ed9a..7d268678ef 100644 --- a/docs/source/Installing_SimGrid.rst +++ b/docs/source/Installing_SimGrid.rst @@ -108,7 +108,7 @@ Eigen3 (optional) - On Debian / Ubuntu: ``apt install libeigen3-dev`` - On CentOS / Fedora: ``dnf install eigen3-devel`` - On macOS with homebrew: ``brew install eigen`` - - Use EIGEN3_HINT to specify where it's installed if cmake doesn't find it automatically. + - Use EIGEN3_HINT to specify where it's installed if cmake doesn't find it automatically. Set EIGEN3_HINT=OFF to disable detection even if it could be found. JSON (optional, for the DAG wfcommons loader) - On Debian / Ubuntu: ``apt install nlohmann-json3-dev`` - Use nlohmann_json_HINT to specify where it's installed if cmake doesn't find it automatically. 
@@ -269,6 +269,7 @@ NS3_HINT (empty by default) EIGEN3_HINT (empty by default) Alternative path into which Eigen3 should be searched for. + Providing the value OFF as a hint disables the detection altogether. SIMGRID_PYTHON_LIBDIR (auto-detected) Where to install the Python module library. By default, it is set to the cmake Python3_SITEARCH variable if installing to /usr, diff --git a/docs/source/app_s4u.rst b/docs/source/app_s4u.rst index 23e0e04195..8695720ee5 100644 --- a/docs/source/app_s4u.rst +++ b/docs/source/app_s4u.rst @@ -203,32 +203,33 @@ Repeatable Activities In order to simulate the execution of Dataflow applications, we introduced the concept of |API_s4u_Tasks|, that can be seen as repeatable activities. A Dataflow -is defined as a graph of |API_s4u_Tasks| through which circulate Tokens. Tokens -can carry any user-defined data, using the same internal mechanisms as for the -other simulated objects. Each Task has to receive a token from each of its -predecessor to fire a new instance of a :ref:`Communication `, -:ref:`Execution `, or :ref:`I/O ` activity. -On completion of this activity, the Task propagates tokens -to its successors, and waits for the next set of tokens to arrive. -Multiple instances of the same Task can run in parallel by adjusting its -horizontal scaling with -:cpp:func:`s4u::Task::set_parallelism_degree() `. +is defined as a graph of |API_s4u_Tasks|, where each |API_s4u_Tasks| has a set of +successors and predecessors. When a |API_s4u_Tasks| ends, it sends a token to each +of its successors. Each |API_s4u_Tasks| has to receive a token from each of its +predecessors to start. Tokens can carry any user-defined data. -:ref:`Communications ` (started on Mailboxes and consuming links), -:ref:`Executions ` (started on Host and consuming CPU resources) -:ref:`I/O ` (started on and consuming disks). +|API_s4u_Tasks| are composed of several instances: a dispatcher, a collector, and +instance_0 to instance_n.
The dispatcher relies on a load balancing function to select +the next instance to fire. Once this instance finishes, it fires the collector. + +Each instance of an |API_s4u_ExecTask| can be placed on a different host. +|API_s4u_Comm| activities are automatically created when an instance triggers +another instance on a different host. Each instance has its own parallelism degree +to scale horizontally on several cores. To initiate the execution of a Dataflow, it is possible to some make |API_s4u_Tasks| fire one or more activities without waiting for any token with the :cpp:func:`s4u::Task::enqueue_firings() ` function. -The parameters and successors of a Task can be redefined at runtime by attaching +The parameters of Tasks can be redefined at runtime by attaching callbacks to the :cpp:func:`s4u::Task::on_this_start ` and :cpp:func:`s4u::Task::on_this_completion ` -signals. +signals. The former is triggered by instances other than the dispatcher and the collector, +and the latter is triggered by the collector. + ..
_s4u_mailbox: diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt index a0fc73f401..6f1af7f34e 100644 --- a/examples/cpp/CMakeLists.txt +++ b/examples/cpp/CMakeLists.txt @@ -172,7 +172,7 @@ foreach (example activityset-testany activityset-waitany activityset-waitall act mc-bugged1 mc-bugged1-liveness mc-bugged2 mc-bugged2-liveness mc-centralized-mutex mc-electric-fence mc-failing-assert network-ns3 network-ns3-wifi network-wifi io-async io-priority io-degradation io-file-system io-file-remote io-disk-raw io-dependent - task-io task-microservice task-parallelism task-simple task-storm task-switch-host task-variable-load + task-dispatch task-io task-microservice task-parallelism task-simple task-storm task-switch-host task-variable-load solar-panel-simple platform-comm-serialize platform-failures platform-profile platform-properties plugin-host-load plugin-jbod plugin-link-load plugin-prodcons diff --git a/examples/cpp/clusters-multicpu/s4u-clusters-multicpu.cpp b/examples/cpp/clusters-multicpu/s4u-clusters-multicpu.cpp index 8ef63370cc..b2ca2e78f7 100644 --- a/examples/cpp/clusters-multicpu/s4u-clusters-multicpu.cpp +++ b/examples/cpp/clusters-multicpu/s4u-clusters-multicpu.cpp @@ -87,8 +87,7 @@ public: * @param id Internal identifier in the torus (for information) - * @return netpoint, gateway: the netpoint to the StarZone and CPU0 as gateway + * @return the newly created and sealed host zone, whose gateway is CPU0 */ -static std::pair -create_hostzone(const sg4::NetZone* zone, const std::vector& /*coord*/, unsigned long id) +static sg4::NetZone* create_hostzone(const sg4::NetZone* zone, const std::vector& /*coord*/, unsigned long id) { constexpr int num_cpus = 8; //!< Number of CPUs in the zone constexpr double speed = 1e9; //!< Speed of each CPU @@ -101,14 +100,13 @@ create_hostzone(const sg4::NetZone* zone, const std::vector& /*co /* setting my Torus parent zone */ host_zone->set_parent(zone); - simgrid::kernel::routing::NetPoint* gateway = nullptr; /* create CPUs */ for (int i = 0; i < num_cpus; i++) { std::string
cpu_name = hostname + "-cpu" + std::to_string(i); const sg4::Host* host = host_zone->create_host(cpu_name, speed); /* the first CPU is the gateway */ if (i == 0) - gateway = host->get_netpoint(); + host_zone->set_gateway(host->get_netpoint()); /* create split-duplex link */ auto* link = host_zone->create_split_duplex_link("link-" + cpu_name, link_bw)->set_latency(link_lat); /* connecting CPU to outer world */ @@ -116,7 +114,7 @@ create_hostzone(const sg4::NetZone* zone, const std::vector& /*co } /* seal newly created netzone */ host_zone->seal(); - return std::make_pair(host_zone->get_netpoint(), gateway); + return host_zone; } /*************************************************************************************************/ diff --git a/examples/cpp/task-dispatch/s4u-task-dispatch.cpp b/examples/cpp/task-dispatch/s4u-task-dispatch.cpp new file mode 100644 index 0000000000..10bd0d2364 --- /dev/null +++ b/examples/cpp/task-dispatch/s4u-task-dispatch.cpp @@ -0,0 +1,103 @@ +/* Copyright (c) 2017-2023. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. 
*/ + +#include "simgrid/s4u.hpp" + +XBT_LOG_NEW_DEFAULT_CATEGORY(task_dispatch, "Messages specific for this s4u example"); +namespace sg4 = simgrid::s4u; + +static void manager(sg4::ExecTaskPtr t) +{ + auto PM0 = sg4::Engine::get_instance()->host_by_name("PM0"); + auto PM1 = sg4::Engine::get_instance()->host_by_name("PM1"); + + XBT_INFO("Test set_flops"); + t->enqueue_firings(2); + sg4::this_actor::sleep_for(50); + XBT_INFO("Set instance_0 flops to 50."); + t->set_flops(50 * PM0->get_speed()); + sg4::this_actor::sleep_for(250); + t->set_flops(100 * PM0->get_speed()); + + XBT_INFO("Test set_parallelism degree"); + t->enqueue_firings(3); + sg4::this_actor::sleep_for(50); + XBT_INFO("Set Task parallelism degree to 2."); + t->set_parallelism_degree(2); + sg4::this_actor::sleep_for(250); + t->set_parallelism_degree(1); + + XBT_INFO("Test set_host dispatcher"); + t->enqueue_firings(2); + sg4::this_actor::sleep_for(50); + XBT_INFO("Move dispatcher to PM1"); + t->set_host(PM1, "dispatcher"); + t->set_internal_bytes(1e6, "dispatcher"); + sg4::this_actor::sleep_for(250); + t->set_host(PM0, "dispatcher"); + + XBT_INFO("Test set_host instance_0"); + t->enqueue_firings(2); + sg4::this_actor::sleep_for(50); + XBT_INFO("Move instance_0 to PM1"); + t->set_host(PM1, "instance_0"); + t->set_flops(100 * PM1->get_speed()); + t->set_internal_bytes(1e6, "instance_0"); + sg4::this_actor::sleep_for(250); + t->set_host(PM0, "instance_0"); + t->set_flops(100 * PM0->get_speed()); + + XBT_INFO("Test set_host collector"); + t->enqueue_firings(2); + sg4::this_actor::sleep_for(50); + XBT_INFO("Move collector to PM1"); + t->set_host(PM1, "collector"); + sg4::this_actor::sleep_for(250); + t->set_host(PM0, "collector"); + + XBT_INFO("Test add_instances"); + t->enqueue_firings(1); + sg4::this_actor::sleep_for(50); + XBT_INFO("Add 1 instance and update load balancing function"); + t->add_instances(1); + t->set_load_balancing_function([]() { + static int round_robin_counter = 0; + int ret = 
round_robin_counter; + round_robin_counter = round_robin_counter == 1 ? 0 : round_robin_counter + 1; + return "instance_" + std::to_string(ret); + }); + t->enqueue_firings(2); + sg4::this_actor::sleep_for(250); + + XBT_INFO("Test remove_instances"); + XBT_INFO("Remove 1 instance and update load balancing function"); + t->remove_instances(1); + t->set_load_balancing_function([]() { return "instance_0"; }); + t->enqueue_firings(2); + sg4::this_actor::sleep_for(300); +} + +int main(int argc, char* argv[]) +{ + sg4::Engine e(&argc, argv); + e.load_platform(argv[1]); + auto PM0 = e.host_by_name("PM0"); + auto PM1 = sg4::Engine::get_instance()->host_by_name("PM1"); + + auto a = sg4::ExecTask::init("A", 100 * PM0->get_speed(), PM0); + auto b = sg4::ExecTask::init("B", 50 * PM0->get_speed(), PM0); + auto c = sg4::CommTask::init("C", 1e6, PM1, PM0); + + a->add_successor(b); + + sg4::Task::on_completion_cb( + [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); }); + sg4::Task::on_start_cb([](const sg4::Task* t) { XBT_INFO("Task %s start", t->get_name().c_str()); }); + + sg4::Actor::create("manager", PM0, manager, a); + + e.run(); + return 0; +} diff --git a/examples/cpp/task-dispatch/s4u-task-dispatch.tesh b/examples/cpp/task-dispatch/s4u-task-dispatch.tesh new file mode 100644 index 0000000000..a4ace5478a --- /dev/null +++ b/examples/cpp/task-dispatch/s4u-task-dispatch.tesh @@ -0,0 +1,82 @@ +#!/usr/bin/env tesh + +$ ${bindir:=.}/s4u-task-dispatch ${platfdir}/three_multicore_hosts.xml +> [PM0:manager:(1) 0.000000] [task_dispatch/INFO] Test set_flops +> [0.000000] [task_dispatch/INFO] Task A start +> [0.000000] [task_dispatch/INFO] Task A start +> [PM0:manager:(1) 50.000000] [task_dispatch/INFO] Set instance_0 flops to 50.
+> [100.000000] [task_dispatch/INFO] Task A finished (1) +> [100.000000] [task_dispatch/INFO] Task B start +> [150.000000] [task_dispatch/INFO] Task A finished (2) +> [150.000000] [task_dispatch/INFO] Task B start +> [150.000000] [task_dispatch/INFO] Task B finished (1) +> [200.000000] [task_dispatch/INFO] Task B finished (2) +> [PM0:manager:(1) 300.000000] [task_dispatch/INFO] Test set_parallelism degree +> [300.000000] [task_dispatch/INFO] Task A start +> [300.000000] [task_dispatch/INFO] Task A start +> [300.000000] [task_dispatch/INFO] Task A start +> [PM0:manager:(1) 350.000000] [task_dispatch/INFO] Set Task parallelism degree to 2. +> [400.000000] [task_dispatch/INFO] Task A finished (3) +> [400.000000] [task_dispatch/INFO] Task B start +> [450.000000] [task_dispatch/INFO] Task A finished (4) +> [450.000000] [task_dispatch/INFO] Task B start +> [450.000000] [task_dispatch/INFO] Task B finished (3) +> [500.000000] [task_dispatch/INFO] Task A finished (5) +> [500.000000] [task_dispatch/INFO] Task B start +> [500.000000] [task_dispatch/INFO] Task B finished (4) +> [550.000000] [task_dispatch/INFO] Task B finished (5) +> [PM0:manager:(1) 600.000000] [task_dispatch/INFO] Test set_host dispatcher +> [600.000000] [task_dispatch/INFO] Task A start +> [600.000000] [task_dispatch/INFO] Task A start +> [PM0:manager:(1) 650.000000] [task_dispatch/INFO] Move dispatcher to PM1 +> [700.000000] [task_dispatch/INFO] Task A finished (6) +> [700.000000] [task_dispatch/INFO] Task B start +> [750.000000] [task_dispatch/INFO] Task B finished (6) +> [800.009961] [task_dispatch/INFO] Task A finished (7) +> [800.009961] [task_dispatch/INFO] Task B start +> [850.009961] [task_dispatch/INFO] Task B finished (7) +> [PM0:manager:(1) 900.000000] [task_dispatch/INFO] Test set_host instance_0 +> [900.000000] [task_dispatch/INFO] Task A start +> [900.000000] [task_dispatch/INFO] Task A start +> [PM0:manager:(1) 950.000000] [task_dispatch/INFO] Move instance_0 to PM1 +> [1000.000000] 
[task_dispatch/INFO] Task A finished (8) +> [1000.000000] [task_dispatch/INFO] Task B start +> [1050.000000] [task_dispatch/INFO] Task B finished (8) +> [1100.019922] [task_dispatch/INFO] Task A finished (9) +> [1100.019922] [task_dispatch/INFO] Task B start +> [1150.019922] [task_dispatch/INFO] Task B finished (9) +> [PM0:manager:(1) 1200.000000] [task_dispatch/INFO] Test set_host collector +> [1200.000000] [task_dispatch/INFO] Task A start +> [1200.000000] [task_dispatch/INFO] Task A start +> [PM0:manager:(1) 1250.000000] [task_dispatch/INFO] Move collector to PM1 +> [1300.000000] [task_dispatch/INFO] Task A finished (10) +> [1300.000000] [task_dispatch/INFO] Task B start +> [1350.000000] [task_dispatch/INFO] Task B finished (10) +> [1400.009961] [task_dispatch/INFO] Task A finished (11) +> [1400.009961] [task_dispatch/INFO] Task B start +> [1450.009961] [task_dispatch/INFO] Task B finished (11) +> [PM0:manager:(1) 1500.000000] [task_dispatch/INFO] Test add_instances +> [1500.000000] [task_dispatch/INFO] Task A start +> [PM0:manager:(1) 1550.000000] [task_dispatch/INFO] Add 1 instance and update load balancing function +> [1550.000000] [task_dispatch/INFO] Task A start +> [1550.000000] [task_dispatch/INFO] Task A start +> [1600.000000] [task_dispatch/INFO] Task A finished (12) +> [1600.000000] [task_dispatch/INFO] Task B start +> [1650.000000] [task_dispatch/INFO] Task A finished (13) +> [1650.000000] [task_dispatch/INFO] Task B start +> [1650.000000] [task_dispatch/INFO] Task B finished (12) +> [1700.000000] [task_dispatch/INFO] Task A finished (14) +> [1700.000000] [task_dispatch/INFO] Task B start +> [1700.000000] [task_dispatch/INFO] Task B finished (13) +> [1750.000000] [task_dispatch/INFO] Task B finished (14) +> [PM0:manager:(1) 1800.000000] [task_dispatch/INFO] Test remove_instances +> [PM0:manager:(1) 1800.000000] [task_dispatch/INFO] Remove 1 instance and update load balancing function +> [1800.000000] [task_dispatch/INFO] Task A start +> [1800.000000] 
[task_dispatch/INFO] Task A start +> [1900.000000] [task_dispatch/INFO] Task A finished (15) +> [1900.000000] [task_dispatch/INFO] Task B start +> [1950.000000] [task_dispatch/INFO] Task B finished (15) +> [2000.000000] [task_dispatch/INFO] Task A finished (16) +> [2000.000000] [task_dispatch/INFO] Task B start +> [2050.000000] [task_dispatch/INFO] Task B finished (16) +> \ No newline at end of file diff --git a/examples/cpp/task-parallelism/s4u-task-parallelism.tesh b/examples/cpp/task-parallelism/s4u-task-parallelism.tesh index 938ad9677f..492d015adb 100644 --- a/examples/cpp/task-parallelism/s4u-task-parallelism.tesh +++ b/examples/cpp/task-parallelism/s4u-task-parallelism.tesh @@ -2,40 +2,40 @@ $ ${bindir:=.}/s4u-task-parallelism ${platfdir}/three_multicore_hosts.xml > [0.000000] [task_parallelism/INFO] Task exec_A start -> [100.000000] [task_parallelism/INFO] Task exec_A finished (1) > [100.000000] [task_parallelism/INFO] Task exec_A start +> [100.000000] [task_parallelism/INFO] Task exec_A finished (1) > [200.000000] [task_parallelism/INFO] Task exec_A finished (2) > [300.000000] [task_parallelism/INFO] Task exec_A start > [300.000000] [task_parallelism/INFO] Task exec_A start -> [400.000000] [task_parallelism/INFO] Task exec_A finished (3) > [400.000000] [task_parallelism/INFO] Task exec_A start -> [400.000000] [task_parallelism/INFO] Task exec_A finished (4) > [400.000000] [task_parallelism/INFO] Task exec_A start +> [400.000000] [task_parallelism/INFO] Task exec_A finished (3) +> [400.000000] [task_parallelism/INFO] Task exec_A finished (4) > [500.000000] [task_parallelism/INFO] Task exec_A finished (5) > [500.000000] [task_parallelism/INFO] Task exec_A finished (6) > [600.000000] [task_parallelism/INFO] Task exec_A start -> [700.000000] [task_parallelism/INFO] Task exec_A finished (7) > [700.000000] [task_parallelism/INFO] Task exec_A start +> [700.000000] [task_parallelism/INFO] Task exec_A finished (7) > [800.000000] [task_parallelism/INFO] Task exec_A 
finished (8) > [900.000000] [task_parallelism/INFO] Task exec_A start > [900.000000] [task_parallelism/INFO] Task exec_A start -> [1000.000000] [task_parallelism/INFO] Task exec_A finished (9) > [1000.000000] [task_parallelism/INFO] Task exec_A start -> [1000.000000] [task_parallelism/INFO] Task exec_A finished (10) > [1000.000000] [task_parallelism/INFO] Task exec_A start +> [1000.000000] [task_parallelism/INFO] Task exec_A finished (9) +> [1000.000000] [task_parallelism/INFO] Task exec_A finished (10) +> [1100.000000] [task_parallelism/INFO] Task exec_A start > [1100.000000] [task_parallelism/INFO] Task exec_A finished (11) > [1100.000000] [task_parallelism/INFO] Task exec_A finished (12) -> [1100.000000] [task_parallelism/INFO] Task exec_A start -> [1200.000000] [task_parallelism/INFO] Task exec_A finished (13) > [1200.000000] [task_parallelism/INFO] Task exec_A start +> [1200.000000] [task_parallelism/INFO] Task exec_A finished (13) > [1250.000000] [task_parallelism/INFO] Task exec_A start > [1250.000000] [task_parallelism/INFO] Task exec_A start -> [1300.000000] [task_parallelism/INFO] Task exec_A finished (14) > [1300.000000] [task_parallelism/INFO] Task exec_A start -> [1350.000000] [task_parallelism/INFO] Task exec_A finished (15) +> [1300.000000] [task_parallelism/INFO] Task exec_A finished (14) > [1350.000000] [task_parallelism/INFO] Task exec_A start -> [1350.000000] [task_parallelism/INFO] Task exec_A finished (16) > [1350.000000] [task_parallelism/INFO] Task exec_A start +> [1350.000000] [task_parallelism/INFO] Task exec_A finished (15) +> [1350.000000] [task_parallelism/INFO] Task exec_A finished (16) > [1400.000000] [task_parallelism/INFO] Task exec_A finished (17) > [1450.000000] [task_parallelism/INFO] Task exec_A finished (18) > [1450.000000] [task_parallelism/INFO] Task exec_A finished (19) \ No newline at end of file diff --git a/examples/cpp/task-storm/s4u-task-storm.cpp b/examples/cpp/task-storm/s4u-task-storm.cpp index 58d08c379c..0a0ab7143b 
100644 --- a/examples/cpp/task-storm/s4u-task-storm.cpp +++ b/examples/cpp/task-storm/s4u-task-storm.cpp @@ -74,10 +74,10 @@ int main(int argc, char* argv[]) Alternatively we: remove/add the link between SA and SA_to_B2 add/remove the link between SA and SA_to_B1 */ - SA->on_this_start_cb([SA_to_B1, SA_to_B2](sg4::Task* t) { + SA->on_this_completion_cb([&SA_to_B1, &SA_to_B2](sg4::Task* t) { int count = t->get_count(); sg4::CommTaskPtr comm; - if (count % 2 == 0) { + if (count % 2 == 1) { t->remove_successor(SA_to_B2); t->add_successor(SA_to_B1); comm = SA_to_B1; @@ -86,7 +86,8 @@ int main(int argc, char* argv[]) t->add_successor(SA_to_B2); comm = SA_to_B2; } - std::vector amount = {1e3, 1e6, 1e9}; + std::vector amount = {1e9, 1e3, 1e6}; + // XBT_INFO("Comm %f", amount[count % 3]); comm->set_amount(amount[count % 3]); auto token = std::make_shared(); token->set_data(new double(amount[count % 3])); @@ -94,18 +95,26 @@ int main(int argc, char* argv[]) }); // The token sent by SA is forwarded by both communication tasks - SA_to_B1->on_this_start_cb([&SA](sg4::Task* t) { t->set_token(t->get_next_token_from(SA)); }); - SA_to_B2->on_this_start_cb([&SA](sg4::Task* t) { t->set_token(t->get_next_token_from(SA)); }); + SA_to_B1->on_this_completion_cb([&SA](sg4::Task* t) { + t->set_token(t->get_token_from(SA)); + t->deque_token_from(SA); + }); + SA_to_B2->on_this_completion_cb([&SA](sg4::Task* t) { + t->set_token(t->get_token_from(SA)); + t->deque_token_from(SA); + }); /* B1 and B2 read the value of the token received by their predecessors and use it to adapt their amount of work to do. 
*/ - B1->on_this_start_cb([SA_to_B1](sg4::Task* t) { - auto data = t->get_next_token_from(SA_to_B1)->get_unique_data(); + B1->on_this_start_cb([&SA_to_B1](sg4::Task* t) { + auto data = t->get_token_from(SA_to_B1)->get_data(); + t->deque_token_from(SA_to_B1); t->set_amount(*data * 10); }); - B2->on_this_start_cb([SA_to_B2](sg4::Task* t) { - auto data = t->get_next_token_from(SA_to_B2)->get_unique_data(); + B2->on_this_start_cb([&SA_to_B2](sg4::Task* t) { + auto data = t->get_token_from(SA_to_B2)->get_data(); + t->deque_token_from(SA_to_B2); t->set_amount(*data * 10); }); diff --git a/examples/cpp/task-storm/s4u-task-storm.tesh b/examples/cpp/task-storm/s4u-task-storm.tesh index d7c364a837..376dc31a47 100644 --- a/examples/cpp/task-storm/s4u-task-storm.tesh +++ b/examples/cpp/task-storm/s4u-task-storm.tesh @@ -24,11 +24,11 @@ $ ${bindir:=.}/s4u-task-storm ${platfdir}/small_platform.xml > [1.798442] [task_storm/INFO] Task SB_to_B3 finished (5) > [2.619232] [task_storm/INFO] Task B3 finished (1) > [6.743624] [task_storm/INFO] Task B3 finished (2) -> [10.868015] [task_storm/INFO] Task B3 finished (3) > [10.868015] [task_storm/INFO] Task B4 finished (1) +> [10.868015] [task_storm/INFO] Task B3 finished (3) > [14.992407] [task_storm/INFO] Task B3 finished (4) -> [19.116799] [task_storm/INFO] Task B3 finished (5) > [19.116799] [task_storm/INFO] Task B4 finished (2) +> [19.116799] [task_storm/INFO] Task B3 finished (5) > [23.241190] [task_storm/INFO] Task B4 finished (3) > [27.365582] [task_storm/INFO] Task B4 finished (4) > [31.489974] [task_storm/INFO] Task B4 finished (5) diff --git a/examples/cpp/task-switch-host/s4u-task-switch-host.cpp b/examples/cpp/task-switch-host/s4u-task-switch-host.cpp index b007523f73..c236bdbdb9 100644 --- a/examples/cpp/task-switch-host/s4u-task-switch-host.cpp +++ b/examples/cpp/task-switch-host/s4u-task-switch-host.cpp @@ -54,12 +54,19 @@ int main(int argc, char* argv[]) // successors to comm0 comm0->on_this_start_cb([&comm0, exec1, exec2, 
jupiter, fafard](const sg4::Task*) { static int count = 0; - if (count % 2 == 0) { + if (count % 2 == 0) comm0->set_destination(jupiter); + else + comm0->set_destination(fafard); + count++; + }); + + comm0->on_this_completion_cb([&comm0, exec1, exec2](const sg4::Task*) { + static int count = 0; + if (count % 2 == 0) { comm0->add_successor(exec1); comm0->remove_successor(exec2); } else { - comm0->set_destination(fafard); comm0->add_successor(exec2); comm0->remove_successor(exec1); } diff --git a/examples/python/task-io/task-io.py b/examples/python/task-io/task-io.py index e75215a0f7..d3ab8c1b1d 100644 --- a/examples/python/task-io/task-io.py +++ b/examples/python/task-io/task-io.py @@ -18,7 +18,7 @@ def parse(): return parser.parse_args() def callback(t): - print(f'[{Engine.clock}] {t} finished ({t.count})') + print(f'[{Engine.clock}] {t} finished ({t.get_count()})') if __name__ == '__main__': args = parse() diff --git a/examples/python/task-simple/task-simple.py b/examples/python/task-simple/task-simple.py index 23e9fc0c8a..beca2b6a2c 100644 --- a/examples/python/task-simple/task-simple.py +++ b/examples/python/task-simple/task-simple.py @@ -28,7 +28,7 @@ def parse(): return parser.parse_args() def callback(t): - print(f'[{Engine.clock}] {t} finished ({t.count})') + print(f'[{Engine.clock}] {t} finished ({t.get_count()})') if __name__ == '__main__': args = parse() diff --git a/examples/python/task-simple/task-simple.tesh b/examples/python/task-simple/task-simple.tesh index 5a27a53ad4..f9a828fe28 100644 --- a/examples/python/task-simple/task-simple.tesh +++ b/examples/python/task-simple/task-simple.tesh @@ -5,5 +5,5 @@ $ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${srcdir:=.}/task-simple.py --p > [11.714617112501687] CommTask(comm) finished (1) > [20.388399000968448] ExecTask(exec1) finished (2) > [21.90881661298591] CommTask(comm) finished (2) -> [24.82146412938331] ExecTask(exec2) finished (1) -> [37.92831114626493] ExecTask(exec2) finished (2) +> 
[24.821464129383305] ExecTask(exec2) finished (1) +> [37.928311146264925] ExecTask(exec2) finished (2) diff --git a/examples/python/task-switch-host/task-switch-host.py b/examples/python/task-switch-host/task-switch-host.py index 5be8922534..03dce6a7fb 100644 --- a/examples/python/task-switch-host/task-switch-host.py +++ b/examples/python/task-switch-host/task-switch-host.py @@ -44,12 +44,16 @@ def parse(): return parser.parse_args() def callback(t): - print(f'[{Engine.clock}] {t} finished ({t.count})') + print(f'[{Engine.clock}] {t} finished ({t.get_count()})') -def switch(t, hosts, execs): - comm0.destination = hosts[t.count % 2] - comm0.remove_successor(execs[t.count % 2 - 1]) - comm0.add_successor(execs[t.count % 2]) +def switch_destination(t, hosts): + t.destination = hosts[switch_destination.count % 2] + switch_destination.count += 1 +switch_destination.count = 0 + +def switch_successor(t, execs): + t.remove_successor(execs[t.get_count() % 2]) + t.add_successor(execs[t.get_count() % 2 - 1]) if __name__ == '__main__': args = parse() @@ -74,13 +78,16 @@ if __name__ == '__main__': exec1.add_successor(comm1) exec2.add_successor(comm2) - # Add a function to be called when tasks end for log purpose + # Add a callback when tasks end for log purpose Task.on_completion_cb(callback) - # Add a function to be called before each firing of comm0 - # This function modifies the graph of tasks by adding or removing - # successors to comm0 - comm0.on_this_start_cb(lambda t: switch(t, [jupiter, fafard], [exec1,exec2])) + # Add a callback before each firing of comm0 + # It switches the destination of comm0 + comm0.on_this_start_cb(lambda t: switch_destination(t, [jupiter, fafard])) + + # Add a callback before comm0 send tokens to successors + # It switches the successor of comm0 + comm0.on_this_completion_cb(lambda t: switch_successor(t, [exec1,exec2])) # Enqueue two firings for task exec1 comm0.enqueue_firings(4) diff --git 
a/examples/python/task-variable-load/task-variable-load.py b/examples/python/task-variable-load/task-variable-load.py index 51dbc1a6c6..14fc2ec94e 100644 --- a/examples/python/task-variable-load/task-variable-load.py +++ b/examples/python/task-variable-load/task-variable-load.py @@ -28,7 +28,7 @@ def parse(): return parser.parse_args() def callback(t): - print(f'[{Engine.clock}] {t} finished ({t.count})') + print(f'[{Engine.clock}] {t} finished ({t.get_count()})') def variable_load(t): print('--- Small load ---') diff --git a/examples/smpi/mc/sendsend.tesh b/examples/smpi/mc/sendsend.tesh index ece38f56e0..3a628573f9 100644 --- a/examples/smpi/mc/sendsend.tesh +++ b/examples/smpi/mc/sendsend.tesh @@ -22,10 +22,10 @@ $ $VALGRIND_NO_LEAK_CHECK ../../../smpi_script/bin/smpirun -quiet -wrapper "${bi > [0.000000] [ker_engine/INFO] 2 actors are still running, waiting for something. > [0.000000] [ker_engine/INFO] Legend of the following listing: "Actor (@): " > [0.000000] [ker_engine/INFO] Actor 1 (0@node-0.simgrid.org) simcall CommWait(comm_id:1 src:1 dst:-1 mbox:SMPI-2(id:2)) -> [0.000000] [ker_engine/INFO] Actor 2 (1@node-1.simgrid.org) simcall CommWait(comm_id:2 src:2 dst:-1 mbox:SMPI-1(id:0)) +> [0.000000] [ker_engine/INFO] Actor 2 (1@node-1.simgrid.org) simcall CommWait(comm_id:2 src:2 dst:-1 mbox:SMPI-1(id:3)) > [0.000000] [mc_global/INFO] Counter-example execution trace: > [0.000000] [mc_global/INFO] Actor 1 in :0:() ==> simcall: iSend(mbox=2) -> [0.000000] [mc_global/INFO] Actor 2 in :0:() ==> simcall: iSend(mbox=0) +> [0.000000] [mc_global/INFO] Actor 2 in :0:() ==> simcall: iSend(mbox=3) > [0.000000] [mc_Session/INFO] You can debug the problem (and see the whole details) by rerunning out of simgrid-mc with --cfg=model-check/replay:'1;2' > [0.000000] [mc_dfs/INFO] DFS exploration ended. 3 unique states visited; 0 backtracks (0 transition replays, 3 states visited overall) > Execution failed with code 3. 
diff --git a/examples/sthread/CMakeLists.txt b/examples/sthread/CMakeLists.txt index 98811c23ad..7d9d7b5bca 100644 --- a/examples/sthread/CMakeLists.txt +++ b/examples/sthread/CMakeLists.txt @@ -5,7 +5,7 @@ find_package(Threads REQUIRED) ######################################################################### foreach(x - mutex-simple + mutex-simple mutex-recursive producer-consumer) if("${CMAKE_SYSTEM}" MATCHES "Linux") diff --git a/examples/sthread/pthread-mc-mutex-recursive.tesh b/examples/sthread/pthread-mc-mutex-recursive.tesh new file mode 100644 index 0000000000..984a4b56d1 --- /dev/null +++ b/examples/sthread/pthread-mc-mutex-recursive.tesh @@ -0,0 +1,15 @@ +# We ignore the LD_PRELOAD lines from the expected output because they contain the build path +! ignore .*LD_PRELOAD.* + +$ $VALGRIND_NO_TRACE_CHILDREN ${bindir:=.}/../../bin/simgrid-mc --cfg=model-check/setenv:LD_PRELOAD=${libdir:=.}/libsgmalloc.so:${libdir:=.}/libsthread.so ${bindir:=.}/pthread-mutex-recursive +> sthread is intercepting the execution of ./pthread-mutex-recursive +> [0.000000] [mc_dfs/INFO] Start a DFS exploration. Reduction is: dpor. +> Got the lock on the default mutex. +> Failed to relock the default mutex. +> Got the lock on the recursive mutex. +> Got the lock again on the recursive mutex. +> Got the lock on the default mutex. +> Failed to relock the default mutex. +> Got the lock on the recursive mutex. +> Got the lock again on the recursive mutex. +> [0.000000] [mc_dfs/INFO] DFS exploration ended. 17 unique states visited; 1 backtracks (3 transition replays, 21 states visited overall) diff --git a/examples/sthread/pthread-mc-mutex-simple.tesh b/examples/sthread/pthread-mc-mutex-simple.tesh index 55ecc955dc..9e38ceadf3 100644 --- a/examples/sthread/pthread-mc-mutex-simple.tesh +++ b/examples/sthread/pthread-mc-mutex-simple.tesh @@ -3,7 +3,7 @@ ! 
ignore .*LD_PRELOAD.* $ $VALGRIND_NO_TRACE_CHILDREN ${bindir:=.}/../../bin/simgrid-mc --cfg=model-check/setenv:LD_PRELOAD=${libdir:=.}/libsgmalloc.so:${libdir:=.}/libsthread.so ${bindir:=.}/pthread-mutex-simple -> [0.000000] [sthread/INFO] Starting the simulation. +> sthread is intercepting the execution of ./pthread-mutex-simple > All threads are started. > [0.000000] [mc_dfs/INFO] Start a DFS exploration. Reduction is: dpor. > The thread 0 is terminating. diff --git a/examples/sthread/pthread-mc-mutex-simpledeadlock.tesh b/examples/sthread/pthread-mc-mutex-simpledeadlock.tesh index 2de4e42603..e843a409b9 100644 --- a/examples/sthread/pthread-mc-mutex-simpledeadlock.tesh +++ b/examples/sthread/pthread-mc-mutex-simpledeadlock.tesh @@ -6,7 +6,7 @@ ! ignore .*LD_PRELOAD.* $ $VALGRIND_NO_TRACE_CHILDREN ${bindir:=.}/../../bin/simgrid-mc --cfg=model-check/setenv:LD_PRELOAD=${libdir:=.}/libsgmalloc.so:${libdir:=.}/libsthread.so ${bindir:=.}/pthread-mutex-simpledeadlock -> [0.000000] [sthread/INFO] Starting the simulation. +> sthread is intercepting the execution of ./pthread-mutex-simpledeadlock > All threads are started. > [0.000000] [mc_dfs/INFO] Start a DFS exploration. Reduction is: dpor. > The thread 0 is terminating. diff --git a/examples/sthread/pthread-mc-producer-consumer.tesh b/examples/sthread/pthread-mc-producer-consumer.tesh index a9f5a723d7..584b7a5857 100644 --- a/examples/sthread/pthread-mc-producer-consumer.tesh +++ b/examples/sthread/pthread-mc-producer-consumer.tesh @@ -2,18 +2,18 @@ ! ignore .*LD_PRELOAD.* $ $VALGRIND_NO_TRACE_CHILDREN ${bindir:=.}/../../bin/simgrid-mc --cfg=model-check/setenv:LD_PRELOAD=${libdir:=.}/libsgmalloc.so:${libdir:=.}/libsthread.so ${bindir:=.}/pthread-producer-consumer -q -C 1 -P 1 -> [0.000000] [sthread/INFO] Starting the simulation. +> sthread is intercepting the execution of ./pthread-producer-consumer > [0.000000] [mc_dfs/INFO] Start a DFS exploration. Reduction is: dpor. 
> [0.000000] [mc_dfs/INFO] DFS exploration ended. 786 unique states visited; 97 backtracks (2049 transition replays, 2932 states visited overall) $ $VALGRIND_NO_TRACE_CHILDREN ${bindir:=.}/../../bin/simgrid-mc --cfg=model-check/reduction:sdpor --cfg=model-check/setenv:LD_PRELOAD=${libdir:=.}/libsgmalloc.so:${libdir:=.}/libsthread.so ${bindir:=.}/pthread-producer-consumer -q -C 1 -P 1 > [0.000000] [xbt_cfg/INFO] Configuration change: Set 'model-check/reduction' to 'sdpor' -> [0.000000] [sthread/INFO] Starting the simulation. +> sthread is intercepting the execution of ./pthread-producer-consumer > [0.000000] [mc_dfs/INFO] Start a DFS exploration. Reduction is: sdpor. > [0.000000] [mc_dfs/INFO] DFS exploration ended. 1186 unique states visited; 157 backtracks (3403 transition replays, 4746 states visited overall) $ $VALGRIND_NO_TRACE_CHILDREN ${bindir:=.}/../../bin/simgrid-mc --cfg=model-check/reduction:odpor --cfg=model-check/setenv:LD_PRELOAD=${libdir:=.}/libsgmalloc.so:${libdir:=.}/libsthread.so ${bindir:=.}/pthread-producer-consumer -q -C 1 -P 1 > [0.000000] [xbt_cfg/INFO] Configuration change: Set 'model-check/reduction' to 'odpor' -> [0.000000] [sthread/INFO] Starting the simulation. +> sthread is intercepting the execution of ./pthread-producer-consumer > [0.000000] [mc_dfs/INFO] Start a DFS exploration. Reduction is: odpor. > [0.000000] [mc_dfs/INFO] DFS exploration ended. 39 unique states visited; 0 backtracks (0 transition replays, 39 states visited overall) diff --git a/examples/sthread/pthread-mutex-recursive.c b/examples/sthread/pthread-mutex-recursive.c new file mode 100644 index 0000000000..482cf3ee7f --- /dev/null +++ b/examples/sthread/pthread-mutex-recursive.c @@ -0,0 +1,65 @@ +/* Copyright (c) 2002-2023. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. 
*/ + +/* Code with both recursive and non-recursive mutexes */ + +#include +#include +#include + +// Structure to hold the mutex's name and pointer to the actual mutex +typedef struct { + const char* name; + pthread_mutex_t* mutex; +} ThreadData; + +static void* thread_function(void* arg) +{ + ThreadData* data = (ThreadData*)arg; + pthread_mutex_t* mutex = data->mutex; + const char* name = data->name; + + pthread_mutex_lock(mutex); + fprintf(stderr, "Got the lock on the %s mutex.\n", name); + + // Attempt to relock the mutex - This behavior depends on the mutex type + if (pthread_mutex_trylock(mutex) == 0) { + fprintf(stderr, "Got the lock again on the %s mutex.\n", name); + pthread_mutex_unlock(mutex); + } else { + fprintf(stderr, "Failed to relock the %s mutex.\n", name); + } + + pthread_mutex_unlock(mutex); + + // pthread_exit(NULL); TODO: segfaulting + return NULL; +} + +int main() +{ + pthread_t thread1, thread2; + pthread_mutex_t mutex_dflt = PTHREAD_MUTEX_INITIALIZER; // Non-recursive mutex + + pthread_mutexattr_t attr; + pthread_mutexattr_init(&attr); + pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); + pthread_mutex_t mutex_rec; + pthread_mutex_init(&mutex_rec, &attr); + + ThreadData data1 = {"default", &mutex_dflt}; + ThreadData data2 = {"recursive", &mutex_rec}; + + pthread_create(&thread1, NULL, thread_function, &data1); + pthread_create(&thread2, NULL, thread_function, &data2); + + pthread_join(thread1, NULL); + pthread_join(thread2, NULL); + + pthread_mutex_destroy(&mutex_dflt); + pthread_mutex_destroy(&mutex_rec); + + return 0; +} diff --git a/examples/sthread/pthread-mutex-recursive.tesh b/examples/sthread/pthread-mutex-recursive.tesh new file mode 100644 index 0000000000..ca535da834 --- /dev/null +++ b/examples/sthread/pthread-mutex-recursive.tesh @@ -0,0 +1,7 @@ +$ env ASAN_OPTIONS=verify_asan_link_order=0:$ASAN_OPTIONS LD_PRELOAD=${libdir:=.}/libsthread.so ./pthread-mutex-recursive +> sthread is intercepting the execution of 
./pthread-mutex-recursive +> Got the lock on the default mutex. +> Failed to relock the default mutex. +> Got the lock on the recursive mutex. +> Got the lock again on the recursive mutex. +> [0.000000] [sthread/INFO] All threads exited. Terminating the simulation. diff --git a/examples/sthread/pthread-mutex-simple.c b/examples/sthread/pthread-mutex-simple.c index 1d391418ba..a7dea6bd6b 100644 --- a/examples/sthread/pthread-mutex-simple.c +++ b/examples/sthread/pthread-mutex-simple.c @@ -1,3 +1,8 @@ +/* Copyright (c) 2002-2023. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + /* Simple test code with no bug */ #include diff --git a/examples/sthread/pthread-mutex-simple.tesh b/examples/sthread/pthread-mutex-simple.tesh index 29d66a92df..2e12ec1204 100644 --- a/examples/sthread/pthread-mutex-simple.tesh +++ b/examples/sthread/pthread-mutex-simple.tesh @@ -1,5 +1,5 @@ $ env ASAN_OPTIONS=verify_asan_link_order=0:$ASAN_OPTIONS LD_PRELOAD=${libdir:=.}/libsthread.so ./pthread-mutex-simple -> [0.000000] [sthread/INFO] Starting the simulation. +> sthread is intercepting the execution of ./pthread-mutex-simple > All threads are started. > The thread 0 is terminating. > The thread 1 is terminating. diff --git a/examples/sthread/pthread-mutex-simpledeadlock.c b/examples/sthread/pthread-mutex-simpledeadlock.c index 09be6c11ea..92a04a1ace 100644 --- a/examples/sthread/pthread-mutex-simpledeadlock.c +++ b/examples/sthread/pthread-mutex-simpledeadlock.c @@ -1,3 +1,8 @@ +/* Copyright (c) 2002-2023. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + /* Simple test code that may deadlock: Thread 1 locks mutex1 then mutex2 while thread 2 locks in reverse order. 
diff --git a/examples/sthread/pthread-producer-consumer.tesh b/examples/sthread/pthread-producer-consumer.tesh index a54bed01af..d8bf22cf52 100644 --- a/examples/sthread/pthread-producer-consumer.tesh +++ b/examples/sthread/pthread-producer-consumer.tesh @@ -1,5 +1,5 @@ $ env ASAN_OPTIONS=verify_asan_link_order=0:$ASAN_OPTIONS LD_PRELOAD=${libdir:=.}/libsthread.so ./pthread-producer-consumer -> [0.000000] [sthread/INFO] Starting the simulation. +> sthread is intercepting the execution of ./pthread-producer-consumer > Producer 1: Insert Item 0 at 0 > Producer 2: Insert Item 0 at 1 > Consumer 1: Remove Item 0 from 0 @@ -15,7 +15,7 @@ $ env ASAN_OPTIONS=verify_asan_link_order=0:$ASAN_OPTIONS LD_PRELOAD=${libdir:=. > [0.000000] [sthread/INFO] All threads exited. Terminating the simulation. $ env ASAN_OPTIONS=verify_asan_link_order=0:$ASAN_OPTIONS LD_PRELOAD=${libdir:=.}/libsthread.so ./pthread-producer-consumer -c 2 -C 1 -p 2 -P 1 -> [0.000000] [sthread/INFO] Starting the simulation. +> sthread is intercepting the execution of ./pthread-producer-consumer > Producer 1: Insert Item 0 at 0 > Consumer 1: Remove Item 0 from 0 > Producer 1: Insert Item 1 at 1 diff --git a/examples/sthread/stdobject/stdobject.cpp b/examples/sthread/stdobject/stdobject.cpp index 06e49c86a5..246095eaef 100644 --- a/examples/sthread/stdobject/stdobject.cpp +++ b/examples/sthread/stdobject/stdobject.cpp @@ -6,13 +6,15 @@ std::vector v = {1, 2, 3, 5, 8, 13}; extern "C" { -extern int sthread_access_begin(void* addr, const char* objname, const char* file, int line) __attribute__((weak)); -extern void sthread_access_end(void* addr, const char* objname, const char* file, int line) __attribute__((weak)); +extern int sthread_access_begin(void* addr, const char* objname, const char* file, int line, const char* func) + __attribute__((weak)); +extern void sthread_access_end(void* addr, const char* objname, const char* file, int line, const char* func) + __attribute__((weak)); } #define STHREAD_ACCESS(obj) 
\ - for (bool first = sthread_access_begin(static_cast(obj), #obj, __FILE__, __LINE__) || true; first; \ - sthread_access_end(static_cast(obj), #obj, __FILE__, __LINE__), first = false) + for (bool first = sthread_access_begin(static_cast(obj), #obj, __FILE__, __LINE__, __FUNCTION__) || true; \ + first; sthread_access_end(static_cast(obj), #obj, __FILE__, __LINE__, __FUNCTION__), first = false) static void thread_code() { diff --git a/examples/sthread/stdobject/stdobject.tesh b/examples/sthread/stdobject/stdobject.tesh index 457238ced8..bb8e1b7beb 100644 --- a/examples/sthread/stdobject/stdobject.tesh +++ b/examples/sthread/stdobject/stdobject.tesh @@ -5,7 +5,7 @@ ! ignore .*LD_PRELOAD.* $ $VALGRIND_NO_TRACE_CHILDREN ${bindir:=.}/../../../bin/simgrid-mc --cfg=model-check/setenv:LD_PRELOAD=${libdir:=.}/libsthread.so ${bindir:=.}/stdobject "--log=root.fmt:[%11.6r]%e(%a@%h)%e%m%n" --log=no_loc -> [ 0.000000] (maestro@) Starting the simulation. +> sthread is intercepting the execution of ./stdobject > starting two helpers... > waiting for helpers to finish... > [ 0.000000] (maestro@) Start a DFS exploration. Reduction is: dpor. @@ -16,7 +16,7 @@ $ $VALGRIND_NO_TRACE_CHILDREN ${bindir:=.}/../../../bin/simgrid-mc --cfg=model-c > v = { 1, 2, 3, 5, 8, 13, 21, 21, }; > [ 0.000000] (maestro@) thread 1 takes &v > [ 0.000000] (maestro@) thread 2 takes &v -> [ 0.000000] (maestro@) Unprotected concurent access to &v: thread 1 vs thread 2 (locations hidden because of --log=no_loc). +> [ 0.000000] (maestro@) Unprotected concurent access to &v: thread 1 from 1 location vs thread 2 (locations hidden because of --log=no_loc). 
> [ 0.000000] (maestro@) ************************** > [ 0.000000] (maestro@) *** PROPERTY NOT VALID *** > [ 0.000000] (maestro@) ************************** diff --git a/examples/sthread/sthread-mutex-simple.tesh b/examples/sthread/sthread-mutex-simple.tesh index ffa04c1dc4..c44bd9794c 100644 --- a/examples/sthread/sthread-mutex-simple.tesh +++ b/examples/sthread/sthread-mutex-simple.tesh @@ -1,5 +1,5 @@ $ ./sthread-mutex-simple -> [0.000000] [sthread/INFO] Starting the simulation. +> sthread is intercepting the execution of ./sthread-mutex-simple > All threads are started. > The thread 0 is terminating. > The thread 1 is terminating. diff --git a/include/simgrid/forward.h b/include/simgrid/forward.h index 85fe6a71a4..d4486a29f1 100644 --- a/include/simgrid/forward.h +++ b/include/simgrid/forward.h @@ -123,6 +123,7 @@ using ActorCodeFactory = std::function args)> class Simcall; class SimcallObserver; +class MutexObserver; class ObjectAccessSimcallObserver; class ObjectAccessSimcallItem; } // namespace actor @@ -197,6 +198,7 @@ class StandardLinkImpl; class SplitDuplexLinkImpl; class NetworkAction; class DiskImpl; +using DiskImplPtr = boost::intrusive_ptr; class DiskModel; class VirtualMachineImpl; class VMModel; diff --git a/include/simgrid/s4u/Mutex.hpp b/include/simgrid/s4u/Mutex.hpp index 7791cd7e23..06f04c2aa5 100644 --- a/include/simgrid/s4u/Mutex.hpp +++ b/include/simgrid/s4u/Mutex.hpp @@ -12,6 +12,8 @@ namespace simgrid::s4u { /** @brief A classical mutex, but blocking in the simulation world. + * + * S4U mutexes are not recursive. If an actor tries to lock the same object twice, it deadlocks with itself. 
* * @beginrst * It is strictly impossible to use a real mutex, such as @@ -48,7 +50,8 @@ class XBT_PUBLIC Mutex { public: /** \static Constructs a new mutex */ - static MutexPtr create(); + static MutexPtr create(bool recursive = false); + void lock(); void unlock(); bool try_lock(); diff --git a/include/simgrid/s4u/NetZone.hpp b/include/simgrid/s4u/NetZone.hpp index b272312fad..770b70ab06 100644 --- a/include/simgrid/s4u/NetZone.hpp +++ b/include/simgrid/s4u/NetZone.hpp @@ -248,8 +248,29 @@ struct ClusterCallbacks { * @param id: Internal identifier of the element * @return pair: returns a pair of netpoint and gateway. */ + // XBT_ATTRIB_DEPRECATED_v339 using ClusterNetPointCb = std::pair( NetZone* zone, const std::vector& coord, unsigned long id); + + /** + * @brief Callback used to set the NetZone located at some leaf of clusters (Torus, FatTree, etc) + * + * @param zone: The parent zone, needed for creating new resources (hosts, links) + * @param coord: the coordinates of the element + * @param id: Internal identifier of the element + * @return NetZone*: returns newly created netzone + */ + using ClusterNetZoneCb = NetZone*(NetZone* zone, const std::vector& coord, unsigned long id); + /** + * @brief Callback used to set the Host located at some leaf of clusters (Torus, FatTree, etc) + * + * @param zone: The parent zone, needed for creating new resources (hosts, links) + * @param coord: the coordinates of the element + * @param id: Internal identifier of the element + * @return Host*: returns newly created host + */ + using ClusterHostCb = Host*(NetZone* zone, const std::vector& coord, unsigned long id); + /** * @brief Callback used to set the links for some leaf of the cluster (Torus, FatTree, etc) * @@ -267,14 +288,36 @@ struct ClusterCallbacks { */ using ClusterLinkCb = Link*(NetZone* zone, const std::vector& coord, unsigned long id); - std::function netpoint; + bool by_netzone_ = false; + bool is_by_netzone() const { return by_netzone_; } + bool by_netpoint_ 
= false; // XBT_ATTRIB_DEPRECATED_v339 + bool is_by_netpoint() const { return by_netpoint_; } // XBT_ATTRIB_DEPRECATED_v339 + std::function netpoint; // XBT_ATTRIB_DEPRECATED_v339 + std::function host; + std::function netzone; std::function loopback = {}; std::function limiter = {}; + explicit ClusterCallbacks(const std::function& set_netzone) + : by_netzone_(true), netzone(set_netzone){/* nothing to do */}; + + ClusterCallbacks(const std::function& set_netzone, + const std::function& set_loopback, const std::function& set_limiter) + : by_netzone_(true), netzone(set_netzone), loopback(set_loopback), limiter(set_limiter){/* nothing to do */}; + + explicit ClusterCallbacks(const std::function& set_host) + : host(set_host) {/* nothing to do */}; + + ClusterCallbacks(const std::function& set_host, + const std::function& set_loopback, const std::function& set_limiter) + : host(set_host), loopback(set_loopback), limiter(set_limiter){/* nothing to do */}; + + XBT_ATTRIB_DEPRECATED_v339("Please use callback with either a Host/NetZone creation function as first parameter") explicit ClusterCallbacks(const std::function& set_netpoint) - : netpoint(set_netpoint){/*nothing to do */}; + : by_netpoint_(true), netpoint(set_netpoint){/* nothing to do */}; + XBT_ATTRIB_DEPRECATED_v339("Please use callback with either a Host/NetZone creation function as first parameter") ClusterCallbacks(const std::function& set_netpoint, const std::function& set_loopback, const std::function& set_limiter) - : netpoint(set_netpoint), loopback(set_loopback), limiter(set_limiter){/*nothing to do */}; + : by_netpoint_(true), netpoint(set_netpoint), loopback(set_loopback), limiter(set_limiter){/* nothing to do */}; }; /** * @brief Create a torus zone diff --git a/include/simgrid/s4u/Task.hpp b/include/simgrid/s4u/Task.hpp index f2b6eb6d75..03ee2653ba 100644 --- a/include/simgrid/s4u/Task.hpp +++ b/include/simgrid/s4u/Task.hpp @@ -10,6 +10,7 @@ #include #include #include +#include namespace simgrid::s4u { 
@@ -27,23 +28,29 @@ using IoTaskPtr = boost::intrusive_ptr; class XBT_PUBLIC Token : public xbt::Extendable {}; class Task { + std::string name_; - double amount_; - int queued_firings_ = 0; - int count_ = 0; - int running_instances_ = 0; - int parallelism_degree_ = 1; + + std::map amount_ = {{"instance_0", 0}, {"dispatcher", 0}, {"collector", 0}}; + std::map queued_firings_ = {{"instance_0", 0}, {"dispatcher", 0}, {"collector", 0}}; + std::map running_instances_ = {{"instance_0", 0}, {"dispatcher", 0}, {"collector", 0}}; + std::map count_ = {{"instance_0", 0}, {"dispatcher", 0}, {"collector", 0}}; + std::map parallelism_degree_ = {{"instance_0", 1}, {"dispatcher", 1}, {"collector", 1}}; + std::map internal_bytes_to_send_ = {{"instance_0", 0}, {"dispatcher", 0}}; + + std::function load_balancing_function_; std::set successors_ = {}; std::map predecessors_ = {}; std::atomic_int_fast32_t refcount_{0}; - bool ready_to_run() const; + bool ready_to_run(std::string instance); void receive(Task* source); std::shared_ptr token_ = nullptr; - std::deque>> tokens_received_; - std::deque current_activities_; + std::map>> tokens_received_; + std::map> current_activities_ = { + {"instance_0", {}}, {"dispatcher", {}}, {"collector", {}}}; inline static xbt::signal on_start; xbt::signal on_this_start; @@ -54,23 +61,33 @@ protected: explicit Task(const std::string& name); virtual ~Task() = default; - virtual void fire(); - void complete(); + virtual void fire(std::string instance); + void complete(std::string instance); - void store_activity(ActivityPtr a) { current_activities_.push_back(a); } + void store_activity(ActivityPtr a, std::string instance) { current_activities_[instance].push_back(a); } + + virtual void add_instances(int n); + virtual void remove_instances(int n); public: void set_name(std::string name); const std::string& get_name() const { return name_; } const char* get_cname() const { return name_.c_str(); } - void set_amount(double amount); - double get_amount() 
const { return amount_; } - int get_count() const { return count_; } - void set_parallelism_degree(int n); - int get_parallelism_degree() const { return parallelism_degree_; } + void set_amount(double amount, std::string instance = "instance_0"); + double get_amount(std::string instance = "instance_0") const { return amount_.at(instance); } + int get_queued_firings(std::string instance = "instance_0") { return queued_firings_.at(instance); } + int get_running_count(std::string instance = "instance_0") { return running_instances_.at(instance); } + int get_count(std::string instance = "collector") const { return count_.at(instance); } + void set_parallelism_degree(int n, std::string instance = "all"); + int get_parallelism_degree(std::string instance = "instance_0") const { return parallelism_degree_.at(instance); } + void set_internal_bytes(int bytes, std::string instance = "instance_0"); + double get_internal_bytes(std::string instance = "instance_0") const { return internal_bytes_to_send_.at(instance); } + void set_load_balancing_function(std::function func); void set_token(std::shared_ptr token); - std::shared_ptr get_next_token_from(TaskPtr t); + std::shared_ptr get_token_from(TaskPtr t) const { return tokens_received_.at(t).front(); } + std::deque> get_tokens_from(TaskPtr t) const { return tokens_received_.at(t); } + void deque_token_from(TaskPtr t); void add_successor(TaskPtr t); void remove_successor(TaskPtr t); @@ -107,7 +124,7 @@ class CommTask : public Task { Host* destination_; explicit CommTask(const std::string& name); - void fire() override; + void fire(std::string instance) override; public: static CommTaskPtr init(const std::string& name); @@ -118,30 +135,33 @@ public: CommTaskPtr set_destination(Host* destination); Host* get_destination() const { return destination_; } CommTaskPtr set_bytes(double bytes); - double get_bytes() const { return get_amount(); } + double get_bytes() const { return get_amount("instance_0"); } }; class ExecTask : public 
Task { - Host* host_; + std::map host_ = {{"instance_0", nullptr}, {"dispatcher", nullptr}, {"collector", nullptr}}; explicit ExecTask(const std::string& name); - void fire() override; + void fire(std::string instance) override; public: static ExecTaskPtr init(const std::string& name); static ExecTaskPtr init(const std::string& name, double flops, Host* host); - ExecTaskPtr set_host(Host* host); - Host* get_host() const { return host_; } - ExecTaskPtr set_flops(double flops); - double get_flops() const { return get_amount(); } + ExecTaskPtr set_host(Host* host, std::string instance = "all"); + Host* get_host(std::string instance = "instance_0") const { return host_.at(instance); } + ExecTaskPtr set_flops(double flops, std::string instance = "instance_0"); + double get_flops(std::string instance = "instance_0") const { return get_amount(instance); } + + void add_instances(int n) override; + void remove_instances(int n) override; }; class IoTask : public Task { Disk* disk_; Io::OpType type_; explicit IoTask(const std::string& name); - void fire() override; + void fire(std::string instance) override; public: static IoTaskPtr init(const std::string& name); @@ -150,7 +170,7 @@ public: IoTaskPtr set_disk(Disk* disk); Disk* get_disk() const { return disk_; } IoTaskPtr set_bytes(double bytes); - double get_bytes() const { return get_amount(); } + double get_bytes() const { return get_amount("instance_0"); } IoTaskPtr set_op_type(Io::OpType type); Io::OpType get_op_type() const { return type_; } }; diff --git a/src/bindings/python/simgrid_python.cpp b/src/bindings/python/simgrid_python.cpp index 792ae4b5e3..53065f0a72 100644 --- a/src/bindings/python/simgrid_python.cpp +++ b/src/bindings/python/simgrid_python.cpp @@ -750,7 +750,8 @@ PYBIND11_MODULE(simgrid, m) py::class_(m, "Mutex", "A classical mutex, but blocking in the simulation world." 
"See the C++ documentation for details.") - .def(py::init<>(&Mutex::create), py::call_guard(), "Mutex constructor.") + .def(py::init<>(&Mutex::create), py::call_guard(), + "Mutex constructor (pass True as a parameter to get a recursive Mutex).", py::arg("recursive") = false) .def("lock", &Mutex::lock, py::call_guard(), "Block until the mutex is acquired.") .def("try_lock", &Mutex::try_lock, py::call_guard(), "Try to acquire the mutex. Return true if the mutex was acquired, false otherwise.") @@ -861,9 +862,14 @@ PYBIND11_MODULE(simgrid, m) }, "Add a callback called when each task ends.") .def_property_readonly("name", &Task::get_name, "The name of this task (read-only).") - .def_property_readonly("count", &Task::get_count, "The execution count of this task (read-only).") .def_property_readonly("successors", &Task::get_successors, "The successors of this task (read-only).") .def_property("amount", &Task::get_amount, &Task::set_amount, "The amount of work to do for this task.") + .def( + "get_count", [](const TaskPtr t) { return t->get_count("instance_0"); }, + "The execution count of this task instance_0.") + .def( + "get_count", [](const TaskPtr t, const std::string& instance) { return t->get_count(instance); }, + "The execution count of this task instance.") .def("enqueue_firings", py::overload_cast(&Task::enqueue_firings), py::call_guard(), py::arg("n"), "Enqueue firings for this task.") .def("add_successor", py::overload_cast(&Task::add_successor), py::call_guard(), diff --git a/src/kernel/activity/MailboxImpl.cpp b/src/kernel/activity/MailboxImpl.cpp index cb49f3b2b8..aaeeb83dc4 100644 --- a/src/kernel/activity/MailboxImpl.cpp +++ b/src/kernel/activity/MailboxImpl.cpp @@ -46,7 +46,7 @@ void MailboxImpl::set_receiver(s4u::ActorPtr actor) /** @brief Pushes a communication activity into a mailbox * @param comm What to add */ -void MailboxImpl::push(CommImplPtr comm) +void MailboxImpl::push(const CommImplPtr& comm) { comm->set_mailbox(this); 
this->comm_queue_.push_back(std::move(comm)); @@ -61,12 +61,11 @@ void MailboxImpl::remove(const CommImplPtr& comm) (comm->get_mailbox() ? comm->get_mailbox()->get_cname() : "(null)"), this->get_cname()); comm->set_mailbox(nullptr); - for (auto it = this->comm_queue_.begin(); it != this->comm_queue_.end(); it++) - if (*it == comm) { - this->comm_queue_.erase(it); - return; - } - xbt_die("Comm %p not found in mailbox %s", comm.get(), this->get_cname()); + auto it = std::find(this->comm_queue_.begin(), this->comm_queue_.end(), comm); + if (it != this->comm_queue_.end()) + this->comm_queue_.erase(it); + else + xbt_die("Comm %p not found in mailbox %s", comm.get(), this->get_cname()); } /** @brief Removes all communication activities from a mailbox @@ -74,7 +73,7 @@ void MailboxImpl::remove(const CommImplPtr& comm) void MailboxImpl::clear(bool do_finish) { // CommImpl::cancel() will remove the comm from the mailbox.. - for (auto comm : done_comm_queue_) { + for (const auto& comm : done_comm_queue_) { comm->cancel(); comm->set_state(State::FAILED); if (do_finish) diff --git a/src/kernel/activity/MailboxImpl.hpp b/src/kernel/activity/MailboxImpl.hpp index 199fcd12cc..9d30a4e13e 100644 --- a/src/kernel/activity/MailboxImpl.hpp +++ b/src/kernel/activity/MailboxImpl.hpp @@ -6,26 +6,32 @@ #ifndef SIMGRID_KERNEL_ACTIVITY_MAILBOX_HPP #define SIMGRID_KERNEL_ACTIVITY_MAILBOX_HPP +#include "simgrid/config.h" /* FIXME: KILLME. 
This makes the ABI config-dependent, but mandatory for the hack below */ #include "simgrid/s4u/Engine.hpp" #include "simgrid/s4u/Mailbox.hpp" #include "src/kernel/activity/CommImpl.hpp" #include "src/kernel/actor/ActorImpl.hpp" -#include - namespace simgrid::kernel::activity { /** @brief Implementation of the s4u::Mailbox */ class MailboxImpl { - static constexpr size_t MAX_MAILBOX_SIZE = 10000000; - s4u::Mailbox piface_; std::string name_; actor::ActorImplPtr permanent_receiver_; // actor to which the mailbox is attached - boost::circular_buffer_space_optimized comm_queue_{MAX_MAILBOX_SIZE}; +#if SIMGRID_HAVE_STATEFUL_MC + /* Using deque here is faster in benchmarks, but break the state equality heuristic of Liveness checking on Debian + * testing. This would desserve a proper investiguation, but simply use a single-sided list for the time being. HACK. + */ + std::list comm_queue_; // messages already received in the permanent receive mode - boost::circular_buffer_space_optimized done_comm_queue_{MAX_MAILBOX_SIZE}; + std::list done_comm_queue_; +#else + std::deque comm_queue_; + // messages already received in the permanent receive mode + std::deque done_comm_queue_; +#endif friend s4u::Engine; friend s4u::Mailbox; @@ -50,8 +56,8 @@ public: const std::string& get_name() const { return name_; } const char* get_cname() const { return name_.c_str(); } void set_receiver(s4u::ActorPtr actor); - void push(CommImplPtr comm); - void push_done(CommImplPtr done_comm) { done_comm_queue_.push_back(done_comm); } + void push(const CommImplPtr& comm); + void push_done(const CommImplPtr& done_comm) { done_comm_queue_.push_back(done_comm); } void remove(const CommImplPtr& comm); void clear(bool do_finish); CommImplPtr iprobe(int type, const std::function& match_fun, void* data); @@ -61,9 +67,9 @@ public: actor::ActorImplPtr get_permanent_receiver() const { return permanent_receiver_; } bool empty() const { return comm_queue_.empty(); } size_t size() const { return 
comm_queue_.size(); } - CommImplPtr front() const { return comm_queue_.front(); } + const CommImplPtr& front() const { return comm_queue_.front(); } bool has_some_done_comm() const { return not done_comm_queue_.empty(); } - CommImplPtr done_front() const { return done_comm_queue_.front(); } + const CommImplPtr& done_front() const { return done_comm_queue_.front(); } }; } // namespace simgrid::kernel::activity diff --git a/src/kernel/activity/MutexImpl.cpp b/src/kernel/activity/MutexImpl.cpp index 43a511b9d3..c981c3ee76 100644 --- a/src/kernel/activity/MutexImpl.cpp +++ b/src/kernel/activity/MutexImpl.cpp @@ -47,13 +47,41 @@ unsigned MutexImpl::next_id_ = 0; MutexAcquisitionImplPtr MutexImpl::lock_async(actor::ActorImpl* issuer) { - auto res = MutexAcquisitionImplPtr(new kernel::activity::MutexAcquisitionImpl(issuer, this), true); - - if (owner_ != nullptr) { - /* Somebody is using the mutex; register the acquisition */ + /* If the mutex is recursive */ + if (is_recursive_) { + if (owner_ == issuer) { + recursive_depth++; + auto res = MutexAcquisitionImplPtr(new kernel::activity::MutexAcquisitionImpl(issuer, this), true); + res->grant(); + return res; + } else if (owner_ == nullptr) { // Free + owner_ = issuer; + recursive_depth = 1; + auto res = MutexAcquisitionImplPtr(new kernel::activity::MutexAcquisitionImpl(issuer, this), true); + res->grant(); + return res; + } + + for (auto acq : ongoing_acquisitions_) + if (acq->get_issuer() == issuer) { + acq->recursive_depth_++; + return acq; + } + + // Not yet in the ongoing acquisition list. 
Get in there + auto res = MutexAcquisitionImplPtr(new kernel::activity::MutexAcquisitionImpl(issuer, this), true); ongoing_acquisitions_.push_back(res); - } else { + return res; + } + + // None-recursive mutex + auto res = MutexAcquisitionImplPtr(new kernel::activity::MutexAcquisitionImpl(issuer, this), true); + if (owner_ == nullptr) { // Lock is free, take it owner_ = issuer; + recursive_depth = 1; + res->grant(); + } else { // Somebody is using the mutex; register the acquisition + ongoing_acquisitions_.push_back(res); } return res; } @@ -65,14 +93,14 @@ MutexAcquisitionImplPtr MutexImpl::lock_async(actor::ActorImpl* issuer) */ bool MutexImpl::try_lock(actor::ActorImpl* issuer) { - XBT_IN("(%p, %p)", this, issuer); - if (owner_ != nullptr) { - XBT_OUT(); - return false; + if (owner_ == issuer && is_recursive_) { + recursive_depth++; + return true; } + if (owner_ != nullptr) + return false; - owner_ = issuer; - XBT_OUT(); + owner_ = issuer; return true; } @@ -88,12 +116,20 @@ void MutexImpl::unlock(actor::ActorImpl* issuer) xbt_assert(issuer == owner_, "Cannot release that mutex: you're not the owner. %s is (pid:%ld).", owner_ != nullptr ? owner_->get_cname() : "(nobody)", owner_ != nullptr ? 
owner_->get_pid() : -1); + if (is_recursive_) { + recursive_depth--; + if (recursive_depth > 0) // Still owning the lock + return; + } + if (not ongoing_acquisitions_.empty()) { /* Give the ownership to the first waiting actor */ auto acq = ongoing_acquisitions_.front(); ongoing_acquisitions_.pop_front(); owner_ = acq->get_issuer(); + acq->grant(); + recursive_depth = acq->recursive_depth_; if (acq == owner_->waiting_synchro_) acq->finish(); // else, the issuer is not blocked on this acquisition so no need to release it diff --git a/src/kernel/activity/MutexImpl.hpp b/src/kernel/activity/MutexImpl.hpp index 3eebec2b12..9d9242b4a5 100644 --- a/src/kernel/activity/MutexImpl.hpp +++ b/src/kernel/activity/MutexImpl.hpp @@ -42,11 +42,18 @@ namespace simgrid::kernel::activity { class XBT_PUBLIC MutexAcquisitionImpl : public ActivityImpl_T { actor::ActorImpl* issuer_ = nullptr; MutexImpl* mutex_ = nullptr; + int recursive_depth_ = 1; + // TODO: use granted_ this instead of owner_ == self to test(). 
+ // This is mandatory to get double-lock on non-recursive locks to properly deadlock + bool granted_ = false; + + friend MutexImpl; public: MutexAcquisitionImpl(actor::ActorImpl* issuer, MutexImpl* mutex) : issuer_(issuer), mutex_(mutex) {} MutexImplPtr get_mutex() { return mutex_; } actor::ActorImpl* get_issuer() { return issuer_; } + void grant() { granted_ = true; } bool test(actor::ActorImpl* issuer = nullptr) override; void wait_for(actor::ActorImpl* issuer, double timeout) override; @@ -63,11 +70,13 @@ class XBT_PUBLIC MutexImpl { std::deque ongoing_acquisitions_; static unsigned next_id_; unsigned id_ = next_id_++; + bool is_recursive_ = false; + int recursive_depth = 0; friend MutexAcquisitionImpl; public: - MutexImpl() : piface_(this) {} + MutexImpl(bool recursive = false) : piface_(this), is_recursive_(recursive) {} MutexImpl(MutexImpl const&) = delete; MutexImpl& operator=(MutexImpl const&) = delete; diff --git a/src/kernel/resource/DiskImpl.hpp b/src/kernel/resource/DiskImpl.hpp index 55f6857cdd..a1bdb8ae50 100644 --- a/src/kernel/resource/DiskImpl.hpp +++ b/src/kernel/resource/DiskImpl.hpp @@ -50,6 +50,7 @@ class DiskImpl : public Resource_T, public xbt::PropertyHolder { Metric read_bw_ = {0.0, 0, nullptr}; Metric write_bw_ = {0.0, 0, nullptr}; double readwrite_bw_ = -1; /* readwrite constraint bound, usually max(read, write) */ + std::atomic_int_fast32_t refcount_{0}; void apply_sharing_policy_cfg(); @@ -60,6 +61,17 @@ public: explicit DiskImpl(const std::string& name, double read_bandwidth, double write_bandwidth); DiskImpl(const DiskImpl&) = delete; DiskImpl& operator=(const DiskImpl&) = delete; + friend void intrusive_ptr_add_ref(DiskImpl* disk) + { + disk->refcount_.fetch_add(1, std::memory_order_acq_rel); + } + friend void intrusive_ptr_release(DiskImpl* disk) + { + if (disk->refcount_.fetch_sub(1, std::memory_order_release) == 1) { + std::atomic_thread_fence(std::memory_order_acquire); + delete disk; + } + } /** @brief Public interface */ const 
s4u::Disk* get_iface() const { return &piface_; } diff --git a/src/kernel/resource/HostImpl.cpp b/src/kernel/resource/HostImpl.cpp index 614bf9e387..da11f9aa1e 100644 --- a/src/kernel/resource/HostImpl.cpp +++ b/src/kernel/resource/HostImpl.cpp @@ -8,6 +8,7 @@ #include #include "src/kernel/EngineImpl.hpp" +#include "src/kernel/resource/NetworkModel.hpp" #include "src/kernel/resource/VirtualMachineImpl.hpp" #include "xbt/asserts.hpp" @@ -16,7 +17,7 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(res_host, ker_resource, "Host resources agregate CPU, networking and I/O features"); /************* - * Callbacks *t + * Callbacks * *************/ namespace simgrid::kernel::resource { @@ -24,6 +25,28 @@ namespace simgrid::kernel::resource { /********* * Model * *********/ +Action* HostModel::io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk, + double size) +{ + auto* net_model = src_host->get_englobing_zone()->get_network_model(); + auto* system = net_model->get_maxmin_system(); + auto* action = net_model->communicate(src_host, dst_host, size, -1, true); + + // We don't want to apply the network model bandwidth factor to the I/O constraints + double bw_factor = net_model->get_bandwidth_factor(); + if (src_disk != nullptr) { + // FIXME: if the stream starts from a disk, we might not want to pay the network latency + system->expand(src_disk->get_constraint(), action->get_variable(), bw_factor); + system->expand(src_disk->get_read_constraint(), action->get_variable(), bw_factor); + } + if (dst_disk != nullptr) { + system->expand(dst_disk->get_constraint(), action->get_variable(), bw_factor); + system->expand(dst_disk->get_write_constraint(), action->get_variable(), bw_factor); + } + + return action; +} + /************ * Resource * ************/ @@ -53,9 +76,6 @@ HostImpl::~HostImpl() delete arg; actors_at_boot_.clear(); - for (auto const& [_, d] : disks_) - d->destroy(); - for (auto const& [_, vm] : vms_) vm->vm_destroy(); } @@ -128,6 +148,7 @@ 
std::vector HostImpl::get_all_actors() res.emplace_back(actor.get_ciface()); return res; } + size_t HostImpl::get_actor_count() const { return actor_list_.size(); @@ -216,7 +237,7 @@ s4u::Disk* HostImpl::create_disk(const std::string& name, double read_bandwidth, void HostImpl::add_disk(const s4u::Disk* disk) { - disks_[disk->get_name()] = disk->get_impl(); + disks_.insert({disk->get_name(), kernel::resource::DiskImplPtr(disk->get_impl())}); } void HostImpl::remove_disk(const std::string& name) diff --git a/src/kernel/resource/HostImpl.hpp b/src/kernel/resource/HostImpl.hpp index 0de2d0b4be..506a3d6b5a 100644 --- a/src/kernel/resource/HostImpl.hpp +++ b/src/kernel/resource/HostImpl.hpp @@ -29,8 +29,8 @@ public: virtual Action* execute_parallel(const std::vector& host_list, const double* flops_amount, const double* bytes_amount, double rate) = 0; - virtual Action* io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk, - double size) = 0; + Action* io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk, + double size); }; /************ @@ -49,7 +49,7 @@ class XBT_PRIVATE HostImpl : public xbt::PropertyHolder, public actor::ObjectAcc ActorList actor_list_; std::vector actors_at_boot_; s4u::Host piface_; - std::map> disks_; + std::map> disks_; std::map> vms_; std::string name_{"noname"}; routing::NetZoneImpl* englobing_zone_ = nullptr; diff --git a/src/kernel/resource/VirtualMachineImpl.hpp b/src/kernel/resource/VirtualMachineImpl.hpp index 8d51c7df04..781a33be18 100644 --- a/src/kernel/resource/VirtualMachineImpl.hpp +++ b/src/kernel/resource/VirtualMachineImpl.hpp @@ -99,7 +99,6 @@ public: { return nullptr; }; - Action* io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk, double size) override { return nullptr; } }; } // namespace kernel::resource } // namespace simgrid diff --git a/src/kernel/resource/models/host_clm03.cpp 
b/src/kernel/resource/models/host_clm03.cpp index c5b47a11e6..5050b1fd2e 100644 --- a/src/kernel/resource/models/host_clm03.cpp +++ b/src/kernel/resource/models/host_clm03.cpp @@ -50,28 +50,6 @@ static inline double has_cost(const double* array, size_t pos) return -1.0; } -Action* HostCLM03Model::io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk, - double size) -{ - auto* net_model = src_host->get_englobing_zone()->get_network_model(); - auto* system = net_model->get_maxmin_system(); - auto* action = net_model->communicate(src_host, dst_host, size, -1, true); - - // We don't want to apply the network model bandwidth factor to the I/O constraints - double bw_factor = net_model->get_bandwidth_factor(); - if (src_disk != nullptr) { - // FIXME: if the stream starts from a disk, we might not want to pay the network latency - system->expand(src_disk->get_constraint(), action->get_variable(), bw_factor); - system->expand(src_disk->get_read_constraint(), action->get_variable(), bw_factor); - } - if (dst_disk != nullptr) { - system->expand(dst_disk->get_constraint(), action->get_variable(), bw_factor); - system->expand(dst_disk->get_write_constraint(), action->get_variable(), bw_factor); - } - - return action; -} - Action* HostCLM03Model::execute_parallel(const std::vector& host_list, const double* flops_amount, const double* bytes_amount, double rate) { diff --git a/src/kernel/resource/models/host_clm03.hpp b/src/kernel/resource/models/host_clm03.hpp index 65715c4d36..cc5fc9f8ab 100644 --- a/src/kernel/resource/models/host_clm03.hpp +++ b/src/kernel/resource/models/host_clm03.hpp @@ -22,8 +22,6 @@ public: Action* execute_thread(const s4u::Host* host, double flops_amount, int thread_count) override; Action* execute_parallel(const std::vector& host_list, const double* flops_amount, const double* bytes_amount, double rate) override; - Action* io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk, - 
double size) override; }; } // namespace simgrid::kernel::resource diff --git a/src/kernel/resource/models/network_ns3.cpp b/src/kernel/resource/models/network_ns3.cpp index b2064421aa..3d4c0650ff 100644 --- a/src/kernel/resource/models/network_ns3.cpp +++ b/src/kernel/resource/models/network_ns3.cpp @@ -119,8 +119,14 @@ static void zoneCreation_cb(simgrid::s4u::NetZone const& zone) wifiPhy.Set("Antennas", ns3::UintegerValue(nss_value)); wifiPhy.Set("MaxSupportedTxSpatialStreams", ns3::UintegerValue(nss_value)); wifiPhy.Set("MaxSupportedRxSpatialStreams", ns3::UintegerValue(nss_value)); -#if NS3_MINOR_VERSION > 33 +#if NS3_MINOR_VERSION < 33 + // This fails with "The channel width does not uniquely identify an operating channel" on v3.34, + // so we specified the ChannelWidth of wifiPhy to 40, above, when creating wifiPhy with v3.34 and higher + ns3::Config::Set("/NodeList/*/DeviceList/*/$ns3::WifiNetDevice/Phy/ChannelWidth", ns3::UintegerValue(40)); +#elif NS3_MINOR_VERSION < 36 wifiPhy.Set("ChannelWidth", ns3::UintegerValue(40)); +#else + wifiPhy.Set("ChannelSettings", ns3::StringValue("{0, 40, BAND_UNSPECIFIED, 0}")); #endif wifiMac.SetType("ns3::ApWifiMac", "Ssid", ns3::SsidValue(ssid)); @@ -166,12 +172,6 @@ static void zoneCreation_cb(simgrid::s4u::NetZone const& zone) ns3::Simulator::Schedule(ns3::Seconds(start_time_value), &resumeWifiDevice, device); } -#if NS3_MINOR_VERSION < 33 - // This fails with "The channel width does not uniquely identify an operating channel" on v3.34, - // so we specified the ChannelWidth of wifiPhy to 40, above, when creating wifiPhy with v3.34 and higher - ns3::Config::Set("/NodeList/*/DeviceList/*/$ns3::WifiNetDevice/Phy/ChannelWidth", ns3::UintegerValue(40)); -#endif - mobility.SetPositionAllocator(positionAllocS); mobility.Install(nodes); ns3::Ipv4AddressHelper address; diff --git a/src/kernel/resource/models/ptask_L07.cpp b/src/kernel/resource/models/ptask_L07.cpp index d0344f972b..13f4986b7d 100644 --- 
a/src/kernel/resource/models/ptask_L07.cpp +++ b/src/kernel/resource/models/ptask_L07.cpp @@ -204,7 +204,8 @@ L07Action::L07Action(Model* model, const std::vector& host_list, con XBT_DEBUG("Creating a parallel task (%p) with %zu hosts and %zu unique links.", this, host_nb, link_nb); latency_ = latency; - set_variable(model->get_maxmin_system()->variable_new(this, 1.0, (rate > 0 ? rate : -1.0), host_nb + link_nb)); + // Allocate more space for constraints (+4) in case users want to mix ptasks and io streams + set_variable(model->get_maxmin_system()->variable_new(this, 1.0, (rate > 0 ? rate : -1.0), host_nb + link_nb + 4)); if (latency_ > 0) model->get_maxmin_system()->update_variable_penalty(get_variable(), 0.0); diff --git a/src/kernel/resource/models/ptask_L07.hpp b/src/kernel/resource/models/ptask_L07.hpp index 7bac5d6bca..0683aff609 100644 --- a/src/kernel/resource/models/ptask_L07.hpp +++ b/src/kernel/resource/models/ptask_L07.hpp @@ -42,11 +42,6 @@ public: Action* execute_thread(const s4u::Host* host, double flops_amount, int thread_count) override { return nullptr; } CpuAction* execute_parallel(const std::vector& host_list, const double* flops_amount, const double* bytes_amount, double rate) override; - Action* io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk, - double size) override - { - return nullptr; - } }; class CpuL07Model : public CpuModel { diff --git a/src/kernel/routing/ClusterZone.cpp b/src/kernel/routing/ClusterZone.cpp index 72f6dca848..fa82dd48d0 100644 --- a/src/kernel/routing/ClusterZone.cpp +++ b/src/kernel/routing/ClusterZone.cpp @@ -3,6 +3,7 @@ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. 
*/ +#include "simgrid/s4u/Host.hpp" #include "simgrid/kernel/routing/ClusterZone.hpp" #include "simgrid/kernel/routing/NetPoint.hpp" #include "src/kernel/resource/StandardLinkImpl.hpp" @@ -82,7 +83,17 @@ void ClusterBase::fill_leaf_from_cb(unsigned long position, const std::vectorget_netpoint(); + gw = netzone->get_gateway(); + } else { + s4u::Host* host = set_callbacks.host(get_iface(), dims, position); + netpoint = host->get_netpoint(); + } + xbt_assert(netpoint, "set_netpoint(elem=%lu): Invalid netpoint (nullptr)", position); if (netpoint->is_netzone()) { xbt_assert(gw && not gw->is_netzone(), diff --git a/src/kernel/routing/NetZone_test.hpp b/src/kernel/routing/NetZone_test.hpp index 3cc1c7ab79..76a955b54d 100644 --- a/src/kernel/routing/NetZone_test.hpp +++ b/src/kernel/routing/NetZone_test.hpp @@ -6,17 +6,16 @@ #ifndef NETZONE_TEST_HPP #define NETZONE_TEST_HPP -#include "simgrid/kernel/routing/NetPoint.hpp" -#include "simgrid/s4u/Host.hpp" #include "simgrid/s4u/NetZone.hpp" +#include "xbt/log.h" +XBT_LOG_EXTERNAL_CATEGORY(ker_platform); // Callback function common to several routing unit tests struct CreateHost { - std::pair - operator()(simgrid::s4u::NetZone* zone, const std::vector& /*coord*/, unsigned long id) const + simgrid::s4u::Host* operator()(simgrid::s4u::NetZone* zone, const std::vector& /*coord*/, + unsigned long id) const { - const simgrid::s4u::Host* host = zone->create_host(std::to_string(id), 1e9)->seal(); - return std::make_pair(host->get_netpoint(), nullptr); + return zone->create_host(std::to_string(id), "1Gf"); } }; diff --git a/src/kernel/xml/sg_platf.cpp b/src/kernel/xml/sg_platf.cpp index 3dd5269927..d953e005ab 100644 --- a/src/kernel/xml/sg_platf.cpp +++ b/src/kernel/xml/sg_platf.cpp @@ -149,7 +149,7 @@ void sg_platf_new_disk(const simgrid::kernel::routing::DiskCreationArgs* disk) /*************************************************************************************************/ /** @brief Auxiliary function to create hosts */ -static 
std::pair +static simgrid::s4u::Host* sg_platf_cluster_create_host(const simgrid::kernel::routing::ClusterCreationArgs* cluster, simgrid::s4u::NetZone* zone, const std::vector& /*coord*/, unsigned long id) { @@ -160,11 +160,10 @@ sg_platf_cluster_create_host(const simgrid::kernel::routing::ClusterCreationArgs std::string host_id = cluster->prefix + std::to_string(cluster->radicals[id]) + cluster->suffix; XBT_DEBUG("Cluster: creating host=%s speed=%f", host_id.c_str(), cluster->speeds.front()); - const simgrid::s4u::Host* host = zone->create_host(host_id, cluster->speeds) - ->set_core_count(cluster->core_amount) - ->set_properties(cluster->properties) - ->seal(); - return std::make_pair(host->get_netpoint(), nullptr); + simgrid::s4u::Host* host = zone->create_host(host_id, cluster->speeds) + ->set_core_count(cluster->core_amount) + ->set_properties(cluster->properties); + return host; } /** @brief Auxiliary function to create loopback links */ @@ -210,7 +209,8 @@ static void sg_platf_new_cluster_hierarchical(const simgrid::kernel::routing::Cl using simgrid::kernel::routing::FatTreeZone; using simgrid::kernel::routing::TorusZone; - auto set_host = std::bind(sg_platf_cluster_create_host, cluster, _1, _2, _3); + std::function set_host = + std::bind(sg_platf_cluster_create_host, cluster, _1, _2, _3); std::function set_loopback{}; std::function set_limiter{}; diff --git a/src/mc/transition/TransitionSynchro.cpp b/src/mc/transition/TransitionSynchro.cpp index a0c16de759..df630eec34 100644 --- a/src/mc/transition/TransitionSynchro.cpp +++ b/src/mc/transition/TransitionSynchro.cpp @@ -29,6 +29,10 @@ bool BarrierTransition::depends(const Transition* o) const if (o->type_ < type_) return o->depends(this); + // Actions executed by the same actor are always dependent + if (o->aid_ == aid_) + return true; + if (const auto* other = dynamic_cast(o)) { if (bar_ != other->bar_) return false; diff --git a/src/s4u/s4u_Mutex.cpp b/src/s4u/s4u_Mutex.cpp index 051f11c153..12fa915d5b 
100644 --- a/src/s4u/s4u_Mutex.cpp +++ b/src/s4u/s4u_Mutex.cpp @@ -55,9 +55,9 @@ bool Mutex::try_lock() * * See @ref s4u_raii. */ -MutexPtr Mutex::create() +MutexPtr Mutex::create(bool recursive) { - auto* mutex = new kernel::activity::MutexImpl(); + auto* mutex = new kernel::activity::MutexImpl(recursive); return MutexPtr(&mutex->mutex(), false); } diff --git a/src/s4u/s4u_Netzone.cpp b/src/s4u/s4u_Netzone.cpp index 48377bbf13..1917f0f658 100644 --- a/src/s4u/s4u_Netzone.cpp +++ b/src/s4u/s4u_Netzone.cpp @@ -96,8 +96,8 @@ void NetZone::add_route(const NetZone* src, const NetZone* dst, const std::vecto pimpl_->add_route(src ? src->get_netpoint() : nullptr, dst ? dst->get_netpoint(): nullptr, src ? src->get_gateway() : nullptr, dst ? dst->get_gateway() : nullptr, links_direct, false); - pimpl_->add_route(src ? src->get_netpoint() : nullptr, dst ? dst->get_netpoint(): nullptr, - src ? src->get_gateway() : nullptr, dst ? dst->get_gateway() : nullptr, + pimpl_->add_route(dst ? dst->get_netpoint(): nullptr, src ? src->get_netpoint() : nullptr, + dst ? dst->get_gateway() : nullptr, src ? src->get_gateway() : nullptr, links_reverse, false); } diff --git a/src/s4u/s4u_Task.cpp b/src/s4u/s4u_Task.cpp index 7b04687668..bcc2f4eea4 100644 --- a/src/s4u/s4u_Task.cpp +++ b/src/s4u/s4u_Task.cpp @@ -1,45 +1,34 @@ +#include #include #include +#include #include +#include #include #include #include #include +#include +#include #include "src/simgrid/module.hpp" SIMGRID_REGISTER_PLUGIN(task, "Battery management", nullptr) -/** - @beginrst - - -Tasks are designed to represent dataflows, i.e, graphs of Tasks. -Tasks can only be instancied using either -:cpp:func:`simgrid::s4u::ExecTask::init` or :cpp:func:`simgrid::s4u::CommTask::init` -An ExecTask is an Execution Task. Its underlying Activity is an :ref:`Exec `. -A CommTask is a Communication Task. Its underlying Activity is a :ref:`Comm `. 
- - - - @endrst - */ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plugin"); namespace simgrid::s4u { Task::Task(const std::string& name) : name_(name) {} -/** - * @brief Return True if the Task can start a new Activity. - * @note The Task is ready if not already doing something and there is at least one execution waiting in queue. +/** @param instance The Task instance to check. + * @brief Return True if this Task instance can start. */ -bool Task::ready_to_run() const +bool Task::ready_to_run(std::string instance) { - return running_instances_ < parallelism_degree_ && queued_firings_ > 0; + return running_instances_[instance] < parallelism_degree_[instance] && queued_firings_[instance] > 0; } -/** - * @param source The sender. +/** @param source The sender. * @brief Receive a token from another Task. * @note Check upon reception if the Task has received a token from each of its predecessors, * and in this case consumes those tokens and enqueue an execution. @@ -47,12 +36,10 @@ bool Task::ready_to_run() const void Task::receive(Task* source) { XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str()); - auto source_count = predecessors_[source]; predecessors_[source]++; - if (tokens_received_.size() <= queued_firings_ + source_count) - tokens_received_.emplace_back(); - tokens_received_[queued_firings_ + source_count][source] = source->token_; - bool enough_tokens = true; + if (source->token_ != nullptr) + tokens_received_[source].push_back(source->token_); + bool enough_tokens = true; for (auto const& [key, val] : predecessors_) if (val < 1) { enough_tokens = false; @@ -65,54 +52,88 @@ void Task::receive(Task* source) } } -/** - * @brief Task routine when finishing an execution. - * @note Set its working status as false. - * Add 1 to its count of finished executions. - * Call the on_this_end func. - * Fire on_end callback. - * Send a token to each of its successors. - * Start a new execution if possible. 
+/** @param instance The Taks instance to complete. + * @brief Task instance routine when finishing an execution of an instance. + * @note The dispatcher instance enqueues a firing for the next instance. + * The collector instance triggers the on_completion signals and sends tokens to successors. + * Others instances enqueue a firing of the collector instance. */ -void Task::complete() +void Task::complete(std::string instance) { xbt_assert(Actor::is_maestro()); - running_instances_--; - count_++; - on_this_completion(this); - on_completion(this); - for (auto const& t : successors_) - t->receive(this); - if (ready_to_run()) - fire(); + running_instances_[instance]--; + count_[instance]++; + if (instance == "collector") { + on_this_completion(this); + on_completion(this); + for (auto const& t : successors_) + t->receive(this); + } else if (instance == "dispatcher") { + auto next_instance = load_balancing_function_(); + xbt_assert(next_instance != "dispatcher" and next_instance != "collector", "Invalid instance selected: %s", + next_instance.c_str()); + queued_firings_[next_instance] = queued_firings_.at(next_instance) + 1; + while (ready_to_run(next_instance)) + fire(next_instance); + } else { + queued_firings_["collector"]++; + while (ready_to_run("collector")) + fire("collector"); + } + if (ready_to_run(instance)) + fire(instance); } -/** @param n The new parallelism degree of the Task. - * @brief Set the parallelism degree of the Task to inscrease or decrease horizontal scaling. - * @note When increasing the degree the function starts new instances if there is queued firings. - * When decreasing the degree the function does NOT stop running instances. - +/** @param n The new parallelism degree of the Task instance. + * @param instance The Task instance to modify. + * @note You can use instance "all" to modify the parallelism degree of all instances of this Task. + * When increasing the degree new executions are started if there is queued firings. 
+ * When decreasing the degree instances already running are NOT stopped. */ -void Task::set_parallelism_degree(int n) +void Task::set_parallelism_degree(int n, std::string instance) { - xbt_assert(n > 0, "Parallelism degree of Tasks must be above 0."); - simgrid::kernel::actor::simcall_answered([this, n] { - parallelism_degree_ = n; - while (ready_to_run()) - fire(); + xbt_assert(n > 0, "Parallelism degree must be above 0."); + simgrid::kernel::actor::simcall_answered([this, n, &instance] { + if (instance == "all") { + for (auto& [key, value] : parallelism_degree_) { + parallelism_degree_[key] = n; + while (ready_to_run(key)) + fire(key); + } + } else { + parallelism_degree_[instance] = n; + while (ready_to_run(instance)) + fire(instance); + } }); } +/** @param bytes The internal bytes of the Task instance. + * @param instance The Task instance to modify. + * @note Internal bytes are used for Comms between the dispatcher and instance_n, + * and between instance_n and the collector if they are not on the same host. + */ +void Task::set_internal_bytes(int bytes, std::string instance) +{ + simgrid::kernel::actor::simcall_answered([this, bytes, &instance] { internal_bytes_to_send_[instance] = bytes; }); +} + +/** @param func The load balancing function. + * @note The dispatcher uses this function to determine which instance to trigger next. + */ +void Task::set_load_balancing_function(std::function func) +{ + simgrid::kernel::actor::simcall_answered([this, func] { load_balancing_function_ = func; }); +} + /** @param n The number of firings to enqueue. - * @brief Enqueue firing. - * @note Immediatly fire an activity if possible. 
*/ void Task::enqueue_firings(int n) { simgrid::kernel::actor::simcall_answered([this, n] { - queued_firings_ += n; - while (ready_to_run()) - fire(); + queued_firings_["dispatcher"] += n; + while (ready_to_run("dispatcher")) + fire("dispatcher"); }); } @@ -125,46 +146,45 @@ void Task::set_name(std::string name) } /** @param amount The amount to set. - * @brief Set the amout of work to do. + * @param instance The Task instance to modify. * @note Amount in flop for ExecTask and in bytes for CommTask. */ -void Task::set_amount(double amount) +void Task::set_amount(double amount, std::string instance) { - simgrid::kernel::actor::simcall_answered([this, amount] { amount_ = amount; }); + simgrid::kernel::actor::simcall_answered([this, amount, &instance] { amount_[instance] = amount; }); } /** @param token The token to set. * @brief Set the token to send to successors. - * @note The token is passed to each successor after the task end, i.e., after the on_end callback. + * @note The token is passed to each successor after the Task instance collector end, i.e., after the on_completion + * callback. */ void Task::set_token(std::shared_ptr token) { simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; }); } -/** @return Map of tokens received for the next execution. - * @note If there is no queued execution for this task the map might not exist or be partially empty. +/** @param t The Task to deque a token from. 
*/ -std::shared_ptr Task::get_next_token_from(TaskPtr t) +void Task::deque_token_from(TaskPtr t) { - return tokens_received_.front()[t]; + simgrid::kernel::actor::simcall_answered([this, &t] { tokens_received_[t].pop_front(); }); } -void Task::fire() +void Task::fire(std::string instance) { - if ((int)current_activities_.size() > parallelism_degree_) { - current_activities_.pop_front(); + if ((int)current_activities_[instance].size() > parallelism_degree_[instance]) { + current_activities_[instance].pop_front(); + } + if (instance != "dispatcher" and instance != "collector") { + on_this_start(this); + on_start(this); } - on_this_start(this); - on_start(this); - running_instances_++; - queued_firings_ = std::max(queued_firings_ - 1, 0); - if (not tokens_received_.empty()) - tokens_received_.pop_front(); + running_instances_[instance]++; + queued_firings_[instance] = std::max(queued_firings_[instance] - 1, 0); } -/** @param successor The Task to add. - * @brief Add a successor to this Task. +/** @param successor The Task to add as a successor. * @note It also adds this as a predecessor of successor. */ void Task::add_successor(TaskPtr successor) @@ -175,8 +195,7 @@ void Task::add_successor(TaskPtr successor) }); } -/** @param successor The Task to remove. - * @brief Remove a successor from this Task. +/** @param successor The Task to remove from the successors of this Task. * @note It also remove this from the predecessors of successor. */ void Task::remove_successor(TaskPtr successor) @@ -187,6 +206,8 @@ void Task::remove_successor(TaskPtr successor) }); } +/** @brief Remove all successors from this Task. + */ void Task::remove_all_successors() { simgrid::kernel::actor::simcall_answered([this] { @@ -198,12 +219,58 @@ void Task::remove_all_successors() }); } +/** @param n The number of instances to add to this Task (>=0). + * @note Instances goes always from instance_0 to instance_x, + * where x is the current number of instance. 
+ */ +void Task::add_instances(int n) +{ + xbt_assert(n >= 0, "Cannot add a negative number of instances (provided: %d)", n); + int instance_count = (int)amount_.size() - 2; + for (int i = instance_count; i < n + instance_count; i++) { + amount_["instance_" + std::to_string(i)] = amount_.at("instance_0"); + queued_firings_["instance_" + std::to_string(i)] = 0; + running_instances_["instance_" + std::to_string(i)] = 0; + count_["instance_" + std::to_string(i)] = 0; + parallelism_degree_["instance_" + std::to_string(i)] = parallelism_degree_.at("instance_0"); + current_activities_["instance_" + std::to_string(i)] = {}; + internal_bytes_to_send_["instance_" + std::to_string(i)] = internal_bytes_to_send_.at("instance_0"); + ; + } +} + +/** @param n The number of instances to remove from this Task (>=0). + * @note Instances goes always from instance_0 to instance_x, + * where x is the current number of instance. + * Running instances cannot be removed. + */ +void Task::remove_instances(int n) +{ + int instance_count = (int)amount_.size() - 2; + xbt_assert(n >= 0, "Cannot remove a negative number of instances (provided: %d)", n); + xbt_assert(instance_count - n > 0, "The number of instances must be above 0 (instances: %d, provided: %d)", + instance_count, n); + for (int i = instance_count - 1; i >= instance_count - n; i--) { + xbt_assert(running_instances_.at("instance_" + std::to_string(i)) == 0, + "Cannot remove a running instance (instances: %d)", i); + amount_.erase("instance_" + std::to_string(i)); + queued_firings_.erase("instance_" + std::to_string(i)); + running_instances_.erase("instance_" + std::to_string(i)); + count_.erase("instance_" + std::to_string(i)); + parallelism_degree_.erase("instance_" + std::to_string(i)); + current_activities_.erase("instance_" + std::to_string(i)); + } +} + /** * @brief Default constructor. 
*/ -ExecTask::ExecTask(const std::string& name) : Task(name) {} +ExecTask::ExecTask(const std::string& name) : Task(name) +{ + set_load_balancing_function([]() { return "instance_0"; }); +} -/** @ingroup plugin_task +/** * @brief Smart Constructor. */ ExecTaskPtr ExecTask::init(const std::string& name) @@ -211,7 +278,7 @@ ExecTaskPtr ExecTask::init(const std::string& name) return ExecTaskPtr(new ExecTask(name)); } -/** @ingroup plugin_task +/** * @brief Smart Constructor. */ ExecTaskPtr ExecTask::init(const std::string& name, double flops, Host* host) @@ -219,45 +286,109 @@ ExecTaskPtr ExecTask::init(const std::string& name, double flops, Host* host) return init(name)->set_flops(flops)->set_host(host); } -/** - * @brief Do one execution of the Task. - * @note Call the on_this_start() func. - * Init and start the underlying Activity. +/** @param instance The Task instance to fire. + * @note Only the dispatcher instance triggers the on_start signal. + * Comms are created if hosts differ between dispatcher and the instance to fire, + * or between the instance and the collector. 
*/ -void ExecTask::fire() +void ExecTask::fire(std::string instance) { - Task::fire(); - auto exec = Exec::init()->set_name(get_name())->set_flops_amount(get_amount())->set_host(host_); - exec->start(); - exec->on_this_completion_cb([this](Exec const&) { complete(); }); - store_activity(exec); + Task::fire(instance); + if (instance == "dispatcher" or instance == "collector") { + auto exec = Exec::init() + ->set_name(get_name() + "_" + instance) + ->set_flops_amount(get_amount(instance)) + ->set_host(host_[instance]); + exec->start(); + exec->on_this_completion_cb([this, instance](Exec const&) { complete(instance); }); + store_activity(exec, instance); + } else { + auto exec = Exec::init()->set_name(get_name())->set_flops_amount(get_amount())->set_host(host_[instance]); + if (host_["dispatcher"] == host_[instance]) { + exec->start(); + store_activity(exec, instance); + } else { + auto comm = Comm::sendto_init(host_["dispatcher"], host_[instance]) + ->set_name(get_name() + "_dispatcher_to_" + instance) + ->set_payload_size(get_internal_bytes("dispatcher")); + comm->add_successor(exec); + comm->start(); + store_activity(comm, instance); + } + if (host_[instance] == host_["collector"]) { + exec->on_this_completion_cb([this, instance](Exec const&) { complete(instance); }); + if (host_["dispatcher"] != host_[instance]) + store_activity(exec, instance); + } else { + auto comm = Comm::sendto_init(host_[instance], host_["collector"]) + ->set_name(get_name() + instance + "_to_collector") + ->set_payload_size(get_internal_bytes(instance)); + exec->add_successor(comm); + comm->on_this_completion_cb([this, instance](Comm const&) { complete(instance); }); + comm.detach(); + } + } } -/** @ingroup plugin_task - * @param host The host to set. +/** @param host The host to set. + * @param instance The Task instance to modify. * @brief Set a new host. 
*/ -ExecTaskPtr ExecTask::set_host(Host* host) +ExecTaskPtr ExecTask::set_host(Host* host, std::string instance) { - kernel::actor::simcall_answered([this, host] { host_ = host; }); + kernel::actor::simcall_answered([this, host, &instance] { + if (instance == "all") + for (auto& [key, value] : host_) + host_[key] = host; + else + host_[instance] = host; + }); return this; } -/** @ingroup plugin_task - * @param flops The amount of flops to set. +/** @param flops The amount of flops to set. + * @param instance The Task instance to modify. */ -ExecTaskPtr ExecTask::set_flops(double flops) +ExecTaskPtr ExecTask::set_flops(double flops, std::string instance) { - kernel::actor::simcall_answered([this, flops] { set_amount(flops); }); + kernel::actor::simcall_answered([this, flops, &instance] { set_amount(flops, instance); }); return this; } +/** @param n The number of instances to add to this Task (>=0). + @note Instances goes always from instance_0 to instance_x, + where x is the current number of instance. + */ +void ExecTask::add_instances(int n) +{ + Task::add_instances(n); + int instance_count = (int)host_.size() - 2; + for (int i = instance_count; i < n + instance_count; i++) + host_["instance_" + std::to_string(i)] = host_.at("instance_0"); +} + +/** @param n The number of instances to remove from this Task (>=0). + @note Instances goes always from instance_0 to instance_x, + where x is the current number of instance. + Running instance cannot be removed. + */ +void ExecTask::remove_instances(int n) +{ + Task::remove_instances(n); + int instance_count = (int)host_.size() - 2; + for (int i = instance_count - 1; i >= instance_count - n; i--) + host_.erase("instance_" + std::to_string(i)); +} + /** * @brief Default constructor. 
*/ -CommTask::CommTask(const std::string& name) : Task(name) {} +CommTask::CommTask(const std::string& name) : Task(name) +{ + set_load_balancing_function([]() { return "instance_0"; }); +} -/** @ingroup plugin_task +/** * @brief Smart constructor. */ CommTaskPtr CommTask::init(const std::string& name) @@ -265,7 +396,7 @@ CommTaskPtr CommTask::init(const std::string& name) return CommTaskPtr(new CommTask(name)); } -/** @ingroup plugin_task +/** * @brief Smart constructor. */ CommTaskPtr CommTask::init(const std::string& name, double bytes, Host* source, Host* destination) @@ -273,21 +404,29 @@ CommTaskPtr CommTask::init(const std::string& name, double bytes, Host* source, return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination); } -/** - * @brief Do one execution of the Task. - * @note Call the on_this_start() func. - * Init and start the underlying Activity. +/** @param instance The Task instance to fire. + * @note Only the dispatcher instance triggers the on_start signal. */ -void CommTask::fire() +void CommTask::fire(std::string instance) { - Task::fire(); - auto comm = Comm::sendto_init(source_, destination_)->set_name(get_name())->set_payload_size(get_amount()); - comm->start(); - comm->on_this_completion_cb([this](Comm const&) { complete(); }); - store_activity(comm); + Task::fire(instance); + if (instance == "dispatcher" or instance == "collector") { + auto exec = Exec::init() + ->set_name(get_name() + "_" + instance) + ->set_flops_amount(get_amount(instance)) + ->set_host(instance == "dispatcher" ? 
source_ : destination_); + exec->start(); + exec->on_this_completion_cb([this, instance](Exec const&) { complete(instance); }); + store_activity(exec, instance); + } else { + auto comm = Comm::sendto_init(source_, destination_)->set_name(get_name())->set_payload_size(get_amount()); + comm->start(); + comm->on_this_completion_cb([this, instance](Comm const&) { complete(instance); }); + store_activity(comm, instance); + } } -/** @ingroup plugin_task +/** * @param source The host to set. * @brief Set a new source host. */ @@ -297,7 +436,7 @@ CommTaskPtr CommTask::set_source(Host* source) return this; } -/** @ingroup plugin_task +/** * @param destination The host to set. * @brief Set a new destination host. */ @@ -307,7 +446,7 @@ CommTaskPtr CommTask::set_destination(Host* destination) return this; } -/** @ingroup plugin_task +/** * @param bytes The amount of bytes to set. */ CommTaskPtr CommTask::set_bytes(double bytes) @@ -319,9 +458,12 @@ CommTaskPtr CommTask::set_bytes(double bytes) /** * @brief Default constructor. */ -IoTask::IoTask(const std::string& name) : Task(name) {} +IoTask::IoTask(const std::string& name) : Task(name) +{ + set_load_balancing_function([]() { return "instance_0"; }); +} -/** @ingroup plugin_task +/** * @brief Smart Constructor. */ IoTaskPtr IoTask::init(const std::string& name) @@ -329,7 +471,7 @@ IoTaskPtr IoTask::init(const std::string& name) return IoTaskPtr(new IoTask(name)); } -/** @ingroup plugin_task +/** * @brief Smart Constructor. */ IoTaskPtr IoTask::init(const std::string& name, double bytes, Disk* disk, Io::OpType type) @@ -337,9 +479,8 @@ IoTaskPtr IoTask::init(const std::string& name, double bytes, Disk* disk, Io::Op return init(name)->set_bytes(bytes)->set_disk(disk)->set_op_type(type); } -/** @ingroup plugin_task +/** * @param disk The disk to set. - * @brief Set a new disk. 
*/ IoTaskPtr IoTask::set_disk(Disk* disk) { @@ -347,7 +488,7 @@ IoTaskPtr IoTask::set_disk(Disk* disk) return this; } -/** @ingroup plugin_task +/** * @param bytes The amount of bytes to set. */ IoTaskPtr IoTask::set_bytes(double bytes) @@ -356,20 +497,34 @@ IoTaskPtr IoTask::set_bytes(double bytes) return this; } -/** @ingroup plugin_task */ +/** + * @param type The op type to set. + */ IoTaskPtr IoTask::set_op_type(Io::OpType type) { kernel::actor::simcall_answered([this, type] { type_ = type; }); return this; } -void IoTask::fire() +/** @param instance The Task instance to fire. + * @note Only the dispatcher instance triggers the on_start signal. + */ +void IoTask::fire(std::string instance) { - Task::fire(); - auto io = Io::init()->set_name(get_name())->set_size(get_amount())->set_disk(disk_)->set_op_type(type_); - io->start(); - io->on_this_completion_cb([this](Io const&) { complete(); }); - store_activity(io); + Task::fire(instance); + if (instance == "dispatcher" or instance == "collector") { + auto exec = Exec::init() + ->set_name(get_name() + "_" + instance) + ->set_flops_amount(get_amount(instance)) + ->set_host(disk_->get_host()); + exec->start(); + exec->on_this_completion_cb([this, instance](Exec const&) { complete(instance); }); + store_activity(exec, instance); + } else { + auto io = Io::init()->set_name(get_name())->set_size(get_amount())->set_disk(disk_)->set_op_type(type_); + io->start(); + io->on_this_completion_cb([this, instance](Io const&) { complete(instance); }); + store_activity(io, instance); + } } - } // namespace simgrid::s4u diff --git a/src/simgrid/sg_config.hpp b/src/simgrid/sg_config.hpp index 10dbe7baa7..25e4a168b2 100644 --- a/src/simgrid/sg_config.hpp +++ b/src/simgrid/sg_config.hpp @@ -10,7 +10,7 @@ /** Config Globals */ -XBT_PUBLIC_DATA int _sg_cfg_init_status; +XBT_PUBLIC_DATA int _sg_cfg_init_status; /* 0: not inited; 1: config module inited; 2: root zone of platform created */ XBT_PUBLIC void sg_config_init(int* argc, char** 
argv); XBT_PUBLIC void sg_config_finalize(); diff --git a/src/smpi/bindings/smpi_pmpi_request.cpp b/src/smpi/bindings/smpi_pmpi_request.cpp index 94bb62be2a..8cf9b41312 100644 --- a/src/smpi/bindings/smpi_pmpi_request.cpp +++ b/src/smpi/bindings/smpi_pmpi_request.cpp @@ -381,6 +381,8 @@ int PMPI_Sendrecv(const void* sendbuf, int sendcount, MPI_Datatype sendtype, int CHECK_TYPE(8, recvtype) CHECK_BUFFER(1, sendbuf, sendcount, sendtype) CHECK_BUFFER(6, recvbuf, recvcount, recvtype) + CHECK_ARGS(sendbuf == recvbuf && sendcount > 0 && recvcount > 0, MPI_ERR_BUFFER, + "%s: Invalid parameters 1 and 6: sendbuf and recvbuf must be disjoint", __func__); CHECK_TAG(10, recvtag) CHECK_COMM(11) const SmpiBenchGuard suspend_bench; diff --git a/src/smpi/include/smpi_actor.hpp b/src/smpi/include/smpi_actor.hpp index f6194db4a5..cb097d26c3 100644 --- a/src/smpi/include/smpi_actor.hpp +++ b/src/smpi/include/smpi_actor.hpp @@ -68,8 +68,8 @@ public: smpi_trace_call_location_t* call_location(); void set_privatized_region(smpi_privatization_region_t region); smpi_privatization_region_t privatized_region() const; - s4u::Mailbox* mailbox() const { return mailbox_; } - s4u::Mailbox* mailbox_small() const { return mailbox_small_; } + s4u::Mailbox* mailbox(); + s4u::Mailbox* mailbox_small(); s4u::MutexPtr mailboxes_mutex() const; #if HAVE_PAPI int papi_event_set() const; diff --git a/src/smpi/internals/smpi_actor.cpp b/src/smpi/internals/smpi_actor.cpp index 83bb79837f..2cd94c93ef 100644 --- a/src/smpi/internals/smpi_actor.cpp +++ b/src/smpi/internals/smpi_actor.cpp @@ -26,8 +26,8 @@ ActorExt::ActorExt(s4u::Actor* actor) : actor_(actor) if (not simgrid::smpi::ActorExt::EXTENSION_ID.valid()) simgrid::smpi::ActorExt::EXTENSION_ID = simgrid::s4u::Actor::extension_create(); - mailbox_ = s4u::Mailbox::by_name("SMPI-" + std::to_string(actor_->get_pid())); - mailbox_small_ = s4u::Mailbox::by_name("small-" + std::to_string(actor_->get_pid())); + mailbox_ = nullptr; + mailbox_small_ = nullptr; 
mailboxes_mutex_ = s4u::Mutex::create(); timer_ = xbt_os_timer_new(); state_ = SmpiProcessState::UNINITIALIZED; @@ -91,6 +91,22 @@ int ActorExt::initialized() const return (state_ == SmpiProcessState::INITIALIZED); } +/** @brief Return main mailbox of the process */ +s4u::Mailbox* ActorExt::mailbox() +{ + if(mailbox_==nullptr) + mailbox_=s4u::Mailbox::by_name("SMPI-" + std::to_string(actor_->get_pid())); + return mailbox_; +} + +/** @brief Return mailbox for small messages */ +s4u::Mailbox* ActorExt::mailbox_small() +{ + if(mailbox_small_==nullptr) + mailbox_small_=s4u::Mailbox::by_name("small-" + std::to_string(actor_->get_pid())); + return mailbox_small_; +} + /** @brief Mark a process as initialized (=MPI_Init called) */ void ActorExt::mark_as_initialized() { @@ -242,7 +258,7 @@ void ActorExt::init() ext->comm_world_ = smpi_deployment_comm_world(ext->instance_id_); // set the process attached to the mailbox - ext->mailbox_small_->set_receiver(ext->actor_); + ext->mailbox_small()->set_receiver(ext->actor_); XBT_DEBUG("<%ld> SMPI process has been initialized: %p", ext->actor_->get_pid(), ext->actor_); } diff --git a/src/sthread/ObjectAccess.cpp b/src/sthread/ObjectAccess.cpp index 6e38668af8..25e9b52c32 100644 --- a/src/sthread/ObjectAccess.cpp +++ b/src/sthread/ObjectAccess.cpp @@ -51,6 +51,7 @@ struct ObjectOwner { simgrid::kernel::actor::ActorImpl* owner = nullptr; const char* file = nullptr; int line = -1; + int recursive_depth = 0; explicit ObjectOwner(simgrid::kernel::actor::ActorImpl* o) : owner(o) {} }; @@ -73,7 +74,7 @@ static ObjectOwner* get_owner(void* object) return o; } -int sthread_access_begin(void* objaddr, const char* objname, const char* file, int line) +int sthread_access_begin(void* objaddr, const char* objname, const char* file, int line, const char* func) { sthread_disable(); auto* self = simgrid::kernel::actor::ActorImpl::self(); @@ -83,10 +84,23 @@ int sthread_access_begin(void* objaddr, const char* objname, const char* file, i [self, 
objaddr, objname, file, line]() -> bool { XBT_INFO("%s takes %s", self->get_cname(), objname); auto* ownership = get_owner(objaddr); - if (ownership->owner != nullptr) { + if (ownership->owner == self) { + ownership->recursive_depth++; + return true; + } else if (ownership->owner != nullptr) { auto msg = std::string("Unprotected concurent access to ") + objname + ": " + ownership->owner->get_name(); - if (not xbt_log_no_loc) + if (not xbt_log_no_loc) { msg += simgrid::xbt::string_printf(" at %s:%d", ownership->file, ownership->line); + if (ownership->recursive_depth > 1) { + msg += simgrid::xbt::string_printf(" (and %d other locations)", ownership->recursive_depth - 1); + if (ownership->recursive_depth != 2) + msg += "s"; + } + } else { + msg += simgrid::xbt::string_printf(" from %d location", ownership->recursive_depth); + if (ownership->recursive_depth != 1) + msg += "s"; + } msg += " vs " + self->get_name(); if (xbt_log_no_loc) msg += std::string(" (locations hidden because of --log=no_loc)."); @@ -98,6 +112,7 @@ int sthread_access_begin(void* objaddr, const char* objname, const char* file, i ownership->owner = self; ownership->file = file; ownership->line = line; + ownership->recursive_depth = 1; return true; }, &observer); @@ -106,7 +121,7 @@ int sthread_access_begin(void* objaddr, const char* objname, const char* file, i sthread_enable(); return true; } -void sthread_access_end(void* objaddr, const char* objname, const char* file, int line) +void sthread_access_end(void* objaddr, const char* objname, const char* file, int line, const char* func) { sthread_disable(); auto* self = simgrid::kernel::actor::ActorImpl::self(); @@ -116,9 +131,12 @@ void sthread_access_end(void* objaddr, const char* objname, const char* file, in [self, objaddr, objname]() -> void { XBT_INFO("%s releases %s", self->get_cname(), objname); auto* ownership = get_owner(objaddr); - xbt_assert(ownership->owner == self, "safety check failed: %s is not owner of the object it's releasing.", - 
self->get_cname()); - ownership->owner = nullptr; + xbt_assert(ownership->owner == self, + "safety check failed: %s is not owner of the object it's releasing. That object owned by %s.", + self->get_cname(), (ownership->owner == nullptr ? "nobody" : ownership->owner->get_cname())); + ownership->recursive_depth--; + if (ownership->recursive_depth == 0) + ownership->owner = nullptr; }, &observer); sthread_enable(); diff --git a/src/sthread/sthread.c b/src/sthread/sthread.c index ee9e0847ea..2005f297ad 100644 --- a/src/sthread/sthread.c +++ b/src/sthread/sthread.c @@ -28,6 +28,12 @@ static int (*raw_mutex_trylock)(pthread_mutex_t*); static int (*raw_mutex_unlock)(pthread_mutex_t*); static int (*raw_mutex_destroy)(pthread_mutex_t*); +static int (*raw_pthread_mutexattr_init)(pthread_mutexattr_t*); +static int (*raw_pthread_mutexattr_settype)(pthread_mutexattr_t*, int); +static int (*raw_pthread_mutexattr_gettype)(const pthread_mutexattr_t* restrict, int* restrict); +static int (*raw_pthread_mutexattr_getrobust)(const pthread_mutexattr_t*, int*); +static int (*raw_pthread_mutexattr_setrobust)(pthread_mutexattr_t*, int); + static unsigned int (*raw_sleep)(unsigned int); static int (*raw_usleep)(useconds_t); static int (*raw_gettimeofday)(struct timeval*, void*); @@ -50,6 +56,12 @@ static void intercepter_init() raw_mutex_unlock = dlsym(RTLD_NEXT, "pthread_mutex_unlock"); raw_mutex_destroy = dlsym(RTLD_NEXT, "pthread_mutex_destroy"); + raw_pthread_mutexattr_init = dlsym(RTLD_NEXT, "pthread_mutexattr_init"); + raw_pthread_mutexattr_settype = dlsym(RTLD_NEXT, "pthread_mutexattr_settype"); + raw_pthread_mutexattr_gettype = dlsym(RTLD_NEXT, "pthread_mutexattr_gettype"); + raw_pthread_mutexattr_getrobust = dlsym(RTLD_NEXT, "pthread_mutexattr_getrobust"); + raw_pthread_mutexattr_setrobust = dlsym(RTLD_NEXT, "pthread_mutexattr_setrobust"); + raw_sleep = dlsym(RTLD_NEXT, "sleep"); raw_usleep = dlsym(RTLD_NEXT, "usleep"); raw_gettimeofday = dlsym(RTLD_NEXT, "gettimeofday"); @@ -85,6 
+97,32 @@ int pthread_create(pthread_t* thread, const pthread_attr_t* attr, void* (*start_ sthread_enable(); return res; } + +#define _STHREAD_CONCAT(a, b) a##b +#define intercepted_call(name, raw_params, call_params, sim_params) \ + int _STHREAD_CONCAT(pthread_, name) raw_params \ + { \ + if (_STHREAD_CONCAT(raw_pthread_, name) == NULL) \ + intercepter_init(); \ + if (sthread_inside_simgrid) \ + return _STHREAD_CONCAT(raw_pthread_, name) call_params; \ + \ + sthread_disable(); \ + int res = _STHREAD_CONCAT(sthread_, name) sim_params; \ + sthread_enable(); \ + return res; \ + } + +intercepted_call(mutexattr_init, (pthread_mutexattr_t * attr), (attr), ((sthread_mutexattr_t*)attr)); +intercepted_call(mutexattr_settype, (pthread_mutexattr_t * attr, int type), (attr, type), + ((sthread_mutexattr_t*)attr, type)); +intercepted_call(mutexattr_gettype, (const pthread_mutexattr_t* restrict attr, int* type), (attr, type), + ((sthread_mutexattr_t*)attr, type)); +intercepted_call(mutexattr_setrobust, (pthread_mutexattr_t* restrict attr, int robustness), (attr, robustness), + ((sthread_mutexattr_t*)attr, robustness)); +intercepted_call(mutexattr_getrobust, (const pthread_mutexattr_t* restrict attr, int* restrict robustness), + (attr, robustness), ((sthread_mutexattr_t*)attr, robustness)); + int pthread_join(pthread_t thread, void** retval) { if (raw_pthread_join == NULL) @@ -107,7 +145,7 @@ int pthread_mutex_init(pthread_mutex_t* mutex, const pthread_mutexattr_t* attr) return raw_mutex_init(mutex, attr); sthread_disable(); - int res = sthread_mutex_init((sthread_mutex_t*)mutex, attr); + int res = sthread_mutex_init((sthread_mutex_t*)mutex, (sthread_mutexattr_t*)attr); sthread_enable(); return res; } diff --git a/src/sthread/sthread.h b/src/sthread/sthread.h index 65279685e6..f0a9117db1 100644 --- a/src/sthread/sthread.h +++ b/src/sthread/sthread.h @@ -31,10 +31,22 @@ typedef unsigned long int sthread_t; int sthread_create(sthread_t* thread, const /*pthread_attr_t*/ void* attr, 
void* (*start_routine)(void*), void* arg); int sthread_join(sthread_t thread, void** retval); +typedef struct { + unsigned recursive : 1; + unsigned errorcheck : 1; + unsigned robust : 1; +} sthread_mutexattr_t; + +int sthread_mutexattr_init(sthread_mutexattr_t* attr); +int sthread_mutexattr_settype(sthread_mutexattr_t* attr, int type); +int sthread_mutexattr_gettype(const sthread_mutexattr_t* attr, int* type); +int sthread_mutexattr_getrobust(const sthread_mutexattr_t* attr, int* robustness); +int sthread_mutexattr_setrobust(sthread_mutexattr_t* attr, int robustness); + typedef struct { void* mutex; } sthread_mutex_t; -int sthread_mutex_init(sthread_mutex_t* mutex, const /*pthread_mutexattr_t*/ void* attr); +int sthread_mutex_init(sthread_mutex_t* mutex, const sthread_mutexattr_t* attr); int sthread_mutex_lock(sthread_mutex_t* mutex); int sthread_mutex_trylock(sthread_mutex_t* mutex); int sthread_mutex_unlock(sthread_mutex_t* mutex); @@ -53,8 +65,8 @@ int sthread_sem_timedwait(sthread_sem_t* sem, const struct timespec* abs_timeout int sthread_gettimeofday(struct timeval* tv); void sthread_sleep(double seconds); -int sthread_access_begin(void* objaddr, const char* objname, const char* file, int line); -void sthread_access_end(void* objaddr, const char* objname, const char* file, int line); +int sthread_access_begin(void* objaddr, const char* objname, const char* file, int line, const char* function); +void sthread_access_end(void* objaddr, const char* objname, const char* file, int line, const char* function); #if defined(__cplusplus) } diff --git a/src/sthread/sthread_impl.cpp b/src/sthread/sthread_impl.cpp index 45ba730029..de2667b0e1 100644 --- a/src/sthread/sthread_impl.cpp +++ b/src/sthread/sthread_impl.cpp @@ -6,6 +6,9 @@ /* SimGrid's pthread interposer. 
Actual implementation of the symbols (see the comment in sthread.h) */ #include "smpi/smpi.h" +#include "xbt/asserts.h" +#include "xbt/ex.h" +#include "xbt/log.h" #include "xbt/string.hpp" #include #include @@ -38,14 +41,22 @@ int sthread_main(int argc, char** argv, char** envp, int (*raw_main)(int, char** { /* Do not intercept the main when run from SMPI: it will initialize the simulation properly */ for (int i = 0; envp[i] != nullptr; i++) - if (std::string_view(envp[i]).rfind("SMPI_GLOBAL_SIZE", 0) == 0) + if (std::string_view(envp[i]).rfind("SMPI_GLOBAL_SIZE", 0) == 0) { + printf("sthread refuses to intercept the SMPI application %s directly, as its interception is done otherwise.\n", + argv[0]); return raw_main(argc, argv, envp); + } - /* If not in SMPI, the old main becomes an actor in a newly created simulation */ - std::ostringstream id; - id << std::this_thread::get_id(); + /* Do not intercept valgrind step 1 */ + if (not strcmp(argv[0], "/usr/bin/valgrind.bin") || not strcmp(argv[0], "/bin/sh")) { + printf("sthread refuses to intercept the execution of %s. Running the application unmodified.\n", argv[0]); + fflush(stdout); + return raw_main(argc, argv, envp); + } - XBT_DEBUG("sthread main() is starting in thread %s", id.str().c_str()); + /* If not in SMPI, the old main becomes an actor in a newly created simulation */ + printf("sthread is intercepting the execution of %s\n", argv[0]); + fflush(stdout); sg4::Engine e(&argc, argv); auto* zone = sg4::create_full_zone("world"); @@ -56,7 +67,6 @@ int sthread_main(int argc, char** argv, char** envp, int (*raw_main)(int, char** sthread_enable(); sg4::ActorPtr main_actor = sg4::Actor::create("main thread", lilibeth, raw_main, argc, argv, envp); - XBT_INFO("Starting the simulation."); sg4::Engine::get_instance()->run(); sthread_disable(); XBT_INFO("All threads exited. 
Terminating the simulation."); @@ -108,9 +118,57 @@ int sthread_join(sthread_t thread, void** /*retval*/) return 0; } -int sthread_mutex_init(sthread_mutex_t* mutex, const void* /*pthread_mutexattr_t* attr*/) +int sthread_mutexattr_init(sthread_mutexattr_t* attr) +{ + memset(attr, 0, sizeof(*attr)); + return 0; +} +int sthread_mutexattr_settype(sthread_mutexattr_t* attr, int type) { - auto m = sg4::Mutex::create(); + switch (type) { + case PTHREAD_MUTEX_NORMAL: + xbt_assert(not attr->recursive, "S4U does not allow to remove the recursivness of a mutex."); + attr->recursive = 0; + break; + case PTHREAD_MUTEX_RECURSIVE: + attr->recursive = 1; + attr->errorcheck = 0; // reset + break; + case PTHREAD_MUTEX_ERRORCHECK: + attr->errorcheck = 1; + THROW_UNIMPLEMENTED; + break; + default: + THROW_IMPOSSIBLE; + } + return 0; +} +int sthread_mutexattr_gettype(const sthread_mutexattr_t* attr, int* type) +{ + if (attr->recursive) + *type = PTHREAD_MUTEX_RECURSIVE; + else if (attr->errorcheck) + *type = PTHREAD_MUTEX_ERRORCHECK; + else + *type = PTHREAD_MUTEX_NORMAL; + return 0; +} +int sthread_mutexattr_getrobust(const sthread_mutexattr_t* attr, int* robustness) +{ + *robustness = attr->robust; + return 0; +} +int sthread_mutexattr_setrobust(sthread_mutexattr_t* attr, int robustness) +{ + attr->robust = robustness; + if (robustness) + THROW_UNIMPLEMENTED; + return 0; +} + +int sthread_mutex_init(sthread_mutex_t* mutex, const sthread_mutexattr_t* attr) +{ + auto m = sg4::Mutex::create(attr != nullptr && attr->recursive); intrusive_ptr_add_ref(m.get()); mutex->mutex = m.get(); @@ -123,6 +181,7 @@ int sthread_mutex_lock(sthread_mutex_t* mutex) if (mutex->mutex == nullptr) sthread_mutex_init(mutex, nullptr); + XBT_DEBUG("%s(%p)", __FUNCTION__, mutex); static_cast(mutex->mutex)->lock(); return 0; } @@ -133,7 +192,10 @@ int sthread_mutex_trylock(sthread_mutex_t* mutex) if (mutex->mutex == nullptr) sthread_mutex_init(mutex, nullptr); - return static_cast(mutex->mutex)->try_lock(); + 
XBT_DEBUG("%s(%p)", __FUNCTION__, mutex); + if (static_cast(mutex->mutex)->try_lock()) + return 0; + return EBUSY; } int sthread_mutex_unlock(sthread_mutex_t* mutex) @@ -142,6 +204,7 @@ int sthread_mutex_unlock(sthread_mutex_t* mutex) if (mutex->mutex == nullptr) sthread_mutex_init(mutex, nullptr); + XBT_DEBUG("%s(%p)", __FUNCTION__, mutex); static_cast(mutex->mutex)->unlock(); return 0; } @@ -151,6 +214,7 @@ int sthread_mutex_destroy(sthread_mutex_t* mutex) if (mutex->mutex == nullptr) sthread_mutex_init(mutex, nullptr); + XBT_DEBUG("%s(%p)", __FUNCTION__, mutex); intrusive_ptr_release(static_cast(mutex->mutex)); return 0; } diff --git a/src/xbt/xbt_os_time.c b/src/xbt/xbt_os_time.c index b6a16ad27f..f0d5401389 100644 --- a/src/xbt/xbt_os_time.c +++ b/src/xbt/xbt_os_time.c @@ -40,11 +40,11 @@ double xbt_os_time(void) #if HAVE_GETTIMEOFDAY struct timeval tv; gettimeofday(&tv, NULL); + + return (double)tv.tv_sec + (double)tv.tv_usec / 1e6; #else /* no gettimeofday => poor resolution */ return (double) (time(NULL)); #endif /* HAVE_GETTIMEOFDAY? 
*/ - - return (double)tv.tv_sec + (double)tv.tv_usec / 1e6; } void xbt_os_sleep(double sec) @@ -59,7 +59,7 @@ void xbt_os_sleep(double sec) struct timeval timeout; timeout.tv_sec = (long)sec; - timeout.tv_usec = (long)(sec - floor(sec)) * 1e6); + timeout.tv_usec = (long)(sec - floor(sec)) * 1e6; select(0, NULL, NULL, NULL, &timeout); #endif diff --git a/teshsuite/smpi/MBI/CMakeLists.txt b/teshsuite/smpi/MBI/CMakeLists.txt index d4f9eb8c92..d0c587e9cc 100644 --- a/teshsuite/smpi/MBI/CMakeLists.txt +++ b/teshsuite/smpi/MBI/CMakeLists.txt @@ -64,6 +64,7 @@ if (enable_smpi_MBI_testsuite) add_executable(mbi_${basefile} EXCLUDE_FROM_ALL ${CMAKE_BINARY_DIR}/MBI/${cfile}) target_link_libraries(mbi_${basefile} simgrid) target_compile_options(mbi_${basefile} PRIVATE "-Wno-unused-variable") + target_compile_options(mbi_${basefile} PRIVATE "-Wno-unused-but-set-variable") set_target_properties(mbi_${basefile} PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/MBI) add_dependencies(tests-mbi mbi_${basefile}) diff --git a/teshsuite/smpi/MBI/InputHazardGenerator.py b/teshsuite/smpi/MBI/InputHazardGenerator.py index 899e810f96..d9f8a5141c 100755 --- a/teshsuite/smpi/MBI/InputHazardGenerator.py +++ b/teshsuite/smpi/MBI/InputHazardGenerator.py @@ -26,11 +26,11 @@ END_MPI_FEATURES BEGIN_MBI_TESTS $ mpirun -np 2 ${EXE} 1 - | @{outcome}@ - | @{errormsg}@ + | @{outcome1}@ + | @{errormsg1}@ $ mpirun -np 2 ${EXE} 2 - | @{outcome}@ - | @{errormsg}@ + | @{outcome2}@ + | @{errormsg2}@ END_MBI_TESTS ////////////////////// End of MBI headers /////////////////// */ @@ -127,16 +127,20 @@ for s in gen.send + gen.isend: replace = patterns.copy() replace['shortdesc'] = 'Correct call ordering.' replace['longdesc'] = 'Correct call ordering.' 
- replace['outcome'] = 'OK' - replace['errormsg'] = 'OK' + replace['outcome1'] = 'OK' + replace['errormsg1'] = 'OK' + replace['outcome2'] = 'OK' + replace['errormsg2'] = 'OK' gen.make_file(template, f'InputHazardCallOrdering_{r}_{s}_ok.c', replace) # Generate the incorrect matching replace = patterns.copy() replace['shortdesc'] = 'Missing Send function.' replace['longdesc'] = 'Missing Send function call for a path depending to input, a deadlock is created.' - replace['outcome'] = 'ERROR: IHCallMatching' - replace['errormsg'] = 'P2P mistmatch. Missing @{r}@ at @{filename}@:@{line:MBIERROR}@.' + replace['outcome1'] = 'OK' + replace['errormsg1'] = 'OK' + replace['outcome2'] = 'ERROR: IHCallMatching' + replace['errormsg2'] = 'P2P mistmatch. Missing @{r}@ at @{filename}@:@{line:MBIERROR}@.' replace['errorcond'] = '/* MBIERROR */' replace['operation1b'] = '' replace['fini1b'] = '' @@ -172,16 +176,20 @@ for c in gen.coll: replace = patterns.copy() replace['shortdesc'] = 'Correct call ordering.' replace['longdesc'] = 'Correct call ordering.' - replace['outcome'] = 'OK' - replace['errormsg'] = 'OK' + replace['outcome1'] = 'OK' + replace['errormsg1'] = 'OK' + replace['outcome2'] = 'OK' + replace['errormsg2'] = 'OK' gen.make_file(template, f'InputHazardCallOrdering_{c}_ok.c', replace) # Generate the incorrect matching replace = patterns.copy() replace['shortdesc'] = 'Missing collective function call.' replace['longdesc'] = 'Missing collective function call for a path depending to input, a deadlock is created.' - replace['outcome'] = 'ERROR: IHCallMatching' - replace['errormsg'] = 'P2P mistmatch. Missing @{c}@ at @{filename}@:@{line:MBIERROR}@.' + replace['outcome1'] = 'OK' + replace['errormsg1'] = 'OK' + replace['outcome2'] = 'ERROR: IHCallMatching' + replace['errormsg2'] = 'P2P mistmatch. Missing @{c}@ at @{filename}@:@{line:MBIERROR}@.' 
replace['errorcond'] = '/* MBIERROR */' replace['operation1b'] = '' replace['fini1b'] = '' diff --git a/teshsuite/smpi/MBI/MBIutils.py b/teshsuite/smpi/MBI/MBIutils.py index c422cca055..9a7d837293 100644 --- a/teshsuite/smpi/MBI/MBIutils.py +++ b/teshsuite/smpi/MBI/MBIutils.py @@ -67,6 +67,9 @@ possible_details = { 'GlobalConcurrency':'DGlobalConcurrency', # larger scope 'BufferingHazard':'EBufferingHazard', + # Input Hazard + 'IHCallMatching':'InputHazard', + 'OK':'FOK'} error_scope = { diff --git a/teshsuite/smpi/MBI/P2PBufferingGenerator.py b/teshsuite/smpi/MBI/P2PBufferingGenerator.py index 05253b8738..663a548a32 100755 --- a/teshsuite/smpi/MBI/P2PBufferingGenerator.py +++ b/teshsuite/smpi/MBI/P2PBufferingGenerator.py @@ -25,11 +25,11 @@ END_MPI_FEATURES BEGIN_MBI_TESTS $ mpirun -np 4 $zero_buffer ${EXE} - | @{outcome1}@ - | @{errormsg1}@ + | @{outcome_zerob}@ + | @{errormsg_zerob}@ $ mpirun -np 4 $infty_buffer ${EXE} - | @{outcome1}@ - | @{errormsg1}@ + | @{outcome_infty}@ + | @{errormsg_infty}@ END_MBI_TESTS ////////////////////// End of MBI headers /////////////////// */ @@ -123,8 +123,10 @@ for s in gen.send + gen.isend: replace = patterns.copy() replace['shortdesc'] = 'Point to point @{s}@ and @{r}@ may not be matched' replace['longdesc'] = 'Processes 0 and 1 both call @{s}@ and @{r}@. This results in a deadlock depending on the buffering mode' - replace['outcome1'] = 'ERROR: BufferingHazard' - replace['errormsg1'] = f'Buffering Hazard. Possible deadlock depending the buffer size of MPI implementation and system environment cause by two processes call {s} before {r}.' + replace['outcome_zerob'] = 'ERROR: BufferingHazard' + replace['errormsg_zerob'] = f'Buffering Hazard. Possible deadlock depending the buffer size of MPI implementation and system environment cause by two processes call {s} before {r}.' 
+ replace['outcome_infty'] = 'OK' + replace['errormsg_infty'] = 'OK' gen.make_file(template, f'P2PBuffering_{s}_{r}_{s}_{r}_nok.c', replace) # Generate the incorrect matching with send message to the same process depending on the buffering mode (send + recv) @@ -136,8 +138,10 @@ for s in gen.send + gen.isend: replace['dest2'] = '1' replace['shortdesc'] = 'Point to point @{s}@ and @{r}@ may not be matched' replace['longdesc'] = 'Processes 0 and 1 both call @{s}@ and @{r}@. This results in a deadlock depending on the buffering mode' - replace['outcome1'] = 'ERROR: BufferingHazard' - replace['errormsg1'] = f'Buffering Hazard. Possible deadlock depending the buffer size of MPI implementation and system environment cause Send message to the same process.' + replace['outcome_zerob'] = 'ERROR: BufferingHazard' + replace['errormsg_zerob'] = f'Buffering Hazard. Possible deadlock depending the buffer size of MPI implementation and system environment cause Send message to the same process.' + replace['outcome_infty'] = 'OK' + replace['errormsg_infty'] = 'OK' gen.make_file(template, f'P2PBuffering_SameProcess_{s}_{r}_nok.c', replace) # Generate the incorrect matching with circular send message depending on the buffering mode (send + recv) @@ -155,16 +159,20 @@ for s in gen.send + gen.isend: replace['operation2c'] = gen.operation[r]("2") replace['shortdesc'] = 'Point to point @{s}@ and @{r}@ may not be matched' replace['longdesc'] = 'Processes 0 and 1 both call @{s}@ and @{r}@. This results in a deadlock depending on the buffering mode' - replace['outcome1'] = 'ERROR: BufferingHazard' - replace['errormsg1'] = f'Buffering Hazard. Possible deadlock depending the buffer size of MPI implementation and system environment cause circular send message.' + replace['outcome_zerob'] = 'ERROR: BufferingHazard' + replace['errormsg_zerob'] = f'Buffering Hazard. Possible deadlock depending the buffer size of MPI implementation and system environment cause circular send message.' 
+ replace['outcome_infty'] = 'OK' + replace['errormsg_infty'] = 'OK' gen.make_file(template, f'P2PBuffering_Circular_{s}_{r}_nok.c', replace) # Generate the incorrect matching depending on the buffering mode (recv + send) replace = patterns.copy() replace['shortdesc'] = 'Point to point @{s}@ and @{r}@ are not matched' replace['longdesc'] = 'Processes 0 and 1 both call @{r}@ and @{s}@. This results in a deadlock' - replace['outcome1'] = 'ERROR: CallMatching' - replace['errormsg1'] = 'ERROR: CallMatching' + replace['outcome_zerob'] = 'ERROR: CallMatching' + replace['errormsg_zerob'] = 'ERROR: CallMatching' + replace['outcome_infty'] = 'ERROR: CallMatching' + replace['errormsg_infty'] = 'ERROR: CallMatching' replace['operation1a'] = gen.operation[r]("2") replace['fini1a'] = gen.fini[r]("2") replace['operation2a'] = gen.operation[s]("1") @@ -179,10 +187,19 @@ for s in gen.send + gen.isend: replace = patterns.copy() replace['shortdesc'] = 'Point to point @{s}@ and @{r}@ are correctly matched' replace['longdesc'] = 'Process 0 calls @{s}@ and process 1 calls @{r}@.' 
- replace['outcome1'] = 'OK' - replace['errormsg1'] = 'OK' - replace['fini1a'] = gen.fini[s]("1") - replace['fini2a'] = gen.fini[r]("2") + replace['outcome_zerob'] = 'OK' + replace['errormsg_zerob'] = 'OK' + replace['outcome_infty'] = 'OK' + replace['errormsg_infty'] = 'OK' + patterns['init1'] = gen.init[s]("1") replace['operation1a'] = gen.operation[s]("1") - replace['operation2a'] = gen.operation[r]("2") + replace['fini1a'] = gen.fini[s]("1") + replace['operation2a'] = '' + replace['fini2a'] = '' + + patterns['init2'] = gen.init[r]("2") + replace['operation1b'] = gen.operation[r]("2") + replace['fini1b'] = gen.fini[r]("2") + replace['operation2b'] = '' + replace['fini2b'] = '' gen.make_file(template, f'P2PCallMatching_{s}_{r}_{r}_{s}_ok.c', replace) diff --git a/teshsuite/smpi/MBI/P2PSendrecvArgGenerator.py b/teshsuite/smpi/MBI/P2PSendrecvArgGenerator.py index 8ed991fed7..ef6fb16e42 100644 --- a/teshsuite/smpi/MBI/P2PSendrecvArgGenerator.py +++ b/teshsuite/smpi/MBI/P2PSendrecvArgGenerator.py @@ -126,7 +126,7 @@ for s in gen.send: # Generate a code with non distinct buffer replace = patterns.copy() replace['shortdesc'] = 'Invalid buffer on Sendrecv function.' - replace['longdesc'] = 'Invalid buffer on Sendrecv, the tow buffers must be distinct.' + replace['longdesc'] = 'Invalid buffer on Sendrecv, the two buffers must be distinct.' replace['outcome'] = 'ERROR: InvalidBuffer' replace['errormsg'] = '@{sr}@ at @{filename}@:@{line:MBIERROR}@ send buffer and recv buffer must be distinct.' 
replace['change_arg'] = gen.write[sr]("2") diff --git a/teshsuite/smpi/MBI/generator_utils.py b/teshsuite/smpi/MBI/generator_utils.py index 3801ae271b..af5750bbd5 100644 --- a/teshsuite/smpi/MBI/generator_utils.py +++ b/teshsuite/smpi/MBI/generator_utils.py @@ -358,7 +358,7 @@ start['MPI_Sendrecv'] = lambda n: "" operation['MPI_Sendrecv'] = lambda n: f'MPI_Sendrecv(psbuf{n}, buff_size, type, dest, stag, prbuf{n}, buff_size, type, src, rtag, newcom, &sta{n});' fini['MPI_Sendrecv'] = lambda n: "" free['MPI_Sendrecv'] = lambda n: "" -write['MPI_Sendrecv'] = lambda n: f"prbuf{n} = &sbuf{n}[2];" +write['MPI_Sendrecv'] = lambda n: f"prbuf{n} = &sbuf{n}[0];" ### P2P:nonblocking diff --git a/tools/cmake/Distrib.cmake b/tools/cmake/Distrib.cmake index 4f0c8c7d7e..50952960ff 100644 --- a/tools/cmake/Distrib.cmake +++ b/tools/cmake/Distrib.cmake @@ -43,6 +43,9 @@ add_custom_target(simgrid_convert_TI_traces ALL # libraries install(TARGETS simgrid DESTINATION ${CMAKE_INSTALL_LIBDIR}/) +if("${CMAKE_SYSTEM}" MATCHES "Linux") + install(TARGETS sthread DESTINATION ${CMAKE_INSTALL_LIBDIR}/) +endif() # pkg-config files configure_file("${CMAKE_HOME_DIRECTORY}/tools/pkg-config/simgrid.pc.in" diff --git a/tools/cmake/MakeLib.cmake b/tools/cmake/MakeLib.cmake index f41f3b70af..e0357bf6e4 100644 --- a/tools/cmake/MakeLib.cmake +++ b/tools/cmake/MakeLib.cmake @@ -29,6 +29,7 @@ add_dependencies(simgrid maintainer_files) if("${CMAKE_SYSTEM}" MATCHES "Linux") add_library(sthread SHARED ${STHREAD_SRC}) + set_target_properties(sthread PROPERTIES VERSION ${libsimgrid_version}) set_property(TARGET sthread APPEND PROPERTY INCLUDE_DIRECTORIES "${INTERNAL_INCLUDES}") target_link_libraries(sthread simgrid) diff --git a/tools/jenkins/build.sh b/tools/jenkins/build.sh index a9d9d64f6b..050c850ead 100755 --- a/tools/jenkins/build.sh +++ b/tools/jenkins/build.sh @@ -129,6 +129,13 @@ echo "Branch built is $branch_name" NUMBER_OF_PROCESSORS="$(nproc)" || NUMBER_OF_PROCESSORS=1 GENERATOR="Unix Makefiles" 
+BUILDER=make +VERBOSE_BUILD="VERBOSE=1" +if which ninja 2>/dev/null >/dev/null ; then + GENERATOR=Ninja + BUILDER=ninja + VERBOSE_BUILD="-v" +fi ulimit -c 0 || true @@ -169,7 +176,7 @@ echo "XX pwd: $(pwd)" echo "XX" cmake -G"$GENERATOR" -Denable_documentation=OFF "$WORKSPACE" -make dist -j $NUMBER_OF_PROCESSORS +${BUILDER} dist -j $NUMBER_OF_PROCESSORS SIMGRID_VERSION=$(cat VERSION) echo "XX" @@ -217,7 +224,7 @@ cmake -G"$GENERATOR" ${INSTALL:+-DCMAKE_INSTALL_PREFIX=$INSTALL} \ -DCMAKE_CXX_COMPILER_LAUNCHER=ccache \ "$SRCFOLDER" -make -j $NUMBER_OF_PROCESSORS VERBOSE=1 tests +${BUILDER} -j $NUMBER_OF_PROCESSORS ${VERBOSE_BUILD} tests echo "XX" echo "XX Run the tests" @@ -233,7 +240,7 @@ if test -n "$INSTALL" && [ "${branch_name}" = "origin/master" ] ; then rm -rf "$INSTALL" - make install + ${BUILDER} install fi echo "XX" diff --git a/tools/jenkins/ci-bigdft.sh b/tools/jenkins/ci-bigdft.sh index 29673a7af0..6833428c6c 100755 --- a/tools/jenkins/ci-bigdft.sh +++ b/tools/jenkins/ci-bigdft.sh @@ -22,6 +22,10 @@ export PATH=$PWD/simgrid-dev/smpi_script/bin/:$PATH export LD_LIBRARY_PATH=$PWD/simgrid-dev/lib/:$LD_LIBRARY_PATH export JHBUILD_RUN_AS_ROOT=1 +#workaround issue with ntpoly 3.0.0 +sed -i 's|repository type="tarball" name="ntpoly" href="https://github.com/william-dawson/NTPoly/archive/"|repository type="git" name="ntpoly" href="https://github.com/william-dawson/"|' ../modulesets/hpc-upstream.modules +sed -i 's|module="ntpoly-v3.0.0.tar.gz"|module="ntpoly"|' ../modulesets/hpc-upstream.modules + ../Installer.py autogen -y ../Installer.py -f ../../tools/jenkins/gfortran-simgrid.rc -y build diff --git a/tools/jenkins/ci-starpu.sh b/tools/jenkins/ci-starpu.sh index f435e01c26..73864b4426 100755 --- a/tools/jenkins/ci-starpu.sh +++ b/tools/jenkins/ci-starpu.sh @@ -12,12 +12,12 @@ echo "XXXXXXXXXXXXXXXX Install APT dependencies" $SUDO apt-get update $SUDO apt-get -y install build-essential libboost-all-dev wget git xsltproc -for i in master 1.3 ; do +for i in master 
starpu-1.3 ; do echo "XXXXXXXXXXXXXXXX Build and test StarPU $i" rm -rf starpu* - wget https://files.inria.fr/starpu/simgrid/starpu-simgrid-$i.tar.gz - md5sum starpu-simgrid-$i.tar.gz - tar xf starpu-simgrid-$i.tar.gz + wget https://files.inria.fr/starpu/testing/$i/starpu-nightly-latest.tar.gz + md5sum starpu-nightly-latest.tar.gz + tar xf starpu-nightly-latest.tar.gz cd starpu-1* # NOTE: Do *not* introduce parameters to "make it work" here. diff --git a/tools/jenkins/project_description.sh b/tools/jenkins/project_description.sh index 4a52a6fc65..8af5e8d804 100755 --- a/tools/jenkins/project_description.sh +++ b/tools/jenkins/project_description.sh @@ -26,7 +26,7 @@ get_json(){ } get_ns3(){ - sed -n 's/.*-- ns-3 found (v\(3[-.0-9a-z]\+\); minor:.*/\1/p;T;q' ./consoleText + sed -n 's/.*-- ns-3 found (v\(3[-.0-9a-z]\+\).*/\1/p;T;q' ./consoleText } get_python(){ diff --git a/tools/tesh/catch-signal.tesh b/tools/tesh/catch-signal.tesh index 2799b4f0d9..42fede22bc 100644 --- a/tools/tesh/catch-signal.tesh +++ b/tools/tesh/catch-signal.tesh @@ -19,7 +19,7 @@ $ perl segfault.pl p Check that we return the expected return value on SEGV ! expect return 11 < $ perl segfault.pl -$ ${bindir:=.}/tesh +$ ${bindir:=.}/tesh --no-auto-valgrind > Test suite from stdin > [(stdin):1] perl segfault.pl > Test suite `(stdin)': NOK (<(stdin):1> got signal SIGSEGV) diff --git a/tools/tesh/catch-timeout-output.tesh b/tools/tesh/catch-timeout-output.tesh index 227d93c77a..e308580806 100644 --- a/tools/tesh/catch-timeout-output.tesh +++ b/tools/tesh/catch-timeout-output.tesh @@ -14,6 +14,7 @@ > @@ -0,0 +1 @@ > +I crashed > Test suite `(stdin)': NOK (<(stdin):2> output mismatch) +> In addition, <(stdin):2> got signal SIGTERM. 
$ ${bindir:=.}/tesh diff --git a/tools/tesh/tesh.py b/tools/tesh/tesh.py index 2baa1b31df..a0e96d5072 100755 --- a/tools/tesh/tesh.py +++ b/tools/tesh/tesh.py @@ -196,6 +196,7 @@ class TeshState(Singleton): self.args_suffix = "" self.ignore_regexps_common = [] self.jenkins = False # not a Jenkins run by default + self.auto_valgrind = True self.timeout = 10 # default value: 10 sec self.wrapper = None self.keep = False @@ -237,6 +238,7 @@ class Cmd: self.output_display = False self.sort = -1 + self.rerun_with_valgrind = False self.ignore_regexps = TeshState().ignore_regexps_common @@ -302,6 +304,14 @@ class Cmd: _thread.start_new_thread(Cmd._run, (self, lock)) else: self._run() + if self.rerun_with_valgrind and TeshState().auto_valgrind: + print('\n\n\nXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX') + print( 'XXXXXXXXX Rerunning this test with valgrind to help debugging it XXXXXXXXX') + print( 'XXXXXXXX (this will fail if valgrind is not installed, of course) XXXXXXXX') + print( 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\n\n\n') + + self.args = "valgrind " + self.args + self._run() return True def _run(self, lock=None): @@ -411,13 +421,17 @@ class Cmd: print('\n'.join(logs)) return - if self.output_display: - logs.append(str(stdout_data)) - # remove text colors ansi_escape = re.compile(r'\x1b[^m]*m') stdout_data = ansi_escape.sub('', stdout_data) + if self.output_display: + logs.append(str(stdout_data)) + + if self.rerun_with_valgrind: + print(str(stdout_data), file=sys.stderr) + return + if self.ignore_output: logs.append("(ignoring the output of <{cmd}> as requested)".format(cmd=cmd_name)) else: @@ -461,6 +475,18 @@ class Cmd: logs.append("Test suite `{file}': NOK (<{cmd}> output mismatch)".format( file=FileReader().filename, cmd=cmd_name)) + + # Also report any failed return code and/or signal we got in case of output mismatch + if not proc.returncode in self.expect_return: + if proc.returncode >= 0: + 
logs.append("In addition, <{cmd}> returned code {code}.".format( + cmd=cmd_name, code=proc.returncode)) + else: + logs.append("In addition, <{cmd}> got signal {sig}.".format(cmd=cmd_name, + sig=SIGNALS_TO_NAMES_DICT[-proc.returncode])) + if proc.returncode == -signal.SIGSEGV: + self.rerun_with_valgrind = True + if lock is not None: lock.release() if TeshState().keep: @@ -495,6 +521,10 @@ class Cmd: logs.append("Test suite `{file}': NOK (<{cmd}> got signal {sig})".format( file=FileReader().filename, cmd=cmd_name, sig=SIGNALS_TO_NAMES_DICT[-proc.returncode])) + + if proc.returncode == -signal.SIGSEGV: + self.rerun_with_valgrind = True + if lock is not None: lock.release() TeshState().set_return_code(max(-proc.returncode, 1)) @@ -535,6 +565,10 @@ def main(): '--ignore-jenkins', action='store_true', help='ignore all cruft generated on SimGrid continuous integration servers') + group1.add_argument( + '--no-auto-valgrind', + action='store_true', + help='do not automaticall launch segfaulting commands in valgrind') group1.add_argument('--wrapper', metavar='arg', help='Run each command in the provided wrapper (eg valgrind)') group1.add_argument( '--keep', @@ -568,6 +602,9 @@ def main(): ] TeshState().jenkins = True # This is a Jenkins build + if options.no_auto_valgrind: + TeshState().auto_valgrind = False + if options.teshfile is None: file = FileReader(None) print("Test suite from stdin")