1 #include <simgrid/Exception.hpp>
2 #include <simgrid/plugins/task.hpp>
3 #include <simgrid/s4u/Comm.hpp>
4 #include <simgrid/s4u/Exec.hpp>
5 #include <simgrid/s4u/Io.hpp>
6 #include <simgrid/simix.hpp>
8 #include "src/simgrid/module.hpp"
10 SIMGRID_REGISTER_PLUGIN(task, "Battery management", nullptr)
11 /** @defgroup plugin_task plugin_task Plugin Task
15 This is the task plugin, enabling management of Tasks.
16 To activate this plugin, first call :cpp:func:`Task::init`.
18 Tasks are designed to represent dataflows, i.e, graphs of Tasks.
19 Tasks can only be instancied using either
20 :cpp:func:`simgrid::plugins::ExecTask::init` or :cpp:func:`simgrid::plugins::CommTask::init`
21 An ExecTask is an Execution Task. Its underlying Activity is an :ref:`Exec <API_s4u_Exec>`.
22 A CommTask is a Communication Task. Its underlying Activity is a :ref:`Comm <API_s4u_Comm>`.
26 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plugin");
28 namespace simgrid::plugins {
30 xbt::Extension<s4u::Activity, ExtendedAttributeActivity> ExtendedAttributeActivity::EXTENSION_ID;
32 xbt::signal<void(Task*)> Task::on_start;
33 xbt::signal<void(Task*)> Task::on_end;
35 Task::Task(const std::string& name) : name_(name) {}
38 * @brief Return True if the Task can start a new Activity.
39 * @note The Task is ready if not already doing something and there is at least one execution waiting in queue.
41 bool Task::ready_to_run() const
43 return not working_ && queued_execs_ > 0;
47 * @param source The sender.
48 * @brief Receive a token from another Task.
49 * @note Check upon reception if the Task has received a token from each of its predecessors,
50 * and in this case consumes those tokens and enqueue an execution.
52 void Task::receive(Task* source)
54 XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
55 auto source_count = predecessors_[source]++;
56 if (tokens_received_.size() <= queued_execs_ + source_count)
57 tokens_received_.push_back({});
58 tokens_received_[queued_execs_ + source_count][source] = source->token_;
59 bool enough_tokens = true;
60 for (auto const& [key, val] : predecessors_)
62 enough_tokens = false;
66 for (auto& [key, val] : predecessors_)
73 * @brief Task routine when finishing an execution.
74 * @note Set its working status as false.
75 * Add 1 to its count of finished executions.
76 * Call the on_this_end func.
77 * Fire on_end callback.
78 * Send a token to each of its successors.
79 * Start a new execution if possible.
83 xbt_assert(s4u::Actor::is_maestro());
88 if (current_activity_)
89 previous_activity_ = std::move(current_activity_);
90 for (auto const& t : successors_)
96 /** @ingroup plugin_task
97 * @brief Init the Task plugin.
98 * @note Add a completion callback to all Activities to call Task::complete().
102 static bool inited = false;
107 ExtendedAttributeActivity::EXTENSION_ID = simgrid::s4u::Activity::extension_create<ExtendedAttributeActivity>();
108 simgrid::s4u::Exec::on_completion_cb(
109 [](simgrid::s4u::Exec const& exec) { exec.extension<ExtendedAttributeActivity>()->task_->complete(); });
110 simgrid::s4u::Comm::on_completion_cb(
111 [](simgrid::s4u::Comm const& comm) { comm.extension<ExtendedAttributeActivity>()->task_->complete(); });
112 simgrid::s4u::Io::on_completion_cb(
113 [](simgrid::s4u::Io const& io) { io.extension<ExtendedAttributeActivity>()->task_->complete(); });
116 /** @ingroup plugin_task
117 * @param n The number of executions to enqueue.
118 * @brief Enqueue executions.
119 * @note Immediatly starts an execution if possible.
121 void Task::enqueue_execs(int n)
123 simgrid::kernel::actor::simcall_answered([this, n] {
130 /** @ingroup plugin_task
131 * @param amount The amount to set.
132 * @brief Set the amout of work to do.
133 * @note Amount in flop for ExecTask and in bytes for CommTask.
135 void Task::set_amount(double amount)
137 simgrid::kernel::actor::simcall_answered([this, amount] { amount_ = amount; });
140 /** @ingroup plugin_task
141 * @param token The token to set.
142 * @brief Set the token to send to successors.
143 * @note The token is passed to each successor after the task end, i.e., after the on_end callback.
145 void Task::set_token(std::shared_ptr<void> token)
147 simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; });
150 /** @ingroup plugin_task
151 * @return Map of tokens received for the next execution.
152 * @note If there is no queued execution for this task the map might not exist or be partially empty.
154 std::map<TaskPtr, std::shared_ptr<void>> Task::get_next_execution_tokens() const
156 return tokens_received_.front();
159 /** @ingroup plugin_task
160 * @param successor The Task to add.
161 * @brief Add a successor to this Task.
162 * @note It also adds this as a predecessor of successor.
164 void Task::add_successor(TaskPtr successor)
166 simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] {
167 successors_.insert(successor_p);
168 successor_p->predecessors_.try_emplace(this, 0);
172 /** @ingroup plugin_task
173 * @param successor The Task to remove.
174 * @brief Remove a successor from this Task.
175 * @note It also remove this from the predecessors of successor.
177 void Task::remove_successor(TaskPtr successor)
179 simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] {
180 successor_p->predecessors_.erase(this);
181 successors_.erase(successor_p);
185 void Task::remove_all_successors()
187 simgrid::kernel::actor::simcall_answered([this] {
188 while (not successors_.empty()) {
189 auto* successor = *(successors_.begin());
190 successor->predecessors_.erase(this);
191 successors_.erase(successor);
196 /** @ingroup plugin_task
197 * @param func The function to set.
198 * @brief Set a function to be called before each execution.
199 * @note The function is called before the underlying Activity starts.
201 void Task::on_this_start_cb(const std::function<void(Task*)>& func)
203 simgrid::kernel::actor::simcall_answered([this, &func] { on_this_start_.connect(func); });
206 /** @ingroup plugin_task
207 * @param func The function to set.
208 * @brief Set a function to be called after each execution.
209 * @note The function is called after the underlying Activity ends, but before sending tokens to successors.
211 void Task::on_this_end_cb(const std::function<void(Task*)>& func)
213 simgrid::kernel::actor::simcall_answered([this, &func] { on_this_end_.connect(func); });
216 /** @ingroup plugin_task
217 * @brief Return the number of completed executions.
219 int Task::get_count() const
225 * @brief Default constructor.
227 ExecTask::ExecTask(const std::string& name) : Task(name) {}
229 /** @ingroup plugin_task
230 * @brief Smart Constructor.
232 ExecTaskPtr ExecTask::init(const std::string& name)
234 return ExecTaskPtr(new ExecTask(name));
237 /** @ingroup plugin_task
238 * @brief Smart Constructor.
240 ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* host)
242 return init(name)->set_flops(flops)->set_host(host);
246 * @brief Do one execution of the Task.
247 * @note Call the on_this_start() func. Set its working status as true.
248 * Init and start the underlying Activity.
250 void ExecTask::fire()
252 on_this_start_(this);
253 Task::on_start(this);
255 queued_execs_ = std::max(queued_execs_ - 1, 0);
256 if (tokens_received_.size() > 0)
257 tokens_received_.pop_front();
258 s4u::ExecPtr exec = s4u::Exec::init();
259 exec->set_name(name_);
260 exec->set_flops_amount(amount_);
261 exec->set_host(host_);
263 exec->extension_set(new ExtendedAttributeActivity());
264 exec->extension<ExtendedAttributeActivity>()->task_ = this;
265 current_activity_ = exec;
268 /** @ingroup plugin_task
269 * @param host The host to set.
270 * @brief Set a new host.
272 ExecTaskPtr ExecTask::set_host(s4u::Host* host)
274 kernel::actor::simcall_answered([this, host] { host_ = host; });
278 /** @ingroup plugin_task
279 * @param flops The amount of flops to set.
281 ExecTaskPtr ExecTask::set_flops(double flops)
283 kernel::actor::simcall_answered([this, flops] { amount_ = flops; });
288 * @brief Default constructor.
290 CommTask::CommTask(const std::string& name) : Task(name) {}
292 /** @ingroup plugin_task
293 * @brief Smart constructor.
295 CommTaskPtr CommTask::init(const std::string& name)
297 return CommTaskPtr(new CommTask(name));
300 /** @ingroup plugin_task
301 * @brief Smart constructor.
303 CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* source, s4u::Host* destination)
305 return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination);
309 * @brief Do one execution of the Task.
310 * @note Call the on_this_start() func. Set its working status as true.
311 * Init and start the underlying Activity.
313 void CommTask::fire()
315 on_this_start_(this);
316 Task::on_start(this);
318 queued_execs_ = std::max(queued_execs_ - 1, 0);
319 if (tokens_received_.size() > 0)
320 tokens_received_.pop_front();
321 s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_);
322 comm->set_name(name_);
323 comm->set_payload_size(amount_);
325 comm->extension_set(new ExtendedAttributeActivity());
326 comm->extension<ExtendedAttributeActivity>()->task_ = this;
327 current_activity_ = comm;
330 /** @ingroup plugin_task
331 * @param source The host to set.
332 * @brief Set a new source host.
334 CommTaskPtr CommTask::set_source(s4u::Host* source)
336 kernel::actor::simcall_answered([this, source] { source_ = source; });
340 /** @ingroup plugin_task
341 * @param destination The host to set.
342 * @brief Set a new destination host.
344 CommTaskPtr CommTask::set_destination(s4u::Host* destination)
346 kernel::actor::simcall_answered([this, destination] { destination_ = destination; });
350 /** @ingroup plugin_task
351 * @param bytes The amount of bytes to set.
353 CommTaskPtr CommTask::set_bytes(double bytes)
355 kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; });
360 * @brief Default constructor.
362 IoTask::IoTask(const std::string& name) : Task(name) {}
364 /** @ingroup plugin_task
365 * @brief Smart Constructor.
367 IoTaskPtr IoTask::init(const std::string& name)
369 return IoTaskPtr(new IoTask(name));
372 /** @ingroup plugin_task
373 * @brief Smart Constructor.
375 IoTaskPtr IoTask::init(const std::string& name, double bytes, s4u::Disk* disk, s4u::Io::OpType type)
377 return init(name)->set_bytes(bytes)->set_disk(disk)->set_op_type(type);
380 /** @ingroup plugin_task
381 * @param disk The disk to set.
382 * @brief Set a new disk.
384 IoTaskPtr IoTask::set_disk(s4u::Disk* disk)
386 kernel::actor::simcall_answered([this, disk] { disk_ = disk; });
390 /** @ingroup plugin_task
391 * @param bytes The amount of bytes to set.
393 IoTaskPtr IoTask::set_bytes(double bytes)
395 kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; });
399 /** @ingroup plugin_task */
400 IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type)
402 kernel::actor::simcall_answered([this, type] { type_ = type; });
408 on_this_start_(this);
409 Task::on_start(this);
411 queued_execs_ = std::max(queued_execs_ - 1, 0);
412 if (tokens_received_.size() > 0)
413 tokens_received_.pop_front();
414 s4u::IoPtr io = s4u::Io::init();
416 io->set_size(amount_);
418 io->set_op_type(type_);
420 io->extension_set(new ExtendedAttributeActivity());
421 io->extension<ExtendedAttributeActivity>()->task_ = this;
422 current_activity_ = io;
425 } // namespace simgrid::plugins