Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Rename the CondVar observer as it should
[simgrid.git] / src / kernel / activity / ConditionVariableImpl.cpp
1 /* Copyright (c) 2007-2023. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "src/kernel/activity/ConditionVariableImpl.hpp"
7 #include "src/kernel/activity/MutexImpl.hpp"
8 #include "src/kernel/activity/Synchro.hpp"
9 #include "src/kernel/actor/SynchroObserver.hpp"
10 #include <cmath> // std::isfinite
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(ker_condition, ker_synchro, "Condition variables kernel-space implementation");
13
14 /********************************* Condition **********************************/
15
16 namespace simgrid::kernel::activity {
17
18 /**
19  * @brief Signalizes a condition.
20  *
21  * Signalizes a condition and wakes up a sleeping actor.
22  * If there are no actor sleeping, no action is done.
23  */
24 void ConditionVariableImpl::signal()
25 {
26   XBT_DEBUG("Signal condition %p", this);
27
28   /* If there are actors waiting for the condition choose one and try to make it acquire the mutex */
29   if (not sleeping_.empty()) {
30     auto& proc = sleeping_.front();
31     sleeping_.pop_front();
32
33     /* Destroy waiter's synchronization */
34     proc.waiting_synchro_ = nullptr;
35
36     /* Now transform the cond wait simcall into a mutex lock one */
37     actor::Simcall* simcall = &proc.simcall_;
38     const auto* observer    = dynamic_cast<kernel::actor::ConditionVariableObserver*>(simcall->observer_);
39     xbt_assert(observer != nullptr);
40     observer->get_mutex()->lock_async(simcall->issuer_)->wait_for(simcall->issuer_, -1);
41   }
42   XBT_OUT();
43 }
44
45 /**
46  * @brief Broadcasts a condition.
47  *
48  * Signal ALL actors waiting on a condition.
49  * If there are no actor waiting, no action is done.
50  */
51 void ConditionVariableImpl::broadcast()
52 {
53   XBT_DEBUG("Broadcast condition %p", this);
54
55   /* Signal the condition until nobody is waiting on it */
56   while (not sleeping_.empty())
57     signal();
58 }
59
60 void ConditionVariableImpl::wait(MutexImpl* mutex, double timeout, actor::ActorImpl* issuer)
61 {
62   XBT_DEBUG("Wait condition %p", this);
63   xbt_assert(std::isfinite(timeout), "timeout is not finite!");
64
65   /* If there is a mutex unlock it */
66   if (mutex != nullptr) {
67     auto* owner = mutex->get_owner();
68     xbt_assert(owner == issuer,
69                "Actor %s cannot wait on ConditionVariable %p since it does not own the provided mutex %p (which is "
70                "owned by %s).",
71                issuer->get_cname(), this, mutex, (owner == nullptr ? "nobody" : owner->get_cname()));
72     mutex_ = mutex;
73     mutex->unlock(issuer);
74   }
75
76   SynchroImplPtr synchro(new SynchroImpl([this, issuer]() {
77     this->remove_sleeping_actor(*issuer);
78     auto* observer = dynamic_cast<kernel::actor::ConditionVariableObserver*>(issuer->simcall_.observer_);
79     xbt_assert(observer != nullptr);
80     observer->set_result(true);
81   }));
82   synchro->set_host(issuer->get_host()).set_timeout(timeout).start();
83   synchro->register_simcall(&issuer->simcall_);
84   sleeping_.push_back(*issuer);
85 }
86
87 // boost::intrusive_ptr<ConditionVariableImpl> support:
88 void intrusive_ptr_add_ref(ConditionVariableImpl* cond)
89 {
90   cond->refcount_.fetch_add(1, std::memory_order_relaxed);
91 }
92
93 void intrusive_ptr_release(ConditionVariableImpl* cond)
94 {
95   if (cond->refcount_.fetch_sub(1, std::memory_order_release) == 1) {
96     std::atomic_thread_fence(std::memory_order_acquire);
97     xbt_assert(cond->sleeping_.empty(), "Cannot destroy conditional since someone is still using it");
98     delete cond;
99   }
100 }
101 } // namespace simgrid::kernel::activity