+#include <memory>
#include <simgrid/Exception.hpp>
#include <simgrid/plugins/task.hpp>
#include <simgrid/s4u/Comm.hpp>
namespace simgrid::plugins {
// Extension slot used to attach an ExtendedAttributeActivity to an s4u::Activity.
// Its value is assigned in Task::init() through Activity::extension_create().
+xbt::Extension<s4u::Activity, ExtendedAttributeActivity> ExtendedAttributeActivity::EXTENSION_ID;
+
// Class-wide signals: on_start is raised by the fire() implementations, on_end by Task::complete().
xbt::signal<void(Task*)> Task::on_start;
xbt::signal<void(Task*)> Task::on_end;
// Build a Task, storing the name it was given.
Task::Task(const std::string& name) : name_(name) {}
-/**
- * @param predecessor The Task to add.
- * @brief Add a predecessor to this Task.
- */
-void Task::add_predecessor(Task* predecessor)
-{
- if (predecessors_.find(predecessor) == predecessors_.end())
- simgrid::kernel::actor::simcall_answered([this, predecessor] { predecessors_[predecessor] = 0; });
-}
-
-/**
- * @param predecessor The Task to remove.
- * @brief Remove a predecessor from this Task.
- */
-void Task::remove_predecessor(Task* predecessor)
-{
- simgrid::kernel::actor::simcall_answered([this, predecessor] { predecessors_.erase(predecessor); });
-}
-
/**
* @brief Return True if the Task can start a new Activity.
* @note The Task is ready if not already doing something and there is at least one execution waiting in queue.
void Task::receive(Task* source)
{
  XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
-  auto it = predecessors_.find(source);
-  simgrid::kernel::actor::simcall_answered([this, it] {
-    it->second++;
-    bool enough_tokens = true;
-    for (auto const& [key, val] : predecessors_)
-      if (val < 1) {
-        enough_tokens = false;
-        break;
-      }
-    if (enough_tokens) {
-      for (auto& [key, val] : predecessors_)
-        val--;
-      enqueue_execs(1);
+  // Count the new token, remembering how many tokens from this source were already pending.
+  auto source_count = predecessors_[source]++;
+  // Index, in the queue of per-execution token sets, of the execution this token is meant for.
+  const auto slot = queued_execs_ + source_count;
+  // Grow the queue up to that index. A single push_back() is not enough when the queue is more than
+  // one element short (e.g. executions that were queued without any token set), and indexing past
+  // the end of the container is undefined behavior.
+  if (tokens_received_.size() <= slot)
+    tokens_received_.resize(slot + 1);
+  tokens_received_[slot][source] = source->token_;
+  // An execution can be queued as soon as every predecessor has at least one pending token.
+  bool enough_tokens = true;
+  for (auto const& [key, val] : predecessors_)
+    if (val < 1) {
+      enough_tokens = false;
+      break;
    }
-  });
+  if (enough_tokens) {
+    // Consume one pending token per predecessor for the execution being queued.
+    for (auto& [key, val] : predecessors_)
+      val--;
+    enqueue_execs(1);
+  }
}
/**
*/
void Task::complete()
{
- simgrid::kernel::actor::simcall_answered([this] {
- working_ = false;
- count_++;
- });
- for (auto const& end_func : end_func_handlers_)
- end_func(this);
+ xbt_assert(s4u::Actor::is_maestro());
+ working_ = false;
+ count_++;
+ on_this_end_(this);
Task::on_end(this);
+ if (current_activity_)
+ previous_activity_ = std::move(current_activity_);
for (auto const& t : successors_)
t->receive(this);
if (ready_to_run())
*/
void Task::init()
{
- if (Task::inited_)
+ static bool inited = false;
+ if (inited)
return;
- Task::inited_ = true;
+
+ inited = true;
ExtendedAttributeActivity::EXTENSION_ID = simgrid::s4u::Activity::extension_create<ExtendedAttributeActivity>();
simgrid::s4u::Exec::on_completion_cb(
[](simgrid::s4u::Exec const& exec) { exec.extension<ExtendedAttributeActivity>()->task_->complete(); });
simgrid::kernel::actor::simcall_answered([this, amount] { amount_ = amount; });
}
+/** @ingroup plugin_task
+ * @brief Define the token that this Task hands over to its successors.
+ * @param token The token to store.
+ * @note Successors receive the token once the task has ended, i.e., after the on_end callbacks have run.
+ */
+void Task::set_token(std::shared_ptr<Token> token)
+{
+  // The member is updated from within a simcall.
+  simgrid::kernel::actor::simcall_answered([this, token] {
+    token_ = token;
+  });
+}
+
+/** @ingroup plugin_task
+ * @param t The Task whose token is requested (tokens are recorded per sending Task).
+ * @return The token received from t for the next execution, or nullptr if there is none.
+ * @note nullptr is returned both when no token set is pending for this task and when t did not
+ *       contribute a token to the next execution. The lookup never modifies the stored tokens.
+ */
+std::shared_ptr<Token> Task::get_next_token_from(TaskPtr t)
+{
+  // front() on an empty container is undefined behavior: report "no token" instead.
+  if (tokens_received_.empty())
+    return nullptr;
+  // Use find() rather than operator[] so that a miss does not insert an empty entry.
+  const auto& next_tokens = tokens_received_.front();
+  const auto entry        = next_tokens.find(t);
+  if (entry == next_tokens.end())
+    return nullptr;
+  return entry->second;
+}
+
/** @ingroup plugin_task
 * @brief Register a Task as a successor of this one.
 * @param successor The Task to add.
 * @note This Task is recorded among the predecessors of the successor, with a token count of 0
 *       unless it was already registered there.
 */
void Task::add_successor(TaskPtr successor)
{
-  simgrid::kernel::actor::simcall_answered([this, successor] { successors_.insert(successor.get()); });
-  successor->add_predecessor(this);
+  // Both sides of the link are updated within the same simcall, through the raw pointer.
+  auto* const next_task = successor.get();
+  simgrid::kernel::actor::simcall_answered([this, next_task] {
+    successors_.insert(next_task);
+    next_task->predecessors_.try_emplace(this, 0);
+  });
}
/** @ingroup plugin_task
*/
void Task::remove_successor(TaskPtr successor)
{
-  simgrid::kernel::actor::simcall_answered([this, successor] { successors_.erase(successor.get()); });
-  successor->remove_predecessor(this);
+  // Undo both sides of the link within the same simcall: first the predecessor entry kept by the
+  // successor, then the successor entry kept by this Task.
+  auto* const former_task = successor.get();
+  simgrid::kernel::actor::simcall_answered([this, former_task] {
+    former_task->predecessors_.erase(this);
+    successors_.erase(former_task);
+  });
}
void Task::remove_all_successors()
* @brief Set a function to be called before each execution.
* @note The function is called before the underlying Activity starts.
*/
-void Task::on_this_start(const std::function<void(Task*)>& func)
+void Task::on_this_start_cb(const std::function<void(Task*)>& func)
{
-  simgrid::kernel::actor::simcall_answered([this, &func] { start_func_handlers_.push_back(func); });
+  // Connect the callback to the start signal of this Task, from within a simcall.
+  simgrid::kernel::actor::simcall_answered([this, &func] {
+    on_this_start_.connect(func);
+  });
}
/** @ingroup plugin_task
 * @brief Register a function to be called after each execution of this Task.
 * @param func The callback; it receives the Task as argument.
 * @note The callback runs once the underlying Activity has ended, before tokens are sent to successors.
 */
-void Task::on_this_end(const std::function<void(Task*)>& func)
+void Task::on_this_end_cb(const std::function<void(Task*)>& func)
{
-  simgrid::kernel::actor::simcall_answered([this, &func] { end_func_handlers_.push_back(func); });
+  // Connect the callback to the end signal of this Task, from within a simcall.
+  simgrid::kernel::actor::simcall_answered([this, &func] {
+    on_this_end_.connect(func);
+  });
}
/** @ingroup plugin_task
*/
void ExecTask::fire()
{
- for (auto const& start_func : start_func_handlers_)
- start_func(this);
+ on_this_start_(this);
Task::on_start(this);
- kernel::actor::simcall_answered([this] {
- working_ = true;
- queued_execs_ = std::max(queued_execs_ - 1, 0);
- });
+ working_ = true;
+ queued_execs_ = std::max(queued_execs_ - 1, 0);
+ if (tokens_received_.size() > 0)
+ tokens_received_.pop_front();
s4u::ExecPtr exec = s4u::Exec::init();
exec->set_name(name_);
exec->set_flops_amount(amount_);
exec->start();
exec->extension_set(new ExtendedAttributeActivity());
exec->extension<ExtendedAttributeActivity>()->task_ = this;
- kernel::actor::simcall_answered([this, exec] { current_activity_ = exec; });
+ current_activity_ = exec;
}
/** @ingroup plugin_task
*/
void CommTask::fire()
{
- for (auto const& start_func : start_func_handlers_)
- start_func(this);
+ on_this_start_(this);
Task::on_start(this);
- kernel::actor::simcall_answered([this] {
- working_ = true;
- queued_execs_ = std::max(queued_execs_ - 1, 0);
- });
+ working_ = true;
+ queued_execs_ = std::max(queued_execs_ - 1, 0);
+ if (tokens_received_.size() > 0)
+ tokens_received_.pop_front();
s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_);
comm->set_name(name_);
comm->set_payload_size(amount_);
comm->start();
comm->extension_set(new ExtendedAttributeActivity());
comm->extension<ExtendedAttributeActivity>()->task_ = this;
- kernel::actor::simcall_answered([this, comm] { current_activity_ = comm; });
+ current_activity_ = comm;
}
/** @ingroup plugin_task
void IoTask::fire()
{
- for (auto const& start_func : start_func_handlers_)
- start_func(this);
+ on_this_start_(this);
Task::on_start(this);
- kernel::actor::simcall_answered([this] {
- working_ = true;
- queued_execs_ = std::max(queued_execs_ - 1, 0);
- });
+ working_ = true;
+ queued_execs_ = std::max(queued_execs_ - 1, 0);
+ if (tokens_received_.size() > 0)
+ tokens_received_.pop_front();
s4u::IoPtr io = s4u::Io::init();
io->set_name(name_);
io->set_size(amount_);
io->start();
io->extension_set(new ExtendedAttributeActivity());
io->extension<ExtendedAttributeActivity>()->task_ = this;
- kernel::actor::simcall_answered([this, io] { current_activity_ = io; });
+ current_activity_ = io;
}
} // namespace simgrid::plugins
-
-simgrid::xbt::Extension<simgrid::s4u::Activity, simgrid::plugins::ExtendedAttributeActivity>
- simgrid::plugins::ExtendedAttributeActivity::EXTENSION_ID;
-bool simgrid::plugins::Task::inited_ = false;