From: Aberrahmane Sider Date: Wed, 25 May 2011 23:17:06 +0000 (+0100) Subject: Merge branch 'master' of ssh://info.iut-bm.univ-fcomte.fr/loba X-Git-Tag: v0.1~62 X-Git-Url: https://bilbo.iut-bm.univ-fcomte.fr/and/gitweb/loba.git/commitdiff_plain/80a1fe1a200a4309c36598571b1f4ccb76a8788f?hp=70611cc5a8c28decbc1580c18f3a805347fb846e Merge branch 'master' of ssh://info.iut-bm.univ-fcomte.fr/loba --- diff --git a/BUGS b/BUGS index 17da8da..040cfc3 100644 --- a/BUGS +++ b/BUGS @@ -1,3 +1,22 @@ +======================================================================== +======================================================================== +##### RESOLVED BUGS COME AFTER THIS #################################### +======================================================================== +Il faut réviser l'utilisation du mutex entre le thread d'équilibrage +et le thread de calcul. Il semble gardé beaucoup trop longtemps. + +Bon, une partie du problème est rectifiée par le commit +48de954 Stop locking the mutex on data_receive. + +Pour le reste, je pense maintenant que ça ne gêne pas, au moins dans +le simulateur. Pour faire bien, il faudrait plus séparer les deux +threads d'équilibrage et de calcul, et faire en sorte que chacun garde +un cache des données globales partagées. Il suffirait alors de +synchroniser ces caches à chaque itération. + +Les données partagées sont essentiellement les données des voisins : +load, to_send et debt. + ======================================================================== Comment expliquer ces différences entre SG 3.5 et SG svn ? 
diff --git a/Makefile b/Makefile index 751418c..5b0d491 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ #SIMGRID_INSTALL_DIR ?= $(PWD)/simgrid-stable SIMGRID_INSTALL_DIR ?= $(PWD)/simgrid-dev -OPTIM_FLAGS += -O3 +OPTIM_FLAGS += -pipe -O3 DEBUG_FLAGS += -g #DEBUG_FLAGS += -pg CHECK_FLAGS += -Wall -Wextra @@ -16,7 +16,7 @@ CPPFLAGS += $(CHECK_FLAGS) #CFLAGS += -fgnu89-inline # workaround simgrid bug CFLAGS += $(OPTIM_FLAGS) $(DEBUG_FLAGS) -#CXXFLAGS += -std=c++0x +CXXFLAGS += -std=c++0x CXXFLAGS += $(OPTIM_FLAGS) $(DEBUG_FLAGS) LDFLAGS += -L $(SIMGRID_INSTALL_DIR)/lib @@ -26,7 +26,8 @@ LINK.o = $(CXX) $(CXXFLAGS) $(LDFLAGS) $(TARGET_ARCH) LDLIBS := -lsimgrid MAKEDEPEND.FLAGS = $(CPPFLAGS) -MM -MG -MF $@ $< -MAKEDEPEND.CXX = $(CXX) $(MAKEDEPEND.FLAGS) +MAKEDEPEND.C = $(CC) $(CFLAGS) $(MAKEDEPEND.FLAGS) +MAKEDEPEND.CXX = $(CXX) $(CXXFLAGS) $(MAKEDEPEND.FLAGS) SRC.loba := main.cpp \ communicator.cpp \ @@ -73,6 +74,8 @@ realclean: clean $(RM) $(XML_DEV_FILES) $(RM) *~ +.%.d: %.c ; $(MAKEDEPEND.C) + .%.d: %.cpp ; $(MAKEDEPEND.CXX) $(FLAVOURED_LOBA): diff --git a/NOTES b/NOTES index 5399a4c..53bce11 100644 --- a/NOTES +++ b/NOTES @@ -71,3 +71,7 @@ process::expected_load Current load estimation. minus pending sends. * With bookkeeping, it corresponds to the "virtual load". + +process::received_load Real load received from neighbors. + Used when receiving data messages, and then + added to real_load. 
diff --git a/README b/README index 694f795..8fcbc7c 100644 --- a/README +++ b/README @@ -199,9 +199,15 @@ Liste de fichiers main.cpp le programme principal + message.h file de messages reçus + message.cpp + misc.h divers trucs inclassables misc.cpp + msg_thread.h création de threads SG/MSG + msg_thread.cpp + named_object_list.h gestion d'une table de constructeurs avec des noms et des descriptions @@ -216,10 +222,16 @@ Liste de fichiers simgrid_features.h macros pour détecter la version de SimGrid - simple_async.cpp un simple programme de test + statistics.h pour calculer moyenne, variance, etc. + + synchro.h mutex, condition, etc. + + sync_queue.h lock-free synchronized queue timer.h gestion de timer + tracing.h définitions liées au traçage + version.h gestion de la version du programme version.cpp @@ -231,11 +243,14 @@ Liste de fichiers extract.pl outil d'extraction des données à partir des traces, pour tracer des courbes + new_loba.sh pour créer le squelette d'un nouvel algo + d'équilibrage loba_* + setlocalversion calcule un numéro de version à partir du hash du dernier commit (git) * autres fichiers .gitignore liste des fichiers ignorés par git - valgrind_suppressions_3.5 liste de quelques suppressions pour valgrind + valgrind_suppressions liste de quelques suppressions pour valgrind avec SimGrid 3.5 diff --git a/communicator.cpp b/communicator.cpp index 867d19e..2ddea5d 100644 --- a/communicator.cpp +++ b/communicator.cpp @@ -1,5 +1,5 @@ #include -#include +#include #include #include @@ -24,9 +24,8 @@ namespace { communicator::communicator() : host(static_cast(MSG_host_get_data(MSG_host_self()))) { - using std::tr1::bind; receiver_thread = new_msg_thread("receiver", - bind(&communicator::receiver, this)); + std::bind(&communicator::receiver, this)); receiver_thread->start(); } @@ -56,7 +55,7 @@ msg_comm_t communicator::real_send(const char* dest, message* msg) XBT_DEBUG("send %s to %s", msg->to_string().c_str(), dest); m_task_t task = 
MSG_task_create("message", 0.0, msg->get_size(), msg); TRACE_msg_set_task_category(task, - msg->get_type() == message::LOAD ? + msg->get_type() == message::DATA ? TRACE_CAT_DATA : TRACE_CAT_CTRL); return MSG_task_isend(task, dest); } diff --git a/cost_func.cpp b/cost_func.cpp index 12c20f6..d6cba8c 100644 --- a/cost_func.cpp +++ b/cost_func.cpp @@ -1,5 +1,5 @@ #include -#include +#include #include #include #include @@ -30,13 +30,12 @@ cost_func::~cost_func() double cost_func::operator()(double amount) const { - using std::tr1::bind; - using std::tr1::placeholders::_1; - using std::tr1::placeholders::_2; + using std::placeholders::_1; + using std::placeholders::_2; return std::accumulate(++factors.begin(), factors.end(), factors.front(), - bind(std::plus(), - bind(std::multiplies(), amount, _1), - _2)); + std::bind(std::plus(), + std::bind(std::multiplies(), + amount, _1), _2)); } std::string cost_func::to_string() diff --git a/deployment.cpp b/deployment.cpp index 705c1ea..8f52806 100644 --- a/deployment.cpp +++ b/deployment.cpp @@ -1,6 +1,6 @@ #include #include -#include +#include #include #include #include @@ -50,8 +50,7 @@ void deployment_generator::set_link(int host1, int host2) void deployment_generator::distribute_load() { - using std::tr1::bind; - using std::tr1::placeholders::_1; + using std::placeholders::_1; if (!opt::auto_depl::random_distribution) { set_load(0, opt::auto_depl::load); @@ -63,7 +62,7 @@ void deployment_generator::distribute_load() double factor = opt::auto_depl::load / std::accumulate(loads.begin(), loads.end(), 0.0); std::transform(loads.begin(), loads.end(), loads.begin(), - bind(std::multiplies(), _1, factor)); + std::bind(std::multiplies(), _1, factor)); for (unsigned i = 0 ; i < hosts.size() ; ++i) set_load(i, loads[i]); } diff --git a/hostdata.cpp b/hostdata.cpp index 17d7a08..ee8fadc 100644 --- a/hostdata.cpp +++ b/hostdata.cpp @@ -1,6 +1,6 @@ #include #include -#include +#include #include #include #include @@ -16,18 +16,18 @@ 
std::vector hostdata::hosts; void hostdata::create() { - using std::tr1::bind; - using std::tr1::placeholders::_1; - using std::tr1::placeholders::_2; + using std::placeholders::_1; + using std::placeholders::_2; int nhosts = MSG_get_host_number(); m_host_t* host_list = MSG_get_host_table(); // only sort hosts for automatically created deployment if (opt::auto_depl::enabled) std::sort(host_list, host_list + nhosts, - bind(std::less(), bind(strcmp, - bind(MSG_host_get_name, _1), - bind(MSG_host_get_name, _2)), 0)); + std::bind(std::less(), + std::bind(strcmp, + std::bind(MSG_host_get_name, _1), + std::bind(MSG_host_get_name, _2)), 0)); hosts.assign(host_list, host_list + nhosts); xbt_free(host_list); diff --git a/main.cpp b/main.cpp index a88378d..ed8039e 100644 --- a/main.cpp +++ b/main.cpp @@ -1,7 +1,7 @@ #include +#include #include // strchr #include -#include #include #include #include @@ -38,7 +38,9 @@ namespace { EXIT_FAILURE_INIT = 0x02, // failed to initialize simulator EXIT_FAILURE_SIMU = 0x04, // simulation failed EXIT_FAILURE_CLEAN = 0x08, // error at cleanup - EXIT_FAILURE_OTHER = 0x10, // other error + EXIT_FAILURE_INTR = 0x10, // interrupted by user + EXIT_FAILURE_LOAD = 0x20, // lost load on exit + EXIT_FAILURE_OTHER = 0x40, // other error }; // Cannot be globally initialized... @@ -106,32 +108,38 @@ static int simulation_main(int argc, char* argv[]) return result; } -static void check_for_lost_load() +static bool check_for_lost_load() { + bool res = true; double total_init = process::get_total_load_init(); double total_exit = process::get_total_load_exit(); double lost = total_init - total_exit; double lost_ratio = 100.0 * lost / total_init; - if (lost_ratio < -opt::load_ratio_threshold) + if (lost_ratio < -opt::load_ratio_threshold) { XBT_ERROR("Gained load at exit! 
%g (%g%%) <============", -lost, -lost_ratio); - else if (lost_ratio > opt::load_ratio_threshold) + res = false; + } else if (lost_ratio > opt::load_ratio_threshold) { XBT_ERROR("Lost load at exit! %g (%g%%) <============", lost, lost_ratio); - else + res = false; + } else XBT_VERB("Total load at exit looks good: %g (%g%%)", lost, lost_ratio); double total_running = process::get_total_load_running(); double running_ratio = 100.0 * total_running / total_init; - if (running_ratio < -opt::load_ratio_threshold) + if (running_ratio < -opt::load_ratio_threshold) { XBT_ERROR("Negative running load at exit! %g (%g%%) <============", total_running, running_ratio); - else if (running_ratio > opt::load_ratio_threshold) + res = false; + } else if (running_ratio > opt::load_ratio_threshold) { XBT_ERROR("Remaining running load at exit! %g (%g%%) <============", total_running, running_ratio); - else + res = false; + } else XBT_VERB("Running load at exit looks good: %g (%g%%)", total_running, running_ratio); + return res; } static void signal_handler(int /*sig*/) @@ -140,7 +148,12 @@ static void signal_handler(int /*sig*/) XBT_CRITICAL(">>>>>>>>>>" " caught CTRL-C: global exit requested " "<<<<<<<<<<"); - opt::exit_request = true; + opt::exit_request = 1; + } else { + XBT_CRITICAL(">>>>>>>>>>" + " caught CTRL-C for the 2nd time: exit immediately " + "<<<<<<<<<<"); + exit(EXIT_FAILURE_INTR); } } @@ -150,7 +163,7 @@ static void install_signal_handler() action.sa_handler = signal_handler; sigemptyset(&action.sa_mask); action.sa_flags = SA_RESTART; - if (sigaction (SIGINT, &action, NULL) == -1) { + if (sigaction(SIGINT, &action, NULL) == -1) { std::cerr << "sigaction: " << strerror(errno) << "\n"; exit(EXIT_FAILURE_OTHER); } @@ -282,7 +295,8 @@ int main(int argc, char* argv[]) if (simulated_time >= 0.0) { simulation_time.stop(); elapsed_time.stop(); - check_for_lost_load(); + if (!check_for_lost_load()) + exit_status |= EXIT_FAILURE_LOAD; XBT_INFO(",----[ Results ]"); PR_STATS("Load", 
loads); PR_STATS("Computation", comps); diff --git a/messages.cpp b/messages.cpp index f5df396..fd19f6b 100644 --- a/messages.cpp +++ b/messages.cpp @@ -8,61 +8,75 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(comm); #include "messages.h" +message::message(message_type t, double a, double c) + : type(t), amount(a) , credit(c) +{ + // compute message size + // arbitrary: 8 for type, and 8 for each double + switch (type) { + case CTRL: + size = opt::bookkeeping ? 24 : 16; // type + amount + (credit)? + break; + case DATA: + size = 16 + opt::comm_cost(amount); // type + amount + data size + break; + default: + size = 8; // type + break; + } +} + std::string message::to_string() { - static const char* str[] = { "INFO", "CREDIT", "LOAD", - "CTRL_CLOSE", "DATA_CLOSE" }; + static const char* str[DATA_CLOSE + 1] = { "CTRL", "DATA", + "CTRL_CLOSE", "DATA_CLOSE" }; std::ostringstream oss; oss << str[type] << ": " << amount; return oss.str(); } -double message::get_size() const -{ - // arbitrary: 8 for type, and 8 for amount - double size = 16; - if (type == LOAD) - size += opt::comm_cost(amount); - return size; -} - void message_queue::push(m_task_t task) { - mutex.acquire(); - queue.push(task); - cond.signal(); - mutex.release(); + if (queue.push(task)) { + // list was empty, the push must be signaled + mutex.acquire(); + cond.signal(); + mutex.release(); + } } bool message_queue::pop(message*& msg, m_host_t& from, double timeout) { - if (timeout != 0) { - volatile double deadline = - timeout > 0 ? 
MSG_get_clock() + timeout : 0.0; + m_task_t task; + if (!queue.try_pop(task)) { + if (timeout == 0.0) + return false; + mutex.acquire(); - while (queue.empty() && (!deadline || deadline > MSG_get_clock())) { + if (!queue.try_pop(task)) { xbt_ex_t e; XBT_DEBUG("waiting for a message to come"); TRY { - if (deadline) - cond.timedwait(mutex, deadline - MSG_get_clock()); + if (timeout > 0) + cond.timedwait(mutex, timeout); else cond.wait(mutex); } + TRY_CLEANUP { + mutex.release(); + } CATCH (e) { if (e.category != timeout_error) RETHROW; xbt_ex_free(e); + return false; // got a timeout } + bool pop_was_successful = queue.try_pop(task); + xbt_assert(pop_was_successful); + } else { + mutex.release(); } - mutex.release(); } - - if (queue.empty()) - return false; - - m_task_t task = queue.front(); - queue.pop(); msg = static_cast(MSG_task_get_data(task)); from = MSG_task_get_source(task); MSG_task_destroy(task); diff --git a/messages.h b/messages.h index 605b3be..1886301 100644 --- a/messages.h +++ b/messages.h @@ -5,22 +5,26 @@ #include #include #include "synchro.h" +#include "sync_queue.h" class message { public: - enum message_type { INFO, CREDIT, LOAD, CTRL_CLOSE, DATA_CLOSE }; + enum message_type { CTRL, DATA, CTRL_CLOSE, DATA_CLOSE }; - message(message_type t, double a): type(t), amount(a) { } + message(message_type t, double a, double c = 0.0); message_type get_type() const { return type; } double get_amount() const { return amount; } - double get_size() const; + double get_credit() const { return credit; } + double get_size() const { return size; } std::string to_string(); private: message_type type; double amount; + double credit; + double size; }; class message_queue { @@ -39,7 +43,7 @@ public: private: mutex_t mutex; condition_t cond; - std::queue queue; + sync_queue queue; }; #endif // !MESSAGES_H diff --git a/options.cpp b/options.cpp index 3e0f92d..3eed8a2 100644 --- a/options.cpp +++ b/options.cpp @@ -39,7 +39,7 @@ namespace opt { // Simulation parameters 
int log_rate = 1; - bool exit_request = false; + volatile std::sig_atomic_t exit_request = 0; // Platform and deployment std::string platform_file; diff --git a/options.h b/options.h index 604d178..0c69e11 100644 --- a/options.h +++ b/options.h @@ -1,6 +1,7 @@ #ifndef OPTIONS_H #define OPTIONS_H +#include // std::sig_atomic_t #include #include "cost_func.h" #include "named_object_list.h" @@ -23,7 +24,7 @@ namespace opt { // Simulation parameters extern int log_rate; - extern bool exit_request; + extern volatile std::sig_atomic_t exit_request; // Platform and deployment extern std::string platform_file; diff --git a/process.cpp b/process.cpp index 54fea8a..2de3016 100644 --- a/process.cpp +++ b/process.cpp @@ -1,5 +1,5 @@ #include -#include +#include #include #include #include @@ -50,6 +50,7 @@ process::process(int argc, char* argv[]) expected_load = real_load; total_load_running += real_load; total_load_init += real_load; + received_load = 0.0; ctrl_close_pending = data_close_pending = neigh.size(); close_received = false; @@ -58,8 +59,7 @@ process::process(int argc, char* argv[]) comp_iter = lb_iter = 0; lb_thread = new_msg_thread("loba", - std::tr1::bind(&process::load_balance_loop, - this)); + std::bind(&process::load_balance_loop, this)); e_xbt_log_priority_t logp = xbt_log_priority_verbose; if (!LOG_ISENABLED(logp)) @@ -70,7 +70,7 @@ process::process(int argc, char* argv[]) oss << ESSE(neigh.size()) << ": "; std::transform(neigh.begin(), neigh.end() - 1, std::ostream_iterator(oss, ", "), - std::tr1::mem_fn(&neighbor::get_name)); + std::mem_fn(&neighbor::get_name)); oss << neigh.back().get_name(); } XBT_LOG(logp, "Got %s.", oss.str().c_str()); @@ -81,6 +81,8 @@ process::~process() { delete lb_thread; total_load_exit += real_load; + xbt_assert(received_load == 0.0, + "received_load is %g, but should be 0.0 !", received_load); if (opt::log_rate < 0) return; XBT_INFO("Final load after %d:%d iterations: %g", @@ -112,8 +114,7 @@ int process::run() void 
process::load_balance_loop() { - using std::tr1::bind; - using std::tr1::placeholders::_1; + using std::placeholders::_1; double next_iter_after_date = MSG_get_clock() + opt::min_lb_iter_duration; while (still_running()) { @@ -144,7 +145,7 @@ void process::load_balance_loop() // send comm.ctrl_flush(false); std::for_each(neigh.begin(), neigh.end(), - bind(&process::ctrl_send, this, _1)); + std::bind(&process::ctrl_send, this, _1)); prev_load_broadcast = expected_load; mutex.release(); @@ -156,10 +157,10 @@ void process::load_balance_loop() XBT_DEBUG("send CTRL_CLOSE to %zu neighbor%s", neigh.size(), ESSE(neigh.size())); std::for_each(neigh.begin(), neigh.end(), - bind(&process::ctrl_close, this, _1)); + std::bind(&process::ctrl_close, this, _1)); while (ctrl_close_pending) { comm.ctrl_flush(false); - XBT_DEBUG("waiting for %d CTRL CLOSE", ctrl_close_pending); + XBT_DEBUG("waiting for %d CTRL_CLOSE", ctrl_close_pending); ctrl_receive(-1.0); } comm.ctrl_flush(true); @@ -167,24 +168,20 @@ void process::load_balance_loop() void process::compute_loop() { - using std::tr1::bind; - using std::tr1::placeholders::_1; + using std::placeholders::_1; double next_iter_after_date = MSG_get_clock() + opt::min_comp_iter_duration; while (still_running()) { - // receive - mutex.acquire(); - if (real_load > 0.0) - data_receive(0.0); - else - data_receive(opt::min_comp_iter_duration); - mutex.release(); + // receive (do not block if there is something to compute) + data_receive(real_load > 0.0 ? 
0.0 : opt::min_comp_iter_duration); // send comm.data_flush(false); mutex.acquire(); + real_load += received_load; + received_load = 0.0; std::for_each(neigh.begin(), neigh.end(), - bind(&process::data_send, this, _1)); + std::bind(&process::data_send, this, _1)); mutex.release(); if (real_load == 0.0) @@ -204,20 +201,19 @@ void process::compute_loop() } XBT_VERB("Going to finalize for %s...", __func__); - // last send, for not losing load scheduled to be sent - std::for_each(neigh.begin(), neigh.end(), - bind(&process::data_send, this, _1)); finalizing = true; - total_load_running -= real_load; XBT_DEBUG("send DATA_CLOSE to %zu neighbor%s", neigh.size(), ESSE(neigh.size())); std::for_each(neigh.begin(), neigh.end(), - bind(&process::data_close, this, _1)); + std::bind(&process::data_close, this, _1)); while (data_close_pending) { comm.data_flush(false); - XBT_DEBUG("waiting for %d DATA CLOSE", data_close_pending); + XBT_DEBUG("waiting for %d DATA_CLOSE", data_close_pending); data_receive(-1.0); } + real_load += received_load; + received_load = 0.0; + total_load_running -= real_load; comm.data_flush(true); } @@ -265,13 +261,12 @@ bool process::still_running() double process::get_sum_of_to_send() const { - using std::tr1::bind; - using std::tr1::placeholders::_1; - using std::tr1::placeholders::_2; + using std::placeholders::_1; + using std::placeholders::_2; return std::accumulate(neigh.begin(), neigh.end(), 0.0, - bind(std::plus(), - _1, bind(&neighbor::get_to_send, _2))); + std::bind(std::plus(), _1, + std::bind(&neighbor::get_to_send, _2))); } void process::load_balance() @@ -290,20 +285,20 @@ void process::send(neighbor& nb, double amount) void process::ctrl_send(neighbor& nb) { double info_to_send = expected_load; - if (info_to_send != prev_load_broadcast) { - message* msg = new message(message::INFO, info_to_send); - add_ctrl_send_mesg(msg->get_size()); - comm.ctrl_send(nb.get_ctrl_mbox(), msg); - } - if (opt::bookkeeping) { - double debt_to_send = 
nb.get_to_send(); + double debt_to_send; + if (opt::bookkeeping) { // bookkeeping + debt_to_send = nb.get_to_send(); if (debt_to_send > 0.0) { nb.set_to_send(0.0); nb.set_debt(nb.get_debt() + debt_to_send); - message* msg = new message(message::CREDIT, debt_to_send); - add_ctrl_send_mesg(msg->get_size()); - comm.ctrl_send(nb.get_ctrl_mbox(), msg); } + } else { // !bookkeeping + debt_to_send = 0.0; + } + if (info_to_send != prev_load_broadcast || debt_to_send > 0.0) { + message* msg = new message(message::CTRL, info_to_send, debt_to_send); + add_ctrl_send_mesg(msg->get_size()); + comm.ctrl_send(nb.get_ctrl_mbox(), msg); } } @@ -333,7 +328,7 @@ void process::data_send(neighbor& nb) amount = std::min(load_to_send, opt::max_transfer_amount); else amount = load_to_send; - message* msg = new message(message::LOAD, amount); + message* msg = new message(message::DATA, amount); add_data_send_mesg(msg->get_size()); comm.data_send(nb.get_data_mbox(), msg); load_to_send -= amount; @@ -381,19 +376,15 @@ void process::data_receive(double timeout) void process::handle_message(message* msg, m_host_t from) { switch (msg->get_type()) { - case message::INFO: { + case message::CTRL: { neighbor* n = rev_neigh[from]; n->set_load(msg->get_amount() + n->get_to_send()); + expected_load += msg->get_credit(); // may be 0.0 if !opt::bookkeeping break; } - case message::CREDIT: - expected_load += msg->get_amount(); - break; - case message::LOAD: { + case message::DATA: { double ld = msg->get_amount(); - real_load += ld; - if (finalizing) - total_load_running -= ld; + received_load += ld; break; } case message::CTRL_CLOSE: @@ -410,12 +401,11 @@ void process::handle_message(message* msg, m_host_t from) #define print_loads_generic(vec, verbose, logp, cat) \ if (_XBT_LOG_ISENABLEDV((*cat), logp)) { \ - using std::tr1::bind; \ - using std::tr1::placeholders::_1; \ + using std::placeholders::_1; \ XBT_XCLOG(cat, logp, "My load: %g (real); %g (expected). 
" \ "Neighbor loads:", real_load, expected_load); \ std::for_each(vec.begin(), vec.end(), \ - bind(&neighbor::print, _1, verbose, logp, cat)); \ + std::bind(&neighbor::print, _1, verbose, logp, cat)); \ } else ((void)0) void process::print_loads(bool verbose, diff --git a/process.h b/process.h index ed6e6cf..57ece07 100644 --- a/process.h +++ b/process.h @@ -5,10 +5,10 @@ //#undef USE_UNORDERED_MAP #include -#include +#include #ifdef USE_UNORDERED_MAP -# include -# define MAP_TEMPLATE std::tr1::unordered_map +# include +# define MAP_TEMPLATE std::unordered_map #else # include # define MAP_TEMPLATE std::map @@ -102,6 +102,7 @@ private: // a same information messages double real_load; // current load double expected_load; // expected load in bookkeeping mode + double received_load; // load received from neighbors mutex_t mutex; // synchronization between threads condition_t cond; @@ -169,13 +170,12 @@ private: template void process::pneigh_sort_by_load(const Compare& comp) { - using std::tr1::bind; - using std::tr1::placeholders::_1; - using std::tr1::placeholders::_2; + using std::placeholders::_1; + using std::placeholders::_2; std::sort(pneigh.begin(), pneigh.end(), - bind(comp, - bind(&neighbor::get_load, _1), - bind(&neighbor::get_load, _2))); + std::bind(comp, + std::bind(&neighbor::get_load, _1), + std::bind(&neighbor::get_load, _2))); } #endif // !PROCESS_H diff --git a/sync_queue.h b/sync_queue.h new file mode 100644 index 0000000..c3f29e5 --- /dev/null +++ b/sync_queue.h @@ -0,0 +1,112 @@ +#ifndef SYNC_QUEUE_H +#define SYNC_QUEUE_H + +#if __GNUC__ == 4 && __GNUC_MINOR__ == 4 +# include // is named in gcc 4.4 + +template // fix missing definition in gcc 4.4 +void +atomic<_Tp*>::store(_Tp* __v, memory_order __m) volatile +{ atomic_address::store(__v, __m); } + +#else +# include +#endif + +#define SYNC_QUEUE_BUFSIZE 16 + +template +class sync_queue { +public: + sync_queue() + { + head_node = tail_node = new node(); + head.store(head_node->values); + 
tail.store(tail_node->values); + } + + ~sync_queue() + { + node* n = head_node; + while (n != NULL) { + node* prev = n; + n = n->next; + delete prev; + } + } + + bool empty() const + { + return head.load() == tail.load(); + } + + // size() is not not thread-safe + size_t size() const + { + size_t count = 0; + if (head_node == tail_node) { + count = tail.load() - head.load(); + } else { + count = + (head_node->values + (SYNC_QUEUE_BUFSIZE - 1)) - head.load(); + for (node* n = head_node->next; n != tail_node; n = n->next) + count += SYNC_QUEUE_BUFSIZE; + count += tail.load() - tail_node->values; + } + return count; + } + + bool push(const T& val) + { + T* old_tail = tail.load(); + T* new_tail; + if (old_tail == tail_node->values + (SYNC_QUEUE_BUFSIZE - 1)) { + tail_node->next = new node(); + tail_node = tail_node->next; + new_tail = tail_node->values; + } else { + new_tail = old_tail + 1; + } + *new_tail = val; + tail.store(new_tail); + return (old_tail == head.load()); + } + + bool try_pop(T& res) + { + T* old_head = head.load(); + if (old_head == tail.load()) // empty? + return false; + + T* new_head; + if (old_head == head_node->values + (SYNC_QUEUE_BUFSIZE - 1)) { + node* old_head_node = head_node; + head_node = head_node->next; + delete old_head_node; + new_head = head_node->values; + } else { + new_head = old_head + 1; + } + res = *new_head; + head.store(new_head); + return true; + } + +private: + struct node { + node(): next(NULL) { } + T values[SYNC_QUEUE_BUFSIZE]; + node* next; + }; + + node* head_node; + node* tail_node; + std::atomic head; + std::atomic tail; +}; + +#endif // !SYNC_QUEUE_H + +// Local variables: +// mode: c++ +// End: