3 #include <simgrid/Exception.hpp>
4 #include <simgrid/s4u/Activity.hpp>
5 #include <simgrid/s4u/Comm.hpp>
6 #include <simgrid/s4u/Disk.hpp>
7 #include <simgrid/s4u/Exec.hpp>
8 #include <simgrid/s4u/Io.hpp>
9 #include <simgrid/s4u/Task.hpp>
10 #include <simgrid/simix.hpp>
12 #include <xbt/asserts.h>
14 #include "src/simgrid/module.hpp"
16 SIMGRID_REGISTER_PLUGIN(task, "Battery management", nullptr)
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plugin");
19 namespace simgrid::s4u {
21 Task::Task(const std::string& name) : name_(name) {}
23 /** @param instance The Task instance to check.
24 * @brief Return True if this Task instance can start.
26 bool Task::ready_to_run(std::string instance)
28 return running_instances_[instance] < parallelism_degree_[instance] && queued_firings_[instance] > 0;
// Token reception from a predecessor.
// NOTE(review): only part of this function is visible here (loop bodies are absent from this view);
// the comments below describe the visible statements only.
31 /** @param source The sender.
32 * @brief Receive a token from another Task.
33 * @note Check upon reception if the Task has received a token from each of its predecessors,
34 * and in this case consumes those tokens and enqueue an execution.
36 void Task::receive(Task* source)
38 XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
// One more token counted for this sender.
39 predecessors_[source]++;
// The sender's token object is stored only when it actually carries one.
40 if (source->token_ != nullptr)
41 tokens_received_[source].push_back(source->token_);
// Starts optimistic; the scan over all predecessors below may reset it to false.
42 bool enough_tokens = true;
43 for (auto const& [key, val] : predecessors_)
45 enough_tokens = false;
// Second pass takes the counters by mutable reference.
49 for (auto& [key, val] : predecessors_)
// Completion routine of one instance; behavior depends on which instance finished.
// NOTE(review): several statements of this function are absent from this view; the comments
// below describe the visible statements only.
55 /** @param instance The Taks instance to complete.
56 * @brief Task instance routine when finishing an execution of an instance.
57 * @note The dispatcher instance enqueues a firing for the next instance.
58 * The collector instance triggers the on_completion signals and sends tokens to successors.
59 * Others instances enqueue a firing of the collector instance.
61 void Task::complete(std::string instance)
// Must be called from the maestro context.
63 xbt_assert(Actor::is_maestro());
// The finished execution frees one slot of this instance.
64 running_instances_[instance]--;
66 if (instance == "collector") {
67 on_this_completion(this);
69 for (auto const& t : successors_)
71 } else if (instance == "dispatcher") {
// The user-provided load balancing function picks the instance that will run next.
72 auto next_instance = load_balancing_function_();
73 xbt_assert(next_instance != "dispatcher" and next_instance != "collector", "Invalid instance selected: %s",
74 next_instance.c_str());
// at() is used on the right-hand side: presumably an unknown instance name throws here
// instead of silently creating an entry (assuming a standard map) - TODO confirm.
75 queued_firings_[next_instance] = queued_firings_.at(next_instance) + 1;
76 while (ready_to_run(next_instance))
// Any other instance hands over to the collector.
79 queued_firings_["collector"]++;
80 while (ready_to_run("collector"))
// A slot was freed above, so the instance that just completed is checked again.
83 if (ready_to_run(instance))
// Changes the parallelism degree of one instance, or of every instance when "all" is given.
// NOTE(review): the loop bodies are absent from this view; comments describe visible statements only.
87 /** @param n The new parallelism degree of the Task instance.
88 * @param instance The Task instance to modify.
89 * @note You can use instance "all" to modify the parallelism degree of all instances of this Task.
90 * When increasing the degree new executions are started if there is queued firings.
91 * When decreasing the degree instances already running are NOT stopped.
93 void Task::set_parallelism_degree(int n, std::string instance)
// The degree must be strictly positive; checked before entering the simcall.
95 xbt_assert(n > 0, "Parallelism degree must be above 0.");
// `instance` is captured by reference; it refers to this function's by-value parameter.
96 simgrid::kernel::actor::simcall_answered([this, n, &instance] {
97 if (instance == "all") {
98 for (auto& [key, value] : parallelism_degree_) {
// Assigns through an existing key, so no entry is added while iterating.
99 parallelism_degree_[key] = n;
100 while (ready_to_run(key))
104 parallelism_degree_[instance] = n;
105 while (ready_to_run(instance))
111 /** @param bytes The internal bytes of the Task instance.
112 * @param instance The Task instance to modify.
113 * @note Internal bytes are used for Comms between the dispatcher and instance_n,
114 * and between instance_n and the collector if they are not on the same host.
116 void Task::set_internal_bytes(int bytes, std::string instance)
118 simgrid::kernel::actor::simcall_answered([this, bytes, &instance] { internal_bytes_to_send_[instance] = bytes; });
121 /** @param func The load balancing function.
122 * @note The dispatcher uses this function to determine which instance to trigger next.
124 void Task::set_load_balancing_function(std::function<std::string()> func)
126 simgrid::kernel::actor::simcall_answered([this, func] { load_balancing_function_ = func; });
// Adds firings to the dispatcher queue.
// NOTE(review): the body of the while loop is absent from this view.
129 /** @param n The number of firings to enqueue.
131 void Task::enqueue_firings(int n)
133 simgrid::kernel::actor::simcall_answered([this, n] {
// New firings always enter through the "dispatcher" instance.
134 queued_firings_["dispatcher"] += n;
// Loops for as long as the dispatcher has both a free slot and a queued firing.
135 while (ready_to_run("dispatcher"))
// NOTE(review): the body of this setter is absent from this view; only its signature is visible.
140 /** @param name The new name to set.
141 * @brief Set the name of the Task.
143 void Task::set_name(std::string name)
148 /** @param amount The amount to set.
149 * @param instance The Task instance to modify.
150 * @note Amount in flop for ExecTask and in bytes for CommTask.
152 void Task::set_amount(double amount, std::string instance)
154 simgrid::kernel::actor::simcall_answered([this, amount, &instance] { amount_[instance] = amount; });
157 /** @param token The token to set.
158 * @brief Set the token to send to successors.
159 * @note The token is passed to each successor after the Task instance collector end, i.e., after the on_completion
162 void Task::set_token(std::shared_ptr<Token> token)
164 simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; });
167 /** @param t The Task to deque a token from.
169 void Task::deque_token_from(TaskPtr t)
171 simgrid::kernel::actor::simcall_answered([this, &t] { tokens_received_[t].pop_front(); });
// Common bookkeeping done when an instance is fired.
// NOTE(review): the statements guarded by the second `if` are absent from this view.
174 void Task::fire(std::string instance)
// When more activities are stored than the parallelism degree allows, the one at the front is dropped.
176 if ((int)current_activities_[instance].size() > parallelism_degree_[instance]) {
177 current_activities_[instance].pop_front();
// This branch is skipped for the dispatcher and the collector.
179 if (instance != "dispatcher" and instance != "collector") {
// One more execution running, one less firing queued (the queue counter never goes below 0).
183 running_instances_[instance]++;
184 queued_firings_[instance] = std::max(queued_firings_[instance] - 1, 0);
187 /** @param successor The Task to add as a successor.
188 * @note It also adds this as a predecessor of successor.
190 void Task::add_successor(TaskPtr successor)
192 simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] {
193 successors_.insert(successor_p);
194 successor_p->predecessors_.try_emplace(this, 0);
198 /** @param successor The Task to remove from the successors of this Task.
199 * @note It also remove this from the predecessors of successor.
201 void Task::remove_successor(TaskPtr successor)
203 simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] {
204 successor_p->predecessors_.erase(this);
205 successors_.erase(successor_p);
209 /** @brief Remove all successors from this Task.
211 void Task::remove_all_successors()
213 simgrid::kernel::actor::simcall_answered([this] {
214 while (not successors_.empty()) {
215 auto* successor = *(successors_.begin());
216 successor->predecessors_.erase(this);
217 successors_.erase(successor);
222 /** @param n The number of instances to add to this Task (>=0).
223 * @note Instances goes always from instance_0 to instance_x,
224 * where x is the current number of instance.
226 void Task::add_instances(int n)
228 xbt_assert(n >= 0, "Cannot add a negative number of instances (provided: %d)", n);
229 int instance_count = (int)amount_.size() - 2;
230 for (int i = instance_count; i < n + instance_count; i++) {
231 amount_["instance_" + std::to_string(i)] = amount_.at("instance_0");
232 queued_firings_["instance_" + std::to_string(i)] = 0;
233 running_instances_["instance_" + std::to_string(i)] = 0;
234 count_["instance_" + std::to_string(i)] = 0;
235 parallelism_degree_["instance_" + std::to_string(i)] = parallelism_degree_.at("instance_0");
236 current_activities_["instance_" + std::to_string(i)] = {};
237 internal_bytes_to_send_["instance_" + std::to_string(i)] = internal_bytes_to_send_.at("instance_0");
242 /** @param n The number of instances to remove from this Task (>=0).
243 * @note Instances goes always from instance_0 to instance_x,
244 * where x is the current number of instance.
245 * Running instances cannot be removed.
247 void Task::remove_instances(int n)
249 int instance_count = (int)amount_.size() - 2;
250 xbt_assert(n >= 0, "Cannot remove a negative number of instances (provided: %d)", n);
251 xbt_assert(instance_count - n > 0, "The number of instances must be above 0 (instances: %d, provided: %d)",
253 for (int i = instance_count - 1; i >= instance_count - n; i--) {
254 xbt_assert(running_instances_.at("instance_" + std::to_string(i)) == 0,
255 "Cannot remove a running instance (instances: %d)", i);
256 amount_.erase("instance_" + std::to_string(i));
257 queued_firings_.erase("instance_" + std::to_string(i));
258 running_instances_.erase("instance_" + std::to_string(i));
259 count_.erase("instance_" + std::to_string(i));
260 parallelism_degree_.erase("instance_" + std::to_string(i));
261 current_activities_.erase("instance_" + std::to_string(i));
266 * @brief Default constructor.
268 ExecTask::ExecTask(const std::string& name) : Task(name)
270 set_load_balancing_function([]() { return "instance_0"; });
274 * @brief Smart Constructor.
276 ExecTaskPtr ExecTask::init(const std::string& name)
278 return ExecTaskPtr(new ExecTask(name));
282 * @brief Smart Constructor.
284 ExecTaskPtr ExecTask::init(const std::string& name, double flops, Host* host)
286 return init(name)->set_flops(flops)->set_host(host);
// Builds the activities executed when an instance of an ExecTask is fired.
// NOTE(review): several short statements and the `else` lines are absent from this view;
// the comments below describe the visible statements only.
289 /** @param instance The Task instance to fire.
290 * @note Only the dispatcher instance triggers the on_start signal.
291 * Comms are created if hosts differ between dispatcher and the instance to fire,
292 * or between the instance and the collector.
294 void ExecTask::fire(std::string instance)
// Shared bookkeeping first.
296 Task::fire(instance);
// Dispatcher and collector: a single Exec on their own host, completing the instance when it ends.
297 if (instance == "dispatcher" or instance == "collector") {
298 auto exec = Exec::init()
299 ->set_name(get_name() + "_" + instance)
300 ->set_flops_amount(get_amount(instance))
301 ->set_host(host_[instance]);
303 exec->on_this_completion_cb([this, instance](Exec const&) { complete(instance); });
304 store_activity(exec, instance);
// Regular instance.
// NOTE(review): get_amount() is called without `instance` here, unlike the branch above - confirm
// that the default argument is the intended amount for every instance.
306 auto exec = Exec::init()->set_name(get_name())->set_flops_amount(get_amount())->set_host(host_[instance]);
// Same host as the dispatcher: the Exec itself is the stored activity.
307 if (host_["dispatcher"] == host_[instance]) {
309 store_activity(exec, instance);
// Different hosts: a Comm from the dispatcher's host is created and the Exec becomes its successor.
311 auto comm = Comm::sendto_init(host_["dispatcher"], host_[instance])
312 ->set_name(get_name() + "_dispatcher_to_" + instance)
313 ->set_payload_size(get_internal_bytes("dispatcher"));
314 comm->add_successor(exec);
316 store_activity(comm, instance);
// Same host as the collector: the end of the Exec completes the instance.
318 if (host_[instance] == host_["collector"]) {
319 exec->on_this_completion_cb([this, instance](Exec const&) { complete(instance); });
// The Exec is stored here only when the first branch above did not already store it.
320 if (host_["dispatcher"] != host_[instance])
321 store_activity(exec, instance);
// Different hosts: a Comm towards the collector's host follows the Exec, and its end completes the instance.
// NOTE(review): this name has no "_" between the task name and the instance, unlike
// "_dispatcher_to_" above - confirm whether that is intended.
323 auto comm = Comm::sendto_init(host_[instance], host_["collector"])
324 ->set_name(get_name() + instance + "_to_collector")
325 ->set_payload_size(get_internal_bytes(instance));
326 exec->add_successor(comm);
327 comm->on_this_completion_cb([this, instance](Comm const&) { complete(instance); });
// Sets the host of one instance, or walks over every entry of host_ when "all" is given.
// NOTE(review): the loop body and the return statement are absent from this view.
333 /** @param host The host to set.
334 * @param instance The Task instance to modify.
335 * @brief Set a new host.
337 ExecTaskPtr ExecTask::set_host(Host* host, std::string instance)
// `instance` is captured by reference; it refers to this function's by-value parameter.
339 kernel::actor::simcall_answered([this, host, &instance] {
340 if (instance == "all")
341 for (auto& [key, value] : host_)
344 host_[instance] = host;
// Thin wrapper: the flops are stored as the generic amount of the instance.
// NOTE(review): the return statement is absent from this view.
349 /** @param flops The amount of flops to set.
350 * @param instance The Task instance to modify.
352 ExecTaskPtr ExecTask::set_flops(double flops, std::string instance)
// Delegates to set_amount() from within a simcall.
354 kernel::actor::simcall_answered([this, flops, &instance] { set_amount(flops, instance); });
358 /** @param n The number of instances to add to this Task (>=0).
359 @note Instances goes always from instance_0 to instance_x,
360 where x is the current number of instance.
362 void ExecTask::add_instances(int n)
364 Task::add_instances(n);
365 int instance_count = (int)host_.size() - 2;
366 for (int i = instance_count; i < n + instance_count; i++)
367 host_["instance_" + std::to_string(i)] = host_.at("instance_0");
370 /** @param n The number of instances to remove from this Task (>=0).
371 @note Instances goes always from instance_0 to instance_x,
372 where x is the current number of instance.
373 Running instance cannot be removed.
375 void ExecTask::remove_instances(int n)
377 Task::remove_instances(n);
378 int instance_count = (int)host_.size() - 2;
379 for (int i = instance_count - 1; i >= instance_count - n; i--)
380 host_.erase("instance_" + std::to_string(i));
384 * @brief Default constructor.
386 CommTask::CommTask(const std::string& name) : Task(name)
388 set_load_balancing_function([]() { return "instance_0"; });
392 * @brief Smart constructor.
394 CommTaskPtr CommTask::init(const std::string& name)
396 return CommTaskPtr(new CommTask(name));
400 * @brief Smart constructor.
402 CommTaskPtr CommTask::init(const std::string& name, double bytes, Host* source, Host* destination)
404 return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination);
// Builds the activity executed when an instance of a CommTask is fired.
// NOTE(review): short statements and the `else` line are absent from this view;
// the comments below describe the visible statements only.
407 /** @param instance The Task instance to fire.
408 * @note Only the dispatcher instance triggers the on_start signal.
410 void CommTask::fire(std::string instance)
// Shared bookkeeping first.
412 Task::fire(instance);
// Dispatcher and collector run an Exec: the dispatcher on the source host, the collector on the destination.
413 if (instance == "dispatcher" or instance == "collector") {
414 auto exec = Exec::init()
415 ->set_name(get_name() + "_" + instance)
416 ->set_flops_amount(get_amount(instance))
417 ->set_host(instance == "dispatcher" ? source_ : destination_);
419 exec->on_this_completion_cb([this, instance](Exec const&) { complete(instance); });
420 store_activity(exec, instance);
// Regular instance: the actual Comm from source to destination.
// NOTE(review): get_amount() is called without `instance` here - confirm the default argument is intended.
422 auto comm = Comm::sendto_init(source_, destination_)->set_name(get_name())->set_payload_size(get_amount());
424 comm->on_this_completion_cb([this, instance](Comm const&) { complete(instance); });
425 store_activity(comm, instance);
// NOTE(review): the return statement is absent from this view.
430 * @param source The host to set.
431 * @brief Set a new source host.
433 CommTaskPtr CommTask::set_source(Host* source)
// The member is written from within a simcall.
435 kernel::actor::simcall_answered([this, source] { source_ = source; });
// NOTE(review): the return statement is absent from this view.
440 * @param destination The host to set.
441 * @brief Set a new destination host.
443 CommTaskPtr CommTask::set_destination(Host* destination)
// The member is written from within a simcall.
445 kernel::actor::simcall_answered([this, destination] { destination_ = destination; });
// Thin wrapper: the bytes are stored as the generic amount, using set_amount()'s default instance.
// NOTE(review): the return statement is absent from this view.
450 * @param bytes The amount of bytes to set.
452 CommTaskPtr CommTask::set_bytes(double bytes)
454 kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); });
459 * @brief Default constructor.
461 IoTask::IoTask(const std::string& name) : Task(name)
463 set_load_balancing_function([]() { return "instance_0"; });
467 * @brief Smart Constructor.
469 IoTaskPtr IoTask::init(const std::string& name)
471 return IoTaskPtr(new IoTask(name));
475 * @brief Smart Constructor.
477 IoTaskPtr IoTask::init(const std::string& name, double bytes, Disk* disk, Io::OpType type)
479 return init(name)->set_bytes(bytes)->set_disk(disk)->set_op_type(type);
// NOTE(review): the return statement is absent from this view.
483 * @param disk The disk to set.
485 IoTaskPtr IoTask::set_disk(Disk* disk)
// The member is written from within a simcall.
487 kernel::actor::simcall_answered([this, disk] { disk_ = disk; });
// Thin wrapper: the bytes are stored as the generic amount, using set_amount()'s default instance.
// NOTE(review): the return statement is absent from this view.
492 * @param bytes The amount of bytes to set.
494 IoTaskPtr IoTask::set_bytes(double bytes)
496 kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); });
// NOTE(review): the return statement is absent from this view.
501 * @param type The op type to set.
503 IoTaskPtr IoTask::set_op_type(Io::OpType type)
// The member is written from within a simcall.
505 kernel::actor::simcall_answered([this, type] { type_ = type; });
// Builds the activity executed when an instance of an IoTask is fired.
// NOTE(review): short statements and the `else` line are absent from this view;
// the comments below describe the visible statements only.
509 /** @param instance The Task instance to fire.
510 * @note Only the dispatcher instance triggers the on_start signal.
512 void IoTask::fire(std::string instance)
// Shared bookkeeping first.
514 Task::fire(instance);
// Dispatcher and collector run an Exec on the host that owns the disk.
515 if (instance == "dispatcher" or instance == "collector") {
516 auto exec = Exec::init()
517 ->set_name(get_name() + "_" + instance)
518 ->set_flops_amount(get_amount(instance))
519 ->set_host(disk_->get_host());
521 exec->on_this_completion_cb([this, instance](Exec const&) { complete(instance); });
522 store_activity(exec, instance);
// Regular instance: the actual I/O operation on the disk.
// NOTE(review): get_amount() is called without `instance` here - confirm the default argument is intended.
524 auto io = Io::init()->set_name(get_name())->set_size(get_amount())->set_disk(disk_)->set_op_type(type_);
526 io->on_this_completion_cb([this, instance](Io const&) { complete(instance); });
527 store_activity(io, instance);
530 } // namespace simgrid::s4u