communicator::communicator()
: host(static_cast<hostdata*>(MSG_host_get_data(MSG_host_self())))
- , receiver_mutex(xbt_mutex_init())
- , receiver_cond(xbt_cond_init())
{
- xbt_mutex_acquire(receiver_mutex);
+ receiver_mutex.acquire(); // held across process creation; the receiver signals readiness under this same mutex
receiver_thread =
MSG_process_create("receiver", communicator::receiver_wrapper,
this, MSG_host_self());
- // wait for the receiver to be ready
- xbt_cond_wait(receiver_cond, receiver_mutex);
- xbt_mutex_release(receiver_mutex);
+ receiver_cond.wait(receiver_mutex); // wait for the receiver to be ready
+ receiver_mutex.release();
}
communicator::~communicator()
task = MSG_task_create("finalize", 0.0, 0, NULL);
MSG_task_send(task, host->get_data_mbox());
- xbt_mutex_acquire(receiver_mutex);
+ receiver_mutex.acquire();
while (receiver_thread) {
XBT_DEBUG("waiting for receiver to terminate");
- xbt_cond_wait(receiver_cond, receiver_mutex);
+ receiver_cond.wait(receiver_mutex);
}
- xbt_mutex_release(receiver_mutex);
+ receiver_mutex.release();
if (!received.empty())
XBT_WARN("lost %zu received message%s!",
if (!sent_comm.empty())
XBT_WARN("lost %zu sent message%s!",
sent_comm.size(), ESSE(sent_comm.size()));
-
- xbt_cond_destroy(receiver_cond);
- xbt_mutex_destroy(receiver_mutex);
}
void communicator::send(const char* dest, message* msg)
if (timeout != 0) {
volatile double deadline =
timeout > 0 ? MSG_get_clock() + timeout : 0.0;
- xbt_mutex_acquire(receiver_mutex);
+ receiver_mutex.acquire();
while (received.empty() && (!deadline || deadline > MSG_get_clock())) {
xbt_ex_t e;
XBT_DEBUG("waiting for a message to come");
TRY {
if (deadline)
- xbt_cond_timedwait(receiver_cond, receiver_mutex,
- deadline - MSG_get_clock());
+ receiver_cond.timedwait(receiver_mutex,
+ deadline - MSG_get_clock());
else
- xbt_cond_wait(receiver_cond, receiver_mutex);
+ receiver_cond.wait(receiver_mutex);
}
CATCH (e) {
if (e.category != timeout_error)
xbt_ex_free(e);
}
}
- xbt_mutex_release(receiver_mutex);
+ receiver_mutex.release();
}
if (received.empty())
comm->receiver();
XBT_DEBUG("terminate");
- xbt_mutex_acquire(comm->receiver_mutex);
+ comm->receiver_mutex.acquire();
comm->receiver_thread = NULL;
- xbt_cond_signal(comm->receiver_cond);
- xbt_mutex_release(comm->receiver_mutex);
+ comm->receiver_cond.signal();
+ comm->receiver_mutex.release();
return 0;
}
}
XBT_DEBUG("receiver ready");
- xbt_mutex_acquire(receiver_mutex);
- xbt_cond_signal(receiver_cond); // signal master that we are ready
- xbt_mutex_release(receiver_mutex);
+ receiver_mutex.acquire();
+ receiver_cond.signal(); // signal master that we are ready
+ receiver_mutex.release();
while (!xbt_dynar_is_empty(comms)) {
comm_check_n_destroy(ch->comm);
if (strcmp(MSG_task_get_name(ch->task), "finalize")) {
XBT_DEBUG("received message on %s", ch->mbox);
- xbt_mutex_acquire(receiver_mutex);
+ receiver_mutex.acquire();
received.push(ch->task);
- xbt_cond_signal(receiver_cond);
- xbt_mutex_release(receiver_mutex);
+ receiver_cond.signal();
+ receiver_mutex.release();
ch->task = NULL;
ch->comm = MSG_task_irecv(&ch->task, ch->mbox);
xbt_dynar_set_as(comms, index, msg_comm_t, ch->comm);
#include "options.h"
#include "process.h"
#include "statistics.h"
+#include "synchro.h"
#include "timer.h"
#include "tracing.h"
#include "version.h"
EXIT_FAILURE_CLEAN = 0x08, // error at cleanup
};
- xbt_mutex_t proc_mutex;
- xbt_cond_t proc_cond;
- unsigned proc_counter;
+ mutex_t proc_mutex;
+ condition_t proc_cond;
+ unsigned proc_counter = 0;
statistics comps;
statistics loads;
try {
proc = opt::loba_algorithms.new_instance(opt::loba_algo, argc, argv);
- xbt_mutex_acquire(proc_mutex);
+ proc_mutex.acquire();
++proc_counter;
- xbt_mutex_release(proc_mutex);
+ proc_mutex.release();
result = proc->run();
- xbt_mutex_acquire(proc_mutex);
+ proc_mutex.acquire();
comps.push(proc->get_comp());
loads.push(proc->get_real_load());
// destroys a communication they had together.
--proc_counter;
- xbt_cond_broadcast(proc_cond);
+ proc_cond.broadcast();
while (proc_counter > 0)
- xbt_cond_wait(proc_cond, proc_mutex);
- xbt_mutex_release(proc_mutex);
+ proc_cond.wait(proc_mutex);
+ proc_mutex.release();
delete proc;
}
exit_status = EXIT_FAILURE_SIMU; // =====
- proc_mutex = xbt_mutex_init();
- proc_cond = xbt_cond_init();
- proc_counter = 0;
-
// Launch the MSG simulation.
XBT_INFO("Starting simulation at %f...", MSG_get_clock());
res = MSG_main();
simulated_time = MSG_get_clock();
XBT_INFO("Simulation ended at %f.", simulated_time);
- xbt_cond_destroy(proc_cond);
- xbt_mutex_destroy(proc_mutex);
-
if (res != MSG_OK)
THROW1(0, 0, "MSG_main() failed with status %#x", res);
--- /dev/null
+#ifndef SYNCHRO_H
+#define SYNCHRO_H
+
+#include <xbt/synchro.h>
+
+// Owning wrapper around an xbt mutex: the constructor creates the
+// underlying xbt_mutex_t and the destructor destroys it.
+//
+// Copying is disabled: a copy would hold the same handle, and both
+// destructors would then call xbt_mutex_destroy() on it.
+class mutex_t {
+public:
+    mutex_t(): mutex(xbt_mutex_init()) { }
+    ~mutex_t()     { xbt_mutex_destroy(mutex); }
+    void acquire() { xbt_mutex_acquire(mutex); }
+    void release() { xbt_mutex_release(mutex); }
+    // Raw handle, for xbt calls that take the mutex directly (such as
+    // xbt_cond_wait).  The handle remains owned by this object.
+    xbt_mutex_t get() { return mutex; }
+
+private:
+    // Declared but deliberately left undefined, to forbid copies.
+    mutex_t(const mutex_t&);
+    mutex_t& operator=(const mutex_t&);
+
+    xbt_mutex_t mutex;
+};
+
+// Owning wrapper around an xbt condition variable: the constructor
+// creates the underlying xbt_cond_t and the destructor destroys it.
+//
+// Copying is disabled: a copy would hold the same handle, and both
+// destructors would then call xbt_cond_destroy() on it.
+class condition_t {
+public:
+    condition_t(): cond(xbt_cond_init()) { }
+    ~condition_t()   { xbt_cond_destroy(cond); }
+    void broadcast() { xbt_cond_broadcast(cond); }
+    void signal()    { xbt_cond_signal(cond); }
+    // Forwards to xbt_cond_wait() with the raw handle of `mutex'.
+    void wait(mutex_t& mutex) { xbt_cond_wait(cond, mutex.get()); }
+    // Forwards to xbt_cond_timedwait(); `delay' is a relative duration.
+    // NOTE(review): xbt_cond_timedwait presumably raises an xbt
+    // exception of category timeout_error when the delay expires --
+    // confirm against the xbt documentation.
+    void timedwait(mutex_t& mutex, double delay) {
+        xbt_cond_timedwait(cond, mutex.get(), delay);
+    }
+
+private:
+    // Declared but deliberately left undefined, to forbid copies.
+    condition_t(const condition_t&);
+    condition_t& operator=(const condition_t&);
+
+    xbt_cond_t cond;
+};
+
+#endif // !SYNCHRO_H
+
+// Local variables:
+// mode: c++
+// End: