X-Git-Url: https://bilbo.iut-bm.univ-fcomte.fr/and/gitweb/loba.git/blobdiff_plain/0e9987fcef822119e3218d14328eb39977525662..271654d06dedba35eab7c26069a42bac89b0cee4:/process.cpp diff --git a/process.cpp b/process.cpp index 34ddbea..39646cd 100644 --- a/process.cpp +++ b/process.cpp @@ -204,10 +204,12 @@ void process::send(neighbor& nb, double amount) void process::send1_no_bookkeeping(neighbor& nb) { if (real_load != prev_load_broadcast) - comm.send(nb.get_ctrl_mbox(), new message(message::INFO, real_load)); + comm.ctrl_send(nb.get_ctrl_mbox(), + new message(message::INFO, real_load)); double load_to_send = nb.get_to_send(); if (load_to_send > 0.0) { - comm.send(nb.get_data_mbox(), new message(message::LOAD, load_to_send)); + comm.data_send(nb.get_data_mbox(), + new message(message::LOAD, load_to_send)); nb.set_to_send(0.0); } } @@ -215,14 +217,14 @@ void process::send1_no_bookkeeping(neighbor& nb) void process::send1_bookkeeping(neighbor& nb) { if (expected_load != prev_load_broadcast) - comm.send(nb.get_ctrl_mbox(), - new message(message::INFO, expected_load)); + comm.ctrl_send(nb.get_ctrl_mbox(), + new message(message::INFO, expected_load)); double load_to_send; double new_debt; double debt_to_send = nb.get_to_send(); if (debt_to_send > 0.0) { - comm.send(nb.get_ctrl_mbox(), - new message(message::CREDIT, debt_to_send)); + comm.ctrl_send(nb.get_ctrl_mbox(), + new message(message::CREDIT, debt_to_send)); nb.set_to_send(0.0); new_debt = nb.get_debt() + debt_to_send; } else { @@ -238,7 +240,8 @@ void process::send1_bookkeeping(neighbor& nb) real_load -= load_to_send; } if (load_to_send > 0.0) - comm.send(nb.get_data_mbox(), new message(message::LOAD, load_to_send)); + comm.data_send(nb.get_data_mbox(), + new message(message::LOAD, load_to_send)); } void process::send_all() @@ -255,7 +258,8 @@ void process::send_all() bind(&process::send1_no_bookkeeping, this, _1)); prev_load_broadcast = real_load; } - comm.flush(false); + comm.ctrl_flush(false); + comm.data_flush(false); } void process::receive(double timeout) @@ -264,7 +268,8 @@ void process::receive(double timeout) m_host_t from; XBT_DEBUG("%sblocking receive (%g)", "\0non-" + !timeout, timeout); - while (may_receive() && comm.recv(msg, from, timeout)) { + while (may_receive() && (comm.ctrl_recv(msg, from, timeout) || + comm.data_recv(msg, from, timeout))) { switch (msg->get_type()) { case message::INFO: { neighbor* n = rev_neigh[from]; @@ -293,13 +298,14 @@ void process::receive(double timeout) delete msg; timeout = 0.0; // only wait on first recv } - comm.flush(false); + comm.ctrl_flush(false); + comm.data_flush(false); } void process::finalize1(neighbor& nb) { - comm.send(nb.get_ctrl_mbox(), new message(message::CTRL_CLOSE, 0.0)); - comm.send(nb.get_data_mbox(), new message(message::DATA_CLOSE, 0.0)); + comm.ctrl_send(nb.get_ctrl_mbox(), new message(message::CTRL_CLOSE, 0.0)); + comm.data_send(nb.get_data_mbox(), new message(message::DATA_CLOSE, 0.0)); } void process::finalize() @@ -315,13 +321,15 @@ void process::finalize() bind(&process::finalize1, this, _1)); while (may_receive()) { - comm.flush(false); + comm.ctrl_flush(false); + comm.data_flush(false); XBT_DEBUG("waiting for %d CTRL and %d DATA CLOSE", ctrl_close_pending, data_close_pending); receive(-1.0); } - comm.flush(true); + comm.ctrl_flush(true); + comm.data_flush(true); } #define print_loads_generic(vec, verbose, logp, cat) \