From 4f2d08b0ae4625d32890dc4613e1d842ad0f7b77 Mon Sep 17 00:00:00 2001 From: Martin Quinson Date: Wed, 17 Oct 2018 18:11:12 +0200 Subject: [PATCH] Sanitize the API of this_actor::parallel_execute() - Use std::vector instead of C arrays (old API remains but is not documented -- it should be properly deprecated) - The flop_amounts and comm_amounts arrays are not automatically freed by the internal functions, and should be properly cleaned by their creator. - EXCEPTION: in ptask model, sequential exec and regular comms don't have a real caller, so the internal function still has to free these arrays... Sick Sad World. - The proper solution would be to have the only copy of these arrays in the Action instead of having it in s4u. - But for now, Actions start as soon as created. So if you want to init them without starting, you have to have the data in s4u and only create the implementation side when you start the stuff. - That should obviously be fixed :) First step in that direction would be to have the constructor of each action NOT register the action in the LMM, but have an Action::start() in charge of this. For each subclass of Action. 
--- examples/s4u/exec-ptask/s4u-exec-ptask.cpp | 54 ++++++++++++---------- include/simgrid/s4u/Actor.hpp | 12 ++++- src/s4u/s4u_Actor.cpp | 27 +++++++++++ src/simix/libsmx.cpp | 3 +- src/simix/smx_host.cpp | 4 +- src/surf/HostImpl.cpp | 5 +- src/surf/ptask_L07.cpp | 54 +++++++++++++--------- src/surf/ptask_L07.hpp | 11 +++-- 8 files changed, 110 insertions(+), 60 deletions(-) diff --git a/examples/s4u/exec-ptask/s4u-exec-ptask.cpp b/examples/s4u/exec-ptask/s4u-exec-ptask.cpp index e5b9b6b98a..14907717b4 100644 --- a/examples/s4u/exec-ptask/s4u-exec-ptask.cpp +++ b/examples/s4u/exec-ptask/s4u-exec-ptask.cpp @@ -31,31 +31,29 @@ static void runner() XBT_INFO("First, build a classical parallel task, with 1 Gflop to execute on each node, " "and 10MB to exchange between each pair"); - double* computation_amounts = new double[hosts_count](); - double* communication_amounts = new double[hosts_count * hosts_count](); - for (int i = 0; i < hosts_count; i++) - computation_amounts[i] = 1e9; // 1 Gflop + std::vector computation_amounts; + std::vector communication_amounts; + /* ------[ test 1 ]----------------- */ + computation_amounts.assign(hosts.size(), 1e9 /*1Gflop*/); + communication_amounts.assign(hosts.size() * hosts.size(), 0); for (int i = 0; i < hosts_count; i++) for (int j = i + 1; j < hosts_count; j++) communication_amounts[i * hosts_count + j] = 1e7; // 10 MB - simgrid::s4u::this_actor::parallel_execute(hosts_count, hosts.data(), computation_amounts, communication_amounts); + simgrid::s4u::this_actor::parallel_execute(hosts, computation_amounts, communication_amounts); + /* ------[ test 2 ]----------------- */ XBT_INFO("We can do the same with a timeout of one second enabled."); - computation_amounts = new double[hosts_count](); - communication_amounts = new double[hosts_count * hosts_count](); - - for (int i = 0; i < hosts_count; i++) - computation_amounts[i] = 1e9; // 1 Gflop - + computation_amounts.assign(hosts.size(), 1e9 /*1Gflop*/); + 
communication_amounts.assign(hosts.size() * hosts.size(), 0); for (int i = 0; i < hosts_count; i++) for (int j = i + 1; j < hosts_count; j++) communication_amounts[i * hosts_count + j] = 1e7; // 10 MB try { - simgrid::s4u::this_actor::parallel_execute(hosts_count, hosts.data(), computation_amounts, communication_amounts, + simgrid::s4u::this_actor::parallel_execute(hosts, computation_amounts, communication_amounts, 1.0 /* timeout (in seconds)*/); XBT_WARN("Woops, this did not timeout as expected... Please report that bug."); } catch (xbt_ex& e) { @@ -63,28 +61,34 @@ static void runner() XBT_DEBUG("Caught expected exception: %s", e.what()); } + /* ------[ test 3 ]----------------- */ XBT_INFO("Then, build a parallel task involving only computations and no communication (1 Gflop per node)"); - computation_amounts = new double[hosts_count](); - for (int i = 0; i < hosts_count; i++) - computation_amounts[i] = 1e9; // 1 Gflop - simgrid::s4u::this_actor::parallel_execute(hosts_count, hosts.data(), computation_amounts, nullptr /* no comm */); + computation_amounts.assign(hosts.size(), 1e9 /*1Gflop*/); + communication_amounts.clear(); /* no comm */ + simgrid::s4u::this_actor::parallel_execute(hosts, computation_amounts, communication_amounts); + /* ------[ test 4 ]----------------- */ XBT_INFO("Then, build a parallel task involving only heterogeneous computations and no communication"); - computation_amounts = new double[hosts_count](); - for (int i = 0; i < hosts_count; i++) + computation_amounts.resize(hosts.size()); + for (int i = 0; i < hosts_count; i++) computation_amounts[i] = 5 * (i + 1) * 1e8; // 500Mflop, 1Gflop, 1.5Gflop - simgrid::s4u::this_actor::parallel_execute(hosts_count, hosts.data(), computation_amounts, nullptr /* no comm */); + communication_amounts.clear(); /* no comm */ + simgrid::s4u::this_actor::parallel_execute(hosts, computation_amounts, communication_amounts); + /* ------[ test 5 ]----------------- */ XBT_INFO("Then, build a parallel task with no 
computation nor communication (synchro only)"); - computation_amounts = new double[hosts_count](); - communication_amounts = new double[hosts_count * hosts_count](); - simgrid::s4u::this_actor::parallel_execute(hosts_count, hosts.data(), computation_amounts, communication_amounts); + computation_amounts.clear(); + communication_amounts.clear(); + simgrid::s4u::this_actor::parallel_execute(hosts, computation_amounts, communication_amounts); + /* ------[ test 6 ]----------------- */ XBT_INFO("Finally, trick the ptask to do a 'remote execution', on host %s", hosts[1]->get_cname()); - computation_amounts = new double[1]{1e9}; + std::vector remote; + remote.push_back(hosts[1]); + computation_amounts.assign(1, 1e9); + communication_amounts.clear(); - simgrid::s4u::Host* remote[] = {hosts[1]}; - simgrid::s4u::this_actor::parallel_execute(1, remote, computation_amounts, nullptr); + simgrid::s4u::this_actor::parallel_execute(remote, computation_amounts, communication_amounts); XBT_INFO("Goodbye now!"); } diff --git a/include/simgrid/s4u/Actor.hpp b/include/simgrid/s4u/Actor.hpp index cbe234dcd2..098c8b90d2 100644 --- a/include/simgrid/s4u/Actor.hpp +++ b/include/simgrid/s4u/Actor.hpp @@ -433,6 +433,9 @@ XBT_PUBLIC void execute(double flop, double priority); * each host, using a vector of flops amount. Then, you should specify the amount of data exchanged between each * hosts during the parallel kernel. For that, a matrix of values is expected. * + * It is OK to build a parallel execution without any computation and/or without any communication. + * Just pass an empty vector to the corresponding parameter. + * * For example, if your list of hosts is ``[host0, host1]``, passing a vector ``[1000, 2000]`` as a `flops_amount` * vector means that `host0` should compute 1000 flops while `host1` will compute 2000 flops. 
A matrix of * communications' sizes of ``[0, 1, 2, 3]`` specifies the following data exchanges: @@ -461,14 +464,21 @@ XBT_PUBLIC void execute(double flop, double priority); * * \endrst */ +XBT_PUBLIC void parallel_execute(std::vector hosts, std::vector flops_amounts, + std::vector bytes_amounts); -XBT_PUBLIC void parallel_execute(int host_nb, s4u::Host** host_list, double* flops_amount, double* bytes_amount); /** \rst * Block the actor until the built :ref:`parallel execution ` completes, or until the timeout. * \endrst */ +XBT_PUBLIC void parallel_execute(std::vector hosts, std::vector flops_amounts, + std::vector bytes_amounts, double timeout); + +#ifndef DOXYGEN +XBT_PUBLIC void parallel_execute(int host_nb, s4u::Host** host_list, double* flops_amount, double* bytes_amount); XBT_PUBLIC void parallel_execute(int host_nb, s4u::Host** host_list, double* flops_amount, double* bytes_amount, double timeout); +#endif XBT_PUBLIC ExecPtr exec_init(double flops_amounts); XBT_PUBLIC ExecPtr exec_async(double flops_amounts); diff --git a/src/s4u/s4u_Actor.cpp b/src/s4u/s4u_Actor.cpp index 945dfb47fd..56d2827b3f 100644 --- a/src/s4u/s4u_Actor.cpp +++ b/src/s4u/s4u_Actor.cpp @@ -293,11 +293,38 @@ void execute(double flops, double priority) exec_init(flops)->set_priority(priority)->start()->wait(); } +void parallel_execute(std::vector hosts, std::vector flops_amounts, + std::vector bytes_amounts) +{ + parallel_execute(hosts, flops_amounts, bytes_amounts, -1); +} +void parallel_execute(std::vector hosts, std::vector flops_amounts, + std::vector bytes_amounts, double timeout) +{ + xbt_assert(hosts.size() > 0, "Your parallel executions must span over at least one host."); + xbt_assert(hosts.size() == flops_amounts.size() || flops_amounts.empty(), + "Host count (%zu) does not match flops_amount count (%zu).", hosts.size(), flops_amounts.size()); + xbt_assert(hosts.size() * hosts.size() == bytes_amounts.size() || bytes_amounts.empty(), + "bytes_amounts must be a matrix of size 
host_count * host_count (%zu*%zu), but it's of size %zu.", + hosts.size(), hosts.size(), bytes_amounts.size()); + + /* The vectors live as parameter of parallel_execute. No copy is created for simcall_execution_parallel_start(), + * but that's OK because simcall_execution_wait() is called from here too. + */ + smx_activity_t s = simcall_execution_parallel_start("", hosts.size(), hosts.data(), + (flops_amounts.empty() ? nullptr : flops_amounts.data()), + (bytes_amounts.empty() ? nullptr : bytes_amounts.data()), + /* rate */ -1, timeout); + simcall_execution_wait(s); +} + void parallel_execute(int host_nb, s4u::Host** host_list, double* flops_amount, double* bytes_amount, double timeout) { smx_activity_t s = simcall_execution_parallel_start("", host_nb, host_list, flops_amount, bytes_amount, /* rate */ -1, timeout); simcall_execution_wait(s); + delete[] flops_amount; + delete[] bytes_amount; } void parallel_execute(int host_nb, sg_host_t* host_list, double* flops_amount, double* bytes_amount) diff --git a/src/simix/libsmx.cpp b/src/simix/libsmx.cpp index 0dbe5b1617..cff0c834e2 100644 --- a/src/simix/libsmx.cpp +++ b/src/simix/libsmx.cpp @@ -73,7 +73,8 @@ smx_activity_t simcall_execution_parallel_start(std::string name, int host_nb, s { /* checking for infinite values */ for (int i = 0 ; i < host_nb ; ++i) { - xbt_assert(std::isfinite(flops_amount[i]), "flops_amount[%d] is not finite!", i); + if (flops_amount != nullptr) + xbt_assert(std::isfinite(flops_amount[i]), "flops_amount[%d] is not finite!", i); if (bytes_amount != nullptr) { for (int j = 0 ; j < host_nb ; ++j) { xbt_assert(std::isfinite(bytes_amount[i + host_nb * j]), diff --git a/src/simix/smx_host.cpp b/src/simix/smx_host.cpp index 745a67db77..8c42e6c27c 100644 --- a/src/simix/smx_host.cpp +++ b/src/simix/smx_host.cpp @@ -63,9 +63,7 @@ simgrid::kernel::activity::ExecImplPtr SIMIX_execution_parallel_start(std::strin simgrid::kernel::resource::Action* surf_action = nullptr; simgrid::kernel::resource::Action*
timeout_detector = nullptr; if (not MC_is_active() && not MC_record_replay_is_active()) { - sg_host_t* host_list_cpy = new sg_host_t[host_nb]; - std::copy_n(host_list, host_nb, host_list_cpy); - surf_action = surf_host_model->execute_parallel(host_nb, host_list_cpy, flops_amount, bytes_amount, rate); + surf_action = surf_host_model->execute_parallel(host_nb, host_list, flops_amount, bytes_amount, rate); if (timeout > 0) { timeout_detector = host_list[0]->pimpl_cpu->sleep(timeout); } diff --git a/src/surf/HostImpl.cpp b/src/surf/HostImpl.cpp index f718d80ada..ce622c4bf2 100644 --- a/src/surf/HostImpl.cpp +++ b/src/surf/HostImpl.cpp @@ -57,7 +57,7 @@ kernel::resource::Action* HostModel::execute_parallel(int host_nb, s4u::Host** h double* bytes_amount, double rate) { kernel::resource::Action* action = nullptr; - if ((host_nb == 1) && (has_cost(bytes_amount, 0) <= 0)) { + if ((host_nb == 1) && (has_cost(bytes_amount, 0) <= 0) && (has_cost(flops_amount, 0) > 0)) { action = host_list[0]->pimpl_cpu->execution_start(flops_amount[0]); } else if ((host_nb == 1) && (has_cost(flops_amount, 0) <= 0)) { action = surf_network_model->communicate(host_list[0], host_list[0], bytes_amount[0], rate); @@ -87,9 +87,6 @@ kernel::resource::Action* HostModel::execute_parallel(int host_nb, s4u::Host** h " - Self-comms with one host only\n" " - Communications with two hosts and no computation"); } - delete[] host_list; - delete[] flops_amount; - delete[] bytes_amount; return action; } diff --git a/src/surf/ptask_L07.cpp b/src/surf/ptask_L07.cpp index 75f1027130..5943f79ddb 100644 --- a/src/surf/ptask_L07.cpp +++ b/src/surf/ptask_L07.cpp @@ -147,12 +147,14 @@ L07Action::L07Action(kernel::resource::Model* model, int host_nb, sg_host_t* hos double latency = 0.0; this->set_last_update(); - this->hostList_->reserve(host_nb); - for (int i = 0; i < host_nb; i++) { - this->hostList_->push_back(host_list[i]); - if (flops_amount[i] > 0) - nb_used_host++; - } + this->hostList_.reserve(host_nb); + for 
(int i = 0; i < host_nb; i++) + this->hostList_.push_back(host_list[i]); + + if (flops_amount != nullptr) + for (int i = 0; i < host_nb; i++) + if (flops_amount[i] > 0) + nb_used_host++; /* Compute the number of affected resources... */ if(bytes_amount != nullptr) { @@ -165,7 +167,7 @@ L07Action::L07Action(kernel::resource::Model* model, int host_nb, sg_host_t* hos double lat=0.0; std::vector route; - hostList_->at(i)->route_to(hostList_->at(j), route, &lat); + hostList_.at(i)->route_to(hostList_.at(j), route, &lat); latency = std::max(latency, lat); for (auto const& link : route) @@ -185,15 +187,16 @@ L07Action::L07Action(kernel::resource::Model* model, int host_nb, sg_host_t* hos if (latency_ > 0) model->get_maxmin_system()->update_variable_weight(get_variable(), 0.0); - for (int i = 0; i < host_nb; i++) - model->get_maxmin_system()->expand(host_list[i]->pimpl_cpu->get_constraint(), get_variable(), flops_amount[i]); + if (flops_amount != nullptr) + for (int i = 0; i < host_nb; i++) + model->get_maxmin_system()->expand(host_list[i]->pimpl_cpu->get_constraint(), get_variable(), flops_amount[i]); - if(bytes_amount != nullptr) { + if (bytes_amount != nullptr) { for (int i = 0; i < host_nb; i++) { for (int j = 0; j < host_nb; j++) { if (bytes_amount[i * host_nb + j] > 0.0) { std::vector route; - hostList_->at(i)->route_to(hostList_->at(j), route, nullptr); + hostList_.at(i)->route_to(hostList_.at(j), route, nullptr); for (auto const& link : route) model->get_maxmin_system()->expand_add(link->get_constraint(), this->get_variable(), @@ -207,7 +210,6 @@ L07Action::L07Action(kernel::resource::Model* model, int host_nb, sg_host_t* hos this->set_cost(1.0); this->set_remains(0.0); } - delete[] host_list; } kernel::resource::Action* NetworkL07Model::communicate(s4u::Host* src, s4u::Host* dst, double size, double rate) @@ -220,7 +222,9 @@ kernel::resource::Action* NetworkL07Model::communicate(s4u::Host* src, s4u::Host host_list[1] = dst; bytes_amount[1] = size; - return 
hostModel_->execute_parallel(2, host_list, flops_amount, bytes_amount, rate); + kernel::resource::Action* res = hostModel_->execute_parallel(2, host_list, flops_amount, bytes_amount, rate); + static_cast(res)->free_arrays_ = true; + return res; } Cpu* CpuL07Model::create_cpu(simgrid::s4u::Host* host, std::vector* speed_per_pstate, int core) @@ -261,13 +265,15 @@ LinkL07::LinkL07(NetworkL07Model* model, const std::string& name, double bandwid kernel::resource::Action* CpuL07::execution_start(double size) { - sg_host_t* host_list = new sg_host_t[1](); - double* flops_amount = new double[1](); + sg_host_t host_list[1] = {get_host()}; - host_list[0] = get_host(); + double* flops_amount = new double[1](); flops_amount[0] = size; - return static_cast(get_model())->hostModel_->execute_parallel(1, host_list, flops_amount, nullptr, -1); + kernel::resource::Action* res = + static_cast(get_model())->hostModel_->execute_parallel(1, host_list, flops_amount, nullptr, -1); + static_cast(res)->free_arrays_ = true; + return res; } kernel::resource::Action* CpuL07::sleep(double duration) @@ -378,17 +384,19 @@ LinkL07::~LinkL07() = default; * Action * **********/ -L07Action::~L07Action(){ - delete hostList_; - delete[] communicationAmount_; - delete[] computationAmount_; +L07Action::~L07Action() +{ + if (free_arrays_) { + delete[] computationAmount_; + delete[] communicationAmount_; + } } void L07Action::updateBound() { double lat_current = 0.0; - int hostNb = hostList_->size(); + int hostNb = hostList_.size(); if (communicationAmount_ != nullptr) { for (int i = 0; i < hostNb; i++) { @@ -397,7 +405,7 @@ void L07Action::updateBound() if (communicationAmount_[i * hostNb + j] > 0) { double lat = 0.0; std::vector route; - hostList_->at(i)->route_to(hostList_->at(j), route, &lat); + hostList_.at(i)->route_to(hostList_.at(j), route, &lat); lat_current = std::max(lat_current, lat * communicationAmount_[i * hostNb + j]); } diff --git a/src/surf/ptask_L07.hpp b/src/surf/ptask_L07.hpp index 
98d6940dca..70efcaa82d 100644 --- a/src/surf/ptask_L07.hpp +++ b/src/surf/ptask_L07.hpp @@ -106,6 +106,7 @@ class L07Action : public CpuAction { friend Action *CpuL07::sleep(double duration); friend Action* HostL07Model::execute_parallel(int host_nb, sg_host_t* host_list, double* flops_amount, double* bytes_amount, double rate); + friend Action* NetworkL07Model::communicate(s4u::Host* src, s4u::Host* dst, double size, double rate); public: L07Action(kernel::resource::Model* model, int host_nb, sg_host_t* host_list, double* flops_amount, @@ -114,11 +115,15 @@ public: void updateBound(); - std::vector* hostList_ = new std::vector(); - double *computationAmount_; - double *communicationAmount_; + std::vector hostList_; + double* computationAmount_; /* pointer to the data that lives in s4u action -- do not free unless if free_arrays */ + double* communicationAmount_; /* pointer to the data that lives in s4u action -- do not free unless if free_arrays */ double latency_; double rate_; + +private: + bool free_arrays_ = false; // By default, computationAmount_ and friends are freed by caller. But not for sequential + // exec and regular comms }; } -- 2.20.1