neigh.assign(argv + 2, argv + argc);
expected_load = load;
+
ctrl_close_pending = data_close_pending = neigh.size();
+ if (neigh.size() == 1) {
+ comm.next_close_on_ctrl_is_last();
+ comm.next_close_on_data_is_last();
+ }
+ if (neigh.size() > 0)
+ comm.listen();
e_xbt_log_priority_t logp = xbt_log_priority_verbose;
if (!LOG_ISENABLED(logp))
return;
- LOG1(logp, "My initial load is: %g", load);
std::ostringstream oss;
oss << neigh.size() << " neighbor";
if (!neigh.empty()) {
int process::run()
{
- INFO0("Coucou !");
-
- int n = 100;
- while (n--) {
- if (opt::bookkeeping)
- INFO2("current load: %g ; expected: %g", load, expected_load);
- else
- INFO1("current load: %g", load);
-
- if (load > 0)
- compute();
- else
- xbt_sleep(100); // fixme
- if (!receive(false))
- n = 0;
+ bool one_more = true;
+ unsigned iter = 0;
+
+ INFO1("Initial load: %g", load);
+ VERB0("Starting...");
+ while (one_more) {
+ bool close_received;
+ ++iter;
+
+ if (opt::log_rate && iter % opt::log_rate == 0) {
+ if (opt::bookkeeping)
+ INFO3("(%u) current load: %g ; expected: %g",
+ iter, load, expected_load);
+ else
+ INFO2("(%u) current load: %g",
+ iter, load);
+ }
+
+ compute();
+ close_received = !receive(false);
+
+ /*
+ * compute load balancing;
+ * send tasks to neighbors;
+ */
+
+ comm.flush(false);
+
+ if (opt::exit_on_close && close_received)
+ one_more = false;
+ if (opt::maxiter && iter >= opt::maxiter)
+ one_more = false;
}
- DEBUG0("going to finalize.");
+ VERB0("Going to finalize...");
finalize();
- // MSG_process_sleep(100.0); // xxx
- /* xxx:
- * while (there is something to do) {
- * compute some task;
- * get received tasks;
- * compute load balancing;
- * send tasks to neighbors;
- * }
- * finalize;
- * wait for pending messages;
- */
-
/* Open Questions :
* - definition of load on heterogeneous hosts ?
* - how to detect convergence ?
* - how to manage link failures ?
*/
- DEBUG0("done.");
+ VERB0("Done.");
+ if (opt::bookkeeping)
+ INFO4("Final load after %d iteration%s: %g ; expected: %g",
+ iter, ESSE(iter), load, expected_load);
+ else
+ INFO3("Final load after %d iteration%s: %g", iter, ESSE(iter), load);
return 0;
}
void process::compute()
{
+ // fixme: shall we do something special when duration is 0 ?
double duration = opt::comp_cost(load);
m_task_t task = MSG_task_create("computation", duration, 0.0, NULL);
DEBUG2("compute %g flop%s.", duration, ESSE(duration));
MSG_task_destroy(task);
}
+
+// Returns false if a CLOSE message was received.
bool process::receive(bool wait_for_close)
{
bool result = true;
m_host_t from;
while ((ctrl_close_pending ||
data_close_pending) && comm.recv(msg, from, wait_for_close)) {
+ DEBUG2("received %s from %s",
+ msg->to_string().c_str(), MSG_host_get_name(from));
switch (msg->get_type()) {
case message::INFO:
- DEBUG0("received INFO");
// fixme: update neighbor
// need a map m_host_t -> neighbor&
break;
case message::CREDIT:
- DEBUG0("received CREDIT");
expected_load += msg->get_amount();
break;
case message::LOAD:
- DEBUG0("received LOAD");
load += msg->get_amount();
break;
case message::CTRL_CLOSE:
- DEBUG0("received CTRL_CLOSE");
if (--ctrl_close_pending == 1)
comm.next_close_on_ctrl_is_last();
+ DEBUG1("ctrl_close_pending = %d", ctrl_close_pending);
result = false;
break;
case message::DATA_CLOSE:
- DEBUG0("received DATA_CLOSE");
if (--data_close_pending == 1)
comm.next_close_on_data_is_last();
+ DEBUG1("data_close_pending = %d", data_close_pending);
result = false;
break;
}
(int )neigh.size(), ESSE(neigh.size()));
receive(true);
- comm.wait_for_sent();
+ comm.flush(true);
}
void process::print_loads(e_xbt_log_priority_t logp)