Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' into 'master'
[simgrid.git] / examples / s4u / app-bittorrent / s4u-peer.cpp
1 /* Copyright (c) 2012-2020. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <algorithm>
7 #include <array>
8 #include <climits>
9
10 #include "s4u-peer.hpp"
11 #include "s4u-tracker.hpp"
12
13 XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_bt_peer, "Messages specific for the peers");
14
15 /*
16  * User parameters for transferred file data. For the test, the default values are :
17  * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
18  */
19 constexpr unsigned long FILE_PIECES   = 10UL;
20 constexpr unsigned long PIECES_BLOCKS = 5UL;
21 constexpr int BLOCK_SIZE              = 16384;
22
23 /** Number of blocks asked by each request */
24 constexpr unsigned long BLOCKS_REQUESTED = 2UL;
25
26 constexpr double SLEEP_DURATION     = 1.0;
27 #define BITS_TO_BYTES(x) (((x) / 8 + (x) % 8) ? 1 : 0)
28
29 constexpr std::array<const char*, 10> message_type_names{
30     {"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED", "HAVE", "BITFIELD", "REQUEST", "PIECE", "CANCEL"}};
31
32 Peer::Peer(std::vector<std::string> args)
33 {
34   // Check arguments
35   xbt_assert(args.size() == 3 || args.size() == 4, "Wrong number of arguments");
36   try {
37     id       = std::stoi(args[1]);
38     mailbox_ = simgrid::s4u::Mailbox::by_name(std::to_string(id));
39   } catch (const std::invalid_argument&) {
40     throw std::invalid_argument("Invalid ID:" + args[1]);
41   }
42   random.set_seed(id);
43
44   try {
45     deadline = std::stod(args[2]);
46   } catch (const std::invalid_argument&) {
47     throw std::invalid_argument("Invalid deadline:" + args[2]);
48   }
49   xbt_assert(deadline > 0, "Wrong deadline supplied");
50
51   if (args.size() == 4 && args[3] == "1") {
52     bitfield_       = (1U << FILE_PIECES) - 1U;
53     bitfield_blocks = (1ULL << (FILE_PIECES * PIECES_BLOCKS)) - 1ULL;
54   }
55   pieces_count.resize(FILE_PIECES);
56
57   XBT_INFO("Hi, I'm joining the network with id %d", id);
58 }
59
60 /** Peer main function */
61 void Peer::operator()()
62 {
63   // Getting peer data from the tracker.
64   if (getPeersFromTracker()) {
65     XBT_DEBUG("Got %zu peers from the tracker. Current status is: %s", connected_peers.size(), getStatus().c_str());
66     begin_receive_time = simgrid::s4u::Engine::get_clock();
67     mailbox_->set_receiver(simgrid::s4u::Actor::self());
68     if (hasFinished()) {
69       sendHandshakeToAllPeers();
70     } else {
71       leech();
72     }
73     seed();
74   } else {
75     XBT_INFO("Couldn't contact the tracker.");
76   }
77
78   XBT_INFO("Here is my current status: %s", getStatus().c_str());
79 }
80
81 bool Peer::getPeersFromTracker()
82 {
83   simgrid::s4u::Mailbox* tracker_mailbox = simgrid::s4u::Mailbox::by_name(TRACKER_MAILBOX);
84   // Build the task to send to the tracker
85   auto* peer_request = new TrackerQuery(id, mailbox_);
86   try {
87     XBT_DEBUG("Sending a peer request to the tracker.");
88     tracker_mailbox->put(peer_request, TRACKER_COMM_SIZE, GET_PEERS_TIMEOUT);
89   } catch (const simgrid::TimeoutException&) {
90     XBT_DEBUG("Timeout expired when requesting peers to tracker");
91     delete peer_request;
92     return false;
93   }
94
95   try {
96     auto* answer = static_cast<TrackerAnswer*>(mailbox_->get(GET_PEERS_TIMEOUT));
97     // Add the peers the tracker gave us to our peer list.
98     for (auto const& peer_id : answer->getPeers())
99       if (id != peer_id)
100         connected_peers.emplace(peer_id, Connection(peer_id));
101     delete answer;
102   } catch (const simgrid::TimeoutException&) {
103     XBT_DEBUG("Timeout expired when requesting peers to tracker");
104     return false;
105   }
106   return true;
107 }
108
109 void Peer::sendHandshakeToAllPeers()
110 {
111   for (auto const& kv : connected_peers) {
112     const Connection& remote_peer = kv.second;
113     auto* handshake               = new Message(MESSAGE_HANDSHAKE, id, mailbox_);
114     remote_peer.mailbox_->put_init(handshake, MESSAGE_HANDSHAKE_SIZE)->detach();
115     XBT_DEBUG("Sending a HANDSHAKE to %d", remote_peer.id);
116   }
117 }
118
119 void Peer::sendMessage(simgrid::s4u::Mailbox* mailbox, e_message_type type, uint64_t size)
120 {
121   XBT_DEBUG("Sending %s to %s", message_type_names.at(type), mailbox->get_cname());
122   mailbox->put_init(new Message(type, id, bitfield_, mailbox_), size)->detach();
123 }
124
125 void Peer::sendBitfield(simgrid::s4u::Mailbox* mailbox)
126 {
127   XBT_DEBUG("Sending a BITFIELD to %s", mailbox->get_cname());
128   mailbox
129       ->put_init(new Message(MESSAGE_BITFIELD, id, bitfield_, mailbox_),
130                  MESSAGE_BITFIELD_SIZE + BITS_TO_BYTES(FILE_PIECES))
131       ->detach();
132 }
133
134 void Peer::sendPiece(simgrid::s4u::Mailbox* mailbox, unsigned int piece, int block_index, int block_length)
135 {
136   xbt_assert(not hasNotPiece(piece), "Tried to send a unavailable piece.");
137   XBT_DEBUG("Sending the PIECE %u (%d,%d) to %s", piece, block_index, block_length, mailbox->get_cname());
138   mailbox->put_init(new Message(MESSAGE_PIECE, id, mailbox_, piece, block_index, block_length), BLOCK_SIZE)->detach();
139 }
140
141 void Peer::sendHaveToAllPeers(unsigned int piece)
142 {
143   XBT_DEBUG("Sending HAVE message to all my peers");
144   for (auto const& kv : connected_peers) {
145     const Connection& remote_peer = kv.second;
146     remote_peer.mailbox_->put_init(new Message(MESSAGE_HAVE, id, mailbox_, piece), MESSAGE_HAVE_SIZE)->detach();
147   }
148 }
149
150 void Peer::sendRequestTo(Connection* remote_peer, unsigned int piece)
151 {
152   remote_peer->current_piece = piece;
153   xbt_assert(remote_peer->hasPiece(piece));
154   int block_index = getFirstMissingBlockFrom(piece);
155   if (block_index != -1) {
156     int block_length = static_cast<int>(std::min(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index));
157     XBT_DEBUG("Sending a REQUEST to %s for piece %u (%d,%d)", remote_peer->mailbox_->get_cname(), piece, block_index,
158               block_length);
159     remote_peer->mailbox_
160         ->put_init(new Message(MESSAGE_REQUEST, id, mailbox_, piece, block_index, block_length), MESSAGE_REQUEST_SIZE)
161         ->detach();
162   }
163 }
164
165 std::string Peer::getStatus() const
166 {
167   std::string res;
168   for (unsigned i = 0; i < FILE_PIECES; i++)
169     res += (bitfield_ & (1U << i)) ? '1' : '0';
170   return res;
171 }
172
173 bool Peer::hasFinished() const
174 {
175   return bitfield_ == (1U << FILE_PIECES) - 1U;
176 }
177
178 /** Indicates if the remote peer has a piece not stored by the local peer */
179 bool Peer::isInterestedBy(const Connection* remote_peer) const
180 {
181   return remote_peer->bitfield & (bitfield_ ^ ((1 << FILE_PIECES) - 1));
182 }
183
184 bool Peer::isInterestedByFree(const Connection* remote_peer) const
185 {
186   for (unsigned int i = 0; i < FILE_PIECES; i++)
187     if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
188       return true;
189   return false;
190 }
191
192 void Peer::updatePiecesCountFromBitfield(unsigned int bitfield)
193 {
194   for (unsigned int i = 0; i < FILE_PIECES; i++)
195     if (bitfield & (1U << i))
196       pieces_count[i]++;
197 }
198
199 unsigned int Peer::countPieces(unsigned int bitfield) const
200 {
201   unsigned int count = 0U;
202   unsigned int n     = bitfield;
203   while (n) {
204     count += n & 1U;
205     n >>= 1U;
206   }
207   return count;
208 }
209
210 int Peer::nbInterestedPeers() const
211 {
212   int nb = 0;
213   for (auto const& kv : connected_peers)
214     if (kv.second.interested)
215       nb++;
216   return nb;
217 }
218
219 void Peer::leech()
220 {
221   double next_choked_update = simgrid::s4u::Engine::get_clock() + UPDATE_CHOKED_INTERVAL;
222   XBT_DEBUG("Start downloading.");
223
224   /* Send a "handshake" message to all the peers it got (since it couldn't have gotten more than 50 peers) */
225   sendHandshakeToAllPeers();
226   XBT_DEBUG("Starting main leech loop listening on mailbox: %s", mailbox_->get_cname());
227
228   void* data = nullptr;
229   while (simgrid::s4u::Engine::get_clock() < deadline && countPieces(bitfield_) < FILE_PIECES) {
230     if (comm_received == nullptr) {
231       comm_received = mailbox_->get_async(&data);
232     }
233     if (comm_received->test()) {
234       message = static_cast<Message*>(data);
235       handleMessage();
236       delete message;
237       comm_received = nullptr;
238     } else {
239       // We don't execute the choke algorithm if we don't already have a piece
240       if (simgrid::s4u::Engine::get_clock() >= next_choked_update && countPieces(bitfield_) > 0) {
241         updateChokedPeers();
242         next_choked_update += UPDATE_CHOKED_INTERVAL;
243       } else {
244         simgrid::s4u::this_actor::sleep_for(SLEEP_DURATION);
245       }
246     }
247   }
248   if (hasFinished())
249     XBT_DEBUG("%d becomes a seeder", id);
250 }
251
252 void Peer::seed()
253 {
254   double next_choked_update = simgrid::s4u::Engine::get_clock() + UPDATE_CHOKED_INTERVAL;
255   XBT_DEBUG("Start seeding.");
256   // start the main seed loop
257   void* data = nullptr;
258   while (simgrid::s4u::Engine::get_clock() < deadline) {
259     if (comm_received == nullptr) {
260       comm_received = mailbox_->get_async(&data);
261     }
262     if (comm_received->test()) {
263       message = static_cast<Message*>(data);
264       handleMessage();
265       delete message;
266       comm_received = nullptr;
267     } else {
268       if (simgrid::s4u::Engine::get_clock() >= next_choked_update) {
269         updateChokedPeers();
270         // TODO: Change the choked peer algorithm when seeding.
271         next_choked_update += UPDATE_CHOKED_INTERVAL;
272       } else {
273         simgrid::s4u::this_actor::sleep_for(SLEEP_DURATION);
274       }
275     }
276   }
277 }
278
279 void Peer::updateActivePeersSet(Connection* remote_peer)
280 {
281   if (remote_peer->interested && not remote_peer->choked_upload)
282     active_peers.insert(remote_peer);
283   else
284     active_peers.erase(remote_peer);
285 }
286
287 void Peer::handleMessage()
288 {
289   XBT_DEBUG("Received a %s message from %s", message_type_names.at(message->type),
290             message->return_mailbox->get_cname());
291
292   auto known_peer         = connected_peers.find(message->peer_id);
293   Connection* remote_peer = (known_peer == connected_peers.end()) ? nullptr : &known_peer->second;
294   xbt_assert(remote_peer != nullptr || message->type == MESSAGE_HANDSHAKE,
295              "The impossible did happened: A not-in-our-list peer sent us a message.");
296
297   switch (message->type) {
298     case MESSAGE_HANDSHAKE:
299       // Check if the peer is in our connection list.
300       if (remote_peer == nullptr) {
301         XBT_DEBUG("This peer %d was unknown, answer to its handshake", message->peer_id);
302         connected_peers.emplace(message->peer_id, Connection(message->peer_id));
303         sendMessage(message->return_mailbox, MESSAGE_HANDSHAKE, MESSAGE_HANDSHAKE_SIZE);
304       }
305       // Send our bitfield to the peer
306       sendBitfield(message->return_mailbox);
307       break;
308     case MESSAGE_BITFIELD:
309       // Update the pieces list
310       updatePiecesCountFromBitfield(message->bitfield);
311       // Store the bitfield
312       remote_peer->bitfield = message->bitfield;
313       xbt_assert(not remote_peer->am_interested, "Should not be interested at first");
314       if (isInterestedBy(remote_peer)) {
315         remote_peer->am_interested = true;
316         sendMessage(message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
317       }
318       break;
319     case MESSAGE_INTERESTED:
320       // Update the interested state of the peer.
321       remote_peer->interested = true;
322       updateActivePeersSet(remote_peer);
323       break;
324     case MESSAGE_NOTINTERESTED:
325       remote_peer->interested = false;
326       updateActivePeersSet(remote_peer);
327       break;
328     case MESSAGE_UNCHOKE:
329       xbt_assert(remote_peer->choked_download);
330       remote_peer->choked_download = false;
331       // Send requests to the peer, since it has unchoked us
332       if (remote_peer->am_interested)
333         requestNewPieceTo(remote_peer);
334       break;
335     case MESSAGE_CHOKE:
336       xbt_assert(not remote_peer->choked_download);
337       remote_peer->choked_download = true;
338       if (remote_peer->current_piece != -1)
339         removeCurrentPiece(remote_peer, remote_peer->current_piece);
340       break;
341     case MESSAGE_HAVE:
342       XBT_DEBUG("\t for piece %d", message->piece);
343       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
344                  "Wrong HAVE message received");
345       remote_peer->bitfield = remote_peer->bitfield | (1U << static_cast<unsigned int>(message->piece));
346       pieces_count[message->piece]++;
347       // If the piece is in our pieces, we tell the peer that we are interested.
348       if (not remote_peer->am_interested && hasNotPiece(message->piece)) {
349         remote_peer->am_interested = true;
350         sendMessage(message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
351         if (not remote_peer->choked_download)
352           requestNewPieceTo(remote_peer);
353       }
354       break;
355     case MESSAGE_REQUEST:
356       xbt_assert(remote_peer->interested);
357       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
358                  "Wrong HAVE message received");
359       if (not remote_peer->choked_upload) {
360         XBT_DEBUG("\t for piece %d (%d,%d)", message->piece, message->block_index,
361                   message->block_index + message->block_length);
362         if (not hasNotPiece(message->piece)) {
363           sendPiece(message->return_mailbox, message->piece, message->block_index, message->block_length);
364         }
365       } else {
366         XBT_DEBUG("\t for piece %d but he is choked.", message->peer_id);
367       }
368       break;
369     case MESSAGE_PIECE:
370       XBT_DEBUG(" \t for piece %d (%d,%d)", message->piece, message->block_index,
371                 message->block_index + message->block_length);
372       xbt_assert(not remote_peer->choked_download);
373       xbt_assert(not remote_peer->choked_download, "Can't received a piece if I'm choked !");
374       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
375                  "Wrong piece received");
376       // TODO: Execute a computation.
377       if (hasNotPiece(static_cast<unsigned int>(message->piece))) {
378         updateBitfieldBlocks(message->piece, message->block_index, message->block_length);
379         if (hasCompletedPiece(static_cast<unsigned int>(message->piece))) {
380           // Removing the piece from our piece list
381           removeCurrentPiece(remote_peer, message->piece);
382           // Setting the fact that we have the piece
383           bitfield_ = bitfield_ | (1U << static_cast<unsigned int>(message->piece));
384           XBT_DEBUG("My status is now %s", getStatus().c_str());
385           // Sending the information to all the peers we are connected to
386           sendHaveToAllPeers(message->piece);
387           // sending UNINTERESTED to peers that do not have what we want.
388           updateInterestedAfterReceive();
389         } else {                                      // piece not completed
390           sendRequestTo(remote_peer, message->piece); // ask for the next block
391         }
392       } else {
393         XBT_DEBUG("However, we already have it");
394         requestNewPieceTo(remote_peer);
395       }
396       break;
397     case MESSAGE_CANCEL:
398       break;
399     default:
400       THROW_IMPOSSIBLE;
401   }
402   // Update the peer speed.
403   if (remote_peer) {
404     remote_peer->addSpeedValue(1.0 / (simgrid::s4u::Engine::get_clock() - begin_receive_time));
405   }
406   begin_receive_time = simgrid::s4u::Engine::get_clock();
407 }
408
409 /** Selects the appropriate piece to download and requests it to the remote_peer */
410 void Peer::requestNewPieceTo(Connection* remote_peer)
411 {
412   int piece = selectPieceToDownload(remote_peer);
413   if (piece != -1) {
414     current_pieces |= (1U << (unsigned int)piece);
415     sendRequestTo(remote_peer, piece);
416   }
417 }
418
419 void Peer::removeCurrentPiece(Connection* remote_peer, unsigned int current_piece)
420 {
421   current_pieces &= ~(1U << current_piece);
422   remote_peer->current_piece = -1;
423 }
424
425 /** @brief Return the piece to be downloaded
426  * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
427  * If a piece is partially downloaded, this piece will be selected prioritarily
428  * If the peer has strictly less than 4 pieces, he chooses a piece at random.
429  * If the peer has more than pieces, he downloads the pieces that are the less replicated (rarest policy).
430  * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
431  * @param remote_peer: information about the connection
432  * @return the piece to download if possible. -1 otherwise
433  */
434 int Peer::selectPieceToDownload(const Connection* remote_peer)
435 {
436   int piece = partiallyDownloadedPiece(remote_peer);
437   // strict priority policy
438   if (piece != -1)
439     return piece;
440
441   // end game mode
442   if (countPieces(current_pieces) >= (FILE_PIECES - countPieces(bitfield_)) && isInterestedBy(remote_peer)) {
443     int nb_interesting_pieces = 0;
444     // compute the number of interesting pieces
445     for (unsigned int i = 0; i < FILE_PIECES; i++)
446       if (remotePeerHasMissingPiece(remote_peer, i))
447         nb_interesting_pieces++;
448
449     xbt_assert(nb_interesting_pieces != 0);
450     // get a random interesting piece
451     int random_piece_index = random.uniform_int(0, nb_interesting_pieces - 1);
452     int current_index      = 0;
453     for (unsigned int i = 0; i < FILE_PIECES; i++) {
454       if (remotePeerHasMissingPiece(remote_peer, i)) {
455         if (random_piece_index == current_index) {
456           piece = i;
457           break;
458         }
459         current_index++;
460       }
461     }
462     xbt_assert(piece != -1);
463     return piece;
464   }
465   // Random first policy
466   if (countPieces(bitfield_) < 4 && isInterestedByFree(remote_peer)) {
467     int nb_interesting_pieces = 0;
468     // compute the number of interesting pieces
469     for (unsigned int i = 0; i < FILE_PIECES; i++)
470       if (remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i))
471         nb_interesting_pieces++;
472     xbt_assert(nb_interesting_pieces != 0);
473     // get a random interesting piece
474     int random_piece_index = random.uniform_int(0, nb_interesting_pieces - 1);
475     int current_index      = 0;
476     for (unsigned int i = 0; i < FILE_PIECES; i++) {
477       if (remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i)) {
478         if (random_piece_index == current_index) {
479           piece = i;
480           break;
481         }
482         current_index++;
483       }
484     }
485     xbt_assert(piece != -1);
486     return piece;
487   } else { // Rarest first policy
488     short min         = SHRT_MAX;
489     int nb_min_pieces = 0;
490     int current_index = 0;
491     // compute the smallest number of copies of available pieces
492     for (unsigned int i = 0; i < FILE_PIECES; i++) {
493       if (pieces_count[i] < min && remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i))
494         min = pieces_count[i];
495     }
496
497     xbt_assert(min != SHRT_MAX || not isInterestedByFree(remote_peer));
498     // compute the number of rarest pieces
499     for (unsigned int i = 0; i < FILE_PIECES; i++)
500       if (pieces_count[i] == min && remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i))
501         nb_min_pieces++;
502
503     xbt_assert(nb_min_pieces != 0 || not isInterestedByFree(remote_peer));
504     // get a random rarest piece
505     int random_rarest_index = 0;
506     if (nb_min_pieces > 0) {
507       random_rarest_index = random.uniform_int(0, nb_min_pieces - 1);
508     }
509     for (unsigned int i = 0; i < FILE_PIECES; i++)
510       if (pieces_count[i] == min && remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i)) {
511         if (random_rarest_index == current_index) {
512           piece = i;
513           break;
514         }
515         current_index++;
516       }
517
518     xbt_assert(piece != -1 || not isInterestedByFree(remote_peer));
519     return piece;
520   }
521 }
522
523 void Peer::updateChokedPeers()
524 {
525   if (nbInterestedPeers() == 0)
526     return;
527   XBT_DEBUG("(%d) update_choked peers %zu active peers", id, active_peers.size());
528   // update the current round
529   round_                  = (round_ + 1) % 3;
530   Connection* chosen_peer = nullptr;
531   // select first active peer and remove it from the set
532   Connection* choked_peer;
533   if (active_peers.empty()) {
534     choked_peer = nullptr;
535   } else {
536     choked_peer = *active_peers.begin();
537     active_peers.erase(choked_peer);
538   }
539
540   /**If we are currently seeding, we unchoke the peer which has been unchoked the last time.*/
541   if (hasFinished()) {
542     double unchoke_time = simgrid::s4u::Engine::get_clock() + 1;
543     for (auto& kv : connected_peers) {
544       Connection& remote_peer = kv.second;
545       if (remote_peer.last_unchoke < unchoke_time && remote_peer.interested && remote_peer.choked_upload) {
546         unchoke_time = remote_peer.last_unchoke;
547         chosen_peer  = &remote_peer;
548       }
549     }
550   } else {
551     // Random optimistic unchoking
552     if (round_ == 0) {
553       int j = 0;
554       do {
555         // We choose a random peer to unchoke.
556         auto chosen_peer_it = connected_peers.begin();
557         std::advance(chosen_peer_it, random.uniform_int(0, static_cast<int>(connected_peers.size() - 1)));
558         chosen_peer = &chosen_peer_it->second;
559         if (not chosen_peer->interested || not chosen_peer->choked_upload)
560           chosen_peer = nullptr;
561         else
562           XBT_DEBUG("Nothing to do, keep going");
563         j++;
564       } while (chosen_peer == nullptr && j < MAXIMUM_PEERS);
565     } else {
566       // Use the "fastest download" policy.
567       double fastest_speed = 0.0;
568       for (auto& kv : connected_peers) {
569         Connection& remote_peer = kv.second;
570         if (remote_peer.peer_speed > fastest_speed && remote_peer.choked_upload && remote_peer.interested) {
571           fastest_speed = remote_peer.peer_speed;
572           chosen_peer   = &remote_peer;
573         }
574       }
575     }
576   }
577
578   if (chosen_peer != nullptr)
579     XBT_DEBUG("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ", id, chosen_peer->id,
580               chosen_peer->interested, chosen_peer->choked_upload);
581
582   if (choked_peer != chosen_peer) {
583     if (choked_peer != nullptr) {
584       xbt_assert(not choked_peer->choked_upload, "Tries to choked a choked peer");
585       choked_peer->choked_upload = true;
586       updateActivePeersSet(choked_peer);
587       XBT_DEBUG("(%d) Sending a CHOKE to %d", id, choked_peer->id);
588       sendMessage(choked_peer->mailbox_, MESSAGE_CHOKE, MESSAGE_CHOKE_SIZE);
589     }
590     if (chosen_peer != nullptr) {
591       xbt_assert((chosen_peer->choked_upload), "Tries to unchoked an unchoked peer");
592       chosen_peer->choked_upload = false;
593       active_peers.insert(chosen_peer);
594       chosen_peer->last_unchoke = simgrid::s4u::Engine::get_clock();
595       XBT_DEBUG("(%d) Sending a UNCHOKE to %d", id, chosen_peer->id);
596       updateActivePeersSet(chosen_peer);
597       sendMessage(chosen_peer->mailbox_, MESSAGE_UNCHOKE, MESSAGE_UNCHOKE_SIZE);
598     }
599   }
600 }
601
602 /** @brief Update "interested" state of peers: send "not interested" to peers that don't have any more pieces we want.*/
603 void Peer::updateInterestedAfterReceive()
604 {
605   for (auto& kv : connected_peers) {
606     Connection& remote_peer = kv.second;
607     if (remote_peer.am_interested) {
608       bool interested = false;
609       // Check if the peer still has a piece we want.
610       for (unsigned int i = 0; i < FILE_PIECES; i++)
611         if (remotePeerHasMissingPiece(&remote_peer, i)) {
612           interested = true;
613           break;
614         }
615
616       if (not interested) { // no more piece to download from connection
617         remote_peer.am_interested = false;
618         sendMessage(remote_peer.mailbox_, MESSAGE_NOTINTERESTED, MESSAGE_NOTINTERESTED_SIZE);
619       }
620     }
621   }
622 }
623
624 void Peer::updateBitfieldBlocks(int piece, int block_index, int block_length)
625 {
626   xbt_assert((piece >= 0 && static_cast<unsigned int>(piece) <= FILE_PIECES), "Wrong piece.");
627   xbt_assert((block_index >= 0 && static_cast<unsigned int>(block_index) <= PIECES_BLOCKS), "Wrong block : %d.",
628              block_index);
629   for (int i = block_index; i < (block_index + block_length); i++)
630     bitfield_blocks |= (1ULL << static_cast<unsigned int>(piece * PIECES_BLOCKS + i));
631 }
632
633 bool Peer::hasCompletedPiece(unsigned int piece) const
634 {
635   for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
636     if (not(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
637       return false;
638   return true;
639 }
640
641 int Peer::getFirstMissingBlockFrom(int piece) const
642 {
643   for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
644     if (not(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
645       return i;
646   return -1;
647 }
648
649 /** Returns a piece that is partially downloaded and stored by the remote peer if any -1 otherwise. */
650 int Peer::partiallyDownloadedPiece(const Connection* remote_peer) const
651 {
652   for (unsigned int i = 0; i < FILE_PIECES; i++)
653     if (remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i) && getFirstMissingBlockFrom(i) > 0)
654       return i;
655   return -1;
656 }