From 5ea4e2d40842a43c9a5c2c4d360f928e5dbd3532 Mon Sep 17 00:00:00 2001 From: Arnaud Giersch Date: Mon, 7 Feb 2011 14:37:10 +0100 Subject: [PATCH] Do not use MSG_comm_test() after MSG_comm_waitany(). --- communicator.cpp | 63 +++++++++++++++++++++--------------------------- communicator.h | 1 + 2 files changed, 29 insertions(+), 35 deletions(-) diff --git a/communicator.cpp b/communicator.cpp index 649ff80..0b06d6c 100644 --- a/communicator.cpp +++ b/communicator.cpp @@ -164,6 +164,25 @@ int communicator::receiver_wrapper(int, char* []) return result; } +void communicator::receiver1(msg_comm_t& comm, m_task_t& task, const char* mbox) +{ + MSG_comm_destroy(comm); + if (strcmp(MSG_task_get_name(task), "finalize")) { + XBT_DEBUG("received message on %s", mbox); + xbt_mutex_acquire(mutex); + received.push(task); + xbt_cond_signal(cond); + xbt_mutex_release(mutex); + task = NULL; + comm = MSG_task_irecv(&task, mbox); + } else { + XBT_DEBUG("received finalize on %s", mbox); + MSG_task_destroy(task); + task = NULL; + comm = NULL; + } +} + int communicator::receiver() { ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox()); @@ -180,44 +199,18 @@ int communicator::receiver() xbt_dynar_push(comms, &ctrl_comm); if (data_comm) xbt_dynar_push(comms, &data_comm); - MSG_comm_waitany(comms); + int recvd = MSG_comm_waitany(comms); + msg_comm_t comm = xbt_dynar_get_as(comms, recvd, msg_comm_t); xbt_dynar_reset(comms); - if (ctrl_comm && comm_test_n_destroy(ctrl_comm)) { - if (strcmp(MSG_task_get_name(ctrl_task), "finalize")) { - XBT_DEBUG("received message from ctrl"); - xbt_mutex_acquire(mutex); - received.push(ctrl_task); - xbt_mutex_release(mutex); - ctrl_task = NULL; - ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox()); - } else { - XBT_DEBUG("received finalize from ctrl"); - MSG_task_destroy(ctrl_task); - ctrl_task = NULL; - ctrl_comm = NULL; - } - } - - if (data_comm && comm_test_n_destroy(data_comm)) { - if (strcmp(MSG_task_get_name(data_task), "finalize")) { - XBT_DEBUG("received message from data"); - xbt_mutex_acquire(mutex); - received.push(data_task); - xbt_mutex_release(mutex); - data_task = NULL; - data_comm = MSG_task_irecv(&data_task, get_data_mbox()); - } else { - XBT_DEBUG("received finalize from data"); - MSG_task_destroy(data_task); - data_task = NULL; - data_comm = NULL; - } + if (comm == ctrl_comm) + receiver1(ctrl_comm, ctrl_task, get_ctrl_mbox()); + else if (comm == data_comm) + receiver1(data_comm, data_task, get_data_mbox()); + else { + XBT_ERROR("Handling unknown comm -- %p", comm); + MSG_comm_destroy(comm); } - xbt_mutex_acquire(mutex); - if (!received.empty()) - xbt_cond_signal(cond); - xbt_mutex_release(mutex); } xbt_dynar_free(&comms); return 0; diff --git a/communicator.h b/communicator.h index 66dc800..0d47ca0 100644 --- a/communicator.h +++ b/communicator.h @@ -70,6 +70,7 @@ private: // Handling of receiving thread m_process_t receiver_process; static int receiver_wrapper(int, char* []); + void receiver1(msg_comm_t& comm, m_task_t& task, const char* mbox); int receiver(); // Used to test if a communication is over, and to destroy it if it is -- 2.39.5