From f84705b633d5ebb1f5e71eaeac8204014285f225 Mon Sep 17 00:00:00 2001 From: Martin Quinson Date: Fri, 25 Feb 2022 01:56:04 +0100 Subject: [PATCH] Implement SemaphoreImpl::acquire_async (timeouts are allowed) --- .../c/synchro-semaphore/synchro-semaphore.c | 14 +-- .../synchro-semaphore/synchro-semaphore.tesh | 75 ++++++++-------- include/simgrid/forward.h | 2 + src/kernel/activity/ActivityImpl.cpp | 2 +- src/kernel/activity/MutexImpl.cpp | 13 ++- src/kernel/activity/MutexImpl.hpp | 3 +- src/kernel/activity/SemaphoreImpl.cpp | 90 +++++++++++++++---- src/kernel/activity/SemaphoreImpl.hpp | 33 ++++++- src/s4u/s4u_Activity.cpp | 1 + src/s4u/s4u_Semaphore.cpp | 4 +- 10 files changed, 165 insertions(+), 72 deletions(-) diff --git a/examples/c/synchro-semaphore/synchro-semaphore.c b/examples/c/synchro-semaphore/synchro-semaphore.c index 022f7a7309..c7124a40f2 100644 --- a/examples/c/synchro-semaphore/synchro-semaphore.c +++ b/examples/c/synchro-semaphore/synchro-semaphore.c @@ -23,22 +23,24 @@ static void peer(int argc, char* argv[]) double wait_time = xbt_str_parse_double(argv[i], "Invalid wait time"); i++; sg_actor_sleep_for(wait_time); - XBT_INFO("Trying to acquire %d (%sblocking)", i, sg_sem_would_block(sem) ? "" : "not "); // cover the two cases: with and without timeout if (i > 1) { - while (sg_sem_acquire_timeout(sem, 3.0)) - XBT_INFO("Timeout.. Try again %d", i); + XBT_INFO("Trying for 1 sec to acquire #%d (that is %sfree)", i, sg_sem_would_block(sem) ? "not " : ""); + while (sg_sem_acquire_timeout(sem, 1.)) { + XBT_INFO("Timeout.. Try #%d for another second.", i); + } } else { + XBT_INFO("Acquire #%d (that is %sfree)", i, sg_sem_would_block(sem) ? "not " : ""); sg_sem_acquire(sem); } - XBT_INFO("Acquired %d", i); + XBT_INFO("Acquired #%d", i); wait_time = xbt_str_parse_double(argv[i], "Invalid wait time"); i++; sg_actor_sleep_for(wait_time); - XBT_INFO("Releasing %d", i); + XBT_INFO("Releasing #%d", i); sg_sem_release(sem); - XBT_INFO("Released %d", i); + XBT_INFO("Released #%d", i); } sg_actor_sleep_for(50); XBT_INFO("Done"); diff --git a/examples/c/synchro-semaphore/synchro-semaphore.tesh b/examples/c/synchro-semaphore/synchro-semaphore.tesh index 42b97b3d85..9739ebcf72 100644 --- a/examples/c/synchro-semaphore/synchro-semaphore.tesh +++ b/examples/c/synchro-semaphore/synchro-semaphore.tesh @@ -2,43 +2,48 @@ p Testing synchronization with semaphores -! output sort 19 $ ${bindir:=.}/c-synchro-semaphore ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n" > [ 0.000000] (0:maestro@) Semaphore initialized with capacity = 1 -> [ 0.000000] (1:Alice@Fafard) Trying to acquire 1 (not blocking) -> [ 0.000000] (1:Alice@Fafard) Acquired 1 -> [ 0.900000] (2:Bob@Fafard) Trying to acquire 1 (blocking) -> [ 1.000000] (1:Alice@Fafard) Releasing 2 -> [ 1.000000] (2:Bob@Fafard) Acquired 1 -> [ 1.000000] (1:Alice@Fafard) Released 2 -> [ 2.000000] (2:Bob@Fafard) Releasing 2 -> [ 2.000000] (2:Bob@Fafard) Released 2 -> [ 3.000000] (2:Bob@Fafard) Trying to acquire 3 (not blocking) -> [ 3.000000] (2:Bob@Fafard) Acquired 3 -> [ 4.000000] (1:Alice@Fafard) Trying to acquire 3 (blocking) -> [ 5.000000] (2:Bob@Fafard) Releasing 4 -> [ 5.000000] (1:Alice@Fafard) Acquired 3 -> [ 5.000000] (2:Bob@Fafard) Released 4 -> [ 7.000000] (2:Bob@Fafard) Trying to acquire 5 (blocking) -> [ 10.000000] (1:Alice@Fafard) Releasing 4 -> [ 10.000000] (2:Bob@Fafard) Timeout.. Try again 5 -> [ 10.000000] (2:Bob@Fafard) Acquired 5 -> [ 10.000000] (1:Alice@Fafard) Released 4 -> [ 10.000000] (2:Bob@Fafard) Releasing 6 -> [ 10.000000] (2:Bob@Fafard) Released 6 -> [ 10.000000] (2:Bob@Fafard) Trying to acquire 7 (not blocking) -> [ 10.000000] (2:Bob@Fafard) Acquired 7 -> [ 11.000000] (1:Alice@Fafard) Trying to acquire 5 (blocking) -> [ 14.000000] (1:Alice@Fafard) Timeout.. Try again 5 -> [ 15.000000] (2:Bob@Fafard) Releasing 8 -> [ 15.000000] (1:Alice@Fafard) Acquired 5 -> [ 15.000000] (2:Bob@Fafard) Released 8 -> [ 17.000000] (1:Alice@Fafard) Releasing 6 -> [ 17.000000] (1:Alice@Fafard) Released 6 -> [ 22.000000] (1:Alice@Fafard) Trying to acquire 7 (not blocking) -> [ 22.000000] (1:Alice@Fafard) Acquired 7 -> [ 22.000000] (1:Alice@Fafard) Releasing 8 -> [ 22.000000] (1:Alice@Fafard) Released 8 +> [ 0.000000] (1:Alice@Fafard) Acquire #1 (that is free) +> [ 0.000000] (1:Alice@Fafard) Acquired #1 +> [ 0.900000] (2:Bob@Fafard) Acquire #1 (that is not free) +> [ 1.000000] (1:Alice@Fafard) Releasing #2 +> [ 1.000000] (2:Bob@Fafard) Acquired #1 +> [ 1.000000] (1:Alice@Fafard) Released #2 +> [ 2.000000] (2:Bob@Fafard) Releasing #2 +> [ 2.000000] (2:Bob@Fafard) Released #2 +> [ 3.000000] (2:Bob@Fafard) Trying for 1 sec to acquire #3 (that is free) +> [ 3.000000] (2:Bob@Fafard) Acquired #3 +> [ 4.000000] (1:Alice@Fafard) Trying for 1 sec to acquire #3 (that is not free) +> [ 5.000000] (2:Bob@Fafard) Releasing #4 +> [ 5.000000] (1:Alice@Fafard) Timeout.. Try #3 for another second. +> [ 5.000000] (2:Bob@Fafard) Released #4 +> [ 5.000000] (1:Alice@Fafard) Acquired #3 +> [ 7.000000] (2:Bob@Fafard) Trying for 1 sec to acquire #5 (that is not free) +> [ 8.000000] (2:Bob@Fafard) Timeout.. Try #5 for another second. +> [ 9.000000] (2:Bob@Fafard) Timeout.. Try #5 for another second. +> [ 10.000000] (1:Alice@Fafard) Releasing #4 +> [ 10.000000] (2:Bob@Fafard) Timeout.. Try #5 for another second. +> [ 10.000000] (1:Alice@Fafard) Released #4 +> [ 10.000000] (2:Bob@Fafard) Acquired #5 +> [ 10.000000] (2:Bob@Fafard) Releasing #6 +> [ 10.000000] (2:Bob@Fafard) Released #6 +> [ 10.000000] (2:Bob@Fafard) Trying for 1 sec to acquire #7 (that is free) +> [ 10.000000] (2:Bob@Fafard) Acquired #7 +> [ 11.000000] (1:Alice@Fafard) Trying for 1 sec to acquire #5 (that is not free) +> [ 12.000000] (1:Alice@Fafard) Timeout.. Try #5 for another second. +> [ 13.000000] (1:Alice@Fafard) Timeout.. Try #5 for another second. +> [ 14.000000] (1:Alice@Fafard) Timeout.. Try #5 for another second. +> [ 15.000000] (2:Bob@Fafard) Releasing #8 +> [ 15.000000] (1:Alice@Fafard) Timeout.. Try #5 for another second. +> [ 15.000000] (2:Bob@Fafard) Released #8 +> [ 15.000000] (1:Alice@Fafard) Acquired #5 +> [ 17.000000] (1:Alice@Fafard) Releasing #6 +> [ 17.000000] (1:Alice@Fafard) Released #6 +> [ 22.000000] (1:Alice@Fafard) Trying for 1 sec to acquire #7 (that is free) +> [ 22.000000] (1:Alice@Fafard) Acquired #7 +> [ 22.000000] (1:Alice@Fafard) Releasing #8 +> [ 22.000000] (1:Alice@Fafard) Released #8 > [ 65.000000] (2:Bob@Fafard) Done > [ 72.000000] (1:Alice@Fafard) Done > [ 72.000000] (0:maestro@) Finished diff --git a/include/simgrid/forward.h b/include/simgrid/forward.h index f723289033..8f9c545de1 100644 --- a/include/simgrid/forward.h +++ b/include/simgrid/forward.h @@ -146,6 +146,8 @@ namespace activity { using SynchroImplPtr = boost::intrusive_ptr; class SemaphoreImpl; using SemaphoreImplPtr = boost::intrusive_ptr; + class SemAcquisitionImpl; + using SemAcquisitionImplPtr = boost::intrusive_ptr; XBT_PUBLIC void intrusive_ptr_add_ref(SemaphoreImpl* sem); XBT_PUBLIC void intrusive_ptr_release(SemaphoreImpl* sem); class SleepImpl; diff --git a/src/kernel/activity/ActivityImpl.cpp b/src/kernel/activity/ActivityImpl.cpp index 96d19d8e61..36382b7c3b 100644 --- a/src/kernel/activity/ActivityImpl.cpp +++ b/src/kernel/activity/ActivityImpl.cpp @@ -126,7 +126,7 @@ void ActivityImpl::wait_for(actor::ActorImpl* issuer, double timeout) issuer->exception_ = nullptr; auto* observer = dynamic_cast(issuer->simcall_.observer_); xbt_assert(observer != nullptr); - observer->set_result(true); + observer->set_result(true); // Returns that the wait_for timeouted })); synchro->set_host(issuer->get_host()).set_timeout(timeout).start(); synchro->register_simcall(&issuer->simcall_); diff --git a/src/kernel/activity/MutexImpl.cpp b/src/kernel/activity/MutexImpl.cpp index bfce33a84e..74af186015 100644 --- a/src/kernel/activity/MutexImpl.cpp +++ b/src/kernel/activity/MutexImpl.cpp @@ -29,10 +29,7 @@ bool MutexAcquisitionImpl::test(actor::ActorImpl*) void MutexAcquisitionImpl::wait_for(actor::ActorImpl* issuer, double timeout) { xbt_assert(mutex_->owner_ != nullptr); // it was locked either by someone else or by me during the lock_async - xbt_assert( - issuer == issuer_, - "Actors can only wait acquisitions that they created themselves while this one was created by actor id %ld.", - issuer_->get_pid()); + xbt_assert(issuer == issuer_, "Cannot wait on acquisitions created by another actor (id %ld)", issuer_->get_pid()); xbt_assert(timeout < 0, "Timeouts on mutex acquisitions are not implemented yet."); this->register_simcall(&issuer_->simcall_); // Block on that acquisition @@ -60,8 +57,7 @@ MutexAcquisitionImplPtr MutexImpl::lock_async(actor::ActorImpl* issuer) auto res = MutexAcquisitionImplPtr(new kernel::activity::MutexAcquisitionImpl(issuer, this), true); if (owner_ != nullptr) { - /* FIXME: check if the host is active ? */ - /* Somebody using the mutex, use a synchronization to get host failures */ + /* Somebody is using the mutex; register the acquisition */ sleeping_.push_back(res); } else { owner_ = issuer; @@ -103,12 +99,13 @@ void MutexImpl::unlock(actor::ActorImpl* issuer) if (not sleeping_.empty()) { /* Give the ownership to the first waiting actor */ auto acq = sleeping_.front(); - owner_ = acq->get_issuer(); + sleeping_.pop_front(); + owner_ = acq->get_issuer(); if (acq == owner_->waiting_synchro_) acq->finish(); + // else, the issuer is not blocked on this acquisition so no need to release it - sleeping_.pop_front(); } else { /* nobody to wake up */ owner_ = nullptr; diff --git a/src/kernel/activity/MutexImpl.hpp b/src/kernel/activity/MutexImpl.hpp index 347880d057..1c233575fb 100644 --- a/src/kernel/activity/MutexImpl.hpp +++ b/src/kernel/activity/MutexImpl.hpp @@ -41,8 +41,7 @@ namespace activity { * of a set if some transitions may become disabled in between, while you don't have to reconsider them if you can reuse * your previous computations). */ -class XBT_PUBLIC MutexAcquisitionImpl - : public ActivityImpl_T { // Acquisition: n. The act or process of acquiring. +class XBT_PUBLIC MutexAcquisitionImpl : public ActivityImpl_T { actor::ActorImpl* issuer_ = nullptr; MutexImpl* mutex_ = nullptr; diff --git a/src/kernel/activity/SemaphoreImpl.cpp b/src/kernel/activity/SemaphoreImpl.cpp index 021ef85150..cf23116f27 100644 --- a/src/kernel/activity/SemaphoreImpl.cpp +++ b/src/kernel/activity/SemaphoreImpl.cpp @@ -3,9 +3,14 @@ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ +#include +#include + #include "src/kernel/activity/SemaphoreImpl.hpp" #include "src/kernel/activity/Synchro.hpp" #include "src/kernel/actor/SimcallObserver.hpp" +#include "src/kernel/resource/CpuImpl.hpp" + #include // std::isfinite XBT_LOG_NEW_DEFAULT_SUBCATEGORY(ker_semaphore, ker_synchro, "Semaphore kernel-space implementation"); @@ -14,36 +19,91 @@ namespace simgrid { namespace kernel { namespace activity { -void SemaphoreImpl::acquire(actor::ActorImpl* issuer, double timeout) +void SemAcquisitionImpl::wait_for(actor::ActorImpl* issuer, double timeout) { - XBT_DEBUG("Wait semaphore %p (timeout:%f)", this, timeout); xbt_assert(std::isfinite(timeout), "timeout is not finite!"); + xbt_assert(issuer == issuer_, "Cannot wait on acquisitions created by another actor (id %ld)", issuer_->get_pid()); + + XBT_DEBUG("Wait semaphore %p (timeout:%f)", this, timeout); + + this->register_simcall(&issuer_->simcall_); // Block on that acquisition + + if (granted_) { + post(); + } else if (timeout > 0) { + surf_action_ = get_issuer()->get_host()->get_cpu()->sleep(timeout); + surf_action_->set_activity(this); + + } else { + // Already in the queue + } +} +void SemAcquisitionImpl::post() +{ + finish(); +} +void SemAcquisitionImpl::finish() +{ + xbt_assert(simcalls_.size() == 1, "Unexpected number of simcalls waiting: %zu", simcalls_.size()); + smx_simcall_t simcall = simcalls_.front(); + simcalls_.pop_front(); + + if (surf_action_ != nullptr) { // A timeout was declared + if (surf_action_->get_state() == resource::Action::State::FINISHED) { // The timeout elapsed + if (granted_) { // but we got the semaphore, just in time! + set_state(State::DONE); + + } else { // we have to report that timeout + /* Remove myself from the list of interested parties */ + auto issuer = get_issuer(); + auto it = std::find_if(semaphore_->sleeping_.begin(), semaphore_->sleeping_.end(), + [issuer](SemAcquisitionImplPtr acqui) { return acqui->get_issuer() == issuer; }); + xbt_assert(it != semaphore_->sleeping_.end(), "Cannot find myself in the waiting queue that I have to leave"); + semaphore_->sleeping_.erase(it); + + /* Return to the englobing simcall that the wait_for timeouted */ + auto* observer = dynamic_cast(issuer->simcall_.observer_); + xbt_assert(observer != nullptr); + observer->set_result(true); + } + } + surf_action_->unref(); + } + + simcall->issuer_->waiting_synchro_ = nullptr; + simcall->issuer_->simcall_answer(); +} + +SemAcquisitionImplPtr SemaphoreImpl::acquire_async(actor::ActorImpl* issuer) +{ + auto res = SemAcquisitionImplPtr(new kernel::activity::SemAcquisitionImpl(issuer, this), true); if (value_ <= 0) { - SynchroImplPtr synchro(new SynchroImpl([this, issuer]() { - this->remove_sleeping_actor(*issuer); - auto* observer = dynamic_cast(issuer->simcall_.observer_); - xbt_assert(observer != nullptr); - observer->set_result(true); - })); - synchro->set_host(issuer->get_host()).set_timeout(timeout).start(); - synchro->register_simcall(&issuer->simcall_); - sleeping_.push_back(*issuer); + /* No free token in the semaphore; register the acquisition */ + sleeping_.push_back(res); } else { value_--; - issuer->simcall_answer(); + res->granted_ = true; } + return res; } void SemaphoreImpl::release() { XBT_DEBUG("Sem release semaphore %p", this); if (not sleeping_.empty()) { - auto& actor = sleeping_.front(); + /* Release the first waiting actor */ + + auto acqui = sleeping_.front(); sleeping_.pop_front(); - actor.waiting_synchro_ = nullptr; - actor.simcall_answer(); + + acqui->granted_ = true; + if (acqui == acqui->get_issuer()->waiting_synchro_) + acqui->post(); + // else, the issuer is not blocked on this acquisition so no need to release it + } else { + // nobody's waiting here value_++; } } diff --git a/src/kernel/activity/SemaphoreImpl.hpp b/src/kernel/activity/SemaphoreImpl.hpp index 3704fc4e82..9347ea9e35 100644 --- a/src/kernel/activity/SemaphoreImpl.hpp +++ b/src/kernel/activity/SemaphoreImpl.hpp @@ -16,11 +16,39 @@ namespace simgrid { namespace kernel { namespace activity { +/** Semaphore Acquisition: the act / process of acquiring the semaphore. + * + * You can declare some interest on a semaphore without being blocked waiting if it's already empty. + * See the documentation of the MutexAcquisitionImpl for further details. + */ +class XBT_PUBLIC SemAcquisitionImpl : public ActivityImpl_T { + actor::ActorImpl* issuer_ = nullptr; + SemaphoreImpl* semaphore_ = nullptr; + bool granted_ = false; + + friend SemaphoreImpl; + +public: + SemAcquisitionImpl(actor::ActorImpl* issuer, SemaphoreImpl* sem) : issuer_(issuer), semaphore_(sem) {} + SemaphoreImplPtr get_semaphore() { return semaphore_; } + actor::ActorImpl* get_issuer() { return issuer_; } + + bool test(actor::ActorImpl* issuer = nullptr) override { return granted_; } + void wait_for(actor::ActorImpl* issuer, double timeout) override; + void post() override; + void finish() override; + void set_exception(actor::ActorImpl* issuer) override + { /* nothing to do */ + } +}; + class XBT_PUBLIC SemaphoreImpl { std::atomic_int_fast32_t refcount_{1}; s4u::Semaphore piface_; unsigned int value_; - actor::SynchroList sleeping_; /* list of sleeping actors*/ + std::deque sleeping_; /* ongoing acquisitions */ + + friend SemAcquisitionImpl; public: explicit SemaphoreImpl(unsigned int value) : piface_(this), value_(value){}; @@ -28,10 +56,9 @@ public: SemaphoreImpl(SemaphoreImpl const&) = delete; SemaphoreImpl& operator=(SemaphoreImpl const&) = delete; - void acquire(actor::ActorImpl* issuer, double timeout); + SemAcquisitionImplPtr acquire_async(actor::ActorImpl* issuer); void release(); bool would_block() const { return (value_ == 0); } - void remove_sleeping_actor(actor::ActorImpl& actor) { xbt::intrusive_erase(sleeping_, actor); } unsigned int get_capacity() const { return value_; } bool is_used() const { return not sleeping_.empty(); } diff --git a/src/s4u/s4u_Activity.cpp b/src/s4u/s4u_Activity.cpp index 32f7bd1fb6..15111b80d1 100644 --- a/src/s4u/s4u_Activity.cpp +++ b/src/s4u/s4u_Activity.cpp @@ -59,6 +59,7 @@ Activity* Activity::wait_for(double timeout) throw HostFailureException(XBT_THROW_POINT, "Cannot wait for a failed exec"); if (dynamic_cast(this)) throw StorageFailureException(XBT_THROW_POINT, "Cannot wait for a failed I/O"); + THROW_IMPOSSIBLE; } kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self(); diff --git a/src/s4u/s4u_Semaphore.cpp b/src/s4u/s4u_Semaphore.cpp index 05a203c9a7..d0a1b79598 100644 --- a/src/s4u/s4u_Semaphore.cpp +++ b/src/s4u/s4u_Semaphore.cpp @@ -22,7 +22,7 @@ void Semaphore::acquire() { kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self(); kernel::actor::SemAcquireSimcall observer{issuer, pimpl_}; - kernel::actor::simcall_blocking([&observer] { observer.get_sem()->acquire(observer.get_issuer(), -1.0); }, &observer); + kernel::actor::simcall_blocking([this, issuer] { pimpl_->acquire_async(issuer)->wait_for(issuer, -1.0); }, &observer); } bool Semaphore::acquire_timeout(double timeout) @@ -30,7 +30,7 @@ bool Semaphore::acquire_timeout(double timeout) kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self(); kernel::actor::SemAcquireSimcall observer{issuer, pimpl_, timeout}; return kernel::actor::simcall_blocking( - [&observer] { observer.get_sem()->acquire(observer.get_issuer(), observer.get_timeout()); }, &observer); + [this, issuer, timeout] { pimpl_->acquire_async(issuer)->wait_for(issuer, timeout); }, &observer); } void Semaphore::release() -- 2.20.1