]> AND Private Git Repository - equilibrage.git/blob - simulation/equil6.cpp
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Ajout notes
[equilibrage.git] / simulation / equil6.cpp
1 #include <iostream>
2 #include <list>
3 #include <vector>
4 #include <string>
5 #include <fstream>
6 #include <sstream>
7 #include <stdlib.h>
8 #include <sys/time.h>
9 #include <cmath>
10 #include "msg/msg.h" 
11 #include "xbt/sysdep.h" 
12 #include "xbt/log.h"
13 #include "xbt/asserts.h"
14
15 using namespace std;
16
17 typedef enum {Unique,Unite,AM,Local,Local2} Type_T; // types de politiques de transfert de charge
18
19 double ALPHA=0.125;           // poids de la dernière mesure d'intervalle d'attente d'équilibrage
20 double BETA=0.125;            // poids de la dernière mesure d'intervalle d'attente d'envois/réceptions
21 int SEUIL_ENVOI=1;            // nombre d'intervalles d'attente avant envoi d'infos
22 double INTER=0.0;             // intervalle de temps pour les tests d'équilibrages
23 double SLEEP=0.0;             // temps de mise en attente des processus
24 double octetsTache=24.0;      // en octets (3 doubles)
25 double flopsTache=500.0;      // en flops (500 opérations)
26 int minNBT=1000,maxNBT=10000; // intervalle du nombre de tâches sur chaque proc
27 int nbTerms=0;                // nombre de procs ayant terminé leurs calculs
28 int nbTI=0,nbTT=0;            // nombres de tâches initiales et traitées
29 double tGDMin=1e100,tGFMax=0; // temps globaux de début et fin
30 int numUnique=-1;             // numéro du processeur reçevant la charge en cas de charge unique
31 vector<int> nbTaches;         // tableau des nombres de tâches par proc
32 double somVit=0;              // somme des vitesses des procs du système
33 double maxTC;                 // temps de calcul max sur répartition initiale
34 double tempsOpt;              // temps optimal de calcul des tâches sur le système
35 int **nbIters=NULL,nbIt;      // tableau des nombres d'itérations nécessaires pour la CV (simulée)
36                               // et nb total itérations
37 int minIt=1,maxIt=1;          // bornes de l'intervalle du nombre d'itérations par tâche 
38 bool mutexTE,mutexTNE,mutexTR; // mutex pour la manipulation des listes de tâches
39 vector<int> iterCumulees;     // itérations locales cumulées par proc
40 unsigned int variante=0;      // indique les variantes éventuelles de l'algo d'équilibrage
41 int nbArretEnvs=0;            // nombre de procs ayant arrêté leur processus d'envois des infos/equs
42 int nbArretRecs=0;            // nombre de procs ayant arrêté leur processus de réception des infos/equs
43 Type_T polit=AM;              /* type de politique de transfert de charge :
44                                  Unique : transfert vers le moins chargé des voisins
45                                  Unite  : transfert d'une unité de charge à chaque voisin moins chargé
46                                  AM     : transfert selon règles données dans thèse AM
47                                  Local  : répartition équilibrée sur voisins ayant moins de charge que
48                                           la moyenne de la source et des destinataires
49                                */
50 vector<int> valsAleat;        // liste explicite de valeurs aléatoires (cross-plateforme)
51
52 typedef enum {BASE=0,LL=1,CU=2,IL=4,IV=8,AB=16} Variante; 
53 /*
54   BASE = algo initial avec limite stricte de la charge locale restante sur l'émetteur
55   LL   = léger dépassement de la limite de charge locale autorisé
56   CU   = charge initiale unique sur un proc
57   IL   = itérations liées (identiques entre les tâches)
58   IV   = itérations réalisées sur tout le vecteur local
59   AB   = processus d'apprentissage AdaBoost
60 */                                                       
61
62 typedef vector<m_task_t> LTaches;
63 typedef vector<m_host_t> LHotes;
64 typedef struct {
65   double temps;
66   int nbT;
67 } Charge;
68 typedef struct {
69   Charge charge;
70   bool fini;
71   bool *connectes;
72   LHotes *voisins;
73   Charge *voCharges;
74   int icl;
75   double inter,*sleep;
76 } Infos;
77 typedef struct {
78   int pid;
79   Charge charge;
80   string nom;
81 } Mess;
82 typedef struct {
83   int nbT;
84   m_task_t *tab;
85 } MessEqu;
86 typedef struct {
87   string src;
88   int *dest,*icl;
89   LTaches *taches;
90   LTaches *recup;
91   LHotes *voisins;
92   bool fini;
93   double *sleep;
94 } Equ;
95
96 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,"Messages specific to equil");
97
98 int envInfos(int argc, char *argv[]);
99 int recInfos(int argc, char *argv[]);
100 int envEqu(int argc, char *argv[]);
101 int recEqu(int argc, char *argv[]);
102 int calculs(int argc, char *argv[]);
103 double ordoOpt(m_host_t *listeHotes);
104 bool lireListe(char *nom, vector<int> &valsAleat);
105 int myRand();
106
107 MSG_error_t test_all(const char *platform_file, const char *application_file);
108
109 typedef enum {PORT_DON,PORT_EQU,MAX_CHANNEL} channel_t;
110
111 // Fonction d'envoi des messages
112 int envInfos(int argc, char *argv[])
113 {
114   int pid;
115   m_task_t *enve = NULL;
116   m_process_t moi;
117   int i,j;
118   MSG_error_t err;
119   string nomHote;
120   Infos *denv=NULL;
121   stringstream sstr;
122   bool fini=false;
123   Charge chargePrec={-1.0,-1};
124   Mess *mess;
125   int compteur;
126
127   // Récupération de son processus
128   moi=MSG_process_self();
129
130   // Récupération nom machine
131   nomHote=string(MSG_host_get_name(MSG_host_self()));
132
133   // Récupération données
134   denv=(Infos *)MSG_process_get_data(moi);
135   denv->charge.temps=denv->icl*flopsTache/MSG_get_host_speed(MSG_host_self());
136
137   // Création tableau des messages d'envois
138   enve=new m_task_t[denv->voisins->size()];
139
140   // Affichage PID
141   pid=MSG_process_self_PID();
142   sstr.clear();
143   sstr << "Processus d'envoi des infos sur " << nomHote.c_str();
144   INFO1("\t* %s",sstr.str().c_str());
145   mess=new Mess;
146   mess->pid=pid;
147   mess->nom=nomHote;
148   compteur=0;
149
150   while(!fini){
151     if(compteur==SEUIL_ENVOI || chargePrec.temps!=denv->charge.temps || chargePrec.nbT!=denv->charge.nbT){
152       chargePrec=denv->charge;
153       // Envois bloquants des infos de charge
154       sstr.clear();
155       sstr.str("");
156       sstr << "Charge de " << mess->nom << " (" << pid << ")";
157       mess->charge=denv->charge;
158       for(i=0;i<denv->voisins->size();++i){
159         enve[i]=MSG_task_create(sstr.str().c_str(),0.0,(double)sizeof(Mess),(void *)mess);
160         err=MSG_task_put(enve[i],(*(denv->voisins))[i],PORT_DON);
161         if(err!=MSG_OK){
162           INFO2("\t* Pb à l'envoi des infos depuis %s vers %s",nomHote.c_str(),MSG_host_get_name((*(denv->voisins))[i]));
163           /*
164             switch(err){
165             case MSG_FATAL:
166             INFO1("\t* message mal initialisé sur %s",nomHote.c_str());
167             break;
168             case MSG_HOST_FAILURE:
169             INFO1("\t* hôte destination non joignable depuis %s",nomHote.c_str());
170             break;
171             case MSG_TRANSFER_FAILURE:
172             INFO1("\t* Erreur de transmission depuis %s",nomHote.c_str());
173             break;
174             }
175           */
176           MSG_task_destroy(enve[i]);
177         }else{
178           INFO1("\t* Envoi OK des infos depuis %s",nomHote.c_str());
179         }
180       }
181       sstr << " " << chargePrec.temps << " (" << chargePrec.nbT << ")";
182       INFO1("\t* %s",sstr.str().c_str());
183       compteur=0;
184     }else{
185       compteur++;
186     }
187     MSG_process_sleep(*(denv->sleep));
188     fini=denv->fini;
189   }
190   nbArretEnvs++;
191   INFO1("\t* Arrêt processus d'envois des infos sur %s",nomHote.c_str());
192
193   // Libérations mémoire
194   delete mess;
195   delete[] enve;
196 }
197
198 // Fonction de réception des messages
199 int recInfos(int argc, char *argv[])
200 {
201   Infos *drec;
202   m_process_t moi;
203   m_task_t recu=NULL;
204   string nomHote;
205   bool fini=false;
206   Mess *mess=NULL;
207   MSG_error_t err;
208   int i,pid;
209   stringstream sstr;
210   double tDeb,tFin;
211
212   // Récupération de son processus
213   moi=MSG_process_self();
214   nomHote=string(MSG_host_get_name(MSG_host_self()));
215
216   // Récupération données
217   drec=(Infos *)MSG_process_get_data(moi);
218
219   // Affichage infos
220   sstr.clear();
221   sstr << "Processus de réception des infos sur " << nomHote.c_str();
222   INFO1("\t\t# %s",sstr.str().c_str());
223
224   // Boucle de réception
225   tDeb=MSG_get_clock();
226   while(!fini){
227     pid=MSG_task_probe_from(PORT_DON);
228     if(pid>=0){
229       err=MSG_task_get(&recu,PORT_DON);
230       if(err!=MSG_OK){
231         INFO1("\t\t# Pb à la réception des infos sur %s",nomHote.c_str());
232       }else{
233         mess=(Mess *)MSG_task_get_data(recu);
234         drec->charge=mess->charge;
235         for(i=0;i<drec->voisins->size() && mess->nom!=MSG_host_get_name((*(drec->voisins))[i]);++i);
236         if(i<drec->voisins->size()){
237           drec->voCharges[i]=mess->charge;
238           drec->connectes[i]=true;
239         }
240         sstr.clear();
241         sstr.str("");
242         sstr << nomHote.c_str() << " a reçu l'info de charge " << drec->charge.temps << " (" << drec->charge.nbT << ") de " << mess->nom;
243         INFO1("\t\t# %s",sstr.str().c_str());
244         MSG_task_destroy(recu);
245         recu=NULL;
246       }
247     }else{
248       MSG_process_sleep(*(drec->sleep));
249     }
250     fini=drec->fini;
251     tFin=MSG_get_clock();
252     if(!fini && fabs(tFin-tDeb)>=drec->inter){
253       // réinitialisation de l'intervalle de temps
254       for(i=0;i<drec->voisins->size();++i){
255         drec->connectes[i]=false;
256       }
257       tDeb=tFin;
258     }
259   }
260   nbArretRecs++;
261   INFO1("\t\t# Arrêt processus de réceptions des infos sur %s",nomHote.c_str());
262 }
263
264 // Fonction d'envoi des données d'équilibrage
265 int envEqu(int argc, char *argv[])
266 {
267   string nomHote;
268   Equ *eenv=NULL;
269   bool fini=false;
270   MSG_error_t err=MSG_OK;
271   m_task_t tache=NULL;
272   int i,j;
273   MessEqu *mess=NULL;
274
275   // Récupération nom machine
276   nomHote=string(MSG_host_get_name(MSG_host_self()));
277   
278   // Récupération données
279   eenv=(Equ *)MSG_process_get_data(MSG_process_self());
280
281   // Allocation du tableau des messages
282   mess=new MessEqu[(*(eenv->voisins)).size()];
283
284   // Affichage infos
285   INFO1("\t\t\t§ Processus d'envoi des équilibrages sur %s",nomHote.c_str());
286
287   // Boucle des envois
288   while(!fini){
289     for(i=0;i<(eenv->voisins)->size();++i){
290       if(eenv->dest[i]>0){
291         mess[i].nbT=eenv->dest[i];
292         mess[i].tab=new m_task_t[mess[i].nbT];
293         mutexTE=true;
294         for(j=0;j<mess[i].nbT;++j){
295           mess[i].tab[j]=(eenv->taches)->back();
296           *(eenv->icl)-=*((int *)MSG_task_get_data((eenv->taches)->back()));
297           (eenv->taches)->pop_back();
298         }
299         mutexTE=false;
300         tache=MSG_task_create("lot tâches",0.0,mess[i].nbT*octetsTache,(void *)(mess+i));
301         err=MSG_task_put(tache,(*(eenv->voisins))[i],PORT_EQU);
302         if(err!=MSG_OK){
303           INFO2("\t\t\t§ Pb à l'envoi d'équilibrage depuis %s vers %s",nomHote.c_str(),MSG_host_get_name((*(eenv->voisins))[i]));
304           mutexTNE=true;
305           for(j=0;j<mess[i].nbT;++j){
306             (eenv->recup)->push_back(mess[i].tab[j]);
307             *(eenv->icl)+=*((int *)MSG_task_get_data((eenv->recup)->back()));
308           }
309           mutexTNE=false;
310           delete[] mess[i].tab;
311         }else{
312           INFO3("\t\t\t§ Envoi OK de %d tâche(s) depuis %s vers %s",mess[i].nbT,nomHote.c_str(),MSG_host_get_name((*(eenv->voisins))[i]));
313           // (*(eenv->taches)).erase((*(eenv->taches)).end()-mess[i].nbT,(*(eenv->taches)).end());
314         }
315         eenv->dest[i]=0;
316       }
317     }
318     MSG_process_sleep(*(eenv->sleep));
319     fini=eenv->fini;
320   }
321   nbArretEnvs++;
322   INFO1("\t\t\t§ Arrêt processus d'envoi des équilibrages sur %s",nomHote.c_str());
323
324   // Libérations mémoire
325   delete[] mess;
326 }
327
328 // Fonction de réception des données d'équilibrage
329 int recEqu(int argc, char *argv[])
330 {
331   string nomHote,lt;
332   Equ *erec=NULL;
333   bool fini=false;
334   MSG_error_t err;
335   m_task_t tache=NULL;
336   m_host_t hsrc;
337   int i,pid;
338   MessEqu *mess;
339
340   // Récupération nom machine
341   nomHote=string(MSG_host_get_name(MSG_host_self()));
342   
343   // Récupération données
344   erec=(Equ *)MSG_process_get_data(MSG_process_self());
345
346   // Affichage infos
347   INFO1("\t\t\t\t@ Processus de réception des équilibrages sur %s",nomHote.c_str());
348
349   // Boucle de réception
350   while(!fini){
351     pid=MSG_task_probe_from(PORT_EQU);
352     if(pid>=0){
353       err=MSG_task_get(&tache,PORT_EQU);
354       if(err!=MSG_OK){
355         INFO1("\t\t\t\t@ Pb à la réception d'un équilibrage sur %s",nomHote.c_str());
356       }else{
357         lt.clear();
358         mess=(MessEqu *)MSG_task_get_data(tache);
359         mutexTR=true;
360         lt=string(MSG_task_get_name(mess->tab[0]))+" oo "+string(MSG_task_get_name(mess->tab[mess->nbT-1]));
361         for(i=0;i<mess->nbT;++i){
362           (erec->taches)->push_back(mess->tab[i]);
363           *(erec->icl)+=*((int *)MSG_task_get_data((erec->taches)->back()));
364           // lt+=string(MSG_task_get_name(mess->tab[i]))+" ";
365         }
366         mutexTR=false;
367         delete[] mess->tab;
368         hsrc=MSG_task_get_source(tache);
369         for(i=0;i<erec->voisins->size() && hsrc!=(*(erec->voisins))[i];++i);
370         if(i<erec->voisins->size()){
371           erec->src=MSG_host_get_name((*(erec->voisins))[i]);
372         }
373         INFO3("\t\t\t\t@ %s a reçu %d tâche(s) de %s",nomHote.c_str(),mess->nbT,erec->src.c_str());
374         INFO2("\t\t\t\t@ %s %s",nomHote.c_str(),lt.c_str());
375         tache=NULL;
376       }
377     }
378     MSG_process_sleep(*(erec->sleep));
379     fini=erec->fini;
380   }
381   nbArretRecs++;
382   INFO1("\t\t\t\t@ Arrêt processus de réceptions des équilibrage sur %s",nomHote.c_str());
383 }
384
385 // Fonction d'équilibrage
386 int calculs(int argc, char *argv[]) // Les paramètres sont : 0 = processus, 1,... = voisins
387 {
388   m_host_t monHote=NULL,*listeHotes=NULL; 
389   m_task_t mess=NULL,tvect=NULL;
390   LTaches listeT,listeTE,listeTR,listeTNE;
391   LHotes voisins;
392   int i,j;
393   MSG_error_t err;
394   int num;
395   string nomHote,nomTache,tmp;
396   stringstream sstr;
397   double tDeb,tFin;
398   m_process_t envi,reci,enve,rece;
399   Infos *denv,*drec;
400   Equ *eenv,*erec;
401   bool fini=false,fam=false,limite;
402   int indLim;
403   int indVM;
404   double diffMax;
405   int dtmp;
406   int nbTachesTraitees=0,nbCon;
407   double tCycD,tCycF,intervalle,*_sleep;
408   double tGlobalDeb, tGlobalFin,tempsCalc;
409   vector<Charge> diffs; 
410   vector<int> indTries,tmpDest;
411   int tacheCrte,sommeTE;
412
413   // Récupération du nom de la machine courante
414   monHote = MSG_host_self();
415   nomHote=string(MSG_host_get_name(monHote));
416   // Récupération de la liste des machines
417   listeHotes=MSG_get_host_table();
418   // Récupération de la position de la machine courante dans la liste
419   for(num=0;num<MSG_get_host_number() && nomHote!=MSG_host_get_name(listeHotes[num]);++num);
420   num++;
421   // Récupération des voisins de la machine courante
422   for(i=0;i<MSG_get_host_number();++i){
423     tmp=string(MSG_host_get_name(listeHotes[i]));
424     for(j=1;j<argc && tmp!=argv[j];++j);
425     if(j<argc){
426       voisins.push_back(listeHotes[i]);
427     }
428   }
429   num--; // numérotation des procs à partir de 0
430   diffs.resize(voisins.size());
431
432   // Affichage infos
433   if(monHote==NULL){
434     INFO0("Unknown host... Stopping Now! ");
435     abort();
436   }else{
437     INFO3("Je suis %s et j'ai %d éléments / %d",nomHote.c_str(),nbTaches[num],nbTI);
438   }
439   sstr.str("");
440   for(i=0;i<voisins.size();++i){
441     sstr << MSG_host_get_name(voisins[i]) << " ";
442   }
443   INFO1("Mes voisins sont : %s",sstr.str().c_str());
444
445   // Allocation des infos de charge
446   _sleep=new double;
447   *_sleep=INTER;
448   denv=new Infos;
449   denv->fini=false;
450   denv->charge.temps=0;
451   denv->charge.nbT=nbTaches[num];
452   denv->voisins=&voisins;
453   denv->voCharges=NULL;
454   denv->icl=iterCumulees[num];
455   denv->inter=INTER;
456   denv->sleep=_sleep;
457   drec=new Infos;
458   drec->fini=false;
459   drec->connectes=new bool[voisins.size()];
460   for(i=0;i<voisins.size();++i){
461     drec->connectes[i]=false;
462   }
463   drec->voisins=&voisins;
464   drec->voCharges=new Charge[voisins.size()];
465   for(i=0;i<voisins.size();++i){
466     drec->voCharges[i].temps=1e100;
467     drec->voCharges[i].nbT=1<<30;
468   }
469   drec->inter=INTER;
470   drec->sleep=_sleep;
471
472   // Crée processus d'envois/réceptions des infos
473   envi=MSG_process_create("envInfos",&envInfos,(void *)denv,monHote);
474   reci=MSG_process_create("recInfos",&recInfos,(void *)drec,monHote);
475
476   // Allocation des messages d'équilibrage
477   eenv=new Equ;
478   eenv->src=nomHote;
479   eenv->dest=new int[voisins.size()];
480   for(i=0;i<voisins.size();++i){
481     eenv->dest[i]=0;
482   }
483   eenv->icl=&iterCumulees[num];
484   eenv->taches=&listeTE;
485   eenv->recup=&listeTNE;
486   eenv->voisins=&voisins;
487   eenv->fini=false;
488   eenv->sleep=_sleep;
489   erec=new Equ;
490   erec->icl=&iterCumulees[num];
491   erec->taches=&listeTR;
492   erec->voisins=&voisins;
493   erec->fini=false;
494   erec->sleep=_sleep;
495
496   mutexTE=mutexTNE=mutexTR=false;
497   tmpDest.resize(voisins.size(),0);
498
499   // Crée processus d'envois/réceptions des équilibrages
500   enve=MSG_process_create("envEqu",&envEqu,(void *)eenv,monHote);
501   rece=MSG_process_create("recEqu",&recEqu,(void *)erec,monHote);
502
503   // Création des tâches
504   nbCon=0;
505   for(i=0;i<num;++i){
506     nbCon+=nbTaches[i];
507   }
508   for(i=0;i<nbTaches[num];++i){
509     sstr.clear();
510     sstr.str("");
511     sstr << "T_" << i << "_" << num+1;
512     sstr >> nomTache;
513     listeT.push_back(MSG_task_create(nomTache.c_str(),flopsTache,octetsTache,(void *) nbIters[nbCon+i]));
514   }
515
516   // Exécution des itérations
517   tGlobalDeb=tCycD=MSG_get_clock();
518   tacheCrte=0;
519   while(!fini){
520
521     // Récupération des tâches non correctement envoyées précédemment
522     while(!mutexTNE && listeTNE.size()>0){
523       listeT.push_back(listeTNE.back());
524       listeTNE.pop_back();
525     }
526
527     // Récupération des tâches reçues d'un équilibrage
528     while(!mutexTR && listeTR.size()>0){
529       listeT.push_back(listeTR.back());
530       listeTR.pop_back();
531     }
532     
533     // Calculs des tâches  
534     if(listeT.size()>0){
535       // Mise à jour de l'état de terminaison globale si nécessaire
536       if(fam){
537         fam=false;
538         nbTerms--;
539         INFO1("%s reprends des calculs...",nomHote.c_str());
540       }
541
542       // Calcul seulement si tâche courante valide
543       if(tacheCrte>=listeT.size()){
544         tacheCrte=0;
545       }
546
547       // Affichage
548       if(!(variante & IV)){
549         INFO2("Nb tâches : %d\tTâche courante : %d",listeT.size(),tacheCrte);
550 #if 0
551         nomTache.clear();
552         for(i=0;i<listeT.size();++i){
553           nomTache+=string(MSG_task_get_name(listeT[i]))+" ";
554         }
555         INFO2("!! %s %s",nomHote.c_str(),nomTache.c_str());
556 #endif
557         INFO3("Exécution tâche %s (%d) sur proc %s",MSG_task_get_name(listeT[tacheCrte]),(*((int *) MSG_task_get_data(listeT[tacheCrte]))),nomHote.c_str()); 
558       }else{
559         INFO2("Exécution des %d tâches locales sur proc %s",listeT.size(),nomHote.c_str()); 
560         /*
561           nomTache.clear();
562           for(i=0;i<listeT.size();++i){
563           nomTache+=string(MSG_task_get_name(listeT[i]))+" (";
564           sstr.clear();
565           sstr.str("");
566           sstr << *((int *) MSG_task_get_data(listeT[i]));
567           nomTache+=sstr.str()+") ";
568           }
569           INFO2("!! %s %s",nomHote.c_str(),nomTache.c_str());
570         */
571       }
572
573       // Calculs 
574       tDeb=MSG_get_clock();
575       if(variante & IV){
576         // Exécution des tâches du vecteur
577         tvect=MSG_task_create("Itération vecteur",flopsTache*listeT.size(),octetsTache*listeT.size(),NULL);
578         if(MSG_task_execute(tvect)!=MSG_OK){
579           INFO0("Pb d'exécution de l'itération sur le vecteur");
580         }
581         MSG_task_destroy(tvect);
582         tvect=NULL;
583
584         tFin=MSG_get_clock();
585         nbTachesTraitees+=listeT.size();
586         // Calcul charge restante et envoi
587         iterCumulees[num]-=listeT.size(); // Suppression des itérations que l'on vient de faire
588         denv->charge.temps=(tFin-tDeb)*iterCumulees[num];
589           
590         // Décompte et suppression éventuelle de la tâche effectuée
591         for(tacheCrte=0;tacheCrte<listeT.size();){
592           (*((int *) MSG_task_get_data(listeT[tacheCrte])))--;
593           if(*((int *) MSG_task_get_data(listeT[tacheCrte]))==0){
594             delete ((int *) MSG_task_get_data(listeT[tacheCrte]));
595             MSG_task_destroy(listeT[tacheCrte]); // Vérifier si la destruction désalloue les data !!!
596             listeT.erase(listeT.begin()+tacheCrte);
597             INFO1("CONVERGENCE DE TACHE %d",tacheCrte);
598           }else{
599             tacheCrte++;
600           }
601         }
602
603         // Mise à jour nombre de tâches locales
604         denv->charge.nbT=listeT.size();
605  
606       }else{
607
608         // Exécution tâche courante
609         if(MSG_task_execute(listeT[tacheCrte])!=MSG_OK){
610           INFO1("Pb d'exécution de la tâche %d",tacheCrte);
611         }
612         tFin=MSG_get_clock();
613         nbTachesTraitees++;
614         
615         // Décompte et suppression éventuelle de la tâche effectuée
616         (*((int *) MSG_task_get_data(listeT[tacheCrte])))--;
617         if(*((int *) MSG_task_get_data(listeT[tacheCrte]))==0){
618           delete ((int *) MSG_task_get_data(listeT[tacheCrte]));
619           MSG_task_destroy(listeT[tacheCrte]); // Vérifier si la destruction désalloue les data !!!
620           listeT.erase(listeT.begin()+tacheCrte);
621           INFO1("CONVERGENCE DE TACHE %d",tacheCrte);
622         }else{
623           tacheCrte++;
624         }
625         
626         // Calcul charge restante et envoi
627         iterCumulees[num]--; // Suppression de l'itération que l'on vient de faire
628         denv->charge.temps=(tFin-tDeb)*iterCumulees[num];
629         denv->charge.nbT=listeT.size();
630       }
631     
632     }else{
633       if(listeTNE.size()==0 && listeTR.size()==0){
634         if(!fam){
635           fam=true;
636           nbTerms++;
637         }
638         if(nbTerms<MSG_get_host_number()){
639           INFO2("%s en attente de calculs... (%f)",nomHote.c_str(),*_sleep);
640           MSG_process_sleep(*_sleep);
641         }
642       }
643     }
644
645     // Décision équilibrage à intervalle régulier (temps discret)
646     tCycF=MSG_get_clock();
647     intervalle=fabs(tCycF-tCycD);
648     INFO4("-----------\t\t%f %f -> %f / %f",tCycD,tCycF,intervalle,drec->inter);
649     if(intervalle>=drec->inter){
650
651       // Affichage charges
652       sstr.clear();
653       sstr.str("");
654       nbCon=0;
655       sstr << nomHote << " : " << denv->charge.temps << " (" << denv->charge.nbT << ")\t";
656       for(i=0;i<voisins.size();++i){
657         sstr << drec->voCharges[i].temps << " (" << drec->voCharges[i].nbT << ") [";
658         if(drec->connectes[i]){ sstr << "X"; nbCon++; }else sstr << " ";
659         sstr << "] ";
660       }
661       INFO1("%s",sstr.str().c_str());
662     
663       // Décision
664       if(listeT.size()>1){ // règles d'équilibrage
665         double somme=denv->charge.temps;
666         int nbTermes=1;
667
668         // Construction du tableau décroissant des diffs de charges
669         for(i=0;i<voisins.size();++i){
670           if(drec->connectes[i] 
671              && eenv->dest[i]==0 // envoi de charge en cours pour le voisin i
672              && (!(variante & LL) || denv->charge.nbT>drec->voCharges[i].nbT)
673              && (denv->charge.temps-drec->voCharges[i].temps)*MSG_get_host_speed(monHote)>flopsTache){
674             diffs[i].temps=denv->charge.temps-drec->voCharges[i].temps;
675             diffs[i].nbT=denv->charge.nbT-drec->voCharges[i].nbT;
676             if(indTries.size()==0){
677               indTries.push_back(i);
678             }else{
679               for(j=0;j<indTries.size() && diffs[i].temps<diffs[indTries[j]].temps;++j);
680               indTries.insert(indTries.begin()+j,i);
681             }
682             somme+=drec->voCharges[i].temps;
683             nbTermes++;
684           }else{
685             diffs[i].temps=0;
686             diffs[i].nbT=0;
687           }
688         }
689         if(nbTermes>0 && polit==Local){
690           somme/=nbTermes;
691
692           // difficile de voir l'intérêt de la boucle sur topologies à degré <=2 !
693           bool fin=false;
694           do{
695             int nbTs=1;
696             double moyC=denv->charge.temps;
697             for(i=0;i<indTries.size() && drec->voCharges[indTries[i]].temps<somme;++i){
698               moyC+=drec->voCharges[indTries[i]].temps;
699               nbTs++;
700             }
701             if(i<indTries.size()){
702               indTries.resize(i);
703             }else{
704               fin=true;
705             }
706             nbTermes=nbTs;
707             somme=moyC/nbTermes;
708           }while(!fin);
709
710         }
711
712         INFO2("Temps = %lf\tTermes = %d",somme,nbTermes);
713
714         for(i=0;i<indTries.size();++i){
715           INFO5("Diff %s : %lf %d %e %e",MSG_host_get_name(voisins[indTries[i]]),diffs[indTries[i]].temps,diffs[indTries[i]].nbT,denv->charge.temps,drec->voCharges[indTries[i]].temps);
716         }
717
718         // Initialisation de la répartition des envois de charges
719         dtmp=0;
720         limite=false;
721         indLim=indTries.size();
722         sommeTE=0;
723
724         // Boucle de calcul de la répartition
725         for(i=0;i<indTries.size() && !limite;++i){
726           switch(polit){
727           case Unique:
728             dtmp+=(tmpDest[indTries[i]]=(int)rint(diffs[indTries[i]].nbT/2.0));
729             break;
730           case Unite:
731             dtmp+=(tmpDest[indTries[i]]=1);
732             break;
733           case AM:
734             dtmp+=(tmpDest[indTries[i]]=(int)rint(diffs[indTries[i]].nbT/(voisins.size()+1.0)));
735             break;
736           case Local:
737             if(drec->voCharges[indTries[i]].temps>0){
738               dtmp+=(tmpDest[indTries[i]]=(int)rint(drec->voCharges[indTries[i]].nbT*(somme-drec->voCharges[indTries[i]].temps)/drec->voCharges[indTries[i]].temps));//(int)rint(diffs[indTries[i]].nbT/2.0));//(voisins.size()+1.0)));
739             }else{
740               // On n'a pas l'info de temps restant sur le proc destination alors
741               // on n'a aucune idée de sa vitesse et donc du nombre de tâches à envoyer...
742               // Le plus raisonnable est de considérer la même vitesse que le proc source
743               dtmp+=(tmpDest[indTries[i]]=(int)rint(denv->charge.nbT*(somme-drec->voCharges[indTries[i]].temps)/denv->charge.temps));
744             }
745             break;
746           }
747           // Calcul de la somme des tâches à envoyer
748           if((polit==Unique && i>=1) ||
749              ((polit==Unite || polit==AM)
750               && denv->charge.nbT<dtmp+drec->voCharges[indTries[i]].nbT+tmpDest[indTries[i]]) ||
751              (polit==Local && drec->voCharges[indTries[i]].temps>=somme)
752              ){
753             limite=true;
754             indLim=i;
755             if(variante & LL){
756               indLim++;
757               sommeTE+=tmpDest[indTries[i]];
758             }
759           }else{
760             sommeTE+=tmpDest[indTries[i]];
761           }
762         }
763
764         INFO1("Somme TE = %d",sommeTE);
765
766         // Isolation des tâches à envoyer
767         if(sommeTE<listeT.size()){
768           for(i=0;i<sommeTE;++i){
769             listeTE.push_back(listeT.back());
770             listeT.pop_back();
771           }
772
773           // Activation de la répartition
774           for(i=0;i<indLim;++i){
775             eenv->dest[indTries[i]]=tmpDest[indTries[i]];
776             INFO2("EQU %s : %d",MSG_host_get_name(voisins[indTries[i]]),eenv->dest[indTries[i]]);
777           }
778         }
779         indTries.clear();
780       }
781       tCycD=tCycF;
782     }
783
784     // MAJ intervalle de temps de équilibrages
785     drec->inter=(1.0-ALPHA)*drec->inter+ALPHA*intervalle;
786     *_sleep=drec->inter;
787
788     // MAJ état de terminaison
789     fini=(listeT.size()==0 && listeTNE.size()==0 && listeTR.size()==0 && nbTerms==MSG_get_host_number());
790   }
791   tGlobalFin=MSG_get_clock();
792
793   // Calcul des temps min et max
794   if(tGlobalDeb<tGDMin){
795     tGDMin=tGlobalDeb;
796   }
797   if(tGlobalFin>tGFMax){
798     tGFMax=tGlobalFin;
799   }  
800
801   // Affichage fin
802   INFO3("Fin calculs sur proc %s : %d (%d)",nomHote.c_str(),nbTachesTraitees,nbTaches[num]);
803   //  INFO5("Fin calculs sur proc %s : %d %d %d %d",nomHote.c_str(),listeT.size(),listeTE.size(),listeTNE.size(),listeTR.size());
804   nbTT+=nbTachesTraitees;
805
806   // Indication terminaison des processus d'envois
807   denv->fini=true;
808   eenv->fini=true;
809   // Attente d'arrêt des comms d'équilibrage en cours
810   while(nbArretEnvs<2*MSG_get_host_number()){
811     //    INFO1("Attente barrière envois sur %s",nomHote.c_str());
812     MSG_process_sleep(*_sleep);
813   }
814   // Indication terminaison des processus de réceptions
815   drec->fini=true;
816   erec->fini=true;
817   // Attente d'arrêt des comms d'infos en cours
818   while(nbArretRecs<2*MSG_get_host_number()){
819     //    INFO1("Attente barrière réceptions sur %s",nomHote.c_str());
820     MSG_process_sleep(*_sleep);
821   }
822
823   // Calcul et affichage de la durée du calcul, de la distance à l'optimal et du gain
824   tempsCalc=tGFMax-tGDMin;
825   //  tempsOpt=ceil((double)nbTT/MSG_get_host_number())*flopsTache*MSG_get_host_number()/somVit;
826   INFO3("Itérations effectuées : %d  \tTotal itérations : %d  \tSurcoût G : %.2f%%",nbTT,nbIt,100*(1.0-(maxTC-tempsCalc)/(maxTC-tempsOpt)));
827   INFO3("Temps de calcul : %f\tTemps optimal : %f\tSurcoût T : %.2f%%",tempsCalc,tempsOpt,100*(tempsCalc-tempsOpt)/tempsOpt);
828   INFO3("Temps initial   : %f\tGain optimal  : %.2f%%   \tGain réel : %.2f%%",maxTC,100*(maxTC-tempsOpt)/maxTC,100*(maxTC-tempsCalc)/maxTC);
829
830   // Désallocation mémoire
831   delete _sleep;
832   listeT.clear();
833   voisins.clear();
834   diffs.clear();
835   indTries.clear();
836   iterCumulees.clear();
837   delete denv;
838   delete[] drec->connectes;
839   delete[] drec->voCharges;
840   delete drec;
841   delete[] eenv->dest;
842   delete eenv;
843   delete erec;
844
845   return 0;
846 }/* end_of_receiver */
847
848
849 /** Test function */
850 MSG_error_t test_all(const char *platform_file,const char *application_file)
851 {
852   MSG_error_t res = MSG_OK;
853   m_host_t *listeHotes=NULL;
854   double maxVit=0,minVit=1e100; // vitesses min et max des procs du système
855   double tempsCalc;
856   int i,j,k;
857                         
858   /*  Simulation setting */
859   MSG_set_channel_number(MAX_CHANNEL);
860   //  MSG_paje_output("equil.trace");
861   MSG_create_environment(platform_file);
862
863   // Allocation du tableau des tâches
864   nbTaches.resize(MSG_get_host_number());
865   // Calcul du nombre initial de tâches locales et comptage du nombre total
866   if(variante & CU){
867     if(numUnique<0){
868       //      numUnique=(myRand()%MSG_get_host_number());
869       numUnique = 0; // On prend vraiment le pire cas
870     }
871     for(i=0;i<MSG_get_host_number();++i){
872       nbTaches[i]=0;
873     }
874     nbTaches[numUnique]=(myRand()%(maxNBT-minNBT+1))+minNBT;
875     nbTI+=nbTaches[numUnique];
876   }else{
877     int nbTG = (myRand()%(maxNBT-minNBT+1))+minNBT;
878     int nbTpP = nbTG / MSG_get_host_number(); // Répartition homogène des
879                                               // tâches sur les procs
880     int surplus = nbTG % MSG_get_host_number();
881     for(i=0 ; i<surplus ; ++i){
882       nbTaches[i] = nbTpP + 1;
883       nbTI += nbTaches[i];
884     }
885     for( ; i<MSG_get_host_number() ; ++i){
886       nbTaches[i] = nbTpP;
887       nbTI += nbTaches[i];
888     }
889   }
890
891   // Allocation et initialisation du tableau des itérations
892   nbIters=new int*[nbTI];
893   nbIt=0;
894   if(variante & AB){
895     /*
896       Répartition pour simuler u processus AdaBoost :
897       maxIt représente le nombre de classifieurs faibles utilisés
898       toutes les tâches prennent maxIt itérations sauf maxIt-1 qui
899       prennent respectivement de 1 à maxIt-1 itérations
900      */
901     for(i=0;i<nbTI;++i){
902       nbIters[i]=new int;
903       *(nbIters[i])=maxIt;
904     }
905     for(i=1;i<maxIt;++i){
906       int indit=myRand()%nbTI;
907       *(nbIters[indit])=i;
908     }
909     for(i=0;i<nbTI;++i){
910       nbIt+=*(nbIters[i]);
911     }
912   }else{
913     nbIters[0]=new int;
914     *(nbIters[0])=(myRand()%(maxIt-minIt+1))+minIt;
915     nbIt+=*(nbIters[0]);
916     for(i=1;i<nbTI;++i){
917       nbIters[i]=new int;
918       if(variante & IL){
919         *(nbIters[i])=*(nbIters[i-1]);
920       }else{
921         *(nbIters[i])=(myRand()%(maxIt-minIt+1))+minIt;
922       }
923       nbIt+=*(nbIters[i]);
924     }
925   }
926
927   // Allocation et calcul des itérations cumulées initiales par proc
928   iterCumulees.resize(MSG_get_host_number(),0);
929   k=0;
930   for(i=0;i<iterCumulees.size();++i){
931     for(j=0;j<nbTaches[i];++j){
932       iterCumulees[i]+=*(nbIters[k++]);
933     }
934   }
935   
936   // Récupération vitesses des machines et temps max de calcul et réglages des tempos
937   listeHotes=MSG_get_host_table();
938   somVit=0;
939   for(i=0;i<MSG_get_host_number();++i){
940     somVit+=MSG_get_host_speed(listeHotes[i]);
941     if(MSG_get_host_speed(listeHotes[i])>maxVit){
942       maxVit=MSG_get_host_speed(listeHotes[i]);
943     }else
944       if(MSG_get_host_speed(listeHotes[i])<minVit){
945         minVit=MSG_get_host_speed(listeHotes[i]);
946       }
947     tempsCalc=iterCumulees[i]*flopsTache/MSG_get_host_speed(listeHotes[i]);
948     if(tempsCalc>maxTC){
949       maxTC=tempsCalc;
950     }
951   }
952   if(SLEEP==0.0){
953     SLEEP=flopsTache/(2*maxVit);
954   }
955   if(INTER==0.0){
956     INTER=0.99*flopsTache*MSG_get_host_number()/somVit; // Réglage délicat car influe sur la vitesse de simu !!!
957   }
958
959   // Calcul du temps optimal
960   tempsOpt=ordoOpt(listeHotes);
961
962   // Affichage des infos générales
963   INFO0("PARAMÈTRES GÉNÉRAUX");
964   INFO0("-------------------");
965   INFO1("     taille données : %f",octetsTache);
966   INFO1("      taille tâches : %f",flopsTache);
967   INFO1("   total itérations : %d",nbIt);
968   INFO1("     temps attentes : %lf",SLEEP);
969   INFO1("intervalle décision : %lf",INTER);
970   INFO1("       coef attente : %f",ALPHA);
971   INFO1("     limite stricte : %s",(variante & LL)?"inactive":"active");
972   INFO1("      charge unique : %s",(variante & CU)?"active":"inactive");
973   INFO1("   itérations liées : %s",(variante & IL)?"active":"inactive");
974   INFO1("  itération vecteur : %s",(variante & IV)?"active":"inactive");
975
976   /*   Application deployment */
977   MSG_function_register("Calculs", calculs);
978   MSG_launch_application(application_file);
979   
980   res = MSG_main();
981
982   nbTaches.clear();
983   delete[] nbIters;
984   return res;
985 } /* end_of_test_all */
986
987
988 /** Main function */
989 int main(int argc, char *argv[])
990 {
991   MSG_error_t res = MSG_OK;
992   int i,flags=0,graine;
993   char *nomPF,*nomDP;
994   MSG_global_init(&argc,argv);
995
996   graine=time(NULL);
997   for(i=0;i<argc;++i){
998     if(argv[i][0]=='-'){
999       switch(argv[i][1]){
1000       case 'A':
1001         variante |= AB;
1002         break;
1003       case 'a':
1004         i++;
1005         ALPHA=atof(argv[i]);
1006         break;
1007       case 'b':
1008         i++;
1009         BETA=atof(argv[i]);
1010         break;
1011       case 'c':
1012         i++;
1013         flopsTache=atof(argv[i]);
1014         break;
1015       case 'd':
1016         i++;
1017         nomDP=argv[i];
1018         flags+=2;
1019         break;
1020       case 'f':
1021         i++;
1022         INTER=atof(argv[i]);
1023         break;
1024       case 'g':
1025         i++;
1026         graine=atoi(argv[i]);
1027         break;
1028       case 'G':
1029         i++;
1030         if(!lireListe(argv[i], valsAleat)){
1031           CRITICAL1 ("Pb while reading random list: %s\n",argv[i]);
1032         }
1033       case 'i':
1034         i++;
1035         minIt=atoi(argv[i]);
1036         break;
1037       case 'I':
1038         i++;
1039         maxIt=atoi(argv[i]);
1040         break;
1041       case 'l':
1042         variante|=IL;
1043         break;
1044       case 'L':
1045         variante|=LL;
1046         break;
1047       case 'o':
1048         i++;
1049         octetsTache=atof(argv[i]);
1050         break;
1051       case 'p':
1052         i++;
1053         nomPF=argv[i];
1054         flags+=1;
1055         break;
1056       case 's':
1057         i++;
1058         SLEEP=atof(argv[i]);
1059         break;
1060       case 't':
1061         i++;
1062         minNBT=atoi(argv[i]);
1063         break;
1064       case 'T':
1065         i++;
1066         maxNBT=atoi(argv[i]);
1067         break;
1068       case 'V':
1069         variante|=IV;
1070         break;
1071       case 'u':
1072         variante|=CU;
1073         if(argc>i+1 && argv[i+1][0]!='-'){
1074           i++;
1075           numUnique=atoi(argv[i]);
1076         }
1077         break;
1078       case 'z':
1079         i++;
1080         switch(argv[i][0]){
1081         case 'U':
1082           polit=Unique;
1083           break;
1084         case '1':
1085           polit=Unite;
1086           break;
1087         case 'A':
1088           polit=AM;
1089           break;
1090         case 'L':
1091           polit=Local;
1092           break;          
1093         case 'l':
1094           polit=Local2;
1095           break;          
1096         }
1097         break;
1098       }
1099     }
1100   }
1101
1102   if(flags!=3){
1103      CRITICAL1 ("Usage : %s -p fichier_plateforme -d fichier_deploiement < options >\n",argv[0]);
1104      CRITICAL0 ("   -A             : application AdaBoost");
1105      CRITICAL0 ("   -a réel        : poids de la dernière mesure d'intervalle d'attente d'équilibrage");
1106      CRITICAL0 ("   -b réel        : poids de la dernière mesure d'intervalle d'attente d'envois/réceptions");
1107      CRITICAL0 ("   -c entier      : quantité de calculs par tâche (en flops)");
1108      CRITICAL0 ("   -f réel        : intervalle de temps pour les tests d'équilibrages (en ms)");
1109      CRITICAL0 ("   -g entier      : initialisation du générateur aléatoire");
1110      CRITICAL0 ("   -i entier      : nombre minimal d'itérations par tâche");
1111      CRITICAL0 ("   -I entier      : nombre maximal d'itérations par tâche");
1112      CRITICAL0 ("   -l             : itérations liées (identiques entre les tâches)");
1113      CRITICAL0 ("   -L             : léger dépassement autorisé de la limite de charge locale");
1114      CRITICAL0 ("   -o entier      : taille des données relatives à une tâche (en octets)");
1115      CRITICAL0 ("   -s réel        : temps de mise en attente des processus (en ms)");
1116      CRITICAL0 ("   -t entier      : nombre minimal de tâches");
1117      CRITICAL0 ("   -T entier      : nombre maximal de tâches");
1118      CRITICAL0 ("   -V             : itérations réalisées sur tout le vecteur local");
1119      CRITICAL0 ("   -u             : charge initiale uniquement sur un processeur");
1120      CRITICAL0 ("   -z [U 1 A L l] : politique de transfert de charge d'un noeud :");
1121      CRITICAL0 ("                    U : transfert vers le moins chargé des voisins");
1122      CRITICAL0 ("                    1 : transfert d'une unité de charge à chaque voisin moins chargé");
1123      CRITICAL0 ("                    A : transfert selon règles données dans thèse AM");
1124      CRITICAL0 ("                    L : répartition équilibrée sur voisins ayant moins de charge");
1125      CRITICAL0 ("                        que la moyenne de la source et des destinataires");
1126      CRITICAL0 ("                    l : ?");
1127      CRITICAL0 ("");
1128      CRITICAL1 ("Exemple:\n\t%s -p platform_equil.xml -d deployment_ligne.xml -g 4 -t 1000 -T 100000 -i 250 -I 1000 -f 0.005 -a 0.5 -u -l -L -V\n",argv[0]);
1129      exit(1);
1130   }
1131
1132   srand(graine);
1133   res = test_all(nomPF,nomDP);
1134   INFO1("Total simulation time: %le", MSG_get_clock());
1135   MSG_clean();
1136
1137   if(res==MSG_OK) return 0; 
1138   else return 1;
1139 } /* end_of_main */
1140
1141 double ordoOpt(m_host_t *listeHotes)
1142 {
1143   int i,j,nb=MSG_get_host_number();
1144   vector<double> ordo;
1145   int ind;
1146   double min,tmp,ret=0;
1147
1148   ordo.resize(nb,0);
1149
1150   for(j=0;j<nbIt;++j){
1151     min=ordo[0]+flopsTache/MSG_get_host_speed(listeHotes[0]);
1152     ind=0;
1153     for(i=1;i<nb;++i){
1154       tmp=ordo[i]+flopsTache/MSG_get_host_speed(listeHotes[i]);
1155       if(tmp<min){
1156         min=tmp;
1157         ind=i;
1158       }
1159     }
1160     ordo[ind]=min;
1161     if(ordo[ind]>ret){
1162       ret=ordo[ind];
1163     }
1164   }
1165
1166   /*
1167     for(i=0;i<nb;++i){
1168       INFO3("Proc %d : %d %f",i+1,(int)rint(ordo[i]*MSG_get_host_speed(listeHotes[i])/flopsTache),ordo[i]);
1169     }
1170   */
1171   ordo.clear();
1172   return ret;
1173 }
1174
1175 bool lireListe(char *nom, vector<int> &valsAleat)
1176 {
1177   ifstream fic;
1178   int i, nb;
1179   bool ret = true;
1180
1181   fic.open(nom);
1182   if(fic.good()){
1183     fic >> nb;
1184     valsAleat.resize(nb);
1185     for(i=0; i<nb; ++i){
1186       fic >> valsAleat[i];
1187     }
1188     fic.close();
1189   }else{
1190     ret = false;
1191   }
1192   return ret;
1193 }
1194
1195 int myRand()
1196 {
1197   static int indice = 0;
1198
1199   if(valsAleat.size() > 0){
1200     int val = valsAleat[indice];
1201     indice = (indice + 1) % valsAleat.size();
1202     return val;
1203   }else{
1204     return rand();
1205   }
1206 }