fi
fi
-target=tests
+target=examples
ncores=$(grep -c processor /proc/cpuinfo)
install_path=$(sed -n 's/^CMAKE_INSTALL_PREFIX:PATH=//p' CMakeCache.txt)
add_dependencies(tests tests-mc)
add_custom_target(tests-ns3 COMMENT "Recompiling the ns3 tests and tools.")
add_dependencies(tests tests-ns3)
+add_custom_target(examples COMMENT "Recompiling all examples")
+add_dependencies(examples tests)
### Build some Maintainer files
include(${CMAKE_HOME_DIRECTORY}/tools/cmake/MaintainerMode.cmake)
S4U:
- New class ActivitySet to ease wait_any()/test_any()/wait_all()
+ - Deprecate {Comm,Io,Exec}::{wait_any,wait_all,test_any} and friends
+ - New function NetZone::add_route(host1, host2, links) when you don't need gateways
+ Also add a variant with s4u::Link, when you don't want to specify the directions
+ on symmetric routes.
+ - Introduce a Mailbox::get_async() with no payload parameter. You can use the new
+ Comm::get_payload() once the communication is over to retrieve the payload.
+
+SMPI:
+ - New SMPI_app_instance_join(): wait for the completion of a started MPI instance
+ - MPI_UNIVERSE_SIZE now initialized to the total amount of hosts in the platform
Python:
- Make the host_load plugin available from Python. See examples/python/plugin-host-load
+ - Mailbox::get_async() does not return a pair anymore. Use comm.get_payload() instead.
+ - Comm::waitall/waitany/testany() are gone. Please use ActivitySet() instead.
+ - Comm::waitallfor() is gone too. Its semantic was unclear on timeout anyway.
+ - Io::waitany() and waitanyfor() are gone. Please use ActivitySet() instead.
+
+C API:
+ - Introduce sg_activity_set_t and deprecate wait_all/wait_any/test_any for
+ Exec, Io and Comm.
----------------------------------------------------------------------------
# This file lists the content of the python source package
# Prepared in tools/cmake/Distrib.cmake
+include examples/c/activityset-testany/activityset-testany.c
+include examples/c/activityset-testany/activityset-testany.tesh
+include examples/c/activityset-waitall/activityset-waitall.c
+include examples/c/activityset-waitall/activityset-waitall.tesh
+include examples/c/activityset-waitallfor/activityset-waitallfor.c
+include examples/c/activityset-waitallfor/activityset-waitallfor.tesh
+include examples/c/activityset-waitany/activityset-waitany.c
+include examples/c/activityset-waitany/activityset-waitany.tesh
include examples/c/actor-create/actor-create.c
include examples/c/actor-create/actor-create.tesh
include examples/c/actor-create/actor-create_d.xml
include examples/c/comm-wait/comm-wait3_d.xml
include examples/c/comm-wait/comm-wait4_d.xml
include examples/c/comm-wait/comm-wait_d.xml
-include examples/c/comm-waitall/comm-waitall.c
-include examples/c/comm-waitall/comm-waitall.tesh
-include examples/c/comm-waitall/comm-waitall_d.xml
-include examples/c/comm-waitany/comm-waitany.c
-include examples/c/comm-waitany/comm-waitany.tesh
-include examples/c/comm-waitany/comm-waitany_d.xml
include examples/c/dht-kademlia/answer.c
include examples/c/dht-kademlia/answer.h
include examples/c/dht-kademlia/common.h
include examples/c/exec-dvfs/exec-dvfs.tesh
include examples/c/exec-remote/exec-remote.c
include examples/c/exec-remote/exec-remote.tesh
-include examples/c/exec-waitany/exec-waitany.c
-include examples/c/exec-waitany/exec-waitany.tesh
include examples/c/io-disk-raw/io-disk-raw.c
include examples/c/io-disk-raw/io-disk-raw.tesh
include examples/c/io-file-remote/io-file-remote.c
include examples/cpp/platform-properties/s4u-platform-properties.tesh
include examples/cpp/plugin-host-load/s4u-plugin-host-load.cpp
include examples/cpp/plugin-host-load/s4u-plugin-host-load.tesh
+include examples/cpp/plugin-jbod/s4u-plugin-jbod.cpp
+include examples/cpp/plugin-jbod/s4u-plugin-jbod.tesh
include examples/cpp/plugin-link-load/s4u-plugin-link-load.cpp
include examples/cpp/plugin-link-load/s4u-plugin-link-load.tesh
include examples/cpp/plugin-prodcons/s4u-plugin-prodcons.cpp
include examples/cpp/trace-process-migration/s4u-trace-process-migration.tesh
include examples/cpp/trace-route-user-variables/s4u-trace-route-user-variables.cpp
include examples/cpp/trace-route-user-variables/s4u-trace-route-user-variables.tesh
+include examples/python/activityset-testany/activityset-testany.py
+include examples/python/activityset-testany/activityset-testany.tesh
+include examples/python/activityset-waitall/activityset-waitall.py
+include examples/python/activityset-waitall/activityset-waitall.tesh
+include examples/python/activityset-waitallfor/activityset-waitallfor.py
+include examples/python/activityset-waitallfor/activityset-waitallfor.tesh
+include examples/python/activityset-waitany/activityset-waitany.py
+include examples/python/activityset-waitany/activityset-waitany.tesh
include examples/python/actor-create/actor-create.py
include examples/python/actor-create/actor-create.tesh
include examples/python/actor-daemon/actor-daemon.py
include examples/python/comm-ready/comm-ready.tesh
include examples/python/comm-suspend/comm-suspend.py
include examples/python/comm-suspend/comm-suspend.tesh
-include examples/python/comm-testany/comm-testany.py
-include examples/python/comm-testany/comm-testany.tesh
include examples/python/comm-throttling/comm-throttling.py
include examples/python/comm-throttling/comm-throttling.tesh
include examples/python/comm-wait/comm-wait.py
include examples/python/comm-wait/comm-wait.tesh
-include examples/python/comm-waitall/comm-waitall.py
-include examples/python/comm-waitall/comm-waitall.tesh
-include examples/python/comm-waitallfor/comm-waitallfor.py
-include examples/python/comm-waitallfor/comm-waitallfor.tesh
-include examples/python/comm-waitany/comm-waitany.py
-include examples/python/comm-waitany/comm-waitany.tesh
include examples/python/comm-waituntil/comm-waituntil.py
include examples/python/comm-waituntil/comm-waituntil.tesh
include examples/python/exec-async/exec-async.py
include teshsuite/s4u/vm-live-migration/vm-live-migration.tesh
include teshsuite/s4u/vm-suicide/vm-suicide.cpp
include teshsuite/s4u/vm-suicide/vm-suicide.tesh
-include teshsuite/s4u/wait-all-for/wait-all-for.cpp
-include teshsuite/s4u/wait-all-for/wait-all-for.tesh
-include teshsuite/s4u/wait-any-for/wait-any-for.cpp
-include teshsuite/s4u/wait-any-for/wait-any-for.tesh
include teshsuite/smpi/MBI/CollArgGenerator.py
include teshsuite/smpi/MBI/CollComGenerator.py
include teshsuite/smpi/MBI/CollLocalConcurrencyGenerator.py
include examples/smpi/smpi_s4u_masterworker/CMakeLists.txt
include examples/sthread/CMakeLists.txt
include include/simgrid/Exception.hpp
+include include/simgrid/activity_set.h
include include/simgrid/actor.h
include include/simgrid/barrier.h
include include/simgrid/chrono.hpp
include include/simgrid/plugins/dvfs.h
include include/simgrid/plugins/energy.h
include include/simgrid/plugins/file_system.h
+include include/simgrid/plugins/jbod.hpp
include include/simgrid/plugins/live_migration.h
include include/simgrid/plugins/load.h
include include/simgrid/plugins/ns3.hpp
include src/plugins/host_dvfs.cpp
include src/plugins/host_energy.cpp
include src/plugins/host_load.cpp
+include src/plugins/jbod.cpp
include src/plugins/link_energy.cpp
include src/plugins/link_energy_wifi.cpp
include src/plugins/link_load.cpp
if some do not work for you.
- **make**: Build the core of SimGrid that gets installed, but not any example.
-- **make tests**: Build the tests and examples.
+- **make examples**: Build the examples, which are needed by the tests.
- **make simgrid**: Build only the SimGrid library. Not any example nor the helper tools.
- **make s4u-comm-pingpong**: Build only this example (works for any example)
- **make python-bindings**: Build the Python bindings
communications are not slowed down, because there is no contention yet).
As an alternative to the above LMM-based models, it is possible to use the :ref:`ns-3 simulator as a network model <models_ns3>`. ns-3 performs
-a mushc more detailed, packet-level simulation
+a much more detailed, packet-level simulation
than the above models. As a result is is much slower but will produce more accurate results.
Both simulators have time complexity that is linear in the size of their input, but ns-3 has a much larger input in case of large communications
because it considers individual network packets.
=======
This routing model is particularly well adapted to Peer-to-Peer and Clouds platforms: each component is connected to the
-cloud through a private link of which the upload and download rate may be asymmetric.
+cloud through a private link whose upload and download rates may be asymmetric.
-The network core (between the private links) is assumed to be over-sized so only the latency is taken into account.
-Instead of a matrix of latencies that would become too large when the amount of peers grows, Vivaldi netzones give a
-coordinate to each peer and compute the latency between host A=(xA,yA,zA) and host B=(xB,yB,zB) as follows:
+The network core (between the private links) is assumed to be over-provisioned so that only the latency has to be
+taken into account. Instead of a matrix of latencies that would become too large when the amount of peers grows,
+Vivaldi netzones give a coordinate to each peer and compute the latency between host A=(xA,yA,zA) and host B=(xB,yB,zB)
+as follows:
latency = sqrt( (xA-xB)² + (yA-yB)² ) + zA + zB
foreach(x
actor-create actor-daemon actor-exiting actor-join actor-kill actor-lifetime actor-migrate actor-stacksize
actor-suspend actor-yield
+ activityset-testany activityset-waitall activityset-waitallfor activityset-waitany
app-masterworker app-token-ring
- comm-pingpong comm-wait comm-waitall comm-waitany
+ comm-pingpong comm-wait
cloud-capping cloud-masterworker cloud-migration cloud-simple
dht-pastry
- exec-async exec-basic exec-dvfs exec-remote exec-waitany
+ exec-async exec-basic exec-dvfs exec-remote
energy-exec energy-exec-ptask energy-vm
io-disk-raw io-file-remote io-file-system
platform-failures platform-properties
${CMAKE_CURRENT_SOURCE_DIR}/comm-wait/comm-wait2_d.xml
${CMAKE_CURRENT_SOURCE_DIR}/comm-wait/comm-wait3_d.xml
${CMAKE_CURRENT_SOURCE_DIR}/comm-wait/comm-wait4_d.xml
- ${CMAKE_CURRENT_SOURCE_DIR}/comm-waitall/comm-waitall_d.xml
- ${CMAKE_CURRENT_SOURCE_DIR}/comm-waitany/comm-waitany_d.xml
${CMAKE_CURRENT_SOURCE_DIR}/dht-kademlia/dht-kademlia_d.xml
${CMAKE_CURRENT_SOURCE_DIR}/dht-pastry/dht-pastry_d.xml
${CMAKE_CURRENT_SOURCE_DIR}/io-file-remote/io-file-remote_d.xml
foreach(x
actor-create actor-daemon actor-exiting actor-join actor-kill actor-lifetime actor-migrate actor-stacksize
actor-suspend actor-yield
+ activityset-testany activityset-waitall activityset-waitallfor activityset-waitany
app-bittorrent app-chainsend app-masterworker app-token-ring
- comm-pingpong comm-wait comm-waitall comm-waitany
+ comm-pingpong comm-wait
cloud-capping cloud-masterworker cloud-migration cloud-simple
dht-kademlia dht-pastry
- exec-async exec-basic exec-dvfs exec-remote exec-waitany
+ exec-async exec-basic exec-dvfs exec-remote
energy-exec energy-exec-ptask energy-vm
io-disk-raw io-file-remote io-file-system
platform-failures platform-properties
--- /dev/null
+/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/activity_set.h"
+#include "simgrid/actor.h"
+#include "simgrid/comm.h"
+#include "simgrid/engine.h"
+#include "simgrid/exec.h"
+#include "simgrid/host.h"
+#include "simgrid/mailbox.h"
+
+#include "xbt/log.h"
+#include "xbt/sysdep.h"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_testany, "Messages specific for this s4u example");
+
+static void bob(int argc, char* argv[])
+{
+ XBT_INFO("Create my asynchronous activities");
+ sg_exec_t exec = sg_actor_exec_init(5e9);
+ sg_exec_start(exec);
+
+ sg_mailbox_t mbox = sg_mailbox_by_name("mbox");
+ void* payload = NULL;
+ sg_comm_t comm = sg_mailbox_get_async(mbox, &payload);
+
+ sg_activity_set_t pending_activities = sg_activity_set_init();
+ sg_activity_set_push(pending_activities, (sg_activity_t)exec);
+ sg_activity_set_push(pending_activities, (sg_activity_t)comm);
+
+ XBT_INFO("Sleep_for a while");
+ sg_actor_sleep_for(1);
+
+ XBT_INFO("Test for completed activities");
+ while (!sg_activity_set_empty(pending_activities)) {
+ sg_activity_t completed_one = sg_activity_set_test_any(pending_activities);
+ if (completed_one != NULL) {
+ if (sg_comm_isinstance(completed_one))
+ XBT_INFO("Completed a Comm");
+ if (sg_exec_isinstance(completed_one))
+ XBT_INFO("Completed an Exec");
+ } else {
+ XBT_INFO("Nothing matches, test again in 0.5s");
+ sg_actor_sleep_for(.5);
+ }
+ }
+ XBT_INFO("Last activity is complete");
+ free(payload);
+}
+
+static void alice(int argc, char* argv[])
+{
+ char* payload = xbt_strdup("Message");
+ XBT_INFO("Send '%s'", payload);
+ sg_mailbox_put(sg_mailbox_by_name("mbox"), payload, 6e8);
+}
+
+int main(int argc, char* argv[])
+{
+ simgrid_init(&argc, argv);
+ xbt_assert(argc > 1,
+ "Usage: %s platform_file\n"
+ "\tExample: %s hosts_with_disks.xml\n",
+ argv[0], argv[0]);
+
+ simgrid_load_platform(argv[1]);
+
+ sg_actor_create("alice", sg_host_by_name("alice"), alice, 0, NULL);
+ sg_actor_create("bob", sg_host_by_name("bob"), bob, 0, NULL);
+
+ simgrid_run();
+
+ return 0;
+}
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/c-activityset-testany ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%4.2r]%e[%5a]%e%m%n"
+> [0.00] [alice] Send 'Message'
+> [0.00] [ bob] Create my asynchronous activities
+> [0.00] [ bob] Sleep_for a while
+> [1.00] [ bob] Test for completed activities
+> [1.00] [ bob] Nothing matches, test again in 0.5s
+> [1.50] [ bob] Nothing matches, test again in 0.5s
+> [2.00] [ bob] Nothing matches, test again in 0.5s
+> [2.50] [ bob] Nothing matches, test again in 0.5s
+> [3.00] [ bob] Nothing matches, test again in 0.5s
+> [3.50] [ bob] Nothing matches, test again in 0.5s
+> [4.00] [ bob] Nothing matches, test again in 0.5s
+> [4.50] [ bob] Nothing matches, test again in 0.5s
+> [5.00] [ bob] Completed an Exec
+> [5.00] [ bob] Nothing matches, test again in 0.5s
+> [5.50] [ bob] Completed a Comm
+> [5.50] [ bob] Last activity is complete
--- /dev/null
+/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/activity_set.h"
+#include "simgrid/actor.h"
+#include "simgrid/comm.h"
+#include "simgrid/engine.h"
+#include "simgrid/exec.h"
+#include "simgrid/host.h"
+#include "simgrid/mailbox.h"
+
+#include "xbt/log.h"
+#include "xbt/sysdep.h"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_waitall, "Messages specific for this s4u example");
+
+static void bob()
+{
+ XBT_INFO("Create my asynchronous activities");
+ sg_exec_t exec = sg_actor_exec_init(5e9);
+ sg_exec_start(exec);
+
+ sg_mailbox_t mbox = sg_mailbox_by_name("mbox");
+ void* payload = NULL;
+ sg_comm_t comm = sg_mailbox_get_async(mbox, &payload);
+
+ sg_activity_set_t pending_activities = sg_activity_set_init();
+ sg_activity_set_push(pending_activities, (sg_activity_t)exec);
+ sg_activity_set_push(pending_activities, (sg_activity_t)comm);
+
+ XBT_INFO("Wait for asynchronous activities to complete, all in one shot.");
+ sg_activity_set_wait_all(pending_activities);
+
+ XBT_INFO("All activities are completed.");
+ free(payload);
+}
+
+static void alice()
+{
+ char* payload = xbt_strdup("Message");
+ XBT_INFO("Send '%s'", payload);
+ sg_mailbox_put(sg_mailbox_by_name("mbox"), payload, 6e8);
+}
+
+int main(int argc, char* argv[])
+{
+ simgrid_init(&argc, argv);
+ xbt_assert(argc > 1,
+ "Usage: %s platform_file\n"
+ "\tExample: %s hosts_with_disks.xml\n",
+ argv[0], argv[0]);
+
+ simgrid_load_platform(argv[1]);
+
+ sg_actor_create("alice", sg_host_by_name("alice"), alice, 0, NULL);
+ sg_actor_create("bob", sg_host_by_name("bob"), bob, 0, NULL);
+
+ simgrid_run();
+
+ return 0;
+}
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/c-activityset-waitall ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n"
+> [0.000000] [alice] Send 'Message'
+> [0.000000] [ bob] Create my asynchronous activities
+> [0.000000] [ bob] Wait for asynchronous activities to complete, all in one shot.
+> [5.197828] [ bob] All activities are completed.
--- /dev/null
+/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/activity_set.h"
+#include "simgrid/actor.h"
+#include "simgrid/comm.h"
+#include "simgrid/engine.h"
+#include "simgrid/exec.h"
+#include "simgrid/host.h"
+#include "simgrid/mailbox.h"
+
+#include "xbt/log.h"
+#include "xbt/sysdep.h"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_waitallfor, "Messages specific for this s4u example");
+
+static void bob()
+{
+ XBT_INFO("Create my asynchronous activities");
+ sg_exec_t exec = sg_actor_exec_init(5e9);
+ sg_exec_start(exec);
+
+ sg_mailbox_t mbox = sg_mailbox_by_name("mbox");
+ void* payload = NULL;
+ sg_comm_t comm = sg_mailbox_get_async(mbox, &payload);
+
+ sg_activity_set_t pending_activities = sg_activity_set_init();
+ sg_activity_set_push(pending_activities, (sg_activity_t)exec);
+ sg_activity_set_push(pending_activities, (sg_activity_t)comm);
+
+ XBT_INFO("Wait for asynchronous activities to complete");
+ while (!sg_activity_set_empty(pending_activities)) {
+ if (!sg_activity_set_wait_all_for(pending_activities, 1)) {
+ XBT_INFO("Not all activities are terminated yet.");
+ }
+
+ sg_activity_t completed_one = sg_activity_set_test_any(pending_activities);
+ while (completed_one != NULL) {
+ if (sg_comm_isinstance(completed_one))
+ XBT_INFO("Completed a Comm");
+ if (sg_exec_isinstance(completed_one))
+ XBT_INFO("Completed an Exec");
+ completed_one = sg_activity_set_test_any(pending_activities);
+ }
+ }
+
+ XBT_INFO("Last activity is complete");
+ free(payload);
+}
+
+static void alice()
+{
+ char* payload = xbt_strdup("Message");
+ XBT_INFO("Send '%s'", payload);
+ sg_mailbox_put(sg_mailbox_by_name("mbox"), payload, 6e8);
+}
+
+int main(int argc, char* argv[])
+{
+ simgrid_init(&argc, argv);
+ xbt_assert(argc > 1,
+ "Usage: %s platform_file\n"
+ "\tExample: %s hosts_with_disks.xml\n",
+ argv[0], argv[0]);
+
+ simgrid_load_platform(argv[1]);
+
+ sg_actor_create("alice", sg_host_by_name("alice"), alice, 0, NULL);
+ sg_actor_create("bob", sg_host_by_name("bob"), bob, 0, NULL);
+
+ simgrid_run();
+
+ return 0;
+}
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/c-activityset-waitallfor ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n"
+> [0.000000] [alice] Send 'Message'
+> [0.000000] [ bob] Create my asynchronous activities
+> [0.000000] [ bob] Wait for asynchronous activities to complete
+> [1.000000] [ bob] Not all activities are terminated yet.
+> [2.000000] [ bob] Not all activities are terminated yet.
+> [3.000000] [ bob] Not all activities are terminated yet.
+> [4.000000] [ bob] Not all activities are terminated yet.
+> [5.000000] [ bob] Not all activities are terminated yet.
+> [5.000000] [ bob] Completed an Exec
+> [5.197828] [ bob] Completed a Comm
+> [5.197828] [ bob] Last activity is complete
--- /dev/null
+/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/activity_set.h"
+#include "simgrid/actor.h"
+#include "simgrid/comm.h"
+#include "simgrid/engine.h"
+#include "simgrid/exec.h"
+#include "simgrid/host.h"
+#include "simgrid/mailbox.h"
+
+#include "xbt/log.h"
+#include "xbt/sysdep.h"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_waittany, "Messages specific for this s4u example");
+
+static void bob()
+{
+ XBT_INFO("Create my asynchronous activities");
+ sg_exec_t exec = sg_actor_exec_init(5e9);
+ sg_exec_start(exec);
+
+ sg_mailbox_t mbox = sg_mailbox_by_name("mbox");
+ void* payload = NULL;
+ sg_comm_t comm = sg_mailbox_get_async(mbox, &payload);
+
+ sg_activity_set_t pending_activities = sg_activity_set_init();
+ sg_activity_set_push(pending_activities, (sg_activity_t)exec);
+ sg_activity_set_push(pending_activities, (sg_activity_t)comm);
+
+ XBT_INFO("Wait for asynchronous activities to complete");
+ while (!sg_activity_set_empty(pending_activities)) {
+
+ sg_activity_t completed_one = sg_activity_set_wait_any(pending_activities);
+ if (sg_comm_isinstance(completed_one))
+ XBT_INFO("Completed a Comm");
+ else if (sg_exec_isinstance(completed_one))
+ XBT_INFO("Completed an Exec");
+ else
+ xbt_die("This activity set is supposed to only contain Comm or Exec");
+ }
+ XBT_INFO("Last activity is complete");
+ free(payload);
+}
+
+static void alice()
+{
+ char* payload = xbt_strdup("Message");
+ XBT_INFO("Send '%s'", payload);
+ sg_mailbox_put(sg_mailbox_by_name("mbox"), payload, 6e8);
+}
+
+int main(int argc, char* argv[])
+{
+ simgrid_init(&argc, argv);
+ xbt_assert(argc > 1,
+ "Usage: %s platform_file\n"
+ "\tExample: %s hosts_with_disks.xml\n",
+ argv[0], argv[0]);
+
+ simgrid_load_platform(argv[1]);
+
+ sg_actor_create("alice", sg_host_by_name("alice"), alice, 0, NULL);
+ sg_actor_create("bob", sg_host_by_name("bob"), bob, 0, NULL);
+
+ simgrid_run();
+
+ return 0;
+}
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/c-activityset-waitany ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n"
+> [0.000000] [alice] Send 'Message'
+> [0.000000] [ bob] Create my asynchronous activities
+> [0.000000] [ bob] Wait for asynchronous activities to complete
+> [5.000000] [ bob] Completed an Exec
+> [5.197828] [ bob] Completed a Comm
+> [5.197828] [ bob] Last activity is complete
static void broadcaster_send_file(const_broadcaster_t bc)
{
- int nb_pending_sends = 0;
-
for (unsigned int current_piece = 0; current_piece < bc->piece_count; current_piece++) {
XBT_DEBUG("Sending (send) piece %u from %s into mailbox %s", current_piece, sg_host_self_get_name(),
sg_mailbox_get_name(bc->first));
char* file_piece = bprintf("piece-%u", current_piece);
sg_comm_t comm = sg_mailbox_put_async(bc->first, file_piece, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
- bc->pending_sends[nb_pending_sends++] = comm;
+ sg_activity_set_push(bc->pending_sends, (sg_activity_t)comm);
}
- sg_comm_wait_all(bc->pending_sends, nb_pending_sends);
+ sg_activity_set_wait_all(bc->pending_sends);
}
static broadcaster_t broadcaster_init(sg_mailbox_t* mailboxes, unsigned int host_count, unsigned int piece_count)
bc->host_count = host_count;
bc->piece_count = piece_count;
bc->mailboxes = mailboxes;
- bc->pending_sends = xbt_malloc(sizeof(sg_comm_t) * MAX_PENDING_COMMS);
+ bc->pending_sends = sg_activity_set_init();
broadcaster_build_chain(bc);
static void broadcaster_destroy(broadcaster_t bc)
{
- xbt_free(bc->pending_sends);
+ sg_activity_set_delete(bc->pending_sends);
xbt_free(bc->mailboxes);
xbt_free(bc);
}
#ifndef CHAINSEND_H
#define CHAINSEND_H
+#include "simgrid/activity_set.h"
#include "simgrid/actor.h"
#include "simgrid/comm.h"
#include "simgrid/engine.h"
unsigned int piece_count;
sg_mailbox_t first;
sg_mailbox_t* mailboxes;
- sg_comm_t* pending_sends;
+ sg_activity_set_t pending_sends;
} s_broadcaster_t;
typedef s_broadcaster_t* broadcaster_t;
unsigned long long received_bytes;
unsigned int received_pieces;
unsigned int total_pieces;
- sg_comm_t* pending_recvs;
- sg_comm_t* pending_sends;
+ sg_activity_set_t pending_recvs;
+ sg_activity_set_t pending_sends;
} s_peer_t;
typedef s_peer_t* peer_t;
-/* Copyright (c) 2012-2023. The SimGrid Team.
- * All rights reserved. */
+/* Copyright (c) 2012-2023. The SimGrid Team. All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
static void peer_forward_file(peer_t p)
{
void* received;
- int done = 0;
- size_t nb_pending_sends = 0;
- size_t nb_pending_recvs = 0;
+ int done = 0;
while (!done) {
- p->pending_recvs[nb_pending_recvs] = sg_mailbox_get_async(p->me, &received);
- nb_pending_recvs++;
+ sg_activity_set_push(p->pending_recvs, (sg_activity_t)sg_mailbox_get_async(p->me, &received));
- ssize_t idx = sg_comm_wait_any(p->pending_recvs, nb_pending_recvs);
- if (idx != -1) {
+ sg_activity_t acti = sg_activity_set_wait_any(p->pending_recvs);
+ if (acti != NULL) {
+ sg_comm_unref((sg_comm_t)acti);
XBT_DEBUG("Peer %s got a 'SEND_DATA' message", sg_mailbox_get_name(p->me));
- /* move the last pending comm where the finished one was, and decrement */
- p->pending_recvs[idx] = p->pending_recvs[--nb_pending_recvs];
if (p->next != NULL) {
XBT_DEBUG("Sending %s (asynchronously) from %s to %s", (char*)received, sg_mailbox_get_name(p->me),
sg_mailbox_get_name(p->next));
sg_comm_t send = sg_mailbox_put_async(p->next, received, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
- p->pending_sends[nb_pending_sends] = send;
- nb_pending_sends++;
+ sg_activity_set_push(p->pending_sends, (sg_activity_t)send);
} else
free(received);
}
}
}
- sg_comm_wait_all(p->pending_sends, nb_pending_sends);
+ sg_activity_set_wait_all(p->pending_sends);
}
static peer_t peer_init(int argc, char* argv[])
p->next = NULL;
p->received_pieces = 0;
p->received_bytes = 0;
- p->pending_recvs = xbt_malloc(sizeof(sg_comm_t) * MAX_PENDING_COMMS);
- p->pending_sends = xbt_malloc(sizeof(sg_comm_t) * MAX_PENDING_COMMS);
+ p->pending_recvs = sg_activity_set_init();
+ p->pending_sends = sg_activity_set_init();
p->me = sg_mailbox_by_name(sg_host_self_get_name());
static void peer_delete(peer_t p)
{
- xbt_free(p->pending_recvs);
- xbt_free(p->pending_sends);
+ sg_activity_set_delete(p->pending_recvs);
+ sg_activity_set_delete(p->pending_sends);
xbt_free(p);
}
+++ /dev/null
-/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved. */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include "simgrid/actor.h"
-#include "simgrid/comm.h"
-#include "simgrid/engine.h"
-#include "simgrid/host.h"
-#include "simgrid/mailbox.h"
-
-#include "xbt/log.h"
-#include "xbt/str.h"
-#include "xbt/sysdep.h"
-
-#include <stdio.h> /* snprintf */
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(comm_waitall, "Messages specific for this example");
-
-static void sender(int argc, char* argv[])
-{
- xbt_assert(argc == 4, "This function expects 3 parameters from the XML deployment file");
- long messages_count = xbt_str_parse_int(argv[1], "Invalid message count");
- long message_size = xbt_str_parse_int(argv[2], "Invalid message size");
- long receivers_count = xbt_str_parse_int(argv[3], "Invalid amount of receivers");
- xbt_assert(receivers_count > 0);
-
- /* Array in which we store all ongoing communications */
- sg_comm_t* pending_comms = xbt_malloc(sizeof(sg_comm_t) * (messages_count + receivers_count));
- int pending_comms_count = 0;
-
- /* Make an array of the mailboxes to use */
- sg_mailbox_t* mboxes = xbt_malloc(sizeof(sg_mailbox_t) * receivers_count);
- for (long i = 0; i < receivers_count; i++) {
- char mailbox_name[80];
- snprintf(mailbox_name, 79, "receiver-%ld", i);
- sg_mailbox_t mbox = sg_mailbox_by_name(mailbox_name);
- mboxes[i] = mbox;
- }
-
- /* Start dispatching all messages to receivers, in a round robin fashion */
- for (long i = 0; i < messages_count; i++) {
- char msg_content[80];
- snprintf(msg_content, 79, "Message %ld", i);
- sg_mailbox_t mbox = mboxes[i % receivers_count];
- XBT_INFO("Send '%s' to '%s'", msg_content, sg_mailbox_get_name(mbox));
- /* Create a communication representing the ongoing communication, and store it in pending_comms */
- pending_comms[pending_comms_count++] = sg_mailbox_put_async(mbox, xbt_strdup(msg_content), message_size);
- }
-
- /* Start sending messages to let the workers know that they should stop */
- for (long i = 0; i < receivers_count; i++) {
- XBT_INFO("Send 'finalize' to 'receiver-%ld'", i);
- char* end_msg = xbt_strdup("finalize");
- sg_mailbox_t mbox = mboxes[i % receivers_count];
- pending_comms[pending_comms_count++] = sg_mailbox_put_async(mbox, end_msg, 0);
- }
-
- XBT_INFO("Done dispatching all messages");
-
- /* Now that all message exchanges were initiated, wait for their completion in one single call */
- sg_comm_wait_all(pending_comms, pending_comms_count);
-
- xbt_free(pending_comms);
- xbt_free(mboxes);
-
- XBT_INFO("Goodbye now!");
-}
-
-static void receiver(int argc, char* argv[])
-{
- xbt_assert(argc == 2, "Expecting one parameter from the XML deployment file but got %d", argc);
- int id = (int)xbt_str_parse_int(argv[1], "ID should be numerical");
- char mailbox_name[80];
- snprintf(mailbox_name, 79, "receiver-%d", id);
- sg_mailbox_t mbox = sg_mailbox_by_name(mailbox_name);
- XBT_INFO("Wait for my first message");
- while (1) {
- char* received = (char*)sg_mailbox_get(mbox);
- XBT_INFO("I got a '%s'.", received);
- if (!strcmp(received, "finalize")) { // If it's a finalize message, we're done
- xbt_free(received);
- break;
- }
- xbt_free(received);
- }
-}
-
-int main(int argc, char* argv[])
-{
- simgrid_init(&argc, argv);
- xbt_assert(argc > 2,
- "Usage: %s platform_file deployment_file\n"
- "\tExample: %s platform.xml deployment.xml\n",
- argv[0], argv[0]);
-
- simgrid_load_platform(argv[1]);
-
- simgrid_register_function("sender", sender);
- simgrid_register_function("receiver", receiver);
- simgrid_load_deployment(argv[2]);
-
- simgrid_run();
-
- return 0;
-}
+++ /dev/null
-#!/usr/bin/env tesh
-
-! output sort 19
-$ ${bindir:=.}/c-comm-waitall ${platfdir:=.}/small_platform_fatpipe.xml ${srcdir:=.}/comm-waitall_d.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 0' to 'receiver-0'
-> [ 0.000000] (2:receiver@Ruby) Wait for my first message
-> [ 0.000000] (3:receiver@Perl) Wait for my first message
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 1' to 'receiver-1'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 2' to 'receiver-0'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 3' to 'receiver-1'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 4' to 'receiver-0'
-> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-0'
-> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-1'
-> [ 0.000000] (1:sender@Tremblay) Done dispatching all messages
-> [ 0.004022] (2:receiver@Ruby) I got a 'Message 0'.
-> [ 0.004022] (3:receiver@Perl) I got a 'Message 1'.
-> [ 0.008043] (2:receiver@Ruby) I got a 'Message 2'.
-> [ 0.008043] (3:receiver@Perl) I got a 'Message 3'.
-> [ 0.009995] (3:receiver@Perl) I got a 'finalize'.
-> [ 0.012065] (2:receiver@Ruby) I got a 'Message 4'.
-> [ 0.014016] (2:receiver@Ruby) I got a 'finalize'.
-> [ 0.014016] (1:sender@Tremblay) Goodbye now!
+++ /dev/null
-<?xml version='1.0'?>
-<!DOCTYPE platform SYSTEM "https://simgrid.org/simgrid.dtd">
-<platform version="4.1">
- <!-- The master actor (with some arguments) -->
- <actor host="Tremblay" function="sender">
- <argument value="5"/> <!-- Number of messages -->
- <argument value="1000000"/> <!-- Size of messages -->
- <argument value="2"/> <!-- Number of receivers -->
- </actor>
- <!-- The receiver actors -->
- <actor host="Ruby" function="receiver">
- <argument value="0"/>
- </actor>
- <actor host="Perl" function="receiver">
- <argument value="1"/>
- </actor>
-</platform>
+++ /dev/null
-/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved. */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include "simgrid/actor.h"
-#include "simgrid/comm.h"
-#include "simgrid/engine.h"
-#include "simgrid/forward.h"
-#include "simgrid/mailbox.h"
-#include "xbt/log.h"
-#include "xbt/str.h"
-#include "xbt/sysdep.h"
-
-#include <stdio.h> /* snprintf */
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(comm_waitany, "Messages specific for this example");
-
-static void sender(int argc, char* argv[])
-{
- xbt_assert(argc == 4, "Expecting 3 parameters from the XML deployment file but got %d", argc);
- long messages_count = xbt_str_parse_int(argv[1], "Invalid message count");
- long msg_size = xbt_str_parse_int(argv[2], "Invalid message size");
- long receivers_count = xbt_str_parse_int(argv[3], "Invalid amount of receivers");
- xbt_assert(receivers_count > 0);
-
- /* Array in which we store all ongoing communications */
- sg_comm_t* pending_comms = xbt_malloc(sizeof(sg_comm_t) * (messages_count + receivers_count));
- int pending_comms_count = 0;
-
- /* Make an array of the mailboxes to use */
- sg_mailbox_t* mboxes = xbt_malloc(sizeof(sg_mailbox_t) * receivers_count);
- for (long i = 0; i < receivers_count; i++) {
- char mailbox_name[80];
- snprintf(mailbox_name, 79, "receiver-%ld", i);
- sg_mailbox_t mbox = sg_mailbox_by_name(mailbox_name);
- mboxes[i] = mbox;
- }
-
- /* Start dispatching all messages to receivers, in a round robin fashion */
- for (long i = 0; i < messages_count; i++) {
- char msg_content[80];
- snprintf(msg_content, 79, "Message %ld", i);
- sg_mailbox_t mbox = mboxes[i % receivers_count];
- XBT_INFO("Send '%s' to '%s'", msg_content, sg_mailbox_get_name(mbox));
-
- /* Create a communication representing the ongoing communication, and store it in pending_comms */
- pending_comms[pending_comms_count++] = sg_mailbox_put_async(mbox, xbt_strdup(msg_content), msg_size);
- }
- /* Start sending messages to let the workers know that they should stop */
- for (long i = 0; i < receivers_count; i++) {
- XBT_INFO("Send 'finalize' to 'receiver-%ld'", i);
- char* end_msg = xbt_strdup("finalize");
- sg_mailbox_t mbox = mboxes[i % receivers_count];
- pending_comms[pending_comms_count++] = sg_mailbox_put_async(mbox, end_msg, 0);
- }
-
- XBT_INFO("Done dispatching all messages");
-
- /* Now that all message exchanges were initiated, wait for their completion, in order of termination.
- *
- * This loop waits for first terminating message with wait_any() and remove it from the array (with a memmove),
- * until all comms are terminated.
- * Even in this simple example, the pending comms do not terminate in the exact same order of creation.
- */
- while (pending_comms_count != 0) {
- ssize_t changed_pos = sg_comm_wait_any(pending_comms, pending_comms_count);
- memmove(pending_comms + changed_pos, pending_comms + changed_pos + 1,
- sizeof(sg_comm_t) * (pending_comms_count - changed_pos - 1));
- pending_comms_count--;
-
- if (changed_pos != 0)
- XBT_INFO("Remove the %zdth pending comm: it terminated earlier than another comm that was initiated first.",
- changed_pos);
- }
-
- xbt_free(pending_comms);
- xbt_free(mboxes);
-
- XBT_INFO("Goodbye now!");
-}
-
-static void receiver(int argc, char* argv[])
-{
- xbt_assert(argc == 2, "Expecting one parameter from the XML deployment file but got %d", argc);
- int id = (int)xbt_str_parse_int(argv[1], "ID should be numerical");
- char mailbox_name[80];
- snprintf(mailbox_name, 79, "receiver-%d", id);
- sg_mailbox_t mbox = sg_mailbox_by_name(mailbox_name);
- XBT_INFO("Wait for my first message on '%s'", mailbox_name);
- while (1) {
- char* received = (char*)sg_mailbox_get(mbox);
- XBT_INFO("I got a '%s'.", received);
- if (!strcmp(received, "finalize")) { // If it's a finalize message, we're done
- xbt_free(received);
- break;
- }
- xbt_free(received);
- }
-
- XBT_INFO("I'm done. See you!");
-}
-
-int main(int argc, char* argv[])
-{
- simgrid_init(&argc, argv);
- xbt_assert(argc > 2,
- "Usage: %s platform_file deployment_file\n"
- "\tExample: %s platform.xml deployment.xml\n",
- argv[0], argv[0]);
-
- simgrid_load_platform(argv[1]);
-
- simgrid_register_function("sender", sender);
- simgrid_register_function("receiver", receiver);
- simgrid_load_deployment(argv[2]);
-
- simgrid_run();
- XBT_INFO("Simulation time %g", simgrid_get_clock());
-
- return 0;
-}
+++ /dev/null
-#!/usr/bin/env tesh
-
-! output sort 19
-$ ${bindir:=.}/c-comm-waitany ${platfdir:=.}/small_platform.xml ${srcdir:=.}/comm-waitany_d.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 0' to 'receiver-0'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 1' to 'receiver-1'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 2' to 'receiver-0'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 3' to 'receiver-1'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 4' to 'receiver-0'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 5' to 'receiver-1'
-> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-0'
-> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-1'
-> [ 0.000000] (1:sender@Tremblay) Done dispatching all messages
-> [ 0.000000] (2:receiver@Fafard) Wait for my first message on 'receiver-0'
-> [ 0.000000] (3:receiver@Jupiter) Wait for my first message on 'receiver-1'
-> [ 0.158397] (2:receiver@Fafard) I got a 'Message 0'.
-> [ 0.169155] (3:receiver@Jupiter) I got a 'Message 1'.
-> [ 0.316794] (2:receiver@Fafard) I got a 'Message 2'.
-> [ 0.338309] (3:receiver@Jupiter) I got a 'Message 3'.
-> [ 0.475190] (2:receiver@Fafard) I got a 'Message 4'.
-> [ 0.500898] (1:sender@Tremblay) Remove the 1th pending comm: it terminated earlier than another comm that was initiated first.
-> [ 0.500898] (2:receiver@Fafard) I got a 'finalize'.
-> [ 0.500898] (2:receiver@Fafard) I'm done. See you!
-> [ 0.507464] (3:receiver@Jupiter) I got a 'Message 5'.
-> [ 0.526478] (0:maestro@) Simulation time 0.526478
-> [ 0.526478] (1:sender@Tremblay) Goodbye now!
-> [ 0.526478] (3:receiver@Jupiter) I got a 'finalize'.
-> [ 0.526478] (3:receiver@Jupiter) I'm done. See you!
\ No newline at end of file
+++ /dev/null
-<?xml version='1.0'?>
-<!DOCTYPE platform SYSTEM "https://simgrid.org/simgrid.dtd">
-<platform version="4.1">
- <!-- The master actor (with some arguments) -->
- <actor host="Tremblay" function="sender">
- <argument value="6"/> <!-- Number of tasks -->
- <argument value="1000000"/> <!-- Communication size of tasks -->
- <argument value="2"/> <!-- Number of receivers -->
- </actor>
- <!-- The receiver actors -->
- <actor host="Fafard" function="receiver">
- <argument value="0"/>
- </actor>
- <actor host="Jupiter" function="receiver">
- <argument value="1"/>
- </actor>
-</platform>
+++ /dev/null
-/* Copyright (c) 2019-2023. The SimGrid Team. All rights reserved. */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include "simgrid/actor.h"
-#include "simgrid/engine.h"
-#include "simgrid/exec.h"
-#include "simgrid/host.h"
-
-#include "xbt/log.h"
-#include "xbt/sysdep.h"
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(exec_waitany, "Messages specific for this example");
-
-static void worker(int argc, char* argv[])
-{
- xbt_assert(argc > 1);
- int with_timeout = !strcmp(argv[1], "true");
-
- /* Vector in which we store all pending executions*/
- sg_exec_t* pending_execs = xbt_malloc(sizeof(sg_exec_t) * 3);
- int pending_execs_count = 0;
-
- for (int i = 0; i < 3; i++) {
- char* name = bprintf("Exec-%d", i);
- double amount = (6 * (i % 2) + i + 1) * sg_host_get_speed(sg_host_self());
-
- sg_exec_t exec = sg_actor_exec_init(amount);
- sg_exec_set_name(exec, name);
- pending_execs[pending_execs_count++] = exec;
- sg_exec_start(exec);
-
- XBT_INFO("Activity %s has started for %.0f seconds", name, amount / sg_host_get_speed(sg_host_self()));
- free(name);
- }
-
- /* Now that executions were initiated, wait for their completion, in order of termination.
- *
- * This loop waits for first terminating execution with wait_any() and remove it with erase(), until all execs are
- * terminated.
- */
- while (pending_execs_count > 0) {
- ssize_t pos;
- if (with_timeout)
- pos = sg_exec_wait_any_for(pending_execs, pending_execs_count, 4);
- else
- pos = sg_exec_wait_any(pending_execs, pending_execs_count);
-
- if (pos < 0) {
- XBT_INFO("Do not wait any longer for an activity");
- pending_execs_count = 0;
- } else {
- XBT_INFO("Activity at position %zd is complete", pos);
- memmove(pending_execs + pos, pending_execs + pos + 1, sizeof(sg_exec_t) * (pending_execs_count - pos - 1));
- pending_execs_count--;
- }
- XBT_INFO("%d activities remain pending", pending_execs_count);
- }
-
- xbt_free(pending_execs);
-}
-
-int main(int argc, char* argv[])
-{
- simgrid_init(&argc, argv);
- simgrid_load_platform(argv[1]);
-
- const char* worker_argv[] = {"worker", "false"};
- sg_actor_create_("worker", sg_host_by_name("Tremblay"), worker, 2, worker_argv);
-
- worker_argv[1] = "true";
- sg_actor_create_("worker_timeout", sg_host_by_name("Tremblay"), worker, 2, worker_argv);
-
- simgrid_run();
- return 0;
-}
+++ /dev/null
-#!/usr/bin/env tesh
-
-! output sort 19
-$ ${bindir:=.}/c-exec-waitany ${platfdir}/multicore_machine.xml "--log=root.fmt:[%10.6r]%e[%14P]%e%m%n"
-> [ 0.000000] [ worker] Activity Exec-0 has started for 1 seconds
-> [ 0.000000] [worker_timeout] Activity Exec-0 has started for 1 seconds
-> [ 0.000000] [ worker] Activity Exec-1 has started for 8 seconds
-> [ 0.000000] [worker_timeout] Activity Exec-1 has started for 8 seconds
-> [ 0.000000] [ worker] Activity Exec-2 has started for 3 seconds
-> [ 0.000000] [worker_timeout] Activity Exec-2 has started for 3 seconds
-> [ 1.000000] [worker_timeout] Activity at position 0 is complete
-> [ 1.000000] [worker_timeout] 2 activities remain pending
-> [ 1.000000] [ worker] Activity at position 0 is complete
-> [ 1.000000] [ worker] 2 activities remain pending
-> [ 3.000000] [worker_timeout] Activity at position 1 is complete
-> [ 3.000000] [worker_timeout] 1 activities remain pending
-> [ 3.000000] [ worker] Activity at position 1 is complete
-> [ 3.000000] [ worker] 1 activities remain pending
-> [ 7.000000] [worker_timeout] Do not wait any longer for an activity
-> [ 7.000000] [worker_timeout] 0 activities remain pending
-> [ 8.000000] [ worker] Activity at position 0 is complete
-> [ 8.000000] [ worker] 0 activities remain pending
task-io task-simple task-variable-load task-storm task-switch-host
photovoltaic-simple
platform-comm-serialize platform-failures platform-profile platform-properties
- plugin-host-load plugin-link-load plugin-prodcons
+ plugin-host-load plugin-jbod plugin-link-load plugin-prodcons
replay-comm replay-io
routing-get-clusters
synchro-barrier synchro-condition-variable synchro-condition-variable-waituntil synchro-mutex synchro-semaphore
boost::dynamic_pointer_cast<sg4::Activity>(comm),
boost::dynamic_pointer_cast<sg4::Activity>(io)});
- XBT_INFO("Wait for asynchrounous activities to complete, all in one shot.");
+ XBT_INFO("Wait for asynchronous activities to complete, all in one shot.");
pending_activities.wait_all();
XBT_INFO("All activities are completed.");
$ ${bindir:=.}/s4u-activityset-waitall ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n"
> [0.000000] [alice] Send 'Message'
> [0.000000] [ bob] Create my asynchronous activities
-> [0.000000] [ bob] Wait for asynchrounous activities to complete, all in one shot.
+> [0.000000] [ bob] Wait for asynchronous activities to complete, all in one shot.
> [5.197828] [ bob] All activities are completed.
sg4::ActivitySet pending_activities({exec, comm, io});
- XBT_INFO("Wait for asynchrounous activities to complete");
+ XBT_INFO("Wait for asynchronous activities to complete");
while (not pending_activities.empty()) {
try {
pending_activities.wait_all_for(1);
$ ${bindir:=.}/s4u-activityset-waitallfor ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n"
> [0.000000] [alice] Send 'Message'
> [0.000000] [ bob] Create my asynchronous activities
-> [0.000000] [ bob] Wait for asynchrounous activities to complete
+> [0.000000] [ bob] Wait for asynchronous activities to complete
> [1.000000] [ bob] Not all activities are terminated yet.
> [2.000000] [ bob] Not all activities are terminated yet.
> [3.000000] [ bob] Not all activities are terminated yet.
boost::dynamic_pointer_cast<sg4::Activity>(comm),
boost::dynamic_pointer_cast<sg4::Activity>(io)});
- XBT_INFO("Wait for asynchrounous activities to complete");
+ XBT_INFO("Wait for asynchronous activities to complete");
while (not pending_activities.empty()) {
auto completed_one = pending_activities.wait_any();
if (completed_one != nullptr) {
$ ${bindir:=.}/s4u-activityset-waitany ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n"
> [0.000000] [alice] Send 'Message'
> [0.000000] [ bob] Create my asynchronous activities
-> [0.000000] [ bob] Wait for asynchrounous activities to complete
+> [0.000000] [ bob] Wait for asynchronous activities to complete
> [3.000000] [ bob] Completed an I/O
> [5.000000] [ bob] Completed an Exec
> [5.197828] [ bob] Completed a Comm
sg4::Mailbox* prev = nullptr;
sg4::Mailbox* next = nullptr;
sg4::Mailbox* me = nullptr;
- std::vector<sg4::CommPtr> pending_recvs;
- std::vector<sg4::CommPtr> pending_sends;
+ sg4::ActivitySet pending_recvs;
+ sg4::ActivitySet pending_sends;
unsigned long long received_bytes = 0;
unsigned int received_pieces = 0;
while (not done) {
sg4::CommPtr comm = me->get_async<FilePiece>(&received);
- pending_recvs.push_back(comm);
+ pending_recvs.push(comm);
- ssize_t idx = sg4::Comm::wait_any(pending_recvs);
- if (idx != -1) {
- comm = pending_recvs.at(idx);
+ auto completed_one = pending_recvs.wait_any();
+ if (completed_one != nullptr) {
+ comm = boost::dynamic_pointer_cast<sg4::Comm>(completed_one);
XBT_DEBUG("Peer %s got a 'SEND_DATA' message", me->get_cname());
- pending_recvs.erase(pending_recvs.begin() + idx);
if (next != nullptr) {
XBT_DEBUG("Sending (asynchronously) from %s to %s", me->get_cname(), next->get_cname());
sg4::CommPtr send = next->put_async(received, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
- pending_sends.push_back(send);
+ pending_sends.push(send);
} else
delete received;
void sendFile()
{
- std::vector<sg4::CommPtr> pending_sends;
+ sg4::ActivitySet pending_sends;
for (unsigned int current_piece = 0; current_piece < piece_count; current_piece++) {
XBT_DEBUG("Sending (send) piece %u from %s into mailbox %s", current_piece, sg4::Host::current()->get_cname(),
first->get_cname());
sg4::CommPtr comm = first->put_async(new FilePiece(), MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
- pending_sends.push_back(comm);
+ pending_sends.push(comm);
}
- sg4::Comm::wait_all(pending_sends);
+ pending_sends.wait_all();
}
Broadcaster(int hostcount, unsigned int piece_count) : piece_count(piece_count)
p.joinChain();
p.forwardFile();
- sg4::Comm::wait_all(p.pending_sends);
+ p.pending_sends.wait_all();
double end_time = sg4::Engine::get_clock();
XBT_INFO("### %f %llu bytes (Avg %f MB/s); copy finished (simulated).", end_time - start_time, p.received_bytes,
void operator()() const
{
/* Vector in which we store all ongoing communications */
- std::vector<sg4::CommPtr> pending_comms;
+ sg4::ActivitySet pending_comms;
/* Make a vector of the mailboxes to use */
std::vector<sg4::Mailbox*> mboxes;
auto* mbox = sg4::Mailbox::by_name(host->get_name());
mboxes.push_back(mbox);
sg4::CommPtr comm = mbox->put_async(payload, msg_size);
- pending_comms.push_back(comm);
+ pending_comms.push(comm);
}
XBT_INFO("Done dispatching all messages");
/* Now that all message exchanges were initiated, wait for their completion in one single call */
- sg4::Comm::wait_all(pending_comms);
+ pending_comms.wait_all();
XBT_INFO("Goodbye now!");
}
auto comm2 = mailbox2->put_async((void*)666, 2);
XBT_INFO("Calling wait_any..");
- std::vector<sg4::CommPtr> pending_comms;
- pending_comms.push_back(comm1);
- pending_comms.push_back(comm2);
+ sg4::ActivitySet pending_comms;
+ pending_comms.push(comm1);
+ pending_comms.push(comm2);
try {
- long index = sg4::Comm::wait_any(pending_comms);
- XBT_INFO("Wait any returned index %ld (comm to %s)", index, pending_comms.at(index)->get_mailbox()->get_cname());
+ auto* acti = pending_comms.wait_any().get();
+ XBT_INFO("Wait any returned comm to %s", dynamic_cast<sg4::Comm*>(acti)->get_mailbox()->get_cname());
} catch (const simgrid::NetworkFailureException&) {
XBT_INFO("Sender has experienced a network failure exception, so it knows that something went wrong");
XBT_INFO("Now it needs to figure out which of the two comms failed by looking at their state:");
XBT_INFO("Waiting on a FAILED comm raises an exception: '%s'", e.what());
}
XBT_INFO("Wait for remaining comm, just to be nice");
- pending_comms.erase(pending_comms.begin());
- sg4::Comm::wait_any(pending_comms);
+ pending_comms.wait_all();
}
};
auto* host1 = zone->create_host("Host1", "1f");
auto* host2 = zone->create_host("Host2", "1f");
auto* host3 = zone->create_host("Host3", "1f");
+ auto* link2 = zone->create_link("linkto2", "1bps")->seal();
+ auto* link3 = zone->create_link("linkto3", "1bps")->seal();
- sg4::LinkInRoute linkto2{zone->create_link("linkto2", "1bps")->seal()};
- sg4::LinkInRoute linkto3{zone->create_link("linkto3", "1bps")->seal()};
-
- zone->add_route(host1->get_netpoint(), host2->get_netpoint(), nullptr, nullptr, {linkto2}, false);
- zone->add_route(host1->get_netpoint(), host3->get_netpoint(), nullptr, nullptr, {linkto3}, false);
+ zone->add_route(host1, host2, {link2});
+ zone->add_route(host1, host3, {link3});
zone->seal();
sg4::Actor::create("Sender", host1, Sender("mailbox2", "mailbox3"));
sg4::Actor::create("Receiver", host2, Receiver("mailbox2"));
sg4::Actor::create("Receiver", host3, Receiver("mailbox3"));
-
+
sg4::Actor::create("LinkKiller", host1, [](){
sg4::this_actor::sleep_for(10.0);
XBT_INFO("Turning off link 'linkto2'");
> [ 10.000000] (1:Sender@Host1) Comm to mailbox3 has state: STARTED
> [ 10.000000] (1:Sender@Host1) Waiting on a FAILED comm raises an exception: 'Cannot wait for a failed communication'
> [ 10.000000] (1:Sender@Host1) Wait for remaining comm, just to be nice
-> [ 16.494845] (3:Receiver@Host3) Receiver has received successfully!
+> [ 17.319588] (3:Receiver@Host3) Receiver has received successfully!
sg4::Mailbox* my_mbox = sg4::Mailbox::by_name("peer-" + std::to_string(my_id));
my_mbox->set_receiver(sg4::Actor::self());
- std::vector<sg4::CommPtr> pending_comms;
+ sg4::ActivitySet pending_comms;
/* Start dispatching all messages to peers others that myself */
for (int i = 0; i < messages_count; i++) {
// 'message' is not a stable storage location
XBT_INFO("Send '%s' to '%s'", message.c_str(), mbox->get_cname());
/* Create a communication representing the ongoing communication */
- pending_comms.push_back(mbox->put_async(payload, payload_size));
+ pending_comms.push(mbox->put_async(payload, payload_size));
}
}
}
if (peer_id != my_id) {
sg4::Mailbox* mbox = sg4::Mailbox::by_name("peer-" + std::to_string(peer_id));
auto* payload = new std::string("finalize"); // Make a copy of the data we will send
- pending_comms.push_back(mbox->put_async(payload, payload_size));
+ pending_comms.push(mbox->put_async(payload, payload_size));
XBT_INFO("Send 'finalize' to 'peer-%d'", peer_id);
}
}
}
XBT_INFO("I'm done, just waiting for my peers to receive the messages before exiting");
- sg4::Comm::wait_all(pending_comms);
+ pending_comms.wait_all();
XBT_INFO("Goodbye now!");
}
mailbox->put(payload, comm_size);
} else {
// Start all comms in parallel, and wait for all completions in one shot
- std::vector<sg4::CommPtr> comms;
+ sg4::ActivitySet comms;
for (int i = 0; i < flow_amount; i++)
- comms.push_back(mailbox->put_async(bprintf("%d", i), comm_size));
- sg4::Comm::wait_all(comms);
+ comms.push(mailbox->put_async(bprintf("%d", i), comm_size));
+ comms.wait_all();
}
XBT_INFO("sender done.");
}
std::vector<char*> data(flow_amount);
// Start all comms in parallel, and wait for their completion in one shot
- std::vector<sg4::CommPtr> comms;
+ sg4::ActivitySet comms;
for (int i = 0; i < flow_amount; i++)
- comms.push_back(mailbox->get_async<char>(&data[i]));
+ comms.push(mailbox->get_async<char>(&data[i]));
- sg4::Comm::wait_all(comms);
+ comms.wait_all();
for (int i = 0; i < flow_amount; i++)
xbt_free(data[i]);
}
// Define an amount of work that should take 1 second to execute.
double computation_amount = sg4::this_actor::get_host()->get_speed();
- std::vector<sg4::ExecPtr> pending_execs;
// Create a small DAG
// + Two parents and a child
// + First parent ends after 1 second and the Second parent after 2 seconds.
sg4::ExecPtr first_parent = sg4::this_actor::exec_init(computation_amount);
- pending_execs.push_back(first_parent);
sg4::ExecPtr second_parent = sg4::this_actor::exec_init(2 * computation_amount);
- pending_execs.push_back(second_parent);
sg4::ExecPtr child = sg4::Exec::init()->set_flops_amount(computation_amount);
- pending_execs.push_back(child);
+
+ sg4::ActivitySet pending_execs ({first_parent, second_parent, child});
// Name the activities (for logging purposes only)
first_parent->set_name("parent 1");
// wait for the completion of all activities
while (not pending_execs.empty()) {
- ssize_t changed_pos = sg4::Exec::wait_any_for(pending_execs, -1);
- XBT_INFO("Exec '%s' is complete", pending_execs[changed_pos]->get_cname());
- pending_execs.erase(pending_execs.begin() + changed_pos);
+ auto completed_one = pending_execs.wait_any();
+ if (completed_one != nullptr)
+ XBT_INFO("Exec '%s' is complete", completed_one->get_cname());
}
}
static void test()
{
- std::vector<sg4::ActivityPtr> pending_activities;
-
sg4::ExecPtr bob_compute = sg4::this_actor::exec_init(1e9);
- pending_activities.push_back(boost::dynamic_pointer_cast<sg4::Activity>(bob_compute));
sg4::IoPtr bob_write = sg4::Host::current()->get_disks().front()->io_init(4000000, sg4::Io::OpType::WRITE);
- pending_activities.push_back(boost::dynamic_pointer_cast<sg4::Activity>(bob_write));
sg4::IoPtr carl_read = sg4::Host::by_name("carl")->get_disks().front()->io_init(4000000, sg4::Io::OpType::READ);
- pending_activities.push_back(boost::dynamic_pointer_cast<sg4::Activity>(carl_read));
sg4::ExecPtr carl_compute = sg4::Host::by_name("carl")->exec_init(1e9);
- pending_activities.push_back(boost::dynamic_pointer_cast<sg4::Activity>(carl_compute));
+
+ sg4::ActivitySet pending_activities ({boost::dynamic_pointer_cast<sg4::Activity>(bob_compute),
+ boost::dynamic_pointer_cast<sg4::Activity>(bob_write),
+ boost::dynamic_pointer_cast<sg4::Activity>(carl_read),
+ boost::dynamic_pointer_cast<sg4::Activity>(carl_compute)});
// Name the activities (for logging purposes only)
bob_compute->set_name("bob compute");
// wait for the completion of all activities
while (not pending_activities.empty()) {
- ssize_t changed_pos = sg4::Activity::wait_any(pending_activities);
- XBT_INFO("Activity '%s' is complete", pending_activities[changed_pos]->get_cname());
- pending_activities.erase(pending_activities.begin() + changed_pos);
+ auto completed_one = pending_activities.wait_any();
+ if (completed_one != nullptr)
+ XBT_INFO("Activity '%s' is complete", completed_one->get_cname());
}
}
/* add link UP/DOWN for communications from the host */
root->add_route(host->get_netpoint(), nullptr, nullptr, nullptr, {{l, sg4::LinkInRoute::Direction::UP}}, true);
- const sg4::Link* loopback = root->create_link(hostname + "_loopback", BW_LOCAL)->set_latency(LATENCY)->seal();
- root->add_route(host->get_netpoint(), host->get_netpoint(), nullptr, nullptr, {sg4::LinkInRoute(loopback)});
+ sg4::Link* loopback = root->create_link(hostname + "_loopback", BW_LOCAL)->set_latency(LATENCY)->seal();
+ root->add_route(host, host, {loopback});
}
root->seal();
*/
#include <simgrid/s4u.hpp>
+#include <string>
namespace sg4 = simgrid::s4u;
void operator()() const
{
// sphinx-doc: init-begin (this line helps the doc to build; ignore it)
- /* Vector in which we store all ongoing communications */
- std::vector<sg4::CommPtr> pending_comms;
+ /* ActivitySet in which we store all ongoing communications */
+ sg4::ActivitySet pending_comms;
- /* Make a vector of the mailboxes to use */
+ /* Mailbox to use */
sg4::Mailbox* mbox = sg4::Mailbox::by_name("receiver");
// sphinx-doc: init-end
/* Create a communication representing the ongoing communication, and store it in pending_comms */
sg4::CommPtr comm = mbox->put_async(payload, size);
- pending_comms.push_back(comm);
+ pending_comms.push(comm);
}
XBT_INFO("Done dispatching all messages");
/* Now that all message exchanges were initiated, wait for their completion in one single call */
- sg4::Comm::wait_all(pending_comms);
+ pending_comms.wait_all();
// sphinx-doc: put-end
XBT_INFO("Goodbye now!");
explicit Receiver(int count) : messages_count(count) { mbox = sg4::Mailbox::by_name("receiver"); }
void operator()()
{
- /* Vector in which we store all incoming msgs */
- std::vector<std::unique_ptr<std::string*>> pending_msgs;
- std::vector<sg4::CommPtr> pending_comms;
+ /* Where we store all incoming msgs */
+ std::unordered_map<sg4::CommPtr, std::shared_ptr<std::string*>> pending_msgs;
+ sg4::ActivitySet pending_comms;
XBT_INFO("Wait for %d messages asynchronously", messages_count);
for (int i = 0; i < messages_count; i++) {
- pending_msgs.push_back(std::make_unique<std::string*>());
- pending_comms.emplace_back(mbox->get_async<std::string>(pending_msgs[i].get()));
+ std::shared_ptr<std::string*> msg =std::make_shared<std::string*>();
+ auto comm = mbox->get_async<std::string>(msg.get());
+ pending_comms.push(comm);
+ pending_msgs.insert({comm, msg});
}
+
while (not pending_comms.empty()) {
- ssize_t index = sg4::Comm::wait_any(pending_comms);
- std::string* msg = *pending_msgs[index];
- XBT_INFO("I got '%s'.", msg->c_str());
- /* cleanup memory and remove from vectors */
- delete msg;
- pending_comms.erase(pending_comms.begin() + index);
- pending_msgs.erase(pending_msgs.begin() + index);
+ auto completed_one = pending_comms.wait_any();
+ if (completed_one != nullptr){
+ auto comm = boost::dynamic_pointer_cast<sg4::Comm>(completed_one);
+ auto msg = *pending_msgs[comm];
+ XBT_INFO("I got '%s'.", msg->c_str());
+ /* cleanup memory and remove from map */
+ delete msg;
+ pending_msgs.erase(comm);
+ }
}
}
};
link->set_latency(10e-6)->seal();
/* create routes between nodes */
- zone->add_route(sender->get_netpoint(), receiver->get_netpoint(), nullptr, nullptr,
- {{link, sg4::LinkInRoute::Direction::UP}}, true);
+ zone->add_route(sender, receiver, {link});
zone->seal();
/* create actors Sender/Receiver */
*
* This example is very similar to the other asynchronous communication examples, but messages get serialized by the platform.
* Without this call to Link::set_concurrency_limit(2) in main, all messages would be received at the exact same timestamp since
- * they are initiated at the same instant and are of the same size. But with this extra configuration to the link, at most 2
+ * they are initiated at the same instant and are of the same size. But with this extra configuration to the link, at most 2
* messages can travel through the link at the same time.
*/
void operator()() const
{
// sphinx-doc: init-begin (this line helps the doc to build; ignore it)
- /* Vector in which we store all ongoing communications */
- std::vector<sg4::CommPtr> pending_comms;
+ /* ActivitySet in which we store all ongoing communications */
+ sg4::ActivitySet pending_comms;
- /* Make a vector of the mailboxes to use */
+ /* Mailbox to use */
sg4::Mailbox* mbox = sg4::Mailbox::by_name("receiver");
// sphinx-doc: init-end
/* Create a communication representing the ongoing communication, and store it in pending_comms */
sg4::CommPtr comm = mbox->put_async(payload, msg_size);
- pending_comms.push_back(comm);
+ pending_comms.push(comm);
}
XBT_INFO("Done dispatching all messages");
/* Now that all message exchanges were initiated, wait for their completion in one single call */
- sg4::Comm::wait_all(pending_comms);
+ pending_comms.wait_all();
// sphinx-doc: put-end
XBT_INFO("Goodbye now!");
explicit Receiver(int count) : messages_count(count) { mbox = sg4::Mailbox::by_name("receiver"); }
void operator()()
{
- /* Vector in which we store all incoming msgs */
- std::vector<std::unique_ptr<std::string*>> pending_msgs;
- std::vector<sg4::CommPtr> pending_comms;
+ /* Where we store all incoming msgs */
+ std::unordered_map<sg4::CommPtr, std::shared_ptr<std::string*>> pending_msgs;
+ sg4::ActivitySet pending_comms;
XBT_INFO("Wait for %d messages asynchronously", messages_count);
for (int i = 0; i < messages_count; i++) {
- pending_msgs.push_back(std::make_unique<std::string*>());
- pending_comms.emplace_back(mbox->get_async<std::string>(pending_msgs[i].get()));
+ std::shared_ptr<std::string*> msg =std::make_shared<std::string*>();
+ auto comm = mbox->get_async<std::string>(msg.get());
+ pending_comms.push(comm);
+ pending_msgs.insert({comm, msg});
}
+
while (not pending_comms.empty()) {
- ssize_t index = sg4::Comm::wait_any(pending_comms);
- std::string* msg = *pending_msgs[index];
- XBT_INFO("I got '%s'.", msg->c_str());
- /* cleanup memory and remove from vectors */
- delete msg;
- pending_comms.erase(pending_comms.begin() + index);
- pending_msgs.erase(pending_msgs.begin() + index);
+ auto completed_one = pending_comms.wait_any();
+ if (completed_one != nullptr){
+ auto comm = boost::dynamic_pointer_cast<sg4::Comm>(completed_one);
+ auto msg = *pending_msgs[comm];
+ XBT_INFO("I got '%s'.", msg->c_str());
+ /* cleanup memory and remove from map */
+ delete msg;
+ pending_msgs.erase(comm);
+ }
}
}
};
zone->create_split_duplex_link("link1", 10e9)->set_latency(10e-6)->set_concurrency_limit(2)->seal();
/* create routes between nodes */
- zone->add_route(sender->get_netpoint(), receiver->get_netpoint(), nullptr, nullptr,
- {{link, sg4::LinkInRoute::Direction::UP}}, true);
+ zone->add_route(sender, receiver, {link});
zone->seal();
/* create actors Sender/Receiver */
// Add a new host programatically, and attach a state profile to it
auto* root = e.get_netzone_root();
auto* lilibeth = root->create_host("Lilibeth", 1e15);
- auto link = sg4::LinkInRoute(e.link_by_name("10"));
- root->add_route(e.host_by_name("Tremblay")->get_netpoint(), lilibeth->get_netpoint(), nullptr, nullptr, {link}, true);
+ auto link = e.link_by_name("10");
+ root->add_route(e.host_by_name("Tremblay"), lilibeth, {link});
lilibeth->set_state_profile(simgrid::kernel::profile::ProfileBuilder::from_string("lilibeth_profile", R"(
4 0
5 1
--- /dev/null
+/* Copyright (c) 2017-2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/s4u.hpp"
+#include "simgrid/plugins/jbod.hpp"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(jbod_test, "Messages specific for this simulation");
+namespace sg4 = simgrid::s4u;
+
+static void write_then_read(simgrid::plugin::Jbod* jbod)
+{
+ simgrid::plugin::JbodIoPtr io = jbod->write_async(1e7);
+ XBT_INFO("asynchronous write posted, wait for it");
+ io->wait();
+ XBT_INFO("asynchronous write done");
+ jbod->read(1e7);
+ XBT_INFO("synchonous read done");
+ jbod->write(1e7);
+ XBT_INFO("synchonous write done");
+ io = jbod->read_async(1e7);
+ XBT_INFO("asynchronous read posted, wait for it");
+ io->wait();
+ XBT_INFO("asynchonous read done");
+ jbod->write(1e7);
+ XBT_INFO("synchonous write done");
+ jbod->read(1e7);
+ XBT_INFO("synchonous read done");
+ jbod->read(1e7);
+ XBT_INFO("synchonous read done");
+}
+
+int main(int argc, char** argv)
+{
+ sg4::Engine e(&argc, argv);
+ auto* zone = sg4::create_full_zone("zone");
+ auto* host = zone->create_host("host", "1Gf");
+ // set up link so that data transfer from host to JBOD takes exactly 1 second (without crosstraffic)
+ auto* link = zone->create_link("link", 1e7/0.97)->set_latency(0);
+
+ auto* jbod_raid0 =
+ simgrid::plugin::Jbod::create_jbod(zone, "jbod_raid0", 1e9, 4, simgrid::plugin::Jbod::RAID::RAID0, 1e7, 5e6);
+ zone->add_route(host->get_netpoint(), jbod_raid0->get_netpoint(), nullptr, nullptr, {sg4::LinkInRoute(link)});
+
+ auto* jbod_raid1 =
+ simgrid::plugin::Jbod::create_jbod(zone, "jbod_raid1", 1e9, 4, simgrid::plugin::Jbod::RAID::RAID1, 1e7, 5e6);
+ zone->add_route(host->get_netpoint(), jbod_raid1->get_netpoint(), nullptr, nullptr, {sg4::LinkInRoute(link)});
+
+ auto* jbod_raid4 =
+ simgrid::plugin::Jbod::create_jbod(zone, "jbod_raid4", 1e9, 4, simgrid::plugin::Jbod::RAID::RAID4, 1e7, 5e6);
+ zone->add_route(host->get_netpoint(), jbod_raid4->get_netpoint(), nullptr, nullptr, {sg4::LinkInRoute(link)});
+
+ auto* jbod_raid5 =
+ simgrid::plugin::Jbod::create_jbod(zone, "jbod_raid5", 1e9, 4, simgrid::plugin::Jbod::RAID::RAID5, 1e7, 5e6);
+ zone->add_route(host->get_netpoint(), jbod_raid5->get_netpoint(), nullptr, nullptr, {sg4::LinkInRoute(link)});
+
+ auto* jbod_raid6 =
+ simgrid::plugin::Jbod::create_jbod(zone, "jbod_raid6", 1e9, 4, simgrid::plugin::Jbod::RAID::RAID6, 1e7, 5e6);
+ zone->add_route(host->get_netpoint(), jbod_raid6->get_netpoint(), nullptr, nullptr, {sg4::LinkInRoute(link)});
+
+ zone->seal();
+
+ XBT_INFO("XXXXXXXXXXXXXXX RAID 0 XXXXXXXXXXXXXXXX");
+ sg4::Actor::create("", host, write_then_read, jbod_raid0);
+ e.run();
+
+ XBT_INFO("XXXXXXXXXXXXXXX RAID 1 XXXXXXXXXXXXXXXX");
+ sg4::Actor::create("", host, write_then_read, jbod_raid1);
+ e.run();
+
+ XBT_INFO("XXXXXXXXXXXXXXX RAID 4 XXXXXXXXXXXXXXXX");
+ sg4::Actor::create("", host, write_then_read, jbod_raid4);
+ e.run();
+
+ XBT_INFO("XXXXXXXXXXXXXXX RAID 5 XXXXXXXXXXXXXXXX");
+ sg4::Actor::create("", host, write_then_read, jbod_raid5);
+ e.run();
+
+ XBT_INFO("XXXXXXXXXXXXXXX RAID 6 XXXXXXXXXXXXXXXX");
+ sg4::Actor::create("", host, write_then_read, jbod_raid6);
+ e.run();
+
+ XBT_INFO("Simulated time: %g", sg4::Engine::get_clock());
+
+ return 0;
+}
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${bindir}/s4u-plugin-jbod --cfg=network/crosstraffic:0 "--log=root.fmt:[%10.6r]%e%m%n"
+> [ 0.000000] Configuration change: Set 'network/crosstraffic' to '0'
+> [ 0.000000] XXXXXXXXXXXXXXX RAID 0 XXXXXXXXXXXXXXXX
+> [ 0.000000] asynchronous write posted, wait for it
+> [ 1.500000] asynchronous write done
+> [ 2.750000] synchonous read done
+> [ 4.250000] synchonous write done
+> [ 4.250000] asynchronous read posted, wait for it
+> [ 5.500000] asynchonous read done
+> [ 7.000000] synchonous write done
+> [ 8.250000] synchonous read done
+> [ 9.500000] synchonous read done
+> [ 9.500000] XXXXXXXXXXXXXXX RAID 1 XXXXXXXXXXXXXXXX
+> [ 9.500000] asynchronous write posted, wait for it
+> [ 12.500000] asynchronous write done
+> [ 14.500000] synchonous read done
+> [ 17.500000] synchonous write done
+> [ 17.500000] asynchronous read posted, wait for it
+> [ 19.500000] asynchonous read done
+> [ 22.500000] synchonous write done
+> [ 24.500000] synchonous read done
+> [ 26.500000] synchonous read done
+> [ 26.500000] XXXXXXXXXXXXXXX RAID 4 XXXXXXXXXXXXXXXX
+> [ 26.500000] asynchronous write posted, wait for it
+> [ 28.170000] asynchronous write done
+> [ 29.503333] synchonous read done
+> [ 31.173333] synchonous write done
+> [ 31.173333] asynchronous read posted, wait for it
+> [ 32.506666] asynchonous read done
+> [ 34.176666] synchonous write done
+> [ 35.510000] synchonous read done
+> [ 36.843333] synchonous read done
+> [ 36.843333] XXXXXXXXXXXXXXX RAID 5 XXXXXXXXXXXXXXXX
+> [ 36.843333] asynchronous write posted, wait for it
+> [ 38.513333] asynchronous write done
+> [ 39.846666] synchonous read done
+> [ 41.516666] synchonous write done
+> [ 41.516666] asynchronous read posted, wait for it
+> [ 42.849999] asynchonous read done
+> [ 44.519999] synchonous write done
+> [ 45.853333] synchonous read done
+> [ 47.186666] synchonous read done
+> [ 47.186666] XXXXXXXXXXXXXXX RAID 6 XXXXXXXXXXXXXXXX
+> [ 47.186666] asynchronous write posted, wait for it
+> [ 50.186666] asynchronous write done
+> [ 51.686666] synchonous read done
+> [ 54.686666] synchonous write done
+> [ 54.686666] asynchronous read posted, wait for it
+> [ 56.186666] asynchonous read done
+> [ 59.186666] synchonous write done
+> [ 60.686666] synchonous read done
+> [ 62.186666] synchonous read done
+> [ 62.186666] Simulated time: 62.1867
\ No newline at end of file
}
auto* router = cluster->create_router("cluster_router");
- cluster->add_route(router, nullptr, nullptr, nullptr, {});
+ std::vector<sg4::LinkInRoute> links; // empty
+ cluster->add_route(router, nullptr, nullptr, nullptr, links);
+ cluster->seal();
simgrid::plugin::ProducerConsumerPtr<int> pc = simgrid::plugin::ProducerConsumer<int>::create(2);
foreach(example actor-create actor-daemon actor-join actor-kill actor-migrate actor-suspend actor-yield actor-lifetime
+ activityset-testany activityset-waitall activityset-waitallfor activityset-waitany
app-masterworkers
- comm-wait comm-waitall comm-waitallfor comm-waitany comm-failure comm-host2host comm-pingpong
- comm-ready comm-suspend comm-testany comm-throttling comm-waitallfor comm-waituntil
+ comm-wait comm-failure comm-host2host comm-pingpong
+ comm-ready comm-suspend comm-throttling comm-waituntil
exec-async exec-basic exec-dvfs exec-remote exec-ptask
task-io task-simple task-switch-host task-variable-load
platform-comm-serialize platform-profile platform-failures
--- /dev/null
+# Copyright (c) 2017-2023. The SimGrid Team. All rights reserved.
+#
+# This program is free software you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+"""
+Usage: activityset-testany.py platform_file [other parameters]
+"""
+
+import sys
+from simgrid import Actor, ActivitySet, Engine, Comm, Exec, Io, Host, Mailbox, this_actor
+
+def bob():
+ mbox = Mailbox.by_name("mbox")
+ disk = Host.current().get_disks()[0]
+
+ this_actor.info("Create my asynchronous activities")
+ exec = this_actor.exec_async(5e9)
+ comm = mbox.get_async()
+ io = disk.read_async(300000000)
+
+ pending_activities = ActivitySet([exec, comm])
+ pending_activities.push(io) # Activities can be pushed after creation, too
+
+ this_actor.info("Sleep_for a while")
+ this_actor.sleep_for(1)
+
+ this_actor.info("Test for completed activities")
+ while not pending_activities.empty():
+ completed_one = pending_activities.test_any()
+ if completed_one == None:
+ this_actor.info("Nothing matches, test again in 0.5s")
+ this_actor.sleep_for(.5)
+ elif isinstance(completed_one, Comm):
+ this_actor.info("Completed a Comm")
+ elif isinstance(completed_one, Exec):
+ this_actor.info("Completed an Exec")
+ elif isinstance(completed_one, Io):
+ this_actor.info("Completed an I/O")
+
+ this_actor.info("Last activity is complete")
+
+def alice():
+ this_actor.info("Send 'Message'")
+ Mailbox.by_name("mbox").put("Message", 600000000)
+
+if __name__ == '__main__':
+ e = Engine(sys.argv)
+ e.set_log_control("root.fmt:[%4.2r]%e[%5a]%e%m%n")
+
+ # Load the platform description
+ e.load_platform(sys.argv[1])
+
+ Actor.create("bob", Host.by_name("bob"), bob)
+ Actor.create("alice", Host.by_name("alice"), alice)
+
+ e.run()
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${srcdir:=.}/activityset-testany.py ${platfdir}/hosts_with_disks.xml
+> [0.00] [alice] Send 'Message'
+> [0.00] [ bob] Create my asynchronous activities
+> [0.00] [ bob] Sleep_for a while
+> [1.00] [ bob] Test for completed activities
+> [1.00] [ bob] Nothing matches, test again in 0.5s
+> [1.50] [ bob] Nothing matches, test again in 0.5s
+> [2.00] [ bob] Nothing matches, test again in 0.5s
+> [2.50] [ bob] Nothing matches, test again in 0.5s
+> [3.00] [ bob] Completed an I/O
+> [3.00] [ bob] Nothing matches, test again in 0.5s
+> [3.50] [ bob] Nothing matches, test again in 0.5s
+> [4.00] [ bob] Nothing matches, test again in 0.5s
+> [4.50] [ bob] Nothing matches, test again in 0.5s
+> [5.00] [ bob] Completed an Exec
+> [5.00] [ bob] Nothing matches, test again in 0.5s
+> [5.50] [ bob] Completed a Comm
+> [5.50] [ bob] Last activity is complete
--- /dev/null
+# Copyright (c) 2017-2023. The SimGrid Team. All rights reserved.
+#
+# This program is free software you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+"""
+Usage: activityset-waitall.py platform_file [other parameters]
+"""
+
+import sys
+from simgrid import Actor, ActivitySet, Engine, Comm, Exec, Io, Host, Mailbox, this_actor
+
+def bob():
+ mbox = Mailbox.by_name("mbox")
+ disk = Host.current().get_disks()[0]
+
+ this_actor.info("Create my asynchronous activities")
+ exec = this_actor.exec_async(5e9)
+ comm = mbox.get_async()
+ io = disk.read_async(300000000)
+
+ pending_activities = ActivitySet([exec, comm])
+ pending_activities.push(io) # Activities can be pushed after creation, too
+
+ this_actor.info("Wait for asynchronous activities to complete, all in one shot.")
+ pending_activities.wait_all()
+
+ this_actor.info("All activities are completed.")
+
+def alice():
+ this_actor.info("Send 'Message'")
+ Mailbox.by_name("mbox").put("Message", 600000000)
+
+if __name__ == '__main__':
+ e = Engine(sys.argv)
+ e.set_log_control("root.fmt:[%4.6r]%e[%5a]%e%m%n")
+
+ # Load the platform description
+ e.load_platform(sys.argv[1])
+
+ Actor.create("bob", Host.by_name("bob"), bob)
+ Actor.create("alice", Host.by_name("alice"), alice)
+
+ e.run()
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${srcdir:=.}/activityset-waitall.py ${platfdir}/hosts_with_disks.xml
+> [0.000000] [alice] Send 'Message'
+> [0.000000] [ bob] Create my asynchronous activities
+> [0.000000] [ bob] Wait for asynchronous activities to complete, all in one shot.
+> [5.197828] [ bob] All activities are completed.
--- /dev/null
+# Copyright (c) 2017-2023. The SimGrid Team. All rights reserved.
+#
+# This program is free software you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+"""
+Usage: activityset-waitallfor.py platform_file [other parameters]
+"""
+
+import sys
+from simgrid import Actor, ActivitySet, Engine, Comm, Exec, Io, Host, Mailbox, this_actor, TimeoutException
+
+def bob():
+ mbox = Mailbox.by_name("mbox")
+ disk = Host.current().get_disks()[0]
+
+ this_actor.info("Create my asynchronous activities")
+ exec = this_actor.exec_async(5e9)
+ comm = mbox.get_async()
+ io = disk.read_async(300000000)
+
+ pending_activities = ActivitySet([exec, comm])
+ pending_activities.push(io) # Activities can be pushed after creation, too
+
+ this_actor.info("Wait for asynchronous activities to complete")
+ while not pending_activities.empty():
+ try:
+ pending_activities.wait_all_for(1)
+ except TimeoutException:
+ this_actor.info("Not all activities are terminated yet.")
+
+ completed_one = pending_activities.test_any()
+ while completed_one != None:
+ if isinstance(completed_one, Comm):
+ this_actor.info("Completed a Comm")
+ elif isinstance(completed_one, Exec):
+ this_actor.info("Completed an Exec")
+ elif isinstance(completed_one, Io):
+ this_actor.info("Completed an I/O")
+ completed_one = pending_activities.test_any()
+
+ this_actor.info("Last activity is complete")
+
+def alice():
+ this_actor.info("Send 'Message'")
+ Mailbox.by_name("mbox").put("Message", 600000000)
+
+if __name__ == '__main__':
+ e = Engine(sys.argv)
+ e.set_log_control("root.fmt:[%4.6r]%e[%5a]%e%m%n")
+
+ # Load the platform description
+ e.load_platform(sys.argv[1])
+
+ Actor.create("bob", Host.by_name("bob"), bob)
+ Actor.create("alice", Host.by_name("alice"), alice)
+
+ e.run()
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${srcdir:=.}/activityset-waitallfor.py ${platfdir}/hosts_with_disks.xml
+> [0.000000] [alice] Send 'Message'
+> [0.000000] [ bob] Create my asynchronous activities
+> [0.000000] [ bob] Wait for asynchronous activities to complete
+> [1.000000] [ bob] Not all activities are terminated yet.
+> [2.000000] [ bob] Not all activities are terminated yet.
+> [3.000000] [ bob] Not all activities are terminated yet.
+> [3.000000] [ bob] Completed an I/O
+> [4.000000] [ bob] Not all activities are terminated yet.
+> [5.000000] [ bob] Not all activities are terminated yet.
+> [5.000000] [ bob] Completed an Exec
+> [5.197828] [ bob] Completed a Comm
+> [5.197828] [ bob] Last activity is complete
--- /dev/null
+# Copyright (c) 2017-2023. The SimGrid Team. All rights reserved.
+#
+# This program is free software you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+"""
+Usage: activityset-waitany.py platform_file [other parameters]
+"""
+
+import sys
+from simgrid import Actor, ActivitySet, Engine, Comm, Exec, Io, Host, Mailbox, this_actor
+
+def bob():
+ mbox = Mailbox.by_name("mbox")
+ disk = Host.current().get_disks()[0]
+
+ this_actor.info("Create my asynchronous activities")
+ exec = this_actor.exec_async(5e9)
+ comm = mbox.get_async()
+ io = disk.read_async(300000000)
+
+ pending_activities = ActivitySet([exec, comm])
+ pending_activities.push(io) # Activities can be pushed after creation, too
+
+ this_actor.info("Wait for asynchronous activities to complete")
+ while not pending_activities.empty():
+ completed_one = pending_activities.wait_any()
+
+ if isinstance(completed_one, Comm):
+ this_actor.info("Completed a Comm")
+ elif isinstance(completed_one, Exec):
+ this_actor.info("Completed an Exec")
+ elif isinstance(completed_one, Io):
+ this_actor.info("Completed an I/O")
+
+ this_actor.info("Last activity is complete")
+
+def alice():
+ this_actor.info("Send 'Message'")
+ Mailbox.by_name("mbox").put("Message", 600000000)
+
+if __name__ == '__main__':
+ e = Engine(sys.argv)
+ e.set_log_control("root.fmt:[%4.6r]%e[%5a]%e%m%n")
+
+ # Load the platform description
+ e.load_platform(sys.argv[1])
+
+ Actor.create("bob", Host.by_name("bob"), bob)
+ Actor.create("alice", Host.by_name("alice"), alice)
+
+ e.run()
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${srcdir:=.}/activityset-waitany.py ${platfdir}/hosts_with_disks.xml
+> [0.000000] [alice] Send 'Message'
+> [0.000000] [ bob] Create my asynchronous activities
+> [0.000000] [ bob] Wait for asynchronous activities to complete
+> [3.000000] [ bob] Completed an I/O
+> [5.000000] [ bob] Completed an Exec
+> [5.197828] [ bob] Completed a Comm
+> [5.197828] [ bob] Last activity is complete
# Actors that are created as object will execute their __call__ method.
# So, the following constitutes the main function of the Sender actor.
def __call__(self):
- pending_comms = []
+ pending_comms = simgrid.ActivitySet()
mboxes = []
for host in self.hosts:
msg = "Hello, I'm alive and running on " + simgrid.this_actor.get_host().name
mbox = simgrid.Mailbox.by_name(host.name)
mboxes.append(mbox)
- pending_comms.append(mbox.put_async(msg, self.msg_size))
+ pending_comms.push(mbox.put_async(msg, self.msg_size))
simgrid.this_actor.info("Done dispatching all messages")
# Now that all message exchanges were initiated, wait for their completion in one single call
- simgrid.Comm.wait_all(pending_comms)
+ pending_comms.wait_all()
simgrid.this_actor.info("Goodbye now!")
import sys
-from simgrid import Engine, Actor, Comm, NetZone, Link, LinkInRoute, Mailbox, this_actor, NetworkFailureException
+from simgrid import Engine, Actor, ActivitySet, Comm, NetZone, Link, LinkInRoute, Mailbox, this_actor, NetworkFailureException
def sender(mailbox1_name: str, mailbox2_name: str) -> None:
comm2: Comm = mailbox2.put_async(666, 2)
this_actor.info("Calling wait_any..")
- pending_comms = [comm1, comm2]
+ pending_comms = ActivitySet([comm1, comm2])
try:
- index = Comm.wait_any([comm1, comm2])
- this_actor.info(f"Wait any returned index {index} (comm to {pending_comms[index].mailbox.name})")
+ comm = pending_comms.wait_any()
+ this_actor.info(f"Wait any returned a comm to {comm.mailbox.name})")
except NetworkFailureException:
this_actor.info("Sender has experienced a network failure exception, so it knows that something went wrong")
this_actor.info("Now it needs to figure out which of the two comms failed by looking at their state:")
this_actor.info(f"Waiting on a FAILED comm raises an exception: '{err}'")
this_actor.info("Wait for remaining comm, just to be nice")
- pending_comms.pop(0)
try:
- Comm.wait_any(pending_comms)
+ pending_comms.wait_all()
except Exception as e:
this_actor.warning(str(e))
host2 = zone.create_host("Host2", "1f")
host3 = zone.create_host("Host3", "1f")
- link_to_2 = LinkInRoute(zone.create_link("link_to_2", "1bps").seal())
- link_to_3 = LinkInRoute(zone.create_link("link_to_3", "1bps").seal())
+ link_to_2 = zone.create_link("link_to_2", "1bps").seal()
+ link_to_3 = zone.create_link("link_to_3", "1bps").seal()
- zone.add_route(host1.netpoint, host2.netpoint, None, None, [link_to_2], False)
- zone.add_route(host1.netpoint, host3.netpoint, None, None, [link_to_3], False)
+ zone.add_route(host1, host2, [link_to_2])
+ zone.add_route(host1, host3, [link_to_3])
zone.seal()
Actor.create("Sender", host1, sender, "mailbox2", "mailbox3")
> [ 10.000000] (1:Sender@Host1) Comm to mailbox3 has state: STARTED
> [ 10.000000] (1:Sender@Host1) Waiting on a FAILED comm raises an exception: 'Cannot wait for a failed communication'
> [ 10.000000] (1:Sender@Host1) Wait for remaining comm, just to be nice
-> [ 16.494845] (3:Receiver-2@Host3) Receiver has received successfully (mailbox3)!
+> [ 17.319588] (3:Receiver-2@Host3) Receiver has received successfully (mailbox3)!
from typing import List
import sys
-from simgrid import Actor, Comm, Engine, Mailbox, this_actor
+from simgrid import Actor, ActivitySet, Comm, Engine, Mailbox, this_actor
FINALIZE_MESSAGE = "finalize"
def peer(my_id: int, message_count: int, payload_size: int, peers_count: int):
my_mailbox: Mailbox = get_peer_mailbox(my_id)
my_mailbox.set_receiver(Actor.self())
- pending_comms: List[Comm] = []
+ pending_comms = ActivitySet()
# Start dispatching all messages to peers others that myself
for i in range(message_count):
for peer_id in range(peers_count):
peer_mailbox = get_peer_mailbox(peer_id)
message = f"Message {i} from peer {my_id}"
this_actor.info(f"Send '{message}' to '{peer_mailbox.name}'")
- pending_comms.append(peer_mailbox.put_async(message, payload_size))
+ pending_comms.push(peer_mailbox.put_async(message, payload_size))
# Start sending messages to let peers know that they should stop
for peer_id in range(peers_count):
if peer_id != my_id:
peer_mailbox = get_peer_mailbox(peer_id)
payload = str(FINALIZE_MESSAGE)
- pending_comms.append(peer_mailbox.put_async(payload, payload_size))
+ pending_comms.push(peer_mailbox.put_async(payload, payload_size))
this_actor.info(f"Send '{payload}' to '{peer_mailbox.name}'")
this_actor.info("Done dispatching all messages")
this_actor.sleep_for(0.01)
this_actor.info("I'm done, just waiting for my peers to receive the messages before exiting")
- Comm.wait_all(pending_comms)
+ pending_comms.wait_all()
this_actor.info("Goodbye now!")
+++ /dev/null
-# Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.
-#
-# This program is free software; you can redistribute it and/or modify it
-# under the terms of the license (GNU LGPL) which comes with this package.
-
-from argparse import ArgumentParser
-from typing import List
-import sys
-
-from simgrid import Engine, Actor, Comm, Mailbox, this_actor
-
-
-def create_parser() -> ArgumentParser:
- parser = ArgumentParser()
- parser.add_argument(
- '--platform',
- type=str,
- required=True,
- help='path to the platform description'
- )
- return parser
-
-
-def rank0():
- rank0_mailbox: Mailbox = Mailbox.by_name("rank0")
- this_actor.info("Post my asynchronous receives")
- comm1, a1 = rank0_mailbox.get_async()
- comm2, a2 = rank0_mailbox.get_async()
- comm3, a3 = rank0_mailbox.get_async()
- pending_comms: List[Comm] = [comm1, comm2, comm3]
-
- this_actor.info("Send some data to rank-1")
- rank1_mailbox: Mailbox = Mailbox.by_name("rank1")
- for i in range(3):
- rank1_mailbox.put(i, 1)
-
- this_actor.info("Test for completed comms")
- while pending_comms:
- flag = Comm.test_any(pending_comms)
- if flag != -1:
- pending_comms.pop(flag)
- this_actor.info("Remove a pending comm.")
- else:
- # Nothing matches, wait for a little bit
- this_actor.sleep_for(0.1)
- this_actor.info("Last comm is complete")
-
-
-def rank1():
- rank0_mailbox: Mailbox = Mailbox.by_name("rank0")
- rank1_mailbox: Mailbox = Mailbox.by_name("rank1")
- for i in range(3):
- data: int = rank1_mailbox.get()
- this_actor.info(f"Received {data}")
- msg_content = f"Message {i}"
- this_actor.info(f"Send '{msg_content}'")
- rank0_mailbox.put(msg_content, int(1e6))
-
-
-def main():
- settings = create_parser().parse_known_args()[0]
- e = Engine(sys.argv)
- e.load_platform(settings.platform)
-
- Actor.create("rank0", e.host_by_name("Tremblay"), rank0)
- Actor.create("rank1", e.host_by_name("Fafard"), rank1)
-
- e.run()
-
-
-if __name__ == "__main__":
- main()
+++ /dev/null
-#!/usr/bin/env tesh
-
-$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-testany.py --platform ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
->[ 0.000000] (1:rank0@Tremblay) Post my asynchronous receives
->[ 0.000000] (1:rank0@Tremblay) Send some data to rank-1
->[ 0.025708] (2:rank1@Fafard) Received 0
->[ 0.025708] (2:rank1@Fafard) Send 'Message 0'
->[ 0.209813] (2:rank1@Fafard) Received 1
->[ 0.209813] (2:rank1@Fafard) Send 'Message 1'
->[ 0.393918] (1:rank0@Tremblay) Test for completed comms
->[ 0.393918] (2:rank1@Fafard) Received 2
->[ 0.393918] (2:rank1@Fafard) Send 'Message 2'
->[ 0.393918] (1:rank0@Tremblay) Remove a pending comm.
->[ 0.393918] (1:rank0@Tremblay) Remove a pending comm.
->[ 0.593918] (1:rank0@Tremblay) Remove a pending comm.
->[ 0.593918] (1:rank0@Tremblay) Last comm is complete
+++ /dev/null
-# Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.
-#
-# This program is free software; you can redistribute it and/or modify it
-# under the terms of the license (GNU LGPL) which comes with this package.
-
-"""
-This example shows how to block on the completion of a set of communications.
-
-As for the other asynchronous examples, the sender initiate all the messages it wants to send and
-pack the resulting simgrid.Comm objects in a list. All messages thus occur concurrently.
-
-The sender then blocks until all ongoing communication terminate, using simgrid.Comm.wait_all()
-"""
-
-import sys
-from simgrid import Actor, Comm, Engine, Host, Mailbox, this_actor
-
-
-def sender(messages_count, msg_size, receivers_count):
- # List in which we store all ongoing communications
- pending_comms = []
-
- # Vector of the used mailboxes
- mboxes = [Mailbox.by_name("receiver-{:d}".format(i))
- for i in range(0, receivers_count)]
-
- # Start dispatching all messages to receivers, in a round robin fashion
- for i in range(0, messages_count):
- content = "Message {:d}".format(i)
- mbox = mboxes[i % receivers_count]
-
- this_actor.info("Send '{:s}' to '{:s}'".format(content, str(mbox)))
-
- # Create a communication representing the ongoing communication, and store it in pending_comms
- comm = mbox.put_async(content, msg_size)
- pending_comms.append(comm)
-
- # Start sending messages to let the workers know that they should stop
- for i in range(0, receivers_count):
- mbox = mboxes[i]
- this_actor.info("Send 'finalize' to '{:s}'".format(str(mbox)))
- comm = mbox.put_async("finalize", 0)
- pending_comms.append(comm)
-
- this_actor.info("Done dispatching all messages")
-
- # Now that all message exchanges were initiated, wait for their completion in one single call
- Comm.wait_all(pending_comms)
-
- this_actor.info("Goodbye now!")
-
-
-def receiver(my_id):
- mbox = Mailbox.by_name("receiver-{:d}".format(my_id))
-
- this_actor.info("Wait for my first message")
- while True:
- received = mbox.get()
- this_actor.info("I got a '{:s}'.".format(received))
- if received == "finalize":
- break # If it's a finalize message, we're done.
-
-
-if __name__ == '__main__':
- e = Engine(sys.argv)
-
- # Load the platform description
- e.load_platform(sys.argv[1])
-
- Actor.create("sender", Host.by_name("Tremblay"), sender, 5, 1000000, 2)
- Actor.create("receiver", Host.by_name("Ruby"), receiver, 0)
- Actor.create("receiver", Host.by_name("Perl"), receiver, 1)
-
- e.run()
+++ /dev/null
-#!/usr/bin/env tesh
-
-$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitall.py ${platfdir}/small_platform_fatpipe.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
-> [ 0.000000] (2:receiver@Ruby) Wait for my first message
-> [ 0.000000] (3:receiver@Perl) Wait for my first message
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 0' to 'Mailbox(receiver-0)'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 1' to 'Mailbox(receiver-1)'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 2' to 'Mailbox(receiver-0)'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 3' to 'Mailbox(receiver-1)'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 4' to 'Mailbox(receiver-0)'
-> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'Mailbox(receiver-0)'
-> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'Mailbox(receiver-1)'
-> [ 0.000000] (1:sender@Tremblay) Done dispatching all messages
-> [ 0.004022] (2:receiver@Ruby) I got a 'Message 0'.
-> [ 0.004022] (3:receiver@Perl) I got a 'Message 1'.
-> [ 0.008043] (2:receiver@Ruby) I got a 'Message 2'.
-> [ 0.008043] (3:receiver@Perl) I got a 'Message 3'.
-> [ 0.009995] (3:receiver@Perl) I got a 'finalize'.
-> [ 0.012065] (2:receiver@Ruby) I got a 'Message 4'.
-> [ 0.014016] (2:receiver@Ruby) I got a 'finalize'.
-> [ 0.014016] (1:sender@Tremblay) Goodbye now!
+++ /dev/null
-# Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.
-#
-# This program is free software; you can redistribute it and/or modify it
-# under the terms of the license (GNU LGPL) which comes with this package.
-
-"""
-This example implements the following scenario:
-- Multiple workers consume jobs (Job) from a shared mailbox (worker)
-- A client first dispatches several jobs (with a simulated 'cost' - i.e. time the worker will 'process' the job)
-- The client then waits for all job results for a maximum time set by the 'wait timeout' (Comm.wait_all_for)
-- The client then displays the status of individual jobs.
-"""
-
-
-from argparse import ArgumentParser
-from dataclasses import dataclass
-from typing import List
-from uuid import uuid4
-import sys
-
-from simgrid import Actor, Comm, Engine, Host, Mailbox, PyGetAsync, this_actor
-
-
-SIMULATED_JOB_SIZE_BYTES = 1024
-SIMULATED_RESULT_SIZE_BYTES = 1024 * 1024
-
-
-def parse_requests(requests_str: str) -> List[float]:
- return [float(item.strip()) for item in requests_str.split(",")]
-
-
-def create_parser() -> ArgumentParser:
- parser = ArgumentParser()
- parser.add_argument(
- '--platform',
- type=str,
- required=True,
- help='path to the platform description'
- )
- parser.add_argument(
- "--workers",
- type=int,
- default=1,
- help="number of worker actors to start"
- )
- parser.add_argument(
- "--jobs",
- type=parse_requests,
- default="1,2,3,4,5",
- help="duration of individual jobs sent to the workers by the client"
- )
- parser.add_argument(
- "--wait-timeout",
- type=float,
- default=5.0,
- help="number of seconds before the client gives up waiting for results and aborts the simulation"
- )
- return parser
-
-
-@dataclass
-class Job:
- job_id: str
- duration: float
- result_mailbox: Mailbox
-
-
-def worker(worker_id: str):
- this_actor.info(f"{worker_id} started")
- mailbox: Mailbox = Mailbox.by_name("worker")
- while True:
- job: Job = mailbox.get()
- this_actor.info(f"{worker_id} working on {job.job_id} (will take {job.duration}s to complete)")
- this_actor.sleep_for(job.duration)
- job.result_mailbox.put(f"{worker_id}", SIMULATED_RESULT_SIZE_BYTES)
-
-
-@dataclass
-class AsyncJobResult:
- job: Job
- result_comm: Comm
- async_data: PyGetAsync
-
- @property
- def complete(self) -> bool:
- return self.result_comm.test()
-
- @property
- def status(self) -> str:
- return "complete" if self.complete else "pending"
-
-
-def client(client_id: str, jobs: List[float], wait_timeout: float):
- worker_mailbox: Mailbox = Mailbox.by_name("worker")
- this_actor.info(f"{client_id} started")
- async_job_results: list[AsyncJobResult] = []
- for job_idx, job_duration in enumerate(jobs):
- result_mailbox: Mailbox = Mailbox.by_name(str(uuid4()))
- job = Job(job_id=f"job-{job_idx}", duration=job_duration, result_mailbox=result_mailbox)
- out_comm = worker_mailbox.put_init(Job(
- job_id=f"job-{job_idx}",
- duration=job_duration,
- result_mailbox=result_mailbox
- ), SIMULATED_JOB_SIZE_BYTES)
- out_comm.detach()
- result_comm, async_data = result_mailbox.get_async()
- async_job_results.append(AsyncJobResult(
- job=job,
- result_comm=result_comm,
- async_data=async_data
- ))
- this_actor.info(f"awaiting results for all jobs (timeout={wait_timeout}s)")
- completed_comms = Comm.wait_all_for([entry.result_comm for entry in async_job_results], wait_timeout)
- logger = this_actor.warning if completed_comms < len(async_job_results) else this_actor.info
- logger(f"received {completed_comms}/{len(async_job_results)} results")
- for result in async_job_results:
- this_actor.info(f"{result.job.job_id}"
- f" status={result.status}"
- f" result_payload={result.async_data.get() if result.complete else ''}")
-
-
-def main():
- settings = create_parser().parse_known_args()[0]
- e = Engine(sys.argv)
- e.load_platform(settings.platform)
- Actor.create("client", Host.by_name("Tremblay"), client, "client", settings.jobs, settings.wait_timeout)
- for worker_idx in range(settings.workers):
- Actor.create("worker", Host.by_name("Ruby"), worker, f"worker-{worker_idx}").daemonize()
- e.run()
-
-
-if __name__ == "__main__":
- main()
+++ /dev/null
-#!/usr/bin/env tesh
-
-p Testing Comm.wait_all_for()
-
-$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitallfor.py --platform ${platfdir}/small_platform_fatpipe.xml --workers 1 --wait-timeout 1 --jobs 1,2,3,4,5 "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
->[ 0.000000] (2:worker@Ruby) worker-0 started
->[ 0.000000] (1:client@Tremblay) client started
->[ 0.000000] (1:client@Tremblay) awaiting results for all jobs (timeout=1.0s)
->[ 0.001954] (2:worker@Ruby) worker-0 working on job-0 (will take 1.0s to complete)
->[ 1.000000] (1:client@Tremblay) received 0/5 results
->[ 1.000000] (1:client@Tremblay) job-0 status=pending result_payload=
->[ 1.000000] (1:client@Tremblay) job-1 status=pending result_payload=
->[ 1.000000] (1:client@Tremblay) job-2 status=pending result_payload=
->[ 1.000000] (1:client@Tremblay) job-3 status=pending result_payload=
->[ 1.000000] (1:client@Tremblay) job-4 status=pending result_payload=
-
-$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitallfor.py --platform ${platfdir}/small_platform_fatpipe.xml --workers 1 --wait-timeout 5 --jobs 1,2,3,4,5 "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
->[ 0.000000] (2:worker@Ruby) worker-0 started
->[ 0.000000] (1:client@Tremblay) client started
->[ 0.000000] (1:client@Tremblay) awaiting results for all jobs (timeout=5.0s)
->[ 0.001954] (2:worker@Ruby) worker-0 working on job-0 (will take 1.0s to complete)
->[ 1.008029] (2:worker@Ruby) worker-0 working on job-1 (will take 2.0s to complete)
->[ 3.014105] (2:worker@Ruby) worker-0 working on job-2 (will take 3.0s to complete)
->[ 5.000000] (1:client@Tremblay) received 2/5 results
->[ 5.000000] (1:client@Tremblay) job-0 status=complete result_payload=worker-0
->[ 5.000000] (1:client@Tremblay) job-1 status=complete result_payload=worker-0
->[ 5.000000] (1:client@Tremblay) job-2 status=pending result_payload=
->[ 5.000000] (1:client@Tremblay) job-3 status=pending result_payload=
->[ 5.000000] (1:client@Tremblay) job-4 status=pending result_payload=
-
-$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitallfor.py --platform ${platfdir}/small_platform_fatpipe.xml --workers 1 --wait-timeout -1 --jobs 1,2,3,4,5 "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
->[ 0.000000] (2:worker@Ruby) worker-0 started
->[ 0.000000] (1:client@Tremblay) client started
->[ 0.000000] (1:client@Tremblay) awaiting results for all jobs (timeout=-1.0s)
->[ 0.001954] (2:worker@Ruby) worker-0 working on job-0 (will take 1.0s to complete)
->[ 1.008029] (2:worker@Ruby) worker-0 working on job-1 (will take 2.0s to complete)
->[ 3.014105] (2:worker@Ruby) worker-0 working on job-2 (will take 3.0s to complete)
->[ 6.020181] (2:worker@Ruby) worker-0 working on job-3 (will take 4.0s to complete)
->[ 10.026257] (2:worker@Ruby) worker-0 working on job-4 (will take 5.0s to complete)
->[ 15.030379] (1:client@Tremblay) received 5/5 results
->[ 15.030379] (1:client@Tremblay) job-0 status=complete result_payload=worker-0
->[ 15.030379] (1:client@Tremblay) job-1 status=complete result_payload=worker-0
->[ 15.030379] (1:client@Tremblay) job-2 status=complete result_payload=worker-0
->[ 15.030379] (1:client@Tremblay) job-3 status=complete result_payload=worker-0
->[ 15.030379] (1:client@Tremblay) job-4 status=complete result_payload=worker-0
-
-$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitallfor.py --platform ${platfdir}/small_platform_fatpipe.xml --workers 5 --wait-timeout 3 --jobs 1,2,3,4,5 "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
->[ 0.000000] (2:worker@Ruby) worker-0 started
->[ 0.000000] (3:worker@Ruby) worker-1 started
->[ 0.000000] (4:worker@Ruby) worker-2 started
->[ 0.000000] (5:worker@Ruby) worker-3 started
->[ 0.000000] (6:worker@Ruby) worker-4 started
->[ 0.000000] (1:client@Tremblay) client started
->[ 0.000000] (1:client@Tremblay) awaiting results for all jobs (timeout=3.0s)
->[ 0.001954] (6:worker@Ruby) worker-4 working on job-4 (will take 5.0s to complete)
->[ 0.001954] (5:worker@Ruby) worker-3 working on job-3 (will take 4.0s to complete)
->[ 0.001954] (4:worker@Ruby) worker-2 working on job-2 (will take 3.0s to complete)
->[ 0.001954] (3:worker@Ruby) worker-1 working on job-1 (will take 2.0s to complete)
->[ 0.001954] (2:worker@Ruby) worker-0 working on job-0 (will take 1.0s to complete)
->[ 3.000000] (1:client@Tremblay) received 2/5 results
->[ 3.000000] (1:client@Tremblay) job-0 status=complete result_payload=worker-0
->[ 3.000000] (1:client@Tremblay) job-1 status=complete result_payload=worker-1
->[ 3.000000] (1:client@Tremblay) job-2 status=pending result_payload=
->[ 3.000000] (1:client@Tremblay) job-3 status=pending result_payload=
->[ 3.000000] (1:client@Tremblay) job-4 status=pending result_payload=
-
-$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitallfor.py --platform ${platfdir}/small_platform_fatpipe.xml --workers 5 --wait-timeout -1 --jobs 5,10,5,20,5,40,5,80,5,160 "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
->[ 0.000000] (2:worker@Ruby) worker-0 started
->[ 0.000000] (3:worker@Ruby) worker-1 started
->[ 0.000000] (4:worker@Ruby) worker-2 started
->[ 0.000000] (5:worker@Ruby) worker-3 started
->[ 0.000000] (6:worker@Ruby) worker-4 started
->[ 0.000000] (1:client@Tremblay) client started
->[ 0.000000] (1:client@Tremblay) awaiting results for all jobs (timeout=-1.0s)
->[ 0.001954] (6:worker@Ruby) worker-4 working on job-4 (will take 5.0s to complete)
->[ 0.001954] (5:worker@Ruby) worker-3 working on job-3 (will take 20.0s to complete)
->[ 0.001954] (4:worker@Ruby) worker-2 working on job-2 (will take 5.0s to complete)
->[ 0.001954] (3:worker@Ruby) worker-1 working on job-1 (will take 10.0s to complete)
->[ 0.001954] (2:worker@Ruby) worker-0 working on job-0 (will take 5.0s to complete)
->[ 5.008029] (2:worker@Ruby) worker-0 working on job-7 (will take 80.0s to complete)
->[ 5.008029] (4:worker@Ruby) worker-2 working on job-6 (will take 5.0s to complete)
->[ 5.008029] (6:worker@Ruby) worker-4 working on job-5 (will take 40.0s to complete)
->[ 10.008029] (3:worker@Ruby) worker-1 working on job-8 (will take 5.0s to complete)
->[ 10.014105] (4:worker@Ruby) worker-2 working on job-9 (will take 160.0s to complete)
->[170.018227] (1:client@Tremblay) received 10/10 results
->[170.018227] (1:client@Tremblay) job-0 status=complete result_payload=worker-0
->[170.018227] (1:client@Tremblay) job-1 status=complete result_payload=worker-1
->[170.018227] (1:client@Tremblay) job-2 status=complete result_payload=worker-2
->[170.018227] (1:client@Tremblay) job-3 status=complete result_payload=worker-3
->[170.018227] (1:client@Tremblay) job-4 status=complete result_payload=worker-4
->[170.018227] (1:client@Tremblay) job-5 status=complete result_payload=worker-4
->[170.018227] (1:client@Tremblay) job-6 status=complete result_payload=worker-2
->[170.018227] (1:client@Tremblay) job-7 status=complete result_payload=worker-0
->[170.018227] (1:client@Tremblay) job-8 status=complete result_payload=worker-1
->[170.018227] (1:client@Tremblay) job-9 status=complete result_payload=worker-2
+++ /dev/null
-# Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.
-#
-# This program is free software; you can redistribute it and/or modify it
-# under the terms of the license (GNU LGPL) which comes with this package.
-
-"""
-This example shows how to block on the completion of a set of communications.
-
-As for the other asynchronous examples, the sender initiate all the messages it wants to send and
-pack the resulting simgrid.Comm objects in a list. All messages thus occur concurrently.
-
-The sender then loops until there is no ongoing communication. Using wait_any() ensures that the sender
-will notice events as soon as they occur even if it does not follow the order of the container.
-
-Here, finalize messages will terminate earlier because their size is 0, so they travel faster than the
-other messages of this application. As expected, the trace shows that the finalize of worker 1 is
-processed before 'Message 5' that is sent to worker 0.
-"""
-
-import sys
-from simgrid import Actor, Comm, Engine, Host, Mailbox, this_actor
-
-
-def sender(messages_count, msg_size, receivers_count):
- # List in which we store all ongoing communications
- pending_comms = []
-
- # Vector of the used mailboxes
- mboxes = [Mailbox.by_name("receiver-{:d}".format(i))
- for i in range(0, receivers_count)]
-
- # Start dispatching all messages to receivers, in a round robin fashion
- for i in range(0, messages_count):
- content = "Message {:d}".format(i)
- mbox = mboxes[i % receivers_count]
-
- this_actor.info("Send '{:s}' to '{:s}'".format(content, str(mbox)))
-
- # Create a communication representing the ongoing communication, and store it in pending_comms
- comm = mbox.put_async(content, msg_size)
- pending_comms.append(comm)
-
- # Start sending messages to let the workers know that they should stop
- for i in range(0, receivers_count):
- mbox = mboxes[i]
- this_actor.info("Send 'finalize' to '{:s}'".format(str(mbox)))
- comm = mbox.put_async("finalize", 0)
- pending_comms.append(comm)
-
- this_actor.info("Done dispatching all messages")
-
- # Now that all message exchanges were initiated, wait for their completion, in order of completion.
- #
- # This loop waits for first terminating message with wait_any() and remove it with del, until all comms are
- # terminated.
- # Even in this simple example, the pending comms do not terminate in the exact same order of creation.
- while pending_comms:
- changed_pos = Comm.wait_any(pending_comms)
- del pending_comms[changed_pos]
- if changed_pos != 0:
- this_actor.info(
- "Remove the {:d}th pending comm: it terminated earlier than another comm that was initiated first."
- .format(changed_pos))
-
- this_actor.info("Goodbye now!")
-
-
-def receiver(my_id):
- mbox = Mailbox.by_name("receiver-{:d}".format(my_id))
- this_actor.info("Wait for my first message")
- while True:
- received = mbox.get()
- this_actor.info("I got a '{:s}'.".format(received))
- if received == "finalize":
- break # If it's a finalize message, we're done.
-
-if __name__ == '__main__':
- e = Engine(sys.argv)
-
- # Load the platform description
- e.load_platform(sys.argv[1])
-
- Actor.create("sender", Host.by_name("Tremblay"), sender, 6, 1000000, 2)
- Actor.create("receiver", Host.by_name("Fafard"), receiver, 0)
- Actor.create("receiver", Host.by_name("Jupiter"), receiver, 1)
-
- e.run()
+++ /dev/null
-#!/usr/bin/env tesh
-
-p Testing Comm.wait_any()
-
-! output sort 19
-$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitany.py ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 0' to 'Mailbox(receiver-0)'
-> [ 0.000000] (2:receiver@Fafard) Wait for my first message
-> [ 0.000000] (3:receiver@Jupiter) Wait for my first message
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 1' to 'Mailbox(receiver-1)'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 2' to 'Mailbox(receiver-0)'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 3' to 'Mailbox(receiver-1)'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 4' to 'Mailbox(receiver-0)'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 5' to 'Mailbox(receiver-1)'
-> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'Mailbox(receiver-0)'
-> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'Mailbox(receiver-1)'
-> [ 0.000000] (1:sender@Tremblay) Done dispatching all messages
-> [ 0.158397] (2:receiver@Fafard) I got a 'Message 0'.
-> [ 0.169155] (3:receiver@Jupiter) I got a 'Message 1'.
-> [ 0.316794] (2:receiver@Fafard) I got a 'Message 2'.
-> [ 0.338309] (3:receiver@Jupiter) I got a 'Message 3'.
-> [ 0.475190] (2:receiver@Fafard) I got a 'Message 4'.
-> [ 0.500898] (2:receiver@Fafard) I got a 'finalize'.
-> [ 0.500898] (1:sender@Tremblay) Remove the 1th pending comm: it terminated earlier than another comm that was initiated first.
-> [ 0.507464] (3:receiver@Jupiter) I got a 'Message 5'.
-> [ 0.526478] (3:receiver@Jupiter) I got a 'finalize'.
-> [ 0.526478] (1:sender@Tremblay) Goodbye now!
import functools
import sys
-from simgrid import Actor, Engine, Comm, Mailbox, NetZone, Link, LinkInRoute, this_actor
+from simgrid import Actor, ActivitySet, Engine, Comm, Mailbox, NetZone, Link, LinkInRoute, this_actor
class Sender:
"""
# Actors that are created as object will execute their __call__ method.
# So, the following constitutes the main function of the Sender actor.
def __call__(self):
- pending_comms = []
+ pending_comms = ActivitySet()
mbox = Mailbox.by_name("receiver")
for i in range(self.msg_count):
size = self.msg_size * (i + 1)
this_actor.info("Send '%s' to '%s, msg size: %d'" % (msg, mbox.name, size))
comm = mbox.put_async(msg, size)
- pending_comms.append(comm)
+ pending_comms.push(comm)
this_actor.info("Done dispatching all messages")
# Now that all message exchanges were initiated, wait for their completion in one single call
- Comm.wait_all(pending_comms)
+ pending_comms.wait_all()
this_actor.info("Goodbye now!")
def __call__(self):
mbox = Mailbox.by_name("receiver")
- pending_msgs = []
- pending_comms = []
+ pending_comms = ActivitySet()
this_actor.info("Wait for %d messages asynchronously" % self.msg_count)
for _ in range(self.msg_count):
- comm, data = mbox.get_async()
- pending_comms.append(comm)
- pending_msgs.append(data)
+ comm = mbox.get_async()
+ pending_comms.push(comm)
- while pending_comms:
- index = Comm.wait_any(pending_comms)
- msg = pending_msgs[index].get()
- this_actor.info("I got '%s'." % msg)
- del pending_comms[index]
- del pending_msgs[index]
+ while not pending_comms.empty():
+ comm = pending_comms.wait_any()
+ this_actor.info("I got '%s'." % comm.get_payload())
####################################################################################################
def link_nonlinear(link: Link, capacity: float, n: int) -> float:
link.set_latency(10e-6).seal()
# create routes between nodes
- zone.add_route(sender.netpoint, receiver.netpoint, None, None,
- [LinkInRoute(link, LinkInRoute.Direction.UP)], True)
+ zone.add_route(sender, receiver, [link])
zone.seal()
# create actors Sender/Receiver
from typing import List, Tuple
import sys
-from simgrid import Engine, Actor, Comm, Host, LinkInRoute, Mailbox, NetZone, this_actor, PyGetAsync
+from simgrid import Engine, Actor, ActivitySet, Comm, Host, LinkInRoute, Mailbox, NetZone, this_actor
RECEIVER_MAILBOX_NAME = "receiver"
def __call__(self) -> None:
# List in which we store all ongoing communications
- pending_comms: List[Comm] = []
+ pending_comms = ActivitySet()
# Make a vector of the mailboxes to use
receiver_mailbox: Mailbox = Mailbox.by_name(RECEIVER_MAILBOX_NAME)
message_content = f"Message {i}"
this_actor.info(f"Send '{message_content}' to '{receiver_mailbox.name}'")
# Create a communication representing the ongoing communication, and store it in pending_comms
- pending_comms.append(receiver_mailbox.put_async(message_content, self.message_size))
+ pending_comms.push(receiver_mailbox.put_async(message_content, self.message_size))
this_actor.info("Done dispatching all messages")
# Now that all message exchanges were initiated, wait for their completion in one single call
- Comm.wait_all(pending_comms)
+ pending_comms.wait_all()
this_actor.info("Goodbye now!")
def __call__(self):
# List in which we store all incoming msgs
- pending_comms: List[Tuple[Comm, PyGetAsync]] = []
+ pending_comms = ActivitySet()
this_actor.info(f"Wait for {self.messages_count} messages asynchronously")
for _ in range(self.messages_count):
- pending_comms.append(self.mailbox.get_async())
- while pending_comms:
- index = Comm.wait_any([comm for (comm, _) in pending_comms])
- _, async_data = pending_comms[index]
- this_actor.info(f"I got '{async_data.get()}'.")
- pending_comms.pop(index)
+ pending_comms.push(self.mailbox.get_async())
+ while not pending_comms.empty():
+ comm = pending_comms.wait_any()
+ this_actor.info(f"I got '{comm.get_payload()}'.")
def main():
link = zone.create_split_duplex_link("link1", 10e9).set_latency(10e-6).set_concurrency_limit(2).seal()
# create routes between nodes
- zone.add_route(
- sender_host.netpoint,
- receiver_host.netpoint,
- None,
- None,
- [LinkInRoute(link, LinkInRoute.UP)],
- True
- )
+ zone.add_route(sender_host, receiver_host, [link])
zone.seal()
# create actors Sender/Receiver
this_actor.info(f'Run an activity of {100E6:.0E} flops')
this_actor.execute(100E6)
this_actor.info(f'Done working on my activity; this took {Engine.clock - start}s; current peak speed: {host.speed:.0E} flop/s; number of flops computed so far: {host.computed_flops:.0E}')
- Engine
+
start = Engine.clock
this_actor.info("========= Requesting a reset of the computation and load counters")
host.reset_load()
# under the terms of the license (GNU LGPL) which comes with this package.
"""
-/* This example demonstrates how to dynamically modify a graph of tasks.
- *
- * Assuming we have two instances of a service placed on different hosts,
- * we want to send data alternatively to thoses instances.
- *
- * We consider the following graph:
+This example demonstrates how to dynamically modify a graph of tasks.
+
+Assuming we have two instances of a service placed on different hosts,
+we want to send data alternatively to thoses instances.
+
+We consider the following graph:
comm1
┌────────────────────────┐
│ │
└────────────────────────┘
comm2
- */
- """
+
+"""
from argparse import ArgumentParser
import sys
const sg4::Link* link9 = root->create_split_duplex_link("9", "7.20975MBps")->set_latency("1.461517ms")->seal();
- root->add_route(tremblay->get_netpoint(), jupiter->get_netpoint(), nullptr, nullptr,
- {{link9, sg4::LinkInRoute::Direction::UP}}, true);
+ root->add_route(tremblay, jupiter, {link9});
root->seal();
/* set cost callback for MPI_Send and MPI_Recv */
SMPI_app_instance_start("alltoall_mpi", alltoall_mpi,
{e.host_by_name_or_null("Ginette"), e.host_by_name_or_null("Bourassa"),
e.host_by_name_or_null("Jupiter"), e.host_by_name_or_null("Fafard")});
+ SMPI_app_instance_join("alltoall_mpi");
+ XBT_INFO("This other alltoall_mpi instance terminated.");
});
+
e.run();
XBT_INFO("Simulation time %g", simgrid::s4u::Engine::get_clock());
> [Ginette:alltoall_mpi#0:(11) 10.036773] [smpi_masterworkers/INFO] after alltoall 0
> [Bourassa:alltoall_mpi#1:(12) 10.046578] [smpi_masterworkers/INFO] after alltoall 1
> [Fafard:alltoall_mpi#3:(14) 10.046865] [smpi_masterworkers/INFO] after alltoall 3
-> [Jupiter:alltoall_mpi#2:(13) 10.046865] [smpi_masterworkers/INFO] after alltoall 2
\ No newline at end of file
+> [Jupiter:alltoall_mpi#2:(13) 10.046865] [smpi_masterworkers/INFO] after alltoall 2
+> [Ginette:launcher:(10) 10.046865] [smpi_masterworkers/INFO] This other alltoall_mpi instance terminated.
\ No newline at end of file
--- /dev/null
+/* Copyright (c) 2018-2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef INCLUDE_SIMGRID_ACTIVITY_SET_H
+#define INCLUDE_SIMGRID_ACTIVITY_SET_H
+
+#include <simgrid/forward.h>
+#include <sys/types.h> /* ssize_t */
+
+/* C interface */
+SG_BEGIN_DECL
+
+XBT_PUBLIC sg_activity_set_t sg_activity_set_init();
+XBT_PUBLIC void sg_activity_set_push(sg_activity_set_t as, sg_activity_t acti);
+XBT_PUBLIC void sg_activity_set_erase(sg_activity_set_t as, sg_activity_t acti);
+XBT_PUBLIC size_t sg_activity_set_size(sg_activity_set_t as);
+XBT_PUBLIC int sg_activity_set_empty(sg_activity_set_t as);
+
+XBT_PUBLIC sg_activity_t sg_activity_set_test_any(sg_activity_set_t as);
+XBT_PUBLIC void sg_activity_set_wait_all(sg_activity_set_t as);
+/** Returns true if it terminated successfully (or false on timeout) */
+XBT_PUBLIC int sg_activity_set_wait_all_for(sg_activity_set_t as, double timeout);
+XBT_PUBLIC sg_activity_t sg_activity_set_wait_any(sg_activity_set_t as);
+XBT_PUBLIC sg_activity_t sg_activity_set_wait_any_for(sg_activity_set_t as, double timeout);
+XBT_PUBLIC void sg_activity_set_delete(sg_activity_set_t as);
+
+SG_END_DECL
+
+#endif /* INCLUDE_SIMGRID_ACTIVITY_SET_H */
/* C interface */
SG_BEGIN_DECL
+XBT_PUBLIC int sg_comm_isinstance(sg_activity_t acti);
+
XBT_PUBLIC void sg_comm_detach(sg_comm_t comm, void (*clean_function)(void*));
XBT_PUBLIC int sg_comm_test(sg_comm_t comm);
XBT_PUBLIC sg_error_t sg_comm_wait(sg_comm_t comm);
XBT_PUBLIC sg_error_t sg_comm_wait_for(sg_comm_t comm, double timeout);
-XBT_PUBLIC void sg_comm_wait_all(sg_comm_t* comms, size_t count);
-XBT_PUBLIC size_t sg_comm_wait_all_for(sg_comm_t* comms, size_t count, double timeout);
-XBT_PUBLIC ssize_t sg_comm_wait_any_for(sg_comm_t* comms, size_t count, double timeout);
-XBT_PUBLIC ssize_t sg_comm_wait_any(sg_comm_t* comms, size_t count);
XBT_PUBLIC void sg_comm_unref(sg_comm_t comm);
+#ifndef DOXYGEN
+XBT_ATTRIB_DEPRECATED_v339("Please use sg_activity_set_t instead") XBT_PUBLIC
+ void sg_comm_wait_all(sg_comm_t* comms, size_t count);
+XBT_ATTRIB_DEPRECATED_v339("Please use sg_activity_set_t instead") XBT_PUBLIC
+ ssize_t sg_comm_wait_any_for(sg_comm_t* comms, size_t count, double timeout);
+XBT_ATTRIB_DEPRECATED_v339("Please use sg_activity_set_t instead") XBT_PUBLIC
+ ssize_t sg_comm_wait_any(sg_comm_t* comms, size_t count);
+#endif
+
SG_END_DECL
#endif /* INCLUDE_SIMGRID_COMM_H_ */
/* C interface */
SG_BEGIN_DECL
+XBT_PUBLIC int sg_exec_isinstance(sg_activity_t acti);
+
XBT_PUBLIC void sg_exec_set_bound(sg_exec_t exec, double bound);
XBT_PUBLIC const char* sg_exec_get_name(const_sg_exec_t exec);
XBT_PUBLIC void sg_exec_set_name(sg_exec_t exec, const char* name);
XBT_PUBLIC int sg_exec_test(sg_exec_t exec);
XBT_PUBLIC sg_error_t sg_exec_wait(sg_exec_t exec);
XBT_PUBLIC sg_error_t sg_exec_wait_for(sg_exec_t exec, double timeout);
-XBT_PUBLIC ssize_t sg_exec_wait_any_for(sg_exec_t* execs, size_t count, double timeout);
-XBT_PUBLIC ssize_t sg_exec_wait_any(sg_exec_t* execs, size_t count);
+
+XBT_ATTRIB_DEPRECATED_v339("Please use sg_activity_set_t instead") XBT_PUBLIC ssize_t
+ sg_exec_wait_any_for(sg_exec_t* execs, size_t count, double timeout);
+XBT_ATTRIB_DEPRECATED_v339("Please use sg_activity_set_t instead") XBT_PUBLIC ssize_t
+ sg_exec_wait_any(sg_exec_t* execs, size_t count);
SG_END_DECL
} // namespace simgrid
using s4u_Actor = simgrid::s4u::Actor;
+using s4u_Activity = simgrid::s4u::Activity;
+using s4u_ActivitySet = simgrid::s4u::ActivitySet;
using s4u_Barrier = simgrid::s4u::Barrier;
using s4u_Comm = simgrid::s4u::Comm;
using s4u_Exec = simgrid::s4u::Exec;
#else
typedef struct s4u_Actor s4u_Actor;
+typedef struct s4u_Activity s4u_Activity;
+typedef struct s4u_ActivitySet s4u_ActivitySet;
typedef struct s4u_Barrier s4u_Barrier;
typedef struct s4u_Comm s4u_Comm;
typedef struct s4u_Exec s4u_Exec;
/** Pointer to a constant actor object */
typedef const s4u_Actor* const_sg_actor_t;
+/** Pointer to an activity object */
+typedef s4u_Activity* sg_activity_t;
+/** Pointer to a constant activity object */
+typedef const s4u_Activity* const_sg_activity_t;
+
+/** Pointer to an activity set object */
+typedef s4u_ActivitySet* sg_activity_set_t;
+/** Pointer to a constant activity set object */
+typedef const s4u_ActivitySet* const_sg_activity_set_t;
+
/** @ingroup m_datatypes_management_details
* @brief Type for any SimGrid size
*/
--- /dev/null
+/* Copyright (c) 2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef SIMGRID_PLUGIN_JBOD_HPP
+#define SIMGRID_PLUGIN_JBOD_HPP
+#include <simgrid/s4u/Host.hpp>
+#include <simgrid/s4u/Io.hpp>
+
+namespace simgrid::plugin {
+
+class JbodIo;
+/** Smart pointer to a simgrid::s4u::Activity */
+using JbodIoPtr = boost::intrusive_ptr<JbodIo>;
+XBT_PUBLIC void intrusive_ptr_release(const JbodIo* io);
+XBT_PUBLIC void intrusive_ptr_add_ref(const JbodIo* io);
+
+class Jbod : public s4u::Host {
+public:
+ enum class RAID {RAID0 = 0, RAID1 = 1, RAID4 = 4 , RAID5 = 5, RAID6 = 6};
+ int get_parity_disk_idx() { return parity_disk_idx_; }
+ void update_parity_disk_idx() { parity_disk_idx_ = (parity_disk_idx_- 1) % num_disks_; }
+
+ int get_next_read_disk_idx() { return (++read_disk_idx_) % num_disks_; }
+
+ JbodIoPtr read_async(sg_size_t size);
+ sg_size_t read(sg_size_t size);
+
+ JbodIoPtr write_async(sg_size_t size);
+ sg_size_t write(sg_size_t size);
+
+ static Jbod* create_jbod(s4u::NetZone* zone, const std::string& name, double speed, unsigned int num_disks,
+ RAID raid_level, double read_bandwidth, double write_bandwidth);
+
+protected:
+ void set_num_disks(unsigned int num_disks) { num_disks_ = num_disks; }
+ void set_parity_disk_idx(unsigned int index) { parity_disk_idx_ = index; }
+ void set_read_disk_idx(int index) { read_disk_idx_ = index; }
+ void set_raid_level(RAID raid_level) { raid_level_ = raid_level; }
+
+private:
+ unsigned int num_disks_;
+ RAID raid_level_;
+ unsigned int parity_disk_idx_;
+ int read_disk_idx_;
+};
+
+class JbodIo {
+ const Jbod* jbod_;
+ s4u::CommPtr transfer_;
+ s4u::ExecPtr parity_block_comp_;
+ std::vector<s4u::IoPtr> pending_ios_;
+ s4u::Io::OpType type_;
+ std::atomic_int_fast32_t refcount_{0};
+public:
+
+ explicit JbodIo(const Jbod* jbod, const s4u::CommPtr transfer, const s4u::ExecPtr parity_block_comp,
+ const std::vector<s4u::IoPtr>& pending_ios, s4u::Io::OpType type)
+ : jbod_(jbod), transfer_(transfer), parity_block_comp_(parity_block_comp), pending_ios_(pending_ios), type_(type)
+ {}
+
+ void wait();
+
+#ifndef DOXYGEN
+ friend void intrusive_ptr_release(JbodIo* io)
+ {
+ if (io->refcount_.fetch_sub(1, std::memory_order_release) == 1) {
+ std::atomic_thread_fence(std::memory_order_acquire);
+ delete io;
+ }
+ }
+ friend void intrusive_ptr_add_ref(JbodIo* io) { io->refcount_.fetch_add(1, std::memory_order_relaxed); }
+#endif
+};
+
+} // namespace simgrid::plugin
+#endif
\ No newline at end of file
virtual Activity* do_start() = 0;
/** Tests whether the given activity is terminated yet. */
virtual bool test();
- /*! take a vector s4u::ActivityPtr and return the rank of the first finished one (or -1 if none is done). */
- static ssize_t test_any(const std::vector<ActivityPtr>& activities);
/** Blocks the current actor until the activity is terminated */
Activity* wait() { return wait_for(-1.0); }
/** Blocks the current actor until the activity is terminated, or until the time limit is reached\n
* Raises: timeout exception. */
void wait_until(double time_limit);
- /*! take a vector of s4u::ActivityPtr and return when one of them is finished.
- * The return value is the rank of the first finished ActivityPtr. */
- static ssize_t wait_any(const std::vector<ActivityPtr>& activities) { return wait_any_for(activities, -1); }
- /*! Same as wait_any, but with a timeout. If the timeout occurs, parameter last is returned.*/
- static ssize_t wait_any_for(const std::vector<ActivityPtr>& activities, double timeout);
/** Cancel that activity */
Activity* cancel();
kernel::activity::ActivityImpl* get_impl() const { return pimpl_.get(); }
#ifndef DOXYGEN
+ static ssize_t deprecated_wait_any_for(const std::vector<ActivityPtr>& activities, double timeout); // XBT_ATTRIB_DEPRECATED_v339
+ XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t test_any(const std::vector<ActivityPtr>& activities);
+
friend void intrusive_ptr_release(Activity* a)
{
if (a->refcount_.fetch_sub(1, std::memory_order_release) == 1) {
* dependency or no resource assigned) */
void on_this_veto_cb(const std::function<void(AnyActivity&)>& cb) { on_this_veto.connect(cb); }
- XBT_ATTRIB_DEPRECATED_v338("Please use on_suspend_cb() instead") static void on_suspended_cb(
- const std::function<void(Activity const&)>& cb)
- {
- on_suspend.connect(cb);
- }
- XBT_ATTRIB_DEPRECATED_v338("Please use on_resume_cb() instead") static void on_resumed_cb(
- const std::function<void(Activity const&)>& cb)
- {
- on_resume.connect(cb);
- }
+#ifndef DOXYGEN
+ XBT_ATTRIB_DEPRECATED_v338("Please use on_suspend_cb() instead") static void on_suspended_cb(const std::function<void(Activity const&)>& cb) { on_suspend.connect(cb); }
+ XBT_ATTRIB_DEPRECATED_v338("Please use on_resume_cb() instead") static void on_resumed_cb(const std::function<void(Activity const&)>& cb) { on_resume.connect(cb); }
+
+ XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t wait_any(const std::vector<ActivityPtr>& activities) { return deprecated_wait_any_for(activities, -1); }
+ XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t wait_any_for(const std::vector<ActivityPtr>& activities, double timeout) { return deprecated_wait_any_for(activities, timeout); }
+#endif
AnyActivity* add_successor(ActivityPtr a)
{
#include <vector>
-namespace simgrid::s4u {
+namespace simgrid {
+
+extern template class XBT_PUBLIC xbt::Extendable<s4u::ActivitySet>;
+
+namespace s4u {
/** @brief ActivitiesSet
*
* This class is a container of activities, allowing to wait for the completion of any or all activities in the set.
* activities.
*/
class XBT_PUBLIC ActivitySet : public xbt::Extendable<ActivitySet> {
- std::vector<ActivityPtr>
- activities_; // We use a vector instead of a set to improve reproductibility accross architectures
+ std::atomic_int_fast32_t refcount_{1};
+ std::vector<ActivityPtr> activities_; // Use vectors, not sets for better reproductibility accross architectures
std::vector<ActivityPtr> failed_activities_;
+ void handle_failed_activities();
+
public:
ActivitySet() = default;
ActivitySet(const std::vector<ActivityPtr> init) : activities_(init) {}
ActivityPtr get_failed_activity();
bool has_failed_activities() { return not failed_activities_.empty(); }
+
+ // boost::intrusive_ptr<ActivitySet> support:
+ friend void intrusive_ptr_add_ref(ActivitySet* as)
+ {
+ XBT_ATTRIB_UNUSED auto previous = as->refcount_.fetch_add(1);
+ xbt_assert(previous != 0);
+ }
+
+ friend void intrusive_ptr_release(ActivitySet* as)
+ {
+ if (as->refcount_.fetch_sub(1) == 1)
+ delete as;
+ }
};
-}; // namespace simgrid::s4u
+} // namespace s4u
+} // namespace simgrid
#endif
void* get_dst_data() const { return dst_buff_; }
/** Retrieve the size of the received data. Not to be mixed with @ref Activity::get_remaining() */
size_t get_dst_data_size() const { return dst_buff_size_; }
+ /** Retrieve the payload associated to the communication. You can only do that once the comm is (gracefully)
+ * terminated, and it is only setup by the default copy_data callback (not the SMPI one) */
+ void* get_payload() const;
/* Common functions */
Comm* wait_for(double timeout) override;
- /*! \static take a vector s4u::CommPtr and return the rank of the first finished one (or -1 if none is done). */
- static ssize_t test_any(const std::vector<CommPtr>& comms);
+#ifndef DOXYGEN
+ XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t wait_any(const std::vector<CommPtr>& comms) { return deprecated_wait_any_for(comms, -1); }
+ XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t wait_any_for(const std::vector<CommPtr>& comms, double timeout) { return deprecated_wait_any_for(comms, timeout); }
- /*! \static take a vector s4u::CommPtr and return when one of them is finished.
- * The return value is the rank of the first finished CommPtr. */
- static ssize_t wait_any(const std::vector<CommPtr>& comms) { return wait_any_for(comms, -1); }
- /*! \static Same as wait_any, but with a timeout. Return -1 if the timeout occurs.*/
- static ssize_t wait_any_for(const std::vector<CommPtr>& comms, double timeout);
+ static ssize_t deprecated_wait_any_for(const std::vector<CommPtr>& comms, double timeout);
- /*! \static take a vector s4u::CommPtr and return when all of them is finished. */
- static void wait_all(const std::vector<CommPtr>& comms);
- /*! \static Same as wait_all, but with a timeout. Return the number of terminated comm (less than comms.size() if
- * the timeout occurs). */
- static size_t wait_all_for(const std::vector<CommPtr>& comms, double timeout);
+ XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t test_any(const std::vector<CommPtr>& comms);
+ XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static void wait_all(const std::vector<CommPtr>& comms);
+ XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static size_t
+ wait_all_for(const std::vector<CommPtr>& comms, double timeout);
+#endif
};
} // namespace simgrid::s4u
/*! \static Initiate the creation of an Exec. Setters have to be called afterwards */
static ExecPtr init();
- /*! \static take a vector of s4u::ExecPtr and return when one of them is finished.
- * The return value is the rank of the first finished ExecPtr. */
- static ssize_t wait_any(const std::vector<ExecPtr>& execs) { return wait_any_for(execs, -1); }
- /*! \static Same as wait_any, but with a timeout. If the timeout occurs, parameter last is returned.*/
- static ssize_t wait_any_for(const std::vector<ExecPtr>& execs, double timeout);
-
/** @brief On sequential executions, returns the amount of flops that remain to be done; This cannot be used on
* parallel executions. */
double get_remaining() const override;
double get_cost() const;
bool is_parallel() const { return parallel_; }
bool is_assigned() const override;
+
+#ifndef DOXYGEN
+ static ssize_t deprecated_wait_any_for(const std::vector<ExecPtr>& execs, double timeout); // XBT_ATTRIB_DEPRECATED_v339
+
+ XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t
+ wait_any(const std::vector<ExecPtr>& execs) { return deprecated_wait_any_for(execs, -1); }
+ XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t
+ wait_any_for(const std::vector<ExecPtr>& execs, double timeout) { return deprecated_wait_any_for(execs, timeout); }
+#endif
};
} // namespace simgrid::s4u
explicit Io(kernel::activity::IoImplPtr pimpl);
Io* do_start() override;
+ static ssize_t deprecated_wait_any_for(const std::vector<IoPtr>& ios, double timeout);
+
public:
enum class OpType { READ, WRITE };
/*! \static Initiate the creation of an I/O. Setters have to be called afterwards */
static IoPtr init();
- /*! \static take a vector of s4u::IoPtr and return when one of them is finished.
- * The return value is the rank of the first finished IoPtr. */
- static ssize_t wait_any(const std::vector<IoPtr>& ios) { return wait_any_for(ios, -1); }
- /*! \static Same as wait_any, but with a timeout. If the timeout occurs, parameter last is returned.*/
- static ssize_t wait_any_for(const std::vector<IoPtr>& ios, double timeout);
+#ifndef DOXYGEN
+ XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead")
+ static ssize_t wait_any(const std::vector<IoPtr>& ios) { return deprecated_wait_any_for(ios, -1); }
+ XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead")
+ static ssize_t wait_any_for(const std::vector<IoPtr>& ios, double timeout) { return deprecated_wait_any_for(ios, timeout); }
+#endif
double get_remaining() const override;
sg_size_t get_performed_ioops() const;
CommPtr get_init();
/** Creates and start an async data reception to that mailbox */
template <typename T> CommPtr get_async(T** data);
+ /** Creates and start an async data reception to that mailbox. Since the data location is not provided, you'll have to
+ * use Comm::get_payload once the comm terminates */
+ CommPtr get_async();
/** Blocking data reception */
template <typename T> T* get();
*/
void add_route(kernel::routing::NetPoint* src, kernel::routing::NetPoint* dst, kernel::routing::NetPoint* gw_src,
kernel::routing::NetPoint* gw_dst, const std::vector<LinkInRoute>& link_list, bool symmetrical = true);
+ /**
+ * @brief Add a route between 2 netpoints, and same in other direction
+ *
+ * Create a route:
+ * - route between 2 hosts/routers in same netzone, no gateway is needed
+ * - route between 2 netzones, connecting 2 gateways.
+ *
+ * @param src Source netzone's netpoint
+ * @param dst Destination netzone' netpoint
+ * @param gw_src Netpoint of the gateway in the source netzone
+ * @param gw_dst Netpoint of the gateway in the destination netzone
+ * @param link_list List of links
+ */
+ void add_route(kernel::routing::NetPoint* src, kernel::routing::NetPoint* dst, kernel::routing::NetPoint* gw_src,
+ kernel::routing::NetPoint* gw_dst, const std::vector<const Link*>& links);
+
+ /**
+ * @brief Add a route between 2 hosts
+ *
+ * @param src Source host
+ * @param dst Destination host
+ * @param link_list List of links and their direction used in this communication
+ * @param symmetrical Bi-directional communication
+ */
+ void add_route(const Host* src, const Host* dst, const std::vector<LinkInRoute>& link_list, bool symmetrical = true);
+ /**
+ * @brief Add a route between 2 hosts
+ *
+ * @param src Source host
+ * @param dst Destination host
+ * @param link_list List of links. The UP direction will be used on src->dst and DOWN direction on dst->src
+ */
+ void add_route(const Host* src, const Host* dst, const std::vector<const Link*>& links);
void add_bypass_route(kernel::routing::NetPoint* src, kernel::routing::NetPoint* dst,
kernel::routing::NetPoint* gw_src, kernel::routing::NetPoint* gw_dst,
#ifdef __cplusplus
XBT_PUBLIC void SMPI_app_instance_start(const char* name, std::function<void()> const& code,
std::vector<simgrid::s4u::Host*> const& hosts);
+XBT_PUBLIC void SMPI_app_instance_join(const std::string& instance_id);
/* This version without parameter is nice to use with SMPI_app_instance_start() */
XBT_PUBLIC void MPI_Init();
#include "simgrid/kernel/routing/NetPoint.hpp"
#include "simgrid/plugins/load.h"
#include <simgrid/Exception.hpp>
+#include <simgrid/s4u/ActivitySet.hpp>
#include <simgrid/s4u/Actor.hpp>
#include <simgrid/s4u/Barrier.hpp>
#include <simgrid/s4u/Comm.hpp>
#include <vector>
namespace py = pybind11;
+using simgrid::s4u::Activity;
+using simgrid::s4u::ActivityPtr;
+using simgrid::s4u::ActivitySet;
+using simgrid::s4u::ActivitySetPtr;
using simgrid::s4u::Actor;
using simgrid::s4u::ActorPtr;
using simgrid::s4u::Barrier;
sg_version_get(&major, &minor, &patch);
return simgrid::xbt::string_printf("%i.%i.%i", major, minor, patch);
}
-
-/** @brief Wrap for mailbox::get_async */
-class PyGetAsync {
- std::unique_ptr<PyObject*> data = std::make_unique<PyObject*>();
-
-public:
- PyObject** get() const { return data.get(); }
-};
-
} // namespace
PYBIND11_DECLARE_HOLDER_TYPE(T, boost::intrusive_ptr<T>)
}
});
},
- "Registers the main function of an actor");
+ "Registers the main function of an actor")
+ .def("set_log_control", [](Engine*, const std::string& settings) { xbt_log_control_set(settings.c_str()); });
/* Class Netzone */
py::class_<simgrid::s4u::NetZone, std::unique_ptr<simgrid::s4u::NetZone, py::nodelete>> netzone(
simgrid::kernel::routing::NetPoint*, simgrid::kernel::routing::NetPoint*,
const std::vector<simgrid::s4u::LinkInRoute>&, bool>(&simgrid::s4u::NetZone::add_route),
"Add a route between 2 netpoints")
+ .def("add_route",
+ py::overload_cast<const simgrid::s4u::Host*, const simgrid::s4u::Host*,
+ const std::vector<simgrid::s4u::LinkInRoute>&, bool>(&simgrid::s4u::NetZone::add_route),
+ "Add a route between 2 netpoints")
+ .def("add_route",
+ py::overload_cast<const simgrid::s4u::Host*, const simgrid::s4u::Host*,
+ const std::vector<const simgrid::s4u::Link*>&>(&simgrid::s4u::NetZone::add_route),
+ "Add a route between 2 netpoints")
.def("create_host", py::overload_cast<const std::string&, double>(&simgrid::s4u::NetZone::create_host),
"Creates a host")
.def("create_host",
"get", [](Mailbox* self) { return py::reinterpret_steal<py::object>(self->get<PyObject>()); },
py::call_guard<py::gil_scoped_release>(), "Blocking data reception")
.def(
- "get_async",
- [](Mailbox* self) -> std::tuple<CommPtr, PyGetAsync> {
- PyGetAsync wrap;
- auto comm = self->get_async(wrap.get());
- return std::make_tuple(std::move(comm), std::move(wrap));
- },
+ "get_async", [](Mailbox* self) -> CommPtr { return self->get_async(); },
py::call_guard<py::gil_scoped_release>(),
"Non-blocking data reception. Use data.get() to get the python object after the communication has finished")
.def("set_receiver", &Mailbox::set_receiver, py::call_guard<py::gil_scoped_release>(),
"Sets the actor as permanent receiver");
- /* Class PyGetAsync */
- py::class_<PyGetAsync>(m, "PyGetAsync", "Wrapper for async get communications")
- .def(py::init<>())
- .def(
- "get", [](const PyGetAsync* self) { return py::reinterpret_steal<py::object>(*(self->get())); },
- "Get python object after async communication in receiver side");
+ /* class Activity */
+ py::class_<Activity, ActivityPtr>(m, "Activity", "Activity. See the C++ documentation for details.");
/* Class Comm */
- py::class_<Comm, CommPtr>(m, "Comm", "Communication. See the C++ documentation for details.")
+ py::class_<Comm, CommPtr, Activity>(m, "Comm", "Communication. See the C++ documentation for details.")
.def_property_readonly("dst_data_size", &Comm::get_dst_data_size, py::call_guard<py::gil_scoped_release>(),
"Retrieve the size of the received data.")
.def_property_readonly("mailbox", &Comm::get_mailbox, py::call_guard<py::gil_scoped_release>(),
"Block until the completion of that communication, or raises TimeoutException after the specified timeout.")
.def("wait_until", &Comm::wait_until, py::call_guard<py::gil_scoped_release>(), py::arg("time_limit"),
"Block until the completion of that communication, or raises TimeoutException after the specified time.")
+ .def(
+ "get_payload",
+ [](const Comm* self) { return py::reinterpret_steal<py::object>((PyObject*)self->get_payload()); },
+ py::call_guard<py::gil_scoped_release>(),
+ "Retrieve the message's payload of a get_async. You cannot call this until after the comm termination.")
.def("detach", py::overload_cast<>(&Comm::detach), py::return_value_policy::reference_internal,
py::call_guard<py::gil_scoped_release>(),
"Start the comm, and ignore its result. It can be completely forgotten after that.")
py::arg("to"), py::arg("simulated_size_in_bytes"),
"Do a blocking communication between two arbitrary hosts.\n\nThis initializes a communication that "
"completely bypass the mailbox and actors mechanism. There is really no limit on the hosts involved. "
- "In particular, the actor does not have to be on one of the involved hosts.")
- .def_static("test_any", &Comm::test_any, py::call_guard<py::gil_scoped_release>(), py::arg("comms"),
- "take a vector s4u::CommPtr and return the rank of the first finished one (or -1 if none is done)")
- .def_static("wait_all", &Comm::wait_all, py::call_guard<py::gil_scoped_release>(), py::arg("comms"),
- "Block until the completion of all communications in the list.")
- .def_static("wait_all_for", &Comm::wait_all_for, py::call_guard<py::gil_scoped_release>(), py::arg("comms"),
- py::arg("timeout"),
- "Block until the completion of all communications in the list, or raises TimeoutException after "
- "the specified timeout.")
- .def_static("wait_any", &Comm::wait_any, py::call_guard<py::gil_scoped_release>(), py::arg("comms"),
- "Block until the completion of any communication in the list and return the index of the "
- "terminated one.")
- .def_static("wait_any_for", &Comm::wait_any_for, py::call_guard<py::gil_scoped_release>(), py::arg("comms"),
- py::arg("timeout"),
- "Block until the completion of any communication in the list and return the index of the terminated "
- "one, or -1 if a timeout occurred.");
+ "In particular, the actor does not have to be on one of the involved hosts.");
/* Class Io */
- py::class_<simgrid::s4u::Io, simgrid::s4u::IoPtr>(m, "Io", "I/O activities. See the C++ documentation for details.")
+ py::class_<simgrid::s4u::Io, simgrid::s4u::IoPtr, Activity>(m, "Io",
+ "I/O activities. See the C++ documentation for details.")
.def("test", &simgrid::s4u::Io::test, py::call_guard<py::gil_scoped_release>(),
"Test whether the I/O is terminated.")
.def("wait", &simgrid::s4u::Io::wait, py::call_guard<py::gil_scoped_release>(),
- "Block until the completion of that I/O operation")
- .def_static(
- "wait_any_for", &simgrid::s4u::Io::wait_any_for, py::call_guard<py::gil_scoped_release>(),
- "Block until the completion of any I/O in the list (or timeout) and return the index of the terminated one.")
- .def_static("wait_any", &simgrid::s4u::Io::wait_any, py::call_guard<py::gil_scoped_release>(),
- "Block until the completion of any I/O in the list and return the index of the terminated one.");
+ "Block until the completion of that I/O operation");
/* Class Exec */
- py::class_<simgrid::s4u::Exec, simgrid::s4u::ExecPtr>(m, "Exec", "Execution. See the C++ documentation for details.")
+ py::class_<simgrid::s4u::Exec, simgrid::s4u::ExecPtr, Activity>(m, "Exec",
+ "Execution. See the C++ documentation for details.")
.def_property_readonly("remaining", &simgrid::s4u::Exec::get_remaining, py::call_guard<py::gil_scoped_release>(),
"Amount of flops that remain to be computed until completion (read-only property).")
.def_property_readonly("remaining_ratio", &simgrid::s4u::Exec::get_remaining_ratio,
.def(
"__repr__", [](const IoTaskPtr io) { return "IoTask(" + io->get_name() + ")"; },
"Textual representation of the IoTask");
+
+ /* Class ActivitySet */
+ py::class_<ActivitySet, ActivitySetPtr>(m, "ActivitySet", "ActivitySet. See the C++ documentation for details.")
+ .def(py::init([](std::vector<simgrid::s4u::ActivityPtr> activities) {
+ auto* ret = new ActivitySet();
+ for (auto a : activities)
+ ret->push(a);
+ return ActivitySetPtr(ret);
+ }),
+ "The constructor should take the parameters from the command line, as is ")
+ .def(py::init([]() { return ActivitySetPtr(new ActivitySet()); }),
+ "The constructor should take the parameters from the command line, as is ")
+
+ .def("push", &ActivitySet::push, py::call_guard<py::gil_scoped_release>(), py::arg("activity"),
+ "Add an activity to the set")
+ .def("erase", &ActivitySet::erase, py::call_guard<py::gil_scoped_release>(), py::arg("activity"),
+ "Remove that activity from the set")
+ .def_property_readonly("size", &ActivitySet::size, "Count of activities in the set")
+ .def("empty", &ActivitySet::empty, "Returns whether the set is empty")
+ .def("has_failed_activities", &ActivitySet::has_failed_activities,
+ "Returns whether there is any failed activities")
+ .def("get_failed_activity", &ActivitySet::get_failed_activity, "Returns a failed activity from the set, or None")
+
+ .def("wait_all_for", &ActivitySet::wait_all_for, py::call_guard<py::gil_scoped_release>(), py::arg("timeout"),
+ "Wait for the completion of all activities in the set, but not longer than the provided timeout")
+ .def("wait_all", &ActivitySet::wait_all, py::call_guard<py::gil_scoped_release>(),
+ "Wait for the completion of all activities in the set, endlessly")
+ .def("test_any", &ActivitySet::test_any, py::call_guard<py::gil_scoped_release>(),
+ "Returns the first terminated activity if any, or None if no activity is terminated")
+ .def("wait_any_for", &ActivitySet::wait_any_for, py::call_guard<py::gil_scoped_release>(), py::arg("timeout"),
+ "Wait for the completion of one activity in the set, but not longer than the provided timeout")
+ .def("wait_any", &ActivitySet::wait_any, py::call_guard<py::gil_scoped_release>(),
+ "Wait for the completion of one activity in the set, endlessly")
+
+ .def(
+ "__repr__", [](const ActivitySetPtr as) { return "ActivitySet([...])"; },
+ "Textual representation of the ActivitySet");
}
std::function<void(CommImpl*, void*, size_t)> CommImpl::copy_data_callback_ = [](kernel::activity::CommImpl* comm,
void* buff, size_t buff_size) {
xbt_assert((buff_size == sizeof(void*)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
- *(void**)(comm->dst_buff_) = buff;
+ if (comm->dst_buff_ != nullptr) // get_async provided a buffer
+ *(void**)(comm->dst_buff_) = buff;
+ comm->payload_ = buff; // Setup what will be retrieved by s4u::Comm::get_payload()
};
void CommImpl::set_copy_data_callback(const std::function<void(CommImpl*, void*, size_t)>& callback)
{
size_t buff_size = src_buff_size_;
/* If there is no data to copy then return */
- if (not src_buff_ || not dst_buff_ || copied_)
+ if (not src_buff_ || not dst_buff_size_ || copied_)
return;
XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)", this,
unsigned char* dst_buff_ = nullptr;
size_t src_buff_size_ = 0;
size_t* dst_buff_size_ = nullptr;
+ void* payload_ = nullptr; // If dst_buff_ is NULL, the default copy callback puts the data here
void* src_data_ = nullptr; /* User data associated to the communication */
void* dst_data_ = nullptr;
const std::function<void(void*)>& clean_fun, // used to free the synchro in case of problem after a detached send
const std::function<void(activity::CommImpl*, void*, size_t)>&
copy_data_fun, // used to copy data if not default one
- void* payload, bool detached, std::string fun_call)
+ void* payload, bool detached, std::string_view fun_call)
: SimcallObserver(actor)
, mbox_(mbox)
, payload_size_(payload_size)
CommIrecvSimcall(ActorImpl* actor, activity::MailboxImpl* mbox, unsigned char* dst_buff, size_t* dst_buff_size,
const std::function<bool(void*, void*, activity::CommImpl*)>& match_fun,
const std::function<void(activity::CommImpl*, void*, size_t)>& copy_data_fun, void* payload,
- double rate, std::string fun_call)
+ double rate, std::string_view fun_call)
: SimcallObserver(actor)
, mbox_(mbox)
, dst_buff_(dst_buff)
if (cluster->router_id.empty())
cluster->router_id = cluster->prefix + cluster->id + "_router" + cluster->suffix;
auto* router = zone->create_router(cluster->router_id);
- zone->add_route(router, nullptr, nullptr, nullptr, {});
+ std::vector<simgrid::s4u::LinkInRoute> links;
+ zone->add_route(router, nullptr, nullptr, nullptr, links);
simgrid::kernel::routing::on_cluster_creation(*cluster);
}
chosen = xbt::random::uniform_int(0, possibilities-1);
for (auto const& [aid, actor] : actors_to_run_) {
- if (((not actor.is_todo()) && must_be_todo) || actor.is_done() || (not actor.is_enabled()))
+ if (((not actor.is_todo()) && must_be_todo) || actor.is_done() || (not actor.is_enabled()))
continue;
if (chosen == 0) {
return std::make_pair(aid, valuation.at(aid));
--- /dev/null
+/* Copyright (c) 2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include <simgrid/plugins/jbod.hpp>
+#include <simgrid/s4u/Comm.hpp>
+#include <simgrid/s4u/Disk.hpp>
+#include <simgrid/s4u/Exec.hpp>
+#include <simgrid/s4u/NetZone.hpp>
+
+
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_jbod, s4u, "Logging specific to the JBOD implmentation");
+
+namespace simgrid::plugin {
+
+Jbod* Jbod::create_jbod(s4u::NetZone* zone, const std::string& name, double speed, unsigned int num_disks,
+ RAID raid_level, double read_bandwidth, double write_bandwidth)
+{
+ xbt_assert(not ((raid_level == RAID::RAID4 || raid_level == RAID::RAID5) && num_disks < 3),
+ "RAID%d requires at least 3 disks", (int) raid_level);
+ xbt_assert(not (raid_level == RAID::RAID6 && num_disks < 4), "RAID6 requires at least 4 disks");
+
+ auto* jbod = static_cast<Jbod*>(zone->create_host(name, speed));
+ jbod->set_num_disks(num_disks);
+ jbod->set_parity_disk_idx(num_disks -1 );
+ jbod->set_read_disk_idx(-1);
+ jbod->set_raid_level(raid_level);
+ for (unsigned int i = 0; i < num_disks; i++)
+ jbod->create_disk(name + "_disk_" + std::to_string(i), read_bandwidth, write_bandwidth);
+
+ return jbod;
+}
+
+JbodIoPtr Jbod::read_async(sg_size_t size)
+{
+ auto comm = s4u::Comm::sendto_init()->set_source(const_cast<Jbod*>(this))->set_payload_size(size);
+ std::vector<s4u::IoPtr> pending_ios;
+ sg_size_t read_size = 0;
+ std::vector<s4u::Disk*> targets;
+ switch(raid_level_) {
+ case RAID::RAID0:
+ read_size = size / num_disks_;
+ targets = get_disks();
+ break;
+ case RAID::RAID1:
+ read_size = size;
+ targets.push_back(get_disks().at(get_next_read_disk_idx()));
+ break;
+ case RAID::RAID4:
+ read_size = size / (num_disks_ - 1);
+ targets = get_disks();
+ targets.pop_back();
+ break;
+ case RAID::RAID5:
+ read_size = size / (num_disks_ - 1);
+ targets = get_disks();
+ targets.erase(targets.begin() + (get_parity_disk_idx() + 1 % num_disks_));
+ break;
+ case RAID::RAID6:
+ read_size = size / (num_disks_ - 2);
+ targets = get_disks();
+ if ( (get_parity_disk_idx() + 2 % num_disks_) == 0 ) {
+ targets.pop_back();
+ targets.erase(targets.begin());
+ } else if (get_parity_disk_idx() + 1 == static_cast<int>(num_disks_)) {
+ targets.pop_back();
+ targets.pop_back();
+ } else {
+ targets.erase(targets.begin() + (get_parity_disk_idx() + 1) % num_disks_,
+ targets.begin() + get_parity_disk_idx() + 3);
+ }
+ break;
+ default:
+ xbt_die("Unsupported RAID level. Supported level are: 0, 1, 4, 5, and 6");
+ }
+ for (const auto* disk : targets) {
+ auto io = s4u::IoPtr(disk->io_init(read_size, s4u::Io::OpType::READ));
+ io->set_name(disk->get_name())->start();
+ pending_ios.push_back(io);
+ }
+
+ return JbodIoPtr(new JbodIo(this, comm, nullptr, pending_ios, s4u::Io::OpType::READ));
+}
+
+sg_size_t Jbod::read(sg_size_t size)
+{
+ read_async(size)->wait();
+ return size;
+}
+
+JbodIoPtr Jbod::write_async(sg_size_t size)
+{
+ auto comm = s4u::Comm::sendto_init(s4u::Host::current(), const_cast<Jbod*>(this));
+ std::vector<s4u::IoPtr> pending_ios;
+ sg_size_t write_size = 0;
+ switch(raid_level_) {
+ case RAID::RAID0:
+ write_size = size / num_disks_;
+ break;
+ case RAID::RAID1:
+ write_size = size;
+ break;
+ case RAID::RAID4:
+ write_size = size / (num_disks_ - 1);
+ break;
+ case RAID::RAID5:
+ update_parity_disk_idx();
+ write_size = size / (num_disks_ - 1);
+ break;
+ case RAID::RAID6:
+ update_parity_disk_idx();
+ update_parity_disk_idx();
+ write_size = size / (num_disks_ - 2);
+ break;
+ default:
+ xbt_die("Unsupported RAID level. Supported level are: 0, 1, 4, 5, and 6");
+ }
+ for (const auto* disk : get_disks()) {
+ auto io = s4u::IoPtr(disk->io_init(write_size, s4u::Io::OpType::WRITE));
+ io->set_name(disk->get_name());
+ pending_ios.push_back(io);
+ }
+
+ s4u::ExecPtr parity_block_comp = nullptr;
+ if (raid_level_ == RAID::RAID4 || raid_level_ == RAID::RAID5 || raid_level_ == RAID::RAID6) {
+ // Assume 1 flop per byte to write per parity block and two for RAID6.
+ // Do not assign the Exec yet, will be done after the completion of the CommPtr
+ if (raid_level_ == RAID::RAID6)
+ parity_block_comp = s4u::Exec::init()->set_flops_amount(200 * write_size);
+ else
+ parity_block_comp = s4u::Exec::init()->set_flops_amount(write_size);
+ }
+
+ comm->set_payload_size(size)->start();
+ return JbodIoPtr(new JbodIo(this, comm, parity_block_comp, pending_ios, s4u::Io::OpType::WRITE));
+}
+
+sg_size_t Jbod::write(sg_size_t size)
+{
+ write_async(size)->wait();
+ return size;
+}
+
+void JbodIo::wait()
+{
+ if (type_ == s4u::Io::OpType::WRITE) {
+ transfer_->wait();
+ XBT_DEBUG("Data received on JBOD");
+ if (parity_block_comp_) {
+ parity_block_comp_->set_host(const_cast<Jbod*>(jbod_))->wait();
+ XBT_DEBUG("Parity block computed");
+ }
+ XBT_DEBUG("Start writing");
+ for (const auto& io : pending_ios_)
+ io->start();
+ }
+
+ for (const auto& io : pending_ios_) {
+ XBT_DEBUG("Wait for I/O on %s", io->get_cname());
+ io->wait();
+ }
+
+ if (type_ == s4u::Io::OpType::READ) {
+ XBT_DEBUG("Data read on JBOD, send it to %s", s4u::Host::current()->get_cname());
+ transfer_->set_destination(s4u::Host::current())->wait();
+ }
+}
+} // namespace simgrid::plugin
return changed_pos;
}
-ssize_t Activity::wait_any_for(const std::vector<ActivityPtr>& activities, double timeout)
+ssize_t Activity::deprecated_wait_any_for(const std::vector<ActivityPtr>& activities, double timeout) // XBT_ATTRIB_DEPRECATED_v339
{
std::vector<kernel::activity::ActivityImpl*> ractivities(activities.size());
std::transform(begin(activities), end(activities), begin(ractivities),
#include "src/kernel/activity/ActivityImpl.hpp"
#include "src/kernel/actor/ActorImpl.hpp"
#include "src/kernel/actor/CommObserver.hpp"
+#include <simgrid/Exception.hpp>
+#include <simgrid/activity_set.h>
#include <simgrid/s4u/ActivitySet.hpp>
#include <simgrid/s4u/Engine.hpp>
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_activityset, s4u_activity, "S4U set of activities");
-namespace simgrid::s4u {
+namespace simgrid {
+
+template class xbt::Extendable<s4u::ActivitySet>;
+
+namespace s4u {
void ActivitySet::erase(ActivityPtr a)
{
return ret;
}
+void ActivitySet::handle_failed_activities()
+{
+ for (size_t i = 0; i < activities_.size(); i++) {
+ auto act = activities_[i];
+ if (act->pimpl_->get_state() == kernel::activity::State::FAILED) {
+ act->complete(Activity::State::FAILED);
+
+ failed_activities_.push_back(act);
+ activities_[i] = activities_[activities_.size() - 1];
+ activities_.resize(activities_.size() - 1);
+ i--; // compensate the i++ occuring at the end of the loop
+ }
+ }
+}
+
ActivityPtr ActivitySet::wait_any_for(double timeout)
{
std::vector<kernel::activity::ActivityImpl*> act_impls(activities_.size());
kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
kernel::actor::ActivityWaitanySimcall observer{issuer, act_impls, timeout, "wait_any_for"};
- ssize_t changed_pos = kernel::actor::simcall_blocking(
- [&observer] {
- kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),
- observer.get_timeout());
- },
- &observer);
- xbt_assert(changed_pos != -1,
- "ActivityImpl::wait_any_for is not supposed to return -1 but instead to raise exceptions");
-
- auto ret = activities_.at(changed_pos);
- erase(ret);
- ret->complete(Activity::State::FINISHED);
- return ret;
+ try {
+ ssize_t changed_pos = kernel::actor::simcall_blocking(
+ [&observer] {
+ kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),
+ observer.get_timeout());
+ },
+ &observer);
+ if (changed_pos == -1)
+ throw TimeoutException(XBT_THROW_POINT, "Timeouted");
+
+ auto ret = activities_.at(changed_pos);
+ erase(ret);
+ ret->complete(Activity::State::FINISHED);
+ return ret;
+ } catch (const HostFailureException& e) {
+ handle_failed_activities();
+ throw e;
+ } catch (const NetworkFailureException& e) {
+ handle_failed_activities();
+ throw e;
+ } catch (const StorageFailureException& e) {
+ handle_failed_activities();
+ throw e;
+ }
}
ActivityPtr ActivitySet::get_failed_activity()
return ret;
}
-}; // namespace simgrid::s4u
\ No newline at end of file
+} // namespace s4u
+} // namespace simgrid
+
+SG_BEGIN_DECL
+
+sg_activity_set_t sg_activity_set_init()
+{
+ return new simgrid::s4u::ActivitySet();
+}
+void sg_activity_set_push(sg_activity_set_t as, sg_activity_t acti)
+{
+ as->push(acti);
+}
+void sg_activity_set_erase(sg_activity_set_t as, sg_activity_t acti)
+{
+ as->erase(acti);
+}
+size_t sg_activity_set_size(sg_activity_set_t as)
+{
+ return as->size();
+}
+int sg_activity_set_empty(sg_activity_set_t as)
+{
+ return as->empty();
+}
+
+sg_activity_t sg_activity_set_test_any(sg_activity_set_t as)
+{
+ return as->test_any().get();
+}
+void sg_activity_set_wait_all(sg_activity_set_t as)
+{
+ as->wait_all();
+}
+int sg_activity_set_wait_all_for(sg_activity_set_t as, double timeout)
+{
+ try {
+ as->wait_all_for(timeout);
+ return 1;
+ } catch (const simgrid::TimeoutException& e) {
+ return 0;
+ }
+}
+sg_activity_t sg_activity_set_wait_any(sg_activity_set_t as)
+{
+ return as->wait_any().get();
+}
+sg_activity_t sg_activity_set_wait_any_for(sg_activity_set_t as, double timeout)
+{
+ try {
+ return as->wait_any_for(timeout).get();
+ } catch (const simgrid::TimeoutException& e) {
+ return nullptr;
+ }
+}
+
+void sg_activity_set_delete(sg_activity_set_t as)
+{
+ delete as;
+}
+
+SG_END_DECL
#include <cmath>
#include <simgrid/Exception.hpp>
#include <simgrid/comm.h>
+#include <simgrid/s4u/ActivitySet.hpp>
#include <simgrid/s4u/Comm.hpp>
#include <simgrid/s4u/Engine.hpp>
#include <simgrid/s4u/Mailbox.hpp>
return this;
}
+void* Comm::get_payload() const
+{
+ xbt_assert(get_state() == State::FINISHED,
+ "You can only retrieve the payload of a communication that gracefully terminated, but its state is %s.",
+ get_state_str());
+ return static_cast<kernel::activity::CommImpl*>(pimpl_.get())->payload_;
+}
+
Actor* Comm::get_sender() const
{
kernel::actor::ActorImplPtr sender = nullptr;
{
xbt_assert(get_state() == State::INITED || get_state() == State::STARTING,
"You cannot use %s() once your communication started (not implemented)", __FUNCTION__);
+
+ auto myself = kernel::actor::ActorImpl::self();
+
if (get_source() != nullptr || get_destination() != nullptr) {
xbt_assert(is_assigned(), "When either from_ or to_ is specified, both must be.");
xbt_assert(src_buff_ == nullptr && dst_buff_ == nullptr,
"Direct host-to-host communications cannot carry any data.");
XBT_DEBUG("host-to-host Comm. Pimpl already created and set, just start it.");
- fire_on_start();
- fire_on_this_start();
kernel::actor::simcall_answered([this] {
pimpl_->set_state(kernel::activity::State::READY);
boost::static_pointer_cast<kernel::activity::CommImpl>(pimpl_)->start();
});
- } else if (src_buff_ != nullptr) { // Sender side
+ fire_on_start();
+ fire_on_this_start();
+ } else if (myself == sender_) {
on_send(*this);
on_this_send(*this);
kernel::actor::CommIsendSimcall observer{sender_,
"Isend"};
pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::isend(&observer); },
&observer);
- } else if (dst_buff_ != nullptr) { // Receiver side
+ } else if (myself == receiver_) {
xbt_assert(not detached_, "Receive cannot be detached");
on_recv(*this);
on_this_recv(*this);
return this;
}
-ssize_t Comm::test_any(const std::vector<CommPtr>& comms)
+ssize_t Comm::test_any(const std::vector<CommPtr>& comms) // XBT_ATTRIB_DEPRECATED_v339
{
- std::vector<ActivityPtr> activities;
- for (const auto& comm : comms)
- activities.push_back(boost::dynamic_pointer_cast<Activity>(comm));
- return Activity::test_any(activities);
+ std::vector<kernel::activity::ActivityImpl*> ractivities(comms.size());
+ std::transform(begin(comms), end(comms), begin(ractivities), [](const CommPtr& act) { return act->pimpl_.get(); });
+
+ kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
+ kernel::actor::ActivityTestanySimcall observer{issuer, ractivities, "test_any"};
+ ssize_t changed_pos = kernel::actor::simcall_answered(
+ [&observer] {
+ return kernel::activity::ActivityImpl::test_any(observer.get_issuer(), observer.get_activities());
+ },
+ &observer);
+ if (changed_pos != -1)
+ comms.at(changed_pos)->complete(State::FINISHED);
+ return changed_pos;
}
/** @brief Block the calling actor until the communication is finished, or until timeout
return this;
}
-ssize_t Comm::wait_any_for(const std::vector<CommPtr>& comms, double timeout)
+ssize_t Comm::deprecated_wait_any_for(const std::vector<CommPtr>& comms, double timeout) // XBT_ATTRIB_DEPRECATED_v339
{
- std::vector<ActivityPtr> activities;
+ if (comms.empty())
+ return -1;
+ ActivitySet set;
for (const auto& comm : comms)
- activities.push_back(boost::dynamic_pointer_cast<Activity>(comm));
- ssize_t changed_pos;
+ set.push(comm);
try {
- changed_pos = Activity::wait_any_for(activities, timeout);
+ auto* ret = set.wait_any_for(timeout).get();
+ for (size_t i = 0; i < comms.size(); i++)
+ if (comms[i].get() == ret)
+ return i;
+
+ } catch (TimeoutException& e) {
+ return -1;
} catch (const NetworkFailureException& e) {
- changed_pos = -1;
- for (auto c : comms) {
- if (c->pimpl_->get_state() == kernel::activity::State::FAILED) {
+ for (auto c : comms)
+ if (c->pimpl_->get_state() == kernel::activity::State::FAILED)
c->complete(State::FAILED);
- }
- }
+
e.rethrow_nested(XBT_THROW_POINT, boost::core::demangle(typeid(e).name()) + " raised in kernel mode.");
}
- return changed_pos;
+ return -1;
}
-void Comm::wait_all(const std::vector<CommPtr>& comms)
+void Comm::wait_all(const std::vector<CommPtr>& comms) // XBT_ATTRIB_DEPRECATED_v339
{
// TODO: this should be a simcall or something
for (const auto& comm : comms)
comm->wait();
}
-size_t Comm::wait_all_for(const std::vector<CommPtr>& comms, double timeout)
+size_t Comm::wait_all_for(const std::vector<CommPtr>& comms, double timeout) // XBT_ATTRIB_DEPRECATED_v339
{
if (timeout < 0.0) {
- wait_all(comms);
+ for (const auto& comm : comms)
+ comm->wait();
return comms.size();
}
- double deadline = Engine::get_clock() + timeout;
- std::vector<CommPtr> waited_comm(1, nullptr);
- for (size_t i = 0; i < comms.size(); i++) {
- double wait_timeout = std::max(0.0, deadline - Engine::get_clock());
- waited_comm[0] = comms[i];
- // Using wait_any_for() here (and not wait_for) because we don't want comms to be invalidated on timeout
- if (wait_any_for(waited_comm, wait_timeout) == -1) {
- XBT_DEBUG("Timeout (%g): i = %zu", wait_timeout, i);
- return i;
- }
- }
- return comms.size();
+ ActivitySet set;
+ for (auto comm : comms)
+ set.push(comm);
+ set.wait_all_for(timeout);
+
+ return set.size();
}
} // namespace simgrid::s4u
/* **************************** Public C interface *************************** */
+int sg_comm_isinstance(sg_activity_t acti)
+{
+ return dynamic_cast<simgrid::s4u::Comm*>(acti) != nullptr;
+}
+
void sg_comm_detach(sg_comm_t comm, void (*clean_function)(void*))
{
comm->detach(clean_function);
void sg_comm_wait_all(sg_comm_t* comms, size_t count)
{
- sg_comm_wait_all_for(comms, count, -1);
+ simgrid::s4u::ActivitySet as;
+ for (size_t i = 0; i < count; i++)
+ as.push(comms[i]);
+
+ as.wait_all();
}
-size_t sg_comm_wait_all_for(sg_comm_t* comms, size_t count, double timeout)
+ssize_t sg_comm_wait_any(sg_comm_t* comms, size_t count)
{
std::vector<simgrid::s4u::CommPtr> s4u_comms;
for (size_t i = 0; i < count; i++)
s4u_comms.emplace_back(comms[i], false);
- size_t pos = simgrid::s4u::Comm::wait_all_for(s4u_comms, timeout);
- for (size_t i = pos; i < count; i++)
- s4u_comms[i]->add_ref();
+ ssize_t pos = simgrid::s4u::Comm::deprecated_wait_any_for(s4u_comms, -1);
+ for (size_t i = 0; i < count; i++) {
+ if (pos != -1 && static_cast<size_t>(pos) != i)
+ s4u_comms[i]->add_ref();
+ }
return pos;
}
-ssize_t sg_comm_wait_any(sg_comm_t* comms, size_t count)
-{
- return sg_comm_wait_any_for(comms, count, -1);
-}
-
ssize_t sg_comm_wait_any_for(sg_comm_t* comms, size_t count, double timeout)
{
std::vector<simgrid::s4u::CommPtr> s4u_comms;
for (size_t i = 0; i < count; i++)
s4u_comms.emplace_back(comms[i], false);
- ssize_t pos = simgrid::s4u::Comm::wait_any_for(s4u_comms, timeout);
+ ssize_t pos = simgrid::s4u::Comm::deprecated_wait_any_for(s4u_comms, timeout);
for (size_t i = 0; i < count; i++) {
if (pos != -1 && static_cast<size_t>(pos) != i)
s4u_comms[i]->add_ref();
#include "simgrid/simix.hpp"
#include <simgrid/Exception.hpp>
#include <simgrid/exec.h>
+#include <simgrid/s4u/ActivitySet.hpp>
#include <simgrid/s4u/Exec.hpp>
#include <simgrid/s4u/Host.hpp>
return this;
}
-ssize_t Exec::wait_any_for(const std::vector<ExecPtr>& execs, double timeout)
+ssize_t Exec::deprecated_wait_any_for(const std::vector<ExecPtr>& execs, double timeout) // XBT_ATTRIB_DEPRECATED_v339
{
- std::vector<ActivityPtr> activities;
+ if (execs.empty())
+ return -1;
+ ActivitySet set;
for (const auto& exec : execs)
- activities.push_back(boost::dynamic_pointer_cast<Activity>(exec));
- return Activity::wait_any_for(activities, timeout);
+ set.push(exec);
+ try {
+ auto* ret = set.wait_any_for(timeout).get();
+ for (size_t i = 0; i < execs.size(); i++)
+ if (execs[i].get() == ret)
+ return i;
+
+ } catch (TimeoutException& e) {
+ return -1;
+ }
+ return -1;
}
/** @brief change the execution bound
} // namespace simgrid::s4u
/* **************************** Public C interface *************************** */
+int sg_exec_isinstance(sg_activity_t acti)
+{
+ return dynamic_cast<simgrid::s4u::Exec*>(acti) != nullptr;
+}
+
void sg_exec_set_bound(sg_exec_t exec, double bound)
{
exec->set_bound(bound);
return status;
}
-ssize_t sg_exec_wait_any(sg_exec_t* execs, size_t count)
+ssize_t sg_exec_wait_any(sg_exec_t* execs, size_t count) // XBT_ATTRIB_DEPRECATED_v339
{
- return sg_exec_wait_any_for(execs, count, -1.0);
+ std::vector<simgrid::s4u::ExecPtr> s4u_execs;
+ for (size_t i = 0; i < count; i++)
+ s4u_execs.emplace_back(execs[i], false);
+
+ ssize_t pos = simgrid::s4u::Exec::deprecated_wait_any_for(s4u_execs, -1.0);
+ for (size_t i = 0; i < count; i++) {
+ if (pos != -1 && static_cast<size_t>(pos) != i)
+ s4u_execs[i]->add_ref();
+ }
+ return pos;
}
-ssize_t sg_exec_wait_any_for(sg_exec_t* execs, size_t count, double timeout)
+ssize_t sg_exec_wait_any_for(sg_exec_t* execs, size_t count, double timeout) // XBT_ATTRIB_DEPRECATED_v339
{
std::vector<simgrid::s4u::ExecPtr> s4u_execs;
for (size_t i = 0; i < count; i++)
s4u_execs.emplace_back(execs[i], false);
- ssize_t pos = simgrid::s4u::Exec::wait_any_for(s4u_execs, timeout);
+ ssize_t pos = simgrid::s4u::Exec::deprecated_wait_any_for(s4u_execs, timeout);
for (size_t i = 0; i < count; i++) {
if (pos != -1 && static_cast<size_t>(pos) != i)
s4u_execs[i]->add_ref();
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
+#include <simgrid/s4u/ActivitySet.hpp>
#include <simgrid/s4u/Disk.hpp>
#include <simgrid/s4u/Io.hpp>
#include <xbt/log.h>
return this;
}
-ssize_t Io::wait_any_for(const std::vector<IoPtr>& ios, double timeout)
+ssize_t Io::deprecated_wait_any_for(const std::vector<IoPtr>& ios, double timeout)
{
- std::vector<ActivityPtr> activities;
+ ActivitySet set;
for (const auto& io : ios)
- activities.push_back(boost::dynamic_pointer_cast<Activity>(io));
- return Activity::wait_any_for(activities, timeout);
+ set.push(boost::dynamic_pointer_cast<Activity>(io));
+
+ auto* ret = set.wait_any_for(timeout).get();
+ for (size_t i = 0; i < ios.size(); i++)
+ if (ios[i].get() == ret)
+ return i;
+
+ return -1;
}
IoPtr Io::set_disk(const_sg_disk_t disk)
return res;
}
+CommPtr Mailbox::get_async()
+{
+ CommPtr res = get_init()->set_dst_data(nullptr, sizeof(void*));
+ res->start();
+ return res;
+}
+
kernel::activity::ActivityImplPtr
Mailbox::iprobe(int type, const std::function<bool(void*, void*, kernel::activity::CommImpl*)>& match_fun, void* data)
{
{
pimpl_->add_route(src, dst, gw_src, gw_dst, link_list, symmetrical);
}
+void NetZone::add_route(kernel::routing::NetPoint* src, kernel::routing::NetPoint* dst,
+ kernel::routing::NetPoint* gw_src, kernel::routing::NetPoint* gw_dst,
+ const std::vector<const Link*>& links)
+{
+ std::vector<LinkInRoute> links_direct;
+ std::vector<LinkInRoute> links_reverse;
+ for (auto* l : links) {
+ links_direct.emplace_back(LinkInRoute(l, LinkInRoute::Direction::UP));
+ links_reverse.emplace_back(LinkInRoute(l, LinkInRoute::Direction::DOWN));
+ }
+ pimpl_->add_route(src, dst, gw_src, gw_dst, links_direct, false);
+ pimpl_->add_route(dst, src, gw_dst, gw_src, links_reverse, false);
+}
+
+void NetZone::add_route(const Host* src, const Host* dst, const std::vector<LinkInRoute>& link_list, bool symmetrical)
+{
+ pimpl_->add_route(src->get_netpoint(), dst->get_netpoint(), nullptr, nullptr, link_list, symmetrical);
+}
+void NetZone::add_route(const Host* src, const Host* dst, const std::vector<const Link*>& links)
+{
+ std::vector<LinkInRoute> links_direct;
+ std::vector<LinkInRoute> links_reverse;
+ for (auto* l : links) {
+ links_direct.emplace_back(LinkInRoute(l, LinkInRoute::Direction::UP));
+ links_reverse.emplace_back(LinkInRoute(l, LinkInRoute::Direction::DOWN));
+ }
+ pimpl_->add_route(src->get_netpoint(), dst->get_netpoint(), nullptr, nullptr, links_direct, false);
+ pimpl_->add_route(dst->get_netpoint(), src->get_netpoint(), nullptr, nullptr, links_reverse, false);
+}
void NetZone::add_bypass_route(kernel::routing::NetPoint* src, kernel::routing::NetPoint* dst,
kernel::routing::NetPoint* gw_src, kernel::routing::NetPoint* gw_dst,
#include <climits>
#include "private.hpp"
+#include "simgrid/s4u/Engine.hpp"
#include "smpi_comm.hpp"
-#include "smpi_info.hpp"
#include "smpi_errhandler.hpp"
+#include "smpi_info.hpp"
#include "src/smpi/include/smpi_actor.hpp"
XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(smpi_pmpi);
return MPI_SUCCESS;
case MPI_UNIVERSE_SIZE:
*flag = 1;
- universe_size = smpi_get_universe_size();
+ universe_size = simgrid::s4u::Engine::get_instance()->get_host_count();
*static_cast<int**>(attr_value) = &universe_size;
return MPI_SUCCESS;
case MPI_LASTUSEDCODE:
XBT_PRIVATE simgrid::smpi::ActorExt* smpi_process();
XBT_PRIVATE simgrid::smpi::ActorExt* smpi_process_remote(simgrid::s4u::ActorPtr actor);
-XBT_PRIVATE int smpi_get_universe_size();
XBT_PRIVATE void smpi_deployment_register_process(const std::string& instance_id, int rank,
const simgrid::s4u::Actor* actor);
void ActorExt::init()
{
- xbt_assert(smpi_get_universe_size() != 0, "SimGrid was not initialized properly before entering MPI_Init. "
- "Aborting, please check compilation process and use smpirun.");
-
ActorExt* ext = smpi_process();
// if we are in MPI_Init and argc handling has already been done.
if (ext->initialized())
namespace simgrid::smpi::app {
-static int universe_size = 0;
-
class Instance {
public:
explicit Instance(int max_no_processes) : size_(max_no_processes)
{
auto* group = new simgrid::smpi::Group(size_);
comm_world_ = new simgrid::smpi::Comm(group, nullptr, false, -1);
- universe_size += max_no_processes;
bar_ = s4u::Barrier::create(size_);
}
s4u::BarrierPtr bar_;
rank++;
}
}
+void SMPI_app_instance_join(const std::string& instance_id)
+{
+ std::vector<simgrid::s4u::ActorPtr> actors =
+ simgrid::s4u::Engine::get_instance()->get_filtered_actors([instance_id](simgrid::s4u::ActorPtr act) {
+ auto* actor_instance = act->get_property("instance_id");
+ return actor_instance != nullptr && strcmp(actor_instance, instance_id.c_str()) == 0;
+ });
+
+ for (auto& act : actors)
+ act->join();
+}
void smpi_deployment_register_process(const std::string& instance_id, int rank, const simgrid::s4u::Actor* actor)
{
smpi_instances.clear();
}
-int smpi_get_universe_size()
-{
- return simgrid::smpi::app::universe_size;
-}
-
/** @brief Auxiliary method to get list of hosts to deploy app */
static std::vector<simgrid::s4u::Host*> smpi_get_hosts(const simgrid::s4u::Engine* e, const std::string& hostfile)
{
links[name] = zone->create_link(name, 1e9)->set_latency(1e-9)->seal();
}
links["L0"] = zone->create_link("L0", 1e3)->seal();
- zone->add_route(hosts["S1"]->get_netpoint(), hosts["C1"]->get_netpoint(), nullptr, nullptr,
- {sg4::LinkInRoute(links["L1"]), sg4::LinkInRoute(links["L0"]), sg4::LinkInRoute(links["L2"])});
- zone->add_route(hosts["S2"]->get_netpoint(), hosts["C2"]->get_netpoint(), nullptr, nullptr,
- {sg4::LinkInRoute(links["L3"]), sg4::LinkInRoute(links["L0"]), sg4::LinkInRoute(links["L4"])});
-
+ zone->add_route(hosts["S1"], hosts["C1"], {links["L1"], links["L0"], links["L2"]});
+ zone->add_route(hosts["S2"], hosts["C2"], {links["L3"], links["L0"], links["L4"]});
zone->seal();
sg4::Actor::create("", hosts["S1"], sender, "C1", nullptr);
auto const* host1 = zone->create_host("host1", 1e6)->seal();
auto const* host2 = zone->create_host("host2", 1e6)->seal();
auto* testlink = zone->create_link("L1", 1e10)->seal();
- zone->add_route(host1->get_netpoint(), host2->get_netpoint(), nullptr, nullptr, {sg4::LinkInRoute(testlink)});
+ zone->add_route(host1, host2, {testlink});
simgrid::s4u::Actor::create("dispatcher", engine.host_by_name("host1"), main_dispatcher, testlink);
engine.run();
XBT_LOG_NEW_DEFAULT_CATEGORY(issue105, "Issue105");
static void load_generator(sg4::Mailbox* mailbox)
{
- std::vector<sg4::CommPtr> comms;
+ sg4::ActivitySet comms;
// Send the task messages
for (int i = 0; i < 100; i++) {
auto* payload = new int(i);
sg4::CommPtr comm = mailbox->put_async(payload, 1024);
- comms.push_back(comm);
+ comms.push(comm);
sg4::this_actor::sleep_for(1.0);
}
auto* payload = new int(-1);
sg4::CommPtr comm = mailbox->put_async(payload, 1024);
XBT_INFO("Sent shutdown");
- comms.push_back(comm);
+ comms.push(comm);
// Wait for all messages to be consumed before ending the simulation
- sg4::Comm::wait_all(comms);
+ comms.wait_all();
XBT_INFO("Load generator finished");
}
->set_bandwidth_profile(linkSaBandwidthProfile)
->seal();
- world->add_route(hostGl01->get_netpoint(), hostSa01->get_netpoint(), nullptr, nullptr,
- {{linkSa, sg4::LinkInRoute::Direction::NONE}}, true);
+ world->add_route(hostGl01, hostSa01, {{linkSa, sg4::LinkInRoute::Direction::NONE}}, true);
world->seal();
sg4::Mailbox* mb1 = e.mailbox_by_name_or_create("Mailbox 1");
auto* rootzone = sg4::create_full_zone("root");
auto* hostA = rootzone->create_host("hostA", 1e9);
auto* hostB = rootzone->create_host("hostB", 1e9);
- sg4::LinkInRoute link(rootzone->create_link("backbone", "1")->set_latency("1s")->seal());
- rootzone->add_route(hostA->get_netpoint(), hostB->get_netpoint(), nullptr, nullptr, {link}, true);
+ auto* backb = rootzone->create_link("backbone", "1")->set_latency("1s")->seal();
+ rootzone->add_route(hostA, hostB, {backb});
rootzone->seal();
sg4::Actor::create("ptask", hostA, ptask, hostA, hostB);
hosts.append(host2)
link1 = dijkstra.create_link("link1_up", [1e9]).set_latency(1e-3).set_concurrency_limit(10).seal()
link2 = dijkstra.create_link("link1_down", ["1GBps"]).set_latency("1ms").seal()
- dijkstra.add_route(host1.netpoint, host2.netpoint, None, None, [LinkInRoute(link1)], False)
- dijkstra.add_route(host2.netpoint, host1.netpoint, None, None, [LinkInRoute(link2)], False)
+ dijkstra.add_route(host1, host2, [LinkInRoute(link1)], False)
+ dijkstra.add_route(host2, host1, [LinkInRoute(link2)], False)
dijkstra.seal()
# vivaldi
foreach(x actor actor-autorestart actor-suspend
activity-lifecycle
- comm-get-sender comm-pt2pt comm-fault-scenarios wait-all-for wait-any-for
+ comm-get-sender comm-pt2pt comm-fault-scenarios
cloud-interrupt-migration cloud-two-execs
monkey-masterworkers monkey-semaphore
concurrent_rw
## Add the tests.
## Some need to be run with all factories, some don't need tesh to run
-foreach(x actor actor-autorestart actor-suspend activity-lifecycle comm-get-sender wait-all-for wait-any-for
+foreach(x actor actor-autorestart actor-suspend activity-lifecycle comm-get-sender
cloud-interrupt-migration cloud-two-execs concurrent_rw dag-incomplete-simulation dependencies io-set-bw io-stream
vm-live-migration vm-suicide)
set(tesh_files ${tesh_files} ${CMAKE_CURRENT_SOURCE_DIR}/${x}/${x}.tesh)
simgrid::s4u::ActorPtr receiver = simgrid::s4u::Actor::create("receiver", all_hosts[1], []() {
assert_exit(true, 2);
int* data;
- simgrid::s4u::CommPtr comm = simgrid::s4u::Mailbox::by_name("mb")->get_async<int>(&data);
- std::vector<simgrid::s4u::CommPtr> pending_comms = {comm};
- REQUIRE_NETWORK_FAILURE(simgrid::s4u::Comm::wait_any(pending_comms));
+ simgrid::s4u::CommPtr comm = simgrid::s4u::Mailbox::by_name("mb")->get_async<int>(&data);
+ simgrid::s4u::ActivitySet pending_comms({comm});
+ REQUIRE_NETWORK_FAILURE(pending_comms.wait_any());
});
simgrid::s4u::ActorPtr sender = simgrid::s4u::Actor::create("sender", all_hosts[2], []() {
const double timeout = simgrid::s4u::Engine::get_clock() + duration;
bool ret;
try {
- std::vector<Activity> activities = {activity};
+ simgrid::s4u::ActivitySet set;
+ set.push(activity);
+
XBT_DEBUG("calling wait_any_for(%f)", duration);
- ssize_t index = Activity::element_type::wait_any_for(activities, duration);
- if (index == -1) {
- XBT_DEBUG("wait_any_for() timed out");
- INFO("wait_any_for() timeout should expire at expected date: " << timeout);
- REQUIRE(simgrid::s4u::Engine::get_clock() == Approx(timeout));
- ret = false;
- } else {
- XBT_DEBUG("wait_any_for() returned index %zd", index);
- REQUIRE(index == 0);
- ret = true;
- }
+ auto waited_activity = set.wait_any_for(duration);
+
+ XBT_DEBUG("wait_any_for() returned activity %p", waited_activity.get());
+ REQUIRE(waited_activity.get() == activity);
+ ret = true;
+
+ } catch (const simgrid::TimeoutException& e) {
+ XBT_DEBUG("wait_any_for() timed out");
+ INFO("wait_any_for() timeout should expire at expected date: " << timeout);
+ REQUIRE(simgrid::s4u::Engine::get_clock() == Approx(timeout));
+ ret = false;
} catch (const simgrid::Exception& e) {
XBT_DEBUG("wait_any_for() threw an exception: %s", e.what());
ret = true;
pr::Profile* profile_link = pr::ProfileBuilder::from_string("link_profile", ctx.link_profile.str(), 0);
sg4::Link const* link =
zone->create_link("link", LinkBandwidth)->set_latency(LinkLatency)->set_state_profile(profile_link)->seal();
- zone->add_route(sender_host->get_netpoint(), receiver_host->get_netpoint(), nullptr, nullptr,
- {sg4::LinkInRoute{link}}, false);
+ zone->add_route(sender_host, receiver_host, {link});
zone->seal();
sg4::Host::on_onoff_cb([mbox](sg4::Host const& host) {
for (int i = 1; i < cfg_host_count; i++) {
auto hostname = "lilibeth " + std::to_string(i);
auto* host = rootzone->create_host(hostname, 1e9);
- sg4::LinkInRoute link(rootzone->create_link(hostname, "1MBps")->set_latency("24us")->seal());
- rootzone->add_route(master_host->get_netpoint(), host->get_netpoint(), nullptr, nullptr, {link}, true);
+ auto* link = rootzone->create_link(hostname, "1MBps")->set_latency("24us")->seal();
+ rootzone->add_route(master_host, host, {link});
worker_hosts.push_back(host);
}
rootzone->seal();
for i in range(1, host_count):
link = rootzone.create_split_duplex_link(f"link {i}", "1MBps").set_latency("24us")
host = rootzone.create_host(f"lilibeth {i}", 1e9)
- rootzone.add_route(main.netpoint, host.netpoint, None, None, [LinkInRoute(link, LinkInRoute.Direction.UP)], True)
+ rootzone.add_route(main, host, [link])
Actor.create("worker", host, worker, i).set_auto_restart(True)
e.netzone_root.seal()
auto* rootzone = sg4::create_full_zone("root");
auto* paul = rootzone->create_host("Paul", 1e9);
auto* carol = rootzone->create_host("Carol", 1e9);
- sg4::LinkInRoute link(rootzone->create_link("link", "1MBps")->set_latency("24us")->seal());
- rootzone->add_route(paul->get_netpoint(), carol->get_netpoint(), nullptr, nullptr, {link}, true);
+ auto* link = rootzone->create_link("link", "1MBps")->set_latency("24us")->seal();
+ rootzone->add_route(paul, carol, {link});
SharedBuffer buffer;
sg4::Actor::create("producer", paul, producer, std::ref(buffer))->set_auto_restart();
void operator()() const
{
/* Vector in which we store all ongoing communications */
- std::vector<sg4::CommPtr> pending_comms;
+ sg4::ActivitySet pending_comms;
/* Make a vector of the mailboxes to use */
std::vector<sg4::Mailbox*> mboxes;
auto* mbox = sg4::Mailbox::by_name(host->get_name());
mboxes.push_back(mbox);
sg4::CommPtr comm = mbox->put_async(payload, msg_size);
- pending_comms.push_back(comm);
+ pending_comms.push(comm);
}
XBT_INFO("Done dispatching all messages");
/* Now that all message exchanges were initiated, wait for their completion in one single call */
- sg4::Comm::wait_all(pending_comms);
+ pending_comms.wait_all();
XBT_INFO("Goodbye now!");
}
auto* host = zone->create_host(hostname, 1e9);
host->create_disk("disk-" + hostname, 1e9, 1e6);
const auto* link = zone->create_link("link-" + hostname, 1e9);
- zone->add_route(host->get_netpoint(), router, nullptr, nullptr, {sg4::LinkInRoute(link)});
+ zone->add_route(host->get_netpoint(), router, nullptr, nullptr, {link});
}
return zone;
}
+++ /dev/null
-/* Copyright (c) 2019-2023. The SimGrid Team. All rights reserved. */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include <cstdlib>
-#include <iostream>
-#include <simgrid/s4u.hpp>
-#include <string>
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(meh, "meh");
-
-static void worker()
-{
- auto* mbox = simgrid::s4u::Mailbox::by_name("meh");
- int input1 = 42;
- int input2 = 51;
-
- XBT_INFO("Sending and receiving %d and %d asynchronously", input1, input2);
-
- auto put1 = mbox->put_async(&input1, 1000 * 1000 * 500);
- auto put2 = mbox->put_async(&input2, 1000 * 1000 * 1000);
-
- int* out1;
- auto get1 = mbox->get_async<int>(&out1);
-
- int* out2;
- auto get2 = mbox->get_async<int>(&out2);
-
- XBT_INFO("All comms have started");
- std::vector<simgrid::s4u::CommPtr> comms = {put1, put2, get1, get2};
-
- while (not comms.empty()) {
- size_t index = simgrid::s4u::Comm::wait_all_for(comms, 0.5);
- if (index < comms.size())
- XBT_INFO("wait_all_for: Timeout reached");
- XBT_INFO("wait_all_for: %zu comms finished (#comms=%zu)", index, comms.size());
- comms.erase(comms.begin(), comms.begin() + index);
- }
-
- XBT_INFO("All comms have finished");
- XBT_INFO("Got %d and %d", *out1, *out2);
-}
-
-int main(int argc, char* argv[])
-
-{
- simgrid::s4u::Engine e(&argc, argv);
- e.load_platform(argv[1]);
- simgrid::s4u::Actor::create("worker", e.host_by_name("Tremblay"), worker);
- e.run();
- return 0;
-}
+++ /dev/null
-#!/usr/bin/env tesh
-
-p Testing the wait_all_for feature of S4U
-
-! output sort 19
-$ ${bindir:=.}/wait-all-for ${platfdir:=.}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
-> [ 0.000000] (1:worker@Tremblay) Sending and receiving 42 and 51 asynchronously
-> [ 0.000000] (1:worker@Tremblay) All comms have started
-> [ 0.500000] (1:worker@Tremblay) wait_all_for: Timeout reached
-> [ 0.500000] (1:worker@Tremblay) wait_all_for: 0 comms finished (#comms=4)
-> [ 1.000000] (1:worker@Tremblay) wait_all_for: Timeout reached
-> [ 1.000000] (1:worker@Tremblay) wait_all_for: 0 comms finished (#comms=4)
-> [ 1.500000] (1:worker@Tremblay) wait_all_for: Timeout reached
-> [ 1.500000] (1:worker@Tremblay) wait_all_for: 1 comms finished (#comms=4)
-> [ 2.000000] (1:worker@Tremblay) wait_all_for: Timeout reached
-> [ 2.000000] (1:worker@Tremblay) wait_all_for: 0 comms finished (#comms=3)
-> [ 2.070331] (1:worker@Tremblay) wait_all_for: 3 comms finished (#comms=3)
-> [ 2.070331] (1:worker@Tremblay) All comms have finished
-> [ 2.070331] (1:worker@Tremblay) Got 42 and 51
+++ /dev/null
-/* Copyright (c) 2019-2023. The SimGrid Team. All rights reserved. */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include <cstdlib>
-#include <iostream>
-#include <string>
-#include <simgrid/s4u.hpp>
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(meh, "meh");
-
-static void worker()
-{
- auto* mbox = simgrid::s4u::Mailbox::by_name("meh");
- int input1 = 42;
- int input2 = 51;
-
- XBT_INFO("Sending and receiving %d and %d asynchronously", input1, input2);
-
- auto put1 = mbox->put_async(&input1, 1000 * 1000 * 500);
- auto put2 = mbox->put_async(&input2, 1000 * 1000 * 1000);
-
- int* out1;
- auto get1 = mbox->get_async<int>(&out1);
-
- int* out2;
- auto get2 = mbox->get_async<int>(&out2);
-
- XBT_INFO("All comms have started");
- std::vector<simgrid::s4u::CommPtr> comms = {put1, put2, get1, get2};
-
- while (not comms.empty()) {
- ssize_t index = simgrid::s4u::Comm::wait_any_for(comms, 0.5);
- if (index < 0)
- XBT_INFO("wait_any_for: Timeout reached");
- else {
- XBT_INFO("wait_any_for: A comm finished (index=%zd, #comms=%zu)", index, comms.size());
- comms.erase(comms.begin() + index);
- }
- }
-
- XBT_INFO("All comms have finished");
- XBT_INFO("Got %d and %d", *out1, *out2);
-}
-
-int main(int argc, char* argv[])
-
-{
- simgrid::s4u::Engine e(&argc, argv);
- e.load_platform(argv[1]);
- simgrid::s4u::Actor::create("worker", e.host_by_name("Tremblay"), worker);
- e.run();
- return 0;
-}
+++ /dev/null
-#!/usr/bin/env tesh
-
-p Testing the wait_any_for feature of S4U
-
-! output sort 19
-$ ${bindir:=.}/wait-any-for ${platfdir:=.}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
-> [ 0.000000] (1:worker@Tremblay) Sending and receiving 42 and 51 asynchronously
-> [ 0.000000] (1:worker@Tremblay) All comms have started
-> [ 0.500000] (1:worker@Tremblay) wait_any_for: Timeout reached
-> [ 1.000000] (1:worker@Tremblay) wait_any_for: Timeout reached
-> [ 1.035263] (1:worker@Tremblay) wait_any_for: A comm finished (index=0, #comms=4)
-> [ 1.035263] (1:worker@Tremblay) wait_any_for: A comm finished (index=1, #comms=3)
-> [ 1.535263] (1:worker@Tremblay) wait_any_for: Timeout reached
-> [ 2.035263] (1:worker@Tremblay) wait_any_for: Timeout reached
-> [ 2.070331] (1:worker@Tremblay) wait_any_for: A comm finished (index=0, #comms=2)
-> [ 2.070331] (1:worker@Tremblay) wait_any_for: A comm finished (index=0, #comms=1)
-> [ 2.070331] (1:worker@Tremblay) All comms have finished
-> [ 2.070331] (1:worker@Tremblay) Got 42 and 51
src/plugins/host_dvfs.cpp
src/plugins/host_energy.cpp
src/plugins/host_load.cpp
+ src/plugins/jbod.cpp
src/plugins/link_energy.cpp
src/plugins/link_energy_wifi.cpp
src/plugins/link_load.cpp
set(headers_to_install
include/simgrid/actor.h
+ include/simgrid/activity_set.h
include/simgrid/barrier.h
include/simgrid/comm.h
include/simgrid/engine.h
include/simgrid/plugins/dvfs.h
include/simgrid/plugins/energy.h
include/simgrid/plugins/file_system.h
+ include/simgrid/plugins/jbod.hpp
include/simgrid/plugins/live_migration.h
include/simgrid/plugins/load.h
include/simgrid/plugins/photovoltaic.hpp
# This script is used by various build projects on Jenkins
+case "$JENKINS_HOME" in
+*-qualif)
+ echo "Build skipped on $JENKINS_HOME."
+ exit 0
+ ;;
+esac
+
# See https://ci.inria.fr/simgrid/job/SimGrid/configure
# See https://ci.inria.fr/simgrid/job/Simgrid-Windows/configure