#include "simgrid/s4u/Actor.hpp"
#include "simgrid/s4u/Exec.hpp"
#include "src/kernel/activity/ExecImpl.hpp"
+#include "src/kernel/actor/ActorImpl.hpp"
+#include "src/mc/checker/SimcallObserver.hpp"
#include "xbt/log.h"
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_exec, s4u_activity, "S4U asynchronous executions");
if (state_ == State::INITED)
vetoable_start();
- kernel::actor::ActorImpl* issuer = Actor::self()->get_impl();
+ kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
kernel::actor::simcall_blocking<void>([this, issuer, timeout] { this->get_impl()->wait_for(issuer, timeout); });
state_ = State::FINISHED;
on_completion(*this);
std::transform(begin(*execs), end(*execs), begin(rexecs),
[](const ExecPtr& exec) { return static_cast<kernel::activity::ExecImpl*>(exec->pimpl_.get()); });
- int changed_pos = simcall_execution_waitany_for(rexecs.data(), rexecs.size(), timeout);
+ kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
+ mc::ExecutionWaitanySimcall observer{issuer, &rexecs, timeout};
+ kernel::actor::simcall_blocking<void>(
+ [&observer] {
+ kernel::activity::ExecImpl::wait_any_for(observer.get_issuer(), observer.get_execs(), observer.get_timeout());
+ },
+ &observer);
+ int changed_pos = observer.get_result();
if (changed_pos != -1) {
on_completion(*(execs->at(changed_pos)));
execs->at(changed_pos)->release_dependencies();
{
kernel::actor::simcall([this] { boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_)->cancel(); });
state_ = State::CANCELED;
+ on_completion(*this);
return this;
}
*/
ExecPtr Exec::set_bound(double bound)
{
// Guard: the bound may only be modified while the exec is in the INITED or STARTING state.
// The removed (-) line accepted INITED only; the added (+) lines also accept STARTING.
- xbt_assert(state_ == State::INITED, "Cannot change the bound of an exec after its start");
+ xbt_assert(state_ == State::INITED || state_ == State::STARTING,
+ "Cannot change the bound of an exec after its start");
// Record the requested bound, then hand this exec back to the caller.
bound_ = bound;
return this;
}
// Deprecated setter (see the XBT_ATTRIB_DEPRECATED_v329 marker): stores the timeout of this exec.
ExecPtr Exec::set_timeout(double timeout) // XBT_ATTRIB_DEPRECATED_v329
{
// Guard: the timeout may only be modified while the exec is in the INITED or STARTING state.
// The failure message names the timeout (it used to be a copy of the set_bound() message).
- xbt_assert(state_ == State::INITED, "Cannot change the bound of an exec after its start");
+ xbt_assert(state_ == State::INITED || state_ == State::STARTING,
+ "Cannot change the timeout of an exec after its start");
// Record the requested timeout, then hand this exec back to the caller.
timeout_ = timeout;
return this;
}
ExecPtr Exec::set_flops_amount(double flops_amount)
{
- xbt_assert(state_ == State::INITED, "Cannot change the flop_amount of an exec after its start");
+ xbt_assert(state_ == State::INITED || state_ == State::STARTING,
+ "Cannot change the flop_amount of an exec after its start");
flops_amounts_.assign(1, flops_amount);
Activity::set_remaining(flops_amounts_.front());
return this;
ExecPtr Exec::set_flops_amounts(const std::vector<double>& flops_amounts)
{
- xbt_assert(state_ == State::INITED, "Cannot change the flops_amounts of an exec after its start");
+ xbt_assert(state_ == State::INITED || state_ == State::STARTING,
+ "Cannot change the flops_amounts of an exec after its start");
flops_amounts_ = flops_amounts;
parallel_ = true;
return this;
ExecPtr Exec::set_bytes_amounts(const std::vector<double>& bytes_amounts)
{
- xbt_assert(state_ == State::INITED, "Cannot change the bytes_amounts of an exec after its start");
+ xbt_assert(state_ == State::INITED || state_ == State::STARTING,
+ "Cannot change the bytes_amounts of an exec after its start");
bytes_amounts_ = bytes_amounts;
parallel_ = true;
return this;
return this;
}
-///////////// SEQUENTIAL EXECUTIONS ////////
Exec* Exec::start()
{
if (is_parallel())
} // namespace s4u
} // namespace simgrid
+
/* **************************** Public C interface *************************** */
void sg_exec_set_bound(sg_exec_t exec, double bound)
{
// C binding: starts the given execution by calling vetoable_start() on it.
// The removed (-) line called start() directly; the added (+) line goes through vetoable_start() instead.
void sg_exec_start(sg_exec_t exec)
{
- exec->start();
+ exec->vetoable_start();
}
void sg_exec_cancel(sg_exec_t exec)