]> AND Private Git Repository - loba.git/commitdiff
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Wip++...
author Arnaud Giersch <arnaud.giersch@iut-bm.univ-fcomte.fr>
Thu, 9 Dec 2010 16:08:20 +0000 (17:08 +0100)
committer Arnaud Giersch <arnaud.giersch@iut-bm.univ-fcomte.fr>
Tue, 14 Dec 2010 23:23:51 +0000 (00:23 +0100)
* improve communicator logic

communicator.cpp
communicator.h
neighbor.cpp
neighbor.h
process.cpp
process.h

index 93389c756d1253b9ade6e530556a3b3eb0965281..da5e2b5f8062755727ed07dcd3617e8a07f0a0fb 100644 (file)
@@ -22,24 +22,18 @@ namespace {
 
 }
 
-class communicator::message  {
-public:
-    message(message_type t, double a): type(t), amount(a) { }
-
-    message_type type;
-    double amount;
-};
-
 communicator::communicator()
 {
     const char* hostname = MSG_host_get_name(MSG_host_self());
-    ctrl_mbox = bprintf("%s_ctrl", hostname);
+    ctrl_mbox = hostname;
+    ctrl_mbox += "_ctrl";
     ctrl_task = NULL;
-    ctrl_comm = MSG_task_irecv(&ctrl_task, ctrl_mbox);
+    ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox());
 
-    data_mbox = bprintf("%s_data", hostname);
+    data_mbox = hostname;
+    data_mbox += "_data";
     data_task = NULL;
-    data_comm = MSG_task_irecv(&data_task, data_mbox);
+    data_comm = MSG_task_irecv(&data_task, get_data_mbox());
 }
 
 communicator::~communicator()
@@ -47,81 +41,41 @@ communicator::~communicator()
     // fixme: don't know how to free pending communications
     // (data_comm, ctrl_comm and sent_comm)
 
-    free(data_mbox);
-    free(ctrl_mbox);
-
     flush_sent();
     if (!sent_comm.empty())
         WARN1("Lost %ld send communications!", (long )sent_comm.size());
 }
 
-void communicator::send_info(const neighbor& dest, double amount)
-{
-    message* msg = new message(INFO_MSG, amount);
-    m_task_t task = MSG_task_create("load msg", 0.0, sizeof *msg, msg);
-    send(dest.get_ctrl_mbox(), task);
-}
-
-void communicator::send_credit(const neighbor& dest, double amount)
-{
-    message* msg = new message(CREDIT_MSG, amount);
-    m_task_t task = MSG_task_create("credit msg", 0.0, sizeof *msg, msg);
-    send(dest.get_ctrl_mbox(), task);
-}
-
-void communicator::send_load(const neighbor& dest, double amount)
-{
-    m_task_t task = MSG_task_create("data msg", 0.0, amount, NULL);
-    send(dest.get_data_mbox(), task);
-}
-
-void communicator::send(const char* dest, m_task_t task)
+void communicator::send(const char* dest, message* msg)
 {
+    double msg_size = sizeof *msg;
+    if (msg->get_type() == message::LOAD)
+        msg_size += msg->get_amount();
+    m_task_t task = MSG_task_create("message", 0.0, msg_size, msg);    
     sent_comm.push_back(MSG_task_isend(task, dest));
     flush_sent();
 }
 
-bool communicator::recv_info(double& amount, m_host_t& from)
+bool communicator::recv(message*& msg, m_host_t& from)
 {
-    return recv_ctrl(INFO_MSG, amount, from);
-}
+    msg = NULL;
 
-bool communicator::recv_credit(double& amount, m_host_t& from)
-{
-    return recv_ctrl(CREDIT_MSG, amount, from);
-}
+    if (comm_test_n_destroy(ctrl_comm)) {
+        msg = (message* )MSG_task_get_data(ctrl_task);
+        from = MSG_task_get_source(ctrl_task);
+        MSG_task_destroy(ctrl_task);
+        ctrl_task = NULL;
+        ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox());
 
-bool communicator::recv_load(double& amount, m_host_t& from)
-{
-    bool res = comm_test_n_destroy(data_comm);
-    if (res) {
-        amount = MSG_task_get_data_size(data_task);
+    } else if (comm_test_n_destroy(data_comm)) {
+        msg = (message* )MSG_task_get_data(data_task);
         from = MSG_task_get_source(data_task);
         MSG_task_destroy(data_task);
         data_task = NULL;
-        data_comm = MSG_task_irecv(&data_task, data_mbox);
+        data_comm = MSG_task_irecv(&data_task, get_data_mbox());
     }
-    return res;
-}
 
-bool communicator::recv_ctrl(message_type type, double& amount, m_host_t& from)
-{
-    bool res = MSG_comm_test(ctrl_comm);
-    if (res) {
-        message* msg = (message* )MSG_task_get_data(ctrl_task);
-        if (msg->type == type) {
-            MSG_comm_destroy(ctrl_comm);
-            amount = msg->amount;
-            from = MSG_task_get_source(ctrl_task);
-            delete msg;
-            MSG_task_destroy(ctrl_task);
-            ctrl_task = NULL;
-            ctrl_comm = MSG_task_irecv(&ctrl_task, ctrl_mbox);
-        } else {
-            res = false;
-        }
-    }
-    return res;
+    return msg != NULL;
 }
 
 int communicator::send_backlog()
index f1ee1d96cc0eb11d98ad1cb95585f8f6c417448c..c18b662410b7614c4e6d8877d3b44c1aa8967c21 100644 (file)
@@ -4,45 +4,49 @@
 #define COMMUNICATOR_H
 
 #include <list>
-
+#include <string>
 #include <msg/msg.h>
-#include <xbt/sysdep.h>
-#include "neighbor.h"
+
+class message {
+public:
+    enum message_type { INFO, CREDIT, LOAD, CLOSE };
+
+    message(message_type t, double a): type(t), amount(a) { }
+
+    message_type get_type() const       { return type;   }
+    double get_amount() const           { return amount; }
+
+private:
+    message_type type;
+    double amount;
+};
 
 class communicator {
 public:
     communicator();
     ~communicator();
 
-    void send_info(const neighbor& dest, double amount);
-    void send_credit(const neighbor& dest, double amount);
-    void send_load(const neighbor& dest, double amount);
-
-    bool recv_info(double& amount, m_host_t& from);
-    bool recv_credit(double& amount, m_host_t& from);
-    bool recv_load(double& amount, m_host_t& from);
+    void send(const char* dest, message* msg);
+    bool recv(message*& msg, m_host_t& from);
 
     int send_backlog();
 
 private:
-    enum message_type { INFO_MSG, CREDIT_MSG };
-    class message;
-
     // List of pending send communications
     std::list<msg_comm_t> sent_comm;
 
     // Control channel for receiving
-    char*       ctrl_mbox;
+    std::string ctrl_mbox;
     msg_comm_t  ctrl_comm;
     m_task_t    ctrl_task;
 
     // Data channel for receiving
-    char*       data_mbox;
+    std::string data_mbox;
     msg_comm_t  data_comm;
     m_task_t    data_task;
 
-    bool recv_ctrl(message_type type, double& amount, m_host_t& from);
-    void send(const char* dest, m_task_t task);
+    const char* get_ctrl_mbox() const   { return ctrl_mbox.c_str(); }
+    const char* get_data_mbox() const   { return data_mbox.c_str(); }
     void flush_sent();
 };
 
index e7a4666151df22ecc771ef1c9c883cfdb8e4f0ec..8715396e9d2be53313dc35c85a34e88b19ba3a8d 100644 (file)
@@ -1,18 +1,16 @@
-#include <xbt/sysdep.h>
 #include "neighbor.h"
 
 neighbor::neighbor(const char* hostname)
+    : name(hostname)
+    , ctrl_mbox(hostname)
+    , data_mbox(hostname)
+    , load(std::numeric_limits<double>::infinity())
+    , debt(0.0)
 {
-    load = std::numeric_limits<double>::infinity();
-    debt = 0.0;
-    name = xbt_strdup(hostname);
-    ctrl_mbox = bprintf("%s_ctrl", hostname);
-    data_mbox = bprintf("%s_data", hostname);
+    ctrl_mbox += "_ctrl";
+    data_mbox += "_data";
 }
 
 neighbor::~neighbor()
 {
-    free(data_mbox);
-    free(ctrl_mbox);
-    free(name);
 }
index 52a0ef489f700b8f7912e24405bfcaeb6a95f6b9..4bd05a9d86d0965fb5a2c2c82c4eebe56c57ccda 100644 (file)
@@ -9,9 +9,9 @@ public:
     neighbor(const char* hostname);
     ~neighbor();
 
-    const char* get_name() const        { return name; }
-    const char* get_ctrl_mbox() const   { return ctrl_mbox; }
-    const char* get_data_mbox() const   { return data_mbox; }
+    const char* get_name() const        { return name.c_str(); }
+    const char* get_ctrl_mbox() const   { return ctrl_mbox.c_str(); }
+    const char* get_data_mbox() const   { return data_mbox.c_str(); }
 
     double get_load() const             { return load; }
     void set_load(double l)             { load = l;    }
@@ -20,9 +20,9 @@ public:
     void set_debt(double d)             { debt = d;    }
 
 private:
-    char* name;
-    char* ctrl_mbox;
-    char* data_mbox;
+    std::string name;
+    std::string ctrl_mbox;
+    std::string data_mbox;
 
     double load;
     double debt;
index 4ee3647d6895c4eef6586f92f29ffc498b45b14e..b30fab4b0a9d8a43838e1d346bade1e06baa64f8 100644 (file)
@@ -54,6 +54,8 @@ int process::run()
      *    compute load balancing;
      *    send tasks to neighbors;
      * }
+     * finalize;
+     * wait for pending messages;
      */
 
     /* Open Questions :
@@ -69,24 +71,22 @@ int process::run()
 
 void process::receive()
 {
-    bool received;
-    do {
-        double amount;
-        m_host_t from;
-        received = false;
-        if (comm.recv_info(amount, from)) {
+    message* msg;
+    m_host_t from;
+    while (comm.recv(msg, from)) {
+        switch (msg->get_type()) {
+        case message::INFO:
             // fixme: update neighbor
-            received = true;
+            break;
+        case message::CREDIT:
+            expected_load += msg->get_amount();
+            break;
+        case message::LOAD:
+            load += msg->get_amount();
+            break;
         }
-        if (comm.recv_credit(amount, from)) {
-            expected_load += amount;
-            received = true;
-        }
-        if (comm.recv_load(amount, from)) {
-            load += amount;
-            received = true;
-        }
-    } while (received);
+        delete msg;
+    }
 }
 
 void process::compute()
@@ -98,6 +98,11 @@ void process::compute()
     MSG_task_destroy(task);
 }
 
+void process::finalize()
+{
+    // fixme
+}
+
 void process::print_loads(e_xbt_log_priority_t logp)
 {
     if (!LOG_ISENABLED(logp))
index 586da5525059896f011e192b3edee2ed7897d7b1..a301046ab8907fadd611a106fc11be0258d33166 100644 (file)
--- a/process.h
+++ b/process.h
@@ -20,6 +20,7 @@ private:
 
     void receive();
     void compute();
+    void finalize();
     void print_loads(e_xbt_log_priority_t logp = xbt_log_priority_info);
 };