+ running_instances_[instance] = running_instances_[instance] - 1;
+ count_[instance] = count_[instance] + 1;
+ if (instance == "collector") {
+ // XBT_INFO("Trigger on completion: %s - %s", get_cname(), instance.c_str());
+ on_this_completion(this);
+ on_completion(this);
+ for (auto const& t : successors_)
+ t->receive(this);
+ } else if (instance == "dispatcher") {
+ auto next_instance = load_balancing_function_();
+ xbt_assert(next_instance != "dispatcher" and next_instance != "collector", "Invalid instance selected: %s",
+ next_instance.c_str());
+ queued_firings_[next_instance] = queued_firings_.at(next_instance) + 1;
+ while (ready_to_run(next_instance))
+ fire(next_instance);
+ } else {
+ queued_firings_["collector"] = queued_firings_["collector"] + 1;
+ while (ready_to_run("collector"))
+ fire("collector");
+ }
+ if (ready_to_run(instance))
+ fire(instance);
+}
+
+ /** @brief Set the parallelism degree of the Task to increase or decrease horizontal scaling.
+  *  @param n The new parallelism degree. Must be strictly positive (checked with xbt_assert).
+  *  @param instance Name of the instance to update, or "all" to apply n to every entry of parallelism_degree_.
+  *  @note After each update the affected instance is fired for as long as ready_to_run() accepts it, so
+  *        increasing the degree can start queued firings immediately.
+  *        Decreasing the degree does NOT stop instances that are already running.
+  */
+ void Task::set_parallelism_degree(int n, std::string instance)
+ {
+   xbt_assert(n > 0, "Parallelism degree must be above 0.");
+   simgrid::kernel::actor::simcall_answered([this, n, &instance] {
+     // Keep firing the named instance until ready_to_run() rejects it.
+     auto fire_while_ready = [this](const std::string& name) {
+       while (ready_to_run(name))
+         fire(name);
+     };
+     // Single-instance case: update only the requested entry, then return.
+     if (instance != "all") {
+       parallelism_degree_[instance] = n;
+       fire_while_ready(instance);
+       return;
+     }
+     // "all": apply the new degree to every known instance, firing each one right after its own update.
+     for (auto& [name, degree] : parallelism_degree_) {
+       degree = n;
+       fire_while_ready(name);
+     }
+   });
+ }
+
+ /** @brief Record the internal byte count associated with one instance of the Task.
+  *  @param bytes Value stored in internal_bytes_to_send_ for that instance.
+  *  @param instance Name of the instance whose entry is written.
+  *  @note NOTE(review): no validation is performed on either argument here; presumably an unknown
+  *        instance name creates a new entry -- confirm against the container type.
+  */
+ void Task::set_internal_bytes(int bytes, std::string instance)
+ {
+   simgrid::kernel::actor::simcall_answered([this, bytes, &instance] {
+     // The assignment is performed inside the simcall, like the other setters of this class.
+     internal_bytes_to_send_[instance] = bytes;
+   });
+ }
+
+void Task::set_load_balancing_function(std::function<std::string()> func)
+{
+ simgrid::kernel::actor::simcall_answered([this, func] { load_balancing_function_ = func; });