From: Frederic Suter Date: Thu, 19 Mar 2020 14:26:03 +0000 (+0100) Subject: convert and massevely rewrite app-bittorrent X-Git-Tag: v3.26~728 X-Git-Url: http://bilbo.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/c877721341b71a10f05fe7864ed41e5b7925c587?ds=sidebyside;hp=1cde8d0c05880204fb76bdb7dcfae129b58b9f69 convert and massevely rewrite app-bittorrent --- diff --git a/MANIFEST.in b/MANIFEST.in index 09810cb88d..b9ba73a80a 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -39,6 +39,15 @@ include examples/c/actor-suspend/actor-suspend.tesh include examples/c/actor-yield/actor-yield.c include examples/c/actor-yield/actor-yield.tesh include examples/c/actor-yield/actor-yield_d.xml +include examples/c/app-bittorrent/app-bittorrent.c +include examples/c/app-bittorrent/app-bittorrent.h +include examples/c/app-bittorrent/app-bittorrent.tesh +include examples/c/app-bittorrent/app-bittorrent_d.xml +include examples/c/app-bittorrent/bittorrent-peer.c +include examples/c/app-bittorrent/bittorrent-peer.h +include examples/c/app-bittorrent/generate.py +include examples/c/app-bittorrent/tracker.c +include examples/c/app-bittorrent/tracker.h include examples/c/app-chainsend/app-chainsend.tesh include examples/c/app-chainsend/app-chainsend_d.xml include examples/c/app-chainsend/broadcaster.c @@ -655,19 +664,6 @@ include teshsuite/mc/random-bug/random-bug-nocrash.tesh include teshsuite/mc/random-bug/random-bug-replay.tesh include teshsuite/mc/random-bug/random-bug.cpp include teshsuite/mc/random-bug/random-bug.tesh -include teshsuite/msg/app-bittorrent/app-bittorrent.tesh -include teshsuite/msg/app-bittorrent/app-bittorrent_d.xml -include teshsuite/msg/app-bittorrent/bittorrent-messages.c -include teshsuite/msg/app-bittorrent/bittorrent-messages.h -include teshsuite/msg/app-bittorrent/bittorrent-peer.c -include teshsuite/msg/app-bittorrent/bittorrent-peer.h -include teshsuite/msg/app-bittorrent/bittorrent.c -include teshsuite/msg/app-bittorrent/bittorrent.h -include teshsuite/msg/app-bittorrent/connection.c -include teshsuite/msg/app-bittorrent/connection.h -include teshsuite/msg/app-bittorrent/generate.py -include teshsuite/msg/app-bittorrent/tracker.c -include teshsuite/msg/app-bittorrent/tracker.h include teshsuite/msg/cloud-two-tasks/cloud-two-tasks.c include teshsuite/msg/cloud-two-tasks/cloud-two-tasks.tesh include teshsuite/msg/get_sender/get_sender.c diff --git a/examples/c/CMakeLists.txt b/examples/c/CMakeLists.txt index 8126348a19..e74de6efa6 100644 --- a/examples/c/CMakeLists.txt +++ b/examples/c/CMakeLists.txt @@ -18,11 +18,21 @@ foreach(x add_dependencies(tests ${x}-c) set(tesh_files ${tesh_files} ${CMAKE_CURRENT_SOURCE_DIR}/${x}/${x}.tesh) - set(teshsuite_src ${teshsuite_src} ${CMAKE_CURRENT_SOURCE_DIR}/${x}/${x}.c) + set(examples_src ${examples_src} ${CMAKE_CURRENT_SOURCE_DIR}/${x}/${x}.c) endforeach() # Multi-files examples ###################### +# bittorrent example +add_executable (app-bittorrent-c EXCLUDE_FROM_ALL app-bittorrent/app-bittorrent.c app-bittorrent/bittorrent-peer.c app-bittorrent/tracker.c) +target_link_libraries(app-bittorrent-c simgrid) +set_target_properties(app-bittorrent-c PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/app-bittorrent) +add_dependencies(tests app-bittorrent-c) + +foreach (file app-bittorrent bittorrent-peer tracker) + set(examples_src ${examples_src} ${CMAKE_CURRENT_SOURCE_DIR}/app-bittorrent/${file}.c ${CMAKE_CURRENT_SOURCE_DIR}/app-bittorrent/${file}.h) +endforeach() + # Chainsend example add_executable (app-chainsend-c EXCLUDE_FROM_ALL app-chainsend/chainsend.c app-chainsend/broadcaster.c @@ -32,9 +42,9 @@ set_target_properties(app-chainsend-c PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAK add_dependencies(tests app-chainsend-c) foreach (file chainsend broadcaster peer) - set(teshsuite_src ${teshsuite_src} ${CMAKE_CURRENT_SOURCE_DIR}/app-chainsend/${file}.c) + set(examples_src ${examples_src} ${CMAKE_CURRENT_SOURCE_DIR}/app-chainsend/${file}.c) endforeach() -set(teshsuite_src ${teshsuite_src} ${CMAKE_CURRENT_SOURCE_DIR}/app-chainsend/chainsend.h) +set(examples_src ${examples_src} ${CMAKE_CURRENT_SOURCE_DIR}/app-chainsend/chainsend.h) #DHT-Kademlia add_executable (dht-kademlia-c EXCLUDE_FROM_ALL dht-kademlia/dht-kademlia.c dht-kademlia/node.c dht-kademlia/routing_table.c dht-kademlia/message.c dht-kademlia/answer.c) @@ -49,18 +59,20 @@ endforeach() # Add all extra files to the archive #################################### -set(teshsuite_src ${teshsuite_src} PARENT_SCOPE) -set(tesh_files ${tesh_files} ${CMAKE_CURRENT_SOURCE_DIR}/app-chainsend/app-chainsend.tesh +set(tesh_files ${tesh_files} ${CMAKE_CURRENT_SOURCE_DIR}/app-bittorrent/app-bittorrent.tesh + ${CMAKE_CURRENT_SOURCE_DIR}/app-chainsend/app-chainsend.tesh ${CMAKE_CURRENT_SOURCE_DIR}/app-masterworker/app-masterworker-multicore.tesh ${CMAKE_CURRENT_SOURCE_DIR}/app-masterworker/app-masterworker-vivaldi.tesh ${CMAKE_CURRENT_SOURCE_DIR}/dht-kademlia/dht-kademlia.tesh PARENT_SCOPE) -set(bin_files ${bin_files} ${CMAKE_CURRENT_SOURCE_DIR}/dht-kademlia/generate.py PARENT_SCOPE) +set(bin_files ${bin_files} ${CMAKE_CURRENT_SOURCE_DIR}/app-bittorrent/generate.py + ${CMAKE_CURRENT_SOURCE_DIR}/dht-kademlia/generate.py PARENT_SCOPE) set(examples_src ${examples_src} ${CMAKE_CURRENT_SOURCE_DIR}/dht-kademlia/common.h ${CMAKE_CURRENT_SOURCE_DIR}/dht-kademlia/dht-kademlia.c PARENT_SCOPE) set(xml_files ${xml_files} ${CMAKE_CURRENT_SOURCE_DIR}/actor-create/actor-create_d.xml ${CMAKE_CURRENT_SOURCE_DIR}/actor-lifetime/actor-lifetime_d.xml ${CMAKE_CURRENT_SOURCE_DIR}/actor-yield/actor-yield_d.xml + ${CMAKE_CURRENT_SOURCE_DIR}/app-bittorrent/app-bittorrent_d.xml ${CMAKE_CURRENT_SOURCE_DIR}/app-chainsend/app-chainsend_d.xml ${CMAKE_CURRENT_SOURCE_DIR}/app-masterworker/app-masterworker_d.xml ${CMAKE_CURRENT_SOURCE_DIR}/app-masterworker/app-masterworker-multicore_d.xml @@ -80,7 +92,7 @@ set(xml_files ${xml_files} ${CMAKE_CURRENT_SOURCE_DIR}/actor-create/actor-cr foreach(x actor-create actor-daemon actor-exiting actor-join actor-kill actor-lifetime actor-migrate actor-stacksize actor-suspend actor-yield - app-chainsend app-masterworker app-pingpong app-token-ring + app-bittorrent app-chainsend app-masterworker app-pingpong app-token-ring async-wait async-waitall async-waitany cloud-capping cloud-masterworker cloud-migration cloud-simple exec-async exec-basic exec-dvfs exec-remote exec-waitany @@ -118,6 +130,12 @@ else() set(parallel-factories "thread;ucontext;raw;boost") endif() +ADD_TESH_FACTORIES(c-app-bittorrent-parallel "raw" --cfg contexts/nthreads:4 ${CONTEXTS_SYNCHRO} + --setenv bindir=${CMAKE_BINARY_DIR}/examples/c/app-bittorrent + --setenv srcdir=${CMAKE_HOME_DIRECTORY}/examples/platforms + --setenv platfdir=${CMAKE_HOME_DIRECTORY}/examples/platforms + --cd ${CMAKE_HOME_DIRECTORY}/examples/c/app-bittorrent app-bittorrent.tesh) + ADD_TESH_FACTORIES(c-dht-kademlia-parallel "${parallel-factories}" --cfg contexts/nthreads:4 ${CONTEXTS_SYNCHRO} --setenv bindir=${CMAKE_BINARY_DIR}/examples/c/dht-kademlia --setenv srcdir=${CMAKE_HOME_DIRECTORY}/examples/c/dht-kademlia diff --git a/teshsuite/msg/app-bittorrent/bittorrent.c b/examples/c/app-bittorrent/app-bittorrent.c similarity index 62% rename from teshsuite/msg/app-bittorrent/bittorrent.c rename to examples/c/app-bittorrent/app-bittorrent.c index ce6ff2701d..56a3cc1cb4 100644 --- a/teshsuite/msg/app-bittorrent/bittorrent.c +++ b/examples/c/app-bittorrent/app-bittorrent.c @@ -3,27 +3,29 @@ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ -#include "bittorrent.h" +#include "app-bittorrent.h" #include "bittorrent-peer.h" #include "tracker.h" -#include + +#include +#include /** Bittorrent example launcher */ int main(int argc, char* argv[]) { - MSG_init(&argc, argv); + simgrid_init(&argc, argv); /* Check the arguments */ xbt_assert(argc > 2, "Usage: %s platform_file deployment_file", argv[0]); - MSG_create_environment(argv[1]); + simgrid_load_platform(argv[1]); - MSG_function_register("tracker", tracker); - MSG_function_register("peer", peer); + simgrid_register_function("tracker", tracker); + simgrid_register_function("peer", peer); - MSG_launch_application(argv[2]); + simgrid_load_deployment(argv[2]); - MSG_main(); + simgrid_run(); return 0; } diff --git a/examples/c/app-bittorrent/app-bittorrent.h b/examples/c/app-bittorrent/app-bittorrent.h new file mode 100644 index 0000000000..2afbfbb093 --- /dev/null +++ b/examples/c/app-bittorrent/app-bittorrent.h @@ -0,0 +1,88 @@ +/* Copyright (c) 2012-2020. The SimGrid Team. + * All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#ifndef BITTORRENT_BITTORRENT_H_ +#define BITTORRENT_BITTORRENT_H_ +#include +#include +#include +#include +#include +#include +#include + +#define MAILBOX_SIZE 40 +#define TRACKER_MAILBOX "tracker_mailbox" +/** Max number of pairs sent by the tracker to clients */ +#define MAXIMUM_PEERS 50 +/** Interval of time where the peer should send a request to the tracker */ +#define TRACKER_QUERY_INTERVAL 1000 +/** Communication size for a task to the tracker */ +#define TRACKER_COMM_SIZE 1 +#define GET_PEERS_TIMEOUT 10000 +/** Number of peers that can be unchocked at a given time */ +#define MAX_UNCHOKED_PEERS 4 +/** Interval between each update of the choked peers */ +#define UPDATE_CHOKED_INTERVAL 30 + +/** Message sizes + * Sizes based on report by A. Legout et al, Understanding BitTorrent: An Experimental Perspective + * http://hal.inria.fr/inria-00000156/en + */ +#define MESSAGE_HANDSHAKE_SIZE 68 +#define MESSAGE_CHOKE_SIZE 5 +#define MESSAGE_UNCHOKE_SIZE 5 +#define MESSAGE_INTERESTED_SIZE 5 +#define MESSAGE_NOTINTERESTED_SIZE 5 +#define MESSAGE_HAVE_SIZE 9 +#define MESSAGE_BITFIELD_SIZE 5 +#define MESSAGE_REQUEST_SIZE 17 +#define MESSAGE_PIECE_SIZE 13 +#define MESSAGE_CANCEL_SIZE 17 + +/** Types of messages exchanged between two peers. */ +typedef enum { + MESSAGE_HANDSHAKE, + MESSAGE_CHOKE, + MESSAGE_UNCHOKE, + MESSAGE_INTERESTED, + MESSAGE_NOTINTERESTED, + MESSAGE_HAVE, + MESSAGE_BITFIELD, + MESSAGE_REQUEST, + MESSAGE_PIECE, + MESSAGE_CANCEL +} e_message_type; + +/** Message data */ +typedef struct s_message { + e_message_type type; + int peer_id; + sg_mailbox_t return_mailbox; + unsigned int bitfield; + int piece; + int block_index; + int block_length; +} s_message_t; +typedef s_message_t* message_t; + +/** Builds a new value-less message */ +message_t message_new(e_message_type type, int peer_id, sg_mailbox_t return_mailbox); +/** Builds a new "have/piece" message */ +message_t message_index_new(e_message_type type, int peer_id, sg_mailbox_t return_mailbox, int index); + +message_t message_other_new(e_message_type type, int peer_id, sg_mailbox_t return_mailbox, unsigned int bitfield); +/** Builds a new bitfield message */ +message_t message_bitfield_new(int peer_id, sg_mailbox_t return_mailbox, unsigned int bitfield); +/** Builds a new "request" message */ +message_t message_request_new(int peer_id, sg_mailbox_t return_mailbox, int piece, int block_index, int block_length); +/** Build a new "piece" message */ +message_t message_piece_new(int peer_id, sg_mailbox_t return_mailbox, int index, int block_index, int block_length); + +/** Free a message task */ +void message_free(void*); + +#endif /* BITTORRENT_BITTORRENT_H_ */ diff --git a/teshsuite/msg/app-bittorrent/app-bittorrent.tesh b/examples/c/app-bittorrent/app-bittorrent.tesh similarity index 88% rename from teshsuite/msg/app-bittorrent/app-bittorrent.tesh rename to examples/c/app-bittorrent/app-bittorrent.tesh index 91c2c6ab4c..cd97abc277 100644 --- a/teshsuite/msg/app-bittorrent/app-bittorrent.tesh +++ b/examples/c/app-bittorrent/app-bittorrent.tesh @@ -1,10 +1,10 @@ #!/usr/bin/env tesh -p Testing the Bittorrent implementation with MSG +p Bittorrent implementation ! timeout 10 ! output sort 19 -$ ${bindir:=.}/bittorrent ${platfdir}/cluster_backbone.xml app-bittorrent_d.xml "--log=root.fmt:[%12.6r]%e(%i:%P@%h)%e%m%n" +$ ${bindir:=.}/app-bittorrent-c ${platfdir}/cluster_backbone.xml app-bittorrent_d.xml "--log=root.fmt:[%12.6r]%e(%i:%P@%h)%e%m%n" > [ 0.000000] (1:tracker@node-0.simgrid.org) Tracker launched. > [ 0.000000] (2:peer@node-1.simgrid.org) Hi, I'm joining the network with id 2 > [ 0.000000] (3:peer@node-2.simgrid.org) Hi, I'm joining the network with id 3 diff --git a/teshsuite/msg/app-bittorrent/app-bittorrent_d.xml b/examples/c/app-bittorrent/app-bittorrent_d.xml similarity index 100% rename from teshsuite/msg/app-bittorrent/app-bittorrent_d.xml rename to examples/c/app-bittorrent/app-bittorrent_d.xml diff --git a/teshsuite/msg/app-bittorrent/bittorrent-peer.c b/examples/c/app-bittorrent/bittorrent-peer.c similarity index 58% rename from teshsuite/msg/app-bittorrent/bittorrent-peer.c rename to examples/c/app-bittorrent/bittorrent-peer.c index 4c2bdef16a..05d4e2f406 100644 --- a/teshsuite/msg/app-bittorrent/bittorrent-peer.c +++ b/examples/c/app-bittorrent/bittorrent-peer.c @@ -4,15 +4,13 @@ * under the terms of the license (GNU LGPL) which comes with this package. */ #include "bittorrent-peer.h" -#include "bittorrent-messages.h" -#include "connection.h" #include "tracker.h" -#include +#include #include #include /* snprintf */ -XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peers, "Messages specific for the peers"); +XBT_LOG_NEW_DEFAULT_CATEGORY(bittorrent_peers, "Messages specific for the peers"); /* * User parameters for transferred file data. For the test, the default values are : @@ -21,44 +19,61 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peers, "Messages specific for the peers"); #define FILE_PIECES 10UL #define PIECES_BLOCKS 5UL #define BLOCK_SIZE 16384 -static const unsigned long int FILE_SIZE = FILE_PIECES * PIECES_BLOCKS * BLOCK_SIZE; /** Number of blocks asked by each request */ -#define BLOCKS_REQUESTED 2 +#define BLOCKS_REQUESTED 2UL #define SLEEP_DURATION 1 +#define BITS_TO_BYTES(x) (((x) / 8 + (x) % 8) ? 1 : 0) -int count_pieces(unsigned int bitfield) +#ifndef MIN +#define MIN(a, b) ((a) < (b) ? (a) : (b)) +#endif + +static peer_t peer_init(int id, int seed) { - int count = 0; - unsigned int n = bitfield; - while (n) { - count += n & 1U; - n >>= 1U; + peer_t peer = xbt_new(s_peer_t, 1); + peer->id = id; + + char mailbox_name[MAILBOX_SIZE]; + snprintf(mailbox_name, MAILBOX_SIZE - 1, "%d", id); + peer->mailbox = sg_mailbox_by_name(mailbox_name); + + peer->connected_peers = xbt_dict_new_homogeneous(NULL); + peer->active_peers = xbt_dict_new_homogeneous(NULL); + + if (seed) { + peer->bitfield = (1U << FILE_PIECES) - 1U; + peer->bitfield_blocks = (1ULL << (FILE_PIECES * PIECES_BLOCKS)) - 1ULL; + } else { + peer->bitfield = 0; + peer->bitfield_blocks = 0; } - return count; -} -int peer_has_not_piece(const s_peer_t* peer, unsigned int piece) -{ - return !(peer->bitfield & 1U << piece); -} + peer->current_pieces = 0; + peer->pieces_count = xbt_new0(short, FILE_PIECES); + peer->comm_received = NULL; + peer->round = 0; -/** Check that a piece is not currently being download by the peer. */ -int peer_is_not_downloading_piece(const s_peer_t* peer, unsigned int piece) -{ - return !(peer->current_pieces & 1U << piece); + return peer; } -void get_status(char** status, unsigned int bitfield) +static void peer_free(peer_t peer) { - for (int i = FILE_PIECES - 1; i >= 0; i--) - (*status)[i] = (bitfield & (1U << i)) ? '1' : '0'; - (*status)[FILE_PIECES] = '\0'; + char* key; + connection_t connection; + xbt_dict_cursor_t cursor; + xbt_dict_foreach (peer->connected_peers, cursor, key, connection) + connection_free(connection); + + xbt_dict_free(&peer->connected_peers); + xbt_dict_free(&peer->active_peers); + xbt_free(peer->pieces_count); + xbt_free(peer); } /** Peer main function */ -int peer(int argc, char* argv[]) +void peer(int argc, char* argv[]) { // Check arguments xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments"); @@ -67,23 +82,26 @@ int peer(int argc, char* argv[]) peer_t peer = peer_init(xbt_str_parse_int(argv[1], "Invalid ID: %s"), argc == 4 ? 1 : 0); // Retrieve deadline - double deadline = xbt_str_parse_double(argv[2], "Invalid deadline: %s"); - xbt_assert(deadline > 0, "Wrong deadline supplied"); + peer->deadline = xbt_str_parse_double(argv[2], "Invalid deadline: %s"); + xbt_assert(peer->deadline > 0, "Wrong deadline supplied"); char* status = xbt_malloc0(FILE_PIECES + 1); get_status(&status, peer->bitfield); + XBT_INFO("Hi, I'm joining the network with id %d", peer->id); + // Getting peer data from the tracker. - if (get_peers_data(peer)) { - XBT_DEBUG("Got %d peers from the tracker. Current status is: %s", xbt_dict_length(peer->peers), status); - peer->begin_receive_time = MSG_get_clock(); - MSG_mailbox_set_async(peer->mailbox); + if (get_peers_from_tracker(peer)) { + XBT_DEBUG("Got %d peers from the tracker. Current status is: %s", xbt_dict_length(peer->connected_peers), status); + peer->begin_receive_time = simgrid_get_clock(); + sg_mailbox_set_receiver(sg_mailbox_get_name(peer->mailbox)); + if (has_finished(peer->bitfield)) { - send_handshake_all(peer); + send_handshake_to_all_peers(peer); } else { - leech_loop(peer, deadline); + leech(peer); } - seed_loop(peer, deadline); + seed(peer); } else { XBT_INFO("Couldn't contact the tracker."); } @@ -91,202 +109,249 @@ int peer(int argc, char* argv[]) get_status(&status, peer->bitfield); XBT_INFO("Here is my current status: %s", status); if (peer->comm_received) { - MSG_comm_destroy(peer->comm_received); + sg_comm_unref(peer->comm_received); } xbt_free(status); peer_free(peer); - return 0; } -/** @brief Peer main loop when it is leeching. - * @param peer peer data - * @param deadline time at which the peer has to leave - */ -void leech_loop(peer_t peer, double deadline) +/** @brief Retrieves the peer list from the tracker */ +int get_peers_from_tracker(const_peer_t peer) { - double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL; - XBT_DEBUG("Start downloading."); + sg_mailbox_t tracker_mailbox = sg_mailbox_by_name(TRACKER_MAILBOX); - /* Send a "handshake" message to all the peers it got (since it couldn't have gotten more than 50 peers) */ - send_handshake_all(peer); - XBT_DEBUG("Starting main leech loop"); + // Build the task to send to the tracker + tracker_query_t peer_request = tracker_query_new(peer->id, peer->mailbox); - while (MSG_get_clock() < deadline && count_pieces(peer->bitfield) < FILE_PIECES) { - if (peer->comm_received == NULL) { - peer->task_received = NULL; - peer->comm_received = MSG_task_irecv(&peer->task_received, peer->mailbox); - } - if (MSG_comm_test(peer->comm_received)) { - msg_error_t status = MSG_comm_get_status(peer->comm_received); - MSG_comm_destroy(peer->comm_received); - peer->comm_received = NULL; - if (status == MSG_OK) { - handle_message(peer, peer->task_received); - } - } else { - // We don't execute the choke algorithm if we don't already have a piece - if (MSG_get_clock() >= next_choked_update && count_pieces(peer->bitfield) > 0) { - update_choked_peers(peer); - next_choked_update += UPDATE_CHOKED_INTERVAL; - } else { - MSG_process_sleep(SLEEP_DURATION); - } - } + XBT_DEBUG("Sending a peer request to the tracker."); + sg_comm_t request = sg_mailbox_put_async(tracker_mailbox, peer_request, TRACKER_COMM_SIZE); + sg_error_t res = sg_comm_wait_for(request, GET_PEERS_TIMEOUT); + + if (res == SG_ERROR_TIMEOUT) { + XBT_DEBUG("Timeout expired when requesting peers to tracker"); + xbt_free(peer_request); + return 0; } - if (has_finished(peer->bitfield)) - XBT_DEBUG("%d becomes a seeder", peer->id); -} -/** @brief Peer main loop when it is seeding - * @param peer peer data - * @param deadline time when the peer will leave - */ -void seed_loop(peer_t peer, double deadline) -{ - double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL; - XBT_DEBUG("Start seeding."); - // start the main seed loop - while (MSG_get_clock() < deadline) { - if (peer->comm_received == NULL) { - peer->task_received = NULL; - peer->comm_received = MSG_task_irecv(&peer->task_received, peer->mailbox); - } - if (MSG_comm_test(peer->comm_received)) { - msg_error_t status = MSG_comm_get_status(peer->comm_received); - MSG_comm_destroy(peer->comm_received); - peer->comm_received = NULL; - if (status == MSG_OK) { - handle_message(peer, peer->task_received); - } - } else { - if (MSG_get_clock() >= next_choked_update) { - update_choked_peers(peer); - // TODO: Change the choked peer algorithm when seeding. - next_choked_update += UPDATE_CHOKED_INTERVAL; - } else { - MSG_process_sleep(SLEEP_DURATION); - } + void* message = NULL; + tracker_answer_t ta = NULL; + sg_comm_t comm_received = sg_mailbox_get_async(peer->mailbox, &message); + res = sg_comm_wait_for(comm_received, GET_PEERS_TIMEOUT); + if (res == SG_OK) { + ta = (tracker_answer_t)message; + // Add the peers the tracker gave us to our peer list. + unsigned i; + int peer_id; + // Add the peers the tracker gave us to our peer list. + xbt_dynar_foreach (ta->peers, i, peer_id) { + if (peer_id != peer->id) + xbt_dict_set_ext(peer->connected_peers, (char*)&peer_id, sizeof(int), connection_new(peer_id)); } + tracker_answer_free(message); + } else if (res == SG_ERROR_TIMEOUT) { + XBT_DEBUG("Timeout expired when requesting peers to tracker"); + tracker_answer_free(message); + return 0; } + + return 1; } -/** @brief Retrieves the peer list from the tracker - * @param peer current peer data - */ -int get_peers_data(const s_peer_t* peer) +/** @brief Send a handshake message to all the peers the peer has. */ +void send_handshake_to_all_peers(const_peer_t peer) { - int success = 0; - double timeout = MSG_get_clock() + GET_PEERS_TIMEOUT; - - // Build the task to send to the tracker - tracker_task_data_t data = - tracker_task_data_new(MSG_host_get_name(MSG_host_self()), peer->mailbox_tracker, peer->id, 0, 0, FILE_SIZE); - msg_task_t task_send = MSG_task_create(NULL, 0, TRACKER_COMM_SIZE, data); - while ((success == 0) && MSG_get_clock() < timeout) { - XBT_DEBUG("Sending a peer request to the tracker."); - msg_error_t status = MSG_task_send_with_timeout(task_send, TRACKER_MAILBOX, GET_PEERS_TIMEOUT); - if (status == MSG_OK) { - success = 1; - } - } - - success = 0; - msg_task_t task_received = NULL; - while ((success == 0) && MSG_get_clock() < timeout) { - msg_comm_t comm_received = MSG_task_irecv(&task_received, peer->mailbox_tracker); - msg_error_t status = MSG_comm_wait(comm_received, GET_PEERS_TIMEOUT); - if (status == MSG_OK) { - tracker_task_data_t data = MSG_task_get_data(task_received); - unsigned i; - int peer_id; - // Add the peers the tracker gave us to our peer list. - xbt_dynar_foreach (data->peers, i, peer_id) { - if (peer_id != peer->id) - xbt_dict_set_ext(peer->peers, (char*)&peer_id, sizeof(int), connection_new(peer_id)); - } - success = 1; - // free the communication and the task - MSG_comm_destroy(comm_received); - tracker_task_data_free(data); - MSG_task_destroy(task_received); - } + connection_t remote_peer; + xbt_dict_cursor_t cursor = NULL; + char* key; + xbt_dict_foreach (peer->connected_peers, cursor, key, remote_peer) { + message_t handshake = message_new(MESSAGE_HANDSHAKE, peer->id, peer->mailbox); + sg_comm_t comm = sg_mailbox_put_init(remote_peer->mailbox, handshake, MESSAGE_HANDSHAKE_SIZE); + sg_comm_detach(comm, NULL); + XBT_DEBUG("Sending a HANDSHAKE to %s", sg_mailbox_get_name(remote_peer->mailbox)); } +} - return success; +void send_message(const_peer_t peer, sg_mailbox_t mailbox, e_message_type type, uint64_t size) +{ + const char* type_names[6] = {"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED", "CANCEL"}; + XBT_DEBUG("Sending %s to %s", type_names[type], sg_mailbox_get_name(mailbox)); + message_t message = message_other_new(type, peer->id, peer->mailbox, peer->bitfield); + sg_comm_t comm = sg_mailbox_put_init(mailbox, message, size); + sg_comm_detach(comm, NULL); } -/** @brief Initialize the peer data. - * @param peer peer data - * @param id id of the peer to take in the network - * @param seed indicates if the peer is a seed. - */ -peer_t peer_init(int id, int seed) +/** @brief Send a bitfield message to all the peers the peer has */ +void send_bitfield(const_peer_t peer, sg_mailbox_t mailbox) { - peer_t peer = xbt_new(s_peer_t, 1); - peer->id = id; - peer->hostname = MSG_host_get_name(MSG_host_self()); + XBT_DEBUG("Sending a BITFIELD to %s", sg_mailbox_get_name(mailbox)); + message_t message = message_bitfield_new(peer->id, peer->mailbox, peer->bitfield); + sg_comm_t comm = sg_mailbox_put_init(mailbox, message, MESSAGE_BITFIELD_SIZE + BITS_TO_BYTES(FILE_PIECES)); + sg_comm_detach(comm, NULL); +} - snprintf(peer->mailbox, MAILBOX_SIZE - 1, "%d", id); - snprintf(peer->mailbox_tracker, MAILBOX_SIZE - 1, "tracker_%d", id); - peer->peers = xbt_dict_new_homogeneous(NULL); - peer->active_peers = xbt_dict_new_homogeneous(NULL); +/** Send a "piece" message to a pair, containing a piece of the file */ +void send_piece(const_peer_t peer, sg_mailbox_t mailbox, int piece, int block_index, int block_length) +{ + XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s", piece, block_index, block_length, sg_mailbox_get_name(mailbox)); + xbt_assert(piece >= 0, "Tried to send a piece that doesn't exist."); + xbt_assert(!peer_has_not_piece(peer, piece), "Tried to send a piece that we doesn't have."); + message_t message = message_piece_new(peer->id, peer->mailbox, piece, block_index, block_length); + sg_comm_t comm = sg_mailbox_put_init(mailbox, message, BLOCK_SIZE); + sg_comm_detach(comm, NULL); +} - if (seed) { - peer->bitfield = (1U << FILE_PIECES) - 1U; - peer->bitfield_blocks = (1ULL << (FILE_PIECES * PIECES_BLOCKS)) - 1ULL; - } else { - peer->bitfield = 0; - peer->bitfield_blocks = 0; +/** Send a "HAVE" message to all peers we are connected to */ +void send_have_to_all_peers(const_peer_t peer, int piece) +{ + XBT_DEBUG("Sending HAVE message to all my peers"); + connection_t remote_peer; + xbt_dict_cursor_t cursor = NULL; + char* key; + xbt_dict_foreach (peer->connected_peers, cursor, key, remote_peer) { + message_t message = message_index_new(MESSAGE_HAVE, peer->id, peer->mailbox, piece); + sg_comm_t comm = sg_mailbox_put_init(remote_peer->mailbox, message, MESSAGE_HAVE_SIZE); + sg_comm_detach(comm, NULL); } +} - peer->current_pieces = 0; +/** @brief Send request messages to a peer that have unchoked us */ +void send_request_to_peer(const_peer_t peer, connection_t remote_peer, int piece) +{ + remote_peer->current_piece = piece; + xbt_assert(connection_has_piece(remote_peer, piece)); + int block_index = get_first_missing_block_from(peer, piece); + if (block_index != -1) { + int block_length = MIN(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index); + XBT_DEBUG("Sending a REQUEST to %s for piece %d (%d,%d)", sg_mailbox_get_name(remote_peer->mailbox), piece, + block_index, block_length); + message_t message = message_request_new(peer->id, peer->mailbox, piece, block_index, block_length); + sg_comm_t comm = sg_mailbox_put_init(remote_peer->mailbox, message, MESSAGE_REQUEST_SIZE); + sg_comm_detach(comm, NULL); + } +} - peer->pieces_count = xbt_new0(short, FILE_PIECES); +void get_status(char** status, unsigned int bitfield) +{ + for (int i = FILE_PIECES - 1; i >= 0; i--) + (*status)[i] = (bitfield & (1U << i)) ? '1' : '0'; + (*status)[FILE_PIECES] = '\0'; +} - peer->comm_received = NULL; +int has_finished(unsigned int bitfield) +{ + return bitfield == (1U << FILE_PIECES) - 1U; +} - peer->round = 0; +/** Indicates if the remote peer has a piece not stored by the local peer */ +int is_interested(const_peer_t peer, const_connection_t remote_peer) +{ + return remote_peer->bitfield & (peer->bitfield ^ ((1 << FILE_PIECES) - 1)); +} - return peer; +/** Indicates if the remote peer has a piece not stored by the local peer nor requested by the local peer */ +int is_interested_and_free(const_peer_t peer, const_connection_t remote_peer) +{ + for (int i = 0; i < FILE_PIECES; i++) + if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) && peer_is_not_downloading_piece(peer, i)) + return 1; + return 0; } -/** Destroys a poor peer object. */ -void peer_free(peer_t peer) +/** @brief Updates the list of who has a piece from a bitfield */ +void update_pieces_count_from_bitfield(const_peer_t peer, unsigned int bitfield) { - char* key; - connection_t connection; - xbt_dict_cursor_t cursor; - xbt_dict_foreach (peer->peers, cursor, key, connection) { - connection_free(connection); - } - xbt_dict_free(&peer->peers); - xbt_dict_free(&peer->active_peers); - xbt_free(peer->pieces_count); - xbt_free(peer); + for (int i = 0; i < FILE_PIECES; i++) + if (bitfield & (1U << i)) + peer->pieces_count[i]++; } -/** @brief Returns if a peer has finished downloading the file - * @param bitfield peer bitfield - */ -int has_finished(unsigned int bitfield) +int count_pieces(unsigned int bitfield) { - return bitfield == (1U << FILE_PIECES) - 1U; + int count = 0; + unsigned int n = bitfield; + while (n) { + count += n & 1U; + n >>= 1U; + } + return count; } -int nb_interested_peers(const s_peer_t* peer) +int nb_interested_peers(const_peer_t peer) { xbt_dict_cursor_t cursor = NULL; char* key; connection_t connection; int nb = 0; - xbt_dict_foreach (peer->peers, cursor, key, connection) { + xbt_dict_foreach (peer->connected_peers, cursor, key, connection) if (connection->interested) nb++; - } + return nb; } +/** @brief Peer main loop when it is leeching. */ +void leech(peer_t peer) +{ + double next_choked_update = simgrid_get_clock() + UPDATE_CHOKED_INTERVAL; + XBT_DEBUG("Start downloading."); + + /* Send a "handshake" message to all the peers it got (since it couldn't have gotten more than 50 peers) */ + send_handshake_to_all_peers(peer); + XBT_DEBUG("Starting main leech loop"); + + void* data = NULL; + while (simgrid_get_clock() < peer->deadline && count_pieces(peer->bitfield) < FILE_PIECES) { + if (peer->comm_received == NULL) + peer->comm_received = sg_mailbox_get_async(peer->mailbox, &data); + + if (sg_comm_test(peer->comm_received)) { + peer->message = (message_t)data; + handle_message(peer, peer->message); + message_free(peer->message); + peer->comm_received = NULL; + } else { + // We don't execute the choke algorithm if we don't already have a piece + if (simgrid_get_clock() >= next_choked_update && count_pieces(peer->bitfield) > 0) { + update_choked_peers(peer); + next_choked_update += UPDATE_CHOKED_INTERVAL; + } else { + sg_actor_sleep_for(SLEEP_DURATION); + } + } + } + if (has_finished(peer->bitfield)) + XBT_DEBUG("%d becomes a seeder", peer->id); +} + +/** @brief Peer main loop when it is seeding */ +void seed(peer_t peer) +{ + double next_choked_update = simgrid_get_clock() + UPDATE_CHOKED_INTERVAL; + XBT_DEBUG("Start seeding."); + // start the main seed loop + void* data = NULL; + while (simgrid_get_clock() < peer->deadline) { + if (peer->comm_received == NULL) + peer->comm_received = sg_mailbox_get_async(peer->mailbox, &data); + + if (sg_comm_test(peer->comm_received)) { + peer->message = (message_t)data; + handle_message(peer, peer->message); + message_free(data); + peer->comm_received = NULL; + } else { + if (simgrid_get_clock() >= next_choked_update) { + update_choked_peers(peer); + // TODO: Change the choked peer algorithm when seeding. + next_choked_update += UPDATE_CHOKED_INTERVAL; + } else { + sg_actor_sleep_for(SLEEP_DURATION); + } + } + } +} + void update_active_peers_set(const s_peer_t* peer, connection_t remote_peer) { if ((remote_peer->interested != 0) && (remote_peer->choked_upload == 0)) { @@ -297,30 +362,27 @@ void update_active_peers_set(const s_peer_t* peer, connection_t remote_peer) } } -/** @brief Handle a received message sent by another peer - * @param peer Peer data - * @param task task received. - */ -void handle_message(peer_t peer, msg_task_t task) +/** @brief Handle a received message sent by another peer */ +void handle_message(peer_t peer, message_t message) { const char* type_names[10] = {"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED", "HAVE", "BITFIELD", "REQUEST", "PIECE", "CANCEL"}; - message_t message = MSG_task_get_data(task); - XBT_DEBUG("Received a %s message from %s (%s)", type_names[message->type], message->mailbox, - message->issuer_host_name); + XBT_DEBUG("Received a %s message from %s", type_names[message->type], sg_mailbox_get_name(message->return_mailbox)); - connection_t remote_peer; - remote_peer = xbt_dict_get_or_null_ext(peer->peers, (char*)&message->peer_id, sizeof(int)); + connection_t remote_peer = xbt_dict_get_or_null_ext(peer->connected_peers, (char*)&message->peer_id, sizeof(int)); + xbt_assert(remote_peer != NULL || message->type == MESSAGE_HANDSHAKE, + "The impossible did happened: A not-in-our-list peer sent us a message."); switch (message->type) { case MESSAGE_HANDSHAKE: // Check if the peer is in our connection list. if (remote_peer == 0) { - xbt_dict_set_ext(peer->peers, (char*)&message->peer_id, sizeof(int), connection_new(message->peer_id)); - send_handshake(peer, message->mailbox); + xbt_dict_set_ext(peer->connected_peers, (char*)&message->peer_id, sizeof(int), + connection_new(message->peer_id)); + send_message(peer, message->return_mailbox, MESSAGE_HANDSHAKE, MESSAGE_HANDSHAKE_SIZE); } // Send our bitfield to the peer - send_bitfield(peer, message->mailbox); + send_bitfield(peer, message->return_mailbox); break; case MESSAGE_BITFIELD: // Update the pieces list @@ -330,7 +392,7 @@ void handle_message(peer_t peer, msg_task_t task) xbt_assert(!remote_peer->am_interested, "Should not be interested at first"); if (is_interested(peer, remote_peer)) { remote_peer->am_interested = 1; - send_interested(peer, message->mailbox); + send_message(peer, message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE); } break; case MESSAGE_INTERESTED: @@ -360,55 +422,55 @@ void handle_message(peer_t peer, msg_task_t task) remove_current_piece(peer, remote_peer, remote_peer->current_piece); break; case MESSAGE_HAVE: - XBT_DEBUG("\t for piece %d", message->index); - xbt_assert((message->index >= 0 && message->index < FILE_PIECES), "Wrong HAVE message received"); - remote_peer->bitfield = remote_peer->bitfield | (1U << message->index); - peer->pieces_count[message->index]++; + XBT_DEBUG("\t for piece %d", message->piece); + xbt_assert((message->piece >= 0 && message->piece < FILE_PIECES), "Wrong HAVE message received"); + remote_peer->bitfield = remote_peer->bitfield | (1U << message->piece); + peer->pieces_count[message->piece]++; // If the piece is in our pieces, we tell the peer that we are interested. - if ((remote_peer->am_interested == 0) && peer_has_not_piece(peer, message->index)) { + if ((remote_peer->am_interested == 0) && peer_has_not_piece(peer, message->piece)) { remote_peer->am_interested = 1; - send_interested(peer, message->mailbox); + send_message(peer, message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE); if (remote_peer->choked_download == 0) request_new_piece_to_peer(peer, remote_peer); } break; case MESSAGE_REQUEST: xbt_assert(remote_peer->interested); - xbt_assert((message->index >= 0 && message->index < FILE_PIECES), "Wrong request received"); + xbt_assert((message->piece >= 0 && message->piece < FILE_PIECES), "Wrong request received"); if (remote_peer->choked_upload == 0) { - XBT_DEBUG("\t for piece %d (%d,%d)", message->index, message->block_index, + XBT_DEBUG("\t for piece %d (%d,%d)", message->piece, message->block_index, message->block_index + message->block_length); - if (!peer_has_not_piece(peer, message->index)) { - send_piece(peer, message->mailbox, message->index, message->block_index, message->block_length); + if (!peer_has_not_piece(peer, message->piece)) { + send_piece(peer, message->return_mailbox, message->piece, message->block_index, message->block_length); } } else { XBT_DEBUG("\t for piece %d but he is choked.", message->peer_id); } break; case MESSAGE_PIECE: - XBT_DEBUG(" \t for piece %d (%d,%d)", message->index, message->block_index, + XBT_DEBUG(" \t for piece %d (%d,%d)", message->piece, message->block_index, message->block_index + message->block_length); xbt_assert(!remote_peer->choked_download); xbt_assert(remote_peer->choked_download != 1, "Can't received a piece if I'm choked !"); - xbt_assert((message->index >= 0 && message->index < FILE_PIECES), "Wrong piece received"); + xbt_assert((message->piece >= 0 && message->piece < FILE_PIECES), "Wrong piece received"); // TODO: Execute à computation. - if (peer_has_not_piece(peer, message->index)) { - update_bitfield_blocks(peer, message->index, message->block_index, message->block_length); - if (piece_complete(peer, message->index)) { + if (peer_has_not_piece(peer, message->piece)) { + update_bitfield_blocks(peer, message->piece, message->block_index, message->block_length); + if (piece_complete(peer, message->piece)) { // Removing the piece from our piece list - remove_current_piece(peer, remote_peer, message->index); + remove_current_piece(peer, remote_peer, message->piece); // Setting the fact that we have the piece - peer->bitfield = peer->bitfield | (1U << message->index); + peer->bitfield = peer->bitfield | (1U << message->piece); char* status = xbt_malloc0(FILE_PIECES + 1); get_status(&status, peer->bitfield); XBT_DEBUG("My status is now %s", status); xbt_free(status); // Sending the information to all the peers we are connected to - send_have(peer, message->index); + send_have_to_all_peers(peer, message->piece); // sending UNINTERESTED to peers that do not have what we want. update_interested_after_receive(peer); } else { // piece not completed - send_request_to_peer(peer, remote_peer, message->index); // ask for the next block + send_request_to_peer(peer, remote_peer, message->piece); // ask for the next block } } else { XBT_DEBUG("However, we already have it"); @@ -422,11 +484,9 @@ void handle_message(peer_t peer, msg_task_t task) } // Update the peer speed. if (remote_peer) { - connection_add_speed_value(remote_peer, 1.0 / (MSG_get_clock() - peer->begin_receive_time)); + connection_add_speed_value(remote_peer, 1.0 / (simgrid_get_clock() - peer->begin_receive_time)); } - peer->begin_receive_time = MSG_get_clock(); - - task_message_free(task); + peer->begin_receive_time = simgrid_get_clock(); } /** Selects the appropriate piece to download and requests it to the remote_peer */ @@ -446,19 +506,6 @@ void remove_current_piece(peer_t peer, connection_t remote_peer, unsigned int cu remote_peer->current_piece = -1; } -/** @brief Updates the list of who has a piece from a bitfield - * @param peer peer we want to update the list - * @param bitfield bitfield - */ -void update_pieces_count_from_bitfield(const s_peer_t* peer, unsigned int bitfield) -{ - for (int i = 0; i < FILE_PIECES; i++) { - if (bitfield & (1U << i)) { - peer->pieces_count[i]++; - } - } -} - /** @brief Return the piece to be downloaded * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole : * If a piece is partially downloaded, this piece will be selected prioritarily @@ -469,7 +516,7 @@ void update_pieces_count_from_bitfield(const s_peer_t* peer, unsigned int bitfie * @param remote_peer: information about the connection * @return the piece to download if possible. -1 otherwise */ -int select_piece_to_download(const s_peer_t* peer, const s_connection_t* remote_peer) +int select_piece_to_download(const_peer_t peer, const_connection_t remote_peer) { int piece = partially_downloaded_piece(peer, remote_peer); // strict priority policy @@ -566,9 +613,7 @@ int select_piece_to_download(const s_peer_t* peer, const s_connection_t* remote_ } } -/** @brief Update the list of current choked and unchoked peers, using the choke algorithm - * @param peer the current peer - */ +/** Update the list of current choked and unchoked peers, using the choke algorithm */ void update_choked_peers(peer_t peer) { if (nb_interested_peers(peer) == 0) @@ -592,9 +637,9 @@ void update_choked_peers(peer_t peer) /**If we are currently seeding, we unchoke the peer which has been unchoked the last time.*/ if (has_finished(peer->bitfield)) { connection_t connection; - double unchoke_time = MSG_get_clock() + 1; + double unchoke_time = simgrid_get_clock() + 1; - xbt_dict_foreach (peer->peers, cursor, key, connection) { + xbt_dict_foreach (peer->connected_peers, cursor, key, connection) { if (connection->last_unchoke < unchoke_time && (connection->interested != 0) && (connection->choked_upload != 0)) { unchoke_time = connection->last_unchoke; @@ -608,12 +653,12 @@ void update_choked_peers(peer_t peer) do { // We choose a random peer to unchoke. int id_chosen = 0; - if (xbt_dict_length(peer->peers) > 0) { - id_chosen = rand() % xbt_dict_length(peer->peers); + if (xbt_dict_length(peer->connected_peers) > 0) { + id_chosen = rand() % xbt_dict_length(peer->connected_peers); } - int i = 0; + int i = 0; connection_t connection; - xbt_dict_foreach (peer->peers, cursor, key, connection) { + xbt_dict_foreach (peer->connected_peers, cursor, key, connection) { if (i == id_chosen) { chosen_peer = connection; break; @@ -632,7 +677,7 @@ void update_choked_peers(peer_t peer) // Use the "fastest download" policy. connection_t connection; double fastest_speed = 0.0; - xbt_dict_foreach (peer->peers, cursor, key, connection) { + xbt_dict_foreach (peer->connected_peers, cursor, key, connection) { if (connection->peer_speed > fastest_speed && (connection->choked_upload != 0) && (connection->interested != 0)) { chosen_peer = connection; @@ -653,29 +698,27 @@ void update_choked_peers(peer_t peer) xbt_assert((*((int*)choked_key) == choked_peer->id)); update_active_peers_set(peer, choked_peer); XBT_DEBUG("(%d) Sending a CHOKE to %d", peer->id, choked_peer->id); - send_choked(peer, choked_peer->mailbox); + send_message(peer, choked_peer->mailbox, MESSAGE_CHOKE, MESSAGE_CHOKE_SIZE); } if (chosen_peer != NULL) { xbt_assert((chosen_peer->choked_upload), "Tries to unchoked an unchoked peer"); chosen_peer->choked_upload = 0; xbt_dict_set_ext(peer->active_peers, (char*)&chosen_peer->id, sizeof(int), chosen_peer); - chosen_peer->last_unchoke = MSG_get_clock(); + chosen_peer->last_unchoke = simgrid_get_clock(); XBT_DEBUG("(%d) Sending a UNCHOKE to %d", peer->id, chosen_peer->id); update_active_peers_set(peer, chosen_peer); - send_unchoked(peer, chosen_peer->mailbox); + send_message(peer, chosen_peer->mailbox, MESSAGE_UNCHOKE, MESSAGE_UNCHOKE_SIZE); } } } -/** @brief Update "interested" state of peers: send "not interested" to peers that don't have any more pieces we want. - * @param peer our peer data - */ -void update_interested_after_receive(const s_peer_t* peer) +/** Update "interested" state of peers: send "not interested" to peers that don't have any more pieces we want. */ +void update_interested_after_receive(const_peer_t peer) { char* key; xbt_dict_cursor_t cursor; connection_t connection; - xbt_dict_foreach (peer->peers, cursor, key, connection) { + xbt_dict_foreach (peer->connected_peers, cursor, key, connection) { if (connection->am_interested != 0) { int interested = 0; // Check if the peer still has a piece we want. @@ -687,7 +730,7 @@ void update_interested_after_receive(const s_peer_t* peer) } if (!interested) { // no more piece to download from connection connection->am_interested = 0; - send_notinterested(peer, connection->mailbox); + send_message(peer, connection->mailbox, MESSAGE_NOTINTERESTED, MESSAGE_NOTINTERESTED_SIZE); } } } @@ -703,7 +746,7 @@ void update_bitfield_blocks(peer_t peer, int index, int block_index, int block_l } /** Returns if a peer has completed the download of a piece */ -int piece_complete(const s_peer_t* peer, int index) +int piece_complete(const_peer_t peer, int index) { for (int i = 0; i < PIECES_BLOCKS; i++) { if (!(peer->bitfield_blocks & 1ULL << (index * PIECES_BLOCKS + i))) { @@ -714,7 +757,7 @@ int piece_complete(const s_peer_t* peer, int index) } /** Returns the first block that a peer doesn't have in a piece. If the peer has all blocks of the piece, returns -1. */ -int get_first_block(const s_peer_t* peer, int piece) +int get_first_missing_block_from(const_peer_t peer, int piece) { for (int i = 0; i < PIECES_BLOCKS; i++) { if (!(peer->bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i))) { @@ -724,164 +767,115 @@ int get_first_block(const s_peer_t* peer, int piece) return -1; } -/** Indicates if the remote peer has a piece not stored by the local peer */ -int is_interested(const s_peer_t* peer, const s_connection_t* remote_peer) -{ - return remote_peer->bitfield & (peer->bitfield ^ ((1 << FILE_PIECES) - 1)); -} - -/** Indicates if the remote peer has a piece not stored by the local peer nor requested by the local peer */ -int is_interested_and_free(const s_peer_t* peer, const s_connection_t* remote_peer) -{ - for (int i = 0; i < FILE_PIECES; i++) { - if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) && peer_is_not_downloading_piece(peer, i)) { - return 1; - } - } - return 0; -} - /** Returns a piece that is partially downloaded and stored by the remote peer if any -1 otherwise. */ -int partially_downloaded_piece(const s_peer_t* peer, const s_connection_t* remote_peer) +int partially_downloaded_piece(const_peer_t peer, const_connection_t remote_peer) { for (int i = 0; i < FILE_PIECES; i++) { if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) && peer_is_not_downloading_piece(peer, i) && - get_first_block(peer, i) > 0) + get_first_missing_block_from(peer, i) > 0) return i; } return -1; } -/** @brief Send request messages to a peer that have unchoked us - * @param peer peer - * @param remote_peer peer data to the peer we want to send the request - */ -void send_request_to_peer(const s_peer_t* peer, connection_t remote_peer, int piece) +int peer_has_not_piece(const_peer_t peer, unsigned int piece) { - remote_peer->current_piece = piece; - xbt_assert(connection_has_piece(remote_peer, piece)); - int block_index = get_first_block(peer, piece); - if (block_index != -1) { - int block_length = MIN(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index); - send_request(peer, remote_peer->mailbox, piece, block_index, block_length); - } + return !(peer->bitfield & 1U << piece); } -/*********************************************************** - * - * Low level message functions - * - ***********************************************************/ +/** Check that a piece is not currently being download by the peer. */ +int peer_is_not_downloading_piece(const_peer_t peer, unsigned int piece) +{ + return !(peer->current_pieces & 1U << piece); +} -/** @brief Send a "interested" message to a peer - * @param peer peer data - * @param mailbox destination mailbox - */ -void send_interested(const s_peer_t* peer, const char* mailbox) +/***************** Connection internal functions ***********************/ +connection_t connection_new(int id) { - msg_task_t task = task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox, peer->id, - task_message_size(MESSAGE_INTERESTED)); - MSG_task_dsend(task, mailbox, task_message_free); - XBT_DEBUG("Sending INTERESTED to %s", mailbox); + connection_t connection = xbt_new(s_connection_t, 1); + char mailbox_name[MAILBOX_SIZE]; + snprintf(mailbox_name, MAILBOX_SIZE - 1, "%d", id); + connection->id = id; + connection->mailbox = sg_mailbox_by_name(mailbox_name); + connection->bitfield = 0; + connection->peer_speed = 0; + connection->last_unchoke = 0; + connection->current_piece = -1; + connection->am_interested = 0; + connection->interested = 0; + connection->choked_upload = 1; + connection->choked_download = 1; + + return connection; } -/** @brief Send a "not interested" message to a peer - * @param peer peer data - * @param mailbox destination mailbox - */ -void send_notinterested(const s_peer_t* peer, const char* mailbox) +void connection_add_speed_value(connection_t connection, double speed) { - msg_task_t task = task_message_new(MESSAGE_NOTINTERESTED, peer->hostname, peer->mailbox, peer->id, - task_message_size(MESSAGE_NOTINTERESTED)); - MSG_task_dsend(task, mailbox, task_message_free); - XBT_DEBUG("Sending NOTINTERESTED to %s", mailbox); + connection->peer_speed = connection->peer_speed * 0.6 + speed * 0.4; } -/** @brief Send a handshake message to all the peers the peer has. - * @param peer peer data - */ -void send_handshake_all(const s_peer_t* peer) +void connection_free(void* data) { - connection_t remote_peer; - xbt_dict_cursor_t cursor = NULL; - char* key; - xbt_dict_foreach (peer->peers, cursor, key, remote_peer) { - msg_task_t task = task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox, peer->id, - task_message_size(MESSAGE_HANDSHAKE)); - MSG_task_dsend(task, remote_peer->mailbox, task_message_free); - XBT_DEBUG("Sending a HANDSHAKE to %s", remote_peer->mailbox); - } + connection_t co = (connection_t)data; + xbt_free(co); } -/** @brief Send a "handshake" message to an user - * @param peer peer data - * @param mailbox mailbox where to we send the message - */ -void send_handshake(const s_peer_t* peer, const char* mailbox) +int connection_has_piece(const_connection_t connection, unsigned int piece) { - XBT_DEBUG("Sending a HANDSHAKE to %s", mailbox); - msg_task_t task = task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox, peer->id, - task_message_size(MESSAGE_HANDSHAKE)); - MSG_task_dsend(task, mailbox, task_message_free); + return (connection->bitfield & 1U << piece); } -/** Send a "choked" message to a peer. */ -void send_choked(const s_peer_t* peer, const char* mailbox) +/***************** Messages creation functions ***********************/ +/** @brief Build a new empty message */ +message_t message_new(e_message_type type, int peer_id, sg_mailbox_t return_mailbox) { - XBT_DEBUG("Sending a CHOKE to %s", mailbox); - msg_task_t task = - task_message_new(MESSAGE_CHOKE, peer->hostname, peer->mailbox, peer->id, task_message_size(MESSAGE_CHOKE)); - MSG_task_dsend(task, mailbox, task_message_free); + message_t message = xbt_new(s_message_t, 1); + message->peer_id = peer_id; + message->return_mailbox = return_mailbox; + message->type = type; + return message; } -/** Send a "unchoked" message to a peer */ -void send_unchoked(const s_peer_t* peer, const char* mailbox) +/** Builds a message containing an index. */ +message_t message_index_new(e_message_type type, int peer_id, sg_mailbox_t return_mailbox, int index) { - XBT_DEBUG("Sending a UNCHOKE to %s", mailbox); - msg_task_t task = - task_message_new(MESSAGE_UNCHOKE, peer->hostname, peer->mailbox, peer->id, task_message_size(MESSAGE_UNCHOKE)); - MSG_task_dsend(task, mailbox, task_message_free); + message_t message = message_new(type, peer_id, return_mailbox); + message->piece = index; + return message; } -/** Send a "HAVE" message to all peers we are connected to */ -void send_have(const s_peer_t* peer, int piece) +message_t message_bitfield_new(int peer_id, sg_mailbox_t return_mailbox, unsigned int bitfield) { - XBT_DEBUG("Sending HAVE message to all my peers"); - connection_t remote_peer; - xbt_dict_cursor_t cursor = NULL; - char* key; - xbt_dict_foreach (peer->peers, cursor, key, remote_peer) { - msg_task_t task = task_message_index_new(MESSAGE_HAVE, peer->hostname, peer->mailbox, peer->id, piece, - task_message_size(MESSAGE_HAVE)); - MSG_task_dsend(task, remote_peer->mailbox, task_message_free); - } + message_t message = message_new(MESSAGE_BITFIELD, peer_id, return_mailbox); + message->bitfield = bitfield; + return message; } -/** @brief Send a bitfield message to all the peers the peer has. - * @param peer peer data - */ -void send_bitfield(const s_peer_t* peer, const char* mailbox) +message_t message_other_new(e_message_type type, int peer_id, sg_mailbox_t return_mailbox, unsigned int bitfield) { - XBT_DEBUG("Sending a BITFIELD to %s", mailbox); - msg_task_t task = task_message_bitfield_new(peer->hostname, peer->mailbox, peer->id, peer->bitfield, FILE_PIECES); - MSG_task_dsend(task, mailbox, task_message_free); + message_t message = message_new(type, peer_id, return_mailbox); + message->bitfield = bitfield; + return message; } -/** Send a "request" message to a pair, containing a request for a piece */ -void send_request(const s_peer_t* peer, const char* mailbox, int piece, int block_index, int block_length) +message_t message_request_new(int peer_id, sg_mailbox_t return_mailbox, int piece, int block_index, int block_length) { - XBT_DEBUG("Sending a REQUEST to %s for piece %d (%d,%d)", mailbox, piece, block_index, block_length); - msg_task_t task = task_message_request_new(peer->hostname, peer->mailbox, peer->id, piece, block_index, block_length); - MSG_task_dsend(task, mailbox, task_message_free); + message_t message = message_index_new(MESSAGE_REQUEST, peer_id, return_mailbox, piece); + message->block_index = block_index; + message->block_length = block_length; + return message; } -/** Send a "piece" message to a pair, containing a piece of the file */ -void send_piece(const s_peer_t* peer, const char* mailbox, int piece, int block_index, int block_length) +message_t message_piece_new(int peer_id, sg_mailbox_t return_mailbox, int piece, int block_index, int block_length) { - XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s", piece, block_index, block_length, mailbox); - xbt_assert(piece >= 0, "Tried to send a piece that doesn't exist."); - xbt_assert(!peer_has_not_piece(peer, piece), "Tried to send a piece that we doesn't have."); - msg_task_t task = - task_message_piece_new(peer->hostname, peer->mailbox, peer->id, piece, block_index, block_length, BLOCK_SIZE); - MSG_task_dsend(task, mailbox, task_message_free); + message_t message = message_index_new(MESSAGE_PIECE, peer_id, return_mailbox, piece); + message->block_index = block_index; + message->block_length = block_length; + return message; +} + +void message_free(void* task) +{ + message_t message = (message_t)task; + xbt_free(message); } diff --git a/examples/c/app-bittorrent/bittorrent-peer.h b/examples/c/app-bittorrent/bittorrent-peer.h new file mode 100644 index 0000000000..b1a88fbab6 --- /dev/null +++ b/examples/c/app-bittorrent/bittorrent-peer.h @@ -0,0 +1,104 @@ +/* Copyright (c) 2012-2020. The SimGrid Team. + * All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#ifndef BITTORRENT_PEER_H +#define BITTORRENT_PEER_H +#include "app-bittorrent.h" + +#include +#include +#include +#include + +/** Contains the connection data of a peer. */ +typedef struct s_connection { + int id; // Peer id + sg_mailbox_t mailbox; + unsigned int bitfield; // Fields + double peer_speed; + double last_unchoke; + int current_piece; + unsigned int am_interested : 1; // Indicates if we are interested in something the peer has + unsigned int interested : 1; // Indicates if the peer is interested in one of our pieces + unsigned int choked_upload : 1; // Indicates if the peer is choked for the current peer + unsigned int choked_download : 1; // Indicates if the peer has choked the current peer +} s_connection_t; + +typedef s_connection_t* connection_t; +typedef const s_connection_t* const_connection_t; + +connection_t connection_new(int id); +void connection_free(void* data); +void connection_add_speed_value(connection_t connection, double speed); +int connection_has_piece(const_connection_t connection, unsigned int piece); + +/** Peer data */ +typedef struct s_peer { + int id; // peer id + double deadline; + sg_mailbox_t mailbox; // peer mailbox. + + xbt_dict_t connected_peers; // peers list + xbt_dict_t active_peers; // active peers list + + unsigned int bitfield; // list of pieces the peer has. + unsigned long long bitfield_blocks; // list of blocks the peer has. + short* pieces_count; // number of peers that have each piece. + unsigned int current_pieces; // current pieces the peer is downloading + double begin_receive_time; // time when the receiving communication has begun, useful for calculating host speed. + int round; // current round for the chocking algorithm. + + sg_comm_t comm_received; // current comm + message_t message; // current message being received +} s_peer_t; +typedef s_peer_t* peer_t; +typedef const s_peer_t* const_peer_t; + +/** Peer main function */ +void peer(int argc, char* argv[]); + +int get_peers_from_tracker(const_peer_t peer); +void send_handshake_to_all_peers(const_peer_t peer); +void send_message(const_peer_t peer, sg_mailbox_t mailbox, e_message_type type, uint64_t size); +void send_bitfield(const_peer_t peer, sg_mailbox_t mailbox); +void send_piece(const_peer_t peer, sg_mailbox_t mailbox, int piece, int block_index, int block_length); +void send_have_to_all_peers(const_peer_t peer, int piece); +void send_request_to_peer(const_peer_t peer, connection_t remote_peer, int piece); + +void get_status(char** status, unsigned int bitfield); +int has_finished(unsigned int bitfield); +int is_interested(const_peer_t peer, const_connection_t remote_peer); +int is_interested_and_free(const_peer_t peer, const_connection_t remote_peer); +void update_pieces_count_from_bitfield(const_peer_t peer, unsigned int bitfield); + +int count_pieces(unsigned int bitfield); +int nb_interested_peers(const_peer_t peer); + +void leech(peer_t peer); +void seed(peer_t peer); + +void handle_message(peer_t peer, message_t task); + +void update_choked_peers(peer_t peer); + +void update_interested_after_receive(const_peer_t peer); + +void update_bitfield_blocks(peer_t peer, int index, int block_index, int block_length); +int piece_complete(const_peer_t peer, int index); +int get_first_missing_block_from(const_peer_t peer, int piece); + +int peer_has_not_piece(const_peer_t peer, unsigned int piece); +int peer_is_not_downloading_piece(const_peer_t peer, unsigned int piece); + +int partially_downloaded_piece(const_peer_t peer, const_connection_t remote_peer); + +void request_new_piece_to_peer(peer_t peer, connection_t remote_peer); +void remove_current_piece(peer_t peer, connection_t remote_peer, unsigned int current_piece); + +void update_active_peers_set(const_peer_t peer, connection_t remote_peer); +int select_piece_to_download(const_peer_t peer, const_connection_t remote_peer); + +#endif /* BITTORRENT_PEER_H */ diff --git a/teshsuite/msg/app-bittorrent/generate.py b/examples/c/app-bittorrent/generate.py similarity index 100% rename from teshsuite/msg/app-bittorrent/generate.py rename to examples/c/app-bittorrent/generate.py diff --git a/examples/c/app-bittorrent/tracker.c b/examples/c/app-bittorrent/tracker.c new file mode 100644 index 0000000000..2492711101 --- /dev/null +++ b/examples/c/app-bittorrent/tracker.c @@ -0,0 +1,142 @@ +/* Copyright (c) 2012-2020. The SimGrid Team. + * All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#include "tracker.h" + +#include "simgrid/actor.h" +#include "simgrid/comm.h" +#include "simgrid/engine.h" +#include "xbt/log.h" + +XBT_LOG_NEW_DEFAULT_CATEGORY(bittorrent_tracker, "Messages specific for the tracker"); + +void tracker_answer_free(void* data) +{ + tracker_answer_t a = (tracker_answer_t)data; + xbt_dynar_free(&a->peers); + free(a); +} + +static int is_in_list(const_xbt_dynar_t peers, int id) +{ + return xbt_dynar_member(peers, &id); +} + +/** + * Tracker main function + * @param argc number of arguments + * @param argv arguments + */ +void tracker(int argc, char* argv[]) +{ + // Checking arguments + xbt_assert(argc == 2, "Wrong number of arguments for the tracker."); + // Retrieving end time + double deadline = xbt_str_parse_double(argv[1], "Invalid deadline: %s"); + xbt_assert(deadline > 0, "Wrong deadline supplied"); + + // Building peers array + xbt_dynar_t peers_list = xbt_dynar_new(sizeof(int), NULL); + + sg_mailbox_t mailbox = sg_mailbox_by_name(TRACKER_MAILBOX); + + XBT_INFO("Tracker launched."); + sg_comm_t comm_received = NULL; + void* received = NULL; + + while (simgrid_get_clock() < deadline) { + if (comm_received == NULL) + comm_received = sg_mailbox_get_async(mailbox, &received); + + if (sg_comm_test(comm_received)) { + // Retrieve the data sent by the peer. + xbt_assert(received != NULL); + tracker_query_t tq = (tracker_query_t)received; + + // Add the peer to our peer list. + if (!is_in_list(peers_list, tq->peer_id)) + xbt_dynar_push_as(peers_list, int, tq->peer_id); + + // Sending peers to the requesting peer + tracker_answer_t ta = tracker_answer_new(TRACKER_QUERY_INTERVAL); + int next_peer; + int peers_length = xbt_dynar_length(peers_list); + for (int i = 0; i < MAXIMUM_PEERS && i < peers_length; i++) { + do { + next_peer = xbt_dynar_get_as(peers_list, rand() % peers_length, int); + } while (is_in_list(ta->peers, next_peer)); + xbt_dynar_push_as(ta->peers, int, next_peer); + } + // sending the task back to the peer. + sg_comm_t answer = sg_mailbox_put_init(tq->return_mailbox, ta, TRACKER_COMM_SIZE); + sg_comm_detach(answer, tracker_answer_free); + + xbt_free(tq); + comm_received = NULL; + received = NULL; + } else { + sg_actor_sleep_for(1); + } + } + // Free the remaining communication if any + if (comm_received) + sg_comm_unref(comm_received); + // Free the peers list + xbt_dynar_free(&peers_list); + + XBT_INFO("Tracker is leaving"); +} + +tracker_query_t tracker_query_new(int peer_id, sg_mailbox_t return_mailbox) +{ + tracker_query_t tq = xbt_new(s_tracker_query_t, 1); + tq->peer_id = peer_id; + tq->return_mailbox = return_mailbox; + return tq; +} + +tracker_answer_t tracker_answer_new(int interval) +{ + tracker_answer_t ta = xbt_new(s_tracker_answer_t, 1); + ta->interval = interval; + ta->peers = xbt_dynar_new(sizeof(int), NULL); + ; + return ta; +} + +/** + * Build a new task for the tracker. + * @param issuer_host_name Hostname of the issuer. For debugging purposes + */ +// tracker_task_data_t tracker_task_data_new(const char* issuer_host_name, sg_mailbox_t mailbox, int peer_id, int +// uploaded, +// int downloaded, int left) +//{ +// tracker_task_data_t task = xbt_new(s_tracker_task_data_t, 1); +// +// task->type = TRACKER_TASK_QUERY; +// task->issuer_host_name = issuer_host_name; +// task->mailbox = mailbox; +// task->peer_id = peer_id; +// task->uploaded = uploaded; +// task->downloaded = downloaded; +// task->left = left; +// +// task->peers = xbt_dynar_new(sizeof(int), NULL); +// +// return task; +//} +// +///** +// * Free the data structure of a tracker task. +// * @param task data to free +// */ +// void tracker_task_data_free(tracker_task_data_t task) +//{ +// xbt_dynar_free(&task->peers); +// xbt_free(task); +//} +// diff --git a/examples/c/app-bittorrent/tracker.h b/examples/c/app-bittorrent/tracker.h new file mode 100644 index 0000000000..f285236e3f --- /dev/null +++ b/examples/c/app-bittorrent/tracker.h @@ -0,0 +1,36 @@ +/* Copyright (c) 2012-2020. The SimGrid Team. + * All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#ifndef BITTORRENT_TRACKER_H +#define BITTORRENT_TRACKER_H +#include "app-bittorrent.h" +#include + +void tracker(int argc, char* argv[]); +/** + * Task types exchanged between a node and the tracker + */ +typedef enum { TRACKER_TASK_QUERY, TRACKER_TASK_ANSWER } e_tracker_task_type_t; +/** + * Tasks exchanged between a tracker and peers. + */ +typedef struct s_tracker_query { + int peer_id; // peer id + sg_mailbox_t return_mailbox; // mailbox where the tracker should answer +} s_tracker_query_t; +typedef s_tracker_query_t* tracker_query_t; + +typedef struct s_tracker_answer { + int interval; // how often the peer should contact the tracker (unused for now) + xbt_dynar_t peers; // the peer list the peer has asked for. +} s_tracker_answer_t; +typedef s_tracker_answer_t* tracker_answer_t; + +tracker_query_t tracker_query_new(int peer_id, sg_mailbox_t return_mailbox); +tracker_answer_t tracker_answer_new(int interval); +void tracker_answer_free(void* data); + +#endif /* BITTORRENT_TRACKER_H */ diff --git a/teshsuite/msg/CMakeLists.txt b/teshsuite/msg/CMakeLists.txt index 6782fe736d..732649cbfb 100644 --- a/teshsuite/msg/CMakeLists.txt +++ b/teshsuite/msg/CMakeLists.txt @@ -1,4 +1,3 @@ -# C examples foreach(x cloud-two-tasks get_sender io-file task_listen_from task_destroy_cancel) if(enable_msg) add_executable (${x} EXCLUDE_FROM_ALL ${x}/${x}.c) @@ -11,30 +10,15 @@ foreach(x cloud-two-tasks get_sender io-file task_listen_from task_destroy_cance set(teshsuite_src ${teshsuite_src} ${CMAKE_CURRENT_SOURCE_DIR}/${x}/${x}.c) endforeach() -if(enable_msg) - add_executable (bittorrent EXCLUDE_FROM_ALL app-bittorrent/bittorrent.c app-bittorrent/bittorrent-messages.c app-bittorrent/bittorrent-peer.c app-bittorrent/tracker.c app-bittorrent/connection.c) - target_link_libraries(bittorrent simgrid) - set_target_properties(bittorrent PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/app-bittorrent) - add_dependencies(tests bittorrent) -endif() -foreach (file bittorrent connection bittorrent-messages bittorrent-peer tracker) - set(teshsuite_src ${teshsuite_src} ${CMAKE_CURRENT_SOURCE_DIR}/app-bittorrent/${file}.c ${CMAKE_CURRENT_SOURCE_DIR}/app-bittorrent/${file}.h) -endforeach() - set(teshsuite_src ${teshsuite_src} PARENT_SCOPE) -set(tesh_files ${tesh_files} ${CMAKE_CURRENT_SOURCE_DIR}/app-bittorrent/app-bittorrent.tesh PARENT_SCOPE) -set(bin_files ${bin_files} ${CMAKE_CURRENT_SOURCE_DIR}/app-bittorrent/generate.py PARENT_SCOPE) -set(xml_files ${xml_files} ${CMAKE_CURRENT_SOURCE_DIR}/app-bittorrent/app-bittorrent_d.xml - PARENT_SCOPE) +set(tesh_files ${tesh_files} PARENT_SCOPE) if(enable_msg) - foreach(x app-bittorrent cloud-two-tasks get_sender task_destroy_cancel task_listen_from io-file) + foreach(x cloud-two-tasks get_sender task_destroy_cancel task_listen_from io-file) ADD_TESH_FACTORIES(tesh-msg-${x} "raw" --setenv platfdir=${CMAKE_HOME_DIRECTORY}/examples/platforms --setenv bindir=${CMAKE_BINARY_DIR}/teshsuite/msg/${x} --cd ${CMAKE_HOME_DIRECTORY}/teshsuite/msg/${x} ${CMAKE_HOME_DIRECTORY}/teshsuite/msg/${x}/${x}.tesh) endforeach() - - ADD_TESH_FACTORIES(tesh-app-bittorrent-parallel "raw" --cfg contexts/nthreads:4 ${CONTEXTS_SYNCHRO} --setenv bindir=${CMAKE_BINARY_DIR}/teshsuite/msg/app-bittorrent --setenv srcdir=${CMAKE_HOME_DIRECTORY}/examples/platforms --setenv platfdir=${CMAKE_HOME_DIRECTORY}/examples/platforms --cd ${CMAKE_HOME_DIRECTORY}/teshsuite/msg/app-bittorrent app-bittorrent.tesh) endif() diff --git a/teshsuite/msg/app-bittorrent/bittorrent-messages.c b/teshsuite/msg/app-bittorrent/bittorrent-messages.c deleted file mode 100644 index 0970308f43..0000000000 --- a/teshsuite/msg/app-bittorrent/bittorrent-messages.c +++ /dev/null @@ -1,119 +0,0 @@ -/* Copyright (c) 2012-2020. The SimGrid Team. All rights reserved. */ - -/* This program is free software; you can redistribute it and/or modify it - * under the terms of the license (GNU LGPL) which comes with this package. */ - -#include "bittorrent-messages.h" -#include "bittorrent.h" - -#include /* snprintf */ - -XBT_LOG_NEW_DEFAULT_CATEGORY(msg_messages, "Messages specific for the message factory"); - -#define BITS_TO_BYTES(x) (((x) / 8 + (x) % 8) ? 1 : 0) - -/** @brief Build a new empty message - * @param type type of the message - * @param issuer_host_name hostname of the issuer, for debugging purposes - * @param mailbox mailbox where the peer should answer - * @param peer_id id of the issuer - * @param size message size in bytes - */ -msg_task_t task_message_new(e_message_type type, const char* issuer_host_name, const char* mailbox, int peer_id, - int size) -{ - message_t message = xbt_new(s_message_t, 1); - message->issuer_host_name = issuer_host_name; - message->peer_id = peer_id; - message->mailbox = mailbox; - message->type = type; - msg_task_t task = MSG_task_create(NULL, 0, size, message); - XBT_DEBUG("type: %d size: %d", (int)type, size); - return task; -} - -/** Builds a message containing an index. */ -msg_task_t task_message_index_new(e_message_type type, const char* issuer_host_name, const char* mailbox, int peer_id, - int index, int varsize) -{ - msg_task_t task = task_message_new(type, issuer_host_name, mailbox, peer_id, task_message_size(type) + varsize); - message_t message = MSG_task_get_data(task); - message->index = index; - return task; -} - -msg_task_t task_message_bitfield_new(const char* issuer_host_name, const char* mailbox, int peer_id, - unsigned int bitfield, int bitfield_size) -{ - msg_task_t task = task_message_new(MESSAGE_BITFIELD, issuer_host_name, mailbox, peer_id, - task_message_size(MESSAGE_BITFIELD) + BITS_TO_BYTES(bitfield_size)); - message_t message = MSG_task_get_data(task); - message->bitfield = bitfield; - return task; -} - -msg_task_t task_message_request_new(const char* issuer_host_name, const char* mailbox, int peer_id, int index, - int block_index, int block_length) -{ - msg_task_t task = task_message_index_new(MESSAGE_REQUEST, issuer_host_name, mailbox, peer_id, index, 0); - message_t message = MSG_task_get_data(task); - message->block_index = block_index; - message->block_length = block_length; - return task; -} - -msg_task_t task_message_piece_new(const char* issuer_host_name, const char* mailbox, int peer_id, int index, - int block_index, int block_length, int block_size) -{ - msg_task_t task = - task_message_index_new(MESSAGE_PIECE, issuer_host_name, mailbox, peer_id, index, block_length * block_size); - message_t message = MSG_task_get_data(task); - message->block_index = block_index; - message->block_length = block_length; - return task; -} - -void task_message_free(void* task) -{ - message_t message = MSG_task_get_data(task); - xbt_free(message); - MSG_task_destroy(task); -} - -int task_message_size(e_message_type type) -{ - int size = 0; - switch (type) { - case MESSAGE_HANDSHAKE: - size = MESSAGE_HANDSHAKE_SIZE; - break; - case MESSAGE_CHOKE: - size = MESSAGE_CHOKE_SIZE; - break; - case MESSAGE_UNCHOKE: - size = MESSAGE_UNCHOKE_SIZE; - break; - case MESSAGE_INTERESTED: - case MESSAGE_NOTINTERESTED: - size = MESSAGE_INTERESTED_SIZE; - break; - case MESSAGE_HAVE: - size = MESSAGE_HAVE_SIZE; - break; - case MESSAGE_BITFIELD: - size = MESSAGE_BITFIELD_SIZE; - break; - case MESSAGE_REQUEST: - size = MESSAGE_REQUEST_SIZE; - break; - case MESSAGE_PIECE: - size = MESSAGE_PIECE_SIZE; - break; - case MESSAGE_CANCEL: - size = MESSAGE_CANCEL_SIZE; - break; - default: - THROW_IMPOSSIBLE; - } - return size; -} diff --git a/teshsuite/msg/app-bittorrent/bittorrent-messages.h b/teshsuite/msg/app-bittorrent/bittorrent-messages.h deleted file mode 100644 index 4afeb0c5bd..0000000000 --- a/teshsuite/msg/app-bittorrent/bittorrent-messages.h +++ /dev/null @@ -1,72 +0,0 @@ -/* Copyright (c) 2012-2020. The SimGrid Team. - * All rights reserved. */ - -/* This program is free software; you can redistribute it and/or modify it - * under the terms of the license (GNU LGPL) which comes with this package. */ - -#ifndef BITTORRENT_MESSAGES_H_ -#define BITTORRENT_MESSAGES_H_ -#include - -/** Message sizes - * Sizes based on report by A. Legout et al, Understanding BitTorrent: An Experimental Perspective - * http://hal.inria.fr/inria-00000156/en - */ -#define MESSAGE_HANDSHAKE_SIZE 68 -#define MESSAGE_CHOKE_SIZE 5 -#define MESSAGE_UNCHOKE_SIZE 5 -#define MESSAGE_INTERESTED_SIZE 5 -#define MESSAGE_NOTINTERESTED_SIZE 5 -#define MESSAGE_HAVE_SIZE 9 -#define MESSAGE_BITFIELD_SIZE 5 -#define MESSAGE_REQUEST_SIZE 17 -#define MESSAGE_PIECE_SIZE 13 -#define MESSAGE_CANCEL_SIZE 17 - -/** Types of messages exchanged between two peers. */ -typedef enum { - MESSAGE_HANDSHAKE, - MESSAGE_CHOKE, - MESSAGE_UNCHOKE, - MESSAGE_INTERESTED, - MESSAGE_NOTINTERESTED, - MESSAGE_HAVE, - MESSAGE_BITFIELD, - MESSAGE_REQUEST, - MESSAGE_PIECE, - MESSAGE_CANCEL -} e_message_type; - -/** Message data */ -typedef struct s_message { - e_message_type type; - const char* mailbox; - const char* issuer_host_name; - int peer_id; - unsigned int bitfield; - int index; - int block_index; - int block_length; -} s_message_t; -typedef s_message_t* message_t; - -/** Builds a new value-less message */ -msg_task_t task_message_new(e_message_type type, const char* issuer_host_name, const char* mailbox, int peer_id, - int size); -/** Builds a new "have/piece" message */ -msg_task_t task_message_index_new(e_message_type type, const char* issuer_host_name, const char* mailbox, int peer_id, - int index, int varsize); -/** Builds a new bitfield message */ -msg_task_t task_message_bitfield_new(const char* issuer_host_name, const char* mailbox, int peer_id, - unsigned int bitfield, int bitfield_size); -/** Builds a new "request" message */ -msg_task_t task_message_request_new(const char* issuer_host_name, const char* mailbox, int peer_id, int index, - int block_index, int block_length); -/** Build a new "piece" message */ -msg_task_t task_message_piece_new(const char* issuer_host_name, const char* mailbox, int peer_id, int index, - int block_index, int block_length, int block_size); -/** Free a message task */ -void task_message_free(void*); -int task_message_size(e_message_type type); - -#endif /* BITTORRENT_MESSAGES_H_ */ diff --git a/teshsuite/msg/app-bittorrent/bittorrent-peer.h b/teshsuite/msg/app-bittorrent/bittorrent-peer.h deleted file mode 100644 index e6f46313b6..0000000000 --- a/teshsuite/msg/app-bittorrent/bittorrent-peer.h +++ /dev/null @@ -1,94 +0,0 @@ -/* Copyright (c) 2012-2020. The SimGrid Team. - * All rights reserved. */ - -/* This program is free software; you can redistribute it and/or modify it - * under the terms of the license (GNU LGPL) which comes with this package. */ - -#ifndef BITTORRENT_PEER_H -#define BITTORRENT_PEER_H -#include "bittorrent.h" -#include "connection.h" -#include -#include -#include - -/** Peer data */ -typedef struct s_peer { - int id; // peer id - - unsigned int bitfield; // list of pieces the peer has. - unsigned long long bitfield_blocks; // list of blocks the peer has. - short* pieces_count; // number of peers that have each piece. - - unsigned int current_pieces; // current pieces the peer is downloading - - xbt_dict_t peers; // peers list - xbt_dict_t active_peers; // active peers list - int round; // current round for the chocking algorithm. - - char mailbox[MAILBOX_SIZE]; // peer mailbox. - char mailbox_tracker[MAILBOX_SIZE]; // pair mailbox while communicating with the tracker. - const char* hostname; // peer hostname - - msg_task_t task_received; // current task being received - msg_comm_t comm_received; // current comm - - double begin_receive_time; // time when the receiving communication has begun, useful for calculating host speed. -} s_peer_t; -typedef s_peer_t* peer_t; - -/** Peer main function */ -int peer(int argc, char* argv[]); -void get_status(char** status, unsigned int bitfield); - -int get_peers_data(const s_peer_t* peer); -void leech_loop(peer_t peer, double deadline); -void seed_loop(peer_t peer, double deadline); - -peer_t peer_init(int id, int seed); -void peer_free(peer_t peer); - -int has_finished(unsigned int bitfield); - -void handle_message(peer_t peer, msg_task_t task); - -void update_pieces_count_from_bitfield(const s_peer_t* peer, unsigned int bitfield); -void update_choked_peers(peer_t peer); - -void update_interested_after_receive(const s_peer_t* peer); - -void update_bitfield_blocks(peer_t peer, int index, int block_index, int block_length); -int piece_complete(const s_peer_t* peer, int index); -int get_first_block(const s_peer_t* peer, int piece); - -int peer_has_not_piece(const s_peer_t* peer, unsigned int piece); -int peer_is_not_downloading_piece(const s_peer_t* peer, unsigned int piece); -int count_pieces(unsigned int bitfield); - -int nb_interested_peers(const s_peer_t* peer); -int is_interested(const s_peer_t* peer, const s_connection_t* remote_peer); -int is_interested_and_free(const s_peer_t* peer, const s_connection_t* remote_peer); -int partially_downloaded_piece(const s_peer_t* peer, const s_connection_t* remote_peer); - -void request_new_piece_to_peer(peer_t peer, connection_t remote_peer); -void send_request_to_peer(const s_peer_t* peer, connection_t remote_peer, int piece); -void remove_current_piece(peer_t peer, connection_t remote_peer, unsigned int current_piece); - -void update_active_peers_set(const s_peer_t* peer, connection_t remote_peer); -int select_piece_to_download(const s_peer_t* peer, const s_connection_t* remote_peer); - -void send_handshake_all(const s_peer_t* peer); - -void send_interested(const s_peer_t* peer, const char* mailbox); - -void send_notinterested(const s_peer_t* peer, const char* mailbox); -void send_handshake(const s_peer_t* peer, const char* mailbox); -void send_bitfield(const s_peer_t* peer, const char* mailbox); -void send_choked(const s_peer_t* peer, const char* mailbox); -void send_unchoked(const s_peer_t* peer, const char* mailbox); -void send_have(const s_peer_t* peer, int piece); - -void send_request(const s_peer_t* peer, const char* mailbox, int piece, int block_index, int block_length); -void send_piece(const s_peer_t* peer, const char* mailbox, int piece, int block_index, int block_length); - -#endif /* BITTORRENT_PEER_H */ diff --git a/teshsuite/msg/app-bittorrent/bittorrent.h b/teshsuite/msg/app-bittorrent/bittorrent.h deleted file mode 100644 index 93ba317f2b..0000000000 --- a/teshsuite/msg/app-bittorrent/bittorrent.h +++ /dev/null @@ -1,24 +0,0 @@ -/* Copyright (c) 2012-2020. The SimGrid Team. - * All rights reserved. */ - -/* This program is free software; you can redistribute it and/or modify it - * under the terms of the license (GNU LGPL) which comes with this package. */ - -#ifndef BITTORRENT_BITTORRENT_H_ -#define BITTORRENT_BITTORRENT_H_ - -#define MAILBOX_SIZE 40 -#define TRACKER_MAILBOX "tracker_mailbox" -/** Max number of pairs sent by the tracker to clients */ -#define MAXIMUM_PEERS 50 -/** Interval of time where the peer should send a request to the tracker */ -#define TRACKER_QUERY_INTERVAL 1000 -/** Communication size for a task to the tracker */ -#define TRACKER_COMM_SIZE 1 -#define GET_PEERS_TIMEOUT 10000 -/** Number of peers that can be unchocked at a given time */ -#define MAX_UNCHOKED_PEERS 4 -/** Interval between each update of the choked peers */ -#define UPDATE_CHOKED_INTERVAL 30 - -#endif /* BITTORRENT_BITTORRENT_H_ */ diff --git a/teshsuite/msg/app-bittorrent/connection.c b/teshsuite/msg/app-bittorrent/connection.c deleted file mode 100644 index 59f1b53edf..0000000000 --- a/teshsuite/msg/app-bittorrent/connection.c +++ /dev/null @@ -1,45 +0,0 @@ -/* Copyright (c) 2012-2020. The SimGrid Team. - * All rights reserved. */ - -/* This program is free software; you can redistribute it and/or modify it - * under the terms of the license (GNU LGPL) which comes with this package. */ - -#include "connection.h" -#include "bittorrent.h" -#include -XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(msg_peers); - -connection_t connection_new(int id) -{ - connection_t connection = xbt_new(s_connection_t, 1); - - connection->id = id; - connection->mailbox = bprintf("%d", id); - connection->bitfield = 0; - connection->current_piece = -1; - connection->interested = 0; - connection->am_interested = 0; - connection->choked_upload = 1; - connection->choked_download = 1; - connection->peer_speed = 0; - connection->last_unchoke = 0; - - return connection; -} - -void connection_add_speed_value(connection_t connection, double speed) -{ - connection->peer_speed = connection->peer_speed * 0.6 + speed * 0.4; -} - -void connection_free(void* data) -{ - connection_t co = (connection_t)data; - xbt_free(co->mailbox); - xbt_free(co); -} - -int connection_has_piece(const s_connection_t* connection, unsigned int piece) -{ - return (connection->bitfield & 1U << piece); -} diff --git a/teshsuite/msg/app-bittorrent/connection.h b/teshsuite/msg/app-bittorrent/connection.h deleted file mode 100644 index bb44b69182..0000000000 --- a/teshsuite/msg/app-bittorrent/connection.h +++ /dev/null @@ -1,39 +0,0 @@ -/* Copyright (c) 2012-2020. The SimGrid Team. - * All rights reserved. */ - -/* This program is free software; you can redistribute it and/or modify it - * under the terms of the license (GNU LGPL) which comes with this package. */ - -#ifndef BITTORRENT_CONNECTION_H_ -#define BITTORRENT_CONNECTION_H_ - -/** Contains the connection data of a peer. */ -typedef struct s_connection { - int id; // Peer id - unsigned int bitfield; // Fields - char* mailbox; - int messages_count; - double peer_speed; - double last_unchoke; - int current_piece; - unsigned int am_interested : 1; // Indicates if we are interested in something the peer has - unsigned int interested : 1; // Indicates if the peer is interested in one of our pieces - unsigned int choked_upload : 1; // Indicates if the peer is choked for the current peer - unsigned int choked_download : 1; // Indicates if the peer has choked the current peer -} s_connection_t; - -typedef s_connection_t* connection_t; - -/** @brief Build a new connection object from the peer id. - * @param id id of the peer - */ -connection_t connection_new(int id); -/** @brief Add a new value to the peer speed average - * @param connection connection data - * @param speed speed to add to the speed average - */ -void connection_add_speed_value(connection_t connection, double speed); -/** Frees a connection object */ -void connection_free(void* data); -int connection_has_piece(const s_connection_t* connection, unsigned int piece); -#endif /* BITTORRENT_CONNECTION_H_ */ diff --git a/teshsuite/msg/app-bittorrent/tracker.c b/teshsuite/msg/app-bittorrent/tracker.c deleted file mode 100644 index ae0463339c..0000000000 --- a/teshsuite/msg/app-bittorrent/tracker.c +++ /dev/null @@ -1,132 +0,0 @@ -/* Copyright (c) 2012-2020. The SimGrid Team. - * All rights reserved. */ - -/* This program is free software; you can redistribute it and/or modify it - * under the terms of the license (GNU LGPL) which comes with this package. */ - -#include "tracker.h" -#include - -static void task_free(void* data); - -XBT_LOG_NEW_DEFAULT_CATEGORY(msg_tracker, "Messages specific for the tracker"); -/** - * Tracker main function - * @param argc number of arguments - * @param argv arguments - */ -int tracker(int argc, char* argv[]) -{ - // Checking arguments - xbt_assert(argc == 2, "Wrong number of arguments for the tracker."); - // Retrieving end time - double deadline = xbt_str_parse_double(argv[1], "Invalid deadline: %s"); - xbt_assert(deadline > 0, "Wrong deadline supplied"); - - // Building peers array - xbt_dynar_t peers_list = xbt_dynar_new(sizeof(int), NULL); - - XBT_INFO("Tracker launched."); - - msg_comm_t comm_received = NULL; - msg_task_t task_received = NULL; - - while (MSG_get_clock() < deadline) { - if (comm_received == NULL) { - comm_received = MSG_task_irecv(&task_received, TRACKER_MAILBOX); - } - if (MSG_comm_test(comm_received)) { - // Check for correct status - if (MSG_comm_get_status(comm_received) == MSG_OK) { - // Retrieve the data sent by the peer. - tracker_task_data_t data = MSG_task_get_data(task_received); - // Add the peer to our peer list. - if (!is_in_list(peers_list, data->peer_id)) { - xbt_dynar_push_as(peers_list, int, data->peer_id); - } - // Sending peers to the peer - int next_peer; - int peers_length = xbt_dynar_length(peers_list); - for (int i = 0; i < MAXIMUM_PEERS && i < peers_length; i++) { - do { - next_peer = xbt_dynar_get_as(peers_list, rand() % peers_length, int); - } while (is_in_list(data->peers, next_peer)); - xbt_dynar_push_as(data->peers, int, next_peer); - } - // setting the interval - data->interval = TRACKER_QUERY_INTERVAL; - // sending the task back to the peer. - MSG_task_dsend(task_received, data->mailbox, task_free); - // destroy the communication. - } - MSG_comm_destroy(comm_received); - comm_received = NULL; - task_received = NULL; - } else { - MSG_process_sleep(1); - } - } - // Free the remaining communication if any - if (comm_received) { - MSG_comm_destroy(comm_received); - } - // Free the peers list - xbt_dynar_free(&peers_list); - - XBT_INFO("Tracker is leaving"); - - return 0; -} - -/** - * Build a new task for the tracker. - * @param issuer_host_name Hostname of the issuer. For debugging purposes - */ -tracker_task_data_t tracker_task_data_new(const char* issuer_host_name, const char* mailbox, int peer_id, int uploaded, - int downloaded, int left) -{ - tracker_task_data_t task = xbt_new(s_tracker_task_data_t, 1); - - task->type = TRACKER_TASK_QUERY; - task->issuer_host_name = issuer_host_name; - task->mailbox = mailbox; - task->peer_id = peer_id; - task->uploaded = uploaded; - task->downloaded = downloaded; - task->left = left; - - task->peers = xbt_dynar_new(sizeof(int), NULL); - - return task; -} - -/** - * Free a tracker task that has not successfully been sent. - * @param data Task to free - */ -static void task_free(void* data) -{ - tracker_task_data_t task_data = MSG_task_get_data(data); - tracker_task_data_free(task_data); - MSG_task_destroy(data); -} - -/** - * Free the data structure of a tracker task. - * @param task data to free - */ -void tracker_task_data_free(tracker_task_data_t task) -{ - xbt_dynar_free(&task->peers); - xbt_free(task); -} - -/** - * Returns if the given id is in the peers lsit - * @param peers dynar containing the peers - * @param id identifier of the peer to test - */ -int is_in_list(const_xbt_dynar_t peers, int id) -{ - return xbt_dynar_member(peers, &id); -} diff --git a/teshsuite/msg/app-bittorrent/tracker.h b/teshsuite/msg/app-bittorrent/tracker.h deleted file mode 100644 index 89751214fe..0000000000 --- a/teshsuite/msg/app-bittorrent/tracker.h +++ /dev/null @@ -1,42 +0,0 @@ -/* Copyright (c) 2012-2020. The SimGrid Team. - * All rights reserved. */ - -/* This program is free software; you can redistribute it and/or modify it - * under the terms of the license (GNU LGPL) which comes with this package. */ - -#ifndef BITTORRENT_TRACKER_H -#define BITTORRENT_TRACKER_H -#include "bittorrent.h" -#include -/** - * Tracker main function - */ -int tracker(int argc, char* argv[]); -/** - * Task types exchanged between a node and the tracker - */ -typedef enum { TRACKER_TASK_QUERY, TRACKER_TASK_ANSWER } e_tracker_task_type_t; -/** - * Tasks exchanged between a tracker and peers. - */ -typedef struct s_tracker_task_data { - e_tracker_task_type_t type; // type of the task - const char* mailbox; // mailbox where the tracker should answer - const char* issuer_host_name; // hostname, for debug purposes - // Query data - int peer_id; // peer id - int uploaded; // how much the peer has already uploaded - int downloaded; // how much the peer has downloaded - int left; // how much the peer has left - // Answer data - int interval; // how often the peer should contact the tracker (unused for now) - xbt_dynar_t peers; // the peer list the peer has asked for. -} s_tracker_task_data_t; -typedef s_tracker_task_data_t* tracker_task_data_t; - -tracker_task_data_t tracker_task_data_new(const char* issuer_host_name, const char* mailbox, int peer_id, int uploaded, - int downloaded, int left); -void tracker_task_data_free(tracker_task_data_t task); - -int is_in_list(const_xbt_dynar_t peers, int id); -#endif /* BITTORRENT_TRACKER_H */