11 #include "xbt/sysdep.h"
13 #include "xbt/asserts.h"
17 typedef enum {Unique,Unite,AM,Local,Local2} Type_T; // types de politiques de transfert de charge
19 double ALPHA=0.125; // poids de la dernière mesure d'intervalle d'attente d'équilibrage
20 double BETA=0.125; // poids de la dernière mesure d'intervalle d'attente d'envois/réceptions
21 int SEUIL_ENVOI=1; // nombre d'intervalles d'attente avant envoi d'infos
22 double INTER=0.0; // intervalle de temps pour les tests d'équilibrages
23 double SLEEP=0.0; // temps de mise en attente des processus
24 double octetsTache=24.0; // en octets (3 doubles)
25 double flopsTache=500.0; // en flops (500 opérations)
26 int minNBT=1000,maxNBT=10000; // intervalle du nombre de tâches sur chaque proc
27 int nbTerms=0; // nombre de procs ayant terminé leurs calculs
28 int nbTI=0,nbTT=0; // nombres de tâches initiales et traitées
29 double tGDMin=1e100,tGFMax=0; // temps globaux de début et fin
30 int numUnique=-1; // numéro du processeur reçevant la charge en cas de charge unique
31 vector<int> nbTaches; // tableau des nombres de tâches par proc
32 double somVit=0; // somme des vitesses des procs du système
33 double maxTC; // temps de calcul max sur répartition initiale
34 double tempsOpt; // temps optimal de calcul des tâches sur le système
35 int **nbIters=NULL,nbIt; // tableau des nombres d'itérations nécessaires pour la CV (simulée)
36 // et nb total itérations
37 int minIt=1,maxIt=1; // bornes de l'intervalle du nombre d'itérations par tâche
38 bool mutexTE,mutexTNE,mutexTR; // mutex pour la manipulation des listes de tâches
39 vector<int> iterCumulees; // itérations locales cumulées par proc
40 unsigned int variante=0; // indique les variantes éventuelles de l'algo d'équilibrage
41 int nbArretEnvs=0; // nombre de procs ayant arrêté leur processus d'envois des infos/equs
42 int nbArretRecs=0; // nombre de procs ayant arrêté leur processus de réception des infos/equs
43 Type_T polit=AM; /* type de politique de transfert de charge :
44 Unique : transfert vers le moins chargé des voisins
45 Unite : transfert d'une unité de charge à chaque voisin moins chargé
46 AM : transfert selon règles données dans thèse AM
47 Local : répartition équilibrée sur voisins ayant moins de charge que
48 la moyenne de la source et des destinataires
50 vector<int> valsAleat; // liste explicite de valeurs aléatoires (cross-plateforme)
52 typedef enum {BASE=0,LL=1,CU=2,IL=4,IV=8,AB=16} Variante;
54 BASE = algo initial avec limite stricte de la charge locale restante sur l'émetteur
55 LL = léger dépassement de la limite de charge locale autorisé
56 CU = charge initiale unique sur un proc
57 IL = itérations liées (identiques entre les tâches)
58 IV = itérations réalisées sur tout le vecteur local
59 AB = processus d'apprentissage AdaBoost
62 typedef vector<m_task_t> LTaches;
63 typedef vector<m_host_t> LHotes;
96 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,"Messages specific to equil");
98 int envInfos(int argc, char *argv[]);
99 int recInfos(int argc, char *argv[]);
100 int envEqu(int argc, char *argv[]);
101 int recEqu(int argc, char *argv[]);
102 int calculs(int argc, char *argv[]);
103 double ordoOpt(m_host_t *listeHotes);
104 bool lireListe(char *nom, vector<int> &valsAleat);
107 MSG_error_t test_all(const char *platform_file, const char *application_file);
109 typedef enum {PORT_DON,PORT_EQU,MAX_CHANNEL} channel_t;
111 // Fonction d'envoi des messages
112 int envInfos(int argc, char *argv[])
115 m_task_t *enve = NULL;
123 Charge chargePrec={-1.0,-1};
127 // Récupération de son processus
128 moi=MSG_process_self();
130 // Récupération nom machine
131 nomHote=string(MSG_host_get_name(MSG_host_self()));
133 // Récupération données
134 denv=(Infos *)MSG_process_get_data(moi);
135 denv->charge.temps=denv->icl*flopsTache/MSG_get_host_speed(MSG_host_self());
137 // Création tableau des messages d'envois
138 enve=new m_task_t[denv->voisins->size()];
141 pid=MSG_process_self_PID();
143 sstr << "Processus d'envoi des infos sur " << nomHote.c_str();
144 INFO1("\t* %s",sstr.str().c_str());
151 if(compteur==SEUIL_ENVOI || chargePrec.temps!=denv->charge.temps || chargePrec.nbT!=denv->charge.nbT){
152 chargePrec=denv->charge;
153 // Envois bloquants des infos de charge
156 sstr << "Charge de " << mess->nom << " (" << pid << ")";
157 mess->charge=denv->charge;
158 for(i=0;i<denv->voisins->size();++i){
159 enve[i]=MSG_task_create(sstr.str().c_str(),0.0,(double)sizeof(Mess),(void *)mess);
160 err=MSG_task_put(enve[i],(*(denv->voisins))[i],PORT_DON);
162 INFO2("\t* Pb à l'envoi des infos depuis %s vers %s",nomHote.c_str(),MSG_host_get_name((*(denv->voisins))[i]));
166 INFO1("\t* message mal initialisé sur %s",nomHote.c_str());
168 case MSG_HOST_FAILURE:
169 INFO1("\t* hôte destination non joignable depuis %s",nomHote.c_str());
171 case MSG_TRANSFER_FAILURE:
172 INFO1("\t* Erreur de transmission depuis %s",nomHote.c_str());
176 MSG_task_destroy(enve[i]);
178 INFO1("\t* Envoi OK des infos depuis %s",nomHote.c_str());
181 sstr << " " << chargePrec.temps << " (" << chargePrec.nbT << ")";
182 INFO1("\t* %s",sstr.str().c_str());
187 MSG_process_sleep(*(denv->sleep));
191 INFO1("\t* Arrêt processus d'envois des infos sur %s",nomHote.c_str());
193 // Libérations mémoire
198 // Fonction de réception des messages
199 int recInfos(int argc, char *argv[])
212 // Récupération de son processus
213 moi=MSG_process_self();
214 nomHote=string(MSG_host_get_name(MSG_host_self()));
216 // Récupération données
217 drec=(Infos *)MSG_process_get_data(moi);
221 sstr << "Processus de réception des infos sur " << nomHote.c_str();
222 INFO1("\t\t# %s",sstr.str().c_str());
224 // Boucle de réception
225 tDeb=MSG_get_clock();
227 pid=MSG_task_probe_from(PORT_DON);
229 err=MSG_task_get(&recu,PORT_DON);
231 INFO1("\t\t# Pb à la réception des infos sur %s",nomHote.c_str());
233 mess=(Mess *)MSG_task_get_data(recu);
234 drec->charge=mess->charge;
235 for(i=0;i<drec->voisins->size() && mess->nom!=MSG_host_get_name((*(drec->voisins))[i]);++i);
236 if(i<drec->voisins->size()){
237 drec->voCharges[i]=mess->charge;
238 drec->connectes[i]=true;
242 sstr << nomHote.c_str() << " a reçu l'info de charge " << drec->charge.temps << " (" << drec->charge.nbT << ") de " << mess->nom;
243 INFO1("\t\t# %s",sstr.str().c_str());
244 MSG_task_destroy(recu);
248 MSG_process_sleep(*(drec->sleep));
251 tFin=MSG_get_clock();
252 if(!fini && fabs(tFin-tDeb)>=drec->inter){
253 // réinitialisation de l'intervalle de temps
254 for(i=0;i<drec->voisins->size();++i){
255 drec->connectes[i]=false;
261 INFO1("\t\t# Arrêt processus de réceptions des infos sur %s",nomHote.c_str());
264 // Fonction d'envoi des données d'équilibrage
265 int envEqu(int argc, char *argv[])
270 MSG_error_t err=MSG_OK;
275 // Récupération nom machine
276 nomHote=string(MSG_host_get_name(MSG_host_self()));
278 // Récupération données
279 eenv=(Equ *)MSG_process_get_data(MSG_process_self());
281 // Allocation du tableau des messages
282 mess=new MessEqu[(*(eenv->voisins)).size()];
285 INFO1("\t\t\t§ Processus d'envoi des équilibrages sur %s",nomHote.c_str());
289 for(i=0;i<(eenv->voisins)->size();++i){
291 mess[i].nbT=eenv->dest[i];
292 mess[i].tab=new m_task_t[mess[i].nbT];
294 for(j=0;j<mess[i].nbT;++j){
295 mess[i].tab[j]=(eenv->taches)->back();
296 *(eenv->icl)-=*((int *)MSG_task_get_data((eenv->taches)->back()));
297 (eenv->taches)->pop_back();
300 tache=MSG_task_create("lot tâches",0.0,mess[i].nbT*octetsTache,(void *)(mess+i));
301 err=MSG_task_put(tache,(*(eenv->voisins))[i],PORT_EQU);
303 INFO2("\t\t\t§ Pb à l'envoi d'équilibrage depuis %s vers %s",nomHote.c_str(),MSG_host_get_name((*(eenv->voisins))[i]));
305 for(j=0;j<mess[i].nbT;++j){
306 (eenv->recup)->push_back(mess[i].tab[j]);
307 *(eenv->icl)+=*((int *)MSG_task_get_data((eenv->recup)->back()));
310 delete[] mess[i].tab;
312 INFO3("\t\t\t§ Envoi OK de %d tâche(s) depuis %s vers %s",mess[i].nbT,nomHote.c_str(),MSG_host_get_name((*(eenv->voisins))[i]));
313 // (*(eenv->taches)).erase((*(eenv->taches)).end()-mess[i].nbT,(*(eenv->taches)).end());
318 MSG_process_sleep(*(eenv->sleep));
322 INFO1("\t\t\t§ Arrêt processus d'envoi des équilibrages sur %s",nomHote.c_str());
324 // Libérations mémoire
328 // Fonction de réception des données d'équilibrage
329 int recEqu(int argc, char *argv[])
340 // Récupération nom machine
341 nomHote=string(MSG_host_get_name(MSG_host_self()));
343 // Récupération données
344 erec=(Equ *)MSG_process_get_data(MSG_process_self());
347 INFO1("\t\t\t\t@ Processus de réception des équilibrages sur %s",nomHote.c_str());
349 // Boucle de réception
351 pid=MSG_task_probe_from(PORT_EQU);
353 err=MSG_task_get(&tache,PORT_EQU);
355 INFO1("\t\t\t\t@ Pb à la réception d'un équilibrage sur %s",nomHote.c_str());
358 mess=(MessEqu *)MSG_task_get_data(tache);
360 lt=string(MSG_task_get_name(mess->tab[0]))+" oo "+string(MSG_task_get_name(mess->tab[mess->nbT-1]));
361 for(i=0;i<mess->nbT;++i){
362 (erec->taches)->push_back(mess->tab[i]);
363 *(erec->icl)+=*((int *)MSG_task_get_data((erec->taches)->back()));
364 // lt+=string(MSG_task_get_name(mess->tab[i]))+" ";
368 hsrc=MSG_task_get_source(tache);
369 for(i=0;i<erec->voisins->size() && hsrc!=(*(erec->voisins))[i];++i);
370 if(i<erec->voisins->size()){
371 erec->src=MSG_host_get_name((*(erec->voisins))[i]);
373 INFO3("\t\t\t\t@ %s a reçu %d tâche(s) de %s",nomHote.c_str(),mess->nbT,erec->src.c_str());
374 INFO2("\t\t\t\t@ %s %s",nomHote.c_str(),lt.c_str());
378 MSG_process_sleep(*(erec->sleep));
382 INFO1("\t\t\t\t@ Arrêt processus de réceptions des équilibrage sur %s",nomHote.c_str());
385 // Fonction d'équilibrage
386 int calculs(int argc, char *argv[]) // Les paramètres sont : 0 = processus, 1,... = voisins
388 m_host_t monHote=NULL,*listeHotes=NULL;
389 m_task_t mess=NULL,tvect=NULL;
390 LTaches listeT,listeTE,listeTR,listeTNE;
395 string nomHote,nomTache,tmp;
398 m_process_t envi,reci,enve,rece;
401 bool fini=false,fam=false,limite;
406 int nbTachesTraitees=0,nbCon;
407 double tCycD,tCycF,intervalle,*_sleep;
408 double tGlobalDeb, tGlobalFin,tempsCalc;
409 vector<Charge> diffs;
410 vector<int> indTries,tmpDest;
411 int tacheCrte,sommeTE;
413 // Récupération du nom de la machine courante
414 monHote = MSG_host_self();
415 nomHote=string(MSG_host_get_name(monHote));
416 // Récupération de la liste des machines
417 listeHotes=MSG_get_host_table();
418 // Récupération de la position de la machine courante dans la liste
419 for(num=0;num<MSG_get_host_number() && nomHote!=MSG_host_get_name(listeHotes[num]);++num);
421 // Récupération des voisins de la machine courante
422 for(i=0;i<MSG_get_host_number();++i){
423 tmp=string(MSG_host_get_name(listeHotes[i]));
424 for(j=1;j<argc && tmp!=argv[j];++j);
426 voisins.push_back(listeHotes[i]);
429 num--; // numérotation des procs à partir de 0
430 diffs.resize(voisins.size());
434 INFO0("Unknown host... Stopping Now! ");
437 INFO3("Je suis %s et j'ai %d éléments / %d",nomHote.c_str(),nbTaches[num],nbTI);
440 for(i=0;i<voisins.size();++i){
441 sstr << MSG_host_get_name(voisins[i]) << " ";
443 INFO1("Mes voisins sont : %s",sstr.str().c_str());
445 // Allocation des infos de charge
450 denv->charge.temps=0;
451 denv->charge.nbT=nbTaches[num];
452 denv->voisins=&voisins;
453 denv->voCharges=NULL;
454 denv->icl=iterCumulees[num];
459 drec->connectes=new bool[voisins.size()];
460 for(i=0;i<voisins.size();++i){
461 drec->connectes[i]=false;
463 drec->voisins=&voisins;
464 drec->voCharges=new Charge[voisins.size()];
465 for(i=0;i<voisins.size();++i){
466 drec->voCharges[i].temps=1e100;
467 drec->voCharges[i].nbT=1<<30;
472 // Crée processus d'envois/réceptions des infos
473 envi=MSG_process_create("envInfos",&envInfos,(void *)denv,monHote);
474 reci=MSG_process_create("recInfos",&recInfos,(void *)drec,monHote);
476 // Allocation des messages d'équilibrage
479 eenv->dest=new int[voisins.size()];
480 for(i=0;i<voisins.size();++i){
483 eenv->icl=&iterCumulees[num];
484 eenv->taches=&listeTE;
485 eenv->recup=&listeTNE;
486 eenv->voisins=&voisins;
490 erec->icl=&iterCumulees[num];
491 erec->taches=&listeTR;
492 erec->voisins=&voisins;
496 mutexTE=mutexTNE=mutexTR=false;
497 tmpDest.resize(voisins.size(),0);
499 // Crée processus d'envois/réceptions des équilibrages
500 enve=MSG_process_create("envEqu",&envEqu,(void *)eenv,monHote);
501 rece=MSG_process_create("recEqu",&recEqu,(void *)erec,monHote);
503 // Création des tâches
508 for(i=0;i<nbTaches[num];++i){
511 sstr << "T_" << i << "_" << num+1;
513 listeT.push_back(MSG_task_create(nomTache.c_str(),flopsTache,octetsTache,(void *) nbIters[nbCon+i]));
516 // Exécution des itérations
517 tGlobalDeb=tCycD=MSG_get_clock();
521 // Récupération des tâches non correctement envoyées précédemment
522 while(!mutexTNE && listeTNE.size()>0){
523 listeT.push_back(listeTNE.back());
527 // Récupération des tâches reçues d'un équilibrage
528 while(!mutexTR && listeTR.size()>0){
529 listeT.push_back(listeTR.back());
533 // Calculs des tâches
535 // Mise à jour de l'état de terminaison globale si nécessaire
539 INFO1("%s reprends des calculs...",nomHote.c_str());
542 // Calcul seulement si tâche courante valide
543 if(tacheCrte>=listeT.size()){
548 if(!(variante & IV)){
549 INFO2("Nb tâches : %d\tTâche courante : %d",listeT.size(),tacheCrte);
552 for(i=0;i<listeT.size();++i){
553 nomTache+=string(MSG_task_get_name(listeT[i]))+" ";
555 INFO2("!! %s %s",nomHote.c_str(),nomTache.c_str());
557 INFO3("Exécution tâche %s (%d) sur proc %s",MSG_task_get_name(listeT[tacheCrte]),(*((int *) MSG_task_get_data(listeT[tacheCrte]))),nomHote.c_str());
559 INFO2("Exécution des %d tâches locales sur proc %s",listeT.size(),nomHote.c_str());
562 for(i=0;i<listeT.size();++i){
563 nomTache+=string(MSG_task_get_name(listeT[i]))+" (";
566 sstr << *((int *) MSG_task_get_data(listeT[i]));
567 nomTache+=sstr.str()+") ";
569 INFO2("!! %s %s",nomHote.c_str(),nomTache.c_str());
574 tDeb=MSG_get_clock();
576 // Exécution des tâches du vecteur
577 tvect=MSG_task_create("Itération vecteur",flopsTache*listeT.size(),octetsTache*listeT.size(),NULL);
578 if(MSG_task_execute(tvect)!=MSG_OK){
579 INFO0("Pb d'exécution de l'itération sur le vecteur");
581 MSG_task_destroy(tvect);
584 tFin=MSG_get_clock();
585 nbTachesTraitees+=listeT.size();
586 // Calcul charge restante et envoi
587 iterCumulees[num]-=listeT.size(); // Suppression des itérations que l'on vient de faire
588 denv->charge.temps=(tFin-tDeb)*iterCumulees[num];
590 // Décompte et suppression éventuelle de la tâche effectuée
591 for(tacheCrte=0;tacheCrte<listeT.size();){
592 (*((int *) MSG_task_get_data(listeT[tacheCrte])))--;
593 if(*((int *) MSG_task_get_data(listeT[tacheCrte]))==0){
594 delete ((int *) MSG_task_get_data(listeT[tacheCrte]));
595 MSG_task_destroy(listeT[tacheCrte]); // Vérifier si la destruction désalloue les data !!!
596 listeT.erase(listeT.begin()+tacheCrte);
597 INFO1("CONVERGENCE DE TACHE %d",tacheCrte);
603 // Mise à jour nombre de tâches locales
604 denv->charge.nbT=listeT.size();
608 // Exécution tâche courante
609 if(MSG_task_execute(listeT[tacheCrte])!=MSG_OK){
610 INFO1("Pb d'exécution de la tâche %d",tacheCrte);
612 tFin=MSG_get_clock();
615 // Décompte et suppression éventuelle de la tâche effectuée
616 (*((int *) MSG_task_get_data(listeT[tacheCrte])))--;
617 if(*((int *) MSG_task_get_data(listeT[tacheCrte]))==0){
618 delete ((int *) MSG_task_get_data(listeT[tacheCrte]));
619 MSG_task_destroy(listeT[tacheCrte]); // Vérifier si la destruction désalloue les data !!!
620 listeT.erase(listeT.begin()+tacheCrte);
621 INFO1("CONVERGENCE DE TACHE %d",tacheCrte);
626 // Calcul charge restante et envoi
627 iterCumulees[num]--; // Suppression de l'itération que l'on vient de faire
628 denv->charge.temps=(tFin-tDeb)*iterCumulees[num];
629 denv->charge.nbT=listeT.size();
633 if(listeTNE.size()==0 && listeTR.size()==0){
638 if(nbTerms<MSG_get_host_number()){
639 INFO2("%s en attente de calculs... (%f)",nomHote.c_str(),*_sleep);
640 MSG_process_sleep(*_sleep);
645 // Décision équilibrage à intervalle régulier (temps discret)
646 tCycF=MSG_get_clock();
647 intervalle=fabs(tCycF-tCycD);
648 INFO4("-----------\t\t%f %f -> %f / %f",tCycD,tCycF,intervalle,drec->inter);
649 if(intervalle>=drec->inter){
655 sstr << nomHote << " : " << denv->charge.temps << " (" << denv->charge.nbT << ")\t";
656 for(i=0;i<voisins.size();++i){
657 sstr << drec->voCharges[i].temps << " (" << drec->voCharges[i].nbT << ") [";
658 if(drec->connectes[i]){ sstr << "X"; nbCon++; }else sstr << " ";
661 INFO1("%s",sstr.str().c_str());
664 if(listeT.size()>1){ // règles d'équilibrage
665 double somme=denv->charge.temps;
668 // Construction du tableau décroissant des diffs de charges
669 for(i=0;i<voisins.size();++i){
670 if(drec->connectes[i]
671 && eenv->dest[i]==0 // envoi de charge en cours pour le voisin i
672 && (!(variante & LL) || denv->charge.nbT>drec->voCharges[i].nbT)
673 && (denv->charge.temps-drec->voCharges[i].temps)*MSG_get_host_speed(monHote)>flopsTache){
674 diffs[i].temps=denv->charge.temps-drec->voCharges[i].temps;
675 diffs[i].nbT=denv->charge.nbT-drec->voCharges[i].nbT;
676 if(indTries.size()==0){
677 indTries.push_back(i);
679 for(j=0;j<indTries.size() && diffs[i].temps<diffs[indTries[j]].temps;++j);
680 indTries.insert(indTries.begin()+j,i);
682 somme+=drec->voCharges[i].temps;
689 if(nbTermes>0 && polit==Local){
692 // difficile de voir l'intérêt de la boucle sur topologies à degré <=2 !
696 double moyC=denv->charge.temps;
697 for(i=0;i<indTries.size() && drec->voCharges[indTries[i]].temps<somme;++i){
698 moyC+=drec->voCharges[indTries[i]].temps;
701 if(i<indTries.size()){
712 INFO2("Temps = %lf\tTermes = %d",somme,nbTermes);
714 for(i=0;i<indTries.size();++i){
715 INFO5("Diff %s : %lf %d %e %e",MSG_host_get_name(voisins[indTries[i]]),diffs[indTries[i]].temps,diffs[indTries[i]].nbT,denv->charge.temps,drec->voCharges[indTries[i]].temps);
718 // Initialisation de la répartition des envois de charges
721 indLim=indTries.size();
724 // Boucle de calcul de la répartition
725 for(i=0;i<indTries.size() && !limite;++i){
728 dtmp+=(tmpDest[indTries[i]]=(int)rint(diffs[indTries[i]].nbT/2.0));
731 dtmp+=(tmpDest[indTries[i]]=1);
734 dtmp+=(tmpDest[indTries[i]]=(int)rint(diffs[indTries[i]].nbT/(voisins.size()+1.0)));
737 if(drec->voCharges[indTries[i]].temps>0){
738 dtmp+=(tmpDest[indTries[i]]=(int)rint(drec->voCharges[indTries[i]].nbT*(somme-drec->voCharges[indTries[i]].temps)/drec->voCharges[indTries[i]].temps));//(int)rint(diffs[indTries[i]].nbT/2.0));//(voisins.size()+1.0)));
740 // On n'a pas l'info de temps restant sur le proc destination alors
741 // on n'a aucune idée de sa vitesse et donc du nombre de tâches à envoyer...
742 // Le plus raisonnable est de considérer la même vitesse que le proc source
743 dtmp+=(tmpDest[indTries[i]]=(int)rint(denv->charge.nbT*(somme-drec->voCharges[indTries[i]].temps)/denv->charge.temps));
747 // Calcul de la somme des tâches à envoyer
748 if((polit==Unique && i>=1) ||
749 ((polit==Unite || polit==AM)
750 && denv->charge.nbT<dtmp+drec->voCharges[indTries[i]].nbT+tmpDest[indTries[i]]) ||
751 (polit==Local && drec->voCharges[indTries[i]].temps>=somme)
757 sommeTE+=tmpDest[indTries[i]];
760 sommeTE+=tmpDest[indTries[i]];
764 INFO1("Somme TE = %d",sommeTE);
766 // Isolation des tâches à envoyer
767 if(sommeTE<listeT.size()){
768 for(i=0;i<sommeTE;++i){
769 listeTE.push_back(listeT.back());
773 // Activation de la répartition
774 for(i=0;i<indLim;++i){
775 eenv->dest[indTries[i]]=tmpDest[indTries[i]];
776 INFO2("EQU %s : %d",MSG_host_get_name(voisins[indTries[i]]),eenv->dest[indTries[i]]);
784 // MAJ intervalle de temps de équilibrages
785 drec->inter=(1.0-ALPHA)*drec->inter+ALPHA*intervalle;
788 // MAJ état de terminaison
789 fini=(listeT.size()==0 && listeTNE.size()==0 && listeTR.size()==0 && nbTerms==MSG_get_host_number());
791 tGlobalFin=MSG_get_clock();
793 // Calcul des temps min et max
794 if(tGlobalDeb<tGDMin){
797 if(tGlobalFin>tGFMax){
802 INFO3("Fin calculs sur proc %s : %d (%d)",nomHote.c_str(),nbTachesTraitees,nbTaches[num]);
803 // INFO5("Fin calculs sur proc %s : %d %d %d %d",nomHote.c_str(),listeT.size(),listeTE.size(),listeTNE.size(),listeTR.size());
804 nbTT+=nbTachesTraitees;
806 // Indication terminaison des processus d'envois
809 // Attente d'arrêt des comms d'équilibrage en cours
810 while(nbArretEnvs<2*MSG_get_host_number()){
811 // INFO1("Attente barrière envois sur %s",nomHote.c_str());
812 MSG_process_sleep(*_sleep);
814 // Indication terminaison des processus de réceptions
817 // Attente d'arrêt des comms d'infos en cours
818 while(nbArretRecs<2*MSG_get_host_number()){
819 // INFO1("Attente barrière réceptions sur %s",nomHote.c_str());
820 MSG_process_sleep(*_sleep);
823 // Calcul et affichage de la durée du calcul, de la distance à l'optimal et du gain
824 tempsCalc=tGFMax-tGDMin;
825 // tempsOpt=ceil((double)nbTT/MSG_get_host_number())*flopsTache*MSG_get_host_number()/somVit;
826 INFO3("Itérations effectuées : %d \tTotal itérations : %d \tSurcoût G : %.2f%%",nbTT,nbIt,100*(1.0-(maxTC-tempsCalc)/(maxTC-tempsOpt)));
827 INFO3("Temps de calcul : %f\tTemps optimal : %f\tSurcoût T : %.2f%%",tempsCalc,tempsOpt,100*(tempsCalc-tempsOpt)/tempsOpt);
828 INFO3("Temps initial : %f\tGain optimal : %.2f%% \tGain réel : %.2f%%",maxTC,100*(maxTC-tempsOpt)/maxTC,100*(maxTC-tempsCalc)/maxTC);
830 // Désallocation mémoire
836 iterCumulees.clear();
838 delete[] drec->connectes;
839 delete[] drec->voCharges;
846 }/* end_of_receiver */
850 MSG_error_t test_all(const char *platform_file,const char *application_file)
852 MSG_error_t res = MSG_OK;
853 m_host_t *listeHotes=NULL;
854 double maxVit=0,minVit=1e100; // vitesses min et max des procs du système
858 /* Simulation setting */
859 MSG_set_channel_number(MAX_CHANNEL);
860 // MSG_paje_output("equil.trace");
861 MSG_create_environment(platform_file);
863 // Allocation du tableau des tâches
864 nbTaches.resize(MSG_get_host_number());
865 // Calcul du nombre initial de tâches locales et comptage du nombre total
868 // numUnique=(myRand()%MSG_get_host_number());
869 numUnique = 0; // On prend vraiment le pire cas
871 for(i=0;i<MSG_get_host_number();++i){
874 nbTaches[numUnique]=(myRand()%(maxNBT-minNBT+1))+minNBT;
875 nbTI+=nbTaches[numUnique];
877 int nbTG = (myRand()%(maxNBT-minNBT+1))+minNBT;
878 int nbTpP = nbTG / MSG_get_host_number(); // Répartition homogène des
879 // tâches sur les procs
880 int surplus = nbTG % MSG_get_host_number();
881 for(i=0 ; i<surplus ; ++i){
882 nbTaches[i] = nbTpP + 1;
885 for( ; i<MSG_get_host_number() ; ++i){
891 // Allocation et initialisation du tableau des itérations
892 nbIters=new int*[nbTI];
896 Répartition pour simuler u processus AdaBoost :
897 maxIt représente le nombre de classifieurs faibles utilisés
898 toutes les tâches prennent maxIt itérations sauf maxIt-1 qui
899 prennent respectivement de 1 à maxIt-1 itérations
905 for(i=1;i<maxIt;++i){
906 int indit=myRand()%nbTI;
914 *(nbIters[0])=(myRand()%(maxIt-minIt+1))+minIt;
919 *(nbIters[i])=*(nbIters[i-1]);
921 *(nbIters[i])=(myRand()%(maxIt-minIt+1))+minIt;
927 // Allocation et calcul des itérations cumulées initiales par proc
928 iterCumulees.resize(MSG_get_host_number(),0);
930 for(i=0;i<iterCumulees.size();++i){
931 for(j=0;j<nbTaches[i];++j){
932 iterCumulees[i]+=*(nbIters[k++]);
936 // Récupération vitesses des machines et temps max de calcul et réglages des tempos
937 listeHotes=MSG_get_host_table();
939 for(i=0;i<MSG_get_host_number();++i){
940 somVit+=MSG_get_host_speed(listeHotes[i]);
941 if(MSG_get_host_speed(listeHotes[i])>maxVit){
942 maxVit=MSG_get_host_speed(listeHotes[i]);
944 if(MSG_get_host_speed(listeHotes[i])<minVit){
945 minVit=MSG_get_host_speed(listeHotes[i]);
947 tempsCalc=iterCumulees[i]*flopsTache/MSG_get_host_speed(listeHotes[i]);
953 SLEEP=flopsTache/(2*maxVit);
956 INTER=0.99*flopsTache*MSG_get_host_number()/somVit; // Réglage délicat car influe sur la vitesse de simu !!!
959 // Calcul du temps optimal
960 tempsOpt=ordoOpt(listeHotes);
962 // Affichage des infos générales
963 INFO0("PARAMÈTRES GÉNÉRAUX");
964 INFO0("-------------------");
965 INFO1(" taille données : %f",octetsTache);
966 INFO1(" taille tâches : %f",flopsTache);
967 INFO1(" total itérations : %d",nbIt);
968 INFO1(" temps attentes : %lf",SLEEP);
969 INFO1("intervalle décision : %lf",INTER);
970 INFO1(" coef attente : %f",ALPHA);
971 INFO1(" limite stricte : %s",(variante & LL)?"inactive":"active");
972 INFO1(" charge unique : %s",(variante & CU)?"active":"inactive");
973 INFO1(" itérations liées : %s",(variante & IL)?"active":"inactive");
974 INFO1(" itération vecteur : %s",(variante & IV)?"active":"inactive");
976 /* Application deployment */
977 MSG_function_register("Calculs", calculs);
978 MSG_launch_application(application_file);
985 } /* end_of_test_all */
989 int main(int argc, char *argv[])
991 MSG_error_t res = MSG_OK;
992 int i,flags=0,graine;
994 MSG_global_init(&argc,argv);
1005 ALPHA=atof(argv[i]);
1013 flopsTache=atof(argv[i]);
1022 INTER=atof(argv[i]);
1026 graine=atoi(argv[i]);
1030 if(!lireListe(argv[i], valsAleat)){
1031 CRITICAL1 ("Pb while reading random list: %s\n",argv[i]);
1035 minIt=atoi(argv[i]);
1039 maxIt=atoi(argv[i]);
1049 octetsTache=atof(argv[i]);
1058 SLEEP=atof(argv[i]);
1062 minNBT=atoi(argv[i]);
1066 maxNBT=atoi(argv[i]);
1073 if(argc>i+1 && argv[i+1][0]!='-'){
1075 numUnique=atoi(argv[i]);
1103 CRITICAL1 ("Usage : %s -p fichier_plateforme -d fichier_deploiement < options >\n",argv[0]);
1104 CRITICAL0 (" -A : application AdaBoost");
1105 CRITICAL0 (" -a réel : poids de la dernière mesure d'intervalle d'attente d'équilibrage");
1106 CRITICAL0 (" -b réel : poids de la dernière mesure d'intervalle d'attente d'envois/réceptions");
1107 CRITICAL0 (" -c entier : quantité de calculs par tâche (en flops)");
1108 CRITICAL0 (" -f réel : intervalle de temps pour les tests d'équilibrages (en ms)");
1109 CRITICAL0 (" -g entier : initialisation du générateur aléatoire");
1110 CRITICAL0 (" -i entier : nombre minimal d'itérations par tâche");
1111 CRITICAL0 (" -I entier : nombre maximal d'itérations par tâche");
1112 CRITICAL0 (" -l : itérations liées (identiques entre les tâches)");
1113 CRITICAL0 (" -L : léger dépassement autorisé de la limite de charge locale");
1114 CRITICAL0 (" -o entier : taille des données relatives à une tâche (en octets)");
1115 CRITICAL0 (" -s réel : temps de mise en attente des processus (en ms)");
1116 CRITICAL0 (" -t entier : nombre minimal de tâches");
1117 CRITICAL0 (" -T entier : nombre maximal de tâches");
1118 CRITICAL0 (" -V : itérations réalisées sur tout le vecteur local");
1119 CRITICAL0 (" -u : charge initiale uniquement sur un processeur");
1120 CRITICAL0 (" -z [U 1 A L l] : politique de transfert de charge d'un noeud :");
1121 CRITICAL0 (" U : transfert vers le moins chargé des voisins");
1122 CRITICAL0 (" 1 : transfert d'une unité de charge à chaque voisin moins chargé");
1123 CRITICAL0 (" A : transfert selon règles données dans thèse AM");
1124 CRITICAL0 (" L : répartition équilibrée sur voisins ayant moins de charge");
1125 CRITICAL0 (" que la moyenne de la source et des destinataires");
1126 CRITICAL0 (" l : ?");
1128 CRITICAL1 ("Exemple:\n\t%s -p platform_equil.xml -d deployment_ligne.xml -g 4 -t 1000 -T 100000 -i 250 -I 1000 -f 0.005 -a 0.5 -u -l -L -V\n",argv[0]);
1133 res = test_all(nomPF,nomDP);
1134 INFO1("Total simulation time: %le", MSG_get_clock());
1137 if(res==MSG_OK) return 0;
1141 double ordoOpt(m_host_t *listeHotes)
1143 int i,j,nb=MSG_get_host_number();
1144 vector<double> ordo;
1146 double min,tmp,ret=0;
1150 for(j=0;j<nbIt;++j){
1151 min=ordo[0]+flopsTache/MSG_get_host_speed(listeHotes[0]);
1154 tmp=ordo[i]+flopsTache/MSG_get_host_speed(listeHotes[i]);
1168 INFO3("Proc %d : %d %f",i+1,(int)rint(ordo[i]*MSG_get_host_speed(listeHotes[i])/flopsTache),ordo[i]);
1175 bool lireListe(char *nom, vector<int> &valsAleat)
1184 valsAleat.resize(nb);
1185 for(i=0; i<nb; ++i){
1186 fic >> valsAleat[i];
1197 static int indice = 0;
1199 if(valsAleat.size() > 0){
1200 int val = valsAleat[indice];
1201 indice = (indice + 1) % valsAleat.size();