From 2a6c5ce52acb820ea198f6c97cfbe0060219ed4f Mon Sep 17 00:00:00 2001 From: SUTER Frederic Date: Tue, 24 Aug 2021 00:43:30 +0200 Subject: [PATCH] add a FAILED state to activities. tested on comm and exec --- MANIFEST.in | 4 + examples/cpp/CMakeLists.txt | 4 +- .../cpp/comm-failure/s4u-comm-failure.cpp | 130 ++++++++++++++++++ .../cpp/comm-failure/s4u-comm-failure.tesh | 19 +++ .../cpp/exec-failure/s4u-exec-failure.cpp | 84 +++++++++++ .../cpp/exec-failure/s4u-exec-failure.tesh | 16 +++ include/simgrid/s4u/Activity.hpp | 2 +- src/kernel/activity/CommImpl.cpp | 13 +- src/kernel/activity/ExecImpl.cpp | 2 +- src/kernel/activity/IoImpl.cpp | 1 + src/s4u/s4u_Activity.cpp | 9 ++ src/s4u/s4u_Comm.cpp | 21 ++- 12 files changed, 297 insertions(+), 8 deletions(-) create mode 100644 examples/cpp/comm-failure/s4u-comm-failure.cpp create mode 100644 examples/cpp/comm-failure/s4u-comm-failure.tesh create mode 100644 examples/cpp/exec-failure/s4u-exec-failure.cpp create mode 100644 examples/cpp/exec-failure/s4u-exec-failure.tesh diff --git a/MANIFEST.in b/MANIFEST.in index 9c2f175b9c..6fe2661890 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -173,6 +173,8 @@ include examples/cpp/clusters-multicpu/s4u-clusters-multicpu.cpp include examples/cpp/clusters-multicpu/s4u-clusters-multicpu.tesh include examples/cpp/comm-dependent/s4u-comm-dependent.cpp include examples/cpp/comm-dependent/s4u-comm-dependent.tesh +include examples/cpp/comm-failure/s4u-comm-failure.cpp +include examples/cpp/comm-failure/s4u-comm-failure.tesh include examples/cpp/comm-host2host/s4u-comm-host2host.cpp include examples/cpp/comm-host2host/s4u-comm-host2host.tesh include examples/cpp/comm-pingpong/s4u-comm-pingpong.cpp @@ -242,6 +244,8 @@ include examples/cpp/exec-dependent/s4u-exec-dependent.cpp include examples/cpp/exec-dependent/s4u-exec-dependent.tesh include examples/cpp/exec-dvfs/s4u-exec-dvfs.cpp include examples/cpp/exec-dvfs/s4u-exec-dvfs.tesh +include examples/cpp/exec-failure/s4u-exec-failure.cpp +include examples/cpp/exec-failure/s4u-exec-failure.tesh include examples/cpp/exec-ptask-multicore/s4u-exec-ptask-multicore.cpp include examples/cpp/exec-ptask-multicore/s4u-exec-ptask-multicore.tesh include examples/cpp/exec-ptask/s4u-exec-ptask.cpp diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt index 960123b028..a266bd43cf 100644 --- a/examples/cpp/CMakeLists.txt +++ b/examples/cpp/CMakeLists.txt @@ -64,13 +64,13 @@ foreach (example actor-create actor-daemon actor-exiting actor-join actor-kill actor-lifetime actor-migrate actor-suspend actor-yield actor-stacksize app-bittorrent app-chainsend app-token-ring comm-pingpong comm-ready comm-serialize comm-suspend comm-wait comm-waitany comm-waitall comm-waituntil - comm-dependent comm-host2host + comm-dependent comm-host2host comm-failure cloud-capping cloud-migration cloud-simple dht-chord dht-kademlia energy-exec energy-boot energy-link energy-vm energy-exec-ptask energy-wifi engine-filtering exec-async exec-basic exec-dvfs exec-remote exec-waitany exec-waitfor exec-dependent exec-unassigned - exec-ptask-multicore exec-cpu-nonlinear exec-cpu-factors + exec-ptask-multicore exec-cpu-nonlinear exec-cpu-factors exec-failure maestro-set mc-bugged1 mc-bugged2 mc-electric-fence mc-failing-assert network-wifi diff --git a/examples/cpp/comm-failure/s4u-comm-failure.cpp b/examples/cpp/comm-failure/s4u-comm-failure.cpp new file mode 100644 index 0000000000..cc9a1271d6 --- /dev/null +++ b/examples/cpp/comm-failure/s4u-comm-failure.cpp @@ -0,0 +1,130 @@ +/* Copyright (c) 2021. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +/* This example shows how to serialize a set of communications going through a link + * + * As for the other asynchronous examples, the sender initiates all the messages it wants to send and + * pack the resulting simgrid::s4u::CommPtr objects in a vector. + * At the same time, the receiver starts receiving all messages asynchronously. Without serialization, + * all messages would be received at the same timestamp in the receiver. + * + * However, as they will be serialized in a link of the platform, the messages arrive 2 by 2. + * + * The sender then blocks until all ongoing communication terminate, using simgrid::s4u::Comm::wait_all() + */ + +#include + +XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_comm_failure, "Messages specific for this s4u example"); +namespace sg4 = simgrid::s4u; + +class Sender { + std::string mailbox1_name; + std::string mailbox2_name; + +public: + Sender(std::string mailbox1_name, std::string mailbox2_name) + : mailbox1_name(mailbox1_name), mailbox2_name(mailbox2_name) + { + } + + void operator()() + { + auto mailbox1 = sg4::Mailbox::by_name(mailbox1_name); + auto mailbox2 = sg4::Mailbox::by_name(mailbox2_name); + + XBT_INFO("Initiating asynchronous send to %s", mailbox1->get_cname()); + auto comm1 = mailbox1->put_async((void*)666, 5); + XBT_INFO("Initiating asynchronous send to %s", mailbox2->get_cname()); + auto comm2 = mailbox2->put_async((void*)666, 2); + + XBT_INFO("Calling wait_any.."); + std::vector pending_comms; + pending_comms.push_back(comm1); + pending_comms.push_back(comm2); + long index; + try { + index = sg4::Comm::wait_any(pending_comms); + XBT_INFO("Wait any returned index %ld (comm to %s)", index, pending_comms.at(index)->get_mailbox()->get_cname()); + } catch (simgrid::NetworkFailureException& e) { + XBT_INFO("Sender has experienced a network failure exception, so it knows that something went wrong"); + XBT_INFO("Now it needs to figure out which of the two comms failed by looking at their state"); + } + + XBT_INFO("Comm to %s has state: %s", comm1->get_mailbox()->get_cname(), comm1->get_state_str()); + XBT_INFO("Comm to %s has state: %s", comm2->get_mailbox()->get_cname(), comm2->get_state_str()); + + try { + comm1->wait(); + } catch (simgrid::NetworkFailureException& e) { + XBT_INFO("Waiting on a FAILED comm raises an exception: '%s'", e.what()); + } + XBT_INFO("Wait for remaining comm, just to be nice"); + pending_comms.erase(pending_comms.begin()); + index = simgrid::s4u::Comm::wait_any(pending_comms); + } +}; + +class Receiver { + std::string mailbox_name; + +public: + explicit Receiver(std::string mailbox_name) : mailbox_name(mailbox_name) {} + + void operator()() + { + auto mailbox = sg4::Mailbox::by_name(mailbox_name); + XBT_INFO("Receiver posting a receive..."); + try { + mailbox->get(); + XBT_INFO("Receiver has received successfully!"); + } catch (simgrid::NetworkFailureException& e) { + XBT_INFO("Receiver has experience a network failure exception"); + } + } +}; + +class LinkKiller { + std::string link_name; + +public: + explicit LinkKiller(std::string link_name) : link_name(link_name) {} + + void operator()() + { + auto link_to_kill = sg4::Link::by_name(link_name); + XBT_INFO("LinkKiller sleeping 10 seconds..."); + sg4::this_actor::sleep_for(10.0); + XBT_INFO("LinkKiller turning off link %s", link_to_kill->get_cname()); + link_to_kill->turn_off(); + XBT_INFO("LinkKiller killed. exiting"); + } +}; + +int main(int argc, char** argv) +{ + + sg4::Engine engine(&argc, argv); + auto* zone = sg4::create_full_zone("AS0"); + auto* host1 = zone->create_host("Host1", "1f"); + auto* host2 = zone->create_host("Host2", "1f"); + auto* host3 = zone->create_host("Host3", "1f"); + + sg4::LinkInRoute linkto2{zone->create_link("linkto2", "1bps")->seal()}; + sg4::LinkInRoute linkto3{zone->create_link("linkto3", "1bps")->seal()}; + + zone->add_route(host1->get_netpoint(), host2->get_netpoint(), nullptr, nullptr, {linkto2}, false); + zone->add_route(host1->get_netpoint(), host3->get_netpoint(), nullptr, nullptr, {linkto3}, false); + zone->seal(); + + sg4::Actor::create("Sender", host1, Sender("mailbox2", "mailbox3")); + sg4::Actor::create("Receiver", host2, Receiver("mailbox2"))->daemonize(); + sg4::Actor::create("Receiver", host3, Receiver("mailbox3"))->daemonize(); + sg4::Actor::create("LinkKiller", host1, LinkKiller("linkto2"))->daemonize(); + + engine.run(); + + return 0; +} diff --git a/examples/cpp/comm-failure/s4u-comm-failure.tesh b/examples/cpp/comm-failure/s4u-comm-failure.tesh new file mode 100644 index 0000000000..c0b12090f4 --- /dev/null +++ b/examples/cpp/comm-failure/s4u-comm-failure.tesh @@ -0,0 +1,19 @@ +#!/usr/bin/env tesh + +$ ${bindir:=.}/s4u-comm-failure "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n" +> [ 0.000000] (4:LinkKiller@Host1) LinkKiller sleeping 10 seconds... +> [ 0.000000] (2:Receiver@Host2) Receiver posting a receive... +> [ 0.000000] (3:Receiver@Host3) Receiver posting a receive... +> [ 0.000000] (1:Sender@Host1) Initiating asynchronous send to mailbox2 +> [ 0.000000] (1:Sender@Host1) Initiating asynchronous send to mailbox3 +> [ 0.000000] (1:Sender@Host1) Calling wait_any.. +> [ 10.000000] (4:LinkKiller@Host1) LinkKiller turning off link linkto2 +> [ 10.000000] (4:LinkKiller@Host1) LinkKiller killed. exiting +> [ 10.000000] (2:Receiver@Host2) Receiver has experience a network failure exception +> [ 10.000000] (1:Sender@Host1) Sender has experienced a network failure exception, so it knows that something went wrong +> [ 10.000000] (1:Sender@Host1) Now it needs to figure out which of the two comms failed by looking at their state +> [ 10.000000] (1:Sender@Host1) Comm to mailbox2 has state: FAILED +> [ 10.000000] (1:Sender@Host1) Comm to mailbox3 has state: STARTED +> [ 10.000000] (1:Sender@Host1) Waiting on a FAILED comm raises an exception: 'Cannot wait for a failed communication' +> [ 10.000000] (1:Sender@Host1) Wait for remaining comm, just to be nice +> [ 16.494845] (3:Receiver@Host3) Receiver has received successfully! diff --git a/examples/cpp/exec-failure/s4u-exec-failure.cpp b/examples/cpp/exec-failure/s4u-exec-failure.cpp new file mode 100644 index 0000000000..f2f9b7e436 --- /dev/null +++ b/examples/cpp/exec-failure/s4u-exec-failure.cpp @@ -0,0 +1,84 @@ +/* Copyright (c) 2021. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +/* This example shows how to serialize a set of communications going through a link + * + * As for the other asynchronous examples, the sender initiates all the messages it wants to send and + * pack the resulting simgrid::s4u::CommPtr objects in a vector. + * At the same time, the receiver starts receiving all messages asynchronously. Without serialization, + * all messages would be received at the same timestamp in the receiver. + * + * However, as they will be serialized in a link of the platform, the messages arrive 2 by 2. + * + * The sender then blocks until all ongoing communication terminate, using simgrid::s4u::Comm::wait_all() + */ + +#include + +XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_exec_failure, "Messages specific for this s4u example"); +namespace sg4 = simgrid::s4u; + +static void dispatcher(sg4::Host* host1, sg4::Host* host2) +{ + std::vector pending_execs; + XBT_INFO("Initiating asynchronous exec on %s", host1->get_cname()); + auto exec1 = sg4::this_actor::exec_init(20)->set_host(host1); + pending_execs.push_back(exec1); + exec1->start(); + XBT_INFO("Initiating asynchronous exec on %s", host2->get_cname()); + auto exec2 = sg4::this_actor::exec_init(20)->set_host(host2); + pending_execs.push_back(exec2); + exec2->start(); + + XBT_INFO("Calling wait_any.."); + long index; + try { + index = sg4::Exec::wait_any(pending_execs); + XBT_INFO("Wait any returned index %ld (exec on %s)", index, pending_execs.at(index)->get_host()->get_cname()); + } catch (simgrid::HostFailureException& e) { + XBT_INFO("Dispatcher has experienced a host failure exception, so it knows that something went wrong"); + XBT_INFO("Now it needs to figure out which of the two execs failed by looking at their state"); + } + + XBT_INFO("Exec on %s has state: %s", pending_execs[0]->get_host()->get_cname(), pending_execs[0]->get_state_str()); + XBT_INFO("Exec on %s has state: %s", pending_execs[1]->get_host()->get_cname(), pending_execs[1]->get_state_str()); + + try { + pending_execs[1]->wait(); + } catch (simgrid::HostFailureException& e) { + XBT_INFO("Waiting on a FAILED exec raises an exception: '%s'", e.what()); + } + pending_execs.pop_back(); + XBT_INFO("Wait for remaining exec, just to be nice"); + index = simgrid::s4u::Exec::wait_any(pending_execs); + XBT_INFO("Dispatcher ends"); +} + +static void host_killer(sg4::Host* to_kill) +{ + XBT_INFO("HostKiller sleeping 10 seconds..."); + sg4::this_actor::sleep_for(10.0); + XBT_INFO("HostKiller turning off host %s", to_kill->get_cname()); + to_kill->turn_off(); + XBT_INFO("HostKiller ends"); +} + +int main(int argc, char** argv) +{ + + sg4::Engine engine(&argc, argv); + + auto* zone = sg4::create_full_zone("AS0"); + auto* host1 = zone->create_host("Host1", "1f"); + auto* host2 = zone->create_host("Host2", "1f"); + zone->seal(); + + sg4::Actor::create("Dispatcher", host1, dispatcher, host1, host2); + sg4::Actor::create("HostKiller", host1, host_killer, host2)->daemonize(); + + engine.run(); + + return 0; +} diff --git a/examples/cpp/exec-failure/s4u-exec-failure.tesh b/examples/cpp/exec-failure/s4u-exec-failure.tesh new file mode 100644 index 0000000000..8c37127462 --- /dev/null +++ b/examples/cpp/exec-failure/s4u-exec-failure.tesh @@ -0,0 +1,16 @@ +#!/usr/bin/env tesh + +$ ${bindir:=.}/s4u-exec-failure "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n" +> [ 0.000000] (1:Dispatcher@Host1) Initiating asynchronous exec on Host1 +> [ 0.000000] (2:HostKiller@Host1) HostKiller sleeping 10 seconds... +> [ 0.000000] (1:Dispatcher@Host1) Initiating asynchronous exec on Host2 +> [ 0.000000] (1:Dispatcher@Host1) Calling wait_any.. +> [ 10.000000] (2:HostKiller@Host1) HostKiller turning off host Host2 +> [ 10.000000] (1:Dispatcher@Host1) Dispatcher has experienced a host failure exception, so it knows that something went wrong +> [ 10.000000] (1:Dispatcher@Host1) Now it needs to figure out which of the two execs failed by looking at their state +> [ 10.000000] (1:Dispatcher@Host1) Exec on Host1 has state: STARTED +> [ 10.000000] (1:Dispatcher@Host1) Exec on Host2 has state: FAILED +> [ 10.000000] (1:Dispatcher@Host1) Waiting on a FAILED exec raises an exception: 'Cannot wait for a failed exec' +> [ 10.000000] (1:Dispatcher@Host1) Wait for remaining exec, just to be nice +> [ 10.000000] (2:HostKiller@Host1) HostKiller ends +> [ 20.000000] (1:Dispatcher@Host1) Dispatcher ends diff --git a/include/simgrid/s4u/Activity.hpp b/include/simgrid/s4u/Activity.hpp index 3e31a941ff..0b87eb52ec 100644 --- a/include/simgrid/s4u/Activity.hpp +++ b/include/simgrid/s4u/Activity.hpp @@ -34,7 +34,7 @@ class XBT_PUBLIC Activity { public: // enum class State { ... } - XBT_DECLARE_ENUM_CLASS(State, INITED, STARTING, STARTED, CANCELED, FINISHED); + XBT_DECLARE_ENUM_CLASS(State, INITED, STARTING, STARTED, FAILED, CANCELED, FINISHED); protected: Activity() = default; diff --git a/src/kernel/activity/CommImpl.cpp b/src/kernel/activity/CommImpl.cpp index a91af4480b..8785fb7bac 100644 --- a/src/kernel/activity/CommImpl.cpp +++ b/src/kernel/activity/CommImpl.cpp @@ -591,6 +591,10 @@ void CommImpl::finish() simcall->issuer_->context_->set_wannadie(); } else { switch (state_) { + case State::FAILED: + simcall->issuer_->exception_ = + std::make_exception_ptr(NetworkFailureException(XBT_THROW_POINT, "Remote peer failed")); + break; case State::SRC_TIMEOUT: simcall->issuer_->exception_ = std::make_exception_ptr( TimeoutException(XBT_THROW_POINT, "Communication timeouted because of the sender")); @@ -604,17 +608,21 @@ void CommImpl::finish() case State::SRC_HOST_FAILURE: if (simcall->issuer_ == src_actor_) simcall->issuer_->context_->set_wannadie(); - else + else { + state_ = kernel::activity::State::FAILED; simcall->issuer_->exception_ = std::make_exception_ptr(NetworkFailureException(XBT_THROW_POINT, "Remote peer failed")); + } break; case State::DST_HOST_FAILURE: if (simcall->issuer_ == dst_actor_) simcall->issuer_->context_->set_wannadie(); - else + else { + state_ = kernel::activity::State::FAILED; simcall->issuer_->exception_ = std::make_exception_ptr(NetworkFailureException(XBT_THROW_POINT, "Remote peer failed")); + } break; case State::LINK_FAILURE: @@ -630,6 +638,7 @@ void CommImpl::finish() } else { XBT_DEBUG("I'm neither source nor dest"); } + state_ = kernel::activity::State::FAILED; simcall->issuer_->throw_exception( std::make_exception_ptr(NetworkFailureException(XBT_THROW_POINT, "Link failure"))); break; diff --git a/src/kernel/activity/ExecImpl.cpp b/src/kernel/activity/ExecImpl.cpp index 617cc860fb..3b55aff1ca 100644 --- a/src/kernel/activity/ExecImpl.cpp +++ b/src/kernel/activity/ExecImpl.cpp @@ -184,6 +184,7 @@ void ExecImpl::finish() } switch (state_) { case State::FAILED: + piface_->complete(s4u::Activity::State::FAILED); if (simcall->issuer_->get_host()->is_on()) simcall->issuer_->exception_ = std::make_exception_ptr(HostFailureException(XBT_THROW_POINT, "Host failed")); else /* else, the actor will be killed with no possibility to survive */ @@ -249,7 +250,6 @@ void ExecImpl::wait_any_for(actor::ActorImpl* issuer, const std::vectorsimcalls_.push_back(&issuer->simcall_); - /* see if the synchro is already finished */ if (exec->state_ != State::WAITING && exec->state_ != State::RUNNING) { exec->finish(); diff --git a/src/kernel/activity/IoImpl.cpp b/src/kernel/activity/IoImpl.cpp index 4f692b2990..5cd665c82d 100644 --- a/src/kernel/activity/IoImpl.cpp +++ b/src/kernel/activity/IoImpl.cpp @@ -129,6 +129,7 @@ void IoImpl::finish() switch (state_) { case State::FAILED: simcall->issuer_->context_->set_wannadie(); + piface_->complete(s4u::Activity::State::FAILED); simcall->issuer_->exception_ = std::make_exception_ptr(StorageFailureException(XBT_THROW_POINT, "Storage failed")); break; diff --git a/src/s4u/s4u_Activity.cpp b/src/s4u/s4u_Activity.cpp index a174119b82..ae3846b588 100644 --- a/src/s4u/s4u_Activity.cpp +++ b/src/s4u/s4u_Activity.cpp @@ -8,6 +8,8 @@ #include "simgrid/Exception.hpp" #include "simgrid/s4u/Activity.hpp" #include "simgrid/s4u/Engine.hpp" +#include "simgrid/s4u/Exec.hpp" +#include "simgrid/s4u/Io.hpp" #include "src/kernel/activity/ActivityImpl.hpp" #include "src/kernel/actor/ActorImpl.hpp" #include "src/kernel/actor/SimcallObserver.hpp" @@ -30,6 +32,13 @@ Activity* Activity::wait_for(double timeout) if (state_ == State::INITED) vetoable_start(); + if (state_ == State::FAILED) { + if (dynamic_cast(this)) + throw HostFailureException(XBT_THROW_POINT, "Cannot wait for a failed exec"); + if (dynamic_cast(this)) + throw StorageFailureException(XBT_THROW_POINT, "Cannot wait for a failed I/O"); + } + kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self(); kernel::actor::ActivityWaitSimcall observer{issuer, pimpl_.get(), timeout}; if (kernel::actor::simcall_blocking( diff --git a/src/s4u/s4u_Comm.cpp b/src/s4u/s4u_Comm.cpp index bcf557de35..ca60e6fbfe 100644 --- a/src/s4u/s4u_Comm.cpp +++ b/src/s4u/s4u_Comm.cpp @@ -44,7 +44,17 @@ ssize_t Comm::wait_any_for(const std::vector& comms, double timeout) std::vector rcomms(comms.size()); std::transform(begin(comms), end(comms), begin(rcomms), [](const CommPtr& comm) { return static_cast(comm->pimpl_.get()); }); - ssize_t changed_pos = simcall_comm_waitany(rcomms.data(), rcomms.size(), timeout); + ssize_t changed_pos = -1; + try { + changed_pos = simcall_comm_waitany(rcomms.data(), rcomms.size(), timeout); + } catch (const NetworkFailureException& e) { + for (auto c : comms) { + if (c->pimpl_->state_ == kernel::activity::State::FAILED) { + c->complete(State::FAILED); + } + } + e.rethrow_nested(XBT_THROW_POINT, boost::core::demangle(typeid(e).name()) + " raised in kernel mode."); + } if (changed_pos != -1) comms.at(changed_pos)->complete(State::FINISHED); return changed_pos; @@ -214,6 +224,8 @@ Comm* Comm::wait_for(double timeout) switch (state_) { case State::FINISHED: break; + case State::FAILED: + throw NetworkFailureException(XBT_THROW_POINT, "Cannot wait for a failed communication"); case State::INITED: case State::STARTING: // It's not started yet. Do it in one simcall if it's a regular communication @@ -232,7 +244,12 @@ Comm* Comm::wait_for(double timeout) break; case State::STARTED: - simcall_comm_wait(get_impl(), timeout); + try { + simcall_comm_wait(get_impl(), timeout); + } catch (const NetworkFailureException& e) { + complete(State::FAILED); + e.rethrow_nested(XBT_THROW_POINT, boost::core::demangle(typeid(e).name()) + " raised in kernel mode."); + } break; case State::CANCELED: -- 2.20.1