]> AND Private Git Repository - loba.git/commitdiff
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Deadlock fix, and other changes.
authorArnaud Giersch <arnaud.giersch@iut-bm.univ-fcomte.fr>
Tue, 21 Dec 2010 16:26:44 +0000 (17:26 +0100)
committerArnaud Giersch <arnaud.giersch@iut-bm.univ-fcomte.fr>
Tue, 21 Dec 2010 17:03:50 +0000 (18:03 +0100)
Fix a deadlock occuring when there was no load anymore in the system.
Each remaining process had nothing to compute, nor to send to their
neighbors, and were blocked waiting for an incoming message.

The fix consists in:
* adding a shared global variable total_load_running, the sum of loads
  currently in the system ; and
* ensuring this variable is always up-to-date ; and
* making processes terminate if total_load_running is null.

The use of a global variable is not satisfactory, but it is good
enough for now.

It is also verified, at the end of the simulation, that
total_load_running is null.

Other important changes in this commit are:
* process::receive() now consumes all pending messages (it used to
  consume only one).
* The iteration number is only incremented when there is something to
  compute (load > 0.0).

Note: bookkeeping version may be broken.

TODO
communicator.cpp
main.cpp
options.cpp
options.h
process.cpp
process.h

diff --git a/TODO b/TODO
index fa61fbbe37f4870f8380500f5343fd61839755b5..1d5fe423b8e3bd59936f2f6c6a01a605101fb5d6 100644 (file)
--- a/TODO
+++ b/TODO
@@ -1,17 +1,14 @@
+* verify bookkeeping version.
 
-* fix deadlock bug with  ./loba  cluster1000.xml -N26 -i2
+* add a variant to (not) change neighbor load information at send.
 
 * implement loba_* algorithms (start with some trivial one)
 
-* fix process::run (see inline comments)
-
-* find a better 
-
-* add some statistics about load (im)balance at the end of the simulation
+* add some statistics about load (im)balance at the end of the simulation?
 
 * for automatic process topology,
    -> implement some random initial distribution of load
 
 * add synchronized mode
 
-* translate README file ?
+* translate README file?
index 054c98a6c53078d1dcd16b81be2ab889e1e9f588..1570b10148a02dbb2fdfa7a78444d2cae1309b6a 100644 (file)
@@ -20,7 +20,7 @@ std::string message::to_string()
     return oss.str();
 }
 
-const int communicator::send_count_before_flush = 16;
+const int communicator::send_count_before_flush = 4;
 
 communicator::communicator()
     : host((hostdata* )MSG_host_get_data(MSG_host_self()))
index fe584f725f6518347b6e4b096ff10198888bb8a0..01bf3ac34926c11459e62f4b925ce313851b7204 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -48,21 +48,31 @@ int simulation_main(int argc, char* argv[])
 
 void check_for_lost_load()
 {
-    const double threshold = 1e-4;
     double total_init = process::get_total_load_init();
+
     double total_exit = process::get_total_load_exit();
     double lost = total_init - total_exit;
-    double lost_ratio = 100 * lost / total_init;
-    if (lost_ratio < -threshold) {
+    double lost_ratio = 100.0 * lost / total_init;
+    if (lost_ratio < -opt::load_ratio_threshold)
         CRITICAL2("Gained load at exit! %g (%g%%) <============",
                   lost, lost_ratio);
-    } else if (lost_ratio > threshold) {
+    else if (lost_ratio > opt::load_ratio_threshold)
         CRITICAL2("Lost load at exit! %g (%g%%) <============",
                   lost, lost_ratio);
-    } else {
+    else
         DEBUG2("Total load at exit looks good: %g (%g%%)", lost, lost_ratio);
-    }
 
+    double total_running = process::get_total_load_running();
+    double running_ratio = 100.0 * total_running / total_init;
+    if (running_ratio < -opt::load_ratio_threshold)
+        CRITICAL2("Negative running load at exit! %g (%g%%) <============",
+                  total_running, running_ratio);
+    else if (running_ratio > opt::load_ratio_threshold)
+        CRITICAL2("Remaining running load at exit! %g (%g%%) <============",
+                  total_running, running_ratio);
+    else
+        DEBUG2("Running load at exit looks good: %g (%g%%)",
+               total_running, running_ratio);
 }
 
 int main(int argc, char* argv[])
index cf9d7b70bcfdbc6cd14bf08559981fb9d86768d3..2c8e61d672b7c82adfadbbf233805f06f193c225 100644 (file)
@@ -13,6 +13,12 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(main);
 
 namespace opt {
 
+    // Constants
+
+    // A sum of loads if considered null if it is less than
+    // load_ratio_threshold percent of the sum of loads at init.
+    const double load_ratio_threshold = 1e-4;
+
     // Global options
     std::string program_name;
     int help_requested = 0;
index e751535bd8f0b3a3a1e9cb2ede251e57a8818103..5c43e6f2962420d2d35e82047227efeb6afe85bd 100644 (file)
--- a/options.h
+++ b/options.h
@@ -10,6 +10,9 @@
 // Global parameters, shared by all the processes
 namespace opt {
 
+    // Constants
+    extern const double load_ratio_threshold;
+
     // Global options
     extern std::string program_name;
     extern int help_requested;
index a24d6ebcbfb5eae01f098611a3373a00f06a872f..8b316787b8ae1d8654e0f941337a64c993573f24 100644 (file)
@@ -15,6 +15,7 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(proc);
 #include "process.h"
 
 double process::total_load_init = 0.0;
+double process::total_load_running = 0.0;
 double process::total_load_exit = 0.0;
 
 process::process(int argc, char* argv[])
@@ -34,6 +35,7 @@ process::process(int argc, char* argv[])
 
     prev_load_broadcast = -1;   // force sending of load on first send()
     expected_load = load;
+    total_load_running += load;
     total_load_init += load;
 
     ctrl_close_pending = data_close_pending = neigh.size();
@@ -44,6 +46,7 @@ process::process(int argc, char* argv[])
     close_received = false;
     may_receive =  (neigh.size() > 0); // the same as (ctrl_close_pending ||
                                        //              data_close_pending)
+    finalizing = false;
     if (may_receive)
         comm.listen();
 
@@ -60,7 +63,7 @@ process::process(int argc, char* argv[])
         oss << neigh.back().get_name();
     }
     LOG1(logp, "Got %s.", oss.str().c_str());
-    print_loads(true, logp);
+    print_loads(false, logp);
 }
 
 process::~process()
@@ -73,52 +76,60 @@ int process::run()
     INFO1("Initial load: %g", load);
     VERB0("Starting...");
     iter = 0;
-    bool one_more = true;
-    do {
-        ++iter;
+    while (true) {
+        if (load > 0.0) {
+            ++iter;
+            if (opt::log_rate && iter % opt::log_rate == 0) {
+                if (opt::bookkeeping)
+                    INFO3("(%u) current load: %g ; expected: %g",
+                          iter, load, expected_load);
+                else
+                    INFO2("(%u) current load: %g",
+                          iter, load);
+            }
 
-        if (opt::log_rate && iter % opt::log_rate == 0) {
             if (opt::bookkeeping)
-                INFO3("(%u) current load: %g ; expected: %g",
-                      iter, load, expected_load);
+                expected_load -= load_balance(expected_load);
             else
-                INFO2("(%u) current load: %g",
-                      iter, load);
-        }
-        print_loads(true, xbt_log_priority_debug);
+                load -= load_balance(load);
 
-        if (opt::bookkeeping)
-            expected_load -= load_balance(expected_load);
-        else
-            load -= load_balance(load);
+            print_loads(true, xbt_log_priority_debug);
 
-        send();
-        compute();
+            send();
+            compute();
 
-// NDS for Need To Send
-#define NDS ((opt::bookkeeping ? expected_load : load) != prev_load_broadcast)
-        do {
-            // General idea: block on receiving unless there is
-            // something to compute, or to send, or we must exit.
+            if (opt::maxiter && iter >= opt::maxiter)
+                break;
+        } else {
+            // send load information, and load when bookkeeping
+            send();
+        }
 
-            // fixme: review this chunk, and remove this NDS macro!
+        // block on receiving unless there is something to compute or
+        // to send
+        bool recv_wait = (load == 0 &&
+                          ((opt::bookkeeping ? expected_load : load)
+                           == prev_load_broadcast));
+        DEBUG1("CALL RECEIVE(%s)", recv_wait? "WAIT": "NO_WAIT");
+        receive(recv_wait? WAIT: NO_WAIT);
 
-            // FIXME: HAD A DEADLOCK HERE...
+        // one of our neighbor is finalizing
+        if (opt::exit_on_close && close_received)
+            break;
 
-            bool recv_wait = (load == 0 && !NDS);
-            DEBUG1("CALL RECEIVE(%s)", recv_wait? "WAIT": "NO_WAIT");
-            receive(recv_wait? WAIT: NO_WAIT);
+        // have no load and cannot receive anything
+        if (load == 0.0 && !may_receive)
+            break;
 
-            if (opt::exit_on_close && close_received)
-                one_more = false;
-            else if (opt::maxiter && iter >= opt::maxiter)
-                one_more = false;
-            
-        } while (one_more && may_receive && load == 0 && !NDS);
-        DEBUG0("RECEIVE LOOP ENDED");
-#undef NDS
+        // fixme: this check should be implemented with a distributed
+        // algorithm, and not a shared global variable!
+        if (100.0 * total_load_running / total_load_init <=
+            opt::load_ratio_threshold) {
+            VERB0("No more load to balance in system, stopping.");
+            break;
+        }
 
-    } while (one_more);
+    }
     VERB0("Going to finalize...");
     finalize();
 
@@ -227,8 +238,8 @@ void process::receive(recv_wait_mode wait)
     //        "NO_WAIT\0WAIT\0\0\0\0WAIT_FOR_CLOSE" + 8 * wait);
     message* msg;
     m_host_t from;
-    bool do_loop = may_receive;
-    while (do_loop && comm.recv(msg, from, wait)) {
+    bool do_wait = (wait != NO_WAIT);
+    while (may_receive && comm.recv(msg, from, do_wait)) {
         switch (msg->get_type()) {
         case message::INFO: {
             neighbor* n = rev_neigh[from];
@@ -238,9 +249,13 @@ void process::receive(recv_wait_mode wait)
         case message::CREDIT:
             expected_load += msg->get_amount();
             break;
-        case message::LOAD:
-            load += msg->get_amount();
+        case message::LOAD: {
+            double ld = msg->get_amount();
+            load += ld;
+            if (finalizing)
+                total_load_running -= ld;
             break;
+        }
         case message::CTRL_CLOSE:
             if (--ctrl_close_pending == 1)
                 comm.next_close_on_ctrl_is_last();
@@ -256,14 +271,14 @@ void process::receive(recv_wait_mode wait)
         }
         delete msg;
         may_receive = (ctrl_close_pending || data_close_pending);
-        do_loop = (wait == WAIT_FOR_CLOSE) && may_receive;
+        do_wait = (wait == WAIT_FOR_CLOSE);
     }
 }
 
 void process::finalize1(neighbor& nb)
 {
     comm.send(nb.get_ctrl_mbox(), new message(message::CTRL_CLOSE, 0.0));
-    comm.send(nb.get_data_mbox(), new message(message::DATA_CLOSE, 0.0));    
+    comm.send(nb.get_data_mbox(), new message(message::DATA_CLOSE, 0.0));
 }
 
 void process::finalize()
@@ -271,6 +286,9 @@ void process::finalize()
     using namespace std::tr1;
     using namespace std::tr1::placeholders;
 
+    finalizing = true;
+    total_load_running -= load;
+
     DEBUG2("send CLOSE to %d neighbor%s.",
            (int )neigh.size(), ESSE(neigh.size()));
     std::for_each(neigh.begin(), neigh.end(),
index c30b24374e0aed763bb626ed6d087350b1dbc548..4e6ab240f8d73b6f3b15c9fe404568bb82bdc7e2 100644 (file)
--- a/process.h
+++ b/process.h
@@ -19,8 +19,9 @@
 
 class process {
 public:
-    static double get_total_load_init() { return total_load_init; }
-    static double get_total_load_exit() { return total_load_exit; }
+    static double get_total_load_init()    { return total_load_init;    }
+    static double get_total_load_running() { return total_load_running; }
+    static double get_total_load_exit()    { return total_load_exit;    }
 
     process(int argc, char* argv[]);
     virtual ~process();
@@ -48,8 +49,9 @@ protected:
                        xbt_log_category_t cat = _XBT_LOGV(default)) const;
 
 private:
-    static double total_load_init; // sum of neighbor loads at init
-    static double total_load_exit; // sum of neighbor loads at exit
+    static double total_load_init; // sum of process loads at init
+    static double total_load_running; // summ of loads while running
+    static double total_load_exit; // sum of process loads at exit
 
     typedef MAP_TEMPLATE<m_host_t, neighbor*> rev_neigh_type;
     neigh_type neigh;           // list of neighbors (do not alter
@@ -63,6 +65,7 @@ private:
                                 // on data channel
     bool close_received;        // true if we received a "close" message
     bool may_receive;           // true if there remains neighbors to listen for
+    bool finalizing;            // true when finalize() is running
 
     unsigned iter;              // counter of iterations