X-Git-Url: http://bilbo.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/554255fe3f311a54df171d31a4a27f4004840f59..8c6bbb14f44b8daed5ea43d80880c6bf2155118b:/src/kernel/activity/SemaphoreImpl.cpp diff --git a/src/kernel/activity/SemaphoreImpl.cpp b/src/kernel/activity/SemaphoreImpl.cpp index fb13608087..2bb639663d 100644 --- a/src/kernel/activity/SemaphoreImpl.cpp +++ b/src/kernel/activity/SemaphoreImpl.cpp @@ -1,66 +1,120 @@ -/* Copyright (c) 2019-2021. The SimGrid Team. All rights reserved. */ +/* Copyright (c) 2019-2023. The SimGrid Team. All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ +#include +#include + #include "src/kernel/activity/SemaphoreImpl.hpp" -#include "src/kernel/activity/SynchroRaw.hpp" -#include "src/kernel/actor/SimcallObserver.hpp" +#include "src/kernel/activity/Synchro.hpp" +#include "src/kernel/actor/SynchroObserver.hpp" +#include "src/kernel/resource/CpuImpl.hpp" + #include // std::isfinite -XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_semaphore, simix_synchro, "Semaphore kernel-space implementation"); +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(ker_semaphore, ker_synchro, "Semaphore kernel-space implementation"); + +namespace simgrid::kernel::activity { -namespace simgrid { -namespace kernel { -namespace activity { +/* -------- Acquisition -------- */ -void SemaphoreImpl::acquire(actor::ActorImpl* issuer, double timeout) +void SemAcquisitionImpl::wait_for(actor::ActorImpl* issuer, double timeout) { - XBT_DEBUG("Wait semaphore %p (timeout:%f)", this, timeout); xbt_assert(std::isfinite(timeout), "timeout is not finite!"); + xbt_assert(issuer == issuer_, "Cannot wait on acquisitions created by another actor (id %ld)", issuer_->get_pid()); + + XBT_DEBUG("Wait semaphore %p (timeout:%f)", this, timeout); + + this->register_simcall(&issuer_->simcall_); // Block on that acquisition + + if (granted_) { + post(); + } else if (timeout > 0) { + model_action_ = get_issuer()->get_host()->get_cpu()->sleep(timeout); + model_action_->set_activity(this); - if (value_ <= 0) { - RawImplPtr synchro(new RawImpl([this, issuer]() { - this->remove_sleeping_actor(*issuer); - auto* observer = dynamic_cast(issuer->simcall_.observer_); - xbt_assert(observer != nullptr); - observer->set_result(true); - })); - synchro->set_host(issuer->get_host()).set_timeout(timeout).start(); - synchro->register_simcall(&issuer->simcall_); - sleeping_.push_back(*issuer); } else { - value_--; - issuer->simcall_answer(); + // Already in the queue } } -void SemaphoreImpl::release() +void SemAcquisitionImpl::post() { - XBT_DEBUG("Sem release semaphore %p", this); + finish(); +} +void SemAcquisitionImpl::finish() +{ + xbt_assert(simcalls_.size() == 1, "Unexpected number of simcalls waiting: %zu", simcalls_.size()); + actor::Simcall* simcall = simcalls_.front(); + simcalls_.pop_front(); - if (not sleeping_.empty()) { - auto& actor = sleeping_.front(); - sleeping_.pop_front(); - actor.waiting_synchro_ = nullptr; - actor.simcall_answer(); - } else { - value_++; + if (model_action_ != nullptr) { // A timeout was declared + if (model_action_->get_state() == resource::Action::State::FINISHED) { // The timeout elapsed + if (granted_) { // but we got the semaphore, just in time! + set_state(State::DONE); + + } else { // we have to report that timeout + cancel(); // Unregister the acquisition from the semaphore + + /* Return to the englobing simcall that the wait_for timeouted */ + auto* observer = dynamic_cast(get_issuer()->simcall_.observer_); + xbt_assert(observer != nullptr); + observer->set_result(true); + } + } + model_action_->unref(); + model_action_ = nullptr; } -} -/** Increase the refcount for this semaphore */ -SemaphoreImpl* SemaphoreImpl::ref() + simcall->issuer_->waiting_synchro_ = nullptr; + simcall->issuer_->simcall_answer(); +} +void SemAcquisitionImpl::cancel() { - intrusive_ptr_add_ref(this); - return this; + /* Remove myself from the list of interested parties */ + auto issuer = get_issuer(); + auto it = std::find_if(semaphore_->ongoing_acquisitions_.begin(), semaphore_->ongoing_acquisitions_.end(), + [issuer](SemAcquisitionImplPtr acqui) { return acqui->get_issuer() == issuer; }); + xbt_assert(it != semaphore_->ongoing_acquisitions_.end(), + "Cannot find myself in the waiting queue that I have to leave"); + semaphore_->ongoing_acquisitions_.erase(it); } -/** Decrease the refcount for this mutex */ -void SemaphoreImpl::unref() +/* -------- Semaphore -------- */ +unsigned SemaphoreImpl::next_id_ = 0; + +SemAcquisitionImplPtr SemaphoreImpl::acquire_async(actor::ActorImpl* issuer) +{ + auto res = SemAcquisitionImplPtr(new kernel::activity::SemAcquisitionImpl(issuer, this), true); + + if (value_ > 0) { + value_--; + res->granted_ = true; + } else { + /* No free token in the semaphore; register the acquisition */ + ongoing_acquisitions_.push_back(res); + } + return res; +} +void SemaphoreImpl::release() { - intrusive_ptr_release(this); + XBT_DEBUG("Sem release semaphore %p", this); + + if (not ongoing_acquisitions_.empty()) { + /* Release the first waiting actor */ + + auto acqui = ongoing_acquisitions_.front(); + ongoing_acquisitions_.pop_front(); + + acqui->granted_ = true; + if (acqui == acqui->get_issuer()->waiting_synchro_) + acqui->post(); + // else, the issuer is not blocked on this acquisition so no need to release it + + } else { + // nobody's waiting here + value_++; + } } -} // namespace activity -} // namespace kernel -} // namespace simgrid +} // namespace simgrid::kernel::activity