1 /* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "routing_table.hpp"
9 XBT_LOG_NEW_DEFAULT_CATEGORY(kademlia_node, "Messages specific for this example");
10 namespace sg4 = simgrid::s4u;
13 static void destroy(void* message)
15 const auto* msg = static_cast<Message*>(message);
20 * Try to asynchronously get a new message from given mailbox. Return null if none available.
22 Message* Node::receive(sg4::Mailbox* mailbox)
24 if (receive_comm == nullptr)
25 receive_comm = mailbox->get_async<kademlia::Message>(&received_msg);
26 if (not receive_comm->test())
28 receive_comm = nullptr;
33 * @brief Tries to join the network
34 * @param known_id id of the node I know in the network.
36 bool Node::join(unsigned int known_id)
38 bool got_answer = false;
40 /* Add the guy we know to our routing table and ourselves. */
41 routingTableUpdate(id_);
42 routingTableUpdate(known_id);
44 /* First step: Send a "FIND_NODE" request to the node we know */
45 sendFindNode(known_id, id_);
47 sg4::Mailbox* mailbox = sg4::Mailbox::by_name(std::to_string(id_));
49 if (Message* msg = receive(mailbox)) {
50 XBT_DEBUG("Received an answer from the node I know.");
52 // retrieve the node list and ping them.
53 if (const Answer* node_list = msg->answer_.get()) {
54 for (auto const& [contact, _] : node_list->getNodes())
55 routingTableUpdate(contact);
61 sg4::this_actor::sleep_for(1);
62 } while (not got_answer);
64 /* Second step: Send a FIND_NODE to a random node in buckets */
65 unsigned int bucket_id = table.findBucket(known_id)->getId();
66 xbt_assert(bucket_id <= IDENTIFIER_SIZE);
67 for (unsigned int i = 0; ((bucket_id > i) || (bucket_id + i) <= IDENTIFIER_SIZE) && i < JOIN_BUCKETS_QUERIES; i++) {
69 unsigned int id_in_bucket = get_id_in_prefix(id_, bucket_id - i);
70 findNode(id_in_bucket, false);
72 if (bucket_id + i <= IDENTIFIER_SIZE) {
73 unsigned int id_in_bucket = get_id_in_prefix(id_, bucket_id + i);
74 findNode(id_in_bucket, false);
80 /** @brief Send a "FIND_NODE" to a node
81 * @param id node we are querying
82 * @param destination node we are trying to find.
84 void Node::sendFindNode(unsigned int id, unsigned int destination) const
86 /* Gets the mailbox to send to */
87 sg4::Mailbox* mailbox = sg4::Mailbox::by_name(std::to_string(id));
91 new Message(id_, destination, sg4::Mailbox::by_name(std::to_string(id_)), sg4::Host::current()->get_cname());
94 mailbox->put_init(msg, 1)->detach(kademlia::destroy);
95 XBT_VERB("Asking %u for its closest nodes", id);
99 * Sends to the best "KADEMLIA_ALPHA" nodes in the "node_list" array a "FIND_NODE" request, to ask them for their best
102 unsigned int Node::sendFindNodeToBest(const Answer* node_list) const
106 unsigned int destination = node_list->getDestinationId();
107 for (auto const& [node_to_query, _] : node_list->getNodes()) {
108 /* We need to have at most "KADEMLIA_ALPHA" requests each time, according to the protocol */
109 /* Gets the node we want to send the query to */
110 if (node_to_query != id_) { /* No need to query ourselves */
111 sendFindNode(node_to_query, destination);
115 if (j == KADEMLIA_ALPHA)
121 /** @brief Updates/Puts the node id unsigned into our routing table
122 * @param id The id of the node we need to add unsigned into our routing table
124 void Node::routingTableUpdate(unsigned int id)
126 // retrieval of the bucket in which the should be
127 Bucket* bucket = table.findBucket(id);
129 // check if the id is already in the bucket.
130 auto id_pos = std::find(bucket->nodes_.begin(), bucket->nodes_.end(), id);
132 if (id_pos == bucket->nodes_.end()) {
133 /* We check if the bucket is full or not. If it is, we evict an old element */
134 if (bucket->nodes_.size() >= BUCKET_SIZE) {
135 bucket->nodes_.pop_back();
137 bucket->nodes_.push_front(id);
138 XBT_VERB("I'm adding to my routing table %08x", id);
140 // We push the element to the front
141 bucket->nodes_.erase(id_pos);
142 bucket->nodes_.push_front(id);
143 XBT_VERB("I'm updating %08x", id);
147 /** @brief Finds the closest nodes to the node given.
148 * @param node : our node
149 * @param destination_id : the id of the guy we are trying to find
151 std::unique_ptr<Answer> Node::findClosest(unsigned int destination_id)
153 auto answer = std::make_unique<Answer>(destination_id);
154 /* We find the corresponding bucket for the id */
155 const Bucket* bucket = table.findBucket(destination_id);
156 int bucket_id = bucket->getId();
157 xbt_assert((bucket_id <= IDENTIFIER_SIZE), "Bucket found has a wrong identifier");
158 /* So, we copy the contents of the bucket unsigned into our answer */
159 answer->addBucket(bucket);
161 /* However, if we don't have enough elements in our bucket, we NEED to include at least "BUCKET_SIZE" elements
162 * (if, of course, we know at least "BUCKET_SIZE" elements. So we're going to look unsigned into the other buckets.
164 for (int i = 1; answer->getSize() < BUCKET_SIZE && ((bucket_id - i > 0) || (bucket_id + i < IDENTIFIER_SIZE)); i++) {
165 /* We check the previous buckets */
166 if (bucket_id - i >= 0) {
167 const Bucket* bucket_p = &table.getBucketAt(bucket_id - i);
168 answer->addBucket(bucket_p);
170 /* We check the next buckets */
171 if (bucket_id + i <= IDENTIFIER_SIZE) {
172 const Bucket* bucket_n = &table.getBucketAt(bucket_id + i);
173 answer->addBucket(bucket_n);
176 /* We trim the array to have only BUCKET_SIZE or less elements */
182 /** @brief Send a request to find a node in the node routing table.
183 * @param id_to_find the id of the node we are trying to find
185 bool Node::findNode(unsigned int id_to_find, bool count_in_stats)
187 unsigned int queries;
188 unsigned int answers;
189 bool destination_found = false;
190 unsigned int nodes_added = 0;
191 double global_timeout = sg4::Engine::get_clock() + FIND_NODE_GLOBAL_TIMEOUT;
192 unsigned int steps = 0;
194 /* First we build a list of who we already know */
195 std::unique_ptr<Answer> node_list = findClosest(id_to_find);
196 xbt_assert((node_list != nullptr), "node_list incorrect");
197 XBT_DEBUG("Doing a FIND_NODE on %08x", id_to_find);
199 /* Ask the nodes on our list if they have information about the node we are trying to find */
202 queries = sendFindNodeToBest(node_list.get());
204 double timeout = sg4::Engine::get_clock() + FIND_NODE_TIMEOUT;
206 double time_beginreceive = sg4::Engine::get_clock();
208 sg4::Mailbox* mailbox = sg4::Mailbox::by_name(std::to_string(id_));
210 if (Message* msg = receive(mailbox)) {
211 // Check if what we have received is what we are looking for.
212 if (msg->answer_ && msg->answer_->getDestinationId() == id_to_find) {
213 routingTableUpdate(msg->sender_id_);
215 for (auto const& [contact, _] : node_list->getNodes())
216 routingTableUpdate(contact);
219 nodes_added = node_list->merge(msg->answer_.get());
220 XBT_DEBUG("Received an answer from %s (%s) with %zu nodes on it", msg->answer_to_->get_cname(),
221 msg->issuer_host_name_.c_str(), msg->answer_->getSize());
224 routingTableUpdate(msg->sender_id_);
225 XBT_DEBUG("Received a wrong answer for a FIND_NODE");
229 // Update the timeout if we didn't have our answer
230 timeout += sg4::Engine::get_clock() - time_beginreceive;
231 time_beginreceive = sg4::Engine::get_clock();
235 sg4::this_actor::sleep_for(1);
237 } while (sg4::Engine::get_clock() < timeout && answers < queries);
238 destination_found = node_list->destinationFound();
239 } while (not destination_found && (nodes_added > 0 || answers == 0) && sg4::Engine::get_clock() < global_timeout &&
242 if (destination_found) {
246 XBT_VERB("FIND_NODE on %08x success in %u steps", id_to_find, steps);
247 routingTableUpdate(id_to_find);
249 if (count_in_stats) {
251 XBT_VERB("%08x not found in %u steps", id_to_find, steps);
254 return destination_found;
257 /** @brief Does a pseudo-random lookup for someone in the system
258 * @param node caller node data
260 void Node::randomLookup()
262 unsigned int id_to_look = RANDOM_LOOKUP_NODE; // Totally random.
263 /* TODO: Use some pseudo-random generator. */
264 XBT_DEBUG("I'm doing a random lookup");
265 findNode(id_to_look, true);
268 /** @brief Handles the answer to an incoming "find_node" task */
269 void Node::handleFindNode(const Message* msg)
271 routingTableUpdate(msg->sender_id_);
272 XBT_VERB("Received a FIND_NODE from %s (%s), he's trying to find %08x", msg->answer_to_->get_cname(),
273 msg->issuer_host_name_.c_str(), msg->destination_id_);
274 // Building the answer to the request
275 auto* answer = new Message(id_, msg->destination_id_, findClosest(msg->destination_id_),
276 sg4::Mailbox::by_name(std::to_string(id_)), sg4::Host::current()->get_cname());
277 // Sending the answer
278 msg->answer_to_->put_init(answer, 1)->detach(kademlia::destroy);
281 void Node::displaySuccessRate() const
283 XBT_INFO("%u/%u FIND_NODE have succeeded", find_node_success, find_node_success + find_node_failed);
285 } // namespace kademlia
/**@brief Returns an identifier which is in a specific bucket of a routing table
 * @param id id of the routing table owner
 * @param prefix id of the bucket where we want that identifier to be; must not exceed the bit width of unsigned int
 * @return id with bit (prefix - 1) flipped, or 0 when prefix is 0
 */
unsigned int get_id_in_prefix(unsigned int id, unsigned int prefix)
{
  /* Bucket 0 has no bit to flip: (prefix - 1) would wrap around and make the shift below undefined */
  if (prefix == 0)
    return 0;
  return (1U << (prefix - 1)) ^ id;
}
299 /** @brief Returns the prefix of an identifier.
300 * The prefix is the id of the bucket in which the remote identifier xor our identifier should be stored.
301 * @param id : big unsigned int id to test
302 * @param nb_bits : key size
304 unsigned int get_node_prefix(unsigned int id, unsigned int nb_bits)
306 unsigned int size = sizeof(unsigned int) * 8;
307 for (unsigned int j = 0; j < size; j++) {
308 if (((id >> (size - 1 - j)) & 0x1) != 0) {