AND Private Git Repository - loba.git/commitdiff
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Merge branch 'master' of ssh://info.iut-bm.univ-fcomte.fr/loba
author Aberrahmane Sider <ar.sider@univ-bejaia.dz>
Wed, 25 May 2011 23:17:06 +0000 (00:17 +0100)
committer Aberrahmane Sider <ar.sider@univ-bejaia.dz>
Wed, 25 May 2011 23:17:06 +0000 (00:17 +0100)
16 files changed:
BUGS
Makefile
NOTES
README
communicator.cpp
cost_func.cpp
deployment.cpp
hostdata.cpp
main.cpp
messages.cpp
messages.h
options.cpp
options.h
process.cpp
process.h
sync_queue.h [new file with mode: 0644]

diff --git a/BUGS b/BUGS
index 17da8da41aa0d6075a4a2b1f73aa9669b28db95f..040cfc39fd3af4fcb16a7aa4aacf161a2315ec69 100644 (file)
--- a/BUGS
+++ b/BUGS
@@ -1,3 +1,22 @@
+========================================================================
+========================================================================
+##### RESOLVED BUGS COME AFTER THIS ####################################
+========================================================================
+Il faut réviser l'utilisation du mutex entre le thread d'équilibrage
+et le thread de calcul.  Il semble être gardé beaucoup trop longtemps.
+
+Bon, une partie du problème est rectifiée par le commit
+48de954 Stop locking the mutex on data_receive.
+
+Pour le reste, je pense maintenant que ça ne gêne pas, au moins dans
+le simulateur.  Pour bien faire, il faudrait séparer davantage les deux
+threads d'équilibrage et de calcul, et faire en sorte que chacun garde
+un cache des données globales partagées.  Il suffirait alors de
+synchroniser ces caches à chaque itération.
+
+Les données partagées sont essentiellement les données des voisins :
+load, to_send et debt.
+
 ========================================================================
 Comment expliquer ces différences entre SG 3.5 et SG svn ?
 
diff --git a/Makefile b/Makefile
index 751418c49ef18bfe4bba1765e539b292022af971..5b0d491b840b1293a4a0501b44d0bf4d0539a6b7 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -1,7 +1,7 @@
 #SIMGRID_INSTALL_DIR ?= $(PWD)/simgrid-stable
 SIMGRID_INSTALL_DIR ?= $(PWD)/simgrid-dev
 
-OPTIM_FLAGS += -O3
+OPTIM_FLAGS += -pipe -O3
 DEBUG_FLAGS += -g
 #DEBUG_FLAGS += -pg
 CHECK_FLAGS += -Wall -Wextra
@@ -16,7 +16,7 @@ CPPFLAGS += $(CHECK_FLAGS)
 #CFLAGS += -fgnu89-inline      # workaround simgrid bug
 CFLAGS += $(OPTIM_FLAGS) $(DEBUG_FLAGS)
 
-#CXXFLAGS += -std=c++0x
+CXXFLAGS += -std=c++0x
 CXXFLAGS += $(OPTIM_FLAGS) $(DEBUG_FLAGS)
 
 LDFLAGS += -L $(SIMGRID_INSTALL_DIR)/lib
@@ -26,7 +26,8 @@ LINK.o = $(CXX) $(CXXFLAGS) $(LDFLAGS) $(TARGET_ARCH)
 LDLIBS := -lsimgrid
 
 MAKEDEPEND.FLAGS =  $(CPPFLAGS) -MM -MG -MF $@ $<
-MAKEDEPEND.CXX = $(CXX) $(MAKEDEPEND.FLAGS)
+MAKEDEPEND.C = $(CC) $(CFLAGS) $(MAKEDEPEND.FLAGS)
+MAKEDEPEND.CXX = $(CXX) $(CXXFLAGS) $(MAKEDEPEND.FLAGS)
 
 SRC.loba := main.cpp           \
        communicator.cpp        \
@@ -73,6 +74,8 @@ realclean: clean
        $(RM) $(XML_DEV_FILES)
        $(RM) *~
 
+.%.d: %.c ; $(MAKEDEPEND.C)
+
 .%.d: %.cpp ; $(MAKEDEPEND.CXX)
 
 $(FLAVOURED_LOBA):
diff --git a/NOTES b/NOTES
index 5399a4c982987d69a584edd45bda82d6ba92be21..53bce11e7e31fe340daa071eab018c3e67ddc005 100644 (file)
--- a/NOTES
+++ b/NOTES
@@ -71,3 +71,7 @@ process::expected_load          Current load estimation.
                                   minus pending sends.
                                 * With bookkeeping, it corresponds to the
                                   "virtual load".
+
+process::received_load          Real load received from neighbors.
+                                Used when receiving data messages, and then
+                                added to real_load.
diff --git a/README b/README
index 694f795ea2aa3a4395b946e1f8fd9706040e9f63..8fcbc7c29716214b8f3ce950f8212df4c720c3fb 100644 (file)
--- a/README
+++ b/README
@@ -199,9 +199,15 @@ Liste de fichiers
 
     main.cpp                    le programme principal
 
+    messages.h                  file de messages reçus
+    messages.cpp
+
     misc.h                      divers trucs inclassables
     misc.cpp
 
+    msg_thread.h                création de threads SG/MSG
+    msg_thread.cpp
+
     named_object_list.h         gestion d'une table de constructeurs
                                 avec des noms et des descriptions
 
@@ -216,10 +222,16 @@ Liste de fichiers
 
     simgrid_features.h          macros pour détecter la version de SimGrid
 
-    simple_async.cpp            un simple programme de test
+    statistics.h                pour calculer moyenne, variance, etc.
+
+    synchro.h                   mutex, condition, etc.
+
+    sync_queue.h                lock-free synchronized queue
 
     timer.h                     gestion de timer
 
+    tracing.h                   définitions liées au traçage
+
     version.h                   gestion de la version du programme
     version.cpp
 
@@ -231,11 +243,14 @@ Liste de fichiers
     extract.pl                  outil d'extraction des données à partir des
                                 traces, pour tracer des courbes
 
+    new_loba.sh                 pour créer le squelette d'un nouvel algo
+                                d'équilibrage loba_*
+
     setlocalversion             calcule un numéro de version à partir du hash
                                 du dernier commit (git)
 
 * autres fichiers
 
     .gitignore                  liste des fichiers ignorés par git
-    valgrind_suppressions_3.5   liste de quelques suppressions pour valgrind
+    valgrind_suppressions       liste de quelques suppressions pour valgrind
                                 avec SimGrid 3.5
diff --git a/communicator.cpp b/communicator.cpp
index 867d19eb36f1575345569c04fb772e8986be085b..2ddea5d65c0fdb16b1ec1849ce320681e8d5f2d5 100644 (file)
--- a/communicator.cpp
+++ b/communicator.cpp
@@ -1,5 +1,5 @@
 #include <algorithm>
-#include <tr1/functional>
+#include <functional>
 #include <msg/msg.h>
 #include <xbt/log.h>
 
@@ -24,9 +24,8 @@ namespace {
 communicator::communicator()
     : host(static_cast<hostdata*>(MSG_host_get_data(MSG_host_self())))
 {
-    using std::tr1::bind;
     receiver_thread = new_msg_thread("receiver",
-                                     bind(&communicator::receiver, this));
+                                     std::bind(&communicator::receiver, this));
     receiver_thread->start();
 }
 
@@ -56,7 +55,7 @@ msg_comm_t communicator::real_send(const char* dest, message* msg)
     XBT_DEBUG("send %s to %s", msg->to_string().c_str(), dest);
     m_task_t task = MSG_task_create("message", 0.0, msg->get_size(), msg);
     TRACE_msg_set_task_category(task,
-                                msg->get_type() == message::LOAD ?
+                                msg->get_type() == message::DATA ?
                                 TRACE_CAT_DATA : TRACE_CAT_CTRL);
     return MSG_task_isend(task, dest);
 }
diff --git a/cost_func.cpp b/cost_func.cpp
index 12c20f6a2cd88824de1645f4c7650f4f6488c6a9..d6cba8c88ed667484b1f6af6ce0b95a3ec9244da 100644 (file)
--- a/cost_func.cpp
+++ b/cost_func.cpp
@@ -1,5 +1,5 @@
 #include <algorithm>
-#include <tr1/functional>
+#include <functional>
 #include <numeric>
 #include <iterator>
 #include <sstream>
@@ -30,13 +30,12 @@ cost_func::~cost_func()
 
 double cost_func::operator()(double amount) const
 {
-    using std::tr1::bind;
-    using std::tr1::placeholders::_1;
-    using std::tr1::placeholders::_2;
+    using std::placeholders::_1;
+    using std::placeholders::_2;
     return std::accumulate(++factors.begin(), factors.end(), factors.front(),
-                           bind(std::plus<double>(),
-                                bind(std::multiplies<double>(), amount, _1),
-                                _2));
+                           std::bind(std::plus<double>(),
+                                     std::bind(std::multiplies<double>(),
+                                               amount, _1), _2));
 }
 
 std::string cost_func::to_string()
diff --git a/deployment.cpp b/deployment.cpp
index 705c1ea859539b528810cad95040ac55c9789ca2..8f52806bb15fc591f28b6c5c6215466fb4770627 100644 (file)
--- a/deployment.cpp
+++ b/deployment.cpp
@@ -1,6 +1,6 @@
 #include <algorithm>
 #include <cstdlib>
-#include <tr1/functional>
+#include <functional>
 #include <iomanip>
 #include <numeric>
 #include <sstream>
@@ -50,8 +50,7 @@ void deployment_generator::set_link(int host1, int host2)
 
 void deployment_generator::distribute_load()
 {
-    using std::tr1::bind;
-    using std::tr1::placeholders::_1;
+    using std::placeholders::_1;
 
     if (!opt::auto_depl::random_distribution) {
         set_load(0, opt::auto_depl::load);
@@ -63,7 +62,7 @@ void deployment_generator::distribute_load()
     double factor = opt::auto_depl::load /
         std::accumulate(loads.begin(), loads.end(), 0.0);
     std::transform(loads.begin(), loads.end(), loads.begin(),
-                   bind(std::multiplies<double>(), _1, factor));
+                   std::bind(std::multiplies<double>(), _1, factor));
     for (unsigned i = 0 ; i < hosts.size() ; ++i)
         set_load(i, loads[i]);
 }
diff --git a/hostdata.cpp b/hostdata.cpp
index 17d7a0816a3ae62603637e4935ecfd3d2ddcf09d..ee8fadce107447fa32c0a3e7da307130ac89758f 100644 (file)
--- a/hostdata.cpp
+++ b/hostdata.cpp
@@ -1,6 +1,6 @@
 #include <algorithm>
 #include <cstring>
-#include <tr1/functional>
+#include <functional>
 #include <stdexcept>
 #include <xbt/log.h>
 #include <xbt/sysdep.h>
@@ -16,18 +16,18 @@ std::vector<hostdata> hostdata::hosts;
 
 void hostdata::create()
 {
-    using std::tr1::bind;
-    using std::tr1::placeholders::_1;
-    using std::tr1::placeholders::_2;
+    using std::placeholders::_1;
+    using std::placeholders::_2;
 
     int nhosts = MSG_get_host_number();
     m_host_t* host_list = MSG_get_host_table();
     // only sort hosts for automatically created deployment
     if (opt::auto_depl::enabled)
         std::sort(host_list, host_list + nhosts,
-                  bind(std::less<int>(), bind(strcmp,
-                                              bind(MSG_host_get_name, _1),
-                                              bind(MSG_host_get_name, _2)), 0));
+                  std::bind(std::less<int>(),
+                            std::bind(strcmp,
+                                      std::bind(MSG_host_get_name, _1),
+                                      std::bind(MSG_host_get_name, _2)), 0));
     hosts.assign(host_list, host_list + nhosts);
     xbt_free(host_list);
 
diff --git a/main.cpp b/main.cpp
index a88378daa584a7321a48f71e4c7f0ad8b812dfe1..ed8039e2c0ea5d3d3a392ab12ce067cd5c46f15a 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -1,7 +1,7 @@
 #include <cerrno>
+#include <csignal>
 #include <cstring>              // strchr
 #include <iostream>
-#include <signal.h>
 #include <stdexcept>
 #include <msg/msg.h>
 #include <xbt/log.h>
@@ -38,7 +38,9 @@ namespace {
         EXIT_FAILURE_INIT  = 0x02,  // failed to initialize simulator
         EXIT_FAILURE_SIMU  = 0x04,  // simulation failed
         EXIT_FAILURE_CLEAN = 0x08,  // error at cleanup
-        EXIT_FAILURE_OTHER = 0x10,  // other error
+        EXIT_FAILURE_INTR  = 0x10,  // interrupted by user
+        EXIT_FAILURE_LOAD  = 0x20,  // lost load on exit
+        EXIT_FAILURE_OTHER = 0x40,  // other error
     };
 
     // Cannot be globally initialized...
@@ -106,32 +108,38 @@ static int simulation_main(int argc, char* argv[])
     return result;
 }
 
-static void check_for_lost_load()
+static bool check_for_lost_load()
 {
+    bool res = true;
     double total_init = process::get_total_load_init();
     double total_exit = process::get_total_load_exit();
     double lost = total_init - total_exit;
     double lost_ratio = 100.0 * lost / total_init;
-    if (lost_ratio < -opt::load_ratio_threshold)
+    if (lost_ratio < -opt::load_ratio_threshold) {
         XBT_ERROR("Gained load at exit! %g (%g%%) <============",
                   -lost, -lost_ratio);
-    else if (lost_ratio > opt::load_ratio_threshold)
+        res = false;
+    } else if (lost_ratio > opt::load_ratio_threshold) {
         XBT_ERROR("Lost load at exit! %g (%g%%) <============",
                   lost, lost_ratio);
-    else
+        res = false;
+    } else
         XBT_VERB("Total load at exit looks good: %g (%g%%)", lost, lost_ratio);
 
     double total_running = process::get_total_load_running();
     double running_ratio = 100.0 * total_running / total_init;
-    if (running_ratio < -opt::load_ratio_threshold)
+    if (running_ratio < -opt::load_ratio_threshold) {
         XBT_ERROR("Negative running load at exit! %g (%g%%) <============",
                   total_running, running_ratio);
-    else if (running_ratio > opt::load_ratio_threshold)
+        res = false;
+    } else if (running_ratio > opt::load_ratio_threshold) {
         XBT_ERROR("Remaining running load at exit! %g (%g%%) <============",
                   total_running, running_ratio);
-    else
+        res = false;
+    } else
         XBT_VERB("Running load at exit looks good: %g (%g%%)",
                  total_running, running_ratio);
+    return res;
 }
 
 static void signal_handler(int /*sig*/)
@@ -140,7 +148,12 @@ static void signal_handler(int /*sig*/)
         XBT_CRITICAL(">>>>>>>>>>"
                      " caught CTRL-C: global exit requested "
                      "<<<<<<<<<<");
-        opt::exit_request = true;
+        opt::exit_request = 1;
+    } else {
+        XBT_CRITICAL(">>>>>>>>>>"
+                     " caught CTRL-C for the 2nd time: exit immediately "
+                     "<<<<<<<<<<");
+        exit(EXIT_FAILURE_INTR);
     }
 }
 
@@ -150,7 +163,7 @@ static void install_signal_handler()
     action.sa_handler = signal_handler;
     sigemptyset(&action.sa_mask);
     action.sa_flags = SA_RESTART;
-    if (sigaction (SIGINT, &action, NULL) == -1) {
+    if (sigaction(SIGINT, &action, NULL) == -1) {
         std::cerr << "sigaction: " << strerror(errno) << "\n";
         exit(EXIT_FAILURE_OTHER);
     }
@@ -282,7 +295,8 @@ int main(int argc, char* argv[])
     if (simulated_time >= 0.0) {
         simulation_time.stop();
         elapsed_time.stop();
-        check_for_lost_load();
+        if (!check_for_lost_load())
+            exit_status |= EXIT_FAILURE_LOAD;
         XBT_INFO(",----[ Results ]");
         PR_STATS("Load", loads);
         PR_STATS("Computation", comps);
diff --git a/messages.cpp b/messages.cpp
index f5df3969b38db3d09707f097e93370aaa19dbcb3..fd19f6b979657881f082d09e7bc0ea8a480a3ac2 100644 (file)
--- a/messages.cpp
+++ b/messages.cpp
@@ -8,61 +8,75 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(comm);
 
 #include "messages.h"
 
+message::message(message_type t, double a, double c)
+    : type(t), amount(a) , credit(c)
+{
+    // compute message size
+    // arbitrary: 8 for type, and 8 for each double
+    switch (type) {
+    case CTRL:
+        size = opt::bookkeeping ? 24 : 16; // type + amount + (credit)?
+        break;
+    case DATA:
+        size = 16 + opt::comm_cost(amount); // type + amount + data size
+        break;
+    default:
+        size = 8;               // type
+        break;
+    }
+}
+
 std::string message::to_string()
 {
-    static const char* str[] = { "INFO", "CREDIT", "LOAD",
-                                 "CTRL_CLOSE", "DATA_CLOSE" };
+    static const char* str[DATA_CLOSE + 1] = { "CTRL", "DATA",
+                                               "CTRL_CLOSE", "DATA_CLOSE" };
     std::ostringstream oss;
     oss << str[type] << ": " << amount;
     return oss.str();
 }
 
-double message::get_size() const
-{
-    // arbitrary: 8 for type, and 8 for amount
-    double size = 16;
-    if (type == LOAD)
-        size += opt::comm_cost(amount);
-    return size;
-}
-
 void message_queue::push(m_task_t task)
 {
-    mutex.acquire();
-    queue.push(task);
-    cond.signal();
-    mutex.release();
+    if (queue.push(task)) {
+         // list was empty, the push must be signaled
+        mutex.acquire();
+        cond.signal();
+        mutex.release();
+    }
 }
 
 bool message_queue::pop(message*& msg, m_host_t& from, double timeout)
 {
-    if (timeout != 0) {
-        volatile double deadline =
-            timeout > 0 ? MSG_get_clock() + timeout : 0.0;
+    m_task_t task;
+    if (!queue.try_pop(task)) {
+        if (timeout == 0.0)
+            return false;
+
         mutex.acquire();
-        while (queue.empty() && (!deadline || deadline > MSG_get_clock())) {
+        if (!queue.try_pop(task)) {
             xbt_ex_t e;
             XBT_DEBUG("waiting for a message to come");
             TRY {
-                if (deadline)
-                    cond.timedwait(mutex, deadline - MSG_get_clock());
+                if (timeout > 0)
+                    cond.timedwait(mutex, timeout);
                 else
                     cond.wait(mutex);
             }
+            TRY_CLEANUP {
+                mutex.release();
+            }
             CATCH (e) {
                 if (e.category != timeout_error)
                     RETHROW;
                 xbt_ex_free(e);
+                return false;   // got a timeout
             }
+            bool pop_was_successful = queue.try_pop(task);
+            xbt_assert(pop_was_successful);
+        } else {
+            mutex.release();
         }
-        mutex.release();
     }
-
-    if (queue.empty())
-        return false;
-
-    m_task_t task = queue.front();
-    queue.pop();
     msg = static_cast<message*>(MSG_task_get_data(task));
     from = MSG_task_get_source(task);
     MSG_task_destroy(task);
diff --git a/messages.h b/messages.h
index 605b3be1e42282fa87bf5ac5cdb3cf5a6f9bd292..1886301fc1560c835551c6cc7e4774787292df46 100644 (file)
--- a/messages.h
+++ b/messages.h
@@ -5,22 +5,26 @@
 #include <string>
 #include <msg/msg.h>
 #include "synchro.h"
+#include "sync_queue.h"
 
 class message {
 public:
-    enum message_type { INFO, CREDIT, LOAD, CTRL_CLOSE, DATA_CLOSE };
+    enum message_type { CTRL, DATA, CTRL_CLOSE, DATA_CLOSE };
 
-    message(message_type t, double a): type(t), amount(a) { }
+    message(message_type t, double a, double c = 0.0);
 
     message_type get_type() const       { return type;   }
     double get_amount() const           { return amount; }
-    double get_size() const;
+    double get_credit() const           { return credit; }
+    double get_size() const             { return size;   }
 
     std::string to_string();
 
 private:
     message_type type;
     double amount;
+    double credit;
+    double size;
 };
 
 class message_queue {
@@ -39,7 +43,7 @@ public:
 private:
     mutex_t mutex;
     condition_t cond;
-    std::queue<m_task_t> queue;
+    sync_queue<m_task_t> queue;
 };
 
 #endif // !MESSAGES_H
diff --git a/options.cpp b/options.cpp
index 3e0f92d764a29cc695522d5a3dd770e99481a6c8..3eed8a259731798ba790f1c59eafad44798b01fc 100644 (file)
--- a/options.cpp
+++ b/options.cpp
@@ -39,7 +39,7 @@ namespace opt {
 
     // Simulation parameters
     int log_rate = 1;
-    bool exit_request = false;
+    volatile std::sig_atomic_t exit_request = 0;
 
     // Platform and deployment
     std::string platform_file;
diff --git a/options.h b/options.h
index 604d1781ebf1deb00ca901de2ae2bcef971a821f..0c69e116f75528bde066e4bbe3455ac92e0224de 100644 (file)
--- a/options.h
+++ b/options.h
@@ -1,6 +1,7 @@
 #ifndef OPTIONS_H
 #define OPTIONS_H
 
+#include <csignal>              // std::sig_atomic_t
 #include <string>
 #include "cost_func.h"
 #include "named_object_list.h"
@@ -23,7 +24,7 @@ namespace opt {
 
     // Simulation parameters
     extern int log_rate;
-    extern bool exit_request;
+    extern volatile std::sig_atomic_t exit_request;
 
     // Platform and deployment
     extern std::string platform_file;
diff --git a/process.cpp b/process.cpp
index 54fea8aeb2ed8534d4561715b9595b0950c967ef..2de3016588dd2dd95e66b073eba4b59eb1ab7e18 100644 (file)
--- a/process.cpp
+++ b/process.cpp
@@ -1,5 +1,5 @@
 #include <algorithm>
-#include <tr1/functional>
+#include <functional>
 #include <iterator>
 #include <numeric>
 #include <stdexcept>
@@ -50,6 +50,7 @@ process::process(int argc, char* argv[])
     expected_load = real_load;
     total_load_running += real_load;
     total_load_init += real_load;
+    received_load = 0.0;
 
     ctrl_close_pending = data_close_pending = neigh.size();
     close_received = false;
@@ -58,8 +59,7 @@ process::process(int argc, char* argv[])
     comp_iter = lb_iter = 0;
 
     lb_thread = new_msg_thread("loba",
-                               std::tr1::bind(&process::load_balance_loop,
-                                              this));
+                               std::bind(&process::load_balance_loop, this));
 
     e_xbt_log_priority_t logp = xbt_log_priority_verbose;
     if (!LOG_ISENABLED(logp))
@@ -70,7 +70,7 @@ process::process(int argc, char* argv[])
         oss << ESSE(neigh.size()) << ": ";
         std::transform(neigh.begin(), neigh.end() - 1,
                        std::ostream_iterator<const char*>(oss, ", "),
-                       std::tr1::mem_fn(&neighbor::get_name));
+                       std::mem_fn(&neighbor::get_name));
         oss << neigh.back().get_name();
     }
     XBT_LOG(logp, "Got %s.", oss.str().c_str());
@@ -81,6 +81,8 @@ process::~process()
 {
     delete lb_thread;
     total_load_exit += real_load;
+    xbt_assert(received_load == 0.0,
+               "received_load is %g, but should be 0.0 !", received_load);
     if (opt::log_rate < 0)
         return;
     XBT_INFO("Final load after %d:%d iterations: %g",
@@ -112,8 +114,7 @@ int process::run()
 
 void process::load_balance_loop()
 {
-    using std::tr1::bind;
-    using std::tr1::placeholders::_1;
+    using std::placeholders::_1;
 
     double next_iter_after_date = MSG_get_clock() + opt::min_lb_iter_duration;
     while (still_running()) {
@@ -144,7 +145,7 @@ void process::load_balance_loop()
         // send
         comm.ctrl_flush(false);
         std::for_each(neigh.begin(), neigh.end(),
-                      bind(&process::ctrl_send, this, _1));
+                      std::bind(&process::ctrl_send, this, _1));
         prev_load_broadcast = expected_load;
         mutex.release();
 
@@ -156,10 +157,10 @@ void process::load_balance_loop()
     XBT_DEBUG("send CTRL_CLOSE to %zu neighbor%s",
               neigh.size(), ESSE(neigh.size()));
     std::for_each(neigh.begin(), neigh.end(),
-                  bind(&process::ctrl_close, this, _1));
+                  std::bind(&process::ctrl_close, this, _1));
     while (ctrl_close_pending) {
         comm.ctrl_flush(false);
-        XBT_DEBUG("waiting for %d CTRL CLOSE", ctrl_close_pending);
+        XBT_DEBUG("waiting for %d CTRL_CLOSE", ctrl_close_pending);
         ctrl_receive(-1.0);
     }
     comm.ctrl_flush(true);
@@ -167,24 +168,20 @@ void process::load_balance_loop()
 
 void process::compute_loop()
 {
-    using std::tr1::bind;
-    using std::tr1::placeholders::_1;
+    using std::placeholders::_1;
 
     double next_iter_after_date = MSG_get_clock() + opt::min_comp_iter_duration;
     while (still_running()) {
-        // receive
-        mutex.acquire();
-        if (real_load > 0.0)
-            data_receive(0.0);
-        else
-            data_receive(opt::min_comp_iter_duration);
-        mutex.release();
+        // receive (do not block if there is something to compute)
+        data_receive(real_load > 0.0 ? 0.0 : opt::min_comp_iter_duration);
 
         // send
         comm.data_flush(false);
         mutex.acquire();
+        real_load += received_load;
+        received_load = 0.0;
         std::for_each(neigh.begin(), neigh.end(),
-                      bind(&process::data_send, this, _1));
+                      std::bind(&process::data_send, this, _1));
         mutex.release();
 
         if (real_load == 0.0)
@@ -204,20 +201,19 @@ void process::compute_loop()
     }
 
     XBT_VERB("Going to finalize for %s...", __func__);
-    // last send, for not losing load scheduled to be sent
-    std::for_each(neigh.begin(), neigh.end(),
-                  bind(&process::data_send, this, _1));
     finalizing = true;
-    total_load_running -= real_load;
     XBT_DEBUG("send DATA_CLOSE to %zu neighbor%s",
               neigh.size(), ESSE(neigh.size()));
     std::for_each(neigh.begin(), neigh.end(),
-                  bind(&process::data_close, this, _1));
+                  std::bind(&process::data_close, this, _1));
     while (data_close_pending) {
         comm.data_flush(false);
-        XBT_DEBUG("waiting for %d DATA CLOSE", data_close_pending);
+        XBT_DEBUG("waiting for %d DATA_CLOSE", data_close_pending);
         data_receive(-1.0);
     }
+    real_load += received_load;
+    received_load = 0.0;
+    total_load_running -= real_load;
     comm.data_flush(true);
 }
 
@@ -265,13 +261,12 @@ bool process::still_running()
 
 double process::get_sum_of_to_send() const
 {
-    using std::tr1::bind;
-    using std::tr1::placeholders::_1;
-    using std::tr1::placeholders::_2;
+    using std::placeholders::_1;
+    using std::placeholders::_2;
 
     return std::accumulate(neigh.begin(), neigh.end(), 0.0,
-                           bind(std::plus<double>(),
-                                _1, bind(&neighbor::get_to_send, _2)));
+                           std::bind(std::plus<double>(), _1,
+                                     std::bind(&neighbor::get_to_send, _2)));
 }
 
 void process::load_balance()
@@ -290,20 +285,20 @@ void process::send(neighbor& nb, double amount)
 void process::ctrl_send(neighbor& nb)
 {
     double info_to_send = expected_load;
-    if (info_to_send != prev_load_broadcast) {
-        message* msg = new message(message::INFO, info_to_send);
-        add_ctrl_send_mesg(msg->get_size());
-        comm.ctrl_send(nb.get_ctrl_mbox(), msg);
-    }
-    if (opt::bookkeeping) {
-        double debt_to_send = nb.get_to_send();
+    double debt_to_send;
+    if (opt::bookkeeping) {     // bookkeeping
+        debt_to_send = nb.get_to_send();
         if (debt_to_send > 0.0) {
             nb.set_to_send(0.0);
             nb.set_debt(nb.get_debt() + debt_to_send);
-            message* msg = new message(message::CREDIT, debt_to_send);
-            add_ctrl_send_mesg(msg->get_size());
-            comm.ctrl_send(nb.get_ctrl_mbox(), msg);
         }
+    } else {                    // !bookkeeping
+        debt_to_send = 0.0;
+    }
+    if (info_to_send != prev_load_broadcast || debt_to_send > 0.0) {
+        message* msg = new message(message::CTRL, info_to_send, debt_to_send);
+        add_ctrl_send_mesg(msg->get_size());
+        comm.ctrl_send(nb.get_ctrl_mbox(), msg);
     }
 }
 
@@ -333,7 +328,7 @@ void process::data_send(neighbor& nb)
             amount = std::min(load_to_send, opt::max_transfer_amount);
         else
             amount = load_to_send;
-        message* msg = new message(message::LOAD, amount);
+        message* msg = new message(message::DATA, amount);
         add_data_send_mesg(msg->get_size());
         comm.data_send(nb.get_data_mbox(), msg);
         load_to_send -= amount;
@@ -381,19 +376,15 @@ void process::data_receive(double timeout)
 void process::handle_message(message* msg, m_host_t from)
 {
     switch (msg->get_type()) {
-    case message::INFO: {
+    case message::CTRL: {
         neighbor* n = rev_neigh[from];
         n->set_load(msg->get_amount() + n->get_to_send());
+        expected_load += msg->get_credit(); // may be 0.0 if !opt::bookkeeping
         break;
     }
-    case message::CREDIT:
-        expected_load += msg->get_amount();
-        break;
-    case message::LOAD: {
+    case message::DATA: {
         double ld = msg->get_amount();
-        real_load += ld;
-        if (finalizing)
-            total_load_running -= ld;
+        received_load += ld;
         break;
     }
     case message::CTRL_CLOSE:
@@ -410,12 +401,11 @@ void process::handle_message(message* msg, m_host_t from)
 
 #define print_loads_generic(vec, verbose, logp, cat)                    \
     if (_XBT_LOG_ISENABLEDV((*cat), logp)) {                            \
-        using std::tr1::bind;                                           \
-        using std::tr1::placeholders::_1;                               \
+        using std::placeholders::_1;                                    \
         XBT_XCLOG(cat, logp, "My load: %g (real); %g (expected).  "     \
                   "Neighbor loads:", real_load, expected_load);         \
         std::for_each(vec.begin(), vec.end(),                           \
-                      bind(&neighbor::print, _1, verbose, logp, cat));  \
+                      std::bind(&neighbor::print, _1, verbose, logp, cat)); \
     } else ((void)0)
 
 void process::print_loads(bool verbose,
diff --git a/process.h b/process.h
index ed6e6cf6fbedffc1ab96657aadda320828ca0a2a..57ece07203d300e2d264d8096c3958d654e34439 100644 (file)
--- a/process.h
+++ b/process.h
@@ -5,10 +5,10 @@
 //#undef USE_UNORDERED_MAP
 
 #include <algorithm>
-#include <tr1/functional>
+#include <functional>
 #ifdef USE_UNORDERED_MAP
-#  include <tr1/unordered_map>
-#  define MAP_TEMPLATE std::tr1::unordered_map
+#  include <unordered_map>
+#  define MAP_TEMPLATE std::unordered_map
 #else
 #  include <map>
 #  define MAP_TEMPLATE std::map
@@ -102,6 +102,7 @@ private:
                                 // a same information messages
     double real_load;           // current load
     double expected_load;       // expected load in bookkeeping mode
+    double received_load;       // load received from neighbors
 
     mutex_t mutex;              // synchronization between threads
     condition_t cond;
@@ -169,13 +170,12 @@ private:
 template <typename Compare>
 void process::pneigh_sort_by_load(const Compare& comp)
 {
-    using std::tr1::bind;
-    using std::tr1::placeholders::_1;
-    using std::tr1::placeholders::_2;
+    using std::placeholders::_1;
+    using std::placeholders::_2;
     std::sort(pneigh.begin(), pneigh.end(),
-              bind(comp,
-                   bind(&neighbor::get_load, _1),
-                   bind(&neighbor::get_load, _2)));
+              std::bind(comp,
+                        std::bind(&neighbor::get_load, _1),
+                        std::bind(&neighbor::get_load, _2)));
 }
 
 #endif // !PROCESS_H
diff --git a/sync_queue.h b/sync_queue.h
new file mode 100644 (file)
index 0000000..c3f29e5
--- /dev/null
+++ b/sync_queue.h
@@ -0,0 +1,112 @@
+#ifndef SYNC_QUEUE_H
+#define SYNC_QUEUE_H
+
+#if __GNUC__ == 4 && __GNUC_MINOR__ == 4
+#  include <cstdatomic>         // <atomic> is named <cstdatomic> in gcc 4.4
+
+template<typename _Tp>          // fix missing definition in gcc 4.4
+void
+atomic<_Tp*>::store(_Tp* __v, memory_order __m) volatile
+{ atomic_address::store(__v, __m); }
+
+#else
+#  include <atomic>
+#endif
+
+#define SYNC_QUEUE_BUFSIZE 16
+
+template <typename T>
+class sync_queue {
+public:
+    sync_queue()
+    {
+        head_node = tail_node = new node();
+        head.store(head_node->values);
+        tail.store(tail_node->values);
+    }
+
+    ~sync_queue()
+    {
+        node* n = head_node;
+        while (n != NULL) {
+            node* prev = n;
+            n = n->next;
+            delete prev;
+        }
+    }
+
+    bool empty() const
+    {
+        return head.load() == tail.load();
+    }
+
+    // size() is not not thread-safe
+    size_t size() const
+    {
+        size_t count = 0;
+        if (head_node == tail_node) {
+            count = tail.load() - head.load();
+        } else {
+            count =
+                (head_node->values + (SYNC_QUEUE_BUFSIZE - 1)) - head.load();
+            for (node* n = head_node->next; n != tail_node; n = n->next)
+                count += SYNC_QUEUE_BUFSIZE;
+            count += tail.load() - tail_node->values;
+        }
+        return count;
+    }
+
+    bool push(const T& val)
+    {
+        T* old_tail = tail.load();
+        T* new_tail;
+        if (old_tail == tail_node->values + (SYNC_QUEUE_BUFSIZE - 1)) {
+            tail_node->next = new node();
+            tail_node = tail_node->next;
+            new_tail = tail_node->values;
+        } else {
+            new_tail = old_tail + 1;
+        }
+        *new_tail = val;
+        tail.store(new_tail);
+        return (old_tail == head.load());
+    }
+
+    bool try_pop(T& res)
+    {
+        T* old_head = head.load();
+        if (old_head == tail.load()) // empty?
+            return false;
+
+        T* new_head;
+        if (old_head == head_node->values + (SYNC_QUEUE_BUFSIZE - 1)) {
+            node* old_head_node = head_node;
+            head_node = head_node->next;
+            delete old_head_node;
+            new_head = head_node->values;
+        } else {
+            new_head = old_head + 1;
+        }
+        res = *new_head;
+        head.store(new_head);
+        return true;
+    }
+
+private:
+    struct node {
+        node(): next(NULL) { }
+        T values[SYNC_QUEUE_BUFSIZE];
+        node* next;
+    };
+
+    node* head_node;
+    node* tail_node;
+    std::atomic<T*> head;
+    std::atomic<T*> tail;
+};
+
+#endif // !SYNC_QUEUE_H
+
+// Local variables:
+// mode: c++
+// End: