Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
strenghten the behavior of Message queues after some Wrench breaking
authorFred Suter <suterf@ornl.gov>
Tue, 21 Nov 2023 00:23:34 +0000 (19:23 -0500)
committerFred Suter <suterf@ornl.gov>
Tue, 21 Nov 2023 00:23:34 +0000 (19:23 -0500)
src/kernel/activity/MessImpl.cpp
src/kernel/activity/MessImpl.hpp
src/kernel/activity/MessageQueueImpl.cpp
src/kernel/activity/MessageQueueImpl.hpp

index 28679c2..3a3e307 100644 (file)
@@ -130,9 +130,18 @@ void MessImpl::wait_for(actor::ActorImpl* issuer, double timeout)
   ActivityImpl::wait_for(issuer, timeout);
 }
 
+void MessImpl::cancel()
+{
+  /* if the synchro is a waiting state means that it is still in a mbox so remove from it and delete it */
+  if (get_state() == State::WAITING) {
+      queue_->remove(this);
+      set_state(State::CANCELED);
+  }
+}
+
 void MessImpl::finish()
 {
-  XBT_DEBUG("MessImpl::finish() comm %p, state %s, src_proc %p, dst_proc %p", this, get_state_str(),
+  XBT_DEBUG("MessImpl::finish() mess %p, state %s, src_proc %p, dst_proc %p", this, get_state_str(),
             src_actor_.get(), dst_actor_.get());
 
   if (get_iface()) {
index fad011c..2b2932f 100644 (file)
@@ -39,6 +39,9 @@ public:
   void wait_for(actor::ActorImpl* issuer, double timeout) override;
 
   MessImpl* start();
+  void suspend() override { /* no action to suspend for Mess */ }
+  void resume() override { /* no action to resume for Mess */ }
+  void cancel() override;
   void set_exception(actor::ActorImpl* issuer) override {};
   void finish() override;
 
index 714f86b..d649e6e 100644 (file)
@@ -13,6 +13,29 @@ namespace simgrid::kernel::activity {
 
 unsigned MessageQueueImpl::next_id_ = 0;
 
+MessageQueueImpl::~MessageQueueImpl()
+{
+  try {
+    clear();
+  } catch (const std::bad_alloc& ba) {
+    XBT_ERROR("MessageQueueImpl::clear() failure: %s", ba.what());
+  }
+}
+
+/** @brief Removes all message activities from a message queue */
+void MessageQueueImpl::clear()
+{
+  while (not queue_.empty()) {
+    auto mess = queue_.back();
+    if (mess->get_state() == State::WAITING) {
+      mess->cancel();
+      mess->set_state(State::FAILED);
+    } else
+      queue_.pop_back();
+  }
+  xbt_assert(queue_.empty());
+}
+
 void MessageQueueImpl::push(const MessImplPtr& mess)
 {
   mess->set_queue(this);
index 4bc010f..3016e04 100644 (file)
@@ -31,6 +31,8 @@ class MessageQueueImpl {
   MessageQueueImpl& operator=(const MailboxImpl&) = delete;
 
 public:
+  ~MessageQueueImpl();
+
   /** @brief Public interface */
   unsigned get_id() const { return id_; }
 
@@ -41,6 +43,7 @@ public:
   const char* get_cname() const { return name_.c_str(); }
   void push(const MessImplPtr& mess);
   void remove(const MessImplPtr& mess);
+  void clear();
   bool empty() const { return queue_.empty(); }
   size_t size() const { return queue_.size(); }
   const MessImplPtr& front() const { return queue_.front(); }