Fix a deadlock occurring when there was no load anymore in the system.
Each remaining process had nothing to compute, nor to send to their
neighbors, and were blocked waiting for an incoming message.
The fix consists in:
* adding a shared global variable total_load_running, the sum of loads
currently in the system; and
* ensuring this variable is always up-to-date; and
* making processes terminate if total_load_running is null.
The use of a global variable is not satisfactory, but it is good
enough for now.
It is also verified, at the end of the simulation, that
total_load_running is null.
Other important changes in this commit are:
* process::receive() now consumes all pending messages (it used to
consume only one).
* The iteration number is only incremented when there is something to
compute (load > 0.0).
Note: bookkeeping version may be broken.
+* verify bookkeeping version.
-* fix deadlock bug with ./loba cluster1000.xml -N26 -i2
+* add a variant to (not) change neighbor load information at send.
* implement loba_* algorithms (start with some trivial one)
* implement loba_* algorithms (start with some trivial one)
-* fix process::run (see inline comments)
-
-* find a better
-
-* add some statistics about load (im)balance at the end of the simulation
+* add some statistics about load (im)balance at the end of the simulation?
* for automatic process topology,
-> implement some random initial distribution of load
* add synchronized mode
* for automatic process topology,
-> implement some random initial distribution of load
* add synchronized mode
-* translate README file ?
+* translate README file?
-const int communicator::send_count_before_flush = 16;
+const int communicator::send_count_before_flush = 4;
communicator::communicator()
: host((hostdata* )MSG_host_get_data(MSG_host_self()))
communicator::communicator()
: host((hostdata* )MSG_host_get_data(MSG_host_self()))
void check_for_lost_load()
{
void check_for_lost_load()
{
- const double threshold = 1e-4;
double total_init = process::get_total_load_init();
double total_init = process::get_total_load_init();
double total_exit = process::get_total_load_exit();
double lost = total_init - total_exit;
double total_exit = process::get_total_load_exit();
double lost = total_init - total_exit;
- double lost_ratio = 100 * lost / total_init;
- if (lost_ratio < -threshold) {
+ double lost_ratio = 100.0 * lost / total_init;
+ if (lost_ratio < -opt::load_ratio_threshold)
CRITICAL2("Gained load at exit! %g (%g%%) <============",
lost, lost_ratio);
CRITICAL2("Gained load at exit! %g (%g%%) <============",
lost, lost_ratio);
- } else if (lost_ratio > threshold) {
+ else if (lost_ratio > opt::load_ratio_threshold)
CRITICAL2("Lost load at exit! %g (%g%%) <============",
lost, lost_ratio);
CRITICAL2("Lost load at exit! %g (%g%%) <============",
lost, lost_ratio);
DEBUG2("Total load at exit looks good: %g (%g%%)", lost, lost_ratio);
DEBUG2("Total load at exit looks good: %g (%g%%)", lost, lost_ratio);
+ double total_running = process::get_total_load_running();
+ double running_ratio = 100.0 * total_running / total_init;
+ if (running_ratio < -opt::load_ratio_threshold)
+ CRITICAL2("Negative running load at exit! %g (%g%%) <============",
+ total_running, running_ratio);
+ else if (running_ratio > opt::load_ratio_threshold)
+ CRITICAL2("Remaining running load at exit! %g (%g%%) <============",
+ total_running, running_ratio);
+ else
+ DEBUG2("Running load at exit looks good: %g (%g%%)",
+ total_running, running_ratio);
}
int main(int argc, char* argv[])
}
int main(int argc, char* argv[])
+ // Constants
+
+ // A sum of loads is considered null if it is less than
+ // load_ratio_threshold percent of the sum of loads at init.
+ const double load_ratio_threshold = 1e-4;
+
// Global options
std::string program_name;
int help_requested = 0;
// Global options
std::string program_name;
int help_requested = 0;
// Global parameters, shared by all the processes
namespace opt {
// Global parameters, shared by all the processes
namespace opt {
+ // Constants
+ extern const double load_ratio_threshold;
+
// Global options
extern std::string program_name;
extern int help_requested;
// Global options
extern std::string program_name;
extern int help_requested;
#include "process.h"
double process::total_load_init = 0.0;
#include "process.h"
double process::total_load_init = 0.0;
+double process::total_load_running = 0.0;
double process::total_load_exit = 0.0;
process::process(int argc, char* argv[])
double process::total_load_exit = 0.0;
process::process(int argc, char* argv[])
prev_load_broadcast = -1; // force sending of load on first send()
expected_load = load;
prev_load_broadcast = -1; // force sending of load on first send()
expected_load = load;
+ total_load_running += load;
total_load_init += load;
ctrl_close_pending = data_close_pending = neigh.size();
total_load_init += load;
ctrl_close_pending = data_close_pending = neigh.size();
close_received = false;
may_receive = (neigh.size() > 0); // the same as (ctrl_close_pending ||
// data_close_pending)
close_received = false;
may_receive = (neigh.size() > 0); // the same as (ctrl_close_pending ||
// data_close_pending)
if (may_receive)
comm.listen();
if (may_receive)
comm.listen();
oss << neigh.back().get_name();
}
LOG1(logp, "Got %s.", oss.str().c_str());
oss << neigh.back().get_name();
}
LOG1(logp, "Got %s.", oss.str().c_str());
- print_loads(true, logp);
+ print_loads(false, logp);
INFO1("Initial load: %g", load);
VERB0("Starting...");
iter = 0;
INFO1("Initial load: %g", load);
VERB0("Starting...");
iter = 0;
- bool one_more = true;
- do {
- ++iter;
+ while (true) {
+ if (load > 0.0) {
+ ++iter;
+ if (opt::log_rate && iter % opt::log_rate == 0) {
+ if (opt::bookkeeping)
+ INFO3("(%u) current load: %g ; expected: %g",
+ iter, load, expected_load);
+ else
+ INFO2("(%u) current load: %g",
+ iter, load);
+ }
- if (opt::log_rate && iter % opt::log_rate == 0) {
- INFO3("(%u) current load: %g ; expected: %g",
- iter, load, expected_load);
+ expected_load -= load_balance(expected_load);
- INFO2("(%u) current load: %g",
- iter, load);
- }
- print_loads(true, xbt_log_priority_debug);
+ load -= load_balance(load);
- if (opt::bookkeeping)
- expected_load -= load_balance(expected_load);
- else
- load -= load_balance(load);
+ print_loads(true, xbt_log_priority_debug);
-// NDS for Need To Send
-#define NDS ((opt::bookkeeping ? expected_load : load) != prev_load_broadcast)
- do {
- // General idea: block on receiving unless there is
- // something to compute, or to send, or we must exit.
+ if (opt::maxiter && iter >= opt::maxiter)
+ break;
+ } else {
+ // send load information, and load when bookkeeping
+ send();
+ }
- // fixme: review this chunk, and remove this NDS macro!
+ // block on receiving unless there is something to compute or
+ // to send
+ bool recv_wait = (load == 0 &&
+ ((opt::bookkeeping ? expected_load : load)
+ == prev_load_broadcast));
+ DEBUG1("CALL RECEIVE(%s)", recv_wait? "WAIT": "NO_WAIT");
+ receive(recv_wait? WAIT: NO_WAIT);
- // FIXME: HAD A DEADLOCK HERE...
+ // one of our neighbor is finalizing
+ if (opt::exit_on_close && close_received)
+ break;
- bool recv_wait = (load == 0 && !NDS);
- DEBUG1("CALL RECEIVE(%s)", recv_wait? "WAIT": "NO_WAIT");
- receive(recv_wait? WAIT: NO_WAIT);
+ // have no load and cannot receive anything
+ if (load == 0.0 && !may_receive)
+ break;
- if (opt::exit_on_close && close_received)
- one_more = false;
- else if (opt::maxiter && iter >= opt::maxiter)
- one_more = false;
-
- } while (one_more && may_receive && load == 0 && !NDS);
- DEBUG0("RECEIVE LOOP ENDED");
-#undef NDS
+ // fixme: this check should be implemented with a distributed
+ // algorithm, and not a shared global variable!
+ if (100.0 * total_load_running / total_load_init <=
+ opt::load_ratio_threshold) {
+ VERB0("No more load to balance in system, stopping.");
+ break;
+ }
VERB0("Going to finalize...");
finalize();
VERB0("Going to finalize...");
finalize();
// "NO_WAIT\0WAIT\0\0\0\0WAIT_FOR_CLOSE" + 8 * wait);
message* msg;
m_host_t from;
// "NO_WAIT\0WAIT\0\0\0\0WAIT_FOR_CLOSE" + 8 * wait);
message* msg;
m_host_t from;
- bool do_loop = may_receive;
- while (do_loop && comm.recv(msg, from, wait)) {
+ bool do_wait = (wait != NO_WAIT);
+ while (may_receive && comm.recv(msg, from, do_wait)) {
switch (msg->get_type()) {
case message::INFO: {
neighbor* n = rev_neigh[from];
switch (msg->get_type()) {
case message::INFO: {
neighbor* n = rev_neigh[from];
case message::CREDIT:
expected_load += msg->get_amount();
break;
case message::CREDIT:
expected_load += msg->get_amount();
break;
- case message::LOAD:
- load += msg->get_amount();
+ case message::LOAD: {
+ double ld = msg->get_amount();
+ load += ld;
+ if (finalizing)
+ total_load_running -= ld;
case message::CTRL_CLOSE:
if (--ctrl_close_pending == 1)
comm.next_close_on_ctrl_is_last();
case message::CTRL_CLOSE:
if (--ctrl_close_pending == 1)
comm.next_close_on_ctrl_is_last();
}
delete msg;
may_receive = (ctrl_close_pending || data_close_pending);
}
delete msg;
may_receive = (ctrl_close_pending || data_close_pending);
- do_loop = (wait == WAIT_FOR_CLOSE) && may_receive;
+ do_wait = (wait == WAIT_FOR_CLOSE);
}
}
void process::finalize1(neighbor& nb)
{
comm.send(nb.get_ctrl_mbox(), new message(message::CTRL_CLOSE, 0.0));
}
}
void process::finalize1(neighbor& nb)
{
comm.send(nb.get_ctrl_mbox(), new message(message::CTRL_CLOSE, 0.0));
- comm.send(nb.get_data_mbox(), new message(message::DATA_CLOSE, 0.0));
+ comm.send(nb.get_data_mbox(), new message(message::DATA_CLOSE, 0.0));
}
void process::finalize()
}
void process::finalize()
using namespace std::tr1;
using namespace std::tr1::placeholders;
using namespace std::tr1;
using namespace std::tr1::placeholders;
+ finalizing = true;
+ total_load_running -= load;
+
DEBUG2("send CLOSE to %d neighbor%s.",
(int )neigh.size(), ESSE(neigh.size()));
std::for_each(neigh.begin(), neigh.end(),
DEBUG2("send CLOSE to %d neighbor%s.",
(int )neigh.size(), ESSE(neigh.size()));
std::for_each(neigh.begin(), neigh.end(),
- static double get_total_load_init() { return total_load_init; }
- static double get_total_load_exit() { return total_load_exit; }
+ static double get_total_load_init() { return total_load_init; }
+ static double get_total_load_running() { return total_load_running; }
+ static double get_total_load_exit() { return total_load_exit; }
process(int argc, char* argv[]);
virtual ~process();
process(int argc, char* argv[]);
virtual ~process();
xbt_log_category_t cat = _XBT_LOGV(default)) const;
private:
xbt_log_category_t cat = _XBT_LOGV(default)) const;
private:
- static double total_load_init; // sum of neighbor loads at init
- static double total_load_exit; // sum of neighbor loads at exit
+ static double total_load_init; // sum of process loads at init
+ static double total_load_running; // sum of loads while running
+ static double total_load_exit; // sum of process loads at exit
typedef MAP_TEMPLATE<m_host_t, neighbor*> rev_neigh_type;
neigh_type neigh; // list of neighbors (do not alter
typedef MAP_TEMPLATE<m_host_t, neighbor*> rev_neigh_type;
neigh_type neigh; // list of neighbors (do not alter
// on data channel
bool close_received; // true if we received a "close" message
bool may_receive; // true if there remains neighbors to listen for
// on data channel
bool close_received; // true if we received a "close" message
bool may_receive; // true if there remains neighbors to listen for
+ bool finalizing; // true when finalize() is running
unsigned iter; // counter of iterations
unsigned iter; // counter of iterations