$(wildcard loba_*.cpp) \
messages.cpp \
misc.cpp \
+ msg_thread.cpp \
neighbor.cpp \
options.cpp \
process.cpp \
#include <algorithm>
+#include <tr1/functional>
#include <msg/msg.h>
#include <xbt/log.h>
communicator::communicator()
: host(static_cast<hostdata*>(MSG_host_get_data(MSG_host_self())))
{
- receiver_mutex.acquire();
- receiver_thread =
- MSG_process_create("receiver", communicator::receiver_wrapper,
- this, MSG_host_self());
- receiver_cond.wait(receiver_mutex); // wait for the receiver to be ready
- receiver_mutex.release();
+ using std::tr1::bind;
+ receiver_thread = new_msg_thread("receiver",
+ bind(&communicator::receiver, this));
+ receiver_thread->start();
}
communicator::~communicator()
task = MSG_task_create("finalize", 0.0, 0, NULL);
MSG_task_send(task, host->get_data_mbox());
- receiver_mutex.acquire();
- while (receiver_thread) {
- XBT_DEBUG("waiting for receiver to terminate");
- receiver_cond.wait(receiver_mutex);
- }
- receiver_mutex.release();
+ receiver_thread->wait();
+ delete receiver_thread;
if (!received.empty())
XBT_WARN("lost %zu received message%s!",
return recvd;
}
-int communicator::receiver_wrapper(int, char* [])
-{
- communicator* comm;
- comm = static_cast<communicator*>(MSG_process_get_data(MSG_process_self()));
- comm->receiver();
-
- XBT_DEBUG("terminate");
- comm->receiver_mutex.acquire();
- comm->receiver_thread = NULL;
- comm->receiver_cond.signal();
- comm->receiver_mutex.release();
-
- return 0;
-}
-
void communicator::receiver()
{
xbt_dynar_t comms = xbt_dynar_new(sizeof(msg_comm_t), NULL);
xbt_dynar_push(comms, &chan[i].comm);
}
- XBT_DEBUG("receiver ready");
- receiver_mutex.acquire();
- receiver_cond.signal(); // signal master that we are ready
- receiver_mutex.release();
-
while (!xbt_dynar_is_empty(comms)) {
int index = MSG_comm_waitany(comms);
#include <msg/msg.h>
#include "hostdata.h"
#include "messages.h"
-#include "synchro.h"
+#include "msg_thread.h"
class communicator {
public:
message_queue received;
// Handling of receiving thread
- mutex_t receiver_mutex;
- condition_t receiver_cond;
- m_process_t receiver_thread;
- static int receiver_wrapper(int, char* []);
+ msg_thread* receiver_thread;
void receiver();
// Used to chek if a communication is successfull before destroying it
XBT_LOG_NEW_SUBCATEGORY(comm, simu, "Messages from asynchronous pipes");
XBT_LOG_NEW_SUBCATEGORY(proc, simu, "Messages from base process class");
XBT_LOG_NEW_SUBCATEGORY(loba, simu, "Messages from load-balancer");
+XBT_LOG_NEW_SUBCATEGORY(thrd, simu, "Messages from thread wrapper class");
XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(main);
--- /dev/null
+#include <stdexcept>
+#include <xbt/log.h>
+
+XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(thrd);
+
+#include "misc.h"
+#include "msg_thread.h"
+
+msg_thread::msg_thread()
+ : started(false)
+ , thread(NULL)
+ , thread_name("msg_thread")
+{
+}
+
+msg_thread::msg_thread(const char* name)
+ : started(false)
+ , thread(NULL)
+ , thread_name(name)
+{
+}
+
+msg_thread::~msg_thread()
+{
+ if (thread)
+ throw std::logic_error("trying to destroy running thread");
+}
+
+void msg_thread::start()
+{
+ mutex.acquire();
+ if (started)
+ throw std::logic_error("thread was already started");
+ XBT_DEBUG("launch \"%s\"", thread_name.c_str());
+ thread = MSG_process_create(thread_name.c_str(),
+ msg_thread::start_wrapper,
+ this, MSG_host_self());
+ started = true;
+ mutex.release();
+}
+
+void msg_thread::wait()
+{
+ mutex.acquire();
+ if (!started)
+ throw std::logic_error("trying to wait a thread that was not started");
+ while (thread) {
+ XBT_DEBUG("waiting for \"%s\" to terminate",
+ thread_name.c_str());
+ cond.wait(mutex);
+ }
+ mutex.release();
+}
+
+int msg_thread::start_wrapper(int, char* [])
+{
+ msg_thread* self;
+ self = static_cast<msg_thread*>(MSG_process_get_data(MSG_process_self()));
+ XBT_DEBUG("\"%s\" started", self->thread_name.c_str());
+ self->run();
+
+ XBT_DEBUG("terminate \"%s\"", self->thread_name.c_str());
+ self->mutex.acquire();
+ self->thread = NULL;
+ self->cond.signal();
+ self->mutex.release();
+ return 0;
+}
--- /dev/null
+#ifndef MSG_THREAD_H
+#define MSG_THREAD_H
+
+#include <string>
+#include <msg/msg.h>
+#include "synchro.h"
+
+class msg_thread {
+public:
+ msg_thread();
+ msg_thread(const char* name);
+ virtual ~msg_thread();
+
+ void start();
+ void wait();
+
+ virtual void run() = 0;
+
+private:
+ bool started;
+ mutex_t mutex;
+ condition_t cond;
+ m_process_t thread;
+ std::string thread_name;
+
+ void run_wrapper();
+ static int start_wrapper(int, char* []);
+};
+
+template <typename Func>
+class msg_thread_wrapper: public msg_thread {
+public:
+ msg_thread_wrapper(const char* name, Func run)
+ : msg_thread(name)
+ , real_run(run)
+ { }
+ void run() { real_run(); }
+private:
+ Func real_run;
+};
+
+template <typename Func>
+msg_thread* new_msg_thread(const char* name, Func run)
+{
+ return new msg_thread_wrapper<Func>(name, run);
+}
+
+#endif // !MSG_THREAD_H
+
+// Local variables:
+// mode: c++
+// End:
<< " depl : messages from auto deployment (inherited from main)\n"
<< " comm : messages from asynchronous pipes\n"
<< " proc : messages from base process class\n"
- << " loba : messages from load-balancer\n";
+ << " loba : messages from load-balancer\n"
+ << " thrd : messages from thread wrapper class\n";
// std::clog << "\nMiscellaneous low-level parameters\n";