From: Arnaud Giersch Date: Wed, 29 Feb 2012 14:54:31 +0000 (+0100) Subject: Protect concurrent accesses to shared variables in process. X-Git-Tag: exp_20120229~10 X-Git-Url: https://bilbo.iut-bm.univ-fcomte.fr/and/gitweb/loba.git/commitdiff_plain/526e1f382ddc9f0010c41a526f3029af769162c9?ds=sidebyside Protect concurrent accesses to shared variables in process. --- diff --git a/BUGS b/BUGS index d313473..632bdde 100644 --- a/BUGS +++ b/BUGS @@ -3,6 +3,9 @@ Les variables globales process::total_load_* ne sont pas protégées contre les accès concurrents. Il n'est donc pas possible actuellement d'exécuter les simulations en parallèle (--cfg=contexts/nthreads). +Corrigé en partie. Il reste quelques "fixme: get locked?" à régler +(ou pas). + ======================================================================== ##### RESOLVED BUGS COME AFTER THIS #################################### ======================================================================== diff --git a/main.cpp b/main.cpp index 8a1d5c8..f31e847 100644 --- a/main.cpp +++ b/main.cpp @@ -274,6 +274,7 @@ int main(int argc, char* argv[]) proc_mutex = new mutex_t(); proc_cond = new condition_t(); + process::set_proc_mutex(proc_mutex); // Launch the MSG simulation. XBT_INFO("Starting simulation at %f...", MSG_get_clock()); @@ -281,6 +282,7 @@ int main(int argc, char* argv[]) simulated_time = MSG_get_clock(); XBT_INFO("Simulation ended at %f.", simulated_time); + process::set_proc_mutex(NULL); delete proc_cond; delete proc_mutex; diff --git a/process.cpp b/process.cpp index c85b605..36bdff0 100644 --- a/process.cpp +++ b/process.cpp @@ -16,6 +16,8 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(proc); #include "process.h" +mutex_t *process::proc_mutex; + double process::total_load_init = 0.0; double process::total_load_running = 0.0; double process::total_load_exit = 0.0; @@ -58,22 +60,21 @@ process::process(int argc, char* argv[]) rev_neigh.insert(std::make_pair(host, ptr)); } - // Note: there should not be race condition with the current - // version of Simgrid, when updating the global variables. - prev_load_broadcast = -1; // force sending of load on first send_all() expected_load = real_load; - total_load_running += real_load; - total_load_init += real_load; received_load = 0.0; idle_duration = 0.0; convergence = -1.0; + proc_mutex->acquire(); process_counter++; + total_load_init += real_load; + total_load_running += real_load; total_load_average = total_load_running / process_counter; load_diff_threshold = (opt::load_ratio_threshold + opt::avg_load_ratio * total_load_average) / 100.0; + proc_mutex->release(); ctrl_close_pending = data_close_pending = neigh.size(); close_received = false; @@ -103,7 +104,9 @@ process::process(int argc, char* argv[]) process::~process() { delete lb_thread; + proc_mutex->acquire(); total_load_exit += real_load; + proc_mutex->release(); xbt_assert(received_load == 0.0, "received_load is %g, but should be 0.0 !", received_load); if (opt::log_rate < 0) @@ -123,7 +126,7 @@ process::~process() double process::get_iter_deviation() const { - double average_cost = opt::comp_cost(total_load_average); + double average_cost = opt::comp_cost(total_load_average); // fixme: get locked? // Do not count idle periods double comp_iter_opt = acc.comp_amount / average_cost; /* @@ -270,26 +273,29 @@ void process::compute_loop() } real_load += received_load; received_load = 0.0; + proc_mutex->acquire(); total_load_running -= real_load; + proc_mutex->release(); convergence_check(); comm.data_flush(true); } void process::convergence_check() { - double load_diff = std::fabs(real_load - total_load_average); + double average = total_load_average; // fixme: get locked? + double load_diff = std::fabs(real_load - average); bool converged = load_diff <= load_diff_threshold; if (convergence >= 0.0) { if (!converged) { XBT_VERB("current load has diverged: %g (%.4g%%)", - real_load, 100.0 * load_diff / total_load_average); + real_load, 100.0 * load_diff / average); convergence = -1.0; } } else { if (converged) { XBT_VERB("current load has converged: %g (%.4g%%)", - real_load, 100.0 * load_diff / total_load_average); + real_load, 100.0 * load_diff / average); convergence = MSG_get_clock(); } } @@ -327,7 +333,7 @@ bool process::still_running() last_status = false; } else if (100.0 * total_load_running / total_load_init <= - opt::load_ratio_threshold) { + opt::load_ratio_threshold) { // fixme: get locked? // fixme: this check should be implemented with a distributed // algorithm, and not a shared global variable! XBT_VERB("No more load to balance in system."); diff --git a/process.h b/process.h index c31d213..901d381 100644 --- a/process.h +++ b/process.h @@ -25,6 +25,9 @@ class process { public: + static void set_proc_mutex(mutex_t* m) { proc_mutex = m; } + + // Note: normally used with proc_mutex locked. static double get_total_load_init() { return total_load_init; } static double get_total_load_running() { return total_load_running; } static double get_total_load_exit() { return total_load_exit; } @@ -83,6 +86,10 @@ protected: xbt_log_category_t cat = _XBT_LOGV(default)) const; private: + static mutex_t *proc_mutex; // protect access to global variables + // (must be set before constructing + // the first object!) + static double total_load_init; // sum of process loads at init static double total_load_running; // sum of loads while running static double total_load_exit; // sum of process loads at exit