*.d
*.o
-cachegrind.out.*
+callgrind.out.*
gmon.out
core
misc_autogen.h
loba
+loba-dev
+loba-stable
simple_async
+*_dev.xml
+
simgrid-dev
simgrid-stable
*/
--- /dev/null
+========================================================================
+Il semblerait qu'il y ait un bug dans SG 3.5, et qu'on ne puisse pas
+utiliser MSG_comm_waitany() pour l'émetteur *et* le récepteur sans
+risquer d'interblocage.
+
+Le problème devrait être contourné correctement depuis le commit
+cd6b253 Use MSG_comm_waitall for communicator::flush(true).
+
+========================================================================
+Avec SG 3.5, les communications doivent être détruites dès que
+possible avec MSG_comm_destroy(). Si ce n'est pas fait, la simulation
+peut être extrêmement ralentie.
+
+Le problème devrait être contourné correctement depuis le commit
+404a8d5 Do not call flush automatically in communcator::send...
+
+========================================================================
#DEBUG_FLAGS += -pg
CHECK_FLAGS += -Wall -Wextra
+CC := gcc
CXX := g++
CPPFLAGS += -I $(SIMGRID_INSTALL_DIR)/include
CPPFLAGS += $(CHECK_FLAGS)
+#CFLAGS += -std=c99
+#CFLAGS += -fgnu89-inline # workaround simgrid bug
+CFLAGS += $(OPTIM_FLAGS) $(DEBUG_FLAGS)
+
#CXXFLAGS += -std=c++0x
CXXFLAGS += $(OPTIM_FLAGS) $(DEBUG_FLAGS)
TARGETS := $(DEFAULT_TARGETS) \
simple_async
-.PHONY: all full clean realclean $(FLAVOURED_LOBA)
+XML_FILES = \
+ Dep.xml Plat.xml \
+ platform.xml deployment.xml simple_async.xml \
+ cluster1000.xml
+
+XML_DEV_FILES = $(XML_FILES:%.xml=%_dev.xml)
+
+.PHONY: all full xml clean realclean $(FLAVOURED_LOBA)
all: $(DEFAULT_TARGETS)
full:
- $(MAKE) $(FLAVOURED_LOBA)
- $(MAKE) $(TARGETS)
+ @for target in $(FLAVOURED_LOBA); do \
+ echo $(MAKE) "$$target"; \
+ $(MAKE) "$$target"; \
+ done
+ $(MAKE) xml $(DEFAULT_TARGETS)
+
+xml: $(XML_DEV_FILES)
clean:
$(RM) core core.[0-9]* vgcore.[0-9]*
realclean: clean
$(RM) $(FLAVOURED_LOBA)
+ $(RM) $(XML_DEV_FILES)
$(RM) *~
%.d: %.cpp ; $(MAKEDEPEND.CXX)
+%_dev.xml: %.xml
+ sed '/DOCTYPE/s,simgrid.dtd,http://simgrid.gforge.inria.fr/&,' $< > $@
+
$(FLAVOURED_LOBA):
$(MAKE) clean
$(MAKE) SIMGRID_INSTALL_DIR=./simgrid-$(patsubst loba-%,%,$@) loba
* Compilation de SimGrid
* Compilation...
* Utilisation
+* Tracé de courbes
* Communications
* Pour ajouter un nouvel algorithme d'équilibrage
* Pour ajouter une nouvelle option au programme
Pour plus de détail sur les options de logging :
http://simgrid.gforge.inria.fr/doc/group__XBT__log.html#log_use
+Tracé de courbes
+================
+
+Le script extract.pl permet d'extraire les données à partir des traces
+de simulation et de les présenter sous un format acceptable par gnuplot
+ou par graph (plotutils).
+
+Exemple:
+ ./loba platform.xml 2>&1 | ./extract.pl | graph -CTX
+
Communications
==============
- définir une nouvelle classe dérivant de process
- attention, il faut construire le process explicitement
- redéfinir la méthode load_balance qui :
- - reçoit en paramètre la charge à prendre en compte ;
+ - peut récupérer la charge courante avec get_load()
- peut utiliser et éventuellement réordonner le tableau process::pneigh ;
- peut récupérer l'information de charge d'un voisin avec
pneigh[i]->get_load() ;
- définit la charge à envoyer avec
- pneigh[i]->set_to_send(quantité) ;
- - retourne la somme des quantités définies avec set_to_send,
- éventuellement à l'aide de la méthode process::sum_of_to_send()
- qui clacule cette somme.
+ send(pneigh[i], quantité) ;
2. Ajouter l'algorithme dans la liste des options. Dans options.cpp :
- faire le #include adéquat ;
loba_simple.h équilibrage simple
loba_simple.cpp (à imiter pour ajouter d'autres algorithmes)
+ loba_*.{h,cpp} autres algos d'équilibrage
+
main.cpp le programme principal
misc.h divers trucs inclassables
version.h gestion de la version du programme
version.cpp
+* fichiers auto-générés
+
+ misc_autogen.h définition des macros XCLOG(...)
+
+* scripts
+
+ colorized-loba script pour exécuter loba en colorant les
+ sorties
+
+ extract.pl outil d'extraction des données à partir des
+ traces, pour tracer des courbes
+
+ setlocalversion calcule un numéro de version à partir du hash
+ du dernier commit (git)
+
* autres fichiers
.gitignore liste des fichiers ignorés par git
+* review receive with timeout.
+
* verify bookkeeping version.
-* add options -j/-J : minimum number of iterations ?
+* add several metrics
+ - message exchanges : number/volume of sent/received data/ctrl messages
+
+* add options -j/-J : minimum number of iterations?
-* add a variant to (not) change neighbor load information at send.
+* add a variant to (not) change neighbor load information at send?
* implement loba_* algorithms (start with some trivial one)
#include "misc.h"
#include "options.h"
+#include "simgrid_features.h"
+#include "tracing.h"
#include "communicator.h"
return oss.str();
}
-int communicator::send_count_before_flush = 4;
-
communicator::communicator()
- : host((hostdata* )MSG_host_get_data(MSG_host_self()))
+ : host(static_cast<hostdata*>(MSG_host_get_data(MSG_host_self())))
, mutex(xbt_mutex_init())
, cond(xbt_cond_init())
- , send_counter(0)
, ctrl_task(NULL)
, ctrl_comm(NULL)
, data_task(NULL)
receiver_process =
MSG_process_create("receiver", communicator::receiver_wrapper,
this, MSG_host_self());
+ xbt_cond_wait(cond, mutex); // wait for the receiver to be ready
xbt_mutex_release(mutex);
}
if (msg->get_type() == message::LOAD)
msg_size += opt::comm_cost(msg->get_amount());
m_task_t task = MSG_task_create("message", 0.0, msg_size, msg);
+ TRACE_msg_set_task_category(task,
+ msg->get_type() == message::LOAD ?
+ TRACE_CAT_DATA : TRACE_CAT_CTRL);
msg_comm_t comm = MSG_task_isend(task, dest);
sent_comm.push_back(comm);
-
- if (++send_counter >= send_count_before_flush) {
- flush(false);
- send_counter = 0;
- }
}
-bool communicator::recv(message*& msg, m_host_t& from, bool wait)
+bool communicator::recv(message*& msg, m_host_t& from, double timeout)
{
- if (wait) {
+ if (timeout != 0) {
+ volatile double deadline =
+ timeout > 0 ? MSG_get_clock() + timeout : 0.0;
xbt_mutex_acquire(mutex);
- while (received.empty()) {
+ while (received.empty() && (!deadline || deadline > MSG_get_clock())) {
+ xbt_ex_t e;
DEBUG0("waiting for a message to come");
- xbt_cond_wait(cond, mutex);
+ TRY {
+ if (deadline)
+ xbt_cond_timedwait(cond, mutex, deadline - MSG_get_clock());
+ else
+ xbt_cond_wait(cond, mutex);
+ }
+ CATCH (e) {
+ if (e.category != timeout_error)
+ RETHROW;
+ xbt_ex_free(e);
+ }
}
xbt_mutex_release(mutex);
}
m_task_t task = received.front();
received.pop();
- msg = (message* )MSG_task_get_data(task);
+ msg = static_cast<message*>(MSG_task_get_data(task));
from = MSG_task_get_source(task);
MSG_task_destroy(task);
void communicator::flush(bool wait)
{
- using std::tr1::bind;
- using std::tr1::placeholders::_1;
-
sent_comm.remove_if(comm_test_n_destroy);
if (wait && !sent_comm.empty()) {
- xbt_dynar_t comms = xbt_dynar_new(sizeof(msg_comm_t), NULL);
- while (!sent_comm.empty()) {
- std::for_each(sent_comm.begin(), sent_comm.end(),
- bind(xbt_dynar_push,
- comms, bind(misc::address<msg_comm_t>(), _1)));
- MSG_comm_waitany(comms);
- xbt_dynar_reset(comms);
- sent_comm.remove_if(comm_test_n_destroy);
- }
- xbt_dynar_free(&comms);
+ msg_comm_t comms[sent_comm.size()];
+ std::copy(sent_comm.begin(), sent_comm.end(), comms);
+ MSG_comm_waitall(comms, sent_comm.size(), -1.0);
+ if (!MSG_WAIT_DESTROYS_COMMS)
+ std::for_each(sent_comm.begin(), sent_comm.end(), MSG_comm_destroy);
+ sent_comm.clear();
}
}
int communicator::receiver_wrapper(int, char* [])
{
communicator* comm;
- comm = (communicator* )MSG_process_get_data(MSG_process_self());
+ comm = static_cast<communicator*>(MSG_process_get_data(MSG_process_self()));
int result = comm->receiver();
DEBUG0("terminate");
{
ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox());
data_comm = MSG_task_irecv(&data_task, get_data_mbox());
+ DEBUG0("receiver ready");
+ xbt_mutex_acquire(mutex);
+ xbt_cond_signal(cond); // signal master that we are ready
+ xbt_mutex_release(mutex);
+
xbt_dynar_t comms = xbt_dynar_new(sizeof(msg_comm_t), NULL);
while (ctrl_comm || data_comm) {
if (ctrl_comm && comm_test_n_destroy(ctrl_comm)) {
if (strcmp(MSG_task_get_name(ctrl_task), "finalize")) {
DEBUG0("received message from ctrl");
+ xbt_mutex_acquire(mutex);
received.push(ctrl_task);
+ xbt_mutex_release(mutex);
ctrl_task = NULL;
ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox());
} else {
if (data_comm && comm_test_n_destroy(data_comm)) {
if (strcmp(MSG_task_get_name(data_task), "finalize")) {
DEBUG0("received message from data");
+ xbt_mutex_acquire(mutex);
received.push(data_task);
+ xbt_mutex_release(mutex);
data_task = NULL;
data_comm = MSG_task_irecv(&data_task, get_data_mbox());
} else {
}
}
xbt_mutex_acquire(mutex);
- xbt_cond_signal(cond);
+ if (!received.empty())
+ xbt_cond_signal(cond);
xbt_mutex_release(mutex);
}
xbt_dynar_free(&comms);
#include <msg/msg.h>
#include "hostdata.h"
-// Cannot include "options.h" without error, so only declare the
-// needed functions.
-namespace opt {
- bool parse_args(int* argc, char* argv[]);
- void print();
- void usage();
-}
-
class message {
public:
enum message_type { INFO, CREDIT, LOAD, CTRL_CLOSE, DATA_CLOSE };
void send(const char* dest, message* msg);
// Try to get a message. Returns true on success.
- // If "wait" is true, blocks until success.
- bool recv(message*& msg, m_host_t& from, bool wait);
+ // Parameter "timeout" may be 0 for non-blocking operation, -1 for
+ // infinite waiting, or any positive timeout.
+ bool recv(message*& msg, m_host_t& from, double timeout);
// Try to flush pending sending communications.
// If "wait" is true, blocks until success.
// List of pending send communications
std::list<msg_comm_t> sent_comm;
- static int send_count_before_flush;
- int send_counter;
// Queue of received messages
std::queue<m_task_t> received;
// Used to test if a communication is over, and to destroy it if it is
static bool comm_test_n_destroy(msg_comm_t comm);
-
- // Make opt::* functions our friends to provide them an access to
- // send_count_before_flush
- friend bool opt::parse_args(int*, char* []);
- friend void opt::print();
- friend void opt::usage();
};
#endif // !COMMUNICATOR_H
--- /dev/null
+#!/usr/bin/env perl
+
+use strict;
+use warnings;
+
+my $bookkeeping;    # stays undef until the "bookkeeping" parameter line is seen
+my $flt = '[+-]?\d+(?:\.\d+)?(?:[eE][+-]?\d+)?';    # regex text: decimal number with optional exponent
+my $pflt = "($flt)";    # the same number pattern, as a capturing group
+my $prefix = '^\[([^: ]+)(?::loba:\(\d+\))? ' . $pflt . '\] \[proc/INFO\] ';    # captures $1 (stored as host) and $2 (stored as time)
+my $initmatch = $prefix . 'Initial load: ' . $pflt . '';    # $3 is the initial load
+my $finalmatch;    # assigned in the main loop once the bookkeeping mode is known
+my $plainmatch;    # assigned in the main loop once the bookkeeping mode is known
+
+my %alldata = ();    # host => reference to the list of records pushed for it
+
+# Scan the simulation log read through <>.  Lines are ignored until the
+# "bookkeeping" parameter line has been seen, because that line selects
+# which message formats have to be matched.
+while (<>) {
+    chomp;
+    # On the bookkeeping parameter line, the substitution strips everything
+    # but the parameter value, which is then compared to "on".
+    if (s{^(?:\[0\.0+\] )?\[main/INFO\] \| bookkeeping\.*: }{}) {
+        $bookkeeping = $_ eq "on";
+        if ($bookkeeping) {
+            # Two iteration counters, then the load and the expected load.
+            $finalmatch = $prefix .
+                'Final load after (\d+):(\d+) iterations: ' . $pflt .
+                ' ; expected: ' . $pflt;
+            $plainmatch = $prefix .
+                '\((\d+):(\d+)\) current load: ' . $pflt .
+                ' ; expected: ' . $pflt;
+        } else {
+            # A single iteration counter, then the load.
+            $finalmatch = $prefix .
+                'Final load after (\d+) iterations: ' . $pflt;
+            $plainmatch = $prefix . '\((\d+)\) current load: ' . $pflt;
+        }
+        if (0) {    # debugging aid: change to 1 to dump the patterns
+            print STDERR "BOOKKEEPING: \"$_\" ($bookkeeping)\n";
+            print STDERR "INITMATCH..: \"$initmatch\"\n";
+            print STDERR "PLAINMATCH.: \"$plainmatch\"\n";
+            print STDERR "FINALMATCH.: \"$finalmatch\"\n";
+        }
+    }
+    next if not defined $bookkeeping;
+
+    # The three message kinds are mutually exclusive, so they are tested
+    # as a single if/elsif chain: at most one record is pushed per line.
+    if (m{$plainmatch}) {
+        my $host = $1;
+        my $data;
+        if ($bookkeeping) {
+            $data = {
+                time     => $2,
+                lb       => $3,
+                comp     => $4,
+                load     => $5,
+                expected => $6,
+            };
+        } else {
+            # Only one counter and one load value are available: reuse them.
+            $data = {
+                time     => $2,
+                lb       => $3,
+                comp     => $3,
+                load     => $4,
+                expected => $4,
+            };
+        }
+#        print STDERR "PUSH $host $data->{time} $data->{load} (plain)\n";
+        push @{$alldata{$host}}, $data;
+    } elsif (m{$initmatch}) {
+        my $host = $1;
+        my $data = {
+            time     => $2,
+            lb       => 0,
+            comp     => 0,
+            load     => $3,
+            expected => $3,
+        };
+#        print STDERR "PUSH $host $data->{time} $data->{load} (init)\n";
+        push @{$alldata{$host}}, $data;
+    } elsif (m{$finalmatch}) {
+        my $host = $1;
+        my $data;
+        if ($bookkeeping) {
+            $data = {
+                time     => $2,
+                lb       => $3,
+                comp     => $4,
+                load     => $5,
+                expected => $6,
+            };
+        } else {
+            # Only one counter and one load value are available: reuse them.
+            $data = {
+                time     => $2,
+                lb       => $3,
+                comp     => $3,
+                load     => $4,
+                expected => $4,
+            };
+        }
+#        print STDERR "PUSH $host $data->{time} $data->{load} (final)\n";
+        push @{$alldata{$host}}, $data;
+    }
+}
+
+# Print the collected records grouped by host, hosts in sorted order:
+# a "# host" header line, one "time load" line per record, and a blank
+# line closing each group.
+for my $name (sort keys %alldata) {
+    print "# $name\n";
+    print "$_->{time} $_->{load}\n" for @{ $alldata{$name} };
+    print "\n";
+}
*/
class compare {
-public :
- bool operator()(const neighbor*a, const neighbor*b) {
- return a->get_load()>b->get_load();
+public:
+ bool operator()(const neighbor*a, const neighbor*b) {
+ return a->get_load() > b->get_load();
}
};
-double loba_fairstrategy::load_balance(double my_load)
+void loba_fairstrategy::load_balance()
{
std::sort(pneigh.begin(), pneigh.end(), compare());
- //print_loads_p();
+ // print_loads_p();
+ //print_loads_p(false, xbt_log_priority_debug);
- double sum_sent=0;
- bool found=true;
-
- while(found) {
- found=false;
+ bool found = true;
+
+ while (found) {
+ found = false;
for (unsigned i = 0 ; i < pneigh.size() ; ++i) {
- double l = pneigh[i]->get_load();
- if (l >= my_load)
- continue;
- if (l < my_load+2) {
- found=true;
- pneigh[i]->add_load(1);
- pneigh[i]->add_to_send(1);
- //INFO1("sent to %s",pneigh[i]->get_name());
- my_load--;
- sum_sent++;
+ if (pneigh[i]->get_load() <= get_load() - 2) {
+ found = true;
+ send(pneigh[i], 1);
+ DEBUG1("sent to %s", pneigh[i]->get_name());
}
}
}
-
- return sum_sent;
}
// Local variables:
~loba_fairstrategy() { }
private:
- double load_balance(double my_load);
+ void load_balance();
};
#endif //!LOBA_SIMPLE
* load balance with a least-loaded neighbor,
* without breaking the ping-pong condition
*/
-double loba_simple::load_balance(double my_load)
+void loba_simple::load_balance()
{
int imin = -1;
int imax = -1;
- double min = my_load;
+ double min = get_load();
double max = -1.0;
for (unsigned i = 0 ; i < pneigh.size() ; ++i) {
double l = pneigh[i]->get_load();
- if (l >= my_load)
+ if (l >= get_load())
continue;
if (l < min) {
imin = i;
}
if (imin != -1) {
// found someone
- double balance = (my_load - min) / 2;
- DEBUG6("%d:%g %d:%g %g %g", imin, min, imax, max, my_load, balance);
- pneigh[imin]->set_to_send(balance);
- pneigh[imin]->add_load(balance);
- return balance;
- } else {
- return 0.0;
+ double balance = (get_load() - max) / 2;
+ DEBUG6("%d:%g %d:%g %g %g", imin, min, imax, max, get_load(), balance);
+ send(pneigh[imin], balance);
}
}
~loba_simple() { }
private:
- double load_balance(double my_load);
+ void load_balance();
};
#endif //!LOBA_SIMPLE
-#include <algorithm>
-#include <cmath>
#include <cstring>
-#include <tr1/functional>
#include <iostream>
-#include <numeric>
#include <stdexcept>
-#include <vector>
#include <msg/msg.h>
#include <xbt/log.h>
#include "misc.h"
#include "options.h"
#include "process.h"
+#include "statistics.h"
#include "timer.h"
+#include "tracing.h"
#include "version.h"
namespace {
EXIT_FAILURE_CLEAN = 0x08, // error at cleanup
};
- std::vector<double> loads;
- double load_stddev;
- double load_avg;
+ xbt_mutex_t proc_mutex;
+ xbt_cond_t proc_cond;
+ unsigned proc_counter;
+
+ struct statistics comps;
+ struct statistics loads;
+
}
static int simulation_main(int argc, char* argv[])
process* proc;
try {
proc = opt::loba_algorithms.new_instance(opt::loba_algo, argc, argv);
+
+ xbt_mutex_acquire(proc_mutex);
+ ++proc_counter;
+ xbt_mutex_release(proc_mutex);
+
result = proc->run();
- loads.push_back(proc->get_load());
+
+ xbt_mutex_acquire(proc_mutex);
+ comps.push(proc->get_comp());
+ loads.push(proc->get_real_load());
+
+ // Synchronization barrier...
+ // The goal is to circumvent a limitation in SimGrid (at least
+ // in version 3.5): a process must be alive when another one
+ // destroys a communication they had together.
+
+ --proc_counter;
+ xbt_cond_broadcast(proc_cond);
+ while (proc_counter > 0)
+ xbt_cond_wait(proc_cond, proc_mutex);
+ xbt_mutex_release(proc_mutex);
+
delete proc;
}
catch (std::invalid_argument& e) {
double lost_ratio = 100.0 * lost / total_init;
if (lost_ratio < -opt::load_ratio_threshold)
CRITICAL2("Gained load at exit! %g (%g%%) <============",
- lost, lost_ratio);
+ -lost, -lost_ratio);
else if (lost_ratio > opt::load_ratio_threshold)
CRITICAL2("Lost load at exit! %g (%g%%) <============",
lost, lost_ratio);
else
- DEBUG2("Total load at exit looks good: %g (%g%%)", lost, lost_ratio);
+ VERB2("Total load at exit looks good: %g (%g%%)", lost, lost_ratio);
double total_running = process::get_total_load_running();
double running_ratio = 100.0 * total_running / total_init;
CRITICAL2("Remaining running load at exit! %g (%g%%) <============",
total_running, running_ratio);
else
- DEBUG2("Running load at exit looks good: %g (%g%%)",
+ VERB2("Running load at exit looks good: %g (%g%%)",
total_running, running_ratio);
}
-static void compute_load_imbalance()
-{
- using std::tr1::bind;
- using std::tr1::placeholders::_1;
-
- unsigned n = loads.size();
- load_avg = std::accumulate(loads.begin(), loads.end(), 0.0) / n;
-
- std::vector<double> diff(loads);
- std::transform(diff.begin(), diff.end(), diff.begin(),
- bind(std::minus<double>(), _1, load_avg));
- double epsilon = std::accumulate(diff.begin(), diff.end(), 0.0);
- double square_sum = std::inner_product(diff.begin(), diff.end(),
- diff.begin(), 0.0);
- double variance = (square_sum - (epsilon * epsilon) / n) / n;
- load_stddev = sqrt(variance);
-}
+// Log one "total/avg./stddev." result line for the statistics object st.
+// descr must be a string literal: it is concatenated with the dotted
+// filler, and the "%.*s" conversion with precision 39 cuts the label at
+// 39 characters.  st is parenthesised so that any expression yielding a
+// statistics object can be passed safely.
+#define PR_STATS(descr, st)                                             \
+    INFO5("| %.*s: %g / %g / %g", 39,                                   \
+          descr " total/avg./stddev. at exit.........................", \
+          (st).get_sum(), (st).get_mean(), (st).get_stddev())
int main(int argc, char* argv[])
{
MSG_launch_application(opt::deployment_file.c_str());
}
+ // Register tracing categories
+ TRACE_category(TRACE_CAT_COMP);
+ TRACE_category(TRACE_CAT_CTRL);
+ TRACE_category(TRACE_CAT_DATA);
+
exit_status = EXIT_FAILURE_SIMU; // =====
+ proc_mutex = xbt_mutex_init();
+ proc_cond = xbt_cond_init();
+ proc_counter = 0;
+
// Launch the MSG simulation.
INFO1("Starting simulation at %f...", MSG_get_clock());
res = MSG_main();
simulated_time = MSG_get_clock();
INFO1("Simulation ended at %f.", simulated_time);
- check_for_lost_load();
- compute_load_imbalance();
+
+ xbt_cond_destroy(proc_cond);
+ xbt_mutex_destroy(proc_mutex);
+
if (res != MSG_OK)
THROW1(0, 0, "MSG_main() failed with status %#x", res);
// Report final simulation status.
if (simulated_time >= 0.0) {
simulation_time.stop();
+ check_for_lost_load();
INFO0(",----[ Results ]");
- INFO2("| Load avg./stddev. at exit.: %g / %g", load_avg, load_stddev);
- INFO1("| Total simulated time......: %g", simulated_time);
- INFO1("| Total simulation time.....: %g", simulation_time.duration());
+ PR_STATS("Load", loads);
+ PR_STATS("Computation", comps);
+ INFO1("| Total simulated time...................: %g", simulated_time);
+ INFO1("| Total simulation time..................: %g",
+ simulation_time.duration());
INFO0("`----");
}
if (exit_status)
#include "neighbor.h"
neighbor::neighbor(const char* hostname)
- : host((hostdata* )MSG_host_get_data(MSG_get_host_by_name(hostname)))
+ : host(static_cast<hostdata*>(MSG_host_get_data(MSG_get_host_by_name(hostname))))
, load(std::numeric_limits<double>::infinity())
, debt(0.0)
, to_send(0.0)
// Getter and setter for load
double get_load() const { return load; }
void set_load(double amount) { load = amount; }
- void add_load(double amount) { load += amount; }
// Getter and setter for debt
double get_debt() const { return debt; }
// Getter and setter for to_send
double get_to_send() const { return to_send; }
void set_to_send(double amount) { to_send = amount; }
- void add_to_send(double amount) { to_send += amount; }
// Prints its name and load on given category, with given
// priority. If verbose is true, prints debt and to_send too.
XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(main);
+#include "deployment.h"
+#include "process.h"
#include "loba_simple.h"
#include "loba_fairstrategy.h"
bool bookkeeping = false;
// Application parameters
- cost_func comp_cost("1e9, 0"); // fixme: find better defaults
- cost_func comm_cost("1, 0"); // fixme: find better defaults
- unsigned comp_maxiter = 10; // fixme: find better defaults
- unsigned lb_maxiter = comp_maxiter; // fixme: find better defaults
- bool exit_on_close = false;
+ // fixme: find better defaults
+ cost_func comp_cost("1e9, 0");
+ cost_func comm_cost("1, 0");
+ double min_iter_duration = 1.0;
+
+ // Parameters for the end of the simulation
+ unsigned lb_maxiter = 0;
+ unsigned comp_maxiter = 0;
+ double time_limit = 0;
+ bool exit_on_close = true;
// Named parameters lists
loba_algorithms_type loba_algorithms;
const char* opt_helper::descr(const char* str)
{
- const int descr_width = 35;
+ const int descr_width = 40;
std::string& res = descr_str;
res = str;
res.resize(descr_width, '.');
int c;
opterr = 0;
- while ((c = getopt(*argc, argv, "a:bc:C:ehi:I:l:L:N:s:T:vV")) != -1) {
+ while ((c = getopt(*argc, argv, "a:bc:C:ehi:I:l:L:N:s:t:T:vV")) != -1) {
switch (c) {
case 'a':
opt::loba_algo = optarg;
&& result;
break;
case 'b':
- opt::bookkeeping = true;
+ opt::bookkeeping = !opt::bookkeeping;
break;
case 'e':
- opt::exit_on_close = true;
+ opt::exit_on_close = !opt::exit_on_close;
break;
case 'h':
opt::help_requested++;
opt::comm_cost = cost_func(optarg);
break;
case 'i':
- std::istringstream(optarg) >> opt::comp_maxiter;
+ std::istringstream(optarg) >> opt::lb_maxiter;
break;
case 'I':
- std::istringstream(optarg) >> opt::lb_maxiter;
- ERROR0("option -I not implemented yet");
- result = false;
+ std::istringstream(optarg) >> opt::comp_maxiter;
break;
case 'l':
std::istringstream(optarg) >> opt::log_rate;
std::istringstream(optarg) >> opt::auto_depl::nhosts;
break;
case 's':
- std::istringstream(optarg) >> communicator::send_count_before_flush;
+ std::istringstream(optarg) >> opt::min_iter_duration;
+ break;
+ case 't':
+ std::istringstream(optarg) >> opt::time_limit;
break;
case 'T':
opt::auto_depl::topology = optarg;
INFO2("| %s: " format, h.descr(description), value)
INFO0(",----[ Simulation parameters ]");
- DESCR("log rate", "%s", h.val_or_string(log_rate, "disabled"));
+ DESCR("log rate", "%s", h.val_or_string(log_rate, "disabled"));
DESCR("platform file", "\"%s\"", platform_file.c_str());
if (auto_depl::enabled) {
INFO0("| automatic deployment enabled");
- DESCR("- topology", "%s", auto_depl::topology.c_str());
- DESCR("- number of hosts", "%s", h.val_or_string(auto_depl::nhosts,
- "auto"));
- DESCR("- initial load", "%s", h.val_or_string(auto_depl::load,
- "auto"));
+ DESCR("- topology", "%s", auto_depl::topology.c_str());
+ DESCR("- number of hosts", "%s", h.val_or_string(auto_depl::nhosts,
+ "auto"));
+ DESCR("- initial load", "%s", h.val_or_string(auto_depl::load,
+ "auto"));
} else {
DESCR("deployment file", "\"%s\"", deployment_file.c_str());
}
- DESCR("load balancing algorithm", "%s", loba_algo.c_str());
- DESCR("bookkeeping", "%s", h.on_off(bookkeeping));
- DESCR("computation cost factors", "[%s]", comp_cost.to_string().c_str());
+ DESCR("load balancing algorithm", "%s", loba_algo.c_str());
+ DESCR("bookkeeping", "%s", h.on_off(bookkeeping));
+ DESCR("computation cost factors", "[%s]", comp_cost.to_string().c_str());
DESCR("communication cost factors", "[%s]", comm_cost.to_string().c_str());
- DESCR("maximum number of comp. iterations", "%s",
- h.val_or_string(comp_maxiter, "infinity"));
+ DESCR("minimum duration between iterations", "%g", min_iter_duration);
DESCR("maximum number of lb. iterations", "%s",
h.val_or_string(lb_maxiter, "infinity"));
- DESCR("exit on close", "%s", h.on_off(exit_on_close));
- DESCR("send count before flush", "%d",
- communicator::send_count_before_flush);
+ DESCR("maximum number of comp. iterations", "%s",
+ h.val_or_string(comp_maxiter, "infinity"));
+ DESCR("time limit", "%s", h.val_or_string(time_limit, "infinity"));
+ DESCR("exit on close", "%s", h.on_off(exit_on_close));
INFO0("`----");
#undef DESCR
std::clog << "\nSimulation parameters\n";
std::clog << o("-l value")
- << "print current load every n-th iterations, 0 to disable"
- << " (" << opt::log_rate << ")\n";
+ << "print current load every n lb iterations, 0 to disable"
+ << " [" << opt::log_rate << "]\n";
std::clog << o("-v")
<< "verbose: do not override the default logging parameters\n";
std::clog << "\nAutomatic deployment options\n";
std::clog << o("-T name")
<< "enable automatic deployment with selected topology"
- << " (" << opt::auto_depl::topology << ")\n";
+ << " [" << opt::auto_depl::topology << "]\n";
if (opt::help_requested > 1)
so_list(opt::topologies);
std::clog << o("-L value")
<< "total load with auto deployment, 0 for number of hosts"
- << " (" << opt::auto_depl::load << ")\n";
+ << " [" << opt::auto_depl::load << "]\n";
std::clog << o("-N value")
- << "number of hosts to use with auto deployment,"
- << " 0 for max. (" << opt::auto_depl::nhosts << ")\n";
+ << "number of hosts to use with auto deployment, 0 for max."
+ << " [" << opt::auto_depl::nhosts << "]\n";
std::clog << "\nLoad balancing algorithm\n";
std::clog << o("-a name") << "load balancing algorithm"
- << " (" << opt::loba_algo << ")\n";
+ << " [" << opt::loba_algo << "]\n";
if (opt::help_requested > 1)
so_list(opt::loba_algorithms);
- std::clog << o("-b") << "enable bookkeeping\n";
+ std::clog << o("-b") << "toggle bookkeeping (\"virtual load\")"
+ << " [" << opt_helper::on_off(opt::bookkeeping) << "]\n";
std::clog << "\nApplication parameters\n";
std::clog << o("-c [fn,...]f0")
<< "polynomial factors for computation cost"
- << " (" << opt::comp_cost.to_string() << ")\n";
+ << " [" << opt::comp_cost.to_string() << "]\n";
std::clog << o("-C [fn,...]f0")
<< "polynomial factors for communication cost"
- << " (" << opt::comm_cost.to_string() << ")\n";
- std::clog << o("-e") << "exit on reception of \"close\" message\n";
+ << " [" << opt::comm_cost.to_string() << "]\n";
+ std::clog << o("-s value")
+ << "minimum duration between iterations"
+ << " [" << opt::min_iter_duration << "]\n";
+
+ std::clog << "\nParameters for the end of the simulation\n";
std::clog << o("-i value")
- << "maximum number of comp. iterations, 0 for infinity"
- << " (" << opt::comp_maxiter << ")\n";
- std::clog << o("-I value")
<< "maximum number of lb. iterations, 0 for infinity"
- << " (" << opt::lb_maxiter << ")\n";
+ << " [" << opt::lb_maxiter << "]\n";
+ std::clog << o("-I value")
+ << "maximum number of comp. iterations, 0 for infinity"
+ << " [" << opt::comp_maxiter << "]\n";
+ std::clog << o("-t value")
+ << "time limit (simulated time), 0 for infinity"
+ << " [" << opt::time_limit << "]\n";
+ std::clog << o("-e") << "toggle exit on reception of \"close\" message"
+ << " [" << opt_helper::on_off(opt::exit_on_close) << "]\n";
if (opt::help_requested < 3)
return;
<< " proc : messages from base process class\n"
<< " loba : messages from load-balancer\n";
- std::clog << "\nMiscellaneous low-level parameters\n";
- std::clog << o("-s count")
- << "check for finished comm. every `count' send operation"
- << " (" << communicator::send_count_before_flush << ")\n";
+ // std::clog << "\nMiscellaneous low-level parameters\n";
#undef so_list
#undef so
#include <string>
#include "cost_func.h"
-#include "deployment.h"
#include "named_object_list.h"
-#include "process.h"
+
+// These classes may themselves include options.h, so only forward-declare them
+class deployment_generator;
+class process;
// Global parameters, shared by all the processes
namespace opt {
// Application parameters
extern cost_func comp_cost;
extern cost_func comm_cost;
- extern unsigned comp_maxiter;
+ extern double min_iter_duration;
+
+ // Parameters for the end of the simulation
extern unsigned lb_maxiter;
+ extern unsigned comp_maxiter;
+ extern double time_limit;
extern bool exit_on_close;
// Named parameters lists
#include <algorithm>
#include <tr1/functional>
#include <iterator>
-#include <numeric>
#include <stdexcept>
#include <sstream>
#include <xbt/log.h>
#include "misc.h"
#include "options.h"
+#include "tracing.h"
#include "process.h"
process::process(int argc, char* argv[])
{
- if (argc < 2 || !(std::istringstream(argv[1]) >> load))
+ if (argc < 2 || !(std::istringstream(argv[1]) >> real_load))
throw std::invalid_argument("bad or missing initial load parameter");
neigh.assign(argv + 2, argv + argc);
rev_neigh.insert(std::make_pair(host, ptr));
}
- prev_load_broadcast = -1; // force sending of load on first send()
- expected_load = load;
- total_load_running += load;
- total_load_init += load;
+ comp = 0.0;
+
+ prev_load_broadcast = -1; // force sending of load on first send_all()
+ expected_load = real_load;
+ total_load_running += real_load;
+ total_load_init += real_load;
ctrl_close_pending = data_close_pending = neigh.size();
close_received = false;
finalizing = false;
+ comp_iter = lb_iter = 0;
+
e_xbt_log_priority_t logp = xbt_log_priority_verbose;
if (!LOG_ISENABLED(logp))
return;
process::~process()
{
- total_load_exit += load;
+ total_load_exit += real_load;
+ if (opt::bookkeeping) {
+ INFO4("Final load after %d:%d iterations: %g ; expected: %g",
+ lb_iter, comp_iter, real_load, expected_load);
+ } else {
+ INFO2("Final load after %d iterations: %g",
+ lb_iter, real_load);
+ if (lb_iter != comp_iter)
+ WARN2("lb_iter (%d) and comp_iter (%d) differ!",
+ lb_iter, comp_iter);
+ }
+ VERB1("Total computation for this process: %g", comp);
}
int process::run()
{
- INFO1("Initial load: %g", load);
+ double next_iter_after_date = 0.0;
+ INFO1("Initial load: %g", real_load);
VERB0("Starting...");
- comp_iter = lb_iter = 0;
while (true) {
- if (load > 0.0) {
- ++comp_iter;
- if (opt::log_rate && comp_iter % opt::log_rate == 0) {
+ if (get_load() > 0.0) {
+ double now = MSG_get_clock();
+ if (now < next_iter_after_date)
+ MSG_process_sleep(next_iter_after_date - now);
+ next_iter_after_date = MSG_get_clock() + opt::min_iter_duration;
+
+ ++lb_iter;
+
+ if (opt::log_rate && lb_iter % opt::log_rate == 0) {
if (opt::bookkeeping)
INFO4("(%u:%u) current load: %g ; expected: %g",
- comp_iter, lb_iter, load, expected_load);
+ lb_iter, comp_iter, real_load, expected_load);
else
INFO2("(%u) current load: %g",
- comp_iter, load);
+ lb_iter, real_load);
}
- if (opt::bookkeeping)
- expected_load -= load_balance(expected_load);
- else
- load -= load_balance(load);
+ load_balance();
print_loads(true, xbt_log_priority_debug);
+ }
- send();
+ // send load information, and load (data) if any
+ send_all();
+ if (real_load > 0.0) {
+ ++comp_iter;
compute();
-
- } else {
- // send load information, and load when bookkeeping
- send();
}
- if (opt::comp_maxiter && comp_iter >= opt::comp_maxiter)
+ if (opt::lb_maxiter && lb_iter >= opt::lb_maxiter) {
+ VERB2("Reached lb_maxiter: %d/%d", lb_iter, opt::lb_maxiter);
+ break;
+ }
+ if (opt::comp_maxiter && comp_iter >= opt::comp_maxiter) {
+ VERB2("Reached comp_maxiter: %d/%d", comp_iter, opt::comp_maxiter);
break;
- if (opt::lb_maxiter && lb_iter >= opt::lb_maxiter)
+ }
+ if (opt::time_limit && MSG_get_clock() >= opt::time_limit) {
+ VERB2("Reached time limit: %g/%g", MSG_get_clock(), opt::time_limit);
break;
+ }
// block on receiving unless there is something to compute or
// to send
- bool wait = (load == 0 &&
- ((opt::bookkeeping ? expected_load : load)
- == prev_load_broadcast));
- receive(wait);
+ double timeout;
+ if (real_load != 0 || get_load() != prev_load_broadcast)
+ timeout = 0.0;
+ else if (opt::min_iter_duration)
+ timeout = opt::min_iter_duration;
+ else
+ timeout = 1.0;
+ receive(timeout);
// one of our neighbor is finalizing
- if (opt::exit_on_close && close_received)
+ if (opt::exit_on_close && close_received) {
+ VERB0("Close received");
break;
+ }
// have no load and cannot receive anything
- if (load == 0.0 && !may_receive())
+ if (real_load == 0.0 && !may_receive()) {
+ VERB0("I'm a poor lonesome process, and I have no load...");
break;
+ }
// fixme: this check should be implemented with a distributed
// algorithm, and not a shared global variable!
// fixme: should this chunk be moved before call to receive() ?
if (100.0 * total_load_running / total_load_init <=
opt::load_ratio_threshold) {
- VERB0("No more load to balance in system, stopping.");
+ VERB0("No more load to balance in system.");
break;
+ } else {
+ DEBUG1("still %g load to balance, continuing...", total_load_running);
}
-
}
VERB0("Going to finalize...");
finalize();
*/
VERB0("Done.");
- INFO3("Final load after %d iteration%s: %g",
- comp_iter, ESSE(comp_iter), load);
- if (opt::bookkeeping)
- INFO1("Expected load: %g", expected_load);
return 0;
}
-double process::sum_of_to_send() const
-{
- using std::tr1::bind;
- using std::tr1::placeholders::_1;
- using std::tr1::placeholders::_2;
-
- return std::accumulate(neigh.begin(), neigh.end(), 0.0,
- bind(std::plus<double>(),
- _1, bind(&neighbor::get_to_send, _2)));
-}
-
-double process::load_balance(double /*my_load*/)
+// Default load-balancing step.  This implementation moves no load: it
+// only emits a warning, and only while lb_iter == 1 so that the
+// message is not repeated on later iterations.
+void process::load_balance()
{
if (lb_iter == 1) // warn only once
- WARN0("process::load_balance is a no-op!");
- return 0.0;
+ WARN0("process::load_balance() is a no-op!");
}
+// Virtually do some computation.  When real_load is positive, the cost
+// in flops is obtained from opt::comp_cost(real_load), a "computation"
+// task of that size is created, tagged with the TRACE_CAT_COMP tracing
+// category, executed and destroyed, and the flops are added to the
+// running total "comp".  Otherwise only a debug message is logged.
void process::compute()
{
- if (load > 0.0) {
- double duration = opt::comp_cost(load);
- m_task_t task = MSG_task_create("computation", duration, 0.0, NULL);
- DEBUG2("compute %g flop%s", duration, ESSE(duration));
+ if (real_load > 0.0) {
+ double flops = opt::comp_cost(real_load);
+ m_task_t task = MSG_task_create("computation", flops, 0.0, NULL);
+ TRACE_msg_set_task_category(task, TRACE_CAT_COMP);
+ DEBUG2("compute %g flop%s", flops, ESSE(flops));
MSG_task_execute(task);
+ comp += flops;
MSG_task_destroy(task);
} else {
DEBUG0("nothing to compute !");
}
}
+// Register "amount" of load to be sent to neighbor "nb": the amount is
+// taken out of our current load (as given by get_load()), added to the
+// neighbor's pending "to_send", and added to the load recorded for
+// that neighbor.
+void process::send(neighbor& nb, double amount)
+{
+    // what is left for ourselves
+    const double remaining = get_load() - amount;
+    set_load(remaining);
+
+    // what is queued for the neighbor
+    const double queued = nb.get_to_send() + amount;
+    nb.set_to_send(queued);
+
+    // load recorded for the neighbor
+    const double nb_load = nb.get_load() + amount;
+    nb.set_load(nb_load);       // fixme: make this optional?
+}
+
void process::send1_no_bookkeeping(neighbor& nb)
{
- if (load != prev_load_broadcast)
- comm.send(nb.get_ctrl_mbox(), new message(message::INFO, load));
+ if (real_load != prev_load_broadcast)
+ comm.send(nb.get_ctrl_mbox(), new message(message::INFO, real_load));
double load_to_send = nb.get_to_send();
if (load_to_send > 0.0) {
comm.send(nb.get_data_mbox(), new message(message::LOAD, load_to_send));
} else {
new_debt = nb.get_debt();
}
- if (load <= new_debt) {
- load_to_send = load;
+ if (real_load <= new_debt) {
+ load_to_send = real_load;
nb.set_debt(new_debt - load_to_send);
- load = 0.0;
+ real_load = 0.0;
} else {
load_to_send = new_debt;
nb.set_debt(0.0);
- load -= load_to_send;
+ real_load -= load_to_send;
}
if (load_to_send > 0.0)
comm.send(nb.get_data_mbox(), new message(message::LOAD, load_to_send));
}
-void process::send()
+void process::send_all()
{
using std::tr1::bind;
using std::tr1::placeholders::_1;
} else {
std::for_each(neigh.begin(), neigh.end(),
bind(&process::send1_no_bookkeeping, this, _1));
- prev_load_broadcast = load;
+ prev_load_broadcast = real_load;
}
+ comm.flush(false);
}
-void process::receive(bool wait)
+void process::receive(double timeout)
{
message* msg;
m_host_t from;
- while (may_receive() && comm.recv(msg, from, wait)) {
+ DEBUG2("%sblocking receive (%g)", "\0non-" + !timeout, timeout);
+ while (may_receive() && comm.recv(msg, from, timeout)) {
switch (msg->get_type()) {
case message::INFO: {
neighbor* n = rev_neigh[from];
break;
case message::LOAD: {
double ld = msg->get_amount();
- load += ld;
+ real_load += ld;
if (finalizing)
total_load_running -= ld;
break;
break;
}
delete msg;
- wait = false; // only wait on first recv
+ timeout = 0.0; // only wait on first recv
}
+ comm.flush(false);
}
void process::finalize1(neighbor& nb)
using std::tr1::placeholders::_1;
finalizing = true;
- total_load_running -= load;
+ total_load_running -= real_load;
DEBUG2("send CLOSE to %lu neighbor%s",
(unsigned long )neigh.size(), ESSE(neigh.size()));
std::for_each(neigh.begin(), neigh.end(),
bind(&process::finalize1, this, _1));
- DEBUG2("wait for CLOSE from %lu neighbor%s",
- (unsigned long )neigh.size(), ESSE(neigh.size()));
- while (may_receive())
- receive(true);
+ while (may_receive()) {
+ comm.flush(false);
+ DEBUG2("waiting for %d CTRL and %d DATA CLOSE",
+ ctrl_close_pending, data_close_pending);
+ receive(-1.0);
+ }
comm.flush(true);
}
#include <xbt/log.h>
#include "communicator.h"
#include "neighbor.h"
+#include "options.h"
class process {
public:
process(int argc, char* argv[]);
virtual ~process();
- double get_load() const { return load; }
+ double get_comp() const { return comp; }
+ double get_real_load() const { return real_load; }
int run();
pneigh_type pneigh; // list of pointers to neighbors that
// we are free to reorder
- // Returns the sum of "to_send" for all neighbors.
- double sum_of_to_send() const;
+ // Get and set current load, which may be real load, or expected
+ // load if opt::bookkeeping is true.
+ double get_load() const;
+ void set_load(double load);
+
+ // Register some amount of load to send to given neighbor.
+ void send(neighbor& nb, double amount);
+ void send(neighbor* nb, double amount) { send(*nb, amount); }
// Calls neighbor::print(verbose, logp, cat) for each member of neigh.
void print_loads(bool verbose = false,
private:
static double total_load_init; // sum of process loads at init
- static double total_load_running; // summ of loads while running
+ static double total_load_running; // sum of loads while running
static double total_load_exit; // sum of process loads at exit
typedef MAP_TEMPLATE<m_host_t, neighbor*> rev_neigh_type;
unsigned lb_iter; // counter of load-balancing iterations
unsigned comp_iter; // counter of computation iterations
+ double comp; // total computing done so far (flops)
+
double prev_load_broadcast; // used to ensure that we do not send
// a same information messages
- double load; // current load
+ double real_load; // current load
double expected_load; // expected load in bookkeeping mode
// The load balancing algorithm comes here...
- // Parameter "my_load" is the load to take into account for myself
- // (may be load or expected load).
- // Returns the total load sent to neighbors.
- virtual double load_balance(double my_load);
+ virtual void load_balance();
// Virtually do some computation
void compute();
// Send procedures, with helpers for bookkeeping mode or not
void send1_no_bookkeeping(neighbor& nb);
void send1_bookkeeping(neighbor& nb);
- void send();
+ void send_all();
// Returns true if there remains neighbors to listen for
- bool may_receive() { return ctrl_close_pending || data_close_pending; }
+ bool may_receive() const {
+ return ctrl_close_pending || data_close_pending;
+ }
- // Receive procedure: wait (or not) for a message to come
- void receive(bool wait);
+ // Receive procedure
+ // Parameter "timeout" may be 0 for non-blocking operation, -1 for
+ // infinite waiting, or any positive timeout.
+ void receive(double timeout);
// Finalize sends a "close" message to each neighbor and wait for
// all of them to answer.
void finalize();
};
+// Current load: the expected load when opt::bookkeeping is set, the
+// real load otherwise.
+inline
+double process::get_load() const
+{
+    return opt::bookkeeping ? expected_load : real_load;
+}
+
+// Set the current load: writes the expected load when opt::bookkeeping
+// is set, the real load otherwise.
+inline
+void process::set_load(double load)
+{
+    double& target = opt::bookkeeping ? expected_load : real_load;
+    target = load;
+}
+
#endif // !PROCESS_H
// Local variables:
--- /dev/null
+#ifndef STATISTICS_H
+#define STATISTICS_H
+
+#include <cmath>
+#include <vector>
+
+// Accumulator for simple descriptive statistics over a stream of
+// samples.  Samples are fed one at a time with push(); the count, sum,
+// mean, (population) variance and standard deviation of everything
+// pushed so far can be queried at any time, without storing the
+// samples themselves.
+class statistics {
+public:
+    statistics()
+        : count(0)
+        , sum(0.0)
+        , mean(0.0)
+        , sqdiff_sum(0.0)
+    { }
+
+    // Add sample x.  The sum of squared differences is updated
+    // incrementally from the mean before and after inserting x, so no
+    // second pass over the data is needed.
+    void push(double x) {
+        double delta = x - mean;        // distance to the previous mean
+        ++count;
+        sum += x;
+        mean = sum / count;
+        sqdiff_sum += delta * (x - mean);
+    }
+
+    // Number of samples pushed so far.
+    unsigned get_count() const  { return count; }
+    // Sum of all samples (0.0 when empty).
+    double get_sum() const      { return sum; }
+    // Arithmetic mean of all samples (0.0 when empty).
+    double get_mean() const     { return mean; }
+    // Population variance, i.e. sqdiff_sum / count.  Returns 0.0 when
+    // no sample was pushed, instead of dividing by zero and yielding
+    // NaN.
+    double get_variance() const {
+        return count ? sqdiff_sum / count : 0.0;
+    }
+    // Population standard deviation (0.0 when empty).
+    double get_stddev() const   { return std::sqrt(get_variance()); }
+
+private:
+    unsigned count;             // number of samples; same type as get_count()
+    double sum;                 // sum of x_i
+    double mean;                // mean of x_i
+    double sqdiff_sum;          // sum of (x_i - mean)^2
+};
+
+#endif // !STATISTICS_H
+
+// Local variables:
+// mode: c++
+// End:
--- /dev/null
+#ifndef TRACING_H
+#define TRACING_H
+
+// Names of the tracing categories.  TRACE_CAT_COMP is the category
+// given to computation tasks.
+// NOTE(review): TRACE_CAT_CTRL and TRACE_CAT_DATA presumably tag control
+// and data messages respectively -- confirm at their use sites.
+#define TRACE_CAT_COMP "comp_task"
+#define TRACE_CAT_CTRL "ctrl_mesg"
+#define TRACE_CAT_DATA "data_mesg"
+
+#endif // !TRACING_H
+
+// Local variables:
+// mode: c++
+// End:
fun:MSG_create_environment
...
}
+
+{
+ Memory leak in libc?
+ Memcheck:Leak
+ fun:malloc
+ fun:_dl_map_object_deps
+ fun:dl_open_worker
+ fun:_dl_catch_error
+ fun:_dl_open
+ fun:do_dlopen
+ fun:_dl_catch_error
+ fun:dlerror_run
+ fun:__libc_dlopen_mode
+ fun:init
+ fun:pthread_once
+ fun:backtrace
+}
(__DATE__ " " __TIME__);
const std::string copyright
- ("Copyright (c) 2010, Arnaud Giersch <arnaud.giersch@univ-fcomte.fr>");
+ ("Copyright (c) 2010-2011, Arnaud Giersch <arnaud.giersch@univ-fcomte.fr>");
}