include examples/cpp/mc-failing-assert/s4u-mc-failing-assert-statequality.tesh
include examples/cpp/mc-failing-assert/s4u-mc-failing-assert.cpp
include examples/cpp/mc-failing-assert/s4u-mc-failing-assert.tesh
+include examples/cpp/mess-wait/s4u-mess-wait.cpp
+include examples/cpp/mess-wait/s4u-mess-wait.tesh
include examples/cpp/network-factors/s4u-network-factors.cpp
include examples/cpp/network-factors/s4u-network-factors.tesh
include examples/cpp/network-nonlinear/s4u-network-nonlinear.cpp
include examples/cpp/network-ns3/s4u-network-ns3.cpp
include examples/cpp/network-wifi/s4u-network-wifi.cpp
include examples/cpp/network-wifi/s4u-network-wifi.tesh
-include examples/cpp/solar-panel-simple/s4u-solar-panel-simple.cpp
-include examples/cpp/solar-panel-simple/s4u-solar-panel-simple.tesh
include examples/cpp/platform-comm-serialize/s4u-platform-comm-serialize.cpp
include examples/cpp/platform-comm-serialize/s4u-platform-comm-serialize.tesh
include examples/cpp/platform-failures/s4u-platform-failures.cpp
include examples/cpp/replay-io/s4u-replay-io_d.xml
include examples/cpp/routing-get-clusters/s4u-routing-get-clusters.cpp
include examples/cpp/routing-get-clusters/s4u-routing-get-clusters.tesh
+include examples/cpp/solar-panel-simple/s4u-solar-panel-simple.cpp
+include examples/cpp/solar-panel-simple/s4u-solar-panel-simple.tesh
include examples/cpp/synchro-barrier/s4u-mc-synchro-barrier.tesh
include examples/cpp/synchro-barrier/s4u-synchro-barrier.cpp
include examples/cpp/synchro-barrier/s4u-synchro-barrier.tesh
include include/simgrid/s4u/Io.hpp
include include/simgrid/s4u/Link.hpp
include include/simgrid/s4u/Mailbox.hpp
+include include/simgrid/s4u/MessageQueue.hpp
include include/simgrid/s4u/Mutex.hpp
include include/simgrid/s4u/NetZone.hpp
include include/simgrid/s4u/Semaphore.hpp
include src/kernel/activity/IoImpl.hpp
include src/kernel/activity/MailboxImpl.cpp
include src/kernel/activity/MailboxImpl.hpp
+include src/kernel/activity/MessImpl.cpp
+include src/kernel/activity/MessImpl.hpp
+include src/kernel/activity/MessageQueueImpl.cpp
+include src/kernel/activity/MessageQueueImpl.hpp
include src/kernel/activity/MutexImpl.cpp
include src/kernel/activity/MutexImpl.hpp
include src/kernel/activity/SemaphoreImpl.cpp
include src/s4u/s4u_Io.cpp
include src/s4u/s4u_Link.cpp
include src/s4u/s4u_Mailbox.cpp
+include src/s4u/s4u_Mess.cpp
+include src/s4u/s4u_MessageQueue.cpp
include src/s4u/s4u_Mutex.cpp
include src/s4u/s4u_Netzone.cpp
include src/s4u/s4u_Semaphore.cpp
exec-ptask-multicore exec-ptask-multicore-latency exec-cpu-nonlinear exec-cpu-factors exec-failure exec-threads
maestro-set
mc-bugged1 mc-bugged1-liveness mc-bugged2 mc-bugged2-liveness mc-centralized-mutex mc-electric-fence mc-failing-assert
+ mess-wait
network-ns3 network-ns3-wifi network-wifi
io-async io-priority io-degradation io-file-system io-file-remote io-disk-raw io-dependent
task-dispatch task-io task-microservice task-parallelism task-simple task-storm task-switch-host task-variable-load
--- /dev/null
+/* Copyright (c) 2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+/* This example shows how to use simgrid::s4u::Mess::wait() to wait for a given control message.
+ *
+ * The sender posts its messages one at a time on a simgrid::s4u::MessageQueue: each put_async() returns a
+ * simgrid::s4u::MessPtr on which the sender waits before posting the next message.
+ *
+ * The receiver consumes the messages with get_async() until it gets the 'finalize' message.
+ */
+
+#include "simgrid/s4u.hpp"
+#include <cstdlib>
+#include <iostream>
+#include <string>
+namespace sg4 = simgrid::s4u;
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_mess_wait, "Messages specific for this s4u example");
+
+/* Sender actor: posts `messages_count` messages on the "control" queue, waiting for each of them, and then
+ * posts a last 'finalize' message. */
+static void sender(int messages_count)
+{
+  sg4::MessageQueue* control_queue = sg4::MessageQueue::by_name("control");
+
+  sg4::this_actor::sleep_for(0.5);
+
+  int sent = 0;
+  while (sent < messages_count) {
+    const std::string text = "Message " + std::to_string(sent);
+    // Only a pointer travels through the queue, and 'text' dies at the end of this iteration:
+    // hand over a heap-allocated copy that the receiver deletes once it has read it.
+    auto* heap_copy = new std::string(text);
+
+    /* Post the control message asynchronously, then block until it is taken care of */
+    sg4::MessPtr pending = control_queue->put_async(heap_copy);
+    XBT_INFO("Send '%s' to '%s'", text.c_str(), control_queue->get_cname());
+    pending->wait();
+
+    ++sent;
+  }
+
+  /* Tell the receiver that it can stop */
+  XBT_INFO("Send 'finalize' to 'receiver'");
+  control_queue->put(new std::string("finalize"), 0);
+}
+
+/* Receiver actor: takes no argument. It reads the "control" queue until a 'finalize' message shows up. */
+static void receiver()
+{
+  sg4::MessageQueue* control_queue = sg4::MessageQueue::by_name("control");
+
+  sg4::this_actor::sleep_for(1);
+
+  XBT_INFO("Wait for my first message");
+  bool keep_going = true;
+  while (keep_going) {
+    std::string* incoming;
+    sg4::MessPtr pending = control_queue->get_async<std::string>(&incoming);
+
+    sg4::this_actor::sleep_for(0.1);
+    pending->wait();
+
+    XBT_INFO("I got a '%s'.", incoming->c_str());
+    keep_going = (*incoming != "finalize"); // a finalize message ends the loop
+    delete incoming;
+  }
+}
+
+/* Entry point: loads the platform file given as first argument, spawns one sender (3 messages) on Tremblay and
+ * one receiver on Fafard, then runs the simulation. */
+int main(int argc, char* argv[])
+{
+  sg4::Engine e(&argc, argv);
+  // Checked after the Engine construction, which is given &argc and may update it.
+  // Fails with a usage message instead of reading a missing argv[1].
+  xbt_assert(argc > 1, "Usage: %s platform_file\n", argv[0]);
+
+  e.load_platform(argv[1]);
+
+  sg4::Actor::create("sender", e.host_by_name("Tremblay"), sender, 3);
+  sg4::Actor::create("receiver", e.host_by_name("Fafard"), receiver);
+
+  e.run();
+
+  return 0;
+}
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/s4u-mess-wait ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+> [ 0.500000] (1:sender@Tremblay) Send 'Message 0' to 'control'
+> [ 1.000000] (2:receiver@Fafard) Wait for my first message
+> [ 1.100000] (2:receiver@Fafard) I got a 'Message 0'.
+> [ 1.100000] (1:sender@Tremblay) Send 'Message 1' to 'control'
+> [ 1.100000] (1:sender@Tremblay) Send 'Message 2' to 'control'
+> [ 1.200000] (2:receiver@Fafard) I got a 'Message 1'.
+> [ 1.300000] (1:sender@Tremblay) Send 'finalize' to 'receiver'
+> [ 1.300000] (2:receiver@Fafard) I got a 'Message 2'.
+> [ 1.400000] (2:receiver@Fafard) I got a 'finalize'.
\ No newline at end of file
class Mailbox;
+class Mess;
+/** Smart pointer to a simgrid::s4u::Mess */
+using MessPtr = boost::intrusive_ptr<Mess>;
+XBT_PUBLIC void intrusive_ptr_release(Mess* c);
+XBT_PUBLIC void intrusive_ptr_add_ref(Mess* c);
+
+class MessageQueue;
+
class Mutex;
XBT_PUBLIC void intrusive_ptr_release(const Mutex* m);
XBT_PUBLIC void intrusive_ptr_add_ref(const Mutex* m);
using ExecImplPtr = boost::intrusive_ptr<ExecImpl>;
class IoImpl;
using IoImplPtr = boost::intrusive_ptr<IoImpl>;
+ class MessImpl;
+ using MessImplPtr = boost::intrusive_ptr<MessImpl>;
class MutexImpl;
using MutexImplPtr = boost::intrusive_ptr<MutexImpl>;
class MutexAcquisitionImpl;
using SleepImplPtr = boost::intrusive_ptr<SleepImpl>;
class MailboxImpl;
+ class MessageQueueImpl;
}
namespace context {
class Context;
using s4u_File = simgrid::s4u::File;
using s4u_ConditionVariable = simgrid::s4u::ConditionVariable;
using s4u_Mailbox = simgrid::s4u::Mailbox;
+using s4u_MessageQueue = simgrid::s4u::MessageQueue;
using s4u_Mutex = simgrid::s4u::Mutex;
using s4u_Semaphore = simgrid::s4u::Semaphore;
using s4u_Disk = simgrid::s4u::Disk;
typedef struct s4u_File s4u_File;
typedef struct s4u_ConditionVariable s4u_ConditionVariable;
typedef struct s4u_Mailbox s4u_Mailbox;
+typedef struct s4u_MessageQueue s4u_MessageQueue;
typedef struct s4u_Mutex s4u_Mutex;
typedef struct s4u_Semaphore s4u_Semaphore;
typedef struct s4u_Disk s4u_Disk;
typedef const s4u_ConditionVariable* const_sg_cond_t;
typedef s4u_Mailbox* sg_mailbox_t;
typedef const s4u_Mailbox* const_sg_mailbox_t;
+typedef s4u_MessageQueue* sg_messagequeue_t;
+typedef const s4u_MessageQueue* const_sg_messagequeue_t;
typedef s4u_Mutex* sg_mutex_t;
typedef const s4u_Mutex* const_sg_mutex_t;
typedef s4u_Semaphore* sg_sem_t;
#include <simgrid/s4u/Host.hpp>
#include <simgrid/s4u/Link.hpp>
#include <simgrid/s4u/Mailbox.hpp>
+#include <simgrid/s4u/Mess.hpp>
+#include <simgrid/s4u/MessageQueue.hpp>
#include <simgrid/s4u/Mutex.hpp>
#include <simgrid/s4u/NetZone.hpp>
#include <simgrid/s4u/Semaphore.hpp>
friend Comm;
friend Exec;
friend Io;
+ friend Mess;
friend kernel::activity::ActivityImpl;
friend std::vector<ActivityPtr> create_DAG_from_dot(const std::string& filename);
friend std::vector<ActivityPtr> create_DAG_from_DAX(const std::string& filename);
Link* link_by_name_or_null(const std::string& name) const;
Mailbox* mailbox_by_name_or_create(const std::string& name) const;
+ MessageQueue* message_queue_by_name_or_create(const std::string& name) const;
size_t get_actor_count() const;
std::vector<ActorPtr> get_all_actors() const;
--- /dev/null
+/* Copyright (c) 2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef SIMGRID_S4U_MESS_HPP
+#define SIMGRID_S4U_MESS_HPP
+
+#include <simgrid/forward.h>
+#include <simgrid/s4u/Activity.hpp>
+
+#include <string>
+#include <vector>
+
+namespace simgrid::s4u {
+
+class XBT_PUBLIC Mess : public Activity_T<Mess> { // Activity conveying a payload pointer through a MessageQueue
+#ifndef DOXYGEN
+ friend MessageQueue; // Factory of messages
+ friend kernel::activity::MessImpl;
+#endif
+ MessageQueue* queue_ = nullptr; // queue this message goes through, see set_queue()
+ void* payload_ = nullptr; // pointer to the conveyed data, see set_payload()
+ size_t dst_buff_size_ = 0; // size given to set_dst_data()
+ void* dst_buff_ = nullptr; // location given to set_dst_data()
+ kernel::activity::ActivityImplPtr pimpl_;
+
+ Mess() = default; // private: messages are created by the MessageQueue factory (friend above)
+ Mess* do_start() override;
+
+ static xbt::signal<void(Mess const&)> on_send;
+ xbt::signal<void(Mess const&)> on_this_send;
+ static xbt::signal<void(Mess const&)> on_recv;
+ xbt::signal<void(Mess const&)> on_this_recv;
+
+ /* These ensure that the on_completion signals are really thrown (they are called by MessImpl::finish()) */
+ void fire_on_completion_for_real() const { Activity_T<Mess>::fire_on_completion(); }
+ void fire_on_this_completion_for_real() const { Activity_T<Mess>::fire_on_this_completion(); }
+
+public:
+#ifndef DOXYGEN
+ Mess(Mess const&) = delete;
+ Mess& operator=(Mess const&) = delete;
+#endif
+
+ MessPtr set_queue(MessageQueue* queue);
+ MessageQueue* get_queue() const { return queue_; }
+
+ /** Retrieve the payload associated to the message. You can only do that once the message is (gracefully)
+ * terminated */
+ void* get_payload() const { return payload_; }
+ MessPtr set_payload(void* data);
+ MessPtr set_dst_data(void** buff, size_t size); // only allowed while the message is in the INITED state
+ Actor* get_sender() const;
+ Actor* get_receiver() const;
+
+ bool is_assigned() const override { return true; }; // always reported as assigned
+
+ Mess* wait_for(double timeout) override;
+
+ kernel::actor::ActorImpl* sender_ = nullptr; // NOTE(review): public data member, presumably set by MessageQueue - confirm
+ kernel::actor::ActorImpl* receiver_ = nullptr; // NOTE(review): public data member, presumably set by MessageQueue - confirm
+};
+} // namespace simgrid::s4u
+
+#endif /* SIMGRID_S4U_MESS_HPP */
--- /dev/null
+/* Copyright (c) 2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef SIMGRID_S4U_MESSAGEQUEUE_HPP
+#define SIMGRID_S4U_MESSAGEQUEUE_HPP
+
+#include <simgrid/forward.h>
+#include <simgrid/s4u/Mess.hpp>
+#include <smpi/forward.hpp>
+
+#include <string>
+
+namespace simgrid::s4u {
+
+class XBT_PUBLIC MessageQueue { // Named rendez-vous point through which actors exchange Mess activities
+#ifndef DOXYGEN
+ friend Mess;
+ friend kernel::activity::MessageQueueImpl;
+#endif
+
+ kernel::activity::MessageQueueImpl* const pimpl_; // kernel-side implementation, fixed at construction
+
+ explicit MessageQueue(kernel::activity::MessageQueueImpl * mqueue) : pimpl_(mqueue) {}
+ ~MessageQueue() = default;
+
+protected:
+ kernel::activity::MessageQueueImpl* get_impl() const { return pimpl_; }
+
+public:
+ /** @brief Retrieves the name of that message queue as a C++ string */
+ const std::string& get_name() const;
+ /** @brief Retrieves the name of that message queue as a C string */
+ const char* get_cname() const;
+
+ /** \static Retrieve the message queue associated to the given name. Message queues are created on demand. */
+ static MessageQueue* by_name(const std::string& name);
+
+ /** Returns whether the message queue contains queued messages.
+ * NOTE(review): the name suggests this returns true when NO message is queued; confirm against the definition */
+ bool empty() const;
+
+ /** Returns the number of queued messages */
+ size_t size() const;
+
+ /** Gets the first element in the queue (without dequeuing it), or nullptr if none is there */
+ kernel::activity::MessImplPtr front() const;
+
+ /** Creates (but doesn't start) a data transmission to that message queue */
+ MessPtr put_init();
+ /** Creates (but doesn't start) a data transmission to that message queue.
+ *
+ * Please note that if you send a pointer to some data, you must ensure that your data remains live until
+ * consumption, or the receiver will get a pointer to a garbled memory area.
+ */
+ MessPtr put_init(void* payload);
+ /** Creates and starts a data transmission to that message queue.
+ *
+ * Please note that if you send a pointer to some data, you must ensure that your data remains live until
+ * consumption, or the receiver will get a pointer to a garbled memory area.
+ */
+ MessPtr put_async(void* payload);
+
+ /** Blocking data transmission.
+ *
+ * Please note that if you send a pointer to some data, you must ensure that your data remains live until
+ * consumption, or the receiver will get a pointer to a garbled memory area.
+ */
+ void put(void* payload);
+ /** Blocking data transmission with timeout */
+ void put(void* payload, double timeout);
+
+ /** Creates (but doesn't start) a data reception onto that message queue. */
+ MessPtr get_init();
+ /** Creates and starts an async data reception from that message queue. The received pointer is stored in *data. */
+ template <typename T> MessPtr get_async(T** data);
+ /** Creates and starts an async data reception from that message queue. Since the data location is not provided,
+ * you'll have to use Mess::get_payload once the messaging operation terminates */
+ MessPtr get_async();
+
+ /** Blocking data reception */
+ template <typename T> T* get();
+ template <typename T> std::unique_ptr<T> get_unique() { return std::unique_ptr<T>(get<T>()); }
+
+ /** Blocking data reception with timeout */
+ template <typename T> T* get(double timeout);
+ template <typename T> std::unique_ptr<T> get_unique(double timeout) { return std::unique_ptr<T>(get<T>(timeout)); }
+};
+
+/* Start an asynchronous reception whose result pointer will be stored in `*data` */
+template <typename T> MessPtr MessageQueue::get_async(T** data)
+{
+  auto** untyped_dst = reinterpret_cast<void**>(data);
+
+  MessPtr reception = get_init();
+  reception->set_dst_data(untyped_dst, sizeof(void*)); // what gets stored is a pointer, hence that size
+  reception->start();
+  return reception;
+}
+
+/* Blocking reception: start an asynchronous get, wait for it, and return the received pointer */
+template <typename T> T* MessageQueue::get()
+{
+  T* received = nullptr;
+  MessPtr reception = get_async<T>(&received);
+  reception->wait();
+  return received;
+}
+
+/* Blocking reception bounded by `timeout`: same as get(), but waits with wait_for(timeout) */
+template <typename T> T* MessageQueue::get(double timeout)
+{
+  T* received = nullptr;
+  MessPtr reception = get_async<T>(&received);
+  reception->wait_for(timeout);
+  return received;
+}
+} // namespace simgrid::s4u
+
+#endif /* SIMGRID_S4U_MESSAGEQUEUE_HPP */
for (auto const& [_, mailbox] : mailboxes_)
delete mailbox;
+ for (auto const& [_, queue] : mqueues_)
+ delete queue;
+
/* Kill all actors (but maestro) */
maestro_->kill_all();
run_all_actors();
#include "src/kernel/activity/ExecImpl.hpp"
#include "src/kernel/activity/IoImpl.hpp"
#include "src/kernel/activity/MailboxImpl.hpp"
+#include "src/kernel/activity/MessageQueueImpl.hpp"
#include "src/kernel/activity/SleepImpl.hpp"
#include "src/kernel/activity/Synchro.hpp"
#include "src/kernel/actor/ActorImpl.hpp"
class EngineImpl {
std::unordered_map<std::string, routing::NetPoint*> netpoints_;
std::unordered_map<std::string, activity::MailboxImpl*> mailboxes_;
+ std::unordered_map<std::string, activity::MessageQueueImpl*> mqueues_;
std::unordered_map<std::string, actor::ActorCodeFactory> registered_functions; // Maps function names to actor code
actor::ActorCodeFactory default_function; // Function to use as a fallback when the provided name matches nothing
--- /dev/null
+/* Copyright (c) 2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include <simgrid/Exception.hpp>
+#include <simgrid/s4u/Host.hpp>
+
+#include "src/kernel/EngineImpl.hpp"
+#include "src/kernel/activity/MessImpl.hpp"
+#include "src/kernel/activity/MessageQueueImpl.hpp"
+
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(ker_mess, kernel, "Kernel message synchronization");
+
+namespace simgrid::kernel::activity {
+
+/* Destructor: if this message still records a queue, ask that queue to remove it */
+MessImpl::~MessImpl()
+{
+  if (queue_ != nullptr) {
+    queue_->remove(this);
+  }
+}
+
+/* Record whether this message is a PUT or a GET; returns *this so that setters can be chained */
+MessImpl& MessImpl::set_type(MessImplType type)
+{
+  this->type_ = type;
+  return *this;
+}
+
+/* Record the queue currently holding this message (nullptr when none); returns *this for chaining */
+MessImpl& MessImpl::set_queue(MessageQueueImpl* queue)
+{
+  this->queue_ = queue;
+  return *this;
+}
+
+/* Record the pointer conveyed by this message; returns *this for chaining */
+MessImpl& MessImpl::set_payload(void* payload)
+{
+  this->payload_ = payload;
+  return *this;
+}
+
+/* Record where the payload pointer must be stored on completion, and the associated size location;
+ * returns *this for chaining */
+MessImpl& MessImpl::set_dst_buff(unsigned char* buff, size_t* size)
+{
+  this->dst_buff_      = buff;
+  this->dst_buff_size_ = size;
+  return *this;
+}
+
+/* Start the message: a message in the READY state becomes DONE, any other state is left untouched */
+MessImpl* MessImpl::start()
+{
+  const bool is_ready = (get_state() == State::READY);
+  if (is_ready) {
+    set_state(State::DONE);
+  }
+  return this;
+}
+
+ActivityImplPtr MessImpl::iput(actor::MessIputSimcall* observer) // kernel side of an asynchronous put
+{
+ auto* queue = observer->get_queue();
+ XBT_DEBUG("put from message queue %p", queue);
+
+ /* Prepare a PUT synchro describing us; it is only kept if no GET is already waiting in the queue */
+ MessImplPtr this_mess(new MessImpl());
+ this_mess->set_type(MessImplType::PUT);
+
+ /* Look for message synchro matching our needs.
+ *
+ * If it is not found then push our message into the rendez-vous point (the queue).
+ * find_matching_message() removes the GET it returns from the queue. */
+ MessImplPtr other_mess = queue->find_matching_message(MessImplType::GET);
+
+ if (not other_mess) {
+ other_mess = std::move(this_mess); // no receiver yet: our own synchro becomes the shared one
+ queue->push(other_mess);
+ } else {
+ XBT_DEBUG("Get already pushed");
+ other_mess->set_state(State::READY); // both sides are known: start() below turns READY into DONE
+ }
+
+ observer->set_message(other_mess.get());
+ observer->get_issuer()->activities_.insert(other_mess);
+
+ /* Setup synchro: record the sender and the conveyed pointer on the shared synchro */
+ other_mess->src_actor_ = observer->get_issuer();
+ other_mess->payload_ = observer->get_payload();
+ other_mess->start(); // only has an effect when the state is READY (see MessImpl::start())
+
+ return other_mess;
+}
+
+ActivityImplPtr MessImpl::iget(actor::MessIgetSimcall* observer) // kernel side of an asynchronous get
+{
+ MessImplPtr this_synchro(new MessImpl()); // GET synchro describing us, only kept if no PUT is already queued
+ this_synchro->set_type(MessImplType::GET);
+
+ auto* queue = observer->get_queue();
+ XBT_DEBUG("get from message queue %p. this_synchro=%p", queue, this_synchro.get());
+
+ MessImplPtr other_mess = queue->find_matching_message(MessImplType::PUT); // a returned PUT is removed from the queue
+
+ if (other_mess == nullptr) {
+ XBT_DEBUG("Get pushed first (%zu comm enqueued so far)", queue->size());
+ other_mess = std::move(this_synchro); // no sender yet: our own synchro becomes the shared one
+ queue->push(other_mess);
+ } else {
+ XBT_DEBUG("Match my %p with the existing %p", this_synchro.get(), other_mess.get());
+
+ other_mess->set_state(State::READY); // both sides are known: start() below turns READY into DONE
+ }
+
+ observer->get_issuer()->activities_.insert(other_mess);
+ observer->set_message(other_mess.get());
+
+ /* Setup synchro: record the destination location and the receiver on the shared synchro */
+ other_mess->set_dst_buff(observer->get_dst_buff(), observer->get_dst_buff_size());
+ other_mess->dst_actor_ = observer->get_issuer();
+
+ other_mess->start(); // only has an effect when the state is READY (see MessImpl::start())
+
+ return other_mess;
+}
+
+/* Complete this message: fire the completion signals (once), settle the state, leave the queue, store the
+ * payload pointer at the receiver-provided location, and answer every simcall registered on this activity. */
+void MessImpl::finish()
+{
+  XBT_DEBUG("MessImpl::finish() comm %p, state %s, src_proc %p, dst_proc %p", this, get_state_str(),
+            src_actor_.get(), dst_actor_.get());
+
+  if (get_iface()) {
+    const auto& piface = static_cast<const s4u::Mess&>(*get_iface());
+    set_iface(nullptr); // reset iface to protect against multiple trigger of the on_completion signals
+    piface.fire_on_completion_for_real();
+    piface.fire_on_this_completion_for_real();
+  }
+
+  /* Update synchro state */
+  if (get_state() == State::RUNNING) {
+    set_state(State::DONE);
+  }
+
+  /* If the synchro is still in a rendez-vous point then remove from it */
+  if (queue_)
+    queue_->remove(this);
+
+  /* Hand the payload pointer over to the receiver. The destination may be null when the receiver gave no
+   * location (s4u::MessageQueue::get_async() without argument): writing through it would crash. */
+  if (get_state() == State::DONE && payload_ != nullptr && dst_buff_ != nullptr)
+    *reinterpret_cast<void**>(dst_buff_) = payload_;
+
+  while (not simcalls_.empty()) {
+    actor::Simcall* simcall = simcalls_.front();
+    simcalls_.pop_front();
+
+    /* If a waitany simcall is waiting for this synchro to finish, then remove it from the other synchros in the waitany
+     * list. Afterwards, get the position of the actual synchro in the waitany list and return it as the result of the
+     * simcall */
+
+    if (simcall->call_ == actor::Simcall::Type::NONE) // FIXME: maybe a better way to handle this case
+      continue;                                       // if actor handling comm is killed
+
+    handle_activity_waitany(simcall);
+
+    /* Check out for errors */
+
+    if (not simcall->issuer_->get_host()->is_on()) {
+      simcall->issuer_->set_wannadie();
+    } else {
+      // Do not answer to dying actors
+      if (not simcall->issuer_->wannadie()) {
+        set_exception(simcall->issuer_);
+        simcall->issuer_->simcall_answer();
+      }
+    }
+
+    simcall->issuer_->waiting_synchro_ = nullptr;
+    simcall->issuer_->activities_.erase(this);
+  }
+}
+
+} // namespace simgrid::kernel::activity
--- /dev/null
+/* Copyright (c) 2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef SIMGRID_KERNEL_ACTIVITY_MESS_HPP
+#define SIMGRID_KERNEL_ACTIVITY_MESS_HPP
+
+#include "src/kernel/activity/ActivityImpl.hpp"
+#include "src/kernel/actor/ActorImpl.hpp"
+#include "src/kernel/actor/CommObserver.hpp"
+
+namespace simgrid::kernel::activity {
+
+enum class MessImplType { PUT, GET };
+
+class XBT_PUBLIC MessImpl : public ActivityImpl_T<MessImpl> { // Kernel-side activity backing a s4u::Mess
+ ~MessImpl() override;
+
+ MessageQueueImpl* queue_ = nullptr; // queue currently holding this message, nullptr when not queued
+ void* payload_ = nullptr; // pointer conveyed from the sender to the receiver
+ MessImplType type_ = MessImplType::PUT; // which side created this synchro
+ unsigned char* dst_buff_ = nullptr; // where finish() stores the payload pointer
+ size_t* dst_buff_size_ = nullptr; // size location given along with dst_buff_
+
+public:
+ MessImpl& set_type(MessImplType type);
+ MessImplType get_type() const { return type_; }
+ MessImpl& set_payload(void* payload);
+ void* get_payload() { return payload_; }
+
+ MessImpl& set_queue(MessageQueueImpl* queue);
+ MessageQueueImpl* get_queue() const { return queue_; }
+ MessImpl& set_dst_buff(unsigned char* buff, size_t* size);
+
+ static ActivityImplPtr iput(actor::MessIputSimcall* observer); // kernel side of an asynchronous put
+ static ActivityImplPtr iget(actor::MessIgetSimcall* observer); // kernel side of an asynchronous get
+
+ MessImpl* start(); // turns a READY message into a DONE one
+ void set_exception(actor::ActorImpl* issuer) override {}; // intentionally empty: nothing is raised in the issuer
+ void finish() override;
+
+ actor::ActorImplPtr src_actor_ = nullptr; // sender, set by iput()
+ actor::ActorImplPtr dst_actor_ = nullptr; // receiver, set by iget()
+};
+} // namespace simgrid::kernel::activity
+
+#endif
--- /dev/null
+/* Copyright (c) 2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "src/kernel/activity/MessageQueueImpl.hpp"
+
+#include <unordered_map>
+
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(ker_mq, kernel, "Message queue implementation");
+
+namespace simgrid::kernel::activity {
+
+unsigned MessageQueueImpl::next_id_ = 0;
+
+/* Append `mess` at the back of this queue and record this queue as the one holding it */
+void MessageQueueImpl::push(const MessImplPtr& mess)
+{
+  mess->set_queue(this);
+  // `mess` is a const reference: std::move() on it would still copy, so the copy is made explicit
+  queue_.push_back(mess);
+}
+
+/* Take `mess` out of this queue. The message must be recorded as held by this queue and must be present in
+ * it: the simulation is aborted otherwise. */
+void MessageQueueImpl::remove(const MessImplPtr& mess)
+{
+  xbt_assert(mess->get_queue() == this, "Message %p is in queue %s, not queue %s", mess.get(),
+             (mess->get_queue() ? mess->get_queue()->get_cname() : "(null)"), get_cname());
+
+  mess->set_queue(nullptr);
+
+  const auto position = std::find(queue_.begin(), queue_.end(), mess);
+  if (position == queue_.end()) {
+    xbt_die("Message %p not found in queue %s", mess.get(), get_cname());
+  } else {
+    queue_.erase(position);
+  }
+}
+
+/* Extract the oldest queued message of the given type: it is detached from the queue and returned.
+ * Returns nullptr when no queued message has that type. */
+MessImplPtr MessageQueueImpl::find_matching_message(MessImplType type)
+{
+  const auto has_wanted_type = [type](const MessImplPtr& candidate) { return candidate->get_type() == type; };
+  const auto match_pos       = std::find_if(queue_.begin(), queue_.end(), has_wanted_type);
+
+  if (match_pos != queue_.end()) {
+    // Keep our own reference before erasing the queue entry
+    MessImplPtr match = *match_pos;
+    XBT_DEBUG("Found a matching message synchro %p", match.get());
+    match->set_queue(nullptr);
+    queue_.erase(match_pos);
+    return match;
+  }
+
+  XBT_DEBUG("No matching message synchro found");
+  return nullptr;
+}
+
+} // namespace simgrid::kernel::activity
--- /dev/null
+/* Copyright (c) 2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef SIMGRID_KERNEL_ACTIVITY_MESSAGEQUEUE_HPP
+#define SIMGRID_KERNEL_ACTIVITY_MESSAGEQUEUE_HPP
+
+#include "simgrid/s4u/Engine.hpp"
+#include "simgrid/s4u/MessageQueue.hpp"
+#include "src/kernel/activity/MessImpl.hpp"
+
+namespace simgrid::kernel::activity {
+
+/** @brief Implementation of the s4u::MessageQueue
+ *
+ * Holds the public s4u::MessageQueue interface object, the queue name, and the pending messages.
+ * Instances can only be created by the friends listed below and cannot be copied. */
+
+class MessageQueueImpl {
+  s4u::MessageQueue piface_;      // public interface, built around `this`
+  std::string name_;              // name given at creation
+  std::deque<MessImplPtr> queue_; // pending messages, oldest first
+
+  friend s4u::Engine;
+  friend s4u::MessageQueue;
+  friend s4u::MessageQueue* s4u::Engine::message_queue_by_name_or_create(const std::string& name) const;
+  friend s4u::MessageQueue* s4u::MessageQueue::by_name(const std::string& name);
+
+  static unsigned next_id_; // Next ID to be given
+  const unsigned id_ = next_id_++;
+  explicit MessageQueueImpl(const std::string& name) : piface_(this), name_(name) {}
+  // Non-copyable: a copy would embed a piface_ still pointing to the original implementation.
+  // (These used to name MailboxImpl, which left the real copy constructor available.)
+  MessageQueueImpl(const MessageQueueImpl&)            = delete;
+  MessageQueueImpl& operator=(const MessageQueueImpl&) = delete;
+
+public:
+  /** @brief Public interface */
+  unsigned get_id() const { return id_; }
+
+  const s4u::MessageQueue* get_iface() const { return &piface_; }
+  s4u::MessageQueue* get_iface() { return &piface_; }
+
+  const std::string& get_name() const { return name_; }
+  const char* get_cname() const { return name_.c_str(); }
+  /** @brief Append a message at the back of the queue and mark it as held by this queue */
+  void push(const MessImplPtr& mess);
+  /** @brief Take a message out of the queue; it must be present */
+  void remove(const MessImplPtr& mess);
+  bool empty() const { return queue_.empty(); }
+  size_t size() const { return queue_.size(); }
+  /** @brief Oldest queued message; must not be called on an empty queue */
+  const MessImplPtr& front() const { return queue_.front(); }
+
+  /** @brief Extract the oldest queued message of the given type, or nullptr if there is none */
+  MessImplPtr find_matching_message(MessImplType type);
+};
+} // namespace simgrid::kernel::activity
+
+#endif
#include "simgrid/s4u/Host.hpp"
#include "src/kernel/activity/CommImpl.hpp"
#include "src/kernel/activity/MailboxImpl.hpp"
+#include "src/kernel/activity/MessageQueueImpl.hpp"
#include "src/kernel/actor/ActorImpl.hpp"
#include "src/kernel/actor/SimcallObserver.hpp"
#include "src/mc/mc_config.hpp"
XBT_DEBUG("RecvObserver comm:%p mbox:%u tag:%d", comm_, mbox_->get_id(), tag_);
stream << ' ' << fun_call_;
}
+
std::string CommIrecvSimcall::to_string() const
{
return "CommAsyncRecv(comm_id: " + std::to_string(comm_->get_id()) + " mbox:" + std::to_string(mbox_->get_id()) +
" tag: " + std::to_string(tag_) + ")";
}
+/* Write the message and queue pointers of this put observer into `stream`, separated by a space */
+void MessIputSimcall::serialize(std::stringstream& stream) const
+{
+  stream << mess_;
+  stream << ' ';
+  stream << queue_;
+  XBT_DEBUG("PutObserver mess:%p queue:%p", mess_, queue_);
+}
+
+/* Human-readable description of this put observer, naming the target queue */
+std::string MessIputSimcall::to_string() const
+{
+  std::string description = "MessAsyncPut(queue:";
+  description += queue_->get_name();
+  description += ")";
+  return description;
+}
+
+/* Write the message and queue pointers of this get observer into `stream`, separated by a space */
+void MessIgetSimcall::serialize(std::stringstream& stream) const
+{
+  stream << mess_ << ' ' << queue_;
+  // Log label fixed: it used to read "GettObserver"
+  XBT_DEBUG("GetObserver mess:%p queue:%p", mess_, queue_);
+}
+
+/* Human-readable description of this get observer, naming the source queue */
+std::string MessIgetSimcall::to_string() const
+{
+  std::string description = "MessAsyncGet(queue:";
+  description += queue_->get_name();
+  description += ")";
+  return description;
+}
+
+
+
} // namespace simgrid::kernel::actor
auto const& get_copy_data_fun() const { return copy_data_fun_; }
};
+class MessIputSimcall final : public SimcallObserver { // Observer of the asynchronous put simcall on a message queue
+ activity::MessageQueueImpl* queue_; // queue the message is put into
+ void* payload_; // pointer to convey
+ activity::MessImpl* mess_ = {}; // synchro handling the put, set by MessImpl::iput() through set_message()
+
+public:
+ MessIputSimcall(
+ ActorImpl* actor, activity::MessageQueueImpl* queue, void* payload)
+ : SimcallObserver(actor)
+ , queue_(queue)
+ , payload_(payload)
+ {
+ }
+ void serialize(std::stringstream& stream) const override;
+ std::string to_string() const override;
+ activity::MessageQueueImpl* get_queue() const { return queue_; }
+ void* get_payload() const { return payload_; }
+ void set_message(activity::MessImpl* mess) { mess_ = mess; }
+};
+
+class MessIgetSimcall final : public SimcallObserver { // Observer of the asynchronous get simcall on a message queue
+ activity::MessageQueueImpl* queue_; // queue the message is taken from
+ unsigned char* dst_buff_; // location handed to MessImpl::set_dst_buff() by MessImpl::iget()
+ size_t* dst_buff_size_; // size location handed to MessImpl::set_dst_buff() by MessImpl::iget()
+ void* payload_; // NOTE(review): stored and exposed by get_payload(), but not read by MessImpl::iget() - confirm it is needed
+ activity::MessImpl* mess_ = {}; // synchro handling the get, set by MessImpl::iget() through set_message()
+
+public:
+ MessIgetSimcall(ActorImpl* actor, activity::MessageQueueImpl* queue, unsigned char* dst_buff, size_t* dst_buff_size,
+ void* payload)
+ : SimcallObserver(actor)
+ , queue_(queue)
+ , dst_buff_(dst_buff)
+ , dst_buff_size_(dst_buff_size)
+ , payload_(payload)
+ {
+ }
+ void serialize(std::stringstream& stream) const override;
+ std::string to_string() const override;
+ activity::MessageQueueImpl* get_queue() const { return queue_; }
+ unsigned char* get_dst_buff() const { return dst_buff_; }
+ size_t* get_dst_buff_size() const { return dst_buff_size_; }
+ void* get_payload() const { return payload_; }
+ void set_message(activity::MessImpl* mess) { mess_ = mess; }
+};
+
} // namespace simgrid::kernel::actor
#endif
return mbox->get_iface();
}
+/** Return the message queue registered under @p name, creating and registering it first if needed.
+ *
+ *  The lookup and the possible creation both run inside the closure handed to
+ *  kernel::actor::simcall_answered().
+ */
+MessageQueue* Engine::message_queue_by_name_or_create(const std::string& name) const
+{
+  /* two actors may have requested the creation of the same message queue at the same time */
+  kernel::activity::MessageQueueImpl* queue = kernel::actor::simcall_answered([&name, this] {
+    auto [entry, is_new] = pimpl_->mqueues_.try_emplace(name, nullptr);
+    if (not is_new)
+      return entry->second; // somebody registered this name before us: reuse the existing queue
+
+    entry->second = new kernel::activity::MessageQueueImpl(name);
+    XBT_DEBUG("Creating a message queue at %p with name %s", entry->second, name.c_str());
+    return entry->second;
+  });
+  return queue->get_iface();
+}
+
/** @brief Returns the amount of links in the platform */
size_t Engine::get_link_count() const
{
--- /dev/null
+/* Copyright (c) 2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include <cmath>
+#include <simgrid/Exception.hpp>
+#include <simgrid/s4u/ActivitySet.hpp>
+#include <simgrid/s4u/Mess.hpp>
+#include <simgrid/s4u/Engine.hpp>
+#include <simgrid/s4u/MessageQueue.hpp>
+
+#include "src/kernel/activity/MessImpl.hpp"
+#include "src/kernel/actor/ActorImpl.hpp"
+#include "src/kernel/actor/SimcallObserver.hpp"
+
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_mess, s4u_activity, "S4U asynchronous messaging");
+
+namespace simgrid::s4u {
+xbt::signal<void(Mess const&)> Mess::on_send; // fired by do_start() and wait_for() just before the put simcall is issued
+xbt::signal<void(Mess const&)> Mess::on_recv; // fired by do_start() and wait_for() just before the get simcall is issued
+
+/** Record @p queue as the message queue used by this message, and return this message. */
+MessPtr Mess::set_queue(MessageQueue* queue)
+{
+  this->queue_ = queue;
+  return MessPtr(this);
+}
+
+/** Record @p payload as the payload pointer of this message, and return this message. */
+MessPtr Mess::set_payload(void* payload)
+{
+  this->payload_ = payload;
+  return MessPtr(this);
+}
+
+/** Record the destination buffer and its size for this message, and return this message.
+ *
+ *  Asserts that the message is still in the INITED state.
+ */
+MessPtr Mess::set_dst_data(void** buff, size_t size)
+{
+  xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
+             __FUNCTION__);
+
+  this->dst_buff_size_ = size;
+  this->dst_buff_      = buff;
+  return MessPtr(this);
+}
+
+/** Return the interface of the source actor recorded in the kernel-side message, or nullptr when there is no
+ *  kernel-side message yet or when it has no source actor. */
+Actor* Mess::get_sender() const
+{
+  if (not pimpl_)
+    return nullptr;
+
+  kernel::actor::ActorImplPtr src = boost::static_pointer_cast<kernel::activity::MessImpl>(pimpl_)->src_actor_;
+  if (not src)
+    return nullptr;
+  return src->get_ciface();
+}
+
+/** Return the interface of the destination actor recorded in the kernel-side message, or nullptr when there is
+ *  no kernel-side message yet or when it has no destination actor. */
+Actor* Mess::get_receiver() const
+{
+  if (not pimpl_)
+    return nullptr;
+
+  kernel::actor::ActorImplPtr dst = boost::static_pointer_cast<kernel::activity::MessImpl>(pimpl_)->dst_actor_;
+  if (not dst)
+    return nullptr;
+  return dst->get_ciface();
+}
+
+/** Start this message exchange on behalf of the calling actor.
+ *
+ *  When the caller is the recorded sender, the send signals are fired and a put simcall is issued; when it is
+ *  the recorded receiver, the receive signals are fired and a get simcall is issued. Any other caller aborts
+ *  the simulation. The resulting kernel-side message is then bound to this object and the state becomes STARTED.
+ *
+ *  Asserts that the message is in the INITED or STARTING state.
+ */
+Mess* Mess::do_start()
+{
+  xbt_assert(get_state() == State::INITED || get_state() == State::STARTING,
+             "You cannot use %s() once your message exchange has started (not implemented)", __FUNCTION__);
+
+  auto* caller = kernel::actor::ActorImpl::self();
+  if (caller != sender_ && caller != receiver_)
+    xbt_die("Cannot start a message exchange before specifying whether we are the sender or the receiver");
+
+  if (caller == sender_) {
+    // Sender side
+    on_send(*this);
+    on_this_send(*this);
+    kernel::actor::MessIputSimcall put_observer{sender_, queue_->get_impl(), get_payload()};
+    pimpl_ = kernel::actor::simcall_answered(
+        [&put_observer] { return kernel::activity::MessImpl::iput(&put_observer); }, &put_observer);
+  } else {
+    // Receiver side
+    on_recv(*this);
+    on_this_recv(*this);
+    kernel::actor::MessIgetSimcall get_observer{receiver_, queue_->get_impl(), static_cast<unsigned char*>(dst_buff_),
+                                                &dst_buff_size_, get_payload()};
+    pimpl_ = kernel::actor::simcall_answered(
+        [&get_observer] { return kernel::activity::MessImpl::iget(&get_observer); }, &get_observer);
+  }
+
+  pimpl_->set_iface(this);
+  pimpl_->set_actor(sender_);
+
+  // Fire the start signals unless the kernel-side activity is still in the WAITING state
+  const bool still_waiting = (pimpl_->get_state() == kernel::activity::State::WAITING);
+  if (not still_waiting) {
+    fire_on_start();
+    fire_on_this_start();
+  }
+
+  state_ = State::STARTED;
+  return this;
+}
+
+/** Wait for this message exchange, depending on its current state.
+ *
+ *  - FINISHED: nothing to do.
+ *  - FAILED: throws NetworkFailureException. CANCELED: throws CancelException.
+ *  - INITED or STARTING: issues the put simcall when a payload is set, the get simcall otherwise.
+ *    NOTE(review): this branch only issues the asynchronous put/get; it neither blocks nor uses @p timeout
+ *    before the message is marked FINISHED -- confirm this is intended.
+ *  - STARTED: issues a blocking wait simcall with @p timeout; throws TimeoutException when that simcall
+ *    returns true. A NetworkFailureException caught here marks the message FAILED and is rethrown nested.
+ *
+ *  Every path that does not throw ends by marking the message FINISHED.
+ */
+Mess* Mess::wait_for(double timeout)
+{
+  XBT_DEBUG("Calling Mess::wait_for with state %s", get_state_str());
+
+  // States in which waiting is an error
+  if (state_ == State::FAILED)
+    throw NetworkFailureException(XBT_THROW_POINT, "Cannot wait for a failed communication");
+  if (state_ == State::CANCELED)
+    throw CancelException(XBT_THROW_POINT, "Message canceled");
+
+  if (state_ == State::INITED || state_ == State::STARTING) {
+    const bool is_sender = (get_payload() != nullptr);
+    if (is_sender) {
+      on_send(*this);
+      on_this_send(*this);
+      kernel::actor::MessIputSimcall put_observer{sender_, queue_->get_impl(), get_payload()};
+      pimpl_ = kernel::actor::simcall_answered(
+          [&put_observer] { return kernel::activity::MessImpl::iput(&put_observer); }, &put_observer);
+    } else { // Receiver
+      on_recv(*this);
+      on_this_recv(*this);
+      kernel::actor::MessIgetSimcall get_observer{receiver_, queue_->get_impl(),
+                                                  static_cast<unsigned char*>(dst_buff_), &dst_buff_size_,
+                                                  get_payload()};
+      pimpl_ = kernel::actor::simcall_answered(
+          [&get_observer] { return kernel::activity::MessImpl::iget(&get_observer); }, &get_observer);
+    }
+  } else if (state_ == State::STARTED) {
+    // Declared outside of the try block because the handler below needs it
+    kernel::actor::ActorImpl* issuer = nullptr;
+    try {
+      issuer = kernel::actor::ActorImpl::self();
+      kernel::actor::ActivityWaitSimcall wait_observer{issuer, pimpl_.get(), timeout, "Wait"};
+      const bool timed_out = kernel::actor::simcall_blocking(
+          [&wait_observer] {
+            wait_observer.get_activity()->wait_for(wait_observer.get_issuer(), wait_observer.get_timeout());
+          },
+          &wait_observer);
+      if (timed_out)
+        throw TimeoutException(XBT_THROW_POINT, "Timeouted");
+    } catch (const NetworkFailureException& e) {
+      issuer->simcall_.observer_ = nullptr; // Comm failed on network failure, reset the observer to nullptr
+      complete(State::FAILED);
+      e.rethrow_nested(XBT_THROW_POINT, boost::core::demangle(typeid(e).name()) + " raised in kernel mode.");
+    }
+  } else if (state_ != State::FINISHED) {
+    THROW_IMPOSSIBLE;
+  }
+
+  complete(State::FINISHED);
+  return this;
+}
+
+} // namespace simgrid::s4u
--- /dev/null
+/* Copyright (c) 2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include <simgrid/s4u/Engine.hpp>
+#include <simgrid/s4u/Mess.hpp>
+#include <simgrid/s4u/MessageQueue.hpp>
+
+#include "src/kernel/activity/MessageQueueImpl.hpp"
+
+XBT_LOG_EXTERNAL_CATEGORY(s4u);
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_mqueue, s4u, "S4U Message Queues");
+
+namespace simgrid::s4u {
+
+/** Name of this queue, as reported by its kernel-side implementation. */
+const std::string& MessageQueue::get_name() const
+{
+  const std::string& queue_name = pimpl_->get_name();
+  return queue_name;
+}
+
+/** Name of this queue as a C string, as reported by its kernel-side implementation. */
+const char* MessageQueue::get_cname() const
+{
+  const char* queue_cname = pimpl_->get_cname();
+  return queue_cname;
+}
+
+/** Retrieve the message queue called @p name through the engine instance (Engine::message_queue_by_name_or_create()). */
+MessageQueue* MessageQueue::by_name(const std::string& name)
+{
+  auto* engine = Engine::get_instance();
+  return engine->message_queue_by_name_or_create(name);
+}
+
+/** Whether the kernel-side implementation of this queue reports being empty. */
+bool MessageQueue::empty() const
+{
+  const bool is_empty = pimpl_->empty();
+  return is_empty;
+}
+
+/** Size reported by the kernel-side implementation of this queue. */
+size_t MessageQueue::size() const
+{
+  const size_t count = pimpl_->size();
+  return count;
+}
+
+/** Front element of the kernel-side queue, or nullptr when that queue is empty. */
+kernel::activity::MessImplPtr MessageQueue::front() const
+{
+  if (pimpl_->empty())
+    return nullptr;
+  return pimpl_->front();
+}
+
+/** Create a message bound to this queue, with the calling actor recorded as its sender. The message is not started. */
+MessPtr MessageQueue::put_init()
+{
+  MessPtr mess(new Mess());
+  mess->sender_ = kernel::actor::ActorImpl::self();
+  mess->set_queue(this);
+  return mess;
+}
+
+/** Same as put_init(), additionally setting @p payload on the created message. */
+MessPtr MessageQueue::put_init(void* payload)
+{
+  MessPtr mess = put_init();
+  return mess->set_payload(payload);
+}
+
+/** Create a message carrying @p payload on this queue, start it, and return it.
+ *
+ *  Asserts that @p payload is not nullptr.
+ */
+MessPtr MessageQueue::put_async(void* payload)
+{
+  xbt_assert(payload != nullptr, "You cannot send nullptr");
+
+  MessPtr started = put_init(payload);
+  started->start();
+  return started;
+}
+
+/** Start an asynchronous put of @p payload on this queue and wait for it.
+ *
+ *  Asserts that @p payload is not nullptr.
+ */
+void MessageQueue::put(void* payload)
+{
+  xbt_assert(payload != nullptr, "You cannot send nullptr");
+
+  MessPtr pending = put_async(payload);
+  pending->wait();
+}
+
+/** Blocking send with timeout: the message is created, given @p payload, started, then waited for with @p timeout.
+ *
+ *  Asserts that @p payload is not nullptr.
+ */
+void MessageQueue::put(void* payload, double timeout)
+{
+  xbt_assert(payload != nullptr, "You cannot send nullptr");
+
+  MessPtr mess = put_init();
+  mess->set_payload(payload);
+  mess->start()->wait_for(timeout);
+}
+
+/** Create a message bound to this queue, with the calling actor recorded as its receiver. The message is not started. */
+MessPtr MessageQueue::get_init()
+{
+  MessPtr mess(new Mess());
+  mess->receiver_ = kernel::actor::ActorImpl::self();
+  mess->set_queue(this);
+  return mess;
+}
+
+/** Create a receiving message on this queue with a null payload, start it, and return it. */
+MessPtr MessageQueue::get_async()
+{
+  MessPtr mess = get_init();
+  mess->set_payload(nullptr);
+  mess->start();
+  return mess;
+}
+
+} // namespace simgrid::s4u
src/kernel/activity/IoImpl.hpp
src/kernel/activity/MailboxImpl.cpp
src/kernel/activity/MailboxImpl.hpp
+ src/kernel/activity/MessImpl.cpp
+ src/kernel/activity/MessImpl.hpp
+ src/kernel/activity/MessageQueueImpl.cpp
+ src/kernel/activity/MessageQueueImpl.hpp
src/kernel/activity/MutexImpl.cpp
src/kernel/activity/MutexImpl.hpp
src/kernel/activity/SemaphoreImpl.cpp
src/s4u/s4u_Io.cpp
src/s4u/s4u_Link.cpp
src/s4u/s4u_Mailbox.cpp
+ src/s4u/s4u_Mess.cpp
+ src/s4u/s4u_MessageQueue.cpp
src/s4u/s4u_Mutex.cpp
src/s4u/s4u_Netzone.cpp
src/s4u/s4u_Semaphore.cpp
include/simgrid/s4u/Io.hpp
include/simgrid/s4u/Link.hpp
include/simgrid/s4u/Mailbox.hpp
+ include/simgrid/s4u/MessageQueue.hpp
include/simgrid/s4u/Mutex.hpp
include/simgrid/s4u/NetZone.hpp
include/simgrid/s4u/Semaphore.hpp