]> AND Private Git Repository - loba.git/blobdiff - communicator.cpp
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Improved synchronization with receiver process.
[loba.git] / communicator.cpp
index 753b04a98ed4e529ae31dd38e6d57b85962fd54e..8adb258e536b83c0c85d9fbf7524ad71125732f4 100644 (file)
@@ -34,6 +34,7 @@ communicator::communicator()
     receiver_process =
         MSG_process_create("receiver", communicator::receiver_wrapper,
                            this, MSG_host_self());
     receiver_process =
         MSG_process_create("receiver", communicator::receiver_wrapper,
                            this, MSG_host_self());
+    xbt_cond_wait(cond, mutex); // wait for the receiver to be ready
     xbt_mutex_release(mutex);
 }
 
     xbt_mutex_release(mutex);
 }
 
@@ -156,6 +157,11 @@ int communicator::receiver()
 {
     ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox());
     data_comm = MSG_task_irecv(&data_task, get_data_mbox());
 {
     ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox());
     data_comm = MSG_task_irecv(&data_task, get_data_mbox());
+    DEBUG0("receiver ready");
+    xbt_mutex_acquire(mutex);
+    xbt_cond_signal(cond);      // signal master that we are ready
+    xbt_mutex_release(mutex);
+
     xbt_dynar_t comms = xbt_dynar_new(sizeof(msg_comm_t), NULL);
     while (ctrl_comm || data_comm) {
 
     xbt_dynar_t comms = xbt_dynar_new(sizeof(msg_comm_t), NULL);
     while (ctrl_comm || data_comm) {
 
@@ -169,7 +175,9 @@ int communicator::receiver()
         if (ctrl_comm && comm_test_n_destroy(ctrl_comm)) {
             if (strcmp(MSG_task_get_name(ctrl_task), "finalize")) {
                 DEBUG0("received message from ctrl");
         if (ctrl_comm && comm_test_n_destroy(ctrl_comm)) {
             if (strcmp(MSG_task_get_name(ctrl_task), "finalize")) {
                 DEBUG0("received message from ctrl");
+                xbt_mutex_acquire(mutex);
                 received.push(ctrl_task);
                 received.push(ctrl_task);
+                xbt_mutex_release(mutex);
                 ctrl_task = NULL;
                 ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox());
             } else {
                 ctrl_task = NULL;
                 ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox());
             } else {
@@ -183,7 +191,9 @@ int communicator::receiver()
         if (data_comm && comm_test_n_destroy(data_comm)) {
             if (strcmp(MSG_task_get_name(data_task), "finalize")) {
                 DEBUG0("received message from data");
         if (data_comm && comm_test_n_destroy(data_comm)) {
             if (strcmp(MSG_task_get_name(data_task), "finalize")) {
                 DEBUG0("received message from data");
+                xbt_mutex_acquire(mutex);
                 received.push(data_task);
                 received.push(data_task);
+                xbt_mutex_release(mutex);
                 data_task = NULL;
                 data_comm = MSG_task_irecv(&data_task, get_data_mbox());
             } else {
                 data_task = NULL;
                 data_comm = MSG_task_irecv(&data_task, get_data_mbox());
             } else {
@@ -194,7 +204,8 @@ int communicator::receiver()
             }
         }
         xbt_mutex_acquire(mutex);
             }
         }
         xbt_mutex_acquire(mutex);
-        xbt_cond_signal(cond);
+        if (!received.empty())
+            xbt_cond_signal(cond);
         xbt_mutex_release(mutex);
     }
     xbt_dynar_free(&comms);
         xbt_mutex_release(mutex);
     }
     xbt_dynar_free(&comms);