Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of framagit.org:simgrid/simgrid
authorMartin Quinson <martin.quinson@ens-rennes.fr>
Mon, 24 Jul 2023 22:10:04 +0000 (00:10 +0200)
committerMartin Quinson <martin.quinson@ens-rennes.fr>
Mon, 24 Jul 2023 22:10:04 +0000 (00:10 +0200)
36 files changed:
ChangeLog
MANIFEST.in
examples/c/CMakeLists.txt
examples/c/activityset-testany/activityset-testany.c [new file with mode: 0644]
examples/c/activityset-testany/activityset-testany.tesh [new file with mode: 0644]
examples/c/activityset-waitall/activityset-waitall.c [new file with mode: 0644]
examples/c/activityset-waitall/activityset-waitall.tesh [new file with mode: 0644]
examples/c/activityset-waitallfor/activityset-waitallfor.c [new file with mode: 0644]
examples/c/activityset-waitallfor/activityset-waitallfor.tesh [new file with mode: 0644]
examples/c/activityset-waitany/activityset-waitany.c [new file with mode: 0644]
examples/c/activityset-waitany/activityset-waitany.tesh [new file with mode: 0644]
examples/c/exec-waitany/exec-waitany.c [deleted file]
examples/c/exec-waitany/exec-waitany.tesh [deleted file]
examples/python/activityset-testany/activityset-testany.py
examples/python/activityset-waitall/activityset-waitall.py
examples/python/activityset-waitallfor/activityset-waitallfor.py
examples/python/activityset-waitany/activityset-waitany.py
examples/python/comm-testany/comm-testany.py
examples/python/comm-waitallfor/comm-waitallfor.py
examples/python/network-nonlinear/network-nonlinear.py
examples/python/platform-comm-serialize/platform-comm-serialize.py
examples/python/task-switch-host/task-switch-host.py
include/simgrid/comm.h
include/simgrid/exec.h
include/simgrid/forward.h
include/simgrid/s4u/Comm.hpp
include/simgrid/s4u/Mailbox.hpp
src/bindings/python/simgrid_python.cpp
src/kernel/activity/CommImpl.cpp
src/kernel/activity/CommImpl.hpp
src/s4u/s4u_ActivitySet.cpp
src/s4u/s4u_Comm.cpp
src/s4u/s4u_Exec.cpp
src/s4u/s4u_Mailbox.cpp
teshsuite/s4u/CMakeLists.txt
tools/cmake/DefinePackages.cmake

index fd83eff..158d75b 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -5,6 +5,8 @@ S4U:
  - New function NetZone::add_route(host1, host2, links) when you don't need gateways
    Also add a variant with s4u::Link, when you don't want to specify the directions
    on symmetric routes.
+ - Introduce a Mailbox::get_async() with no payload parameter. You can use the new 
+   Comm::get_payload() once the communication is over to retrieve the payload.
 
 SMPI:
  - New SMPI_app_instance_join(): wait for the completion of a started MPI instance
@@ -12,6 +14,7 @@ SMPI:
 
 Python:
  - Make the host_load plugin available from Python. See examples/python/plugin-host-load
+ - Mailbox::get_async() does not return a pair anymore. Use comm.get_payload() instead.
 
 ----------------------------------------------------------------------------
 
index d2ad983..e2a2725 100644 (file)
@@ -1,6 +1,14 @@
 # This file lists the content of the python source package
 # Prepared in tools/cmake/Distrib.cmake
 
+include examples/c/activityset-testany/activityset-testany.c
+include examples/c/activityset-testany/activityset-testany.tesh
+include examples/c/activityset-waitall/activityset-waitall.c
+include examples/c/activityset-waitall/activityset-waitall.tesh
+include examples/c/activityset-waitallfor/activityset-waitallfor.c
+include examples/c/activityset-waitallfor/activityset-waitallfor.tesh
+include examples/c/activityset-waitany/activityset-waitany.c
+include examples/c/activityset-waitany/activityset-waitany.tesh
 include examples/c/actor-create/actor-create.c
 include examples/c/actor-create/actor-create.tesh
 include examples/c/actor-create/actor-create_d.xml
@@ -102,8 +110,6 @@ include examples/c/exec-dvfs/exec-dvfs.c
 include examples/c/exec-dvfs/exec-dvfs.tesh
 include examples/c/exec-remote/exec-remote.c
 include examples/c/exec-remote/exec-remote.tesh
-include examples/c/exec-waitany/exec-waitany.c
-include examples/c/exec-waitany/exec-waitany.tesh
 include examples/c/io-disk-raw/io-disk-raw.c
 include examples/c/io-disk-raw/io-disk-raw.tesh
 include examples/c/io-file-remote/io-file-remote.c
@@ -304,8 +310,6 @@ include examples/cpp/io-file-remote/s4u-io-file-remote.tesh
 include examples/cpp/io-file-remote/s4u-io-file-remote_d.xml
 include examples/cpp/io-file-system/s4u-io-file-system.cpp
 include examples/cpp/io-file-system/s4u-io-file-system.tesh
-include examples/cpp/io-jbod-raw/s4u-io-jbod-raw.cpp
-include examples/cpp/io-jbod-raw/s4u-io-jbod-raw.tesh
 include examples/cpp/io-priority/s4u-io-priority.cpp
 include examples/cpp/io-priority/s4u-io-priority.tesh
 include examples/cpp/maestro-set/s4u-maestro-set.cpp
@@ -361,6 +365,8 @@ include examples/cpp/platform-properties/s4u-platform-properties.cpp
 include examples/cpp/platform-properties/s4u-platform-properties.tesh
 include examples/cpp/plugin-host-load/s4u-plugin-host-load.cpp
 include examples/cpp/plugin-host-load/s4u-plugin-host-load.tesh
+include examples/cpp/plugin-jbod/s4u-plugin-jbod.cpp
+include examples/cpp/plugin-jbod/s4u-plugin-jbod.tesh
 include examples/cpp/plugin-link-load/s4u-plugin-link-load.cpp
 include examples/cpp/plugin-link-load/s4u-plugin-link-load.tesh
 include examples/cpp/plugin-prodcons/s4u-plugin-prodcons.cpp
@@ -416,6 +422,14 @@ include examples/cpp/trace-process-migration/s4u-trace-process-migration.cpp
 include examples/cpp/trace-process-migration/s4u-trace-process-migration.tesh
 include examples/cpp/trace-route-user-variables/s4u-trace-route-user-variables.cpp
 include examples/cpp/trace-route-user-variables/s4u-trace-route-user-variables.tesh
+include examples/python/activityset-testany/activityset-testany.py
+include examples/python/activityset-testany/activityset-testany.tesh
+include examples/python/activityset-waitall/activityset-waitall.py
+include examples/python/activityset-waitall/activityset-waitall.tesh
+include examples/python/activityset-waitallfor/activityset-waitallfor.py
+include examples/python/activityset-waitallfor/activityset-waitallfor.tesh
+include examples/python/activityset-waitany/activityset-waitany.py
+include examples/python/activityset-waitany/activityset-waitany.tesh
 include examples/python/actor-create/actor-create.py
 include examples/python/actor-create/actor-create.tesh
 include examples/python/actor-daemon/actor-daemon.py
@@ -1904,6 +1918,7 @@ include examples/smpi/replay_multiple_manual_deploy/CMakeLists.txt
 include examples/smpi/smpi_s4u_masterworker/CMakeLists.txt
 include examples/sthread/CMakeLists.txt
 include include/simgrid/Exception.hpp
+include include/simgrid/activityset.h
 include include/simgrid/actor.h
 include include/simgrid/barrier.h
 include include/simgrid/chrono.hpp
index 25cccf5..13349f7 100644 (file)
@@ -4,11 +4,12 @@
 foreach(x
         actor-create actor-daemon actor-exiting actor-join actor-kill actor-lifetime actor-migrate actor-stacksize
         actor-suspend actor-yield
+        activityset-testany activityset-waitall activityset-waitallfor activityset-waitany
         app-masterworker app-token-ring
         comm-pingpong comm-wait comm-waitall comm-waitany
         cloud-capping cloud-masterworker cloud-migration cloud-simple
         dht-pastry
-        exec-async exec-basic exec-dvfs exec-remote exec-waitany
+        exec-async exec-basic exec-dvfs exec-remote
         energy-exec energy-exec-ptask energy-vm
         io-disk-raw io-file-remote io-file-system
         platform-failures platform-properties
@@ -96,11 +97,12 @@ set(xml_files     ${xml_files} ${CMAKE_CURRENT_SOURCE_DIR}/actor-create/actor-cr
 foreach(x
         actor-create actor-daemon actor-exiting actor-join actor-kill actor-lifetime actor-migrate actor-stacksize
         actor-suspend actor-yield
+        activityset-testany activityset-waitall activityset-waitallfor activityset-waitany
         app-bittorrent app-chainsend app-masterworker app-token-ring
         comm-pingpong comm-wait comm-waitall comm-waitany
         cloud-capping  cloud-masterworker cloud-migration cloud-simple
         dht-kademlia dht-pastry
-        exec-async exec-basic exec-dvfs exec-remote exec-waitany
+        exec-async exec-basic exec-dvfs exec-remote
         energy-exec energy-exec-ptask energy-vm
         io-disk-raw io-file-remote io-file-system
         platform-failures platform-properties
diff --git a/examples/c/activityset-testany/activityset-testany.c b/examples/c/activityset-testany/activityset-testany.c
new file mode 100644 (file)
index 0000000..370e684
--- /dev/null
@@ -0,0 +1,76 @@
+/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/activity_set.h"
+#include "simgrid/actor.h"
+#include "simgrid/comm.h"
+#include "simgrid/engine.h"
+#include "simgrid/exec.h"
+#include "simgrid/host.h"
+#include "simgrid/mailbox.h"
+
+#include "xbt/log.h"
+#include "xbt/sysdep.h"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_testany, "Messages specific for this s4u example");
+
+static void bob(int argc, char* argv[])
+{
+  XBT_INFO("Create my asynchronous activities");
+  sg_exec_t exec = sg_actor_exec_init(5e9);
+  sg_exec_start(exec);
+
+  sg_mailbox_t mbox = sg_mailbox_by_name("mbox");
+  void* payload     = NULL;
+  sg_comm_t comm    = sg_mailbox_get_async(mbox, &payload);
+
+  sg_activity_set_t pending_activities = sg_activity_set_init();
+  sg_activity_set_push(pending_activities, (sg_activity_t)exec);
+  sg_activity_set_push(pending_activities, (sg_activity_t)comm);
+
+  XBT_INFO("Sleep_for a while");
+  sg_actor_sleep_for(1);
+
+  XBT_INFO("Test for completed activities");
+  while (!sg_activity_set_empty(pending_activities)) {
+    sg_activity_t completed_one = sg_activity_set_test_any(pending_activities);
+    if (completed_one != NULL) {
+      if (sg_comm_isinstance(completed_one))
+        XBT_INFO("Completed a Comm");
+      if (sg_exec_isinstance(completed_one))
+        XBT_INFO("Completed an Exec");
+    } else {
+      XBT_INFO("Nothing matches, test again in 0.5s");
+      sg_actor_sleep_for(.5);
+    }
+  }
+  XBT_INFO("Last activity is complete");
+  free(payload);
+}
+
+static void alice(int argc, char* argv[])
+{
+  char* payload = xbt_strdup("Message");
+  XBT_INFO("Send '%s'", payload);
+  sg_mailbox_put(sg_mailbox_by_name("mbox"), payload, 6e8);
+}
+
+int main(int argc, char* argv[])
+{
+  simgrid_init(&argc, argv);
+  xbt_assert(argc > 1,
+             "Usage: %s platform_file\n"
+             "\tExample: %s hosts_with_disks.xml\n",
+             argv[0], argv[0]);
+
+  simgrid_load_platform(argv[1]);
+
+  sg_actor_create("alice", sg_host_by_name("alice"), alice, 0, NULL);
+  sg_actor_create("bob", sg_host_by_name("bob"), bob, 0, NULL);
+
+  simgrid_run();
+
+  return 0;
+}
diff --git a/examples/c/activityset-testany/activityset-testany.tesh b/examples/c/activityset-testany/activityset-testany.tesh
new file mode 100644 (file)
index 0000000..6293fa3
--- /dev/null
@@ -0,0 +1,19 @@
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/c-activityset-testany ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%4.2r]%e[%5a]%e%m%n"
+> [0.00] [alice] Send 'Message'
+> [0.00] [  bob] Create my asynchronous activities
+> [0.00] [  bob] Sleep_for a while
+> [1.00] [  bob] Test for completed activities
+> [1.00] [  bob] Nothing matches, test again in 0.5s
+> [1.50] [  bob] Nothing matches, test again in 0.5s
+> [2.00] [  bob] Nothing matches, test again in 0.5s
+> [2.50] [  bob] Nothing matches, test again in 0.5s
+> [3.00] [  bob] Nothing matches, test again in 0.5s
+> [3.50] [  bob] Nothing matches, test again in 0.5s
+> [4.00] [  bob] Nothing matches, test again in 0.5s
+> [4.50] [  bob] Nothing matches, test again in 0.5s
+> [5.00] [  bob] Completed an Exec
+> [5.00] [  bob] Nothing matches, test again in 0.5s
+> [5.50] [  bob] Completed a Comm
+> [5.50] [  bob] Last activity is complete
diff --git a/examples/c/activityset-waitall/activityset-waitall.c b/examples/c/activityset-waitall/activityset-waitall.c
new file mode 100644 (file)
index 0000000..c63f091
--- /dev/null
@@ -0,0 +1,63 @@
+/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/activity_set.h"
+#include "simgrid/actor.h"
+#include "simgrid/comm.h"
+#include "simgrid/engine.h"
+#include "simgrid/exec.h"
+#include "simgrid/host.h"
+#include "simgrid/mailbox.h"
+
+#include "xbt/log.h"
+#include "xbt/sysdep.h"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_waitall, "Messages specific for this s4u example");
+
+static void bob()
+{
+  XBT_INFO("Create my asynchronous activities");
+  sg_exec_t exec = sg_actor_exec_init(5e9);
+  sg_exec_start(exec);
+
+  sg_mailbox_t mbox = sg_mailbox_by_name("mbox");
+  void* payload     = NULL;
+  sg_comm_t comm    = sg_mailbox_get_async(mbox, &payload);
+
+  sg_activity_set_t pending_activities = sg_activity_set_init();
+  sg_activity_set_push(pending_activities, (sg_activity_t)exec);
+  sg_activity_set_push(pending_activities, (sg_activity_t)comm);
+
+  XBT_INFO("Wait for asynchronous activities to complete, all in one shot.");
+  sg_activity_set_wait_all(pending_activities);
+
+  XBT_INFO("All activities are completed.");
+  free(payload);
+}
+
+static void alice()
+{
+  char* payload = xbt_strdup("Message");
+  XBT_INFO("Send '%s'", payload);
+  sg_mailbox_put(sg_mailbox_by_name("mbox"), payload, 6e8);
+}
+
+int main(int argc, char* argv[])
+{
+  simgrid_init(&argc, argv);
+  xbt_assert(argc > 1,
+             "Usage: %s platform_file\n"
+             "\tExample: %s hosts_with_disks.xml\n",
+             argv[0], argv[0]);
+
+  simgrid_load_platform(argv[1]);
+
+  sg_actor_create("alice", sg_host_by_name("alice"), alice, 0, NULL);
+  sg_actor_create("bob", sg_host_by_name("bob"), bob, 0, NULL);
+
+  simgrid_run();
+
+  return 0;
+}
diff --git a/examples/c/activityset-waitall/activityset-waitall.tesh b/examples/c/activityset-waitall/activityset-waitall.tesh
new file mode 100644 (file)
index 0000000..1093563
--- /dev/null
@@ -0,0 +1,7 @@
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/c-activityset-waitall ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n"
+> [0.000000] [alice] Send 'Message'
+> [0.000000] [  bob] Create my asynchronous activities
+> [0.000000] [  bob] Wait for asynchronous activities to complete, all in one shot.
+> [5.197828] [  bob] All activities are completed.
diff --git a/examples/c/activityset-waitallfor/activityset-waitallfor.c b/examples/c/activityset-waitallfor/activityset-waitallfor.c
new file mode 100644 (file)
index 0000000..93bbe67
--- /dev/null
@@ -0,0 +1,76 @@
+/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/activity_set.h"
+#include "simgrid/actor.h"
+#include "simgrid/comm.h"
+#include "simgrid/engine.h"
+#include "simgrid/exec.h"
+#include "simgrid/host.h"
+#include "simgrid/mailbox.h"
+
+#include "xbt/log.h"
+#include "xbt/sysdep.h"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_waitallfor, "Messages specific for this s4u example");
+
+static void bob()
+{
+  XBT_INFO("Create my asynchronous activities");
+  sg_exec_t exec = sg_actor_exec_init(5e9);
+  sg_exec_start(exec);
+
+  sg_mailbox_t mbox = sg_mailbox_by_name("mbox");
+  void* payload     = NULL;
+  sg_comm_t comm    = sg_mailbox_get_async(mbox, &payload);
+
+  sg_activity_set_t pending_activities = sg_activity_set_init();
+  sg_activity_set_push(pending_activities, (sg_activity_t)exec);
+  sg_activity_set_push(pending_activities, (sg_activity_t)comm);
+
+  XBT_INFO("Wait for asynchronous activities to complete");
+  while (!sg_activity_set_empty(pending_activities)) {
+    if (!sg_activity_set_wait_all_for(pending_activities, 1)) {
+      XBT_INFO("Not all activities are terminated yet.");
+    }
+
+    sg_activity_t completed_one = sg_activity_set_test_any(pending_activities);
+    while (completed_one != NULL) {
+      if (sg_comm_isinstance(completed_one))
+        XBT_INFO("Completed a Comm");
+      if (sg_exec_isinstance(completed_one))
+        XBT_INFO("Completed an Exec");
+      completed_one = sg_activity_set_test_any(pending_activities);
+    }
+  }
+
+  XBT_INFO("Last activity is complete");
+  free(payload);
+}
+
+static void alice()
+{
+  char* payload = xbt_strdup("Message");
+  XBT_INFO("Send '%s'", payload);
+  sg_mailbox_put(sg_mailbox_by_name("mbox"), payload, 6e8);
+}
+
+int main(int argc, char* argv[])
+{
+  simgrid_init(&argc, argv);
+  xbt_assert(argc > 1,
+             "Usage: %s platform_file\n"
+             "\tExample: %s hosts_with_disks.xml\n",
+             argv[0], argv[0]);
+
+  simgrid_load_platform(argv[1]);
+
+  sg_actor_create("alice", sg_host_by_name("alice"), alice, 0, NULL);
+  sg_actor_create("bob", sg_host_by_name("bob"), bob, 0, NULL);
+
+  simgrid_run();
+
+  return 0;
+}
diff --git a/examples/c/activityset-waitallfor/activityset-waitallfor.tesh b/examples/c/activityset-waitallfor/activityset-waitallfor.tesh
new file mode 100644 (file)
index 0000000..26a73c5
--- /dev/null
@@ -0,0 +1,14 @@
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/c-activityset-waitallfor ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n"
+> [0.000000] [alice] Send 'Message'
+> [0.000000] [  bob] Create my asynchronous activities
+> [0.000000] [  bob] Wait for asynchronous activities to complete
+> [1.000000] [  bob] Not all activities are terminated yet.
+> [2.000000] [  bob] Not all activities are terminated yet.
+> [3.000000] [  bob] Not all activities are terminated yet.
+> [4.000000] [  bob] Not all activities are terminated yet.
+> [5.000000] [  bob] Not all activities are terminated yet.
+> [5.000000] [  bob] Completed an Exec
+> [5.197828] [  bob] Completed a Comm
+> [5.197828] [  bob] Last activity is complete
diff --git a/examples/c/activityset-waitany/activityset-waitany.c b/examples/c/activityset-waitany/activityset-waitany.c
new file mode 100644 (file)
index 0000000..183693f
--- /dev/null
@@ -0,0 +1,71 @@
+/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/activity_set.h"
+#include "simgrid/actor.h"
+#include "simgrid/comm.h"
+#include "simgrid/engine.h"
+#include "simgrid/exec.h"
+#include "simgrid/host.h"
+#include "simgrid/mailbox.h"
+
+#include "xbt/log.h"
+#include "xbt/sysdep.h"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_waittany, "Messages specific for this s4u example");
+
+static void bob()
+{
+  XBT_INFO("Create my asynchronous activities");
+  sg_exec_t exec = sg_actor_exec_init(5e9);
+  sg_exec_start(exec);
+
+  sg_mailbox_t mbox = sg_mailbox_by_name("mbox");
+  void* payload     = NULL;
+  sg_comm_t comm    = sg_mailbox_get_async(mbox, &payload);
+
+  sg_activity_set_t pending_activities = sg_activity_set_init();
+  sg_activity_set_push(pending_activities, (sg_activity_t)exec);
+  sg_activity_set_push(pending_activities, (sg_activity_t)comm);
+
+  XBT_INFO("Wait for asynchronous activities to complete");
+  while (!sg_activity_set_empty(pending_activities)) {
+
+    sg_activity_t completed_one = sg_activity_set_wait_any(pending_activities);
+    if (sg_comm_isinstance(completed_one))
+      XBT_INFO("Completed a Comm");
+    else if (sg_exec_isinstance(completed_one))
+      XBT_INFO("Completed an Exec");
+    else
+      xbt_die("This activity set is supposed to only contain Comm or Exec");
+  }
+  XBT_INFO("Last activity is complete");
+  free(payload);
+}
+
+static void alice()
+{
+  char* payload = xbt_strdup("Message");
+  XBT_INFO("Send '%s'", payload);
+  sg_mailbox_put(sg_mailbox_by_name("mbox"), payload, 6e8);
+}
+
+int main(int argc, char* argv[])
+{
+  simgrid_init(&argc, argv);
+  xbt_assert(argc > 1,
+             "Usage: %s platform_file\n"
+             "\tExample: %s hosts_with_disks.xml\n",
+             argv[0], argv[0]);
+
+  simgrid_load_platform(argv[1]);
+
+  sg_actor_create("alice", sg_host_by_name("alice"), alice, 0, NULL);
+  sg_actor_create("bob", sg_host_by_name("bob"), bob, 0, NULL);
+
+  simgrid_run();
+
+  return 0;
+}
diff --git a/examples/c/activityset-waitany/activityset-waitany.tesh b/examples/c/activityset-waitany/activityset-waitany.tesh
new file mode 100644 (file)
index 0000000..dac02a8
--- /dev/null
@@ -0,0 +1,9 @@
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/c-activityset-waitany ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n"
+> [0.000000] [alice] Send 'Message'
+> [0.000000] [  bob] Create my asynchronous activities
+> [0.000000] [  bob] Wait for asynchronous activities to complete
+> [5.000000] [  bob] Completed an Exec
+> [5.197828] [  bob] Completed a Comm
+> [5.197828] [  bob] Last activity is complete
diff --git a/examples/c/exec-waitany/exec-waitany.c b/examples/c/exec-waitany/exec-waitany.c
deleted file mode 100644 (file)
index 2dd9d86..0000000
+++ /dev/null
@@ -1,77 +0,0 @@
-/* Copyright (c) 2019-2023. The SimGrid Team. All rights reserved.          */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include "simgrid/actor.h"
-#include "simgrid/engine.h"
-#include "simgrid/exec.h"
-#include "simgrid/host.h"
-
-#include "xbt/log.h"
-#include "xbt/sysdep.h"
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(exec_waitany, "Messages specific for this example");
-
-static void worker(int argc, char* argv[])
-{
-  xbt_assert(argc > 1);
-  int with_timeout = !strcmp(argv[1], "true");
-
-  /* Vector in which we store all pending executions*/
-  sg_exec_t* pending_execs = xbt_malloc(sizeof(sg_exec_t) * 3);
-  int pending_execs_count  = 0;
-
-  for (int i = 0; i < 3; i++) {
-    char* name    = bprintf("Exec-%d", i);
-    double amount = (6 * (i % 2) + i + 1) * sg_host_get_speed(sg_host_self());
-
-    sg_exec_t exec = sg_actor_exec_init(amount);
-    sg_exec_set_name(exec, name);
-    pending_execs[pending_execs_count++] = exec;
-    sg_exec_start(exec);
-
-    XBT_INFO("Activity %s has started for %.0f seconds", name, amount / sg_host_get_speed(sg_host_self()));
-    free(name);
-  }
-
-  /* Now that executions were initiated, wait for their completion, in order of termination.
-   *
-   * This loop waits for first terminating execution with wait_any() and remove it with erase(), until all execs are
-   * terminated.
-   */
-  while (pending_execs_count > 0) {
-    ssize_t pos;
-    if (with_timeout)
-      pos = sg_exec_wait_any_for(pending_execs, pending_execs_count, 4);
-    else
-      pos = sg_exec_wait_any(pending_execs, pending_execs_count);
-
-    if (pos < 0) {
-      XBT_INFO("Do not wait any longer for an activity (timeout received)");
-      pending_execs_count = 0;
-    } else {
-      XBT_INFO("Activity at position %zd is complete", pos);
-      memmove(pending_execs + pos, pending_execs + pos + 1, sizeof(sg_exec_t) * (pending_execs_count - pos - 1));
-      pending_execs_count--;
-    }
-    XBT_INFO("%d activities remain pending", pending_execs_count);
-  }
-
-  xbt_free(pending_execs);
-}
-
-int main(int argc, char* argv[])
-{
-  simgrid_init(&argc, argv);
-  simgrid_load_platform(argv[1]);
-
-  const char* worker_argv[] = {"worker", "false"};
-  sg_actor_create_("worker", sg_host_by_name("Tremblay"), worker, 2, worker_argv);
-
-  worker_argv[1] = "true";
-  sg_actor_create_("worker_timeout", sg_host_by_name("Tremblay"), worker, 2, worker_argv);
-
-  simgrid_run();
-  return 0;
-}
diff --git a/examples/c/exec-waitany/exec-waitany.tesh b/examples/c/exec-waitany/exec-waitany.tesh
deleted file mode 100644 (file)
index 57fb2ba..0000000
+++ /dev/null
@@ -1,21 +0,0 @@
-#!/usr/bin/env tesh
-
-$ ${bindir:=.}/c-exec-waitany ${platfdir}/multicore_machine.xml "--log=root.fmt:[%10.6r]%e[%14P]%e%m%n"
-> [  0.000000] [        worker] Activity Exec-0 has started for 1 seconds
-> [  0.000000] [worker_timeout] Activity Exec-0 has started for 1 seconds
-> [  0.000000] [        worker] Activity Exec-1 has started for 8 seconds
-> [  0.000000] [worker_timeout] Activity Exec-1 has started for 8 seconds
-> [  0.000000] [        worker] Activity Exec-2 has started for 3 seconds
-> [  0.000000] [worker_timeout] Activity Exec-2 has started for 3 seconds
-> [  1.000000] [worker_timeout] Activity at position 0 is complete
-> [  1.000000] [worker_timeout] 2 activities remain pending
-> [  1.000000] [        worker] Activity at position 0 is complete
-> [  1.000000] [        worker] 2 activities remain pending
-> [  3.000000] [worker_timeout] Activity at position 1 is complete
-> [  3.000000] [worker_timeout] 1 activities remain pending
-> [  3.000000] [        worker] Activity at position 1 is complete
-> [  3.000000] [        worker] 1 activities remain pending
-> [  7.000000] [worker_timeout] Do not wait any longer for an activity (timeout received)
-> [  7.000000] [worker_timeout] 0 activities remain pending
-> [  8.000000] [        worker] Activity at position 0 is complete
-> [  8.000000] [        worker] 0 activities remain pending
index 34f1e9a..6a8901f 100644 (file)
@@ -16,7 +16,7 @@ def bob():
 
   this_actor.info("Create my asynchronous activities")
   exec = this_actor.exec_async(5e9)
-  comm, payload = mbox.get_async()
+  comm = mbox.get_async()
   io   = disk.read_async(300000000)
 
   pending_activities = ActivitySet([exec, comm])
index 36b70b8..4e4d1ee 100644 (file)
@@ -16,7 +16,7 @@ def bob():
 
   this_actor.info("Create my asynchronous activities")
   exec = this_actor.exec_async(5e9)
-  comm, payload = mbox.get_async()
+  comm = mbox.get_async()
   io   = disk.read_async(300000000)
 
   pending_activities = ActivitySet([exec, comm])
index 4f28809..44b3c6f 100644 (file)
@@ -16,7 +16,7 @@ def bob():
 
   this_actor.info("Create my asynchronous activities")
   exec = this_actor.exec_async(5e9)
-  comm, payload = mbox.get_async()
+  comm = mbox.get_async()
   io   = disk.read_async(300000000)
 
   pending_activities = ActivitySet([exec, comm])
index c2139a8..88ac531 100644 (file)
@@ -16,7 +16,7 @@ def bob():
 
   this_actor.info("Create my asynchronous activities")
   exec = this_actor.exec_async(5e9)
-  comm, payload = mbox.get_async()
+  comm = mbox.get_async()
   io   = disk.read_async(300000000)
 
   pending_activities = ActivitySet([exec, comm])
@@ -50,71 +50,3 @@ if __name__ == '__main__':
   Actor.create("alice", Host.by_name("alice"), alice)
 
   e.run()
-
-"""
-
-/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.          */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include "simgrid/s4u.hpp"
-#include <cstdlib>
-#include <iostream>
-#include <string>
-namespace sg4 = simgrid::s4u;
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_waittany, "Messages specific for this s4u example");
-
-static void bob()
-{
-  sg4::Mailbox* mbox    = sg4::Mailbox::by_name("mbox");
-  const sg4::Disk* disk = sg4::Host::current()->get_disks().front();
-  std::string* payload;
-
-  XBT_INFO("Create my asynchronous activities");
-  auto exec = sg4::this_actor::exec_async(5e9);
-  auto comm = mbox->get_async(&payload);
-  auto io   = disk->read_async(3e8);
-
-  sg4::ActivitySet pending_activities({boost::dynamic_pointer_cast<sg4::Activity>(exec),
-                                       boost::dynamic_pointer_cast<sg4::Activity>(comm),
-                                       boost::dynamic_pointer_cast<sg4::Activity>(io)});
-
-  XBT_INFO("Wait for asynchronous activities to complete");
-  while (not pending_activities.empty()) {
-    auto completed_one = pending_activities.wait_any();
-    if (completed_one != nullptr) {
-      if (boost::dynamic_pointer_cast<sg4::Comm>(completed_one))
-        XBT_INFO("Completed a Comm");
-      if (boost::dynamic_pointer_cast<sg4::Exec>(completed_one))
-        XBT_INFO("Completed an Exec");
-      if (boost::dynamic_pointer_cast<sg4::Io>(completed_one))
-        XBT_INFO("Completed an I/O");
-    }
-  }
-  XBT_INFO("Last activity is complete");
-  delete payload;
-}
-
-static void alice()
-{
-  auto* payload = new std::string("Message");
-  XBT_INFO("Send '%s'", payload->c_str());
-  sg4::Mailbox::by_name("mbox")->put(payload, 6e8);
-}
-
-int main(int argc, char* argv[])
-{
-  sg4::Engine e(&argc, argv);
-
-  e.load_platform(argv[1]);
-
-  sg4::Actor::create("bob", e.host_by_name("bob"), bob);
-  sg4::Actor::create("alice", e.host_by_name("alice"), alice);
-
-  e.run();
-
-  return 0;
-}
-"""
\ No newline at end of file
index 52220cf..84469b0 100644 (file)
@@ -24,9 +24,9 @@ def create_parser() -> ArgumentParser:
 def rank0():
     rank0_mailbox: Mailbox = Mailbox.by_name("rank0")
     this_actor.info("Post my asynchronous receives")
-    comm1, a1 = rank0_mailbox.get_async()
-    comm2, a2 = rank0_mailbox.get_async()
-    comm3, a3 = rank0_mailbox.get_async()
+    comm1 = rank0_mailbox.get_async()
+    comm2 = rank0_mailbox.get_async()
+    comm3 = rank0_mailbox.get_async()
     pending_comms: List[Comm] = [comm1, comm2, comm3]
 
     this_actor.info("Send some data to rank-1")
index 0f38795..d72490b 100644 (file)
@@ -18,7 +18,7 @@ from typing import List
 from uuid import uuid4
 import sys
 
-from simgrid import Actor, Comm, Engine, Host, Mailbox, PyGetAsync, this_actor
+from simgrid import Actor, Comm, Engine, Host, Mailbox, this_actor
 
 
 SIMULATED_JOB_SIZE_BYTES = 1024
@@ -78,12 +78,11 @@ def worker(worker_id: str):
 @dataclass
 class AsyncJobResult:
     job: Job
-    result_comm: Comm
-    async_data: PyGetAsync
+    comm: Comm
 
     @property
     def complete(self) -> bool:
-        return self.result_comm.test()
+        return self.comm.test()
 
     @property
     def status(self) -> str:
@@ -103,20 +102,19 @@ def client(client_id: str, jobs: List[float], wait_timeout: float):
             result_mailbox=result_mailbox
         ), SIMULATED_JOB_SIZE_BYTES)
         out_comm.detach()
-        result_comm, async_data = result_mailbox.get_async()
+        result_comm = result_mailbox.get_async()
         async_job_results.append(AsyncJobResult(
             job=job,
-            result_comm=result_comm,
-            async_data=async_data
+            comm=result_comm
         ))
     this_actor.info(f"awaiting results for all jobs (timeout={wait_timeout}s)")
-    completed_comms = Comm.wait_all_for([entry.result_comm for entry in async_job_results], wait_timeout)
+    completed_comms = Comm.wait_all_for([entry.comm for entry in async_job_results], wait_timeout)
     logger = this_actor.warning if completed_comms < len(async_job_results) else this_actor.info
     logger(f"received {completed_comms}/{len(async_job_results)} results")
     for result in async_job_results:
         this_actor.info(f"{result.job.job_id}"
                         f" status={result.status}"
-                        f" result_payload={result.async_data.get() if result.complete else ''}")
+                        f" result_payload={result.comm.get_payload() if result.complete else ''}")
 
 
 def main():
index 7bc1218..96b0981 100644 (file)
@@ -50,21 +50,18 @@ class Receiver:
     def __call__(self):
         mbox = Mailbox.by_name("receiver")
 
-        pending_msgs = []
         pending_comms = []
 
         this_actor.info("Wait for %d messages asynchronously" % self.msg_count)
         for _ in range(self.msg_count):
-            comm, data = mbox.get_async()
+            comm = mbox.get_async()
             pending_comms.append(comm)
-            pending_msgs.append(data)
 
         while pending_comms:
             index = Comm.wait_any(pending_comms)
-            msg = pending_msgs[index].get()
+            msg = pending_comms[index].get_payload()
             this_actor.info("I got '%s'." % msg)
             del pending_comms[index]
-            del pending_msgs[index]
 
 ####################################################################################################
 def link_nonlinear(link: Link, capacity: float, n: int) -> float:
index bbd871d..eaa880c 100644 (file)
@@ -6,7 +6,7 @@
 from typing import List, Tuple
 import sys
 
-from simgrid import Engine, Actor, Comm, Host, LinkInRoute, Mailbox, NetZone, this_actor, PyGetAsync
+from simgrid import Engine, Actor, Comm, Host, LinkInRoute, Mailbox, NetZone, this_actor
 
 
 RECEIVER_MAILBOX_NAME = "receiver"
@@ -44,14 +44,13 @@ class Receiver(object):
 
     def __call__(self):
         # List in which we store all incoming msgs
-        pending_comms: List[Tuple[Comm, PyGetAsync]] = []
+        pending_comms: List[Comm] = []
         this_actor.info(f"Wait for {self.messages_count} messages asynchronously")
         for _ in range(self.messages_count):
             pending_comms.append(self.mailbox.get_async())
         while pending_comms:
-            index = Comm.wait_any([comm for (comm, _) in pending_comms])
-            _, async_data = pending_comms[index]
-            this_actor.info(f"I got '{async_data.get()}'.")
+            index = Comm.wait_any(pending_comms)
+            this_actor.info(f"I got '{pending_comms[index].get_payload()}'.")
             pending_comms.pop(index)
 
 
index b2904c3..5be8922 100644 (file)
@@ -4,12 +4,12 @@
 # under the terms of the license (GNU LGPL) which comes with this package.
 
 """
-/* This example demonstrates how to dynamically modify a graph of tasks.
- *
- * Assuming we have two instances of a service placed on different hosts,
- * we want to send data alternatively to thoses instances.
- *
- * We consider the following graph:
+This example demonstrates how to dynamically modify a graph of tasks.
+
+Assuming we have two instances of a service placed on different hosts,
+we want to send data alternatively to thoses instances.
+
+We consider the following graph:
 
            comm1
      ┌────────────────────────┐
@@ -26,8 +26,8 @@
      │                        │
      └────────────────────────┘
            comm2
- */
- """
+
+"""
 
 from argparse import ArgumentParser
 import sys
index ee5f5e7..299b743 100644 (file)
@@ -12,6 +12,8 @@
 /* C interface */
 SG_BEGIN_DECL
 
+XBT_PUBLIC int sg_comm_isinstance(sg_activity_t acti);
+
 XBT_PUBLIC void sg_comm_detach(sg_comm_t comm, void (*clean_function)(void*));
 XBT_PUBLIC int sg_comm_test(sg_comm_t comm);
 XBT_PUBLIC sg_error_t sg_comm_wait(sg_comm_t comm);
index aed0390..565684c 100644 (file)
@@ -12,6 +12,8 @@
 /* C interface */
 SG_BEGIN_DECL
 
+XBT_PUBLIC int sg_exec_isinstance(sg_activity_t acti);
+
 XBT_PUBLIC void sg_exec_set_bound(sg_exec_t exec, double bound);
 XBT_PUBLIC const char* sg_exec_get_name(const_sg_exec_t exec);
 XBT_PUBLIC void sg_exec_set_name(sg_exec_t exec, const char* name);
@@ -24,10 +26,11 @@ XBT_PUBLIC void sg_exec_cancel(sg_exec_t exec);
 XBT_PUBLIC int sg_exec_test(sg_exec_t exec);
 XBT_PUBLIC sg_error_t sg_exec_wait(sg_exec_t exec);
 XBT_PUBLIC sg_error_t sg_exec_wait_for(sg_exec_t exec, double timeout);
-// XBT_ATTRIB_DEPRECATED_v339("Please use sg_activity_set instead") TODO: C bindings of ActivitySet
-XBT_PUBLIC ssize_t sg_exec_wait_any_for(sg_exec_t* execs, size_t count, double timeout);
-// XBT_ATTRIB_DEPRECATED_v339("Please use sg_activity_set instead") TODO: C bindings of ActivitySet
-XBT_PUBLIC ssize_t sg_exec_wait_any(sg_exec_t* execs, size_t count);
+
+XBT_ATTRIB_DEPRECATED_v339("Please use sg_activity_set_t instead") XBT_PUBLIC ssize_t
+    sg_exec_wait_any_for(sg_exec_t* execs, size_t count, double timeout);
+XBT_ATTRIB_DEPRECATED_v339("Please use sg_activity_set_t instead") XBT_PUBLIC ssize_t
+    sg_exec_wait_any(sg_exec_t* execs, size_t count);
 
 SG_END_DECL
 
index 6c72682..85fe6a7 100644 (file)
@@ -221,6 +221,8 @@ class RemoteApp;
 } // namespace simgrid
 
 using s4u_Actor             = simgrid::s4u::Actor;
+using s4u_Activity          = simgrid::s4u::Activity;
+using s4u_ActivitySet       = simgrid::s4u::ActivitySet;
 using s4u_Barrier           = simgrid::s4u::Barrier;
 using s4u_Comm              = simgrid::s4u::Comm;
 using s4u_Exec              = simgrid::s4u::Exec;
@@ -239,6 +241,8 @@ using smx_activity_t = simgrid::kernel::activity::ActivityImpl*;
 #else
 
 typedef struct s4u_Actor s4u_Actor;
+typedef struct s4u_Activity s4u_Activity;
+typedef struct s4u_ActivitySet s4u_ActivitySet;
 typedef struct s4u_Barrier s4u_Barrier;
 typedef struct s4u_Comm s4u_Comm;
 typedef struct s4u_Exec s4u_Exec;
@@ -289,6 +293,16 @@ typedef s4u_Actor* sg_actor_t;
 /** Pointer to a constant actor object */
 typedef const s4u_Actor* const_sg_actor_t;
 
+/** Pointer to an activity object */
+typedef s4u_Activity* sg_activity_t;
+/** Pointer to a constant activity object */
+typedef const s4u_Activity* const_sg_activity_t;
+
+/** Pointer to an activity set object */
+typedef s4u_ActivitySet* sg_activity_set_t;
+/** Pointer to a constant activity set object */
+typedef const s4u_ActivitySet* const_sg_activity_set_t;
+
 /** @ingroup m_datatypes_management_details
  * @brief Type for any SimGrid size
  */
index 3b1f417..c8a0156 100644 (file)
@@ -151,6 +151,9 @@ public:
   void* get_dst_data() const { return dst_buff_; }
   /** Retrieve the size of the received data. Not to be mixed with @ref Activity::get_remaining()  */
   size_t get_dst_data_size() const { return dst_buff_size_; }
+  /** Retrieve the payload associated to the communication. You can only do that once the comm is (gracefully)
+   * terminated, and it is only setup by the default copy_data callback (not the SMPI one) */
+  void* get_payload() const;
 
   /* Common functions */
 
index b5723df..b8eddff 100644 (file)
@@ -118,6 +118,9 @@ public:
   CommPtr get_init();
   /** Creates and start an async data reception to that mailbox */
   template <typename T> CommPtr get_async(T** data);
+  /** Creates and start an async data reception to that mailbox. Since the data location is not provided, you'll have to
+   * use Comm::get_payload once the comm terminates */
+  CommPtr get_async();
 
   /** Blocking data reception */
   template <typename T> T* get();
index a8e9ce7..04f0a16 100644 (file)
@@ -76,15 +76,6 @@ std::string get_simgrid_version()
   sg_version_get(&major, &minor, &patch);
   return simgrid::xbt::string_printf("%i.%i.%i", major, minor, patch);
 }
-
-/** @brief Wrap for mailbox::get_async */
-class PyGetAsync {
-  std::unique_ptr<PyObject*> data = std::make_unique<PyObject*>();
-
-public:
-  PyObject** get() const { return data.get(); }
-};
-
 } // namespace
 
 PYBIND11_DECLARE_HOLDER_TYPE(T, boost::intrusive_ptr<T>)
@@ -634,26 +625,14 @@ PYBIND11_MODULE(simgrid, m)
           "get", [](Mailbox* self) { return py::reinterpret_steal<py::object>(self->get<PyObject>()); },
           py::call_guard<py::gil_scoped_release>(), "Blocking data reception")
       .def(
-          "get_async",
-          [](Mailbox* self) -> std::tuple<CommPtr, PyGetAsync> {
-            PyGetAsync wrap;
-            auto comm = self->get_async(wrap.get());
-            return std::make_tuple(std::move(comm), std::move(wrap));
-          },
+          "get_async", [](Mailbox* self) -> CommPtr { return self->get_async(); },
           py::call_guard<py::gil_scoped_release>(),
           "Non-blocking data reception. Use data.get() to get the python object after the communication has finished")
       .def("set_receiver", &Mailbox::set_receiver, py::call_guard<py::gil_scoped_release>(),
            "Sets the actor as permanent receiver");
 
-  /* Class PyGetAsync */
-  py::class_<PyGetAsync>(m, "PyGetAsync", "Wrapper for async get communications")
-      .def(py::init<>())
-      .def(
-          "get", [](const PyGetAsync* self) { return py::reinterpret_steal<py::object>(*(self->get())); },
-          "Get python object after async communication in receiver side");
-
   /* class Activity */
-  py::class_<Activity, ActivityPtr>(m, "Activityy", "Activity. See the C++ documentation for details.");
+  py::class_<Activity, ActivityPtr>(m, "Activity", "Activity. See the C++ documentation for details.");
 
   /* Class Comm */
   py::class_<Comm, CommPtr, Activity>(m, "Comm", "Communication. See the C++ documentation for details.")
@@ -693,6 +672,11 @@ PYBIND11_MODULE(simgrid, m)
            "Block until the completion of that communication, or raises TimeoutException after the specified timeout.")
       .def("wait_until", &Comm::wait_until, py::call_guard<py::gil_scoped_release>(), py::arg("time_limit"),
            "Block until the completion of that communication, or raises TimeoutException after the specified time.")
+      .def(
+          "get_payload",
+          [](const Comm* self) { return py::reinterpret_steal<py::object>((PyObject*)self->get_payload()); },
+          py::call_guard<py::gil_scoped_release>(),
+          "Retrieve the message's payload of a get_async. You cannot call this until after the comm termination.")
       .def("detach", py::overload_cast<>(&Comm::detach), py::return_value_policy::reference_internal,
            py::call_guard<py::gil_scoped_release>(),
            "Start the comm, and ignore its result. It can be completely forgotten after that.")
index e5c1ddf..25a7661 100644 (file)
@@ -37,7 +37,9 @@ CommImpl::CommImpl()
 std::function<void(CommImpl*, void*, size_t)> CommImpl::copy_data_callback_ = [](kernel::activity::CommImpl* comm,
                                                                                  void* buff, size_t buff_size) {
   xbt_assert((buff_size == sizeof(void*)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
-  *(void**)(comm->dst_buff_) = buff;
+  if (comm->dst_buff_ != nullptr) // get_async provided a buffer
+    *(void**)(comm->dst_buff_) = buff;
+  comm->payload_ = buff; // Setup what will be retrieved by s4u::Comm::get_payload()
 };
 
 void CommImpl::set_copy_data_callback(const std::function<void(CommImpl*, void*, size_t)>& callback)
@@ -192,7 +194,7 @@ void CommImpl::copy_data()
 {
   size_t buff_size = src_buff_size_;
   /* If there is no data to copy then return */
-  if (not src_buff_ || not dst_buff_ || copied_)
+  if (not src_buff_ || not dst_buff_size_ || copied_)
     return;
 
   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)", this,
index 4fcf67c..f3d9839 100644 (file)
@@ -98,6 +98,7 @@ expectations of the other side, too. See  */
   unsigned char* dst_buff_ = nullptr;
   size_t src_buff_size_    = 0;
   size_t* dst_buff_size_   = nullptr;
+  void* payload_           = nullptr; // If dst_buff_ is NULL, the default copy callback puts the data here
 
   void* src_data_ = nullptr; /* User data associated to the communication */
   void* dst_data_ = nullptr;
index 6e7eded..8ba545a 100644 (file)
@@ -7,6 +7,7 @@
 #include "src/kernel/actor/ActorImpl.hpp"
 #include "src/kernel/actor/CommObserver.hpp"
 #include <simgrid/Exception.hpp>
+#include <simgrid/activity_set.h>
 #include <simgrid/s4u/ActivitySet.hpp>
 #include <simgrid/s4u/Engine.hpp>
 
@@ -97,3 +98,63 @@ ActivityPtr ActivitySet::get_failed_activity()
 
 } // namespace s4u
 } // namespace simgrid
+
+SG_BEGIN_DECL
+
+sg_activity_set_t sg_activity_set_init()
+{
+  return new simgrid::s4u::ActivitySet();
+}
+void sg_activity_set_push(sg_activity_set_t as, sg_activity_t acti)
+{
+  as->push(acti);
+}
+void sg_activity_set_erase(sg_activity_set_t as, sg_activity_t acti)
+{
+  as->erase(acti);
+}
+size_t sg_activity_set_size(sg_activity_set_t as)
+{
+  return as->size();
+}
+int sg_activity_set_empty(sg_activity_set_t as)
+{
+  return as->empty();
+}
+
+sg_activity_t sg_activity_set_test_any(sg_activity_set_t as)
+{
+  return as->test_any().get();
+}
+void sg_activity_set_wait_all(sg_activity_set_t as)
+{
+  as->wait_all();
+}
+int sg_activity_set_wait_all_for(sg_activity_set_t as, double timeout)
+{
+  try {
+    as->wait_all_for(timeout);
+    return 1;
+  } catch (const simgrid::TimeoutException& e) {
+    return 0;
+  }
+}
+sg_activity_t sg_activity_set_wait_any(sg_activity_set_t as)
+{
+  return as->wait_any().get();
+}
+sg_activity_t sg_activity_set_wait_any_for(sg_activity_set_t as, double timeout)
+{
+  try {
+    return as->wait_any_for(timeout).get();
+  } catch (const simgrid::TimeoutException& e) {
+    return nullptr;
+  }
+}
+
+void sg_activity_set_delete(sg_activity_set_t as)
+{
+  delete as;
+}
+
+SG_END_DECL
index d88103a..404321a 100644 (file)
@@ -6,6 +6,7 @@
 #include <cmath>
 #include <simgrid/Exception.hpp>
 #include <simgrid/comm.h>
+#include <simgrid/s4u/ActivitySet.hpp>
 #include <simgrid/s4u/Comm.hpp>
 #include <simgrid/s4u/Engine.hpp>
 #include <simgrid/s4u/Mailbox.hpp>
@@ -283,6 +284,14 @@ CommPtr Comm::set_payload_size(uint64_t bytes)
   return this;
 }
 
+void* Comm::get_payload() const
+{
+  xbt_assert(get_state() == State::FINISHED,
+             "You can only retrieve the payload of a communication that gracefully terminated, but its state is %s.",
+             get_state_str());
+  return static_cast<kernel::activity::CommImpl*>(pimpl_.get())->payload_;
+}
+
 Actor* Comm::get_sender() const
 {
   kernel::actor::ActorImplPtr sender = nullptr;
@@ -309,6 +318,9 @@ Comm* Comm::do_start()
 {
   xbt_assert(get_state() == State::INITED || get_state() == State::STARTING,
              "You cannot use %s() once your communication started (not implemented)", __FUNCTION__);
+
+  auto myself = kernel::actor::ActorImpl::self();
+
   if (get_source() != nullptr || get_destination() != nullptr) {
     xbt_assert(is_assigned(), "When either from_ or to_ is specified, both must be.");
     xbt_assert(src_buff_ == nullptr && dst_buff_ == nullptr,
@@ -320,7 +332,7 @@ Comm* Comm::do_start()
     });
     fire_on_start();
     fire_on_this_start();
-  } else if (src_buff_ != nullptr) { // Sender side
+  } else if (myself == sender_) {
     on_send(*this);
     on_this_send(*this);
     kernel::actor::CommIsendSimcall observer{sender_,
@@ -337,7 +349,7 @@ Comm* Comm::do_start()
                                              "Isend"};
     pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::isend(&observer); },
                                              &observer);
-  } else if (dst_buff_ != nullptr) { // Receiver side
+  } else if (myself == receiver_) {
     xbt_assert(not detached_, "Receive cannot be detached");
     on_recv(*this);
     on_this_recv(*this);
@@ -498,6 +510,11 @@ size_t Comm::wait_all_for(const std::vector<CommPtr>& comms, double timeout)
 }
 } // namespace simgrid::s4u
 /* **************************** Public C interface *************************** */
+int sg_comm_isinstance(sg_activity_t acti)
+{
+  return dynamic_cast<simgrid::s4u::Comm*>(acti) != nullptr;
+}
+
 void sg_comm_detach(sg_comm_t comm, void (*clean_function)(void*))
 {
   comm->detach(clean_function);
index 4a641e2..caa7466 100644 (file)
@@ -259,6 +259,11 @@ bool Exec::is_assigned() const
 } // namespace simgrid::s4u
 
 /* **************************** Public C interface *************************** */
+int sg_exec_isinstance(sg_activity_t acti)
+{
+  return dynamic_cast<simgrid::s4u::Exec*>(acti) != nullptr;
+}
+
 void sg_exec_set_bound(sg_exec_t exec, double bound)
 {
   exec->set_bound(bound);
index 6ce4a03..76613fb 100644 (file)
@@ -127,6 +127,13 @@ CommPtr Mailbox::get_init()
   return res;
 }
 
+CommPtr Mailbox::get_async()
+{
+  CommPtr res = get_init()->set_dst_data(nullptr, sizeof(void*));
+  res->start();
+  return res;
+}
+
 kernel::activity::ActivityImplPtr
 Mailbox::iprobe(int type, const std::function<bool(void*, void*, kernel::activity::CommImpl*)>& match_fun, void* data)
 {
index 56d9e61..0ea50b2 100644 (file)
@@ -6,7 +6,7 @@ endforeach()
 
 foreach(x actor actor-autorestart actor-suspend
         activity-lifecycle
-        comm-get-sender comm-pt2pt comm-fault-scenarios wait-all-for wait-any-for
+        comm-get-sender comm-pt2pt comm-fault-scenarios wait-all-for
         cloud-interrupt-migration cloud-two-execs
        monkey-masterworkers monkey-semaphore
         concurrent_rw
index 96da677..5456284 100644 (file)
@@ -649,6 +649,7 @@ set(MC_SIMGRID_MC_SRC  src/mc/explo/simgrid_mc.cpp)
 
 set(headers_to_install
   include/simgrid/actor.h
+  include/simgrid/activityset.h
   include/simgrid/barrier.h
   include/simgrid/comm.h
   include/simgrid/engine.h