Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
C version of async-waitall
author Frederic Suter <frederic.suter@cc.in2p3.fr>
Tue, 11 Feb 2020 22:23:50 +0000 (23:23 +0100)
committer Frederic Suter <frederic.suter@cc.in2p3.fr>
Tue, 11 Feb 2020 22:33:55 +0000 (23:33 +0100)
12 files changed:
MANIFEST.in
examples/c/CMakeLists.txt
examples/c/async-waitall/async-waitall.c [new file with mode: 0644]
examples/c/async-waitall/async-waitall.tesh [new file with mode: 0644]
examples/c/async-waitall/async-waitall_d.xml [moved from teshsuite/msg/async-waitall/async-waitall_d.xml with 50% similarity]
examples/c/async-waitany/async-waitany.c
examples/s4u/async-waitall/s4u-async-waitall_d.xml
include/simgrid/comm.h
src/s4u/s4u_Comm.cpp
teshsuite/msg/CMakeLists.txt
teshsuite/msg/async-waitall/async-waitall.c [deleted file]
teshsuite/msg/async-waitall/async-waitall.tesh [deleted file]

index 4264bf3..27ca76f 100644 (file)
@@ -39,6 +39,9 @@ include examples/c/app-pingpong/app-pingpong.tesh
 include examples/c/app-pingpong/app-pingpong_d.xml
 include examples/c/app-token-ring/app-token-ring.c
 include examples/c/app-token-ring/app-token-ring.tesh
+include examples/c/async-waitall/async-waitall.c
+include examples/c/async-waitall/async-waitall.tesh
+include examples/c/async-waitall/async-waitall_d.xml
 include examples/c/async-waitany/async-waitany.c
 include examples/c/async-waitany/async-waitany.tesh
 include examples/c/async-waitany/async-waitany_d.xml
@@ -636,9 +639,6 @@ include teshsuite/msg/async-wait/async-wait2_d.xml
 include teshsuite/msg/async-wait/async-wait3_d.xml
 include teshsuite/msg/async-wait/async-wait4_d.xml
 include teshsuite/msg/async-wait/async-wait_d.xml
-include teshsuite/msg/async-waitall/async-waitall.c
-include teshsuite/msg/async-waitall/async-waitall.tesh
-include teshsuite/msg/async-waitall/async-waitall_d.xml
 include teshsuite/msg/async-waitany/async-waitany.c
 include teshsuite/msg/async-waitany/async-waitany.tesh
 include teshsuite/msg/async-waitany/async-waitany_d.xml
index 4d0ea2d..871a0bf 100644 (file)
@@ -1,6 +1,8 @@
 foreach(x
         actor-create actor-daemon actor-exiting actor-join actor-kill actor-migrate actor-suspend actor-yield
-        app-pingpong app-token-ring async-waitany io-disk-raw)
+        app-pingpong app-token-ring 
+        async-waitall async-waitany 
+        io-disk-raw)
   add_executable       (${x}-c EXCLUDE_FROM_ALL ${x}/${x}.c)
   target_link_libraries(${x}-c simgrid)
   set_target_properties(${x}-c PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/${x})
@@ -16,12 +18,15 @@ set(tesh_files    ${tesh_files}     PARENT_SCOPE)
 set(xml_files     ${xml_files} ${CMAKE_CURRENT_SOURCE_DIR}/actor-create/actor-create_d.xml
                                ${CMAKE_CURRENT_SOURCE_DIR}/actor-yield/actor-yield_d.xml
                                ${CMAKE_CURRENT_SOURCE_DIR}/app-pingpong/app-pingpong_d.xml
+                               ${CMAKE_CURRENT_SOURCE_DIR}/async-waitall/async-waitall_d.xml
                                ${CMAKE_CURRENT_SOURCE_DIR}/async-waitany/async-waitany_d.xml
                                PARENT_SCOPE)
 
 foreach(x
         actor-create actor-daemon actor-exiting actor-join actor-kill actor-migrate actor-suspend actor-yield
-        app-pingpong app-token-ring async-waitany io-disk-raw)
+        app-pingpong app-token-ring 
+        async-waitall async-waitany 
+        io-disk-raw)
   ADD_TESH(c-${x} --setenv platfdir=${CMAKE_HOME_DIRECTORY}/examples/platforms
                   --setenv bindir=${CMAKE_BINARY_DIR}/examples/c/${x}
                   --cd ${CMAKE_HOME_DIRECTORY}/examples/c/${x}
diff --git a/examples/c/async-waitall/async-waitall.c b/examples/c/async-waitall/async-waitall.c
new file mode 100644 (file)
index 0000000..bd58ed3
--- /dev/null
@@ -0,0 +1,105 @@
+/* Copyright (c) 2010-2020. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/actor.h"
+#include "simgrid/comm.h"
+#include "simgrid/engine.h"
+#include "simgrid/host.h"
+#include "simgrid/mailbox.h"
+
+#include "xbt/asserts.h"
+#include "xbt/log.h"
+#include "xbt/str.h"
+
+#include <stdio.h> /* snprintf */
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(msg_async_waitall, "Messages specific for this msg example");
+
+static void sender(int argc, char* argv[])
+{
+  xbt_assert(argc == 4, "This function expects 3 parameters from the XML deployment file");
+  long messages_count  = xbt_str_parse_int(argv[1], "Invalid message count: %s");
+  long message_size    = xbt_str_parse_int(argv[2], "Invalid message size: %s");
+  long receivers_count = xbt_str_parse_int(argv[3], "Invalid amount of receivers: %s");
+
+  /* Array in which we store all ongoing communications */
+  sg_comm_t* pending_comms = malloc(sizeof(sg_comm_t) * (messages_count + receivers_count));
+  int pending_comms_count  = 0;
+
+  /* Make an array of the mailboxes to use */
+  sg_mailbox_t* mboxes = malloc(sizeof(sg_mailbox_t) * receivers_count);
+  for (long i = 0; i < receivers_count; i++) {
+    char mailbox_name[80];
+    snprintf(mailbox_name, 79, "receiver-%ld", i);
+    sg_mailbox_t mbox = sg_mailbox_by_name(mailbox_name);
+    mboxes[i]         = mbox;
+  }
+
+  /* Start dispatching all messages to receivers, in a round robin fashion */
+  for (int i = 0; i < messages_count; i++) {
+    char msg_content[80];
+    snprintf(msg_content, 79, "Message %d", i);
+    sg_mailbox_t mbox = mboxes[i % receivers_count];
+    XBT_INFO("Send '%s' to '%s'", msg_content, sg_mailbox_get_name(mbox));
+    /* Create a communication representing the ongoing communication, and store it in pending_comms */
+    pending_comms[pending_comms_count++] = sg_mailbox_put_async(mbox, xbt_strdup(msg_content), message_size);
+  }
+
+  /* Start sending messages to let the workers know that they should stop */
+  for (int i = 0; i < receivers_count; i++) {
+    XBT_INFO("Send 'finalize' to 'receiver-%d'", i);
+    char* end_msg                        = xbt_strdup("finalize");
+    sg_mailbox_t mbox                    = mboxes[i % receivers_count];
+    pending_comms[pending_comms_count++] = sg_mailbox_put_async(mbox, end_msg, 0);
+  }
+
+  XBT_INFO("Done dispatching all messages");
+
+  /* Now that all message exchanges were initiated, wait for their completion in one single call */
+  sg_comm_wait_all(pending_comms, pending_comms_count);
+
+  free(pending_comms);
+  free(mboxes);
+
+  XBT_INFO("Goodbye now!");
+}
+
+static void receiver(int argc, char* argv[])
+{
+  xbt_assert(argc == 2, "Expecting one parameter from the XML deployment file but got %d", argc);
+  int id = xbt_str_parse_int(argv[1], "ID should be numerical, not %s");
+  char mailbox_name[80];
+  snprintf(mailbox_name, 79, "receiver-%d", id);
+  sg_mailbox_t mbox = sg_mailbox_by_name(mailbox_name);
+  XBT_INFO("Wait for my first message");
+  while (1) {
+    char* received = (char*)sg_mailbox_get(mbox);
+    XBT_INFO("I got a '%s'.", received);
+    if (!strcmp(received, "finalize")) { // If it's a finalize message, we're done
+      xbt_free(received);
+      break;
+    }
+    xbt_free(received);
+  }
+}
+
+int main(int argc, char* argv[])
+{
+  simgrid_init(&argc, argv);
+  xbt_assert(argc > 2,
+             "Usage: %s platform_file deployment_file\n"
+             "\tExample: %s msg_platform.xml msg_deployment.xml\n",
+             argv[0], argv[0]);
+
+  simgrid_load_platform(argv[1]);
+
+  simgrid_register_function("sender", sender);
+  simgrid_register_function("receiver", receiver);
+  simgrid_load_deployment(argv[2]);
+
+  simgrid_run();
+
+  return 0;
+}
diff --git a/examples/c/async-waitall/async-waitall.tesh b/examples/c/async-waitall/async-waitall.tesh
new file mode 100644 (file)
index 0000000..74a300b
--- /dev/null
@@ -0,0 +1,22 @@
+#!/usr/bin/env tesh
+
+! output sort 19
+$ ${bindir:=.}/async-waitall-c ${platfdir:=.}/small_platform_fatpipe.xml ${srcdir:=.}/async-waitall_d.xml "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n"
+> [  0.000000] (1:sender@Tremblay) Send 'Message 0' to 'receiver-0'
+> [  0.000000] (2:receiver@Ruby) Wait for my first message
+> [  0.000000] (3:receiver@Perl) Wait for my first message
+> [  0.000000] (1:sender@Tremblay) Send 'Message 1' to 'receiver-1'
+> [  0.000000] (1:sender@Tremblay) Send 'Message 2' to 'receiver-0'
+> [  0.000000] (1:sender@Tremblay) Send 'Message 3' to 'receiver-1'
+> [  0.000000] (1:sender@Tremblay) Send 'Message 4' to 'receiver-0'
+> [  0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-0'
+> [  0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-1'
+> [  0.000000] (1:sender@Tremblay) Done dispatching all messages
+> [  0.004022] (2:receiver@Ruby) I got a 'Message 0'.
+> [  0.004022] (3:receiver@Perl) I got a 'Message 1'.
+> [  0.008043] (2:receiver@Ruby) I got a 'Message 2'.
+> [  0.008043] (3:receiver@Perl) I got a 'Message 3'.
+> [  0.009995] (3:receiver@Perl) I got a 'finalize'.
+> [  0.012065] (2:receiver@Ruby) I got a 'Message 4'.
+> [  0.014016] (2:receiver@Ruby) I got a 'finalize'.
+> [  0.014016] (1:sender@Tremblay) Goodbye now!
@@ -3,13 +3,15 @@
 <platform version="4.1">
   <!-- The master actor (with some arguments) -->
   <actor host="Tremblay" function="sender">
-    <argument value="3"/>       <!-- Number of tasks -->
-    <argument value="50000000"/>  <!-- Computation size of tasks -->
-    <argument value="1000000"/>   <!-- Communication size of tasks -->
-    <argument value="1"/>         <!-- Number of receivers -->
+    <argument value="5"/>       <!-- Number of messages -->
+    <argument value="1000000"/> <!-- Size of messages -->
+    <argument value="2"/>       <!-- Number of receivers -->
   </actor>
-  <!-- The receiver processes -->
+  <!-- The receiver actors -->
   <actor host="Ruby" function="receiver">
     <argument value="0"/>
   </actor>
+  <actor host="Perl" function="receiver">
+    <argument value="1"/>
+  </actor>
 </platform>
index 3871f00..d5c0644 100644 (file)
@@ -19,8 +19,8 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(async_waitany, "Messages specific for this example"
 static void sender(int argc, char* argv[])
 {
   xbt_assert(argc == 4, "Expecting 3 parameters from the XML deployment file but got %d", argc);
-  long messages_count  = xbt_str_parse_int(argv[1], "Invalid amount of tasks: %s");
-  long msg_size        = xbt_str_parse_int(argv[2], "Invalid communication size: %s");
+  long messages_count  = xbt_str_parse_int(argv[1], "Invalid message count: %s");
+  long msg_size        = xbt_str_parse_int(argv[2], "Invalid message size: %s");
   long receivers_count = xbt_str_parse_int(argv[3], "Invalid amount of receivers: %s");
 
   /* Array in which we store all ongoing communications */
@@ -115,7 +115,6 @@ int main(int argc, char* argv[])
   simgrid_load_deployment(argv[2]);
 
   simgrid_run();
-
   XBT_INFO("Simulation time %g", simgrid_get_clock());
 
   return 0;
index 8e51eff..65611cc 100644 (file)
@@ -3,11 +3,11 @@
 <platform version="4.1">
   <!-- The master actor (with some arguments) -->
   <actor host="Tremblay" function="sender">
-    <argument value="5"/>         <!-- Number of messages -->
-    <argument value="1000000"/>   <!-- Size of messages -->
-    <argument value="2"/>         <!-- Number of receivers -->
+    <argument value="5"/>       <!-- Number of messages -->
+    <argument value="1000000"/> <!-- Size of messages -->
+    <argument value="2"/>       <!-- Number of receivers -->
   </actor>
-  <!-- The receiver processes. -->
+  <!-- The receiver actors. -->
   <actor host="Ruby" function="receiver">
     <argument value="0"/>
   </actor>
index e3b3cd1..3742c42 100644 (file)
@@ -12,6 +12,7 @@
 /* C interface */
 SG_BEGIN_DECL
 
+XBT_PUBLIC void sg_comm_wait_all(sg_comm_t* comms, size_t count);
 XBT_PUBLIC int sg_comm_wait_any_for(sg_comm_t* comms, size_t count, double timeout);
 XBT_PUBLIC int sg_comm_wait_any(sg_comm_t* comms, size_t count);
 
index 14501e9..33c613e 100644 (file)
@@ -255,10 +255,22 @@ Actor* Comm::get_sender() const
 } // namespace s4u
 } // namespace simgrid
 /* **************************** Public C interface *************************** */
+void sg_comm_wait_all(sg_comm_t* comms, size_t count)
+{
+  std::vector<simgrid::s4u::CommPtr> s4u_comms;
+  for (unsigned int i = 0; i < count; i++)
+    s4u_comms.emplace_back(comms[i]);
+
+  simgrid::s4u::Comm::wait_all(&s4u_comms);
+  for (unsigned int i = 0; i < count; i++)
+    s4u_comms[i]->unref();
+}
+
 int sg_comm_wait_any(sg_comm_t* comms, size_t count)
 {
   return sg_comm_wait_any_for(comms, count, -1);
 }
+
 int sg_comm_wait_any_for(sg_comm_t* comms, size_t count, double timeout)
 {
   std::vector<simgrid::s4u::CommPtr> s4u_comms;
index 9f7e48b..d083580 100644 (file)
@@ -1,5 +1,5 @@
 # C examples
-foreach(x async-wait async-waitall async-waitany
+foreach(x async-wait async-waitany
           cloud-capping cloud-migration cloud-two-tasks cloud-simple
           get_sender host_on_off host_on_off_recv
           process-lifetime
@@ -64,7 +64,6 @@ set(xml_files     ${xml_files}     ${CMAKE_CURRENT_SOURCE_DIR}/app-bittorrent/ap
                                    ${CMAKE_CURRENT_SOURCE_DIR}/async-wait/async-wait2_d.xml
                                    ${CMAKE_CURRENT_SOURCE_DIR}/async-wait/async-wait3_d.xml
                                    ${CMAKE_CURRENT_SOURCE_DIR}/async-wait/async-wait4_d.xml
-                                   ${CMAKE_CURRENT_SOURCE_DIR}/async-waitall/async-waitall_d.xml
                                    ${CMAKE_CURRENT_SOURCE_DIR}/async-waitany/async-waitany_d.xml
                                    ${CMAKE_CURRENT_SOURCE_DIR}/io-file-remote/io-file-remote_d.xml
                                    ${CMAKE_CURRENT_SOURCE_DIR}/platform-properties/platform-properties_d.xml
@@ -88,7 +87,7 @@ set(xml_files     ${xml_files}     ${CMAKE_CURRENT_SOURCE_DIR}/app-bittorrent/ap
 
 if(enable_msg)
   foreach(x 
-    async-wait async-waitall
+    async-wait
     app-bittorrent app-chainsend
     cloud-capping cloud-migration cloud-two-tasks cloud-simple
     energy-pstate
diff --git a/teshsuite/msg/async-waitall/async-waitall.c b/teshsuite/msg/async-waitall/async-waitall.c
deleted file mode 100644 (file)
index ef2c868..0000000
+++ /dev/null
@@ -1,96 +0,0 @@
-/* Copyright (c) 2010-2020. The SimGrid Team. All rights reserved.          */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include "simgrid/msg.h"
-
-#include <stdio.h> /* snprintf */
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(msg_async_waitall, "Messages specific for this msg example");
-
-static int sender(int argc, char* argv[])
-{
-  xbt_assert(argc == 5, "This function expects 4 parameters from the XML deployment file");
-  long number_of_tasks  = xbt_str_parse_int(argv[1], "Invalid amount of tasks: %s");
-  double task_comp_size = xbt_str_parse_double(argv[2], "Invalid computational size: %s");
-  double task_comm_size = xbt_str_parse_double(argv[3], "Invalid communication size: %s");
-  long receivers_count  = xbt_str_parse_int(argv[4], "Invalid amount of receivers: %s");
-
-  msg_comm_t* comm = xbt_new(msg_comm_t, number_of_tasks + receivers_count);
-  for (int i = 0; i < number_of_tasks; i++) {
-    char mailbox[80];
-    char taskname[80];
-    snprintf(mailbox, 79, "receiver-%ld", i % receivers_count);
-    snprintf(taskname, 79, "Task_%d", i);
-    msg_task_t task = MSG_task_create(taskname, task_comp_size, task_comm_size, NULL);
-    comm[i]         = MSG_task_isend(task, mailbox);
-    XBT_INFO("Send to receiver-%ld Task_%d", i % receivers_count, i);
-  }
-  for (int i = 0; i < receivers_count; i++) {
-    char mailbox[80];
-    snprintf(mailbox, 79, "receiver-%ld", i % receivers_count);
-    msg_task_t task           = MSG_task_create("finalize", 0, 0, 0);
-    comm[i + number_of_tasks] = MSG_task_isend(task, mailbox);
-    XBT_INFO("Send to receiver-%ld finalize", i % receivers_count);
-  }
-
-  /* Here we are waiting for the completion of all communications */
-  MSG_comm_waitall(comm, (number_of_tasks + receivers_count), -1);
-  for (int i = 0; i < number_of_tasks + receivers_count; i++)
-    MSG_comm_destroy(comm[i]);
-
-  XBT_INFO("Goodbye now!");
-  xbt_free(comm);
-  return 0;
-}
-
-static int receiver(int argc, char* argv[])
-{
-  xbt_assert(argc == 2, "This function expects 1 parameter from the XML deployment file");
-  int id = xbt_str_parse_int(argv[1], "Any process of this example must have a numerical name, not %s");
-
-  char mailbox[80];
-  snprintf(mailbox, 79, "receiver-%d", id);
-
-  MSG_process_sleep(10);
-  while (1) {
-    XBT_INFO("Wait to receive a task");
-    msg_task_t task = NULL;
-    msg_comm_t comm = MSG_task_irecv(&task, mailbox);
-    msg_error_t res = MSG_comm_wait(comm, -1);
-    xbt_assert(res == MSG_OK, "MSG_task_get failed");
-    MSG_comm_destroy(comm);
-    XBT_INFO("Received \"%s\"", MSG_task_get_name(task));
-    if (strcmp(MSG_task_get_name(task), "finalize") == 0) {
-      MSG_task_destroy(task);
-      break;
-    }
-
-    XBT_INFO("Processing \"%s\"", MSG_task_get_name(task));
-    MSG_task_execute(task);
-    XBT_INFO("\"%s\" done", MSG_task_get_name(task));
-    MSG_task_destroy(task);
-  }
-  XBT_INFO("I'm done. See you!");
-  return 0;
-}
-
-int main(int argc, char* argv[])
-{
-  MSG_init(&argc, argv);
-  xbt_assert(argc > 2, "Usage: %s platform_file deployment_file\n"
-                       "\tExample: %s msg_platform.xml msg_deployment.xml\n",
-             argv[0], argv[0]);
-
-  MSG_create_environment(argv[1]);
-  MSG_function_register("sender", sender);
-  MSG_function_register("receiver", receiver);
-  MSG_launch_application(argv[2]);
-
-  msg_error_t res = MSG_main();
-
-  XBT_INFO("Simulation time %g", MSG_get_clock());
-
-  return res != MSG_OK;
-}
diff --git a/teshsuite/msg/async-waitall/async-waitall.tesh b/teshsuite/msg/async-waitall/async-waitall.tesh
deleted file mode 100644 (file)
index b815ef3..0000000
+++ /dev/null
@@ -1,26 +0,0 @@
-#!/usr/bin/env tesh
-p Test1 MSG_comm_waitall() for sender
-
-! output sort 19
-$ ${bindir:=.}/async-waitall ${platfdir:=.}/small_platform_fatpipe.xml ${srcdir:=.}/async-waitall_d.xml "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n"
-> [  0.000000] (1:sender@Tremblay) Send to receiver-0 Task_0
-> [  0.000000] (1:sender@Tremblay) Send to receiver-0 Task_1
-> [  0.000000] (1:sender@Tremblay) Send to receiver-0 Task_2
-> [  0.000000] (1:sender@Tremblay) Send to receiver-0 finalize
-> [ 10.000000] (2:receiver@Ruby) Wait to receive a task
-> [ 10.004022] (2:receiver@Ruby) Received "Task_0"
-> [ 10.004022] (2:receiver@Ruby) Processing "Task_0"
-> [ 10.513732] (2:receiver@Ruby) "Task_0" done
-> [ 10.513732] (2:receiver@Ruby) Wait to receive a task
-> [ 10.517753] (2:receiver@Ruby) Received "Task_1"
-> [ 10.517753] (2:receiver@Ruby) Processing "Task_1"
-> [ 11.027463] (2:receiver@Ruby) "Task_1" done
-> [ 11.027463] (2:receiver@Ruby) Wait to receive a task
-> [ 11.031485] (2:receiver@Ruby) Received "Task_2"
-> [ 11.031485] (2:receiver@Ruby) Processing "Task_2"
-> [ 11.541195] (2:receiver@Ruby) "Task_2" done
-> [ 11.541195] (2:receiver@Ruby) Wait to receive a task
-> [ 11.543146] (0:maestro@) Simulation time 11.5431
-> [ 11.543146] (1:sender@Tremblay) Goodbye now!
-> [ 11.543146] (2:receiver@Ruby) Received "finalize"
-> [ 11.543146] (2:receiver@Ruby) I'm done. See you!