From: Raphael Couturier Date: Sun, 30 Jan 2011 13:35:19 +0000 (+0100) Subject: merge de loba_fairstrategy X-Git-Tag: v0.1~187^2 X-Git-Url: https://bilbo.iut-bm.univ-fcomte.fr/and/gitweb/loba.git/commitdiff_plain/d4fbe175e440c3e9de9b5468b85bf56daab62bff?hp=19bd6da8ab56b6bf46c380491ca84fe845a68fcd merge de loba_fairstrategy Conflicts: loba_fairstrategy.cpp --- diff --git a/.gitignore b/.gitignore index 1b9d7b7..9dea7c1 100644 --- a/.gitignore +++ b/.gitignore @@ -2,7 +2,7 @@ *.d *.o -cachegrind.out.* +callgrind.out.* gmon.out core @@ -12,8 +12,12 @@ localversion misc_autogen.h loba +loba-dev +loba-stable simple_async +*_dev.xml + simgrid-dev simgrid-stable */ diff --git a/BUGS b/BUGS new file mode 100644 index 0000000..5cc1040 --- /dev/null +++ b/BUGS @@ -0,0 +1,17 @@ +======================================================================== +Il semblerait qu'il y ait un bug dans SG 3.5, et qu'on ne puisse pas +utiliser MSG_comm_waitany() pour l'émetteur *et* le récepteur sans +risquer d'interblocage. + +Le problème devrait être contourné correctement depuis le commit +cd6b253 Use MSG_comm_waitall for communicator::flush(true). + +======================================================================== +Avec SG 3.5, les communications doivent être détruites dès que +possible avec MSG_comm_destroy(). Si ce n'est pas fait, la simulation +peut être extrêmement ralentie. + +Le problème devrait être contourné correctement depuis le commit +404a8d5 Do not call flush automatically in communcator::send... 
+ +======================================================================== diff --git a/Makefile b/Makefile index a80f025..2760531 100644 --- a/Makefile +++ b/Makefile @@ -6,11 +6,16 @@ DEBUG_FLAGS += -g #DEBUG_FLAGS += -pg CHECK_FLAGS += -Wall -Wextra +CC := gcc CXX := g++ CPPFLAGS += -I $(SIMGRID_INSTALL_DIR)/include CPPFLAGS += $(CHECK_FLAGS) +#CFLAGS += -std=c99 +#CFLAGS += -fgnu89-inline # workaround simgrid bug +CFLAGS += $(OPTIM_FLAGS) $(DEBUG_FLAGS) + #CXXFLAGS += -std=c++0x CXXFLAGS += $(OPTIM_FLAGS) $(DEBUG_FLAGS) @@ -46,13 +51,25 @@ FLAVOURED_LOBA := loba-dev loba-stable TARGETS := $(DEFAULT_TARGETS) \ simple_async -.PHONY: all full clean realclean $(FLAVOURED_LOBA) +XML_FILES = \ + Dep.xml Plat.xml \ + platform.xml deployment.xml simple_async.xml \ + cluster1000.xml + +XML_DEV_FILES = $(XML_FILES:%.xml=%_dev.xml) + +.PHONY: all full xml clean realclean $(FLAVOURED_LOBA) all: $(DEFAULT_TARGETS) full: - $(MAKE) $(FLAVOURED_LOBA) - $(MAKE) $(TARGETS) + @for target in $(FLAVOURED_LOBA); do \ + echo $(MAKE) "$$target"; \ + $(MAKE) "$$target"; \ + done + $(MAKE) xml $(DEFAULT_TARGETS) + +xml: $(XML_DEV_FILES) clean: $(RM) core core.[0-9]* vgcore.[0-9]* @@ -64,10 +81,14 @@ clean: realclean: clean $(RM) $(FLAVOURED_LOBA) + $(RM) $(XML_DEV_FILES) $(RM) *~ %.d: %.cpp ; $(MAKEDEPEND.CXX) +%_dev.xml: %.xml + sed '/DOCTYPE/s,simgrid.dtd,http://simgrid.gforge.inria.fr/&,' $< > $@ + $(FLAVOURED_LOBA): $(MAKE) clean $(MAKE) SIMGRID_INSTALL_DIR=./simgrid-$(patsubst loba-%,%,$@) loba diff --git a/README b/README index d34cb4b..bf2d915 100644 --- a/README +++ b/README @@ -3,6 +3,7 @@ Contenu * Compilation de SimGrid * Compilation... 
* Utilisation +* Tracé de courbes * Communications * Pour ajouter un nouvel algorithme d'équilibrage * Pour ajouter une nouvelle option au programme @@ -45,6 +46,16 @@ Pour changer le niveau de détail des affichages : Pour plus de détail sur les options de logging : http://simgrid.gforge.inria.fr/doc/group__XBT__log.html#log_use +Tracé de courbes +================ + +Le script extract.pl permet d'extraire les données à partir des traces +de simulation et de les présenter sous un format acceptable par gnuplot +ou par graph (plotutils). + +Exemple: + ./loba platform.xml 2>&1 | ./extract.pl | graph -CTX + Communications ============== @@ -81,15 +92,12 @@ Pour ajouter un nouvel algorithme d'équilibrage - définir une nouvelle classe dérivant de process - attention, il faut construire le process explicitement - redéfinir la méthode load_balance qui : - - reçoit en paramètre la charge à prendre en compte ; + - peut récupérer la charge courante avec get_load() ; - peut utiliser et éventuellement réordonner le tableau process::pneigh ; - peut récupérer l'information de charge d'un voisin avec pneigh[i]->get_load() ; - définit la charge à envoyer avec - pneigh[i]->set_to_send(quantité) ; - - retourne la somme des quantités définies avec set_to_send, - éventuellement à l'aide de la méthode process::sum_of_to_send() - qui clacule cette somme. + send(pneigh[i], quantité) ; 2. Ajouter l'algorithme dans la liste des options. Dans options.cpp : - faire le #include adéquat ; @@ -141,6 +149,8 @@ Liste de fichiers loba_simple.h équilibrage simple loba_simple.cpp (à imiter pour ajouter d'autres algorithmes) + loba_*.{h,cpp} autres algos d'équilibrage + main.cpp le programme principal misc.h divers trucs inclassables @@ -167,6 +177,21 @@ Liste de fichiers version.h gestion de la version du programme version.cpp +* fichiers auto-générés + + misc_autogen.h définition des macros XCLOG(...)
+ +* scripts + + colorized-loba script pour exécuter loba en colorant les + sorties + + extract.pl outil d'extraction des données à partir des + traces, pour tracer des courbes + + setlocalversion calcule un numéro de version à partir du hash + du dernier commit (git) + * autres fichiers .gitignore liste des fichiers ignorés par git diff --git a/TODO b/TODO index 6fedf48..dd423e2 100644 --- a/TODO +++ b/TODO @@ -1,8 +1,13 @@ +* review receive with timeout. + * verify bookkeeping version. -* add options -j/-J : minimum number of iterations ? +* add several metrics + - message exchanges : number/volume of sent/received data/ctrl messages + +* add options -j/-J : minimum number of iterations? -* add a variant to (not) change neighbor load information at send. +* add a variant to (not) change neighbor load information at send? * implement loba_* algorithms (start with some trivial one) diff --git a/communicator.cpp b/communicator.cpp index 0f9bc7e..e30d731 100644 --- a/communicator.cpp +++ b/communicator.cpp @@ -9,6 +9,8 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(comm); #include "misc.h" #include "options.h" +#include "simgrid_features.h" +#include "tracing.h" #include "communicator.h" @@ -21,13 +23,10 @@ std::string message::to_string() return oss.str(); } -int communicator::send_count_before_flush = 4; - communicator::communicator() - : host((hostdata* )MSG_host_get_data(MSG_host_self())) + : host(static_cast(MSG_host_get_data(MSG_host_self()))) , mutex(xbt_mutex_init()) , cond(xbt_cond_init()) - , send_counter(0) , ctrl_task(NULL) , ctrl_comm(NULL) , data_task(NULL) @@ -37,6 +36,7 @@ communicator::communicator() receiver_process = MSG_process_create("receiver", communicator::receiver_wrapper, this, MSG_host_self()); + xbt_cond_wait(cond, mutex); // wait for the receiver to be ready xbt_mutex_release(mutex); } @@ -81,22 +81,33 @@ void communicator::send(const char* dest, message* msg) if (msg->get_type() == message::LOAD) msg_size += opt::comm_cost(msg->get_amount()); 
m_task_t task = MSG_task_create("message", 0.0, msg_size, msg); + TRACE_msg_set_task_category(task, + msg->get_type() == message::LOAD ? + TRACE_CAT_DATA : TRACE_CAT_CTRL); msg_comm_t comm = MSG_task_isend(task, dest); sent_comm.push_back(comm); - - if (++send_counter >= send_count_before_flush) { - flush(false); - send_counter = 0; - } } -bool communicator::recv(message*& msg, m_host_t& from, bool wait) +bool communicator::recv(message*& msg, m_host_t& from, double timeout) { - if (wait) { + if (timeout != 0) { + volatile double deadline = + timeout > 0 ? MSG_get_clock() + timeout : 0.0; xbt_mutex_acquire(mutex); - while (received.empty()) { + while (received.empty() && (!deadline || deadline > MSG_get_clock())) { + xbt_ex_t e; DEBUG0("waiting for a message to come"); - xbt_cond_wait(cond, mutex); + TRY { + if (deadline) + xbt_cond_timedwait(cond, mutex, deadline - MSG_get_clock()); + else + xbt_cond_wait(cond, mutex); + } + CATCH (e) { + if (e.category != timeout_error) + RETHROW; + xbt_ex_free(e); + } } xbt_mutex_release(mutex); } @@ -106,7 +117,7 @@ bool communicator::recv(message*& msg, m_host_t& from, bool wait) m_task_t task = received.front(); received.pop(); - msg = (message* )MSG_task_get_data(task); + msg = static_cast(MSG_task_get_data(task)); from = MSG_task_get_source(task); MSG_task_destroy(task); @@ -118,21 +129,14 @@ bool communicator::recv(message*& msg, m_host_t& from, bool wait) void communicator::flush(bool wait) { - using std::tr1::bind; - using std::tr1::placeholders::_1; - sent_comm.remove_if(comm_test_n_destroy); if (wait && !sent_comm.empty()) { - xbt_dynar_t comms = xbt_dynar_new(sizeof(msg_comm_t), NULL); - while (!sent_comm.empty()) { - std::for_each(sent_comm.begin(), sent_comm.end(), - bind(xbt_dynar_push, - comms, bind(misc::address(), _1))); - MSG_comm_waitany(comms); - xbt_dynar_reset(comms); - sent_comm.remove_if(comm_test_n_destroy); - } - xbt_dynar_free(&comms); + msg_comm_t comms[sent_comm.size()]; + 
std::copy(sent_comm.begin(), sent_comm.end(), comms); + MSG_comm_waitall(comms, sent_comm.size(), -1.0); + if (!MSG_WAIT_DESTROYS_COMMS) + std::for_each(sent_comm.begin(), sent_comm.end(), MSG_comm_destroy); + sent_comm.clear(); } } @@ -148,7 +152,7 @@ bool communicator::comm_test_n_destroy(msg_comm_t comm) int communicator::receiver_wrapper(int, char* []) { communicator* comm; - comm = (communicator* )MSG_process_get_data(MSG_process_self()); + comm = static_cast(MSG_process_get_data(MSG_process_self())); int result = comm->receiver(); DEBUG0("terminate"); @@ -164,6 +168,11 @@ int communicator::receiver() { ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox()); data_comm = MSG_task_irecv(&data_task, get_data_mbox()); + DEBUG0("receiver ready"); + xbt_mutex_acquire(mutex); + xbt_cond_signal(cond); // signal master that we are ready + xbt_mutex_release(mutex); + xbt_dynar_t comms = xbt_dynar_new(sizeof(msg_comm_t), NULL); while (ctrl_comm || data_comm) { @@ -177,7 +186,9 @@ int communicator::receiver() if (ctrl_comm && comm_test_n_destroy(ctrl_comm)) { if (strcmp(MSG_task_get_name(ctrl_task), "finalize")) { DEBUG0("received message from ctrl"); + xbt_mutex_acquire(mutex); received.push(ctrl_task); + xbt_mutex_release(mutex); ctrl_task = NULL; ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox()); } else { @@ -191,7 +202,9 @@ int communicator::receiver() if (data_comm && comm_test_n_destroy(data_comm)) { if (strcmp(MSG_task_get_name(data_task), "finalize")) { DEBUG0("received message from data"); + xbt_mutex_acquire(mutex); received.push(data_task); + xbt_mutex_release(mutex); data_task = NULL; data_comm = MSG_task_irecv(&data_task, get_data_mbox()); } else { @@ -202,7 +215,8 @@ int communicator::receiver() } } xbt_mutex_acquire(mutex); - xbt_cond_signal(cond); + if (!received.empty()) + xbt_cond_signal(cond); xbt_mutex_release(mutex); } xbt_dynar_free(&comms); diff --git a/communicator.h b/communicator.h index b04301a..66dc800 100644 --- a/communicator.h +++ 
b/communicator.h @@ -9,14 +9,6 @@ #include #include "hostdata.h" -// Cannot include "options.h" without error, so only declare the -// needed functions. -namespace opt { - bool parse_args(int* argc, char* argv[]); - void print(); - void usage(); -} - class message { public: enum message_type { INFO, CREDIT, LOAD, CTRL_CLOSE, DATA_CLOSE }; @@ -42,8 +34,9 @@ public: void send(const char* dest, message* msg); // Try to get a message. Returns true on success. - // If "wait" is true, blocks until success. - bool recv(message*& msg, m_host_t& from, bool wait); + // Parameter "timeout" may be 0 for non-blocking operation, -1 for + // infinite waiting, or any positive timeout. + bool recv(message*& msg, m_host_t& from, double timeout); // Try to flush pending sending communications. // If "wait" is true, blocks until success. @@ -59,8 +52,6 @@ private: // List of pending send communications std::list sent_comm; - static int send_count_before_flush; - int send_counter; // Queue of received messages std::queue received; @@ -83,12 +74,6 @@ private: // Used to test if a communication is over, and to destroy it if it is static bool comm_test_n_destroy(msg_comm_t comm); - - // Make opt::* functions our friends to provide them an access to - // send_count_before_flush - friend bool opt::parse_args(int*, char* []); - friend void opt::print(); - friend void opt::usage(); }; #endif // !COMMUNICATOR_H diff --git a/extract.pl b/extract.pl new file mode 100755 index 0000000..fa393b9 --- /dev/null +++ b/extract.pl @@ -0,0 +1,106 @@ +#!/usr/bin/env perl + +use strict; +use warnings; + +my $bookkeeping; +my $flt = '[+-]?\d+(?:\.\d+)?(?:[eE][+-]?\d+)?'; +my $pflt = "($flt)"; +my $prefix = '^\[([^: ]+)(?::loba:\(\d+\))? ' . $pflt . '\] \[proc/INFO\] '; +my $initmatch = $prefix . 'Initial load: ' . $pflt . 
''; +my $finalmatch; +my $plainmatch; + +my %alldata = (); + +while (<>) { + chomp; + if (s{^(?:\[0\.0+\] )?\[main/INFO\] \| bookkeeping\.*: }{}) { + $bookkeeping = $_ eq "on"; + if ($bookkeeping) { + $finalmatch = $prefix . + 'Final load after (\d+):(\d+) iterations: ' . $pflt . + ' ; expected: ' . $pflt; + $plainmatch = $prefix . + '\((\d+):(\d+)\) current load: ' . $pflt . + ' ; expected: ' . $pflt; + } else { + $finalmatch = $prefix . + 'Final load after (\d+) iterations: ' . $pflt; + $plainmatch = $prefix . '\((\d+)\) current load: ' . $pflt; + } + if (0) { + print STDERR "BOOKKEEPING: \"$_\" ($bookkeeping)\n"; + print STDERR "INITMATCH..: \"$initmatch\"\n"; + print STDERR "PLAINMATCH.: \"$plainmatch\"\n"; + print STDERR "FINALMATCH.: \"$finalmatch\"\n"; + } + } + next if not defined $bookkeeping; + if (m{$plainmatch}) { + my $host = $1; + my $data; + if ($bookkeeping) { + $data = { + time => $2, + lb => $3, + comp => $4, + load => $5, + expected => $6, + }; + } else { + $data = { + time => $2, + lb => $3, + comp => $3, + load => $4, + expected => $4, + }; + } +# print STDERR "PUSH $host $data->{time} $data->{load} (plain)\n"; + push @{$alldata{$host}}, $data; + } if (m{$initmatch}) { + my $host = $1; + my $data = { + time => $2, + lb => 0, + comp => 0, + load => $3, + expected => $3, + }; +# print STDERR "PUSH $host $data->{time} $data->{load} (init)\n"; + push @{$alldata{$host}}, $data; + } elsif (m{$finalmatch}) { + my $host = $1; + my $data; + if ($bookkeeping) { + $data = { + time => $2, + lb => $3, + comp => $4, + load => $5, + expected => $6, + }; + } else { + $data = { + time => $2, + lb => $3, + comp => $3, + load => $4, + expected => $4, + }; + } +# print STDERR "PUSH $host $data->{time} $data->{load} (final)\n"; + push @{$alldata{$host}}, $data; + } +} + +foreach my $host (sort(keys %alldata)) { +# print STDERR "GOT \"$host\"\n"; + my $datalist = $alldata{$host}; + print "# $host\n"; + foreach my $data (@{$datalist}) { + print "$data->{time} 
$data->{load}\n"; + } + print "\n" +} diff --git a/loba_fairstrategy.cpp b/loba_fairstrategy.cpp index e1a0d29..7e02a21 100644 --- a/loba_fairstrategy.cpp +++ b/loba_fairstrategy.cpp @@ -11,39 +11,31 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(loba); */ class compare { -public : - bool operator()(const neighbor*a, const neighbor*b) { - return a->get_load()>b->get_load(); +public: + bool operator()(const neighbor*a, const neighbor*b) { + return a->get_load() > b->get_load(); } }; -double loba_fairstrategy::load_balance(double my_load) +void loba_fairstrategy::load_balance() { std::sort(pneigh.begin(), pneigh.end(), compare()); // print_loads_p(); + //print_loads_p(false, xbt_log_priority_debug); - double sum_sent=0; - bool found=true; - - while(found) { - found=false; + bool found = true; + + while (found) { + found = false; for (unsigned i = 0 ; i < pneigh.size() ; ++i) { - double l = pneigh[i]->get_load(); - if (l >= my_load) - continue; - if (l < my_load+2) { - found=true; - pneigh[i]->add_load(1); - pneigh[i]->add_to_send(1); - INFO1("sent to %s",pneigh[i]->get_name()); - my_load--; - sum_sent++; + if (pneigh[i]->get_load() <= get_load() - 2) { + found = true; + send(pneigh[i], 1); + DEBUG1("sent to %s", pneigh[i]->get_name()); } } } - - return sum_sent; } // Local variables: diff --git a/loba_fairstrategy.h b/loba_fairstrategy.h index 9984c36..3e09627 100644 --- a/loba_fairstrategy.h +++ b/loba_fairstrategy.h @@ -9,7 +9,7 @@ public: ~loba_fairstrategy() { } private: - double load_balance(double my_load); + void load_balance(); }; #endif //!LOBA_SIMPLE diff --git a/loba_simple.cpp b/loba_simple.cpp index 5702fba..2d4103c 100644 --- a/loba_simple.cpp +++ b/loba_simple.cpp @@ -8,15 +8,15 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(loba); * load balance with a least-loaded neighbor, * without breaking the ping-pong condition */ -double loba_simple::load_balance(double my_load) +void loba_simple::load_balance() { int imin = -1; int imax = -1; - double min = my_load; + double min 
= get_load(); double max = -1.0; for (unsigned i = 0 ; i < pneigh.size() ; ++i) { double l = pneigh[i]->get_load(); - if (l >= my_load) + if (l >= get_load()) continue; if (l < min) { imin = i; @@ -29,13 +29,9 @@ double loba_simple::load_balance(double my_load) } if (imin != -1) { // found someone - double balance = (my_load - min) / 2; - DEBUG6("%d:%g %d:%g %g %g", imin, min, imax, max, my_load, balance); - pneigh[imin]->set_to_send(balance); - pneigh[imin]->add_load(balance); - return balance; - } else { - return 0.0; + double balance = (get_load() - max) / 2; + DEBUG6("%d:%g %d:%g %g %g", imin, min, imax, max, get_load(), balance); + send(pneigh[imin], balance); } } diff --git a/loba_simple.h b/loba_simple.h index 1a2fd73..77cd8ad 100644 --- a/loba_simple.h +++ b/loba_simple.h @@ -9,7 +9,7 @@ public: ~loba_simple() { } private: - double load_balance(double my_load); + void load_balance(); }; #endif //!LOBA_SIMPLE diff --git a/main.cpp b/main.cpp index 907869e..fc42d75 100644 --- a/main.cpp +++ b/main.cpp @@ -1,11 +1,6 @@ -#include -#include #include -#include #include -#include #include -#include #include #include @@ -24,7 +19,9 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(main); #include "misc.h" #include "options.h" #include "process.h" +#include "statistics.h" #include "timer.h" +#include "tracing.h" #include "version.h" namespace { @@ -37,9 +34,13 @@ namespace { EXIT_FAILURE_CLEAN = 0x08, // error at cleanup }; - std::vector loads; - double load_stddev; - double load_avg; + xbt_mutex_t proc_mutex; + xbt_cond_t proc_cond; + unsigned proc_counter; + + struct statistics comps; + struct statistics loads; + } static int simulation_main(int argc, char* argv[]) @@ -48,8 +49,28 @@ static int simulation_main(int argc, char* argv[]) process* proc; try { proc = opt::loba_algorithms.new_instance(opt::loba_algo, argc, argv); + + xbt_mutex_acquire(proc_mutex); + ++proc_counter; + xbt_mutex_release(proc_mutex); + result = proc->run(); - loads.push_back(proc->get_load()); + + 
xbt_mutex_acquire(proc_mutex); + comps.push(proc->get_comp()); + loads.push(proc->get_real_load()); + + // Synchronization barrier... + // The goal is to circumvent a limitation in SimGrid (at least + // in version 3.5): a process must be alive when another one + // destroys a communication they had together. + + --proc_counter; + xbt_cond_broadcast(proc_cond); + while (proc_counter > 0) + xbt_cond_wait(proc_cond, proc_mutex); + xbt_mutex_release(proc_mutex); + delete proc; } catch (std::invalid_argument& e) { @@ -67,12 +88,12 @@ static void check_for_lost_load() double lost_ratio = 100.0 * lost / total_init; if (lost_ratio < -opt::load_ratio_threshold) CRITICAL2("Gained load at exit! %g (%g%%) <============", - lost, lost_ratio); + -lost, -lost_ratio); else if (lost_ratio > opt::load_ratio_threshold) CRITICAL2("Lost load at exit! %g (%g%%) <============", lost, lost_ratio); else - DEBUG2("Total load at exit looks good: %g (%g%%)", lost, lost_ratio); + VERB2("Total load at exit looks good: %g (%g%%)", lost, lost_ratio); double total_running = process::get_total_load_running(); double running_ratio = 100.0 * total_running / total_init; @@ -83,27 +104,14 @@ static void check_for_lost_load() CRITICAL2("Remaining running load at exit! 
%g (%g%%) <============", total_running, running_ratio); else - DEBUG2("Running load at exit looks good: %g (%g%%)", + VERB2("Running load at exit looks good: %g (%g%%)", total_running, running_ratio); } -static void compute_load_imbalance() -{ - using std::tr1::bind; - using std::tr1::placeholders::_1; - - unsigned n = loads.size(); - load_avg = std::accumulate(loads.begin(), loads.end(), 0.0) / n; - - std::vector diff(loads); - std::transform(diff.begin(), diff.end(), diff.begin(), - bind(std::minus(), _1, load_avg)); - double epsilon = std::accumulate(diff.begin(), diff.end(), 0.0); - double square_sum = std::inner_product(diff.begin(), diff.end(), - diff.begin(), 0.0); - double variance = (square_sum - (epsilon * epsilon) / n) / n; - load_stddev = sqrt(variance); -} +#define PR_STATS(descr, st) \ + INFO5("| %.*s: %g / %g / %g", 39, \ + descr " total/avg./stddev. at exit.........................", \ + st.get_sum(), st.get_mean(), st.get_stddev()) int main(int argc, char* argv[]) { @@ -175,15 +183,26 @@ int main(int argc, char* argv[]) MSG_launch_application(opt::deployment_file.c_str()); } + // Register tracing categories + TRACE_category(TRACE_CAT_COMP); + TRACE_category(TRACE_CAT_CTRL); + TRACE_category(TRACE_CAT_DATA); + exit_status = EXIT_FAILURE_SIMU; // ===== + proc_mutex = xbt_mutex_init(); + proc_cond = xbt_cond_init(); + proc_counter = 0; + // Launch the MSG simulation. INFO1("Starting simulation at %f...", MSG_get_clock()); res = MSG_main(); simulated_time = MSG_get_clock(); INFO1("Simulation ended at %f.", simulated_time); - check_for_lost_load(); - compute_load_imbalance(); + + xbt_cond_destroy(proc_cond); + xbt_mutex_destroy(proc_mutex); + if (res != MSG_OK) THROW1(0, 0, "MSG_main() failed with status %#x", res); @@ -209,10 +228,13 @@ int main(int argc, char* argv[]) // Report final simulation status. if (simulated_time >= 0.0) { simulation_time.stop(); + check_for_lost_load(); INFO0(",----[ Results ]"); - INFO2("| Load avg./stddev. 
at exit.: %g / %g", load_avg, load_stddev); - INFO1("| Total simulated time......: %g", simulated_time); - INFO1("| Total simulation time.....: %g", simulation_time.duration()); + PR_STATS("Load", loads); + PR_STATS("Computation", comps); + INFO1("| Total simulated time...................: %g", simulated_time); + INFO1("| Total simulation time..................: %g", + simulation_time.duration()); INFO0("`----"); } if (exit_status) diff --git a/neighbor.cpp b/neighbor.cpp index c45fc8f..7523783 100644 --- a/neighbor.cpp +++ b/neighbor.cpp @@ -8,7 +8,7 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(proc); // needed to compile neighbor.h #include "neighbor.h" neighbor::neighbor(const char* hostname) - : host((hostdata* )MSG_host_get_data(MSG_get_host_by_name(hostname))) + : host(static_cast(MSG_host_get_data(MSG_get_host_by_name(hostname)))) , load(std::numeric_limits::infinity()) , debt(0.0) , to_send(0.0) diff --git a/neighbor.h b/neighbor.h index 62b9b66..b51dab1 100644 --- a/neighbor.h +++ b/neighbor.h @@ -18,7 +18,6 @@ public: // Getter and setter for load double get_load() const { return load; } void set_load(double amount) { load = amount; } - void add_load(double amount) { load += amount; } // Getter and setter for debt double get_debt() const { return debt; } @@ -27,7 +26,6 @@ public: // Getter and setter for to_send double get_to_send() const { return to_send; } void set_to_send(double amount) { to_send = amount; } - void add_to_send(double amount) { to_send += amount; } // Prints its name and load on given category, with given // priority. If verbose is true, prints debt and to_send too. 
diff --git a/options.cpp b/options.cpp index fd9b8f7..fd9da4e 100644 --- a/options.cpp +++ b/options.cpp @@ -7,6 +7,8 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(main); +#include "deployment.h" +#include "process.h" #include "loba_simple.h" #include "loba_fairstrategy.h" @@ -45,11 +47,16 @@ namespace opt { bool bookkeeping = false; // Application parameters - cost_func comp_cost("1e9, 0"); // fixme: find better defaults - cost_func comm_cost("1, 0"); // fixme: find better defaults - unsigned comp_maxiter = 10; // fixme: find better defaults - unsigned lb_maxiter = comp_maxiter; // fixme: find better defaults - bool exit_on_close = false; + // fixme: find better defaults + cost_func comp_cost("1e9, 0"); + cost_func comm_cost("1, 0"); + double min_iter_duration = 1.0; + + // Parameters for the end of the simulation + unsigned lb_maxiter = 0; + unsigned comp_maxiter = 0; + double time_limit = 0; + bool exit_on_close = true; // Named parameters lists loba_algorithms_type loba_algorithms; @@ -105,7 +112,7 @@ const char* opt_helper::on_off(bool b) const char* opt_helper::descr(const char* str) { - const int descr_width = 35; + const int descr_width = 40; std::string& res = descr_str; res = str; res.resize(descr_width, '.'); @@ -170,7 +177,7 @@ bool opt::parse_args(int* argc, char* argv[]) int c; opterr = 0; - while ((c = getopt(*argc, argv, "a:bc:C:ehi:I:l:L:N:s:T:vV")) != -1) { + while ((c = getopt(*argc, argv, "a:bc:C:ehi:I:l:L:N:s:t:T:vV")) != -1) { switch (c) { case 'a': opt::loba_algo = optarg; @@ -180,10 +187,10 @@ bool opt::parse_args(int* argc, char* argv[]) && result; break; case 'b': - opt::bookkeeping = true; + opt::bookkeeping = !opt::bookkeeping; break; case 'e': - opt::exit_on_close = true; + opt::exit_on_close = !opt::exit_on_close; break; case 'h': opt::help_requested++; @@ -195,12 +202,10 @@ bool opt::parse_args(int* argc, char* argv[]) opt::comm_cost = cost_func(optarg); break; case 'i': - std::istringstream(optarg) >> opt::comp_maxiter; + 
std::istringstream(optarg) >> opt::lb_maxiter; break; case 'I': - std::istringstream(optarg) >> opt::lb_maxiter; - ERROR0("option -I not implemented yet"); - result = false; + std::istringstream(optarg) >> opt::comp_maxiter; break; case 'l': std::istringstream(optarg) >> opt::log_rate; @@ -212,7 +217,10 @@ bool opt::parse_args(int* argc, char* argv[]) std::istringstream(optarg) >> opt::auto_depl::nhosts; break; case 's': - std::istringstream(optarg) >> communicator::send_count_before_flush; + std::istringstream(optarg) >> opt::min_iter_duration; + break; + case 't': + std::istringstream(optarg) >> opt::time_limit; break; case 'T': opt::auto_depl::topology = optarg; @@ -264,29 +272,29 @@ void opt::print() INFO2("| %s: " format, h.descr(description), value) INFO0(",----[ Simulation parameters ]"); - DESCR("log rate", "%s", h.val_or_string(log_rate, "disabled")); + DESCR("log rate", "%s", h.val_or_string(log_rate, "disabled")); DESCR("platform file", "\"%s\"", platform_file.c_str()); if (auto_depl::enabled) { INFO0("| automatic deployment enabled"); - DESCR("- topology", "%s", auto_depl::topology.c_str()); - DESCR("- number of hosts", "%s", h.val_or_string(auto_depl::nhosts, - "auto")); - DESCR("- initial load", "%s", h.val_or_string(auto_depl::load, - "auto")); + DESCR("- topology", "%s", auto_depl::topology.c_str()); + DESCR("- number of hosts", "%s", h.val_or_string(auto_depl::nhosts, + "auto")); + DESCR("- initial load", "%s", h.val_or_string(auto_depl::load, + "auto")); } else { DESCR("deployment file", "\"%s\"", deployment_file.c_str()); } - DESCR("load balancing algorithm", "%s", loba_algo.c_str()); - DESCR("bookkeeping", "%s", h.on_off(bookkeeping)); - DESCR("computation cost factors", "[%s]", comp_cost.to_string().c_str()); + DESCR("load balancing algorithm", "%s", loba_algo.c_str()); + DESCR("bookkeeping", "%s", h.on_off(bookkeeping)); + DESCR("computation cost factors", "[%s]", comp_cost.to_string().c_str()); DESCR("communication cost factors", "[%s]", 
comm_cost.to_string().c_str()); - DESCR("maximum number of comp. iterations", "%s", - h.val_or_string(comp_maxiter, "infinity")); + DESCR("minimum duration between iterations", "%g", min_iter_duration); DESCR("maximum number of lb. iterations", "%s", h.val_or_string(lb_maxiter, "infinity")); - DESCR("exit on close", "%s", h.on_off(exit_on_close)); - DESCR("send count before flush", "%d", - communicator::send_count_before_flush); + DESCR("maximum number of comp. iterations", "%s", + h.val_or_string(comp_maxiter, "infinity")); + DESCR("time limit", "%s", h.val_or_string(time_limit, "infinity")); + DESCR("exit on close", "%s", h.on_off(exit_on_close)); INFO0("`----"); #undef DESCR @@ -322,45 +330,55 @@ void opt::usage() std::clog << "\nSimulation parameters\n"; std::clog << o("-l value") - << "print current load every n-th iterations, 0 to disable" - << " (" << opt::log_rate << ")\n"; + << "print current load every n lb iterations, 0 to disable" + << " [" << opt::log_rate << "]\n"; std::clog << o("-v") << "verbose: do not override the default logging parameters\n"; std::clog << "\nAutomatic deployment options\n"; std::clog << o("-T name") << "enable automatic deployment with selected topology" - << " (" << opt::auto_depl::topology << ")\n"; + << " [" << opt::auto_depl::topology << "]\n"; if (opt::help_requested > 1) so_list(opt::topologies); std::clog << o("-L value") << "total load with auto deployment, 0 for number of hosts" - << " (" << opt::auto_depl::load << ")\n"; + << " [" << opt::auto_depl::load << "]\n"; std::clog << o("-N value") - << "number of hosts to use with auto deployment," - << " 0 for max. (" << opt::auto_depl::nhosts << ")\n"; + << "number of hosts to use with auto deployment, 0 for max." 
+ << " [" << opt::auto_depl::nhosts << "]\n"; std::clog << "\nLoad balancing algorithm\n"; std::clog << o("-a name") << "load balancing algorithm" - << " (" << opt::loba_algo << ")\n"; + << " [" << opt::loba_algo << "]\n"; if (opt::help_requested > 1) so_list(opt::loba_algorithms); - std::clog << o("-b") << "enable bookkeeping\n"; + std::clog << o("-b") << "toggle bookkeeping (\"virtual load\")" + << " [" << opt_helper::on_off(opt::bookkeeping) << "]\n"; std::clog << "\nApplication parameters\n"; std::clog << o("-c [fn,...]f0") << "polynomial factors for computation cost" - << " (" << opt::comp_cost.to_string() << ")\n"; + << " [" << opt::comp_cost.to_string() << "]\n"; std::clog << o("-C [fn,...]f0") << "polynomial factors for communication cost" - << " (" << opt::comm_cost.to_string() << ")\n"; - std::clog << o("-e") << "exit on reception of \"close\" message\n"; + << " [" << opt::comm_cost.to_string() << "]\n"; + std::clog << o("-s value") + << "minimum duration between iterations" + << " [" << opt::min_iter_duration << "]\n"; + + std::clog << "\nParameters for the end of the simulation\n"; std::clog << o("-i value") - << "maximum number of comp. iterations, 0 for infinity" - << " (" << opt::comp_maxiter << ")\n"; - std::clog << o("-I value") << "maximum number of lb. iterations, 0 for infinity" - << " (" << opt::lb_maxiter << ")\n"; + << " [" << opt::lb_maxiter << "]\n"; + std::clog << o("-I value") + << "maximum number of comp. 
iterations, 0 for infinity" + << " [" << opt::comp_maxiter << "]\n"; + std::clog << o("-t value") + << "time limit (simulated time), 0 for infinity" + << " [" << opt::time_limit << "]\n"; + std::clog << o("-e") << "toggle exit on reception of \"close\" message" + << " [" << opt_helper::on_off(opt::exit_on_close) << "]\n"; if (opt::help_requested < 3) return; @@ -376,10 +394,7 @@ void opt::usage() << " proc : messages from base process class\n" << " loba : messages from load-balancer\n"; - std::clog << "\nMiscellaneous low-level parameters\n"; - std::clog << o("-s count") - << "check for finished comm. every `count' send operation" - << " (" << communicator::send_count_before_flush << ")\n"; + // std::clog << "\nMiscellaneous low-level parameters\n"; #undef so_list #undef so diff --git a/options.h b/options.h index 37a4d8c..fe0df82 100644 --- a/options.h +++ b/options.h @@ -3,9 +3,11 @@ #include #include "cost_func.h" -#include "deployment.h" #include "named_object_list.h" -#include "process.h" + +// These classes may use include options.h, so make forward declarations +class deployment_generator; +class process; // Global parameters, shared by all the processes namespace opt { @@ -40,8 +42,12 @@ namespace opt { // Application parameters extern cost_func comp_cost; extern cost_func comm_cost; - extern unsigned comp_maxiter; + extern double min_iter_duration; + + // Parameters for the end of the simulation extern unsigned lb_maxiter; + extern unsigned comp_maxiter; + extern double time_limit; extern bool exit_on_close; // Named parameters lists diff --git a/process.cpp b/process.cpp index 53cbb59..57c0661 100644 --- a/process.cpp +++ b/process.cpp @@ -1,7 +1,6 @@ #include #include #include -#include #include #include #include @@ -11,6 +10,7 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(proc); #include "misc.h" #include "options.h" +#include "tracing.h" #include "process.h" @@ -20,7 +20,7 @@ double process::total_load_exit = 0.0; process::process(int argc, char* argv[]) { - if 
(argc < 2 || !(std::istringstream(argv[1]) >> load)) + if (argc < 2 || !(std::istringstream(argv[1]) >> real_load)) throw std::invalid_argument("bad or missing initial load parameter"); neigh.assign(argv + 2, argv + argc); @@ -33,15 +33,19 @@ process::process(int argc, char* argv[]) rev_neigh.insert(std::make_pair(host, ptr)); } - prev_load_broadcast = -1; // force sending of load on first send() - expected_load = load; - total_load_running += load; - total_load_init += load; + comp = 0.0; + + prev_load_broadcast = -1; // force sending of load on first send_all() + expected_load = real_load; + total_load_running += real_load; + total_load_init += real_load; ctrl_close_pending = data_close_pending = neigh.size(); close_received = false; finalizing = false; + comp_iter = lb_iter = 0; + e_xbt_log_priority_t logp = xbt_log_priority_verbose; if (!LOG_ISENABLED(logp)) return; @@ -60,70 +64,101 @@ process::process(int argc, char* argv[]) process::~process() { - total_load_exit += load; + total_load_exit += real_load; + if (opt::bookkeeping) { + INFO4("Final load after %d:%d iterations: %g ; expected: %g", + lb_iter, comp_iter, real_load, expected_load); + } else { + INFO2("Final load after %d iterations: %g", + lb_iter, real_load); + if (lb_iter != comp_iter) + WARN2("lb_iter (%d) and comp_iter (%d) differ!", + lb_iter, comp_iter); + } + VERB1("Total computation for this process: %g", comp); } int process::run() { - INFO1("Initial load: %g", load); + double next_iter_after_date = 0.0; + INFO1("Initial load: %g", real_load); VERB0("Starting..."); - comp_iter = lb_iter = 0; while (true) { - if (load > 0.0) { - ++comp_iter; - if (opt::log_rate && comp_iter % opt::log_rate == 0) { + if (get_load() > 0.0) { + double now = MSG_get_clock(); + if (now < next_iter_after_date) + MSG_process_sleep(next_iter_after_date - now); + next_iter_after_date = MSG_get_clock() + opt::min_iter_duration; + + ++lb_iter; + + if (opt::log_rate && lb_iter % opt::log_rate == 0) { if 
(opt::bookkeeping) INFO4("(%u:%u) current load: %g ; expected: %g", - comp_iter, lb_iter, load, expected_load); + lb_iter, comp_iter, real_load, expected_load); else INFO2("(%u) current load: %g", - comp_iter, load); + lb_iter, real_load); } - if (opt::bookkeeping) - expected_load -= load_balance(expected_load); - else - load -= load_balance(load); + load_balance(); print_loads(true, xbt_log_priority_debug); + } - send(); + // send load information, and load (data) if any + send_all(); + if (real_load > 0.0) { + ++comp_iter; compute(); - - } else { - // send load information, and load when bookkeeping - send(); } - if (opt::comp_maxiter && comp_iter >= opt::comp_maxiter) + if (opt::lb_maxiter && lb_iter >= opt::lb_maxiter) { + VERB2("Reached lb_maxiter: %d/%d", lb_iter, opt::lb_maxiter); + break; + } + if (opt::comp_maxiter && comp_iter >= opt::comp_maxiter) { + VERB2("Reached comp_maxiter: %d/%d", comp_iter, opt::comp_maxiter); break; - if (opt::lb_maxiter && lb_iter >= opt::lb_maxiter) + } + if (opt::time_limit && MSG_get_clock() >= opt::time_limit) { + VERB2("Reached time limit: %g/%g", MSG_get_clock(), opt::time_limit); break; + } // block on receiving unless there is something to compute or // to send - bool wait = (load == 0 && - ((opt::bookkeeping ? 
expected_load : load) - == prev_load_broadcast)); - receive(wait); + double timeout; + if (real_load != 0 || get_load() != prev_load_broadcast) + timeout = 0.0; + else if (opt::min_iter_duration) + timeout = opt::min_iter_duration; + else + timeout = 1.0; + receive(timeout); // one of our neighbor is finalizing - if (opt::exit_on_close && close_received) + if (opt::exit_on_close && close_received) { + VERB0("Close received"); break; + } // have no load and cannot receive anything - if (load == 0.0 && !may_receive()) + if (real_load == 0.0 && !may_receive()) { + VERB0("I'm a poor lonesome process, and I have no load..."); break; + } // fixme: this check should be implemented with a distributed // algorithm, and not a shared global variable! // fixme: should this chunk be moved before call to receive() ? if (100.0 * total_load_running / total_load_init <= opt::load_ratio_threshold) { - VERB0("No more load to balance in system, stopping."); + VERB0("No more load to balance in system."); break; + } else { + DEBUG1("still %g load to balance, continuing...", total_load_running); } - } VERB0("Going to finalize..."); finalize(); @@ -135,48 +170,41 @@ int process::run() */ VERB0("Done."); - INFO3("Final load after %d iteration%s: %g", - comp_iter, ESSE(comp_iter), load); - if (opt::bookkeeping) - INFO1("Expected load: %g", expected_load); return 0; } -double process::sum_of_to_send() const -{ - using std::tr1::bind; - using std::tr1::placeholders::_1; - using std::tr1::placeholders::_2; - - return std::accumulate(neigh.begin(), neigh.end(), 0.0, - bind(std::plus(), - _1, bind(&neighbor::get_to_send, _2))); -} - -double process::load_balance(double /*my_load*/) +void process::load_balance() { if (lb_iter == 1) // warn only once - WARN0("process::load_balance is a no-op!"); - return 0.0; + WARN0("process::load_balance() is a no-op!"); } void process::compute() { - if (load > 0.0) { - double duration = opt::comp_cost(load); - m_task_t task = MSG_task_create("computation", 
duration, 0.0, NULL); - DEBUG2("compute %g flop%s", duration, ESSE(duration)); + if (real_load > 0.0) { + double flops = opt::comp_cost(real_load); + m_task_t task = MSG_task_create("computation", flops, 0.0, NULL); + TRACE_msg_set_task_category(task, TRACE_CAT_COMP); + DEBUG2("compute %g flop%s", flops, ESSE(flops)); MSG_task_execute(task); + comp += flops; MSG_task_destroy(task); } else { DEBUG0("nothing to compute !"); } } +void process::send(neighbor& nb, double amount) +{ + set_load(get_load() - amount); + nb.set_to_send(nb.get_to_send() + amount); + nb.set_load(nb.get_load() + amount); // fixme: make this optional? +} + void process::send1_no_bookkeeping(neighbor& nb) { - if (load != prev_load_broadcast) - comm.send(nb.get_ctrl_mbox(), new message(message::INFO, load)); + if (real_load != prev_load_broadcast) + comm.send(nb.get_ctrl_mbox(), new message(message::INFO, real_load)); double load_to_send = nb.get_to_send(); if (load_to_send > 0.0) { comm.send(nb.get_data_mbox(), new message(message::LOAD, load_to_send)); @@ -200,20 +228,20 @@ void process::send1_bookkeeping(neighbor& nb) } else { new_debt = nb.get_debt(); } - if (load <= new_debt) { - load_to_send = load; + if (real_load <= new_debt) { + load_to_send = real_load; nb.set_debt(new_debt - load_to_send); - load = 0.0; + real_load = 0.0; } else { load_to_send = new_debt; nb.set_debt(0.0); - load -= load_to_send; + real_load -= load_to_send; } if (load_to_send > 0.0) comm.send(nb.get_data_mbox(), new message(message::LOAD, load_to_send)); } -void process::send() +void process::send_all() { using std::tr1::bind; using std::tr1::placeholders::_1; @@ -225,16 +253,18 @@ void process::send() } else { std::for_each(neigh.begin(), neigh.end(), bind(&process::send1_no_bookkeeping, this, _1)); - prev_load_broadcast = load; + prev_load_broadcast = real_load; } + comm.flush(false); } -void process::receive(bool wait) +void process::receive(double timeout) { message* msg; m_host_t from; - while (may_receive() && 
comm.recv(msg, from, wait)) { + DEBUG2("%sblocking receive (%g)", "\0non-" + !timeout, timeout); + while (may_receive() && comm.recv(msg, from, timeout)) { switch (msg->get_type()) { case message::INFO: { neighbor* n = rev_neigh[from]; @@ -246,7 +276,7 @@ void process::receive(bool wait) break; case message::LOAD: { double ld = msg->get_amount(); - load += ld; + real_load += ld; if (finalizing) total_load_running -= ld; break; @@ -261,8 +291,9 @@ void process::receive(bool wait) break; } delete msg; - wait = false; // only wait on first recv + timeout = 0.0; // only wait on first recv } + comm.flush(false); } void process::finalize1(neighbor& nb) @@ -277,17 +308,19 @@ void process::finalize() using std::tr1::placeholders::_1; finalizing = true; - total_load_running -= load; + total_load_running -= real_load; DEBUG2("send CLOSE to %lu neighbor%s", (unsigned long )neigh.size(), ESSE(neigh.size())); std::for_each(neigh.begin(), neigh.end(), bind(&process::finalize1, this, _1)); - DEBUG2("wait for CLOSE from %lu neighbor%s", - (unsigned long )neigh.size(), ESSE(neigh.size())); - while (may_receive()) - receive(true); + while (may_receive()) { + comm.flush(false); + DEBUG2("waiting for %d CTRL and %d DATA CLOSE", + ctrl_close_pending, data_close_pending); + receive(-1.0); + } comm.flush(true); } diff --git a/process.h b/process.h index b15f982..24d4c39 100644 --- a/process.h +++ b/process.h @@ -16,6 +16,7 @@ #include #include "communicator.h" #include "neighbor.h" +#include "options.h" class process { public: @@ -26,7 +27,8 @@ public: process(int argc, char* argv[]); virtual ~process(); - double get_load() const { return load; } + double get_comp() const { return comp; } + double get_real_load() const { return real_load; } int run(); @@ -37,8 +39,14 @@ protected: pneigh_type pneigh; // list of pointers to neighbors that // we are free to reorder - // Returns the sum of "to_send" for all neighbors. 
- double sum_of_to_send() const; + // Get and set current load, which may be real load, or expected + // load if opt::bookkeeping is true. + double get_load() const; + void set_load(double load); + + // Register some amount of load to send to given neighbor. + void send(neighbor& nb, double amount); + void send(neighbor* nb, double amount) { send(*nb, amount); } // Calls neighbor::print(verbose, logp, cat) for each member of neigh. void print_loads(bool verbose = false, @@ -52,7 +60,7 @@ protected: private: static double total_load_init; // sum of process loads at init - static double total_load_running; // summ of loads while running + static double total_load_running; // sum of loads while running static double total_load_exit; // sum of process loads at exit typedef MAP_TEMPLATE rev_neigh_type; @@ -71,16 +79,15 @@ private: unsigned lb_iter; // counter of load-balancing iterations unsigned comp_iter; // counter of computation iterations + double comp; // total computing done so far (flops) + double prev_load_broadcast; // used to ensure that we do not send // a same information messages - double load; // current load + double real_load; // current load double expected_load; // expected load in bookkeeping mode // The load balancing algorithm comes here... - // Parameter "my_load" is the load to take into account for myself - // (may be load or expected load). - // Returns the total load sent to neighbors. 
- virtual double load_balance(double my_load); + virtual void load_balance(); // Virtually do some computation void compute(); @@ -88,13 +95,17 @@ private: // Send procedures, with helpers for bookkeeping mode or not void send1_no_bookkeeping(neighbor& nb); void send1_bookkeeping(neighbor& nb); - void send(); + void send_all(); // Returns true if there remains neighbors to listen for - bool may_receive() { return ctrl_close_pending || data_close_pending; } + bool may_receive() const { + return ctrl_close_pending || data_close_pending; + } - // Receive procedure: wait (or not) for a message to come - void receive(bool wait); + // Receive procedure + // Parameter "timeout" may be 0 for non-blocking operation, -1 for + // infinite waiting, or any positive timeout. + void receive(double timeout); // Finalize sends a "close" message to each neighbor and wait for // all of them to answer. @@ -102,6 +113,24 @@ private: void finalize(); }; +inline +double process::get_load() const +{ + if (opt::bookkeeping) + return expected_load; + else + return real_load; +} + +inline +void process::set_load(double load) +{ + if (opt::bookkeeping) + expected_load = load; + else + real_load = load; +} + #endif // !PROCESS_H // Local variables: diff --git a/statistics.h b/statistics.h new file mode 100644 index 0000000..7e3fb2e --- /dev/null +++ b/statistics.h @@ -0,0 +1,41 @@ +#ifndef STATISTICS_H +#define STATISTICS_H + +#include +#include + +class statistics { +public: + statistics() + : count(0) + , sum(0.0) + , mean(0.0) + , sqdiff_sum(0.0) + { } + + void push(double x) { + double delta = x - mean; + ++count; + sum += x; + mean = sum / count; + sqdiff_sum += delta * (x - mean); + } + + unsigned get_count() const { return count; } + double get_sum() const { return sum; } + double get_mean() const { return mean; } + double get_variance() const { return sqdiff_sum / count; } + double get_stddev() const { return sqrt(get_variance()); } + +private: + int count; + double sum; // sum of x_i 
+ double mean; // mean of x_i + double sqdiff_sum; // sum of (x_i - mean)^2 +}; + +#endif // !STATISTICS_H + +// Local variables: +// mode: c++ +// End: diff --git a/tracing.h b/tracing.h new file mode 100644 index 0000000..f283465 --- /dev/null +++ b/tracing.h @@ -0,0 +1,12 @@ +#ifndef TRACING_H +#define TRACING_H + +#define TRACE_CAT_COMP "comp_task" +#define TRACE_CAT_CTRL "ctrl_mesg" +#define TRACE_CAT_DATA "data_mesg" + +#endif // !TRACING_H + +// Local variables: +// mode: c++ +// End: diff --git a/valgrind_suppressions_3.5 b/valgrind_suppressions_3.5 index 0baec30..e1e2552 100644 --- a/valgrind_suppressions_3.5 +++ b/valgrind_suppressions_3.5 @@ -8,3 +8,20 @@ fun:MSG_create_environment ... } + +{ + Memory leak in libc? + Memcheck:Leak + fun:malloc + fun:_dl_map_object_deps + fun:dl_open_worker + fun:_dl_catch_error + fun:_dl_open + fun:do_dlopen + fun:_dl_catch_error + fun:dlerror_run + fun:__libc_dlopen_mode + fun:init + fun:pthread_once + fun:backtrace +} diff --git a/version.cpp b/version.cpp index 344386d..6b6889a 100644 --- a/version.cpp +++ b/version.cpp @@ -14,7 +14,7 @@ namespace version { (__DATE__ " " __TIME__); const std::string copyright - ("Copyright (c) 2010, Arnaud Giersch "); + ("Copyright (c) 2010-2011, Arnaud Giersch "); }