2 * Copyright (c) 2006-2013. The SimGrid Team.
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package.
10 import java.util.ArrayList;
11 import java.util.HashMap;
12 import java.util.Iterator;
13 import java.util.Map.Entry;
15 import org.simgrid.msg.Comm;
16 import org.simgrid.msg.Host;
17 import org.simgrid.msg.Msg;
18 import org.simgrid.msg.MsgException;
19 import org.simgrid.msg.Process;
20 import org.simgrid.msg.RngStream;
21 import org.simgrid.msg.Task;
24 * Main class for peers execution
26 public class Peer extends Process {
// Counter advanced modulo 3 on every call to updateChokedPeers().
27 protected int round = 0;
// Simulated clock value (Msg.getClock()) stored by main() and reset in handleMessage();
// used there as the start time when computing a remote peer's speed value.
29 protected double beginReceiveTime;
// Simulated-time bound tested by the leech, seed and wait loops; parsed from args[1] in main().
30 protected double deadline;
// Random stream shared by every Peer instance (static); used to draw random pieces and peers.
32 protected static RngStream stream = new RngStream();
// Mailbox this peer receives on; init() sets it to the decimal string of the peer id.
35 protected String mailbox;
// Mailbox used for the tracker's answer; init() sets it to "tracker_" + id.
36 protected String mailboxTracker;
// Name of the host running this process; set from host.getName() in init().
37 protected String hostname;
// Piece counter compared against Common.FILE_PIECES by the loops
// (presumably the number of complete pieces owned - confirm).
38 protected int pieces = 0;
// One flag per piece: '1' once the piece is complete, '0' otherwise.
39 protected char[] bitfield = new char[Common.FILE_PIECES];
// One flag per block of each piece: '1' when the block has been received, '0' otherwise.
40 protected char[][] bitfieldBlocks = new char[Common.FILE_PIECES][Common.PIECES_BLOCKS];
// Per-piece counter, incremented when a HAVE message for that piece is handled.
42 protected short[] piecesCount = new short[Common.FILE_PIECES];
// NOTE(review): not referenced by any of the code visible in this extract.
44 protected int piecesRequested = 0;
// Indices of the pieces currently being downloaded.
46 protected ArrayList<Integer> currentPieces = new ArrayList<Integer>();
// Most recently selected piece of interest; -1 means none has been selected.
47 protected int currentPiece = -1;
// Connections keyed by peer id: filled on UNCHOKE and by updateChokedPeers(), emptied on CHOKE.
49 protected HashMap<Integer, Connection> activePeers = new HashMap<Integer, Connection>();
// Every known connection, keyed by peer id (from the tracker answer or an incoming handshake).
50 protected HashMap<Integer, Connection> peers = new HashMap<Integer, Connection>();
// Pending asynchronous receive obtained from Task.irecv(); null when none has been posted.
52 protected Comm commReceived = null;
// Creates the peer process by forwarding host, name and args unchanged to the Process constructor.
54 public Peer(Host host, String name, String[]args) {
55 super(host,name,args);
// Process entry point.
// args[0] is the peer id and args[1] the deadline; a third argument (any value)
// makes the peer start as a seed, since init() is then called with seed == true.
59 public void main(String[] args) throws MsgException {
// Only two or three arguments are accepted.
61 if (args.length != 3 && args.length != 2) {
62 Msg.info("Wrong number of arguments");
64 if (args.length == 3) {
65 init(Integer.valueOf(args[0]),true);
68 init(Integer.valueOf(args[0]),false);
70 //Retrieve the deadline
71 deadline = Double.valueOf(args[1]);
// NOTE(review): the condition guarding this message is not visible in this extract.
73 Msg.info("Wrong deadline supplied");
76 Msg.info("Hi, I'm joining the network with id " + id);
77 //Getting peer data from the tracker
79 Msg.debug("Got " + peers.size() + " peers from the tracker");
80 Msg.debug("Here is my current status: " + getStatus());
// Reference time for the first speed measurement made in handleMessage().
81 beginReceiveTime = Msg.getClock();
// NOTE(review): the embedded line numbers jump from 83 to 93, so the statements that
// start the leech/seed loops and the condition around this assignment are not visible.
83 pieces = Common.FILE_PIECES;
93 Msg.info("Couldn't contact the tracker.");
95 Msg.info("Here is my current status: " + getStatus());
98 * Peer main loop when it is leeching.
// Download loop: runs until the deadline is reached or pieces reaches Common.FILE_PIECES.
// Each iteration polls the mailbox, keeps the set of wanted pieces filled and
// periodically considers the choke algorithm.
100 private void leechLoop() {
// Simulated time at which the choke algorithm is next due.
101 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
102 Msg.debug("Start downloading.");
104 * Send a "handshake" message to all the peers it got
105 * (it couldn't have gotten more than 50 peers anyway)
108 //Wait for at least one "bitfield" message.
110 Msg.debug("Starting main leech loop");
111 while (Msg.getClock() < deadline && pieces < Common.FILE_PIECES) {
// Post an asynchronous receive only when none is pending.
112 if (commReceived == null) {
113 commReceived = Task.irecv(mailbox);
// Non-blocking poll: the message is handled only once the receive has completed.
116 if (commReceived.test()) {
117 handleMessage(commReceived.getTask());
121 //If a piece of interest is currently selected, notify the peers that have it
122 if (currentPiece != -1) {
123 sendInterestedToPeers();
// Select another piece while fewer than Common.MAX_PIECES are in progress.
126 if (currentPieces.size() < Common.MAX_PIECES) {
127 updateCurrentPiece();
130 //We don't execute the choke algorithm if we don't already have a piece
131 if (Msg.getClock() >= nextChokedUpdate && pieces > 0) {
133 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
// NOTE(review): the matching try and the body of this handler are not visible in this extract.
140 catch (MsgException e) {
147 * Peer main loop when it is seeding
// Seeding loop: runs until the deadline, polling the mailbox for messages and
// periodically considering the choke algorithm.
149 private void seedLoop() {
// Simulated time at which the choke algorithm is next due.
150 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
151 Msg.debug("Start seeding.");
152 //start the main seed loop
153 while (Msg.getClock() < deadline) {
// Post an asynchronous receive only when none is pending.
154 if (commReceived == null) {
155 commReceived = Task.irecv(mailbox);
// Non-blocking poll: the message is handled only once the receive has completed.
158 if (commReceived.test()) {
159 handleMessage(commReceived.getTask());
163 if (Msg.getClock() >= nextChokedUpdate) {
165 //TODO: Change the choked peer algorithm when seeding
166 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
// NOTE(review): the matching try and the body of this handler are not visible in this extract.
173 catch (MsgException e) {
181 * Initialize the various peer data
182 * @param id id of the peer to take in the network
183 * @param seed indicates if the peer is a seed
// Sets the mailbox names, the block flags and the host name of this peer.
// The first visible loop writes '1' into every block flag and the second writes '0';
// NOTE(review): the branch on `seed` selecting between them is not visible here -
// presumably '1' for a seed and '0' otherwise; confirm.
185 private void init(int id, boolean seed) {
// The peer receives on a mailbox named after its id.
187 this.mailbox = Integer.toString(id);
// Separate mailbox on which the tracker's answer is received (see getPeersData()).
188 this.mailboxTracker = "tracker_" + Integer.toString(id);
190 for (int i = 0; i < bitfield.length; i++) {
192 for (int j = 0; j < bitfieldBlocks[i].length; j++) {
193 bitfieldBlocks[i][j] = '1';
198 for (int i = 0; i < bitfield.length; i++) {
200 for (int j = 0; j < bitfieldBlocks[i].length; j++) {
201 bitfieldBlocks[i][j] = '0' ;
205 this.hostname = host.getName();
208 * Retrieves the peer list from the tracker
// Asks the tracker for peers and stores the answer in `peers`.
// Both phases (sending the request, waiting for the answer) share one time limit
// of Common.GET_PEERS_TIMEOUT measured from the start of the call.
// NOTE(review): the return statement is not visible; presumably `success` is returned.
210 private boolean getPeersData() {
212 boolean success = false, sendSuccess = false;
// Absolute simulated time after which both loops give up.
213 double timeout = Msg.getClock() + Common.GET_PEERS_TIMEOUT;
214 //Build the task to send to the tracker
215 TrackerTask taskSend = new TrackerTask(hostname, mailboxTracker, id);
// Phase 1: retry the send until it succeeds or the time limit is reached.
217 while (!sendSuccess && Msg.getClock() < timeout) {
219 Msg.debug("Sending a peer request to the tracker.");
220 taskSend.send(Common.TRACKER_MAILBOX,Common.GET_PEERS_TIMEOUT);
223 catch (MsgException e) {
// Phase 2: wait for the tracker's answer on the dedicated tracker mailbox.
227 while (!success && Msg.getClock() < timeout) {
228 commReceived = Task.irecv(this.mailboxTracker);
230 commReceived.waitCompletion(Common.GET_PEERS_TIMEOUT);
// Tasks of any other type are not processed by the visible code.
231 if (commReceived.getTask() instanceof TrackerTask) {
232 TrackerTask task = (TrackerTask)commReceived.getTask();
233 for (Integer peerId: task.peers) {
// Do not register ourselves as a remote peer.
234 if (peerId != this.id) {
235 peers.put(peerId, new Connection(peerId));
241 catch (MsgException e) {
250 * Handle a received message sent by another peer
251 * @param task task received.
// Dispatches one received task according to its MessageTask type and updates the
// state kept for the sending peer. The task is cast to MessageTask unconditionally.
// NOTE(review): the case labels, break statements and closing braces of the switch
// are not visible in this extract; the section comments below are derived from the
// debug messages.
253 void handleMessage(Task task) {
254 MessageTask message = (MessageTask)task;
// Null when the sender is not yet in `peers`; only the HANDSHAKE section handles that case.
255 Connection remotePeer = peers.get(message.peerId);
256 switch (message.type) {
// --- HANDSHAKE ---
258 Msg.debug("Received a HANDSHAKE message from " + message.mailbox);
259 //Check if the peer is in our connection list
260 if (remotePeer == null) {
// Unknown sender: register it and answer with our own handshake.
261 peers.put(message.peerId, new Connection(message.peerId));
262 sendHandshake(message.mailbox);
264 //Send our bitfield to the peer
265 sendBitfield(message.mailbox);
// --- BITFIELD ---
268 Msg.debug("Received a BITFIELD message from " + message.peerId + " (" + message.issuerHostname + ")");
269 //update the pieces list
270 updatePiecesCountFromBitfield(message.bitfield);
271 //Update the current piece
272 if (currentPiece == -1 && pieces < Common.FILE_PIECES && currentPieces.size() < Common.MAX_PIECES) {
273 updateCurrentPiece();
// Keep a private copy so later changes to the message array do not affect our view.
275 remotePeer.bitfield = message.bitfield.clone();
// --- INTERESTED ---
278 Msg.debug("Received an INTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
279 assert remotePeer != null;
280 remotePeer.interested = true;
// --- NOTINTERESTED ---
283 Msg.debug("Received a NOTINTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
284 assert remotePeer != null;
285 remotePeer.interested = false;
// --- UNCHOKE ---
288 Msg.debug("Received an UNCHOKE message from " + message.peerId + "(" + message.issuerHostname + ")");
289 assert remotePeer != null;
290 remotePeer.chokedDownload = false;
// The sender is recorded in activePeers and immediately asked for the pieces we want.
291 activePeers.put(remotePeer.id,remotePeer);
292 sendRequestsToPeer(remotePeer);
// --- CHOKE ---
295 Msg.debug("Received a CHOKE message from " + message.peerId + " (" + message.issuerHostname + ")");
296 assert remotePeer != null;
297 remotePeer.chokedDownload = true;
298 activePeers.remove(remotePeer.id);
// --- HAVE ---
// NOTE(review): the body of this null check is not visible in this extract.
301 if (remotePeer.bitfield == null) {
304 Msg.debug("Received a HAVE message from " + message.peerId + " (" + message.issuerHostname + ")");
305 assert message.index >= 0 && message.index < Common.FILE_PIECES;
306 assert remotePeer.bitfield != null;
// Record that the sender now has the piece and count one more holder of it.
307 remotePeer.bitfield[message.index] = '1';
308 piecesCount[message.index]++;
309 //Send interested message to the peer if he has what we want
310 if (!remotePeer.amInterested && currentPieces.contains(message.index) ) {
311 remotePeer.amInterested = true;
312 sendInterested(remotePeer.mailbox);
// The piece is one we are downloading: request its first missing blocks,
// at most Common.BLOCKS_REQUESTED of them.
315 if (currentPieces.contains(message.index)) {
316 int blockIndex = getFirstBlock(message.index);
317 int blockLength = Common.PIECES_BLOCKS - blockIndex ;
318 blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;
319 sendRequest(message.mailbox,message.index,blockIndex,blockLength);
// --- REQUEST ---
323 assert message.index >= 0 && message.index < Common.FILE_PIECES;
// Requests are served only for peers we are not choking.
324 if (!remotePeer.chokedUpload) {
// NOTE(review): the value logged after "for" is message.peerId again; presumably
// the requested piece index (message.index) was intended - confirm.
325 Msg.debug("Received a REQUEST from " + message.peerId + "(" + message.issuerHostname + ") for " + message.peerId);
// Answer only when we own the complete piece.
326 if (bitfield[message.index] == '1') {
327 sendPiece(message.mailbox,message.index,false,message.blockIndex,message.blockLength);
330 Msg.debug("Received a REQUEST from " + message.peerId + " (" + message.issuerHostname + ") but he is choked" );
// --- PIECE ---
335 if (message.stalled) {
336 Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ") is stalled");
339 Msg.debug("Received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
// Blocks are recorded only for pieces we do not already own.
340 if (bitfield[message.index] == '0') {
341 updateBitfieldBlocks(message.index,message.blockIndex,message.blockLength);
342 if (pieceComplete(message.index)) {
344 //Removing the piece from our piece list.
// The Object cast selects remove(Object) (removal by value) rather than remove(int index).
345 if (!currentPieces.remove((Object)Integer.valueOf(message.index))) {
347 //Setting the fact that we have the piece
348 bitfield[message.index] = '1';
350 Msg.debug("My status is now " + getStatus());
351 //Sending the information to all the peers we are connected to
352 sendHave(message.index);
353 //Sending NOTINTERESTED to peers that don't have what we want.
354 updateInterestedAfterReceive();
358 Msg.debug("However, we already have it.");
// Feed the sender's speed estimate with the inverse of the time elapsed since
// beginReceiveTime, then restart the measurement.
// NOTE(review): the elapsed time is not checked against zero before dividing.
363 if (remotePeer != null) {
364 remotePeer.addSpeedValue(1 / (Msg.getClock() - beginReceiveTime));
366 beginReceiveTime = Msg.getClock();
369 * Wait for the node to receive interesting bitfield messages (i.e. non-empty)
// Blocks on the mailbox, handling each incoming message, until the deadline is
// reached or `finished` becomes true.
// NOTE(review): the statement inside the `currentPiece != -1` test is not visible;
// presumably it sets `finished` once a piece of interest has been selected - confirm.
372 void waitForPieces() {
373 boolean finished = false;
374 while (Msg.getClock() < deadline && !finished) {
// Post an asynchronous receive only when none is pending.
375 if (commReceived == null) {
376 commReceived = Task.irecv(mailbox);
// Unlike the leech/seed loops this waits (up to Common.TIMEOUT_MESSAGE) instead of polling.
379 commReceived.waitCompletion(Common.TIMEOUT_MESSAGE);
380 handleMessage(commReceived.getTask());
381 if (currentPiece != -1) {
386 catch (MsgException e) {
// Scans the bitfield for an entry equal to '1'.
// NOTE(review): the return statements are not visible in this extract. Given the
// method name, a test for a remaining '0' would be expected - confirm what is
// returned when a '1' is found.
392 private boolean hasFinished() {
393 for (int i = 0; i < bitfield.length; i++) {
394 if (bitfield[i] == '1') {
401 * Updates the list of who has a piece from a bitfield
402 * @param bitfield bitfield
// Walks the first Common.FILE_PIECES entries of a received bitfield and reacts to
// every piece flagged '1'.
// NOTE(review): the statement inside the test is not visible; presumably it
// increments piecesCount[i] - confirm.
// The parameter shadows the field of the same name; only the argument is read here.
404 private void updatePiecesCountFromBitfield(char bitfield[]) {
405 for (int i = 0; i < Common.FILE_PIECES; i++) {
406 if (bitfield[i] == '1') {
412 * Update the piece the peer is currently interested in.
413 * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole):
414 * If the peer has less than 3 pieces, he chooses a piece at random.
415 * If the peer has 3 pieces or more, he downloads the pieces that are the least
// Selects a new piece of interest, stores it in currentPiece and appends it to currentPieces.
418 void updateCurrentPiece() {
// Guard: every piece still missing is already in currentPieces.
// NOTE(review): the body of this test is not visible; presumably it returns early.
419 if (currentPieces.size() >= (Common.FILE_PIECES - pieces)) {
// NOTE(review): `true ||` makes this condition always true, so the random branch is
// always taken and the alternative branch below can never execute.
422 if (true || pieces < 3) {
423 int i = 0, peerPiece;
// Draw piece indices at random until one is found that we neither own
// (bitfield == '0') nor are already downloading.
425 currentPiece = stream.randInt(0,Common.FILE_PIECES - 1);
427 } while (!(bitfield[currentPiece] == '0' && !currentPieces.contains(currentPiece)));
430 //trivial min algorithm.
433 currentPieces.add(currentPiece);
434 Msg.debug("New interested piece: " + currentPiece);
435 assert currentPiece >= 0 && currentPiece < Common.FILE_PIECES;
438 * Update the list of current choked and unchoked peers, using the
// Choke algorithm step: chokes one peer taken from activePeers, then picks one
// peer to unchoke. Three selection strategies are visible: least recently unchoked
// (when all pieces are owned), random, and fastest.
// NOTE(review): the conditions choosing between the random and the fastest strategy
// (presumably based on `round`) are not visible in this extract.
441 private void updateChokedPeers() {
// Cycles through 0, 1, 2.
442 round = (round + 1) % 3;
// NOTE(review): the body of this test is not visible; presumably it returns early.
443 if (peers.size() == 0) {
446 //remove a peer from the list
447 Iterator<Entry<Integer, Connection>> it = activePeers.entrySet().iterator();
// The first entry returned by the iterator is the one choked; HashMap iteration
// order is unspecified, so which peer that is depends on the map's internal layout.
449 Entry<Integer,Connection> e = it.next();
450 Connection peerChoked = e.getValue();
451 peerChoked.chokedUpload = true;
452 sendChoked(peerChoked.mailbox);
// Removal goes through the map, not the iterator; `it` is not used again in the visible code.
453 activePeers.remove(e.getKey());
455 Connection peerChoosed = null;
456 //Separate the case from when the peer is seeding.
457 if (pieces == Common.FILE_PIECES) {
458 //Find the last unchoked peer.
// Start above any possible unchoke time so that the first interested peer is accepted.
459 double unchokeTime = deadline + 1;
460 for (Connection connection : peers.values()) {
// Keep the interested peer with the smallest lastUnchoke value.
461 if (connection.lastUnchoke < unchokeTime && connection.interested) {
462 peerChoosed = connection;
463 unchokeTime = connection.lastUnchoke;
468 //Random optimistic unchoking
// Draw a random position in `peers` and walk the values to reach it.
473 int idChosen = stream.randInt(0,peers.size() - 1);
474 for (Connection connection : peers.values()) {
476 peerChoosed = connection;
480 } //TODO: Not really the best way ever
// NOTE(review): the body of this test is not visible; presumably a peer that is
// not interested is discarded so that another draw is made.
481 if (!peerChoosed.interested) {
// Retry until a peer is chosen or Common.MAXIMUM_PEERS attempts have been made.
485 } while (peerChoosed == null && j < Common.MAXIMUM_PEERS);
// Fastest strategy: among interested peers we are currently choking, take the one
// with the highest peerSpeed; peers whose speed is 0 are never selected.
488 Connection fastest = null;
489 double fastestSpeed = 0;
490 for (Connection c : peers.values()) {
491 if (c.peerSpeed > fastestSpeed && c.interested && c.chokedUpload) {
493 fastestSpeed = c.peerSpeed;
496 peerChoosed = fastest;
// Unchoke the selected peer, if any, and remember when that happened.
499 if (peerChoosed != null) {
500 activePeers.put(peerChoosed.id,peerChoosed);
501 peerChoosed.chokedUpload = false;
502 peerChoosed.lastUnchoke = Msg.getClock();
503 sendUnchoked(peerChoosed.mailbox);
507 * Updates our "interested" state about peers: send "not interested" to peers
508 * that don't have any more pieces we want.
// For every connection we are currently interested in, checks whether it holds at
// least one piece of currentPieces; the visible tail clears amInterested and sends
// NOTINTERESTED.
// NOTE(review): the flag set inside the inner test and the condition guarding the
// last two statements are not visible in this extract.
510 private void updateInterestedAfterReceive() {
512 for (Connection connection : peers.values()) {
514 if (connection.amInterested) {
515 for (Integer piece : currentPieces) {
// NOTE(review): connection.bitfield is dereferenced without a null check here.
516 if (connection.bitfield[piece] == '1') {
522 connection.amInterested = false;
523 sendNotInterested(connection.mailbox);
// Marks blockLength consecutive blocks of piece `index`, starting at blockIndex, as received ('1').
// No bounds check is performed on index, blockIndex or blockLength.
528 private void updateBitfieldBlocks(int index, int blockIndex, int blockLength) {
529 for (int i = blockIndex; i < (blockIndex + blockLength); i++) {
530 bitfieldBlocks[index][i] = '1';
534 * Returns if a piece is complete in the peer's bitfield.
535 * @param index the index of the piece.
// Scans every block flag of piece `index` looking for a '0' (a block not yet received).
// NOTE(review): the return statements are not visible; presumably false is returned
// as soon as a '0' is found and true otherwise.
537 private boolean pieceComplete(int index) {
538 for (int i = 0; i < bitfieldBlocks[index].length; i++) {
539 if (bitfieldBlocks[index][i] == '0') {
546 * Returns the first block of a piece that we don't have.
// Scans the Common.PIECES_BLOCKS block flags of `piece` in increasing order for the first '0'.
// NOTE(review): the statements recording and returning the index are not visible,
// nor is the value returned when no block is missing.
548 private int getFirstBlock(int piece) {
550 for (int i = 0; i < Common.PIECES_BLOCKS; i++) {
551 if (bitfieldBlocks[piece][i] == '0') {
559 * Send request messages to a peer that has unchoked us
560 * @param remotePeer peer data to the peer we want to send the request
// For each piece in currentPieces that the remote peer has, sends one REQUEST
// starting at our first missing block of that piece.
562 private void sendRequestsToPeer(Connection remotePeer) {
// NOTE(review): the body of this test is not visible; presumably it returns early
// when the peer's bitfield has not been received yet.
563 if (remotePeer.bitfield == null) {
566 for (Integer piece : currentPieces) {
567 //Getting the block to send.
568 int blockIndex = -1, blockLength = 0;
569 blockIndex = getFirstBlock(piece);
// Number of blocks from blockIndex to the end of the piece, capped at Common.BLOCKS_REQUESTED.
570 blockLength = Common.PIECES_BLOCKS - blockIndex ;
571 blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;
// Only ask for pieces the remote peer has announced.
572 if (remotePeer.bitfield[piece] == '1') {
573 sendRequest(remotePeer.mailbox, piece, blockIndex, blockLength);
578 * Find the peers that have the current interested piece and send them
579 * the "interested" message
// Sends INTERESTED to every known peer that has currentPiece and that we have not
// already declared interest to.
581 private void sendInterestedToPeers() {
// NOTE(review): the body of this test is not visible; presumably it returns early
// when no piece of interest is selected.
582 if (currentPiece == -1) {
585 for (Connection connection : peers.values()) {
// Peers whose bitfield has not been received yet are skipped.
586 if (connection.bitfield != null && connection.bitfield[currentPiece] == '1' && !connection.amInterested) {
587 connection.amInterested = true;
588 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
// dsend hands the task off to the peer's mailbox.
589 task.dsend(connection.mailbox);
596 * Send an "interested" message to a peer.
// Builds an INTERESTED message carrying our host name, mailbox and id.
// The `mailbox` parameter is the destination; our own mailbox is this.mailbox.
// NOTE(review): the statement that sends the task is not visible in this extract.
598 private void sendInterested(String mailbox) {
599 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
603 * Send a "not interested" message to a peer
604 * @param mailbox destination mailbox
// Builds a NOTINTERESTED message carrying our host name, mailbox and id.
// The `mailbox` parameter is the destination; our own mailbox is this.mailbox.
// NOTE(review): the statement that sends the task is not visible in this extract.
606 private void sendNotInterested(String mailbox) {
607 MessageTask task = new MessageTask(MessageTask.Type.NOTINTERESTED, hostname, this.mailbox, this.id);
611 * Send a handshake message to all the peers the peer has.
612 * @param peer peer data
// Sends one HANDSHAKE message to the mailbox of every connection in `peers`.
// The method takes no parameter.
614 private void sendHandshakeAll() {
615 for (Connection remotePeer : peers.values()) {
// A fresh task is built for each recipient.
// NOTE(review): the continuation line holding the last constructor argument is not visible.
616 MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, mailbox,
618 task.dsend(remotePeer.mailbox);
622 * Send a "handshake" message to a peer
623 * @param mailbox mailbox to which the message is sent
// Logs and builds a HANDSHAKE message carrying our host name, mailbox and id.
// The `mailbox` parameter is the destination; our own mailbox is this.mailbox.
// NOTE(review): the statement that sends the task is not visible in this extract.
625 private void sendHandshake(String mailbox) {
626 Msg.debug("Sending a HANDSHAKE to " + mailbox);
627 MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, this.mailbox, this.id);
631 * Send a "choked" message to a peer
// Logs and builds a CHOKE message carrying our host name, mailbox and id.
// The `mailbox` parameter is the destination; our own mailbox is this.mailbox.
// NOTE(review): the statement that sends the task is not visible in this extract.
633 private void sendChoked(String mailbox) {
634 Msg.debug("Sending a CHOKE to " + mailbox);
635 MessageTask task = new MessageTask(MessageTask.Type.CHOKE, hostname, this.mailbox, this.id);
639 * Send an "unchoked" message to a peer
// Logs and builds an UNCHOKE message carrying our host name, mailbox and id.
// The `mailbox` parameter is the destination; our own mailbox is this.mailbox.
// NOTE(review): the statement that sends the task is not visible in this extract.
641 private void sendUnchoked(String mailbox) {
642 Msg.debug("Sending a UNCHOKE to " + mailbox);
643 MessageTask task = new MessageTask(MessageTask.Type.UNCHOKE, hostname, this.mailbox, this.id);
647 * Send a "HAVE" message to all peers we are connected to
// Announces that we now own `piece` by sending a HAVE message to the mailbox of
// every connection in `peers`.
649 private void sendHave(int piece) {
650 Msg.debug("Sending HAVE message to all my peers");
651 for (Connection remotePeer : peers.values()) {
// A fresh task is built for each recipient.
652 MessageTask task = new MessageTask(MessageTask.Type.HAVE, hostname, this.mailbox, this.id, piece);
653 task.dsend(remotePeer.mailbox);
657 * Send a bitfield message to a peer.
658 * @param mailbox destination mailbox
// Logs and builds a BITFIELD message carrying our host name, mailbox, id and bitfield.
// The bitfield array is passed by reference here, not copied.
// NOTE(review): the statement that sends the task is not visible in this extract.
660 private void sendBitfield(String mailbox) {
661 Msg.debug("Sending a BITFIELD to " + mailbox);
662 MessageTask task = new MessageTask(MessageTask.Type.BITFIELD, hostname, this.mailbox, this.id, this.bitfield);
666 * Send a "request" message to a peer, containing a request for a piece
// Logs and builds a REQUEST message for blockLength blocks of `piece` starting at blockIndex.
// The debug line prints the range as "blockIndex,blockIndex + blockLength".
// NOTE(review): the statement that sends the task is not visible in this extract.
668 private void sendRequest(String mailbox, int piece, int blockIndex, int blockLength) {
669 Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece + " and blocks " + blockIndex + "," + (blockIndex + blockLength));
670 MessageTask task = new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece, blockIndex, blockLength);
674 * Send a "piece" message to a peer, containing a piece of the file
// Logs and builds a PIECE message for blockLength blocks of `piece` starting at
// blockIndex; `stalled` is forwarded unchanged into the message.
// NOTE(review): the statement that sends the task is not visible in this extract.
676 private void sendPiece(String mailbox, int piece, boolean stalled, int blockIndex, int blockLength) {
677 Msg.debug("Sending the PIECE " + piece + " to " + mailbox);
678 MessageTask task = new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled, blockIndex, blockLength);
682 private String getStatus() {
684 for (int i = 0; i < Common.FILE_PIECES; i++) {