close_received = false;
finalizing = false;
+ comp_iter = lb_iter = 0;
+
e_xbt_log_priority_t logp = xbt_log_priority_verbose;
if (!LOG_ISENABLED(logp))
return;
double next_iter_after_date = 0.0;
INFO1("Initial load: %g", real_load);
VERB0("Starting...");
- comp_iter = lb_iter = 0;
while (true) {
- double ld = lb_load();
- if (ld > 0.0) {
+ if (get_load() > 0.0) {
double now = MSG_get_clock();
if (now < next_iter_after_date)
MSG_process_sleep(next_iter_after_date - now);
lb_iter, real_load);
}
- ld -= load_balance(ld);
+ load_balance();
print_loads(true, xbt_log_priority_debug);
}
- lb_load() = ld;
// send load information, and load (data) if any
send_all();
// block on receiving unless there is something to compute or
// to send
double timeout;
- if (real_load != 0 || lb_load() != prev_load_broadcast)
+ if (real_load != 0 || get_load() != prev_load_broadcast)
timeout = 0.0;
else if (opt::min_iter_duration)
timeout = opt::min_iter_duration;
return 0;
}
-double process::load_balance(double /*my_load*/)
+void process::load_balance() // placeholder step: performs no balancing, only logs a warning (see body)
{
if (lb_iter == 1) // warn only once: the message is emitted only when lb_iter equals 1
WARN0("process::load_balance() is a no-op!");
- return 0.0;
}
void process::compute()
}
}
+void process::send(neighbor& nb, double amount) // book `amount` of load as moving from this process to neighbor `nb`
+{
+ set_load(get_load() - amount); // subtract the amount from the value reported by get_load()
+ nb.set_to_send(nb.get_to_send() + amount); // accumulate the amount in nb's to-send quantity
+ nb.set_load(nb.get_load() + amount); // also add the amount to the load recorded for nb; fixme: make this optional?
+}
+
void process::send1_no_bookkeeping(neighbor& nb)
{
if (real_load != prev_load_broadcast)
std::for_each(neigh.begin(), neigh.end(),
bind(&process::finalize1, this, _1));
- DEBUG2("wait for CLOSE from %lu neighbor%s",
- (unsigned long )neigh.size(), ESSE(neigh.size()));
while (may_receive()) {
comm.flush(false);
+ DEBUG2("waiting for %d CTRL and %d DATA CLOSE",
+ ctrl_close_pending, data_close_pending);
receive(-1.0);
}