1 /* Copyright (c) 2012-2023. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "bittorrent-peer.h"
8 #include <simgrid/forward.h>
12 #include <stdio.h> /* snprintf */
14 XBT_LOG_NEW_DEFAULT_CATEGORY(bittorrent_peers, "Messages specific for the peers");
/*
 * User parameters for transferred file data. For the test, the default values are:
 * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
 */
20 #define FILE_PIECES 10UL
21 #define PIECES_BLOCKS 5UL
22 #define BLOCK_SIZE 16384
24 /** Number of blocks asked by each request */
25 #define BLOCKS_REQUESTED 2UL
27 #define SLEEP_DURATION 1
/** Number of bytes needed to hold x bits, i.e. x / 8 rounded up. Note that x is evaluated twice. */
#define BITS_TO_BYTES(x) (((x) / 8) + (((x) % 8) ? 1 : 0))
/** Printable name of each message type, indexed by the numeric value of the message type (used in the logs). */
const char* const message_type_names[10] = {
    "HANDSHAKE",     /* 0 */
    "CHOKE",         /* 1 */
    "UNCHOKE",       /* 2 */
    "INTERESTED",    /* 3 */
    "NOTINTERESTED", /* 4 */
    "HAVE",          /* 5 */
    "BITFIELD",      /* 6 */
    "REQUEST",       /* 7 */
    "PIECE",         /* 8 */
    "CANCEL",        /* 9 */
};
34 #define MIN(a, b) ((a) < (b) ? (a) : (b))
37 static peer_t peer_init(int id, int seed)
39 peer_t peer = xbt_new(s_peer_t, 1);
42 char mailbox_name[MAILBOX_SIZE];
43 snprintf(mailbox_name, MAILBOX_SIZE - 1, "%d", id);
44 peer->mailbox = sg_mailbox_by_name(mailbox_name);
46 peer->connected_peers = xbt_dict_new_homogeneous(NULL);
47 peer->active_peers = xbt_dict_new_homogeneous(NULL);
50 peer->bitfield = (1U << FILE_PIECES) - 1U;
51 peer->bitfield_blocks = (1ULL << (FILE_PIECES * PIECES_BLOCKS)) - 1ULL;
54 peer->bitfield_blocks = 0;
57 peer->current_pieces = 0;
58 peer->pieces_count = xbt_new0(short, FILE_PIECES);
59 peer->comm_received = NULL;
65 static void peer_free(peer_t peer)
68 connection_t connection;
69 xbt_dict_cursor_t cursor;
70 xbt_dict_foreach (peer->connected_peers, cursor, key, connection)
73 xbt_dict_free(&peer->connected_peers);
74 xbt_dict_free(&peer->active_peers);
75 xbt_free(peer->pieces_count);
79 /** Peer main function */
80 void peer_run(int argc, char* argv[])
83 xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
86 peer_t peer = peer_init((int)xbt_str_parse_int(argv[1], "Invalid ID"), argc == 4 ? 1 : 0);
89 peer->deadline = xbt_str_parse_double(argv[2], "Invalid deadline");
90 xbt_assert(peer->deadline > 0, "Wrong deadline supplied");
92 char* status = xbt_malloc0(FILE_PIECES + 1);
93 get_status(status, peer->bitfield);
95 XBT_INFO("Hi, I'm joining the network with id %d", peer->id);
97 // Getting peer data from the tracker.
98 if (get_peers_from_tracker(peer)) {
99 XBT_DEBUG("Got %d peers from the tracker. Current status is: %s", xbt_dict_length(peer->connected_peers), status);
100 peer->begin_receive_time = simgrid_get_clock();
101 sg_mailbox_set_receiver(sg_mailbox_get_name(peer->mailbox));
103 if (has_finished(peer->bitfield)) {
104 send_handshake_to_all_peers(peer);
110 XBT_INFO("Couldn't contact the tracker.");
113 get_status(status, peer->bitfield);
114 XBT_INFO("Here is my current status: %s", status);
115 if (peer->comm_received) {
116 sg_comm_unref(peer->comm_received);
123 /** @brief Retrieves the peer list from the tracker */
124 int get_peers_from_tracker(const_peer_t peer)
126 sg_mailbox_t tracker_mailbox = sg_mailbox_by_name(TRACKER_MAILBOX);
128 // Build the task to send to the tracker
129 tracker_query_t peer_request = tracker_query_new(peer->id, peer->mailbox);
131 XBT_DEBUG("Sending a peer request to the tracker.");
132 sg_comm_t request = sg_mailbox_put_async(tracker_mailbox, peer_request, TRACKER_COMM_SIZE);
133 sg_error_t res = sg_comm_wait_for(request, GET_PEERS_TIMEOUT);
135 if (res == SG_ERROR_TIMEOUT) {
136 XBT_DEBUG("Timeout expired when requesting peers to tracker");
137 xbt_free(peer_request);
141 void* message = NULL;
142 sg_comm_t comm_received = sg_mailbox_get_async(peer->mailbox, &message);
143 res = sg_comm_wait_for(comm_received, GET_PEERS_TIMEOUT);
145 const_tracker_answer_t ta = (const_tracker_answer_t)message;
146 // Add the peers the tracker gave us to our peer list.
149 // Add the peers the tracker gave us to our peer list.
150 xbt_dynar_foreach (ta->peers, i, peer_id) {
151 if (peer_id != peer->id)
152 xbt_dict_set_ext(peer->connected_peers, (char*)&peer_id, sizeof(int), connection_new(peer_id));
154 tracker_answer_free(message);
155 } else if (res == SG_ERROR_TIMEOUT) {
156 XBT_DEBUG("Timeout expired when requesting peers to tracker");
157 tracker_answer_free(message);
164 /** @brief Send a handshake message to all the peers the peer has. */
165 void send_handshake_to_all_peers(const_peer_t peer)
167 connection_t remote_peer;
168 xbt_dict_cursor_t cursor = NULL;
170 xbt_dict_foreach (peer->connected_peers, cursor, key, remote_peer) {
171 message_t handshake = message_new(MESSAGE_HANDSHAKE, peer->id, peer->mailbox);
172 sg_comm_t comm = sg_mailbox_put_init(remote_peer->mailbox, handshake, MESSAGE_HANDSHAKE_SIZE);
173 sg_comm_detach(comm, NULL);
174 XBT_DEBUG("Sending a HANDSHAKE to %s", sg_mailbox_get_name(remote_peer->mailbox));
178 void send_message(const_peer_t peer, sg_mailbox_t mailbox, e_message_type type, uint64_t size)
180 XBT_DEBUG("Sending %s to %s", message_type_names[type], sg_mailbox_get_name(mailbox));
181 message_t message = message_other_new(type, peer->id, peer->mailbox, peer->bitfield);
182 sg_comm_t comm = sg_mailbox_put_init(mailbox, message, size);
183 sg_comm_detach(comm, NULL);
186 /** @brief Send a bitfield message to all the peers the peer has */
187 void send_bitfield(const_peer_t peer, sg_mailbox_t mailbox)
189 XBT_DEBUG("Sending a BITFIELD to %s", sg_mailbox_get_name(mailbox));
190 message_t message = message_other_new(MESSAGE_BITFIELD, peer->id, peer->mailbox, peer->bitfield);
191 sg_comm_t comm = sg_mailbox_put_init(mailbox, message, MESSAGE_BITFIELD_SIZE + BITS_TO_BYTES(FILE_PIECES));
192 sg_comm_detach(comm, NULL);
195 /** Send a "piece" message to a pair, containing a piece of the file */
196 void send_piece(const_peer_t peer, sg_mailbox_t mailbox, int piece, int block_index, int block_length)
198 XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s", piece, block_index, block_length, sg_mailbox_get_name(mailbox));
199 xbt_assert(piece >= 0, "Tried to send a piece that doesn't exist.");
200 xbt_assert(!peer_has_not_piece(peer, piece), "Tried to send a piece that we doesn't have.");
201 message_t message = message_piece_new(peer->id, peer->mailbox, piece, block_index, block_length);
202 sg_comm_t comm = sg_mailbox_put_init(mailbox, message, BLOCK_SIZE);
203 sg_comm_detach(comm, NULL);
206 /** Send a "HAVE" message to all peers we are connected to */
207 void send_have_to_all_peers(const_peer_t peer, int piece)
209 XBT_DEBUG("Sending HAVE message to all my peers");
210 connection_t remote_peer;
211 xbt_dict_cursor_t cursor = NULL;
213 xbt_dict_foreach (peer->connected_peers, cursor, key, remote_peer) {
214 message_t message = message_index_new(MESSAGE_HAVE, peer->id, peer->mailbox, piece);
215 sg_comm_t comm = sg_mailbox_put_init(remote_peer->mailbox, message, MESSAGE_HAVE_SIZE);
216 sg_comm_detach(comm, NULL);
220 /** @brief Send request messages to a peer that have unchoked us */
221 void send_request_to_peer(const_peer_t peer, connection_t remote_peer, int piece)
223 remote_peer->current_piece = piece;
224 xbt_assert(connection_has_piece(remote_peer, piece));
225 int block_index = get_first_missing_block_from(peer, piece);
226 if (block_index != -1) {
227 int block_length = MIN(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index);
228 XBT_DEBUG("Sending a REQUEST to %s for piece %d (%d,%d)", sg_mailbox_get_name(remote_peer->mailbox), piece,
229 block_index, block_length);
230 message_t message = message_request_new(peer->id, peer->mailbox, piece, block_index, block_length);
231 sg_comm_t comm = sg_mailbox_put_init(remote_peer->mailbox, message, MESSAGE_REQUEST_SIZE);
232 sg_comm_detach(comm, NULL);
236 void get_status(char* status, unsigned int bitfield)
238 for (int i = FILE_PIECES - 1; i >= 0; i--)
239 status[i] = (bitfield & (1U << i)) ? '1' : '0';
240 status[FILE_PIECES] = '\0';
243 int has_finished(unsigned int bitfield)
245 return bitfield == (1U << FILE_PIECES) - 1U;
248 /** Indicates if the remote peer has a piece not stored by the local peer */
249 int is_interested(const_peer_t peer, const_connection_t remote_peer)
251 return remote_peer->bitfield & (peer->bitfield ^ ((1 << FILE_PIECES) - 1));
254 /** Indicates if the remote peer has a piece not stored by the local peer nor requested by the local peer */
255 int is_interested_and_free(const_peer_t peer, const_connection_t remote_peer)
257 for (unsigned int i = 0; i < FILE_PIECES; i++)
258 if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) && peer_is_not_downloading_piece(peer, i))
263 /** @brief Updates the list of who has a piece from a bitfield */
264 void update_pieces_count_from_bitfield(const_peer_t peer, unsigned int bitfield)
266 for (unsigned int i = 0; i < FILE_PIECES; i++)
267 if (bitfield & (1U << i))
268 peer->pieces_count[i]++;
271 unsigned int count_pieces(unsigned int bitfield)
273 unsigned int count = 0;
274 unsigned int n = bitfield;
282 int nb_interested_peers(const_peer_t peer)
284 xbt_dict_cursor_t cursor = NULL;
286 connection_t connection;
288 xbt_dict_foreach (peer->connected_peers, cursor, key, connection)
289 if (connection->interested)
295 /** @brief Peer main loop when it is leeching. */
296 void leech(peer_t peer)
298 double next_choked_update = simgrid_get_clock() + UPDATE_CHOKED_INTERVAL;
299 XBT_DEBUG("Start downloading.");
301 /* Send a "handshake" message to all the peers it got (since it couldn't have gotten more than 50 peers) */
302 send_handshake_to_all_peers(peer);
303 XBT_DEBUG("Starting main leech loop");
306 while (simgrid_get_clock() < peer->deadline && count_pieces(peer->bitfield) < FILE_PIECES) {
307 if (peer->comm_received == NULL)
308 peer->comm_received = sg_mailbox_get_async(peer->mailbox, &data);
310 if (sg_comm_test(peer->comm_received)) {
311 peer->message = (message_t)data;
312 handle_message(peer, peer->message);
313 xbt_free(peer->message);
314 peer->comm_received = NULL;
316 // We don't execute the choke algorithm if we don't already have a piece
317 if (simgrid_get_clock() >= next_choked_update && count_pieces(peer->bitfield) > 0) {
318 update_choked_peers(peer);
319 next_choked_update += UPDATE_CHOKED_INTERVAL;
321 sg_actor_sleep_for(SLEEP_DURATION);
325 if (has_finished(peer->bitfield))
326 XBT_DEBUG("%d becomes a seeder", peer->id);
329 /** @brief Peer main loop when it is seeding */
330 void seed(peer_t peer)
332 double next_choked_update = simgrid_get_clock() + UPDATE_CHOKED_INTERVAL;
333 XBT_DEBUG("Start seeding.");
334 // start the main seed loop
336 while (simgrid_get_clock() < peer->deadline) {
337 if (peer->comm_received == NULL)
338 peer->comm_received = sg_mailbox_get_async(peer->mailbox, &data);
340 if (sg_comm_test(peer->comm_received)) {
341 peer->message = (message_t)data;
342 handle_message(peer, peer->message);
343 xbt_free(peer->message);
344 peer->comm_received = NULL;
346 if (simgrid_get_clock() >= next_choked_update) {
347 update_choked_peers(peer);
348 // TODO: Change the choked peer algorithm when seeding.
349 next_choked_update += UPDATE_CHOKED_INTERVAL;
351 sg_actor_sleep_for(SLEEP_DURATION);
357 void update_active_peers_set(const s_peer_t* peer, connection_t remote_peer)
359 if ((remote_peer->interested != 0) && (remote_peer->choked_upload == 0)) {
360 // add in the active peers set
361 xbt_dict_set_ext(peer->active_peers, (char*)&remote_peer->id, sizeof(int), remote_peer);
362 } else if (xbt_dict_get_or_null_ext(peer->active_peers, (char*)&remote_peer->id, sizeof(int))) {
363 xbt_dict_remove_ext(peer->active_peers, (char*)&remote_peer->id, sizeof(int));
367 /** @brief Handle a received message sent by another peer */
368 void handle_message(peer_t peer, message_t message)
370 XBT_DEBUG("Received a %s message from %s", message_type_names[message->type],
371 sg_mailbox_get_name(message->return_mailbox));
373 connection_t remote_peer = xbt_dict_get_or_null_ext(peer->connected_peers, (char*)&message->peer_id, sizeof(int));
374 xbt_assert(remote_peer != NULL || message->type == MESSAGE_HANDSHAKE,
375 "The impossible did happened: A not-in-our-list peer sent us a message.");
377 switch (message->type) {
378 case MESSAGE_HANDSHAKE:
379 // Check if the peer is in our connection list.
380 if (remote_peer == 0) {
381 xbt_dict_set_ext(peer->connected_peers, (char*)&message->peer_id, sizeof(int),
382 connection_new(message->peer_id));
383 send_message(peer, message->return_mailbox, MESSAGE_HANDSHAKE, MESSAGE_HANDSHAKE_SIZE);
385 // Send our bitfield to the peer
386 send_bitfield(peer, message->return_mailbox);
388 case MESSAGE_BITFIELD:
389 // Update the pieces list
390 update_pieces_count_from_bitfield(peer, message->bitfield);
391 // Store the bitfield
392 remote_peer->bitfield = message->bitfield;
393 xbt_assert(!remote_peer->am_interested, "Should not be interested at first");
394 if (is_interested(peer, remote_peer)) {
395 remote_peer->am_interested = 1;
396 send_message(peer, message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
399 case MESSAGE_INTERESTED:
400 xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
401 // Update the interested state of the peer.
402 remote_peer->interested = 1;
403 update_active_peers_set(peer, remote_peer);
405 case MESSAGE_NOTINTERESTED:
406 xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
407 remote_peer->interested = 0;
408 update_active_peers_set(peer, remote_peer);
410 case MESSAGE_UNCHOKE:
411 xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
412 xbt_assert(remote_peer->choked_download);
413 remote_peer->choked_download = 0;
414 // Send requests to the peer, since it has unchoked us
415 if (remote_peer->am_interested)
416 request_new_piece_to_peer(peer, remote_peer);
419 xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
420 xbt_assert(!remote_peer->choked_download);
421 remote_peer->choked_download = 1;
422 if (remote_peer->current_piece != -1)
423 remove_current_piece(peer, remote_peer, remote_peer->current_piece);
426 XBT_DEBUG("\t for piece %d", message->piece);
427 xbt_assert((message->piece >= 0 && (unsigned)message->piece < FILE_PIECES), "Wrong HAVE message received");
428 remote_peer->bitfield = remote_peer->bitfield | (1U << message->piece);
429 peer->pieces_count[message->piece]++;
430 // If the piece is in our pieces, we tell the peer that we are interested.
431 if ((remote_peer->am_interested == 0) && peer_has_not_piece(peer, message->piece)) {
432 remote_peer->am_interested = 1;
433 send_message(peer, message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
434 if (remote_peer->choked_download == 0)
435 request_new_piece_to_peer(peer, remote_peer);
438 case MESSAGE_REQUEST:
439 xbt_assert(remote_peer->interested);
440 xbt_assert((message->piece >= 0 && (unsigned)message->piece < FILE_PIECES), "Wrong request received");
441 if (remote_peer->choked_upload == 0) {
442 XBT_DEBUG("\t for piece %d (%d,%d)", message->piece, message->block_index,
443 message->block_index + message->block_length);
444 if (!peer_has_not_piece(peer, message->piece)) {
445 send_piece(peer, message->return_mailbox, message->piece, message->block_index, message->block_length);
448 XBT_DEBUG("\t for piece %d but he is choked.", message->peer_id);
452 XBT_DEBUG(" \t for piece %d (%d,%d)", message->piece, message->block_index,
453 message->block_index + message->block_length);
454 xbt_assert(!remote_peer->choked_download);
455 xbt_assert(remote_peer->choked_download != 1, "Can't received a piece if I'm choked !");
456 xbt_assert((message->piece >= 0 && (unsigned)message->piece < FILE_PIECES), "Wrong piece received");
457 // TODO: Execute a computation.
458 if (peer_has_not_piece(peer, message->piece)) {
459 update_bitfield_blocks(peer, message->piece, message->block_index, message->block_length);
460 if (piece_complete(peer, message->piece)) {
461 // Removing the piece from our piece list
462 remove_current_piece(peer, remote_peer, message->piece);
463 // Setting the fact that we have the piece
464 peer->bitfield = peer->bitfield | (1U << message->piece);
465 char* status = xbt_malloc0(FILE_PIECES + 1);
466 get_status(status, peer->bitfield);
467 XBT_DEBUG("My status is now %s", status);
469 // Sending the information to all the peers we are connected to
470 send_have_to_all_peers(peer, message->piece);
471 // sending UNINTERESTED to peers that do not have what we want.
472 update_interested_after_receive(peer);
473 } else { // piece not completed
474 send_request_to_peer(peer, remote_peer, message->piece); // ask for the next block
477 XBT_DEBUG("However, we already have it");
478 request_new_piece_to_peer(peer, remote_peer);
486 // Update the peer speed.
488 connection_add_speed_value(remote_peer, 1.0 / (simgrid_get_clock() - peer->begin_receive_time));
490 peer->begin_receive_time = simgrid_get_clock();
493 /** Selects the appropriate piece to download and requests it to the remote_peer */
494 void request_new_piece_to_peer(peer_t peer, connection_t remote_peer)
496 int piece = select_piece_to_download(peer, remote_peer);
498 peer->current_pieces |= (1U << (unsigned int)piece);
499 send_request_to_peer(peer, remote_peer, piece);
503 /** remove current_piece from the list of currently downloaded pieces. */
504 void remove_current_piece(peer_t peer, connection_t remote_peer, unsigned int current_piece)
506 peer->current_pieces &= ~(1U << current_piece);
507 remote_peer->current_piece = -1;
/** @brief Return the piece to be downloaded
 * There are several cases (as described in "Bittorrent Architecture Protocol", Ryan Toole):
 * If a piece is partially downloaded, this piece will be selected in priority.
 * If the peer has strictly less than 4 pieces, it chooses a piece at random.
 * If the peer has 4 pieces or more, it downloads the pieces that are the least replicated (rarest policy).
 * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
 * @param peer: local peer
 * @param remote_peer: information about the connection
 * @return the piece to download if possible. -1 otherwise
 */
520 int select_piece_to_download(const_peer_t peer, const_connection_t remote_peer)
522 int piece = partially_downloaded_piece(peer, remote_peer);
523 // strict priority policy
528 if (count_pieces(peer->current_pieces) >= (FILE_PIECES - count_pieces(peer->bitfield)) &&
529 (is_interested(peer, remote_peer) != 0)) {
530 int nb_interesting_pieces = 0;
531 // compute the number of interesting pieces
532 for (unsigned int i = 0; i < FILE_PIECES; i++) {
533 if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i)) {
534 nb_interesting_pieces++;
537 xbt_assert(nb_interesting_pieces != 0);
538 // get a random interesting piece
539 int random_piece_index = rand() % nb_interesting_pieces;
540 int current_index = 0;
541 for (unsigned int i = 0; i < FILE_PIECES; i++) {
542 if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i)) {
543 if (random_piece_index == current_index) {
550 xbt_assert(piece != -1);
553 // Random first policy
554 if (count_pieces(peer->bitfield) < 4 && (is_interested_and_free(peer, remote_peer) != 0)) {
555 int nb_interesting_pieces = 0;
556 // compute the number of interesting pieces
557 for (unsigned int i = 0; i < FILE_PIECES; i++) {
558 if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
559 peer_is_not_downloading_piece(peer, i)) {
560 nb_interesting_pieces++;
563 xbt_assert(nb_interesting_pieces != 0);
564 // get a random interesting piece
565 int random_piece_index = rand() % nb_interesting_pieces;
566 int current_index = 0;
567 for (unsigned int i = 0; i < FILE_PIECES; i++) {
568 if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
569 peer_is_not_downloading_piece(peer, i)) {
570 if (random_piece_index == current_index) {
577 xbt_assert(piece != -1);
579 } else { // Rarest first policy
580 short min = SHRT_MAX;
581 int nb_min_pieces = 0;
582 int current_index = 0;
583 // compute the smallest number of copies of available pieces
584 for (unsigned int i = 0; i < FILE_PIECES; i++) {
585 if (peer->pieces_count[i] < min && peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
586 peer_is_not_downloading_piece(peer, i))
587 min = peer->pieces_count[i];
589 xbt_assert(min != SHRT_MAX || (is_interested_and_free(peer, remote_peer) == 0));
590 // compute the number of rarest pieces
591 for (unsigned int i = 0; i < FILE_PIECES; i++) {
592 if (peer->pieces_count[i] == min && peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
593 peer_is_not_downloading_piece(peer, i))
596 xbt_assert(nb_min_pieces != 0 || (is_interested_and_free(peer, remote_peer) == 0));
597 // get a random rarest piece
598 int random_rarest_index = 0;
599 if (nb_min_pieces > 0) {
600 random_rarest_index = rand() % nb_min_pieces;
602 for (unsigned int i = 0; i < FILE_PIECES; i++) {
603 if (peer->pieces_count[i] == min && peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
604 peer_is_not_downloading_piece(peer, i)) {
605 if (random_rarest_index == current_index) {
612 xbt_assert(piece != -1 || (is_interested_and_free(peer, remote_peer) == 0));
617 /** Update the list of current choked and unchoked peers, using the choke algorithm */
618 void update_choked_peers(peer_t peer)
620 if (nb_interested_peers(peer) == 0)
622 XBT_DEBUG("(%d) update_choked peers %u active peers", peer->id, xbt_dict_size(peer->active_peers));
623 // update the current round
624 peer->round = (peer->round + 1) % 3;
626 char* choked_key = NULL;
627 connection_t chosen_peer = NULL;
628 connection_t choked_peer = NULL;
629 // remove a peer from the list
630 xbt_dict_cursor_t cursor = NULL;
631 xbt_dict_cursor_first(peer->active_peers, &cursor);
632 if (!xbt_dict_is_empty(peer->active_peers)) {
633 choked_key = xbt_dict_cursor_get_key(cursor);
634 choked_peer = xbt_dict_cursor_get_data(cursor);
636 xbt_dict_cursor_free(&cursor);
638 /**If we are currently seeding, we unchoke the peer which has been unchoked the last time.*/
639 if (has_finished(peer->bitfield)) {
640 connection_t connection;
641 double unchoke_time = simgrid_get_clock() + 1;
643 xbt_dict_foreach (peer->connected_peers, cursor, key, connection) {
644 if (connection->last_unchoke < unchoke_time && (connection->interested != 0) &&
645 (connection->choked_upload != 0)) {
646 unchoke_time = connection->last_unchoke;
647 chosen_peer = connection;
651 // Random optimistic unchoking
652 if (peer->round == 0) {
655 // We choose a random peer to unchoke.
657 if (xbt_dict_length(peer->connected_peers) > 0) {
658 id_chosen = rand() % xbt_dict_length(peer->connected_peers);
661 connection_t connection;
662 xbt_dict_foreach (peer->connected_peers, cursor, key, connection) {
663 if (i == id_chosen) {
664 chosen_peer = connection;
669 xbt_dict_cursor_free(&cursor);
670 xbt_assert(chosen_peer != NULL, "A peer should have been selected at this point");
671 if ((chosen_peer->interested == 0) || (chosen_peer->choked_upload == 0))
674 XBT_DEBUG("Nothing to do, keep going");
676 } while (chosen_peer == NULL && j < MAXIMUM_PEERS);
678 // Use the "fastest download" policy.
679 connection_t connection;
680 double fastest_speed = 0.0;
681 xbt_dict_foreach (peer->connected_peers, cursor, key, connection) {
682 if (connection->peer_speed > fastest_speed && (connection->choked_upload != 0) &&
683 (connection->interested != 0)) {
684 chosen_peer = connection;
685 fastest_speed = connection->peer_speed;
691 if (chosen_peer != NULL)
692 XBT_DEBUG("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ", peer->id, chosen_peer->id,
693 chosen_peer->interested, chosen_peer->choked_upload);
695 if (choked_peer != chosen_peer) {
696 if (choked_peer != NULL) {
697 xbt_assert((!choked_peer->choked_upload), "Tries to choked a choked peer");
698 choked_peer->choked_upload = 1;
699 xbt_assert((*((int*)choked_key) == choked_peer->id));
700 update_active_peers_set(peer, choked_peer);
701 XBT_DEBUG("(%d) Sending a CHOKE to %d", peer->id, choked_peer->id);
702 send_message(peer, choked_peer->mailbox, MESSAGE_CHOKE, MESSAGE_CHOKE_SIZE);
704 if (chosen_peer != NULL) {
705 xbt_assert((chosen_peer->choked_upload), "Tries to unchoked an unchoked peer");
706 chosen_peer->choked_upload = 0;
707 xbt_dict_set_ext(peer->active_peers, (char*)&chosen_peer->id, sizeof(int), chosen_peer);
708 chosen_peer->last_unchoke = simgrid_get_clock();
709 XBT_DEBUG("(%d) Sending a UNCHOKE to %d", peer->id, chosen_peer->id);
710 update_active_peers_set(peer, chosen_peer);
711 send_message(peer, chosen_peer->mailbox, MESSAGE_UNCHOKE, MESSAGE_UNCHOKE_SIZE);
716 /** Update "interested" state of peers: send "not interested" to peers that don't have any more pieces we want. */
717 void update_interested_after_receive(const_peer_t peer)
720 xbt_dict_cursor_t cursor;
721 connection_t connection;
722 xbt_dict_foreach (peer->connected_peers, cursor, key, connection) {
723 if (connection->am_interested != 0) {
725 // Check if the peer still has a piece we want.
726 for (unsigned int i = 0; i < FILE_PIECES; i++) {
727 if (peer_has_not_piece(peer, i) && connection_has_piece(connection, i)) {
732 if (!interested) { // no more piece to download from connection
733 connection->am_interested = 0;
734 send_message(peer, connection->mailbox, MESSAGE_NOTINTERESTED, MESSAGE_NOTINTERESTED_SIZE);
740 void update_bitfield_blocks(peer_t peer, int index, int block_index, int block_length)
742 xbt_assert((index >= 0 && (unsigned)index <= FILE_PIECES), "Wrong piece.");
743 xbt_assert((block_index >= 0 && (unsigned)block_index <= PIECES_BLOCKS), "Wrong block : %d.", block_index);
744 for (int i = block_index; i < (block_index + block_length); i++) {
745 peer->bitfield_blocks |= (1ULL << (unsigned int)(index * PIECES_BLOCKS + i));
749 /** Returns if a peer has completed the download of a piece */
750 int piece_complete(const_peer_t peer, int index)
752 for (unsigned int i = 0; i < PIECES_BLOCKS; i++) {
753 if (!(peer->bitfield_blocks & 1ULL << (index * PIECES_BLOCKS + i))) {
760 /** Returns the first block that a peer doesn't have in a piece. If the peer has all blocks of the piece, returns -1. */
761 int get_first_missing_block_from(const_peer_t peer, int piece)
763 for (unsigned int i = 0; i < PIECES_BLOCKS; i++) {
764 if (!(peer->bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i))) {
771 /** Returns a piece that is partially downloaded and stored by the remote peer if any -1 otherwise. */
772 int partially_downloaded_piece(const_peer_t peer, const_connection_t remote_peer)
774 for (unsigned int i = 0; i < FILE_PIECES; i++) {
775 if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) && peer_is_not_downloading_piece(peer, i) &&
776 get_first_missing_block_from(peer, i) > 0)
782 int peer_has_not_piece(const_peer_t peer, unsigned int piece)
784 return !(peer->bitfield & 1U << piece);
787 /** Check that a piece is not currently being download by the peer. */
788 int peer_is_not_downloading_piece(const_peer_t peer, unsigned int piece)
790 return !(peer->current_pieces & 1U << piece);
793 /***************** Connection internal functions ***********************/
794 connection_t connection_new(int id)
796 connection_t connection = xbt_new(s_connection_t, 1);
797 char mailbox_name[MAILBOX_SIZE];
798 snprintf(mailbox_name, MAILBOX_SIZE - 1, "%d", id);
800 connection->mailbox = sg_mailbox_by_name(mailbox_name);
801 connection->bitfield = 0;
802 connection->peer_speed = 0;
803 connection->last_unchoke = 0;
804 connection->current_piece = -1;
805 connection->am_interested = 0;
806 connection->interested = 0;
807 connection->choked_upload = 1;
808 connection->choked_download = 1;
813 void connection_add_speed_value(connection_t connection, double speed)
815 connection->peer_speed = connection->peer_speed * 0.6 + speed * 0.4;
818 int connection_has_piece(const_connection_t connection, unsigned int piece)
820 return (connection->bitfield & 1U << piece);
823 /***************** Messages creation functions ***********************/
824 /** @brief Build a new empty message */
825 message_t message_new(e_message_type type, int peer_id, sg_mailbox_t return_mailbox)
827 message_t message = xbt_new(s_message_t, 1);
828 message->peer_id = peer_id;
829 message->return_mailbox = return_mailbox;
830 message->type = type;
834 /** Builds a message containing an index. */
835 message_t message_index_new(e_message_type type, int peer_id, sg_mailbox_t return_mailbox, int index)
837 message_t message = message_new(type, peer_id, return_mailbox);
838 message->piece = index;
842 message_t message_other_new(e_message_type type, int peer_id, sg_mailbox_t return_mailbox, unsigned int bitfield)
844 message_t message = message_new(type, peer_id, return_mailbox);
845 message->bitfield = bitfield;
849 message_t message_request_new(int peer_id, sg_mailbox_t return_mailbox, int piece, int block_index, int block_length)
851 message_t message = message_index_new(MESSAGE_REQUEST, peer_id, return_mailbox, piece);
852 message->block_index = block_index;
853 message->block_length = block_length;
857 message_t message_piece_new(int peer_id, sg_mailbox_t return_mailbox, int piece, int block_index, int block_length)
859 message_t message = message_index_new(MESSAGE_PIECE, peer_id, return_mailbox, piece);
860 message->block_index = block_index;
861 message->block_length = block_length;