Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
add waitfor of Io too
author: Frederic Suter <frederic.suter@cc.in2p3.fr>
Fri, 13 Dec 2019 09:46:54 +0000 (10:46 +0100)
committer: Frederic Suter <frederic.suter@cc.in2p3.fr>
Fri, 13 Dec 2019 09:47:58 +0000 (10:47 +0100)
12 files changed:
ChangeLog
examples/s4u/io-async/s4u-io-async.cpp
examples/s4u/io-async/s4u-io-async.tesh
include/simgrid/simix.h
src/kernel/activity/IoImpl.cpp
src/kernel/activity/IoImpl.hpp
src/s4u/s4u_Io.cpp
src/simix/libsmx.cpp
src/simix/popping_accessors.hpp
src/simix/popping_bodies.cpp
src/simix/popping_generated.cpp
src/simix/simcalls.in

index 89cc700..b8acaf6 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -7,6 +7,7 @@ S4U:
 - Actor: Rename migrate() into set_host()
 - Disk: Allow users to get the read and write nominal bandwidth values
 - Exec: Implement wait_for(timeout)
+- Io: Implement wait_for(timeout)
 
 XML:
 - Parse errors now raise a simgrid::ParseError that you may want to catch.
index 597284f..8819f75 100644 (file)
@@ -19,9 +19,25 @@ static void test(sg_size_t size)
   XBT_INFO("Goodbye now!");
 }
 
+static void test_waitfor(sg_size_t size)
+{
+  simgrid::s4u::Disk* disk = simgrid::s4u::Host::current()->get_disks().front();
+  XBT_INFO("Hello! write %llu bytes from %s", size, disk->get_cname());
+
+  simgrid::s4u::IoPtr activity = disk->write_async(size);
+  try {
+    activity->wait_for(0.5);
+  } catch (simgrid::TimeoutException&) {
+    XBT_INFO("Asynchronous write: Timeout!");
+  }
+
+  XBT_INFO("Goodbye now!");
+}
+
 static void test_cancel(sg_size_t size)
 {
   simgrid::s4u::Disk* disk = simgrid::s4u::Host::current()->get_disks().front();
+  simgrid::s4u::this_actor::sleep_for(0.5);
   XBT_INFO("Hello! write %llu bytes from %s", size, disk->get_cname());
 
   simgrid::s4u::IoPtr activity = disk->write_async(size);
@@ -37,6 +53,7 @@ int main(int argc, char* argv[])
   simgrid::s4u::Engine e(&argc, argv);
   e.load_platform(argv[1]);
   simgrid::s4u::Actor::create("test", simgrid::s4u::Host::by_name("bob"), test, 2e7);
+  simgrid::s4u::Actor::create("test_waitfor", simgrid::s4u::Host::by_name("alice"), test_waitfor, 5e7);
   simgrid::s4u::Actor::create("test_cancel", simgrid::s4u::Host::by_name("alice"), test_cancel, 5e7);
 
   e.run();
index 0ff50eb..3b6c006 100644 (file)
@@ -2,8 +2,11 @@
 
 $ ${bindir:=.}/s4u-io-async ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n"
 > [  0.000000] (1:test@bob) Hello! read 20000000 bytes from Disk1
-> [  0.000000] (2:test_cancel@alice) Hello! write 50000000 bytes from Disk1
+> [  0.000000] (2:test_waitfor@alice) Hello! write 50000000 bytes from Disk1
 > [  0.200000] (1:test@bob) Goodbye now!
-> [  0.500000] (2:test_cancel@alice) I changed my mind, cancel!
-> [  0.500000] (2:test_cancel@alice) Goodbye now!
-> [  0.500000] (0:maestro@) Simulation time 0.5
+> [  0.500000] (2:test_waitfor@alice) Asynchronous write: Timeout!
+> [  0.500000] (2:test_waitfor@alice) Goodbye now!
+> [  0.500000] (3:test_cancel@alice) Hello! write 50000000 bytes from Disk1
+> [  1.000000] (3:test_cancel@alice) I changed my mind, cancel!
+> [  1.000000] (3:test_cancel@alice) Goodbye now!
+> [  1.000000] (0:maestro@) Simulation time 1
index 2e69f6a..cf9e3e0 100644 (file)
@@ -191,7 +191,7 @@ SG_END_DECL
 
 /*****************************   Io   **************************************/
 #ifdef __cplusplus
-XBT_PUBLIC e_smx_state_t simcall_io_wait(const smx_activity_t& io);
+XBT_PUBLIC e_smx_state_t simcall_io_wait(const smx_activity_t& io, double timeout);
 #endif
 /************************** MC simcalls   **********************************/
 SG_BEGIN_DECL
index 783616f..338999b 100644 (file)
 #include "src/mc/mc_replay.hpp"
 #include "src/simix/smx_private.hpp"
 #include "src/surf/StorageImpl.hpp"
+#include "src/surf/cpu_interface.hpp"
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_io, simix, "Logging specific to SIMIX (io)");
 
-void simcall_HANDLER_io_wait(smx_simcall_t simcall, simgrid::kernel::activity::IoImpl* synchro)
+void simcall_HANDLER_io_wait(smx_simcall_t simcall, simgrid::kernel::activity::IoImpl* synchro, double timeout)
 {
   XBT_DEBUG("Wait for execution of synchro %p, state %d", synchro, (int)synchro->state_);
 
   /* Associate this simcall to the synchro */
   synchro->register_simcall(simcall);
 
-  if (MC_is_active() || MC_record_replay_is_active())
-    synchro->state_ = simgrid::kernel::activity::State::DONE;
+  if (MC_is_active() || MC_record_replay_is_active()) {
+    int idx = SIMCALL_GET_MC_VALUE(*simcall);
+    if (idx == 0) {
+      synchro->state_ = simgrid::kernel::activity::State::DONE;
+    } else {
+      /* If we reached this point, the wait simcall must have a timeout */
+      /* Otherwise it shouldn't be enabled and executed by the MC */
+      if (timeout < 0.0)
+        THROW_IMPOSSIBLE;
+      synchro->state_ = simgrid::kernel::activity::State::TIMEOUT;
+    }
+    synchro->finish();
+  }
 
   /* If the synchro is already finished then perform the error handling */
   if (synchro->state_ != simgrid::kernel::activity::State::RUNNING)
     synchro->finish();
+  else {
+    /* we need a sleep action (even when there is no timeout) to be notified of host failures */
+    if (synchro->get_disk() != nullptr)
+      synchro->timeout_detector_ = synchro->get_disk()->get_host()->pimpl_cpu->sleep(timeout);
+    else
+      synchro->timeout_detector_ =
+          simgrid::s4u::Host::by_name(synchro->get_storage()->get_host())->pimpl_cpu->sleep(timeout);
+    synchro->timeout_detector_->set_activity(synchro);
+  }
 }
 
 namespace simgrid {
@@ -82,7 +103,15 @@ void IoImpl::post()
       state_ = State::CANCELED;
   } else if (surf_action_->get_state() == resource::Action::State::FINISHED) {
     state_ = State::DONE;
+  } else if (timeout_detector_ && timeout_detector_->get_state() == resource::Action::State::FINISHED) {
+    state_ = State::TIMEOUT;
   }
+
+  if (timeout_detector_) {
+    timeout_detector_->unref();
+    timeout_detector_ = nullptr;
+  }
+
   on_completion(*this);
 
   /* Answer all simcalls associated with the synchro */
@@ -106,6 +135,10 @@ void IoImpl::finish()
       case State::CANCELED:
         simcall->issuer_->exception_ = std::make_exception_ptr(CancelException(XBT_THROW_POINT, "I/O Canceled"));
         break;
+      case State::TIMEOUT:
+        XBT_DEBUG("IoImpl::finish(): execution timeouted");
+        simcall->issuer_->exception_ = std::make_exception_ptr(simgrid::TimeoutException(XBT_THROW_POINT, "Timeouted"));
+        break;
       default:
         xbt_die("Internal error in IoImpl::finish(): unexpected synchro state %d", static_cast<int>(state_));
     }
index cbc2f0d..aaa715d 100644 (file)
@@ -28,11 +28,14 @@ public:
   IoImpl& set_disk(resource::DiskImpl* disk);
 
   sg_size_t get_performed_ioops() { return performed_ioops_; }
+  resource::DiskImpl* get_disk() { return disk_; }
+  resource::StorageImpl* get_storage() { return storage_; }
 
   IoImpl* start();
   void post() override;
   void finish() override;
 
+  resource::Action* timeout_detector_ = nullptr;
   static xbt::signal<void(IoImpl const&)> on_start;
   static xbt::signal<void(IoImpl const&)> on_completion;
 };
index cca68b0..d02bce1 100644 (file)
@@ -56,14 +56,14 @@ Io* Io::cancel()
 
 Io* Io::wait()
 {
-  simcall_io_wait(pimpl_);
-  state_ = State::FINISHED;
-  return this;
+  return this->wait_for(-1);
 }
 
-Io* Io::wait_for(double)
+Io* Io::wait_for(double timeout)
 {
-  THROW_UNIMPLEMENTED;
+  simcall_io_wait(pimpl_, timeout);
+  state_ = State::FINISHED;
+  return this;
 }
 
 bool Io::test()
index 3616ba0..5cc3f8c 100644 (file)
@@ -303,9 +303,9 @@ int simcall_sem_acquire_timeout(smx_sem_t sem, double timeout)
   return simcall_BODY_sem_acquire_timeout(sem, timeout);
 }
 
-e_smx_state_t simcall_io_wait(const smx_activity_t& io)
+e_smx_state_t simcall_io_wait(const smx_activity_t& io, double timeout)
 {
-  return (e_smx_state_t)simcall_BODY_io_wait(static_cast<simgrid::kernel::activity::IoImpl*>(io.get()));
+  return (e_smx_state_t)simcall_BODY_io_wait(static_cast<simgrid::kernel::activity::IoImpl*>(io.get()), timeout);
 }
 
 void simcall_run_kernel(std::function<void()> const& code, simgrid::mc::SimcallInspector* t)
index 96328c2..a3c8dd0 100644 (file)
@@ -933,6 +933,18 @@ static inline void simcall_io_wait__set__io(smx_simcall_t simcall, simgrid::kern
 {
   simgrid::simix::marshal<simgrid::kernel::activity::IoImpl*>(simcall->args_[0], arg);
 }
+static inline double simcall_io_wait__get__timeout(smx_simcall_t simcall)
+{
+  return simgrid::simix::unmarshal<double>(simcall->args_[1]);
+}
+static inline double simcall_io_wait__getraw__timeout(smx_simcall_t simcall)
+{
+  return simgrid::simix::unmarshal_raw<double>(simcall->args_[1]);
+}
+static inline void simcall_io_wait__set__timeout(smx_simcall_t simcall, double arg)
+{
+  simgrid::simix::marshal<double>(simcall->args_[1], arg);
+}
 static inline sg_size_t simcall_io_wait__get__result(smx_simcall_t simcall)
 {
   return simgrid::simix::unmarshal<sg_size_t>(simcall->result_);
@@ -1029,5 +1041,5 @@ XBT_PRIVATE void simcall_HANDLER_cond_wait(smx_simcall_t simcall, smx_cond_t con
 XBT_PRIVATE void simcall_HANDLER_cond_wait_timeout(smx_simcall_t simcall, smx_cond_t cond, smx_mutex_t mutex, double timeout);
 XBT_PRIVATE void simcall_HANDLER_sem_acquire(smx_simcall_t simcall, smx_sem_t sem);
 XBT_PRIVATE void simcall_HANDLER_sem_acquire_timeout(smx_simcall_t simcall, smx_sem_t sem, double timeout);
-XBT_PRIVATE void simcall_HANDLER_io_wait(smx_simcall_t simcall, simgrid::kernel::activity::IoImpl* io);
+XBT_PRIVATE void simcall_HANDLER_io_wait(smx_simcall_t simcall, simgrid::kernel::activity::IoImpl* io, double timeout);
 XBT_PRIVATE int simcall_HANDLER_mc_random(smx_simcall_t simcall, int min, int max);
index 4462018..356a7b7 100644 (file)
@@ -165,11 +165,11 @@ inline static int simcall_BODY_sem_acquire_timeout(smx_sem_t sem, double timeout
   return simcall<int, smx_sem_t, double>(SIMCALL_SEM_ACQUIRE_TIMEOUT, sem, timeout);
 }
 
-inline static sg_size_t simcall_BODY_io_wait(simgrid::kernel::activity::IoImpl* io)
+inline static sg_size_t simcall_BODY_io_wait(simgrid::kernel::activity::IoImpl* io, double timeout)
 {
   if (0) /* Go to that function to follow the code flow through the simcall barrier */
-    simcall_HANDLER_io_wait(&SIMIX_process_self()->simcall, io);
-  return simcall<sg_size_t, simgrid::kernel::activity::IoImpl*>(SIMCALL_IO_WAIT, io);
+    simcall_HANDLER_io_wait(&SIMIX_process_self()->simcall, io, timeout);
+  return simcall<sg_size_t, simgrid::kernel::activity::IoImpl*, double>(SIMCALL_IO_WAIT, io, timeout);
 }
 
 inline static int simcall_BODY_mc_random(int min, int max)
index df1deab..d6d25a0 100644 (file)
@@ -138,7 +138,7 @@ void simgrid::kernel::actor::ActorImpl::simcall_handle(int value) {
       break;
 
     case SIMCALL_IO_WAIT:
-      simcall_HANDLER_io_wait(&simcall, simgrid::simix::unmarshal<simgrid::kernel::activity::IoImpl*>(simcall.args_[0]));
+      simcall_HANDLER_io_wait(&simcall, simgrid::simix::unmarshal<simgrid::kernel::activity::IoImpl*>(simcall.args_[0]), simgrid::simix::unmarshal<double>(simcall.args_[1]));
       break;
 
     case SIMCALL_MC_RANDOM:
index bf2371e..f0ebde6 100644 (file)
@@ -58,7 +58,7 @@ int        cond_wait_timeout(smx_cond_t cond, smx_mutex_t mutex, double timeout)
 void      sem_acquire(smx_sem_t sem) [[block]];
 int       sem_acquire_timeout(smx_sem_t sem, double timeout) [[block]];
 
-sg_size_t io_wait(simgrid::kernel::activity::IoImpl* io) [[block]];
+sg_size_t io_wait(simgrid::kernel::activity::IoImpl* io, double timeout) [[block]];
 
 int        mc_random(int min, int max);