- New function NetZone::add_route(host1, host2, links) when you don't need gateways
Also add a variant with s4u::Link, when you don't want to specify the directions
on symmetric routes.
+ - Introduce a Mailbox::get_async() with no payload parameter. You can use the new
+ Comm::get_payload() once the communication is over to retrieve the payload.
SMPI:
- New SMPI_app_instance_join(): wait for the completion of a started MPI instance
Python:
- Make the host_load plugin available from Python. See examples/python/plugin-host-load
+ - Mailbox::get_async() does not return a pair anymore. Use comm.get_payload() instead.
----------------------------------------------------------------------------
# This file lists the content of the python source package
# Prepared in tools/cmake/Distrib.cmake
+include examples/c/activityset-testany/activityset-testany.c
+include examples/c/activityset-testany/activityset-testany.tesh
+include examples/c/activityset-waitall/activityset-waitall.c
+include examples/c/activityset-waitall/activityset-waitall.tesh
+include examples/c/activityset-waitallfor/activityset-waitallfor.c
+include examples/c/activityset-waitallfor/activityset-waitallfor.tesh
+include examples/c/activityset-waitany/activityset-waitany.c
+include examples/c/activityset-waitany/activityset-waitany.tesh
include examples/c/actor-create/actor-create.c
include examples/c/actor-create/actor-create.tesh
include examples/c/actor-create/actor-create_d.xml
include examples/c/exec-dvfs/exec-dvfs.tesh
include examples/c/exec-remote/exec-remote.c
include examples/c/exec-remote/exec-remote.tesh
-include examples/c/exec-waitany/exec-waitany.c
-include examples/c/exec-waitany/exec-waitany.tesh
include examples/c/io-disk-raw/io-disk-raw.c
include examples/c/io-disk-raw/io-disk-raw.tesh
include examples/c/io-file-remote/io-file-remote.c
include examples/cpp/io-file-remote/s4u-io-file-remote_d.xml
include examples/cpp/io-file-system/s4u-io-file-system.cpp
include examples/cpp/io-file-system/s4u-io-file-system.tesh
-include examples/cpp/io-jbod-raw/s4u-io-jbod-raw.cpp
-include examples/cpp/io-jbod-raw/s4u-io-jbod-raw.tesh
include examples/cpp/io-priority/s4u-io-priority.cpp
include examples/cpp/io-priority/s4u-io-priority.tesh
include examples/cpp/maestro-set/s4u-maestro-set.cpp
include examples/cpp/platform-properties/s4u-platform-properties.tesh
include examples/cpp/plugin-host-load/s4u-plugin-host-load.cpp
include examples/cpp/plugin-host-load/s4u-plugin-host-load.tesh
+include examples/cpp/plugin-jbod/s4u-plugin-jbod.cpp
+include examples/cpp/plugin-jbod/s4u-plugin-jbod.tesh
include examples/cpp/plugin-link-load/s4u-plugin-link-load.cpp
include examples/cpp/plugin-link-load/s4u-plugin-link-load.tesh
include examples/cpp/plugin-prodcons/s4u-plugin-prodcons.cpp
include examples/cpp/trace-process-migration/s4u-trace-process-migration.tesh
include examples/cpp/trace-route-user-variables/s4u-trace-route-user-variables.cpp
include examples/cpp/trace-route-user-variables/s4u-trace-route-user-variables.tesh
+include examples/python/activityset-testany/activityset-testany.py
+include examples/python/activityset-testany/activityset-testany.tesh
+include examples/python/activityset-waitall/activityset-waitall.py
+include examples/python/activityset-waitall/activityset-waitall.tesh
+include examples/python/activityset-waitallfor/activityset-waitallfor.py
+include examples/python/activityset-waitallfor/activityset-waitallfor.tesh
+include examples/python/activityset-waitany/activityset-waitany.py
+include examples/python/activityset-waitany/activityset-waitany.tesh
include examples/python/actor-create/actor-create.py
include examples/python/actor-create/actor-create.tesh
include examples/python/actor-daemon/actor-daemon.py
include examples/smpi/smpi_s4u_masterworker/CMakeLists.txt
include examples/sthread/CMakeLists.txt
include include/simgrid/Exception.hpp
+include include/simgrid/activity_set.h
include include/simgrid/actor.h
include include/simgrid/barrier.h
include include/simgrid/chrono.hpp
foreach(x
actor-create actor-daemon actor-exiting actor-join actor-kill actor-lifetime actor-migrate actor-stacksize
actor-suspend actor-yield
+ activityset-testany activityset-waitall activityset-waitallfor activityset-waitany
app-masterworker app-token-ring
comm-pingpong comm-wait comm-waitall comm-waitany
cloud-capping cloud-masterworker cloud-migration cloud-simple
dht-pastry
- exec-async exec-basic exec-dvfs exec-remote exec-waitany
+ exec-async exec-basic exec-dvfs exec-remote
energy-exec energy-exec-ptask energy-vm
io-disk-raw io-file-remote io-file-system
platform-failures platform-properties
foreach(x
actor-create actor-daemon actor-exiting actor-join actor-kill actor-lifetime actor-migrate actor-stacksize
actor-suspend actor-yield
+ activityset-testany activityset-waitall activityset-waitallfor activityset-waitany
app-bittorrent app-chainsend app-masterworker app-token-ring
comm-pingpong comm-wait comm-waitall comm-waitany
cloud-capping cloud-masterworker cloud-migration cloud-simple
dht-kademlia dht-pastry
- exec-async exec-basic exec-dvfs exec-remote exec-waitany
+ exec-async exec-basic exec-dvfs exec-remote
energy-exec energy-exec-ptask energy-vm
io-disk-raw io-file-remote io-file-system
platform-failures platform-properties
--- /dev/null
+/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/activity_set.h"
+#include "simgrid/actor.h"
+#include "simgrid/comm.h"
+#include "simgrid/engine.h"
+#include "simgrid/exec.h"
+#include "simgrid/host.h"
+#include "simgrid/mailbox.h"
+
+#include "xbt/log.h"
+#include "xbt/sysdep.h"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_testany, "Messages specific for this s4u example");
+/** Receiver actor: starts one execution and one asynchronous receive on mailbox "mbox", stores both in an activity
+ * set, sleeps for 1, then polls the set with sg_activity_set_test_any() until it is empty.
+ * argc and argv are not used by this function. */
+static void bob(int argc, char* argv[])
+{
+ XBT_INFO("Create my asynchronous activities");
+ sg_exec_t exec = sg_actor_exec_init(5e9);
+ sg_exec_start(exec);
+
+ sg_mailbox_t mbox = sg_mailbox_by_name("mbox");
+ void* payload = NULL; // its address is handed to the receive below; the pointer is freed at the end
+ sg_comm_t comm = sg_mailbox_get_async(mbox, &payload);
+
+ sg_activity_set_t pending_activities = sg_activity_set_init(); // NOTE(review): never released here - confirm
+ sg_activity_set_push(pending_activities, (sg_activity_t)exec);
+ sg_activity_set_push(pending_activities, (sg_activity_t)comm);
+
+ XBT_INFO("Sleep_for a while");
+ sg_actor_sleep_for(1);
+
+ XBT_INFO("Test for completed activities");
+ // presumably test_any removes the activity it returns from the set, which is what lets this loop end - confirm
+ while (!sg_activity_set_empty(pending_activities)) {
+ sg_activity_t completed_one = sg_activity_set_test_any(pending_activities);
+ if (completed_one != NULL) { // something completed: report which kind of activity it was
+ if (sg_comm_isinstance(completed_one))
+ XBT_INFO("Completed a Comm");
+ if (sg_exec_isinstance(completed_one))
+ XBT_INFO("Completed an Exec");
+ } else { // NULL is handled as "nothing completed yet": wait a bit before polling again
+ XBT_INFO("Nothing matches, test again in 0.5s");
+ sg_actor_sleep_for(.5);
+ }
+ }
+ XBT_INFO("Last activity is complete");
+ free(payload);
+}
+/** Sender actor: duplicates the string "Message", logs it, and sends the pointer on mailbox "mbox".
+ * The last argument (6e8) is presumably the simulated size of the message - confirm against sg_mailbox_put().
+ * The duplicated string is not freed in this function. argc and argv are not used. */
+static void alice(int argc, char* argv[])
+{
+ char* payload = xbt_strdup("Message");
+ XBT_INFO("Send '%s'", payload);
+ sg_mailbox_put(sg_mailbox_by_name("mbox"), payload, 6e8);
+}
+/** Entry point: initializes SimGrid, loads the platform file given as argv[1], creates the actors "alice" and "bob"
+ * on the hosts named "alice" and "bob", runs the simulation and returns 0. */
+int main(int argc, char* argv[])
+{
+ simgrid_init(&argc, argv);
+ xbt_assert(argc > 1, // a platform file is required as first argument
+ "Usage: %s platform_file\n"
+ "\tExample: %s hosts_with_disks.xml\n",
+ argv[0], argv[0]);
+
+ simgrid_load_platform(argv[1]);
+
+ sg_actor_create("alice", sg_host_by_name("alice"), alice, 0, NULL); // actors get argc=0 and argv=NULL
+ sg_actor_create("bob", sg_host_by_name("bob"), bob, 0, NULL);
+
+ simgrid_run();
+
+ return 0;
+}
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/c-activityset-testany ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%4.2r]%e[%5a]%e%m%n"
+> [0.00] [alice] Send 'Message'
+> [0.00] [ bob] Create my asynchronous activities
+> [0.00] [ bob] Sleep_for a while
+> [1.00] [ bob] Test for completed activities
+> [1.00] [ bob] Nothing matches, test again in 0.5s
+> [1.50] [ bob] Nothing matches, test again in 0.5s
+> [2.00] [ bob] Nothing matches, test again in 0.5s
+> [2.50] [ bob] Nothing matches, test again in 0.5s
+> [3.00] [ bob] Nothing matches, test again in 0.5s
+> [3.50] [ bob] Nothing matches, test again in 0.5s
+> [4.00] [ bob] Nothing matches, test again in 0.5s
+> [4.50] [ bob] Nothing matches, test again in 0.5s
+> [5.00] [ bob] Completed an Exec
+> [5.00] [ bob] Nothing matches, test again in 0.5s
+> [5.50] [ bob] Completed a Comm
+> [5.50] [ bob] Last activity is complete
--- /dev/null
+/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/activity_set.h"
+#include "simgrid/actor.h"
+#include "simgrid/comm.h"
+#include "simgrid/engine.h"
+#include "simgrid/exec.h"
+#include "simgrid/host.h"
+#include "simgrid/mailbox.h"
+
+#include "xbt/log.h"
+#include "xbt/sysdep.h"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_waitall, "Messages specific for this s4u example");
+
+/** Receiver actor: starts one execution and one asynchronous receive on mailbox "mbox", stores both in an activity
+ * set, and blocks in sg_activity_set_wait_all() until the whole set is complete.
+ *
+ * @param argc unused; present so that the definition matches the (argc, argv) call made for actors created with
+ *             sg_actor_create(). An empty parameter list is not a compatible type for such a call.
+ * @param argv unused, see argc */
+static void bob(int argc, char* argv[])
+{
+ XBT_INFO("Create my asynchronous activities");
+ sg_exec_t exec = sg_actor_exec_init(5e9);
+ sg_exec_start(exec);
+
+ sg_mailbox_t mbox = sg_mailbox_by_name("mbox");
+ void* payload = NULL; // its address is handed to the receive below; the pointer is freed at the end
+ sg_comm_t comm = sg_mailbox_get_async(mbox, &payload);
+
+ sg_activity_set_t pending_activities = sg_activity_set_init(); // NOTE(review): never released here - confirm
+ sg_activity_set_push(pending_activities, (sg_activity_t)exec);
+ sg_activity_set_push(pending_activities, (sg_activity_t)comm);
+
+ XBT_INFO("Wait for asynchronous activities to complete, all in one shot.");
+ sg_activity_set_wait_all(pending_activities);
+
+ XBT_INFO("All activities are completed.");
+ free(payload);
+}
+
+/** Sender actor: duplicates the string "Message", logs it, and sends the pointer on mailbox "mbox".
+ * The last argument (6e8) is presumably the simulated size of the message - confirm against sg_mailbox_put().
+ * The duplicated string is not freed in this function.
+ *
+ * @param argc unused; present so that the definition matches the (argc, argv) call made for actors created with
+ *             sg_actor_create(). An empty parameter list is not a compatible type for such a call.
+ * @param argv unused, see argc */
+static void alice(int argc, char* argv[])
+{
+ char* payload = xbt_strdup("Message");
+ XBT_INFO("Send '%s'", payload);
+ sg_mailbox_put(sg_mailbox_by_name("mbox"), payload, 6e8);
+}
+/** Entry point: initializes SimGrid, loads the platform file given as argv[1], creates the actors "alice" and "bob"
+ * on the hosts named "alice" and "bob", runs the simulation and returns 0. */
+int main(int argc, char* argv[])
+{
+ simgrid_init(&argc, argv);
+ xbt_assert(argc > 1, // a platform file is required as first argument
+ "Usage: %s platform_file\n"
+ "\tExample: %s hosts_with_disks.xml\n",
+ argv[0], argv[0]);
+
+ simgrid_load_platform(argv[1]);
+
+ sg_actor_create("alice", sg_host_by_name("alice"), alice, 0, NULL); // actors get argc=0 and argv=NULL
+ sg_actor_create("bob", sg_host_by_name("bob"), bob, 0, NULL);
+
+ simgrid_run();
+
+ return 0;
+}
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/c-activityset-waitall ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n"
+> [0.000000] [alice] Send 'Message'
+> [0.000000] [ bob] Create my asynchronous activities
+> [0.000000] [ bob] Wait for asynchronous activities to complete, all in one shot.
+> [5.197828] [ bob] All activities are completed.
--- /dev/null
+/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/activity_set.h"
+#include "simgrid/actor.h"
+#include "simgrid/comm.h"
+#include "simgrid/engine.h"
+#include "simgrid/exec.h"
+#include "simgrid/host.h"
+#include "simgrid/mailbox.h"
+
+#include "xbt/log.h"
+#include "xbt/sysdep.h"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_waitallfor, "Messages specific for this s4u example");
+
+/** Receiver actor: starts one execution and one asynchronous receive on mailbox "mbox", stores both in an activity
+ * set, then repeatedly waits for the whole set with a timeout of 1 and reports every activity that completed in
+ * the meantime, until the set is empty.
+ *
+ * @param argc unused; present so that the definition matches the (argc, argv) call made for actors created with
+ *             sg_actor_create(). An empty parameter list is not a compatible type for such a call.
+ * @param argv unused, see argc */
+static void bob(int argc, char* argv[])
+{
+ XBT_INFO("Create my asynchronous activities");
+ sg_exec_t exec = sg_actor_exec_init(5e9);
+ sg_exec_start(exec);
+
+ sg_mailbox_t mbox = sg_mailbox_by_name("mbox");
+ void* payload = NULL; // its address is handed to the receive below; the pointer is freed at the end
+ sg_comm_t comm = sg_mailbox_get_async(mbox, &payload);
+
+ sg_activity_set_t pending_activities = sg_activity_set_init(); // NOTE(review): never released here - confirm
+ sg_activity_set_push(pending_activities, (sg_activity_t)exec);
+ sg_activity_set_push(pending_activities, (sg_activity_t)comm);
+
+ XBT_INFO("Wait for asynchronous activities to complete");
+ while (!sg_activity_set_empty(pending_activities)) {
+ // A zero result is reported as "not all terminated yet"
+ if (!sg_activity_set_wait_all_for(pending_activities, 1)) {
+ XBT_INFO("Not all activities are terminated yet.");
+ }
+
+ // Drain and report everything that is already complete; a NULL result ends the drain
+ sg_activity_t completed_one = sg_activity_set_test_any(pending_activities);
+ while (completed_one != NULL) {
+ if (sg_comm_isinstance(completed_one))
+ XBT_INFO("Completed a Comm");
+ if (sg_exec_isinstance(completed_one))
+ XBT_INFO("Completed an Exec");
+ completed_one = sg_activity_set_test_any(pending_activities);
+ }
+ }
+
+ XBT_INFO("Last activity is complete");
+ free(payload);
+}
+
+/** Sender actor: duplicates the string "Message", logs it, and sends the pointer on mailbox "mbox".
+ * The last argument (6e8) is presumably the simulated size of the message - confirm against sg_mailbox_put().
+ * The duplicated string is not freed in this function.
+ *
+ * @param argc unused; present so that the definition matches the (argc, argv) call made for actors created with
+ *             sg_actor_create(). An empty parameter list is not a compatible type for such a call.
+ * @param argv unused, see argc */
+static void alice(int argc, char* argv[])
+{
+ char* payload = xbt_strdup("Message");
+ XBT_INFO("Send '%s'", payload);
+ sg_mailbox_put(sg_mailbox_by_name("mbox"), payload, 6e8);
+}
+/** Entry point: initializes SimGrid, loads the platform file given as argv[1], creates the actors "alice" and "bob"
+ * on the hosts named "alice" and "bob", runs the simulation and returns 0. */
+int main(int argc, char* argv[])
+{
+ simgrid_init(&argc, argv);
+ xbt_assert(argc > 1, // a platform file is required as first argument
+ "Usage: %s platform_file\n"
+ "\tExample: %s hosts_with_disks.xml\n",
+ argv[0], argv[0]);
+
+ simgrid_load_platform(argv[1]);
+
+ sg_actor_create("alice", sg_host_by_name("alice"), alice, 0, NULL); // actors get argc=0 and argv=NULL
+ sg_actor_create("bob", sg_host_by_name("bob"), bob, 0, NULL);
+
+ simgrid_run();
+
+ return 0;
+}
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/c-activityset-waitallfor ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n"
+> [0.000000] [alice] Send 'Message'
+> [0.000000] [ bob] Create my asynchronous activities
+> [0.000000] [ bob] Wait for asynchronous activities to complete
+> [1.000000] [ bob] Not all activities are terminated yet.
+> [2.000000] [ bob] Not all activities are terminated yet.
+> [3.000000] [ bob] Not all activities are terminated yet.
+> [4.000000] [ bob] Not all activities are terminated yet.
+> [5.000000] [ bob] Not all activities are terminated yet.
+> [5.000000] [ bob] Completed an Exec
+> [5.197828] [ bob] Completed a Comm
+> [5.197828] [ bob] Last activity is complete
--- /dev/null
+/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/activity_set.h"
+#include "simgrid/actor.h"
+#include "simgrid/comm.h"
+#include "simgrid/engine.h"
+#include "simgrid/exec.h"
+#include "simgrid/host.h"
+#include "simgrid/mailbox.h"
+
+#include "xbt/log.h"
+#include "xbt/sysdep.h"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_waittany, "Messages specific for this s4u example");
+
+/** Receiver actor: starts one execution and one asynchronous receive on mailbox "mbox", stores both in an activity
+ * set, then calls sg_activity_set_wait_any() in a loop, reporting each returned activity, until the set is empty.
+ *
+ * @param argc unused; present so that the definition matches the (argc, argv) call made for actors created with
+ *             sg_actor_create(). An empty parameter list is not a compatible type for such a call.
+ * @param argv unused, see argc */
+static void bob(int argc, char* argv[])
+{
+ XBT_INFO("Create my asynchronous activities");
+ sg_exec_t exec = sg_actor_exec_init(5e9);
+ sg_exec_start(exec);
+
+ sg_mailbox_t mbox = sg_mailbox_by_name("mbox");
+ void* payload = NULL; // its address is handed to the receive below; the pointer is freed at the end
+ sg_comm_t comm = sg_mailbox_get_async(mbox, &payload);
+
+ sg_activity_set_t pending_activities = sg_activity_set_init(); // NOTE(review): never released here - confirm
+ sg_activity_set_push(pending_activities, (sg_activity_t)exec);
+ sg_activity_set_push(pending_activities, (sg_activity_t)comm);
+
+ XBT_INFO("Wait for asynchronous activities to complete");
+ while (!sg_activity_set_empty(pending_activities)) {
+
+ sg_activity_t completed_one = sg_activity_set_wait_any(pending_activities);
+ if (sg_comm_isinstance(completed_one))
+ XBT_INFO("Completed a Comm");
+ else if (sg_exec_isinstance(completed_one))
+ XBT_INFO("Completed an Exec");
+ else // only an Exec and a Comm were pushed above, so anything else is a bug
+ xbt_die("This activity set is supposed to only contain Comm or Exec");
+ }
+ XBT_INFO("Last activity is complete");
+ free(payload);
+}
+
+/** Sender actor: duplicates the string "Message", logs it, and sends the pointer on mailbox "mbox".
+ * The last argument (6e8) is presumably the simulated size of the message - confirm against sg_mailbox_put().
+ * The duplicated string is not freed in this function.
+ *
+ * @param argc unused; present so that the definition matches the (argc, argv) call made for actors created with
+ *             sg_actor_create(). An empty parameter list is not a compatible type for such a call.
+ * @param argv unused, see argc */
+static void alice(int argc, char* argv[])
+{
+ char* payload = xbt_strdup("Message");
+ XBT_INFO("Send '%s'", payload);
+ sg_mailbox_put(sg_mailbox_by_name("mbox"), payload, 6e8);
+}
+/** Entry point: initializes SimGrid, loads the platform file given as argv[1], creates the actors "alice" and "bob"
+ * on the hosts named "alice" and "bob", runs the simulation and returns 0. */
+int main(int argc, char* argv[])
+{
+ simgrid_init(&argc, argv);
+ xbt_assert(argc > 1, // a platform file is required as first argument
+ "Usage: %s platform_file\n"
+ "\tExample: %s hosts_with_disks.xml\n",
+ argv[0], argv[0]);
+
+ simgrid_load_platform(argv[1]);
+
+ sg_actor_create("alice", sg_host_by_name("alice"), alice, 0, NULL); // actors get argc=0 and argv=NULL
+ sg_actor_create("bob", sg_host_by_name("bob"), bob, 0, NULL);
+
+ simgrid_run();
+
+ return 0;
+}
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/c-activityset-waitany ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n"
+> [0.000000] [alice] Send 'Message'
+> [0.000000] [ bob] Create my asynchronous activities
+> [0.000000] [ bob] Wait for asynchronous activities to complete
+> [5.000000] [ bob] Completed an Exec
+> [5.197828] [ bob] Completed a Comm
+> [5.197828] [ bob] Last activity is complete
+++ /dev/null
-/* Copyright (c) 2019-2023. The SimGrid Team. All rights reserved. */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include "simgrid/actor.h"
-#include "simgrid/engine.h"
-#include "simgrid/exec.h"
-#include "simgrid/host.h"
-
-#include "xbt/log.h"
-#include "xbt/sysdep.h"
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(exec_waitany, "Messages specific for this example");
-
-static void worker(int argc, char* argv[])
-{
- xbt_assert(argc > 1);
- int with_timeout = !strcmp(argv[1], "true");
-
- /* Vector in which we store all pending executions*/
- sg_exec_t* pending_execs = xbt_malloc(sizeof(sg_exec_t) * 3);
- int pending_execs_count = 0;
-
- for (int i = 0; i < 3; i++) {
- char* name = bprintf("Exec-%d", i);
- double amount = (6 * (i % 2) + i + 1) * sg_host_get_speed(sg_host_self());
-
- sg_exec_t exec = sg_actor_exec_init(amount);
- sg_exec_set_name(exec, name);
- pending_execs[pending_execs_count++] = exec;
- sg_exec_start(exec);
-
- XBT_INFO("Activity %s has started for %.0f seconds", name, amount / sg_host_get_speed(sg_host_self()));
- free(name);
- }
-
- /* Now that executions were initiated, wait for their completion, in order of termination.
- *
- * This loop waits for first terminating execution with wait_any() and remove it with erase(), until all execs are
- * terminated.
- */
- while (pending_execs_count > 0) {
- ssize_t pos;
- if (with_timeout)
- pos = sg_exec_wait_any_for(pending_execs, pending_execs_count, 4);
- else
- pos = sg_exec_wait_any(pending_execs, pending_execs_count);
-
- if (pos < 0) {
- XBT_INFO("Do not wait any longer for an activity (timeout received)");
- pending_execs_count = 0;
- } else {
- XBT_INFO("Activity at position %zd is complete", pos);
- memmove(pending_execs + pos, pending_execs + pos + 1, sizeof(sg_exec_t) * (pending_execs_count - pos - 1));
- pending_execs_count--;
- }
- XBT_INFO("%d activities remain pending", pending_execs_count);
- }
-
- xbt_free(pending_execs);
-}
-
-int main(int argc, char* argv[])
-{
- simgrid_init(&argc, argv);
- simgrid_load_platform(argv[1]);
-
- const char* worker_argv[] = {"worker", "false"};
- sg_actor_create_("worker", sg_host_by_name("Tremblay"), worker, 2, worker_argv);
-
- worker_argv[1] = "true";
- sg_actor_create_("worker_timeout", sg_host_by_name("Tremblay"), worker, 2, worker_argv);
-
- simgrid_run();
- return 0;
-}
+++ /dev/null
-#!/usr/bin/env tesh
-
-$ ${bindir:=.}/c-exec-waitany ${platfdir}/multicore_machine.xml "--log=root.fmt:[%10.6r]%e[%14P]%e%m%n"
-> [ 0.000000] [ worker] Activity Exec-0 has started for 1 seconds
-> [ 0.000000] [worker_timeout] Activity Exec-0 has started for 1 seconds
-> [ 0.000000] [ worker] Activity Exec-1 has started for 8 seconds
-> [ 0.000000] [worker_timeout] Activity Exec-1 has started for 8 seconds
-> [ 0.000000] [ worker] Activity Exec-2 has started for 3 seconds
-> [ 0.000000] [worker_timeout] Activity Exec-2 has started for 3 seconds
-> [ 1.000000] [worker_timeout] Activity at position 0 is complete
-> [ 1.000000] [worker_timeout] 2 activities remain pending
-> [ 1.000000] [ worker] Activity at position 0 is complete
-> [ 1.000000] [ worker] 2 activities remain pending
-> [ 3.000000] [worker_timeout] Activity at position 1 is complete
-> [ 3.000000] [worker_timeout] 1 activities remain pending
-> [ 3.000000] [ worker] Activity at position 1 is complete
-> [ 3.000000] [ worker] 1 activities remain pending
-> [ 7.000000] [worker_timeout] Do not wait any longer for an activity (timeout received)
-> [ 7.000000] [worker_timeout] 0 activities remain pending
-> [ 8.000000] [ worker] Activity at position 0 is complete
-> [ 8.000000] [ worker] 0 activities remain pending
this_actor.info("Create my asynchronous activities")
exec = this_actor.exec_async(5e9)
- comm, payload = mbox.get_async()
+ comm = mbox.get_async()
io = disk.read_async(300000000)
pending_activities = ActivitySet([exec, comm])
this_actor.info("Create my asynchronous activities")
exec = this_actor.exec_async(5e9)
- comm, payload = mbox.get_async()
+ comm = mbox.get_async()
io = disk.read_async(300000000)
pending_activities = ActivitySet([exec, comm])
this_actor.info("Create my asynchronous activities")
exec = this_actor.exec_async(5e9)
- comm, payload = mbox.get_async()
+ comm = mbox.get_async()
io = disk.read_async(300000000)
pending_activities = ActivitySet([exec, comm])
this_actor.info("Create my asynchronous activities")
exec = this_actor.exec_async(5e9)
- comm, payload = mbox.get_async()
+ comm = mbox.get_async()
io = disk.read_async(300000000)
pending_activities = ActivitySet([exec, comm])
Actor.create("alice", Host.by_name("alice"), alice)
e.run()
-
-"""
-
-/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved. */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include "simgrid/s4u.hpp"
-#include <cstdlib>
-#include <iostream>
-#include <string>
-namespace sg4 = simgrid::s4u;
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_waittany, "Messages specific for this s4u example");
-
-static void bob()
-{
- sg4::Mailbox* mbox = sg4::Mailbox::by_name("mbox");
- const sg4::Disk* disk = sg4::Host::current()->get_disks().front();
- std::string* payload;
-
- XBT_INFO("Create my asynchronous activities");
- auto exec = sg4::this_actor::exec_async(5e9);
- auto comm = mbox->get_async(&payload);
- auto io = disk->read_async(3e8);
-
- sg4::ActivitySet pending_activities({boost::dynamic_pointer_cast<sg4::Activity>(exec),
- boost::dynamic_pointer_cast<sg4::Activity>(comm),
- boost::dynamic_pointer_cast<sg4::Activity>(io)});
-
- XBT_INFO("Wait for asynchronous activities to complete");
- while (not pending_activities.empty()) {
- auto completed_one = pending_activities.wait_any();
- if (completed_one != nullptr) {
- if (boost::dynamic_pointer_cast<sg4::Comm>(completed_one))
- XBT_INFO("Completed a Comm");
- if (boost::dynamic_pointer_cast<sg4::Exec>(completed_one))
- XBT_INFO("Completed an Exec");
- if (boost::dynamic_pointer_cast<sg4::Io>(completed_one))
- XBT_INFO("Completed an I/O");
- }
- }
- XBT_INFO("Last activity is complete");
- delete payload;
-}
-
-static void alice()
-{
- auto* payload = new std::string("Message");
- XBT_INFO("Send '%s'", payload->c_str());
- sg4::Mailbox::by_name("mbox")->put(payload, 6e8);
-}
-
-int main(int argc, char* argv[])
-{
- sg4::Engine e(&argc, argv);
-
- e.load_platform(argv[1]);
-
- sg4::Actor::create("bob", e.host_by_name("bob"), bob);
- sg4::Actor::create("alice", e.host_by_name("alice"), alice);
-
- e.run();
-
- return 0;
-}
-"""
\ No newline at end of file
def rank0():
rank0_mailbox: Mailbox = Mailbox.by_name("rank0")
this_actor.info("Post my asynchronous receives")
- comm1, a1 = rank0_mailbox.get_async()
- comm2, a2 = rank0_mailbox.get_async()
- comm3, a3 = rank0_mailbox.get_async()
+ comm1 = rank0_mailbox.get_async()
+ comm2 = rank0_mailbox.get_async()
+ comm3 = rank0_mailbox.get_async()
pending_comms: List[Comm] = [comm1, comm2, comm3]
this_actor.info("Send some data to rank-1")
from uuid import uuid4
import sys
-from simgrid import Actor, Comm, Engine, Host, Mailbox, PyGetAsync, this_actor
+from simgrid import Actor, Comm, Engine, Host, Mailbox, this_actor
SIMULATED_JOB_SIZE_BYTES = 1024
@dataclass
class AsyncJobResult:
job: Job
- result_comm: Comm
- async_data: PyGetAsync
+ comm: Comm
@property
def complete(self) -> bool:
- return self.result_comm.test()
+ return self.comm.test()
@property
def status(self) -> str:
result_mailbox=result_mailbox
), SIMULATED_JOB_SIZE_BYTES)
out_comm.detach()
- result_comm, async_data = result_mailbox.get_async()
+ result_comm = result_mailbox.get_async()
async_job_results.append(AsyncJobResult(
job=job,
- result_comm=result_comm,
- async_data=async_data
+ comm=result_comm
))
this_actor.info(f"awaiting results for all jobs (timeout={wait_timeout}s)")
- completed_comms = Comm.wait_all_for([entry.result_comm for entry in async_job_results], wait_timeout)
+ completed_comms = Comm.wait_all_for([entry.comm for entry in async_job_results], wait_timeout)
logger = this_actor.warning if completed_comms < len(async_job_results) else this_actor.info
logger(f"received {completed_comms}/{len(async_job_results)} results")
for result in async_job_results:
this_actor.info(f"{result.job.job_id}"
f" status={result.status}"
- f" result_payload={result.async_data.get() if result.complete else ''}")
+ f" result_payload={result.comm.get_payload() if result.complete else ''}")
def main():
def __call__(self):
mbox = Mailbox.by_name("receiver")
- pending_msgs = []
pending_comms = []
this_actor.info("Wait for %d messages asynchronously" % self.msg_count)
for _ in range(self.msg_count):
- comm, data = mbox.get_async()
+ comm = mbox.get_async()
pending_comms.append(comm)
- pending_msgs.append(data)
while pending_comms:
index = Comm.wait_any(pending_comms)
- msg = pending_msgs[index].get()
+ msg = pending_comms[index].get_payload()
this_actor.info("I got '%s'." % msg)
del pending_comms[index]
- del pending_msgs[index]
####################################################################################################
def link_nonlinear(link: Link, capacity: float, n: int) -> float:
from typing import List, Tuple
import sys
-from simgrid import Engine, Actor, Comm, Host, LinkInRoute, Mailbox, NetZone, this_actor, PyGetAsync
+from simgrid import Engine, Actor, Comm, Host, LinkInRoute, Mailbox, NetZone, this_actor
RECEIVER_MAILBOX_NAME = "receiver"
def __call__(self):
# List in which we store all incoming msgs
- pending_comms: List[Tuple[Comm, PyGetAsync]] = []
+ pending_comms: List[Comm] = []
this_actor.info(f"Wait for {self.messages_count} messages asynchronously")
for _ in range(self.messages_count):
pending_comms.append(self.mailbox.get_async())
while pending_comms:
- index = Comm.wait_any([comm for (comm, _) in pending_comms])
- _, async_data = pending_comms[index]
- this_actor.info(f"I got '{async_data.get()}'.")
+ index = Comm.wait_any(pending_comms)
+ this_actor.info(f"I got '{pending_comms[index].get_payload()}'.")
pending_comms.pop(index)
# under the terms of the license (GNU LGPL) which comes with this package.
"""
-/* This example demonstrates how to dynamically modify a graph of tasks.
- *
- * Assuming we have two instances of a service placed on different hosts,
- * we want to send data alternatively to thoses instances.
- *
- * We consider the following graph:
+This example demonstrates how to dynamically modify a graph of tasks.
+
+Assuming we have two instances of a service placed on different hosts,
+we want to send data alternately to those instances.
+
+We consider the following graph:
comm1
┌────────────────────────┐
│ │
└────────────────────────┘
comm2
- */
- """
+
+"""
from argparse import ArgumentParser
import sys
/* C interface */
SG_BEGIN_DECL
+XBT_PUBLIC int sg_comm_isinstance(sg_activity_t acti);
+
XBT_PUBLIC void sg_comm_detach(sg_comm_t comm, void (*clean_function)(void*));
XBT_PUBLIC int sg_comm_test(sg_comm_t comm);
XBT_PUBLIC sg_error_t sg_comm_wait(sg_comm_t comm);
/* C interface */
SG_BEGIN_DECL
+XBT_PUBLIC int sg_exec_isinstance(sg_activity_t acti);
+
XBT_PUBLIC void sg_exec_set_bound(sg_exec_t exec, double bound);
XBT_PUBLIC const char* sg_exec_get_name(const_sg_exec_t exec);
XBT_PUBLIC void sg_exec_set_name(sg_exec_t exec, const char* name);
XBT_PUBLIC int sg_exec_test(sg_exec_t exec);
XBT_PUBLIC sg_error_t sg_exec_wait(sg_exec_t exec);
XBT_PUBLIC sg_error_t sg_exec_wait_for(sg_exec_t exec, double timeout);
-// XBT_ATTRIB_DEPRECATED_v339("Please use sg_activity_set instead") TODO: C bindings of ActivitySet
-XBT_PUBLIC ssize_t sg_exec_wait_any_for(sg_exec_t* execs, size_t count, double timeout);
-// XBT_ATTRIB_DEPRECATED_v339("Please use sg_activity_set instead") TODO: C bindings of ActivitySet
-XBT_PUBLIC ssize_t sg_exec_wait_any(sg_exec_t* execs, size_t count);
+
+XBT_ATTRIB_DEPRECATED_v339("Please use sg_activity_set_t instead") XBT_PUBLIC ssize_t
+ sg_exec_wait_any_for(sg_exec_t* execs, size_t count, double timeout);
+XBT_ATTRIB_DEPRECATED_v339("Please use sg_activity_set_t instead") XBT_PUBLIC ssize_t
+ sg_exec_wait_any(sg_exec_t* execs, size_t count);
SG_END_DECL
} // namespace simgrid
using s4u_Actor = simgrid::s4u::Actor;
+using s4u_Activity = simgrid::s4u::Activity;
+using s4u_ActivitySet = simgrid::s4u::ActivitySet;
using s4u_Barrier = simgrid::s4u::Barrier;
using s4u_Comm = simgrid::s4u::Comm;
using s4u_Exec = simgrid::s4u::Exec;
#else
typedef struct s4u_Actor s4u_Actor;
+typedef struct s4u_Activity s4u_Activity;
+typedef struct s4u_ActivitySet s4u_ActivitySet;
typedef struct s4u_Barrier s4u_Barrier;
typedef struct s4u_Comm s4u_Comm;
typedef struct s4u_Exec s4u_Exec;
/** Pointer to a constant actor object */
typedef const s4u_Actor* const_sg_actor_t;
+/** Pointer to an activity object */
+typedef s4u_Activity* sg_activity_t;
+/** Pointer to a constant activity object */
+typedef const s4u_Activity* const_sg_activity_t;
+
+/** Pointer to an activity set object */
+typedef s4u_ActivitySet* sg_activity_set_t;
+/** Pointer to a constant activity set object */
+typedef const s4u_ActivitySet* const_sg_activity_set_t;
+
/** @ingroup m_datatypes_management_details
* @brief Type for any SimGrid size
*/
void* get_dst_data() const { return dst_buff_; }
/** Retrieve the size of the received data. Not to be mixed with @ref Activity::get_remaining() */
size_t get_dst_data_size() const { return dst_buff_size_; }
+ /** Retrieve the payload associated with the communication. You can only do that once the comm has (gracefully)
+ * terminated, and the payload is only set up by the default copy_data callback (not the SMPI one) */
+ void* get_payload() const;
/* Common functions */
CommPtr get_init();
 /** Creates and starts an async data reception to that mailbox */
template <typename T> CommPtr get_async(T** data);
+ /** Creates and starts an async data reception to that mailbox. Since the data location is not provided, you'll have
+ * to use Comm::get_payload() once the comm terminates */
+ CommPtr get_async();
/** Blocking data reception */
template <typename T> T* get();
sg_version_get(&major, &minor, &patch);
return simgrid::xbt::string_printf("%i.%i.%i", major, minor, patch);
}
-
-/** @brief Wrap for mailbox::get_async */
-class PyGetAsync {
- std::unique_ptr<PyObject*> data = std::make_unique<PyObject*>();
-
-public:
- PyObject** get() const { return data.get(); }
-};
-
} // namespace
PYBIND11_DECLARE_HOLDER_TYPE(T, boost::intrusive_ptr<T>)
"get", [](Mailbox* self) { return py::reinterpret_steal<py::object>(self->get<PyObject>()); },
py::call_guard<py::gil_scoped_release>(), "Blocking data reception")
.def(
- "get_async",
- [](Mailbox* self) -> std::tuple<CommPtr, PyGetAsync> {
- PyGetAsync wrap;
- auto comm = self->get_async(wrap.get());
- return std::make_tuple(std::move(comm), std::move(wrap));
- },
+ "get_async", [](Mailbox* self) -> CommPtr { return self->get_async(); },
 py::call_guard<py::gil_scoped_release>(),
 "Non-blocking data reception. Use comm.get_payload() to get the python object after the communication has finished")
.def("set_receiver", &Mailbox::set_receiver, py::call_guard<py::gil_scoped_release>(),
"Sets the actor as permanent receiver");
- /* Class PyGetAsync */
- py::class_<PyGetAsync>(m, "PyGetAsync", "Wrapper for async get communications")
- .def(py::init<>())
- .def(
- "get", [](const PyGetAsync* self) { return py::reinterpret_steal<py::object>(*(self->get())); },
- "Get python object after async communication in receiver side");
-
/* class Activity */
- py::class_<Activity, ActivityPtr>(m, "Activityy", "Activity. See the C++ documentation for details.");
+ py::class_<Activity, ActivityPtr>(m, "Activity", "Activity. See the C++ documentation for details.");
/* Class Comm */
py::class_<Comm, CommPtr, Activity>(m, "Comm", "Communication. See the C++ documentation for details.")
"Block until the completion of that communication, or raises TimeoutException after the specified timeout.")
.def("wait_until", &Comm::wait_until, py::call_guard<py::gil_scoped_release>(), py::arg("time_limit"),
"Block until the completion of that communication, or raises TimeoutException after the specified time.")
+ .def(
+ "get_payload",
+ [](const Comm* self) { return py::reinterpret_steal<py::object>((PyObject*)self->get_payload()); },
+ py::call_guard<py::gil_scoped_release>(),
+ "Retrieve the message's payload of a get_async. You cannot call this until after the comm termination.")
.def("detach", py::overload_cast<>(&Comm::detach), py::return_value_policy::reference_internal,
py::call_guard<py::gil_scoped_release>(),
"Start the comm, and ignore its result. It can be completely forgotten after that.")
std::function<void(CommImpl*, void*, size_t)> CommImpl::copy_data_callback_ = [](kernel::activity::CommImpl* comm,
void* buff, size_t buff_size) {
xbt_assert((buff_size == sizeof(void*)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
- *(void**)(comm->dst_buff_) = buff;
+ if (comm->dst_buff_ != nullptr) // get_async provided a buffer
+ *(void**)(comm->dst_buff_) = buff;
+ comm->payload_ = buff; // Setup what will be retrieved by s4u::Comm::get_payload()
};
void CommImpl::set_copy_data_callback(const std::function<void(CommImpl*, void*, size_t)>& callback)
{
size_t buff_size = src_buff_size_;
/* If there is no data to copy then return */
- if (not src_buff_ || not dst_buff_ || copied_)
+ if (not src_buff_ || not dst_buff_size_ || copied_)
return;
XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)", this,
unsigned char* dst_buff_ = nullptr;
size_t src_buff_size_ = 0;
size_t* dst_buff_size_ = nullptr;
+ void* payload_ = nullptr; // If dst_buff_ is NULL, the default copy callback puts the data here
void* src_data_ = nullptr; /* User data associated to the communication */
void* dst_data_ = nullptr;
#include "src/kernel/actor/ActorImpl.hpp"
#include "src/kernel/actor/CommObserver.hpp"
#include <simgrid/Exception.hpp>
+#include <simgrid/activity_set.h>
#include <simgrid/s4u/ActivitySet.hpp>
#include <simgrid/s4u/Engine.hpp>
} // namespace s4u
} // namespace simgrid
+
+SG_BEGIN_DECL
+
+/** Allocate a new, default-constructed simgrid::s4u::ActivitySet and return it as a C handle.
+ *
+ * The object is created with a plain `new`; sg_activity_set_delete() is the matching release function. */
+sg_activity_set_t sg_activity_set_init()
+{
+  sg_activity_set_t fresh_set = new simgrid::s4u::ActivitySet();
+  return fresh_set;
+}
+/** Forward @p acti to ActivitySet::push() on the set designated by @p as. */
+void sg_activity_set_push(sg_activity_set_t as, sg_activity_t acti)
+{
+  auto& target_set = *as;
+  target_set.push(acti);
+}
+/** Forward @p acti to ActivitySet::erase() on the set designated by @p as. */
+void sg_activity_set_erase(sg_activity_set_t as, sg_activity_t acti)
+{
+  auto& target_set = *as;
+  target_set.erase(acti);
+}
+/** Return the element count reported by ActivitySet::size() for the set @p as. */
+size_t sg_activity_set_size(sg_activity_set_t as)
+{
+  const size_t count = as->size();
+  return count;
+}
+/** Expose ActivitySet::empty() to C callers.
+ *
+ * @return the result of empty() on @p as, converted to an int. */
+int sg_activity_set_empty(sg_activity_set_t as)
+{
+  const int is_empty = as->empty();
+  return is_empty;
+}
+
+/** Call ActivitySet::test_any() on @p as and hand its result back as a raw C handle.
+ *
+ * Only the raw pointer leaves this function: the smart pointer returned by the C++ call is released on return. */
+sg_activity_t sg_activity_set_test_any(sg_activity_set_t as)
+{
+  auto found = as->test_any();
+  return found.get();
+}
+/** Forward to ActivitySet::wait_all() on the set designated by @p as. */
+void sg_activity_set_wait_all(sg_activity_set_t as)
+{
+  auto& target_set = *as;
+  target_set.wait_all();
+}
+/** Call ActivitySet::wait_all_for() on @p as, passing @p timeout through unchanged.
+ *
+ * @return 1 if the C++ call returned normally, 0 if it threw a simgrid::TimeoutException.
+ *         Any other exception is not caught here. */
+int sg_activity_set_wait_all_for(sg_activity_set_t as, double timeout)
+{
+  int completed = 1;
+  try {
+    as->wait_all_for(timeout);
+  } catch (const simgrid::TimeoutException&) {
+    completed = 0; // The timeout is reported to C callers through the return value
+  }
+  return completed;
+}
+/** Call ActivitySet::wait_any() on @p as and hand its result back as a raw C handle.
+ *
+ * Only the raw pointer leaves this function: the smart pointer returned by the C++ call is released on return. */
+sg_activity_t sg_activity_set_wait_any(sg_activity_set_t as)
+{
+  auto finished = as->wait_any();
+  return finished.get();
+}
+/** Call ActivitySet::wait_any_for() on @p as, passing @p timeout through unchanged.
+ *
+ * @return the raw pointer of the activity returned by the C++ call, or nullptr if that call threw a
+ *         simgrid::TimeoutException. Any other exception is not caught here. */
+sg_activity_t sg_activity_set_wait_any_for(sg_activity_set_t as, double timeout)
+{
+  sg_activity_t outcome = nullptr; // Value kept when the timeout exception is caught
+  try {
+    outcome = as->wait_any_for(timeout).get();
+  } catch (const simgrid::TimeoutException&) {
+    outcome = nullptr;
+  }
+  return outcome;
+}
+
+/** Destroy the activity set @p as with `delete`.
+ *
+ * As with any delete expression, passing a null pointer is a no-op. */
+void sg_activity_set_delete(sg_activity_set_t as)
+{
+  sg_activity_set_t doomed = as;
+  delete doomed;
+}
+
+SG_END_DECL
#include <cmath>
#include <simgrid/Exception.hpp>
#include <simgrid/comm.h>
+#include <simgrid/s4u/ActivitySet.hpp>
#include <simgrid/s4u/Comm.hpp>
#include <simgrid/s4u/Engine.hpp>
#include <simgrid/s4u/Mailbox.hpp>
return this;
}
+/** Return the payload pointer stored in the underlying CommImpl (its payload_ field).
+ *
+ * An xbt_assert requires the communication to be in State::FINISHED before the field is read. */
+void* Comm::get_payload() const
+{
+  xbt_assert(get_state() == State::FINISHED,
+             "You can only retrieve the payload of a communication that gracefully terminated, but its state is %s.",
+             get_state_str());
+  const auto* impl = static_cast<kernel::activity::CommImpl*>(pimpl_.get());
+  return impl->payload_;
+}
+
Actor* Comm::get_sender() const
{
kernel::actor::ActorImplPtr sender = nullptr;
{
xbt_assert(get_state() == State::INITED || get_state() == State::STARTING,
"You cannot use %s() once your communication started (not implemented)", __FUNCTION__);
+
+ auto myself = kernel::actor::ActorImpl::self();
+
if (get_source() != nullptr || get_destination() != nullptr) {
xbt_assert(is_assigned(), "When either from_ or to_ is specified, both must be.");
xbt_assert(src_buff_ == nullptr && dst_buff_ == nullptr,
});
fire_on_start();
fire_on_this_start();
- } else if (src_buff_ != nullptr) { // Sender side
+ } else if (myself == sender_) {
on_send(*this);
on_this_send(*this);
kernel::actor::CommIsendSimcall observer{sender_,
"Isend"};
pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::isend(&observer); },
&observer);
- } else if (dst_buff_ != nullptr) { // Receiver side
+ } else if (myself == receiver_) {
xbt_assert(not detached_, "Receive cannot be detached");
on_recv(*this);
on_this_recv(*this);
}
} // namespace simgrid::s4u
/* **************************** Public C interface *************************** */
+/** Tell whether @p acti can be dynamic_cast to simgrid::s4u::Comm.
+ *
+ * @return 1 when the cast yields a non-null pointer, 0 otherwise. */
+int sg_comm_isinstance(sg_activity_t acti)
+{
+  auto* as_comm = dynamic_cast<simgrid::s4u::Comm*>(acti);
+  return as_comm != nullptr;
+}
+
void sg_comm_detach(sg_comm_t comm, void (*clean_function)(void*))
{
comm->detach(clean_function);
} // namespace simgrid::s4u
/* **************************** Public C interface *************************** */
+/** Tell whether @p acti can be dynamic_cast to simgrid::s4u::Exec.
+ *
+ * @return 1 when the cast yields a non-null pointer, 0 otherwise. */
+int sg_exec_isinstance(sg_activity_t acti)
+{
+  auto* as_exec = dynamic_cast<simgrid::s4u::Exec*>(acti);
+  return as_exec != nullptr;
+}
+
void sg_exec_set_bound(sg_exec_t exec, double bound)
{
exec->set_bound(bound);
return res;
}
+/** Variant of get_async() that takes no payload parameter.
+ *
+ * The comm comes from get_init(), has its destination data set to (nullptr, sizeof(void*)), and is
+ * started before being returned. */
+CommPtr Mailbox::get_async()
+{
+  CommPtr pending = get_init()->set_dst_data(nullptr, sizeof(void*));
+  pending->start();
+  return pending;
+}
+
kernel::activity::ActivityImplPtr
Mailbox::iprobe(int type, const std::function<bool(void*, void*, kernel::activity::CommImpl*)>& match_fun, void* data)
{
foreach(x actor actor-autorestart actor-suspend
activity-lifecycle
- comm-get-sender comm-pt2pt comm-fault-scenarios wait-all-for wait-any-for
+ comm-get-sender comm-pt2pt comm-fault-scenarios wait-all-for
cloud-interrupt-migration cloud-two-execs
monkey-masterworkers monkey-semaphore
concurrent_rw
set(headers_to_install
include/simgrid/actor.h
+ include/simgrid/activity_set.h
include/simgrid/barrier.h
include/simgrid/comm.h
include/simgrid/engine.h