From 82a490fdda330eb916eb059929de4be817fe8a87 Mon Sep 17 00:00:00 2001 From: Arnaud Giersch Date: Tue, 8 Feb 2011 23:27:03 +0100 Subject: [PATCH] Define classes for mutex and condition objects. --- communicator.cpp | 46 ++++++++++++++++++++-------------------------- communicator.h | 5 +++-- main.cpp | 26 ++++++++++---------------- synchro.h | 37 +++++++++++++++++++++++++++++++++++++ 4 files changed, 70 insertions(+), 44 deletions(-) create mode 100644 synchro.h diff --git a/communicator.cpp b/communicator.cpp index 96cd598..506f126 100644 --- a/communicator.cpp +++ b/communicator.cpp @@ -25,16 +25,13 @@ std::string message::to_string() communicator::communicator() : host(static_cast(MSG_host_get_data(MSG_host_self()))) - , receiver_mutex(xbt_mutex_init()) - , receiver_cond(xbt_cond_init()) { - xbt_mutex_acquire(receiver_mutex); + receiver_mutex.acquire(); receiver_thread = MSG_process_create("receiver", communicator::receiver_wrapper, this, MSG_host_self()); - // wait for the receiver to be ready - xbt_cond_wait(receiver_cond, receiver_mutex); - xbt_mutex_release(receiver_mutex); + receiver_cond.wait(receiver_mutex); // wait for the receiver to be ready + receiver_mutex.release(); } communicator::~communicator() @@ -49,12 +46,12 @@ communicator::~communicator() task = MSG_task_create("finalize", 0.0, 0, NULL); MSG_task_send(task, host->get_data_mbox()); - xbt_mutex_acquire(receiver_mutex); + receiver_mutex.acquire(); while (receiver_thread) { XBT_DEBUG("waiting for receiver to terminate"); - xbt_cond_wait(receiver_cond, receiver_mutex); + receiver_cond.wait(receiver_mutex); } - xbt_mutex_release(receiver_mutex); + receiver_mutex.release(); if (!received.empty()) XBT_WARN("lost %zu received message%s!", @@ -62,9 +59,6 @@ communicator::~communicator() if (!sent_comm.empty()) XBT_WARN("lost %zu sent message%s!", sent_comm.size(), ESSE(sent_comm.size())); - - xbt_cond_destroy(receiver_cond); - xbt_mutex_destroy(receiver_mutex); } void communicator::send(const char* dest, message* msg) @@ -86,16 +80,16 @@ bool communicator::recv(message*& msg, m_host_t& from, double timeout) if (timeout != 0) { volatile double deadline = timeout > 0 ? MSG_get_clock() + timeout : 0.0; - xbt_mutex_acquire(receiver_mutex); + receiver_mutex.acquire(); while (received.empty() && (!deadline || deadline > MSG_get_clock())) { xbt_ex_t e; XBT_DEBUG("waiting for a message to come"); TRY { if (deadline) - xbt_cond_timedwait(receiver_cond, receiver_mutex, - deadline - MSG_get_clock()); + receiver_cond.timedwait(receiver_mutex, + deadline - MSG_get_clock()); else - xbt_cond_wait(receiver_cond, receiver_mutex); + receiver_cond.wait(receiver_mutex); } CATCH (e) { if (e.category != timeout_error) @@ -103,7 +97,7 @@ bool communicator::recv(message*& msg, m_host_t& from, double timeout) xbt_ex_free(e); } } - xbt_mutex_release(receiver_mutex); + receiver_mutex.release(); } if (received.empty()) @@ -142,10 +136,10 @@ int communicator::receiver_wrapper(int, char* []) comm->receiver(); XBT_DEBUG("terminate"); - xbt_mutex_acquire(comm->receiver_mutex); + comm->receiver_mutex.acquire(); comm->receiver_thread = NULL; - xbt_cond_signal(comm->receiver_cond); - xbt_mutex_release(comm->receiver_mutex); + comm->receiver_cond.signal(); + comm->receiver_mutex.release(); return 0; } @@ -168,9 +162,9 @@ void communicator::receiver() } XBT_DEBUG("receiver ready"); - xbt_mutex_acquire(receiver_mutex); - xbt_cond_signal(receiver_cond); // signal master that we are ready - xbt_mutex_release(receiver_mutex); + receiver_mutex.acquire(); + receiver_cond.signal(); // signal master that we are ready + receiver_mutex.release(); while (!xbt_dynar_is_empty(comms)) { @@ -184,10 +178,10 @@ void communicator::receiver() comm_check_n_destroy(ch->comm); if (strcmp(MSG_task_get_name(ch->task), "finalize")) { XBT_DEBUG("received message on %s", ch->mbox); - xbt_mutex_acquire(receiver_mutex); + receiver_mutex.acquire(); received.push(ch->task); - xbt_cond_signal(receiver_cond); - xbt_mutex_release(receiver_mutex); + receiver_cond.signal(); + receiver_mutex.release(); ch->task = NULL; ch->comm = MSG_task_irecv(&ch->task, ch->mbox); xbt_dynar_set_as(comms, index, msg_comm_t, ch->comm); diff --git a/communicator.h b/communicator.h index c335920..b51f743 100644 --- a/communicator.h +++ b/communicator.h @@ -8,6 +8,7 @@ #include #include #include "hostdata.h" +#include "synchro.h" class message { public: @@ -53,8 +54,8 @@ private: std::queue received; // Handling of receiving thread - xbt_mutex_t receiver_mutex; - xbt_cond_t receiver_cond; + mutex_t receiver_mutex; + condition_t receiver_cond; m_process_t receiver_thread; static int receiver_wrapper(int, char* []); void receiver(); diff --git a/main.cpp b/main.cpp index 0aa4327..ebf28f1 100644 --- a/main.cpp +++ b/main.cpp @@ -20,6 +20,7 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(main); #include "options.h" #include "process.h" #include "statistics.h" +#include "synchro.h" #include "timer.h" #include "tracing.h" #include "version.h" @@ -34,9 +35,9 @@ namespace { EXIT_FAILURE_CLEAN = 0x08, // error at cleanup }; - xbt_mutex_t proc_mutex; - xbt_cond_t proc_cond; - unsigned proc_counter; + mutex_t proc_mutex; + condition_t proc_cond; + unsigned proc_counter = 0; statistics comps; statistics loads; @@ -50,13 +51,13 @@ static int simulation_main(int argc, char* argv[]) try { proc = opt::loba_algorithms.new_instance(opt::loba_algo, argc, argv); - xbt_mutex_acquire(proc_mutex); + proc_mutex.acquire(); ++proc_counter; - xbt_mutex_release(proc_mutex); + proc_mutex.release(); result = proc->run(); - xbt_mutex_acquire(proc_mutex); + proc_mutex.acquire(); comps.push(proc->get_comp()); loads.push(proc->get_real_load()); @@ -66,10 +67,10 @@ static int simulation_main(int argc, char* argv[]) // destroys a communication they had together. --proc_counter; - xbt_cond_broadcast(proc_cond); + proc_cond.broadcast(); while (proc_counter > 0) - xbt_cond_wait(proc_cond, proc_mutex); - xbt_mutex_release(proc_mutex); + proc_cond.wait(proc_mutex); + proc_mutex.release(); delete proc; } @@ -190,19 +191,12 @@ int main(int argc, char* argv[]) exit_status = EXIT_FAILURE_SIMU; // ===== - proc_mutex = xbt_mutex_init(); - proc_cond = xbt_cond_init(); - proc_counter = 0; - // Launch the MSG simulation. XBT_INFO("Starting simulation at %f...", MSG_get_clock()); res = MSG_main(); simulated_time = MSG_get_clock(); XBT_INFO("Simulation ended at %f.", simulated_time); - xbt_cond_destroy(proc_cond); - xbt_mutex_destroy(proc_mutex); - if (res != MSG_OK) THROW1(0, 0, "MSG_main() failed with status %#x", res); diff --git a/synchro.h b/synchro.h new file mode 100644 index 0000000..7b04f92 --- /dev/null +++ b/synchro.h @@ -0,0 +1,37 @@ +#ifndef SYNCHRO_H +#define SYNCHRO_H + +#include + +class mutex_t { +public: + mutex_t() { mutex = xbt_mutex_init(); } + ~mutex_t() { xbt_mutex_destroy(mutex); } + void acquire() { xbt_mutex_acquire(mutex); } + void release() { xbt_mutex_release(mutex); } + xbt_mutex_t get() { return mutex; } + +private: + xbt_mutex_t mutex; +}; + +class condition_t { +public: + condition_t() { cond = xbt_cond_init(); } + ~condition_t() { xbt_cond_destroy(cond); } + void broadcast() { xbt_cond_broadcast(cond); } + void signal() { xbt_cond_signal(cond); } + void wait(mutex_t& mutex) { xbt_cond_wait(cond, mutex.get()); } + void timedwait(mutex_t& mutex, double delay) { + xbt_cond_timedwait(cond, mutex.get(), delay); + } + +private: + xbt_cond_t cond; +}; + +#endif // !SYNCHRO_H + +// Local variables: +// mode: c++ +// End: -- 2.39.5