AND Private Git Repository - loba.git/commitdiff
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Wip++...
author Arnaud Giersch <arnaud.giersch@iut-bm.univ-fcomte.fr>
Fri, 10 Dec 2010 22:42:30 +0000 (23:42 +0100)
committer Arnaud Giersch <arnaud.giersch@iut-bm.univ-fcomte.fr>
Tue, 14 Dec 2010 23:26:14 +0000 (00:26 +0100)
* improve communicator logic
* add options maxiter and exit_on_close
* add valgrind suppressions file

communicator.cpp
communicator.h
options.cpp
options.h
process.cpp
valgrind_suppressions_3.5 [new file with mode: 0644]

index 45b52ef73eb1c76bbfecd528bcb89aab8c8a5bc9..0c64b5b9e22869026db3043083ed27f5deec0b32 100644 (file)
--- a/communicator.cpp
+++ b/communicator.cpp
@@ -1,6 +1,6 @@
 #include <algorithm>
 #include <tr1/functional>
-#include <cstring>
+#include <sstream>
 #include <msg/msg.h>
 #include <xbt/log.h>
 #include "communicator.h"
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(comm, simu,
                                 "Messages from asynchronous pipes");
 
+std::string message::to_string()
+{
+    static const char* str[] = { "INFO", "CREDIT", "LOAD",
+                                 "CTRL_CLOSE", "DATA_CLOSE" };
+    std::ostringstream oss;
+    oss << str[type] << " (" << amount << ")";
+    return oss.str();
+}
+
 communicator::communicator()
 {
     const char* hostname = MSG_host_get_name(MSG_host_self());
@@ -18,13 +27,13 @@ communicator::communicator()
     ctrl_mbox = hostname;
     ctrl_mbox += "_ctrl";
     ctrl_task = NULL;
-    ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox());
+    ctrl_comm = NULL;
     ctrl_close_is_last = false;
 
     data_mbox = hostname;
     data_mbox += "_data";
     data_task = NULL;
-    data_comm = MSG_task_irecv(&data_task, get_data_mbox());
+    data_comm = NULL;
     data_close_is_last = false;
 }
 
@@ -39,14 +48,20 @@ communicator::~communicator()
               (long )sent_comm.size(), ESSE(sent_comm.size()));
 }
 
+void communicator::listen()
+{
+    ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox());
+    data_comm = MSG_task_irecv(&data_task, get_data_mbox());
+}
+
 void communicator::send(const char* dest, message* msg)
 {
     double msg_size = sizeof *msg;
     if (msg->get_type() == message::LOAD)
         msg_size += msg->get_amount();
     m_task_t task = MSG_task_create("message", 0.0, msg_size, msg);    
-    sent_comm.push_back(MSG_task_isend(task, dest));
-    flush_sent();
+    msg_comm_t comm = MSG_task_isend(task, dest);
+    sent_comm.push_back(comm);
 }
 
 bool communicator::recv(message*& msg, m_host_t& from, bool wait)
@@ -62,8 +77,8 @@ bool communicator::recv(message*& msg, m_host_t& from, bool wait)
             ctrl_task = NULL;
             ctrl_comm =
                 (!ctrl_close_is_last || msg->get_type() != message::CTRL_CLOSE)
-                ? ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox())
-                : ctrl_comm = NULL;
+                ? MSG_task_irecv(&ctrl_task, get_ctrl_mbox())
+                : NULL;
 
         } else if (data_comm && comm_test_n_destroy(data_comm)) {
             msg = (message* )MSG_task_get_data(data_task);
@@ -72,8 +87,8 @@ bool communicator::recv(message*& msg, m_host_t& from, bool wait)
             data_task = NULL;
             data_comm =
                 (!data_close_is_last || msg->get_type() != message::DATA_CLOSE)
-                ? data_comm = MSG_task_irecv(&data_task, get_data_mbox())
-                : data_comm = NULL;
+                ? MSG_task_irecv(&data_task, get_data_mbox())
+                : NULL;
         }
 
         restart = wait && !msg && (ctrl_comm || data_comm);
@@ -91,18 +106,21 @@ bool communicator::recv(message*& msg, m_host_t& from, bool wait)
     return msg != NULL;
 }
 
-void communicator::wait_for_sent()
+void communicator::flush(bool wait)
 {
-    xbt_dynar_t comms = xbt_dynar_new(sizeof(msg_comm_t), NULL);
-    while (!sent_comm.empty()) {
-        std::for_each(sent_comm.begin(), sent_comm.end(),
-                      std::tr1::bind(xbt_dynar_push,
-                                     comms, std::tr1::placeholders::_1));
-        MSG_comm_waitany(comms);
-        xbt_dynar_reset(comms);
-        flush_sent();
+    sent_comm.remove_if(comm_test_n_destroy);
+    if (wait && !sent_comm.empty()) {
+        xbt_dynar_t comms = xbt_dynar_new(sizeof(msg_comm_t), NULL);
+        while (!sent_comm.empty()) {
+            std::for_each(sent_comm.begin(), sent_comm.end(),
+                          std::tr1::bind(comm_push_in_dynar,
+                                         comms, std::tr1::placeholders::_1));
+            MSG_comm_waitany(comms);
+            xbt_dynar_reset(comms);
+            sent_comm.remove_if(comm_test_n_destroy);
+        }
+        xbt_dynar_free(&comms);
     }
-    xbt_dynar_free(&comms);
 }
 
 void communicator::next_close_on_ctrl_is_last()
@@ -115,13 +133,12 @@ void communicator::next_close_on_data_is_last()
     data_close_is_last = true;
 }
 
-int communicator::send_backlog()
+void communicator::comm_push_in_dynar(xbt_dynar_t dynar, msg_comm_t comm)
 {
-    flush_sent();
-    return sent_comm.size();
+    xbt_dynar_push(dynar, &comm);
 }
 
-bool communicator::comm_test_n_destroy(msg_comm_t& comm)
+bool communicator::comm_test_n_destroy(msg_comm_t comm)
 {
     if (MSG_comm_test(comm)) {
         MSG_comm_destroy(comm);
@@ -130,11 +147,6 @@ bool communicator::comm_test_n_destroy(msg_comm_t& comm)
         return false;
 }
 
-void communicator::flush_sent()
-{
-    std::remove_if(sent_comm.begin(), sent_comm.end(), comm_test_n_destroy);
-}
-
 // Local variables:
 // mode: c++
 // End:
index ea096d60e974f33ab909cb3cf38dbcfa27f2c624..1e0167760dc545e3226041a655fd01728c96dbf3 100644 (file)
--- a/communicator.h
+++ b/communicator.h
@@ -16,6 +16,8 @@ public:
     message_type get_type() const       { return type;   }
     double get_amount() const           { return amount; }
 
+    std::string to_string();
+
 private:
     message_type type;
     double amount;
@@ -26,15 +28,15 @@ public:
     communicator();
     ~communicator();
 
+    void listen();
+
     void send(const char* dest, message* msg);
     bool recv(message*& msg, m_host_t& from, bool wait);
-    void wait_for_sent();
+    void flush(bool wait);
 
     void next_close_on_ctrl_is_last();
     void next_close_on_data_is_last();
 
-    int send_backlog();
-
 private:
     // List of pending send communications
     std::list<msg_comm_t> sent_comm;
@@ -53,8 +55,9 @@ private:
 
     const char* get_ctrl_mbox() const   { return ctrl_mbox.c_str(); }
     const char* get_data_mbox() const   { return data_mbox.c_str(); }
-    static bool comm_test_n_destroy(msg_comm_t& comm);
-    void flush_sent();
+
+    static void comm_push_in_dynar(xbt_dynar_t dynar, msg_comm_t comm);
+    static bool comm_test_n_destroy(msg_comm_t comm);
 };
 
 #endif // !COMMUNICATOR_H
index d51fee22843a3803d878815bae0cc8d6fb7ff924..18d4613dd77ade3686e7b09bae8c031e67803c25 100644 (file)
--- a/options.cpp
+++ b/options.cpp
@@ -1,6 +1,8 @@
-#include <cstring>
+#include <cstring>              // strrchr
+#include <iomanip>
 #include <iostream>
-#include <unistd.h>
+#include <sstream>
+#include <unistd.h>             // getopt
 #include <xbt/log.h>
 #include "options.h"
 #include "misc.h"
@@ -17,6 +19,9 @@ namespace opt {
     int help_requested = 0;
     bool version_requested = false;
 
+    unsigned maxiter = 4;
+    bool exit_on_close = false;
+
     bool bookkeeping = false;
 
     cost_func comp_cost("1e9, 0"); // fixme: find better defaults
@@ -41,17 +46,23 @@ int opt::parse_args(int* argc, char* argv[])
 
     int c;
     opterr = 0;
-    while ((c = getopt(*argc, argv, "bc:hV")) != -1) {
+    while ((c = getopt(*argc, argv, "bc:ehi:V")) != -1) {
         switch (c) {
         case 'b':
             opt::bookkeeping = true;
             break;
+        case 'e':
+            opt::exit_on_close = true;
+            break;
         case 'h':
             opt::help_requested++;
             break;
         case 'c':
             opt::comp_cost = cost_func(optarg);
             break;
+        case 'i':
+            std::istringstream(optarg) >> opt::maxiter;
+            break;
         case 'V':
             opt::version_requested = true;
             break;
@@ -92,12 +103,13 @@ void opt::print()
     INFO0(",----[ Simulation parameters ]");
     INFO1("| platform_file.......: \"%s\"", opt::platform_file);
     INFO1("| application_file....: \"%s\"", opt::application_file);
+    INFO1("| maxiter.............: %u",     opt::maxiter);
+    INFO1("| exit on close.......: %s",     on_off(opt::exit_on_close));
     INFO1("| bookkeeping.........: %s",     on_off(opt::bookkeeping));
     INFO1("| comp. cost factors..: [%s]",   opt::comp_cost.to_string().c_str());
     INFO0("`----");
 }
 
-#include <iomanip>
 void opt::usage()
 {
     const int indent1 = 6;
@@ -116,10 +128,15 @@ void opt::usage()
         return;
 
     std::clog << o("-V") << "print version and exit\n";
+
     std::clog << o("-b") << "activate bookkeeping\n";
     std::clog << oo("-c", "[fn,...]f0")
               << "polynomial factors for computation cost ("
               << opt::comp_cost.to_string() << ")\n";
+    std::clog << o("-e") << "exit on close reception\n";
+    std::clog << oo("-i", "value")
+              << "maximum number of iterations, 0 for infinity ("
+              << opt::maxiter << ")\n";
 
 #undef o
 #undef oo
index 06f059f961f64566baff5be3f911f434ebdd6d47..43286f3606a84f45570c0019be7acb9a0e5c706e 100644 (file)
--- a/options.h
+++ b/options.h
@@ -14,6 +14,9 @@ namespace opt {
     extern int help_requested;
     extern bool version_requested;
 
+    extern unsigned maxiter;
+    extern bool exit_on_close;
+
     extern bool bookkeeping;
 
     extern cost_func comp_cost;
index e9e234864dfbe733623aeb3ad9edb128a8658e52..d8ec50036a4d6d9d2e93568bc944604bba118375 100644 (file)
--- a/process.cpp
+++ b/process.cpp
@@ -18,7 +18,14 @@ process::process(int argc, char* argv[])
 
     neigh.assign(argv + 2, argv + argc);
     expected_load = load;
+
     ctrl_close_pending = data_close_pending = neigh.size();
+    if (neigh.size() == 1) {
+        comm.next_close_on_ctrl_is_last();
+        comm.next_close_on_data_is_last();
+    }
+    if (neigh.size() > 0)
+        comm.listen();
 
     e_xbt_log_priority_t logp = xbt_log_priority_verbose;
     if (!LOG_ISENABLED(logp))
@@ -39,49 +46,51 @@ process::process(int argc, char* argv[])
 
 int process::run()
 {
-    INFO0("Coucou !");
+    bool one_more = true;
+    unsigned iter = 0;
+    VERB0("Starting...");
+    while (one_more) {
+        bool close_received;
 
-    int n = 100;
-    while (n--) {
         if (opt::bookkeeping)
-            INFO2("current load: %g ; expected: %g", load, expected_load);
+            INFO3("(%u) current load: %g ; expected: %g",
+                  iter, load, expected_load);
         else
-            INFO1("current load: %g", load);
+            INFO2("(%u) current load: %g",
+                  iter, load);
 
-        if (load > 0)
-            compute();
-        else
-            xbt_sleep(100);        // fixme
-        if (!receive(false))
-            n = 0;
+        compute();
+        close_received = !receive(false);
+
+        /*
+         *    compute load balancing;
+         *    send tasks to neighbors;
+         */
+
+        comm.flush(false);
+        ++iter;
+
+        if (opt::exit_on_close && close_received)
+            one_more = false;
+        if (opt::maxiter && iter >= opt::maxiter)
+            one_more = false;
     }
-    DEBUG0("going to finalize.");
+    VERB0("Going to finalize...");
     finalize();
 
-    //    MSG_process_sleep(100.0);   // xxx
-    /* xxx:
-     * while (there is something to do) {
-     *    compute some task;
-     *    get received tasks;
-     *    compute load balancing;
-     *    send tasks to neighbors;
-     * }
-     * finalize;
-     * wait for pending messages;
-     */
-
     /* Open Questions :
      * - definition of load on heterogeneous hosts ?
      * - how to detect convergence ?
      * - how to manage link failures ?
      */
 
-    DEBUG0("done.");
+    VERB0("Done.");
     return 0;
 }
 
 void process::compute()
 {
+    // fixme: shall we do something special when duration is 0 ?
     double duration = opt::comp_cost(load);
     m_task_t task = MSG_task_create("computation", duration, 0.0, NULL);
     DEBUG2("compute %g flop%s.", duration, ESSE(duration));
@@ -89,6 +98,8 @@ void process::compute()
     MSG_task_destroy(task);
 }
 
+
+// Returns false if a CLOSE message was received. 
 bool process::receive(bool wait_for_close)
 {
     bool result = true;
@@ -96,30 +107,29 @@ bool process::receive(bool wait_for_close)
     m_host_t from;
     while ((ctrl_close_pending ||
             data_close_pending) && comm.recv(msg, from, wait_for_close)) {
+        DEBUG2("received %s from %s",
+               msg->to_string().c_str(), MSG_host_get_name(from));
         switch (msg->get_type()) {
         case message::INFO:
-            DEBUG0("received INFO");
             // fixme: update neighbor
             // need a map m_host_t -> neighbor&
             break;
         case message::CREDIT:
-            DEBUG0("received CREDIT");
             expected_load += msg->get_amount();
             break;
         case message::LOAD:
-            DEBUG0("received LOAD");
             load += msg->get_amount();
             break;
         case message::CTRL_CLOSE:
-            DEBUG0("received CTRL_CLOSE");
             if (--ctrl_close_pending == 1)
                 comm.next_close_on_ctrl_is_last();
+            DEBUG1("ctrl_close_pending = %d", ctrl_close_pending);
             result = false;
             break;
         case message::DATA_CLOSE:
-            DEBUG0("received DATA_CLOSE");
             if (--data_close_pending == 1)
                 comm.next_close_on_data_is_last();
+            DEBUG1("data_close_pending = %d", data_close_pending);
             result = false;
             break;
         }
@@ -142,7 +152,7 @@ void process::finalize()
            (int )neigh.size(), ESSE(neigh.size()));
     receive(true);
 
-    comm.wait_for_sent();
+    comm.flush(true);
 }
 
 void process::print_loads(e_xbt_log_priority_t logp)
diff --git a/valgrind_suppressions_3.5 b/valgrind_suppressions_3.5
new file mode 100644 (file)
index 0000000..f445238
--- /dev/null
@@ -0,0 +1,10 @@
+{
+   Memory leaks in surf_routing.c
+   Memcheck:Leak
+   ...
+   fun:surf_parse_lex
+   fun:parse_platform_file
+   fun:SIMIX_create_environment
+   fun:MSG_create_environment
+   ...
+}