1 /* Copyright (c) 2023. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include <simgrid/Exception.hpp>
7 #include <simgrid/s4u/Host.hpp>
9 #include "src/kernel/EngineImpl.hpp"
10 #include "src/kernel/activity/MessImpl.hpp"
11 #include "src/kernel/activity/MessageQueueImpl.hpp"
/* Log category "ker_mess", declared under "kernel"; presumably the default category for the XBT_DEBUG calls of this
 * file -- TODO confirm against the XBT logging documentation. */
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(ker_mess, kernel, "Kernel message synchronization");
15 namespace simgrid::kernel::activity {
/* Takes a MessImplType and returns a MessImpl& (per the signature); presumably records the type of this synchro.
 * NOTE(review): the body is not visible in this view -- confirm against the full file. */
23 MessImpl& MessImpl::set_type(MessImplType type)
/* Takes a MessageQueueImpl* and returns a MessImpl& (per the signature); presumably records the queue this synchro
 * is attached to. NOTE(review): the body is not visible in this view -- confirm against the full file. */
29 MessImpl& MessImpl::set_queue(MessageQueueImpl* queue)
/* Takes an untyped payload pointer and returns a MessImpl& (per the signature); presumably records the payload.
 * NOTE(review): the body is not visible in this view -- confirm against the full file. */
35 MessImpl& MessImpl::set_payload(void* payload)
/* Describe the destination buffer of this synchro.
 * The only statement visible here stores `size` into dst_buff_size_.
 * NOTE(review): what is done with `buff`, and the return statement, are not visible in this view. */
41 MessImpl& MessImpl::set_dst_buff(unsigned char* buff, size_t* size)
44 dst_buff_size_ = size;
/* Start the message exchange.
 * When the synchro is in state READY, a debug trace naming the hosts of the source and destination actors is
 * emitted and the state becomes RUNNING.
 * NOTE(review): the end of the body (including the returned MessImpl*) is not visible in this view. */
48 MessImpl* MessImpl::start()
50 if (get_state() == State::READY) {
51 XBT_DEBUG("Starting message exchange %p from '%s' to '%s' (state: %s)", this, src_actor_->get_host()->get_cname(),
52 dst_actor_->get_host()->get_cname(), get_state_str());
53 set_state(State::RUNNING);
/* Handle a PUT request described by `observer`.
 * A fresh MessImpl of type PUT is created and the observer's queue is searched for a GET synchro. The synchro that
 * ends up in `other_mess` is recorded in the observer and in the issuer's activity set; it receives the issuer as
 * source actor and the observer's payload.
 * NOTE(review): the branch conditions and the return statement are not visible in this view; the notes below on
 * which branch a statement belongs to are deduced from the surrounding comments and must be confirmed. */
59 ActivityImplPtr MessImpl::iput(actor::MessIputSimcall* observer)
61 auto* queue = observer->get_queue();
62 XBT_DEBUG("put from message queue %p", queue);
64 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of the other side */
65 MessImplPtr this_mess(new MessImpl());
66 this_mess->set_type(MessImplType::PUT);
68 /* Look for message synchro matching our needs.
70 * If it is not found then push our communication into the rendez-vous point */
71 MessImplPtr other_mess = queue->find_matching_message(MessImplType::GET);
/* Presumably the "not found" case: our own synchro is moved into other_mess (this_mess is left moved-from) and
 * pushed into the queue -- TODO confirm the guarding condition */
74 other_mess = std::move(this_mess);
75 queue->push(other_mess);
/* Presumably the "found" case: the synchro taken from the queue is marked READY -- TODO confirm */
77 XBT_DEBUG("Get already pushed");
78 other_mess->set_state(State::READY);
/* Expose the retained synchro to the observer and register it among the issuer's activities */
81 observer->set_message(other_mess.get());
82 observer->get_issuer()->activities_.insert(other_mess);
/* The issuer of the PUT is the source actor; the payload comes from the observer */
85 other_mess->src_actor_ = observer->get_issuer();
86 other_mess->payload_ = observer->get_payload();
/* Handle a GET request described by `observer`.
 * A fresh MessImpl of type GET is created and the observer's queue is searched for a PUT synchro. When none is
 * found, our own synchro is pushed into the queue. The synchro that ends up in `other_mess` is registered among the
 * issuer's activities and recorded in the observer; it receives the observer's destination buffer and the issuer
 * as destination actor.
 * NOTE(review): the `else` line and the return statement are not visible in this view. */
92 ActivityImplPtr MessImpl::iget(actor::MessIgetSimcall* observer)
94 MessImplPtr this_mess(new MessImpl());
95 this_mess->set_type(MessImplType::GET);
97 auto* queue = observer->get_queue();
98 XBT_DEBUG("get from message queue %p. this_synchro=%p", queue, this_mess.get());
100 MessImplPtr other_mess = queue->find_matching_message(MessImplType::PUT);
/* No synchro found: ours is moved into other_mess (this_mess is left moved-from) and pushed into the queue */
102 if (other_mess == nullptr) {
103 XBT_DEBUG("Get pushed first (%zu comm enqueued so far)", queue->size());
104 other_mess = std::move(this_mess);
105 queue->push(other_mess);
/* Presumably the branch where a synchro was found: it is marked READY -- TODO confirm */
107 XBT_DEBUG("Match my %p with the existing %p", this_mess.get(), other_mess.get());
109 other_mess->set_state(State::READY);
/* Register the retained synchro among the issuer's activities and expose it to the observer */
112 observer->get_issuer()->activities_.insert(other_mess);
113 observer->set_message(other_mess.get());
/* The issuer of the GET is the destination actor; the destination buffer comes from the observer */
116 other_mess->set_dst_buff(observer->get_dst_buff(), observer->get_dst_buff_size());
117 other_mess->dst_actor_ = observer->get_issuer();
/* Make `issuer` wait on this message synchro.
 * Emits a debug trace, registers the issuer's simcall on this synchro, then delegates to ActivityImpl::wait_for()
 * with the same issuer and timeout. */
124 void MessImpl::wait_for(actor::ActorImpl* issuer, double timeout)
126 XBT_DEBUG("MessImpl::wait_for(%g), %p, state %s", timeout, this, get_state_str());
128 /* Associate this simcall with the wait synchro */
129 register_simcall(&issuer->simcall_);
130 ActivityImpl::wait_for(issuer, timeout);
/* Complete this message synchro and answer the simcalls waiting on it.
 * Statements visible in this view, in order: emit a debug trace; fire the completion signals through the s4u::Mess
 * interface after resetting the interface pointer; turn a RUNNING state into DONE; remove the synchro from its
 * queue; when DONE with a non-null payload, store the payload pointer through dst_buff_; then drain simcalls_.
 * NOTE(review): several guard lines and all closing braces are missing from this view, so the exact nesting of the
 * statements below must be confirmed against the full file. */
133 void MessImpl::finish()
135 XBT_DEBUG("MessImpl::finish() comm %p, state %s, src_proc %p, dst_proc %p", this, get_state_str(),
136 src_actor_.get(), dst_actor_.get());
/* Fire the completion signals on the s4u::Mess interface object */
139 const auto& piface = static_cast<const s4u::Mess&>(*get_iface());
140 set_iface(nullptr); // reset iface to protect against multiple trigger of the on_completion signals
141 piface.fire_on_completion_for_real();
142 piface.fire_on_this_completion_for_real();
145 /* Update synchro state */
146 if (get_state() == State::RUNNING) {
147 set_state(State::DONE);
150 /* If the synchro is still in a rendez-vous point then remove it from there */
152 queue_->remove(this);
/* Deliver the payload: dst_buff_ is used as the address of a void* slot that receives the payload pointer */
154 if (get_state() == State::DONE && payload_ != nullptr)
155 *(void**)(dst_buff_) = payload_;
/* Serve the registered simcalls one by one, always taking the one at the front of the list */
157 while (not simcalls_.empty()) {
158 actor::Simcall* simcall = simcalls_.front();
159 simcalls_.pop_front();
161 /* If a waitany simcall is waiting for this synchro to finish, then remove it from the other synchros in the waitany
162 * list. Afterwards, get the position of the actual synchro in the waitany list and return it as the result of the
165 if (simcall->call_ == actor::Simcall::Type::NONE) // FIXME: maybe a better way to handle this case
166 continue; // if actor handling comm is killed
168 handle_activity_waitany(simcall);
170 /* Check out for errors */
/* The issuer's host is off: flag the issuer as wanting to die */
172 if (not simcall->issuer_->get_host()->is_on()) {
173 simcall->issuer_->set_wannadie();
175 // Do not answer to dying actors
176 if (not simcall->issuer_->wannadie()) {
177 set_exception(simcall->issuer_);
178 simcall->issuer_->simcall_answer();
/* Detach this synchro from the issuer: clear what it is waiting on and drop it from its activity set */
182 simcall->issuer_->waiting_synchro_ = nullptr;
183 simcall->issuer_->activities_.erase(this);
187 } // namespace simgrid::kernel::activity