1 /* Copyright (c) 2009-2023. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 /* simple test to schedule a DAX file with the Min-Min algorithm. */
8 #include <simgrid/host.h>
9 #include <simgrid/s4u.hpp>
12 XBT_LOG_NEW_DEFAULT_CATEGORY(dag_scheduling, "Logging specific to this example");
13 namespace sg4 = simgrid::s4u;
15 struct HostAttribute {
16 /* Earliest time at which a host is ready to execute a task */
17 double available_at = 0.0;
18 sg4::Exec* last_scheduled_task = nullptr;
21 static double sg_host_get_available_at(const sg4::Host* host)
23 return host->get_data<HostAttribute>()->available_at;
26 static void sg_host_set_available_at(const sg4::Host* host, double time)
28 host->get_data<HostAttribute>()->available_at = time;
31 static sg4::Exec* sg_host_get_last_scheduled_task(const sg4::Host* host)
33 return host->get_data<HostAttribute>()->last_scheduled_task;
36 static void sg_host_set_last_scheduled_task(const sg4::Host* host, sg4::ExecPtr task)
38 host->get_data<HostAttribute>()->last_scheduled_task = task.get();
41 static bool dependency_exists(const sg4::Exec* src, sg4::Exec* dst)
43 const auto& dependencies = src->get_dependencies();
44 const auto& successors = src->get_successors();
45 return (std::find(successors.begin(), successors.end(), dst) != successors.end() ||
46 dependencies.find(dst) != dependencies.end());
49 static std::vector<sg4::Exec*> get_ready_tasks(const std::vector<sg4::ActivityPtr>& dax)
51 std::vector<sg4::Exec*> ready_tasks;
52 std::map<sg4::Exec*, unsigned int> candidate_execs;
55 // Only look at activity that have their dependencies solved but are not assigned
56 if (a->dependencies_solved() && not a->is_assigned()) {
57 // if it is an exec, it's ready
58 if (auto* exec = dynamic_cast<sg4::Exec*>(a.get()))
59 ready_tasks.push_back(exec);
60 // if it a comm, we consider its successor as a candidate. If a candidate solves all its dependencies,
61 // i.e., get all its input data, it's ready
62 if (const auto* comm = dynamic_cast<sg4::Comm*>(a.get())) {
63 auto* next_exec = static_cast<sg4::Exec*>(comm->get_successors().front().get());
64 candidate_execs[next_exec]++;
65 if (next_exec->get_dependencies().size() == candidate_execs[next_exec])
66 ready_tasks.push_back(next_exec);
70 XBT_DEBUG("There are %zu ready tasks", ready_tasks.size());
74 static double finish_on_at(const sg4::ExecPtr task, const sg4::Host* host)
76 double data_available = 0.;
77 double last_data_available = -1.0;
78 /* compute last_data_available */
79 for (const auto& parent : task->get_dependencies()) {
81 if (const auto* comm = dynamic_cast<sg4::Comm*>(parent.get())) {
82 auto source = comm->get_source();
83 XBT_DEBUG("transfer from %s to %s", source->get_cname(), host->get_cname());
84 /* Estimate the redistribution time from this parent */
86 if (comm->get_remaining() <= 1e-6) {
90 sg_host_get_route_latency(source, host) + comm->get_remaining() / sg_host_get_route_bandwidth(source, host);
92 // We use the user data field to store the finish time of the predecessor of the comm, i.e., its potential start
94 data_available = *comm->get_data<double>() + redist_time;
97 /* no transfer, control dependency */
98 if (const auto* exec = dynamic_cast<sg4::Exec*>(parent.get()))
99 data_available = exec->get_finish_time();
101 if (last_data_available < data_available)
102 last_data_available = data_available;
105 return std::max(sg_host_get_available_at(host), last_data_available) + task->get_remaining() / host->get_speed();
108 static sg4::Host* get_best_host(const sg4::ExecPtr exec)
110 std::vector<sg4::Host*> hosts = sg4::Engine::get_instance()->get_all_hosts();
111 auto best_host = hosts.front();
112 double min_EFT = finish_on_at(exec, best_host);
114 for (const auto& h : hosts) {
115 double EFT = finish_on_at(exec, h);
116 XBT_DEBUG("%s finishes on %s at %f", exec->get_cname(), h->get_cname(), EFT);
126 static void schedule_on(sg4::ExecPtr exec, sg4::Host* host)
128 exec->set_host(host);
129 // we can also set the destination of all the input comms of this exec
130 for (const auto& pred : exec->get_dependencies()) {
131 auto* comm = dynamic_cast<sg4::Comm*>(pred.get());
132 if (comm != nullptr) {
133 comm->set_destination(host);
134 delete comm->get_data<double>();
137 // we can also set the source of all the output comms of this exec
138 for (const auto& succ : exec->get_successors()) {
139 auto* comm = dynamic_cast<sg4::Comm*>(succ.get());
141 comm->set_source(host);
// Entry point: schedules the tasks of a DAX file on a platform with the Min-Min heuristic.
// Arguments as read below: argv[1] is the platform file given to load_platform(), argv[2] is the DAX file given to
// create_DAG_from_DAX().
// NOTE(review): argc is not checked before argv[1]/argv[2] are read — TODO confirm both are always provided.
145 int main(int argc, char** argv)
147 sg4::Engine e(&argc, argv);
// The engine fills this set with vetoed activities. The scheduling loop below keeps going while it is non-empty.
// Presumably these are activities that could not start because they are not assigned yet — confirm against
// Engine::track_vetoed_activities documentation.
148 std::set<sg4::Activity*> vetoed;
149 e.track_vetoed_activities(&vetoed);
// Completion hook: each output comm of a finished Exec gets a heap-allocated copy of that Exec's finish time in its
// user data. finish_on_at() reads this value and schedule_on() deletes it.
151 sg4::Activity::on_completion_cb([](sg4::Activity const& activity) {
152 // when an Exec completes, we need to set the potential start time of all its output comms
153 const auto* exec = dynamic_cast<sg4::Exec const*>(&activity);
154 if (exec == nullptr) // Only Execs are concerned here
156 for (const auto& succ : exec->get_successors()) {
157 auto* comm = dynamic_cast<sg4::Comm*>(succ.get());
158 if (comm != nullptr) {
159 auto* finish_time = new double(exec->get_finish_time());
160 // We use the user data field to store the finish time of the predecessor of the comm, i.e., its potential start
162 comm->set_data(finish_time);
167 e.load_platform(argv[1]);
169 /* Allocating the host attribute */
// One HostAttribute per host, owned by a vector local to main; each host's user data points into that vector, so
// the vector must not be resized while the simulation uses these pointers.
170 unsigned long total_nhosts = e.get_host_count();
171 const auto hosts = e.get_all_hosts();
172 std::vector<HostAttribute> host_attributes(total_nhosts);
173 for (unsigned long i = 0; i < total_nhosts; i++)
174 hosts[i]->set_data(&host_attributes[i]);
176 /* load the DAX file */
177 auto dax = sg4::create_DAG_from_DAX(argv[2]);
179 /* Schedule the root first */
// NOTE(review): assumes the first activity of the DAX is the root and is an Exec (unchecked static_cast).
180 auto* root = static_cast<sg4::Exec*>(dax.front().get());
181 auto host = get_best_host(root);
182 schedule_on(root, host);
// Scheduling rounds: continue as long as the engine reported vetoed activities.
186 while (not vetoed.empty()) {
187 XBT_DEBUG("Start new scheduling round");
188 /* Get the set of ready tasks */
189 auto ready_tasks = get_ready_tasks(dax);
192 if (ready_tasks.empty()) {
193 /* there is no ready task, let advance the simulation */
197 /* For each ready task:
198 * get the host that minimizes the completion time.
199 * select the task that has the minimum completion time on its best host.
// A negative min_finish_time means that no task has been selected yet in this round.
201 double min_finish_time = -1.0;
202 sg4::Exec* selected_task = nullptr;
203 sg4::Host* selected_host = nullptr;
205 for (auto task : ready_tasks) {
206 XBT_DEBUG("%s is ready", task->get_cname());
// Reuses the `host` variable declared for the root.
207 host = get_best_host(task);
208 double finish_time = finish_on_at(task, host);
209 if (min_finish_time < 0 || finish_time < min_finish_time) {
210 min_finish_time = finish_time;
211 selected_task = task;
212 selected_host = host;
216 XBT_INFO("Schedule %s on %s", selected_task->get_cname(), selected_host->get_cname());
217 schedule_on(selected_task, selected_host);
220 * tasks can be executed concurrently when they can by default.
221 * Yet schedulers take decisions assuming that tasks wait for resource availability to start.
222 * The solution (well, a crude hack) is to keep track of the last task scheduled on a host and add a special type of
223 * dependency if needed to force the sequential execution meant by the scheduler.
224 * If the last scheduled task is already done, has failed or is a predecessor of the current task, no need for a
// NOTE(review): the dependency_exists() call re-reads sg_host_get_last_scheduled_task(selected_host), which is the
// value already held in last_scheduled_task.
228 if (auto last_scheduled_task = sg_host_get_last_scheduled_task(selected_host);
229 last_scheduled_task && (last_scheduled_task->get_state() != sg4::Activity::State::FINISHED) &&
230 (last_scheduled_task->get_state() != sg4::Activity::State::FAILED) &&
231 not dependency_exists(sg_host_get_last_scheduled_task(selected_host), selected_task))
232 last_scheduled_task->add_successor(selected_task);
// Record this placement: later rounds see selected_host as busy until min_finish_time.
234 sg_host_set_last_scheduled_task(selected_host, selected_task);
235 sg_host_set_available_at(selected_host, min_finish_time);
241 XBT_INFO("Simulation Time: %f", simgrid_get_clock());