From: Arnaud Giersch Date: Fri, 10 Dec 2010 22:42:30 +0000 (+0100) Subject: Wip++... X-Git-Tag: v0.1~249 X-Git-Url: https://bilbo.iut-bm.univ-fcomte.fr/and/gitweb/loba.git/commitdiff_plain/5279f43fc9be71815091e79c76d5bc04c8c88421 Wip++... * improve communicator logic * add options maxiter and exit_on_close * add valgrind suppressions file --- diff --git a/communicator.cpp b/communicator.cpp index 45b52ef..0c64b5b 100644 --- a/communicator.cpp +++ b/communicator.cpp @@ -1,6 +1,6 @@ #include #include -#include +#include #include #include #include "communicator.h" @@ -11,6 +11,15 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(comm, simu, "Messages from asynchronous pipes"); +std::string message::to_string() +{ + static const char* str[] = { "INFO", "CREDIT", "LOAD", + "CTRL_CLOSE", "DATA_CLOSE" }; + std::ostringstream oss; + oss << str[type] << " (" << amount << ")"; + return oss.str(); +} + communicator::communicator() { const char* hostname = MSG_host_get_name(MSG_host_self()); @@ -18,13 +27,13 @@ communicator::communicator() ctrl_mbox = hostname; ctrl_mbox += "_ctrl"; ctrl_task = NULL; - ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox()); + ctrl_comm = NULL; ctrl_close_is_last = false; data_mbox = hostname; data_mbox += "_data"; data_task = NULL; - data_comm = MSG_task_irecv(&data_task, get_data_mbox()); + data_comm = NULL; data_close_is_last = false; } @@ -39,14 +48,20 @@ communicator::~communicator() (long )sent_comm.size(), ESSE(sent_comm.size())); } +void communicator::listen() +{ + ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox()); + data_comm = MSG_task_irecv(&data_task, get_data_mbox()); +} + void communicator::send(const char* dest, message* msg) { double msg_size = sizeof *msg; if (msg->get_type() == message::LOAD) msg_size += msg->get_amount(); m_task_t task = MSG_task_create("message", 0.0, msg_size, msg); - sent_comm.push_back(MSG_task_isend(task, dest)); - flush_sent(); + msg_comm_t comm = MSG_task_isend(task, dest); + sent_comm.push_back(comm); } bool communicator::recv(message*& msg, m_host_t& from, bool wait) @@ -62,8 +77,8 @@ bool communicator::recv(message*& msg, m_host_t& from, bool wait) ctrl_task = NULL; ctrl_comm = (!ctrl_close_is_last || msg->get_type() != message::CTRL_CLOSE) - ? ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox()) - : ctrl_comm = NULL; + ? MSG_task_irecv(&ctrl_task, get_ctrl_mbox()) + : NULL; } else if (data_comm && comm_test_n_destroy(data_comm)) { msg = (message* )MSG_task_get_data(data_task); @@ -72,8 +87,8 @@ bool communicator::recv(message*& msg, m_host_t& from, bool wait) data_task = NULL; data_comm = (!data_close_is_last || msg->get_type() != message::DATA_CLOSE) - ? data_comm = MSG_task_irecv(&data_task, get_data_mbox()) - : data_comm = NULL; + ? MSG_task_irecv(&data_task, get_data_mbox()) + : NULL; } restart = wait && !msg && (ctrl_comm || data_comm); @@ -91,18 +106,21 @@ bool communicator::recv(message*& msg, m_host_t& from, bool wait) return msg != NULL; } -void communicator::wait_for_sent() +void communicator::flush(bool wait) { - xbt_dynar_t comms = xbt_dynar_new(sizeof(msg_comm_t), NULL); - while (!sent_comm.empty()) { - std::for_each(sent_comm.begin(), sent_comm.end(), - std::tr1::bind(xbt_dynar_push, - comms, std::tr1::placeholders::_1)); - MSG_comm_waitany(comms); - xbt_dynar_reset(comms); - flush_sent(); + sent_comm.remove_if(comm_test_n_destroy); + if (wait && !sent_comm.empty()) { + xbt_dynar_t comms = xbt_dynar_new(sizeof(msg_comm_t), NULL); + while (!sent_comm.empty()) { + std::for_each(sent_comm.begin(), sent_comm.end(), + std::tr1::bind(comm_push_in_dynar, + comms, std::tr1::placeholders::_1)); + MSG_comm_waitany(comms); + xbt_dynar_reset(comms); + sent_comm.remove_if(comm_test_n_destroy); + } + xbt_dynar_free(&comms); } - xbt_dynar_free(&comms); } void communicator::next_close_on_ctrl_is_last() @@ -115,13 +133,12 @@ void communicator::next_close_on_data_is_last() data_close_is_last = true; } -int communicator::send_backlog() +void communicator::comm_push_in_dynar(xbt_dynar_t dynar, msg_comm_t comm) { - flush_sent(); - return sent_comm.size(); + xbt_dynar_push(dynar, &comm); } -bool communicator::comm_test_n_destroy(msg_comm_t& comm) +bool communicator::comm_test_n_destroy(msg_comm_t comm) { if (MSG_comm_test(comm)) { MSG_comm_destroy(comm); @@ -130,11 +147,6 @@ bool communicator::comm_test_n_destroy(msg_comm_t& comm) return false; } -void communicator::flush_sent() -{ - std::remove_if(sent_comm.begin(), sent_comm.end(), comm_test_n_destroy); -} - // Local variables: // mode: c++ // End: diff --git a/communicator.h b/communicator.h index ea096d6..1e01677 100644 --- a/communicator.h +++ b/communicator.h @@ -16,6 +16,8 @@ public: message_type get_type() const { return type; } double get_amount() const { return amount; } + std::string to_string(); + private: message_type type; double amount; @@ -26,15 +28,15 @@ public: communicator(); ~communicator(); + void listen(); + void send(const char* dest, message* msg); bool recv(message*& msg, m_host_t& from, bool wait); - void wait_for_sent(); + void flush(bool wait); void next_close_on_ctrl_is_last(); void next_close_on_data_is_last(); - int send_backlog(); - private: // List of pending send communications std::list sent_comm; @@ -53,8 +55,9 @@ private: const char* get_ctrl_mbox() const { return ctrl_mbox.c_str(); } const char* get_data_mbox() const { return data_mbox.c_str(); } - static bool comm_test_n_destroy(msg_comm_t& comm); - void flush_sent(); + + static void comm_push_in_dynar(xbt_dynar_t dynar, msg_comm_t comm); + static bool comm_test_n_destroy(msg_comm_t comm); }; #endif // !COMMUNICATOR_H diff --git a/options.cpp b/options.cpp index d51fee2..18d4613 100644 --- a/options.cpp +++ b/options.cpp @@ -1,6 +1,8 @@ -#include +#include // strrchr +#include #include -#include +#include +#include // getopt #include #include "options.h" #include "misc.h" @@ -17,6 +19,9 @@ namespace opt { int help_requested = 0; bool version_requested = false; + unsigned maxiter = 4; + bool exit_on_close = false; + bool bookkeeping = false; cost_func comp_cost("1e9, 0"); // fixme: find better defaults @@ -41,17 +46,23 @@ int opt::parse_args(int* argc, char* argv[]) int c; opterr = 0; - while ((c = getopt(*argc, argv, "bc:hV")) != -1) { + while ((c = getopt(*argc, argv, "bc:ehi:V")) != -1) { switch (c) { case 'b': opt::bookkeeping = true; break; + case 'e': + opt::exit_on_close = true; + break; case 'h': opt::help_requested++; break; case 'c': opt::comp_cost = cost_func(optarg); break; + case 'i': + std::istringstream(optarg) >> opt::maxiter; + break; case 'V': opt::version_requested = true; break; @@ -92,12 +103,13 @@ void opt::print() INFO0(",----[ Simulation parameters ]"); INFO1("| platform_file.......: \"%s\"", opt::platform_file); INFO1("| application_file....: \"%s\"", opt::application_file); + INFO1("| maxiter.............: %u", opt::maxiter); + INFO1("| exit on close.......: %s", on_off(opt::exit_on_close)); INFO1("| bookkeeping.........: %s", on_off(opt::bookkeeping)); INFO1("| comp. cost factors..: [%s]", opt::comp_cost.to_string().c_str()); INFO0("`----"); } -#include void opt::usage() { const int indent1 = 6; @@ -116,10 +128,15 @@ void opt::usage() return; std::clog << o("-V") << "print version and exit\n"; + std::clog << o("-b") << "activate bookkeeping\n"; std::clog << oo("-c", "[fn,...]f0") << "polynomial factors for computation cost (" << opt::comp_cost.to_string() << ")\n"; + std::clog << o("-e") << "exit on close reception\n"; + std::clog << oo("-i", "value") + << "maximum number of iterations, 0 for infinity (" + << opt::maxiter << ")\n"; #undef o #undef oo diff --git a/options.h b/options.h index 06f059f..43286f3 100644 --- a/options.h +++ b/options.h @@ -14,6 +14,9 @@ namespace opt { extern int help_requested; extern bool version_requested; + extern unsigned maxiter; + extern bool exit_on_close; + extern bool bookkeeping; extern cost_func comp_cost; diff --git a/process.cpp b/process.cpp index e9e2348..d8ec500 100644 --- a/process.cpp +++ b/process.cpp @@ -18,7 +18,14 @@ process::process(int argc, char* argv[]) neigh.assign(argv + 2, argv + argc); expected_load = load; + ctrl_close_pending = data_close_pending = neigh.size(); + if (neigh.size() == 1) { + comm.next_close_on_ctrl_is_last(); + comm.next_close_on_data_is_last(); + } + if (neigh.size() > 0) + comm.listen(); e_xbt_log_priority_t logp = xbt_log_priority_verbose; if (!LOG_ISENABLED(logp)) @@ -39,49 +46,51 @@ process::process(int argc, char* argv[]) int process::run() { - INFO0("Coucou !"); + bool one_more = true; + unsigned iter = 0; + VERB0("Starting..."); + while (one_more) { + bool close_received; - int n = 100; - while (n--) { if (opt::bookkeeping) - INFO2("current load: %g ; expected: %g", load, expected_load); + INFO3("(%u) current load: %g ; expected: %g", + iter, load, expected_load); else - INFO1("current load: %g", load); + INFO2("(%u) current load: %g", + iter, load); - if (load > 0) - compute(); - else - xbt_sleep(100); // fixme - if (!receive(false)) - n = 0; + compute(); + close_received = !receive(false); + + /* + * compute load balancing; + * send tasks to neighbors; + */ + + comm.flush(false); + ++iter; + + if (opt::exit_on_close && close_received) + one_more = false; + if (opt::maxiter && iter >= opt::maxiter) + one_more = false; } - DEBUG0("going to finalize."); + VERB0("Going to finalize..."); finalize(); - // MSG_process_sleep(100.0); // xxx - /* xxx: - * while (there is something to do) { - * compute some task; - * get received tasks; - * compute load balancing; - * send tasks to neighbors; - * } - * finalize; - * wait for pending messages; - */ - /* Open Questions : * - definition of load on heterogeneous hosts ? * - how to detect convergence ? * - how to manage link failures ? */ - DEBUG0("done."); + VERB0("Done."); return 0; } void process::compute() { + // fixme: shall we do something special when duration is 0 ? double duration = opt::comp_cost(load); m_task_t task = MSG_task_create("computation", duration, 0.0, NULL); DEBUG2("compute %g flop%s.", duration, ESSE(duration)); @@ -89,6 +98,8 @@ void process::compute() MSG_task_destroy(task); } + +// Returns false if a CLOSE message was received. bool process::receive(bool wait_for_close) { bool result = true; @@ -96,30 +107,29 @@ bool process::receive(bool wait_for_close) m_host_t from; while ((ctrl_close_pending || data_close_pending) && comm.recv(msg, from, wait_for_close)) { + DEBUG2("received %s from %s", + msg->to_string().c_str(), MSG_host_get_name(from)); switch (msg->get_type()) { case message::INFO: - DEBUG0("received INFO"); // fixme: update neighbor // need a map m_host_t -> neighbor& break; case message::CREDIT: - DEBUG0("received CREDIT"); expected_load += msg->get_amount(); break; case message::LOAD: - DEBUG0("received LOAD"); load += msg->get_amount(); break; case message::CTRL_CLOSE: - DEBUG0("received CTRL_CLOSE"); if (--ctrl_close_pending == 1) comm.next_close_on_ctrl_is_last(); + DEBUG1("ctrl_close_pending = %d", ctrl_close_pending); result = false; break; case message::DATA_CLOSE: - DEBUG0("received DATA_CLOSE"); if (--data_close_pending == 1) comm.next_close_on_data_is_last(); + DEBUG1("data_close_pending = %d", data_close_pending); result = false; break; } @@ -142,7 +152,7 @@ void process::finalize() (int )neigh.size(), ESSE(neigh.size())); receive(true); - comm.wait_for_sent(); + comm.flush(true); } void process::print_loads(e_xbt_log_priority_t logp) diff --git a/valgrind_suppressions_3.5 b/valgrind_suppressions_3.5 new file mode 100644 index 0000000..f445238 --- /dev/null +++ b/valgrind_suppressions_3.5 @@ -0,0 +1,10 @@ +{ + Memory leaks in surf_routing.c + Memcheck:Leak + ... + fun:surf_parse_lex + fun:parse_platform_file + fun:SIMIX_create_environment + fun:MSG_create_environment + ... +}