AND Private Git Repository - loba.git/blobdiff - communicator.cpp
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Wip++...
[loba.git] / communicator.cpp
index b78b1c3a97bdda6d6ddba7524b23d433d5c4bbe1..93389c756d1253b9ade6e530556a3b3eb0965281 100644 (file)
@@ -3,12 +3,14 @@
 #include <msg/msg.h>
 #include <xbt/log.h>
 #include "communicator.h"
 #include <msg/msg.h>
 #include <xbt/log.h>
 #include "communicator.h"
+#include "simgrid_features.h"
 
 // XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu);
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(comm, simu,
                                 "Messages from asynchronous pipes");
 
 namespace {
 
 // XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu);
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(comm, simu,
                                 "Messages from asynchronous pipes");
 
 namespace {
+
     bool comm_test_n_destroy(msg_comm_t& comm)
     {
         if (MSG_comm_test(comm)) {
     bool comm_test_n_destroy(msg_comm_t& comm)
     {
         if (MSG_comm_test(comm)) {
@@ -17,56 +19,118 @@ namespace {
         } else
             return false;
     }
         } else
             return false;
     }
+
 }
 
 }
 
+class communicator::message  { // payload attached to control tasks: a message kind plus a numeric amount
+public:
+    message(message_type t, double a): type(t), amount(a) { } // stores both values unchanged
+
+    message_type type; // kind of control message; INFO_MSG and CREDIT_MSG are the values used in this file
+    double amount; // numeric value carried by the message
+};
+
 communicator::communicator() // builds two mailbox names from the current host name and posts one asynchronous receive on each
 {
     const char* hostname = MSG_host_get_name(MSG_host_self());
 communicator::communicator()
 {
     const char* hostname = MSG_host_get_name(MSG_host_self());
-    size_t len = std::strlen(hostname);
-    recv_mbox = new char[len + 1];
-    strcpy(recv_mbox, hostname);
-    recv_task = NULL;
-    recv_comm = MSG_task_irecv(&recv_task, recv_mbox);
+    ctrl_mbox = bprintf("%s_ctrl", hostname); // control mailbox name "<hostname>_ctrl"; released with free() in the destructor
+    ctrl_task = NULL;
+    ctrl_comm = MSG_task_irecv(&ctrl_task, ctrl_mbox); // pending receive, polled later by recv_ctrl()
+
+    data_mbox = bprintf("%s_data", hostname); // data mailbox name "<hostname>_data"; released with free() in the destructor
+    data_task = NULL;
+    data_comm = MSG_task_irecv(&data_task, data_mbox); // pending receive, polled later by recv_load()
 }
 
 communicator::~communicator() // frees both mailbox name strings, polls the recorded sends, and logs a warning if any are still recorded
 {
 }
 
 communicator::~communicator()
 {
-    send_acknowledge();
+    // fixme: don't know how to free pending communications
+    // (data_comm, ctrl_comm and sent_comm)
+
+    free(data_mbox); // allocated by bprintf() in the constructor
+    free(ctrl_mbox); // allocated by bprintf() in the constructor
+
+    flush_sent(); // last poll of the recorded sends before counting what is left
     if (!sent_comm.empty())
         WARN1("Lost %ld send communications!", (long )sent_comm.size());
     if (!sent_comm.empty())
         WARN1("Lost %ld send communications!", (long )sent_comm.size());
-    delete[] recv_mbox;
 }
 
 }
 
+void communicator::send_info(const neighbor& dest, double amount) // sends an INFO_MSG carrying amount to dest's control mailbox
+{
+    message* msg = new message(INFO_MSG, amount); // heap-allocated; recv_ctrl() deletes the message it extracts from a control task
+    m_task_t task = MSG_task_create("load msg", 0.0, sizeof *msg, msg); // msg is attached as the task's data
+    send(dest.get_ctrl_mbox(), task); // asynchronous send, tracked in sent_comm
+}
+
+void communicator::send_credit(const neighbor& dest, double amount) // sends a CREDIT_MSG carrying amount to dest's control mailbox
+{
+    message* msg = new message(CREDIT_MSG, amount); // heap-allocated; recv_ctrl() deletes the message it extracts from a control task
+    m_task_t task = MSG_task_create("credit msg", 0.0, sizeof *msg, msg); // msg is attached as the task's data
+    send(dest.get_ctrl_mbox(), task); // asynchronous send, tracked in sent_comm
+}
 
 
-void communicator::send(m_task_t task, const char *dest)
+void communicator::send_load(const neighbor& dest, double amount) // sends a data task to dest's data mailbox; no payload object is attached
+{
+    m_task_t task = MSG_task_create("data msg", 0.0, amount, NULL); // amount is passed as the task's size argument; recv_load() reads it back via MSG_task_get_data_size()
+    send(dest.get_data_mbox(), task); // asynchronous send, tracked in sent_comm
+}
+
+void communicator::send(const char* dest, m_task_t task) // starts an asynchronous send of task to mailbox dest and records the handle in sent_comm
 {
     sent_comm.push_back(MSG_task_isend(task, dest));
 {
     sent_comm.push_back(MSG_task_isend(task, dest));
-    send_acknowledge();
+    flush_sent(); // polls the recorded sends for completion
+}
+
+bool communicator::recv_info(double& amount, m_host_t& from) // polls the control mailbox for an INFO_MSG; amount and from are filled only when true is returned
+{
+    return recv_ctrl(INFO_MSG, amount, from); // all the work is done by recv_ctrl()
 }
 
 }
 
-void communicator::send(m_task_t task, const std::string& dest)
+bool communicator::recv_credit(double& amount, m_host_t& from) // polls the control mailbox for a CREDIT_MSG; amount and from are filled only when true is returned
 {
 {
-    send(task, dest.c_str());
+    return recv_ctrl(CREDIT_MSG, amount, from); // all the work is done by recv_ctrl()
 }
 
 }
 
-m_task_t communicator::recv()
-{ 
-   m_task_t task = NULL;
-   if (comm_test_n_destroy(recv_comm)) {
-        task = recv_task;
-        recv_task = NULL;
-        recv_comm = MSG_task_irecv(&recv_task, recv_mbox);
+bool communicator::recv_load(double& amount, m_host_t& from) // polls the data mailbox; returns the result of comm_test_n_destroy(data_comm), filling amount and from when it is true
+{
+    bool res = comm_test_n_destroy(data_comm);
+    if (res) {
+        amount = MSG_task_get_data_size(data_task); // the load value travels as the task's data size (see send_load)
+        from = MSG_task_get_source(data_task);
+        MSG_task_destroy(data_task);
+        data_task = NULL;
+        data_comm = MSG_task_irecv(&data_task, data_mbox); // post a new receive for the next data task
+    }
+    return res;
+}
+
+bool communicator::recv_ctrl(message_type type, double& amount, m_host_t& from) // polls the control mailbox; consumes the received message only if its kind equals type
+{
+    bool res = MSG_comm_test(ctrl_comm);
+    if (res) {
+        message* msg = (message* )MSG_task_get_data(ctrl_task); // payload attached by send_info() / send_credit()
+        if (msg->type == type) {
+            MSG_comm_destroy(ctrl_comm);
+            amount = msg->amount;
+            from = MSG_task_get_source(ctrl_task);
+            delete msg; // the payload was allocated with new on the sending side
+            MSG_task_destroy(ctrl_task);
+            ctrl_task = NULL;
+            ctrl_comm = MSG_task_irecv(&ctrl_task, ctrl_mbox); // post a new receive for the next control task
+        } else {
+            res = false; // wrong kind: ctrl_comm and ctrl_task are left untouched, so the same message is seen again on the next call
+        }
     }
     }
-    return task;
+    return res; // true only when a message of the requested kind was consumed
 }
 
 }
 
-int communicator::sent_count()
+int communicator::send_backlog() // polls the recorded sends, then returns the number of entries in sent_comm
 {
 {
-    send_acknowledge();
+    flush_sent();
     return sent_comm.size(); // NOTE(review): this count depends on flush_sent() actually removing completed entries - confirm
 }
 
     return sent_comm.size();
 }
 
-void communicator::send_acknowledge()
+void communicator::flush_sent() // runs comm_test_n_destroy on every handle recorded in sent_comm
 {
     std::remove_if(sent_comm.begin(), sent_comm.end(), comm_test_n_destroy); // NOTE(review): the returned iterator is discarded, so sent_comm keeps its size; erase-remove (sent_comm.erase(std::remove_if(...), sent_comm.end())) looks intended
 }
 {
     std::remove_if(sent_comm.begin(), sent_comm.end(), comm_test_n_destroy);
 }