]> AND Private Git Repository - loba.git/commitdiff
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
merge de loba_fairstrategy
authorRaphael Couturier <raphael.couturier@univ-fcomte.fr>
Sun, 30 Jan 2011 13:35:19 +0000 (14:35 +0100)
committerRaphael Couturier <raphael.couturier@univ-fcomte.fr>
Sun, 30 Jan 2011 13:35:19 +0000 (14:35 +0100)
Conflicts:
loba_fairstrategy.cpp

23 files changed:
.gitignore
BUGS [new file with mode: 0644]
Makefile
README
TODO
communicator.cpp
communicator.h
extract.pl [new file with mode: 0755]
loba_fairstrategy.cpp
loba_fairstrategy.h
loba_simple.cpp
loba_simple.h
main.cpp
neighbor.cpp
neighbor.h
options.cpp
options.h
process.cpp
process.h
statistics.h [new file with mode: 0644]
tracing.h [new file with mode: 0644]
valgrind_suppressions_3.5
version.cpp

index 1b9d7b76c20ab9aae043f6f16bcf1405659f983f..9dea7c15d1a0d2e7e5353c21b062ca05a5ec8621 100644 (file)
@@ -2,7 +2,7 @@
 *.d
 *.o
 
-cachegrind.out.*
+callgrind.out.*
 gmon.out
 
 core
@@ -12,8 +12,12 @@ localversion
 misc_autogen.h
 
 loba
+loba-dev
+loba-stable
 simple_async
 
+*_dev.xml
+
 simgrid-dev
 simgrid-stable
 */
diff --git a/BUGS b/BUGS
new file mode 100644 (file)
index 0000000..5cc1040
--- /dev/null
+++ b/BUGS
@@ -0,0 +1,17 @@
+========================================================================
+Il semblerait qu'il y ait un bug dans SG 3.5, et qu'on ne puisse pas
+utiliser MSG_comm_waitany() pour l'émetteur *et* le récepteur sans
+risquer d'interblocage.
+
+Le problème devrait être contourné correctement depuis le commit
+cd6b253 Use MSG_comm_waitall for communicator::flush(true).
+
+========================================================================
+Avec SG 3.5, les communications doivent être détruites dès que
+possible avec MSG_comm_destroy().  Si ce n'est pas fait, la simulation
+peut être extrêmement ralentie.
+
+Le problème devrait être contourné correctement depuis le commit
+404a8d5 Do not call flush automatically in communcator::send...
+
+========================================================================
index a80f02591c031fd33f698180d554b7d246f10c03..276053128325715e7761bb88b7c0245a81846888 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -6,11 +6,16 @@ DEBUG_FLAGS += -g
 #DEBUG_FLAGS += -pg
 CHECK_FLAGS += -Wall -Wextra
 
+CC := gcc
 CXX := g++
 
 CPPFLAGS += -I $(SIMGRID_INSTALL_DIR)/include
 CPPFLAGS += $(CHECK_FLAGS)
 
+#CFLAGS += -std=c99
+#CFLAGS += -fgnu89-inline      # workaround simgrid bug
+CFLAGS += $(OPTIM_FLAGS) $(DEBUG_FLAGS)
+
 #CXXFLAGS += -std=c++0x
 CXXFLAGS += $(OPTIM_FLAGS) $(DEBUG_FLAGS)
 
@@ -46,13 +51,25 @@ FLAVOURED_LOBA := loba-dev loba-stable
 TARGETS := $(DEFAULT_TARGETS)  \
           simple_async
 
-.PHONY: all full clean realclean $(FLAVOURED_LOBA)
+XML_FILES =                                            \
+       Dep.xml  Plat.xml                               \
+       platform.xml deployment.xml simple_async.xml    \
+       cluster1000.xml
+
+XML_DEV_FILES = $(XML_FILES:%.xml=%_dev.xml)
+
+.PHONY: all full xml clean realclean $(FLAVOURED_LOBA)
 
 all: $(DEFAULT_TARGETS)
 
 full:
-       $(MAKE) $(FLAVOURED_LOBA)
-       $(MAKE) $(TARGETS)
+       @for target in $(FLAVOURED_LOBA); do    \
+               echo $(MAKE) "$$target";        \
+               $(MAKE) "$$target";             \
+       done
+       $(MAKE) xml $(DEFAULT_TARGETS)
+
+xml: $(XML_DEV_FILES)
 
 clean:
        $(RM) core core.[0-9]* vgcore.[0-9]*
@@ -64,10 +81,14 @@ clean:
 
 realclean: clean
        $(RM) $(FLAVOURED_LOBA)
+       $(RM) $(XML_DEV_FILES)
        $(RM) *~
 
 %.d: %.cpp ; $(MAKEDEPEND.CXX)
 
+%_dev.xml: %.xml
+       sed '/DOCTYPE/s,simgrid.dtd,http://simgrid.gforge.inria.fr/&,' $< > $@
+
 $(FLAVOURED_LOBA):
        $(MAKE) clean
        $(MAKE) SIMGRID_INSTALL_DIR=./simgrid-$(patsubst loba-%,%,$@) loba
diff --git a/README b/README
index d34cb4bf903697082eabe3d3d8a25e7fe9b7408c..bf2d9157cf6271835dde203c847cf9c3560a2c5f 100644 (file)
--- a/README
+++ b/README
@@ -3,6 +3,7 @@ Contenu
 * Compilation de SimGrid
 * Compilation...
 * Utilisation
+* Tracé de courbes
 * Communications
 * Pour ajouter un nouvel algorithme d'équilibrage
 * Pour ajouter une nouvelle option au programme
@@ -45,6 +46,16 @@ Pour changer le niveau de détail des affichages :
 Pour plus de détail sur les options de logging :
     http://simgrid.gforge.inria.fr/doc/group__XBT__log.html#log_use
 
+Tracé de courbes
+================
+
+Le script extract.pl permet d'extraire les données à partir des traces
+de simulation et de le présenter sous un format acceptable par gnuplot
+ou par graph (plotutils).
+
+Exemple:
+        ./loba platform.xml 2>&1 | ./extract.pl | graph -CTX
+
 Communications
 ==============
 
@@ -81,15 +92,12 @@ Pour ajouter un nouvel algorithme d'équilibrage
    - définir une nouvelle classe dérivant de process
    - attention, il faut construire le process explicitement
    - redéfinir la méthode load_balance qui :
-     - reçoit en paramètre la charge à prendre en compte ;
+     - peut récupérer la charge courante avec get_load()
      - peut utiliser et éventuellement réordonner le tableau process::pneigh ;
      - peut récupérer l'information de charge d'un voisin avec
            pneigh[i]->get_load() ;
      - définit la charge à envoyer avec
-           pneigh[i]->set_to_send(quantité) ;
-     - retourne la somme des quantités définies avec set_to_send, 
-       éventuellement à l'aide de la méthode process::sum_of_to_send()
-       qui clacule cette somme.
+           send(pneigh[i], quantité) ;
 
 2. Ajouter l'algorithme dans la liste des options.  Dans options.cpp :
    - faire le #include adéquat ;
@@ -141,6 +149,8 @@ Liste de fichiers
     loba_simple.h               équilibrage simple
     loba_simple.cpp             (à imiter pour ajouter d'autres algorithmes)
 
+    loba_*.{h,cpp}              autres algos d'équilibrage
+
     main.cpp                    le programme principal
 
     misc.h                      divers trucs inclassables
@@ -167,6 +177,21 @@ Liste de fichiers
     version.h                   gestion de la version du programme
     version.cpp
 
+* fichiers auto-générés
+
+    misc_autogen.h              définition des macros XCLOG(...)
+
+* scripts
+
+    colorized-loba              script pour exécuter loba en colorant les
+                                sorties
+
+    extract.pl                  outil d'extraction des données à partir des
+                                traces, pour tracer des courbes
+
+    setlocalversion             calcule un numéro de version à partir du hash
+                                du dernier commit (git)
+
 * autres fichiers
 
     .gitignore                  liste des fichiers ignorés par git
diff --git a/TODO b/TODO
index 6fedf482ad7723201ad91a62d37e13d65beec43d..dd423e27ebf92cd463313c4b19bbc0f9c2495040 100644 (file)
--- a/TODO
+++ b/TODO
@@ -1,8 +1,13 @@
+* review receive with timeout.
+
 * verify bookkeeping version.
 
-* add options -j/-J : minimum number of iterations ?
+* add several metrics
+  - message exchanges : number/volume of sent/received data/ctrl messages
+
+* add options -j/-J : minimum number of iterations?
 
-* add a variant to (not) change neighbor load information at send.
+* add a variant to (not) change neighbor load information at send?
 
 * implement loba_* algorithms (start with some trivial one)
 
index 0f9bc7eabb8cac225cf4acf9b0cfff85bc161ae9..e30d73187dc8e184ece920423ed33cca03e16454 100644 (file)
@@ -9,6 +9,8 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(comm);
 
 #include "misc.h"
 #include "options.h"
+#include "simgrid_features.h"
+#include "tracing.h"
 
 #include "communicator.h"
 
@@ -21,13 +23,10 @@ std::string message::to_string()
     return oss.str();
 }
 
-int communicator::send_count_before_flush = 4;
-
 communicator::communicator()
-    : host((hostdata* )MSG_host_get_data(MSG_host_self()))
+    : host(static_cast<hostdata*>(MSG_host_get_data(MSG_host_self())))
     , mutex(xbt_mutex_init())
     , cond(xbt_cond_init())
-    , send_counter(0)
     , ctrl_task(NULL)
     , ctrl_comm(NULL)
     , data_task(NULL)
@@ -37,6 +36,7 @@ communicator::communicator()
     receiver_process =
         MSG_process_create("receiver", communicator::receiver_wrapper,
                            this, MSG_host_self());
+    xbt_cond_wait(cond, mutex); // wait for the receiver to be ready
     xbt_mutex_release(mutex);
 }
 
@@ -81,22 +81,33 @@ void communicator::send(const char* dest, message* msg)
     if (msg->get_type() == message::LOAD)
         msg_size += opt::comm_cost(msg->get_amount());
     m_task_t task = MSG_task_create("message", 0.0, msg_size, msg);    
+    TRACE_msg_set_task_category(task,
+                                msg->get_type() == message::LOAD ?
+                                TRACE_CAT_DATA : TRACE_CAT_CTRL);
     msg_comm_t comm = MSG_task_isend(task, dest);
     sent_comm.push_back(comm);
-
-    if (++send_counter >= send_count_before_flush) {
-        flush(false);
-        send_counter = 0;
-    }
 }
 
-bool communicator::recv(message*& msg, m_host_t& from, bool wait)
+bool communicator::recv(message*& msg, m_host_t& from, double timeout)
 {
-    if (wait) {
+    if (timeout != 0) {
+        volatile double deadline =
+            timeout > 0 ? MSG_get_clock() + timeout : 0.0;
         xbt_mutex_acquire(mutex);
-        while (received.empty()) {
+        while (received.empty() && (!deadline || deadline > MSG_get_clock())) {
+            xbt_ex_t e;
             DEBUG0("waiting for a message to come");
-            xbt_cond_wait(cond, mutex);
+            TRY {
+                if (deadline)
+                    xbt_cond_timedwait(cond, mutex, deadline - MSG_get_clock());
+                else
+                    xbt_cond_wait(cond, mutex);
+            }
+            CATCH (e) {
+                if (e.category != timeout_error)
+                    RETHROW;
+                xbt_ex_free(e);
+            }
         }
         xbt_mutex_release(mutex);
     }
@@ -106,7 +117,7 @@ bool communicator::recv(message*& msg, m_host_t& from, bool wait)
 
     m_task_t task = received.front();
     received.pop();
-    msg = (message* )MSG_task_get_data(task);
+    msg = static_cast<message*>(MSG_task_get_data(task));
     from = MSG_task_get_source(task);
     MSG_task_destroy(task);
 
@@ -118,21 +129,14 @@ bool communicator::recv(message*& msg, m_host_t& from, bool wait)
 
 void communicator::flush(bool wait)
 {
-    using std::tr1::bind;
-    using std::tr1::placeholders::_1;
-
     sent_comm.remove_if(comm_test_n_destroy);
     if (wait && !sent_comm.empty()) {
-        xbt_dynar_t comms = xbt_dynar_new(sizeof(msg_comm_t), NULL);
-        while (!sent_comm.empty()) {
-            std::for_each(sent_comm.begin(), sent_comm.end(),
-                          bind(xbt_dynar_push,
-                               comms, bind(misc::address<msg_comm_t>(), _1)));
-            MSG_comm_waitany(comms);
-            xbt_dynar_reset(comms);
-            sent_comm.remove_if(comm_test_n_destroy);
-        }
-        xbt_dynar_free(&comms);
+        msg_comm_t comms[sent_comm.size()];
+        std::copy(sent_comm.begin(), sent_comm.end(), comms);
+        MSG_comm_waitall(comms, sent_comm.size(), -1.0);
+        if (!MSG_WAIT_DESTROYS_COMMS)
+            std::for_each(sent_comm.begin(), sent_comm.end(), MSG_comm_destroy);
+        sent_comm.clear();
     }
 }
 
@@ -148,7 +152,7 @@ bool communicator::comm_test_n_destroy(msg_comm_t comm)
 int communicator::receiver_wrapper(int, char* [])
 {
     communicator* comm;
-    comm = (communicator* )MSG_process_get_data(MSG_process_self());
+    comm = static_cast<communicator*>(MSG_process_get_data(MSG_process_self()));
     int result = comm->receiver();
 
     DEBUG0("terminate");
@@ -164,6 +168,11 @@ int communicator::receiver()
 {
     ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox());
     data_comm = MSG_task_irecv(&data_task, get_data_mbox());
+    DEBUG0("receiver ready");
+    xbt_mutex_acquire(mutex);
+    xbt_cond_signal(cond);      // signal master that we are ready
+    xbt_mutex_release(mutex);
+
     xbt_dynar_t comms = xbt_dynar_new(sizeof(msg_comm_t), NULL);
     while (ctrl_comm || data_comm) {
 
@@ -177,7 +186,9 @@ int communicator::receiver()
         if (ctrl_comm && comm_test_n_destroy(ctrl_comm)) {
             if (strcmp(MSG_task_get_name(ctrl_task), "finalize")) {
                 DEBUG0("received message from ctrl");
+                xbt_mutex_acquire(mutex);
                 received.push(ctrl_task);
+                xbt_mutex_release(mutex);
                 ctrl_task = NULL;
                 ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox());
             } else {
@@ -191,7 +202,9 @@ int communicator::receiver()
         if (data_comm && comm_test_n_destroy(data_comm)) {
             if (strcmp(MSG_task_get_name(data_task), "finalize")) {
                 DEBUG0("received message from data");
+                xbt_mutex_acquire(mutex);
                 received.push(data_task);
+                xbt_mutex_release(mutex);
                 data_task = NULL;
                 data_comm = MSG_task_irecv(&data_task, get_data_mbox());
             } else {
@@ -202,7 +215,8 @@ int communicator::receiver()
             }
         }
         xbt_mutex_acquire(mutex);
-        xbt_cond_signal(cond);
+        if (!received.empty())
+            xbt_cond_signal(cond);
         xbt_mutex_release(mutex);
     }
     xbt_dynar_free(&comms);
index b04301ad87574aff718d3fc7b8539d1973183bab..66dc800b4feeea9502902348847fa85757e41682 100644 (file)
@@ -9,14 +9,6 @@
 #include <msg/msg.h>
 #include "hostdata.h"
 
-// Cannot include "options.h" without error, so only declare the
-// needed functions.
-namespace opt {
-    bool parse_args(int* argc, char* argv[]);
-    void print();
-    void usage();
-}
-
 class message {
 public:
     enum message_type { INFO, CREDIT, LOAD, CTRL_CLOSE, DATA_CLOSE };
@@ -42,8 +34,9 @@ public:
     void send(const char* dest, message* msg);
 
     // Try to get a message.  Returns true on success.
-    // If "wait" is true, blocks until success.
-    bool recv(message*& msg, m_host_t& from, bool wait);
+    // Parameter "timeout" may be 0 for non-blocking operation, -1 for
+    // infinite waiting, or any positive timeout.
+    bool recv(message*& msg, m_host_t& from, double timeout);
 
     // Try to flush pending sending communications.
     // If "wait" is true, blocks until success.
@@ -59,8 +52,6 @@ private:
 
     // List of pending send communications
     std::list<msg_comm_t> sent_comm;
-    static int send_count_before_flush;
-    int send_counter;
 
     // Queue of received messages
     std::queue<m_task_t> received;
@@ -83,12 +74,6 @@ private:
 
     // Used to test if a communication is over, and to destroy it if it is
     static bool comm_test_n_destroy(msg_comm_t comm);
-
-    // Make opt::* functions our friends to provide them an access to
-    // send_count_before_flush
-    friend bool opt::parse_args(int*, char* []);
-    friend void opt::print();
-    friend void opt::usage();
 };
 
 #endif // !COMMUNICATOR_H
diff --git a/extract.pl b/extract.pl
new file mode 100755 (executable)
index 0000000..fa393b9
--- /dev/null
@@ -0,0 +1,106 @@
+#!/usr/bin/env perl
+
+use strict;
+use warnings;
+
+my $bookkeeping;
+my $flt = '[+-]?\d+(?:\.\d+)?(?:[eE][+-]?\d+)?';
+my $pflt = "($flt)";
+my $prefix = '^\[([^: ]+)(?::loba:\(\d+\))? ' . $pflt . '\] \[proc/INFO\] ';
+my $initmatch = $prefix . 'Initial load: ' . $pflt . '';
+my $finalmatch;
+my $plainmatch;
+
+my %alldata = ();
+
+while (<>) {
+    chomp;
+    if (s{^(?:\[0\.0+\] )?\[main/INFO\] \| bookkeeping\.*: }{}) {
+        $bookkeeping = $_ eq "on";
+        if ($bookkeeping) {
+            $finalmatch = $prefix .
+                'Final load after (\d+):(\d+) iterations: ' . $pflt .
+                ' ; expected: ' . $pflt;
+            $plainmatch = $prefix .
+                '\((\d+):(\d+)\) current load: ' . $pflt .
+                ' ; expected: ' . $pflt;
+        } else {
+            $finalmatch = $prefix .
+                'Final load after (\d+) iterations: ' . $pflt;
+            $plainmatch = $prefix . '\((\d+)\) current load: ' . $pflt;
+        }
+        if (0) {
+            print STDERR "BOOKKEEPING: \"$_\" ($bookkeeping)\n";
+            print STDERR "INITMATCH..: \"$initmatch\"\n";
+            print STDERR "PLAINMATCH.: \"$plainmatch\"\n";
+            print STDERR "FINALMATCH.: \"$finalmatch\"\n";
+        }
+    }
+    next if not defined $bookkeeping;
+    if (m{$plainmatch}) {
+        my $host = $1;
+        my $data;
+        if ($bookkeeping) {
+            $data = {
+                time     => $2,
+                lb       => $3,
+                comp     => $4,
+                load     => $5,
+                expected => $6,
+            };
+        } else {
+            $data = {
+                time     => $2,
+                lb       => $3,
+                comp     => $3,
+                load     => $4,
+                expected => $4,
+            };
+        }
+#        print STDERR "PUSH $host $data->{time} $data->{load} (plain)\n";
+        push @{$alldata{$host}}, $data;
+    } if (m{$initmatch}) {
+        my $host = $1;
+        my $data = {
+            time     => $2,
+            lb       => 0,
+            comp     => 0,
+            load     => $3,
+            expected => $3,
+        };
+#        print STDERR "PUSH $host $data->{time} $data->{load} (init)\n";
+        push @{$alldata{$host}}, $data;
+    } elsif (m{$finalmatch}) {
+        my $host = $1;
+        my $data;
+        if ($bookkeeping) {
+            $data = {
+                time     => $2,
+                lb       => $3,
+                comp     => $4,
+                load     => $5,
+                expected => $6,
+            };
+         } else {
+             $data = {
+                time     => $2,
+                lb       => $3,
+                comp     => $3,
+                load     => $4,
+                expected => $4,
+             };
+         }
+#        print STDERR "PUSH $host $data->{time} $data->{load} (final)\n";
+        push @{$alldata{$host}}, $data;
+    }
+}
+
+foreach my $host (sort(keys %alldata)) {
+#    print STDERR "GOT \"$host\"\n";
+    my $datalist = $alldata{$host};
+    print "# $host\n";
+    foreach my $data (@{$datalist}) {
+        print "$data->{time} $data->{load}\n";
+    }
+    print "\n"
+}
index e1a0d29472c5fbc7eef11f8fcbcf7b6fbc0d0ac1..7e02a21361c646ea6aabc18e084d7233cc2ab219 100644 (file)
@@ -11,39 +11,31 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(loba);
  */
 
 class compare {
-public : 
-    bool operator()(const neighbor*a, const neighbor*b)        {
-        return a->get_load()>b->get_load();
+public:
+    bool operator()(const neighbor*a, const neighbor*b) {
+        return a->get_load() > b->get_load();
     }
 };
 
-double loba_fairstrategy::load_balance(double my_load)
+void loba_fairstrategy::load_balance()
 {
     std::sort(pneigh.begin(), pneigh.end(), compare());
 
                // print_loads_p();
+    //print_loads_p(false, xbt_log_priority_debug);
 
-    double sum_sent=0;
-    bool found=true;
-    
-    while(found) {
-        found=false;
+    bool found = true;
+
+    while (found) {
+        found = false;
         for (unsigned i = 0 ; i < pneigh.size() ; ++i) {
-            double l = pneigh[i]->get_load();
-            if (l >= my_load)
-                continue;
-            if (l < my_load+2) {
-                found=true;
-                pneigh[i]->add_load(1);
-                pneigh[i]->add_to_send(1);
-                INFO1("sent to %s",pneigh[i]->get_name());
-                my_load--;
-                sum_sent++;
+            if (pneigh[i]->get_load() <= get_load() - 2) {
+                found = true;
+                send(pneigh[i], 1);
+                DEBUG1("sent to %s", pneigh[i]->get_name());
             }
         }
     }
-
-    return sum_sent;
 }
 
 // Local variables:
index 9984c367300bb789c5874dd13a20c0395d4e18e1..3e09627c4b5536067dda1d436c5252d7d347e0d0 100644 (file)
@@ -9,7 +9,7 @@ public:
     ~loba_fairstrategy()                                           { }
 
 private:
-    double load_balance(double my_load);
+    void load_balance();
 };
 
 #endif //!LOBA_SIMPLE
index 5702fba8a667fb76d6c982bd495769f1363adec2..2d4103cb0f5139c0ffc93c220ed563fbe3470a81 100644 (file)
@@ -8,15 +8,15 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(loba);
  *   load balance with a least-loaded neighbor,
  *   without breaking the ping-pong condition
  */
-double loba_simple::load_balance(double my_load)
+void loba_simple::load_balance()
 {
     int imin = -1;
     int imax = -1;
-    double min = my_load;
+    double min = get_load();
     double max = -1.0;
     for (unsigned i = 0 ; i < pneigh.size() ; ++i) {
         double l = pneigh[i]->get_load();
-        if (l >= my_load)
+        if (l >= get_load())
             continue;
         if (l < min) {
             imin = i;
@@ -29,13 +29,9 @@ double loba_simple::load_balance(double my_load)
     }
     if (imin != -1) {
         // found someone
-        double balance = (my_load - min) / 2;
-        DEBUG6("%d:%g %d:%g %g %g", imin, min, imax, max, my_load, balance);
-        pneigh[imin]->set_to_send(balance);
-        pneigh[imin]->add_load(balance);
-        return balance;
-    } else {
-        return 0.0;
+        double balance = (get_load() - max) / 2;
+        DEBUG6("%d:%g %d:%g %g %g", imin, min, imax, max, get_load(), balance);
+        send(pneigh[imin], balance);
     }
 }
 
index 1a2fd7366cde092e05270f806730bec7a7ea0ccf..77cd8adba882657334e702e7bffd87c29d3d0efb 100644 (file)
@@ -9,7 +9,7 @@ public:
     ~loba_simple()                                           { }
 
 private:
-    double load_balance(double my_load);
+    void load_balance();
 };
 
 #endif //!LOBA_SIMPLE
index 907869eaad4aa4d2d517160d8648200a84e7198e..fc42d750f9c992b8fe9a27b17f5aac7ec232b48c 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -1,11 +1,6 @@
-#include <algorithm>
-#include <cmath>
 #include <cstring>
-#include <tr1/functional>
 #include <iostream>
-#include <numeric>
 #include <stdexcept>
-#include <vector>
 #include <msg/msg.h>
 #include <xbt/log.h>
 
@@ -24,7 +19,9 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(main);
 #include "misc.h"
 #include "options.h"
 #include "process.h"
+#include "statistics.h"
 #include "timer.h"
+#include "tracing.h"
 #include "version.h"
 
 namespace {
@@ -37,9 +34,13 @@ namespace {
         EXIT_FAILURE_CLEAN = 0x08,  // error at cleanup
     };
 
-    std::vector<double> loads;
-    double load_stddev;
-    double load_avg;
+    xbt_mutex_t proc_mutex;
+    xbt_cond_t proc_cond;
+    unsigned proc_counter;
+
+    struct statistics comps;
+    struct statistics loads;
+
 }
 
 static int simulation_main(int argc, char* argv[])
@@ -48,8 +49,28 @@ static int simulation_main(int argc, char* argv[])
     process* proc;
     try {
         proc = opt::loba_algorithms.new_instance(opt::loba_algo, argc, argv);
+
+        xbt_mutex_acquire(proc_mutex);
+        ++proc_counter;
+        xbt_mutex_release(proc_mutex);
+
         result = proc->run();
-        loads.push_back(proc->get_load());
+
+        xbt_mutex_acquire(proc_mutex);
+        comps.push(proc->get_comp());
+        loads.push(proc->get_real_load());
+
+        // Synchronization barrier...
+        // The goal is to circumvent a limitation in SimGrid (at least
+        // in version 3.5): a process must be alive when another one
+        // destroys a communication they had together.
+
+        --proc_counter;
+        xbt_cond_broadcast(proc_cond);
+        while (proc_counter > 0)
+            xbt_cond_wait(proc_cond, proc_mutex);
+        xbt_mutex_release(proc_mutex);
+
         delete proc;
     }
     catch (std::invalid_argument& e) {
@@ -67,12 +88,12 @@ static void check_for_lost_load()
     double lost_ratio = 100.0 * lost / total_init;
     if (lost_ratio < -opt::load_ratio_threshold)
         CRITICAL2("Gained load at exit! %g (%g%%) <============",
-                  lost, lost_ratio);
+                  -lost, -lost_ratio);
     else if (lost_ratio > opt::load_ratio_threshold)
         CRITICAL2("Lost load at exit! %g (%g%%) <============",
                   lost, lost_ratio);
     else
-        DEBUG2("Total load at exit looks good: %g (%g%%)", lost, lost_ratio);
+        VERB2("Total load at exit looks good: %g (%g%%)", lost, lost_ratio);
 
     double total_running = process::get_total_load_running();
     double running_ratio = 100.0 * total_running / total_init;
@@ -83,27 +104,14 @@ static void check_for_lost_load()
         CRITICAL2("Remaining running load at exit! %g (%g%%) <============",
                   total_running, running_ratio);
     else
-        DEBUG2("Running load at exit looks good: %g (%g%%)",
+        VERB2("Running load at exit looks good: %g (%g%%)",
                total_running, running_ratio);
 }
 
-static void compute_load_imbalance()
-{
-    using std::tr1::bind;
-    using std::tr1::placeholders::_1;
-
-    unsigned n = loads.size();
-    load_avg = std::accumulate(loads.begin(), loads.end(), 0.0) / n;
-
-    std::vector<double> diff(loads);
-    std::transform(diff.begin(), diff.end(), diff.begin(),
-                   bind(std::minus<double>(), _1, load_avg));
-    double epsilon = std::accumulate(diff.begin(), diff.end(), 0.0);
-    double square_sum = std::inner_product(diff.begin(), diff.end(),
-                                           diff.begin(), 0.0);
-    double variance = (square_sum - (epsilon * epsilon) / n) / n;
-    load_stddev = sqrt(variance);
-}
+#define PR_STATS(descr, st)                                             \
+    INFO5("| %.*s: %g / %g / %g", 39,                                   \
+          descr " total/avg./stddev. at exit.........................", \
+          st.get_sum(), st.get_mean(), st.get_stddev())
 
 int main(int argc, char* argv[])
 {
@@ -175,15 +183,26 @@ int main(int argc, char* argv[])
             MSG_launch_application(opt::deployment_file.c_str());
         }
 
+        // Register tracing categories
+        TRACE_category(TRACE_CAT_COMP);
+        TRACE_category(TRACE_CAT_CTRL);
+        TRACE_category(TRACE_CAT_DATA);
+
         exit_status = EXIT_FAILURE_SIMU; // =====
 
+        proc_mutex = xbt_mutex_init();
+        proc_cond = xbt_cond_init();
+        proc_counter = 0;
+
         // Launch the MSG simulation.
         INFO1("Starting simulation at %f...", MSG_get_clock());
         res = MSG_main();
         simulated_time = MSG_get_clock();
         INFO1("Simulation ended at %f.", simulated_time);
-        check_for_lost_load();
-        compute_load_imbalance();
+
+        xbt_cond_destroy(proc_cond);
+        xbt_mutex_destroy(proc_mutex);
+
         if (res != MSG_OK)
             THROW1(0, 0, "MSG_main() failed with status %#x", res);
 
@@ -209,10 +228,13 @@ int main(int argc, char* argv[])
     // Report final simulation status.
     if (simulated_time >= 0.0) {
         simulation_time.stop();
+        check_for_lost_load();
         INFO0(",----[ Results ]");
-        INFO2("| Load avg./stddev. at exit.: %g / %g", load_avg, load_stddev);
-        INFO1("| Total simulated time......: %g", simulated_time);
-        INFO1("| Total simulation time.....: %g", simulation_time.duration());
+        PR_STATS("Load", loads);
+        PR_STATS("Computation", comps);
+        INFO1("| Total simulated time...................: %g", simulated_time);
+        INFO1("| Total simulation time..................: %g",
+              simulation_time.duration());
         INFO0("`----");
     }
     if (exit_status)
index c45fc8f3c4daa41627b3ec87f2a7ce72d9c85f9e..752378312ef2a4ee0fad36cefdb89e221aba8bfd 100644 (file)
@@ -8,7 +8,7 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(proc); // needed to compile neighbor.h
 #include "neighbor.h"
 
 neighbor::neighbor(const char* hostname)
-    : host((hostdata* )MSG_host_get_data(MSG_get_host_by_name(hostname)))
+    : host(static_cast<hostdata*>(MSG_host_get_data(MSG_get_host_by_name(hostname))))
     , load(std::numeric_limits<double>::infinity())
     , debt(0.0)
     , to_send(0.0)
index 62b9b66bf90488b95a5c53f509cbbdfcff281401..b51dab1e19f0ee5bceb796f63467f84d43356149 100644 (file)
@@ -18,7 +18,6 @@ public:
     // Getter and setter for load
     double get_load() const             { return load;    }
     void set_load(double amount)        { load = amount;  }
-    void add_load(double amount)        { load += amount; }
 
     // Getter and setter for debt
     double get_debt() const             { return debt;   }
@@ -27,7 +26,6 @@ public:
     // Getter and setter for to_send
     double get_to_send() const          { return to_send;    }
     void set_to_send(double amount)     { to_send = amount;  }
-    void add_to_send(double amount)     { to_send += amount; }
 
     // Prints its name and load on given category, with given
     // priority.  If verbose is true, prints debt and to_send too.
index fd9b8f7c435ba6c071b355aa40328f9fccb62b1f..fd9da4eb8b05dd0b3d1d0f0246dac670ee3a1422 100644 (file)
@@ -7,6 +7,8 @@
 
 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(main);
 
+#include "deployment.h"
+#include "process.h"
 #include "loba_simple.h"
 #include "loba_fairstrategy.h"
 
@@ -45,11 +47,16 @@ namespace opt {
     bool bookkeeping = false;
 
     // Application parameters
-    cost_func comp_cost("1e9, 0"); // fixme: find better defaults
-    cost_func comm_cost("1, 0"); // fixme: find better defaults
-    unsigned comp_maxiter = 10;  // fixme: find better defaults
-    unsigned lb_maxiter = comp_maxiter; // fixme: find better defaults
-    bool exit_on_close = false;
+    // fixme: find better defaults
+    cost_func comp_cost("1e9, 0");
+    cost_func comm_cost("1, 0");
+    double min_iter_duration = 1.0;
+
+    // Parameters for the end of the simulation
+    unsigned lb_maxiter = 0;
+    unsigned comp_maxiter = 0;
+    double time_limit = 0;
+    bool exit_on_close = true;
 
     // Named parameters lists
     loba_algorithms_type loba_algorithms;
@@ -105,7 +112,7 @@ const char* opt_helper::on_off(bool b)
 
 const char* opt_helper::descr(const char* str)
 {
-    const int descr_width = 35;
+    const int descr_width = 40;
     std::string& res = descr_str;
     res = str;
     res.resize(descr_width, '.');
@@ -170,7 +177,7 @@ bool opt::parse_args(int* argc, char* argv[])
     
     int c;
     opterr = 0;
-    while ((c = getopt(*argc, argv, "a:bc:C:ehi:I:l:L:N:s:T:vV")) != -1) {
+    while ((c = getopt(*argc, argv, "a:bc:C:ehi:I:l:L:N:s:t:T:vV")) != -1) {
         switch (c) {
         case 'a':
             opt::loba_algo = optarg;
@@ -180,10 +187,10 @@ bool opt::parse_args(int* argc, char* argv[])
                 && result;
             break;
         case 'b':
-            opt::bookkeeping = true;
+            opt::bookkeeping = !opt::bookkeeping;
             break;
         case 'e':
-            opt::exit_on_close = true;
+            opt::exit_on_close = !opt::exit_on_close;
             break;
         case 'h':
             opt::help_requested++;
@@ -195,12 +202,10 @@ bool opt::parse_args(int* argc, char* argv[])
             opt::comm_cost = cost_func(optarg);
             break;
         case 'i':
-            std::istringstream(optarg) >> opt::comp_maxiter;
+            std::istringstream(optarg) >> opt::lb_maxiter;
             break;
         case 'I':
-            std::istringstream(optarg) >> opt::lb_maxiter;
-            ERROR0("option -I not implemented yet");
-            result = false;
+            std::istringstream(optarg) >> opt::comp_maxiter;
             break;
         case 'l':
             std::istringstream(optarg) >> opt::log_rate;
@@ -212,7 +217,10 @@ bool opt::parse_args(int* argc, char* argv[])
             std::istringstream(optarg) >> opt::auto_depl::nhosts;
             break;
         case 's':
-            std::istringstream(optarg) >> communicator::send_count_before_flush;
+            std::istringstream(optarg) >> opt::min_iter_duration;
+            break;
+        case 't':
+            std::istringstream(optarg) >> opt::time_limit;
             break;
         case 'T':
             opt::auto_depl::topology = optarg;
@@ -264,29 +272,29 @@ void opt::print()
     INFO2("| %s: " format, h.descr(description), value)
 
     INFO0(",----[ Simulation parameters ]");
-    DESCR("log rate", "%s",          h.val_or_string(log_rate, "disabled"));
+    DESCR("log rate", "%s", h.val_or_string(log_rate, "disabled"));
     DESCR("platform file", "\"%s\"", platform_file.c_str());
     if (auto_depl::enabled) {
         INFO0("| automatic deployment enabled");
-        DESCR("- topology", "%s",          auto_depl::topology.c_str());
-        DESCR("- number of hosts", "%s",   h.val_or_string(auto_depl::nhosts,
-                                                           "auto"));
-        DESCR("- initial load", "%s",      h.val_or_string(auto_depl::load,
-                                                           "auto"));
+        DESCR("- topology", "%s", auto_depl::topology.c_str());
+        DESCR("- number of hosts", "%s", h.val_or_string(auto_depl::nhosts,
+                                                         "auto"));
+        DESCR("- initial load", "%s", h.val_or_string(auto_depl::load,
+                                                      "auto"));
     } else {
         DESCR("deployment file", "\"%s\"", deployment_file.c_str());
     }
-    DESCR("load balancing algorithm", "%s",     loba_algo.c_str());
-    DESCR("bookkeeping", "%s",                  h.on_off(bookkeeping));
-    DESCR("computation cost factors", "[%s]",   comp_cost.to_string().c_str());
+    DESCR("load balancing algorithm", "%s", loba_algo.c_str());
+    DESCR("bookkeeping", "%s", h.on_off(bookkeeping));
+    DESCR("computation cost factors", "[%s]", comp_cost.to_string().c_str());
     DESCR("communication cost factors", "[%s]", comm_cost.to_string().c_str());
-    DESCR("maximum number of comp. iterations", "%s",
-          h.val_or_string(comp_maxiter, "infinity"));
+    DESCR("minimum duration between iterations", "%g", min_iter_duration);
     DESCR("maximum number of lb. iterations", "%s",
           h.val_or_string(lb_maxiter, "infinity"));
-    DESCR("exit on close", "%s",                h.on_off(exit_on_close));
-    DESCR("send count before flush", "%d",
-          communicator::send_count_before_flush);
+    DESCR("maximum number of comp. iterations", "%s",
+          h.val_or_string(comp_maxiter, "infinity"));
+    DESCR("time limit", "%s", h.val_or_string(time_limit, "infinity"));
+    DESCR("exit on close", "%s", h.on_off(exit_on_close));
     INFO0("`----");
 
 #undef DESCR
@@ -322,45 +330,55 @@ void opt::usage()
 
     std::clog << "\nSimulation parameters\n";
     std::clog << o("-l value")
-              << "print current load every n-th iterations, 0 to disable"
-              << " (" << opt::log_rate << ")\n";
+              << "print current load every n lb iterations, 0 to disable"
+              << " [" << opt::log_rate << "]\n";
     std::clog << o("-v")
               << "verbose: do not override the default logging parameters\n";
 
     std::clog << "\nAutomatic deployment options\n";
     std::clog << o("-T name")
               << "enable automatic deployment with selected topology"
-              << " (" << opt::auto_depl::topology << ")\n";
+              << " [" << opt::auto_depl::topology << "]\n";
     if (opt::help_requested > 1)
         so_list(opt::topologies);
     std::clog << o("-L value")
               << "total load with auto deployment, 0 for number of hosts"
-              << " (" << opt::auto_depl::load << ")\n";
+              << " [" << opt::auto_depl::load << "]\n";
     std::clog << o("-N value")
-              << "number of hosts to use with auto deployment,"
-              << " 0 for max. (" << opt::auto_depl::nhosts << ")\n";
+              << "number of hosts to use with auto deployment, 0 for max."
+              << " [" << opt::auto_depl::nhosts << "]\n";
 
     std::clog << "\nLoad balancing algorithm\n";
     std::clog << o("-a name") << "load balancing algorithm"
-              << " (" << opt::loba_algo << ")\n";
+              << " [" << opt::loba_algo << "]\n";
     if (opt::help_requested > 1)
         so_list(opt::loba_algorithms);
-    std::clog << o("-b") << "enable bookkeeping\n";
+    std::clog << o("-b") << "toggle bookkeeping (\"virtual load\")"
+              << " [" << opt_helper::on_off(opt::bookkeeping) << "]\n";
 
     std::clog << "\nApplication parameters\n";
     std::clog << o("-c [fn,...]f0")
               << "polynomial factors for computation cost"
-              << " (" << opt::comp_cost.to_string() << ")\n";
+              << " [" << opt::comp_cost.to_string() << "]\n";
     std::clog << o("-C [fn,...]f0")
               << "polynomial factors for communication cost"
-              << " (" << opt::comm_cost.to_string() << ")\n";
-    std::clog << o("-e") << "exit on reception of \"close\" message\n";
+              << " [" << opt::comm_cost.to_string() << "]\n";
+    std::clog << o("-s value")
+              << "minimum duration between iterations"
+              << " [" << opt::min_iter_duration << "]\n";
+
+    std::clog << "\nParameters for the end of the simulation\n";
     std::clog << o("-i value")
-              << "maximum number of comp. iterations, 0 for infinity"
-              << " (" << opt::comp_maxiter << ")\n";
-    std::clog << o("-I value")
               << "maximum number of lb. iterations, 0 for infinity"
-              << " (" << opt::lb_maxiter << ")\n";
+              << " [" << opt::lb_maxiter << "]\n";
+    std::clog << o("-I value")
+              << "maximum number of comp. iterations, 0 for infinity"
+              << " [" << opt::comp_maxiter << "]\n";
+    std::clog << o("-t value")
+              << "time limit (simulated time), 0 for infinity"
+              << " [" << opt::time_limit << "]\n";
+    std::clog << o("-e") << "toggle exit on reception of \"close\" message"
+              << " [" << opt_helper::on_off(opt::exit_on_close) << "]\n";
 
     if (opt::help_requested < 3)
         return;
@@ -376,10 +394,7 @@ void opt::usage()
               << "        proc : messages from base process class\n"
               << "        loba : messages from load-balancer\n";
 
-    std::clog << "\nMiscellaneous low-level parameters\n";
-    std::clog << o("-s count")
-              << "check for finished comm. every `count' send operation"
-              << " (" << communicator::send_count_before_flush << ")\n";
+    // std::clog << "\nMiscellaneous low-level parameters\n";
 
 #undef so_list
 #undef so
index 37a4d8c71159873f99b7c691cc8a5359ed768da4..fe0df828d79247c0cf68d282213dec4d4b02b55a 100644 (file)
--- a/options.h
+++ b/options.h
@@ -3,9 +3,11 @@
 
 #include <string>
 #include "cost_func.h"
-#include "deployment.h"
 #include "named_object_list.h"
-#include "process.h"
+
+// These classes may use include options.h, so make forward declarations
+class deployment_generator;
+class process;
 
 // Global parameters, shared by all the processes
 namespace opt {
@@ -40,8 +42,12 @@ namespace opt {
     // Application parameters
     extern cost_func comp_cost;
     extern cost_func comm_cost;
-    extern unsigned comp_maxiter;
+    extern double min_iter_duration;
+
+    // Parameters for the end of the simulation
     extern unsigned lb_maxiter;
+    extern unsigned comp_maxiter;
+    extern double time_limit;
     extern bool exit_on_close;
 
     // Named parameters lists
index 53cbb59c7434da4a84beb5d2971e8c98ea7ae910..57c06612ad84bb9cffaa07123e8c113b0148fad9 100644 (file)
@@ -1,7 +1,6 @@
 #include <algorithm>
 #include <tr1/functional>
 #include <iterator>
-#include <numeric>
 #include <stdexcept>
 #include <sstream>
 #include <xbt/log.h>
@@ -11,6 +10,7 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(proc);
 
 #include "misc.h"
 #include "options.h"
+#include "tracing.h"
 
 #include "process.h"
 
@@ -20,7 +20,7 @@ double process::total_load_exit = 0.0;
 
 process::process(int argc, char* argv[])
 {
-    if (argc < 2 || !(std::istringstream(argv[1]) >> load))
+    if (argc < 2 || !(std::istringstream(argv[1]) >> real_load))
         throw std::invalid_argument("bad or missing initial load parameter");
 
     neigh.assign(argv + 2, argv + argc);
@@ -33,15 +33,19 @@ process::process(int argc, char* argv[])
         rev_neigh.insert(std::make_pair(host, ptr));
     }
 
-    prev_load_broadcast = -1;   // force sending of load on first send()
-    expected_load = load;
-    total_load_running += load;
-    total_load_init += load;
+    comp = 0.0;
+
+    prev_load_broadcast = -1;   // force sending of load on first send_all()
+    expected_load = real_load;
+    total_load_running += real_load;
+    total_load_init += real_load;
 
     ctrl_close_pending = data_close_pending = neigh.size();
     close_received = false;
     finalizing = false;
 
+    comp_iter = lb_iter = 0;
+
     e_xbt_log_priority_t logp = xbt_log_priority_verbose;
     if (!LOG_ISENABLED(logp))
         return;
@@ -60,70 +64,101 @@ process::process(int argc, char* argv[])
 
 process::~process()
 {
-    total_load_exit += load;
+    total_load_exit += real_load;
+    if (opt::bookkeeping) {
+        INFO4("Final load after %d:%d iterations: %g ; expected: %g",
+              lb_iter, comp_iter, real_load, expected_load);
+    } else {
+        INFO2("Final load after %d iterations: %g",
+              lb_iter, real_load);
+        if (lb_iter != comp_iter)
+            WARN2("lb_iter (%d) and comp_iter (%d) differ!",
+                  lb_iter, comp_iter);
+    }
+    VERB1("Total computation for this process: %g", comp);
 }
 
 int process::run()
 {
-    INFO1("Initial load: %g", load);
+    double next_iter_after_date = 0.0;
+    INFO1("Initial load: %g", real_load);
     VERB0("Starting...");
-    comp_iter = lb_iter = 0;
     while (true) {
-        if (load > 0.0) {
-            ++comp_iter;
-            if (opt::log_rate && comp_iter % opt::log_rate == 0) {
+        if (get_load() > 0.0) {
+            double now = MSG_get_clock();
+            if (now < next_iter_after_date)
+                MSG_process_sleep(next_iter_after_date - now);
+            next_iter_after_date = MSG_get_clock() + opt::min_iter_duration;
+
+            ++lb_iter;
+
+            if (opt::log_rate && lb_iter % opt::log_rate == 0) {
                 if (opt::bookkeeping)
                     INFO4("(%u:%u) current load: %g ; expected: %g",
-                          comp_iter, lb_iter, load, expected_load);
+                          lb_iter, comp_iter, real_load, expected_load);
                 else
                     INFO2("(%u) current load: %g",
-                          comp_iter, load);
+                          lb_iter, real_load);
             }
 
-            if (opt::bookkeeping)
-                expected_load -= load_balance(expected_load);
-            else
-                load -= load_balance(load);
+            load_balance();
 
             print_loads(true, xbt_log_priority_debug);
+        }
 
-            send();
+        // send load information, and load (data) if any
+        send_all();
+        if (real_load > 0.0) {
+            ++comp_iter;
             compute();
-
-        } else {
-            // send load information, and load when bookkeeping
-            send();
         }
 
-        if (opt::comp_maxiter && comp_iter >= opt::comp_maxiter)
+        if (opt::lb_maxiter && lb_iter >= opt::lb_maxiter) {
+            VERB2("Reached lb_maxiter: %d/%d", lb_iter, opt::lb_maxiter);
+            break;
+        }
+        if (opt::comp_maxiter && comp_iter >= opt::comp_maxiter) {
+            VERB2("Reached comp_maxiter: %d/%d", comp_iter, opt::comp_maxiter);
             break;
-        if (opt::lb_maxiter && lb_iter >= opt::lb_maxiter)
+        }
+        if (opt::time_limit && MSG_get_clock() >= opt::time_limit) {
+            VERB2("Reached time limit: %g/%g", MSG_get_clock(), opt::time_limit);
             break;
+        }
 
         // block on receiving unless there is something to compute or
         // to send
-        bool wait = (load == 0 &&
-                     ((opt::bookkeeping ? expected_load : load)
-                      == prev_load_broadcast));
-        receive(wait);
+        double timeout;
+        if (real_load != 0 || get_load() != prev_load_broadcast)
+            timeout = 0.0;
+        else if (opt::min_iter_duration)
+            timeout = opt::min_iter_duration;
+        else
+            timeout = 1.0;
+        receive(timeout);
 
         // one of our neighbor is finalizing
-        if (opt::exit_on_close && close_received)
+        if (opt::exit_on_close && close_received) {
+            VERB0("Close received");
             break;
+        }
 
         // have no load and cannot receive anything
-        if (load == 0.0 && !may_receive())
+        if (real_load == 0.0 && !may_receive()) {
+            VERB0("I'm a poor lonesome process, and I have no load...");
             break;
+        }
 
         // fixme: this check should be implemented with a distributed
         // algorithm, and not a shared global variable!
         // fixme: should this chunk be moved before call to receive() ?
         if (100.0 * total_load_running / total_load_init <=
             opt::load_ratio_threshold) {
-            VERB0("No more load to balance in system, stopping.");
+            VERB0("No more load to balance in system.");
             break;
+        } else {
+            DEBUG1("still %g load to balance, continuing...", total_load_running);
         }
-
     }
     VERB0("Going to finalize...");
     finalize();
@@ -135,48 +170,41 @@ int process::run()
      */
 
     VERB0("Done.");
-    INFO3("Final load after %d iteration%s: %g",
-          comp_iter, ESSE(comp_iter), load);
-    if (opt::bookkeeping)
-        INFO1("Expected load: %g", expected_load);
     return 0;
 }
 
-double process::sum_of_to_send() const
-{
-    using std::tr1::bind;
-    using std::tr1::placeholders::_1;
-    using std::tr1::placeholders::_2;
-
-    return std::accumulate(neigh.begin(), neigh.end(), 0.0,
-                           bind(std::plus<double>(),
-                                _1, bind(&neighbor::get_to_send, _2)));
-}
-
-double process::load_balance(double /*my_load*/)
+void process::load_balance()
 {
     if (lb_iter == 1)           // warn only once
-        WARN0("process::load_balance is a no-op!");
-    return 0.0;
+        WARN0("process::load_balance() is a no-op!");
 }
 
 void process::compute()
 {
-    if (load > 0.0) {
-        double duration = opt::comp_cost(load);
-        m_task_t task = MSG_task_create("computation", duration, 0.0, NULL);
-        DEBUG2("compute %g flop%s", duration, ESSE(duration));
+    if (real_load > 0.0) {
+        double flops = opt::comp_cost(real_load);
+        m_task_t task = MSG_task_create("computation", flops, 0.0, NULL);
+        TRACE_msg_set_task_category(task, TRACE_CAT_COMP);
+        DEBUG2("compute %g flop%s", flops, ESSE(flops));
         MSG_task_execute(task);
+        comp += flops;
         MSG_task_destroy(task);
     } else {
         DEBUG0("nothing to compute !");
     }
 }
 
+void process::send(neighbor& nb, double amount)
+{
+    set_load(get_load() - amount);
+    nb.set_to_send(nb.get_to_send() + amount);
+    nb.set_load(nb.get_load() + amount); // fixme: make this optional?
+}
+
 void process::send1_no_bookkeeping(neighbor& nb)
 {
-    if (load != prev_load_broadcast)
-        comm.send(nb.get_ctrl_mbox(), new message(message::INFO, load));
+    if (real_load != prev_load_broadcast)
+        comm.send(nb.get_ctrl_mbox(), new message(message::INFO, real_load));
     double load_to_send = nb.get_to_send();
     if (load_to_send > 0.0) {
         comm.send(nb.get_data_mbox(), new message(message::LOAD, load_to_send));
@@ -200,20 +228,20 @@ void process::send1_bookkeeping(neighbor& nb)
     } else {
         new_debt = nb.get_debt();
     }
-    if (load <= new_debt) {
-        load_to_send = load;
+    if (real_load <= new_debt) {
+        load_to_send = real_load;
         nb.set_debt(new_debt - load_to_send);
-        load = 0.0;
+        real_load = 0.0;
     } else {
         load_to_send = new_debt;
         nb.set_debt(0.0);
-        load -= load_to_send;
+        real_load -= load_to_send;
     }
     if (load_to_send > 0.0)
         comm.send(nb.get_data_mbox(), new message(message::LOAD, load_to_send));
 }
 
-void process::send()
+void process::send_all()
 {
     using std::tr1::bind;
     using std::tr1::placeholders::_1;
@@ -225,16 +253,18 @@ void process::send()
     } else {
         std::for_each(neigh.begin(), neigh.end(),
                       bind(&process::send1_no_bookkeeping, this, _1));
-        prev_load_broadcast = load;
+        prev_load_broadcast = real_load;
     }
+    comm.flush(false);
 }
 
-void process::receive(bool wait)
+void process::receive(double timeout)
 {
     message* msg;
     m_host_t from;
 
-    while (may_receive() && comm.recv(msg, from, wait)) {
+    DEBUG2("%sblocking receive (%g)", "\0non-" + !timeout, timeout);
+    while (may_receive() && comm.recv(msg, from, timeout)) {
         switch (msg->get_type()) {
         case message::INFO: {
             neighbor* n = rev_neigh[from];
@@ -246,7 +276,7 @@ void process::receive(bool wait)
             break;
         case message::LOAD: {
             double ld = msg->get_amount();
-            load += ld;
+            real_load += ld;
             if (finalizing)
                 total_load_running -= ld;
             break;
@@ -261,8 +291,9 @@ void process::receive(bool wait)
             break;
         }
         delete msg;
-        wait = false;           // only wait on first recv
+        timeout = 0.0;          // only wait on first recv
     }
+    comm.flush(false);
 }
 
 void process::finalize1(neighbor& nb)
@@ -277,17 +308,19 @@ void process::finalize()
     using std::tr1::placeholders::_1;
 
     finalizing = true;
-    total_load_running -= load;
+    total_load_running -= real_load;
 
     DEBUG2("send CLOSE to %lu neighbor%s",
            (unsigned long )neigh.size(), ESSE(neigh.size()));
     std::for_each(neigh.begin(), neigh.end(),
                   bind(&process::finalize1, this, _1));
 
-    DEBUG2("wait for CLOSE from %lu neighbor%s",
-           (unsigned long )neigh.size(), ESSE(neigh.size()));
-    while (may_receive())
-        receive(true);
+    while (may_receive()) {
+        comm.flush(false);
+        DEBUG2("waiting for %d CTRL and %d DATA CLOSE",
+               ctrl_close_pending, data_close_pending);
+        receive(-1.0);
+    }
 
     comm.flush(true);
 }
index b15f98297cc540481516b4fa0bd3052e5a4d57f2..24d4c395668b2d4209abaf6ddc5b4ebf8c315aa0 100644 (file)
--- a/process.h
+++ b/process.h
@@ -16,6 +16,7 @@
 #include <xbt/log.h>
 #include "communicator.h"
 #include "neighbor.h"
+#include "options.h"
 
 class process {
 public:
@@ -26,7 +27,8 @@ public:
     process(int argc, char* argv[]);
     virtual ~process();
 
-    double get_load() const                { return load; }
+    double get_comp() const                { return comp; }
+    double get_real_load() const           { return real_load; }
 
     int run();
 
@@ -37,8 +39,14 @@ protected:
     pneigh_type pneigh;         // list of pointers to neighbors that
                                 // we are free to reorder
 
-    // Returns the sum of "to_send" for all neighbors.
-    double sum_of_to_send() const;
+    // Get and set current load, which may be real load, or expected
+    // load if opt::bookkeeping is true.
+    double get_load() const;
+    void set_load(double load);
+
+    // Register some amount of load to send to given neighbor.
+    void send(neighbor& nb, double amount);
+    void send(neighbor* nb, double amount) { send(*nb, amount); }
 
     // Calls neighbor::print(verbose, logp, cat) for each member of neigh.
     void print_loads(bool verbose = false,
@@ -52,7 +60,7 @@ protected:
 
 private:
     static double total_load_init; // sum of process loads at init
-    static double total_load_running; // summ of loads while running
+    static double total_load_running; // sum of loads while running
     static double total_load_exit; // sum of process loads at exit
 
     typedef MAP_TEMPLATE<m_host_t, neighbor*> rev_neigh_type;
@@ -71,16 +79,15 @@ private:
     unsigned lb_iter;           // counter of load-balancing iterations
     unsigned comp_iter;         // counter of computation iterations
 
+    double comp;                // total computing done so far (flops)
+
     double prev_load_broadcast; // used to ensure that we do not send
                                 // a same information messages
-    double load;                // current load
+    double real_load;           // current load
     double expected_load;       // expected load in bookkeeping mode
 
     // The load balancing algorithm comes here...
-    // Parameter "my_load" is the load to take into account for myself
-    // (may be load or expected load).
-    // Returns the total load sent to neighbors.
-    virtual double load_balance(double my_load);
+    virtual void load_balance();
 
     // Virtually do some computation
     void compute();
@@ -88,13 +95,17 @@ private:
     // Send procedures, with helpers for bookkeeping mode or not
     void send1_no_bookkeeping(neighbor& nb);
     void send1_bookkeeping(neighbor& nb);
-    void send();
+    void send_all();
 
     // Returns true if there remains neighbors to listen for
-    bool may_receive() { return ctrl_close_pending || data_close_pending; }
+    bool may_receive() const {
+        return ctrl_close_pending || data_close_pending;
+    }
 
-    // Receive procedure: wait (or not) for a message to come
-    void receive(bool wait);
+    // Receive procedure
+    // Parameter "timeout" may be 0 for non-blocking operation, -1 for
+    // infinite waiting, or any positive timeout.
+    void receive(double timeout);
 
     // Finalize sends a "close" message to each neighbor and wait for
     // all of them to answer.
@@ -102,6 +113,24 @@ private:
     void finalize();
 };
 
+inline
+double process::get_load() const
+{
+    if (opt::bookkeeping)
+        return expected_load;
+    else
+        return real_load;
+}
+
+inline
+void process::set_load(double load)
+{
+    if (opt::bookkeeping)
+        expected_load = load;
+    else
+        real_load = load;
+}
+
 #endif // !PROCESS_H
 
 // Local variables:
diff --git a/statistics.h b/statistics.h
new file mode 100644 (file)
index 0000000..7e3fb2e
--- /dev/null
@@ -0,0 +1,41 @@
+#ifndef STATISTICS_H
+#define STATISTICS_H
+
+#include <cmath>
+#include <vector>
+
+class statistics {
+public:
+    statistics()
+        : count(0)
+        , sum(0.0)
+        , mean(0.0)
+        , sqdiff_sum(0.0)
+    { }
+
+    void push(double x) {
+        double delta = x - mean;
+        ++count;
+        sum += x;
+        mean = sum / count;
+        sqdiff_sum += delta * (x - mean);
+    }
+
+    unsigned get_count() const  { return count;                }
+    double get_sum() const      { return sum;                  }
+    double get_mean() const     { return mean;                 }
+    double get_variance() const { return sqdiff_sum / count;   }
+    double get_stddev() const   { return sqrt(get_variance()); }
+
+private:
+    int count;
+    double sum;                 // sum of x_i
+    double mean;                // mean of x_i
+    double sqdiff_sum;          // sum of (x_i - mean)^2
+};
+
+#endif // !STATISTICS_H
+
+// Local variables:
+// mode: c++
+// End:
diff --git a/tracing.h b/tracing.h
new file mode 100644 (file)
index 0000000..f283465
--- /dev/null
+++ b/tracing.h
@@ -0,0 +1,12 @@
+#ifndef TRACING_H
+#define TRACING_H
+
+#define TRACE_CAT_COMP "comp_task"
+#define TRACE_CAT_CTRL "ctrl_mesg"
+#define TRACE_CAT_DATA "data_mesg"
+
+#endif // !TRACING_H
+
+// Local variables:
+// mode: c++
+// End:
index 0baec30680a792b61c2b3a61d3385a30db51dd75..e1e255219084767f91a4d7df44e5ecef8bc72957 100644 (file)
@@ -8,3 +8,20 @@
    fun:MSG_create_environment
    ...
 }
+
+{
+   Memory leak in libc?
+   Memcheck:Leak
+   fun:malloc
+   fun:_dl_map_object_deps
+   fun:dl_open_worker
+   fun:_dl_catch_error
+   fun:_dl_open
+   fun:do_dlopen
+   fun:_dl_catch_error
+   fun:dlerror_run
+   fun:__libc_dlopen_mode
+   fun:init
+   fun:pthread_once
+   fun:backtrace
+}
index 344386d2db19faea5c60cafb8c24156d07db8832..6b6889abcba611a5c2b6a26fb25557838aa441a7 100644 (file)
@@ -14,7 +14,7 @@ namespace version {
     (__DATE__ " " __TIME__);
 
     const std::string copyright
-    ("Copyright (c) 2010, Arnaud Giersch <arnaud.giersch@univ-fcomte.fr>");
+    ("Copyright (c) 2010-2011, Arnaud Giersch <arnaud.giersch@univ-fcomte.fr>");
 
 }