]> AND Private Git Repository - loba.git/commitdiff
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Wip++...
authorArnaud Giersch <arnaud.giersch@iut-bm.univ-fcomte.fr>
Sun, 12 Dec 2010 22:51:22 +0000 (23:51 +0100)
committerArnaud Giersch <arnaud.giersch@iut-bm.univ-fcomte.fr>
Tue, 14 Dec 2010 23:27:04 +0000 (00:27 +0100)
* add hostdata
* add process::rev_neigh
* implement process::send

16 files changed:
Makefile
NOTES
TODO [new file with mode: 0644]
communicator.cpp
communicator.h
cost_func.cpp
hostdata.cpp [new file with mode: 0644]
hostdata.h [new file with mode: 0644]
load_balance.h [deleted file]
main.cpp
misc.h
neighbor.cpp
neighbor.h
options.cpp
process.cpp
process.h

index 61fab37d2aa3fa946ae69cdbf4e62fd78dc8926c..0f52ab673ae3d286fa55b2cdb31f7588d372127f 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -33,6 +33,7 @@ SETLOCALVERSION := ./setlocalversion
 SRC.loba := main.cpp           \
        communicator.cpp        \
        cost_func.cpp           \
+       hostdata.cpp            \
        misc.cpp                \
        neighbor.cpp            \
        options.cpp             \
diff --git a/NOTES b/NOTES
index 489006c1308775df2c816b977c1969efd82dc05a..fbb82a3d0c8ebd49e545b783e84e708b6a5f93a5 100644 (file)
--- a/NOTES
+++ b/NOTES
@@ -3,11 +3,9 @@ Process parameters:
     initial_load [neighbors...]
 
 Communications:
-     - two channels per host: normal and low_latency
+     - two channels per host: control and data
 
-How shall we manage link failures ?
-
-Process model (?)
+Process model
 
      while (there is something to do) {
         compute some task;
@@ -23,6 +21,3 @@ Process model (?)
          nothing more to do.
 
      - how to manage link failures?
-
-     - shall we retrieve pending tasks?
-       : Ideally, why shall.  How?  By using some acknowledgment?
diff --git a/TODO b/TODO
new file mode 100644 (file)
index 0000000..1f235b5
--- /dev/null
+++ b/TODO
@@ -0,0 +1,7 @@
+* implement loba_* algorithms (start with some trivial one)
+* add loba algorithm selection (-a number ?)
+
+* implement automatic process topology 
+   (line, ring, star, btree, clique, hypercube, etc..)
+* implement automatic platform generation
+   (number of hosts, all connected, constant bandwidth/latency)
index 0c64b5b9e22869026db3043083ed27f5deec0b32..4b54161108fd6808bb47c8bb8b4ad6cbb163b045 100644 (file)
@@ -1,9 +1,10 @@
+#include "communicator.h"
+
 #include <algorithm>
 #include <tr1/functional>
 #include <sstream>
 #include <msg/msg.h>
 #include <xbt/log.h>
-#include "communicator.h"
 #include "simgrid_features.h"
 #include "misc.h"
 
@@ -21,20 +22,14 @@ std::string message::to_string()
 }
 
 communicator::communicator()
+    : host((hostdata* )MSG_host_get_data(MSG_host_self()))
+    , ctrl_task(NULL)
+    , ctrl_comm(NULL)
+    , ctrl_close_is_last(false)
+    , data_task(NULL)
+    , data_comm(NULL)
+    , data_close_is_last(false)
 {
-    const char* hostname = MSG_host_get_name(MSG_host_self());
-
-    ctrl_mbox = hostname;
-    ctrl_mbox += "_ctrl";
-    ctrl_task = NULL;
-    ctrl_comm = NULL;
-    ctrl_close_is_last = false;
-
-    data_mbox = hostname;
-    data_mbox += "_data";
-    data_task = NULL;
-    data_comm = NULL;
-    data_close_is_last = false;
 }
 
 communicator::~communicator()
index 1e0167760dc545e3226041a655fd01728c96dbf3..4e0757a2dccce15627c3cc13b3a201437d50ef06 100644 (file)
@@ -6,6 +6,7 @@
 #include <list>
 #include <string>
 #include <msg/msg.h>
+#include "hostdata.h"
 
 class message {
 public:
@@ -38,23 +39,24 @@ public:
     void next_close_on_data_is_last();
 
 private:
+    // Myself
+    const hostdata* host;
+
     // List of pending send communications
     std::list<msg_comm_t> sent_comm;
 
     // Control channel for receiving
-    std::string ctrl_mbox;
-    msg_comm_t  ctrl_comm;
     m_task_t    ctrl_task;
+    msg_comm_t  ctrl_comm;
     bool        ctrl_close_is_last;
 
     // Data channel for receiving
-    std::string data_mbox;
-    msg_comm_t  data_comm;
     m_task_t    data_task;
+    msg_comm_t  data_comm;
     bool        data_close_is_last;
 
-    const char* get_ctrl_mbox() const   { return ctrl_mbox.c_str(); }
-    const char* get_data_mbox() const   { return data_mbox.c_str(); }
+    const char* get_ctrl_mbox() const   { return host->get_ctrl_mbox(); }
+    const char* get_data_mbox() const   { return host->get_data_mbox(); }
 
     static void comm_push_in_dynar(xbt_dynar_t dynar, msg_comm_t comm);
     static bool comm_test_n_destroy(msg_comm_t comm);
index 23e6a53c140a53def6f39898b3a410aaea1a2033..d102fd90772c1abd310ca54333acfbe1ee49fdab 100644 (file)
@@ -1,9 +1,10 @@
+#include "cost_func.h"
+
 #include <algorithm>
 #include <cstdlib>
 #include <cstring>
 #include <iterator>
 #include <sstream>
-#include "cost_func.h"
 
 cost_func::cost_func(const char* param)
 {
diff --git a/hostdata.cpp b/hostdata.cpp
new file mode 100644 (file)
index 0000000..96831f0
--- /dev/null
@@ -0,0 +1,44 @@
+#include "hostdata.h"
+
+#include <xbt/log.h>
+#include <stdexcept>
+
+XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu);
+
+hostdata* hostdata::instances = NULL;
+
+void hostdata::create()
+{
+    int nhosts = MSG_get_host_number();
+    m_host_t* host_list = MSG_get_host_table();
+    VERB1("Got %d hosts.", nhosts);
+    for (int i = 0; i < nhosts; i++) {
+        hostdata* h = new hostdata(host_list[i]);
+        MSG_host_set_data(host_list[i], h);
+        VERB2("Host #%d named \"%s\".", i, h->get_name());
+        h->next = instances;
+        instances = h;
+    }
+    xbt_free(host_list);
+}
+
+void hostdata::destroy()
+{
+    while (instances) {
+        hostdata* h = instances;
+        instances = h->next;
+        delete h;
+    }
+}
+
+hostdata::hostdata(m_host_t host)
+    : next(NULL)
+    , name(MSG_host_get_name(host))
+    , ctrl_mbox(std::string(name) + "_ctrl")
+    , data_mbox(std::string(name) + "_data")
+{
+}
+
+hostdata::~hostdata()
+{
+}
diff --git a/hostdata.h b/hostdata.h
new file mode 100644 (file)
index 0000000..82e133f
--- /dev/null
@@ -0,0 +1,33 @@
+#ifndef HOSTDATA_H
+#define HOSTDATA_H
+
+#include <string>
+#include <msg/msg.h>
+
+class hostdata {
+public:
+    static void create();
+    static void destroy();
+
+    hostdata(m_host_t host);
+    ~hostdata();
+
+    const char* get_name() const                { return name; }
+    const char* get_ctrl_mbox() const           { return ctrl_mbox.c_str(); }
+    const char* get_data_mbox() const           { return data_mbox.c_str(); }
+
+private:
+    // linked list of hostdata's, used by create/destroy
+    static hostdata* instances;
+    hostdata* next;
+
+    const char* name;
+    std::string ctrl_mbox;
+    std::string data_mbox;
+};
+
+#endif // !HOSTDATA_H
+
+// Local variables:
+// mode: c++
+// End:
diff --git a/load_balance.h b/load_balance.h
deleted file mode 100644 (file)
index d524aa7..0000000
+++ /dev/null
@@ -1,13 +0,0 @@
-#ifndef LOAD_BALANCE_H
-#define LOAD_BALANCE_H
-
-class loba {
-public:
-
-};
-
-#endif // !LOAD_BALANCE_H
-
-// Local variables:
-// mode: c++
-// End:
index d48bd877a790782951f61ceac2f4b73b1cd45e3d..a3b790d3fef112bd81fdc86f0a285ed6d6401d43 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -2,6 +2,7 @@
 #include <iostream>
 #include <msg/msg.h>
 #include <xbt/log.h>
+#include "hostdata.h"
 #include "misc.h"
 #include "options.h"
 #include "process.h"
@@ -71,14 +72,7 @@ int main(int argc, char* argv[])
 
         // Create the platform and the application.
         MSG_create_environment(opt::platform_file);
-        if (LOG_ISENABLED(xbt_log_priority_verbose)) {
-            int n = MSG_get_host_number();
-            m_host_t* h = MSG_get_host_table();
-            VERB1("Got %d hosts.", n);
-            for (int i = 0; i < n; i++)
-                VERB2("Host #%d named \"%s\".", i, MSG_host_get_name(h[i]));
-            xbt_free(h);
-        }
+        hostdata::create();
         MSG_launch_application(opt::application_file);
 
         exit_status = EXIT_FAILURE_SIMU; // =====
@@ -103,6 +97,7 @@ int main(int argc, char* argv[])
     }
 
     // Clean the MSG simulation.
+    hostdata::destroy();
     res = MSG_clean();
     if (res != MSG_OK) {
         ERROR1("MSG_clean() failed with status %#x", res);
diff --git a/misc.h b/misc.h
index 4bae95a80ff2fde41c30d8e297308b8c3172ca89..aee128e157aed15d7610f9b2dc9fd7be7fa26bfd 100644 (file)
--- a/misc.h
+++ b/misc.h
 #define LOG_ISENABLED(priority) \
     (_XBT_LOG_ISENABLEDV((*_XBT_LOGV(default)), (priority)))
 
+/* Returns c-string "s" if n > 1, empty string "" otherwise. */
+#define ESSE(n) ((n) > 1 ? misc::str_esse : misc::str_nil)
 namespace misc {
     extern const char str_esse[];
     extern const char str_nil[];
 }
-/* Returns c-string "s" if n > 1, empty string "" otherwise. */
-#define ESSE(n) ((n) > 1 ? misc::str_esse : misc::str_nil)
 
 #endif // !MISC_H
 
index 8715396e9d2be53313dc35c85a34e88b19ba3a8d..3340f8f49678998415d94a3fb9f5b83f43b6128d 100644 (file)
@@ -1,14 +1,14 @@
 #include "neighbor.h"
 
+#include <limits>
+#include <msg/msg.h>
+
 neighbor::neighbor(const char* hostname)
-    : name(hostname)
-    , ctrl_mbox(hostname)
-    , data_mbox(hostname)
+    : host((hostdata* )MSG_host_get_data(MSG_get_host_by_name(hostname)))
     , load(std::numeric_limits<double>::infinity())
     , debt(0.0)
+    , to_send(0.0)
 {
-    ctrl_mbox += "_ctrl";
-    data_mbox += "_data";
 }
 
 neighbor::~neighbor()
index 4bd05a9d86d0965fb5a2c2c82c4eebe56c57ccda..7700cb3c9f6fc113cbfd39e1d27cd0b05f364b90 100644 (file)
@@ -1,31 +1,34 @@
 #ifndef NEIGHBOR_H
 #define NEIGHBOR_H
 
-#include <limits>
-#include <string>
+#include <utility>
+#include "hostdata.h"
 
 class neighbor {
 public:
     neighbor(const char* hostname);
     ~neighbor();
 
-    const char* get_name() const        { return name.c_str(); }
-    const char* get_ctrl_mbox() const   { return ctrl_mbox.c_str(); }
-    const char* get_data_mbox() const   { return data_mbox.c_str(); }
+    const char* get_name() const        { return host->get_name(); }
+    const char* get_ctrl_mbox() const   { return host->get_ctrl_mbox(); }
+    const char* get_data_mbox() const   { return host->get_data_mbox(); }
 
-    double get_load() const             { return load; }
-    void set_load(double l)             { load = l;    }
+    double get_load() const             { return load;   }
+    void set_load(double amount)        { load = amount; }
 
-    double get_debt() const             { return debt; }
-    void set_debt(double d)             { debt = d;    }
+    double get_debt() const             { return debt;   }
+    void set_debt(double amount)        { debt = amount; }
+
+    double get_to_send() const          { return to_send;   }
+    void set_to_send(double amount)     { to_send = amount; }
 
 private:
-    std::string name;
-    std::string ctrl_mbox;
-    std::string data_mbox;
+    const hostdata* host;
 
     double load;
     double debt;
+
+    double to_send;
 };
 
 #endif // !NEIGHBOR_H
index a16023a64d4762b2181ceb03a9432e5fcb987f3c..61861f046f7ac1035cc6cd26186cf4bbe1901ebd 100644 (file)
@@ -1,10 +1,11 @@
+#include "options.h"
+
 #include <cstring>              // strrchr
 #include <iomanip>
 #include <iostream>
 #include <sstream>
 #include <unistd.h>             // getopt
 #include <xbt/log.h>
-#include "options.h"
 #include "misc.h"
 
 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu);
index bc71b9d270a153f5cf20c517b0da2c8cdbb12879..1a9066d66d013ba234f097fcffa5661cbba2a476 100644 (file)
@@ -1,3 +1,5 @@
+#include "process.h"
+
 #include <algorithm>
 #include <tr1/functional>
 #include <iterator>
@@ -7,7 +9,6 @@
 #include <xbt/time.h>
 #include "misc.h"
 #include "options.h"
-#include "process.h"
 
 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu);
 
@@ -17,6 +18,11 @@ process::process(int argc, char* argv[])
         throw std::invalid_argument("bad or missing initial load");
 
     neigh.assign(argv + 2, argv + argc);
+
+    std::for_each(neigh.begin(), neigh.end(),
+                  std::tr1::bind(&process::insert_neighbor_in_map,
+                                 this, std::tr1::placeholders::_1));
+
     expected_load = load;
 
     ctrl_close_pending = data_close_pending = neigh.size();
@@ -46,12 +52,11 @@ process::process(int argc, char* argv[])
 int process::run()
 {
     bool one_more = true;
-    unsigned iter = 0;
 
     INFO1("Initial load: %g", load);
     VERB0("Starting...");
+    iter = 0;
     while (one_more) {
-        bool close_received;
         ++iter;
 
         if (opt::log_rate && iter % opt::log_rate == 0) {
@@ -64,13 +69,15 @@ int process::run()
         }
 
         compute();
-        close_received = !receive(false);
 
-        /*
-         *    compute load balancing;
-         *    send tasks to neighbors;
-         */
+        bool close_received = !receive(false);
+
+        if (opt::bookkeeping)
+            expected_load -= load_balance(expected_load);
+        else
+            load -= load_balance(load);
 
+        send();
         comm.flush(false);
 
         if (opt::exit_on_close && close_received)
@@ -106,6 +113,61 @@ void process::compute()
     MSG_task_destroy(task);
 }
 
+double process::load_balance(double /*my_load*/)
+{
+    return 0.0;
+}
+
+void process::send1_no_bookkeeping(neighbor& nb)
+{
+    comm.send(nb.get_ctrl_mbox(), new message(message::INFO, load));
+    double load_to_send = nb.get_to_send();
+    if (load_to_send > 0.0) {
+        comm.send(nb.get_data_mbox(), new message(message::LOAD, load_to_send));
+        nb.set_to_send(0.0);
+    }
+}
+
+void process::send1_bookkeeping(neighbor& nb)
+{
+    comm.send(nb.get_ctrl_mbox(), new message(message::INFO, expected_load));
+    double load_to_send;
+    double new_debt;
+    double debt_to_send = nb.get_to_send();
+    if (debt_to_send > 0.0) {
+        comm.send(nb.get_ctrl_mbox(),
+                  new message(message::CREDIT, debt_to_send));
+        nb.set_to_send(0.0);
+        new_debt = nb.get_debt() + debt_to_send;
+    } else {
+        new_debt = nb.get_debt();
+    }
+    if (load <= new_debt) {
+        load_to_send = load;
+        nb.set_debt(new_debt - load_to_send);
+        load = 0.0;
+    } else {
+        load_to_send = new_debt;
+        nb.set_debt(0.0);
+        load -= load_to_send;
+    }
+    if (load_to_send > 0.0)
+        comm.send(nb.get_data_mbox(), new message(message::LOAD, load_to_send));
+}
+
+void process::send()
+{
+    // fixme: shall we send data at all iterations?
+    if (opt::bookkeeping) {
+        std::for_each(neigh.begin(), neigh.end(),
+                      std::tr1::bind(&process::send1_bookkeeping,
+                                     this, std::tr1::placeholders::_1));
+    } else {
+        std::for_each(neigh.begin(), neigh.end(),
+                      std::tr1::bind(&process::send1_no_bookkeeping,
+                                     this, std::tr1::placeholders::_1));
+    }
+}
 
 // Returns false if a CLOSE message was received. 
 bool process::receive(bool wait_for_close)
@@ -118,10 +180,11 @@ bool process::receive(bool wait_for_close)
         DEBUG2("received %s from %s",
                msg->to_string().c_str(), MSG_host_get_name(from));
         switch (msg->get_type()) {
-        case message::INFO:
-            // fixme: update neighbor
-            // need a map m_host_t -> neighbor&
+        case message::INFO: {
+            neighbor* n = rev_neigh[from];
+            n->set_load(msg->get_amount());
             break;
+        }
         case message::CREDIT:
             expected_load += msg->get_amount();
             break;
@@ -146,15 +209,19 @@ bool process::receive(bool wait_for_close)
     return result;
 }
 
+void process::finalize1(neighbor& nb)
+{
+    comm.send(nb.get_ctrl_mbox(), new message(message::CTRL_CLOSE, 0.0));
+    comm.send(nb.get_data_mbox(), new message(message::DATA_CLOSE, 0.0));    
+}
+
 void process::finalize()
 {
     DEBUG2("send CLOSE to %d neighbor%s.",
            (int )neigh.size(), ESSE(neigh.size()));
-    std::vector<neighbor>::iterator n;
-    for (n = neigh.begin() ; n != neigh.end() ; ++n) {
-        comm.send(n->get_ctrl_mbox(), new message(message::CTRL_CLOSE, 0.0));
-        comm.send(n->get_data_mbox(), new message(message::DATA_CLOSE, 0.0));
-    }
+    std::for_each(neigh.begin(), neigh.end(),
+                  std::tr1::bind(&process::finalize1,
+                                 this, std::tr1::placeholders::_1));
 
     DEBUG2("wait for CLOSE from %d neighbor%s.",
            (int )neigh.size(), ESSE(neigh.size()));
@@ -180,6 +247,13 @@ void process::print_loads(e_xbt_log_priority_t logp)
     LOG1(logp, "Neighbor loads: %s", oss.str().c_str());
 }
 
+void process::insert_neighbor_in_map(neighbor& nb)
+{
+    neighbor* nbp = &nb;
+    m_host_t host = MSG_get_host_by_name(nb.get_name());
+    rev_neigh.insert(std::make_pair(host, nbp));
+}
+
 // Local variables:
 // mode: c++
 // End:
index 3baabbfddad931a25f1fe689faef32b6ca8882e3..a49c6efcd41e376189683c7761126db8abe0d859 100644 (file)
--- a/process.h
+++ b/process.h
@@ -1,8 +1,9 @@
 #ifndef PROCESS_H
 #define PROCESS_H
 
+#include <map>
 #include <vector>
-#include <xbt/log.h>
+#include <msg/msg.h>
 #include "communicator.h"
 #include "neighbor.h"
 
@@ -13,18 +14,29 @@ public:
     int run();
 
 private:
-    communicator comm;
     std::vector<neighbor> neigh;
-    double load;
-    double expected_load;
+    std::map<m_host_t, neighbor*> rev_neigh;
 
+    communicator comm;
     int ctrl_close_pending;
     int data_close_pending;
 
+    unsigned iter;
+
+    double load;
+    double expected_load;
+
     void compute();
+    virtual double load_balance(double my_load);
+    void send1_no_bookkeeping(neighbor& nb);
+    void send1_bookkeeping(neighbor& nb);
+    void send();
     bool receive(bool wait_for_close);
+    void finalize1(neighbor& nb);
     void finalize();
     void print_loads(e_xbt_log_priority_t logp = xbt_log_priority_info);
+
+    void insert_neighbor_in_map(neighbor& nb);
 };
 
 #endif // !PROCESS_H