AND Private Git Repository - loba.git/commitdiff
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Use a separate thread to handle incoming messages.
author: Arnaud Giersch <arnaud.giersch@iut-bm.univ-fcomte.fr>
Thu, 6 Jan 2011 15:59:57 +0000 (16:59 +0100)
committer: Arnaud Giersch <arnaud.giersch@iut-bm.univ-fcomte.fr>
Thu, 6 Jan 2011 15:59:57 +0000 (16:59 +0100)
Incoming messages were taken into account very late,
because of the following scheme:

  ...
* do something (compute, load-balance, ...)
  ...
* receive some message
* create a new asynchronous receive communication

Here, we cannot have received any new message yet, even
if the sender was ready a long time ago, since the
transfer only starts when we create the new communication.
Thus, we start a new iteration without taking the pending
message into account.

The solution is to use a separate thread that handles incoming
messages asynchronously.  These messages are put in a queue, to be
consumed later by the main process.

communicator.cpp
communicator.h
process.cpp

index f3e6bf9fae1eddd27ec2f25e11977dfd727d15c4..94cdb04160337353a0a1bab8b2797d88c0ce415d 100644 (file)
@@ -1,4 +1,5 @@
 #include <algorithm>
 #include <algorithm>
+#include <cstring>
 #include <tr1/functional>
 #include <sstream>
 #include <msg/msg.h>
 #include <tr1/functional>
 #include <sstream>
 #include <msg/msg.h>
@@ -24,31 +25,52 @@ const int communicator::send_count_before_flush = 4;
 
 communicator::communicator()
     : host((hostdata* )MSG_host_get_data(MSG_host_self()))
 
 communicator::communicator()
     : host((hostdata* )MSG_host_get_data(MSG_host_self()))
+    , mutex(xbt_mutex_init())
+    , cond(xbt_cond_init())
     , send_counter(0)
     , ctrl_task(NULL)
     , ctrl_comm(NULL)
     , send_counter(0)
     , ctrl_task(NULL)
     , ctrl_comm(NULL)
-    , ctrl_close_is_last(false)
     , data_task(NULL)
     , data_comm(NULL)
     , data_task(NULL)
     , data_comm(NULL)
-    , data_close_is_last(false)
 {
 {
+    xbt_mutex_acquire(mutex);
+    receiver_process =
+        MSG_process_create("receiver", communicator::receiver_wrapper,
+                           this, MSG_host_self());
+    xbt_mutex_release(mutex);
 }
 
 communicator::~communicator()
 {
 }
 
 communicator::~communicator()
 {
+    m_task_t task;
+
+    DEBUG0("send finalize to receiver/ctrl");
+    task = MSG_task_create("finalize", 0.0, 0, NULL);
+    MSG_task_send(task, get_ctrl_mbox());
+
+    DEBUG0("send finalize to receiver/data");
+    task = MSG_task_create("finalize", 0.0, 0, NULL);
+    MSG_task_send(task, get_data_mbox());
+
+    DEBUG0("wait for receiver to terminate");
+    xbt_mutex_acquire(mutex);
+    while (receiver_process)
+        xbt_cond_wait(cond, mutex);
+    xbt_mutex_release(mutex);
+
     if (ctrl_comm)
         WARN0("ctrl_comm is pending!");
     if (data_comm)
         WARN0("data_comm is pending!");
     if (ctrl_comm)
         WARN0("ctrl_comm is pending!");
     if (data_comm)
         WARN0("data_comm is pending!");
+    if (!received.empty())
+        WARN2("lost %lu received message%s!",
+              (unsigned long )received.size(), ESSE(received.size()));
     if (!sent_comm.empty())
     if (!sent_comm.empty())
-        WARN2("lost %ld send communication%s!",
-              (long )sent_comm.size(), ESSE(sent_comm.size()));
-}
+        WARN2("lost %lu sent message%s!",
+              (unsigned long )sent_comm.size(), ESSE(sent_comm.size()));
 
 
-void communicator::listen()
-{
-    ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox());
-    data_comm = MSG_task_irecv(&data_task, get_data_mbox());
+    xbt_cond_destroy(cond);
+    xbt_mutex_destroy(mutex);
 }
 
 void communicator::send(const char* dest, message* msg)
 }
 
 void communicator::send(const char* dest, message* msg)
@@ -69,48 +91,27 @@ void communicator::send(const char* dest, message* msg)
 
 bool communicator::recv(message*& msg, m_host_t& from, bool wait)
 {
 
 bool communicator::recv(message*& msg, m_host_t& from, bool wait)
 {
-    bool restart;
-    msg = NULL;
+    if (wait) {
+        DEBUG0("suspend main process on recv");
+        xbt_mutex_acquire(mutex);
+        while (received.empty())
+            xbt_cond_wait(cond, mutex);
+        xbt_mutex_release(mutex);
+    }
 
 
-    do {
-        if (ctrl_comm && comm_test_n_destroy(ctrl_comm)) {
-            msg = (message* )MSG_task_get_data(ctrl_task);
-            from = MSG_task_get_source(ctrl_task);
-            MSG_task_destroy(ctrl_task);
-            ctrl_task = NULL;
-            ctrl_comm =
-                (!ctrl_close_is_last || msg->get_type() != message::CTRL_CLOSE)
-                ? MSG_task_irecv(&ctrl_task, get_ctrl_mbox())
-                : NULL;
-
-        } else if (data_comm && comm_test_n_destroy(data_comm)) {
-            msg = (message* )MSG_task_get_data(data_task);
-            from = MSG_task_get_source(data_task);
-            MSG_task_destroy(data_task);
-            data_task = NULL;
-            data_comm =
-                (!data_close_is_last || msg->get_type() != message::DATA_CLOSE)
-                ? MSG_task_irecv(&data_task, get_data_mbox())
-                : NULL;
-        }
+    if (received.empty())
+        return false;
 
 
-        restart = wait && !msg && (ctrl_comm || data_comm);
-        if (restart) {
-            xbt_dynar_t comms = xbt_dynar_new(sizeof(msg_comm_t), NULL);
-            if (ctrl_comm)
-                xbt_dynar_push(comms, &ctrl_comm);
-            if (data_comm)
-                xbt_dynar_push(comms, &data_comm);
-            MSG_comm_waitany(comms);
-            xbt_dynar_free(&comms);
-        }
-    } while (restart);
+    m_task_t task = received.front();
+    received.pop();
+    msg = (message* )MSG_task_get_data(task);
+    from = MSG_task_get_source(task);
+    MSG_task_destroy(task);
 
 
-    if (msg)
-        DEBUG2("received %s from %s",
-               msg->to_string().c_str(), MSG_host_get_name(from));
+    DEBUG2("received %s from %s",
+           msg->to_string().c_str(), MSG_host_get_name(from));
 
 
-    return msg != NULL;
+    return true;
 }
 
 void communicator::flush(bool wait)
 }
 
 void communicator::flush(bool wait)
@@ -133,16 +134,6 @@ void communicator::flush(bool wait)
     }
 }
 
     }
 }
 
-void communicator::next_close_on_ctrl_is_last()
-{
-    ctrl_close_is_last = true;
-}
-
-void communicator::next_close_on_data_is_last()
-{
-    data_close_is_last = true;
-}
-
 bool communicator::comm_test_n_destroy(msg_comm_t comm)
 {
     if (MSG_comm_test(comm)) {
 bool communicator::comm_test_n_destroy(msg_comm_t comm)
 {
     if (MSG_comm_test(comm)) {
@@ -152,6 +143,70 @@ bool communicator::comm_test_n_destroy(msg_comm_t comm)
         return false;
 }
 
         return false;
 }
 
+int communicator::receiver_wrapper(int, char* [])
+{
+    communicator* comm;
+    comm = (communicator* )MSG_process_get_data(MSG_process_self());
+    int result = comm->receiver();
+
+    DEBUG0("terminate");
+    xbt_mutex_acquire(comm->mutex);
+    comm->receiver_process = NULL;
+    xbt_cond_signal(comm->cond);
+    xbt_mutex_release(comm->mutex);
+
+    return result;
+}
+
+int communicator::receiver()
+{
+    ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox());
+    data_comm = MSG_task_irecv(&data_task, get_data_mbox());
+    xbt_dynar_t comms = xbt_dynar_new(sizeof(msg_comm_t), NULL);
+    while (ctrl_comm || data_comm) {
+
+        if (ctrl_comm)
+            xbt_dynar_push(comms, &ctrl_comm);
+        if (data_comm)
+            xbt_dynar_push(comms, &data_comm);
+        MSG_comm_waitany(comms);
+        xbt_dynar_reset(comms);
+
+        if (ctrl_comm && comm_test_n_destroy(ctrl_comm)) {
+            if (strcmp(MSG_task_get_name(ctrl_task), "finalize")) {
+                DEBUG0("received message from ctrl");
+                received.push(ctrl_task);
+                ctrl_task = NULL;
+                ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox());
+            } else {
+                DEBUG0("received finalize from ctrl");
+                MSG_task_destroy(ctrl_task);
+                ctrl_task = NULL;
+                ctrl_comm = NULL;
+            }
+        }
+
+        if (data_comm && comm_test_n_destroy(data_comm)) {
+            if (strcmp(MSG_task_get_name(data_task), "finalize")) {
+                DEBUG0("received message from data");
+                received.push(data_task);
+                data_task = NULL;
+                data_comm = MSG_task_irecv(&data_task, get_data_mbox());
+            } else {
+                DEBUG0("received finalize from data");
+                MSG_task_destroy(data_task);
+                data_task = NULL;
+                data_comm = NULL;
+            }
+        }
+        xbt_mutex_acquire(mutex);
+        xbt_cond_signal(cond);
+        xbt_mutex_release(mutex);
+    }
+    xbt_dynar_free(&comms);
+    return 0;
+}
+
 // Local variables:
 // mode: c++
 // End:
 // Local variables:
 // mode: c++
 // End:
index 465d0cf573b3951abaeb54a109194c207710229d..ba4b86e96598cb06b68db102996f4b8e0ef194b3 100644 (file)
@@ -4,6 +4,7 @@
 #define COMMUNICATOR_H
 
 #include <list>
 #define COMMUNICATOR_H
 
 #include <list>
+#include <queue>
 #include <string>
 #include <msg/msg.h>
 #include "hostdata.h"
 #include <string>
 #include <msg/msg.h>
 #include "hostdata.h"
@@ -29,49 +30,50 @@ public:
     communicator();
     ~communicator();
 
     communicator();
     ~communicator();
 
-    // Start to listen for incoming messages
-    void listen();
-
     // Send a message to the "dest" mailbox
     void send(const char* dest, message* msg);
 
     // Send a message to the "dest" mailbox
     void send(const char* dest, message* msg);
 
-    // Try to receive a message.  Returns true on success.
-    // If "wait" is true, blocks until success or error.
+    // Try to get a message.  Returns true on success.
+    // If "wait" is true, blocks until success.
     bool recv(message*& msg, m_host_t& from, bool wait);
 
     // Try to flush pending sending communications.
     // If "wait" is true, blocks until success.
     void flush(bool wait);
 
     bool recv(message*& msg, m_host_t& from, bool wait);
 
     // Try to flush pending sending communications.
     // If "wait" is true, blocks until success.
     void flush(bool wait);
 
-    // Advertise that the next "close" message is the last one, and
-    // that we do not await any message after that, either on the
-    // control or the data channel.
-    void next_close_on_ctrl_is_last();
-    void next_close_on_data_is_last();
-
 private:
     // Myself
     const hostdata* host;
 
 private:
     // Myself
     const hostdata* host;
 
+    // Used to synchronize main and receiver thread 
+    xbt_mutex_t mutex;
+    xbt_cond_t cond;
+
     // List of pending send communications
     std::list<msg_comm_t> sent_comm;
     static const int send_count_before_flush;
     int send_counter;
 
     // List of pending send communications
     std::list<msg_comm_t> sent_comm;
     static const int send_count_before_flush;
     int send_counter;
 
+    // Queue of received messages
+    std::queue<m_task_t> received;
+
     // Control channel for receiving
     m_task_t    ctrl_task;          // receive buffer
     msg_comm_t  ctrl_comm;          // receive communication
     // Control channel for receiving
     m_task_t    ctrl_task;          // receive buffer
     msg_comm_t  ctrl_comm;          // receive communication
-    bool        ctrl_close_is_last; // do not rearm comm after next close
 
     // Data channel for receiving
     m_task_t    data_task;          // receive buffer
     msg_comm_t  data_comm;          // receive communication
 
     // Data channel for receiving
     m_task_t    data_task;          // receive buffer
     msg_comm_t  data_comm;          // receive communication
-    bool        data_close_is_last; // do not rearm comm after next close
 
     const char* get_ctrl_mbox() const   { return host->get_ctrl_mbox(); }
     const char* get_data_mbox() const   { return host->get_data_mbox(); }
 
 
     const char* get_ctrl_mbox() const   { return host->get_ctrl_mbox(); }
     const char* get_data_mbox() const   { return host->get_data_mbox(); }
 
-    // Used to test if a communication is over, and to destroy it if it is.
+    // Handling of receiving thread
+    m_process_t receiver_process;
+    static int receiver_wrapper(int, char* []);
+    int receiver();
+
+    // Used to test if a communication is over, and to destroy it if it is
     static bool comm_test_n_destroy(msg_comm_t comm);
 };
 
     static bool comm_test_n_destroy(msg_comm_t comm);
 };
 
index 0b27f0bddae784512453174ad6a47a3d7ffeda54..5f393bdb39abb837b279a21e285ee293483fcbea 100644 (file)
@@ -39,16 +39,10 @@ process::process(int argc, char* argv[])
     total_load_init += load;
 
     ctrl_close_pending = data_close_pending = neigh.size();
     total_load_init += load;
 
     ctrl_close_pending = data_close_pending = neigh.size();
-    if (neigh.size() == 1) {
-        comm.next_close_on_ctrl_is_last();
-        comm.next_close_on_data_is_last();
-    }
     close_received = false;
     may_receive =  (neigh.size() > 0); // the same as (ctrl_close_pending ||
                                        //              data_close_pending)
     finalizing = false;
     close_received = false;
     may_receive =  (neigh.size() > 0); // the same as (ctrl_close_pending ||
                                        //              data_close_pending)
     finalizing = false;
-    if (may_receive)
-        comm.listen();
 
     e_xbt_log_priority_t logp = xbt_log_priority_verbose;
     if (!LOG_ISENABLED(logp))
 
     e_xbt_log_priority_t logp = xbt_log_priority_verbose;
     if (!LOG_ISENABLED(logp))
@@ -113,7 +107,6 @@ int process::run()
         bool recv_wait = (load == 0 &&
                           ((opt::bookkeeping ? expected_load : load)
                            == prev_load_broadcast));
         bool recv_wait = (load == 0 &&
                           ((opt::bookkeeping ? expected_load : load)
                            == prev_load_broadcast));
-        DEBUG1("CALL RECEIVE(%s)", recv_wait? "WAIT": "NO_WAIT");
         receive(recv_wait? WAIT: NO_WAIT);
 
         // one of our neighbor is finalizing
         receive(recv_wait? WAIT: NO_WAIT);
 
         // one of our neighbor is finalizing
@@ -174,7 +167,7 @@ void process::compute()
     if (load > 0.0) {
         double duration = opt::comp_cost(load);
         m_task_t task = MSG_task_create("computation", duration, 0.0, NULL);
     if (load > 0.0) {
         double duration = opt::comp_cost(load);
         m_task_t task = MSG_task_create("computation", duration, 0.0, NULL);
-        DEBUG2("compute %g flop%s.", duration, ESSE(duration));
+        DEBUG2("compute %g flop%s", duration, ESSE(duration));
         MSG_task_execute(task);
         MSG_task_destroy(task);
     } else {
         MSG_task_execute(task);
         MSG_task_destroy(task);
     } else {
@@ -240,10 +233,12 @@ void process::send()
 
 void process::receive(recv_wait_mode wait)
 {
 
 void process::receive(recv_wait_mode wait)
 {
-    // DEBUG1("go for receive(%s)",
-    //        "NO_WAIT\0WAIT\0\0\0\0WAIT_FOR_CLOSE" + 8 * wait);
     message* msg;
     m_host_t from;
     message* msg;
     m_host_t from;
+
+    DEBUG1("go for receive(%s)",
+           "NO_WAIT\0WAIT\0\0\0\0WAIT_FOR_CLOSE" + 8 * wait);
+
     bool do_wait = (wait != NO_WAIT);
     while (may_receive && comm.recv(msg, from, do_wait)) {
         switch (msg->get_type()) {
     bool do_wait = (wait != NO_WAIT);
     while (may_receive && comm.recv(msg, from, do_wait)) {
         switch (msg->get_type()) {
@@ -263,15 +258,13 @@ void process::receive(recv_wait_mode wait)
             break;
         }
         case message::CTRL_CLOSE:
             break;
         }
         case message::CTRL_CLOSE:
-            if (--ctrl_close_pending == 1)
-                comm.next_close_on_ctrl_is_last();
-            // DEBUG1("ctrl_close_pending = %d", ctrl_close_pending);
+            ctrl_close_pending--;
+            DEBUG1("ctrl_close_pending = %d", ctrl_close_pending);
             close_received = true;
             break;
         case message::DATA_CLOSE:
             close_received = true;
             break;
         case message::DATA_CLOSE:
-            if (--data_close_pending == 1)
-                comm.next_close_on_data_is_last();
-            // DEBUG1("data_close_pending = %d", data_close_pending);
+            data_close_pending--;
+            DEBUG1("data_close_pending = %d", data_close_pending);
             close_received = true;
             break;
         }
             close_received = true;
             break;
         }
@@ -295,13 +288,13 @@ void process::finalize()
     finalizing = true;
     total_load_running -= load;
 
     finalizing = true;
     total_load_running -= load;
 
-    DEBUG2("send CLOSE to %d neighbor%s.",
-           (int )neigh.size(), ESSE(neigh.size()));
+    DEBUG2("send CLOSE to %lu neighbor%s",
+           (unsigned long )neigh.size(), ESSE(neigh.size()));
     std::for_each(neigh.begin(), neigh.end(),
                   bind(&process::finalize1, this, _1));
 
     std::for_each(neigh.begin(), neigh.end(),
                   bind(&process::finalize1, this, _1));
 
-    DEBUG2("wait for CLOSE from %d neighbor%s.",
-           (int )neigh.size(), ESSE(neigh.size()));
+    DEBUG2("wait for CLOSE from %lu neighbor%s",
+           (unsigned long )neigh.size(), ESSE(neigh.size()));
     receive(WAIT_FOR_CLOSE);
 
     comm.flush(true);
     receive(WAIT_FOR_CLOSE);
 
     comm.flush(true);