Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
update doc
[simgrid.git] / src / s4u / s4u_Task.cpp
1 #include <cstddef>
2 #include <memory>
3 #include <simgrid/Exception.hpp>
4 #include <simgrid/s4u/Activity.hpp>
5 #include <simgrid/s4u/Comm.hpp>
6 #include <simgrid/s4u/Disk.hpp>
7 #include <simgrid/s4u/Exec.hpp>
8 #include <simgrid/s4u/Io.hpp>
9 #include <simgrid/s4u/Task.hpp>
10 #include <simgrid/simix.hpp>
11 #include <string>
12 #include <xbt/asserts.h>
13
14 #include "src/simgrid/module.hpp"
15
16 SIMGRID_REGISTER_PLUGIN(task, "Battery management", nullptr)
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plugin");
18
19 namespace simgrid::s4u {
20
21 Task::Task(const std::string& name) : name_(name) {}
22
23 /** @param instance The Task instance to check.
24  *  @brief Return True if this Task instance can start.
25  */
26 bool Task::ready_to_run(std::string instance)
27 {
28   return running_instances_[instance] < parallelism_degree_[instance] && queued_firings_[instance] > 0;
29 }
30
31 /** @param source The sender.
32  *  @brief Receive a token from another Task.
33  *  @note Check upon reception if the Task has received a token from each of its predecessors,
34  * and in this case consumes those tokens and enqueue an execution.
35  */
36 void Task::receive(Task* source)
37 {
38   XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
39   predecessors_[source]++;
40   if (source->token_ != nullptr)
41     tokens_received_[source].push_back(source->token_);
42   bool enough_tokens = true;
43   for (auto const& [key, val] : predecessors_)
44     if (val < 1) {
45       enough_tokens = false;
46       break;
47     }
48   if (enough_tokens) {
49     for (auto& [key, val] : predecessors_)
50       val--;
51     enqueue_firings(1);
52   }
53 }
54
55 /** @param instance The Taks instance to complete.
56  *  @brief Task instance routine when finishing an execution of an instance.
57  *  @note The dispatcher instance enqueues a firing for the next instance.
58  *        The collector instance triggers the on_completion signals and sends tokens to successors.
59  *        Others instances enqueue a firing of the collector instance.
60  */
61 void Task::complete(std::string instance)
62 {
63   xbt_assert(Actor::is_maestro());
64   running_instances_[instance]--;
65   count_[instance]++;
66   if (instance == "collector") {
67     on_this_completion(this);
68     on_completion(this);
69     for (auto const& t : successors_)
70       t->receive(this);
71   } else if (instance == "dispatcher") {
72     auto next_instance = load_balancing_function_();
73     xbt_assert(next_instance != "dispatcher" and next_instance != "collector", "Invalid instance selected: %s",
74                next_instance.c_str());
75     queued_firings_[next_instance] = queued_firings_.at(next_instance) + 1;
76     while (ready_to_run(next_instance))
77       fire(next_instance);
78   } else {
79     queued_firings_["collector"]++;
80     while (ready_to_run("collector"))
81       fire("collector");
82   }
83   if (ready_to_run(instance))
84     fire(instance);
85 }
86
87 /** @param n The new parallelism degree of the Task instance.
88  *  @param instance The Task instance to modify.
89  *  @note You can use instance "all" to modify the parallelism degree of all instances of this Task.
90  *        When increasing the degree new executions are started if there is queued firings.
91  *        When decreasing the degree instances already running are NOT stopped.
92  */
93 void Task::set_parallelism_degree(int n, std::string instance)
94 {
95   xbt_assert(n > 0, "Parallelism degree must be above 0.");
96   simgrid::kernel::actor::simcall_answered([this, n, &instance] {
97     if (instance == "all") {
98       for (auto& [key, value] : parallelism_degree_) {
99         parallelism_degree_[key] = n;
100         while (ready_to_run(key))
101           fire(key);
102       }
103     } else {
104       parallelism_degree_[instance] = n;
105       while (ready_to_run(instance))
106         fire(instance);
107     }
108   });
109 }
110
111 /** @param bytes The internal bytes of the Task instance.
112  *  @param instance The Task instance to modify.
113  *  @note Internal bytes are used for Comms between the dispatcher and instance_n,
114  *        and between instance_n and the collector if they are not on the same host.
115  */
116 void Task::set_internal_bytes(int bytes, std::string instance)
117 {
118   simgrid::kernel::actor::simcall_answered([this, bytes, &instance] { internal_bytes_to_send_[instance] = bytes; });
119 }
120
121 /** @param func The load balancing function.
122  *  @note The dispatcher uses this function to determine which instance to trigger next.
123  */
124 void Task::set_load_balancing_function(std::function<std::string()> func)
125 {
126   simgrid::kernel::actor::simcall_answered([this, func] { load_balancing_function_ = func; });
127 }
128
129 /** @param n The number of firings to enqueue.
130  */
131 void Task::enqueue_firings(int n)
132 {
133   simgrid::kernel::actor::simcall_answered([this, n] {
134     queued_firings_["dispatcher"] += n;
135     while (ready_to_run("dispatcher"))
136       fire("dispatcher");
137   });
138 }
139
140 /** @param name The new name to set.
141  *  @brief Set the name of the Task.
142  */
143 void Task::set_name(std::string name)
144 {
145   name_ = name;
146 }
147
148 /** @param amount The amount to set.
149  *  @param instance The Task instance to modify.
150  *  @note Amount in flop for ExecTask and in bytes for CommTask.
151  */
152 void Task::set_amount(double amount, std::string instance)
153 {
154   simgrid::kernel::actor::simcall_answered([this, amount, &instance] { amount_[instance] = amount; });
155 }
156
157 /** @param token The token to set.
158  *  @brief Set the token to send to successors.
159  *  @note The token is passed to each successor after the Task instance collector end, i.e., after the on_completion
160  * callback.
161  */
162 void Task::set_token(std::shared_ptr<Token> token)
163 {
164   simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; });
165 }
166
167 /** @param t The Task to deque a token from.
168  */
169 void Task::deque_token_from(TaskPtr t)
170 {
171   simgrid::kernel::actor::simcall_answered([this, &t] { tokens_received_[t].pop_front(); });
172 }
173
174 void Task::fire(std::string instance)
175 {
176   if ((int)current_activities_[instance].size() > parallelism_degree_[instance]) {
177     current_activities_[instance].pop_front();
178   }
179   if (instance != "dispatcher" and instance != "collector") {
180     on_this_start(this);
181     on_start(this);
182   }
183   running_instances_[instance]++;
184   queued_firings_[instance] = std::max(queued_firings_[instance] - 1, 0);
185 }
186
187 /** @param successor The Task to add as a successor.
188  *  @note It also adds this as a predecessor of successor.
189  */
190 void Task::add_successor(TaskPtr successor)
191 {
192   simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] {
193     successors_.insert(successor_p);
194     successor_p->predecessors_.try_emplace(this, 0);
195   });
196 }
197
198 /** @param successor The Task to remove from the successors of this Task.
199  *  @note It also remove this from the predecessors of successor.
200  */
201 void Task::remove_successor(TaskPtr successor)
202 {
203   simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] {
204     successor_p->predecessors_.erase(this);
205     successors_.erase(successor_p);
206   });
207 }
208
209 /** @brief Remove all successors from this Task.
210  */
211 void Task::remove_all_successors()
212 {
213   simgrid::kernel::actor::simcall_answered([this] {
214     while (not successors_.empty()) {
215       auto* successor = *(successors_.begin());
216       successor->predecessors_.erase(this);
217       successors_.erase(successor);
218     }
219   });
220 }
221
222 /** @param n The number of instances to add to this Task (>=0).
223  *  @note Instances goes always from instance_0 to instance_x,
224  *        where x is the current number of instance.
225  */
226 void Task::add_instances(int n)
227 {
228   xbt_assert(n >= 0, "Cannot add a negative number of instances (provided: %d)", n);
229   int instance_count = (int)amount_.size() - 2;
230   for (int i = instance_count; i < n + instance_count; i++) {
231     amount_["instance_" + std::to_string(i)]                 = amount_.at("instance_0");
232     queued_firings_["instance_" + std::to_string(i)]         = 0;
233     running_instances_["instance_" + std::to_string(i)]      = 0;
234     count_["instance_" + std::to_string(i)]                  = 0;
235     parallelism_degree_["instance_" + std::to_string(i)]     = parallelism_degree_.at("instance_0");
236     current_activities_["instance_" + std::to_string(i)]     = {};
237     internal_bytes_to_send_["instance_" + std::to_string(i)] = internal_bytes_to_send_.at("instance_0");
238     ;
239   }
240 }
241
242 /** @param n The number of instances to remove from this Task (>=0).
243  *  @note Instances goes always from instance_0 to instance_x,
244  *        where x is the current number of instance.
245  *        Running instances cannot be removed.
246  */
247 void Task::remove_instances(int n)
248 {
249   int instance_count = (int)amount_.size() - 2;
250   xbt_assert(n >= 0, "Cannot remove a negative number of instances (provided: %d)", n);
251   xbt_assert(instance_count - n > 0, "The number of instances must be above 0 (instances: %d, provided: %d)",
252              instance_count, n);
253   for (int i = instance_count - 1; i >= instance_count - n; i--) {
254     xbt_assert(running_instances_.at("instance_" + std::to_string(i)) == 0,
255                "Cannot remove a running instance (instances: %d)", i);
256     amount_.erase("instance_" + std::to_string(i));
257     queued_firings_.erase("instance_" + std::to_string(i));
258     running_instances_.erase("instance_" + std::to_string(i));
259     count_.erase("instance_" + std::to_string(i));
260     parallelism_degree_.erase("instance_" + std::to_string(i));
261     current_activities_.erase("instance_" + std::to_string(i));
262   }
263 }
264
265 /**
266  *  @brief Default constructor.
267  */
268 ExecTask::ExecTask(const std::string& name) : Task(name)
269 {
270   set_load_balancing_function([]() { return "instance_0"; });
271 }
272
273 /**
274  *  @brief Smart Constructor.
275  */
276 ExecTaskPtr ExecTask::init(const std::string& name)
277 {
278   return ExecTaskPtr(new ExecTask(name));
279 }
280
281 /**
282  *  @brief Smart Constructor.
283  */
284 ExecTaskPtr ExecTask::init(const std::string& name, double flops, Host* host)
285 {
286   return init(name)->set_flops(flops)->set_host(host);
287 }
288
289 /** @param instance The Task instance to fire.
290  *  @note Only the dispatcher instance triggers the on_start signal.
291  *        Comms are created if hosts differ between dispatcher and the instance to fire,
292  *        or between the instance and the collector.
293  */
294 void ExecTask::fire(std::string instance)
295 {
296   Task::fire(instance);
297   if (instance == "dispatcher" or instance == "collector") {
298     auto exec = Exec::init()
299                     ->set_name(get_name() + "_" + instance)
300                     ->set_flops_amount(get_amount(instance))
301                     ->set_host(host_[instance]);
302     exec->start();
303     exec->on_this_completion_cb([this, instance](Exec const&) { complete(instance); });
304     store_activity(exec, instance);
305   } else {
306     auto exec = Exec::init()->set_name(get_name())->set_flops_amount(get_amount())->set_host(host_[instance]);
307     if (host_["dispatcher"] == host_[instance]) {
308       exec->start();
309       store_activity(exec, instance);
310     } else {
311       auto comm = Comm::sendto_init(host_["dispatcher"], host_[instance])
312                       ->set_name(get_name() + "_dispatcher_to_" + instance)
313                       ->set_payload_size(get_internal_bytes("dispatcher"));
314       comm->add_successor(exec);
315       comm->start();
316       store_activity(comm, instance);
317     }
318     if (host_[instance] == host_["collector"]) {
319       exec->on_this_completion_cb([this, instance](Exec const&) { complete(instance); });
320       if (host_["dispatcher"] != host_[instance])
321         store_activity(exec, instance);
322     } else {
323       auto comm = Comm::sendto_init(host_[instance], host_["collector"])
324                       ->set_name(get_name() + instance + "_to_collector")
325                       ->set_payload_size(get_internal_bytes(instance));
326       exec->add_successor(comm);
327       comm->on_this_completion_cb([this, instance](Comm const&) { complete(instance); });
328       comm.detach();
329     }
330   }
331 }
332
333 /** @param host The host to set.
334  *  @param instance The Task instance to modify.
335  *  @brief Set a new host.
336  */
337 ExecTaskPtr ExecTask::set_host(Host* host, std::string instance)
338 {
339   kernel::actor::simcall_answered([this, host, &instance] {
340     if (instance == "all")
341       for (auto& [key, value] : host_)
342         host_[key] = host;
343     else
344       host_[instance] = host;
345   });
346   return this;
347 }
348
349 /** @param flops The amount of flops to set.
350  *  @param instance The Task instance to modify.
351  */
352 ExecTaskPtr ExecTask::set_flops(double flops, std::string instance)
353 {
354   kernel::actor::simcall_answered([this, flops, &instance] { set_amount(flops, instance); });
355   return this;
356 }
357
358 /** @param n The number of instances to add to this Task (>=0).
359     @note Instances goes always from instance_0 to instance_x,
360           where x is the current number of instance.
361  */
362 void ExecTask::add_instances(int n)
363 {
364   Task::add_instances(n);
365   int instance_count = (int)host_.size() - 2;
366   for (int i = instance_count; i < n + instance_count; i++)
367     host_["instance_" + std::to_string(i)] = host_.at("instance_0");
368 }
369
370 /** @param n The number of instances to remove from this Task (>=0).
371     @note Instances goes always from instance_0 to instance_x,
372           where x is the current number of instance.
373           Running instance cannot be removed.
374  */
375 void ExecTask::remove_instances(int n)
376 {
377   Task::remove_instances(n);
378   int instance_count = (int)host_.size() - 2;
379   for (int i = instance_count - 1; i >= instance_count - n; i--)
380     host_.erase("instance_" + std::to_string(i));
381 }
382
383 /**
384  *  @brief Default constructor.
385  */
386 CommTask::CommTask(const std::string& name) : Task(name)
387 {
388   set_load_balancing_function([]() { return "instance_0"; });
389 }
390
391 /**
392  *  @brief Smart constructor.
393  */
394 CommTaskPtr CommTask::init(const std::string& name)
395 {
396   return CommTaskPtr(new CommTask(name));
397 }
398
399 /**
400  *  @brief Smart constructor.
401  */
402 CommTaskPtr CommTask::init(const std::string& name, double bytes, Host* source, Host* destination)
403 {
404   return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination);
405 }
406
407 /** @param instance The Task instance to fire.
408  *  @note Only the dispatcher instance triggers the on_start signal.
409  */
410 void CommTask::fire(std::string instance)
411 {
412   Task::fire(instance);
413   if (instance == "dispatcher" or instance == "collector") {
414     auto exec = Exec::init()
415                     ->set_name(get_name() + "_" + instance)
416                     ->set_flops_amount(get_amount(instance))
417                     ->set_host(instance == "dispatcher" ? source_ : destination_);
418     exec->start();
419     exec->on_this_completion_cb([this, instance](Exec const&) { complete(instance); });
420     store_activity(exec, instance);
421   } else {
422     auto comm = Comm::sendto_init(source_, destination_)->set_name(get_name())->set_payload_size(get_amount());
423     comm->start();
424     comm->on_this_completion_cb([this, instance](Comm const&) { complete(instance); });
425     store_activity(comm, instance);
426   }
427 }
428
429 /**
430  *  @param source The host to set.
431  *  @brief Set a new source host.
432  */
433 CommTaskPtr CommTask::set_source(Host* source)
434 {
435   kernel::actor::simcall_answered([this, source] { source_ = source; });
436   return this;
437 }
438
439 /**
440  *  @param destination The host to set.
441  *  @brief Set a new destination host.
442  */
443 CommTaskPtr CommTask::set_destination(Host* destination)
444 {
445   kernel::actor::simcall_answered([this, destination] { destination_ = destination; });
446   return this;
447 }
448
449 /**
450  *  @param bytes The amount of bytes to set.
451  */
452 CommTaskPtr CommTask::set_bytes(double bytes)
453 {
454   kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); });
455   return this;
456 }
457
458 /**
459  *  @brief Default constructor.
460  */
461 IoTask::IoTask(const std::string& name) : Task(name)
462 {
463   set_load_balancing_function([]() { return "instance_0"; });
464 }
465
466 /**
467  *  @brief Smart Constructor.
468  */
469 IoTaskPtr IoTask::init(const std::string& name)
470 {
471   return IoTaskPtr(new IoTask(name));
472 }
473
474 /**
475  *  @brief Smart Constructor.
476  */
477 IoTaskPtr IoTask::init(const std::string& name, double bytes, Disk* disk, Io::OpType type)
478 {
479   return init(name)->set_bytes(bytes)->set_disk(disk)->set_op_type(type);
480 }
481
482 /**
483  *  @param disk The disk to set.
484  */
485 IoTaskPtr IoTask::set_disk(Disk* disk)
486 {
487   kernel::actor::simcall_answered([this, disk] { disk_ = disk; });
488   return this;
489 }
490
491 /**
492  *  @param bytes The amount of bytes to set.
493  */
494 IoTaskPtr IoTask::set_bytes(double bytes)
495 {
496   kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); });
497   return this;
498 }
499
500 /**
501  *  @param type The op type to set.
502  */
503 IoTaskPtr IoTask::set_op_type(Io::OpType type)
504 {
505   kernel::actor::simcall_answered([this, type] { type_ = type; });
506   return this;
507 }
508
509 /** @param instance The Task instance to fire.
510  *  @note Only the dispatcher instance triggers the on_start signal.
511  */
512 void IoTask::fire(std::string instance)
513 {
514   Task::fire(instance);
515   if (instance == "dispatcher" or instance == "collector") {
516     auto exec = Exec::init()
517                     ->set_name(get_name() + "_" + instance)
518                     ->set_flops_amount(get_amount(instance))
519                     ->set_host(disk_->get_host());
520     exec->start();
521     exec->on_this_completion_cb([this, instance](Exec const&) { complete(instance); });
522     store_activity(exec, instance);
523   } else {
524     auto io = Io::init()->set_name(get_name())->set_size(get_amount())->set_disk(disk_)->set_op_type(type_);
525     io->start();
526     io->on_this_completion_cb([this, instance](Io const&) { complete(instance); });
527     store_activity(io, instance);
528   }
529 }
530 } // namespace simgrid::s4u