2 * Copyright 2006-2012. The SimGrid Team. All rights reserved.
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package.
9 import java.util.ArrayList;
10 import java.util.HashMap;
11 import java.util.HashSet;
12 import java.util.Iterator;
13 import java.util.Map.Entry;
15 import org.simgrid.msg.Comm;
16 import org.simgrid.msg.Host;
17 import org.simgrid.msg.Msg;
18 import org.simgrid.msg.MsgException;
19 import org.simgrid.msg.RngStream;
20 import org.simgrid.msg.Process;
21 import org.simgrid.msg.Task;
23 import bittorrent.Connection;
26 * Main class for peers execution
28 public class Peer extends Process {
// Round counter; updateChokedPeers() advances it modulo 3.
29 protected int round = 0;
// Clock value stored after each handled message; the gap to the next message feeds the sender's speed sample.
31 protected double beginReceiveTime;
// Simulated-time limit parsed from args[1] in main(); the receive loops run while Msg.getClock() is below it.
32 protected double deadline;
// Random stream shared by every Peer instance (static); used to draw piece indices and peer positions.
34 protected static RngStream stream = new RngStream();
// Mailbox on which this peer receives peer-to-peer messages; init() sets it to the decimal peer id.
37 protected String mailbox;
// Mailbox on which the tracker answer is received; init() sets it to "tracker_" + id.
38 protected String mailboxTracker;
// Host name, set by init() from host.getName().
39 protected String hostname;
// Piece counter; compared with Common.FILE_PIECES to decide between leeching and seeding.
40 protected int pieces = 0;
// One flag per piece, stored as a character: '1' = piece owned, '0' = piece missing.
41 protected char[] bitfield = new char[Common.FILE_PIECES];
// One flag per block of every piece, same '1'/'0' convention as bitfield.
42 protected char[][] bitfieldBlocks = new char[Common.FILE_PIECES][Common.PIECES_BLOCKS];
// Per-piece counter, incremented when a HAVE message for that piece is handled.
44 protected short[] piecesCount = new short[Common.FILE_PIECES];
// Not referenced by any code visible in this excerpt.
46 protected int piecesRequested = 0;
// Indices of the pieces currently being downloaded (added by updateCurrentPiece, removed when a piece completes).
48 protected ArrayList<Integer> currentPieces = new ArrayList<Integer>();
// Most recently selected piece index; -1 means no piece has been selected.
49 protected int currentPiece = -1;
// Connections added on an unchoke (sent or received) and removed on a choke, keyed by peer id.
51 protected HashMap<Integer, Connection> activePeers = new HashMap<Integer, Connection>();
// Every known connection keyed by peer id; filled from the tracker answer and from incoming handshakes.
52 protected HashMap<Integer, Connection> peers = new HashMap<Integer, Connection>();
// Pending asynchronous receive; the loops post a new Task.irecv() whenever this is null.
54 protected Comm commReceived = null;
// Creates the peer process; the three arguments are forwarded unchanged to the Process constructor.
56 public Peer(Host host, String name, String[]args) {
57 super(host,name,args);
// Entry point of the peer process.
// Arguments: args[0] = peer id (parsed as an integer), args[1] = deadline (parsed as a double);
// when a third argument is present the peer is initialized as a seed.
// Several control-flow lines of this method are not visible in this excerpt, so the notes
// below only describe the statements that are shown.
61 public void main(String[] args) throws MsgException {
// Only 2 or 3 arguments are accepted.
// NOTE(review): no early return is visible after this log - confirm the method does not go on to read args[0].
63 if (args.length != 3 && args.length != 2) {
64 Msg.info("Wrong number of arguments");
// Three arguments: initialize with seed = true.
66 if (args.length == 3) {
67 init(Integer.valueOf(args[0]),true);
// Alternative initialization with seed = false (the branch header is not visible here).
70 init(Integer.valueOf(args[0]),false);
72 //Retrieve the deadline
73 deadline = Double.valueOf(args[1]);
// Logged for an invalid deadline; the validity test itself is not visible in this excerpt.
75 Msg.info("Wrong deadline supplied");
78 Msg.info("Hi, I'm joining the network with id " + id);
79 //Getting peer data from the tracker
81 Msg.debug("Got " + peers.size() + " peers from the tracker");
82 Msg.debug("Here is my current status: " + getStatus());
// Start of the interval used for the first speed sample in handleMessage().
83 beginReceiveTime = Msg.getClock();
// Marks every piece as owned; presumably reached only when the peer already holds the whole file - TODO confirm the guarding condition.
85 pieces = Common.FILE_PIECES;
95 Msg.info("Couldn't contact the tracker.");
97 Msg.info("Here is my current status: " + getStatus());
100 * Peer main loop when it is leeching.
// Download loop: runs until the deadline is reached or the piece counter equals Common.FILE_PIECES.
// Each iteration polls the pending receive, refreshes the interest/piece selection and,
// once per Common.UPDATE_CHOKED_INTERVAL, reaches the choke-update step.
// Some statements (try header, receive reset, closing braces) are not visible in this excerpt.
102 private void leechLoop() {
// Absolute clock value of the next choke update.
103 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
104 Msg.debug("Start downloading.");
106 * Send a "handshake" message to all the peers it got
107 * (it couldn't have gotten more than 50 peers anyway)
110 //Wait for at least one "bitfield" message.
112 Msg.debug("Starting main leech loop");
113 while (Msg.getClock() < deadline && pieces < Common.FILE_PIECES) {
// Post an asynchronous receive on our mailbox if none is pending.
114 if (commReceived == null) {
115 commReceived = Task.irecv(mailbox);
// Poll the receive and dispatch the message when test() reports true.
118 if (commReceived.test()) {
119 handleMessage(commReceived.getTask());
123 //If the peer has a pending piece of interest
124 if (currentPiece != -1) {
125 sendInterestedToPeers();
// Select another piece while fewer than Common.MAX_PIECES are in progress.
128 if (currentPieces.size() < Common.MAX_PIECES) {
129 updateCurrentPiece();
132 //We don't execute the choke algorithm if we don't already have a piece
133 if (Msg.getClock() >= nextChokedUpdate && pieces > 0) {
// Advance by one fixed interval from the previous target (not from the current clock).
135 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
// Handler body not visible in this excerpt.
142 catch (MsgException e) {
149 * Peer main loop when it is seeding
// Seeding loop: runs until the deadline, polling the pending receive and reaching the
// choke-update step once per Common.UPDATE_CHOKED_INTERVAL.
// Some statements (try header, receive reset, closing braces) are not visible in this excerpt.
151 private void seedLoop() {
// Absolute clock value of the next choke update.
152 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
153 Msg.debug("Start seeding.");
154 //start the main seed loop
155 while (Msg.getClock() < deadline) {
// Post an asynchronous receive on our mailbox if none is pending.
156 if (commReceived == null) {
157 commReceived = Task.irecv(mailbox);
// Poll the receive and dispatch the message when test() reports true.
160 if (commReceived.test()) {
161 handleMessage(commReceived.getTask());
// Unlike leechLoop, no "pieces > 0" condition is applied here.
165 if (Msg.getClock() >= nextChokedUpdate) {
167 //TODO: Change the choked peer algorithm when seeding
168 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
// Handler body not visible in this excerpt.
175 catch (MsgException e) {
183 * Initialize the various peer data
184 * @param id id of the peer to take in the network
185 * @param seed indicates if the peer is a seed
// Sets up the mailboxes, the block flags and the host name of this peer.
// id: peer id, used to derive both mailbox names.
// seed: not read by any line visible in this excerpt; presumably selects between the
//       '1'-filling and the '0'-filling loops below - TODO confirm.
187 private void init(int id, boolean seed) {
// The peer mailbox is the decimal id; the tracker-answer mailbox is "tracker_" + id.
189 this.mailbox = Integer.toString(id);
190 this.mailboxTracker = "tracker_" + Integer.toString(id);
// First fill: every block of every piece is flagged '1' (owned).
192 for (int i = 0; i < bitfield.length; i++) {
194 for (int j = 0; j < bitfieldBlocks[i].length; j++) {
195 bitfieldBlocks[i][j] = '1';
// Second fill: every block of every piece is flagged '0' (missing).
200 for (int i = 0; i < bitfield.length; i++) {
202 for (int j = 0; j < bitfieldBlocks[i].length; j++) {
203 bitfieldBlocks[i][j] = '0' ;
// `host` is not declared in this excerpt.
207 this.hostname = host.getName();
210 * Retrieves the peer list from the tracker
// Asks the tracker for a peer list and records the returned peers in `peers`.
// Works in two phases that share one absolute timeout: first send the request,
// then wait for a TrackerTask answer on mailboxTracker.
// The return statement and the updates of the success flags are not visible in this excerpt.
212 private boolean getPeersData() {
214 boolean success = false, sendSuccess = false;
// Absolute clock limit applied to both loops below.
215 double timeout = Msg.getClock() + Common.GET_PEERS_TIMEOUT;
216 //Build the task to send to the tracker
217 TrackerTask taskSend = new TrackerTask(hostname, mailboxTracker, id);
// Phase 1: loop on sending the request while the send has not succeeded and the timeout has not passed.
219 while (!sendSuccess && Msg.getClock() < timeout) {
221 Msg.debug("Sending a peer request to the tracker.");
222 taskSend.send(Common.TRACKER_MAILBOX,Common.GET_PEERS_TIMEOUT);
// Handler body not visible in this excerpt.
225 catch (MsgException e) {
// Phase 2: wait for the answer; a new receive is posted on every iteration.
229 while (!success && Msg.getClock() < timeout) {
230 commReceived = Task.irecv(this.mailboxTracker);
232 commReceived.waitCompletion(Common.GET_PEERS_TIMEOUT);
// Tasks of any other type are not processed by this branch.
233 if (commReceived.getTask() instanceof TrackerTask) {
234 TrackerTask task = (TrackerTask)commReceived.getTask();
235 for (Integer peerId: task.peers) {
// Skip our own id. Assumes `id` is a primitive int so the Integer is unboxed and compared by value - declaration not visible, TODO confirm.
236 if (peerId != this.id) {
237 peers.put(peerId, new Connection(peerId));
// Handler body not visible in this excerpt.
243 catch (MsgException e) {
252 * Handle a received message sent by another peer
253 * @param task task received.
// Handles one message received from another peer.
// task: the received task; it is cast to MessageTask without a type check.
// The sender's Connection is looked up in `peers` (null when unknown) and the method
// dispatches on message.type. The case labels, breaks and several braces are not visible
// in this excerpt; the sections below are labelled from their debug messages.
// After the switch, a speed sample is recorded for the sender and beginReceiveTime is reset.
255 void handleMessage(Task task) {
256 MessageTask message = (MessageTask)task;
257 Connection remotePeer = peers.get(message.peerId);
258 switch (message.type) {
// --- HANDSHAKE ---
260 Msg.debug("Received a HANDSHAKE message from " + message.mailbox);
261 //Check if the peer is in our connection list
262 if (remotePeer == null) {
// Unknown peer: register it and answer with our own handshake.
263 peers.put(message.peerId, new Connection(message.peerId));
264 sendHandshake(message.mailbox);
266 //Send our bitfield to the peer
267 sendBitfield(message.mailbox);
// --- BITFIELD ---
270 Msg.debug("Received a BITFIELD message from " + message.peerId + " (" + message.issuerHostname + ")");
271 //update the pieces list
272 updatePiecesCountFromBitfield(message.bitfield);
273 //Update the current piece
274 if (currentPiece == -1 && pieces < Common.FILE_PIECES && currentPieces.size() < Common.MAX_PIECES) {
275 updateCurrentPiece();
// Keep a private copy of the sender's bitfield.
// NOTE(review): no null check on remotePeer is visible before this dereference - confirm a BITFIELD cannot arrive from an unregistered peer.
277 remotePeer.bitfield = message.bitfield.clone();
// --- INTERESTED ---
280 Msg.debug("Received an INTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
281 assert remotePeer != null;
282 remotePeer.interested = true;
// --- NOTINTERESTED ---
285 Msg.debug("Received a NOTINTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
286 assert remotePeer != null;
287 remotePeer.interested = false;
// --- UNCHOKE: the sender now accepts our requests ---
290 Msg.debug("Received an UNCHOKE message from " + message.peerId + "(" + message.issuerHostname + ")");
291 assert remotePeer != null;
292 remotePeer.chokedDownload = false;
293 activePeers.put(remotePeer.id,remotePeer);
294 sendRequestsToPeer(remotePeer);
// --- CHOKE: the sender no longer accepts our requests ---
297 Msg.debug("Received a CHOKE message from " + message.peerId + " (" + message.issuerHostname + ")");
298 assert remotePeer != null;
299 remotePeer.chokedDownload = true;
300 activePeers.remove(remotePeer.id);
// --- HAVE ---
// Guard for a sender whose bitfield was never received; the guarded statement is not visible in this excerpt.
303 if (remotePeer.bitfield == null) {
306 Msg.debug("Received a HAVE message from " + message.peerId + " (" + message.issuerHostname + ")");
307 assert message.index >= 0 && message.index < Common.FILE_PIECES;
308 assert remotePeer.bitfield != null;
// Record that the sender owns the piece and count one more holder of it.
309 remotePeer.bitfield[message.index] = '1';
310 piecesCount[message.index]++;
311 //Send interested message to the peer if he has what we want
312 if (!remotePeer.amInterested && currentPieces.contains(message.index) ) {
313 remotePeer.amInterested = true;
314 sendInterested(remotePeer.mailbox);
// If we are downloading that piece, request its next missing blocks, capped at Common.BLOCKS_REQUESTED.
317 if (currentPieces.contains(message.index)) {
318 int blockIndex = getFirstBlock(message.index);
319 int blockLength = Common.PIECES_BLOCKS - blockIndex ;
320 blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;
321 sendRequest(message.mailbox,message.index,blockIndex,blockLength);
// --- REQUEST ---
325 assert message.index >= 0 && message.index < Common.FILE_PIECES;
// Requests are served only for peers we have unchoked.
326 if (!remotePeer.chokedUpload) {
// NOTE(review): the value logged after "for" is message.peerId again; message.index was probably intended.
327 Msg.debug("Received a REQUEST from " + message.peerId + "(" + message.issuerHostname + ") for " + message.peerId);
// Only pieces we own are sent.
328 if (bitfield[message.index] == '1') {
329 sendPiece(message.mailbox,message.index,false,message.blockIndex,message.blockLength);
332 Msg.debug("Received a REQUEST from " + message.peerId + " (" + message.issuerHostname + ") but he is choked" );
// --- PIECE ---
337 if (message.stalled) {
338 Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ") is stalled");
341 Msg.debug("Received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
// Blocks are only recorded for a piece we do not own yet.
342 if (bitfield[message.index] == '0') {
343 updateBitfieldBlocks(message.index,message.blockIndex,message.blockLength);
344 if (pieceComplete(message.index)) {
346 //Removing the piece from our piece list.
// The (Object) cast selects List.remove(Object), removing by value rather than by position.
347 if (!currentPieces.remove((Object)Integer.valueOf(message.index))) {
349 //Setting the fact that we have the piece
350 bitfield[message.index] = '1';
352 Msg.debug("My status is now " + getStatus());
353 //Sending the information to all the peers we are connected to
354 sendHave(message.index);
355 //sending NOTINTERESTED to peers that don't have what we want.
356 updateInterestedAfterReceive();
360 Msg.debug("However, we already have it.");
// Speed sample for the sender: inverse of the time elapsed since the previous message was handled.
// NOTE(review): if the clock has not advanced the divisor is 0.0 and the sample is Infinity - confirm addSpeedValue tolerates that.
365 if (remotePeer != null) {
366 remotePeer.addSpeedValue(1 / (Msg.getClock() - beginReceiveTime));
368 beginReceiveTime = Msg.getClock();
371 * Wait for the node to receive interesting bitfield messages (ie: non empty)
// Blocks on incoming messages, handling each one, until the deadline is reached or
// `finished` becomes true.
// The statement inside the "currentPiece != -1" test is not visible in this excerpt;
// presumably it sets `finished` once a piece has been selected - TODO confirm.
374 void waitForPieces() {
375 boolean finished = false;
376 while (Msg.getClock() < deadline && !finished) {
// Post an asynchronous receive on our mailbox if none is pending.
377 if (commReceived == null) {
378 commReceived = Task.irecv(mailbox);
// Unlike the main loops, this waits for completion (bounded by Common.TIMEOUT_MESSAGE) instead of polling with test().
381 commReceived.waitCompletion(Common.TIMEOUT_MESSAGE);
382 handleMessage(commReceived.getTask());
383 if (currentPiece != -1) {
// Handler body not visible in this excerpt.
388 catch (MsgException e) {
// Scans the piece bitfield to report whether the peer has finished downloading.
// NOTE(review): the visible test looks for an owned piece ('1'), and the return statements
// are not visible in this excerpt - confirm the result means "all pieces owned" and not
// "at least one piece owned".
394 private boolean hasFinished() {
395 for (int i = 0; i < bitfield.length; i++) {
396 if (bitfield[i] == '1') {
403 * Updates the list of who has a piece from a bitfield
404 * @param bitfield bitfield
// Walks a received bitfield and reacts to every piece flagged '1'.
// bitfield: the remote peer's bitfield (this parameter shadows the field of the same name);
//           it is indexed from 0 to Common.FILE_PIECES - 1.
// The statement executed for an owned piece is not visible in this excerpt; presumably it
// increments piecesCount for that index - TODO confirm.
406 private void updatePiecesCountFromBitfield(char bitfield[]) {
407 for (int i = 0; i < Common.FILE_PIECES; i++) {
408 if (bitfield[i] == '1') {
414 * Update the piece the peer is currently interested in.
415 * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole):
416 * If the peer has fewer than 3 pieces, it chooses a piece at random.
417 * If the peer has 3 or more pieces, it downloads the pieces that are the least
// Selects a new piece to download, stores it in currentPiece and appends it to currentPieces.
// Several lines (loop header, alternative branch, braces) are not visible in this excerpt.
420 void updateCurrentPiece() {
// Guard for the case where every missing piece is already being downloaded; the guarded statement is not visible.
421 if (currentPieces.size() >= (Common.FILE_PIECES - pieces)) {
// NOTE(review): "true ||" makes this condition always true, so the random branch is always taken.
424 if (true || pieces < 3) {
// Neither local is used by any line visible in this excerpt.
425 int i = 0, peerPiece;
// Draw an index in [0, FILE_PIECES - 1] ...
427 currentPiece = stream.randInt(0,Common.FILE_PIECES - 1);
// ... and repeat until it designates a piece we lack and are not already downloading.
429 } while (!(bitfield[currentPiece] == '0' && !currentPieces.contains(currentPiece)));
432 //trivial min algorithm.
435 currentPieces.add(currentPiece);
436 Msg.debug("New interested piece: " + currentPiece);
437 assert currentPiece >= 0 && currentPiece < Common.FILE_PIECES;
440 * Update the list of current choked and unchoked peers, using the
// Runs one step of the choke algorithm: chokes one currently active peer, picks a
// candidate connection, then unchokes it.
// Three selection strategies are visible: least recently unchoked (when seeding),
// random pick, and fastest peer. The conditions choosing between the last two, and
// several loop headers and braces, are not visible in this excerpt.
443 private void updateChokedPeers() {
444 round = (round + 1) % 3;
// Guard for an empty peer list; the guarded statement is not visible here.
445 if (peers.size() == 0) {
448 //remove a peer from the list
449 Iterator<Entry<Integer, Connection>> it = activePeers.entrySet().iterator();
// Choke the first entry returned by the iterator.
451 Entry<Integer,Connection> e = it.next();
452 Connection peerChoked = e.getValue();
453 peerChoked.chokedUpload = true;
454 sendChoked(peerChoked.mailbox);
// NOTE(review): removal goes through the map, not the iterator; this is only safe if `it` is not advanced afterwards - confirm.
455 activePeers.remove(e.getKey());
457 Connection peerChoosed = null;
458 //Separate the case from when the peer is seeding.
459 if (pieces == Common.FILE_PIECES) {
460 //Find the last unchoked peer.
// Start above any possible lastUnchoke value, then keep the interested connection with the smallest one.
461 double unchokeTime = deadline + 1;
462 for (Connection connection : peers.values()) {
463 if (connection.lastUnchoke < unchokeTime && connection.interested) {
464 peerChoosed = connection;
465 unchokeTime = connection.lastUnchoke;
470 //Random optimistic unchoking
// Draw a position in [0, peers.size() - 1]; the loop below walks the values to select a connection.
475 int idChosen = stream.randInt(0,peers.size() - 1);
476 for (Connection connection : peers.values()) {
478 peerChoosed = connection;
482 } //TODO: Not really the best way ever
// NOTE(review): peerChoosed is dereferenced without a null check - confirm the loop above always assigns it.
483 if (!peerChoosed.interested) {
487 } while (peerChoosed == null && j <
488 Common.MAXIMUM_PEERS);
// Fastest-peer strategy: among interested connections that we currently choke, keep the one with the highest peerSpeed (must exceed 0).
491 Connection fastest = null;
492 double fastestSpeed = 0;
493 for (Connection c : peers.values()) {
494 if (c.peerSpeed > fastestSpeed && c.interested && c.chokedUpload) {
496 fastestSpeed = c.peerSpeed;
499 peerChoosed = fastest;
// Unchoke the selected connection, if any, and remember when it happened.
502 if (peerChoosed != null) {
503 activePeers.put(peerChoosed.id,peerChoosed);
504 peerChoosed.chokedUpload = false;
505 peerChoosed.lastUnchoke = Msg.getClock();
506 sendUnchoked(peerChoosed.mailbox);
510 * Updates our "interested" state about peers: send "not interested" to peers
511 * that don't have any more pieces we want.
// For every connection we are interested in, checks whether it owns any piece listed in
// currentPieces, and withdraws our interest (flag + NOTINTERESTED message) otherwise.
// The flag that carries the result of the inner search, and the test on it, are not
// visible in this excerpt.
513 private void updateInterestedAfterReceive() {
515 for (Connection connection : peers.values()) {
517 if (connection.amInterested) {
// Search the pieces being downloaded for one that this connection owns.
518 for (Integer piece : currentPieces) {
519 if (connection.bitfield[piece] == '1') {
// Withdraw our interest and notify the peer.
525 connection.amInterested = false;
526 sendNotInterested(connection.mailbox);
// Flags a run of blocks of one piece as received.
// index: piece index; blockIndex: first block of the run; blockLength: number of blocks.
// Blocks blockIndex .. blockIndex + blockLength - 1 are set to '1'; no bounds check is visible.
531 private void updateBitfieldBlocks(int index, int blockIndex, int blockLength) {
532 for (int i = blockIndex; i < (blockIndex + blockLength); i++) {
533 bitfieldBlocks[index][i] = '1';
537 * Returns if a piece is complete in the peer's bitfield.
538 * @param index the index of the piece.
// Scans the blocks of piece `index` looking for one still flagged '0' (missing).
// The return statements are not visible in this excerpt; presumably a missing block
// yields false and a full scan yields true - TODO confirm.
540 private boolean pieceComplete(int index) {
541 for (int i = 0; i < bitfieldBlocks[index].length; i++) {
542 if (bitfieldBlocks[index][i] == '0') {
549 * Returns the first block of a piece that we don't have.
// Scans the blocks of `piece` in increasing order looking for the first one flagged '0'.
// The return statements (including the value used when no block is missing) are not
// visible in this excerpt.
551 private int getFirstBlock(int piece) {
553 for (int i = 0; i < Common.PIECES_BLOCKS; i++) {
554 if (bitfieldBlocks[piece][i] == '0') {
562 * Send request messages to a peer that has unchoked us
563 * @param remotePeer data of the peer we want to send the requests to
// Sends one REQUEST per piece in currentPieces that the given peer owns.
// remotePeer: connection of the peer to query; its bitfield and mailbox are read.
565 private void sendRequestsToPeer(Connection remotePeer) {
// Guard for a peer whose bitfield is unknown; the guarded statement is not visible in this excerpt.
566 if (remotePeer.bitfield == null) {
569 for (Integer piece : currentPieces) {
570 //Getting the block to send.
571 int blockIndex = -1, blockLength = 0;
// Request starts at the first missing block and covers the rest of the piece, capped at Common.BLOCKS_REQUESTED.
572 blockIndex = getFirstBlock(piece);
573 blockLength = Common.PIECES_BLOCKS - blockIndex ;
574 blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;
// Only ask for pieces the remote peer advertises.
575 if (remotePeer.bitfield[piece] == '1') {
576 sendRequest(remotePeer.mailbox, piece, blockIndex, blockLength);
581 * Find the peers that have the current interested piece and send them
582 * the "interested" message
// Sends an INTERESTED message to every known peer that owns currentPiece and to which
// we have not yet declared interest, marking the connection as amInterested.
584 private void sendInterestedToPeers() {
// Guard for "no piece selected"; the guarded statement is not visible in this excerpt.
585 if (currentPiece == -1) {
588 for (Connection connection : peers.values()) {
// Connections whose bitfield has not been received yet are skipped.
589 if (connection.bitfield != null && connection.bitfield[currentPiece] == '1' && !connection.amInterested) {
590 connection.amInterested = true;
591 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
// dsend() is applied to the peer's mailbox; no completion handling follows in the visible lines.
592 task.dsend(connection.mailbox);
599 * Send an "interested" message to a peer.
// Builds an INTERESTED message for the peer listening on `mailbox`.
// mailbox: destination mailbox; it shadows the field, so this.mailbox below is our own mailbox.
// The statement that dispatches the task is not visible in this excerpt.
601 private void sendInterested(String mailbox) {
602 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
606 * Send a "not interested" message to a peer
607 * @param mailbox destination mailbox
// Builds a NOTINTERESTED message for the peer listening on `mailbox`.
// mailbox: destination mailbox; it shadows the field, so this.mailbox below is our own mailbox.
// The statement that dispatches the task is not visible in this excerpt.
609 private void sendNotInterested(String mailbox) {
610 MessageTask task = new MessageTask(MessageTask.Type.NOTINTERESTED, hostname, this.mailbox, this.id);
614 * Send a handshake message to all the peers the peer has.
615 * The recipients are the connections stored in the peers map; the method takes no parameter.
// Sends a HANDSHAKE message to every connection stored in `peers`.
617 private void sendHandshakeAll() {
618 for (Connection remotePeer : peers.values()) {
// The end of this constructor call continues on a line that is not visible in this excerpt.
619 MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, mailbox,
// dsend() is applied to each peer's mailbox; no completion handling follows in the visible lines.
621 task.dsend(remotePeer.mailbox);
625 * Send a "handshake" message to a peer
626 * @param mailbox mailbox to which we send the message
// Logs and builds a HANDSHAKE message for the peer listening on `mailbox`.
// mailbox: destination mailbox; it shadows the field, so this.mailbox below is our own mailbox.
// The statement that dispatches the task is not visible in this excerpt.
628 private void sendHandshake(String mailbox) {
629 Msg.debug("Sending a HANDSHAKE to " + mailbox);
630 MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, this.mailbox, this.id);
634 * Send a "choked" message to a peer
// Logs and builds a CHOKE message for the peer listening on `mailbox`.
// mailbox: destination mailbox; it shadows the field, so this.mailbox below is our own mailbox.
// The statement that dispatches the task is not visible in this excerpt.
636 private void sendChoked(String mailbox) {
637 Msg.debug("Sending a CHOKE to " + mailbox);
638 MessageTask task = new MessageTask(MessageTask.Type.CHOKE, hostname, this.mailbox, this.id);
642 * Send an "unchoked" message to a peer
// Logs and builds an UNCHOKE message for the peer listening on `mailbox`.
// mailbox: destination mailbox; it shadows the field, so this.mailbox below is our own mailbox.
// The statement that dispatches the task is not visible in this excerpt.
644 private void sendUnchoked(String mailbox) {
645 Msg.debug("Sending a UNCHOKE to " + mailbox);
646 MessageTask task = new MessageTask(MessageTask.Type.UNCHOKE, hostname, this.mailbox, this.id);
650 * Send a "HAVE" message to all peers we are connected to
// Announces to every connection in `peers` that we now own a piece.
// piece: index of the piece, carried in each HAVE message.
652 private void sendHave(int piece) {
653 Msg.debug("Sending HAVE message to all my peers");
654 for (Connection remotePeer : peers.values()) {
// A separate task is built for each recipient.
655 MessageTask task = new MessageTask(MessageTask.Type.HAVE, hostname, this.mailbox, this.id, piece);
656 task.dsend(remotePeer.mailbox);
660 * Send a bitfield message to a peer.
661 * @param mailbox destination mailbox
// Logs and builds a BITFIELD message for the peer listening on `mailbox`.
// mailbox: destination mailbox; it shadows the field, so this.mailbox below is our own mailbox.
// The bitfield array is handed to the MessageTask constructor as-is (no copy is made here).
// The statement that dispatches the task is not visible in this excerpt.
663 private void sendBitfield(String mailbox) {
664 Msg.debug("Sending a BITFIELD to " + mailbox);
665 MessageTask task = new MessageTask(MessageTask.Type.BITFIELD, hostname, this.mailbox, this.id, this.bitfield);
669 * Send a "request" message to a peer, containing a request for a piece
// Logs and builds a REQUEST message asking for a run of blocks of one piece.
// mailbox: destination mailbox (shadows the field; this.mailbox below is our own mailbox).
// piece: piece index; blockIndex: first requested block; blockLength: number of blocks.
// The log prints blockIndex and blockIndex + blockLength, i.e. the end value is exclusive.
// The statement that dispatches the task is not visible in this excerpt.
671 private void sendRequest(String mailbox, int piece, int blockIndex, int blockLength) {
672 Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece + " and blocks " + blockIndex + "," + (blockIndex + blockLength));
673 MessageTask task = new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece, blockIndex, blockLength);
677 * Send a "piece" message to a peer, containing a piece of the file
// Logs and builds a PIECE message carrying a run of blocks of one piece.
// mailbox: destination mailbox (shadows the field; this.mailbox below is our own mailbox).
// piece: piece index; stalled: flag copied into the message (the receiver logs stalled pieces);
// blockIndex: first block sent; blockLength: number of blocks.
// The statement that dispatches the task is not visible in this excerpt.
679 private void sendPiece(String mailbox, int piece, boolean stalled, int blockIndex, int blockLength) {
680 Msg.debug("Sending the PIECE " + piece + " to " + mailbox);
681 MessageTask task = new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled, blockIndex, blockLength);
685 private String getStatus() {
687 for (int i = 0; i < Common.FILE_PIECES; i++) {