From bb60211deea857d1f67b2613bc34f38babaeb53b Mon Sep 17 00:00:00 2001 From: SUTER Frederic Date: Tue, 27 Apr 2021 15:39:39 +0200 Subject: [PATCH] add a (modern) wait_any for Io activities. Thx agier! --- .../cpp/io-dependent/s4u-io-dependent.cpp | 13 ++++- .../cpp/io-dependent/s4u-io-dependent.tesh | 2 + include/simgrid/s4u/Io.hpp | 5 ++ src/kernel/activity/IoImpl.cpp | 55 ++++++++++++++++++- src/kernel/activity/IoImpl.hpp | 1 + src/kernel/actor/SimcallObserver.cpp | 13 +++++ src/kernel/actor/SimcallObserver.hpp | 16 ++++++ src/s4u/s4u_Io.cpp | 19 +++++++ 8 files changed, 120 insertions(+), 4 deletions(-) diff --git a/examples/cpp/io-dependent/s4u-io-dependent.cpp b/examples/cpp/io-dependent/s4u-io-dependent.cpp index 5b296f38b9..62fce80f17 100644 --- a/examples/cpp/io-dependent/s4u-io-dependent.cpp +++ b/examples/cpp/io-dependent/s4u-io-dependent.cpp @@ -11,11 +11,15 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_test, "Messages specific for this s4u example") static void test() { + std::vector pending_ios; + simgrid::s4u::ExecPtr bob_compute = simgrid::s4u::this_actor::exec_init(1e9); simgrid::s4u::IoPtr bob_write = simgrid::s4u::Host::current()->get_disks().front()->io_init(4000000, simgrid::s4u::Io::OpType::WRITE); + pending_ios.push_back(bob_write); simgrid::s4u::IoPtr carl_read = simgrid::s4u::Host::by_name("carl")->get_disks().front()->io_init(4000000, simgrid::s4u::Io::OpType::READ); + pending_ios.push_back(carl_read); simgrid::s4u::ExecPtr carl_compute = simgrid::s4u::Host::by_name("carl")->exec_init(1e9); // Name the activities (for logging purposes only) @@ -38,10 +42,13 @@ static void test() carl_read->vetoable_start(); carl_compute->vetoable_start(); - // Wait for their completion (should be replaced by a wait_any_for at some point) + // wait for the completion of all activities bob_compute->wait(); - bob_write->wait(); - carl_read->wait(); + while (not pending_ios.empty()) { + int changed_pos = simgrid::s4u::Io::wait_any(&pending_ios); + XBT_INFO("Io '%s' is complete", pending_ios[changed_pos]->get_cname()); + pending_ios.erase(pending_ios.begin() + changed_pos); + } carl_compute->wait(); } diff --git a/examples/cpp/io-dependent/s4u-io-dependent.tesh b/examples/cpp/io-dependent/s4u-io-dependent.tesh index ba01e1dbef..c9b9eccf11 100644 --- a/examples/cpp/io-dependent/s4u-io-dependent.tesh +++ b/examples/cpp/io-dependent/s4u-io-dependent.tesh @@ -5,7 +5,9 @@ $ ${bindir:=.}/s4u-io-dependent ${platfdir}/hosts_with_disks.xml --log=s4u_activ > [ 1.000000] (1:bob@bob) 'bob write' is assigned to a resource and all dependencies are solved. Let's start > [ 1.000000] (1:bob@bob) Remove a dependency from 'bob compute' on 'bob write' > [ 1.100000] (1:bob@bob) 'carl read' is assigned to a resource and all dependencies are solved. Let's start +> [ 1.100000] (1:bob@bob) Io 'bob write' is complete > [ 1.100000] (1:bob@bob) Remove a dependency from 'bob write' on 'carl read' +> [ 1.140000] (1:bob@bob) Io 'carl read' is complete > [ 1.140000] (1:bob@bob) 'carl compute' is assigned to a resource and all dependencies are solved. Let's start > [ 1.140000] (1:bob@bob) Remove a dependency from 'carl read' on 'carl compute' > [ 2.140000] (0:maestro@) Simulation time 2.14 diff --git a/include/simgrid/s4u/Io.hpp b/include/simgrid/s4u/Io.hpp index 5752373847..cb08c3d8f7 100644 --- a/include/simgrid/s4u/Io.hpp +++ b/include/simgrid/s4u/Io.hpp @@ -35,6 +35,11 @@ public: static IoPtr init(); Io* start() override; + /*! take a vector of s4u::IoPtr and return when one of them is finished. + * The return value is the rank of the first finished IoPtr. */ + static int wait_any(std::vector* ios) { return wait_any_for(ios, -1); } + /*! Same as wait_any, but with a timeout. If the timeout occurs, parameter last is returned.*/ + static int wait_any_for(std::vector* ios, double timeout); double get_remaining() const override; sg_size_t get_performed_ioops() const; diff --git a/src/kernel/activity/IoImpl.cpp b/src/kernel/activity/IoImpl.cpp index 527aa19a2a..425ae104f7 100644 --- a/src/kernel/activity/IoImpl.cpp +++ b/src/kernel/activity/IoImpl.cpp @@ -8,6 +8,7 @@ #include "simgrid/kernel/resource/Action.hpp" #include "simgrid/s4u/Host.hpp" #include "simgrid/s4u/Io.hpp" +#include "src/kernel/actor/SimcallObserver.hpp" #include "src/kernel/resource/DiskImpl.hpp" #include "src/mc/mc_replay.hpp" #include "src/simix/smx_private.hpp" @@ -94,8 +95,34 @@ void IoImpl::finish() { XBT_DEBUG("IoImpl::finish() in state %s", to_c_str(state_)); while (not simcalls_.empty()) { - const s_smx_simcall* simcall = simcalls_.front(); + smx_simcall_t simcall = simcalls_.front(); simcalls_.pop_front(); + + /* If a waitany simcall is waiting for this synchro to finish, then remove it from the other synchros in the waitany + * list. Afterwards, get the position of the actual synchro in the waitany list and return it as the result of the + * simcall */ + + if (simcall->call_ == simix::Simcall::NONE) // FIXME: maybe a better way to handle this case + continue; // if process handling comm is killed + if (auto* observer = dynamic_cast(simcall->observer_)) { // simcall is a wait_any? + const auto& ios = observer->get_ios(); + + for (auto* io : ios) { + io->unregister_simcall(simcall); + + if (simcall->timeout_cb_) { + simcall->timeout_cb_->remove(); + simcall->timeout_cb_ = nullptr; + } + } + + if (not MC_is_active() && not MC_record_replay_is_active()) { + auto element = std::find(ios.begin(), ios.end(), this); + int rank = element != ios.end() ? static_cast(std::distance(ios.begin(), element)) : -1; + observer->set_result(rank); + } + } + switch (state_) { case State::FAILED: simcall->issuer_->context_->set_wannadie(); @@ -118,6 +145,32 @@ void IoImpl::finish() } } +void IoImpl::wait_any_for(actor::ActorImpl* issuer, const std::vector& ios, double timeout) +{ + if (timeout < 0.0) { + issuer->simcall_.timeout_cb_ = nullptr; + } else { + issuer->simcall_.timeout_cb_ = simix::Timer::set(SIMIX_get_clock() + timeout, [issuer, &ios]() { + issuer->simcall_.timeout_cb_ = nullptr; + for (auto* io : ios) + io->unregister_simcall(&issuer->simcall_); + // default result (-1) is set in actor::IoWaitanySimcall + issuer->simcall_answer(); + }); + } + + for (auto* io : ios) { + /* associate this simcall to the the synchro */ + io->simcalls_.push_back(&issuer->simcall_); + + /* see if the synchro is already finished */ + if (io->state_ != State::WAITING && io->state_ != State::RUNNING) { + io->finish(); + break; + } + } +} + } // namespace activity } // namespace kernel } // namespace simgrid diff --git a/src/kernel/activity/IoImpl.hpp b/src/kernel/activity/IoImpl.hpp index fc4df57c87..758d9ec702 100644 --- a/src/kernel/activity/IoImpl.hpp +++ b/src/kernel/activity/IoImpl.hpp @@ -37,6 +37,7 @@ public: IoImpl* start(); void post() override; void finish() override; + static void wait_any_for(actor::ActorImpl* issuer, const std::vector& ios, double timeout); }; } // namespace activity } // namespace kernel diff --git a/src/kernel/actor/SimcallObserver.cpp b/src/kernel/actor/SimcallObserver.cpp index bb763ef0f8..11e9e2ce6f 100644 --- a/src/kernel/actor/SimcallObserver.cpp +++ b/src/kernel/actor/SimcallObserver.cpp @@ -132,6 +132,19 @@ std::string ExecutionWaitanySimcall::dot_label() const { return SimcallObserver::dot_label() + "Execution WAITANY"; } + +std::string IoWaitanySimcall::to_string(int times_considered) const +{ + std::string res = SimcallObserver::to_string(times_considered) + "I/O WAITANY"; + res += "(" + (timeout_ == -1.0 ? "" : std::to_string(timeout_)) + ")"; + return res; +} + +std::string IoWaitanySimcall::dot_label() const +{ + return SimcallObserver::dot_label() + "I/O WAITANY"; +} + } // namespace actor } // namespace kernel } // namespace simgrid diff --git a/src/kernel/actor/SimcallObserver.hpp b/src/kernel/actor/SimcallObserver.hpp index d386a30db1..a1a5936685 100644 --- a/src/kernel/actor/SimcallObserver.hpp +++ b/src/kernel/actor/SimcallObserver.hpp @@ -153,6 +153,22 @@ public: const std::vector& get_execs() const { return execs_; } double get_timeout() const { return timeout_; } }; + +class IoWaitanySimcall : public ResultingSimcall { + const std::vector& ios_; + const double timeout_; + +public: + IoWaitanySimcall(smx_actor_t actor, const std::vector& ios, double timeout) + : ResultingSimcall(actor, -1), ios_(ios), timeout_(timeout) + { + } + bool is_visible() const override { return false; } + std::string to_string(int times_considered) const override; + std::string dot_label() const override; + const std::vector& get_ios() const { return ios_; } + double get_timeout() const { return timeout_; } +}; } // namespace actor } // namespace kernel } // namespace simgrid diff --git a/src/s4u/s4u_Io.cpp b/src/s4u/s4u_Io.cpp index 733b481658..ef4846c583 100644 --- a/src/s4u/s4u_Io.cpp +++ b/src/s4u/s4u_Io.cpp @@ -8,6 +8,7 @@ #include "simgrid/s4u/Io.hpp" #include "src/kernel/activity/IoImpl.hpp" #include "src/kernel/actor/ActorImpl.hpp" +#include "src/kernel/actor/SimcallObserver.hpp" #include "xbt/log.h" namespace simgrid { @@ -45,6 +46,24 @@ Io* Io::start() return this; } +int Io::wait_any_for(std::vector* ios, double timeout) +{ + std::vector rios(ios->size()); + std::transform(begin(*ios), end(*ios), begin(rios), + [](const IoPtr& io) { return static_cast(io->pimpl_.get()); }); + + kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self(); + kernel::actor::IoWaitanySimcall observer{issuer, rios, timeout}; + int changed_pos = kernel::actor::simcall_blocking( + [&observer] { + kernel::activity::IoImpl::wait_any_for(observer.get_issuer(), observer.get_ios(), observer.get_timeout()); + }, + &observer); + if (changed_pos != -1) + ios->at(changed_pos)->complete(State::FINISHED); + return changed_pos; +} + IoPtr Io::set_disk(const_sg_disk_t disk) { xbt_assert(state_ == State::INITED || state_ == State::STARTING, "Cannot set disk once the Io is started"); -- 2.20.1