SRC.loba := main.cpp \
communicator.cpp \
cost_func.cpp \
+ hostdata.cpp \
misc.cpp \
neighbor.cpp \
options.cpp \
initial_load [neighbors...]
Communications:
- - two channels per host: normal and low_latency
+ - two channels per host: control and data
-How shall we manage link failures ?
-
-Process model (?)
+Process model
while (there is something to do) {
compute some task;
nothing more to do.
- how to manage link failures?
-
- - shall we retrieve pending tasks?
- : Ideally, why shall. How? By using some acknowledgment?
--- /dev/null
+* implement loba_* algorithms (start with some trivial one)
+* add loba algorithm selection (-a number ?)
+
+* implement automatic process topology
+ (line, ring, star, btree, clique, hypercube, etc.)
+* implement automatic platform generation
+ (number of hosts, all connected, constant bandwidth/latency)
+#include "communicator.h"
+
#include <algorithm>
#include <tr1/functional>
#include <sstream>
#include <msg/msg.h>
#include <xbt/log.h>
-#include "communicator.h"
#include "simgrid_features.h"
#include "misc.h"
}
communicator::communicator()
+ : host((hostdata* )MSG_host_get_data(MSG_host_self()))
+ , ctrl_task(NULL)
+ , ctrl_comm(NULL)
+ , ctrl_close_is_last(false)
+ , data_task(NULL)
+ , data_comm(NULL)
+ , data_close_is_last(false)
{
- const char* hostname = MSG_host_get_name(MSG_host_self());
-
- ctrl_mbox = hostname;
- ctrl_mbox += "_ctrl";
- ctrl_task = NULL;
- ctrl_comm = NULL;
- ctrl_close_is_last = false;
-
- data_mbox = hostname;
- data_mbox += "_data";
- data_task = NULL;
- data_comm = NULL;
- data_close_is_last = false;
}
communicator::~communicator()
#include <list>
#include <string>
#include <msg/msg.h>
+#include "hostdata.h"
class message {
public:
void next_close_on_data_is_last();
private:
+ // Myself
+ const hostdata* host;
+
// List of pending send communications
std::list<msg_comm_t> sent_comm;
// Control channel for receiving
- std::string ctrl_mbox;
- msg_comm_t ctrl_comm;
m_task_t ctrl_task;
+ msg_comm_t ctrl_comm;
bool ctrl_close_is_last;
// Data channel for receiving
- std::string data_mbox;
- msg_comm_t data_comm;
m_task_t data_task;
+ msg_comm_t data_comm;
bool data_close_is_last;
- const char* get_ctrl_mbox() const { return ctrl_mbox.c_str(); }
- const char* get_data_mbox() const { return data_mbox.c_str(); }
+ const char* get_ctrl_mbox() const { return host->get_ctrl_mbox(); }
+ const char* get_data_mbox() const { return host->get_data_mbox(); }
static void comm_push_in_dynar(xbt_dynar_t dynar, msg_comm_t comm);
static bool comm_test_n_destroy(msg_comm_t comm);
+#include "cost_func.h"
+
#include <algorithm>
#include <cstdlib>
#include <cstring>
#include <iterator>
#include <sstream>
-#include "cost_func.h"
cost_func::cost_func(const char* param)
{
--- /dev/null
+#include "hostdata.h"
+
+#include <xbt/log.h>
+#include <stdexcept>
+
+XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu);
+
+hostdata* hostdata::instances = NULL;
+
+// Allocate one hostdata for every host reported by MSG, attach it to the
+// host with MSG_host_set_data(), and record it in the class-wide list so
+// that destroy() can delete it later.
+void hostdata::create()
+{
+    const int host_count = MSG_get_host_number();
+    m_host_t* hosts = MSG_get_host_table();
+    VERB1("Got %d hosts.", host_count);
+    for (int idx = 0; idx < host_count; ++idx) {
+        m_host_t current = hosts[idx];
+        hostdata* data = new hostdata(current);
+        MSG_host_set_data(current, data);
+        VERB2("Host #%d named \"%s\".", idx, data->get_name());
+        // Push the new entry at the head of the list.
+        data->next = instances;
+        instances = data;
+    }
+    // Only the table is released here, not the hosts it refers to.
+    xbt_free(hosts);
+}
+
+// Delete every hostdata recorded in the class-wide list, leaving the list
+// empty.  Each element is unlinked before it is deleted.
+void hostdata::destroy()
+{
+    for (hostdata* cur = instances; cur != NULL; cur = instances) {
+        instances = cur->next;
+        delete cur;
+    }
+}
+
+// Record the name of the given host and derive the names of its two
+// mailboxes from it: "<name>_ctrl" and "<name>_data".
+hostdata::hostdata(m_host_t host)
+    : next(NULL)
+    , name(MSG_host_get_name(host))
+    , ctrl_mbox(name)
+    , data_mbox(name)
+{
+    ctrl_mbox += "_ctrl";
+    data_mbox += "_data";
+}
+
+// Nothing is released explicitly: the string members clean up after
+// themselves and the name pointer is not freed here.
+hostdata::~hostdata()
+{
+}
--- /dev/null
+#ifndef HOSTDATA_H
+#define HOSTDATA_H
+
+#include <string>
+#include <msg/msg.h>
+
+class hostdata { // per-host record: the host's name and the names of its two mailboxes
+public:
+ static void create(); // allocate one hostdata per host and attach it with MSG_host_set_data()
+ static void destroy(); // delete every hostdata allocated by create()
+
+ hostdata(m_host_t host); // takes the name from MSG_host_get_name(host) and builds the mailbox names
+ ~hostdata();
+
+ const char* get_name() const { return name; }
+ const char* get_ctrl_mbox() const { return ctrl_mbox.c_str(); } // "<name>_ctrl"
+ const char* get_data_mbox() const { return data_mbox.c_str(); } // "<name>_data"
+
+private:
+ // linked list of hostdata's, used by create/destroy
+ static hostdata* instances; // head of the list, NULL when the list is empty
+ hostdata* next; // following element, NULL for the last one
+
+ const char* name; // pointer returned by MSG_host_get_name(); never freed by this class
+ std::string ctrl_mbox;
+ std::string data_mbox;
+};
+
+#endif // !HOSTDATA_H
+
+// Local variables:
+// mode: c++
+// End:
+++ /dev/null
-#ifndef LOAD_BALANCE_H
-#define LOAD_BALANCE_H
-
-class loba {
-public:
-
-};
-
-#endif // !LOAD_BALANCE_H
-
-// Local variables:
-// mode: c++
-// End:
#include <iostream>
#include <msg/msg.h>
#include <xbt/log.h>
+#include "hostdata.h"
#include "misc.h"
#include "options.h"
#include "process.h"
// Create the platform and the application.
MSG_create_environment(opt::platform_file);
- if (LOG_ISENABLED(xbt_log_priority_verbose)) {
- int n = MSG_get_host_number();
- m_host_t* h = MSG_get_host_table();
- VERB1("Got %d hosts.", n);
- for (int i = 0; i < n; i++)
- VERB2("Host #%d named \"%s\".", i, MSG_host_get_name(h[i]));
- xbt_free(h);
- }
+ hostdata::create();
MSG_launch_application(opt::application_file);
exit_status = EXIT_FAILURE_SIMU; // =====
}
// Clean the MSG simulation.
+ hostdata::destroy();
res = MSG_clean();
if (res != MSG_OK) {
ERROR1("MSG_clean() failed with status %#x", res);
#define LOG_ISENABLED(priority) \
(_XBT_LOG_ISENABLEDV((*_XBT_LOGV(default)), (priority)))
+/* Returns c-string "s" if n > 1, empty string "" otherwise. */
+#define ESSE(n) ((n) > 1 ? misc::str_esse : misc::str_nil)
namespace misc {
extern const char str_esse[];
extern const char str_nil[];
}
-/* Returns c-string "s" if n > 1, empty string "" otherwise. */
-#define ESSE(n) ((n) > 1 ? misc::str_esse : misc::str_nil)
#endif // !MISC_H
#include "neighbor.h"
+#include <limits>
+#include <msg/msg.h>
+
neighbor::neighbor(const char* hostname) // binds this neighbor to the hostdata attached to the host named "hostname"
- : name(hostname)
- , ctrl_mbox(hostname)
- , data_mbox(hostname)
+ : host((hostdata* )MSG_host_get_data(MSG_get_host_by_name(hostname))) // NOTE(review): the lookup result is passed on unchecked; presumably NULL for an unknown name -- confirm and validate
, load(std::numeric_limits<double>::infinity()) // starts at +infinity until set_load() is called
, debt(0.0)
+ , to_send(0.0)
{
- ctrl_mbox += "_ctrl";
- data_mbox += "_data";
}
neighbor::~neighbor()
#ifndef NEIGHBOR_H
#define NEIGHBOR_H
-#include <limits>
-#include <string>
+#include <utility>
+#include "hostdata.h"
class neighbor {
public:
neighbor(const char* hostname);
~neighbor();
- const char* get_name() const { return name.c_str(); }
- const char* get_ctrl_mbox() const { return ctrl_mbox.c_str(); }
- const char* get_data_mbox() const { return data_mbox.c_str(); }
+ const char* get_name() const { return host->get_name(); }
+ const char* get_ctrl_mbox() const { return host->get_ctrl_mbox(); }
+ const char* get_data_mbox() const { return host->get_data_mbox(); }
- double get_load() const { return load; }
- void set_load(double l) { load = l; }
+ double get_load() const { return load; }
+ void set_load(double amount) { load = amount; }
- double get_debt() const { return debt; }
- void set_debt(double d) { debt = d; }
+ double get_debt() const { return debt; }
+ void set_debt(double amount) { debt = amount; }
+
+ double get_to_send() const { return to_send; }
+ void set_to_send(double amount) { to_send = amount; }
private:
- std::string name;
- std::string ctrl_mbox;
- std::string data_mbox;
+ const hostdata* host;
double load;
double debt;
+
+ double to_send;
};
#endif // !NEIGHBOR_H
+#include "options.h"
+
#include <cstring> // strrchr
#include <iomanip>
#include <iostream>
#include <sstream>
#include <unistd.h> // getopt
#include <xbt/log.h>
-#include "options.h"
#include "misc.h"
XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu);
+#include "process.h"
+
#include <algorithm>
#include <tr1/functional>
#include <iterator>
#include <xbt/time.h>
#include "misc.h"
#include "options.h"
-#include "process.h"
XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu);
throw std::invalid_argument("bad or missing initial load");
neigh.assign(argv + 2, argv + argc);
+
+ std::for_each(neigh.begin(), neigh.end(),
+ std::tr1::bind(&process::insert_neighbor_in_map,
+ this, std::tr1::placeholders::_1));
+
expected_load = load;
ctrl_close_pending = data_close_pending = neigh.size();
int process::run()
{
bool one_more = true;
- unsigned iter = 0;
INFO1("Initial load: %g", load);
VERB0("Starting...");
+ iter = 0;
while (one_more) {
- bool close_received;
++iter;
if (opt::log_rate && iter % opt::log_rate == 0) {
}
compute();
- close_received = !receive(false);
- /*
- * compute load balancing;
- * send tasks to neighbors;
- */
+ bool close_received = !receive(false);
+
+ if (opt::bookkeeping)
+ expected_load -= load_balance(expected_load);
+ else
+ load -= load_balance(load);
+ send();
comm.flush(false);
if (opt::exit_on_close && close_received)
MSG_task_destroy(task);
}
+// Placeholder load-balancing step.  run() subtracts the returned amount
+// from load (or from expected_load when bookkeeping is enabled); this
+// version ignores its argument and gives nothing away.
+double process::load_balance(double /*my_load*/)
+{
+    const double given_away = 0.0;
+    return given_away;
+}
+
+// Messages for one neighbor when bookkeeping is disabled: always an INFO
+// carrying the current load on the control mailbox, then, if an amount is
+// waiting in nb.get_to_send(), a LOAD with that amount on the data mailbox.
+void process::send1_no_bookkeeping(neighbor& nb)
+{
+    comm.send(nb.get_ctrl_mbox(), new message(message::INFO, load));
+
+    const double pending = nb.get_to_send();
+    if (!(pending > 0.0))
+        return;                 // nothing scheduled for this neighbor
+
+    comm.send(nb.get_data_mbox(), new message(message::LOAD, pending));
+    nb.set_to_send(0.0);
+}
+
+// Messages for one neighbor when bookkeeping is enabled:
+//  1. an INFO carrying expected_load on the control mailbox;
+//  2. if an amount is waiting in nb.get_to_send(), a CREDIT for it on the
+//     control mailbox, and that amount is added to the neighbor's debt;
+//  3. as much real load as is available, up to the debt, sent as a LOAD on
+//     the data mailbox; load and the remaining debt are updated accordingly.
+void process::send1_bookkeeping(neighbor& nb)
+{
+    comm.send(nb.get_ctrl_mbox(), new message(message::INFO, expected_load));
+
+    const double credit = nb.get_to_send();
+    const bool has_credit = credit > 0.0;
+    if (has_credit) {
+        comm.send(nb.get_ctrl_mbox(), new message(message::CREDIT, credit));
+        nb.set_to_send(0.0);
+    }
+    const double owed = has_credit ? nb.get_debt() + credit : nb.get_debt();
+
+    double shipped;
+    if (load <= owed) {
+        // Not enough load to pay everything: ship it all, keep the rest
+        // as debt.
+        shipped = load;
+        nb.set_debt(owed - shipped);
+        load = 0.0;
+    } else {
+        // Enough load: the whole debt is paid off.
+        shipped = owed;
+        nb.set_debt(0.0);
+        load -= shipped;
+    }
+
+    if (shipped > 0.0)
+        comm.send(nb.get_data_mbox(), new message(message::LOAD, shipped));
+}
+
+// Run the per-neighbor send routine on every element of neigh, using the
+// bookkeeping or the non-bookkeeping variant according to opt::bookkeeping.
+void process::send()
+{
+    // FIXME: shall we send data at all iterations?
+    void (process::*send_one)(neighbor&) = opt::bookkeeping
+        ? &process::send1_bookkeeping
+        : &process::send1_no_bookkeeping;
+
+    std::for_each(neigh.begin(), neigh.end(),
+                  std::tr1::bind(send_one, this, std::tr1::placeholders::_1));
+}
// Returns false if a CLOSE message was received.
bool process::receive(bool wait_for_close)
DEBUG2("received %s from %s",
msg->to_string().c_str(), MSG_host_get_name(from));
switch (msg->get_type()) {
- case message::INFO:
- // fixme: update neighbor
- // need a map m_host_t -> neighbor&
+    case message::INFO: {
+        // Look the sender up with find() rather than operator[]: the
+        // latter would insert a NULL entry for a host that is not in
+        // rev_neigh, and the NULL pointer would then be dereferenced.
+        std::map<m_host_t, neighbor*>::iterator sender = rev_neigh.find(from);
+        if (sender != rev_neigh.end()) {
+            // Record the load announced by this neighbor.
+            sender->second->set_load(msg->get_amount());
+        } else {
+            // Sender is not one of our neighbors: nothing to update.
+            DEBUG2("ignoring %s from non-neighbor %s",
+                   msg->to_string().c_str(), MSG_host_get_name(from));
+        }
+        break;
+    }
case message::CREDIT:
expected_load += msg->get_amount();
break;
return result;
}
+// Tell one neighbor that both channels are being closed: a CTRL_CLOSE on
+// its control mailbox, then a DATA_CLOSE on its data mailbox.
+void process::finalize1(neighbor& nb)
+{
+    const char* const ctrl_box = nb.get_ctrl_mbox();
+    comm.send(ctrl_box, new message(message::CTRL_CLOSE, 0.0));
+
+    const char* const data_box = nb.get_data_mbox();
+    comm.send(data_box, new message(message::DATA_CLOSE, 0.0));
+}
+
void process::finalize()
{
DEBUG2("send CLOSE to %d neighbor%s.",
(int )neigh.size(), ESSE(neigh.size()));
- std::vector<neighbor>::iterator n;
- for (n = neigh.begin() ; n != neigh.end() ; ++n) {
- comm.send(n->get_ctrl_mbox(), new message(message::CTRL_CLOSE, 0.0));
- comm.send(n->get_data_mbox(), new message(message::DATA_CLOSE, 0.0));
- }
+ std::for_each(neigh.begin(), neigh.end(),
+ std::tr1::bind(&process::finalize1,
+ this, std::tr1::placeholders::_1));
DEBUG2("wait for CLOSE from %d neighbor%s.",
(int )neigh.size(), ESSE(neigh.size()));
LOG1(logp, "Neighbor loads: %s", oss.str().c_str());
}
+// Register nb in rev_neigh under the host whose name it carries.  The map
+// stores the address of nb itself, not a copy.
+void process::insert_neighbor_in_map(neighbor& nb)
+{
+    typedef std::map<m_host_t, neighbor*>::value_type entry_type;
+    rev_neigh.insert(entry_type(MSG_get_host_by_name(nb.get_name()), &nb));
+}
+
// Local variables:
// mode: c++
// End:
#ifndef PROCESS_H
#define PROCESS_H
+#include <map>
#include <vector>
-#include <xbt/log.h>
+#include <msg/msg.h>
#include "communicator.h"
#include "neighbor.h"
int run();
private:
- communicator comm;
std::vector<neighbor> neigh;
- double load;
- double expected_load;
+ std::map<m_host_t, neighbor*> rev_neigh;
+ communicator comm;
int ctrl_close_pending;
int data_close_pending;
+ unsigned iter;
+
+ double load;
+ double expected_load;
+
void compute();
+ virtual double load_balance(double my_load);
+ void send1_no_bookkeeping(neighbor& nb);
+ void send1_bookkeeping(neighbor& nb);
+ void send();
bool receive(bool wait_for_close);
+ void finalize1(neighbor& nb);
void finalize();
void print_loads(e_xbt_log_priority_t logp = xbt_log_priority_info);
+
+ void insert_neighbor_in_map(neighbor& nb);
};
#endif // !PROCESS_H