SIMGRID_INSTALL_DIR := ./simgrid-3.5-install
+#SIMGRID_INSTALL_DIR := ../simgrid-git-install
OPTIM_FLAGS += -O3
DEBUG_FLAGS += -g
SRC.loba := main.cpp \
communicator.cpp \
cost_func.cpp \
+ neighbor.cpp \
options.cpp \
process.cpp \
version.cpp
#include <msg/msg.h>
#include <xbt/log.h>
#include "communicator.h"
+#include "simgrid_features.h"
// XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu);
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(comm, simu,
"Messages from asynchronous pipes");
namespace {
+
bool comm_test_n_destroy(msg_comm_t& comm)
{
if (MSG_comm_test(comm)) {
} else
return false;
}
+
}
+class communicator::message { // Payload attached to control-channel tasks: a type tag plus a numeric amount.
+public:
+ message(message_type t, double a): type(t), amount(a) { }
+
+ message_type type; // kind of control message; compared against the requested type in recv_ctrl()
+ double amount; // value copied into recv_ctrl()'s `amount` output when the message is consumed
+};
+
communicator::communicator() // Posts the initial receives on "<host>_ctrl" and "<host>_data" for the current host.
{
const char* hostname = MSG_host_get_name(MSG_host_self());
- size_t len = std::strlen(hostname);
- recv_mbox = new char[len + 1];
- strcpy(recv_mbox, hostname);
- recv_task = NULL;
- recv_comm = MSG_task_irecv(&recv_task, recv_mbox);
+ ctrl_mbox = bprintf("%s_ctrl", hostname); // released with free() in the destructor
+ ctrl_task = NULL;
+ ctrl_comm = MSG_task_irecv(&ctrl_task, ctrl_mbox); // first pending receive on the control mailbox
+
+ data_mbox = bprintf("%s_data", hostname); // released with free() in the destructor
+ data_task = NULL;
+ data_comm = MSG_task_irecv(&data_task, data_mbox); // first pending receive on the data mailbox
}
communicator::~communicator() // Frees the mailbox names and warns about sends still recorded as pending.
{
- send_acknowledge();
+ // fixme: don't know how to free pending communications
+ // (data_comm, ctrl_comm and sent_comm)
+
+ free(data_mbox);
+ free(ctrl_mbox);
+
+ flush_sent(); // run comm_test_n_destroy over the pending sends before checking what is left
if (!sent_comm.empty())
WARN1("Lost %ld send communications!", (long )sent_comm.size());
- delete[] recv_mbox;
}
+void communicator::send_info(const neighbor& dest, double amount) // Sends an INFO_MSG carrying `amount` to dest's control mailbox.
+{
+ message* msg = new message(INFO_MSG, amount); // heap payload; the receiving side deletes it in recv_ctrl()
+ m_task_t task = MSG_task_create("load msg", 0.0, sizeof *msg, msg);
+ send(dest.get_ctrl_mbox(), task);
+}
+
+void communicator::send_credit(const neighbor& dest, double amount) // Sends a CREDIT_MSG carrying `amount` to dest's control mailbox.
+{
+ message* msg = new message(CREDIT_MSG, amount); // heap payload; the receiving side deletes it in recv_ctrl()
+ m_task_t task = MSG_task_create("credit msg", 0.0, sizeof *msg, msg);
+ send(dest.get_ctrl_mbox(), task);
+}
+
+void communicator::send_load(const neighbor& dest, double amount) // Sends `amount` of load to dest's data mailbox.
+{
+ m_task_t task = MSG_task_create("data msg", 0.0, amount, NULL); // no payload object: recv_load() recovers the amount with MSG_task_get_data_size()
+ send(dest.get_data_mbox(), task);
+}
-void communicator::send(m_task_t task, const char *dest)
+void communicator::send(const char* dest, m_task_t task) // Starts an asynchronous send of `task` to mailbox `dest` and records it in sent_comm.
{
sent_comm.push_back(MSG_task_isend(task, dest));
- send_acknowledge();
+ flush_sent();
}
-void communicator::send(m_task_t task, const std::string& dest)
+bool communicator::recv_info(double& amount, m_host_t& from) // Returns true and fills `amount`/`from` when an INFO_MSG was received.
{
- send(task, dest.c_str());
+ return recv_ctrl(INFO_MSG, amount, from);
+}
+
+bool communicator::recv_credit(double& amount, m_host_t& from) // Returns true and fills `amount`/`from` when a CREDIT_MSG was received.
+{
+ return recv_ctrl(CREDIT_MSG, amount, from);
+}
+
+bool communicator::recv_load(double& amount, m_host_t& from) // Tests the pending data receive; true (with `amount`/`from` filled) if a load task arrived.
+{
+ bool res = comm_test_n_destroy(data_comm);
+ if (res) {
+ amount = MSG_task_get_data_size(data_task); // the load amount is carried as the task's data size (see send_load)
+ from = MSG_task_get_source(data_task);
+ MSG_task_destroy(data_task);
+ data_task = NULL;
+ data_comm = MSG_task_irecv(&data_task, data_mbox); // post a new receive for the next load message
+ }
+ return res;
}
-m_task_t communicator::recv()
+bool communicator::recv_ctrl(message_type type, double& amount, m_host_t& from) // Consumes the pending control message only if it has the requested type.
{
- m_task_t task = NULL;
- if (comm_test_n_destroy(recv_comm)) {
- task = recv_task;
- recv_task = NULL;
- recv_comm = MSG_task_irecv(&recv_task, recv_mbox);
+ bool res = MSG_comm_test(ctrl_comm);
+ if (res) {
+ message* msg = (message* )MSG_task_get_data(ctrl_task);
+ if (msg->type == type) {
+ MSG_comm_destroy(ctrl_comm);
+ amount = msg->amount;
+ from = MSG_task_get_source(ctrl_task);
+ delete msg; // payload was allocated with new in send_info()/send_credit()
+ MSG_task_destroy(ctrl_task);
+ ctrl_task = NULL;
+ ctrl_comm = MSG_task_irecv(&ctrl_task, ctrl_mbox); // post a new receive for the next control message
+ } else {
+ res = false; // other type: task and comm are left untouched so a call asking for that type can take it
+ }
}
- return task;
+ return res;
}
-int communicator::sent_count()
+int communicator::send_backlog() // Returns the size of sent_comm after a flush_sent() pass.
{
- send_acknowledge();
+ flush_sent();
return sent_comm.size();
}
-void communicator::send_acknowledge()
+// Forget every pending send whose communication has completed.
+//
+// comm_test_n_destroy is applied to each entry of sent_comm and the entries
+// for which it returns true are erased from the list.
+//
+// The member function std::list::remove_if is used on purpose: the free
+// algorithm std::remove_if only shifts the kept elements forward and returns
+// the new logical end. Calling it without erasing the tail left the list at
+// its original size, with stale handles still stored in it.
+void communicator::flush_sent()
{
- std::remove_if(sent_comm.begin(), sent_comm.end(), comm_test_n_destroy);
+ sent_comm.remove_if(comm_test_n_destroy);
}
#define COMMUNICATOR_H
#include <list>
-#include <string>
+
#include <msg/msg.h>
+#include <xbt/sysdep.h>
+#include "neighbor.h"
class communicator { // Messaging endpoint of the current host: asynchronous sends plus one control and one data receive channel.
public:
communicator();
~communicator();
- void send(m_task_t task, const char *dest);
- void send(m_task_t task, const std::string& dest);
- m_task_t recv();
- int sent_count();
+ void send_info(const neighbor& dest, double amount); // INFO_MSG on dest's control mailbox
+ void send_credit(const neighbor& dest, double amount); // CREDIT_MSG on dest's control mailbox
+ void send_load(const neighbor& dest, double amount); // load task on dest's data mailbox
+
+ bool recv_info(double& amount, m_host_t& from); // each recv_* returns true when a message of that kind was received
+ bool recv_credit(double& amount, m_host_t& from);
+ bool recv_load(double& amount, m_host_t& from);
+
+ int send_backlog(); // size of the pending-send list
private:
- typedef std::list<msg_comm_t> comm_list;
- comm_list sent_comm;
- char* recv_mbox;
- msg_comm_t recv_comm;
- m_task_t recv_task;
+ enum message_type { INFO_MSG, CREDIT_MSG };
+ class message;
+
+ // List of pending send communications
+ std::list<msg_comm_t> sent_comm;
+
+ // Control channel for receiving
+ char* ctrl_mbox; // NOTE(review): owned, freed in ~communicator(); no copy constructor/operator= is declared, so a copy would free it twice — confirm the class is never copied
+ msg_comm_t ctrl_comm;
+ m_task_t ctrl_task;
+
+ // Data channel for receiving
+ char* data_mbox; // owned, freed in ~communicator(); same copy caveat as ctrl_mbox
+ msg_comm_t data_comm;
+ m_task_t data_task;
- void send_acknowledge();
+ bool recv_ctrl(message_type type, double& amount, m_host_t& from);
+ void send(const char* dest, m_task_t task);
+ void flush_sent();
};
#endif // !COMMUNICATOR_H
#include <sstream>
#include "cost_func.h"
-cost_func::cost_func(const char *param)
+cost_func::cost_func(const char* param) // Parses "fn,...,f1,f0": comma-separated factors, highest index first.
{
int len = strlen(param);
char tmpbuf[len + 1]; // NOTE(review): variable-length array, a compiler extension in C++
- char *tmp = tmpbuf;
+ char* tmp = tmpbuf;
memcpy(tmp, param, len + 1); // work on a private copy because the commas are overwritten below
degree = std::count(tmp, tmp + len, ','); // one comma per factor beyond factor[0]
factor = new double[degree + 1];
for (int i = degree ; i > 0 ; i--) {
- char *next = strchr(tmp, ',');
+ char* next = strchr(tmp, ',');
*next++ = '\0'; // end the current field; exactly `degree` commas were counted, so strchr finds one on every iteration
- factor[i] = atof(tmp);
+ std::istringstream(tmp) >> factor[i];
tmp = next;
}
- factor[0] = atof(tmp);
+ std::istringstream(tmp) >> factor[0];
}
cost_func::~cost_func()
class cost_func {
public:
- cost_func(const char *param);
+ cost_func(const char* param);
~cost_func();
cost_func& operator=(const cost_func& ref);
std::string to_string();
private:
int degree;
- double *factor;
+ double* factor;
};
#endif // !COST_FUNC_H
EXIT_FAILURE_CLEAN = 0x08, // error at cleanup
};
-int simulation_main(int argc, char *argv[])
+int simulation_main(int argc, char* argv[]) // Builds a process from the arguments and returns the result of its run().
{
process proc(argc, argv);
return proc.run();
}
-int main(int argc, char *argv[])
+int main(int argc, char* argv[])
{
// Note: variables used after THROW must be declared as volatile.
volatile int exit_status = 0; // global exit status
// Set default logging parameters
// xbt_log_control_set("simu.thres:verbose");
- xbt_log_control_set("simu.fmt:'[%h %r] [%c/%p] %m%n'");
+ xbt_log_control_set("simu.fmt:'[%h %r] [%c/%p] %m%n'");
// Initialize some MSG internal data.
// Note: MSG_global_init() may throw an exception, but it seems
MSG_create_environment(opt::platform_file);
if (LOG_ISENABLED(xbt_log_priority_verbose)) {
int n = MSG_get_host_number();
- m_host_t *h = MSG_get_host_table();
+ m_host_t* h = MSG_get_host_table();
VERB1("Got %d hosts.", n);
for (int i = 0; i < n; i++)
VERB2("Host #%d named \"%s\".", i, MSG_host_get_name(h[i]));
--- /dev/null
+#include <xbt/sysdep.h>
+#include "neighbor.h"
+
+neighbor::neighbor(const char* hostname) // Builds the record for neighbor `hostname`, including its two mailbox names.
+{
+ load = std::numeric_limits<double>::infinity(); // load stays at +infinity until set_load() is called
+ debt = 0.0;
+ name = xbt_strdup(hostname);
+ ctrl_mbox = bprintf("%s_ctrl", hostname); // same "<host>_ctrl" pattern communicator uses for its control mailbox
+ data_mbox = bprintf("%s_data", hostname); // same "<host>_data" pattern communicator uses for its data mailbox
+}
+
+neighbor::~neighbor() // Releases the three strings allocated by the constructor.
+{
+ free(data_mbox);
+ free(ctrl_mbox);
+ free(name);
+}
class neighbor { // What is tracked about one neighbor: name, mailbox names, load and debt.
public:
- neighbor(const char* const name_)
- : name(name_)
- , load(std::numeric_limits<double>::infinity()) {
- }
- const std::string& getName() const { return name; }
- double getLoad() const { return load; }
- void setLoad(double l) { load = l; }
+ neighbor(const char* hostname);
+ ~neighbor(); // NOTE(review): frees name/ctrl_mbox/data_mbox, yet no copy constructor or operator= is declared; a copied neighbor (e.g. inside std::vector<neighbor>) would free them twice — confirm
+
+ const char* get_name() const { return name; }
+ const char* get_ctrl_mbox() const { return ctrl_mbox; } // returned pointers stay owned by this object
+ const char* get_data_mbox() const { return data_mbox; }
+
+ double get_load() const { return load; }
+ void set_load(double l) { load = l; }
+
+ double get_debt() const { return debt; }
+ void set_debt(double d) { debt = d; }
private:
- std::string name;
+ char* name;
+ char* ctrl_mbox;
+ char* data_mbox;
+
double load;
+ double debt;
};
#endif // !NEIGHBOR_H
int help_requested = 0;
bool version_requested = false;
+ bool bookkeeping = false;
+
cost_func comp_cost("1e9, 0"); // fixme: find better defaults
- cost_func comm_cost("1e9, 0"); // fixme: find better defaults
} // namespace opt
+namespace {
+
+ const char* on_off(bool b) // Text form ("on"/"off") of a boolean option, used when printing the parameters.
+ {
+ return b ? "on" : "off";
+ }
+
+}
+
int opt::parse_args(int* argc, char* argv[])
{
- char *tmp = strrchr(argv[0], '/');
+ char* tmp = strrchr(argv[0], '/');
opt::program_name = (tmp ? tmp + 1 : argv[0]);
int c;
opterr = 0;
- while ((c = getopt(*argc, argv, "hc:C:V")) != -1) {
+ while ((c = getopt(*argc, argv, "bc:hV")) != -1) {
switch (c) {
+ case 'b':
+ opt::bookkeeping = true;
+ break;
case 'h':
opt::help_requested++;
break;
case 'c':
opt::comp_cost = cost_func(optarg);
break;
- case 'C':
- opt::comm_cost = cost_func(optarg);
- break;
case 'V':
opt::version_requested = true;
break;
INFO0(",----[ Simulation parameters ]");
INFO1("| platform_file.......: \"%s\"", opt::platform_file);
INFO1("| application_file....: \"%s\"", opt::application_file);
- INFO1("| comp. cost factors..: [%s]", opt::comp_cost.to_string().c_str());
- INFO1("| comm. cost factors..: [%s]", opt::comm_cost.to_string().c_str());
+ INFO1("| bookkeeping.........: %s", on_off(opt::bookkeeping));
+ INFO1("| comp. cost factors..: [%s]", opt::comp_cost.to_string().c_str());
INFO0("`----");
}
if (opt::help_requested < 1)
return;
std::clog << o("-V") << "print version and exit\n";
+ std::clog << o("-b") << "activate bookkeeping\n";
std::clog << oo("-c", "[fn,...]f0")
<< "polynomial factors for computation cost ("
<< opt::comp_cost.to_string() << ")\n";
- std::clog << oo("-C", "[fn,...]f0")
- << "polynomial factors for communication cost ("
- << opt::comm_cost.to_string() << ")\n";
#undef o
#undef oo
extern int help_requested;
extern bool version_requested;
+ extern bool bookkeeping;
+
extern cost_func comp_cost;
- extern cost_func comm_cost;
int parse_args(int* argc, char* argv[]);
void print();
#include <stdexcept>
#include <sstream>
#include <xbt/log.h>
+#include <xbt/time.h>
#include "misc.h"
#include "options.h"
#include "process.h"
XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu);
-struct message {
- double measure;
- double transfer;
-};
-
-process::process(int argc, char *argv[])
+process::process(int argc, char* argv[])
{
if (argc < 2 || !(std::istringstream(argv[1]) >> load))
throw std::invalid_argument("bad or missing initial load");
neigh.assign(argv + 2, argv + argc);
+ expected_load = load;
e_xbt_log_priority_t logp = xbt_log_priority_verbose;
if (!LOG_ISENABLED(logp))
return;
if (!neigh.empty()) {
oss << (neigh.size() > 1 ? "s: " : ": ");
std::transform(neigh.begin(), neigh.end() - 1,
- std::ostream_iterator<std::string>(oss, ", "),
- std::mem_fun_ref(&neighbor::getName));
- oss << neigh.back().getName();
+ std::ostream_iterator<const char*>(oss, ", "),
+ std::mem_fun_ref(&neighbor::get_name));
+ oss << neigh.back().get_name();
}
LOG1(logp, "Got %s.", oss.str().c_str());
print_loads(logp);
int n = 100;
while (n--) {
- compute();
+ if (load > 0)
+ compute();
+ else
+ xbt_sleep(0.5); // fixme
receive();
}
// MSG_process_sleep(100.0); // xxx
void process::receive() // Polls the three message kinds repeatedly until one full pass receives nothing.
{
- m_task_t task;
- while ((task = comm.recv())) {
- message *msg = (message *)MSG_task_get_data(task);
- DEBUG3("Received load: %g, info: %g from %s",
- msg->transfer, msg->measure,
- MSG_host_get_name(MSG_task_get_source(task)));
- load += msg->transfer;
- // fixme: what is xxx ???
- // neigh[xxx].setLoad(msg->measure);
- }
+ bool received;
+ do {
+ double amount;
+ m_host_t from;
+ received = false;
+ if (comm.recv_info(amount, from)) {
+ // fixme: update neighbor
+ received = true;
+ }
+ if (comm.recv_credit(amount, from)) {
+ expected_load += amount; // credits only change the expected load, not the actual one
+ received = true;
+ }
+ if (comm.recv_load(amount, from)) {
+ load += amount; // load messages change the actual load
+ received = true;
+ }
+ } while (received); // try again as long as the last pass delivered at least one message
}
void process::compute()
} else {
std::transform(neigh.begin(), neigh.end() - 1,
std::ostream_iterator<double>(oss, ", "),
- std::mem_fun_ref(&neighbor::getLoad));
- oss << neigh.back().getLoad();
+ std::mem_fun_ref(&neighbor::get_load));
+ oss << neigh.back().get_load();
}
LOG1(logp, "Neighbor loads: %s", oss.str().c_str());
}
class process {
public:
- process(int argc, char *argv[]);
+ process(int argc, char* argv[]);
~process() { };
int run();
communicator comm;
std::vector<neighbor> neigh;
double load;
+ double expected_load;
void receive();
void compute();
--- /dev/null
+#ifndef SIMGRID_FEATURES_H
+#define SIMGRID_FEATURES_H
+
+// fixme: dirty hack
+#if defined(XBT_RUNNING_CTX_INITIALIZER) // presumably used as a marker to tell SimGrid versions apart — confirm
+# define MSG_WAIT_DESTROYS_COMMS 0 // when 0, callers destroy the comms themselves after MSG_comm_waitall()
+#else
+# define MSG_WAIT_DESTROYS_COMMS 1
+#endif
+
+#endif // !SIMGRID_FEATURES_H
#include <time.h> // clock()
#include <msg/msg.h>
#include <xbt/log.h>
+#include "simgrid_features.h"
// Creates a new log category and makes it the default
XBT_LOG_NEW_DEFAULT_CATEGORY(simu, "Simulation messages");
EXIT_FAILURE_CLEAN = 0x08, // error at cleanup
};
-int sender(int, char *[])
+int sender(int, char* [])
{
char mbox_stack[N_MBOX][100];
msg_comm_t comm_stack[N_MBOX * N_MESG];
- msg_comm_t *comm = comm_stack;
+ msg_comm_t* pcomm = comm_stack;
for (int i = 0 ; i < N_MBOX ; i++)
sprintf(mbox_stack[i], "MBox_%02d", i);
for (int i = 0 ; i < N_MBOX ; i++)
for (int j = 0 ; j < N_MESG ; j++) {
char task_name[100];
- const char *mailbox = mbox_stack[i];
+ const char* mailbox = mbox_stack[i];
unsigned shift = j;
unsigned comm_size = 1 << shift;
m_task_t task;
INFO4("At %02d, send %s, size %.0f to \"%s\"", n,
MSG_task_get_name(task),
MSG_task_get_data_size(task), mailbox);
- *comm++ = MSG_task_isend(task, mailbox);
+ *pcomm++ = MSG_task_isend(task, mailbox);
++n;
}
INFO0("Wait for communications to terminate...");
- MSG_comm_waitall(comm_stack, comm - comm_stack, -1.0);
+ MSG_comm_waitall(comm_stack, pcomm - comm_stack, -1.0);
+ if (!MSG_WAIT_DESTROYS_COMMS) {
+ while (pcomm > comm_stack)
+ MSG_comm_destroy(*--pcomm);
+ }
INFO0("Finished.");
return 0;
}
-int receiver(int, char *[])
+int receiver(int, char* [])
{
char mbox[N_MBOX][100];
int comm_count[N_MBOX];
}
}
int n = 0;
- while (xbt_dynar_length(dcomms)) {
+ while (!xbt_dynar_is_empty(dcomms)) {
MSG_comm_waitany(dcomms);
xbt_dynar_reset(dcomms);
for (int i = 0 ; i < N_MBOX ; i++) {
return 0;
}
-int main(int argc, char *argv[])
+int main(int argc, char* argv[])
{
- const char *platform_file;
- const char *application_file;
+ const char* platform_file;
+ const char* application_file;
// Note: variables used after THROW must be declared as volatile.
volatile int exit_status; // global exit status
volatile double simulated_time = -1.0;
inline
struct timeval operator-(const struct timeval& a, const struct timeval& b)
{
- timeval result;
+ struct timeval result;
result.tv_sec = a.tv_sec - b.tv_sec;
result.tv_usec = a.tv_usec - b.tv_usec;
if (result.tv_usec < 0) {
return result;
}
-double timertod(const timeval& a)
+double timertod(const struct timeval& a) // Converts a timeval to a number of seconds as a double.
{
return a.tv_sec + a.tv_usec / 1e6; // tv_usec counts microseconds, hence the 1e6 divisor
}