--- /dev/null
+#
+# Makefile for JaceP2P plateform
+# Author: Sébastien Miquée <sebastien.miquee@univ-fcomte.fr>
+#
+
+SRC=src
+PACKAGE=jaceP2P
+BIN=bin
+JAR=JaceP2P.jar
+
+all: compile jar
+
+compile:clean
+ javac -d ./$(BIN) ./$(SRC)/$(PACKAGE)/*.java
+ rmic -d ./$(BIN) jaceP2P.JaceServer
+ rmic -d ./$(BIN) jaceP2P.JaceSuperNodeServer
+
+rmi:compile
+ rmic -d ./$(BIN) jaceP2P.JaceServer
+ rmic -d ./$(BIN) jaceP2P.JaceSuperNodeServer
+
+jar:
+ jar cvfm ./$(JAR) Manifest -C ./$(BIN) $(PACKAGE)
+
+clean:
+ rm -rf ./$(BIN)/* $(JAR)
+
+
+#
+##
+#
+
--- /dev/null
+Manifest-Version: 1.0
+Main-Class: jaceP2P.JaceP2P
+Class-path: ./JaceP2P.jar:.
\ No newline at end of file
--- /dev/null
+miquee@plop.16872:1253690040
\ No newline at end of file
--- /dev/null
+package jaceP2P;
+
+import java.util.Vector;
+
+public class Backup implements java.io.Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ // attributes
+ private int taskRank = -1;
+ private int iteration = -1;
+ private int timeStep = -1;
+ private byte[] data = null;
+
+ // constructeurs
+ public Backup() {
+ }
+
+ public Backup(int rank) {
+ taskRank = rank;
+ }
+
+ public Backup(int rank, byte[] flux) {
+ taskRank = rank;
+ data = flux;
+ }
+
+ public Backup(int rank, int ite, byte[] flux) {
+ taskRank = rank;
+ iteration = ite;
+ data = flux;
+ }
+
+ // methods
+ public synchronized void setTaskRank(int rank) {
+ taskRank = rank;
+ }
+
+ public synchronized void setIteration(int ite) {
+ iteration = ite;
+ }
+
+ public synchronized void setData(byte[] d) {
+ data = d;
+ }
+
+ public synchronized int getTaskRank() {
+ return taskRank;
+ }
+
+ public synchronized Vector<Integer> getIterationStep() {
+ Vector<Integer> v = new Vector<Integer>();
+ v.add(iteration);
+ v.add(timeStep);
+ return v;
+ }
+
+ public synchronized int getIteration() {
+ return iteration;
+ }
+
+ public synchronized byte[] getData() {
+ return data;
+ }
+
+ public synchronized void setStep(int timeStep) {
+ this.timeStep = timeStep;
+ }
+
+ public synchronized int getStep() {
+ return timeStep;
+ }
+}
--- /dev/null
+package jaceP2P;
+
+import java.util.*;
+
+public class BackupConvg implements java.io.Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public String state;
+ public boolean underTh;
+ public int verifNum;
+ public Vector<?> resp;
+ public boolean localCV_state;
+ public int nb_not_recv;
+ public int sendId;
+ public boolean electedNode;
+ public boolean respSent;
+ public String action;
+ public boolean verdict;
+ public boolean finalStep;
+ public Vector<?> neighbors;
+ public Vector<?> neighborsValues;
+ public int timeStep;
+ public LastSave lastSave;
+ public int jaceP2P_Iteration;
+ public boolean recievedVerdict;
+ public Vector<?> reduceAll;
+ public boolean initialized = false;
+
+ public BackupConvg() {
+ }
+
+ @SuppressWarnings("unchecked")
+ public void affecter(Task sauv) {
+ sauv.underTh = underTh;
+ sauv.state = state;
+ sauv.nb_not_recv = nb_not_recv;
+ sauv.electedNode = electedNode;
+ sauv.respSent = respSent;
+ sauv.neighbors = (Vector) neighbors.clone();
+ sauv.neighborsValues = (Vector) neighborsValues.clone();
+ sauv.resp = (Vector) resp.clone();
+ sauv.verifNum = verifNum;
+ sauv.sendId = sendId;
+ sauv.finalStep = finalStep;
+ sauv.action = action;
+ sauv.verdict = verdict;
+ sauv.localCV_state = localCV_state;
+ sauv.reduceAll = reduceAll;
+ }
+}
--- /dev/null
+package jaceP2P;
+
+import java.util.Vector;
+
+public class BackupsManager {
+ final int MAX_COUNT_NOT_ALIVE = 3;
+
+ // attributes
+ public static BackupsManager Instance;
+ public int myRank = -1;
+ public Vector<Backup> liste = new Vector<Backup>();
+ public Vector<Backup> liste_Convg = new Vector<Backup>();
+
+ // constructors
+ private BackupsManager() {
+ }
+
+ public static BackupsManager Instance() {
+ if (Instance == null) {
+ Instance = new BackupsManager();
+ }
+ return Instance;
+ }
+
+ // retourne le nb de Backups ds la liste)
+ public synchronized int size() {
+ return liste.size();
+ }
+
+ public synchronized void clean() {
+ for (int i = 0; i < liste.size(); i++) {
+ ((Backup) liste.get(i)).setIteration(-1);
+ ((Backup) liste.get(i)).setData(null);
+ ((Backup) liste_Convg.get(i)).setIteration(-1);
+ ((Backup) liste_Convg.get(i)).setData(null);
+ }
+ }
+
+ public synchronized void purge() {
+ liste.clear();
+ liste_Convg.clear();
+ myRank = -1;
+ Instance = null;
+ }
+
+ public synchronized int getMyRank() {
+ return myRank;
+ }
+
+ public synchronized void initialize(int req) {
+ ListeTask tskList = Register.Instance().getListeOfTasks();
+
+ TaskId t = tskList.getTaskIdOfHostStub(LocalHost.Instance().getStub());
+ if (t == null) {
+ System.out.println("no corresponding task !!!!!!!!!!!!!!!!");
+ // TODO
+ // what can we do ?????????????????
+ }
+ myRank = t.getRank();
+ int rankOfBackTask;
+
+ // get the number of backup nodes there are for each task i
+ int numBackNodes = Register.Instance().getNumBackupNeighbors();
+ System.out.println("numBackNodes : " + numBackNodes);
+ int numOfTasks = Register.Instance().getNbOfTasks();
+ System.out.println("nombre de taches=" + numOfTasks);
+ int tmp;
+ int lastBackupRank;
+ int lastBackupRankConv;
+
+ // ****************************** STEP 1 : Create the empty
+ // BackupsManager
+ // **
+
+ for (int i = 1; i <= numBackNodes; i++) {
+ // ------------ 1 - for backups "i + n" (to the right of i)
+ rankOfBackTask = (myRank + i) % numOfTasks;
+ // System.out.println("i : " + i + ", rankOfBackTask : " +
+ // rankOfBackTask);
+ Backup b_right = new Backup(rankOfBackTask);
+ addBackupTask(b_right, 0); // erase if exists so no redondancy
+ Backup b_rightConv = new Backup(rankOfBackTask);
+ addBackupTask(b_rightConv, 1); // erase if exists so no redondancy
+
+ // ------------ 2 - for backups "i - n" (to the left of i)
+ tmp = myRank - i;
+ if (tmp >= 0) {
+ rankOfBackTask = tmp % numOfTasks;
+ } else {
+ rankOfBackTask = numOfTasks - (Math.abs(tmp) % numOfTasks);
+ }
+ // System.out.println("i : " + i + ", rankOfBackTask : " +
+ // rankOfBackTask);
+ Backup b_left = new Backup(rankOfBackTask);
+ addBackupTask(b_left, 0); // erase if exists so no redondancy
+ Backup b_leftConv = new Backup(rankOfBackTask);
+ addBackupTask(b_leftConv, 1); // erase if exists so no redondancy
+ }
+
+ // ****************************** STEP 2 : get the Backup for my task
+ // ** get an eventual Backup (if there is one) to restart on me (try 3
+ // times)
+ if (req == 0) {
+ int res = -1;
+ int j = 0;
+
+ while (j < 3 && res == -1) {
+ // scan all backup Nodes to know the rank of task
+ // which Backup is the most recent for my tasks
+ lastBackupRank = getLastRemoteBackupRank(); // return -1 if no
+ // Backups
+ lastBackupRankConv = getLastRemoteBackupRankConvg();
+ // Knowing the Node which has the last (most recent) Backup for
+ // me,
+ // I get this Backup in order to restart it
+ System.out.println("I am going to get the Backup on "
+ + lastBackupRank);
+ System.out.println("I am going to get the Backup Conv on "
+ + lastBackupRankConv);
+ res = restartMyTask(lastBackupRank, lastBackupRankConv);
+ if (res != -1) {
+ System.out.println("Backup successfully got and restarted");
+ } else {
+ System.out
+ .println("FAILED to get or restart the Backup at try "
+ + j
+ + "... I retry to get a Backup on another Node ("
+ + (2 - j) + " times again)");
+ }
+ j++;
+ }
+
+ // get backups of all neighboring nodes
+ int destRank;
+ TaskId destTaskId;
+ TaskId id = Register.Instance().getListeOfTasks()
+ .getTaskIdOfHostStub(LocalHost.Instance().getStub());
+ int myRank = id.getRank();
+ JaceInterface destStub;
+ for (int i = 0; i < BackupsManager.Instance().size(); i++) {
+ destRank = BackupsManager.Instance().getBackupTaskAtIndex(i, 0)
+ .getTaskRank();
+ destTaskId = Register.Instance().getListeOfTasks()
+ .getTaskIdOfRank(destRank);
+ destStub = destTaskId.getHostStub();
+ new GetBackupForNewNode(destStub, myRank).start();
+ }
+
+ } else
+ restartMyTask(-1, -1);
+ }
+
+ public class GetBackupForNewNode extends Thread {
+ JaceInterface stub;
+ int myRank;
+
+ public GetBackupForNewNode(JaceInterface destStub, int myRank) {
+ stub = destStub;
+ this.myRank = myRank;
+ }
+
+ public void run() {
+ try {
+
+ stub.getBackupForNewNode(myRank);
+ } catch (Exception e) {
+ }
+ }
+ }
+
+ public synchronized void addBackupTask(Backup t, int tag) {
+
+ // regarder si existe cette tache ds le vecteur
+ if (tag == 0) {
+ int isIn = existBackupTaskOfRank(t.getTaskRank(), tag);
+
+ if (isIn == -1) { // si elle y est pas on l'isere
+ liste.addElement(t);
+
+ }
+
+ else { // si elle y est on remplace l'ancienne
+ liste.set(isIn, t);
+
+ }
+ } else {
+ int isIn = existBackupTaskOfRank(t.getTaskRank(), tag);
+
+ if (isIn == -1) {
+ liste_Convg.addElement(t);
+ } else
+ liste_Convg.set(isIn, t);
+ }
+ }
+
+ public int existBackupTaskOfRank(int rank, int tag) {
+ int existe = -1;
+ int index = 0;
+ if (tag == 0)
+ while ((existe == -1) && (index < liste.size())) {
+ if (rank == ((Backup) liste.get(index)).getTaskRank()) {
+ existe = index;
+ } else
+ index++;
+ }
+ else
+ while ((existe == -1) && (index < liste_Convg.size())) {
+ if (rank == ((Backup) liste_Convg.get(index)).getTaskRank()) {
+ existe = index;
+ } else
+ index++;
+ }
+ return existe;
+ }
+
+ // returns rank of the task that has my most recent Backup
+ private int getLastRemoteBackupRank() {
+ int rank;
+ TaskId task;
+ JaceInterface stub = null;
+ int ite;
+ int lastIte = -1;
+ int lastBackupRank = -1;
+ Vector<?> v;
+ int timeStep;
+ int lastStep = -1;
+ boolean ack = false;
+ int count;
+
+ // ask for ite number of Backups of all my Backup Nodes
+ for (int i = 0; i < liste.size(); i++) {
+ ite = -1;
+ timeStep = -1;
+ rank = ((Backup) liste.get(i)).getTaskRank();
+ task = Register.Instance().getListeOfTasks().getTaskIdOfRank(rank);
+ if (task == null)
+ System.out.println("la tache " + rank
+ + "n'est pas trouvee dans le registre de " + myRank);
+
+ stub = task.getHostStub();
+ if (stub == null) {
+ System.out.println("stub is null, this node is dead !!!!");
+ } else {
+ count = 0;
+ // if node not dead, try 3 times to know
+ // the ite number of its Backup for me
+ do {
+ try {
+ // TODO ???
+ // threads or not, for the invokation ?
+ count++;
+ v = stub.getIterationOfBackup(myRank, 0);
+ ite = ((Integer) v.get(0)).intValue();
+
+ timeStep = ((Integer) v.get(1)).intValue();
+ System.out.println(" data ite = " + ite + " timeStep="
+ + timeStep + " available from task " + rank);
+ ack = true;
+ } catch (Exception e) {
+ System.out.println("The Node " + task.getHostIP()
+ + " does not answer at try " + count);
+ }
+ } while ((ack == false) && (count < MAX_COUNT_NOT_ALIVE));
+ if (lastStep < timeStep) {
+ lastStep = timeStep;
+ lastIte = ite;
+ lastBackupRank = rank;
+ } else if (lastStep == timeStep && ite > lastIte) {
+ lastIte = ite;
+ lastBackupRank = rank;
+ }
+ }
+ }
+ System.out.println("last backup rank =" + lastBackupRank);
+ return lastBackupRank;
+ }
+
+ private int getLastRemoteBackupRankConvg() {
+ int rank;
+ TaskId task;
+ JaceInterface stub = null;
+ int ite;
+ int lastIte = -1;
+ int lastBackupRank = -1;
+ boolean ack = false;
+ int count;
+ Vector<?> v;
+ int timeStep;
+ int lastStep = -1;
+ // ask for ite number of Backups of all my Backup Nodes
+ for (int i = 0; i < liste_Convg.size(); i++) {
+ ite = -1;
+ timeStep = -1;
+ rank = ((Backup) liste_Convg.get(i)).getTaskRank();
+ task = Register.Instance().getListeOfTasks().getTaskIdOfRank(rank);
+ stub = task.getHostStub();
+ if (stub == null) {
+ System.out.println("stub is null, this node is dead !!!!");
+ } else {
+ count = 0;
+ // if node not dead, try 3 times to know
+ // the ite number of its Backup for me
+ do {
+ try {
+ // TODO ???
+ // threads or not, for the invokation ?
+ count++;
+ v = stub.getIterationOfBackup(myRank, 1);
+ ite = ((Integer) v.get(0)).intValue();
+ timeStep = ((Integer) v.get(1)).intValue();
+ System.out.println(" conv ite = " + ite + " timeStep="
+ + timeStep + " available from task " + rank);
+ // System.out.println("ite = " + ite +
+ // " available for task " + rank);
+ ack = true;
+ } catch (Exception e) {
+ System.out.println("The Node " + task.getHostIP()
+ + " does not answer at try " + count);
+ }
+ } while ((ack == false) && (count < MAX_COUNT_NOT_ALIVE));
+ if (lastStep < timeStep) {
+ lastStep = timeStep;
+ lastIte = ite;
+ lastBackupRank = rank;
+ } else if (lastStep == timeStep && ite > lastIte) {
+ lastIte = ite;
+ lastBackupRank = rank;
+ }
+
+ }
+ }
+ return lastBackupRank;
+ }
+
+ // return 0 if good (if I successfully got a Backup),
+ // return -1 elsewhere
+ private synchronized int restartMyTask(int lastBackupRank,
+ int lastBackupRankConv) {
+ Backup newBackup = null;
+ Backup newBackupConvg = null;
+ TaskId task;
+ JaceInterface stub = null;
+
+ // If no Backups, lastBackupRank = -1, we start the thread at beginning
+ if (lastBackupRank == -1) {
+ newBackup = null;
+ } else {
+ // get the lastBackupRank on the corresponding BackupNode
+ task = Register.Instance().getListeOfTasks().getTaskIdOfRank(
+ lastBackupRank);
+ stub = task.getHostStub();
+ try {
+ newBackup = stub.getRemoteBackup(myRank, 0);
+ // System.out.println("got new back normale for "+myRank);
+ } catch (Exception e) {
+ System.out.println("the node " + task.getHostIP()
+ + " does not answer");
+ // exit in order to get back and get a previous Backup
+ // since this one which failed was the most recent
+ return -1;
+ }
+ }
+
+ if (lastBackupRankConv == -1) {
+ newBackupConvg = null;
+ } else {
+ // get the lastBackupRank on the corresponding BackupNode
+ task = Register.Instance().getListeOfTasks().getTaskIdOfRank(
+ lastBackupRankConv);
+ stub = task.getHostStub();
+ try {
+
+ newBackupConvg = stub.getRemoteBackup(myRank, 1);
+ // System.out.println("got new back conv for "+myRank);
+ } catch (Exception e) {
+ System.out.println("the node " + task.getHostIP()
+ + " does not answer");
+ // exit in order to get back and get a previous Backup
+ // since this one which failed was the most recent
+ return -1;
+ }
+ }
+
+ // in JaceSession, clean the taskObject (Task)
+ // and the taskThread (Thread)
+ JaceSession.Instance().init();
+ // create 1 TaskLauncher that will (re)start the local task within a
+ // computing thread
+ TaskLauncher launcher = new TaskLauncher();
+ launcher.loadOrReloadTask(newBackup, newBackupConvg);
+ return 0;
+ }
+
+ public Backup getBackupTaskOfRank(int rank, int tag) {
+ int is = -1;
+ if (tag == 0) {
+ if (liste.isEmpty()) {
+ return null;
+ } else {
+ is = existBackupTaskOfRank(rank, tag);
+ if (is != -1) {
+ // System.out.println("chercher un backup normale");
+ return (Backup) liste.get(is);
+ } else {
+ System.out.println("cette tache ou ce backup existe po");
+ return null;
+ }
+ }
+ } else {
+ if (liste_Convg.isEmpty()) {
+ return null;
+ } else {
+ is = existBackupTaskOfRank(rank, tag);
+ if (is != -1) {
+ // System.out.println("chercher un backup convg");
+ return (Backup) liste_Convg.get(is);
+ } else {
+ System.out.println("cette tache ou ce backup existe po");
+ return null;
+ }
+ }
+ }
+ }
+
+ public synchronized Backup getBackupTaskAtIndex(int index, int tag) {
+ if (tag == 0)
+ if (index < liste.size()) {
+ return (Backup) liste.get(index);
+ } else {
+ System.out.println("cette task n'existe pas");
+ return null;
+ }
+ else if (index < liste_Convg.size()) {
+ return (Backup) liste_Convg.get(index);
+ } else {
+ System.out.println("cette task n'existe pas");
+ return null;
+ }
+ }
+}
--- /dev/null
+package jaceP2P;
+
+public class ForwardCount extends Thread {
+
+ public void run() {
+ SuperNodeListe.Instance().forwardCountNode();
+ }
+}
--- /dev/null
+package jaceP2P;
+
+import java.net.*;
+import java.io.*;
+
+public class HandleClient extends Thread {
+ Socket s;
+ ObjectInputStream in;
+ String name;
+
+ @SuppressWarnings("static-access")
+ public HandleClient(Socket s) {
+ try {
+ this.s = s;
+ in = new ObjectInputStream(s.getInputStream());
+ ObjectOutputStream out = new ObjectOutputStream(s.getOutputStream());
+ name = s.getInetAddress().getLocalHost().getHostName();
+ Register.Instance().getNodeOfName(name).setOutputStream(out);
+ } catch (Exception e) {
+ System.err.println("error in HandleClient Constructor: " + e);
+ e.printStackTrace(System.out);
+ }
+ }
+
+ public HandleClient(Socket s, int type) {
+ try {
+ this.s = s;
+ in = new ObjectInputStream(s.getInputStream());
+ } catch (Exception e) {
+ System.out.println("error in HandleClient Constructor: " + e);
+ e.printStackTrace(System.out);
+ }
+ }
+
+ public void run() {
+ // int j=0;
+ Message msg = null;
+ try {
+ while (s.isConnected()
+ && LocalHost.Instance().getStartedThreads() == true) {
+ Object x = in.readObject();
+ if (x instanceof Message) {
+ msg = (Message) x;
+ // try{
+ // msg=(Message)in.readObject();
+ if (msg.getTimeStep() == JaceSession.Instance()
+ .getTaskObject().timeStep)
+ // System.out.println("recieved message from "+msg.getSender().getHostName()+" tag="+msg.getSrc_tag()+" iter= "+msg.getSrc_iteration()+" "+j);
+ MsgQueue.Instance().add(msg);
+ // j++;
+ }
+ }
+
+ in.close();
+ s.close();
+ } catch (Exception e) {
+ // System.out.println("error in HandleClient run method :"+e);
+ try {
+
+ in.close();
+ Register.Instance().getNodeOfName(name).getOutputStream()
+ .close();
+ Register.Instance().getNodeOfName(name).setOutputStream(null);
+ s.close();
+ } catch (Exception e1) {
+ // System.out.println("unable to close socket in HandleClient run method :"+e);
+ }
+ }
+ }
+}
--- /dev/null
+package jaceP2P;
+
+public class HeartBeatSNode extends Thread {
+
+ // atributes
+ public static HeartBeatSNode Instance;
+
+ private JaceSuperNodeInterface server = null;
+ private int beat;
+ // private long time;
+
+ int count = 0;
+
+ // constructors
+ private HeartBeatSNode() {
+ }
+
+ public static HeartBeatSNode Instance() {
+ if (Instance == null) {
+ Instance = new HeartBeatSNode();
+ }
+ return Instance;
+ }
+
+ public void setHeartTime(int timeBeat) {
+ beat = timeBeat;
+ }
+
+ public int getHeartTime() {
+ return beat;
+ }
+
+ public void setServer(JaceSuperNodeInterface serverEntity) {
+ server = serverEntity;
+ count = 0;
+ // JaceBuffer.Instance().purge();
+ // MsgQueue.Instance().purge();
+
+ }
+
+ public JaceSuperNodeInterface getServer() {
+ return server;
+ }
+
+ public void run() {
+ // long timeGiven;
+ // long begin;
+ // long end;
+ while (true) {
+ try {
+ // each "time" milisecondes, get the register if it has changed
+ Thread.sleep(beat);
+ // System.out.println("sleeping for "+beat);
+ // time = System.currentTimeMillis();
+
+ server.beating(TokenThread.Instance().getToken());
+
+ yield();
+
+ } catch (Exception e) {
+ }
+
+ }
+ }
+}
--- /dev/null
+package jaceP2P;
+
+public class HeartBeatSpawner extends Thread {
+
+ // atributes
+ public static HeartBeatSpawner Instance;
+
+ private boolean running = true;
+ private int beat;
+ // private long time;
+ private JaceSpawnerInterface spawnerStub = null;
+
+ int count = 0;
+
+ // constructors
+ private HeartBeatSpawner() {
+ }
+
+ public static HeartBeatSpawner Instance() {
+ if (Instance == null) {
+ System.out.println("creating new HeartBeatSpawner ");
+ Instance = new HeartBeatSpawner();
+ }
+ return Instance;
+ }
+
+ public void setHeartTime(int timeBeat) {
+ beat = timeBeat;
+ }
+
+ public int getHeartTime() {
+ return beat;
+ }
+
+ public void setServer(JaceSpawnerInterface serverEntity) {
+ spawnerStub = serverEntity;
+ count = 0;
+ // JaceBuffer.Instance().purge();
+ // MsgQueue.Instance().purge();
+
+ }
+
+ public JaceSpawnerInterface getServer() {
+ return spawnerStub;
+ }
+
+ public void kill() {
+ running = false;
+ Instance = null;
+ }
+
+ public void run() {
+ // long timeGiven;
+ // long begin;
+ // long end;
+ while (running) {
+ try {
+ // each "time" milisecondes, get the register if it has changed
+ Thread.sleep(beat);
+ // System.out.println("sleeping for "+beat);
+ // time = System.currentTimeMillis();
+
+ spawnerStub.beating();
+
+ yield();
+
+ } catch (Exception e) {
+ }
+
+ }
+ }
+}
--- /dev/null
+package jaceP2P;
+
+public class HeartBeatThread extends Thread {
+
+ // atributes
+ public static HeartBeatThread Instance;
+
+ private Object server = null;
+ private int beat;
+ // private long time;
+ private JaceSuperNodeInterface superNodeStub = null;
+ // private JaceInterface stub = null;
+ private boolean running = true;
+ int count = 0;
+
+ // constructors
+ private HeartBeatThread() {
+ }
+
+ public static HeartBeatThread Instance() {
+ if (Instance == null) {
+ Instance = new HeartBeatThread();
+ }
+ return Instance;
+ }
+
+ public void setHeartTime(int timeBeat) {
+ beat = timeBeat;
+ }
+
+ public int getHeartTime() {
+ return beat;
+ }
+
+ public void setServer(Object serverEntity) {
+ server = serverEntity;
+ count = 0;
+ // JaceBuffer.Instance().purge();
+ // MsgQueue.Instance().purge();
+
+ }
+
+ public void kill() {
+ System.out.println("Killing HeartBeatThread ...");
+ running = false;
+ Instance = null;
+ }
+
+ public void run() {
+ // long timeGiven;
+ // long begin;
+ // long end;
+ int count = 0;
+ while (running) {
+ try {
+ // each "time" milisecondes, get the register if it has changed
+ Thread.sleep(beat);
+ // time = System.currentTimeMillis();
+ if (server instanceof JaceSuperNodeInterface) {
+
+ superNodeStub = (JaceSuperNodeInterface) server;
+ // begin = System.currentTimeMillis();
+ superNodeStub.beating(LocalHost.Instance().getStub());
+
+ } // else {
+
+ // stub = (JaceInterface) server;
+
+ // timeGiven = stub.beating(LocalHost.Instance().getStub());
+ // }
+
+ yield();
+
+ } catch (Exception e) {
+ try {
+ if (server instanceof JaceSuperNodeInterface) {
+ System.out.println("The SuperNode is Dead : " + e);
+ LocalHost.Instance().getStub().reconnectSuperNode();
+ } else {
+ // System.out.println("The spawner is Dead : " + e);
+ // LocalHost.Instance().getStub().reconnectSuperNode();
+ System.out
+ .println("The next node maybe dead!!!! count="
+ + count);
+ count++;
+ if (count > 3)
+ try {
+ int myRank;
+ TaskId id = Register.Instance()
+ .getListeOfTasks().getTaskIdOfHostStub(
+ LocalHost.Instance().getStub());
+ myRank = id.getRank();
+ Register newReg = Register.Instance()
+ .getSpawnerStub().getRegister(myRank);
+ if (newReg != null) {
+ Register.Instance().replaceBy(newReg);
+ TaskId neighbor = Register
+ .Instance()
+ .getListeOfTasks()
+ .getTaskIdOfRank(
+ (myRank + 1)
+ % Register
+ .Instance()
+ .getListeOfTasks()
+ .getSize());
+ server = (Object) neighbor.getHostStub();
+ count = 0;
+ } else
+ System.out
+ .println("The server returned a null register oh nooooooooooooooooooo");
+ } catch (Exception e2) {
+ System.err
+ .println("Unable to contact the Spawner :"
+ + e2);
+ }
+ }
+
+ yield();
+ } catch (Exception ex) {
+ System.err.println("Cannot reconnect to the SuperNode "
+ + ex);
+ }
+ }
+ count++;
+ }
+ }
+}
--- /dev/null
+package jaceP2P;
+
+import java.util.ArrayList;
+
+public class JaceBuffer {
+
+ // attributes
+ int id;
+ static int nb = 0;
+ private ArrayList<Message> liste;
+ boolean msgConsumed; // attribut utiliser ds JaceSender pr savoir si msg
+ // enlever de la liste de JaceBuffer
+ long time;
+ boolean stopGet = false;
+
+ // constructors
+
+ public JaceBuffer() {
+ /* liste=new Vector(); */
+ liste = new ArrayList<Message>();
+ msgConsumed = false;
+ time = 0;
+ id = nb;
+ nb++;
+ System.out.println("new JaceBuffer .... id=" + id + " .....");
+ }
+
+ // methods
+ public void purge() {
+
+ liste.clear();
+ }
+
+ // retourne l'index d'un Message de meme tag ET meme destinataire ET meme
+ // envoyeur que "msg"
+ private synchronized int exist(Message msg) {
+ int existe = -1;
+
+ int index = 0;
+ while ((existe == -1) && (index < liste.size())) {
+
+ if (msg.getReceiver().getRank() == (((Message) liste.get(index))
+ .getReceiver().getRank())) {
+ for (int i = 0; i < liste.size(); i++)
+ // System.out.println("exist id="+id+" element "+i+" to "+((Message)liste.get(i)).getReceiver().getRank());
+ existe = index;
+ } else
+ index++;
+ }
+
+ return existe;
+ }
+
+ public synchronized void add(Message msg) {
+ int is = -1;
+
+ synchronized (liste) {
+ is = exist(msg);
+
+ // si existe deja 1 Message de meme tag ET meme envoyeur ET meme
+ // destinataire, on l'ecrase
+ if (is != -1) {
+ liste.set(is, msg);
+
+ // System.out.println("id="+id+" remplacer un message a la place "+is+" ds le buffer pour "+msg.getReceiver().getRank()+" liste size "+liste.size());
+
+ } else {
+ // si existe pas de Message de meme tag ET meme envoyeur ET meme
+ // destinataire, on l'ajoute
+ liste.add(msg);
+ // System.out.println("id="+id+" ajouter un message au buffer pour "+msg.getReceiver().getRank()+" liste size "+liste.size());
+
+ }
+ }
+ try {
+ // notifyAll();
+ synchronized (this) {
+ notify();
+ }
+
+ } catch (Exception e) {
+ System.out.println("error notifying Sender :" + e);
+ }
+
+ }
+
+ public synchronized Message getMessageAt(int index) {
+ // System.out.println("size = " + liste.size());
+ return (Message) liste.get(index);
+ }
+
+ public synchronized Message get() {
+ msgConsumed = false;
+
+ // tant que aucun message, j'attend
+ while (liste.isEmpty() && stopGet == false) {
+ try {
+ // System.out.println("BUFFER : rien, j'attend, liste vide "+liste.isEmpty());
+ wait();
+ } catch (Exception e) {
+ }
+ ;
+ }
+ // System.out.println("Took message from Buffer");
+
+ Message tmp = null;
+ synchronized (liste) {
+
+ try {
+ tmp = (Message) ((Message) liste.get(0)).clone();
+ liste.remove(0);
+ // System.out.println("id="+id+" get message du buffer pour "+tmp.getReceiver().getRank()+" liste size "+liste.size());
+ } catch (Exception e) {
+ System.out.println("unable to get message :" + e);
+ }
+
+ }
+ msgConsumed = true;
+ time = System.currentTimeMillis();
+ return tmp;
+ }
+
+ public synchronized void viewAll() {
+ if (liste.isEmpty()) {
+ // System.out.println("pas de msg a envoyer");
+ } else {
+ Message msg;
+ TaskId sender, dest;
+ System.out.print("id=" + id + " nb msg ds JaceBuffer : "
+ + liste.size());
+ for (int i = 0; i < liste.size(); i++) {
+ msg = (Message) liste.get(i);
+ sender = msg.getSender();
+ dest = msg.getReceiver();
+ System.out.print("\nmsg " + i + " : tag = " + msg.getTag()
+ + ", src : " + sender.getRank() + " "
+ + sender.getHostIP() + ", dest : " + dest.getRank()
+ + " " + dest.getHostIP() + ", data = " + msg.getData()
+ + "\n");
+
+ }
+ }
+ }
+
+ public synchronized int getSize() {
+ return liste.size();
+ }
+
+}
--- /dev/null
+package jaceP2P;
+
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.rmi.Naming;
+
+import and.Mapping.Utils;
+
+public class JaceDaemon {
+ // attributes
+ public static JaceDaemon Instance = null;
+ private String snode_IP = null;
+ private int snode_port = 1098;
+ private int daemonPort = 1098;
+ private int heartTime; // HeartBeat frequency
+ private String protocol;
+ private boolean running = false;
+
+ public JaceDaemon(String superNode, int port, String comProtocol) {
+ if (!superNode.equals("-d")) {
+ // snode_IP = LocalHost.Instance().resolve(superNode); // get IP of
+ // the super node
+ snode_IP = superNode;
+ }
+
+ daemonPort = port;
+ protocol = comProtocol;
+ running = true;
+ Instance = this;
+
+ // initialize();
+ }
+
+ /**
+ *
+ **/
+ public JaceDaemon() {
+ }
+
+ /**
+ *
+ **/
+ public String getProtocol() {
+ return protocol;
+ }
+
+ /**
+ *
+ **/
+ public synchronized static JaceDaemon Instance() {
+ if (Instance == null) {
+ Instance = new JaceDaemon();
+ }
+
+ return Instance;
+ }
+
+ /**
+ *
+ **/
+ public boolean isRunning() {
+ return running;
+ }
+
+ /**
+ *
+ **/
+ public void initialize() {
+ LocalHost.Instance().setPort(daemonPort);
+ exportObject(); // Iinstanciate the JaceServer localy
+ reconnectSuperNode(); // Connect to one of the SuperNodes
+ if (protocol.equals("socket")) {
+ listenForRequests();
+ }
+
+ /*
+ * System.out.println("sleep de 5 secondes avt tuer Daemon"); try {
+ * Thread.sleep(5000); } catch(Exception e1) {}
+ *
+ *
+ * sortir();
+ */
+ }
+
+ /**
+ *
+ **/
+ public void listenForRequests() {
+ ServerSocket ss = null;
+
+ try {
+
+ ss = new ServerSocket(LocalHost.Instance().getSocketPort());
+
+ } catch (Exception e) {
+ System.err.println("Error initializing ServerSocket: " + e);
+ }
+
+ while (true) {
+ Socket s = null;
+ try {
+ // System.out.println( "Nouveau message" ) ;
+ s = ss.accept();
+ new HandleClient(s).start();
+ } catch (Exception e) {
+ System.err.println("Error handling Client: " + e);
+ }
+ }
+ }
+
+ /**
+ *
+ **/
+ private void exportObject() {
+ // if( protocol.equals( "rmi" ) ) {
+ JaceInterface ref = null;
+ JaceServer myServer = null;
+
+ //System.out.println("Name of local machine is : "
+ // + LocalHost.Instance().getName());
+ //System.out.println("IP of local machine is : "
+ // + LocalHost.Instance().getIP());
+
+ try {
+ // launch the JaceServer
+ myServer = new JaceServer( /* this */);
+ java.rmi.registry.LocateRegistry.createRegistry(daemonPort);
+ java.rmi.registry.LocateRegistry.getRegistry(daemonPort).rebind(
+ "JaceServer", myServer);
+ ref = (JaceInterface) Naming.lookup("rmi://"
+ + LocalHost.Instance().getIP() + ":" + daemonPort
+ + "/JaceServer");
+ } catch (Exception e) {
+ System.err
+ .println("JaceP2P_Error in JaceRuntime.exportObject() when creating the local JaceServer "
+ + e);
+ System.err.println("Exit from JaceRuntime.exportObject");
+ System.exit(1);
+ }
+
+ LocalHost.Instance().setStub(ref);
+ // }
+ }
+
+ /**
+ *
+ **/
+ @SuppressWarnings("unused")
+ private void sortir() {
+ if (protocol.equals("rmi")) {
+ try {
+ java.rmi.registry.LocateRegistry.getRegistry(daemonPort)
+ .unbind("JaceServer");
+ //System.out.println("Unbind done !!!!!!!!!!!1");
+ } catch (Exception e) {
+ System.err
+ .println("JaceP2P_Error in JaceRuntime.sortir() when unbind "
+ + e);
+ }
+ }
+
+ }
+
+ /**
+ *
+ **/
+ public void reconnectSuperNode() {
+
+ //System.out.println("I'm looking for a JaceP2P Super Node");
+ //System.out.println(protocol);
+ // if( protocol.equals( "rmi" ) ) {
+ JaceSuperNodeInterface snodeStub = null;
+ boolean connected = false;
+
+ // while( connected == false ) {
+ if (snode_IP != null) {
+ try {
+ //System.out.println("Trying to invoke Super Node " + snode_IP);
+ snodeStub = (JaceSuperNodeInterface) Naming.lookup("rmi://"
+ + snode_IP + ":" + snode_port + "/JaceSuperNode");
+ //System.out.println("Succesfully located " + snode_IP);
+
+ // Add stub and IP in LocalHost to store it until super node
+ // death
+ LocalHost.Instance().setSuperNodeStub(snodeStub);
+ LocalHost.Instance().setSuperNodeIP(snode_IP);
+ connected = true;
+
+ } catch (Exception e) {
+ //System.out
+ // .println("Snode not launched, try another one (1/2s)");
+ try {
+ Thread.sleep(500);
+ } catch (Exception e1) {
+ // nothing
+ }
+ }
+ }
+
+ if (connected == false) {
+ int i = 0;
+
+ SuperNodeListe.Instance().staticInitialization();
+
+ while (connected == false
+ && i < SuperNodeListe.Instance().getListe().size()) {
+ SuperNodeData d = null;
+ d = SuperNodeListe.Instance().getSuperNodeData(i);
+ snode_IP = LocalHost.Instance().resolve(d.getIP());
+ snode_port = d.getPort();
+
+ try {
+ //System.out.println("Trying to invoke super node "
+ // + snode_IP);
+ snodeStub = (JaceSuperNodeInterface) Naming.lookup("rmi://"
+ + snode_IP + ":" + snode_port + "/JaceSuperNode");
+ // System.out.println( "succesfully located " + snode_IP ) ;
+
+ // Add stub and IP in LocalHost to store it until super node
+ // death
+ LocalHost.Instance().setSuperNodeStub(snodeStub);
+ LocalHost.Instance().setSuperNodeIP(snode_IP);
+ connected = true;
+
+ } catch (Exception e) {
+ //System.out.println("Snode " + snode_IP
+ // + " not launched, try another one (1/2s)");
+ i++;
+
+ try {
+ Thread.sleep(500);
+ } catch (Exception e1) {
+ // nothing
+ }
+
+ // If error, exit the loop and reenter in order to find
+ // another Snode
+ // continue ;
+ }
+ }
+ }
+
+ if (connected == false) {
+ System.err
+ .println("All the SuperNodes in the list are dead, unable to connect to the platform");
+ System.exit(1);
+ } else {
+
+ // Registering to the Super Node
+ try {
+ heartTime = snodeStub.getSuperNodeBeat();
+
+ // snodeStub.workerRegistering( LocalHost.Instance().getStub(),
+ // LocalHost.Instance().getIP(),
+ // LocalHost.Instance().getName(),
+ // daemonPort ) ;
+
+ snodeStub.workerRegistering(LocalHost.Instance().getStub(),
+ LocalHost.Instance().getIP(), LocalHost.Instance()
+ .getName(), daemonPort, Utils.createGNode());
+
+ // Launching the heart beat thread
+ HeartBeatThread.Instance().setHeartTime(heartTime);
+
+ //System.out.println("Starting to ping the Super Node");
+ HeartBeatThread.Instance().setServer((Object) snodeStub);
+
+ if (HeartBeatThread.Instance().isAlive() == false) {
+ // HeartBeatThread.Instance().setPriority(Thread.MAX_PRIORITY);
+ HeartBeatThread.Instance().start(); // Starting the beating
+ }
+
+ //System.out.println("Succesfully registered to Super Node "
+ // + snode_IP);
+
+ } catch (Exception e) {
+ System.err
+ .println("JaceP2P_Error in JaceDamon reconnectSuperNode when invoking function workerRegistering() : "
+ + e);
+ reconnectSuperNode();
+ }
+ }
+ // }
+ // }
+ }
+
+ /**
+ *
+ **/
+ public void reinitDaemon() {
+
+ if (JaceDaemon.Instance().getProtocol().equals("rmi")) {
+ SenderRmi.Instance().getBuffer().stopGet = true;
+
+ try {
+ SenderRmi.Instance().notify();
+ } catch (Exception e) {
+ // nothing
+ }
+
+ SenderRmi.Instance().kill();
+
+ } else {
+ SenderSocket.Instance().getBuffer().stopGet = true;
+
+ try {
+ SenderSocket.Instance().notify();
+ } catch (Exception e) {
+ // nothing
+ }
+
+ SenderSocket.Instance().kill();
+ }
+
+ //System.out.println("$$$$$$$$$$ Killed Sender $$$$$$$$$$");
+ ScanThread.Instance().kill();
+ LocalHost.Instance().setStartedThreads(false);
+ System.out.println("Reinitialization of the Daemon");
+
+ System.out.println("I kill the application if any exists");
+ // ScanThread.Instance().setScanning( false ) ;
+ // System.out.println( "Set ScanThread off" ) ;
+ // Sender.Instance().kill() ;
+
+ try {
+ Thread.sleep(2500);
+ } catch (Exception e) {
+ // nothing
+ }
+
+ // Sender.Instance().running = false ;
+ try {
+ // Cleaning JaceBuffer, MsgQueue, Register
+ // and deleting taskObject and taskThread
+ JaceSession.Instance().kill(); // Also to do when a node has been
+ // down, in the case he is back
+ System.out.println("Application killed properly");
+ BackupsManager.Instance().purge();
+ Register.Instance().purge();
+
+ } catch (Exception e) {
+ System.err.println("Crashed killing the application : " + e);
+ }
+
+ System.out.println("Daemon reinitialized");
+ // Runtime.getRuntime().gc() ;
+ reconnectSuperNode();
+ }
+
+}
+
+/** ! **/
+
--- /dev/null
+package jaceP2P;
+
+import java.rmi.Remote;
+import java.rmi.RemoteException;
+import java.util.Vector;
+
+
+
+public interface JaceInterface extends Remote {
+ public void reconnectSuperNode() throws RemoteException;
+
+ public int updateRegister(Register newReg, JaceInterface stub, int req)
+ throws RemoteException;
+
+ public Vector<?> getIterationOfBackup(int remoteRank, int tag)
+ throws RemoteException;
+
+ public Backup getRemoteBackup(int remoteRank, int tag)
+ throws RemoteException;
+
+ public void suicide(String debugMsg) throws RemoteException;
+
+ public void iSendYou(Message msg) throws RemoteException;
+
+ public int getTimeStep() throws RemoteException;
+
+ public void saveTask(int rank, byte[] tsk, int iteration, int timeStep,
+ String appliName, int tag) throws RemoteException;
+
+ public void setSaved(boolean bool) throws RemoteException;
+
+ public boolean getReloading() throws RemoteException;
+
+ public int getVerifNum() throws RemoteException;
+
+ public String getState() throws RemoteException;
+
+ public void initializeVerif(int tag) throws RemoteException;
+
+ public void savOrFinOrRest(int tag, int step, boolean verd,
+ Vector<?> reduceAll) throws RemoteException;
+
+ public boolean setNbNeighboursNotConv(int tag, int idNeigh,
+ int neighborTimeStep) throws RemoteException;
+
+ public int getNbNeighboursNotConv() throws RemoteException;
+
+ public void response(int neighId, int tag, int response,
+ Vector<?> recievedValue) throws RemoteException;
+
+ public boolean ping() throws RemoteException;
+
+ public void updateHeart(JaceInterface stub) throws RemoteException;
+
+ public void updateHeart(JaceSuperNodeInterface stub) throws RemoteException;
+
+ public long beating(JaceInterface stub) throws RemoteException;
+
+ public void setScanning(boolean bool) throws RemoteException;
+
+ public JaceSpawnerInterface transformIntoSpawner(String[] params,
+ String appliName, Register reg, int nbTasks,
+ JaceSuperNodeInterface snodeStub, int rank, int heartTime, int tag,
+ int nbdc, int nbsdc, int nbDaemonPerSpawner, int nbDaemonPerThread)
+ throws RemoteException;
+
+ public void setSpawner(JaceSpawnerInterface spawnerStub)
+ throws RemoteException;
+
+ public void updateRegister(Node oldNode, Node node) throws RemoteException;
+
+ public void getBackupForNewNode(int rank) throws RemoteException;
+}
--- /dev/null
+// Primary class to launch JaceP2P
+
+// jaceP2P.JaceP2P -[Daemon|Spawner|SNode] <Option>
+
+package jaceP2P;
+
+public class JaceP2P {
+
+ static String[] param;
+
+ public static void main(String[] args) {
+
+ if (args.length < 4) {
+ usage();
+ } else {
+ String entity = args[0]; // entity : "-Daemon" or "-Spawner" or
+ // "-SNode"
+ int port = -1;
+
+ String superNodeName = args[1];
+ String comProtocol = args[3];
+ try {
+ port = new Integer(args[2]).intValue();
+
+ } catch (NumberFormatException e) {
+ System.out.println("the port must be an integer: " + e);
+ System.exit(0);
+ }
+ if (entity.equals("-Daemon")) {
+ System.out
+ .println("\n*************** LAUNCHING ---- DAEMON *****");
+ JaceDaemon daemon = new JaceDaemon(superNodeName, port,
+ comProtocol);
+ daemon.initialize();
+ } else if (entity.equals("-Spawner")) {
+ if (args.length > 9) {
+ System.out
+ .println("\n*************** LAUNCHING ---- SPAWNER *******************");
+ int nbDaemonPerSpawner = -1;
+ int nbDaemonPerThread = -1;
+ int nbSavingNodes = -1;
+ int algoMapping = -1;
+ double paramAlgo = 0.5 ;
+
+ try {
+ nbDaemonPerSpawner = new Integer(args[4]).intValue();
+ nbDaemonPerThread = new Integer(args[5]).intValue();
+ nbSavingNodes = new Integer(args[6]).intValue();
+ algoMapping = new Integer(args[7]).intValue();
+ paramAlgo = new Double(args[8]).doubleValue();
+ } catch (NumberFormatException e) {
+ System.out.println("the number of Daemons per spawner and the number of daemons per thread must be integers: "
+ + e);
+ System.exit(0);
+ }
+ //System.out.println("=====> " + algoMapping);
+ param = new String[args.length - 9];
+ for (int i = 9; i < args.length; i++) {
+ param[i - 9] = args[i];
+ System.out.println("=> " + args[i]);
+ }
+ JaceSpawner spawner = new JaceSpawner(superNodeName, port,
+ comProtocol, param, nbDaemonPerSpawner,
+ nbDaemonPerThread, nbSavingNodes, algoMapping, paramAlgo);
+ spawner.initialize();
+ }
+
+ else
+ usage();
+
+ } else if (entity.equals("-SNode"))
+ if (args.length > 4) {
+ System.out
+ .println("\n*************** LAUNCHING ---- SUPER-NODE *******************");
+ int beat = -1;
+ try {
+ beat = new Integer(args[4]).intValue();
+ } catch (NumberFormatException e) {
+ System.out
+ .println("the beat number must be an integer: "
+ + e);
+ System.exit(0);
+ }
+ JaceSuperNode superNode = new JaceSuperNode(superNodeName,
+ port, comProtocol, beat);
+ superNode.initialize();
+ } else
+ usage();
+
+ else
+ usage();
+
+ }
+ }
+
+ public static void usage() {
+ System.out
+ .println("Usage: java jaceP2P.JaceP2P -[Daemon|Spawner|SNode] <Option> ");
+ System.out.println("");
+ System.out
+ .println("-Daemon : <option> = [superNodeName] [daemonPort] [protocol]");
+ System.out
+ .println("ex : java jaceP2P.JaceP2P -Daemon cluster1 1098 rmi");
+ System.out.println("");
+ System.out
+ .println("-SNode : <option> = [superNodeName] [superNodePort] [protocol] [timeHeartBeat]");
+ System.out
+ .println("ex : java jaceP2P.JaceP2P -SNode cluster1 1098 rmi 500");
+ System.out.println("");
+ System.out
+ .println("-Spawner : <option> = [superNodeName] [SpawnerPort] [protocol] nbDaemonPerSpawner nbDaemonPerThread nbresave nbTask URLToTaskName <TaskParam>");
+ System.out
+ .println("ex : java jaceP2P.JaceP2P -Spawner cluster1 1098 rmi 8 4 3 16 http://bilbo/staff/vuillemi/java/LinSolv2 1000 0 20000 5");
+ System.out
+ .println("ex : java jaceP2P.JaceP2P -Spawner cluster1 1098 rmi 8 4 3 16 file:/home/vuillemi/java/LinSolv2 1000 0 20000 5");
+ System.exit(0);
+ }
+}
--- /dev/null
+package jaceP2P;
+
+import java.rmi.RemoteException;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.Vector;
+
+public class JaceServer extends UnicastRemoteObject implements JaceInterface {
+
+ private static final long serialVersionUID = 1L;
+
+ public JaceServer() throws RemoteException {
+ super();
+ }
+
+ public void reconnectSuperNode() throws RemoteException {
+ JaceDaemon.Instance().reconnectSuperNode();
+ }
+
+ // when a daemon replaces a new one it asks for the backups
+ public void getBackupForNewNode(int rank) throws RemoteException {
+ JaceSession.Instance().getTaskObject().getBackupForNewNode(rank);
+ }
+
+ public void setSpawner(JaceSpawnerInterface spawnerStub)
+ throws RemoteException {
+ Register.Instance().setSpawnerStub(spawnerStub);
+ }
+
+ public JaceSpawnerInterface transformIntoSpawner(String[] params,
+ String appliName, Register reg, int nbTasks,
+ JaceSuperNodeInterface snodeStub, int rank, int heartTime, int tag,
+ int nbdc, int nbsdc, int nbDaemonPerSpawner, int nbDaemonPerThread)
+ throws RemoteException {
+ System.out.println("beginning the transformation .........");
+// @SuppressWarnings("unused")
+// JaceSpawner spawner = new JaceSpawner(params, appliName, reg, nbTasks,
+// snodeStub, rank, heartTime, tag, nbdc, nbsdc,
+// nbDaemonPerSpawner, nbDaemonPerThread);
+ HeartBeatThread.Instance().kill();
+ return Register.Instance().getSpawnerStub();
+ }
+
+ public synchronized int updateRegister(Register newReg,
+ JaceInterface voisinStub, int req) throws RemoteException {
+
+ // If beginning of appli, tell nodes to beat the next neighbor
+ // and no longer the SuperNode
+
+ System.out.println("I change to ping a Daemon");
+ HeartBeatThread.Instance().setServer((Object) voisinStub);
+
+ Calendar cal = new GregorianCalendar();
+ System.out.println("at time=" + cal.get(Calendar.MINUTE) + ":"
+ + cal.get(Calendar.SECOND));
+ System.out.println("name of spawner: "
+ + newReg.getSpawnerStub().getName());
+ // 1 - replace the old Register by the new one
+ System.out.println(" \n\n NEW REGISTER \n\n");
+ // if(Register.Instance().getVersion()<=newReg.getVersion()){
+ System.out.println("replacing Register !!!!!!$$$$$$$$******");
+ Register.Instance().replaceBy(newReg);
+ Register.Instance().viewAll();
+ // }
+
+ // initialise the BackupsManager if not exists (i.e. myRank = -1)
+ // which means that not init so first reload
+ // try {
+ // if (BackupsManager.Instance().getMyRank() == -1) {
+ // 1 - create the BackupsManager
+ // 2 - get an eventual Backup for my Task
+ // 3 - restart it if any
+ // BackupsManager.Instance().initialize();
+
+ new InitiateAppli(req).start();
+ // }
+ return 0;
+ }
+
+ public void updateRegister(Node oldNode, Node node) throws RemoteException {
+
+ // if(tag==1)
+ // HeartBeatThread.Instance().setServer((Object)node.getStub());
+ // 1 - replace the old Register by the new one
+ Calendar cal = new GregorianCalendar();
+ System.out.println("at time=" + cal.get(Calendar.MINUTE) + ":"
+ + cal.get(Calendar.SECOND));
+ System.out.println("Modify REGISTER");
+ try {
+ // System.out.println("blablablablabalablablabalbalab");
+ System.out.println("oldName=" + oldNode.getName());
+ boolean b = Register.Instance().removeNodeOfName(oldNode.getName());
+ if (b)
+ System.out.println("removed old Node!!!!!");
+ else
+ System.out.println("didn't find old Node!!!!");
+
+ Register.Instance().addNode(node);
+ Register.Instance().viewAll();
+ TaskId myTaskId = null;
+ myTaskId = Register.Instance().getListeOfTasks()
+ .getTaskIdOfHostStub(oldNode.getStub());
+ myTaskId.setHostIP(node.getIP());
+ myTaskId.setHostName(node.getName());
+ // Register.Instance().setVersion(version);
+ myTaskId.setHostStub(node.getStub());
+
+ } catch (Exception e) {
+ System.out.println("error in updateregister :" + e);
+ // e.printStackTrace();
+ }
+ }
+
+ public Vector<?> getIterationOfBackup(int remoteRank, int tag)
+ throws RemoteException {
+ Backup b = BackupsManager.Instance().getBackupTaskOfRank(remoteRank,
+ tag);
+ Vector<?> result = b.getIterationStep();
+ return result;
+ }
+
+ public Backup getRemoteBackup(int remoteRank, int tag)
+ throws RemoteException {
+ Backup b = BackupsManager.Instance().getBackupTaskOfRank(remoteRank,
+ tag);
+ return b;
+ }
+
+ public void suicide(String debugMsg) throws RemoteException {
+ System.out.println("suiciiiiiiiiiiiide : "/* + debugMsg */);
+ new ReinitDaemon().start();
+ }
+
+ public void iSendYou(Message msg) throws RemoteException {
+ // on met le Message ds le MsgQueue correspondant ma tache
+ // DS CAS ASYNC, on supprime un msg de meme tag ET meme sender
+ // System.out.println("get msg from MSGQueue sent from "+msg.getSender().getRank());
+ MsgQueue.Instance().add(msg);
+ }
+
+ public int getTimeStep() throws RemoteException {
+ return JaceSession.Instance().getTaskObject().timeStep;
+ }
+
+ public void saveTask(int rank, byte[] tsk, int iteration, int timeStep,
+ String appliName, int tag) throws RemoteException {
+
+ while (JaceSession.Instance().getTaskObject().reloading == true)
+ try {
+ Thread.sleep(10);
+ } catch (Exception e) {
+ }
+ Backup back = BackupsManager.Instance().getBackupTaskOfRank(rank, tag);
+ if (back != null) {
+ if (back.getStep() < timeStep
+ || (back.getStep() == timeStep && back.getIteration() < iteration)) {
+ back.setIteration(iteration);
+ // System.out.print("\n\n they put in me a backup with iter="+back.getIteration()+" for the node of rank="+rank+"\n\n");
+ back.setStep(timeStep);
+ back.setData(tsk);
+
+ /*
+ * try{
+ *
+ * TaskId recev=null; recev =
+ * Register.Instance().getListeOfTasks().getTaskIdOfRank(rank);
+ * JaceInterface stub; stub=recev.getHostStub();
+ * stub.setSaved(true);
+ *
+ * }catch(Exception e){
+ * System.out.println("unable to acknowledge receiving the backup :"
+ * +e); }
+ */
+ }
+ } else {
+ System.out.println("No task at this rank");
+ // TODO : what to do ?
+ }
+
+ }
+
+ public void setSaved(boolean bool) throws RemoteException {
+ JaceSession.Instance().getTaskObject().setSaved(bool);
+ }
+
+ public boolean getReloading() throws RemoteException {
+ return JaceSession.Instance().getTaskObject().reloading;
+ }
+
+ public int getVerifNum() throws RemoteException {
+ return JaceSession.Instance().getTaskObject().verifNum;
+ }
+
+ public String getState() throws RemoteException {
+ return JaceSession.Instance().getTaskObject().state;
+ }
+
+ public void initializeVerif(int tag) throws RemoteException {
+ JaceSession.Instance().getTaskObject().initializeVerif(tag);
+ }
+
+ public void savOrFinOrRest(int tag, int step, boolean verd,
+ Vector<?> recievedValue) throws RemoteException {
+ JaceSession.Instance().getTaskObject().savOrFinOrRest(tag, step, verd,
+ recievedValue);
+ }
+
+ public synchronized boolean setNbNeighboursNotConv(int tag, int idNeigh,
+ int neighborTimeStep) throws RemoteException {
+ return JaceSession.Instance().getTaskObject().setNbNeighboursNotConv(
+ tag, idNeigh, neighborTimeStep);
+ }
+
+ public synchronized int getNbNeighboursNotConv() throws RemoteException {
+ return JaceSession.Instance().getTaskObject().nb_not_recv;
+ }
+
+ public synchronized void response(int neighId, int tag, int response,
+ Vector<?> recievedValue) throws RemoteException {
+ JaceSession.Instance().getTaskObject().response(neighId, tag, response,
+ recievedValue);
+ }
+
+ public boolean ping() throws RemoteException {
+ // System.out.println("pingggggggggggg");
+ return true;
+ }
+
+ public void updateHeart(JaceInterface stub) throws RemoteException {
+ System.out.println("I change to ping a Daemon");
+ HeartBeatThread.Instance().setServer((Object) stub);
+ }
+
+ public void updateHeart(JaceSuperNodeInterface stub) throws RemoteException {
+ System.out.println("I change to ping a superNode");
+ HeartBeatThread.Instance().setServer((Object) stub);
+ }
+
+ public long beating(JaceInterface workerStub) throws RemoteException {
+ Node noeud = Register.Instance().getNodeOfStub(workerStub);
+ if (noeud != null) {
+ noeud.setAliveFlag(true);
+ noeud.setAliveTime();
+ // System.out.println("beating$$$$$$$$$");
+ return noeud.getAliveTime();
+
+ } else {
+ // System.out.println("le noeud " + workerIP +
+ // " est pas ds la liste des noeuds connectes");
+ return -1;
+ }
+ }
+
+ public void setScanning(boolean bool) throws RemoteException {
+ // if(bool==true){
+ try {
+ ScanThread.Instance().setScanning(bool);
+ synchronized (ScanThread.Instance()) {
+ ScanThread.Instance().notify();
+ }
+ } catch (Exception e) {
+ System.out.println("error in setScanning: " + e);
+ }
+ // System.out.println("before notify!!!!!!!!!");
+ // try{ScanThread.Instance().notify();}
+ // catch(Exception e){
+ // System.out.println("error in notify:"+e);
+ // }
+ // System.out.println("notify!!!!!!!!!");
+ // }
+ // else
+ // ScanThread.Instance().setScanning(false);
+ }
+
+ class InitiateAppli extends Thread {
+ int req;
+
+ public InitiateAppli(int req) {
+ this.req = req;
+ }
+
+ public void run() {
+ BackupsManager.Instance().initialize(req);
+
+ }
+ }
+
+ class ReinitDaemon extends Thread {
+
+ public ReinitDaemon() {
+ }
+
+ public void run() {
+ JaceDaemon.Instance().reinitDaemon();
+ }
+ }
+}
--- /dev/null
+package jaceP2P;
+
+public class JaceSession {
+
+ // attributes
+ private static JaceSession Instance;
+ private/* static */Task taskObject = null;
+ private/* static */Thread taskThread = null;
+
+ // constructors
+ private JaceSession() {
+ }
+
+ public static JaceSession Instance() {
+ if (Instance == null) {
+ Instance = new JaceSession();
+ }
+ return Instance;
+ }
+
+ // nettoie les files de comm si pas vides
+ public synchronized void init() {
+ // JaceBuffer.Instance().purge();
+ // MsgQueue.Instance().purge();
+ taskObject = null;
+ taskThread = null;
+ }
+
+ public synchronized void addTaskObject(Task obj) {
+ taskObject = obj;
+ }
+
+ public synchronized Task getTaskObject() {
+ return taskObject;
+ }
+
+ public void addTaskThread(Thread th) {
+ taskThread = th;
+ }
+
+ public Thread getTaskThread() {
+ return taskThread;
+ }
+
+ @SuppressWarnings("deprecation")
+ public void deleteTaskThread() {
+ try {
+ // not to put this because it crashes !!!!
+ // taskThread.yield();
+ // taskThread.interrupt();
+ System.out.println("I m going to kill the thread");
+ // taskThread.join();
+ taskThread.stop();
+ System.out.println("thread stopped without pb!!!!!!!");
+ taskThread = null;
+ } catch (Exception e) {
+ System.out.println("ERROR : thread not join : " + e);
+ System.out.println("I m going to kill the thread");
+ try {
+ taskThread.stop();
+ System.out.println("thread stopped after a problem !!!!!!!");
+ taskThread = null;
+ } catch (Exception e2) {
+ System.out.println("ERROR thread not killed : " + e);
+ }
+
+ }
+ }
+
+ // clean JaceBuffer, MsgQueue, Register
+ // and delete taskObject and taskThread
+ public synchronized void kill() {
+ // JaceBuffer.Instance().purge();
+ MsgQueue.Instance().purge();
+
+ Register.Instance().purge();
+
+ taskObject = null;
+ deleteTaskThread();
+ System.out.println("I SET MY TASK at NUULL");
+
+ System.out.println("Size of MsgQueue : "
+ + MsgQueue.Instance().getSize());
+
+ Instance = null;
+ }
+}
--- /dev/null
+package jaceP2P;
+
+import java.rmi.Naming;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.Vector;
+
+public class JaceSpawner {
+ private Class<?> c;
+ private Loader load;
+ private Task tache = null;
+ public static JaceSpawner Instance;
+ private static String superNode_IP = null;
+ private int superNode_port = 1098;
+ private static int spawnerPort = 1099;
+ private static JaceSuperNodeInterface centralServer = null;
+ private JaceSpawnerInterface spawnerRef = null;
+ private int nbTasks;
+ private String appliName;
+ private String[] params = null;
+ @SuppressWarnings("unused")
+ private String protocol;
+ // private int registerVersion=0;
+ final int NB_HEART_DECONNECT = 3;
+ private int heartTime; // frequency of heartBeat
+ @SuppressWarnings("unused")
+ private int timeBeforeKill; // wait 3 non-response of heartBeat before
+ // considering de Daemon as dead
+ private boolean broadcasting = false;
+ @SuppressWarnings("unused")
+ private int z = 0;
+ private static int nbOfDaemonsPerSpawner;
+ private static int nbOfDeamonsPerThread;
+ private Vector<Object> spawnersList;
+ private int rank;
+ private int nbSavingNodes;
+
+ // Variables for Mapping
+ private int algo;
+ private double paramAlgo ;
+
+ public JaceSpawner(String superNode, int port, String comProtocol,
+ String[] args, int nbDaemonPerSpawner, int nbDaemonPerThread,
+ int nbSavingNodes, int _algo, double _paramAlgo) {
+ // superNode_IP = LocalHost.Instance().resolve(superNode);
+ algo = _algo;
+ paramAlgo = _paramAlgo ;
+
+ superNode_IP = superNode;
+ spawnerPort = port;
+ protocol = comProtocol;
+ nbOfDaemonsPerSpawner = nbDaemonPerSpawner;
+ nbOfDeamonsPerThread = nbDaemonPerThread;
+ this.nbSavingNodes = nbSavingNodes;
+ if (args.length < 2)
+ {
+ // if less than 2 params (nb of tasks and name of the appli), error
+ System.err.println( "Parameters error !" ) ;
+ System.exit( 1 ) ;
+ } else {
+ try {
+ nbTasks = new Integer(args[0]).intValue(); // nb of tasks
+ // launched by the
+ // spawner
+ } catch (Exception e) {
+ System.err.println("Number format exception :" + e ) ;
+ System.exit( 1 ) ;
+ }
+ appliName = args[1]; // name of the class to launch
+ if (args.length > 2) { // get the eventual param of the appli
+ params = new String[args.length - 2];
+ for (int i = 0; i < params.length; i++) {
+ params[i] = args[2 + i];
+ }
+ }
+ load = new Loader();
+ c = load.load(appliName);
+ try {
+ tache = ((Task) c.newInstance());
+ tache.setParam(params);
+ tache.setJaceSize(nbTasks);
+
+ // ****************//
+ //tache.printDep();
+ } catch (Exception e) {
+ System.err.println( "Unable to instantiate the class " + e ) ;
+ System.exit( 1 ) ;
+ }
+ }
+
+ Instance = this;
+ }
+
+
+
+ public JaceSpawner(String[] params, String appliName, Register reg,
+ int nbTasks, JaceSuperNodeInterface snodeStub, int rank,
+ int heartTime, int tag, int nbdc, int nbsdc,
+ int nbDaemonPerSpawner, int nbDaemonPerThread) {
+ try {
+ nbOfDaemonsPerSpawner = nbDaemonPerSpawner;
+ nbOfDeamonsPerThread = nbDaemonPerThread;
+ if (params.length != 0) {
+ this.params = new String[params.length];
+ for (int i = 0; i < params.length; i++)
+ this.params[i] = params[i];
+ } else {
+ params = null;
+ System.err.println( "There is no parameter !" ) ;
+ }
+ } catch (Exception e) {
+ System.err.println("Error in copying the parameters: " + e ) ;
+ }
+ // System.out.println("xxxxxxxxxxxxxxx reg size="+reg.getSize()+" xxxxxxxxxxxxxx");
+ this.appliName = appliName;
+
+ this.nbTasks = nbTasks;
+ this.heartTime = heartTime;
+ LocalHost.Instance().setSuperNodeStub(snodeStub);
+ centralServer = snodeStub;
+ exportObject();
+ Register.Instance().replaceBy(reg);
+ Register.Instance().setSpawnerStub(this.spawnerRef);
+ Register.Instance().getListeOfTasks().viewAll();
+
+ this.rank = rank;
+ load = new Loader();
+ c = load.load(appliName);
+ try {
+ tache = ((Task) c.newInstance());
+ tache.setParam(params);
+ tache.setJaceSize(nbTasks);
+ // ****************//
+ tache.printDep();
+ } catch (Exception e) {
+ System.err.println("Unable to instantiate the class " + e);
+ }
+ RunningApplication.Instance().getChrono().start();
+
+ RunningApplication.Instance().setName(appliName);
+ RunningApplication.Instance().setNbTasks(nbTasks);
+ RunningApplication.Instance().setRunning(true);
+ RunningApplication.Instance().setNumberOfDisconnections(nbdc);
+ RunningApplication.Instance().setNumberOfSpawnerDisconnections(nbsdc);
+ // System.out.println("+++++++++++++++++++++++++");
+ Instance = this;
+ // if(tag==0)
+ broadcastRegister(1);
+ /*
+ * else{ int
+ * x=Register.Instance().getListeOfTasks().getSize()/nbOfDaemonsPerSpawner
+ * ; int s; if(rank==x)
+ * s=(reg.getListeOfTasks().getSize()%nbOfDaemonsPerSpawner
+ * )/nbOfDeamonsPerThread; else
+ * s=nbOfDaemonsPerSpawner/nbOfDeamonsPerThread;
+ *
+ * int debut=nbOfDaemonsPerSpawnerrank;
+ *
+ *
+ * for(int i=0;i<s+1;i++){
+ *
+ * new BroadcastSpawner(i,
+ * debut,nbOfDaemonsPerSpawner,nbOfDeamonsPerThread).start(); }
+ *
+ * }
+ */
+ System.out.println("########################");
+ }
+
+ public synchronized static JaceSpawner Instance() {
+ return Instance;
+ }
+
+ public int getNbOfDeamonsPerThread() {
+ return nbOfDeamonsPerThread;
+ }
+
+ public int getNbOfDeamonsPerSpawner() {
+ return nbOfDaemonsPerSpawner;
+ }
+
+ public void startProcess(Vector<Object> spawnersList) {
+ this.spawnersList = spawnersList;
+ int is = spawnersList.indexOf((Object) Register.Instance()
+ .getSpawnerStub());
+ if (is != -1) {
+ int nextNeighbour;
+ if (is == spawnersList.size() - 1)
+ nextNeighbour = 0;
+ else
+ nextNeighbour = is + 1;
+ /*
+ * while((spawnersList.elementAt(nextNeighbour) instanceof Node))
+ * try{
+ * System.out.println("waiting till transform of spawner "+nextNeighbour
+ * +" is finished, for setServer"); Thread.sleep(20);
+ * }catch(Exception e1){}
+ */
+ HeartBeatSpawner.Instance().setServer(
+ (JaceSpawnerInterface) spawnersList.get(nextNeighbour));
+ HeartBeatSpawner.Instance().setHeartTime(heartTime);
+ HeartBeatSpawner.Instance().start();
+ int previousNeighbour;
+ if (is == 0)
+ previousNeighbour = spawnersList.size() - 1;
+ else
+ previousNeighbour = is - 1;
+ ScanThreadSpawner.Instance().setHeartTime(heartTime);
+ ScanThreadSpawner.Instance().setServer(
+ (JaceSpawnerInterface) spawnersList.get(previousNeighbour));
+ ScanThreadSpawner.Instance().start();
+
+ broadcastScanning();
+// System.out.println("apres broadcastScanning");
+ new StartScanning().start();
+ } else {
+ System.err.println("Cannot find myself in the spawnersList !");
+ }
+
+ }
+
+ public void setBroadcasting(boolean bool) {
+ broadcasting = bool;
+ }
+
+ public void initialize() {
+ // if(protocol.equals("rmi")){
+ // launch the JaceSpawnerServer
+ exportObject();
+
+ connectSuperNode();
+
+ // get a Register on the Super Node
+ // completed with the required number of Daemons
+ getRegisterOnSuperNode();
+
+ createAppli();
+ createSpawnerNetwork();
+
+ // }
+ }
+
+ public void startScanning() {
+ //int res;
+ long time = RunningApplication.Instance().getChrono().getValue() / 1000;
+ System.out.println("Start scanning at time: " + time + "s");
+ // lancer le chrono qui gere les heart beat
+ while (RunningApplication.Instance().isRunning() == true) {
+ // 1 etape : scaner tous les "heartTime" milisecondes si les noeuds
+ // enregistes sont encore vivants
+ // res = scanConnectedHosts();
+
+ // 2 etape : a garder ou pas !!!!! regarder si l'appli est en
+ // attente de noeud pr lui en attribuer 1 nvx
+ scanAppliNodes();
+ try {
+ Thread.sleep(heartTime);
+ } catch (Exception e) {
+ }
+ }
+ // /System.out.println("is running = false");
+ if (!JaceDaemon.Instance().isRunning())
+ System.exit(0);
+ }
+
+ public synchronized void signalDeadNode(JaceInterface host, int rankOfDead) {
+ try {
+ TaskId myTaskId = null;
+ int nb = 0;
+ int nbC = 0;
+ long time = 0;
+ RunningApplication.Instance().incrementNumberOfDisconnections();
+
+ time = RunningApplication.Instance().getChrono().getValue() / 1000;
+ nb = RunningApplication.Instance().getNumberOfDisconnections();
+ nbC = RunningApplication.Instance().getNumberOfCouilles();
+ System.out.println("At time = " + time + "s, NbDisconnection = "
+ + nb + ", NbProblem = " + nbC);
+
+ // !!!!!!!!!!!!!!actualiser le ListeTask ds le Register
+ myTaskId = Register.Instance().getListeOfTasks()
+ .getTaskIdOfHostStub(host);
+ if (myTaskId == null) {
+ Register.Instance.getListeOfTasks().viewAll();
+ myTaskId = Register.Instance().getListeOfTasks()
+ .getTaskIdOfRank(rankOfDead);
+ JaceInterface deadStub = myTaskId.getHostStub();
+ deadStub.suicide("Not doing a good work");
+ }
+ myTaskId.setHostIP(null);
+ myTaskId.setHostName(null);
+ Node noeud = Register.Instance().getNodeOfStub(
+ myTaskId.getHostStub());
+ myTaskId.setHostStub(null);
+ int rankDeaD = myTaskId.getRank();
+
+ String nomNoeud = noeud.getName();
+ // Register.Instance().removeNodeAt(i);
+ // Register.Instance().removeNode(host.getIP());
+ // System.out.println("fait le remove : taille = " +
+ // Register.Instance().getSize());
+
+ boolean b = Register.Instance().removeNodeOfName(noeud.getName());
+
+ if (b == true) {
+ System.out.println("Removing Node of rank "
+ + rankDeaD + " : size = "
+ + Register.Instance().getSize());
+ } else {
+ System.err
+ .println("Cannot remove the Node, it doesn't exist anymore: size = "
+ + Register.Instance().getSize());
+ }
+
+ Calendar cal = new GregorianCalendar();
+ System.out.println("At time=" + cal.get(Calendar.MINUTE) + ":"
+ + cal.get(Calendar.SECOND));
+
+ // retrouver SI POSSIBLE un autre libre pr remplacer celui la pr
+ // cette tache
+
+ /**** Sébastien Miquée **/
+ //Node tmpNode = foundToReplaceThisNode(rankDeaD, nomNoeud);
+ Node tmpNode = foundToReplaceThisNode(rankDeaD, nomNoeud, noeud);
+ try {
+ // broadcastRegister(0);
+ updateConcernedNodes(rankDeaD, noeud, tmpNode);
+
+ Thread.sleep(500);
+ System.out.println("Set scanning on %%%%%%");
+ tmpNode.getStub().setScanning(true);
+ } catch (Exception e) {
+ System.err.println("Unable to setScannig on for the new node: "
+ + e);
+ }
+
+ // Register.Instance().getListeOfTasks().viewAll();
+ for (int z = 0; z < spawnersList.size(); z++)
+ if (!((JaceSpawnerInterface) spawnersList.get(z))
+ .equals(Register.Instance().getSpawnerStub()))
+ try {
+ ((JaceSpawnerInterface) spawnersList.get(z))
+ .replaceDeamonBy(noeud, tmpNode, rankDeaD);
+
+ } catch (Exception e) {
+ System.err
+ .println("Unable to broadcast the modifications to all the spawners: "
+ + e);
+ }
+ } catch (Exception ee) {
+ System.err.println("Error in signalDeadNode() :" + ee);
+ }
+ }
+
+ // verifie si les noeud notes vivant ds le Register.Instance() du SuperNode
+ // le sont encore
+ // retourne 0 si erreur, 1 sinon
+ /*
+ * private synchronized int scanConnectedHosts() { long time = 0; Node host;
+ * Node tmpNode; long workerTime; long currentTime; int rank; int restempo;
+ * int nb = 0; int nbC = 0; boolean changed = false; int index=0; try{
+ * JaceSpawnerInterface spawnerStub=Register.Instance().getSpawnerStub();
+ * if(spawnerStub.getFinished()==true){
+ * System.out.println("nbre de taches="+Register.Instance().getSize());
+ * ListeTask t=Register.Instance().getListeOfTasks();
+ * for(index=z;index<t.getSize();index++){ TaskId recev = null;
+ * System.out.println("deleting Task************"+index);
+ *
+ * recev = t.get(index); JaceInterface stub=recev.getHostStub();
+ * spawnerStub.killApplication(stub); }
+ *
+ *
+ *
+ * } }catch(Exception e){
+ * System.out.println("w aiiiiiiiiiiiiiirrrr"+e+" index="+index); z=index;
+ * }
+ *
+ * if (Register.Instance().getSize() == 0) {
+ * System.out.println("aucun noeuds a scanner");
+ * RunningApplication.Instance().purge(); System.exit(0);
+ *
+ * }
+ *
+ * return 1; }
+ */
+
+ // trouver un noeud sur les superNode
+ // pr les requisitionner
+
+ /*** Sébastien Miquée ***/
+
+ //private synchronized Node foundToReplaceThisNode(int theRank, String nom) {
+ private synchronized Node foundToReplaceThisNode(int theRank, String nom, Node _n) {
+ // int i = 0;
+ boolean found = false;
+ Node node = null;
+
+ while (found == false) {
+ try {
+
+ //node = centralServer.getNewNode(LocalHost.Instance().getIP());
+ node = centralServer.getNewNode(LocalHost.Instance().getIP(), _n);
+
+ if( node != null )
+ {
+ found = true ;
+ }
+ } catch (Exception e) {
+ // trouver un autre superNode et lui demander le noeud a lui
+
+ System.err.println("Cannot localize SuperNode ! " + e);
+
+ connectSuperNode();
+ }
+ }
+
+
+ if (node != null) {
+ System.out.println("Using Node " + node.getName() + " ("
+ + node.getIP() + ") in order to replace " + nom
+ + " size before add: " + Register.Instance().getSize()
+ + "\n\n");
+ node.setAliveFlag(true);
+ node.setAliveTime();
+
+ // rajouter le noeud ds le Register
+ node.setAppliName(RunningApplication.Instance().getName());
+
+ // lui envoyer mon stub pr qu'il commence a me pinguer des
+ // maintenant
+ // TODO a mettre ds un thread ????
+
+ /*
+ * TaskId
+ * neighborTask=Register.Instance().getListeOfTasks().getTaskIdOfRank
+ * ((theRank+1)%Register.Instance().getListeOfTasks().getSize());
+ * try{ node.getStub().updateHeart(neighborTask.getHostStub()); }
+ * catch(Exception e) {
+ * System.out.println("nvx noeud deja plu dispo2"); //node = null; }
+ */
+ // TODO verif pourkoi superNode me le redonne
+ // alors qu'il fait deja du calcul
+ // int is = Register.Instance().existNode(node.getIP());
+ int is = Register.Instance().existNode(node);
+ if (is != -1) {
+ System.out.println("The Node is already in the register ! I don't add it.");
+ System.out.println("Node " + node.getName() + " not added !") ;
+ node = null;
+ } else {
+ Register.Instance().addNode(node);
+
+ // !!!!!!!!!!!!!!actualiser le ListeTask
+ TaskId myTaskId = Register.Instance().getListeOfTasks()
+ .getTaskIdOfRank(theRank);
+ myTaskId.setHostIP(node.getIP());
+ myTaskId.setHostName(node.getName());
+ myTaskId.setHostStub(node.getStub());
+
+ // Register.Instance().getListeOfTasks().viewAll();
+ int neighborRank;
+ if (theRank == 0)
+ neighborRank = Register.Instance().getSize() - 1;
+ else
+ neighborRank = theRank - 1;
+ TaskId neighborTask2 = Register.Instance().getListeOfTasks()
+ .getTaskIdOfRank(neighborRank);
+ try {
+ JaceInterface jaceStub = neighborTask2.getHostStub();
+ jaceStub.updateHeart(node.getStub());
+ } catch (Exception e) {
+ System.err.println("Next node unreachable ! " + e);
+ // node = null;
+ }
+
+ }
+
+ } else {
+ System.out.println("I didn't receive a new Node !");
+ }
+ return node;
+ }
+
+ public void replaceBy(JaceSpawnerInterface oldStub,
+ JaceSpawnerInterface stub) {
+ int index = spawnersList.indexOf((Object) oldStub);
+ if (index != -1)
+ spawnersList.setElementAt(stub, index);
+ else
+ System.err.println("Spawner's stub not foud in spawnersList !");
+ }
+
+ public void getNewSpawner(JaceSpawnerInterface previousSpawner) {
+ //boolean found = false;
+ Node node = null;
+ int index;
+ JaceSpawnerInterface spawnerStub = null;
+
+ // while (found == false) {
+ try {
+ // TODO : trouver l'erreur !!!
+ // msg d'erreur :
+ // "pas localise le super node java.lang.NullPointerException"
+ if (centralServer == null) {
+ System.err.println("Central Server not localized !");
+ }
+ node = centralServer.getNewNode( LocalHost.Instance().getIP(), null ) ;
+ RunningApplication.Instance()
+ .incrementNumberOfSpawnerDisconnections();
+ //found = true;
+ } catch (Exception e) {
+ // trouver un autre superNode et lui demander le noeud a lui
+ System.err.println("Super Node not localized !\n " + e);
+// System.out.println("pas localise le super node " + e);
+// System.out.println("pas localise le super node " + e);
+// System.out.println("pas localise le super node " + e);
+// System.out.println("pas localise le super node " + e);
+// System.out.println("pas localise le super node " + e);
+// System.out.println("pas localise le super node " + e);
+// System.out.println("pas localise le super node " + e);
+ System.err.println("My IP : " + LocalHost.Instance().getIP());
+ if (centralServer == null) {
+ System.err.println("CentralServer is NULL !");
+ }
+ connectSuperNode();
+ }
+ // }
+ if (node != null) {
+ index = spawnersList.indexOf((Object) previousSpawner);
+ if (index != -1) {
+ System.out.println("Using Node " + node.getName()
+ + " ("
+ + LocalHost.Instance().resolve(node.getName())
+ + ") to replace a dead spawner\n\n");
+ try {
+ // Register.Instance().viewAll();
+ // Register.Instance().getListeOfTasks().viewAll();
+ spawnerStub = node.getStub().transformIntoSpawner(
+ params,
+ appliName,
+ Register.Instance(),
+ nbTasks,
+ centralServer,
+ index,
+ heartTime,
+ 1,
+ RunningApplication.Instance()
+ .getNumberOfDisconnections(),
+ RunningApplication.Instance()
+ .getNumberOfSpawnerDisconnections(),
+ nbOfDaemonsPerSpawner, nbOfDeamonsPerThread);
+ spawnersList.setElementAt(spawnerStub, index);
+ new StartProcessThread(index).start();
+ // spawnerStub.startProcess( spawnersList);
+ } catch (Exception e) {
+ System.err.println("Unable to reach the new spawner: " + e);
+ }
+ for (int j = 0; j < spawnersList.size(); j++)
+ try {
+ if (!((JaceSpawnerInterface) spawnersList.get(j))
+ .equals(Register.Instance().getSpawnerStub())
+ && !((JaceSpawnerInterface) spawnersList.get(j))
+ .equals(spawnerStub)) {
+ System.out
+ .println("Trying to broadcast to spawner of rank "
+ + j);
+
+ ((JaceSpawnerInterface) spawnersList.get(j))
+ .replaceBy(previousSpawner, spawnerStub);
+ }
+ } catch (Exception e) {
+ System.err
+ .println("Unable to broadcast to spawner of rank: "
+ + j + ". Error:" + e);
+ }
+ ScanThreadSpawner.Instance().setServer(spawnerStub);
+
+ int previous;
+ if (index == 0)
+ previous = spawnersList.size() - 1;
+ else
+ previous = index - 1;
+ try {
+ ((JaceSpawnerInterface) spawnersList.get(previous))
+ .updateHeart(spawnerStub);
+ } catch (Exception e) {
+ System.err
+ .println("unable to change the server of the heartbeatThread for the previous node of rank "
+ + previous + ". error:" + e);
+ }
+ }
+ } else {
+ System.err.println("Node is null !");
+ }
+
+ }
+
+ public void broadcastFinished(boolean bool) {
+ for (int i = 0; i < spawnersList.size(); i++)
+ try {
+ ((JaceSpawnerInterface) spawnersList.get(i)).setFinished(bool);
+ } catch (Exception e) {
+ System.err
+ .println("Unable to propagate the end of the application :"
+ + e);
+ }
+ }
+
+ private synchronized void scanAppliNodes() {
+
+ //Node node = null;
+ //ListeTask tskList = null;
+ //int cptReplaced;
+ int index = 0;
+ try {
+ JaceSpawnerInterface spawnerStub = Register.Instance()
+ .getSpawnerStub();
+ if (spawnerStub.getFinished() == true) {
+ System.out.println("Number of tasks ="
+ + Register.Instance().getSize());
+
+ int x = Register.Instance().getListeOfTasks().getSize()
+ / nbOfDaemonsPerSpawner;
+ int s;
+ if (rank == x)
+ s = (Register.Instance().getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
+ / nbOfDeamonsPerThread;
+ else
+ s = nbOfDaemonsPerSpawner / nbOfDeamonsPerThread;
+
+ int debut = nbOfDaemonsPerSpawner * rank;
+
+ // for(int i=debut;i<nbOfDaemonsPerSpawner*(rank+1) &&
+ // i<reg.getSize();i++)
+ // System.out.println(((Node)nodes.elementAt(i)).getName());
+
+ ListeTask t = Register.Instance().getListeOfTasks();
+ ScanThreadSpawner.Instance().kill();
+ HeartBeatSpawner.Instance().kill();
+ for (int i = 0; i < s + 1; i++) {
+
+ new KillThread(i, debut, nbOfDaemonsPerSpawner,
+ nbOfDeamonsPerThread, t).start();
+ }
+
+ Thread.sleep(2000);
+
+ long finalTime = RunningApplication.Instance().getChrono()
+ .getValue();
+
+ int nbe = RunningApplication.Instance()
+ .getNumberOfDisconnections();
+
+ int nbsdc = RunningApplication.Instance()
+ .getNumberOfSpawnerDisconnections();
+ System.out.println("Application finished successfully !");
+// System.out.println("Application finished successfully !!!!!!");
+// System.out.println("Application finished successfully !!!!!!");
+// System.out.println("Application finished successfully !!!!!!");
+// System.out.println("Application finished successfully !!!!!!");
+// System.out.println("Application finished successfully !!!!!!");
+// System.out
+// .println("Application finished successfully !!!!!!\n");
+ System.out.println("TOTAL TIME in s : " + (finalTime / 1000));
+ System.out.println("nb of desconnections: " + nbe);
+ System.out.println("nb of spawners desconnections: " + nbsdc);
+ if (JaceDaemon.Instance().isRunning()) {
+ JaceDaemon.Instance().reconnectSuperNode();
+
+ RunningApplication.Instance().purge();
+
+ } else {
+ // purger l'appli
+
+ RunningApplication.Instance().purge();
+ // System.exit(1);
+ }
+ }
+ } catch (Exception e) {
+ System.out
+ .println("w aiiiiiiiiiiiiiirrrr" + e + " index=" + index);
+ z = index;
+ }
+ /*
+ * if (Register.Instance().getSize() == 0) {
+ * System.out.println("aucun noeuds a scanner");
+ * RunningApplication.Instance().purge(); System.exit(0); return 0;
+ *
+ * } else{ tskList = Register.Instance().getListeOfTasks();
+ *
+ * //si mon appli a besoin d'un noeud //sinon, on fait rien if ((nbTasks
+ * - Register.Instance().getSize()) > 0) { cptReplaced = 0;
+ *
+ * //TODO demander des paquet de nodes, pas qu'un //on scanne toutes les
+ * taches de cette appli for (int ind = 0; ind < tskList.getSize();
+ * ind++) { //si 1 tache a pas de noeud, on trouve 1 remplacant
+ *
+ * //if (tskList.get(ind).getHostIP() == null) { if
+ * (tskList.get(ind).getHostStub() == null) { rank =
+ * tskList.get(ind).getRank(); node = foundToReplaceThisNodeTMP(rank);
+ * if (node != null) { cptReplaced++; }
+ *
+ * } }
+ *
+ * //qd fini de scanner taches, envoyer Register //si remplacement de
+ * noeud (c a d si Register modifier) if (cptReplaced != 0) {
+ * broadcastRegister(0); } try { Thread.currentThread().yield(); } catch
+ * (Exception e) {}
+ *
+ * }// fin if(appli.getNeededNodes() > 0) {
+ * //System.out.println("SCAN APPLI : taille : " +
+ * Register.Instance().getSize()); return 1; }
+ */
+ }
+
+// @SuppressWarnings("unused")
+// private synchronized Node foundToReplaceThisNodeTMP(int theRank) {
+// // int i = 0;
+// boolean found = false;
+// Node node = null;
+// // while (found == false) {
+// try {
+// // TODO : trouver l'erreur !!!
+// // msg d'erreur :
+// // "pas localise le super node java.lang.NullPointerException"
+// if (centralServer == null) {
+// System.out.println("centralServer est NUUUUUUUUULL");
+// }
+// node = centralServer.getNewNode(LocalHost.Instance().getIP());
+//
+// found = true;
+// } catch (Exception e) {
+// // trouver un autre superNode et lui demander le noeud a lui
+// System.out.println("TMP pas localise le super node " + e);
+// System.out.println("TMP pas localise le super node " + e);
+// System.out.println("TMP pas localise le super node " + e);
+// System.out.println("TMP pas localise le super node " + e);
+// System.out.println("TMP pas localise le super node " + e);
+// System.out.println("TMP pas localise le super node " + e);
+// System.out.println("TMP pas localise le super node " + e);
+// System.out.println("mon IP : " + LocalHost.Instance().getIP());
+// if (centralServer == null) {
+// System.out.println("centralServer : NULL");
+// }
+// connectSuperNode();
+// }
+// // }
+// if (node != null) {
+// System.out.println("COOOOOOOOOOOOOOOOOOOOOOL : requisition de "
+// + node.getName() + " taille avt add: "
+// + Register.Instance().getSize() + "\n\n");
+// node.setAliveFlag(true);
+// node.setAliveTime();
+//
+// // rajouter le noeud ds le Register
+// System.out.println("ds Register, manque "
+// + (nbTasks - Register.Instance().getSize()));
+// node.setAppliName(RunningApplication.Instance().getName());
+//
+// // lui envoyer mon stub pr qu'il commence a me pinguer des
+// // maintenant
+// // TODO a mettre ds un thread ????
+// try {
+// TaskId neighborTask = Register.Instance().getListeOfTasks()
+// .getTaskIdOfRank(
+// (theRank + 1)
+// % Register.Instance().getListeOfTasks()
+// .getSize());
+// node.getStub().updateHeart(neighborTask.getHostStub());
+// // node.getStub().updateHeart(this.spawnerRef);
+//
+// // int is = Register.Instance().existNode(node.getIP());
+// int is = Register.Instance().existNode(node);
+// // TODO verif pourkoi superNode me le redonne
+// // alors qu'il fait deja du calcul
+// if (is != -1) {
+// System.out.println("j'ajoute pas le noeud, il y est deja");
+// System.out.println("PAS AJOUTEE TMP " + node.getName());
+// System.out.println("PAS AJOUTEE TMP " + node.getName());
+// System.out.println("PAS AJOUTEE TMP " + node.getName());
+// System.out.println("PAS AJOUTEE TMP " + node.getName());
+// System.out.println("PAS AJOUTEE TMP " + node.getName());
+// node = null;
+// } else {
+// Register.Instance().addNode(node);
+//
+// // !!!!!!!!!!!!!!actualiser le ListeTask
+// TaskId myTaskId = Register.Instance().getListeOfTasks()
+// .getTaskIdOfRank(theRank);
+// myTaskId.setHostIP(node.getIP());
+// myTaskId.setHostName(node.getName());
+// myTaskId.setHostStub(node.getStub());
+// // Register.Instance().getListeOfTasks().getTaskIdOfRank(theRank).setHostIP(node.getIP());
+// // Register.Instance().getListeOfTasks().getTaskIdOfRank(theRank).setHostName(node.getName());
+// // Register.Instance().getListeOfTasks().getTaskIdOfRank(theRank).setHostStub(node.getStub());
+// }
+// } catch (Exception e) {
+// System.out.println("nvx noeud deja plu dispo");
+// node = null;
+// }
+// } else {
+// System.out.println("RADINNNNNNNNNNNNNN TMP ");
+// }
+// return node;
+// }
+
+ private void exportObject() {
+
+ JaceSpawnerServer spawnerServer = null;
+
+ System.out.println("Name of local machine is: "
+ + LocalHost.Instance().getName());
+ System.out.println("IP of local machine is: "
+ + LocalHost.Instance().getIP());
+ try {
+ // launch the JaceSpawnerServer
+ spawnerServer = new JaceSpawnerServer();
+ java.rmi.registry.LocateRegistry.createRegistry(spawnerPort);
+ java.rmi.registry.LocateRegistry.getRegistry(spawnerPort).rebind(
+ "JaceSpawnerServer", spawnerServer);
+ spawnerRef = (JaceSpawnerInterface) Naming.lookup("rmi://"
+ + LocalHost.Instance().getIP() + ":" + spawnerPort
+ + "/JaceSpawnerServer");
+
+ } catch (Exception e) {
+ System.err
+ .println("JaceP2P_Error in JaceSpawner.exportObject() when creating the local JaceSpawnerServer "
+ + e);
+// System.err.println("exit ds JaceSpawner.exportObject");
+ System.exit(0);
+ }
+
+ }
+
+ public void connectSuperNode() {
+ System.out.println("I'm looking for a super node");
+ boolean connected = false;
+ if (!(superNode_IP == null)) {
+ try {
+ System.out.println("Trying to invoke super node "
+ + superNode_IP);
+ centralServer = (JaceSuperNodeInterface) Naming.lookup("rmi://"
+ + superNode_IP + ":" + superNode_port
+ + "/JaceSuperNode");
+ System.out.println("succesfully located " + superNode_IP);
+
+ // add stub and IP in LocalHost to store it until super node
+ // death
+ LocalHost.Instance().setSuperNodeStub(centralServer);
+ LocalHost.Instance().setSuperNodeIP(superNode_IP);
+ heartTime = centralServer.getSuperNodeBeat();
+ timeBeforeKill = NB_HEART_DECONNECT * heartTime;
+ connected = true;
+
+ } catch (Exception e) {
+ System.err.println("Super Node not accessible, try another one (1/2s)");
+ try {
+ Thread.sleep(500);
+ } catch (Exception e1) {
+ }
+
+ }
+ }
+ if (connected == false) {
+ int i = 0;
+ SuperNodeListe.Instance().staticInitialization();
+ while (connected == false
+ && i < SuperNodeListe.Instance().getListe().size()) {
+ SuperNodeData d = null;
+ d = SuperNodeListe.Instance().getSuperNodeData(i);
+
+ superNode_IP = LocalHost.Instance().resolve(d.getIP());
+ superNode_port = d.getPort();
+ // superNode_port = d.getPort();
+ try {
+ System.out.println("Trying to invoke Super Node "
+ + superNode_IP);
+ centralServer = (JaceSuperNodeInterface) Naming
+ .lookup("rmi://" + superNode_IP + ":"
+ + superNode_port + "/JaceSuperNode");
+ System.out.println("Succesfully located SuperNode "
+ + superNode_IP);
+ LocalHost.Instance().setSuperNodeStub(centralServer);
+ LocalHost.Instance().setSuperNodeIP(superNode_IP);
+ heartTime = centralServer.getSuperNodeBeat();
+ timeBeforeKill = NB_HEART_DECONNECT * heartTime;
+
+ connected = true;
+ } catch (Exception e) {
+ System.err
+ .println("SuperNode "
+ + superNode_IP
+ + " not accessible, trying to locate another one in 0.5s\n");
+ i++;
+ try {
+ Thread.sleep(500);
+ } catch (Exception e1) {
+ }
+
+ }
+ }
+ }
+ if (connected == false) {
+ System.err.println("All the Super Nodes in the list are not accessible. I'm unable to connect to the platform !");
+ System.exit(1);
+ }
+
+ }
+
+ // get a Register on the SuperNode
+ // completed with the required number of Daemons
+ // or gets NULL
+ public synchronized void getRegisterOnSuperNode() {
+ Register registerSpawner = null;
+ Node noeud = null;
+ boolean recieved = false;
+
+ System.out.println("Trying to get a Register on the SuperNode");
+ int nbExtraSpawners = 0;
+ if (nbTasks > nbOfDaemonsPerSpawner) {
+ nbExtraSpawners = (nbTasks - 1) / nbOfDaemonsPerSpawner;
+
+ }
+ while (!recieved) {
+ try {
+ registerSpawner = centralServer.getRegisterSpawner(LocalHost
+ .Instance().getIP(), nbTasks, (Task) tache, nbTasks
+ + nbExtraSpawners, algo, paramAlgo);
+ recieved = true;
+ } catch (Exception e) {
+ System.err
+ .println("Unable to recieve a register from superNode "
+ + e);
+ connectSuperNode();
+ }
+ }
+ if (registerSpawner.getSize() != (nbTasks + nbExtraSpawners)) {
+ System.err.println("I did not recieve enough nodes from superNode!!!! \n killing application !!!!");
+ for (int i = 0; i < registerSpawner.getSize(); i++) {
+ try {
+ registerSpawner.getNodeAt(i).getStub().reconnectSuperNode();
+ } catch (Exception e) {
+ System.err.println("The reserved node was unable to reconnect to the super node");
+ }
+ }
+ System.exit(0);
+ }
+
+ spawnersList = new Vector<Object>();
+ for (int i = 0; i < nbExtraSpawners && i < registerSpawner.getSize(); i++) {
+ spawnersList.add(registerSpawner.getNodeAt(i
+ * nbOfDaemonsPerSpawner));
+ registerSpawner.removeNode(registerSpawner.getNodeAt(i
+ * nbOfDaemonsPerSpawner));
+ }
+
+ registerSpawner.setNbOfTasks(nbTasks);
+ registerSpawner.setNumBackupNeighbors(nbSavingNodes);
+ /*
+ * System.out.println("Trying to connect another SuperNode");
+ * connectSuperNode(); try { registerSpawner =
+ * centralServer.getRegisterSpawner(LocalHost.Instance().getIP(),
+ * nbTasks); } catch(Exception e1) {}
+ */
+
+ if (registerSpawner != null) {
+ System.out.println("I received the register");
+ // registerSpawner.setVersion(registerVersion);
+ // registerVersion++;
+ Register.Instance().replaceBy(registerSpawner);
+ System.out.println("It contains " + Register.Instance().getSize()
+ + " Nodes" + " " + nbExtraSpawners + " ExtraSpawners");
+
+ // set each Node aliveTime value to the Spawner current time
+ for (int i = 0; i < Register.Instance().getSize(); i++) {
+ noeud = Register.Instance().getNodeAt(i);
+ noeud.setAliveFlag(true);
+ noeud.setAliveTime();
+ }
+
+ } else {
+ System.err.println("\n---------------WARNING--------------");
+ System.err.println("No Daemon available on the SuperNode dispo, try later, please");
+ System.exit(0);
+ }
+ }
+
+ public class TransformThread extends Thread {
+ int i;
+ Node n;
+
+ public TransformThread(int i, Node n) {
+ this.i = i;
+ this.n = n;
+ }
+
+ public void run() {
+
+ try {
+ System.out.println("Trying to transform the spawner ("
+ + n.getName() + ") of rank " + i);
+ spawnersList.setElementAt(n.getStub().transformIntoSpawner(
+ params, appliName, Register.Instance(), nbTasks,
+ centralServer, i, heartTime, 0, 0, 0,
+ nbOfDaemonsPerSpawner, nbOfDeamonsPerThread), i);
+ } catch (Exception e) {
+ System.err.println("Error while contacting newly acquired spawner ("
+ + n.getName() + "): " + e);
+ try {
+ n = centralServer.getNewNode( LocalHost.Instance().getIP(), null ) ;
+
+ new TransformThread(i, n).start();
+ } catch (Exception e1) {
+ System.err.println("The Super Node is maybe dead: " + e1) ;
+ for (int z = 0; z < Register.Instance().getSize(); z++) {
+ try {
+ Register.Instance().getNodeAt(z).getStub()
+ .reconnectSuperNode();
+ } catch (Exception ez) {
+ System.err.println("The reserved node was unable to reconnect to the super node: \n"
+ + ez);
+ }
+ }
+ System.exit(0);
+ }
+ }
+ }
+ }
+
+ public class StartProcessThread extends Thread {
+ int i;
+
+ public StartProcessThread(int i) {
+ this.i = i;
+ }
+
+ public void run() {
+ try {
+
+ /*
+ * while((spawnersList.elementAt(i) instanceof Node)) try{
+ * System.out.println("waiting till transform of spawner "+i+
+ * " is finished"); Thread.sleep(20); }catch(Exception e1){}
+ */
+
+ // System.out.println("start process on spawner of rank "+i);
+ JaceSpawnerInterface spawnerStub = (JaceSpawnerInterface) spawnersList
+ .elementAt(i);
+ spawnerStub.startProcess(spawnersList);
+ } catch (Exception e) {
+ e.printStackTrace(System.out);
+ System.err.println("Unable to start the process on the spawner of rank "
+ + i + ".error: " + e);
+ }
+ }
+ }
+
+ public void createSpawnerNetwork() {
+ Node n;
+ int i;
+ for (i = 0; i < spawnersList.size(); i++) {
+ n = (Node) spawnersList.elementAt(i);
+
+ // Register.Instance().getListeOfTasks().viewAll();
+ // spawnersList.setElementAt(n.getStub().transformIntoSpawner(
+ // params, appliName, Register.Instance(),nbTasks, centralServer,i,
+ // heartTime,0,0),i);
+ new TransformThread(i, n).start();
+
+ }
+ // broadcast the Register.Instance() to all the JaceServer
+ // in order to start each task on the Daemons
+
+ spawnersList.add(Register.Instance().getSpawnerStub());
+ System.out.println(" rank=spawnersList.size()=" + spawnersList.size());
+ rank = spawnersList.size() - 1;
+ broadcastRegister(1);
+ for (int j = 0; j < spawnersList.size(); j++) {
+ System.out.println("waiting till transform of spawner " + j
+ + " is finished");
+ while ((spawnersList.elementAt(j) instanceof Node))
+ try {
+
+ Thread.sleep(20);
+ } catch (Exception e) {
+ }
+ System.out
+ .println("End Transformation of all spawners. Beginning the computing processes");
+ }
+ for (i = 0; i < spawnersList.size(); i++) {
+
+ // while(!(spawnersList.elementAt(i) instanceof
+ // JaceSpawnerInterface))
+
+ new StartProcessThread(i).start();
+
+ }
+ System.out.println("End create Spawner Network!!!!!!!!!");
+ }
+
+ public JaceSpawnerInterface getSpawnerResponsibleOn(int rank) {
+ int id = rank / nbOfDaemonsPerSpawner;
+ return (JaceSpawnerInterface) spawnersList.get(id);
+ }
+
+ public void createAppli() {
+ int count = 0;
+ int i = 0;
+ String nodeName;
+ String nodeIP;
+ ListeTask tsk = new ListeTask();
+ Node tmpNode;
+ JaceInterface nodeStub = null;
+ TaskId myTask = null;
+
+ System.out.println("appli launched, starting the chrono");
+ RunningApplication.Instance().getChrono().start();
+
+ RunningApplication.Instance().setName(appliName);
+ RunningApplication.Instance().setNbTasks(nbTasks);
+ // RunningApplication.Instance().setRegister(Register.Instance());
+
+ Register.Instance().setParams(params);
+ Register.Instance().setAppliName(appliName);
+ Register.Instance().setSpawnerStub(this.spawnerRef);
+
+ // assign a TaskId to each Node of the Register
+ // and insert the TaskId in tke ListTask
+ while (i < Register.Instance().getSize() && count < nbTasks) {
+ tmpNode = Register.Instance().getNodeAt(i);
+ if (tmpNode.getAliveFlag() == true) {
+ tmpNode.setAppliName(appliName);
+ nodeStub = tmpNode.getStub();
+ nodeName = tmpNode.getName();
+ nodeIP = tmpNode.getIP();
+
+ myTask = new TaskId(appliName, count, nodeStub);
+ myTask.setHostIP(nodeIP);
+ myTask.setHostName(nodeName);
+
+ tsk.addTask(myTask);
+ count++;
+ }
+ i++;
+ }
+
+ // if not enough Nodes in the Register,
+ // insert not assigned TaskId in the ListTask
+ if (count < nbTasks) {
+ for (int j = count; j < nbTasks; j++) {
+ tsk.addTask(new TaskId(appliName, j, null));
+ }
+ System.out.println("in Register, misses "
+ + (nbTasks - Register.Instance().getSize()) + " nodes");
+ }
+
+ // insert the ListeTask in the Register of the appli
+ Register.Instance().setListeOfTasks(tsk);
+ // Register.Instance().getListeOfTasks().viewAll();
+ RunningApplication.Instance().setRunning(true);
+ System.out.println("fin create appli");
+ }
+
+ public class BroadcastSpawner extends Thread {
+ int debut;
+ int i;
+ int nbOfDaemonsPerThread, nbOfDeamonsPerSpawner;
+
+ public BroadcastSpawner(int i, int debut, int nbOfDeamonsPerSpawner,
+ int nbOfDaemonsPerThread) {
+ this.debut = debut;
+ this.i = i;
+ this.nbOfDaemonsPerThread = nbOfDaemonsPerThread;
+ this.nbOfDeamonsPerSpawner = nbOfDeamonsPerSpawner;
+ }
+
+ public void run() {
+
+ for (int index = debut + i * nbOfDaemonsPerThread; index < debut
+ + i * nbOfDeamonsPerThread + nbOfDeamonsPerThread
+ && index < debut + nbOfDeamonsPerSpawner
+ && index < Register.Instance().getListeOfTasks().getSize(); index++) {
+ try {
+ Register.Instance().getNodeAt(index).getStub().setSpawner(
+ Register.Instance().getSpawnerStub());
+ } catch (Exception e) {
+ System.out.println("can't change spawner stub on node: "
+ + Register.Instance().getNodeAt(i).getName()
+ + ". error: " + e);
+ }
+ }
+ }
+ }
+
+ public class KillThread extends Thread {
+ int debut;
+ int i;
+ int nbOfDaemonsPerThread, nbOfDeamonsPerSpawner;
+ ListeTask t;
+
+ public KillThread(int i, int debut, int nbOfDeamonsPerSpawner,
+ int nbOfDaemonsPerThread, ListeTask t) {
+ this.debut = debut;
+ this.i = i;
+ this.nbOfDaemonsPerThread = nbOfDaemonsPerThread;
+ this.nbOfDeamonsPerSpawner = nbOfDeamonsPerSpawner;
+ this.t = t;
+ }
+
+ public void run() {
+
+ // t.viewAll();
+ for (int index = debut + i * nbOfDaemonsPerThread; index < debut
+ + i * nbOfDeamonsPerThread + nbOfDeamonsPerThread
+ && index < debut + nbOfDeamonsPerSpawner
+ && index < t.getSize(); index++) {
+ Node noeud = null;
+ try {
+ TaskId recev = null;
+ System.out.println("deleting Task" + index);
+
+ recev = t.getTaskIdOfRank(index);
+
+ JaceInterface stub = recev.getHostStub();
+ System.out.println("name=" + recev.getHostName());
+ noeud = Register.Instance().getNodeOfStub(stub);
+ noeud.setAppliName(null);
+ new ReconnectThread(stub, noeud.getName()).start();
+ Register.Instance().removeNode(noeud);
+ // LocalHost.Instance().getSpawnerStub().killApplication(stub);
+
+ } catch (Exception e) {
+ try {
+ System.err.println("error in killThread on node "
+ + noeud.getName() + ". " + e);
+ } catch (Exception e2) {
+ System.err.println("error in error :" + e2);
+ }
+ }
+ }
+ }
+
+ class ReconnectThread extends Thread {
+ JaceInterface stub = null;
+ String name;
+
+ public ReconnectThread(JaceInterface s, String name) {
+ stub = s;
+ this.name = name;
+ }
+
+ public void run() {
+ try {
+ // System.out.println("reconnexion SuperNode");
+ // Register.Instance().getNode(workerIP).getWorkerStub().reconnectSuperNode();
+
+ // stub.reconnectSuperNode();
+ stub.suicide("fin d'appli");
+ } catch (Exception e) {
+ System.err.println("can't kill node " + name);
+ }
+ yield();
+ }
+ }
+
+ }
+
+ // faire une copie du Register et l'envoyer aux noeuds qui le compose
+ // car si il est modif en meme tmp, on envoi pas un truc coherent
+ private synchronized void broadcastRegister(int requete) {
+ // Register reg = Register.Instance().clone();
+ Register reg = Register.Instance();
+
+ try {
+ System.out.println("name of spawner: "
+ + Register.Instance().getSpawnerStub().getName());
+ // launch 1 thread to send the Register to all the nodes
+ while (broadcasting == true)
+ Thread.sleep(5);
+ broadcasting = true;
+ Register.Instance().setSpawnerStub(
+ Register.Instance().getSpawnerStub());
+ int x = reg.getListeOfTasks().getSize() / nbOfDaemonsPerSpawner;
+ int s;
+ if (rank == x)
+ if ((reg.getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
+ % nbOfDeamonsPerThread == 0)
+ s = (reg.getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
+ / nbOfDeamonsPerThread;
+ else
+ s = (reg.getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
+ / nbOfDeamonsPerThread + 1;
+ else if ((nbOfDaemonsPerSpawner % nbOfDeamonsPerThread) == 0)
+ s = nbOfDaemonsPerSpawner / nbOfDeamonsPerThread;
+ else
+ s = nbOfDaemonsPerSpawner / nbOfDeamonsPerThread + 1;
+ int debut = nbOfDaemonsPerSpawner * rank;
+ System.out.println("rank=" + rank + " debut=" + debut + " s=" + s
+ + " nbOfDaemonsPerSpawner=" + nbOfDaemonsPerSpawner
+ + " nbOfDeamonsPerThread=" + nbOfDeamonsPerThread + " x="
+ + x);
+ for (int i = 0; i < s; i++)
+ new UpdateRegisterThread(tache, reg, requete, i, debut).start();
+ /*
+ * This thread : -updates the goal of the Node beats if necessary
+ * (stub.updateHeart) -updates the Register on each Node
+ * (stub.updateRegister)
+ */
+ JaceSpawner.Instance().setBroadcasting(false);
+ try {
+ Thread.sleep(10);
+ } catch (Exception e) {
+ }
+
+ } catch (Exception e) {
+ System.out
+ .println("\n1 node has died during JaceSpawner.broadcastRegister()");
+ }
+ }
+
+ private synchronized void broadcastScanning() {
+ Register reg = Register.Instance();
+ while (broadcasting == true)
+ try {
+ Thread.sleep(500);
+ } catch (Exception e) {
+ }
+ // Register.Instance().viewAll();
+ Vector<?> nodes = (Vector<?>) Register.Instance().getListOfNodes().clone();
+ int x = reg.getListeOfTasks().getSize() / nbOfDaemonsPerSpawner;
+ int s;
+ if (rank == x)
+ s = (reg.getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
+ / nbOfDeamonsPerThread;
+ else
+ s = nbOfDaemonsPerSpawner / nbOfDeamonsPerThread;
+
+ int debut = nbOfDaemonsPerSpawner * rank;
+
+ // for(int i=debut;i<nbOfDaemonsPerSpawner*(rank+1) &&
+ // i<reg.getSize();i++)
+ // System.out.println(((Node)nodes.elementAt(i)).getName());
+
+ for (int i = 0; i < s + 1; i++) {
+
+ new StartScanThread(i, nodes, debut).start();
+ }
+
+ }
+
+ public Register getRegister(int rank) {
+
+ ListeTask listOfTasks = Register.Instance().getListeOfTasks();
+ Vector<Integer> dependencies = getDependencies(rank, listOfTasks.getSize());
+ Register g = new Register();
+ ListeTask newListOfTasks = new ListeTask();
+ g.setAppliName(Register.Instance().getAppliName());
+ g.setParams(Register.Instance().getParams());
+ g.setSpawnerStub(Register.Instance().getSpawnerStub());
+ g.setNbOfTasks(Register.Instance().getNbOfTasks());
+ // g.setVersion(reg.getVersion());
+ for (int j = 0; j < dependencies.size(); j++) {
+ TaskId id = listOfTasks.getTaskIdOfRank(((Integer) dependencies
+ .elementAt(j)).intValue());
+ newListOfTasks.addTask(id);
+ if (id.getHostStub() != null) {
+ Node noeud = Register.Instance()
+ .getNodeOfStub(id.getHostStub());
+ g.addNode(noeud);
+ }
+ }
+ g.setListeOfTasks(newListOfTasks);
+ return g;
+ }
+
+ private void updateConcernedNodes(int rank, Node oldNode, Node node) {
+ ListeTask listOfTasks = Register.Instance().getListeOfTasks();
+ Vector<?> dependencies = getDependencies(rank, listOfTasks.getSize());
+ System.out.println("la liste des voisins concernes de : " + rank);
+ for (int z = 0; z < dependencies.size(); z++)
+ System.out.print(((Integer) dependencies.elementAt(z)).intValue()
+ + " ");
+ System.out.println();
+ // Register.Instance().setVersion(registerVersion);
+ // registerVersion++;
+ Register.Instance()
+ .setSpawnerStub(Register.Instance().getSpawnerStub());
+ int s;
+ if ((dependencies.size() % nbOfDeamonsPerThread) == 0)
+ s = dependencies.size() / nbOfDeamonsPerThread;
+ else
+ s = dependencies.size() / nbOfDeamonsPerThread + 1;
+ Register reg = Register.Instance();
+
+ for (int j = 0; j < s; j++) {
+ new UpdateRegisterConcernedThread(dependencies, reg, j, rank,
+ oldNode, node).start();
+ }
+ }
+
+ private Vector<Integer> getDependencies(int id, int jaceSize) {
+ // get computing dependencies
+ Vector<Integer> neighbors = new Vector<Integer>();
+ int[] dep = tache.getDependencies(id);
+ for (int z = 0; z < taille(dep); z++)
+ neighbors.add(dep[z]);
+ // System.out.println("la liste des voisins de calcul de: "+id+" concerne");
+ // for(int z=0;z<neighbors.size();z++)
+ // System.out.print(((Integer)neighbors.elementAt(z)).intValue()+" ");
+ // System.out.println();
+
+ // get convergence neighbors
+ int d = 0;
+ while (Math.pow(2, d) < jaceSize) {
+ if (id < Math.pow(2, d) && ((id + Math.pow(2, d)) < jaceSize))
+ if (!neighbors.contains((Object) ((int) (id + Math.pow(2, d)))))
+ neighbors.add((int) (id + Math.pow(2, d)));
+ if (id < Math.pow(2, d + 1) && id >= Math.pow(2, d))
+ if (!neighbors.contains((Object) ((int) (id - Math.pow(2, d)))))
+ neighbors.add((int) (id - Math.pow(2, d)));
+ d++;
+ }
+
+ // get backup neighbors
+ int nb = Register.Instance().getNumBackupNeighbors();
+ int rankOfBackTask;
+ int tmp;
+ for (int j = 1; j <= nb; j++) {
+ // ------------ 1 - for backups "j + n" (to the right of j)
+ rankOfBackTask = (id + j) % jaceSize;
+ if (!neighbors.contains((Object) rankOfBackTask))
+ neighbors.add(rankOfBackTask);
+
+ // ------------ 2 - for backups "j - n" (to the left of j)
+ tmp = id - j;
+ if (tmp >= 0) {
+ rankOfBackTask = tmp % jaceSize;
+ } else {
+ rankOfBackTask = jaceSize - (Math.abs(tmp) % jaceSize);
+ }
+ if (!neighbors.contains((Object) rankOfBackTask))
+ neighbors.add(rankOfBackTask);
+ }
+ // adds itself
+ neighbors.add(id);
+ return neighbors;
+
+ }
+
+ public static int taille(int[] vect) {
+ int taille = 0;
+ int x = 0;
+ while (x < vect.length && vect[x] >= 0) {
+ taille++;
+ x++;
+ }
+ return x;
+ }
+
+ class StartScanning extends Thread {
+
+ public StartScanning() {
+ }
+
+ public void run() {
+ startScanning();
+ }
+ }
+
+}
+
+class StartScanThread extends Thread {
+ int i, debut;
+ Vector<?> nodes;
+ int nbOfDeamonsPerThread, nbOfDeamonsPerSpawner;
+
+ StartScanThread(int i, Vector<?> nodes, int debut) {
+ this.i = i;
+ this.nodes = nodes;
+ this.debut = debut;
+ nbOfDeamonsPerThread = JaceSpawner.Instance().getNbOfDeamonsPerThread();
+ nbOfDeamonsPerSpawner = JaceSpawner.Instance()
+ .getNbOfDeamonsPerSpawner();
+ }
+
+ public void run() {
+ int index;
+ for (index = debut + i * nbOfDeamonsPerThread; index < debut + i
+ * nbOfDeamonsPerThread + nbOfDeamonsPerThread
+ && index < debut + nbOfDeamonsPerSpawner
+ && index < nodes.size(); index++) {
+
+ Node node = (Node) nodes.elementAt(index);
+ JaceInterface stub = node.getStub();
+ String name = node.getName();
+ try {
+
+ stub.setScanning(true);
+ // System.out.println("modify scanning to "+name);
+
+ } catch (Exception e) {
+ System.out.println("unable to modify scanning to " + name + ":"
+ + e);
+ }
+ }
+ // for(int x=0;x<nodes.size();x++)
+ // System.out.println(((Node)nodes.elementAt(x)).getName());
+ // System.out.println("nbre total: "+(index-1));
+ }
+}
--- /dev/null
+package jaceP2P;
+
+import java.rmi.Remote;
+import java.rmi.RemoteException;
+import java.util.Vector;
+
+
+
+public interface JaceSpawnerInterface extends Remote {
+
+ public void killApplication(JaceInterface stub) throws RemoteException;
+
+ public long getChronoValue(String appliName) throws RemoteException;
+
+ public void setFinished(boolean bool) throws RemoteException;
+
+ public boolean getFinished() throws RemoteException;
+
+ public void signalDeadNode(JaceInterface host, int rankOfDead)
+ throws RemoteException;
+
+ public void beating() throws RemoteException;
+
+ public void replaceDeamonBy(Node oldNode, Node node, int rank)
+ throws RemoteException;
+
+ public void replaceBy(JaceSpawnerInterface oldStub,
+ JaceSpawnerInterface stub) throws RemoteException;
+
+ public void updateHeart(JaceSpawnerInterface stub) throws RemoteException;
+
+ public void startProcess(Vector<?> spawnersList) throws RemoteException;
+
+ public void setOver(boolean bool) throws RemoteException;
+
+ public String getName() throws RemoteException;
+
+ public Register getRegister(int rank) throws RemoteException;
+
+ public boolean ping() throws RemoteException;
+}
--- /dev/null
+package jaceP2P;
+
+import java.rmi.RemoteException;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.Vector;
+
+
+public class JaceSpawnerServer extends UnicastRemoteObject implements
+ JaceSpawnerInterface {
+
+ private static final long serialVersionUID = 1L;
+
+ int nbKilled = 0;
+ boolean finished = false;
+
+ // constructeur
+ public JaceSpawnerServer() throws RemoteException {
+ super();
+ }
+
+ @SuppressWarnings("unchecked")
+ public void startProcess(Vector spawnersList) throws RemoteException {
+ JaceSpawner.Instance().startProcess(spawnersList);
+
+ }
+
+ public Register getRegister(int rank) throws RemoteException {
+ return JaceSpawner.Instance().getRegister(rank);
+ }
+
+ public void killApplication(JaceInterface stub) throws RemoteException {
+
+ // Node noeud = Register.Instance().getNodeOfStub(stub);
+ // noeud.setAppliName(null);
+ // new ReconnectThread(stub,noeud.getName()).start();
+ // Register.Instance().removeNode(noeud);
+ // nbKilled++;
+ /*
+ * if (Register.Instance().getSize() < 1) { long finalTime =
+ * RunningApplication.Instance().getChrono().getValue(); int nb =
+ * RunningApplication.Instance().getNumberOfDisconnections();
+ * System.out.println("Application finished successfully !!!!!!");
+ * System.out.println("Application finished successfully !!!!!!");
+ * System.out.println("Application finished successfully !!!!!!");
+ * System.out.println("Application finished successfully !!!!!!");
+ * System.out.println("Application finished successfully !!!!!!");
+ * System.out.println("Application finished successfully !!!!!!");
+ * System.out.println("Application finished successfully !!!!!!");
+ * System.out.println("Application finished successfully !!!!!!");
+ * System.out.println("Application finished successfully !!!!!!\n");
+ * System.out.println("TOTAL TIME in s : " + (finalTime/1000));
+ * System.out.println("nb of desconnections : " + nb);
+ *
+ * //purger l'appli RunningApplication.Instance().purge(); }
+ */
+ }
+
+ public long getChronoValue(String appliName) throws RemoteException {
+ long res = -1;
+ if (RunningApplication.Instance().getName().equals(appliName)) {
+ res = RunningApplication.Instance().getChrono().getValue();
+ // System.out.println("temps chrono : " + res + " ms\n");
+ } else {
+ System.out.println("pas d'appli de ce nom sur ce spawner");
+ }
+ return res;
+ }
+
+ public void setOver(boolean bool) throws RemoteException {
+ JaceSpawner.Instance().broadcastFinished(bool);
+
+ }
+
+ public void setFinished(boolean bool) throws RemoteException {
+ finished = bool;
+
+ }
+
+ public String getName() throws RemoteException {
+ return LocalHost.Instance().getName();
+ }
+
+ public boolean getFinished() throws RemoteException {
+ return finished;
+ }
+
+ public void signalDeadNode(JaceInterface host, int rankOfDead)
+ throws RemoteException {
+ System.out.println("signalDeadNode of rank " + rankOfDead);
+ JaceSpawner.Instance().signalDeadNode(host, rankOfDead);
+ }
+
+ // heartBeat that detects if a Spawner is dead or alive
+ public void beating() throws RemoteException {
+ ScanThreadSpawner.Instance().setAliveTime();
+
+ // System.out.println("spawner is pinging me");
+ }
+
+ public synchronized void replaceBy(JaceSpawnerInterface oldStub,
+ JaceSpawnerInterface stub) throws RemoteException {
+ Calendar cal = new GregorianCalendar();
+ System.out.println("at time=" + cal.get(Calendar.MINUTE) + ":"
+ + cal.get(Calendar.SECOND));
+ System.out.println("replacing a dead spawner by another");
+ JaceSpawner.Instance().replaceBy(oldStub, stub);
+ RunningApplication.Instance().incrementNumberOfSpawnerDisconnections();
+ }
+
+ public void updateHeart(JaceSpawnerInterface stub) throws RemoteException {
+ HeartBeatSpawner.Instance().setServer(stub);
+ }
+
+ public boolean ping() throws RemoteException {
+ return true;
+ }
+
+ public synchronized void replaceDeamonBy(Node oldNode, Node node, int rank)
+ throws RemoteException {
+ Calendar cal = new GregorianCalendar();
+ System.out.println("at time=" + cal.get(Calendar.MINUTE) + ":"
+ + cal.get(Calendar.SECOND));
+ if (Register.Instance().removeNodeOfName(oldNode.getName()))
+ System.out.println("Node " + oldNode.getName() + " of rank " + rank
+ + " has been removed");
+ else
+ System.out.println("Node " + oldNode.getName() + " of rank " + rank
+ + " hasn't been removed");
+ Register.Instance().addNode(node);
+
+ TaskId myTaskId = Register.Instance().getListeOfTasks()
+ .getTaskIdOfHostStub(oldNode.getStub());
+
+ myTaskId.setHostIP(node.getIP());
+
+ myTaskId.setHostName(node.getName());
+ myTaskId.setHostStub(node.getStub());
+ // Register.Instance().setVersion(Register.Instance().getVersion()+1);
+ RunningApplication.Instance().incrementNumberOfDisconnections();
+ System.out.println("replacing node: " + oldNode.getName() + " with: "
+ + node.getName());
+ // Register.Instance().getListeOfTasks().viewAll();
+ }
+
+}
--- /dev/null
+package jaceP2P;
+
+import java.rmi.Naming;
+
+import and.Mapping.GNode;
+
+public class JaceSuperNode {
+ final int NB_HEART_DECONNECT = 3;
+ // attribute
+ private int heartTime; // frequency of heartBeat
+ private int port = 1098; // par la suite, donner par fichier de conf
+ private int timeBeforeKill; // wait for 3 non response of heartBeat to kill
+ // the Daemon
+ String protocol;
+
+ private JaceSuperNodeServer snodeServer ;
+
+ public JaceSuperNode(String superNodeName, int port, String comProtocol,
+ int beat) {
+ heartTime = beat;
+ timeBeforeKill = NB_HEART_DECONNECT * heartTime;
+ this.port = port;
+ protocol = comProtocol;
+ snodeServer = null ;
+ }
+
+ public void initialize() {
+ // if(protocol.equals("rmi")){
+
+ // create his list of SuperNode
+ // containing the IPs and ports
+ // but not already the stubs
+ SuperNodeListe.Instance().staticInitialization();
+
+ HeartBeatSNode.Instance().setHeartTime(heartTime);
+ JaceSuperNodeInterface myStub = null;
+
+ try {
+ snodeServer = new JaceSuperNodeServer(heartTime);
+
+ // lauch the rmiregistry
+ java.rmi.registry.LocateRegistry.createRegistry(port);
+ java.rmi.registry.LocateRegistry.getRegistry(port).rebind(
+ "JaceSuperNode", snodeServer);
+ myStub = (JaceSuperNodeInterface) Naming.lookup("rmi://"
+ + LocalHost.Instance().getIP() + ":" + port
+ + "/JaceSuperNode");
+ LocalHost.Instance().setSuperNodeStub(myStub);
+ LocalHost.Instance().setPort(port);
+ System.out.println("SuperNode " + LocalHost.Instance().getIP()
+ + " launched and waiting for invokations on port " + port);
+ } catch (Exception e) {
+ System.err
+ .println("JaceP2P_Error in JaceSuperNode.initialize() when biding the JaceSuperNodeServer : "
+ + e);
+ System.err.println("Exit in JaceSuperNode.initialise()");
+ System.exit(1);
+ }
+
+ // get the stubs of the conected SuperNodes
+ SuperNodeListe.Instance().locateSuperNodes(myStub);
+
+ // }
+
+ int index = SuperNodeListe.Instance().existSuperNode(
+ LocalHost.Instance().getIP());
+ int next, previous;
+ if (index == (SuperNodeListe.Instance().getListe().size() - 1))
+ next = 0;
+ else
+ next = index + 1;
+ HeartBeatSNode.Instance().setServer(
+ ((SuperNodeData) SuperNodeListe.Instance().getListe()
+ .elementAt(next)).getStub());
+ if (index == 0)
+ previous = SuperNodeListe.Instance().getListe().size() - 1;
+ else
+ previous = index - 1;
+ System.out.println(index + " " + next + " " + previous);
+ try {
+ ((SuperNodeData) SuperNodeListe.Instance().getListe().elementAt(
+ previous)).getStub().updateHeart(
+ ((SuperNodeData) SuperNodeListe.Instance().getListe()
+ .elementAt(index)).getStub());
+ } catch (Exception e) {
+
+ System.err
+ .println("Unable to modify heartbeat server for previous node"
+ + e);
+ }
+
+ HeartBeatSNode.Instance().start();
+ try {
+ Thread.sleep(HeartBeatSNode.Instance().getHeartTime());
+ } catch (Exception e) {
+ }
+ TokenThread.Instance().start();
+ ScanThreadSuperNode.Instance().start();
+ startScanningNodes();
+ }
+
+ // mettre des threads pr scanner
+ public void startScanningNodes() {
+ while (true) {
+ // scan at every "heartTime" milisecondes if nodes registered are
+ // still alive
+ scanConnectedHosts();
+
+ try {
+ Thread.sleep(heartTime);
+ } catch (Exception e) {
+ }
+ }
+ }
+
+ // verify if the nodes labeled "alive" in
+ // Register.Instance() of the SuperNode are still alive
+ private synchronized void scanConnectedHosts() {
+ Node host;
+ long workerTime;
+ long currentTime;
+
+ // detects the nodes connected that should have died
+ for (int i = 0; i < Register.Instance().getSize(); i++) {
+ host = Register.Instance().getNodeAt(i);
+ if (host.getAliveFlag() == true && host.getAppliName() == null) {
+ workerTime = host.getAliveTime();
+ currentTime = System.currentTimeMillis();
+
+ // if the worker time has not changed since more than
+ // "timeBeforeKill" milisecondes, it is considered down
+ if (currentTime - workerTime > timeBeforeKill) {
+ // System.out.println(host.getName() +
+ // " : difference of time = " + (currentTime - workerTime));
+ host.setAliveFlag(false);
+ // try to reconnect the daemon to the super node
+ try {
+ // if(protocol.equals("rmi")){
+ host.getStub().reconnectSuperNode();
+ // System.out.println("Daemon reconnected to the super node");
+ // }
+ } catch (Exception e) {
+ System.out.println("\nDISCONNECTION of " + host.getName()
+ + " size : " + Register.Instance().getSize());
+ }
+
+ // System.out.println("I remove the node because it doesnt answer anymore");
+ Register.Instance().removeNode(host);
+ int index = SuperNodeListe.Instance().existSuperNode(
+ LocalHost.Instance().getIP());
+ ((SuperNodeData) SuperNodeListe.Instance().getListe().get(
+ index)).setNbOfNodes(Register.Instance().getSize());
+ SuperNodeListe.Instance().forwardCountNode();
+
+ GNode deadGNode = snodeServer.delGNodeFromList( host ) ;
+ SuperNodeListe.Instance().removeGNode( deadGNode ) ;
+
+ // Register.Instance().viewAll();
+ // SuperNodeListe.Instance().viewAll();
+ }
+ }
+ }
+ }
+}
--- /dev/null
+package jaceP2P;
+
+import java.rmi.Remote;
+import java.rmi.RemoteException;
+import java.util.Vector;
+
+import and.Mapping.Algo;
+import and.Mapping.GNode;
+
+public interface JaceSuperNodeInterface extends Remote {
+// public void workerRegistering(JaceInterface workerStub, String workerIP,
+// String workerName, int port) throws RemoteException;
+
+ public void workerRegistering(JaceInterface workerStub, String workerIP,
+ String workerName, int port, GNode g) throws RemoteException;
+
+ public int getSuperNodeBeat() throws RemoteException;
+
+ public void beating(JaceInterface stub) throws RemoteException;
+
+ public void beating(boolean bool) throws RemoteException;
+
+// public Register getRegisterSpawner(String spawnerIP, int nbTasks)
+// throws RemoteException;
+
+ public Register getRegisterSpawner(String spawnerIP, int nbTasks, Task t,
+ int nbNodes, int algo, double param) throws RemoteException;
+
+// public Register reserveLocalNodes(int nb) throws RemoteException;
+
+// public Node getNewNode(String spawnerIP) throws RemoteException;
+
+ public Node getNewNode(String spawnerIP, Node _deadNode) throws RemoteException;
+
+ public Vector<?> sendStub(String IP, int port, JaceSuperNodeInterface stub)
+ throws RemoteException;
+
+ public void updateCountNode(String IP, int nb) throws RemoteException;
+
+ public void sendSurplus(Vector<?> nodes) throws RemoteException;
+
+ public void removeSuperNode(SuperNodeData d) throws RemoteException;
+
+ public void setToken() throws RemoteException;
+
+ public void updateHeart(JaceSuperNodeInterface stub) throws RemoteException;
+
+ /** ! **/
+ public void addGNode( GNode _g ) throws RemoteException ;
+
+ public void removeGNode( GNode _g ) throws RemoteException ;
+
+ public void setMapping( Algo al ) throws RemoteException ;
+
+}
+
+/** ! **/
--- /dev/null
+package jaceP2P;
+
+import java.rmi.RemoteException;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.ArrayList;
+import java.util.Vector;
+
+import and.Mapping.Algo;
+import and.Mapping.DefaultMapping;
+import and.Mapping.GNode;
+import and.Mapping.GTask;
+import and.Mapping.Graph;
+import and.Mapping.Grid;
+import and.Mapping.LSM;
+import and.Mapping.Mapping;
+import and.Mapping.QM;
+import and.Mapping.Simple;
+import and.Mapping.Utils;
+
+
+public class JaceSuperNodeServer extends UnicastRemoteObject implements
+ JaceSuperNodeInterface {
+ private static final long serialVersionUID = 1L;
+
+ // Attributes
+ private int beat;
+ private ArrayList<GNode> gnodes = null;
+ private Algo al = null ;
+ private int count = 0 ;
+// private boolean daemonListChange ;
+
+ // Constructors
+
+ public JaceSuperNodeServer(int timeBeat) throws RemoteException {
+ super() ;
+ beat = timeBeat ;
+ gnodes = new ArrayList<GNode>() ;
+// daemonListChange = true ;
+ }
+
+ public int getSuperNodeBeat() throws RemoteException {
+ return beat;
+ }
+
+ public Vector<?> sendStub(String IP, int port, JaceSuperNodeInterface stub)
+ throws RemoteException {
+ SuperNodeListe.Instance().addStubOf(IP, port, stub);
+ System.out.println("Added new superNode (" + IP
+ + ") to list and return List");
+ return SuperNodeListe.Instance().getListe();
+ }
+
+ public void updateCountNode(String IP, int nb) throws RemoteException {
+ SuperNodeListe.Instance().modifCountNode(IP, nb);
+ // System.out.println( "SuperNode " + IP + " has registered " + nb +
+ // " Daemons" ) ;
+ SuperNodeListe.Instance().viewAll();
+ }
+
+ public void sendSurplus(Vector<?> nodes) throws RemoteException {
+ System.out.println("Recieved " + nodes.size() + " nodes");
+
+ for (int i = 0; i < nodes.size(); i++) {
+ Register.Instance().getListe().add((Node) nodes.elementAt(i));
+
+ System.out.println("Adding "
+ + ((Node) nodes.elementAt(i)).getName());
+
+ try {
+ ((Node) nodes.elementAt(i)).getStub().updateHeart(
+ LocalHost.Instance().getSuperNodeStub());
+ int index = SuperNodeListe.Instance().existSuperNode(
+ LocalHost.Instance().getIP());
+ ((SuperNodeData) SuperNodeListe.Instance().getListe()
+ .get(index))
+ .setNbOfNodes(Register.Instance().getSize());
+ new ForwardCount().start();
+ } catch (Exception e) {
+ System.out.println("Error changing Server in SendSurplus : "
+ + e);
+ }
+ }
+ }
+
+ public void setToken() throws RemoteException {
+ System.out.println("I got Token");
+
+ TokenThread.Instance().setToken();
+
+ try {
+ HeartBeatSNode.Instance().getServer().beating(true);
+
+ System.out.println("Put token to true");
+ } catch (Exception e) {
+ System.out
+ .println("Unable to heartBeat the next SuperNode with the new Token : "
+ + e);
+ }
+ }
+
+ public void updateHeart(JaceSuperNodeInterface stub) throws RemoteException {
+ System.out.println("I change to ping a superNode");
+
+ HeartBeatSNode.Instance().setServer(stub);
+ }
+
+ public synchronized void removeSuperNode(SuperNodeData d)
+ throws RemoteException {
+ SuperNodeListe.Instance().removeSuperNode(d);
+ }
+
+ /****************************************************/
+ /****************************************************/
+
+ // Register a Daemon in the Register.Instance() of the SuperNode
+ public synchronized void workerRegistering(JaceInterface workerStub,
+ String workerIP, String workerName, int port, GNode g)
+ throws RemoteException {
+ System.out.println("CONNEXION of " + workerName);
+
+ // Create the node
+ Node noeud = new Node(workerStub);
+ noeud.setName(workerName);
+ noeud.setIP(workerIP);
+ noeud.setAliveFlag(true);
+ noeud.setAliveTime();
+ noeud.setPort(port);
+ noeud.setAppliName(null);
+ noeud.setNbOfBeats(0);
+ noeud.setId( count ) ;
+ noeud.setId( Long.parseLong( workerIP.replace( ".", "" ) ) ) ;
+
+ g.setNode(noeud);
+ g.setId( noeud.getId() ) ;
+ gnodes.add(g);
+
+// daemonListChange = true ;
+
+ // Insert the node in the Register.Instance() of the Super Node
+ Register.Instance().addNode(noeud);
+
+ // Register.Instance().viewAll() ;
+ // SuperNodeListe.Instance().viewAll() ;
+
+ // Inform the other superNode and tell them the nb of Daemon I have
+ // registered
+ int index = SuperNodeListe.Instance().existSuperNode(
+ LocalHost.Instance().getIP());
+
+ ((SuperNodeData) SuperNodeListe.Instance().getListe().get(index))
+ .setNbOfNodes(Register.Instance().getSize());
+ SuperNodeListe.Instance().forwardCountNode();
+
+ SuperNodeListe.Instance().addGNode( g ) ;
+ }
+
+ /****************************************************/
+ /****************************************************/
+
+ // HeartBeat that detects if a Daemon is dead or alive
+ public void beating(JaceInterface stub) throws RemoteException {
+ Node noeud = Register.Instance().getNodeOfStub(stub);
+
+ if (noeud != null) {
+ noeud.setAliveFlag(true);
+ noeud.setAliveTime();
+ noeud.incrementNbOfBeats();
+ // -- sm modif
+ // System.out.println( noeud.getName() + " is pinging me" ) ;
+ } else {
+ // System.out.println( noeud.getName() +
+ // ".................. is not in my list" ) ;
+ }
+ }
+
+ // HeartBeat that detects if a Super Node is dead or alive
+ public void beating(boolean token) throws RemoteException {
+ ScanThreadSuperNode.Instance().setAliveTime();
+ ScanThreadSuperNode.Instance().setToken(token);
+ // -- sm modif
+ // System.out.println( "Super Node is pinging me" ) ;
+ }
+
+
+ /*********************************/
+ /** Mapping !! Sébastien Miquée **/
+ /*********************************/
+
+ public Register getRegisterSpawner(String spawnerIP, int nbTasks, Task t,
+ int nbNoeuds, int algo, double paramAlgo) throws RemoteException {
+
+ // Have we the correct application ?
+ if (t == null) {
+ System.err.println( "Problem of class transmission !" ) ;
+ return null ;
+ }
+
+ if( t.getDependencies( 0 ) == null )
+ {
+ System.err.println( "No redifinition of getDependencies() functions !" ) ;
+ return null ;
+ }
+
+
+ /** Creation of an empty new Register **/
+ Register reg = new Register() ;
+
+ /** Initialization of Grid architecture (G5K for now) **/
+ Grid grid = Utils.createGridG5k(gnodes);
+ grid.initClusters();
+
+ /** Creation of tasks GTask **/
+ ArrayList<GTask> ts = new ArrayList<GTask>();
+ for (int i = 0; i < nbTasks; i++) {
+ ts.add(new GTask( i ) ) ;
+ }
+
+ /** Research of dependencies **/
+ for (int i = 0; i < nbTasks; i++) {
+ int dep[] = null;
+ dep = t.getDependencies(i);
+
+ /** Adding dependencies to tasks **/
+ for (int j = 0; j < dep.length; j++) {
+ if (dep[j] != -1) {
+ ts.get(i).addDependance(ts.get(dep[j]));
+ } else {
+ break;
+ }
+ }
+ }
+
+ Graph graph = new Graph();
+
+ for( int i = 0 ; i < ts.size() ; i++)
+ {
+ graph.addGTask(ts.get(i));
+ }
+
+ // -- debug !
+ // graph.print() ;
+
+ // try {
+ // Thread.sleep(10000) ;
+ // } catch( Exception e ) {}
+
+ // grid.print() ;
+ //
+ // try {
+ // Thread.sleep( 10000 ) ;
+ // } catch( Exception e ) {}
+
+ /** Selection of the mapping algorithm **/
+ al = null ;
+
+ switch (algo) {
+ case 0:
+ al = new Simple(graph, grid);
+ break;
+ case 1:
+ al = new QM(graph, grid, paramAlgo);
+ break;
+ case 2:
+ al = new LSM(graph, grid, paramAlgo);
+ break;
+ default:
+ al = new DefaultMapping( graph, grid, gnodes ) ;
+ }
+
+ if (al != null) {
+ /** Launching the Mapping **/
+ al.map();
+
+ /** Transforming mapping in register **/
+ Mapping mp = al.getMapping();
+
+ /** Creating the register **/
+ ArrayList<GNode> ag = mp.getMappedGNodes();
+
+ for (int i = 0; i < ag.size(); i++) {
+ reg.addNode((Node) ag.get(i).getNode());
+ gnodes.remove(ag.get(i));
+ Register.Instance().removeNode((Node) ag.get(i).getNode());
+ }
+
+ if (ag.size() != 0) {
+ SuperNodeListe.Instance().forwardCountNode();
+ }
+
+ }
+// else {
+// al = new DefaultMapping( graph, grid, gnodes ) ;
+// /** Launching the Mapping **/
+// al.map();
+//
+// /** Transforming mapping in register **/
+// Mapping mp = al.getMapping();
+//
+// /** Creating the register **/
+// ArrayList<GNode> ag = mp.getMappedGNodes();
+//
+// for (int i = 0; i < ag.size(); i++) {
+// reg.addNode((Node) ag.get(i).getNode());
+// gnodes.remove(ag.get(i));
+// Register.Instance().removeNode((Node) ag.get(i).getNode());
+// }
+//
+// if (ag.size() != 0) {
+// SuperNodeListe.Instance().forwardCountNode();
+// }
+// return getRegisterSpawner(spawnerIP, nbTasks);
+// }
+
+// daemonListChange = false ;
+
+ System.out.println( "Spawner returned reg: " + reg ) ;
+
+ /* Mapping distribution over other Super Nodes */
+ SuperNodeListe.Instance().setMapping( al ) ;
+
+ /* Returning result */
+ return reg ;
+ }
+
+ /*****************************************/
+ /**** Sébastien Miquée ****/
+ /** **/
+ /** Recherche nouveau noeud **/
+ /*****************************************/
+
+ protected GNode delGNodeFromList( Node _n )
+ {
+ GNode deadGNode = null ;
+
+ for( int i = 0 ; i < gnodes.size() ; i++ )
+ {
+ if( ((Node)gnodes.get(i).getNode()).getId() == _n.getId() )
+ {
+ deadGNode = gnodes.remove( i ) ;
+ break ;
+ }
+ }
+
+// daemonListChange = true ;
+
+ SuperNodeListe.Instance().removeGNode( deadGNode ) ;
+
+ return deadGNode ;
+ }
+
+
+ public Node getNewNode( String _spawnerIP, Node _deadNode ) throws RemoteException
+ {
+ Node node = null ;
+ GNode remp = null, gnode = null ;
+
+
+ if( _deadNode != null )
+ {
+ gnode = delGNodeFromList( _deadNode ) ;
+
+ /* TODO */
+ Mapping mp = al.getMapping() ;
+
+// ArrayList <GNode> mapped = mp.getMappedGNodes() ;
+
+ mp.removeGNode( gnode ) ;
+
+ /*****************/
+ // mettre directement dans Algo !
+// for( int i = 0 ; i < mapped.size() ; i++ )
+// {
+// gnode = mapped.get( i ) ;
+// tmp = (Node) gnode.getNode() ;
+// if( tmp.getId() == _deadNode.getId() )
+// {
+// mapped.remove( i ) ;
+// break ;
+// }
+// }
+
+ remp = al.replaceNode( gnode, gnodes ) ;
+
+ if( remp != null )
+ {
+ System.out.println( "Replacing node found." ) ;
+ node = (Node) remp.getNode() ;
+ delGNodeFromList( node ) ;
+// gnodes.remove( remp ) ;
+// Register.Instance().removeNode( node );
+ SuperNodeListe.Instance().forwardCountNode();
+ } else {
+ System.err.println( "Replacing node not found !!" ) ;
+ }
+ } else {
+ remp = al.getOtherGNode() ;
+
+ if( remp != null )
+ {
+ System.out.println( "Other new node found." ) ;
+ node = (Node) remp.getNode() ;
+ delGNodeFromList( node ) ;
+// gnodes.remove( remp ) ;
+// Register.Instance().removeNode( node );
+ SuperNodeListe.Instance().forwardCountNode();
+ } else {
+ System.err.println( "Other new node not found !!" ) ;
+ }
+ }
+
+ return node ;
+ }
+
+
+ /**********************************************************/
+ /**********************************************************/
+
+
+// public Node getNewNode(String spawnerIP) throws RemoteException {
+// boolean found = false;
+// int i = 0;
+// Node tmpNode = null;
+// SuperNodeData d = null;
+// //int passage = 0;
+// String snode_IP;
+// JaceSuperNodeInterface stub = null;
+//
+// try {
+// System.out.println("\n" + spawnerIP
+// + " (spawner) requests a new node : ");
+// while (i < Register.Instance().getSize() && found == false) {
+// tmpNode = Register.Instance().getNodeAt(i);
+//
+// if (tmpNode.getAppliName() == null
+// && tmpNode.getAliveFlag() == true) {
+// tmpNode.setAppliName("notnull");
+// found = true;
+//
+// // enlever maintenant le noeud du register de
+// // SuperNode??????
+// // System.out.println("je remove le noeud car il va beater le spawner (getNewNode)");
+// // Register.Instance().removeNodeAt(i);
+// Register.Instance().removeNode(tmpNode);
+// // Register.Instance().removeNode(tmpNode.getIP());
+//
+// int index = SuperNodeListe.Instance().existSuperNode(
+// LocalHost.Instance().getIP());
+//
+// ((SuperNodeData) SuperNodeListe.Instance().getListe()
+// .elementAt(index)).setNbOfNodes(Register.Instance()
+// .getSize());
+// SuperNodeListe.Instance().forwardCountNode();
+// }
+// i++;
+// }
+// } catch (Exception e1) {
+// System.out.println("... plante en cherchant chez moi");
+// }
+//
+// // Register.Instance().viewAll();
+// SuperNodeListe.Instance().viewAll();
+//
+// try {
+// // si pas assez de noeud sur ce superNode,
+// if (found == false) {
+// System.out.println("pas de noeud dispo chez moi");
+//
+// SuperNodeListe snodeListTmp = SuperNodeListe.Instance().clone();
+// // while ( (found == false) && (passage <
+// // SuperNodeListe.Instance().getSize()) ) {
+// while ((found == false) && (snodeListTmp.getSize() > 0)) {
+// // System.out.println("passage = " + passage);
+// // d = SuperNodeListe.Instance().getBestSuperNodeData();
+// d = snodeListTmp.getBestSuperNodeData();
+// System.out.println("KKKKKKKKKKKKKKK ......... le best c "
+// + d.getIP() + " il en a " + d.getNbOfNodes());
+// if (d != null) {
+// snode_IP = d.getIP();
+// // si c moi, je passe au suivant
+// if (LocalHost.Instance().getIP().equals(snode_IP)) {
+// // passage++;
+// System.out
+// .println("OUUUUPS, c moi dc je tente un autre");
+// snodeListTmp.removeSuperNode(d);
+// continue;
+// }
+//
+// stub = d.getStub();
+//
+// if (stub != null) {
+// try {
+// Register tmpReg = stub.reserveLocalNodes(1);
+// if (tmpReg != null) {
+// // for (int j = 0; j < tmpReg.getSize();
+// // j++) {
+// System.out.println("IL EN A 1 !!!!!!!");
+// tmpNode = tmpReg.getNodeAt(0);
+// found = true;
+// System.out.println("le snode " + snode_IP
+// + " me reserve le noeud demande");
+// } else {
+// System.out
+// .println("MERDE !!! pas de noeud dispo sur "
+// + snode_IP);
+// System.out
+// .println("Je demande un noeud a un autre");
+// }
+//
+// } catch (Exception e) {
+// System.out
+// .println("le snode est mort, je demande les noeuds a un autre");
+// // remettre localement a 0 le nb de noeuds de ce
+// // superNode
+// SuperNodeListe.Instance().modifCountNode(
+// snode_IP, 0);
+// snodeListTmp = SuperNodeListe.Instance()
+// .clone();
+// }
+// }
+// } else {
+// System.out
+// .println("PUTAIN !!!! aucun noeud encore dispo sur les snode");
+// // passage = SuperNodeListe.Instance().getSize();
+// }
+// // passage++;
+// snodeListTmp.removeSuperNode(d);
+// }
+// }
+// } catch (Exception e2) {
+// System.out.println("plante en cherchant chez les autres");
+// }
+//
+// // si pas assez de noeud sur tous les superNode,
+// if (found == false) {
+// System.out.println("aucun noeud dispo dans le systeme");
+// } else {
+// System.out.println("je lui donne son noeud");
+// }
+//
+// return tmpNode;
+// }
+
+// public Register reserveLocalNodes(int nb) throws RemoteException {
+// Register reg = null;
+// System.out
+// .println("\nA superNode or a Spawner asks me for a register of "
+// + nb + " Daemons");
+//
+// int count = 0;
+// int i = 0;
+// Node tmpNode;
+//
+// if (Register.Instance().getSize() == 0) {
+// return new Register();
+// } else {
+// reg = new Register();
+//
+// while (i < Register.Instance().getSize() && count < nb) {
+// tmpNode = Register.Instance().getNodeAt(i);
+//
+// if (tmpNode.getAppliName() == null
+// && tmpNode.getAliveFlag() == true
+// /* && tmpNode.getNbOfBeats() > 10 */) {
+// // if node available, alive and not recently connected, then
+// tmpNode.setAppliName("notnull");
+// reg.addNode(tmpNode);
+//
+// // remove now the node from SuperNode or later ?
+// // System.out.println("I remove the Node from SuperNode beating it will now beat the Spawner");
+// Register.Instance().removeNode(tmpNode);
+// count++;
+// }
+// // increment counter only y Node NOT REMOVED
+// else {
+// i++;
+// }
+// }
+//
+// // Register.Instance().viewAll();
+// System.out.println("Number of Daemons reserved on me : " + count);
+// // Inform other SuperNodes that some Daemons initialy registered on
+// // me
+// // are no longer available because reserved for the spawner that
+// // invoked me
+// int index = SuperNodeListe.Instance().existSuperNode(
+// LocalHost.Instance().getIP());
+// ((SuperNodeData) SuperNodeListe.Instance().getListe().elementAt(
+// index)).setNbOfNodes(Register.Instance().getSize());
+//
+// if (count != 0) {
+// SuperNodeListe.Instance().forwardCountNode();
+// }
+//
+// System.out.println("I return " + count + " nodes");
+// // System.out.println( reg ) ;
+// // SuperNodeListe.Instance().viewAll() ;
+// return reg;
+// }
+// }
+
+ @Override
+ public void addGNode( GNode _g ) throws RemoteException
+ {
+ if( _g != null )
+ gnodes.add( _g ) ;
+ }
+
+
+ @Override
+ public void removeGNode( GNode _g ) throws RemoteException
+ {
+ if( _g != null )
+ {
+ for( int i = 0 ; i < gnodes.size() ; i++ )
+ {
+ if( ((Node)gnodes.get(i).getNode()).getId() == ((Node)_g.getNode()).getId() )
+ {
+ gnodes.remove( i ) ;
+ break ;
+ }
+ }
+ }
+
+ }
+
+
+ @Override
+ public void setMapping( Algo _al ) throws RemoteException
+ {
+ al = _al ;
+ }
+
+}
+
+/** ! **/
--- /dev/null
+package jaceP2P;
+
+public class LastSave implements java.io.Serializable {
+ private static final long serialVersionUID = 1L;
+
+ int lastSave;
+
+ public LastSave() {
+ lastSave = 0;
+ }
+
+ public int getLastSave() {
+ return lastSave;
+ }
+
+ public void increment() {
+ lastSave++;
+ }
+}
--- /dev/null
+package jaceP2P;
+
+import java.util.Vector;
+
+public class ListeTask implements java.io.Serializable, java.lang.Cloneable {
+
+ private static final long serialVersionUID = 1L;
+
+ // attributes
+ protected Vector<TaskId> liste = new Vector<TaskId>();
+ protected TaskId task;
+
+ // constructor
+ public ListeTask() {
+ };
+
+ @SuppressWarnings("unchecked")
+ public synchronized ListeTask clone() {
+ ListeTask l = new ListeTask();
+ l.liste = (Vector<TaskId>) liste.clone();
+ l.task = task;
+ return (l);
+ }
+
+ // methods
+ public synchronized void addTask(TaskId tsk) {
+ liste.addElement(tsk);
+ }
+
+ public synchronized TaskId getTaskIdOfHostStub(JaceInterface hostStub) {
+ int is = -1;
+ if (liste.isEmpty()) {
+ return null;
+ } else {
+ is = existHostStub(hostStub);
+ if (is != -1) {
+ return (TaskId) liste.get(is);
+ } else {
+ System.out
+ .println("There is no task affected to this host stub !!!!! ");
+ return null;
+ }
+ }
+ }
+
+ public synchronized TaskId getTaskIdOfRank(int rank) {
+ int is = -1;
+ if (liste.isEmpty()) {
+ return null;
+ } else {
+ is = existRank(rank);
+ if (is != -1) {
+ return (TaskId) liste.get(is);
+ } else {
+ System.out.println("TaskId of rank=" + rank + " doesn't exist");
+ viewAll();
+ return null;
+ }
+ }
+ }
+
+ public synchronized int getSize() {
+ return liste.size();
+ }
+
+ private synchronized int existHostStub(JaceInterface host) {
+ int existe = -1;
+ int index = 0;
+ while ((existe == -1) && (index < liste.size())) {
+ if (host.equals(((TaskId) liste.get(index)).getHostStub())) {
+ existe = index;
+ } else
+ index++;
+ }
+ return existe;
+ }
+
+ private synchronized int existRank(int rank) {
+ int existe = -1;
+ int index = 0;
+ while ((existe == -1) && (index < liste.size())) {
+ if (((TaskId) liste.get(index)).getRank() == rank) {
+ existe = index;
+ } else
+ index++;
+ }
+ return existe;
+ }
+
+ public synchronized TaskId get(int i) {
+ return (TaskId) liste.get(i);
+ }
+
+ public synchronized void viewAll() {
+ TaskId maTaskId = null;
+ System.out.println("** Tasks **\n");
+ if (liste.size() == 0) {
+ System.out.println("No task");
+ } else {
+ for (int i = 0; i < liste.size(); i++) {
+ maTaskId = get(i);
+ System.out.println("\tTaskId " + i + ", rank : "
+ + maTaskId.getRank() + ", hostName : "
+ + maTaskId.getHostName());
+ }
+ }
+
+ System.out.println("***** *****\n");
+ }
+
+}
+
+/** ! **/
+
--- /dev/null
+package jaceP2P;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+
+//import java.io.*;
+
+public class Loader {
+
+ // constructor
+ public Loader() {
+ };
+
+ // methods
+
+ // decoupe toute la chaine en element ds un tableau:
+ // element 0 : le chemin
+ // element 1 : le nom de l'appli
+ public String[] parse(String name) {
+ String[] result = new String[2];
+ int i = 0;
+ for (i = name.length() - 1; i >= 0; i--) {
+ // System.out.println(i);
+ if (name.charAt(i) == '/')
+ break;
+ }
+ result[0] = name.substring(0, i + 1);
+ String tmp = name.substring(i + 1, name.length());
+ int index = tmp.indexOf(".");
+ if (index != -1) {
+ tmp = tmp.substring(0, index);
+ }
+ result[1] = tmp;
+ return result;
+ }
+
+ public Class<?> load(String args) {
+ Class<?> myClass = null;
+ try {
+ String urlString = args;
+ // parser toute la chaine en le path et le nom de l'appli: retour ds
+ // un tableau a 2 case appele ptc
+ String[] ptc = parse(urlString);
+ URL[] urls = new URL[1]; // creer un tablo de 1 case contenant des
+ // obj URL de java
+ urls[0] = new URL(ptc[0]); // le remplir avec 1 objet : l'URL creer
+ // a partir du path
+ URLClassLoader ucl = new URLClassLoader(urls);
+ myClass = ucl.loadClass(ptc[1]);
+ } catch (Exception e) {
+ System.err.println(args + " Class not found!");
+ }
+ return myClass;
+ }
+}
--- /dev/null
+package jaceP2P ;
+
+import java.net.InetAddress;
+
+public class LocalHost {
+
+ // attributes
+ private static LocalHost Instance;
+ private static String name;
+ private String IP;
+ private int port;
+ private int socketPort = 1097;
+ private static JaceInterface ref = null;
+ private static JaceSuperNodeInterface refSuperNode = null;
+ @SuppressWarnings("unused")
+ private static String superNodeIP = null;
+ private boolean startedThreads = false;
+
+ private LocalHost() {
+ try {
+ InetAddress ia = InetAddress.getLocalHost();
+ name = ia.getCanonicalHostName();
+ IP = ia.getHostAddress();
+ } catch (Exception e) {
+ System.err.println("Jace Error: Unknown Host: " + e);
+ }
+ }
+
+ public int getSocketPort() {
+ return socketPort;
+ }
+
+ public synchronized static LocalHost Instance() {
+ if (Instance == null) {
+ Instance = new LocalHost();
+ }
+ return Instance;
+ }
+
+ public void kill() {
+ Instance = null;
+ }
+
+ /*
+ * public void setSender(Thread t){ sender=t; } public Thread getSender(){
+ * return sender; }
+ */
+
+ public String resolve(String name) {
+ String ip = null;
+ try {
+ ip = InetAddress.getByName(name).getHostAddress();
+ // System.out.println("resolve : "+ip);
+ } catch (java.net.UnknownHostException e) {
+ System.err.println("Cannot find IP address of " + name + " :" + e);
+ }
+ return ip;
+ }
+
+ public synchronized void setPort(int portOfComm) {
+ port = portOfComm;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public synchronized String getName() {
+ return name;
+ }
+
+ public synchronized String getIP() {
+ return IP;
+ }
+
+ public synchronized void setStub(JaceInterface stub) {
+ ref = stub;
+ }
+
+ public synchronized void setSuperNodeStub(JaceSuperNodeInterface stub) {
+ refSuperNode = stub;
+ }
+
+ public JaceSuperNodeInterface getSuperNodeStub() {
+ return refSuperNode;
+ }
+
+ public synchronized void setSuperNodeIP(String IP) {
+ superNodeIP = IP;
+ }
+
+ public synchronized JaceInterface getStub() {
+ return ref;
+ }
+
+ public synchronized void setStartedThreads(boolean b) {
+ startedThreads = b;
+ }
+
+ public synchronized boolean getStartedThreads() {
+ return startedThreads;
+ }
+}
+
+/** ! **/
--- /dev/null
+package jaceP2P;
+
+public class Message implements java.io.Serializable, Cloneable {
+
+ private static final long serialVersionUID = 1L;
+
+ // attributs
+ private int tag = -1;
+ private TaskId Src = null;
+ private TaskId Dest = null;
+ private Object obj;
+ private int timeStep;
+ private int src_iteration;
+ private int src_tag = -1;
+ //private Integer srcTag;
+ private double error = 0;
+
+ // TODO : add appliname too in Message object
+
+ // constructors
+ public Message(Object obj) {
+ this.obj = obj;
+ }
+
+ public Message() {
+ }
+
+ public void setParam(Object buffer, TaskId src, TaskId dest, int tag,
+ int time, int iter, int src_tag, double erreur_locale) {
+ this.obj = buffer;
+ this.Src = src;
+ this.Dest = dest;
+ this.tag = tag;
+ this.timeStep = time;
+ this.src_iteration = iter;
+ this.src_tag = src_tag;
+ error = erreur_locale;
+ }
+
+ // methods
+ public Object getData() {
+ return obj;
+ }
+
+ public double getLocalError() {
+ return error;
+ }
+
+ public int getSrc_iteration() {
+ return src_iteration;
+ }
+
+ public int getSrc_tag() {
+ return src_tag;
+ }
+
+ public void setTag(int val) {
+ this.tag = val;
+ }
+
+ public int getTag() {
+ return tag;
+ }
+
+ public TaskId getSender() {
+ return Src;
+ }
+
+ public TaskId getReceiver() {
+ return Dest;
+ }
+
+ public int getTimeStep() {
+ return timeStep;
+ }
+
+ public Object clone() {
+ Message tmp = new Message();
+ tmp.setParam(this.obj, this.Src, this.Dest, this.tag, this.timeStep,
+ this.src_iteration, this.src_tag, this.error);
+ return (Object) tmp;
+ }
+}
--- /dev/null
+package jaceP2P;
+
+public class MsgChrono implements java.io.Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ // attributes
+ long ms = 0;
+
+ // constructors
+ public MsgChrono() {
+ }
+
+ // methods
+ public void start() {
+ ms = System.currentTimeMillis();
+ }
+
+ public long getValue() {
+ long result = System.currentTimeMillis();
+ return result - ms;
+ }
+}
--- /dev/null
+package jaceP2P;
+
+import java.util.Vector;
+
+public class MsgQueue {
+
+ public static MsgQueue Instance;
+ // attributes
+ private Vector<Message> liste;
+
+ // constructors
+ public MsgQueue() {
+ liste = new Vector<Message>();
+ }
+
+ public synchronized static MsgQueue Instance() {
+ if (Instance == null) {
+ Instance = new MsgQueue();
+ }
+ return Instance;
+ }
+
+ // retourne l'index d'un Message de meme tag
+ // retourne -1 si un tel Message existe pas
+ private synchronized int exist(int tag, String appli, int time) {
+ int existe = -1;
+ int index = 0;
+ while ((existe == -1) && (index < liste.size())) {
+ if ((tag == ((Message) liste.get(index)).getTag())
+ && (appli.equals(((Message) liste.get(index)).getSender()
+ .getAppliName()))
+ && (time == ((Message) liste.get(index)).getTimeStep())
+
+ ) {
+ existe = index;
+ } else
+ index++;
+ }
+ return existe;
+ }
+
+ // retourne l'index d'un Message de meme tag ET meme sender ke "msg"
+ // retourne -1 si un tel Message existe pas
+ private synchronized int exist(int sender, int tag, String appli, int time) {
+ int existe = -1;
+ int index = 0;
+ while ((existe == -1) && (index < liste.size())) {
+ if ((tag == ((Message) liste.get(index)).getTag())
+ && (sender == ((Message) liste.get(index)).getSender()
+ .getRank())
+ && (appli.equals(((Message) liste.get(index)).getSender()
+ .getAppliName()))
+ && (time == ((Message) liste.get(index)).getTimeStep())
+
+ ) {
+ existe = index;
+ } else
+ index++;
+ }
+ return existe;
+ }
+
+ // retourne l'index d'un Message de meme tag ET meme sender ke "msg"
+ // retourne -1 si un tel Message existe pas
+ private synchronized int exist(Message msg) {
+ int existe = -1;
+ int index = 0;
+ while ((existe == -1) && (index < liste.size())) {
+ if ((msg.getTag() == ((Message) liste.get(index)).getTag()) // meme
+ // tag
+ && (msg.getSender().getRank() == ((Message) liste
+ .get(index)).getSender().getRank()) // meme tache
+ // envoyeur
+ && ((msg.getSender().getAppliName()).equals(JaceSession
+ .Instance().getTaskObject().getId().getAppliName())) // meme
+ // appli
+ && (msg.getTimeStep() == JaceSession.Instance()
+ .getTaskObject().getTimeStep())) { // meme timeStep
+ existe = index;
+ } else
+ index++;
+ }
+ return existe;
+ }
+
+ // methods
+ public void purge() {
+ Instance = null;
+ liste.clear();
+ }
+
+ // ajoute un msg ds file
+ public synchronized void add(Message msg) {
+ int is = -1;
+ is = exist(msg);
+
+ // si existe deja 1 Message de meme tag ET meme envoyeur, on l'ecrase
+ if (is != -1) {
+ // liste.remove(is);
+ liste.setElementAt(msg, is);
+ }
+
+ // si existe pas de Message de meme tag ET meme envoyeur, on l'ajoute
+ else {
+ liste.add(msg);
+ }
+
+ // ensuite on reveille le thread de calcul en attente eventuelle sur le
+ // MsgQueue
+ /*
+ * try { //notifyAll(); } catch(Exception e){};
+ */
+ }
+
+ // retourne Message de tag "tag"
+ public synchronized Message get(int id, int tag) {
+ int is = -1;
+ if (liste.isEmpty()) {
+ return null;
+ } else {
+ // System.out.println("chercher "+tag+"---"+id);
+ is = exist(id, tag, JaceSession.Instance().getTaskObject().getId()
+ .getAppliName(), JaceSession.Instance().getTaskObject()
+ .getTimeStep());
+ if (is != -1) {
+ // System.out.println("exitse ds la FA");
+ Message tmp = (Message) liste.get(is);
+ liste.remove(is);
+ // System.out.println(tmp);
+ // System.out.println("MSGQUEUE : j'ai recup " + tmp +
+ // " de tag " + tmp.getTag() + " de " +
+ // tmp.getSender().getHostIP());
+ return tmp;
+ } else {
+ // System.out.println("existe po :(((( "+tag+"---"+id);
+ return null;
+ }
+ }
+ }
+
+ public synchronized Message get(int tag) {
+ int is = -1;
+ // System.out.println("//////////kk viens de chercher un message");
+ if (liste.isEmpty()) {
+ return null;
+ } else {
+ is = exist(tag, JaceSession.Instance().getTaskObject().getId()
+ .getAppliName(), JaceSession.Instance().getTaskObject()
+ .getTimeStep());
+ if (is != -1) {
+ Message tmp = (Message) liste.get(is);
+ liste.remove(is);
+ // System.out.println("MSGQUEUE : j'ai recup " + tmp +
+ // " de tag " + tmp.getTag() + " de " +
+ // tmp.getSender().getHostIP());
+ return tmp;
+ } else
+ return null;
+ }
+ }
+
+ // les reception bloqunte
+ /*
+ * public synchronized Message getBl(int id) { int is = -1; //
+ * System.out.println("Taille de la file: "+liste.size()); if
+ * (liste.isEmpty()) { try { wait(); } catch(Exception e){};} while ((is =
+ * exist
+ * (id,JaceSession.Instance().getTaskObject().getId().getAppliName(),JaceSession
+ * .Instance().getTaskObject().getTimeStep())) == -1){ try{ wait(); }
+ * catch(Exception e){}; try { //notifyAll(); } catch(Exception e){}; }
+ * Message tmp; tmp = (Message)liste.get(is); liste.remove(is);
+ * //System.out.println("MSGQUEUE : j'ai recup " + tmp + " de tag " +
+ * tmp.getTag() + " de " + tmp.getSender().getHostIP()); //
+ * System.out.println("Taille de la file: "+liste.size()); return tmp; }
+ *
+ * public synchronized Message getBlTag(int id) { int is = -1; //
+ * System.out.println("Taille de la file: "+liste.size()); if
+ * (liste.isEmpty()) { try { wait(); } catch(Exception e){}; }
+ *
+ * while ((is =
+ * exist(id,JaceSession.Instance().getTaskObject().getId().getAppliName
+ * (),JaceSession.Instance().getTaskObject().getTimeStep())) == -1) { try{
+ * wait(); } catch(Exception e){}; try { //notifyAll(); } catch(Exception
+ * e){}; } Message tmp; tmp = (Message)liste.get(is); liste.remove(is);
+ * //System.out.println("MSGQUEUE : j'ai recup " + tmp + " de tag " +
+ * tmp.getTag() + " de " + tmp.getSender().getHostIP()); //
+ * System.out.println("Taille de la file: "+liste.size()); return tmp; }
+ *
+ * public synchronized Message getBl(int id,int tag) { int is = -1;
+ * //System.out.println("je cherche "+id+" "+tag);
+ * //System.out.println("Taille de la file: "+liste.size()); if
+ * (liste.isEmpty()) { try { wait(); }catch(Exception e){}; } while ((is =
+ * exist
+ * (id,tag,JaceSession.Instance().getTaskObject().getId().getAppliName()
+ * ,JaceSession.Instance().getTaskObject().getTimeStep())) == -1) { try {
+ * wait(); } catch(Exception e){}; try { //notifyAll(); } catch(Exception
+ * e){}; } Message tmp; tmp = (Message)liste.get(is); liste.remove(is);
+ * //System.out.println("MSGQUEUE : j'ai recup " + tmp + " de tag " +
+ * tmp.getTag() + " de " + tmp.getSender().getHostIP()); return tmp; }
+ */
+
+ public synchronized int getSize() {
+ return liste.size();
+ }
+}
--- /dev/null
+package jaceP2P;
+
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+
+public class Node implements java.io.Serializable {
+ private static final long serialVersionUID = 1L;
+
+ // attributes
+ private String ip;
+ private String name = null;
+ private String appliName = null;
+ private boolean aliveFlag;
+ private JaceInterface stub;
+ private long aliveTime = System.currentTimeMillis();
+ @SuppressWarnings("unused")
+ private int port;
+ private int nbOfBeats = 0;
+ private Object o = null;
+ private long id = -1;
+
+ public Node(JaceInterface s) {
+ stub = s;
+ }
+
+ public void setOutputStream(OutputStream out) {
+ this.o = out;
+ }
+
+ public void setId( long _id )
+ {
+ id = _id ;
+ }
+
+ public long getId()
+ {
+ return id ;
+ }
+
+ public OutputStream getOutputStream() {
+ return (ObjectOutputStream) o;
+ }
+
+ public synchronized void setName(String name) {
+ this.name = name;
+ }
+
+ public synchronized void setIP(String adr) {
+ this.ip = adr;
+ }
+
+ public synchronized void setAliveFlag(boolean val) {
+ aliveFlag = val;
+ }
+
+ public synchronized void setAliveTime() {
+ aliveTime = System.currentTimeMillis();
+ }
+
+ public synchronized long getAliveTime() {
+ return aliveTime;
+ }
+
+ public synchronized void setPort(int portOfComm) {
+ port = portOfComm;
+ }
+
+ public synchronized void setAppliName(String appli) {
+ this.appliName = appli;
+ }
+
+ public synchronized String getAppliName() {
+ return appliName;
+ }
+
+ public synchronized void setNbOfBeats(int nb) {
+ nbOfBeats = nb;
+ }
+
+ public synchronized JaceInterface getStub() {
+ return stub;
+ }
+
+ public synchronized String getName() {
+ return name;
+ }
+
+ public synchronized void incrementNbOfBeats() {
+ nbOfBeats++;
+ if (nbOfBeats > 10000000) {
+ nbOfBeats = 100;
+ }
+ }
+
+ public synchronized boolean getAliveFlag() {
+ return aliveFlag;
+ }
+
+ public synchronized int getNbOfBeats() {
+ return nbOfBeats;
+ }
+
+ public synchronized String getIP() {
+ return ip;
+ }
+}
+
+/** ! **/
--- /dev/null
+package jaceP2P;
+
+import java.util.Vector;
+
+
+public class Register implements java.io.Serializable, java.lang.Cloneable {
+
+ private static final long serialVersionUID = 1L;
+
+ // a voir si on le met pas ds autre objet
+ final int MAX_COUNT_NOT_ALIVE = 3;
+
+ // attributes
+ public static Register Instance;
+ private String appliName; // name of the appli
+ private Vector<Node> liste; // list of the nodes of the Register
+ // private ListeTask listeOfTasks = null; //liste of the TaskId of the appli
+ // private int version=0;
+ private String[] params; // params of the appli
+ private int numBackupNeighbors = 3;
+ private int numOfTasks;
+ private JaceSpawnerInterface spawnerStub = null;
+ private ListeTask listeOfTasks = null; // liste of the TaskId of the appli
+
+ // constructors
+
+ public Register() {
+ liste = new Vector<Node>();
+ }
+
+ public synchronized static Register Instance() {
+ if (Instance == null) {
+ Instance = new Register();
+ }
+ return Instance;
+ }
+
+ public synchronized String getAppliName() {
+ return appliName;
+ }
+
+ public synchronized int existNode(Node node) {
+ if (node == null) {
+ System.out
+ .println("!!!!!!!!!!!!!!!!!!!!!!!!! node est null ds existNode");
+ }
+ int existe = -1;
+ int index = 0;
+ while ((existe == -1) && (index < liste.size())) {
+ if (node.equals((Node) liste.get(index))) {
+ existe = index;
+ } else
+ index++;
+ }
+ return existe;
+ }
+
+ /*
+ * public synchronized int getVersion(){ return version; }
+ *
+ * public void setVersion(int version){ this.version=version; }
+ */
+
+ public synchronized void addNode(Node host) {
+ int is = -1;
+ if (host == null) {
+ System.out
+ .println("ds Register.addNode : host.getIP() == null !!!!!!!!!!!!!!!");
+ }
+ is = existNodeOfStub(host.getStub());
+ if (is != -1) {
+ System.out.println("This node exists yet, I replace it");
+ // System.out.println("MERDE !!");
+ // System.out.println("MERDE !!");
+ // System.out.println("MERDE !!");
+ // System.out.println("MERDE !!");
+ // System.out.println("MERDE !!");
+ // System.out.println("MERDE !!");
+ // System.out.println("MERDE !!");
+ liste.setElementAt(host, is);
+ } else {
+ liste.add(host);
+ }
+ }
+
+ public synchronized int existNodeOfStub(JaceInterface stub) {
+ // System.out.println("remote " + stub + "\n\n");
+ if (stub == null) {
+ System.out
+ .println("!!!!!!!!!!!!!!!!!!!!!!!!! stub = NULL ds existNodeOfStub = ");
+ }
+ int existe = -1;
+ int index = 0;
+ while ((existe == -1) && (index < liste.size())) {
+
+ if (stub.equals(((Node) liste.get(index)).getStub())) {
+ existe = index;
+ } else
+ index++;
+ }
+ return existe;
+ }
+
+ public synchronized Node getNodeOfStub(JaceInterface stub) {
+ int is = -1;
+ if (liste.isEmpty()) {
+ return null;
+ } else {
+ if (stub == null) {
+ System.out
+ .println("ds Register.getNodeOfStub : stub == null !!!!!!!!!!!!!!!");
+ }
+ is = existNodeOfStub(stub);
+ if (is != -1) {
+ return (Node) liste.get(is);
+ } else {
+ // System.out.println("stub : ce noeud existe poooooooooooo");
+ return null;
+ }
+ }
+ }
+
+ public synchronized Node getNodeOfName(String name) {
+ int is = -1;
+ if (liste.isEmpty()) {
+ return null;
+ } else {
+ if (name == null)
+ System.out
+ .println("ds Register.getNodeOfName : name == null !!!!!!!!!!!!!!!");
+ int index = 0;
+ while ((is == -1) && (index < liste.size())) {
+
+ if (name.equals(((Node) liste.get(index)).getName())) {
+ is = index;
+ } else
+ index++;
+ }
+
+ if (is != -1) {
+ return (Node) liste.get(is);
+ } else {
+ // System.out.println("stub : ce noeud existe poooooooooooo");
+ return null;
+ }
+
+ }
+ }
+
+ public synchronized Node getNodeAt(int index) {
+ return (Node) liste.get(index);
+ }
+
+ public synchronized boolean removeNode(Node n) {
+ return (liste.remove(n));
+ }
+
+ public synchronized boolean removeNodeOfName(String s) {
+ boolean removed = false;
+ int i = 0;
+ // System.out.println("begin remove");
+ // viewAll();
+ while (i < liste.size() && !removed) {
+ // System.out.println("i="+i+" s="+s);
+ if (((Node) liste.get(i)).getName().equals(s)) {
+ liste.remove(i);
+ removed = true;
+ } else
+ i++;
+ }
+ // System.out.println("end remove");
+ return removed;
+ }
+
+ public synchronized int getSize() {
+ return liste.size();
+ }
+
+ public synchronized Vector<Node> getListe() {
+ return liste;
+ }
+
+ public synchronized void setParams(String[] myParams) {
+ params = myParams;
+ }
+
+ public synchronized void setAppliName(String name) {
+ appliName = name;
+ }
+
+ public synchronized void setSpawnerStub(JaceSpawnerInterface ref) {
+ spawnerStub = ref;
+ }
+
+ public synchronized void replaceBy(Register newReg) {
+ Instance = null;
+ liste.clear();
+ Instance = newReg;
+ }
+
+ public synchronized void setListeOfTasks(ListeTask myListe) {
+ listeOfTasks = myListe;
+ }
+
+ public synchronized ListeTask getListeOfTasks() {
+ return listeOfTasks;
+ }
+
+ public synchronized int getNumBackupNeighbors() {
+ return numBackupNeighbors;
+ }
+
+ public void setNumBackupNeighbors(int i) {
+ numBackupNeighbors = i;
+ }
+
+ public synchronized JaceSpawnerInterface getSpawnerStub() {
+ return spawnerStub;
+ }
+
+ public synchronized void purge() {
+ for (int i = 0; i < liste.size(); i++)
+ if (((Node) liste.elementAt(i)).getOutputStream() != null) {
+ try {
+ ((Node) liste.elementAt(i)).getOutputStream().close();
+ } catch (Exception e) {
+ System.out.println("unable to close outputStream :" + e);
+ }
+ }
+ liste.clear();
+ Instance = null;
+ }
+
+ public synchronized String[] getParams() {
+ return params;
+ }
+
+ public synchronized Vector<Node> getListOfNodes() {
+ return liste;
+ }
+
+ public void setInstance() {
+ Instance = this;
+ }
+
+ public void setNbOfTasks(int nbOfTasks) {
+ numOfTasks = nbOfTasks;
+ }
+
+ public int getNbOfTasks() {
+ return numOfTasks;
+ }
+
+ // affiche la liste des nom (ou IP si pas de nom) des machine du Register
+ public synchronized void viewAll() {
+ String aff = "Nb of Daemons registered: " + getSize();
+ Node noeud = null;
+ String inter = "\n\t";
+ String count = "";
+ if (liste.isEmpty()) {
+ System.out.println("My Register is empty !!!!!");
+ } else {
+ for (int i = 0; i < liste.size(); i++) {
+ noeud = getNodeAt(i);
+ count = (i + 1) + " : ";
+ if (i > 0) {
+ // inter = "\n";
+ }
+ if (noeud.getName() != null) {
+ aff += inter + count + noeud.getName() + /*
+ * " avec appli : "
+ * +
+ * noeud.getAppliName
+ * () +
+ */", alive : "
+ + noeud.getAliveFlag() + ", count : "
+ + noeud.getNbOfBeats();
+ } else {
+ aff += inter + count + noeud.getIP() + " "
+ + noeud.getName() + " with appli : "
+ + noeud.getAppliName() + ", alive : "
+ + noeud.getAliveFlag();
+
+ }
+ /*
+ * if (noeud.getWorkerStub() != null) { aff +=
+ * ", stub = PAS null"; } else { aff += ", stub = NULL"; }
+ */
+ }
+ aff += "";
+ System.out.println(aff);
+ }
+ System.out.println("\n");
+ }
+
+}
--- /dev/null
+package jaceP2P;
+
+
+public class RunningApplication /* implements java.io.Serializable */{
+
+ // attributes
+ public static RunningApplication Instance;
+
+ private String name; // nom de l'application
+ @SuppressWarnings("unused")
+ private int nbTasks; // nb de taches de l'appli
+ private MsgChrono chrono = new MsgChrono(); // chrono global de l'appli
+ private boolean running = false;
+
+ private int numberOfDisconnections = 0; // nb de deconnection de demons
+ // depuis debut appli
+ private int numberOfSpawnerDisconnections = 0; // nb de deconnection de
+ // spawners depuis debut
+ // appli
+ private int numberOfCouilles = 0; // nb de deconnection depuis debut appli
+ private int numberOfSuicides = 0; // nb de deconnection depuis debut appli
+
+ private RunningApplication() {
+ }
+
+ public static RunningApplication Instance() {
+ if (Instance == null) {
+ Instance = new RunningApplication();
+ }
+ return Instance;
+ }
+
+ public synchronized MsgChrono getChrono() {
+ return chrono;
+ }
+
+ // modificateurs d'attribut
+ public synchronized void setName(String appliName) {
+ name = appliName;
+ }
+
+ public synchronized String getName() {
+ return name;
+ }
+
+ public synchronized int getNumberOfCouilles() {
+ return numberOfCouilles;
+ }
+
+ public synchronized void setNbTasks(int nb) {
+ nbTasks = nb;
+ }
+
+ public void setRunning(boolean value) {
+ running = value;
+ }
+
+ public synchronized int getNumberOfSpawnerDisconnections() {
+ return numberOfSpawnerDisconnections;
+ }
+
+ public synchronized int getNumberOfDisconnections() {
+ return numberOfDisconnections;
+ }
+
+ public synchronized void purge() {
+ Instance = null;
+
+ running = false;
+ try {
+ Thread.sleep(5000);
+ } catch (Exception e) {
+ }
+ Register.Instance().purge();
+ }
+
+ public boolean isRunning() {
+ return running;
+ }
+
+ public synchronized void incrementNumberOfDisconnections() {
+ numberOfDisconnections++;
+ }
+
+ public synchronized void incrementNumberOfSpawnerDisconnections() {
+ numberOfSpawnerDisconnections++;
+ }
+
+ public synchronized void setNumberOfDisconnections(int nb) {
+ numberOfDisconnections = nb;
+ }
+
+ public synchronized void setNumberOfSpawnerDisconnections(int nb) {
+ numberOfSpawnerDisconnections = nb;
+ }
+
+ public synchronized void incrementNumberOfCouille() {
+ numberOfCouilles++;
+ }
+
+ public synchronized void incrementNumberOfSuicides() {
+ numberOfSuicides++;
+ }
+
+}
--- /dev/null
+package jaceP2P;
+
+import java.rmi.RemoteException;
+
+
+public class ScanThread extends Thread {
+ public static ScanThread Instance;
+ public boolean scanning = false;
+ public int beat;
+ public int timeBeforeKill;
+ public int myRank;
+ public int neighborRank;
+ public boolean running = false;
+
+ private ScanThread() {
+ beat = HeartBeatThread.Instance().getHeartTime();
+ timeBeforeKill = beat * 3;
+ running = true;
+
+ }
+
+ public static ScanThread Instance() {
+ if (Instance == null) {
+ System.out.println("Creating new ScanThread ");
+ Instance = new ScanThread();
+ }
+ return Instance;
+ }
+
+ public void setScanning(boolean bool) {
+// System.out.println("in setScanning !!!!!");
+ scanning = bool;
+ }
+
+ public void kill() {
+ Instance = null;
+ running = false;
+ }
+
+ @SuppressWarnings("static-access")
+ public void run() {
+ System.out.println("Start ScanThread.......");
+ while (running) {
+ // System.out.println("ScanThread alive.......");
+ if (scanning == false)
+ try {
+ this.wait();
+ } catch (Exception e) {
+ }
+ else {
+ // System.out.println("test Neighbor: "+neighborRank);
+ testNeighbor();
+ try {
+ this.sleep(beat);
+ } catch (Exception e) {
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("static-access")
+ public void testNeighbor() {
+ Node host;
+ //Node tmpNode;
+ long workerTime;
+ long currentTime;
+
+ //int restempo;
+ //int nb = 0;
+ //int nbC = 0;
+ //boolean changed = false;
+ try {
+ TaskId id = Register.Instance().getListeOfTasks()
+ .getTaskIdOfHostStub(LocalHost.Instance().getStub());
+ myRank = id.getRank();
+ if (myRank == 0)
+ neighborRank = Register.Instance().getNbOfTasks() - 1;
+ else
+ neighborRank = myRank - 1;
+ try {
+ // TaskId myTaskId = null;
+ TaskId neighborTask = Register.Instance().getListeOfTasks()
+ .getTaskIdOfRank(neighborRank);
+ JaceInterface jaceStub = neighborTask.getHostStub();
+ host = Register.Instance().getNodeOfStub(jaceStub);
+ // if (host.getAliveFlag() == true) {
+ workerTime = host.getAliveTime();
+ currentTime = System.currentTimeMillis();
+ if (currentTime - workerTime > timeBeforeKill) {
+ // Calendar cal = new GregorianCalendar();
+ // System.out.println("at time="+cal.get(Calendar.MINUTE)+":"+cal.get(Calendar.SECOND));
+ // System.out.println("currentTime="+currentTime+" workerTime="+workerTime+" timeBeforeKill="+timeBeforeKill);
+
+ host.setAliveFlag(false);
+ host.setAppliName(null);
+
+ // String ip = LocalHost.Instance().resolve(host.getName());
+ boolean reponse;
+ boolean dead = false;
+ try {
+ Node noeud = Register.Instance().getNodeOfStub(
+ host.getStub());
+
+ reponse = host.getStub().ping();
+ if (reponse == true) {
+ noeud.setAliveFlag(true);
+ noeud.setAliveTime();
+ noeud.getAliveTime();
+ // System.out.println("the previous node is still alive");
+ } else {
+ dead = true;
+ //changed = true;
+ host.getStub().suicide("Not responding");
+ System.out.println("\n\nNot responding node "
+ + host.getName() + " (" + host.getIP()
+ + ") size : "
+ + Register.Instance().getNbOfTasks());
+
+ }
+ } catch (Exception e) {
+ dead = true;
+ //changed = true;
+
+ System.out.println("\n\nDisconnection of "
+ + host.getName() + " (" + host.getIP()
+ + ") size : "
+ + Register.Instance().getNbOfTasks());
+ }
+
+ if (dead == true) {
+ JaceSpawnerInterface spawnerStub = Register.Instance()
+ .getSpawnerStub();
+ if (spawnerStub == null)
+ System.err.println("SpawnerStub is null");
+ try {
+ spawnerStub.signalDeadNode(jaceStub, neighborRank);
+
+ this.sleep(10 * beat);
+
+ } catch (RemoteException e1) {
+ System.err.println("Couldn't reach the spawner: "
+ + e1);
+ }
+ }
+ }
+ // }
+ } catch (Exception e) {
+ System.err.println("Error in testNeighbor: " + e);
+ }
+ } catch (Exception e) {
+ if (Register.Instance().getListeOfTasks() == null)
+ System.err.println("Task list is null: " + e);
+ }
+ }
+}
--- /dev/null
+package jaceP2P;
+
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+
+
+
+public class ScanThreadSpawner extends Thread {
+ public static ScanThreadSpawner Instance;
+ public boolean scanning = false;
+ public int beat;
+ public int timeBeforeKill;
+ public long aliveTime = System.currentTimeMillis();
+ public int myRank;
+ public int neighborRank;
+ public boolean running = false;
+ public JaceSpawnerInterface previousSpawner = null;
+
+ private ScanThreadSpawner() {
+ // beat=HeartBeatThread.Instance().getHeartTime();
+ // timeBeforeKill=beat*3;
+ running = true;
+
+ }
+
+ public void setHeartTime(int beat) {
+ this.beat = beat;
+ timeBeforeKill = beat * 4;
+ }
+
+ public void setServer(JaceSpawnerInterface stub) {
+ previousSpawner = stub;
+ aliveTime = System.currentTimeMillis();
+ }
+
+ public static ScanThreadSpawner Instance() {
+ if (Instance == null) {
+ System.out.println("creating new ScanThreadSpawner ");
+ Instance = new ScanThreadSpawner();
+ }
+ return Instance;
+ }
+
+ public void kill() {
+ Instance = null;
+ running = false;
+ }
+
+ public void setAliveTime() {
+ aliveTime = System.currentTimeMillis();
+ // System.out.println("spawner is pinging me at "+aliveTime);
+ }
+
+ @SuppressWarnings("static-access")
+ public void run() {
+ System.out.println("start ScanThreadSpawner.......");
+ while (running) {
+ // System.out.println("ScanThread alive.......");
+ try {
+ this.sleep(beat);
+ } catch (Exception e) {
+ }
+ // System.out.println("test Neighbor: "+neighborRank);
+ testNeighbor();
+
+ }
+
+ }
+
+ public void testNeighbor() {
+ //Node host;
+ //Node tmpNode;
+ //long workerTime;
+ long currentTime;
+
+ //int restempo;
+ //int nb = 0;
+ //int nbC = 0;
+ //boolean changed = false;
+ try {
+ currentTime = System.currentTimeMillis();
+ if (currentTime - aliveTime > timeBeforeKill
+ && Register.Instance().getSpawnerStub().getFinished() == false) {
+ Calendar cal = new GregorianCalendar();
+ System.out.println("at time=" + cal.get(Calendar.MINUTE) + ":"
+ + cal.get(Calendar.SECOND));
+ System.out.println("neighbor maybe Dead, "
+ + (currentTime - aliveTime) + " " + currentTime + " "
+ + aliveTime + " result "
+ + (currentTime - aliveTime > timeBeforeKill)
+ + " timebk=" + timeBeforeKill);
+ boolean response = false;
+ try {
+ response = previousSpawner.ping();
+ System.out.println("previous spawner is still alive");
+ } catch (Exception e) {
+ System.out
+ .println("the previous spawner is officially dead "
+ + e);
+ }
+ if (response == false) {
+ JaceSpawner.Instance().getNewSpawner(previousSpawner);
+ sleep(4 * beat);
+ }
+ }
+ } catch (Exception e) {
+ System.out.println("error in test neighbor: " + e);
+ }
+ }
+}
--- /dev/null
+package jaceP2P;
+
+
+public class ScanThreadSuperNode extends Thread {
+ public static ScanThreadSuperNode Instance = null;
+ // public boolean scanning=false;
+ public int beat;
+ public int timeBeforeKill;
+ public int myRank;
+ public int neighborRank;
+ private long aliveTime = System.currentTimeMillis();
+ private boolean token = false;
+
+ // public boolean running=false;
+ private ScanThreadSuperNode() {
+ beat = HeartBeatSNode.Instance().getHeartTime();
+ timeBeforeKill = beat * 4;
+ System.out.println("timebfk=" + timeBeforeKill);
+ // running=true;
+
+ }
+
+ public static ScanThreadSuperNode Instance() {
+ if (Instance == null) {
+ System.out.println("creating new ScanThreadSuperNode ");
+ Instance = new ScanThreadSuperNode();
+ }
+ return Instance;
+ }
+
+ public synchronized void setAliveTime() {
+ aliveTime = System.currentTimeMillis();
+ // System.out.println("alive Time="+aliveTime);
+ }
+
+ public synchronized void setToken(boolean bool) {
+ token = bool;
+ }
+
+ // public void setScanning(boolean bool){
+ // System.out.println("in setScanning !!!!!");
+ // scanning=bool;
+ // }
+ // public void kill(){
+ // Instance=null;
+ // running=false;
+ // }
+ @SuppressWarnings("static-access")
+ public void run() {
+ System.out.println("start ScanThread for SuperNode.......");
+ while (true) {
+ // System.out.println("ScanThread alive.......");
+ /*
+ * if(scanning==false) try{ this.wait(); }catch(Exception e){} else{
+ */
+ // System.out.println("test Neighbor: "+neighborRank);
+ testNeighbor();
+ try {
+ this.sleep(beat);
+ } catch (Exception e) {
+ }
+
+ }
+ }
+
+ public void testNeighbor() {
+ //Node host;
+ //Node tmpNode;
+ //long workerTime;
+ long currentTime;
+
+ //int restempo;
+ //int nb = 0;
+ //int nbC = 0;
+ //boolean changed = false;
+ try {
+
+ currentTime = System.currentTimeMillis();
+ if (currentTime - aliveTime > timeBeforeKill) {
+ System.out.println("neighbor maybe Dead, "
+ + (currentTime - aliveTime) + " " + currentTime + " "
+ + aliveTime + " result "
+ + (currentTime - aliveTime > timeBeforeKill)
+ + " timebk=" + timeBeforeKill);
+ int index = SuperNodeListe.Instance().existSuperNode(
+ LocalHost.Instance().getIP());
+ if (index > 0)
+ index--;
+ else
+ index = SuperNodeListe.Instance().getListe().size() - 1;
+ SuperNodeData data;
+ synchronized (SuperNodeListe.Instance()) {
+ System.out.println("removing "
+ + ((SuperNodeData) SuperNodeListe.Instance()
+ .getListe().elementAt(index)).getIP());
+ data = (SuperNodeData) SuperNodeListe.Instance().getListe()
+ .elementAt(index);
+ SuperNodeListe.Instance().removeSuperNode(
+ ((SuperNodeData) SuperNodeListe.Instance()
+ .getListe().elementAt(index)));
+
+ }
+ // diffusing the message concerning a dead superNode
+ for (int i = 0; i < SuperNodeListe.Instance().getListe().size(); i++)
+ try {
+ if (!((SuperNodeData) SuperNodeListe.Instance()
+ .getListe().elementAt(i)).getIP().equals(
+ LocalHost.Instance().getIP()))
+ ((SuperNodeData) SuperNodeListe.Instance()
+ .getListe().elementAt(i)).getStub()
+ .removeSuperNode(data);
+ } catch (Exception e2) {
+ System.out
+ .println("error diffusing the message concerning a dead superNode: "
+ + e2);
+ }
+ if (token == true) {
+ // synchronized(TokenThread.Instance()){
+ TokenThread.Instance().setToken();
+ // TokenThread.Instance().notify();
+ // }
+ }
+ index = SuperNodeListe.Instance().existSuperNode(
+ LocalHost.Instance().getIP());
+ if (index > 0)
+ index--;
+ else
+ index = SuperNodeListe.Instance().getListe().size() - 1;
+ ((SuperNodeData) SuperNodeListe.Instance().getListe()
+ .elementAt(index)).getStub().updateHeart(
+ LocalHost.Instance().getSuperNodeStub());
+
+ }
+ } catch (Exception e) {
+ System.out.println("error in testNeighbor: " + e);
+ }
+ }
+
+}
--- /dev/null
+package jaceP2P;
+
+public class SendVerdictThread extends Thread {
+ int myId;
+ int verifNum;
+ int sendId;
+ boolean verdict;
+
+ public SendVerdictThread(int myId, int sendId, int verifNum, boolean verdict) {
+ this.myId = myId;
+ this.verifNum = verifNum;
+ this.sendId = sendId;
+ this.verdict = verdict;
+ }
+
+ public void run() {
+ while (JaceSession.Instance().getTaskObject().action
+ .equals("sendVerdict")) {
+ try {
+ JaceSession.Instance().getTaskObject().broadcastVerdict(myId,
+ sendId, verifNum, verdict);
+ JaceSession.Instance().getTaskObject().broadcastTasks(3);
+ } catch (Exception e) {
+ System.out
+ .println("le message de verdict n'est pas recu :" + e);
+ try {
+ Thread.sleep(500);
+ } catch (Exception ex) {
+ }
+ }
+
+ }
+ }
+}
--- /dev/null
+package jaceP2P;
+
+public class SendVerifThread extends Thread {
+ int myId;
+ int verifNum;
+ int sendId;
+
+ public SendVerifThread(int myId, int sendId, int verifNum) {
+ this.myId = myId;
+ this.verifNum = verifNum;
+ this.sendId = sendId;
+
+ }
+
+ public void run() {
+ while (JaceSession.Instance().getTaskObject().action
+ .equals("sendVerif")) {
+ try {
+ JaceSession.Instance().getTaskObject().broadcastVerif(myId,
+ sendId, verifNum);
+ JaceSession.Instance().getTaskObject().broadcastTasks(3);
+ } catch (Exception e) {
+ System.out.println("le message de verif n'est pas recu :" + e);
+ try {
+ Thread.sleep(500);
+ } catch (Exception ex) {
+ }
+ }
+
+ }
+ }
+}
--- /dev/null
+package jaceP2P;
+
+public class Sender extends Thread {
+
+ // attributes
+ static Sender Instance = null;
+ protected JaceBuffer buffer;
+ Message msg = null;
+ boolean msgIsSent; // attribute used in the JaceSender to know if msg has
+ // been sent
+ JaceInterface stub = null;
+ boolean running;
+
+ // constructors
+ public static Sender Instance() {
+ if (Instance == null) {
+ Instance = new Sender();
+ }
+ return Instance;
+ }
+
+ public Sender() {
+ setPriority(MAX_PRIORITY);
+ running = true;
+ // buffer=new JaceBuffer();
+ Instance = this;
+ }
+
+ public static void setInstance(SenderRmi s) {
+ Instance = s;
+ }
+
+ public static void setInstance(SenderSocket s) {
+ Instance = s;
+ }
+
+ // methods
+ public void kill() {
+ Instance = null;
+ }
+
+ public JaceBuffer getBuffer() {
+ return buffer;
+ }
+
+ public void run() {
+ msg = null;
+ while (running) {
+ // consumme the msg to send (doing a wait, so the msg is processed
+ // immediately)
+ // System.out.println("the sender thread is alive");
+ msg = buffer.get();
+
+ if (msg.getReceiver() == null) {
+ System.out.println("In jaceSend recv = null !!!!!");
+ // System.out.println("msg.getReceiver() : " +
+ // msg.getReceiver());
+
+ System.out.println(" recv is null in msg of SENDER "
+ + LocalHost.Instance().getName());
+
+ }
+
+ // if no destinatory, I send nothing
+ if (msg.getReceiver().getHostStub() == null) {
+ System.out.println("SENDER : the dest is null so I send nothing, the msg is simply lost");
+ yield();
+ }
+
+ // if there is a destinatory,
+ else {
+ msgIsSent = false;
+ System.out.println(msg.getReceiver().getHostName()) ;
+ System.out.println(msg.getSender().getHostName()) ;
+
+ // sending of the message !
+ // stub =
+ // RemoteStubs.Instance().lookUp(msg.getReceiver().getHostIP()).getStub();
+ stub = msg.getReceiver().getHostStub();
+ // if (stub == null) {
+ // System.out.println("SENDER : stub nuuuuuuuuuuuuuuuuuuuuuuuuuuull");
+ // }
+ try {
+ // System.out.println("tryin to send to " +
+ // msg.getReceiver().getRank() + "(" +
+ // Register.Instance().getListeOfTasks().getTaskIdOfRank(msg.getReceiver().getRank()).getHostName()
+ // + ")");
+ stub.iSendYou(msg);
+
+
+
+ // System.out.println("ENVOI succesful to " +
+ // msg.getReceiver().getRank() + "(" +
+ // Register.Instance().getListeOfTasks().getTaskIdOfRank(msg.getReceiver().getRank()).getHostName()
+ // + ")");
+ // System.out.println("envoi succesful to " +
+ // msg.getReceiver().getHostIP());
+ msgIsSent = true;
+ yield();
+ } catch (Exception e) {
+ System.out.println(e);
+
+ // System.out.println("SENDER : the node of task " +
+ // msg.getReceiver().getHostIP() + " is dead !!!! " + e);
+ // System.out.println("SENDER : the dest is dead so I send nothing, message lost");
+ yield();
+ }
+ }
+ }
+ }
+}
--- /dev/null
+package jaceP2P;
+
+public class SenderRmi extends Sender {
+
+ // attributes
+ static SenderRmi Instance = null;
+
+ // constructors
+ public static SenderRmi Instance() {
+ if (Instance == null) {
+ Instance = new SenderRmi();
+ }
+ return Instance;
+ }
+
+ public SenderRmi() {
+ setPriority(MAX_PRIORITY);
+ running = true;
+ buffer = new JaceBuffer();
+ Instance = this;
+ }
+
+ // methods
+ public void kill() {
+ Instance = null;
+ }
+
+ public void run() {
+ msg = null ;
+ int site_delay = 0 ;
+
+ while (running) {
+ // consumme the msg to send (doing a wait, so the msg is processed
+ // immediately)
+
+ msg = buffer.get();
+
+ if (msg.getReceiver() == null) {
+// System.out.println("in jaceSend recv = null !!!!!");
+
+ System.out.println("Recv is null in msg of SENDER "
+ + LocalHost.Instance().getName());
+ }
+
+ if (msg.getReceiver().getHostStub() == null) {
+ System.out.println("SENDER : the destination is null, so I send nothing, the msg is simply lost");
+ yield();
+ }
+
+ // if there is a destinatory,
+ else {
+ msgIsSent = false;
+
+ stub = msg.getReceiver().getHostStub();
+
+ try {
+ // For simulating latency in network
+ msg.getReceiver() ;
+ Thread.sleep( site_delay ) ;
+
+ // Send the message
+ stub.iSendYou(msg);
+
+ msgIsSent = true;
+
+ yield();
+ } catch (Exception e) {
+ System.out
+ .println("Can't send the messgae to "
+ + msg.getReceiver().getHostName()
+ + ". Error: " + e);
+ yield();
+ }
+ }
+ }
+ }
+}
--- /dev/null
+package jaceP2P;
+
+import java.io.ObjectOutputStream;
+import java.net.Socket;
+
+
+
+public class SenderSocket extends Sender {
+
+ // attributes
+ static SenderSocket Instance = null;
+ // JaceBuffer buffer;
+ // Message msg = null;
+ String name;
+
+ // boolean msgIsSent; //attribute used in the JaceSender to know if msg has
+ // been sent
+ // JaceInterface stub = null;
+ // boolean running;
+ // constructors
+ public static SenderSocket Instance() {
+ if (Instance == null) {
+ Instance = new SenderSocket();
+ }
+ return Instance;
+ }
+
+ private SenderSocket() {
+ setPriority(MAX_PRIORITY);
+ running = true;
+ buffer = new JaceBuffer();
+ Instance = this;
+ }
+
+ // methods
+ public void kill() {
+ Instance = null;
+ running = false;
+ }
+
+ public void run() {
+ msg = null;
+ while (running) {
+ // consumme the msg to send (doing a wait, so the msg is processed
+ // immediately)
+ // System.out.println("the sender thread is alive");
+ msg = buffer.get();
+ if (msg != null) {
+
+ msgIsSent = false;
+
+ // sending of the message !
+ // stub =
+ // RemoteStubs.Instance().lookUp(msg.getReceiver().getHostIP()).getStub();
+ // stub = msg.getReceiver().getHostStub();
+ name = msg.getReceiver().getHostName();
+ try {
+ if (Register.Instance().getNodeOfName(name)
+ .getOutputStream() != null) {
+ ObjectOutputStream out = (ObjectOutputStream) Register
+ .Instance().getNodeOfName(name)
+ .getOutputStream();
+ out.writeObject((Object) msg);
+ out.flush();
+ out.reset();
+ // System.out.println("tryin to send to " +
+ // msg.getReceiver().getRank() + "(" +
+ // Register.Instance().getListeOfTasks().getTaskIdOfRank(msg.getReceiver().getRank()).getHostName()
+ // +
+ // ")"+" tag="+msg.getSrc_tag()+" iter= "+msg.getSrc_iteration());
+ } else {
+ Socket s = new Socket(name, LocalHost.Instance()
+ .getSocketPort());
+ ObjectOutputStream out = new ObjectOutputStream(s
+ .getOutputStream());
+ out.writeObject((Object) msg);
+ out.flush();
+ out.reset();
+ Register.Instance().getNodeOfName(name)
+ .setOutputStream(out);
+ new HandleClient(s, 1).start();
+ }
+ // if (stub == null) {
+ // System.out.println("SENDER : stub nuuuuuuuuuuuuuuuuuuuuuuuuuuull");
+ // }
+
+ // stub.iSendYou(msg);
+
+ // System.out.println("ENVOI succesful to " +
+ // msg.getReceiver().getRank() + "(" +
+ // Register.Instance().getListeOfTasks().getTaskIdOfRank(msg.getReceiver().getRank()).getHostName()
+ // + ")");
+ // System.out.println("envoi succesful to " +
+ // msg.getReceiver().getHostIP());
+ msgIsSent = true;
+ yield();
+ } catch (Exception e) {
+ System.out
+ .println("error in SenderSocket run method :" + e);
+
+ // System.out.println("SENDER : the node of task " +
+ // msg.getReceiver().getHostIP() + " is dead !!!! " + e);
+ // System.out.println("SENDER : the dest is dead so I send nothing, message lost");
+ yield();
+ }
+ }
+ // }
+ }
+ }
+}
--- /dev/null
+package jaceP2P;
+
+public class SuperNodeData implements java.io.Serializable {
+ private static final long serialVersionUID = 1L;
+
+ // attributes
+ private String IP = null;
+ private int port;
+ private JaceSuperNodeInterface stub = null;
+ private int nbOfNodes = 0;
+
+ // constructor
+ public SuperNodeData(String u, int p) {
+ this.IP = u;
+ this.port = p;
+ }
+
+ public void setIP(String u) {
+ this.IP = u;
+ }
+
+ public void setPort(int p) {
+ this.port = p;
+ }
+
+ public String getIP() {
+ return this.IP;
+ }
+
+ public int getPort() {
+ return this.port;
+ }
+
+ public void setStub(JaceSuperNodeInterface s) {
+ this.stub = s;
+ }
+
+ public JaceSuperNodeInterface getStub() {
+ return this.stub;
+ }
+
+ public void setNbOfNodes(int nb) {
+ this.nbOfNodes = nb;
+ }
+
+ public int getNbOfNodes() {
+ return this.nbOfNodes;
+ }
+
+}
--- /dev/null
+package jaceP2P;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.rmi.Naming;
+import java.util.Random;
+import java.util.Vector;
+
+import and.Mapping.Algo;
+import and.Mapping.GNode;
+
+public class SuperNodeListe implements Cloneable {
+
+ // attributes
+ public static SuperNodeListe Instance;
+ private Random r;
+ @SuppressWarnings("unchecked")
+ private Vector liste = null;
+
+ // constructors
+ @SuppressWarnings("unchecked")
+ public SuperNodeListe() {
+ liste = new Vector();
+ r = new Random();
+ }
+
+ public synchronized static SuperNodeListe Instance() {
+ if (Instance == null) {
+ Instance = new SuperNodeListe();
+ }
+ return Instance;
+ }
+
+ @SuppressWarnings("unchecked")
+ public synchronized SuperNodeListe clone() {
+ SuperNodeListe d = new SuperNodeListe();
+ // d.r = (Random) r.clone();
+ d.liste = (Vector) liste.clone();
+ return d;
+ }
+
+ // methods
+
+ // TODO change to have non-static list
+ // using a config file
+ // in order to discover new superNodes dynamicaly
+ @SuppressWarnings("unchecked")
+ public void staticInitialization() {
+ String adr = "";
+ String home = System.getProperty("user.home");
+ String fichier = home + "/supernode.conf";
+
+ // lecture du fichier texte
+ try {
+ InputStream ips = new FileInputStream(fichier);
+ InputStreamReader ipsr = new InputStreamReader(ips);
+ BufferedReader br = new BufferedReader(ipsr);
+
+ while ((adr = br.readLine()) != null) {
+ liste.add(new SuperNodeData(adr, 1098));
+ }
+
+ br.close();
+ } catch (Exception e) {
+ System.out.println(e.toString());
+ }
+
+ // String adr1 =
+ // LocalHost.Instance().resolve("azur-38.sophia.grid5000.fr");
+ // String adr2 =
+ // LocalHost.Instance().resolve("cluster1.iut-bm.univ-fcomte.fr");
+ // String adr3 =
+ // LocalHost.Instance().resolve("smiths.iut-bm.univ-fcomte.fr");
+ // String adr1="193.52.61.142";
+ // String adr2="172.20.96.21";
+
+ // String adr3="193.52.61.140";
+ // System.out.println("adr3 = " + adr3);
+ // liste.add(new SuperNodeData(adr1, 1098));
+ // liste.add(new SuperNodeData(adr2, 1098));
+ // liste.add(new SuperNodeData(adr3, 1098));
+ }
+
+ public void fileInitialization(String fileName) {
+ // TODO modif JaceParser pr lire
+ // donnees ds un fichier de conf
+ }
+
+ @SuppressWarnings("unchecked")
+ public Vector getListe() {
+ return liste;
+ }
+
+ public int getSize() {
+ return liste.size();
+ }
+
+ // Randomly returns a super Node of the liste
+ public SuperNodeData getASuperNodeData() {
+ int index = r.nextInt() % liste.size();
+ if (index < 0) {
+ index = -index;
+ }
+ return (SuperNodeData) liste.elementAt(index);
+ }
+
+ public SuperNodeData getSuperNodeData(int i) {
+ return (SuperNodeData) liste.elementAt(i);
+ }
+
+ // if returns 0, there are no nodes at all
+ public SuperNodeData getBestSuperNodeData() {
+ int max = 0;
+ SuperNodeData d = null;
+ SuperNodeData return_d = null;
+
+ for (int i = 0; i < liste.size(); i++) {
+ d = (SuperNodeData) liste.elementAt(i);
+ if (d.getNbOfNodes() > max) {
+ max = d.getNbOfNodes();
+ return_d = (SuperNodeData) liste.elementAt(i);
+ }
+ }
+ return return_d;
+ }
+
+ public void addStub(int index, JaceSuperNodeInterface stub) {
+ ((SuperNodeData) liste.elementAt(index)).setStub(stub);
+ }
+
+ @SuppressWarnings("unchecked")
+ public void addStubOf(String snode, int port, JaceSuperNodeInterface stub) {
+ int is = -1;
+ is = existSuperNode(snode);
+ if (is != -1) {
+ ((SuperNodeData) liste.elementAt(is)).setStub(stub);
+ } else {
+ System.out.println("this superNode does not exist : " + snode);
+ System.out.println("add the new SuperNode");
+ SuperNodeData data = new SuperNodeData(snode, port);
+ data.setStub(stub);
+ data.setNbOfNodes(0);
+ liste.add(data);
+
+ }
+ }
+
+ public int getTotalDaemons() {
+ int s = 0;
+
+ for (int i = 0; i < liste.size(); i++) {
+ s = s + ((SuperNodeData) liste.get(i)).getNbOfNodes();
+ System.out.println("s=" + s + " " + Register.Instance().getSize());
+ }
+ return s;
+ }
+
+ public synchronized JaceSuperNodeInterface getStubOf(String superNode) {
+ int is = -1;
+ if (liste.isEmpty()) {
+ return null;
+ } else {
+ if (superNode == null) {
+ System.out
+ .println("In Register.getNode : host == null !!!!!!!!!!!!!!!");
+ }
+ is = existSuperNode(superNode);
+ if (is != -1) {
+ return ((SuperNodeData) liste.get(is)).getStub();
+ } else {
+ System.out.println("this superNode does not exist");
+ return null;
+ }
+ }
+ }
+
+ public synchronized int existSuperNode(String snode) {
+ if (snode == null) {
+ System.out
+ .println("!!!!!!!!!!!!!!!!!!!!!!!!! hostIP in existNode = "
+ + snode);
+ }
+ int existe = -1;
+ int index = 0;
+ while ((existe == -1) && (index < liste.size())) {
+ if (snode.equals(((SuperNodeData) liste.get(index)).getIP())) {
+ existe = index;
+ } else
+ index++;
+ }
+ return existe;
+ }
+
+ public void forwardCountNode() {
+ SuperNodeData d = null;
+ JaceSuperNodeInterface remoteStub = null;
+
+ for (int i = 0; i < liste.size(); i++) {
+ d = (SuperNodeData) liste.elementAt(i);
+ if (!d.getIP().equals(LocalHost.Instance().getIP())) {
+ // if not me, I inform the other super nodes
+ remoteStub = d.getStub();
+ try {
+ remoteStub.updateCountNode(LocalHost.Instance().getIP(),
+ Register.Instance().getSize());
+ // System.out.println("envoye : " +
+ // Register.Instance().getSize() + " a " + d.getIP());
+ } catch (Exception e) {
+ // System.out.println(d.getIP() + " is probably dead");
+ d.setNbOfNodes(0);
+ }
+ }
+ }
+ }
+
+ // TODO
+ // compare with stub rather than IP adresses
+ public void modifCountNode(String IP, int nb) {
+ int is = -1;
+ is = existSuperNode(IP);
+ if (is != -1) {
+ SuperNodeData d = (SuperNodeData) liste.elementAt(is);
+ d.setNbOfNodes(nb);
+ } else {
+ System.out.println("ce superNode existe pas ds ma liste : " + IP);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void locateSuperNodes(JaceSuperNodeInterface stub) {
+ String remoteIP;
+ int remotePort;
+ int i = 0;
+ boolean connected = false;
+ boolean inStaticList = false;
+ while (i < liste.size() && !connected) {
+ remoteIP = LocalHost.Instance().resolve(
+ ((SuperNodeData) liste.elementAt(i)).getIP());
+ remotePort = ((SuperNodeData) liste.elementAt(i)).getPort();
+
+ // If I'm not this superNode, I locate the others
+ if (!remoteIP.equals(LocalHost.Instance().getIP())) {
+ JaceSuperNodeInterface remoteStub;
+ //int nb;
+ try {
+ // get the stub of remote SuperNodes
+ remoteStub = (JaceSuperNodeInterface) Naming
+ .lookup("rmi://" + remoteIP + ":" + remotePort
+ + "/JaceSuperNode");
+ System.out.println("trying to contact " + remoteIP);
+ // add this stub in the local SuperNodeListe
+ // SuperNodeListe.Instance().addStub(i, remoteStub);
+
+ // Send my stub to this corresponding remote SuperNode
+ liste = (Vector<?>) remoteStub.sendStub(LocalHost.Instance()
+ .getIP(), LocalHost.Instance().getPort(), stub);
+ System.out.println("Recieved List");
+ connected = true;
+ // tell this remote SuperNode that I have registered no
+ // Daemons yet
+ // remoteStub.updateCountNode(LocalHost.Instance().getIP(),
+ // 0);
+
+ // get the nb of Daemons the remote SuperNode has already
+ // registered
+ // nb = remoteStub.getNbOfNodes();
+ // ((SuperNodeData)liste.elementAt(i)).setNbOfNodes(nb);
+
+ // System.out.println("bien envoye mon stub a " + remoteIP);
+ System.out.println("size:" + liste.size());
+ viewAll();
+ } catch (Exception e) {
+ System.out.println("SuperNode " + remoteIP
+ + " is down or not already launched: " + e);
+ }
+ } else
+ inStaticList = true;
+ i++;
+ }
+ if (!connected)
+ if (inStaticList) {
+ liste.clear();
+ SuperNodeData data = new SuperNodeData(LocalHost.Instance()
+ .getIP(), LocalHost.Instance().getPort());
+ data.setStub(stub);
+ data.setNbOfNodes(0);
+ liste.add(data);
+ TokenThread.Instance().setToken();
+ } else {
+ System.out
+ .println("no superNode alive in the static List, closing application");
+ System.exit(1);
+ }
+
+ }
+
+ @SuppressWarnings("unchecked")
+ public void addSuperNode(SuperNodeData d) {
+ int is = -1;
+ String name = d.getIP();
+ is = existSuperNode(name);
+ if (is != -1) {
+ liste.setElementAt(d, is);
+ } else {
+ liste.add(d);
+ }
+ }
+
+ public void removeSuperNode(SuperNodeData d) {
+ String name = d.getIP();
+ int i = existSuperNode(name);
+ if (i != -1) {
+ // System.out.println("je remove le noeud de rang " + i);
+ liste.remove(i);
+ } else {
+ System.out.println("super node existe pas");
+ }
+ }
+
+ public void viewAll() {
+ System.out.println("\nConfig of the superNodes :");
+ String msg = "";
+ for (int i = 0; i < liste.size(); i++) {
+ SuperNodeData d = (SuperNodeData) liste.elementAt(i);
+ if (d.getIP().equals(LocalHost.Instance().getIP())) {
+ msg = "me : " + Register.Instance().getSize();
+ } else {
+ System.out.println(d.getIP() + " : "
+ + ((SuperNodeData) liste.elementAt(i)).getNbOfNodes());
+ }
+ }
+ System.out.println(msg);
+ // Register.Instance().viewAll();
+ System.out.print("\n\n");
+ }
+
+
+ /****************************************/
+ /*********** Sébastien Miquée ***********/
+ /****************************************/
+
+ public void addGNode( GNode _g )
+ {
+ if( _g != null )
+ {
+
+ SuperNodeData d = null ;
+ JaceSuperNodeInterface remoteStub = null ;
+
+ for( int i = 0 ; i < liste.size() ; i++ )
+ {
+ d = (SuperNodeData) liste.elementAt( i ) ;
+ if( ! d.getIP().equals(LocalHost.Instance().getIP() ) )
+ {
+ // if not me, I inform the other super nodes
+ remoteStub = d.getStub() ;
+ try {
+ remoteStub.addGNode( _g ) ;
+ } catch( Exception e ) {
+ System.err.println( "Unable to add new GNode at SuperNode " + d.getIP() ) ;
+ }
+ }
+ }
+ }
+ }
+
+ public void removeGNode( GNode _g )
+ {
+ if( _g != null )
+ {
+
+ SuperNodeData d = null ;
+ JaceSuperNodeInterface remoteStub = null ;
+
+ for( int i = 0 ; i < liste.size() ; i++ )
+ {
+ d = (SuperNodeData) liste.elementAt( i ) ;
+ if( ! d.getIP().equals(LocalHost.Instance().getIP() ) )
+ {
+ // if not me, I inform the other super nodes
+ remoteStub = d.getStub() ;
+ try {
+ remoteStub.removeGNode( _g ) ;
+ } catch( Exception e ) {
+ System.err.println( "Unable to remove GNode at SuperNode " + d.getIP() ) ;
+ }
+ }
+ }
+ }
+ }
+
+ public void setMapping( Algo al )
+ {
+ if( al != null )
+ {
+ SuperNodeData d = null ;
+ JaceSuperNodeInterface remoteStub = null ;
+
+ for( int i = 0 ; i < liste.size() ; i++ )
+ {
+ d = (SuperNodeData) liste.elementAt( i ) ;
+ if( ! d.getIP().equals(LocalHost.Instance().getIP() ) )
+ {
+ // if not me, I inform the other super nodes
+ remoteStub = d.getStub() ;
+ try {
+ remoteStub.setMapping( al ) ;
+ } catch( Exception e ) {
+ System.err.println( "Unable to set mapping data at SuperNode " + d.getIP() ) ;
+ }
+ }
+ }
+ }
+ }
+
+}
--- /dev/null
+package jaceP2P;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.rmi.RemoteException;
+import java.util.Vector;
+
+//import java.util.Calendar;
+//import java.util.GregorianCalendar;
+
+//import com.jamonapi.*;
+
+public class Task implements Runnable, Cloneable, java.io.Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public double errorLoc = 0;
+ public int saveParameter;
+ public int jaceMyId;
+ public int jaceSize;
+ public TaskId jaceTaskId = null;
+ public String[] jaceArgs;
+ public boolean reloading = false;
+ public String state = "NORMAL";
+ public int nb_not_recv;
+ public boolean electedNode = false;
+ public boolean respSent = false;
+ public Vector<Integer> resp;
+ public int verifNum = 0;
+ public LastSave lastSave = new LastSave();
+ public Vector<Integer> neighbors;
+ public Vector<Boolean> neighborsValues;
+ public Vector<Integer> dependancies;
+ public Vector<Boolean> values;
+ public int sendId;
+ public String action = "nothing";
+ public boolean verdict = false;
+ public boolean recievedVerdict = false;
+ public int jaceP2P_Iteration = 0;
+ public boolean finalStep = false;
+ // public Monitor mon1=null;
+ public int timeStep = 0; // time discretization counter for non-stationary
+ // problems
+ public boolean localCV_state;
+ public boolean jaceP2P_globalCV_state = false;
+ // attribute to know if an appli has finished yet
+ private boolean finalize = false;
+ public boolean pseudoPerBeg;
+ public boolean pseudoPerEnd;
+ public boolean underTh = false;
+ public boolean saved[];
+ // attributes for BackupNodes
+ private int saveRound = 0;
+ private int[] saveTab = null;
+ public boolean savedResults = false;
+ public Task sauv;
+ public BackupConvg sauvConvg = new BackupConvg();
+ public boolean postReloading = false;
+ public Vector<?> reduceAll;
+
+ int cpt = 0;
+ int count = 0;
+ Thread th;
+
+ public Task() {
+ reduceAll = new Vector<Object>();
+ dependancies = new Vector<Integer>();
+ values = new Vector<Boolean>();
+ resp = new Vector<Integer>();
+ saved = new boolean[2];
+ sauv = this;
+ neighbors = new Vector<Integer>();
+ neighborsValues = new Vector<Boolean>();
+ }
+
+ public void getBackupForNewNode(int rank) {
+ TaskId task = null;
+ JaceInterface stub = null;
+ task = Register.Instance().getListeOfTasks().getTaskIdOfRank(rank);
+
+ stub = task.getHostStub();
+
+ // if no stub there is a problem
+ if (stub == null) {
+ System.out.println("unable to SEND backup on task of rank " + rank);
+ } else {
+ // if there is a stub, send the stream to that node
+ try {
+ ByteArrayOutputStream stream;
+ synchronized (sauv) {
+ stream = convertTask2stream(sauv);
+ stub.saveTask(jaceMyId, stream.toByteArray(), sauv.lastSave
+ .getLastSave(), sauv.timeStep, Register.Instance()
+ .getAppliName(), 0);
+ }
+ synchronized (sauvConvg) {
+ if (sauvConvg.initialized == true) {
+ stream = convertBackupConv2stream(sauvConvg);
+ stub.saveTask(jaceMyId, stream.toByteArray(),
+ sauvConvg.lastSave.getLastSave(),
+ sauvConvg.timeStep, Register.Instance()
+ .getAppliName(), 1);
+ }
+ }
+ } catch (Exception e) {
+ System.out.println("error in getBackupForNewNode :" + e);
+ e.printStackTrace(System.out);
+ }
+ }
+ }
+
+ // method to overload by user in the appli to convert the stream sent by the
+ // BackupNode to a Task object of the type of the appli this method is
+ // called
+ // in TaskLauncher.loadBackupAndRestart() and
+ // TaskLauncher.loadOrReloadTask()
+ public Task jaceP2P_ConvertStream(ObjectInputStream stream) {
+ return null;
+ }
+
+ public void printSav() {
+ }
+
+ // method to overload by user in the appli to specify the reduceAll method
+ public synchronized void reduceAll(Vector<?> recievedValue) {
+ }
+
+ public void setId(TaskId Id) {
+ jaceTaskId = Id;
+ jaceMyId = jaceTaskId.getRank();
+ try {
+ jaceSize = Register.Instance().getNbOfTasks();
+ } catch (Exception e) {
+ try {
+ System.out.println("setId is bad !! " + e + " "
+ + LocalHost.Instance().getName());
+ // jaceSize = Register.Instance().getListeOfTasks().getSize();
+ } catch (Exception e2) {
+ System.out.println("not localised the spawner : " + e2);
+ }
+ }
+ }
+
+ public void setParam(String[] arg) {
+ this.jaceArgs = arg;
+
+ }
+
+ public void setJaceSize(int nbTasks) {
+ jaceSize = nbTasks;
+ }
+
+ public TaskId getId() {
+ return jaceTaskId;
+ // return myIdd;
+ }
+
+ public int getTimeStep() {
+ return timeStep;
+ }
+
+ // method to overload by user in the appli to identify the neighbors of a
+ // node of rank i
+
+ public int[] getDependencies(int i) {
+ return null ;
+ }
+
+ // method to overload by user in the appli to init each task at beginning of
+ // computation
+
+ public void jaceP2P_InitTask() {
+ }
+
+ // TaskLauncher.loadOrReloadTask()
+ public void jaceP2P_ReinitTask() {
+ }
+
+ // method to overload by user in the appli to safeguard the results
+ public void saveResults() {
+ }
+
+ // method to overload by user in the appli to save only the requiered
+ // attributes (iter, vecteurs,......) this method is called in
+ // Task.jaceP2P_Save()
+ public Task jaceP2P_SaveFromCrash() {
+ System.out.println("JaceSaveFromCrash Task");
+ return null;
+ }
+
+ public void jaceFinalize() {
+
+ finalize = true;
+ System.out.println("ready to Death Task:" + jaceMyId);
+
+ try {
+ Register.Instance().getSpawnerStub().killApplication(
+ LocalHost.Instance().getStub());
+
+ } catch (Exception e) {
+ System.out
+ .println("pas reussit a joindre Spawner pr killApplication : "
+ + e);
+ }
+
+ JaceDaemon.Instance().reinitDaemon();
+
+ }
+
+ public void jaceP2P_ReinitConv() {
+ // System.out.println("reinit conv");
+
+ timeStep++;
+ state = "NORMAL";
+ // last_iter.removeAllElements();
+ try {
+ reinitializeVectors();
+
+ } catch (Exception e) {
+ System.out.println("erreur ds jaceP2P_reinitConv():" + e);
+ }
+ pseudoPerBeg = false;
+ pseudoPerEnd = false;
+ synchronized (lastSave) {
+ lastSave = new LastSave();
+ }
+ // reinit les var de conv de Task generique
+ localCV_state = false;
+ jaceP2P_globalCV_state = false;
+
+ }
+
+ public void reinitializeVectors() throws RemoteException {
+ values.removeAllElements();
+ dependancies.removeAllElements();
+ neighbors.removeAllElements();
+ neighborsValues.removeAllElements();
+ resp.removeAllElements();
+ }
+
+ public long jaceP2P_getChronoValue() {
+ long result = 0;
+
+ try {
+ result = Register.Instance().getSpawnerStub().getChronoValue(
+ Register.Instance().getAppliName());
+ } catch (Exception e) {
+ System.out
+ .println("JaceP2P_Error in Task.jaceP2P_getChronoValue() on SuperNode : "
+ + e);
+ }
+
+ return result;
+ }
+
+ public void run() {
+ //System.out.println("ds run()");
+ }
+
+ /**
+ *asynchronous sending (non blocking), of data object to an other task
+ *
+ * @param buffer
+ * the object (serializable) data to be send
+ *@param dest
+ * the task id for receiver's task
+ *@param tag
+ * the tag for message
+ */
+ @SuppressWarnings("static-access")
+ public void jaceSend(Object buffer, int dest, int tag, double erreur_locale) {
+ // System.out.println("dest : " + dest);
+
+ TaskId recev = null;
+ try {
+ recev = Register.Instance().getListeOfTasks().getTaskIdOfRank(dest);
+
+ if (recev == null) {
+ System.out.println("In jaceSend recv = null !");
+ try {
+ JaceSession.Instance().getTaskThread().sleep(10);
+ JaceSession.Instance().getTaskThread().yield();
+ } catch (Exception e) {
+ }
+ }
+
+ // TODO : virer ce else, mais chercher pkoi recev est null des fois
+ else {
+ if (recev.getRank() != dest) {
+ System.out.println("Problem !! pas le meme dest que ds les param");
+ }
+
+ // creer le Message
+ // TODO : rajouter nom de l'appli ds Messag
+ try {
+ JaceInterface stub;
+ stub = recev.getHostStub();
+ if (stub.getTimeStep() == timeStep) {
+ // System.out.println("************ "+verifNum+" *************");
+ Message msg = new Message();
+ msg.setParam(buffer, jaceTaskId, recev, tag, timeStep,
+ jaceP2P_Iteration, verifNum, erreur_locale);
+
+ // on met le message ds
+ // JaceBuffer.Instance() (la liste des Message a
+ // envoyer)
+
+ /*
+ * if(JaceDaemon.Instance().getProtocol().equals("socket"
+ * )) SenderSocket.Instance().buffer.add(msg); else
+ * if(JaceDaemon.Instance().getProtocol().equals("rmi"))
+ * SenderRmi.Instance().buffer.add(msg);
+ */
+ // System.out.println("putting message to "+dest+" in the buffer");
+ Sender.Instance().getBuffer().add(msg);
+
+ // if(JaceDaemon.Instance().getProtocol().equals("rmi"))
+ // SenderRmi.Instance().getBuffer().add(msg);
+ // else
+ // SenderSocket.Instance().getBuffer().add(msg);
+ }
+ } catch (Exception e) {
+ System.out.println("unable to send data message to " + dest
+ + ": " + e);
+ }
+ // System.out.println("TASK : g mis un msg qui doit etre envoye");
+ // envoie toujours asynchrone !!!!!!!! : le Message partira
+ // instantanement car
+ // -Sender faisait JaceBuffer.Instance().get() qui contient un
+ // wait()
+ // -et ici, Task fait JaceBuffer.Instance().add(msg) qui
+ // contient un notifyAll()
+ try {
+ JaceSession.Instance().getTaskThread().sleep(10);
+ JaceSession.Instance().getTaskThread().yield();
+ } catch (Exception e) {
+ }
+ // System.out.println("TASK : je sort de jaceSend");
+ }
+ } catch (Exception e) {
+ if (Register.Instance().getListeOfTasks() == null)
+ System.out.println("Liste des taches est nulle: " + e);
+ }
+
+ }
+
+ /**
+ *not-blocking reception of data object, return an object
+ *
+ * @param dest
+ * the task id for the task sender
+ *@param tag
+ * the tag for message
+ */
+ @SuppressWarnings("static-access")
+ public Object jaceReceive(int sender, int tag) {
+ if (jaceP2P_Iteration == 0 || postReloading) {
+ try {
+ if (notExist(sender)) {
+ setDep(sender);
+ // last_iter.add(0);
+ }
+ } catch (Exception e) {
+ }
+ }
+
+ Message tmp = MsgQueue.Instance().get(sender, tag);
+ if (tmp != null) {
+ // System.out.println("recu MSG de tache " + sender + " (" +
+ // Register.Instance().getListeOfTasks().getTaskIdOfRank(sender).getHostName()
+ // + ") MsgQueue : " +
+ // MsgQueue.Instance().getSize()+" message src_tag="+tmp.getSrc_tag()+
+ // " localError="+tmp.getLocalError());
+ try {
+
+ int index;
+ if (underTh == true && jaceP2P_Iteration != 0 && !reloading) {
+ index = depIndex(sender);
+
+ if ((!(state.equals("VERIF")) || verifNum == tmp
+ .getSrc_tag())) {
+
+ // System.out.println("dep["+sender+"]=true, index="+index);
+ setValues(index, true);
+ }
+ }
+
+ } catch (Exception e) {
+ System.out.println("error jaceReceive :" + e);
+ System.out.println("sender=" + sender);
+ }
+ return (tmp.getData());
+ } else {
+ try {
+ // System.out.println("RIENNN recu de tache " + sender + " (" +
+ // Register.Instance().getListeOfTasks().getTaskIdOfRank(sender).getHostName()
+ // + ") taille MsgQueue : " + MsgQueue.Instance().getSize());
+ } catch (Exception e) {
+ if (Register.Instance().getListeOfTasks() == null)
+ System.out.println("Liste des taches est nulle: " + e);
+ }
+ try {
+ JaceSession.Instance().getTaskThread().sleep(10);
+ JaceSession.Instance().getTaskThread().yield();
+ } catch (Exception e) {
+ }
+ return null;
+ }
+ }
+
+ public boolean notExist(int sender) {
+ int i = dependancies.indexOf((Object) (new Integer(sender)));
+ if (i == -1)
+ return true;
+ else
+ return false;
+ }
+
+ public void setDep(int value) {
+ dependancies.add(new Integer(value));
+ values.add(new Boolean(false));
+ }
+
+ public int depIndex(int sender) {
+ int index = dependancies.indexOf((Object) (new Integer(sender)));
+ return index;
+ }
+
+ public void setValues(int index, boolean value) {
+ values.setElementAt(new Boolean(value), index);
+ }
+
+ @SuppressWarnings("static-access")
+ public void jaceP2P_Save() {
+ try {
+
+ if (jaceP2P_Iteration == 0) {
+ // request 0 when task at barrier
+ // request 1 when only at convergence
+ broadcastTasks(0);
+ } else if ((jaceP2P_Iteration % saveParameter) == 0) {
+ // clone the Task at that step of computations
+ saveRound++;
+ synchronized (sauv) {
+ sauv = getTask2save();
+
+ // send it to the corresponding BackupNode in a round robin
+ // fashion
+ new SaveTaskThread(sauv).start();
+ }
+ }
+
+ JaceSession.Instance().getTaskThread().sleep(10);
+ JaceSession.Instance().getTaskThread().yield();
+ } catch (Exception e) {
+ }
+ }
+
+ // request 0 when Saving all task
+ // request 3 when saving only convergence data
+ @SuppressWarnings("static-access")
+ public void broadcastTasks(int request) {
+ ByteArrayOutputStream stream;
+ if (request == 0) {
+ // 1 - clone the Task at that step of computations and serialize it
+ try {
+ synchronized (sauv) {
+ sauv = getTask2save();
+ stream = convertTask2stream(sauv);
+
+ // 2 - create de saveTab if necessary
+ if (saveTab == null) {
+ createSaveTab();
+ }
+ TaskId task = null;
+ JaceInterface stub = null;
+
+ // 3 - send the stream to all the BackupNodes
+ for (int i = 0; i < saveTab.length; i++) {
+ // System.out.println("saveTab[" + i + "] = " +
+ // saveTab[i]);
+
+ // 3.1 - get the stub of destinatory
+ // System.out.println("Saving on neighbor "+i);
+ try {
+ task = Register.Instance().getListeOfTasks()
+ .getTaskIdOfRank(saveTab[i]); // ///////////////////////////pb
+ stub = task.getHostStub();
+ } catch (Exception e) {
+ System.out
+ .println("pb in the broadcast, ligne d'assignation de task ds broadcats: "
+ + e);
+ }
+
+ // 3.2 - send the stream to that stub
+
+ // System.out.println("saving on second list");
+ if (stub != null) {
+ new BroadcastTaskThread(stub, jaceMyId, stream
+ .toByteArray(),
+ sauv.lastSave.getLastSave(), sauv.timeStep,
+ Register.Instance().getAppliName(), 0,
+ saveTab[i]).start();
+
+ }
+
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace(System.out);
+ }
+ } else {
+ synchronized (sauvConvg) {
+ sauvConvg = getBackupConvg2Save();
+ stream = convertBackupConv2stream(sauvConvg);
+
+ // 2 - create de saveTab if necessary
+ if (saveTab == null) {
+ createSaveTab();
+ }
+ TaskId task = null;
+ JaceInterface stub = null;
+
+ // 3 - send the stream to all the BackupNodes
+ for (int i = 0; i < saveTab.length; i++) {
+ // System.out.println("saveTab[" + i + "] = " + saveTab[i]);
+
+ // 3.1 - get the stub of destinatory
+ // System.out.println("Saving on neighbor "+i);
+ try {
+ task = Register.Instance().getListeOfTasks()
+ .getTaskIdOfRank(saveTab[i]); // ///////////////////////////pb
+ stub = task.getHostStub();
+ } catch (Exception e) {
+ System.out
+ .println("pb in the broadcast, ligne d'assignation de task ds broadcats: "
+ + e);
+ }
+
+ // 3.2 - send the stream to that stub
+ /*
+ * if(request==3){ if (stub != null) { new
+ * BroadcastTaskThread(stub, jaceMyId, stream.toByteArray(),
+ * sauvConvg.lastSave.getLastSave(),timeStep,
+ * Register.Instance().getAppliName(),1,saveTab[i]).start();
+ * } //System.out.println("saving on second list"); } else{
+ */
+ if (stub != null) {
+ new BroadcastTaskThread(stub, jaceMyId, stream
+ .toByteArray(), sauvConvg.lastSave
+ .getLastSave(), sauvConvg.timeStep, Register
+ .Instance().getAppliName(), 1, saveTab[i])
+ .start();
+ }
+
+ // System.out.println("saving on first list");
+ }
+ }
+ }
+ try {
+ // JaceSession.Instance().getTaskThread().sleep(10);
+ JaceSession.Instance().getTaskThread().yield();
+ }
+
+ catch (Exception e) {
+ }
+ }
+
+ private ByteArrayOutputStream convertBackupConv2stream(BackupConvg t) {
+ // System.out.println("beginning of the checkpointing process......");
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ try {
+ ObjectOutputStream fluxOut = new ObjectOutputStream(stream);
+ fluxOut.writeObject(t);
+ fluxOut.close();
+ } catch (Exception e) {
+ System.out
+ .println("JaceP2P_Error in Task.jaceP2P_ReinitConv() when converting Task in Stream : "
+ + e);
+ }
+ // System.out.println("taille du tablo de byte : " +
+ // stream.toByteArray().length + " bytes");
+ return stream;
+ }
+
+ // convert the task in stream to send it to the BackupNode
+ private ByteArrayOutputStream convertTask2stream(Task t) {
+ // System.out.println("beginning of the checkpointing process......");
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ try {
+ ObjectOutputStream fluxOut = new ObjectOutputStream(stream);
+ fluxOut.writeObject(t);
+ fluxOut.close();
+ } catch (Exception e) {
+ System.out
+ .println("JaceP2P_Error in Task.jaceP2P_ReinitConv() when converting Task in Stream : "
+ + e);
+ }
+ // System.out.println("taille du tablo de byte : " +
+ // stream.toByteArray().length + " bytes");
+ return stream;
+ }
+
+ // get the task of the appli with the data to save (in
+ // jaceP2P_SaveFromCrash() overloaded in the appli)
+ @SuppressWarnings("unchecked")
+ public Task getTask2save() {
+
+ sauv = jaceP2P_SaveFromCrash();
+
+ // System.out.println("Saving data at it="+sauv.it);
+ // assign the default attributes of a task (params of the appli and
+ // TaskId of the local node)
+ sauv.setId(this.jaceTaskId);
+ sauv.setParam(Register.Instance().getParams());
+ sauv.timeStep = timeStep;
+
+ // assign the iteration number that had the appli at the moment of the
+ // checkpointing
+ sauv.jaceP2P_Iteration = jaceP2P_Iteration;
+ sauv.saveRound = saveRound;
+
+ sauv.underTh = underTh;
+ sauv.jaceP2P_globalCV_state = jaceP2P_globalCV_state;
+ // System.out.println("I checkpoint the task at ite " +
+ // sauv.jaceP2P_Iteration);
+ // attributes needed to detect global convergence
+ try {
+
+ sauv.state = state;
+ sauv.nb_not_recv = nb_not_recv;
+ sauv.electedNode = electedNode;
+ sauv.respSent = respSent;
+ sauv.neighbors = (Vector) neighbors.clone();
+ sauv.neighborsValues = (Vector) neighborsValues.clone();
+ sauv.resp = (Vector) resp.clone();
+ sauv.verifNum = verifNum;
+ sauv.sendId = sendId;
+ sauv.reduceAll = reduceAll;
+ sauv.finalStep = finalStep;
+ sauv.action = action;
+ sauv.verdict = verdict;
+ sauv.localCV_state = localCV_state;
+ sauv.recievedVerdict = recievedVerdict;
+ synchronized (lastSave) {
+ lastSave.increment();
+ sauv.lastSave = lastSave;
+ }
+ } catch (Exception e) {
+ System.out.println("probleme ds rmi");
+ }
+
+ return sauv;
+ }
+
+ @SuppressWarnings("unchecked")
+ public BackupConvg getBackupConvg2Save() {
+
+ try {
+
+ // sauvConvg=new BackupConvg();
+ sauvConvg.state = state;
+ sauvConvg.underTh = underTh;
+ sauvConvg.nb_not_recv = nb_not_recv;
+ sauvConvg.electedNode = electedNode;
+ sauvConvg.respSent = respSent;
+ sauvConvg.neighbors = (Vector) neighbors.clone();
+ sauvConvg.neighborsValues = (Vector) neighborsValues.clone();
+ sauvConvg.resp = (Vector) resp.clone();
+ sauvConvg.verifNum = verifNum;
+ sauvConvg.sendId = sendId;
+ sauvConvg.finalStep = finalStep;
+ sauvConvg.action = action;
+ sauvConvg.verdict = verdict;
+ sauvConvg.localCV_state = localCV_state;
+ sauvConvg.timeStep = timeStep;
+ sauvConvg.recievedVerdict = recievedVerdict;
+ sauvConvg.jaceP2P_Iteration = jaceP2P_Iteration;
+ sauvConvg.reduceAll = reduceAll;
+
+ synchronized (lastSave) {
+ lastSave.increment();
+ sauvConvg.lastSave = lastSave;
+ }
+ sauvConvg.initialized = true;
+ } catch (Exception e) {
+ System.out.println("probleme ds rmi" + e);
+ }
+
+ return sauvConvg;
+ }
+
+ private void createSaveTab() {
+
+ saveTab = new int[BackupsManager.Instance().size()];
+
+ // 2 - assign the taskId to each cell of the saveTab
+ // System.out.println("in TASK");
+ for (int i = 0; i < saveTab.length; i++) {
+ saveTab[i] = BackupsManager.Instance().getBackupTaskAtIndex(i, 0)
+ .getTaskRank();
+ // System.out.print(saveTab[i] + ", ");
+ }
+ // System.out.println("saveTab created..... size : " + saveTab.length);
+ }
+
+ public void setSaved(boolean bool) {
+ synchronized (saved) {
+ if (bool == true) {
+ if (saved[0] == false)
+ saved[0] = true;
+ else if (saved[1] == false)
+ saved[1] = true;
+ } else {
+ saved[0] = false;
+ saved[1] = false;
+ }
+ }
+
+ }
+
+ public void waitForAck(int tag) {
+ if (Register.Instance().getNumBackupNeighbors() != 0) {
+ setSaved(false);
+ broadcastTasks(tag);
+ // Calendar cal = new GregorianCalendar();
+ // System.out.println("at time="+cal.get(Calendar.MINUTE)+":"+cal.get(Calendar.SECOND));
+ // System.out.println("sleeping till acknowledge saved");
+ while (getSaved() == false)
+ try {
+ Thread.sleep(50);
+ count++;
+ if (count > 30) {
+ try {
+ Register newReg = Register.Instance()
+ .getSpawnerStub().getRegister(jaceMyId);
+ if (newReg != null) {
+ Register.Instance().replaceBy(newReg);
+ } else
+ System.out
+ .println("I got a null register from the spawner");
+ broadcastTasks(tag);
+ System.out
+ .println("sleeping till acknowledge saved");
+ count = 0;
+ } catch (Exception e2) {
+ System.out
+ .println("unable to get register from spawner :"
+ + e2);
+ e2.printStackTrace(System.out);
+ }
+ }
+ } catch (Exception e) {
+ }
+ // Calendar cal1 = new GregorianCalendar();
+ // System.out.println("end save at time="+cal1.get(Calendar.MINUTE)+":"+cal1.get(Calendar.SECOND));
+ }
+ }
+
+ public void jaceP2P_GlobalConvergence(boolean under_th) {
+
+ try {
+
+ Thread.yield();
+ if (under_th != underTh && postReloading == false) {
+ // 1 - update sous seuil
+ underTh = under_th;
+ }
+ if (jaceP2P_Iteration == 0
+ && (postReloading == false || resp.size() == 0)) {
+ detectNeighbors(jaceMyId, jaceSize);
+ initialize_state();
+ sendId = -1;
+ verifNum = 0;
+ reloading = false;
+ broadcastTasks(3);
+ }
+
+ // affiche les valeurs des variables
+ try {
+ // Calendar cal = new GregorianCalendar();
+ // System.out.println("at time="+cal.get(Calendar.MINUTE)+":"+cal.get(Calendar.SECOND));
+ // System.out.println("MyId="+jaceMyId+" sous seuil="+underTh+" etat="+state+" verifNum="+verifNum+" localCV="+localCV_state+" leader="+electedNode+" respSent="+respSent);
+ // System.out.println("dep="+getValues()+" pBeg="+pseudoPerBeg+" PEnd="+pseudoPerEnd+" sendId="+sendId);
+ // System.out.println("neigh="+nb_not_recv+" action="+action+" negative_resp="+testNegativeResp()+" reloading="+reloading+"\n"+" finalStep="+finalStep+" SavedResults="+savedResults+" NeghNotCV="+getNeighbourNotCV()+" postReload="+postReloading);
+ printResp();
+ printDep();
+ } catch (Exception e) {
+ System.out.println("error printing status in Task :" + e);
+ }
+ Thread.yield();
+ if (action.equals("sendVerif") && state.equals("VERIF")
+ && postReloading == true) {
+ // System.out.println("je passe ds send verif");
+
+ new SendVerifThread(jaceMyId, sendId, verifNum).start();
+
+ // System.out.println("send Verif");
+
+ } else if (action.equals("sendVerdict") && postReloading == true) {
+ // System.out.println("je passe ds send verdict-------");
+
+ new SendVerdictThread(jaceMyId, sendId, verifNum, verdict)
+ .start();
+
+ // System.out.println("verifNum="+verifNum);
+ }
+
+ if (state.equals("NORMAL") && action.equals("nothing")) {
+ if (underTh == false)
+ reinitializePPEr();
+ else
+ // System.out.println("sous seuil="+underTh+" PseudoPerBeg="+localStub.getPseudoPerBeg());
+ if (pseudoPerBeg == false) {
+ // System.out.println("ds la condition");
+ pseudoPerBeg = true;
+ } else if (pseudoPerEnd == true) {
+
+ localCV_state = true;
+ if (nb_not_recv == 0) {
+ // localStub.setAction("sendVerif");
+ try {
+ broadcastVerif(jaceMyId, -1, verifNum + 1);
+ // localStub.setAction("nothing");
+
+ electedNode = true;
+ recievedVerdict = true;
+ initializeVerifLeader();
+ state = "VERIF";
+ broadcastTasks(3);
+ } catch (Exception e) {
+ System.out
+ .println("le message de verification n'est pas recu: "
+ + e);
+ Register.Instance().viewAll();
+ }
+ } else if (nb_not_recv == 1)
+ if (sendId == -1) {
+
+ int neighId = getNeighbourNotCV();
+ TaskId recev = null;
+
+ try {
+
+ recev = Register.Instance().getListeOfTasks()
+ .getTaskIdOfRank(neighId);
+ JaceInterface stub = recev.getHostStub();
+ if (stub.setNbNeighboursNotConv(verifNum,
+ jaceMyId, timeStep)) {
+ // stub.setLeftNeighbourCV(true);
+ // System.out.println("sent convergence message to "+neighId);
+ sendId = neighId;
+ state = "WAIT4V";
+ recievedVerdict = false;
+ broadcastTasks(3);
+ }
+ } catch (Exception e) {
+ System.out
+ .println("unable to decrease the number of neighbors not converged on node :"
+ + recev.getHostName()
+ + " count ="
+ + count
+ + " . error:" + e);
+ count++;
+ Register.Instance().viewAll();
+ if (count > 3) {
+ try {
+ int myRank;
+ TaskId id = Register.Instance()
+ .getListeOfTasks()
+ .getTaskIdOfHostStub(
+ LocalHost.Instance()
+ .getStub());
+ myRank = id.getRank();
+ Register.Instance().replaceBy(
+ Register.Instance()
+ .getSpawnerStub()
+ .getRegister(myRank));
+
+ count = 0;
+ } catch (Exception e2) {
+ System.out
+ .println("unable to contact the spawner :"
+ + e2);
+ }
+ }
+ }
+ }
+ } else if (pseudoPerBeg == true && getValues())
+ pseudoPerEnd = true;
+ } else if (state.equals("WAIT4V") && nb_not_recv == 0) {
+ int neighId = getRankLeader();
+ if (neighId == -1) {
+ try {
+ broadcastVerif(jaceMyId, -1, verifNum + 1);
+ electedNode = true;
+ recievedVerdict = true;
+ initializeVerifLeader();
+ state = "VERIF";
+
+ broadcastTasks(3);
+ } catch (Exception e) {
+ System.out
+ .println("le message de verification n'est pas recu:"
+ + e);
+ }
+ } else if (jaceMyId > neighId) {
+ try {
+ broadcastVerif(jaceMyId, -1, verifNum + 1);
+ electedNode = true;
+ recievedVerdict = true;
+ initializeVerifLeader();
+ state = "VERIF";
+ broadcastTasks(3);
+ } catch (Exception e) {
+ System.out
+ .println("le message de verification n'est pas recu:"
+ + e);
+ }
+ }
+ } else if (state.equals("WAIT4V")) {
+ if (underTh == false) {
+ localCV_state = false;
+ broadcastTasks(3);
+ }
+ } else if (state.equals("VERIF")) {
+ // if(localStub.getNewerDep(0) && localStub.getNewerDep(1))
+ // localStub.setPseudoPerEnd(true);
+ // if(!underTh)
+ if (electedNode == true) {
+ if ((!underTh && !postReloading) || !localCV_state
+ || testNegativeResp()) {
+ try {
+
+ broadcastVerdict(jaceMyId, -2, verifNum + 1, false);
+ verifNum++;
+ initialize_state();
+ reinitializePPEr();
+ broadcastTasks(3);
+ } catch (Exception e) {
+ }
+ } else if (postReloading) {
+ if (recievedAllResp())
+ if (!testNegativeResp()) {
+ try {
+ broadcastVerdict(jaceMyId, -2, verifNum,
+ true);
+ if (finalStep == true
+ && state.equals("VERIF")) {
+ initializeSavLeader();
+ state = "SAVING";
+ waitForAck(0);
+ } else {
+ state = "FINISHED";
+ waitForAck(1);
+ }
+ } catch (Exception e) {
+ System.out.println("erreur: " + e);
+ }
+ }
+ } else if (pseudoPerEnd == true) {
+ if (recievedAllResp())
+ if (!testNegativeResp()) {
+ try {
+ broadcastVerdict(jaceMyId, -2, verifNum,
+ true);
+ if (finalStep == true
+ && state.equals("VERIF")) {
+ initializeSavLeader();
+ state = "SAVING";
+ waitForAck(0);
+ } else {
+ state = "FINISHED";
+ waitForAck(3);
+ }
+
+ } catch (Exception e) {
+ System.out.println("erreur: " + e);
+ }
+ } else {
+ try {
+
+ broadcastVerdict(jaceMyId, -2,
+ verifNum + 1, false);
+ verifNum++;
+ initialize_state();
+ reinitializePPEr();
+ broadcastTasks(3);
+ } catch (Exception e) {
+ System.out
+ .println("unable to broadcast a negative verdict :"
+ + e);
+ }
+ }
+ }
+
+ else if (getValues())
+ pseudoPerEnd = true;
+ } else if (!respSent) {
+ if (!underTh || !localCV_state || testNegativeResp()) {
+ if (action.equals("nothing")) {
+ int neighId = sendId;
+ try {
+ TaskId recev = null;
+ recev = Register.Instance().getListeOfTasks()
+ .getTaskIdOfRank(neighId);
+ JaceInterface stub = recev.getHostStub();
+ // System.out.println("tryin to send negative response to "+neighId);
+ stub.response(jaceMyId, verifNum, -1, null);
+ // System.out.println("send negative response to "+neighId);
+ respSent = true;
+ broadcastTasks(3);
+
+ } catch (Exception e) {
+ System.out
+ .println("response not received:" + e);
+ Register.Instance().viewAll();
+ }
+ }
+ } else if (pseudoPerEnd) {
+ // System.out.print("The daemon can send a response ");
+ int index = recievedAllRespMinusOne();
+ // System.out.println("to node of index ="+index);
+ if (index != -1) {
+ int rank = getNeighborRank(index);
+ // if(jaceMyId!=jaceSize-1 &&
+ // localStub.getSendRight()==true){
+ try {
+ TaskId recev = null;
+ recev = Register.Instance().getListeOfTasks()
+ .getTaskIdOfRank(rank);
+ JaceInterface stub = recev.getHostStub();
+
+ if (getResp(index) == 1) {
+ stub.response(jaceMyId, verifNum, 1,
+ reduceAll);
+ // System.out.println("send positive response to"+rank);
+ } else {
+ stub.response(jaceMyId, verifNum, -1, null);
+ // System.out.println("send negative response to"+rank);
+ }
+ respSent = true;
+ broadcastTasks(3);
+
+ } catch (Exception e) {
+ System.out.println("response not received by "
+ + rank + ": " + e);
+ Register.Instance().viewAll();
+ }
+ }
+ } else if (getValues())
+ pseudoPerEnd = true;
+ }
+
+ } else if (state.equals("SAVING") && !action.equals("sendVerdict")) {
+
+ if (savedResults == false) {
+
+ saveResults();
+ savedResults = true;
+ broadcastTasks(3);
+ } else if (electedNode) {
+ if (recievedAllResp())
+ try {
+ // System.out.println("recieved all responses");
+ JaceSpawnerInterface spawnerStub = Register
+ .Instance().getSpawnerStub();
+ // System.out.println("##### callin spawnerStub.setFinished(true) #####");
+ spawnerStub.setOver(true);
+ // localStub.setState("FINISHED");
+ } catch (Exception e) {
+ System.out.println("erreur erreur" + e);
+ }
+ } else if (!respSent) {
+ int index = recievedAllRespMinusOne();
+ if (index != -1) {
+ int rank = getNeighborRank(index);
+
+ // if(jaceMyId!=jaceSize-1 &&
+ // localStub.getSendRight()==true){
+ try {
+ TaskId recev = null;
+ recev = Register.Instance().getListeOfTasks()
+ .getTaskIdOfRank(rank);
+ JaceInterface stub = recev.getHostStub();
+ if (stub.getReloading() == false
+ && stub.getState().equals("SAVING")) {
+ action = "sendResponse";
+ stub.response(jaceMyId, verifNum, 1, null);
+ respSent = true;
+ action = "nothing";
+ broadcastTasks(3);
+ // System.out.println("send response to"+rank);
+ }
+ } catch (Exception e) {
+ System.out.println("reponse non recue" + e);
+ }
+ }
+ }
+ } else if (state.equals("FINISHED")
+ && !action.equals("sendVerdict") && recievedVerdict) {
+
+ jaceP2P_globalCV_state = true;
+ // System.out.println("Finished");
+ // System.out.println("Finished");
+ // System.out.println("Finished");
+ // System.out.println("Finished");
+ System.out.println("Finished");
+ }
+
+ if (postReloading == true)
+ postReloading = false;
+
+ } catch (Exception e) {
+ System.out.println(" Exception in Global Convergence :" + e);
+ e.printStackTrace(System.out);
+ Register.Instance().viewAll();
+ }
+ // mon1.stop();
+ }
+
+ public synchronized int getResp(int index) throws RemoteException {
+ int res = 1;
+ for (int i = 0; i < resp.size(); i++)
+ if (i != index)
+ if (((Integer) resp.get(i)).intValue() == -1)
+ res = -1;
+ return res;
+ }
+
+ public synchronized void response(int neighId, int tag, int response,
+ Vector<?> recievedValue) throws RemoteException {
+ // System.out.println("inside response function");
+ // System.out.println("sleeping till not reloading");
+ while (reloading == true) {
+ try {
+ Thread.sleep(10);
+ // System.out.println("sleeping till not reloading");
+ } catch (Exception e) {
+ }
+ }
+ if (verifNum == tag) {
+ // System.out.println("inside condition");
+ int indexNeigh = neighbors.indexOf((Object) neighId);
+ // System.out.println("after gettin index="+index);
+ // System.out.println("index="+indexNeigh+" size de resp ="+resp.size());
+ // printResp();
+ // try{
+ // int xyz=((Integer)(resp.elementAt(indexNeigh))).intValue();
+ // }catch(Exception e){
+ // System.out.println("fuckkkkkkkkkkkkkkkkkk error:"+e);
+ // }
+
+ if (response == 1
+ && !state.equals("SAVING")
+ && (((Integer) (resp.elementAt(indexNeigh))).intValue()) != 1) {
+ // System.out.println("calling reduceAll()");
+ reduceAll(recievedValue);
+ }
+ // System.out.println("after calculating reduceAll");
+ resp.setElementAt(response, indexNeigh);
+
+ // System.out.println("get response ............");
+ waitForAck(3);
+
+ } else
+ throw new RemoteException();
+ }
+
+ public void initializeSavLeader() {
+ for (int i = 0; i < resp.size(); i++)
+ resp.setElementAt(0, i);
+ respSent = false;
+ }
+
+ public boolean recievedAllResp() {
+ boolean bool = true;
+ for (int i = 0; i < resp.size(); i++)
+ if (((Integer) (resp.get(i))).intValue() == 0) {
+ bool = false;
+ break;
+ }
+ return bool;
+ }
+
+ public int recievedAllRespMinusOne() {
+ int nbOfZeros = 0;
+ int indexOfZero = -1;
+ for (int i = 0; i < resp.size(); i++)
+ if (((Integer) (resp.get(i))).intValue() == 0) {
+ nbOfZeros++;
+ indexOfZero = i;
+ }
+ if (nbOfZeros > 1)
+ return -1;
+ else
+ return indexOfZero;
+ }
+
+ public synchronized boolean setNbNeighboursNotConv(int tag, int idNeigh,
+ int neighborTimeStep) throws RemoteException {
+ // System.out.println("ds setNbNeighboursNotConv !!!!!!!!!!!!!!!");
+ if (tag == verifNum && !action.equals("sendVerdict")
+ && neighborTimeStep == timeStep) {
+ if (idNeigh == -1) {
+ nb_not_recv--;
+ return false;
+ } else {
+ // System.out.println("sleeping till not reloading");
+ while (reloading == true) {
+ try {
+ Thread.sleep(10);
+ // System.out.println("sleeping till not reloading");
+ } catch (Exception e) {
+ }
+ }
+ int i = neighbors.indexOf((Object) idNeigh);
+ if (((Boolean) neighborsValues.get(i)).booleanValue() == false) {
+ nb_not_recv--;
+ // System.out.println("le noeud "+idNeigh+" a envoyer un message de pseudoconvergence");
+ neighborsValues.setElementAt(new Boolean(true), i);
+ waitForAck(3);
+
+ }
+ }
+ return true;
+ } else if (tag == verifNum - 1 && !action.equals("sendVerdict")
+ && reloading == false && neighborTimeStep == timeStep
+ && jaceP2P_Iteration != 0)
+ return true;
+ else
+ return false;
+ }
+
+ public void detectNeighbors(int id, int jaceSize) {
+ // System.out.println("detect neighbors !!!! ");
+ int d = 0;
+ while (Math.pow(2, d) < jaceSize) {
+ if (id < Math.pow(2, d) && ((id + Math.pow(2, d)) < jaceSize)) {
+ neighbors.add((int) (id + Math.pow(2, d)));
+ neighborsValues.add(new Boolean(false));
+ resp.add(0);
+ }
+ if (id < Math.pow(2, d + 1) && id >= Math.pow(2, d)) {
+ neighbors.add((int) (id - Math.pow(2, d)));
+ neighborsValues.add(new Boolean(false));
+ resp.add(0);
+ }
+ d++;
+ }
+ }
+
+ public synchronized void initialize_state() {
+
+ // System.out.println("initialiser\n");
+
+ nb_not_recv = neighbors.size();
+ for (int i = 0; i < neighbors.size(); i++)
+ neighborsValues.setElementAt(new Boolean(false), i);
+ for (int i = 0; i < resp.size(); i++)
+ resp.setElementAt(0, i);
+ underTh = false;
+ electedNode = false;
+ localCV_state = false;
+ verdict = false;
+
+ state = "NORMAL";
+ }
+
+ public int getNeighborRank(int index) throws RemoteException {
+ int rank = ((Integer) (neighbors.get(index))).intValue();
+ return rank;
+ }
+
+ public synchronized int getNeighbourNotCV() {
+ int neighId = -1;
+ for (int i = 0; i < neighbors.size(); i++)
+ if (((Boolean) neighborsValues.get(i)).booleanValue() == false)
+ neighId = ((Integer) neighbors.elementAt(i)).intValue();
+ return neighId;
+ }
+
+ public boolean getValues() {
+ boolean bool = true;
+ for (int i = 0; i < values.size(); i++)
+ if (((Boolean) values.elementAt(i)).equals(new Boolean(false))) {
+ bool = false;
+ break;
+ }
+ // System.out.println("getValues() have been called and it returned "+bool);
+ return bool;
+ }
+
+ public boolean testNegativeResp() {
+ boolean bool = false;
+ for (int i = 0; i < resp.size(); i++)
+ if (((Integer) (resp.get(i))).intValue() == -1) {
+ bool = true;
+ break;
+ }
+ return bool;
+ }
+
+ public void printResp() {
+ // for(int i=0;i<resp.size();i++)
+ // System.out.print(" resp["+((Integer)neighbors.get(i)).intValue()+"]="+((Integer)resp.get(i)).intValue());
+ // System.out.print("\n");
+ }
+
+ public void printDep() {
+ // for(int i=0;i<values.size();i++)
+ // System.out.print(" dep["+((Integer)dependancies.get(i)).intValue()+"]="+((Boolean)values.get(i)).booleanValue());
+ // System.out.print("\n");
+ }
+
+ public void broadcastVerif(int id, int neighId, int tag)
+ throws RemoteException {
+
+ TaskId recev = null;
+ // TaskId local =
+ // Register.Instance().getListeOfTasks().getTaskIdOfRank(id);
+ JaceInterface stub;
+ // System.out.println("Id="+id+" neighId="+ neighId+" tag="+tag);
+ for (int i = 0; i < neighbors.size(); i++)
+ if (neighId != ((Integer) neighbors.get(i)).intValue()) {
+ recev = Register.Instance().getListeOfTasks().getTaskIdOfRank(
+ ((Integer) neighbors.get(i)).intValue());
+ stub = recev.getHostStub();
+ stub.initializeVerif(tag);
+ // System.out.println("broadcast verification to :"+((Integer)neighbors.get(i)).intValue());
+ }
+ if (action.equals("sendVerif"))
+ action = "nothing";
+ }
+
+ public void initializeVerifLeader() throws RemoteException {
+ reinitializePPEr();
+ for (int i = 0; i < resp.size(); i++)
+ resp.setElementAt(0, i);
+ verifNum++;
+ respSent = false;
+ }
+
+ public void initializeVerif(int tag) throws RemoteException {
+ // System.out.println("Inside initializeVerif @@@@@");
+ // System.out.println("sleeping till not reloading");
+ while (reloading == true) {
+ try {
+ Thread.sleep(10);
+ // System.out.println("sleeping till not reloading");
+ } catch (Exception e) {
+ }
+ }
+ if (verifNum + 1 == tag)
+ if (state.equals("WAIT4V")) {
+ action = "sendVerif";
+ reinitializePPEr();
+ for (int i = 0; i < resp.size(); i++)
+ resp.setElementAt(0, i);
+ verifNum++;
+
+ respSent = false;
+ state = "VERIF";
+ waitForAck(3);
+
+ new SendVerifThread(jaceMyId, sendId, verifNum).start();
+
+ } else if (state.equals("VERIF")) {
+ } else {
+ throw new RemoteException();
+
+ }
+
+ }
+
+ public boolean getSaved() {
+ if (saved[1])
+ return true;
+ else
+ return false;
+ }
+
+ public void reinitializePPEr() {
+ pseudoPerBeg = false;
+ pseudoPerEnd = false;
+ for (int i = 0; i < values.size(); i++)
+ values.setElementAt(new Boolean(false), i);
+
+ }
+
+ public void broadcastVerdict(int id, int neighId, int tag, boolean verd)
+ throws RemoteException {
+ Boolean verdicto = verd;
+ TaskId recev = null;
+ // TaskId local =
+ // Register.Instance().getListeOfTasks().getTaskIdOfRank(id);
+ JaceInterface stub;
+ for (int i = 0; i < neighbors.size(); i++)
+ if (((Integer) neighbors.get(i)).intValue() != neighId) {
+ // System.out.println("broadcasting verdict "+verdicto+" to node of Rank "+((Integer)neighbors.get(i)).intValue());
+ recev = Register.Instance().getListeOfTasks().getTaskIdOfRank(
+ ((Integer) neighbors.get(i)).intValue());
+ stub = recev.getHostStub();
+ stub.savOrFinOrRest(tag, timeStep, verdicto, reduceAll);
+ }
+ action = "nothing";
+ if (verdict == false)
+ sendId = -1;
+ }
+
+ public void savOrFinOrRest(int tag, int step, boolean verd,
+ Vector<?> reduceAll) {
+ // System.out.println("Recieved verd "+verd+" sleeping till not reloading");
+ while (reloading == true) {
+ try {
+ Thread.sleep(10);
+ // System.out.println("sleeping till not reloading");
+ } catch (Exception e) {
+ }
+ }
+ if (verd == true) {
+ if (verifNum == tag && timeStep == step && state.equals("VERIF")) {
+ action = "sendVerdict";
+ if (finalStep == true) {
+ for (int i = 0; i < resp.size(); i++)
+ resp.setElementAt(0, i);
+ // System.out.println("sleeping till response is sent");
+ while (action.equals("sendResponse") || respSent == false)
+ try {
+ Thread.sleep(10);
+ // System.out.println("sleeping till response is sent");
+ } catch (Exception e) {
+ }
+ respSent = false;
+ state = "SAVING";
+ verdict = true;
+ recievedVerdict = true;
+
+ } else {
+ state = "FINISHED";
+ verdict = true;
+ recievedVerdict = true;
+ }
+ // System.out.println("//// Stetting reduceAll\\\\");
+ this.reduceAll = reduceAll;
+ waitForAck(3);
+
+ new SendVerdictThread(jaceMyId, sendId, verifNum, verdict)
+ .start();
+ }
+ } else if (verifNum < tag && timeStep == step) {
+ verifNum = tag;
+ initialize_state();
+ reinitializePPEr();
+ verdict = false;
+ action = "sendVerdict";
+ recievedVerdict = true;
+ waitForAck(3);
+
+ new SendVerdictThread(jaceMyId, sendId, verifNum, verdict).start();
+ }
+
+ }
+
+ public int getRankLeader() throws RemoteException {
+ int neighId = -1;
+ for (int i = 0; i < neighbors.size(); i++) {
+ TaskId recev = null;
+ recev = Register.Instance().getListeOfTasks().getTaskIdOfRank(
+ ((Integer) (neighbors.get(i))).intValue());
+ JaceInterface stub = recev.getHostStub();
+ if (stub.getNbNeighboursNotConv() == 0)
+ neighId = ((Integer) (neighbors.get(i))).intValue();
+ }
+ return neighId;
+ }
+
+ class SaveTaskThread extends Thread {
+ Task sauv;
+
+ public SaveTaskThread(Task s) {
+ this.sauv = s;
+ }
+
+ @SuppressWarnings("static-access")
+ public void run() {
+ // If array of BackupNodes not created, create it
+ if (saveTab == null) {
+ createSaveTab();
+ }
+
+ if (finalize == false) { // if finalization step, it will crash
+ // because register purged yet
+
+ ByteArrayOutputStream stream = convertTask2stream(sauv);
+
+ // find the BackupNode to send
+ int taskRankDest;
+ TaskId task = null;
+ JaceInterface stub = null;
+ boolean sent = false;
+ int j = 0;
+
+ while (j < saveTab.length && sent == false) {
+ // 1 - find the remote task Id to send the backup to
+ if (jaceSize < (2 * Register.Instance()
+ .getNumBackupNeighbors() + 1)) {
+ taskRankDest = saveTab[saveRound % (jaceSize - 1)];
+ } else {
+ taskRankDest = saveTab[saveRound
+ % (2 * Register.Instance()
+ .getNumBackupNeighbors())];
+ }
+
+ // 2 - knowing the destination task Id, get the stub of the
+ // corresponding node
+ try {
+ task = Register.Instance().getListeOfTasks()
+ .getTaskIdOfRank(taskRankDest); // ///////////////////////////pb
+ stub = task.getHostStub();
+ } catch (Exception e) {
+ System.out
+ .println("problem in SaveTaskThread on assignation line in save : "
+ + e);
+ }
+ // System.out.println("ite " + jaceP2P_Iteration +
+ // ".......... SENDING on task " + taskRankDest);
+
+ // if no stub there is a problem
+ if (stub == null) {
+ j++;
+ saveRound++;
+ // System.out.println("unable to SEND backup on task of rank "
+ // + taskRankDest);
+ }
+ // 3 - try to send the stream
+ else {
+ // if there is a stub, send the stream to that node
+ try {
+ stub.saveTask(jaceMyId, stream.toByteArray(),
+ sauv.lastSave.getLastSave(), sauv.timeStep,
+ Register.Instance().getAppliName(), 0);
+ // System.out.println("saved data on "+taskRankDest+" iteration= "+sauv.lastSave.getLastSave()+
+ // "timeStep="+sauv.timeStep+" !!!!!!!!");
+
+ // Vector v = stub.getIterationOfBackup(jaceMyId,0);
+ // int ite=((Integer)v.get(0)).intValue();
+ // System.out.println("******************************************sauvegarde de donnees: ite="+ite+" taskDest="+taskRankDest+" ******************************************************");
+ sent = true;
+
+ } catch (Exception e) {
+ System.out
+ .println("JaceP2P_Error in Task.jaceP2P_Save() when saving stream: "
+ + e);
+ j++;
+ saveRound++;
+
+ }
+ }
+
+ try {
+ JaceSession.Instance().getTaskThread().sleep(10);
+ JaceSession.Instance().getTaskThread().yield();
+ } catch (Exception e) {
+ }
+ }
+
+ // 5 - if stream not sent at all, do something (WHAT ???)
+ if (j > saveTab.length) {
+ System.out
+ .println("No more alive neighbors for storing the Backup");
+ // TODO : what to do if no BackupNode has answered ???
+ }
+
+ }
+ }
+ }
+
+}
+
+class BroadcastTaskThread extends Thread {
+ JaceInterface stub;
+ int rank;
+ byte[] tsk;
+ int iteration;
+ int tag;
+ String appliName;
+ int timeStep;
+ int dest;
+
+ // constructor
+ public BroadcastTaskThread(JaceInterface theStub, int theRank,
+ byte[] theTsk, int theIteration, int timeStep, String theAppliName,
+ int tag, int dest) {
+ this.stub = theStub;
+ this.rank = theRank;
+ this.tsk = theTsk;
+ this.iteration = theIteration;
+ this.timeStep = timeStep;
+ this.appliName = theAppliName;
+ this.tag = tag;
+ this.dest = dest;
+ // System.out.println("tag="+tag);
+ }
+
+ // method launched by start()
+ public void run() {
+ try {
+ stub.saveTask(rank, tsk, iteration, timeStep, appliName, tag);
+ JaceSession.Instance().getTaskObject().setSaved(true);
+
+ if (tag == 0) {
+ // Vector v = stub.getIterationOfBackup(rank,0);
+ // int ite=((Integer)v.get(0)).intValue();
+ // System.out.println("++++++++++ Broadcast data ite="+iteration+" dest="+dest+" timeStep="+timeStep);
+
+ } else {
+ // Vector v = stub.getIterationOfBackup(rank,1);
+ // int ite=((Integer)v.get(0)).intValue();
+ // System.out.println("++++++++++ Broadcast dataConvg ite="+iteration+" dest="+dest+" timeStep="+timeStep);
+ // System.out.println("+++++++++++++++++++++++++++++++++++++++++ Broadcast convgData ite="+ite+" dest="+dest+" +++++++++++++++++++++++++++++++++++++++++++++++++++++");
+
+ }
+ } catch (Exception e) {
+ System.out
+ .println("node not reachable by JaceServer.saveTask() in BroadcastTaskThread :"
+ + e);
+ }
+ }
+}
--- /dev/null
+package jaceP2P;
+
+public class TaskId implements java.io.Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ // attributes
+ private String appliName = null;
+ private int rank = -1;
+ private String hostIP = null; // TODO a virer et laisser que le hostStub
+ private String hostName = null; // TODO a virer et laisser que le hostStub
+ private JaceInterface hostStub = null;
+
+ // constructors
+ public TaskId() {
+ }
+
+ public TaskId(String nom, int rang, JaceInterface s) {
+ this.appliName = nom;
+ this.rank = rang;
+ // this.host = hosts;
+ this.hostStub = s;
+ }
+
+ public void setAppliName(String nom) {
+ this.appliName = nom;
+ }
+
+ // TODO a virer et laisser que le hostStub
+ public void setHostIP(String nom) {
+ this.hostIP = nom;
+ }
+
+ // TODO a virer et laisser que le hostStub
+ public void setHostName(String nom) {
+ this.hostName = nom;
+ }
+
+ public void setRank(int val) {
+ this.rank = val;
+ }
+
+ public void setHostStub(JaceInterface s) {
+ this.hostStub = s;
+ }
+
+ public String getAppliName() {
+ return this.appliName;
+ }
+
+ // TODO a virer et laisser que le hostStub
+ public String getHostIP() {
+ return this.hostIP;
+ }
+
+ // TODO a virer et laisser que le hostStub
+ public String getHostName() {
+ return this.hostName;
+ }
+
+ public int getRank() {
+ return this.rank;
+ }
+
+ public JaceInterface getHostStub() {
+ return this.hostStub;
+ }
+}
--- /dev/null
+package jaceP2P;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.util.Vector;
+
+
+
+public class TaskLauncher {
+
+ // attributes
+ Class<?> c;
+ Loader load;
+
+ // constructors
+ TaskLauncher() {
+ }
+
+ @SuppressWarnings({ "unchecked", "static-access" })
+ public void loadOrReloadTask(Backup b, Backup bConvg) {
+ String appliName = Register.Instance().getAppliName();
+ System.out.println("appliName=" + appliName);
+ Task tsk = null;
+ BackupConvg tskConvg = new BackupConvg();
+ // parse the file:/path and transform the .class of the user in java
+ // "Class" object
+ load = new Loader();
+ c = load.load(appliName);
+ try {
+ // instanciate the appli (the .class of the user) and load it
+ tsk = ((Task) c.newInstance());
+ } catch (Exception e) {
+ System.out.println("unable to instanciate the class :" + e);
+ }
+ try {
+ if (b != null) {
+ byte[] tab = b.getData();
+ // System.out.println("size of the byte array : " + tab.length +
+ // " bytes");
+
+ // convert the stream received in an object of the same type of
+ // the user class, that inherits from Tasks (but is not such
+ // generic as a simple Task object)
+ try {
+ ObjectInputStream stream = new ObjectInputStream(
+ new ByteArrayInputStream(tab));
+ tsk = tsk.jaceP2P_ConvertStream(stream);
+
+ stream.close();
+ } catch (Exception e) {
+ System.out
+ .println("JaceP2P_Error ds TaskLauncher.loadOrReloadTask() "
+ + "when converting the stream in Task : "
+ + e);
+ }
+ } else {
+ TaskId id = Register.Instance().getListeOfTasks()
+ .getTaskIdOfHostStub(LocalHost.Instance().getStub());
+ // assign the attributes of the Task object :
+ tsk.setId(id); // assign JaceMyId, jaceTaskId, jaceSize
+ tsk.setParam(Register.Instance().getParams()); // assign
+ // jaceArgs
+
+ System.out.println("PAS de backup");
+ }
+
+ // insert Task in taskObject, the attribute of the JaceSession class
+ JaceSession.Instance().addTaskObject(tsk);
+ System.out.println("after add task");
+ if (b != null) {
+
+ int it = b.getIteration();
+ tsk.reloading = true;
+ System.out.println("timeStep=" + tsk.timeStep);
+
+ // reinit the Task object as it was overloaded by the user in
+ // the
+ if (bConvg != null) {
+ int itConv = bConvg.getIteration();
+ if (itConv > it) {
+ byte[] tab1 = bConvg.getData();
+ ObjectInputStream stream = new ObjectInputStream(
+ new ByteArrayInputStream(tab1));
+ tskConvg = (BackupConvg) stream.readObject();
+ if (tskConvg.timeStep >= tsk.timeStep) {
+ System.out.println("state=" + tskConvg.state);
+ tsk.state = tskConvg.state;
+ tsk.nb_not_recv = tskConvg.nb_not_recv;
+ tsk.electedNode = tskConvg.electedNode;
+ tsk.respSent = tskConvg.respSent;
+ System.out.println("avant le copiage des vecteurs");
+ tsk.neighbors = (Vector) (tskConvg.neighbors)
+ .clone();
+
+ tsk.neighborsValues = (Vector) (tskConvg.neighborsValues)
+ .clone();
+ if (bConvg.getIteration() >= b.getIteration())
+ tsk.reduceAll = (Vector) (tskConvg.reduceAll)
+ .clone();
+ tsk.resp = (Vector) (tskConvg.resp).clone();
+ System.out.println("apres le copiage des vecteurs");
+ tsk.underTh = tskConvg.underTh;
+ tsk.verifNum = tskConvg.verifNum;
+ tsk.sendId = tskConvg.sendId;
+ tsk.finalStep = tskConvg.finalStep;
+ tsk.action = tskConvg.action;
+ tsk.verdict = tskConvg.verdict;
+ tsk.localCV_state = tskConvg.localCV_state;
+ tsk.timeStep = tskConvg.timeStep;
+ tsk.lastSave = tskConvg.lastSave;
+
+ // tsk.jaceP2P_Iteration=tskConvg.jaceP2P_Iteration;
+ tsk.recievedVerdict = tskConvg.recievedVerdict;
+ }
+ }
+ }
+
+ System.out.println("ReinitTask!!!!");
+ tsk.jaceP2P_ReinitTask();
+ tsk.postReloading = true;
+ tsk.reloading = false;
+ } else {
+ // initialize the task with the appli specific data (Method
+ // jaceP2P_InitTask() overloaded in the appli code by the user)
+ System.out.println("before Init task");
+
+ tsk.jaceP2P_InitTask();
+ System.out.println("after Init task");
+ }
+
+ if (LocalHost.Instance().getStartedThreads() == false) {
+ // create the thread Sender (only 1 Sender thread per node, even
+ // if several tasks like in Jace) which is the thread
+ // responsible
+ // for sending messages stored in JaceBuffer (the queue of
+ // messages to send)
+ if (JaceDaemon.Instance().getProtocol().equals("rmi")) {
+ Sender.setInstance(SenderRmi.Instance());
+ SenderRmi.Instance().start();
+
+ } else {
+ Sender.setInstance(SenderSocket.Instance());
+ SenderSocket.Instance().start();
+
+ }
+ ScanThread.Instance().start();
+ LocalHost.Instance().setStartedThreads(true);
+ }
+
+ // Create the computing thread (the thread of the Task object) and
+ // start it
+ Thread th = new Thread(tsk, new String().valueOf(tsk.jaceMyId));
+ th.start();
+
+ // keep this thread in an attribute of JaceSession (taskThread)
+ JaceSession.Instance().addTaskThread(th);
+
+ Thread.currentThread().yield();
+
+ } catch (Exception e) {
+ System.out.println("PB in TaskLauncher.loadOrReloadTask() : " + e);
+ e.printStackTrace(System.out);
+ }
+ }
+
+}
--- /dev/null
+package jaceP2P;
+
+import java.util.Vector;
+
+public class TokenThread extends Thread {
+
+ public static TokenThread Instance = null;
+
+ public boolean token = false;
+
+ public TokenThread() {
+ }
+
+ public boolean getToken() {
+ return token;
+ }
+
+ public static TokenThread Instance() {
+
+ if (Instance == null)
+ Instance = new TokenThread();
+ return Instance;
+ }
+
+ public void run() {
+
+ while (true) {
+ try {
+ Thread.sleep(2000);
+ // System.out.println("TokenThread still alive");
+ if (token) {
+ // System.out.println("distribute!!!!");
+ distribute();
+ } else {
+ // System.out.println("waiting !!!! token="+token);
+ wait();
+
+ }
+ } catch (Exception e) {
+ }
+ }
+ }
+
+ public void setToken() {
+ token = true;
+ ScanThreadSuperNode.Instance().setToken(false);
+ System.out.println("the token is put to true in the tokenthread");
+ try {
+ synchronized (TokenThread.Instance()) {
+ TokenThread.Instance().notify();
+ }
+ } catch (Exception e) {
+ System.out.println("unable to notify :" + e);
+ }
+ // System.out.println("finished set token");
+ }
+
+ @SuppressWarnings("unchecked")
+ public void distribute() {
+ System.out.println("totalDaemons="
+ + SuperNodeListe.Instance().getTotalDaemons());
+ int nbLocal = Register.Instance().getSize();
+ int totalDaemons;
+ Vector liste = SuperNodeListe.Instance().getListe();
+ synchronized (SuperNodeListe.Instance()) {
+ synchronized (Register.Instance()) {
+ if (nbLocal > 5) {
+ totalDaemons = SuperNodeListe.Instance().getTotalDaemons();
+ int nbDaemons = totalDaemons / liste.size();
+ // System.out.println("nbDaemons="+nbDaemons+" totalDaemons="+totalDaemons+" nbLocal="+nbLocal);
+ if (nbDaemons < nbLocal)
+ for (int i = 0; i < liste.size(); i++)
+ if (((SuperNodeData) liste.get(i)).getNbOfNodes() < nbDaemons)
+ try {
+ // System.out.println("Snode="+i+" nodes="+((SuperNodeData)liste.get(i)).getNbOfNodes());
+ Vector nodes = Register.Instance()
+ .getListe();
+ int amountToSend = nbDaemons
+ - ((SuperNodeData) liste.get(i))
+ .getNbOfNodes();
+ Vector newVector = new Vector();
+ for (int j = 0; j < amountToSend; j++) {
+ newVector.add(nodes.elementAt(j));
+ }
+ ((SuperNodeData) liste.get(i)).getStub()
+ .sendSurplus(newVector);
+ for (int j = 0; j < amountToSend; j++)
+ nodes.remove(0);
+ int index = SuperNodeListe.Instance()
+ .existSuperNode(
+ LocalHost.Instance()
+ .getIP());
+ ((SuperNodeData) SuperNodeListe.Instance()
+ .getListe().get(index))
+ .setNbOfNodes(Register.Instance()
+ .getSize());
+ System.out.println("size reg="
+ + Register.Instance().getSize()
+ + " liste="
+ + ((SuperNodeData) SuperNodeListe
+ .Instance().getListe().get(
+ index))
+ .getNbOfNodes());
+ new ForwardCount().start();
+ } catch (Exception e) {
+
+ System.out
+ .println("unable to send surplus to "
+ + ((SuperNodeData) liste
+ .get(i)).getIP()
+ + " :" + e);
+ }
+
+ }
+ }
+ }
+ boolean sendToken = true;
+ while (sendToken) {
+ // index of local node
+ int index = SuperNodeListe.Instance().existSuperNode(
+ LocalHost.Instance().getIP());
+ // index of next node
+
+ // System.out.println("index="+index);
+ // System.out.println(SuperNodeListe.Instance().getListe().size());
+ // System.out.println(((SuperNodeData)SuperNodeListe.Instance().getListe().elementAt(0)).getIP());
+ index = (index + 1) % SuperNodeListe.Instance().getListe().size();
+ // System.out.println("index="+index);
+ try {
+ ((SuperNodeData) liste.elementAt(index)).getStub().setToken();
+ if (index != SuperNodeListe.Instance().existSuperNode(
+ LocalHost.Instance().getIP()))
+ token = false;
+ sendToken = false;
+ System.out.println("Passing token to " + index);
+ } catch (Exception e) {
+ try {
+ System.out.println("Unable to send Token to "
+ + ((SuperNodeData) liste.elementAt(index)).getIP()
+ + " :" + e);
+ /*
+ * SuperNodeListe.Instance().removeSuperNode(((SuperNodeData)
+ * liste.elementAt(index)));
+ *
+ * try{ for(int
+ * i=0;i<SuperNodeListe.Instance().getListe().size();i++)
+ *
+ * ((SuperNodeData)SuperNodeListe.Instance().getListe().
+ * elementAt
+ * (i)).getStub.removeSuperNode(((SuperNodeData)liste
+ * .elementAt(index)));
+ *
+ * }catch(Exception e2){System.out.println(
+ * "diffuse the message concerning a dead superNode: "+e2);
+ * } if(index==SuperNodeListe.Instance().getListe().size())
+ * index=0;
+ * HeartBeatThread.Instance().setServer(((SuperNodeData
+ * )liste.elementAt(index)).getStub()); try{
+ */
+
+ Thread.sleep(HeartBeatSNode.Instance().getHeartTime());
+ } catch (Exception e2) {
+ System.out.println("-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_");
+ }
+ }
+ }
+ }
+}
--- /dev/null
+package jaceP2P;
+
+import java.util.Vector;
+
+class UpdateRegisterConcernedThread extends Thread {
+ Register reg = null;
+ int requete;
+ int i, rank;
+ Task tache = null;
+ Vector<?> neighbors;
+ int nbOfDeamonsPerThread;
+ Node oldNode, node;
+
+ public UpdateRegisterConcernedThread(Vector<?> neighbors, Register r, int i,
+ int rank, Node noeud, Node tmpNode) {
+
+ this.reg = r;
+ this.rank = rank;
+ this.i = i;
+ //his.tache = tache;
+ this.neighbors = neighbors;
+ oldNode = noeud;
+ node = tmpNode;
+ nbOfDeamonsPerThread = JaceSpawner.Instance().getNbOfDeamonsPerThread();
+ }
+
+ public void run() {
+ ListeTask listOfTasks = Register.Instance().getListeOfTasks().clone();
+ // listOfTasks.viewAll();
+ for (int index = i * nbOfDeamonsPerThread; index < i
+ * nbOfDeamonsPerThread + nbOfDeamonsPerThread
+ && index < neighbors.size(); index++) {
+
+ try {
+ if (((Integer) neighbors.elementAt(index)).intValue() == rank) {
+ Register g = new Register();
+ ListeTask newListOfTasks = new ListeTask();
+ g.setAppliName(reg.getAppliName());
+ g.setParams(reg.getParams());
+
+ g.setSpawnerStub(JaceSpawner.Instance()
+ .getSpawnerResponsibleOn(rank));
+ g.setNbOfTasks(reg.getNbOfTasks());
+
+ for (int j = 0; j < neighbors.size(); j++) {
+ TaskId id = listOfTasks
+ .getTaskIdOfRank(((Integer) neighbors
+ .elementAt(j)).intValue());
+ newListOfTasks.addTask(id);
+ if (id.getHostStub() != null) {
+ Node noeud = reg.getNodeOfStub(id.getHostStub());
+ Node n2 = noeud;
+
+ g.addNode(n2);
+ }
+ }
+ g.setListeOfTasks(newListOfTasks);
+
+ TaskId neighborTask = listOfTasks
+ .getTaskIdOfRank((((Integer) neighbors
+ .elementAt(index)).intValue() + 1)
+ % listOfTasks.getSize());
+ JaceInterface stubVoisin = neighborTask.getHostStub();
+ JaceInterface stub = listOfTasks.getTaskIdOfRank(
+ ((Integer) neighbors.elementAt(index)).intValue())
+ .getHostStub();
+ if (stub == null || stubVoisin == null) {
+ System.out.println("stub NULL");
+ } else {
+
+ String name = listOfTasks.getTaskIdOfRank(
+ ((Integer) neighbors.elementAt(index))
+ .intValue()).getHostName();
+
+ try {
+ // whatever the case, uate the Register on each Node
+
+ // JaceSpawnerInterface
+ // ja=Register.Instance().getSpawnerStub();
+ // System.out.println("avant update register");
+
+ // 0 l'applejaacoja commence
+ stub.updateRegister(g, stubVoisin, 0);
+
+ // System.out.println(" after update register");
+ // if updateRegister worked, that means Node is
+ // Alive
+ Node noeud = Register.Instance()
+ .getNodeOfStub(stub);
+ if (noeud != null) {
+ noeud.setAliveFlag(true);
+ noeud.setAliveTime();
+
+ }
+ } catch (Exception e) {
+ System.out
+ .println(name
+ + " ("
+ + LocalHost.Instance()
+ .resolve(name)
+ + " rank="
+ + index
+ + ") cannot be reached by UpdateRegisterConcernedThread for new node "
+ + " size : "
+ + Register.Instance().getSize()
+ + " :" + e);
+
+ }
+ }
+ } else {
+ //int tag;
+ TaskId id = listOfTasks
+ .getTaskIdOfRank(((Integer) neighbors
+ .elementAt(index)).intValue());
+ String name = id.getHostName();
+
+ try {
+
+ JaceInterface stub = id.getHostStub();
+ stub.updateRegister(oldNode, node);
+ } catch (Exception e) {
+ System.out
+ .println(name
+ + " ("
+ + LocalHost.Instance().resolve(name)
+ + " rank="
+ + index
+ + ") cannot be reached by UpdateRegisterConcernedThread "
+ + " size : "
+ + Register.Instance().getSize() + " :"
+ + e);
+ }
+ }
+ } catch (Exception e2) {
+ System.out.println("error in updateRegisterConcernedThread :"
+ + e2);
+ }
+ }
+ // System.out.println("at the end of an updateRegisterCon. j="+i);
+ // Register.Instance().getListeOfTasks().viewAll();
+ }
+
+ @SuppressWarnings("unused")
+ private Vector<Integer> getDependencies(int id, int jaceSize) {
+ // get computing dependencies
+ Vector<Integer> neighbors = new Vector<Integer>();
+ int[] dep = tache.getDependencies(id);
+ for (int z = 0; z < taille(dep); z++)
+ neighbors.add(dep[z]);
+ // System.out.println("la liste des voisins de calcul de: "+id);
+ // for(int z=0;z<neighbors.size();z++)
+ // System.out.print(((Integer)neighbors.elementAt(z)).intValue()+" ");
+ // System.out.println();
+ // get convergence neighbors
+ int d = 0;
+ while (Math.pow(2, d) < jaceSize) {
+ if (id < Math.pow(2, d) && ((id + Math.pow(2, d)) < jaceSize))
+ if (!neighbors.contains((Object) ((int) (id + Math.pow(2, d)))))
+ neighbors.add((int) (id + Math.pow(2, d)));
+ if (id < Math.pow(2, d + 1) && id >= Math.pow(2, d))
+ if (!neighbors.contains((Object) ((int) (id - Math.pow(2, d)))))
+ neighbors.add((int) (id - Math.pow(2, d)));
+ d++;
+ }
+
+ // get backup neighbors
+ int nb = reg.getNumBackupNeighbors();
+ int rankOfBackTask;
+ int tmp;
+ for (int j = 1; j <= nb; j++) {
+ // ------------ 1 - for backups "j + n" (to the right of j)
+ rankOfBackTask = (id + j) % jaceSize;
+ if (!neighbors.contains((Object) rankOfBackTask))
+ neighbors.add(rankOfBackTask);
+
+ // ------------ 2 - for backups "j - n" (to the left of j)
+ tmp = id - j;
+ if (tmp >= 0) {
+ rankOfBackTask = tmp % jaceSize;
+ } else {
+ rankOfBackTask = jaceSize - (Math.abs(tmp) % jaceSize);
+ }
+ if (!neighbors.contains((Object) rankOfBackTask))
+ neighbors.add(rankOfBackTask);
+ }
+ // adds itself
+ neighbors.add(id);
+ return neighbors;
+
+ }
+
+ public static int taille(int[] vect) {
+ int taille = 0;
+ int x = 0;
+ while (x < vect.length && vect[x] >= 0) {
+ taille++;
+ x++;
+ }
+ return x;
+ }
+}
--- /dev/null
+package jaceP2P;
+
+import java.util.Vector;
+
+class UpdateRegisterThread extends Thread {
+ // Register reg = Register.Instance();
+ Register reg = null;
+ int requete;
+ int i, debut;
+ Task tache = null;
+ int nbOfDeamonsPerThread;
+ int nbOfDeamonsPerSpawner;
+
+ public UpdateRegisterThread(Task tache, Register r, int req, int i,
+ int debut) {
+
+ this.reg = r;
+ this.requete = req;
+ this.i = i;
+ this.tache = tache;
+ this.debut = debut;
+ nbOfDeamonsPerThread = JaceSpawner.Instance().getNbOfDeamonsPerThread();
+ nbOfDeamonsPerSpawner = JaceSpawner.Instance()
+ .getNbOfDeamonsPerSpawner();
+ // requete=1 if beginning of appli to ping the spawner and not a
+ // supernode
+ // requete=0 if only update Register
+ /*
+ * told in another way : if requete = 1, update the Register AND the
+ * beating Server on each node else, only update the Register on each
+ * node
+ */
+
+ }
+
+ // method launched by start()
+ public void run() {
+ ListeTask listOfTasks = reg.getListeOfTasks();
+ // System.out.println("Thread "+i+" start from "+(debut+i*nbOfDeamonsPerThread)+" till "+(debut+i*nbOfDeamonsPerThread+nbOfDeamonsPerThread)+" or "+(debut+nbOfDeamonsPerSpawner)+" or "+listOfTasks.getSize());
+ for (int index = debut + i * nbOfDeamonsPerThread; index < debut + i
+ * nbOfDeamonsPerThread + nbOfDeamonsPerThread
+ && index < debut + nbOfDeamonsPerSpawner
+ && index < listOfTasks.getSize(); index++) {
+ // Node node = Register.Instance().getNodeAt(index);
+ // Node node = reg.getNodeAt(index);
+ // Node voisin=reg.getNodeAt((index+1)%reg.getSize());
+ Vector<Integer> dependencies = getDependencies(index, listOfTasks.getSize());
+ // System.out.println("la liste des voisins de : "+index);
+ // for(int z=0;z<dependencies.size();z++)
+ // System.out.print(((Integer)dependencies.elementAt(z)).intValue()+" ");
+ // System.out.println();
+ System.out.println(dependencies.size());
+ Register g = new Register();
+ ListeTask newListOfTasks = new ListeTask();
+ g.setAppliName(reg.getAppliName());
+ g.setParams(reg.getParams());
+ g.setSpawnerStub(reg.getSpawnerStub());
+ g.setNbOfTasks(reg.getNbOfTasks());
+ // g.setVersion(reg.getVersion());
+ for (int j = 0; j < dependencies.size(); j++) {
+ TaskId id = listOfTasks.getTaskIdOfRank(((Integer) dependencies
+ .elementAt(j)).intValue());
+ // System.out.println("got id of :"+((Integer)dependencies.elementAt(j)).intValue());
+ newListOfTasks.addTask(id);
+ if (id.getHostStub() != null) {
+ Node noeud = reg.getNodeOfStub(id.getHostStub());
+ g.addNode(noeud);
+ }
+ }
+ g.setListeOfTasks(newListOfTasks);
+ // g.setInstance();
+ TaskId neighborTask = listOfTasks.getTaskIdOfRank((index + 1)
+ % listOfTasks.getSize());
+ // if (node == null) {
+ // System.out.println("node NUULL");
+ // }
+ JaceInterface stubVoisin = neighborTask.getHostStub();
+ JaceInterface stub = listOfTasks.getTaskIdOfRank(index)
+ .getHostStub();
+ if (stub == null && stubVoisin == null) {
+ System.out.println("stub NULL");
+ }
+
+ String name = listOfTasks.getTaskIdOfRank(index).getHostName();
+
+ try {
+
+ // whatever the case, uate the Register on each Node
+
+ // JaceSpawnerInterface ja=Register.Instance().getSpawnerStub();
+ // System.out.println("avant update register");
+
+ // 1 l'appli vient de commencer
+ stub.updateRegister(g, stubVoisin, requete);
+ // System.out.println("Envoi du registre a :"+index);
+ // g.viewAll();
+ // System.out.println("ancien registre");
+ // reg.viewAll();
+ // System.out.println("Le registre initial");
+ // Register.Instance().viewAll();
+ // System.out.println(" after update register");
+ // if updateRegister worked, that means Node is Alive
+ Node noeud = Register.Instance().getNodeOfStub(stub);
+ if (noeud != null) {
+ noeud.setAliveFlag(true);
+ noeud.setAliveTime();
+
+ }
+ } catch (Exception e) {
+ System.out.println(name + " ("
+ + LocalHost.Instance().resolve(name) + " rank=" + index
+ + ") cannot be reached by UpdateRegisterThread "
+ + " size : " + Register.Instance().getSize() + " :"
+ + e);
+ try {
+ stub.suicide(" by Update");
+ // System.out.println("I suicide " + name + "\n\n");
+ } catch (Exception e2) {
+ // System.out.println("It is already Dead " + name +
+ // " : \n\n");
+ }
+ }
+ }
+
+ }
+
+ private Vector<Integer> getDependencies(int id, int jaceSize) {
+ // get computing dependencies
+ Vector<Integer> neighbors = new Vector<Integer>();
+ int[] dep = null;
+ // try{
+ // if(tache==null)
+ // System.out.println("tache = null");
+ dep = tache.getDependencies(id);
+ // }catch(Exception e)
+ // {e.printStackTrace(System.out);}
+ for (int z = 0; z < taille(dep); z++)
+ neighbors.add(dep[z]);
+ // System.out.println("la liste des voisins de calcul de: "+id);
+ // for(int z=0;z<neighbors.size();z++)
+ // System.out.print(((Integer)neighbors.elementAt(z)).intValue()+" ");
+ // System.out.println();
+ // get convergence neighbors
+ int d = 0;
+ while (Math.pow(2, d) < jaceSize) {
+ if (id < Math.pow(2, d) && ((id + Math.pow(2, d)) < jaceSize))
+ if (!neighbors.contains((Object) ((int) (id + Math.pow(2, d)))))
+ neighbors.add((int) (id + Math.pow(2, d)));
+ if (id < Math.pow(2, d + 1) && id >= Math.pow(2, d))
+ if (!neighbors.contains((Object) ((int) (id - Math.pow(2, d)))))
+ neighbors.add((int) (id - Math.pow(2, d)));
+ d++;
+ }
+ // System.out.println("taille="+neighbors.size());
+ // get backup neighbors
+ int nb = reg.getNumBackupNeighbors();
+ int rankOfBackTask;
+ int tmp;
+ for (int j = 1; j <= nb; j++) {
+ // System.out.println("dans la boucle nb="+nb+" j="+j);
+ // ------------ 1 - for backups "j + n" (to the right of j)
+ rankOfBackTask = (id + j) % jaceSize;
+ if (!neighbors.contains((Object) rankOfBackTask))
+ neighbors.add(rankOfBackTask);
+
+ // ------------ 2 - for backups "j - n" (to the left of j)
+ tmp = id - j;
+ if (tmp >= 0) {
+ rankOfBackTask = tmp % jaceSize;
+ } else {
+ rankOfBackTask = jaceSize - (Math.abs(tmp) % jaceSize);
+ }
+ if (!neighbors.contains((Object) rankOfBackTask))
+ neighbors.add(rankOfBackTask);
+ }
+ // System.out.println("taille="+neighbors.size());
+ // adds itself
+ neighbors.add(id);
+ return neighbors;
+
+ }
+
+ public static int taille(int[] vect) {
+ int taille = 0;
+ int x = 0;
+ while (x < vect.length && vect[x] >= 0) {
+ taille++;
+ x++;
+ }
+ return x;
+ }
+}