Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
strenghten the behavior of Message queues after some Wrench breaking
[simgrid.git] / src / kernel / activity / MessageQueueImpl.cpp
1 /* Copyright (c) 2023. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "src/kernel/activity/MessageQueueImpl.hpp"
7
8 #include <unordered_map>
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(ker_mq, kernel, "Message queue implementation");
11
12 namespace simgrid::kernel::activity {
13
14 unsigned MessageQueueImpl::next_id_ = 0;
15
16 MessageQueueImpl::~MessageQueueImpl()
17 {
18   try {
19     clear();
20   } catch (const std::bad_alloc& ba) {
21     XBT_ERROR("MessageQueueImpl::clear() failure: %s", ba.what());
22   }
23 }
24
25 /** @brief Removes all message activities from a message queue */
26 void MessageQueueImpl::clear()
27 {
28   while (not queue_.empty()) {
29     auto mess = queue_.back();
30     if (mess->get_state() == State::WAITING) {
31       mess->cancel();
32       mess->set_state(State::FAILED);
33     } else
34       queue_.pop_back();
35   }
36   xbt_assert(queue_.empty());
37 }
38
39 void MessageQueueImpl::push(const MessImplPtr& mess)
40 {
41   mess->set_queue(this);
42   this->queue_.push_back(std::move(mess));
43 }
44
45 void MessageQueueImpl::remove(const MessImplPtr& mess)
46 {
47   xbt_assert(mess->get_queue() == this, "Message %p is in queue %s, not queue %s", mess.get(),
48              (mess->get_queue() ? mess->get_queue()->get_cname() : "(null)"), get_cname());
49
50   mess->set_queue(nullptr);
51   auto it = std::find(queue_.begin(), queue_.end(), mess);
52   if (it != queue_.end())
53     queue_.erase(it);
54   else
55     xbt_die("Message %p not found in queue %s", mess.get(), get_cname());
56 }
57
58 MessImplPtr MessageQueueImpl::find_matching_message(MessImplType type)
59 {
60   auto iter = std::find_if(queue_.begin(), queue_.end(), [&type](const MessImplPtr& mess)
61   {
62     return (mess->get_type() == type);
63   });
64   if (iter == queue_.end()) {
65     XBT_DEBUG("No matching message synchro found");
66     return nullptr;
67   }
68
69   const MessImplPtr& mess = *iter;
70   XBT_DEBUG("Found a matching message synchro %p", mess.get());
71   mess->set_queue(nullptr);
72   MessImplPtr mess_cpy = mess;
73   queue_.erase(iter);
74   return mess_cpy;
75 }
76
77 } // namespace simgrid::kernel::activity