+#include "process.h"
+
#include <algorithm>
#include <tr1/functional>
#include <iterator>
#include <xbt/time.h>
#include "misc.h"
#include "options.h"
-#include "process.h"
XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu);
throw std::invalid_argument("bad or missing initial load");
neigh.assign(argv + 2, argv + argc);
+
+ std::for_each(neigh.begin(), neigh.end(),
+ std::tr1::bind(&process::insert_neighbor_in_map,
+ this, std::tr1::placeholders::_1));
+
expected_load = load;
ctrl_close_pending = data_close_pending = neigh.size();
e_xbt_log_priority_t logp = xbt_log_priority_verbose;
if (!LOG_ISENABLED(logp))
return;
- LOG1(logp, "My initial load is: %g", load);
std::ostringstream oss;
oss << neigh.size() << " neighbor";
if (!neigh.empty()) {
print_loads(logp);
}
+process::~process() // Destructor: the body is empty, so no explicit cleanup is performed here.
+{
+}
+
int process::run()
{
bool one_more = true;
- unsigned iter = 0;
+
+ INFO1("Initial load: %g", load);
VERB0("Starting...");
+ iter = 0;
while (one_more) {
- bool close_received;
+ ++iter;
- if (opt::bookkeeping)
- INFO3("(%u) current load: %g ; expected: %g",
- iter, load, expected_load);
- else
- INFO2("(%u) current load: %g",
- iter, load);
+ if (opt::log_rate && iter % opt::log_rate == 0) {
+ if (opt::bookkeeping)
+ INFO3("(%u) current load: %g ; expected: %g",
+ iter, load, expected_load);
+ else
+ INFO2("(%u) current load: %g",
+ iter, load);
+ }
compute();
- close_received = !receive(false);
- /*
- * compute load balancing;
- * send tasks to neighbors;
- */
+ bool close_received = !receive(false);
+
+ if (opt::bookkeeping)
+ expected_load -= load_balance(expected_load);
+ else
+ load -= load_balance(load);
+ send();
comm.flush(false);
- ++iter;
if (opt::exit_on_close && close_received)
one_more = false;
*/
VERB0("Done.");
+ if (opt::bookkeeping)
+ INFO4("Final load after %d iteration%s: %g ; expected: %g",
+ iter, ESSE(iter), load, expected_load);
+ else
+ INFO3("Final load after %d iteration%s: %g", iter, ESSE(iter), load);
return 0;
}
MSG_task_destroy(task);
}
+double process::load_balance(double /*my_load*/) // This implementation ignores its (unnamed) argument and always returns 0.0.
+{
+ return 0.0; // run() subtracts this return value from expected_load (opt::bookkeeping) or from load (otherwise)
+}
+
+void process::send1_no_bookkeeping(neighbor& nb) // Per-neighbor send step applied by send() when opt::bookkeeping is false.
+{
+ comm.send(nb.get_ctrl_mbox(), new message(message::INFO, load)); // unconditionally send an INFO message carrying our current load on nb's control mailbox
+ double load_to_send = nb.get_to_send(); // pending amount stored on nb (presumably filled in by load balancing -- TODO confirm)
+ if (load_to_send > 0.0) { // a LOAD message is sent only for a strictly positive amount
+ comm.send(nb.get_data_mbox(), new message(message::LOAD, load_to_send)); // NOTE(review): message is allocated with new; presumably ownership passes to comm/receiver -- confirm
+ nb.set_to_send(0.0); // clear the pending amount once it has been handed to comm
+ }
+}
+
+void process::send1_bookkeeping(neighbor& nb) // Per-neighbor send step applied by send() when opt::bookkeeping is true.
+{
+ comm.send(nb.get_ctrl_mbox(), new message(message::INFO, expected_load)); // INFO carries expected_load here (the no-bookkeeping variant sends load)
+ double load_to_send; // assigned in both branches of the load/new_debt comparison below
+ double new_debt; // nb's debt after adding any newly announced credit
+ double debt_to_send = nb.get_to_send(); // pending amount stored on nb, announced to the neighbor as CREDIT
+ if (debt_to_send > 0.0) {
+ comm.send(nb.get_ctrl_mbox(),
+ new message(message::CREDIT, debt_to_send)); // CREDIT goes on the control mailbox; receive() adds a CREDIT amount to expected_load
+ nb.set_to_send(0.0); // clear the pending amount once announced
+ new_debt = nb.get_debt() + debt_to_send; // the announced credit is added to what we already owe nb
+ } else {
+ new_debt = nb.get_debt(); // nothing new announced: debt is unchanged
+ }
+ if (load <= new_debt) { // not enough real load to cover the whole debt
+ load_to_send = load; // send everything we have
+ nb.set_debt(new_debt - load_to_send); // keep the unpaid remainder as debt
+ load = 0.0;
+ } else { // enough real load: pay the debt in full
+ load_to_send = new_debt;
+ nb.set_debt(0.0);
+ load -= load_to_send;
+ }
+ if (load_to_send > 0.0) // skip the LOAD message when there is nothing to transfer
+ comm.send(nb.get_data_mbox(), new message(message::LOAD, load_to_send)); // actual load travels on the data mailbox
+}
+
+void process::send() // Applies the per-neighbor send helper matching opt::bookkeeping to every element of neigh.
+{
+ // FIXME: should data be sent on every iteration?
+ if (opt::bookkeeping) {
+ std::for_each(neigh.begin(), neigh.end(),
+ std::tr1::bind(&process::send1_bookkeeping,
+ this, std::tr1::placeholders::_1)); // bind yields a callable invoking this->send1_bookkeeping(element)
+ } else {
+ std::for_each(neigh.begin(), neigh.end(),
+ std::tr1::bind(&process::send1_no_bookkeeping,
+ this, std::tr1::placeholders::_1)); // same pattern with the no-bookkeeping helper
+ }
+}
// Returns false if a CLOSE message was received.
bool process::receive(bool wait_for_close)
DEBUG2("received %s from %s",
msg->to_string().c_str(), MSG_host_get_name(from));
switch (msg->get_type()) {
- case message::INFO:
- // fixme: update neighbor
- // need a map m_host_t -> neighbor&
+ case message::INFO: {
+ neighbor* n = rev_neigh[from];
+ n->set_load(msg->get_amount());
break;
+ }
case message::CREDIT:
expected_load += msg->get_amount();
break;
return result;
}
+void process::finalize1(neighbor& nb) // Per-neighbor step of finalize(): sends both close messages to nb.
+{
+ comm.send(nb.get_ctrl_mbox(), new message(message::CTRL_CLOSE, 0.0)); // CTRL_CLOSE on the control mailbox, with amount 0.0
+ comm.send(nb.get_data_mbox(), new message(message::DATA_CLOSE, 0.0)); // DATA_CLOSE on the data mailbox, with amount 0.0
+}
+
void process::finalize()
{
DEBUG2("send CLOSE to %d neighbor%s.",
(int )neigh.size(), ESSE(neigh.size()));
- std::vector<neighbor>::iterator n;
- for (n = neigh.begin() ; n != neigh.end() ; ++n) {
- comm.send(n->get_ctrl_mbox(), new message(message::CTRL_CLOSE, 0.0));
- comm.send(n->get_data_mbox(), new message(message::DATA_CLOSE, 0.0));
- }
+ std::for_each(neigh.begin(), neigh.end(),
+ std::tr1::bind(&process::finalize1,
+ this, std::tr1::placeholders::_1));
DEBUG2("wait for CLOSE from %d neighbor%s.",
(int )neigh.size(), ESSE(neigh.size()));
LOG1(logp, "Neighbor loads: %s", oss.str().c_str());
}
+void process::insert_neighbor_in_map(neighbor& nb) // Registers nb in rev_neigh, keyed by the host looked up from nb's name.
+{
+ neighbor* nbp = &nb; // stores the address of the caller's object (the constructor passes elements of neigh); NOTE(review): dangles if that storage is later moved or reallocated
+ m_host_t host = MSG_get_host_by_name(nb.get_name()); // NOTE(review): result is not checked; if the lookup can fail (confirm against MSG docs) the entry is keyed by that failure value
+ rev_neigh.insert(std::make_pair(host, nbp)); // receive() looks up rev_neigh[from] and dereferences the result when handling INFO messages
+}
+
// Local variables:
// mode: c++
// End: