AND Private Git Repository - loba.git/blobdiff - process.cpp
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Documentation updates.
[loba.git] / process.cpp
index 4501dab9517d1290415c0a1c6223d50805badfcd..39646cd2c6b493f76dd974c82b42c14e16d1ec0f 100644 (file)
@@ -88,7 +88,7 @@ int process::run()
             double now = MSG_get_clock();
             if (now < next_iter_after_date)
                 MSG_process_sleep(next_iter_after_date - now);
             double now = MSG_get_clock();
             if (now < next_iter_after_date)
                 MSG_process_sleep(next_iter_after_date - now);
-            next_iter_after_date = MSG_get_clock() + opt::min_iter_duration;
+            next_iter_after_date = MSG_get_clock() + opt::min_lb_iter_duration;
 
             ++lb_iter;
 
 
             ++lb_iter;
 
@@ -131,8 +131,8 @@ int process::run()
         double timeout;
         if (real_load != 0 || get_load() != prev_load_broadcast)
             timeout = 0.0;
         double timeout;
         if (real_load != 0 || get_load() != prev_load_broadcast)
             timeout = 0.0;
-        else if (opt::min_iter_duration)
-            timeout = opt::min_iter_duration;
+        else if (opt::min_lb_iter_duration)
+            timeout = opt::min_lb_iter_duration;
         else
             timeout = 1.0;
         receive(timeout);
         else
             timeout = 1.0;
         receive(timeout);
@@ -204,10 +204,12 @@ void process::send(neighbor& nb, double amount)
 void process::send1_no_bookkeeping(neighbor& nb)
 {
     if (real_load != prev_load_broadcast)
 void process::send1_no_bookkeeping(neighbor& nb)
 {
     if (real_load != prev_load_broadcast)
-        comm.send(nb.get_ctrl_mbox(), new message(message::INFO, real_load));
+        comm.ctrl_send(nb.get_ctrl_mbox(),
+                       new message(message::INFO, real_load));
     double load_to_send = nb.get_to_send();
     if (load_to_send > 0.0) {
     double load_to_send = nb.get_to_send();
     if (load_to_send > 0.0) {
-        comm.send(nb.get_data_mbox(), new message(message::LOAD, load_to_send));
+        comm.data_send(nb.get_data_mbox(),
+                       new message(message::LOAD, load_to_send));
         nb.set_to_send(0.0);
     }
 }
         nb.set_to_send(0.0);
     }
 }
@@ -215,14 +217,14 @@ void process::send1_no_bookkeeping(neighbor& nb)
 void process::send1_bookkeeping(neighbor& nb)
 {
     if (expected_load != prev_load_broadcast)
 void process::send1_bookkeeping(neighbor& nb)
 {
     if (expected_load != prev_load_broadcast)
-        comm.send(nb.get_ctrl_mbox(),
-                  new message(message::INFO, expected_load));
+        comm.ctrl_send(nb.get_ctrl_mbox(),
+                       new message(message::INFO, expected_load));
     double load_to_send;
     double new_debt;
     double debt_to_send = nb.get_to_send();
     if (debt_to_send > 0.0) {
     double load_to_send;
     double new_debt;
     double debt_to_send = nb.get_to_send();
     if (debt_to_send > 0.0) {
-        comm.send(nb.get_ctrl_mbox(),
-                  new message(message::CREDIT, debt_to_send));
+        comm.ctrl_send(nb.get_ctrl_mbox(),
+                       new message(message::CREDIT, debt_to_send));
         nb.set_to_send(0.0);
         new_debt = nb.get_debt() + debt_to_send;
     } else {
         nb.set_to_send(0.0);
         new_debt = nb.get_debt() + debt_to_send;
     } else {
@@ -238,7 +240,8 @@ void process::send1_bookkeeping(neighbor& nb)
         real_load -= load_to_send;
     }
     if (load_to_send > 0.0)
         real_load -= load_to_send;
     }
     if (load_to_send > 0.0)
-        comm.send(nb.get_data_mbox(), new message(message::LOAD, load_to_send));
+        comm.data_send(nb.get_data_mbox(),
+                       new message(message::LOAD, load_to_send));
 }
 
 void process::send_all()
 }
 
 void process::send_all()
@@ -255,7 +258,8 @@ void process::send_all()
                       bind(&process::send1_no_bookkeeping, this, _1));
         prev_load_broadcast = real_load;
     }
                       bind(&process::send1_no_bookkeeping, this, _1));
         prev_load_broadcast = real_load;
     }
-    comm.flush(false);
+    comm.ctrl_flush(false);
+    comm.data_flush(false);
 }
 
 void process::receive(double timeout)
 }
 
 void process::receive(double timeout)
@@ -264,7 +268,8 @@ void process::receive(double timeout)
     m_host_t from;
 
     XBT_DEBUG("%sblocking receive (%g)", "\0non-" + !timeout, timeout);
     m_host_t from;
 
     XBT_DEBUG("%sblocking receive (%g)", "\0non-" + !timeout, timeout);
-    while (may_receive() && comm.recv(msg, from, timeout)) {
+    while (may_receive() && (comm.ctrl_recv(msg, from, timeout) ||
+                             comm.data_recv(msg, from, timeout))) {
         switch (msg->get_type()) {
         case message::INFO: {
             neighbor* n = rev_neigh[from];
         switch (msg->get_type()) {
         case message::INFO: {
             neighbor* n = rev_neigh[from];
@@ -293,13 +298,14 @@ void process::receive(double timeout)
         delete msg;
         timeout = 0.0;          // only wait on first recv
     }
         delete msg;
         timeout = 0.0;          // only wait on first recv
     }
-    comm.flush(false);
+    comm.ctrl_flush(false);
+    comm.data_flush(false);
 }
 
 void process::finalize1(neighbor& nb)
 {
 }
 
 void process::finalize1(neighbor& nb)
 {
-    comm.send(nb.get_ctrl_mbox(), new message(message::CTRL_CLOSE, 0.0));
-    comm.send(nb.get_data_mbox(), new message(message::DATA_CLOSE, 0.0));
+    comm.ctrl_send(nb.get_ctrl_mbox(), new message(message::CTRL_CLOSE, 0.0));
+    comm.data_send(nb.get_data_mbox(), new message(message::DATA_CLOSE, 0.0));
 }
 
 void process::finalize()
 }
 
 void process::finalize()
@@ -310,19 +316,20 @@ void process::finalize()
     finalizing = true;
     total_load_running -= real_load;
 
     finalizing = true;
     total_load_running -= real_load;
 
-    XBT_DEBUG("send CLOSE to %lu neighbor%s",
-              (unsigned long )neigh.size(), ESSE(neigh.size()));
+    XBT_DEBUG("send CLOSE to %zu neighbor%s", neigh.size(), ESSE(neigh.size()));
     std::for_each(neigh.begin(), neigh.end(),
                   bind(&process::finalize1, this, _1));
 
     while (may_receive()) {
     std::for_each(neigh.begin(), neigh.end(),
                   bind(&process::finalize1, this, _1));
 
     while (may_receive()) {
-        comm.flush(false);
+        comm.ctrl_flush(false);
+        comm.data_flush(false);
         XBT_DEBUG("waiting for %d CTRL and %d DATA CLOSE",
                   ctrl_close_pending, data_close_pending);
         receive(-1.0);
     }
 
         XBT_DEBUG("waiting for %d CTRL and %d DATA CLOSE",
                   ctrl_close_pending, data_close_pending);
         receive(-1.0);
     }
 
-    comm.flush(true);
+    comm.ctrl_flush(true);
+    comm.data_flush(true);
 }
 
 #define print_loads_generic(vec, verbose, logp, cat)                    \
 }
 
 #define print_loads_generic(vec, verbose, logp, cat)                    \