include examples/c/app-pingpong/app-pingpong_d.xml
include examples/c/app-token-ring/app-token-ring.c
include examples/c/app-token-ring/app-token-ring.tesh
+include examples/c/async-waitall/async-waitall.c
+include examples/c/async-waitall/async-waitall.tesh
+include examples/c/async-waitall/async-waitall_d.xml
include examples/c/async-waitany/async-waitany.c
include examples/c/async-waitany/async-waitany.tesh
include examples/c/async-waitany/async-waitany_d.xml
include teshsuite/msg/async-wait/async-wait3_d.xml
include teshsuite/msg/async-wait/async-wait4_d.xml
include teshsuite/msg/async-wait/async-wait_d.xml
-include teshsuite/msg/async-waitall/async-waitall.c
-include teshsuite/msg/async-waitall/async-waitall.tesh
-include teshsuite/msg/async-waitall/async-waitall_d.xml
include teshsuite/msg/async-waitany/async-waitany.c
include teshsuite/msg/async-waitany/async-waitany.tesh
include teshsuite/msg/async-waitany/async-waitany_d.xml
foreach(x
actor-create actor-daemon actor-exiting actor-join actor-kill actor-migrate actor-suspend actor-yield
- app-pingpong app-token-ring async-waitany io-disk-raw)
+ app-pingpong app-token-ring
+ async-waitall async-waitany
+ io-disk-raw)
add_executable (${x}-c EXCLUDE_FROM_ALL ${x}/${x}.c)
target_link_libraries(${x}-c simgrid)
set_target_properties(${x}-c PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/${x})
set(xml_files ${xml_files} ${CMAKE_CURRENT_SOURCE_DIR}/actor-create/actor-create_d.xml
${CMAKE_CURRENT_SOURCE_DIR}/actor-yield/actor-yield_d.xml
${CMAKE_CURRENT_SOURCE_DIR}/app-pingpong/app-pingpong_d.xml
+ ${CMAKE_CURRENT_SOURCE_DIR}/async-waitall/async-waitall_d.xml
${CMAKE_CURRENT_SOURCE_DIR}/async-waitany/async-waitany_d.xml
PARENT_SCOPE)
foreach(x
actor-create actor-daemon actor-exiting actor-join actor-kill actor-migrate actor-suspend actor-yield
- app-pingpong app-token-ring async-waitany io-disk-raw)
+ app-pingpong app-token-ring
+ async-waitall async-waitany
+ io-disk-raw)
ADD_TESH(c-${x} --setenv platfdir=${CMAKE_HOME_DIRECTORY}/examples/platforms
--setenv bindir=${CMAKE_BINARY_DIR}/examples/c/${x}
--cd ${CMAKE_HOME_DIRECTORY}/examples/c/${x}
--- /dev/null
+/* Copyright (c) 2010-2020. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/actor.h"
+#include "simgrid/comm.h"
+#include "simgrid/engine.h"
+#include "simgrid/host.h"
+#include "simgrid/mailbox.h"
+
+#include "xbt/asserts.h"
+#include "xbt/log.h"
+#include "xbt/str.h"
+
+#include <stdio.h> /* snprintf */
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(msg_async_waitall, "Messages specific for this msg example");
+
+static void sender(int argc, char* argv[])
+{
+ xbt_assert(argc == 4, "This function expects 3 parameters from the XML deployment file");
+ long messages_count = xbt_str_parse_int(argv[1], "Invalid message count: %s");
+ long message_size = xbt_str_parse_int(argv[2], "Invalid message size: %s");
+ long receivers_count = xbt_str_parse_int(argv[3], "Invalid amount of receivers: %s");
+
+ /* Array in which we store all ongoing communications */
+ sg_comm_t* pending_comms = malloc(sizeof(sg_comm_t) * (messages_count + receivers_count));
+ int pending_comms_count = 0;
+
+ /* Make an array of the mailboxes to use */
+ sg_mailbox_t* mboxes = malloc(sizeof(sg_mailbox_t) * receivers_count);
+ for (long i = 0; i < receivers_count; i++) {
+ char mailbox_name[80];
+ snprintf(mailbox_name, 79, "receiver-%ld", i);
+ sg_mailbox_t mbox = sg_mailbox_by_name(mailbox_name);
+ mboxes[i] = mbox;
+ }
+
+ /* Start dispatching all messages to receivers, in a round robin fashion */
+ for (int i = 0; i < messages_count; i++) {
+ char msg_content[80];
+ snprintf(msg_content, 79, "Message %d", i);
+ sg_mailbox_t mbox = mboxes[i % receivers_count];
+ XBT_INFO("Send '%s' to '%s'", msg_content, sg_mailbox_get_name(mbox));
+ /* Create a communication representing the ongoing communication, and store it in pending_comms */
+ pending_comms[pending_comms_count++] = sg_mailbox_put_async(mbox, xbt_strdup(msg_content), message_size);
+ }
+
+ /* Start sending messages to let the workers know that they should stop */
+ for (int i = 0; i < receivers_count; i++) {
+ XBT_INFO("Send 'finalize' to 'receiver-%d'", i);
+ char* end_msg = xbt_strdup("finalize");
+ sg_mailbox_t mbox = mboxes[i % receivers_count];
+ pending_comms[pending_comms_count++] = sg_mailbox_put_async(mbox, end_msg, 0);
+ }
+
+ XBT_INFO("Done dispatching all messages");
+
+ /* Now that all message exchanges were initiated, wait for their completion in one single call */
+ sg_comm_wait_all(pending_comms, pending_comms_count);
+
+ free(pending_comms);
+ free(mboxes);
+
+ XBT_INFO("Goodbye now!");
+}
+
+static void receiver(int argc, char* argv[])
+{
+ xbt_assert(argc == 2, "Expecting one parameter from the XML deployment file but got %d", argc);
+ int id = xbt_str_parse_int(argv[1], "ID should be numerical, not %s");
+ char mailbox_name[80];
+ snprintf(mailbox_name, 79, "receiver-%d", id);
+ sg_mailbox_t mbox = sg_mailbox_by_name(mailbox_name);
+ XBT_INFO("Wait for my first message");
+ while (1) {
+ char* received = (char*)sg_mailbox_get(mbox);
+ XBT_INFO("I got a '%s'.", received);
+ if (!strcmp(received, "finalize")) { // If it's a finalize message, we're done
+ xbt_free(received);
+ break;
+ }
+ xbt_free(received);
+ }
+}
+
+int main(int argc, char* argv[])
+{
+ simgrid_init(&argc, argv);
+ xbt_assert(argc > 2,
+ "Usage: %s platform_file deployment_file\n"
+ "\tExample: %s msg_platform.xml msg_deployment.xml\n",
+ argv[0], argv[0]);
+
+ simgrid_load_platform(argv[1]);
+
+ simgrid_register_function("sender", sender);
+ simgrid_register_function("receiver", receiver);
+ simgrid_load_deployment(argv[2]);
+
+ simgrid_run();
+
+ return 0;
+}
--- /dev/null
+#!/usr/bin/env tesh
+
+! output sort 19
+$ ${bindir:=.}/async-waitall-c ${platfdir:=.}/small_platform_fatpipe.xml ${srcdir:=.}/async-waitall_d.xml "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n"
+> [ 0.000000] (1:sender@Tremblay) Send 'Message 0' to 'receiver-0'
+> [ 0.000000] (2:receiver@Ruby) Wait for my first message
+> [ 0.000000] (3:receiver@Perl) Wait for my first message
+> [ 0.000000] (1:sender@Tremblay) Send 'Message 1' to 'receiver-1'
+> [ 0.000000] (1:sender@Tremblay) Send 'Message 2' to 'receiver-0'
+> [ 0.000000] (1:sender@Tremblay) Send 'Message 3' to 'receiver-1'
+> [ 0.000000] (1:sender@Tremblay) Send 'Message 4' to 'receiver-0'
+> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-0'
+> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-1'
+> [ 0.000000] (1:sender@Tremblay) Done dispatching all messages
+> [ 0.004022] (2:receiver@Ruby) I got a 'Message 0'.
+> [ 0.004022] (3:receiver@Perl) I got a 'Message 1'.
+> [ 0.008043] (2:receiver@Ruby) I got a 'Message 2'.
+> [ 0.008043] (3:receiver@Perl) I got a 'Message 3'.
+> [ 0.009995] (3:receiver@Perl) I got a 'finalize'.
+> [ 0.012065] (2:receiver@Ruby) I got a 'Message 4'.
+> [ 0.014016] (2:receiver@Ruby) I got a 'finalize'.
+> [ 0.014016] (1:sender@Tremblay) Goodbye now!
<platform version="4.1">
<!-- The master actor (with some arguments) -->
<actor host="Tremblay" function="sender">
- <argument value="3"/> <!-- Number of tasks -->
- <argument value="50000000"/> <!-- Computation size of tasks -->
- <argument value="1000000"/> <!-- Communication size of tasks -->
- <argument value="1"/> <!-- Number of receivers -->
+ <argument value="5"/> <!-- Number of messages -->
+ <argument value="1000000"/> <!-- Size of messages -->
+ <argument value="2"/> <!-- Number of receivers -->
</actor>
- <!-- The receiver processes -->
+ <!-- The receiver actors -->
<actor host="Ruby" function="receiver">
<argument value="0"/>
</actor>
+ <actor host="Perl" function="receiver">
+ <argument value="1"/>
+ </actor>
</platform>
static void sender(int argc, char* argv[])
{
xbt_assert(argc == 4, "Expecting 3 parameters from the XML deployment file but got %d", argc);
- long messages_count = xbt_str_parse_int(argv[1], "Invalid amount of tasks: %s");
- long msg_size = xbt_str_parse_int(argv[2], "Invalid communication size: %s");
+ long messages_count = xbt_str_parse_int(argv[1], "Invalid message count: %s");
+ long msg_size = xbt_str_parse_int(argv[2], "Invalid message size: %s");
long receivers_count = xbt_str_parse_int(argv[3], "Invalid amount of receivers: %s");
/* Array in which we store all ongoing communications */
simgrid_load_deployment(argv[2]);
simgrid_run();
-
XBT_INFO("Simulation time %g", simgrid_get_clock());
return 0;
<platform version="4.1">
<!-- The master actor (with some arguments) -->
<actor host="Tremblay" function="sender">
- <argument value="5"/> <!-- Number of messages -->
- <argument value="1000000"/> <!-- Size of messages -->
- <argument value="2"/> <!-- Number of receivers -->
+ <argument value="5"/> <!-- Number of messages -->
+ <argument value="1000000"/> <!-- Size of messages -->
+ <argument value="2"/> <!-- Number of receivers -->
</actor>
- <!-- The receiver processes. -->
+ <!-- The receiver actors. -->
<actor host="Ruby" function="receiver">
<argument value="0"/>
</actor>
/* C interface */
SG_BEGIN_DECL
+XBT_PUBLIC void sg_comm_wait_all(sg_comm_t* comms, size_t count);
XBT_PUBLIC int sg_comm_wait_any_for(sg_comm_t* comms, size_t count, double timeout);
XBT_PUBLIC int sg_comm_wait_any(sg_comm_t* comms, size_t count);
} // namespace s4u
} // namespace simgrid
/* **************************** Public C interface *************************** */
+void sg_comm_wait_all(sg_comm_t* comms, size_t count)
+{
+ std::vector<simgrid::s4u::CommPtr> s4u_comms;
+ for (unsigned int i = 0; i < count; i++)
+ s4u_comms.emplace_back(comms[i]);
+
+ simgrid::s4u::Comm::wait_all(&s4u_comms);
+ for (unsigned int i = 0; i < count; i++)
+ s4u_comms[i]->unref();
+}
+
int sg_comm_wait_any(sg_comm_t* comms, size_t count)
{
return sg_comm_wait_any_for(comms, count, -1);
}
+
int sg_comm_wait_any_for(sg_comm_t* comms, size_t count, double timeout)
{
std::vector<simgrid::s4u::CommPtr> s4u_comms;
# C examples
-foreach(x async-wait async-waitall async-waitany
+foreach(x async-wait async-waitany
cloud-capping cloud-migration cloud-two-tasks cloud-simple
get_sender host_on_off host_on_off_recv
process-lifetime
${CMAKE_CURRENT_SOURCE_DIR}/async-wait/async-wait2_d.xml
${CMAKE_CURRENT_SOURCE_DIR}/async-wait/async-wait3_d.xml
${CMAKE_CURRENT_SOURCE_DIR}/async-wait/async-wait4_d.xml
- ${CMAKE_CURRENT_SOURCE_DIR}/async-waitall/async-waitall_d.xml
${CMAKE_CURRENT_SOURCE_DIR}/async-waitany/async-waitany_d.xml
${CMAKE_CURRENT_SOURCE_DIR}/io-file-remote/io-file-remote_d.xml
${CMAKE_CURRENT_SOURCE_DIR}/platform-properties/platform-properties_d.xml
if(enable_msg)
foreach(x
- async-wait async-waitall
+ async-wait
app-bittorrent app-chainsend
cloud-capping cloud-migration cloud-two-tasks cloud-simple
energy-pstate
+++ /dev/null
-/* Copyright (c) 2010-2020. The SimGrid Team. All rights reserved. */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include "simgrid/msg.h"
-
-#include <stdio.h> /* snprintf */
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(msg_async_waitall, "Messages specific for this msg example");
-
-static int sender(int argc, char* argv[])
-{
- xbt_assert(argc == 5, "This function expects 4 parameters from the XML deployment file");
- long number_of_tasks = xbt_str_parse_int(argv[1], "Invalid amount of tasks: %s");
- double task_comp_size = xbt_str_parse_double(argv[2], "Invalid computational size: %s");
- double task_comm_size = xbt_str_parse_double(argv[3], "Invalid communication size: %s");
- long receivers_count = xbt_str_parse_int(argv[4], "Invalid amount of receivers: %s");
-
- msg_comm_t* comm = xbt_new(msg_comm_t, number_of_tasks + receivers_count);
- for (int i = 0; i < number_of_tasks; i++) {
- char mailbox[80];
- char taskname[80];
- snprintf(mailbox, 79, "receiver-%ld", i % receivers_count);
- snprintf(taskname, 79, "Task_%d", i);
- msg_task_t task = MSG_task_create(taskname, task_comp_size, task_comm_size, NULL);
- comm[i] = MSG_task_isend(task, mailbox);
- XBT_INFO("Send to receiver-%ld Task_%d", i % receivers_count, i);
- }
- for (int i = 0; i < receivers_count; i++) {
- char mailbox[80];
- snprintf(mailbox, 79, "receiver-%ld", i % receivers_count);
- msg_task_t task = MSG_task_create("finalize", 0, 0, 0);
- comm[i + number_of_tasks] = MSG_task_isend(task, mailbox);
- XBT_INFO("Send to receiver-%ld finalize", i % receivers_count);
- }
-
- /* Here we are waiting for the completion of all communications */
- MSG_comm_waitall(comm, (number_of_tasks + receivers_count), -1);
- for (int i = 0; i < number_of_tasks + receivers_count; i++)
- MSG_comm_destroy(comm[i]);
-
- XBT_INFO("Goodbye now!");
- xbt_free(comm);
- return 0;
-}
-
-static int receiver(int argc, char* argv[])
-{
- xbt_assert(argc == 2, "This function expects 1 parameter from the XML deployment file");
- int id = xbt_str_parse_int(argv[1], "Any process of this example must have a numerical name, not %s");
-
- char mailbox[80];
- snprintf(mailbox, 79, "receiver-%d", id);
-
- MSG_process_sleep(10);
- while (1) {
- XBT_INFO("Wait to receive a task");
- msg_task_t task = NULL;
- msg_comm_t comm = MSG_task_irecv(&task, mailbox);
- msg_error_t res = MSG_comm_wait(comm, -1);
- xbt_assert(res == MSG_OK, "MSG_task_get failed");
- MSG_comm_destroy(comm);
- XBT_INFO("Received \"%s\"", MSG_task_get_name(task));
- if (strcmp(MSG_task_get_name(task), "finalize") == 0) {
- MSG_task_destroy(task);
- break;
- }
-
- XBT_INFO("Processing \"%s\"", MSG_task_get_name(task));
- MSG_task_execute(task);
- XBT_INFO("\"%s\" done", MSG_task_get_name(task));
- MSG_task_destroy(task);
- }
- XBT_INFO("I'm done. See you!");
- return 0;
-}
-
-int main(int argc, char* argv[])
-{
- MSG_init(&argc, argv);
- xbt_assert(argc > 2, "Usage: %s platform_file deployment_file\n"
- "\tExample: %s msg_platform.xml msg_deployment.xml\n",
- argv[0], argv[0]);
-
- MSG_create_environment(argv[1]);
- MSG_function_register("sender", sender);
- MSG_function_register("receiver", receiver);
- MSG_launch_application(argv[2]);
-
- msg_error_t res = MSG_main();
-
- XBT_INFO("Simulation time %g", MSG_get_clock());
-
- return res != MSG_OK;
-}
+++ /dev/null
-#!/usr/bin/env tesh
-p Test1 MSG_comm_waitall() for sender
-
-! output sort 19
-$ ${bindir:=.}/async-waitall ${platfdir:=.}/small_platform_fatpipe.xml ${srcdir:=.}/async-waitall_d.xml "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n"
-> [ 0.000000] (1:sender@Tremblay) Send to receiver-0 Task_0
-> [ 0.000000] (1:sender@Tremblay) Send to receiver-0 Task_1
-> [ 0.000000] (1:sender@Tremblay) Send to receiver-0 Task_2
-> [ 0.000000] (1:sender@Tremblay) Send to receiver-0 finalize
-> [ 10.000000] (2:receiver@Ruby) Wait to receive a task
-> [ 10.004022] (2:receiver@Ruby) Received "Task_0"
-> [ 10.004022] (2:receiver@Ruby) Processing "Task_0"
-> [ 10.513732] (2:receiver@Ruby) "Task_0" done
-> [ 10.513732] (2:receiver@Ruby) Wait to receive a task
-> [ 10.517753] (2:receiver@Ruby) Received "Task_1"
-> [ 10.517753] (2:receiver@Ruby) Processing "Task_1"
-> [ 11.027463] (2:receiver@Ruby) "Task_1" done
-> [ 11.027463] (2:receiver@Ruby) Wait to receive a task
-> [ 11.031485] (2:receiver@Ruby) Received "Task_2"
-> [ 11.031485] (2:receiver@Ruby) Processing "Task_2"
-> [ 11.541195] (2:receiver@Ruby) "Task_2" done
-> [ 11.541195] (2:receiver@Ruby) Wait to receive a task
-> [ 11.543146] (0:maestro@) Simulation time 11.5431
-> [ 11.543146] (1:sender@Tremblay) Goodbye now!
-> [ 11.543146] (2:receiver@Ruby) Received "finalize"
-> [ 11.543146] (2:receiver@Ruby) I'm done. See you!