From f4125505064e3ff346b31ab9e48f894672e5a7a7 Mon Sep 17 00:00:00 2001
From: Arnaud Giersch <arnaud.giersch@iut-bm.univ-fcomte.fr>
Date: Thu, 9 Dec 2010 17:08:20 +0100
Subject: [PATCH] Wip++...

* improve commuicator logic
---
 communicator.cpp | 92 ++++++++++++------------------------------------
 communicator.h   | 38 +++++++++++---------
 neighbor.cpp     | 16 ++++-----
 neighbor.h       | 12 +++----
 process.cpp      | 37 ++++++++++---------
 process.h        |  1 +
 6 files changed, 79 insertions(+), 117 deletions(-)

diff --git a/communicator.cpp b/communicator.cpp
index 93389c7..da5e2b5 100644
--- a/communicator.cpp
+++ b/communicator.cpp
@@ -22,24 +22,18 @@ namespace {
 
 }
 
-class communicator::message  {
-public:
-    message(message_type t, double a): type(t), amount(a) { }
-
-    message_type type;
-    double amount;
-};
-
 communicator::communicator()
 {
     const char* hostname = MSG_host_get_name(MSG_host_self());
-    ctrl_mbox = bprintf("%s_ctrl", hostname);
+    ctrl_mbox = hostname;
+    ctrl_mbox += "_ctrl";
     ctrl_task = NULL;
-    ctrl_comm = MSG_task_irecv(&ctrl_task, ctrl_mbox);
+    ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox());
 
-    data_mbox = bprintf("%s_data", hostname);
+    data_mbox = hostname;
+    data_mbox += "_data";
     data_task = NULL;
-    data_comm = MSG_task_irecv(&data_task, data_mbox);
+    data_comm = MSG_task_irecv(&data_task, get_data_mbox());
 }
 
 communicator::~communicator()
@@ -47,81 +41,41 @@ communicator::~communicator()
     // fixme: don't know how to free pending communications
     // (data_comm, ctrl_comm and sent_comm)
 
-    free(data_mbox);
-    free(ctrl_mbox);
-
     flush_sent();
     if (!sent_comm.empty())
         WARN1("Lost %ld send communications!", (long )sent_comm.size());
 }
 
-void communicator::send_info(const neighbor& dest, double amount)
-{
-    message* msg = new message(INFO_MSG, amount);
-    m_task_t task = MSG_task_create("load msg", 0.0, sizeof *msg, msg);
-    send(dest.get_ctrl_mbox(), task);
-}
-
-void communicator::send_credit(const neighbor& dest, double amount)
-{
-    message* msg = new message(CREDIT_MSG, amount);
-    m_task_t task = MSG_task_create("credit msg", 0.0, sizeof *msg, msg);
-    send(dest.get_ctrl_mbox(), task);
-}
-
-void communicator::send_load(const neighbor& dest, double amount)
-{
-    m_task_t task = MSG_task_create("data msg", 0.0, amount, NULL);
-    send(dest.get_data_mbox(), task);
-}
-
-void communicator::send(const char* dest, m_task_t task)
+void communicator::send(const char* dest, message* msg)
 {
+    double msg_size = sizeof *msg;
+    if (msg->get_type() == message::LOAD)
+        msg_size += msg->get_amount();
+    m_task_t task = MSG_task_create("message", 0.0, msg_size, msg);    
     sent_comm.push_back(MSG_task_isend(task, dest));
     flush_sent();
 }
 
-bool communicator::recv_info(double& amount, m_host_t& from)
+bool communicator::recv(message*& msg, m_host_t& from)
 {
-    return recv_ctrl(INFO_MSG, amount, from);
-}
+    msg = NULL;
 
-bool communicator::recv_credit(double& amount, m_host_t& from)
-{
-    return recv_ctrl(CREDIT_MSG, amount, from);
-}
+    if (comm_test_n_destroy(ctrl_comm)) {
+        msg = (message* )MSG_task_get_data(ctrl_task);
+        from = MSG_task_get_source(ctrl_task);
+        MSG_task_destroy(ctrl_task);
+        ctrl_task = NULL;
+        ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox());
 
-bool communicator::recv_load(double& amount, m_host_t& from)
-{
-    bool res = comm_test_n_destroy(data_comm);
-    if (res) {
-        amount = MSG_task_get_data_size(data_task);
+    } else if (comm_test_n_destroy(data_comm)) {
+        msg = (message* )MSG_task_get_data(data_task);
         from = MSG_task_get_source(data_task);
         MSG_task_destroy(data_task);
         data_task = NULL;
-        data_comm = MSG_task_irecv(&data_task, data_mbox);
+        data_comm = MSG_task_irecv(&data_task, get_data_mbox());
     }
-    return res;
-}
 
-bool communicator::recv_ctrl(message_type type, double& amount, m_host_t& from)
-{
-    bool res = MSG_comm_test(ctrl_comm);
-    if (res) {
-        message* msg = (message* )MSG_task_get_data(ctrl_task);
-        if (msg->type == type) {
-            MSG_comm_destroy(ctrl_comm);
-            amount = msg->amount;
-            from = MSG_task_get_source(ctrl_task);
-            delete msg;
-            MSG_task_destroy(ctrl_task);
-            ctrl_task = NULL;
-            ctrl_comm = MSG_task_irecv(&ctrl_task, ctrl_mbox);
-        } else {
-            res = false;
-        }
-    }
-    return res;
+    return msg != NULL;
 }
 
 int communicator::send_backlog()
diff --git a/communicator.h b/communicator.h
index f1ee1d9..c18b662 100644
--- a/communicator.h
+++ b/communicator.h
@@ -4,45 +4,49 @@
 #define COMMUNICATOR_H
 
 #include <list>
-
+#include <string>
 #include <msg/msg.h>
-#include <xbt/sysdep.h>
-#include "neighbor.h"
+
+class message {
+public:
+    enum message_type { INFO, CREDIT, LOAD, CLOSE };
+
+    message(message_type t, double a): type(t), amount(a) { }
+
+    message_type get_type() const       { return type;   }
+    double get_amount() const           { return amount; }
+
+private:
+    message_type type;
+    double amount;
+};
 
 class communicator {
 public:
     communicator();
     ~communicator();
 
-    void send_info(const neighbor& dest, double amount);
-    void send_credit(const neighbor& dest, double amount);
-    void send_load(const neighbor& dest, double amount);
-
-    bool recv_info(double& amount, m_host_t& from);
-    bool recv_credit(double& amount, m_host_t& from);
-    bool recv_load(double& amount, m_host_t& from);
+    void send(const char* dest, message* msg);
+    bool recv(message*& msg, m_host_t& from);
 
     int send_backlog();
 
 private:
-    enum message_type { INFO_MSG, CREDIT_MSG };
-    class message;
-
     // List of pending send communications
     std::list<msg_comm_t> sent_comm;
 
     // Control channel for receiving
-    char*       ctrl_mbox;
+    std::string ctrl_mbox;
     msg_comm_t  ctrl_comm;
     m_task_t    ctrl_task;
 
     // Data channel for receiving
-    char*       data_mbox;
+    std::string data_mbox;
     msg_comm_t  data_comm;
     m_task_t    data_task;
 
-    bool recv_ctrl(message_type type, double& amount, m_host_t& from);
-    void send(const char* dest, m_task_t task);
+    const char* get_ctrl_mbox() const   { return ctrl_mbox.c_str(); }
+    const char* get_data_mbox() const   { return data_mbox.c_str(); }
     void flush_sent();
 };
 
diff --git a/neighbor.cpp b/neighbor.cpp
index e7a4666..8715396 100644
--- a/neighbor.cpp
+++ b/neighbor.cpp
@@ -1,18 +1,16 @@
-#include <xbt/sysdep.h>
 #include "neighbor.h"
 
 neighbor::neighbor(const char* hostname)
+    : name(hostname)
+    , ctrl_mbox(hostname)
+    , data_mbox(hostname)
+    , load(std::numeric_limits<double>::infinity())
+    , debt(0.0)
 {
-    load = std::numeric_limits<double>::infinity();
-    debt = 0.0;
-    name = xbt_strdup(hostname);
-    ctrl_mbox = bprintf("%s_ctrl", hostname);
-    data_mbox = bprintf("%s_data", hostname);
+    ctrl_mbox += "_ctrl";
+    data_mbox += "_data";
 }
 
 neighbor::~neighbor()
 {
-    free(data_mbox);
-    free(ctrl_mbox);
-    free(name);
 }
diff --git a/neighbor.h b/neighbor.h
index 52a0ef4..4bd05a9 100644
--- a/neighbor.h
+++ b/neighbor.h
@@ -9,9 +9,9 @@ public:
     neighbor(const char* hostname);
     ~neighbor();
 
-    const char* get_name() const        { return name; }
-    const char* get_ctrl_mbox() const   { return ctrl_mbox; }
-    const char* get_data_mbox() const   { return data_mbox; }
+    const char* get_name() const        { return name.c_str(); }
+    const char* get_ctrl_mbox() const   { return ctrl_mbox.c_str(); }
+    const char* get_data_mbox() const   { return data_mbox.c_str(); }
 
     double get_load() const             { return load; }
     void set_load(double l)             { load = l;    }
@@ -20,9 +20,9 @@ public:
     void set_debt(double d)             { debt = d;    }
 
 private:
-    char* name;
-    char* ctrl_mbox;
-    char* data_mbox;
+    std::string name;
+    std::string ctrl_mbox;
+    std::string data_mbox;
 
     double load;
     double debt;
diff --git a/process.cpp b/process.cpp
index 4ee3647..b30fab4 100644
--- a/process.cpp
+++ b/process.cpp
@@ -54,6 +54,8 @@ int process::run()
      *    compute load balancing;
      *    send tasks to neighbors;
      * }
+     * finalize;
+     * wait for pending messages;
      */
 
     /* Open Questions :
@@ -69,24 +71,22 @@ int process::run()
 
 void process::receive()
 {
-    bool received;
-    do {
-        double amount;
-        m_host_t from;
-        received = false;
-        if (comm.recv_info(amount, from)) {
+    message* msg;
+    m_host_t from;
+    while (comm.recv(msg, from)) {
+        switch (msg->get_type()) {
+        case message::INFO:
             // fixme: update neighbor
-            received = true;
+            break;
+        case message::CREDIT:
+            expected_load += msg->get_amount();
+            break;
+        case message::LOAD:
+            load += msg->get_amount();
+            break;
         }
-        if (comm.recv_credit(amount, from)) {
-            expected_load += amount;
-            received = true;
-        }
-        if (comm.recv_load(amount, from)) {
-            load += amount;
-            received = true;
-        }
-    } while (received);
+        delete msg;
+    }
 }
 
 void process::compute()
@@ -98,6 +98,11 @@ void process::compute()
     MSG_task_destroy(task);
 }
 
+void process::finalize()
+{
+    // fixme
+}
+
 void process::print_loads(e_xbt_log_priority_t logp)
 {
     if (!LOG_ISENABLED(logp))
diff --git a/process.h b/process.h
index 586da55..a301046 100644
--- a/process.h
+++ b/process.h
@@ -20,6 +20,7 @@ private:
 
     void receive();
     void compute();
+    void finalize();
     void print_loads(e_xbt_log_priority_t logp = xbt_log_priority_info);
 };
 
-- 
2.39.5