From d08986414f2aaf9c858b0f63668c6e36ee4cb61f Mon Sep 17 00:00:00 2001 From: Martin Quinson Date: Wed, 8 Jul 2020 16:48:57 +0200 Subject: [PATCH] Tentative implementation of Activity::suspend() and resume() I decided not to change the State when the activity is suspended, because that State merely denotes whether pimpl_ was created already or not. This commit also adds a is_suspended() method, as well as an example. --- docs/source/app_s4u.rst | 11 +++ examples/README.rst | 15 ++++ examples/s4u/CMakeLists.txt | 2 +- .../s4u/comm-suspend/s4u-comm-suspend.cpp | 71 +++++++++++++++++++ .../s4u/comm-suspend/s4u-comm-suspend.tesh | 11 +++ .../s4u/comm-suspend/s4u-comm-suspend_d.xml | 6 ++ include/simgrid/s4u/Activity.hpp | 14 +++- src/s4u/s4u_Activity.cpp | 28 +++++++- src/s4u/s4u_Comm.cpp | 4 ++ src/s4u/s4u_Exec.cpp | 4 ++ src/s4u/s4u_Io.cpp | 4 ++ 11 files changed, 165 insertions(+), 5 deletions(-) create mode 100644 examples/s4u/comm-suspend/s4u-comm-suspend.cpp create mode 100644 examples/s4u/comm-suspend/s4u-comm-suspend.tesh create mode 100644 examples/s4u/comm-suspend/s4u-comm-suspend_d.xml diff --git a/docs/source/app_s4u.rst b/docs/source/app_s4u.rst index fdb8c905ac..13425270b3 100644 --- a/docs/source/app_s4u.rst +++ b/docs/source/app_s4u.rst @@ -1708,6 +1708,17 @@ Activities life cycle .. autodoxymethod:: simgrid::s4u::Activity::wait_until(double time_limit) .. autodoxymethod:: simgrid::s4u::Activity::vetoable_start() +Suspending and resuming an activity +----------------------------------- + +.. tabs:: + + .. group-tab:: C++ + + .. autodoxymethod:: simgrid::s4u::Activity::suspend + .. autodoxymethod:: simgrid::s4u::Activity::resume + .. autodoxymethod:: simgrid::s4u::Activity::is_suspended + .. _API_s4u_Comm: ============= diff --git a/examples/README.rst b/examples/README.rst index 3ac16487c6..f334c3bd68 100644 --- a/examples/README.rst +++ b/examples/README.rst @@ -301,6 +301,21 @@ Communications on the Network See also :cpp:func:`sg_mailbox_put_async()` and :cpp:func:`sg_comm__wait()`. + - **Suspending communications:** + The ``suspend()`` and ``resume()`` functions allow to block the + progression of a given communication for a while and then unblock it. + ``is_suspended()`` can be used to retrieve whether the activity is + currently blocked or not. + + .. tabs:: + + .. example-tab:: examples/s4u/comm-suspend/s4u-comm-suspend.cpp + + See also :cpp:func:`simgrid::s4u::Activity::suspend()` + :cpp:func:`simgrid::s4u::Activity::resume()` and + :cpp:func:`simgrid::s4u::Activity::is_suspended()`. + + - **Waiting for all communications in a set:** The ``wait_all()`` function is useful when you want to block until all activities in a given set have completed. diff --git a/examples/s4u/CMakeLists.txt b/examples/s4u/CMakeLists.txt index 5961f089c3..4f70f84473 100644 --- a/examples/s4u/CMakeLists.txt +++ b/examples/s4u/CMakeLists.txt @@ -62,7 +62,7 @@ endif() foreach (example actor-create actor-daemon actor-exiting actor-join actor-kill actor-lifetime actor-migrate actor-suspend actor-yield actor-stacksize app-bittorrent app-chainsend app-pingpong app-token-ring - comm-ready comm-wait comm-waitany comm-waitall comm-waituntil + comm-ready comm-suspend comm-wait comm-waitany comm-waitall comm-waituntil comm-dependent cloud-capping cloud-migration cloud-simple dht-chord dht-kademlia diff --git a/examples/s4u/comm-suspend/s4u-comm-suspend.cpp b/examples/s4u/comm-suspend/s4u-comm-suspend.cpp new file mode 100644 index 0000000000..144a0293aa --- /dev/null +++ b/examples/s4u/comm-suspend/s4u-comm-suspend.cpp @@ -0,0 +1,71 @@ +/* Copyright (c) 2010-2020. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +/* This example shows how to suspend and resume an asynchronous communication. */ + +#include "simgrid/s4u.hpp" +#include +#include +#include + +XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_comm_wait, "Messages specific for this s4u example"); + +static void sender(int argc, char**) +{ + xbt_assert(argc == 1, "Expecting no parameter from the XML deployment file but got %d", argc - 1); + + simgrid::s4u::Mailbox* mbox = simgrid::s4u::Mailbox::by_name("receiver"); + + // Copy the data we send: the 'msg_content' variable is not a stable storage location. + // It will be destroyed when this actor leaves the loop, ie before the receiver gets the data + std::string* payload = new std::string("Sent message"); + + /* Create a communication representing the ongoing communication and then */ + simgrid::s4u::CommPtr comm = mbox->put_init(payload, 13194230); + XBT_INFO("Suspend the communication before it starts (remaining: %.0f bytes) and wait a second.", + comm->get_remaining()); + simgrid::s4u::this_actor::sleep_for(1); + XBT_INFO("Now, start the communication (remaining: %.0f bytes) and wait another second.", comm->get_remaining()); + comm->start(); + simgrid::s4u::this_actor::sleep_for(1); + + XBT_INFO("There is still %.0f bytes to transfer in this communication. Suspend it for one second.", + comm->get_remaining()); + comm->suspend(); + XBT_INFO("Now there is %.0f bytes to transfer. Resume it and wait for its completion.", comm->get_remaining()); + comm->resume(); + comm->wait(); + XBT_INFO("There is %f bytes to transfer after the communication completion.", comm->get_remaining()); + XBT_INFO("Suspending a completed activity is a no-op."); + comm->suspend(); +} + +static void receiver(int, char**) +{ + simgrid::s4u::Mailbox* mbox = simgrid::s4u::Mailbox::by_name("receiver"); + + XBT_INFO("Wait for the message."); + void* payload = mbox->get(); + + const std::string* received = static_cast(payload); + XBT_INFO("I got '%s'.", received->c_str()); + + delete received; +} + +int main(int argc, char* argv[]) +{ + xbt_assert(argc > 2, "Usage: %s platform_file deployment_file\n", argv[0]); + + simgrid::s4u::Engine e(&argc, argv); + e.register_function("sender", &sender); + e.register_function("receiver", &receiver); + + e.load_platform(argv[1]); + e.load_deployment(argv[2]); + e.run(); + + return 0; +} diff --git a/examples/s4u/comm-suspend/s4u-comm-suspend.tesh b/examples/s4u/comm-suspend/s4u-comm-suspend.tesh new file mode 100644 index 0000000000..8d05012e06 --- /dev/null +++ b/examples/s4u/comm-suspend/s4u-comm-suspend.tesh @@ -0,0 +1,11 @@ +#!/usr/bin/env tesh + +$ ${bindir:=.}/s4u-comm-suspend ${platfdir}/small_platform.xml s4u-comm-suspend_d.xml "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n" +> [ 0.000000] (1:sender@Tremblay) Suspend the communication before it starts (remaining: 13194230 bytes) and wait a second. +> [ 0.000000] (2:receiver@Jupiter) Wait for the message. +> [ 1.000000] (1:sender@Tremblay) Now, start the communication (remaining: 13194230 bytes) and wait another second. +> [ 2.000000] (1:sender@Tremblay) There is still 6660438 bytes to transfer in this communication. Suspend it for one second. +> [ 2.000000] (1:sender@Tremblay) Now there is 6660438 bytes to transfer. Resume it and wait for its completion. +> [ 3.000000] (2:receiver@Jupiter) I got 'Sent message'. +> [ 3.000000] (1:sender@Tremblay) There is 0.000000 bytes to transfer after the communication completion. +> [ 3.000000] (1:sender@Tremblay) Suspending a completed activity is a no-op. diff --git a/examples/s4u/comm-suspend/s4u-comm-suspend_d.xml b/examples/s4u/comm-suspend/s4u-comm-suspend_d.xml new file mode 100644 index 0000000000..3c2490089a --- /dev/null +++ b/examples/s4u/comm-suspend/s4u-comm-suspend_d.xml @@ -0,0 +1,6 @@ + + + + + + diff --git a/include/simgrid/s4u/Activity.hpp b/include/simgrid/s4u/Activity.hpp index 2572d61eee..4a8789a6d0 100644 --- a/include/simgrid/s4u/Activity.hpp +++ b/include/simgrid/s4u/Activity.hpp @@ -76,12 +76,12 @@ public: * This function is optional: you can call wait() even if you didn't call start() */ virtual Activity* start() = 0; - /** Blocks until the activity is terminated */ + /** Blocks the current actor until the activity is terminated */ virtual Activity* wait() = 0; - /** Blocks until the activity is terminated, or until the timeout is elapsed + /** Blocks the current actor until the activity is terminated, or until the timeout is elapsed\n * Raises: timeout exception.*/ virtual Activity* wait_for(double timeout) = 0; - /** Blocks until the activity is terminated, or until the time limit is reached + /** Blocks the current actor until the activity is terminated, or until the time limit is reached\n * Raises: timeout exception. */ void wait_until(double time_limit); @@ -93,6 +93,13 @@ public: /** Tests whether the given activity is terminated yet. */ virtual bool test(); + /** Blocks the progression of this activity until it gets resumed */ + virtual Activity* suspend(); + /** Unblock the progression of this activity if it was suspended previously */ + virtual Activity* resume(); + /** Whether or not the progression of this activity is blocked */ + bool is_suspended() { return suspended_; } + virtual const char* get_cname() const = 0; virtual const std::string& get_name() const = 0; @@ -127,6 +134,7 @@ private: kernel::activity::ActivityImplPtr pimpl_ = nullptr; Activity::State state_ = Activity::State::INITED; double remains_ = 0; + bool suspended_ = false; std::vector successors_; std::set dependencies_; std::atomic_int_fast32_t refcount_{0}; diff --git a/src/s4u/s4u_Activity.cpp b/src/s4u/s4u_Activity.cpp index f1a24ae50d..18d27cf4c4 100644 --- a/src/s4u/s4u_Activity.cpp +++ b/src/s4u/s4u_Activity.cpp @@ -42,9 +42,35 @@ bool Activity::test() return false; } +Activity* Activity::suspend() +{ + if (suspended_) + return this; // Already suspended + suspended_ = true; + + if (state_ == State::STARTED) + pimpl_->suspend(); + + return this; +} + +Activity* Activity::resume() +{ + if (not suspended_) + return this; // nothing to restore when it's not suspended + + if (state_ == State::STARTED) + pimpl_->resume(); + + return this; +} + double Activity::get_remaining() const { - return remains_; + if (state_ == State::INITED || state_ == State::STARTING) + return remains_; + else + return pimpl_->get_remaining(); } Activity* Activity::set_remaining(double remains) diff --git a/src/s4u/s4u_Comm.cpp b/src/s4u/s4u_Comm.cpp index 4d5bb786b4..914138569f 100644 --- a/src/s4u/s4u_Comm.cpp +++ b/src/s4u/s4u_Comm.cpp @@ -138,6 +138,10 @@ Comm* Comm::start() } else { xbt_die("Cannot start a communication before specifying whether we are the sender or the receiver"); } + + if (suspended_) + pimpl_->suspend(); + state_ = State::STARTED; return this; } diff --git a/src/s4u/s4u_Exec.cpp b/src/s4u/s4u_Exec.cpp index b0c71750ba..7908468702 100644 --- a/src/s4u/s4u_Exec.cpp +++ b/src/s4u/s4u_Exec.cpp @@ -180,6 +180,10 @@ Exec* Exec::start() .set_flops_amount(flops_amounts_.front()) .start(); }); + + if (suspended_) + pimpl_->suspend(); + state_ = State::STARTED; on_start(*Actor::self(), *this); return this; diff --git a/src/s4u/s4u_Io.cpp b/src/s4u/s4u_Io.cpp index cc9794aff4..4831daf91f 100644 --- a/src/s4u/s4u_Io.cpp +++ b/src/s4u/s4u_Io.cpp @@ -44,6 +44,10 @@ Io* Io::start() .start(); } }); + + if (suspended_) + pimpl_->suspend(); + state_ = State::STARTED; return this; } -- 2.20.1