communicator.cpp \
cost_func.cpp \
hostdata.cpp \
+ $(wildcard loba_*.cpp) \
misc.cpp \
neighbor.cpp \
options.cpp \
* implement loba_* algorithms (start with some trivial one)
* add loba algorithm selection (-a number ?)
+* fix process::run when load is 0
+ -> wait for a message...
+ -> how does it work with opt::bookkeeping ???
+
* implement automatic process topology
(line, ring, star, btree, clique, hypercube, etc..)
* implement automatic platform generation
#include <xbt/log.h>
#include "simgrid_features.h"
#include "misc.h"
+#include "options.h"
-// XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu);
-XBT_LOG_NEW_DEFAULT_SUBCATEGORY(comm, simu,
- "Messages from asynchronous pipes");
+XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(comm);
std::string message::to_string()
{
return oss.str();
}
+const int communicator::send_count_before_flush = 128;
+
communicator::communicator()
: host((hostdata* )MSG_host_get_data(MSG_host_self()))
+ , send_counter(0)
, ctrl_task(NULL)
, ctrl_comm(NULL)
, ctrl_close_is_last(false)
{
double msg_size = sizeof *msg;
if (msg->get_type() == message::LOAD)
- msg_size += msg->get_amount();
+ msg_size += opt::comm_cost(msg->get_amount());
m_task_t task = MSG_task_create("message", 0.0, msg_size, msg);
msg_comm_t comm = MSG_task_isend(task, dest);
sent_comm.push_back(comm);
+
+ if (++send_counter >= send_count_before_flush) {
+ flush(false);
+ send_counter = 0;
+ }
}
bool communicator::recv(message*& msg, m_host_t& from, bool wait)
void communicator::flush(bool wait)
{
+ using namespace std::tr1;
+ using namespace std::tr1::placeholders;
+
sent_comm.remove_if(comm_test_n_destroy);
if (wait && !sent_comm.empty()) {
xbt_dynar_t comms = xbt_dynar_new(sizeof(msg_comm_t), NULL);
while (!sent_comm.empty()) {
std::for_each(sent_comm.begin(), sent_comm.end(),
- std::tr1::bind(comm_push_in_dynar,
- comms, std::tr1::placeholders::_1));
+ bind(xbt_dynar_push,
+ comms, bind(misc::address<msg_comm_t>(), _1)));
MSG_comm_waitany(comms);
xbt_dynar_reset(comms);
sent_comm.remove_if(comm_test_n_destroy);
data_close_is_last = true;
}
-void communicator::comm_push_in_dynar(xbt_dynar_t dynar, msg_comm_t comm)
-{
- xbt_dynar_push(dynar, &comm);
-}
-
bool communicator::comm_test_n_destroy(msg_comm_t comm)
{
if (MSG_comm_test(comm)) {
// List of pending send communications
std::list<msg_comm_t> sent_comm;
+ static const int send_count_before_flush;
+ int send_counter;
// Control channel for receiving
m_task_t ctrl_task;
const char* get_ctrl_mbox() const { return host->get_ctrl_mbox(); }
const char* get_data_mbox() const { return host->get_data_mbox(); }
- static void comm_push_in_dynar(xbt_dynar_t dynar, msg_comm_t comm);
static bool comm_test_n_destroy(msg_comm_t comm);
};
#include <xbt/log.h>
#include <stdexcept>
-XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu);
+XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(main);
hostdata* hostdata::instances = NULL;
--- /dev/null
+#include "loba_least_loaded.h"
+
+#include <xbt/log.h>
+
+XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(loba);
+
+/* simple version:
+ * load balance with a least-loaded neighbor,
+ * without breaking the ping-pong condition
+ */
+double loba_least_loaded::load_balance(double my_load)
+{
+ int imin = -1;
+ int imax = -1;
+ double min = my_load;
+ double max = -1.0;
+ for (unsigned i = 0 ; i < neigh.size() ; ++i) {
+ double l = neigh[i].get_load();
+ if (l >= my_load)
+ continue;
+ if (l < min) {
+ imin = i;
+ min = l;
+ }
+ if (l > max) {
+ imax = i;
+ max = l;
+ }
+ }
+ if (imin != -1) {
+ // found someone
+ double balance = (my_load - max) / 2;
+ DEBUG6("%d:%g %d:%g %g %g", imin, min, imax, max, my_load, balance);
+ neigh[imin].set_to_send(balance);
+ return balance;
+ } else {
+ return 0.0;
+ }
+}
+
+// Local variables:
+// mode: c++
+// End:
--- /dev/null
+#ifndef LOBA_LEAST_LOADED
+#define LOBA_LEAST_LOADED
+
+#include "process.h"
+
+class loba_least_loaded: public process {
+public:
+ loba_least_loaded(int argc, char* argv[]): process(argc, argv) { }
+ ~loba_least_loaded() { }
+
+private:
+ double load_balance(double my_load);
+};
+
+#endif //!LOBA_LEAST_LOADED
+
+// Local variables:
+// mode: c++
+// End:
#include "timer.h"
#include "version.h"
-// Creates a new log category and makes it the default
-XBT_LOG_NEW_DEFAULT_CATEGORY(simu, "Simulation messages");
+// Creates log categories
+XBT_LOG_NEW_CATEGORY(simu, "Simulation messages");
+XBT_LOG_NEW_SUBCATEGORY(main, simu, "Messages from global infrastructure");
+XBT_LOG_NEW_SUBCATEGORY(comm, simu, "Messages from asynchronous pipes");
+XBT_LOG_NEW_SUBCATEGORY(proc, simu, "Messages from base process class");
+XBT_LOG_NEW_SUBCATEGORY(loba, simu, "Messages from load-balancer");
+
+XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(main);
// Failure exit status
enum {
EXIT_FAILURE_CLEAN = 0x08, // error at cleanup
};
+#include "loba_least_loaded.h"
int simulation_main(int argc, char* argv[])
{
- process proc(argc, argv);
+ // process proc(argc, argv);
+ loba_least_loaded proc(argc, argv);
return proc.run();
}
#ifndef MISC_H
#define MISC_H
+#include <functional>
#include <xbt/log.h>
/* Returns true if the given priority is enabled for the default
/* Returns c-string "s" if n > 1, empty string "" otherwise. */
#define ESSE(n) ((n) > 1 ? misc::str_esse : misc::str_nil)
namespace misc {
+
extern const char str_esse[];
extern const char str_nil[];
+
+ template <typename T>
+ struct address: public std::unary_function<T&, T*> {
+ T* operator()(T& ref) { return &ref; }
+ };
+
}
#endif // !MISC_H
#include <xbt/log.h>
#include "misc.h"
-XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu);
+XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(main);
namespace opt {
bool bookkeeping = false;
cost_func comp_cost("1e9, 0"); // fixme: find better defaults
+ cost_func comm_cost("1, 0"); // fixme: find better defaults
} // namespace opt
int c;
opterr = 0;
- while ((c = getopt(*argc, argv, "bc:ehi:l:V")) != -1) {
+ while ((c = getopt(*argc, argv, "bc:C:ehi:l:V")) != -1) {
switch (c) {
case 'b':
opt::bookkeeping = true;
case 'c':
opt::comp_cost = cost_func(optarg);
break;
+ case 'C':
+ opt::comm_cost = cost_func(optarg);
+ break;
case 'i':
std::istringstream(optarg) >> opt::maxiter;
break;
INFO1("| exit on close.......: %s", on_off(opt::exit_on_close));
INFO1("| bookkeeping.........: %s", on_off(opt::bookkeeping));
INFO1("| comp. cost factors..: [%s]", opt::comp_cost.to_string().c_str());
+ INFO1("| comm. cost factors..: [%s]", opt::comm_cost.to_string().c_str());
INFO0("`----");
}
std::clog << oo("-c", "[fn,...]f0")
<< "polynomial factors for computation cost ("
<< opt::comp_cost.to_string() << ")\n";
+ std::clog << oo("-C", "[fn,...]f0")
+ << "polynomial factors for communication cost ("
+ << opt::comm_cost.to_string() << ")\n";
std::clog << o("-e") << "exit on close reception\n";
std::clog << oo("-i", "value")
<< "maximum number of iterations, 0 for infinity ("
extern bool bookkeeping;
extern cost_func comp_cost;
+ extern cost_func comm_cost;
int parse_args(int* argc, char* argv[]);
void print();
#include <algorithm>
#include <tr1/functional>
#include <iterator>
+#include <numeric>
#include <stdexcept>
#include <sstream>
#include <xbt/log.h>
#include "misc.h"
#include "options.h"
-XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu);
+XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(proc);
process::process(int argc, char* argv[])
{
+ using namespace std::tr1;
+ using namespace std::tr1::placeholders;
+
if (argc < 2 || !(std::istringstream(argv[1]) >> load))
throw std::invalid_argument("bad or missing initial load");
neigh.assign(argv + 2, argv + argc);
std::for_each(neigh.begin(), neigh.end(),
- std::tr1::bind(&process::insert_neighbor_in_map,
- this, std::tr1::placeholders::_1));
+ bind(&process::insert_neighbor_in_map, this, _1));
+
+ pneigh.resize(neigh.size());
+ std::transform(neigh.begin(), neigh.end(), pneigh.begin(),
+ misc::address<neighbor>());
expected_load = load;
oss << ESSE(neigh.size()) << ": ";
std::transform(neigh.begin(), neigh.end() - 1,
std::ostream_iterator<const char*>(oss, ", "),
- std::tr1::mem_fn(&neighbor::get_name));
+ mem_fn(&neighbor::get_name));
oss << neigh.back().get_name();
}
LOG1(logp, "Got %s.", oss.str().c_str());
INFO1("Initial load: %g", load);
VERB0("Starting...");
+ // first send to inform neighbors about our load
+ send();
iter = 0;
while (one_more) {
++iter;
load -= load_balance(load);
send();
- comm.flush(false);
if (opt::exit_on_close && close_received)
one_more = false;
return 0;
}
-void process::compute()
+double process::sum_of_to_send() const
{
- // fixme: shall we do something special when duration is 0 ?
- double duration = opt::comp_cost(load);
- m_task_t task = MSG_task_create("computation", duration, 0.0, NULL);
- DEBUG2("compute %g flop%s.", duration, ESSE(duration));
- MSG_task_execute(task);
- MSG_task_destroy(task);
+ using namespace std::tr1;
+ using namespace std::tr1::placeholders;
+
+ return std::accumulate(neigh.begin(), neigh.end(), 0.0,
+ bind(std::plus<double>(),
+ _1, bind(&neighbor::get_to_send, _2)));
}
double process::load_balance(double /*my_load*/)
return 0.0;
}
+void process::compute()
+{
+ // fixme: shall we do something special when duration is 0 ?
+ double duration = opt::comp_cost(load);
+ if (duration > 0) {
+ m_task_t task = MSG_task_create("computation", duration, 0.0, NULL);
+ DEBUG2("compute %g flop%s.", duration, ESSE(duration));
+ MSG_task_execute(task);
+ MSG_task_destroy(task);
+ } else {
+ xbt_sleep(42);
+ // xbt_thread_yield();
+ }
+}
+
void process::send1_no_bookkeeping(neighbor& nb)
{
comm.send(nb.get_ctrl_mbox(), new message(message::INFO, load));
void process::send()
{
+ using namespace std::tr1;
+ using namespace std::tr1::placeholders;
+
// fixme: shall we send data at all iterations?
- if (opt::bookkeeping) {
+ if (opt::bookkeeping)
std::for_each(neigh.begin(), neigh.end(),
- std::tr1::bind(&process::send1_bookkeeping,
- this, std::tr1::placeholders::_1));
- } else {
+ bind(&process::send1_bookkeeping, this, _1));
+ else
std::for_each(neigh.begin(), neigh.end(),
- std::tr1::bind(&process::send1_no_bookkeeping,
- this, std::tr1::placeholders::_1));
- }
+ bind(&process::send1_no_bookkeeping, this, _1));
}
// Returns false if a CLOSE message was received.
void process::finalize()
{
+ using namespace std::tr1;
+ using namespace std::tr1::placeholders;
+
DEBUG2("send CLOSE to %d neighbor%s.",
(int )neigh.size(), ESSE(neigh.size()));
std::for_each(neigh.begin(), neigh.end(),
- std::tr1::bind(&process::finalize1,
- this, std::tr1::placeholders::_1));
+ bind(&process::finalize1, this, _1));
DEBUG2("wait for CLOSE from %d neighbor%s.",
(int )neigh.size(), ESSE(neigh.size()));
~process();
int run();
+protected:
+ typedef std::vector<neighbor> neigh_type;
+ typedef std::vector<neighbor*> pneigh_type;
+
+ neigh_type neigh;
+ pneigh_type pneigh;
+
private:
- std::vector<neighbor> neigh;
- MAP_TEMPLATE<m_host_t, neighbor*> rev_neigh;
- std::vector<neighbor*> pneigh;
+ typedef MAP_TEMPLATE<m_host_t, neighbor*> rev_neigh_type;
+
+ rev_neigh_type rev_neigh;
communicator comm;
int ctrl_close_pending;
double load;
double expected_load;
- void compute();
+ double sum_of_to_send() const;
virtual double load_balance(double my_load);
+
+ void compute();
void send1_no_bookkeeping(neighbor& nb);
void send1_bookkeeping(neighbor& nb);
void send();