typedef boost::intrusive_ptr<Exec> ExecPtr;
XBT_PUBLIC void intrusive_ptr_release(Exec* e);
XBT_PUBLIC void intrusive_ptr_add_ref(Exec* e);
+class ExecSeq;
+class ExecPar;
class Host;
friend XBT_PUBLIC void intrusive_ptr_release(Comm * c);
friend XBT_PUBLIC void intrusive_ptr_add_ref(Comm * c);
friend simgrid::s4u::Exec;
+ friend simgrid::s4u::ExecSeq;
+ friend simgrid::s4u::ExecPar;
friend XBT_PUBLIC void intrusive_ptr_release(Exec * e);
friend XBT_PUBLIC void intrusive_ptr_add_ref(Exec * e);
friend simgrid::s4u::Io;
#endif
XBT_PUBLIC ExecPtr exec_init(double flops_amounts);
+XBT_PUBLIC ExecPtr exec_init(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
+ const std::vector<double>& bytes_amounts);
+
XBT_PUBLIC ExecPtr exec_async(double flops_amounts);
/** @brief Returns the actor ID of the current actor. */
#include <simgrid/forward.h>
#include <simgrid/s4u/Activity.hpp>
+#include <xbt/ex.h>
#include <atomic>
* They are generated from this_actor::exec_init() or Host::execute(), and can be used to model pools of threads or
* similar mechanisms.
*/
-
class XBT_PUBLIC Exec : public Activity {
- Host* host_ = nullptr;
- double flops_amount_ = 0.0;
+ std::string name_ = "";
double priority_ = 1.0;
double bound_ = 0.0;
- std::string name_ = "";
+ double timeout_ = 0.0;
std::string tracing_category_ = "";
std::atomic_int_fast32_t refcount_{0};
+ Host* host_ = nullptr;
- explicit Exec(sg_host_t host, double flops_amount);
+protected:
+ Exec();
+ virtual ~Exec() = default;
public:
+#ifndef DOXYGEN
+ Exec(Exec const&) = delete;
+ Exec& operator=(Exec const&) = delete;
+#endif
+
+ friend simgrid::s4u::ExecSeq;
+ friend simgrid::s4u::ExecPar;
friend XBT_PUBLIC void intrusive_ptr_release(Exec* e);
friend XBT_PUBLIC void intrusive_ptr_add_ref(Exec* e);
- friend XBT_PUBLIC ExecPtr this_actor::exec_init(double flops_amount);
-
- ~Exec() = default;
-
static xbt::signal<void(ActorPtr)> on_start;
static xbt::signal<void(ActorPtr)> on_completion;
- Exec* start() override;
+ virtual double get_remaining() = 0;
+ virtual double get_remaining_ratio() = 0;
+ virtual Exec* start() = 0;
+ virtual ExecPtr set_host(Host* host) = 0;
+
Exec* wait() override;
Exec* wait_for(double timeout) override;
- Exec* cancel() override;
bool test() override;
- ExecPtr set_priority(double priority);
ExecPtr set_bound(double bound);
- ExecPtr set_host(Host* host);
ExecPtr set_name(std::string name);
+ ExecPtr set_priority(double priority);
ExecPtr set_tracing_category(std::string category);
- Host* get_host();
-
- double get_remaining() override;
- double get_remaining_ratio();
+ ExecPtr set_timeout(double timeout);
+ Exec* cancel() override;
-#ifndef DOXYGEN
- //////////////// Deprecated functions
- XBT_ATTRIB_DEPRECATED_v324("Please use Exec::wait_for()") void wait(double t) override { wait_for(t); }
XBT_ATTRIB_DEPRECATED_v323("Please use Exec::set_priority()") ExecPtr setPriority(double priority)
{
return set_priority(priority);
}
XBT_ATTRIB_DEPRECATED_v323("Please use Exec::set_bound()") ExecPtr setBound(double bound) { return set_bound(bound); }
+ XBT_ATTRIB_DEPRECATED_v324("Please use Exec::wait_for()") void wait(double t) override { wait_for(t); }
+};
+
+class XBT_PUBLIC ExecSeq : public Exec {
+ double flops_amount_ = 0.0;
+
+ explicit ExecSeq(sg_host_t host, double flops_amount);
+
+public:
+ friend XBT_PUBLIC ExecPtr this_actor::exec_init(double flops_amount);
+
+ ~ExecSeq() = default;
+
+ Exec* start() override;
+
+ ExecPtr set_host(Host* host);
+ Host* get_host();
+
+ double get_remaining() override;
+ double get_remaining_ratio() override;
+
+#ifndef DOXYGEN
+ //////////////// Deprecated functions
XBT_ATTRIB_DEPRECATED_v323("Please use Exec::set_host()") ExecPtr setHost(Host* host) { return set_host(host); }
XBT_ATTRIB_DEPRECATED_v323("Please use Exec::get_host()") Host* getHost() { return get_host(); }
XBT_ATTRIB_DEPRECATED_v323("Please use Exec::get_remaining_ratio()") double getRemainingRatio()
}
#endif
};
+
+/** @brief Execution spanning several hosts at once (parallel task).
+ *
+ * Instances are only created through this_actor::exec_init(hosts, flops_amounts, bytes_amounts), which is a friend
+ * of this class: the constructor is private. The host list and the amounts are copied into the object.
+ */
+class XBT_PUBLIC ExecPar : public Exec {
+  std::vector<s4u::Host*> hosts_;
+  std::vector<double> flops_amounts_;
+  std::vector<double> bytes_amounts_;
+  explicit ExecPar(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
+                   const std::vector<double>& bytes_amounts);
+  /** Changing the host is meaningless for a parallel execution: this is a no-op kept private. */
+  ExecPtr set_host(Host* host) override { return this; }
+
+public:
+  ~ExecPar() = default;
+  friend XBT_PUBLIC ExecPtr this_actor::exec_init(const std::vector<s4u::Host*>& hosts,
+                                                  const std::vector<double>& flops_amounts,
+                                                  const std::vector<double>& bytes_amounts);
+  /** Emits a warning and returns the same value as get_remaining_ratio() */
+  double get_remaining() override;
+  /** Remaining ratio as reported by the kernel activity for parallel executions */
+  double get_remaining_ratio() override;
+  /** Arms the timeout (if any) and starts the parallel execution */
+  Exec* start() override;
+};
+
} // namespace s4u
} // namespace simgrid
#include "simgrid/modelchecker.h"
#include "src/mc/mc_replay.hpp"
#include "src/simix/smx_host_private.hpp"
+#include "src/surf/HostImpl.hpp"
#include "src/surf/cpu_interface.hpp"
#include "src/surf/surf_interface.hpp"
namespace kernel {
namespace activity {
-ExecImpl::ExecImpl(std::string name, std::string tracing_category, s4u::Host* host)
- : ActivityImpl(std::move(name)), host_(host)
+ExecImpl::ExecImpl(std::string name, std::string tracing_category) : ActivityImpl(std::move(name))
{
this->state_ = SIMIX_RUNNING;
this->set_category(std::move(tracing_category));
XBT_DEBUG("Create exec %p", this);
}
-ExecImpl::ExecImpl(std::string name, std::string tracing_category, s4u::Host* host, double timeout)
- : ExecImpl(std::move(name), std::move(tracing_category), nullptr)
-{
- if (timeout > 0 && not MC_is_active() && not MC_record_replay_is_active()) {
- timeout_detector_ = host->pimpl_cpu->sleep(timeout);
- timeout_detector_->set_data(this);
- }
- XBT_DEBUG("Create exec %p", this);
-}
-
ExecImpl::~ExecImpl()
{
if (surf_action_)
XBT_DEBUG("Destroy exec %p", this);
}
+/** @brief Record the host attached to this execution.
+ *  @return this, so that calls can be chained */
+ExecImpl* ExecImpl::set_host(s4u::Host* host)
+{
+  this->host_ = host;
+  return this;
+}
+
+/** @brief Arm a timeout detector for this execution.
+ *
+ * When @a timeout is strictly positive and neither the model checker nor a record/replay session is active, a sleep
+ * action of that duration is started on the CPU of host_ and tagged with this activity. Otherwise nothing happens.
+ * The host must have been provided with set_host() beforehand, since the constructor does not take one.
+ *
+ * @param timeout duration of the sleep action used as timeout detector
+ * @return this, so that calls can be chained
+ */
+ExecImpl* ExecImpl::set_timeout(double timeout)
+{
+  if (timeout > 0 && not MC_is_active() && not MC_record_replay_is_active()) {
+    /* host_ stays nullptr until set_host() is called: fail explicitly instead of dereferencing it */
+    xbt_assert(host_ != nullptr, "Cannot set a timeout on an execution that has no host. Call set_host() first.");
+    timeout_detector_ = host_->pimpl_cpu->sleep(timeout);
+    timeout_detector_->set_data(this);
+  }
+  return this;
+}
+
ExecImpl* ExecImpl::start(double flops_amount, double priority, double bound)
{
if (not MC_is_active() && not MC_record_replay_is_active()) {
return this;
}
+/** @brief Start a parallel execution over several hosts.
+ *
+ * Unless the model checker or a record/replay session is active, a parallel action is requested from
+ * surf_host_model and, when one is returned, tagged with this activity. The on_creation signal is fired in any case.
+ *
+ * @param hosts         hosts taking part in the execution
+ * @param flops_amounts computation amounts; forwarded as a raw array, or as nullptr when the vector is empty
+ * @param bytes_amounts communication amounts; forwarded as a raw array, or as nullptr when the vector is empty
+ * @return this, so that calls can be chained
+ */
+ExecImpl* ExecImpl::start(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
+                          const std::vector<double>& bytes_amounts)
+{
+  /* set surf's synchro */
+  if (not MC_is_active() && not MC_record_replay_is_active()) {
+    /* The models detect "no amounts" by comparing the raw pointer with nullptr, but std::vector::data() is allowed
+     * to return a non-null pointer for an empty vector. Pass nullptr explicitly to avoid out-of-bounds reads. */
+    const double* flops = flops_amounts.empty() ? nullptr : flops_amounts.data();
+    const double* bytes = bytes_amounts.empty() ? nullptr : bytes_amounts.data();
+    surf_action_        = surf_host_model->execute_parallel(hosts, flops, bytes, -1);
+    if (surf_action_ != nullptr) {
+      surf_action_->set_data(this);
+    }
+  }
+  XBT_DEBUG("Create parallel execute synchro %p", this);
+  ExecImpl::on_creation(this);
+  return this;
+}
void ExecImpl::cancel()
{
XBT_VERB("This exec %p is canceled", this);
double ExecImpl::get_remaining()
{
- xbt_assert(host_ != nullptr, "Calling remains() on a parallel execution is not allowed. "
- "We would need to return a vector instead of a scalar. "
- "Did you mean remainingRatio() instead?");
return surf_action_ ? surf_action_->get_remains() : 0;
}
-double ExecImpl::get_remaining_ratio()
+/** @brief Remaining ratio of a sequential execution: remaining amount divided by the cost of the surf action.
+ *  @return 0 when there is no surf action */
+double ExecImpl::get_seq_remaining_ratio()
+{
+  if (surf_action_ == nullptr)
+    return 0;
+  return surf_action_->get_remains() / surf_action_->get_cost();
+}
+
+double ExecImpl::get_par_remaining_ratio()
{
- if (host_ ==
- nullptr) // parallel task: their remain is already between 0 and 1 (see comment in ExecImpl::get_remaining())
- return (surf_action_ == nullptr) ? 0 : surf_action_->get_remains();
- else // Actually compute the ratio for sequential tasks
- return (surf_action_ == nullptr) ? 0 : surf_action_->get_remains() / surf_action_->get_cost();
+ // parallel task: their remain is already between 0 and 1
+ return (surf_action_ == nullptr) ? 0 : surf_action_->get_remains();
}
void ExecImpl::set_bound(double bound)
~ExecImpl() override;
public:
- explicit ExecImpl(std::string name, std::string tracing_category, s4u::Host* host);
- explicit ExecImpl(std::string name, std::string tracing_category, s4u::Host* host, double timeout);
+ explicit ExecImpl(std::string name, std::string tracing_category);
ExecImpl* start(double flops_amount, double priority, double bound);
+ ExecImpl* start(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
+ const std::vector<double>& bytes_amounts);
+
+ ExecImpl* set_host(s4u::Host* host);
+ ExecImpl* set_timeout(double timeout);
void cancel();
void post() override;
void finish() override;
double get_remaining();
- double get_remaining_ratio();
+ double get_seq_remaining_ratio();
+ double get_par_remaining_ratio();
void set_bound(double bound); // deprecated. To be removed in v3.25
void set_priority(double priority); // deprecated. To be removed in v3.25
virtual ActivityImpl* migrate(s4u::Host* to);
#include <cmath>
#include "simgrid/Exception.hpp"
+#include "simgrid/s4u/Actor.hpp"
#include "simgrid/s4u/Comm.hpp"
+#include "simgrid/s4u/Exec.hpp"
#include "simgrid/s4u/Mailbox.hpp"
#include "src/instr/instr_private.hpp"
#include "src/kernel/activity/ExecImpl.hpp"
msg_error_t MSG_parallel_task_execute_with_timeout(msg_task_t task, double timeout)
{
- e_smx_state_t comp_state;
msg_error_t status = MSG_OK;
xbt_assert((not task->compute) && not task->is_used(), "This task is executed somewhere else. Go fix your code!");
try {
task->set_used();
+ simgrid::s4u::ExecPtr e =
+ simgrid::s4u::this_actor::exec_init(task->hosts_, task->flops_parallel_amount, task->bytes_parallel_amount)
+ ->set_name(task->get_name())
+ ->set_tracing_category(task->get_tracing_category())
+ ->set_timeout(timeout)
+ ->start();
+ task->compute = boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(e->get_impl());
- task->compute = boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(simcall_execution_parallel_start(
- std::move(task->get_name()), task->hosts_.size(), task->hosts_.data(),
- (task->flops_parallel_amount.empty() ? nullptr : task->flops_parallel_amount.data()),
- (task->bytes_parallel_amount.empty() ? nullptr : task->bytes_parallel_amount.data()), -1.0, timeout));
XBT_DEBUG("Parallel execution action created: %p", task->compute.get());
- if (task->has_tracing_category())
- simgrid::simix::simcall([task] { task->compute->set_category(std::move(task->get_tracing_category())); });
- comp_state = simcall_execution_wait(task->compute);
+ e->wait();
task->set_not_used();
- XBT_DEBUG("Execution task '%s' finished in state %d", task->get_cname(), (int)comp_state);
+ XBT_DEBUG("Execution task '%s' finished", task->get_cname());
} catch (simgrid::HostFailureException& e) {
status = MSG_HOST_FAILURE;
} catch (simgrid::TimeoutError& e) {
xbt_assert((task != nullptr), "Cannot get information from a nullptr task");
if (task->compute) {
// Task in progress
- return task->compute->get_remaining_ratio();
+ if (task->is_parallel())
+ return task->compute->get_par_remaining_ratio();
+ else
+ return task->compute->get_seq_remaining_ratio();
} else {
// Task not started (flops_amount is > 0.0) or finished (flops_amount is set to 0.0)
return task->flops_amount > 0.0 ? 1.0 : 0.0;
#include "simgrid/s4u/Actor.hpp"
#include "simgrid/s4u/Exec.hpp"
#include "simgrid/s4u/Host.hpp"
+#include "simgrid/s4u/VirtualMachine.hpp"
#include "src/kernel/activity/ExecImpl.hpp"
#include "src/simix/smx_host_private.hpp"
#include "src/simix/smx_private.hpp"
#include "src/surf/HostImpl.hpp"
+#include <algorithm>
#include <sstream>
XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_actor, "S4U actors");
xbt_assert(hosts.size() * hosts.size() == bytes_amounts.size() || bytes_amounts.empty(),
"bytes_amounts must be a matrix of size host_count * host_count (%zu*%zu), but it's of size %zu.",
- hosts.size(), hosts.size(), flops_amounts.size());
+             /* report the size of the vector that is actually checked here */
+             hosts.size(), hosts.size(), bytes_amounts.size());
-
- /* The vectors live as parameter of parallel_execute. No copy is created for simcall_execution_parallel_start(),
- * but that's OK because simcall_execution_wait() is called from here too.
- */
- smx_activity_t s = simcall_execution_parallel_start("", hosts.size(), hosts.data(),
- (flops_amounts.empty() ? nullptr : flops_amounts.data()),
- (bytes_amounts.empty() ? nullptr : bytes_amounts.data()),
- /* rate */ -1, timeout);
- simcall_execution_wait(s);
+  /* Check that we are not mixing VMs and PMs in the parallel task */
+  bool is_a_vm = (nullptr != dynamic_cast<simgrid::s4u::VirtualMachine*>(hosts.front()));
+  xbt_assert(std::all_of(hosts.begin(), hosts.end(),
+                         [is_a_vm](s4u::Host* elm) {
+                           bool tmp_is_a_vm = (nullptr != dynamic_cast<simgrid::s4u::VirtualMachine*>(elm));
+                           return is_a_vm == tmp_is_a_vm;
+                         }),
+             "parallel_execute: mixing VMs and PMs is not supported (yet).");
+  /* checking for infinite values */
+  xbt_assert(std::all_of(flops_amounts.begin(), flops_amounts.end(), [](double elm) { return std::isfinite(elm); }),
+             "flops_amounts comprises infinite values!");
+  /* same check on the communication matrix: the message must name the right vector */
+  xbt_assert(std::all_of(bytes_amounts.begin(), bytes_amounts.end(), [](double elm) { return std::isfinite(elm); }),
+             "bytes_amounts comprises infinite values!");
+
+  /* The execution is initialized, given its timeout, then waited for right away */
+  exec_init(hosts, flops_amounts, bytes_amounts)->set_timeout(timeout)->wait();
}
// deprecated
ExecPtr exec_init(double flops_amount)
{
- return ExecPtr(new Exec(get_host(), flops_amount));
+ return ExecPtr(new ExecSeq(get_host(), flops_amount));
+}
+
+/** @brief Build a parallel execution over @a hosts, without starting it.
+ *  @return a reference-counted handle on the freshly created ExecPar */
+ExecPtr exec_init(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
+                  const std::vector<double>& bytes_amounts)
+{
+  ExecPtr res(new ExecPar(hosts, flops_amounts, bytes_amounts));
+  return res;
}
ExecPtr exec_async(double flops)
simgrid::xbt::signal<void(simgrid::s4u::ActorPtr)> s4u::Exec::on_start;
simgrid::xbt::signal<void(simgrid::s4u::ActorPtr)> s4u::Exec::on_completion;
-Exec::Exec(sg_host_t host, double flops_amount) : Activity(), host_(host), flops_amount_(flops_amount)
+Exec::Exec()
{
- Activity::set_remaining(flops_amount_);
- pimpl_ = kernel::activity::ExecImplPtr(new kernel::activity::ExecImpl(name_, tracing_category_, host_));
+ pimpl_ = kernel::activity::ExecImplPtr(new kernel::activity::ExecImpl(name_, tracing_category_));
}
-Exec* Exec::start()
+bool Exec::test()
{
- simix::simcall([this] {
- boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_)->start(flops_amount_, 1. / priority_, bound_);
- });
- state_ = State::STARTED;
- on_start(Actor::self());
- return this;
-}
+ xbt_assert(state_ == State::INITED || state_ == State::STARTED || state_ == State::FINISHED);
-Exec* Exec::cancel()
-{
- simgrid::simix::simcall([this] { boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_)->cancel(); });
- state_ = State::CANCELED;
- return this;
+ if (state_ == State::FINISHED)
+ return true;
+
+ if (state_ == State::INITED)
+ this->start();
+
+ if (simcall_execution_test(pimpl_)) {
+ state_ = State::FINISHED;
+ return true;
+ }
+
+ return false;
}
Exec* Exec::wait()
THROW_UNIMPLEMENTED;
}
-/** @brief Returns whether the state of the exec is finished */
-bool Exec::test()
+Exec* Exec::cancel()
{
- xbt_assert(state_ == State::INITED || state_ == State::STARTED || state_ == State::FINISHED);
+ simgrid::simix::simcall([this] { boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_)->cancel(); });
+ state_ = State::CANCELED;
+ return this;
+}
- if (state_ == State::FINISHED)
- return true;
+/** @brief Drop one reference on @a e, and delete it when that was the last one. */
+void intrusive_ptr_release(simgrid::s4u::Exec* e)
+{
+  const auto refs_before = e->refcount_.fetch_sub(1, std::memory_order_release);
+  if (refs_before != 1)
+    return;
+  std::atomic_thread_fence(std::memory_order_acquire);
+  delete e;
+}
- if (state_ == State::INITED)
- this->start();
+/** @brief Take one more reference on @a e. */
+void intrusive_ptr_add_ref(simgrid::s4u::Exec* e)
+{
+  auto& counter = e->refcount_;
+  counter.fetch_add(1, std::memory_order_relaxed);
+}
- if (simcall_execution_test(pimpl_)) {
- state_ = State::FINISHED;
- return true;
- }
+/** @brief Set the execution bound.
+ *
+ * The bound is the maximal amount of flops per second that this execution may consume, regardless of what the host
+ * may deliver. It can only be set before the execution starts.
+ *
+ * @return this execution, so that calls can be chained
+ */
+ExecPtr Exec::set_bound(double bound)
+{
+  xbt_assert(state_ == State::INITED, "Cannot change the bound of an exec after its start");
+  this->bound_ = bound;
+  return ExecPtr(this);
+}
+/** @brief Set the timeout of this execution.
+ *
+ * The value is only stored here; it can no longer be changed once the execution is started.
+ *
+ * @param timeout the timeout to record
+ * @return this execution, so that calls can be chained
+ */
+ExecPtr Exec::set_timeout(double timeout)
+{
+  xbt_assert(state_ == State::INITED, "Cannot change the timeout of an exec after its start");
+  timeout_ = timeout;
+  return this;
+}
- return false;
+/** @brief Rename this execution. Only allowed before the execution starts.
+ *  @return this execution, so that calls can be chained */
+ExecPtr Exec::set_name(std::string name)
+{
+  xbt_assert(state_ == State::INITED, "Cannot change the name of an exec after its start");
+  this->name_ = std::move(name);
+  return ExecPtr(this);
}
/** @brief Change the execution priority, don't you think?
return this;
}
-/** @brief change the execution bound, ie the maximal amount of flops per second that it may consume, regardless of what
- * the host may deliver
- *
- * Currently, this cannot be changed once the exec started. */
-ExecPtr Exec::set_bound(double bound)
+ExecPtr Exec::set_tracing_category(std::string category)
{
- xbt_assert(state_ == State::INITED, "Cannot change the bound of an exec after its start");
- bound_ = bound;
+ xbt_assert(state_ == State::INITED, "Cannot change the tracing category of an exec after its start");
+ tracing_category_ = std::move(category);
return this;
}
+///////////// SEQUENTIAL EXECUTIONS ////////
+/** @brief Build a sequential execution of @a flops_amount flops located on @a host.
+ *
+ * The amount is recorded as the remaining work of the activity, and the host is registered both on this object and
+ * on the underlying kernel activity.
+ */
+ExecSeq::ExecSeq(sg_host_t host, double flops_amount) : Exec(), flops_amount_(flops_amount)
+{
+  Activity::set_remaining(flops_amount_);
+  /* Keep the s4u side in sync with the kernel activity: get_host() returns host_, which defaults to nullptr */
+  host_ = host;
+  boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_)->set_host(host);
+}
+
+/** @brief Start the sequential execution.
+ *
+ * The kernel activity is started from within a simcall with the flops amount, the inverse of the priority and the
+ * bound. The state then becomes STARTED and the on_start signal is fired for the calling actor.
+ */
+Exec* ExecSeq::start()
+{
+  simix::simcall([this] {
+    const auto impl = boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_);
+    impl->start(flops_amount_, 1. / priority_, bound_);
+  });
+  state_ = State::STARTED;
+  on_start(Actor::self());
+  return this;
+}
+
+/** @brief Returns whether the state of the exec is finished */
/** @brief Change the host on which this activity takes place.
*
* The activity cannot be terminated already (but it may be started). */
-ExecPtr Exec::set_host(Host* host)
+ExecPtr ExecSeq::set_host(Host* host)
{
xbt_assert(state_ == State::INITED || state_ == State::STARTED,
"Cannot change the host of an exec once it's done (state: %d)", (int)state_);
return this;
}
-ExecPtr Exec::set_name(std::string name)
-{
- xbt_assert(state_ == State::INITED, "Cannot change the name of an exec after its start");
- name_ = std::move(name);
- return this;
-}
-
-ExecPtr Exec::set_tracing_category(std::string category)
-{
- xbt_assert(state_ == State::INITED, "Cannot change the tracing category of an exec after its start");
- tracing_category_ = std::move(category);
- return this;
-}
/** @brief Retrieve the host on which this activity takes place. */
-Host* Exec::get_host()
+Host* ExecSeq::get_host()
{
return host_;
}
/** @brief Returns the amount of flops that remain to be done */
-double Exec::get_remaining()
+double ExecSeq::get_remaining()
{
return simgrid::simix::simcall(
[this]() { return boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_)->get_remaining(); });
*
* The returned value is between 0 (completely done) and 1 (nothing done yet).
*/
-double Exec::get_remaining_ratio()
+double ExecSeq::get_remaining_ratio()
{
return simgrid::simix::simcall([this]() {
- return boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_)->get_remaining_ratio();
+ return boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_)->get_seq_remaining_ratio();
});
}
-void intrusive_ptr_release(simgrid::s4u::Exec* e)
+///////////// PARALLEL EXECUTIONS ////////
+/** @brief Build a parallel execution over @a hosts.
+ *
+ * The host list and both amount vectors are copied into the object. The first host of the list is registered, on
+ * this object and on the kernel activity, as the one running the timeout detector.
+ *
+ * @param hosts         hosts taking part in the execution; must not be empty
+ * @param flops_amounts computation amounts
+ * @param bytes_amounts communication amounts
+ */
+ExecPar::ExecPar(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
+                 const std::vector<double>& bytes_amounts)
+    : Exec(), hosts_(hosts), flops_amounts_(flops_amounts), bytes_amounts_(bytes_amounts)
{
- if (e->refcount_.fetch_sub(1, std::memory_order_release) == 1) {
- std::atomic_thread_fence(std::memory_order_acquire);
- delete e;
- }
+  /* front() on an empty vector is undefined behavior: refuse empty host lists explicitly */
+  xbt_assert(not hosts.empty(), "A parallel execution must span over at least one host.");
+  // For parallel executions, we need a special host to run the timeout detector.
+  host_ = hosts.front();
+  boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_)->set_host(host_);
}
-void intrusive_ptr_add_ref(simgrid::s4u::Exec* e)
+/** @brief Start the parallel execution.
+ *
+ * Within a single simcall, the timeout is handed to the kernel activity, which is then started with the hosts and
+ * amounts recorded at construction. The state then becomes STARTED and the on_start signal is fired.
+ */
+Exec* ExecPar::start()
{
- e->refcount_.fetch_add(1, std::memory_order_relaxed);
+  simix::simcall([this] {
+    const auto impl = boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_);
+    impl->set_timeout(timeout_);
+    impl->start(hosts_, flops_amounts_, bytes_amounts_);
+  });
+  state_ = State::STARTED;
+  on_start(Actor::self());
+  return this;
+}
+/** @brief Ask the kernel activity, from within a simcall, for the remaining ratio of this parallel execution. */
+double ExecPar::get_remaining_ratio()
+{
+  const double ratio = simgrid::simix::simcall([this]() {
+    return boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_)->get_par_remaining_ratio();
+  });
+  return ratio;
+}
+
+/** @brief Not meaningful for parallel executions: warns, then falls back on get_remaining_ratio(). */
+double ExecPar::get_remaining()
+{
+  XBT_WARN("Calling get_remaining() on a parallel execution is not allowed. Call get_remaining_ratio() instead.");
+  const double ratio = get_remaining_ratio();
+  return ratio;
}
} // namespace s4u
} // namespace simgrid
/** @brief Initialize an execution of @a flops flops located on this host.
 *
 * this_actor::exec_init() places the execution on the host of the calling actor, so the host must be set explicitly
 * for the execution to take place here.
 */
ExecPtr Host::exec_async(double flops)
{
  return this_actor::exec_init(flops)->set_host(this);
}
void Host::execute(double flops)
/** @brief Run @a flops flops on this host and block until completion.
 *
 * this_actor::exec_init() places the execution on the host of the calling actor, so the host must be set explicitly
 * for the execution to take place here. The execution priority is set to 1 / @a priority.
 */
void Host::execute(double flops, double priority)
{
  this_actor::exec_init(flops)->set_host(this)->set_priority(1 / priority)->start()->wait();
}
} // namespace s4u
XBT_VERB("Executing task '%s'", task->name);
/* Beware! The scheduling data are now used by the surf action directly! no copy was done */
- task->surf_action = surf_host_model->execute_parallel(task->allocation->size(), task->allocation->data(),
- task->flops_amount, task->bytes_amount, task->rate);
+ task->surf_action =
+ surf_host_model->execute_parallel(*task->allocation, task->flops_amount, task->bytes_amount, task->rate);
task->surf_action->set_data(task);
return nullptr;
} else {
- return activity::ExecImplPtr(new activity::ExecImpl("suspend", "", this->host_))->start(0.0, 1.0, 0.0);
+ return activity::ExecImplPtr(new activity::ExecImpl("suspend", ""))->set_host(host_)->start(0.0, 1.0, 0.0);
}
}
}
xbt_assert(std::isfinite(rate), "rate is not finite!");
-
return simgrid::simix::simcall([name, host_nb, host_list, flops_amount, bytes_amount, rate, timeout] {
return SIMIX_execution_parallel_start(std::move(name), host_nb, host_list, flops_amount, bytes_amount, rate,
timeout);
{
return simgrid::simix::simcall([name, category, flops_amount, priority, bound, host] {
return simgrid::kernel::activity::ExecImplPtr(
- new simgrid::kernel::activity::ExecImpl(std::move(name), std::move(category), host))
+ new simgrid::kernel::activity::ExecImpl(std::move(name), std::move(category)))
+ ->set_host(host)
->start(flops_amount, priority, bound);
});
}
SIMIX_execution_parallel_start(std::string name, int host_nb, const sg_host_t* host_list, const double* flops_amount,
const double* bytes_amount, double rate, double timeout)
{
- simgrid::kernel::activity::ExecImplPtr exec = simgrid::kernel::activity::ExecImplPtr(
- new simgrid::kernel::activity::ExecImpl(std::move(name), "", host_list[0], timeout));
+ simgrid::kernel::activity::ExecImplPtr exec =
+ simgrid::kernel::activity::ExecImplPtr(new simgrid::kernel::activity::ExecImpl(std::move(name), ""));
+ std::vector<simgrid::s4u::Host*> hosts;
+ for (int i = 0; i < host_nb; i++)
+ hosts.push_back(host_list[i]);
/* set surf's synchro */
if (not MC_is_active() && not MC_record_replay_is_active()) {
- exec->surf_action_ = surf_host_model->execute_parallel(host_nb, host_list, flops_amount, bytes_amount, rate);
+ exec->surf_action_ = surf_host_model->execute_parallel(hosts, flops_amount, bytes_amount, rate);
if (exec->surf_action_ != nullptr) {
exec->surf_action_->set_data(exec.get());
}
/*********
* Model *
*********/
-
/* Helper function for executeParallelTask */
static inline double has_cost(const double* array, size_t pos)
{
return -1.0;
}
-kernel::resource::Action* HostModel::execute_parallel(size_t host_nb, s4u::Host* const* host_list,
+kernel::resource::Action* HostModel::execute_parallel(const std::vector<s4u::Host*> host_list,
const double* flops_amount, const double* bytes_amount,
double rate)
{
kernel::resource::Action* action = nullptr;
- if ((host_nb == 1) && (has_cost(bytes_amount, 0) <= 0) && (has_cost(flops_amount, 0) > 0)) {
+ if ((host_list.size() == 1) && (has_cost(bytes_amount, 0) <= 0) && (has_cost(flops_amount, 0) > 0)) {
action = host_list[0]->pimpl_cpu->execution_start(flops_amount[0]);
- } else if ((host_nb == 1) && (has_cost(flops_amount, 0) <= 0)) {
+ } else if ((host_list.size() == 1) && (has_cost(flops_amount, 0) <= 0)) {
action = surf_network_model->communicate(host_list[0], host_list[0], bytes_amount[0], rate);
- } else if ((host_nb == 2) && (has_cost(flops_amount, 0) <= 0) && (has_cost(flops_amount, 1) <= 0)) {
+ } else if ((host_list.size() == 2) && (has_cost(flops_amount, 0) <= 0) && (has_cost(flops_amount, 1) <= 0)) {
int nb = 0;
double value = 0.0;
- for (size_t i = 0; i < host_nb * host_nb; i++) {
+ for (size_t i = 0; i < host_list.size() * host_list.size(); i++) {
if (has_cost(bytes_amount, i) > 0.0) {
nb++;
value = has_cost(bytes_amount, i);
public:
HostModel() : Model(Model::UpdateAlgo::FULL) {}
- virtual kernel::resource::Action* execute_parallel(size_t host_nb, s4u::Host* const* host_list,
+ virtual kernel::resource::Action* execute_parallel(const std::vector<s4u::Host*> host_list,
const double* flops_amount, const double* bytes_amount,
double rate);
};
}
}
-kernel::resource::Action* HostL07Model::execute_parallel(size_t host_nb, s4u::Host* const* host_list,
+kernel::resource::Action* HostL07Model::execute_parallel(const std::vector<s4u::Host*> host_list,
const double* flops_amount, const double* bytes_amount,
double rate)
{
- return new L07Action(this, host_nb, host_list, flops_amount, bytes_amount, rate);
+ return new L07Action(this, host_list, flops_amount, bytes_amount, rate);
}
-L07Action::L07Action(kernel::resource::Model* model, size_t host_nb, s4u::Host* const* host_list,
+L07Action::L07Action(kernel::resource::Model* model, const std::vector<s4u::Host*> host_list,
const double* flops_amount, const double* bytes_amount, double rate)
: CpuAction(model, 1, 0), computationAmount_(flops_amount), communicationAmount_(bytes_amount), rate_(rate)
{
double latency = 0.0;
this->set_last_update();
- hostList_.insert(hostList_.end(), host_list, host_list + host_nb);
+ hostList_.insert(hostList_.end(), host_list.begin(), host_list.end());
if (flops_amount != nullptr)
- used_host_nb += std::count_if(flops_amount, flops_amount + host_nb, [](double x) { return x > 0.0; });
+ used_host_nb += std::count_if(flops_amount, flops_amount + host_list.size(), [](double x) { return x > 0.0; });
/* Compute the number of affected resources... */
if(bytes_amount != nullptr) {
std::unordered_set<const char*> affected_links;
- for (size_t k = 0; k < host_nb * host_nb; k++) {
+ for (size_t k = 0; k < host_list.size() * host_list.size(); k++) {
if (bytes_amount[k] <= 0)
continue;
double lat = 0.0;
std::vector<kernel::resource::LinkImpl*> route;
- hostList_[k / host_nb]->route_to(hostList_[k % host_nb], route, &lat);
+ hostList_[k / host_list.size()]->route_to(hostList_[k % host_list.size()], route, &lat);
latency = std::max(latency, lat);
for (auto const& link : route)
link_nb = affected_links.size();
}
- XBT_DEBUG("Creating a parallel task (%p) with %zu hosts and %zu unique links.", this, host_nb, link_nb);
+ XBT_DEBUG("Creating a parallel task (%p) with %zu hosts and %zu unique links.", this, host_list.size(), link_nb);
latency_ = latency;
- set_variable(model->get_maxmin_system()->variable_new(this, 1.0, (rate > 0 ? rate : -1.0), host_nb + link_nb));
+ set_variable(
+ model->get_maxmin_system()->variable_new(this, 1.0, (rate > 0 ? rate : -1.0), host_list.size() + link_nb));
if (latency_ > 0)
model->get_maxmin_system()->update_variable_weight(get_variable(), 0.0);
/* Expand it for the CPUs even if there is nothing to compute, to make sure that it gets expended even if there is no
* communication either */
- for (size_t i = 0; i < host_nb; i++)
+ for (size_t i = 0; i < host_list.size(); i++)
model->get_maxmin_system()->expand(host_list[i]->pimpl_cpu->get_constraint(), get_variable(),
(flops_amount == nullptr ? 0.0 : flops_amount[i]));
if (bytes_amount != nullptr) {
- for (size_t k = 0; k < host_nb * host_nb; k++) {
+ for (size_t k = 0; k < host_list.size() * host_list.size(); k++) {
if (bytes_amount[k] <= 0.0)
continue;
std::vector<kernel::resource::LinkImpl*> route;
- hostList_[k / host_nb]->route_to(hostList_[k % host_nb], route, nullptr);
+ hostList_[k / host_list.size()]->route_to(hostList_[k % host_list.size()], route, nullptr);
for (auto const& link : route)
model->get_maxmin_system()->expand_add(link->get_constraint(), this->get_variable(), bytes_amount[k]);
+/** @brief Model a communication of @a size bytes from @a src to @a dst as a two-host parallel action.
+ *
+ * Both amount arrays are zero-initialized; only cell 1 of the 2x2 bytes matrix (row src, column dst) is set to
+ * @a size. The arrays are allocated here and the resulting action is flagged with free_arrays_ = true --
+ * NOTE(review): presumably so that the action releases them itself; confirm in L07Action.
+ */
kernel::resource::Action* NetworkL07Model::communicate(s4u::Host* src, s4u::Host* dst, double size, double rate)
{
- sg_host_t* host_list = new sg_host_t[2]();
+ std::vector<s4u::Host*> host_list = {src, dst};
double* flops_amount = new double[2]();
double* bytes_amount = new double[4]();
- host_list[0] = src;
- host_list[1] = dst;
bytes_amount[1] = size;
- kernel::resource::Action* res = hostModel_->execute_parallel(2, host_list, flops_amount, bytes_amount, rate);
+ kernel::resource::Action* res = hostModel_->execute_parallel(host_list, flops_amount, bytes_amount, rate);
static_cast<L07Action*>(res)->free_arrays_ = true;
return res;
}
+/** @brief Model a computation of @a size flops on this CPU's host as a one-host parallel action.
+ *
+ * No bytes matrix is given (nullptr) and the rate is -1. The flops array is allocated here and the resulting action
+ * is flagged with free_arrays_ = true -- NOTE(review): presumably so that the action releases it itself; confirm in
+ * L07Action.
+ */
kernel::resource::Action* CpuL07::execution_start(double size)
{
- sg_host_t host_list[1] = {get_host()};
+ std::vector<s4u::Host*> host_list = {get_host()};
double* flops_amount = new double[1]();
flops_amount[0] = size;
kernel::resource::Action* res =
- static_cast<CpuL07Model*>(get_model())->hostModel_->execute_parallel(1, host_list, flops_amount, nullptr, -1);
+ static_cast<CpuL07Model*>(get_model())->hostModel_->execute_parallel(host_list, flops_amount, nullptr, -1);
static_cast<L07Action*>(res)->free_arrays_ = true;
return res;
}
double next_occuring_event(double now) override;
void update_actions_state(double now, double delta) override;
- kernel::resource::Action* execute_parallel(size_t host_nb, s4u::Host* const* host_list, const double* flops_amount,
+ kernel::resource::Action* execute_parallel(const std::vector<s4u::Host*> host_list, const double* flops_amount,
const double* bytes_amount, double rate) override;
};
class L07Action : public CpuAction {
friend Action *CpuL07::execution_start(double size);
friend Action *CpuL07::sleep(double duration);
- friend Action* HostL07Model::execute_parallel(size_t host_nb, s4u::Host* const* host_list, const double* flops_amount,
+ friend Action* HostL07Model::execute_parallel(const std::vector<s4u::Host*> host_list, const double* flops_amount,
const double* bytes_amount, double rate);
friend Action* NetworkL07Model::communicate(s4u::Host* src, s4u::Host* dst, double size, double rate);
public:
- L07Action(kernel::resource::Model* model, size_t host_nb, s4u::Host* const* host_list, const double* flops_amount,
+ L07Action(kernel::resource::Model* model, const std::vector<s4u::Host*> host_list, const double* flops_amount,
const double* bytes_amount, double rate);
L07Action(const L07Action&) = delete;
L07Action& operator=(const L07Action&) = delete;