]> AND Private Git Repository - loba.git/blobdiff - process.cpp
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Be consistent and hold mutex in any case when returning from condition_t::timedwait().
[loba.git] / process.cpp
index dfbf1239a24b19ee80a5407cb9e51d03d9dec451..3413931f058c1603994fbfa1ece1da06ebb542ca 100644 (file)
@@ -6,7 +6,6 @@
 #include <stdexcept>
 #include <sstream>
 #include <xbt/log.h>
-#include <xbt/time.h>
 
 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(proc);
 
@@ -24,6 +23,7 @@ double process::total_load_exit = 0.0;
 
 int process::process_counter = 0;
 double process::total_load_average;
+double process::average_load_ratio;
 double process::load_diff_threshold;
 
 std::atomic<int> process::convergence_counter(0);
@@ -57,7 +57,7 @@ process::process(int argc, char* argv[])
     pneigh.reserve(neigh.size());
     for (unsigned i = 0 ; i < neigh.size() ; i++) {
         neighbor* ptr = &neigh[i];
-        m_host_t host = MSG_get_host_by_name(ptr->get_name());
+        msg_host_t host = MSG_get_host_by_name(ptr->get_name());
         pneigh.push_back(ptr);
         rev_neigh.insert(std::make_pair(host, ptr));
     }
@@ -75,8 +75,13 @@ process::process(int argc, char* argv[])
     total_load_init += real_load;
     total_load_running += real_load;
     total_load_average = total_load_running / process_counter;
+    if (opt::avg_load_ratio >= 0.0)
+        average_load_ratio = opt::avg_load_ratio;
+    else
+        average_load_ratio = 100.0 *
+            (process_counter / -opt::avg_load_ratio) / total_load_average;
     load_diff_threshold = (opt::load_ratio_threshold +
-                           opt::avg_load_ratio * total_load_average) / 100.0;
+                           average_load_ratio * total_load_average) / 100.0;
     proc_mutex->release();
 
     ctrl_close_pending = data_close_pending = neigh.size();
@@ -118,10 +123,10 @@ process::~process()
              lb_iter, comp_iter, all_comp_iter, real_load);
     if (convergence >= 0.0)
         XBT_INFO("Convergence within %g%% was achieved at time %g",
-                 opt::avg_load_ratio, convergence);
+                 average_load_ratio, convergence);
     else
         XBT_INFO("Convergence within %g%% was not achieved",
-                 opt::avg_load_ratio);
+                 average_load_ratio);
     XBT_VERB("Expected load was: %g", expected_load);
     XBT_VERB("Total computation for this process: %g", get_comp_amount());
     print_loads(true, xbt_log_priority_debug);
@@ -250,8 +255,8 @@ void process::compute_loop()
         idle_duration += MSG_get_clock() - idle_since_date;
         ++comp_iter;
         double flops = opt::comp_cost(real_load);
-        m_task_t task = MSG_task_create("computation", flops, 0.0, NULL);
-        TRACE_msg_set_task_category(task, TRACE_CAT_COMP);
+        msg_task_t task = MSG_task_create("computation", flops, 0.0, NULL);
+        // MSG_task_set_category(task, TRACE_CAT_COMP);
         XBT_DEBUG("compute %g flop%s", flops, ESSE(flops));
         MSG_task_execute(task);
         add_comp_amount(flops);
@@ -289,19 +294,22 @@ void process::convergence_check()
     double load_diff = std::fabs(real_load - average);
     bool converged = load_diff <= load_diff_threshold;
 
-    if (convergence >= 0.0) {
-        if (!converged) {
-            XBT_VERB("current load has diverged: %g (%.4g%%)",
-                     real_load, 100.0 * load_diff / average);
-            convergence = -1.0;
-            convergence_counter++;
-        }
-    } else {
-        if (converged) {
+    if (converged) {
+        if (convergence < 0) {
             XBT_VERB("current load has converged: %g (%.4g%%)",
                      real_load,  100.0 * load_diff / average);
             convergence = MSG_get_clock();
-            convergence_counter--;
+            local_convergence_counter = opt::exit_on_convergence;
+        }
+        if (local_convergence_counter > 0 && --local_convergence_counter == 0)
+                --convergence_counter;
+    } else {
+        if (convergence >= 0.0) {
+            XBT_VERB("current load has diverged: %g (%.4g%%)",
+                     real_load, 100.0 * load_diff / average);
+            convergence = -1.0;
+            if (local_convergence_counter == 0)
+                ++convergence_counter;
         }
     }
 }
@@ -412,7 +420,11 @@ void process::data_send(neighbor& nb)
         else
             excess_load = real_load;
 
-        double balance = nb.get_debt() - nb.get_credit();
+        double balance;
+        if (nb.get_credit() > 0.0)
+            balance = nb.get_debt() - nb.get_credit();
+        else
+            balance = nb.get_debt();
         load_to_send = std::min(excess_load,
                                 std::max(0.0, balance));
 
@@ -452,7 +464,7 @@ void process::data_close(neighbor& nb)
 void process::ctrl_receive(double timeout)
 {
     message* msg;
-    m_host_t from;
+    msg_host_t from;
 
     XBT_DEBUG("%sblocking receive on ctrl (%g)", "\0non-" + !timeout, timeout);
     while (ctrl_close_pending && comm.ctrl_recv(msg, from, timeout)) {
@@ -466,7 +478,7 @@ void process::ctrl_receive(double timeout)
 void process::data_receive(double timeout)
 {
     message* msg;
-    m_host_t from;
+    msg_host_t from;
 
     XBT_DEBUG("%sblocking receive on data (%g)", "\0non-" + !timeout, timeout);
     while (data_close_pending && comm.data_recv(msg, from, timeout)) {
@@ -477,7 +489,7 @@ void process::data_receive(double timeout)
     }
 }
 
-void process::handle_message(message* msg, m_host_t from)
+void process::handle_message(message* msg, msg_host_t from)
 {
     switch (msg->get_type()) {
     case message::CTRL: {