3 import java.io.BufferedReader;
5 import java.io.IOException;
6 import java.io.InputStreamReader;
7 import java.rmi.Naming;
8 import java.rmi.RemoteException;
9 import java.rmi.registry.LocateRegistry;
10 import java.rmi.registry.Registry;
11 import java.rmi.server.UnicastRemoteObject;
12 import java.util.ArrayList;
13 import java.util.Iterator;
14 import java.util.concurrent.Semaphore;
17 public class Server extends UnicastRemoteObject implements ServicesServer
/**
 * Small mutable integer counter. Elsewhere in this file the single instance
 * (field "counter") is also used as the lock object for synchronized blocks
 * and for notifyAll().
 */
19 private class DiscCount
// Starts the count at zero.
23 DiscCount() { nb = 0 ; }
// Adds one to the count.
25 protected void inc() { nb++ ; }
// NOTE(review): the body of dec() is not visible in this listing; presumably it decrements nb - confirm.
27 protected void dec() {
// Returns the current count.
34 protected int getNb() { return nb ; }
/**
 * Pairs a VM IP address with the IP of the host it is assigned to.
 * generateVmIP() treats an entry whose host IP equals "" as unassigned.
 */
38 private class IPAssociation
// IP of the physical host this VM IP is currently associated with.
41 private String hostIP ;
// Sets the VM IP of this association.
49 protected void setVmIP( String _vmIP )
// Sets the host IP of this association ("" is used elsewhere to mark it free).
54 protected void setHostIP( String _hostIP )
// Returns the VM IP of this association.
59 protected String getVmIP()
// Returns the host IP of this association.
64 protected String getHostIP()
// Serialization version identifier.
72 private static final long serialVersionUID = 1L ;
// Every client that called register(); looked up by IP throughout the class.
74 private ArrayList<ConnectedClient> clients ;
// Clients on which a VM was started (filled by startApplication()).
75 private ArrayList<ComputingClient> computingClients ;
// Launched applications; the visible code only ever reads index 0.
76 private ArrayList<RunningApplication> applications ;
// Threshold compared against ConnectedClient.getTimeout() by the monitor thread.
77 private int max_timeout ;
// Background thread that detects disconnected clients (created in init()).
78 private ConnectedMonitor monitor ;
// Counter and lock object shared by the monitor and the FaultManager threads.
79 private DiscCount counter ;
// Pool of VM IP / host IP associations, filled in init() with 10.11.10.2 .. 10.11.10.100.
80 private ArrayList<IPAssociation> vmIPs ;
// Directory holding the VM archives; set to "/localhome/vmware" in init().
81 private String working_directory ;
// Minimum delay between two saves, compared against System.currentTimeMillis()
// differences (so milliseconds); set to 30 * 60 * 1000 in init().
82 private long save_interleave ;
// Single-permit semaphore created in init(); used by requestSave().
83 private Semaphore semaSave ;
86 protected Server() throws RemoteException
/**
 * Registers a client stub with the server, or refreshes the stub of a client
 * that is already known.
 *
 * The host IP is read from the stub and compared with the IP of every entry in
 * "clients". A known client gets its stub replaced; an unknown one is appended
 * as a new ConnectedClient. The number of connected clients is then printed.
 *
 * NOTE(review): several structural lines (try/else/return) are not visible in
 * this listing, so the exact branch nesting and the returned Integer cannot be
 * confirmed from here.
 *
 * @param _stub remote stub of the client that is connecting
 */
94 public Integer register( ServicesClient _stub )
// The IP is obtained through a remote call, which may fail.
100 ip = _stub.getIPHost() ;
101 } catch (RemoteException e) {
102 e.printStackTrace() ;
// NOTE(review): if the remote call failed, "ip" may still be unset when it is
// dereferenced below - confirm what the hidden lines do in that case.
106 boolean exists = false ;
// Linear search of the known clients by IP.
109 for( i = 0 ; i < clients.size() ; i++ )
111 if( ip.equals( clients.get( i ).getIP() ) )
114 System.out.println( "Client already connected!" ) ;
// Reconnection: keep the existing entry, only swap the remote stub.
121 System.out.println( "The client stub will be replaced." ) ;
122 clients.get( i ).setStub( _stub ) ;
123 System.out.println( "(reconnection of " + clients.get( i ).getName() + ")" ) ;
// First connection: append a new entry built from the stub.
126 System.out.println( "New connection!" ) ;
127 clients.add( new ConnectedClient( _stub ) ) ;
128 System.out.println( "(connection of " + clients.get( clients.size() - 1 ).getName() + ")" ) ;
// Report how many clients are now connected.
131 if( clients.size() == 0 )
133 System.out.println( "There is no client connected." ) ;
134 } else if( clients.size() == 1 ) {
135 System.out.println( "There is one client connected." ) ;
137 System.out.println( "There are " + clients.size() + " clients connected." ) ;
/**
 * Associates the given host IP with a free entry of the VM IP pool.
 * An entry is considered free when its host IP equals "".
 * Null or empty input is ignored.
 *
 * NOTE(review): whether the loop stops after the first free entry is not
 * visible in this listing - confirm.
 *
 * @param _ip IP of the host that needs a VM IP
 */
148 private void generateVmIP( String _ip )
150 if( _ip != null && ! _ip.equals( "" ) )
152 for( int i = 0 ; i < vmIPs.size() ; i++ )
154 if( vmIPs.get( i ).getHostIP().equalsIgnoreCase( "" ) )
156 vmIPs.get( i ).setHostIP( _ip ) ;
/**
 * Keep-alive entry point: resets the timeout of the client whose IP equals
 * the given one.
 *
 * NOTE(review): no null check on _ip is visible in this listing - confirm.
 *
 * @param _ip IP of the client sending the ping
 */
166 public void ping( String _ip )
170 for( int i = 0 ; i < clients.size() ; i++ )
172 if( _ip.equals( clients.get( i ).getIP() ) )
174 clients.get( i ).resetTimeout() ;
/**
 * Updates the status of the client whose IP equals the given one and logs the
 * change. Does nothing when either argument is null.
 *
 * @param _ip     IP of the client whose status changes
 * @param _status new status string (values seen in this file: "connected",
 *                "running", "saving")
 */
183 public void changeStatus( String _ip, String _status )
185 if( _ip != null && _status != null )
187 for( int i = 0 ; i < clients.size() ; i++ )
189 if( _ip.equals( clients.get( i ).getIP() ) )
191 clients.get( i ).setStatus( _status ) ;
192 System.out.println( "Client " + clients.get( i ).getName() + " changed its status to: " + _status ) ;
/**
 * Initialises the server state: client/application lists, working directory,
 * save interval, save semaphore, VM IP pool, disconnection counter and the
 * monitor thread.
 *
 * NOTE(review): lines that use _port, set max_timeout or start the monitor are
 * not visible in this listing.
 *
 * @param _port port number supplied by the caller
 */
200 public void init( int _port )
205 clients = new ArrayList<ConnectedClient>() ;
206 computingClients = new ArrayList<ComputingClient>() ;
207 applications = new ArrayList<RunningApplication>() ;
// Directory where VM archives are looked up (see deployVM()).
210 working_directory = "/localhome/vmware" ;
// 30 minutes, expressed in milliseconds.
212 save_interleave = 30 * 60 * 1000 ;
// One permit: used by requestSave().
214 semaSave = new Semaphore( 1 ) ;
218 vmIPs = new ArrayList<IPAssociation>() ;
219 // TODO initialisation of VM IPs
// Pre-fills the pool with the VM IPs 10.11.10.2 .. 10.11.10.100.
220 for( int i = 2 ; i < 101 ; i++ )
222 vmIPs.add( new IPAssociation() ) ;
223 vmIPs.get( vmIPs.size() - 1 ).setVmIP( "10.11.10." + i ) ;
// NOTE(review): "clients" was already initialised above; this second assignment looks redundant.
226 clients = new ArrayList<ConnectedClient>() ;
228 counter = new DiscCount() ;
230 monitor = new ConnectedMonitor() ;
234 // Check if there are running applications ... and restart them :)
240 if( monitor != null ) { monitor.stopMonitor() ; }
242 for( int i = 0 ; i < clients.size() ; i++ )
245 clients.get( i ).getStub().emergencyStop() ;
246 clients.get( i ).getStub().stop() ;
247 } catch (RemoteException e) {
/**
 * Publishes this server in an RMI registry on "port" under the name "Server".
 *
 * Steps visible in this listing:
 *  1. Look for an existing registry on "port", list its entries and try to
 *     unexport each of them.
 *  2. Install a SecurityManager when none is set.
 *  3. Create a registry on "port", rebind "Server" to this object and look the
 *     binding up again through an rmi:// URL built from the local IP.
 *  4. Store the resulting stub in LocalHost and log the listening address.
 *
 * NOTE(review): "port" is not declared in the visible part of the file.
 */
258 private void exportObject()
260 ServicesServer ref = null ;
261 Registry reg = null ;
267 reg = LocateRegistry.getRegistry( port ) ;
// list() is the first call that actually contacts the registry.
269 String tab[] = reg.list() ;
271 System.out.println( "There is an existing RMI Registry on port " +
272 port + " with " + tab.length + " entries!" ) ;
273 for( int i = 0 ; i < tab.length ; i++ )
// NOTE(review): Naming.lookup() is given the bare entry name, without host or
// port - confirm it resolves against the registry on "port".
276 if( UnicastRemoteObject.unexportObject( Naming.lookup(tab[i]), true ) )
278 System.out.println( "Register successfuly deleted!" ) ;
280 System.err.println( "Register undeleted !!!" ) ;
282 } catch( Exception e ) {
283 e.printStackTrace() ;
287 } catch( RemoteException e ) {
291 if ( System.getSecurityManager() == null )
293 System.setSecurityManager( new SecurityManager() ) ;
296 LocateRegistry.createRegistry( port ) ;
297 LocateRegistry.getRegistry( port ).rebind( "Server", this ) ;
298 ref = (ServicesServer) Naming.lookup( "rmi://"
299 + LocalHost.Instance().getIP() + ":" + port
301 } catch ( Exception e ) {
302 System.err.println( "Error in Server.exportObject() when creating local services:" + e ) ;
303 System.err.println( "Exit from Server.exportObject" ) ;
// Keep the looked-up stub so other components can reach the server.
307 LocalHost.Instance().setServerStub( ref ) ;
309 System.out.println( "Server launched on IP " + LocalHost.Instance().getIP() +
310 " on port " + port + "." ) ;
313 /** Fault manager thread **/
/**
 * Thread started by the monitor for a disconnected client that was running or
 * saving a VM. It searches "clients" for one whose status is "connected",
 * installs it in place of the dead client in "computingClients", asks it to
 * retrieve the last save, moves the dead host's VM IP to it, updates the first
 * running application and informs the computing clients that their save
 * neighbor changed. At the end it signals the shared counter.
 *
 * NOTE(review): many structural lines (run() header, try/else, flag updates)
 * are not visible in this listing; only the visible lines are documented.
 */
314 private class FaultManager extends Thread
318 FaultManager( ConnectedClient _cl )
// Fix: the status test is now parenthesised. "a && b || c" parses as
// "(a && b) || c", so the "saving" test used to run even when cl was null.
326 if( cl != null && ( cl.getStatus().equalsIgnoreCase( "running" ) ||
327 cl.getStatus().equalsIgnoreCase( "saving" ) ) )
329 ComputingClient cc = cl.getComputingClient() ;
330 // ServicesClient dead = cc.getClient().getStub() ;
331 String ipDead = cc.getClient().getIP() ;
332 SaveNeighbor snDead = null ;
// Find the save-neighbor object that pointed at the dead host.
333 for( int i = 0 ; i < computingClients.size() ; i++ )
335 if( computingClients.get( i ).getSaveNeighbor().getIPHost().equalsIgnoreCase( ipDead ) )
337 snDead = computingClients.get( i ).getSaveNeighbor() ;
// Look for an idle client ("connected" status) to take over.
344 for( int i = 0 ; i < clients.size() ; i++ )
346 if( clients.get( i ).getStatus().equalsIgnoreCase( "connected" ) )
350 // res = clients.get( i ).getStub().startVM() ;
351 // } catch( RemoteException e ) {
352 // e.printStackTrace();
357 //clients.get(i).setStatus( "running" ) ;
359 int pos = computingClients.indexOf( cc ) ;
362 System.err.println( "Dead client not found in the computing clients list!" ) ;
364 System.out.println( "Trying to replace " + cc.getClient().getName() + " with " +
365 clients.get(i).getName() + " ... " ) ;
// The replacement inherits the dead client's last save and save neighbor.
367 String save_name = computingClients.get( pos ).getLastSave() ;
369 ComputingClient ccl = new ComputingClient( clients.get(i) ) ;
370 clients.get( i ).setComputingClient( ccl ) ;
371 SaveNeighbor sn = computingClients.get( pos ).getSaveNeighbor() ;
372 ccl.setSaveNeighbor( sn ) ;
373 computingClients.set( pos, ccl ) ;
378 res = computingClients.get( pos ).getClient().getStub().
379 retrieveSave( save_name ) ;
380 } catch( RemoteException e ) {
381 System.err.println( "Unable to indicate to client to retrieve last save!" ) ;
382 e.printStackTrace() ;
389 boolean ok_new = false, ok_old = false ;
391 // replace dead client in vmIPs
392 for( int j = 0 ; j < vmIPs.size() ; j++ )
// Release the VM IP previously associated with the replacing host.
394 if( vmIPs.get( j ).getHostIP().equalsIgnoreCase( computingClients.get( pos ).getClient().getIP() ) )
396 vmIPs.get( j ).setHostIP( "" ) ;
// Hand the dead host's VM IP over to the replacing host.
399 if( vmIPs.get( j ).getHostIP().equalsIgnoreCase( ipDead ) )
401 String vmIP = vmIPs.get( j ).getVmIP() ;
402 vmIPs.get( j ).setHostIP( computingClients.get( pos ).getClient().getIP() ) ;
406 computingClients.get( pos ).getClient().getStub().setIPVM( vmIP ) ;
407 } catch( RemoteException e ) {
408 System.err.println( "Unable to set the new VM IP on the replacing client!" ) ;
409 e.printStackTrace() ;
412 if( ok_new && ok_old )
419 // Replacing in RunningApplication
420 applications.get( 0 ).replaceComputingClient( cc, ccl ) ;
// Clear pending save requests of every computing client of the application.
422 for( int l = 0 ; l < applications.get(0).getComputingClients().size() ; l++ )
424 applications.get(0).getComputingClients().get( l ).setSaveRequest( false ) ;
428 System.out.println( "Successful redeployment of the VM." ) ;
430 System.err.println( "Unable to deploy the save on the new computing client!" ) ;
434 // System.err.println( "Problem while launching the VM on "
435 // + clients.get(i).getName() + "!" ) ;
// Tell every computing client that the dead save neighbor was replaced.
440 for( int k = 0 ; k < computingClients.size() ; k++ )
443 computingClients.get( k ).getClient().getStub().
444 replaceSaveNeighbor( snDead, new SaveNeighbor( clients.get( i ).getStub() ) ) ;
445 } catch( RemoteException e ) {
446 System.err.println( "Unable to inform " + computingClients.get( k ).getClient().getName() +
447 " of the replacement of a save neighbor!" ) ;
448 e.printStackTrace() ;
452 System.out.println( "Dead client successfully replaced." ) ;
456 System.err.println( "Dead client not replaced!!" ) ;
// Wake up the monitor thread waiting on the counter.
463 synchronized( counter ) {
465 counter.notifyAll() ;}
// Fix: the exception is now reported instead of being silently dropped,
// matching the printStackTrace() convention used in the rest of the class.
466 } catch( Exception e ) { e.printStackTrace() ; }
471 /** Monitoring thread **/
/**
 * Background thread that watches the connected clients.
 *
 * On each pass it walks "clients" and treats every client whose timeout is
 * above max_timeout as disconnected. When the lost client was "running" or
 * "saving", a FaultManager thread is started for it. If at least one such
 * client was lost, an emergency stop is sent to all running/saving clients,
 * the thread waits until the shared counter is back to zero, and a restart is
 * then requested on every computing client of the first application.
 *
 * NOTE(review): the run() header, the loop on the "run" flag, the removal of
 * disconnected clients and the counter inc/wait calls are not visible in this
 * listing.
 */
472 private class ConnectedMonitor extends Thread
// Asks the monitoring loop to stop by clearing its "run" flag.
481 protected void stopMonitor() { run = false ; }
491 Iterator<ConnectedClient> it = clients.iterator() ;
492 int nb_disconnections = 0 ;
493 int nb_disconnections_computing = 0 ;
495 while( it.hasNext() )
497 ConnectedClient cl = it.next() ;
// A client is considered gone once its timeout exceeds the threshold.
500 if( cl.getTimeout() > max_timeout )
502 System.out.println( "Disconnection of " + cl.getName() ) ;
503 if( cl.getStatus().equalsIgnoreCase( "running" ) || cl.getStatus().equalsIgnoreCase( "saving" ) )
505 System.out.println( "A VM was running on it!!" ) ;
506 System.out.println( "I will redeploy a save and restart all VM ..." ) ;
508 // for( int i = 0 ; i < computingClients.size() ; i++ )
510 // if( computingClients.get( i ).getClient().getIP().equals( cl.getIP() ) )
512 // computingClients.remove( i ) ;
// NOTE(review): the statements inside this synchronized block are not visible
// here; presumably the counter is incremented - confirm.
516 synchronized( counter ){
// One FaultManager thread per lost computing client.
520 new Server.FaultManager( cl ).start() ;
521 nb_disconnections_computing++ ;
523 System.out.println( "There was no VM running on it." ) ;
524 System.out.println( "Maybe it will come back later :)" ) ;
528 nb_disconnections++ ;
535 if( clients.size() == 0 )
537 System.out.println( "There is no client connected." ) ;
538 } else if( clients.size() == 1 ) {
539 System.out.println( "There is one client connected." ) ;
541 System.out.println( "There are " + clients.size() + " clients connected." ) ;
// A computing client was lost: stop every VM before redeploying.
546 if( nb_disconnections_computing > 0 )
548 System.out.println( "Sending emergency stop signal to all computing nodes ... " ) ;
550 for( int i = 0 ; i < clients.size() ; i++ )
552 if( clients.get( i ).getStatus().equalsIgnoreCase( "running" )
553 || clients.get( i ).getStatus().equalsIgnoreCase( "saving" ) )
556 clients.get( i ).getStub().emergencyStop() ;
557 } catch( RemoteException e ) {
558 System.err.println( "Unable to invoke emergency stop signal on " + clients.get( i ).getName() ) ;
559 e.printStackTrace() ;
564 System.out.println( "I will redeploy save and restart VMs ... " ) ;
// Block until the counter reports that no redeployment is pending.
566 synchronized( counter )
568 if( counter.getNb() > 0 )
570 System.out.println( "... waiting all redeployments done ..." ) ;
573 while( counter.getNb() != 0 )
577 } catch( InterruptedException e ) {
578 e.printStackTrace() ;
// Ask every computing client of the first application to restart its VM,
// each from its own thread.
583 for( int i = 0 ; i < applications.get(0).getComputingClients().size() ; i++ )
585 final ServicesClient sc = applications.get(0).getComputingClients().get( i ).getClient().getStub() ;
587 applications.get( 0 ).getComputingClients().get( i ).setRestartOk( false ) ;
589 new Thread( new Runnable() {
595 sc.restartVMAfterCrash() ;
596 } catch( RemoteException e ) {
597 e.printStackTrace() ;
// Pause for two seconds.
606 Thread.sleep( 2000 ) ;
607 } catch( InterruptedException e ) {
608 e.printStackTrace() ;
/**
 * Called by a computing client to report that its save completed.
 *
 * Records the save name and marks the save as done for the client whose IP
 * matches, then checks whether every computing client has reported. The
 * visible tail calls saveOk() on every computing client's stub, resets their
 * save status and stores the current time as the application's last save date.
 *
 * NOTE(review): the condition guarding that tail (presumably all_ok) and the
 * returned Integer are not visible in this listing.
 *
 * @param _ip       IP of the reporting client
 * @param _saveName name of the save it produced
 */
615 public Integer saveOk( String _ip, String _saveName )
618 for( i = 0 ; i < computingClients.size() ; i ++ )
620 if( computingClients.get( i ).getClient().getIP().equalsIgnoreCase( _ip ) )
622 computingClients.get( i ).setLastSave( _saveName ) ;
623 computingClients.get( i ).setSaveStatus( true ) ;
// Stays true only if every computing client has its save status set.
629 boolean all_ok = true ;
632 while( all_ok && i < computingClients.size() )
634 all_ok = all_ok & computingClients.get( i ).getSaveStatus() ;
// Acknowledge the save to every client and reset the status for the next round.
640 for( i = 0 ; i < computingClients.size() ; i++ )
643 computingClients.get( i ).getClient().getStub().saveOk() ;
644 } catch (RemoteException e) {
647 computingClients.get( i ).setSaveStatus( false ) ;
650 applications.get( 0 ).setLastSaveDate( System.currentTimeMillis() ) ;
/**
 * Overwrites the recorded last-save name of the computing client whose IP
 * matches the given one. Null or empty arguments are ignored.
 *
 * NOTE(review): the returned Integer is not visible in this listing.
 *
 * @param _ip       IP of the computing client
 * @param _saveName new name of its last save
 */
657 public Integer changeSaveName( String _ip, String _saveName )
659 if( _ip != null && _ip.length() > 0 && _saveName != null && _saveName.length() > 0 )
661 for( int i = 0 ; i < computingClients.size() ; i ++ )
663 if( computingClients.get( i ).getClient().getIP().equalsIgnoreCase( _ip ) )
665 computingClients.get( i ).setLastSave( _saveName ) ;
666 System.out.println( "Save name successfully change for " + _ip ) ;
/**
 * Starts an application on _nb idle clients.
 *
 * Walks "clients", starts a VM on those whose status is "connected" until _nb
 * stubs were collected, and registers each of them as a ComputingClient. When
 * enough clients were found, the application is marked as running, each
 * computing client gets the next one as save neighbor (the last one wraps
 * around to the first) and the application is added to "applications".
 *
 * NOTE(review): the test on the startVM() result, the filling of "tmp" and the
 * return statement are not visible in this listing.
 *
 * @param _nb number of computing clients requested
 * @return list of client stubs (exact content on failure not visible here)
 */
676 public ArrayList<ServicesClient> startApplication( int _nb )
// Number of clients that are not computing yet.
678 int nb = clients.size() - computingClients.size() ;
682 ArrayList<ServicesClient> ac = new ArrayList<ServicesClient>() ;
683 ArrayList<ComputingClient> tmp = new ArrayList<ComputingClient>() ;
685 RunningApplication app = new RunningApplication( "Test" ) ;
689 while( i < clients.size() && ac.size() < _nb )
// Only idle clients are candidates.
691 if( clients.get(i).getStatus().equalsIgnoreCase( "connected" ) )
695 res = clients.get( i ).getStub().startVM( 0 ) ;
696 } catch( RemoteException e ) {
702 ac.add( clients.get( i ).getStub() ) ;
703 clients.get( i ).setStatus( "running" ) ;
704 ComputingClient cl = new ComputingClient( clients.get( i ) ) ;
705 clients.get( i ).setComputingClient( cl ) ;
706 computingClients.add( cl ) ;
709 System.err.println( "Problem while launching the VM on "
710 + clients.get(i).getName() + "!" ) ;
// Enough clients were started: finalise the application.
717 if( ac.size() == _nb )
719 app.setComputingClients( tmp ) ;
720 app.setRunning( true ) ;
721 app.setStartTime( System.currentTimeMillis() ) ;
724 /* Choosing save neighbors */
725 for( i = 0 ; i < tmp.size() ; i++ )
// Last element: its neighbor is the first element, closing the ring.
727 if( i == tmp.size() - 1 )
729 index = computingClients.indexOf( tmp.get( i ) ) ;
730 index2 = computingClients.indexOf( tmp.get( 0 ) ) ;
732 if( index == -1 || index2 == -1 )
734 System.err.println( "Problem in ComputingClients list!" ) ;
737 computingClients.get( index ).setSaveNeighbor( new SaveNeighbor( computingClients.get( index2 ).getClient().getStub() )) ;
738 } catch( RemoteException e ) {
739 System.err.println( "Unable to create the save neighbor!" ) ;
740 e.printStackTrace() ;
// Any other element: its neighbor is the following element.
744 index = computingClients.indexOf( tmp.get( i ) ) ;
745 index2 = computingClients.indexOf( tmp.get( i + 1 ) ) ;
747 if( index == -1 || index2 == -1 )
749 System.err.println( "Problem in ComputingClients list!" ) ;
752 computingClients.get( index ).setSaveNeighbor( new SaveNeighbor( computingClients.get( index2 ).getClient().getStub() ) ) ;
753 } catch( RemoteException e ) {
754 System.err.println( "Unable to create the save neighbor!" ) ;
755 e.printStackTrace() ;
762 applications.add( app ) ;
/**
 * Entry point used by a client to ask for a global save. The request itself is
 * handled by treatRequestSave(), called from a newly created thread.
 *
 * NOTE(review): the acquire/release calls on semaSave are not visible in this
 * listing; only the failure message of the acquisition is.
 *
 * @param _ip IP of the requesting client
 */
772 public void requestSave( String _ip )
776 } catch( InterruptedException e ) {
777 System.err.println( "Unable to obtain the semaphore for semaSave!" ) ;
778 e.printStackTrace() ;
// Final copy so the anonymous Runnable can capture the value.
781 final String ip = _ip ;
783 new Thread( new Runnable() {
788 treatRequestSave( ip ) ;
/**
 * Handles a save request coming from one computing client of the first
 * application.
 *
 * When more than save_interleave milliseconds have passed since the last save,
 * the requester is marked and the save-request flags of all computing clients
 * are AND-ed together. The visible code then contains one loop sending
 * responseSave( true ) to every computing client (clearing its flag and
 * updating the last save date) and one loop sending responseSave( false ) to
 * the requester only.
 *
 * NOTE(review): the if/else lines selecting between those loops are not
 * visible in this listing.
 *
 * @param _ip IP of the requesting client
 */
794 public void treatRequestSave( String _ip )
796 if( applications.size() > 0 && _ip != null && _ip.length() > 0 )
// Only consider the request if the last save is old enough.
798 if( (System.currentTimeMillis() - applications.get( 0 ).getLastSaveDate()) > save_interleave )
800 // Mark it as a requester
801 for( int i = 0 ; i < applications.get( 0 ).getComputingClients().size() ; i++ )
803 if( applications.get( 0 ).getComputingClients().get( i ).getClient().getIP().equalsIgnoreCase( _ip ) )
805 applications.get( 0 ).getComputingClients().get( i ).setSaveRequest( true ) ;
// Combine the request flags of all computing clients into "ok".
815 for( int i = 0 ; i < applications.get( 0 ).getComputingClients().size() ; i++ )
819 ok = applications.get( 0 ).getComputingClients().get( i ).getSaveRequest() ;
821 ok = ok & applications.get( 0 ).getComputingClients().get( i ).getSaveRequest() ;
833 // Thread.sleep( 5000 ) ;
834 // } catch( InterruptedException e1 ) {
835 // e1.printStackTrace() ;
// Positive answer: sent to every computing client, whose flag is then cleared.
838 for( int i = 0 ; i < applications.get( 0 ).getComputingClients().size() ; i++ )
841 applications.get( 0 ).getComputingClients().get( i ).getClient().getStub().responseSave( true ) ;
842 applications.get( 0 ).getComputingClients().get( i ).setSaveRequest( false ) ;
843 } catch( RemoteException e ) {
844 System.err.println( "Unable to send the save request response to " +
845 applications.get( 0 ).getComputingClients().get( i ).getClient().getName() + "!" ) ;
846 e.printStackTrace() ;
850 applications.get( 0 ).setLastSaveDate( System.currentTimeMillis() ) ;
// Negative answer: sent only to the client that made the request.
856 for( int i = 0 ; i < applications.get( 0 ).getComputingClients().size() ; i++ )
858 if( applications.get( 0 ).getComputingClients().get( i ).getClient().getIP().equalsIgnoreCase( _ip ) )
861 applications.get( 0 ).getComputingClients().get( i ).getClient().getStub().responseSave( false ) ;
862 } catch( RemoteException e ) {
863 System.err.println( "Unable to send the save request response to " +
864 applications.get( 0 ).getComputingClients().get( i ).getClient().getName() + "!" ) ;
865 e.printStackTrace() ;
873 System.err.println( "!! Serious problem in treatRequestSave method!!" ) ;
/**
 * Called by a computing client once it has finished restarting its VM.
 *
 * Marks the client of the first application whose IP matches as restarted and
 * AND-s the restart flags of all computing clients together. The visible tail
 * updates the application's last save date and clears every restart flag.
 *
 * NOTE(review): the condition guarding that tail (presumably "ok") is not
 * visible in this listing. The whole treatment is also gated on the
 * save_interleave delay, like treatRequestSave() - confirm this is intended
 * for a restart acknowledgement.
 *
 * @param _ip IP of the client that restarted
 */
879 public void restartOk( String _ip )
881 if( applications.size() > 0 && _ip != null && _ip.length() > 0 )
883 System.out.println( "Client " + _ip + " has finished to restart ("+applications.get(0).getComputingClients().size()+") ... " ) ;
884 if( (System.currentTimeMillis() - applications.get( 0 ).getLastSaveDate()) > save_interleave )
886 // Has it already finished?
887 for( int i = 0 ; i < applications.get( 0 ).getComputingClients().size() ; i++ )
889 if( applications.get( 0 ).getComputingClients().get( i ).getClient().getIP().equalsIgnoreCase( _ip ) )
891 applications.get( 0 ).getComputingClients().get( i ).setRestartOk( true ) ;
// Combine the restart flags of all computing clients into "ok".
899 for( int i = 0 ; i < applications.get( 0 ).getComputingClients().size() ; i++ )
903 ok = applications.get( 0 ).getComputingClients().get( i ).getRestartOk() ;
905 ok = ok & applications.get( 0 ).getComputingClients().get( i ).getRestartOk() ;
916 applications.get( 0 ).setLastSaveDate( System.currentTimeMillis() ) ;
// Reset the flags for the next restart round.
918 for( int i = 0 ; i < applications.get( 0 ).getComputingClients().size() ; i++ )
920 applications.get( 0 ).getComputingClients().get( i ).setRestartOk( false ) ;
/**
 * Ends the first application: sends an emergency stop to every computing
 * client, puts each of them back in the "connected" status without computing
 * client, then records the end time, marks the application as not running and
 * clears it.
 *
 * NOTE(review): whether entries are removed from "computingClients" during
 * the iteration is not visible in this listing.
 */
930 public void endApplication()
932 Iterator<ComputingClient> it = computingClients.iterator() ;
934 while( it.hasNext() )
936 ComputingClient cl = it.next() ;
939 cl.getClient().getStub().emergencyStop() ;
940 } catch (RemoteException e) {
// The client becomes available again for a later application.
944 cl.getClient().setStatus( "connected" ) ;
945 cl.getClient().setComputingClient( null ) ;
950 applications.get( 0 ).setEndTime( System.currentTimeMillis() ) ;
951 applications.get( 0 ).setRunning( false ) ;
952 applications.get( 0 ).clear() ;
/**
 * Looks up the VM IP associated with the given host IP in the VM IP pool.
 *
 * NOTE(review): the initial value of "ret" (returned when no entry matches)
 * and the return statement are not visible in this listing.
 *
 * @param _ip host IP to look for (compared case-insensitively)
 * @return the VM IP of the matching association
 */
959 public String getAssociatedIP( String _ip )
963 for( int i = 0 ; i < vmIPs.size() ; i++ )
965 if( vmIPs.get( i ).getHostIP().equalsIgnoreCase( _ip ) )
967 ret = vmIPs.get( i ).getVmIP() ;
/**
 * Deploys a VM archive on every client whose status is "connected".
 *
 * The archive must exist as a regular file in the server working directory.
 * Each idle client is first asked to deploy the VM; the visible code then
 * copies the archive to the client with /usr/bin/scp and asks it to deploy
 * again. The number of machines that could not be deployed is reported.
 *
 * NOTE(review): the test on the first deployVM() result, the scp destination,
 * the wait on the process and the return statement are not visible in this
 * listing.
 *
 * @param _name      name of the VM
 * @param _archive   file name of the archive inside the working directory
 * @param _directory directory argument forwarded to the clients
 */
976 public Integer deployVM( String _name, String _archive, String _directory )
// Fix: the second test of the _archive guard checked _name.length() again, so
// an empty archive name was accepted; it now checks _archive.length().
981 if( _name != null && _name.length() > 0 && _archive != null && _archive.length() > 0
982 && _directory != null && _directory.length() > 0 )
984 System.out.println( "Deploying the VM " + _name + " (" + _archive + ") ... " ) ;
// The archive has to be a regular file in the working directory.
986 File file = new File( working_directory + "/" + _archive ) ;
987 if( ! file.exists() )
989 System.err.println( "There is no archive named " + _archive + " in my working directory!" ) ;
992 } else if( file.isDirectory() ) {
993 System.err.println( _archive + " is a directory!" ) ;
1000 // TODO do a better deployment !!
1006 for( int i = 0 ; i < clients.size() ; i++ )
// Only idle clients receive the VM.
1010 if( clients.get( i ).getStatus().equalsIgnoreCase( "connected" ) )
1013 ret = clients.get( i ).getStub().deployVM( _name, _archive, _directory ) ;
1014 } catch( RemoteException e ) {
1015 System.err.println( "Unable to deploy the VM on " + clients.get( i ).getName() + "!" ) ;
1016 e.printStackTrace() ;
1019 // The client does not have the archive, we have to send it.
1022 System.out.print( "Sending VM archive to " + clients.get( i ).getName() + " ... " ) ;
// The copy needs the client's working directory and host IP.
1029 wd = clients.get( i ).getStub().getWorkingDirectory() ;
1030 snIP = clients.get( i ).getStub().getIPHost() ;
1031 } catch (RemoteException e2) {
1032 System.err.println( "Unable to retrieve information on " + clients.get( i ).getName() + "!" ) ;
1033 e2.printStackTrace() ;
// Argument-array form of exec(): no shell is involved in building the command.
1038 String[] command = new String[]{ "/usr/bin/scp", working_directory + "/" + _archive,
1044 Process proc = Runtime.getRuntime().exec( command ) ;
1047 if( proc.exitValue() == 0 )
1049 System.out.println( "Initial VM archive successfully sent." ) ;
1051 System.err.println( "Initial VM archive not sent!" ) ;
1052 System.err.println( "Error: " + proc.exitValue() ) ;
// Relay the error output of scp to help diagnose the failure.
1053 BufferedReader b = new BufferedReader( new InputStreamReader( proc.getErrorStream() ) ) ;
1057 while( (l = b.readLine()) != null )
1059 System.err.println( l ) ;
1061 } catch( IOException e ) {
1062 e.printStackTrace() ;
1068 } catch( IOException e ) {
1069 System.err.println( "Error during initial VM archive send command: " ) ;
1070 e.printStackTrace() ;
1073 } catch( InterruptedException e ) {
1074 e.printStackTrace() ;
// Second attempt, now that the archive has been copied.
1089 ret = clients.get( i ).getStub().deployVM( _name, _archive, _directory ) ;
1090 } catch( RemoteException e ) {
1091 System.err.println( "Unable to deploy the VM on " + clients.get( i ).getName() + "!" ) ;
1092 e.printStackTrace() ;
1099 System.out.println( "Initial VM archive successfully deployed on " + clients.get( i ).getName() + "." ) ;
// Summary of the machines on which the deployment failed.
1109 System.err.println( "** " + pb + " machine is not deployed!" ) ;
1111 System.err.println( "** " + pb + " machines are not deployed!" ) ;
/**
 * Returns the server working directory (the directory in which deployVM()
 * looks for VM archives).
 *
 * @return the current value of working_directory
 */
1118 public String getWorkingDirectory()
1120 return working_directory ;
1125 /** Programming is an art, let us respect those who practice it !! **/