From 0c4a3402a57c61218bf1d1c6d8c31f5c3e1482f9 Mon Sep 17 00:00:00 2001 From: Arnaud Giersch Date: Tue, 21 Dec 2010 17:26:44 +0100 Subject: [PATCH] Deadlock fix, and other changes. Fix a deadlock occurring when there was no load anymore in the system. Each remaining process had nothing to compute, nor to send to its neighbors, and was blocked waiting for an incoming message. The fix consists in: * adding a shared global variable total_load_running, the sum of loads currently in the system; and * ensuring this variable is always up-to-date; and * making processes terminate if total_load_running is null. The use of a global variable is not satisfactory, but it is good enough for now. It is also verified, at the end of the simulation, that total_load_running is null. Other important changes in this commit are: * process::receive() now consumes all pending messages (it used to consume only one). * The iteration number is only incremented when there is something to compute (load > 0.0). Note: bookkeeping version may be broken. --- TODO | 11 ++--- communicator.cpp | 2 +- main.cpp | 22 +++++++--- options.cpp | 6 +++ options.h | 3 ++ process.cpp | 102 ++++++++++++++++++++++++++++------------------- process.h | 11 +++-- 7 files changed, 97 insertions(+), 60 deletions(-) diff --git a/TODO b/TODO index fa61fbb..1d5fe42 100644 --- a/TODO +++ b/TODO @@ -1,17 +1,14 @@ +* verify bookkeeping version. -* fix deadlock bug with ./loba cluster1000.xml -N26 -i2 +* add a variant to (not) change neighbor load information at send. * implement loba_* algorithms (start with some trivial one) -* fix process::run (see inline comments) - -* find a better - -* add some statistics about load (im)balance at the end of the simulation +* add some statistics about load (im)balance at the end of the simulation? * for automatic process topology, -> implement some random initial distribution of load * add synchronized mode -* translate README file ? +* translate README file? 
diff --git a/communicator.cpp b/communicator.cpp index 054c98a..1570b10 100644 --- a/communicator.cpp +++ b/communicator.cpp @@ -20,7 +20,7 @@ std::string message::to_string() return oss.str(); } -const int communicator::send_count_before_flush = 16; +const int communicator::send_count_before_flush = 4; communicator::communicator() : host((hostdata* )MSG_host_get_data(MSG_host_self())) diff --git a/main.cpp b/main.cpp index fe584f7..01bf3ac 100644 --- a/main.cpp +++ b/main.cpp @@ -48,21 +48,31 @@ int simulation_main(int argc, char* argv[]) void check_for_lost_load() { - const double threshold = 1e-4; double total_init = process::get_total_load_init(); + double total_exit = process::get_total_load_exit(); double lost = total_init - total_exit; - double lost_ratio = 100 * lost / total_init; - if (lost_ratio < -threshold) { + double lost_ratio = 100.0 * lost / total_init; + if (lost_ratio < -opt::load_ratio_threshold) CRITICAL2("Gained load at exit! %g (%g%%) <============", lost, lost_ratio); - } else if (lost_ratio > threshold) { + else if (lost_ratio > opt::load_ratio_threshold) CRITICAL2("Lost load at exit! %g (%g%%) <============", lost, lost_ratio); - } else { + else DEBUG2("Total load at exit looks good: %g (%g%%)", lost, lost_ratio); - } + double total_running = process::get_total_load_running(); + double running_ratio = 100.0 * total_running / total_init; + if (running_ratio < -opt::load_ratio_threshold) + CRITICAL2("Negative running load at exit! %g (%g%%) <============", + total_running, running_ratio); + else if (running_ratio > opt::load_ratio_threshold) + CRITICAL2("Remaining running load at exit! 
%g (%g%%) <============", + total_running, running_ratio); + else + DEBUG2("Running load at exit looks good: %g (%g%%)", + total_running, running_ratio); } int main(int argc, char* argv[]) diff --git a/options.cpp b/options.cpp index cf9d7b7..2c8e61d 100644 --- a/options.cpp +++ b/options.cpp @@ -13,6 +13,12 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(main); namespace opt { + // Constants + + // A sum of loads is considered null if it is less than + // load_ratio_threshold percent of the sum of loads at init. + const double load_ratio_threshold = 1e-4; + // Global options std::string program_name; int help_requested = 0; diff --git a/options.h b/options.h index e751535..5c43e6f 100644 --- a/options.h +++ b/options.h @@ -10,6 +10,9 @@ // Global parameters, shared by all the processes namespace opt { + // Constants + extern const double load_ratio_threshold; + // Global options extern std::string program_name; extern int help_requested; diff --git a/process.cpp b/process.cpp index a24d6eb..8b31678 100644 --- a/process.cpp +++ b/process.cpp @@ -15,6 +15,7 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(proc); #include "process.h" double process::total_load_init = 0.0; +double process::total_load_running = 0.0; double process::total_load_exit = 0.0; process::process(int argc, char* argv[]) @@ -34,6 +35,7 @@ process::process(int argc, char* argv[]) prev_load_broadcast = -1; // force sending of load on first send() expected_load = load; + total_load_running += load; total_load_init += load; ctrl_close_pending = data_close_pending = neigh.size(); @@ -44,6 +46,7 @@ process::process(int argc, char* argv[]) close_received = false; may_receive = (neigh.size() > 0); // the same as (ctrl_close_pending || // data_close_pending) + finalizing = false; if (may_receive) comm.listen(); @@ -60,7 +63,7 @@ process::process(int argc, char* argv[]) oss << neigh.back().get_name(); } LOG1(logp, "Got %s.", oss.str().c_str()); - print_loads(true, logp); + print_loads(false, logp); } process::~process() @@ 
-73,52 +76,60 @@ int process::run() INFO1("Initial load: %g", load); VERB0("Starting..."); iter = 0; - bool one_more = true; - do { - ++iter; + while (true) { + if (load > 0.0) { + ++iter; + if (opt::log_rate && iter % opt::log_rate == 0) { + if (opt::bookkeeping) + INFO3("(%u) current load: %g ; expected: %g", + iter, load, expected_load); + else + INFO2("(%u) current load: %g", + iter, load); + } - if (opt::log_rate && iter % opt::log_rate == 0) { if (opt::bookkeeping) - INFO3("(%u) current load: %g ; expected: %g", - iter, load, expected_load); + expected_load -= load_balance(expected_load); else - INFO2("(%u) current load: %g", - iter, load); - } - print_loads(true, xbt_log_priority_debug); + load -= load_balance(load); - if (opt::bookkeeping) - expected_load -= load_balance(expected_load); - else - load -= load_balance(load); + print_loads(true, xbt_log_priority_debug); - send(); - compute(); + send(); + compute(); -// NDS for Need To Send -#define NDS ((opt::bookkeeping ? expected_load : load) != prev_load_broadcast) - do { - // General idea: block on receiving unless there is - // something to compute, or to send, or we must exit. + if (opt::maxiter && iter >= opt::maxiter) + break; + } else { + // send load information, and load when bookkeeping + send(); + } - // fixme: review this chunk, and remove this NDS macro! + // block on receiving unless there is something to compute or + // to send + bool recv_wait = (load == 0 && + ((opt::bookkeeping ? expected_load : load) + == prev_load_broadcast)); + DEBUG1("CALL RECEIVE(%s)", recv_wait? "WAIT": "NO_WAIT"); + receive(recv_wait? WAIT: NO_WAIT); - // FIXME: HAD A DEADLOCK HERE... + // one of our neighbors is finalizing + if (opt::exit_on_close && close_received) + break; - bool recv_wait = (load == 0 && !NDS); - DEBUG1("CALL RECEIVE(%s)", recv_wait? "WAIT": "NO_WAIT"); - receive(recv_wait? 
WAIT: NO_WAIT); + // have no load and cannot receive anything + if (load == 0.0 && !may_receive) + break; - if (opt::exit_on_close && close_received) - one_more = false; - else if (opt::maxiter && iter >= opt::maxiter) - one_more = false; - - } while (one_more && may_receive && load == 0 && !NDS); - DEBUG0("RECEIVE LOOP ENDED"); -#undef NDS + // fixme: this check should be implemented with a distributed + // algorithm, and not a shared global variable! + if (100.0 * total_load_running / total_load_init <= + opt::load_ratio_threshold) { + VERB0("No more load to balance in system, stopping."); + break; + } - } while (one_more); + } VERB0("Going to finalize..."); finalize(); @@ -227,8 +238,8 @@ void process::receive(recv_wait_mode wait) // "NO_WAIT\0WAIT\0\0\0\0WAIT_FOR_CLOSE" + 8 * wait); message* msg; m_host_t from; - bool do_loop = may_receive; - while (do_loop && comm.recv(msg, from, wait)) { + bool do_wait = (wait != NO_WAIT); + while (may_receive && comm.recv(msg, from, do_wait)) { switch (msg->get_type()) { case message::INFO: { neighbor* n = rev_neigh[from]; @@ -238,9 +249,13 @@ void process::receive(recv_wait_mode wait) case message::CREDIT: expected_load += msg->get_amount(); break; - case message::LOAD: - load += msg->get_amount(); + case message::LOAD: { + double ld = msg->get_amount(); + load += ld; + if (finalizing) + total_load_running -= ld; break; + } case message::CTRL_CLOSE: if (--ctrl_close_pending == 1) comm.next_close_on_ctrl_is_last(); @@ -256,14 +271,14 @@ void process::receive(recv_wait_mode wait) } delete msg; may_receive = (ctrl_close_pending || data_close_pending); - do_loop = (wait == WAIT_FOR_CLOSE) && may_receive; + do_wait = (wait == WAIT_FOR_CLOSE); } } void process::finalize1(neighbor& nb) { comm.send(nb.get_ctrl_mbox(), new message(message::CTRL_CLOSE, 0.0)); - comm.send(nb.get_data_mbox(), new message(message::DATA_CLOSE, 0.0)); + comm.send(nb.get_data_mbox(), new message(message::DATA_CLOSE, 0.0)); } void process::finalize() @@ 
-271,6 +286,9 @@ void process::finalize() using namespace std::tr1; using namespace std::tr1::placeholders; + finalizing = true; + total_load_running -= load; + DEBUG2("send CLOSE to %d neighbor%s.", (int )neigh.size(), ESSE(neigh.size())); std::for_each(neigh.begin(), neigh.end(), diff --git a/process.h b/process.h index c30b243..4e6ab24 100644 --- a/process.h +++ b/process.h @@ -19,8 +19,9 @@ class process { public: - static double get_total_load_init() { return total_load_init; } - static double get_total_load_exit() { return total_load_exit; } + static double get_total_load_init() { return total_load_init; } + static double get_total_load_running() { return total_load_running; } + static double get_total_load_exit() { return total_load_exit; } process(int argc, char* argv[]); virtual ~process(); @@ -48,8 +49,9 @@ protected: xbt_log_category_t cat = _XBT_LOGV(default)) const; private: - static double total_load_init; // sum of neighbor loads at init - static double total_load_exit; // sum of neighbor loads at exit + static double total_load_init; // sum of process loads at init + static double total_load_running; // sum of loads while running + static double total_load_exit; // sum of process loads at exit typedef MAP_TEMPLATE rev_neigh_type; neigh_type neigh; // list of neighbors (do not alter @@ -63,6 +65,7 @@ private: // on data channel bool close_received; // true if we received a "close" message bool may_receive; // true if there remains neighbors to listen for + bool finalizing; // true when finalize() is running unsigned iter; // counter of iterations -- 2.39.5