return oss.str();
}
-const int communicator::send_count_before_flush = 4;
-
communicator::communicator()
: host((hostdata* )MSG_host_get_data(MSG_host_self()))
, mutex(xbt_mutex_init())
, cond(xbt_cond_init())
- , send_counter(0)
, ctrl_task(NULL)
, ctrl_comm(NULL)
, data_task(NULL)
task = MSG_task_create("finalize", 0.0, 0, NULL);
MSG_task_send(task, get_data_mbox());
- DEBUG0("wait for receiver to terminate");
xbt_mutex_acquire(mutex);
- while (receiver_process)
+ while (receiver_process) {
+ DEBUG0("waiting for receiver to terminate");
xbt_cond_wait(cond, mutex);
+ }
xbt_mutex_release(mutex);
if (ctrl_comm)
m_task_t task = MSG_task_create("message", 0.0, msg_size, msg);
msg_comm_t comm = MSG_task_isend(task, dest);
sent_comm.push_back(comm);
-
- if (++send_counter >= send_count_before_flush) {
- flush(false);
- send_counter = 0;
- }
}
bool communicator::recv(message*& msg, m_host_t& from, bool wait)
{
if (wait) {
- DEBUG0("suspend main process on recv");
xbt_mutex_acquire(mutex);
- while (received.empty())
+ while (received.empty()) {
+ DEBUG0("waiting for a message to come");
xbt_cond_wait(cond, mutex);
+ }
xbt_mutex_release(mutex);
}