]> AND Private Git Repository - loba.git/blobdiff - communicator.cpp
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Wip++...
[loba.git] / communicator.cpp
index 0d724aa171e11c3d1fe7793ece5d5210941fd670..da5e2b5f8062755727ed07dcd3617e8a07f0a0fb 100644 (file)
@@ -3,12 +3,14 @@
 #include <msg/msg.h>
 #include <xbt/log.h>
 #include "communicator.h"
+#include "simgrid_features.h"
 
 // XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu);
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(comm, simu,
                                 "Messages from asynchronous pipes");
 
 namespace {
+
     bool comm_test_n_destroy(msg_comm_t& comm)
     {
         if (MSG_comm_test(comm)) {
@@ -17,56 +19,72 @@ namespace {
         } else
             return false;
     }
+
 }
 
 communicator::communicator()
 {
     const char* hostname = MSG_host_get_name(MSG_host_self());
-    size_t len = std::strlen(hostname);
-    recv_mbox = new char[len + 1];
-    strcpy(recv_mbox, hostname);
-    recv_task = NULL;
-    recv_comm = MSG_task_irecv(&recv_task, recv_mbox);
+    ctrl_mbox = hostname;
+    ctrl_mbox += "_ctrl";
+    ctrl_task = NULL;
+    ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox());
+
+    data_mbox = hostname;
+    data_mbox += "_data";
+    data_task = NULL;
+    data_comm = MSG_task_irecv(&data_task, get_data_mbox());
 }
 
 communicator::~communicator()
 {
-    send_acknowledge();
+    // fixme: don't know how to free pending communications
+    // (data_comm, ctrl_comm and sent_comm)
+
+    flush_sent();
     if (!sent_comm.empty())
         WARN1("Lost %ld send communications!", (long )sent_comm.size());
-    delete[] recv_mbox;
 }
 
-
-void communicator::send(m_task_t task, const char *dest)
+void communicator::send(const char* dest, message* msg)
 {
+    double msg_size = sizeof *msg;
+    if (msg->get_type() == message::LOAD)
+        msg_size += msg->get_amount();
+    m_task_t task = MSG_task_create("message", 0.0, msg_size, msg);    
     sent_comm.push_back(MSG_task_isend(task, dest));
-    send_acknowledge();
+    flush_sent();
 }
 
-void communicator::send(m_task_t task, const std::string& dest)
+bool communicator::recv(message*& msg, m_host_t& from)
 {
-    send(task, dest.c_str());
-}
+    msg = NULL;
 
-m_task_t communicator::recv()
-{
-   m_task_t task = NULL;
-   if (comm_test_n_destroy(recv_comm)) {
-        task = recv_task;
-        recv_task = NULL;
-        recv_comm = MSG_task_irecv(&recv_task, recv_mbox);
+    if (comm_test_n_destroy(ctrl_comm)) {
+        msg = (message* )MSG_task_get_data(ctrl_task);
+        from = MSG_task_get_source(ctrl_task);
+        MSG_task_destroy(ctrl_task);
+        ctrl_task = NULL;
+        ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox());
+
+    } else if (comm_test_n_destroy(data_comm)) {
+        msg = (message* )MSG_task_get_data(data_task);
+        from = MSG_task_get_source(data_task);
+        MSG_task_destroy(data_task);
+        data_task = NULL;
+        data_comm = MSG_task_irecv(&data_task, get_data_mbox());
     }
-    return task;
+
+    return msg != NULL;
 }
 
-int communicator::sent_count()
+int communicator::send_backlog()
 {
-    send_acknowledge();
+    flush_sent();
     return sent_comm.size();
 }
 
-void communicator::send_acknowledge()
+void communicator::flush_sent()
 {
     std::remove_if(sent_comm.begin(), sent_comm.end(), comm_test_n_destroy);
 }