1 /* Copyright (c) 2023. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include <simgrid/Exception.hpp>
8 #include <simgrid/s4u/ActivitySet.hpp>
9 #include <simgrid/s4u/Mess.hpp>
10 #include <simgrid/s4u/Engine.hpp>
11 #include <simgrid/s4u/MessageQueue.hpp>
13 #include "src/kernel/activity/MessImpl.hpp"
14 #include "src/kernel/actor/ActorImpl.hpp"
15 #include "src/kernel/actor/SimcallObserver.hpp"
// Declares the log subcategory `s4u_mess` as a child of `s4u_activity`, described as
// "S4U asynchronous messaging". Per the macro name it is also the default category of this
// file, i.e. presumably the one used by the XBT_DEBUG call in Mess::wait_for() below.
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_mess, s4u_activity, "S4U asynchronous messaging");
19 namespace simgrid::s4u {
// Out-of-class definitions of the two static signals of Mess. Callbacks connected to them
// receive the concerned Mess by const reference.
// NOTE(review): the code emitting these signals is not visible in this excerpt; presumably
// they are fired from Mess::do_start() on the sender and receiver paths -- confirm.
20 xbt::signal<void(Mess const&)> Mess::on_send;
21 xbt::signal<void(Mess const&)> Mess::on_recv;
// Mess::set_queue(queue): takes the MessageQueue to associate with this message and returns
// a MessPtr.
// NOTE(review): the body is not visible in this excerpt; presumably it stores `queue` in
// queue_ (which do_start() and wait_for() dereference) and returns this -- confirm.
23 MessPtr Mess::set_queue(MessageQueue* queue)
// Mess::set_payload(payload): takes an untyped pointer to the data carried by this message
// and returns a MessPtr.
// NOTE(review): the body is not visible in this excerpt; presumably it stores the pointer
// that get_payload() later returns, and returns this -- confirm.
29 MessPtr Mess::set_payload(void* payload)
// Mess::set_dst_data(buff, size): declares where the received payload must be written.
//   buff: address of the pointer that receives the data.
//   size: size associated with that buffer, recorded in dst_buff_size_.
// Only allowed while the activity is still in State::INITED (asserted below).
// NOTE(review): the statements storing `buff` and returning the MessPtr are not visible in
// this excerpt; presumably `dst_buff_ = buff;` and `return this;` -- confirm.
35 MessPtr Mess::set_dst_data(void** buff, size_t size)
// Guard against reconfiguring the destination after the exchange left the INITED state.
// The argument feeding the %s of the message sits on a line not shown here.
37 xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
41 dst_buff_size_ = size;
// Mess::get_sender(): returns the public Actor interface of the sending actor, or nullptr
// when no sender is known. The sender is read from src_actor_ of the kernel-side MessImpl
// that pimpl_ points to.
45 Actor* Mess::get_sender() const
// Starts as nullptr so that the final return yields nullptr if nothing is assigned below.
47 kernel::actor::ActorImplPtr sender = nullptr;
// pimpl_ is downcast to MessImpl to reach its src_actor_ field.
// NOTE(review): a line just before this assignment is not visible in this excerpt;
// presumably it guards the dereference with a check that pimpl_ is set -- confirm.
49 sender = boost::static_pointer_cast<kernel::activity::MessImpl>(pimpl_)->src_actor_;
// Convert the kernel-side actor into its user-facing interface, keeping nullptr as is.
50 return sender ? sender->get_ciface() : nullptr;
// Mess::get_receiver(): returns the public Actor interface of the receiving actor, or
// nullptr when no receiver is known. The receiver is read from dst_actor_ of the kernel-side
// MessImpl that pimpl_ points to. Mirrors get_sender().
53 Actor* Mess::get_receiver() const
// Starts as nullptr so that the final return yields nullptr if nothing is assigned below.
55 kernel::actor::ActorImplPtr receiver = nullptr;
// pimpl_ is downcast to MessImpl to reach its dst_actor_ field.
// NOTE(review): a line just before this assignment is not visible in this excerpt;
// presumably it guards the dereference with a check that pimpl_ is set -- confirm.
57 receiver = boost::static_pointer_cast<kernel::activity::MessImpl>(pimpl_)->dst_actor_;
// Convert the kernel-side actor into its user-facing interface, keeping nullptr as is.
58 return receiver ? receiver->get_ciface() : nullptr;
// Mess::do_start(): starts the message exchange on behalf of the calling actor.
//  - If the caller is sender_, a MessIputSimcall is issued and MessImpl::iput() creates the
//    kernel activity.
//  - If the caller is receiver_, a MessIgetSimcall is issued and MessImpl::iget() creates it.
//  - Otherwise xbt_die() is called: the role of the caller is unknown.
// The resulting kernel activity is stored in pimpl_, bound back to this object, and state_
// becomes State::STARTED.
// NOTE(review): several lines of this function (signal emission, trailing simcall arguments,
// the return statement) are not visible in this excerpt.
61 Mess* Mess::do_start()
// Starting is only accepted from the INITED or STARTING states.
63 xbt_assert(get_state() == State::INITED || get_state() == State::STARTING,
64 "You cannot use %s() once your message exchange has started (not implemented)", __func__);
// The actor currently running decides which side of the exchange is being started.
66 auto myself = kernel::actor::ActorImpl::self();
67 if (myself == sender_) {
// Sender side: the observer carries the sender, the kernel queue and the payload to put.
70 kernel::actor::MessIputSimcall observer{sender_, queue_->get_impl(), get_payload()};
// The lambda is handed to simcall_answered(); what iput() returns becomes our pimpl_.
71 pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::MessImpl::iput(&observer); },
73 } else if (myself == receiver_) {
// Receiver side: the observer carries the receiver and the destination buffer, among other
// arguments located on lines not shown here.
76 kernel::actor::MessIgetSimcall observer{receiver_,
78 static_cast<unsigned char*>(dst_buff_),
81 pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::MessImpl::iget(&observer); },
// Neither sender_ nor receiver_ matches the calling actor.
84 xbt_die("Cannot start a message exchange before specifying whether we are the sender or the receiver");
// Bind the kernel activity back to this S4U object and record the actor it belongs to.
// NOTE(review): sender_ appears to be passed whichever branch ran above -- confirm that this
// is intended when the caller is the receiver.
87 pimpl_->set_iface(this);
88 pimpl_->set_actor(sender_);
89 // Only throw the signal when both sides are here and the status is READY
// The test below is "kernel state is not WAITING"; the statements it guards are not visible
// here (presumably the start signals mentioned above -- confirm).
90 if (pimpl_->get_state() != kernel::activity::State::WAITING) {
94 state_ = State::STARTED;
// Mess::wait_for(timeout): waits for this message exchange, for at most `timeout`.
// The behavior depends on the current state (the switch statement itself and some of its
// case labels are not visible in this excerpt):
//  - FINISHED: case label visible, its body is not.
//  - a state on a hidden label (presumably FAILED -- confirm): throws NetworkFailureException.
//  - STARTING: the kernel activity is first created (iput if a payload is set, iget
//    otherwise), then the wait below is performed.
//  - CANCELED: throws CancelException.
// Throws TimeoutException when the blocking simcall reports a timeout, and rethrows a nested
// NetworkFailureException (after marking the activity FAILED) when the kernel raised one.
// On the visible success path the activity is completed with State::FINISHED.
98 Mess* Mess::wait_for(double timeout)
100 XBT_DEBUG("Calling Mess::wait_for with state %s", get_state_str());
// Set right before the blocking simcall; also used by the catch block to reset the observer.
101 kernel::actor::ActorImpl* issuer = nullptr;
103 case State::FINISHED:
// NOTE(review): the case label this throw belongs to is on a line not shown here.
106 throw NetworkFailureException(XBT_THROW_POINT, "Cannot wait for a failed communication");
108 case State::STARTING:
// Not started yet: a non-null payload takes the sender path and issues an iput.
109 if (get_payload() != nullptr) {
112 kernel::actor::MessIputSimcall observer{sender_, queue_->get_impl(), get_payload()};
113 pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::MessImpl::iput(&observer); },
// Otherwise (branch header not visible) the receiver path issues an iget.
118 kernel::actor::MessIgetSimcall observer{receiver_,
120 static_cast<unsigned char*>(dst_buff_),
123 pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::MessImpl::iget(&observer); },
// Blocking wait issued by the calling actor on the kernel activity, bounded by `timeout`.
129 issuer = kernel::actor::ActorImpl::self();
130 kernel::actor::ActivityWaitSimcall observer{issuer, pimpl_.get(), timeout, "Wait"};
// A truthy result of the blocking simcall is reported to the caller as a timeout.
131 if (kernel::actor::simcall_blocking(
132 [&observer] { observer.get_activity()->wait_for(observer.get_issuer(), observer.get_timeout()); },
134 throw TimeoutException(XBT_THROW_POINT, "Timeouted");
// Failure raised during the wait: detach the observer, mark the activity FAILED, then
// rethrow as a nested exception whose message names the demangled type of the original one.
136 } catch (const NetworkFailureException& e) {
137 issuer->simcall_.observer_ = nullptr; // Comm failed on network failure, reset the observer to nullptr
138 complete(State::FAILED);
139 e.rethrow_nested(XBT_THROW_POINT, boost::core::demangle(typeid(e).name()) + " raised in kernel mode.");
143 case State::CANCELED:
144 throw CancelException(XBT_THROW_POINT, "Message canceled");
// Reached when no exception was thrown above: the exchange is marked as finished.
// NOTE(review): the return statement is on a line not shown here.
149 complete(State::FINISHED);
153 } // namespace simgrid::s4u