double now = MSG_get_clock();
if (now < next_iter_after_date)
MSG_process_sleep(next_iter_after_date - now);
- next_iter_after_date = MSG_get_clock() + opt::min_iter_duration;
+ next_iter_after_date = MSG_get_clock() + opt::min_lb_iter_duration;
++lb_iter;
double timeout;
if (real_load != 0 || get_load() != prev_load_broadcast)
timeout = 0.0;
- else if (opt::min_iter_duration)
- timeout = opt::min_iter_duration;
+ else if (opt::min_lb_iter_duration)
+ timeout = opt::min_lb_iter_duration;
else
timeout = 1.0;
receive(timeout);
void process::send1_no_bookkeeping(neighbor& nb)
{
if (real_load != prev_load_broadcast)
- comm.send(nb.get_ctrl_mbox(), new message(message::INFO, real_load));
+ comm.ctrl_send(nb.get_ctrl_mbox(),
+ new message(message::INFO, real_load));
double load_to_send = nb.get_to_send();
if (load_to_send > 0.0) {
- comm.send(nb.get_data_mbox(), new message(message::LOAD, load_to_send));
+ comm.data_send(nb.get_data_mbox(),
+ new message(message::LOAD, load_to_send));
nb.set_to_send(0.0);
}
}
void process::send1_bookkeeping(neighbor& nb)
{
if (expected_load != prev_load_broadcast)
- comm.send(nb.get_ctrl_mbox(),
- new message(message::INFO, expected_load));
+ comm.ctrl_send(nb.get_ctrl_mbox(),
+ new message(message::INFO, expected_load));
double load_to_send;
double new_debt;
double debt_to_send = nb.get_to_send();
if (debt_to_send > 0.0) {
- comm.send(nb.get_ctrl_mbox(),
- new message(message::CREDIT, debt_to_send));
+ comm.ctrl_send(nb.get_ctrl_mbox(),
+ new message(message::CREDIT, debt_to_send));
nb.set_to_send(0.0);
new_debt = nb.get_debt() + debt_to_send;
} else {
real_load -= load_to_send;
}
if (load_to_send > 0.0)
- comm.send(nb.get_data_mbox(), new message(message::LOAD, load_to_send));
+ comm.data_send(nb.get_data_mbox(),
+ new message(message::LOAD, load_to_send));
}
void process::send_all()
bind(&process::send1_no_bookkeeping, this, _1));
prev_load_broadcast = real_load;
}
- comm.flush(false);
+ comm.ctrl_flush(false);
+ comm.data_flush(false);
}
void process::receive(double timeout)
m_host_t from;
XBT_DEBUG("%sblocking receive (%g)", "\0non-" + !timeout, timeout);
- while (may_receive() && comm.recv(msg, from, timeout)) {
+ while (may_receive() && (comm.ctrl_recv(msg, from, timeout) ||
+ comm.data_recv(msg, from, timeout))) {
switch (msg->get_type()) {
case message::INFO: {
neighbor* n = rev_neigh[from];
delete msg;
timeout = 0.0; // only wait on first recv
}
- comm.flush(false);
+ comm.ctrl_flush(false);
+ comm.data_flush(false);
}
void process::finalize1(neighbor& nb)
{
- comm.send(nb.get_ctrl_mbox(), new message(message::CTRL_CLOSE, 0.0));
- comm.send(nb.get_data_mbox(), new message(message::DATA_CLOSE, 0.0));
+ comm.ctrl_send(nb.get_ctrl_mbox(), new message(message::CTRL_CLOSE, 0.0));
+ comm.data_send(nb.get_data_mbox(), new message(message::DATA_CLOSE, 0.0));
}
void process::finalize()
bind(&process::finalize1, this, _1));
while (may_receive()) {
- comm.flush(false);
+ comm.ctrl_flush(false);
+ comm.data_flush(false);
XBT_DEBUG("waiting for %d CTRL and %d DATA CLOSE",
ctrl_close_pending, data_close_pending);
receive(-1.0);
}
- comm.flush(true);
+ comm.ctrl_flush(true);
+ comm.data_flush(true);
}
#define print_loads_generic(vec, verbose, logp, cat) \