From: Arnaud Giersch Date: Wed, 9 Feb 2011 12:58:43 +0000 (+0100) Subject: Define class for MSG thread. X-Git-Tag: v0.1~155 X-Git-Url: https://bilbo.iut-bm.univ-fcomte.fr/and/gitweb/loba.git/commitdiff_plain/0872290a992e2f6db574c18ac36d1987318d477d Define class for MSG thread. --- diff --git a/Makefile b/Makefile index 3990aa5..6afc248 100644 --- a/Makefile +++ b/Makefile @@ -36,6 +36,7 @@ SRC.loba := main.cpp \ $(wildcard loba_*.cpp) \ messages.cpp \ misc.cpp \ + msg_thread.cpp \ neighbor.cpp \ options.cpp \ process.cpp \ diff --git a/communicator.cpp b/communicator.cpp index 3516b68..daf1434 100644 --- a/communicator.cpp +++ b/communicator.cpp @@ -1,4 +1,5 @@ #include +#include #include #include @@ -14,12 +15,10 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(comm); communicator::communicator() : host(static_cast(MSG_host_get_data(MSG_host_self()))) { - receiver_mutex.acquire(); - receiver_thread = - MSG_process_create("receiver", communicator::receiver_wrapper, - this, MSG_host_self()); - receiver_cond.wait(receiver_mutex); // wait for the receiver to be ready - receiver_mutex.release(); + using std::tr1::bind; + receiver_thread = new_msg_thread("receiver", + bind(&communicator::receiver, this)); + receiver_thread->start(); } communicator::~communicator() @@ -34,12 +33,8 @@ communicator::~communicator() task = MSG_task_create("finalize", 0.0, 0, NULL); MSG_task_send(task, host->get_data_mbox()); - receiver_mutex.acquire(); - while (receiver_thread) { - XBT_DEBUG("waiting for receiver to terminate"); - receiver_cond.wait(receiver_mutex); - } - receiver_mutex.release(); + receiver_thread->wait(); + delete receiver_thread; if (!received.empty()) XBT_WARN("lost %zu received message%s!", @@ -87,21 +82,6 @@ bool communicator::recv(message*& msg, m_host_t& from, double timeout) return recvd; } -int communicator::receiver_wrapper(int, char* []) -{ - communicator* comm; - comm = static_cast(MSG_process_get_data(MSG_process_self())); - comm->receiver(); - - XBT_DEBUG("terminate"); - comm->receiver_mutex.acquire(); - comm->receiver_thread = NULL; - comm->receiver_cond.signal(); - comm->receiver_mutex.release(); - - return 0; -} - void communicator::receiver() { xbt_dynar_t comms = xbt_dynar_new(sizeof(msg_comm_t), NULL); @@ -119,11 +99,6 @@ void communicator::receiver() xbt_dynar_push(comms, &chan[i].comm); } - XBT_DEBUG("receiver ready"); - receiver_mutex.acquire(); - receiver_cond.signal(); // signal master that we are ready - receiver_mutex.release(); - while (!xbt_dynar_is_empty(comms)) { int index = MSG_comm_waitany(comms); diff --git a/communicator.h b/communicator.h index 7cb1efc..cfe8624 100644 --- a/communicator.h +++ b/communicator.h @@ -7,7 +7,7 @@ #include #include "hostdata.h" #include "messages.h" -#include "synchro.h" +#include "msg_thread.h" class communicator { public: @@ -37,10 +37,7 @@ private: message_queue received; // Handling of receiving thread - mutex_t receiver_mutex; - condition_t receiver_cond; - m_process_t receiver_thread; - static int receiver_wrapper(int, char* []); + msg_thread* receiver_thread; void receiver(); // Used to chek if a communication is successfull before destroying it diff --git a/main.cpp b/main.cpp index ebf28f1..9bffd8b 100644 --- a/main.cpp +++ b/main.cpp @@ -11,6 +11,7 @@ XBT_LOG_NEW_SUBCATEGORY(depl, main, "Messages from auto deployment"); XBT_LOG_NEW_SUBCATEGORY(comm, simu, "Messages from asynchronous pipes"); XBT_LOG_NEW_SUBCATEGORY(proc, simu, "Messages from base process class"); XBT_LOG_NEW_SUBCATEGORY(loba, simu, "Messages from load-balancer"); +XBT_LOG_NEW_SUBCATEGORY(thrd, simu, "Messages from thread wrapper class"); XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(main); diff --git a/msg_thread.cpp b/msg_thread.cpp new file mode 100644 index 0000000..41c4bfb --- /dev/null +++ b/msg_thread.cpp @@ -0,0 +1,68 @@ +#include +#include + +XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(thrd); + +#include "misc.h" +#include "msg_thread.h" + +msg_thread::msg_thread() + : started(false) + , thread(NULL) + , thread_name("msg_thread") +{ +} + +msg_thread::msg_thread(const char* name) + : started(false) + , thread(NULL) + , thread_name(name) +{ +} + +msg_thread::~msg_thread() +{ + if (thread) + throw std::logic_error("trying to destroy running thread"); +} + +void msg_thread::start() +{ + mutex.acquire(); + if (started) + throw std::logic_error("thread was already started"); + XBT_DEBUG("launch \"%s\"", thread_name.c_str()); + thread = MSG_process_create(thread_name.c_str(), + msg_thread::start_wrapper, + this, MSG_host_self()); + started = true; + mutex.release(); +} + +void msg_thread::wait() +{ + mutex.acquire(); + if (!started) + throw std::logic_error("trying to wait a thread that was not started"); + while (thread) { + XBT_DEBUG("waiting for \"%s\" to terminate", + thread_name.c_str()); + cond.wait(mutex); + } + mutex.release(); +} + +int msg_thread::start_wrapper(int, char* []) +{ + msg_thread* self; + self = static_cast(MSG_process_get_data(MSG_process_self())); + XBT_DEBUG("\"%s\" started", self->thread_name.c_str()); + self->run(); + + XBT_DEBUG("terminate \"%s\"", self->thread_name.c_str()); + self->mutex.acquire(); + self->thread = NULL; + self->cond.signal(); + self->mutex.release(); + return 0; +} diff --git a/msg_thread.h b/msg_thread.h new file mode 100644 index 0000000..0fee6ea --- /dev/null +++ b/msg_thread.h @@ -0,0 +1,52 @@ +#ifndef MSG_THREAD_H +#define MSG_THREAD_H + +#include +#include +#include "synchro.h" + +class msg_thread { +public: + msg_thread(); + msg_thread(const char* name); + virtual ~msg_thread(); + + void start(); + void wait(); + + virtual void run() = 0; + +private: + bool started; + mutex_t mutex; + condition_t cond; + m_process_t thread; + std::string thread_name; + + void run_wrapper(); + static int start_wrapper(int, char* []); +}; + +template +class msg_thread_wrapper: public msg_thread { +public: + msg_thread_wrapper(const char* name, Func run) + : msg_thread(name) + , real_run(run) + { } + void run() { real_run(); } +private: + Func real_run; +}; + +template +msg_thread* new_msg_thread(const char* name, Func run) +{ + return new msg_thread_wrapper(name, run); +} + +#endif // !MSG_THREAD_H + +// Local variables: +// mode: c++ +// End: diff --git a/options.cpp b/options.cpp index 2509fe0..3e789cd 100644 --- a/options.cpp +++ b/options.cpp @@ -405,7 +405,8 @@ void opt::usage() << " depl : messages from auto deployment (inherited from main)\n" << " comm : messages from asynchronous pipes\n" << " proc : messages from base process class\n" - << " loba : messages from load-balancer\n"; + << " loba : messages from load-balancer\n" + << " thrd : messages from thread wrapper class\n"; // std::clog << "\nMiscellaneous low-level parameters\n";