AND Private Git Repository - loba.git/blobdiff - process.cpp
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Separate ctrl and data communications.
[loba.git] / process.cpp
index 34ddbea2b713d3071ee24d83bda4ef7c3269c6e9..39646cd2c6b493f76dd974c82b42c14e16d1ec0f 100644 (file)
@@ -204,10 +204,12 @@ void process::send(neighbor& nb, double amount)
 void process::send1_no_bookkeeping(neighbor& nb)
 {
     if (real_load != prev_load_broadcast)
-        comm.send(nb.get_ctrl_mbox(), new message(message::INFO, real_load));
+        comm.ctrl_send(nb.get_ctrl_mbox(),
+                       new message(message::INFO, real_load));
     double load_to_send = nb.get_to_send();
     if (load_to_send > 0.0) {
-        comm.send(nb.get_data_mbox(), new message(message::LOAD, load_to_send));
+        comm.data_send(nb.get_data_mbox(),
+                       new message(message::LOAD, load_to_send));
         nb.set_to_send(0.0);
     }
 }
@@ -215,14 +217,14 @@ void process::send1_no_bookkeeping(neighbor& nb)
 void process::send1_bookkeeping(neighbor& nb)
 {
     if (expected_load != prev_load_broadcast)
-        comm.send(nb.get_ctrl_mbox(),
-                  new message(message::INFO, expected_load));
+        comm.ctrl_send(nb.get_ctrl_mbox(),
+                       new message(message::INFO, expected_load));
     double load_to_send;
     double new_debt;
     double debt_to_send = nb.get_to_send();
     if (debt_to_send > 0.0) {
-        comm.send(nb.get_ctrl_mbox(),
-                  new message(message::CREDIT, debt_to_send));
+        comm.ctrl_send(nb.get_ctrl_mbox(),
+                       new message(message::CREDIT, debt_to_send));
         nb.set_to_send(0.0);
         new_debt = nb.get_debt() + debt_to_send;
     } else {
@@ -238,7 +240,8 @@ void process::send1_bookkeeping(neighbor& nb)
         real_load -= load_to_send;
     }
     if (load_to_send > 0.0)
-        comm.send(nb.get_data_mbox(), new message(message::LOAD, load_to_send));
+        comm.data_send(nb.get_data_mbox(),
+                       new message(message::LOAD, load_to_send));
 }
 
 void process::send_all()
@@ -255,7 +258,8 @@ void process::send_all()
                       bind(&process::send1_no_bookkeeping, this, _1));
         prev_load_broadcast = real_load;
     }
-    comm.flush(false);
+    comm.ctrl_flush(false);
+    comm.data_flush(false);
 }
 
 void process::receive(double timeout)
@@ -264,7 +268,8 @@ void process::receive(double timeout)
     m_host_t from;
 
     XBT_DEBUG("%sblocking receive (%g)", "\0non-" + !timeout, timeout);
-    while (may_receive() && comm.recv(msg, from, timeout)) {
+    while (may_receive() && (comm.ctrl_recv(msg, from, timeout) ||
+                             comm.data_recv(msg, from, timeout))) {
         switch (msg->get_type()) {
         case message::INFO: {
             neighbor* n = rev_neigh[from];
@@ -293,13 +298,14 @@ void process::receive(double timeout)
         delete msg;
         timeout = 0.0;          // only wait on first recv
     }
-    comm.flush(false);
+    comm.ctrl_flush(false);
+    comm.data_flush(false);
 }
 
 void process::finalize1(neighbor& nb)
 {
-    comm.send(nb.get_ctrl_mbox(), new message(message::CTRL_CLOSE, 0.0));
-    comm.send(nb.get_data_mbox(), new message(message::DATA_CLOSE, 0.0));
+    comm.ctrl_send(nb.get_ctrl_mbox(), new message(message::CTRL_CLOSE, 0.0));
+    comm.data_send(nb.get_data_mbox(), new message(message::DATA_CLOSE, 0.0));
 }
 
 void process::finalize()
@@ -315,13 +321,15 @@ void process::finalize()
                   bind(&process::finalize1, this, _1));
 
     while (may_receive()) {
-        comm.flush(false);
+        comm.ctrl_flush(false);
+        comm.data_flush(false);
         XBT_DEBUG("waiting for %d CTRL and %d DATA CLOSE",
                   ctrl_close_pending, data_close_pending);
         receive(-1.0);
     }
 
-    comm.flush(true);
+    comm.ctrl_flush(true);
+    comm.data_flush(true);
 }
 
 #define print_loads_generic(vec, verbose, logp, cat)                    \