Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'plugins-energy-battery-interaction' into 'master'
author Martin Quinson <martin.quinson@ens-rennes.fr>
Wed, 13 Sep 2023 13:44:11 +0000 (13:44 +0000)
committer Martin Quinson <martin.quinson@ens-rennes.fr>
Wed, 13 Sep 2023 13:44:11 +0000 (13:44 +0000)
Battery plugin revamp

See merge request simgrid/simgrid!168

127 files changed:
BuildSimGrid.sh
CMakeLists.txt
ChangeLog
MANIFEST.in
docs/source/Installing_SimGrid.rst
docs/source/Models.rst
docs/source/Platform_routing.rst
examples/c/CMakeLists.txt
examples/c/activityset-testany/activityset-testany.c [new file with mode: 0644]
examples/c/activityset-testany/activityset-testany.tesh [new file with mode: 0644]
examples/c/activityset-waitall/activityset-waitall.c [new file with mode: 0644]
examples/c/activityset-waitall/activityset-waitall.tesh [new file with mode: 0644]
examples/c/activityset-waitallfor/activityset-waitallfor.c [new file with mode: 0644]
examples/c/activityset-waitallfor/activityset-waitallfor.tesh [new file with mode: 0644]
examples/c/activityset-waitany/activityset-waitany.c [new file with mode: 0644]
examples/c/activityset-waitany/activityset-waitany.tesh [new file with mode: 0644]
examples/c/app-chainsend/broadcaster.c
examples/c/app-chainsend/chainsend.h
examples/c/app-chainsend/peer.c
examples/c/comm-waitall/comm-waitall.c [deleted file]
examples/c/comm-waitall/comm-waitall.tesh [deleted file]
examples/c/comm-waitall/comm-waitall_d.xml [deleted file]
examples/c/comm-waitany/comm-waitany.c [deleted file]
examples/c/comm-waitany/comm-waitany.tesh [deleted file]
examples/c/comm-waitany/comm-waitany_d.xml [deleted file]
examples/c/exec-waitany/exec-waitany.c [deleted file]
examples/c/exec-waitany/exec-waitany.tesh [deleted file]
examples/cpp/CMakeLists.txt
examples/cpp/activityset-waitall/s4u-activityset-waitall.cpp
examples/cpp/activityset-waitall/s4u-activityset-waitall.tesh
examples/cpp/activityset-waitallfor/s4u-activityset-waitallfor.cpp
examples/cpp/activityset-waitallfor/s4u-activityset-waitallfor.tesh
examples/cpp/activityset-waitany/s4u-activityset-waitany.cpp
examples/cpp/activityset-waitany/s4u-activityset-waitany.tesh
examples/cpp/app-chainsend/s4u-app-chainsend.cpp
examples/cpp/clusters-multicpu/s4u-clusters-multicpu.cpp
examples/cpp/comm-failure/s4u-comm-failure.cpp
examples/cpp/comm-failure/s4u-comm-failure.tesh
examples/cpp/comm-ready/s4u-comm-ready.cpp
examples/cpp/energy-link/s4u-energy-link.cpp
examples/cpp/exec-dependent/s4u-exec-dependent.cpp
examples/cpp/io-dependent/s4u-io-dependent.cpp
examples/cpp/network-factors/s4u-network-factors.cpp
examples/cpp/network-nonlinear/s4u-network-nonlinear.cpp
examples/cpp/platform-comm-serialize/s4u-platform-comm-serialize.cpp
examples/cpp/platform-failures/s4u-platform-failures.cpp
examples/cpp/plugin-jbod/s4u-plugin-jbod.cpp [new file with mode: 0644]
examples/cpp/plugin-jbod/s4u-plugin-jbod.tesh [new file with mode: 0644]
examples/cpp/plugin-prodcons/s4u-plugin-prodcons.cpp
examples/python/CMakeLists.txt
examples/python/activityset-testany/activityset-testany.py [new file with mode: 0644]
examples/python/activityset-testany/activityset-testany.tesh [new file with mode: 0644]
examples/python/activityset-waitall/activityset-waitall.py [new file with mode: 0644]
examples/python/activityset-waitall/activityset-waitall.tesh [new file with mode: 0644]
examples/python/activityset-waitallfor/activityset-waitallfor.py [new file with mode: 0644]
examples/python/activityset-waitallfor/activityset-waitallfor.tesh [new file with mode: 0644]
examples/python/activityset-waitany/activityset-waitany.py [new file with mode: 0644]
examples/python/activityset-waitany/activityset-waitany.tesh [new file with mode: 0644]
examples/python/clusters-multicpu/clusters-multicpu.py
examples/python/comm-failure/comm-failure.py
examples/python/comm-failure/comm-failure.tesh
examples/python/comm-ready/comm-ready.py
examples/python/comm-testany/comm-testany.py [deleted file]
examples/python/comm-testany/comm-testany.tesh [deleted file]
examples/python/comm-waitall/comm-waitall.py [deleted file]
examples/python/comm-waitall/comm-waitall.tesh [deleted file]
examples/python/comm-waitallfor/comm-waitallfor.py [deleted file]
examples/python/comm-waitallfor/comm-waitallfor.tesh [deleted file]
examples/python/comm-waitany/comm-waitany.py [deleted file]
examples/python/comm-waitany/comm-waitany.tesh [deleted file]
examples/python/network-nonlinear/network-nonlinear.py
examples/python/platform-comm-serialize/platform-comm-serialize.py
examples/python/plugin-host-load/plugin-host-load.py
examples/python/task-switch-host/task-switch-host.py
examples/smpi/comm_dynamic_costs/comm-dynamic-cost.cpp
examples/smpi/smpi_s4u_masterworker/masterworker_mailbox_smpi.cpp
examples/smpi/smpi_s4u_masterworker/s4u_smpi.tesh
include/simgrid/activity_set.h [new file with mode: 0644]
include/simgrid/comm.h
include/simgrid/exec.h
include/simgrid/forward.h
include/simgrid/plugins/jbod.hpp [new file with mode: 0644]
include/simgrid/s4u/Activity.hpp
include/simgrid/s4u/ActivitySet.hpp
include/simgrid/s4u/Comm.hpp
include/simgrid/s4u/Exec.hpp
include/simgrid/s4u/Io.hpp
include/simgrid/s4u/Mailbox.hpp
include/simgrid/s4u/NetZone.hpp
include/smpi/smpi.h
src/bindings/python/simgrid_python.cpp
src/kernel/activity/CommImpl.cpp
src/kernel/activity/CommImpl.hpp
src/kernel/actor/CommObserver.hpp
src/kernel/xml/sg_platf.cpp
src/mc/api/strategy/UniformStrategy.hpp
src/plugins/jbod.cpp [new file with mode: 0644]
src/s4u/s4u_Activity.cpp
src/s4u/s4u_ActivitySet.cpp
src/s4u/s4u_Comm.cpp
src/s4u/s4u_Exec.cpp
src/s4u/s4u_Io.cpp
src/s4u/s4u_Mailbox.cpp
src/s4u/s4u_Netzone.cpp
src/smpi/bindings/smpi_pmpi_comm.cpp
src/smpi/include/private.hpp
src/smpi/internals/smpi_actor.cpp
src/smpi/internals/smpi_deployment.cpp
teshsuite/models/cm02-set-lat-bw/cm02-set-lat-bw.cpp
teshsuite/models/cm02-tcpgamma/cm02-tcpgamma.cpp
teshsuite/models/issue105/issue105.cpp
teshsuite/models/ptask-subflows/ptask-subflows.cpp
teshsuite/python/platform-mix/platform-mix.py
teshsuite/s4u/CMakeLists.txt
teshsuite/s4u/activity-lifecycle/testing_comm.cpp
teshsuite/s4u/activity-lifecycle/testing_test-wait.cpp
teshsuite/s4u/comm-fault-scenarios/comm-fault-scenarios.cpp
teshsuite/s4u/monkey-masterworkers/monkey-masterworkers.cpp
teshsuite/s4u/monkey-masterworkers/monkey-masterworkers.py
teshsuite/s4u/monkey-semaphore/monkey-semaphore.cpp
teshsuite/s4u/seal-platform/seal-platform.cpp
teshsuite/s4u/wait-all-for/wait-all-for.cpp [deleted file]
teshsuite/s4u/wait-all-for/wait-all-for.tesh [deleted file]
teshsuite/s4u/wait-any-for/wait-any-for.cpp [deleted file]
teshsuite/s4u/wait-any-for/wait-any-for.tesh [deleted file]
tools/cmake/DefinePackages.cmake
tools/jenkins/build.sh

index 773ca48..66b309e 100755 (executable)
@@ -15,7 +15,7 @@ if [ ! -e Makefile ] && [ ! -e build.ninja ]; then
   fi
 fi
 
-target=tests
+target=examples
 ncores=$(grep -c processor /proc/cpuinfo)
 
 install_path=$(sed -n 's/^CMAKE_INSTALL_PREFIX:PATH=//p' CMakeCache.txt)
index 545604e..1a4a245 100644 (file)
@@ -757,6 +757,8 @@ add_custom_target(tests-mc COMMENT "Recompiling the MC tests and tools.")
 add_dependencies(tests tests-mc)
 add_custom_target(tests-ns3 COMMENT "Recompiling the ns3 tests and tools.")
 add_dependencies(tests tests-ns3)
+add_custom_target(examples COMMENT "Recompiling all examples")
+add_dependencies(examples tests)
 
 ### Build some Maintainer files
 include(${CMAKE_HOME_DIRECTORY}/tools/cmake/MaintainerMode.cmake)
index 6e5f715..b35e98a 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -2,9 +2,27 @@ SimGrid (3.34.1) not released (Target: fall 2023)
 
 S4U:
  - New class ActivitySet to ease wait_any()/test_any()/wait_all()
+   - Deprecate {Comm,Io,Exec}::{wait_any,wait_all,test_any} and friends
+ - New function NetZone::add_route(host1, host2, links) when you don't need gateways
+   Also add a variant with s4u::Link, when you don't want to specify the directions
+   on symmetric routes.
+ - Introduce a Mailbox::get_async() with no payload parameter. You can use the new 
+   Comm::get_payload() once the communication is over to retrieve the payload.
+
+SMPI:
+ - New SMPI_app_instance_join(): wait for the completion of a started MPI instance
+ - MPI_UNIVERSE_SIZE is now initialized to the total number of hosts in the platform
 
 Python:
  - Make the host_load plugin available from Python. See examples/python/plugin-host-load
+ - Mailbox::get_async() does not return a pair anymore. Use comm.get_payload() instead.
+ - Comm::waitall/waitany/testany() are gone. Please use ActivitySet() instead.
+ - Comm::waitallfor() is gone too. Its semantics were unclear on timeout anyway.
+ - Io::waitany() and waitanyfor() are gone. Please use ActivitySet() instead.
+
+C API:
+ - Introduce sg_activity_set_t and deprecate wait_all/wait_any/test_any for
+   Exec, Io and Comm.
 
 ----------------------------------------------------------------------------
 
index 85b0de1..e0c96c2 100644 (file)
@@ -1,6 +1,14 @@
 # This file lists the content of the python source package
 # Prepared in tools/cmake/Distrib.cmake
 
+include examples/c/activityset-testany/activityset-testany.c
+include examples/c/activityset-testany/activityset-testany.tesh
+include examples/c/activityset-waitall/activityset-waitall.c
+include examples/c/activityset-waitall/activityset-waitall.tesh
+include examples/c/activityset-waitallfor/activityset-waitallfor.c
+include examples/c/activityset-waitallfor/activityset-waitallfor.tesh
+include examples/c/activityset-waitany/activityset-waitany.c
+include examples/c/activityset-waitany/activityset-waitany.tesh
 include examples/c/actor-create/actor-create.c
 include examples/c/actor-create/actor-create.tesh
 include examples/c/actor-create/actor-create_d.xml
@@ -65,12 +73,6 @@ include examples/c/comm-wait/comm-wait2_d.xml
 include examples/c/comm-wait/comm-wait3_d.xml
 include examples/c/comm-wait/comm-wait4_d.xml
 include examples/c/comm-wait/comm-wait_d.xml
-include examples/c/comm-waitall/comm-waitall.c
-include examples/c/comm-waitall/comm-waitall.tesh
-include examples/c/comm-waitall/comm-waitall_d.xml
-include examples/c/comm-waitany/comm-waitany.c
-include examples/c/comm-waitany/comm-waitany.tesh
-include examples/c/comm-waitany/comm-waitany_d.xml
 include examples/c/dht-kademlia/answer.c
 include examples/c/dht-kademlia/answer.h
 include examples/c/dht-kademlia/common.h
@@ -102,8 +104,6 @@ include examples/c/exec-dvfs/exec-dvfs.c
 include examples/c/exec-dvfs/exec-dvfs.tesh
 include examples/c/exec-remote/exec-remote.c
 include examples/c/exec-remote/exec-remote.tesh
-include examples/c/exec-waitany/exec-waitany.c
-include examples/c/exec-waitany/exec-waitany.tesh
 include examples/c/io-disk-raw/io-disk-raw.c
 include examples/c/io-disk-raw/io-disk-raw.tesh
 include examples/c/io-file-remote/io-file-remote.c
@@ -359,6 +359,8 @@ include examples/cpp/platform-properties/s4u-platform-properties.cpp
 include examples/cpp/platform-properties/s4u-platform-properties.tesh
 include examples/cpp/plugin-host-load/s4u-plugin-host-load.cpp
 include examples/cpp/plugin-host-load/s4u-plugin-host-load.tesh
+include examples/cpp/plugin-jbod/s4u-plugin-jbod.cpp
+include examples/cpp/plugin-jbod/s4u-plugin-jbod.tesh
 include examples/cpp/plugin-link-load/s4u-plugin-link-load.cpp
 include examples/cpp/plugin-link-load/s4u-plugin-link-load.tesh
 include examples/cpp/plugin-prodcons/s4u-plugin-prodcons.cpp
@@ -414,6 +416,14 @@ include examples/cpp/trace-process-migration/s4u-trace-process-migration.cpp
 include examples/cpp/trace-process-migration/s4u-trace-process-migration.tesh
 include examples/cpp/trace-route-user-variables/s4u-trace-route-user-variables.cpp
 include examples/cpp/trace-route-user-variables/s4u-trace-route-user-variables.tesh
+include examples/python/activityset-testany/activityset-testany.py
+include examples/python/activityset-testany/activityset-testany.tesh
+include examples/python/activityset-waitall/activityset-waitall.py
+include examples/python/activityset-waitall/activityset-waitall.tesh
+include examples/python/activityset-waitallfor/activityset-waitallfor.py
+include examples/python/activityset-waitallfor/activityset-waitallfor.tesh
+include examples/python/activityset-waitany/activityset-waitany.py
+include examples/python/activityset-waitany/activityset-waitany.tesh
 include examples/python/actor-create/actor-create.py
 include examples/python/actor-create/actor-create.tesh
 include examples/python/actor-daemon/actor-daemon.py
@@ -444,18 +454,10 @@ include examples/python/comm-ready/comm-ready.py
 include examples/python/comm-ready/comm-ready.tesh
 include examples/python/comm-suspend/comm-suspend.py
 include examples/python/comm-suspend/comm-suspend.tesh
-include examples/python/comm-testany/comm-testany.py
-include examples/python/comm-testany/comm-testany.tesh
 include examples/python/comm-throttling/comm-throttling.py
 include examples/python/comm-throttling/comm-throttling.tesh
 include examples/python/comm-wait/comm-wait.py
 include examples/python/comm-wait/comm-wait.tesh
-include examples/python/comm-waitall/comm-waitall.py
-include examples/python/comm-waitall/comm-waitall.tesh
-include examples/python/comm-waitallfor/comm-waitallfor.py
-include examples/python/comm-waitallfor/comm-waitallfor.tesh
-include examples/python/comm-waitany/comm-waitany.py
-include examples/python/comm-waitany/comm-waitany.tesh
 include examples/python/comm-waituntil/comm-waituntil.py
 include examples/python/comm-waituntil/comm-waituntil.tesh
 include examples/python/exec-async/exec-async.py
@@ -824,10 +826,6 @@ include teshsuite/s4u/vm-live-migration/vm-live-migration.cpp
 include teshsuite/s4u/vm-live-migration/vm-live-migration.tesh
 include teshsuite/s4u/vm-suicide/vm-suicide.cpp
 include teshsuite/s4u/vm-suicide/vm-suicide.tesh
-include teshsuite/s4u/wait-all-for/wait-all-for.cpp
-include teshsuite/s4u/wait-all-for/wait-all-for.tesh
-include teshsuite/s4u/wait-any-for/wait-any-for.cpp
-include teshsuite/s4u/wait-any-for/wait-any-for.tesh
 include teshsuite/smpi/MBI/CollArgGenerator.py
 include teshsuite/smpi/MBI/CollComGenerator.py
 include teshsuite/smpi/MBI/CollLocalConcurrencyGenerator.py
@@ -1901,6 +1899,7 @@ include examples/smpi/replay_multiple_manual_deploy/CMakeLists.txt
 include examples/smpi/smpi_s4u_masterworker/CMakeLists.txt
 include examples/sthread/CMakeLists.txt
 include include/simgrid/Exception.hpp
+include include/simgrid/activity_set.h
 include include/simgrid/actor.h
 include include/simgrid/barrier.h
 include include/simgrid/chrono.hpp
@@ -1940,6 +1939,7 @@ include include/simgrid/plugins/battery.hpp
 include include/simgrid/plugins/dvfs.h
 include include/simgrid/plugins/energy.h
 include include/simgrid/plugins/file_system.h
+include include/simgrid/plugins/jbod.hpp
 include include/simgrid/plugins/live_migration.h
 include include/simgrid/plugins/load.h
 include include/simgrid/plugins/ns3.hpp
@@ -2316,6 +2316,7 @@ include src/plugins/file_system/s4u_FileSystem.cpp
 include src/plugins/host_dvfs.cpp
 include src/plugins/host_energy.cpp
 include src/plugins/host_load.cpp
+include src/plugins/jbod.cpp
 include src/plugins/link_energy.cpp
 include src/plugins/link_energy_wifi.cpp
 include src/plugins/link_load.cpp
index 7cd2404..477b53e 100644 (file)
@@ -325,7 +325,7 @@ existing targets are not really for public consumption so don't worry
 if some do not work for you.
 
 - **make**: Build the core of SimGrid that gets installed, but not any example.
-- **make tests**: Build the tests and examples.
+- **make examples**: Build the examples, which are needed by the tests.
 - **make simgrid**: Build only the SimGrid library. Not any example nor the helper tools.
 - **make s4u-comm-pingpong**: Build only this example (works for any example)
 - **make python-bindings**: Build the Python bindings
index 229d61c..e8ee901 100644 (file)
@@ -125,7 +125,7 @@ small delay that corresponds to the end-to-end latency. During that time, the co
 communications are not slowed down, because there is no contention yet).
 
 As an alternative to the above LMM-based models, it is possible to use the :ref:`ns-3 simulator as a network model <models_ns3>`. ns-3 performs
-a mushc more detailed, packet-level simulation 
+a much more detailed, packet-level simulation 
 than the above models. As a result it is much slower but will produce more accurate results. 
 Both simulators have time complexity that is linear in the size of their input, but ns-3 has a much larger input in case of large communications
 because it considers individual network packets. 
index 6cc6d75..fe5a7c4 100644 (file)
@@ -110,11 +110,12 @@ Vivaldi
 =======
 
 This routing model is particularly well adapted to Peer-to-Peer and Clouds platforms: each component is connected to the
-cloud through a private link of which the upload and download rate may be asymmetric.
+cloud through a private link whose upload and download rates may be asymmetric.
 
-The network core (between the private links) is assumed to be over-sized so only the latency is taken into account.
-Instead of a matrix of latencies that would become too large when the amount of peers grows, Vivaldi netzones give a
-coordinate to each peer and compute the latency between host A=(xA,yA,zA) and host B=(xB,yB,zB) as follows:
+The network core (between the private links) is assumed to be over-provisioned so that only the latency has to be
+taken into account. Instead of a matrix of latencies that would become too large when the number of peers grows,
+Vivaldi netzones give a coordinate to each peer and compute the latency between host A=(xA,yA,zA) and host B=(xB,yB,zB)
+as follows:
 
   latency = sqrt( (xA-xB)² + (yA-yB)² ) + zA + zB
 
index 25cccf5..3a9d3a6 100644 (file)
@@ -4,11 +4,12 @@
 foreach(x
         actor-create actor-daemon actor-exiting actor-join actor-kill actor-lifetime actor-migrate actor-stacksize
         actor-suspend actor-yield
+        activityset-testany activityset-waitall activityset-waitallfor activityset-waitany
         app-masterworker app-token-ring
-        comm-pingpong comm-wait comm-waitall comm-waitany
+        comm-pingpong comm-wait
         cloud-capping cloud-masterworker cloud-migration cloud-simple
         dht-pastry
-        exec-async exec-basic exec-dvfs exec-remote exec-waitany
+        exec-async exec-basic exec-dvfs exec-remote
         energy-exec energy-exec-ptask energy-vm
         io-disk-raw io-file-remote io-file-system
         platform-failures platform-properties
@@ -85,8 +86,6 @@ set(xml_files     ${xml_files} ${CMAKE_CURRENT_SOURCE_DIR}/actor-create/actor-cr
                                ${CMAKE_CURRENT_SOURCE_DIR}/comm-wait/comm-wait2_d.xml
                                ${CMAKE_CURRENT_SOURCE_DIR}/comm-wait/comm-wait3_d.xml
                                ${CMAKE_CURRENT_SOURCE_DIR}/comm-wait/comm-wait4_d.xml
-                               ${CMAKE_CURRENT_SOURCE_DIR}/comm-waitall/comm-waitall_d.xml
-                               ${CMAKE_CURRENT_SOURCE_DIR}/comm-waitany/comm-waitany_d.xml
                                ${CMAKE_CURRENT_SOURCE_DIR}/dht-kademlia/dht-kademlia_d.xml
                                ${CMAKE_CURRENT_SOURCE_DIR}/dht-pastry/dht-pastry_d.xml
                                ${CMAKE_CURRENT_SOURCE_DIR}/io-file-remote/io-file-remote_d.xml
@@ -96,11 +95,12 @@ set(xml_files     ${xml_files} ${CMAKE_CURRENT_SOURCE_DIR}/actor-create/actor-cr
 foreach(x
         actor-create actor-daemon actor-exiting actor-join actor-kill actor-lifetime actor-migrate actor-stacksize
         actor-suspend actor-yield
+        activityset-testany activityset-waitall activityset-waitallfor activityset-waitany
         app-bittorrent app-chainsend app-masterworker app-token-ring
-        comm-pingpong comm-wait comm-waitall comm-waitany
+        comm-pingpong comm-wait
         cloud-capping  cloud-masterworker cloud-migration cloud-simple
         dht-kademlia dht-pastry
-        exec-async exec-basic exec-dvfs exec-remote exec-waitany
+        exec-async exec-basic exec-dvfs exec-remote
         energy-exec energy-exec-ptask energy-vm
         io-disk-raw io-file-remote io-file-system
         platform-failures platform-properties
diff --git a/examples/c/activityset-testany/activityset-testany.c b/examples/c/activityset-testany/activityset-testany.c
new file mode 100644 (file)
index 0000000..370e684
--- /dev/null
@@ -0,0 +1,76 @@
+/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/activity_set.h"
+#include "simgrid/actor.h"
+#include "simgrid/comm.h"
+#include "simgrid/engine.h"
+#include "simgrid/exec.h"
+#include "simgrid/host.h"
+#include "simgrid/mailbox.h"
+
+#include "xbt/log.h"
+#include "xbt/sysdep.h"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_testany, "Messages specific for this s4u example");
+
+/* Actor code for "bob": starts an execution and an asynchronous receive on mailbox "mbox", puts both in an
+ * activity set, sleeps for 1, then polls the set with sg_activity_set_test_any() until it is empty, sleeping .5
+ * after each unsuccessful poll.
+ *
+ * argc and argv are unused; the function is handed to sg_actor_create() in main(). */
+static void bob(int argc, char* argv[])
+{
+  XBT_INFO("Create my asynchronous activities");
+  sg_exec_t exec = sg_actor_exec_init(5e9);
+  sg_exec_start(exec);
+
+  sg_mailbox_t mbox = sg_mailbox_by_name("mbox");
+  void* payload     = NULL; /* handed to sg_mailbox_get_async(), freed at the end of this function */
+  sg_comm_t comm    = sg_mailbox_get_async(mbox, &payload);
+
+  sg_activity_set_t pending_activities = sg_activity_set_init();
+  sg_activity_set_push(pending_activities, (sg_activity_t)exec);
+  sg_activity_set_push(pending_activities, (sg_activity_t)comm);
+
+  XBT_INFO("Sleep_for a while");
+  sg_actor_sleep_for(1);
+
+  XBT_INFO("Test for completed activities");
+  /* The loop ends when the set is empty, so test_any() presumably removes what it returns -- TODO confirm */
+  while (!sg_activity_set_empty(pending_activities)) {
+    sg_activity_t completed_one = sg_activity_set_test_any(pending_activities);
+    if (completed_one != NULL) {
+      if (sg_comm_isinstance(completed_one))
+        XBT_INFO("Completed a Comm");
+      if (sg_exec_isinstance(completed_one))
+        XBT_INFO("Completed an Exec");
+    } else {
+      XBT_INFO("Nothing matches, test again in 0.5s");
+      sg_actor_sleep_for(.5);
+    }
+  }
+  XBT_INFO("Last activity is complete");
+  /* Release the set obtained from sg_activity_set_init(); nothing else in this function frees it */
+  sg_activity_set_delete(pending_activities);
+  free(payload);
+}
+
+/* Actor code for "alice": logs and then sends a freshly duplicated "Message" string on mailbox "mbox".
+ * argc and argv are unused. The duplicated string is not freed here. */
+static void alice(int argc, char* argv[])
+{
+  char* message = xbt_strdup("Message");
+  XBT_INFO("Send '%s'", message);
+
+  sg_mailbox_t mbox = sg_mailbox_by_name("mbox");
+  sg_mailbox_put(mbox, message, 6e8);
+}
+
+/* Entry point: initializes SimGrid, loads the platform file given as first argument, creates one actor on host
+ * "alice" and one on host "bob", then runs the simulation. */
+int main(int argc, char* argv[])
+{
+  simgrid_init(&argc, argv);
+
+  const char* progname = argv[0];
+  xbt_assert(argc > 1,
+             "Usage: %s platform_file\n"
+             "\tExample: %s hosts_with_disks.xml\n",
+             progname, progname);
+  simgrid_load_platform(argv[1]);
+
+  sg_host_t alice_host = sg_host_by_name("alice");
+  sg_actor_create("alice", alice_host, alice, 0, NULL);
+  sg_host_t bob_host = sg_host_by_name("bob");
+  sg_actor_create("bob", bob_host, bob, 0, NULL);
+
+  simgrid_run();
+  return 0;
+}
diff --git a/examples/c/activityset-testany/activityset-testany.tesh b/examples/c/activityset-testany/activityset-testany.tesh
new file mode 100644 (file)
index 0000000..6293fa3
--- /dev/null
@@ -0,0 +1,19 @@
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/c-activityset-testany ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%4.2r]%e[%5a]%e%m%n"
+> [0.00] [alice] Send 'Message'
+> [0.00] [  bob] Create my asynchronous activities
+> [0.00] [  bob] Sleep_for a while
+> [1.00] [  bob] Test for completed activities
+> [1.00] [  bob] Nothing matches, test again in 0.5s
+> [1.50] [  bob] Nothing matches, test again in 0.5s
+> [2.00] [  bob] Nothing matches, test again in 0.5s
+> [2.50] [  bob] Nothing matches, test again in 0.5s
+> [3.00] [  bob] Nothing matches, test again in 0.5s
+> [3.50] [  bob] Nothing matches, test again in 0.5s
+> [4.00] [  bob] Nothing matches, test again in 0.5s
+> [4.50] [  bob] Nothing matches, test again in 0.5s
+> [5.00] [  bob] Completed an Exec
+> [5.00] [  bob] Nothing matches, test again in 0.5s
+> [5.50] [  bob] Completed a Comm
+> [5.50] [  bob] Last activity is complete
diff --git a/examples/c/activityset-waitall/activityset-waitall.c b/examples/c/activityset-waitall/activityset-waitall.c
new file mode 100644 (file)
index 0000000..c63f091
--- /dev/null
@@ -0,0 +1,63 @@
+/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/activity_set.h"
+#include "simgrid/actor.h"
+#include "simgrid/comm.h"
+#include "simgrid/engine.h"
+#include "simgrid/exec.h"
+#include "simgrid/host.h"
+#include "simgrid/mailbox.h"
+
+#include "xbt/log.h"
+#include "xbt/sysdep.h"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_waitall, "Messages specific for this s4u example");
+
+/* Actor code for "bob": starts an execution and an asynchronous receive on mailbox "mbox", puts both in an
+ * activity set and blocks on the whole set with a single sg_activity_set_wait_all() call.
+ *
+ * argc and argv are unused; they are spelled out so that the function has a full prototype when it is handed to
+ * sg_actor_create() in main(). */
+static void bob(int argc, char* argv[])
+{
+  XBT_INFO("Create my asynchronous activities");
+  sg_exec_t exec = sg_actor_exec_init(5e9);
+  sg_exec_start(exec);
+
+  sg_mailbox_t mbox = sg_mailbox_by_name("mbox");
+  void* payload     = NULL; /* handed to sg_mailbox_get_async(), freed at the end of this function */
+  sg_comm_t comm    = sg_mailbox_get_async(mbox, &payload);
+
+  sg_activity_set_t pending_activities = sg_activity_set_init();
+  sg_activity_set_push(pending_activities, (sg_activity_t)exec);
+  sg_activity_set_push(pending_activities, (sg_activity_t)comm);
+
+  XBT_INFO("Wait for asynchronous activities to complete, all in one shot.");
+  sg_activity_set_wait_all(pending_activities);
+
+  XBT_INFO("All activities are completed.");
+  /* Release the set obtained from sg_activity_set_init(); nothing else in this function frees it */
+  sg_activity_set_delete(pending_activities);
+  free(payload);
+}
+
+/* Actor code for "alice": logs and then sends a freshly duplicated "Message" string on mailbox "mbox".
+ *
+ * argc and argv are unused; they are spelled out so that the function has a full prototype when it is handed to
+ * sg_actor_create() in main(). The duplicated string is not freed here. */
+static void alice(int argc, char* argv[])
+{
+  char* payload = xbt_strdup("Message");
+  XBT_INFO("Send '%s'", payload);
+  sg_mailbox_put(sg_mailbox_by_name("mbox"), payload, 6e8);
+}
+
+/* Entry point: initializes SimGrid, loads the platform file given as first argument, creates one actor on host
+ * "alice" and one on host "bob", then runs the simulation. */
+int main(int argc, char* argv[])
+{
+  simgrid_init(&argc, argv);
+
+  const char* progname = argv[0];
+  xbt_assert(argc > 1,
+             "Usage: %s platform_file\n"
+             "\tExample: %s hosts_with_disks.xml\n",
+             progname, progname);
+  simgrid_load_platform(argv[1]);
+
+  sg_host_t alice_host = sg_host_by_name("alice");
+  sg_actor_create("alice", alice_host, alice, 0, NULL);
+  sg_host_t bob_host = sg_host_by_name("bob");
+  sg_actor_create("bob", bob_host, bob, 0, NULL);
+
+  simgrid_run();
+  return 0;
+}
diff --git a/examples/c/activityset-waitall/activityset-waitall.tesh b/examples/c/activityset-waitall/activityset-waitall.tesh
new file mode 100644 (file)
index 0000000..1093563
--- /dev/null
@@ -0,0 +1,7 @@
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/c-activityset-waitall ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n"
+> [0.000000] [alice] Send 'Message'
+> [0.000000] [  bob] Create my asynchronous activities
+> [0.000000] [  bob] Wait for asynchronous activities to complete, all in one shot.
+> [5.197828] [  bob] All activities are completed.
diff --git a/examples/c/activityset-waitallfor/activityset-waitallfor.c b/examples/c/activityset-waitallfor/activityset-waitallfor.c
new file mode 100644 (file)
index 0000000..93bbe67
--- /dev/null
@@ -0,0 +1,76 @@
+/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/activity_set.h"
+#include "simgrid/actor.h"
+#include "simgrid/comm.h"
+#include "simgrid/engine.h"
+#include "simgrid/exec.h"
+#include "simgrid/host.h"
+#include "simgrid/mailbox.h"
+
+#include "xbt/log.h"
+#include "xbt/sysdep.h"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_waitallfor, "Messages specific for this s4u example");
+
+/* Actor code for "bob": starts an execution and an asynchronous receive on mailbox "mbox", puts both in an
+ * activity set, then repeatedly waits on the whole set with a timeout of 1 and reports, after each wait, every
+ * activity that sg_activity_set_test_any() hands back.
+ *
+ * argc and argv are unused; they are spelled out so that the function has a full prototype when it is handed to
+ * sg_actor_create() in main(). */
+static void bob(int argc, char* argv[])
+{
+  XBT_INFO("Create my asynchronous activities");
+  sg_exec_t exec = sg_actor_exec_init(5e9);
+  sg_exec_start(exec);
+
+  sg_mailbox_t mbox = sg_mailbox_by_name("mbox");
+  void* payload     = NULL; /* handed to sg_mailbox_get_async(), freed at the end of this function */
+  sg_comm_t comm    = sg_mailbox_get_async(mbox, &payload);
+
+  sg_activity_set_t pending_activities = sg_activity_set_init();
+  sg_activity_set_push(pending_activities, (sg_activity_t)exec);
+  sg_activity_set_push(pending_activities, (sg_activity_t)comm);
+
+  XBT_INFO("Wait for asynchronous activities to complete");
+  while (!sg_activity_set_empty(pending_activities)) {
+    /* A false result is only logged; the finished activities are collected by the inner loop below */
+    if (!sg_activity_set_wait_all_for(pending_activities, 1)) {
+      XBT_INFO("Not all activities are terminated yet.");
+    }
+
+    /* Drain everything that is already done; presumably test_any() removes what it returns -- TODO confirm */
+    sg_activity_t completed_one = sg_activity_set_test_any(pending_activities);
+    while (completed_one != NULL) {
+      if (sg_comm_isinstance(completed_one))
+        XBT_INFO("Completed a Comm");
+      if (sg_exec_isinstance(completed_one))
+        XBT_INFO("Completed an Exec");
+      completed_one = sg_activity_set_test_any(pending_activities);
+    }
+  }
+
+  XBT_INFO("Last activity is complete");
+  /* Release the set obtained from sg_activity_set_init(); nothing else in this function frees it */
+  sg_activity_set_delete(pending_activities);
+  free(payload);
+}
+
+/* Actor code for "alice": logs and then sends a freshly duplicated "Message" string on mailbox "mbox".
+ *
+ * argc and argv are unused; they are spelled out so that the function has a full prototype when it is handed to
+ * sg_actor_create() in main(). The duplicated string is not freed here. */
+static void alice(int argc, char* argv[])
+{
+  char* payload = xbt_strdup("Message");
+  XBT_INFO("Send '%s'", payload);
+  sg_mailbox_put(sg_mailbox_by_name("mbox"), payload, 6e8);
+}
+
+/* Entry point: initializes SimGrid, loads the platform file given as first argument, creates one actor on host
+ * "alice" and one on host "bob", then runs the simulation. */
+int main(int argc, char* argv[])
+{
+  simgrid_init(&argc, argv);
+
+  const char* progname = argv[0];
+  xbt_assert(argc > 1,
+             "Usage: %s platform_file\n"
+             "\tExample: %s hosts_with_disks.xml\n",
+             progname, progname);
+  simgrid_load_platform(argv[1]);
+
+  sg_host_t alice_host = sg_host_by_name("alice");
+  sg_actor_create("alice", alice_host, alice, 0, NULL);
+  sg_host_t bob_host = sg_host_by_name("bob");
+  sg_actor_create("bob", bob_host, bob, 0, NULL);
+
+  simgrid_run();
+  return 0;
+}
diff --git a/examples/c/activityset-waitallfor/activityset-waitallfor.tesh b/examples/c/activityset-waitallfor/activityset-waitallfor.tesh
new file mode 100644 (file)
index 0000000..26a73c5
--- /dev/null
@@ -0,0 +1,14 @@
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/c-activityset-waitallfor ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n"
+> [0.000000] [alice] Send 'Message'
+> [0.000000] [  bob] Create my asynchronous activities
+> [0.000000] [  bob] Wait for asynchronous activities to complete
+> [1.000000] [  bob] Not all activities are terminated yet.
+> [2.000000] [  bob] Not all activities are terminated yet.
+> [3.000000] [  bob] Not all activities are terminated yet.
+> [4.000000] [  bob] Not all activities are terminated yet.
+> [5.000000] [  bob] Not all activities are terminated yet.
+> [5.000000] [  bob] Completed an Exec
+> [5.197828] [  bob] Completed a Comm
+> [5.197828] [  bob] Last activity is complete
diff --git a/examples/c/activityset-waitany/activityset-waitany.c b/examples/c/activityset-waitany/activityset-waitany.c
new file mode 100644 (file)
index 0000000..183693f
--- /dev/null
@@ -0,0 +1,71 @@
+/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/activity_set.h"
+#include "simgrid/actor.h"
+#include "simgrid/comm.h"
+#include "simgrid/engine.h"
+#include "simgrid/exec.h"
+#include "simgrid/host.h"
+#include "simgrid/mailbox.h"
+
+#include "xbt/log.h"
+#include "xbt/sysdep.h"
+
+/* Default log category of this example ("waitany", matching the example's name) */
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_waitany, "Messages specific for this s4u example");
+
+/* Actor code for "bob": starts an execution and an asynchronous receive on mailbox "mbox", puts both in an
+ * activity set, then calls sg_activity_set_wait_any() until the set is empty, logging the kind of each activity
+ * it gets back. Anything that is neither a Comm nor an Exec aborts through xbt_die().
+ *
+ * argc and argv are unused; they are spelled out so that the function has a full prototype when it is handed to
+ * sg_actor_create() in main(). */
+static void bob(int argc, char* argv[])
+{
+  XBT_INFO("Create my asynchronous activities");
+  sg_exec_t exec = sg_actor_exec_init(5e9);
+  sg_exec_start(exec);
+
+  sg_mailbox_t mbox = sg_mailbox_by_name("mbox");
+  void* payload     = NULL; /* handed to sg_mailbox_get_async(), freed at the end of this function */
+  sg_comm_t comm    = sg_mailbox_get_async(mbox, &payload);
+
+  sg_activity_set_t pending_activities = sg_activity_set_init();
+  sg_activity_set_push(pending_activities, (sg_activity_t)exec);
+  sg_activity_set_push(pending_activities, (sg_activity_t)comm);
+
+  XBT_INFO("Wait for asynchronous activities to complete");
+  /* The loop ends when the set is empty, so wait_any() presumably removes what it returns -- TODO confirm */
+  while (!sg_activity_set_empty(pending_activities)) {
+    sg_activity_t completed_one = sg_activity_set_wait_any(pending_activities);
+    if (sg_comm_isinstance(completed_one))
+      XBT_INFO("Completed a Comm");
+    else if (sg_exec_isinstance(completed_one))
+      XBT_INFO("Completed an Exec");
+    else
+      xbt_die("This activity set is supposed to only contain Comm or Exec");
+  }
+  XBT_INFO("Last activity is complete");
+  /* Release the set obtained from sg_activity_set_init(); nothing else in this function frees it */
+  sg_activity_set_delete(pending_activities);
+  free(payload);
+}
+
+/* Actor code for "alice": logs and then sends a freshly duplicated "Message" string on mailbox "mbox".
+ *
+ * argc and argv are unused; they are spelled out so that the function has a full prototype when it is handed to
+ * sg_actor_create() in main(). The duplicated string is not freed here. */
+static void alice(int argc, char* argv[])
+{
+  char* payload = xbt_strdup("Message");
+  XBT_INFO("Send '%s'", payload);
+  sg_mailbox_put(sg_mailbox_by_name("mbox"), payload, 6e8);
+}
+
+/* Entry point: initializes SimGrid, loads the platform file given as first argument, creates one actor on host
+ * "alice" and one on host "bob", then runs the simulation. */
+int main(int argc, char* argv[])
+{
+  simgrid_init(&argc, argv);
+
+  const char* progname = argv[0];
+  xbt_assert(argc > 1,
+             "Usage: %s platform_file\n"
+             "\tExample: %s hosts_with_disks.xml\n",
+             progname, progname);
+  simgrid_load_platform(argv[1]);
+
+  sg_host_t alice_host = sg_host_by_name("alice");
+  sg_actor_create("alice", alice_host, alice, 0, NULL);
+  sg_host_t bob_host = sg_host_by_name("bob");
+  sg_actor_create("bob", bob_host, bob, 0, NULL);
+
+  simgrid_run();
+  return 0;
+}
diff --git a/examples/c/activityset-waitany/activityset-waitany.tesh b/examples/c/activityset-waitany/activityset-waitany.tesh
new file mode 100644 (file)
index 0000000..dac02a8
--- /dev/null
@@ -0,0 +1,9 @@
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/c-activityset-waitany ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n"
+> [0.000000] [alice] Send 'Message'
+> [0.000000] [  bob] Create my asynchronous activities
+> [0.000000] [  bob] Wait for asynchronous activities to complete
+> [5.000000] [  bob] Completed an Exec
+> [5.197828] [  bob] Completed a Comm
+> [5.197828] [  bob] Last activity is complete
index 3e71581..7133408 100644 (file)
@@ -36,16 +36,14 @@ static void broadcaster_build_chain(broadcaster_t bc)
 
 static void broadcaster_send_file(const_broadcaster_t bc)
 {
-  int nb_pending_sends = 0;
-
   for (unsigned int current_piece = 0; current_piece < bc->piece_count; current_piece++) {
     XBT_DEBUG("Sending (send) piece %u from %s into mailbox %s", current_piece, sg_host_self_get_name(),
               sg_mailbox_get_name(bc->first));
     char* file_piece = bprintf("piece-%u", current_piece);
     sg_comm_t comm   = sg_mailbox_put_async(bc->first, file_piece, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
-    bc->pending_sends[nb_pending_sends++] = comm;
+    sg_activity_set_push(bc->pending_sends, (sg_activity_t)comm);
   }
-  sg_comm_wait_all(bc->pending_sends, nb_pending_sends);
+  sg_activity_set_wait_all(bc->pending_sends);
 }
 
 static broadcaster_t broadcaster_init(sg_mailbox_t* mailboxes, unsigned int host_count, unsigned int piece_count)
@@ -56,7 +54,7 @@ static broadcaster_t broadcaster_init(sg_mailbox_t* mailboxes, unsigned int host
   bc->host_count    = host_count;
   bc->piece_count   = piece_count;
   bc->mailboxes     = mailboxes;
-  bc->pending_sends = xbt_malloc(sizeof(sg_comm_t) * MAX_PENDING_COMMS);
+  bc->pending_sends = sg_activity_set_init();
 
   broadcaster_build_chain(bc);
 
@@ -65,7 +63,7 @@ static broadcaster_t broadcaster_init(sg_mailbox_t* mailboxes, unsigned int host
 
 static void broadcaster_destroy(broadcaster_t bc)
 {
-  xbt_free(bc->pending_sends);
+  sg_activity_set_delete(bc->pending_sends);
   xbt_free(bc->mailboxes);
   xbt_free(bc);
 }
index 8bbe961..86a86aa 100644 (file)
@@ -7,6 +7,7 @@
 #ifndef CHAINSEND_H
 #define CHAINSEND_H
 
+#include "simgrid/activity_set.h"
 #include "simgrid/actor.h"
 #include "simgrid/comm.h"
 #include "simgrid/engine.h"
@@ -30,7 +31,7 @@ typedef struct s_broadcaster {
   unsigned int piece_count;
   sg_mailbox_t first;
   sg_mailbox_t* mailboxes;
-  sg_comm_t* pending_sends;
+  sg_activity_set_t pending_sends;
 } s_broadcaster_t;
 
 typedef s_broadcaster_t* broadcaster_t;
@@ -54,8 +55,8 @@ typedef struct s_peer {
   unsigned long long received_bytes;
   unsigned int received_pieces;
   unsigned int total_pieces;
-  sg_comm_t* pending_recvs;
-  sg_comm_t* pending_sends;
+  sg_activity_set_t pending_recvs;
+  sg_activity_set_t pending_sends;
 } s_peer_t;
 
 typedef s_peer_t* peer_t;
index 513c123..f4591f8 100644 (file)
@@ -1,5 +1,4 @@
-/* Copyright (c) 2012-2023. The SimGrid Team.
- * All rights reserved.                                                     */
+/* Copyright (c) 2012-2023. The SimGrid Team. All rights reserved.          */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
@@ -22,26 +21,21 @@ static void peer_join_chain(peer_t p)
 static void peer_forward_file(peer_t p)
 {
   void* received;
-  int done                = 0;
-  size_t nb_pending_sends = 0;
-  size_t nb_pending_recvs = 0;
+  int done = 0;
 
   while (!done) {
-    p->pending_recvs[nb_pending_recvs] = sg_mailbox_get_async(p->me, &received);
-    nb_pending_recvs++;
+    sg_activity_set_push(p->pending_recvs, (sg_activity_t)sg_mailbox_get_async(p->me, &received));
 
-    ssize_t idx = sg_comm_wait_any(p->pending_recvs, nb_pending_recvs);
-    if (idx != -1) {
+    sg_activity_t acti = sg_activity_set_wait_any(p->pending_recvs);
+    if (acti != NULL) {
+      sg_comm_unref((sg_comm_t)acti);
       XBT_DEBUG("Peer %s got a 'SEND_DATA' message", sg_mailbox_get_name(p->me));
-      /* move the last pending comm where the finished one was, and decrement */
-      p->pending_recvs[idx] = p->pending_recvs[--nb_pending_recvs];
 
       if (p->next != NULL) {
         XBT_DEBUG("Sending %s (asynchronously) from %s to %s", (char*)received, sg_mailbox_get_name(p->me),
                   sg_mailbox_get_name(p->next));
         sg_comm_t send = sg_mailbox_put_async(p->next, received, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
-        p->pending_sends[nb_pending_sends] = send;
-        nb_pending_sends++;
+        sg_activity_set_push(p->pending_sends, (sg_activity_t)send);
       } else
         free(received);
 
@@ -53,7 +47,7 @@ static void peer_forward_file(peer_t p)
       }
     }
   }
-  sg_comm_wait_all(p->pending_sends, nb_pending_sends);
+  sg_activity_set_wait_all(p->pending_sends);
 }
 
 static peer_t peer_init(int argc, char* argv[])
@@ -63,8 +57,8 @@ static peer_t peer_init(int argc, char* argv[])
   p->next            = NULL;
   p->received_pieces = 0;
   p->received_bytes  = 0;
-  p->pending_recvs   = xbt_malloc(sizeof(sg_comm_t) * MAX_PENDING_COMMS);
-  p->pending_sends   = xbt_malloc(sizeof(sg_comm_t) * MAX_PENDING_COMMS);
+  p->pending_recvs   = sg_activity_set_init();
+  p->pending_sends   = sg_activity_set_init();
 
   p->me = sg_mailbox_by_name(sg_host_self_get_name());
 
@@ -73,8 +67,8 @@ static peer_t peer_init(int argc, char* argv[])
 
 static void peer_delete(peer_t p)
 {
-  xbt_free(p->pending_recvs);
-  xbt_free(p->pending_sends);
+  sg_activity_set_delete(p->pending_recvs);
+  sg_activity_set_delete(p->pending_sends);
 
   xbt_free(p);
 }
diff --git a/examples/c/comm-waitall/comm-waitall.c b/examples/c/comm-waitall/comm-waitall.c
deleted file mode 100644 (file)
index 4477c68..0000000
+++ /dev/null
@@ -1,106 +0,0 @@
-/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.          */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include "simgrid/actor.h"
-#include "simgrid/comm.h"
-#include "simgrid/engine.h"
-#include "simgrid/host.h"
-#include "simgrid/mailbox.h"
-
-#include "xbt/log.h"
-#include "xbt/str.h"
-#include "xbt/sysdep.h"
-
-#include <stdio.h> /* snprintf */
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(comm_waitall, "Messages specific for this example");
-
-static void sender(int argc, char* argv[])
-{
-  xbt_assert(argc == 4, "This function expects 3 parameters from the XML deployment file");
-  long messages_count  = xbt_str_parse_int(argv[1], "Invalid message count");
-  long message_size    = xbt_str_parse_int(argv[2], "Invalid message size");
-  long receivers_count = xbt_str_parse_int(argv[3], "Invalid amount of receivers");
-  xbt_assert(receivers_count > 0);
-
-  /* Array in which we store all ongoing communications */
-  sg_comm_t* pending_comms = xbt_malloc(sizeof(sg_comm_t) * (messages_count + receivers_count));
-  int pending_comms_count  = 0;
-
-  /* Make an array of the mailboxes to use */
-  sg_mailbox_t* mboxes = xbt_malloc(sizeof(sg_mailbox_t) * receivers_count);
-  for (long i = 0; i < receivers_count; i++) {
-    char mailbox_name[80];
-    snprintf(mailbox_name, 79, "receiver-%ld", i);
-    sg_mailbox_t mbox = sg_mailbox_by_name(mailbox_name);
-    mboxes[i]         = mbox;
-  }
-
-  /* Start dispatching all messages to receivers, in a round robin fashion */
-  for (long i = 0; i < messages_count; i++) {
-    char msg_content[80];
-    snprintf(msg_content, 79, "Message %ld", i);
-    sg_mailbox_t mbox = mboxes[i % receivers_count];
-    XBT_INFO("Send '%s' to '%s'", msg_content, sg_mailbox_get_name(mbox));
-    /* Create a communication representing the ongoing communication, and store it in pending_comms */
-    pending_comms[pending_comms_count++] = sg_mailbox_put_async(mbox, xbt_strdup(msg_content), message_size);
-  }
-
-  /* Start sending messages to let the workers know that they should stop */
-  for (long i = 0; i < receivers_count; i++) {
-    XBT_INFO("Send 'finalize' to 'receiver-%ld'", i);
-    char* end_msg                        = xbt_strdup("finalize");
-    sg_mailbox_t mbox                    = mboxes[i % receivers_count];
-    pending_comms[pending_comms_count++] = sg_mailbox_put_async(mbox, end_msg, 0);
-  }
-
-  XBT_INFO("Done dispatching all messages");
-
-  /* Now that all message exchanges were initiated, wait for their completion in one single call */
-  sg_comm_wait_all(pending_comms, pending_comms_count);
-
-  xbt_free(pending_comms);
-  xbt_free(mboxes);
-
-  XBT_INFO("Goodbye now!");
-}
-
-static void receiver(int argc, char* argv[])
-{
-  xbt_assert(argc == 2, "Expecting one parameter from the XML deployment file but got %d", argc);
-  int id = (int)xbt_str_parse_int(argv[1], "ID should be numerical");
-  char mailbox_name[80];
-  snprintf(mailbox_name, 79, "receiver-%d", id);
-  sg_mailbox_t mbox = sg_mailbox_by_name(mailbox_name);
-  XBT_INFO("Wait for my first message");
-  while (1) {
-    char* received = (char*)sg_mailbox_get(mbox);
-    XBT_INFO("I got a '%s'.", received);
-    if (!strcmp(received, "finalize")) { // If it's a finalize message, we're done
-      xbt_free(received);
-      break;
-    }
-    xbt_free(received);
-  }
-}
-
-int main(int argc, char* argv[])
-{
-  simgrid_init(&argc, argv);
-  xbt_assert(argc > 2,
-             "Usage: %s platform_file deployment_file\n"
-             "\tExample: %s platform.xml deployment.xml\n",
-             argv[0], argv[0]);
-
-  simgrid_load_platform(argv[1]);
-
-  simgrid_register_function("sender", sender);
-  simgrid_register_function("receiver", receiver);
-  simgrid_load_deployment(argv[2]);
-
-  simgrid_run();
-
-  return 0;
-}
diff --git a/examples/c/comm-waitall/comm-waitall.tesh b/examples/c/comm-waitall/comm-waitall.tesh
deleted file mode 100644 (file)
index 1c3cfe6..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-#!/usr/bin/env tesh
-
-! output sort 19
-$ ${bindir:=.}/c-comm-waitall ${platfdir:=.}/small_platform_fatpipe.xml ${srcdir:=.}/comm-waitall_d.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
-> [  0.000000] (1:sender@Tremblay) Send 'Message 0' to 'receiver-0'
-> [  0.000000] (2:receiver@Ruby) Wait for my first message
-> [  0.000000] (3:receiver@Perl) Wait for my first message
-> [  0.000000] (1:sender@Tremblay) Send 'Message 1' to 'receiver-1'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 2' to 'receiver-0'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 3' to 'receiver-1'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 4' to 'receiver-0'
-> [  0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-0'
-> [  0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-1'
-> [  0.000000] (1:sender@Tremblay) Done dispatching all messages
-> [  0.004022] (2:receiver@Ruby) I got a 'Message 0'.
-> [  0.004022] (3:receiver@Perl) I got a 'Message 1'.
-> [  0.008043] (2:receiver@Ruby) I got a 'Message 2'.
-> [  0.008043] (3:receiver@Perl) I got a 'Message 3'.
-> [  0.009995] (3:receiver@Perl) I got a 'finalize'.
-> [  0.012065] (2:receiver@Ruby) I got a 'Message 4'.
-> [  0.014016] (2:receiver@Ruby) I got a 'finalize'.
-> [  0.014016] (1:sender@Tremblay) Goodbye now!
diff --git a/examples/c/comm-waitall/comm-waitall_d.xml b/examples/c/comm-waitall/comm-waitall_d.xml
deleted file mode 100644 (file)
index 8f9d88b..0000000
+++ /dev/null
@@ -1,17 +0,0 @@
-<?xml version='1.0'?>
-<!DOCTYPE platform SYSTEM "https://simgrid.org/simgrid.dtd">
-<platform version="4.1">
-  <!-- The master actor (with some arguments) -->
-  <actor host="Tremblay" function="sender">
-    <argument value="5"/>       <!-- Number of messages -->
-    <argument value="1000000"/> <!-- Size of messages -->
-    <argument value="2"/>       <!-- Number of receivers -->
-  </actor>
-  <!-- The receiver actors -->
-  <actor host="Ruby" function="receiver">
-    <argument value="0"/>
-  </actor>
-  <actor host="Perl" function="receiver">
-    <argument value="1"/>
-  </actor>
-</platform>
diff --git a/examples/c/comm-waitany/comm-waitany.c b/examples/c/comm-waitany/comm-waitany.c
deleted file mode 100644 (file)
index 03504d8..0000000
+++ /dev/null
@@ -1,122 +0,0 @@
-/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.          */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include "simgrid/actor.h"
-#include "simgrid/comm.h"
-#include "simgrid/engine.h"
-#include "simgrid/forward.h"
-#include "simgrid/mailbox.h"
-#include "xbt/log.h"
-#include "xbt/str.h"
-#include "xbt/sysdep.h"
-
-#include <stdio.h> /* snprintf */
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(comm_waitany, "Messages specific for this example");
-
-static void sender(int argc, char* argv[])
-{
-  xbt_assert(argc == 4, "Expecting 3 parameters from the XML deployment file but got %d", argc);
-  long messages_count  = xbt_str_parse_int(argv[1], "Invalid message count");
-  long msg_size        = xbt_str_parse_int(argv[2], "Invalid message size");
-  long receivers_count = xbt_str_parse_int(argv[3], "Invalid amount of receivers");
-  xbt_assert(receivers_count > 0);
-
-  /* Array in which we store all ongoing communications */
-  sg_comm_t* pending_comms = xbt_malloc(sizeof(sg_comm_t) * (messages_count + receivers_count));
-  int pending_comms_count  = 0;
-
-  /* Make an array of the mailboxes to use */
-  sg_mailbox_t* mboxes = xbt_malloc(sizeof(sg_mailbox_t) * receivers_count);
-  for (long i = 0; i < receivers_count; i++) {
-    char mailbox_name[80];
-    snprintf(mailbox_name, 79, "receiver-%ld", i);
-    sg_mailbox_t mbox = sg_mailbox_by_name(mailbox_name);
-    mboxes[i]         = mbox;
-  }
-
-  /* Start dispatching all messages to receivers, in a round robin fashion */
-  for (long i = 0; i < messages_count; i++) {
-    char msg_content[80];
-    snprintf(msg_content, 79, "Message %ld", i);
-    sg_mailbox_t mbox = mboxes[i % receivers_count];
-    XBT_INFO("Send '%s' to '%s'", msg_content, sg_mailbox_get_name(mbox));
-
-    /* Create a communication representing the ongoing communication, and store it in pending_comms */
-    pending_comms[pending_comms_count++] = sg_mailbox_put_async(mbox, xbt_strdup(msg_content), msg_size);
-  }
-  /* Start sending messages to let the workers know that they should stop */
-  for (long i = 0; i < receivers_count; i++) {
-    XBT_INFO("Send 'finalize' to 'receiver-%ld'", i);
-    char* end_msg                        = xbt_strdup("finalize");
-    sg_mailbox_t mbox                    = mboxes[i % receivers_count];
-    pending_comms[pending_comms_count++] = sg_mailbox_put_async(mbox, end_msg, 0);
-  }
-
-  XBT_INFO("Done dispatching all messages");
-
-  /* Now that all message exchanges were initiated, wait for their completion, in order of termination.
-   *
-   * This loop waits for first terminating message with wait_any() and remove it from the array (with a memmove),
-   *  until all comms are terminated.
-   * Even in this simple example, the pending comms do not terminate in the exact same order of creation.
-   */
-  while (pending_comms_count != 0) {
-    ssize_t changed_pos = sg_comm_wait_any(pending_comms, pending_comms_count);
-    memmove(pending_comms + changed_pos, pending_comms + changed_pos + 1,
-            sizeof(sg_comm_t) * (pending_comms_count - changed_pos - 1));
-    pending_comms_count--;
-
-    if (changed_pos != 0)
-      XBT_INFO("Remove the %zdth pending comm: it terminated earlier than another comm that was initiated first.",
-               changed_pos);
-  }
-
-  xbt_free(pending_comms);
-  xbt_free(mboxes);
-
-  XBT_INFO("Goodbye now!");
-}
-
-static void receiver(int argc, char* argv[])
-{
-  xbt_assert(argc == 2, "Expecting one parameter from the XML deployment file but got %d", argc);
-  int id = (int)xbt_str_parse_int(argv[1], "ID should be numerical");
-  char mailbox_name[80];
-  snprintf(mailbox_name, 79, "receiver-%d", id);
-  sg_mailbox_t mbox = sg_mailbox_by_name(mailbox_name);
-  XBT_INFO("Wait for my first message on '%s'", mailbox_name);
-  while (1) {
-    char* received = (char*)sg_mailbox_get(mbox);
-    XBT_INFO("I got a '%s'.", received);
-    if (!strcmp(received, "finalize")) { // If it's a finalize message, we're done
-      xbt_free(received);
-      break;
-    }
-    xbt_free(received);
-  }
-
-  XBT_INFO("I'm done. See you!");
-}
-
-int main(int argc, char* argv[])
-{
-  simgrid_init(&argc, argv);
-  xbt_assert(argc > 2,
-             "Usage: %s platform_file deployment_file\n"
-             "\tExample: %s platform.xml deployment.xml\n",
-             argv[0], argv[0]);
-
-  simgrid_load_platform(argv[1]);
-
-  simgrid_register_function("sender", sender);
-  simgrid_register_function("receiver", receiver);
-  simgrid_load_deployment(argv[2]);
-
-  simgrid_run();
-  XBT_INFO("Simulation time %g", simgrid_get_clock());
-
-  return 0;
-}
diff --git a/examples/c/comm-waitany/comm-waitany.tesh b/examples/c/comm-waitany/comm-waitany.tesh
deleted file mode 100644 (file)
index b9e4a98..0000000
+++ /dev/null
@@ -1,28 +0,0 @@
-#!/usr/bin/env tesh
-
-! output sort 19
-$ ${bindir:=.}/c-comm-waitany ${platfdir:=.}/small_platform.xml ${srcdir:=.}/comm-waitany_d.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
-> [  0.000000] (1:sender@Tremblay) Send 'Message 0' to 'receiver-0'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 1' to 'receiver-1'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 2' to 'receiver-0'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 3' to 'receiver-1'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 4' to 'receiver-0'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 5' to 'receiver-1'
-> [  0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-0'
-> [  0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-1'
-> [  0.000000] (1:sender@Tremblay) Done dispatching all messages
-> [  0.000000] (2:receiver@Fafard) Wait for my first message on 'receiver-0'
-> [  0.000000] (3:receiver@Jupiter) Wait for my first message on 'receiver-1'
-> [  0.158397] (2:receiver@Fafard) I got a 'Message 0'.
-> [  0.169155] (3:receiver@Jupiter) I got a 'Message 1'.
-> [  0.316794] (2:receiver@Fafard) I got a 'Message 2'.
-> [  0.338309] (3:receiver@Jupiter) I got a 'Message 3'.
-> [  0.475190] (2:receiver@Fafard) I got a 'Message 4'.
-> [  0.500898] (1:sender@Tremblay) Remove the 1th pending comm: it terminated earlier than another comm that was initiated first.
-> [  0.500898] (2:receiver@Fafard) I got a 'finalize'.
-> [  0.500898] (2:receiver@Fafard) I'm done. See you!
-> [  0.507464] (3:receiver@Jupiter) I got a 'Message 5'.
-> [  0.526478] (0:maestro@) Simulation time 0.526478
-> [  0.526478] (1:sender@Tremblay) Goodbye now!
-> [  0.526478] (3:receiver@Jupiter) I got a 'finalize'.
-> [  0.526478] (3:receiver@Jupiter) I'm done. See you!
\ No newline at end of file
diff --git a/examples/c/comm-waitany/comm-waitany_d.xml b/examples/c/comm-waitany/comm-waitany_d.xml
deleted file mode 100644 (file)
index dd639f5..0000000
+++ /dev/null
@@ -1,17 +0,0 @@
-<?xml version='1.0'?>
-<!DOCTYPE platform SYSTEM "https://simgrid.org/simgrid.dtd">
-<platform version="4.1">
-  <!-- The master actor (with some arguments) -->
-  <actor host="Tremblay" function="sender">
-    <argument value="6"/>       <!-- Number of tasks -->
-    <argument value="1000000"/>      <!-- Communication size of tasks -->
-    <argument value="2"/>         <!-- Number of receivers -->
-  </actor>
-  <!-- The receiver actors -->
-  <actor host="Fafard" function="receiver">
-    <argument value="0"/>
-  </actor>
-  <actor host="Jupiter" function="receiver">
-    <argument value="1"/>
-  </actor>
-</platform>
diff --git a/examples/c/exec-waitany/exec-waitany.c b/examples/c/exec-waitany/exec-waitany.c
deleted file mode 100644 (file)
index bfff989..0000000
+++ /dev/null
@@ -1,77 +0,0 @@
-/* Copyright (c) 2019-2023. The SimGrid Team. All rights reserved.          */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include "simgrid/actor.h"
-#include "simgrid/engine.h"
-#include "simgrid/exec.h"
-#include "simgrid/host.h"
-
-#include "xbt/log.h"
-#include "xbt/sysdep.h"
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(exec_waitany, "Messages specific for this example");
-
-static void worker(int argc, char* argv[])
-{
-  xbt_assert(argc > 1);
-  int with_timeout = !strcmp(argv[1], "true");
-
-  /* Vector in which we store all pending executions*/
-  sg_exec_t* pending_execs = xbt_malloc(sizeof(sg_exec_t) * 3);
-  int pending_execs_count  = 0;
-
-  for (int i = 0; i < 3; i++) {
-    char* name    = bprintf("Exec-%d", i);
-    double amount = (6 * (i % 2) + i + 1) * sg_host_get_speed(sg_host_self());
-
-    sg_exec_t exec = sg_actor_exec_init(amount);
-    sg_exec_set_name(exec, name);
-    pending_execs[pending_execs_count++] = exec;
-    sg_exec_start(exec);
-
-    XBT_INFO("Activity %s has started for %.0f seconds", name, amount / sg_host_get_speed(sg_host_self()));
-    free(name);
-  }
-
-  /* Now that executions were initiated, wait for their completion, in order of termination.
-   *
-   * This loop waits for first terminating execution with wait_any() and remove it with erase(), until all execs are
-   * terminated.
-   */
-  while (pending_execs_count > 0) {
-    ssize_t pos;
-    if (with_timeout)
-      pos = sg_exec_wait_any_for(pending_execs, pending_execs_count, 4);
-    else
-      pos = sg_exec_wait_any(pending_execs, pending_execs_count);
-
-    if (pos < 0) {
-      XBT_INFO("Do not wait any longer for an activity");
-      pending_execs_count = 0;
-    } else {
-      XBT_INFO("Activity at position %zd is complete", pos);
-      memmove(pending_execs + pos, pending_execs + pos + 1, sizeof(sg_exec_t) * (pending_execs_count - pos - 1));
-      pending_execs_count--;
-    }
-    XBT_INFO("%d activities remain pending", pending_execs_count);
-  }
-
-  xbt_free(pending_execs);
-}
-
-int main(int argc, char* argv[])
-{
-  simgrid_init(&argc, argv);
-  simgrid_load_platform(argv[1]);
-
-  const char* worker_argv[] = {"worker", "false"};
-  sg_actor_create_("worker", sg_host_by_name("Tremblay"), worker, 2, worker_argv);
-
-  worker_argv[1] = "true";
-  sg_actor_create_("worker_timeout", sg_host_by_name("Tremblay"), worker, 2, worker_argv);
-
-  simgrid_run();
-  return 0;
-}
diff --git a/examples/c/exec-waitany/exec-waitany.tesh b/examples/c/exec-waitany/exec-waitany.tesh
deleted file mode 100644 (file)
index 2ea0293..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-#!/usr/bin/env tesh
-
-! output sort 19
-$ ${bindir:=.}/c-exec-waitany ${platfdir}/multicore_machine.xml "--log=root.fmt:[%10.6r]%e[%14P]%e%m%n"
-> [  0.000000] [        worker] Activity Exec-0 has started for 1 seconds
-> [  0.000000] [worker_timeout] Activity Exec-0 has started for 1 seconds
-> [  0.000000] [        worker] Activity Exec-1 has started for 8 seconds
-> [  0.000000] [worker_timeout] Activity Exec-1 has started for 8 seconds
-> [  0.000000] [        worker] Activity Exec-2 has started for 3 seconds
-> [  0.000000] [worker_timeout] Activity Exec-2 has started for 3 seconds
-> [  1.000000] [worker_timeout] Activity at position 0 is complete
-> [  1.000000] [worker_timeout] 2 activities remain pending
-> [  1.000000] [        worker] Activity at position 0 is complete
-> [  1.000000] [        worker] 2 activities remain pending
-> [  3.000000] [worker_timeout] Activity at position 1 is complete
-> [  3.000000] [worker_timeout] 1 activities remain pending
-> [  3.000000] [        worker] Activity at position 1 is complete
-> [  3.000000] [        worker] 1 activities remain pending
-> [  7.000000] [worker_timeout] Do not wait any longer for an activity
-> [  7.000000] [worker_timeout] 0 activities remain pending
-> [  8.000000] [        worker] Activity at position 0 is complete
-> [  8.000000] [        worker] 0 activities remain pending
index e1e5390..72e060f 100644 (file)
@@ -174,7 +174,7 @@ foreach (example activityset-testany activityset-waitany activityset-waitall act
                  task-io task-simple task-variable-load task-storm task-switch-host
                  photovoltaic-simple
                  platform-comm-serialize platform-failures platform-profile platform-properties
-                 plugin-host-load plugin-link-load plugin-prodcons
+                 plugin-host-load plugin-jbod plugin-link-load plugin-prodcons
                  replay-comm replay-io
                  routing-get-clusters
                  synchro-barrier synchro-condition-variable synchro-condition-variable-waituntil synchro-mutex synchro-semaphore
index 176e50f..efbc3a0 100644 (file)
@@ -26,7 +26,7 @@ static void bob()
                                        boost::dynamic_pointer_cast<sg4::Activity>(comm),
                                        boost::dynamic_pointer_cast<sg4::Activity>(io)});
 
-  XBT_INFO("Wait for asynchrounous activities to complete, all in one shot.");
+  XBT_INFO("Wait for asynchronous activities to complete, all in one shot.");
   pending_activities.wait_all();
 
   XBT_INFO("All activities are completed.");
index 6f01429..889b405 100644 (file)
@@ -3,5 +3,5 @@
 $ ${bindir:=.}/s4u-activityset-waitall ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n"
 > [0.000000] [alice] Send 'Message'
 > [0.000000] [  bob] Create my asynchronous activities
-> [0.000000] [  bob] Wait for asynchrounous activities to complete, all in one shot.
+> [0.000000] [  bob] Wait for asynchronous activities to complete, all in one shot.
 > [5.197828] [  bob] All activities are completed.
index b1d405d..971d774 100644 (file)
@@ -24,7 +24,7 @@ static void bob()
 
   sg4::ActivitySet pending_activities({exec, comm, io});
 
-  XBT_INFO("Wait for asynchrounous activities to complete");
+  XBT_INFO("Wait for asynchronous activities to complete");
   while (not pending_activities.empty()) {
     try {
       pending_activities.wait_all_for(1);
index d0c1016..014c4f0 100644 (file)
@@ -3,7 +3,7 @@
 $ ${bindir:=.}/s4u-activityset-waitallfor ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n"
 > [0.000000] [alice] Send 'Message'
 > [0.000000] [  bob] Create my asynchronous activities
-> [0.000000] [  bob] Wait for asynchrounous activities to complete
+> [0.000000] [  bob] Wait for asynchronous activities to complete
 > [1.000000] [  bob] Not all activities are terminated yet.
 > [2.000000] [  bob] Not all activities are terminated yet.
 > [3.000000] [  bob] Not all activities are terminated yet.
index 1878a13..6f081b3 100644 (file)
@@ -26,7 +26,7 @@ static void bob()
                                        boost::dynamic_pointer_cast<sg4::Activity>(comm),
                                        boost::dynamic_pointer_cast<sg4::Activity>(io)});
 
-  XBT_INFO("Wait for asynchrounous activities to complete");
+  XBT_INFO("Wait for asynchronous activities to complete");
   while (not pending_activities.empty()) {
     auto completed_one = pending_activities.wait_any();
     if (completed_one != nullptr) {
index bc33cc4..b9ecf1e 100644 (file)
@@ -3,7 +3,7 @@
 $ ${bindir:=.}/s4u-activityset-waitany ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n"
 > [0.000000] [alice] Send 'Message'
 > [0.000000] [  bob] Create my asynchronous activities
-> [0.000000] [  bob] Wait for asynchrounous activities to complete
+> [0.000000] [  bob] Wait for asynchronous activities to complete
 > [3.000000] [  bob] Completed an I/O
 > [5.000000] [  bob] Completed an Exec
 > [5.197828] [  bob] Completed a Comm
index 2ba057c..bd7e25d 100644 (file)
@@ -34,8 +34,8 @@ public:
   sg4::Mailbox* prev = nullptr;
   sg4::Mailbox* next = nullptr;
   sg4::Mailbox* me   = nullptr;
-  std::vector<sg4::CommPtr> pending_recvs;
-  std::vector<sg4::CommPtr> pending_sends;
+  sg4::ActivitySet pending_recvs;
+  sg4::ActivitySet pending_sends;
 
   unsigned long long received_bytes = 0;
   unsigned int received_pieces      = 0;
@@ -60,17 +60,16 @@ public:
 
     while (not done) {
       sg4::CommPtr comm = me->get_async<FilePiece>(&received);
-      pending_recvs.push_back(comm);
+      pending_recvs.push(comm);
 
-      ssize_t idx = sg4::Comm::wait_any(pending_recvs);
-      if (idx != -1) {
-        comm = pending_recvs.at(idx);
+      auto completed_one = pending_recvs.wait_any();
+      if (completed_one != nullptr) {
+        comm = boost::dynamic_pointer_cast<sg4::Comm>(completed_one);
         XBT_DEBUG("Peer %s got a 'SEND_DATA' message", me->get_cname());
-        pending_recvs.erase(pending_recvs.begin() + idx);
         if (next != nullptr) {
           XBT_DEBUG("Sending (asynchronously) from %s to %s", me->get_cname(), next->get_cname());
           sg4::CommPtr send = next->put_async(received, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
-          pending_sends.push_back(send);
+          pending_sends.push(send);
         } else
           delete received;
 
@@ -110,14 +109,14 @@ public:
 
   void sendFile()
   {
-    std::vector<sg4::CommPtr> pending_sends;
+    sg4::ActivitySet pending_sends;
     for (unsigned int current_piece = 0; current_piece < piece_count; current_piece++) {
       XBT_DEBUG("Sending (send) piece %u from %s into mailbox %s", current_piece, sg4::Host::current()->get_cname(),
                 first->get_cname());
       sg4::CommPtr comm = first->put_async(new FilePiece(), MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
-      pending_sends.push_back(comm);
+      pending_sends.push(comm);
     }
-    sg4::Comm::wait_all(pending_sends);
+    pending_sends.wait_all();
   }
 
   Broadcaster(int hostcount, unsigned int piece_count) : piece_count(piece_count)
@@ -140,7 +139,7 @@ static void peer()
   p.joinChain();
   p.forwardFile();
 
-  sg4::Comm::wait_all(p.pending_sends);
+  p.pending_sends.wait_all();
   double end_time = sg4::Engine::get_clock();
 
   XBT_INFO("### %f %llu bytes (Avg %f MB/s); copy finished (simulated).", end_time - start_time, p.received_bytes,
index b637a23..2a98162 100644 (file)
@@ -24,7 +24,7 @@ public:
   void operator()() const
   {
     /* Vector in which we store all ongoing communications */
-    std::vector<sg4::CommPtr> pending_comms;
+    sg4::ActivitySet pending_comms;
 
     /* Make a vector of the mailboxes to use */
     std::vector<sg4::Mailbox*> mboxes;
@@ -40,13 +40,13 @@ public:
       auto* mbox = sg4::Mailbox::by_name(host->get_name());
       mboxes.push_back(mbox);
       sg4::CommPtr comm = mbox->put_async(payload, msg_size);
-      pending_comms.push_back(comm);
+      pending_comms.push(comm);
     }
 
     XBT_INFO("Done dispatching all messages");
 
     /* Now that all message exchanges were initiated, wait for their completion in one single call */
-    sg4::Comm::wait_all(pending_comms);
+    pending_comms.wait_all();
 
     XBT_INFO("Goodbye now!");
   }
index 90b3778..33c7729 100644 (file)
@@ -33,12 +33,12 @@ public:
     auto comm2 = mailbox2->put_async((void*)666, 2);
 
     XBT_INFO("Calling wait_any..");
-    std::vector<sg4::CommPtr> pending_comms;
-    pending_comms.push_back(comm1);
-    pending_comms.push_back(comm2);
+    sg4::ActivitySet pending_comms;
+    pending_comms.push(comm1);
+    pending_comms.push(comm2);
     try {
-      long index = sg4::Comm::wait_any(pending_comms);
-      XBT_INFO("Wait any returned index %ld (comm to %s)", index, pending_comms.at(index)->get_mailbox()->get_cname());
+      auto* acti = pending_comms.wait_any().get();
+      XBT_INFO("Wait any returned comm to %s", dynamic_cast<sg4::Comm*>(acti)->get_mailbox()->get_cname());
     } catch (const simgrid::NetworkFailureException&) {
       XBT_INFO("Sender has experienced a network failure exception, so it knows that something went wrong");
       XBT_INFO("Now it needs to figure out which of the two comms failed by looking at their state:");
@@ -52,8 +52,7 @@ public:
       XBT_INFO("Waiting on a FAILED comm raises an exception: '%s'", e.what());
     }
     XBT_INFO("Wait for remaining comm, just to be nice");
-    pending_comms.erase(pending_comms.begin());
-    sg4::Comm::wait_any(pending_comms);
+    pending_comms.wait_all();
   }
 };
 
@@ -82,18 +81,17 @@ int main(int argc, char** argv)
   auto* host1 = zone->create_host("Host1", "1f");
   auto* host2 = zone->create_host("Host2", "1f");
   auto* host3 = zone->create_host("Host3", "1f");
+  auto* link2 = zone->create_link("linkto2", "1bps")->seal();
+  auto* link3 = zone->create_link("linkto3", "1bps")->seal();
 
-  sg4::LinkInRoute linkto2{zone->create_link("linkto2", "1bps")->seal()};
-  sg4::LinkInRoute linkto3{zone->create_link("linkto3", "1bps")->seal()};
-
-  zone->add_route(host1->get_netpoint(), host2->get_netpoint(), nullptr, nullptr, {linkto2}, false);
-  zone->add_route(host1->get_netpoint(), host3->get_netpoint(), nullptr, nullptr, {linkto3}, false);
+  zone->add_route(host1, host2, {link2});
+  zone->add_route(host1, host3, {link3});
   zone->seal();
 
   sg4::Actor::create("Sender", host1, Sender("mailbox2", "mailbox3"));
   sg4::Actor::create("Receiver", host2, Receiver("mailbox2"));
   sg4::Actor::create("Receiver", host3, Receiver("mailbox3"));
-  
+
   sg4::Actor::create("LinkKiller", host1, [](){
     sg4::this_actor::sleep_for(10.0);
     XBT_INFO("Turning off link 'linkto2'");
index f3dcc62..8a58871 100644 (file)
@@ -14,4 +14,4 @@ $ ${bindir:=.}/s4u-comm-failure "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
 > [ 10.000000] (1:Sender@Host1)   Comm to mailbox3 has state: STARTED
 > [ 10.000000] (1:Sender@Host1) Waiting on a FAILED comm raises an exception: 'Cannot wait for a failed communication'
 > [ 10.000000] (1:Sender@Host1) Wait for remaining comm, just to be nice
-> [ 16.494845] (3:Receiver@Host3) Receiver has received successfully!
+> [ 17.319588] (3:Receiver@Host3) Receiver has received successfully!
index 6897298..f46ee8d 100644 (file)
@@ -33,7 +33,7 @@ static void peer(int my_id, int messages_count, size_t payload_size, int peers_c
   sg4::Mailbox* my_mbox = sg4::Mailbox::by_name("peer-" + std::to_string(my_id));
   my_mbox->set_receiver(sg4::Actor::self());
 
-  std::vector<sg4::CommPtr> pending_comms;
+  sg4::ActivitySet pending_comms;
 
   /* Start dispatching all messages to peers others that myself */
   for (int i = 0; i < messages_count; i++) {
@@ -45,7 +45,7 @@ static void peer(int my_id, int messages_count, size_t payload_size, int peers_c
         // 'message' is not a stable storage location
         XBT_INFO("Send '%s' to '%s'", message.c_str(), mbox->get_cname());
         /* Create a communication representing the ongoing communication */
-        pending_comms.push_back(mbox->put_async(payload, payload_size));
+        pending_comms.push(mbox->put_async(payload, payload_size));
       }
     }
   }
@@ -55,7 +55,7 @@ static void peer(int my_id, int messages_count, size_t payload_size, int peers_c
     if (peer_id != my_id) {
       sg4::Mailbox* mbox = sg4::Mailbox::by_name("peer-" + std::to_string(peer_id));
       auto* payload      = new std::string("finalize"); // Make a copy of the data we will send
-      pending_comms.push_back(mbox->put_async(payload, payload_size));
+      pending_comms.push(mbox->put_async(payload, payload_size));
       XBT_INFO("Send 'finalize' to 'peer-%d'", peer_id);
     }
   }
@@ -84,7 +84,7 @@ static void peer(int my_id, int messages_count, size_t payload_size, int peers_c
   }
 
   XBT_INFO("I'm done, just waiting for my peers to receive the messages before exiting");
-  sg4::Comm::wait_all(pending_comms);
+  pending_comms.wait_all();
 
   XBT_INFO("Goodbye now!");
 }
index 093f5ab..b20ad0a 100644 (file)
@@ -33,10 +33,10 @@ static void sender(std::vector<std::string> args)
     mailbox->put(payload, comm_size);
   } else {
     // Start all comms in parallel, and wait for all completions in one shot
-    std::vector<sg4::CommPtr> comms;
+    sg4::ActivitySet comms;
     for (int i = 0; i < flow_amount; i++)
-      comms.push_back(mailbox->put_async(bprintf("%d", i), comm_size));
-    sg4::Comm::wait_all(comms);
+      comms.push(mailbox->put_async(bprintf("%d", i), comm_size));
+    comms.wait_all();
   }
   XBT_INFO("sender done.");
 }
@@ -56,11 +56,11 @@ static void receiver(std::vector<std::string> args)
     std::vector<char*> data(flow_amount);
 
     // Start all comms in parallel, and wait for their completion in one shot
-    std::vector<sg4::CommPtr> comms;
+    sg4::ActivitySet comms;
     for (int i = 0; i < flow_amount; i++)
-      comms.push_back(mailbox->get_async<char>(&data[i]));
+      comms.push(mailbox->get_async<char>(&data[i]));
 
-    sg4::Comm::wait_all(comms);
+    comms.wait_all();
     for (int i = 0; i < flow_amount; i++)
       xbt_free(data[i]);
   }
index f473664..08129e4 100644 (file)
@@ -14,16 +14,14 @@ static void worker()
   // Define an amount of work that should take 1 second to execute.
   double computation_amount = sg4::this_actor::get_host()->get_speed();
 
-  std::vector<sg4::ExecPtr> pending_execs;
   // Create a small DAG
   // + Two parents and a child
   // + First parent ends after 1 second and the Second parent after 2 seconds.
   sg4::ExecPtr first_parent = sg4::this_actor::exec_init(computation_amount);
-  pending_execs.push_back(first_parent);
   sg4::ExecPtr second_parent = sg4::this_actor::exec_init(2 * computation_amount);
-  pending_execs.push_back(second_parent);
   sg4::ExecPtr child = sg4::Exec::init()->set_flops_amount(computation_amount);
-  pending_execs.push_back(child);
+
+  sg4::ActivitySet pending_execs ({first_parent, second_parent, child});
 
   // Name the activities (for logging purposes only)
   first_parent->set_name("parent 1");
@@ -41,9 +39,9 @@ static void worker()
 
   // wait for the completion of all activities
   while (not pending_execs.empty()) {
-    ssize_t changed_pos = sg4::Exec::wait_any_for(pending_execs, -1);
-    XBT_INFO("Exec '%s' is complete", pending_execs[changed_pos]->get_cname());
-    pending_execs.erase(pending_execs.begin() + changed_pos);
+    auto completed_one = pending_execs.wait_any();
+    if (completed_one != nullptr)
+      XBT_INFO("Exec '%s' is complete", completed_one->get_cname());
   }
 }
 
index c86a78c..4772376 100644 (file)
@@ -12,16 +12,15 @@ namespace sg4 = simgrid::s4u;
 
 static void test()
 {
-  std::vector<sg4::ActivityPtr> pending_activities;
-
   sg4::ExecPtr bob_compute = sg4::this_actor::exec_init(1e9);
-  pending_activities.push_back(boost::dynamic_pointer_cast<sg4::Activity>(bob_compute));
   sg4::IoPtr bob_write = sg4::Host::current()->get_disks().front()->io_init(4000000, sg4::Io::OpType::WRITE);
-  pending_activities.push_back(boost::dynamic_pointer_cast<sg4::Activity>(bob_write));
   sg4::IoPtr carl_read = sg4::Host::by_name("carl")->get_disks().front()->io_init(4000000, sg4::Io::OpType::READ);
-  pending_activities.push_back(boost::dynamic_pointer_cast<sg4::Activity>(carl_read));
   sg4::ExecPtr carl_compute = sg4::Host::by_name("carl")->exec_init(1e9);
-  pending_activities.push_back(boost::dynamic_pointer_cast<sg4::Activity>(carl_compute));
+
+  sg4::ActivitySet pending_activities ({boost::dynamic_pointer_cast<sg4::Activity>(bob_compute),
+                                        boost::dynamic_pointer_cast<sg4::Activity>(bob_write),
+                                        boost::dynamic_pointer_cast<sg4::Activity>(carl_read),
+                                        boost::dynamic_pointer_cast<sg4::Activity>(carl_compute)});
 
   // Name the activities (for logging purposes only)
   bob_compute->set_name("bob compute");
@@ -45,9 +44,9 @@ static void test()
 
   // wait for the completion of all activities
   while (not pending_activities.empty()) {
-    ssize_t changed_pos = sg4::Activity::wait_any(pending_activities);
-    XBT_INFO("Activity '%s' is complete", pending_activities[changed_pos]->get_cname());
-    pending_activities.erase(pending_activities.begin() + changed_pos);
+    auto completed_one = pending_activities.wait_any();
+    if (completed_one != nullptr)
+      XBT_INFO("Activity '%s' is complete", completed_one->get_cname());
   }
 }
 
index 1ff67f6..7cb9437 100644 (file)
@@ -78,8 +78,8 @@ static void load_platform()
     /* add link UP/DOWN for communications from the host */
     root->add_route(host->get_netpoint(), nullptr, nullptr, nullptr, {{l, sg4::LinkInRoute::Direction::UP}}, true);
 
-    const sg4::Link* loopback = root->create_link(hostname + "_loopback", BW_LOCAL)->set_latency(LATENCY)->seal();
-    root->add_route(host->get_netpoint(), host->get_netpoint(), nullptr, nullptr, {sg4::LinkInRoute(loopback)});
+    sg4::Link* loopback = root->create_link(hostname + "_loopback", BW_LOCAL)->set_latency(LATENCY)->seal();
+    root->add_route(host, host, {loopback});
   }
 
   root->seal();
index 6384e76..d109be7 100644 (file)
@@ -8,6 +8,7 @@
  */
 
 #include <simgrid/s4u.hpp>
+#include <string>
 
 namespace sg4 = simgrid::s4u;
 
@@ -23,10 +24,10 @@ public:
   void operator()() const
   {
     // sphinx-doc: init-begin (this line helps the doc to build; ignore it)
-    /* Vector in which we store all ongoing communications */
-    std::vector<sg4::CommPtr> pending_comms;
+    /* ActivitySet in which we store all ongoing communications */
+    sg4::ActivitySet pending_comms;
 
-    /* Make a vector of the mailboxes to use */
+    /* Mailbox to use */
     sg4::Mailbox* mbox = sg4::Mailbox::by_name("receiver");
     // sphinx-doc: init-end
 
@@ -41,13 +42,13 @@ public:
 
       /* Create a communication representing the ongoing communication, and store it in pending_comms */
       sg4::CommPtr comm = mbox->put_async(payload, size);
-      pending_comms.push_back(comm);
+      pending_comms.push(comm);
     }
 
     XBT_INFO("Done dispatching all messages");
 
     /* Now that all message exchanges were initiated, wait for their completion in one single call */
-    sg4::Comm::wait_all(pending_comms);
+    pending_comms.wait_all();
     // sphinx-doc: put-end
 
     XBT_INFO("Goodbye now!");
@@ -63,23 +64,28 @@ public:
   explicit Receiver(int count) : messages_count(count) { mbox = sg4::Mailbox::by_name("receiver"); }
   void operator()()
   {
-    /* Vector in which we store all incoming msgs */
-    std::vector<std::unique_ptr<std::string*>> pending_msgs;
-    std::vector<sg4::CommPtr> pending_comms;
+    /* Where we store all incoming msgs */
+    std::unordered_map<sg4::CommPtr, std::shared_ptr<std::string*>> pending_msgs;
+    sg4::ActivitySet pending_comms;
 
     XBT_INFO("Wait for %d messages asynchronously", messages_count);
     for (int i = 0; i < messages_count; i++) {
-      pending_msgs.push_back(std::make_unique<std::string*>());
-      pending_comms.emplace_back(mbox->get_async<std::string>(pending_msgs[i].get()));
+      std::shared_ptr<std::string*> msg =std::make_shared<std::string*>();
+      auto comm = mbox->get_async<std::string>(msg.get());
+      pending_comms.push(comm);
+      pending_msgs.insert({comm, msg});
     }
+
     while (not pending_comms.empty()) {
-      ssize_t index    = sg4::Comm::wait_any(pending_comms);
-      std::string* msg = *pending_msgs[index];
-      XBT_INFO("I got '%s'.", msg->c_str());
-      /* cleanup memory and remove from vectors */
-      delete msg;
-      pending_comms.erase(pending_comms.begin() + index);
-      pending_msgs.erase(pending_msgs.begin() + index);
+      auto completed_one = pending_comms.wait_any();
+      if (completed_one != nullptr){
+        auto comm = boost::dynamic_pointer_cast<sg4::Comm>(completed_one);
+        auto msg = *pending_msgs[comm];
+        XBT_INFO("I got '%s'.", msg->c_str());
+        /* cleanup memory and remove from map */
+        delete msg;
+        pending_msgs.erase(comm);
+      }
     }
   }
 };
@@ -125,8 +131,7 @@ static void load_platform()
   link->set_latency(10e-6)->seal();
 
   /* create routes between nodes */
-  zone->add_route(sender->get_netpoint(), receiver->get_netpoint(), nullptr, nullptr,
-                  {{link, sg4::LinkInRoute::Direction::UP}}, true);
+  zone->add_route(sender, receiver, {link});
   zone->seal();
 
   /* create actors Sender/Receiver */
index 3fe272d..1e637ff 100644 (file)
@@ -7,7 +7,7 @@
  *
  * This example is very similar to the other asynchronous communication examples, but messages get serialized by the platform.
  * Without this call to Link::set_concurrency_limit(2) in main, all messages would be received at the exact same timestamp since
- * they are initiated at the same instant and are of the same size. But with this extra configuration to the link, at most 2 
+ * they are initiated at the same instant and are of the same size. But with this extra configuration to the link, at most 2
  * messages can travel through the link at the same time.
  */
 
@@ -26,10 +26,10 @@ public:
   void operator()() const
   {
     // sphinx-doc: init-begin (this line helps the doc to build; ignore it)
-    /* Vector in which we store all ongoing communications */
-    std::vector<sg4::CommPtr> pending_comms;
+    /* ActivitySet in which we store all ongoing communications */
+    sg4::ActivitySet pending_comms;
 
-    /* Make a vector of the mailboxes to use */
+    /* Mailbox to use */
     sg4::Mailbox* mbox = sg4::Mailbox::by_name("receiver");
     // sphinx-doc: init-end
 
@@ -44,13 +44,13 @@ public:
 
       /* Create a communication representing the ongoing communication, and store it in pending_comms */
       sg4::CommPtr comm = mbox->put_async(payload, msg_size);
-      pending_comms.push_back(comm);
+      pending_comms.push(comm);
     }
 
     XBT_INFO("Done dispatching all messages");
 
     /* Now that all message exchanges were initiated, wait for their completion in one single call */
-    sg4::Comm::wait_all(pending_comms);
+    pending_comms.wait_all();
     // sphinx-doc: put-end
 
     XBT_INFO("Goodbye now!");
@@ -66,23 +66,28 @@ public:
   explicit Receiver(int count) : messages_count(count) { mbox = sg4::Mailbox::by_name("receiver"); }
   void operator()()
   {
-    /* Vector in which we store all incoming msgs */
-    std::vector<std::unique_ptr<std::string*>> pending_msgs;
-    std::vector<sg4::CommPtr> pending_comms;
+    /* Where we store all incoming msgs */
+    std::unordered_map<sg4::CommPtr, std::shared_ptr<std::string*>> pending_msgs;
+    sg4::ActivitySet pending_comms;
 
     XBT_INFO("Wait for %d messages asynchronously", messages_count);
     for (int i = 0; i < messages_count; i++) {
-      pending_msgs.push_back(std::make_unique<std::string*>());
-      pending_comms.emplace_back(mbox->get_async<std::string>(pending_msgs[i].get()));
+      std::shared_ptr<std::string*> msg =std::make_shared<std::string*>();
+      auto comm = mbox->get_async<std::string>(msg.get());
+      pending_comms.push(comm);
+      pending_msgs.insert({comm, msg});
     }
+
     while (not pending_comms.empty()) {
-      ssize_t index    = sg4::Comm::wait_any(pending_comms);
-      std::string* msg = *pending_msgs[index];
-      XBT_INFO("I got '%s'.", msg->c_str());
-      /* cleanup memory and remove from vectors */
-      delete msg;
-      pending_comms.erase(pending_comms.begin() + index);
-      pending_msgs.erase(pending_msgs.begin() + index);
+      auto completed_one = pending_comms.wait_any();
+      if (completed_one != nullptr){
+        auto comm = boost::dynamic_pointer_cast<sg4::Comm>(completed_one);
+        auto msg = *pending_msgs[comm];
+        XBT_INFO("I got '%s'.", msg->c_str());
+        /* cleanup memory and remove from map */
+        delete msg;
+        pending_msgs.erase(comm);
+      }
     }
   }
 };
@@ -104,8 +109,7 @@ int main(int argc, char* argv[])
       zone->create_split_duplex_link("link1", 10e9)->set_latency(10e-6)->set_concurrency_limit(2)->seal();
 
   /* create routes between nodes */
-  zone->add_route(sender->get_netpoint(), receiver->get_netpoint(), nullptr, nullptr,
-                  {{link, sg4::LinkInRoute::Direction::UP}}, true);
+  zone->add_route(sender, receiver, {link});
   zone->seal();
 
   /* create actors Sender/Receiver */
index 4cafcc1..f622d90 100644 (file)
@@ -118,8 +118,8 @@ int main(int argc, char* argv[])
   // Add a new host programatically, and attach a state profile to it
   auto* root     = e.get_netzone_root();
   auto* lilibeth = root->create_host("Lilibeth", 1e15);
-  auto link      = sg4::LinkInRoute(e.link_by_name("10"));
-  root->add_route(e.host_by_name("Tremblay")->get_netpoint(), lilibeth->get_netpoint(), nullptr, nullptr, {link}, true);
+  auto link      = e.link_by_name("10");
+  root->add_route(e.host_by_name("Tremblay"), lilibeth, {link});
   lilibeth->set_state_profile(simgrid::kernel::profile::ProfileBuilder::from_string("lilibeth_profile", R"(
 4 0
 5 1
diff --git a/examples/cpp/plugin-jbod/s4u-plugin-jbod.cpp b/examples/cpp/plugin-jbod/s4u-plugin-jbod.cpp
new file mode 100644 (file)
index 0000000..67e0b5c
--- /dev/null
@@ -0,0 +1,87 @@
+/* Copyright (c) 2017-2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/s4u.hpp"
+#include "simgrid/plugins/jbod.hpp"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(jbod_test, "Messages specific for this simulation");
+namespace sg4 = simgrid::s4u;
+
+static void write_then_read(simgrid::plugin::Jbod* jbod)
+{
+  simgrid::plugin::JbodIoPtr io = jbod->write_async(1e7);
+  XBT_INFO("asynchronous write posted, wait for it");
+  io->wait();
+  XBT_INFO("asynchronous write done");
+  jbod->read(1e7);
+  XBT_INFO("synchonous read done");
+  jbod->write(1e7);
+  XBT_INFO("synchonous write done");
+  io = jbod->read_async(1e7);
+  XBT_INFO("asynchronous read posted, wait for it");
+  io->wait();
+  XBT_INFO("asynchonous read done");
+  jbod->write(1e7);
+  XBT_INFO("synchonous write done");
+  jbod->read(1e7);
+  XBT_INFO("synchonous read done");
+  jbod->read(1e7);
+  XBT_INFO("synchonous read done");
+}
+
+int main(int argc, char** argv)
+{
+  sg4::Engine e(&argc, argv);
+  auto* zone = sg4::create_full_zone("zone");
+  auto* host = zone->create_host("host", "1Gf");
+  // set up link so that data transfer from host to JBOD takes exactly 1 second (without crosstraffic)
+  auto* link = zone->create_link("link", 1e7/0.97)->set_latency(0);
+
+  auto* jbod_raid0 =
+    simgrid::plugin::Jbod::create_jbod(zone, "jbod_raid0", 1e9, 4, simgrid::plugin::Jbod::RAID::RAID0, 1e7, 5e6);
+  zone->add_route(host->get_netpoint(), jbod_raid0->get_netpoint(), nullptr, nullptr, {sg4::LinkInRoute(link)});
+
+  auto* jbod_raid1 =
+    simgrid::plugin::Jbod::create_jbod(zone, "jbod_raid1", 1e9, 4, simgrid::plugin::Jbod::RAID::RAID1, 1e7, 5e6);
+  zone->add_route(host->get_netpoint(), jbod_raid1->get_netpoint(), nullptr, nullptr, {sg4::LinkInRoute(link)});
+
+  auto* jbod_raid4 =
+    simgrid::plugin::Jbod::create_jbod(zone, "jbod_raid4", 1e9, 4, simgrid::plugin::Jbod::RAID::RAID4, 1e7, 5e6);
+  zone->add_route(host->get_netpoint(), jbod_raid4->get_netpoint(), nullptr, nullptr, {sg4::LinkInRoute(link)});
+
+  auto* jbod_raid5 =
+    simgrid::plugin::Jbod::create_jbod(zone, "jbod_raid5", 1e9, 4, simgrid::plugin::Jbod::RAID::RAID5, 1e7, 5e6);
+  zone->add_route(host->get_netpoint(), jbod_raid5->get_netpoint(), nullptr, nullptr, {sg4::LinkInRoute(link)});
+
+  auto* jbod_raid6 =
+    simgrid::plugin::Jbod::create_jbod(zone, "jbod_raid6", 1e9, 4, simgrid::plugin::Jbod::RAID::RAID6, 1e7, 5e6);
+  zone->add_route(host->get_netpoint(), jbod_raid6->get_netpoint(), nullptr, nullptr, {sg4::LinkInRoute(link)});
+
+  zone->seal();
+
+  XBT_INFO("XXXXXXXXXXXXXXX RAID 0 XXXXXXXXXXXXXXXX");
+  sg4::Actor::create("", host, write_then_read, jbod_raid0);
+  e.run();
+
+  XBT_INFO("XXXXXXXXXXXXXXX RAID 1 XXXXXXXXXXXXXXXX");
+  sg4::Actor::create("", host, write_then_read, jbod_raid1);
+  e.run();
+
+  XBT_INFO("XXXXXXXXXXXXXXX RAID 4 XXXXXXXXXXXXXXXX");
+  sg4::Actor::create("", host, write_then_read, jbod_raid4);
+  e.run();
+
+  XBT_INFO("XXXXXXXXXXXXXXX RAID 5 XXXXXXXXXXXXXXXX");
+  sg4::Actor::create("", host, write_then_read, jbod_raid5);
+  e.run();
+
+  XBT_INFO("XXXXXXXXXXXXXXX RAID 6 XXXXXXXXXXXXXXXX");
+  sg4::Actor::create("", host, write_then_read, jbod_raid6);
+  e.run();
+
+  XBT_INFO("Simulated time: %g", sg4::Engine::get_clock());
+
+  return 0;
+}
diff --git a/examples/cpp/plugin-jbod/s4u-plugin-jbod.tesh b/examples/cpp/plugin-jbod/s4u-plugin-jbod.tesh
new file mode 100644 (file)
index 0000000..cfb05a0
--- /dev/null
@@ -0,0 +1,55 @@
+#!/usr/bin/env tesh
+
+$ ${bindir}/s4u-plugin-jbod --cfg=network/crosstraffic:0 "--log=root.fmt:[%10.6r]%e%m%n"
+> [  0.000000] Configuration change: Set 'network/crosstraffic' to '0'
+> [  0.000000] XXXXXXXXXXXXXXX RAID 0 XXXXXXXXXXXXXXXX
+> [  0.000000] asynchronous write posted, wait for it
+> [  1.500000] asynchronous write done
+> [  2.750000] synchonous read done
+> [  4.250000] synchonous write done
+> [  4.250000] asynchronous read posted, wait for it
+> [  5.500000] asynchonous read done
+> [  7.000000] synchonous write done
+> [  8.250000] synchonous read done
+> [  9.500000] synchonous read done
+> [  9.500000] XXXXXXXXXXXXXXX RAID 1 XXXXXXXXXXXXXXXX
+> [  9.500000] asynchronous write posted, wait for it
+> [ 12.500000] asynchronous write done
+> [ 14.500000] synchonous read done
+> [ 17.500000] synchonous write done
+> [ 17.500000] asynchronous read posted, wait for it
+> [ 19.500000] asynchonous read done
+> [ 22.500000] synchonous write done
+> [ 24.500000] synchonous read done
+> [ 26.500000] synchonous read done
+> [ 26.500000] XXXXXXXXXXXXXXX RAID 4 XXXXXXXXXXXXXXXX
+> [ 26.500000] asynchronous write posted, wait for it
+> [ 28.170000] asynchronous write done
+> [ 29.503333] synchonous read done
+> [ 31.173333] synchonous write done
+> [ 31.173333] asynchronous read posted, wait for it
+> [ 32.506666] asynchonous read done
+> [ 34.176666] synchonous write done
+> [ 35.510000] synchonous read done
+> [ 36.843333] synchonous read done
+> [ 36.843333] XXXXXXXXXXXXXXX RAID 5 XXXXXXXXXXXXXXXX
+> [ 36.843333] asynchronous write posted, wait for it
+> [ 38.513333] asynchronous write done
+> [ 39.846666] synchonous read done
+> [ 41.516666] synchonous write done
+> [ 41.516666] asynchronous read posted, wait for it
+> [ 42.849999] asynchonous read done
+> [ 44.519999] synchonous write done
+> [ 45.853333] synchonous read done
+> [ 47.186666] synchonous read done
+> [ 47.186666] XXXXXXXXXXXXXXX RAID 6 XXXXXXXXXXXXXXXX
+> [ 47.186666] asynchronous write posted, wait for it
+> [ 50.186666] asynchronous write done
+> [ 51.686666] synchonous read done
+> [ 54.686666] synchonous write done
+> [ 54.686666] asynchronous read posted, wait for it
+> [ 56.186666] asynchonous read done
+> [ 59.186666] synchonous write done
+> [ 60.686666] synchonous read done
+> [ 62.186666] synchonous read done
+> [ 62.186666] Simulated time: 62.1867
\ No newline at end of file
index 4483303..ee7a3ce 100644 (file)
@@ -70,7 +70,9 @@ int main(int argc, char* argv[])
   }
 
   auto* router = cluster->create_router("cluster_router");
-  cluster->add_route(router, nullptr, nullptr, nullptr, {});
+  std::vector<sg4::LinkInRoute> links; // empty
+  cluster->add_route(router, nullptr, nullptr, nullptr, links);
+  cluster->seal();
 
   simgrid::plugin::ProducerConsumerPtr<int> pc = simgrid::plugin::ProducerConsumer<int>::create(2);
 
index 555cfc1..3705f47 100644 (file)
@@ -1,7 +1,8 @@
 foreach(example actor-create actor-daemon actor-join actor-kill actor-migrate actor-suspend actor-yield actor-lifetime
+        activityset-testany activityset-waitall activityset-waitallfor activityset-waitany
         app-masterworkers
-        comm-wait comm-waitall comm-waitallfor comm-waitany comm-failure comm-host2host comm-pingpong
-        comm-ready comm-suspend comm-testany comm-throttling comm-waitallfor comm-waituntil
+        comm-wait comm-failure comm-host2host comm-pingpong
+        comm-ready comm-suspend comm-throttling comm-waituntil
         exec-async exec-basic exec-dvfs exec-remote exec-ptask
         task-io task-simple task-switch-host task-variable-load
         platform-comm-serialize platform-profile platform-failures
diff --git a/examples/python/activityset-testany/activityset-testany.py b/examples/python/activityset-testany/activityset-testany.py
new file mode 100644 (file)
index 0000000..6a8901f
--- /dev/null
@@ -0,0 +1,57 @@
+# Copyright (c) 2017-2023. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+"""
+Usage: activityset-testany.py platform_file [other parameters]
+"""
+
+import sys
+from simgrid import Actor, ActivitySet, Engine, Comm, Exec, Io, Host, Mailbox, this_actor
+
+def bob():
+  mbox = Mailbox.by_name("mbox")
+  disk = Host.current().get_disks()[0]
+
+  this_actor.info("Create my asynchronous activities")
+  exec = this_actor.exec_async(5e9)
+  comm = mbox.get_async()
+  io   = disk.read_async(300000000)
+
+  pending_activities = ActivitySet([exec, comm])
+  pending_activities.push(io) # Activities can be pushed after creation, too
+  this_actor.info("Sleep_for a while")
+  this_actor.sleep_for(1)
+
+  this_actor.info("Test for completed activities")
+  while not pending_activities.empty():
+    completed_one = pending_activities.test_any()
+    if completed_one == None:
+      this_actor.info("Nothing matches, test again in 0.5s")
+      this_actor.sleep_for(.5)
+    elif isinstance(completed_one, Comm):
+      this_actor.info("Completed a Comm")
+    elif isinstance(completed_one, Exec):
+      this_actor.info("Completed an Exec")
+    elif isinstance(completed_one, Io):
+      this_actor.info("Completed an I/O")
+
+  this_actor.info("Last activity is complete")
+
+def alice():
+  this_actor.info("Send 'Message'")
+  Mailbox.by_name("mbox").put("Message", 600000000)
+
+if __name__ == '__main__':
+  e = Engine(sys.argv)
+  e.set_log_control("root.fmt:[%4.2r]%e[%5a]%e%m%n")
+
+  # Load the platform description
+  e.load_platform(sys.argv[1])
+
+  Actor.create("bob",   Host.by_name("bob"), bob)
+  Actor.create("alice", Host.by_name("alice"), alice)
+
+  e.run()
diff --git a/examples/python/activityset-testany/activityset-testany.tesh b/examples/python/activityset-testany/activityset-testany.tesh
new file mode 100644 (file)
index 0000000..5177a16
--- /dev/null
@@ -0,0 +1,20 @@
+#!/usr/bin/env tesh
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${srcdir:=.}/activityset-testany.py ${platfdir}/hosts_with_disks.xml
+> [0.00] [alice] Send 'Message'
+> [0.00] [  bob] Create my asynchronous activities
+> [0.00] [  bob] Sleep_for a while
+> [1.00] [  bob] Test for completed activities
+> [1.00] [  bob] Nothing matches, test again in 0.5s
+> [1.50] [  bob] Nothing matches, test again in 0.5s
+> [2.00] [  bob] Nothing matches, test again in 0.5s
+> [2.50] [  bob] Nothing matches, test again in 0.5s
+> [3.00] [  bob] Completed an I/O
+> [3.00] [  bob] Nothing matches, test again in 0.5s
+> [3.50] [  bob] Nothing matches, test again in 0.5s
+> [4.00] [  bob] Nothing matches, test again in 0.5s
+> [4.50] [  bob] Nothing matches, test again in 0.5s
+> [5.00] [  bob] Completed an Exec
+> [5.00] [  bob] Nothing matches, test again in 0.5s
+> [5.50] [  bob] Completed a Comm
+> [5.50] [  bob] Last activity is complete
diff --git a/examples/python/activityset-waitall/activityset-waitall.py b/examples/python/activityset-waitall/activityset-waitall.py
new file mode 100644 (file)
index 0000000..4e4d1ee
--- /dev/null
@@ -0,0 +1,44 @@
+# Copyright (c) 2017-2023. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+"""
+Usage: activityset-waitall.py platform_file [other parameters]
+"""
+
+import sys
+from simgrid import Actor, ActivitySet, Engine, Comm, Exec, Io, Host, Mailbox, this_actor
+
+def bob():
+  mbox = Mailbox.by_name("mbox")
+  disk = Host.current().get_disks()[0]
+
+  this_actor.info("Create my asynchronous activities")
+  exec = this_actor.exec_async(5e9)
+  comm = mbox.get_async()
+  io   = disk.read_async(300000000)
+
+  pending_activities = ActivitySet([exec, comm])
+  pending_activities.push(io) # Activities can be pushed after creation, too
+  this_actor.info("Wait for asynchronous activities to complete, all in one shot.")
+  pending_activities.wait_all()
+
+  this_actor.info("All activities are completed.")
+
+def alice():
+  this_actor.info("Send 'Message'")
+  Mailbox.by_name("mbox").put("Message", 600000000)
+
+if __name__ == '__main__':
+  e = Engine(sys.argv)
+  e.set_log_control("root.fmt:[%4.6r]%e[%5a]%e%m%n")
+
+  # Load the platform description
+  e.load_platform(sys.argv[1])
+
+  Actor.create("bob",   Host.by_name("bob"), bob)
+  Actor.create("alice", Host.by_name("alice"), alice)
+
+  e.run()
diff --git a/examples/python/activityset-waitall/activityset-waitall.tesh b/examples/python/activityset-waitall/activityset-waitall.tesh
new file mode 100644 (file)
index 0000000..ab4b175
--- /dev/null
@@ -0,0 +1,7 @@
+#!/usr/bin/env tesh
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${srcdir:=.}/activityset-waitall.py ${platfdir}/hosts_with_disks.xml
+> [0.000000] [alice] Send 'Message'
+> [0.000000] [  bob] Create my asynchronous activities
+> [0.000000] [  bob] Wait for asynchronous activities to complete, all in one shot.
+> [5.197828] [  bob] All activities are completed.
diff --git a/examples/python/activityset-waitallfor/activityset-waitallfor.py b/examples/python/activityset-waitallfor/activityset-waitallfor.py
new file mode 100644 (file)
index 0000000..44b3c6f
--- /dev/null
@@ -0,0 +1,58 @@
+# Copyright (c) 2017-2023. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+"""
+Usage: activityset-waitallfor.py platform_file [other parameters]
+"""
+
+import sys
+from simgrid import Actor, ActivitySet, Engine, Comm, Exec, Io, Host, Mailbox, this_actor, TimeoutException
+
+def bob():
+  mbox = Mailbox.by_name("mbox")
+  disk = Host.current().get_disks()[0]
+
+  this_actor.info("Create my asynchronous activities")
+  exec = this_actor.exec_async(5e9)
+  comm = mbox.get_async()
+  io   = disk.read_async(300000000)
+
+  pending_activities = ActivitySet([exec, comm])
+  pending_activities.push(io) # Activities can be pushed after creation, too
+  this_actor.info("Wait for asynchronous activities to complete")
+  while not pending_activities.empty():
+    try:
+      pending_activities.wait_all_for(1)
+    except TimeoutException:
+      this_actor.info("Not all activities are terminated yet.")
+
+    completed_one = pending_activities.test_any()
+    while completed_one != None:
+      if isinstance(completed_one, Comm):
+        this_actor.info("Completed a Comm")
+      elif isinstance(completed_one, Exec):
+        this_actor.info("Completed an Exec")
+      elif isinstance(completed_one, Io):
+        this_actor.info("Completed an I/O")
+      completed_one = pending_activities.test_any()
+
+  this_actor.info("Last activity is complete")
+
+def alice():
+  this_actor.info("Send 'Message'")
+  Mailbox.by_name("mbox").put("Message", 600000000)
+
+if __name__ == '__main__':
+  e = Engine(sys.argv)
+  e.set_log_control("root.fmt:[%4.6r]%e[%5a]%e%m%n")
+
+  # Load the platform description
+  e.load_platform(sys.argv[1])
+
+  Actor.create("bob",   Host.by_name("bob"), bob)
+  Actor.create("alice", Host.by_name("alice"), alice)
+
+  e.run()
diff --git a/examples/python/activityset-waitallfor/activityset-waitallfor.tesh b/examples/python/activityset-waitallfor/activityset-waitallfor.tesh
new file mode 100644 (file)
index 0000000..cf08947
--- /dev/null
@@ -0,0 +1,15 @@
+#!/usr/bin/env tesh
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${srcdir:=.}/activityset-waitallfor.py ${platfdir}/hosts_with_disks.xml
+> [0.000000] [alice] Send 'Message'
+> [0.000000] [  bob] Create my asynchronous activities
+> [0.000000] [  bob] Wait for asynchronous activities to complete
+> [1.000000] [  bob] Not all activities are terminated yet.
+> [2.000000] [  bob] Not all activities are terminated yet.
+> [3.000000] [  bob] Not all activities are terminated yet.
+> [3.000000] [  bob] Completed an I/O
+> [4.000000] [  bob] Not all activities are terminated yet.
+> [5.000000] [  bob] Not all activities are terminated yet.
+> [5.000000] [  bob] Completed an Exec
+> [5.197828] [  bob] Completed a Comm
+> [5.197828] [  bob] Last activity is complete
diff --git a/examples/python/activityset-waitany/activityset-waitany.py b/examples/python/activityset-waitany/activityset-waitany.py
new file mode 100644 (file)
index 0000000..88ac531
--- /dev/null
@@ -0,0 +1,52 @@
+# Copyright (c) 2017-2023. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+"""
+Usage: activityset-waitany.py platform_file [other parameters]
+"""
+
+import sys
+from simgrid import Actor, ActivitySet, Engine, Comm, Exec, Io, Host, Mailbox, this_actor
+
+def bob():
+  mbox = Mailbox.by_name("mbox")
+  disk = Host.current().get_disks()[0]
+
+  this_actor.info("Create my asynchronous activities")
+  exec = this_actor.exec_async(5e9)
+  comm = mbox.get_async()
+  io   = disk.read_async(300000000)
+
+  pending_activities = ActivitySet([exec, comm])
+  pending_activities.push(io) # Activities can be pushed after creation, too
+  this_actor.info("Wait for asynchronous activities to complete")
+  while not pending_activities.empty():
+    completed_one = pending_activities.wait_any()
+
+    if isinstance(completed_one, Comm):
+      this_actor.info("Completed a Comm")
+    elif isinstance(completed_one, Exec):
+      this_actor.info("Completed an Exec")
+    elif isinstance(completed_one, Io):
+      this_actor.info("Completed an I/O")
+
+  this_actor.info("Last activity is complete")
+
+def alice():
+  this_actor.info("Send 'Message'")
+  Mailbox.by_name("mbox").put("Message", 600000000)
+
+if __name__ == '__main__':
+  e = Engine(sys.argv)
+  e.set_log_control("root.fmt:[%4.6r]%e[%5a]%e%m%n")
+
+  # Load the platform description
+  e.load_platform(sys.argv[1])
+
+  Actor.create("bob",   Host.by_name("bob"), bob)
+  Actor.create("alice", Host.by_name("alice"), alice)
+
+  e.run()
diff --git a/examples/python/activityset-waitany/activityset-waitany.tesh b/examples/python/activityset-waitany/activityset-waitany.tesh
new file mode 100644 (file)
index 0000000..70b707b
--- /dev/null
@@ -0,0 +1,10 @@
+#!/usr/bin/env tesh
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${srcdir:=.}/activityset-waitany.py ${platfdir}/hosts_with_disks.xml
+> [0.000000] [alice] Send 'Message'
+> [0.000000] [  bob] Create my asynchronous activities
+> [0.000000] [  bob] Wait for asynchronous activities to complete
+> [3.000000] [  bob] Completed an I/O
+> [5.000000] [  bob] Completed an Exec
+> [5.197828] [  bob] Completed a Comm
+> [5.197828] [  bob] Last activity is complete
index 79ee80c..b48cb38 100644 (file)
@@ -28,19 +28,19 @@ class Sender:
     # Actors that are created as object will execute their __call__ method.
     # So, the following constitutes the main function of the Sender actor.
     def __call__(self):
-        pending_comms = []
+        pending_comms = simgrid.ActivitySet()
         mboxes = []
 
         for host in self.hosts:
             msg = "Hello, I'm alive and running on " + simgrid.this_actor.get_host().name
             mbox = simgrid.Mailbox.by_name(host.name)
             mboxes.append(mbox)
-            pending_comms.append(mbox.put_async(msg, self.msg_size))
+            pending_comms.push(mbox.put_async(msg, self.msg_size))
 
         simgrid.this_actor.info("Done dispatching all messages")
 
         # Now that all message exchanges were initiated, wait for their completion in one single call
-        simgrid.Comm.wait_all(pending_comms)
+        pending_comms.wait_all()
 
         simgrid.this_actor.info("Goodbye now!")
 
index 4b70b33..02cd701 100644 (file)
@@ -5,7 +5,7 @@
 
 import sys
 
-from simgrid import Engine, Actor, Comm, NetZone, Link, LinkInRoute, Mailbox, this_actor, NetworkFailureException
+from simgrid import Engine, Actor, ActivitySet, Comm, NetZone, Link, LinkInRoute, Mailbox, this_actor, NetworkFailureException
 
 
 def sender(mailbox1_name: str, mailbox2_name: str) -> None:
@@ -19,10 +19,10 @@ def sender(mailbox1_name: str, mailbox2_name: str) -> None:
     comm2: Comm = mailbox2.put_async(666, 2)
 
     this_actor.info("Calling wait_any..")
-    pending_comms = [comm1, comm2]
+    pending_comms = ActivitySet([comm1, comm2])
     try:
-        index = Comm.wait_any([comm1, comm2])
-        this_actor.info(f"Wait any returned index {index} (comm to {pending_comms[index].mailbox.name})")
+        comm = pending_comms.wait_any()
+        this_actor.info(f"Wait any returned a comm to {comm.mailbox.name}")
     except NetworkFailureException:
         this_actor.info("Sender has experienced a network failure exception, so it knows that something went wrong")
         this_actor.info("Now it needs to figure out which of the two comms failed by looking at their state:")
@@ -36,9 +36,8 @@ def sender(mailbox1_name: str, mailbox2_name: str) -> None:
         this_actor.info(f"Waiting on a FAILED comm raises an exception: '{err}'")
 
     this_actor.info("Wait for remaining comm, just to be nice")
-    pending_comms.pop(0)
     try:
-        Comm.wait_any(pending_comms)
+        pending_comms.wait_all()
     except Exception as e:
         this_actor.warning(str(e))
 
@@ -66,11 +65,11 @@ def main():
     host2 = zone.create_host("Host2", "1f")
     host3 = zone.create_host("Host3", "1f")
 
-    link_to_2 = LinkInRoute(zone.create_link("link_to_2", "1bps").seal())
-    link_to_3 = LinkInRoute(zone.create_link("link_to_3", "1bps").seal())
+    link_to_2 = zone.create_link("link_to_2", "1bps").seal()
+    link_to_3 = zone.create_link("link_to_3", "1bps").seal()
 
-    zone.add_route(host1.netpoint, host2.netpoint, None, None, [link_to_2], False)
-    zone.add_route(host1.netpoint, host3.netpoint, None, None, [link_to_3], False)
+    zone.add_route(host1, host2, [link_to_2])
+    zone.add_route(host1, host3, [link_to_3])
     zone.seal()
 
     Actor.create("Sender", host1, sender, "mailbox2", "mailbox3")
index ee26ae8..8f4de0e 100644 (file)
@@ -14,4 +14,4 @@ $ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-failure.py "-
 > [ 10.000000] (1:Sender@Host1)   Comm to mailbox3 has state: STARTED
 > [ 10.000000] (1:Sender@Host1) Waiting on a FAILED comm raises an exception: 'Cannot wait for a failed communication'
 > [ 10.000000] (1:Sender@Host1) Wait for remaining comm, just to be nice
-> [ 16.494845] (3:Receiver-2@Host3) Receiver has received successfully (mailbox3)!
+> [ 17.319588] (3:Receiver-2@Host3) Receiver has received successfully (mailbox3)!
index fa5f91e..f874bd9 100644 (file)
@@ -7,7 +7,7 @@ from argparse import ArgumentParser
 from typing import List
 import sys
 
-from simgrid import Actor, Comm, Engine, Mailbox, this_actor
+from simgrid import Actor, ActivitySet, Comm, Engine, Mailbox, this_actor
 
 
 FINALIZE_MESSAGE = "finalize"
@@ -31,7 +31,7 @@ def get_peer_mailbox(peer_id: int) -> Mailbox:
 def peer(my_id: int, message_count: int, payload_size: int, peers_count: int):
     my_mailbox: Mailbox = get_peer_mailbox(my_id)
     my_mailbox.set_receiver(Actor.self())
-    pending_comms: List[Comm] = []
+    pending_comms = ActivitySet()
     # Start dispatching all messages to peers others that myself
     for i in range(message_count):
         for peer_id in range(peers_count):
@@ -39,14 +39,14 @@ def peer(my_id: int, message_count: int, payload_size: int, peers_count: int):
                 peer_mailbox = get_peer_mailbox(peer_id)
                 message = f"Message {i} from peer {my_id}"
                 this_actor.info(f"Send '{message}' to '{peer_mailbox.name}'")
-                pending_comms.append(peer_mailbox.put_async(message, payload_size))
+                pending_comms.push(peer_mailbox.put_async(message, payload_size))
 
     # Start sending messages to let peers know that they should stop
     for peer_id in range(peers_count):
         if peer_id != my_id:
             peer_mailbox = get_peer_mailbox(peer_id)
             payload = str(FINALIZE_MESSAGE)
-            pending_comms.append(peer_mailbox.put_async(payload, payload_size))
+            pending_comms.push(peer_mailbox.put_async(payload, payload_size))
             this_actor.info(f"Send '{payload}' to '{peer_mailbox.name}'")
     this_actor.info("Done dispatching all messages")
 
@@ -69,7 +69,7 @@ def peer(my_id: int, message_count: int, payload_size: int, peers_count: int):
             this_actor.sleep_for(0.01)
 
     this_actor.info("I'm done, just waiting for my peers to receive the messages before exiting")
-    Comm.wait_all(pending_comms)
+    pending_comms.wait_all()
     this_actor.info("Goodbye now!")
 
 
diff --git a/examples/python/comm-testany/comm-testany.py b/examples/python/comm-testany/comm-testany.py
deleted file mode 100644 (file)
index 52220cf..0000000
+++ /dev/null
@@ -1,72 +0,0 @@
-# Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.
-#
-# This program is free software; you can redistribute it and/or modify it
-# under the terms of the license (GNU LGPL) which comes with this package.
-
-from argparse import ArgumentParser
-from typing import List
-import sys
-
-from simgrid import Engine, Actor, Comm, Mailbox, this_actor
-
-
-def create_parser() -> ArgumentParser:
-    parser = ArgumentParser()
-    parser.add_argument(
-        '--platform',
-        type=str,
-        required=True,
-        help='path to the platform description'
-    )
-    return parser
-
-
-def rank0():
-    rank0_mailbox: Mailbox = Mailbox.by_name("rank0")
-    this_actor.info("Post my asynchronous receives")
-    comm1, a1 = rank0_mailbox.get_async()
-    comm2, a2 = rank0_mailbox.get_async()
-    comm3, a3 = rank0_mailbox.get_async()
-    pending_comms: List[Comm] = [comm1, comm2, comm3]
-
-    this_actor.info("Send some data to rank-1")
-    rank1_mailbox: Mailbox = Mailbox.by_name("rank1")
-    for i in range(3):
-        rank1_mailbox.put(i, 1)
-
-    this_actor.info("Test for completed comms")
-    while pending_comms:
-        flag = Comm.test_any(pending_comms)
-        if flag != -1:
-            pending_comms.pop(flag)
-            this_actor.info("Remove a pending comm.")
-        else:
-            # Nothing matches, wait for a little bit
-            this_actor.sleep_for(0.1)
-    this_actor.info("Last comm is complete")
-
-
-def rank1():
-    rank0_mailbox: Mailbox = Mailbox.by_name("rank0")
-    rank1_mailbox: Mailbox = Mailbox.by_name("rank1")
-    for i in range(3):
-        data: int = rank1_mailbox.get()
-        this_actor.info(f"Received {data}")
-        msg_content = f"Message {i}"
-        this_actor.info(f"Send '{msg_content}'")
-        rank0_mailbox.put(msg_content, int(1e6))
-
-
-def main():
-    settings = create_parser().parse_known_args()[0]
-    e = Engine(sys.argv)
-    e.load_platform(settings.platform)
-
-    Actor.create("rank0", e.host_by_name("Tremblay"), rank0)
-    Actor.create("rank1", e.host_by_name("Fafard"), rank1)
-
-    e.run()
-
-
-if __name__ == "__main__":
-    main()
diff --git a/examples/python/comm-testany/comm-testany.tesh b/examples/python/comm-testany/comm-testany.tesh
deleted file mode 100644 (file)
index 83d80f1..0000000
+++ /dev/null
@@ -1,16 +0,0 @@
-#!/usr/bin/env tesh
-
-$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-testany.py --platform ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
->[  0.000000] (1:rank0@Tremblay) Post my asynchronous receives
->[  0.000000] (1:rank0@Tremblay) Send some data to rank-1
->[  0.025708] (2:rank1@Fafard) Received 0
->[  0.025708] (2:rank1@Fafard) Send 'Message 0'
->[  0.209813] (2:rank1@Fafard) Received 1
->[  0.209813] (2:rank1@Fafard) Send 'Message 1'
->[  0.393918] (1:rank0@Tremblay) Test for completed comms
->[  0.393918] (2:rank1@Fafard) Received 2
->[  0.393918] (2:rank1@Fafard) Send 'Message 2'
->[  0.393918] (1:rank0@Tremblay) Remove a pending comm.
->[  0.393918] (1:rank0@Tremblay) Remove a pending comm.
->[  0.593918] (1:rank0@Tremblay) Remove a pending comm.
->[  0.593918] (1:rank0@Tremblay) Last comm is complete
diff --git a/examples/python/comm-waitall/comm-waitall.py b/examples/python/comm-waitall/comm-waitall.py
deleted file mode 100644 (file)
index 68dacca..0000000
+++ /dev/null
@@ -1,74 +0,0 @@
-# Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.
-#
-# This program is free software; you can redistribute it and/or modify it
-# under the terms of the license (GNU LGPL) which comes with this package.
-
-"""
-This example shows how to block on the completion of a set of communications.
-
-As for the other asynchronous examples, the sender initiate all the messages it wants to send and
-pack the resulting simgrid.Comm objects in a list. All messages thus occur concurrently.
-
-The sender then blocks until all ongoing communication terminate, using simgrid.Comm.wait_all()
-"""
-
-import sys
-from simgrid import Actor, Comm, Engine, Host, Mailbox, this_actor
-
-
-def sender(messages_count, msg_size, receivers_count):
-    # List in which we store all ongoing communications
-    pending_comms = []
-
-    # Vector of the used mailboxes
-    mboxes = [Mailbox.by_name("receiver-{:d}".format(i))
-              for i in range(0, receivers_count)]
-
-    # Start dispatching all messages to receivers, in a round robin fashion
-    for i in range(0, messages_count):
-        content = "Message {:d}".format(i)
-        mbox = mboxes[i % receivers_count]
-
-        this_actor.info("Send '{:s}' to '{:s}'".format(content, str(mbox)))
-
-        # Create a communication representing the ongoing communication, and store it in pending_comms
-        comm = mbox.put_async(content, msg_size)
-        pending_comms.append(comm)
-
-    # Start sending messages to let the workers know that they should stop
-    for i in range(0, receivers_count):
-        mbox = mboxes[i]
-        this_actor.info("Send 'finalize' to '{:s}'".format(str(mbox)))
-        comm = mbox.put_async("finalize", 0)
-        pending_comms.append(comm)
-
-    this_actor.info("Done dispatching all messages")
-
-    # Now that all message exchanges were initiated, wait for their completion in one single call
-    Comm.wait_all(pending_comms)
-
-    this_actor.info("Goodbye now!")
-
-
-def receiver(my_id):
-    mbox = Mailbox.by_name("receiver-{:d}".format(my_id))
-
-    this_actor.info("Wait for my first message")
-    while True:
-        received = mbox.get()
-        this_actor.info("I got a '{:s}'.".format(received))
-        if received == "finalize":
-            break  # If it's a finalize message, we're done.
-
-
-if __name__ == '__main__':
-    e = Engine(sys.argv)
-
-    # Load the platform description
-    e.load_platform(sys.argv[1])
-
-    Actor.create("sender", Host.by_name("Tremblay"), sender, 5, 1000000, 2)
-    Actor.create("receiver", Host.by_name("Ruby"), receiver, 0)
-    Actor.create("receiver", Host.by_name("Perl"), receiver, 1)
-
-    e.run()
diff --git a/examples/python/comm-waitall/comm-waitall.tesh b/examples/python/comm-waitall/comm-waitall.tesh
deleted file mode 100644 (file)
index 32ca46c..0000000
+++ /dev/null
@@ -1,21 +0,0 @@
-#!/usr/bin/env tesh
-
-$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitall.py ${platfdir}/small_platform_fatpipe.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
-> [  0.000000] (2:receiver@Ruby) Wait for my first message
-> [  0.000000] (3:receiver@Perl) Wait for my first message
-> [  0.000000] (1:sender@Tremblay) Send 'Message 0' to 'Mailbox(receiver-0)'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 1' to 'Mailbox(receiver-1)'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 2' to 'Mailbox(receiver-0)'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 3' to 'Mailbox(receiver-1)'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 4' to 'Mailbox(receiver-0)'
-> [  0.000000] (1:sender@Tremblay) Send 'finalize' to 'Mailbox(receiver-0)'
-> [  0.000000] (1:sender@Tremblay) Send 'finalize' to 'Mailbox(receiver-1)'
-> [  0.000000] (1:sender@Tremblay) Done dispatching all messages
-> [  0.004022] (2:receiver@Ruby) I got a 'Message 0'.
-> [  0.004022] (3:receiver@Perl) I got a 'Message 1'.
-> [  0.008043] (2:receiver@Ruby) I got a 'Message 2'.
-> [  0.008043] (3:receiver@Perl) I got a 'Message 3'.
-> [  0.009995] (3:receiver@Perl) I got a 'finalize'.
-> [  0.012065] (2:receiver@Ruby) I got a 'Message 4'.
-> [  0.014016] (2:receiver@Ruby) I got a 'finalize'.
-> [  0.014016] (1:sender@Tremblay) Goodbye now!
diff --git a/examples/python/comm-waitallfor/comm-waitallfor.py b/examples/python/comm-waitallfor/comm-waitallfor.py
deleted file mode 100644 (file)
index 0f38795..0000000
+++ /dev/null
@@ -1,133 +0,0 @@
-# Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.
-#
-# This program is free software; you can redistribute it and/or modify it
-# under the terms of the license (GNU LGPL) which comes with this package.
-
-"""
-This example implements the following scenario:
-- Multiple workers consume jobs (Job) from a shared mailbox (worker)
-- A client first dispatches several jobs (with a simulated 'cost' - i.e. time the worker will 'process' the job)
-- The client then waits for all job results for a maximum time set by the 'wait timeout' (Comm.wait_all_for)
-- The client then displays the status of individual jobs.
-"""
-
-
-from argparse import ArgumentParser
-from dataclasses import dataclass
-from typing import List
-from uuid import uuid4
-import sys
-
-from simgrid import Actor, Comm, Engine, Host, Mailbox, PyGetAsync, this_actor
-
-
-SIMULATED_JOB_SIZE_BYTES = 1024
-SIMULATED_RESULT_SIZE_BYTES = 1024 * 1024
-
-
-def parse_requests(requests_str: str) -> List[float]:
-    return [float(item.strip()) for item in requests_str.split(",")]
-
-
-def create_parser() -> ArgumentParser:
-    parser = ArgumentParser()
-    parser.add_argument(
-        '--platform',
-        type=str,
-        required=True,
-        help='path to the platform description'
-    )
-    parser.add_argument(
-        "--workers",
-        type=int,
-        default=1,
-        help="number of worker actors to start"
-    )
-    parser.add_argument(
-        "--jobs",
-        type=parse_requests,
-        default="1,2,3,4,5",
-        help="duration of individual jobs sent to the workers by the client"
-    )
-    parser.add_argument(
-        "--wait-timeout",
-        type=float,
-        default=5.0,
-        help="number of seconds before the client gives up waiting for results and aborts the simulation"
-    )
-    return parser
-
-
-@dataclass
-class Job:
-    job_id: str
-    duration: float
-    result_mailbox: Mailbox
-
-
-def worker(worker_id: str):
-    this_actor.info(f"{worker_id} started")
-    mailbox: Mailbox = Mailbox.by_name("worker")
-    while True:
-        job: Job = mailbox.get()
-        this_actor.info(f"{worker_id} working on {job.job_id} (will take {job.duration}s to complete)")
-        this_actor.sleep_for(job.duration)
-        job.result_mailbox.put(f"{worker_id}", SIMULATED_RESULT_SIZE_BYTES)
-
-
-@dataclass
-class AsyncJobResult:
-    job: Job
-    result_comm: Comm
-    async_data: PyGetAsync
-
-    @property
-    def complete(self) -> bool:
-        return self.result_comm.test()
-
-    @property
-    def status(self) -> str:
-        return "complete" if self.complete else "pending"
-
-
-def client(client_id: str, jobs: List[float], wait_timeout: float):
-    worker_mailbox: Mailbox = Mailbox.by_name("worker")
-    this_actor.info(f"{client_id} started")
-    async_job_results: list[AsyncJobResult] = []
-    for job_idx, job_duration in enumerate(jobs):
-        result_mailbox: Mailbox = Mailbox.by_name(str(uuid4()))
-        job = Job(job_id=f"job-{job_idx}", duration=job_duration, result_mailbox=result_mailbox)
-        out_comm = worker_mailbox.put_init(Job(
-            job_id=f"job-{job_idx}",
-            duration=job_duration,
-            result_mailbox=result_mailbox
-        ), SIMULATED_JOB_SIZE_BYTES)
-        out_comm.detach()
-        result_comm, async_data = result_mailbox.get_async()
-        async_job_results.append(AsyncJobResult(
-            job=job,
-            result_comm=result_comm,
-            async_data=async_data
-        ))
-    this_actor.info(f"awaiting results for all jobs (timeout={wait_timeout}s)")
-    completed_comms = Comm.wait_all_for([entry.result_comm for entry in async_job_results], wait_timeout)
-    logger = this_actor.warning if completed_comms < len(async_job_results) else this_actor.info
-    logger(f"received {completed_comms}/{len(async_job_results)} results")
-    for result in async_job_results:
-        this_actor.info(f"{result.job.job_id}"
-                        f" status={result.status}"
-                        f" result_payload={result.async_data.get() if result.complete else ''}")
-
-
-def main():
-    settings = create_parser().parse_known_args()[0]
-    e = Engine(sys.argv)
-    e.load_platform(settings.platform)
-    Actor.create("client", Host.by_name("Tremblay"), client, "client", settings.jobs, settings.wait_timeout)
-    for worker_idx in range(settings.workers):
-        Actor.create("worker", Host.by_name("Ruby"), worker, f"worker-{worker_idx}").daemonize()
-    e.run()
-
-
-if __name__ == "__main__":
-    main()
diff --git a/examples/python/comm-waitallfor/comm-waitallfor.tesh b/examples/python/comm-waitallfor/comm-waitallfor.tesh
deleted file mode 100644 (file)
index 50cc88c..0000000
+++ /dev/null
@@ -1,95 +0,0 @@
-#!/usr/bin/env tesh
-
-p Testing Comm.wait_all_for()
-
-$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitallfor.py --platform ${platfdir}/small_platform_fatpipe.xml --workers 1 --wait-timeout 1 --jobs 1,2,3,4,5 "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
->[  0.000000] (2:worker@Ruby) worker-0 started
->[  0.000000] (1:client@Tremblay) client started
->[  0.000000] (1:client@Tremblay) awaiting results for all jobs (timeout=1.0s)
->[  0.001954] (2:worker@Ruby) worker-0 working on job-0 (will take 1.0s to complete)
->[  1.000000] (1:client@Tremblay) received 0/5 results
->[  1.000000] (1:client@Tremblay) job-0 status=pending result_payload=
->[  1.000000] (1:client@Tremblay) job-1 status=pending result_payload=
->[  1.000000] (1:client@Tremblay) job-2 status=pending result_payload=
->[  1.000000] (1:client@Tremblay) job-3 status=pending result_payload=
->[  1.000000] (1:client@Tremblay) job-4 status=pending result_payload=
-
-$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitallfor.py --platform ${platfdir}/small_platform_fatpipe.xml --workers 1 --wait-timeout 5 --jobs 1,2,3,4,5 "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
->[  0.000000] (2:worker@Ruby) worker-0 started
->[  0.000000] (1:client@Tremblay) client started
->[  0.000000] (1:client@Tremblay) awaiting results for all jobs (timeout=5.0s)
->[  0.001954] (2:worker@Ruby) worker-0 working on job-0 (will take 1.0s to complete)
->[  1.008029] (2:worker@Ruby) worker-0 working on job-1 (will take 2.0s to complete)
->[  3.014105] (2:worker@Ruby) worker-0 working on job-2 (will take 3.0s to complete)
->[  5.000000] (1:client@Tremblay) received 2/5 results
->[  5.000000] (1:client@Tremblay) job-0 status=complete result_payload=worker-0
->[  5.000000] (1:client@Tremblay) job-1 status=complete result_payload=worker-0
->[  5.000000] (1:client@Tremblay) job-2 status=pending result_payload=
->[  5.000000] (1:client@Tremblay) job-3 status=pending result_payload=
->[  5.000000] (1:client@Tremblay) job-4 status=pending result_payload=
-
-$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitallfor.py --platform ${platfdir}/small_platform_fatpipe.xml --workers 1 --wait-timeout -1 --jobs 1,2,3,4,5 "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
->[  0.000000] (2:worker@Ruby) worker-0 started
->[  0.000000] (1:client@Tremblay) client started
->[  0.000000] (1:client@Tremblay) awaiting results for all jobs (timeout=-1.0s)
->[  0.001954] (2:worker@Ruby) worker-0 working on job-0 (will take 1.0s to complete)
->[  1.008029] (2:worker@Ruby) worker-0 working on job-1 (will take 2.0s to complete)
->[  3.014105] (2:worker@Ruby) worker-0 working on job-2 (will take 3.0s to complete)
->[  6.020181] (2:worker@Ruby) worker-0 working on job-3 (will take 4.0s to complete)
->[ 10.026257] (2:worker@Ruby) worker-0 working on job-4 (will take 5.0s to complete)
->[ 15.030379] (1:client@Tremblay) received 5/5 results
->[ 15.030379] (1:client@Tremblay) job-0 status=complete result_payload=worker-0
->[ 15.030379] (1:client@Tremblay) job-1 status=complete result_payload=worker-0
->[ 15.030379] (1:client@Tremblay) job-2 status=complete result_payload=worker-0
->[ 15.030379] (1:client@Tremblay) job-3 status=complete result_payload=worker-0
->[ 15.030379] (1:client@Tremblay) job-4 status=complete result_payload=worker-0
-
-$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitallfor.py --platform ${platfdir}/small_platform_fatpipe.xml --workers 5 --wait-timeout 3 --jobs 1,2,3,4,5 "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
->[  0.000000] (2:worker@Ruby) worker-0 started
->[  0.000000] (3:worker@Ruby) worker-1 started
->[  0.000000] (4:worker@Ruby) worker-2 started
->[  0.000000] (5:worker@Ruby) worker-3 started
->[  0.000000] (6:worker@Ruby) worker-4 started
->[  0.000000] (1:client@Tremblay) client started
->[  0.000000] (1:client@Tremblay) awaiting results for all jobs (timeout=3.0s)
->[  0.001954] (6:worker@Ruby) worker-4 working on job-4 (will take 5.0s to complete)
->[  0.001954] (5:worker@Ruby) worker-3 working on job-3 (will take 4.0s to complete)
->[  0.001954] (4:worker@Ruby) worker-2 working on job-2 (will take 3.0s to complete)
->[  0.001954] (3:worker@Ruby) worker-1 working on job-1 (will take 2.0s to complete)
->[  0.001954] (2:worker@Ruby) worker-0 working on job-0 (will take 1.0s to complete)
->[  3.000000] (1:client@Tremblay) received 2/5 results
->[  3.000000] (1:client@Tremblay) job-0 status=complete result_payload=worker-0
->[  3.000000] (1:client@Tremblay) job-1 status=complete result_payload=worker-1
->[  3.000000] (1:client@Tremblay) job-2 status=pending result_payload=
->[  3.000000] (1:client@Tremblay) job-3 status=pending result_payload=
->[  3.000000] (1:client@Tremblay) job-4 status=pending result_payload=
-
-$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitallfor.py --platform ${platfdir}/small_platform_fatpipe.xml --workers 5 --wait-timeout -1 --jobs 5,10,5,20,5,40,5,80,5,160 "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
->[  0.000000] (2:worker@Ruby) worker-0 started
->[  0.000000] (3:worker@Ruby) worker-1 started
->[  0.000000] (4:worker@Ruby) worker-2 started
->[  0.000000] (5:worker@Ruby) worker-3 started
->[  0.000000] (6:worker@Ruby) worker-4 started
->[  0.000000] (1:client@Tremblay) client started
->[  0.000000] (1:client@Tremblay) awaiting results for all jobs (timeout=-1.0s)
->[  0.001954] (6:worker@Ruby) worker-4 working on job-4 (will take 5.0s to complete)
->[  0.001954] (5:worker@Ruby) worker-3 working on job-3 (will take 20.0s to complete)
->[  0.001954] (4:worker@Ruby) worker-2 working on job-2 (will take 5.0s to complete)
->[  0.001954] (3:worker@Ruby) worker-1 working on job-1 (will take 10.0s to complete)
->[  0.001954] (2:worker@Ruby) worker-0 working on job-0 (will take 5.0s to complete)
->[  5.008029] (2:worker@Ruby) worker-0 working on job-7 (will take 80.0s to complete)
->[  5.008029] (4:worker@Ruby) worker-2 working on job-6 (will take 5.0s to complete)
->[  5.008029] (6:worker@Ruby) worker-4 working on job-5 (will take 40.0s to complete)
->[ 10.008029] (3:worker@Ruby) worker-1 working on job-8 (will take 5.0s to complete)
->[ 10.014105] (4:worker@Ruby) worker-2 working on job-9 (will take 160.0s to complete)
->[170.018227] (1:client@Tremblay) received 10/10 results
->[170.018227] (1:client@Tremblay) job-0 status=complete result_payload=worker-0
->[170.018227] (1:client@Tremblay) job-1 status=complete result_payload=worker-1
->[170.018227] (1:client@Tremblay) job-2 status=complete result_payload=worker-2
->[170.018227] (1:client@Tremblay) job-3 status=complete result_payload=worker-3
->[170.018227] (1:client@Tremblay) job-4 status=complete result_payload=worker-4
->[170.018227] (1:client@Tremblay) job-5 status=complete result_payload=worker-4
->[170.018227] (1:client@Tremblay) job-6 status=complete result_payload=worker-2
->[170.018227] (1:client@Tremblay) job-7 status=complete result_payload=worker-0
->[170.018227] (1:client@Tremblay) job-8 status=complete result_payload=worker-1
->[170.018227] (1:client@Tremblay) job-9 status=complete result_payload=worker-2
diff --git a/examples/python/comm-waitany/comm-waitany.py b/examples/python/comm-waitany/comm-waitany.py
deleted file mode 100644 (file)
index 76dd8ce..0000000
+++ /dev/null
@@ -1,87 +0,0 @@
-# Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.
-#
-# This program is free software; you can redistribute it and/or modify it
-# under the terms of the license (GNU LGPL) which comes with this package.
-
-"""
-This example shows how to block on the completion of a set of communications.
-
-As for the other asynchronous examples, the sender initiate all the messages it wants to send and
-pack the resulting simgrid.Comm objects in a list. All messages thus occur concurrently.
-
-The sender then loops until there is no ongoing communication. Using wait_any() ensures that the sender
-will notice events as soon as they occur even if it does not follow the order of the container.
-
-Here, finalize messages will terminate earlier because their size is 0, so they travel faster than the
-other messages of this application.  As expected, the trace shows that the finalize of worker 1 is
-processed before 'Message 5' that is sent to worker 0.
-"""
-
-import sys
-from simgrid import Actor, Comm, Engine, Host, Mailbox, this_actor
-
-
-def sender(messages_count, msg_size, receivers_count):
-    # List in which we store all ongoing communications
-    pending_comms = []
-
-    # Vector of the used mailboxes
-    mboxes = [Mailbox.by_name("receiver-{:d}".format(i))
-              for i in range(0, receivers_count)]
-
-    # Start dispatching all messages to receivers, in a round robin fashion
-    for i in range(0, messages_count):
-        content = "Message {:d}".format(i)
-        mbox = mboxes[i % receivers_count]
-
-        this_actor.info("Send '{:s}' to '{:s}'".format(content, str(mbox)))
-
-        # Create a communication representing the ongoing communication, and store it in pending_comms
-        comm = mbox.put_async(content, msg_size)
-        pending_comms.append(comm)
-
-    # Start sending messages to let the workers know that they should stop
-    for i in range(0, receivers_count):
-        mbox = mboxes[i]
-        this_actor.info("Send 'finalize' to '{:s}'".format(str(mbox)))
-        comm = mbox.put_async("finalize", 0)
-        pending_comms.append(comm)
-
-    this_actor.info("Done dispatching all messages")
-
-    # Now that all message exchanges were initiated, wait for their completion, in order of completion.
-    #
-    # This loop waits for first terminating message with wait_any() and remove it with del, until all comms are
-    # terminated.
-    # Even in this simple example, the pending comms do not terminate in the exact same order of creation.
-    while pending_comms:
-        changed_pos = Comm.wait_any(pending_comms)
-        del pending_comms[changed_pos]
-        if changed_pos != 0:
-            this_actor.info(
-                "Remove the {:d}th pending comm: it terminated earlier than another comm that was initiated first."
-                .format(changed_pos))
-
-    this_actor.info("Goodbye now!")
-
-
-def receiver(my_id):
-    mbox = Mailbox.by_name("receiver-{:d}".format(my_id))
-    this_actor.info("Wait for my first message")
-    while True:
-        received = mbox.get()
-        this_actor.info("I got a '{:s}'.".format(received))
-        if received == "finalize":
-            break  # If it's a finalize message, we're done.
-
-if __name__ == '__main__':
-    e = Engine(sys.argv)
-
-    # Load the platform description
-    e.load_platform(sys.argv[1])
-
-    Actor.create("sender", Host.by_name("Tremblay"), sender, 6, 1000000, 2)
-    Actor.create("receiver", Host.by_name("Fafard"), receiver, 0)
-    Actor.create("receiver", Host.by_name("Jupiter"), receiver, 1)
-
-    e.run()
diff --git a/examples/python/comm-waitany/comm-waitany.tesh b/examples/python/comm-waitany/comm-waitany.tesh
deleted file mode 100644 (file)
index d4593c1..0000000
+++ /dev/null
@@ -1,27 +0,0 @@
-#!/usr/bin/env tesh
-
-p Testing Comm.wait_any()
-
-! output sort 19
-$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitany.py ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
-> [  0.000000] (1:sender@Tremblay) Send 'Message 0' to 'Mailbox(receiver-0)'
-> [  0.000000] (2:receiver@Fafard) Wait for my first message
-> [  0.000000] (3:receiver@Jupiter) Wait for my first message
-> [  0.000000] (1:sender@Tremblay) Send 'Message 1' to 'Mailbox(receiver-1)'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 2' to 'Mailbox(receiver-0)'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 3' to 'Mailbox(receiver-1)'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 4' to 'Mailbox(receiver-0)'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 5' to 'Mailbox(receiver-1)'
-> [  0.000000] (1:sender@Tremblay) Send 'finalize' to 'Mailbox(receiver-0)'
-> [  0.000000] (1:sender@Tremblay) Send 'finalize' to 'Mailbox(receiver-1)'
-> [  0.000000] (1:sender@Tremblay) Done dispatching all messages
-> [  0.158397] (2:receiver@Fafard) I got a 'Message 0'.
-> [  0.169155] (3:receiver@Jupiter) I got a 'Message 1'.
-> [  0.316794] (2:receiver@Fafard) I got a 'Message 2'.
-> [  0.338309] (3:receiver@Jupiter) I got a 'Message 3'.
-> [  0.475190] (2:receiver@Fafard) I got a 'Message 4'.
-> [  0.500898] (2:receiver@Fafard) I got a 'finalize'.
-> [  0.500898] (1:sender@Tremblay) Remove the 1th pending comm: it terminated earlier than another comm that was initiated first.
-> [  0.507464] (3:receiver@Jupiter) I got a 'Message 5'.
-> [  0.526478] (3:receiver@Jupiter) I got a 'finalize'.
-> [  0.526478] (1:sender@Tremblay) Goodbye now!
index fc957d8..40df779 100644 (file)
@@ -9,7 +9,7 @@ This example shows how to simulate a non-linear resource sharing for network lin
 
 import functools
 import sys
-from simgrid import Actor, Engine, Comm, Mailbox, NetZone, Link, LinkInRoute, this_actor
+from simgrid import Actor, ActivitySet, Engine, Comm, Mailbox, NetZone, Link, LinkInRoute, this_actor
 
 class Sender:
     """
@@ -22,7 +22,7 @@ class Sender:
     # Actors that are created as object will execute their __call__ method.
     # So, the following constitutes the main function of the Sender actor.
     def __call__(self):
-        pending_comms = []
+        pending_comms = ActivitySet()
         mbox = Mailbox.by_name("receiver")
 
         for i in range(self.msg_count):
@@ -30,12 +30,12 @@ class Sender:
             size = self.msg_size * (i + 1)
             this_actor.info("Send '%s' to '%s, msg size: %d'" % (msg, mbox.name, size))
             comm = mbox.put_async(msg, size)
-            pending_comms.append(comm)
+            pending_comms.push(comm)
 
         this_actor.info("Done dispatching all messages")
 
         # Now that all message exchanges were initiated, wait for their completion in one single call
-        Comm.wait_all(pending_comms)
+        pending_comms.wait_all()
 
         this_actor.info("Goodbye now!")
 
@@ -50,21 +50,16 @@ class Receiver:
     def __call__(self):
         mbox = Mailbox.by_name("receiver")
 
-        pending_msgs = []
-        pending_comms = []
+        pending_comms = ActivitySet()
 
         this_actor.info("Wait for %d messages asynchronously" % self.msg_count)
         for _ in range(self.msg_count):
-            comm, data = mbox.get_async()
-            pending_comms.append(comm)
-            pending_msgs.append(data)
+            comm = mbox.get_async()
+            pending_comms.push(comm)
 
-        while pending_comms:
-            index = Comm.wait_any(pending_comms)
-            msg = pending_msgs[index].get()
-            this_actor.info("I got '%s'." % msg)
-            del pending_comms[index]
-            del pending_msgs[index]
+        while not pending_comms.empty():
+            comm = pending_comms.wait_any()
+            this_actor.info("I got '%s'." % comm.get_payload())
 
 ####################################################################################################
 def link_nonlinear(link: Link, capacity: float, n: int) -> float:
@@ -104,8 +99,7 @@ def load_platform():
     link.set_latency(10e-6).seal()
 
     # create routes between nodes
-    zone.add_route(sender.netpoint, receiver.netpoint, None, None,
-                   [LinkInRoute(link, LinkInRoute.Direction.UP)], True)
+    zone.add_route(sender, receiver, [link])
     zone.seal()
 
     # create actors Sender/Receiver
index b50b60c..f221f49 100644 (file)
@@ -6,7 +6,7 @@
 from typing import List, Tuple
 import sys
 
-from simgrid import Engine, Actor, Comm, Host, LinkInRoute, Mailbox, NetZone, this_actor, PyGetAsync
+from simgrid import Engine, Actor, ActivitySet, Comm, Host, LinkInRoute, Mailbox, NetZone, this_actor
 
 
 RECEIVER_MAILBOX_NAME = "receiver"
@@ -19,7 +19,7 @@ class Sender(object):
 
     def __call__(self) -> None:
         # List in which we store all ongoing communications
-        pending_comms: List[Comm] = []
+        pending_comms = ActivitySet()
 
         # Make a vector of the mailboxes to use
         receiver_mailbox: Mailbox = Mailbox.by_name(RECEIVER_MAILBOX_NAME)
@@ -27,12 +27,12 @@ class Sender(object):
             message_content = f"Message {i}"
             this_actor.info(f"Send '{message_content}' to '{receiver_mailbox.name}'")
             # Create a communication representing the ongoing communication, and store it in pending_comms
-            pending_comms.append(receiver_mailbox.put_async(message_content, self.message_size))
+            pending_comms.push(receiver_mailbox.put_async(message_content, self.message_size))
 
         this_actor.info("Done dispatching all messages")
 
         # Now that all message exchanges were initiated, wait for their completion in one single call
-        Comm.wait_all(pending_comms)
+        pending_comms.wait_all()
 
         this_actor.info("Goodbye now!")
 
@@ -44,15 +44,13 @@ class Receiver(object):
 
     def __call__(self):
         # List in which we store all incoming msgs
-        pending_comms: List[Tuple[Comm, PyGetAsync]] = []
+        pending_comms = ActivitySet()
         this_actor.info(f"Wait for {self.messages_count} messages asynchronously")
         for _ in range(self.messages_count):
-            pending_comms.append(self.mailbox.get_async())
-        while pending_comms:
-            index = Comm.wait_any([comm for (comm, _) in pending_comms])
-            _, async_data = pending_comms[index]
-            this_actor.info(f"I got '{async_data.get()}'.")
-            pending_comms.pop(index)
+            pending_comms.push(self.mailbox.get_async())
+        while not pending_comms.empty():
+            comm = pending_comms.wait_any()
+            this_actor.info(f"I got '{comm.get_payload()}'.")
 
 
 def main():
@@ -70,14 +68,7 @@ def main():
     link = zone.create_split_duplex_link("link1", 10e9).set_latency(10e-6).set_concurrency_limit(2).seal()
 
     # create routes between nodes
-    zone.add_route(
-        sender_host.netpoint,
-        receiver_host.netpoint,
-        None,
-        None,
-        [LinkInRoute(link, LinkInRoute.UP)],
-        True
-    )
+    zone.add_route(sender_host, receiver_host, [link])
     zone.seal()
 
     # create actors Sender/Receiver
index 9fce743..71d3ac8 100644 (file)
@@ -48,7 +48,7 @@ far: {host.computed_flops:.0E}, average load as reported by the HostLoad plugin:
   this_actor.info(f'Run an activity of {100E6:.0E} flops')
   this_actor.execute(100E6)
   this_actor.info(f'Done working on my activity; this took {Engine.clock - start}s; current peak speed: {host.speed:.0E} flop/s; number of flops computed so far: {host.computed_flops:.0E}')
-  Engine
+
   start = Engine.clock
   this_actor.info("========= Requesting a reset of the computation and load counters")
   host.reset_load()
index b2904c3..5be8922 100644 (file)
@@ -4,12 +4,12 @@
 # under the terms of the license (GNU LGPL) which comes with this package.
 
 """
-/* This example demonstrates how to dynamically modify a graph of tasks.
- *
- * Assuming we have two instances of a service placed on different hosts,
- * we want to send data alternatively to thoses instances.
- *
- * We consider the following graph:
+This example demonstrates how to dynamically modify a graph of tasks.
+
+Assuming we have two instances of a service placed on different hosts,
+we want to send data alternately to those instances.
+
+We consider the following graph:
 
            comm1
      ┌────────────────────────┐
@@ -26,8 +26,8 @@
      │                        │
      └────────────────────────┘
            comm2
- */
- """
+
+"""
 
 from argparse import ArgumentParser
 import sys
index 9688bed..470836b 100644 (file)
@@ -47,8 +47,7 @@ void load_platform(const sg4::Engine& /*e*/)
 
   const sg4::Link* link9 = root->create_split_duplex_link("9", "7.20975MBps")->set_latency("1.461517ms")->seal();
 
-  root->add_route(tremblay->get_netpoint(), jupiter->get_netpoint(), nullptr, nullptr,
-                  {{link9, sg4::LinkInRoute::Direction::UP}}, true);
+  root->add_route(tremblay, jupiter, {link9});
   root->seal();
 
   /* set cost callback for MPI_Send and MPI_Recv */
index 0539a00..d81d9fa 100644 (file)
@@ -127,7 +127,10 @@ int main(int argc, char* argv[])
     SMPI_app_instance_start("alltoall_mpi", alltoall_mpi,
                             {e.host_by_name_or_null("Ginette"), e.host_by_name_or_null("Bourassa"),
                              e.host_by_name_or_null("Jupiter"), e.host_by_name_or_null("Fafard")});
+    SMPI_app_instance_join("alltoall_mpi");
+    XBT_INFO("This other alltoall_mpi instance terminated.");
   });
+
   e.run();
 
   XBT_INFO("Simulation time %g", simgrid::s4u::Engine::get_clock());
index 00d5dc8..ba6efdd 100644 (file)
@@ -50,4 +50,5 @@ $ ./masterworker_mailbox_smpi ${srcdir:=.}/../../platforms/small_platform_with_r
 > [Ginette:alltoall_mpi#0:(11) 10.036773] [smpi_masterworkers/INFO] after alltoall 0
 > [Bourassa:alltoall_mpi#1:(12) 10.046578] [smpi_masterworkers/INFO] after alltoall 1
 > [Fafard:alltoall_mpi#3:(14) 10.046865] [smpi_masterworkers/INFO] after alltoall 3
-> [Jupiter:alltoall_mpi#2:(13) 10.046865] [smpi_masterworkers/INFO] after alltoall 2
\ No newline at end of file
+> [Jupiter:alltoall_mpi#2:(13) 10.046865] [smpi_masterworkers/INFO] after alltoall 2
+> [Ginette:launcher:(10) 10.046865] [smpi_masterworkers/INFO] This other alltoall_mpi instance terminated.
\ No newline at end of file
diff --git a/include/simgrid/activity_set.h b/include/simgrid/activity_set.h
new file mode 100644 (file)
index 0000000..9028407
--- /dev/null
@@ -0,0 +1,31 @@
+/* Copyright (c) 2018-2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef INCLUDE_SIMGRID_ACTIVITY_SET_H
+#define INCLUDE_SIMGRID_ACTIVITY_SET_H
+
+#include <simgrid/forward.h>
+#include <sys/types.h> /* ssize_t */
+
+/* C interface */
+SG_BEGIN_DECL
+
+XBT_PUBLIC sg_activity_set_t sg_activity_set_init();
+XBT_PUBLIC void sg_activity_set_push(sg_activity_set_t as, sg_activity_t acti);
+XBT_PUBLIC void sg_activity_set_erase(sg_activity_set_t as, sg_activity_t acti);
+XBT_PUBLIC size_t sg_activity_set_size(sg_activity_set_t as);
+XBT_PUBLIC int sg_activity_set_empty(sg_activity_set_t as);
+
+XBT_PUBLIC sg_activity_t sg_activity_set_test_any(sg_activity_set_t as);
+XBT_PUBLIC void sg_activity_set_wait_all(sg_activity_set_t as);
+/** Returns true if it terminated successfully (or false on timeout) */
+XBT_PUBLIC int sg_activity_set_wait_all_for(sg_activity_set_t as, double timeout);
+XBT_PUBLIC sg_activity_t sg_activity_set_wait_any(sg_activity_set_t as);
+XBT_PUBLIC sg_activity_t sg_activity_set_wait_any_for(sg_activity_set_t as, double timeout);
+XBT_PUBLIC void sg_activity_set_delete(sg_activity_set_t as);
+
+SG_END_DECL
+
+#endif /* INCLUDE_SIMGRID_ACTIVITY_SET_H */
index ee5f5e7..538122f 100644 (file)
 /* C interface */
 SG_BEGIN_DECL
 
+XBT_PUBLIC int sg_comm_isinstance(sg_activity_t acti);
+
 XBT_PUBLIC void sg_comm_detach(sg_comm_t comm, void (*clean_function)(void*));
 XBT_PUBLIC int sg_comm_test(sg_comm_t comm);
 XBT_PUBLIC sg_error_t sg_comm_wait(sg_comm_t comm);
 XBT_PUBLIC sg_error_t sg_comm_wait_for(sg_comm_t comm, double timeout);
-XBT_PUBLIC void sg_comm_wait_all(sg_comm_t* comms, size_t count);
-XBT_PUBLIC size_t sg_comm_wait_all_for(sg_comm_t* comms, size_t count, double timeout);
-XBT_PUBLIC ssize_t sg_comm_wait_any_for(sg_comm_t* comms, size_t count, double timeout);
-XBT_PUBLIC ssize_t sg_comm_wait_any(sg_comm_t* comms, size_t count);
 XBT_PUBLIC void sg_comm_unref(sg_comm_t comm);
 
+#ifndef DOXYGEN
+XBT_ATTRIB_DEPRECATED_v339("Please use sg_activity_set_t instead") XBT_PUBLIC
+    void sg_comm_wait_all(sg_comm_t* comms, size_t count);
+XBT_ATTRIB_DEPRECATED_v339("Please use sg_activity_set_t instead") XBT_PUBLIC
+    ssize_t sg_comm_wait_any_for(sg_comm_t* comms, size_t count, double timeout);
+XBT_ATTRIB_DEPRECATED_v339("Please use sg_activity_set_t instead") XBT_PUBLIC
+    ssize_t sg_comm_wait_any(sg_comm_t* comms, size_t count);
+#endif
+
 SG_END_DECL
 
 #endif /* INCLUDE_SIMGRID_COMM_H_ */
index 42e413b..565684c 100644 (file)
@@ -12,6 +12,8 @@
 /* C interface */
 SG_BEGIN_DECL
 
+XBT_PUBLIC int sg_exec_isinstance(sg_activity_t acti);
+
 XBT_PUBLIC void sg_exec_set_bound(sg_exec_t exec, double bound);
 XBT_PUBLIC const char* sg_exec_get_name(const_sg_exec_t exec);
 XBT_PUBLIC void sg_exec_set_name(sg_exec_t exec, const char* name);
@@ -24,8 +26,11 @@ XBT_PUBLIC void sg_exec_cancel(sg_exec_t exec);
 XBT_PUBLIC int sg_exec_test(sg_exec_t exec);
 XBT_PUBLIC sg_error_t sg_exec_wait(sg_exec_t exec);
 XBT_PUBLIC sg_error_t sg_exec_wait_for(sg_exec_t exec, double timeout);
-XBT_PUBLIC ssize_t sg_exec_wait_any_for(sg_exec_t* execs, size_t count, double timeout);
-XBT_PUBLIC ssize_t sg_exec_wait_any(sg_exec_t* execs, size_t count);
+
+XBT_ATTRIB_DEPRECATED_v339("Please use sg_activity_set_t instead") XBT_PUBLIC ssize_t
+    sg_exec_wait_any_for(sg_exec_t* execs, size_t count, double timeout);
+XBT_ATTRIB_DEPRECATED_v339("Please use sg_activity_set_t instead") XBT_PUBLIC ssize_t
+    sg_exec_wait_any(sg_exec_t* execs, size_t count);
 
 SG_END_DECL
 
index 6c72682..85fe6a7 100644 (file)
@@ -221,6 +221,8 @@ class RemoteApp;
 } // namespace simgrid
 
 using s4u_Actor             = simgrid::s4u::Actor;
+using s4u_Activity          = simgrid::s4u::Activity;
+using s4u_ActivitySet       = simgrid::s4u::ActivitySet;
 using s4u_Barrier           = simgrid::s4u::Barrier;
 using s4u_Comm              = simgrid::s4u::Comm;
 using s4u_Exec              = simgrid::s4u::Exec;
@@ -239,6 +241,8 @@ using smx_activity_t = simgrid::kernel::activity::ActivityImpl*;
 #else
 
 typedef struct s4u_Actor s4u_Actor;
+typedef struct s4u_Activity s4u_Activity;
+typedef struct s4u_ActivitySet s4u_ActivitySet;
 typedef struct s4u_Barrier s4u_Barrier;
 typedef struct s4u_Comm s4u_Comm;
 typedef struct s4u_Exec s4u_Exec;
@@ -289,6 +293,16 @@ typedef s4u_Actor* sg_actor_t;
 /** Pointer to a constant actor object */
 typedef const s4u_Actor* const_sg_actor_t;
 
+/** Pointer to an activity object */
+typedef s4u_Activity* sg_activity_t;
+/** Pointer to a constant activity object */
+typedef const s4u_Activity* const_sg_activity_t;
+
+/** Pointer to an activity set object */
+typedef s4u_ActivitySet* sg_activity_set_t;
+/** Pointer to a constant activity set object */
+typedef const s4u_ActivitySet* const_sg_activity_set_t;
+
 /** @ingroup m_datatypes_management_details
  * @brief Type for any SimGrid size
  */
diff --git a/include/simgrid/plugins/jbod.hpp b/include/simgrid/plugins/jbod.hpp
new file mode 100644 (file)
index 0000000..5c84e91
--- /dev/null
@@ -0,0 +1,78 @@
+/* Copyright (c) 2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef SIMGRID_PLUGIN_JBOD_HPP
+#define SIMGRID_PLUGIN_JBOD_HPP
+#include <simgrid/s4u/Host.hpp>
+#include <simgrid/s4u/Io.hpp>
+
+namespace simgrid::plugin {
+
+class JbodIo;
+/** Smart pointer to a simgrid::plugin::JbodIo */
+using JbodIoPtr = boost::intrusive_ptr<JbodIo>;
+XBT_PUBLIC void intrusive_ptr_release(const JbodIo* io);
+XBT_PUBLIC void intrusive_ptr_add_ref(const JbodIo* io);
+
+class Jbod : public s4u::Host {
+public:
+  enum class RAID {RAID0 = 0, RAID1 = 1, RAID4 = 4 , RAID5 = 5, RAID6 = 6};
+  int get_parity_disk_idx() { return parity_disk_idx_; }
+  /** Move the parity disk one index down, wrapping from 0 to num_disks_ - 1 (num_disks_ is added first because the index is unsigned) */
+  void update_parity_disk_idx() { parity_disk_idx_ = (parity_disk_idx_ + num_disks_ - 1) % num_disks_; }
+
+  int get_next_read_disk_idx() { return (++read_disk_idx_) % num_disks_; }
+
+  JbodIoPtr read_async(sg_size_t size);
+  sg_size_t read(sg_size_t size);
+
+  JbodIoPtr write_async(sg_size_t size);
+  sg_size_t write(sg_size_t size);
+
+  static Jbod* create_jbod(s4u::NetZone* zone, const std::string& name, double speed, unsigned int num_disks,
+                           RAID raid_level, double read_bandwidth, double write_bandwidth);
+
+protected:
+  void set_num_disks(unsigned int num_disks) { num_disks_ = num_disks; }
+  void set_parity_disk_idx(unsigned int index) { parity_disk_idx_ = index; }
+  void set_read_disk_idx(int index) { read_disk_idx_ = index; }
+  void set_raid_level(RAID raid_level) { raid_level_ = raid_level; }
+
+private:
+  unsigned int num_disks_;
+  RAID raid_level_;
+  unsigned int parity_disk_idx_;
+  int read_disk_idx_;
+};
+
+class JbodIo {
+  const Jbod* jbod_;
+  s4u::CommPtr transfer_;
+  s4u::ExecPtr parity_block_comp_;
+  std::vector<s4u::IoPtr> pending_ios_;
+  s4u::Io::OpType type_;
+  std::atomic_int_fast32_t refcount_{0};
+public:
+
+  explicit JbodIo(const Jbod* jbod, const s4u::CommPtr transfer, const s4u::ExecPtr parity_block_comp,
+                  const std::vector<s4u::IoPtr>& pending_ios, s4u::Io::OpType type)
+    : jbod_(jbod), transfer_(transfer), parity_block_comp_(parity_block_comp), pending_ios_(pending_ios), type_(type)
+    {}
+
+  void wait();
+
+#ifndef DOXYGEN
+  friend void intrusive_ptr_release(JbodIo* io)
+  {
+    if (io->refcount_.fetch_sub(1, std::memory_order_release) == 1) {
+      std::atomic_thread_fence(std::memory_order_acquire);
+      delete io;
+    }
+  }
+  friend void intrusive_ptr_add_ref(JbodIo* io) { io->refcount_.fetch_add(1, std::memory_order_relaxed); }
+#endif
+};
+
+} // namespace simgrid::plugin
+#endif
\ No newline at end of file
index 9973a00..ef51321 100644 (file)
@@ -158,8 +158,6 @@ public:
   virtual Activity* do_start() = 0;
   /** Tests whether the given activity is terminated yet. */
   virtual bool test();
-  /*! take a vector s4u::ActivityPtr and return the rank of the first finished one (or -1 if none is done). */
-  static ssize_t test_any(const std::vector<ActivityPtr>& activities);
 
   /** Blocks the current actor until the activity is terminated */
   Activity* wait() { return wait_for(-1.0); }
@@ -169,11 +167,6 @@ public:
   /** Blocks the current actor until the activity is terminated, or until the time limit is reached\n
    * Raises: timeout exception. */
   void wait_until(double time_limit);
-  /*! take a vector of s4u::ActivityPtr and return when one of them is finished.
-   * The return value is the rank of the first finished ActivityPtr. */
-  static ssize_t wait_any(const std::vector<ActivityPtr>& activities) { return wait_any_for(activities, -1); }
-  /*! Same as wait_any, but with a timeout. If the timeout occurs, parameter last is returned.*/
-  static ssize_t wait_any_for(const std::vector<ActivityPtr>& activities, double timeout);
 
   /** Cancel that activity */
   Activity* cancel();
@@ -205,6 +198,9 @@ public:
   kernel::activity::ActivityImpl* get_impl() const { return pimpl_.get(); }
 
 #ifndef DOXYGEN
+  static ssize_t deprecated_wait_any_for(const std::vector<ActivityPtr>& activities, double timeout); // XBT_ATTRIB_DEPRECATED_v339
+  XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t test_any(const std::vector<ActivityPtr>& activities);
+
   friend void intrusive_ptr_release(Activity* a)
   {
     if (a->refcount_.fetch_sub(1, std::memory_order_release) == 1) {
@@ -283,16 +279,13 @@ public:
    *  dependency or no resource assigned) */
   void on_this_veto_cb(const std::function<void(AnyActivity&)>& cb) { on_this_veto.connect(cb); }
 
-  XBT_ATTRIB_DEPRECATED_v338("Please use on_suspend_cb() instead") static void on_suspended_cb(
-      const std::function<void(Activity const&)>& cb)
-  {
-    on_suspend.connect(cb);
-  }
-  XBT_ATTRIB_DEPRECATED_v338("Please use on_resume_cb() instead") static void on_resumed_cb(
-      const std::function<void(Activity const&)>& cb)
-  {
-    on_resume.connect(cb);
-  }
+#ifndef DOXYGEN
+  XBT_ATTRIB_DEPRECATED_v338("Please use on_suspend_cb() instead") static void on_suspended_cb(const std::function<void(Activity const&)>& cb) { on_suspend.connect(cb); }
+  XBT_ATTRIB_DEPRECATED_v338("Please use on_resume_cb() instead") static void on_resumed_cb(const std::function<void(Activity const&)>& cb) { on_resume.connect(cb); }
+
+  XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t wait_any(const std::vector<ActivityPtr>& activities) { return deprecated_wait_any_for(activities, -1); }
+  XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t wait_any_for(const std::vector<ActivityPtr>& activities, double timeout) { return deprecated_wait_any_for(activities, timeout); }
+#endif
 
   AnyActivity* add_successor(ActivityPtr a)
   {
index 172bb36..49c79d6 100644 (file)
 
 #include <vector>
 
-namespace simgrid::s4u {
+namespace simgrid {
+
+extern template class XBT_PUBLIC xbt::Extendable<s4u::ActivitySet>;
+
+namespace s4u {
 /** @brief ActivitiesSet
  *
  * This class is a container of activities, allowing to wait for the completion of any or all activities in the set.
@@ -19,10 +23,12 @@ namespace simgrid::s4u {
  * activities.
  */
 class XBT_PUBLIC ActivitySet : public xbt::Extendable<ActivitySet> {
-  std::vector<ActivityPtr>
-      activities_; // We use a vector instead of a set to improve reproductibility accross architectures
+  std::atomic_int_fast32_t refcount_{1};
+  std::vector<ActivityPtr> activities_; // Use vectors, not sets, for better reproducibility across architectures
   std::vector<ActivityPtr> failed_activities_;
 
+  void handle_failed_activities();
+
 public:
   ActivitySet()  = default;
   ActivitySet(const std::vector<ActivityPtr> init) : activities_(init) {}
@@ -74,8 +80,22 @@ public:
 
   ActivityPtr get_failed_activity();
   bool has_failed_activities() { return not failed_activities_.empty(); }
+
+  // boost::intrusive_ptr<ActivitySet> support:
+  friend void intrusive_ptr_add_ref(ActivitySet* as)
+  {
+    XBT_ATTRIB_UNUSED auto previous = as->refcount_.fetch_add(1);
+    xbt_assert(previous != 0);
+  }
+
+  friend void intrusive_ptr_release(ActivitySet* as)
+  {
+    if (as->refcount_.fetch_sub(1) == 1)
+      delete as;
+  }
 };
 
-}; // namespace simgrid::s4u
+} // namespace s4u
+} // namespace simgrid
 
 #endif
index 3b1f417..cf4cf5a 100644 (file)
@@ -151,6 +151,9 @@ public:
   void* get_dst_data() const { return dst_buff_; }
   /** Retrieve the size of the received data. Not to be mixed with @ref Activity::get_remaining()  */
   size_t get_dst_data_size() const { return dst_buff_size_; }
+  /** Retrieve the payload associated to the communication. You can only do that once the comm is (gracefully)
+   * terminated, and it is only set up by the default copy_data callback (not the SMPI one) */
+  void* get_payload() const;
 
   /* Common functions */
 
@@ -181,20 +184,17 @@ public:
 
   Comm* wait_for(double timeout) override;
 
-  /*! \static take a vector s4u::CommPtr and return the rank of the first finished one (or -1 if none is done). */
-  static ssize_t test_any(const std::vector<CommPtr>& comms);
+#ifndef DOXYGEN
+  XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t wait_any(const std::vector<CommPtr>& comms) { return deprecated_wait_any_for(comms, -1); }
+  XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t wait_any_for(const std::vector<CommPtr>& comms, double timeout) { return deprecated_wait_any_for(comms, timeout); }
 
-  /*! \static take a vector s4u::CommPtr and return when one of them is finished.
-   * The return value is the rank of the first finished CommPtr. */
-  static ssize_t wait_any(const std::vector<CommPtr>& comms) { return wait_any_for(comms, -1); }
-  /*! \static Same as wait_any, but with a timeout. Return -1 if the timeout occurs.*/
-  static ssize_t wait_any_for(const std::vector<CommPtr>& comms, double timeout);
+  static ssize_t deprecated_wait_any_for(const std::vector<CommPtr>& comms, double timeout);
 
-  /*! \static take a vector s4u::CommPtr and return when all of them is finished. */
-  static void wait_all(const std::vector<CommPtr>& comms);
-  /*! \static Same as wait_all, but with a timeout. Return the number of terminated comm (less than comms.size() if
-   *  the timeout occurs). */
-  static size_t wait_all_for(const std::vector<CommPtr>& comms, double timeout);
+  XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t test_any(const std::vector<CommPtr>& comms);
+  XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static void wait_all(const std::vector<CommPtr>& comms);
+  XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static size_t
+      wait_all_for(const std::vector<CommPtr>& comms, double timeout);
+#endif
 };
 } // namespace simgrid::s4u
 
index b8e728a..8167b87 100644 (file)
@@ -50,12 +50,6 @@ public:
   /*! \static Initiate the creation of an Exec. Setters have to be called afterwards */
   static ExecPtr init();
 
-  /*! \static take a vector of s4u::ExecPtr and return when one of them is finished.
-   * The return value is the rank of the first finished ExecPtr. */
-  static ssize_t wait_any(const std::vector<ExecPtr>& execs) { return wait_any_for(execs, -1); }
-  /*! \static Same as wait_any, but with a timeout. If the timeout occurs, parameter last is returned.*/
-  static ssize_t wait_any_for(const std::vector<ExecPtr>& execs, double timeout);
-
   /** @brief On sequential executions, returns the amount of flops that remain to be done; This cannot be used on
    * parallel executions. */
   double get_remaining() const override;
@@ -81,6 +75,15 @@ public:
   double get_cost() const;
   bool is_parallel() const { return parallel_; }
   bool is_assigned() const override;
+
+#ifndef DOXYGEN
+  static ssize_t deprecated_wait_any_for(const std::vector<ExecPtr>& execs, double timeout); // XBT_ATTRIB_DEPRECATED_v339
+
+  XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t
+      wait_any(const std::vector<ExecPtr>& execs) { return deprecated_wait_any_for(execs, -1); }
+  XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t
+      wait_any_for(const std::vector<ExecPtr>& execs, double timeout) { return deprecated_wait_any_for(execs, timeout); }
+#endif
 };
 
 } // namespace simgrid::s4u
index 2dc88c6..6dc9a3f 100644 (file)
@@ -28,16 +28,19 @@ protected:
   explicit Io(kernel::activity::IoImplPtr pimpl);
   Io* do_start() override;
 
+  static ssize_t deprecated_wait_any_for(const std::vector<IoPtr>& ios, double timeout);
+
 public:
   enum class OpType { READ, WRITE };
 
    /*! \static Initiate the creation of an I/O. Setters have to be called afterwards */
   static IoPtr init();
-  /*! \static take a vector of s4u::IoPtr and return when one of them is finished.
-   * The return value is the rank of the first finished IoPtr. */
-  static ssize_t wait_any(const std::vector<IoPtr>& ios) { return wait_any_for(ios, -1); }
-  /*! \static Same as wait_any, but with a timeout. If the timeout occurs, parameter last is returned.*/
-  static ssize_t wait_any_for(const std::vector<IoPtr>& ios, double timeout);
+#ifndef DOXYGEN
+  XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") 
+  static ssize_t wait_any(const std::vector<IoPtr>& ios) { return deprecated_wait_any_for(ios, -1); }
+  XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") 
+  static ssize_t wait_any_for(const std::vector<IoPtr>& ios, double timeout) { return deprecated_wait_any_for(ios, timeout); }
+#endif
 
   double get_remaining() const override;
   sg_size_t get_performed_ioops() const;
index b5723df..b8eddff 100644 (file)
@@ -118,6 +118,9 @@ public:
   CommPtr get_init();
   /** Creates and start an async data reception to that mailbox */
   template <typename T> CommPtr get_async(T** data);
+  /** Creates and start an async data reception to that mailbox. Since the data location is not provided, you'll have to
+   * use Comm::get_payload once the comm terminates */
+  CommPtr get_async();
 
   /** Blocking data reception */
   template <typename T> T* get();
index 97ecfbb..e9b909f 100644 (file)
@@ -81,6 +81,39 @@ public:
    */
   void add_route(kernel::routing::NetPoint* src, kernel::routing::NetPoint* dst, kernel::routing::NetPoint* gw_src,
                  kernel::routing::NetPoint* gw_dst, const std::vector<LinkInRoute>& link_list, bool symmetrical = true);
+  /**
+   * @brief Add a route between 2 netpoints, and the same route in the other direction
+   *
+   * Create a route:
+   * - route between 2 hosts/routers in same netzone, no gateway is needed
+   * - route between 2 netzones, connecting 2 gateways.
+   *
+   * @param src Source netzone's netpoint
+   * @param dst Destination netzone's netpoint
+   * @param gw_src Netpoint of the gateway in the source netzone
+   * @param gw_dst Netpoint of the gateway in the destination netzone
+   * @param links List of links
+   */
+  void add_route(kernel::routing::NetPoint* src, kernel::routing::NetPoint* dst, kernel::routing::NetPoint* gw_src,
+                 kernel::routing::NetPoint* gw_dst, const std::vector<const Link*>& links);
+
+  /**
+   * @brief Add a route between 2 hosts
+   *
+   * @param src Source host
+   * @param dst Destination host
+   * @param link_list List of links and their direction used in this communication
+   * @param symmetrical Bi-directional communication
+   */
+  void add_route(const Host* src, const Host* dst, const std::vector<LinkInRoute>& link_list, bool symmetrical = true);
+  /**
+   * @brief Add a route between 2 hosts
+   *
+   * @param src Source host
+   * @param dst Destination host
+   * @param links List of links. The UP direction will be used on src->dst and DOWN direction on dst->src
+   */
+  void add_route(const Host* src, const Host* dst, const std::vector<const Link*>& links);
 
   void add_bypass_route(kernel::routing::NetPoint* src, kernel::routing::NetPoint* dst,
                         kernel::routing::NetPoint* gw_src, kernel::routing::NetPoint* gw_dst,
index ad3140c..453a6ef 100644 (file)
@@ -1235,6 +1235,7 @@ SG_END_DECL
 #ifdef __cplusplus
 XBT_PUBLIC void SMPI_app_instance_start(const char* name, std::function<void()> const& code,
                                         std::vector<simgrid::s4u::Host*> const& hosts);
+XBT_PUBLIC void SMPI_app_instance_join(const std::string& instance_id);
 
 /* This version without parameter is nice to use with SMPI_app_instance_start() */
 XBT_PUBLIC void MPI_Init();
index 07517ed..792ae4b 100644 (file)
@@ -12,6 +12,7 @@
 #include "simgrid/kernel/routing/NetPoint.hpp"
 #include "simgrid/plugins/load.h"
 #include <simgrid/Exception.hpp>
+#include <simgrid/s4u/ActivitySet.hpp>
 #include <simgrid/s4u/Actor.hpp>
 #include <simgrid/s4u/Barrier.hpp>
 #include <simgrid/s4u/Comm.hpp>
 #include <vector>
 
 namespace py = pybind11;
+using simgrid::s4u::Activity;
+using simgrid::s4u::ActivityPtr;
+using simgrid::s4u::ActivitySet;
+using simgrid::s4u::ActivitySetPtr;
 using simgrid::s4u::Actor;
 using simgrid::s4u::ActorPtr;
 using simgrid::s4u::Barrier;
@@ -71,15 +76,6 @@ std::string get_simgrid_version()
   sg_version_get(&major, &minor, &patch);
   return simgrid::xbt::string_printf("%i.%i.%i", major, minor, patch);
 }
-
-/** @brief Wrap for mailbox::get_async */
-class PyGetAsync {
-  std::unique_ptr<PyObject*> data = std::make_unique<PyObject*>();
-
-public:
-  PyObject** get() const { return data.get(); }
-};
-
 } // namespace
 
 PYBIND11_DECLARE_HOLDER_TYPE(T, boost::intrusive_ptr<T>)
@@ -223,7 +219,8 @@ PYBIND11_MODULE(simgrid, m)
               }
             });
           },
-          "Registers the main function of an actor");
+          "Registers the main function of an actor")
+      .def("set_log_control", [](Engine*, const std::string& settings) { xbt_log_control_set(settings.c_str()); });
 
   /* Class Netzone */
   py::class_<simgrid::s4u::NetZone, std::unique_ptr<simgrid::s4u::NetZone, py::nodelete>> netzone(
@@ -243,6 +240,14 @@ PYBIND11_MODULE(simgrid, m)
                              simgrid::kernel::routing::NetPoint*, simgrid::kernel::routing::NetPoint*,
                              const std::vector<simgrid::s4u::LinkInRoute>&, bool>(&simgrid::s4u::NetZone::add_route),
            "Add a route between 2 netpoints")
+      .def("add_route",
+           py::overload_cast<const simgrid::s4u::Host*, const simgrid::s4u::Host*,
+                             const std::vector<simgrid::s4u::LinkInRoute>&, bool>(&simgrid::s4u::NetZone::add_route),
+           "Add a route between 2 netpoints")
+      .def("add_route",
+           py::overload_cast<const simgrid::s4u::Host*, const simgrid::s4u::Host*,
+                             const std::vector<const simgrid::s4u::Link*>&>(&simgrid::s4u::NetZone::add_route),
+           "Add a route between 2 netpoints")
       .def("create_host", py::overload_cast<const std::string&, double>(&simgrid::s4u::NetZone::create_host),
            "Creates a host")
       .def("create_host",
@@ -620,26 +625,17 @@ PYBIND11_MODULE(simgrid, m)
           "get", [](Mailbox* self) { return py::reinterpret_steal<py::object>(self->get<PyObject>()); },
           py::call_guard<py::gil_scoped_release>(), "Blocking data reception")
       .def(
-          "get_async",
-          [](Mailbox* self) -> std::tuple<CommPtr, PyGetAsync> {
-            PyGetAsync wrap;
-            auto comm = self->get_async(wrap.get());
-            return std::make_tuple(std::move(comm), std::move(wrap));
-          },
+          "get_async", [](Mailbox* self) -> CommPtr { return self->get_async(); },
           py::call_guard<py::gil_scoped_release>(),
           "Non-blocking data reception. Use data.get() to get the python object after the communication has finished")
       .def("set_receiver", &Mailbox::set_receiver, py::call_guard<py::gil_scoped_release>(),
            "Sets the actor as permanent receiver");
 
-  /* Class PyGetAsync */
-  py::class_<PyGetAsync>(m, "PyGetAsync", "Wrapper for async get communications")
-      .def(py::init<>())
-      .def(
-          "get", [](const PyGetAsync* self) { return py::reinterpret_steal<py::object>(*(self->get())); },
-          "Get python object after async communication in receiver side");
+  /* class Activity */
+  py::class_<Activity, ActivityPtr>(m, "Activity", "Activity. See the C++ documentation for details.");
 
   /* Class Comm */
-  py::class_<Comm, CommPtr>(m, "Comm", "Communication. See the C++ documentation for details.")
+  py::class_<Comm, CommPtr, Activity>(m, "Comm", "Communication. See the C++ documentation for details.")
       .def_property_readonly("dst_data_size", &Comm::get_dst_data_size, py::call_guard<py::gil_scoped_release>(),
                              "Retrieve the size of the received data.")
       .def_property_readonly("mailbox", &Comm::get_mailbox, py::call_guard<py::gil_scoped_release>(),
@@ -676,6 +672,11 @@ PYBIND11_MODULE(simgrid, m)
            "Block until the completion of that communication, or raises TimeoutException after the specified timeout.")
       .def("wait_until", &Comm::wait_until, py::call_guard<py::gil_scoped_release>(), py::arg("time_limit"),
            "Block until the completion of that communication, or raises TimeoutException after the specified time.")
+      .def(
+          "get_payload",
+          [](const Comm* self) { return py::reinterpret_steal<py::object>((PyObject*)self->get_payload()); },
+          py::call_guard<py::gil_scoped_release>(),
+          "Retrieve the message's payload of a get_async. You cannot call this until after the comm termination.")
       .def("detach", py::overload_cast<>(&Comm::detach), py::return_value_policy::reference_internal,
            py::call_guard<py::gil_scoped_release>(),
            "Start the comm, and ignore its result. It can be completely forgotten after that.")
@@ -688,37 +689,19 @@ PYBIND11_MODULE(simgrid, m)
                   py::arg("to"), py::arg("simulated_size_in_bytes"),
                   "Do a blocking communication between two arbitrary hosts.\n\nThis initializes a communication that "
                   "completely bypass the mailbox and actors mechanism. There is really no limit on the hosts involved. "
-                  "In particular, the actor does not have to be on one of the involved hosts.")
-      .def_static("test_any", &Comm::test_any, py::call_guard<py::gil_scoped_release>(), py::arg("comms"),
-                  "take a vector s4u::CommPtr and return the rank of the first finished one (or -1 if none is done)")
-      .def_static("wait_all", &Comm::wait_all, py::call_guard<py::gil_scoped_release>(), py::arg("comms"),
-                  "Block until the completion of all communications in the list.")
-      .def_static("wait_all_for", &Comm::wait_all_for, py::call_guard<py::gil_scoped_release>(), py::arg("comms"),
-                  py::arg("timeout"),
-                  "Block until the completion of all communications in the list, or raises TimeoutException after "
-                  "the specified timeout.")
-      .def_static("wait_any", &Comm::wait_any, py::call_guard<py::gil_scoped_release>(), py::arg("comms"),
-                  "Block until the completion of any communication in the list and return the index of the "
-                  "terminated one.")
-      .def_static("wait_any_for", &Comm::wait_any_for, py::call_guard<py::gil_scoped_release>(), py::arg("comms"),
-                  py::arg("timeout"),
-                  "Block until the completion of any communication in the list and return the index of the terminated "
-                  "one, or -1 if a timeout occurred.");
+                  "In particular, the actor does not have to be on one of the involved hosts.");
 
   /* Class Io */
-  py::class_<simgrid::s4u::Io, simgrid::s4u::IoPtr>(m, "Io", "I/O activities. See the C++ documentation for details.")
+  py::class_<simgrid::s4u::Io, simgrid::s4u::IoPtr, Activity>(m, "Io",
+                                                              "I/O activities. See the C++ documentation for details.")
       .def("test", &simgrid::s4u::Io::test, py::call_guard<py::gil_scoped_release>(),
            "Test whether the I/O is terminated.")
       .def("wait", &simgrid::s4u::Io::wait, py::call_guard<py::gil_scoped_release>(),
-           "Block until the completion of that I/O operation")
-      .def_static(
-          "wait_any_for", &simgrid::s4u::Io::wait_any_for, py::call_guard<py::gil_scoped_release>(),
-          "Block until the completion of any I/O in the list (or timeout) and return the index of the terminated one.")
-      .def_static("wait_any", &simgrid::s4u::Io::wait_any, py::call_guard<py::gil_scoped_release>(),
-                  "Block until the completion of any I/O in the list and return the index of the terminated one.");
+           "Block until the completion of that I/O operation");
 
   /* Class Exec */
-  py::class_<simgrid::s4u::Exec, simgrid::s4u::ExecPtr>(m, "Exec", "Execution. See the C++ documentation for details.")
+  py::class_<simgrid::s4u::Exec, simgrid::s4u::ExecPtr, Activity>(m, "Exec",
+                                                                  "Execution. See the C++ documentation for details.")
       .def_property_readonly("remaining", &simgrid::s4u::Exec::get_remaining, py::call_guard<py::gil_scoped_release>(),
                              "Amount of flops that remain to be computed until completion (read-only property).")
       .def_property_readonly("remaining_ratio", &simgrid::s4u::Exec::get_remaining_ratio,
@@ -938,4 +921,41 @@ PYBIND11_MODULE(simgrid, m)
       .def(
           "__repr__", [](const IoTaskPtr io) { return "IoTask(" + io->get_name() + ")"; },
           "Textual representation of the IoTask");
+
+  /* Class ActivitySet */
+  py::class_<ActivitySet, ActivitySetPtr>(m, "ActivitySet", "ActivitySet. See the C++ documentation for details.")
+      .def(py::init([](std::vector<simgrid::s4u::ActivityPtr> activities) {
+             auto* ret = new ActivitySet();
+             for (auto a : activities)
+               ret->push(a);
+             return ActivitySetPtr(ret);
+           }),
+           "The constructor should take the parameters from the command line, as is ")
+      .def(py::init([]() { return ActivitySetPtr(new ActivitySet()); }),
+           "The constructor should take the parameters from the command line, as is ")
+
+      .def("push", &ActivitySet::push, py::call_guard<py::gil_scoped_release>(), py::arg("activity"),
+           "Add an activity to the set")
+      .def("erase", &ActivitySet::erase, py::call_guard<py::gil_scoped_release>(), py::arg("activity"),
+           "Remove that activity from the set")
+      .def_property_readonly("size", &ActivitySet::size, "Count of activities in the set")
+      .def("empty", &ActivitySet::empty, "Returns whether the set is empty")
+      .def("has_failed_activities", &ActivitySet::has_failed_activities,
+           "Returns whether there is any failed activities")
+      .def("get_failed_activity", &ActivitySet::get_failed_activity, "Returns a failed activity from the set, or None")
+
+      .def("wait_all_for", &ActivitySet::wait_all_for, py::call_guard<py::gil_scoped_release>(), py::arg("timeout"),
+           "Wait for the completion of all activities in the set, but not longer than the provided timeout")
+      .def("wait_all", &ActivitySet::wait_all, py::call_guard<py::gil_scoped_release>(),
+           "Wait for the completion of all activities in the set, endlessly")
+      .def("test_any", &ActivitySet::test_any, py::call_guard<py::gil_scoped_release>(),
+           "Returns the first terminated activity if any, or None if no activity is terminated")
+      .def("wait_any_for", &ActivitySet::wait_any_for, py::call_guard<py::gil_scoped_release>(), py::arg("timeout"),
+           "Wait for the completion of one activity in the set, but not longer than the provided timeout")
+      .def("wait_any", &ActivitySet::wait_any, py::call_guard<py::gil_scoped_release>(),
+           "Wait for the completion of one activity in the set, endlessly")
+
+      .def(
+          "__repr__", [](const ActivitySetPtr as) { return "ActivitySet([...])"; },
+          "Textual representation of the ActivitySet");
 }
index e5c1ddf..25a7661 100644 (file)
@@ -37,7 +37,9 @@ CommImpl::CommImpl()
 std::function<void(CommImpl*, void*, size_t)> CommImpl::copy_data_callback_ = [](kernel::activity::CommImpl* comm,
                                                                                  void* buff, size_t buff_size) {
   xbt_assert((buff_size == sizeof(void*)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
-  *(void**)(comm->dst_buff_) = buff;
+  if (comm->dst_buff_ != nullptr) // get_async provided a buffer
+    *(void**)(comm->dst_buff_) = buff;
+  comm->payload_ = buff; // Setup what will be retrieved by s4u::Comm::get_payload()
 };
 
 void CommImpl::set_copy_data_callback(const std::function<void(CommImpl*, void*, size_t)>& callback)
@@ -192,7 +194,7 @@ void CommImpl::copy_data()
 {
   size_t buff_size = src_buff_size_;
   /* If there is no data to copy then return */
-  if (not src_buff_ || not dst_buff_ || copied_)
+  if (not src_buff_ || not dst_buff_size_ || copied_)
     return;
 
   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)", this,
index 4fcf67c..f3d9839 100644 (file)
@@ -98,6 +98,7 @@ expectations of the other side, too. See  */
   unsigned char* dst_buff_ = nullptr;
   size_t src_buff_size_    = 0;
   size_t* dst_buff_size_   = nullptr;
+  void* payload_           = nullptr; // If dst_buff_ is NULL, the default copy callback puts the data here
 
   void* src_data_ = nullptr; /* User data associated to the communication */
   void* dst_data_ = nullptr;
index c8e2c8c..ab03651 100644 (file)
@@ -110,7 +110,7 @@ public:
       const std::function<void(void*)>& clean_fun, // used to free the synchro in case of problem after a detached send
       const std::function<void(activity::CommImpl*, void*, size_t)>&
           copy_data_fun, // used to copy data if not default one
-      void* payload, bool detached, std::string fun_call)
+      void* payload, bool detached, std::string_view fun_call)
       : SimcallObserver(actor)
       , mbox_(mbox)
       , payload_size_(payload_size)
@@ -160,7 +160,7 @@ public:
   CommIrecvSimcall(ActorImpl* actor, activity::MailboxImpl* mbox, unsigned char* dst_buff, size_t* dst_buff_size,
                    const std::function<bool(void*, void*, activity::CommImpl*)>& match_fun,
                    const std::function<void(activity::CommImpl*, void*, size_t)>& copy_data_fun, void* payload,
-                   double rate, std::string fun_call)
+                   double rate, std::string_view fun_call)
       : SimcallObserver(actor)
       , mbox_(mbox)
       , dst_buff_(dst_buff)
index b4b4a32..3d85fe3 100644 (file)
@@ -331,7 +331,8 @@ static void sg_platf_new_cluster_flat(simgrid::kernel::routing::ClusterCreationA
   if (cluster->router_id.empty())
     cluster->router_id = cluster->prefix + cluster->id + "_router" + cluster->suffix;
   auto* router = zone->create_router(cluster->router_id);
-  zone->add_route(router, nullptr, nullptr, nullptr, {});
+  std::vector<simgrid::s4u::LinkInRoute> links;
+  zone->add_route(router, nullptr, nullptr, nullptr, links);
 
   simgrid::kernel::routing::on_cluster_creation(*cluster);
 }
index 0b665a8..472c571 100644 (file)
@@ -48,7 +48,7 @@ public:
        chosen = xbt::random::uniform_int(0, possibilities-1);
 
     for (auto const& [aid, actor] : actors_to_run_) {
-        if (((not actor.is_todo()) && must_be_todo) || actor.is_done() || (not actor.is_enabled()))
+      if (((not actor.is_todo()) && must_be_todo) || actor.is_done() || (not actor.is_enabled()))
         continue;
       if (chosen == 0) {
         return std::make_pair(aid, valuation.at(aid));
diff --git a/src/plugins/jbod.cpp b/src/plugins/jbod.cpp
new file mode 100644 (file)
index 0000000..bd42728
--- /dev/null
@@ -0,0 +1,169 @@
+/* Copyright (c) 2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include <simgrid/plugins/jbod.hpp>
+#include <simgrid/s4u/Comm.hpp>
+#include <simgrid/s4u/Disk.hpp>
+#include <simgrid/s4u/Exec.hpp>
+#include <simgrid/s4u/NetZone.hpp>
+
+
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_jbod, s4u, "Logging specific to the JBOD implementation");
+
+namespace simgrid::plugin {
+
+Jbod* Jbod::create_jbod(s4u::NetZone* zone, const std::string& name, double speed, unsigned int num_disks,
+                        RAID raid_level, double read_bandwidth, double write_bandwidth) // Create host `name` in `zone` and give it `num_disks` disks
+{
+  xbt_assert(not ((raid_level == RAID::RAID4 || raid_level == RAID::RAID5) && num_disks < 3),
+             "RAID%d requires at least 3 disks", (int) raid_level);
+  xbt_assert(not (raid_level == RAID::RAID6 && num_disks < 4), "RAID6 requires at least 4 disks");
+
+  auto* jbod = static_cast<Jbod*>(zone->create_host(name, speed)); // NOTE(review): unchecked downcast of what create_host() returns -- confirm it really is a Jbod
+  jbod->set_num_disks(num_disks);
+  jbod->set_parity_disk_idx(num_disks -1 ); // NOTE(review): wraps around if num_disks is 0; only RAID4/5/6 have a minimum enforced above
+  jbod->set_read_disk_idx(-1); // read_async() calls get_next_read_disk_idx() for RAID1
+  jbod->set_raid_level(raid_level);
+  for (unsigned int i = 0; i < num_disks; i++)
+    jbod->create_disk(name + "_disk_" + std::to_string(i), read_bandwidth, write_bandwidth); // disks are named <name>_disk_<i>
+
+  return jbod;
+}
+
+JbodIoPtr Jbod::read_async(sg_size_t size) // Start one READ per involved disk; the result is finished by JbodIo::wait()
+{
+  auto comm = s4u::Comm::sendto_init()->set_source(const_cast<Jbod*>(this))->set_payload_size(size); // destination is set later, in JbodIo::wait()
+  std::vector<s4u::IoPtr> pending_ios;
+  sg_size_t read_size = 0;
+  std::vector<s4u::Disk*> targets;
+  switch(raid_level_) {
+    case RAID::RAID0: // every disk serves an equal share
+      read_size = size / num_disks_;
+      targets = get_disks();
+      break;
+    case RAID::RAID1: // a single disk serves the whole request
+      read_size = size;
+      targets.push_back(get_disks().at(get_next_read_disk_idx()));
+      break;
+    case RAID::RAID4: // every disk but the last one
+      read_size = size / (num_disks_ - 1);
+      targets = get_disks();
+      targets.pop_back();
+      break;
+    case RAID::RAID5: // every disk but one, selected from the parity index
+      read_size = size / (num_disks_ - 1);
+      targets = get_disks();
+      targets.erase(targets.begin() + ((get_parity_disk_idx() + 1) % num_disks_)); // parenthesized: `a + 1 % n` is `a + (1 % n)` and could point past the last disk
+      break;
+    case RAID::RAID6: // every disk but two, selected from the parity index
+      read_size = size / (num_disks_ - 2);
+      targets = get_disks();
+      if (((get_parity_disk_idx() + 2) % num_disks_) == 0) { // the skipped pair wraps around: last and first disks
+        targets.pop_back();
+        targets.erase(targets.begin());
+      } else if (get_parity_disk_idx() + 1 == static_cast<int>(num_disks_)) {
+        targets.pop_back();
+        targets.pop_back();
+      } else {
+        targets.erase(targets.begin() + (get_parity_disk_idx() + 1) % num_disks_,
+                      targets.begin() + get_parity_disk_idx() + 3);
+      }
+      break;
+    default:
+      xbt_die("Unsupported RAID level. Supported level are: 0, 1, 4, 5, and 6");
+  }
+  for (const auto* disk : targets) {
+    auto io = s4u::IoPtr(disk->io_init(read_size, s4u::Io::OpType::READ));
+    io->set_name(disk->get_name())->start(); // reads are started right away
+    pending_ios.push_back(io);
+  }
+
+  return JbodIoPtr(new JbodIo(this, comm, nullptr, pending_ios, s4u::Io::OpType::READ)); // nullptr: no parity computation on reads
+}
+
+sg_size_t Jbod::read(sg_size_t size) // Blocking read: runs read_async(size) to completion
+{
+  read_async(size)->wait();
+  return size; // always reports the requested size
+}
+
+JbodIoPtr Jbod::write_async(sg_size_t size) // Start sending `size` bytes to the JBOD and prepare (without starting) one WRITE per disk
+{
+  auto comm = s4u::Comm::sendto_init(s4u::Host::current(), const_cast<Jbod*>(this)); // from the current host to this JBOD
+  std::vector<s4u::IoPtr> pending_ios;
+  sg_size_t write_size = 0;
+  switch(raid_level_) {
+    case RAID::RAID0: // each disk receives an equal share
+      write_size = size / num_disks_;
+      break;
+    case RAID::RAID1: // each disk receives the whole amount
+      write_size = size;
+      break;
+    case RAID::RAID4:
+      write_size = size / (num_disks_ - 1);
+      break;
+    case RAID::RAID5: // the parity index is updated once per write
+      update_parity_disk_idx();
+      write_size = size / (num_disks_ - 1);
+      break;
+    case RAID::RAID6: // the parity index is updated twice per write
+      update_parity_disk_idx();
+      update_parity_disk_idx();
+      write_size = size / (num_disks_ - 2);
+      break;
+    default:
+      xbt_die("Unsupported RAID level. Supported level are: 0, 1, 4, 5, and 6");
+  }
+  for (const auto* disk : get_disks()) { // every disk gets a WRITE, whatever the RAID level
+    auto io = s4u::IoPtr(disk->io_init(write_size, s4u::Io::OpType::WRITE));
+    io->set_name(disk->get_name()); // not started here: JbodIo::wait() starts the writes
+    pending_ios.push_back(io);
+  }
+
+  s4u::ExecPtr parity_block_comp = nullptr;
+  if (raid_level_ == RAID::RAID4 || raid_level_ == RAID::RAID5 || raid_level_ == RAID::RAID6) {
+    // Assume 1 flop per byte to write per parity block and two for RAID6.
+    // Do not assign the Exec yet, will be done after the completion of the CommPtr
+    if (raid_level_ == RAID::RAID6)
+      parity_block_comp = s4u::Exec::init()->set_flops_amount(200 * write_size); // NOTE(review): the comment above says "two" but the factor is 200 -- confirm which is intended
+    else
+      parity_block_comp = s4u::Exec::init()->set_flops_amount(write_size);
+  }
+
+  comm->set_payload_size(size)->start(); // the transfer is started right away
+  return JbodIoPtr(new JbodIo(this, comm, parity_block_comp, pending_ios, s4u::Io::OpType::WRITE));
+}
+
+sg_size_t Jbod::write(sg_size_t size) // Blocking write: runs write_async(size) to completion
+{
+  write_async(size)->wait();
+  return size; // always reports the requested size
+}
+
+void JbodIo::wait() // Drive this JBOD operation to completion (blocking)
+{
+  if (type_ == s4u::Io::OpType::WRITE) {
+    transfer_->wait(); // the transfer was started by Jbod::write_async()
+    XBT_DEBUG("Data received on JBOD");
+    if (parity_block_comp_) {
+      parity_block_comp_->set_host(const_cast<Jbod*>(jbod_))->wait(); // the Exec gets its host only now: the JBOD itself
+      XBT_DEBUG("Parity block computed");
+    }
+    XBT_DEBUG("Start writing");
+    for (const auto& io : pending_ios_)
+      io->start(); // write_async() created these I/Os without starting them
+  }
+
+  for (const auto& io : pending_ios_) {
+    XBT_DEBUG("Wait for I/O on %s", io->get_cname());
+    io->wait(); // waited one after the other, in the order they were queued
+  }
+
+  if (type_ == s4u::Io::OpType::READ) {
+    XBT_DEBUG("Data read on JBOD, send it to %s", s4u::Host::current()->get_cname());
+    transfer_->set_destination(s4u::Host::current())->wait(); // destination is the current host at the time wait() is called
+  }
+}
+} // namespace simgrid::plugin
index 65fd067..445364f 100644 (file)
@@ -107,7 +107,7 @@ ssize_t Activity::test_any(const std::vector<ActivityPtr>& activities)
   return changed_pos;
 }
 
-ssize_t Activity::wait_any_for(const std::vector<ActivityPtr>& activities, double timeout)
+ssize_t Activity::deprecated_wait_any_for(const std::vector<ActivityPtr>& activities, double timeout) // XBT_ATTRIB_DEPRECATED_v339
 {
   std::vector<kernel::activity::ActivityImpl*> ractivities(activities.size());
   std::transform(begin(activities), end(activities), begin(ractivities),
index abd272d..215f126 100644 (file)
@@ -6,12 +6,18 @@
 #include "src/kernel/activity/ActivityImpl.hpp"
 #include "src/kernel/actor/ActorImpl.hpp"
 #include "src/kernel/actor/CommObserver.hpp"
+#include <simgrid/Exception.hpp>
+#include <simgrid/activity_set.h>
 #include <simgrid/s4u/ActivitySet.hpp>
 #include <simgrid/s4u/Engine.hpp>
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_activityset, s4u_activity, "S4U set of activities");
 
-namespace simgrid::s4u {
+namespace simgrid {
+
+template class xbt::Extendable<s4u::ActivitySet>;
+
+namespace s4u {
 
 void ActivitySet::erase(ActivityPtr a)
 {
@@ -58,6 +64,21 @@ ActivityPtr ActivitySet::test_any()
   return ret;
 }
 
+void ActivitySet::handle_failed_activities() // Move every FAILED activity from activities_ to failed_activities_
+{
+  for (size_t i = 0; i < activities_.size(); i++) {
+    auto act = activities_[i]; // copy of the pointer: slot i is overwritten below
+    if (act->pimpl_->get_state() == kernel::activity::State::FAILED) {
+      act->complete(Activity::State::FAILED);
+
+      failed_activities_.push_back(act);
+      activities_[i] = activities_[activities_.size() - 1]; // fill the hole with the last element (order is not preserved)
+      activities_.resize(activities_.size() - 1);
+      i--; // compensate the i++ occurring at the end of the loop, so the element just moved into slot i is examined too
+    }
+  }
+}
+
 ActivityPtr ActivitySet::wait_any_for(double timeout)
 {
   std::vector<kernel::activity::ActivityImpl*> act_impls(activities_.size());
@@ -66,19 +87,30 @@ ActivityPtr ActivitySet::wait_any_for(double timeout)
 
   kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
   kernel::actor::ActivityWaitanySimcall observer{issuer, act_impls, timeout, "wait_any_for"};
-  ssize_t changed_pos = kernel::actor::simcall_blocking(
-      [&observer] {
-        kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),
-                                                     observer.get_timeout());
-      },
-      &observer);
-  xbt_assert(changed_pos != -1,
-             "ActivityImpl::wait_any_for is not supposed to return -1 but instead to raise exceptions");
-
-  auto ret = activities_.at(changed_pos);
-  erase(ret);
-  ret->complete(Activity::State::FINISHED);
-  return ret;
+  try {
+    ssize_t changed_pos = kernel::actor::simcall_blocking(
+        [&observer] {
+          kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),
+                                                       observer.get_timeout());
+        },
+        &observer);
+    if (changed_pos == -1)
+      throw TimeoutException(XBT_THROW_POINT, "Timeouted");
+
+    auto ret = activities_.at(changed_pos);
+    erase(ret);
+    ret->complete(Activity::State::FINISHED);
+    return ret;
+  } catch (const HostFailureException& e) {
+    handle_failed_activities();
+    throw e;
+  } catch (const NetworkFailureException& e) {
+    handle_failed_activities();
+    throw e;
+  } catch (const StorageFailureException& e) {
+    handle_failed_activities();
+    throw e;
+  }
 }
 
 ActivityPtr ActivitySet::get_failed_activity()
@@ -90,4 +122,65 @@ ActivityPtr ActivitySet::get_failed_activity()
   return ret;
 }
 
-}; // namespace simgrid::s4u
\ No newline at end of file
+} // namespace s4u
+} // namespace simgrid
+
+SG_BEGIN_DECL
+
+sg_activity_set_t sg_activity_set_init() // allocates with new; release with sg_activity_set_delete()
+{
+  return new simgrid::s4u::ActivitySet();
+}
+void sg_activity_set_push(sg_activity_set_t as, sg_activity_t acti) // forwards to ActivitySet::push()
+{
+  as->push(acti);
+}
+void sg_activity_set_erase(sg_activity_set_t as, sg_activity_t acti) // forwards to ActivitySet::erase()
+{
+  as->erase(acti);
+}
+size_t sg_activity_set_size(sg_activity_set_t as) // forwards to ActivitySet::size()
+{
+  return as->size();
+}
+int sg_activity_set_empty(sg_activity_set_t as) // result of ActivitySet::empty(), converted to int
+{
+  return as->empty();
+}
+
+sg_activity_t sg_activity_set_test_any(sg_activity_set_t as)
+{
+  return as->test_any().get(); // NOTE(review): raw pointer taken from a temporary ActivityPtr -- confirm another reference keeps the activity alive
+}
+void sg_activity_set_wait_all(sg_activity_set_t as) // forwards to ActivitySet::wait_all(); exceptions are not caught here
+{
+  as->wait_all();
+}
+int sg_activity_set_wait_all_for(sg_activity_set_t as, double timeout) // returns 1 on completion, 0 when a TimeoutException is raised
+{
+  try {
+    as->wait_all_for(timeout);
+    return 1;
+  } catch (const simgrid::TimeoutException& e) {
+    return 0;
+  }
+}
+sg_activity_t sg_activity_set_wait_any(sg_activity_set_t as)
+{
+  return as->wait_any().get(); // NOTE(review): same temporary-ActivityPtr concern as sg_activity_set_test_any()
+}
+sg_activity_t sg_activity_set_wait_any_for(sg_activity_set_t as, double timeout) // returns nullptr when a TimeoutException is raised
+{
+  try {
+    return as->wait_any_for(timeout).get(); // NOTE(review): same temporary-ActivityPtr concern as sg_activity_set_test_any()
+  } catch (const simgrid::TimeoutException& e) {
+    return nullptr;
+  }
+}
+
+void sg_activity_set_delete(sg_activity_set_t as) // counterpart of sg_activity_set_init()
+{
+  delete as;
+}
+
+SG_END_DECL
index fe13409..ab9fefb 100644 (file)
@@ -6,6 +6,7 @@
 #include <cmath>
 #include <simgrid/Exception.hpp>
 #include <simgrid/comm.h>
+#include <simgrid/s4u/ActivitySet.hpp>
 #include <simgrid/s4u/Comm.hpp>
 #include <simgrid/s4u/Engine.hpp>
 #include <simgrid/s4u/Mailbox.hpp>
@@ -283,6 +284,14 @@ CommPtr Comm::set_payload_size(uint64_t bytes)
   return this;
 }
 
+void* Comm::get_payload() const // Return the payload_ pointer stored in the underlying CommImpl
+{
+  xbt_assert(get_state() == State::FINISHED, // only a FINISHED communication may be queried
+             "You can only retrieve the payload of a communication that gracefully terminated, but its state is %s.",
+             get_state_str());
+  return static_cast<kernel::activity::CommImpl*>(pimpl_.get())->payload_; // untyped: the caller casts it back to the expected type
+}
+
 Actor* Comm::get_sender() const
 {
   kernel::actor::ActorImplPtr sender = nullptr;
@@ -309,18 +318,21 @@ Comm* Comm::do_start()
 {
   xbt_assert(get_state() == State::INITED || get_state() == State::STARTING,
              "You cannot use %s() once your communication started (not implemented)", __FUNCTION__);
+
+  auto myself = kernel::actor::ActorImpl::self();
+
   if (get_source() != nullptr || get_destination() != nullptr) {
     xbt_assert(is_assigned(), "When either from_ or to_ is specified, both must be.");
     xbt_assert(src_buff_ == nullptr && dst_buff_ == nullptr,
                "Direct host-to-host communications cannot carry any data.");
     XBT_DEBUG("host-to-host Comm. Pimpl already created and set, just start it.");
-    fire_on_start();
-    fire_on_this_start();
     kernel::actor::simcall_answered([this] {
       pimpl_->set_state(kernel::activity::State::READY);
       boost::static_pointer_cast<kernel::activity::CommImpl>(pimpl_)->start();
     });
-  } else if (src_buff_ != nullptr) { // Sender side
+    fire_on_start();
+    fire_on_this_start();
+  } else if (myself == sender_) {
     on_send(*this);
     on_this_send(*this);
     kernel::actor::CommIsendSimcall observer{sender_,
@@ -337,7 +349,7 @@ Comm* Comm::do_start()
                                              "Isend"};
     pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::isend(&observer); },
                                              &observer);
-  } else if (dst_buff_ != nullptr) { // Receiver side
+  } else if (myself == receiver_) {
     xbt_assert(not detached_, "Receive cannot be detached");
     on_recv(*this);
     on_this_recv(*this);
@@ -383,12 +395,21 @@ Comm* Comm::detach()
   return this;
 }
 
-ssize_t Comm::test_any(const std::vector<CommPtr>& comms)
+ssize_t Comm::test_any(const std::vector<CommPtr>& comms) // XBT_ATTRIB_DEPRECATED_v339
 {
-  std::vector<ActivityPtr> activities;
-  for (const auto& comm : comms)
-    activities.push_back(boost::dynamic_pointer_cast<Activity>(comm));
-  return Activity::test_any(activities);
+  std::vector<kernel::activity::ActivityImpl*> ractivities(comms.size());
+  std::transform(begin(comms), end(comms), begin(ractivities), [](const CommPtr& act) { return act->pimpl_.get(); });
+
+  kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
+  kernel::actor::ActivityTestanySimcall observer{issuer, ractivities, "test_any"};
+  ssize_t changed_pos = kernel::actor::simcall_answered(
+      [&observer] {
+        return kernel::activity::ActivityImpl::test_any(observer.get_issuer(), observer.get_activities());
+      },
+      &observer);
+  if (changed_pos != -1)
+    comms.at(changed_pos)->complete(State::FINISHED);
+  return changed_pos;
 }
 
 /** @brief Block the calling actor until the communication is finished, or until timeout
@@ -449,55 +470,60 @@ Comm* Comm::wait_for(double timeout)
   return this;
 }
 
-ssize_t Comm::wait_any_for(const std::vector<CommPtr>& comms, double timeout)
+ssize_t Comm::deprecated_wait_any_for(const std::vector<CommPtr>& comms, double timeout) // XBT_ATTRIB_DEPRECATED_v339
 {
-  std::vector<ActivityPtr> activities;
+  if (comms.empty())
+    return -1;
+  ActivitySet set;
   for (const auto& comm : comms)
-    activities.push_back(boost::dynamic_pointer_cast<Activity>(comm));
-  ssize_t changed_pos;
+    set.push(comm);
   try {
-    changed_pos = Activity::wait_any_for(activities, timeout);
+    auto* ret = set.wait_any_for(timeout).get();
+    for (size_t i = 0; i < comms.size(); i++)
+      if (comms[i].get() == ret)
+        return i;
+
+  } catch (TimeoutException& e) {
+    return -1;
   } catch (const NetworkFailureException& e) {
-    changed_pos = -1;
-    for (auto c : comms) {
-      if (c->pimpl_->get_state() == kernel::activity::State::FAILED) {
+    for (auto c : comms)
+      if (c->pimpl_->get_state() == kernel::activity::State::FAILED)
         c->complete(State::FAILED);
-      }
-    }
+
     e.rethrow_nested(XBT_THROW_POINT, boost::core::demangle(typeid(e).name()) + " raised in kernel mode.");
   }
-  return changed_pos;
+  return -1;
 }
 
-void Comm::wait_all(const std::vector<CommPtr>& comms)
+void Comm::wait_all(const std::vector<CommPtr>& comms) // XBT_ATTRIB_DEPRECATED_v339
 {
   // TODO: this should be a simcall or something
   for (const auto& comm : comms)
     comm->wait();
 }
 
-size_t Comm::wait_all_for(const std::vector<CommPtr>& comms, double timeout)
+size_t Comm::wait_all_for(const std::vector<CommPtr>& comms, double timeout) // XBT_ATTRIB_DEPRECATED_v339
 {
   if (timeout < 0.0) {
-    wait_all(comms);
+    for (const auto& comm : comms)
+      comm->wait();
     return comms.size();
   }
 
-  double deadline = Engine::get_clock() + timeout;
-  std::vector<CommPtr> waited_comm(1, nullptr);
-  for (size_t i = 0; i < comms.size(); i++) {
-    double wait_timeout = std::max(0.0, deadline - Engine::get_clock());
-    waited_comm[0]      = comms[i];
-    // Using wait_any_for() here (and not wait_for) because we don't want comms to be invalidated on timeout
-    if (wait_any_for(waited_comm, wait_timeout) == -1) {
-      XBT_DEBUG("Timeout (%g): i = %zu", wait_timeout, i);
-      return i;
-    }
-  }
-  return comms.size();
+  ActivitySet set;
+  for (auto comm : comms)
+    set.push(comm);
+  set.wait_all_for(timeout);
+
+  return set.size();
 }
 } // namespace simgrid::s4u
 /* **************************** Public C interface *************************** */
+int sg_comm_isinstance(sg_activity_t acti) // 1 if `acti` is a simgrid::s4u::Comm (per dynamic_cast), 0 otherwise
+{
+  return dynamic_cast<simgrid::s4u::Comm*>(acti) != nullptr;
+}
+
 void sg_comm_detach(sg_comm_t comm, void (*clean_function)(void*))
 {
   comm->detach(clean_function);
@@ -539,33 +565,34 @@ sg_error_t sg_comm_wait_for(sg_comm_t comm, double timeout)
 
 void sg_comm_wait_all(sg_comm_t* comms, size_t count)
 {
-  sg_comm_wait_all_for(comms, count, -1);
+  simgrid::s4u::ActivitySet as;
+  for (size_t i = 0; i < count; i++)
+    as.push(comms[i]);
+
+  as.wait_all();
 }
 
-size_t sg_comm_wait_all_for(sg_comm_t* comms, size_t count, double timeout)
+ssize_t sg_comm_wait_any(sg_comm_t* comms, size_t count)
 {
   std::vector<simgrid::s4u::CommPtr> s4u_comms;
   for (size_t i = 0; i < count; i++)
     s4u_comms.emplace_back(comms[i], false);
 
-  size_t pos = simgrid::s4u::Comm::wait_all_for(s4u_comms, timeout);
-  for (size_t i = pos; i < count; i++)
-    s4u_comms[i]->add_ref();
+  ssize_t pos = simgrid::s4u::Comm::deprecated_wait_any_for(s4u_comms, -1);
+  for (size_t i = 0; i < count; i++) {
+    if (pos != -1 && static_cast<size_t>(pos) != i)
+      s4u_comms[i]->add_ref();
+  }
   return pos;
 }
 
-ssize_t sg_comm_wait_any(sg_comm_t* comms, size_t count)
-{
-  return sg_comm_wait_any_for(comms, count, -1);
-}
-
 ssize_t sg_comm_wait_any_for(sg_comm_t* comms, size_t count, double timeout)
 {
   std::vector<simgrid::s4u::CommPtr> s4u_comms;
   for (size_t i = 0; i < count; i++)
     s4u_comms.emplace_back(comms[i], false);
 
-  ssize_t pos = simgrid::s4u::Comm::wait_any_for(s4u_comms, timeout);
+  ssize_t pos = simgrid::s4u::Comm::deprecated_wait_any_for(s4u_comms, timeout);
   for (size_t i = 0; i < count; i++) {
     if (pos != -1 && static_cast<size_t>(pos) != i)
       s4u_comms[i]->add_ref();
index 49f935e..caa7466 100644 (file)
@@ -6,6 +6,7 @@
 #include "simgrid/simix.hpp"
 #include <simgrid/Exception.hpp>
 #include <simgrid/exec.h>
+#include <simgrid/s4u/ActivitySet.hpp>
 #include <simgrid/s4u/Exec.hpp>
 #include <simgrid/s4u/Host.hpp>
 
@@ -51,12 +52,23 @@ Exec* Exec::do_start()
   return this;
 }
 
-ssize_t Exec::wait_any_for(const std::vector<ExecPtr>& execs, double timeout)
+ssize_t Exec::deprecated_wait_any_for(const std::vector<ExecPtr>& execs, double timeout) // XBT_ATTRIB_DEPRECATED_v339
 {
-  std::vector<ActivityPtr> activities;
+  if (execs.empty())
+    return -1;
+  ActivitySet set;
   for (const auto& exec : execs)
-    activities.push_back(boost::dynamic_pointer_cast<Activity>(exec));
-  return Activity::wait_any_for(activities, timeout);
+    set.push(exec);
+  try {
+    auto* ret = set.wait_any_for(timeout).get();
+    for (size_t i = 0; i < execs.size(); i++)
+      if (execs[i].get() == ret)
+        return i;
+
+  } catch (TimeoutException& e) {
+    return -1;
+  }
+  return -1;
 }
 
 /** @brief change the execution bound
@@ -247,6 +259,11 @@ bool Exec::is_assigned() const
 } // namespace simgrid::s4u
 
 /* **************************** Public C interface *************************** */
+int sg_exec_isinstance(sg_activity_t acti)
+{
+  return dynamic_cast<simgrid::s4u::Exec*>(acti) != nullptr;
+}
+
 void sg_exec_set_bound(sg_exec_t exec, double bound)
 {
   exec->set_bound(bound);
@@ -319,18 +336,27 @@ sg_error_t sg_exec_wait_for(sg_exec_t exec, double timeout)
   return status;
 }
 
-ssize_t sg_exec_wait_any(sg_exec_t* execs, size_t count)
+ssize_t sg_exec_wait_any(sg_exec_t* execs, size_t count) // XBT_ATTRIB_DEPRECATED_v339
 {
-  return sg_exec_wait_any_for(execs, count, -1.0);
+  std::vector<simgrid::s4u::ExecPtr> s4u_execs;
+  for (size_t i = 0; i < count; i++)
+    s4u_execs.emplace_back(execs[i], false);
+
+  ssize_t pos = simgrid::s4u::Exec::deprecated_wait_any_for(s4u_execs, -1.0);
+  for (size_t i = 0; i < count; i++) {
+    if (pos != -1 && static_cast<size_t>(pos) != i)
+      s4u_execs[i]->add_ref();
+  }
+  return pos;
 }
 
-ssize_t sg_exec_wait_any_for(sg_exec_t* execs, size_t count, double timeout)
+ssize_t sg_exec_wait_any_for(sg_exec_t* execs, size_t count, double timeout) // XBT_ATTRIB_DEPRECATED_v339
 {
   std::vector<simgrid::s4u::ExecPtr> s4u_execs;
   for (size_t i = 0; i < count; i++)
     s4u_execs.emplace_back(execs[i], false);
 
-  ssize_t pos = simgrid::s4u::Exec::wait_any_for(s4u_execs, timeout);
+  ssize_t pos = simgrid::s4u::Exec::deprecated_wait_any_for(s4u_execs, timeout);
   for (size_t i = 0; i < count; i++) {
     if (pos != -1 && static_cast<size_t>(pos) != i)
       s4u_execs[i]->add_ref();
index 7f0e45c..a0482b5 100644 (file)
@@ -3,6 +3,7 @@
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
+#include <simgrid/s4u/ActivitySet.hpp>
 #include <simgrid/s4u/Disk.hpp>
 #include <simgrid/s4u/Io.hpp>
 #include <xbt/log.h>
@@ -94,12 +95,18 @@ Io* Io::do_start()
   return this;
 }
 
-ssize_t Io::wait_any_for(const std::vector<IoPtr>& ios, double timeout)
+ssize_t Io::deprecated_wait_any_for(const std::vector<IoPtr>& ios, double timeout)
 {
-  std::vector<ActivityPtr> activities;
+  ActivitySet set;
   for (const auto& io : ios)
-    activities.push_back(boost::dynamic_pointer_cast<Activity>(io));
-  return Activity::wait_any_for(activities, timeout);
+    set.push(boost::dynamic_pointer_cast<Activity>(io));
+
+  auto* ret = set.wait_any_for(timeout).get();
+  for (size_t i = 0; i < ios.size(); i++)
+    if (ios[i].get() == ret)
+      return i;
+
+  return -1;
 }
 
 IoPtr Io::set_disk(const_sg_disk_t disk)
index 6ce4a03..76613fb 100644 (file)
@@ -127,6 +127,13 @@ CommPtr Mailbox::get_init()
   return res;
 }
 
+CommPtr Mailbox::get_async()
+{
+  CommPtr res = get_init()->set_dst_data(nullptr, sizeof(void*));
+  res->start();
+  return res;
+}
+
 kernel::activity::ActivityImplPtr
 Mailbox::iprobe(int type, const std::function<bool(void*, void*, kernel::activity::CommImpl*)>& match_fun, void* data)
 {
index 139baf6..49b23a0 100644 (file)
@@ -91,6 +91,35 @@ void NetZone::add_route(kernel::routing::NetPoint* src, kernel::routing::NetPoin
 {
   pimpl_->add_route(src, dst, gw_src, gw_dst, link_list, symmetrical);
 }
+void NetZone::add_route(kernel::routing::NetPoint* src, kernel::routing::NetPoint* dst,
+                        kernel::routing::NetPoint* gw_src, kernel::routing::NetPoint* gw_dst,
+                        const std::vector<const Link*>& links)
+{
+  std::vector<LinkInRoute> links_direct;
+  std::vector<LinkInRoute> links_reverse;
+  for (auto* l : links) {
+    links_direct.emplace_back(LinkInRoute(l, LinkInRoute::Direction::UP));
+    links_reverse.emplace_back(LinkInRoute(l, LinkInRoute::Direction::DOWN));
+  }
+  pimpl_->add_route(src, dst, gw_src, gw_dst, links_direct, false);
+  pimpl_->add_route(dst, src, gw_dst, gw_src, links_reverse, false);
+}
+
+void NetZone::add_route(const Host* src, const Host* dst, const std::vector<LinkInRoute>& link_list, bool symmetrical)
+{
+  pimpl_->add_route(src->get_netpoint(), dst->get_netpoint(), nullptr, nullptr, link_list, symmetrical);
+}
+void NetZone::add_route(const Host* src, const Host* dst, const std::vector<const Link*>& links)
+{
+  std::vector<LinkInRoute> links_direct;
+  std::vector<LinkInRoute> links_reverse;
+  for (auto* l : links) {
+    links_direct.emplace_back(LinkInRoute(l, LinkInRoute::Direction::UP));
+    links_reverse.emplace_back(LinkInRoute(l, LinkInRoute::Direction::DOWN));
+  }
+  pimpl_->add_route(src->get_netpoint(), dst->get_netpoint(), nullptr, nullptr, links_direct, false);
+  pimpl_->add_route(dst->get_netpoint(), src->get_netpoint(), nullptr, nullptr, links_reverse, false);
+}
 
 void NetZone::add_bypass_route(kernel::routing::NetPoint* src, kernel::routing::NetPoint* dst,
                                kernel::routing::NetPoint* gw_src, kernel::routing::NetPoint* gw_dst,
index d7efa43..c30dfda 100644 (file)
@@ -6,9 +6,10 @@
 #include <climits>
 
 #include "private.hpp"
+#include "simgrid/s4u/Engine.hpp"
 #include "smpi_comm.hpp"
-#include "smpi_info.hpp"
 #include "smpi_errhandler.hpp"
+#include "smpi_info.hpp"
 #include "src/smpi/include/smpi_actor.hpp"
 
 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(smpi_pmpi);
@@ -248,7 +249,7 @@ int PMPI_Attr_get(MPI_Comm comm, int keyval, void* attr_value, int* flag) {
     return MPI_SUCCESS;
   case MPI_UNIVERSE_SIZE:
     *flag = 1;
-    universe_size                   = smpi_get_universe_size();
+    universe_size                   = simgrid::s4u::Engine::get_instance()->get_host_count();
     *static_cast<int**>(attr_value) = &universe_size;
     return MPI_SUCCESS;
   case MPI_LASTUSEDCODE:
index 2133849..cae46b9 100644 (file)
@@ -74,7 +74,6 @@ using MPIR_Dist_Graph_Topology = SMPI_Dist_Graph_topology*;
 
 XBT_PRIVATE simgrid::smpi::ActorExt* smpi_process();
 XBT_PRIVATE simgrid::smpi::ActorExt* smpi_process_remote(simgrid::s4u::ActorPtr actor);
-XBT_PRIVATE int smpi_get_universe_size();
 
 XBT_PRIVATE void smpi_deployment_register_process(const std::string& instance_id, int rank,
                                                   const simgrid::s4u::Actor* actor);
index 66917b4..83bb798 100644 (file)
@@ -224,9 +224,6 @@ int ActorExt::sampling() const
 
 void ActorExt::init()
 {
-  xbt_assert(smpi_get_universe_size() != 0, "SimGrid was not initialized properly before entering MPI_Init. "
-                                            "Aborting, please check compilation process and use smpirun.");
-
   ActorExt* ext = smpi_process();
   // if we are in MPI_Init and argc handling has already been done.
   if (ext->initialized())
index e73865c..9c9d7c8 100644 (file)
@@ -15,15 +15,12 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(smpi);
 
 namespace simgrid::smpi::app {
 
-static int universe_size = 0;
-
 class Instance {
 public:
   explicit Instance(int max_no_processes) : size_(max_no_processes)
   {
     auto* group = new simgrid::smpi::Group(size_);
     comm_world_ = new simgrid::smpi::Comm(group, nullptr, false, -1);
-    universe_size += max_no_processes;
     bar_ = s4u::Barrier::create(size_);
   }
   s4u::BarrierPtr bar_;
@@ -75,6 +72,17 @@ void SMPI_app_instance_start(const char* name, const std::function<void()>& code
     rank++;
   }
 }
+void SMPI_app_instance_join(const std::string& instance_id)
+{
+  std::vector<simgrid::s4u::ActorPtr> actors =
+      simgrid::s4u::Engine::get_instance()->get_filtered_actors([instance_id](simgrid::s4u::ActorPtr act) {
+        auto* actor_instance = act->get_property("instance_id");
+        return actor_instance != nullptr && strcmp(actor_instance, instance_id.c_str()) == 0;
+      });
+
+  for (auto& act : actors)
+    act->join();
+}
 
 void smpi_deployment_register_process(const std::string& instance_id, int rank, const simgrid::s4u::Actor* actor)
 {
@@ -113,11 +121,6 @@ void smpi_deployment_cleanup_instances(){
   smpi_instances.clear();
 }
 
-int smpi_get_universe_size()
-{
-  return simgrid::smpi::app::universe_size;
-}
-
 /** @brief Auxiliary method to get list of hosts to deploy app */
 static std::vector<simgrid::s4u::Host*> smpi_get_hosts(const simgrid::s4u::Engine* e, const std::string& hostfile)
 {
index 5f2c22e..1751a7d 100644 (file)
@@ -112,11 +112,8 @@ int main(int argc, char** argv)
     links[name] = zone->create_link(name, 1e9)->set_latency(1e-9)->seal();
   }
   links["L0"] = zone->create_link("L0", 1e3)->seal();
-  zone->add_route(hosts["S1"]->get_netpoint(), hosts["C1"]->get_netpoint(), nullptr, nullptr,
-                  {sg4::LinkInRoute(links["L1"]), sg4::LinkInRoute(links["L0"]), sg4::LinkInRoute(links["L2"])});
-  zone->add_route(hosts["S2"]->get_netpoint(), hosts["C2"]->get_netpoint(), nullptr, nullptr,
-                  {sg4::LinkInRoute(links["L3"]), sg4::LinkInRoute(links["L0"]), sg4::LinkInRoute(links["L4"])});
-
+  zone->add_route(hosts["S1"], hosts["C1"], {links["L1"], links["L0"], links["L2"]});
+  zone->add_route(hosts["S2"], hosts["C2"], {links["L3"], links["L0"], links["L4"]});
   zone->seal();
 
   sg4::Actor::create("", hosts["S1"], sender, "C1", nullptr);
index 966f548..4dfd24f 100644 (file)
@@ -86,7 +86,7 @@ int main(int argc, char** argv)
   auto const* host1 = zone->create_host("host1", 1e6)->seal();
   auto const* host2 = zone->create_host("host2", 1e6)->seal();
   auto* testlink    = zone->create_link("L1", 1e10)->seal();
-  zone->add_route(host1->get_netpoint(), host2->get_netpoint(), nullptr, nullptr, {sg4::LinkInRoute(testlink)});
+  zone->add_route(host1, host2, {testlink});
 
   simgrid::s4u::Actor::create("dispatcher", engine.host_by_name("host1"), main_dispatcher, testlink);
   engine.run();
index 8268a37..c9df586 100644 (file)
@@ -15,13 +15,13 @@ namespace sg4 = simgrid::s4u;
 XBT_LOG_NEW_DEFAULT_CATEGORY(issue105, "Issue105");
 static void load_generator(sg4::Mailbox* mailbox)
 {
-  std::vector<sg4::CommPtr> comms;
+  sg4::ActivitySet comms;
 
   // Send the task messages
   for (int i = 0; i < 100; i++) {
     auto* payload     = new int(i);
     sg4::CommPtr comm = mailbox->put_async(payload, 1024);
-    comms.push_back(comm);
+    comms.push(comm);
     sg4::this_actor::sleep_for(1.0);
   }
 
@@ -29,10 +29,10 @@ static void load_generator(sg4::Mailbox* mailbox)
   auto* payload     = new int(-1);
   sg4::CommPtr comm = mailbox->put_async(payload, 1024);
   XBT_INFO("Sent shutdown");
-  comms.push_back(comm);
+  comms.push(comm);
 
   // Wait for all messages to be consumed before ending the simulation
-  sg4::Comm::wait_all(comms);
+  comms.wait_all();
   XBT_INFO("Load generator finished");
 }
 
@@ -75,8 +75,7 @@ int main(int argc, char* argv[])
                                 ->set_bandwidth_profile(linkSaBandwidthProfile)
                                 ->seal();
 
-  world->add_route(hostGl01->get_netpoint(), hostSa01->get_netpoint(), nullptr, nullptr,
-                   {{linkSa, sg4::LinkInRoute::Direction::NONE}}, true);
+  world->add_route(hostGl01, hostSa01, {{linkSa, sg4::LinkInRoute::Direction::NONE}}, true);
   world->seal();
 
   sg4::Mailbox* mb1 = e.mailbox_by_name_or_create("Mailbox 1");
index 4a13429..4cea0bb 100644 (file)
@@ -47,8 +47,8 @@ int main(int argc, char* argv[])
   auto* rootzone = sg4::create_full_zone("root");
   auto* hostA    = rootzone->create_host("hostA", 1e9);
   auto* hostB    = rootzone->create_host("hostB", 1e9);
-  sg4::LinkInRoute link(rootzone->create_link("backbone", "1")->set_latency("1s")->seal());
-  rootzone->add_route(hostA->get_netpoint(), hostB->get_netpoint(), nullptr, nullptr, {link}, true);
+  auto* backb    = rootzone->create_link("backbone", "1")->set_latency("1s")->seal();
+  rootzone->add_route(hostA, hostB, {backb});
   rootzone->seal();
 
   sg4::Actor::create("ptask", hostA, ptask, hostA, hostB);
index a28a532..5bab3cb 100644 (file)
@@ -63,8 +63,8 @@ def load_platform():
     hosts.append(host2)
     link1 = dijkstra.create_link("link1_up", [1e9]).set_latency(1e-3).set_concurrency_limit(10).seal()
     link2 = dijkstra.create_link("link1_down", ["1GBps"]).set_latency("1ms").seal()
-    dijkstra.add_route(host1.netpoint, host2.netpoint, None, None, [LinkInRoute(link1)], False)
-    dijkstra.add_route(host2.netpoint, host1.netpoint, None, None, [LinkInRoute(link2)], False)
+    dijkstra.add_route(host1, host2, [LinkInRoute(link1)], False)
+    dijkstra.add_route(host2, host1, [LinkInRoute(link2)], False)
     dijkstra.seal()
 
     # vivaldi
index 56d9e61..a21866a 100644 (file)
@@ -6,7 +6,7 @@ endforeach()
 
 foreach(x actor actor-autorestart actor-suspend
         activity-lifecycle
-        comm-get-sender comm-pt2pt comm-fault-scenarios wait-all-for wait-any-for
+        comm-get-sender comm-pt2pt comm-fault-scenarios
         cloud-interrupt-migration cloud-two-execs
        monkey-masterworkers monkey-semaphore
         concurrent_rw
@@ -39,7 +39,7 @@ set_property(TARGET activity-lifecycle APPEND PROPERTY INCLUDE_DIRECTORIES "${IN
 
 ## Add the tests.
 ## Some need to be run with all factories, some don't need tesh to run
-foreach(x actor actor-autorestart actor-suspend activity-lifecycle comm-get-sender wait-all-for wait-any-for
+foreach(x actor actor-autorestart actor-suspend activity-lifecycle comm-get-sender
         cloud-interrupt-migration cloud-two-execs concurrent_rw dag-incomplete-simulation dependencies io-set-bw io-stream
              vm-live-migration vm-suicide)
   set(tesh_files    ${tesh_files}    ${CMAKE_CURRENT_SOURCE_DIR}/${x}/${x}.tesh)
index f485fc8..f00b25b 100644 (file)
@@ -312,9 +312,9 @@ TEST_CASE("Activity lifecycle: comm activities")
     simgrid::s4u::ActorPtr receiver = simgrid::s4u::Actor::create("receiver", all_hosts[1], []() {
       assert_exit(true, 2);
       int* data;
-      simgrid::s4u::CommPtr comm                       = simgrid::s4u::Mailbox::by_name("mb")->get_async<int>(&data);
-      std::vector<simgrid::s4u::CommPtr> pending_comms = {comm};
-      REQUIRE_NETWORK_FAILURE(simgrid::s4u::Comm::wait_any(pending_comms));
+      simgrid::s4u::CommPtr comm = simgrid::s4u::Mailbox::by_name("mb")->get_async<int>(&data);
+      simgrid::s4u::ActivitySet pending_comms({comm});
+      REQUIRE_NETWORK_FAILURE(pending_comms.wait_any());
     });
 
     simgrid::s4u::ActorPtr sender = simgrid::s4u::Actor::create("sender", all_hosts[2], []() {
index 35867e0..855f56e 100644 (file)
@@ -60,19 +60,21 @@ template <int Duration, typename Activity> bool tester_wait_any(const Activity&
   const double timeout      = simgrid::s4u::Engine::get_clock() + duration;
   bool ret;
   try {
-    std::vector<Activity> activities = {activity};
+    simgrid::s4u::ActivitySet set;
+    set.push(activity);
+
     XBT_DEBUG("calling wait_any_for(%f)", duration);
-    ssize_t index = Activity::element_type::wait_any_for(activities, duration);
-    if (index == -1) {
-      XBT_DEBUG("wait_any_for() timed out");
-      INFO("wait_any_for() timeout should expire at expected date: " << timeout);
-      REQUIRE(simgrid::s4u::Engine::get_clock() == Approx(timeout));
-      ret = false;
-    } else {
-      XBT_DEBUG("wait_any_for() returned index %zd", index);
-      REQUIRE(index == 0);
-      ret = true;
-    }
+    auto waited_activity = set.wait_any_for(duration);
+
+    XBT_DEBUG("wait_any_for() returned activity %p", waited_activity.get());
+    REQUIRE(waited_activity.get() == activity);
+    ret = true;
+
+  } catch (const simgrid::TimeoutException& e) {
+    XBT_DEBUG("wait_any_for() timed out");
+    INFO("wait_any_for() timeout should expire at expected date: " << timeout);
+    REQUIRE(simgrid::s4u::Engine::get_clock() == Approx(timeout));
+    ret = false;
   } catch (const simgrid::Exception& e) {
     XBT_DEBUG("wait_any_for() threw an exception: %s", e.what());
     ret = true;
index 1723741..b996076 100644 (file)
@@ -409,8 +409,7 @@ int main(int argc, char* argv[])
   pr::Profile* profile_link = pr::ProfileBuilder::from_string("link_profile", ctx.link_profile.str(), 0);
   sg4::Link const* link =
       zone->create_link("link", LinkBandwidth)->set_latency(LinkLatency)->set_state_profile(profile_link)->seal();
-  zone->add_route(sender_host->get_netpoint(), receiver_host->get_netpoint(), nullptr, nullptr,
-                  {sg4::LinkInRoute{link}}, false);
+  zone->add_route(sender_host, receiver_host, {link});
   zone->seal();
 
   sg4::Host::on_onoff_cb([mbox](sg4::Host const& host) {
index 72bbbec..4a4fca3 100644 (file)
@@ -114,8 +114,8 @@ int main(int argc, char* argv[])
   for (int i = 1; i < cfg_host_count; i++) {
     auto hostname = "lilibeth " + std::to_string(i);
     auto* host    = rootzone->create_host(hostname, 1e9);
-    sg4::LinkInRoute link(rootzone->create_link(hostname, "1MBps")->set_latency("24us")->seal());
-    rootzone->add_route(master_host->get_netpoint(), host->get_netpoint(), nullptr, nullptr, {link}, true);
+    auto* link    = rootzone->create_link(hostname, "1MBps")->set_latency("24us")->seal();
+    rootzone->add_route(master_host, host, {link});
     worker_hosts.push_back(host);
   }
   rootzone->seal();
index 26944b7..863b392 100644 (file)
@@ -92,7 +92,7 @@ if __name__ == '__main__':
   for i in range(1, host_count):
     link = rootzone.create_split_duplex_link(f"link {i}", "1MBps").set_latency("24us")
     host = rootzone.create_host(f"lilibeth {i}", 1e9)
-    rootzone.add_route(main.netpoint, host.netpoint, None, None, [LinkInRoute(link, LinkInRoute.Direction.UP)], True)
+    rootzone.add_route(main, host, [link])
     Actor.create("worker", host, worker, i).set_auto_restart(True)
 
   e.netzone_root.seal()
index 828303a..6e73575 100644 (file)
@@ -116,8 +116,8 @@ int main(int argc, char** argv)
   auto* rootzone = sg4::create_full_zone("root");
   auto* paul     = rootzone->create_host("Paul", 1e9);
   auto* carol    = rootzone->create_host("Carol", 1e9);
-  sg4::LinkInRoute link(rootzone->create_link("link", "1MBps")->set_latency("24us")->seal());
-  rootzone->add_route(paul->get_netpoint(), carol->get_netpoint(), nullptr, nullptr, {link}, true);
+  auto* link     = rootzone->create_link("link", "1MBps")->set_latency("24us")->seal();
+  rootzone->add_route(paul, carol, {link});
 
   SharedBuffer buffer;
   sg4::Actor::create("producer", paul, producer, std::ref(buffer))->set_auto_restart();
index a3315e6..3b7d79a 100644 (file)
@@ -17,7 +17,7 @@ public:
   void operator()() const
   {
     /* Vector in which we store all ongoing communications */
-    std::vector<sg4::CommPtr> pending_comms;
+    sg4::ActivitySet pending_comms;
     /* Make a vector of the mailboxes to use */
     std::vector<sg4::Mailbox*> mboxes;
 
@@ -28,13 +28,13 @@ public:
       auto* mbox = sg4::Mailbox::by_name(host->get_name());
       mboxes.push_back(mbox);
       sg4::CommPtr comm = mbox->put_async(payload, msg_size);
-      pending_comms.push_back(comm);
+      pending_comms.push(comm);
     }
 
     XBT_INFO("Done dispatching all messages");
 
     /* Now that all message exchanges were initiated, wait for their completion in one single call */
-    sg4::Comm::wait_all(pending_comms);
+    pending_comms.wait_all();
 
     XBT_INFO("Goodbye now!");
   }
@@ -68,7 +68,7 @@ static sg4::NetZone* create_zone(const sg4::NetZone* root, const std::string& id
     auto* host           = zone->create_host(hostname, 1e9);
     host->create_disk("disk-" + hostname, 1e9, 1e6);
     const auto* link = zone->create_link("link-" + hostname, 1e9);
-    zone->add_route(host->get_netpoint(), router, nullptr, nullptr, {sg4::LinkInRoute(link)});
+    zone->add_route(host->get_netpoint(), router, nullptr, nullptr, {link});
   }
   return zone;
 }
diff --git a/teshsuite/s4u/wait-all-for/wait-all-for.cpp b/teshsuite/s4u/wait-all-for/wait-all-for.cpp
deleted file mode 100644 (file)
index 4f0c3d0..0000000
+++ /dev/null
@@ -1,53 +0,0 @@
-/* Copyright (c) 2019-2023. The SimGrid Team. All rights reserved.          */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include <cstdlib>
-#include <iostream>
-#include <simgrid/s4u.hpp>
-#include <string>
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(meh, "meh");
-
-static void worker()
-{
-  auto* mbox = simgrid::s4u::Mailbox::by_name("meh");
-  int input1 = 42;
-  int input2 = 51;
-
-  XBT_INFO("Sending and receiving %d and %d asynchronously", input1, input2);
-
-  auto put1 = mbox->put_async(&input1, 1000 * 1000 * 500);
-  auto put2 = mbox->put_async(&input2, 1000 * 1000 * 1000);
-
-  int* out1;
-  auto get1 = mbox->get_async<int>(&out1);
-
-  int* out2;
-  auto get2 = mbox->get_async<int>(&out2);
-
-  XBT_INFO("All comms have started");
-  std::vector<simgrid::s4u::CommPtr> comms = {put1, put2, get1, get2};
-
-  while (not comms.empty()) {
-    size_t index = simgrid::s4u::Comm::wait_all_for(comms, 0.5);
-    if (index < comms.size())
-      XBT_INFO("wait_all_for: Timeout reached");
-    XBT_INFO("wait_all_for: %zu comms finished (#comms=%zu)", index, comms.size());
-    comms.erase(comms.begin(), comms.begin() + index);
-  }
-
-  XBT_INFO("All comms have finished");
-  XBT_INFO("Got %d and %d", *out1, *out2);
-}
-
-int main(int argc, char* argv[])
-
-{
-  simgrid::s4u::Engine e(&argc, argv);
-  e.load_platform(argv[1]);
-  simgrid::s4u::Actor::create("worker", e.host_by_name("Tremblay"), worker);
-  e.run();
-  return 0;
-}
diff --git a/teshsuite/s4u/wait-all-for/wait-all-for.tesh b/teshsuite/s4u/wait-all-for/wait-all-for.tesh
deleted file mode 100644 (file)
index c7b30a9..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-#!/usr/bin/env tesh
-
-p Testing the wait_all_for feature of S4U
-
-! output sort 19
-$ ${bindir:=.}/wait-all-for ${platfdir:=.}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
-> [  0.000000] (1:worker@Tremblay) Sending and receiving 42 and 51 asynchronously
-> [  0.000000] (1:worker@Tremblay) All comms have started
-> [  0.500000] (1:worker@Tremblay) wait_all_for: Timeout reached
-> [  0.500000] (1:worker@Tremblay) wait_all_for: 0 comms finished (#comms=4)
-> [  1.000000] (1:worker@Tremblay) wait_all_for: Timeout reached
-> [  1.000000] (1:worker@Tremblay) wait_all_for: 0 comms finished (#comms=4)
-> [  1.500000] (1:worker@Tremblay) wait_all_for: Timeout reached
-> [  1.500000] (1:worker@Tremblay) wait_all_for: 1 comms finished (#comms=4)
-> [  2.000000] (1:worker@Tremblay) wait_all_for: Timeout reached
-> [  2.000000] (1:worker@Tremblay) wait_all_for: 0 comms finished (#comms=3)
-> [  2.070331] (1:worker@Tremblay) wait_all_for: 3 comms finished (#comms=3)
-> [  2.070331] (1:worker@Tremblay) All comms have finished
-> [  2.070331] (1:worker@Tremblay) Got 42 and 51
diff --git a/teshsuite/s4u/wait-any-for/wait-any-for.cpp b/teshsuite/s4u/wait-any-for/wait-any-for.cpp
deleted file mode 100644 (file)
index 8eae452..0000000
+++ /dev/null
@@ -1,55 +0,0 @@
-/* Copyright (c) 2019-2023. The SimGrid Team. All rights reserved.          */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include <cstdlib>
-#include <iostream>
-#include <string>
-#include <simgrid/s4u.hpp>
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(meh, "meh");
-
-static void worker()
-{
-  auto* mbox = simgrid::s4u::Mailbox::by_name("meh");
-  int input1 = 42;
-  int input2 = 51;
-
-  XBT_INFO("Sending and receiving %d and %d asynchronously", input1, input2);
-
-  auto put1 = mbox->put_async(&input1, 1000 * 1000 * 500);
-  auto put2 = mbox->put_async(&input2, 1000 * 1000 * 1000);
-
-  int* out1;
-  auto get1 = mbox->get_async<int>(&out1);
-
-  int* out2;
-  auto get2 = mbox->get_async<int>(&out2);
-
-  XBT_INFO("All comms have started");
-  std::vector<simgrid::s4u::CommPtr> comms = {put1, put2, get1, get2};
-
-  while (not comms.empty()) {
-    ssize_t index = simgrid::s4u::Comm::wait_any_for(comms, 0.5);
-    if (index < 0)
-      XBT_INFO("wait_any_for: Timeout reached");
-    else {
-      XBT_INFO("wait_any_for: A comm finished (index=%zd, #comms=%zu)", index, comms.size());
-      comms.erase(comms.begin() + index);
-    }
-  }
-
-  XBT_INFO("All comms have finished");
-  XBT_INFO("Got %d and %d", *out1, *out2);
-}
-
-int main(int argc, char* argv[])
-
-{
-  simgrid::s4u::Engine e(&argc, argv);
-  e.load_platform(argv[1]);
-  simgrid::s4u::Actor::create("worker", e.host_by_name("Tremblay"), worker);
-  e.run();
-  return 0;
-}
diff --git a/teshsuite/s4u/wait-any-for/wait-any-for.tesh b/teshsuite/s4u/wait-any-for/wait-any-for.tesh
deleted file mode 100644 (file)
index 87ae347..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-#!/usr/bin/env tesh
-
-p Testing the wait_any_for feature of S4U
-
-! output sort 19
-$ ${bindir:=.}/wait-any-for ${platfdir:=.}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
-> [  0.000000] (1:worker@Tremblay) Sending and receiving 42 and 51 asynchronously
-> [  0.000000] (1:worker@Tremblay) All comms have started
-> [  0.500000] (1:worker@Tremblay) wait_any_for: Timeout reached
-> [  1.000000] (1:worker@Tremblay) wait_any_for: Timeout reached
-> [  1.035263] (1:worker@Tremblay) wait_any_for: A comm finished (index=0, #comms=4)
-> [  1.035263] (1:worker@Tremblay) wait_any_for: A comm finished (index=1, #comms=3)
-> [  1.535263] (1:worker@Tremblay) wait_any_for: Timeout reached
-> [  2.035263] (1:worker@Tremblay) wait_any_for: Timeout reached
-> [  2.070331] (1:worker@Tremblay) wait_any_for: A comm finished (index=0, #comms=2)
-> [  2.070331] (1:worker@Tremblay) wait_any_for: A comm finished (index=0, #comms=1)
-> [  2.070331] (1:worker@Tremblay) All comms have finished
-> [  2.070331] (1:worker@Tremblay) Got 42 and 51
index 3a03fc2..8215478 100644 (file)
@@ -445,6 +445,7 @@ set(PLUGINS_SRC
   src/plugins/host_dvfs.cpp
   src/plugins/host_energy.cpp
   src/plugins/host_load.cpp
+  src/plugins/jbod.cpp
   src/plugins/link_energy.cpp
   src/plugins/link_energy_wifi.cpp
   src/plugins/link_load.cpp
@@ -648,6 +649,7 @@ set(MC_SIMGRID_MC_SRC  src/mc/explo/simgrid_mc.cpp)
 
 set(headers_to_install
   include/simgrid/actor.h
+  include/simgrid/activity_set.h
   include/simgrid/barrier.h
   include/simgrid/comm.h
   include/simgrid/engine.h
@@ -658,6 +660,7 @@ set(headers_to_install
   include/simgrid/plugins/dvfs.h
   include/simgrid/plugins/energy.h
   include/simgrid/plugins/file_system.h
+  include/simgrid/plugins/jbod.hpp
   include/simgrid/plugins/live_migration.h
   include/simgrid/plugins/load.h
   include/simgrid/plugins/photovoltaic.hpp
index 9c8fc76..a9d9d64 100755 (executable)
@@ -2,6 +2,13 @@
 
 # This script is used by various build projects on Jenkins
 
+case "$JENKINS_HOME" in
+*-qualif)
+  echo "Build skipped on $JENKINS_HOME."
+  exit 0
+  ;;
+esac
+
 # See https://ci.inria.fr/simgrid/job/SimGrid/configure
 # See https://ci.inria.fr/simgrid/job/Simgrid-Windows/configure