Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
ca72bfbcaa94ccfec566c70539314d2da94c05c0
[simgrid.git] / src / s4u / s4u_Task.cpp
1 #include <memory>
2 #include <simgrid/Exception.hpp>
3 #include <simgrid/s4u/Activity.hpp>
4 #include <simgrid/s4u/Comm.hpp>
5 #include <simgrid/s4u/Disk.hpp>
6 #include <simgrid/s4u/Exec.hpp>
7 #include <simgrid/s4u/Io.hpp>
8 #include <simgrid/s4u/Task.hpp>
9 #include <simgrid/simix.hpp>
10 #include <string>
11 #include <xbt/asserts.h>
12
13 #include "src/simgrid/module.hpp"
14
15 SIMGRID_REGISTER_PLUGIN(task, "Battery management", nullptr)
16 /**
17   @beginrst
18
19
20 Tasks are designed to represent dataflows, i.e, graphs of Tasks.
21 Tasks can only be instancied using either
22 :cpp:func:`simgrid::s4u::ExecTask::init` or :cpp:func:`simgrid::s4u::CommTask::init`
23 An ExecTask is an Execution Task. Its underlying Activity is an :ref:`Exec <API_s4u_Exec>`.
24 A CommTask is a Communication Task. Its underlying Activity is a :ref:`Comm <API_s4u_Comm>`.
25
26   @endrst
27  */
28 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plugin");
29
30 namespace simgrid::s4u {
31
32 Task::Task(const std::string& name) : name_(name) {}
33
34 /**
35  *  @brief Return True if the Task can start a new Activity.
36  *  @note The Task is ready if not already doing something and there is at least one execution waiting in queue.
37  */
38 bool Task::ready_to_run(std::string instance)
39 {
40   return running_instances_[instance] < parallelism_degree_[instance] && queued_firings_[instance] > 0;
41 }
42
43 /**
44  *  @param source The sender.
45  *  @brief Receive a token from another Task.
46  *  @note Check upon reception if the Task has received a token from each of its predecessors,
47  * and in this case consumes those tokens and enqueue an execution.
48  */
49 void Task::receive(Task* source)
50 {
51   XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
52   auto source_count = predecessors_[source];
53   predecessors_[source]++;
54   tokens_received_[source].push_back(source->token_);
55   bool enough_tokens = true;
56   for (auto const& [key, val] : predecessors_)
57     if (val < 1) {
58       enough_tokens = false;
59       break;
60     }
61   if (enough_tokens) {
62     for (auto& [key, val] : predecessors_)
63       val--;
64     enqueue_firings(1);
65   }
66 }
67
68 /**
69  *  @brief Task routine when finishing an execution.
70  *  @note Set its working status as false.
71  * Add 1 to its count of finished executions.
72  * Call the on_this_end func.
73  * Fire on_end callback.
74  * Send a token to each of its successors.
75  * Start a new execution if possible.
76  */
77 void Task::complete(std::string instance)
78 {
79   xbt_assert(Actor::is_maestro());
80   running_instances_[instance] = running_instances_[instance] - 1;
81   count_[instance]             = count_[instance] + 1;
82   if (instance == "collector") {
83     // XBT_INFO("Trigger on completion: %s - %s", get_cname(), instance.c_str());
84     on_this_completion(this);
85     on_completion(this);
86     for (auto const& t : successors_)
87       t->receive(this);
88   } else if (instance == "dispatcher") {
89     auto next_instance = load_balancing_function_();
90     xbt_assert(next_instance != "dispatcher" and next_instance != "collector", "Invalid instance selected: %s",
91                next_instance.c_str());
92     queued_firings_[next_instance] = queued_firings_.at(next_instance) + 1;
93     while (ready_to_run(next_instance))
94       fire(next_instance);
95   } else {
96     queued_firings_["collector"] = queued_firings_["collector"] + 1;
97     while (ready_to_run("collector"))
98       fire("collector");
99   }
100   if (ready_to_run(instance))
101     fire(instance);
102 }
103
104 /** @param n The new parallelism degree of the Task.
105  *  @brief Set the parallelism degree of the Task to inscrease or decrease horizontal scaling.
106  *  @note When increasing the degree the function starts new instances if there is queued firings.
107  *        When decreasing the degree the function does NOT stop running instances.
108  */
109 void Task::set_parallelism_degree(int n, std::string instance)
110 {
111   xbt_assert(n > 0, "Parallelism degree must be above 0.");
112   simgrid::kernel::actor::simcall_answered([this, n, &instance] {
113     if (instance == "all") {
114       for (auto& [key, value] : parallelism_degree_) {
115         parallelism_degree_[key] = n;
116         while (ready_to_run(key))
117           fire(key);
118       }
119     } else {
120       parallelism_degree_[instance] = n;
121       while (ready_to_run(instance))
122         fire(instance);
123     }
124   });
125 }
126
127 void Task::set_internal_bytes(int bytes, std::string instance)
128 {
129   simgrid::kernel::actor::simcall_answered([this, bytes, &instance] { internal_bytes_to_send_[instance] = bytes; });
130 }
131
132 void Task::set_load_balancing_function(std::function<std::string()> func)
133 {
134   simgrid::kernel::actor::simcall_answered([this, func] { load_balancing_function_ = func; });
135 }
136
137 /** @param n The number of firings to enqueue.
138  *  @brief Enqueue firing.
139  *  @note Immediatly fire an activity if possible.
140  */
141 void Task::enqueue_firings(int n)
142 {
143   simgrid::kernel::actor::simcall_answered([this, n] {
144     queued_firings_["dispatcher"] += n;
145     while (ready_to_run("dispatcher"))
146       fire("dispatcher");
147   });
148 }
149
150 /** @param name The new name to set.
151  *  @brief Set the name of the Task.
152  */
153 void Task::set_name(std::string name)
154 {
155   name_ = name;
156 }
157
158 /** @param amount The amount to set.
159  *  @brief Set the amout of work to do.
160  *  @note Amount in flop for ExecTask and in bytes for CommTask.
161  */
162 void Task::set_amount(double amount, std::string instance)
163 {
164   simgrid::kernel::actor::simcall_answered([this, amount, &instance] { amount_[instance] = amount; });
165 }
166
167 /** @param token The token to set.
168  *  @brief Set the token to send to successors.
169  *  @note The token is passed to each successor after the task end, i.e., after the on_completion callback.
170  */
171 void Task::set_token(std::shared_ptr<Token> token)
172 {
173   simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; });
174 }
175
176 void Task::deque_token_from(TaskPtr t)
177 {
178   simgrid::kernel::actor::simcall_answered([this, &t] { tokens_received_.at(t).pop_front(); });
179 }
180
181 void Task::fire(std::string instance)
182 {
183   if ((int)current_activities_[instance].size() > parallelism_degree_[instance]) {
184     current_activities_[instance].pop_front();
185   }
186   if (instance != "dispatcher" and instance != "collector") {
187     on_this_start(this);
188     on_start(this);
189   }
190   running_instances_[instance]++;
191   queued_firings_[instance] = std::max(queued_firings_[instance] - 1, 0);
192 }
193
194 /** @param successor The Task to add.
195  *  @brief Add a successor to this Task.
196  *  @note It also adds this as a predecessor of successor.
197  */
198 void Task::add_successor(TaskPtr successor)
199 {
200   simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] {
201     successors_.insert(successor_p);
202     successor_p->predecessors_.try_emplace(this, 0);
203   });
204 }
205
206 /** @param successor The Task to remove.
207  *  @brief Remove a successor from this Task.
208  *  @note It also remove this from the predecessors of successor.
209  */
210 void Task::remove_successor(TaskPtr successor)
211 {
212   simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] {
213     successor_p->predecessors_.erase(this);
214     successors_.erase(successor_p);
215   });
216 }
217
218 /**
219  *  @brief TODO
220  */
221 void Task::remove_all_successors()
222 {
223   simgrid::kernel::actor::simcall_answered([this] {
224     while (not successors_.empty()) {
225       auto* successor = *(successors_.begin());
226       successor->predecessors_.erase(this);
227       successors_.erase(successor);
228     }
229   });
230 }
231
232 /**
233  *  @brief TODO
234  */
235 void Task::add_instances(int n)
236 {
237   xbt_assert(n >= 0, "Cannot add a negative number of instances (provided: %d)", n);
238   int instance_count = (int)amount_.size() - 2;
239   for (int i = instance_count; i < n + instance_count; i++) {
240     amount_["instance_" + std::to_string(i)]                 = amount_.at("instance_0");
241     queued_firings_["instance_" + std::to_string(i)]         = 0;
242     running_instances_["instance_" + std::to_string(i)]      = 0;
243     count_["instance_" + std::to_string(i)]                  = 0;
244     parallelism_degree_["instance_" + std::to_string(i)]     = parallelism_degree_.at("instance_0");
245     current_activities_["instance_" + std::to_string(i)]     = {};
246     internal_bytes_to_send_["instance_" + std::to_string(i)] = internal_bytes_to_send_.at("instance_0");
247     ;
248   }
249 }
250
251 /**
252  *  @brief TODO
253  */
254 void Task::remove_instances(int n)
255 {
256   int instance_count = (int)amount_.size() - 2;
257   xbt_assert(n >= 0, "Cannot remove a negative number of instances (provided: %d)", n);
258   xbt_assert(instance_count - n > 0, "The number of instances must be above 0 (instances: %d, provided: %d)",
259              instance_count, n);
260   for (int i = instance_count - 1; i >= instance_count - n; i--)
261     xbt_assert(running_instances_.at("instance_" + std::to_string(i)) == 0,
262                "Cannot remove a running instance (instances: %d)", i);
263   for (int i = instance_count - 1; i >= instance_count - n; i--) {
264     amount_.erase("instance_" + std::to_string(i));
265     queued_firings_.erase("instance_" + std::to_string(i));
266     running_instances_.erase("instance_" + std::to_string(i));
267     count_.erase("instance_" + std::to_string(i));
268     parallelism_degree_.erase("instance_" + std::to_string(i));
269     current_activities_.erase("instance_" + std::to_string(i));
270   }
271 }
272
273 /**
274  *  @brief Default constructor.
275  */
276 ExecTask::ExecTask(const std::string& name) : Task(name)
277 {
278   set_load_balancing_function([]() { return "instance_0"; });
279 }
280
281 /**
282  *  @brief Smart Constructor.
283  */
284 ExecTaskPtr ExecTask::init(const std::string& name)
285 {
286   return ExecTaskPtr(new ExecTask(name));
287 }
288
289 /**
290  *  @brief Smart Constructor.
291  */
292 ExecTaskPtr ExecTask::init(const std::string& name, double flops, Host* host)
293 {
294   return init(name)->set_flops(flops)->set_host(host);
295 }
296
297 /**
298  *  @brief Do one execution of the Task.
299  *  @note Call the on_this_start() func.
300  *  Init and start the underlying Activity.
301  */
302 void ExecTask::fire(std::string instance)
303 {
304   Task::fire(instance);
305   if (instance == "dispatcher" or instance == "collector") {
306     auto exec = Exec::init()
307                     ->set_name(get_name() + "_" + instance)
308                     ->set_flops_amount(get_amount(instance))
309                     ->set_host(host_[instance]);
310     exec->start();
311     exec->on_this_completion_cb([this, instance](Exec const&) { complete(instance); });
312     store_activity(exec, instance);
313   } else {
314     auto exec = Exec::init()->set_name(get_name())->set_flops_amount(get_amount())->set_host(host_[instance]);
315     if (host_["dispatcher"] == host_[instance]) {
316       exec->start();
317       store_activity(exec, instance);
318     } else {
319       auto comm = Comm::sendto_init(host_["dispatcher"], host_[instance])
320                       ->set_name(get_name() + "_dispatcher_to_" + instance)
321                       ->set_payload_size(get_internal_bytes("dispatcher"));
322       comm->add_successor(exec);
323       comm->start();
324       store_activity(comm, instance);
325     }
326     if (host_[instance] == host_["collector"]) {
327       exec->on_this_completion_cb([this, instance](Exec const&) { complete(instance); });
328       if (host_["dispatcher"] != host_[instance])
329         store_activity(exec, instance);
330     } else {
331       auto comm = Comm::sendto_init(host_[instance], host_["collector"])
332                       ->set_name(get_name() + instance + "_to_collector")
333                       ->set_payload_size(get_internal_bytes(instance));
334       exec->add_successor(comm);
335       comm->on_this_completion_cb([this, instance](Comm const&) { complete(instance); });
336       comm.detach();
337     }
338   }
339 }
340
341 /**
342  *  @param host The host to set.
343  *  @brief Set a new host.
344  */
345 ExecTaskPtr ExecTask::set_host(Host* host, std::string instance)
346 {
347   kernel::actor::simcall_answered([this, host, &instance] {
348     if (instance == "all")
349       for (auto& [key, value] : host_)
350         host_[key] = host;
351     else
352       host_[instance] = host;
353   });
354   return this;
355 }
356
357 /**
358  *  @param flops The amount of flops to set.
359  */
360 ExecTaskPtr ExecTask::set_flops(double flops, std::string instance)
361 {
362   kernel::actor::simcall_answered([this, flops, &instance] { set_amount(flops, instance); });
363   return this;
364 }
365
366 /**
367  *  @brief TODO
368  */
369 void ExecTask::add_instances(int n)
370 {
371   Task::add_instances(n);
372   int instance_count = (int)host_.size() - 2;
373   for (int i = instance_count; i < n + instance_count; i++)
374     host_["instance_" + std::to_string(i)] = host_.at("instance_0");
375 }
376
377 /**
378  *  @brief TODO
379  */
380 void ExecTask::remove_instances(int n)
381 {
382   Task::remove_instances(n);
383   int instance_count = (int)host_.size() - 2;
384   for (int i = instance_count - 1; i >= instance_count - n; i--)
385     host_.erase("instance_" + std::to_string(i));
386 }
387
388 /**
389  *  @brief Default constructor.
390  */
391 CommTask::CommTask(const std::string& name) : Task(name)
392 {
393   set_load_balancing_function([]() { return "instance_0"; });
394 }
395
396 /**
397  *  @brief Smart constructor.
398  */
399 CommTaskPtr CommTask::init(const std::string& name)
400 {
401   return CommTaskPtr(new CommTask(name));
402 }
403
404 /**
405  *  @brief Smart constructor.
406  */
407 CommTaskPtr CommTask::init(const std::string& name, double bytes, Host* source, Host* destination)
408 {
409   return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination);
410 }
411
412 /**
413  *  @brief Do one execution of the Task.
414  *  @note Call the on_this_start() func.
415  *  Init and start the underlying Activity.
416  */
417 void CommTask::fire(std::string instance)
418 {
419   Task::fire(instance);
420   if (instance == "dispatcher" or instance == "collector") {
421     auto exec = Exec::init()
422                     ->set_name(get_name() + "_" + instance)
423                     ->set_flops_amount(get_amount(instance))
424                     ->set_host(instance == "dispatcher" ? source_ : destination_);
425     exec->start();
426     exec->on_this_completion_cb([this, instance](Exec const&) { complete(instance); });
427     store_activity(exec, instance);
428   } else {
429     auto comm = Comm::sendto_init(source_, destination_)->set_name(get_name())->set_payload_size(get_amount());
430     comm->start();
431     comm->on_this_completion_cb([this, instance](Comm const&) { complete(instance); });
432     store_activity(comm, instance);
433   }
434 }
435
436 /**
437  *  @param source The host to set.
438  *  @brief Set a new source host.
439  */
440 CommTaskPtr CommTask::set_source(Host* source)
441 {
442   kernel::actor::simcall_answered([this, source] { source_ = source; });
443   return this;
444 }
445
446 /**
447  *  @param destination The host to set.
448  *  @brief Set a new destination host.
449  */
450 CommTaskPtr CommTask::set_destination(Host* destination)
451 {
452   kernel::actor::simcall_answered([this, destination] { destination_ = destination; });
453   return this;
454 }
455
456 /**
457  *  @param bytes The amount of bytes to set.
458  */
459 CommTaskPtr CommTask::set_bytes(double bytes)
460 {
461   kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); });
462   return this;
463 }
464
465 /**
466  *  @brief Default constructor.
467  */
468 IoTask::IoTask(const std::string& name) : Task(name)
469 {
470   set_load_balancing_function([]() { return "instance_0"; });
471 }
472
473 /**
474  *  @brief Smart Constructor.
475  */
476 IoTaskPtr IoTask::init(const std::string& name)
477 {
478   return IoTaskPtr(new IoTask(name));
479 }
480
481 /**
482  *  @brief Smart Constructor.
483  */
484 IoTaskPtr IoTask::init(const std::string& name, double bytes, Disk* disk, Io::OpType type)
485 {
486   return init(name)->set_bytes(bytes)->set_disk(disk)->set_op_type(type);
487 }
488
489 /**
490  *  @param disk The disk to set.
491  *  @brief Set a new disk.
492  */
493 IoTaskPtr IoTask::set_disk(Disk* disk)
494 {
495   kernel::actor::simcall_answered([this, disk] { disk_ = disk; });
496   return this;
497 }
498
499 /**
500  *  @param bytes The amount of bytes to set.
501  */
502 IoTaskPtr IoTask::set_bytes(double bytes)
503 {
504   kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); });
505   return this;
506 }
507
508 /**  */
509 IoTaskPtr IoTask::set_op_type(Io::OpType type)
510 {
511   kernel::actor::simcall_answered([this, type] { type_ = type; });
512   return this;
513 }
514
515 void IoTask::fire(std::string instance)
516 {
517   Task::fire(instance);
518   if (instance == "dispatcher" or instance == "collector") {
519     auto exec = Exec::init()
520                     ->set_name(get_name() + "_" + instance)
521                     ->set_flops_amount(get_amount(instance))
522                     ->set_host(disk_->get_host());
523     exec->start();
524     exec->on_this_completion_cb([this, instance](Exec const&) { complete(instance); });
525     store_activity(exec, instance);
526   } else {
527     auto io = Io::init()->set_name(get_name())->set_size(get_amount())->set_disk(disk_)->set_op_type(type_);
528     io->start();
529     io->on_this_completion_cb([this, instance](Io const&) { complete(instance); });
530     store_activity(io, instance);
531   }
532 }
533
534 } // namespace simgrid::s4u