]> AND Private Git Repository - loba.git/blobdiff - process.cpp
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Use MSG_comm_waitall for communicator::flush(true).
[loba.git] / process.cpp
index 6d791bdc94e1c6a3a99cfca44ad0b24a57beb155..57c06612ad84bb9cffaa07123e8c113b0148fad9 100644 (file)
@@ -44,6 +44,8 @@ process::process(int argc, char* argv[])
     close_received = false;
     finalizing = false;
 
+    comp_iter = lb_iter = 0;
+
     e_xbt_log_priority_t logp = xbt_log_priority_verbose;
     if (!LOG_ISENABLED(logp))
         return;
@@ -81,10 +83,8 @@ int process::run()
     double next_iter_after_date = 0.0;
     INFO1("Initial load: %g", real_load);
     VERB0("Starting...");
-    comp_iter = lb_iter = 0;
     while (true) {
-        double ld = lb_load();
-        if (ld > 0.0) {
+        if (get_load() > 0.0) {
             double now = MSG_get_clock();
             if (now < next_iter_after_date)
                 MSG_process_sleep(next_iter_after_date - now);
@@ -101,11 +101,10 @@ int process::run()
                           lb_iter, real_load);
             }
 
-            ld -= load_balance(ld);
+            load_balance();
 
             print_loads(true, xbt_log_priority_debug);
         }
-        lb_load() = ld;
 
         // send load information, and load (data) if any
         send_all();
@@ -130,7 +129,7 @@ int process::run()
         // block on receiving unless there is something to compute or
         // to send
         double timeout;
-        if (real_load != 0 || lb_load() != prev_load_broadcast)
+        if (real_load != 0 || get_load() != prev_load_broadcast)
             timeout = 0.0;
         else if (opt::min_iter_duration)
             timeout = opt::min_iter_duration;
@@ -174,11 +173,10 @@ int process::run()
     return 0;
 }
 
-double process::load_balance(double /*my_load*/)
+void process::load_balance()
 {
     if (lb_iter == 1)           // warn only once
         WARN0("process::load_balance() is a no-op!");
-    return 0.0;
 }
 
 void process::compute()
@@ -196,6 +194,13 @@ void process::compute()
     }
 }
 
+void process::send(neighbor& nb, double amount)
+{
+    set_load(get_load() - amount);
+    nb.set_to_send(nb.get_to_send() + amount);
+    nb.set_load(nb.get_load() + amount); // fixme: make this optional?
+}
+
 void process::send1_no_bookkeeping(neighbor& nb)
 {
     if (real_load != prev_load_broadcast)
@@ -310,10 +315,10 @@ void process::finalize()
     std::for_each(neigh.begin(), neigh.end(),
                   bind(&process::finalize1, this, _1));
 
-    DEBUG2("wait for CLOSE from %lu neighbor%s",
-           (unsigned long )neigh.size(), ESSE(neigh.size()));
     while (may_receive()) {
         comm.flush(false);
+        DEBUG2("waiting for %d CTRL and %d DATA CLOSE",
+               ctrl_close_pending, data_close_pending);
         receive(-1.0);
     }