auto comm2 = mailbox2->put_async((void*)666, 2);
XBT_INFO("Calling wait_any..");
- std::vector<sg4::CommPtr> pending_comms;
- pending_comms.push_back(comm1);
- pending_comms.push_back(comm2);
+ sg4::ActivitySet pending_comms;
+ pending_comms.push(comm1);
+ pending_comms.push(comm2);
try {
- long index = sg4::Comm::wait_any(pending_comms);
- XBT_INFO("Wait any returned index %ld (comm to %s)", index, pending_comms.at(index)->get_mailbox()->get_cname());
+ auto* acti = pending_comms.wait_any().get();
+ XBT_INFO("Wait any returned comm to %s", dynamic_cast<sg4::Comm*>(acti)->get_mailbox()->get_cname());
} catch (const simgrid::NetworkFailureException&) {
XBT_INFO("Sender has experienced a network failure exception, so it knows that something went wrong");
XBT_INFO("Now it needs to figure out which of the two comms failed by looking at their state:");
XBT_INFO("Waiting on a FAILED comm raises an exception: '%s'", e.what());
}
XBT_INFO("Wait for remaining comm, just to be nice");
- pending_comms.erase(pending_comms.begin());
- sg4::Comm::wait_any(pending_comms);
+ pending_comms.wait_all();
}
};
foreach(example actor-create actor-daemon actor-join actor-kill actor-migrate actor-suspend actor-yield actor-lifetime
activityset-testany activityset-waitall activityset-waitallfor activityset-waitany
app-masterworkers
- comm-wait comm-waitany comm-failure comm-host2host comm-pingpong
+ comm-wait comm-failure comm-host2host comm-pingpong
comm-ready comm-suspend comm-throttling comm-waituntil
exec-async exec-basic exec-dvfs exec-remote exec-ptask
task-io task-simple task-switch-host task-variable-load
+++ /dev/null
-# Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.
-#
-# This program is free software; you can redistribute it and/or modify it
-# under the terms of the license (GNU LGPL) which comes with this package.
-
-"""
-This example shows how to block on the completion of a set of communications.
-
-As for the other asynchronous examples, the sender initiate all the messages it wants to send and
-pack the resulting simgrid.Comm objects in a list. All messages thus occur concurrently.
-
-The sender then loops until there is no ongoing communication. Using wait_any() ensures that the sender
-will notice events as soon as they occur even if it does not follow the order of the container.
-
-Here, finalize messages will terminate earlier because their size is 0, so they travel faster than the
-other messages of this application. As expected, the trace shows that the finalize of worker 1 is
-processed before 'Message 5' that is sent to worker 0.
-"""
-
-import sys
-from simgrid import Actor, Comm, Engine, Host, Mailbox, this_actor
-
-
-def sender(messages_count, msg_size, receivers_count):
- # List in which we store all ongoing communications
- pending_comms = []
-
- # Vector of the used mailboxes
- mboxes = [Mailbox.by_name("receiver-{:d}".format(i))
- for i in range(0, receivers_count)]
-
- # Start dispatching all messages to receivers, in a round robin fashion
- for i in range(0, messages_count):
- content = "Message {:d}".format(i)
- mbox = mboxes[i % receivers_count]
-
- this_actor.info("Send '{:s}' to '{:s}'".format(content, str(mbox)))
-
- # Create a communication representing the ongoing communication, and store it in pending_comms
- comm = mbox.put_async(content, msg_size)
- pending_comms.append(comm)
-
- # Start sending messages to let the workers know that they should stop
- for i in range(0, receivers_count):
- mbox = mboxes[i]
- this_actor.info("Send 'finalize' to '{:s}'".format(str(mbox)))
- comm = mbox.put_async("finalize", 0)
- pending_comms.append(comm)
-
- this_actor.info("Done dispatching all messages")
-
- # Now that all message exchanges were initiated, wait for their completion, in order of completion.
- #
- # This loop waits for first terminating message with wait_any() and remove it with del, until all comms are
- # terminated.
- # Even in this simple example, the pending comms do not terminate in the exact same order of creation.
- while pending_comms:
- changed_pos = Comm.wait_any(pending_comms)
- del pending_comms[changed_pos]
- if changed_pos != 0:
- this_actor.info(
- "Remove the {:d}th pending comm: it terminated earlier than another comm that was initiated first."
- .format(changed_pos))
-
- this_actor.info("Goodbye now!")
-
-
-def receiver(my_id):
- mbox = Mailbox.by_name("receiver-{:d}".format(my_id))
- this_actor.info("Wait for my first message")
- while True:
- received = mbox.get()
- this_actor.info("I got a '{:s}'.".format(received))
- if received == "finalize":
- break # If it's a finalize message, we're done.
-
-if __name__ == '__main__':
- e = Engine(sys.argv)
-
- # Load the platform description
- e.load_platform(sys.argv[1])
-
- Actor.create("sender", Host.by_name("Tremblay"), sender, 6, 1000000, 2)
- Actor.create("receiver", Host.by_name("Fafard"), receiver, 0)
- Actor.create("receiver", Host.by_name("Jupiter"), receiver, 1)
-
- e.run()
+++ /dev/null
-#!/usr/bin/env tesh
-
-p Testing Comm.wait_any()
-
-! output sort 19
-$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitany.py ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 0' to 'Mailbox(receiver-0)'
-> [ 0.000000] (2:receiver@Fafard) Wait for my first message
-> [ 0.000000] (3:receiver@Jupiter) Wait for my first message
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 1' to 'Mailbox(receiver-1)'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 2' to 'Mailbox(receiver-0)'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 3' to 'Mailbox(receiver-1)'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 4' to 'Mailbox(receiver-0)'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 5' to 'Mailbox(receiver-1)'
-> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'Mailbox(receiver-0)'
-> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'Mailbox(receiver-1)'
-> [ 0.000000] (1:sender@Tremblay) Done dispatching all messages
-> [ 0.158397] (2:receiver@Fafard) I got a 'Message 0'.
-> [ 0.169155] (3:receiver@Jupiter) I got a 'Message 1'.
-> [ 0.316794] (2:receiver@Fafard) I got a 'Message 2'.
-> [ 0.338309] (3:receiver@Jupiter) I got a 'Message 3'.
-> [ 0.475190] (2:receiver@Fafard) I got a 'Message 4'.
-> [ 0.500898] (2:receiver@Fafard) I got a 'finalize'.
-> [ 0.500898] (1:sender@Tremblay) Remove the 1th pending comm: it terminated earlier than another comm that was initiated first.
-> [ 0.507464] (3:receiver@Jupiter) I got a 'Message 5'.
-> [ 0.526478] (3:receiver@Jupiter) I got a 'finalize'.
-> [ 0.526478] (1:sender@Tremblay) Goodbye now!
std::vector<ActivityPtr> activities_; // Use vectors, not sets for better reproductibility accross architectures
std::vector<ActivityPtr> failed_activities_;
+ void handle_failed_activities();
+
public:
ActivitySet() = default;
ActivitySet(const std::vector<ActivityPtr> init) : activities_(init) {}
return ret;
}
+/* Move every activity whose kernel-side state is FAILED from activities_ to failed_activities_,
+ * marking each of them as FAILED on the way. */
+void ActivitySet::handle_failed_activities()
+{
+  size_t pos = 0;
+  while (pos < activities_.size()) {
+    ActivityPtr candidate = activities_[pos];
+    if (candidate->pimpl_->get_state() != kernel::activity::State::FAILED) {
+      pos++; // still healthy: keep it and look at the next slot
+      continue;
+    }
+
+    candidate->complete(Activity::State::FAILED);
+    failed_activities_.push_back(candidate);
+
+    // Swap-remove: the last element takes the freed slot and the vector shrinks by one.
+    // pos is deliberately not advanced, so the element that was just moved here gets examined too.
+    activities_[pos] = activities_.back();
+    activities_.pop_back();
+  }
+}
+
+/* Issue a blocking wait_any_for simcall over the activities of this set, with the given timeout.
+ *
+ * On success, the activity designated by the simcall is erased from the set, marked FINISHED and returned.
+ * Throws TimeoutException when the simcall reports -1.
+ * When a HostFailureException, NetworkFailureException or StorageFailureException is raised,
+ * handle_failed_activities() is called first and the very same exception is then propagated to the caller.
+ */
ActivityPtr ActivitySet::wait_any_for(double timeout)
{
std::vector<kernel::activity::ActivityImpl*> act_impls(activities_.size());
kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
kernel::actor::ActivityWaitanySimcall observer{issuer, act_impls, timeout, "wait_any_for"};
- ssize_t changed_pos = kernel::actor::simcall_blocking(
- [&observer] {
- kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),
- observer.get_timeout());
- },
- &observer);
- if (changed_pos == -1)
- throw TimeoutException(XBT_THROW_POINT, "Timeouted");
-
- auto ret = activities_.at(changed_pos);
- erase(ret);
- ret->complete(Activity::State::FINISHED);
- return ret;
+ try {
+ ssize_t changed_pos = kernel::actor::simcall_blocking(
+ [&observer] {
+ kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),
+ observer.get_timeout());
+ },
+ &observer);
+ if (changed_pos == -1)
+ throw TimeoutException(XBT_THROW_POINT, "Timeouted");
+
+ auto ret = activities_.at(changed_pos);
+ erase(ret);
+ ret->complete(Activity::State::FINISHED);
+ return ret;
+ // In each handler below, rethrow with a bare `throw;` so that the original exception object propagates
+ // unchanged: `throw e;` would copy it from the static type of the reference, slicing any derived exception.
+ } catch (const HostFailureException&) {
+ handle_failed_activities();
+ throw;
+ } catch (const NetworkFailureException&) {
+ handle_failed_activities();
+ throw;
+ } catch (const StorageFailureException&) {
+ handle_failed_activities();
+ throw;
+ }
}
ActivityPtr ActivitySet::get_failed_activity()