]> AND Private Git Repository - loba.git/blobdiff - process.cpp
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Wip++...
[loba.git] / process.cpp
index e9e234864dfbe733623aeb3ad9edb128a8658e52..d8ec50036a4d6d9d2e93568bc944604bba118375 100644 (file)
@@ -18,7 +18,14 @@ process::process(int argc, char* argv[])
 
     neigh.assign(argv + 2, argv + argc);
     expected_load = load;
 
     neigh.assign(argv + 2, argv + argc);
     expected_load = load;
+
     ctrl_close_pending = data_close_pending = neigh.size();
     ctrl_close_pending = data_close_pending = neigh.size();
+    if (neigh.size() == 1) {
+        comm.next_close_on_ctrl_is_last();
+        comm.next_close_on_data_is_last();
+    }
+    if (neigh.size() > 0)
+        comm.listen();
 
     e_xbt_log_priority_t logp = xbt_log_priority_verbose;
     if (!LOG_ISENABLED(logp))
 
     e_xbt_log_priority_t logp = xbt_log_priority_verbose;
     if (!LOG_ISENABLED(logp))
@@ -39,49 +46,51 @@ process::process(int argc, char* argv[])
 
 int process::run()
 {
 
 int process::run()
 {
-    INFO0("Coucou !");
+    bool one_more = true;
+    unsigned iter = 0;
+    VERB0("Starting...");
+    while (one_more) {
+        bool close_received;
 
 
-    int n = 100;
-    while (n--) {
         if (opt::bookkeeping)
         if (opt::bookkeeping)
-            INFO2("current load: %g ; expected: %g", load, expected_load);
+            INFO3("(%u) current load: %g ; expected: %g",
+                  iter, load, expected_load);
         else
         else
-            INFO1("current load: %g", load);
+            INFO2("(%u) current load: %g",
+                  iter, load);
 
 
-        if (load > 0)
-            compute();
-        else
-            xbt_sleep(100);        // fixme
-        if (!receive(false))
-            n = 0;
+        compute();
+        close_received = !receive(false);
+
+        /*
+         *    compute load balancing;
+         *    send tasks to neighbors;
+         */
+
+        comm.flush(false);
+        ++iter;
+
+        if (opt::exit_on_close && close_received)
+            one_more = false;
+        if (opt::maxiter && iter >= opt::maxiter)
+            one_more = false;
     }
     }
-    DEBUG0("going to finalize.");
+    VERB0("Going to finalize...");
     finalize();
 
     finalize();
 
-    //    MSG_process_sleep(100.0);   // xxx
-    /* xxx:
-     * while (there is something to do) {
-     *    compute some task;
-     *    get received tasks;
-     *    compute load balancing;
-     *    send tasks to neighbors;
-     * }
-     * finalize;
-     * wait for pending messages;
-     */
-
     /* Open Questions :
      * - definition of load on heterogeneous hosts ?
      * - how to detect convergence ?
      * - how to manage link failures ?
      */
 
     /* Open Questions :
      * - definition of load on heterogeneous hosts ?
      * - how to detect convergence ?
      * - how to manage link failures ?
      */
 
-    DEBUG0("done.");
+    VERB0("Done.");
     return 0;
 }
 
 void process::compute()
 {
     return 0;
 }
 
 void process::compute()
 {
+    // fixme: shall we do something special when duration is 0 ?
     double duration = opt::comp_cost(load);
     m_task_t task = MSG_task_create("computation", duration, 0.0, NULL);
     DEBUG2("compute %g flop%s.", duration, ESSE(duration));
     double duration = opt::comp_cost(load);
     m_task_t task = MSG_task_create("computation", duration, 0.0, NULL);
     DEBUG2("compute %g flop%s.", duration, ESSE(duration));
@@ -89,6 +98,8 @@ void process::compute()
     MSG_task_destroy(task);
 }
 
     MSG_task_destroy(task);
 }
 
+
+// Returns false if a CLOSE message was received. 
 bool process::receive(bool wait_for_close)
 {
     bool result = true;
 bool process::receive(bool wait_for_close)
 {
     bool result = true;
@@ -96,30 +107,29 @@ bool process::receive(bool wait_for_close)
     m_host_t from;
     while ((ctrl_close_pending ||
             data_close_pending) && comm.recv(msg, from, wait_for_close)) {
     m_host_t from;
     while ((ctrl_close_pending ||
             data_close_pending) && comm.recv(msg, from, wait_for_close)) {
+        DEBUG2("received %s from %s",
+               msg->to_string().c_str(), MSG_host_get_name(from));
         switch (msg->get_type()) {
         case message::INFO:
         switch (msg->get_type()) {
         case message::INFO:
-            DEBUG0("received INFO");
             // fixme: update neighbor
             // need a map m_host_t -> neighbor&
             break;
         case message::CREDIT:
             // fixme: update neighbor
             // need a map m_host_t -> neighbor&
             break;
         case message::CREDIT:
-            DEBUG0("received CREDIT");
             expected_load += msg->get_amount();
             break;
         case message::LOAD:
             expected_load += msg->get_amount();
             break;
         case message::LOAD:
-            DEBUG0("received LOAD");
             load += msg->get_amount();
             break;
         case message::CTRL_CLOSE:
             load += msg->get_amount();
             break;
         case message::CTRL_CLOSE:
-            DEBUG0("received CTRL_CLOSE");
             if (--ctrl_close_pending == 1)
                 comm.next_close_on_ctrl_is_last();
             if (--ctrl_close_pending == 1)
                 comm.next_close_on_ctrl_is_last();
+            DEBUG1("ctrl_close_pending = %d", ctrl_close_pending);
             result = false;
             break;
         case message::DATA_CLOSE:
             result = false;
             break;
         case message::DATA_CLOSE:
-            DEBUG0("received DATA_CLOSE");
             if (--data_close_pending == 1)
                 comm.next_close_on_data_is_last();
             if (--data_close_pending == 1)
                 comm.next_close_on_data_is_last();
+            DEBUG1("data_close_pending = %d", data_close_pending);
             result = false;
             break;
         }
             result = false;
             break;
         }
@@ -142,7 +152,7 @@ void process::finalize()
            (int )neigh.size(), ESSE(neigh.size()));
     receive(true);
 
            (int )neigh.size(), ESSE(neigh.size()));
     receive(true);
 
-    comm.wait_for_sent();
+    comm.flush(true);
 }
 
 void process::print_loads(e_xbt_log_priority_t logp)
 }
 
 void process::print_loads(e_xbt_log_priority_t logp)