]> AND Private Git Repository - loba.git/commitdiff
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Bug fixed: use a timeout on receive.
authorArnaud Giersch <arnaud.giersch@iut-bm.univ-fcomte.fr>
Tue, 18 Jan 2011 22:26:24 +0000 (23:26 +0100)
committerArnaud Giersch <arnaud.giersch@iut-bm.univ-fcomte.fr>
Tue, 18 Jan 2011 22:26:24 +0000 (23:26 +0100)
TODO
communicator.cpp
communicator.h
process.cpp
process.h

diff --git a/TODO b/TODO
index 3152ad98d5d06b57c2cffa912612468d1d610318..2083dab8e9c23f6f735ea1fb28058628ac2c7c09 100644 (file)
--- a/TODO
+++ b/TODO
@@ -1,15 +1,4 @@
-* segfault with ./loba cluster1000.xml -N64 -a fairstrategy
-
-    this is a deadlock occuring when:
-    - a process is in the finalize stage;
-    - all processes  but one are blocked on receive;
-    - the process that is still running owns all the remaining load,
-      and sends it all to the finalizing process, and then goes in
-      blocking receive.
-    The finalizing process receives the load, and blocks again,
-    waiting for a close message.
-    All processes are then blocked, and non-one is able to see that
-    there is no more load in the system!
+* review receive with timeout.
 
 * verify bookkeeping version.
 
index 8adb258e536b83c0c85d9fbf7524ad71125732f4..90cf990ebb07c31f70d30f89e6723a94bd78bdca 100644 (file)
@@ -83,13 +83,26 @@ void communicator::send(const char* dest, message* msg)
     sent_comm.push_back(comm);
 }
 
-bool communicator::recv(message*& msg, m_host_t& from, bool wait)
+bool communicator::recv(message*& msg, m_host_t& from, double timeout)
 {
-    if (wait) {
+    if (timeout != 0) {
+        volatile double deadline =
+            timeout > 0 ? MSG_get_clock() + timeout : 0.0;
         xbt_mutex_acquire(mutex);
-        while (received.empty()) {
+        while (received.empty() && (!deadline || deadline > MSG_get_clock())) {
+            xbt_ex_t e;
             DEBUG0("waiting for a message to come");
-            xbt_cond_wait(cond, mutex);
+            TRY {
+                if (deadline)
+                    xbt_cond_timedwait(cond, mutex, deadline - MSG_get_clock());
+                else
+                    xbt_cond_wait(cond, mutex);
+            }
+            CATCH (e) {
+                if (e.category != timeout_error)
+                    RETHROW;
+                xbt_ex_free(e);
+            }
         }
         xbt_mutex_release(mutex);
     }
index f00e063494d14aa2680ccda94207797af6550a7b..66dc800b4feeea9502902348847fa85757e41682 100644 (file)
@@ -34,8 +34,9 @@ public:
     void send(const char* dest, message* msg);
 
     // Try to get a message.  Returns true on success.
-    // If "wait" is true, blocks until success.
-    bool recv(message*& msg, m_host_t& from, bool wait);
+    // Parameter "timeout" may be 0 for non-blocking operation, -1 for
+    // infinite waiting, or any positive timeout.
+    bool recv(message*& msg, m_host_t& from, double timeout);
 
     // Try to flush pending sending communications.
     // If "wait" is true, blocks until success.
index 0544b21d9e5b62b08a6a09a25618386913c0d377..0f535763510940caae9cf9373b36a83b4c1f0694 100644 (file)
@@ -123,8 +123,14 @@ int process::run()
 
         // block on receiving unless there is something to compute or
         // to send
-        bool wait = (load == 0 && lb_load() == prev_load_broadcast);
-        receive(wait);
+        double timeout;
+        if (load != 0 || lb_load() != prev_load_broadcast)
+            timeout = 0.0;
+        else if (opt::min_iter_duration)
+            timeout = opt::min_iter_duration;
+        else
+            timeout = 1.0;
+        receive(timeout);
 
         // one of our neighbor is finalizing
         if (opt::exit_on_close && close_received) {
@@ -261,13 +267,13 @@ void process::send()
     comm.flush(false);
 }
 
-void process::receive(bool wait)
+void process::receive(double timeout)
 {
     message* msg;
     m_host_t from;
 
-    DEBUG1("%sblocking receive", "\0non-" + !wait);
-    while (may_receive() && comm.recv(msg, from, wait)) {
+    DEBUG2("%sblocking receive (%g)", "\0non-" + !timeout, timeout);
+    while (may_receive() && comm.recv(msg, from, timeout)) {
         switch (msg->get_type()) {
         case message::INFO: {
             neighbor* n = rev_neigh[from];
@@ -294,7 +300,7 @@ void process::receive(bool wait)
             break;
         }
         delete msg;
-        wait = false;           // only wait on first recv
+        timeout = 0.0;          // only wait on first recv
     }
     comm.flush(false);
 }
@@ -322,7 +328,7 @@ void process::finalize()
            (unsigned long )neigh.size(), ESSE(neigh.size()));
     while (may_receive()) {
         comm.flush(false);
-        receive(true);
+        receive(-1.0);
     }
 
     comm.flush(true);
index b124593cae368cc943779ce85bcbc6b861e61088..fb1fe3f377f9626a5af4fc0bc204e78c5691c5a9 100644 (file)
--- a/process.h
+++ b/process.h
@@ -99,8 +99,10 @@ private:
     // Returns true if there remains neighbors to listen for
     bool may_receive() { return ctrl_close_pending || data_close_pending; }
 
-    // Receive procedure: wait (or not) for a message to come
-    void receive(bool wait);
+    // Receive procedure
+    // Parameter "timeout" may be 0 for non-blocking operation, -1 for
+    // infinite waiting, or any positive timeout.
+    void receive(double timeout);
 
     // Finalize sends a "close" message to each neighbor and wait for
     // all of them to answer.