#include <algorithm>
+#include <tr1/functional>
#include <msg/msg.h>
#include <xbt/log.h>
communicator::communicator()
: host(static_cast<hostdata*>(MSG_host_get_data(MSG_host_self())))
{
- receiver_mutex.acquire();
- receiver_thread =
- MSG_process_create("receiver", communicator::receiver_wrapper,
- this, MSG_host_self());
- receiver_cond.wait(receiver_mutex); // wait for the receiver to be ready
- receiver_mutex.release();
+ using std::tr1::bind;
+ receiver_thread = new_msg_thread("receiver",
+ bind(&communicator::receiver, this));
+ receiver_thread->start();
}
communicator::~communicator()
task = MSG_task_create("finalize", 0.0, 0, NULL);
MSG_task_send(task, host->get_data_mbox());
- receiver_mutex.acquire();
- while (receiver_thread) {
- XBT_DEBUG("waiting for receiver to terminate");
- receiver_cond.wait(receiver_mutex);
- }
- receiver_mutex.release();
+ receiver_thread->wait();
+ delete receiver_thread;
if (!received.empty())
XBT_WARN("lost %zu received message%s!",
return recvd;
}
-int communicator::receiver_wrapper(int, char* [])
-{
- communicator* comm;
- comm = static_cast<communicator*>(MSG_process_get_data(MSG_process_self()));
- comm->receiver();
-
- XBT_DEBUG("terminate");
- comm->receiver_mutex.acquire();
- comm->receiver_thread = NULL;
- comm->receiver_cond.signal();
- comm->receiver_mutex.release();
-
- return 0;
-}
-
void communicator::receiver()
{
xbt_dynar_t comms = xbt_dynar_new(sizeof(msg_comm_t), NULL);
xbt_dynar_push(comms, &chan[i].comm);
}
- XBT_DEBUG("receiver ready");
- receiver_mutex.acquire();
- receiver_cond.signal(); // signal master that we are ready
- receiver_mutex.release();
-
while (!xbt_dynar_is_empty(comms)) {
int index = MSG_comm_waitany(comms);