- Actor: Rename migrate() into set_host()
- Disk: Allow users to get the read and write nominal bandwidth values
- Exec: Implement wait_for(timeout)
+- Io: Implement wait_for(timeout)
XML:
- Parse errors now raise a simgrid::ParseError that you may want to catch.
XBT_INFO("Goodbye now!");
}
+/* Actor body: start an asynchronous write of `size` bytes on the first disk of the
+ * current host, wait for it with a 0.5 timeout, and log when that wait times out. */
+static void test_waitfor(sg_size_t size)
+{
+  auto* first_disk = simgrid::s4u::Host::current()->get_disks().front();
+  XBT_INFO("Hello! write %llu bytes from %s", size, first_disk->get_cname());
+
+  simgrid::s4u::IoPtr pending_write = first_disk->write_async(size);
+  try {
+    pending_write->wait_for(0.5);
+  } catch (const simgrid::TimeoutException&) {
+    /* The write did not complete within the allotted time */
+    XBT_INFO("Asynchronous write: Timeout!");
+  }
+
+  XBT_INFO("Goodbye now!");
+}
+
static void test_cancel(sg_size_t size)
{
simgrid::s4u::Disk* disk = simgrid::s4u::Host::current()->get_disks().front();
+ simgrid::s4u::this_actor::sleep_for(0.5);
XBT_INFO("Hello! write %llu bytes from %s", size, disk->get_cname());
simgrid::s4u::IoPtr activity = disk->write_async(size);
simgrid::s4u::Engine e(&argc, argv);
e.load_platform(argv[1]);
simgrid::s4u::Actor::create("test", simgrid::s4u::Host::by_name("bob"), test, 2e7);
+ simgrid::s4u::Actor::create("test_waitfor", simgrid::s4u::Host::by_name("alice"), test_waitfor, 5e7);
simgrid::s4u::Actor::create("test_cancel", simgrid::s4u::Host::by_name("alice"), test_cancel, 5e7);
e.run();
$ ${bindir:=.}/s4u-io-async ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n"
> [ 0.000000] (1:test@bob) Hello! read 20000000 bytes from Disk1
-> [ 0.000000] (2:test_cancel@alice) Hello! write 50000000 bytes from Disk1
+> [ 0.000000] (2:test_waitfor@alice) Hello! write 50000000 bytes from Disk1
> [ 0.200000] (1:test@bob) Goodbye now!
-> [ 0.500000] (2:test_cancel@alice) I changed my mind, cancel!
-> [ 0.500000] (2:test_cancel@alice) Goodbye now!
-> [ 0.500000] (0:maestro@) Simulation time 0.5
+> [ 0.500000] (2:test_waitfor@alice) Asynchronous write: Timeout!
+> [ 0.500000] (2:test_waitfor@alice) Goodbye now!
+> [ 0.500000] (3:test_cancel@alice) Hello! write 50000000 bytes from Disk1
+> [ 1.000000] (3:test_cancel@alice) I changed my mind, cancel!
+> [ 1.000000] (3:test_cancel@alice) Goodbye now!
+> [ 1.000000] (0:maestro@) Simulation time 1
/***************************** Io **************************************/
#ifdef __cplusplus
-XBT_PUBLIC e_smx_state_t simcall_io_wait(const smx_activity_t& io);
+XBT_PUBLIC e_smx_state_t simcall_io_wait(const smx_activity_t& io, double timeout); // `timeout` is forwarded to the io_wait simcall
#endif
/************************** MC simcalls **********************************/
SG_BEGIN_DECL
#include "src/mc/mc_replay.hpp"
#include "src/simix/smx_private.hpp"
#include "src/surf/StorageImpl.hpp"
+#include "src/surf/cpu_interface.hpp"
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_io, simix, "Logging specific to SIMIX (io)");
-void simcall_HANDLER_io_wait(smx_simcall_t simcall, simgrid::kernel::activity::IoImpl* synchro)
+/* Handler of the io_wait simcall: registers the simcall on the I/O synchro, then either
+ * finishes it right away (when the synchro is no longer RUNNING) or arms a sleep action on
+ * the host owning the disk/storage, used as timeout detector. A negative timeout means
+ * that the wait has no timeout. */
+void simcall_HANDLER_io_wait(smx_simcall_t simcall, simgrid::kernel::activity::IoImpl* synchro, double timeout)
{
XBT_DEBUG("Wait for execution of synchro %p, state %d", synchro, (int)synchro->state_);
/* Associate this simcall to the synchro */
synchro->register_simcall(simcall);
- if (MC_is_active() || MC_record_replay_is_active())
- synchro->state_ = simgrid::kernel::activity::State::DONE;
+ if (MC_is_active() || MC_record_replay_is_active()) {
+ int idx = SIMCALL_GET_MC_VALUE(*simcall);
+ if (idx == 0) {
+ synchro->state_ = simgrid::kernel::activity::State::DONE;
+ } else {
+ /* If we reached this point, the wait simcall must have a timeout */
+ /* Otherwise it shouldn't be enabled and executed by the MC */
+ if (timeout < 0.0)
+ THROW_IMPOSSIBLE;
+ synchro->state_ = simgrid::kernel::activity::State::TIMEOUT;
+ }
+ /* Do not call finish() here: the state is no longer RUNNING, so the check just below
+ * finishes the synchro. Calling it in both places would run the completion twice. */
+ }
/* If the synchro is already finished then perform the error handling */
if (synchro->state_ != simgrid::kernel::activity::State::RUNNING)
synchro->finish();
+ else {
+ /* we need a sleep action (even when there is no timeout) to be notified of host failures */
+ if (synchro->get_disk() != nullptr)
+ synchro->timeout_detector_ = synchro->get_disk()->get_host()->pimpl_cpu->sleep(timeout);
+ else
+ synchro->timeout_detector_ =
+ simgrid::s4u::Host::by_name(synchro->get_storage()->get_host())->pimpl_cpu->sleep(timeout);
+ synchro->timeout_detector_->set_activity(synchro);
+ }
}
namespace simgrid {
state_ = State::CANCELED;
} else if (surf_action_->get_state() == resource::Action::State::FINISHED) {
state_ = State::DONE;
+ } else if (timeout_detector_ && timeout_detector_->get_state() == resource::Action::State::FINISHED) {
+ state_ = State::TIMEOUT;
}
+
+ if (timeout_detector_) {
+ timeout_detector_->unref();
+ timeout_detector_ = nullptr;
+ }
+
on_completion(*this);
/* Answer all simcalls associated with the synchro */
case State::CANCELED:
simcall->issuer_->exception_ = std::make_exception_ptr(CancelException(XBT_THROW_POINT, "I/O Canceled"));
break;
+ case State::TIMEOUT:
+ XBT_DEBUG("IoImpl::finish(): execution timeouted");
+ simcall->issuer_->exception_ = std::make_exception_ptr(simgrid::TimeoutException(XBT_THROW_POINT, "Timeouted"));
+ break;
default:
xbt_die("Internal error in IoImpl::finish(): unexpected synchro state %d", static_cast<int>(state_));
}
IoImpl& set_disk(resource::DiskImpl* disk);
sg_size_t get_performed_ioops() { return performed_ioops_; }
+ /** Disk targeted by this I/O; may be nullptr (the io_wait handler checks for it) */
+ resource::DiskImpl* get_disk() const { return disk_; }
+ /** Storage targeted by this I/O; the io_wait handler falls back to it when no disk is set */
+ resource::StorageImpl* get_storage() const { return storage_; }
IoImpl* start();
void post() override;
void finish() override;
+ resource::Action* timeout_detector_ = nullptr;
static xbt::signal<void(IoImpl const&)> on_start;
static xbt::signal<void(IoImpl const&)> on_completion;
};
Io* Io::wait()
{
- simcall_io_wait(pimpl_);
- state_ = State::FINISHED;
- return this;
+ return this->wait_for(-1); // a negative timeout means waiting without any time limit
}
-Io* Io::wait_for(double)
+Io* Io::wait_for(double timeout) // blocks on the io_wait simcall, giving up after `timeout`
{
- THROW_UNIMPLEMENTED;
+ simcall_io_wait(pimpl_, timeout); // a timeout surfaces here as a simgrid::TimeoutException
+ state_ = State::FINISHED; // only reached when the simcall returned without exception
+ return this;
}
bool Io::test()
return simcall_BODY_sem_acquire_timeout(sem, timeout);
}
-e_smx_state_t simcall_io_wait(const smx_activity_t& io)
+/* User-side entry point of the io_wait simcall: downcasts the activity to its IoImpl and
+ * hands it, together with the timeout, to the simcall body. */
+e_smx_state_t simcall_io_wait(const smx_activity_t& io, double timeout)
{
- return (e_smx_state_t)simcall_BODY_io_wait(static_cast<simgrid::kernel::activity::IoImpl*>(io.get()));
+ auto* io_impl = static_cast<simgrid::kernel::activity::IoImpl*>(io.get());
+ return static_cast<e_smx_state_t>(simcall_BODY_io_wait(io_impl, timeout));
}
void simcall_run_kernel(std::function<void()> const& code, simgrid::mc::SimcallInspector* t)
{
simgrid::simix::marshal<simgrid::kernel::activity::IoImpl*>(simcall->args_[0], arg);
}
+static inline double simcall_io_wait__get__timeout(smx_simcall_t simcall) // typed read of io_wait's timeout argument
+{
+ return simgrid::simix::unmarshal<double>(simcall->args_[1]); // the timeout is stored in slot 1 of args_
+}
+static inline double simcall_io_wait__getraw__timeout(smx_simcall_t simcall) // raw-unmarshal variant of the timeout getter
+{
+ return simgrid::simix::unmarshal_raw<double>(simcall->args_[1]); // same slot as the typed getter
+}
+static inline void simcall_io_wait__set__timeout(smx_simcall_t simcall, double arg) // overwrites io_wait's timeout argument
+{
+ simgrid::simix::marshal<double>(simcall->args_[1], arg); // writes `arg` into slot 1 of args_
+}
static inline sg_size_t simcall_io_wait__get__result(smx_simcall_t simcall)
{
return simgrid::simix::unmarshal<sg_size_t>(simcall->result_);
XBT_PRIVATE void simcall_HANDLER_cond_wait_timeout(smx_simcall_t simcall, smx_cond_t cond, smx_mutex_t mutex, double timeout);
XBT_PRIVATE void simcall_HANDLER_sem_acquire(smx_simcall_t simcall, smx_sem_t sem);
XBT_PRIVATE void simcall_HANDLER_sem_acquire_timeout(smx_simcall_t simcall, smx_sem_t sem, double timeout);
-XBT_PRIVATE void simcall_HANDLER_io_wait(smx_simcall_t simcall, simgrid::kernel::activity::IoImpl* io);
+XBT_PRIVATE void simcall_HANDLER_io_wait(smx_simcall_t simcall, simgrid::kernel::activity::IoImpl* io, double timeout);
XBT_PRIVATE int simcall_HANDLER_mc_random(smx_simcall_t simcall, int min, int max);
return simcall<int, smx_sem_t, double>(SIMCALL_SEM_ACQUIRE_TIMEOUT, sem, timeout);
}
-inline static sg_size_t simcall_BODY_io_wait(simgrid::kernel::activity::IoImpl* io)
+inline static sg_size_t simcall_BODY_io_wait(simgrid::kernel::activity::IoImpl* io, double timeout) // body of the io_wait simcall
{
if (0) /* Go to that function to follow the code flow through the simcall barrier */
- simcall_HANDLER_io_wait(&SIMIX_process_self()->simcall, io);
- return simcall<sg_size_t, simgrid::kernel::activity::IoImpl*>(SIMCALL_IO_WAIT, io);
+ simcall_HANDLER_io_wait(&SIMIX_process_self()->simcall, io, timeout); // never executed (guarded by `if (0)`)
+ return simcall<sg_size_t, simgrid::kernel::activity::IoImpl*, double>(SIMCALL_IO_WAIT, io, timeout); // issues SIMCALL_IO_WAIT with both arguments
}
inline static int simcall_BODY_mc_random(int min, int max)
break;
case SIMCALL_IO_WAIT:
- simcall_HANDLER_io_wait(&simcall, simgrid::simix::unmarshal<simgrid::kernel::activity::IoImpl*>(simcall.args_[0]));
+ simcall_HANDLER_io_wait(&simcall, simgrid::simix::unmarshal<simgrid::kernel::activity::IoImpl*>(simcall.args_[0]), simgrid::simix::unmarshal<double>(simcall.args_[1]));
break;
case SIMCALL_MC_RANDOM:
void sem_acquire(smx_sem_t sem) [[block]];
int sem_acquire_timeout(smx_sem_t sem, double timeout) [[block]];
-sg_size_t io_wait(simgrid::kernel::activity::IoImpl* io) [[block]];
+sg_size_t io_wait(simgrid::kernel::activity::IoImpl* io, double timeout) [[block]];
int mc_random(int min, int max);