Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of https://framagit.org/simgrid/simgrid
[simgrid.git] / examples / cpp / dht-chord / s4u-dht-chord-node.cpp
1 /* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "s4u-dht-chord.hpp"
7
8 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(s4u_chord);
9 namespace sg4 = simgrid::s4u;
10
11 void ChordMessage::destroy(void* message)
12 {
13   delete static_cast<ChordMessage*>(message);
14 }
15
16 /* Returns whether an id belongs to the interval [start, end].
17  *
18  * The parameters are normalized to make sure they are between 0 and nb_keys_ - 1).
19  * 1 belongs to [62, 3]
20  * 1 does not belong to [3, 62]
21  * 63 belongs to [62, 3]
22  * 63 does not belong to [3, 62]
23  * 24 belongs to [21, 29]
24  * 24 does not belong to [29, 21]
25  *
26  * @param id id to check
27  * @param start lower bound
28  * @param end upper bound
29  * @return true if id in in [start, end]
30  */
31 bool Node::is_in_interval(int id, int start, int end)
32 {
33   int i = id % nb_keys_;
34   int s = start % nb_keys_;
35   int e = end % nb_keys_;
36
37   // make sure end >= start and id >= start
38   if (e < s) {
39     e += nb_keys_;
40   }
41
42   if (i < s) {
43     i += nb_keys_;
44   }
45
46   return i <= e;
47 }
48
49 void Node::set_parameters(int nb_bits, int nb_keys, int timeout)
50 {
51   nb_bits_ = nb_bits;
52   nb_keys_ = nb_keys;
53   timeout_ = timeout;
54 }
55
56 /* Initializes the current node as the first one of the system */
57 Node::Node(std::vector<std::string> args)
58 {
59   xbt_assert(args.size() == 3 || args.size() == 5, "Wrong number of arguments for this node");
60
61   // initialize my node
62   id_                = std::stoi(args[1]);
63   XBT_DEBUG("Initialize node with id: %d", id_);
64   random_.set_seed(id_);
65   mailbox_           = sg4::Mailbox::by_name(std::to_string(id_));
66   next_finger_to_fix_ = 0;
67   fingers_.resize(nb_bits_, id_);
68
69   if (args.size() == 3) { // first ring
70     deadline_   = std::stod(args[2]);
71     start_time_ = sg4::Engine::get_clock();
72     XBT_DEBUG("Create a new Chord ring...");
73   } else {
74     known_id_   = std::stoi(args[2]);
75     start_time_ = std::stod(args[3]);
76     deadline_   = std::stod(args[4]);
77     XBT_DEBUG("Hey! Let's join the system in %f seconds (shall leave at time %f)", start_time_,
78               start_time_ + deadline_);
79   }
80 }
81
82 /* Makes the current node join the ring, knowing the id of a node already in the ring
83  *
84  * @param known_id id of a node already in the ring
85  * @return true if the join operation succeeded
86  *  */
87
88 void Node::join(int known_id)
89 {
90   XBT_INFO("Joining the ring with id %d, knowing node %d", id_, known_id);
91   setPredecessor(-1); // no predecessor (yet)
92
93   int successor_id = remoteFindSuccessor(known_id, id_);
94   if (successor_id == -1) {
95     XBT_INFO("Cannot join the ring.");
96   } else {
97     setFinger(0, successor_id);
98     printFingerTable();
99     joined_ = true;
100   }
101 }
102
103 /* Makes the current node quit the system */
104 void Node::leave()
105 {
106   XBT_INFO("Well Guys! I Think it's time for me to leave ;)");
107   notifyAndQuit();
108   joined_ = false;
109 }
110
111 /* Notifies the successor and the predecessor of the current node before leaving */
112 void Node::notifyAndQuit()
113 {
114   // send the PREDECESSOR_LEAVING to our successor
115   auto* pred_msg         = new ChordMessage(MessageType::PREDECESSOR_LEAVING);
116   pred_msg->request_id   = pred_id_;
117   pred_msg->answer_to    = mailbox_;
118
119   XBT_DEBUG("Sending a 'PREDECESSOR_LEAVING' to my successor %d", fingers_[0]);
120   try {
121     sg4::Mailbox::by_name(std::to_string(fingers_[0]))->put(pred_msg, 10, timeout_);
122   } catch (const simgrid::TimeoutException&) {
123     XBT_DEBUG("Timeout expired when sending a 'PREDECESSOR_LEAVING' to my successor %d", fingers_[0]);
124     delete pred_msg;
125   }
126
127   if (pred_id_ != -1 && pred_id_ != id_) {
128     // send the SUCCESSOR_LEAVING to our predecessor (only if I have one that is not me)
129     auto* succ_msg         = new ChordMessage(MessageType::SUCCESSOR_LEAVING);
130     succ_msg->request_id   = fingers_[0];
131     succ_msg->answer_to    = mailbox_;
132     XBT_DEBUG("Sending a 'SUCCESSOR_LEAVING' to my predecessor %d", pred_id_);
133
134     try {
135       sg4::Mailbox::by_name(std::to_string(pred_id_))->put(succ_msg, 10, timeout_);
136     } catch (const simgrid::TimeoutException&) {
137       XBT_DEBUG("Timeout expired when sending a 'SUCCESSOR_LEAVING' to my predecessor %d", pred_id_);
138       delete succ_msg;
139     }
140   }
141 }
142
143 /* Performs a find successor request to a random id */
144 void Node::randomLookup()
145 {
146   int res          = id_;
147   int random_index = random_.uniform_int(0, nb_bits_ - 1);
148   int random_id    = fingers_[random_index];
149   XBT_DEBUG("Making a lookup request for id %d", random_id);
150   if (random_id != id_)
151     res = findSuccessor(random_id);
152   XBT_DEBUG("The successor of node %d is %d", random_id, res);
153 }
154
155 /* Sets a finger of the current node.
156  *
157  * @param node the current node
158  * @param finger_index index of the finger to set (0 to nb_bits_ - 1)
159  * @param id the id to set for this finger
160  */
161 void Node::setFinger(int finger_index, int id)
162 {
163   if (id != fingers_[finger_index]) {
164     fingers_[finger_index] = id;
165     XBT_VERB("My new finger #%d is %d", finger_index, id);
166   }
167 }
168
169 /* Sets the predecessor of the current node.
170  * @param id the id to predecessor, or -1 to unset the predecessor
171  */
172 void Node::setPredecessor(int predecessor_id)
173 {
174   if (predecessor_id != pred_id_) {
175     pred_id_ = predecessor_id;
176     XBT_VERB("My new predecessor is %d", predecessor_id);
177   }
178 }
179
180 /** refreshes the finger table of the current node (called periodically) */
181 void Node::fixFingers()
182 {
183   XBT_DEBUG("Fixing fingers");
184   int id = findSuccessor(id_ + (1U << next_finger_to_fix_));
185   if (id != -1) {
186     if (id != fingers_[next_finger_to_fix_]) {
187       setFinger(next_finger_to_fix_, id);
188       printFingerTable();
189     }
190     next_finger_to_fix_ = (next_finger_to_fix_ + 1) % nb_bits_;
191   }
192 }
193
194 /** Displays the finger table of a node. */
195 void Node::printFingerTable()
196 {
197   if (XBT_LOG_ISENABLED(s4u_chord, xbt_log_priority_verbose)) {
198     XBT_VERB("My finger table:");
199     XBT_VERB("Start | Succ");
200     for (int i = 0; i < nb_bits_; i++) {
201       XBT_VERB(" %3u  | %3d", (id_ + (1U << i)) % nb_keys_, fingers_[i]);
202     }
203
204     XBT_VERB("Predecessor: %d", pred_id_);
205   }
206 }
207
208 /* checks whether the predecessor has failed (called periodically) */
209 void Node::checkPredecessor()
210 {
211   XBT_DEBUG("Checking whether my predecessor is alive");
212   if (pred_id_ == -1)
213     return;
214
215   sg4::Mailbox* mailbox        = sg4::Mailbox::by_name(std::to_string(pred_id_));
216   sg4::Mailbox* return_mailbox = sg4::Mailbox::by_name(std::to_string(id_) + "_is_alive");
217
218   auto* message         = new ChordMessage(MessageType::PREDECESSOR_ALIVE);
219   message->request_id   = pred_id_;
220   message->answer_to    = return_mailbox;
221
222   XBT_DEBUG("Sending a 'Predecessor Alive' request to my predecessor %d", pred_id_);
223   try {
224     mailbox->put(message, 10, timeout_);
225   } catch (const simgrid::TimeoutException&) {
226     XBT_DEBUG("Failed to send the 'Predecessor Alive' request to %d", pred_id_);
227     delete message;
228     return;
229   }
230
231   // receive the answer
232   XBT_DEBUG("Sent 'Predecessor Alive' request to %d, waiting for the answer on my mailbox '%s'", pred_id_,
233             message->answer_to->get_cname());
234   ChordMessage* answer       = nullptr;
235   sg4::CommPtr comm          = return_mailbox->get_async<ChordMessage>(&answer);
236
237   try {
238     comm->wait_for(timeout_);
239     XBT_DEBUG("Received the answer to my 'Predecessor Alive': my predecessor %d is alive", pred_id_);
240     delete answer;
241   } catch (const simgrid::TimeoutException&) {
242     XBT_DEBUG("Failed to receive the answer to my 'Predecessor Alive' request");
243     pred_id_ = -1;
244   }
245 }
246
247 /* Asks its predecessor to a remote node
248  *
249  * @param ask_to the node to ask to
250  * @return the id of its predecessor node, or -1 if the request failed (or if the node does not know its predecessor)
251  */
252 int Node::remoteGetPredecessor(int ask_to)
253 {
254   int predecessor_id                      = -1;
255   sg4::Mailbox* mailbox                   = sg4::Mailbox::by_name(std::to_string(ask_to));
256   sg4::Mailbox* return_mailbox            = sg4::Mailbox::by_name(std::to_string(id_) + "_pred");
257
258   auto* message         = new ChordMessage(MessageType::GET_PREDECESSOR);
259   message->request_id   = id_;
260   message->answer_to    = return_mailbox;
261
262   // send a "Get Predecessor" request to ask_to_id
263   XBT_DEBUG("Sending a 'Get Predecessor' request to %d", ask_to);
264   try {
265     mailbox->put(message, 10, timeout_);
266   } catch (const simgrid::TimeoutException&) {
267     XBT_DEBUG("Failed to send the 'Get Predecessor' request to %d", ask_to);
268     delete message;
269     return predecessor_id;
270   }
271
272   // receive the answer
273   XBT_DEBUG("Sent 'Get Predecessor' request to %d, waiting for the answer on my mailbox '%s'", ask_to,
274             message->answer_to->get_cname());
275   ChordMessage* answer       = nullptr;
276   sg4::CommPtr comm          = return_mailbox->get_async<ChordMessage>(&answer);
277
278   try {
279     comm->wait_for(timeout_);
280     XBT_DEBUG("Received the answer to my 'Get Predecessor' request: the predecessor of node %d is %d", ask_to,
281               answer->answer_id);
282     predecessor_id = answer->answer_id;
283     delete answer;
284   } catch (const simgrid::TimeoutException&) {
285     XBT_DEBUG("Failed to receive the answer to my 'Get Predecessor' request");
286     delete answer;
287   }
288
289   return predecessor_id;
290 }
291
292 /* Returns the closest preceding finger of an id with respect to the finger table of the current node.
293  *
294  * @param id the id to find
295  * @return the closest preceding finger of that id
296  */
297 int Node::closestPrecedingFinger(int id)
298 {
299   for (int i = nb_bits_ - 1; i >= 0; i--) {
300     if (is_in_interval(fingers_[i], id_ + 1, id - 1)) {
301       return fingers_[i];
302     }
303   }
304   return id_;
305 }
306
307 /* Makes the current node find the successor node of an id.
308  *
309  * @param id the id to find
310  * @return the id of the successor node, or -1 if the request failed
311  */
312 int Node::findSuccessor(int id)
313 {
314   // is my successor the successor?
315   if (is_in_interval(id, id_ + 1, fingers_[0])) {
316     return fingers_[0];
317   }
318
319   // otherwise, ask the closest preceding finger in my table
320   return remoteFindSuccessor(closestPrecedingFinger(id), id);
321 }
322
323 int Node::remoteFindSuccessor(int ask_to, int id)
324 {
325   int successor                           = -1;
326   sg4::Mailbox* mailbox                   = sg4::Mailbox::by_name(std::to_string(ask_to));
327   sg4::Mailbox* return_mailbox            = sg4::Mailbox::by_name(std::to_string(id_) + "_succ");
328
329   auto* message         = new ChordMessage(MessageType::FIND_SUCCESSOR);
330   message->request_id   = id_;
331   message->answer_to    = return_mailbox;
332
333   // send a "Find Successor" request to ask_to_id
334   XBT_DEBUG("Sending a 'Find Successor' request to %d for id %d", ask_to, id);
335   try {
336     mailbox->put(message, 10, timeout_);
337   } catch (const simgrid::TimeoutException&) {
338     XBT_DEBUG("Failed to send the 'Find Successor' request to %d for id %d", ask_to, id_);
339     delete message;
340     return successor;
341   }
342   // receive the answer
343   XBT_DEBUG("Sent a 'Find Successor' request to %d for key %d, waiting for the answer", ask_to, id);
344   ChordMessage* answer       = nullptr;
345   sg4::CommPtr comm          = return_mailbox->get_async<ChordMessage>(&answer);
346
347   try {
348     comm->wait_for(timeout_);
349     XBT_DEBUG("Received the answer to my 'Find Successor' request for id %d: the successor of key %d is %d",
350               answer->request_id, id_, answer->answer_id);
351     successor = answer->answer_id;
352     delete answer;
353   } catch (const simgrid::TimeoutException&) {
354     XBT_DEBUG("Failed to receive the answer to my 'Find Successor' request");
355     delete answer;
356   }
357
358   return successor;
359 }
360
361 /* Notifies the current node that its predecessor may have changed. */
362 void Node::notify(int predecessor_candidate_id)
363 {
364   if (pred_id_ == -1 || is_in_interval(predecessor_candidate_id, pred_id_ + 1, id_ - 1)) {
365     setPredecessor(predecessor_candidate_id);
366     printFingerTable();
367   } else {
368     XBT_DEBUG("I don't have to change my predecessor to %d", predecessor_candidate_id);
369   }
370 }
371
372 /* Notifies a remote node that its predecessor may have changed. */
373 void Node::remoteNotify(int notify_id, int predecessor_candidate_id) const
374 {
375   auto* message         = new ChordMessage(MessageType::NOTIFY);
376   message->request_id   = predecessor_candidate_id;
377   message->answer_to    = nullptr;
378
379   // send a "Notify" request to notify_id
380   XBT_DEBUG("Sending a 'Notify' request to %d", notify_id);
381   sg4::Mailbox* mailbox = sg4::Mailbox::by_name(std::to_string(notify_id));
382   mailbox->put_init(message, 10)->detach(ChordMessage::destroy);
383 }
384
385 /* This function is called periodically. It checks the immediate successor of the current node. */
386 void Node::stabilize()
387 {
388   XBT_DEBUG("Stabilizing node");
389
390   // get the predecessor of my immediate successor
391   int candidate_id = pred_id_;
392   int successor_id = fingers_[0];
393   if (successor_id != id_)
394     candidate_id = remoteGetPredecessor(successor_id);
395
396   // this node is a candidate to become my new successor
397   if (candidate_id != -1 && is_in_interval(candidate_id, id_ + 1, successor_id - 1)) {
398     setFinger(0, candidate_id);
399   }
400   if (successor_id != id_) {
401     remoteNotify(successor_id, id_);
402   }
403 }
404
405 /* This function is called when a node receives a message.
406  *
407  * @param message the message to handle (don't touch it afterward: it will be destroyed, reused or forwarded)
408  */
409 void Node::handleMessage(ChordMessage* message)
410 {
411   switch (message->type) {
412     case MessageType::FIND_SUCCESSOR:
413       XBT_DEBUG("Received a 'Find Successor' request from %s for id %d", message->issuer_host_name.c_str(),
414                 message->request_id);
415       // is my successor the successor?
416       if (is_in_interval(message->request_id, id_ + 1, fingers_[0])) {
417         message->type      = MessageType::FIND_SUCCESSOR_ANSWER;
418         message->answer_id = fingers_[0];
419         XBT_DEBUG("Sending back a 'Find Successor Answer' to %s (mailbox %s): the successor of %d is %d",
420                   message->issuer_host_name.c_str(), message->answer_to->get_cname(), message->request_id,
421                   message->answer_id);
422         message->answer_to->put_init(message, 10)->detach(ChordMessage::destroy);
423       } else {
424         // otherwise, forward the request to the closest preceding finger in my table
425         int closest = closestPrecedingFinger(message->request_id);
426         XBT_DEBUG("Forwarding the 'Find Successor' request for id %d to my closest preceding finger %d",
427                   message->request_id, closest);
428         sg4::Mailbox* mailbox = sg4::Mailbox::by_name(std::to_string(closest));
429         mailbox->put_init(message, 10)->detach(ChordMessage::destroy);
430       }
431       break;
432
433     case MessageType::GET_PREDECESSOR:
434       XBT_DEBUG("Receiving a 'Get Predecessor' request from %s", message->issuer_host_name.c_str());
435       message->type      = MessageType::GET_PREDECESSOR_ANSWER;
436       message->answer_id = pred_id_;
437       XBT_DEBUG("Sending back a 'Get Predecessor Answer' to %s via mailbox '%s': my predecessor is %d",
438                 message->issuer_host_name.c_str(), message->answer_to->get_cname(), message->answer_id);
439       message->answer_to->put_init(message, 10)->detach(ChordMessage::destroy);
440       break;
441
442     case MessageType::NOTIFY:
443       // someone is telling me that he may be my new predecessor
444       XBT_DEBUG("Receiving a 'Notify' request from %s", message->issuer_host_name.c_str());
445       notify(message->request_id);
446       delete message;
447       break;
448
449     case MessageType::PREDECESSOR_LEAVING:
450       // my predecessor is about to quit
451       XBT_DEBUG("Receiving a 'Predecessor Leaving' message from %s", message->issuer_host_name.c_str());
452       // modify my predecessor
453       setPredecessor(message->request_id);
454       delete message;
455       /*TODO :
456         >> notify my new predecessor
457         >> send a notify_predecessors !!
458        */
459       break;
460
461     case MessageType::SUCCESSOR_LEAVING:
462       // my successor is about to quit
463       XBT_DEBUG("Receiving a 'Successor Leaving' message from %s", message->issuer_host_name.c_str());
464       // modify my successor FIXME : this should be implicit ?
465       setFinger(0, message->request_id);
466       delete message;
467       /* TODO
468          >> notify my new successor
469          >> update my table & predecessors table */
470       break;
471
472     case MessageType::PREDECESSOR_ALIVE:
473       XBT_DEBUG("Receiving a 'Predecessor Alive' request from %s", message->issuer_host_name.c_str());
474       message->type = MessageType::PREDECESSOR_ALIVE_ANSWER;
475       XBT_DEBUG("Sending back a 'Predecessor Alive Answer' to %s (mailbox %s)", message->issuer_host_name.c_str(),
476                 message->answer_to->get_cname());
477       message->answer_to->put_init(message, 10)->detach(ChordMessage::destroy);
478       break;
479
480     default:
481       XBT_DEBUG("Ignoring unexpected message: %d from %s", static_cast<int>(message->type),
482                 message->issuer_host_name.c_str());
483       delete message;
484   }
485 }
486
487 void Node::operator()()
488 {
489   sg4::this_actor::sleep_for(start_time_);
490   if (known_id_ == -1) {
491     setPredecessor(-1); // -1 means that I have no predecessor
492     printFingerTable();
493     joined_ = true;
494   } else {
495     join(known_id_);
496   }
497
498   if (not joined_)
499     return;
500   ChordMessage* message              = nullptr;
501   double now                         = sg4::Engine::get_clock();
502   double next_stabilize_date         = start_time_ + PERIODIC_STABILIZE_DELAY;
503   double next_fix_fingers_date       = start_time_ + PERIODIC_FIX_FINGERS_DELAY;
504   double next_check_predecessor_date = start_time_ + PERIODIC_CHECK_PREDECESSOR_DELAY;
505   double next_lookup_date            = start_time_ + PERIODIC_LOOKUP_DELAY;
506   sg4::CommPtr comm_receive          = nullptr;
507   while (now < std::min(start_time_ + deadline_, MAX_SIMULATION_TIME)) {
508     if (comm_receive == nullptr)
509       comm_receive = mailbox_->get_async<ChordMessage>(&message);
510     bool comm_completed = true;
511     try {
512       if (not comm_receive->test())
513         comm_completed = false;
514     } catch (const simgrid::TimeoutException&) {
515       XBT_DEBUG("Caught a timeout, go ahead.");
516     }
517
518     if (comm_completed) {
519       if (message != nullptr) {
520         handleMessage(message);
521         message = nullptr;
522       }
523       comm_receive = nullptr;
524     } else {
525       // no task was received: make some periodic calls
526       if (now >= next_stabilize_date) {
527         stabilize();
528         next_stabilize_date = sg4::Engine::get_clock() + PERIODIC_STABILIZE_DELAY;
529       } else if (now >= next_fix_fingers_date) {
530         fixFingers();
531         next_fix_fingers_date = sg4::Engine::get_clock() + PERIODIC_FIX_FINGERS_DELAY;
532       } else if (now >= next_check_predecessor_date) {
533         checkPredecessor();
534         next_check_predecessor_date = sg4::Engine::get_clock() + PERIODIC_CHECK_PREDECESSOR_DELAY;
535       } else if (now >= next_lookup_date) {
536         randomLookup();
537         next_lookup_date = sg4::Engine::get_clock() + PERIODIC_LOOKUP_DELAY;
538       } else {
539         // nothing to do: sleep for a while
540         sg4::this_actor::sleep_for(SLEEP_DELAY);
541       }
542     }
543
544     now = sg4::Engine::get_clock();
545   }
546   if (comm_receive != nullptr) {
547     try {
548       if (comm_receive->test())
549         delete message;
550       else
551         comm_receive->cancel();
552     } catch (const simgrid::TimeoutException&) {
553       XBT_DEBUG("Caught a timeout for last message, nevermind.");
554     }
555   }
556   // leave the ring
557   leave();
558 }