From eb08fb3c624342be6f198555c0ebb0d847c65d65 Mon Sep 17 00:00:00 2001 From: Frederic Suter Date: Thu, 12 Dec 2019 18:09:03 +0100 Subject: [PATCH] implement Exec::wait_for() --- .gitignore | 1 + ChangeLog | 1 + MANIFEST.in | 2 + examples/s4u/CMakeLists.txt | 2 +- .../s4u/exec-waitfor/s4u-exec-waitfor.cpp | 66 +++++++++++++++++++ .../s4u/exec-waitfor/s4u-exec-waitfor.tesh | 10 +++ include/simgrid/simix.h | 2 +- src/kernel/activity/ExecImpl.cpp | 19 +++++- src/s4u/s4u_Actor.cpp | 24 +++---- src/s4u/s4u_Exec.cpp | 12 ++-- src/simix/libsmx.cpp | 5 +- src/simix/popping_accessors.hpp | 14 +++- src/simix/popping_bodies.cpp | 6 +- src/simix/popping_generated.cpp | 2 +- src/simix/simcalls.in | 2 +- 15 files changed, 137 insertions(+), 31 deletions(-) create mode 100644 examples/s4u/exec-waitfor/s4u-exec-waitfor.cpp create mode 100644 examples/s4u/exec-waitfor/s4u-exec-waitfor.tesh diff --git a/.gitignore b/.gitignore index 5940061402..49d8abb217 100644 --- a/.gitignore +++ b/.gitignore @@ -181,6 +181,7 @@ examples/s4u/exec-monitor/s4u-exec-monitor examples/s4u/exec-ptask/s4u-exec-ptask examples/s4u/exec-remote/s4u-exec-remote examples/s4u/exec-waitany/s4u-exec-waitany +examples/s4u/exec-waitfor/s4u-exec-waitfor examples/s4u/io-async/s4u-io-async examples/s4u/io-file-remote/s4u-io-file-remote examples/s4u/io-file-system/s4u-io-file-system diff --git a/ChangeLog b/ChangeLog index 242ee774db..89cc700996 100644 --- a/ChangeLog +++ b/ChangeLog @@ -6,6 +6,7 @@ S4U: - Actor: Merge signals on_migration_start/end into on_host_change - Actor: Rename migrate() into set_host() - Disk: Allow users to get the read and write nominal bandwidth values +- Exec: Implement wait_for(timeout) XML: - Parse errors now raise a simgrid::ParseError that you may want to catch. diff --git a/MANIFEST.in b/MANIFEST.in index e8e29134c1..f2baf4bdde 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -376,6 +376,8 @@ include examples/s4u/exec-remote/s4u-exec-remote.cpp include examples/s4u/exec-remote/s4u-exec-remote.tesh include examples/s4u/exec-waitany/s4u-exec-waitany.cpp include examples/s4u/exec-waitany/s4u-exec-waitany.tesh +include examples/s4u/exec-waitfor/s4u-exec-waitfor.cpp +include examples/s4u/exec-waitfor/s4u-exec-waitfor.tesh include examples/s4u/io-async/s4u-io-async.cpp include examples/s4u/io-async/s4u-io-async.tesh include examples/s4u/io-disk-raw/s4u-io-disk-raw.cpp diff --git a/examples/s4u/CMakeLists.txt b/examples/s4u/CMakeLists.txt index 7565b6ca22..1130d4d8c9 100644 --- a/examples/s4u/CMakeLists.txt +++ b/examples/s4u/CMakeLists.txt @@ -9,7 +9,7 @@ foreach (example actor-create actor-daemon actor-exiting actor-join actor-kill cloud-capping cloud-migration cloud-simple energy-exec energy-boot energy-link energy-vm energy-exec-ptask engine-filtering - exec-async exec-basic exec-dvfs exec-ptask exec-remote exec-waitany + exec-async exec-basic exec-dvfs exec-ptask exec-remote exec-waitany exec-waitfor io-async io-file-system io-file-remote io-disk-raw platform-failures platform-profile platform-properties plugin-hostload diff --git a/examples/s4u/exec-waitfor/s4u-exec-waitfor.cpp b/examples/s4u/exec-waitfor/s4u-exec-waitfor.cpp new file mode 100644 index 0000000000..6aeccfef75 --- /dev/null +++ b/examples/s4u/exec-waitfor/s4u-exec-waitfor.cpp @@ -0,0 +1,66 @@ +/* Copyright (c) 2019. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#include "simgrid/s4u.hpp" + +XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_exec_waitfor, "Messages specific for this s4u example"); + +static void worker() +{ + simgrid::s4u::ExecPtr exec; + double amount = 5 * simgrid::s4u::this_actor::get_host()->get_speed(); + XBT_INFO("Create an activity that should run for 5 seconds"); + + exec = simgrid::s4u::this_actor::exec_async(amount); + + /* Now that execution is started, wait for 3 seconds. */ + XBT_INFO("But let it end after 3 seconds"); + try { + exec->wait_for(3); + XBT_INFO("Execution complete"); + } catch (simgrid::TimeoutException&) { + XBT_INFO("Execution Timeout!"); + } + + /* do it again, but this time with a timeout greater than the duration of the execution */ + XBT_INFO("Create another activity that should run for 5 seconds and wait for it for 6 seconds"); + exec = simgrid::s4u::this_actor::exec_async(amount); + try { + exec->wait_for(6); + XBT_INFO("Execution complete"); + } catch (simgrid::TimeoutException&) { + XBT_INFO("Execution Timeout!"); + } + + XBT_INFO("Finally test with a parallel execution"); + auto hosts = simgrid::s4u::Engine::get_instance()->get_all_hosts(); + size_t hosts_count = hosts.size(); + std::vector computation_amounts; + std::vector communication_amounts; + + computation_amounts.assign(hosts_count, 1e9 /*1Gflop*/); + communication_amounts.assign(hosts_count * hosts_count, 0); + for (size_t i = 0; i < hosts_count; i++) + for (size_t j = i + 1; j < hosts_count; j++) + communication_amounts[i * hosts_count + j] = 1e7; // 10 MB + + exec = simgrid::s4u::this_actor::exec_init(hosts, computation_amounts, communication_amounts); + try { + exec->wait_for(2); + XBT_INFO("Parallel Execution complete"); + } catch (simgrid::TimeoutException&) { + XBT_INFO("Parallel Execution Timeout!"); + } +} + +int main(int argc, char* argv[]) +{ + simgrid::s4u::Engine e(&argc, argv); + e.load_platform(argv[1]); + simgrid::s4u::Actor::create("worker", simgrid::s4u::Host::by_name("Tremblay"), worker); + e.run(); + + return 0; +} diff --git a/examples/s4u/exec-waitfor/s4u-exec-waitfor.tesh b/examples/s4u/exec-waitfor/s4u-exec-waitfor.tesh new file mode 100644 index 0000000000..c58d0488d1 --- /dev/null +++ b/examples/s4u/exec-waitfor/s4u-exec-waitfor.tesh @@ -0,0 +1,10 @@ +#!/usr/bin/env tesh + +$ ${bindir:=.}/s4u-exec-waitfor ${platfdir}/multicore_machine.xml "--log=root.fmt:[%10.6r]%e[%14P]%e%m%n" +> [ 0.000000] [ worker] Create an activity that should run for 5 seconds +> [ 0.000000] [ worker] But let it end after 3 seconds +> [ 3.000000] [ worker] Execution Timeout! +> [ 3.000000] [ worker] Create another activity that should run for 5 seconds and wait for it for 6 seconds +> [ 8.000000] [ worker] Execution complete +> [ 8.000000] [ worker] Finally test with a parallel execution +> [ 10.000000] [ worker] Parallel Execution Timeout! diff --git a/include/simgrid/simix.h b/include/simgrid/simix.h index 9b6ff412a1..2e69f6ab94 100644 --- a/include/simgrid/simix.h +++ b/include/simgrid/simix.h @@ -114,7 +114,7 @@ XBT_PUBLIC void SIMIX_comm_copy_buffer_callback(simgrid::kernel::activity::CommI /******************************* Host simcalls ********************************/ #ifdef __cplusplus -XBT_PUBLIC e_smx_state_t simcall_execution_wait(const smx_activity_t& execution); +XBT_PUBLIC e_smx_state_t simcall_execution_wait(const smx_activity_t& execution, double timeout); XBT_PUBLIC unsigned int simcall_execution_waitany_for(simgrid::kernel::activity::ExecImpl* execs[], size_t count, double timeout); XBT_PUBLIC bool simcall_execution_test(const smx_activity_t& execution); diff --git a/src/kernel/activity/ExecImpl.cpp b/src/kernel/activity/ExecImpl.cpp index eda2b58c9f..76b136ff85 100644 --- a/src/kernel/activity/ExecImpl.cpp +++ b/src/kernel/activity/ExecImpl.cpp @@ -17,23 +17,36 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix_process); -void simcall_HANDLER_execution_wait(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl* synchro) +void simcall_HANDLER_execution_wait(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl* synchro, double timeout) { XBT_DEBUG("Wait for execution of synchro %p, state %d", synchro, (int)synchro->state_); + xbt_assert(std::isfinite(timeout), "timeout is not finite!"); /* Associate this simcall to the synchro */ synchro->register_simcall(simcall); /* set surf's synchro */ if (MC_is_active() || MC_record_replay_is_active()) { - synchro->state_ = simgrid::kernel::activity::State::DONE; + int idx = SIMCALL_GET_MC_VALUE(*simcall); + if (idx == 0) { + synchro->state_ = simgrid::kernel::activity::State::DONE; + } else { + /* If we reached this point, the wait simcall must have a timeout */ + /* Otherwise it shouldn't be enabled and executed by the MC */ + if (timeout < 0.0) + THROW_IMPOSSIBLE; + synchro->state_ = simgrid::kernel::activity::State::TIMEOUT; + } synchro->finish(); return; } /* If the synchro is already finished then perform the error handling */ - if (synchro->state_ != simgrid::kernel::activity::State::RUNNING) + if (synchro->state_ != simgrid::kernel::activity::State::RUNNING) { synchro->finish(); + } else { /* we need a sleep action (even when there is no timeout) to be notified of host failures */ + synchro->set_timeout(timeout); + } } void simcall_HANDLER_execution_test(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl* synchro) diff --git a/src/s4u/s4u_Actor.cpp b/src/s4u/s4u_Actor.cpp index 0ac6454070..e0768d86d2 100644 --- a/src/s4u/s4u_Actor.cpp +++ b/src/s4u/s4u_Actor.cpp @@ -345,11 +345,22 @@ void execute(double flops, double priority) void parallel_execute(const std::vector& hosts, const std::vector& flops_amounts, const std::vector& bytes_amounts) { - parallel_execute(hosts, flops_amounts, bytes_amounts, -1); + exec_init(hosts, flops_amounts, bytes_amounts)->wait(); } void parallel_execute(const std::vector& hosts, const std::vector& flops_amounts, const std::vector& bytes_amounts, double timeout) +{ + exec_init(hosts, flops_amounts, bytes_amounts)->wait_for(timeout); +} + +ExecPtr exec_init(double flops_amount) +{ + return ExecPtr(new ExecSeq(get_host(), flops_amount)); +} + +ExecPtr exec_init(const std::vector& hosts, const std::vector& flops_amounts, + const std::vector& bytes_amounts) { xbt_assert(hosts.size() > 0, "Your parallel executions must span over at least one host."); xbt_assert(hosts.size() == flops_amounts.size() || flops_amounts.empty(), @@ -371,17 +382,6 @@ void parallel_execute(const std::vector& hosts, const std::vectorset_timeout(timeout)->wait(); -} - -ExecPtr exec_init(double flops_amount) -{ - return ExecPtr(new ExecSeq(get_host(), flops_amount)); -} - -ExecPtr exec_init(const std::vector& hosts, const std::vector& flops_amounts, - const std::vector& bytes_amounts) -{ return ExecPtr(new ExecPar(hosts, flops_amounts, bytes_amounts)); } diff --git a/src/s4u/s4u_Exec.cpp b/src/s4u/s4u_Exec.cpp index ffe6962a94..66016d3c60 100644 --- a/src/s4u/s4u_Exec.cpp +++ b/src/s4u/s4u_Exec.cpp @@ -39,20 +39,20 @@ bool Exec::test() } Exec* Exec::wait() +{ + return this->wait_for(-1); +} + +Exec* Exec::wait_for(double timeout) { if (state_ == State::INITED) start(); - simcall_execution_wait(pimpl_); + simcall_execution_wait(pimpl_, timeout); state_ = State::FINISHED; on_completion(*Actor::self(), *this); return this; } -Exec* Exec::wait_for(double) -{ - THROW_UNIMPLEMENTED; -} - int Exec::wait_any_for(std::vector* execs, double timeout) { std::unique_ptr rexecs(new kernel::activity::ExecImpl*[execs->size()]); diff --git a/src/simix/libsmx.cpp b/src/simix/libsmx.cpp index 275c34d62d..3616ba05e1 100644 --- a/src/simix/libsmx.cpp +++ b/src/simix/libsmx.cpp @@ -29,9 +29,10 @@ * * @param execution The execution synchro */ -e_smx_state_t simcall_execution_wait(const smx_activity_t& execution) +e_smx_state_t simcall_execution_wait(const smx_activity_t& execution, double timeout) { - return (e_smx_state_t)simcall_BODY_execution_wait(static_cast(execution.get())); + return (e_smx_state_t)simcall_BODY_execution_wait(static_cast(execution.get()), + timeout); } bool simcall_execution_test(const smx_activity_t& execution) diff --git a/src/simix/popping_accessors.hpp b/src/simix/popping_accessors.hpp index 066b3edc43..96328c22e2 100644 --- a/src/simix/popping_accessors.hpp +++ b/src/simix/popping_accessors.hpp @@ -27,6 +27,18 @@ static inline void simcall_execution_wait__set__execution(smx_simcall_t simcall, { simgrid::simix::marshal(simcall->args_[0], arg); } +static inline double simcall_execution_wait__get__timeout(smx_simcall_t simcall) +{ + return simgrid::simix::unmarshal(simcall->args_[1]); +} +static inline double simcall_execution_wait__getraw__timeout(smx_simcall_t simcall) +{ + return simgrid::simix::unmarshal_raw(simcall->args_[1]); +} +static inline void simcall_execution_wait__set__timeout(smx_simcall_t simcall, double arg) +{ + simgrid::simix::marshal(simcall->args_[1], arg); +} static inline int simcall_execution_wait__get__result(smx_simcall_t simcall) { return simgrid::simix::unmarshal(simcall->result_); @@ -999,7 +1011,7 @@ static inline void simcall_run_blocking__set__code(smx_simcall_t simcall, std::f /* The prototype of all simcall handlers, automatically generated for you */ -XBT_PRIVATE void simcall_HANDLER_execution_wait(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl* execution); +XBT_PRIVATE void simcall_HANDLER_execution_wait(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl* execution, double timeout); XBT_PRIVATE void simcall_HANDLER_execution_waitany_for(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl** execs, size_t count, double timeout); XBT_PRIVATE void simcall_HANDLER_execution_test(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl* execution); XBT_PRIVATE void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_actor_t sender, smx_mailbox_t mbox, double task_size, double rate, unsigned char* src_buff, size_t src_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double timeout); diff --git a/src/simix/popping_bodies.cpp b/src/simix/popping_bodies.cpp index ba60b6d8c3..4462018649 100644 --- a/src/simix/popping_bodies.cpp +++ b/src/simix/popping_bodies.cpp @@ -39,11 +39,11 @@ inline static R simcall(e_smx_simcall_t call, T const&... t) return simgrid::simix::unmarshal(self->simcall.result_); } -inline static int simcall_BODY_execution_wait(simgrid::kernel::activity::ExecImpl* execution) +inline static int simcall_BODY_execution_wait(simgrid::kernel::activity::ExecImpl* execution, double timeout) { if (0) /* Go to that function to follow the code flow through the simcall barrier */ - simcall_HANDLER_execution_wait(&SIMIX_process_self()->simcall, execution); - return simcall(SIMCALL_EXECUTION_WAIT, execution); + simcall_HANDLER_execution_wait(&SIMIX_process_self()->simcall, execution, timeout); + return simcall(SIMCALL_EXECUTION_WAIT, execution, timeout); } inline static int simcall_BODY_execution_waitany_for(simgrid::kernel::activity::ExecImpl** execs, size_t count, double timeout) diff --git a/src/simix/popping_generated.cpp b/src/simix/popping_generated.cpp index c0143071ef..df1deab123 100644 --- a/src/simix/popping_generated.cpp +++ b/src/simix/popping_generated.cpp @@ -62,7 +62,7 @@ void simgrid::kernel::actor::ActorImpl::simcall_handle(int value) { return; switch (simcall.call_) { case SIMCALL_EXECUTION_WAIT: - simcall_HANDLER_execution_wait(&simcall, simgrid::simix::unmarshal(simcall.args_[0])); + simcall_HANDLER_execution_wait(&simcall, simgrid::simix::unmarshal(simcall.args_[0]), simgrid::simix::unmarshal(simcall.args_[1])); break; case SIMCALL_EXECUTION_WAITANY_FOR: diff --git a/src/simix/simcalls.in b/src/simix/simcalls.in index 05a4c06aef..bf2371e0ec 100644 --- a/src/simix/simcalls.in +++ b/src/simix/simcalls.in @@ -35,7 +35,7 @@ # Last but not the least, you should declare the new simix call in # ./include/simgrid/simix.h (otherwise you will get a warning at compile time) -int execution_wait(simgrid::kernel::activity::ExecImpl* execution) [[block]]; +int execution_wait(simgrid::kernel::activity::ExecImpl* execution, double timeout) [[block]]; int execution_waitany_for(simgrid::kernel::activity::ExecImpl** execs, size_t count, double timeout) [[block]]; bool execution_test(simgrid::kernel::activity::ExecImpl* execution) [[block]]; -- 2.20.1