Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
4e412f298195ecacf6b249a998b891c811583f05
[simgrid.git] / examples / cpp / dht-chord / s4u-dht-chord-node.cpp
1 /* Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "s4u-dht-chord.hpp"
7
8 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(s4u_chord);
9 namespace sg4 = simgrid::s4u;
10
11 /* Returns whether an id belongs to the interval [start, end].
12  *
13  * The parameters are normalized to make sure they are between 0 and nb_keys - 1).
14  * 1 belongs to [62, 3]
15  * 1 does not belong to [3, 62]
16  * 63 belongs to [62, 3]
17  * 63 does not belong to [3, 62]
18  * 24 belongs to [21, 29]
19  * 24 does not belong to [29, 21]
20  *
21  * @param id id to check
22  * @param start lower bound
23  * @param end upper bound
24  * @return true if id in in [start, end]
25  */
26 static bool is_in_interval(int id, int start, int end)
27 {
28   int i = id % nb_keys;
29   int s = start % nb_keys;
30   int e = end % nb_keys;
31
32   // make sure end >= start and id >= start
33   if (e < s) {
34     e += nb_keys;
35   }
36
37   if (i < s) {
38     i += nb_keys;
39   }
40
41   return i <= e;
42 }
43
44 void ChordMessage::destroy(void* message)
45 {
46   delete static_cast<ChordMessage*>(message);
47 }
48
49 /* Initializes the current node as the first one of the system */
50 Node::Node(std::vector<std::string> args)
51 {
52   xbt_assert(args.size() == 3 || args.size() == 5, "Wrong number of arguments for this node");
53
54   // initialize my node
55   id_                = std::stoi(args[1]);
56   XBT_DEBUG("Initialize node with id: %d", id_);
57   random.set_seed(id_);
58   mailbox_           = sg4::Mailbox::by_name(std::to_string(id_));
59   next_finger_to_fix = 0;
60   fingers_.resize(nb_bits, id_);
61
62   if (args.size() == 3) { // first ring
63     deadline_   = std::stod(args[2]);
64     start_time_ = sg4::Engine::get_clock();
65     XBT_DEBUG("Create a new Chord ring...");
66   } else {
67     known_id_   = std::stoi(args[2]);
68     start_time_ = std::stod(args[3]);
69     deadline_   = std::stod(args[4]);
70     XBT_DEBUG("Hey! Let's join the system in %f seconds (shall leave at time %f)", start_time_,
71               start_time_ + deadline_);
72   }
73 }
74
75 /* Makes the current node join the ring, knowing the id of a node already in the ring
76  *
77  * @param known_id id of a node already in the ring
78  * @return true if the join operation succeeded
79  *  */
80
81 void Node::join(int known_id)
82 {
83   XBT_INFO("Joining the ring with id %d, knowing node %d", id_, known_id);
84   setPredecessor(-1); // no predecessor (yet)
85
86   int successor_id = remoteFindSuccessor(known_id, id_);
87   if (successor_id == -1) {
88     XBT_INFO("Cannot join the ring.");
89   } else {
90     setFinger(0, successor_id);
91     printFingerTable();
92     joined = true;
93   }
94 }
95
96 /* Makes the current node quit the system */
97 void Node::leave()
98 {
99   XBT_INFO("Well Guys! I Think it's time for me to leave ;)");
100   notifyAndQuit();
101   joined = false;
102 }
103
104 /* Notifies the successor and the predecessor of the current node before leaving */
105 void Node::notifyAndQuit()
106 {
107   // send the PREDECESSOR_LEAVING to our successor
108   auto* pred_msg         = new ChordMessage(MessageType::PREDECESSOR_LEAVING);
109   pred_msg->request_id   = pred_id_;
110   pred_msg->answer_to    = mailbox_;
111
112   XBT_DEBUG("Sending a 'PREDECESSOR_LEAVING' to my successor %d", fingers_[0]);
113   try {
114     sg4::Mailbox::by_name(std::to_string(fingers_[0]))->put(pred_msg, 10, timeout);
115   } catch (const simgrid::TimeoutException&) {
116     XBT_DEBUG("Timeout expired when sending a 'PREDECESSOR_LEAVING' to my successor %d", fingers_[0]);
117     delete pred_msg;
118   }
119
120   if (pred_id_ != -1 && pred_id_ != id_) {
121     // send the SUCCESSOR_LEAVING to our predecessor (only if I have one that is not me)
122     auto* succ_msg         = new ChordMessage(MessageType::SUCCESSOR_LEAVING);
123     succ_msg->request_id   = fingers_[0];
124     succ_msg->answer_to    = mailbox_;
125     XBT_DEBUG("Sending a 'SUCCESSOR_LEAVING' to my predecessor %d", pred_id_);
126
127     try {
128       sg4::Mailbox::by_name(std::to_string(pred_id_))->put(succ_msg, 10, timeout);
129     } catch (const simgrid::TimeoutException&) {
130       XBT_DEBUG("Timeout expired when sending a 'SUCCESSOR_LEAVING' to my predecessor %d", pred_id_);
131       delete succ_msg;
132     }
133   }
134 }
135
136 /* Performs a find successor request to a random id */
137 void Node::randomLookup()
138 {
139   int res          = id_;
140   int random_index = random.uniform_int(0, nb_bits - 1);
141   int random_id    = fingers_[random_index];
142   XBT_DEBUG("Making a lookup request for id %d", random_id);
143   if (random_id != id_)
144     res = findSuccessor(random_id);
145   XBT_DEBUG("The successor of node %d is %d", random_id, res);
146 }
147
148 /* Sets a finger of the current node.
149  *
150  * @param node the current node
151  * @param finger_index index of the finger to set (0 to nb_bits - 1)
152  * @param id the id to set for this finger
153  */
154 void Node::setFinger(int finger_index, int id)
155 {
156   if (id != fingers_[finger_index]) {
157     fingers_[finger_index] = id;
158     XBT_VERB("My new finger #%d is %d", finger_index, id);
159   }
160 }
161
162 /* Sets the predecessor of the current node.
163  * @param id the id to predecessor, or -1 to unset the predecessor
164  */
165 void Node::setPredecessor(int predecessor_id)
166 {
167   if (predecessor_id != pred_id_) {
168     pred_id_ = predecessor_id;
169     XBT_VERB("My new predecessor is %d", predecessor_id);
170   }
171 }
172
173 /** refreshes the finger table of the current node (called periodically) */
174 void Node::fixFingers()
175 {
176   XBT_DEBUG("Fixing fingers");
177   int id = findSuccessor(id_ + (1U << next_finger_to_fix));
178   if (id != -1) {
179     if (id != fingers_[next_finger_to_fix]) {
180       setFinger(next_finger_to_fix, id);
181       printFingerTable();
182     }
183     next_finger_to_fix = (next_finger_to_fix + 1) % nb_bits;
184   }
185 }
186
187 /** Displays the finger table of a node. */
188 void Node::printFingerTable()
189 {
190   if (XBT_LOG_ISENABLED(s4u_chord, xbt_log_priority_verbose)) {
191     XBT_VERB("My finger table:");
192     XBT_VERB("Start | Succ");
193     for (int i = 0; i < nb_bits; i++) {
194       XBT_VERB(" %3u  | %3d", (id_ + (1U << i)) % nb_keys, fingers_[i]);
195     }
196
197     XBT_VERB("Predecessor: %d", pred_id_);
198   }
199 }
200
201 /* checks whether the predecessor has failed (called periodically) */
202 void Node::checkPredecessor()
203 {
204   XBT_DEBUG("Checking whether my predecessor is alive");
205   if (pred_id_ == -1)
206     return;
207
208   sg4::Mailbox* mailbox        = sg4::Mailbox::by_name(std::to_string(pred_id_));
209   sg4::Mailbox* return_mailbox = sg4::Mailbox::by_name(std::to_string(id_) + "_is_alive");
210
211   auto* message         = new ChordMessage(MessageType::PREDECESSOR_ALIVE);
212   message->request_id   = pred_id_;
213   message->answer_to    = return_mailbox;
214
215   XBT_DEBUG("Sending a 'Predecessor Alive' request to my predecessor %d", pred_id_);
216   try {
217     mailbox->put(message, 10, timeout);
218   } catch (const simgrid::TimeoutException&) {
219     XBT_DEBUG("Failed to send the 'Predecessor Alive' request to %d", pred_id_);
220     delete message;
221     return;
222   }
223
224   // receive the answer
225   XBT_DEBUG("Sent 'Predecessor Alive' request to %d, waiting for the answer on my mailbox '%s'", pred_id_,
226             message->answer_to->get_cname());
227   ChordMessage* answer       = nullptr;
228   sg4::CommPtr comm          = return_mailbox->get_async<ChordMessage>(&answer);
229
230   try {
231     comm->wait_for(timeout);
232     XBT_DEBUG("Received the answer to my 'Predecessor Alive': my predecessor %d is alive", pred_id_);
233     delete answer;
234   } catch (const simgrid::TimeoutException&) {
235     XBT_DEBUG("Failed to receive the answer to my 'Predecessor Alive' request");
236     pred_id_ = -1;
237   }
238 }
239
240 /* Asks its predecessor to a remote node
241  *
242  * @param ask_to the node to ask to
243  * @return the id of its predecessor node, or -1 if the request failed (or if the node does not know its predecessor)
244  */
245 int Node::remoteGetPredecessor(int ask_to)
246 {
247   int predecessor_id                      = -1;
248   sg4::Mailbox* mailbox                   = sg4::Mailbox::by_name(std::to_string(ask_to));
249   sg4::Mailbox* return_mailbox            = sg4::Mailbox::by_name(std::to_string(id_) + "_pred");
250
251   auto* message         = new ChordMessage(MessageType::GET_PREDECESSOR);
252   message->request_id   = id_;
253   message->answer_to    = return_mailbox;
254
255   // send a "Get Predecessor" request to ask_to_id
256   XBT_DEBUG("Sending a 'Get Predecessor' request to %d", ask_to);
257   try {
258     mailbox->put(message, 10, timeout);
259   } catch (const simgrid::TimeoutException&) {
260     XBT_DEBUG("Failed to send the 'Get Predecessor' request to %d", ask_to);
261     delete message;
262     return predecessor_id;
263   }
264
265   // receive the answer
266   XBT_DEBUG("Sent 'Get Predecessor' request to %d, waiting for the answer on my mailbox '%s'", ask_to,
267             message->answer_to->get_cname());
268   ChordMessage* answer       = nullptr;
269   sg4::CommPtr comm          = return_mailbox->get_async<ChordMessage>(&answer);
270
271   try {
272     comm->wait_for(timeout);
273     XBT_DEBUG("Received the answer to my 'Get Predecessor' request: the predecessor of node %d is %d", ask_to,
274               answer->answer_id);
275     predecessor_id = answer->answer_id;
276     delete answer;
277   } catch (const simgrid::TimeoutException&) {
278     XBT_DEBUG("Failed to receive the answer to my 'Get Predecessor' request");
279     delete answer;
280   }
281
282   return predecessor_id;
283 }
284
285 /* Returns the closest preceding finger of an id with respect to the finger table of the current node.
286  *
287  * @param id the id to find
288  * @return the closest preceding finger of that id
289  */
290 int Node::closestPrecedingFinger(int id)
291 {
292   for (int i = nb_bits - 1; i >= 0; i--) {
293     if (is_in_interval(fingers_[i], id_ + 1, id - 1)) {
294       return fingers_[i];
295     }
296   }
297   return id_;
298 }
299
300 /* Makes the current node find the successor node of an id.
301  *
302  * @param id the id to find
303  * @return the id of the successor node, or -1 if the request failed
304  */
305 int Node::findSuccessor(int id)
306 {
307   // is my successor the successor?
308   if (is_in_interval(id, id_ + 1, fingers_[0])) {
309     return fingers_[0];
310   }
311
312   // otherwise, ask the closest preceding finger in my table
313   return remoteFindSuccessor(closestPrecedingFinger(id), id);
314 }
315
316 int Node::remoteFindSuccessor(int ask_to, int id)
317 {
318   int successor                           = -1;
319   sg4::Mailbox* mailbox                   = sg4::Mailbox::by_name(std::to_string(ask_to));
320   sg4::Mailbox* return_mailbox            = sg4::Mailbox::by_name(std::to_string(id_) + "_succ");
321
322   auto* message         = new ChordMessage(MessageType::FIND_SUCCESSOR);
323   message->request_id   = id_;
324   message->answer_to    = return_mailbox;
325
326   // send a "Find Successor" request to ask_to_id
327   XBT_DEBUG("Sending a 'Find Successor' request to %d for id %d", ask_to, id);
328   try {
329     mailbox->put(message, 10, timeout);
330   } catch (const simgrid::TimeoutException&) {
331     XBT_DEBUG("Failed to send the 'Find Successor' request to %d for id %d", ask_to, id_);
332     delete message;
333     return successor;
334   }
335   // receive the answer
336   XBT_DEBUG("Sent a 'Find Successor' request to %d for key %d, waiting for the answer", ask_to, id);
337   ChordMessage* answer       = nullptr;
338   sg4::CommPtr comm          = return_mailbox->get_async<ChordMessage>(&answer);
339
340   try {
341     comm->wait_for(timeout);
342     XBT_DEBUG("Received the answer to my 'Find Successor' request for id %d: the successor of key %d is %d",
343               answer->request_id, id_, answer->answer_id);
344     successor = answer->answer_id;
345     delete answer;
346   } catch (const simgrid::TimeoutException&) {
347     XBT_DEBUG("Failed to receive the answer to my 'Find Successor' request");
348     delete answer;
349   }
350
351   return successor;
352 }
353
354 /* Notifies the current node that its predecessor may have changed. */
355 void Node::notify(int predecessor_candidate_id)
356 {
357   if (pred_id_ == -1 || is_in_interval(predecessor_candidate_id, pred_id_ + 1, id_ - 1)) {
358     setPredecessor(predecessor_candidate_id);
359     printFingerTable();
360   } else {
361     XBT_DEBUG("I don't have to change my predecessor to %d", predecessor_candidate_id);
362   }
363 }
364
365 /* Notifies a remote node that its predecessor may have changed. */
366 void Node::remoteNotify(int notify_id, int predecessor_candidate_id) const
367 {
368   auto* message         = new ChordMessage(MessageType::NOTIFY);
369   message->request_id   = predecessor_candidate_id;
370   message->answer_to    = nullptr;
371
372   // send a "Notify" request to notify_id
373   XBT_DEBUG("Sending a 'Notify' request to %d", notify_id);
374   sg4::Mailbox* mailbox = sg4::Mailbox::by_name(std::to_string(notify_id));
375   mailbox->put_init(message, 10)->detach(ChordMessage::destroy);
376 }
377
378 /* This function is called periodically. It checks the immediate successor of the current node. */
379 void Node::stabilize()
380 {
381   XBT_DEBUG("Stabilizing node");
382
383   // get the predecessor of my immediate successor
384   int candidate_id = pred_id_;
385   int successor_id = fingers_[0];
386   if (successor_id != id_)
387     candidate_id = remoteGetPredecessor(successor_id);
388
389   // this node is a candidate to become my new successor
390   if (candidate_id != -1 && is_in_interval(candidate_id, id_ + 1, successor_id - 1)) {
391     setFinger(0, candidate_id);
392   }
393   if (successor_id != id_) {
394     remoteNotify(successor_id, id_);
395   }
396 }
397
398 /* This function is called when a node receives a message.
399  *
400  * @param message the message to handle (don't touch it afterward: it will be destroyed, reused or forwarded)
401  */
402 void Node::handleMessage(ChordMessage* message)
403 {
404   switch (message->type) {
405     case MessageType::FIND_SUCCESSOR:
406       XBT_DEBUG("Received a 'Find Successor' request from %s for id %d", message->issuer_host_name.c_str(),
407                 message->request_id);
408       // is my successor the successor?
409       if (is_in_interval(message->request_id, id_ + 1, fingers_[0])) {
410         message->type      = MessageType::FIND_SUCCESSOR_ANSWER;
411         message->answer_id = fingers_[0];
412         XBT_DEBUG("Sending back a 'Find Successor Answer' to %s (mailbox %s): the successor of %d is %d",
413                   message->issuer_host_name.c_str(), message->answer_to->get_cname(), message->request_id,
414                   message->answer_id);
415         message->answer_to->put_init(message, 10)->detach(ChordMessage::destroy);
416       } else {
417         // otherwise, forward the request to the closest preceding finger in my table
418         int closest = closestPrecedingFinger(message->request_id);
419         XBT_DEBUG("Forwarding the 'Find Successor' request for id %d to my closest preceding finger %d",
420                   message->request_id, closest);
421         sg4::Mailbox* mailbox = sg4::Mailbox::by_name(std::to_string(closest));
422         mailbox->put_init(message, 10)->detach(ChordMessage::destroy);
423       }
424       break;
425
426     case MessageType::GET_PREDECESSOR:
427       XBT_DEBUG("Receiving a 'Get Predecessor' request from %s", message->issuer_host_name.c_str());
428       message->type      = MessageType::GET_PREDECESSOR_ANSWER;
429       message->answer_id = pred_id_;
430       XBT_DEBUG("Sending back a 'Get Predecessor Answer' to %s via mailbox '%s': my predecessor is %d",
431                 message->issuer_host_name.c_str(), message->answer_to->get_cname(), message->answer_id);
432       message->answer_to->put_init(message, 10)->detach(ChordMessage::destroy);
433       break;
434
435     case MessageType::NOTIFY:
436       // someone is telling me that he may be my new predecessor
437       XBT_DEBUG("Receiving a 'Notify' request from %s", message->issuer_host_name.c_str());
438       notify(message->request_id);
439       delete message;
440       break;
441
442     case MessageType::PREDECESSOR_LEAVING:
443       // my predecessor is about to quit
444       XBT_DEBUG("Receiving a 'Predecessor Leaving' message from %s", message->issuer_host_name.c_str());
445       // modify my predecessor
446       setPredecessor(message->request_id);
447       delete message;
448       /*TODO :
449         >> notify my new predecessor
450         >> send a notify_predecessors !!
451        */
452       break;
453
454     case MessageType::SUCCESSOR_LEAVING:
455       // my successor is about to quit
456       XBT_DEBUG("Receiving a 'Successor Leaving' message from %s", message->issuer_host_name.c_str());
457       // modify my successor FIXME : this should be implicit ?
458       setFinger(0, message->request_id);
459       delete message;
460       /* TODO
461          >> notify my new successor
462          >> update my table & predecessors table */
463       break;
464
465     case MessageType::PREDECESSOR_ALIVE:
466       XBT_DEBUG("Receiving a 'Predecessor Alive' request from %s", message->issuer_host_name.c_str());
467       message->type = MessageType::PREDECESSOR_ALIVE_ANSWER;
468       XBT_DEBUG("Sending back a 'Predecessor Alive Answer' to %s (mailbox %s)", message->issuer_host_name.c_str(),
469                 message->answer_to->get_cname());
470       message->answer_to->put_init(message, 10)->detach(ChordMessage::destroy);
471       break;
472
473     default:
474       XBT_DEBUG("Ignoring unexpected message: %d from %s", static_cast<int>(message->type),
475                 message->issuer_host_name.c_str());
476       delete message;
477   }
478 }
479
480 void Node::operator()()
481 {
482   sg4::this_actor::sleep_for(start_time_);
483   if (known_id_ == -1) {
484     setPredecessor(-1); // -1 means that I have no predecessor
485     printFingerTable();
486     joined = true;
487   } else {
488     join(known_id_);
489   }
490
491   if (not joined)
492     return;
493   ChordMessage* message              = nullptr;
494   double now                         = sg4::Engine::get_clock();
495   double next_stabilize_date         = start_time_ + PERIODIC_STABILIZE_DELAY;
496   double next_fix_fingers_date       = start_time_ + PERIODIC_FIX_FINGERS_DELAY;
497   double next_check_predecessor_date = start_time_ + PERIODIC_CHECK_PREDECESSOR_DELAY;
498   double next_lookup_date            = start_time_ + PERIODIC_LOOKUP_DELAY;
499   sg4::CommPtr comm_receive          = nullptr;
500   while (now < std::min(start_time_ + deadline_, MAX_SIMULATION_TIME)) {
501     if (comm_receive == nullptr)
502       comm_receive = mailbox_->get_async<ChordMessage>(&message);
503     bool comm_completed = true;
504     try {
505       if (not comm_receive->test())
506         comm_completed = false;
507     } catch (const simgrid::TimeoutException&) {
508       XBT_DEBUG("Caught a timeout, go ahead.");
509     }
510
511     if (comm_completed) {
512       if (message != nullptr) {
513         handleMessage(message);
514         message = nullptr;
515       }
516       comm_receive = nullptr;
517     } else {
518       // no task was received: make some periodic calls
519       if (now >= next_stabilize_date) {
520         stabilize();
521         next_stabilize_date = sg4::Engine::get_clock() + PERIODIC_STABILIZE_DELAY;
522       } else if (now >= next_fix_fingers_date) {
523         fixFingers();
524         next_fix_fingers_date = sg4::Engine::get_clock() + PERIODIC_FIX_FINGERS_DELAY;
525       } else if (now >= next_check_predecessor_date) {
526         checkPredecessor();
527         next_check_predecessor_date = sg4::Engine::get_clock() + PERIODIC_CHECK_PREDECESSOR_DELAY;
528       } else if (now >= next_lookup_date) {
529         randomLookup();
530         next_lookup_date = sg4::Engine::get_clock() + PERIODIC_LOOKUP_DELAY;
531       } else {
532         // nothing to do: sleep for a while
533         sg4::this_actor::sleep_for(SLEEP_DELAY);
534       }
535     }
536
537     now = sg4::Engine::get_clock();
538   }
539   if (comm_receive != nullptr) {
540     try {
541       if (comm_receive->test())
542         delete message;
543       else
544         comm_receive->cancel();
545     } catch (const simgrid::TimeoutException&) {
546       XBT_DEBUG("Caught a timeout for last message, nevermind.");
547     }
548   }
549   // leave the ring
550   leave();
551 }