From: Arnaud Giersch Date: Mon, 13 Dec 2010 17:57:28 +0000 (+0100) Subject: Wip++... X-Git-Tag: v0.1~245 X-Git-Url: https://bilbo.iut-bm.univ-fcomte.fr/and/gitweb/loba.git/commitdiff_plain/325b135f0ee33c6c0242a14e2f58a54fb571b032 Wip++... * implement simple load balancing algorithm * re-introduce comm_cost * use "using namespace" for std::tr1 --- diff --git a/Makefile b/Makefile index 0f52ab6..80ab4fe 100644 --- a/Makefile +++ b/Makefile @@ -34,6 +34,7 @@ SRC.loba := main.cpp \ communicator.cpp \ cost_func.cpp \ hostdata.cpp \ + $(wildcard loba_*.cpp) \ misc.cpp \ neighbor.cpp \ options.cpp \ diff --git a/TODO b/TODO index 1f235b5..c68c38d 100644 --- a/TODO +++ b/TODO @@ -1,6 +1,10 @@ * implement loba_* algorithms (start with some trivial one) * add loba algorithm selection (-a number ?) +* fix process::run when load is 0 + -> wait for a message... + -> how does it work with opt::bookkeeping ??? + * implement automatic process topology (line, ring, star, btree, clique, hypercube, etc..) * implement automatic platform generation diff --git a/communicator.cpp b/communicator.cpp index 4b54161..f3de918 100644 --- a/communicator.cpp +++ b/communicator.cpp @@ -7,10 +7,9 @@ #include #include "simgrid_features.h" #include "misc.h" +#include "options.h" -// XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu); -XBT_LOG_NEW_DEFAULT_SUBCATEGORY(comm, simu, - "Messages from asynchronous pipes"); +XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(comm); std::string message::to_string() { @@ -21,8 +20,11 @@ std::string message::to_string() return oss.str(); } +const int communicator::send_count_before_flush = 128; + communicator::communicator() : host((hostdata* )MSG_host_get_data(MSG_host_self())) + , send_counter(0) , ctrl_task(NULL) , ctrl_comm(NULL) , ctrl_close_is_last(false) @@ -53,10 +55,15 @@ void communicator::send(const char* dest, message* msg) { double msg_size = sizeof *msg; if (msg->get_type() == message::LOAD) - msg_size += msg->get_amount(); + msg_size += opt::comm_cost(msg->get_amount()); m_task_t task = MSG_task_create("message", 0.0, msg_size, msg); msg_comm_t comm = MSG_task_isend(task, dest); sent_comm.push_back(comm); + + if (++send_counter >= send_count_before_flush) { + flush(false); + send_counter = 0; + } } bool communicator::recv(message*& msg, m_host_t& from, bool wait) @@ -103,13 +110,16 @@ bool communicator::recv(message*& msg, m_host_t& from, bool wait) void communicator::flush(bool wait) { + using namespace std::tr1; + using namespace std::tr1::placeholders; + sent_comm.remove_if(comm_test_n_destroy); if (wait && !sent_comm.empty()) { xbt_dynar_t comms = xbt_dynar_new(sizeof(msg_comm_t), NULL); while (!sent_comm.empty()) { std::for_each(sent_comm.begin(), sent_comm.end(), - std::tr1::bind(comm_push_in_dynar, - comms, std::tr1::placeholders::_1)); + bind(xbt_dynar_push, + comms, bind(misc::address(), _1))); MSG_comm_waitany(comms); xbt_dynar_reset(comms); sent_comm.remove_if(comm_test_n_destroy); @@ -128,11 +138,6 @@ void communicator::next_close_on_data_is_last() data_close_is_last = true; } -void communicator::comm_push_in_dynar(xbt_dynar_t dynar, msg_comm_t comm) -{ - xbt_dynar_push(dynar, &comm); -} - bool communicator::comm_test_n_destroy(msg_comm_t comm) { if (MSG_comm_test(comm)) { diff --git a/communicator.h b/communicator.h index 4e0757a..fb06d34 100644 --- a/communicator.h +++ b/communicator.h @@ -44,6 +44,8 @@ private: // List of pending send communications std::list sent_comm; + static const int send_count_before_flush; + int send_counter; // Control channel for receiving m_task_t ctrl_task; @@ -58,7 +60,6 @@ private: const char* get_ctrl_mbox() const { return host->get_ctrl_mbox(); } const char* get_data_mbox() const { return host->get_data_mbox(); } - static void comm_push_in_dynar(xbt_dynar_t dynar, msg_comm_t comm); static bool comm_test_n_destroy(msg_comm_t comm); }; diff --git a/hostdata.cpp b/hostdata.cpp index 96831f0..b75633a 100644 --- a/hostdata.cpp +++ b/hostdata.cpp @@ -3,7 +3,7 @@ #include #include -XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu); +XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(main); hostdata* hostdata::instances = NULL; diff --git a/loba_least_loaded.cpp b/loba_least_loaded.cpp new file mode 100644 index 0000000..9927fd9 --- /dev/null +++ b/loba_least_loaded.cpp @@ -0,0 +1,43 @@ +#include "loba_least_loaded.h" + +#include + +XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(loba); + +/* simple version: + * load balance with a least-loaded neighbor, + * without breaking the ping-pong condition + */ +double loba_least_loaded::load_balance(double my_load) +{ + int imin = -1; + int imax = -1; + double min = my_load; + double max = -1.0; + for (unsigned i = 0 ; i < neigh.size() ; ++i) { + double l = neigh[i].get_load(); + if (l >= my_load) + continue; + if (l < min) { + imin = i; + min = l; + } + if (l > max) { + imax = i; + max = l; + } + } + if (imin != -1) { + // found someone + double balance = (my_load - max) / 2; + DEBUG6("%d:%g %d:%g %g %g", imin, min, imax, max, my_load, balance); + neigh[imin].set_to_send(balance); + return balance; + } else { + return 0.0; + } +} + +// Local variables: +// mode: c++ +// End: diff --git a/loba_least_loaded.h b/loba_least_loaded.h new file mode 100644 index 0000000..893a48a --- /dev/null +++ b/loba_least_loaded.h @@ -0,0 +1,19 @@ +#ifndef LOBA_LEAST_LOADED +#define LOBA_LEAST_LOADED + +#include "process.h" + +class loba_least_loaded: public process { +public: + loba_least_loaded(int argc, char* argv[]): process(argc, argv) { } + ~loba_least_loaded() { } + +private: + double load_balance(double my_load); +}; + +#endif //!LOBA_LEAST_LOADED + +// Local variables: +// mode: c++ +// End: diff --git a/main.cpp b/main.cpp index a3b790d..a40bb9f 100644 --- a/main.cpp +++ b/main.cpp @@ -9,8 +9,14 @@ #include "timer.h" #include "version.h" -// Creates a new log category and makes it the default -XBT_LOG_NEW_DEFAULT_CATEGORY(simu, "Simulation messages"); +// Creates log categories +XBT_LOG_NEW_CATEGORY(simu, "Simulation messages"); +XBT_LOG_NEW_SUBCATEGORY(main, simu, "Messages from global infrastructure"); +XBT_LOG_NEW_SUBCATEGORY(comm, simu, "Messages from asynchronous pipes"); +XBT_LOG_NEW_SUBCATEGORY(proc, simu, "Messages from base process class"); +XBT_LOG_NEW_SUBCATEGORY(loba, simu, "Messages from load-balancer"); + +XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(main); // Failure exit status enum { @@ -21,9 +27,11 @@ enum { EXIT_FAILURE_CLEAN = 0x08, // error at cleanup }; +#include "loba_least_loaded.h" int simulation_main(int argc, char* argv[]) { - process proc(argc, argv); + // process proc(argc, argv); + loba_least_loaded proc(argc, argv); return proc.run(); } diff --git a/misc.h b/misc.h index aee128e..2ec625b 100644 --- a/misc.h +++ b/misc.h @@ -1,6 +1,7 @@ #ifndef MISC_H #define MISC_H +#include #include /* Returns true if the given priority is enabled for the default @@ -13,8 +14,15 @@ /* Returns c-string "s" if n > 1, empty string "" otherwise. */ #define ESSE(n) ((n) > 1 ? misc::str_esse : misc::str_nil) namespace misc { + extern const char str_esse[]; extern const char str_nil[]; + + template + struct address: public std::unary_function { + T* operator()(T& ref) { return &ref; } + }; + } #endif // !MISC_H diff --git a/options.cpp b/options.cpp index 61861f0..4f9c52c 100644 --- a/options.cpp +++ b/options.cpp @@ -8,7 +8,7 @@ #include #include "misc.h" -XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu); +XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(main); namespace opt { @@ -28,6 +28,7 @@ namespace opt { bool bookkeeping = false; cost_func comp_cost("1e9, 0"); // fixme: find better defaults + cost_func comm_cost("1, 0"); // fixme: find better defaults } // namespace opt @@ -49,7 +50,7 @@ int opt::parse_args(int* argc, char* argv[]) int c; opterr = 0; - while ((c = getopt(*argc, argv, "bc:ehi:l:V")) != -1) { + while ((c = getopt(*argc, argv, "bc:C:ehi:l:V")) != -1) { switch (c) { case 'b': opt::bookkeeping = true; @@ -63,6 +64,9 @@ int opt::parse_args(int* argc, char* argv[]) case 'c': opt::comp_cost = cost_func(optarg); break; + case 'C': + opt::comm_cost = cost_func(optarg); + break; case 'i': std::istringstream(optarg) >> opt::maxiter; break; @@ -114,6 +118,7 @@ void opt::print() INFO1("| exit on close.......: %s", on_off(opt::exit_on_close)); INFO1("| bookkeeping.........: %s", on_off(opt::bookkeeping)); INFO1("| comp. cost factors..: [%s]", opt::comp_cost.to_string().c_str()); + INFO1("| comm. cost factors..: [%s]", opt::comm_cost.to_string().c_str()); INFO0("`----"); } @@ -140,6 +145,9 @@ void opt::usage() std::clog << oo("-c", "[fn,...]f0") << "polynomial factors for computation cost (" << opt::comp_cost.to_string() << ")\n"; + std::clog << oo("-C", "[fn,...]f0") + << "polynomial factors for communication cost (" + << opt::comm_cost.to_string() << ")\n"; std::clog << o("-e") << "exit on close reception\n"; std::clog << oo("-i", "value") << "maximum number of iterations, 0 for infinity (" diff --git a/options.h b/options.h index 8125edb..70c9f5d 100644 --- a/options.h +++ b/options.h @@ -22,6 +22,7 @@ namespace opt { extern bool bookkeeping; extern cost_func comp_cost; + extern cost_func comm_cost; int parse_args(int* argc, char* argv[]); void print(); diff --git a/process.cpp b/process.cpp index e1232f2..9988def 100644 --- a/process.cpp +++ b/process.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -10,18 +11,24 @@ #include "misc.h" #include "options.h" -XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu); +XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(proc); process::process(int argc, char* argv[]) { + using namespace std::tr1; + using namespace std::tr1::placeholders; + if (argc < 2 || !(std::istringstream(argv[1]) >> load)) throw std::invalid_argument("bad or missing initial load"); neigh.assign(argv + 2, argv + argc); std::for_each(neigh.begin(), neigh.end(), - std::tr1::bind(&process::insert_neighbor_in_map, - this, std::tr1::placeholders::_1)); + bind(&process::insert_neighbor_in_map, this, _1)); + + pneigh.resize(neigh.size()); + std::transform(neigh.begin(), neigh.end(), pneigh.begin(), + misc::address()); expected_load = load; @@ -42,7 +49,7 @@ process::process(int argc, char* argv[]) oss << ESSE(neigh.size()) << ": "; std::transform(neigh.begin(), neigh.end() - 1, std::ostream_iterator(oss, ", "), - std::tr1::mem_fn(&neighbor::get_name)); + mem_fn(&neighbor::get_name)); oss << neigh.back().get_name(); } LOG1(logp, "Got %s.", oss.str().c_str()); @@ -59,6 +66,8 @@ int process::run() INFO1("Initial load: %g", load); VERB0("Starting..."); + // first send to inform neighbors about our load + send(); iter = 0; while (one_more) { ++iter; @@ -82,7 +91,6 @@ int process::run() load -= load_balance(load); send(); - comm.flush(false); if (opt::exit_on_close && close_received) one_more = false; @@ -107,14 +115,14 @@ int process::run() return 0; } -void process::compute() +double process::sum_of_to_send() const { - // fixme: shall we do something special when duration is 0 ? - double duration = opt::comp_cost(load); - m_task_t task = MSG_task_create("computation", duration, 0.0, NULL); - DEBUG2("compute %g flop%s.", duration, ESSE(duration)); - MSG_task_execute(task); - MSG_task_destroy(task); + using namespace std::tr1; + using namespace std::tr1::placeholders; + + return std::accumulate(neigh.begin(), neigh.end(), 0.0, + bind(std::plus(), + _1, bind(&neighbor::get_to_send, _2))); } double process::load_balance(double /*my_load*/) @@ -122,6 +130,21 @@ double process::load_balance(double /*my_load*/) return 0.0; } +void process::compute() +{ + // fixme: shall we do something special when duration is 0 ? + double duration = opt::comp_cost(load); + if (duration > 0) { + m_task_t task = MSG_task_create("computation", duration, 0.0, NULL); + DEBUG2("compute %g flop%s.", duration, ESSE(duration)); + MSG_task_execute(task); + MSG_task_destroy(task); + } else { + xbt_sleep(42); + // xbt_thread_yield(); + } +} + void process::send1_no_bookkeeping(neighbor& nb) { comm.send(nb.get_ctrl_mbox(), new message(message::INFO, load)); @@ -161,16 +184,16 @@ void process::send1_bookkeeping(neighbor& nb) void process::send() { + using namespace std::tr1; + using namespace std::tr1::placeholders; + // fixme: shall we send data at all iterations? - if (opt::bookkeeping) { + if (opt::bookkeeping) std::for_each(neigh.begin(), neigh.end(), - std::tr1::bind(&process::send1_bookkeeping, - this, std::tr1::placeholders::_1)); - } else { + bind(&process::send1_bookkeeping, this, _1)); + else std::for_each(neigh.begin(), neigh.end(), - std::tr1::bind(&process::send1_no_bookkeeping, - this, std::tr1::placeholders::_1)); - } + bind(&process::send1_no_bookkeeping, this, _1)); } // Returns false if a CLOSE message was received. @@ -221,11 +244,13 @@ void process::finalize1(neighbor& nb) void process::finalize() { + using namespace std::tr1; + using namespace std::tr1::placeholders; + DEBUG2("send CLOSE to %d neighbor%s.", (int )neigh.size(), ESSE(neigh.size())); std::for_each(neigh.begin(), neigh.end(), - std::tr1::bind(&process::finalize1, - this, std::tr1::placeholders::_1)); + bind(&process::finalize1, this, _1)); DEBUG2("wait for CLOSE from %d neighbor%s.", (int )neigh.size(), ESSE(neigh.size())); diff --git a/process.h b/process.h index 5802426..c148683 100644 --- a/process.h +++ b/process.h @@ -22,10 +22,17 @@ public: ~process(); int run(); +protected: + typedef std::vector neigh_type; + typedef std::vector pneigh_type; + + neigh_type neigh; + pneigh_type pneigh; + private: - std::vector neigh; - MAP_TEMPLATE rev_neigh; - std::vector pneigh; + typedef MAP_TEMPLATE rev_neigh_type; + + rev_neigh_type rev_neigh; communicator comm; int ctrl_close_pending; @@ -36,8 +43,10 @@ private: double load; double expected_load; - void compute(); + double sum_of_to_send() const; virtual double load_balance(double my_load); + + void compute(); void send1_no_bookkeeping(neighbor& nb); void send1_bookkeeping(neighbor& nb); void send();