Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
a2337dc343d47111b8cc31429501c36db7047af3
[simgrid.git] / src / plugins / task.cpp
1 #include <simgrid/Exception.hpp>
2 #include <simgrid/plugins/task.hpp>
3 #include <simgrid/s4u/Comm.hpp>
4 #include <simgrid/s4u/Exec.hpp>
5 #include <simgrid/s4u/Io.hpp>
6 #include <simgrid/simix.hpp>
7
8 #include "src/simgrid/module.hpp"
9
10 SIMGRID_REGISTER_PLUGIN(task, "Battery management", nullptr)
11 /** @defgroup plugin_task plugin_task Plugin Task
12
13   @beginrst
14
15 This is the task plugin, enabling management of Tasks.
16 To activate this plugin, first call :cpp:func:`Task::init`.
17
18 Tasks are designed to represent dataflows, i.e, graphs of Tasks.
19 Tasks can only be instancied using either
20 :cpp:func:`simgrid::plugins::ExecTask::init` or :cpp:func:`simgrid::plugins::CommTask::init`
21 An ExecTask is an Execution Task. Its underlying Activity is an :ref:`Exec <API_s4u_Exec>`.
22 A CommTask is a Communication Task. Its underlying Activity is a :ref:`Comm <API_s4u_Comm>`.
23
24   @endrst
25  */
26 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plugin");
27
28 namespace simgrid::plugins {
29
30 xbt::Extension<s4u::Activity, ExtendedAttributeActivity> ExtendedAttributeActivity::EXTENSION_ID;
31
32 xbt::signal<void(Task*)> Task::on_start;
33 xbt::signal<void(Task*)> Task::on_end;
34
35 Task::Task(const std::string& name) : name_(name) {}
36
37 /**
38  *  @brief Return True if the Task can start a new Activity.
39  *  @note The Task is ready if not already doing something and there is at least one execution waiting in queue.
40  */
41 bool Task::ready_to_run() const
42 {
43   return not working_ && queued_execs_ > 0;
44 }
45
46 /**
47  *  @param source The sender.
48  *  @brief Receive a token from another Task.
49  *  @note Check upon reception if the Task has received a token from each of its predecessors,
50  * and in this case consumes those tokens and enqueue an execution.
51  */
52 void Task::receive(Task* source)
53 {
54   XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
55   auto source_count = predecessors_[source]++;
56   if (tokens_received_.size() <= queued_execs_ + source_count)
57     tokens_received_.push_back({});
58   tokens_received_[queued_execs_ + source_count][source] = source->token_;
59   bool enough_tokens = true;
60   for (auto const& [key, val] : predecessors_)
61     if (val < 1) {
62       enough_tokens = false;
63       break;
64     }
65   if (enough_tokens) {
66     for (auto& [key, val] : predecessors_)
67       val--;
68     enqueue_execs(1);
69   }
70 }
71
72 /**
73  *  @brief Task routine when finishing an execution.
74  *  @note Set its working status as false.
75  * Add 1 to its count of finished executions.
76  * Call the on_this_end func.
77  * Fire on_end callback.
78  * Send a token to each of its successors.
79  * Start a new execution if possible.
80  */
81 void Task::complete()
82 {
83   xbt_assert(s4u::Actor::is_maestro());
84   working_ = false;
85   count_++;
86   on_this_end_(this);
87   Task::on_end(this);
88   if (current_activity_)
89     previous_activity_ = std::move(current_activity_);
90   for (auto const& t : successors_)
91     t->receive(this);
92   if (ready_to_run())
93     fire();
94 }
95
96 /** @ingroup plugin_task
97  *  @brief Init the Task plugin.
98  *  @note Add a completion callback to all Activities to call Task::complete().
99  */
100 void Task::init()
101 {
102   static bool inited = false;
103   if (inited)
104     return;
105
106   inited                                  = true;
107   ExtendedAttributeActivity::EXTENSION_ID = simgrid::s4u::Activity::extension_create<ExtendedAttributeActivity>();
108   simgrid::s4u::Exec::on_completion_cb(
109       [](simgrid::s4u::Exec const& exec) { exec.extension<ExtendedAttributeActivity>()->task_->complete(); });
110   simgrid::s4u::Comm::on_completion_cb(
111       [](simgrid::s4u::Comm const& comm) { comm.extension<ExtendedAttributeActivity>()->task_->complete(); });
112   simgrid::s4u::Io::on_completion_cb(
113       [](simgrid::s4u::Io const& io) { io.extension<ExtendedAttributeActivity>()->task_->complete(); });
114 }
115
116 /** @ingroup plugin_task
117  *  @param n The number of executions to enqueue.
118  *  @brief Enqueue executions.
119  *  @note Immediatly starts an execution if possible.
120  */
121 void Task::enqueue_execs(int n)
122 {
123   simgrid::kernel::actor::simcall_answered([this, n] {
124     queued_execs_ += n;
125     if (ready_to_run())
126       fire();
127   });
128 }
129
130 /** @ingroup plugin_task
131  *  @param amount The amount to set.
132  *  @brief Set the amout of work to do.
133  *  @note Amount in flop for ExecTask and in bytes for CommTask.
134  */
135 void Task::set_amount(double amount)
136 {
137   simgrid::kernel::actor::simcall_answered([this, amount] { amount_ = amount; });
138 }
139
140 /** @ingroup plugin_task
141  *  @param token The token to set.
142  *  @brief Set the token to send to successors.
143  *  @note The token is passed to each successor after the task end, i.e., after the on_end callback.
144  */
145 void Task::set_token(std::shared_ptr<void> token)
146 {
147   simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; });
148 }
149
150 /** @ingroup plugin_task
151  *  @return Map of tokens received for the next execution.
152  *  @note If there is no queued execution for this task the map might not exist or be partially empty.
153  */
154 std::map<TaskPtr, std::shared_ptr<void>> Task::get_next_execution_tokens() const
155 {
156   return tokens_received_.front();
157 }
158
159 /** @ingroup plugin_task
160  *  @param successor The Task to add.
161  *  @brief Add a successor to this Task.
162  *  @note It also adds this as a predecessor of successor.
163  */
164 void Task::add_successor(TaskPtr successor)
165 {
166   simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] {
167     successors_.insert(successor_p);
168     successor_p->predecessors_.try_emplace(this, 0);
169   });
170 }
171
172 /** @ingroup plugin_task
173  *  @param successor The Task to remove.
174  *  @brief Remove a successor from this Task.
175  *  @note It also remove this from the predecessors of successor.
176  */
177 void Task::remove_successor(TaskPtr successor)
178 {
179   simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] {
180     successor_p->predecessors_.erase(this);
181     successors_.erase(successor_p);
182   });
183 }
184
185 void Task::remove_all_successors()
186 {
187   simgrid::kernel::actor::simcall_answered([this] {
188     while (not successors_.empty()) {
189       auto* successor = *(successors_.begin());
190       successor->predecessors_.erase(this);
191       successors_.erase(successor);
192     }
193   });
194 }
195
196 /** @ingroup plugin_task
197  *  @param func The function to set.
198  *  @brief Set a function to be called before each execution.
199  *  @note The function is called before the underlying Activity starts.
200  */
201 void Task::on_this_start_cb(const std::function<void(Task*)>& func)
202 {
203   simgrid::kernel::actor::simcall_answered([this, &func] { on_this_start_.connect(func); });
204 }
205
206 /** @ingroup plugin_task
207  *  @param func The function to set.
208  *  @brief Set a function to be called after each execution.
209  *  @note The function is called after the underlying Activity ends, but before sending tokens to successors.
210  */
211 void Task::on_this_end_cb(const std::function<void(Task*)>& func)
212 {
213   simgrid::kernel::actor::simcall_answered([this, &func] { on_this_end_.connect(func); });
214 }
215
216 /** @ingroup plugin_task
217  *  @brief Return the number of completed executions.
218  */
219 int Task::get_count() const
220 {
221   return count_;
222 }
223
224 /**
225  *  @brief Default constructor.
226  */
227 ExecTask::ExecTask(const std::string& name) : Task(name) {}
228
229 /** @ingroup plugin_task
230  *  @brief Smart Constructor.
231  */
232 ExecTaskPtr ExecTask::init(const std::string& name)
233 {
234   return ExecTaskPtr(new ExecTask(name));
235 }
236
237 /** @ingroup plugin_task
238  *  @brief Smart Constructor.
239  */
240 ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* host)
241 {
242   return init(name)->set_flops(flops)->set_host(host);
243 }
244
245 /**
246  *  @brief Do one execution of the Task.
247  *  @note Call the on_this_start() func. Set its working status as true.
248  *  Init and start the underlying Activity.
249  */
250 void ExecTask::fire()
251 {
252   on_this_start_(this);
253   Task::on_start(this);
254   working_          = true;
255   queued_execs_     = std::max(queued_execs_ - 1, 0);
256   if (tokens_received_.size() > 0)
257       tokens_received_.pop_front();
258   s4u::ExecPtr exec = s4u::Exec::init();
259   exec->set_name(name_);
260   exec->set_flops_amount(amount_);
261   exec->set_host(host_);
262   exec->start();
263   exec->extension_set(new ExtendedAttributeActivity());
264   exec->extension<ExtendedAttributeActivity>()->task_ = this;
265   current_activity_                                   = exec;
266 }
267
268 /** @ingroup plugin_task
269  *  @param host The host to set.
270  *  @brief Set a new host.
271  */
272 ExecTaskPtr ExecTask::set_host(s4u::Host* host)
273 {
274   kernel::actor::simcall_answered([this, host] { host_ = host; });
275   return this;
276 }
277
278 /** @ingroup plugin_task
279  *  @param flops The amount of flops to set.
280  */
281 ExecTaskPtr ExecTask::set_flops(double flops)
282 {
283   kernel::actor::simcall_answered([this, flops] { amount_ = flops; });
284   return this;
285 }
286
287 /**
288  *  @brief Default constructor.
289  */
290 CommTask::CommTask(const std::string& name) : Task(name) {}
291
292 /** @ingroup plugin_task
293  *  @brief Smart constructor.
294  */
295 CommTaskPtr CommTask::init(const std::string& name)
296 {
297   return CommTaskPtr(new CommTask(name));
298 }
299
300 /** @ingroup plugin_task
301  *  @brief Smart constructor.
302  */
303 CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* source, s4u::Host* destination)
304 {
305   return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination);
306 }
307
308 /**
309  *  @brief Do one execution of the Task.
310  *  @note Call the on_this_start() func. Set its working status as true.
311  *  Init and start the underlying Activity.
312  */
313 void CommTask::fire()
314 {
315   on_this_start_(this);
316   Task::on_start(this);
317   working_          = true;
318   queued_execs_     = std::max(queued_execs_ - 1, 0);
319   if (tokens_received_.size() > 0)
320       tokens_received_.pop_front();
321   s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_);
322   comm->set_name(name_);
323   comm->set_payload_size(amount_);
324   comm->start();
325   comm->extension_set(new ExtendedAttributeActivity());
326   comm->extension<ExtendedAttributeActivity>()->task_ = this;
327   current_activity_                                   = comm;
328 }
329
330 /** @ingroup plugin_task
331  *  @param source The host to set.
332  *  @brief Set a new source host.
333  */
334 CommTaskPtr CommTask::set_source(s4u::Host* source)
335 {
336   kernel::actor::simcall_answered([this, source] { source_ = source; });
337   return this;
338 }
339
340 /** @ingroup plugin_task
341  *  @param destination The host to set.
342  *  @brief Set a new destination host.
343  */
344 CommTaskPtr CommTask::set_destination(s4u::Host* destination)
345 {
346   kernel::actor::simcall_answered([this, destination] { destination_ = destination; });
347   return this;
348 }
349
350 /** @ingroup plugin_task
351  *  @param bytes The amount of bytes to set.
352  */
353 CommTaskPtr CommTask::set_bytes(double bytes)
354 {
355   kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; });
356   return this;
357 }
358
359 /**
360  *  @brief Default constructor.
361  */
362 IoTask::IoTask(const std::string& name) : Task(name) {}
363
364 /** @ingroup plugin_task
365  *  @brief Smart Constructor.
366  */
367 IoTaskPtr IoTask::init(const std::string& name)
368 {
369   return IoTaskPtr(new IoTask(name));
370 }
371
372 /** @ingroup plugin_task
373  *  @brief Smart Constructor.
374  */
375 IoTaskPtr IoTask::init(const std::string& name, double bytes, s4u::Disk* disk, s4u::Io::OpType type)
376 {
377   return init(name)->set_bytes(bytes)->set_disk(disk)->set_op_type(type);
378 }
379
380 /** @ingroup plugin_task
381  *  @param disk The disk to set.
382  *  @brief Set a new disk.
383  */
384 IoTaskPtr IoTask::set_disk(s4u::Disk* disk)
385 {
386   kernel::actor::simcall_answered([this, disk] { disk_ = disk; });
387   return this;
388 }
389
390 /** @ingroup plugin_task
391  *  @param bytes The amount of bytes to set.
392  */
393 IoTaskPtr IoTask::set_bytes(double bytes)
394 {
395   kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; });
396   return this;
397 }
398
399 /** @ingroup plugin_task */
400 IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type)
401 {
402   kernel::actor::simcall_answered([this, type] { type_ = type; });
403   return this;
404 }
405
406 void IoTask::fire()
407 {
408   on_this_start_(this);
409   Task::on_start(this);
410   working_      = true;
411   queued_execs_ = std::max(queued_execs_ - 1, 0);
412   if (tokens_received_.size() > 0)
413       tokens_received_.pop_front();
414   s4u::IoPtr io = s4u::Io::init();
415   io->set_name(name_);
416   io->set_size(amount_);
417   io->set_disk(disk_);
418   io->set_op_type(type_);
419   io->start();
420   io->extension_set(new ExtendedAttributeActivity());
421   io->extension<ExtendedAttributeActivity>()->task_ = this;
422   current_activity_                                 = io;
423 }
424
425 } // namespace simgrid::plugins