+ running_instances_[instance]--;
+ count_[instance]++;
+ if (instance == "collector") {
+ on_this_completion(this);
+ on_completion(this);
+ for (auto const& t : successors_)
+ t->receive(this);
+ } else if (instance == "dispatcher") {
+ auto next_instance = load_balancing_function_();
+ xbt_assert(next_instance != "dispatcher" and next_instance != "collector", "Invalid instance selected: %s",
+ next_instance.c_str());
+ queued_firings_[next_instance] = queued_firings_.at(next_instance) + 1;
+ while (ready_to_run(next_instance))
+ fire(next_instance);
+ } else {
+ queued_firings_["collector"]++;
+ while (ready_to_run("collector"))
+ fire("collector");
+ }
+ if (ready_to_run(instance))
+ fire(instance);
+}
+
+/** @param n The new parallelism degree of the Task instance.
+ * @param instance The Task instance to modify.
+ * @note You can use instance "all" to modify the parallelism degree of all instances of this Task.
+ * When increasing the degree new executions are started if there is queued firings.
+ * When decreasing the degree instances already running are NOT stopped.
+ */
+void Task::set_parallelism_degree(int n, std::string instance)
+{
+ xbt_assert(n > 0, "Parallelism degree must be above 0.");
+ simgrid::kernel::actor::simcall_answered([this, n, &instance] {
+ if (instance == "all") {
+ for (auto& [key, value] : parallelism_degree_) {
+ parallelism_degree_[key] = n;
+ while (ready_to_run(key))
+ fire(key);
+ }
+ } else {
+ parallelism_degree_[instance] = n;
+ while (ready_to_run(instance))
+ fire(instance);
+ }
+ });
+}
+
+/** @param bytes The internal bytes of the Task instance.
+ * @param instance The Task instance to modify.
+ * @note Internal bytes are used for Comms between the dispatcher and instance_n,
+ * and between instance_n and the collector if they are not on the same host.
+ */
+void Task::set_internal_bytes(int bytes, std::string instance)
+{
+ simgrid::kernel::actor::simcall_answered([this, bytes, &instance] { internal_bytes_to_send_[instance] = bytes; });
+}
+
+/** @param func The load balancing function.
+ * @note The dispatcher uses this function to determine which instance to trigger next.
+ */
+void Task::set_load_balancing_function(std::function<std::string()> func)
+{
+ simgrid::kernel::actor::simcall_answered([this, func] { load_balancing_function_ = func; });