X-Git-Url: http://bilbo.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/b94ef9f53ddfc2ac1f56fee3e5b1962da78cafe9..d0d1912b2262e2aaf06b39115de5f846e2a86efe:/src/kernel/EngineImpl.cpp diff --git a/src/kernel/EngineImpl.cpp b/src/kernel/EngineImpl.cpp index b119614040..f2ce40346a 100644 --- a/src/kernel/EngineImpl.cpp +++ b/src/kernel/EngineImpl.cpp @@ -41,6 +41,8 @@ EngineImpl::~EngineImpl() for (auto const& kv : links_) if (kv.second) kv.second->destroy(); + actors_to_run_.clear(); + actors_that_ran_.clear(); } void EngineImpl::load_deployment(const std::string& file) const @@ -76,6 +78,86 @@ void EngineImpl::add_model(std::shared_ptr model, const std::ve models_prio_[model_name] = std::move(model); } +/** Wake up all actors waiting for a Surf action to finish */ +void EngineImpl::wake_all_waiting_actors() const +{ + for (auto const& model : models_) { + XBT_DEBUG("Handling the failed actions (if any)"); + while (auto* action = model->extract_failed_action()) { + XBT_DEBUG(" Handling Action %p", action); + if (action->get_activity() != nullptr) + activity::ActivityImplPtr(action->get_activity())->post(); + } + XBT_DEBUG("Handling the terminated actions (if any)"); + while (auto* action = model->extract_done_action()) { + XBT_DEBUG(" Handling Action %p", action); + if (action->get_activity() == nullptr) + XBT_DEBUG("probably vcpu's action %p, skip", action); + else + activity::ActivityImplPtr(action->get_activity())->post(); + } + } +} +/** + * @brief Executes the actors in actors_to_run. + * + * The actors in actors_to_run are run (in parallel if possible). On exit, actors_to_run is empty, and actors_that_ran + * contains the list of actors that just ran. The two lists are swapped so, be careful when using them before and after + * a call to this function. + */ +void EngineImpl::run_all_actors() +{ + simix_global->context_factory->run_all(); + + actors_to_run_.swap(actors_that_ran_); + actors_to_run_.clear(); +} + +/** Execute all the tasks that are queued, e.g. `.then()` callbacks of futures. */ +bool EngineImpl::execute_tasks() +{ + xbt_assert(tasksTemp.empty()); + + if (tasks.empty()) + return false; + + do { + // We don't want the callbacks to modify the vector we are iterating over: + tasks.swap(tasksTemp); + + // Execute all the queued tasks: + for (auto& task : tasksTemp) + task(); + + tasksTemp.clear(); + } while (not tasks.empty()); + + return true; +} + +void EngineImpl::rm_daemon(actor::ActorImpl* actor) +{ + auto it = daemons_.find(actor); + xbt_assert(it != daemons_.end(), "The dying daemon is not a daemon after all. Please report that bug."); + daemons_.erase(it); +} + +void EngineImpl::add_actor_to_run_list_no_check(actor::ActorImpl* actor) +{ + XBT_DEBUG("Inserting [%p] %s(%s) in the to_run list", actor, actor->get_cname(), actor->get_host()->get_cname()); + actors_to_run_.push_back(actor); +} + +void EngineImpl::add_actor_to_run_list(actor::ActorImpl* actor) +{ + if (std::find(begin(actors_to_run_), end(actors_to_run_), actor) != end(actors_to_run_)) { + XBT_DEBUG("Actor %s is already in the to_run list", actor->get_cname()); + } else { + XBT_DEBUG("Inserting [%p] %s(%s) in the to_run list", actor, actor->get_cname(), actor->get_host()->get_cname()); + actors_to_run_.push_back(actor); + } +} + void EngineImpl::run() { if (MC_record_replay_is_active()) { @@ -86,7 +168,7 @@ void EngineImpl::run() double time = 0; do { - XBT_DEBUG("New Schedule Round; size(queue)=%zu", simix_global->actors_to_run.size()); + XBT_DEBUG("New Schedule Round; size(queue)=%zu", actors_to_run_.size()); if (cfg_breakpoint >= 0.0 && surf_get_clock() >= cfg_breakpoint) { XBT_DEBUG("Breakpoint reached (%g)", cfg_breakpoint.get()); @@ -98,13 +180,13 @@ void EngineImpl::run() #endif } - simix_global->execute_tasks(); + execute_tasks(); - while (not simix_global->actors_to_run.empty()) { - XBT_DEBUG("New Sub-Schedule Round; size(queue)=%zu", simix_global->actors_to_run.size()); + while (not actors_to_run_.empty()) { + XBT_DEBUG("New Sub-Schedule Round; size(queue)=%zu", actors_to_run_.size()); - /* Run all processes that are ready to run, possibly in parallel */ - simix_global->run_all_actors(); + /* Run all actors that are ready to run, possibly in parallel */ + run_all_actors(); /* answer sequentially and in a fixed arbitrary order all the simcalls that were issued during that sub-round */ @@ -168,20 +250,20 @@ void EngineImpl::run() * That would thus be a pure waste of time. */ - for (auto const& actor : simix_global->actors_that_ran) { + for (auto const& actor : actors_that_ran_) { if (actor->simcall_.call_ != simix::Simcall::NONE) { actor->simcall_handle(0); } } - simix_global->execute_tasks(); + execute_tasks(); do { - simix_global->wake_all_waiting_actors(); - } while (simix_global->execute_tasks()); + wake_all_waiting_actors(); + } while (execute_tasks()); - /* If only daemon processes remain, cancel their actions, mark them to die and reschedule them */ - if (simix_global->process_list.size() == simix_global->daemons.size()) - for (auto const& dmon : simix_global->daemons) { + /* If only daemon actors remain, cancel their actions, mark them to die and reschedule them */ + if (simix_global->process_list.size() == daemons_.size()) + for (auto const& dmon : daemons_) { XBT_DEBUG("Kill %s", dmon->get_cname()); simix_global->maestro_->kill(dmon); } @@ -196,25 +278,24 @@ void EngineImpl::run() /* Notify all the hosts that have failed */ /* FIXME: iterate through the list of failed host and mark each of them */ - /* as failed. On each host, signal all the running processes with host_fail */ + /* as failed. On each host, signal all the running actors with host_fail */ // Execute timers and tasks until there isn't anything to be done: bool again = false; do { again = timer::Timer::execute_all(); - if (simix_global->execute_tasks()) + if (execute_tasks()) again = true; - simix_global->wake_all_waiting_actors(); + wake_all_waiting_actors(); } while (again); /* Clean actors to destroy */ simix_global->empty_trash(); - XBT_DEBUG("### time %f, #processes %zu, #to_run %zu", time, simix_global->process_list.size(), - simix_global->actors_to_run.size()); + XBT_DEBUG("### time %f, #actors %zu, #to_run %zu", time, simix_global->process_list.size(), actors_to_run_.size()); - if (time < 0. && simix_global->actors_to_run.empty() && not simix_global->process_list.empty()) { - if (simix_global->process_list.size() <= simix_global->daemons.size()) { + if (time < 0. && actors_to_run_.empty() && not simix_global->process_list.empty()) { + if (simix_global->process_list.size() <= daemons_.size()) { XBT_CRITICAL("Oops! Daemon actors cannot do any blocking activity (communications, synchronization, etc) " "once the simulation is over. Please fix your on_exit() functions."); } else { @@ -227,7 +308,7 @@ void EngineImpl::run() simix_global->maestro_->kill(kv.second); } } - } while (time > -1.0 || not simix_global->actors_to_run.empty()); + } while (time > -1.0 || has_actors_to_run()); if (not simix_global->process_list.empty()) THROW_IMPOSSIBLE;