3 import java.util.ArrayList;
4 import java.util.HashMap;
5 import java.util.Iterator;
6 import java.util.Map.Entry;
8 import org.simgrid.msg.Comm;
9 import org.simgrid.msg.Host;
10 import org.simgrid.msg.Msg;
11 import org.simgrid.msg.MsgException;
12 import org.simgrid.msg.Process;
13 import org.simgrid.msg.Task;
16 * Main class for peers execution
18 public class Peer extends Process {
// Counter advanced modulo 3 each time the choke update runs.
19 protected int round = 0;
// Clock value after which the main loops stop; parsed from the second process argument.
21 protected double deadline;
// Name of this peer's own mailbox (the decimal form of its id).
24 protected String mailbox;
// Name of the mailbox on which tracker answers are awaited ("tracker_" followed by the id).
25 protected String mailboxTracker;
// Host name obtained from host.getName() during init.
26 protected String hostname;
// Number of pieces held; compared against Common.FILE_PIECES to detect completion.
27 protected int pieces = 0;
// One char per piece: '1' marks a piece that is held, '0' one that is still missing.
28 protected char[] bitfield = new char[Common.FILE_PIECES];
// Per-piece counter, incremented when a peer announces that piece with a HAVE message.
29 protected short[] piecesCount = new short[Common.FILE_PIECES];
// NOTE(review): no use of this counter is visible in the surrounding code — confirm it is still needed.
31 protected int piecesRequested = 0;
// Indices of the pieces this peer is currently trying to obtain.
33 protected ArrayList<Integer> currentPieces = new ArrayList<Integer>();
// Most recently selected piece index, or -1 when no piece is selected.
34 protected int currentPiece = -1;
// Connections added on unchoke (sent or received) and removed on choke, keyed by peer id.
36 protected HashMap<Integer, Connection> activePeers = new HashMap<Integer, Connection>();
// Every known connection, keyed by peer id.
37 protected HashMap<Integer, Connection> peers = new HashMap<Integer, Connection>();
// Pending asynchronous receive; null means a new one has to be posted.
39 protected Comm commReceived = null;
// Constructor: forwards host, name and args unchanged to the Process superclass constructor.
41 public Peer(Host host, String name, String[]args) {
42 super(host,name,args);
// Process entry point. Accepts 2 or 3 arguments: args[0] is parsed as the peer id and
// args[1] as the deadline. With exactly 3 arguments the peer is initialised as a seed.
46 public void main(String[] args) throws MsgException {
// Any argument count other than 2 or 3 is reported as an error.
48 if (args.length != 3 && args.length != 2) {
49 Msg.info("Wrong number of arguments");
// Three arguments: init is called with seed == true.
51 if (args.length == 3) {
52 init(Integer.valueOf(args[0]),true);
// The other init call passes seed == false.
55 init(Integer.valueOf(args[0]),false);
57 //Retrieve the deadline
58 deadline = Double.valueOf(args[1]);
60 Msg.info("Wrong deadline supplied");
63 Msg.info("Hi, I'm joining the network with id " + id);
64 //Getting peer data from the tracker
66 Msg.debug("Got " + peers.size() + " peers from the tracker");
67 Msg.debug("Here is my current status: " + getStatus());
// The piece counter is set to the total number of file pieces.
// NOTE(review): the condition guarding this assignment is not shown next to it — confirm when it applies.
69 pieces = Common.FILE_PIECES;
79 Msg.info("Couldn't contact the tracker.");
81 Msg.info("Here is my current status: " + getStatus());
84 * Peer main loop when it is leeching.
// Leech loop: keeps receiving messages and selecting pieces while the clock is before
// the deadline and fewer than Common.FILE_PIECES pieces are held.
86 private void leechLoop() {
// Clock value at which the first choke update becomes due.
87 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
88 Msg.debug("Start downloading.");
90 * Send a "handshake" message to all the peers it got
91 * (it couldn't have gotten more than 50 peers anyway)
94 //Wait for at least one "bitfield" message.
96 Msg.debug("Starting main leech loop");
97 while (Msg.getClock() < deadline && pieces < Common.FILE_PIECES) {
// Post an asynchronous receive on our mailbox when none is pending.
98 if (commReceived == null) {
99 commReceived = Task.irecv(mailbox);
// When the pending receive reports completion, hand its task to handleMessage.
102 if (commReceived.test()) {
103 handleMessage(commReceived.getTask());
107 //If the peer currently has a selected piece, advertise interest to the peers that have it
108 if (currentPiece != -1) {
109 sendInterestedToPeers();
// Select another piece while fewer than Common.MAX_PIECES are being tracked.
112 if (currentPieces.size() < Common.MAX_PIECES) {
113 updateCurrentPiece();
116 //We don't execute the choke algorithm if we don't already have a piece
117 if (Msg.getClock() >= nextChokedUpdate && pieces > 0) {
// Schedule the next choke update one interval later.
119 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
126 catch (MsgException e) {
133 * Peer main loop when it is seeding
// Seed loop: keeps receiving and handling messages while the clock is before the deadline.
135 private void seedLoop() {
// Clock value at which the first choke update becomes due.
136 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
137 Msg.debug("Start seeding.");
138 //start the main seed loop
139 while (Msg.getClock() < deadline) {
// Post an asynchronous receive on our mailbox when none is pending.
140 if (commReceived == null) {
141 commReceived = Task.irecv(mailbox);
// When the pending receive reports completion, hand its task to handleMessage.
144 if (commReceived.test()) {
145 handleMessage(commReceived.getTask());
// Unlike the leech loop, the periodic update is not conditioned on the number of pieces held.
149 if (Msg.getClock() >= nextChokedUpdate) {
151 //TODO: Change the choked peer algorithm when seeding
// Schedule the next choke update one interval later.
152 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
159 catch (MsgException e) {
167 * Initialize the various peer data
168 * @param id id of the peer to take in the network
169 * @param seed indicates if the peer is a seed
// Sets up the mailbox names, the bitfield and the host name for this peer.
171 private void init(int id, boolean seed) {
// Own mailbox is named after the decimal id.
173 this.mailbox = Integer.toString(id);
// Tracker answers are awaited on a separate mailbox named "tracker_<id>".
174 this.mailboxTracker = "tracker_" + Integer.toString(id);
// Two passes over the bitfield follow.
// NOTE(review): presumably one fills it for a seed and the other for a leecher — confirm against the loop bodies.
176 for (int i = 0; i < bitfield.length; i++) {
181 for (int i = 0; i < bitfield.length; i++) {
185 this.hostname = host.getName();
188 * Retrieves the peer list from the tracker
// Asks the tracker for peers and stores the answer in `peers`.
// Both phases (sending the request, awaiting the answer) share one time budget of
// Common.GET_PEERS_TIMEOUT measured from the clock value at entry.
190 private boolean getPeersData() {
192 boolean success = false, sendSuccess = false;
193 double timeout = Msg.getClock() + Common.GET_PEERS_TIMEOUT;
194 //Build the task to send to the tracker
195 TrackerTask taskSend = new TrackerTask(hostname, mailboxTracker, id);
// Phase 1: retry the send until it succeeds or the time budget is exhausted.
197 while (!sendSuccess && Msg.getClock() < timeout) {
199 Msg.debug("Sending a peer request to the tracker.");
200 taskSend.send(Common.TRACKER_MAILBOX,Common.GET_PEERS_TIMEOUT);
203 catch (MsgException e) {
// Phase 2: wait for the answer on the tracker mailbox.
207 while (!success && Msg.getClock() < timeout) {
208 commReceived = Task.irecv(this.mailboxTracker);
210 commReceived.waitCompletion(Common.GET_PEERS_TIMEOUT);
// Only a TrackerTask is treated as a tracker answer.
211 if (commReceived.getTask() instanceof TrackerTask) {
212 TrackerTask task = (TrackerTask)commReceived.getTask();
// Register a Connection for every listed peer except ourselves.
213 for (Integer peerId: task.peers) {
214 if (peerId != this.id) {
215 peers.put(peerId, new Connection(peerId));
221 catch (MsgException e) {
230 * Handle a received message sent by another peer
231 * @param task task received.
// Dispatches one received message according to its type. The task is cast to MessageTask
// and the sender's Connection is looked up in `peers` by message.peerId (null when unknown).
// The `assert` statements below are only evaluated when assertions are enabled.
233 void handleMessage(Task task) {
234 MessageTask message = (MessageTask)task;
235 Connection remotePeer = peers.get(message.peerId);
236 switch (message.type) {
// --- HANDSHAKE ---
238 Msg.debug("Received a HANDSHAKE message from " + message.mailbox);
239 //Check if the peer is in our connection list
240 if (remotePeer == null) {
// Unknown sender: register a Connection for it and send a handshake back.
241 peers.put(message.peerId, new Connection(message.peerId));
242 sendHandshake(message.mailbox);
244 //Send our bitfield to the peer
245 sendBitfield(message.mailbox);
// --- BITFIELD ---
248 Msg.debug("Received a BITFIELD message from " + message.peerId + " (" + message.issuerHostname + ")");
249 //update the pieces list
250 updatePiecesCountFromBitfield(message.bitfield);
251 //Update the current piece
252 if (currentPiece == -1 && pieces < Common.FILE_PIECES && currentPieces.size() < Common.MAX_PIECES) {
253 updateCurrentPiece();
// Keep a private copy of the sender's bitfield.
// NOTE(review): no null check on remotePeer is visible before this dereference — confirm the sender is always known here.
255 remotePeer.bitfield = message.bitfield.clone();
// --- INTERESTED ---
258 Msg.debug("Received an INTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
259 assert remotePeer != null;
260 remotePeer.interested = true;
// --- NOTINTERESTED ---
263 Msg.debug("Received a NOTINTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
264 assert remotePeer != null;
265 remotePeer.interested = false;
// --- UNCHOKE: the sender now accepts our requests, so track it as active and request pieces from it ---
268 Msg.debug("Received an UNCHOKE message from " + message.peerId + "(" + message.issuerHostname + ")");
269 assert remotePeer != null;
270 remotePeer.chokedDownload = false;
271 activePeers.put(remotePeer.id,remotePeer);
272 sendRequestsToPeer(remotePeer);
// --- CHOKE: the sender no longer serves us, so drop it from the active set ---
275 Msg.debug("Received a CHOKE message from " + message.peerId + " (" + message.issuerHostname + ")");
276 assert remotePeer != null;
277 remotePeer.chokedDownload = true;
278 activePeers.remove(remotePeer.id);
// --- HAVE ---
281 if (remotePeer.bitfield == null) {
284 Msg.debug("Received a HAVE message from " + message.peerId + " (" + message.issuerHostname + ")");
285 assert message.index >= 0 && message.index < Common.FILE_PIECES;
286 assert remotePeer.bitfield != null;
// Record that the sender holds this piece and bump the per-piece counter.
287 remotePeer.bitfield[message.index] = '1';
288 piecesCount[message.index]++;
289 //Send interested message to the peer if he has what we want
290 if (!remotePeer.amInterested && currentPieces.contains(message.index) ) {
291 remotePeer.amInterested = true;
292 sendInterested(remotePeer.mailbox);
// The announced piece is one we are after: request it from the sender.
295 if (currentPieces.contains(message.index)) {
296 sendRequest(message.mailbox,message.index);
// --- REQUEST ---
300 assert message.index >= 0 && message.index < Common.FILE_PIECES;
301 if (!remotePeer.chokedUpload) {
// NOTE(review): the value logged after "for " is message.peerId; the piece index (message.index) looks intended — confirm.
302 Msg.debug("Received a REQUEST from " + message.peerId + "(" + message.issuerHostname + ") for " + message.peerId);
// Only serve pieces we actually hold.
303 if (bitfield[message.index] == '1') {
304 sendPiece(message.mailbox,message.index,false);
307 Msg.debug("Received a REQUEST from " + message.peerId + " (" + message.issuerHostname + ") but he is choked" );
// --- PIECE ---
312 if (message.stalled) {
313 Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
316 Msg.debug("Received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
// The piece is new to us only if its bitfield entry is still '0'.
317 if (bitfield[message.index] == '0') {
319 //Removing the piece from our piece list.
320 //TODO: It can not work, I should test it
// The (Object) cast selects List.remove(Object), i.e. removal by value rather than by position.
321 if (!currentPieces.remove((Object)Integer.valueOf(message.index))) {
323 //Setting the fact that we have the piece
324 bitfield[message.index] = '1';
326 Msg.debug("My status is now " + getStatus());
327 //Sending the information to all the peers we are connected to
328 sendHave(message.index);
329 //Sending NOTINTERESTED to peers that don't have what we want.
330 updateInterestedAfterReceive();
333 Msg.debug("However, we already have it.");
340 * Wait for the node to receive interesting bitfield messages (ie: non empty)
// Blocks on incoming messages, handling each one, while the clock is before the deadline
// and the `finished` flag is still false.
343 void waitForPieces() {
344 boolean finished = false;
345 while (Msg.getClock() < deadline && !finished) {
// Post an asynchronous receive on our mailbox when none is pending.
346 if (commReceived == null) {
347 commReceived = Task.irecv(mailbox);
// Wait for the receive with a Common.TIMEOUT_MESSAGE timeout, then handle the task.
350 commReceived.waitCompletion(Common.TIMEOUT_MESSAGE);
351 handleMessage(commReceived.getTask());
// A piece has been selected once currentPiece is no longer -1.
// NOTE(review): presumably this is what ends the wait — confirm against the branch body.
352 if (currentPiece != -1) {
357 catch (MsgException e) {
// Scans the local bitfield for entries equal to '1' (pieces that are held).
// NOTE(review): the test matches '1' rather than '0'; confirm that the returned value means
// "all pieces held" and not merely "at least one piece held".
363 private boolean hasFinished() {
364 for (int i = 0; i < bitfield.length; i++) {
365 if (bitfield[i] == '1') {
372 * Updates the list of who has a piece from a bitfield
373 * @param bitfield bitfield
// Walks the first Common.FILE_PIECES entries of the given bitfield and reacts to each '1'.
// NOTE(review): presumably piecesCount[i] is incremented for each such entry — confirm against the branch body.
// The parameter shadows the field of the same name; the field is not read in the lines shown.
375 private void updatePiecesCountFromBitfield(char bitfield[]) {
376 for (int i = 0; i < Common.FILE_PIECES; i++) {
377 if (bitfield[i] == '1') {
383 * Update the piece the peer is currently interested in.
384 * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole):
385 * If the peer has fewer than 3 pieces, it chooses a piece at random.
386 * If the peer has 3 or more pieces, it downloads the pieces that are the least
// Selects a new piece to download, stores it in currentPiece and appends it to currentPieces.
389 void updateCurrentPiece() {
// Guard: every piece still missing is already being tracked.
390 if (currentPieces.size() >= (Common.FILE_PIECES - pieces)) {
// NOTE(review): "true ||" makes this condition always hold, so the "pieces < 3" test has no effect.
393 if (true || pieces < 3) {
394 int i = 0, peerPiece;
// Candidate index derived from the clock, our id and the counter i, wrapped to the piece range.
396 currentPiece = ((int)Msg.getClock() + id + i) % Common.FILE_PIECES;
// Keep trying until the candidate is a piece we lack and are not already tracking.
398 } while (!(bitfield[currentPiece] == '0' && !currentPieces.contains(currentPiece)));
401 //trivial min algorithm.
404 currentPieces.add(currentPiece);
405 Msg.debug("New interested piece: " + currentPiece);
406 assert currentPiece >= 0 && currentPiece < Common.FILE_PIECES;
409 * Update the list of current choked and unchoked peers, using the
// Periodic choke update: advances the round counter, chokes an entry taken from activePeers,
// then tries to pick a peer from `peers` to unchoke.
412 private void updateChokedPeers() {
413 round = (round + 1) % 3;
// Special case: no peer is known.
414 if (peers.size() == 0) {
417 //remove a peer from the list
418 Iterator<Entry<Integer, Connection>> it = activePeers.entrySet().iterator();
// Take the next entry in the map's iteration order, notify it, flag it as choked for upload
// and drop it from the active set.
420 Entry<Integer,Connection> e = it.next();
421 Connection peerChoked = e.getValue();
422 sendChoked(peerChoked.mailbox);
423 peerChoked.chokedUpload = true;
424 activePeers.remove(e.getKey());
426 //Random optimistic unchoking
// NOTE(review): "|| true" makes this condition always hold, so the round value does not gate the unchoke.
427 if (round == 0 || true) {
429 Connection peerChoosed = null;
// Candidate position derived from the clock and the attempt counter j, wrapped to the number of known peers.
432 int idChosen = ((int)Msg.getClock() + j) % peers.size();
433 for (Connection connection : peers.values()) {
435 peerChoosed = connection;
439 } //TODO: Not really the best way ever
// The candidate's "interested" flag is examined.
// NOTE(review): presumably a non-interested candidate is discarded — confirm against the branch body.
440 if (!peerChoosed.interested) {
// Retry while no candidate is retained and j is below Common.MAXIMUM_PAIRS.
444 } while (peerChoosed == null && j < Common.MAXIMUM_PAIRS);
445 if (peerChoosed != null) {
// Track the chosen peer as active, allow uploads to it and notify it.
446 activePeers.put(peerChoosed.id,peerChoosed);
447 peerChoosed.chokedUpload = false;
448 sendUnchoked(peerChoosed.mailbox);
453 * Updates our "interested" state about peers: send "not interested" to peers
454 * that don't have any more pieces we want.
// Re-evaluates, for every connection we are interested in, whether it holds any of the
// pieces in currentPieces; interest is cleared and a NOTINTERESTED message is sent.
// NOTE(review): presumably only when no wanted piece is found on that connection — confirm.
456 private void updateInterestedAfterReceive() {
458 for (Connection connection : peers.values()) {
460 if (connection.amInterested) {
// Look for a wanted piece in the connection's bitfield.
461 for (Integer piece : currentPieces) {
462 if (connection.bitfield[piece] == '1') {
468 connection.amInterested = false;
469 sendNotInterested(connection.mailbox);
475 * Send request messages to a peer that has unchoked us
476 * @param remotePeer peer data to the peer we want to send the request
// For each piece in currentPieces, sends a request to remotePeer when its bitfield is
// known (non-null) and marks that piece with '1'.
478 private void sendRequestsToPeer(Connection remotePeer) {
479 for (Integer piece : currentPieces) {
480 if (remotePeer.bitfield != null && remotePeer.bitfield[piece] == '1') {
481 sendRequest(remotePeer.mailbox, piece);
486 * Find the peers that have the current interested piece and send them
487 * the "interested" message
// Sends an INTERESTED message to every known connection that holds currentPiece and
// that we have not yet flagged as interesting.
489 private void sendInterestedToPeers() {
// Guard: no piece is currently selected.
490 if (currentPiece == -1) {
493 for (Connection connection : peers.values()) {
// Skip connections whose bitfield is unknown, that lack the piece, or that are already flagged.
494 if (connection.bitfield != null && connection.bitfield[currentPiece] == '1' && !connection.amInterested) {
495 connection.amInterested = true;
496 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
497 task.dsend(connection.mailbox);
504 * Send an "interested" message to a peer.
// Builds an INTERESTED message carrying this peer's hostname, mailbox and id.
// The `mailbox` parameter names the destination; `this.mailbox` is our own mailbox.
506 private void sendInterested(String mailbox) {
507 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
511 * Send a "not interested" message to a peer
512 * @param mailbox destination mailbox
// Builds a NOTINTERESTED message carrying this peer's hostname, mailbox and id.
// The `mailbox` parameter names the destination; `this.mailbox` is our own mailbox.
514 private void sendNotInterested(String mailbox) {
515 MessageTask task = new MessageTask(MessageTask.Type.NOTINTERESTED, hostname, this.mailbox, this.id);
519 * Send a handshake message to all the peers the peer has.
520 * @param peer peer data
// Iterates over every known connection, building one HANDSHAKE message per peer and
// posting it to that peer's mailbox with dsend. Takes no parameters.
522 private void sendHandshakeAll() {
523 for (Connection remotePeer : peers.values()) {
524 MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, mailbox,
526 task.dsend(remotePeer.mailbox);
530 * Send a "handshake" message to a peer
531 * @param mailbox mailbox to which the message is sent
// Logs the destination and builds a HANDSHAKE message carrying this peer's hostname, mailbox and id.
// The `mailbox` parameter names the destination; `this.mailbox` is our own mailbox.
533 private void sendHandshake(String mailbox) {
534 Msg.debug("Sending a HANDSHAKE to " + mailbox);
535 MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, this.mailbox, this.id);
539 * Send a "choked" message to a peer
// Logs the destination and builds a CHOKE message carrying this peer's hostname, mailbox and id.
// The `mailbox` parameter names the destination; `this.mailbox` is our own mailbox.
541 private void sendChoked(String mailbox) {
542 Msg.debug("Sending a CHOKE to " + mailbox);
543 MessageTask task = new MessageTask(MessageTask.Type.CHOKE, hostname, this.mailbox, this.id);
547 * Send an "unchoked" message to a peer
// Logs the destination and builds an UNCHOKE message carrying this peer's hostname, mailbox and id.
// The `mailbox` parameter names the destination; `this.mailbox` is our own mailbox.
549 private void sendUnchoked(String mailbox) {
550 Msg.debug("Sending a UNCHOKE to " + mailbox);
551 MessageTask task = new MessageTask(MessageTask.Type.UNCHOKE, hostname, this.mailbox, this.id);
555 * Send a "HAVE" message to all peers we are connected to
// Announces the given piece index to every known connection: one HAVE message per peer,
// posted to that peer's mailbox with dsend.
557 private void sendHave(int piece) {
558 Msg.debug("Sending HAVE message to all my peers");
559 for (Connection remotePeer : peers.values()) {
560 MessageTask task = new MessageTask(MessageTask.Type.HAVE, hostname, this.mailbox, this.id, piece);
561 task.dsend(remotePeer.mailbox);
565 * Send a "bitfield" message to a peer.
566 * @param mailbox destination mailbox
// Logs the destination and builds a BITFIELD message carrying this peer's hostname, mailbox,
// id and its bitfield array. The array reference itself is passed to the MessageTask constructor.
568 private void sendBitfield(String mailbox) {
569 Msg.debug("Sending a BITFIELD to " + mailbox);
570 MessageTask task = new MessageTask(MessageTask.Type.BITFIELD, hostname, this.mailbox, this.id, this.bitfield);
574 * Send a "request" message to a peer, containing a request for a piece
// Logs the destination and piece, then builds a REQUEST message carrying this peer's
// hostname, mailbox, id and the requested piece index.
576 private void sendRequest(String mailbox, int piece) {
577 Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece);
578 MessageTask task = new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece);
582 * Send a "piece" message to a peer, containing a piece of the file
// Logs the piece and destination, then builds a PIECE message carrying this peer's
// hostname, mailbox, id, the piece index and the `stalled` flag.
584 private void sendPiece(String mailbox, int piece, boolean stalled) {
585 Msg.debug("Sending the PIECE " + piece + " to " + mailbox);
586 MessageTask task = new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled);
590 private String getStatus() {
592 for (int i = 0; i < Common.FILE_PIECES; i++) {