X-Git-Url: https://bilbo.iut-bm.univ-fcomte.fr/and/gitweb/loba.git/blobdiff_plain/c86e5ff42db3495538c8b48cb7ea0943a13702b7..88ba6cd5952a9a7a58a8a0d9c5ae0691a1fdc5e1:/process.cpp?ds=sidebyside diff --git a/process.cpp b/process.cpp index 4501dab..39646cd 100644 --- a/process.cpp +++ b/process.cpp @@ -88,7 +88,7 @@ int process::run() double now = MSG_get_clock(); if (now < next_iter_after_date) MSG_process_sleep(next_iter_after_date - now); - next_iter_after_date = MSG_get_clock() + opt::min_iter_duration; + next_iter_after_date = MSG_get_clock() + opt::min_lb_iter_duration; ++lb_iter; @@ -131,8 +131,8 @@ int process::run() double timeout; if (real_load != 0 || get_load() != prev_load_broadcast) timeout = 0.0; - else if (opt::min_iter_duration) - timeout = opt::min_iter_duration; + else if (opt::min_lb_iter_duration) + timeout = opt::min_lb_iter_duration; else timeout = 1.0; receive(timeout); @@ -204,10 +204,12 @@ void process::send(neighbor& nb, double amount) void process::send1_no_bookkeeping(neighbor& nb) { if (real_load != prev_load_broadcast) - comm.send(nb.get_ctrl_mbox(), new message(message::INFO, real_load)); + comm.ctrl_send(nb.get_ctrl_mbox(), + new message(message::INFO, real_load)); double load_to_send = nb.get_to_send(); if (load_to_send > 0.0) { - comm.send(nb.get_data_mbox(), new message(message::LOAD, load_to_send)); + comm.data_send(nb.get_data_mbox(), + new message(message::LOAD, load_to_send)); nb.set_to_send(0.0); } } @@ -215,14 +217,14 @@ void process::send1_no_bookkeeping(neighbor& nb) void process::send1_bookkeeping(neighbor& nb) { if (expected_load != prev_load_broadcast) - comm.send(nb.get_ctrl_mbox(), - new message(message::INFO, expected_load)); + comm.ctrl_send(nb.get_ctrl_mbox(), + new message(message::INFO, expected_load)); double load_to_send; double new_debt; double debt_to_send = nb.get_to_send(); if (debt_to_send > 0.0) { - comm.send(nb.get_ctrl_mbox(), - new message(message::CREDIT, debt_to_send)); + comm.ctrl_send(nb.get_ctrl_mbox(), + new message(message::CREDIT, debt_to_send)); nb.set_to_send(0.0); new_debt = nb.get_debt() + debt_to_send; } else { @@ -238,7 +240,8 @@ void process::send1_bookkeeping(neighbor& nb) real_load -= load_to_send; } if (load_to_send > 0.0) - comm.send(nb.get_data_mbox(), new message(message::LOAD, load_to_send)); + comm.data_send(nb.get_data_mbox(), + new message(message::LOAD, load_to_send)); } void process::send_all() @@ -255,7 +258,8 @@ void process::send_all() bind(&process::send1_no_bookkeeping, this, _1)); prev_load_broadcast = real_load; } - comm.flush(false); + comm.ctrl_flush(false); + comm.data_flush(false); } void process::receive(double timeout) @@ -264,7 +268,8 @@ void process::receive(double timeout) m_host_t from; XBT_DEBUG("%sblocking receive (%g)", "\0non-" + !timeout, timeout); - while (may_receive() && comm.recv(msg, from, timeout)) { + while (may_receive() && (comm.ctrl_recv(msg, from, timeout) || + comm.data_recv(msg, from, timeout))) { switch (msg->get_type()) { case message::INFO: { neighbor* n = rev_neigh[from]; @@ -293,13 +298,14 @@ void process::receive(double timeout) delete msg; timeout = 0.0; // only wait on first recv } - comm.flush(false); + comm.ctrl_flush(false); + comm.data_flush(false); } void process::finalize1(neighbor& nb) { - comm.send(nb.get_ctrl_mbox(), new message(message::CTRL_CLOSE, 0.0)); - comm.send(nb.get_data_mbox(), new message(message::DATA_CLOSE, 0.0)); + comm.ctrl_send(nb.get_ctrl_mbox(), new message(message::CTRL_CLOSE, 0.0)); + comm.data_send(nb.get_data_mbox(), new message(message::DATA_CLOSE, 0.0)); } void process::finalize() @@ -310,19 +316,20 @@ void process::finalize() finalizing = true; total_load_running -= real_load; - XBT_DEBUG("send CLOSE to %lu neighbor%s", - (unsigned long )neigh.size(), ESSE(neigh.size())); + XBT_DEBUG("send CLOSE to %zu neighbor%s", neigh.size(), ESSE(neigh.size())); std::for_each(neigh.begin(), neigh.end(), bind(&process::finalize1, this, _1)); while (may_receive()) { - comm.flush(false); + comm.ctrl_flush(false); + comm.data_flush(false); XBT_DEBUG("waiting for %d CTRL and %d DATA CLOSE", ctrl_close_pending, data_close_pending); receive(-1.0); } - comm.flush(true); + comm.ctrl_flush(true); + comm.data_flush(true); } #define print_loads_generic(vec, verbose, logp, cat) \