comp_iter = lb_iter = 0;
- compute_thread = new_msg_thread("compute",
- std::tr1::bind(&process::compute_loop,
- this));
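+ // create the thread that will run the load-balancing loop; it is started later, in run()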
+ lb_thread = new_msg_thread("loba",
+ std::tr1::bind(&process::load_balance_loop,
+ this));
e_xbt_log_priority_t logp = xbt_log_priority_verbose;
if (!LOG_ISENABLED(logp))
process::~process()
{
- delete compute_thread;
+ delete lb_thread;
total_load_exit += real_load;
- if (opt::bookkeeping) {
- XBT_INFO("Final load after %d:%d iterations: %g ; expected: %g",
- lb_iter, comp_iter, real_load, expected_load);
- } else {
- XBT_INFO("Final load after %d:%d iterations: %g",
- lb_iter, comp_iter, real_load);
- }
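+ // skip the final report when logging is disabled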
+ if (opt::log_rate < 0)
+ return;
+ XBT_INFO("Final load after %d:%d iterations: %g",
+ lb_iter, comp_iter, real_load);
+ XBT_VERB("Expected load was: %g", expected_load);
XBT_VERB("Total computation for this process: %g", comp);
}
int process::run()
{
- XBT_INFO("Initial load: %g", real_load);
+ if (opt::log_rate >= 0) {
+ XBT_INFO("Initial load: %g", real_load);
+ XBT_VERB("Initial expected load: %g", expected_load);
+ }
XBT_VERB("Starting...");
- compute_thread->start();
- load_balance_loop();
- compute_thread->wait();
+ mutex.acquire();
+ lb_thread->start();
+ while (lb_iter <= opt::comp_iter_delay)
+ cond.wait(mutex);
+ mutex.release();
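+ // moreover, do not start computing before the simulated time opt::comp_time_delay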
+ double sleep_duration = opt::comp_time_delay - MSG_get_clock();
+ if (sleep_duration > 0.0)
+ MSG_process_sleep(sleep_duration);
+ compute_loop();
+ lb_thread->wait();
XBT_VERB("Done.");
return 0;
}
double next_iter_after_date = MSG_get_clock() + opt::min_lb_iter_duration;
while (still_running()) {
- ++lb_iter;
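+ // once opt::comp_iter_delay iterations have been done, signal run(),
+ // which is waiting before starting the compute loop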
+ if (lb_iter == opt::comp_iter_delay) {
+ mutex.acquire();
+ ++lb_iter;
+ cond.signal();
+ mutex.release();
+ } else {
+ ++lb_iter;
+ }
if (opt::log_rate && lb_iter % opt::log_rate == 0) {
- if (opt::bookkeeping)
- XBT_INFO("(%u:%u) current load: %g ; expected: %g",
- lb_iter, comp_iter, real_load, expected_load);
- else
- XBT_INFO("(%u:%u) current load: %g",
- lb_iter, comp_iter, real_load);
+ XBT_INFO("(%u:%u) current load: %g", lb_iter, comp_iter, real_load);
+ XBT_VERB("... expected load: %g", expected_load);
}
if (get_load() > 0.0)
// send
std::for_each(neigh.begin(), neigh.end(),
bind(&process::ctrl_send, this, _1));
+ prev_load_broadcast = get_load();
sleep_until_date(next_iter_after_date, opt::min_lb_iter_duration);
ctrl_receive(0.0);
}
XBT_VERB("Going to finalize for %s...", __func__);
+ // final send, so that load already scheduled to be sent is not lost
+ std::for_each(neigh.begin(), neigh.end(),
+ bind(&process::data_send, this, _1));
+ finalizing = true;
+ total_load_running -= real_load;
XBT_DEBUG("send DATA_CLOSE to %zu neighbor%s",
neigh.size(), ESSE(neigh.size()));
std::for_each(neigh.begin(), neigh.end(),
opt::load_ratio_threshold) {
// fixme: this check should be implemented with a distributed
// algorithm, and not a shared global variable!
- // fixme: should this chunk be moved before call to receive() ?
XBT_VERB("No more load to balance in system.");
last_status = false;
}
if (_XBT_LOG_ISENABLEDV((*cat), logp)) { \
using std::tr1::bind; \
using std::tr1::placeholders::_1; \
- XBT_XCLOG(cat, logp, "Neighbor loads:"); \
+ XBT_XCLOG(cat, logp, "My load: %g (real); %g (expected). " \
+ "Neighbor loads:", real_load, expected_load); \
std::for_each(vec.begin(), vec.end(), \
bind(&neighbor::print, _1, verbose, logp, cat)); \
} else ((void)0)
} else {
load_to_send = nb.get_to_send();
nb.set_to_send(0.0);
+ // do not update real_load here
}
if (load_to_send > 0.0)
comm.data_send(nb.get_data_mbox(),