]> AND Private Git Repository - loba.git/commitdiff
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Wip++...
authorArnaud Giersch <arnaud.giersch@iut-bm.univ-fcomte.fr>
Fri, 10 Dec 2010 15:48:14 +0000 (16:48 +0100)
committerArnaud Giersch <arnaud.giersch@iut-bm.univ-fcomte.fr>
Tue, 14 Dec 2010 23:24:41 +0000 (00:24 +0100)
* improve communicator logic
* add ESSE macro

communicator.cpp
communicator.h
cost_func.cpp
misc.h
options.cpp
process.cpp
process.h

index da5e2b5f8062755727ed07dcd3617e8a07f0a0fb..45b52ef73eb1c76bbfecd528bcb89aab8c8a5bc9 100644 (file)
@@ -1,49 +1,42 @@
 #include <algorithm>
+#include <tr1/functional>
 #include <cstring>
 #include <msg/msg.h>
 #include <xbt/log.h>
 #include "communicator.h"
 #include "simgrid_features.h"
+#include "misc.h"
 
 // XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu);
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(comm, simu,
                                 "Messages from asynchronous pipes");
 
-namespace {
-
-    bool comm_test_n_destroy(msg_comm_t& comm)
-    {
-        if (MSG_comm_test(comm)) {
-            MSG_comm_destroy(comm);
-            return true;
-        } else
-            return false;
-    }
-
-}
-
 communicator::communicator()
 {
     const char* hostname = MSG_host_get_name(MSG_host_self());
+
     ctrl_mbox = hostname;
     ctrl_mbox += "_ctrl";
     ctrl_task = NULL;
     ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox());
+    ctrl_close_is_last = false;
 
     data_mbox = hostname;
     data_mbox += "_data";
     data_task = NULL;
     data_comm = MSG_task_irecv(&data_task, get_data_mbox());
+    data_close_is_last = false;
 }
 
 communicator::~communicator()
 {
-    // fixme: don't know how to free pending communications
-    // (data_comm, ctrl_comm and sent_comm)
-
-    flush_sent();
+    if (ctrl_comm)
+        WARN0("ctrl_comm is pending!");
+    if (data_comm)
+        WARN0("data_comm is pending!");
     if (!sent_comm.empty())
-        WARN1("Lost %ld send communications!", (long )sent_comm.size());
+        WARN2("lost %ld send communication%s!",
+              (long )sent_comm.size(), ESSE(sent_comm.size()));
 }
 
 void communicator::send(const char* dest, message* msg)
@@ -56,34 +49,87 @@ void communicator::send(const char* dest, message* msg)
     flush_sent();
 }
 
-bool communicator::recv(message*& msg, m_host_t& from)
+bool communicator::recv(message*& msg, m_host_t& from, bool wait)
 {
+    bool restart;
     msg = NULL;
 
-    if (comm_test_n_destroy(ctrl_comm)) {
-        msg = (message* )MSG_task_get_data(ctrl_task);
-        from = MSG_task_get_source(ctrl_task);
-        MSG_task_destroy(ctrl_task);
-        ctrl_task = NULL;
-        ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox());
-
-    } else if (comm_test_n_destroy(data_comm)) {
-        msg = (message* )MSG_task_get_data(data_task);
-        from = MSG_task_get_source(data_task);
-        MSG_task_destroy(data_task);
-        data_task = NULL;
-        data_comm = MSG_task_irecv(&data_task, get_data_mbox());
-    }
+    do {
+        if (ctrl_comm && comm_test_n_destroy(ctrl_comm)) {
+            msg = (message* )MSG_task_get_data(ctrl_task);
+            from = MSG_task_get_source(ctrl_task);
+            MSG_task_destroy(ctrl_task);
+            ctrl_task = NULL;
+            ctrl_comm =
+                (!ctrl_close_is_last || msg->get_type() != message::CTRL_CLOSE)
+                ? ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox())
+                : ctrl_comm = NULL;
+
+        } else if (data_comm && comm_test_n_destroy(data_comm)) {
+            msg = (message* )MSG_task_get_data(data_task);
+            from = MSG_task_get_source(data_task);
+            MSG_task_destroy(data_task);
+            data_task = NULL;
+            data_comm =
+                (!data_close_is_last || msg->get_type() != message::DATA_CLOSE)
+                ? data_comm = MSG_task_irecv(&data_task, get_data_mbox())
+                : data_comm = NULL;
+        }
+
+        restart = wait && !msg && (ctrl_comm || data_comm);
+        if (restart) {
+            xbt_dynar_t comms = xbt_dynar_new(sizeof(msg_comm_t), NULL);
+            if (ctrl_comm)
+                xbt_dynar_push(comms, &ctrl_comm);
+            if (data_comm)
+                xbt_dynar_push(comms, &data_comm);
+            MSG_comm_waitany(comms);
+            xbt_dynar_free(&comms);
+        }
+    } while (restart);
 
     return msg != NULL;
 }
 
+void communicator::wait_for_sent()
+{
+    xbt_dynar_t comms = xbt_dynar_new(sizeof(msg_comm_t), NULL);
+    while (!sent_comm.empty()) {
+        std::for_each(sent_comm.begin(), sent_comm.end(),
+                      std::tr1::bind(xbt_dynar_push,
+                                     comms, std::tr1::placeholders::_1));
+        MSG_comm_waitany(comms);
+        xbt_dynar_reset(comms);
+        flush_sent();
+    }
+    xbt_dynar_free(&comms);
+}
+
+void communicator::next_close_on_ctrl_is_last()
+{
+    ctrl_close_is_last = true;
+}
+
+void communicator::next_close_on_data_is_last()
+{
+    data_close_is_last = true;
+}
+
 int communicator::send_backlog()
 {
     flush_sent();
     return sent_comm.size();
 }
 
+bool communicator::comm_test_n_destroy(msg_comm_t& comm)
+{
+    if (MSG_comm_test(comm)) {
+        MSG_comm_destroy(comm);
+        return true;
+    } else
+        return false;
+}
+
 void communicator::flush_sent()
 {
     std::remove_if(sent_comm.begin(), sent_comm.end(), comm_test_n_destroy);
index c18b662410b7614c4e6d8877d3b44c1aa8967c21..ea096d60e974f33ab909cb3cf38dbcfa27f2c624 100644 (file)
@@ -9,7 +9,7 @@
 
 class message {
 public:
-    enum message_type { INFO, CREDIT, LOAD, CLOSE };
+    enum message_type { INFO, CREDIT, LOAD, CTRL_CLOSE, DATA_CLOSE };
 
     message(message_type t, double a): type(t), amount(a) { }
 
@@ -27,7 +27,11 @@ public:
     ~communicator();
 
     void send(const char* dest, message* msg);
-    bool recv(message*& msg, m_host_t& from);
+    bool recv(message*& msg, m_host_t& from, bool wait);
+    void wait_for_sent();
+
+    void next_close_on_ctrl_is_last();
+    void next_close_on_data_is_last();
 
     int send_backlog();
 
@@ -39,14 +43,17 @@ private:
     std::string ctrl_mbox;
     msg_comm_t  ctrl_comm;
     m_task_t    ctrl_task;
+    bool        ctrl_close_is_last;
 
     // Data channel for receiving
     std::string data_mbox;
     msg_comm_t  data_comm;
     m_task_t    data_task;
+    bool        data_close_is_last;
 
     const char* get_ctrl_mbox() const   { return ctrl_mbox.c_str(); }
     const char* get_data_mbox() const   { return data_mbox.c_str(); }
+    static bool comm_test_n_destroy(msg_comm_t& comm);
     void flush_sent();
 };
 
index c3a463b508c36ec0d9d428e35bddc385e3f73b50..23e6a53c140a53def6f39898b3a410aaea1a2033 100644 (file)
@@ -40,10 +40,10 @@ cost_func& cost_func::operator=(const cost_func& ref)
 
 double cost_func::operator()(double amount) const
 {
-    double ret = factor[degree];
+    double result = factor[degree];
     for (int i = degree - 1; i >= 0 ; i--)
-        ret = amount * ret + factor[i];
-    return ret;
+        result = amount * result + factor[i];
+    return result;
 }
 
 std::string cost_func::to_string()
diff --git a/misc.h b/misc.h
index a4a6db1e5bf141495b6977e1a59554921a68ee97..1decbefd4b27fa6497aa3e0b94c5c70d9a496ac5 100644 (file)
--- a/misc.h
+++ b/misc.h
@@ -10,6 +10,8 @@
 #define LOG_ISENABLED(priority) \
     (_XBT_LOG_ISENABLEDV((*_XBT_LOGV(default)), (priority)))
 
+#define ESSE(n) ((n) > 1 ? "s" : "")
+
 #endif // !MISC_H
 
 // Local variables:
index 4dd53150f784f3f5f680d19b46e7082c80bb8a32..d51fee22843a3803d878815bae0cc8d6fb7ff924 100644 (file)
@@ -34,6 +34,8 @@ namespace {
 
 int opt::parse_args(int* argc, char* argv[])
 {
+    int result = 1;
+
     char* tmp = strrchr(argv[0], '/');
     opt::program_name = (tmp ? tmp + 1 : argv[0]);
 
@@ -54,29 +56,35 @@ int opt::parse_args(int* argc, char* argv[])
             opt::version_requested = true;
             break;
         case '?':
-            WARN1("invalid option -- '%c'", optopt);
+            ERROR1("invalid option -- '%c'", optopt);
+            result = 0;
             break;
         }
     }
     if (opt::version_requested || opt::help_requested)
         return 1;
 
-    switch (*argc - optind) {
+    int rem_args = *argc - optind;
+    switch (rem_args) {
     case 0:
         ERROR0("missing parameter -- <plaform_file>");
     case 1:
         ERROR0("missing parameter -- <application_file>");
-        return 0;
+        result = 0;
+        break;
 
     default:
         opt::platform_file = argv[optind];
         opt::application_file = argv[optind + 1];
+        if (rem_args == 2)
+            break;
         for (int i = optind + 2 ; i < *argc ; ++i)
-            WARN1("unused parameter -- \"%s\"", argv[i]);
+            ERROR1("unused parameter -- \"%s\"", argv[i]);
+        result = 0;
         break;
     }
 
-    return 1;
+    return result;
 }
 
 void opt::print()
@@ -106,6 +114,7 @@ void opt::usage()
               << "print help and exit (use -hh or -hhh for extended help)\n";
     if (opt::help_requested < 1)
         return;
+
     std::clog << o("-V") << "print version and exit\n";
     std::clog << o("-b") << "activate bookkeeping\n";
     std::clog << oo("-c", "[fn,...]f0")
index b30fab4b0a9d8a43838e1d346bade1e06baa64f8..e9e234864dfbe733623aeb3ad9edb128a8658e52 100644 (file)
@@ -1,5 +1,5 @@
 #include <algorithm>
-#include <functional>
+#include <tr1/functional>
 #include <iterator>
 #include <stdexcept>
 #include <sstream>
@@ -15,8 +15,11 @@ process::process(int argc, char* argv[])
 {
     if (argc < 2 || !(std::istringstream(argv[1]) >> load))
         throw std::invalid_argument("bad or missing initial load");
+
     neigh.assign(argv + 2, argv + argc);
     expected_load = load;
+    ctrl_close_pending = data_close_pending = neigh.size();
+
     e_xbt_log_priority_t logp = xbt_log_priority_verbose;
     if (!LOG_ISENABLED(logp))
         return;
@@ -24,10 +27,10 @@ process::process(int argc, char* argv[])
     std::ostringstream oss;
     oss << neigh.size() << " neighbor";
     if (!neigh.empty()) {
-        oss << (neigh.size() > 1 ? "s: " : ": ");
+        oss << ESSE(neigh.size()) << ": ";
         std::transform(neigh.begin(), neigh.end() - 1,
                        std::ostream_iterator<const char*>(oss, ", "),
-                       std::mem_fun_ref(&neighbor::get_name));
+                       std::tr1::mem_fn(&neighbor::get_name));
         oss << neigh.back().get_name();
     }
     LOG1(logp, "Got %s.", oss.str().c_str());
@@ -40,12 +43,21 @@ int process::run()
 
     int n = 100;
     while (n--) {
+        if (opt::bookkeeping)
+            INFO2("current load: %g ; expected: %g", load, expected_load);
+        else
+            INFO1("current load: %g", load);
+
         if (load > 0)
             compute();
         else
-            xbt_sleep(0.5);     // fixme
-        receive();
+            xbt_sleep(100);        // fixme
+        if (!receive(false))
+            n = 0;
     }
+    DEBUG0("going to finalize.");
+    finalize();
+
     //    MSG_process_sleep(100.0);   // xxx
     /* xxx:
      * while (there is something to do) {
@@ -64,56 +76,87 @@ int process::run()
      * - how to manage link failures ?
      */
 
-    // xxx: shall we retrieve pending tasks?
-
+    DEBUG0("done.");
     return 0;
 }
 
-void process::receive()
+void process::compute()
+{
+    double duration = opt::comp_cost(load);
+    m_task_t task = MSG_task_create("computation", duration, 0.0, NULL);
+    DEBUG2("compute %g flop%s.", duration, ESSE(duration));
+    MSG_task_execute(task);
+    MSG_task_destroy(task);
+}
+
+bool process::receive(bool wait_for_close)
 {
+    bool result = true;
     message* msg;
     m_host_t from;
-    while (comm.recv(msg, from)) {
+    while ((ctrl_close_pending ||
+            data_close_pending) && comm.recv(msg, from, wait_for_close)) {
         switch (msg->get_type()) {
         case message::INFO:
+            DEBUG0("received INFO");
             // fixme: update neighbor
+            // need a map m_host_t -> neighbor&
             break;
         case message::CREDIT:
+            DEBUG0("received CREDIT");
             expected_load += msg->get_amount();
             break;
         case message::LOAD:
+            DEBUG0("received LOAD");
             load += msg->get_amount();
             break;
+        case message::CTRL_CLOSE:
+            DEBUG0("received CTRL_CLOSE");
+            if (--ctrl_close_pending == 1)
+                comm.next_close_on_ctrl_is_last();
+            result = false;
+            break;
+        case message::DATA_CLOSE:
+            DEBUG0("received DATA_CLOSE");
+            if (--data_close_pending == 1)
+                comm.next_close_on_data_is_last();
+            result = false;
+            break;
         }
         delete msg;
     }
-}
-
-void process::compute()
-{
-    double duration = opt::comp_cost(load);
-    m_task_t task = MSG_task_create("computation", duration, 0.0, NULL);
-    DEBUG2("Compute %g flop%s.", duration, duration > 1 ? "s" : "");
-    MSG_task_execute(task);
-    MSG_task_destroy(task);
+    return result;
 }
 
 void process::finalize()
 {
-    // fixme
+    DEBUG2("send CLOSE to %d neighbor%s.",
+           (int )neigh.size(), ESSE(neigh.size()));
+    std::vector<neighbor>::iterator n;
+    for (n = neigh.begin() ; n != neigh.end() ; ++n) {
+        comm.send(n->get_ctrl_mbox(), new message(message::CTRL_CLOSE, 0.0));
+        comm.send(n->get_data_mbox(), new message(message::DATA_CLOSE, 0.0));
+    }
+
+    DEBUG2("wait for CLOSE from %d neighbor%s.",
+           (int )neigh.size(), ESSE(neigh.size()));
+    receive(true);
+
+    comm.wait_for_sent();
 }
 
 void process::print_loads(e_xbt_log_priority_t logp)
 {
     if (!LOG_ISENABLED(logp))
         return;
+
     std::ostringstream oss;
     if (neigh.empty()) {
         oss << "no neighbor!";
     } else {
         std::transform(neigh.begin(), neigh.end() - 1,
                        std::ostream_iterator<double>(oss, ", "),
-                       std::mem_fun_ref(&neighbor::get_load));
+                       std::tr1::mem_fn(&neighbor::get_load));
         oss << neigh.back().get_load();
     }
     LOG1(logp, "Neighbor loads: %s", oss.str().c_str());
index a301046ab8907fadd611a106fc11be0258d33166..3baabbfddad931a25f1fe689faef32b6ca8882e3 100644 (file)
--- a/process.h
+++ b/process.h
@@ -18,8 +18,11 @@ private:
     double load;
     double expected_load;
 
-    void receive();
+    int ctrl_close_pending;
+    int data_close_pending;
+
     void compute();
+    bool receive(bool wait_for_close);
     void finalize();
     void print_loads(e_xbt_log_priority_t logp = xbt_log_priority_info);
 };