]> AND Public Git Repository - simgrid.git/blob - examples/bittorrent/Peer.java
Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add bittorrent example
[simgrid.git] / examples / bittorrent / Peer.java
1 package bittorrent;
2
3 import java.util.ArrayList;
4 import java.util.HashMap;
5 import java.util.Iterator;
6 import java.util.Map.Entry;
7
8 import org.simgrid.msg.Comm;
9 import org.simgrid.msg.Host;
10 import org.simgrid.msg.Msg;
11 import org.simgrid.msg.MsgException;
12 import org.simgrid.msg.Process;
13 import org.simgrid.msg.Task;
14
15 /**
16  * Main class for peers execution
17  */
18 public class Peer extends Process {
19         protected int round = 0;
20         
21         protected double deadline;
22         
23         protected int id;
24         protected String mailbox;
25         protected String mailboxTracker;
26         protected String hostname;
27         protected int pieces = 0;
28         protected char[] bitfield = new char[Common.FILE_PIECES];
29         protected short[] piecesCount = new short[Common.FILE_PIECES];
30         
31         protected int piecesRequested = 0;
32         
33         protected ArrayList<Integer> currentPieces = new ArrayList<Integer>();
34         protected int currentPiece = -1;
35
36         protected HashMap<Integer, Connection> activePeers = new HashMap<Integer, Connection>();        
37         protected HashMap<Integer, Connection> peers = new HashMap<Integer, Connection>();
38         
39         protected Comm commReceived = null;
40
41         public Peer(Host host, String name, String[]args) {
42                 super(host,name,args);
43         }       
44         
45         @Override
46         public void main(String[] args) throws MsgException {
47                 //Check arguments
48                 if (args.length != 3 && args.length != 2) {
49                         Msg.info("Wrong number of arguments");
50                 }
51                 if (args.length == 3) {
52                         init(Integer.valueOf(args[0]),true);
53                 }
54                 else {
55                         init(Integer.valueOf(args[0]),false);
56                 }
57                 //Retrieve the deadline
58                 deadline = Double.valueOf(args[1]);
59                 if (deadline < 0) {
60                         Msg.info("Wrong deadline supplied");
61                         return;
62                 }
63                 Msg.info("Hi, I'm joining the network with id " + id);
64                 //Getting peer data from the tracker
65                 if (getPeersData()) {
66                         Msg.debug("Got " + peers.size() + " peers from the tracker");
67                         Msg.debug("Here is my current status: " + getStatus());
68                         if (hasFinished()) {
69                                 pieces = Common.FILE_PIECES;
70                                 sendHandshakeAll();
71                                 seedLoop();
72                         }
73                         else {
74                                 leechLoop();
75                                 seedLoop();
76                         }
77                 }
78                 else {
79                         Msg.info("Couldn't contact the tracker.");
80                 }
81                 Msg.info("Here is my current status: " + getStatus());
82         }
83         /**
84          * Peer main loop when it is leeching.
85          */
86         private void leechLoop() {
87                 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
88                 Msg.debug("Start downloading.");
89                 /**
90                  * Send a "handshake" message to all the peers it got
91                  * (it couldn't have gotten more than 50 peers anyway)
92                  */
93                 sendHandshakeAll();
94                 //Wait for at least one "bitfield" message.
95                 waitForPieces();
96                 Msg.debug("Starting main leech loop");
97                 while (Msg.getClock() < deadline && pieces < Common.FILE_PIECES) {
98                         if (commReceived == null) {
99                                 commReceived = Task.irecv(mailbox);
100                         }
101                         try {
102                                 if (commReceived.test()) {
103                                         handleMessage(commReceived.getTask());
104                                         commReceived = null;
105                                 }
106                                 else {
107                                         //If the user has a pending interesting
108                                         if (currentPiece != -1) {
109                                                 sendInterestedToPeers();
110                                         }
111                                         else {
112                                                 if (currentPieces.size() < Common.MAX_PIECES) {
113                                                         updateCurrentPiece();
114                                                 }
115                                         }
116                                         //We don't execute the choke algorithm if we don't already have a piece
117                                         if (Msg.getClock() >= nextChokedUpdate && pieces > 0) {
118                                                 updateChokedPeers();
119                                                 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
120                                         }
121                                         else {
122                                                 waitFor(1);
123                                         }
124                                 }
125                         }
126                         catch (MsgException e) {
127                                 commReceived = null;                            
128                         }
129                 }
130         }
131         
132         /**
133          * Peer main loop when it is seeding
134          */
135         private void seedLoop() {
136                 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
137                 Msg.debug("Start seeding.");
138                 //start the main seed loop
139                 while (Msg.getClock() < deadline) {
140                         if (commReceived == null) {
141                                 commReceived = Task.irecv(mailbox);
142                         }
143                         try {
144                                 if (commReceived.test()) {
145                                         handleMessage(commReceived.getTask());
146                                         commReceived = null;
147                                 }
148                                 else {
149                                         if (Msg.getClock() >= nextChokedUpdate) {
150                                                 updateChokedPeers();
151                                                 //TODO: Change the choked peer algorithm when seeding
152                                                 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
153                                         }
154                                         else {
155                                                 waitFor(1);
156                                         }
157                                 }
158                         }
159                         catch (MsgException e) {
160                                 commReceived = null;                            
161                         }
162
163                 }
164         }
165         
166         /**
167          * Initialize the various peer data
168          * @param id id of the peer to take in the network
169          * @param seed indicates if the peer is a seed
170          */
171         private void init(int id, boolean seed) {
172                 this.id = id;
173                 this.mailbox = Integer.toString(id);
174                 this.mailboxTracker = "tracker_" + Integer.toString(id);
175                 if (seed) {
176                         for (int i = 0; i < bitfield.length; i++) {
177                                 bitfield[i] = '1';
178                         }
179                 }
180                 else {
181                         for (int i = 0; i < bitfield.length; i++) {
182                                 bitfield[i] = '0';
183                         }                       
184                 }
185                 this.hostname = host.getName();
186         }
187         /**
188          * Retrieves the peer list from the tracker
189          */
190         private boolean getPeersData() {
191                 
192                 boolean success = false, sendSuccess = false;
193                 double timeout = Msg.getClock() + Common.GET_PEERS_TIMEOUT;
194                 //Build the task to send to the tracker
195                 TrackerTask taskSend = new TrackerTask(hostname, mailboxTracker, id);
196                         
197                 while (!sendSuccess && Msg.getClock() < timeout) {
198                         try {
199                                 Msg.debug("Sending a peer request to the tracker.");
200                                 taskSend.send(Common.TRACKER_MAILBOX,Common.GET_PEERS_TIMEOUT);
201                                 sendSuccess = true;
202                         }
203                         catch (MsgException e) {
204                                 
205                         }
206                 }
207                 while (!success && Msg.getClock() < timeout) {
208                         commReceived = Task.irecv(this.mailboxTracker);
209                         try {
210                                 commReceived.waitCompletion(Common.GET_PEERS_TIMEOUT);
211                                 if (commReceived.getTask() instanceof TrackerTask) {
212                                         TrackerTask task = (TrackerTask)commReceived.getTask();
213                                         for (Integer peerId: task.peers) {
214                                                 if (peerId != this.id) {
215                                                         peers.put(peerId, new Connection(peerId));
216                                                 }       
217                                         }
218                                         success = true;
219                                 }
220                         }
221                         catch (MsgException e) {
222                                 
223                         }
224                         commReceived = null;
225                 }
226                 commReceived = null;
227                 return success;
228         }
229         /**
230          * Handle a received message sent by another peer
231          * @param task task received.
232          */
233         void handleMessage(Task task) {
234                 MessageTask message = (MessageTask)task;
235                 Connection remotePeer = peers.get(message.peerId);
236                 switch (message.type) {
237                         case HANDSHAKE:
238                                 Msg.debug("Received a HANDSHAKE message from " + message.mailbox);
239                                 //Check if the peer is in our connection list
240                                 if (remotePeer == null) {
241                                         peers.put(message.peerId, new Connection(message.peerId));
242                                         sendHandshake(message.mailbox);
243                                 }
244                                 //Send our bitfield to the pair
245                                 sendBitfield(message.mailbox);
246                         break;
247                         case BITFIELD:
248                                 Msg.debug("Received a BITFIELD message from " + message.peerId + " (" + message.issuerHostname + ")");
249                                 //update the pieces list
250                                 updatePiecesCountFromBitfield(message.bitfield);
251                                 //Update the current piece
252                                 if (currentPiece == -1 && pieces < Common.FILE_PIECES && currentPieces.size() < Common.MAX_PIECES) {
253                                         updateCurrentPiece();
254                                 }                               
255                                 remotePeer.bitfield  = message.bitfield.clone();
256                         break;
257                         case INTERESTED:
258                                 Msg.debug("Received an INTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
259                                 assert remotePeer != null;
260                                 remotePeer.interested = true;
261                         break;
262                         case NOTINTERESTED:
263                                 Msg.debug("Received a NOTINTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
264                                 assert remotePeer != null;
265                                 remotePeer.interested = false;
266                         break;
267                         case UNCHOKE:
268                                 Msg.debug("Received an UNCHOKE message from " + message.peerId + "(" + message.issuerHostname + ")");
269                                 assert remotePeer != null;
270                                 remotePeer.chokedDownload = false;
271                                 activePeers.put(remotePeer.id,remotePeer);
272                                 sendRequestsToPeer(remotePeer);
273                         break;
274                         case CHOKE:
275                                 Msg.debug("Received a CHOKE message from " + message.peerId + " (" + message.issuerHostname + ")");
276                                 assert remotePeer != null;
277                                 remotePeer.chokedDownload = true;
278                                 activePeers.remove(remotePeer.id);
279                         break;
280                         case HAVE:
281                                 if (remotePeer.bitfield == null) {
282                                         return;
283                                 }
284                                 Msg.debug("Received a HAVE message from " + message.peerId + " (" + message.issuerHostname + ")");
285                                 assert message.index >= 0 && message.index < Common.FILE_PIECES;
286                                 assert remotePeer.bitfield != null;
287                                 remotePeer.bitfield[message.index] = '1';
288                                 piecesCount[message.index]++; 
289                                 //Send interested message to the peer if he has what we want
290                                 if (!remotePeer.amInterested && currentPieces.contains(message.index) ) {
291                                         remotePeer.amInterested = true;
292                                         sendInterested(remotePeer.mailbox);
293                                 }
294                                 
295                                 if (currentPieces.contains(message.index)) {
296                                         sendRequest(message.mailbox,message.index);
297                                 }
298                         break;
299                         case REQUEST:
300                                 assert message.index >= 0 && message.index < Common.FILE_PIECES;
301                                 if (!remotePeer.chokedUpload) {
302                                         Msg.debug("Received a REQUEST from " + message.peerId + "(" + message.issuerHostname + ") for " + message.peerId);
303                                         if (bitfield[message.index] == '1') {
304                                                 sendPiece(message.mailbox,message.index,false); 
305                                         }
306                                         else {
307                                                 Msg.debug("Received a REQUEST from " + message.peerId + " (" + message.issuerHostname + ") but he is choked" );
308                                         }
309                                 }
310                         break;
311                         case PIECE:
312                                 if (message.stalled) {
313                                         Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
314                                 }
315                                 else {
316                                         Msg.debug("Received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
317                                         if (bitfield[message.index] == '0') {
318                                                 piecesRequested--;
319                                                 //Removing the piece from our piece list.
320                                                 //TODO: It can not work, I should test it
321                                                 if (!currentPieces.remove((Object)Integer.valueOf(message.index))) {
322                                                 }
323                                                 //Setting the fact that we have the piece
324                                                 bitfield[message.index] = '1';
325                                                 pieces++;
326                                                 Msg.debug("My status is now " + getStatus());
327                                                 //Sending the information to all the peers we are connected to
328                                                 sendHave(message.index);
329                                                 //sending UNINTERSTED to peers that doesn't have what we want.
330                                                 updateInterestedAfterReceive();
331                                         }
332                                         else {
333                                                 Msg.debug("However, we already have it.");
334                                         }
335                                 }
336                         break;
337                 }
338         }
339         /**
340          * Wait for the node to receive interesting bitfield messages (ie: non empty)
341          * to be received
342          */
343         void waitForPieces() {
344                 boolean finished = false;
345                 while (Msg.getClock() < deadline && !finished) {
346                         if (commReceived == null) {
347                                 commReceived = Task.irecv(mailbox);
348                         }
349                         try {
350                                 commReceived.waitCompletion(Common.TIMEOUT_MESSAGE);
351                                 handleMessage(commReceived.getTask());
352                                 if (currentPiece != -1) {
353                                         finished = true;
354                                 }
355                                 commReceived = null;
356                         }
357                         catch (MsgException e) {
358                                 commReceived = null;
359                         }
360                 }
361         }
362         
363         private boolean hasFinished() {
364                 for (int i = 0; i < bitfield.length; i++) {
365                         if (bitfield[i] == '1') {
366                                 return true;
367                         }
368                 }
369                 return false;
370         }
371         /**
372          * Updates the list of who has a piece from a bitfield
373          * @param bitfield bitfield
374          */
375         private void updatePiecesCountFromBitfield(char bitfield[]) {
376                 for (int i = 0; i < Common.FILE_PIECES; i++) {
377                         if (bitfield[i] == '1') {
378                                 piecesCount[i]++;
379                         }
380                 }
381         }
382         /**
383          * Update the piece the peer is currently interested in.
384          * There is two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
385          * If the peer has less than 3 pieces, he chooses a piece at random.
386          * If the peer has more than pieces, he downloads the pieces that are the less
387          * replicated
388          */
389         void updateCurrentPiece() {
390                 if (currentPieces.size() >= (Common.FILE_PIECES - pieces)) {
391                         return;
392                 }
393                 if (true || pieces < 3) {
394                         int i = 0, peerPiece;
395                         do {
396                                 currentPiece = ((int)Msg.getClock() + id + i) % Common.FILE_PIECES;
397                                 i++;
398                         } while (!(bitfield[currentPiece] == '0' && !currentPieces.contains(currentPiece)));
399                 }
400                 else {
401                         //trivial min algorithm.
402                         //TODO
403                 }
404                 currentPieces.add(currentPiece);
405                 Msg.debug("New interested piece: " + currentPiece);
406                 assert currentPiece >= 0 && currentPiece < Common.FILE_PIECES;
407         }
408         /**
409          * Update the list of current choked and unchoked peers, using the
410          * choke algorithm
411          */
412         private void updateChokedPeers() {
413                 round = (round + 1) % 3;
414                 if (peers.size() == 0) {
415                         return;
416                 }
417                 //remove a peer from the list
418                 Iterator<Entry<Integer, Connection>> it = activePeers.entrySet().iterator();
419                 if (it.hasNext()) {
420                         Entry<Integer,Connection> e = it.next();
421                         Connection peerChoked = e.getValue();
422                         sendChoked(peerChoked.mailbox);
423                         peerChoked.chokedUpload = true;
424                         activePeers.remove(e.getKey());
425                 }
426                 //Random optimistic unchoking
427                 if (round == 0 || true) {
428                         int j = 0, i;
429                         Connection peerChoosed = null;
430                         do {
431                                 i = 0;
432                                 int idChosen = ((int)Msg.getClock() + j) % peers.size();
433                                 for (Connection connection : peers.values()) {
434                                         if (i == idChosen) {
435                                                 peerChoosed = connection;
436                                                 break;
437                                         }
438                                         i++;
439                                 } //TODO: Not really the best way ever
440                                 if (!peerChoosed.interested) {
441                                         peerChoosed = null;
442                                 }
443                                 j++;
444                         } while (peerChoosed == null && j < Common.MAXIMUM_PAIRS);
445                         if (peerChoosed != null) {
446                                 activePeers.put(peerChoosed.id,peerChoosed);
447                                 peerChoosed.chokedUpload = false;
448                                 sendUnchoked(peerChoosed.mailbox);
449                         }
450                 }
451         }
452         /**     
453          * Updates our "interested" state about peers: send "not interested" to peers
454          * that don't have any more pieces we want.
455          */
456         private void updateInterestedAfterReceive() {
457                 boolean interested;
458                 for (Connection connection : peers.values()) {
459                         interested = false;
460                         if (connection.amInterested) {
461                                 for (Integer piece : currentPieces) {
462                                         if (connection.bitfield[piece] == '1') {
463                                                 interested = true;
464                                                 break;
465                                         }
466                                 }       
467                                 if (!interested) {
468                                         connection.amInterested = false;
469                                         sendNotInterested(connection.mailbox);
470                                 }
471                         }
472                 }
473         }
474         /**
475          * Send request messages to a peer that have unchoked us
476          * @param remotePeer peer data to the peer we want to send the request
477          */
478         private void sendRequestsToPeer(Connection remotePeer) {
479                 for (Integer piece : currentPieces) {
480                         if (remotePeer.bitfield != null && remotePeer.bitfield[piece] == '1') {
481                                 sendRequest(remotePeer.mailbox, piece);
482                         }                       
483                 }
484         }       
485         /**
486          * Find the peers that have the current interested piece and send them
487          * the "interested" message
488          */
489         private void sendInterestedToPeers() {
490                 if (currentPiece == -1) {
491                         return;
492                 }
493                 for (Connection connection : peers.values()) {
494                         if (connection.bitfield != null && connection.bitfield[currentPiece] == '1' && !connection.amInterested) {
495                                 connection.amInterested = true;                         
496                                 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
497                                 task.dsend(connection.mailbox);                         
498                         }
499                 }
500                 currentPiece = -1;
501                 piecesRequested++;
502         }
503         /**
504          * Send a "interested" message to a peer.
505          */
506         private void sendInterested(String mailbox) {
507                 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
508                 task.dsend(mailbox);                                            
509         }
510         /**
511          * Send a "not interested" message to a peer
512          * @param mailbox mailbox destination mailbox
513          */
514         private void sendNotInterested(String mailbox) {
515                 MessageTask task = new MessageTask(MessageTask.Type.NOTINTERESTED, hostname, this.mailbox, this.id);
516                 task.dsend(mailbox);                            
517         }
518         /**
519          * Send a handshake message to all the peers the peer has.
520          * @param peer peer data
521          */
522         private void sendHandshakeAll() {
523                 for (Connection remotePeer : peers.values()) {
524                         MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, mailbox,
525                         id);
526                         task.dsend(remotePeer.mailbox);
527                 }
528         }
529         /**
530          * Send a "handshake" message to an user
531          * @param mailbox mailbox where to we send the message
532          */
533         private void sendHandshake(String mailbox) {
534                 Msg.debug("Sending a HANDSHAKE to " + mailbox);
535                 MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, this.mailbox, this.id);
536                 task.dsend(mailbox);            
537         }
538         /**
539          * Send a "choked" message to a peer
540          */
541         private void sendChoked(String mailbox) {
542                 Msg.debug("Sending a CHOKE to " + mailbox);
543                 MessageTask task = new MessageTask(MessageTask.Type.CHOKE, hostname, this.mailbox, this.id);
544                 task.dsend(mailbox);
545         }
546         /**
547          * Send a "unchoked" message to a peer
548          */
549         private void sendUnchoked(String mailbox) {
550                 Msg.debug("Sending a UNCHOKE to " + mailbox);
551                 MessageTask task = new MessageTask(MessageTask.Type.UNCHOKE, hostname, this.mailbox, this.id);
552                 task.dsend(mailbox);
553         }
554         /**
555          * Send a "HAVE" message to all peers we are connected to
556          */
557         private void sendHave(int piece) {
558                 Msg.debug("Sending HAVE message to all my peers");
559                 for (Connection remotePeer : peers.values()) {
560                         MessageTask task = new MessageTask(MessageTask.Type.HAVE, hostname, this.mailbox, this.id, piece);
561                         task.dsend(remotePeer.mailbox);
562                 }
563         }
564         /**
565          * Send a bitfield message to all the peers the peer has.
566          * @param peer peer data
567          */
568         private void sendBitfield(String mailbox) {
569                 Msg.debug("Sending a BITFIELD to " + mailbox);
570                 MessageTask task = new MessageTask(MessageTask.Type.BITFIELD, hostname, this.mailbox, this.id, this.bitfield);
571                 task.dsend(mailbox);
572         }
573         /**
574          * Send a "request" message to a pair, containing a request for a piece
575          */
576         private void sendRequest(String mailbox, int piece) {
577                 Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece);
578                 MessageTask task = new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece);
579                 task.dsend(mailbox);
580         }
581         /**
582          * Send a "piece" message to a pair, containing a piece of the file
583          */
584         private void sendPiece(String mailbox, int piece, boolean stalled) {
585                 Msg.debug("Sending the PIECE " + piece + " to " + mailbox);
586                 MessageTask task = new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled);
587                 task.dsend(mailbox);
588         }
589         
590         private String getStatus() {
591                 String s = "";
592                 for (int i = 0; i < Common.FILE_PIECES; i++) {
593                         s = s + bitfield[i];
594                 }
595                 return s;
596         }
597 }
598