From: Arnaud Giersch Date: Thu, 16 Dec 2010 23:49:56 +0000 (+0100) Subject: Ongoing work on process logic... X-Git-Tag: v0.1~226 X-Git-Url: https://bilbo.iut-bm.univ-fcomte.fr/and/gitweb/loba.git/commitdiff_plain/e02d85948b5f10d8b37e39ec94d915c48f46a15a?ds=sidebyside;hp=b78d5563826957045678f4c3bcfdcef6c9bfc48d Ongoing work on process logic... ... but there still remain some bugs to fix... --- diff --git a/TODO b/TODO index 99b97e0..fa61fbb 100644 --- a/TODO +++ b/TODO @@ -1,8 +1,13 @@ + +* fix deadlock bug with ./loba cluster1000.xml -N26 -i2 + * implement loba_* algorithms (start with some trivial one) -* fix process::run when load is 0 - -> wait for a message... - -> how does it work with opt::bookkeeping ??? +* fix process::run (see inline comments) + +* find a better + +* add some statistics about load (im)balance at the end of the simulation * for automatic process topology, -> implement some random initial distribution of load diff --git a/communicator.cpp b/communicator.cpp index 8393515..e6653eb 100644 --- a/communicator.cpp +++ b/communicator.cpp @@ -15,11 +15,11 @@ std::string message::to_string() static const char* str[] = { "INFO", "CREDIT", "LOAD", "CTRL_CLOSE", "DATA_CLOSE" }; std::ostringstream oss; - oss << str[type] << " (" << amount << ")"; + oss << str[type] << ": " << amount; return oss.str(); } -const int communicator::send_count_before_flush = 128; +const int communicator::send_count_before_flush = 16; communicator::communicator() : host((hostdata* )MSG_host_get_data(MSG_host_self())) diff --git a/deployment.cpp b/deployment.cpp index d582dc6..43505db 100644 --- a/deployment.cpp +++ b/deployment.cpp @@ -134,6 +134,7 @@ void deployment_torus::generate() b = c; } unsigned width = b; + // here width == ceil(sqrt(size)) unsigned first_on_last_line = (size() - 1) - (size() - 1) % width; DEBUG4("torus size = %u ; width = %u ; height = %u ; foll = %u", diff --git a/main.cpp b/main.cpp index 401a51a..5d43190 100644 --- a/main.cpp +++ b/main.cpp @@ -45,6 +45,25 @@ int simulation_main(int argc, char* argv[]) return result; } +void check_for_lost_load() +{ + const double threshold = 1e-4; + double total_init = process::get_total_load_init(); + double total_exit = process::get_total_load_exit(); + double lost = total_init - total_exit; + double lost_ratio = 100 * lost / total_init; + if (lost_ratio < -threshold) { + CRITICAL2("Gained load at exit! %g (%g%%) <============", + lost, lost_ratio); + } else if (lost_ratio > threshold) { + CRITICAL2("Lost load at exit! %g (%g%%) <============", + lost, lost_ratio); + } else { + DEBUG2("Total load at exit looks good: %g (%g%%)", lost, lost_ratio); + } + +} + int main(int argc, char* argv[]) { // Note: variables used after THROW must be declared as volatile. @@ -113,8 +132,9 @@ int main(int argc, char* argv[]) // Launch the MSG simulation. INFO1("Starting simulation at %f...", MSG_get_clock()); res = MSG_main(); - INFO1("Simulation ended at %f.", MSG_get_clock()); simulated_time = MSG_get_clock(); + INFO1("Simulation ended at %f.", simulated_time); + check_for_lost_load(); if (res != MSG_OK) THROW1(0, 0, "MSG_main() failed with status %#x", res); diff --git a/process.cpp b/process.cpp index b0d7026..1556f3c 100644 --- a/process.cpp +++ b/process.cpp @@ -13,6 +13,9 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(proc); +double process::total_load_init = 0.0; +double process::total_load_exit = 0.0; + process::process(int argc, char* argv[]) { if (argc < 2 || !(std::istringstream(argv[1]) >> load)) @@ -29,6 +32,7 @@ process::process(int argc, char* argv[]) } expected_load = load; + total_load_init += load; ctrl_close_pending = data_close_pending = neigh.size(); if (neigh.size() == 1) { @@ -56,19 +60,20 @@ process::process(int argc, char* argv[]) process::~process() { + total_load_exit += load; } int process::run() { - bool one_more = true; - INFO1("Initial load: %g", load); VERB0("Starting..."); // first send() to inform neighbors about our load (force it) - prev_load_broadcast = !(opt::bookkeeping? expected_load: load); - send(); + prev_load_broadcast = -1; iter = 0; - while (one_more) { + bool one_more = true; + do { + ++iter; + if (opt::log_rate && iter % opt::log_rate == 0) { if (opt::bookkeeping) INFO3("(%u) current load: %g ; expected: %g", @@ -79,26 +84,37 @@ int process::run() } print_loads(xbt_log_priority_debug); - bool do_compute = load > 0.0; - if (do_compute) { - compute(); - ++iter; - } - - bool close_received = !receive(do_compute? NO_WAIT: WAIT); - if (opt::bookkeeping) expected_load -= load_balance(expected_load); else load -= load_balance(load); send(); + compute(); - if (opt::exit_on_close && close_received) - one_more = false; - if (opt::maxiter && iter >= opt::maxiter) - one_more = false; - } +// NDS for Need To Send +#define NDS ((opt::bookkeeping ? expected_load : load) != prev_load_broadcast) + bool can_recv; + do { + // General idea: do not iterate if there is nothing to + // compute, nor to send. + + // fixme: review this chunk, and remove this NDS macro! + + bool recv_wait = (load == 0 && !NDS); + bool close_received = !receive(recv_wait? WAIT: NO_WAIT); + + if (opt::exit_on_close && close_received) + one_more = false; + else if (opt::maxiter && iter >= opt::maxiter) + one_more = false; + + can_recv = (ctrl_close_pending || data_close_pending); + + } while (one_more && can_recv && load == 0 && !NDS); +#undef NDS + + } while (one_more); VERB0("Going to finalize..."); finalize(); @@ -109,11 +125,9 @@ int process::run() */ VERB0("Done."); + INFO3("Final load after %d iteration%s: %g", iter, ESSE(iter), load); if (opt::bookkeeping) - INFO4("Final load after %d iteration%s: %g ; expected: %g", - iter, ESSE(iter), load, expected_load); - else - INFO3("Final load after %d iteration%s: %g", iter, ESSE(iter), load); + INFO1("Expected load: %g", expected_load); return 0; } @@ -136,11 +150,15 @@ double process::load_balance(double /*my_load*/) void process::compute() { - double duration = opt::comp_cost(load); - m_task_t task = MSG_task_create("computation", duration, 0.0, NULL); - DEBUG2("compute %g flop%s.", duration, ESSE(duration)); - MSG_task_execute(task); - MSG_task_destroy(task); + if (load > 0.0) { + double duration = opt::comp_cost(load); + m_task_t task = MSG_task_create("computation", duration, 0.0, NULL); + DEBUG2("compute %g flop%s.", duration, ESSE(duration)); + MSG_task_execute(task); + MSG_task_destroy(task); + } else { + DEBUG0("nothing to compute !"); + } } void process::send1_no_bookkeeping(neighbor& nb) @@ -201,6 +219,8 @@ void process::send() bool process::receive(recv_wait_mode wait) { + // DEBUG1("go for receive(%s)", + // "NO_WAIT\0WAIT\0\0\0\0WAIT_FOR_CLOSE" + 8 * wait); bool result = true; message* msg; m_host_t from; diff --git a/process.h b/process.h index 268fac9..053e348 100644 --- a/process.h +++ b/process.h @@ -18,8 +18,12 @@ class process { public: + static double get_total_load_init() { return total_load_init; } + static double get_total_load_exit() { return total_load_exit; } + process(int argc, char* argv[]); - ~process(); + virtual ~process(); + int run(); protected: @@ -33,8 +37,10 @@ protected: double sum_of_to_send() const; private: - typedef MAP_TEMPLATE rev_neigh_type; + static double total_load_init; + static double total_load_exit; + typedef MAP_TEMPLATE rev_neigh_type; neigh_type neigh; // list of neighbors (do not alter // after construction!) rev_neigh_type rev_neigh; // map m_host_t -> neighbor