neigh.assign(argv + 2, argv + argc);
expected_load = load;
+
ctrl_close_pending = data_close_pending = neigh.size();
+ if (neigh.size() == 1) {
+ comm.next_close_on_ctrl_is_last();
+ comm.next_close_on_data_is_last();
+ }
+ if (neigh.size() > 0)
+ comm.listen();
e_xbt_log_priority_t logp = xbt_log_priority_verbose;
if (!LOG_ISENABLED(logp))
// NOTE(review): this region is a unified-diff excerpt, not compilable C++.
// Lines starting with '-' belong to the implementation being removed, lines
// starting with '+' belong to its replacement, and unmarked lines are common
// to both versions.
//
// Main loop of the process.
//
// Replacement ('+') version: on every iteration it logs the current load
// (and the expected load when opt::bookkeeping is set), calls compute(),
// calls receive(false), calls comm.flush(false) and increments the
// iteration counter. The loop ends when a CLOSE message was received and
// opt::exit_on_close is set, or when opt::maxiter is non-zero and the
// iteration counter has reached it. finalize() is called after the loop.
//
// Returns 0 unconditionally.
int process::run()
{
- INFO0("Coucou !");
+ bool one_more = true;
+ unsigned iter = 0;
+ VERB0("Starting...");
+ while (one_more) {
+ bool close_received;
// Removed version: the loop was bounded by a hard-coded count of 100
// iterations instead of the one_more flag.
- int n = 100;
- while (n--) {
// With bookkeeping enabled the expected load is logged next to the actual
// load; the replacement log lines additionally print the iteration number.
if (opt::bookkeeping)
- INFO2("current load: %g ; expected: %g", load, expected_load);
+ INFO3("(%u) current load: %g ; expected: %g",
+ iter, load, expected_load);
else
- INFO1("current load: %g", load);
+ INFO2("(%u) current load: %g",
+ iter, load);
// Removed version: compute() was only called when load > 0 (otherwise
// xbt_sleep(100) was called), and a false return from receive(false)
// ended the loop by zeroing the counter.
- if (load > 0)
- compute();
- else
- xbt_sleep(100); // fixme
- if (!receive(false))
- n = 0;
// Replacement version: compute() is called unconditionally, and the
// result of receive(false) is only recorded here; whether it stops the
// loop is decided below.
+ compute();
+ close_received = !receive(false);
+
+ /*
+ * compute load balancing;
+ * send tasks to neighbors;
+ */
+
+ comm.flush(false);
+ ++iter;
+
// Termination conditions: either one clears one_more.
// opt::maxiter == 0 disables the iteration limit.
+ if (opt::exit_on_close && close_received)
+ one_more = false;
+ if (opt::maxiter && iter >= opt::maxiter)
+ one_more = false;
}
- DEBUG0("going to finalize.");
+ VERB0("Going to finalize...");
finalize();
- // MSG_process_sleep(100.0); // xxx
- /* xxx:
- * while (there is something to do) {
- * compute some task;
- * get received tasks;
- * compute load balancing;
- * send tasks to neighbors;
- * }
- * finalize;
- * wait for pending messages;
- */
-
/* Open Questions :
* - definition of load on heterogeneous hosts ?
* - how to detect convergence ?
* - how to manage link failures ?
*/
- DEBUG0("done.");
+ VERB0("Done.");
return 0;
}
// Creates an MSG task named "computation" whose amount is
// opt::comp_cost(load), logs that amount as a number of flops, and then
// destroys the task.
//
// NOTE(review): in this excerpt no call that actually runs the task appears
// between MSG_task_create() and MSG_task_destroy(). This text is a diff
// fragment and lines may have been elided -- confirm against the full file
// that the task is executed before it is destroyed.
void process::compute()
{
+ // fixme: shall we do something special when duration is 0 ?
// Cost of the computation, derived from the current load.
double duration = opt::comp_cost(load);
m_task_t task = MSG_task_create("computation", duration, 0.0, NULL);
DEBUG2("compute %g flop%s.", duration, ESSE(duration));
MSG_task_destroy(task);
}
+
+// Returns false if a CLOSE message was received.
bool process::receive(bool wait_for_close)
{
bool result = true;
m_host_t from;
while ((ctrl_close_pending ||
data_close_pending) && comm.recv(msg, from, wait_for_close)) {
+ DEBUG2("received %s from %s",
+ msg->to_string().c_str(), MSG_host_get_name(from));
switch (msg->get_type()) {
case message::INFO:
- DEBUG0("received INFO");
// fixme: update neighbor
// need a map m_host_t -> neighbor&
break;
case message::CREDIT:
- DEBUG0("received CREDIT");
expected_load += msg->get_amount();
break;
case message::LOAD:
- DEBUG0("received LOAD");
load += msg->get_amount();
break;
case message::CTRL_CLOSE:
- DEBUG0("received CTRL_CLOSE");
if (--ctrl_close_pending == 1)
comm.next_close_on_ctrl_is_last();
+ DEBUG1("ctrl_close_pending = %d", ctrl_close_pending);
result = false;
break;
case message::DATA_CLOSE:
- DEBUG0("received DATA_CLOSE");
if (--data_close_pending == 1)
comm.next_close_on_data_is_last();
+ DEBUG1("data_close_pending = %d", data_close_pending);
result = false;
break;
}
(int )neigh.size(), ESSE(neigh.size()));
receive(true);
- comm.wait_for_sent();
+ comm.flush(true);
}
void process::print_loads(e_xbt_log_priority_t logp)