#include <algorithm>
+#include <tr1/functional>
#include <msg/msg.h>
#include <xbt/log.h>
communicator::communicator()
: host(static_cast<hostdata*>(MSG_host_get_data(MSG_host_self())))
{
- receiver_mutex.acquire();
- receiver_thread =
- MSG_process_create("receiver", communicator::receiver_wrapper,
- this, MSG_host_self());
- receiver_cond.wait(receiver_mutex); // wait for the receiver to be ready
- receiver_mutex.release();
+ using std::tr1::bind;
+ receiver_thread = new_msg_thread("receiver",
+ bind(&communicator::receiver, this));
+ receiver_thread->start();
}
communicator::~communicator()
task = MSG_task_create("finalize", 0.0, 0, NULL);
MSG_task_send(task, host->get_data_mbox());
- receiver_mutex.acquire();
- while (receiver_thread) {
- XBT_DEBUG("waiting for receiver to terminate");
- receiver_cond.wait(receiver_mutex);
- }
- receiver_mutex.release();
+ receiver_thread->wait();
+ delete receiver_thread;
if (!received.empty())
XBT_WARN("lost %zu received message%s!",
void communicator::flush(bool wait)
{
- sent_comm.remove_if(comm_test_n_destroy);
+ sent_comm_type::iterator bound;
+ bound = std::remove_if(sent_comm.begin(), sent_comm.end(),
+ comm_test_n_destroy);
+ sent_comm.erase(bound, sent_comm.end());
if (wait && !sent_comm.empty()) {
msg_comm_t comms[sent_comm.size()];
std::copy(sent_comm.begin(), sent_comm.end(), comms);
}
}
-bool communicator::recv(message*& msg, m_host_t& from, double timeout)
-{
- XBT_DEBUG("waiting for a message to come");
- bool recvd = received.pop(msg, from, timeout);
- if (recvd)
- XBT_DEBUG("received %s from %s",
- msg->to_string().c_str(), MSG_host_get_name(from));
- return recvd;
-}
-
-int communicator::receiver_wrapper(int, char* [])
-{
- communicator* comm;
- comm = static_cast<communicator*>(MSG_process_get_data(MSG_process_self()));
- comm->receiver();
-
- XBT_DEBUG("terminate");
- comm->receiver_mutex.acquire();
- comm->receiver_thread = NULL;
- comm->receiver_cond.signal();
- comm->receiver_mutex.release();
-
- return 0;
-}
-
void communicator::receiver()
{
xbt_dynar_t comms = xbt_dynar_new(sizeof(msg_comm_t), NULL);
xbt_dynar_push(comms, &chan[i].comm);
}
- XBT_DEBUG("receiver ready");
- receiver_mutex.acquire();
- receiver_cond.signal(); // signal master that we are ready
- receiver_mutex.release();
-
while (!xbt_dynar_is_empty(comms)) {
int index = MSG_comm_waitany(comms);