AND Private Git Repository - loba.git/commitdiff
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Define class for MSG thread.
author Arnaud Giersch <arnaud.giersch@iut-bm.univ-fcomte.fr>
Wed, 9 Feb 2011 12:58:43 +0000 (13:58 +0100)
committer Arnaud Giersch <arnaud.giersch@iut-bm.univ-fcomte.fr>
Wed, 9 Feb 2011 13:06:50 +0000 (14:06 +0100)
Makefile
communicator.cpp
communicator.h
main.cpp
msg_thread.cpp [new file with mode: 0644]
msg_thread.h [new file with mode: 0644]
options.cpp

index 3990aa5b0d3db0ed986843f3bb05eddea3d9e11d..6afc248ee1398a6577e66790d02f64c63641e76f 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -36,6 +36,7 @@ SRC.loba := main.cpp          \
        $(wildcard loba_*.cpp)  \
        messages.cpp            \
        misc.cpp                \
        $(wildcard loba_*.cpp)  \
        messages.cpp            \
        misc.cpp                \
+       msg_thread.cpp          \
        neighbor.cpp            \
        options.cpp             \
        process.cpp             \
        neighbor.cpp            \
        options.cpp             \
        process.cpp             \
index 3516b68146ee8844a3c8ec8c33fad1b3c19befc6..daf14348393761cded88e166bbdfd4d313e4622e 100644 (file)
@@ -1,4 +1,5 @@
 #include <algorithm>
 #include <algorithm>
+#include <tr1/functional>
 #include <msg/msg.h>
 #include <xbt/log.h>
 
 #include <msg/msg.h>
 #include <xbt/log.h>
 
@@ -14,12 +15,10 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(comm);
 communicator::communicator()
     : host(static_cast<hostdata*>(MSG_host_get_data(MSG_host_self())))
 {
 communicator::communicator()
     : host(static_cast<hostdata*>(MSG_host_get_data(MSG_host_self())))
 {
-    receiver_mutex.acquire();
-    receiver_thread =
-        MSG_process_create("receiver", communicator::receiver_wrapper,
-                           this, MSG_host_self());
-    receiver_cond.wait(receiver_mutex); // wait for the receiver to be ready
-    receiver_mutex.release();
+    using std::tr1::bind;
+    receiver_thread = new_msg_thread("receiver",
+                                     bind(&communicator::receiver, this));
+    receiver_thread->start();
 }
 
 communicator::~communicator()
 }
 
 communicator::~communicator()
@@ -34,12 +33,8 @@ communicator::~communicator()
     task = MSG_task_create("finalize", 0.0, 0, NULL);
     MSG_task_send(task, host->get_data_mbox());
 
     task = MSG_task_create("finalize", 0.0, 0, NULL);
     MSG_task_send(task, host->get_data_mbox());
 
-    receiver_mutex.acquire();
-    while (receiver_thread) {
-        XBT_DEBUG("waiting for receiver to terminate");
-        receiver_cond.wait(receiver_mutex);
-    }
-    receiver_mutex.release();
+    receiver_thread->wait();
+    delete receiver_thread;
 
     if (!received.empty())
         XBT_WARN("lost %zu received message%s!",
 
     if (!received.empty())
         XBT_WARN("lost %zu received message%s!",
@@ -87,21 +82,6 @@ bool communicator::recv(message*& msg, m_host_t& from, double timeout)
     return recvd;
 }
 
     return recvd;
 }
 
-int communicator::receiver_wrapper(int, char* [])
-{
-    communicator* comm;
-    comm = static_cast<communicator*>(MSG_process_get_data(MSG_process_self()));
-    comm->receiver();
-
-    XBT_DEBUG("terminate");
-    comm->receiver_mutex.acquire();
-    comm->receiver_thread = NULL;
-    comm->receiver_cond.signal();
-    comm->receiver_mutex.release();
-
-    return 0;
-}
-
 void communicator::receiver()
 {
     xbt_dynar_t comms = xbt_dynar_new(sizeof(msg_comm_t), NULL);
 void communicator::receiver()
 {
     xbt_dynar_t comms = xbt_dynar_new(sizeof(msg_comm_t), NULL);
@@ -119,11 +99,6 @@ void communicator::receiver()
         xbt_dynar_push(comms, &chan[i].comm);
     }
 
         xbt_dynar_push(comms, &chan[i].comm);
     }
 
-    XBT_DEBUG("receiver ready");
-    receiver_mutex.acquire();
-    receiver_cond.signal();     // signal master that we are ready
-    receiver_mutex.release();
-
     while (!xbt_dynar_is_empty(comms)) {
 
         int index = MSG_comm_waitany(comms);
     while (!xbt_dynar_is_empty(comms)) {
 
         int index = MSG_comm_waitany(comms);
index 7cb1efcb0883374084f9aeb872b5a2fad1bd96b6..cfe8624e62493f37425c5ed4c440e15a63896b58 100644 (file)
@@ -7,7 +7,7 @@
 #include <msg/msg.h>
 #include "hostdata.h"
 #include "messages.h"
 #include <msg/msg.h>
 #include "hostdata.h"
 #include "messages.h"
-#include "synchro.h"
+#include "msg_thread.h"
 
 class communicator {
 public:
 
 class communicator {
 public:
@@ -37,10 +37,7 @@ private:
     message_queue received;
 
     // Handling of receiving thread
     message_queue received;
 
     // Handling of receiving thread
-    mutex_t receiver_mutex;
-    condition_t receiver_cond;
-    m_process_t receiver_thread;
-    static int receiver_wrapper(int, char* []);
+    msg_thread* receiver_thread;
     void receiver();
 
     // Used to check if a communication is successful before destroying it
     void receiver();
 
     // Used to check if a communication is successful before destroying it
index ebf28f10c379e715c1a9b45878d1a271b6a1953b..9bffd8b2fbdc8d35fc3de9c0bce6229831f1f740 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -11,6 +11,7 @@ XBT_LOG_NEW_SUBCATEGORY(depl, main, "Messages from auto deployment");
 XBT_LOG_NEW_SUBCATEGORY(comm, simu, "Messages from asynchronous pipes");
 XBT_LOG_NEW_SUBCATEGORY(proc, simu, "Messages from base process class");
 XBT_LOG_NEW_SUBCATEGORY(loba, simu, "Messages from load-balancer");
 XBT_LOG_NEW_SUBCATEGORY(comm, simu, "Messages from asynchronous pipes");
 XBT_LOG_NEW_SUBCATEGORY(proc, simu, "Messages from base process class");
 XBT_LOG_NEW_SUBCATEGORY(loba, simu, "Messages from load-balancer");
+XBT_LOG_NEW_SUBCATEGORY(thrd, simu, "Messages from thread wrapper class");
 
 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(main);
 
 
 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(main);
 
diff --git a/msg_thread.cpp b/msg_thread.cpp
new file mode 100644 (file)
index 0000000..41c4bfb
--- /dev/null
@@ -0,0 +1,68 @@
+#include <stdexcept>
+#include <xbt/log.h>
+
+XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(thrd);
+
+#include "misc.h"
+#include "msg_thread.h"
+
+// Default constructor: builds a thread object that is not started yet and
+// has no associated process, under the generic name "msg_thread".
+msg_thread::msg_thread()
+    : started(false),
+      thread(NULL),
+      thread_name("msg_thread")
+{
+}
+
+// Build a thread object that is not started yet and has no associated
+// process.  `name' is the name later given to the process created by
+// start().
+//
+// A NULL `name' falls back to the same generic name as the default
+// constructor: building a std::string from a null pointer is undefined
+// behaviour.
+msg_thread::msg_thread(const char* name)
+    : started(false)
+    , thread(NULL)
+    , thread_name(name ? name : "msg_thread")
+{
+}
+
+// Destructor.  `thread' is reset to NULL by start_wrapper() once run() has
+// returned, so a non-NULL value here means the thread is still running:
+// this is reported as a std::logic_error.
+//
+// NOTE(review): since C++11 destructors are implicitly noexcept, so this
+// throw would end in std::terminate() there -- confirm the language
+// standard this file is built with.
+msg_thread::~msg_thread()
+{
+    if (thread)
+        throw std::logic_error("trying to destroy running thread");
+}
+
+// Launch the thread: create a MSG process named after this thread on the
+// current host, with start_wrapper() as entry point and `this' as process
+// data.
+//
+// Throws std::logic_error if start() was already called on this object.
+void msg_thread::start()
+{
+    mutex.acquire();
+    if (started) {
+        // Release the mutex before throwing: it would otherwise be left
+        // acquired, and start_wrapper() needs it to report termination.
+        mutex.release();
+        throw std::logic_error("thread was already started");
+    }
+    XBT_DEBUG("launch \"%s\"", thread_name.c_str());
+    thread = MSG_process_create(thread_name.c_str(),
+                                msg_thread::start_wrapper,
+                                this, MSG_host_self());
+    started = true;
+    mutex.release();
+}
+
+// Block until the thread has terminated, i.e. until start_wrapper() has
+// reset `thread' to NULL and signalled `cond'.  Returns immediately if the
+// thread has already terminated.
+//
+// Throws std::logic_error if the thread was never started.
+void msg_thread::wait()
+{
+    mutex.acquire();
+    if (!started) {
+        // Release the mutex before throwing so that it is not left
+        // acquired, which would break a later start() on this object.
+        mutex.release();
+        throw std::logic_error("trying to wait a thread that was not started");
+    }
+    // Loop on the condition: re-check `thread' after every wake-up.
+    while (thread) {
+        XBT_DEBUG("waiting for \"%s\" to terminate",
+                  thread_name.c_str());
+        cond.wait(mutex);
+    }
+    mutex.release();
+}
+
+// Entry point of the MSG process created by start().  Retrieves the
+// msg_thread object stored as process data, executes its run() method, then
+// marks the thread as terminated and signals the condition wait() blocks on.
+// Always returns 0.
+int msg_thread::start_wrapper(int, char* [])
+{
+    msg_thread* const thr =
+        static_cast<msg_thread*>(MSG_process_get_data(MSG_process_self()));
+    const char* const name = thr->thread_name.c_str();
+
+    XBT_DEBUG("\"%s\" started", name);
+    thr->run();
+    XBT_DEBUG("terminate \"%s\"", name);
+
+    // Reset the process handle under the mutex, then wake up wait().
+    thr->mutex.acquire();
+    thr->thread = NULL;
+    thr->cond.signal();
+    thr->mutex.release();
+
+    return 0;
+}
diff --git a/msg_thread.h b/msg_thread.h
new file mode 100644 (file)
index 0000000..0fee6ea
--- /dev/null
@@ -0,0 +1,52 @@
+#ifndef MSG_THREAD_H
+#define MSG_THREAD_H
+
+#include <string>
+#include <msg/msg.h>
+#include "synchro.h"
+
+// Base class wrapping a MSG process used as a thread.  Derived classes
+// implement run(); start() executes it in a new MSG process created on the
+// current host, and wait() blocks until run() has returned.
+class msg_thread {
+public:
+    msg_thread();                 // thread named "msg_thread"
+    msg_thread(const char* name); // thread named `name'
+    virtual ~msg_thread();
+
+    void start();                 // launch the thread (at most once)
+    void wait();                  // wait for the thread to terminate
+
+    virtual void run() = 0;       // body of the thread
+
+private:
+    // Not copyable (declared, intentionally never defined): the running
+    // process keeps a pointer to this very object, and the object holds a
+    // mutex and a condition.
+    msg_thread(const msg_thread&);
+    msg_thread& operator=(const msg_thread&);
+
+    bool started;                 // true once start() has been called
+    mutex_t mutex;                // protects `started' and `thread'
+    condition_t cond;             // signalled when the thread terminates
+    m_process_t thread;           // underlying process, NULL when not running
+    std::string thread_name;      // name given to the process
+
+    static int start_wrapper(int, char* []); // entry point of the process
+};
+
+// Adapter turning any callable object into a msg_thread: run() simply
+// invokes the stored callable without argument.
+template <typename Func>
+class msg_thread_wrapper: public msg_thread {
+public:
+    msg_thread_wrapper(const char* name, Func run)
+        : msg_thread(name), real_run(run)
+    {
+    }
+
+    void run()
+    {
+        real_run();
+    }
+
+private:
+    Func real_run;              // copy of the callable executed by run()
+};
+
+// Helper deducing Func from its argument: allocates with `new' a
+// msg_thread_wrapper<Func> named `name' whose run() calls `run', and returns
+// it as a msg_thread*.  The returned object has to be deleted by the caller.
+template <typename Func>
+msg_thread* new_msg_thread(const char* name, Func run)
+{
+    msg_thread* thr = new msg_thread_wrapper<Func>(name, run);
+    return thr;
+}
+
+#endif // !MSG_THREAD_H
+
+// Local variables:
+// mode: c++
+// End:
index 2509fe0f20e39107864ec390f53b3dc96f30ef9a..3e789cdb52bcff209fbbfaf2ffe7762e5ada5c7c 100644 (file)
@@ -405,7 +405,8 @@ void opt::usage()
               << "        depl : messages from auto deployment (inherited from main)\n"
               << "        comm : messages from asynchronous pipes\n"
               << "        proc : messages from base process class\n"
               << "        depl : messages from auto deployment (inherited from main)\n"
               << "        comm : messages from asynchronous pipes\n"
               << "        proc : messages from base process class\n"
-              << "        loba : messages from load-balancer\n";
+              << "        loba : messages from load-balancer\n"
+              << "        thrd : messages from thread wrapper class\n";
 
     // std::clog << "\nMiscellaneous low-level parameters\n";
 
 
     // std::clog << "\nMiscellaneous low-level parameters\n";