1 /* Copyright (c) 2006-2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
9 import java.util.ArrayList;
10 import java.util.HashMap;
11 import java.util.Iterator;
12 import java.util.Map.Entry;
14 import org.simgrid.msg.Comm;
15 import org.simgrid.msg.Host;
16 import org.simgrid.msg.Msg;
17 import org.simgrid.msg.MsgException;
18 import org.simgrid.msg.Process;
19 import org.simgrid.msg.RngStream;
20 import org.simgrid.msg.Task;
23 * Main class for peers execution
25 public class Peer extends Process {
26 protected int round = 0;
28 protected double beginReceiveTime;
29 protected double deadline;
31 protected static RngStream stream = new RngStream();
34 protected String mailbox;
35 protected String mailboxTracker;
36 protected String hostname;
37 protected int pieces = 0;
38 protected char[] bitfield = new char[Common.FILE_PIECES];
39 protected char[][] bitfieldBlocks = new char[Common.FILE_PIECES][Common.PIECES_BLOCKS];
41 protected short[] piecesCount = new short[Common.FILE_PIECES];
43 protected int piecesRequested = 0;
45 protected ArrayList<Integer> currentPieces = new ArrayList<Integer>();
46 protected int currentPiece = -1;
48 protected HashMap<Integer, Connection> activePeers = new HashMap<Integer, Connection>();
49 protected HashMap<Integer, Connection> peers = new HashMap<Integer, Connection>();
51 protected Comm commReceived = null;
53 public Peer(Host host, String name, String[]args) {
54 super(host,name,args);
58 public void main(String[] args) throws MsgException {
60 if (args.length != 3 && args.length != 2) {
61 Msg.info("Wrong number of arguments");
63 if (args.length == 3) {
64 init(Integer.valueOf(args[0]),true);
67 init(Integer.valueOf(args[0]),false);
69 //Retrieve the deadline
70 deadline = Double.valueOf(args[1]);
72 Msg.info("Wrong deadline supplied");
75 Msg.info("Hi, I'm joining the network with id " + id);
76 //Getting peer data from the tracker
78 Msg.debug("Got " + peers.size() + " peers from the tracker");
79 Msg.debug("Here is my current status: " + getStatus());
80 beginReceiveTime = Msg.getClock();
82 pieces = Common.FILE_PIECES;
92 Msg.info("Couldn't contact the tracker.");
94 Msg.info("Here is my current status: " + getStatus());
97 * Peer main loop when it is leeching.
99 private void leechLoop() {
100 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
101 Msg.debug("Start downloading.");
103 * Send a "handshake" message to all the peers it got
104 * (it couldn't have gotten more than 50 peers anyway)
107 //Wait for at least one "bitfield" message.
109 Msg.debug("Starting main leech loop");
110 while (Msg.getClock() < deadline && pieces < Common.FILE_PIECES) {
111 if (commReceived == null) {
112 commReceived = Task.irecv(mailbox);
115 if (commReceived.test()) {
116 handleMessage(commReceived.getTask());
120 //If the user has a pending interesting
121 if (currentPiece != -1) {
122 sendInterestedToPeers();
125 if (currentPieces.size() < Common.MAX_PIECES) {
126 updateCurrentPiece();
129 //We don't execute the choke algorithm if we don't already have a piece
130 if (Msg.getClock() >= nextChokedUpdate && pieces > 0) {
132 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
139 catch (MsgException e) {
146 * Peer main loop when it is seeding
148 private void seedLoop() {
149 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
150 Msg.debug("Start seeding.");
151 //start the main seed loop
152 while (Msg.getClock() < deadline) {
153 if (commReceived == null) {
154 commReceived = Task.irecv(mailbox);
157 if (commReceived.test()) {
158 handleMessage(commReceived.getTask());
162 if (Msg.getClock() >= nextChokedUpdate) {
164 //TODO: Change the choked peer algorithm when seeding
165 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
172 catch (MsgException e) {
180 * Initialize the various peer data
181 * @param id id of the peer to take in the network
182 * @param seed indicates if the peer is a seed
184 private void init(int id, boolean seed) {
186 this.mailbox = Integer.toString(id);
187 this.mailboxTracker = "tracker_" + Integer.toString(id);
189 for (int i = 0; i < bitfield.length; i++) {
191 for (int j = 0; j < bitfieldBlocks[i].length; j++) {
192 bitfieldBlocks[i][j] = '1';
197 for (int i = 0; i < bitfield.length; i++) {
199 for (int j = 0; j < bitfieldBlocks[i].length; j++) {
200 bitfieldBlocks[i][j] = '0' ;
204 this.hostname = host.getName();
207 * Retrieves the peer list from the tracker
209 private boolean getPeersData() {
211 boolean success = false, sendSuccess = false;
212 double timeout = Msg.getClock() + Common.GET_PEERS_TIMEOUT;
213 //Build the task to send to the tracker
214 TrackerTask taskSend = new TrackerTask(hostname, mailboxTracker, id);
216 while (!sendSuccess && Msg.getClock() < timeout) {
218 Msg.debug("Sending a peer request to the tracker.");
219 taskSend.send(Common.TRACKER_MAILBOX,Common.GET_PEERS_TIMEOUT);
222 catch (MsgException e) {
226 while (!success && Msg.getClock() < timeout) {
227 commReceived = Task.irecv(this.mailboxTracker);
229 commReceived.waitCompletion(Common.GET_PEERS_TIMEOUT);
230 if (commReceived.getTask() instanceof TrackerTask) {
231 TrackerTask task = (TrackerTask)commReceived.getTask();
232 for (Integer peerId: task.peers) {
233 if (peerId != this.id) {
234 peers.put(peerId, new Connection(peerId));
240 catch (MsgException e) {
249 * Handle a received message sent by another peer
250 * @param task task received.
252 void handleMessage(Task task) {
253 MessageTask message = (MessageTask)task;
254 Connection remotePeer = peers.get(message.peerId);
255 switch (message.type) {
257 Msg.debug("Received a HANDSHAKE message from " + message.mailbox);
258 //Check if the peer is in our connection list
259 if (remotePeer == null) {
260 peers.put(message.peerId, new Connection(message.peerId));
261 sendHandshake(message.mailbox);
263 //Send our bitfield to the pair
264 sendBitfield(message.mailbox);
267 Msg.debug("Received a BITFIELD message from " + message.peerId + " (" + message.issuerHostname + ")");
268 //update the pieces list
269 updatePiecesCountFromBitfield(message.bitfield);
270 //Update the current piece
271 if (currentPiece == -1 && pieces < Common.FILE_PIECES && currentPieces.size() < Common.MAX_PIECES) {
272 updateCurrentPiece();
274 remotePeer.bitfield = message.bitfield.clone();
277 Msg.debug("Received an INTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
278 assert remotePeer != null;
279 remotePeer.interested = true;
282 Msg.debug("Received a NOTINTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
283 assert remotePeer != null;
284 remotePeer.interested = false;
287 Msg.debug("Received an UNCHOKE message from " + message.peerId + "(" + message.issuerHostname + ")");
288 assert remotePeer != null;
289 remotePeer.chokedDownload = false;
290 activePeers.put(remotePeer.id,remotePeer);
291 sendRequestsToPeer(remotePeer);
294 Msg.debug("Received a CHOKE message from " + message.peerId + " (" + message.issuerHostname + ")");
295 assert remotePeer != null;
296 remotePeer.chokedDownload = true;
297 activePeers.remove(remotePeer.id);
300 if (remotePeer.bitfield == null) {
303 Msg.debug("Received a HAVE message from " + message.peerId + " (" + message.issuerHostname + ")");
304 assert message.index >= 0 && message.index < Common.FILE_PIECES;
305 assert remotePeer.bitfield != null;
306 remotePeer.bitfield[message.index] = '1';
307 piecesCount[message.index]++;
308 //Send interested message to the peer if he has what we want
309 if (!remotePeer.amInterested && currentPieces.contains(message.index) ) {
310 remotePeer.amInterested = true;
311 sendInterested(remotePeer.mailbox);
314 if (currentPieces.contains(message.index)) {
315 int blockIndex = getFirstBlock(message.index);
316 int blockLength = Common.PIECES_BLOCKS - blockIndex ;
317 blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;
318 sendRequest(message.mailbox,message.index,blockIndex,blockLength);
322 assert message.index >= 0 && message.index < Common.FILE_PIECES;
323 if (!remotePeer.chokedUpload) {
324 Msg.debug("Received a REQUEST from " + message.peerId + "(" + message.issuerHostname + ") for " + message.peerId);
325 if (bitfield[message.index] == '1') {
326 sendPiece(message.mailbox,message.index,false,message.blockIndex,message.blockLength);
329 Msg.debug("Received a REQUEST from " + message.peerId + " (" + message.issuerHostname + ") but he is choked" );
334 if (message.stalled) {
335 Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ") is stalled");
338 Msg.debug("Received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
339 if (bitfield[message.index] == '0') {
340 updateBitfieldBlocks(message.index,message.blockIndex,message.blockLength);
341 if (pieceComplete(message.index)) {
343 //Removing the piece from our piece list.
344 if (!currentPieces.remove((Object)Integer.valueOf(message.index))) {
346 //Setting the fact that we have the piece
347 bitfield[message.index] = '1';
349 Msg.debug("My status is now " + getStatus());
350 //Sending the information to all the peers we are connected to
351 sendHave(message.index);
352 //sending UNINTERESTED to peers that doesn't have what we want.
353 updateInterestedAfterReceive();
357 Msg.debug("However, we already have it.");
362 if (remotePeer != null) {
363 remotePeer.addSpeedValue(1 / (Msg.getClock() - beginReceiveTime));
365 beginReceiveTime = Msg.getClock();
368 * Wait for the node to receive interesting bitfield messages (ie: non empty)
371 void waitForPieces() {
372 boolean finished = false;
373 while (Msg.getClock() < deadline && !finished) {
374 if (commReceived == null) {
375 commReceived = Task.irecv(mailbox);
378 commReceived.waitCompletion(Common.TIMEOUT_MESSAGE);
379 handleMessage(commReceived.getTask());
380 if (currentPiece != -1) {
385 catch (MsgException e) {
391 private boolean hasFinished() {
392 for (int i = 0; i < bitfield.length; i++) {
393 if (bitfield[i] == '1') {
400 * Updates the list of who has a piece from a bitfield
401 * @param bitfield bitfield
403 private void updatePiecesCountFromBitfield(char bitfield[]) {
404 for (int i = 0; i < Common.FILE_PIECES; i++) {
405 if (bitfield[i] == '1') {
411 * Update the piece the peer is currently interested in.
412 * There is two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
413 * If the peer has less than 3 pieces, he chooses a piece at random.
414 * If the peer has more than pieces, he downloads the pieces that are the less
417 void updateCurrentPiece() {
418 if (currentPieces.size() >= (Common.FILE_PIECES - pieces)) {
421 if (true || pieces < 3) {
422 int i = 0, peerPiece;
424 currentPiece = stream.randInt(0,Common.FILE_PIECES - 1);
426 } while (!(bitfield[currentPiece] == '0' && !currentPieces.contains(currentPiece)));
429 //trivial min algorithm.
432 currentPieces.add(currentPiece);
433 Msg.debug("New interested piece: " + currentPiece);
434 assert currentPiece >= 0 && currentPiece < Common.FILE_PIECES;
437 * Update the list of current choked and unchoked peers, using the
440 private void updateChokedPeers() {
441 round = (round + 1) % 3;
442 if (peers.size() == 0) {
445 //remove a peer from the list
446 Iterator<Entry<Integer, Connection>> it = activePeers.entrySet().iterator();
448 Entry<Integer,Connection> e = it.next();
449 Connection peerChoked = e.getValue();
450 peerChoked.chokedUpload = true;
451 sendChoked(peerChoked.mailbox);
452 activePeers.remove(e.getKey());
454 Connection peerChoosed = null;
455 //Separate the case from when the peer is seeding.
456 if (pieces == Common.FILE_PIECES) {
457 //Find the last unchoked peer.
458 double unchokeTime = deadline + 1;
459 for (Connection connection : peers.values()) {
460 if (connection.lastUnchoke < unchokeTime && connection.interested) {
461 peerChoosed = connection;
462 unchokeTime = connection.lastUnchoke;
467 //Random optimistic unchoking
472 int idChosen = stream.randInt(0,peers.size() - 1);
473 for (Connection connection : peers.values()) {
475 peerChoosed = connection;
479 } //TODO: Not really the best way ever
480 if (!peerChoosed.interested) {
484 } while (peerChoosed == null && j < Common.MAXIMUM_PEERS);
487 Connection fastest = null;
488 double fastestSpeed = 0;
489 for (Connection c : peers.values()) {
490 if (c.peerSpeed > fastestSpeed && c.interested && c.chokedUpload) {
492 fastestSpeed = c.peerSpeed;
495 peerChoosed = fastest;
498 if (peerChoosed != null) {
499 activePeers.put(peerChoosed.id,peerChoosed);
500 peerChoosed.chokedUpload = false;
501 peerChoosed.lastUnchoke = Msg.getClock();
502 sendUnchoked(peerChoosed.mailbox);
506 * Updates our "interested" state about peers: send "not interested" to peers
507 * that don't have any more pieces we want.
509 private void updateInterestedAfterReceive() {
511 for (Connection connection : peers.values()) {
513 if (connection.amInterested) {
514 for (Integer piece : currentPieces) {
515 if (connection.bitfield[piece] == '1') {
521 connection.amInterested = false;
522 sendNotInterested(connection.mailbox);
527 private void updateBitfieldBlocks(int index, int blockIndex, int blockLength) {
528 for (int i = blockIndex; i < (blockIndex + blockLength); i++) {
529 bitfieldBlocks[index][i] = '1';
533 * Returns if a piece is complete in the peer's bitfield.
534 * @param index the index of the piece.
536 private boolean pieceComplete(int index) {
537 for (int i = 0; i < bitfieldBlocks[index].length; i++) {
538 if (bitfieldBlocks[index][i] == '0') {
545 * Returns the first block of a piece that we don't have.
547 private int getFirstBlock(int piece) {
549 for (int i = 0; i < Common.PIECES_BLOCKS; i++) {
550 if (bitfieldBlocks[piece][i] == '0') {
558 * Send request messages to a peer that have unchoked us
559 * @param remotePeer peer data to the peer we want to send the request
561 private void sendRequestsToPeer(Connection remotePeer) {
562 if (remotePeer.bitfield == null) {
565 for (Integer piece : currentPieces) {
566 //Getting the block to send.
567 int blockIndex = -1, blockLength = 0;
568 blockIndex = getFirstBlock(piece);
569 blockLength = Common.PIECES_BLOCKS - blockIndex ;
570 blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;
571 if (remotePeer.bitfield[piece] == '1') {
572 sendRequest(remotePeer.mailbox, piece, blockIndex, blockLength);
577 * Find the peers that have the current interested piece and send them
578 * the "interested" message
580 private void sendInterestedToPeers() {
581 if (currentPiece == -1) {
584 for (Connection connection : peers.values()) {
585 if (connection.bitfield != null && connection.bitfield[currentPiece] == '1' && !connection.amInterested) {
586 connection.amInterested = true;
587 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
588 task.dsend(connection.mailbox);
595 * Send a "interested" message to a peer.
597 private void sendInterested(String mailbox) {
598 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
602 * Send a "not interested" message to a peer
603 * @param mailbox mailbox destination mailbox
605 private void sendNotInterested(String mailbox) {
606 MessageTask task = new MessageTask(MessageTask.Type.NOTINTERESTED, hostname, this.mailbox, this.id);
610 * Send a handshake message to all the peers the peer has.
611 * @param peer peer data
613 private void sendHandshakeAll() {
614 for (Connection remotePeer : peers.values()) {
615 MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, mailbox,
617 task.dsend(remotePeer.mailbox);
621 * Send a "handshake" message to an user
622 * @param mailbox mailbox where to we send the message
624 private void sendHandshake(String mailbox) {
625 Msg.debug("Sending a HANDSHAKE to " + mailbox);
626 MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, this.mailbox, this.id);
630 * Send a "choked" message to a peer
632 private void sendChoked(String mailbox) {
633 Msg.debug("Sending a CHOKE to " + mailbox);
634 MessageTask task = new MessageTask(MessageTask.Type.CHOKE, hostname, this.mailbox, this.id);
638 * Send a "unchoked" message to a peer
640 private void sendUnchoked(String mailbox) {
641 Msg.debug("Sending a UNCHOKE to " + mailbox);
642 MessageTask task = new MessageTask(MessageTask.Type.UNCHOKE, hostname, this.mailbox, this.id);
646 * Send a "HAVE" message to all peers we are connected to
648 private void sendHave(int piece) {
649 Msg.debug("Sending HAVE message to all my peers");
650 for (Connection remotePeer : peers.values()) {
651 MessageTask task = new MessageTask(MessageTask.Type.HAVE, hostname, this.mailbox, this.id, piece);
652 task.dsend(remotePeer.mailbox);
656 * Send a bitfield message to all the peers the peer has.
657 * @param peer peer data
659 private void sendBitfield(String mailbox) {
660 Msg.debug("Sending a BITFIELD to " + mailbox);
661 MessageTask task = new MessageTask(MessageTask.Type.BITFIELD, hostname, this.mailbox, this.id, this.bitfield);
665 * Send a "request" message to a pair, containing a request for a piece
667 private void sendRequest(String mailbox, int piece, int blockIndex, int blockLength) {
668 Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece + " and blocks " + blockIndex + "," + (blockIndex + blockLength));
669 MessageTask task = new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece, blockIndex, blockLength);
673 * Send a "piece" message to a pair, containing a piece of the file
675 private void sendPiece(String mailbox, int piece, boolean stalled, int blockIndex, int blockLength) {
676 Msg.debug("Sending the PIECE " + piece + " to " + mailbox);
677 MessageTask task = new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled, blockIndex, blockLength);
681 private String getStatus() {
683 for (int i = 0; i < Common.FILE_PIECES; i++) {