From: Arnaud Giersch Date: Wed, 8 Dec 2010 23:27:00 +0000 (+0100) Subject: Wip++... X-Git-Tag: v0.1~252 X-Git-Url: https://bilbo.iut-bm.univ-fcomte.fr/and/gitweb/loba.git/commitdiff_plain/6f5ec5fdc42f96a8fe95f4b846b163d4dc92e0c8 Wip++... * rework communication logic * add bookkeeping option * add simgrid_features.h --- diff --git a/Makefile b/Makefile index 4b0df07..8f484f1 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,5 @@ SIMGRID_INSTALL_DIR := ./simgrid-3.5-install +#SIMGRID_INSTALL_DIR := ../simgrid-git-install OPTIM_FLAGS += -O3 DEBUG_FLAGS += -g @@ -32,6 +33,7 @@ SETLOCALVERSION := ./setlocalversion SRC.loba := main.cpp \ communicator.cpp \ cost_func.cpp \ + neighbor.cpp \ options.cpp \ process.cpp \ version.cpp diff --git a/communicator.cpp b/communicator.cpp index 0d724aa..93389c7 100644 --- a/communicator.cpp +++ b/communicator.cpp @@ -3,12 +3,14 @@ #include #include #include "communicator.h" +#include "simgrid_features.h" // XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu); XBT_LOG_NEW_DEFAULT_SUBCATEGORY(comm, simu, "Messages from asynchronous pipes"); namespace { + bool comm_test_n_destroy(msg_comm_t& comm) { if (MSG_comm_test(comm)) { @@ -17,56 +19,118 @@ namespace { } else return false; } + } +class communicator::message { +public: + message(message_type t, double a): type(t), amount(a) { } + + message_type type; + double amount; +}; + communicator::communicator() { const char* hostname = MSG_host_get_name(MSG_host_self()); - size_t len = std::strlen(hostname); - recv_mbox = new char[len + 1]; - strcpy(recv_mbox, hostname); - recv_task = NULL; - recv_comm = MSG_task_irecv(&recv_task, recv_mbox); + ctrl_mbox = bprintf("%s_ctrl", hostname); + ctrl_task = NULL; + ctrl_comm = MSG_task_irecv(&ctrl_task, ctrl_mbox); + + data_mbox = bprintf("%s_data", hostname); + data_task = NULL; + data_comm = MSG_task_irecv(&data_task, data_mbox); } communicator::~communicator() { - send_acknowledge(); + // fixme: don't know how to free pending communications + // (data_comm, ctrl_comm and sent_comm) + + free(data_mbox); + free(ctrl_mbox); + + flush_sent(); if (!sent_comm.empty()) WARN1("Lost %ld send communications!", (long )sent_comm.size()); - delete[] recv_mbox; } +void communicator::send_info(const neighbor& dest, double amount) +{ + message* msg = new message(INFO_MSG, amount); + m_task_t task = MSG_task_create("load msg", 0.0, sizeof *msg, msg); + send(dest.get_ctrl_mbox(), task); +} + +void communicator::send_credit(const neighbor& dest, double amount) +{ + message* msg = new message(CREDIT_MSG, amount); + m_task_t task = MSG_task_create("credit msg", 0.0, sizeof *msg, msg); + send(dest.get_ctrl_mbox(), task); +} + +void communicator::send_load(const neighbor& dest, double amount) +{ + m_task_t task = MSG_task_create("data msg", 0.0, amount, NULL); + send(dest.get_data_mbox(), task); +} -void communicator::send(m_task_t task, const char *dest) +void communicator::send(const char* dest, m_task_t task) { sent_comm.push_back(MSG_task_isend(task, dest)); - send_acknowledge(); + flush_sent(); } -void communicator::send(m_task_t task, const std::string& dest) +bool communicator::recv_info(double& amount, m_host_t& from) { - send(task, dest.c_str()); + return recv_ctrl(INFO_MSG, amount, from); +} + +bool communicator::recv_credit(double& amount, m_host_t& from) +{ + return recv_ctrl(CREDIT_MSG, amount, from); +} + +bool communicator::recv_load(double& amount, m_host_t& from) +{ + bool res = comm_test_n_destroy(data_comm); + if (res) { + amount = MSG_task_get_data_size(data_task); + from = MSG_task_get_source(data_task); + MSG_task_destroy(data_task); + data_task = NULL; + data_comm = MSG_task_irecv(&data_task, data_mbox); + } + return res; } -m_task_t communicator::recv() +bool communicator::recv_ctrl(message_type type, double& amount, m_host_t& from) { - m_task_t task = NULL; - if (comm_test_n_destroy(recv_comm)) { - task = recv_task; - recv_task = NULL; - recv_comm = MSG_task_irecv(&recv_task, recv_mbox); + bool res = MSG_comm_test(ctrl_comm); + if (res) { + message* msg = (message* )MSG_task_get_data(ctrl_task); + if (msg->type == type) { + MSG_comm_destroy(ctrl_comm); + amount = msg->amount; + from = MSG_task_get_source(ctrl_task); + delete msg; + MSG_task_destroy(ctrl_task); + ctrl_task = NULL; + ctrl_comm = MSG_task_irecv(&ctrl_task, ctrl_mbox); + } else { + res = false; + } } - return task; + return res; } -int communicator::sent_count() +int communicator::send_backlog() { - send_acknowledge(); + flush_sent(); return sent_comm.size(); } -void communicator::send_acknowledge() +void communicator::flush_sent() { std::remove_if(sent_comm.begin(), sent_comm.end(), comm_test_n_destroy); } diff --git a/communicator.h b/communicator.h index ca0d3b2..f1ee1d9 100644 --- a/communicator.h +++ b/communicator.h @@ -4,27 +4,46 @@ #define COMMUNICATOR_H #include -#include + #include +#include +#include "neighbor.h" class communicator { public: communicator(); ~communicator(); - void send(m_task_t task, const char *dest); - void send(m_task_t task, const std::string& dest); - m_task_t recv(); - int sent_count(); + void send_info(const neighbor& dest, double amount); + void send_credit(const neighbor& dest, double amount); + void send_load(const neighbor& dest, double amount); + + bool recv_info(double& amount, m_host_t& from); + bool recv_credit(double& amount, m_host_t& from); + bool recv_load(double& amount, m_host_t& from); + + int send_backlog(); private: - typedef std::list comm_list; - comm_list sent_comm; - char* recv_mbox; - msg_comm_t recv_comm; - m_task_t recv_task; + enum message_type { INFO_MSG, CREDIT_MSG }; + class message; + + // List of pending send communications + std::list sent_comm; + + // Control channel for receiving + char* ctrl_mbox; + msg_comm_t ctrl_comm; + m_task_t ctrl_task; + + // Data channel for receiving + char* data_mbox; + msg_comm_t data_comm; + m_task_t data_task; - void send_acknowledge(); + bool recv_ctrl(message_type type, double& amount, m_host_t& from); + void send(const char* dest, m_task_t task); + void flush_sent(); }; #endif // !COMMUNICATOR_H diff --git a/cost_func.cpp b/cost_func.cpp index bdea62e..c3a463b 100644 --- a/cost_func.cpp +++ b/cost_func.cpp @@ -5,21 +5,21 @@ #include #include "cost_func.h" -cost_func::cost_func(const char *param) +cost_func::cost_func(const char* param) { int len = strlen(param); char tmpbuf[len + 1]; - char *tmp = tmpbuf; + char* tmp = tmpbuf; memcpy(tmp, param, len + 1); degree = std::count(tmp, tmp + len, ','); factor = new double[degree + 1]; for (int i = degree ; i > 0 ; i--) { - char *next = strchr(tmp, ','); + char* next = strchr(tmp, ','); *next++ = '\0'; - factor[i] = atof(tmp); + std::istringstream(tmp) >> factor[i]; tmp = next; } - factor[0] = atof(tmp); + std::istringstream(tmp) >> factor[0]; } cost_func::~cost_func() diff --git a/cost_func.h b/cost_func.h index 57d1188..ed97dcf 100644 --- a/cost_func.h +++ b/cost_func.h @@ -6,7 +6,7 @@ class cost_func { public: - cost_func(const char *param); + cost_func(const char* param); ~cost_func(); cost_func& operator=(const cost_func& ref); @@ -14,7 +14,7 @@ public: std::string to_string(); private: int degree; - double *factor; + double* factor; }; #endif // !COST_FUNC_H diff --git a/main.cpp b/main.cpp index 314cac2..d48bd87 100644 --- a/main.cpp +++ b/main.cpp @@ -20,13 +20,13 @@ enum { EXIT_FAILURE_CLEAN = 0x08, // error at cleanup }; -int simulation_main(int argc, char *argv[]) +int simulation_main(int argc, char* argv[]) { process proc(argc, argv); return proc.run(); } -int main(int argc, char *argv[]) +int main(int argc, char* argv[]) { // Note: variables used after THROW must be declared as volatile. volatile int exit_status = 0; // global exit status @@ -39,7 +39,7 @@ int main(int argc, char *argv[]) // Set default logging parameters // xbt_log_control_set("simu.thres:verbose"); - xbt_log_control_set("simu.fmt:'[%h %r] [%c/%p] %m%n'"); + xbt_log_control_set("simu.fmt:'[%h %r] [%c/%p] %m%n'"); // Initialize some MSG internal data. // Note: MSG_global_init() may throw an exception, but it seems @@ -73,7 +73,7 @@ int main(int argc, char *argv[]) MSG_create_environment(opt::platform_file); if (LOG_ISENABLED(xbt_log_priority_verbose)) { int n = MSG_get_host_number(); - m_host_t *h = MSG_get_host_table(); + m_host_t* h = MSG_get_host_table(); VERB1("Got %d hosts.", n); for (int i = 0; i < n; i++) VERB2("Host #%d named \"%s\".", i, MSG_host_get_name(h[i])); diff --git a/neighbor.cpp b/neighbor.cpp new file mode 100644 index 0000000..e7a4666 --- /dev/null +++ b/neighbor.cpp @@ -0,0 +1,18 @@ +#include +#include "neighbor.h" + +neighbor::neighbor(const char* hostname) +{ + load = std::numeric_limits::infinity(); + debt = 0.0; + name = xbt_strdup(hostname); + ctrl_mbox = bprintf("%s_ctrl", hostname); + data_mbox = bprintf("%s_data", hostname); +} + +neighbor::~neighbor() +{ + free(data_mbox); + free(ctrl_mbox); + free(name); +} diff --git a/neighbor.h b/neighbor.h index 13d1108..52a0ef4 100644 --- a/neighbor.h +++ b/neighbor.h @@ -6,17 +6,26 @@ class neighbor { public: - neighbor(const char* const name_) - : name(name_) - , load(std::numeric_limits::infinity()) { - } - const std::string& getName() const { return name; } - double getLoad() const { return load; } - void setLoad(double l) { load = l; } + neighbor(const char* hostname); + ~neighbor(); + + const char* get_name() const { return name; } + const char* get_ctrl_mbox() const { return ctrl_mbox; } + const char* get_data_mbox() const { return data_mbox; } + + double get_load() const { return load; } + void set_load(double l) { load = l; } + + double get_debt() const { return debt; } + void set_debt(double d) { debt = d; } private: - std::string name; + char* name; + char* ctrl_mbox; + char* data_mbox; + double load; + double debt; }; #endif // !NEIGHBOR_H diff --git a/options.cpp b/options.cpp index 24ae952..4dd5315 100644 --- a/options.cpp +++ b/options.cpp @@ -17,29 +17,39 @@ namespace opt { int help_requested = 0; bool version_requested = false; + bool bookkeeping = false; + cost_func comp_cost("1e9, 0"); // fixme: find better defaults - cost_func comm_cost("1e9, 0"); // fixme: find better defaults } // namespace opt +namespace { + + const char* on_off(bool b) + { + return b ? "on" : "off"; + } + +} + int opt::parse_args(int* argc, char* argv[]) { - char *tmp = strrchr(argv[0], '/'); + char* tmp = strrchr(argv[0], '/'); opt::program_name = (tmp ? tmp + 1 : argv[0]); int c; opterr = 0; - while ((c = getopt(*argc, argv, "hc:C:V")) != -1) { + while ((c = getopt(*argc, argv, "bc:hV")) != -1) { switch (c) { + case 'b': + opt::bookkeeping = true; + break; case 'h': opt::help_requested++; break; case 'c': opt::comp_cost = cost_func(optarg); break; - case 'C': - opt::comm_cost = cost_func(optarg); - break; case 'V': opt::version_requested = true; break; @@ -74,8 +84,8 @@ void opt::print() INFO0(",----[ Simulation parameters ]"); INFO1("| platform_file.......: \"%s\"", opt::platform_file); INFO1("| application_file....: \"%s\"", opt::application_file); - INFO1("| comp. cost factors..: [%s]", opt::comp_cost.to_string().c_str()); - INFO1("| comm. cost factors..: [%s]", opt::comm_cost.to_string().c_str()); + INFO1("| bookkeeping.........: %s", on_off(opt::bookkeeping)); + INFO1("| comp. cost factors..: [%s]", opt::comp_cost.to_string().c_str()); INFO0("`----"); } @@ -97,12 +107,10 @@ void opt::usage() if (opt::help_requested < 1) return; std::clog << o("-V") << "print version and exit\n"; + std::clog << o("-b") << "activate bookkeeping\n"; std::clog << oo("-c", "[fn,...]f0") << "polynomial factors for computation cost (" << opt::comp_cost.to_string() << ")\n"; - std::clog << oo("-C", "[fn,...]f0") - << "polynomial factors for communication cost (" - << opt::comm_cost.to_string() << ")\n"; #undef o #undef oo diff --git a/options.h b/options.h index 8f09896..06f059f 100644 --- a/options.h +++ b/options.h @@ -14,8 +14,9 @@ namespace opt { extern int help_requested; extern bool version_requested; + extern bool bookkeeping; + extern cost_func comp_cost; - extern cost_func comm_cost; int parse_args(int* argc, char* argv[]); void print(); diff --git a/process.cpp b/process.cpp index d1576ce..4ee3647 100644 --- a/process.cpp +++ b/process.cpp @@ -4,22 +4,19 @@ #include #include #include +#include #include "misc.h" #include "options.h" #include "process.h" XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu); -struct message { - double measure; - double transfer; -}; - -process::process(int argc, char *argv[]) +process::process(int argc, char* argv[]) { if (argc < 2 || !(std::istringstream(argv[1]) >> load)) throw std::invalid_argument("bad or missing initial load"); neigh.assign(argv + 2, argv + argc); + expected_load = load; e_xbt_log_priority_t logp = xbt_log_priority_verbose; if (!LOG_ISENABLED(logp)) return; @@ -29,9 +26,9 @@ process::process(int argc, char *argv[]) if (!neigh.empty()) { oss << (neigh.size() > 1 ? "s: " : ": "); std::transform(neigh.begin(), neigh.end() - 1, - std::ostream_iterator(oss, ", "), - std::mem_fun_ref(&neighbor::getName)); - oss << neigh.back().getName(); + std::ostream_iterator(oss, ", "), + std::mem_fun_ref(&neighbor::get_name)); + oss << neigh.back().get_name(); } LOG1(logp, "Got %s.", oss.str().c_str()); print_loads(logp); @@ -43,7 +40,10 @@ int process::run() int n = 100; while (n--) { - compute(); + if (load > 0) + compute(); + else + xbt_sleep(0.5); // fixme receive(); } // MSG_process_sleep(100.0); // xxx @@ -69,16 +69,24 @@ int process::run() void process::receive() { - m_task_t task; - while ((task = comm.recv())) { - message *msg = (message *)MSG_task_get_data(task); - DEBUG3("Received load: %g, info: %g from %s", - msg->transfer, msg->measure, - MSG_host_get_name(MSG_task_get_source(task))); - load += msg->transfer; - // fixme: what is xxx ??? - // neigh[xxx].setLoad(msg->measure); - } + bool received; + do { + double amount; + m_host_t from; + received = false; + if (comm.recv_info(amount, from)) { + // fixme: update neighbor + received = true; + } + if (comm.recv_credit(amount, from)) { + expected_load += amount; + received = true; + } + if (comm.recv_load(amount, from)) { + load += amount; + received = true; + } + } while (received); } void process::compute() @@ -100,8 +108,8 @@ void process::print_loads(e_xbt_log_priority_t logp) } else { std::transform(neigh.begin(), neigh.end() - 1, std::ostream_iterator(oss, ", "), - std::mem_fun_ref(&neighbor::getLoad)); - oss << neigh.back().getLoad(); + std::mem_fun_ref(&neighbor::get_load)); + oss << neigh.back().get_load(); } LOG1(logp, "Neighbor loads: %s", oss.str().c_str()); } diff --git a/process.h b/process.h index aa1e032..586da55 100644 --- a/process.h +++ b/process.h @@ -8,7 +8,7 @@ class process { public: - process(int argc, char *argv[]); + process(int argc, char* argv[]); ~process() { }; int run(); @@ -16,6 +16,7 @@ private: communicator comm; std::vector neigh; double load; + double expected_load; void receive(); void compute(); diff --git a/simgrid_features.h b/simgrid_features.h new file mode 100644 index 0000000..8fa8d56 --- /dev/null +++ b/simgrid_features.h @@ -0,0 +1,11 @@ +#ifndef SIMGRID_FEATURES_H +#define SIMGRID_FEATURES_H + +// fixme: dirty hack +#if defined(XBT_RUNNING_CTX_INITIALIZER) +# define MSG_WAIT_DESTROYS_COMMS 0 +#else +# define MSG_WAIT_DESTROYS_COMMS 1 +#endif + +#endif // !SIMGRID_FEATURES_H diff --git a/simple_async.cpp b/simple_async.cpp index 2ad0cbc..8e04ba9 100644 --- a/simple_async.cpp +++ b/simple_async.cpp @@ -3,6 +3,7 @@ #include // clock() #include #include +#include "simgrid_features.h" // Creates a new log category and makes it the default XBT_LOG_NEW_DEFAULT_CATEGORY(simu, "Simulation messages"); @@ -19,11 +20,11 @@ enum { EXIT_FAILURE_CLEAN = 0x08, // error at cleanup }; -int sender(int, char *[]) +int sender(int, char* []) { char mbox_stack[N_MBOX][100]; msg_comm_t comm_stack[N_MBOX * N_MESG]; - msg_comm_t *comm = comm_stack; + msg_comm_t* pcomm = comm_stack; for (int i = 0 ; i < N_MBOX ; i++) sprintf(mbox_stack[i], "MBox_%02d", i); @@ -32,7 +33,7 @@ int sender(int, char *[]) for (int i = 0 ; i < N_MBOX ; i++) for (int j = 0 ; j < N_MESG ; j++) { char task_name[100]; - const char *mailbox = mbox_stack[i]; + const char* mailbox = mbox_stack[i]; unsigned shift = j; unsigned comm_size = 1 << shift; m_task_t task; @@ -42,18 +43,22 @@ int sender(int, char *[]) INFO4("At %02d, send %s, size %.0f to \"%s\"", n, MSG_task_get_name(task), MSG_task_get_data_size(task), mailbox); - *comm++ = MSG_task_isend(task, mailbox); + *pcomm++ = MSG_task_isend(task, mailbox); ++n; } INFO0("Wait for communications to terminate..."); - MSG_comm_waitall(comm_stack, comm - comm_stack, -1.0); + MSG_comm_waitall(comm_stack, pcomm - comm_stack, -1.0); + if (!MSG_WAIT_DESTROYS_COMMS) { + while (pcomm > comm_stack) + MSG_comm_destroy(*--pcomm); + } INFO0("Finished."); return 0; } -int receiver(int, char *[]) +int receiver(int, char* []) { char mbox[N_MBOX][100]; int comm_count[N_MBOX]; @@ -77,7 +82,7 @@ int receiver(int, char *[]) } } int n = 0; - while (xbt_dynar_length(dcomms)) { + while (!xbt_dynar_is_empty(dcomms)) { MSG_comm_waitany(dcomms); xbt_dynar_reset(dcomms); for (int i = 0 ; i < N_MBOX ; i++) { @@ -111,10 +116,10 @@ int receiver(int, char *[]) return 0; } -int main(int argc, char *argv[]) +int main(int argc, char* argv[]) { - const char *platform_file; - const char *application_file; + const char* platform_file; + const char* application_file; // Note: variables used after THROW must be declared as volatile. volatile int exit_status; // global exit status volatile double simulated_time = -1.0; diff --git a/timer.h b/timer.h index 0e8c7d9..dbf32e1 100644 --- a/timer.h +++ b/timer.h @@ -28,7 +28,7 @@ struct timeval operator+(const struct timeval& a, const struct timeval& b) inline struct timeval operator-(const struct timeval& a, const struct timeval& b) { - timeval result; + struct timeval result; result.tv_sec = a.tv_sec - b.tv_sec; result.tv_usec = a.tv_usec - b.tv_usec; if (result.tv_usec < 0) { @@ -38,7 +38,7 @@ struct timeval operator-(const struct timeval& a, const struct timeval& b) return result; } -double timertod(const timeval& a) +double timertod(const struct timeval& a) { return a.tv_sec + a.tv_usec / 1e6; }