-/* Copyright (c) 2012-2021. The SimGrid Team. All rights reserved. */
+/* Copyright (c) 2012-2023. The SimGrid Team. All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
#include "s4u-tracker.hpp"
XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_bt_peer, "Messages specific for the peers");
+namespace sg4 = simgrid::s4u;
/*
* User parameters for transferred file data. For the test, the default values are :
/* REQUEST */ 17,
/* PIECE */ 13,
/* CANCEL */ 17}};
- return sizes[static_cast<int>(type)];
+ return sizes.at(static_cast<int>(type));
}
/**
 * Maps a MessageType to its human-readable, upper-case name.
 *
 * The enum value is converted to int and used as an index into a fixed
 * 10-entry table, so the order of the strings below has to match the order of
 * the MessageType enumerators (declared elsewhere; presumably HANDSHAKE is 0
 * and CANCEL is 9 -- TODO confirm against the enum).
 *
 * @param type message type to name
 * @return pointer to the matching string literal
 */
constexpr const char* message_name(MessageType type)
{
constexpr std::array<const char*, 10> names{{"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED", "HAVE",
"BITFIELD", "REQUEST", "PIECE", "CANCEL"}};
// The patched (+) line swaps the unchecked operator[] for std::array::at(), which is bounds-checked.
- return names[static_cast<int>(type)];
+ return names.at(static_cast<int>(type));
}
Peer::Peer(std::vector<std::string> args)
xbt_assert(args.size() == 3 || args.size() == 4, "Wrong number of arguments");
try {
id = std::stoi(args[1]);
- mailbox_ = simgrid::s4u::Mailbox::by_name(std::to_string(id));
+ mailbox_ = sg4::Mailbox::by_name(std::to_string(id));
} catch (const std::invalid_argument&) {
throw std::invalid_argument("Invalid ID:" + args[1]);
}
// Getting peer data from the tracker.
if (getPeersFromTracker()) {
XBT_DEBUG("Got %zu peers from the tracker. Current status is: %s", connected_peers.size(), getStatus().c_str());
- begin_receive_time = simgrid::s4u::Engine::get_clock();
- mailbox_->set_receiver(simgrid::s4u::Actor::self());
+ begin_receive_time = sg4::Engine::get_clock();
+ mailbox_->set_receiver(sg4::Actor::self());
if (hasFinished()) {
sendHandshakeToAllPeers();
} else {
bool Peer::getPeersFromTracker()
{
- simgrid::s4u::Mailbox* tracker_mailbox = simgrid::s4u::Mailbox::by_name(TRACKER_MAILBOX);
+ sg4::Mailbox* tracker_mailbox = sg4::Mailbox::by_name(TRACKER_MAILBOX);
// Build the task to send to the tracker
auto* peer_request = new TrackerQuery(id, mailbox_);
try {
// Add the peers the tracker gave us to our peer list.
for (auto const& peer_id : answer->getPeers())
if (id != peer_id)
- connected_peers.emplace(peer_id, Connection(peer_id));
+ connected_peers.try_emplace(peer_id, peer_id);
} catch (const simgrid::TimeoutException&) {
XBT_DEBUG("Timeout expired when requesting peers to tracker");
return false;
/**
 * Sends one HANDSHAKE message to every entry of connected_peers.
 *
 * Each message carries this peer's id and mailbox_, and is posted on the
 * remote peer's mailbox with the size returned by message_size(HANDSHAKE).
 * The communication returned by put_init() is detached right away, so this
 * function does not keep a handle on it.
 */
void Peer::sendHandshakeToAllPeers()
{
// The patched (+) line iterates with a structured binding; the map key is unused (named `_`).
- for (auto const& kv : connected_peers) {
- const Connection& remote_peer = kv.second;
+ for (auto const& [_, remote_peer] : connected_peers) {
// NOTE(review): the Message is allocated with new and never deleted here; presumably the
// receiving side releases it -- confirm against the message handling code.
auto* handshake = new Message(MessageType::HANDSHAKE, id, mailbox_);
remote_peer.mailbox_->put_init(handshake, message_size(MessageType::HANDSHAKE))->detach();
XBT_DEBUG("Sending a HANDSHAKE to %d", remote_peer.id);
}
}
/**
 * Posts a message of the given type on `mailbox` and detaches the resulting communication.
 *
 * The message is built from `type`, this peer's id, its bitfield_ and its own mailbox_.
 *
 * @param mailbox destination mailbox; dereferenced unconditionally, so it must not be null
 * @param type    kind of message to send; also used for the debug log via message_name()
 * @param size    forwarded as the second argument of put_init() (presumably the simulated
 *                payload size in bytes -- TODO confirm)
 */
-void Peer::sendMessage(simgrid::s4u::Mailbox* mailbox, MessageType type, uint64_t size)
+void Peer::sendMessage(sg4::Mailbox* mailbox, MessageType type, uint64_t size)
{
XBT_DEBUG("Sending %s to %s", message_name(type), mailbox->get_cname());
// NOTE(review): the Message is allocated with new and not deleted here; presumably the
// receiver owns it after delivery -- confirm.
mailbox->put_init(new Message(type, id, bitfield_, mailbox_), size)->detach();
}
-void Peer::sendBitfield(simgrid::s4u::Mailbox* mailbox)
+void Peer::sendBitfield(sg4::Mailbox* mailbox)
{
XBT_DEBUG("Sending a BITFIELD to %s", mailbox->get_cname());
mailbox
->detach();
}
-void Peer::sendPiece(simgrid::s4u::Mailbox* mailbox, unsigned int piece, int block_index, int block_length)
+void Peer::sendPiece(sg4::Mailbox* mailbox, unsigned int piece, int block_index, int block_length)
{
xbt_assert(not hasNotPiece(piece), "Tried to send a unavailable piece.");
XBT_DEBUG("Sending the PIECE %u (%d,%d) to %s", piece, block_index, block_length, mailbox->get_cname());
void Peer::sendHaveToAllPeers(unsigned int piece)
{
XBT_DEBUG("Sending HAVE message to all my peers");
- for (auto const& kv : connected_peers) {
- const Connection& remote_peer = kv.second;
+ for (auto const& [_, remote_peer] : connected_peers) {
remote_peer.mailbox_->put_init(new Message(MessageType::HAVE, id, mailbox_, piece), message_size(MessageType::HAVE))
->detach();
}
void Peer::leech()
{
- double next_choked_update = simgrid::s4u::Engine::get_clock() + UPDATE_CHOKED_INTERVAL;
+ double next_choked_update = sg4::Engine::get_clock() + UPDATE_CHOKED_INTERVAL;
XBT_DEBUG("Start downloading.");
/* Send a "handshake" message to all the peers it got (since it couldn't have gotten more than 50 peers) */
sendHandshakeToAllPeers();
XBT_DEBUG("Starting main leech loop listening on mailbox: %s", mailbox_->get_cname());
- while (simgrid::s4u::Engine::get_clock() < deadline && countPieces(bitfield_) < FILE_PIECES) {
+ while (sg4::Engine::get_clock() < deadline && countPieces(bitfield_) < FILE_PIECES) {
if (comm_received == nullptr) {
comm_received = mailbox_->get_async<Message>(&message);
}
comm_received = nullptr;
} else {
// We don't execute the choke algorithm if we don't already have a piece
- if (simgrid::s4u::Engine::get_clock() >= next_choked_update && countPieces(bitfield_) > 0) {
+ if (sg4::Engine::get_clock() >= next_choked_update && countPieces(bitfield_) > 0) {
updateChokedPeers();
next_choked_update += UPDATE_CHOKED_INTERVAL;
} else {
- simgrid::s4u::this_actor::sleep_for(SLEEP_DURATION);
+ sg4::this_actor::sleep_for(SLEEP_DURATION);
}
}
}
void Peer::seed()
{
- double next_choked_update = simgrid::s4u::Engine::get_clock() + UPDATE_CHOKED_INTERVAL;
+ double next_choked_update = sg4::Engine::get_clock() + UPDATE_CHOKED_INTERVAL;
XBT_DEBUG("Start seeding.");
// start the main seed loop
- while (simgrid::s4u::Engine::get_clock() < deadline) {
+ while (sg4::Engine::get_clock() < deadline) {
if (comm_received == nullptr) {
comm_received = mailbox_->get_async<Message>(&message);
}
delete message;
comm_received = nullptr;
} else {
- if (simgrid::s4u::Engine::get_clock() >= next_choked_update) {
+ if (sg4::Engine::get_clock() >= next_choked_update) {
updateChokedPeers();
// TODO: Change the choked peer algorithm when seeding.
next_choked_update += UPDATE_CHOKED_INTERVAL;
} else {
- simgrid::s4u::this_actor::sleep_for(SLEEP_DURATION);
+ sg4::this_actor::sleep_for(SLEEP_DURATION);
}
}
}
// Check if the peer is in our connection list.
if (remote_peer == nullptr) {
XBT_DEBUG("This peer %d was unknown, answer to its handshake", message->peer_id);
- connected_peers.emplace(message->peer_id, Connection(message->peer_id));
+ connected_peers.try_emplace(message->peer_id, message->peer_id);
sendMessage(message->return_mailbox, MessageType::HANDSHAKE, message_size(MessageType::HANDSHAKE));
}
// Send our bitfield to the peer
}
// Update the peer speed.
if (remote_peer) {
- remote_peer->addSpeedValue(1.0 / (simgrid::s4u::Engine::get_clock() - begin_receive_time));
+ remote_peer->addSpeedValue(1.0 / (sg4::Engine::get_clock() - begin_receive_time));
}
- begin_receive_time = simgrid::s4u::Engine::get_clock();
+ begin_receive_time = sg4::Engine::get_clock();
}
/** Selects the appropriate piece to download and requests it from the remote_peer */
/** If we are currently seeding, we unchoke the interested, choked peer that was unchoked the longest time ago. */
if (hasFinished()) {
- double unchoke_time = simgrid::s4u::Engine::get_clock() + 1;
- for (auto& kv : connected_peers) {
- Connection& remote_peer = kv.second;
+ double unchoke_time = sg4::Engine::get_clock() + 1;
+ for (auto& [_, remote_peer] : connected_peers) {
if (remote_peer.last_unchoke < unchoke_time && remote_peer.interested && remote_peer.choked_upload) {
unchoke_time = remote_peer.last_unchoke;
chosen_peer = &remote_peer;
} else {
// Use the "fastest download" policy.
double fastest_speed = 0.0;
- for (auto& kv : connected_peers) {
- Connection& remote_peer = kv.second;
+ for (auto& [_, remote_peer] : connected_peers) {
if (remote_peer.peer_speed > fastest_speed && remote_peer.choked_upload && remote_peer.interested) {
fastest_speed = remote_peer.peer_speed;
chosen_peer = &remote_peer;
xbt_assert((chosen_peer->choked_upload), "Tries to unchoked an unchoked peer");
chosen_peer->choked_upload = false;
active_peers.insert(chosen_peer);
- chosen_peer->last_unchoke = simgrid::s4u::Engine::get_clock();
+ chosen_peer->last_unchoke = sg4::Engine::get_clock();
XBT_DEBUG("(%d) Sending a UNCHOKE to %d", id, chosen_peer->id);
updateActivePeersSet(chosen_peer);
sendMessage(chosen_peer->mailbox_, MessageType::UNCHOKE, message_size(MessageType::UNCHOKE));
/** @brief Update "interested" state of peers: send "not interested" to peers that don't have any more pieces we want.*/
void Peer::updateInterestedAfterReceive()
{
- for (auto& kv : connected_peers) {
- Connection& remote_peer = kv.second;
+ for (auto& [_, remote_peer] : connected_peers) {
if (remote_peer.am_interested) {
bool interested = false;
// Check if the peer still has a piece we want.