3 import java.rmi.Naming;
4 import java.util.Calendar;
5 import java.util.GregorianCalendar;
6 import java.util.Vector;
8 public class JaceSpawner {
// Application task object; the constructors instantiate it from the class named by appliName.
11 private Task tache = null;
// Shared instance, exposed as a public static field.
12 public static JaceSpawner Instance;
// Address of the super node, used to build the RMI lookup URL in connectSuperNode().
13 private static String superNode_IP = null;
// Port of the super node, used in the same RMI lookup URL.
14 private int superNode_port = 1098;
// Port on which exportObject() creates the local RMI registry.
15 private static int spawnerPort = 1099;
// Stub of the super node: looked up in connectSuperNode() or received by the second constructor.
16 private static JaceSuperNodeInterface centralServer = null;
// Stub of the local JaceSpawnerServer, looked up in exportObject().
17 private JaceSpawnerInterface spawnerRef = null;
// Name of the application class to load.
19 private String appliName;
// Parameters handed to the application task through setParam().
20 private String[] params = null;
21 @SuppressWarnings("unused")
22 private String protocol;
23 // private int registerVersion=0;
// Multiplier applied to heartTime to obtain timeBeforeKill (see connectSuperNode()).
24 final int NB_HEART_DECONNECT = 3;
25 private int heartTime; // heartbeat period; passed to Thread.sleep() in startScanning()
26 @SuppressWarnings("unused")
27 private int timeBeforeKill; // wait for 3 unanswered heartbeats before
28 // considering the Daemon as dead
29 private boolean broadcasting = false;
30 @SuppressWarnings("unused")
// Sizing values set by both constructors; used to map a task rank to a spawner
// and to size the KillThread batches in scanAppliNodes().
32 private static int nbOfDaemonsPerSpawner;
33 private static int nbOfDeamonsPerThread;
// Holds Node objects at first; each entry is later replaced by a JaceSpawnerInterface stub.
34 private Vector<Object> spawnersList;
// Forwarded to Register.setNumBackupNeighbors() in getRegisterOnSuperNode().
36 private int nbSavingNodes;
38 // Variables for Mapping
// Mapping parameter forwarded to the super node when the register is requested.
40 private double paramAlgo ;
// Mapping identifier passed to getNewNode()/removeAlgo(); built as "IP:port" in getRegisterOnSuperNode().
41 private String idAlgo ;
// Builds a spawner from launcher arguments.
// Stores the mapping parameter, the super-node address, the protocol name and
// the sizing values, then reads args: args[0] is parsed as the number of
// tasks, args[1] is the name of the application class, and any further
// entries are copied into params. Finally the application class is loaded
// and instantiated as the Task kept in tache, which receives the parameters
// and the number of tasks.
43 public JaceSpawner(String superNode, int port, String comProtocol,
44 String[] args, int nbDaemonPerSpawner, int nbDaemonPerThread,
45 int nbSavingNodes, int _algo, double _paramAlgo) {
46 // superNode_IP = LocalHost.Instance().resolve(superNode);
48 paramAlgo = _paramAlgo ;
50 superNode_IP = superNode;
52 protocol = comProtocol;
53 nbOfDaemonsPerSpawner = nbDaemonPerSpawner;
54 nbOfDeamonsPerThread = nbDaemonPerThread;
55 this.nbSavingNodes = nbSavingNodes;
58 // if less than 2 params (nb of tasks and name of the appli), error
59 System.err.println( "Parameters error !" ) ;
// NOTE(review): new Integer(String) is deprecated since Java 9; Integer.parseInt(args[0]) is the usual form.
63 nbTasks = new Integer(args[0]).intValue(); // nb of tasks
66 } catch (Exception e) {
67 System.err.println("Number format exception :" + e ) ;
70 appliName = args[1]; // name of the class to launch
71 if (args.length > 2) { // get the optional parameters of the appli
72 params = new String[args.length - 2];
73 for (int i = 0; i < params.length; i++) {
74 params[i] = args[2 + i];
// load the application class by name and create the task object from it
78 c = load.load(appliName);
80 tache = ((Task) c.newInstance());
81 tache.setParam(params);
82 tache.setJaceSize(nbTasks);
86 } catch (Exception e) {
87 System.err.println( "Unable to instantiate the class " + e ) ;
// Builds a spawner from state supplied by the caller: application parameters
// and name, a Register to adopt, the task count, the super-node stub, the
// heartbeat period and the two disconnection counters.
// It copies params, installs snodeStub as the super-node stub (in LocalHost
// and in centralServer), replaces the local Register content with reg, loads
// and instantiates the application class, starts the RunningApplication
// chronometer, records the counters and calls broadcastRegister(1).
// NOTE(review): the argument list matches the transformIntoSpawner(...) call
// made in TransformThread, so this is presumably the receiving side of that
// call — confirm.
97 public JaceSpawner(String[] params, String appliName, Register reg,
98 int nbTasks, JaceSuperNodeInterface snodeStub, int rank,
99 int heartTime, int tag, int nbdc, int nbsdc,
100 int nbDaemonPerSpawner, int nbDaemonPerThread, String _idAlgo) {
102 nbOfDaemonsPerSpawner = nbDaemonPerSpawner;
103 nbOfDeamonsPerThread = nbDaemonPerThread;
// copy the caller's parameter array element by element
104 if (params.length != 0) {
105 this.params = new String[params.length];
106 for (int i = 0; i < params.length; i++)
107 this.params[i] = params[i];
110 System.err.println( "There is no parameter !" ) ;
112 } catch (Exception e) {
113 System.err.println("Error in copying the parameters: " + e ) ;
117 // System.out.println("xxxxxxxxxxxxxxx reg size="+reg.getSize()+" xxxxxxxxxxxxxx");
118 this.appliName = appliName;
120 this.nbTasks = nbTasks;
121 this.heartTime = heartTime;
// the super-node stub is kept both in LocalHost and in the static centralServer field
122 LocalHost.Instance().setSuperNodeStub(snodeStub);
123 centralServer = snodeStub;
// adopt the register received from the caller
125 Register.Instance().replaceBy(reg);
126 Register.Instance().setSpawnerStub(this.spawnerRef);
127 Register.Instance().getListeOfTasks().viewAll();
// load the application class by name and create the task object from it
131 c = load.load(appliName);
133 tache = ((Task) c.newInstance());
134 tache.setParam(params);
135 tache.setJaceSize(nbTasks);
136 // ****************//
138 } catch (Exception e) {
139 System.err.println("Unable to instantiate the class " + e);
// initialise the running-application state with the values handed over
141 RunningApplication.Instance().getChrono().start();
143 RunningApplication.Instance().setName(appliName);
144 RunningApplication.Instance().setNbTasks(nbTasks);
145 RunningApplication.Instance().setRunning(true);
146 RunningApplication.Instance().setNumberOfDisconnections(nbdc);
147 RunningApplication.Instance().setNumberOfSpawnerDisconnections(nbsdc);
148 // System.out.println("+++++++++++++++++++++++++");
151 broadcastRegister(1);
154 * x=Register.Instance().getListeOfTasks().getSize()/nbOfDaemonsPerSpawner
155 * ; int s; if(rank==x)
156 * s=(reg.getListeOfTasks().getSize()%nbOfDaemonsPerSpawner
157 * )/nbOfDeamonsPerThread; else
158 * s=nbOfDaemonsPerSpawner/nbOfDeamonsPerThread;
160 * int debut=nbOfDaemonsPerSpawnerrank;
163 * for(int i=0;i<s+1;i++){
165 * new BroadcastSpawner(i,
166 * debut,nbOfDaemonsPerSpawner,nbOfDeamonsPerThread).start(); }
170 System.out.println("########################");
// Static, synchronized accessor for the shared JaceSpawner object.
// NOTE(review): the body is not visible here; presumably it returns the static Instance field — confirm.
173 public synchronized static JaceSpawner Instance() {
// Returns the current value of the static nbOfDeamonsPerThread field.
177 public int getNbOfDeamonsPerThread() {
178 return nbOfDeamonsPerThread;
// Returns the current value of the static nbOfDaemonsPerSpawner field.
181 public int getNbOfDeamonsPerSpawner() {
182 return nbOfDaemonsPerSpawner;
// Starts the monitoring threads of this spawner within the list of spawners.
// Stores the list, looks up this spawner's own position in it, points
// HeartBeatSpawner at the next entry and ScanThreadSpawner at the previous
// one, starts both with the heartbeat period, then starts a StartScanning
// thread. An error is printed when this spawner is not found in the list.
185 public void startProcess(Vector<Object> spawnersList) {
186 this.spawnersList = spawnersList;
// position of this spawner in the list
188 int is = spawnersList.indexOf((Object) Register.Instance()
// the last entry has no successor at is + 1; the value used in that case is set on a line not shown here
193 if (is == spawnersList.size() - 1)
196 nextNeighbour = is + 1;
198 * while((spawnersList.elementAt(nextNeighbour) instanceof Node))
200 * System.out.println("waiting till transform of spawner "+nextNeighbour
201 * +" is finished, for setServer"); Thread.sleep(20);
202 * }catch(Exception e1){}
// heartbeats go to the next spawner of the list
204 HeartBeatSpawner.Instance().setServer(
205 (JaceSpawnerInterface) spawnersList.get(nextNeighbour));
206 HeartBeatSpawner.Instance().setHeartTime(heartTime);
207 HeartBeatSpawner.Instance().start();
208 int previousNeighbour;
// the previous neighbour is either the last entry or is - 1 (the selecting condition is not shown here)
210 previousNeighbour = spawnersList.size() - 1;
212 previousNeighbour = is - 1;
213 ScanThreadSpawner.Instance().setHeartTime(heartTime);
214 ScanThreadSpawner.Instance().setServer(
215 (JaceSpawnerInterface) spawnersList.get(previousNeighbour));
216 ScanThreadSpawner.Instance().start();
219 // System.out.println("apres broadcastScanning");
220 new StartScanning().start();
222 System.err.println("Cannot find myself in the spawnersList !");
// Setter taking the new value of the broadcasting flag.
// NOTE(review): the assignment is not visible here; presumably broadcasting = bool — confirm.
227 public void setBroadcasting(boolean bool) {
// Start-up sequence of the spawner: the visible steps request a Register
// from the super node and then build the network of spawners.
231 public void initialize() {
232 // if(protocol.equals("rmi")){
233 // launch the JaceSpawnerServer
238 // get a Register on the Super Node
239 // completed with the required number of Daemons
240 getRegisterOnSuperNode();
243 createSpawnerNetwork();
// Scanning loop: prints the start time (chronometer value divided by 1000),
// then, as long as RunningApplication reports the application as running,
// sleeps heartTime between iterations. After the loop it tests whether the
// local JaceDaemon is still running (the action taken is not shown here).
248 public void startScanning() {
250 long time = RunningApplication.Instance().getChrono().getValue() / 1000;
251 System.out.println("Start scanning at time: " + time + "s");
252 // start the timer that manages the heartbeats
253 while (RunningApplication.Instance().isRunning() == true) {
254 // step 1: every "heartTime" milliseconds, check whether the registered
255 // nodes are still alive
256 // res = scanConnectedHosts();
258 // step 2: to keep or not? check whether the application is
259 // waiting for a node, so as to assign it a new one
262 Thread.sleep(heartTime);
263 } catch (Exception e) {
266 // /System.out.println("is running = false");
267 if (!JaceDaemon.Instance().isRunning())
// Handles the loss of a daemon.
// Increments and logs the disconnection counter, locates the TaskId of the
// dead host (by stub, otherwise by rank), clears the host information of that
// task, removes the node from the Register, asks foundToReplaceThisNode(...)
// for a substitute, calls updateConcernedNodes(...), switches scanning on for
// the new node and forwards the replacement to every other spawner of
// spawnersList.
// host: stub of the daemon reported dead. rankOfDead: rank of its task, used
// when the stub is not found in the task list.
271 public synchronized void signalDeadNode(JaceInterface host, int rankOfDead) {
273 TaskId myTaskId = null;
277 RunningApplication.Instance().incrementNumberOfDisconnections();
279 time = RunningApplication.Instance().getChrono().getValue() / 1000;
280 nb = RunningApplication.Instance().getNumberOfDisconnections();
281 nbC = RunningApplication.Instance().getNumberOfCouilles();
282 System.out.println("At time = " + time + "s, NbDisconnection = "
283 + nb + ", NbProblem = " + nbC);
285 // !!! update the ListeTask in the Register
286 myTaskId = Register.Instance().getListeOfTasks()
287 .getTaskIdOfHostStub(host);
// no task is attached to this stub: fall back to the rank, and tell that host to stop
288 if (myTaskId == null) {
// NOTE(review): this line reads Register.Instance as a field while every other line calls Register.Instance() — confirm.
289 Register.Instance.getListeOfTasks().viewAll();
290 myTaskId = Register.Instance().getListeOfTasks()
291 .getTaskIdOfRank(rankOfDead);
292 JaceInterface deadStub = myTaskId.getHostStub();
293 deadStub.suicide("Not doing a good work");
295 myTaskId.setHostIP(null);
296 myTaskId.setHostName(null);
// the node must be looked up before the stub of the task is cleared on the next statement
297 Node noeud = Register.Instance().getNodeOfStub(
298 myTaskId.getHostStub());
299 myTaskId.setHostStub(null);
300 int rankDeaD = myTaskId.getRank();
302 String nomNoeud = noeud.getName();
303 // Register.Instance().removeNodeAt(i);
304 // Register.Instance().removeNode(host.getIP());
305 // System.out.println("fait le remove : taille = " +
306 // Register.Instance().getSize());
308 boolean b = Register.Instance().removeNodeOfName(noeud.getName());
311 System.out.println("Removing Node of rank "
312 + rankDeaD + " : size = "
313 + Register.Instance().getSize());
316 .println("Cannot remove the Node, it doesn't exist anymore: size = "
317 + Register.Instance().getSize());
320 Calendar cal = new GregorianCalendar();
321 System.out.println("At time=" + cal.get(Calendar.MINUTE) + ":"
322 + cal.get(Calendar.SECOND));
324 // find, IF POSSIBLE, another free node to replace this one for ...
327 /**** Sébastien Miquée **/
328 //Node tmpNode = foundToReplaceThisNode(rankDeaD, nomNoeud);
329 Node tmpNode = foundToReplaceThisNode(rankDeaD, nomNoeud, noeud);
331 // broadcastRegister(0);
332 updateConcernedNodes(rankDeaD, noeud, tmpNode);
335 System.out.println("Set scanning on %%%%%%");
336 tmpNode.getStub().setScanning(true);
337 } catch (Exception e) {
338 System.err.println("Unable to setScannig on for the new node: "
342 // Register.Instance().getListeOfTasks().viewAll();
// tell every spawner other than this one about the replacement
343 for (int z = 0; z < spawnersList.size(); z++)
344 if (!((JaceSpawnerInterface) spawnersList.get(z))
345 .equals(Register.Instance().getSpawnerStub()))
347 ((JaceSpawnerInterface) spawnersList.get(z))
348 .replaceDeamonBy(noeud, tmpNode, rankDeaD);
350 } catch (Exception e) {
352 .println("Unable to broadcast the modifications to all the spawners: "
355 } catch (Exception ee) {
356 System.err.println("Error in signalDeadNode() :" + ee);
360 // checks whether the nodes marked alive in the SuperNode's Register.Instance()
362 // returns 0 on error, 1 otherwise
364 * private synchronized int scanConnectedHosts() { long time = 0; Node host;
365 * Node tmpNode; long workerTime; long currentTime; int rank; int restempo;
366 * int nb = 0; int nbC = 0; boolean changed = false; int index=0; try{
367 * JaceSpawnerInterface spawnerStub=Register.Instance().getSpawnerStub();
368 * if(spawnerStub.getFinished()==true){
369 * System.out.println("nbre de taches="+Register.Instance().getSize());
370 * ListeTask t=Register.Instance().getListeOfTasks();
371 * for(index=z;index<t.getSize();index++){ TaskId recev = null;
372 * System.out.println("deleting Task************"+index);
374 * recev = t.get(index); JaceInterface stub=recev.getHostStub();
375 * spawnerStub.killApplication(stub); }
379 * } }catch(Exception e){
380 * System.out.println("w aiiiiiiiiiiiiiirrrr"+e+" index="+index); z=index;
383 * if (Register.Instance().getSize() == 0) {
384 * System.out.println("aucun noeuds a scanner");
385 * RunningApplication.Instance().purge(); System.exit(0);
392 // find a node on the super nodes
393 // in order to requisition it
395 /*** Sébastien Miquée ***/
// Asks the super node for a node able to take over the task of rank theRank.
// Loops on centralServer.getNewNode(idAlgo, _n); the visible retry path sleeps
// 1000 ms and prints a message. The node obtained is marked alive, tagged with
// the application name and added to the Register unless existNode(node)
// reports it is already there. The TaskId of theRank is then pointed at the
// new host, and updateHeart(...) is called with the new node's stub on the
// host of a neighbouring rank (theRank - 1, or the last index of the Register).
// nom: name of the node being replaced, used in the log message.
// _n: the node being replaced, forwarded to the super node.
// NOTE(review): the return statement is not visible here; the caller uses the result as the replacement node.
397 //private synchronized Node foundToReplaceThisNode(int theRank, String nom) {
398 private synchronized Node foundToReplaceThisNode(int theRank, String nom, Node _n) {
400 boolean found = false ;
403 while( found == false ) {
406 //node = centralServer.getNewNode(LocalHost.Instance().getIP());
407 node = centralServer.getNewNode( idAlgo, _n);
413 Thread.sleep( 1000 ) ;
414 System.out.println("Pas de bon retour !");
416 } catch (Exception e) {
417 // find another superNode and ask it for the node
419 System.err.println("Cannot localize SuperNode ! " + e);
427 System.out.println("Using Node " + node.getName() + " ("
428 + node.getIP() + ") in order to replace " + nom
429 + " size before add: " + Register.Instance().getSize()
431 node.setAliveFlag(true);
434 // add the node to the Register
435 node.setAppliName(RunningApplication.Instance().getName());
437 // send it my stub so that it starts pinging me as soon as ...
439 // TODO move this into a thread ????
443 * neighborTask=Register.Instance().getListeOfTasks().getTaskIdOfRank
444 * ((theRank+1)%Register.Instance().getListeOfTasks().getSize());
445 * try{ node.getStub().updateHeart(neighborTask.getHostStub()); }
446 * catch(Exception e) {
447 * System.out.println("nvx noeud deja plu dispo2"); //node = null; }
449 // TODO check why the superNode hands this node back to me
450 // while it is already computing
451 // int is = Register.Instance().existNode(node.getIP());
452 int is = Register.Instance().existNode(node);
454 System.out.println("The Node is already in the register ! I don't add it.");
455 System.out.println("Node " + node.getName() + " not added !") ;
458 Register.Instance().addNode(node);
460 // !!! update the ListeTask
461 TaskId myTaskId = Register.Instance().getListeOfTasks()
462 .getTaskIdOfRank(theRank);
463 myTaskId.setHostIP(node.getIP());
464 myTaskId.setHostName(node.getName());
465 myTaskId.setHostStub(node.getStub());
467 // Register.Instance().getListeOfTasks().viewAll();
// neighbouring rank: the last index of the Register or theRank - 1 (the selecting condition is not shown here)
470 neighborRank = Register.Instance().getSize() - 1;
472 neighborRank = theRank - 1;
473 TaskId neighborTask2 = Register.Instance().getListeOfTasks()
474 .getTaskIdOfRank(neighborRank);
476 JaceInterface jaceStub = neighborTask2.getHostStub();
477 jaceStub.updateHeart(node.getStub());
478 } catch (Exception e) {
479 System.err.println("Next node unreachable ! " + e);
486 System.out.println("I didn't receive a new Node !");
// Replaces, in spawnersList, the entry equal to oldStub with stub.
// An error is printed when oldStub is not present in the list.
491 public void replaceBy(JaceSpawnerInterface oldStub,
492 JaceSpawnerInterface stub) {
493 int index = spawnersList.indexOf((Object) oldStub);
495 spawnersList.setElementAt(stub, index);
497 System.err.println("Spawner's stub not foud in spawnersList !");
// Replaces a dead spawner with a freshly requisitioned node.
// Requests a node from the super node, increments the spawner-disconnection
// counter, turns the node into a spawner through transformIntoSpawner(...),
// stores the new stub at the position previousSpawner had in spawnersList and
// starts a StartProcessThread for it. The other spawners are then told to
// swap previousSpawner for the new stub, ScanThreadSpawner is pointed at the
// new stub, and updateHeart(...) is called on the spawner preceding it in the
// list.
// previousSpawner: stub of the spawner being replaced.
500 public void getNewSpawner(JaceSpawnerInterface previousSpawner) {
501 //boolean found = false;
504 JaceSpawnerInterface spawnerStub = null;
506 // while (found == false) {
508 // TODO: find the error !!!
510 // "pas localise le super node java.lang.NullPointerException"
511 if (centralServer == null) {
512 System.err.println("Central Server not localized !");
514 node = centralServer.getNewNode( idAlgo, null ) ;
515 RunningApplication.Instance()
516 .incrementNumberOfSpawnerDisconnections();
518 } catch (Exception e) {
519 // find another superNode and ask it for the node
520 System.err.println("Super Node not localized !\n " + e);
521 // System.out.println("pas localise le super node " + e);
522 // System.out.println("pas localise le super node " + e);
523 // System.out.println("pas localise le super node " + e);
524 // System.out.println("pas localise le super node " + e);
525 // System.out.println("pas localise le super node " + e);
526 // System.out.println("pas localise le super node " + e);
527 // System.out.println("pas localise le super node " + e);
528 System.err.println("My IP : " + LocalHost.Instance().getIP());
529 if (centralServer == null) {
530 System.err.println("CentralServer is NULL !");
// position of the dead spawner; the new stub is stored at the same position
536 index = spawnersList.indexOf((Object) previousSpawner);
538 System.out.println("Using Node " + node.getName()
540 + LocalHost.Instance().resolve(node.getName())
541 + ") to replace a dead spawner\n\n");
543 // Register.Instance().viewAll();
544 // Register.Instance().getListeOfTasks().viewAll();
545 spawnerStub = node.getStub().transformIntoSpawner(
554 RunningApplication.Instance()
555 .getNumberOfDisconnections(),
556 RunningApplication.Instance()
557 .getNumberOfSpawnerDisconnections(),
558 nbOfDaemonsPerSpawner, nbOfDeamonsPerThread
560 spawnersList.setElementAt(spawnerStub, index);
562 // spawnerStub.setIdAlgo( idAlgo ) ;
565 new StartProcessThread(index).start();
566 // spawnerStub.startProcess( spawnersList);
567 } catch (Exception e) {
568 System.err.println("Unable to reach the new spawner: " + e);
// inform every spawner except this one and the new one
570 for (int j = 0; j < spawnersList.size(); j++)
572 if (!((JaceSpawnerInterface) spawnersList.get(j))
573 .equals(Register.Instance().getSpawnerStub())
574 && !((JaceSpawnerInterface) spawnersList.get(j))
575 .equals(spawnerStub)) {
577 .println("Trying to broadcast to spawner of rank "
580 ((JaceSpawnerInterface) spawnersList.get(j))
581 .replaceBy(previousSpawner, spawnerStub);
583 } catch (Exception e) {
585 .println("Unable to broadcast to spawner of rank: "
586 + j + ". Error:" + e);
588 ScanThreadSpawner.Instance().setServer(spawnerStub);
// predecessor of the replaced spawner: the last entry or index - 1 (the selecting condition is not shown here)
592 previous = spawnersList.size() - 1;
594 previous = index - 1;
596 ((JaceSpawnerInterface) spawnersList.get(previous))
597 .updateHeart(spawnerStub);
598 } catch (Exception e) {
600 .println("Unable to change the server of the heartbeatThread for the previous node of rank "
601 + previous + ". error:" + e);
605 System.err.println("Node is null !");
// Calls setFinished(bool) on every spawner stub of spawnersList; a failure on
// one stub is reported on the error stream.
610 public void broadcastFinished(boolean bool) {
611 for (int i = 0; i < spawnersList.size(); i++)
613 ((JaceSpawnerInterface) spawnersList.get(i)).setFinished(bool);
614 } catch (Exception e) {
616 .println("Unable to propagate the end of the application :"
// Checks whether the application has finished and, if so, shuts it down.
// When the spawner stub reports getFinished() == true, this method stops
// ScanThreadSpawner and HeartBeatSpawner, starts KillThread workers over this
// spawner's range of tasks, prints the total time and the disconnection
// counters, purges RunningApplication and asks the super node to remove the
// mapping algorithm identified by idAlgo.
621 private synchronized void scanAppliNodes() {
624 //ListeTask tskList = null;
628 JaceSpawnerInterface spawnerStub = Register.Instance()
630 if (spawnerStub.getFinished() == true) {
631 System.out.println("Number of tasks ="
632 + Register.Instance().getSize());
// x: number of complete groups of nbOfDaemonsPerSpawner tasks
634 int x = Register.Instance().getListeOfTasks().getSize()
635 / nbOfDaemonsPerSpawner;
// s + 1 KillThread workers are started below; s is computed from the remainder
// or from a full group (the selecting condition is not shown here)
638 s = (Register.Instance().getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
639 / nbOfDeamonsPerThread;
641 s = nbOfDaemonsPerSpawner / nbOfDeamonsPerThread;
// first task index handled by this spawner
643 int debut = nbOfDaemonsPerSpawner * rank;
645 // for(int i=debut;i<nbOfDaemonsPerSpawner*(rank+1) &&
646 // i<reg.getSize();i++)
647 // System.out.println(((Node)nodes.elementAt(i)).getName());
649 ListeTask t = Register.Instance().getListeOfTasks();
650 ScanThreadSpawner.Instance().kill();
651 HeartBeatSpawner.Instance().kill();
653 for (int i = 0; i < s + 1; i++) {
655 new KillThread(i, debut, nbOfDaemonsPerSpawner,
656 nbOfDeamonsPerThread, t).start();
661 long finalTime = RunningApplication.Instance().getChrono()
664 int nbe = RunningApplication.Instance()
665 .getNumberOfDisconnections();
667 int nbsdc = RunningApplication.Instance()
668 .getNumberOfSpawnerDisconnections();
670 System.out.println("Application finished successfully !");
671 // System.out.println("Application finished successfully !!!!!!");
672 // System.out.println("Application finished successfully !!!!!!");
673 // System.out.println("Application finished successfully !!!!!!");
674 // System.out.println("Application finished successfully !!!!!!");
675 // System.out.println("Application finished successfully !!!!!!");
677 // .println("Application finished successfully !!!!!!\n");
678 System.out.println("TOTAL TIME in s : " + (finalTime / 1000));
679 System.out.println("nb of desconnections: " + nbe);
680 System.out.println("nb of spawners desconnections: " + nbsdc);
// a daemon that is still running is sent back to the super node
681 if (JaceDaemon.Instance().isRunning()) {
682 JaceDaemon.Instance().reconnectSuperNode();
684 RunningApplication.Instance().purge();
689 RunningApplication.Instance().purge();
693 /** Removal of the mapping algorithm from the SuperNode **/
694 centralServer.removeAlgo( idAlgo, 0 ) ;
696 } catch( Exception e ) {
697 System.err.println( "Error the application nodes scan!\n " + e ) ;
701 * if (Register.Instance().getSize() == 0) {
702 * System.out.println("aucun noeuds a scanner");
703 * RunningApplication.Instance().purge(); System.exit(0); return 0;
705 * } else{ tskList = Register.Instance().getListeOfTasks();
707 * //si mon appli a besoin d'un noeud //sinon, on fait rien if ((nbTasks
708 * - Register.Instance().getSize()) > 0) { cptReplaced = 0;
710 * //TODO demander des paquet de nodes, pas qu'un //on scanne toutes les
711 * taches de cette appli for (int ind = 0; ind < tskList.getSize();
712 * ind++) { //si 1 tache a pas de noeud, on trouve 1 remplacant
714 * //if (tskList.get(ind).getHostIP() == null) { if
715 * (tskList.get(ind).getHostStub() == null) { rank =
716 * tskList.get(ind).getRank(); node = foundToReplaceThisNodeTMP(rank);
717 * if (node != null) { cptReplaced++; }
721 * //qd fini de scanner taches, envoyer Register //si remplacement de
722 * noeud (c a d si Register modifier) if (cptReplaced != 0) {
723 * broadcastRegister(0); } try { Thread.currentThread().yield(); } catch
726 * }// fin if(appli.getNeededNodes() > 0) {
727 * //System.out.println("SCAN APPLI : taille : " +
728 * Register.Instance().getSize()); return 1; }
732 // @SuppressWarnings("unused")
733 // private synchronized Node foundToReplaceThisNodeTMP(int theRank) {
735 // boolean found = false;
737 // // while (found == false) {
739 // // TODO : trouver l'erreur !!!
741 // // "pas localise le super node java.lang.NullPointerException"
742 // if (centralServer == null) {
743 // System.out.println("centralServer est NUUUUUUUUULL");
745 // node = centralServer.getNewNode(LocalHost.Instance().getIP());
748 // } catch (Exception e) {
749 // // trouver un autre superNode et lui demander le noeud a lui
750 // System.out.println("TMP pas localise le super node " + e);
751 // System.out.println("TMP pas localise le super node " + e);
752 // System.out.println("TMP pas localise le super node " + e);
753 // System.out.println("TMP pas localise le super node " + e);
754 // System.out.println("TMP pas localise le super node " + e);
755 // System.out.println("TMP pas localise le super node " + e);
756 // System.out.println("TMP pas localise le super node " + e);
757 // System.out.println("mon IP : " + LocalHost.Instance().getIP());
758 // if (centralServer == null) {
759 // System.out.println("centralServer : NULL");
761 // connectSuperNode();
764 // if (node != null) {
765 // System.out.println("COOOOOOOOOOOOOOOOOOOOOOL : requisition de "
766 // + node.getName() + " taille avt add: "
767 // + Register.Instance().getSize() + "\n\n");
768 // node.setAliveFlag(true);
769 // node.setAliveTime();
771 // // rajouter le noeud ds le Register
772 // System.out.println("ds Register, manque "
773 // + (nbTasks - Register.Instance().getSize()));
774 // node.setAppliName(RunningApplication.Instance().getName());
776 // // lui envoyer mon stub pr qu'il commence a me pinguer des
778 // // TODO a mettre ds un thread ????
780 // TaskId neighborTask = Register.Instance().getListeOfTasks()
783 // % Register.Instance().getListeOfTasks()
785 // node.getStub().updateHeart(neighborTask.getHostStub());
786 // // node.getStub().updateHeart(this.spawnerRef);
788 // // int is = Register.Instance().existNode(node.getIP());
789 // int is = Register.Instance().existNode(node);
790 // // TODO verif pourkoi superNode me le redonne
791 // // alors qu'il fait deja du calcul
793 // System.out.println("j'ajoute pas le noeud, il y est deja");
794 // System.out.println("PAS AJOUTEE TMP " + node.getName());
795 // System.out.println("PAS AJOUTEE TMP " + node.getName());
796 // System.out.println("PAS AJOUTEE TMP " + node.getName());
797 // System.out.println("PAS AJOUTEE TMP " + node.getName());
798 // System.out.println("PAS AJOUTEE TMP " + node.getName());
801 // Register.Instance().addNode(node);
803 // // !!!!!!!!!!!!!!actualiser le ListeTask
804 // TaskId myTaskId = Register.Instance().getListeOfTasks()
805 // .getTaskIdOfRank(theRank);
806 // myTaskId.setHostIP(node.getIP());
807 // myTaskId.setHostName(node.getName());
808 // myTaskId.setHostStub(node.getStub());
809 // // Register.Instance().getListeOfTasks().getTaskIdOfRank(theRank).setHostIP(node.getIP());
810 // // Register.Instance().getListeOfTasks().getTaskIdOfRank(theRank).setHostName(node.getName());
811 // // Register.Instance().getListeOfTasks().getTaskIdOfRank(theRank).setHostStub(node.getStub());
813 // } catch (Exception e) {
814 // System.out.println("nvx noeud deja plu dispo");
818 // System.out.println("RADINNNNNNNNNNNNNN TMP ");
// Publishes this spawner over RMI.
// Prints the local host name and IP, creates a JaceSpawnerServer, creates an
// RMI registry on spawnerPort, binds the server under the name
// "JaceSpawnerServer" and stores in spawnerRef the stub obtained by looking
// that name up on the local IP. Any failure is reported on the error stream.
823 private void exportObject() {
825 JaceSpawnerServer spawnerServer = null;
827 System.out.println("Name of local machine is: "
828 + LocalHost.Instance().getName());
829 System.out.println("IP of local machine is: "
830 + LocalHost.Instance().getIP());
832 // launch the JaceSpawnerServer
833 spawnerServer = new JaceSpawnerServer();
834 java.rmi.registry.LocateRegistry.createRegistry(spawnerPort);
835 java.rmi.registry.LocateRegistry.getRegistry(spawnerPort).rebind(
836 "JaceSpawnerServer", spawnerServer);
// look the freshly bound server up again to obtain its remote stub
837 spawnerRef = (JaceSpawnerInterface) Naming.lookup("rmi://"
838 + LocalHost.Instance().getIP() + ":" + spawnerPort
839 + "/JaceSpawnerServer");
841 } catch (Exception e) {
843 .println("JaceP2P_Error in JaceSpawner.exportObject() when creating the local JaceSpawnerServer "
845 // System.err.println("exit ds JaceSpawner.exportObject");
// Locates a super node through RMI.
// First tries the address held in superNode_IP when it is not null. If no
// connection was made, walks through the entries of SuperNodeListe, resolving
// each address and trying an RMI lookup, until one succeeds or the list is
// exhausted. On success the stub is stored in centralServer and in LocalHost,
// heartTime is read from the super node and timeBeforeKill is derived from it.
// An error is printed when no super node could be reached.
851 public void connectSuperNode() {
852 System.out.println("I'm looking for a super node");
853 boolean connected = false;
854 if (!(superNode_IP == null)) {
856 System.out.println("Trying to invoke super node "
858 centralServer = (JaceSuperNodeInterface) Naming.lookup("rmi://"
859 + superNode_IP + ":" + superNode_port
861 System.out.println("Succesfully located " + superNode_IP);
863 // add stub and IP in LocalHost to store it until super node
865 LocalHost.Instance().setSuperNodeStub(centralServer);
866 LocalHost.Instance().setSuperNodeIP(superNode_IP);
867 heartTime = centralServer.getSuperNodeBeat();
868 timeBeforeKill = NB_HEART_DECONNECT * heartTime;
871 } catch (Exception e) {
872 System.err.println("Super Node not accessible, try another one (1/2s)");
875 } catch (Exception e1) {
// fallback: try the super nodes of the static list one after the other
880 if (connected == false) {
882 SuperNodeListe.Instance().staticInitialization();
883 while (connected == false
884 && i < SuperNodeListe.Instance().getListe().size()) {
885 SuperNodeData d = null;
886 d = SuperNodeListe.Instance().getSuperNodeData(i);
888 superNode_IP = LocalHost.Instance().resolve(d.getIP());
889 superNode_port = d.getPort();
890 // superNode_port = d.getPort();
892 System.out.println("Trying to invoke Super Node "
894 centralServer = (JaceSuperNodeInterface) Naming
895 .lookup("rmi://" + superNode_IP + ":"
896 + superNode_port + "/JaceSuperNode");
897 System.out.println("Succesfully located SuperNode "
899 LocalHost.Instance().setSuperNodeStub(centralServer);
900 LocalHost.Instance().setSuperNodeIP(superNode_IP);
901 heartTime = centralServer.getSuperNodeBeat();
902 timeBeforeKill = NB_HEART_DECONNECT * heartTime;
905 } catch (Exception e) {
907 .println("SuperNode "
909 + " not accessible, trying to locate another one in 0.5s\n");
913 } catch (Exception e1) {
919 if (connected == false) {
920 System.err.println("All the Super Nodes in the list are not accessible. I'm unable to connect to the platform !");
926 // get a Register on the SuperNode
927 // completed with the required number of Daemons
// Requests nbTasks + nbExtraSpawners nodes from the super node, where
// nbExtraSpawners is (nbTasks - 1) / nbOfDaemonsPerSpawner when nbTasks exceeds
// nbOfDaemonsPerSpawner and 0 otherwise. When the register received has a
// different size, every reserved node is asked to reconnect to the super
// node. Otherwise the first nbExtraSpawners nodes are moved from the register
// into spawnersList, the register is configured and copied into
// Register.Instance(), and every node is flagged alive.
// Also sets idAlgo to "<local IP>:<local port>".
929 public synchronized void getRegisterOnSuperNode() {
930 Register registerSpawner = null;
932 boolean recieved = false;
934 System.out.println("Trying to get a Register on the SuperNode");
935 int nbExtraSpawners = 0;
936 if (nbTasks > nbOfDaemonsPerSpawner) {
937 nbExtraSpawners = (nbTasks - 1) / nbOfDaemonsPerSpawner;
942 registerSpawner = centralServer.getRegisterSpawner(LocalHost
943 .Instance().getIP(), nbTasks, (Task) tache, nbTasks
944 + nbExtraSpawners, algo, paramAlgo);
946 } catch (Exception e) {
948 .println("Unable to recieve a register from superNode "
954 idAlgo = LocalHost.Instance().getIP() + ":" + LocalHost.Instance().getPort() ;
// NOTE(review): registerSpawner is dereferenced here while the null check only
// appears further down; unless a line not shown guards it, a failed request
// would raise a NullPointerException — confirm.
956 if (registerSpawner.getSize() != (nbTasks + nbExtraSpawners)) {
957 System.err.println("I did not recieve enough nodes from superNode!!!! \n killing application !!!!");
// release the nodes that were reserved
958 for (int i = 0; i < registerSpawner.getSize(); i++) {
960 registerSpawner.getNodeAt(i).getStub().reconnectSuperNode();
961 } catch (Exception e) {
962 System.err.println("The reserved node was unable to reconnect to the super node");
968 spawnersList = new Vector<Object>();
// always take the node at index 0: it is removed from the register right after being added to the list
969 for (int i = 0; i < nbExtraSpawners && i < registerSpawner.getSize(); i++) {
970 spawnersList.add(registerSpawner.getNodeAt(0));
971 // * nbOfDaemonsPerSpawner));
972 registerSpawner.removeNodeOfName(registerSpawner.getNodeAt(0).getName());
973 // * nbOfDaemonsPerSpawner));
976 registerSpawner.setNbOfTasks(nbTasks);
977 registerSpawner.setNumBackupNeighbors(nbSavingNodes);
979 * System.out.println("Trying to connect another SuperNode");
980 * connectSuperNode(); try { registerSpawner =
981 * centralServer.getRegisterSpawner(LocalHost.Instance().getIP(),
982 * nbTasks); } catch(Exception e1) {}
985 if (registerSpawner != null) {
986 System.out.println("I received the register");
987 // registerSpawner.setVersion(registerVersion);
988 // registerVersion++;
989 Register.Instance().replaceBy(registerSpawner);
990 System.out.println("It contains " + Register.Instance().getSize()
991 + " Nodes" + " " + nbExtraSpawners + " ExtraSpawners");
993 // set each Node aliveTime value to the Spawner current time
994 for (int i = 0; i < Register.Instance().getSize(); i++) {
995 noeud = Register.Instance().getNodeAt(i);
996 noeud.setAliveFlag(true);
997 noeud.setAliveTime();
1001 System.err.println("\n---------------WARNING--------------");
1002 System.err.println("No Daemon available on the SuperNode dispo, try later, please");
1009 // * Set the identifier of the mapping algorithm used.
1010 // * @param _s The mapping identifier
1012 // * @author Sébastien Miquée
1014 // public void setIdAlgo( String _s ) throws RemoteException
1016 // System.err.println("############# SET ID ALGO ################# "+_s);
// Thread that turns the node n, held at index i of spawnersList, into a
// spawner: it calls transformIntoSpawner(...) on the node's stub and stores
// the resulting stub at index i. On failure it asks the super node for
// another node and retries in a new TransformThread; when that request fails
// too, every node of the Register is asked to reconnect to the super node.
1020 public class TransformThread extends Thread {
1024 public TransformThread(int i, Node n) {
1032 System.out.println("Trying to transform the spawner ("
1033 + n.getName() + ") of rank " + i);
1034 spawnersList.setElementAt(n.getStub().transformIntoSpawner(
1035 params, appliName, Register.Instance(), nbTasks,
1036 centralServer, i, heartTime, 0, 0, 0,
1037 nbOfDaemonsPerSpawner, nbOfDeamonsPerThread, idAlgo), i);
1038 } catch (Exception e) {
1039 System.err.println("Error while contacting newly acquired spawner ("
1040 + n.getName() + "): " + e);
// NOTE(review): the first argument here is the local IP, whereas the other
// getNewNode(...) calls of this class pass idAlgo — confirm which is intended.
1042 n = centralServer.getNewNode( LocalHost.Instance().getIP(), null ) ;
1044 new TransformThread(i, n).start();
1045 } catch (Exception e1) {
1046 System.err.println("The Super Node is maybe dead: " + e1) ;
// release every reserved node
1047 for (int z = 0; z < Register.Instance().getSize(); z++) {
1049 Register.Instance().getNodeAt(z).getStub()
1050 .reconnectSuperNode();
1051 } catch (Exception ez) {
1052 System.err.println("The reserved node was unable to reconnect to the super node: \n"
// Thread that calls startProcess(spawnersList) on the spawner stub stored at
// index i of spawnersList; a failure is reported with its stack trace.
1062 public class StartProcessThread extends Thread {
1065 public StartProcessThread(int i) {
1073 * while((spawnersList.elementAt(i) instanceof Node)) try{
1074 * System.out.println("waiting till transform of spawner "+i+
1075 * " is finished"); Thread.sleep(20); }catch(Exception e1){}
1077 // System.out.println("start process on spawner of rank "+i);
1078 JaceSpawnerInterface spawnerStub = (JaceSpawnerInterface) spawnersList.get(i);
1079 spawnerStub.startProcess(spawnersList);
1080 } catch (Exception e) {
1081 e.printStackTrace(System.out);
1082 System.err.println("Unable to start the process on the spawner of rank "
1083 + i + ".error: " + e);
// Builds the network of spawners.
// Starts one TransformThread per Node currently held in spawnersList, appends
// this spawner's own stub to the list (its rank becomes the last index),
// calls broadcastRegister(1), waits until no entry of the list is a Node any
// more, then starts one StartProcessThread per entry.
1088 public void createSpawnerNetwork() {
1091 for (i = 0; i < spawnersList.size(); i++) {
1092 n = (Node) spawnersList.elementAt(i);
1094 // Register.Instance().getListeOfTasks().viewAll();
1095 // spawnersList.setElementAt(n.getStub().transformIntoSpawner(
1096 // params, appliName, Register.Instance(),nbTasks, centralServer,i,
1097 // heartTime,0,0),i);
1098 new TransformThread(i, n).start();
1101 // broadcast the Register.Instance() to all the JaceServer
1102 // in order to start each task on the Daemons
1104 spawnersList.add(Register.Instance().getSpawnerStub());
1105 System.out.println(" rank="+rank+" spawnersList.size()=" + spawnersList.size());
1106 rank = spawnersList.size() - 1;
1108 broadcastRegister(1);
// an entry that is still a Node has not been replaced by its spawner stub yet
1110 for (int j = 0; j < spawnersList.size(); j++) {
1111 System.out.println("waiting till transform of spawner " + j
1113 while ((spawnersList.elementAt(j) instanceof Node))
1118 } catch (Exception e) {
1123 // for (int k = 0; k < spawnersList.size(); k++)
1126 // ((JaceSpawnerInterface) spawnersList.get( k )).setIdAlgo( idAlgo ) ;
1127 // } catch (Exception e) {
1128 // System.err.println("Unable to propagate the mapping algorithm identifier:" + e) ;
1132 System.out.println("End Transformation of all spawners. Beginning the computing processes");
1134 for (i = 0; i < spawnersList.size(); i++) {
1136 // while(!(spawnersList.elementAt(i) instanceof
1137 // JaceSpawnerInterface))
1139 new StartProcessThread(i).start();
1142 System.out.println("End create Spawner Network!!!!!!!!!");
/**
 * Returns the spawner in charge of the given rank: the entry of
 * {@code spawnersList} at index {@code rank / nbOfDaemonsPerSpawner}
 * (integer division), cast to JaceSpawnerInterface.
 *
 * NOTE(review): neither the index nor the divisor is checked here; an
 * out-of-range index makes Vector.get throw, and the cast fails if the
 * entry is still a Node.
 *
 * @param rank rank used to select the spawner
 * @return the entry of spawnersList at the computed index
 */
1145 public JaceSpawnerInterface getSpawnerResponsibleOn(int rank) {
1146 int id = rank / nbOfDaemonsPerSpawner;
1147 return (JaceSpawnerInterface) spawnersList.get(id);
/**
 * Prepares the application run: starts the chronometer, records the
 * application name and task count in RunningApplication, stores the
 * parameters, application name and this spawner's reference in the
 * Register, builds the ListeTask (one TaskId per task) and installs it in
 * the Register, then flags the application as running.
 */
1150 public void createAppli() {
1155 ListeTask tsk = new ListeTask();
1157 JaceInterface nodeStub = null;
1158 TaskId myTask = null;
1160 System.out.println("Application launched, starting the chronometer");
1161 RunningApplication.Instance().getChrono().start();
1163 RunningApplication.Instance().setName(appliName);
1164 RunningApplication.Instance().setNbTasks(nbTasks);
1165 // RunningApplication.Instance().setRegister(Register.Instance());
1167 Register.Instance().setParams(params);
1168 Register.Instance().setAppliName(appliName);
1169 Register.Instance().setSpawnerStub(this.spawnerRef);
1171 // assign a TaskId to each Node of the Register
1172 // and insert the TaskId in the ListTask
1173 while (i < Register.Instance().getSize() && count < nbTasks) {
1174 tmpNode = Register.Instance().getNodeAt(i);
// Only nodes flagged alive receive a task; the task records the node's
// stub, host name and IP.
1175 if (tmpNode.getAliveFlag() == true) {
1176 tmpNode.setAppliName(appliName);
1177 nodeStub = tmpNode.getStub();
1178 nodeName = tmpNode.getName();
1179 nodeIP = tmpNode.getIP();
1181 myTask = new TaskId(appliName, count, nodeStub);
1182 myTask.setHostIP(nodeIP);
1183 myTask.setHostName(nodeName);
1185 tsk.addTask(myTask);
1191 // if not enough Nodes in the Register,
1192 // insert not assigned TaskId in the ListTask
1193 if (count < nbTasks) {
// Pad with tasks that have no stub (null) for ranks count..nbTasks-1.
1194 for (int j = count; j < nbTasks; j++) {
1195 tsk.addTask(new TaskId(appliName, j, null));
// NOTE(review): the shortfall printed below is nbTasks minus the register
// size, while the loop above adds (nbTasks - count) unassigned tasks. The
// two differ whenever count is not equal to the register size -- confirm
// which number is meant.
1197 System.out.println("in Register, misses "
1198 + (nbTasks - Register.Instance().getSize()) + " nodes");
1201 // insert the ListeTask in the Register of the appli
1202 Register.Instance().setListeOfTasks(tsk);
1203 // Register.Instance().getListeOfTasks().viewAll();
1204 RunningApplication.Instance().setRunning(true);
1205 System.out.println("fin create appli");
/**
 * Thread that calls {@code setSpawner(...)} with the Register's spawner
 * stub on a slice of the registered nodes. The slice starts at
 * {@code debut + i * nbOfDaemonsPerThread} and stops at the first of: the
 * per-thread size, {@code debut + nbOfDeamonsPerSpawner}, or the size of
 * the Register's task list.
 */
1208 public class BroadcastSpawner extends Thread {
1211 int nbOfDaemonsPerThread, nbOfDeamonsPerSpawner;
1213 public BroadcastSpawner(int i, int debut, int nbOfDeamonsPerSpawner,
1214 int nbOfDaemonsPerThread) {
1217 this.nbOfDaemonsPerThread = nbOfDaemonsPerThread;
1218 this.nbOfDeamonsPerSpawner = nbOfDeamonsPerSpawner;
// NOTE(review): the start index uses the field nbOfDaemonsPerThread, but
// the upper bound uses nbOfDeamonsPerThread (different spelling), which is
// not the field declared above and presumably resolves to the static field
// of JaceSpawner -- confirm both always hold the same value.
1223 for (int index = debut + i * nbOfDaemonsPerThread; index < debut
1224 + i * nbOfDeamonsPerThread + nbOfDeamonsPerThread
1225 && index < debut + nbOfDeamonsPerSpawner
1226 && index < Register.Instance().getListeOfTasks().getSize(); index++) {
1228 Register.Instance().getNodeAt(index).getStub().setSpawner(
1229 Register.Instance().getSpawnerStub());
1230 } catch (Exception e) {
// NOTE(review): the message looks up getNodeAt(i), i.e. the slice number,
// not getNodeAt(index), the node whose update just failed -- likely names
// the wrong node.
1231 System.out.println("can't change spawner stub on node: "
1232 + Register.Instance().getNodeAt(i).getName()
/**
 * Thread that tears down a slice of the tasks of the given ListeTask. For
 * each task rank in the slice it fetches the TaskId, looks up the node that
 * owns the task's host stub in the Register, clears the node's application
 * name, starts a ReconnectThread on the stub and removes the node from the
 * Register.
 */
1239 public class KillThread extends Thread {
1242 int nbOfDaemonsPerThread, nbOfDeamonsPerSpawner;
1245 public KillThread(int i, int debut, int nbOfDeamonsPerSpawner,
1246 int nbOfDaemonsPerThread, ListeTask t) {
1249 this.nbOfDaemonsPerThread = nbOfDaemonsPerThread;
1250 this.nbOfDeamonsPerSpawner = nbOfDeamonsPerSpawner;
// Same slice bounds as BroadcastSpawner, limited by the size of t.
// NOTE(review): the upper bound uses nbOfDeamonsPerThread (different
// spelling from the field declared above) -- confirm it holds the same
// value.
1257 for (int index = debut + i * nbOfDaemonsPerThread; index < debut
1258 + i * nbOfDeamonsPerThread + nbOfDeamonsPerThread
1259 && index < debut + nbOfDeamonsPerSpawner
1260 && index < t.getSize(); index++) {
1263 TaskId recev = null;
1264 System.out.println("deleting Task" + index);
1266 recev = t.getTaskIdOfRank(index);
1268 JaceInterface stub = recev.getHostStub();
1269 System.out.println("name=" + recev.getHostName());
// NOTE(review): the lookup result is dereferenced without a null check.
// Tasks created without a stub (see createAppli) presumably have no
// matching node -- confirm what getNodeOfStub returns in that case.
1270 noeud = Register.Instance().getNodeOfStub(stub);
1271 noeud.setAppliName(null);
1272 new ReconnectThread(stub, noeud.getName()).start();
1273 Register.Instance().removeNode(noeud);
1274 // LocalHost.Instance().getSpawnerStub().killApplication(stub);
1276 } catch (Exception e) {
// Reporting the failure dereferences noeud again, which can itself throw;
// the second catch below reports that secondary failure.
1278 System.err.println("error in killThread on node "
1279 + noeud.getName() + ". " + e);
1280 } catch (Exception e2) {
1281 System.err.println("error in error :" + e2);
/**
 * Thread that terminates a remote node. Despite the class name, the calls
 * to reconnectSuperNode() are commented out; the only live remote call is
 * {@code suicide(...)} on the stub. A failure is reported on stderr with
 * the node name.
 */
1287 class ReconnectThread extends Thread {
1288 JaceInterface stub = null;
1291 public ReconnectThread(JaceInterface s, String name) {
1298 // System.out.println("reconnexion SuperNode");
1299 // Register.Instance().getNode(workerIP).getWorkerStub().reconnectSuperNode();
1301 // stub.reconnectSuperNode();
1302 stub.suicide("fin d'appli");
1303 } catch (Exception e) {
1304 System.err.println("can't kill node " + name);
1312 // make a copy of the Register and send it to the nodes it is made of,
1313 // because if it is modified at the same time, what we send is not consistent
/**
 * Sends the Register to this spawner's slice of nodes by starting
 * {@code s} UpdateRegisterThread instances, each given the task, the
 * Register, the request code, its thread number and the slice start
 * {@code debut}.
 *
 * NOTE(review): the clone mentioned above is commented out, so the threads
 * receive the live Register singleton rather than a copy.
 *
 * @param requete request code forwarded unchanged to every
 *        UpdateRegisterThread
 */
1314 private synchronized void broadcastRegister(int requete) {
1315 // Register reg = Register.Instance().clone();
1316 Register reg = Register.Instance();
1319 System.out.println("name of spawner: "
1320 + Register.Instance().getSpawnerStub().getName());
1321 // launch 1 thread to send the Register to all the nodes
// Wait while another broadcast is flagged as in progress, then take the flag.
1322 while (broadcasting == true)
1324 broadcasting = true;
1325 // Register.Instance().setSpawnerStub(
1326 // Register.Instance().getSpawnerStub());
// x: number of complete groups of nbOfDaemonsPerSpawner tasks.
// s: number of threads to start. Each pair of assignments below is a
// ceiling division by nbOfDeamonsPerThread: the first pair on the leftover
// (task count % nbOfDaemonsPerSpawner), the second on a full
// nbOfDaemonsPerSpawner.
// NOTE(review): presumably the leftover form is for the last, partially
// filled spawner -- confirm against the selecting condition.
1327 int x = reg.getListeOfTasks().getSize() / nbOfDaemonsPerSpawner;
1330 if ((reg.getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
1331 % nbOfDeamonsPerThread == 0)
1332 s = (reg.getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
1333 / nbOfDeamonsPerThread;
1335 s = (reg.getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
1336 / nbOfDeamonsPerThread + 1;
1337 else if ((nbOfDaemonsPerSpawner % nbOfDeamonsPerThread) == 0)
1338 s = nbOfDaemonsPerSpawner / nbOfDeamonsPerThread;
1340 s = nbOfDaemonsPerSpawner / nbOfDeamonsPerThread + 1;
// debut: first index of the slice owned by the spawner of this rank.
1341 int debut = nbOfDaemonsPerSpawner * rank;
1342 System.out.println("rank=" + rank + " debut=" + debut + " s=" + s
1343 + " nbOfDaemonsPerSpawner=" + nbOfDaemonsPerSpawner
1344 + " nbOfDeamonsPerThread=" + nbOfDeamonsPerThread + " x="
1346 for (int i = 0; i < s; i++)
1347 new UpdateRegisterThread(tache, reg, requete, i, debut).start();
1349 * This thread : -updates the goal of the Node beats if necessary
1350 * (stub.updateHeart) -updates the Register on each Node
1351 * (stub.updateRegister)
// The flag is released right after the threads are started, not after
// they finish.
1353 JaceSpawner.Instance().setBroadcasting(false);
1356 } catch (Exception e) {
1359 } catch (Exception e) {
1361 .println("\n1 node has died during JaceSpawner.broadcastRegister()");
/**
 * Starts StartScanThread instances over this spawner's slice of nodes.
 * The threads work on a clone of the Register's node list taken after the
 * wait on the {@code broadcasting} flag.
 *
 * NOTE(review): unlike broadcastRegister, {@code s} here is a plain integer
 * quotient and the loop runs {@code s + 1} times, so one more thread is
 * started even when the division is exact -- confirm this is intended.
 */
1365 private synchronized void broadcastScanning() {
1366 Register reg = Register.Instance();
// Wait while a broadcast is flagged as in progress.
1367 while (broadcasting == true)
1370 } catch (Exception e) {
1372 // Register.Instance().viewAll();
1373 Vector<?> nodes = (Vector<?>) Register.Instance().getListOfNodes().clone();
// x: number of complete groups of nbOfDaemonsPerSpawner tasks.
1374 int x = reg.getListeOfTasks().getSize() / nbOfDaemonsPerSpawner;
1377 s = (reg.getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
1378 / nbOfDeamonsPerThread;
1380 s = nbOfDaemonsPerSpawner / nbOfDeamonsPerThread;
// debut: first index of the slice owned by the spawner of this rank.
1382 int debut = nbOfDaemonsPerSpawner * rank;
1384 // for(int i=debut;i<nbOfDaemonsPerSpawner*(rank+1) &&
1385 // i<reg.getSize();i++)
1386 // System.out.println(((Node)nodes.elementAt(i)).getName());
1388 for (int i = 0; i < s + 1; i++) {
1390 new StartScanThread(i, nodes, debut).start();
/**
 * Builds a reduced Register for the task of the given rank. The new
 * Register copies the application name, parameters, spawner stub and task
 * count of the global Register, and receives a task list restricted to the
 * ranks returned by {@code getDependencies(rank, <task list size>)}.
 *
 * @param rank rank of the task whose neighbourhood is extracted
 * @return presumably the reduced Register {@code g} -- TODO confirm
 */
1395 public Register getRegister(int rank) {
1397 ListeTask listOfTasks = Register.Instance().getListeOfTasks();
1398 Vector<Integer> dependencies = getDependencies(rank, listOfTasks.getSize());
1399 Register g = new Register();
1400 ListeTask newListOfTasks = new ListeTask();
1401 g.setAppliName(Register.Instance().getAppliName());
1402 g.setParams(Register.Instance().getParams());
1403 g.setSpawnerStub(Register.Instance().getSpawnerStub());
1404 g.setNbOfTasks(Register.Instance().getNbOfTasks());
1405 // g.setVersion(reg.getVersion());
// Copy each dependency's TaskId. For tasks that have a host stub, the
// owning node is also looked up in the global Register.
// NOTE(review): presumably that node is then added to g -- confirm.
1406 for (int j = 0; j < dependencies.size(); j++) {
1407 TaskId id = listOfTasks.getTaskIdOfRank(((Integer) dependencies
1408 .elementAt(j)).intValue());
1409 newListOfTasks.addTask(id);
1410 if (id.getHostStub() != null) {
1411 Node noeud = Register.Instance()
1412 .getNodeOfStub(id.getHostStub());
1416 g.setListeOfTasks(newListOfTasks);
/**
 * Starts UpdateRegisterConcernedThread instances for the tasks related to
 * {@code rank}, as returned by getDependencies. Each thread receives the
 * dependency list, the global Register, its thread number, the rank and
 * the two nodes.
 *
 * @param rank    rank whose dependencies are updated
 * @param oldNode passed through to every thread (presumably the node being
 *                replaced -- TODO confirm)
 * @param node    passed through to every thread (presumably the
 *                replacement node -- TODO confirm)
 */
1420 private void updateConcernedNodes(int rank, Node oldNode, Node node) {
1421 ListeTask listOfTasks = Register.Instance().getListeOfTasks();
1422 Vector<?> dependencies = getDependencies(rank, listOfTasks.getSize());
// Debug trace: prints the dependency ranks on one line.
1423 System.out.println("la liste des voisins concernes de : " + rank);
1424 for (int z = 0; z < dependencies.size(); z++)
1425 System.out.print(((Integer) dependencies.elementAt(z)).intValue()
1427 System.out.println();
1428 // Register.Instance().setVersion(registerVersion);
1429 // registerVersion++;
1431 .setSpawnerStub(Register.Instance().getSpawnerStub());
// s: number of threads. It is dependencies.size() / nbOfDeamonsPerThread
// when the division is exact; the second assignment adds one for a
// remainder.
1433 if ((dependencies.size() % nbOfDeamonsPerThread) == 0)
1434 s = dependencies.size() / nbOfDeamonsPerThread;
1436 s = dependencies.size() / nbOfDeamonsPerThread + 1;
1437 Register reg = Register.Instance();
1439 for (int j = 0; j < s; j++) {
1440 new UpdateRegisterConcernedThread(dependencies, reg, j, rank,
1441 oldNode, node).start();
/**
 * Collects, without duplicates for the last two groups, the ranks related
 * to task {@code id}:
 * 1. the computing dependencies reported by the task itself
 *    ({@code tache.getDependencies(id)}, first {@code taille(dep)} entries);
 * 2. the convergence neighbours, derived from powers of two below
 *    {@code jaceSize};
 * 3. the backup neighbours, up to {@code getNumBackupNeighbors()} ranks on
 *    each side of {@code id}, wrapped modulo {@code jaceSize}.
 *
 * @param id       rank of the task
 * @param jaceSize total number of ranks used for bounds and wrap-around
 * @return presumably the {@code neighbors} vector -- TODO confirm
 */
1445 private Vector<Integer> getDependencies(int id, int jaceSize) {
1446 // get computing dependencies
1447 Vector<Integer> neighbors = new Vector<Integer>();
1448 int[] dep = tache.getDependencies(id);
// taille(dep) is re-evaluated at every iteration of this loop.
1449 for (int z = 0; z < taille(dep); z++)
1450 neighbors.add(dep[z]);
1451 // System.out.println("la liste des voisins de calcul de: "+id+" concerne");
1452 // for(int z=0;z<neighbors.size();z++)
1453 // System.out.print(((Integer)neighbors.elementAt(z)).intValue()+" ");
1454 // System.out.println();
1456 // get convergence neighbors
// For each power of two 2^d below jaceSize: add id + 2^d when id < 2^d and
// the sum stays below jaceSize; add id - 2^d when 2^d <= id < 2^(d+1).
1458 while (Math.pow(2, d) < jaceSize) {
1459 if (id < Math.pow(2, d) && ((id + Math.pow(2, d)) < jaceSize))
1460 if (!neighbors.contains((Object) ((int) (id + Math.pow(2, d)))))
1461 neighbors.add((int) (id + Math.pow(2, d)));
1462 if (id < Math.pow(2, d + 1) && id >= Math.pow(2, d))
1463 if (!neighbors.contains((Object) ((int) (id - Math.pow(2, d)))))
1464 neighbors.add((int) (id - Math.pow(2, d)));
1468 // get backup neighbors
1469 int nb = Register.Instance().getNumBackupNeighbors();
1472 for (int j = 1; j <= nb; j++) {
1473 // ------------ 1 - for backups "j + n" (to the right of j)
1474 rankOfBackTask = (id + j) % jaceSize;
1475 if (!neighbors.contains((Object) rankOfBackTask))
1476 neighbors.add(rankOfBackTask);
1478 // ------------ 2 - for backups "j - n" (to the left of j)
1481 rankOfBackTask = tmp % jaceSize;
// NOTE(review): when Math.abs(tmp) is a multiple of jaceSize this yields
// jaceSize itself, which is outside 0..jaceSize-1 -- confirm tmp can never
// reach such a value.
1483 rankOfBackTask = jaceSize - (Math.abs(tmp) % jaceSize);
1485 if (!neighbors.contains((Object) rankOfBackTask))
1486 neighbors.add(rankOfBackTask);
/**
 * Scans {@code vect} while the index is in range and the element is
 * non-negative. Presumably returns the number of leading non-negative
 * entries, i.e. a negative value acts as an end marker -- TODO confirm.
 * No null check on {@code vect} is made in the loop condition.
 *
 * @param vect array to scan
 */
1494 public static int taille(int[] vect) {
1497 while (x < vect.length && vect[x] >= 0) {
/**
 * Thread with a no-argument constructor.
 * NOTE(review): presumably the entry point that triggers the scanning
 * broadcast on the spawner -- TODO confirm against its run() method.
 */
1504 class StartScanning extends Thread {
1506 public StartScanning() {
1516 class StartScanThread extends Thread {
1519 int nbOfDeamonsPerThread, nbOfDeamonsPerSpawner;
1521 StartScanThread(int i, Vector<?> nodes, int debut) {
1525 nbOfDeamonsPerThread = JaceSpawner.Instance().getNbOfDeamonsPerThread();
1526 nbOfDeamonsPerSpawner = JaceSpawner.Instance()
1527 .getNbOfDeamonsPerSpawner();
1532 for (index = debut + i * nbOfDeamonsPerThread; index < debut + i
1533 * nbOfDeamonsPerThread + nbOfDeamonsPerThread
1534 && index < debut + nbOfDeamonsPerSpawner
1535 && index < nodes.size(); index++) {
1537 Node node = (Node) nodes.elementAt(index);
1538 JaceInterface stub = node.getStub();
1539 String name = node.getName();
1542 stub.setScanning(true);
1543 // System.out.println("modify scanning to "+name);
1545 } catch (Exception e) {
1546 System.err.println("Unable to modify scanning to " + name + ":"
1550 // for(int x=0;x<nodes.size();x++)
1551 // System.out.println(((Node)nodes.elementAt(x)).getName());
1552 // System.out.println("nbre total: "+(index-1));